
CGATcore Pipeline Module

The pipeline module is the core component of CGAT-core, providing essential functionality for building and executing computational pipelines.

Core Functions

Pipeline Decorators

@transform(input_files, suffix(".input"), ".output")
def task_function(infile, outfile):
    """Transform a single input file to an output file."""
    pass

@merge(input_files, "output.txt")
def merge_task(infiles, outfile):
    """Merge multiple input files into a single output."""
    pass

@split(input_file, "*.split")
def split_task(infile, outfiles):
    """Split a single input file into multiple outputs."""
    pass

@follows(previous_task)
def dependent_task():
    """Execute after previous_task completes."""
    pass

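Taken together, a minimal pipeline script usually wires these decorators to the cgat-core command-line entry point. The sketch below follows common cgat-core conventions (P.run for executing shell statements, P.main(sys.argv) as the entry point); the file patterns and the copy command are arbitrary examples rather than a verbatim recipe:

import sys
from ruffus import transform, follows, suffix
from cgatcore import pipeline as P


@transform("*.input", suffix(".input"), ".output")
def task_function(infile, outfile):
    # run a shell statement for each input file; P.run handles execution
    statement = "cp %(infile)s %(outfile)s"
    P.run(statement)


@follows(task_function)
def full():
    """Dummy target that depends on all upstream tasks."""
    pass


if __name__ == "__main__":
    # hand control to the cgat-core command-line interface
    sys.exit(P.main(sys.argv))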
S3-Aware Decorators

@s3_transform("s3://bucket/input.txt", suffix(".txt"), ".processed")
def process_s3_file(infile, outfile):
    """Process files directly from S3."""
    pass

@s3_merge(["s3://bucket/*.txt"], "s3://bucket/merged.txt")
def merge_s3_files(infiles, outfile):
    """Merge multiple S3 files."""
    pass

Configuration Functions

Pipeline Setup

# Initialize pipeline
pipeline.initialize(options)

# Get pipeline parameters
params = pipeline.get_params()

# Configure cluster execution
pipeline.setup_cluster()

Resource Management

# Set memory requirements
pipeline.set_job_memory("4G")

# Set CPU requirements
pipeline.set_job_threads(4)

# Configure temporary directory
pipeline.set_tmpdir("/path/to/tmp")

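In practice, resources are often declared per job rather than globally and picked up when the statement is executed. A minimal sketch, assuming P.run accepts the job_memory and job_threads keywords shown in the Executor source further down this page:

from cgatcore import pipeline as P

statement = "samtools sort input.bam -o sorted.bam"

# request 4G per thread and 4 threads for this job only
P.run(statement, job_memory="4G", job_threads=4)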
Execution Functions

Running Tasks

# Execute a command
pipeline.run("samtools sort input.bam")

# Submit a Python function
pipeline.submit(
    module="my_module",
    function="process_data",
    infiles="input.txt",
    outfiles="output.txt"
)

Job Control

# Check job status
pipeline.is_running(job_id)

# Wait for job completion
pipeline.wait_for_jobs()

# Clean up temporary files
pipeline.cleanup()

Error Handling

try:
    pipeline.run("risky_command")
except pipeline.PipelineError as e:
    pipeline.handle_error(e)

Best Practices

  1. Resource Management
     - Always specify memory and CPU requirements
     - Use appropriate cluster queue settings
     - Clean up temporary files

  2. Error Handling
     - Implement proper error checking
     - Use pipeline.log for logging
     - Handle temporary file cleanup

  3. Performance
     - Use appropriate chunk sizes for parallel processing
     - Monitor resource usage
     - Optimize cluster settings

For more details, see the Pipeline Overview and Writing Workflows guides.

pipeline.py - Tools for CGAT Ruffus Pipelines

This module provides a comprehensive set of tools to facilitate the creation and management of data processing pipelines using CGAT Ruffus. It includes functionality for:

  1. Pipeline Control
     - Task execution and dependency management
     - Command-line interface for pipeline operations
     - Logging and error handling

  2. Resource Management
     - Cluster job submission and monitoring
     - Memory and CPU allocation
     - Temporary file handling

  3. Configuration
     - Parameter management via YAML configuration
     - Cluster settings customization
     - Pipeline state persistence

  4. Cloud Integration
     - AWS S3 support for input/output files
     - Cloud-aware pipeline decorators
     - Remote file handling

Example Usage

A basic pipeline using local files:

from cgatcore import pipeline as P

# Standard pipeline task
@P.transform("input.txt", suffix(".txt"), ".processed")
def process_local_file(infile, outfile):
    # Processing logic here
    pass

Using S3 integration:

# S3-aware pipeline task
@P.s3_transform("s3://bucket/input.txt", suffix(".txt"), ".processed")
def process_s3_file(infile, outfile):
    # Processing logic here
    pass

For detailed documentation, see: https://cgat-core.readthedocs.io/

ContainerConfig

Container configuration for pipeline execution.

Source code in cgatcore/pipeline/execution.py
class ContainerConfig:
    """Container configuration for pipeline execution."""

    def __init__(self, image=None, volumes=None, env_vars=None, runtime="docker"):
        """
        Args:
            image (str): Container image (e.g., "ubuntu:20.04").
            volumes (list): Volume mappings (e.g., ['/data:/data']).
            env_vars (dict): Environment variables for the container.
            runtime (str): Container runtime ("docker" or "singularity").
        """
        self.image = image
        self.volumes = volumes or []
        self.env_vars = env_vars or {}
        self.runtime = runtime.lower()  # Normalise to lowercase

        if self.runtime not in ["docker", "singularity"]:
            raise ValueError("Unsupported container runtime: {}".format(self.runtime))

    def get_container_command(self, statement):
        """Convert a statement to run inside a container."""
        if not self.image:
            return statement

        if self.runtime == "docker":
            return self._get_docker_command(statement)
        elif self.runtime == "singularity":
            return self._get_singularity_command(statement)
        else:
            raise ValueError("Unsupported container runtime: {}".format(self.runtime))

    def _get_docker_command(self, statement):
        """Generate a Docker command."""
        volume_args = [f"-v {volume}" for volume in self.volumes]
        env_args = [f"-e {key}={value}" for key, value in self.env_vars.items()]

        return " ".join([
            "docker", "run", "--rm",
            *volume_args, *env_args, self.image,
            "/bin/bash", "-c", f"'{statement}'"
        ])

    def _get_singularity_command(self, statement):
        """Generate a Singularity command."""
        volume_args = [f"--bind {volume}" for volume in self.volumes]
        env_args = [f"--env {key}={value}" for key, value in self.env_vars.items()]

        return " ".join([
            "singularity", "exec",
            *volume_args, *env_args, self.image,
            "bash", "-c", f"'{statement}'"
        ])

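For illustration, the snippet below builds a configuration and prints the kind of command get_container_command produces (wrapping the statement in docker run, per the source above); the image name, volume mapping and environment variable are arbitrary examples:

from cgatcore.pipeline.execution import ContainerConfig

config = ContainerConfig(
    image="ubuntu:20.04",
    volumes=["/data:/data"],
    env_vars={"THREADS": "4"},
    runtime="docker")

print(config.get_container_command("ls /data"))
# docker run --rm -v /data:/data -e THREADS=4 ubuntu:20.04 /bin/bash -c 'ls /data'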
__init__(image=None, volumes=None, env_vars=None, runtime='docker')

Parameters:

Name Type Description Default
image str

Container image (e.g., "ubuntu:20.04").

None
volumes list

Volume mappings (e.g., ['/data:/data']).

None
env_vars dict

Environment variables for the container.

None
runtime str

Container runtime ("docker" or "singularity").

'docker'
Source code in cgatcore/pipeline/execution.py
def __init__(self, image=None, volumes=None, env_vars=None, runtime="docker"):
    """
    Args:
        image (str): Container image (e.g., "ubuntu:20.04").
        volumes (list): Volume mappings (e.g., ['/data:/data']).
        env_vars (dict): Environment variables for the container.
        runtime (str): Container runtime ("docker" or "singularity").
    """
    self.image = image
    self.volumes = volumes or []
    self.env_vars = env_vars or {}
    self.runtime = runtime.lower()  # Normalise to lowercase

    if self.runtime not in ["docker", "singularity"]:
        raise ValueError("Unsupported container runtime: {}".format(self.runtime))

get_container_command(statement)

Convert a statement to run inside a container.

Source code in cgatcore/pipeline/execution.py
def get_container_command(self, statement):
    """Convert a statement to run inside a container."""
    if not self.image:
        return statement

    if self.runtime == "docker":
        return self._get_docker_command(statement)
    elif self.runtime == "singularity":
        return self._get_singularity_command(statement)
    else:
        raise ValueError("Unsupported container runtime: {}".format(self.runtime))

DRMAACluster

Bases: object

Source code in cgatcore/pipeline/cluster.py
class DRMAACluster(object):

    # dictionary mapping resource usage fields returned by DRMAA
    # to a common set of names.
    map_drmaa2benchmark_data = {}

    def __init__(self, session, ignore_errors=False):
        self.session = session
        self.ignore_errors = ignore_errors

    def get_resource_usage(self, job_id, retval, hostname):
        retval.resourceUsage["hostname"] = hostname
        return [retval]

    def setup_drmaa_job_template(self,
                                 drmaa_session,
                                 job_name,
                                 job_memory,
                                 job_threads,
                                 working_directory,
                                 **kwargs):
        '''Sets up a DRMAA job template. Currently SGE, SLURM, Torque and PBSPro are
        supported'''
        if not job_memory:
            raise ValueError("Job memory must be specified when running "
                             "DRMAA jobs")

        jt = drmaa_session.createJobTemplate()
        jt.workingDirectory = working_directory
        jt.jobEnvironment = {'BASH_ENV': '~/.bashrc'}
        jt.args = []
        if not re.match("[a-zA-Z]", job_name[0]):
            job_name = "_" + job_name

        spec = self.get_native_specification(job_name,
                                             job_memory,
                                             job_threads,
                                             **kwargs)

        jt.nativeSpecification = " ".join(spec)

        # keep stdout and stderr separate
        jt.joinFiles = False

        self.update_template(jt)
        return jt

    def update_template(self, jt):
        pass

    def collect_single_job_from_cluster(self,
                                        job_id,
                                        statement,
                                        stdout_path, stderr_path,
                                        job_path):
        '''collects a single job on the cluster.

        This method waits until a job has completed and returns
        stdout, stderr and resource usage.
        '''
        try:
            retval = self.session.wait(
                job_id, drmaa.Session.TIMEOUT_WAIT_FOREVER)
        except Exception as msg:
            # ignore message 24, indicates jobs that have been qdel'ed
            if not str(msg).startswith("code 24"):
                raise
            retval = None

        stdout, stderr = self.get_drmaa_job_stdout_stderr(
            stdout_path, stderr_path)

        if retval is not None:
            error_msg = None
            if retval.exitStatus == 0:
                if retval.wasAborted is True:
                    error_msg = (
                        "Job {} has exit status 0, but marked as hasAborted=True, hasExited={} "
                        "(Job may have been cancelled by the user or the scheduler due to memory constraints)"
                        "The stderr was \n{}\nstatement = {}".format(
                            job_id, retval.hasExited, "".join(stderr), statement))
                if retval.hasSignal is True:
                    error_msg = ("Job {} has zero exitStatus {} but received signal: hasExited={},  wasAborted={}"
                                 "hasSignal={}, terminatedSignal='{}' "
                                 "\nstatement = {}".format(
                                     job_id, retval.exitStatus, retval.hasExited, retval.wasAborted,
                                     retval.hasSignal, retval.terminatedSignal,
                                     statement))
            else:
                error_msg = ("Job {} has non-zero exitStatus {}: hasExited={},  wasAborted={}"
                             "hasSignal={}, terminatedSignal='{}' "
                             "\nstatement = {}".format(
                                 job_id, retval.exitStatus, retval.hasExited, retval.wasAborted,
                                 retval.hasSignal, retval.terminatedSignal,
                                 statement))

            if error_msg:
                if stderr:
                    error_msg += "\n stderr = {}".format("".join(stderr))
                if self.ignore_errors:
                    get_logger().warning(error_msg)
                else:
                    raise OSError(error_msg)
        else:
            retval = JobInfo(job_id, {})

        # get hostname from job script
        try:
            hostname = stdout[-3][:-1]
        except IndexError:
            hostname = "unknown"

        try:
            resource_usage = self.get_resource_usage(job_id, retval, hostname)
        except (ValueError, KeyError, TypeError, IndexError) as ex:
            E.warn("could not collect resource usage for job {}: {}".format(job_id, ex))
            retval.resourceUsage["hostname"] = hostname
            resource_usage = [retval]

        try:
            os.unlink(job_path)
        except OSError:
            self.logger.warn(
                ("temporary job file %s not present for "
                 "clean-up - ignored") % job_path)

        return stdout, stderr, resource_usage

    def get_drmaa_job_stdout_stderr(self, stdout_path, stderr_path,
                                    tries=5, encoding="utf-8"):
        '''get stdout/stderr allowing for some lag.

        Try at most *tries* times. If unsuccessful, throw OSError

        Removes the files once they are read.

        Returns tuple of stdout and stderr as unicode strings.
        '''
        x = tries
        while x >= 0:
            if os.path.exists(stdout_path):
                break
            gevent.sleep(GEVENT_TIMEOUT_WAIT)
            x -= 1

        x = tries
        while x >= 0:
            if os.path.exists(stderr_path):
                break
            gevent.sleep(GEVENT_TIMEOUT_WAIT)
            x -= 1

        try:
            with open(stdout_path, "r", encoding=encoding) as inf:
                stdout = inf.readlines()
        except IOError as msg:
            get_logger().warning("could not open stdout: %s" % msg)
            stdout = []

        try:
            with open(stderr_path, "r", encoding=encoding) as inf:
                stderr = inf.readlines()
        except IOError as msg:
            get_logger().warning("could not open stdout: %s" % msg)
            stderr = []

        try:
            os.unlink(stdout_path)
            os.unlink(stderr_path)
        except OSError as msg:
            pass

        return stdout, stderr

    def set_drmaa_job_paths(self, job_template, job_path):
        '''Adds the job_path, stdout_path and stderr_paths
           to the job_template.
        '''
        job_path = os.path.abspath(job_path)
        os.chmod(job_path, stat.S_IRWXG | stat.S_IRWXU)

        stdout_path = job_path + ".stdout"
        stderr_path = job_path + ".stderr"

        job_template.remoteCommand = job_path
        job_template.outputPath = ":" + stdout_path
        job_template.errorPath = ":" + stderr_path

        return stdout_path, stderr_path

    def map_resource_usage(self, resource_usage, data2type):
        """return job metrics mapped to common name and converted to right type."""
        def _convert(key, v, tpe):
            if v is None:
                return None
            else:
                try:
                    return tpe(v)
                except ValueError as ex:
                    E.warning("could not convert {} with value '{}' to {}: {}".format(
                        key, v, tpe, ex))
                    return v

        return dict([(key,
                      _convert(key, resource_usage.get(self.map_drmaa2benchmark_data.get(key, key), None), tpe))
                     for key, tpe in data2type.items()])

collect_single_job_from_cluster(job_id, statement, stdout_path, stderr_path, job_path)

collects a single job on the cluster.

This method waits until a job has completed and returns stdout, stderr and resource usage.

Source code in cgatcore/pipeline/cluster.py
def collect_single_job_from_cluster(self,
                                    job_id,
                                    statement,
                                    stdout_path, stderr_path,
                                    job_path):
    '''collects a single job on the cluster.

    This method waits until a job has completed and returns
    stdout, stderr and resource usage.
    '''
    try:
        retval = self.session.wait(
            job_id, drmaa.Session.TIMEOUT_WAIT_FOREVER)
    except Exception as msg:
        # ignore message 24, indicates jobs that have been qdel'ed
        if not str(msg).startswith("code 24"):
            raise
        retval = None

    stdout, stderr = self.get_drmaa_job_stdout_stderr(
        stdout_path, stderr_path)

    if retval is not None:
        error_msg = None
        if retval.exitStatus == 0:
            if retval.wasAborted is True:
                error_msg = (
                    "Job {} has exit status 0, but marked as hasAborted=True, hasExited={} "
                    "(Job may have been cancelled by the user or the scheduler due to memory constraints)"
                    "The stderr was \n{}\nstatement = {}".format(
                        job_id, retval.hasExited, "".join(stderr), statement))
            if retval.hasSignal is True:
                error_msg = ("Job {} has zero exitStatus {} but received signal: hasExited={},  wasAborted={}"
                             "hasSignal={}, terminatedSignal='{}' "
                             "\nstatement = {}".format(
                                 job_id, retval.exitStatus, retval.hasExited, retval.wasAborted,
                                 retval.hasSignal, retval.terminatedSignal,
                                 statement))
        else:
            error_msg = ("Job {} has non-zero exitStatus {}: hasExited={},  wasAborted={}"
                         "hasSignal={}, terminatedSignal='{}' "
                         "\nstatement = {}".format(
                             job_id, retval.exitStatus, retval.hasExited, retval.wasAborted,
                             retval.hasSignal, retval.terminatedSignal,
                             statement))

        if error_msg:
            if stderr:
                error_msg += "\n stderr = {}".format("".join(stderr))
            if self.ignore_errors:
                get_logger().warning(error_msg)
            else:
                raise OSError(error_msg)
    else:
        retval = JobInfo(job_id, {})

    # get hostname from job script
    try:
        hostname = stdout[-3][:-1]
    except IndexError:
        hostname = "unknown"

    try:
        resource_usage = self.get_resource_usage(job_id, retval, hostname)
    except (ValueError, KeyError, TypeError, IndexError) as ex:
        E.warn("could not collect resource usage for job {}: {}".format(job_id, ex))
        retval.resourceUsage["hostname"] = hostname
        resource_usage = [retval]

    try:
        os.unlink(job_path)
    except OSError:
        self.logger.warn(
            ("temporary job file %s not present for "
             "clean-up - ignored") % job_path)

    return stdout, stderr, resource_usage

get_drmaa_job_stdout_stderr(stdout_path, stderr_path, tries=5, encoding='utf-8')

get stdout/stderr allowing for some lag.

Try at most tries times. If unsuccessful, throw OSError

Removes the files once they are read.

Returns tuple of stdout and stderr as unicode strings.

Source code in cgatcore/pipeline/cluster.py
def get_drmaa_job_stdout_stderr(self, stdout_path, stderr_path,
                                tries=5, encoding="utf-8"):
    '''get stdout/stderr allowing for some lag.

    Try at most *tries* times. If unsuccessful, throw OSError

    Removes the files once they are read.

    Returns tuple of stdout and stderr as unicode strings.
    '''
    x = tries
    while x >= 0:
        if os.path.exists(stdout_path):
            break
        gevent.sleep(GEVENT_TIMEOUT_WAIT)
        x -= 1

    x = tries
    while x >= 0:
        if os.path.exists(stderr_path):
            break
        gevent.sleep(GEVENT_TIMEOUT_WAIT)
        x -= 1

    try:
        with open(stdout_path, "r", encoding=encoding) as inf:
            stdout = inf.readlines()
    except IOError as msg:
        get_logger().warning("could not open stdout: %s" % msg)
        stdout = []

    try:
        with open(stderr_path, "r", encoding=encoding) as inf:
            stderr = inf.readlines()
    except IOError as msg:
        get_logger().warning("could not open stdout: %s" % msg)
        stderr = []

    try:
        os.unlink(stdout_path)
        os.unlink(stderr_path)
    except OSError as msg:
        pass

    return stdout, stderr

map_resource_usage(resource_usage, data2type)

return job metrics mapped to common name and converted to right type.

Source code in cgatcore/pipeline/cluster.py
def map_resource_usage(self, resource_usage, data2type):
    """return job metrics mapped to common name and converted to right type."""
    def _convert(key, v, tpe):
        if v is None:
            return None
        else:
            try:
                return tpe(v)
            except ValueError as ex:
                E.warning("could not convert {} with value '{}' to {}: {}".format(
                    key, v, tpe, ex))
                return v

    return dict([(key,
                  _convert(key, resource_usage.get(self.map_drmaa2benchmark_data.get(key, key), None), tpe))
                 for key, tpe in data2type.items()])

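The mapping relies on the class-level map_drmaa2benchmark_data dictionary, which is empty in the base class and populated by scheduler-specific subclasses. A small illustration with a hypothetical mapping (the field names "mem" and "max_rss" are made up for the example):

from cgatcore.pipeline.cluster import DRMAACluster

class ExampleCluster(DRMAACluster):
    # pretend the scheduler reports memory usage under the key "mem"
    map_drmaa2benchmark_data = {"max_rss": "mem"}

cluster = ExampleCluster(session=None)
usage = {"mem": "2048", "cpu_t": "3.5"}
print(cluster.map_resource_usage(usage, {"max_rss": int, "cpu_t": float}))
# {'max_rss': 2048, 'cpu_t': 3.5}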
set_drmaa_job_paths(job_template, job_path)

Adds the job_path, stdout_path and stderr_paths to the job_template.

Source code in cgatcore/pipeline/cluster.py
def set_drmaa_job_paths(self, job_template, job_path):
    '''Adds the job_path, stdout_path and stderr_paths
       to the job_template.
    '''
    job_path = os.path.abspath(job_path)
    os.chmod(job_path, stat.S_IRWXG | stat.S_IRWXU)

    stdout_path = job_path + ".stdout"
    stderr_path = job_path + ".stderr"

    job_template.remoteCommand = job_path
    job_template.outputPath = ":" + stdout_path
    job_template.errorPath = ":" + stderr_path

    return stdout_path, stderr_path

setup_drmaa_job_template(drmaa_session, job_name, job_memory, job_threads, working_directory, **kwargs)

Sets up a DRMAA job template. Currently SGE, SLURM, Torque and PBSPro are supported

Source code in cgatcore/pipeline/cluster.py
def setup_drmaa_job_template(self,
                             drmaa_session,
                             job_name,
                             job_memory,
                             job_threads,
                             working_directory,
                             **kwargs):
    '''Sets up a DRMAA job template. Currently SGE, SLURM, Torque and PBSPro are
    supported'''
    if not job_memory:
        raise ValueError("Job memory must be specified when running "
                         "DRMAA jobs")

    jt = drmaa_session.createJobTemplate()
    jt.workingDirectory = working_directory
    jt.jobEnvironment = {'BASH_ENV': '~/.bashrc'}
    jt.args = []
    if not re.match("[a-zA-Z]", job_name[0]):
        job_name = "_" + job_name

    spec = self.get_native_specification(job_name,
                                         job_memory,
                                         job_threads,
                                         **kwargs)

    jt.nativeSpecification = " ".join(spec)

    # keep stdout and stderr separate
    jt.joinFiles = False

    self.update_template(jt)
    return jt

EventPool

Bases: Pool

Source code in cgatcore/pipeline/control.py
class EventPool(gevent.pool.Pool):

    def __len__(self):
        """make sure that pool always evaluates to true."""
        line = gevent.pool.Pool.__len__(self)
        if not line:
            return 1
        return line

    def close(self):
        pass

    def terminate(self):
        self.kill()

__len__()

make sure that pool always evaluates to true.

Source code in cgatcore/pipeline/control.py
def __len__(self):
    """make sure that pool always evaluates to true."""
    line = gevent.pool.Pool.__len__(self)
    if not line:
        return 1
    return line

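The override exists so that code testing "if pool:" never treats an empty pool as falsy. A quick demonstration, assuming gevent is installed and that EventPool is importable from cgatcore.pipeline.control:

from cgatcore.pipeline.control import EventPool

pool = EventPool(size=4)
print(len(pool))   # 1, even though nothing has been spawned yet
print(bool(pool))  # True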
Executor

Bases: object

Source code in cgatcore/pipeline/execution.py
class Executor(object):

    def __init__(self, **kwargs):

        self.logger = get_logger()
        self.queue_manager = None
        self.run_on_cluster = will_run_on_cluster(kwargs)
        self.job_threads = kwargs.get("job_threads", 1)
        self.active_jobs = []  # List to track active jobs

        if "job_memory" in kwargs and "job_total_memory" in kwargs:
            raise ValueError(
                "both job_memory and job_total_memory have been given")

        self.job_total_memory = kwargs.get('job_total_memory', None)
        self.job_memory = kwargs.get('job_memory', None)

        if self.job_total_memory == "unlimited" or self.job_memory == "unlimited":
            self.job_total_memory = self.job_memory = "unlimited"
        else:
            if self.job_total_memory:
                self.job_memory = iotools.bytes2human(
                    iotools.human2bytes(self.job_total_memory) / self.job_threads)
            elif self.job_memory:
                self.job_total_memory = self.job_memory * self.job_threads
            else:
                self.job_memory = get_params()["cluster"].get(
                    "memory_default", "4G")
                if self.job_memory == "unlimited":
                    self.job_total_memory = "unlimited"
                else:
                    self.job_total_memory = self.job_memory * self.job_threads

        self.ignore_pipe_errors = kwargs.get('ignore_pipe_errors', False)
        self.ignore_errors = kwargs.get('ignore_errors', False)

        self.job_name = kwargs.get("job_name", "unknow_job_name")
        self.task_name = kwargs.get("task_name", "unknown_task_name")

        # deduce output directory/directories, requires somewhat
        # consistent naming in the calling function.
        outfiles = []
        if "outfile" in kwargs:
            outfiles.append(kwargs["outfile"])
        if "outfiles" in kwargs:
            outfiles.extend(kwargs["outfiles"])

        self.output_directories = set(sorted(
            [os.path.dirname(x) for x in outfiles]))

        self.options = kwargs

        self.work_dir = get_params()["work_dir"]

        self.shellfile = kwargs.get("shell_logfile", None)
        if self.shellfile:
            if not self.shellfile.startswith(os.sep):
                self.shellfile = os.path.join(
                    self.work_dir, os.path.basename(self.shellfile))

        self.monitor_interval_queued = kwargs.get('monitor_interval_queued', None)
        if self.monitor_interval_queued is None:
            self.monitor_interval_queued = get_params()["cluster"].get(
                'monitor_interval_queued_default', GEVENT_TIMEOUT_WAIT)
        self.monitor_interval_running = kwargs.get('monitor_interval_running', None)
        if self.monitor_interval_running is None:
            self.monitor_interval_running = get_params()["cluster"].get(
                'monitor_interval_running_default', GEVENT_TIMEOUT_WAIT)
        # Set up signal handlers for clean-up on interruption
        self.setup_signal_handlers()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def expand_statement(self, statement):
        '''add generic commands before and after statement.

        The method scans the statement for arvados mount points and
        inserts appropriate prefixes to make sure that the mount point
        exists.

        Arguments
        ---------
        statement : string
            Command line statement to expand

        Returns
        -------
        statement : string
            The expanded statement.

        '''

        setup_cmds = []
        teardown_cmds = []
        cleanup_funcs = []

        setup_cmds.append("umask 002")

        for var in ["MKL_NUM_THREADS",
                    "OPENBLAS_NUM_THREADS",
                    "OMP_NUM_THREADS"]:
            setup_cmds.append("export {}={}".format(
                var, self.options.get("job_threads", 1)))

        if "arv=" in statement:

            # Todo: permit setting this in params
            arvados_api_token = os.environ.get("ARVADOS_API_TOKEN", None)
            arvados_api_host = os.environ.get("ARVADOS_API_HOST", None)
            if not arvados_api_token:
                raise ValueError(
                    "arvados mount encountered in statement {}, "
                    "but ARVADOS_API_TOKEN not defined".format(statement))

            if not arvados_api_host:
                raise ValueError(
                    "arvados mount encountered in statement {}, "
                    "but ARVADOS_API_HOST not defined".format(statement))

            mountpoint = get_temp_filename(clear=True)

            arvados_options = "--disable-event-listening --read-only"
            setup_cmds.append("\n".join(
                ('export ARVADOS_API_TOKEN="{arvados_api_token}"',
                 'export ARVADOS_API_HOST="{arvados_api_host}"',
                 'export ARVADOS_API_HOST_INSECURE=true',
                 'export ARVADOS_MOUNT_POINT="{mountpoint}"',
                 'mkdir -p "{mountpoint}"',
                 'arv-mount {arvados_options} "{mountpoint}" 2>> /dev/null')).format(**locals()))

            statement = re.sub("arv=", mountpoint + "/", statement)

            # "arv-mount --unmount {mountpoint}" not available in newer
            # arvados installs (0.1.20170707152712), so keep using
            # fusermount. However, do not fail if you can't clean up, as
            # there are arvados racing issues.
            cleanup_funcs.append(("unmount_arvados",
                                  '''{{
                                  set +e &&
                                  fusermount -u {mountpoint} &&
                                  rm -rf {mountpoint} &&
                                  set -e
                                  }}'''.format(**locals())))

        if "job_condaenv" in self.options:
            # In conda < 4.4 there is an issue with parallel activations,
            # see https://github.com/conda/conda/issues/2837 .
            # This has been fixed in conda 4.4, but we are on conda
            # 4.3, presumably because we are still on py35. A work-around
            # to source activate is to add the explicit path of the environment
            # in version >= 4.4, do
            # setup_cmds.append(
            #     "conda activate {}".format(self.options["job_condaenv"]))
            # For old conda versions (note this will not work for tools that require
            # additional environment variables)
            setup_cmds.append(
                "export PATH={}:$PATH".format(
                    os.path.join(
                        get_conda_environment_directory(
                            self.options["job_condaenv"]),
                        "bin")))

        statement = "\n".join((
            "\n".join(setup_cmds),
            statement,
            "\n".join(teardown_cmds)))

        return statement, cleanup_funcs

    def build_job_script(self,
                         statement):
        '''build job script from statement.

        returns (name_of_script, stdout_path, stderr_path)
        '''
        tmpfilename = get_temp_filename(dir=self.work_dir, clear=True)
        tmpfilename = tmpfilename + ".sh"

        expanded_statement, cleanup_funcs = self.expand_statement(statement)

        with open(tmpfilename, "w") as tmpfile:
            # disabled: -l -O expand_aliases\n" )

            # make executable
            tmpfile.write("#!/bin/bash -eu\n")
            if not self.ignore_pipe_errors:
                tmpfile.write("set -o pipefail\n")

            os.chmod(tmpfilename, stat.S_IRWXG | stat.S_IRWXU)

            tmpfile.write("\ncd {}\n".format(self.work_dir))
            if self.output_directories is not None:
                for outdir in self.output_directories:
                    if outdir:
                        tmpfile.write("\nmkdir -p {}\n".format(outdir))

            # create and set system scratch dir for temporary files
            tmpfile.write("umask 002\n")

            cluster_tmpdir = get_params()["cluster_tmpdir"]

            if self.run_on_cluster and cluster_tmpdir:
                tmpdir = cluster_tmpdir
                tmpfile.write("TMPDIR=`mktemp -d -p {}`\n".format(tmpdir))
                tmpfile.write("export TMPDIR\n")
            else:
                tmpdir = get_temp_dir(dir=get_params()["tmpdir"],
                                      clear=True)
                tmpfile.write("mkdir -p {}\n".format(tmpdir))
                tmpfile.write("export TMPDIR={}\n".format(tmpdir))

            cleanup_funcs.append(
                ("clean_temp",
                 "{{ rm -rf {}; }}".format(tmpdir)))

            # output times whenever script exits, preserving
            # return status
            cleanup_funcs.append(("info",
                                  "{ echo 'benchmark'; hostname; times; }"))
            for cleanup_func, cleanup_code in cleanup_funcs:
                tmpfile.write("\n{}() {}\n".format(cleanup_func, cleanup_code))

            tmpfile.write("\nclean_all() {{ {}; }}\n".format(
                "; ".join([x[0] for x in cleanup_funcs])))

            tmpfile.write("\ntrap clean_all EXIT\n\n")

            if self.job_memory not in ("unlimited", "etc") and \
               self.options.get("cluster_memory_ulimit", False):
                # restrict virtual memory
                # Note that there are resources in SGE which could do this directly
                # such as v_hmem.
                # Note that limiting resident set sizes (RSS) with ulimit is not
                # possible in newer kernels.
                # -v and -m accept memory in kb
                requested_memory_kb = max(
                    1000,
                    int(math.ceil(
                        iotools.human2bytes(self.job_memory) / 1024 * self.job_threads)))
                # unsetting error exit as often not permissions
                tmpfile.write("set +e\n")
                tmpfile.write("ulimit -v {} > /dev/null \n".format(
                    requested_memory_kb))
                tmpfile.write("ulimit -m {} > /dev/null \n".format(
                    requested_memory_kb))
                # set as hard limit
                tmpfile.write("ulimit -H -v > /dev/null \n")
                tmpfile.write("set -e\n")

            if self.shellfile:

                # make sure path exists that we want to write to
                tmpfile.write("mkdir -p $(dirname \"{}\")\n".format(
                    self.shellfile))

                # output low-level debugging information to a shell log file
                tmpfile.write(
                    'echo "%s : START -> %s" >> %s\n' %
                    (self.job_name, tmpfilename, self.shellfile))
                # disabled - problems with quoting
                # tmpfile.write( '''echo 'statement=%s' >> %s\n''' %
                # (shellquote(statement), self.shellfile) )
                tmpfile.write("set | sed 's/^/%s : /' >> %s\n" %
                              (self.job_name, self.shellfile))
                tmpfile.write("pwd | sed 's/^/%s : /' >> %s\n" %
                              (self.job_name, self.shellfile))
                tmpfile.write("hostname | sed 's/^/%s: /' >> %s\n" %
                              (self.job_name, self.shellfile))
                # cat /proc/meminfo is Linux specific
                if get_params()['os'] == 'Linux':
                    tmpfile.write("cat /proc/meminfo | sed 's/^/%s: /' >> %s\n" %
                                  (self.job_name, self.shellfile))
                elif get_params()['os'] == 'Darwin':
                    tmpfile.write("vm_stat | sed 's/^/%s: /' >> %s\n" %
                                  (self.job_name, self.shellfile))
                tmpfile.write(
                    'echo "%s : END -> %s" >> %s\n' %
                    (self.job_name, tmpfilename, self.shellfile))
                tmpfile.write("ulimit | sed 's/^/%s: /' >> %s\n" %
                              (self.job_name, self.shellfile))

            job_path = os.path.abspath(tmpfilename)

            tmpfile.write(expanded_statement)
            tmpfile.write("\n\n")
            tmpfile.close()

        return statement, job_path

    def collect_benchmark_data(self,
                               statements,
                               resource_usage):
        """collect benchmark data from a job's stdout and any resource usage
        information that might be present.

        If time_data is given, read output from time command.
        """

        benchmark_data = []

        def get_val(d, v, alt):
            val = d.get(v, alt)
            if val == "unknown" or val is None:
                val = alt
            return val

        # build resource usage data structure - part native, part
        # mapped to common fields
        for jobinfo, statement in zip(resource_usage, statements):

            if resource_usage is None:
                E.warn("no resource usage for {}".format(self.task_name))
                continue

            # add some common fields
            data = {"task": self.task_name,
                    "engine": self.__class__.__name__,
                    "statement": statement,
                    "job_id": jobinfo.jobId,
                    "slots": self.job_threads}

            # native specs
            data.update(jobinfo.resourceUsage)

            # translate specs
            if self.queue_manager:
                data.update(
                    self.queue_manager.map_resource_usage(data, DATA2TYPE))

            cpu_time = float(get_val(data, "cpu_t", 0))
            start_time = float(get_val(data, "start_time", 0))
            end_time = float(get_val(data, "end_time", 0))
            data.update({
                # avoid division by 0 error
                "percent_cpu": (
                    100.0 * cpu_time / max(1.0, (end_time - start_time)) / self.job_threads),
                "total_t": end_time - start_time
            })
            benchmark_data.append(data)

        return benchmark_data

    def set_container_config(self, image, volumes=None, env_vars=None, runtime="docker"):
        """Set container configuration for all tasks executed by this executor."""

        if not image:
            raise ValueError("An image must be specified for the container configuration.")
        self.container_config = ContainerConfig(image=image, volumes=volumes, env_vars=env_vars, runtime=runtime)

    def start_job(self, job_info):
        """Add a job to active_jobs list when it starts."""
        self.active_jobs.append(job_info)
        self.logger.info(f"Job started: {job_info}")

    def finish_job(self, job_info):
        """Remove a job from active_jobs list when it finishes."""
        if job_info in self.active_jobs:
            self.active_jobs.remove(job_info)
            self.logger.info(f"Job completed: {job_info}")

    def cleanup_all_jobs(self):
        """Clean up all remaining active jobs on interruption."""
        self.logger.info("Cleaning up all job outputs due to pipeline interruption")
        for job_info in self.active_jobs:
            self.cleanup_failed_job(job_info)
        self.active_jobs.clear()  # Clear the list after cleanup

    def setup_signal_handlers(self):
        """Set up signal handlers to clean up jobs on SIGINT and SIGTERM."""

        def signal_handler(signum, frame):
            self.logger.info(f"Received signal {signum}. Starting clean-up.")
            self.cleanup_all_jobs()
            exit(1)

        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)

    def cleanup_failed_job(self, job_info):
        """Clean up files generated by a failed job."""
        if "outfile" in job_info:
            outfiles = [job_info["outfile"]]
        elif "outfiles" in job_info:
            outfiles = job_info["outfiles"]
        else:
            self.logger.warning(f"No output files found for job {job_info.get('job_name', 'unknown')}")
            return

        for outfile in outfiles:
            if os.path.exists(outfile):
                try:
                    os.remove(outfile)
                    self.logger.info(f"Removed failed job output file: {outfile}")
                except OSError as e:
                    self.logger.error(f"Error removing file {outfile}: {str(e)}")
            else:
                self.logger.info(f"Output file not found (already removed or not created): {outfile}")

    def run(
            self,
            statement_list,
            job_memory=None,
            job_threads=None,
            container_runtime=None,
            image=None,
            volumes=None,
            env_vars=None,
            **kwargs,):

        """
        Execute a list of statements with optional container support.

            Args:
                statement_list (list): List of commands to execute.
                job_memory (str): Memory requirements (e.g., "4G").
                job_threads (int): Number of threads to use.
                container_runtime (str): Container runtime ("docker" or "singularity").
                image (str): Container image to use.
                volumes (list): Volume mappings (e.g., ['/data:/data']).
                env_vars (dict): Environment variables for the container.
                **kwargs: Additional arguments.
        """
        # Validation checks
        if container_runtime and container_runtime not in ["docker", "singularity"]:
            self.logger.error(f"Invalid container_runtime: {container_runtime}")
            raise ValueError("Container runtime must be 'docker' or 'singularity'")

        if container_runtime and not image:
            self.logger.error(f"Container runtime specified without an image: {container_runtime}")
            raise ValueError("An image must be specified when using a container runtime")

        benchmark_data = []

        for statement in statement_list:
            job_info = {"statement": statement}
            self.start_job(job_info)

            try:
                # Prepare containerized execution
                if container_runtime:
                    self.set_container_config(image=image, volumes=volumes, env_vars=env_vars, runtime=container_runtime)
                    statement = self.container_config.get_container_command(statement)

                # Add memory and thread environment variables
                if job_memory:
                    env_vars = env_vars or {}
                    env_vars["JOB_MEMORY"] = job_memory
                if job_threads:
                    env_vars = env_vars or {}
                    env_vars["JOB_THREADS"] = job_threads

                # Debugging: Log the constructed command
                self.logger.info(f"Executing command: {statement}")

                # Build and execute the statement
                full_statement, job_path = self.build_job_script(statement)
                process = subprocess.Popen(
                    full_statement, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
                )
                stdout, stderr = process.communicate()

                if process.returncode != 0:
                    raise OSError(
                        f"Job failed with return code {process.returncode}.\n"
                        f"stderr: {stderr.decode('utf-8')}\ncommand: {statement}"
                    )

                # Collect benchmark data for successful jobs
                benchmark_data.append(
                    self.collect_benchmark_data(
                        statement, resource_usage={"job_id": process.pid}
                    )
                )
                self.finish_job(job_info)

            except Exception as e:
                self.logger.error(f"Job failed: {e}")
                self.cleanup_failed_job(job_info)
                if not self.ignore_errors:
                    raise

        return benchmark_data

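Constructing an Executor directly requires the pipeline parameters to be loaded first, because __init__ reads get_params() for cluster defaults and the working directory. The sketch below is illustrative only: P.get_parameters() and the default work_dir/cluster settings are assumptions, and the job dictionary contents are made up:

from cgatcore import pipeline as P
from cgatcore.pipeline.execution import Executor

# load default parameters (and pipeline.yml, if present) so that
# get_params() is populated before the Executor is created
P.get_parameters()

executor = Executor(job_memory="1G", job_threads=1)

# wrap subsequent statements in a container (see ContainerConfig above)
executor.set_container_config(image="ubuntu:20.04", volumes=["/data:/data"])
print(executor.container_config.get_container_command("ls /data"))

# the active-job bookkeeping used for clean-up on interruption
job_info = {"statement": "ls /data", "outfile": "listing.txt"}
executor.start_job(job_info)
executor.finish_job(job_info)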
build_job_script(statement)

build job script from statement.

returns (name_of_script, stdout_path, stderr_path)

Source code in cgatcore/pipeline/execution.py
def build_job_script(self,
                     statement):
    '''build job script from statement.

    returns (name_of_script, stdout_path, stderr_path)
    '''
    tmpfilename = get_temp_filename(dir=self.work_dir, clear=True)
    tmpfilename = tmpfilename + ".sh"

    expanded_statement, cleanup_funcs = self.expand_statement(statement)

    with open(tmpfilename, "w") as tmpfile:
        # disabled: -l -O expand_aliases\n" )

        # make executable
        tmpfile.write("#!/bin/bash -eu\n")
        if not self.ignore_pipe_errors:
            tmpfile.write("set -o pipefail\n")

        os.chmod(tmpfilename, stat.S_IRWXG | stat.S_IRWXU)

        tmpfile.write("\ncd {}\n".format(self.work_dir))
        if self.output_directories is not None:
            for outdir in self.output_directories:
                if outdir:
                    tmpfile.write("\nmkdir -p {}\n".format(outdir))

        # create and set system scratch dir for temporary files
        tmpfile.write("umask 002\n")

        cluster_tmpdir = get_params()["cluster_tmpdir"]

        if self.run_on_cluster and cluster_tmpdir:
            tmpdir = cluster_tmpdir
            tmpfile.write("TMPDIR=`mktemp -d -p {}`\n".format(tmpdir))
            tmpfile.write("export TMPDIR\n")
        else:
            tmpdir = get_temp_dir(dir=get_params()["tmpdir"],
                                  clear=True)
            tmpfile.write("mkdir -p {}\n".format(tmpdir))
            tmpfile.write("export TMPDIR={}\n".format(tmpdir))

        cleanup_funcs.append(
            ("clean_temp",
             "{{ rm -rf {}; }}".format(tmpdir)))

        # output times whenever script exits, preserving
        # return status
        cleanup_funcs.append(("info",
                              "{ echo 'benchmark'; hostname; times; }"))
        for cleanup_func, cleanup_code in cleanup_funcs:
            tmpfile.write("\n{}() {}\n".format(cleanup_func, cleanup_code))

        tmpfile.write("\nclean_all() {{ {}; }}\n".format(
            "; ".join([x[0] for x in cleanup_funcs])))

        tmpfile.write("\ntrap clean_all EXIT\n\n")

        if self.job_memory not in ("unlimited", "etc") and \
           self.options.get("cluster_memory_ulimit", False):
            # restrict virtual memory
            # Note that there are resources in SGE which could do this directly
            # such as v_hmem.
            # Note that limiting resident set sizes (RSS) with ulimit is not
            # possible in newer kernels.
            # -v and -m accept memory in kb
            requested_memory_kb = max(
                1000,
                int(math.ceil(
                    iotools.human2bytes(self.job_memory) / 1024 * self.job_threads)))
            # unsetting error exit as often not permissions
            tmpfile.write("set +e\n")
            tmpfile.write("ulimit -v {} > /dev/null \n".format(
                requested_memory_kb))
            tmpfile.write("ulimit -m {} > /dev/null \n".format(
                requested_memory_kb))
            # set as hard limit
            tmpfile.write("ulimit -H -v > /dev/null \n")
            tmpfile.write("set -e\n")

        if self.shellfile:

            # make sure path exists that we want to write to
            tmpfile.write("mkdir -p $(dirname \"{}\")\n".format(
                self.shellfile))

            # output low-level debugging information to a shell log file
            tmpfile.write(
                'echo "%s : START -> %s" >> %s\n' %
                (self.job_name, tmpfilename, self.shellfile))
            # disabled - problems with quoting
            # tmpfile.write( '''echo 'statement=%s' >> %s\n''' %
            # (shellquote(statement), self.shellfile) )
            tmpfile.write("set | sed 's/^/%s : /' >> %s\n" %
                          (self.job_name, self.shellfile))
            tmpfile.write("pwd | sed 's/^/%s : /' >> %s\n" %
                          (self.job_name, self.shellfile))
            tmpfile.write("hostname | sed 's/^/%s: /' >> %s\n" %
                          (self.job_name, self.shellfile))
            # cat /proc/meminfo is Linux specific
            if get_params()['os'] == 'Linux':
                tmpfile.write("cat /proc/meminfo | sed 's/^/%s: /' >> %s\n" %
                              (self.job_name, self.shellfile))
            elif get_params()['os'] == 'Darwin':
                tmpfile.write("vm_stat | sed 's/^/%s: /' >> %s\n" %
                              (self.job_name, self.shellfile))
            tmpfile.write(
                'echo "%s : END -> %s" >> %s\n' %
                (self.job_name, tmpfilename, self.shellfile))
            tmpfile.write("ulimit | sed 's/^/%s: /' >> %s\n" %
                          (self.job_name, self.shellfile))

        job_path = os.path.abspath(tmpfilename)

        tmpfile.write(expanded_statement)
        tmpfile.write("\n\n")
        tmpfile.close()

    return statement, job_path

cleanup_all_jobs()

Clean up all remaining active jobs on interruption.

Source code in cgatcore/pipeline/execution.py
def cleanup_all_jobs(self):
    """Clean up all remaining active jobs on interruption."""
    self.logger.info("Cleaning up all job outputs due to pipeline interruption")
    for job_info in self.active_jobs:
        self.cleanup_failed_job(job_info)
    self.active_jobs.clear()  # Clear the list after cleanup

cleanup_failed_job(job_info)

Clean up files generated by a failed job.

Source code in cgatcore/pipeline/execution.py
def cleanup_failed_job(self, job_info):
    """Clean up files generated by a failed job."""
    if "outfile" in job_info:
        outfiles = [job_info["outfile"]]
    elif "outfiles" in job_info:
        outfiles = job_info["outfiles"]
    else:
        self.logger.warning(f"No output files found for job {job_info.get('job_name', 'unknown')}")
        return

    for outfile in outfiles:
        if os.path.exists(outfile):
            try:
                os.remove(outfile)
                self.logger.info(f"Removed failed job output file: {outfile}")
            except OSError as e:
                self.logger.error(f"Error removing file {outfile}: {str(e)}")
        else:
            self.logger.info(f"Output file not found (already removed or not created): {outfile}")

collect_benchmark_data(statements, resource_usage)

collect benchmark data from a job's stdout and any resource usage information that might be present.

If time_data is given, read output from time command.

Source code in cgatcore/pipeline/execution.py
def collect_benchmark_data(self,
                           statements,
                           resource_usage):
    """collect benchmark data from a job's stdout and any resource usage
    information that might be present.

    If time_data is given, read output from time command.
    """

    benchmark_data = []

    def get_val(d, v, alt):
        val = d.get(v, alt)
        if val == "unknown" or val is None:
            val = alt
        return val

    # build resource usage data structure - part native, part
    # mapped to common fields
    for jobinfo, statement in zip(resource_usage, statements):

        if resource_usage is None:
            E.warn("no resource usage for {}".format(self.task_name))
            continue

        # add some common fields
        data = {"task": self.task_name,
                "engine": self.__class__.__name__,
                "statement": statement,
                "job_id": jobinfo.jobId,
                "slots": self.job_threads}

        # native specs
        data.update(jobinfo.resourceUsage)

        # translate specs
        if self.queue_manager:
            data.update(
                self.queue_manager.map_resource_usage(data, DATA2TYPE))

        cpu_time = float(get_val(data, "cpu_t", 0))
        start_time = float(get_val(data, "start_time", 0))
        end_time = float(get_val(data, "end_time", 0))
        data.update({
            # avoid division by 0 error
            "percent_cpu": (
                100.0 * cpu_time / max(1.0, (end_time - start_time)) / self.job_threads),
            "total_t": end_time - start_time
        })
        benchmark_data.append(data)

    return benchmark_data

expand_statement(statement)

add generic commands before and after statement.

The method scans the statement for arvados mount points and inserts appropriate prefixes to make sure that the mount point exists.

Arguments

statement : string Command line statement to expand

Returns

statement : string The expanded statement.

Source code in cgatcore/pipeline/execution.py
def expand_statement(self, statement):
    '''add generic commands before and after statement.

    The method scans the statement for arvados mount points and
    inserts appropriate prefixes to make sure that the mount point
    exists.

    Arguments
    ---------
    statement : string
        Command line statement to expand

    Returns
    -------
    statement : string
        The expanded statement.

    '''

    setup_cmds = []
    teardown_cmds = []
    cleanup_funcs = []

    setup_cmds.append("umask 002")

    for var in ["MKL_NUM_THREADS",
                "OPENBLAS_NUM_THREADS",
                "OMP_NUM_THREADS"]:
        setup_cmds.append("export {}={}".format(
            var, self.options.get("job_threads", 1)))

    if "arv=" in statement:

        # Todo: permit setting this in params
        arvados_api_token = os.environ.get("ARVADOS_API_TOKEN", None)
        arvados_api_host = os.environ.get("ARVADOS_API_HOST", None)
        if not arvados_api_token:
            raise ValueError(
                "arvados mount encountered in statement {}, "
                "but ARVADOS_API_TOKEN not defined".format(statement))

        if not arvados_api_host:
            raise ValueError(
                "arvados mount encountered in statement {}, "
                "but ARVADOS_API_HOST not defined".format(statement))

        mountpoint = get_temp_filename(clear=True)

        arvados_options = "--disable-event-listening --read-only"
        setup_cmds.append("\n".join(
            ('export ARVADOS_API_TOKEN="{arvados_api_token}"',
             'export ARVADOS_API_HOST="{arvados_api_host}"',
             'export ARVADOS_API_HOST_INSECURE=true',
             'export ARVADOS_MOUNT_POINT="{mountpoint}"',
             'mkdir -p "{mountpoint}"',
             'arv-mount {arvados_options} "{mountpoint}" 2>> /dev/null')).format(**locals()))

        statement = re.sub("arv=", mountpoint + "/", statement)

        # "arv-mount --unmount {mountpoint}" not available in newer
        # arvados installs (0.1.20170707152712), so keep using
        # fusermount. However, do not fail if you can't clean up, as
        # there are arvados racing issues.
        cleanup_funcs.append(("unmount_arvados",
                              '''{{
                              set +e &&
                              fusermount -u {mountpoint} &&
                              rm -rf {mountpoint} &&
                              set -e
                              }}'''.format(**locals())))

    if "job_condaenv" in self.options:
        # In conda < 4.4 there is an issue with parallel activations,
        # see https://github.com/conda/conda/issues/2837 .
        # This has been fixed in conda 4.4, but we are on conda
        # 4.3, presumably because we are still on py35. A work-around
        # to source activate is to add the explicit path of the environment
        # in version >= 4.4, do
        # setup_cmds.append(
        #     "conda activate {}".format(self.options["job_condaenv"]))
        # For old conda versions (note this will not work for tools that require
        # additional environment variables)
        setup_cmds.append(
            "export PATH={}:$PATH".format(
                os.path.join(
                    get_conda_environment_directory(
                        self.options["job_condaenv"]),
                    "bin")))

    statement = "\n".join((
        "\n".join(setup_cmds),
        statement,
        "\n".join(teardown_cmds)))

    return statement, cleanup_funcs
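To make the effect concrete, the following simplified re-implementation (illustration only; it omits the arvados and conda handling and the clean-up functions) shows what gets prepended to a statement:

# Simplified sketch of the prefixing behaviour described above.
def sketch_expand_statement(statement, job_threads=1):
    setup_cmds = ["umask 002"]
    for var in ("MKL_NUM_THREADS", "OPENBLAS_NUM_THREADS", "OMP_NUM_THREADS"):
        setup_cmds.append("export {}={}".format(var, job_threads))
    return "\n".join(setup_cmds + [statement])

print(sketch_expand_statement("samtools sort input.bam -o sorted.bam", job_threads=4))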

finish_job(job_info)

Remove a job from active_jobs list when it finishes.

Source code in cgatcore/pipeline/execution.py
def finish_job(self, job_info):
    """Remove a job from active_jobs list when it finishes."""
    if job_info in self.active_jobs:
        self.active_jobs.remove(job_info)
        self.logger.info(f"Job completed: {job_info}")

run(statement_list, job_memory=None, job_threads=None, container_runtime=None, image=None, volumes=None, env_vars=None, **kwargs)

Execute a list of statements with optional container support.

Args:
    statement_list (list): List of commands to execute.
    job_memory (str): Memory requirements (e.g., "4G").
    job_threads (int): Number of threads to use.
    container_runtime (str): Container runtime ("docker" or "singularity").
    image (str): Container image to use.
    volumes (list): Volume mappings (e.g., ['/data:/data']).
    env_vars (dict): Environment variables for the container.
    **kwargs: Additional arguments.
Source code in cgatcore/pipeline/execution.py
def run(
        self,
        statement_list,
        job_memory=None,
        job_threads=None,
        container_runtime=None,
        image=None,
        volumes=None,
        env_vars=None,
        **kwargs,):

    """
    Execute a list of statements with optional container support.

        Args:
            statement_list (list): List of commands to execute.
            job_memory (str): Memory requirements (e.g., "4G").
            job_threads (int): Number of threads to use.
            container_runtime (str): Container runtime ("docker" or "singularity").
            image (str): Container image to use.
            volumes (list): Volume mappings (e.g., ['/data:/data']).
            env_vars (dict): Environment variables for the container.
            **kwargs: Additional arguments.
    """
    # Validation checks
    if container_runtime and container_runtime not in ["docker", "singularity"]:
        self.logger.error(f"Invalid container_runtime: {container_runtime}")
        raise ValueError("Container runtime must be 'docker' or 'singularity'")

    if container_runtime and not image:
        self.logger.error(f"Container runtime specified without an image: {container_runtime}")
        raise ValueError("An image must be specified when using a container runtime")

    benchmark_data = []

    for statement in statement_list:
        job_info = {"statement": statement}
        self.start_job(job_info)

        try:
            # Prepare containerized execution
            if container_runtime:
                self.set_container_config(image=image, volumes=volumes, env_vars=env_vars, runtime=container_runtime)
                statement = self.container_config.get_container_command(statement)

            # Add memory and thread environment variables
            if job_memory:
                env_vars = env_vars or {}
                env_vars["JOB_MEMORY"] = job_memory
            if job_threads:
                env_vars = env_vars or {}
                env_vars["JOB_THREADS"] = job_threads

            # Debugging: Log the constructed command
            self.logger.info(f"Executing command: {statement}")

            # Build and execute the statement
            full_statement, job_path = self.build_job_script(statement)
            process = subprocess.Popen(
                full_statement, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
            )
            stdout, stderr = process.communicate()

            if process.returncode != 0:
                raise OSError(
                    f"Job failed with return code {process.returncode}.\n"
                    f"stderr: {stderr.decode('utf-8')}\ncommand: {statement}"
                )

            # Collect benchmark data for successful jobs
            benchmark_data.append(
                self.collect_benchmark_data(
                    statement, resource_usage={"job_id": process.pid}
                )
            )
            self.finish_job(job_info)

        except Exception as e:
            self.logger.error(f"Job failed: {e}")
            self.cleanup_failed_job(job_info)
            if not self.ignore_errors:
                raise

    return benchmark_data
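A hypothetical call could look like the sketch below; executor stands for an already-constructed executor instance and the image name is a placeholder, neither is taken from the source.

# Usage sketch only: `executor` is assumed to expose the run() method above.
benchmark_data = executor.run(
    statement_list=["samtools sort input.bam -o sorted.bam"],
    job_memory="4G",
    job_threads=4,
    container_runtime="docker",            # or "singularity"
    image="example.org/samtools:latest",   # placeholder image reference
    volumes=["/data:/data"],
    env_vars={"SAMPLE": "test"},
)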

set_container_config(image, volumes=None, env_vars=None, runtime='docker')

Set container configuration for all tasks executed by this executor.

Source code in cgatcore/pipeline/execution.py
def set_container_config(self, image, volumes=None, env_vars=None, runtime="docker"):
    """Set container configuration for all tasks executed by this executor."""

    if not image:
        raise ValueError("An image must be specified for the container configuration.")
    self.container_config = ContainerConfig(image=image, volumes=volumes, env_vars=env_vars, runtime=runtime)

setup_signal_handlers()

Set up signal handlers to clean up jobs on SIGINT and SIGTERM.

Source code in cgatcore/pipeline/execution.py
def setup_signal_handlers(self):
    """Set up signal handlers to clean up jobs on SIGINT and SIGTERM."""

    def signal_handler(signum, frame):
        self.logger.info(f"Received signal {signum}. Starting clean-up.")
        self.cleanup_all_jobs()
        exit(1)

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

start_job(job_info)

Add a job to active_jobs list when it starts.

Source code in cgatcore/pipeline/execution.py
def start_job(self, job_info):
    """Add a job to active_jobs list when it starts."""
    self.active_jobs.append(job_info)
    self.logger.info(f"Job started: {job_info}")

KubernetesExecutor

Bases: BaseExecutor

Executor for managing and running jobs on a Kubernetes cluster.

This class is responsible for submitting jobs to a Kubernetes cluster, monitoring their execution, and collecting benchmark data related to their performance.

Attributes:

namespace (str): The Kubernetes namespace in which to run the jobs. Defaults to 'default'.

api (CoreV1Api): The Kubernetes Core API client for interacting with the cluster.

batch_api (BatchV1Api): The Kubernetes Batch API client for managing jobs.

Source code in cgatcore/pipeline/kubernetes.py
class KubernetesExecutor(BaseExecutor):
    """Executor for managing and running jobs on a Kubernetes cluster.

    This class is responsible for submitting jobs to a Kubernetes cluster, monitoring their execution,
    and collecting benchmark data related to their performance.

    Attributes:
        namespace (str): The Kubernetes namespace in which to run the jobs. Defaults to 'default'.
        api (CoreV1Api): The Kubernetes Core API client for interacting with the cluster.
        batch_api (BatchV1Api): The Kubernetes Batch API client for managing jobs.
    """

    def __init__(self, **kwargs):
        """Initializes the KubernetesExecutor with the specified configuration options.

        Args:
            **kwargs: Additional configuration options, including the namespace.
        """
        super().__init__(**kwargs)
        self.namespace = kwargs.get("namespace", "default")

        # Load Kubernetes configuration
        try:
            config.load_kube_config()
            self.api = client.CoreV1Api()
            self.batch_api = client.BatchV1Api()
            logger.info("Kubernetes configuration loaded successfully.")
        except exceptions.ConfigException as e:
            logger.error("Failed to load Kubernetes configuration", exc_info=True)
            raise e

    def run(self, statement, job_path, job_condaenv):
        """Submits a job to the Kubernetes cluster to run the specified command.

        This method creates a Kubernetes Job object and submits it to the cluster. The job runs the
        specified command in a container, using the provided Conda environment.

        Args:
            statement (str): The command to execute in the job.
            job_path (str): The path to the job script.
            job_condaenv (str): The name of the Conda environment to use.
        """
        job_name = f"cgat-{os.path.basename(job_path)}-{int(time.time())}"
        container_image = "your-docker-image:tag"  # Replace with your Docker image

        # Define Kubernetes Job spec
        job_spec = client.V1Job(
            metadata=client.V1ObjectMeta(name=job_name),
            spec=client.V1JobSpec(
                template=client.V1PodTemplateSpec(
                    spec=client.V1PodSpec(
                        containers=[
                            client.V1Container(
                                name="cgat-job",
                                image=container_image,
                                command=["/bin/bash", "-c", statement],
                                env=[client.V1EnvVar(name="CONDA_ENV", value=job_condaenv)],
                            )
                        ],
                        restart_policy="Never"
                    )
                ),
                backoff_limit=4  # Retry policy in case of transient failures
            )
        )

        # Create and monitor Kubernetes Job
        try:
            logger.info(f"Creating Kubernetes Job '{job_name}' in namespace '{self.namespace}'.")
            start_time = datetime.now()
            self.batch_api.create_namespaced_job(self.namespace, job_spec)
            self._wait_for_job_completion(job_name)
            end_time = datetime.now()
            logs = self._get_pod_logs(job_name)
            self.collect_metric_data("Kubernetes Job", start_time, end_time, "time_data.json")
        finally:
            self._cleanup_job(job_name)

        return logs

    def _wait_for_job_completion(self, job_name):
        """Wait until the job completes or fails."""
        while True:
            job_status = self.batch_api.read_namespaced_job_status(job_name, self.namespace).status
            if job_status.succeeded:
                logger.info(f"Job '{job_name}' completed successfully.")
                return
            if job_status.failed:
                logger.error(f"Job '{job_name}' failed.")
                raise RuntimeError(f"Kubernetes Job {job_name} failed.")
            time.sleep(5)

    def _get_pod_logs(self, job_name):
        """Retrieve logs from the Job's pod."""
        pods = self.api.list_namespaced_pod(self.namespace, label_selector=f"job-name={job_name}").items
        if not pods:
            logger.error(f"No pod found for job '{job_name}'.")
            raise RuntimeError(f"No pod found for job '{job_name}'.")

        pod_name = pods[0].metadata.name
        logger.info(f"Fetching logs from pod '{pod_name}'.")
        return self.api.read_namespaced_pod_log(pod_name, self.namespace)

    def _cleanup_job(self, job_name):
        """Delete the Job and its pods."""
        try:
            self.batch_api.delete_namespaced_job(job_name, self.namespace, propagation_policy="Background")
            logger.info(f"Job '{job_name}' cleaned up successfully.")
        except exceptions.ApiException as e:
            logger.warning(f"Failed to delete Job '{job_name}'", exc_info=True)

    def collect_benchmark_data(self, statements, resource_usage=None):
        """Collect benchmark data for Kubernetes jobs.

        This method gathers information about the executed statements and any resource usage data.

        Args:
            statements (list): List of executed statements.
            resource_usage (list, optional): Resource usage data.

        Returns:
            dict: A dictionary containing the task name, total execution time, executed statements,
                  and resource usage data.
        """
        return {
            "task": "kubernetes_task",
            "total_t": 12,  # Example value, adjust as needed
            "statements": statements,
            "resource_usage": resource_usage or []
        }

    def collect_metric_data(self, process, start_time, end_time, time_data_file):
        """
        Collects metric data related to job duration and writes it to a file.

        Parameters:
        - process (str): Process name for tracking purposes.
        - start_time (datetime): Timestamp when the job started.
        - end_time (datetime): Timestamp when the job ended.
        - time_data_file (str): Path to a file where timing data will be saved.
        """
        duration = (end_time - start_time).total_seconds()
        metric_data = {
            "process": process,
            "start_time": start_time.isoformat(),
            "end_time": end_time.isoformat(),
            "duration_seconds": duration
        }

        # Log metric data
        logger.info(
            f"Metric data collected for process '{process}': start time = {start_time}, end time = {end_time}, "
            f"duration = {duration} seconds."
        )

        # Write metric data to file
        try:
            with open(time_data_file, "w") as f:
                json.dump(metric_data, f, indent=4)
            logger.info(f"Metric data saved to {time_data_file}")

        except Exception as e:
            logger.error("Error writing metric data to file", exc_info=True)
            raise e
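A minimal usage sketch, assuming a working kubeconfig, the kubernetes Python client installed, and that the hard-coded container_image placeholder in run() has been replaced with a real image; the paths and names below are illustrative only.

from cgatcore.pipeline.kubernetes import KubernetesExecutor

executor = KubernetesExecutor(namespace="default")
logs = executor.run(
    statement="echo hello from kubernetes",
    job_path="/tmp/job_script.sh",   # only used to derive the job name here
    job_condaenv="base",             # exported as CONDA_ENV inside the pod
)
print(logs)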

__init__(**kwargs)

Initializes the KubernetesExecutor with the specified configuration options.

Parameters:

**kwargs: Additional configuration options, including the namespace. Default: {}.
Source code in cgatcore/pipeline/kubernetes.py
def __init__(self, **kwargs):
    """Initializes the KubernetesExecutor with the specified configuration options.

    Args:
        **kwargs: Additional configuration options, including the namespace.
    """
    super().__init__(**kwargs)
    self.namespace = kwargs.get("namespace", "default")

    # Load Kubernetes configuration
    try:
        config.load_kube_config()
        self.api = client.CoreV1Api()
        self.batch_api = client.BatchV1Api()
        logger.info("Kubernetes configuration loaded successfully.")
    except exceptions.ConfigException as e:
        logger.error("Failed to load Kubernetes configuration", exc_info=True)
        raise e

collect_benchmark_data(statements, resource_usage=None)

Collect benchmark data for Kubernetes jobs.

This method gathers information about the executed statements and any resource usage data.

Parameters:

statements (list): List of executed statements. Required.

resource_usage (list, optional): Resource usage data. Default: None.

Returns:

dict: A dictionary containing the task name, total execution time, executed statements, and resource usage data.

Source code in cgatcore/pipeline/kubernetes.py
def collect_benchmark_data(self, statements, resource_usage=None):
    """Collect benchmark data for Kubernetes jobs.

    This method gathers information about the executed statements and any resource usage data.

    Args:
        statements (list): List of executed statements.
        resource_usage (list, optional): Resource usage data.

    Returns:
        dict: A dictionary containing the task name, total execution time, executed statements,
              and resource usage data.
    """
    return {
        "task": "kubernetes_task",
        "total_t": 12,  # Example value, adjust as needed
        "statements": statements,
        "resource_usage": resource_usage or []
    }

collect_metric_data(process, start_time, end_time, time_data_file)

Collects metric data related to job duration and writes it to a file.

Parameters:

- process (str): Process name for tracking purposes.
- start_time (datetime): Timestamp when the job started.
- end_time (datetime): Timestamp when the job ended.
- time_data_file (str): Path to a file where timing data will be saved.

Source code in cgatcore/pipeline/kubernetes.py
def collect_metric_data(self, process, start_time, end_time, time_data_file):
    """
    Collects metric data related to job duration and writes it to a file.

    Parameters:
    - process (str): Process name for tracking purposes.
    - start_time (datetime): Timestamp when the job started.
    - end_time (datetime): Timestamp when the job ended.
    - time_data_file (str): Path to a file where timing data will be saved.
    """
    duration = (end_time - start_time).total_seconds()
    metric_data = {
        "process": process,
        "start_time": start_time.isoformat(),
        "end_time": end_time.isoformat(),
        "duration_seconds": duration
    }

    # Log metric data
    logger.info(
        f"Metric data collected for process '{process}': start time = {start_time}, end time = {end_time}, "
        f"duration = {duration} seconds."
    )

    # Write metric data to file
    try:
        with open(time_data_file, "w") as f:
            json.dump(metric_data, f, indent=4)
        logger.info(f"Metric data saved to {time_data_file}")

    except Exception as e:
        logger.error("Error writing metric data to file", exc_info=True)
        raise e

run(statement, job_path, job_condaenv)

Submits a job to the Kubernetes cluster to run the specified command.

This method creates a Kubernetes Job object and submits it to the cluster. The job runs the specified command in a container, using the provided Conda environment.

Parameters:

statement (str): The command to execute in the job. Required.

job_path (str): The path to the job script. Required.

job_condaenv (str): The name of the Conda environment to use. Required.
Source code in cgatcore/pipeline/kubernetes.py
def run(self, statement, job_path, job_condaenv):
    """Submits a job to the Kubernetes cluster to run the specified command.

    This method creates a Kubernetes Job object and submits it to the cluster. The job runs the
    specified command in a container, using the provided Conda environment.

    Args:
        statement (str): The command to execute in the job.
        job_path (str): The path to the job script.
        job_condaenv (str): The name of the Conda environment to use.
    """
    job_name = f"cgat-{os.path.basename(job_path)}-{int(time.time())}"
    container_image = "your-docker-image:tag"  # Replace with your Docker image

    # Define Kubernetes Job spec
    job_spec = client.V1Job(
        metadata=client.V1ObjectMeta(name=job_name),
        spec=client.V1JobSpec(
            template=client.V1PodTemplateSpec(
                spec=client.V1PodSpec(
                    containers=[
                        client.V1Container(
                            name="cgat-job",
                            image=container_image,
                            command=["/bin/bash", "-c", statement],
                            env=[client.V1EnvVar(name="CONDA_ENV", value=job_condaenv)],
                        )
                    ],
                    restart_policy="Never"
                )
            ),
            backoff_limit=4  # Retry policy in case of transient failures
        )
    )

    # Create and monitor Kubernetes Job
    try:
        logger.info(f"Creating Kubernetes Job '{job_name}' in namespace '{self.namespace}'.")
        start_time = datetime.now()
        self.batch_api.create_namespaced_job(self.namespace, job_spec)
        self._wait_for_job_completion(job_name)
        end_time = datetime.now()
        logs = self._get_pod_logs(job_name)
        self.collect_metric_data("Kubernetes Job", start_time, end_time, "time_data.json")
    finally:
        self._cleanup_job(job_name)

    return logs

LoggingFilterProgress

Bases: Filter

add progress information to the log-stream.

A task is a ruffus-decorated function, which will execute one or more jobs.

Valid task/job status:

- update: task/job needs updating
- completed: task/job completed successfully
- failed: task/job failed
- running: task/job is running
- ignore: ignore task/job (it is up-to-date)

This filter adds the following context to a log record:

- task: task name
- task_status: task status
- task_total: number of jobs in the task
- task_completed: number of jobs in the task completed
- task_completed_percent: percentage of the task completed

The filter will also generate an additional log message in JSON format with the fields above.

Arguments

ruffus_text : string
    Log messages from ruffus.pipeline_printout. These are used to collect all tasks that will be executed during pipeline execution.

Source code in cgatcore/pipeline/control.py
class LoggingFilterProgress(logging.Filter):
    """add progress information to the log-stream.

    A :term:`task` is a ruffus_ decorated function, which will execute
    one or more :term:`jobs`.

    Valid task/job status:
    update
       task/job needs updating
    completed
       task/job completed successfully
    failed
       task/job failed
    running
       task/job is running
    ignore
       ignore task/job (is up-to-date)

    This filter adds the following context to a log record:

    task
       task_name

    task_status
       task status

    task_total
       number of jobs in task

    task_completed
       number of jobs in task completed

    task_completed_percent
       percentage of task completed

    The filter will also generate an additional log message in json format
    with the fields above.

    Arguments
    ---------
    ruffus_text : string
        Log messages from ruffus.pipeline_printout. These are used
        to collect all tasks that will be executed during pipeline
        execution.

    """

    def __init__(self,
                 ruffus_text):

        # dictionary of jobs to run
        self.jobs = {}
        self.tasks = {}
        self.map_job2task = {}
        self.logger = get_logger()

        def split_by_job(text):
            # ignore optional docstring at beginning (is bracketed by '"')
            text = re.sub(r'^\"[^"]+\"', "", "".join(text))
            for line in re.split(r"Job\s+=", text):
                if not line.strip():
                    continue
                if "Make missing directories" in line:
                    continue
                try:
                    # long file names cause additional wrapping and
                    # additional white-space characters
                    job_name = re.search(
                        r"\[.*-> ([^\]]+)\]", line).groups()[0]
                except AttributeError:
                    continue
                    # raise AttributeError("could not parse '%s'" % line)
                job_status = "ignore"
                if "Job needs update" in line:
                    job_status = "update"

                yield job_name, job_status

        def split_by_task(text):
            block, task_name = [], None
            task_status = None
            for line in text.splitlines():
                line = line.strip()

                if line.startswith("Tasks which will be run"):
                    task_status = "update"
                    block = []
                    continue
                elif line.startswith("Tasks which are up-to-date"):
                    task_status = "ignore"
                    block = []
                    continue

                if line.startswith("Task = "):
                    if task_name:
                        yield task_name, task_status, list(split_by_job(block))
                    block = []
                    task_name = re.match("Task = (.*)", line).groups()[0]
                    continue
                if line:
                    block.append(line)
            if task_name:
                yield task_name, task_status, list(split_by_job(block))

        # populate with initial messages
        for task_name, task_status, jobs in split_by_task(ruffus_text):
            if task_name.startswith("(mkdir"):
                continue

            to_run = 0
            for job_name, job_status in jobs:
                self.jobs[job_name] = (task_name, job_name)
                if job_status == "update":
                    to_run += 1
                self.map_job2task[re.sub(r"\s", "", job_name)] = task_name

            self.tasks[task_name] = [task_status,
                                     len(jobs),
                                     len(jobs) - to_run]

    def filter(self, record):

        if not record.filename.endswith("task.py"):
            return True

        # update task counts and status
        job_name, task_name = None, None
        if re.search(r"Job\s+=", record.msg):
            try:
                job_name = re.search(
                    r"\[.*-> ([^\]]+)\]", record.msg).groups()[0]
            except AttributeError:
                return True
            job_name = re.sub(r"\s", "", job_name)
            task_name = self.map_job2task.get(job_name, None)
            if task_name is None:
                return
            if "completed" in record.msg:
                self.tasks[task_name][2] += 1

        elif re.search(r"Task\s+=", record.msg):
            try:
                before, task_name = record.msg.strip().split(" = ")
            except ValueError:
                return True

            # ignore the mkdir, etc tasks
            if task_name not in self.tasks:
                return True

            if before == "Task enters queue":
                self.tasks[task_name][0] = "running"
            elif before == "Completed Task":
                self.tasks[task_name][0] = "completed"
            elif before == "Uptodate Task":
                self.tasks[task_name][0] = "uptodate"
            else:
                return True
        else:
            return True

        if task_name is None:
            return

        # update log record
        task_status, task_total, task_completed = self.tasks[task_name]
        if task_total > 0:
            task_completed_percent = 100.0 * task_completed / task_total
        else:
            task_completed_percent = 0

        # ignore prefix:: in task_name for output
        task_name = re.sub("^[^:]+::", "", task_name)
        data = {
            "task": task_name,
            "task_status": task_status,
            "task_total": task_total,
            "task_completed": task_completed,
            "task_completed_percent": task_completed_percent}

        record.task_status = task_status
        record.task_total = task_total
        record.task_completed = task_completed
        record.task_completed_percent = task_completed_percent

        # log status
        self.logger.info(json.dumps(data))

        return True
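The filter is built from the text that ruffus.pipeline_printout produces for a dry run. A hedged wiring sketch (the pipeline module normally does this internally; the exact pipeline_printout arguments depend on your ruffus version):

import io
import logging

from ruffus import pipeline_printout
from cgatcore.pipeline.control import LoggingFilterProgress

# Capture the dry-run description of the pipeline; this is the text the
# filter parses to learn which tasks and jobs exist.
stream = io.StringIO()
pipeline_printout(stream, target_tasks=[], verbose=5)

progress_filter = LoggingFilterProgress(stream.getvalue())
logging.getLogger("cgatcore.pipeline").addFilter(progress_filter)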

LoggingFilterpipelineName

Bases: Filter

add pipeline name to log message.

With this filter, %(app_name)s can be used in log formats.

Source code in cgatcore/pipeline/control.py
class LoggingFilterpipelineName(logging.Filter):
    """add pipeline name to log message.

    With this filter, %(app_name)s can be used in log formats.
    """

    def __init__(self, name, *args, **kwargs):
        logging.Filter.__init__(self, *args, **kwargs)
        self.app_name = name

    def filter(self, record):
        record.app_name = self.app_name
        message = record.getMessage()
        if message.startswith("- {"):
            json_message = json.loads(message[2:])
        elif message.startswith("{"):
            json_message = json.loads(message)
        else:
            json_message = None
        if json_message:
            for k, v in list(json_message.items()):
                setattr(record, k, v)

        return True

MultiLineFormatter

Bases: Formatter

logfile formatter: add indentation for multi-line entries.

Source code in cgatcore/experiment.py
class MultiLineFormatter(logging.Formatter):
    '''logfile formatter: add indentation for multi-line entries.'''

    def format(self, record):

        s = logging.Formatter.format(self, record)
        if s.startswith("#"):
            prefix = "#"
        else:
            prefix = ""
        if record.message:
            header, footer = s.split(record.message)
            s = s.replace("\n", " \\\n%s" % prefix + " " * (len(header) - 1))
        return s
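For example, the formatter can be attached to a logging handler so that continuation lines of a multi-line message stay aligned under the header (a small sketch; the pipeline normally configures this for you):

import logging
from cgatcore.experiment import MultiLineFormatter

handler = logging.StreamHandler()
handler.setFormatter(MultiLineFormatter("%(asctime)s %(levelname)s %(message)s"))

logger = logging.getLogger("example")
logger.addHandler(handler)
logger.warning("first line\nsecond line")  # the second line is indented to match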

SGEExecutor

Bases: BaseExecutor

Source code in cgatcore/pipeline/executors.py
class SGEExecutor(BaseExecutor):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.logger = logging.getLogger(__name__)
        self.task_name = "sge_task"
        self.default_total_time = 8

    def run(self, statement_list):
        benchmark_data = []
        for statement in statement_list:
            self.logger.info(f"Running statement on SGE: {statement}")

            full_statement, job_path = self.build_job_script(statement)

            # Build the SGE job submission command
            sge_command = f"qsub -N {self.config.get('job_name', 'default_job')} -cwd -o {job_path}.o -e {job_path}.e {job_path}"

            process = subprocess.run(sge_command, shell=True, capture_output=True, text=True)

            if process.returncode != 0:
                self.logger.error(f"SGE job submission failed: {process.stderr}")
                raise RuntimeError(f"SGE job submission failed: {process.stderr}")

            self.logger.info(f"SGE job submitted: {process.stdout.strip()}")

            # Monitor job completion
            self.monitor_job_completion(process.stdout.strip())

            benchmark_data.append(self.collect_benchmark_data([statement], resource_usage=[]))

        return benchmark_data

    def build_job_script(self, statement):
        """Custom build job script for SGE."""
        return super().build_job_script(statement)

    def monitor_job_completion(self, job_id):
        """Monitor the completion of an SGE job.

        Args:
            job_id (str): The SGE job ID to monitor.

        Raises:
            RuntimeError: If the job fails or times out.
        """
        while True:
            # Use qstat to get job status
            cmd = f"qstat -j {job_id}"
            process = subprocess.run(cmd, shell=True, capture_output=True, text=True)

            if process.returncode != 0:
                # Job not found in qstat could mean it's completed
                # Use qacct to get final status
                cmd = f"qacct -j {job_id}"
                process = subprocess.run(cmd, shell=True, capture_output=True, text=True)

                if "exit_status" in process.stdout:
                    exit_status = process.stdout.split("exit_status")[1].split()[0]
                    if exit_status == "0":
                        self.logger.info(f"Job {job_id} completed successfully")
                        break
                    else:
                        self.logger.error(f"Job {job_id} failed with exit status: {exit_status}")
                        raise RuntimeError(f"Job {job_id} failed with exit status: {exit_status}")

                self.logger.error(f"Failed to get job status: {process.stderr}")
                raise RuntimeError(f"Failed to get job status: {process.stderr}")

            # Wait before checking again
            time.sleep(10)

    def collect_benchmark_data(self, statements, resource_usage=None):
        """Collect benchmark data for SGE jobs.

        Args:
            statements (list): List of executed statements
            resource_usage (list, optional): Resource usage data

        Returns:
            dict: Benchmark data including task name and execution time
        """
        return {
            "task": self.task_name,
            "total_t": self.default_total_time,
            "statements": statements,
            "resource_usage": resource_usage or []
        }
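A hypothetical submission sketch. It assumes SGE client tools (qsub, qstat, qacct) are on PATH and that the base executor stores keyword arguments in self.config so that job_name is picked up; both are assumptions, not shown in this excerpt.

from cgatcore.pipeline.executors import SGEExecutor

executor = SGEExecutor(job_name="sort_bam")   # constructor arguments are assumed
benchmark_data = executor.run(["samtools sort input.bam -o sorted.bam"])
for entry in benchmark_data:
    print(entry["task"], entry["total_t"], entry["statements"])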

build_job_script(statement)

Custom build job script for SGE.

Source code in cgatcore/pipeline/executors.py
def build_job_script(self, statement):
    """Custom build job script for SGE."""
    return super().build_job_script(statement)

collect_benchmark_data(statements, resource_usage=None)

Collect benchmark data for SGE jobs.

Parameters:

statements (list): List of executed statements. Required.

resource_usage (list, optional): Resource usage data. Default: None.

Returns:

dict: Benchmark data including task name and execution time.

Source code in cgatcore/pipeline/executors.py
def collect_benchmark_data(self, statements, resource_usage=None):
    """Collect benchmark data for SGE jobs.

    Args:
        statements (list): List of executed statements
        resource_usage (list, optional): Resource usage data

    Returns:
        dict: Benchmark data including task name and execution time
    """
    return {
        "task": self.task_name,
        "total_t": self.default_total_time,
        "statements": statements,
        "resource_usage": resource_usage or []
    }

monitor_job_completion(job_id)

Monitor the completion of an SGE job.

Parameters:

job_id (str): The SGE job ID to monitor. Required.

Raises:

RuntimeError: If the job fails or times out.

Source code in cgatcore/pipeline/executors.py
def monitor_job_completion(self, job_id):
    """Monitor the completion of an SGE job.

    Args:
        job_id (str): The SGE job ID to monitor.

    Raises:
        RuntimeError: If the job fails or times out.
    """
    while True:
        # Use qstat to get job status
        cmd = f"qstat -j {job_id}"
        process = subprocess.run(cmd, shell=True, capture_output=True, text=True)

        if process.returncode != 0:
            # Job not found in qstat could mean it's completed
            # Use qacct to get final status
            cmd = f"qacct -j {job_id}"
            process = subprocess.run(cmd, shell=True, capture_output=True, text=True)

            if "exit_status" in process.stdout:
                exit_status = process.stdout.split("exit_status")[1].split()[0]
                if exit_status == "0":
                    self.logger.info(f"Job {job_id} completed successfully")
                    break
                else:
                    self.logger.error(f"Job {job_id} failed with exit status: {exit_status}")
                    raise RuntimeError(f"Job {job_id} failed with exit status: {exit_status}")

            self.logger.error(f"Failed to get job status: {process.stderr}")
            raise RuntimeError(f"Failed to get job status: {process.stderr}")

        # Wait before checking again
        time.sleep(10)

SlurmExecutor

Bases: BaseExecutor

Executor for running jobs on Slurm cluster.

Source code in cgatcore/pipeline/executors.py
class SlurmExecutor(BaseExecutor):
    """Executor for running jobs on Slurm cluster."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.logger = logging.getLogger(__name__)
        self.task_name = "slurm_task"
        self.default_total_time = 10

    def run(self, statement_list):
        benchmark_data = []
        for statement in statement_list:
            self.logger.info(f"Running statement on Slurm: {statement}")

            full_statement, job_path = self.build_job_script(statement)

            # Build the Slurm job submission command
            slurm_command = f"sbatch --job-name={self.config.get('job_name', 'default_job')} --output={job_path}.o --error={job_path}.e {job_path}"

            process = subprocess.run(slurm_command, shell=True, capture_output=True, text=True)

            if process.returncode != 0:
                self.logger.error(f"Slurm job submission failed: {process.stderr}")
                raise RuntimeError(f"Slurm job submission failed: {process.stderr}")

            job_id = process.stdout.strip()
            self.logger.info(f"Slurm job submitted with ID: {job_id}")

            # Monitor job completion
            self.monitor_job_completion(job_id)

            benchmark_data.append(self.collect_benchmark_data([statement], resource_usage=[]))

        return benchmark_data

    def build_job_script(self, statement):
        """Custom build job script for Slurm."""
        return super().build_job_script(statement)

    def monitor_job_completion(self, job_id):
        """Monitor the completion of a Slurm job.

        Args:
            job_id (str): The Slurm job ID to monitor.

        Raises:
            RuntimeError: If the job fails or times out.
        """
        while True:
            # Use sacct to get job status
            cmd = f"sacct -j {job_id} --format=State --noheader --parsable2"
            process = subprocess.run(cmd, shell=True, capture_output=True, text=True)

            if process.returncode != 0:
                self.logger.error(f"Failed to get job status: {process.stderr}")
                raise RuntimeError(f"Failed to get job status: {process.stderr}")

            status = process.stdout.strip()

            # Check job status
            if status in ["COMPLETED", "COMPLETED+"]:
                self.logger.info(f"Job {job_id} completed successfully")
                break
            elif status in ["FAILED", "TIMEOUT", "CANCELLED", "NODE_FAIL"]:
                self.logger.error(f"Job {job_id} failed with status: {status}")
                raise RuntimeError(f"Job {job_id} failed with status: {status}")

            # Wait before checking again
            time.sleep(10)

    def collect_benchmark_data(self, statements, resource_usage=None):
        """Collect benchmark data for Slurm jobs.

        Args:
            statements (list): List of executed statements
            resource_usage (list, optional): Resource usage data

        Returns:
            dict: Benchmark data including task name and execution time
        """
        return {
            "task": self.task_name,
            "total_t": self.default_total_time,
            "statements": statements,
            "resource_usage": resource_usage or []
        }
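The status check relies on the single-column output of sacct --format=State --noheader --parsable2. A small, self-contained helper that classifies such output the same way can be handy when debugging a stuck job by hand (illustrative only; it mirrors the checks in monitor_job_completion):

def classify_slurm_state(sacct_output):
    """Map raw sacct State output to a simple verdict (sketch only)."""
    status = sacct_output.strip()
    if status in ("COMPLETED", "COMPLETED+"):
        return "done"
    if status in ("FAILED", "TIMEOUT", "CANCELLED", "NODE_FAIL"):
        return "failed"
    return "pending"

assert classify_slurm_state("COMPLETED\n") == "done"
assert classify_slurm_state("RUNNING") == "pending"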

build_job_script(statement)

Custom build job script for Slurm.

Source code in cgatcore/pipeline/executors.py
def build_job_script(self, statement):
    """Custom build job script for Slurm."""
    return super().build_job_script(statement)

collect_benchmark_data(statements, resource_usage=None)

Collect benchmark data for Slurm jobs.

Parameters:

statements (list): List of executed statements. Required.

resource_usage (list, optional): Resource usage data. Default: None.

Returns:

dict: Benchmark data including task name and execution time.

Source code in cgatcore/pipeline/executors.py
def collect_benchmark_data(self, statements, resource_usage=None):
    """Collect benchmark data for Slurm jobs.

    Args:
        statements (list): List of executed statements
        resource_usage (list, optional): Resource usage data

    Returns:
        dict: Benchmark data including task name and execution time
    """
    return {
        "task": self.task_name,
        "total_t": self.default_total_time,
        "statements": statements,
        "resource_usage": resource_usage or []
    }

monitor_job_completion(job_id)

Monitor the completion of a Slurm job.

Parameters:

job_id (str): The Slurm job ID to monitor. Required.

Raises:

RuntimeError: If the job fails or times out.

Source code in cgatcore/pipeline/executors.py
def monitor_job_completion(self, job_id):
    """Monitor the completion of a Slurm job.

    Args:
        job_id (str): The Slurm job ID to monitor.

    Raises:
        RuntimeError: If the job fails or times out.
    """
    while True:
        # Use sacct to get job status
        cmd = f"sacct -j {job_id} --format=State --noheader --parsable2"
        process = subprocess.run(cmd, shell=True, capture_output=True, text=True)

        if process.returncode != 0:
            self.logger.error(f"Failed to get job status: {process.stderr}")
            raise RuntimeError(f"Failed to get job status: {process.stderr}")

        status = process.stdout.strip()

        # Check job status
        if status in ["COMPLETED", "COMPLETED+"]:
            self.logger.info(f"Job {job_id} completed successfully")
            break
        elif status in ["FAILED", "TIMEOUT", "CANCELLED", "NODE_FAIL"]:
            self.logger.error(f"Job {job_id} failed with status: {status}")
            raise RuntimeError(f"Job {job_id} failed with status: {status}")

        # Wait before checking again
        time.sleep(10)

TorqueExecutor

Bases: BaseExecutor

Executor for running jobs on Torque cluster.

Source code in cgatcore/pipeline/executors.py
class TorqueExecutor(BaseExecutor):
    """Executor for running jobs on Torque cluster."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.logger = logging.getLogger(__name__)
        self.task_name = "torque_task"
        self.default_total_time = 7

    def run(self, statement_list):
        benchmark_data = []
        for statement in statement_list:
            self.logger.info(f"Running statement on Torque: {statement}")

            full_statement, job_path = self.build_job_script(statement)

            # Build the Torque job submission command
            torque_command = f"qsub -N {self.config.get('job_name', 'default_job')} -o {job_path}.o -e {job_path}.e {job_path}"

            process = subprocess.run(torque_command, shell=True, capture_output=True, text=True)

            if process.returncode != 0:
                self.logger.error(f"Torque job submission failed: {process.stderr}")
                raise RuntimeError(f"Torque job submission failed: {process.stderr}")

            job_id = process.stdout.strip()
            self.logger.info(f"Torque job submitted with ID: {job_id}")

            # Monitor job completion
            self.monitor_job_completion(job_id)

            benchmark_data.append(self.collect_benchmark_data([statement], resource_usage=[]))

        return benchmark_data

    def build_job_script(self, statement):
        """Custom build job script for Torque."""
        return super().build_job_script(statement)

    def monitor_job_completion(self, job_id):
        """Monitor the completion of a Torque job.

        Args:
            job_id (str): The Torque job ID to monitor.

        Raises:
            RuntimeError: If the job fails or times out.
        """
        while True:
            # Use qstat to get job status
            cmd = f"qstat -f {job_id}"
            process = subprocess.run(cmd, shell=True, capture_output=True, text=True)

            if process.returncode != 0:
                # Job not found in qstat could mean it's completed
                # Use tracejob to get final status
                cmd = f"tracejob {job_id}"
                process = subprocess.run(cmd, shell=True, capture_output=True, text=True)

                if "Exit_status=" in process.stdout:
                    if "Exit_status=0" in process.stdout:
                        self.logger.info(f"Job {job_id} completed successfully")
                        break
                    else:
                        status = process.stdout.split("Exit_status=")[1].split()[0]
                        self.logger.error(f"Job {job_id} failed with exit status: {status}")
                        raise RuntimeError(f"Job {job_id} failed with exit status: {status}")

                self.logger.error(f"Failed to get job status: {process.stderr}")
                raise RuntimeError(f"Failed to get job status: {process.stderr}")

            # Wait before checking again
            time.sleep(10)

    def collect_benchmark_data(self, statements, resource_usage=None):
        """Collect benchmark data for Torque jobs.

        Args:
            statements (list): List of executed statements
            resource_usage (list, optional): Resource usage data

        Returns:
            dict: Benchmark data including task name and execution time
        """
        return {
            "task": self.task_name,
            "total_t": self.default_total_time,
            "statements": statements,
            "resource_usage": resource_usage or []
        }

build_job_script(statement)

Custom build job script for Torque.

Source code in cgatcore/pipeline/executors.py
def build_job_script(self, statement):
    """Custom build job script for Torque."""
    return super().build_job_script(statement)

collect_benchmark_data(statements, resource_usage=None)

Collect benchmark data for Torque jobs.

Parameters:

statements (list): List of executed statements. Required.

resource_usage (list, optional): Resource usage data. Default: None.

Returns:

dict: Benchmark data including task name and execution time.

Source code in cgatcore/pipeline/executors.py
def collect_benchmark_data(self, statements, resource_usage=None):
    """Collect benchmark data for Torque jobs.

    Args:
        statements (list): List of executed statements
        resource_usage (list, optional): Resource usage data

    Returns:
        dict: Benchmark data including task name and execution time
    """
    return {
        "task": self.task_name,
        "total_t": self.default_total_time,
        "statements": statements,
        "resource_usage": resource_usage or []
    }

monitor_job_completion(job_id)

Monitor the completion of a Torque job.

Parameters:

job_id (str): The Torque job ID to monitor. Required.

Raises:

RuntimeError: If the job fails or times out.

Source code in cgatcore/pipeline/executors.py
def monitor_job_completion(self, job_id):
    """Monitor the completion of a Torque job.

    Args:
        job_id (str): The Torque job ID to monitor.

    Raises:
        RuntimeError: If the job fails or times out.
    """
    while True:
        # Use qstat to get job status
        cmd = f"qstat -f {job_id}"
        process = subprocess.run(cmd, shell=True, capture_output=True, text=True)

        if process.returncode != 0:
            # Job not found in qstat could mean it's completed
            # Use tracejob to get final status
            cmd = f"tracejob {job_id}"
            process = subprocess.run(cmd, shell=True, capture_output=True, text=True)

            if "Exit_status=" in process.stdout:
                if "Exit_status=0" in process.stdout:
                    self.logger.info(f"Job {job_id} completed successfully")
                    break
                else:
                    status = process.stdout.split("Exit_status=")[1].split()[0]
                    self.logger.error(f"Job {job_id} failed with exit status: {status}")
                    raise RuntimeError(f"Job {job_id} failed with exit status: {status}")

            self.logger.error(f"Failed to get job status: {process.stderr}")
            raise RuntimeError(f"Failed to get job status: {process.stderr}")

        # Wait before checking again
        time.sleep(10)

add_doc(value, replace=False)

add doc string of value to function that is decorated.

The original doc-string is added as the first paragraph(s) inside the new doc-string.

Parameters

replace : bool
    If True, replace documentation rather than appending.

Source code in cgatcore/pipeline/utils.py
def add_doc(value, replace=False):
    """add doc string of value to function that is decorated.

    The original doc-string is added as the first paragraph(s)
    inside the new doc-string.

    Parameter
    ---------

    replace : bool
       If True, replace documentation rather than appending
    """
    def _doc(func):
        if func.__doc__:
            lines = value.__doc__.split("\n")
            for x, line in enumerate(lines):
                if line.strip() == "":
                    break
            # insert appropriate indentation
            # currently hard-coded, can be derived
            # from doc string?
            if not replace:
                lines.insert(x + 1, " " * 4 + func.__doc__)
                func.__doc__ = "\n".join(lines)
            else:
                func.__doc__ = value.__doc__
        else:
            func.__doc__ = value.__doc__
        return func
    return _doc
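A short usage sketch: copy the documentation of an existing function onto a thin wrapper (the function names are invented for illustration):

from cgatcore.pipeline.utils import add_doc

def reference(x):
    """Compute something well documented.

    Longer explanation of the behaviour.
    """
    return x * 2

@add_doc(reference)
def wrapper(x):
    """Wrapper-specific notes appear after the copied first paragraph."""
    return reference(x)

print(wrapper.__doc__)  # begins with the doc-string of `reference`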

as_list(value)

return a value as a list.

If the value is a string and contains a comma (","), the string will be split at the comma.

Returns

list

Source code in cgatcore/pipeline/parameters.py
def as_list(value):
    '''return a value as a list.

    If the value is a string and contains a ``,``, the string will
    be split at ``,``.

    Returns
    -------
    list

    '''
    if isinstance(value, str):
        try:
            values = [x.strip() for x in value.strip().split(",")]
        except AttributeError:
            values = [value.strip()]
        return [x for x in values if x != ""]
    elif type(value) in (list, tuple):
        return value
    else:
        return [value]
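For example (the import path follows the stated source location):

from cgatcore.pipeline.parameters import as_list

print(as_list("bwa, samtools , bcftools"))  # ['bwa', 'samtools', 'bcftools']
print(as_list(["already", "a", "list"]))    # lists and tuples pass through unchanged
print(as_list(42))                          # any other value is wrapped: [42]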

build_load_statement(tablename, retry=True, options='')

build a command line statement to upload data.

Upload is performed via the csv2db script.

The returned statement is suitable for use in a pipe expression. This method is aware of the configuration values for database access and the chosen database backend.

For example::

load_statement = P.build_load_statement("data")
statement = "cat data.txt | %(load_statement)s"
P.run(statement)

Arguments

tablename : string
    Tablename for upload
retry : bool
    Add the --retry option to csv2db.py
options : string
    Command line options to be passed on to csv2db.py

Returns

string

Source code in cgatcore/pipeline/database.py
def build_load_statement(tablename, retry=True, options=""):
    """build a command line statement to upload data.

    Upload is performed via the :doc:`csv2db` script.

    The returned statement is suitable to use in pipe expression.
    This method is aware of the configuration values for database
    access and the chosen database backend.

    For example::

        load_statement = P.build_load_statement("data")
        statement = "cat data.txt | %(load_statement)s"
        P.run(statement)

    Arguments
    ---------
    tablename : string
        Tablename for upload
    retry : bool
        Add the ``--retry`` option to `csv2db.py`
    options : string
        Command line options to be passed on to `csv2db.py`

    Returns
    -------
    string

    """

    opts = []

    if retry:
        opts.append(" --retry ")

    params = get_params()
    opts.append("--database-url={}".format(params["database"]["url"]))

    db_options = " ".join(opts)
    load_statement = (
        "python -m cgatcore.csv2db {db_options} {options} --table={tablename}".format(**locals()))

    return load_statement

check_executables(filenames)

check for the presence/absence of executables

Source code in cgatcore/pipeline/files.py
def check_executables(filenames):
    """check for the presence/absence of executables"""

    missing = []

    for filename in filenames:
        if not iotools.which(filename):
            missing.append(filename)

    if missing:
        raise ValueError("missing executables: %s" % ",".join(missing))

check_parameter(param)

check if parameter param is set

Source code in cgatcore/pipeline/parameters.py
def check_parameter(param):
    """check if parameter ``key`` is set"""
    if param not in PARAMS:
        raise ValueError("need `%s` to be set" % param)

check_scripts(filenames)

check for the presence/absence of scripts

Source code in cgatcore/pipeline/files.py
def check_scripts(filenames):
    """check for the presence/absence of scripts"""
    missing = []
    for filename in filenames:
        if not os.path.exists(filename):
            missing.append(filename)

    if missing:
        raise ValueError("missing scripts: %s" % ",".join(missing))

clean(files, logfile)

clean up files given by glob expressions.

Files are cleaned up by zapping, i.e. the files are set to size 0. Links to files are replaced with place-holders.

Information about the original file is written to logfile.

Arguments

files : list
    List of glob expressions of files to clean up.
logfile : string
    Filename of logfile.

Source code in cgatcore/pipeline/control.py
def clean(files, logfile):
    '''clean up files given by glob expressions.

    Files are cleaned up by zapping, i.e. the files are set to size
    0. Links to files are replaced with place-holders.

    Information about the original file is written to `logfile`.

    Arguments
    ---------
    files : list
        List of glob expressions of files to clean up.
    logfile : string
        Filename of logfile.

    '''
    fields = ('st_atime', 'st_blksize', 'st_blocks',
              'st_ctime', 'st_dev', 'st_gid', 'st_ino',
              'st_mode', 'st_mtime', 'st_nlink',
              'st_rdev', 'st_size', 'st_uid')

    dry_run = get_params().get("dryrun", False)

    if not dry_run:
        if not os.path.exists(logfile):
            outfile = iotools.open_file(logfile, "w")
            outfile.write("filename\tzapped\tlinkdest\t%s\n" %
                          "\t".join(fields))
        else:
            outfile = iotools.open_file(logfile, "a")

    c = E.Counter()
    for fn in files:
        c.files += 1
        if not dry_run:
            stat, linkdest = iotools.zap_file(fn)
            if stat is not None:
                c.zapped += 1
                if linkdest is not None:
                    c.links += 1
                outfile.write("%s\t%s\t%s\t%s\n" % (
                    fn,
                    time.asctime(time.localtime(time.time())),
                    linkdest,
                    "\t".join([str(getattr(stat, x)) for x in fields])))

    get_logger().info("zapped: %s" % (c))
    outfile.close()

    return c
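A hedged usage sketch, zapping intermediate BAM files once downstream tasks no longer need them. The glob pattern and logfile name are placeholders, and the function is normally reached as P.clean with pipeline parameters already loaded:

import glob
from cgatcore import pipeline as P

# Truncate the matched files to zero bytes (or replace links with placeholders)
# and record their original metadata in zap.log; honours the dryrun setting.
counter = P.clean(glob.glob("intermediate/*.bam"), "zap.log")
print(counter)  # Counter summary of files seen / zapped / links replaced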

clone_pipeline(srcdir, destdir=None)

clone a pipeline.

Cloning entails creating a mirror of the source pipeline. Generally, data files are mirrored by linking. Configuration files and the pipeline database will be copied.

Without modification of any files, building the cloned pipeline in destdir should not re-run any commands. However, on deleting selected files, the pipeline should run from the appropriate point. Newly created files will not affect the original pipeline.

Cloning pipelines permits sharing partial results between pipelines, for example for parameter optimization.

Arguments

srcdir : string
    Source directory
destdir : string
    Destination directory. If None, use the current directory.

Source code in cgatcore/pipeline/control.py
def clone_pipeline(srcdir, destdir=None):
    '''clone a pipeline.

    Cloning entails creating a mirror of the source pipeline.
    Generally, data files are mirrored by linking. Configuration
    files and the pipeline database will be copied.

    Without modification of any files, building the cloned pipeline in
    `destdir` should not re-run any commands. However, on deleting
    selected files, the pipeline should run from the appropriate
    point.  Newly created files will not affect the original pipeline.

    Cloning pipelines permits sharing partial results between
    pipelines, for example for parameter optimization.

    Arguments
    ---------
    srcdir : string
        Source directory
    destdir : string
        Destination directory. If None, use the current directory.

    '''

    if destdir is None:
        destdir = os.path.curdir

    get_logger().info("cloning pipeline from %s to %s" % (srcdir, destdir))

    copy_files = ("conf.py", "pipeline.yml", "benchmark.yml", "csvdb")
    ignore_prefix = (
        "report", "_cache", "export", "tmp", "ctmp",
        "_static", "_templates", "shell.log", "pipeline.log",
        "results.commit")

    def _ignore(p):
        for x in ignore_prefix:
            if p.startswith(x):
                return True
        return False

    for root, dirs, files in os.walk(srcdir):

        relpath = os.path.relpath(root, srcdir)
        if _ignore(relpath):
            continue

        for d in dirs:
            if _ignore(d):
                continue
            dest = os.path.join(os.path.join(destdir, relpath, d))
            os.mkdir(dest)
            # touch
            s = os.stat(os.path.join(root, d))
            os.utime(dest, (s.st_atime, s.st_mtime))

        for f in files:
            if _ignore(f):
                continue

            fn = os.path.join(root, f)
            dest_fn = os.path.join(destdir, relpath, f)
            if f in copy_files:
                shutil.copyfile(fn, dest_fn)
            else:
                # realpath resolves links - thus links will be linked to
                # the original target
                os.symlink(os.path.realpath(fn),
                           dest_fn)

close_session()

close the global DRMAA session.

Source code in cgatcore/pipeline/execution.py
def close_session():
    """close the global DRMAA session."""
    global GLOBAL_SESSION

    if GLOBAL_SESSION is not None:
        GLOBAL_SESSION.exit()
        GLOBAL_SESSION = None

cluster_runnable(func)

A decorator that allows a function to be run on the cluster.

The decorated function takes extra arguments. The most important is submit: if set to True, the function call is submitted to the cluster via the pipeline.submit framework. Arguments to the function are pickled, so this will only work if all arguments are picklable. Other arguments accepted by submit are also accepted.

Note that this allows the unusual combination of submit being False and to_cluster being True, which submits the function as an external job but runs it on the local machine.

Note: all arguments to the decorated function must be passed as keyword arguments.

Source code in cgatcore/pipeline/execution.py
def cluster_runnable(func):
    '''A dectorator that allows a function to be run on the cluster.

    The decorated function now takes extra arguments. The most important
    is *submit*. If set to true, it will submit the function to the cluster
    via the pipeline.submit framework. Arguments to the function are
    pickled, so this will only work if arguments are picklable. Other
    arguments to submit are also accepted.

    Note that this allows the unusal combination of *submit* false,
    and *to_cluster* true. This will submit the function as an external
    job, but run it on the local machine.

    Note: all arguments in the decorated function must be passed as
    key-word arguments.
    '''

    # MM: when decorating functions with cluster_runnable, provide
    # them as kwargs, else will throw attribute error

    function_name = func.__name__

    def submit_function(*args, **kwargs):

        if "submit" in kwargs and kwargs["submit"]:
            del kwargs["submit"]
            submit_args, args_file = _pickle_args(args, kwargs)
            module_file = os.path.abspath(
                sys.modules[func.__module__].__file__)
            submit(iotools.snip(__file__),
                   "run_pickled",
                   args=[iotools.snip(module_file), function_name, args_file],
                   **submit_args)
        else:
            # remove job contral options before running function
            for x in ("submit", "job_options", "job_queue"):
                if x in kwargs:
                    del kwargs[x]
            return func(*args, **kwargs)

    return submit_function
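
A sketch of how the decorator might be used, with hypothetical function and file names. All arguments are passed as keyword arguments, and job_memory is shown as an example of an additional argument forwarded to submit.

from cgatcore import pipeline as P

@P.cluster_runnable
def count_lines(infile=None, outfile=None):
    # toy payload: count lines in infile and write the total to outfile
    with open(infile) as inf, open(outfile, "w") as outf:
        outf.write("%i\n" % sum(1 for _ in inf))

# run directly on the local machine
count_lines(infile="input.txt", outfile="counts.local.txt")

# pickle the arguments and submit the call as a cluster job
count_lines(infile="input.txt", outfile="counts.cluster.txt",
            submit=True, job_memory="1G")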

concatenate_and_load(infiles, outfile, regex_filename=None, header=None, cat='track', has_titles=True, missing_value='na', retry=True, tablename=None, options='', job_memory=None, to_cluster=True)

concatenate multiple tab-separated files and upload into database.

The table name is given by outfile without the ".load" suffix.

A typical concatenate and load task in ruffus would look like this::

@merge("*.tsv.gz", ".load")
def loadData(infiles, outfile):
    P.concatenate_and_load(infiles, outfile)

Upload is performed via the :doc:csv2db script.

Arguments

infiles : list
    Filenames of the input data.
outfile : string
    Output filename. This will contain the logging information. The table name is derived from outfile.
regex_filename : string
    If given, regex_filename is applied to the filename to extract the track name. If the pattern contains multiple groups, they are added as additional columns. For example, if cat is set to track,method and regex_filename is (.*)_(.*).tsv.gz, the columns track and method are added to the table.
header : string
    Comma-separated list of values for the header.
cat : string
    Column title for the column containing the track name. The track name is derived from the filename, see regex_filename.
has_titles : bool
    If True, files are expected to have column titles in their first row.
missing_value : string
    String to use for missing values.
retry : bool
    If True, multiple attempts will be made if the data cannot be loaded at the first try, for example if a table is locked.
tablename : string
    Name to use for the table. If unset, it is derived from outfile.
options : string
    Command line options for the csv2db.py script.
job_memory : string
    Amount of memory to allocate for the job. If unset, uses the global default. Implies to_cluster=True.
to_cluster : bool
    By default, load jobs are not submitted to the cluster as they sometimes become blocked. Setting this to True overrides this behaviour.

Source code in cgatcore/pipeline/database.py
def concatenate_and_load(infiles,
                         outfile,
                         regex_filename=None,
                         header=None,
                         cat="track",
                         has_titles=True,
                         missing_value="na",
                         retry=True,
                         tablename=None,
                         options="",
                         job_memory=None,
                         to_cluster=True):
    """concatenate multiple tab-separated files and upload into database.

    The table name is given by outfile without the
    ".load" suffix.

    A typical concatenate and load task in ruffus would look like this::

        @merge("*.tsv.gz", ".load")
        def loadData(infile, outfile):
            P.concatenateAndLoad(infiles, outfile)

    Upload is performed via the :doc:`csv2db` script.

    Arguments
    ---------
    infiles : list
        Filenames of the input data
    outfile : string
        Output filename. This will contain the logging information. The
        table name is derived from `outfile`.
    regex_filename : string
        If given, *regex_filename* is applied to the filename to extract
        the track name. If the pattern contains multiple groups, they are
        added as additional columns. For example, if `cat` is set to
        ``track,method`` and `regex_filename` is ``(.*)_(.*).tsv.gz``
        it will add the columns ``track`` and method to the table.
    header : string
        Comma-separated list of values for header.
    cat : string
        Column title for column containing the track name. The track name
        is derived from the filename, see `regex_filename`.
    has_titles : bool
        If True, files are expected to have column titles in their first row.
    missing_value : string
        String to use for missing values.
    retry : bool
        If True, multiple attempts will be made if the data can
        not be loaded at the first try, for example if a table is locked.
    tablename: string
        Name to use for table. If unset derive from outfile.
    options : string
        Command line options for the `csv2db.py` script.
    job_memory : string
        Amount of memory to allocate for job. If unset, uses the global
        default. Implies to_cluster=True.
    to_cluster : bool
        By default load jobs are not submitted to the cluster as they sometimes
        become blocked. Setting this true will override this behavoir.
    """
    if job_memory is None:
        job_memory = get_params()["cluster_memory_default"]

    if tablename is None:
        tablename = to_table(outfile)

    infiles = " ".join(infiles)

    passed_options = options
    load_options, cat_options = ["--add-index=track"], []

    if regex_filename:
        cat_options.append("--regex-filename='%s'" % regex_filename)

    if header:
        load_options.append("--header-names=%s" % header)

    if not has_titles:
        cat_options.append("--no-titles")

    cat_options = " ".join(cat_options)
    load_options = " ".join(load_options) + " " + passed_options

    load_statement = build_load_statement(tablename,
                                          options=load_options,
                                          retry=retry)

    statement = '''python -m cgatcore.tables
    --cat=%(cat)s
    --missing-value=%(missing_value)s
    %(cat_options)s
    %(infiles)s
    | %(load_statement)s
    > %(outfile)s'''

    run(statement)
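
A sketch of a typical task using regex_filename to derive the track name, with hypothetical filenames; it assumes the usual ruffus imports in a pipeline script.

from ruffus import merge
from cgatcore import pipeline as P

@merge("*_counts.tsv.gz", "counts.load")
def loadCounts(infiles, outfile):
    # concatenate per-sample tables into table "counts", adding a "track"
    # column extracted from each filename
    P.concatenate_and_load(infiles, outfile,
                           regex_filename="(.*)_counts.tsv.gz")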

config_to_dictionary(config)

convert the contents of a :py:class:ConfigParser.ConfigParser object to a dictionary

This method works by iterating over all configuration values in a :py:class:ConfigParser.ConfigParser object and inserting values into a dictionary. Section names are prefixed with an underscore. Thus::

[sample]
name=12

is entered as sample_name=12 into the dictionary. The sections general and DEFAULT are treated specially in that both the prefixed and the unprefixed values are inserted: ::

[general]
genome=hg19

will be added as general_genome=hg19 and genome=hg19.

Numbers will be automatically recognized as such and converted into integers or floats.

Returns

config : dict A dictionary of configuration values

Source code in cgatcore/pipeline/parameters.py
def config_to_dictionary(config):
    """convert the contents of a :py:class:`ConfigParser.ConfigParser`
    object to a dictionary

    This method works by iterating over all configuration values in a
    :py:class:`ConfigParser.ConfigParser` object and inserting values
    into a dictionary. Section names are prefixed using and underscore.
    Thus::

        [sample]
        name=12

    is entered as ``sample_name=12`` into the dictionary. The sections
    ``general`` and ``DEFAULT`` are treated specially in that both
    the prefixed and the unprefixed values are inserted: ::

       [general]
       genome=hg19

    will be added as ``general_genome=hg19`` and ``genome=hg19``.

    Numbers will be automatically recognized as such and converted into
    integers or floats.

    Returns
    -------
    config : dict
        A dictionary of configuration values

    """
    p = defaultdict(lambda: defaultdict(TriggeredDefaultFactory()))
    for section in config.sections():
        for key, value in config.items(section):
            try:
                v = iotools.str2val(value)
            except TypeError:
                E.error("error converting key %s, value %s" % (key, value))
                E.error("Possible multiple concurrent attempts to "
                        "read configuration")
                raise

            p["%s_%s" % (section, key)] = v

            # IMS: new heirarchical format
            try:
                p[section][key] = v
            except TypeError:
                # fails with things like genome_dir=abc
                # if [genome] does not exist.
                continue

            if section in ("general", "DEFAULT"):
                p["%s" % (key)] = v

    for key, value in config.defaults().items():
        p["%s" % (key)] = iotools.str2val(value)

    return p
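
A small sketch of the flattening behaviour, assuming config_to_dictionary is imported from cgatcore.pipeline.parameters; the section and key names are illustrative only.

import configparser
from cgatcore.pipeline.parameters import config_to_dictionary

config = configparser.ConfigParser()
config.read_string("""
[general]
genome = hg19

[sample]
name = 12
""")

params = config_to_dictionary(config)
# keys are section-prefixed; [general] values are also added unprefixed
assert params["sample_name"] == 12          # converted to an integer
assert params["general_genome"] == "hg19"
assert params["genome"] == "hg19"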

connect()

connect to SQLite database used in this pipeline.

.. note:: This method is currently only implemented for sqlite databases. It needs refactoring for generic access. Alternatively, use a full or partial ORM.

If annotations_database is in params, this method will attach the named database as annotations.

Returns

dbh a database handle

Source code in cgatcore/pipeline/database.py
def connect():
    """connect to SQLite database used in this pipeline.

    .. note::
       This method is currently only implemented for sqlite
       databases. It needs refactoring for generic access.
       Alternatively, use an full or partial ORM.

    If ``annotations_database`` is in params, this method
    will attach the named database as ``annotations``.

    Returns
    -------
    dbh
       a database handle

    """

    # Note that in the future this might return an sqlalchemy or
    # db.py handle.
    url = get_params()["database"]["url"]
    is_sqlite3 = url.startswith("sqlite")

    if is_sqlite3:
        connect_args = {'check_same_thread': False}
    else:
        connect_args = {}

    creator = None
    if is_sqlite3 and "annotations_dir" in get_params():
        # not sure what the correct way is for url
        # sqlite:///./csvdb -> ./csvdb
        # sqlite:////path/to/csvdb -> /path/to/csvdb
        filename = os.path.abspath(url[len("sqlite:///"):])

        def creator():
            conn = sqlite3.connect(filename)
            conn.execute("ATTACH DATABASE '{}' as annotations".format(
                os.path.join(get_params()["annotations_dir"], "csvdb")))
            return conn

    engine = sqlalchemy.create_engine(
        url,
        connect_args=connect_args,
        creator=creator)

    return engine
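
A short sketch of querying the pipeline database with the returned engine; the table name is hypothetical and pandas is used purely for illustration.

import pandas as pd
from cgatcore import pipeline as P

dbh = P.connect()
# read a (hypothetical) results table into a dataframe
df = pd.read_sql("SELECT * FROM bam_stats", dbh)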

create_view(dbhandle, tables, tablename, outfile, view_type='TABLE', ignore_duplicates=True)

create a database view for a list of tables.

This method performs a join across multiple tables and stores the result either as a view or a table in the database.

Arguments

dbhandle :
    A database handle.
tables : list of tuples
    Tables to merge. Each tuple contains the name of a table and the field to join with the first table. For example::

        tables = (
            "reads_summary", "track",
            "bam_stats", "track",
            "context_stats", "track",
            "picard_stats_alignment_summary_metrics", "track")

tablename : string
    Name of the view or table to be created.
outfile : string
    Output filename for status information.
view_type : string
    Type of view, either VIEW or TABLE. If a view is to be created across multiple databases, use TABLE.
ignore_duplicates : bool
    If set to False, duplicate column names will be added with the tablename as prefix. The default is to ignore them.

Source code in cgatcore/pipeline/database.py
def create_view(dbhandle, tables, tablename, outfile,
                view_type="TABLE",
                ignore_duplicates=True):
    '''create a database view for a list of tables.

    This method performs a join across multiple tables and stores the
    result either as a view or a table in the database.

    Arguments
    ---------
    dbhandle :
        A database handle.
    tables : list of tuples
        Tables to merge. Each tuple contains the name of a table and
        the field to join with the first table. For example::

            tables = (
                "reads_summary", "track",
                "bam_stats", "track",
                "context_stats", "track",
                "picard_stats_alignment_summary_metrics", "track")

    tablename : string
        Name of the view or table to be created.
    outfile : string
        Output filename for status information.
    view_type : string
        Type of view, either ``VIEW`` or ``TABLE``.  If a view is to be
        created across multiple databases, use ``TABLE``.
    ignore_duplicates : bool
        If set to False, duplicate column names will be added with the
        tablename as prefix. The default is to ignore.

    '''

    database.executewait(
        dbhandle,
        "DROP %(view_type)s IF EXISTS %(tablename)s" % locals())

    tracks, columns = [], []
    tablenames = [x[0] for x in tables]
    for table, track in tables:
        d = database.executewait(
            dbhandle,
            "SELECT COUNT(DISTINCT %s) FROM %s" % (track, table))
        tracks.append(d.fetchone()[0])
        columns.append(
            [x.lower() for x in database.getColumnNames(dbhandle, table)
             if x != track])

    E.info("creating %s from the following tables: %s" %
           (tablename, str(list(zip(tablenames, tracks)))))
    if min(tracks) != max(tracks):
        raise ValueError(
            "number of rows not identical - will not create view")

    from_statement = " , ".join(
        ["%s as t%i" % (y[0], x) for x, y in enumerate(tables)])
    f = tables[0][1]
    where_statement = " AND ".join(
        ["t0.%s = t%i.%s" % (f, x + 1, y[1])
         for x, y in enumerate(tables[1:])])

    all_columns, taken = [], set()
    for x, c in enumerate(columns):
        i = set(taken).intersection(set(c))
        if i:
            E.warn("duplicate column names: %s " % i)
            if not ignore_duplicates:
                table = tables[x][0]
                all_columns.extend(
                    ["t%i.%s AS %s_%s" % (x, y, table, y) for y in i])
                c = [y for y in c if y not in i]

        all_columns.extend(["t%i.%s" % (x, y) for y in c])
        taken.update(set(c))

    all_columns = ",".join(all_columns)
    statement = '''
    CREATE %(view_type)s %(tablename)s AS SELECT t0.track, %(all_columns)s
    FROM %(from_statement)s
    WHERE %(where_statement)s
    ''' % locals()
    database.executewait(dbhandle, statement)

    nrows = database.executewait(
        dbhandle, "SELECT COUNT(*) FROM view_mapping").fetchone()[0]

    if nrows == 0:
        raise ValueError(
            "empty view mapping, check statement = %s" %
            (statement % locals()))
    if nrows != min(tracks):
        E.warn("view creates duplicate rows, got %i, expected %i" %
               (nrows, min(tracks)))

    E.info("created view_mapping with %i rows" % nrows)
    touch_file(outfile)

execute(statement, **kwargs)

execute a statement locally.

This method implements the same parameter interpolation as the function :func:run.

Arguments

statement : string Command line statement to run.

Returns

stdout : string
    Data sent to standard output by the command.
stderr : string
    Data sent to standard error by the command.

Source code in cgatcore/pipeline/execution.py
def execute(statement, **kwargs):
    '''execute a statement locally.

    This method implements the same parameter interpolation
    as the function :func:`run`.

    Arguments
    ---------
    statement : string
        Command line statement to run.

    Returns
    -------
    stdout : string
        Data sent to standard output by command
    stderr : string
        Data sent to standard error by command
    '''

    if not kwargs:
        kwargs = get_caller_locals()

    kwargs = dict(list(get_params().items()) + list(kwargs.items()))

    logger = get_logger()
    logger.info("running %s" % (statement % kwargs))

    if "cwd" not in kwargs:
        cwd = get_params()["work_dir"]
    else:
        cwd = kwargs["cwd"]

    # cleaning up of statement
    # remove new lines and superfluous spaces and tabs
    statement = " ".join(re.sub("\t+", " ", statement).split("\n")).strip()
    if statement.endswith(";"):
        statement = statement[:-1]

    # always use bash
    os.environ.update(
        {'BASH_ENV': os.path.join(os.environ['HOME'], '.bashrc')})
    process = subprocess.Popen(statement % kwargs,
                               cwd=cwd,
                               shell=True,
                               stdin=sys.stdin,
                               stdout=sys.stdout,
                               stderr=sys.stderr,
                               env=os.environ.copy(),
                               executable="/bin/bash")

    # process.stdin.close()
    stdout, stderr = process.communicate()

    if process.returncode != 0:
        raise OSError(
            "Child was terminated by signal %i: \n"
            "The stderr was: \n%s\n%s\n" %
            (-process.returncode, stderr, statement))

    return stdout, stderr
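
A minimal sketch: run a short command in the pipeline's working directory and capture its output. The statement is interpolated against the parameter dictionary, so placeholders such as %(work_dir)s can be used.

from cgatcore import pipeline as P

stdout, stderr = P.execute("ls -l %(work_dir)s")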

file_is_mounted(filename)

return True if filename is mounted.

A file is likely to be mounted if it is located inside a subdirectory of the local scratch directory.

Source code in cgatcore/pipeline/execution.py
def file_is_mounted(filename):
    """return True if filename is mounted.

    A file is likely to be mounted if it is located
    inside a subdirectory of the local scratch directory.
    """
    if get_params()["mount_point"]:
        return os.path.abspath(filename).startswith(get_params()["mount_point"])
    else:
        return False

get_caller(decorators=0)

return the name of the calling class/module

Arguments

decorators : int Number of contexts to go up to reach calling function of interest.

Returns

mod : object The calling module/class

Source code in cgatcore/pipeline/utils.py
def get_caller(decorators=0):
    """return the name of the calling class/module

    Arguments
    ---------
    decorators : int
        Number of contexts to go up to reach calling function
        of interest.

    Returns
    -------
    mod : object
        The calling module/class
    """

    frm = inspect.stack()
    return inspect.getmodule(frm[2 + decorators].frame)

get_caller_locals(decorators=0)

returns the locals of the calling function.

from http://pylab.blogspot.com/2009/02/python-accessing-caller-locals-from.html

Arguments

decorators : int Number of contexts to go up to reach calling function of interest.

Returns

locals : dict
    Dictionary of variables defined in the context of the calling function.

Source code in cgatcore/pipeline/utils.py
def get_caller_locals(decorators=0):
    '''returns the locals of the calling function.

    from http://pylab.blogspot.com/2009/02/
         python-accessing-caller-locals-from.html

    Arguments
    ---------
    decorators : int
        Number of contexts to go up to reach calling function
        of interest.

    Returns
    -------
    locals : dict
        Dictionary of variable defined in the context of the
        calling function.
    '''
    f = sys._getframe(2 + decorators)
    args = inspect.getargvalues(f)
    return args[3]

get_calling_function(decorators=0)

return the name of the calling function

Arguments

decorators : int Number of contexts to go up to reach calling function of interest.

Returns

mod : object The calling module

Source code in cgatcore/pipeline/utils.py
def get_calling_function(decorators=0):
    """return the name of the calling function

    Arguments
    ---------
    decorators : int
        Number of contexts to go up to reach calling function
        of interest.

    Returns
    -------
    mod : object
        The calling module
    """

    frm = inspect.stack()
    return frm[2 + decorators].function

get_database_name()

Return the database name associated with the pipeline.

This method looks in different sections of the ini file to permit both the old style database and the new style database_name.

This method has been implemented for backwards compatibility.

Returns

databasename : string database name. Returns empty string if not found.

Raises

KeyError If no database name is found

Source code in cgatcore/pipeline/database.py
def get_database_name():
    '''Return the database name associated with the pipeline.

    This method lookis in different sections in the ini file to permit
    both old style ``database`` and new style ``database_name``.

    This method has been implemented for backwards compatibility.

    Returns
    -------
    databasename : string
        database name. Returns empty string if not found.

    Raises
    ------
    KeyError
       If no database name is found

    '''

    locations = ["database_name", "database"]
    params = get_params()
    for location in locations:
        database = params.get(location, None)
        if database is not None:
            return database

    raise KeyError("database name not found")

get_executor(options=None)

Return an executor instance based on the specified queue manager in options.

  • options (dict): Dictionary containing execution options, including "cluster_queue_manager".

Returns: - Executor instance appropriate for the specified queue manager.

Source code in cgatcore/pipeline/execution.py
def get_executor(options=None):
    """
    Return an executor instance based on the specified queue manager in options.

    Parameters:
    - options (dict): Dictionary containing execution options, 
                      including "cluster_queue_manager".

    Returns:
    - Executor instance appropriate for the specified queue manager.
    """
    if options is None:
        options = get_params()

    if options.get("testing", False):
        return LocalExecutor(**options)

    # Check if to_cluster is explicitly set to False
    if not options.get("to_cluster", True):  # Defaults to True if not specified
        return LocalExecutor(**options)

    queue_manager = options.get("cluster_queue_manager", None)

    # Check for KubernetesExecutor
    if queue_manager == "kubernetes" and KubernetesExecutor is not None:
        return KubernetesExecutor(**options)

    # Check for SGEExecutor (Sun Grid Engine)
    elif queue_manager == "sge" and shutil.which("qsub") is not None:
        return SGEExecutor(**options)

    # Check for SlurmExecutor
    elif queue_manager == "slurm" and shutil.which("sbatch") is not None:
        return SlurmExecutor(**options)

    # Check for TorqueExecutor
    elif queue_manager == "torque" and shutil.which("qsub") is not None:
        return TorqueExecutor(**options)

    # Fallback to LocalExecutor, not sure if this should raise an error though, feels like it should
    else:
        return LocalExecutor(**options)
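
A sketch of how the executor choice follows the options, using an explicit options dictionary; in a running pipeline these values would normally come from get_params() / pipeline.yml.

from cgatcore.pipeline.execution import get_executor

# prefer Slurm; falls back to local execution if sbatch is not on the PATH
options = {"cluster_queue_manager": "slurm", "to_cluster": True}
executor = get_executor(options)

# force local execution regardless of the queue manager
local_executor = get_executor({"to_cluster": False})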

get_header()

return a header string with command line options and timestamp

Source code in cgatcore/experiment.py
def get_header():
    """return a header string with command line options and timestamp

    """
    system, host, release, version, machine = os.uname()

    return "output generated by %s\njob started at %s on %s -- %s\npid: %i, system: %s %s %s %s" %\
           (" ".join(sys.argv),
            time.asctime(time.localtime(time.time())),
            host,
            global_id,
            os.getpid(),
            system, release, version, machine)

get_mounted_location(filename)

return location of filename within mounted directory

Source code in cgatcore/pipeline/execution.py
def get_mounted_location(filename):
    """return location of filename within mounted directory

    """
    return os.path.abspath(filename)[len(get_params()["mount_point"]):]

get_param_output(options=None)

return a string containing script parameters.

Parameters are all variables that start with param_.

Source code in cgatcore/pipeline/control.py
def get_param_output(options=None):
    """return a string containing script parameters.

    Parameters are all variables that start with ``param_``.
    """
    result = []
    if options:
        members = options
        for k, v in sorted(members.items()):
            result.append("%-40s: %s" % (k, str(v)))
    else:
        vars = inspect.currentframe().f_back.f_locals
        for var in [x for x in list(vars.keys()) if re.match("param_", x)]:
            result.append("%-40s: %s" %
                          (var, str(vars[var])))

    if result:
        return "\n".join(result)
    else:
        return "# no parameters."

get_param_section(section)

return config values in section

Sections are built by common prefixes.

Source code in cgatcore/pipeline/parameters.py
def get_param_section(section):
    """return config values in section

    Sections are built by common prefixes.
    """
    if not section.endswith("_"):
        section = section + "_"
    n = len(section)
    return [(x[n:], y) for x, y in PARAMS.items() if x.startswith(section)]

get_parameters(filenames=None, defaults=None, site_ini=True, user=True, only_import=None)

read one or more config files and build global PARAMS configuration dictionary.

Arguments

filenames : list
    List of filenames of the configuration files to read.
defaults : dict
    Dictionary with default values. These will overwrite any hard-coded parameters, but will be overwritten by user-specified parameters in the configuration files.
user : bool
    If set, configuration files will also be read from a file called .cgat.yml in the user's home directory.
only_import : bool
    If set to a boolean, the parameter dictionary will be a default collection. This is useful for pipelines that are imported (for example for documentation generation) but not executed, as there might not be an appropriate .yml file available. If only_import is None, it will be set to the default, which is to raise an exception unless the calling script is imported or the option --is-test has been passed at the command line.

Returns

params : dict Global configuration dictionary.

Source code in cgatcore/pipeline/parameters.py
def get_parameters(filenames=None,
                   defaults=None,
                   site_ini=True,
                   user=True,
                   only_import=None):
    '''read one or more config files and build global PARAMS configuration
    dictionary.

    Arguments
    ---------
    filenames : list
       List of filenames of the configuration files to read.
    defaults : dict
       Dictionary with default values. These will be overwrite
       any hard-coded parameters, but will be overwritten by user
       specified parameters in the configuration files.
    user : bool
       If set, configuration files will also be read from a
       file called :file:`.cgat.yml` in the user`s
       home directory.
    only_import : bool
       If set to a boolean, the parameter dictionary will be a
       defaultcollection. This is useful for pipelines that are
       imported (for example for documentation generation) but not
       executed as there might not be an appropriate .yml file
       available. If `only_import` is None, it will be set to the
       default, which is to raise an exception unless the calling
       script is imported or the option ``--is-test`` has been passed
       at the command line.

    Returns
    -------
    params : dict
       Global configuration dictionary.
    '''
    global PARAMS, HAVE_INITIALIZED
    # only execute function once
    if HAVE_INITIALIZED:
        return PARAMS

    if filenames is None:
        filenames = ["pipeline.yml", "cgat.yml"]
    elif isinstance(filenames, str):
        filenames = [filenames]

    old_id = id(PARAMS)

    caller_locals = get_caller_locals()

    # check if this is only for import
    if only_import is None:
        only_import = is_test() or "__name__" not in caller_locals or \
            caller_locals["__name__"] != "__main__"

    # important: only update the PARAMS variable as
    # it is referenced in other modules. Thus the type
    # needs to be fixed at import. Raise error where this
    # is not the case.
    # Note: Parameter sharing in the pipeline module needs
    # to be reorganized.
    if only_import:
        # turn on default dictionary
        TriggeredDefaultFactory.with_default = True

    # check if the pipeline is in testing mode
    found = False
    if 'argv' in caller_locals and caller_locals['argv'] is not None:
        for e in caller_locals['argv']:
            if 'template_pipeline.py' in e:
                found = True
    PARAMS['testing'] = 'self' in caller_locals or found

    if site_ini:
        # read configuration from /etc/cgat/pipeline.yml
        fn = "/etc/cgat/pipeline.yml"
        if os.path.exists(fn):
            filenames.insert(0, fn)

    if user:
        # read configuration from a users home directory
        fn = os.path.join(os.path.expanduser("~"),
                          ".cgat.yml")
        if os.path.exists(fn):
            if 'pipeline.yml' in filenames:
                filenames.insert(filenames.index('pipeline.yml'), fn)
            else:
                filenames.append(fn)

    filenames = [x.strip() for x in filenames if os.path.exists(x)]

    # save list of config files
    PARAMS["pipeline_yml"] = filenames

    # update with hard-coded PARAMS
    nested_update(PARAMS, HARDCODED_PARAMS)
    if defaults:
        nested_update(PARAMS, defaults)

    # reset working directory. Set in PARAMS to prevent repeated calls to
    # os.getcwd() failing if network is busy
    PARAMS["start_dir"] = os.path.abspath(os.getcwd())
    # location of pipelines - set via location of top frame (cgatflow command)
    if '__file__' in caller_locals:
        PARAMS["pipelinedir"] = os.path.dirname(caller_locals["__file__"])
    else:
        PARAMS["pipelinedir"] = 'unknown'

    for filename in filenames:
        if not os.path.exists(filename):
            continue
        get_logger().info("reading config from file {}".format(
            filename))

        with open(filename, 'rt', encoding='utf8') as inf:
            p = yaml.load(inf, Loader=yaml.FullLoader)
            if p:
                nested_update(PARAMS, p)

    # for backwards compatibility - normalize dictionaries
    p = {}
    for k, v in PARAMS.items():
        if isinstance(v, Mapping):
            for kk, vv in v.items():
                new_key = "{}_{}".format(k, kk)
                if new_key in p:
                    raise ValueError(
                        "key {} does already exist".format(new_key))
                p[new_key] = vv
    nested_update(PARAMS, p)

    # interpolate some params with other parameters
    for param in INTERPOLATE_PARAMS:
        try:
            PARAMS[param] = PARAMS[param] % PARAMS
        except TypeError as msg:
            raise TypeError('could not interpolate %s: %s' %
                            (PARAMS[param], msg))

    # expand directory pathnames
    for param, value in list(PARAMS.items()):
        if (param.endswith("dir") and isinstance(value, str) and value.startswith(".")):
            PARAMS[param] = os.path.abspath(value)

    # make sure that the dictionary reference has not changed
    assert id(PARAMS) == old_id
    HAVE_INITIALIZED = True
    return PARAMS
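
A brief sketch of reading the configuration and accessing values, assuming a pipeline.yml in the current directory; the database section shown in the comment is only an example.

from cgatcore import pipeline as P

PARAMS = P.get_parameters("pipeline.yml")

# nested sections are available both hierarchically and with flattened keys,
# e.g. a "database: {url: sqlite:///./csvdb}" entry can be read as either
# PARAMS["database"]["url"] or PARAMS["database_url"]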

get_parameters_as_namedtuple(*args, **kwargs)

return PARAM dictionary as a namedtuple.

Source code in cgatcore/pipeline/parameters.py
def get_parameters_as_namedtuple(*args, **kwargs):
    """return PARAM dictionary as a namedtuple.
    """
    d = get_parameters(*args, **kwargs)
    return collections.namedtuple('GenericDict', list(d.keys()))(**d)

get_params()

return handle to global parameter dictionary

Source code in cgatcore/pipeline/parameters.py
def get_params():
    """return handle to global parameter dictionary"""
    return PARAMS

get_s3_pipeline()

Instantiate and return the S3Pipeline instance, lazy-loaded to avoid circular imports.

Source code in cgatcore/pipeline/__init__.py
def get_s3_pipeline():
    """Instantiate and return the S3Pipeline instance, lazy-loaded to avoid circular imports."""
    # Use get_remote() to access the remote functionality
    remote = cgatcore.get_remote()  # Now properly calls the method to initialize remote if needed
    return remote.file_handler.S3Pipeline()

get_temp_dir(dir=None, shared=False, clear=False)

get a temporary directory.

The directory is created and the caller needs to delete the temporary directory once it is not used any more.

If dir does not exist, it will be created.

Arguments

dir : string
    Directory for the temporary directory; if not given, it is set to the default temporary location in the global configuration dictionary.
shared : bool
    If set, the temporary directory will be created in a shared temporary location.

Returns

filename : string
    Absolute pathname of the temporary directory.

Source code in cgatcore/pipeline/files.py
def get_temp_dir(dir=None, shared=False, clear=False):
    '''get a temporary directory.

    The directory is created and the caller needs to delete the temporary
    directory once it is not used any more.

    If dir does not exist, it will be created.

    Arguments
    ---------
    dir : string
        Directory of the temporary directory and if not given is set to the
        default temporary location in the global configuration dictionary.
    shared : bool
        If set, the tempory directory will be in a shared temporary
        location.

    Returns
    -------
    filename : string
        Absolute pathname of temporary file.

    '''
    if dir is None:
        if shared:
            dir = get_params()['shared_tmpdir']
        else:
            dir = get_params()['tmpdir']

    if not os.path.exists(dir):
        os.makedirs(dir)

    tmpdir = tempfile.mkdtemp(dir=dir, prefix="ctmp")
    if clear:
        os.rmdir(tmpdir)
    return tmpdir

get_temp_file(dir=None, shared=False, suffix='', mode='w+', encoding='utf-8')

get a temporary file.

The file is created and the caller needs to close and delete the temporary file once it is not used any more. By default, the file is opened as a text file (mode w+) with encoding utf-8 instead of the default mode w+b used in :class:tempfile.NamedTemporaryFile

If dir does not exist, it will be created.

Arguments

dir : string
    Directory for the temporary file; if not given, it is set to the default temporary location in the global configuration dictionary.
shared : bool
    If set, the temporary file will be created in a shared temporary location (given by the global configuration dictionary).
suffix : string
    Filename suffix.

Returns

file : File A file object of the temporary file.

Source code in cgatcore/pipeline/files.py
def get_temp_file(dir=None, shared=False, suffix="", mode="w+", encoding="utf-8"):
    '''get a temporary file.

    The file is created and the caller needs to close and delete the
    temporary file once it is not used any more. By default, the file
    is opened as a text file (mode ``w+``) with encoding ``utf-8``
    instead of the default mode ``w+b`` used in
    :class:`tempfile.NamedTemporaryFile`

    If dir does not exist, it will be created.

    Arguments
    ---------
    dir : string
        Directory of the temporary file and if not given is set to the
        default temporary location in the global configuration dictionary.
    shared : bool
        If set, the tempory file will be in a shared temporary
        location (given by the global configuration directory).
    suffix : string
        Filename suffix

    Returns
    -------
    file : File
        A file object of the temporary file.

    '''
    if dir is None:
        if shared:
            dir = get_params()['shared_tmpdir']
        else:
            dir = get_params()['tmpdir']

    if not os.path.exists(dir):
        try:
            os.makedirs(dir)
        except OSError:
            # avoid race condition when several processes try to create
            # temporary directory.
            pass
        if not os.path.exists(dir):
            raise OSError(
                "temporary directory {} could not be created".format(dir))

    return tempfile.NamedTemporaryFile(dir=dir, delete=False, prefix="ctmp",
                                       mode=mode,
                                       encoding=encoding, suffix=suffix)

get_temp_filename(dir=None, shared=False, clear=True, suffix='')

return a temporary filename.

The file is created and the caller needs to delete the temporary file once it is no longer used (unless clear is set).

If dir does not exist, it will be created.

Arguments

dir : string
    Directory for the temporary file; if not given, it is set to the default temporary location in the global configuration dictionary.
shared : bool
    If set, the temporary file will be created in a shared temporary location.
clear : bool
    If set, remove the temporary file after creation.
suffix : string
    Filename suffix.

Returns

filename : string Absolute pathname of temporary file.

Source code in cgatcore/pipeline/files.py
def get_temp_filename(dir=None, shared=False, clear=True, suffix=""):
    '''return a temporary filename.

    The file is created and the caller needs to delete the temporary
    file once it is not used any more (unless `clear` is set`).

    If dir does not exist, it will be created.

    Arguments
    ---------
    dir : string
        Directory of the temporary file and if not given is set to the
        default temporary location in the global configuration dictionary.
    shared : bool
        If set, the tempory file will be in a shared temporary
        location.
    clear : bool
        If set, remove the temporary file after creation.
    suffix : string
        Filename suffix

    Returns
    -------
    filename : string
        Absolute pathname of temporary file.

    '''
    tmpfile = get_temp_file(dir=dir, shared=shared, suffix=suffix)
    tmpfile.close()
    if clear:
        os.unlink(tmpfile.name)
    return tmpfile.name
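
A short sketch combining the temporary-file helpers; the content written is purely illustrative.

import os
from cgatcore import pipeline as P

# named temporary file in the current directory, opened in text mode
tmpfile = P.get_temp_file(".")
tmpfile.write("gene_id\tcount\n")
tmpfile.close()

# a bare temporary filename (the file itself is removed because clear=True)
tmpname = P.get_temp_filename(".")

# the caller is responsible for cleaning up
os.unlink(tmpfile.name)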

initialize(argv=None, caller=None, defaults=None, optparse=True, **kwargs)

setup the pipeline framework.

Arguments

options : object
    Container for command line arguments.
args : list
    List of command line arguments.
defaults : dictionary
    Dictionary with default values to be added to the global parameters dictionary.

Additional keyword arguments will be passed to the :func:~.parse_commandline function to set command-line defaults.

Source code in cgatcore/pipeline/control.py
def initialize(argv=None, caller=None, defaults=None, optparse=True, **kwargs):
    """setup the pipeline framework.

    Arguments
    ---------
    options: object
        Container for command line arguments.
    args : list
        List of command line arguments.
    defaults : dictionary
        Dictionary with default values to be added to global
        parameters dictionary.

    Additional keyword arguments will be passed to the
    :func:`~.parse_commandline` function to set command-line defaults.

    """
    if argv is None:
        argv = sys.argv

    # load default options from config files
    if caller:
        path = os.path.splitext(caller)[0]
    else:
        try:
            path = os.path.splitext(get_caller().__file__)[0]
        except AttributeError as ex:
            path = "unknown"

    parse_commandline(argv, optparse, **kwargs)
    args = E.get_args()
    get_parameters(
        [os.path.join(path, "pipeline.yml"),
         "../pipeline.yml",
         args.config_file],
        defaults=defaults)

    logger = logging.getLogger("cgatcore.pipeline")
    logger.info("started in directory: {}".format(
        get_params().get("start_dir")))

    # At this point, the PARAMS dictionary has already been
    # built. It now needs to be updated with selected command
    # line options as these should always take precedence over
    # configuration files.
    update_params_with_commandline_options(get_params(), args)

    logger.info(get_header())

    logger.info(get_param_output(get_params()))

    code_location, version = get_version()
    logger.info("code location: {}".format(code_location))
    logger.info("code version: {}".format(version))

    logger.info("working directory is: {}".format(
        get_params().get("work_dir")))
    work_dir = get_params().get("work_dir")
    if not os.path.exists(work_dir):
        E.info("working directory {} does not exist - creating".format(work_dir))
        os.makedirs(work_dir)
    logger.info("changing directory to {}".format(work_dir))
    os.chdir(work_dir)

    logger.info("pipeline has been initialized")

    return args

input_validation(PARAMS, pipeline_script='')

Inspects the PARAMS dictionary looking for problematic input values.

So far we just check that:

* all required 3rd party tools are on the PATH

* input parameters are not empty

* input parameters do not contain the "?" character (used as a
  placeholder in different pipelines)

* if the input is a file, check whether it exists and
  is readable
Source code in cgatcore/pipeline/parameters.py
def input_validation(PARAMS, pipeline_script=""):
    '''Inspects the PARAMS dictionary looking for problematic input values.

    So far we just check that:

        * all required 3rd party tools are on the PATH

        * input parameters are not empty

        * input parameters do not contain the "?" character (used as a
          placeholder in different pipelines)

        * if the input is a file, check whether it exists and
          is readable
    '''

    E.info('''input Validation starting''')
    E.info('''checking 3rd party dependencies''')

    # check 3rd party dependencies
    if len(pipeline_script) > 0:
        # this import requires the PYTHONPATH in the following order
        # PYTHONPATH=<src>/CGATpipelines:<src>/cgat
        import scripts.cgat_check_deps as cd
        deps, check_path_failures = cd.checkDepedencies(pipeline_script)
        # print info about dependencies
        if len(deps) == 0:
            E.info('no dependencies found')
        else:
            # print dictionary ordered by value
            for k in sorted(deps, key=deps.get, reverse=True):
                E.info('Program: {0!s} used {1} time(s)'.format(k, deps[k]))
            n_failures = len(check_path_failures)
            if n_failures == 0:
                E.info('All required programs are available on your PATH')
            else:
                E.info('The following programs are not on your PATH')
                for p in check_path_failures:
                    E.info('{0!s}'.format(p))

    # check PARAMS
    num_missing = 0
    num_questions = 0

    E.info('''checking pipeline configuration''')

    for key, value in sorted(PARAMS.items()):

        key = str(key)
        value = str(value)

        # check for missing values
        if value == "":
            E.warn('\n"{}" is empty, is that expected?'.format(key))
            num_missing += 1

        # check for a question mark in the dictironary (indicates
        # that there is a missing input parameter)
        if "?" in value:
            E.warn('\n"{}" is not defined (?), is that expected?'.format(key))
            num_questions += 1

        # validate input files listed in PARAMS
        if (value.startswith("/") or value.endswith(".gz") or value.endswith(".gtf")) and "," not in value:
            if not os.access(value, os.R_OK):
                E.warn('\n"{}": "{}" is not readable'.format(key, value))

    if num_missing or num_questions:
        raise ValueError("pipeline has configuration issues")

interpolate_statement(statement, kwargs)

interpolate command line statement with parameters

The skeleton of the statement should be defined in kwargs. The method then applies string interpolation using a dictionary built from the global configuration dictionary PARAMS, but augmented by kwargs. The latter takes precedence.

Arguments

statement : string
    Command line statement to be interpolated.
kwargs : dict
    Keyword arguments that are used for parameter interpolation.

Returns

statement : string The command line statement with interpolated parameters.

Raises

KeyError If statement contains unresolved references.

Source code in cgatcore/pipeline/execution.py
def interpolate_statement(statement, kwargs):
    '''interpolate command line statement with parameters

    The skeleton of the statement should be defined in kwargs.  The
    method then applies string interpolation using a dictionary built
    from the global configuration dictionary PARAMS, but augmented by
    `kwargs`. The latter takes precedence.

    Arguments
    ---------
    statement: string
        Command line statement to be interpolated.
    kwargs : dict
        Keyword arguments that are used for parameter interpolation.

    Returns
    -------
    statement : string
        The command line statement with interpolated parameters.

    Raises
    ------
    KeyError
        If ``statement`` contains unresolved references.

    '''

    local_params = substitute_parameters(**kwargs)

    # build the statement
    try:
        statement = statement % local_params
    except KeyError as msg:
        raise KeyError(
            "Error when creating command: could not "
            "find %s in dictionaries" % msg)
    except ValueError as msg:
        raise ValueError(
            "Error when creating command: %s, statement = %s" % (
                msg, statement))

    # cleaning up of statement
    # remove new lines and superfluous spaces and tabs
    statement = " ".join(re.sub("\t+", " ", statement).split("\n")).strip()
    if statement.endswith(";"):
        statement = statement[:-1]

    # mark arvados mount points in statement
    if get_params().get("mount_point", None):
        statement = re.sub(get_params()["mount_point"], "arv=", statement)

    return statement
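
A minimal sketch with hypothetical file names: anything not supplied in the keyword dictionary is looked up in the global parameter dictionary.

from cgatcore import pipeline as P

statement = P.interpolate_statement(
    "samtools sort %(infile)s -o %(outfile)s",
    dict(infile="input.bam", outfile="sorted.bam"))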

is_test()

return True if the pipeline is run in a "testing" mode.

This method checks if --is-test has been given as a command line option.

Source code in cgatcore/pipeline/utils.py
def is_test():
    """return True if the pipeline is run in a "testing" mode.

    This method checks if ``-is-test`` has been given as a
    command line option.
    """
    return "--is-test" in sys.argv

is_true(param, **kwargs)

return True if param has a True value.

A parameter is False if it is:

  • not set
  • 0
  • the empty string
  • false or False

Otherwise the value is True.

Arguments

param : string
    Parameter to be tested.
kwargs : dict
    Dictionary of local configuration values. These will be passed to :func:substitute_parameters before evaluating param.

Returns

bool

Source code in cgatcore/pipeline/parameters.py
def is_true(param, **kwargs):
    '''return True if param has a True value.

    A parameter is False if it is:

    * not set
    * 0
    * the empty string
    * false or False

    Otherwise the value is True.

    Arguments
    ---------
    param : string
        Parameter to be tested
    kwargs : dict
        Dictionary of local configuration values. These will be passed
        to :func:`substitute_parameters` before evaluating `param`

    Returns
    -------
    bool

    '''
    if kwargs:
        p = substitute_parameters(**kwargs)
    else:
        p = PARAMS
    value = p.get(param, 0)
    return value not in (0, '', 'false', 'False')
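
A small sketch, assuming a hypothetical boolean flag run_qc in the configuration; unset values, 0, the empty string, "false" and "False" all evaluate as False.

from cgatcore import pipeline as P

if P.is_true("run_qc"):
    print("quality control is enabled")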

join_statements(statements, infile, outfile=None)

join a chain of statements into a single statement.

Each statement contains an @IN@ or a @OUT@ placeholder or both. These will be replaced by the names of successive temporary files.

In the first statement, @IN@ is replaced with infile; if outfile is given, @OUT@ in the last statement is replaced with outfile.

Arguments

statements : list
    A list of command line statements.
infile : string
    Filename of the first data set.
outfile : string
    Filename of the target data set.

Returns

last_file : string
    Filename of the last file created; outfile, if given.
statement : string
    A command line statement built from merging the statements.
cleanup : string
    A command line statement for cleaning up.

Source code in cgatcore/pipeline/execution.py
def join_statements(statements, infile, outfile=None):
    '''join a chain of statements into a single statement.

    Each statement contains an @IN@ or a @OUT@ placeholder or both.
    These will be replaced by the names of successive temporary files.

    In the first statement, @IN@ is replaced with `infile` and, if given,
    the @OUT@ is replaced by outfile in the last statement.

    Arguments
    ---------
    statements : list
        A list of command line statements.
    infile : string
        Filename of the first data set.
    outfile : string
        Filename of the target data set.

    Returns
    -------
    last_file : string
        Filename of last file created, outfile, if given.
    statement : string
        A command line statement built from merging the statements
    cleanup : string
        A command line statement for cleaning up.

    '''

    prefix = get_temp_filename()

    pattern = "%s_%%i" % prefix

    result = []
    for x, statement in enumerate(statements):
        s = statement
        if x == 0:
            if infile is not None:
                s = re.sub("@IN@", infile, s)
        else:
            s = re.sub("@IN@", pattern % x, s)
            if x > 2:
                s = re.sub("@IN-2@", pattern % (x - 2), s)
            if x > 1:
                s = re.sub("@IN-1@", pattern % (x - 1), s)

        s = re.sub("@OUT@", pattern % (x + 1), s).strip()

        if s.endswith(";"):
            s = s[:-1]
        result.append(s)

    result = "; ".join(result)
    last_file = pattern % (x + 1)
    if outfile:
        result = re.sub(last_file, outfile, result)
        last_file = outfile

    assert prefix != ""
    return last_file, result, "rm -f %s*" % prefix
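
A sketch chaining three commands through temporary intermediate files; the filenames are hypothetical. The returned cleanup statement removes the temporary files.

from cgatcore import pipeline as P

statements = [
    "gunzip < @IN@ > @OUT@",
    "grep -v '^#' @IN@ > @OUT@",
    "gzip < @IN@ > @OUT@"]

last_file, statement, cleanup = P.join_statements(
    statements, "input.txt.gz", outfile="filtered.txt.gz")

P.run(statement + "; " + cleanup)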

load(infile, outfile=None, options='', collapse=False, transpose=False, tablename=None, retry=True, limit=0, shuffle=False, job_memory=None, to_cluster=False)

import data from a tab-separated file into database.

The table name is given by outfile without the ".load" suffix.

A typical load task in ruffus would look like this::

@transform("*.tsv.gz", suffix(".tsv.gz"), ".load")
def loadData(infile, outfile):
    P.load(infile, outfile)

Upload is performed via the :doc:csv2db script.

Arguments

infile : string
    Filename of the input data.
outfile : string
    Output filename. This will contain the logging information. The table name is derived from outfile if tablename is not set.
options : string
    Command line options for the csv2db.py script.
collapse : string
    If set, the table will be collapsed before loading. This transforms a data set with two columns, where the first column is the row name, into a multi-column table. The value of collapse is the value used for missing values.
transpose : string
    If set, the table will be transposed before loading. The first column in the first row will be set to the string within transpose.
retry : bool
    If True, multiple attempts will be made if the data cannot be loaded at the first try, for example if a table is locked.
limit : int
    If set, only load the first n lines.
shuffle : bool
    If set, randomize lines before loading. Together with limit this permits loading a sample of rows.
job_memory : string
    Amount of memory to allocate for the job. If unset, uses the global default. Implies to_cluster=True.
to_cluster : bool
    By default, load jobs are not submitted to the cluster as they sometimes become blocked. Setting this to True overrides this behaviour.

Source code in cgatcore/pipeline/database.py
def load(infile,
         outfile=None,
         options="",
         collapse=False,
         transpose=False,
         tablename=None,
         retry=True,
         limit=0,
         shuffle=False,
         job_memory=None,
         to_cluster=False):
    """import data from a tab-separated file into database.

    The table name is given by outfile without the
    ".load" suffix.

    A typical load task in ruffus would look like this::

        @transform("*.tsv.gz", suffix(".tsv.gz"), ".load")
        def loadData(infile, outfile):
            P.load(infile, outfile)

    Upload is performed via the :doc:`csv2db` script.

    Arguments
    ---------
    infile : string
        Filename of the input data
    outfile : string
        Output filename. This will contain the logging information. The
        table name is derived from `outfile` if `tablename` is not set.
    options : string
        Command line options for the `csv2db.py` script.
    collapse : string
        If set, the table will be collapsed before loading. This
        transforms a data set with two columns where the first column
        is the row name into a multi-column table.  The value of
        collapse is the value used for missing values.
    transpose : string
        If set, the table will be transposed before loading. The first
        column in the first row will be set to the string within
        transpose.
    retry : bool
        If True, multiple attempts will be made if the data can
        not be loaded at the first try, for example if a table is locked.
    limit : int
        If set, only load the first n lines.
    shuffle : bool
        If set, randomize lines before loading. Together with `limit`
        this permits loading a sample of rows.
    job_memory : string
        Amount of memory to allocate for job. If unset, uses the global
        default. Implies to_cluster=True.
    to_cluster : bool
        By default load jobs are not submitted to the cluster as they sometimes
        become blocked. Setting this true will override this behavoir.
    """

    if job_memory is None:
        job_memory = get_params()["cluster_memory_default"]

    if not tablename:
        tablename = to_table(outfile)

    statement = []

    if infile.endswith(".gz"):
        statement.append("zcat %(infile)s")
    else:
        statement.append("cat %(infile)s")

    if collapse:
        statement.append(
            "python -m cgatcore.table "
            "--log=%(outfile)s.collapse.log "
            "--collapse=%(collapse)s")

    if transpose:
        statement.append(
            "python -m cgatcore.table "
            "--log=%(outfile)s.transpose.log "
            "--transpose "
            "--set-transpose-field=%(transpose)s")

    if shuffle:
        statement.append(
            "python -m cgatcore.table "
            "--log=%(outfile)s.shuffle.log "
            "--method=randomize-rows")

    if limit > 0:
        # use awk to filter in order to avoid a pipeline broken error from head
        statement.append("awk 'NR > %i {exit(0)} {print}'" % (limit + 1))
        # ignore errors from cat or zcat due to broken pipe
        ignore_pipe_errors = True

    statement.append(build_load_statement(tablename,
                                          options=options,
                                          retry=retry))

    statement = " | ".join(statement) + " > %(outfile)s"

    run(statement)

load_from_iterator(outfile, tablename, iterator, columns=None, indices=None)

import data from an iterator into a database.

Arguments

outfile : string
    Output file name.
tablename : string
    Table name.
iterator : iterator
    Iterator to import data from. The iterator should yield either
    lists/tuples or dictionaries for each row in the table.
columns : list
    Column names. If not given, the assumption is that the iterator
    will yield dictionaries and column names are derived from those.
indices : list
    List of column names to add indices on.
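
A minimal sketch of a task using the documented interface (the filenames, table name and generator are hypothetical)::

    def result_rows(infiles):
        # yield one dictionary per row of the target table
        for infile in infiles:
            yield {"track": os.path.basename(infile),
                   "status": "processed"}

    @merge("*.tsv", "file_status.load")
    def loadFileStatus(infiles, outfile):
        P.load_from_iterator(outfile,
                             tablename="file_status",
                             iterator=result_rows(infiles),
                             indices=["track"])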

Source code in cgatcore/pipeline/database.py
def load_from_iterator(
        outfile,
        tablename,
        iterator,
        columns=None,
        indices=None):
    '''import data from an iterator into a database.

    Arguments
    ---------
    outfile : string
        Output file name
    tablename : string
        Table name
    iterator : iterator
        Iterator to import data from. The iterator should
        yield either list/tuples or dictionaries for each
        row in the table.
    columns : list
        Column names. If not given, the assumption is that
        iterator will dictionaries and column names are derived
        from that.
    indices : list
        List of column names to add indices on.
    '''

    tmpfile = get_temp_file(".")

    if columns:
        keys, values = list(zip(*list(columns.items())))
        tmpfile.write("\t".join(values) + "\n")

    for row in iterator:
        if not columns:
            keys = list(row[0].keys())
            values = keys
            columns = keys
            tmpfile.write("\t".join(values) + "\n")

        tmpfile.write("\t".join(str(row[x]) for x in keys) + "\n")

    tmpfile.close()

    if indices:
        indices = " ".join("--add-index=%s" % x for x in indices)
    else:
        indices = ""

    load(tmpfile.name,
         outfile,
         tablename=tablename,
         options=indices)

    os.unlink(tmpfile.name)

main(argv=None)

command line control function for a pipeline.

This method defines command line options for the pipeline and updates the global configuration dictionary correspondingly.

It then provides a command parser to execute particular tasks using the ruffus pipeline control functions. See the generated command line help for usage.

To use it, add::

from cgatcore import pipeline as P

if __name__ == "__main__":
    sys.exit(P.main(sys.argv))

to your pipeline script.

Arguments

argv : list
    List of command line arguments. If None, sys.argv is used.

Source code in cgatcore/pipeline/control.py
def main(argv=None):
    """command line control function for a pipeline.

    This method defines command line options for the pipeline and
    updates the global configuration dictionary correspondingly.

    It then provides a command parser to execute particular tasks
    using the ruffus pipeline control functions. See the generated
    command line help for usage.

    To use it, add::

        import CGAT.pipeline as P

        if __name__ == "__main__":
            sys.exit(P.main(sys.argv))

    to your pipeline script.

    Arguments
    ---------
    args : list
        List of command line arguments.

    """

    if argv is None:
        argv = sys.argv

    if E.get_args() is None:
        initialize(caller=get_caller().__file__)

    args = E.get_args()

    run_workflow(args)

match_parameter(param)

find an exact match or prefix match for param in the global configuration dictionary.

Arguments

param : string
    Parameter to search for.

Returns

name : string
    The full parameter name.

Raises

KeyError if param can't be matched.
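
A sketch of the matching behaviour, assuming two hypothetical configuration keys (one exact, one containing a % wildcard)::

    # PARAMS contains {"bwa_threads": 4, "featurecounts_%": "--primary"}
    match_parameter("bwa_threads")           # -> "bwa_threads" (exact match)
    match_parameter("featurecounts_strand")  # -> "featurecounts_%" (wildcard match)
    match_parameter("unknown_option")        # raises KeyError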

Source code in cgatcore/pipeline/parameters.py
def match_parameter(param):
    '''find an exact match or prefix-match in the global
    configuration dictionary param.

    Arguments
    ---------
    param : string
        Parameter to search for.

    Returns
    -------
    name : string
        The full parameter name.

    Raises
    ------
    KeyError if param can't be matched.

    '''
    if param in PARAMS:
        return param

    for key in list(PARAMS.keys()):
        if "%" in key:
            rx = re.compile(re.sub("%", ".*", key))
            if rx.search(param):
                return key

    raise KeyError("parameter '%s' can not be matched in dictionary" %
                   param)

merge_and_load(infiles, outfile, suffix=None, columns=(0, 1), regex=None, row_wise=True, retry=True, options='', prefixes=None)

merge multiple categorical tables and load into a database.

The tables are merged and entered row-wise, i.e., the contents of each file become a row.

For example, the statement::

    merge_and_load(['file1.txt', 'file2.txt'],
                   "test_table.load")

with the two files::

    > cat file1.txt
    Category    Result
    length      12
    width       100

    > cat file2.txt
    Category    Result
    length      20
    width       50

will be added into table test_table as::

    track   length   width
    file1   12       100
    file2   20       50

If row_wise is set::

    merge_and_load(['file1.txt', 'file2.txt'],
                   "test_table.load", row_wise=True)

test_table will be transposed and look like this::

    track    file1   file2
    length   12      20
    width    100     50

Arguments

infiles : list
    Filenames of the input data.
outfile : string
    Output filename. This will contain the logging information. The
    table name is derived from outfile.
suffix : string
    If suffix is given, the suffix will be removed from the filenames.
columns : list
    The columns to be taken. By default, the first two columns are
    taken, with the first being the key. Filenames are stored in a
    track column. Directory names are chopped off. If columns is set
    to None, all columns will be taken. Here, column names will
    receive a prefix given by prefixes. If prefixes is None, the
    filename will be added as a prefix.
regex : string
    If set, the full filename will be used to extract a track name
    via the supplied regular expression.
row_wise : bool
    If set to False, each table will be a column in the resulting
    table. This is useful if histograms are being merged.
retry : bool
    If True, multiple attempts will be made if the data can not be
    loaded at the first try, for example if a table is locked.
options : string
    Command line options for the csv2db.py script.
prefixes : list
    If given, the respective prefix will be added to each column. The
    number of prefixes and infiles needs to be the same.
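
Within a pipeline, merge_and_load is typically called from a @merge task; a sketch with hypothetical filenames::

    @merge("*.idxstats.tsv", "idxstats.load")
    def loadIdxstats(infiles, outfile):
        P.merge_and_load(infiles, outfile, suffix=".idxstats.tsv")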

Source code in cgatcore/pipeline/database.py
def merge_and_load(infiles,
                   outfile,
                   suffix=None,
                   columns=(0, 1),
                   regex=None,
                   row_wise=True,
                   retry=True,
                   options="",
                   prefixes=None):
    '''merge multiple categorical tables and load into a database.

    The tables are merged and entered row-wise, i.e, the contents of
    each file are a row.

    For example, the statement::

        mergeAndLoad(['file1.txt', 'file2.txt'],
                     "test_table.load")

    with the two files::
        > cat file1.txt
        Category    Result
        length      12
        width       100

        > cat file2.txt
        Category    Result
        length      20
        width       50

    will be added into table ``test_table`` as::
        track   length   width
        file1   12       100
        file2   20       50

    If row-wise is set::
        mergeAndLoad(['file1.txt', 'file2.txt'],
                     "test_table.load", row_wise=True)

    ``test_table`` will be transposed and look like this::
        track    file1 file2
        length   12    20
        width    20    50

    Arguments
    ---------
    infiles : list
        Filenames of the input data
    outfile : string
        Output filename. This will contain the logging information. The
        table name is derived from `outfile`.
    suffix : string
        If `suffix` is given, the suffix will be removed from the filenames.
    columns : list
        The columns to be taken. By default, the first two columns are
        taken with the first being the key. Filenames are stored in a
        ``track`` column. Directory names are chopped off.  If
        `columns` is set to None, all columns will be taken. Here,
        column names will receive a prefix given by `prefixes`. If
        `prefixes` is None, the filename will be added as a prefix.
    regex : string
        If set, the full filename will be used to extract a
        track name via the supplied regular expression.
    row_wise : bool
        If set to False, each table will be a column in the resulting
        table.  This is useful if histograms are being merged.
    retry : bool
        If True, multiple attempts will be made if the data can
        not be loaded at the first try, for example if a table is locked.
    options : string
        Command line options for the `csv2db.py` script.
    prefixes : list
        If given, the respective prefix will be added to each
        column. The number of `prefixes` and `infiles` needs to be the
        same.
    '''
    if len(infiles) == 0:
        raise ValueError("no files for merging")

    if suffix:
        header = ",".join([os.path.basename(snip(x, suffix)) for x in infiles])
    elif regex:
        header = ",".join(["-".join(re.search(regex, x).groups())
                           for x in infiles])
    else:
        header = ",".join([os.path.basename(x) for x in infiles])

    header_stmt = "--header-names=%s" % header

    if columns:
        column_filter = "| cut -f %s" % ",".join(map(str,
                                                     [x + 1 for x in columns]))
    else:
        column_filter = ""
        if prefixes:
            assert len(prefixes) == len(infiles)
            header_stmt = "--prefixes=%s" % ",".join(prefixes)
        else:
            header_stmt = "--add-file-prefix"

    if infiles[0].endswith(".gz"):
        filenames = " ".join(
            ["<( zcat %s %s )" % (x, column_filter) for x in infiles])
    else:
        filenames = " ".join(
            ["<( cat %s %s )" % (x, column_filter) for x in infiles])

    if row_wise:
        transform = """| perl -p -e "s/bin/track/"
        | python -m cgatcore.table --transpose"""
    else:
        transform = ""

    load_statement = build_load_statement(
        to_table(outfile),
        options="--add-index=track " + options,
        retry=retry)

    statement = """python -m cgatcore.tables
    %(header_stmt)s
    --skip-titles
    --missing-value=0
    --ignore-empty
    %(filenames)s
    %(transform)s
    | %(load_statement)s
    > %(outfile)s
    """
    run(statement)

nested_update(old, new)

Update potentially nested dictionaries. If both old[x] and new[x] inherit from collections.abc.Mapping, update old[x] with entries from new[x]; otherwise, set old[x] to new[x].
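
For example, updating a nested cluster configuration merges the inner mapping instead of replacing it (a small illustrative sketch)::

    old = {"cluster": {"queue": "all.q", "memory_default": "4G"},
           "threads": 1}
    new = {"cluster": {"memory_default": "8G"}, "threads": 2}
    nested_update(old, new)
    # old == {"cluster": {"queue": "all.q", "memory_default": "8G"},
    #         "threads": 2}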

Source code in cgatcore/pipeline/parameters.py
def nested_update(old, new):
    '''Update potentially nested dictionaries. If both old[x] and new[x]
    inherit from collections.abc.Mapping, then update old[x] with entries from
    new[x], otherwise set old[x] to new[x]'''

    for key, value in new.items():
        if isinstance(value, Mapping) and \
           isinstance(old.get(key, str()), Mapping):
            old[key].update(new[key])
        else:
            old[key] = new[key]

parse_commandline(argv=None, optparse=True, **kwargs)

parse command line.

Create option parser and parse command line.

Arguments

argv : list
    List of command line options to parse. If None, use sys.argv.

**kwargs : dict
    Additional arguments overwrite default option settings.

Source code in cgatcore/pipeline/control.py
def parse_commandline(argv=None, optparse=True, **kwargs):
    """parse command line.

    Create option parser and parse command line.

    Arguments
    ---------
    argv : list
        List of command line options to parse. If None, use sys.argv.

    **kwargs: dict
        Additional arguments overwrite default option settings.

    """
    if argv is None:
        argv = sys.argv

    if optparse is True:

        parser = E.OptionParser(version="%prog version: $Id$",
                                usage=USAGE)

        parser.add_option("--pipeline-action", dest="pipeline_action",
                          type="choice",
                          choices=(
                              "make", "show", "plot", "dump", "config",
                              "clone", "check", "regenerate", "state",
                              "printconfig", "svg"),
                          help="action to take [default=%default].")

        parser.add_option("--pipeline-format", dest="pipeline_format",
                          type="choice",
                          choices=("dot", "jpg", "svg", "ps", "png"),
                          help="pipeline format [default=%default].")

        parser.add_option("-n", "--dry-run", dest="dry_run",
                          action="store_true",
                          help="perform a dry run (do not execute any shell "
                          "commands) [default=%default].")

        parser.add_option("-c", "--config-file", dest="config_file",
                          help="benchmark configuration file "
                          "[default=%default].")

        parser.add_option("-f", "--force-run", dest="force_run",
                          type="string",
                          help="force running the pipeline even if there are "
                          "up-to-date tasks. If option is 'all', all tasks "
                          "will be rerun. Otherwise, only the tasks given as "
                          "arguments will be rerun. "
                          "[default=%default].")

        parser.add_option("-p", "--multiprocess", dest="multiprocess", type="int",
                          help="number of parallel processes to use on "
                          "submit host "
                          "(different from number of jobs to use for "
                          "cluster jobs) "
                          "[default=%default].")

        parser.add_option("-e", "--exceptions", dest="log_exceptions",
                          action="store_true",
                          help="echo exceptions immediately as they occur "
                          "[default=%default].")

        parser.add_option("-i", "--terminate", dest="terminate",
                          action="store_true",
                          help="terminate immediately at the first exception "
                          "[default=%default].")

        parser.add_option("-d", "--debug", dest="debug",
                          action="store_true",
                          help="output debugging information on console, "
                          "and not the logfile "
                          "[default=%default].")

        parser.add_option("-s", "--set", dest="variables_to_set",
                          type="string", action="append",
                          help="explicitely set paramater values "
                          "[default=%default].")

        parser.add_option("--input-glob", "--input-glob", dest="input_globs",
                          type="string", action="append",
                          help="glob expression for input filenames. The exact format "
                          "is pipeline specific. If the pipeline expects only a single input, "
                          "`--input-glob=*.bam` will be sufficient. If the pipeline expects "
                          "multiple types of input, a qualifier might need to be added, for example "
                          "`--input-glob=bam=*.bam` --input-glob=bed=*.bed.gz`. Giving this option "
                          "overrides the default of a pipeline looking for input in the current directory "
                          "or specified the config file. "
                          "[default=%default].")

        parser.add_option("--checksums", dest="ruffus_checksums_level",
                          type="int",
                          help="set the level of ruffus checksums"
                          "[default=%default].")

        parser.add_option("-t", "--is-test", dest="is_test",
                          action="store_true",
                          help="this is a test run"
                          "[default=%default].")

        parser.add_option("--engine", dest="engine",
                          choices=("local", "arvados"),
                          help="engine to use."
                          "[default=%default].")

        parser.add_option(
            "--always-mount", dest="always_mount",
            action="store_true",
            help="force mounting of arvados keep [%default]")

        parser.add_option("--only-info", dest="only_info",
                          action="store_true",
                          help="only update meta information, do not run "
                          "[default=%default].")

        parser.add_option("--work-dir", dest="work_dir",
                          type="string",
                          help="working directory. Will be created if it does not exist "
                          "[default=%default].")

        group = E.OptionGroup(parser, "pipeline logging configuration")

        group.add_option("--pipeline-logfile", dest="pipeline_logfile",
                         type="string",
                         help="primary logging destination."
                         "[default=%default].")

        group.add_option("--shell-logfile", dest="shell_logfile",
                         type="string",
                         help="filename for shell debugging information. "
                         "If it is not an absolute path, "
                         "the output will be written into the current working "
                         "directory. If unset, no logging will be output. "
                         "[default=%default].")

        parser.add_option("--input-validation", dest="input_validation",
                          action="store_true",
                          help="perform input validation before starting "
                          "[default=%default].")

        parser.add_option_group(group)

        parser.set_defaults(
            pipeline_action=None,
            pipeline_format="svg",
            pipeline_targets=[],
            force_run=False,
            multiprocess=None,
            pipeline_logfile="pipeline.log",
            shell_logfile=None,
            dry_run=False,
            log_exceptions=True,
            engine="local",
            exceptions_terminate_immediately=None,
            debug=False,
            variables_to_set=[],
            is_test=False,
            ruffus_checksums_level=0,
            config_file="pipeline.yml",
            work_dir=None,
            always_mount=False,
            only_info=False,
            input_globs=[],
            input_validation=False)

        parser.set_defaults(**kwargs)

        if "callback" in kwargs:
            kwargs["callback"](parser)

        logger_callback = setup_logging
        (options, args) = E.start(
            parser,
            add_cluster_options=True,
            argv=argv,
            logger_callback=logger_callback)
        options.pipeline_name = argv[0]
        if args:
            options.pipeline_action = args[0]
            options.pipeline_targets = args[1:]

    else:
        parser = E.ArgumentParser(description=USAGE)

        parser.add_argument("--pipeline-action", dest="pipeline_action",
                            type=str,
                            choices=(
                                "make", "show", "plot", "dump", "config",
                                "clone", "check", "regenerate", "state",
                                "printconfig", "svg"),
                            help="action to take.")

        parser.add_argument("--pipeline-format", dest="pipeline_format",
                            type=str,
                            choices=("dot", "jpg", "svg", "ps", "png"),
                            help="pipeline format.")

        parser.add_argument("-n", "--dry-run", dest="dry_run",
                            action="store_true",
                            help="perform a dry run (do not execute any shell "
                            "commands).")

        parser.add_argument("-c", "--config-file", dest="config_file",
                            help="benchmark configuration file ")

        parser.add_argument("-f", "--force-run", dest="force_run",
                            type=str,
                            help="force running the pipeline even if there are "
                            "up-to-date tasks. If option is 'all', all tasks "
                            "will be rerun. Otherwise, only the tasks given as "
                            "arguments will be rerun. ")

        parser.add_argument("-p", "--multiprocess", dest="multiprocess", type=int,
                            help="number of parallel processes to use on "
                            "submit host "
                            "(different from number of jobs to use for "
                            "cluster jobs) ")

        parser.add_argument("-e", "--exceptions", dest="log_exceptions",
                            action="store_true",
                            help="echo exceptions immediately as they occur ")

        parser.add_argument("-i", "--terminate", dest="terminate",
                            action="store_true",
                            help="terminate immediately at the first exception")

        parser.add_argument("-d", "--debug", dest="debug",
                            action="store_true",
                            help="output debugging information on console, "
                            "and not the logfile ")

        parser.add_argument("-s", "--set", dest="variables_to_set",
                            type=str, action="append",
                            help="explicitely set paramater values ")

        parser.add_argument("--input-glob", "--input-glob", dest="input_globs",
                            type=str, action="append",
                            help="glob expression for input filenames. The exact format "
                            "is pipeline specific. If the pipeline expects only a single input, "
                            "`--input-glob=*.bam` will be sufficient. If the pipeline expects "
                            "multiple types of input, a qualifier might need to be added, for example "
                            "`--input-glob=bam=*.bam` --input-glob=bed=*.bed.gz`. Giving this option "
                            "overrides the default of a pipeline looking for input in the current directory "
                            "or specified the config file.")

        parser.add_argument("--checksums", dest="ruffus_checksums_level",
                            type=int,
                            help="set the level of ruffus checksums")

        parser.add_argument("-t", "--is-test", dest="is_test",
                            action="store_true",
                            help="this is a test run")

        parser.add_argument("--engine", dest="engine",
                            type=str,
                            choices=("local", "arvados"),
                            help="engine to use.")

        parser.add_argument(
            "--always-mount", dest="always_mount",
            action="store_true",
            help="force mounting of arvados keep")

        parser.add_argument("--only-info", dest="only_info",
                            action="store_true",
                            help="only update meta information, do not run")

        parser.add_argument("--work-dir", dest="work_dir",
                            type=str,
                            help="working directory. Will be created if it does not exist")

        parser.add_argument("--cleanup-on-fail", action="store_true", default=True,
                            help="Enable cleanup of jobs on pipeline failure.")

        group = parser.add_argument_group("pipeline logging configuration")

        group.add_argument("--pipeline-logfile", dest="pipeline_logfile",
                           type=str,
                           help="primary logging destination.")

        group.add_argument("--shell-logfile", dest="shell_logfile",
                           type=str,
                           help="filename for shell debugging information. "
                           "If it is not an absolute path, "
                           "the output will be written into the current working "
                           "directory. If unset, no logging will be output.")

        group.add_argument("--input-validation", dest="input_validation",
                           action="store_true",
                           help="perform input validation before starting")

        parser.set_defaults(
            pipeline_action=None,
            pipeline_format="svg",
            pipeline_targets=[],
            force_run=False,
            multiprocess=None,
            pipeline_logfile="pipeline.log",
            shell_logfile=None,
            dry_run=False,
            log_exceptions=True,
            engine="local",
            exceptions_terminate_immediately=None,
            debug=False,
            variables_to_set=[],
            is_test=False,
            ruffus_checksums_level=0,
            config_file="pipeline.yml",
            work_dir=None,
            always_mount=False,
            only_info=False,
            input_globs=[],
            input_validation=False)

        parser.set_defaults(**kwargs)

        if "callback" in kwargs:
            kwargs["callback"](parser)

        logger_callback = setup_logging
        args, unknown = E.start(
            parser,
            add_cluster_options=True,
            argv=argv,
            logger_callback=logger_callback,
            unknowns=True)

        args.pipeline_name = argv[0]

peek_parameters(workingdir, pipeline, on_error_raise=None, prefix=None, update_interface=False, restrict_interface=False)

peek configuration parameters from external pipeline.

As the parameter dictionary is built at runtime, this method executes the pipeline in workingdir, dumping its configuration values and reading them into a dictionary.

If either pipeline or workingdir is not found, an error is raised. This behaviour can be changed by setting on_error_raise to False. In that case, an empty dictionary is returned.

Arguments

workingdir : string
    Working directory. This is the directory that the pipeline was
    executed in.
pipeline : string
    Name of the pipeline script. The pipeline is assumed to live in
    the same directory as the current pipeline.
on_error_raise : bool
    If set to a boolean, an error will be raised (or not) if there is
    an error during parameter peeking, for example if workingdir can
    not be found. If on_error_raise is None, it will be set to the
    default, which is to raise an exception unless the calling script
    is imported or the option --is-test has been passed at the
    command line.
prefix : string
    Add a prefix to all parameters. This is useful if the parameters
    are added to the configuration dictionary of the calling pipeline.
update_interface : bool
    If True, this method will prefix any options in the [interface]
    section with workingdir. This allows transparent access to files
    in the external pipeline.
restrict_interface : bool
    If True, only interface parameters will be imported.

Returns

config : dict
    Dictionary of configuration values.
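
A sketch of importing the interface parameters of an upstream pipeline run (the directory, pipeline name and prefix are hypothetical)::

    external = P.peek_parameters("../annotations",
                                 "genesets",
                                 prefix="annotations_",
                                 update_interface=True,
                                 restrict_interface=True)
    P.get_params().update(external)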

Source code in cgatcore/pipeline/control.py
def peek_parameters(workingdir,
                    pipeline,
                    on_error_raise=None,
                    prefix=None,
                    update_interface=False,
                    restrict_interface=False):
    '''peek configuration parameters from external pipeline.

    As the paramater dictionary is built at runtime, this method
    executes the pipeline in workingdir, dumping its configuration
    values and reading them into a dictionary.

    If either `pipeline` or `workingdir` are not found, an error is
    raised. This behaviour can be changed by setting `on_error_raise`
    to False. In that case, an empty dictionary is returned.

    Arguments
    ---------
    workingdir : string
       Working directory. This is the directory that the pipeline
       was executed in.
    pipeline : string
       Name of the pipeline script. The pipeline is assumed to live
       in the same directory as the current pipeline.
    on_error_raise : Bool
       If set to a boolean, an error will be raised (or not) if there
       is an error during parameter peeking, for example if
       `workingdir` can not be found. If `on_error_raise` is None, it
       will be set to the default, which is to raise an exception
       unless the calling script is imported or the option
       ``--is-test`` has been passed at the command line.
    prefix : string
       Add a prefix to all parameters. This is useful if the paramaters
       are added to the configuration dictionary of the calling pipeline.
    update_interface : bool
       If True, this method will prefix any options in the
       ``[interface]`` section with `workingdir`. This allows
       transparent access to files in the external pipeline.
    restrict_interface : bool
       If  True, only interface parameters will be imported.

    Returns
    -------
    config : dict
        Dictionary of configuration values.

    '''
    caller_locals = get_caller_locals()

    # check if we should raise errors
    if on_error_raise is None:
        on_error_raise = not is_test() and \
            "__name__" in caller_locals and \
            caller_locals["__name__"] == "__main__"

    # patch - if --help or -h in command line arguments,
    # do not peek as there might be no config file.
    if "--help" in sys.argv or "-h" in sys.argv:
        return {}

    if workingdir == "":
        workingdir = os.path.abspath(".")

    # patch for the "config" target - use default
    # pipeline directory if directory is not specified
    # working dir is set to "?!"
    if ("config" in sys.argv or "check" in sys.argv or "clone" in sys.argv and workingdir == "?!"):
        workingdir = os.path.join(get_params()["pipelinedir"],
                                  "pipeline_" + pipeline)

    if not os.path.exists(workingdir):
        if on_error_raise:
            raise ValueError(
                "can't find working dir %s" % workingdir)
        else:
            return {}

    statement = "cgatflow {} dump -v 0".format(pipeline)

    os.environ.update(
        {'BASH_ENV': os.path.join(os.environ['HOME'], '.bashrc')})
    process = subprocess.Popen(statement,
                               cwd=workingdir,
                               shell=True,
                               stdin=subprocess.PIPE,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               env=os.environ.copy())

    # process.stdin.close()
    stdout, stderr = process.communicate()
    if process.returncode != 0:
        raise OSError(
            ("Child was terminated by signal %i: \n"
             "Statement: %s\n"
             "The stderr was: \n%s\n"
             "Stdout: %s") %
            (-process.returncode, statement, stderr, stdout))

    # subprocess only accepts encoding argument in py >= 3.6 so
    # decode here.
    stdout = stdout.decode("utf-8").splitlines()
    # remove any log messages
    stdout = [x for x in stdout if x.startswith("{")]
    if len(stdout) > 1:
        raise ValueError("received multiple configurations")
    dump = json.loads(stdout[0])

    # update interface
    if update_interface:
        for key, value in list(dump.items()):
            if key.startswith("interface"):
                if isinstance(value, str):
                    dump[key] = os.path.join(workingdir, value)
                elif isinstance(value, Mapping):
                    for kkey, vvalue in list(value.items()):
                        value[kkey] = os.path.join(workingdir, vvalue)

    # keep only interface if so required
    if restrict_interface:
        dump = dict([(k, v) for k, v in dump.items()
                     if k.startswith("interface")])

    # prefix all parameters
    if prefix is not None:
        dump = dict([("%s%s" % (prefix, x), y) for x, y in list(dump.items())])

    return dump

print_config_files()

Print the list of .yml files used to configure the pipeline along with their associated priorities. Priority 1 is the highest.

Source code in cgatcore/pipeline/control.py
def print_config_files():
    '''
        Print the list of .ini files used to configure the pipeline
        along with their associated priorities.
        Priority 1 is the highest.
    '''

    filenames = get_params()['pipeline_yml']
    print("\n List of .yml files used to configure the pipeline")
    s = len(filenames)
    if s == 0:
        print(" No yml files passed!")
    elif s >= 1:
        print(" %-11s: %s " % ("Priority", "File"))
        for f in filenames:
            if s == 1:
                print(" (highest) %s: %s\n" % (s, f))
            else:
                print(" %-11s: %s " % (s, f))
            s -= 1

run(statement, **kwargs)

run a command line statement.

This function runs a single or multiple statements either locally or on the cluster using drmaa. How a statement is executed or how it is modified depends on the context.

The context is provided by keyword arguments passed as named function arguments ('kwargs'), but also from defaults (see below). The following keyword arguments are recognized:

job_memory
    Memory to use for the job per thread. The memory specification
    should be in a format that is accepted by the job scheduler. Note
    that memory is per thread. If you have 6 threads and the total
    memory is 6Gb, use 1G as job_memory.
job_total_memory
    Total memory to use for a job. This will be divided by the number
    of threads.
job_threads
    Number of threads to request for the job.
job_options
    Options to the job scheduler.
job_condaenv
    Conda environment to use for the job.
job_array
    If set, run the statement as an array job. job_array should be a
    tuple with start, end, and increment.

In addition, any additional variables will be used to interpolate the command line string using python's '%' string interpolation operator.

The context is built in a hierarchical manner with successive operations overwriting previous values.

  1. Global variables: the context is initialized with system-wide
     defaults stored in the global PARAMS singleton.
  2. Context of caller: the context of the calling function is
     examined and any local variables defined in this context are
     added.
  3. kwargs: any options given explicitly as options to the run()
     method are added.
  4. params: if the context of the calling function contains a params
     variable, its contents are added to the context. This permits
     setting variables in configuration files in TaskLibrary
     functions.

By default, a job is sent to the cluster, unless:

* ``to_cluster`` is present and set to None.

* ``without_cluster`` is True.

* ``--local`` has been specified on the command line
  and the option ``without_cluster`` has been set as
  a result.

* no libdrmaa is present

* the global session is not initialized (GLOBAL_SESSION is
  None)

Troubleshooting:

  1. DRMAA creates sessions and there is a limited number of sessions
     available. If there are too many sessions, or sessions become
     unavailable after failed jobs, use qconf -secl to list sessions
     and qconf -kec # to delete sessions.

  2. Memory: 1G of free memory can be requested using the job_memory
     variable: job_memory = "1G". If there are error messages like
     "no available queue", then the problem could be that a particular
     complex attribute has not been defined (the code should be hc for
     host:complex and not hl for host:local). Note that qrsh/qsub
     directly still works.

The job will be executed within PARAMS["work_dir"], unless PARAMS["work_dir"] is not local. In that case, the job will be executed in a shared temporary directory.

Arguments

statement : string or list of strings
    A command line statement or a list of command line statements to
    be executed.
kwargs : dictionary
    Context for job. The context is used to interpolate the command
    line statement.
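
A sketch of a task that interpolates its local variables into the statement and requests cluster resources through the job_* keywords (the command and filenames are hypothetical)::

    @transform("*.bam", suffix(".bam"), ".sorted.bam")
    def sortBam(infile, outfile):
        job_memory = "4G"
        job_threads = 4
        statement = ("samtools sort -@ %(job_threads)i "
                     "-o %(outfile)s %(infile)s")
        P.run(statement)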

Source code in cgatcore/pipeline/execution.py
def run(statement, **kwargs):
    """run a command line statement.

    This function runs a single or multiple statements either locally
    or on the cluster using drmaa. How a statement is executed or how
    it is modified depends on the context.

    The context is provided by keyword arguments provided as named
    function arguments ('kwargs') but also from defaults (see
    below). The following keyword arguments are recognized:

    job_memory
        memory to use for the job per thread. Memory specification should be in a
        format that is accepted by the job scheduler. Note that memory
        is per thread. If you have 6 threads and the total memory is
        6Gb, use 1G as job_memory.
    job_total_memory
        total memory to use for a job. This will be divided by the number of
        threads.
    job_threads
        number of threads to request for the job.
    job_options
        options to the job scheduler.
    job_condaenv
        conda environment to use for the job.
    job_array
        if set, run statement as an array job. Job_array should be
        tuple with start, end, and increment.

    In addition, any additional variables will be used to interpolate
    the command line string using python's '%' string interpolation
    operator.

    The context is build in a hierarchical manner with successive
    operations overwriting previous values.

    1. Global variables
       The context is initialized
       with system-wide defaults stored in the global PARAMS
       singleton.
    2. Context of caller
       The context of the calling function is examined
       and any local variables defined in this context are added.
    3. kwargs
       Any options given explicitely as options to the run() method
       are added.
    4. params
       If the context of the calling function contains a params
       variable, its contents are added to the context. This permits
       setting variables in configuration files in TaskLibrary
       functions.

    By default, a job is sent to the cluster, unless:

        * ``to_cluster`` is present and set to None.

        * ``without_cluster`` is True.

        * ``--local`` has been specified on the command line
          and the option ``without_cluster`` has been set as
          a result.

        * no libdrmaa is present

        * the global session is not initialized (GLOBAL_SESSION is
          None)

    Troubleshooting:

       1. DRMAA creates sessions and their is a limited number
          of sessions available. If there are two many or sessions
          become not available after failed jobs, use ``qconf -secl``
          to list sessions and ``qconf -kec #`` to delete sessions.

       2. Memory: 1G of free memory can be requested using the job_memory
          variable: ``job_memory = "1G"``
          If there are error messages like "no available queue", then the
          problem could be that a particular complex attribute has
          not been defined (the code should be ``hc`` for ``host:complex``
          and not ``hl`` for ``host:local``. Note that qrsh/qsub directly
          still works.

    The job will be executed within PARAMS["work_dir"], unless
    PARAMS["work_dir"] is not local. In that case, the job will
    be executed in a shared temporary directory.

    Arguments
    ---------
    statement : string or list of strings
        A command line statement or a list of command line statements
        to be executed.
    kwargs : dictionary
        Context for job. The context is used to interpolate the command
        line statement.

    """
    logger = get_logger()

    # Combine options using priority
    options = dict(list(get_params().items()))
    caller_options = get_caller_locals()
    options.update(list(caller_options.items()))

    if "self" in options:
        del options["self"]
    options.update(list(kwargs.items()))

    # Inject params named tuple from TaskLibrary functions into option
    # dict. This allows overriding options set in the code with options set
    # in a .yml file
    if "params" in options:
        try:
            options.update(options["params"]._asdict())
        except AttributeError:
            pass

    # Insert parameters supplied through simplified interface such
    # as job_memory, job_options, job_queue
    options['cluster']['options'] = options.get(
        'job_options', options['cluster']['options'])
    options['cluster']['queue'] = options.get(
        'job_queue', options['cluster']['queue'])
    options['without_cluster'] = options.get('without_cluster')

    # SGE compatible job_name
    name_substrate = str(options.get("outfile", "cgatcore"))
    if os.path.basename(name_substrate).startswith("result"):
        name_substrate = os.path.basename(os.path.dirname(name_substrate))
    else:
        name_substrate = os.path.basename(name_substrate)

    options["job_name"] = re.sub("[:]", "_", name_substrate)
    try:
        calling_module = get_caller().__name__
    except AttributeError:
        calling_module = "unknown"

    options["task_name"] = calling_module + "." + get_calling_function()

    # Build statements using parameter interpolation
    if isinstance(statement, list):
        statement_list = [interpolate_statement(stmt, options) for stmt in statement]
    else:
        statement_list = [interpolate_statement(statement, options)]

    if len(statement_list) == 0:
        logger.warn("No statements found - no execution")
        return []

    if options.get("dryrun", False):
        for statement in statement_list:
            logger.info("Dry-run: {}".format(statement))
        return []

    # Use get_executor to get the appropriate executor
    executor = get_executor(options)  # Updated to use get_executor

    # Execute statement list within the context of the executor
    with executor as e:
        benchmark_data = e.run(statement_list)

    # Log benchmark data
    for data in benchmark_data:
        logger.info(json.dumps(data))

    BenchmarkData = collections.namedtuple('BenchmarkData', sorted(benchmark_data[0]))
    return [BenchmarkData(**d) for d in benchmark_data]

run_pickled(params)

run a function whose arguments have been pickled.

expects that params is [module_name, function_name, arguments_file]

Source code in cgatcore/pipeline/execution.py
def run_pickled(params):
    ''' run a function whose arguments have been pickled.

    expects that params is [module_name, function_name, arguments_file] '''

    module_name, func_name, args_file = params
    location = os.path.dirname(module_name)
    if location != "":
        sys.path.append(location)

    module_base_name = os.path.basename(module_name)
    logger = get_logger()
    logger.info("importing module '%s' " % module_base_name)
    logger.debug("sys.path is: %s" % sys.path)

    module = importlib.import_module(module_base_name)
    try:
        function = getattr(module, func_name)
    except AttributeError as msg:
        raise AttributeError(msg.message
                             + "unknown function, available functions are: %s" %
                             ",".join([x for x in dir(module)
                                       if not x.startswith("_")]))

    args, kwargs = pickle.load(open(args_file, "rb"))
    logger.info("arguments = %s" % str(args))
    logger.info("keyword arguments = %s" % str(kwargs))

    function(*args, **kwargs)

    os.unlink(args_file)

shellquote(statement)

shell quote a string to be used as a function argument.

from http://stackoverflow.com/questions/967443/python-module-to-shellquote-unshellquote
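
For example (illustrative; a backslash is inserted before each shell metacharacter and the empty string is quoted)::

    shellquote("my file (v2).txt")   # -> "my\ file\ \(v2\).txt"
    shellquote("")                   # -> "''"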

Source code in cgatcore/pipeline/execution.py
def shellquote(statement):
    '''shell quote a string to be used as a function argument.

    from http://stackoverflow.com/questions/967443/
    python-module-to-shellquote-unshellquote
    '''
    _quote_pos = re.compile('(?=[^-0-9a-zA-Z_./\n])')

    if statement:
        return _quote_pos.sub('\\\\', statement).replace('\n', "'\n'")
    else:
        return "''"

snip(filename, extension=None, alt_extension=None, strip_path=False)

return prefix of filename, that is the part without the extension.

If extension is given, make sure that filename has the extension (or alt_extension). Both extension or alt_extension can be list of extensions.

If strip_path is set to true, the path is stripped from the file name.
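
For example::

    snip("sample1.fastq.gz", ".fastq.gz")       # -> "sample1"
    snip("/data/sample1.fastq.gz", ".fastq.gz",
         strip_path=True)                       # -> "sample1"
    snip("table.tsv")                           # -> "table"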

Source code in cgatcore/iotools.py
def snip(filename, extension=None, alt_extension=None,
         strip_path=False):
    '''return prefix of `filename`, that is the part without the
    extension.

    If `extension` is given, make sure that filename has the
    extension (or `alt_extension`). Both extension or alt_extension
    can be list of extensions.

    If `strip_path` is set to true, the path is stripped from the file
    name.

    '''
    if extension is None:
        extension = []
    elif isinstance(extension, str):
        extension = [extension]

    if alt_extension is None:
        alt_extension = []
    elif isinstance(alt_extension, str):
        alt_extension = [alt_extension]

    if extension:
        for ext in extension + alt_extension:
            if filename.endswith(ext):
                root = filename[:-len(ext)]
                break
        else:
            raise ValueError("'%s' expected to end in '%s'" %
                             (filename, ",".join(
                                 extension + alt_extension)))
    else:
        root, ext = os.path.splitext(filename)

    if strip_path:
        snipped = os.path.basename(root)
    else:
        snipped = root

    return snipped

start_session()

start and initialize the global DRMAA session.

Source code in cgatcore/pipeline/execution.py
def start_session():
    """start and initialize the global DRMAA session."""
    global GLOBAL_SESSION

    if HAS_DRMAA and GLOBAL_SESSION is None:
        GLOBAL_SESSION = drmaa.Session()
        try:
            GLOBAL_SESSION.initialize()
        except drmaa.errors.InternalException as ex:
            get_logger().warn("could not initialize global drmaa session: {}".format(
                ex))
            GLOBAL_SESSION = None
        return GLOBAL_SESSION

submit(module, function, args=None, infiles=None, outfiles=None, to_cluster=True, logfile=None, job_options='', job_threads=1, job_memory=False)

submit a python function as a job to the cluster.

This method runs the script run_function using the run method in this module, thus providing the same control options as for command line tools.

Arguments

module : string
    Module name that contains the function. If module is not part of
    the PYTHONPATH, an absolute path can be given.
function : string
    Name of function to execute.
infiles : string or list
    Filenames of input data.
outfiles : string or list
    Filenames of output data.
logfile : filename
    Logfile to provide to the --log option.
job_options : string
    String for generic job options for the queuing system.
job_threads : int
    Number of slots (threads/cores/CPUs) to use for the task.
job_memory : string
    Amount of memory to reserve for the job.

Source code in cgatcore/pipeline/execution.py
def submit(module,
           function,
           args=None,
           infiles=None,
           outfiles=None,
           to_cluster=True,
           logfile=None,
           job_options="",
           job_threads=1,
           job_memory=False):
    '''submit a python *function* as a job to the cluster.

    This method runs the script :file:`run_function` using the
    :func:`run` method in this module thus providing the same
    control options as for command line tools.

    Arguments
    ---------
    module : string
        Module name that contains the function. If `module` is
        not part of the PYTHONPATH, an absolute path can be given.
    function : string
        Name of function to execute
    infiles : string or list
        Filenames of input data
    outfiles : string or list
        Filenames of output data
    logfile : filename
        Logfile to provide to the ``--log`` option
    job_options : string
        String for generic job options for the queuing system
    job_threads : int
        Number of slots (threads/cores/CPU) to use for the task
    job_memory : string
        Amount of memory to reserve for the job.

    '''

    if not job_memory:
        job_memory = get_params().get("cluster_memory_default", "2G")

    if type(infiles) in (list, tuple):
        infiles = " ".join(["--input=%s" % x for x in infiles])
    else:
        infiles = "--input=%s" % infiles

    if type(outfiles) in (list, tuple):
        outfiles = " ".join(["--output-section=%s" % x for x in outfiles])
    else:
        outfiles = "--output-section=%s" % outfiles

    if logfile:
        logfile = "--log=%s" % logfile
    else:
        logfile = ""

    if args:
        args = "--args=%s" % ",".join(args)
    else:
        args = ""

    statement = (
        "python -m cgatcore.pipeline.run_function "
        "--module=%(module)s "
        "--function=%(function)s "
        "%(logfile)s "
        "%(infiles)s "
        "%(outfiles)s "
        "%(args)s")
    run(statement)

substitute_parameters(**kwargs)

return a parameter dictionary.

This method builds a dictionary of parameter values to apply for a specific task. The dictionary is built in the following order:

  1. take values from the global dictionary (PARAMS)
  2. substitute values appearing in kwargs.
  3. Apply task specific configuration values by looking for the presence of outfile in kwargs.

The substitution of task specific values works by looking for any parameter values starting with the value of outfile. The suffix of the parameter value will then be substituted.

For example::

PARAMS = {"tophat_threads": 4,
          "tophat_cutoff": 0.5,
          "sample1.bam.gz_tophat_threads" : 6}
outfile = "sample1.bam.gz"
print(substitute_parameters(**locals()))
{"tophat_cutoff": 0.5, "tophat_threads": 6}

Returns

params : dict
    Dictionary with parameter values.

Source code in cgatcore/pipeline/parameters.py
def substitute_parameters(**kwargs):
    '''return a parameter dictionary.

    This method builds a dictionary of parameter values to
    apply for a specific task. The dictionary is built in
    the following order:

    1. take values from the global dictionary (:py:data:`PARAMS`)
    2. substitute values appearing in `kwargs`.
    3. Apply task specific configuration values by looking for the
       presence of ``outfile`` in kwargs.

    The substition of task specific values works by looking for any
    parameter values starting with the value of ``outfile``.  The
    suffix of the parameter value will then be substituted.

    For example::

        PARAMS = {"tophat_threads": 4,
                  "tophat_cutoff": 0.5,
                  "sample1.bam.gz_tophat_threads" : 6}
        outfile = "sample1.bam.gz"
        print(substitute_parameters(**locals()))
        {"tophat_cutoff": 0.5, "tophat_threads": 6}

    Returns
    -------
    params : dict
        Dictionary with parameter values.

    '''

    # build parameter dictionary
    # note the order of addition to make sure that kwargs takes precedence
    local_params = dict(list(PARAMS.items()) + list(kwargs.items()))

    if "outfile" in local_params:
        # replace specific parameters with task (outfile) specific parameters
        outfile = local_params["outfile"]
        keys = list(local_params.keys())
        for k in keys:
            if k.startswith(outfile):
                p = k[len(outfile) + 1:]
                if p not in local_params:
                    # do not raise error, argument might be a prefix
                    continue
                get_logger().debug("substituting task specific parameter "
                                   "for %s: %s = %s" %
                                   (outfile, p, local_params[k]))
                local_params[p] = local_params[k]

    return local_params

tablequote(track)

quote a track name such that it is suitable as a table name.

Source code in cgatcore/pipeline/database.py
def tablequote(track):
    '''quote a track name such that is suitable as a table name.'''
    return re.sub(r"[-(),\[\].]", "_", track)

to_table(outfile)

convert a filename from a load statement into a table name.

This method checks if the filename ends with ".load". The suffix is then removed and the filename quoted so that it is suitable as a table name.

Arguments

outfile : string
    A filename ending in ".load".

Returns

tablename : string
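
For example, the table name is derived from the basename of the output file, with characters that are unsuitable for table names replaced via tablequote::

    to_table("results/sample-1.counts.load")   # -> "sample_1_counts"
    to_table("merged.load")                    # -> "merged"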

Source code in cgatcore/pipeline/database.py
def to_table(outfile):
    '''convert a filename from a load statement into a table name.

    This method checks if the filename ends with ".load". The suffix
    is then removed and the filename quoted so that it is suitable
    as a table name.

    Arguments
    ---------
    outfile : string
        A filename ending in ".load".

    Returns
    -------
    tablename : string

    '''
    assert outfile.endswith(".load")
    name = os.path.basename(outfile[:-len(".load")])
    return tablequote(name)

touch_file(filename, mode=0o666, times=None, dir_fd=None, ref=None, **kwargs)

update/create a sentinel file.

modified from: https://stackoverflow.com/questions/1158076/implement-touch-using-python

Compressed files (ending in .gz) are created as empty 'gzip' files, i.e., with a header.
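
Typical uses are creating or refreshing sentinel outputs (the filenames are hypothetical)::

    # create or update an empty sentinel file
    touch_file("task.sentinel")

    # an empty .gz output is written with a valid gzip header
    touch_file("empty_output.tsv.gz")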

Source code in cgatcore/iotools.py
def touch_file(filename, mode=0o666, times=None, dir_fd=None, ref=None, **kwargs):
    '''update/create a sentinel file.

    modified from: https://stackoverflow.com/questions/1158076/implement-touch-using-python

    Compressed files (ending in .gz) are created as empty 'gzip'
    files, i.e., with a header.

    '''
    flags = os.O_CREAT | os.O_APPEND
    existed = os.path.exists(filename)

    if filename.endswith(".gz") and not existed:
        # this will automatically add a gzip header
        with gzip.GzipFile(filename, "w") as fhandle:
            pass

    if ref:
        stattime = os.stat(ref)
        times = (stattime.st_atime, stattime.st_mtime)

    with os.fdopen(os.open(
            filename, flags=flags, mode=mode, dir_fd=dir_fd)) as fhandle:
        os.utime(
            fhandle.fileno() if os.utime in os.supports_fd else filename,
            dir_fd=None if os.supports_fd else dir_fd,
            **kwargs)

update_params_with_commandline_options(params, args)

add and update selected parameters in the parameter dictionary with command line args.

Source code in cgatcore/pipeline/control.py
def update_params_with_commandline_options(params, args):
    """add and update selected parameters in the parameter
    dictionary with command line args.
    """

    params["pipeline_name"] = args.pipeline_name
    params["dryrun"] = args.dry_run

    # translate cluster options into dict
    for key in params["cluster"].keys():
        arg_key = "cluster_{}".format(key)
        if hasattr(args, arg_key):
            val = getattr(args, arg_key)
            if val is not None:
                params["cluster"][key] = val

    if args.without_cluster:
        params["without_cluster"] = True

    params["shell_logfile"] = args.shell_logfile

    params["ruffus_checksums_level"] = args.ruffus_checksums_level
    # always create an "input" section
    params["input_globs"] = {}
    for variable in args.input_globs:
        if "=" in variable:
            variable, value = variable.split("=")
            params["input_globs"][variable.strip()] = value.strip()
        else:
            params["input_globs"]["default"] = variable.strip()

    for variables in args.variables_to_set:
        variable, value = variables.split("=")
        value = iotools.str2val(value.strip())
        # enter old style
        params[variable.strip()] = value
        # enter new style
        parts = variable.split("_")
        for x in range(1, len(parts)):
            prefix = "_".join(parts[:x])
            if prefix in params:
                suffix = "_".join(parts[x:])
                params[prefix][suffix] = value

    if args.work_dir:
        params["work_dir"] = os.path.abspath(args.work_dir)
    else:
        params["work_dir"] = params["start_dir"]

write_config_files(pipeline_path, general_path)

create default configuration files in the current directory, copying them from pipeline_path or general_path.

Source code in cgatcore/pipeline/control.py
def write_config_files(pipeline_path, general_path):
    '''create default configuration files in `path`.
    '''

    paths = [pipeline_path, general_path]
    config_files = ['pipeline.yml']

    for dest in config_files:
        if os.path.exists(dest):
            E.warn("file `%s` already exists - skipped" % dest)
            continue

        for path in paths:
            src = os.path.join(path, dest)
            if os.path.exists(src):
                shutil.copyfile(src, dest)
                E.info("created new configuration file `%s` " % dest)
                break
        else:
            raise ValueError(
                "default config file `%s` not found in %s" %
                (config_files, paths))