Skip to content

Commit

Permalink
update the script to check for IDs in retraction forms
Browse files Browse the repository at this point in the history
  • Loading branch information
xindiguo committed Dec 9, 2024
1 parent d05e927 commit fe4c501
Showing 1 changed file with 164 additions and 129 deletions.
293 changes: 164 additions & 129 deletions scripts/release/update_retraction_table.py
Original file line number Diff line number Diff line change
@@ -1,208 +1,243 @@
"""Update the BPC retraction table by 89+ patients and
latest main GENIE consortium release that is
tied to the most recent public release
Usage:
python update_retraction_table.py -c [cohort]] -m [version comment]
"""
import argparse
import logging
import pandas
import pandas as pd
import sys

import synapseclient
from synapseclient import Table

from synapseclient import Schema, Column, Table

# Synapse Table IDs
RETRACTION_TABLE_ID = "syn52915299"
PT_RETRACTION_TABLE_ID = "syn25998970"
SAMPLE_RETRACTION_TBL_ID = "syn25779833"
RELEASE_INFO_ID = "syn27628075"
BPC_PT_TABLE_ID = "syn21446700"

def download_synapse_table(syn, table_id, condition):
"""Download Synapse Table with the given table ID and condition
def download_synapse_table(syn, table_id, condition=None):
"""Download Synapse Table with the given table ID and condition.
Args:
syn: Synapse credential
table_id: Synapse ID of a table
condition: additional condition for querying the table
syn (Synapse): Synapse client object
table_id (str): Synapse Table ID
condition (str): SQL condition for querying the table (optional)
Returns:
Dataframe: synapse table
pd.DataFrame: DataFrame containing the queried table data
"""
if condition:
condition = " WHERE "+condition
synapse_table = syn.tableQuery("SELECT * from %s%s" % (table_id,condition))
synapse_table = synapse_table.asDataFrame()
return(synapse_table)

def setup_custom_logger(name):
"""Set up customer logger
condition_str = f" WHERE {condition}" if condition else ""
query = f"SELECT * FROM {table_id}{condition_str}"
synapse_table = syn.tableQuery(query)
return synapse_table.asDataFrame()

