CGATcore Pipeline Module¶
The pipeline module is the core component of CGAT-core, providing essential functionality for building and executing computational pipelines.
Core Functions¶
Pipeline Decorators¶
```python
@transform(input_files, suffix(".input"), ".output")
def task_function(infile, outfile):
    """Transform a single input file to an output file."""
    pass

@merge(input_files, "output.txt")
def merge_task(infiles, outfile):
    """Merge multiple input files into a single output."""
    pass

@split(input_file, "*.split")
def split_task(infile, outfiles):
    """Split a single input file into multiple outputs."""
    pass

@follows(previous_task)
def dependent_task():
    """Execute after previous_task completes."""
    pass
```
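The `suffix` pattern in `@transform` derives each output filename from its input filename by swapping the matched suffix. As an illustrative sketch of that renaming rule (the `derive_output` helper below is hypothetical, not part of Ruffus or CGAT-core):

```python
# Sketch of the suffix-replacement rule used by @transform (illustrative,
# not the Ruffus implementation): the output name is derived from each
# input name by replacing the matched suffix.
def derive_output(infile, match_suffix, out_suffix):
    """Mimic transform(..., suffix(match_suffix), out_suffix)."""
    if not infile.endswith(match_suffix):
        raise ValueError(f"{infile!r} does not end with {match_suffix!r}")
    return infile[: -len(match_suffix)] + out_suffix

print(derive_output("sample.input", ".input", ".output"))  # sample.output
```

Ruffus applies this rule to every matching input file, so one decorated function fans out across a whole set of files.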
S3-Aware Decorators¶
```python
@s3_transform("s3://bucket/input.txt", suffix(".txt"), ".processed")
def process_s3_file(infile, outfile):
    """Process files directly from S3."""
    pass

@s3_merge(["s3://bucket/*.txt"], "s3://bucket/merged.txt")
def merge_s3_files(infiles, outfile):
    """Merge multiple S3 files."""
    pass
```
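An S3-aware decorator must resolve `s3://bucket/key` paths into a bucket and an object key before the task body runs. A minimal sketch of that path handling, using only the standard library (the `split_s3_path` helper is hypothetical, not cgatcore API):

```python
from urllib.parse import urlparse

def split_s3_path(path):
    # Illustrative: how an S3-aware decorator might separate the bucket
    # name from the object key before handing the file to a task.
    parsed = urlparse(path)
    if parsed.scheme != "s3":
        raise ValueError(f"not an S3 path: {path!r}")
    return parsed.netloc, parsed.path.lstrip("/")

print(split_s3_path("s3://bucket/input.txt"))  # ('bucket', 'input.txt')
```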
Configuration Functions¶
Pipeline Setup¶
```python
# Initialize pipeline
pipeline.initialize(options)

# Get pipeline parameters
params = pipeline.get_params()

# Configure cluster execution
pipeline.setup_cluster()
```
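Conceptually, the parameters returned here are a mapping in which user-supplied configuration overrides built-in defaults. A toy sketch of that layering (the names and defaults below are illustrative, not the cgatcore implementation):

```python
# Sketch: how pipeline parameters might be layered, assuming defaults are
# overridden by values read from a user configuration file.
DEFAULTS = {"job_memory": "4G", "job_threads": 1, "tmpdir": "/tmp"}

def load_params(user_config):
    params = dict(DEFAULTS)   # start from defaults
    params.update(user_config)  # user settings win
    return params

params = load_params({"job_threads": 4})
print(params["job_threads"])  # 4
```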
Resource Management¶
```python
# Set memory requirements
pipeline.set_job_memory("4G")

# Set CPU requirements
pipeline.set_job_threads(4)

# Configure temporary directory
pipeline.set_tmpdir("/path/to/tmp")
```
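Memory requirements such as "4G" are strings that must be interpreted when reserving cluster resources. A sketch of such parsing (the `memory_to_bytes` helper is hypothetical, shown only to make the unit convention concrete):

```python
def memory_to_bytes(spec):
    # Illustrative parser for memory specs like "4G" or "512M"
    # (hypothetical helper, not part of the cgatcore API).
    units = {"K": 1024, "M": 1024**2, "G": 1024**3}
    return int(spec[:-1]) * units[spec[-1].upper()]

print(memory_to_bytes("4G"))  # 4294967296
```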
Execution Functions¶
Running Tasks¶
```python
# Execute a command
pipeline.run("samtools sort input.bam")

# Submit a Python function
pipeline.submit(
    module="my_module",
    function="process_data",
    infiles="input.txt",
    outfiles="output.txt")
```
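For purely local execution, running a statement boils down to handing it to the shell and failing loudly on a non-zero exit. A minimal stand-alone sketch of that behaviour with `subprocess` (an illustration only; the real `pipeline.run` additionally handles cluster submission, resource requests, and logging):

```python
import subprocess

def run_local(statement):
    # Minimal sketch of local statement execution: run the shell command,
    # raise on a non-zero exit status, and capture its output.
    result = subprocess.run(statement, shell=True, check=True,
                            capture_output=True, text=True)
    return result.stdout

print(run_local("echo hello").strip())  # hello
```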
Job Control¶
```python
# Check job status
pipeline.is_running(job_id)

# Wait for job completion
pipeline.wait_for_jobs()

# Clean up temporary files
pipeline.cleanup()
```
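Waiting for jobs is essentially a polling loop over the set of still-running job IDs. A self-contained sketch of that loop (the `is_running` callback and toy deadlines are illustrative, not the cgatcore scheduler):

```python
import time

def wait_for_jobs(is_running, job_ids, poll_interval=0.01):
    # Sketch of a polling loop: block until every submitted job
    # reports that it has finished.
    pending = set(job_ids)
    while pending:
        pending = {j for j in pending if is_running(j)}
        if pending:
            time.sleep(poll_interval)

# Toy status function: the job "finishes" once its deadline passes.
deadlines = {"job1": time.monotonic() + 0.02}
wait_for_jobs(lambda j: time.monotonic() < deadlines[j], ["job1"])
print("all jobs finished")
```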
Error Handling¶
Best Practices¶
- Resource Management
    - Always specify memory and CPU requirements
    - Use appropriate cluster queue settings
    - Clean up temporary files
- Error Handling
    - Implement proper error checking
    - Use pipeline.log for logging
    - Handle temporary file cleanup
- Performance
    - Use appropriate chunk sizes for parallel processing
    - Monitor resource usage
    - Optimize cluster settings
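The temporary-file advice above can be made concrete with a try/finally pattern that removes scratch space even when a task fails partway through (a generic Python sketch, independent of cgatcore):

```python
import os
import shutil
import tempfile

def task_with_scratch():
    # Create per-task scratch space and guarantee its removal, whether
    # the task body succeeds or raises.
    tmpdir = tempfile.mkdtemp()
    try:
        scratch = os.path.join(tmpdir, "intermediate.txt")
        with open(scratch, "w") as f:
            f.write("partial result")
        with open(scratch) as f:
            return f.read()
    finally:
        shutil.rmtree(tmpdir)

print(task_with_scratch())  # partial result
```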
For more details, see the Pipeline Overview and Writing Workflows guides.
pipeline.py - Tools for CGAT Ruffus Pipelines¶
This module provides a comprehensive set of tools to facilitate the creation and management of data processing pipelines using CGAT Ruffus. It includes functionalities for:
- Pipeline Control
    - Task execution and dependency management
    - Command-line interface for pipeline operations
    - Logging and error handling
- Resource Management
    - Cluster job submission and monitoring
    - Memory and CPU allocation
    - Temporary file handling
- Configuration
    - Parameter management via YAML configuration
    - Cluster settings customization
    - Pipeline state persistence
- Cloud Integration
    - AWS S3 support for input/output files
    - Cloud-aware pipeline decorators
    - Remote file handling
Example Usage¶
A basic pipeline using local files:
.. code-block:: python

    from cgatcore import pipeline as P

    # Standard pipeline task
    @P.transform("input.txt", suffix(".txt"), ".processed")
    def process_local_file(infile, outfile):
        # Processing logic here
        pass
Using S3 integration:
.. code-block:: python

    # S3-aware pipeline task
    @P.s3_transform("s3://bucket/input.txt", suffix(".txt"), ".processed")
    def process_s3_file(infile, outfile):
        # Processing logic here
        pass
For detailed documentation, see: https://cgat-core.readthedocs.io/
get_s3_pipeline()¶
Instantiate and return the S3Pipeline instance, lazy-loaded to avoid circular imports.
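The lazy-loading pattern described here can be sketched as a cached module-level instance that is constructed only on first call, deferring the import until it is actually needed (the `SimpleNamespace` object below stands in for the real `S3Pipeline` import):

```python
# Sketch of lazy instantiation to avoid circular imports: the object is
# built on first use and cached for every later call.
_s3_pipeline = None

def get_s3_pipeline():
    global _s3_pipeline
    if _s3_pipeline is None:
        # The deferred import would go here; SimpleNamespace is a stand-in
        # for the real S3Pipeline class.
        from types import SimpleNamespace
        _s3_pipeline = SimpleNamespace(name="S3Pipeline")
    return _s3_pipeline

assert get_s3_pipeline() is get_s3_pipeline()  # same cached instance
```

Because the import happens inside the function body, modules that import each other at top level never trigger the cycle at load time.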