Skip to content

Commit

Permalink
Handle split enricher interfaces in nomenklatura
Browse files Browse the repository at this point in the history
  • Loading branch information
jbothma committed Nov 19, 2024
1 parent 4c682de commit fe1af3a
Showing 1 changed file with 114 additions and 25 deletions.
139 changes: 114 additions & 25 deletions nomenklatura/enrich/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
import logging
from importlib import import_module
from typing import Iterable, Generator, Optional, Type, cast
from typing import Dict, Iterable, Generator, Optional, Type, cast

from nomenklatura.entity import CE
from nomenklatura.dataset import DS
from nomenklatura.cache import Cache
from nomenklatura.matching import DefaultAlgorithm
from nomenklatura.enrich.common import Enricher, EnricherConfig, BulkEnricher
from nomenklatura.enrich.common import (
Enricher,
EnricherConfig,
ItemEnricher,
BulkEnricher,
)
from nomenklatura.enrich.common import EnrichmentAbort, EnrichmentException
from nomenklatura.judgement import Judgement
from nomenklatura.resolver import Resolver
Expand Down Expand Up @@ -43,44 +48,128 @@ def make_enricher(
# nk dedupe -i entities-with-matches.json -r resolver.json
def match(
    enricher: Enricher[DS], resolver: Resolver[CE], entities: Iterable[CE]
) -> Generator[CE, None, None]:
    """Generate match candidates for ``entities`` using ``enricher``.

    Dispatches on the enricher's interface: ``BulkEnricher`` instances are
    handled by ``get_bulk_matches`` and ``ItemEnricher`` instances by
    ``get_itemwise_matches``. The bulk check is performed first.

    Raises:
        EnrichmentException: if the enricher implements neither interface.
            Because this is a generator, the error surfaces on first
            iteration rather than at call time.
    """
    if isinstance(enricher, BulkEnricher):
        yield from get_bulk_matches(enricher, resolver, entities)
        return
    if isinstance(enricher, ItemEnricher):
        yield from get_itemwise_matches(enricher, resolver, entities)
        return
    raise EnrichmentException("Invalid enricher type: %r" % enricher)


def get_itemwise_matches(
    enricher: ItemEnricher[DS], resolver: Resolver[CE], entities: Iterable[CE]
) -> Generator[CE, None, None]:
    """Yield each input entity followed by its accepted match candidates.

    Every entity is passed through unchanged. Each candidate produced by
    ``enricher.match_wrapped`` is then handed to ``match_item``; candidates
    for which ``match_item`` returns ``None`` are dropped.

    An ``EnrichmentException`` raised while matching one entity is logged
    (with traceback) and processing continues with the next entity.
    """
    for entity in entities:
        yield entity
        try:
            for candidate in enricher.match_wrapped(entity):
                # Filtering, scoring and resolver bookkeeping are delegated to
                # match_item so each candidate is suggested and yielded once.
                match_result = match_item(
                    entity, candidate, resolver, enricher.dataset
                )
                if match_result is not None:
                    yield match_result
        except EnrichmentException:
            log.exception("Failed to match: %r", entity)


def get_bulk_matches(
    enricher: BulkEnricher[DS], resolver: Resolver[CE], entities: Iterable[CE]
) -> Generator[CE, None, None]:
    """Load all entities into a bulk enricher, then yield accepted matches.

    Phase one consumes ``entities`` completely: each entity with a unique,
    non-``None`` ID is recorded in a lookup table and passed to
    ``enricher.load_wrapped``. Phase two walks ``enricher.candidates()``,
    resolves each reported ID back to its entity and hands every candidate
    from ``enricher.match_candidates`` to ``match_item``; candidates for
    which ``match_item`` returns ``None`` are dropped.

    An ``EnrichmentException`` affecting a single entity is logged and that
    entity is skipped; it does not stop the run.

    NOTE(review): unlike ``get_itemwise_matches``, the input entities
    themselves are not yielded here - confirm that this is intended.
    """
    entity_lookup: Dict[str, CE] = {}
    for entity in entities:
        try:
            # Validate before loading so the enricher never holds an entity
            # that cannot be resolved again through the lookup table.
            if entity.id is None:
                raise EnrichmentException("Entity has no ID: %r" % entity)
            if entity.id in entity_lookup:
                raise EnrichmentException("Duplicate entity ID: %r" % entity.id)
            enricher.load_wrapped(entity)
            entity_lookup[entity.id] = entity
        except EnrichmentException:
            log.exception("Failed to match: %r", entity)

    for entity_id, candidate_set in enricher.candidates():
        entity = entity_lookup.get(entity_id.id)
        if entity is None:
            # A bare KeyError here would abort the whole run; skip instead.
            log.warning("No loaded entity for candidate ID: %r", entity_id.id)
            continue
        try:
            for candidate in enricher.match_candidates(entity, candidate_set):
                match_result = match_item(
                    entity, candidate, resolver, enricher.dataset
                )
                if match_result is not None:
                    yield match_result
        except EnrichmentException:
            log.exception("Failed to match: %r", entity)


def match_item(
    entity: CE, match: CE, resolver: Resolver[CE], dataset: DS
) -> Optional[CE]:
    """Score one candidate against ``entity`` and register it as a suggestion.

    Returns ``None`` without touching the resolver when either side lacks an
    ID, when ``resolver.check_candidate`` rejects the pair, or when the two
    schemata cannot match. Otherwise the pair is compared with
    ``DefaultAlgorithm``, the score is suggested to the resolver, the
    dataset name is added to the candidate's datasets, and the candidate is
    returned after ``resolver.apply``.
    """
    if entity.id is None or match.id is None:
        return None
    # Short-circuit keeps the original order: resolver check, then schema.
    viable = resolver.check_candidate(
        entity.id, match.id
    ) and entity.schema.can_match(match.schema)
    if not viable:
        return None

    score = DefaultAlgorithm.compare(entity, match).score
    log.info("Match [%s]: %.2f -> %s", entity, score, match)
    resolver.suggest(entity.id, match.id, score)
    match.datasets.add(dataset.name)
    return resolver.apply(match)


# nk enrich -i entities.json -r resolver.json -o combined.json
def enrich(
    enricher: Enricher[DS], resolver: Resolver[CE], entities: Iterable[CE]
) -> Generator[CE, None, None]:
    """Generate enrichment entities for ``entities`` using ``enricher``.

    Dispatches on the enricher's interface: ``BulkEnricher`` instances are
    handled by ``get_bulk_enrichments`` and ``ItemEnricher`` instances by
    ``get_itemwise_enrichments``. The bulk check is performed first.

    Raises:
        EnrichmentException: if the enricher implements neither interface.
            Because this is a generator, the error surfaces on first
            iteration rather than at call time.
    """
    if isinstance(enricher, BulkEnricher):
        yield from get_bulk_enrichments(enricher, resolver, entities)
        return
    if isinstance(enricher, ItemEnricher):
        yield from get_itemwise_enrichments(enricher, resolver, entities)
        return
    raise EnrichmentException("Invalid enricher type: %r" % enricher)


def get_itemwise_enrichments(
    enricher: ItemEnricher[DS], resolver: Resolver[CE], entities: Iterable[CE]
) -> Generator[CE, None, None]:
    """Yield enrichment entities for each input entity, one entity at a time.

    Each candidate produced by ``enricher.match_wrapped`` is handed to
    ``enrich_item``, and whatever that generator produces is yielded. The
    input entities themselves are not yielded.

    An ``EnrichmentException`` raised while enriching one entity is logged
    (with traceback) and processing continues with the next entity.
    """
    for entity in entities:
        try:
            for candidate in enricher.match_wrapped(entity):
                # The judgement check and expansion are delegated to
                # enrich_item so each adjacent entity is yielded only once.
                yield from enrich_item(enricher, entity, candidate, resolver)
        except EnrichmentException:
            log.exception("Failed to enrich: %r", entity)


def get_bulk_enrichments(
    enricher: BulkEnricher[DS], resolver: Resolver[CE], entities: Iterable[CE]
) -> Generator[CE, None, None]:
    """Load all entities into a bulk enricher, then yield enrichments.

    Phase one consumes ``entities`` completely: each entity with a unique,
    non-``None`` ID is recorded in a lookup table and passed to
    ``enricher.load_wrapped``. Phase two walks ``enricher.candidates()``,
    resolves each reported ID back to its entity and hands every candidate
    from ``enricher.match_candidates`` to ``enrich_item``, yielding whatever
    that generator produces.

    An ``EnrichmentException`` affecting a single entity is logged and that
    entity is skipped; it does not stop the run.
    """
    entity_lookup: Dict[str, CE] = {}
    for entity in entities:
        try:
            # Validate before loading so the enricher never holds an entity
            # that cannot be resolved again through the lookup table.
            if entity.id is None:
                raise EnrichmentException("Entity has no ID: %r" % entity)
            if entity.id in entity_lookup:
                raise EnrichmentException("Duplicate entity ID: %r" % entity.id)
            enricher.load_wrapped(entity)
            entity_lookup[entity.id] = entity
        except EnrichmentException:
            log.exception("Failed to enrich: %r", entity)

    for entity_id, candidate_set in enricher.candidates():
        entity = entity_lookup.get(entity_id.id)
        if entity is None:
            # A bare KeyError here would abort the whole run; skip instead.
            log.warning("No loaded entity for candidate ID: %r", entity_id.id)
            continue
        try:
            for candidate in enricher.match_candidates(entity, candidate_set):
                yield from enrich_item(enricher, entity, candidate, resolver)
        except EnrichmentException:
            log.exception("Failed to enrich: %r", entity)


def enrich_item(
    enricher: Enricher[DS], entity: CE, match: CE, resolver: Resolver[CE]
) -> Generator[CE, None, None]:
    """Yield the expansion of ``match`` if it is a confirmed match.

    Nothing is yielded when either side lacks an ID or when the resolver's
    judgement for the pair is anything other than ``Judgement.POSITIVE``.
    Otherwise every entity from ``enricher.expand_wrapped`` gets the
    enricher's dataset name added to its datasets and is yielded after
    ``resolver.apply``.
    """
    if entity.id is None or match.id is None:
        return
    if resolver.get_judgement(match.id, entity.id) != Judgement.POSITIVE:
        return

    log.info("Enrich [%s]: %r", entity, match)
    for adjacent in enricher.expand_wrapped(entity, match):
        adjacent.datasets.add(enricher.dataset.name)
        yield resolver.apply(adjacent)

0 comments on commit fe1af3a

Please sign in to comment.