diff --git a/src/ctlssa/suggestions/logic/domains.py b/src/ctlssa/suggestions/logic/domains.py
index dbd4d05..f1d2f4d 100644
--- a/src/ctlssa/suggestions/logic/domains.py
+++ b/src/ctlssa/suggestions/logic/domains.py
@@ -87,13 +87,21 @@ def __init__(self):
         self.new_data = []
         self.auto_write_batch_size = settings.AUTO_WRITE_BATCH_SIZE
 
+    def clean_domain(self, domain: str) -> str:
+        # Normalize case and padding: a data source might be polluted; merklemap is, and ct may have an off-day too.
+        domain = domain.lower().strip()
+
+        # Some domains end with a dot because DNS entries are stored like this. That is a slight leak/pollution in
+        # the merklemap dataset, so one trailing dot is removed here.
+        return domain.removesuffix(".")
+
     def add_domain(self, domain: str, processing_date: date):
         # wildcards do not exist in the hostname field, so no need to filter.
 
-        # some normalization as a data source might be polluted, merklemap is and there might be an off-day in ct too
-        domain = domain.lower().strip()
+        domain = self.clean_domain(domain)
 
         # we can use this shortcut as there are no two level top level domains in the dutch zones
+        # NOTE: this approach will fall apart for two-level suffixes such as .co.uk
         # the partition method is the fastest:
         rest, delimiter, suffix = domain.rpartition(".")
         subdomain, delimiter, domain = rest.rpartition(".")
diff --git a/tests/test_ingest.py b/tests/test_ingest.py
index e93de65..b80e004 100644
--- a/tests/test_ingest.py
+++ b/tests/test_ingest.py
@@ -1,3 +1,4 @@
+from ctlssa.suggestions.logic.domains import CaseOptimizedBulkInsert
 from ctlssa.suggestions.logic.ingest import add_domains, certstream_callback
 from ctlssa.suggestions.models import Domain
 
@@ -159,3 +160,15 @@ def test_add_domains(db, caplog): # sourcery skip: extract-duplicate-method
 
     # test if logging works correctly, disabled due to flooding
     # assert "ingesting" in caplog.text
+
+
+def test_merklemap_clean_domain():
+
+    cobi = CaseOptimizedBulkInsert()
+
+    # a single trailing dot from polluted data is removed; clean input stays unchanged
+    assert cobi.clean_domain("test.nu.nl.") == "test.nu.nl"
+    assert cobi.clean_domain("test.nu.nl") == "test.nu.nl"
+
+    # upper case is lowered and surrounding whitespace is stripped
+    assert cobi.clean_domain(" TEST.nu.nl ") == "test.nu.nl"