Execution Module¶
execution.py manages job submission and monitoring for cgatcore pipelines.
It is the bridge between pipeline task functions and the compute backend
(cluster or local machine).
Executor selection¶
When P.run(statement) is called, an executor is chosen automatically:
- GridExecutor (DRMAA) — used when a DRMAA library is installed (e.g. slurm-drmaa, drmaa-python) and a session is active. This is the preferred path for HPC clusters.
- CLI-based executor (SlurmExecutor, SGEExecutor, TorqueExecutor, KubernetesExecutor) — used when DRMAA is unavailable.
- LocalExecutor — used when to_cluster=False or without_cluster=True.
See Executors for a full description of each executor class.
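The selection logic can be summarised in a minimal sketch. The function name and string return values below are hypothetical; the real decision is made inside run() in execution.py.

```python
# High-level sketch of the backend choice made when P.run() is called.
# Names here are illustrative, not cgatcore's actual API.
def select_backend(has_drmaa_session, to_cluster=True, without_cluster=False):
    if not to_cluster or without_cluster:
        return "local"   # LocalExecutor
    if has_drmaa_session:
        return "drmaa"   # GridExecutor / GridArrayExecutor
    return "cli"         # Slurm/SGE/Torque/Kubernetes executor
```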
DRMAA session management¶
start_session() is called automatically by run_workflow(). If the DRMAA
library is not installed, HAS_DRMAA is False and no session is started;
jobs fall back to the CLI-based executors.
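The optional-dependency guard typically looks like the following sketch; the exception tuple is an assumption covering both a missing Python package and a missing libdrmaa shared library.

```python
# Sketch of an optional-DRMAA import guard. If the drmaa package or the
# underlying libdrmaa library is absent, the flag is False and CLI-based
# executors are used instead.
try:
    import drmaa  # noqa: F401
    HAS_DRMAA = True
except (ImportError, RuntimeError, OSError):
    HAS_DRMAA = False
```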
run(statement, **kwargs)¶
The primary function for submitting jobs from pipeline task functions:
```python
P.run(statement)
P.run(statement, job_memory="8G", job_threads=4)
P.run(statement, to_cluster=False)  # force local execution
```
Context resolution¶
Options are merged in this priority order (lowest to highest):
- Global PARAMS defaults
- Local variables in the calling function's scope
- Explicit kwargs passed to run()
- A params namedtuple in the caller's scope (from TaskLibrary)
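The merge behaves like layered dictionary updates, where later layers override earlier ones. The dictionaries below are illustrative stand-ins, not cgatcore's internal data structures.

```python
# Illustrative sketch of lowest-to-highest option merging.
PARAMS = {"job_memory": "4G", "job_threads": 1}   # global defaults
local_vars = {"job_threads": 2}                   # caller's local variables
kwargs = {"job_memory": "8G"}                     # explicit run() kwargs

options = {}
for layer in (PARAMS, local_vars, kwargs):
    options.update(layer)  # later layers win

# options == {"job_memory": "8G", "job_threads": 2}
```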
Key options¶
| Option | Description |
|---|---|
| job_memory | Memory per thread (e.g. "4G") |
| job_total_memory | Total memory for the job (divided by job_threads) |
| job_threads | Number of threads/CPUs to request |
| job_options | Additional scheduler options string |
| job_queue | Queue/partition to submit to |
| job_condaenv | Conda environment to activate in the job |
| to_cluster | True to use cluster, False to run locally (default: True) |
| without_cluster | Alias for to_cluster=False |
| dryrun | If True, log the statement without executing it |
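The relationship between job_total_memory and job_memory can be sketched as a simple division. The helper below is hypothetical and uses integer gigabytes for simplicity; the real option values are strings such as "4G".

```python
# Sketch: a total memory request divided across threads gives the
# per-thread value. Hypothetical helper, not cgatcore's API.
def per_thread_memory(total_gb, threads):
    return f"{total_gb // threads}G"

per_thread_memory(16, 4)  # 16G total over 4 threads -> 4G per thread
```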
Return value¶
Returns a list of BenchmarkData namedtuples with fields including
task, job_id, start_time, end_time, exit_status, max_rss,
wall_t, etc.
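Consuming the return value looks like the following sketch, assuming a BenchmarkData-like namedtuple restricted to the fields listed above (the real tuple carries more fields, and the values here are fabricated for illustration).

```python
# Sketch of iterating over run()'s return value.
from collections import namedtuple

BenchmarkData = namedtuple(
    "BenchmarkData",
    ["task", "job_id", "start_time", "end_time",
     "exit_status", "max_rss", "wall_t"],
)

# Stand-in result, as if one job had completed successfully.
results = [BenchmarkData("align", "1234", 0.0, 12.5, 0, 2048, 12.5)]

for r in results:
    assert r.exit_status == 0, f"job {r.job_id} failed"
```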
GridExecutor and GridArrayExecutor¶
These classes (in execution.py) handle DRMAA-based cluster submission:
- GridExecutor: submits a single job via DRMAA, polls status using drmaa.Session, and collects resource usage from sacct/qacct on completion.
- GridArrayExecutor: submits array jobs via DRMAA for parallel task execution.
Both inherit from Executor, which provides job tracking, signal handling,
and the context manager interface.
Executor base class (DRMAA path)¶
Executor (in execution.py) is the base for GridExecutor. It is
distinct from BaseExecutor (in base_executor.py) which is the base
for the CLI executors.
Key responsibilities:
- Tracks active_jobs for cleanup on pipeline interruption
- Installs SIGTERM/SIGINT handlers via setup_signal_handlers()
- Provides cleanup_all_jobs() and cleanup_failed_job()
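The job-tracking and context-manager pattern can be sketched with a minimal class. This is a simplified stand-in, not the real Executor, which additionally installs signal handlers and talks to the scheduler.

```python
# Minimal sketch of the executor context-manager pattern: jobs are
# tracked while active and cleaned up when the context exits.
class SketchExecutor:
    def __init__(self):
        self.active_jobs = []

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.cleanup_all_jobs()
        return False  # do not suppress exceptions

    def submit(self, job_id):
        self.active_jobs.append(job_id)

    def cleanup_all_jobs(self):
        self.active_jobs.clear()

with SketchExecutor() as ex:
    ex.submit("job-1")
# on exit, active_jobs has been emptied
```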
will_run_on_cluster(options)¶
Helper that returns True when DRMAA is available, a session is active,
to_cluster is truthy, and without_cluster is not set. Returns False
otherwise (no exception is raised if DRMAA is absent).
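The check reduces to a conjunction of the four conditions. The module-level flags below are hypothetical stand-ins for the real DRMAA state.

```python
# Sketch of will_run_on_cluster's decision. HAS_DRMAA and GLOBAL_SESSION
# are illustrative stand-ins for the module's DRMAA state.
HAS_DRMAA = True
GLOBAL_SESSION = object()  # stand-in for an active drmaa.Session

def will_run_on_cluster(options):
    return (HAS_DRMAA
            and GLOBAL_SESSION is not None
            and options.get("to_cluster", True)
            and not options.get("without_cluster", False))
```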
get_executor(options)¶
Factory function returning the appropriate CLI-based executor when DRMAA is
unavailable. Checks options["cluster_queue_manager"] and the presence of
scheduler binaries:
"slurm" + sbatch found → SlurmExecutor
"sge" + qsub found → SGEExecutor
"torque" + qsub found → TorqueExecutor
"kubernetes" → KubernetesExecutor
otherwise → LocalExecutor
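The binary-detection step can be sketched with shutil.which. The helper below is hypothetical; the real factory takes the full options dictionary and returns executor instances rather than names.

```python
# Sketch of mapping a queue manager name to a CLI executor, falling back
# to local execution when the scheduler binary is not on PATH.
import shutil

def detect_cli_executor(queue_manager):
    table = {
        "slurm": ("sbatch", "SlurmExecutor"),
        "sge": ("qsub", "SGEExecutor"),
        "torque": ("qsub", "TorqueExecutor"),
    }
    if queue_manager == "kubernetes":
        return "KubernetesExecutor"
    entry = table.get(queue_manager)
    if entry and shutil.which(entry[0]):
        return entry[1]
    return "LocalExecutor"
```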
LocalExecutor (execution.py)¶
A second LocalExecutor in execution.py (distinct from the one in
executors.py) handles local job execution using gevent. It is used when
DRMAA is unavailable and no cluster scheduler is configured.
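Local execution amounts to running the statement in a shell and capturing its outcome. The sketch below uses plain subprocess for portability; the real LocalExecutor uses gevent for cooperative scheduling, and the function name is illustrative.

```python
# Sketch of local statement execution: run the shell statement and
# return its exit status and captured stdout.
import subprocess

def run_local(statement):
    proc = subprocess.run(statement, shell=True,
                          capture_output=True, text=True)
    return proc.returncode, proc.stdout

rc, out = run_local("echo hello")
```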