CGATcore Pipeline Module

The pipeline module is the core component of CGAT-core, providing essential functionality for building and executing computational pipelines.

Core Functions

Pipeline Decorators

@transform(input_files, suffix(".input"), ".output")
def task_function(infile, outfile):
    """Transform a single input file to an output file."""
    pass

@merge(input_files, "output.txt")
def merge_task(infiles, outfile):
    """Merge multiple input files into a single output."""
    pass

@split(input_file, "*.split")
def split_task(infile, outfiles):
    """Split a single input file into multiple outputs."""
    pass

@follows(previous_task)
def dependent_task():
    """Execute after previous_task completes."""
    pass
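The `suffix()` pattern used by `@transform` derives each output filename by replacing the matched suffix of the input filename. A plain-Python sketch of that renaming rule, runnable without Ruffus installed (the helper name `map_suffix` is illustrative, not part of the API):

```python
def map_suffix(infile, match, replacement):
    """Return the output name a suffix() rule would produce.

    Mirrors how @transform(..., suffix(".input"), ".output")
    derives "sample.output" from "sample.input".
    """
    if not infile.endswith(match):
        raise ValueError(f"{infile!r} does not end with {match!r}")
    # Strip the matched suffix, then append the replacement
    return infile[: -len(match)] + replacement


print(map_suffix("sample.input", ".input", ".output"))
```

Ruffus applies this mapping to every input file matched by the first argument, creating one task invocation per input/output pair.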

S3-Aware Decorators

@s3_transform("s3://bucket/input.txt", suffix(".txt"), ".processed")
def process_s3_file(infile, outfile):
    """Process files directly from S3."""
    pass

@s3_merge(["s3://bucket/*.txt"], "s3://bucket/merged.txt")
def merge_s3_files(infiles, outfile):
    """Merge multiple S3 files."""
    pass

Configuration Functions

Pipeline Setup

# Initialize pipeline
pipeline.initialize(options)

# Get pipeline parameters
params = pipeline.get_params()

# Configure cluster execution
pipeline.setup_cluster()
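The parameters returned by `get_params()` are typically loaded from a `pipeline.yml` file alongside the pipeline script. A hypothetical fragment, for illustration only (the key names below are examples, not a canonical schema):

```yaml
# pipeline.yml -- illustrative fragment; consult your pipeline's
# documentation for the keys it actually reads
cluster:
    queue_manager: slurm
    queue: main

# default resources for submitted jobs
memory: 4G
threads: 4
tmpdir: /tmp
```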

Resource Management

# Set memory requirements
pipeline.set_job_memory("4G")

# Set CPU requirements
pipeline.set_job_threads(4)

# Configure temporary directory
pipeline.set_tmpdir("/path/to/tmp")

Execution Functions

Running Tasks

# Execute a command
pipeline.run("samtools sort input.bam")

# Submit a Python function
pipeline.submit(
    module="my_module",
    function="process_data",
    infiles="input.txt",
    outfiles="output.txt"
)
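`submit()` takes the module and function as strings and resolves them at run time. A minimal sketch of the dynamic lookup this implies, using only the standard library (the real `submit()` additionally wraps the call in a cluster job; `dispatch` is a hypothetical helper, not part of the API):

```python
import importlib


def dispatch(module, function, infiles, outfiles):
    """Resolve a function by name and call it with the given files.

    Simplified: shows only the module/function lookup step that
    pipeline.submit() performs before scheduling the job.
    """
    func = getattr(importlib.import_module(module), function)
    return func(infiles, outfiles)


# Stand-in module/function to demonstrate the lookup mechanics:
result = dispatch("os.path", "join", "input.txt", "output.txt")
```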

Job Control

# Check job status
pipeline.is_running(job_id)

# Wait for job completion
pipeline.wait_for_jobs()

# Clean up temporary files
pipeline.cleanup()

Error Handling

try:
    pipeline.run("risky_command")
except pipeline.PipelineError as e:
    pipeline.handle_error(e)
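A common way to make the run/handle-error pattern above more robust is to retry transient failures before propagating the error. A self-contained sketch using `subprocess` (the `PipelineError` class and `run_with_retries` helper here are stand-ins, not the cgatcore API):

```python
import subprocess


class PipelineError(RuntimeError):
    """Stand-in for the pipeline's error type."""


def run_with_retries(statement, max_attempts=3):
    """Run a shell command, retrying on failure before giving up."""
    for attempt in range(1, max_attempts + 1):
        result = subprocess.run(statement, shell=True)
        if result.returncode == 0:
            return
    raise PipelineError(
        f"command failed after {max_attempts} attempts: {statement}")


run_with_retries("true")  # exits 0, so no retries are needed
```

Retries help with transient cluster or filesystem errors, but commands with side effects (partial output files) should clean up before each attempt.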

Best Practices

  1. Resource Management
     - Always specify memory and CPU requirements
     - Use appropriate cluster queue settings
     - Clean up temporary files

  2. Error Handling
     - Implement proper error checking
     - Use pipeline.log for logging
     - Handle temporary file cleanup

  3. Performance
     - Use appropriate chunk sizes for parallel processing
     - Monitor resource usage
     - Optimize cluster settings

For more details, see the Pipeline Overview and Writing Workflows guides.

pipeline.py - Tools for CGAT Ruffus Pipelines

This module provides a comprehensive set of tools to facilitate the creation and management of data processing pipelines using CGAT Ruffus. It includes functionalities for:

  1. Pipeline Control
     - Task execution and dependency management
     - Command-line interface for pipeline operations
     - Logging and error handling

  2. Resource Management
     - Cluster job submission and monitoring
     - Memory and CPU allocation
     - Temporary file handling

  3. Configuration
     - Parameter management via YAML configuration
     - Cluster settings customization
     - Pipeline state persistence

  4. Cloud Integration
     - AWS S3 support for input/output files
     - Cloud-aware pipeline decorators
     - Remote file handling

Example Usage

A basic pipeline using local files:

.. code-block:: python

    from cgatcore import pipeline as P
    from ruffus import suffix

    # Standard pipeline task
    @P.transform("input.txt", suffix(".txt"), ".processed")
    def process_local_file(infile, outfile):
        # Processing logic here
        pass

Using S3 integration:

.. code-block:: python

    # S3-aware pipeline task
    @P.s3_transform("s3://bucket/input.txt", suffix(".txt"), ".processed")
    def process_s3_file(infile, outfile):
        # Processing logic here
        pass

For detailed documentation, see: https://cgat-core.readthedocs.io/

get_s3_pipeline()

Instantiate and return the S3Pipeline instance, lazy-loaded to avoid circular imports.

Source code in cgatcore/pipeline/__init__.py
def get_s3_pipeline():
    """Instantiate and return the S3Pipeline instance, lazy-loaded to avoid circular imports."""
    # Use get_remote() to access the remote functionality
    remote = cgatcore.get_remote()  # Now properly calls the method to initialize remote if needed
    return remote.file_handler.S3Pipeline()