def setup_logger(name, log_file='log.txt'):
"""Set up a custom logger to log information to both console and a file.
Args:
name (String): Name of the logger
name (str): Logger name
log_file (str): Path to the log file
Returns:
logger
logging.Logger: Configured logger object
"""
formatter = logging.Formatter(fmt='%(asctime)s %(levelname)-8s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
handler = logging.FileHandler('log.txt', mode='w')
handler.setFormatter(formatter)
screen_handler = logging.StreamHandler(stream=sys.stdout)
screen_handler.setFormatter(formatter)
formatter = logging.Formatter('%(asctime)s %(levelname)-8s %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
file_handler = logging.FileHandler(log_file, mode='w')
file_handler.setFormatter(formatter)

console_handler = logging.StreamHandler(stream=sys.stdout)
console_handler.setFormatter(formatter)

logger = logging.getLogger(name)
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
logger.addHandler(screen_handler)
return(logger)
logger.addHandler(file_handler)
logger.addHandler(console_handler)

return logger

def synapse_login(synapse_config):
"""Log into Synapse
"""Log into Synapse using the provided configuration file.
Args:
synapse_config (String): File path to the Synapse config file
synapse_config (str): Path to the Synapse configuration file
Returns:
Synapse object
Synapse: Synapse client object
"""
try:
syn = synapseclient.login(silent=True)
except Exception:
syn = synapseclient.Synapse(configPath=synapse_config, silent=True)
syn.login()
return(syn)
return syn

def update_version(syn, table_id, comment):
"""Update the version of the table with a new snapshot and comment.
Args:
syn (Synapse): Synapse client object
table_id (str): Synapse Table ID
comment (str): Comment to attach to the snapshot
"""
Update the table version with given table ID and comment
"""
syn.restPOST("/entity/%s/table/snapshot" % table_id, body='{"snapshotComment":"%s"}' % comment)
snapshot_url = f"/entity/{table_id}/table/snapshot"
syn.restPOST(snapshot_url, body=f'{{"snapshotComment":"{comment}"}}')

def get_file_id_by_name(syn, folder_id, file_name):
"""Get file synapse ID by name while the parent folder is given
"""Retrieve the file ID for a given file name in a specified Synapse folder.
Args:
syn: Synapse Object
folder_id (String): Synapse Folder ID
file_name (String): File Name
syn (Synapse): Synapse client object
folder_id (str): Synapse Folder ID
file_name (str): The name of the file to search for
Returns:
str: File ID if found
"""
for f in syn.getChildren(folder_id):
f_name = f['name']
f_id = f['id']
if f_name == file_name:
return(f_id)

def download_sample_file(syn, file_id):
"""Download sample file by the file Synapse ID
for file in syn.getChildren(folder_id):
if file['name'] == file_name:
return file['id']
return None

def download_sample_file(syn, file_id):
"""Download the sample file by its Synapse file ID.
Args:
syn: Synapse Object
file_id (String): Synapse file ID
syn (Synapse): Synapse client object
file_id (str): Synapse file ID
Returns:
pd.DataFrame: DataFrame with patient IDs from the sample file
"""
file_entity = syn.get(file_id, followLink=True)
return(pandas.read_csv(file_entity.path, sep='\t', header=None, usecols=[0]))
return pd.read_csv(file_entity.path, sep='\t', header=None, usecols=[0])

def get_genie_id_list_from_bpc_form(syn, table_id, cohort, id_col_name):
"""Get the list of retracted GENIE ids given table id, cohort, ID column name
Args:
syn (Object): Synapse Object
table_id (str): Synapse Table ID
cohort (str): name of the cohort
id_col_name (str): ID column name
Returns:
list: the list of GENIE IDs
"""
table = download_synapse_table(syn, table_id)
cohort_columns = table.columns[table.columns.str.match(cohort)]
genie_id_list = []
for col in cohort_columns:
genie_id_list.extend(table.loc[table[col],id_col_name].dropna().unique().tolist())
return genie_id_list

def main():
parser = argparse.ArgumentParser(
description='Update retraction for release table on Synapse for BPC')
parser.add_argument(
"-c", "--cohort",
help="Cohort to release. i.e. NSCLC, CRC, BrCa, BLADDER..."
)
parser.add_argument(
"-s", "--synapse_config",
default=synapseclient.client.CONFIG_FILE,
help="Synapse credentials file"
)
parser.add_argument(
"-m","--message",
default="",
help = "Version comment"
)
parser.add_argument(
"-d", "--dry_run",
action="store_true",
help="dry run flag"
)

"""Main function to update the retraction table for a specific cohort."""
parser = argparse.ArgumentParser(description='Update retraction for release table on Synapse for BPC')
parser.add_argument("-c", "--cohort", help="Cohort to release (e.g., NSCLC, CRC, BrCa, BLADDER)", required=True)
parser.add_argument("-s", "--synapse_config", default=synapseclient.client.CONFIG_FILE, help="Path to Synapse credentials file")
parser.add_argument("-m", "--message", default="", help="Version comment for the table update")
parser.add_argument("-d", "--dry_run", action="store_true", help="Flag for dry run (no updates will be made)")

args = parser.parse_args()

# Initialize variables
cohort = args.cohort
synapse_config = args.synapse_config
comment = args.message
dry_run = args.dry_run

#login to synapse
# Log in to Synapse
syn = synapse_login(synapse_config)

#create logger
logger_name = "testing" if dry_run else "production"
logger = setup_custom_logger(logger_name)
logger.info('Updating BPC retraction for release table on Synapse!')
# Create logger
logger_name = "dry_run" if dry_run else "production"
logger = setup_logger(logger_name)
logger.info('Starting BPC retraction update process!')

#read the BPC patient table and get 89+ patients for the cohort
bpc_cohort_patient = download_synapse_table(syn, BPC_PT_TABLE_ID, "cohort='"+cohort+"'")
# Download cohort-specific patient data from BPC table
bpc_cohort_patient = download_synapse_table(syn, BPC_PT_TABLE_ID, f"cohort='{cohort}'")
cohort_patient_list = list(bpc_cohort_patient['record_id'])
redacted_patient = bpc_cohort_patient[bpc_cohort_patient['redacted']=="Yes"]
redacted_patient = bpc_cohort_patient[bpc_cohort_patient['redacted'] == "Yes"]
redacted_patient_list = list(redacted_patient['record_id'])

#read release info
release_info = download_synapse_table(syn, RELEASE_INFO_ID, "cohort='"+cohort+"' and current=True")
# Retrieve release info
release_info = download_synapse_table(syn, RELEASE_INFO_ID, f"cohort='{cohort}' and current=True")

#load the main GENIE release
# Get the main GENIE release version
main_genie_release_folder = release_info['main_genie_release'].values[0]
main_genie_release_version = syn.get(main_genie_release_folder).name
clinical_file_id = get_file_id_by_name(syn, main_genie_release_folder, 'data_clinical_sample.txt')
clinical_pt_from_sample = download_sample_file(syn, clinical_file_id)
clinical_pt_from_sample.columns = ['patient_id']
main_genie_patient_list = list(set(clinical_pt_from_sample.iloc[5:]['patient_id']))

#load the existing redacted patient list
current_redacted = download_synapse_table(syn, RETRACTION_TABLE_ID, "cohort='"+cohort+"'")
# Load the existing redacted patient list and sample list
current_redacted = download_synapse_table(syn, RETRACTION_TABLE_ID, f"cohort='{cohort}'")
current_patient_list = list(current_redacted['patient_id'])
current_sample_list = list(current_redacted['sample_id'])

new_redacted_df = pandas.DataFrame()

#compare redacted patient between BPC table vs Redaction for Release Table
# Prepare new redacted patient/sample
new_retracted_df_list = []

# Compare redacted patients from BPC
new_patient_from_bpc = list(set(redacted_patient_list) - set(current_patient_list))
if len(new_patient_from_bpc) == 0:
logger.info('No additional redacted patient is added to the '+cohort)
else:
logger.info(str(len(new_patient_from_bpc)) + " patients are added to the retraction table due to 89+")
new_patient_from_bpc_df = pandas.DataFrame({'cohort':cohort,
'patient_id':new_patient_from_bpc,
'reason': '89+'})

#compare redacted patient between main GENIE vs BPC tables
redacted_from_main = list(set(cohort_patient_list)-set(main_genie_patient_list))
if len(redacted_from_main) == 0:
logger.info('No additional redacted patient is found due to main GENIE retraction to the '+cohort)
else:
logger.info(str(len(redacted_from_main)) + " patients are found to be retracted in "+main_genie_release_version)
if new_patient_from_bpc:
logger.info(f"{len(new_patient_from_bpc)} patients added to retraction table due to 89+")
new_patient_from_bpc_df = pd.DataFrame({
'cohort': cohort,
'patient_id': new_patient_from_bpc,
'reason': '89+'
})
new_retracted_df_list.append(new_patient_from_bpc_df)

# Compare redacted patients from GENIE release
redacted_from_main = list(set(cohort_patient_list) - set(main_genie_patient_list))
if redacted_from_main:
logger.info(f"{len(redacted_from_main)} patients found to be retracted in {main_genie_release_version}")
new_patient_from_main = list(set(redacted_from_main) - set(current_patient_list))
logger.info(str(len(new_patient_from_main)) + " patients are added to the retraction table due to main GENIE retraction")
new_patient_from_main_df = pandas.DataFrame({'cohort':cohort,
'patient_id':new_patient_from_main,
'reason': main_genie_release_version})

# append the table
new_retracted_df = pandas.DataFrame()
if 'new_patient_from_bpc_df' in locals():
new_retracted_df = new_retracted_df.append(new_patient_from_bpc_df)
if 'new_patient_from_main_df' in locals():
new_retracted_df = new_retracted_df.append(new_patient_from_main_df)
logger.info(f"{len(new_patient_from_main)} patients added to retraction table due to main GENIE retraction")
new_patient_from_main_df = pd.DataFrame({
'cohort': cohort,
'patient_id': new_patient_from_main,
'reason': main_genie_release_version
})
new_retracted_df_list.append(new_patient_from_main_df)

# Compare retracted patients from BPC form
patient_from_form = get_genie_id_list_from_bpc_form(syn, PT_RETRACTION_TABLE_ID, cohort,'record_id')
new_patient_from_form = list(set(patient_from_form) - set(current_patient_list))
if new_patient_from_form:
logger.info(f"{len(new_patient_from_form)} patients added to retraction table; submitted by the sites")
new_patient_from_bpc_form = pd.DataFrame({
'cohort': cohort,
'patient_id': new_patient_from_form,
'reason': 'retraction form'
})
new_retracted_df_list.append(new_patient_from_bpc_form)

# Compare retracted sample from BPC form
sample_from_form = get_genie_id_list_from_bpc_form(syn, SAMPLE_RETRACTION_TBL_ID, cohort, 'SAMPLE_ID')
new_sample_from_form = list(set(sample_from_form) - set(current_sample_list))
if new_sample_from_form:
logger.info(f"{len(new_sample_from_form)} samples added to retraction table; submitted by the sites")
new_patient_from_bpc_form = pd.DataFrame({
'cohort': cohort,
'sample_id': new_sample_from_form,
'reason': 'retraction form'
})
new_retracted_df_list.append(new_sample_from_form)

new_retracted_df = pd.concat(new_retracted_df_list)

# Final actions based on dry run flag
if new_retracted_df.empty:
logger.info('No new patient is added to the retraction for release from BPC and '+main_genie_release_version+" for "+cohort)
logger.info(f"No new patient/sample added to the retraction table for {cohort} cohort.")
else:
if dry_run:
logger.info("Write to a temp file for review")
logger.info("Dry run: Writing new retracted patients/samples to temp file.")
new_retracted_df.to_csv("retraction_temp.csv")
else:
logger.info("Upating the retraction for release table...")
logger.info("Updating retraction table...")
table_schema = syn.get(RETRACTION_TABLE_ID)
table = syn.store(Table(table_schema, new_retracted_df))
update_version(syn, RETRACTION_TABLE_ID, comment)

if __name__ == "__main__":
main()
main()

0 comments on commit fe4c501

Please sign in to comment.