Skip to content

Commit

Permalink
StandardizeDataRki: filter for "PANGOLIN_LATEST"
Browse files Browse the repository at this point in the history
Based on <#476 (comment)>,
we can filter for `PANGOLIN_LATEST` to get the latest lineage assignment.

If none of the lineages are marked as `PANGOLIN_LATEST`, then just use
the first one in the list, since this was the behavior before the change.
If there are multiple `PANGOLIN_LATEST` lineages, then just use the
first one and output a warning. I've removed the assertion that the list
has exactly one entry, because an unexpected extra entry should not block
the whole ncov-ingest workflow.

Resolves <#478>
  • Loading branch information
joverlee521 committed Jan 30, 2025
1 parent daf3cae commit b3515a2
Showing 1 changed file with 20 additions and 4 deletions.
24 changes: 20 additions & 4 deletions lib/utils/transformpipeline/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,21 +284,37 @@ class StandardizeDataRki(Transformer):

def __init__(self):
# Counter that starts at 1; where it is read or advanced is not visible in this hunk.
self.line_count = 1
# Value of a lineage entry's "method" field that transform_value matches
# against the pango_lineage JSON blob to pick the latest lineage assignment.
self.pango_method = "PANGOLIN_LATEST"

def transform_value(self, entry: dict) -> dict:
entry['sequence'] = entry['sequence'].replace('\n', '')
entry['length'] = len(entry['sequence'])

# Pull out latest pango lineage from json blob
# Defaults to '?' if no lineages are available
# Currently this pulls the first entry, but we've added an assert statement to see if there are ever more than one entry
# At that time, we can loop over the json blob to find the latest pango lineage assignment
# If there are multiple "latest" lineages, then output a warning and just use the first one.
lineage_json_blob = json.loads(entry['pango_lineage'])

if len(lineage_json_blob) == 0:
entry['pango_lineage'] = '?'
else:
entry['pango_lineage'] = lineage_json_blob[0]['lineage']
assert len(lineage_json_blob)==1, f"RKI pango_lineage unexpectedly had more than one entry. rki_accession: {entry['rki_accession']}"
latest_lineage = [
lineage["lineage"]
for lineage in lineage_json_blob
if (lineage.get("method", "") == self.pango_method and
lineage.get("lineage") is not None)
]

if len(latest_lineage) == 0:
print(f"WARNING: RKI pango_lineage does not include the {self.pango_method!r} lineage, using first lineage in the list.")
entry['pango_lineage'] = lineage_json_blob[0]['lineage']
else:
if len(latest_lineage) > 1:
print(f"WARNING: RKI pango_lineage had more than one {self.pango_method!r} lineage "
f"for rki_accession {entry['rki_accession']!r}. "
"Using the first lineage in the list.")

entry['pango_lineage'] = latest_lineage[0]

# Normalize all string data to Unicode Normalization Form C, for
# consistent, predictable string comparisons.
Expand Down

0 comments on commit b3515a2

Please sign in to comment.