Skip to content

Commit

Permalink
ClinVar Re-processing Updates (#388)
Browse files Browse the repository at this point in the history
* multiple amendments - see #388 text
  • Loading branch information
MattWellie authored Apr 24, 2024
1 parent 5dcdff9 commit d57e4ca
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 200 deletions.
92 changes: 33 additions & 59 deletions reanalysis/clinvar_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,44 +5,31 @@
Entrypoint for clinvar summary generation
"""

import logging
from datetime import datetime
from os.path import join

import click

from hailtop.batch.job import Job

from cpg_utils import Path, to_path
from cpg_utils.config import get_config
from cpg_utils.hail_batch import (
authenticate_cloud_credentials_in_job,
get_batch,
query_command,
)
from cpg_utils.config import config_retrieve
from cpg_utils.hail_batch import authenticate_cloud_credentials_in_job, get_batch, query_command

from reanalysis import clinvar_by_codon, seqr_loader, summarise_clinvar_entries
from reanalysis.static_values import get_logger
from reanalysis.vep_jobs import add_vep_jobs


def generate_clinvar_table(
clinvar_table_path: Path,
clinvar_folder: Path,
snv_vcf: Path,
date: str | None = None,
):
def generate_clinvar_table(cloud_folder: Path, clinvar_outputs: str):
"""
set up the job that does de novo clinvar summary
Args:
clinvar_table_path (Path): where to write the new decisions
clinvar_folder (Path): where to write all clinvar files
snv_vcf (Path): SNV VCF to generate
date (str): date for submission filtering, optional
cloud_folder (Path): folder for this analysis
clinvar_outputs (str): prefix for writing new files/dirs
"""

bash_job = get_batch().new_bash_job(name='copy clinvar files to local')
bash_job.image(get_config()['workflow']['driver_image'])
bash_job.image(config_retrieve(['workflow', 'driver_image']))

directory = 'https://ftp.ncbi.nlm.nih.gov/pub/clinvar/tab_delimited/'
sub_file = 'submission_summary.txt.gz'
Expand All @@ -53,24 +40,26 @@ def generate_clinvar_table(
)

# write output files date-specific
get_batch().write_output(bash_job.subs, str(clinvar_folder / sub_file))
get_batch().write_output(bash_job.vars, str(clinvar_folder / var_file))
get_batch().write_output(bash_job.subs, str(cloud_folder / sub_file))
get_batch().write_output(bash_job.vars, str(cloud_folder / var_file))

# region: run the summarise_clinvar_entries script
summarise = get_batch().new_job(name='summarise clinvar')
summarise.depends_on(bash_job)

summarise.cpu(2).image(get_config()['workflow']['driver_image']).storage('20G')
summarise.cpu(2).image(config_retrieve(['workflow', 'driver_image'])).storage('20G')
authenticate_cloud_credentials_in_job(summarise)
command_options = f'-s {bash_job.subs} -v {bash_job.vars} -o {clinvar_table_path} --path_snv {snv_vcf} '
if date:
command_options += f' -d {date}'
summarise.command(f'python3 {summarise_clinvar_entries.__file__} {command_options}')
summarise.command(
f'python3 {summarise_clinvar_entries.__file__} '
f'-s {bash_job.subs} '
f'-v {bash_job.vars} '
f'-o {clinvar_outputs}',
)

return summarise


def generate_annotated_data(annotation_out: Path, snv_vcf: Path, tmp_path: Path, dependency: Job | None = None) -> Job:
def generate_annotated_data(annotation_out: Path, snv_vcf: str, tmp_path: Path, dependency: Job | None = None) -> Job:
"""
if the annotated data Table doesn't exist, generate it
Expand Down Expand Up @@ -104,7 +93,7 @@ def generate_annotated_data(annotation_out: Path, snv_vcf: Path, tmp_path: Path,
dependency = vep_jobs[-1]

j = get_batch().new_job('annotate cohort')
j.image(get_config()['workflow']['driver_image'])
j.image(config_retrieve(['workflow', 'driver_image']))

# run seqr_loader, only applying VEP annotations
j.command(
Expand All @@ -124,51 +113,36 @@ def generate_annotated_data(annotation_out: Path, snv_vcf: Path, tmp_path: Path,
return j


@click.command
@click.option('--date', help='Submission cut-off date, optional', default=None)
@click.option('--folder', help='Folder to write to, optional', default=None)
def main(date: str | None = None, folder: str | None = None):
def main():
"""
run the clinvar summary, output to common path
folder argument can override the common bucket output path
Args:
date (str | None): a cut-off data for Clinvar subs
folder (str | None): a folder to write to, optional
"""

# print the config we use once
_conf = get_config(True)

if folder is None:
cloud_folder = to_path(
join(get_config()['storage']['common']['analysis'], 'aip_clinvar', datetime.now().strftime('%y-%m')),
)

elif isinstance(folder, str):
cloud_folder = to_path(folder)

else:
raise ValueError('folder must be a string or None')
cloud_folder = to_path(
join(config_retrieve(['storage', 'common', 'analysis']), 'aip_clinvar', datetime.now().strftime('%y-%m')),
)

# clinvar VCF, decisions, annotated VCF, and PM5
snv_vcf = cloud_folder / 'pathogenic_snv.vcf.bgz'
clinvar_table_path = cloud_folder / 'clinvar_decisions.ht'
clinvar_output_path = join(str(cloud_folder), 'clinvar_decisions')
clinvar_ht = f'{clinvar_output_path}.ht'
snv_vcf = f'{clinvar_output_path}.vcf.bgz'
clinvar_pm5_path = cloud_folder / 'clinvar_pm5.ht'
annotated_clinvar = cloud_folder / 'annotated_clinvar.mt'

# check if we can just quit already
if all(this_path.exists() for this_path in [annotated_clinvar, clinvar_table_path, clinvar_pm5_path]):
logging.info('Clinvar data already exists, exiting')
if all(this_path.exists() for this_path in [annotated_clinvar, clinvar_ht, clinvar_pm5_path]):
get_logger().info('Clinvar data already exists, exiting')
return

temp_path = to_path(join(get_config()['storage']['common']['tmp'], 'aip_clinvar', datetime.now().strftime('%y-%m')))
temp_path = to_path(
join(config_retrieve(['storage', 'common', 'tmp']), 'aip_clinvar', datetime.now().strftime('%y-%m')),
)

dependency = None

# generate a new round of clinvar decisions
if not all(output.exists() for output in [clinvar_table_path, snv_vcf]):
dependency = generate_clinvar_table(clinvar_table_path, cloud_folder, snv_vcf, date)
if not all(to_path(output).exists() for output in [clinvar_ht, snv_vcf]):
dependency = generate_clinvar_table(cloud_folder, clinvar_output_path)

# create the annotation job(s)
if not annotated_clinvar.exists():
Expand All @@ -177,7 +151,7 @@ def main(date: str | None = None, folder: str | None = None):
# region: run the clinvar_by_codon script
if not clinvar_pm5_path.exists():
clinvar_by_codon_job = get_batch().new_job(name='clinvar_by_codon')
clinvar_by_codon_job.image(get_config()['workflow']['driver_image']).cpu(2).storage('20G')
clinvar_by_codon_job.image(config_retrieve(['workflow', 'driver_image'])).cpu(2).storage('20G')
authenticate_cloud_credentials_in_job(clinvar_by_codon_job)
clinvar_by_codon_job.command(
f'python3 {clinvar_by_codon.__file__} --mt_path {annotated_clinvar} --write_path {clinvar_pm5_path}',
Expand Down
Loading

0 comments on commit d57e4ca

Please sign in to comment.