diff --git a/cg_lims/EPPs/files/base.py b/cg_lims/EPPs/files/base.py index e5cbef56..54df436a 100644 --- a/cg_lims/EPPs/files/base.py +++ b/cg_lims/EPPs/files/base.py @@ -3,16 +3,17 @@ from cg_lims.EPPs.files.barcode_tubes import make_barcode_csv from cg_lims.EPPs.files.csv_for_kapa_truble_shooting.csv_for_kapa_debug import trouble_shoot_kapa from cg_lims.EPPs.files.femtopulse_csv import make_femtopulse_csv -from cg_lims.EPPs.files.file_to_udf import csv_well_to_udf from cg_lims.EPPs.files.hamilton.base import hamilton -from cg_lims.EPPs.files.ont_json_to_udf import parse_ont_report +from cg_lims.EPPs.files.parsers.file_to_udf import csv_well_to_udf +from cg_lims.EPPs.files.parsers.illumina_xml_to_udf import parse_run_parameters +from cg_lims.EPPs.files.parsers.ont_json_to_udf import parse_ont_report +from cg_lims.EPPs.files.parsers.quantit_excel_to_udf import quantit_excel_to_udf from cg_lims.EPPs.files.placement_map.make_96well_placement_map import placement_map from cg_lims.EPPs.files.pooling_map.make_pooling_map import pool_map from cg_lims.EPPs.files.sample_sheet.create_ont_sample_sheet import create_ont_sample_sheet from cg_lims.EPPs.files.sample_sheet.create_sample_sheet import create_sample_sheet from cg_lims.EPPs.files.smrt_link.run_design import create_smrtlink_run_design from cg_lims.EPPs.files.smrt_link.sample_setup import create_smrtlink_sample_setup -from cg_lims.EPPs.files.xml_to_udf import parse_run_parameters @click.group(invoke_without_command=True) @@ -35,3 +36,4 @@ def files(ctx): files.add_command(make_femtopulse_csv) files.add_command(create_smrtlink_sample_setup) files.add_command(create_smrtlink_run_design) +files.add_command(quantit_excel_to_udf) diff --git a/cg_lims/EPPs/files/file_to_udf.py b/cg_lims/EPPs/files/file_to_udf.py deleted file mode 100644 index 9e44396b..00000000 --- a/cg_lims/EPPs/files/file_to_udf.py +++ /dev/null @@ -1,100 +0,0 @@ -#!/usr/bin/env python - -import csv -import logging -import sys -from pathlib import Path - 
-import click -from cg_lims import options -from cg_lims.exceptions import LimsError, MissingArtifactError, MissingFileError -from cg_lims.get.artifacts import get_artifact_by_name -from cg_lims.get.files import get_file_path -from genologics.entities import Artifact - -LOG = logging.getLogger(__name__) - - -def make_well_dict(process, lims, input): - """Creates a well dict based on input_output_map - keys: well of input artifact - values: input/output artifact depending on the input flag - """ - - well_dict = {} - for inp, outp in process.input_output_maps: - if outp.get("output-generation-type") == "PerAllInputs": - continue - in_art = Artifact(lims, id=inp["limsid"]) - out_art = Artifact(lims, id=outp["limsid"]) - source_art = in_art if input == True else out_art - col, row = source_art.location[1].split(":") - well = col + row - well_dict[well] = out_art - return well_dict - - -def set_udfs(well_field: str, value_field: str, udf: str, well_dict: dict, result_file: Path): - """Reads the csv and sets the value for each sample""" - - error_msg = [] - passed_arts = 0 - with open(result_file, newline="", encoding="latin1") as csvfile: - reader = csv.DictReader(csvfile) - for sample in reader: - well = sample.get(well_field) - value = sample.get(value_field) - if value is None: - error_msg.append("Some samples in the file had missing values.") - LOG.info(f"Missing value for sample {sample} in well {well}. Skipping!") - continue - elif well not in well_dict: - LOG.info(f"Well {well} was not found in the step. 
Skipping!") - continue - art = well_dict[well] - try: - art.udf[udf] = str(value) - except: - art.udf[udf] = float(value) - art.put() - passed_arts += 1 - - if passed_arts < len(well_dict.keys()): - error_msg.append("Some samples in the step were not represented in the file.") - - error_string = " ".join(list(set(error_msg))) - if error_msg: - raise MissingArtifactError(error_string) - - -@click.command() -@options.file_placeholder(help="File placeholder name.") -@options.local_file() -@options.udf() -@options.well_field() -@options.value_field() -@options.input() -@click.pass_context -def csv_well_to_udf( - ctx, file: str, well_field: str, value_field: str, udf: str, input: bool, local_file: str -): - """Script to copy data from file to udf based on well position""" - - LOG.info(f"Running {ctx.command_path} with params: {ctx.params}") - process = ctx.obj["process"] - lims = ctx.obj["lims"] - - if local_file: - file_path = local_file - else: - file_art = get_artifact_by_name(process=process, name=file) - file_path = get_file_path(file_art) - - try: - if not Path(file_path).is_file(): - raise MissingFileError(f"No such file: {file_path}") - well_dict = make_well_dict(process, lims, input) - set_udfs(well_field, value_field, udf, well_dict, file_path) - click.echo("The udfs were sucessfully populated.") - except LimsError as e: - sys.exit(e.message) diff --git a/cg_lims/EPPs/files/parsers/__init__.py b/cg_lims/EPPs/files/parsers/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cg_lims/EPPs/files/parsers/file_to_udf.py b/cg_lims/EPPs/files/parsers/file_to_udf.py new file mode 100644 index 00000000..a7b96f10 --- /dev/null +++ b/cg_lims/EPPs/files/parsers/file_to_udf.py @@ -0,0 +1,153 @@ +#!/usr/bin/env python + +import csv +import logging +import sys +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + +import click +from cg_lims import options +from cg_lims.exceptions import ArgumentError, LimsError, 
MissingArtifactError, MissingFileError +from cg_lims.get.artifacts import create_well_dict, get_artifact_by_name +from cg_lims.get.files import get_file_path +from genologics.entities import Artifact, Process + +LOG = logging.getLogger(__name__) + + +def make_udf_dict(udfs: Tuple[str], value_fields: Tuple[str]) -> Dict[str, str]: + """Create dictionary containing UDF names and their corresponding value field names.""" + if len(udfs) != len(value_fields): + raise ArgumentError( + f"The number of artifact-udfs to update and file value fields must be the same." + ) + udf_vf_dict: dict = {} + for i in range(len(udfs)): + udf_vf_dict[udfs[i]] = value_fields[i] + return udf_vf_dict + + +def get_file_placeholder_paths(placeholder_names: List[str], process: Process) -> List[str]: + """Convert a list of file placeholder names to complete file paths.""" + file_paths: List[str] = [] + for placeholder_name in placeholder_names: + file_artifact: Artifact = get_artifact_by_name(process=process, name=placeholder_name) + file_paths.append(get_file_path(file_artifact=file_artifact)) + return file_paths + + +def set_udfs_from_file( + well_field: str, udf_vf_dict: Dict[str, str], well_dict: dict, result_file: Path +) -> List[str]: + """Parse a CSV file and set the corresponding UDF values for each sample.""" + error_msg: List[str] = [] + passed_arts: int = 0 + with open(result_file, newline="", encoding="latin1") as csvfile: + reader: csv.DictReader = csv.DictReader(csvfile) + for udf_name in list(udf_vf_dict.keys()): + if udf_vf_dict[udf_name] not in reader.fieldnames: + LOG.info( + f"Value {udf_vf_dict[udf_name]} does not exist in file {result_file}, skipping." + ) + continue + value_field: str = udf_vf_dict.pop(udf_name) + + for sample in reader: + well: str = sample.get(well_field) + if well not in well_dict: + LOG.info(f"Well {well} was not found in the step. 
Skipping!") + continue + artifact: Artifact = well_dict[well] + value: Any = sample.get(value_field) + if value is None: + error_msg.append("Some samples in the file had missing values.") + LOG.info(f"Missing value for sample {sample} in well {well}. Skipping!") + continue + try: + artifact.udf[udf_name] = str(value) + except: + artifact.udf[udf_name] = float(value) + artifact.put() + passed_arts += 1 + + if passed_arts < len(well_dict.keys()): + error_msg.append("Some samples in the step were not represented in the file.") + + return error_msg + + +def set_udfs( + well_fields: List[str], + udf_vf_dict: Dict[str, str], + well_dict: dict, + file_placeholders: List[str], + local_files: Optional[List[str]], + process: Process, +) -> None: + """Loop through each given file and parse out the given values which are then set to their corresponding UDFs.""" + if local_files: + files: List[str] = local_files + else: + files: List[str] = get_file_placeholder_paths( + placeholder_names=file_placeholders, process=process + ) + if len(well_fields) != len(files): + raise ArgumentError(f"The number of files to read and well fields must be the same.") + + file_well_list: zip = zip(files, well_fields) + error_message: List[str] = [] + + for file_tuple in file_well_list: + file: str = file_tuple[0] + well_field: str = file_tuple[1] + if not Path(file).is_file(): + raise MissingFileError(f"No such file: {file}") + error_message += set_udfs_from_file( + well_field=well_field, + udf_vf_dict=udf_vf_dict, + well_dict=well_dict, + result_file=Path(file), + ) + + if error_message: + error_string: str = " ".join(list(set(error_message))) + raise MissingArtifactError(error_string) + + +@click.command() +@options.file_placeholders(help="File placeholder name.") +@options.local_files() +@options.udf_values() +@options.well_fields() +@options.value_fields() +@options.input() +@click.pass_context +def csv_well_to_udf( + ctx, + files: Tuple[str], + local_files: Tuple[str], + udf_values: 
Tuple[str], + well_fields: Tuple[str], + value_fields: Tuple[str], + input: bool, +): + """Script to copy data from files to UDFs based on well position.""" + + LOG.info(f"Running {ctx.command_path} with params: {ctx.params}") + process: Process = ctx.obj["process"] + + try: + well_dict: Dict[str, Artifact] = create_well_dict(process=process, input_flag=input) + udf_vf_dict: Dict[str, str] = make_udf_dict(udfs=udf_values, value_fields=value_fields) + set_udfs( + well_fields=list(well_fields), + udf_vf_dict=udf_vf_dict, + well_dict=well_dict, + file_placeholders=list(files), + local_files=list(local_files), + process=process, + ) + click.echo("The UDFs were successfully populated.") + except LimsError as e: + sys.exit(e.message) diff --git a/cg_lims/EPPs/files/xml_to_udf.py b/cg_lims/EPPs/files/parsers/illumina_xml_to_udf.py similarity index 100% rename from cg_lims/EPPs/files/xml_to_udf.py rename to cg_lims/EPPs/files/parsers/illumina_xml_to_udf.py diff --git a/cg_lims/EPPs/files/ont_json_to_udf.py b/cg_lims/EPPs/files/parsers/ont_json_to_udf.py similarity index 100% rename from cg_lims/EPPs/files/ont_json_to_udf.py rename to cg_lims/EPPs/files/parsers/ont_json_to_udf.py diff --git a/cg_lims/EPPs/files/parsers/quantit_excel_to_udf.py b/cg_lims/EPPs/files/parsers/quantit_excel_to_udf.py new file mode 100644 index 00000000..a344a275 --- /dev/null +++ b/cg_lims/EPPs/files/parsers/quantit_excel_to_udf.py @@ -0,0 +1,80 @@ +import logging +import sys +from pathlib import Path +from typing import Dict + +import click +import pandas as pd +from cg_lims import options +from cg_lims.exceptions import LimsError, MissingArtifactError, MissingFileError +from cg_lims.get.artifacts import create_well_dict, get_artifact_by_name +from cg_lims.get.files import get_file_path +from genologics.entities import Artifact, Process + +LOG = logging.getLogger(__name__) + + +def set_udfs(udf: str, well_dict: dict, result_file: Path): + """Reads the Quant-iT Excel file and sets the value for 
each sample""" + + failed_artifacts: int = 0 + skipped_artifacts: int = 0 + df: pd.DataFrame = pd.read_excel(result_file, skiprows=11, header=None) + for index, row in df.iterrows(): + if row[0] not in well_dict.keys(): + LOG.info(f"Well {row[0]} is not used by a sample in the step, skipping.") + skipped_artifacts += 1 + continue + elif pd.isna(row[2]): + LOG.info( + f"Well {row[0]} does not have a valid concentration value ({row[2]}), skipping." + ) + failed_artifacts += 1 + continue + artifact: Artifact = well_dict[row[0]] + artifact.udf[udf] = row[2] + artifact.put() + + if failed_artifacts or skipped_artifacts: + error_message = "Warning:" + if failed_artifacts: + error_message += f" Skipped {failed_artifacts} artifact(s) with wrong and/or blank values for some UDFs." + if skipped_artifacts: + error_message += f" Skipped {skipped_artifacts} artifact(s) as they weren't represented in the result file." + raise MissingArtifactError(error_message) + + +@click.command() +@options.file_placeholder(help="File placeholder name.") +@options.local_file() +@options.udf() +@options.input() +@click.pass_context +def quantit_excel_to_udf( + ctx, + file: str, + local_file: str, + udf: str, + input: bool, +): + """Script to copy data from a Quant-iT result Excel file to concentration UDFs based on well position""" + + LOG.info(f"Running {ctx.command_path} with params: {ctx.params}") + process: Process = ctx.obj["process"] + + if local_file: + file_path: str = local_file + else: + file_art: Artifact = get_artifact_by_name(process=process, name=file) + file_path: str = get_file_path(file_art) + + try: + if not Path(file_path).is_file(): + raise MissingFileError(f"No such file: {file_path}") + well_dict: Dict[str, Artifact] = create_well_dict( + process=process, input_flag=input, quantit_well_format=True + ) + set_udfs(udf=udf, well_dict=well_dict, result_file=Path(file_path)) + click.echo(f"Updated {len(well_dict.keys())} artifact(s) successfully.") + except LimsError as e: + 
sys.exit(e.message) diff --git a/cg_lims/get/artifacts.py b/cg_lims/get/artifacts.py index 2568b447..c05bb919 100644 --- a/cg_lims/get/artifacts.py +++ b/cg_lims/get/artifacts.py @@ -3,7 +3,8 @@ from enum import Enum from typing import Dict, List, Literal, Optional, Set, Tuple -from cg_lims.exceptions import FileError, MissingArtifactError +from cg_lims.exceptions import FileError, InvalidValueError, MissingArtifactError +from cg_lims.get.fields import get_artifact_well, get_quantit_artifact_well from genologics.entities import Artifact, Process, Sample from genologics.lims import Lims @@ -245,3 +246,36 @@ def get_non_pooled_artifacts(artifact: Artifact) -> List[Artifact]: for artifact in artifact.input_artifact_list(): artifacts.extend(get_non_pooled_artifacts(artifact)) return artifacts + + +def create_well_dict( + process: Process, + input_flag: bool = False, + native_well_format: bool = False, + quantit_well_format: bool = False, +) -> Dict[str, Artifact]: + """Creates a well dict based on the input_output_map + keys: well of the input artifact if the input flag is set, otherwise well of the output artifact + values: the output artifact of each input/output pair + """ + + well_dict: Dict[str, Artifact] = {} + lims: Lims = process.lims + for input, output in process.input_output_maps: + if output.get("output-generation-type") == "PerAllInputs": + continue + input_artifact = Artifact(lims, id=input["limsid"]) + output_artifact = Artifact(lims, id=output["limsid"]) + source_artifact: Artifact = input_artifact if input_flag else output_artifact + if native_well_format: + well: str = source_artifact.location[1] + elif quantit_well_format: + well: str = get_quantit_artifact_well(artifact=source_artifact) + else: + well: str = get_artifact_well(artifact=source_artifact) + if well in well_dict.keys(): + raise InvalidValueError( + f"Can't create dictionary! Well {well} is already used by another artifact." 
+ ) + well_dict[well] = output_artifact + return well_dict diff --git a/cg_lims/get/fields.py b/cg_lims/get/fields.py index 24f65442..b3fd6a9a 100644 --- a/cg_lims/get/fields.py +++ b/cg_lims/get/fields.py @@ -65,6 +65,15 @@ def get_artifact_well(artifact: Artifact) -> str: return location[1].replace(":", "") +def get_quantit_artifact_well(artifact: Artifact) -> str: + """Parsing out the well position from LocationDescriptor""" + + col, row = artifact.location[1].split(":") + if int(row) < 10: + row = "0" + row + return col + row + + def get_index_well(artifact: Artifact): """Parsing out the index well position from the reagent label string which typically looks like this: '44_A05 IDT_10nt_446 (AGCGTGACCT-CCATCCGAGT)' diff --git a/cg_lims/models/arnold/prep/twist/aliquot_samples_for_enzymatic_fragmentation_twist.py b/cg_lims/models/arnold/prep/twist/aliquot_samples_for_enzymatic_fragmentation_twist.py index c6168eef..fe164eff 100644 --- a/cg_lims/models/arnold/prep/twist/aliquot_samples_for_enzymatic_fragmentation_twist.py +++ b/cg_lims/models/arnold/prep/twist/aliquot_samples_for_enzymatic_fragmentation_twist.py @@ -23,7 +23,9 @@ class ProcessUDFs(BaseModel): lot_nr_h2o_aliquot_samples_fragmentation: Optional[str] = Field( None, alias="Nuclease free water" ) - lot_nr_elution_buffer_aliquot_samples_fragmentation: Optional[str] = Field(None, alias="Lot no: Elution Buffer") + lot_nr_elution_buffer_aliquot_samples_fragmentation: Optional[str] = Field( + None, alias="Lot no: Elution Buffer" + ) class ArtifactUDFs(BaseModel): diff --git a/cg_lims/models/arnold/prep/twist/bead_purification_twist.py b/cg_lims/models/arnold/prep/twist/bead_purification_twist.py index a9526185..8a6eeaec 100644 --- a/cg_lims/models/arnold/prep/twist/bead_purification_twist.py +++ b/cg_lims/models/arnold/prep/twist/bead_purification_twist.py @@ -14,7 +14,9 @@ class ArtifactUDFs(BaseModel): class ProcessUDFs(BaseModel): lot_nr_etoh_bead_purification_post_hyb: Optional[str] = Field(None, 
alias="Ethanol") lot_nr_h2o_bead_purification_post_hyb: Optional[str] = Field(None, alias="Nuclease free water") - lot_nr_elution_buffer_bead_purification_post_hyb: Optional[str] = Field(None, alias="Lot no: Elution Buffer") + lot_nr_elution_buffer_bead_purification_post_hyb: Optional[str] = Field( + None, alias="Lot no: Elution Buffer" + ) bead_purification_post_hyb_method: Optional[str] = Field(None, alias="Method document") binding_and_purification_beads: Optional[str] = Field( None, alias="Twist Binding and Purification beads" diff --git a/cg_lims/options.py b/cg_lims/options.py index b1e3473e..874a01dc 100644 --- a/cg_lims/options.py +++ b/cg_lims/options.py @@ -47,6 +47,12 @@ def well_field( return click.option("-wf", "--well-field", required=True, help=help) +def well_fields( + help: str = "Well field in file", +) -> click.option: + return click.option("-wf", "--well-fields", required=True, multiple=True, help=help) + + def value_field( help: str = "Value field in file", ) -> click.option: @@ -92,7 +98,7 @@ def file_placeholder( def file_placeholders( help: str = "File placeholder option used when multiple are possible.", ) -> click.option: - return click.option("-f", "--files", required=True, multiple=True, help=help) + return click.option("-f", "--files", required=False, multiple=True, help=help) def samples_file(help: str = "Txt file with sample ids") -> click.option: @@ -107,6 +113,10 @@ def local_file(help="local file path for debug purposes.") -> click.option: return click.option("-lf", "--local_file", required=False, help=help) +def local_files(help="local file paths for debug purposes.") -> click.option: + return click.option("-lf", "--local_files", required=False, multiple=True, help=help) + + def input( help: str = "Use this flag if you run the script from a QC step.", ) -> click.option: @@ -741,3 +751,9 @@ def round_decimals( help: str = "The number of decimals you want to round to.", ) -> click.option: return click.option("-r", 
"--round-decimals", required=False, help=help) + + +def value_fields( + help: str = "Value fields in file", +) -> click.option: + return click.option("-vf", "--value-fields", required=True, multiple=True, help=help)