diff --git a/src/ga4gh/vrs/extras/annotator/vcf.py b/src/ga4gh/vrs/extras/annotator/vcf.py index 5a29b63f..e117a772 100644 --- a/src/ga4gh/vrs/extras/annotator/vcf.py +++ b/src/ga4gh/vrs/extras/annotator/vcf.py @@ -59,10 +59,11 @@ def __init__(self, data_proxy: _DataProxy) -> None: def _update_vcf_header( self, vcf: pysam.VariantFile, - info_field_num: str, - info_field_desc: str, + incl_ref_allele: bool, incl_vrs_attrs: bool, ) -> None: + info_field_num = "R" if incl_ref_allele else "A" + info_field_desc = "REF and ALT" if incl_ref_allele else "ALT" vcf.header.info.add( self.VRS_ALLELE_IDS_FIELD, info_field_num, @@ -96,6 +97,84 @@ def _update_vcf_header( f"The literal sequence states used for the GA4GH VRS Alleles corresponding to the GT indexes of the {info_field_desc} alleles", ) + def _process_vcf_row( + self, + record: pysam.VariantRecord, + vrs_data: dict, + assembly: str, + vrs_info_fields: list[str], + incl_vrs_attrs: bool, + incl_ref_allele: bool, + output_pickle: bool, + require_validation: bool, + ) -> dict: + """Compute VRS objects for a VCF row. + + Get VRS data for record's reference (if requested) and alt alleles. Return + INFO field values to annotate VCF row with. + + # TODO update these + :param record: A row in the VCF file + :param vrs_data: Dictionary containing the VRS object information for the VCF. + Will be mutated if `output_pickle = True` + :param assembly: The assembly used in `record` + :param additional_info_fields: Additional VRS fields to add in INFO field + :param vrs_attributes: If `True` will include VRS_Start, VRS_End, VRS_State + fields in the INFO field. If `False` will not include these fields. Only + used if `output_vcf` set to `True`. + :param output_pickle: If `True`, VRS pickle file will be output. + :param output_vcf: If `True`, annotated VCF file will be output. + :param compute_for_ref: If true, compute VRS IDs for the reference allele + :param require_validation: If `True` then validation checks must pass in + order to return a VRS object. A `DataProxyValidationError` will be raised if + validation checks fail. If `False` then VRS object will be returned even if + validation checks fail. Defaults to `True`. + :return: If `output_vcf = True`, a dictionary containing VRS Fields and list + of associated values. If `output_vcf = False`, an empty dictionary will be + returned. + """ + vrs_info_field_data = {field: [] for field in vrs_info_fields} + + # Get VRS data for reference allele + gnomad_loc = f"{record.chrom}-{record.pos}" + if incl_ref_allele: + reference_allele = f"{gnomad_loc}-{record.ref}-{record.ref}" + self._get_vrs_object( + reference_allele, + vrs_data, + vrs_info_field_data, + assembly, + output_pickle=output_pickle, + output_vcf=output_vcf, + vrs_attributes=incl_vrs_attrs, + require_validation=require_validation, + ) + + # Get VRS data for alts + alts = record.alts or [] + alleles = [f"{gnomad_loc}-{record.ref}-{a}" for a in [*alts]] + data = f"{record.chrom}\t{record.pos}\t{record.ref}\t{record.alts}" + for allele in alleles: + if "*" in allele: + _logger.debug("Star allele found: %s", allele) + if output_vcf: + for field in vrs_info_fields: + vrs_info_field_data[field].append("") + else: + self._get_vrs_object( + allele, + vrs_data, + vrs_info_field_data, + assembly, + vrs_data_key=data, + output_pickle=output_pickle, + output_vcf=output_vcf, + vrs_attributes=incl_vrs_attrs, + require_validation=require_validation, + ) + + return vrs_info_field_data + @use_ga4gh_compute_identifier_when(VrsObjectIdentifierIs.MISSING) def annotate( self, @@ -103,8 +182,8 @@ def annotate( output_vcf_path: Path | None = None, output_pkl_path: Path | None = None, incl_vrs_attrs: bool = False, + incl_ref_allele: bool = True, assembly: str = "GRCh38", - compute_for_ref: bool = True, require_validation: bool = True, ) -> None: """Given a VCF, produce an output VCF annotated with VRS allele IDs, and/or @@ -117,8 +196,9 @@ def annotate( attributes should be included in output VCF info field. These properties may be useful to retain outside of the VRS object for reasons like searchability. Does nothing if ``output_vcf_path`` left unset. + :param incl_ref_allele: If true, perform VRS ID computation for REF allele and + include the corresponding VRS object in any data dumps :param assembly: The assembly used in `vcf_in` data - :param compute_for_ref: If true, compute VRS IDs for the reference allele :param require_validation: If ``True``, validation checks (i.e., REF value matches expected REF at given location) must pass in order to return a VRS object for a record. If ``False`` then VRS object will be returned even if @@ -130,9 +210,8 @@ def annotate( raise VCFAnnotatorError(msg) vcf = pysam.VariantFile(filename=str(input_vcf_path.absolute())) - info_field_num = "R" if compute_for_ref else "A" - info_field_desc = "REF and ALT" if compute_for_ref else "ALT" - self._update_vcf_header(vcf, info_field_num, info_field_desc, incl_vrs_attrs) + if output_vcf_path: + self._update_vcf_header(vcf, incl_ref_allele, incl_vrs_attrs) vcf_out = ( pysam.VariantFile(str(output_vcf_path.absolute()), "w", header=vcf.header) @@ -142,30 +221,33 @@ def annotate( vrs_data = {} for record in vcf: - additional_info_fields = [self.VRS_ALLELE_IDS_FIELD] - if incl_vrs_attrs: - additional_info_fields += [ - self.VRS_STARTS_FIELD, - self.VRS_ENDS_FIELD, - self.VRS_STATES_FIELD, - ] + if vcf_out: + vrs_info_fields = [self.VRS_ALLELE_IDS_FIELD] + if incl_vrs_attrs: + vrs_info_fields += [ + self.VRS_STARTS_FIELD, + self.VRS_ENDS_FIELD, + self.VRS_STATES_FIELD, + ] + else: + vrs_info_fields = [] try: - vrs_field_data = self._get_vrs_data( + vrs_field_data = self._process_vcf_row( record, vrs_data, assembly, - additional_info_fields, + vrs_info_fields, incl_vrs_attrs=incl_vrs_attrs, + incl_ref_allele=incl_ref_allele, output_pickle=output_pickle, output_vcf=output_vcf, - compute_for_ref=compute_for_ref, require_validation=require_validation, ) except Exception as ex: _logger.exception("VRS error on %s-%s", record.chrom, record.pos) err_msg = f"{ex}" or f"{type(ex)}" err_msg = err_msg.translate(self.VCF_ESCAPE_MAP) - additional_info_fields = [self.VRS_ERROR_FIELD] + vrs_info_fields = [self.VRS_ERROR_FIELD] vrs_field_data = {self.VRS_ERROR_FIELD: [err_msg]} _logger.debug( @@ -175,7 +257,7 @@ def annotate( vrs_field_data, ) if output_vcf_path and vcf_out: - for k in additional_info_fields: + for k in vrs_info_fields: record.info[k] = [value or "." for value in vrs_field_data[k]] vcf_out.write(record) @@ -277,80 +359,3 @@ def _get_vrs_object( vrs_field_data[self.VRS_STARTS_FIELD].append(start) vrs_field_data[self.VRS_ENDS_FIELD].append(end) vrs_field_data[self.VRS_STATES_FIELD].append(alt) - - def _get_vrs_data( - self, - record: pysam.VariantRecord, - vrs_data: dict, - assembly: str, - additional_info_fields: list[str], - incl_vrs_attrs: bool, - output_pickle: bool, - output_vcf: bool, - compute_for_ref: bool, - require_validation: bool, - ) -> dict: - """Get VRS data for record's reference and alt alleles. - - :param record: A row in the VCF file - :param vrs_data: Dictionary containing the VRS object information for the VCF. - Will be mutated if `output_pickle = True` - :param assembly: The assembly used in `record` - :param additional_info_fields: Additional VRS fields to add in INFO field - :param vrs_attributes: If `True` will include VRS_Start, VRS_End, VRS_State - fields in the INFO field. If `False` will not include these fields. Only - used if `output_vcf` set to `True`. - :param output_pickle: If `True`, VRS pickle file will be output. - :param output_vcf: If `True`, annotated VCF file will be output. - :param compute_for_ref: If true, compute VRS IDs for the reference allele - :param require_validation: If `True` then validation checks must pass in - order to return a VRS object. A `DataProxyValidationError` will be raised if - validation checks fail. If `False` then VRS object will be returned even if - validation checks fail. Defaults to `True`. - :return: If `output_vcf = True`, a dictionary containing VRS Fields and list - of associated values. If `output_vcf = False`, an empty dictionary will be - returned. - """ - vrs_field_data = ( - {field: [] for field in additional_info_fields} if output_vcf else {} - ) - - # Get VRS data for reference allele - gnomad_loc = f"{record.chrom}-{record.pos}" - if compute_for_ref: - reference_allele = f"{gnomad_loc}-{record.ref}-{record.ref}" - self._get_vrs_object( - reference_allele, - vrs_data, - vrs_field_data, - assembly, - output_pickle=output_pickle, - output_vcf=output_vcf, - vrs_attributes=incl_vrs_attrs, - require_validation=require_validation, - ) - - # Get VRS data for alts - alts = record.alts or [] - alleles = [f"{gnomad_loc}-{record.ref}-{a}" for a in [*alts]] - data = f"{record.chrom}\t{record.pos}\t{record.ref}\t{record.alts}" - for allele in alleles: - if "*" in allele: - _logger.debug("Star allele found: %s", allele) - if output_vcf: - for field in additional_info_fields: - vrs_field_data[field].append("") - else: - self._get_vrs_object( - allele, - vrs_data, - vrs_field_data, - assembly, - vrs_data_key=data, - output_pickle=output_pickle, - output_vcf=output_vcf, - vrs_attributes=incl_vrs_attrs, - require_validation=require_validation, - ) - - return vrs_field_data