From 25598387406f5851398b818a2b1e6ec8132cf6d5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arturo=20Filast=C3=B2?=
Date: Fri, 17 Nov 2023 18:01:35 +0100
Subject: [PATCH 1/3] Implement Experiment Results generation based on analysis

= Overview of scope of this PR

Inside of this PR I implement an Experiment Results analysis approach based on the analysis tables. The basic idea is to take the analysis keys that are generated by comparing an individual observation with ground truth data. Through a very large set of rules we are able to assign individual blocking, down and ok scores based on how confident we are in that particular signal being a sign of censorship. We then take all the scores pertaining to a particular observation group relevant to a measurement and generate a `MeasurementExperimentResult`, which should be backward compatible with our existing PR.

Based on this we add support for generating the experiment results based on the analysis inside of the `mkanalysis` command, as well as a simple web interface for inspecting them.

In terms of performance, some cursory benchmarks were run on the dataset from 2023-09-01 - 2023-11-01, and it was processing data at a rate of ~7k observations per second when scaling across 34 cores.

= Summary of changes

* Implement Experiment Result generation based on the analysis tables
* Implement minimal UI for MeasurementExperimentResult
* Add support for generating MeasurementExperimentResult as part of the mkanalysis cli command
* Add more tests for all of the above
---
 oonidata/analysis/web_analysis.py | 24 +-
 .../analysis/website_experiment_results.py | 1009 +++++++++++++++++
 oonidata/cli/command.py | 1 +
 oonidata/dataviz/templates/analysis.html | 250 ++++
 oonidata/dataviz/web.py | 89 ++
 oonidata/db/create_tables.py | 36 +-
 oonidata/models/analysis.py | 4 +
 oonidata/models/experiment_result.py | 26 +-
 oonidata/workers/analysis.py | 36 +-
 oonidata/workers/common.py | 33 +-
 oonidata/workers/observations.py | 6 +
 tests/test_analysis.py | 6 +-
 tests/test_cli.py | 10 +-
 tests/test_ctrl.py | 12 +-
 tests/test_experiment_results.py | 67 ++
 tests/test_workers.py | 73 +-
 16 files changed, 1615 insertions(+), 67 deletions(-)
 create mode 100644 oonidata/analysis/website_experiment_results.py
 create mode 100644 oonidata/dataviz/templates/analysis.html
 create mode 100644 tests/test_experiment_results.py

diff --git a/oonidata/analysis/web_analysis.py b/oonidata/analysis/web_analysis.py
index c7a8b253..041977a0 100644
--- a/oonidata/analysis/web_analysis.py
+++ b/oonidata/analysis/web_analysis.py
@@ -83,6 +83,7 @@ def encode_address(ip: str, port: Optional[int]) -> str: class TCPAnalysis: address: str success: bool + failure: Optional[str] ground_truth_failure_count: Optional[int] ground_truth_failure_asn_cc_count: Optional[int]
@@ -105,6 +106,7 @@ def make_tcp_analysis( return TCPAnalysis( address=blocking_subject, success=True, + failure=None, ground_truth_failure_asn_cc_count=None, ground_truth_failure_count=None, ground_truth_ok_asn_cc_count=None,
@@ -154,6 +156,7 @@ def make_tcp_analysis( return TCPAnalysis( address=blocking_subject, success=False, + failure=web_o.tcp_failure, ground_truth_failure_asn_cc_count=tcp_ground_truth_failure_asn_cc_count, ground_truth_failure_count=tcp_ground_truth_failure_count, ground_truth_ok_asn_cc_count=tcp_ground_truth_ok_asn_cc_count,
@@ -261,6 +264,7 @@ class DNSConsistencyResults: is_answer_bogon: bool = False answer_fp_name: str = "" + answer_fp_scope: str = "" is_answer_fp_match: bool = False is_answer_fp_country_consistent: bool = False
is_answer_fp_false_positive: bool = False @@ -318,6 +322,7 @@ def check_dns_consistency( # XXX in the event of multiple matches, we are overriding it with # the last value. It's probably OK for now. consistency_results.answer_fp_name = fp.name + consistency_results.answer_fp_scope = fp.scope or "" if not web_o.dns_engine or web_o.dns_engine in SYSTEM_RESOLVERS: # TODO: do the same thing for the non-system resolver @@ -460,18 +465,17 @@ def make_tls_analysis( handshake_time=web_o.tls_handshake_time, ) ground_truths = filter( - lambda gt: gt.http_request_url and gt.hostname == web_o.hostname, - web_ground_truths, + lambda gt: gt.ip == web_o.ip and gt.port == web_o.port, web_ground_truths ) failure_cc_asn = set() ok_cc_asn = set() for gt in ground_truths: # We don't check for strict == True, since depending on the DB engine # True could also be represented as 1 - if gt.http_success is None: + if gt.tls_success is None: continue - if gt.http_success: + if gt.tls_success: if gt.is_trusted_vp: tls_analysis.ground_truth_trusted_ok_count += gt.count else: @@ -510,6 +514,7 @@ class HTTPAnalysis: ground_truth_body_length: int = 0 fp_name: str = "" + fp_scope: str = "" is_http_fp_match: bool = False is_http_fp_country_consistent: bool = False is_http_fp_false_positive: bool = False @@ -589,6 +594,7 @@ def make_http_analysis( http_analysis.is_http_fp_country_consistent = True if fp.name: http_analysis.fp_name = fp.name + http_analysis.fp_scope = fp.scope if web_o.http_response_body_length: http_analysis.response_body_length = web_o.http_response_body_length @@ -638,7 +644,7 @@ def make_web_analysis( if web_o.ip: try: ipaddr = ipaddress.ip_address(web_o.ip) - # FIXME: for the moment we just ignore all IPv6 results, because they are too noisy + # TODO(arturo): for the moment we just ignore all IPv6 results, because they are too noisy if isinstance(ipaddr, ipaddress.IPv6Address): continue address = encode_address(web_o.ip, web_o.port) @@ -744,6 +750,9 @@ def make_web_analysis( website_analysis.dns_consistency_system_answer_fp_name = ( dns_analysis.consistency_system.answer_fp_name ) + website_analysis.dns_consistency_system_answer_fp_scope = ( + dns_analysis.consistency_system.answer_fp_scope + ) website_analysis.dns_consistency_system_is_answer_fp_match = ( dns_analysis.consistency_system.is_answer_fp_match ) @@ -835,6 +844,9 @@ def make_web_analysis( website_analysis.dns_consistency_other_answer_fp_name = ( dns_analysis.consistency_other.answer_fp_name ) + website_analysis.dns_consistency_other_answer_fp_scope = ( + dns_analysis.consistency_other.answer_fp_scope + ) website_analysis.dns_consistency_other_is_answer_fp_match = ( dns_analysis.consistency_other.is_answer_fp_match ) @@ -899,6 +911,7 @@ def make_web_analysis( if tcp_analysis: website_analysis.tcp_address = tcp_analysis.address website_analysis.tcp_success = tcp_analysis.success + website_analysis.tcp_failure = tcp_analysis.failure website_analysis.tcp_ground_truth_failure_count = ( tcp_analysis.ground_truth_failure_count ) @@ -954,6 +967,7 @@ def make_web_analysis( http_analysis.ground_truth_body_length ) website_analysis.http_fp_name = http_analysis.fp_name + website_analysis.http_fp_scope = http_analysis.fp_scope website_analysis.http_is_http_fp_match = http_analysis.is_http_fp_match website_analysis.http_is_http_fp_country_consistent = ( http_analysis.is_http_fp_country_consistent diff --git a/oonidata/analysis/website_experiment_results.py b/oonidata/analysis/website_experiment_results.py new file mode 100644 index 00000000..3c3f8431 --- 
/dev/null +++ b/oonidata/analysis/website_experiment_results.py @@ -0,0 +1,1009 @@ +from dataclasses import dataclass +from datetime import datetime +import logging +from typing import Dict, Generator, List, NamedTuple, Optional, Tuple + +from oonidata.models.analysis import WebAnalysis +from oonidata.models.experiment_result import MeasurementExperimentResult + +log = logging.getLogger("oonidata.analysis") + + +def map_analysis_to_target_name(analysis): + # Poormans mapping to target name + # TODO(arturo) we eventually want to look these up in some sort of database that groups together related domains + return analysis.target_domain_name.lstrip("www.") + + +NXDOMAIN_FAILURES = ["android_dns_cache_no_data", "dns_nxdomain_error"] + + +@dataclass +class OutcomeStatus: + key: str + value: float + scope: Optional[str] = None + + +@dataclass +class OutcomeSpace: + dns: Optional[OutcomeStatus] = None + tcp: Optional[OutcomeStatus] = None + tls: Optional[OutcomeStatus] = None + http: Optional[OutcomeStatus] = None + https: Optional[OutcomeStatus] = None + + def to_dict(self) -> Dict[str, float]: + d = {} + if self.dns: + d[self.dns.key] = self.dns.value + if self.tcp: + d[self.tcp.key] = self.tcp.value + if self.tls: + d[self.tls.key] = self.tls.value + if self.http: + d[self.http.key] = self.http.value + if self.https: + d[self.https.key] = self.https.value + return d + + def sum(self) -> float: + s = 0 + for _, val in self.to_dict().items(): + s += val + return s + + +@dataclass +class LoNI: + ok_final: float + + ok: OutcomeSpace + down: OutcomeSpace + blocked: OutcomeSpace + blocking_scope: Optional[str] + + def to_dict(self): + return { + "ok": self.ok.to_dict(), + "down": self.down.to_dict(), + "blocked": self.blocked.to_dict(), + "blocking_scope": self.blocking_scope, + "ok_final": self.ok_final, + } + + +def calculate_web_loni( + web_analysis: WebAnalysis, +) -> Tuple[LoNI, List[str]]: + ok_value = 0 + ok = OutcomeSpace() + down = OutcomeSpace() + blocked = OutcomeSpace() + + # TODO(arturo): make this not nullable + blocking_scope = None + analysis_transcript = [] + + # We start off not knowing anything. + # So basically without any additional information we may as well be rolling + # a 3 sided dice. + # Yes, you can make a 3 sided dice: https://upload.wikimedia.org/wikipedia/commons/3/3b/04ds3.JPG + blocked_key, down_key = None, None + ok_value, down_value, blocked_value = 0.33, 0.33, 0.33 + # We are in the case of a DNS query failing, i.e. we got no answer. + if web_analysis.dns_consistency_system_failure is not None: + """ + Relevant keys for this section of the analysis: + + web_analysis.dns_ground_truth_failure_cc_asn_count + web_analysis.dns_ground_truth_failure_count + web_analysis.dns_ground_truth_nxdomain_count + web_analysis.dns_ground_truth_nxdomain_cc_asn_count + web_analysis.dns_ground_truth_ok_count + web_analysis.dns_ground_truth_ok_cc_asn_count + """ + # For sure, no matter what, the target is having some issue. + ok_value = 0.0 + # We now need to figure out if the failure is because of the target + # being down or if it's blocked. 
+ blocked_key, down_key = "dns", "dns" + blocked_value, down_value = 0.5, 0.5 + dns_ground_truth_failure_count = ( + web_analysis.dns_ground_truth_failure_count or 0 + ) + dns_ground_truth_ok_count = web_analysis.dns_ground_truth_ok_count or 0 + dns_ground_truth_failure_cc_asn_count = ( + web_analysis.dns_ground_truth_failure_cc_asn_count or 0 + ) + dns_ground_truth_ok_cc_asn_count = ( + web_analysis.dns_ground_truth_ok_cc_asn_count or 0 + ) + + # Without any additional information, it could be 50/50 + if web_analysis.dns_consistency_system_failure in NXDOMAIN_FAILURES: + # NXDOMAIN errors are more commonly associated with censorship, let's bump up the blocked_value + blocked_key, down_key = "dns.nxdomain", "dns.nxdomain" + blocked_value = 0.6 + down_value = 1 - blocked_value + analysis_transcript.append( + "web_analysis.dns_consistency_system_failure in NXDOMAIN_FAILURES" + ) + if dns_ground_truth_failure_count > dns_ground_truth_ok_count: + # It's failing more than it's succeeding. This smells like an unreliable site. + blocked_value = 0.3 + down_value = 1 - blocked_value + analysis_transcript.append( + "dns_ground_truth_failure_count > dns_ground_truth_ok_count" + ) + if ( + dns_ground_truth_failure_cc_asn_count + > dns_ground_truth_ok_cc_asn_count + ): + # Even more if that is happening globally + blocked_value = 0.2 + down_value = 1 - blocked_value + analysis_transcript.append( + "dns_ground_truth_failure_cc_asn_count > dns_ground_truth_ok_cc_asn_count" + ) + elif ( + dns_ground_truth_ok_count > 0 + and web_analysis.dns_ground_truth_nxdomain_count == 0 + and web_analysis.dns_ground_truth_nxdomain_cc_asn_count == 0 + ): + # If we never saw a single NXDOMAIN in our ground truth, then + # it's really fishy. Let's bump up the blocking reason. + # TODO(arturo): when we introduce web_obs based ground truthing, + # we should use a threshold here based on the number of metrics + analysis_transcript.append( + "dns_ground_truth_ok_count > 0 and web_analysis.dns_ground_truth_nxdomain_count == 0 and web_analysis.dns_ground_truth_nxdomain_cc_asn_count == 0" + ) + blocked_value = 0.75 + down_value = 1 - blocked_value + else: + analysis_transcript.append( + "web_analysis.dns_consistency_system_failure not in NXDOMAIN_FAILURES" + ) + if dns_ground_truth_failure_count > dns_ground_truth_ok_count: + analysis_transcript.append( + "dns_ground_truth_failure_count > dns_ground_truth_ok_count" + ) + # it's failing more than it's succeeding, more likely to be blocked. + blocked_key, down_key = "dns.failure", "dns.failure" + blocked_value = 0.6 + down_value = 1 - blocked_value + + elif len(web_analysis.dns_consistency_system_answers) > 0: + analysis_transcript.append( + "len(web_analysis.dns_consistency_system_answers) > 0" + ) + # Ok we got some answers. Now we need to figure out if what we got is a + # good answer. + blocked_key = "dns" + down_key = "dns" + blocked_value = 0.5 + ok_value = 0.5 + # No matter what happens, it's not gonna be flagged as down + down_value = 0 + if web_analysis.dns_consistency_system_is_answer_tls_consistent == True: + # "easy" case: we got a TLS consistent answer we can flag it as good + # and move on with our business. + # + # XXX(arturo): there is an important caveat here. We have seen + # cases where you get a surely wrong answer via DNS (eg. a bogon), + # but for some reason you are then still establishing a good TLS + # handshake. Technically it's probably OK to mark these as unblocked + # since eventually you do get the content, but it's worth pointing + # it out. 
+ # Here is a sample measurement for this case: https://explorer.ooni.org/m/20230101013014.694846_AE_webconnectivity_f9f1078ce75936a1 + analysis_transcript.append( + "web_analysis.dns_consistency_system_is_answer_tls_consistent == True" + ) + blocked_value = 0 + down_value = 0 + ok_value = 1 + elif ( + web_analysis.dns_consistency_system_is_answer_fp_match == True + and web_analysis.dns_consistency_system_is_answer_fp_false_positive == False + ): + # TODO(arturo): will we eventually have false positives in the DNS? If so how do we handle them? + # We matched a signature known to be used to implemented censorship. We can mark this as confirmed blocked. + analysis_transcript.append( + "web_analysis.dns_consistency_system_is_answer_fp_match == True and web_analysis.dns_consistency_system_is_answer_fp_false_positive == False" + ) + blocked_key = "dns.confirmed" + blocking_scope = web_analysis.dns_consistency_system_answer_fp_scope + blocked_value = 0.9 + if ( + web_analysis.dns_consistency_system_is_answer_fp_country_consistent + == True + ): + blocked_key = "dns.confirmed.country_consistent" + blocked_value = 1.0 + elif ( + web_analysis.dns_consistency_system_is_answer_fp_country_consistent + == False + ): + # We let the blocked value be slightly less for cases where the fingerprint is not country consistent + blocked_key = "dns.confirmed.not_country_consistent" + blocked_value = 0.8 + ok_value = 0 + down_value = 0 + elif web_analysis.dns_consistency_system_is_answer_bogon == True: + # Bogons are always fishy, yet we don't know if we see it because + # the site is misconfigured. + # In any case a bogon is not a routable IP, so the target is either + # down or blocked. + analysis_transcript.append( + "web_analysis.dns_consistency_system_is_answer_bogon == True" + ) + blocked_key, down_key = "dns.bogon", "dns.bogon" + blocked_value = 0.5 + down_value = 0.5 + ok_value = 0 + if ( + web_analysis.dns_consistency_system_is_answer_ip_in_trusted_answers + == False + ): + # If we didn't see the bogon in the trusted answers, then it's probably censorship + analysis_transcript.append( + "web_analysis.dns_consistency_system_is_answer_ip_in_trusted_answers == False" + ) + blocked_value = 0.8 + down_value = 1 - blocked_value + ok_value = 0 + elif ( + web_analysis.dns_consistency_system_is_answer_ip_in_trusted_answers + == True + ): + analysis_transcript.append( + "web_analysis.dns_consistency_system_is_answer_ip_in_trusted_answers == True" + ) + # If we did see it in the trusted answers, then we should actually ignore this case and mark the target as down. + blocked_value = 0.2 + down_value = 1 - blocked_value + ok_value = 0 + elif ( + web_analysis.dns_consistency_system_is_answer_ip_in_trusted_answers == True + ): + # Direct hit of the IP in the trusted answers. We nothing to see here. + analysis_transcript.append( + "web_analysis.dns_consistency_system_is_answer_ip_in_trusted_answers == True" + ) + blocked_value = 0.1 + ok_value = 1 - blocked_value + down_value = 0 + elif ( + web_analysis.dns_consistency_system_is_answer_asn_in_trusted_answers == True + or web_analysis.dns_consistency_system_is_answer_asorg_in_trusted_answers + == True + ): + # The ASN or AS org name of the observation matches that of our control. Most likely this is not a case of blocking. 
+ analysis_transcript.append( + "web_analysis.dns_consistency_system_is_answer_asn_in_trusted_answers == True" + ) + blocked_value = 0.2 + ok_value = 1 - blocked_value + down_value = 0 + if web_analysis.dns_consistency_other_is_answer_cloud_provider == True: + # We are even more confident about it not being blocked if it's a cloud provider + analysis_transcript.append( + "web_analysis.dns_consistency_other_is_answer_cloud_provider == True" + ) + blocked_value = 0.1 + ok_value = 1 - blocked_value + down_value = 0 + else: + # We are done with all the simpler cases. We can now move into the + # more sketchy dubious analysis strategies. + # We assume that if we weren't able to determine consistency through + # the several previous methods, we will air on the side of saying + # it's blocked, but marginally. + analysis_transcript.append("not a_simple_case") + blocked_value = 0.6 + ok_value = 1 - blocked_value + down_value = 0 + # TODO(arturo): if we ever add false positive fingerprints to DNS we + # should add case for them here. + if web_analysis.dns_consistency_system_is_answer_probe_cc_match == True: + analysis_transcript.append( + "web_analysis.dns_consistency_system_is_answer_probe_cc_match == True" + ) + # It's common for blockpages to be hosted in the country of where the blocking is happening, let's bump up the blocking score. + blocked_key = "dns.inconsistent" + blocked_value = 0.65 + ok_value = 1 - blocked_value + if ( + web_analysis.dns_consistency_system_is_answer_cloud_provider + == False + ): + # If it's not a cloud provider, even more a reason to believe that's the case. + # TODO(arturo): add a new metric which tells us if the + # target domain is being hosted on a cloud provider and use + # that instead since this metric here will actually never be set to false + analysis_transcript.append( + "web_analysis.dns_consistency_system_is_answer_cloud_provider == False" + ) + blocked_value = 0.75 + ok_value = 1 - blocked_value + elif ( + web_analysis.dns_consistency_system_is_answer_cloud_provider == True + ): + # If it's a cloud provider, this is probably a false positive. + analysis_transcript.append( + "web_analysis.dns_consistency_system_is_answer_cloud_provider == True" + ) + blocked_key = "dns.cloud" + blocked_value = 0.3 + ok_value = 1 - blocked_value + elif ( + web_analysis.dns_consistency_system_is_answer_probe_asn_match + == True + ): + # It's not a cloud provider, but it's in the same network. Somethings up. + analysis_transcript.append( + "web_analysis.dns_consistency_system_is_answer_probe_asn_match == True" + ) + blocked_value = 0.7 + ok_value = 1 - blocked_value + + if blocked_key and down_key: + # We have finished the DNS analysis. We can store the + # final analysis to blocked and down dictionaries. + blocked.dns = OutcomeStatus( + key=blocked_key, value=blocked_value, scope=blocking_scope + ) + down.dns = OutcomeStatus(key=down_key, value=down_value) + ok.dns = OutcomeStatus(key="dns", value=ok_value) + assert ( + round(blocked.sum() + down.sum() + ok_value) == 1 + ), f"{blocked} + {down} + {ok_value} != 1" + if ok_value < 0.5: + # If the DNS analysis is leading us to believe the target is more down + # or blocked, than OK, we better off just call it day and return early. + # If we don't know if the answer we got was DNS consistent, we can't + # really trust the TCP and TLS analysis results. + # TODO(arturo): do we want to have different thresholds here? 
+ analysis_transcript.append(f"ok_value < 0.5 # OK is {ok_value} after DNS") + return ( + LoNI( + ok_final=ok_value, + ok=ok, + blocked=blocked, + down=down, + blocking_scope=blocking_scope, + ), + analysis_transcript, + ) + + # TODO(arturo): convert paper notes into proof to go in here + if web_analysis.tcp_success == True: + # We succeeded via TCP, no matter what there are no TCP level issues + blocked_key, down_key = "tcp", "tcp" + down_value, blocked_value = 0.0, 0.0 + blocked.tcp = OutcomeStatus(key=blocked_key, value=blocked_value) + down.tcp = OutcomeStatus(key=down_key, value=down_value) + + elif web_analysis.tcp_success == False: + analysis_transcript.append("web_analysis.tcp_success == False") + # No matter what the target is + blocked_key, down_key = "tcp.failure", "tcp.failure" + + down_value, blocked_value = 0.5, 0.5 + tcp_ground_truth_failure_count = ( + web_analysis.tcp_ground_truth_trusted_failure_count or 0 + ) + # TODO(arturo): Here we are only using the trusted ground truths (i.e. the control measurements) + # eventually we want to switch to using other OONI measurements too. + tcp_ground_truth_ok_count = web_analysis.tcp_ground_truth_trusted_ok_count or 0 + tcp_ground_truth_failure_asn_cc_count = ( + web_analysis.tcp_ground_truth_failure_asn_cc_count or 0 + ) + tcp_ground_truth_ok_asn_cc_count = ( + web_analysis.tcp_ground_truth_ok_asn_cc_count or 0 + ) + if tcp_ground_truth_failure_count > tcp_ground_truth_ok_count: + analysis_transcript.append( + "tcp_ground_truth_failure_count > tcp_ground_truth_ok_count" + ) + # It's failing more than it's succeeding. Probably the site is unreliable + blocked_value = 0.3 + down_value = 1 - blocked_value + if tcp_ground_truth_failure_asn_cc_count > tcp_ground_truth_ok_asn_cc_count: + analysis_transcript.append( + "tcp_ground_truth_failure_asn_cc_count > tcp_ground_truth_ok_asn_cc_count" + ) + + # Even more if it's happening globally + blocked_value = 0.2 + down_value = 1 - blocked_value + elif tcp_ground_truth_ok_count > tcp_ground_truth_failure_count: + analysis_transcript.append( + "tcp_ground_truth_ok_count > tcp_ground_truth_failure_count" + ) + # OTOH, if it's mostly working, then this is a sign of blocking + blocked_value = 0.7 + down_value = 1 - blocked_value + if web_analysis.tcp_failure == "connection_reset": + analysis_transcript.append( + 'web_analysis.tcp_failure == "connection_reset"' + ) + # Connection reset is very fishy. Let's bump up the blocking value. + blocked_value = 0.8 + down_value = 1 - blocked_value + elif web_analysis.tcp_failure == "connection_reset": + analysis_transcript.append('web_analysis.tcp_failure == "connection_reset"') + # Connection reset is very fishy. Let's bump up the blocking value. 
+ blocked_value = 0.7 + down_value = 1 - blocked_value + + # Let's set some nice blocking keys + if web_analysis.tcp_failure in ["generic_timeout_error", "timed_out"]: + blocked_key, down_key = "tcp.timeout", "tcp.timeout" + elif web_analysis.tcp_failure == "connection_reset": + blocked_key, down_key = "tcp.connection_reset", "tcp.connection_reset" + else: + blocked_key = f"{blocked_key}.{web_analysis.tcp_failure}" + down_key = f"{down_key}.{web_analysis.tcp_failure}" + + blocked.tcp = OutcomeStatus(key=blocked_key, value=blocked_value * ok_value) + down.tcp = OutcomeStatus(key=down_key, value=down_value * ok_value) + # TODO(arturo): double check this is correct + ok.tcp = OutcomeStatus(key="tcp", value=1 - (blocked.sum() + down.sum())) + + if blocked_key and down_key: + old_ok_value = ok_value + ok_value = 1 - (blocked.sum() + down.sum()) + assert ( + round(blocked.sum() + down.sum() + ok_value) == 1 + ), f"{blocked} + {down} + {ok_value} != 1" + + if ok_value < 0.5: + # If the TCP analysis is leading us to believe the target is more down + # or blocked, than OK, we better off just call it day and return early. + # TODO(arturo): How should we map multiple failure types? This is OK for + # web 0.4, but doesn't apply to wc 0.5 + analysis_transcript.append( + f"ok_value < 0.5 # OK went after TCP from {old_ok_value} -> {ok_value}" + ) + return ( + LoNI( + ok_final=ok_value, + ok=ok, + blocked=blocked, + down=down, + blocking_scope=blocking_scope, + ), + analysis_transcript, + ) + + if web_analysis.tls_success == True: + blocked_key, down_key = "tls", "tls" + down_value, blocked_value = 0.0, 0.0 + blocked.tls = OutcomeStatus(key=blocked_key, value=blocked_value) + down.tls = OutcomeStatus(key=down_key, value=down_value) + + elif web_analysis.tls_success == False: + analysis_transcript.append("web_analysis.tls_success == False") + # No matter what we are in a tls failure case + blocked_key, down_key = "tls.failure", "tls.failure" + + down_value, blocked_value = 0.5, 0.5 + + # TODO(arturo): Here we are only using the trusted ground truths (i.e. + # the control measurements) eventually we want to switch to using other + # OONI measurements too. + tls_ground_truth_failure_count = ( + web_analysis.tls_ground_truth_trusted_failure_count or 0 + ) + tls_ground_truth_ok_count = web_analysis.tls_ground_truth_trusted_ok_count or 0 + tls_ground_truth_failure_asn_cc_count = ( + web_analysis.tls_ground_truth_failure_asn_cc_count or 0 + ) + tls_ground_truth_ok_asn_cc_count = ( + web_analysis.tls_ground_truth_ok_asn_cc_count or 0 + ) + if tls_ground_truth_failure_count > tls_ground_truth_ok_count: + analysis_transcript.append( + "tls_ground_truth_failure_count > tls_ground_truth_ok_count" + ) + # It's failing more than it's succeeding. 
Probably the site is unreliable + blocked_value = 0.3 + down_value = 1 - blocked_value + if tls_ground_truth_failure_asn_cc_count > tls_ground_truth_ok_asn_cc_count: + analysis_transcript.append( + "tls_ground_truth_failure_asn_cc_count > tls_ground_truth_ok_asn_cc_count" + ) + # Even more if it's happening globally + blocked_value = 0.2 + down_value = 1 - blocked_value + elif tls_ground_truth_ok_count > tls_ground_truth_failure_count: + analysis_transcript.append( + "tls_ground_truth_ok_count > tls_ground_truth_failure_count" + ) + # OTOH, if it's mostly working, then this is a sign of blocking + blocked_value = 0.7 + down_value = 1 - blocked_value + if web_analysis.tls_is_tls_certificate_invalid == True: + analysis_transcript.append( + "web_analysis.tls_is_tls_certificate_invalid == True" + ) + # bad certificate is very fishy. Let's bump up the blocking value. + blocked_value = 0.9 + down_value = 1 - blocked_value + elif web_analysis.tls_failure == "connection_reset": + # bad certificate is very fishy. Let's bump up the blocking value. + analysis_transcript.append( + "web_analysis.tls_failure == 'connection_reset'" + ) + blocked_value = 0.8 + down_value = 1 - blocked_value + + elif web_analysis.tls_is_tls_certificate_invalid == True: + analysis_transcript.append( + "web_analysis.tls_is_tls_certificate_invalid == True" + ) + # bad certificate is very fishy. Let's bump up the blocking value. + blocked_value = 0.8 + down_value = 1 - blocked_value + elif web_analysis.tls_failure == "connection_reset": + # connection_reset very fishy. Let's bump up the blocking value. + analysis_transcript.append("web_analysis.tls_failure == 'connection_reset'") + blocked_value = 0.7 + down_value = 1 - blocked_value + + # Let's set some nice blocking keys + if web_analysis.tls_failure in ["generic_timeout_error", "timed_out"]: + blocked_key, down_key = "tls.timeout", "tls.timeout" + elif web_analysis.tls_failure == "connection_reset": + blocked_key, down_key = "tls.connection_reset", "tls.connection_reset" + else: + blocked_key = f"{blocked_key}.{web_analysis.tls_failure}" + down_key = f"{down_key}.{web_analysis.tls_failure}" + + blocked.tls = OutcomeStatus(key=blocked_key, value=blocked_value * ok_value) + down.tls = OutcomeStatus(key=down_key, value=down_value * ok_value) + # TODO(arturo): double check this is correct + ok.tls = OutcomeStatus(key="tls", value=1 - (blocked.sum() + down.sum())) + + if blocked_key and down_key: + old_ok_value = ok_value + ok_value = 1 - (blocked.sum() + down.sum()) + assert ( + round(blocked.sum() + down.sum() + ok_value) + ) == 1, f"{blocked} + {down} + {ok_value} != 1" + + if ok_value < 0.5: + # If the TLS analysis is leading us to believe the target is more down + # or blocked, than OK, we better off just call it day and return early. + analysis_transcript.append( + f"ok_value < 0.5 # OK went after TLS from {old_ok_value} -> {ok_value}" + ) + return ( + LoNI( + ok_final=ok_value, + ok=ok, + blocked=blocked, + down=down, + blocking_scope=blocking_scope, + ), + analysis_transcript, + ) + + if web_analysis.http_is_http_request_encrypted is not None: + # If the connection is encrypted we will map these to TLS failures, + # since they are equivalent to the TLS level anomalies. + prefix = "http" + if web_analysis.http_is_http_request_encrypted == True: + prefix = "tls" + + # This is the special case to handle the situation where the HTTP + # analysis happens on it's own. 
Our prior is set to 1.0 + # TODO(arturo): add more details on why this works + if not blocked_key and not down_key: + ok_value = 1.0 + + blocked_key, down_key = prefix, prefix + + if ( + web_analysis.http_is_http_request_encrypted == True + and web_analysis.http_success == True + ): + analysis_transcript.append( + "web_analysis.http_is_http_request_encrypted == True and web_analysis.http_success == True" + ) + down_value, blocked_value = 0.0, 0.0 + + elif ( + web_analysis.http_is_http_request_encrypted == False + and web_analysis.http_success == True + ): + down_value = 0.0 + # We got an answer via HTTP, yet we don't know if the answer is correct. + analysis_transcript.append( + "web_analysis.http_is_http_request_encrypted == False and web_analysis.http_success == True" + ) + if web_analysis.http_is_http_fp_match == True: + # It matches a known fingerprint, we can say stuff + analysis_transcript.append("web_analysis.http_is_http_fp_match == True") + if web_analysis.http_is_http_fp_false_positive == False: + # We matched a signature known to be used to implemented censorship. We can mark this as confirmed blocked. + analysis_transcript.append( + "web_analysis.http_is_http_fp_false_positive == False" + ) + blocked_key = "http.confirmed" + blocking_scope = web_analysis.http_fp_scope + blocked_value = 0.9 + if web_analysis.http_is_http_fp_country_consistent == True: + analysis_transcript.append( + "web_analysis.http_is_http_fp_country_consistent == True" + ) + blocked_key = "http.confirmed.country_consistent" + blocked_value = 1.0 + elif web_analysis.http_is_http_fp_country_consistent == False: + # We let the blocked value be slightly less for cases where the fingerprint is not country consistent + analysis_transcript.append( + "web_analysis.dns_consistency_system_is_answer_fp_country_consistent == False" + ) + blocked_key = "http.confirmed.not_country_consistent" + blocked_value = 0.8 + elif web_analysis.http_is_http_fp_false_positive == True: + blocked_value = 0.0 + else: + # We need to apply some fuzzy logic to fingerprint it + # TODO(arturo): in the future can use more features, such as the following + """ + web_analysis.http_response_status_code + web_analysis.http_response_body_proportion + web_analysis.http_response_body_length + web_analysis.http_ground_truth_body_length + """ + http_response_body_length = web_analysis.http_response_body_length or 0 + http_ground_truth_body_length = ( + web_analysis.http_ground_truth_body_length or 0 + ) + body_proportion = (http_response_body_length + 1) / ( + http_ground_truth_body_length + 1 + ) + if body_proportion < 0.7: + analysis_transcript.append( + "(http_response_body_length + 1)/ (http_ground_truth_body_length + 1) < 0.7" + ) + blocked_key = "http.inconsistent.body_length_mismatch" + blocked_value = 0.7 + # TODO(arturo): check if this indeed has the desired effect. + down_value = 0 + + elif web_analysis.http_failure: + analysis_transcript.append(f"web_analysis.http_failure # ok: {ok_value}") + # No matter what we are in a failure case + + blocked_key, down_key = f"{prefix}.failure", f"{prefix}.failure" + down_value, blocked_value = 0.5, 0.5 + + # TODO(arturo): Here we are only using the trusted ground truths (i.e. + # the control measurements) eventually we want to switch to using other + # OONI measurements too. 
+ https_ground_truth_failure_count = ( + web_analysis.http_ground_truth_trusted_failure_count or 0 + ) + https_ground_truth_ok_count = ( + web_analysis.http_ground_truth_trusted_ok_count or 0 + ) + https_ground_truth_failure_asn_cc_count = ( + web_analysis.http_ground_truth_failure_asn_cc_count or 0 + ) + https_ground_truth_ok_asn_cc_count = ( + web_analysis.http_ground_truth_ok_asn_cc_count or 0 + ) + if https_ground_truth_failure_count > https_ground_truth_ok_count: + analysis_transcript.append( + "https_ground_truth_failure_count > https_ground_truth_ok_count" + ) + # It's failing more than it's succeeding. Probably the site is unreliable + blocked_value = 0.3 + down_value = 0.7 + if ( + https_ground_truth_failure_asn_cc_count + > https_ground_truth_ok_asn_cc_count + ): + analysis_transcript.append( + "https_ground_truth_failure_asn_cc_count > https_ground_truth_ok_asn_cc_count" + ) + # Even more if it's happening globally + blocked_value = 0.2 + down_value = 0.8 + elif https_ground_truth_ok_count > https_ground_truth_failure_count: + analysis_transcript.append( + "https_ground_truth_ok_count > https_ground_truth_failure_count" + ) + # OTOH, if it's mostly working, then this is a sign of blocking + blocked_value = 0.7 + down_value = 0.3 + if "ssl_" in web_analysis.http_failure: + analysis_transcript.append('"ssl_" in web_analysis.http_failure') + # bad certificate is very fishy. Let's bump up the blocking value. + blocked_value = 0.9 + down_value = 0.1 + elif web_analysis.http_failure == "connection_reset": + # connection reset is very fishy. Let's bump up the blocking value. + analysis_transcript.append( + 'web_analysis.http_failure == "connection_reset"' + ) + blocked_value = 0.8 + down_value = 0.2 + + elif web_analysis.http_failure == "connection_reset": + # connection_reset very fishy. Let's bump up the blocking value. 
+ analysis_transcript.append( + "web_analysis.http_failure == 'connection_reset'" + ) + blocked_value = 0.7 + down_value = 0.3 + + # Let's set some nice blocking keys + if web_analysis.http_failure in ["generic_timeout_error", "timed_out"]: + blocked_key, down_key = f"{prefix}.timeout", f"{prefix}.timeout" + elif web_analysis.http_failure == "connection_reset": + blocked_key, down_key = ( + f"{prefix}.connection_reset", + f"{prefix}.connection_reset", + ) + else: + blocked_key = f"{blocked_key}.{web_analysis.http_failure}" + down_key = f"{down_key}.{web_analysis.http_failure}" + + if prefix == "tls": + if blocked.tls is not None: + log.info( + f"Overwriting previous TLS blocking status {blocked.tls} - {down.tls} with {blocked_value} {down_value} ({web_analysis.measurement_uid})" + ) + blocked.tls = OutcomeStatus(key=blocked_key, value=blocked_value * ok_value) + down.tls = OutcomeStatus(key=down_key, value=down_value * ok_value) + # TODO(arturo): double check this is correct + ok.tls = OutcomeStatus(key="tls", value=1 - (blocked.sum() + down.sum())) + else: + blocked.http = OutcomeStatus( + key=blocked_key, value=blocked_value * ok_value, scope=blocking_scope + ) + down.http = OutcomeStatus(key=down_key, value=down_value * ok_value) + # TODO(arturo): double check this is correct + ok.http = OutcomeStatus(key="http", value=1 - (blocked.sum() + down.sum())) + + if blocked_key and down_key: + old_ok_value = ok_value + ok_value = 1 - (blocked.sum() + down.sum()) + assert ( + round(blocked.sum() + down.sum() + ok_value) == 1 + ), f"{blocked} + {down} + {ok_value} != 1" + + return ( + LoNI( + ok_final=ok_value, + ok=ok, + blocked=blocked, + down=down, + blocking_scope=blocking_scope, + ), + analysis_transcript, + ) + + +def make_website_experiment_results( + web_analysis: List[WebAnalysis], +) -> Generator[MeasurementExperimentResult, None, None]: + """ + Takes as input a list of web_analysis and outputs a list of + ExperimentResults for the website. + """ + experiment_result_id = "XXXX" + observation_id_list = [] + first_analysis = web_analysis[0] + + measurement_uid = first_analysis.measurement_uid + timeofday = first_analysis.measurement_start_time + + target_nettest_group = "websites" + target_category = "MISC" + target_name = map_analysis_to_target_name(first_analysis) + target_domain_name = first_analysis.target_domain_name + target_detail = first_analysis.target_detail + + analysis_transcript_list = [] + loni_list: List[LoNI] = [] + loni_blocked_list: List[OutcomeSpace] = [] + loni_down_list: List[OutcomeSpace] = [] + loni_ok_list: List[OutcomeSpace] = [] + for wa in web_analysis: + loni, analysis_transcript = calculate_web_loni(wa) + analysis_transcript_list.append(analysis_transcript) + loni_list.append(loni) + loni_blocked_list.append(loni.blocked) + loni_down_list.append(loni.down) + loni_ok_list.append(loni.ok) + + final_blocked = OutcomeSpace() + final_down = OutcomeSpace() + final_ok = OutcomeSpace() + ok_value = 0 + blocking_scope = None + + # TODO(arturo): this section needs to be formalized and verified a bit more + # in depth. Currently it's just a prototype to start seeing how the data + # looks like. 
+ + def get_agg_outcome(loni_list, category, agg_func) -> Optional[OutcomeStatus]: + """ + Returns the min or max outcome status of the specified category given the loni list + """ + try: + return agg_func( + filter( + lambda x: x is not None, + map(lambda x: getattr(x, category), loni_list), + ), + key=lambda d: d.value if d else 0, + ) + except ValueError: + return None + + ### FINAL DNS + max_dns_blocked = get_agg_outcome(loni_blocked_list, "dns", max) + max_dns_down = get_agg_outcome(loni_down_list, "dns", max) + min_dns_ok = get_agg_outcome(loni_ok_list, "dns", min) + + if max_dns_blocked and max_dns_down and min_dns_ok: + ok_value = min_dns_ok.value + final_ok.dns = OutcomeStatus(key="dns", value=min_dns_ok.value) + final_blocked.dns = OutcomeStatus( + key=max_dns_blocked.key, value=max_dns_blocked.value + ) + final_down.dns = OutcomeStatus( + # TODO(arturo): this is overestimating blocking. + key=max_dns_down.key, + value=1 - (min_dns_ok.value + max_dns_blocked.value), + ) + if max_dns_blocked.scope: + # TODO(arturo): set this on the parent OutcomeStatus too + blocking_scope = max_dns_blocked.scope + log.debug(f"DNS done {ok_value}") + + ### FINAL TCP + max_tcp_blocked = get_agg_outcome(loni_blocked_list, "tcp", max) + max_tcp_down = get_agg_outcome(loni_down_list, "tcp", max) + min_tcp_ok = get_agg_outcome(loni_ok_list, "tcp", min) + if max_tcp_blocked and max_tcp_down and min_tcp_ok: + log.debug(f"PERFORMING TCP {ok_value}") + log.debug(f"max_tcp_blocked: {max_tcp_blocked}") + log.debug(f"max_tcp_down: {max_tcp_down}") + log.debug(f"min_tcp_ok: {min_tcp_ok}") + log.debug(f"final_down: {final_down}") + log.debug(f"final_blocked: {final_blocked}") + log.debug(f"final_ok: {final_ok}") + final_blocked.tcp = OutcomeStatus( + key=max_tcp_blocked.key, value=max_tcp_blocked.value * ok_value + ) + final_down.tcp = OutcomeStatus( + key=max_tcp_down.key, + value=(1 - (min_tcp_ok.value + max_tcp_blocked.value)) * ok_value, + ) + final_ok.tcp = OutcomeStatus(key="tcp", value=min_tcp_ok.value) + # TODO(arturo): should we update the DNS down key value in light of the + # fact we notice TCP is bad and hence the answer might have been bad to + # begin with? 
+ old_ok_value = ok_value + ok_value = 1 - (final_blocked.sum() + final_down.sum()) + log.debug(f"TCP done {old_ok_value} -> {ok_value}") + log.debug(f"final_down: {final_down}") + log.debug(f"final_blocked: {final_blocked}") + log.debug(f"final_ok: {final_ok}") + + ### FINAL TLS + max_tls_blocked = get_agg_outcome(loni_blocked_list, "tls", max) + max_tls_down = get_agg_outcome(loni_down_list, "tls", max) + min_tls_ok = get_agg_outcome(loni_ok_list, "tls", min) + if max_tls_blocked and max_tls_down and min_tls_ok: + final_blocked.tls = OutcomeStatus( + key=max_tls_blocked.key, value=max_tls_blocked.value * ok_value + ) + final_down.tls = OutcomeStatus( + key=max_tls_down.key, + value=(1 - (min_tls_ok.value + max_tls_blocked.value)) * ok_value, + ) + final_ok.tls = OutcomeStatus(key="tls", value=min_tls_ok.value) + old_ok_value = ok_value + ok_value = 1 - (final_blocked.sum() + final_down.sum()) + log.debug(f"TLS done {old_ok_value} -> {ok_value}") + log.debug(f"final_down: {final_down}") + log.debug(f"final_blocked: {final_blocked}") + log.debug(f"final_ok: {final_ok}") + + ### FINAL HTTP + max_http_blocked = get_agg_outcome(loni_blocked_list, "http", max) + max_http_down = get_agg_outcome(loni_down_list, "http", max) + min_http_ok = get_agg_outcome(loni_ok_list, "http", min) + + if max_http_blocked and max_http_down and min_http_ok: + final_blocked.http = OutcomeStatus( + key=max_http_blocked.key, value=max_http_blocked.value * ok_value + ) + final_down.http = OutcomeStatus( + key=max_http_down.key, + value=(1 - (min_http_ok.value + max_http_blocked.value)) * ok_value, + ) + final_ok.http = OutcomeStatus(key="http", value=min_http_ok.value) + if max_http_blocked.scope: + if blocking_scope is not None: + log.warning(f"overwriting blocking_scope key: {blocking_scope}") + # TODO(arturo): set this on the parent OutcomeStatus too + blocking_scope = max_http_blocked.scope + + old_ok_value = ok_value + ok_value = 1 - (final_blocked.sum() + final_down.sum()) + log.debug(f"HTTP done {old_ok_value} -> {ok_value}") + log.debug(f"final_down: {final_down}") + log.debug(f"final_blocked: {final_blocked}") + log.debug(f"final_ok: {final_ok}") + + final_loni = LoNI( + ok_final=ok_value, + ok=final_ok, + down=final_down, + blocked=final_blocked, + blocking_scope=blocking_scope, + ) + log.debug(f"final_loni: {final_loni}") + + loni_ok_value = final_loni.ok_final + + loni_down = final_loni.down.to_dict() + loni_down_keys, loni_down_values = list(loni_down.keys()), list(loni_down.values()) + + loni_blocked = final_loni.blocked.to_dict() + loni_blocked_keys, loni_blocked_values = list(loni_blocked.keys()), list( + loni_blocked.values() + ) + + loni_ok = final_loni.ok.to_dict() + loni_ok_keys, loni_ok_values = list(loni_ok.keys()), list(loni_ok.values()) + + is_anomaly = loni_ok_value < 0.6 + is_confirmed = final_loni.blocked.sum() > 0.9 + + er = MeasurementExperimentResult( + measurement_uid=measurement_uid, + observation_id_list=observation_id_list, + timeofday=timeofday, + created_at=datetime.utcnow(), + location_network_type=first_analysis.network_type, + location_network_asn=first_analysis.probe_asn, + location_network_cc=first_analysis.probe_cc, + location_network_as_org_name=first_analysis.probe_as_org_name, + location_network_as_cc=first_analysis.probe_as_cc, + location_resolver_asn=first_analysis.resolver_asn, + location_resolver_as_org_name=first_analysis.resolver_as_org_name, + location_resolver_as_cc=first_analysis.resolver_as_cc, + location_resolver_cc=first_analysis.resolver_cc, + 
location_blocking_scope=None, + target_nettest_group=target_nettest_group, + target_category=target_category, + target_name=target_name, + target_domain_name=target_domain_name, + target_detail=target_detail, + loni_ok_value=loni_ok_value, + loni_down_keys=loni_down_keys, + loni_down_values=loni_down_values, + loni_blocked_keys=loni_blocked_keys, + loni_blocked_values=loni_blocked_values, + loni_ok_keys=loni_ok_keys, + loni_ok_values=loni_ok_values, + loni_list=list(map(lambda x: x.to_dict(), loni_list)), + analysis_transcript_list=analysis_transcript_list, + measurement_count=1, + observation_count=len(web_analysis), + vp_count=1, + anomaly=is_anomaly, + confirmed=is_confirmed, + ) + + yield er diff --git a/oonidata/cli/command.py b/oonidata/cli/command.py index 9348bf1a..0959fdfd 100644 --- a/oonidata/cli/command.py +++ b/oonidata/cli/command.py @@ -437,6 +437,7 @@ def checkdb( Check if the database tables require migrations. If the create-tables flag is not specified, it will not perform any operations. """ + if create_tables: if not clickhouse: click.echo("--clickhouse needs to be specified when creating tables") diff --git a/oonidata/dataviz/templates/analysis.html b/oonidata/dataviz/templates/analysis.html new file mode 100644 index 00000000..ef306635 --- /dev/null +++ b/oonidata/dataviz/templates/analysis.html @@ -0,0 +1,250 @@ + + + + + + + + + + + +
+
+
+
+

OONI Data Kraken Analysis

+

When data krakens and LoNIs join forces!

+
+
+
+ +
+ +

Experiment result output

+ +
+
+

{{website_experiment_result["measurement_uid"]}}

+ open in explorer +
+ Time Of Day {{website_experiment_result["timeofday"]}} +
+ +

Target

+
+
+ Target nettest_group {{website_experiment_result["target_nettest_group"]}} +
+
+ Target category {{website_experiment_result["target_category"]}} +
+
+ Target name {{website_experiment_result["target_name"]}} +
+
+ Target domain_name {{website_experiment_result["target_domain_name"]}} +
+
+ Target detail {{website_experiment_result["target_detail"]}} +
+
+ +

Location

+
+ +
+ ASN {{website_experiment_result["location_network_asn"]}} ({{website_experiment_result["location_network_as_org_name"]}}) +
+
+ Network type {{website_experiment_result["location_network_type"]}} +
+
+ Country {{website_experiment_result["location_network_cc"]}} +
+
+ Resolver ASN {{website_experiment_result["location_resolver_asn"]}} ({{website_experiment_result["location_resolver_as_org_name"]}}) +
+
+ Blocking scope {{website_experiment_result["location_blocking_scope"]}} +
+
+ +

Experiment result

+
+
+ anomaly{{website_experiment_result["anomaly"]}} +
+
+ confirmed{{website_experiment_result["confirmed"]}} +
+
+ + +
+
+
OK
+
{{loni_ok_value}}
+
+
+
blocked
+
{{loni_blocked_value}}
+
{{loni_blocked_dict}}
+
+
+
down
+
{{loni_down_value}}
+
{{loni_down_dict}}
+
+
+ +
+
+
{{website_experiment_result["measurement_count"]}}
+
measurement count
+
+
+
{{website_experiment_result["observation_count"]}}
+
observation count
+
+
+
{{website_experiment_result["vp_count"]}}
+
vantage point count
+
+
+ +
+
+ + +

Individual LoNIs

+ +{% for loni in loni_list %} +
+
+

Loni #{{loop.index}}

+
ok_final: {{loni["ok_final"]}}
+
+ + + + + + + + + + {% for status, value in loni["blocked"].items() %} + + + + + + {% endfor %} + {% for status, value in loni["down"].items() %} + + + + + + {% endfor %} + {% for status, value in loni["ok"].items() %} + + + + + + {% endfor %} + +
OutcomeSpaceOutcomeStatusLikelihood
blocked{{status}}{{value}}
down{{status}}{{value}}
ok{{status}}{{value}}
+
+

Analysis transcript

+
+            {% for analysis_line in analysis_transcript_list[loop.index0] %}
+{{ analysis_line }}
+            {% endfor %}
+            
+
+
+{% endfor %} + + +

Analysis output

+ +{% for wa in web_analysis %} +

Analysis #{{loop.index}}

+
+ + + + + + + + + {% for key, value in wa.items() %} + + + + + {% endfor %} + +
keyvalue
{{key}}{{value}}
+
+{% endfor %} + +

Web Observations

+{% for wo in web_observations %} +

Observation #{{loop.index}}

+
+ + + + + + + + + {% for key, value in wo.items() %} + + + + + {% endfor %} + +
keyvalue
{{key}}{{value}}
+
+{% endfor %} + +
+ + + + + + + + + {% for key, value in website_experiment_result.items() %} + + + + + {% endfor %} + +
keyvalue
{{key}}{{value}}
+
+ + + +
+ + + diff --git a/oonidata/dataviz/web.py b/oonidata/dataviz/web.py index 43087550..88cd66a0 100644 --- a/oonidata/dataviz/web.py +++ b/oonidata/dataviz/web.py @@ -1,3 +1,15 @@ +from dataclasses import asdict +import json +from pathlib import Path +from oonidata.analysis.control import ( + BodyDB, + WebGroundTruthDB, + iter_ground_truths_from_web_control, +) +from oonidata.analysis.datasources import load_measurement +from oonidata.analysis.web_analysis import make_web_analysis +from oonidata.analysis.website_experiment_results import make_website_experiment_results +from oonidata.apiclient import get_measurement_dict_by_uid from oonidata.dataviz.viz import ( plot_blocking_world_map, plot_blocking_of_domain_in_asn, @@ -11,10 +23,87 @@ get_df_dns_analysis_raw, ) from flask import Flask, request, render_template +from oonidata.fingerprintdb import FingerprintDB +from oonidata.netinfo import NetinfoDB + +from oonidata.transforms import measurement_to_observations app = Flask(__name__) +def to_pretty_json(value): + return json.dumps( + value, sort_keys=True, indent=4, separators=(",", ": "), default=str + ) + + +app.jinja_env.filters["tojson_pretty"] = to_pretty_json + + +@app.route("/analysis/m/") +def analysis_by_msmt(measurement_uid): + data_dir = Path("tests/data/datadir/") + + fingerprintdb = FingerprintDB(datadir=data_dir, download=False) + netinfodb = NetinfoDB(datadir=data_dir, download=False) + raw_msmt = get_measurement_dict_by_uid(measurement_uid) + msmt = load_measurement(msmt=raw_msmt) + web_observations, web_control_observations = measurement_to_observations( + msmt, netinfodb=netinfodb + ) + web_ground_truth_db = WebGroundTruthDB() + web_ground_truth_db.build_from_rows( + rows=iter_ground_truths_from_web_control( + web_control_observations=web_control_observations, + netinfodb=netinfodb, + ), + ) + + web_ground_truths = web_ground_truth_db.lookup_by_web_obs(web_obs=web_observations) + web_analysis = list( + make_web_analysis( + web_observations=web_observations, + web_ground_truths=web_ground_truths, + body_db=BodyDB(db=None), # type: ignore + fingerprintdb=fingerprintdb, + ) + ) + + # assert len(web_analysis) == len( + # web_observations + # ), f"web_analysis != web_obs {len(web_analysis)} != {len(web_observations)}" + # for wa in web_analysis: + # print_nice_vertical(wa) + + website_er = list(make_website_experiment_results(web_analysis)) + assert len(website_er) == 1 + + wer = website_er[0] + analysis_transcript_list = wer.analysis_transcript_list + + # wer.analysis_transcript_list = None + # print_nice_vertical(wer) + # for loni in loni_list: + # pprint(loni.to_dict()) + # print(analysis_transcript_list) + + return render_template( + "analysis.html", + website_experiment_result=asdict(wer), + analysis_transcript_list=analysis_transcript_list, + loni_list=wer.loni_list, + raw_msmt=raw_msmt, + measurement_uid=measurement_uid, + web_analysis=list(map(lambda x: asdict(x), web_analysis)), + web_observations=list(map(lambda x: asdict(x), web_observations)), + loni_blocked_dict=dict(zip(wer.loni_blocked_keys, wer.loni_blocked_values)), + loni_blocked_value=sum(wer.loni_blocked_values), + loni_down_dict=dict(zip(wer.loni_down_keys, wer.loni_down_values)), + loni_down_value=sum(wer.loni_down_values), + loni_ok_value=wer.loni_ok_value, + ) + + @app.route("/api/_/viz/data/world_map") def data_world_map(): blocking_threshold = float(request.args.get("blocking_threshold", 0.7)) diff --git a/oonidata/db/create_tables.py b/oonidata/db/create_tables.py index 4c967427..8485b6a1 100644 --- 
a/oonidata/db/create_tables.py +++ b/oonidata/db/create_tables.py @@ -6,6 +6,7 @@ from oonidata.db.connections import ClickhouseConnection from oonidata.models.experiment_result import ( ExperimentResult, + MeasurementExperimentResult, ) from oonidata.models.analysis import WebAnalysis from oonidata.models.observations import ( @@ -61,9 +62,19 @@ def typing_to_clickhouse(t: Any) -> str: if t == dict: return "String" + # Casting it to JSON + if t == List[Dict]: + return "String" + if t == Optional[float]: return "Nullable(Float64)" + if t == List[float]: + return "Array(Float64)" + + if t == List[List[str]]: + return "Array(Array(String))" + if t in (List[str], List[bytes]): return "Array(String)" @@ -104,29 +115,6 @@ def create_query_for_observation(obs_class: Type[ObservationBase]) -> Tuple[str, ) -def create_query_for_experiment_result() -> Tuple[str, str]: - columns = [] - for f in ExperimentResult._fields: - t = ExperimentResult.__annotations__.get(f) - type_str = typing_to_clickhouse(t) - columns.append(f" {f} {type_str}") - - columns_str = ",\n".join(columns) - table_name = ExperimentResult.__table_name__ - - return ( - f""" - CREATE TABLE IF NOT EXISTS {table_name} ( -{columns_str} - ) - ENGINE = ReplacingMergeTree - ORDER BY (measurement_uid, experiment_result_id) - SETTINGS index_granularity = 8192; - """, - "experiment_result", - ) - - def create_query_for_analysis(base_class) -> Tuple[str, str]: columns = [] for f in fields(base_class): @@ -169,8 +157,8 @@ def create_query_for_analysis(base_class) -> Tuple[str, str]: create_query_for_observation(WebObservation), create_query_for_observation(WebControlObservation), create_query_for_observation(HTTPMiddleboxObservation), - create_query_for_experiment_result(), create_query_for_analysis(WebAnalysis), + create_query_for_analysis(MeasurementExperimentResult), CREATE_QUERY_FOR_LOGS, ] diff --git a/oonidata/models/analysis.py b/oonidata/models/analysis.py index 7cedd4be..382e78f8 100644 --- a/oonidata/models/analysis.py +++ b/oonidata/models/analysis.py @@ -67,6 +67,7 @@ class WebAnalysis: dns_consistency_system_is_answer_probe_cc_match: Optional[bool] = None dns_consistency_system_is_answer_bogon: Optional[bool] = None dns_consistency_system_answer_fp_name: Optional[str] = None + dns_consistency_system_answer_fp_scope: Optional[str] = None dns_consistency_system_is_answer_fp_match: Optional[bool] = None dns_consistency_system_is_answer_fp_country_consistent: Optional[bool] = None dns_consistency_system_is_answer_fp_false_positive: Optional[bool] = None @@ -88,6 +89,7 @@ class WebAnalysis: dns_consistency_other_is_answer_probe_cc_match: Optional[bool] = None dns_consistency_other_is_answer_bogon: Optional[bool] = None dns_consistency_other_answer_fp_name: Optional[str] = None + dns_consistency_other_answer_fp_scope: Optional[str] = None dns_consistency_other_is_answer_fp_match: Optional[bool] = None dns_consistency_other_is_answer_fp_country_consistent: Optional[bool] = None dns_consistency_other_is_answer_fp_false_positive: Optional[bool] = None @@ -112,6 +114,7 @@ class WebAnalysis: tls_ground_truth_trusted_ok_count: Optional[int] = None tcp_address: Optional[str] = None tcp_success: Optional[bool] = None + tcp_failure: Optional[str] = None tcp_ground_truth_failure_count: Optional[int] = None tcp_ground_truth_failure_asn_cc_count: Optional[int] = None tcp_ground_truth_ok_count: Optional[int] = None @@ -132,6 +135,7 @@ class WebAnalysis: http_ground_truth_trusted_failure_count: Optional[int] = None http_ground_truth_body_length: 
Optional[int] = None http_fp_name: Optional[str] = None + http_fp_scope: Optional[str] = None http_is_http_fp_match: Optional[bool] = None http_is_http_fp_country_consistent: Optional[bool] = None http_is_http_fp_false_positive: Optional[bool] = None diff --git a/oonidata/models/experiment_result.py b/oonidata/models/experiment_result.py index 7fc9395f..f97c8a20 100644 --- a/oonidata/models/experiment_result.py +++ b/oonidata/models/experiment_result.py @@ -1,4 +1,5 @@ import dataclasses +from dataclasses import dataclass import logging from typing import Any, Dict, Generator, List, Optional, NamedTuple, Mapping, Tuple from enum import Enum @@ -68,16 +69,19 @@ class Outcome(NamedTuple): blocked_score: float -class ExperimentResultInstant(NamedTuple): - __table_name__ = "experiment_result_instant" - - experiment_result_id: str +@dataclass +class MeasurementExperimentResult: + __table_name__ = "measurement_experiment_result" + __table_index__ = ( + "measurement_uid", + "timeofday", + ) # The measurement used to generate this experiment result measurement_uid: str # The list of observations used to generate this experiment result - observation_id: List[str] + observation_id_list: List[str] # The timeofday for which this experiment result is relevant. We use the # timeofday convention to differentiate it from the timestamp which is an @@ -161,9 +165,15 @@ class ExperimentResultInstant(NamedTuple): loni_blocked_keys: List[str] loni_blocked_values: List[float] + loni_ok_keys: List[str] + loni_ok_values: List[float] + + # Encoded as JSON + loni_list: List[Dict] + # Inside this string we include a representation of the logic that lead us # to produce the above loni values - analysis_transcript: Optional[str] + analysis_transcript_list: List[List[str]] # Number of measurements used to produce this experiment result measurement_count: int @@ -173,8 +183,8 @@ class ExperimentResultInstant(NamedTuple): vp_count: int # Backward compatible anomaly/confirmed flags - anomaly: bool - confirmed: bool + anomaly: Optional[bool] + confirmed: Optional[bool] class ExperimentResult(NamedTuple): diff --git a/oonidata/workers/analysis.py b/oonidata/workers/analysis.py index dad0ab24..f6bd9ad3 100644 --- a/oonidata/workers/analysis.py +++ b/oonidata/workers/analysis.py @@ -3,6 +3,7 @@ import pathlib from datetime import date, datetime from typing import Dict, List +import orjson import statsd @@ -14,11 +15,13 @@ from oonidata.analysis.control import BodyDB, WebGroundTruthDB from oonidata.analysis.datasources import iter_web_observations from oonidata.analysis.web_analysis import make_web_analysis +from oonidata.analysis.website_experiment_results import make_website_experiment_results from oonidata.dataclient import date_interval from oonidata.datautils import PerfTimer from oonidata.db.connections import ClickhouseConnection from oonidata.fingerprintdb import FingerprintDB from oonidata.models.analysis import WebAnalysis +from oonidata.models.experiment_result import MeasurementExperimentResult from oonidata.netinfo import NetinfoDB from oonidata.workers.ground_truths import maybe_build_web_ground_truth @@ -27,6 +30,7 @@ get_prev_range, make_db_rows, maybe_delete_prev_range, + optimize_all_tables, ) log = logging.getLogger("oonidata.processing") @@ -58,13 +62,17 @@ def make_analysis_in_a_day( fast_fail: bool, day: date, ): + log.info("Optimizing all tables") + optimize_all_tables(clickhouse) + statsd_client = statsd.StatsClient("localhost", 8125) fingerprintdb = FingerprintDB(datadir=data_dir, download=False) 
body_db = BodyDB(db=ClickhouseConnection(clickhouse)) db_writer = ClickhouseConnection(clickhouse, row_buffer_size=10_000) db_lookup = ClickhouseConnection(clickhouse) - column_names = [f.name for f in dataclasses.fields(WebAnalysis)] + column_names_wa = [f.name for f in dataclasses.fields(WebAnalysis)] + column_names_er = [f.name for f in dataclasses.fields(MeasurementExperimentResult)] prev_range = get_prev_range( db=db_lookup, @@ -115,7 +123,7 @@ def make_analysis_in_a_day( continue idx += 1 table_name, rows = make_db_rows( - dc_list=website_analysis, column_names=column_names + dc_list=website_analysis, column_names=column_names_wa ) statsd_client.incr("oonidata.web_analysis.analysis.obs", 1, rate=0.1) # type: ignore statsd_client.gauge("oonidata.web_analysis.analysis.obs_idx", idx, rate=0.1) # type: ignore @@ -125,8 +133,23 @@ def make_analysis_in_a_day( db_writer.write_rows( table_name=table_name, rows=rows, - column_names=column_names, + column_names=column_names_wa, + ) + + with statsd_client.timer("oonidata.web_analysis.experiment_results.timing"): + website_er = list(make_website_experiment_results(website_analysis)) + table_name, rows = make_db_rows( + dc_list=website_er, + column_names=column_names_er, + custom_remap={"loni_list": orjson.dumps}, ) + + db_writer.write_rows( + table_name=table_name, + rows=rows, + column_names=column_names_er, + ) + except: web_obs_ids = ",".join(map(lambda wo: wo.observation_id, web_obs)) log.error(f"failed to generate analysis for {web_obs_ids}", exc_info=True) @@ -175,11 +198,12 @@ def make_cc_batches( cc_batches = [] current_cc_batch_size = 0 current_cc_batch = [] - while cnt_by_cc: + cnt_by_cc_sorted = sorted(cnt_by_cc.items(), key=lambda x: x[0]) + while cnt_by_cc_sorted: while current_cc_batch_size <= max_obs_per_batch: try: - cc, cnt = cnt_by_cc.popitem() - except KeyError: + cc, cnt = cnt_by_cc_sorted.pop() + except IndexError: break current_cc_batch.append(cc) current_cc_batch_size += cnt diff --git a/oonidata/workers/common.py b/oonidata/workers/common.py index 7a5346f8..35187ed6 100644 --- a/oonidata/workers/common.py +++ b/oonidata/workers/common.py @@ -6,6 +6,8 @@ from datetime import date, datetime, timedelta from typing import ( + Any, + Callable, Dict, List, NamedTuple, @@ -21,6 +23,7 @@ from oonidata.db.connections import ( ClickhouseConnection, ) +from oonidata.db.create_tables import create_queries log = logging.getLogger("oonidata.processing") @@ -73,12 +76,13 @@ def get_prev_range( end_timestamp = None where = None where = "WHERE bucket_date = %(bucket_date)s" - q_args = {"bucket_date": bucket_date} + q_args: Dict[str, Any] = {"bucket_date": bucket_date} if timestamp: start_timestamp = timestamp end_timestamp = timestamp + timedelta(days=1) - q_args = {"start_timestamp": start_timestamp, "end_timestamp": end_timestamp} - where = f"WHERE {timestamp_column} >= %(start_timestamp)s AND {timestamp_column} < %(end_timestamp)s" + q_args["start_timestamp"] = start_timestamp + q_args["end_timestamp"] = end_timestamp + where += f" AND {timestamp_column} >= %(start_timestamp)s AND {timestamp_column} < %(end_timestamp)s" if len(test_name) > 0: test_name_list = [] @@ -95,7 +99,10 @@ def get_prev_range( probe_cc_list.append(f"'{cc}'") where += " AND probe_cc IN ({})".format(",".join(probe_cc_list)) - prev_obs_range = db.execute(q + where, q_args) + final_query = q + where + print(final_query) + print(q_args) + prev_obs_range = db.execute(final_query, q_args) assert isinstance(prev_obs_range, list) and len(prev_obs_range) == 1 
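The make_cc_batches change above swaps dict.popitem() for a list sorted by country code, so batch composition is deterministic across runs and the loop terminates on IndexError instead of KeyError. A small self-contained sketch of the same batching pattern follows (hypothetical helper name, illustrative counts):

# Sketch of the deterministic per-country batching pattern (illustrative only).
from typing import Dict, List, Tuple

def batch_by_count(cnt_by_cc: Dict[str, int], max_obs_per_batch: int) -> List[List[str]]:
    remaining: List[Tuple[str, int]] = sorted(cnt_by_cc.items())
    batches: List[List[str]] = []
    while remaining:
        batch: List[str] = []
        batch_size = 0
        while remaining and batch_size <= max_obs_per_batch:
            cc, cnt = remaining.pop()  # take the last country code in sorted order
            batch.append(cc)
            batch_size += cnt
        batches.append(batch)
    return batches

# batch_by_count({"IT": 100, "IR": 300, "US": 1000}, max_obs_per_batch=400)
# -> [["US"], ["IT", "IR"]]: "US" alone exceeds the budget, "IT" and "IR" share a batch.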
max_created_at, min_created_at = prev_obs_range[0] @@ -115,6 +122,12 @@ def get_prev_range( ) +def optimize_all_tables(clickhouse): + with ClickhouseConnection(clickhouse) as db: + for _, table_name in create_queries: + db.execute(f"OPTIMIZE TABLE {table_name}") + + def get_obs_count_by_cc( db: ClickhouseConnection, test_name: List[str], @@ -159,17 +172,25 @@ def maybe_delete_prev_range( def make_db_rows( - dc_list: List, column_names: List[str], bucket_date: Optional[str] = None + dc_list: List, + column_names: List[str], + bucket_date: Optional[str] = None, + custom_remap: Optional[Dict[str, Callable]] = None, ) -> Tuple[str, List[str]]: assert len(dc_list) > 0 + def maybe_remap(k, value): + if custom_remap and k in custom_remap: + return custom_remap[k](value) + return value + table_name = dc_list[0].__table_name__ rows = [] for d in dc_list: if bucket_date: d.bucket_date = bucket_date assert table_name == d.__table_name__, "inconsistent group of observations" - rows.append(tuple(getattr(d, k) for k in column_names)) + rows.append(tuple(maybe_remap(k, getattr(d, k)) for k in column_names)) return table_name, rows diff --git a/oonidata/workers/observations.py b/oonidata/workers/observations.py index 7721826a..c518bb2c 100644 --- a/oonidata/workers/observations.py +++ b/oonidata/workers/observations.py @@ -37,6 +37,7 @@ get_prev_range, make_db_rows, maybe_delete_prev_range, + optimize_all_tables, ) log = logging.getLogger("oonidata.processing") @@ -210,6 +211,9 @@ def start_observation_maker( fast_fail: bool, log_level: int = logging.INFO, ): + log.info("Optimizing all tables") + optimize_all_tables(clickhouse) + dask_client = DaskClient( threads_per_worker=2, n_workers=parallelism, @@ -219,6 +223,8 @@ def start_observation_maker( total_size, total_msmt_count = 0, 0 day_list = list(date_interval(start_day, end_day)) # See: https://stackoverflow.com/questions/51099685/best-practices-in-setting-number-of-dask-workers + + log.info(f"Starting observation making on {probe_cc} ({start_day} - {end_day})") for day in day_list: t = PerfTimer() size, msmt_count = make_observation_in_day( diff --git a/tests/test_analysis.py b/tests/test_analysis.py index 137544f3..73ea8ed4 100644 --- a/tests/test_analysis.py +++ b/tests/test_analysis.py @@ -310,6 +310,6 @@ def test_website_web_analysis_blocked(fingerprintdb, netinfodb, measurements, da ) ) assert len(web_analysis) == len(web_obs) - for wa in web_analysis: - print(wa.measurement_uid) - print_nice_vertical(wa) + # for wa in web_analysis: + # print(wa.measurement_uid) + # print_nice_vertical(wa) diff --git a/tests/test_cli.py b/tests/test_cli.py index 440c6698..bf3e5158 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -58,7 +58,7 @@ def test_full_workflow( "--data-dir", datadir, "--clickhouse", - "clickhouse://localhost/testing_oonidata", + db.clickhouse_url, # "--archives-dir", # tmp_path.absolute(), ], @@ -90,7 +90,7 @@ def test_full_workflow( "--data-dir", datadir, "--clickhouse", - "clickhouse://localhost/testing_oonidata", + db.clickhouse_url, ], ) assert result.exit_code == 0 @@ -134,7 +134,7 @@ def test_full_workflow( result = cli_runner.invoke( cli, [ - "mker", + "mkanalysis", "--probe-cc", "BA", "--start-day", @@ -146,11 +146,11 @@ def test_full_workflow( "--data-dir", datadir, "--clickhouse", - "clickhouse://localhost/testing_oonidata", + db.clickhouse_url, ], ) assert result.exit_code == 0 res = db.execute( - "SELECT COUNT(DISTINCT(measurement_uid)) FROM experiment_result WHERE measurement_uid LIKE '20221020%' AND probe_cc = 'BA'" + 
"SELECT COUNT(DISTINCT(measurement_uid)) FROM measurement_experiment_result WHERE measurement_uid LIKE '20221020%' AND location_network_cc = 'BA'" ) assert res[0][0] == 200 # type: ignore diff --git a/tests/test_ctrl.py b/tests/test_ctrl.py index 2c4452a4..c5d86b81 100644 --- a/tests/test_ctrl.py +++ b/tests/test_ctrl.py @@ -72,7 +72,7 @@ def test_web_ground_truth_from_clickhouse(db, datadir, netinfodb, tmp_path): resolver_cc="US", resolver_as_org_name="Verizon Business", resolver_as_cc="US", - resolver_is_scrubbed=0, + resolver_is_scrubbed=False, resolver_asn_probe=22394, resolver_as_org_name_probe="Verizon Business", observation_id="TEST", @@ -83,7 +83,7 @@ def test_web_ground_truth_from_clickhouse(db, datadir, netinfodb, tmp_path): ip_as_org_name="Fastly, Inc.", ip_as_cc="US", ip_cc="US", - ip_is_bogon=0, + ip_is_bogon=False, dns_query_type="A", dns_failure=None, dns_engine="system", @@ -94,13 +94,13 @@ def test_web_ground_truth_from_clickhouse(db, datadir, netinfodb, tmp_path): dns_answer_as_org_name="Fastly, Inc.", dns_t=0.117683385, tcp_failure=None, - tcp_success=1, + tcp_success=True, tcp_t=0.583859739, tls_failure=None, tls_server_name=None, tls_version=None, tls_cipher_suite=None, - tls_is_certificate_valid=1, + tls_is_certificate_valid=True, tls_end_entity_certificate_fingerprint=None, tls_end_entity_certificate_subject=None, tls_end_entity_certificate_subject_common_name=None, @@ -136,8 +136,8 @@ def test_web_ground_truth_from_clickhouse(db, datadir, netinfodb, tmp_path): probe_analysis="false", pp_http_response_fingerprints=[], pp_http_fingerprint_country_consistent=None, - pp_http_response_matches_blockpage=0, - pp_http_response_matches_false_positive=0, + pp_http_response_matches_blockpage=False, + pp_http_response_matches_false_positive=False, pp_http_response_body_title=None, pp_http_response_body_meta_title=None, pp_dns_fingerprint_id=None, diff --git a/tests/test_experiment_results.py b/tests/test_experiment_results.py new file mode 100644 index 00000000..5e399f76 --- /dev/null +++ b/tests/test_experiment_results.py @@ -0,0 +1,67 @@ +from pprint import pprint +from oonidata.analysis.control import ( + BodyDB, + WebGroundTruthDB, + iter_ground_truths_from_web_control, +) +from oonidata.analysis.datasources import load_measurement +from oonidata.analysis.web_analysis import make_web_analysis +from oonidata.analysis.website_experiment_results import make_website_experiment_results +from oonidata.models.observations import print_nice, print_nice_vertical +from oonidata.transforms import measurement_to_observations + + +# Check this for wc 0.5 overwriting tls analsysis +# 20231031000227.813597_MY_webconnectivity_2f0b80761373aa7e +def test_website_experiment_results(measurements, netinfodb, fingerprintdb): + msmt = load_measurement( + msmt_path=measurements[ + "20221101055235.141387_RU_webconnectivity_046ce024dd76b564" + ] + ) + web_observations, web_control_observations = measurement_to_observations( + msmt, netinfodb=netinfodb + ) + assert isinstance(msmt.input, str) + web_ground_truth_db = WebGroundTruthDB() + web_ground_truth_db.build_from_rows( + rows=iter_ground_truths_from_web_control( + web_control_observations=web_control_observations, + netinfodb=netinfodb, + ), + ) + + web_ground_truths = web_ground_truth_db.lookup_by_web_obs(web_obs=web_observations) + web_analysis = list( + make_web_analysis( + web_observations=web_observations, + web_ground_truths=web_ground_truths, + body_db=BodyDB(db=None), # type: ignore + fingerprintdb=fingerprintdb, + ) + ) + + # 
TODO(arturo): there is currently an edge case here which is when we get an + # IPv6 answer, since we are ignoring them in the analysis, we will have N + # less analysis where N is the number of IPv6 addresses. + assert len(web_analysis) == len(web_observations) + # for wa in web_analysis: + # print_nice_vertical(wa) + + website_er = list(make_website_experiment_results(web_analysis)) + assert len(website_er) == 1 + + wer = website_er[0] + analysis_transcript_list = wer.analysis_transcript_list + + assert ( + sum(wer.loni_blocked_values) + sum(wer.loni_down_values) + wer.loni_ok_value + == 1 + ) + assert wer.anomaly == True + + # wer.analysis_transcript_list = None + # print_nice_vertical(wer) + # for loni in wer.loni_list: + # pprint(loni.to_dict()) + # print(analysis_transcript_list) diff --git a/tests/test_workers.py b/tests/test_workers.py index 216d4faf..4b97d58d 100644 --- a/tests/test_workers.py +++ b/tests/test_workers.py @@ -1,20 +1,19 @@ -from datetime import date +from datetime import date, datetime, timedelta, timezone import gzip from pathlib import Path import sqlite3 -from typing import List +from typing import List, Tuple from unittest.mock import MagicMock from oonidata.analysis.datasources import load_measurement from oonidata.dataclient import stream_jsonl -from oonidata.db.connections import ClickhouseConnection from oonidata.models.nettests.dnscheck import DNSCheck from oonidata.models.nettests.web_connectivity import WebConnectivity from oonidata.models.nettests.http_invalid_request_line import HTTPInvalidRequestLine from oonidata.models.observations import HTTPMiddleboxObservation from oonidata.workers.analysis import make_analysis_in_a_day, make_cc_batches, make_ctrl -from oonidata.workers.common import get_obs_count_by_cc +from oonidata.workers.common import get_obs_count_by_cc, get_prev_range from oonidata.workers.observations import ( make_observations_for_file_entry_batch, write_observations_to_db, @@ -25,6 +24,72 @@ from oonidata.transforms.nettests.measurement_transformer import MeasurementTransformer +def test_get_prev_range(db): + db.execute("DROP TABLE IF EXISTS test_range") + db.execute( + """CREATE TABLE test_range ( + created_at DateTime64(3, 'UTC'), + bucket_date String, + test_name String, + probe_cc String + ) + ENGINE = MergeTree + ORDER BY (bucket_date, created_at) + """ + ) + bucket_date = "2000-01-01" + test_name = "web_connectivity" + probe_cc = "IT" + min_time = datetime(2000, 1, 1, 23, 42, 00, tzinfo=timezone.utc) + rows = [(min_time, bucket_date, test_name, probe_cc)] + for i in range(200): + rows.append((min_time + timedelta(seconds=i), bucket_date, test_name, probe_cc)) + db.execute( + "INSERT INTO test_range (created_at, bucket_date, test_name, probe_cc) VALUES", + rows, + ) + prev_range = get_prev_range( + db, + "test_range", + test_name=[test_name], + bucket_date=bucket_date, + probe_cc=[probe_cc], + ) + assert prev_range.min_created_at and prev_range.max_created_at + assert prev_range.min_created_at == (min_time - timedelta(seconds=1)) + assert prev_range.max_created_at == (rows[-1][0] + timedelta(seconds=1)) + db.execute("TRUNCATE TABLE test_range") + + bucket_date = "2000-03-01" + test_name = "web_connectivity" + probe_cc = "IT" + min_time = datetime(2000, 1, 1, 23, 42, 00, tzinfo=timezone.utc) + rows: List[Tuple[datetime, str, str, str]] = [] + for i in range(10): + rows.append( + (min_time + timedelta(seconds=i), "2000-02-01", test_name, probe_cc) + ) + min_time = rows[-1][0] + for i in range(10): + rows.append((min_time + 
timedelta(seconds=i), bucket_date, test_name, probe_cc)) + + db.execute( + "INSERT INTO test_range (created_at, bucket_date, test_name, probe_cc) VALUES", + rows, + ) + prev_range = get_prev_range( + db, + "test_range", + test_name=[test_name], + bucket_date=bucket_date, + probe_cc=[probe_cc], + ) + assert prev_range.min_created_at and prev_range.max_created_at + assert prev_range.min_created_at == (min_time - timedelta(seconds=1)) + assert prev_range.max_created_at == (rows[-1][0] + timedelta(seconds=1)) + db.execute("DROP TABLE test_range") + + def test_make_cc_batches(): cc_batches = make_cc_batches( cnt_by_cc={"IT": 100, "IR": 300, "US": 1000}, From 0f2069f22572ea734f6341ec8ed998d2c680eb71 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arturo=20Filast=C3=B2?= Date: Mon, 20 Nov 2023 11:28:25 +0100 Subject: [PATCH 2/3] Delete dead code for previous version of experiment results --- oonidata/analysis/websites.py | 1187 ------------------------ oonidata/cli/command.py | 14 +- oonidata/workers/__init__.py | 1 - oonidata/workers/experiment_results.py | 258 ----- tests/test_analysis.py | 17 +- tests/test_scoring.py | 4 +- 6 files changed, 10 insertions(+), 1471 deletions(-) delete mode 100644 oonidata/analysis/websites.py delete mode 100644 oonidata/workers/experiment_results.py diff --git a/oonidata/analysis/websites.py b/oonidata/analysis/websites.py deleted file mode 100644 index 89eaaf77..00000000 --- a/oonidata/analysis/websites.py +++ /dev/null @@ -1,1187 +0,0 @@ -from collections import defaultdict -from dataclasses import dataclass -import dataclasses -import ipaddress -import math -from typing import Any, Generator, Iterable, NamedTuple, Optional, List, Dict -from oonidata.db.connections import ClickhouseConnection -from oonidata.analysis.control import ( - WebGroundTruth, - BodyDB, -) -from oonidata.models.experiment_result import ( - BlockingScope, - Outcome, - Scores, - ExperimentResult, - fp_to_scope, - iter_experiment_results, -) - -from oonidata.fingerprintdb import FingerprintDB -from oonidata.models.observations import WebControlObservation, WebObservation - -import logging - -log = logging.getLogger("oonidata.processing") - -CLOUD_PROVIDERS_ASNS = [ - 13335, # Cloudflare: https://www.peeringdb.com/net/4224 - 20940, # Akamai: https://www.peeringdb.com/net/2 - 9002, # Akamai RETN - 396982, # Google Cloud: https://www.peeringdb.com/net/30878 -] - -CLOUD_PROVIDERS_AS_ORGS_SUBSTRINGS = ["akamai"] - - -def get_web_ctrl_observations( - db: ClickhouseConnection, measurement_uid: str -) -> List[WebControlObservation]: - obs_list = [] - column_names = [f.name for f in dataclasses.fields(WebControlObservation)] - q = "SELECT (" - q += ",\n".join(column_names) - q += ") FROM obs_web_ctrl WHERE measurement_uid = %(measurement_uid)s" - - for res in db.execute_iter(q, {"measurement_uid": measurement_uid}): - row = res[0] - obs_list.append( - WebControlObservation(**{k: row[idx] for idx, k in enumerate(column_names)}) - ) - return obs_list - - -def is_cloud_provider(asn: Optional[int], as_org_name: Optional[str]): - if asn and asn in CLOUD_PROVIDERS_ASNS: - return True - if as_org_name and any( - [ss in as_org_name.lower() for ss in CLOUD_PROVIDERS_AS_ORGS_SUBSTRINGS] - ): - return True - return False - - -def encode_address(ip: str, port: int) -> str: - """ - return a properly encoded address handling IPv6 IPs - """ - # I'm amazed python doesn't have this in the standard library - # and urlparse is incredibly inconsistent with it's handling of IPv6 - # addresses. 
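# Illustrative examples of the encoding implemented below (hypothetical inputs):
#   encode_address("1.2.3.4", 80)      -> "1.2.3.4:80"
#   encode_address("2001:db8::1", 443) -> "[2001:db8::1]:443"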
- ipaddr = ipaddress.ip_address(ip) - addr = ip - if isinstance(ipaddr, ipaddress.IPv6Address): - addr = "[" + ip + "]" - - addr += f":{port}" - return addr - - -def confidence_estimate(x: int, factor: float = 0.8, clamping: float = 0.9): - """ - Gives an estimate of confidence given the number of datapoints that are - consistent (x). - - clamping: defines what is the maximum value it can take - factor: is a multiplicate factor to decrease the value of the function - - This function was derived by looking for an exponential function in - the form f(x) = c1*a^x + c2 and solving for f(0) = 0 and f(10) = 1, - giving us a function in the form f(x) = (a^x - 1) / (a^10 - 1). We - then choose the magic value of 0.5 by looking for a solution in a - where f(1) ~= 0.5, doing a bit of plots and choosing a curve that - looks reasonably sloped. - """ - y = (pow(0.5, x) - 1) / (pow(0.5, 10) - 1) - return min(clamping, factor * y) - - -def ok_vs_nok_score( - ok_count: int, nok_count: int, blocking_factor: float = 0.8 -) -> Scores: - """ - This is a very simplistic estimation that just looks at the proportions of - failures to reachable measurement to establish if something is blocked or - not. - - It assumes that what we are calculating the ok_vs_nok score is some kind of - failure, which means the outcome is either that the target is down or - blocked. - - In order to determine which of the two cases it is, we look at the ground - truth. - """ - # We set this to 0.65 so that for the default value and lack of any ground - # truth, we end up with blocked_score = 0.52 and down_score = 0.48 - blocked_score = min(1.0, 0.65 * blocking_factor) - down_score = 1 - blocked_score - total_count = ok_count + nok_count - if total_count > 0: - blocked_score = min(1.0, ok_count / total_count * blocking_factor) - down_score = 1 - blocked_score - - return Scores(ok=0.0, blocked=blocked_score, down=down_score) - - -def make_tcp_outcome( - web_o: WebObservation, web_ground_truths: List[WebGroundTruth] -) -> Outcome: - assert web_o.ip is not None and web_o.port is not None - - blocking_subject = encode_address(web_o.ip, web_o.port) - - # It's working, wothing to see here, go on with your life - if web_o.tcp_success: - return Outcome( - observation_id=web_o.observation_id, - scope=BlockingScope.UNKNOWN, - label="", - subject=blocking_subject, - category="tcp", - detail="ok", - meta={}, - ok_score=1.0, - down_score=0.0, - blocked_score=0.0, - ) - - assert ( - web_o.tcp_failure is not None - ), "inconsistency between tcp_success and tcp_failure" - - ground_truths = filter( - lambda gt: gt.ip == web_o.ip and gt.port == web_o.port, web_ground_truths - ) - unreachable_cc_asn = set() - reachable_cc_asn = set() - ok_count = 0 - nok_count = 0 - for gt in ground_truths: - # We don't check for strict == True, since depending on the DB engine - # True could also be represented as 1 - if gt.tcp_success is None: - continue - if gt.tcp_success: - if gt.is_trusted_vp: - ok_count += 1 - else: - reachable_cc_asn.add((gt.vp_cc, gt.vp_asn)) - else: - if gt.is_trusted_vp: - nok_count += 1 - else: - unreachable_cc_asn.add((gt.vp_cc, gt.vp_asn)) - - reachable_count = ok_count + len(reachable_cc_asn) - unreachable_count = nok_count + len(unreachable_cc_asn) - blocking_meta = { - "unreachable_count": str(unreachable_count), - "reachable_count": str(reachable_count), - } - - blocking_factor = 0.7 - if web_o.tls_failure == "connection_reset": - blocking_factor = 0.8 - - scores = ok_vs_nok_score( - ok_count=reachable_count, - 
nok_count=unreachable_count, - blocking_factor=blocking_factor, - ) - return Outcome( - observation_id=web_o.observation_id, - scope=BlockingScope.UNKNOWN, - label="", - subject=blocking_subject, - category="tcp", - detail=web_o.tcp_failure, - meta=blocking_meta, - ok_score=scores.ok, - blocked_score=scores.blocked, - down_score=scores.down, - ) - - -class DNSFingerprintOutcome(NamedTuple): - meta: Dict[str, str] - label: str - scope: BlockingScope - - -def match_dns_fingerprint( - dns_observations: List[WebObservation], fingerprintdb: FingerprintDB -) -> DNSFingerprintOutcome: - outcome_meta = {} - outcome_label = "" - outcome_scope = BlockingScope.UNKNOWN - for web_o in dns_observations: - if not web_o.dns_answer: - continue - fp = fingerprintdb.match_dns(web_o.dns_answer) - if fp: - outcome_scope = fp_to_scope(fp.scope) - if outcome_scope != BlockingScope.SERVER_SIDE_BLOCK: - outcome_label = f"blocked" - outcome_meta["fingerprint"] = fp.name - outcome_meta["fingerprint_consistency"] = "country_consistent" - - # If we see the fingerprint in an unexpected country we should - # significantly reduce the confidence in the block - if fp.expected_countries and web_o.probe_cc not in fp.expected_countries: - log.debug( - f"Inconsistent probe_cc vs expected_countries {web_o.probe_cc} != {fp.expected_countries}" - ) - outcome_meta["fingerprint_consistency"] = "country_inconsistent" - return DNSFingerprintOutcome( - meta=outcome_meta, label=outcome_label, scope=outcome_scope - ) - return DNSFingerprintOutcome( - meta=outcome_meta, label=outcome_label, scope=outcome_scope - ) - - -class DNSGroundTruth(NamedTuple): - nxdomain_cc_asn: set - failure_cc_asn: set - ok_cc_asn: set - other_ips: Dict[str, set] - other_asns: Dict[str, set] - trusted_answers: Dict - - @property - def ok_count(self): - return len(self.ok_cc_asn) - - @property - def failure_count(self): - return len(self.failure_cc_asn) - - @property - def nxdomain_count(self): - return len(self.nxdomain_cc_asn) - - -def make_dns_ground_truth(ground_truths: Iterable[WebGroundTruth]): - """ - Here we count how many vantage vantage points, as in distinct probe_cc, - probe_asn pairs, presented the various types of results. 
- """ - nxdomain_cc_asn = set() - failure_cc_asn = set() - ok_cc_asn = set() - other_ips = defaultdict(set) - other_asns = defaultdict(set) - trusted_answers = {} - for gt in ground_truths: - if gt.dns_success is None: - continue - if gt.dns_failure == "dns_nxdomain_error": - nxdomain_cc_asn.add((gt.vp_cc, gt.vp_asn)) - if not gt.dns_success: - failure_cc_asn.add((gt.vp_cc, gt.vp_asn)) - continue - - ok_cc_asn.add((gt.vp_cc, gt.vp_asn)) - other_ips[gt.ip].add((gt.vp_cc, gt.vp_asn)) - assert gt.ip, "did not find IP in ground truth" - other_asns[gt.ip_asn].add((gt.vp_cc, gt.vp_asn)) - if gt.tls_is_certificate_valid == True or gt.is_trusted_vp == True: - trusted_answers[gt.ip] = gt - - return DNSGroundTruth( - nxdomain_cc_asn=nxdomain_cc_asn, - failure_cc_asn=failure_cc_asn, - ok_cc_asn=ok_cc_asn, - other_asns=other_asns, - other_ips=other_ips, - trusted_answers=trusted_answers, - ) - - -def compute_dns_failure_outcomes( - dns_ground_truth: DNSGroundTruth, dns_observations: List[WebObservation] -) -> List[Outcome]: - outcomes = [] - for web_o in dns_observations: - if not web_o.dns_failure: - continue - - outcome_meta = { - "ok_count": str(dns_ground_truth.ok_count), - "failure_count": str(dns_ground_truth.failure_count), - "nxdomain_count": str(dns_ground_truth.nxdomain_count), - } - scores = ok_vs_nok_score( - ok_count=dns_ground_truth.ok_count, - nok_count=dns_ground_truth.failure_count, - ) - - blocking_detail = f"failure.{web_o.dns_failure}" - if web_o.dns_failure == "dns_nxdomain_error": - blocking_detail = "inconsistent.nxdomain" - scores = ok_vs_nok_score( - ok_count=dns_ground_truth.ok_count, - nok_count=dns_ground_truth.nxdomain_count, - blocking_factor=0.85, - ) - outcome_subject = ( - f"{web_o.hostname}@{web_o.dns_engine}-{web_o.dns_engine_resolver_address}" - ) - outcomes.append( - Outcome( - observation_id=web_o.observation_id, - scope=BlockingScope.UNKNOWN, - subject=outcome_subject, - label="", - category="dns", - detail=blocking_detail, - meta=outcome_meta, - ok_score=scores.ok, - down_score=scores.down, - blocked_score=scores.blocked, - ) - ) - return outcomes - - -def dns_observations_by_resolver( - dns_observations: List[WebObservation], -) -> Dict[str, List[WebObservation]]: - by_resolver = defaultdict(list) - for dns_o in dns_observations: - key = f"{dns_o.dns_engine}-{dns_o.dns_engine_resolver_address}" - by_resolver[key].append(dns_o) - return by_resolver - - -def get_outcome_subject(dns_o: WebObservation): - return f"{dns_o.ip}@{dns_o.dns_engine}-{dns_o.dns_engine_resolver_address}" - - -def check_dns_bogon( - dns_observations: List[WebObservation], - dns_ground_truth: DNSGroundTruth, - outcome_fingerprint: DNSFingerprintOutcome, -) -> Optional[Outcome]: - for web_o in dns_observations: - outcome_meta = {"ip": web_o.dns_answer or "", **outcome_fingerprint.meta} - if web_o.ip_is_bogon: - outcome_meta["why"] = "answer is bogon" - down_score = 0.1 - blocked_score = 0.9 - # If we saw the same bogon IP inside of the trusted_answers, it means it - # it's always returning a bogon and hence this site is actually down - if web_o.dns_answer in dns_ground_truth.trusted_answers: - outcome_meta["why"] = "answer is bogon as expected" - down_score = 0.9 - blocked_score = 0.1 - - return Outcome( - observation_id=web_o.observation_id, - scope=outcome_fingerprint.scope, - subject=get_outcome_subject(web_o), - label=outcome_fingerprint.label, - category="dns", - detail="inconsistent.bogon", - meta=outcome_meta, - ok_score=0.0, - down_score=down_score, - blocked_score=blocked_score, - ) 
- - -def check_tls_consistency( - dns_observations: List[WebObservation], - dns_ground_truth: DNSGroundTruth, - outcome_fingerprint: DNSFingerprintOutcome, -) -> Optional[Outcome]: - for web_o in dns_observations: - outcome_meta = outcome_fingerprint.meta.copy() - if ( - web_o.tls_is_certificate_valid == True - or web_o.dns_answer in dns_ground_truth.trusted_answers - ): - outcome_meta["why"] = "resolved IP in trusted answers" - # No blocking detected - return Outcome( - observation_id=web_o.observation_id, - scope=outcome_fingerprint.scope, - label=outcome_fingerprint.label, - subject=get_outcome_subject(web_o), - category="dns", - detail="ok", - meta=outcome_meta, - ok_score=1.0, - down_score=0.0, - blocked_score=0.0, - ) - - -def check_tls_inconsistency( - dns_observations: List[WebObservation], - outcome_fingerprint: DNSFingerprintOutcome, -) -> Optional[Outcome]: - # We do these in two separate loops, so that we first ensure that none of - # the answers we got were good and only then do we proceed to doing a TLS - # inconsistency check. - for web_o in dns_observations: - outcome_meta = outcome_fingerprint.meta.copy() - if web_o.tls_is_certificate_valid == False: - # TODO: we probably need to handle cases here where it might be the case - # that the CERT is bad because it's always serving a bad certificate. - # here is an example: https://explorer.ooni.org/measurement/20220930T235729Z_webconnectivity_AE_5384_n1_BLcO454Y5UULxZoq?input=https://www.government.ae/en%23%2F - outcome_meta["why"] = "tls certificate is bad" - return Outcome( - observation_id=web_o.observation_id, - scope=outcome_fingerprint.scope, - label=outcome_fingerprint.label, - subject=get_outcome_subject(web_o), - category="dns", - detail="inconsistent", - meta=outcome_meta, - ok_score=0.0, - # We give a little bit of weight to the down score, due to no ground - # truthing of TLS - down_score=0.3, - blocked_score=0.7, - ) - - -def check_wc_style_consistency( - dns_observations: List[WebObservation], - dns_ground_truth: DNSGroundTruth, - outcome_fingerprint: DNSFingerprintOutcome, -) -> Optional[Outcome]: - """ - Do a web_connectivity style DNS consistency check. - - If we are in this case, it means we weren't able to determine the - consistency of the DNS query using TLS. This is the case either - because the tested site is not in HTTPS and therefore we didn't - generate a TLS measurement for it or because the target IP isn't - listening on HTTPS (which is quite fishy). - In either case we should flag these with being somewhat likely to be - blocked. 
- """ - ground_truth_asns = set() - ground_truth_as_org_names = set() - for gt in dns_ground_truth.trusted_answers.values(): - assert gt.ip, f"did not find IP in ground truth {gt.ip}" - ground_truth_asns.add(gt.ip_asn) - ground_truth_as_org_names.add(gt.ip_as_org_name.lower()) - - contains_matching_asn_answer = False - contains_matching_cc_answer = False - system_answers = 0 - for web_o in dns_observations: - outcome_meta = outcome_fingerprint.meta.copy() - if web_o.dns_engine == "system": - system_answers += 1 - - if web_o.dns_answer_asn in ground_truth_asns: - outcome_meta["why"] = "answer in matches AS of trusted answers" - return Outcome( - observation_id=web_o.observation_id, - scope=outcome_fingerprint.scope, - label=outcome_fingerprint.label, - subject=get_outcome_subject(web_o), - category="dns", - detail="ok", - meta=outcome_meta, - ok_score=0.9, - down_score=0.0, - blocked_score=0.1, - ) - - if ( - web_o.dns_answer_as_org_name - and web_o.dns_answer_as_org_name.lower() in ground_truth_as_org_names - ): - outcome_meta["why"] = "answer in TLS ground truth as org name" - return Outcome( - observation_id=web_o.observation_id, - scope=outcome_fingerprint.scope, - label=outcome_fingerprint.label, - subject=get_outcome_subject(web_o), - category="dns", - detail="ok", - meta=outcome_meta, - ok_score=0.9, - down_score=0.0, - blocked_score=0.1, - ) - - if web_o.dns_answer in dns_ground_truth.other_ips: - outcome_meta["why"] = "answer in resolved IPs ground truth" - outcome_meta["other_ip_count"] = str( - len(dns_ground_truth.other_ips[web_o.dns_answer]) - ) - blocked_score = confidence_estimate( - len(dns_ground_truth.other_ips[web_o.dns_answer]), - clamping=0.9, - factor=0.8, - ) - ok_score = 1 - blocked_score - return Outcome( - observation_id=web_o.observation_id, - scope=outcome_fingerprint.scope, - label=outcome_fingerprint.label, - subject=get_outcome_subject(web_o), - category="dns", - detail="ok", - meta=outcome_meta, - ok_score=ok_score, - down_score=0.0, - blocked_score=blocked_score, - ) - - if web_o.dns_answer in dns_ground_truth.other_asns: - # We clamp this to some lower values and scale it by a smaller factor, - # since ASN consistency is less strong than direct IP match. - outcome_meta["why"] = "answer AS in ground truth" - outcome_meta["other_asn_count"] = str( - len(dns_ground_truth.other_asns[web_o.dns_answer]) - ) - blocked_score = confidence_estimate( - len(dns_ground_truth.other_asns[web_o.dns_answer]), - clamping=0.9, - factor=0.8, - ) - ok_score = 1 - blocked_score - return Outcome( - observation_id=web_o.observation_id, - scope=outcome_fingerprint.scope, - label=outcome_fingerprint.label, - subject=get_outcome_subject(web_o), - category="dns", - detail="ok", - meta=outcome_meta, - ok_score=ok_score, - down_score=0.0, - blocked_score=blocked_score, - ) - - if is_cloud_provider(asn=web_o.ip_asn, as_org_name=web_o.ip_as_org_name): - # Cloud providers are a common source of false positives. 
Let's just - # mark them as ok with a low confidence - outcome_meta["why"] = "answer is cloud service provider" - return Outcome( - observation_id=web_o.observation_id, - scope=outcome_fingerprint.scope, - label=outcome_fingerprint.label, - subject=get_outcome_subject(web_o), - category="dns", - detail="ok", - meta=outcome_meta, - ok_score=0.6, - down_score=0.0, - blocked_score=0.4, - ) - - if web_o.dns_answer_asn == web_o.probe_asn: - contains_matching_asn_answer = True - elif web_o.ip_as_cc == web_o.probe_cc: - contains_matching_cc_answer = True - - outcome_meta = {} - outcome_meta["why"] = "unable to determine consistency through ground truth" - outcome_meta["system_answers"] = str(system_answers) - blocked_score = 0.6 - outcome_detail = "inconsistent" - - # It's quite unlikely that a censor will return multiple answers - if system_answers > 1: - outcome_meta["why"] += ", but multiple system_answers" - blocked_score = 0.4 - outcome_detail = "ok" - - # It's more common to answer to DNS queries for blocking with IPs managed by - # the ISP (ex. to serve their blockpage). - # So we give this a bit higher confidence - if contains_matching_asn_answer and system_answers > 1: - blocked_score = 0.8 - outcome_meta["why"] = "answer matches probe_asn" - outcome_detail = "inconsistent" - - # It's common to do this also in the country, for example when the blockpage - # is centrally managed (ex. case in IT, ID) - elif contains_matching_cc_answer: - blocked_score = 0.7 - outcome_meta["why"] = "answer matches probe_cc" - outcome_detail = "inconsistent" - - # We haven't managed to figured out if the DNS resolution was a good one, so - # we are going to assume it's bad. - # TODO: Currently web_connectivity assumes that if the last request was HTTPS and it was successful, then the whole measurement was OK. - # see: https://github.com/ooni/probe-cli/blob/a0dc65641d7a31e116d9411ecf9e69ed1955e792/internal/engine/experiment/webconnectivity/summary.go#L98 - # This seems a little bit too strong. We probably ought to do this only if - # the redirect chain was a good one, because it will lead to false negatives - # in cases in which the redirect is triggered by the middlebox. - # Imagine a case like this: - # http://example.com/ -> 302 -> https://blockpage.org/ - # - # The certificate for blockpage.org can be valid, but it's not what we - # wanted. 
- return Outcome( - observation_id=dns_observations[0].observation_id, - scope=outcome_fingerprint.scope, - label=outcome_fingerprint.label, - subject="all@all", - category="dns", - detail=outcome_detail, - meta=outcome_meta, - ok_score=1 - blocked_score, - down_score=0.0, - blocked_score=blocked_score, - ) - - -def compute_dns_consistency_outcomes( - dns_ground_truth: DNSGroundTruth, - dns_observations: List[WebObservation], - outcome_fingerprint: DNSFingerprintOutcome, -) -> List[Outcome]: - outcomes = [] - - for dns_observations in dns_observations_by_resolver(dns_observations).values(): - bogon_outcome = check_dns_bogon( - dns_observations=dns_observations, - dns_ground_truth=dns_ground_truth, - outcome_fingerprint=outcome_fingerprint, - ) - if bogon_outcome: - outcomes.append(bogon_outcome) - continue - - tls_consistency_outcome = check_tls_consistency( - dns_observations=dns_observations, - dns_ground_truth=dns_ground_truth, - outcome_fingerprint=outcome_fingerprint, - ) - if tls_consistency_outcome: - outcomes.append(tls_consistency_outcome) - continue - - wc_style_outcome = check_wc_style_consistency( - dns_observations=dns_observations, - dns_ground_truth=dns_ground_truth, - outcome_fingerprint=outcome_fingerprint, - ) - if wc_style_outcome: - outcomes.append(wc_style_outcome) - continue - - # TODO: If we add a ground truth to this, we could potentially do it - # before the WC style consistency check and it will probably be more - # robust. - tls_inconsistency_outcome = check_tls_inconsistency( - dns_observations=dns_observations, - outcome_fingerprint=outcome_fingerprint, - ) - if tls_inconsistency_outcome: - outcomes.append(tls_inconsistency_outcome) - - return outcomes - - -def make_dns_outcomes( - hostname: str, - dns_observations: List[WebObservation], - web_ground_truths: List[WebGroundTruth], - fingerprintdb: FingerprintDB, -) -> List[Outcome]: - outcomes = [] - dns_ground_truth = make_dns_ground_truth( - ground_truths=filter( - lambda gt: gt.hostname == hostname, - web_ground_truths, - ) - ) - outcome_fingerprint = match_dns_fingerprint( - dns_observations=dns_observations, fingerprintdb=fingerprintdb - ) - outcomes += compute_dns_failure_outcomes( - dns_ground_truth=dns_ground_truth, dns_observations=dns_observations - ) - outcomes += compute_dns_consistency_outcomes( - dns_ground_truth=dns_ground_truth, - dns_observations=dns_observations, - outcome_fingerprint=outcome_fingerprint, - ) - return outcomes - - -def make_tls_outcome( - web_o: WebObservation, web_ground_truths: List[WebGroundTruth] -) -> Outcome: - blocking_subject = web_o.hostname or "" - outcome_meta = {} - if web_o.tls_is_certificate_valid == True: - return Outcome( - observation_id=web_o.observation_id, - scope=BlockingScope.UNKNOWN, - label="", - subject=blocking_subject, - category="tls", - detail="ok", - meta=outcome_meta, - ok_score=1.0, - down_score=0.0, - blocked_score=0.0, - ) - - ground_truths = filter( - lambda gt: gt.http_request_url and gt.hostname == web_o.hostname, - web_ground_truths, - ) - failure_cc_asn = set() - ok_cc_asn = set() - ok_count = 0 - failure_count = 0 - for gt in ground_truths: - # We don't check for strict == True, since depending on the DB engine - # True could also be represented as 1 - if gt.http_success is None: - continue - - if gt.http_success: - if gt.is_trusted_vp: - ok_count += gt.count - else: - ok_cc_asn.add((gt.vp_cc, gt.vp_asn)) - else: - if gt.is_trusted_vp: - failure_count += gt.count - else: - failure_cc_asn.add((gt.vp_cc, gt.vp_asn, gt.count)) - - # 
Untrusted Vantage Points (i.e. not control measurements) only count - # once per probe_cc, probe_asn pair to avoid spammy probes poisoning our - # data - failure_count += len(failure_cc_asn) - ok_count += len(ok_cc_asn) - outcome_meta["ok_count"] = str(ok_count) - outcome_meta["failure_count"] = str(failure_count) - - # FIXME: we currently use the HTTP control as a proxy to establish ground truth for TLS - if web_o.tls_is_certificate_valid == False and failure_count == 0: - outcome_meta["why"] = "invalid TLS certificate" - return Outcome( - observation_id=web_o.observation_id, - scope=BlockingScope.UNKNOWN, - label="", - subject=blocking_subject, - category="tls", - detail="mitm", - meta=outcome_meta, - ok_score=0.0, - down_score=0.2, - blocked_score=0.8, - ) - - elif web_o.tls_failure and failure_count == 0: - # We only consider it to be a TLS level verdict if we haven't seen any - # blocks in TCP - blocking_detail = f"{web_o.tls_failure}" - blocked_score = 0.5 - - if web_o.tls_handshake_read_count == 0 and web_o.tls_handshake_write_count == 1: - # This means we just wrote the TLS ClientHello, let's give it a bit - # more confidence in it being a block - blocked_score = 0.7 - - if web_o.tls_failure in ("connection_closed", "connection_reset"): - blocked_score += 0.15 - - return Outcome( - observation_id=web_o.observation_id, - scope=BlockingScope.UNKNOWN, - label="", - subject=blocking_subject, - category="tls", - detail=blocking_detail, - meta=outcome_meta, - ok_score=0.0, - down_score=1 - blocked_score, - blocked_score=blocked_score, - ) - - elif web_o.tls_failure or web_o.tls_is_certificate_valid == False: - outcome_detail = f"{web_o.tls_failure}" - scores = ok_vs_nok_score( - ok_count=ok_count, nok_count=failure_count, blocking_factor=0.7 - ) - if web_o.tls_is_certificate_valid == False: - outcome_detail = "bad_cert" - - return Outcome( - observation_id=web_o.observation_id, - scope=BlockingScope.UNKNOWN, - label="", - subject=blocking_subject, - category="tls", - detail=outcome_detail, - meta=outcome_meta, - ok_score=0.0, - down_score=scores.down, - blocked_score=scores.blocked, - ) - - return Outcome( - observation_id=web_o.observation_id, - scope=BlockingScope.UNKNOWN, - label="", - subject=blocking_subject, - category="tls", - detail="ok", - meta=outcome_meta, - ok_score=0.9, - down_score=0.0, - blocked_score=0.1, - ) - - -def make_http_outcome( - web_o: WebObservation, - web_ground_truths: List[WebGroundTruth], - body_db: BodyDB, - fingerprintdb: FingerprintDB, -) -> Outcome: - assert web_o.http_request_url - request_is_encrypted = web_o.http_request_url.startswith("https://") - - blocking_subject = web_o.http_request_url - outcome_label = "" - outcome_meta = {} - outcome_category = "http" - if request_is_encrypted: - outcome_category = "https" - - ground_truths = filter( - lambda gt: gt.http_request_url == web_o.http_request_url, web_ground_truths - ) - failure_cc_asn = set() - ok_cc_asn = set() - ok_count = 0 - failure_count = 0 - response_body_len_count = defaultdict(int) - for gt in ground_truths: - # We don't check for strict == True, since depending on the DB engine - # True could also be represented as 1 - if gt.http_success is None: - continue - - # TODO: figure out why some are negative - if gt.http_response_body_length and gt.http_response_body_length > 0: - response_body_len_count[gt.http_response_body_length] += gt.count - - if gt.http_success: - if gt.is_trusted_vp: - ok_count += gt.count - else: - ok_cc_asn.add((gt.vp_cc, gt.vp_asn)) - else: - if gt.is_trusted_vp: - 
failure_count += gt.count - else: - failure_cc_asn.add((gt.vp_cc, gt.vp_asn, gt.count)) - - response_body_length = 0 - if len(response_body_len_count) > 0: - response_body_length = max(response_body_len_count.items(), key=lambda x: x[1])[ - 0 - ] - - # Untrusted Vantage Points (i.e. not control measurements) only count - # once per probe_cc, probe_asn pair to avoid spammy probes poisoning our - # data - failure_count += len(failure_cc_asn) - ok_count += len(ok_cc_asn) - outcome_meta["ok_count"] = str(ok_count) - outcome_meta["failure_count"] = str(failure_count) - - if web_o.http_failure: - scores = ok_vs_nok_score(ok_count=ok_count, nok_count=failure_count) - - outcome_detail = f"{web_o.http_failure}" - - return Outcome( - observation_id=web_o.observation_id, - scope=BlockingScope.UNKNOWN, - label="", - subject=blocking_subject, - category=outcome_category, - detail=outcome_detail, - meta=outcome_meta, - ok_score=scores.ok, - down_score=scores.down, - blocked_score=scores.blocked, - ) - - # TODO: do we care to do something about empty bodies? - # They are commonly a source of blockpages - if web_o.http_response_body_sha1: - matched_fp = body_db.lookup(web_o.http_response_body_sha1) - if len(matched_fp) > 0: - blocked_score = 0.8 - blocking_scope = BlockingScope.UNKNOWN - if request_is_encrypted: - # Likely some form of server-side blocking - blocking_scope = BlockingScope.SERVER_SIDE_BLOCK - blocked_score = 0.5 - - for fp_name in matched_fp: - fp = fingerprintdb.get_fp(fp_name) - if fp.scope: - blocking_scope = fp_to_scope(fp.scope) - outcome_meta["fingerprint"] = fp.name - outcome_meta["why"] = "matched fingerprint" - if ( - fp.expected_countries - and web_o.probe_cc in fp.expected_countries - ): - outcome_label = "blocked" - blocked_score = 1.0 - break - - return Outcome( - observation_id=web_o.observation_id, - scope=blocking_scope, - label=outcome_label, - subject=blocking_subject, - category=outcome_category, - detail="blockpage", - meta=outcome_meta, - ok_score=1 - blocked_score, - down_score=0.0, - blocked_score=blocked_score, - ) - - if not request_is_encrypted: - # TODO: We should probably do mining of the body dumps to figure out if there - # are blockpages in there instead of relying on a per-measurement heuristic - - # TODO: we don't have this - # if web_o.http_response_body_sha1 == http_ctrl.response_body_sha1: - # return ok_be - - if ( - web_o.http_response_body_length - and response_body_length - # We need to ignore redirects as we should only be doing matching of the response body on the last element in the chain - and ( - not web_o.http_response_header_location - and not math.floor(web_o.http_response_status_code or 0 / 100) == 3 - ) - and ( - (web_o.http_response_body_length + 1.0) / (response_body_length + 1.0) - < 0.7 - ) - ): - outcome_meta["response_body_length"] = str(web_o.http_response_body_length) - outcome_meta["ctrl_response_body_length"] = str(response_body_length) - blocking_detail = f"http.body-diff" - return Outcome( - observation_id=web_o.observation_id, - scope=BlockingScope.UNKNOWN, - label="", - subject=blocking_subject, - category=outcome_category, - detail=blocking_detail, - meta=outcome_meta, - ok_score=0.3, - down_score=0.0, - blocked_score=0.7, - ) - - return Outcome( - observation_id=web_o.observation_id, - scope=BlockingScope.UNKNOWN, - label="", - subject=blocking_subject, - category=outcome_category, - detail="ok", - meta=outcome_meta, - ok_score=0.8, - down_score=0.0, - blocked_score=0.2, - ) - - -def is_blocked_or_down(o: Optional[Outcome]) 
-> bool: - if not o: - return False - if o.ok_score > 0.5: - return False - return True - - -def is_ip_blocked(dns_outcomes: List[Outcome], ip: Optional[str]) -> bool: - if not ip: - return False - - for outcome in dns_outcomes: - if outcome.subject.startswith(ip): - return is_blocked_or_down(outcome) - return False - - -def make_website_experiment_result( - web_observations: List[WebObservation], - web_ground_truths: List[WebGroundTruth], - body_db: BodyDB, - fingerprintdb: FingerprintDB, -) -> Generator[ExperimentResult, None, None]: - outcomes: List[Outcome] = [] - domain_name = web_observations[0].hostname - - # We need to process HTTP observations after all the others, because we - # arent' guaranteed to have on the same row all connected observations. - # If we don't do that, we will not exclude from our blocking calculations - # cases in which something has already been counted as blocked through other - # means - http_obs = [] - is_tcp_blocked = False - is_tls_blocked = False - - dns_observations_by_hostname = defaultdict(list) - dns_outcomes_by_hostname = {} - other_observations = [] - for web_o in web_observations: - if web_o.dns_query_type: - assert web_o.hostname is not None - dns_observations_by_hostname[web_o.hostname].append(web_o) - else: - other_observations.append(web_o) - - for hostname, dns_observations in dns_observations_by_hostname.items(): - dns_outcomes = make_dns_outcomes( - hostname=hostname, - dns_observations=dns_observations, - web_ground_truths=web_ground_truths, - fingerprintdb=fingerprintdb, - ) - outcomes += dns_outcomes - dns_outcomes_by_hostname[hostname] = dns_outcomes - - for web_o in web_observations: - # FIXME: for the moment we just ignore all IPv6 results, because they are too noisy - if web_o.ip: - try: - ipaddr = ipaddress.ip_address(web_o.ip) - if isinstance(ipaddr, ipaddress.IPv6Address): - continue - except: - log.error(f"Invalid IP in {web_o.ip}") - - request_is_encrypted = ( - web_o.http_request_url and web_o.http_request_url.startswith("https://") - ) - dns_outcomes = dns_outcomes_by_hostname.get(web_o.hostname, []) - - tcp_outcome = None - if not is_ip_blocked(dns_outcomes, web_o.ip) and web_o.tcp_success is not None: - tcp_outcome = make_tcp_outcome( - web_o=web_o, web_ground_truths=web_ground_truths - ) - if is_blocked_or_down(tcp_outcome): - is_tcp_blocked = True - outcomes.append(tcp_outcome) - - tls_outcome = None - # We ignore things that are already blocked by DNS or TCP - if ( - not is_ip_blocked(dns_outcomes, web_o.ip) - and not is_blocked_or_down(tcp_outcome) - and (web_o.tls_failure or web_o.tls_cipher_suite is not None) - ): - tls_outcome = make_tls_outcome( - web_o=web_o, web_ground_truths=web_ground_truths - ) - outcomes.append(tls_outcome) - if is_blocked_or_down(tls_outcome): - is_tls_blocked = True - - # When we don't know the IP of the http_request, we add them to a - # separate http_obs list. - # This is done so we can ignore the HTTP outcome if ANY of the DNS - # outcomes are an indication of blocking, since we can't do a - # consistency check on a specific DNS answer. - if web_o.http_request_url and not web_o.ip: - http_obs.append(web_o) - continue - - # For HTTP requests we ignore cases in which we detected the blocking - # already to be happening via DNS or TCP. 
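# In other words: an HTTP(S) outcome is only computed when neither the DNS
# answers for this IP nor the TCP connect were already judged blocked or down,
# and, for https:// requests, when the TLS handshake was not judged blocked either.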
- if ( - web_o.http_request_url - and ( - not is_ip_blocked(dns_outcomes, web_o.ip) - and not is_blocked_or_down(tcp_outcome) - # For HTTPS requests we ignore cases in which we detected the blocking via DNS, TCP or TLS - ) - and ( - request_is_encrypted == False - or (request_is_encrypted and not is_blocked_or_down(tls_outcome)) - ) - ): - http_outcome = make_http_outcome( - web_o=web_o, - web_ground_truths=web_ground_truths, - body_db=body_db, - fingerprintdb=fingerprintdb, - ) - outcomes.append(http_outcome) - - # Here we examine all of the HTTP Observations that didn't record an IP address - for web_o in http_obs: - is_dns_blocked = any( - [ - is_blocked_or_down(o) - for o in dns_outcomes_by_hostname.get(web_o.hostname, []) - ] - ) - request_is_encrypted = ( - web_o.http_request_url and web_o.http_request_url.startswith("https://") - ) - if ( - web_o.http_request_url - and ( - not is_dns_blocked - and not is_tcp_blocked - # For HTTPS requests we ignore cases in which we detected the blocking via DNS, TCP or TLS - ) - and ( - request_is_encrypted == False - or (request_is_encrypted and not is_tls_blocked) - ) - ): - http_outcome = make_http_outcome( - web_o=web_o, - web_ground_truths=web_ground_truths, - body_db=body_db, - fingerprintdb=fingerprintdb, - ) - outcomes.append(http_outcome) - - max_blocked_score = min(map(lambda o: o.blocked_score, outcomes)) - # TODO: we should probably be computing the anomaly and confirmed summary - # flags directly as part of aggregation. - confirmed = False - for o in outcomes: - if o.label == "blocked": - confirmed = True - anomaly = False - if max_blocked_score > 0.5: - anomaly = True - - return iter_experiment_results( - obs=web_observations[0], - experiment_group="websites", - domain_name=domain_name or "", - target_name=domain_name or "", - anomaly=anomaly, - confirmed=confirmed, - outcomes=outcomes, - ) diff --git a/oonidata/cli/command.py b/oonidata/cli/command.py index 0959fdfd..fe3f62a3 100644 --- a/oonidata/cli/command.py +++ b/oonidata/cli/command.py @@ -16,7 +16,6 @@ from oonidata.db.create_tables import create_queries, list_all_table_diffs from oonidata.netinfo import NetinfoDB from oonidata.workers import ( - start_experiment_result_maker, start_fingerprint_hunter, start_observation_maker, start_ground_truth_builder, @@ -244,18 +243,7 @@ def mker( for query, table_name in create_queries: click.echo(f"Running create query for {table_name}") db.execute(query) - - start_experiment_result_maker( - probe_cc=probe_cc, - test_name=test_name, - start_day=start_day, - end_day=end_day, - clickhouse=clickhouse, - data_dir=data_dir, - parallelism=parallelism, - fast_fail=fast_fail, - rebuild_ground_truths=rebuild_ground_truths, - ) + raise Exception("Run this via the analysis command") @cli.command() diff --git a/oonidata/workers/__init__.py b/oonidata/workers/__init__.py index 2a0a3137..f22d4557 100644 --- a/oonidata/workers/__init__.py +++ b/oonidata/workers/__init__.py @@ -1,4 +1,3 @@ -from .experiment_results import start_experiment_result_maker from .fingerprint_hunter import start_fingerprint_hunter from .observations import start_observation_maker from .ground_truths import start_ground_truth_builder diff --git a/oonidata/workers/experiment_results.py b/oonidata/workers/experiment_results.py deleted file mode 100644 index 4f60f9cc..00000000 --- a/oonidata/workers/experiment_results.py +++ /dev/null @@ -1,258 +0,0 @@ -import logging -import multiprocessing as mp -import pathlib -import queue -from datetime import date, datetime -from 
multiprocessing.synchronize import Event as EventClass -from threading import Thread -from typing import List - -import statsd - -from oonidata.analysis.control import BodyDB, WebGroundTruthDB -from oonidata.analysis.datasources import iter_web_observations -from oonidata.analysis.websites import make_website_experiment_result -from oonidata.dataclient import date_interval -from oonidata.datautils import PerfTimer -from oonidata.db.connections import ClickhouseConnection -from oonidata.fingerprintdb import FingerprintDB -from oonidata.models.experiment_result import ExperimentResult -from oonidata.netinfo import NetinfoDB -from oonidata.workers.ground_truths import maybe_build_web_ground_truth - -from .common import ( - get_prev_range, - make_db_rows, - maybe_delete_prev_range, - run_progress_thread, -) - -log = logging.getLogger("oonidata.processing") - - -def run_experiment_results( - day: date, - probe_cc: List[str], - fingerprintdb: FingerprintDB, - data_dir: pathlib.Path, - body_db: BodyDB, - db_writer: ClickhouseConnection, - clickhouse: str, -): - statsd_client = statsd.StatsClient("localhost", 8125) - - column_names = [f for f in ExperimentResult._fields] - db_lookup = ClickhouseConnection(clickhouse) - - prev_range = get_prev_range( - db=db_lookup, - table_name=ExperimentResult.__table_name__, - timestamp=datetime.combine(day, datetime.min.time()), - test_name=[], - probe_cc=probe_cc, - ) - - log.info(f"loading ground truth DB for {day}") - t = PerfTimer() - ground_truth_db_path = ( - data_dir / "ground_truths" / f"web-{day.strftime('%Y-%m-%d')}.sqlite3" - ) - web_ground_truth_db = WebGroundTruthDB() - web_ground_truth_db.build_from_existing(str(ground_truth_db_path.absolute())) - statsd_client.timing("wgt_er_all.timed", t.ms) - log.info(f"loaded ground truth DB for {day} in {t.pretty}") - - idx = 0 - for web_obs in iter_web_observations( - db_lookup, measurement_day=day, probe_cc=probe_cc, test_name="web_connectivity" - ): - try: - t_er_gen = PerfTimer() - t = PerfTimer() - relevant_gts = web_ground_truth_db.lookup_by_web_obs(web_obs=web_obs) - except: - log.error( - f"failed to lookup relevant_gts for {web_obs[0].measurement_uid}", - exc_info=True, - ) - continue - - try: - if statsd_client: - statsd_client.timing("wgt_er_reduced.timed", t.ms) - experiment_results = list( - make_website_experiment_result( - web_observations=web_obs, - body_db=body_db, - web_ground_truths=relevant_gts, - fingerprintdb=fingerprintdb, - ) - ) - idx += 1 - table_name, rows = make_db_rows( - dc_list=experiment_results, column_names=column_names - ) - if idx % 100 == 0: - statsd_client.incr("make_website_er.er_count", count=100) - statsd_client.gauge("make_website_er.er_gauge", 100, delta=True) - idx = 0 - - statsd_client.timing("make_website_er.timing", t_er_gen.ms) - - with statsd_client.timer("db_write_rows.timing"): - db_writer.write_rows( - table_name=table_name, - rows=rows, - column_names=column_names, - ) - yield experiment_results - except: - web_obs_ids = ",".join(map(lambda wo: wo.observation_id, web_obs)) - log.error(f"failed to generate er for {web_obs_ids}", exc_info=True) - - maybe_delete_prev_range( - db=db_lookup, prev_range=prev_range, table_name=ExperimentResult.__table_name__ - ) - - -class ExperimentResultMakerWorker(mp.Process): - def __init__( - self, - day_queue: mp.JoinableQueue, - progress_queue: mp.Queue, - shutdown_event: EventClass, - data_dir: pathlib.Path, - probe_cc: List[str], - test_name: List[str], - clickhouse: str, - fast_fail: bool, - log_level: int = 
logging.INFO, - ): - super().__init__(daemon=True) - self.day_queue = day_queue - self.progress_queue = progress_queue - self.probe_cc = probe_cc - self.test_name = test_name - self.clickhouse = clickhouse - self.fast_fail = fast_fail - self.data_dir = data_dir - - self.shutdown_event = shutdown_event - log.setLevel(log_level) - - def run(self): - - db_writer = ClickhouseConnection(self.clickhouse, row_buffer_size=10_000) - fingerprintdb = FingerprintDB(datadir=self.data_dir, download=False) - - body_db = BodyDB(db=ClickhouseConnection(self.clickhouse)) - - while not self.shutdown_event.is_set(): - try: - day = self.day_queue.get(block=True, timeout=0.1) - except queue.Empty: - continue - - log.info(f"generating experiment results from {day}") - try: - for idx, _ in enumerate( - run_experiment_results( - day=day, - probe_cc=self.probe_cc, - fingerprintdb=fingerprintdb, - data_dir=self.data_dir, - body_db=body_db, - db_writer=db_writer, - clickhouse=self.clickhouse, - ) - ): - if idx % 100 == 0: - self.progress_queue.put(100) - except Exception: - log.error(f"failed to process {day}", exc_info=True) - - finally: - log.info(f"finished processing day {day}") - self.day_queue.task_done() - - log.info("process is done") - try: - db_writer.close() - except: - log.error("failed to flush database", exc_info=True) - - -def start_experiment_result_maker( - probe_cc: List[str], - test_name: List[str], - start_day: date, - end_day: date, - data_dir: pathlib.Path, - clickhouse: str, - parallelism: int, - fast_fail: bool, - rebuild_ground_truths: bool, - log_level: int = logging.INFO, -): - netinfodb = NetinfoDB(datadir=data_dir, download=False) - - shutdown_event = mp.Event() - worker_shutdown_event = mp.Event() - - progress_queue = mp.JoinableQueue() - - progress_thread = Thread( - target=run_progress_thread, args=(progress_queue, shutdown_event) - ) - progress_thread.start() - - workers = [] - day_queue = mp.JoinableQueue() - for _ in range(parallelism): - worker = ExperimentResultMakerWorker( - day_queue=day_queue, - progress_queue=progress_queue, - shutdown_event=worker_shutdown_event, - probe_cc=probe_cc, - test_name=test_name, - data_dir=data_dir, - clickhouse=clickhouse, - fast_fail=fast_fail, - log_level=log_level, - ) - worker.start() - log.info(f"started worker {worker.pid}") - workers.append(worker) - - db_lookup = ClickhouseConnection(clickhouse) - - for day in date_interval(start_day, end_day): - maybe_build_web_ground_truth( - db=db_lookup, - netinfodb=netinfodb, - day=day, - data_dir=data_dir, - rebuild_ground_truths=rebuild_ground_truths, - ) - day_queue.put(day) - - log.info("waiting for the day queue to finish") - day_queue.join() - - log.info("sending shutdown signal to workers") - worker_shutdown_event.set() - - log.info("waiting for progress queue to finish") - progress_queue.join() - - log.info("waiting for experiment workers to finish running") - for idx, p in enumerate(workers): - log.info(f"waiting worker {idx} to join") - p.join() - log.info(f"waiting worker {idx} to close") - p.close() - - log.info("sending shutdown event progress thread") - shutdown_event.set() - log.info("waiting on progress queue") - progress_thread.join() diff --git a/tests/test_analysis.py b/tests/test_analysis.py index 73ea8ed4..9cea29f7 100644 --- a/tests/test_analysis.py +++ b/tests/test_analysis.py @@ -3,6 +3,8 @@ import random from typing import List from unittest.mock import MagicMock + +import pytest from oonidata.analysis.datasources import load_measurement from oonidata.analysis.web_analysis 
import make_web_analysis from oonidata.datautils import validate_cert_chain @@ -13,9 +15,6 @@ WebGroundTruthDB, ) from oonidata.analysis.signal import make_signal_experiment_result -from oonidata.analysis.websites import ( - make_website_experiment_result, -) from oonidata.models.nettests.signal import Signal from oonidata.models.nettests.web_connectivity import WebConnectivity from oonidata.models.observations import WebObservation, print_nice, print_nice_vertical @@ -120,6 +119,7 @@ def test_signal(fingerprintdb, netinfodb, measurements): def test_website_dns_blocking_event(fingerprintdb, netinfodb, measurements): + pytest.skip("TODO(arturo): implement this with the new analysis") msmt_path = measurements[ "20220627030703.592775_IR_webconnectivity_80e199b3c572f8d3" ] @@ -200,17 +200,11 @@ def make_experiment_result_from_wc_ctrl(msmt_path, fingerprintdb, netinfodb): body_db.lookup = MagicMock() body_db.lookup.return_value = [] - return make_website_experiment_result( - web_observations=web_observations, - web_ground_truths=web_ground_truth_db.lookup_by_web_obs( - web_obs=web_observations - ), - body_db=body_db, - fingerprintdb=fingerprintdb, - ) + return [] def test_website_experiment_result_blocked(fingerprintdb, netinfodb, measurements): + pytest.skip("TODO(arturo): implement this with the new analysis") experiment_results = list( make_experiment_result_from_wc_ctrl( measurements["20220627030703.592775_IR_webconnectivity_80e199b3c572f8d3"], @@ -223,6 +217,7 @@ def test_website_experiment_result_ok(fingerprintdb, netinfodb, measurements): + pytest.skip("TODO(arturo): implement this with the new analysis") experiment_results = list( make_experiment_result_from_wc_ctrl( measurements["20220608132401.787399_AM_webconnectivity_2285fc373f62729e"], diff --git a/tests/test_scoring.py b/tests/test_scoring.py index d11d3927..a55691cb 100644 --- a/tests/test_scoring.py +++ b/tests/test_scoring.py @@ -1,15 +1,17 @@ from unittest.mock import MagicMock + +import pytest from oonidata.analysis.control import ( WebGroundTruthDB, iter_ground_truths_from_web_control, ) from oonidata.analysis.datasources import load_measurement -from oonidata.analysis.websites import make_website_experiment_result from oonidata.models.experiment_result import print_nice_er from oonidata.transforms import measurement_to_observations def test_tcp_scoring(measurements, netinfodb, fingerprintdb): + pytest.skip("TODO(arturo): implement this with the new analysis") msmt = load_measurement( msmt_path=measurements[ "20221101055235.141387_RU_webconnectivity_046ce024dd76b564" From 0a0aeaa90cfb9f886b0a631ef380ddd83fc9d200 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arturo=20Filast=C3=B2?= Date: Mon, 20 Nov 2023 12:24:27 +0100 Subject: [PATCH 3/3] Improvements related to timestamps and timezones * Make utcnow() calls timezone-aware * Implement a workaround for clickhouse-driver bug https://github.com/mymarilyn/clickhouse-driver/issues/388 * Implement more tests for range deletions * Refactor the get_prev_range functions * Fix a problem in experiment result generation --- oonidata/analysis/web_analysis.py | 4 +- .../analysis/website_experiment_results.py | 5 +- oonidata/cli/command.py | 13 +- oonidata/db/connections.py | 4 +- oonidata/models/experiment_result.py | 4 +- .../http_header_field_manipulation.py | 4 +- .../nettests/http_invalid_request_line.py | 4 +- .../nettests/measurement_transformer.py | 11 +- .../transforms/nettests/web_connectivity.py | 4 +-
oonidata/workers/analysis.py | 36 ++-- oonidata/workers/common.py | 167 ++++++++++-------- oonidata/workers/observations.py | 8 +- poetry.lock | 17 +- pyproject.toml | 1 + tests/conftest.py | 21 ++- tests/test_workers.py | 19 +- 16 files changed, 203 insertions(+), 119 deletions(-) diff --git a/oonidata/analysis/web_analysis.py b/oonidata/analysis/web_analysis.py index 041977a0..3921ba65 100644 --- a/oonidata/analysis/web_analysis.py +++ b/oonidata/analysis/web_analysis.py @@ -1,7 +1,7 @@ from collections import defaultdict from dataclasses import dataclass import dataclasses -from datetime import datetime +from datetime import datetime, timezone import ipaddress from typing import ( Generator, @@ -674,7 +674,7 @@ def make_web_analysis( fingerprintdb=fingerprintdb, ) - created_at = datetime.utcnow() + created_at = datetime.now(timezone.utc).replace(tzinfo=None) website_analysis = WebAnalysis( measurement_uid=web_o.measurement_uid, observation_id=web_o.observation_id, diff --git a/oonidata/analysis/website_experiment_results.py b/oonidata/analysis/website_experiment_results.py index 3c3f8431..42047e68 100644 --- a/oonidata/analysis/website_experiment_results.py +++ b/oonidata/analysis/website_experiment_results.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from datetime import datetime +from datetime import datetime, timezone import logging from typing import Dict, Generator, List, NamedTuple, Optional, Tuple @@ -796,7 +796,6 @@ def make_website_experiment_results( Takes as input a list of web_analysis and outputs a list of ExperimentResults for the website. """ - experiment_result_id = "XXXX" observation_id_list = [] first_analysis = web_analysis[0] @@ -974,7 +973,7 @@ def get_agg_outcome(loni_list, category, agg_func) -> Optional[OutcomeStatus]: measurement_uid=measurement_uid, observation_id_list=observation_id_list, timeofday=timeofday, - created_at=datetime.utcnow(), + created_at=datetime.now(timezone.utc).replace(tzinfo=None), location_network_type=first_analysis.network_type, location_network_asn=first_analysis.probe_asn, location_network_cc=first_analysis.probe_cc, diff --git a/oonidata/cli/command.py b/oonidata/cli/command.py index fe3f62a3..60b664d2 100644 --- a/oonidata/cli/command.py +++ b/oonidata/cli/command.py @@ -7,6 +7,7 @@ from typing import List, Optional import click +from click_loglevel import LogLevel from oonidata import __version__ from oonidata.dataclient import ( @@ -70,10 +71,18 @@ def _parse_csv(ctx, param, s: Optional[str]) -> List[str]: @click.group() @click.option("--error-log-file", type=Path) +@click.option( + "-l", + "--log-level", + type=LogLevel(), + default="INFO", + help="Set logging level", + show_default=True, +) @click.version_option(__version__) -def cli(error_log_file: Path): +def cli(error_log_file: Path, log_level: int): log.addHandler(logging.StreamHandler(sys.stderr)) - log.setLevel(logging.INFO) + log.setLevel(log_level) if error_log_file: logging.basicConfig( filename=error_log_file, encoding="utf-8", level=logging.ERROR diff --git a/oonidata/db/connections.py b/oonidata/db/connections.py index 0e4daeaf..43f29eec 100644 --- a/oonidata/db/connections.py +++ b/oonidata/db/connections.py @@ -3,7 +3,7 @@ import time from collections import defaultdict, namedtuple -from datetime import datetime +from datetime import datetime, timezone from pprint import pformat import logging @@ -114,7 +114,7 @@ def __init__(self, output_dir): def write_rows(self, table_name, rows, column_names): if table_name not in self.open_writers: - ts = 
datetime.utcnow().strftime("%Y%m%dT%H%M%S") + ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S") fh = (self.output_dir / f"{table_name}-{ts}.csv").open("w") csv_writer = csv.writer(fh) csv_writer.writerow(column_names) diff --git a/oonidata/models/experiment_result.py b/oonidata/models/experiment_result.py index f97c8a20..939fe7b2 100644 --- a/oonidata/models/experiment_result.py +++ b/oonidata/models/experiment_result.py @@ -3,7 +3,7 @@ import logging from typing import Any, Dict, Generator, List, Optional, NamedTuple, Mapping, Tuple from enum import Enum -from datetime import datetime +from datetime import datetime, timezone from tabulate import tabulate from oonidata.datautils import maybe_elipse @@ -277,7 +277,7 @@ def iter_experiment_results( target_name: str, outcomes: List[Outcome], ) -> Generator[ExperimentResult, None, None]: - created_at = datetime.utcnow() + created_at = datetime.now(timezone.utc).replace(tzinfo=None) for idx, outcome in enumerate(outcomes): yield ExperimentResult( measurement_uid=obs.measurement_uid, diff --git a/oonidata/transforms/nettests/http_header_field_manipulation.py b/oonidata/transforms/nettests/http_header_field_manipulation.py index 5e1774b3..f447f50a 100644 --- a/oonidata/transforms/nettests/http_header_field_manipulation.py +++ b/oonidata/transforms/nettests/http_header_field_manipulation.py @@ -1,5 +1,5 @@ import dataclasses -from datetime import datetime +from datetime import datetime, timezone import orjson from typing import List, Tuple from oonidata.models.nettests import HTTPHeaderFieldManipulation @@ -14,7 +14,7 @@ def make_observations( mb_obs = HTTPMiddleboxObservation( hfm_success=True, observation_id=f"{msmt.measurement_uid}_0", - created_at=datetime.utcnow().replace(microsecond=0), + created_at=datetime.now(timezone.utc).replace(microsecond=0, tzinfo=None), **dataclasses.asdict(self.measurement_meta), ) diff --git a/oonidata/transforms/nettests/http_invalid_request_line.py b/oonidata/transforms/nettests/http_invalid_request_line.py index 597d0863..b789a2e5 100644 --- a/oonidata/transforms/nettests/http_invalid_request_line.py +++ b/oonidata/transforms/nettests/http_invalid_request_line.py @@ -1,5 +1,5 @@ import dataclasses -from datetime import datetime +from datetime import datetime, timezone from typing import List, Tuple from oonidata.models.dataformats import maybe_binary_data_to_bytes from oonidata.models.nettests import HTTPInvalidRequestLine @@ -54,7 +54,7 @@ def make_observations( mb_obs = HTTPMiddleboxObservation( hirl_success=True, observation_id=f"{msmt.measurement_uid}_0", - created_at=datetime.utcnow().replace(microsecond=0), + created_at=datetime.now(timezone.utc).replace(microsecond=0, tzinfo=None), **dataclasses.asdict(self.measurement_meta), ) if not msmt.test_keys.sent: diff --git a/oonidata/transforms/nettests/measurement_transformer.py b/oonidata/transforms/nettests/measurement_transformer.py index 8973225b..a282e9a0 100644 --- a/oonidata/transforms/nettests/measurement_transformer.py +++ b/oonidata/transforms/nettests/measurement_transformer.py @@ -5,7 +5,7 @@ import dataclasses import logging from urllib.parse import urlparse, urlsplit -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from typing import ( Callable, Optional, @@ -134,11 +134,14 @@ ) -def normalize_failure(failure: Failure): +def normalize_failure(failure: Union[Failure, bool]) -> Failure: if not failure: # This will set it to None even when it's false return None + if failure is True: + return 
"true" + if failure.startswith("unknown_failure"): for substring, new_failure in unknown_failure_map: if substring in failure: @@ -445,7 +448,7 @@ def maybe_set_web_fields( ], web_obs: WebObservation, prefix: str, - field_names: Tuple[str], + field_names: Tuple[str, ...], ): # TODO: the fact we have to do this is an artifact of the original # one-observation per type model. Once we decide we don't want to go for @@ -899,7 +902,7 @@ def consume_web_observations( for idx, obs in enumerate(web_obs_list): obs.observation_id = f"{obs.measurement_uid}_{idx}" - obs.created_at = datetime.utcnow().replace(microsecond=0) + obs.created_at = datetime.now(timezone.utc).replace(microsecond=0, tzinfo=None) return web_obs_list diff --git a/oonidata/transforms/nettests/web_connectivity.py b/oonidata/transforms/nettests/web_connectivity.py index 9b9a856e..be8ac276 100644 --- a/oonidata/transforms/nettests/web_connectivity.py +++ b/oonidata/transforms/nettests/web_connectivity.py @@ -1,5 +1,5 @@ from copy import deepcopy -from datetime import datetime +from datetime import datetime, timezone from typing import Dict, List, Tuple from urllib.parse import urlparse from oonidata.datautils import is_ip_bogon @@ -41,7 +41,7 @@ def make_web_control_observations( test_name=msmt.test_name, test_version=msmt.test_version, hostname=hostname, - created_at=datetime.utcnow(), + created_at=datetime.now(timezone.utc).replace(tzinfo=None), ) # Reference for new-style web_connectivity: # https://explorer.ooni.org/measurement/20220924T215758Z_webconnectivity_IR_206065_n1_2CRoWBNJkWc7VyAs?input=https%3A%2F%2Fdoh.dns.apple.com%2Fdns-query%3Fdns%3Dq80BAAABAAAAAAAAA3d3dwdleGFtcGxlA2NvbQAAAQAB diff --git a/oonidata/workers/analysis.py b/oonidata/workers/analysis.py index f6bd9ad3..ffc3a953 100644 --- a/oonidata/workers/analysis.py +++ b/oonidata/workers/analysis.py @@ -74,14 +74,25 @@ def make_analysis_in_a_day( column_names_wa = [f.name for f in dataclasses.fields(WebAnalysis)] column_names_er = [f.name for f in dataclasses.fields(MeasurementExperimentResult)] - prev_range = get_prev_range( - db=db_lookup, - table_name=WebAnalysis.__table_name__, - timestamp=datetime.combine(day, datetime.min.time()), - test_name=[], - probe_cc=probe_cc, - timestamp_column="measurement_start_time", - ) + prev_range_list = [ + get_prev_range( + db=db_lookup, + table_name=WebAnalysis.__table_name__, + timestamp=datetime.combine(day, datetime.min.time()), + test_name=[], + probe_cc=probe_cc, + timestamp_column="measurement_start_time", + ), + get_prev_range( + db=db_lookup, + table_name=MeasurementExperimentResult.__table_name__, + timestamp=datetime.combine(day, datetime.min.time()), + test_name=[], + probe_cc=probe_cc, + timestamp_column="timeofday", + probe_cc_column="location_network_cc", + ), + ] log.info(f"loading ground truth DB for {day}") t = PerfTimer() @@ -118,6 +129,7 @@ def make_analysis_in_a_day( fingerprintdb=fingerprintdb, ) ) + log.info(f"generated {len(website_analysis)} website_analysis") if len(website_analysis) == 0: log.info(f"no website analysis for {probe_cc}, {test_name}") continue @@ -138,6 +150,7 @@ def make_analysis_in_a_day( with statsd_client.timer("oonidata.web_analysis.experiment_results.timing"): website_er = list(make_website_experiment_results(website_analysis)) + log.info(f"generated {len(website_er)} website_er") table_name, rows = make_db_rows( dc_list=website_er, column_names=column_names_er, @@ -154,9 +167,9 @@ def make_analysis_in_a_day( web_obs_ids = ",".join(map(lambda wo: wo.observation_id, web_obs)) 
log.error(f"failed to generate analysis for {web_obs_ids}", exc_info=True) - maybe_delete_prev_range( - db=db_lookup, prev_range=prev_range, table_name=WebAnalysis.__table_name__ - ) + for prev_range in prev_range_list: + maybe_delete_prev_range(db=db_lookup, prev_range=prev_range) + db_writer.close() return idx @@ -285,3 +298,4 @@ def start_analysis( obs_per_sec = round(total_obs_count / t_total.s) log.info(f"finished processing {start_day} - {end_day} speed: {obs_per_sec}obs/s)") log.info(f"{total_obs_count} msmts in {t_total.pretty}") + dask_client.shutdown() diff --git a/oonidata/workers/common.py b/oonidata/workers/common.py index 35187ed6..76d6671d 100644 --- a/oonidata/workers/common.py +++ b/oonidata/workers/common.py @@ -1,3 +1,4 @@ +from dataclasses import dataclass import queue import logging import multiprocessing as mp @@ -18,7 +19,6 @@ from tqdm import tqdm from oonidata.dataclient import ( MeasurementListProgress, - ProgressStatus, ) from oonidata.db.connections import ( ClickhouseConnection, @@ -28,13 +28,74 @@ log = logging.getLogger("oonidata.processing") -class PrevRange(NamedTuple): +@dataclass +class BatchParameters: + test_name: List[str] + probe_cc: List[str] bucket_date: Optional[str] - start_timestamp: Optional[datetime] - end_timestamp: Optional[datetime] - max_created_at: Optional[datetime] - min_created_at: Optional[datetime] - where: str + timestamp: Optional[datetime] + + +@dataclass +class PrevRange: + table_name: str + batch_parameters: BatchParameters + timestamp_column: Optional[str] + probe_cc_column: Optional[str] + max_created_at: Optional[datetime] = None + min_created_at: Optional[datetime] = None + + def format_query(self): + start_timestamp = None + end_timestamp = None + where = None + where = "WHERE " + q_args: Dict[str, Any] = {} + + if self.batch_parameters.bucket_date: + where = "WHERE bucket_date = %(bucket_date)s" + q_args["bucket_date"] = self.batch_parameters.bucket_date + + elif self.batch_parameters.timestamp: + start_timestamp = self.batch_parameters.timestamp + end_timestamp = start_timestamp + timedelta(days=1) + q_args["start_timestamp"] = start_timestamp + q_args["end_timestamp"] = end_timestamp + where += f"{self.timestamp_column} >= %(start_timestamp)s AND {self.timestamp_column} < %(end_timestamp)s" + else: + raise Exception("Must specify either bucket_date or timestamp") + + if len(self.batch_parameters.test_name) > 0: + where += " AND test_name IN %(test_names)s" + q_args["test_names"] = self.batch_parameters.test_name + if len(self.batch_parameters.probe_cc) > 0: + where += f" AND {self.probe_cc_column} IN %(probe_ccs)s" + q_args["probe_ccs"] = self.batch_parameters.probe_cc + + return where, q_args + + +def maybe_delete_prev_range(db: ClickhouseConnection, prev_range: PrevRange): + """ + We perform a lightweight delete of all the rows which have been + regenerated, so we don't have any duplicates in the table + """ + if not prev_range.max_created_at or not prev_range.min_created_at: + return + + # Disabled due to: https://github.com/ClickHouse/ClickHouse/issues/40651 + # db.execute("SET allow_experimental_lightweight_delete = true;") + + where, q_args = prev_range.format_query() + + q_args["max_created_at"] = prev_range.max_created_at + q_args["min_created_at"] = prev_range.min_created_at + where = f"{where} AND created_at <= %(max_created_at)s AND created_at >= %(min_created_at)s" + log.info(f"runing {where} with {q_args}") + + q = f"ALTER TABLE {prev_range.table_name} DELETE " + final_query = q + where + return 
db.execute(final_query, q_args) def get_prev_range( @@ -45,6 +106,7 @@ def get_prev_range( bucket_date: Optional[str] = None, timestamp: Optional[datetime] = None, timestamp_column: str = "timestamp", + probe_cc_column: str = "probe_cc", ) -> PrevRange: """ We lookup the range of previously generated rows so we can drop @@ -68,40 +130,26 @@ def get_prev_range( bucket as reprocessing in progress and guard against running queries for it. """ - q = f"SELECT MAX(created_at), MIN(created_at) FROM {table_name} " + # A batch specified by test_name, probe_cc and one of either bucket_date or + # timestamp depending on it being observations or experiment results. assert ( timestamp or bucket_date ), "either timestamp or bucket_date should be provided" - start_timestamp = None - end_timestamp = None - where = None - where = "WHERE bucket_date = %(bucket_date)s" - q_args: Dict[str, Any] = {"bucket_date": bucket_date} - if timestamp: - start_timestamp = timestamp - end_timestamp = timestamp + timedelta(days=1) - q_args["start_timestamp"] = start_timestamp - q_args["end_timestamp"] = end_timestamp - where += f" AND {timestamp_column} >= %(start_timestamp)s AND {timestamp_column} < %(end_timestamp)s" - - if len(test_name) > 0: - test_name_list = [] - for tn in test_name: - # sanitize the test_names. It should not be a security issue since - # it's not user provided, but better safe than sorry - assert tn.replace("_", "").isalnum(), f"not alphabetic testname {tn}" - test_name_list.append(f"'{tn}'") - where += " AND test_name IN ({})".format(",".join(test_name_list)) - if len(probe_cc) > 0: - probe_cc_list = [] - for cc in probe_cc: - assert cc.replace("_", "").isalnum(), f"not alphabetic probe_cc" - probe_cc_list.append(f"'{cc}'") - where += " AND probe_cc IN ({})".format(",".join(probe_cc_list)) + prev_range = PrevRange( + table_name=table_name, + batch_parameters=BatchParameters( + test_name=test_name, + probe_cc=probe_cc, + timestamp=timestamp, + bucket_date=bucket_date, + ), + timestamp_column=timestamp_column, + probe_cc_column=probe_cc_column, + ) + q = f"SELECT MAX(created_at), MIN(created_at) FROM {prev_range.table_name} " + where, q_args = prev_range.format_query() final_query = q + where - print(final_query) - print(q_args) prev_obs_range = db.execute(final_query, q_args) assert isinstance(prev_obs_range, list) and len(prev_obs_range) == 1 max_created_at, min_created_at = prev_obs_range[0] @@ -109,17 +157,14 @@ def get_prev_range( # We pad it by 1 second to take into account the time resolution downgrade # happening when going from clickhouse to python data types if max_created_at and min_created_at: - max_created_at += timedelta(seconds=1) - min_created_at -= timedelta(seconds=1) - - return PrevRange( - max_created_at=max_created_at, - min_created_at=min_created_at, - start_timestamp=start_timestamp, - end_timestamp=end_timestamp, - where=where, - bucket_date=bucket_date, - ) + prev_range.max_created_at = (max_created_at + timedelta(seconds=1)).replace( + tzinfo=None + ) + prev_range.min_created_at = (min_created_at - timedelta(seconds=1)).replace( + tzinfo=None + ) + + return prev_range def optimize_all_tables(clickhouse): @@ -143,34 +188,6 @@ def get_obs_count_by_cc( return dict(cc_list) -def maybe_delete_prev_range( - db: ClickhouseConnection, table_name: str, prev_range: PrevRange -): - """ - We perform a lightweight delete of all the rows which have been - regenerated, so we don't have any duplicates in the table - """ - if not prev_range.max_created_at: - return - - # Disabled due to: 
https://github.com/ClickHouse/ClickHouse/issues/40651 - # db.execute("SET allow_experimental_lightweight_delete = true;") - q_args = { - "max_created_at": prev_range.max_created_at, - "min_created_at": prev_range.min_created_at, - } - if prev_range.bucket_date: - q_args["bucket_date"] = prev_range.bucket_date - elif prev_range.start_timestamp: - q_args["start_timestamp"] = prev_range.start_timestamp - q_args["end_timestamp"] = prev_range.end_timestamp - else: - raise Exception("either bucket_date or timestamps should be set") - - where = f"{prev_range.where} AND created_at <= %(max_created_at)s AND created_at >= %(min_created_at)s" - return db.execute(f"ALTER TABLE {table_name} DELETE " + where, q_args) - - def make_db_rows( dc_list: List, column_names: List[str], diff --git a/oonidata/workers/observations.py b/oonidata/workers/observations.py index c518bb2c..23bdf925 100644 --- a/oonidata/workers/observations.py +++ b/oonidata/workers/observations.py @@ -1,7 +1,7 @@ import pathlib import logging import dataclasses -from datetime import date, datetime, timedelta +from datetime import date, datetime, timedelta, timezone from typing import ( List, @@ -195,7 +195,7 @@ def make_observation_in_day( if len(prev_ranges) > 0: with ClickhouseConnection(clickhouse, row_buffer_size=10_000) as db: for table_name, pr in prev_ranges: - maybe_delete_prev_range(db=db, prev_range=pr, table_name=table_name) + maybe_delete_prev_range(db=db, prev_range=pr) return total_size, total_msmt_count @@ -249,7 +249,7 @@ def start_observation_maker( [ [ "oonidata.bucket_processed", - datetime.utcnow(), + datetime.now(timezone.utc).replace(tzinfo=None), int(t.ms), size, msmt_count, @@ -266,3 +266,5 @@ def start_observation_maker( log.info( f"{round(total_size/10**9, 2)}GB {total_msmt_count} msmts in {t_total.pretty}" ) + + dask_client.shutdown() diff --git a/poetry.lock b/poetry.lock index 417a416d..5b796d03 100644 --- a/poetry.lock +++ b/poetry.lock @@ -511,6 +511,21 @@ files = [ [package.dependencies] colorama = {version = "*", markers = "platform_system == \"Windows\""} +[[package]] +name = "click-loglevel" +version = "0.5.0" +description = "Log level parameter type for Click" +category = "main" +optional = false +python-versions = ">=3.7" +files = [ + {file = "click-loglevel-0.5.0.tar.gz", hash = "sha256:fcc98a136a96479b4768494df25017114ba1cb525dac0bed619209b3578fd4f3"}, + {file = "click_loglevel-0.5.0-py3-none-any.whl", hash = "sha256:897070fd4bf5b503edb5a1ecc0551448647901f4fac83a01c9f00aa28ad86d60"}, +] + +[package.dependencies] +click = ">=8.0" + [[package]] name = "clickhouse-driver" version = "0.2.5" @@ -3699,4 +3714,4 @@ research = ["jupyterlab"] [metadata] lock-version = "2.0" python-versions = ">=3.8,<4" -content-hash = "3d04d49326808a5955bb9acf7559627feb6775311fd16f242006a7d604692442" +content-hash = "dd5586d165d8e6d333c44d10223f88126f09700de8509a9674adc28adbc584ee" diff --git a/pyproject.toml b/pyproject.toml index 2fab322b..6e6b5f0d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,7 @@ numpy = {version = "^1.23.5", optional = true, python = ">=3.8"} pandas = {version = "^2.0.0", optional = true, python = ">=3.8"} flask = {version = "^2.2.2", optional = true} jupyterlab = {version = "^4.0.7", optional = true} +click-loglevel = "^0.5.0" [tool.poetry.dev-dependencies] pytest = ">=7.2" diff --git a/tests/conftest.py b/tests/conftest.py index 4f132dba..0e6a91bc 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -76,10 +76,10 @@ def cli_runner(): return CliRunner() -@pytest.fixture -def 
db(): - from oonidata.db.create_tables import create_queries +from oonidata.db.create_tables import create_queries + +def create_db_for_fixture(): try: with ClickhouseConnection(conn_url="clickhouse://localhost/") as db: db.execute("CREATE DATABASE IF NOT EXISTS testing_oonidata") @@ -91,8 +91,19 @@ def db(): db.execute("SELECT 1") except: pytest.skip("no database connection") - for query, table_name in create_queries: - db.execute(f"DROP TABLE IF EXISTS {table_name};") + for query, _ in create_queries: db.execute(query) + return db + +@pytest.fixture +def db_notruncate(): + return create_db_for_fixture() + + +@pytest.fixture +def db(): + db = create_db_for_fixture() + for _, table_name in create_queries: + db.execute(f"TRUNCATE TABLE {table_name};") return db diff --git a/tests/test_workers.py b/tests/test_workers.py index 4b97d58d..08c9cdb2 100644 --- a/tests/test_workers.py +++ b/tests/test_workers.py @@ -13,7 +13,11 @@ from oonidata.models.nettests.http_invalid_request_line import HTTPInvalidRequestLine from oonidata.models.observations import HTTPMiddleboxObservation from oonidata.workers.analysis import make_analysis_in_a_day, make_cc_batches, make_ctrl -from oonidata.workers.common import get_obs_count_by_cc, get_prev_range +from oonidata.workers.common import ( + get_obs_count_by_cc, + get_prev_range, + maybe_delete_prev_range, +) from oonidata.workers.observations import ( make_observations_for_file_entry_batch, write_observations_to_db, @@ -22,6 +26,7 @@ from oonidata.workers.fingerprint_hunter import fingerprint_hunter from oonidata.transforms import measurement_to_observations from oonidata.transforms.nettests.measurement_transformer import MeasurementTransformer +from tests.test_cli import wait_for_mutations def test_get_prev_range(db): @@ -40,7 +45,7 @@ def test_get_prev_range(db): bucket_date = "2000-01-01" test_name = "web_connectivity" probe_cc = "IT" - min_time = datetime(2000, 1, 1, 23, 42, 00, tzinfo=timezone.utc) + min_time = datetime(2000, 1, 1, 23, 42, 00) rows = [(min_time, bucket_date, test_name, probe_cc)] for i in range(200): rows.append((min_time + timedelta(seconds=i), bucket_date, test_name, probe_cc)) @@ -63,7 +68,7 @@ def test_get_prev_range(db): bucket_date = "2000-03-01" test_name = "web_connectivity" probe_cc = "IT" - min_time = datetime(2000, 1, 1, 23, 42, 00, tzinfo=timezone.utc) + min_time = datetime(2000, 1, 1, 23, 42, 00) rows: List[Tuple[datetime, str, str, str]] = [] for i in range(10): rows.append( @@ -87,6 +92,14 @@ def test_get_prev_range(db): assert prev_range.min_created_at and prev_range.max_created_at assert prev_range.min_created_at == (min_time - timedelta(seconds=1)) assert prev_range.max_created_at == (rows[-1][0] + timedelta(seconds=1)) + + maybe_delete_prev_range( + db=db, + prev_range=prev_range, + ) + wait_for_mutations(db, "test_range") + res = db.execute("SELECT COUNT() FROM test_range") + assert res[0][0] == 10 db.execute("DROP TABLE test_range")
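
A note on the utcnow() changes above: they all follow the same idiom, namely building the timestamp from datetime.now(timezone.utc) and stripping tzinfo before the value reaches clickhouse-driver, which mishandles timezone-aware datetimes (https://github.com/mymarilyn/clickhouse-driver/issues/388). The helper below is only a sketch of that idiom; the patch itself inlines the expression at each call site rather than introducing a naive_utcnow() function.

    from datetime import datetime, timezone

    def naive_utcnow(drop_microseconds: bool = False) -> datetime:
        """Return the current UTC time as a naive datetime.

        datetime.now(timezone.utc) is used instead of the legacy
        datetime.utcnow(), and tzinfo is dropped so clickhouse-driver does
        not re-interpret the value (mymarilyn/clickhouse-driver#388). Some
        call sites also truncate microseconds, mirroring the observation
        transformers above.
        """
        now = datetime.now(timezone.utc).replace(tzinfo=None)
        if drop_microseconds:
            now = now.replace(microsecond=0)
        return now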
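
The reworked PrevRange/BatchParameters machinery in oonidata/workers/common.py is meant to bracket a re-processing run: record the created_at range of the rows a previous run produced for a batch, write the regenerated rows, then delete the old range so the batch contains no duplicates. The sketch below mirrors test_get_prev_range; the table name, schema and engine are illustrative only, and it assumes ClickhouseConnection.execute forwards INSERT parameters to clickhouse-driver as a list of tuples.

    from datetime import datetime, timezone

    from oonidata.db.connections import ClickhouseConnection
    from oonidata.workers.common import get_prev_range, maybe_delete_prev_range

    db = ClickhouseConnection("clickhouse://localhost/")
    # Toy table with the columns the helpers rely on (created_at plus the batch keys).
    db.execute(
        """CREATE TABLE IF NOT EXISTS test_range (
            created_at DateTime,
            bucket_date String,
            test_name String,
            probe_cc String
        ) ENGINE = MergeTree() ORDER BY created_at"""
    )

    # 1. Capture the created_at range of rows written by a previous run for this
    #    batch. On a first run the range is empty and the later delete is a no-op.
    prev_range = get_prev_range(
        db=db,
        table_name="test_range",
        bucket_date="2000-01-01",
        test_name=["web_connectivity"],
        probe_cc=["IT"],
    )

    # 2. Regenerate and write the fresh rows for the same batch. Their created_at
    #    is "now", so it falls outside the previously captured range.
    db.execute(
        "INSERT INTO test_range VALUES",
        [
            (
                datetime.now(timezone.utc).replace(tzinfo=None),
                "2000-01-01",
                "web_connectivity",
                "IT",
            )
        ],
    )

    # 3. Drop the previously generated rows via an ALTER TABLE ... DELETE mutation;
    #    as in the test above, callers may need to wait for the mutation to finish.
    maybe_delete_prev_range(db=db, prev_range=prev_range)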