diff --git a/src/dcd_mapping/lookup.py b/src/dcd_mapping/lookup.py index 5b1fa5f..5e8f873 100644 --- a/src/dcd_mapping/lookup.py +++ b/src/dcd_mapping/lookup.py @@ -12,7 +12,7 @@ from cool_seq_tool.schemas import TranscriptPriority from ga4gh.core._internal.models import Extension, Gene from ga4gh.vrs._internal.models import Allele, SequenceLocation -from ga4gh.vrs.extras.translator import Translator +from ga4gh.vrs.extras.translator import AlleleTranslator from gene.database import create_db from gene.query import QueryHandler from gene.schemas import SourceName @@ -79,15 +79,14 @@ def __new__(cls) -> QueryHandler: class VrsTranslatorBuilder: """Singleton constructor for VRS-Python translator instance.""" - # TODO this looks.... very wrong? - def __new__(cls) -> Translator: + def __new__(cls) -> AlleleTranslator: """Provide VRS-Python translator. Construct if unavailable. :return: singleton instances of Translator """ if not hasattr(cls, "instance"): cst = CoolSeqToolBuilder() - cls.instance = Translator(cst.seqrepo_access, normalize=False) + cls.instance = AlleleTranslator(cst.seqrepo_access, normalize=False) return cls.instance diff --git a/src/dcd_mapping/resources.py b/src/dcd_mapping/resources.py index f6e0b50..dde7cf7 100644 --- a/src/dcd_mapping/resources.py +++ b/src/dcd_mapping/resources.py @@ -28,6 +28,7 @@ "get_scoreset_urns", "get_human_urns", "get_raw_scoreset_metadata", + "get_scoreset_records", "get_scoreset_metadata", "get_human_urns", "get_ref_genome_file", diff --git a/src/dcd_mapping/schemas.py b/src/dcd_mapping/schemas.py index 400aee9..7958213 100644 --- a/src/dcd_mapping/schemas.py +++ b/src/dcd_mapping/schemas.py @@ -125,13 +125,19 @@ class TxSelectResult(BaseModel): class VrsMapping(BaseModel): - """Define pre-post mapping pair structure for VRS-structured variations.""" + """Define pre-post mapping pair structure for VRS-structured variations. + + Probably need to add score and accession to make json writing easier + """ pre_mapping: Union[Allele, Haplotype] mapped: Union[Allele, Haplotype] class VrsMappingResult(BaseModel): - """Define response object from VRS mappings method.""" + """Define response object from VRS mappings method. + + Might not be necessary (should just be list of VrsMappings?) + """ variations: List[VrsMapping] diff --git a/src/dcd_mapping/vrs_map.py b/src/dcd_mapping/vrs_map.py index efb0916..c2feebf 100644 --- a/src/dcd_mapping/vrs_map.py +++ b/src/dcd_mapping/vrs_map.py @@ -1,20 +1,19 @@ """Map transcripts to VRS objects.""" import logging -from typing import List, Optional, Union +from typing import Dict, List, Optional, Union import click from Bio.Seq import Seq -from cool_seq_tool.schemas import AnnotationLayer -from ga4gh.core import sha512t24u +from cool_seq_tool.schemas import AnnotationLayer, Strand +from ga4gh.core import ga4gh_identify, sha512t24u from ga4gh.vrs._internal.models import ( Allele, Haplotype, - LiteralSequenceExpression, - SequenceLocation, ) +from ga4gh.vrs.dataproxy import SeqRepoDataProxy from ga4gh.vrs.normalize import normalize -from dcd_mapping.lookup import hgvs_to_vrs +from dcd_mapping.lookup import get_chromosome_identifier, get_seqrepo, hgvs_to_vrs from dcd_mapping.schemas import ( AlignmentResult, ScoreRow, @@ -35,6 +34,19 @@ class VrsMapError(Exception): """Raise in case of VRS mapping errors.""" +class SequenceStore(SeqRepoDataProxy): + """Stand-in for SeqRepo data proxy that first performs lookups against a temporary + local store. + + Outstanding questions: + --------------------- + * not totally sure what internal sequence storage looks like yet + """ + + # key sequence digest to sequence + local_sequences: Dict[str, str] = {} + + def _get_sequence_id(sequence: str) -> str: """Make GA4GH sequence identifier @@ -45,229 +57,227 @@ def _get_sequence_id(sequence: str) -> str: return sequence_id -def _make_dup_state() -> LiteralSequenceExpression: - """TODO make the crazy dup sequence thing +def _map_protein_coding( + metadata: ScoresetMetadata, + records: List[ScoreRow], + transcript: TxSelectResult, + align_result: AlignmentResult, +) -> VrsMappingResult: + """Perform mapping on protein coding experiment results - :return: + :param metadata: + :param records: + :param align_results: """ - # 2*str(sr[str(allele.location.sequence_id)][allele.location.start.value:allele.location.end.value]) - raise NotImplementedError + variations = VrsMappingResult(variations=[]) + sequence_id = _get_sequence_id(metadata.target_sequence) + + # TODO get some clarification on this stuff + if len(set(str(metadata.target_sequence))) > 4: + formatted_sequence = str(metadata.target_sequence) + else: + formatted_sequence = str( + Seq(metadata.target_sequence).translate(table="1") + ).replace("*", "") + sequence_id = _get_sequence_id(formatted_sequence) + sequence_store = SequenceStore(sr=get_seqrepo().sr) + sequence_store.local_sequences[sequence_id] = formatted_sequence + for row in records: + if len(row.hgvs_pro) == 3: # TODO figure out why? + _logger.warning( + f"Skipping HGVS protein variation string '{row.hgvs_pro}' for {metadata.urn}" + ) + continue + if row.hgvs_pro in {"_wt", "_sy"}: + _logger.warning( + f"Can't process Enrich2-style variant syntax {row.hgvs_nt} for {metadata.urn}" + ) + continue + if row.hgvs_pro.startswith("NP"): + variations.variations.append( + VrsMapping( + pre_mapping=hgvs_to_vrs(row.hgvs_pro, {}), # TODO handle alias dict + mapped=hgvs_to_vrs(row.hgvs_pro, {}), + ) + ) + else: + # TODO there's a branching here based on whether the NP accession starts with "N" + # not clear to me if it's necessary? + layer = AnnotationLayer.PROTEIN + hgvs_strings = _create_hgvs_strings(align_result, row.hgvs_pro, layer) + variations.variations.append( + VrsMapping( + pre_mapping=_get_haplotype_allele( + hgvs_strings, + layer, + sequence_id, + formatted_sequence, + align_result, + True, + ), + mapped=_get_haplotype_allele( + hgvs_strings, + layer, + sequence_id, + formatted_sequence, + align_result, + False, + transcript.start, + ), + ) + ) + # technically no need to do a second loop through records, but this lets us + # keep control flow distinct for pro and nt variant strings + layer = AnnotationLayer.GENOMIC + for row in records: + # try NT variation + if "97" in metadata.urn: + _logger.warning(f"Skipping hgvs_nt for {metadata.urn}") + continue # TODO more information about this + elif row.hgvs_nt in {"_wt", "_sy"}: + _logger.warning( + f"Can't process Enrich2-style variant syntax {row.hgvs_nt} for {metadata.urn}" + ) + continue + hgvs_strings = _create_hgvs_strings(align_result, row.hgvs_nt, layer) + variations.variations.append( + VrsMapping( + pre_mapping=_get_haplotype_allele( + hgvs_strings, + layer, + sequence_id, + formatted_sequence, + align_result, + True, + ), + mapped=_get_haplotype_allele( + hgvs_strings, + layer, + sequence_id, + formatted_sequence, + align_result, + False, + ), + ) + ) + return variations -def _update_allele_coords(allele: Allele, hits, ranges) -> Allele: # noqa - """TODO - :param allele: - :param hits: - :return: - """ - """ - start = allele.location.start.value - if len(hits) == 1 and strand == 1: - i = 0 - diff = start - hits[i][0] - diff2 = allele.location.end.value - start - allele.location.start.value = ranges[i][0] + diff - allele.location.end.value = allele.location.start.value + diff2 +def _create_hgvs_strings( + alignment: AlignmentResult, raw_description: str, layer: AnnotationLayer +) -> List[str]: + chrom_ac = get_chromosome_identifier(alignment.chrom) + if chr(91) in raw_description: + descr_list = list(set(raw_description[1:-1].split(";"))) else: - for i in range(len(hits)): - if start >= hits[i][0] and start < hits[i][1]: - break - diff = start - hits[i][0] - diff2 = allele.location.end.value - start - if strand == 1: # positive orientation - allele.location.start.value = ranges[i][0] + diff - allele.location.end.value = allele.location.start.value + diff2 - if 'dup' in hgvs_string: - allele.state.sequence = 2*str(sr[str(allele.location.sequence_id)][allele.location.start.value:allele.location.end.value]) - else: - allele.location.start.value = ranges[i][1] - diff - diff2 - allele.location.end.value = allele.location.start.value + diff2 - if 'dup' in hgvs_string: - allele.state.sequence = 2*str(sr[str(allele.location.sequence_id)][allele.location.start.value:allele.location.end.value]) - allele.state.sequence = str(Seq(str(allele.state.sequence)).reverse_complement()) + descr_list = [raw_description] + hgvs_strings = [f"{chrom_ac}:{layer.value}.{d}" for d in descr_list] + return hgvs_strings + + +def _get_allele_sequence(allele: Allele) -> str: + """Get sequence for Allele + + :param allele: VRS allele + :return: sequence """ - raise NotImplementedError + sr = get_seqrepo() + start = allele.location.start # type: ignore + end = allele.location.end # type: ignore + base = sr.sr[allele.location.sequenceReference.refgetAccession] # type: ignore + selection = base[start:end] + return selection def _get_haplotype_allele( - hgvs_pro: str, - ref_acc: str, - offset: int, - sequence_id: str, + hgvs_strings: List[str], layer: AnnotationLayer, - pre_map: bool = True, + sequence_id: str, + sequence: str, + alignment: AlignmentResult, + pre_map: bool, + offset: int = 0, ) -> Union[Allele, Haplotype]: - """TODO - - :param hgvs_pro: HGVS variation - :param ref_acc: reference accession ID - :param offset: offset from start of reference - :param sequence_id: GA4GH sequence ID - :param layer: - :param pre_map: is pre-mapping (? TODO not really sure of this verbiage) + """Create variation (haplotype/allele). + + Outstanding questions: + --------------------- + * Make a class to shadow the seqrepo data proxy that handles custom sequences + without adding them to the system seqrepo. As it currently stands, I wouldn't + expect this code to complete successfully. + * Do we really need to go through an HGVS string/the VRS translator? Can't we just + build the object ourselves? + * trim duplicate code + * simply args + + :param hgvs_strings: + :param layer: annotation layer + :param sequence_id: target sequence digest eg ``"ga4gh:SQ.SQ.jUOcLPDjSqWFEo9kSOG8ITe1dr9QK3h6" + :param sequence: target sequence + :param alignment: + :param pre_map: if True, return object for pre mapping stage. Otherwise return for + post-mapping. + :param offset: + :return: """ - description = hgvs_pro.split(".", maxsplit=1)[-1] - if chr(91) in description: - description = description[1:-1] - description_list = list(set(description.split(";"))) - else: - description_list = [description] - alleles = [] - for variation in description_list: - hgvs_string = f"{ref_acc}:{layer.value}.{variation}" - allele = hgvs_to_vrs(hgvs_string, {}) # TODO alias map - + sequence_store = SequenceStore(sr=get_seqrepo().sr) + sequence_store.local_sequences[sequence_id] = sequence + for hgvs_string in hgvs_strings: + allele = hgvs_to_vrs(hgvs_string, {}) # TODO add alias map? + if "dup" in hgvs_string: + allele.state.sequence = 2 * _get_allele_sequence(allele) # type: ignore if pre_map: - allele.location.sequence_id = sequence_id - if "dup" in hgvs_string: - allele.state = _make_dup_state() + allele.location.sequenceReference.refgetAccession = sequence_id # type: ignore else: - if ( - not isinstance(allele.location, SequenceLocation) - or not isinstance(allele.location.start, int) - or not isinstance(allele.location.end, int) - ): - # shouldn't be possible anyway - raise ValueError - if layer == AnnotationLayer.GENOMIC: - allele = _update_allele_coords(allele, hits, ranges) # noqa + if layer != AnnotationLayer.GENOMIC: + allele.location.start += offset # type: ignore + allele.location.end += offset # type: ignore else: - allele.location.start += offset - allele.location.end += offset - if "dup" in hgvs_string: - allele.state = _make_dup_state() - if allele.state.sequence == "N" and layer != AnnotationLayer.GENOMIC: - # TODO - """ - allele.state.sequence = str(sr[str(allele.location.sequence_id)][allele.location.start.value:allele.location.end.value]) - - """ - raise NotImplementedError - allele = normalize(allele) # TODO inject custom data proxy? + start: int = allele.location.start # type: ignore + if ( + len(alignment.query_subranges) == 1 + and alignment.strand == Strand.POSITIVE + ): + subrange_start = alignment.query_subranges[0].start + diff = start - subrange_start + diff2: int = allele.location.end - start # type: ignore + allele.location.start = subrange_start + diff + allele.location.end = allele.location.start + diff2 # type: ignore + else: + for query_subrange, hit_subrange in zip( + alignment.query_subranges, alignment.hit_subranges + ): + if start >= query_subrange.start and start < query_subrange.end: + query_subrange_start = query_subrange.start + hit_subrange_start = hit_subrange.start + break + else: + raise ValueError( + "Could not find hit subrange compatible with allele position" + ) + diff = start - query_subrange_start + diff2: int = allele.location.end - start # type: ignore + if alignment.strand == Strand.POSITIVE: # positive orientation + allele.location.start = hit_subrange_start + diff + allele.location.end = allele.location.start + diff2 # type: ignore + else: + allele.location.start = hit_subrange_start - diff - diff2 + allele.location.end = allele.location.start + diff2 # type: ignore + allele.state.sequence = str( + Seq(str(allele.state.sequence)).reverse_complement() + ) # type: ignore + if allele.state.sequence == "N" and layer != AnnotationLayer.PROTEIN: + allele.state.sequence = _get_allele_sequence(allele) # type: ignore + allele = normalize(allele, data_proxy=sequence_store) + allele.id = ga4gh_identify(allele) alleles.append(allele) - if len(alleles) == 1: # Not haplotype + if len(alleles) == 1: return alleles[0] else: - return Haplotype( - members=alleles, - id=None, - label=None, - description=None, - digest=None, - type="Haplotype", - ) - - -def _map_protein_coding( - metadata: ScoresetMetadata, - transcript: TxSelectResult, - records: List[ScoreRow], -) -> None: - """TODO - - * there's a ``stri`` variable set if there are >4 unique chars in the target sequence?? - - :param metadata: scoreset metadata - :param transcript: selected transcript - :param records: score results - :return: - """ - alias_map = {} - mappings_list = [] # noqa - var_ids_pre_map = [] - var_ids_post_map = [] - - target_sequence = str(Seq(metadata.target_sequence).translate(table="1")).replace( - "*", "" - ) - sequence_id = _get_sequence_id(target_sequence) - - scores = [row.score for row in records] # noqa - accessions = [row.accession for row in records] # noqa - - for row in records: - if not isinstance(row.hgvs_pro, str): - raise ValueError # should be impossible -- TODO remove - elif len(row.hgvs_pro) == 3 or row.hgvs_pro == "_wt" or row.hgvs_pro == "_sy": - raise ValueError # TODO should understand why this matters - elif row.hgvs_pro.startswith("NP"): - allele = hgvs_to_vrs(row.hgvs_pro, alias_map) - var_ids_pre_map.append(allele) - var_ids_post_map.append(allele) - else: - pre_allele = _get_haplotype_allele( - row.hgvs_pro, transcript.np, 0, sequence_id, AnnotationLayer.PROTEIN - ) - var_ids_pre_map.append(pre_allele) - if transcript.np.startswith("N"): - post_allele = _get_haplotype_allele( - row.hgvs_pro, - transcript.np, - transcript.start, - sequence_id, - AnnotationLayer.PROTEIN, - pre_map=False, - ) - else: - # TODO - # I don't think this ever happens because it's referencing variables that - # aren't set yet in the notebooks - raise ValueError - var_ids_post_map.append(post_allele) - - # TODO process NT column - """ - tempdat = pd.DataFrame({'pre_mapping': var_ids_pre_map, 'mapped': var_ids_post_map}) - mappings_list.append(tempdat) - scores_list.append(spro) - accessions_list.append(accpro) - - # Process nt column if data present - if vardat['hgvs_nt'].isnull().values.all() == False and '97' not in dat.at[i, 'urn']: - var_ids_pre_map = [] - var_ids_post_map = [] - - item = mave_blat_dict[dat.at[i, 'urn']] - ranges = get_locs_list(item['hits']) - hits = get_hits_list(item['hits']) - ref = get_chr(dp, item['chrom']) - ts = dat.at[i, 'target_sequence'] - strand = mave_blat_dict[dat.at[i, 'urn']]['strand'] - - digest = 'SQ.' + sha512t24u(ts.encode('ascii')) - alias_dict_list = [{'namespace': 'ga4gh', 'alias': digest}] - sr.store(ts, nsaliases = alias_dict_list) # Add custom digest to SeqRepo - - ntlist = vardat['hgvs_nt'] - varm = vardat['hgvs_pro'] - sn = [] - accn = [] - - for j in range(len(ntlist)): - if type(ntlist[j]) != str or ntlist[j] == '_wt' or ntlist[j] == '_sy': - continue - else: - try: - var_ids_pre_map.append(get_haplotype_allele(ntlist[j][2:], ref, 0, 'g', tr, dp, ts,'pre', ranges, hits, strand).as_dict()) - var_ids_post_map.append(get_haplotype_allele(ntlist[j][2:], ref, 0, 'g', tr, dp, ts,'post', ranges, hits, strand).as_dict()) - sn.append(scores[j]) - accn.append(accessions[j]) - except: - continue - - tempdat = pd.DataFrame({'pre_mapping': var_ids_pre_map, 'mapped': var_ids_post_map}) - mappings_list.append(tempdat) - scores_list.append(sn) - accessions_list.append(accn) - - vrs_mappings_dict[dat.at[i, 'urn']] = mappings_list - scores_dict_coding[dat.at[i, 'urn']] = scores_list - mavedb_ids_coding[dat.at[i, 'urn']] = accessions_list - - """ + return Haplotype(members=alleles) # type: ignore def _map_regulatory_noncoding( @@ -275,34 +285,48 @@ def _map_regulatory_noncoding( records: List[ScoreRow], align_result: AlignmentResult, ) -> VrsMappingResult: - """Return VRS alleles representing pre- and post-mapped variation objects (?) + """Perform mapping on noncoding/regulatory experiment results :param metadata: metadata for URN :param records: list of MAVE experiment result rows + :param align_result: :return: TODO """ variations = VrsMappingResult(variations=[]) sequence_id = _get_sequence_id(metadata.target_sequence) for row in records: - if row.hgvs_nt == "_wt" or row.hgvs_nt == "_sy": - raise ValueError # old MAVE-HGVS syntax + if row.hgvs_nt in {"_wt", "_sy"}: + _logger.warning( + f"Can't process Enrich2-style variant syntax {row.hgvs_nt} for {metadata.urn}" + ) + continue + hgvs_strings = _create_hgvs_strings( + align_result, row.hgvs_nt[2:], AnnotationLayer.GENOMIC + ) pre_map_allele = _get_haplotype_allele( - row.hgvs_nt[2:], align_result.chrom, 0, sequence_id, AnnotationLayer.GENOMIC - ) # TODO need query/hit ranges and strand for something - if isinstance(pre_map_allele, Haplotype): - breakpoint() # TODO investigate - raise NotImplementedError - post_map_allele = _get_haplotype_allele( - row.hgvs_nt[2:], - align_result.chrom, - 0, + hgvs_strings, + AnnotationLayer.GENOMIC, sequence_id, + metadata.target_sequence, + align_result, + pre_map=True, + offset=0, + ) + post_map_allele = _get_haplotype_allele( + hgvs_strings, AnnotationLayer.GENOMIC, - False, - ) # TODO need query/hit ranges and strand for something - if isinstance(post_map_allele, Haplotype): - breakpoint() # TODO investigate + sequence_id, + metadata.target_sequence, + align_result, + pre_map=True, + offset=0, + ) + if isinstance(post_map_allele, Haplotype) or isinstance( + pre_map_allele, Haplotype + ): + # TODO investigate + # not totally sure how to handle this raise NotImplementedError variations.variations.append( VrsMapping(pre_mapping=pre_map_allele, mapped=post_map_allele) @@ -321,10 +345,6 @@ def vrs_map( """Given a description of a MAVE scoreset and an aligned transcript, generate the corresponding VRS objects. - Todo: - ---- - * Workaround for storing objects in SeqRepo - :param metadata: salient MAVE scoreset metadata :param transcript: output of transcript selection process :param records: scoreset records @@ -336,8 +356,7 @@ def vrs_map( click.echo(msg) _logger.info(msg) if metadata.target_gene_category == TargetType.PROTEIN_CODING and transcript: - _ = _map_protein_coding(metadata, transcript, records) - result = None + result = _map_protein_coding(metadata, records, transcript, align_result) else: result = _map_regulatory_noncoding(metadata, records, align_result) return result