
🐙 CPG Flow



📋 Table of Contents

  1. 🐙 What is this API?
  2. Production and development links
  3. 🔨 Installation
  4. 🚀 Build
  5. 🤖 Usage
  6. 😵‍💫 Key Considerations and Limitations
  7. 🐳 Docker
  8. 💯 Tests
  9. ☑️ Code analysis and consistency
  10. 📈 Releases & Changelog
  11. 🎬 GitHub Actions
  12. ©️ License
  13. ❤️ Contributors

🐙 What is this API?

Welcome to CPG Flow!

This API provides a set of tools and workflows for managing population genomics data pipelines, designed to streamline the processing, analysis, and storage of large-scale genomic datasets. It facilitates automated pipeline execution, enabling reproducible research while integrating with cloud-based resources for scalable computation.

CPG Flow supports various stages of genomic data processing, from raw data ingestion to final analysis outputs, making it easier for researchers to manage and scale their population genomics workflows.

The API constructs a DAG (Directed Acyclic Graph) structure from a set of chained stages. This DAG structure then forms the pipeline.

Production and development links

🌐 Production

The production version of this API is documented at populationgenomics.github.io/cpg-flow/.

The documentation is updated automatically when a commit is pushed to the alpha (prerelease) or main (release) branch.

The packages are hosted on:

PyPI

🔨 Installation

To install this project, you will need Python and uv installed on your machine.

Run the following commands to create a virtual environment with uv and install the dependencies:

# Install the package using uv
uv sync

# Or equivalently use make (also installs pre-commit)
make init

🛠️ Development

To set up for development, we recommend using the Makefile setup:

make init-dev # installs pre-commit as a hook

To install cpg-flow locally, run:

make install-local

To try out the pre-installed cpg-flow in a Docker image, find more information in the Docker section.

🚀 Build

To build the project, run the following command:

make build

To make sure that you're actually using the installed build, we suggest installing the built wheel with:

make install-build

🤖 Usage

This project provides the framework to construct pipelines, but does not host the pipeline logic itself. This approach keeps all components more modular, manageable, and decoupled. Pipelines themselves are hosted in separate repositories.

The test_workflows_shared repository acts as a template and demonstrates how to structure a pipeline using CPG Flow.

The components required to build pipelines with CPG Flow:

config .toml file

This file contains the configuration settings for your pipeline. It allows the pipeline developer to define settings such as:

  1. what stages will be run or skipped
  2. what dataset to use
  3. what access level to use
  4. any input cohorts
  5. sequencing type
[workflow]
dataset = 'fewgenomes'

# Note: for fewgenomes and sandbox mentioning datasets by name is not a security risk
# DO NOT DO THIS FOR OTHER DATASETS

input_cohorts = ['COH2142']
access_level = 'test'

# Force stage rerun
force_stages = [
    'GeneratePrimes', # the first stage
    'CumulativeCalc', # the second stage
    'FilterEvens', # the third stage
    'BuildAPrimePyramid', # the last stage
]

# Show a workflow graph locally or save to web bucket.
# Default is false, set to true to show the workflow graph.
show_workflow = true
# ...

For a full list of supported config options with documentation, see defaults.toml

This .toml file may be named anything, as long as it is correctly passed to the analysis-runner invocation. The analysis-runner supplies its own default settings and combines them with the settings from this file before submitting a job.
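
At runtime, the merged configuration can be read anywhere in the pipeline code. A minimal sketch, assuming the get_config helper from cpg_utils.config (the keys shown mirror the example above):

from cpg_utils.config import get_config

config = get_config()

# Values come from the analysis-runner defaults merged with this .toml file.
dataset = config['workflow']['dataset']            # e.g. 'fewgenomes'
access_level = config['workflow']['access_level']  # e.g. 'test'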

main.py or equivalent entrypoint for the pipeline

This file stores the workflow definition as a list of stages, and then runs the workflow:

import os
from pathlib import Path

from cpg_flow.workflow import run_workflow
from cpg_utils.config import set_config_paths
from stages import BuildAPrimePyramid, CumulativeCalc, FilterEvens, GeneratePrimes

CONFIG_FILE = str(Path(__file__).parent / '<YOUR_CONFIG>.toml')


def run_cpg_flow(dry_run=False):
    # See the 'Key Considerations and Limitations' section for notes on the
    # definition of the `workflow` variable.

    # This list represents the flow of the DAG.
    workflow = [GeneratePrimes, CumulativeCalc, FilterEvens, BuildAPrimePyramid]

    config_paths = os.environ['CPG_CONFIG_PATH'].split(',')

    # Inserting after the "defaults" config, but before user configs:
    set_config_paths(config_paths[:1] + [CONFIG_FILE] + config_paths[1:])
    run_workflow(stages=workflow, dry_run=dry_run)


if __name__ == '__main__':
    run_cpg_flow()

The workflow definition here forms a DAG (Directed Acyclic Graph) structure.

DAG

To generate a plot of the DAG, show_workflow = true should be included in the config. The DAG plot generated from the pipeline definition is available in the logs via the job URL. To find the link to the plot, search the Logs section for the string: "INFO - Link to the graph:".

There are some key considerations and limitations to take into account when designing the DAG; these are covered in the Key Considerations and Limitations section below.

stages.py or equivalent file(s) for the Stage definitions

A Stage represents a node in the DAG. Each stage subclasses one of DatasetStage, CohortStage, MultiCohortStage, or SequencingGroupStage.

The stage definition should use the @stage decorator to optionally set:

  • dependent stages (this is used to build the DAG)
  • analysis keys (this determines what outputs should be written to metamist)
  • the analysis type (this determines the analysis-type to be written to metamist)

All stages require an expected_outputs method, which sets the expected output path location for a given Target, such as a SequencingGroup, Dataset, Cohort, or MultiCohort.

Also required is a queue_jobs method, which queues the pipeline jobs and writes their results to the paths defined in expected_outputs.

It is good practice to separate the Stage definitions into their own files, to keep the code compact and manageable.

from cpg_flow.stage import SequencingGroupStage, StageInput, StageOutput, stage
from cpg_flow.targets.sequencing_group import SequencingGroup
from cpg_utils.hail_batch import get_batch
from jobs import cumulative_calc

WORKFLOW_FOLDER = 'prime_pyramid'

# ...
# This stage depends on the `GeneratePrimes` stage, and requires outputs from that stage.
@stage(required_stages=[GeneratePrimes], analysis_keys=['cumulative'], analysis_type='custom')
class CumulativeCalc(SequencingGroupStage):
    def expected_outputs(self, sequencing_group: SequencingGroup):
        return {
            'cumulative': sequencing_group.dataset.prefix() / WORKFLOW_FOLDER / f'{sequencing_group.id}_cumulative.txt',
        }

    def queue_jobs(self, sequencing_group: SequencingGroup, inputs: StageInput) -> StageOutput | None:
        # Path to the 'primes' output of the upstream GeneratePrimes stage.
        input_txt = inputs.as_path(sequencing_group, GeneratePrimes, 'primes')
        b = get_batch()

        cumulative_calc_output_path = str(self.expected_outputs(sequencing_group).get('cumulative', ''))

        # We define a job instance from the `cumulative_calc` job definition.
        job_cumulative_calc = cumulative_calc(b, sequencing_group, input_txt, cumulative_calc_output_path)

        jobs = [job_cumulative_calc]

        return self.make_outputs(
            sequencing_group,
            data=self.expected_outputs(sequencing_group),
            jobs=jobs,
        )
# ...

There is a key consideration to take into account when writing the stages; see the Key Considerations and Limitations section below.

jobs.py or equivalent file for Job definitions

Every Stage queues a collection of jobs that are executed within it. It is good practice to store these job definitions in their own files, as they can often get long.

from cpg_flow.targets.sequencing_group import SequencingGroup
from hailtop.batch import Batch
from hailtop.batch.job import Job


def cumulative_calc(
    b: Batch,
    sequencing_group: SequencingGroup,
    input_file_path: str,
    output_file_path: str,
) -> Job:
    title = f'Cumulative Calc: {sequencing_group.id}'
    job = b.new_job(name=title)
    primes_path = b.read_input(input_file_path)

    cmd = f"""
    primes=($(cat {primes_path}))
    csum=0
    cumulative=()
    for prime in "${{primes[@]}}"; do
        ((csum += prime))
        cumulative+=("$csum")
    done
    echo "${{cumulative[@]}}" > {job.cumulative}
    """

    job.command(cmd)

    print('-----PRINT CUMULATIVE-----')
    print(output_file_path)
    b.write_output(job.cumulative, output_file_path)

    return job

Once these required components are written, the pipeline is ready to be executed against this framework.

Running the pipeline

Pipelines can only be run using the analysis-runner package, which grants the user appropriate permissions based on the dataset and access level defined above. analysis-runner requires a repository, a commit, and the entrypoint file, and then runs the code inside a "driver" image on Hail Batch, logging the invocation to metamist for future audit and reproducibility.

Therefore, the pipeline code needs to be pushed to a remote version control system so that analysis-runner can pull it for execution. A job can then be submitted:

analysis-runner \
  --image "australia-southeast1-docker.pkg.dev/cpg-common/images/cpg_flow:1.0.0" \
  --dataset "fewgenomes" \
  --description "cpg-flow_test" \
  --access-level "test" \
  --output-dir "cpg-flow_test" \
  --config "<YOUR_CONFIG>.toml" \
  workflow.py

If the job is successfully created, the analysis-runner output will include a job URL. This driver job will trigger additional jobs, which can be monitored via the /batches page on Hail Batch. When all expected jobs complete without errors, the workflow has executed successfully and the cpg_flow package is functioning as intended.

See the Docker section for instructions on pulling valid image releases.

😵‍💫 Key Considerations and Limitations

No Forward Discovery

The framework exclusively relies on backward traversal. If a stage is not explicitly or indirectly linked to one of the final stages through the required_stages parameter of the @stage decorator, it will not be included in the workflow. In other words, stages that are not reachable from a final stage are effectively ignored. This backward discovery approach ensures that only the stages directly required for the specified final stages are included, optimizing the workflow by excluding irrelevant or unused stages.

Workflow Definition

The workflow definition serves as a lookup table for the final stages. If a final stage is not listed in this definition, it will not be part of the workflow, as there is no mechanism for forward discovery to identify it.

workflow = [GeneratePrimes, CumulativeCalc, FilterEvens, BuildAPrimePyramid]
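
For illustration only (hypothetical variations of the list above, assuming the required_stages chain shown in the stage definitions): upstream stages are discovered backwards from the listed final stages, but nothing is discovered forwards:

# Listing only the last stage still pulls in GeneratePrimes, CumulativeCalc
# and FilterEvens, because they are reachable backwards via `required_stages`.
workflow = [BuildAPrimePyramid]

# FilterEvens and BuildAPrimePyramid are not listed here and no listed stage
# depends on them, so they are silently excluded from the workflow.
workflow = [GeneratePrimes, CumulativeCalc]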

Config Settings for expected_outputs

The expected_outputs method is called for every stage in the workflow, even if the config.toml configures the stage to be skipped. This ensures that the workflow can validate or reference the expected outputs of all stages.

Since this method may depend on workflow-specific configuration settings, these settings must be present in the workflow configuration regardless of whether the stage will run. To avoid issues, it is common practice to include dummy values for such settings in the default configuration. This is not the intended behaviour and is marked as an area for improvement in a future release.
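
As a sketch of why dummy values are needed (the stage and the reference_version key below are hypothetical): any config key read inside expected_outputs must resolve for every run, even when the stage itself is skipped:

from cpg_flow.stage import SequencingGroupStage
from cpg_flow.targets.sequencing_group import SequencingGroup
from cpg_utils.config import get_config


class AlignShortReads(SequencingGroupStage):  # hypothetical stage, decorator omitted
    def expected_outputs(self, sequencing_group: SequencingGroup):
        # This lookup runs even when the stage is skipped in the config,
        # so 'reference_version' needs a (possibly dummy) default value.
        ref = get_config()['workflow']['reference_version']
        return {
            'cram': sequencing_group.dataset.prefix() / 'align' / ref / f'{sequencing_group.id}.cram',
        }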

Verifying results of expected_outputs

The API uses the results of the expected_outputs method to determine whether a stage needs to run. A stage is scheduled for execution only if one or more Path objects returned by expected_outputs do not exist in Google Cloud Platform (GCP). If a returned Path object exists, the stage is considered to have already run successfully, and is therefore skipped.

For outputs such as Matrix Tables (.mt), Hail Tables (.ht), or Variant Datasets (.vds), which are complex structures of thousands of files, the check is performed on the object/_SUCCESS file to verify that the output was written completely. However, it has been observed that the object/_SUCCESS file may be written multiple times during processing, contrary to the expectation that it should only be written once after all associated files have been fully processed.
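
For example (a hypothetical expected_outputs; paths are illustrative), returning Path objects lets the framework make this decision, with directory-style outputs such as .mt checked via their _SUCCESS marker:

def expected_outputs(self, sequencing_group: SequencingGroup):
    prefix = sequencing_group.dataset.prefix() / 'annotate'
    return {
        # Checked directly: the stage is skipped if this object exists in GCP.
        'report': prefix / f'{sequencing_group.id}_report.txt',
        # Checked via '<path>/_SUCCESS', since a Matrix Table is a directory
        # containing many files.
        'mt': prefix / f'{sequencing_group.id}.mt',
    }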

String outputs from expected_outputs

String outputs from the expected_outputs method are not checked by the API. This is because string outputs cannot reliably be assumed to represent valid file paths and may instead correspond to other forms of outputs.
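
A short sketch of the distinction (hypothetical keys, reusing WORKFLOW_FOLDER from the example above): only the Path entry below can trigger or skip the stage; the string entry is passed through unchecked:

def expected_outputs(self, sequencing_group: SequencingGroup):
    return {
        # Path object: existence in GCP is checked by the framework.
        'primes': sequencing_group.dataset.prefix() / WORKFLOW_FOLDER / f'{sequencing_group.id}_primes.txt',
        # Plain string: never checked, e.g. a label or other non-file output.
        'run_label': f'{sequencing_group.id}-primes-v1',
    }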

Behavior of queue_jobs in relation to expected_outputs

When the expected_outputs check determines that one or more required files do not exist, and the stage is not configured to be skipped, the queue_jobs method is invoked to define the specific work that needs to be scheduled in the workflow.

The queue_jobs method runs within the driver image, before any jobs in the workflow are executed. Because of this, it cannot access or read files generated by earlier stages, as those outputs have not yet been created. The actual outputs from earlier jobs only become available as the jobs are executed during runtime.
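
A sketch of the implication (hypothetical stage with a 'count' output key; get_batch and GeneratePrimes as in the stages.py example above): read upstream outputs inside the job's command at runtime, not inside queue_jobs in the driver:

def queue_jobs(self, sequencing_group: SequencingGroup, inputs: StageInput) -> StageOutput | None:
    input_txt = inputs.as_path(sequencing_group, GeneratePrimes, 'primes')

    # Wrong: queue_jobs runs in the driver image before any upstream job has
    # executed, so this file does not exist yet.
    # primes = open(input_txt).read()

    # Right: pass the path to Hail Batch and let the job read it at runtime.
    b = get_batch()
    job = b.new_job(f'Count primes: {sequencing_group.id}')
    primes_input = b.read_input(str(input_txt))
    job.command(f'wc -l {primes_input} > {job.count}')
    b.write_output(job.count, str(self.expected_outputs(sequencing_group)['count']))

    return self.make_outputs(sequencing_group, data=self.expected_outputs(sequencing_group), jobs=[job])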

Explicit dependency between all jobs from queue_jobs

When the queue_jobs method schedules a collection of jobs on Hail Batch, one or more jobs are returned from the method, and the framework sets an explicit dependency between these jobs and all jobs from the Stages listed in the required_stages parameter. Therefore, all jobs that run in a Stage must be returned from queue_jobs to ensure no jobs start out of sequence. As an example:

# test_workflows_shared/cpg_flow_test/jobs/filter_evens.py
from typing import Any

from cpg_flow.stage import Stage, StageInput
from cpg_flow.targets.sequencing_group import SequencingGroup
from hailtop.batch import Batch
from hailtop.batch.job import Job


def filter_evens(
    b: Batch,
    inputs: StageInput,
    previous_stage: Stage,
    sequencing_groups: list[SequencingGroup],
    input_files: dict[str, dict[str, Any]],
    sg_outputs: dict[str, dict[str, Any]],
    output_file_path: str,
) -> list[Job]:
    title = 'Filter Evens'

    # Compute the no-evens list for each sequencing group
    sg_jobs = []
    sg_output_files = []
    for sg in sequencing_groups:
        job = b.new_job(name=title + ': ' + sg.id)
        # ... (elided: derive no_evens_output_file_path from sg_outputs and
        # append it to sg_output_files)

        cmd = f"""
        ...
        """

        job.command(cmd)
        b.write_output(job.sg_no_evens_file, no_evens_output_file_path)
        sg_jobs.append(job)

    # Merge the no-evens lists for all sequencing groups into a single file
    job = b.new_job(name=title)
    job.depends_on(*sg_jobs)
    inputs = ' '.join([b.read_input(f) for f in sg_output_files])
    job.command(f'cat {inputs} >> {job.no_evens_file}')
    b.write_output(job.no_evens_file, output_file_path)

    # ALL jobs are returned back to `queue_jobs`,
    # including new jobs created within this job.
    all_jobs = [job, *sg_jobs]
    return all_jobs

🐳 Docker

Docker Image Usage for the cpg-flow Python Package

Pulling and Using the Docker Image

These steps are restricted to CPG members only. However, anyone has access to the code in this public repository and can build a version of cpg-flow themselves. The following requires authentication with the CPG's GCP.

To pull and use the Docker image for the cpg-flow Python package, follow these steps:

  1. Authenticate with Google Artifact Registry:

    gcloud auth configure-docker australia-southeast1-docker.pkg.dev
  2. Pull the Docker Image:

    • For alpha releases:

      docker pull australia-southeast1-docker.pkg.dev/cpg-common/images/cpg_flow:0.1.0-alpha.11
    • For main releases:

      docker pull australia-southeast1-docker.pkg.dev/cpg-common/images/cpg_flow:1.0.0
  3. Run the Docker Container:

    docker run -it australia-southeast1-docker.pkg.dev/cpg-common/images/cpg_flow:<tag>

Temporary Images for Development

Temporary images are created for each commit and expire in 30 days. These images are useful for development and testing purposes.

  • Example of pulling a temporary image:

    docker pull australia-southeast1-docker.pkg.dev/cpg-common/images-tmp/cpg_flow:991cf5783d7d35dee56a7ab0452d54e69c695c4e

Accessing Build Images for CPG Members

Members of the CPG can find the build images in Google Artifact Registry under the following paths:

  • Alpha and main releases: australia-southeast1-docker.pkg.dev/cpg-common/images/cpg_flow
  • Temporary images: australia-southeast1-docker.pkg.dev/cpg-common/images-tmp/cpg_flow

Ensure you have the necessary permissions and are authenticated with Google Cloud to access these images.

💯 Tests

Unit Tests

Unit tests are run in the Test CI workflow for each branch.

E2E Test

We recommend frequently running the manual test workflow found in test_workflows_shared, specifically the cpg_flow_test workflow, during development to ensure updates work with the CPG production environment.

Documentation for running the tests is found in the repository README.

▶️ Commands

Before testing, you must follow the installation steps.

☑️ Code analysis and consistency

🔍 Code linting & formatting

Precommit

In order to keep the code clean, consistent, and free of bad Python practices, more than 10 pre-commit hooks are enabled!

The complete list of all enabled rules is available in the .pre-commit-config.yaml file.

▶️ Commands

Before linting, you must follow the installation steps.

Then, run the following command:

# Lint
pre-commit run --all-files

When setting up local linting for development, you can also run the following once:

# Install the pre-commit hook
pre-commit install

# Or equivalently
make init  # or: make init-dev

🥇 Project quality scanner

Multiple tools are set up to maintain the best code quality and to prevent vulnerabilities:

SonarQube

SonarQube summary is available here.


📈 Releases & Changelog

Releases on the main branch, and pre-releases on the alpha branch, are generated and published automatically by:

Semantic Release

It uses the conventional commit strategy.

This is enforced using the commitlint pre-commit hook that checks commit messages conform to the conventional commit standard.

We recommend installing and using the tool commitizen to create commit messages. Once installed, you can use either cz commit or git cz to create a commitizen-generated commit message.

Each change in a new release is listed in the CHANGELOG.md file.

Also, you can keep up with changes by watching releases via the Watch GitHub button at the top of this page.

🎬 GitHub Actions

This project uses GitHub Actions to automate some boring tasks.

You can find all the workflows in the .github/workflows directory.

🎢 Workflows

Name | Description | Triggered on
---- | ----------- | ------------
Docker | Builds and pushes Docker images for the project. | pull_request on main, alpha; push on main, alpha; workflow_dispatch
Lint | Runs linting checks using pre-commit hooks. | push
Package | Packages the project and publishes it to PyPI and GitHub Releases. | push on main, alpha
Renovate | Runs Renovate to update dependencies. | schedule and workflow_dispatch
Security Checks | Performs security checks using pip-audit. | workflow_dispatch and push
Test | Runs unit tests and generates coverage reports. | push
Update Badges | Updates badges.yaml with test results and coverage. | workflow_run (completed)
mkdocs | Deploys API documentation to GitHub Pages. | push on alpha

©️ License

This project is licensed under the MIT License.

❤️ Contributors

There are no contributors yet. Want to be the first?

If you want to contribute to this project, please read the contribution guide.