diff --git a/oonidata/analysis/web_analysis.py b/oonidata/analysis/web_analysis.py index c7a8b253..3921ba65 100644 --- a/oonidata/analysis/web_analysis.py +++ b/oonidata/analysis/web_analysis.py @@ -1,7 +1,7 @@ from collections import defaultdict from dataclasses import dataclass import dataclasses -from datetime import datetime +from datetime import datetime, timezone import ipaddress from typing import ( Generator, @@ -83,6 +83,7 @@ def encode_address(ip: str, port: Optional[int]) -> str: class TCPAnalysis: address: str success: bool + failure: Optional[str] ground_truth_failure_count: Optional[int] ground_truth_failure_asn_cc_count: Optional[int] @@ -105,6 +106,7 @@ def make_tcp_analysis( return TCPAnalysis( address=blocking_subject, success=True, + failure=None, ground_truth_failure_asn_cc_count=None, ground_truth_failure_count=None, ground_truth_ok_asn_cc_count=None, @@ -154,6 +156,7 @@ def make_tcp_analysis( return TCPAnalysis( address=blocking_subject, success=False, + failure=web_o.tcp_failure, ground_truth_failure_asn_cc_count=tcp_ground_truth_failure_asn_cc_count, ground_truth_failure_count=tcp_ground_truth_failure_count, ground_truth_ok_asn_cc_count=tcp_ground_truth_ok_asn_cc_count, @@ -261,6 +264,7 @@ class DNSConsistencyResults: is_answer_bogon: bool = False answer_fp_name: str = "" + answer_fp_scope: str = "" is_answer_fp_match: bool = False is_answer_fp_country_consistent: bool = False is_answer_fp_false_positive: bool = False @@ -318,6 +322,7 @@ def check_dns_consistency( # XXX in the event of multiple matches, we are overriding it with # the last value. It's probably OK for now. consistency_results.answer_fp_name = fp.name + consistency_results.answer_fp_scope = fp.scope or "" if not web_o.dns_engine or web_o.dns_engine in SYSTEM_RESOLVERS: # TODO: do the same thing for the non-system resolver @@ -460,18 +465,17 @@ def make_tls_analysis( handshake_time=web_o.tls_handshake_time, ) ground_truths = filter( - lambda gt: gt.http_request_url and gt.hostname == web_o.hostname, - web_ground_truths, + lambda gt: gt.ip == web_o.ip and gt.port == web_o.port, web_ground_truths ) failure_cc_asn = set() ok_cc_asn = set() for gt in ground_truths: # We don't check for strict == True, since depending on the DB engine # True could also be represented as 1 - if gt.http_success is None: + if gt.tls_success is None: continue - if gt.http_success: + if gt.tls_success: if gt.is_trusted_vp: tls_analysis.ground_truth_trusted_ok_count += gt.count else: @@ -510,6 +514,7 @@ class HTTPAnalysis: ground_truth_body_length: int = 0 fp_name: str = "" + fp_scope: str = "" is_http_fp_match: bool = False is_http_fp_country_consistent: bool = False is_http_fp_false_positive: bool = False @@ -589,6 +594,7 @@ def make_http_analysis( http_analysis.is_http_fp_country_consistent = True if fp.name: http_analysis.fp_name = fp.name + http_analysis.fp_scope = fp.scope if web_o.http_response_body_length: http_analysis.response_body_length = web_o.http_response_body_length @@ -638,7 +644,7 @@ def make_web_analysis( if web_o.ip: try: ipaddr = ipaddress.ip_address(web_o.ip) - # FIXME: for the moment we just ignore all IPv6 results, because they are too noisy + # TODO(arturo): for the moment we just ignore all IPv6 results, because they are too noisy if isinstance(ipaddr, ipaddress.IPv6Address): continue address = encode_address(web_o.ip, web_o.port) @@ -668,7 +674,7 @@ def make_web_analysis( fingerprintdb=fingerprintdb, ) - created_at = datetime.utcnow() + created_at = datetime.now(timezone.utc).replace(tzinfo=None) 
website_analysis = WebAnalysis( measurement_uid=web_o.measurement_uid, observation_id=web_o.observation_id, @@ -744,6 +750,9 @@ def make_web_analysis( website_analysis.dns_consistency_system_answer_fp_name = ( dns_analysis.consistency_system.answer_fp_name ) + website_analysis.dns_consistency_system_answer_fp_scope = ( + dns_analysis.consistency_system.answer_fp_scope + ) website_analysis.dns_consistency_system_is_answer_fp_match = ( dns_analysis.consistency_system.is_answer_fp_match ) @@ -835,6 +844,9 @@ def make_web_analysis( website_analysis.dns_consistency_other_answer_fp_name = ( dns_analysis.consistency_other.answer_fp_name ) + website_analysis.dns_consistency_other_answer_fp_scope = ( + dns_analysis.consistency_other.answer_fp_scope + ) website_analysis.dns_consistency_other_is_answer_fp_match = ( dns_analysis.consistency_other.is_answer_fp_match ) @@ -899,6 +911,7 @@ def make_web_analysis( if tcp_analysis: website_analysis.tcp_address = tcp_analysis.address website_analysis.tcp_success = tcp_analysis.success + website_analysis.tcp_failure = tcp_analysis.failure website_analysis.tcp_ground_truth_failure_count = ( tcp_analysis.ground_truth_failure_count ) @@ -954,6 +967,7 @@ def make_web_analysis( http_analysis.ground_truth_body_length ) website_analysis.http_fp_name = http_analysis.fp_name + website_analysis.http_fp_scope = http_analysis.fp_scope website_analysis.http_is_http_fp_match = http_analysis.is_http_fp_match website_analysis.http_is_http_fp_country_consistent = ( http_analysis.is_http_fp_country_consistent diff --git a/oonidata/analysis/website_experiment_results.py b/oonidata/analysis/website_experiment_results.py new file mode 100644 index 00000000..42047e68 --- /dev/null +++ b/oonidata/analysis/website_experiment_results.py @@ -0,0 +1,1008 @@ +from dataclasses import dataclass +from datetime import datetime, timezone +import logging +from typing import Dict, Generator, List, NamedTuple, Optional, Tuple + +from oonidata.models.analysis import WebAnalysis +from oonidata.models.experiment_result import MeasurementExperimentResult + +log = logging.getLogger("oonidata.analysis") + + +def map_analysis_to_target_name(analysis): + # Poormans mapping to target name + # TODO(arturo) we eventually want to look these up in some sort of database that groups together related domains + return analysis.target_domain_name.lstrip("www.") + + +NXDOMAIN_FAILURES = ["android_dns_cache_no_data", "dns_nxdomain_error"] + + +@dataclass +class OutcomeStatus: + key: str + value: float + scope: Optional[str] = None + + +@dataclass +class OutcomeSpace: + dns: Optional[OutcomeStatus] = None + tcp: Optional[OutcomeStatus] = None + tls: Optional[OutcomeStatus] = None + http: Optional[OutcomeStatus] = None + https: Optional[OutcomeStatus] = None + + def to_dict(self) -> Dict[str, float]: + d = {} + if self.dns: + d[self.dns.key] = self.dns.value + if self.tcp: + d[self.tcp.key] = self.tcp.value + if self.tls: + d[self.tls.key] = self.tls.value + if self.http: + d[self.http.key] = self.http.value + if self.https: + d[self.https.key] = self.https.value + return d + + def sum(self) -> float: + s = 0 + for _, val in self.to_dict().items(): + s += val + return s + + +@dataclass +class LoNI: + ok_final: float + + ok: OutcomeSpace + down: OutcomeSpace + blocked: OutcomeSpace + blocking_scope: Optional[str] + + def to_dict(self): + return { + "ok": self.ok.to_dict(), + "down": self.down.to_dict(), + "blocked": self.blocked.to_dict(), + "blocking_scope": self.blocking_scope, + "ok_final": self.ok_final, + } + + 
+def calculate_web_loni( + web_analysis: WebAnalysis, +) -> Tuple[LoNI, List[str]]: + ok_value = 0 + ok = OutcomeSpace() + down = OutcomeSpace() + blocked = OutcomeSpace() + + # TODO(arturo): make this not nullable + blocking_scope = None + analysis_transcript = [] + + # We start off not knowing anything. + # So basically without any additional information we may as well be rolling + # a 3 sided dice. + # Yes, you can make a 3 sided dice: https://upload.wikimedia.org/wikipedia/commons/3/3b/04ds3.JPG + blocked_key, down_key = None, None + ok_value, down_value, blocked_value = 0.33, 0.33, 0.33 + # We are in the case of a DNS query failing, i.e. we got no answer. + if web_analysis.dns_consistency_system_failure is not None: + """ + Relevant keys for this section of the analysis: + + web_analysis.dns_ground_truth_failure_cc_asn_count + web_analysis.dns_ground_truth_failure_count + web_analysis.dns_ground_truth_nxdomain_count + web_analysis.dns_ground_truth_nxdomain_cc_asn_count + web_analysis.dns_ground_truth_ok_count + web_analysis.dns_ground_truth_ok_cc_asn_count + """ + # For sure, no matter what, the target is having some issue. + ok_value = 0.0 + # We now need to figure out if the failure is because of the target + # being down or if it's blocked. + blocked_key, down_key = "dns", "dns" + blocked_value, down_value = 0.5, 0.5 + dns_ground_truth_failure_count = ( + web_analysis.dns_ground_truth_failure_count or 0 + ) + dns_ground_truth_ok_count = web_analysis.dns_ground_truth_ok_count or 0 + dns_ground_truth_failure_cc_asn_count = ( + web_analysis.dns_ground_truth_failure_cc_asn_count or 0 + ) + dns_ground_truth_ok_cc_asn_count = ( + web_analysis.dns_ground_truth_ok_cc_asn_count or 0 + ) + + # Without any additional information, it could be 50/50 + if web_analysis.dns_consistency_system_failure in NXDOMAIN_FAILURES: + # NXDOMAIN errors are more commonly associated with censorship, let's bump up the blocked_value + blocked_key, down_key = "dns.nxdomain", "dns.nxdomain" + blocked_value = 0.6 + down_value = 1 - blocked_value + analysis_transcript.append( + "web_analysis.dns_consistency_system_failure in NXDOMAIN_FAILURES" + ) + if dns_ground_truth_failure_count > dns_ground_truth_ok_count: + # It's failing more than it's succeeding. This smells like an unreliable site. + blocked_value = 0.3 + down_value = 1 - blocked_value + analysis_transcript.append( + "dns_ground_truth_failure_count > dns_ground_truth_ok_count" + ) + if ( + dns_ground_truth_failure_cc_asn_count + > dns_ground_truth_ok_cc_asn_count + ): + # Even more if that is happening globally + blocked_value = 0.2 + down_value = 1 - blocked_value + analysis_transcript.append( + "dns_ground_truth_failure_cc_asn_count > dns_ground_truth_ok_cc_asn_count" + ) + elif ( + dns_ground_truth_ok_count > 0 + and web_analysis.dns_ground_truth_nxdomain_count == 0 + and web_analysis.dns_ground_truth_nxdomain_cc_asn_count == 0 + ): + # If we never saw a single NXDOMAIN in our ground truth, then + # it's really fishy. Let's bump up the blocking reason. 
+ # TODO(arturo): when we introduce web_obs based ground truthing, + # we should use a threshold here based on the number of metrics + analysis_transcript.append( + "dns_ground_truth_ok_count > 0 and web_analysis.dns_ground_truth_nxdomain_count == 0 and web_analysis.dns_ground_truth_nxdomain_cc_asn_count == 0" + ) + blocked_value = 0.75 + down_value = 1 - blocked_value + else: + analysis_transcript.append( + "web_analysis.dns_consistency_system_failure not in NXDOMAIN_FAILURES" + ) + if dns_ground_truth_failure_count > dns_ground_truth_ok_count: + analysis_transcript.append( + "dns_ground_truth_failure_count > dns_ground_truth_ok_count" + ) + # it's failing more than it's succeeding, more likely to be blocked. + blocked_key, down_key = "dns.failure", "dns.failure" + blocked_value = 0.6 + down_value = 1 - blocked_value + + elif len(web_analysis.dns_consistency_system_answers) > 0: + analysis_transcript.append( + "len(web_analysis.dns_consistency_system_answers) > 0" + ) + # Ok we got some answers. Now we need to figure out if what we got is a + # good answer. + blocked_key = "dns" + down_key = "dns" + blocked_value = 0.5 + ok_value = 0.5 + # No matter what happens, it's not gonna be flagged as down + down_value = 0 + if web_analysis.dns_consistency_system_is_answer_tls_consistent == True: + # "easy" case: we got a TLS consistent answer we can flag it as good + # and move on with our business. + # + # XXX(arturo): there is an important caveat here. We have seen + # cases where you get a surely wrong answer via DNS (eg. a bogon), + # but for some reason you are then still establishing a good TLS + # handshake. Technically it's probably OK to mark these as unblocked + # since eventually you do get the content, but it's worth pointing + # it out. + # Here is a sample measurement for this case: https://explorer.ooni.org/m/20230101013014.694846_AE_webconnectivity_f9f1078ce75936a1 + analysis_transcript.append( + "web_analysis.dns_consistency_system_is_answer_tls_consistent == True" + ) + blocked_value = 0 + down_value = 0 + ok_value = 1 + elif ( + web_analysis.dns_consistency_system_is_answer_fp_match == True + and web_analysis.dns_consistency_system_is_answer_fp_false_positive == False + ): + # TODO(arturo): will we eventually have false positives in the DNS? If so how do we handle them? + # We matched a signature known to be used to implemented censorship. We can mark this as confirmed blocked. + analysis_transcript.append( + "web_analysis.dns_consistency_system_is_answer_fp_match == True and web_analysis.dns_consistency_system_is_answer_fp_false_positive == False" + ) + blocked_key = "dns.confirmed" + blocking_scope = web_analysis.dns_consistency_system_answer_fp_scope + blocked_value = 0.9 + if ( + web_analysis.dns_consistency_system_is_answer_fp_country_consistent + == True + ): + blocked_key = "dns.confirmed.country_consistent" + blocked_value = 1.0 + elif ( + web_analysis.dns_consistency_system_is_answer_fp_country_consistent + == False + ): + # We let the blocked value be slightly less for cases where the fingerprint is not country consistent + blocked_key = "dns.confirmed.not_country_consistent" + blocked_value = 0.8 + ok_value = 0 + down_value = 0 + elif web_analysis.dns_consistency_system_is_answer_bogon == True: + # Bogons are always fishy, yet we don't know if we see it because + # the site is misconfigured. + # In any case a bogon is not a routable IP, so the target is either + # down or blocked. 
+ analysis_transcript.append( + "web_analysis.dns_consistency_system_is_answer_bogon == True" + ) + blocked_key, down_key = "dns.bogon", "dns.bogon" + blocked_value = 0.5 + down_value = 0.5 + ok_value = 0 + if ( + web_analysis.dns_consistency_system_is_answer_ip_in_trusted_answers + == False + ): + # If we didn't see the bogon in the trusted answers, then it's probably censorship + analysis_transcript.append( + "web_analysis.dns_consistency_system_is_answer_ip_in_trusted_answers == False" + ) + blocked_value = 0.8 + down_value = 1 - blocked_value + ok_value = 0 + elif ( + web_analysis.dns_consistency_system_is_answer_ip_in_trusted_answers + == True + ): + analysis_transcript.append( + "web_analysis.dns_consistency_system_is_answer_ip_in_trusted_answers == True" + ) + # If we did see it in the trusted answers, then we should actually ignore this case and mark the target as down. + blocked_value = 0.2 + down_value = 1 - blocked_value + ok_value = 0 + elif ( + web_analysis.dns_consistency_system_is_answer_ip_in_trusted_answers == True + ): + # Direct hit of the IP in the trusted answers. We nothing to see here. + analysis_transcript.append( + "web_analysis.dns_consistency_system_is_answer_ip_in_trusted_answers == True" + ) + blocked_value = 0.1 + ok_value = 1 - blocked_value + down_value = 0 + elif ( + web_analysis.dns_consistency_system_is_answer_asn_in_trusted_answers == True + or web_analysis.dns_consistency_system_is_answer_asorg_in_trusted_answers + == True + ): + # The ASN or AS org name of the observation matches that of our control. Most likely this is not a case of blocking. + analysis_transcript.append( + "web_analysis.dns_consistency_system_is_answer_asn_in_trusted_answers == True" + ) + blocked_value = 0.2 + ok_value = 1 - blocked_value + down_value = 0 + if web_analysis.dns_consistency_other_is_answer_cloud_provider == True: + # We are even more confident about it not being blocked if it's a cloud provider + analysis_transcript.append( + "web_analysis.dns_consistency_other_is_answer_cloud_provider == True" + ) + blocked_value = 0.1 + ok_value = 1 - blocked_value + down_value = 0 + else: + # We are done with all the simpler cases. We can now move into the + # more sketchy dubious analysis strategies. + # We assume that if we weren't able to determine consistency through + # the several previous methods, we will air on the side of saying + # it's blocked, but marginally. + analysis_transcript.append("not a_simple_case") + blocked_value = 0.6 + ok_value = 1 - blocked_value + down_value = 0 + # TODO(arturo): if we ever add false positive fingerprints to DNS we + # should add case for them here. + if web_analysis.dns_consistency_system_is_answer_probe_cc_match == True: + analysis_transcript.append( + "web_analysis.dns_consistency_system_is_answer_probe_cc_match == True" + ) + # It's common for blockpages to be hosted in the country of where the blocking is happening, let's bump up the blocking score. + blocked_key = "dns.inconsistent" + blocked_value = 0.65 + ok_value = 1 - blocked_value + if ( + web_analysis.dns_consistency_system_is_answer_cloud_provider + == False + ): + # If it's not a cloud provider, even more a reason to believe that's the case. 
+ # TODO(arturo): add a new metric which tells us if the + # target domain is being hosted on a cloud provider and use + # that instead since this metric here will actually never be set to false + analysis_transcript.append( + "web_analysis.dns_consistency_system_is_answer_cloud_provider == False" + ) + blocked_value = 0.75 + ok_value = 1 - blocked_value + elif ( + web_analysis.dns_consistency_system_is_answer_cloud_provider == True + ): + # If it's a cloud provider, this is probably a false positive. + analysis_transcript.append( + "web_analysis.dns_consistency_system_is_answer_cloud_provider == True" + ) + blocked_key = "dns.cloud" + blocked_value = 0.3 + ok_value = 1 - blocked_value + elif ( + web_analysis.dns_consistency_system_is_answer_probe_asn_match + == True + ): + # It's not a cloud provider, but it's in the same network. Somethings up. + analysis_transcript.append( + "web_analysis.dns_consistency_system_is_answer_probe_asn_match == True" + ) + blocked_value = 0.7 + ok_value = 1 - blocked_value + + if blocked_key and down_key: + # We have finished the DNS analysis. We can store the + # final analysis to blocked and down dictionaries. + blocked.dns = OutcomeStatus( + key=blocked_key, value=blocked_value, scope=blocking_scope + ) + down.dns = OutcomeStatus(key=down_key, value=down_value) + ok.dns = OutcomeStatus(key="dns", value=ok_value) + assert ( + round(blocked.sum() + down.sum() + ok_value) == 1 + ), f"{blocked} + {down} + {ok_value} != 1" + if ok_value < 0.5: + # If the DNS analysis is leading us to believe the target is more down + # or blocked, than OK, we better off just call it day and return early. + # If we don't know if the answer we got was DNS consistent, we can't + # really trust the TCP and TLS analysis results. + # TODO(arturo): do we want to have different thresholds here? + analysis_transcript.append(f"ok_value < 0.5 # OK is {ok_value} after DNS") + return ( + LoNI( + ok_final=ok_value, + ok=ok, + blocked=blocked, + down=down, + blocking_scope=blocking_scope, + ), + analysis_transcript, + ) + + # TODO(arturo): convert paper notes into proof to go in here + if web_analysis.tcp_success == True: + # We succeeded via TCP, no matter what there are no TCP level issues + blocked_key, down_key = "tcp", "tcp" + down_value, blocked_value = 0.0, 0.0 + blocked.tcp = OutcomeStatus(key=blocked_key, value=blocked_value) + down.tcp = OutcomeStatus(key=down_key, value=down_value) + + elif web_analysis.tcp_success == False: + analysis_transcript.append("web_analysis.tcp_success == False") + # No matter what the target is + blocked_key, down_key = "tcp.failure", "tcp.failure" + + down_value, blocked_value = 0.5, 0.5 + tcp_ground_truth_failure_count = ( + web_analysis.tcp_ground_truth_trusted_failure_count or 0 + ) + # TODO(arturo): Here we are only using the trusted ground truths (i.e. the control measurements) + # eventually we want to switch to using other OONI measurements too. + tcp_ground_truth_ok_count = web_analysis.tcp_ground_truth_trusted_ok_count or 0 + tcp_ground_truth_failure_asn_cc_count = ( + web_analysis.tcp_ground_truth_failure_asn_cc_count or 0 + ) + tcp_ground_truth_ok_asn_cc_count = ( + web_analysis.tcp_ground_truth_ok_asn_cc_count or 0 + ) + if tcp_ground_truth_failure_count > tcp_ground_truth_ok_count: + analysis_transcript.append( + "tcp_ground_truth_failure_count > tcp_ground_truth_ok_count" + ) + # It's failing more than it's succeeding. 
Probably the site is unreliable + blocked_value = 0.3 + down_value = 1 - blocked_value + if tcp_ground_truth_failure_asn_cc_count > tcp_ground_truth_ok_asn_cc_count: + analysis_transcript.append( + "tcp_ground_truth_failure_asn_cc_count > tcp_ground_truth_ok_asn_cc_count" + ) + + # Even more if it's happening globally + blocked_value = 0.2 + down_value = 1 - blocked_value + elif tcp_ground_truth_ok_count > tcp_ground_truth_failure_count: + analysis_transcript.append( + "tcp_ground_truth_ok_count > tcp_ground_truth_failure_count" + ) + # OTOH, if it's mostly working, then this is a sign of blocking + blocked_value = 0.7 + down_value = 1 - blocked_value + if web_analysis.tcp_failure == "connection_reset": + analysis_transcript.append( + 'web_analysis.tcp_failure == "connection_reset"' + ) + # Connection reset is very fishy. Let's bump up the blocking value. + blocked_value = 0.8 + down_value = 1 - blocked_value + elif web_analysis.tcp_failure == "connection_reset": + analysis_transcript.append('web_analysis.tcp_failure == "connection_reset"') + # Connection reset is very fishy. Let's bump up the blocking value. + blocked_value = 0.7 + down_value = 1 - blocked_value + + # Let's set some nice blocking keys + if web_analysis.tcp_failure in ["generic_timeout_error", "timed_out"]: + blocked_key, down_key = "tcp.timeout", "tcp.timeout" + elif web_analysis.tcp_failure == "connection_reset": + blocked_key, down_key = "tcp.connection_reset", "tcp.connection_reset" + else: + blocked_key = f"{blocked_key}.{web_analysis.tcp_failure}" + down_key = f"{down_key}.{web_analysis.tcp_failure}" + + blocked.tcp = OutcomeStatus(key=blocked_key, value=blocked_value * ok_value) + down.tcp = OutcomeStatus(key=down_key, value=down_value * ok_value) + # TODO(arturo): double check this is correct + ok.tcp = OutcomeStatus(key="tcp", value=1 - (blocked.sum() + down.sum())) + + if blocked_key and down_key: + old_ok_value = ok_value + ok_value = 1 - (blocked.sum() + down.sum()) + assert ( + round(blocked.sum() + down.sum() + ok_value) == 1 + ), f"{blocked} + {down} + {ok_value} != 1" + + if ok_value < 0.5: + # If the TCP analysis is leading us to believe the target is more down + # or blocked, than OK, we better off just call it day and return early. + # TODO(arturo): How should we map multiple failure types? This is OK for + # web 0.4, but doesn't apply to wc 0.5 + analysis_transcript.append( + f"ok_value < 0.5 # OK went after TCP from {old_ok_value} -> {ok_value}" + ) + return ( + LoNI( + ok_final=ok_value, + ok=ok, + blocked=blocked, + down=down, + blocking_scope=blocking_scope, + ), + analysis_transcript, + ) + + if web_analysis.tls_success == True: + blocked_key, down_key = "tls", "tls" + down_value, blocked_value = 0.0, 0.0 + blocked.tls = OutcomeStatus(key=blocked_key, value=blocked_value) + down.tls = OutcomeStatus(key=down_key, value=down_value) + + elif web_analysis.tls_success == False: + analysis_transcript.append("web_analysis.tls_success == False") + # No matter what we are in a tls failure case + blocked_key, down_key = "tls.failure", "tls.failure" + + down_value, blocked_value = 0.5, 0.5 + + # TODO(arturo): Here we are only using the trusted ground truths (i.e. + # the control measurements) eventually we want to switch to using other + # OONI measurements too. 
+ tls_ground_truth_failure_count = ( + web_analysis.tls_ground_truth_trusted_failure_count or 0 + ) + tls_ground_truth_ok_count = web_analysis.tls_ground_truth_trusted_ok_count or 0 + tls_ground_truth_failure_asn_cc_count = ( + web_analysis.tls_ground_truth_failure_asn_cc_count or 0 + ) + tls_ground_truth_ok_asn_cc_count = ( + web_analysis.tls_ground_truth_ok_asn_cc_count or 0 + ) + if tls_ground_truth_failure_count > tls_ground_truth_ok_count: + analysis_transcript.append( + "tls_ground_truth_failure_count > tls_ground_truth_ok_count" + ) + # It's failing more than it's succeeding. Probably the site is unreliable + blocked_value = 0.3 + down_value = 1 - blocked_value + if tls_ground_truth_failure_asn_cc_count > tls_ground_truth_ok_asn_cc_count: + analysis_transcript.append( + "tls_ground_truth_failure_asn_cc_count > tls_ground_truth_ok_asn_cc_count" + ) + # Even more if it's happening globally + blocked_value = 0.2 + down_value = 1 - blocked_value + elif tls_ground_truth_ok_count > tls_ground_truth_failure_count: + analysis_transcript.append( + "tls_ground_truth_ok_count > tls_ground_truth_failure_count" + ) + # OTOH, if it's mostly working, then this is a sign of blocking + blocked_value = 0.7 + down_value = 1 - blocked_value + if web_analysis.tls_is_tls_certificate_invalid == True: + analysis_transcript.append( + "web_analysis.tls_is_tls_certificate_invalid == True" + ) + # bad certificate is very fishy. Let's bump up the blocking value. + blocked_value = 0.9 + down_value = 1 - blocked_value + elif web_analysis.tls_failure == "connection_reset": + # bad certificate is very fishy. Let's bump up the blocking value. + analysis_transcript.append( + "web_analysis.tls_failure == 'connection_reset'" + ) + blocked_value = 0.8 + down_value = 1 - blocked_value + + elif web_analysis.tls_is_tls_certificate_invalid == True: + analysis_transcript.append( + "web_analysis.tls_is_tls_certificate_invalid == True" + ) + # bad certificate is very fishy. Let's bump up the blocking value. + blocked_value = 0.8 + down_value = 1 - blocked_value + elif web_analysis.tls_failure == "connection_reset": + # connection_reset very fishy. Let's bump up the blocking value. + analysis_transcript.append("web_analysis.tls_failure == 'connection_reset'") + blocked_value = 0.7 + down_value = 1 - blocked_value + + # Let's set some nice blocking keys + if web_analysis.tls_failure in ["generic_timeout_error", "timed_out"]: + blocked_key, down_key = "tls.timeout", "tls.timeout" + elif web_analysis.tls_failure == "connection_reset": + blocked_key, down_key = "tls.connection_reset", "tls.connection_reset" + else: + blocked_key = f"{blocked_key}.{web_analysis.tls_failure}" + down_key = f"{down_key}.{web_analysis.tls_failure}" + + blocked.tls = OutcomeStatus(key=blocked_key, value=blocked_value * ok_value) + down.tls = OutcomeStatus(key=down_key, value=down_value * ok_value) + # TODO(arturo): double check this is correct + ok.tls = OutcomeStatus(key="tls", value=1 - (blocked.sum() + down.sum())) + + if blocked_key and down_key: + old_ok_value = ok_value + ok_value = 1 - (blocked.sum() + down.sum()) + assert ( + round(blocked.sum() + down.sum() + ok_value) + ) == 1, f"{blocked} + {down} + {ok_value} != 1" + + if ok_value < 0.5: + # If the TLS analysis is leading us to believe the target is more down + # or blocked, than OK, we better off just call it day and return early. 
+ analysis_transcript.append( + f"ok_value < 0.5 # OK went after TLS from {old_ok_value} -> {ok_value}" + ) + return ( + LoNI( + ok_final=ok_value, + ok=ok, + blocked=blocked, + down=down, + blocking_scope=blocking_scope, + ), + analysis_transcript, + ) + + if web_analysis.http_is_http_request_encrypted is not None: + # If the connection is encrypted we will map these to TLS failures, + # since they are equivalent to the TLS level anomalies. + prefix = "http" + if web_analysis.http_is_http_request_encrypted == True: + prefix = "tls" + + # This is the special case to handle the situation where the HTTP + # analysis happens on it's own. Our prior is set to 1.0 + # TODO(arturo): add more details on why this works + if not blocked_key and not down_key: + ok_value = 1.0 + + blocked_key, down_key = prefix, prefix + + if ( + web_analysis.http_is_http_request_encrypted == True + and web_analysis.http_success == True + ): + analysis_transcript.append( + "web_analysis.http_is_http_request_encrypted == True and web_analysis.http_success == True" + ) + down_value, blocked_value = 0.0, 0.0 + + elif ( + web_analysis.http_is_http_request_encrypted == False + and web_analysis.http_success == True + ): + down_value = 0.0 + # We got an answer via HTTP, yet we don't know if the answer is correct. + analysis_transcript.append( + "web_analysis.http_is_http_request_encrypted == False and web_analysis.http_success == True" + ) + if web_analysis.http_is_http_fp_match == True: + # It matches a known fingerprint, we can say stuff + analysis_transcript.append("web_analysis.http_is_http_fp_match == True") + if web_analysis.http_is_http_fp_false_positive == False: + # We matched a signature known to be used to implemented censorship. We can mark this as confirmed blocked. + analysis_transcript.append( + "web_analysis.http_is_http_fp_false_positive == False" + ) + blocked_key = "http.confirmed" + blocking_scope = web_analysis.http_fp_scope + blocked_value = 0.9 + if web_analysis.http_is_http_fp_country_consistent == True: + analysis_transcript.append( + "web_analysis.http_is_http_fp_country_consistent == True" + ) + blocked_key = "http.confirmed.country_consistent" + blocked_value = 1.0 + elif web_analysis.http_is_http_fp_country_consistent == False: + # We let the blocked value be slightly less for cases where the fingerprint is not country consistent + analysis_transcript.append( + "web_analysis.dns_consistency_system_is_answer_fp_country_consistent == False" + ) + blocked_key = "http.confirmed.not_country_consistent" + blocked_value = 0.8 + elif web_analysis.http_is_http_fp_false_positive == True: + blocked_value = 0.0 + else: + # We need to apply some fuzzy logic to fingerprint it + # TODO(arturo): in the future can use more features, such as the following + """ + web_analysis.http_response_status_code + web_analysis.http_response_body_proportion + web_analysis.http_response_body_length + web_analysis.http_ground_truth_body_length + """ + http_response_body_length = web_analysis.http_response_body_length or 0 + http_ground_truth_body_length = ( + web_analysis.http_ground_truth_body_length or 0 + ) + body_proportion = (http_response_body_length + 1) / ( + http_ground_truth_body_length + 1 + ) + if body_proportion < 0.7: + analysis_transcript.append( + "(http_response_body_length + 1)/ (http_ground_truth_body_length + 1) < 0.7" + ) + blocked_key = "http.inconsistent.body_length_mismatch" + blocked_value = 0.7 + # TODO(arturo): check if this indeed has the desired effect. 
+ down_value = 0 + + elif web_analysis.http_failure: + analysis_transcript.append(f"web_analysis.http_failure # ok: {ok_value}") + # No matter what we are in a failure case + + blocked_key, down_key = f"{prefix}.failure", f"{prefix}.failure" + down_value, blocked_value = 0.5, 0.5 + + # TODO(arturo): Here we are only using the trusted ground truths (i.e. + # the control measurements) eventually we want to switch to using other + # OONI measurements too. + https_ground_truth_failure_count = ( + web_analysis.http_ground_truth_trusted_failure_count or 0 + ) + https_ground_truth_ok_count = ( + web_analysis.http_ground_truth_trusted_ok_count or 0 + ) + https_ground_truth_failure_asn_cc_count = ( + web_analysis.http_ground_truth_failure_asn_cc_count or 0 + ) + https_ground_truth_ok_asn_cc_count = ( + web_analysis.http_ground_truth_ok_asn_cc_count or 0 + ) + if https_ground_truth_failure_count > https_ground_truth_ok_count: + analysis_transcript.append( + "https_ground_truth_failure_count > https_ground_truth_ok_count" + ) + # It's failing more than it's succeeding. Probably the site is unreliable + blocked_value = 0.3 + down_value = 0.7 + if ( + https_ground_truth_failure_asn_cc_count + > https_ground_truth_ok_asn_cc_count + ): + analysis_transcript.append( + "https_ground_truth_failure_asn_cc_count > https_ground_truth_ok_asn_cc_count" + ) + # Even more if it's happening globally + blocked_value = 0.2 + down_value = 0.8 + elif https_ground_truth_ok_count > https_ground_truth_failure_count: + analysis_transcript.append( + "https_ground_truth_ok_count > https_ground_truth_failure_count" + ) + # OTOH, if it's mostly working, then this is a sign of blocking + blocked_value = 0.7 + down_value = 0.3 + if "ssl_" in web_analysis.http_failure: + analysis_transcript.append('"ssl_" in web_analysis.http_failure') + # bad certificate is very fishy. Let's bump up the blocking value. + blocked_value = 0.9 + down_value = 0.1 + elif web_analysis.http_failure == "connection_reset": + # connection reset is very fishy. Let's bump up the blocking value. + analysis_transcript.append( + 'web_analysis.http_failure == "connection_reset"' + ) + blocked_value = 0.8 + down_value = 0.2 + + elif web_analysis.http_failure == "connection_reset": + # connection_reset very fishy. Let's bump up the blocking value. 
+ analysis_transcript.append( + "web_analysis.http_failure == 'connection_reset'" + ) + blocked_value = 0.7 + down_value = 0.3 + + # Let's set some nice blocking keys + if web_analysis.http_failure in ["generic_timeout_error", "timed_out"]: + blocked_key, down_key = f"{prefix}.timeout", f"{prefix}.timeout" + elif web_analysis.http_failure == "connection_reset": + blocked_key, down_key = ( + f"{prefix}.connection_reset", + f"{prefix}.connection_reset", + ) + else: + blocked_key = f"{blocked_key}.{web_analysis.http_failure}" + down_key = f"{down_key}.{web_analysis.http_failure}" + + if prefix == "tls": + if blocked.tls is not None: + log.info( + f"Overwriting previous TLS blocking status {blocked.tls} - {down.tls} with {blocked_value} {down_value} ({web_analysis.measurement_uid})" + ) + blocked.tls = OutcomeStatus(key=blocked_key, value=blocked_value * ok_value) + down.tls = OutcomeStatus(key=down_key, value=down_value * ok_value) + # TODO(arturo): double check this is correct + ok.tls = OutcomeStatus(key="tls", value=1 - (blocked.sum() + down.sum())) + else: + blocked.http = OutcomeStatus( + key=blocked_key, value=blocked_value * ok_value, scope=blocking_scope + ) + down.http = OutcomeStatus(key=down_key, value=down_value * ok_value) + # TODO(arturo): double check this is correct + ok.http = OutcomeStatus(key="http", value=1 - (blocked.sum() + down.sum())) + + if blocked_key and down_key: + old_ok_value = ok_value + ok_value = 1 - (blocked.sum() + down.sum()) + assert ( + round(blocked.sum() + down.sum() + ok_value) == 1 + ), f"{blocked} + {down} + {ok_value} != 1" + + return ( + LoNI( + ok_final=ok_value, + ok=ok, + blocked=blocked, + down=down, + blocking_scope=blocking_scope, + ), + analysis_transcript, + ) + + +def make_website_experiment_results( + web_analysis: List[WebAnalysis], +) -> Generator[MeasurementExperimentResult, None, None]: + """ + Takes as input a list of web_analysis and outputs a list of + ExperimentResults for the website. + """ + observation_id_list = [] + first_analysis = web_analysis[0] + + measurement_uid = first_analysis.measurement_uid + timeofday = first_analysis.measurement_start_time + + target_nettest_group = "websites" + target_category = "MISC" + target_name = map_analysis_to_target_name(first_analysis) + target_domain_name = first_analysis.target_domain_name + target_detail = first_analysis.target_detail + + analysis_transcript_list = [] + loni_list: List[LoNI] = [] + loni_blocked_list: List[OutcomeSpace] = [] + loni_down_list: List[OutcomeSpace] = [] + loni_ok_list: List[OutcomeSpace] = [] + for wa in web_analysis: + loni, analysis_transcript = calculate_web_loni(wa) + analysis_transcript_list.append(analysis_transcript) + loni_list.append(loni) + loni_blocked_list.append(loni.blocked) + loni_down_list.append(loni.down) + loni_ok_list.append(loni.ok) + + final_blocked = OutcomeSpace() + final_down = OutcomeSpace() + final_ok = OutcomeSpace() + ok_value = 0 + blocking_scope = None + + # TODO(arturo): this section needs to be formalized and verified a bit more + # in depth. Currently it's just a prototype to start seeing how the data + # looks like. 
+ + def get_agg_outcome(loni_list, category, agg_func) -> Optional[OutcomeStatus]: + """ + Returns the min or max outcome status of the specified category given the loni list + """ + try: + return agg_func( + filter( + lambda x: x is not None, + map(lambda x: getattr(x, category), loni_list), + ), + key=lambda d: d.value if d else 0, + ) + except ValueError: + return None + + ### FINAL DNS + max_dns_blocked = get_agg_outcome(loni_blocked_list, "dns", max) + max_dns_down = get_agg_outcome(loni_down_list, "dns", max) + min_dns_ok = get_agg_outcome(loni_ok_list, "dns", min) + + if max_dns_blocked and max_dns_down and min_dns_ok: + ok_value = min_dns_ok.value + final_ok.dns = OutcomeStatus(key="dns", value=min_dns_ok.value) + final_blocked.dns = OutcomeStatus( + key=max_dns_blocked.key, value=max_dns_blocked.value + ) + final_down.dns = OutcomeStatus( + # TODO(arturo): this is overestimating blocking. + key=max_dns_down.key, + value=1 - (min_dns_ok.value + max_dns_blocked.value), + ) + if max_dns_blocked.scope: + # TODO(arturo): set this on the parent OutcomeStatus too + blocking_scope = max_dns_blocked.scope + log.debug(f"DNS done {ok_value}") + + ### FINAL TCP + max_tcp_blocked = get_agg_outcome(loni_blocked_list, "tcp", max) + max_tcp_down = get_agg_outcome(loni_down_list, "tcp", max) + min_tcp_ok = get_agg_outcome(loni_ok_list, "tcp", min) + if max_tcp_blocked and max_tcp_down and min_tcp_ok: + log.debug(f"PERFORMING TCP {ok_value}") + log.debug(f"max_tcp_blocked: {max_tcp_blocked}") + log.debug(f"max_tcp_down: {max_tcp_down}") + log.debug(f"min_tcp_ok: {min_tcp_ok}") + log.debug(f"final_down: {final_down}") + log.debug(f"final_blocked: {final_blocked}") + log.debug(f"final_ok: {final_ok}") + final_blocked.tcp = OutcomeStatus( + key=max_tcp_blocked.key, value=max_tcp_blocked.value * ok_value + ) + final_down.tcp = OutcomeStatus( + key=max_tcp_down.key, + value=(1 - (min_tcp_ok.value + max_tcp_blocked.value)) * ok_value, + ) + final_ok.tcp = OutcomeStatus(key="tcp", value=min_tcp_ok.value) + # TODO(arturo): should we update the DNS down key value in light of the + # fact we notice TCP is bad and hence the answer might have been bad to + # begin with? 
+ old_ok_value = ok_value + ok_value = 1 - (final_blocked.sum() + final_down.sum()) + log.debug(f"TCP done {old_ok_value} -> {ok_value}") + log.debug(f"final_down: {final_down}") + log.debug(f"final_blocked: {final_blocked}") + log.debug(f"final_ok: {final_ok}") + + ### FINAL TLS + max_tls_blocked = get_agg_outcome(loni_blocked_list, "tls", max) + max_tls_down = get_agg_outcome(loni_down_list, "tls", max) + min_tls_ok = get_agg_outcome(loni_ok_list, "tls", min) + if max_tls_blocked and max_tls_down and min_tls_ok: + final_blocked.tls = OutcomeStatus( + key=max_tls_blocked.key, value=max_tls_blocked.value * ok_value + ) + final_down.tls = OutcomeStatus( + key=max_tls_down.key, + value=(1 - (min_tls_ok.value + max_tls_blocked.value)) * ok_value, + ) + final_ok.tls = OutcomeStatus(key="tls", value=min_tls_ok.value) + old_ok_value = ok_value + ok_value = 1 - (final_blocked.sum() + final_down.sum()) + log.debug(f"TLS done {old_ok_value} -> {ok_value}") + log.debug(f"final_down: {final_down}") + log.debug(f"final_blocked: {final_blocked}") + log.debug(f"final_ok: {final_ok}") + + ### FINAL HTTP + max_http_blocked = get_agg_outcome(loni_blocked_list, "http", max) + max_http_down = get_agg_outcome(loni_down_list, "http", max) + min_http_ok = get_agg_outcome(loni_ok_list, "http", min) + + if max_http_blocked and max_http_down and min_http_ok: + final_blocked.http = OutcomeStatus( + key=max_http_blocked.key, value=max_http_blocked.value * ok_value + ) + final_down.http = OutcomeStatus( + key=max_http_down.key, + value=(1 - (min_http_ok.value + max_http_blocked.value)) * ok_value, + ) + final_ok.http = OutcomeStatus(key="http", value=min_http_ok.value) + if max_http_blocked.scope: + if blocking_scope is not None: + log.warning(f"overwriting blocking_scope key: {blocking_scope}") + # TODO(arturo): set this on the parent OutcomeStatus too + blocking_scope = max_http_blocked.scope + + old_ok_value = ok_value + ok_value = 1 - (final_blocked.sum() + final_down.sum()) + log.debug(f"HTTP done {old_ok_value} -> {ok_value}") + log.debug(f"final_down: {final_down}") + log.debug(f"final_blocked: {final_blocked}") + log.debug(f"final_ok: {final_ok}") + + final_loni = LoNI( + ok_final=ok_value, + ok=final_ok, + down=final_down, + blocked=final_blocked, + blocking_scope=blocking_scope, + ) + log.debug(f"final_loni: {final_loni}") + + loni_ok_value = final_loni.ok_final + + loni_down = final_loni.down.to_dict() + loni_down_keys, loni_down_values = list(loni_down.keys()), list(loni_down.values()) + + loni_blocked = final_loni.blocked.to_dict() + loni_blocked_keys, loni_blocked_values = list(loni_blocked.keys()), list( + loni_blocked.values() + ) + + loni_ok = final_loni.ok.to_dict() + loni_ok_keys, loni_ok_values = list(loni_ok.keys()), list(loni_ok.values()) + + is_anomaly = loni_ok_value < 0.6 + is_confirmed = final_loni.blocked.sum() > 0.9 + + er = MeasurementExperimentResult( + measurement_uid=measurement_uid, + observation_id_list=observation_id_list, + timeofday=timeofday, + created_at=datetime.now(timezone.utc).replace(tzinfo=None), + location_network_type=first_analysis.network_type, + location_network_asn=first_analysis.probe_asn, + location_network_cc=first_analysis.probe_cc, + location_network_as_org_name=first_analysis.probe_as_org_name, + location_network_as_cc=first_analysis.probe_as_cc, + location_resolver_asn=first_analysis.resolver_asn, + location_resolver_as_org_name=first_analysis.resolver_as_org_name, + location_resolver_as_cc=first_analysis.resolver_as_cc, + 
location_resolver_cc=first_analysis.resolver_cc, + location_blocking_scope=None, + target_nettest_group=target_nettest_group, + target_category=target_category, + target_name=target_name, + target_domain_name=target_domain_name, + target_detail=target_detail, + loni_ok_value=loni_ok_value, + loni_down_keys=loni_down_keys, + loni_down_values=loni_down_values, + loni_blocked_keys=loni_blocked_keys, + loni_blocked_values=loni_blocked_values, + loni_ok_keys=loni_ok_keys, + loni_ok_values=loni_ok_values, + loni_list=list(map(lambda x: x.to_dict(), loni_list)), + analysis_transcript_list=analysis_transcript_list, + measurement_count=1, + observation_count=len(web_analysis), + vp_count=1, + anomaly=is_anomaly, + confirmed=is_confirmed, + ) + + yield er diff --git a/oonidata/analysis/websites.py b/oonidata/analysis/websites.py deleted file mode 100644 index 89eaaf77..00000000 --- a/oonidata/analysis/websites.py +++ /dev/null @@ -1,1187 +0,0 @@ -from collections import defaultdict -from dataclasses import dataclass -import dataclasses -import ipaddress -import math -from typing import Any, Generator, Iterable, NamedTuple, Optional, List, Dict -from oonidata.db.connections import ClickhouseConnection -from oonidata.analysis.control import ( - WebGroundTruth, - BodyDB, -) -from oonidata.models.experiment_result import ( - BlockingScope, - Outcome, - Scores, - ExperimentResult, - fp_to_scope, - iter_experiment_results, -) - -from oonidata.fingerprintdb import FingerprintDB -from oonidata.models.observations import WebControlObservation, WebObservation - -import logging - -log = logging.getLogger("oonidata.processing") - -CLOUD_PROVIDERS_ASNS = [ - 13335, # Cloudflare: https://www.peeringdb.com/net/4224 - 20940, # Akamai: https://www.peeringdb.com/net/2 - 9002, # Akamai RETN - 396982, # Google Cloud: https://www.peeringdb.com/net/30878 -] - -CLOUD_PROVIDERS_AS_ORGS_SUBSTRINGS = ["akamai"] - - -def get_web_ctrl_observations( - db: ClickhouseConnection, measurement_uid: str -) -> List[WebControlObservation]: - obs_list = [] - column_names = [f.name for f in dataclasses.fields(WebControlObservation)] - q = "SELECT (" - q += ",\n".join(column_names) - q += ") FROM obs_web_ctrl WHERE measurement_uid = %(measurement_uid)s" - - for res in db.execute_iter(q, {"measurement_uid": measurement_uid}): - row = res[0] - obs_list.append( - WebControlObservation(**{k: row[idx] for idx, k in enumerate(column_names)}) - ) - return obs_list - - -def is_cloud_provider(asn: Optional[int], as_org_name: Optional[str]): - if asn and asn in CLOUD_PROVIDERS_ASNS: - return True - if as_org_name and any( - [ss in as_org_name.lower() for ss in CLOUD_PROVIDERS_AS_ORGS_SUBSTRINGS] - ): - return True - return False - - -def encode_address(ip: str, port: int) -> str: - """ - return a properly encoded address handling IPv6 IPs - """ - # I'm amazed python doesn't have this in the standard library - # and urlparse is incredibly inconsistent with it's handling of IPv6 - # addresses. - ipaddr = ipaddress.ip_address(ip) - addr = ip - if isinstance(ipaddr, ipaddress.IPv6Address): - addr = "[" + ip + "]" - - addr += f":{port}" - return addr - - -def confidence_estimate(x: int, factor: float = 0.8, clamping: float = 0.9): - """ - Gives an estimate of confidence given the number of datapoints that are - consistent (x). 
- - clamping: defines what is the maximum value it can take - factor: is a multiplicate factor to decrease the value of the function - - This function was derived by looking for an exponential function in - the form f(x) = c1*a^x + c2 and solving for f(0) = 0 and f(10) = 1, - giving us a function in the form f(x) = (a^x - 1) / (a^10 - 1). We - then choose the magic value of 0.5 by looking for a solution in a - where f(1) ~= 0.5, doing a bit of plots and choosing a curve that - looks reasonably sloped. - """ - y = (pow(0.5, x) - 1) / (pow(0.5, 10) - 1) - return min(clamping, factor * y) - - -def ok_vs_nok_score( - ok_count: int, nok_count: int, blocking_factor: float = 0.8 -) -> Scores: - """ - This is a very simplistic estimation that just looks at the proportions of - failures to reachable measurement to establish if something is blocked or - not. - - It assumes that what we are calculating the ok_vs_nok score is some kind of - failure, which means the outcome is either that the target is down or - blocked. - - In order to determine which of the two cases it is, we look at the ground - truth. - """ - # We set this to 0.65 so that for the default value and lack of any ground - # truth, we end up with blocked_score = 0.52 and down_score = 0.48 - blocked_score = min(1.0, 0.65 * blocking_factor) - down_score = 1 - blocked_score - total_count = ok_count + nok_count - if total_count > 0: - blocked_score = min(1.0, ok_count / total_count * blocking_factor) - down_score = 1 - blocked_score - - return Scores(ok=0.0, blocked=blocked_score, down=down_score) - - -def make_tcp_outcome( - web_o: WebObservation, web_ground_truths: List[WebGroundTruth] -) -> Outcome: - assert web_o.ip is not None and web_o.port is not None - - blocking_subject = encode_address(web_o.ip, web_o.port) - - # It's working, wothing to see here, go on with your life - if web_o.tcp_success: - return Outcome( - observation_id=web_o.observation_id, - scope=BlockingScope.UNKNOWN, - label="", - subject=blocking_subject, - category="tcp", - detail="ok", - meta={}, - ok_score=1.0, - down_score=0.0, - blocked_score=0.0, - ) - - assert ( - web_o.tcp_failure is not None - ), "inconsistency between tcp_success and tcp_failure" - - ground_truths = filter( - lambda gt: gt.ip == web_o.ip and gt.port == web_o.port, web_ground_truths - ) - unreachable_cc_asn = set() - reachable_cc_asn = set() - ok_count = 0 - nok_count = 0 - for gt in ground_truths: - # We don't check for strict == True, since depending on the DB engine - # True could also be represented as 1 - if gt.tcp_success is None: - continue - if gt.tcp_success: - if gt.is_trusted_vp: - ok_count += 1 - else: - reachable_cc_asn.add((gt.vp_cc, gt.vp_asn)) - else: - if gt.is_trusted_vp: - nok_count += 1 - else: - unreachable_cc_asn.add((gt.vp_cc, gt.vp_asn)) - - reachable_count = ok_count + len(reachable_cc_asn) - unreachable_count = nok_count + len(unreachable_cc_asn) - blocking_meta = { - "unreachable_count": str(unreachable_count), - "reachable_count": str(reachable_count), - } - - blocking_factor = 0.7 - if web_o.tls_failure == "connection_reset": - blocking_factor = 0.8 - - scores = ok_vs_nok_score( - ok_count=reachable_count, - nok_count=unreachable_count, - blocking_factor=blocking_factor, - ) - return Outcome( - observation_id=web_o.observation_id, - scope=BlockingScope.UNKNOWN, - label="", - subject=blocking_subject, - category="tcp", - detail=web_o.tcp_failure, - meta=blocking_meta, - ok_score=scores.ok, - blocked_score=scores.blocked, - down_score=scores.down, - ) - - -class 
DNSFingerprintOutcome(NamedTuple): - meta: Dict[str, str] - label: str - scope: BlockingScope - - -def match_dns_fingerprint( - dns_observations: List[WebObservation], fingerprintdb: FingerprintDB -) -> DNSFingerprintOutcome: - outcome_meta = {} - outcome_label = "" - outcome_scope = BlockingScope.UNKNOWN - for web_o in dns_observations: - if not web_o.dns_answer: - continue - fp = fingerprintdb.match_dns(web_o.dns_answer) - if fp: - outcome_scope = fp_to_scope(fp.scope) - if outcome_scope != BlockingScope.SERVER_SIDE_BLOCK: - outcome_label = f"blocked" - outcome_meta["fingerprint"] = fp.name - outcome_meta["fingerprint_consistency"] = "country_consistent" - - # If we see the fingerprint in an unexpected country we should - # significantly reduce the confidence in the block - if fp.expected_countries and web_o.probe_cc not in fp.expected_countries: - log.debug( - f"Inconsistent probe_cc vs expected_countries {web_o.probe_cc} != {fp.expected_countries}" - ) - outcome_meta["fingerprint_consistency"] = "country_inconsistent" - return DNSFingerprintOutcome( - meta=outcome_meta, label=outcome_label, scope=outcome_scope - ) - return DNSFingerprintOutcome( - meta=outcome_meta, label=outcome_label, scope=outcome_scope - ) - - -class DNSGroundTruth(NamedTuple): - nxdomain_cc_asn: set - failure_cc_asn: set - ok_cc_asn: set - other_ips: Dict[str, set] - other_asns: Dict[str, set] - trusted_answers: Dict - - @property - def ok_count(self): - return len(self.ok_cc_asn) - - @property - def failure_count(self): - return len(self.failure_cc_asn) - - @property - def nxdomain_count(self): - return len(self.nxdomain_cc_asn) - - -def make_dns_ground_truth(ground_truths: Iterable[WebGroundTruth]): - """ - Here we count how many vantage vantage points, as in distinct probe_cc, - probe_asn pairs, presented the various types of results. 
- """ - nxdomain_cc_asn = set() - failure_cc_asn = set() - ok_cc_asn = set() - other_ips = defaultdict(set) - other_asns = defaultdict(set) - trusted_answers = {} - for gt in ground_truths: - if gt.dns_success is None: - continue - if gt.dns_failure == "dns_nxdomain_error": - nxdomain_cc_asn.add((gt.vp_cc, gt.vp_asn)) - if not gt.dns_success: - failure_cc_asn.add((gt.vp_cc, gt.vp_asn)) - continue - - ok_cc_asn.add((gt.vp_cc, gt.vp_asn)) - other_ips[gt.ip].add((gt.vp_cc, gt.vp_asn)) - assert gt.ip, "did not find IP in ground truth" - other_asns[gt.ip_asn].add((gt.vp_cc, gt.vp_asn)) - if gt.tls_is_certificate_valid == True or gt.is_trusted_vp == True: - trusted_answers[gt.ip] = gt - - return DNSGroundTruth( - nxdomain_cc_asn=nxdomain_cc_asn, - failure_cc_asn=failure_cc_asn, - ok_cc_asn=ok_cc_asn, - other_asns=other_asns, - other_ips=other_ips, - trusted_answers=trusted_answers, - ) - - -def compute_dns_failure_outcomes( - dns_ground_truth: DNSGroundTruth, dns_observations: List[WebObservation] -) -> List[Outcome]: - outcomes = [] - for web_o in dns_observations: - if not web_o.dns_failure: - continue - - outcome_meta = { - "ok_count": str(dns_ground_truth.ok_count), - "failure_count": str(dns_ground_truth.failure_count), - "nxdomain_count": str(dns_ground_truth.nxdomain_count), - } - scores = ok_vs_nok_score( - ok_count=dns_ground_truth.ok_count, - nok_count=dns_ground_truth.failure_count, - ) - - blocking_detail = f"failure.{web_o.dns_failure}" - if web_o.dns_failure == "dns_nxdomain_error": - blocking_detail = "inconsistent.nxdomain" - scores = ok_vs_nok_score( - ok_count=dns_ground_truth.ok_count, - nok_count=dns_ground_truth.nxdomain_count, - blocking_factor=0.85, - ) - outcome_subject = ( - f"{web_o.hostname}@{web_o.dns_engine}-{web_o.dns_engine_resolver_address}" - ) - outcomes.append( - Outcome( - observation_id=web_o.observation_id, - scope=BlockingScope.UNKNOWN, - subject=outcome_subject, - label="", - category="dns", - detail=blocking_detail, - meta=outcome_meta, - ok_score=scores.ok, - down_score=scores.down, - blocked_score=scores.blocked, - ) - ) - return outcomes - - -def dns_observations_by_resolver( - dns_observations: List[WebObservation], -) -> Dict[str, List[WebObservation]]: - by_resolver = defaultdict(list) - for dns_o in dns_observations: - key = f"{dns_o.dns_engine}-{dns_o.dns_engine_resolver_address}" - by_resolver[key].append(dns_o) - return by_resolver - - -def get_outcome_subject(dns_o: WebObservation): - return f"{dns_o.ip}@{dns_o.dns_engine}-{dns_o.dns_engine_resolver_address}" - - -def check_dns_bogon( - dns_observations: List[WebObservation], - dns_ground_truth: DNSGroundTruth, - outcome_fingerprint: DNSFingerprintOutcome, -) -> Optional[Outcome]: - for web_o in dns_observations: - outcome_meta = {"ip": web_o.dns_answer or "", **outcome_fingerprint.meta} - if web_o.ip_is_bogon: - outcome_meta["why"] = "answer is bogon" - down_score = 0.1 - blocked_score = 0.9 - # If we saw the same bogon IP inside of the trusted_answers, it means it - # it's always returning a bogon and hence this site is actually down - if web_o.dns_answer in dns_ground_truth.trusted_answers: - outcome_meta["why"] = "answer is bogon as expected" - down_score = 0.9 - blocked_score = 0.1 - - return Outcome( - observation_id=web_o.observation_id, - scope=outcome_fingerprint.scope, - subject=get_outcome_subject(web_o), - label=outcome_fingerprint.label, - category="dns", - detail="inconsistent.bogon", - meta=outcome_meta, - ok_score=0.0, - down_score=down_score, - blocked_score=blocked_score, - ) 
- - -def check_tls_consistency( - dns_observations: List[WebObservation], - dns_ground_truth: DNSGroundTruth, - outcome_fingerprint: DNSFingerprintOutcome, -) -> Optional[Outcome]: - for web_o in dns_observations: - outcome_meta = outcome_fingerprint.meta.copy() - if ( - web_o.tls_is_certificate_valid == True - or web_o.dns_answer in dns_ground_truth.trusted_answers - ): - outcome_meta["why"] = "resolved IP in trusted answers" - # No blocking detected - return Outcome( - observation_id=web_o.observation_id, - scope=outcome_fingerprint.scope, - label=outcome_fingerprint.label, - subject=get_outcome_subject(web_o), - category="dns", - detail="ok", - meta=outcome_meta, - ok_score=1.0, - down_score=0.0, - blocked_score=0.0, - ) - - -def check_tls_inconsistency( - dns_observations: List[WebObservation], - outcome_fingerprint: DNSFingerprintOutcome, -) -> Optional[Outcome]: - # We do these in two separate loops, so that we first ensure that none of - # the answers we got were good and only then do we proceed to doing a TLS - # inconsistency check. - for web_o in dns_observations: - outcome_meta = outcome_fingerprint.meta.copy() - if web_o.tls_is_certificate_valid == False: - # TODO: we probably need to handle cases here where it might be the case - # that the CERT is bad because it's always serving a bad certificate. - # here is an example: https://explorer.ooni.org/measurement/20220930T235729Z_webconnectivity_AE_5384_n1_BLcO454Y5UULxZoq?input=https://www.government.ae/en%23%2F - outcome_meta["why"] = "tls certificate is bad" - return Outcome( - observation_id=web_o.observation_id, - scope=outcome_fingerprint.scope, - label=outcome_fingerprint.label, - subject=get_outcome_subject(web_o), - category="dns", - detail="inconsistent", - meta=outcome_meta, - ok_score=0.0, - # We give a little bit of weight to the down score, due to no ground - # truthing of TLS - down_score=0.3, - blocked_score=0.7, - ) - - -def check_wc_style_consistency( - dns_observations: List[WebObservation], - dns_ground_truth: DNSGroundTruth, - outcome_fingerprint: DNSFingerprintOutcome, -) -> Optional[Outcome]: - """ - Do a web_connectivity style DNS consistency check. - - If we are in this case, it means we weren't able to determine the - consistency of the DNS query using TLS. This is the case either - because the tested site is not in HTTPS and therefore we didn't - generate a TLS measurement for it or because the target IP isn't - listening on HTTPS (which is quite fishy). - In either case we should flag these with being somewhat likely to be - blocked. 
- """ - ground_truth_asns = set() - ground_truth_as_org_names = set() - for gt in dns_ground_truth.trusted_answers.values(): - assert gt.ip, f"did not find IP in ground truth {gt.ip}" - ground_truth_asns.add(gt.ip_asn) - ground_truth_as_org_names.add(gt.ip_as_org_name.lower()) - - contains_matching_asn_answer = False - contains_matching_cc_answer = False - system_answers = 0 - for web_o in dns_observations: - outcome_meta = outcome_fingerprint.meta.copy() - if web_o.dns_engine == "system": - system_answers += 1 - - if web_o.dns_answer_asn in ground_truth_asns: - outcome_meta["why"] = "answer in matches AS of trusted answers" - return Outcome( - observation_id=web_o.observation_id, - scope=outcome_fingerprint.scope, - label=outcome_fingerprint.label, - subject=get_outcome_subject(web_o), - category="dns", - detail="ok", - meta=outcome_meta, - ok_score=0.9, - down_score=0.0, - blocked_score=0.1, - ) - - if ( - web_o.dns_answer_as_org_name - and web_o.dns_answer_as_org_name.lower() in ground_truth_as_org_names - ): - outcome_meta["why"] = "answer in TLS ground truth as org name" - return Outcome( - observation_id=web_o.observation_id, - scope=outcome_fingerprint.scope, - label=outcome_fingerprint.label, - subject=get_outcome_subject(web_o), - category="dns", - detail="ok", - meta=outcome_meta, - ok_score=0.9, - down_score=0.0, - blocked_score=0.1, - ) - - if web_o.dns_answer in dns_ground_truth.other_ips: - outcome_meta["why"] = "answer in resolved IPs ground truth" - outcome_meta["other_ip_count"] = str( - len(dns_ground_truth.other_ips[web_o.dns_answer]) - ) - blocked_score = confidence_estimate( - len(dns_ground_truth.other_ips[web_o.dns_answer]), - clamping=0.9, - factor=0.8, - ) - ok_score = 1 - blocked_score - return Outcome( - observation_id=web_o.observation_id, - scope=outcome_fingerprint.scope, - label=outcome_fingerprint.label, - subject=get_outcome_subject(web_o), - category="dns", - detail="ok", - meta=outcome_meta, - ok_score=ok_score, - down_score=0.0, - blocked_score=blocked_score, - ) - - if web_o.dns_answer in dns_ground_truth.other_asns: - # We clamp this to some lower values and scale it by a smaller factor, - # since ASN consistency is less strong than direct IP match. - outcome_meta["why"] = "answer AS in ground truth" - outcome_meta["other_asn_count"] = str( - len(dns_ground_truth.other_asns[web_o.dns_answer]) - ) - blocked_score = confidence_estimate( - len(dns_ground_truth.other_asns[web_o.dns_answer]), - clamping=0.9, - factor=0.8, - ) - ok_score = 1 - blocked_score - return Outcome( - observation_id=web_o.observation_id, - scope=outcome_fingerprint.scope, - label=outcome_fingerprint.label, - subject=get_outcome_subject(web_o), - category="dns", - detail="ok", - meta=outcome_meta, - ok_score=ok_score, - down_score=0.0, - blocked_score=blocked_score, - ) - - if is_cloud_provider(asn=web_o.ip_asn, as_org_name=web_o.ip_as_org_name): - # Cloud providers are a common source of false positives. 
Let's just - # mark them as ok with a low confidence - outcome_meta["why"] = "answer is cloud service provider" - return Outcome( - observation_id=web_o.observation_id, - scope=outcome_fingerprint.scope, - label=outcome_fingerprint.label, - subject=get_outcome_subject(web_o), - category="dns", - detail="ok", - meta=outcome_meta, - ok_score=0.6, - down_score=0.0, - blocked_score=0.4, - ) - - if web_o.dns_answer_asn == web_o.probe_asn: - contains_matching_asn_answer = True - elif web_o.ip_as_cc == web_o.probe_cc: - contains_matching_cc_answer = True - - outcome_meta = {} - outcome_meta["why"] = "unable to determine consistency through ground truth" - outcome_meta["system_answers"] = str(system_answers) - blocked_score = 0.6 - outcome_detail = "inconsistent" - - # It's quite unlikely that a censor will return multiple answers - if system_answers > 1: - outcome_meta["why"] += ", but multiple system_answers" - blocked_score = 0.4 - outcome_detail = "ok" - - # It's more common to answer to DNS queries for blocking with IPs managed by - # the ISP (ex. to serve their blockpage). - # So we give this a bit higher confidence - if contains_matching_asn_answer and system_answers > 1: - blocked_score = 0.8 - outcome_meta["why"] = "answer matches probe_asn" - outcome_detail = "inconsistent" - - # It's common to do this also in the country, for example when the blockpage - # is centrally managed (ex. case in IT, ID) - elif contains_matching_cc_answer: - blocked_score = 0.7 - outcome_meta["why"] = "answer matches probe_cc" - outcome_detail = "inconsistent" - - # We haven't managed to figured out if the DNS resolution was a good one, so - # we are going to assume it's bad. - # TODO: Currently web_connectivity assumes that if the last request was HTTPS and it was successful, then the whole measurement was OK. - # see: https://github.com/ooni/probe-cli/blob/a0dc65641d7a31e116d9411ecf9e69ed1955e792/internal/engine/experiment/webconnectivity/summary.go#L98 - # This seems a little bit too strong. We probably ought to do this only if - # the redirect chain was a good one, because it will lead to false negatives - # in cases in which the redirect is triggered by the middlebox. - # Imagine a case like this: - # http://example.com/ -> 302 -> https://blockpage.org/ - # - # The certificate for blockpage.org can be valid, but it's not what we - # wanted. 
- return Outcome( - observation_id=dns_observations[0].observation_id, - scope=outcome_fingerprint.scope, - label=outcome_fingerprint.label, - subject="all@all", - category="dns", - detail=outcome_detail, - meta=outcome_meta, - ok_score=1 - blocked_score, - down_score=0.0, - blocked_score=blocked_score, - ) - - -def compute_dns_consistency_outcomes( - dns_ground_truth: DNSGroundTruth, - dns_observations: List[WebObservation], - outcome_fingerprint: DNSFingerprintOutcome, -) -> List[Outcome]: - outcomes = [] - - for dns_observations in dns_observations_by_resolver(dns_observations).values(): - bogon_outcome = check_dns_bogon( - dns_observations=dns_observations, - dns_ground_truth=dns_ground_truth, - outcome_fingerprint=outcome_fingerprint, - ) - if bogon_outcome: - outcomes.append(bogon_outcome) - continue - - tls_consistency_outcome = check_tls_consistency( - dns_observations=dns_observations, - dns_ground_truth=dns_ground_truth, - outcome_fingerprint=outcome_fingerprint, - ) - if tls_consistency_outcome: - outcomes.append(tls_consistency_outcome) - continue - - wc_style_outcome = check_wc_style_consistency( - dns_observations=dns_observations, - dns_ground_truth=dns_ground_truth, - outcome_fingerprint=outcome_fingerprint, - ) - if wc_style_outcome: - outcomes.append(wc_style_outcome) - continue - - # TODO: If we add a ground truth to this, we could potentially do it - # before the WC style consistency check and it will probably be more - # robust. - tls_inconsistency_outcome = check_tls_inconsistency( - dns_observations=dns_observations, - outcome_fingerprint=outcome_fingerprint, - ) - if tls_inconsistency_outcome: - outcomes.append(tls_inconsistency_outcome) - - return outcomes - - -def make_dns_outcomes( - hostname: str, - dns_observations: List[WebObservation], - web_ground_truths: List[WebGroundTruth], - fingerprintdb: FingerprintDB, -) -> List[Outcome]: - outcomes = [] - dns_ground_truth = make_dns_ground_truth( - ground_truths=filter( - lambda gt: gt.hostname == hostname, - web_ground_truths, - ) - ) - outcome_fingerprint = match_dns_fingerprint( - dns_observations=dns_observations, fingerprintdb=fingerprintdb - ) - outcomes += compute_dns_failure_outcomes( - dns_ground_truth=dns_ground_truth, dns_observations=dns_observations - ) - outcomes += compute_dns_consistency_outcomes( - dns_ground_truth=dns_ground_truth, - dns_observations=dns_observations, - outcome_fingerprint=outcome_fingerprint, - ) - return outcomes - - -def make_tls_outcome( - web_o: WebObservation, web_ground_truths: List[WebGroundTruth] -) -> Outcome: - blocking_subject = web_o.hostname or "" - outcome_meta = {} - if web_o.tls_is_certificate_valid == True: - return Outcome( - observation_id=web_o.observation_id, - scope=BlockingScope.UNKNOWN, - label="", - subject=blocking_subject, - category="tls", - detail="ok", - meta=outcome_meta, - ok_score=1.0, - down_score=0.0, - blocked_score=0.0, - ) - - ground_truths = filter( - lambda gt: gt.http_request_url and gt.hostname == web_o.hostname, - web_ground_truths, - ) - failure_cc_asn = set() - ok_cc_asn = set() - ok_count = 0 - failure_count = 0 - for gt in ground_truths: - # We don't check for strict == True, since depending on the DB engine - # True could also be represented as 1 - if gt.http_success is None: - continue - - if gt.http_success: - if gt.is_trusted_vp: - ok_count += gt.count - else: - ok_cc_asn.add((gt.vp_cc, gt.vp_asn)) - else: - if gt.is_trusted_vp: - failure_count += gt.count - else: - failure_cc_asn.add((gt.vp_cc, gt.vp_asn, gt.count)) - - # 
Untrusted Vantage Points (i.e. not control measurements) only count - # once per probe_cc, probe_asn pair to avoid spammy probes poisoning our - # data - failure_count += len(failure_cc_asn) - ok_count += len(ok_cc_asn) - outcome_meta["ok_count"] = str(ok_count) - outcome_meta["failure_count"] = str(failure_count) - - # FIXME: we currently use the HTTP control as a proxy to establish ground truth for TLS - if web_o.tls_is_certificate_valid == False and failure_count == 0: - outcome_meta["why"] = "invalid TLS certificate" - return Outcome( - observation_id=web_o.observation_id, - scope=BlockingScope.UNKNOWN, - label="", - subject=blocking_subject, - category="tls", - detail="mitm", - meta=outcome_meta, - ok_score=0.0, - down_score=0.2, - blocked_score=0.8, - ) - - elif web_o.tls_failure and failure_count == 0: - # We only consider it to be a TLS level verdict if we haven't seen any - # blocks in TCP - blocking_detail = f"{web_o.tls_failure}" - blocked_score = 0.5 - - if web_o.tls_handshake_read_count == 0 and web_o.tls_handshake_write_count == 1: - # This means we just wrote the TLS ClientHello, let's give it a bit - # more confidence in it being a block - blocked_score = 0.7 - - if web_o.tls_failure in ("connection_closed", "connection_reset"): - blocked_score += 0.15 - - return Outcome( - observation_id=web_o.observation_id, - scope=BlockingScope.UNKNOWN, - label="", - subject=blocking_subject, - category="tls", - detail=blocking_detail, - meta=outcome_meta, - ok_score=0.0, - down_score=1 - blocked_score, - blocked_score=blocked_score, - ) - - elif web_o.tls_failure or web_o.tls_is_certificate_valid == False: - outcome_detail = f"{web_o.tls_failure}" - scores = ok_vs_nok_score( - ok_count=ok_count, nok_count=failure_count, blocking_factor=0.7 - ) - if web_o.tls_is_certificate_valid == False: - outcome_detail = "bad_cert" - - return Outcome( - observation_id=web_o.observation_id, - scope=BlockingScope.UNKNOWN, - label="", - subject=blocking_subject, - category="tls", - detail=outcome_detail, - meta=outcome_meta, - ok_score=0.0, - down_score=scores.down, - blocked_score=scores.blocked, - ) - - return Outcome( - observation_id=web_o.observation_id, - scope=BlockingScope.UNKNOWN, - label="", - subject=blocking_subject, - category="tls", - detail="ok", - meta=outcome_meta, - ok_score=0.9, - down_score=0.0, - blocked_score=0.1, - ) - - -def make_http_outcome( - web_o: WebObservation, - web_ground_truths: List[WebGroundTruth], - body_db: BodyDB, - fingerprintdb: FingerprintDB, -) -> Outcome: - assert web_o.http_request_url - request_is_encrypted = web_o.http_request_url.startswith("https://") - - blocking_subject = web_o.http_request_url - outcome_label = "" - outcome_meta = {} - outcome_category = "http" - if request_is_encrypted: - outcome_category = "https" - - ground_truths = filter( - lambda gt: gt.http_request_url == web_o.http_request_url, web_ground_truths - ) - failure_cc_asn = set() - ok_cc_asn = set() - ok_count = 0 - failure_count = 0 - response_body_len_count = defaultdict(int) - for gt in ground_truths: - # We don't check for strict == True, since depending on the DB engine - # True could also be represented as 1 - if gt.http_success is None: - continue - - # TODO: figure out why some are negative - if gt.http_response_body_length and gt.http_response_body_length > 0: - response_body_len_count[gt.http_response_body_length] += gt.count - - if gt.http_success: - if gt.is_trusted_vp: - ok_count += gt.count - else: - ok_cc_asn.add((gt.vp_cc, gt.vp_asn)) - else: - if gt.is_trusted_vp: - 
failure_count += gt.count - else: - failure_cc_asn.add((gt.vp_cc, gt.vp_asn, gt.count)) - - response_body_length = 0 - if len(response_body_len_count) > 0: - response_body_length = max(response_body_len_count.items(), key=lambda x: x[1])[ - 0 - ] - - # Untrusted Vantage Points (i.e. not control measurements) only count - # once per probe_cc, probe_asn pair to avoid spammy probes poisoning our - # data - failure_count += len(failure_cc_asn) - ok_count += len(ok_cc_asn) - outcome_meta["ok_count"] = str(ok_count) - outcome_meta["failure_count"] = str(failure_count) - - if web_o.http_failure: - scores = ok_vs_nok_score(ok_count=ok_count, nok_count=failure_count) - - outcome_detail = f"{web_o.http_failure}" - - return Outcome( - observation_id=web_o.observation_id, - scope=BlockingScope.UNKNOWN, - label="", - subject=blocking_subject, - category=outcome_category, - detail=outcome_detail, - meta=outcome_meta, - ok_score=scores.ok, - down_score=scores.down, - blocked_score=scores.blocked, - ) - - # TODO: do we care to do something about empty bodies? - # They are commonly a source of blockpages - if web_o.http_response_body_sha1: - matched_fp = body_db.lookup(web_o.http_response_body_sha1) - if len(matched_fp) > 0: - blocked_score = 0.8 - blocking_scope = BlockingScope.UNKNOWN - if request_is_encrypted: - # Likely some form of server-side blocking - blocking_scope = BlockingScope.SERVER_SIDE_BLOCK - blocked_score = 0.5 - - for fp_name in matched_fp: - fp = fingerprintdb.get_fp(fp_name) - if fp.scope: - blocking_scope = fp_to_scope(fp.scope) - outcome_meta["fingerprint"] = fp.name - outcome_meta["why"] = "matched fingerprint" - if ( - fp.expected_countries - and web_o.probe_cc in fp.expected_countries - ): - outcome_label = "blocked" - blocked_score = 1.0 - break - - return Outcome( - observation_id=web_o.observation_id, - scope=blocking_scope, - label=outcome_label, - subject=blocking_subject, - category=outcome_category, - detail="blockpage", - meta=outcome_meta, - ok_score=1 - blocked_score, - down_score=0.0, - blocked_score=blocked_score, - ) - - if not request_is_encrypted: - # TODO: We should probably do mining of the body dumps to figure out if there - # are blockpages in there instead of relying on a per-measurement heuristic - - # TODO: we don't have this - # if web_o.http_response_body_sha1 == http_ctrl.response_body_sha1: - # return ok_be - - if ( - web_o.http_response_body_length - and response_body_length - # We need to ignore redirects as we should only be doing matching of the response body on the last element in the chain - and ( - not web_o.http_response_header_location - and not math.floor(web_o.http_response_status_code or 0 / 100) == 3 - ) - and ( - (web_o.http_response_body_length + 1.0) / (response_body_length + 1.0) - < 0.7 - ) - ): - outcome_meta["response_body_length"] = str(web_o.http_response_body_length) - outcome_meta["ctrl_response_body_length"] = str(response_body_length) - blocking_detail = f"http.body-diff" - return Outcome( - observation_id=web_o.observation_id, - scope=BlockingScope.UNKNOWN, - label="", - subject=blocking_subject, - category=outcome_category, - detail=blocking_detail, - meta=outcome_meta, - ok_score=0.3, - down_score=0.0, - blocked_score=0.7, - ) - - return Outcome( - observation_id=web_o.observation_id, - scope=BlockingScope.UNKNOWN, - label="", - subject=blocking_subject, - category=outcome_category, - detail="ok", - meta=outcome_meta, - ok_score=0.8, - down_score=0.0, - blocked_score=0.2, - ) - - -def is_blocked_or_down(o: Optional[Outcome]) 
-> bool: - if not o: - return False - if o.ok_score > 0.5: - return False - return True - - -def is_ip_blocked(dns_outcomes: List[Outcome], ip: Optional[str]) -> bool: - if not ip: - return False - - for outcome in dns_outcomes: - if outcome.subject.startswith(ip): - return is_blocked_or_down(outcome) - return False - - -def make_website_experiment_result( - web_observations: List[WebObservation], - web_ground_truths: List[WebGroundTruth], - body_db: BodyDB, - fingerprintdb: FingerprintDB, -) -> Generator[ExperimentResult, None, None]: - outcomes: List[Outcome] = [] - domain_name = web_observations[0].hostname - - # We need to process HTTP observations after all the others, because we - # arent' guaranteed to have on the same row all connected observations. - # If we don't do that, we will not exclude from our blocking calculations - # cases in which something has already been counted as blocked through other - # means - http_obs = [] - is_tcp_blocked = False - is_tls_blocked = False - - dns_observations_by_hostname = defaultdict(list) - dns_outcomes_by_hostname = {} - other_observations = [] - for web_o in web_observations: - if web_o.dns_query_type: - assert web_o.hostname is not None - dns_observations_by_hostname[web_o.hostname].append(web_o) - else: - other_observations.append(web_o) - - for hostname, dns_observations in dns_observations_by_hostname.items(): - dns_outcomes = make_dns_outcomes( - hostname=hostname, - dns_observations=dns_observations, - web_ground_truths=web_ground_truths, - fingerprintdb=fingerprintdb, - ) - outcomes += dns_outcomes - dns_outcomes_by_hostname[hostname] = dns_outcomes - - for web_o in web_observations: - # FIXME: for the moment we just ignore all IPv6 results, because they are too noisy - if web_o.ip: - try: - ipaddr = ipaddress.ip_address(web_o.ip) - if isinstance(ipaddr, ipaddress.IPv6Address): - continue - except: - log.error(f"Invalid IP in {web_o.ip}") - - request_is_encrypted = ( - web_o.http_request_url and web_o.http_request_url.startswith("https://") - ) - dns_outcomes = dns_outcomes_by_hostname.get(web_o.hostname, []) - - tcp_outcome = None - if not is_ip_blocked(dns_outcomes, web_o.ip) and web_o.tcp_success is not None: - tcp_outcome = make_tcp_outcome( - web_o=web_o, web_ground_truths=web_ground_truths - ) - if is_blocked_or_down(tcp_outcome): - is_tcp_blocked = True - outcomes.append(tcp_outcome) - - tls_outcome = None - # We ignore things that are already blocked by DNS or TCP - if ( - not is_ip_blocked(dns_outcomes, web_o.ip) - and not is_blocked_or_down(tcp_outcome) - and (web_o.tls_failure or web_o.tls_cipher_suite is not None) - ): - tls_outcome = make_tls_outcome( - web_o=web_o, web_ground_truths=web_ground_truths - ) - outcomes.append(tls_outcome) - if is_blocked_or_down(tls_outcome): - is_tls_blocked = True - - # When we don't know the IP of the http_request, we add them to a - # separate http_obs list. - # This is done so we can ignore the HTTP outcome if ANY of the DNS - # outcomes are an indication of blocking, since we can't do a - # consistency check on a specific DNS answer. - if web_o.http_request_url and not web_o.ip: - http_obs.append(web_o) - continue - - # For HTTP requests we ignore cases in which we detected the blocking - # already to be happening via DNS or TCP. 
- if ( - web_o.http_request_url - and ( - not is_ip_blocked(dns_outcomes, web_o.ip) - and not is_blocked_or_down(tcp_outcome) - # For HTTPS requests we ignore cases in which we detected the blocking via DNS, TCP or TLS - ) - and ( - request_is_encrypted == False - or (request_is_encrypted and not is_blocked_or_down(tls_outcome)) - ) - ): - http_outcome = make_http_outcome( - web_o=web_o, - web_ground_truths=web_ground_truths, - body_db=body_db, - fingerprintdb=fingerprintdb, - ) - outcomes.append(http_outcome) - - # Here we examine all of the HTTP Observations that didn't record an IP address - for web_o in http_obs: - is_dns_blocked = any( - [ - is_blocked_or_down(o) - for o in dns_outcomes_by_hostname.get(web_o.hostname, []) - ] - ) - request_is_encrypted = ( - web_o.http_request_url and web_o.http_request_url.startswith("https://") - ) - if ( - web_o.http_request_url - and ( - not is_dns_blocked - and not is_tcp_blocked - # For HTTPS requests we ignore cases in which we detected the blocking via DNS, TCP or TLS - ) - and ( - request_is_encrypted == False - or (request_is_encrypted and not is_tls_blocked) - ) - ): - http_outcome = make_http_outcome( - web_o=web_o, - web_ground_truths=web_ground_truths, - body_db=body_db, - fingerprintdb=fingerprintdb, - ) - outcomes.append(http_outcome) - - max_blocked_score = min(map(lambda o: o.blocked_score, outcomes)) - # TODO: we should probably be computing the anomaly and confirmed summary - # flags directly as part of aggregation. - confirmed = False - for o in outcomes: - if o.label == "blocked": - confirmed = True - anomaly = False - if max_blocked_score > 0.5: - anomaly = True - - return iter_experiment_results( - obs=web_observations[0], - experiment_group="websites", - domain_name=domain_name or "", - target_name=domain_name or "", - anomaly=anomaly, - confirmed=confirmed, - outcomes=outcomes, - ) diff --git a/oonidata/cli/command.py b/oonidata/cli/command.py index 9348bf1a..60b664d2 100644 --- a/oonidata/cli/command.py +++ b/oonidata/cli/command.py @@ -7,6 +7,7 @@ from typing import List, Optional import click +from click_loglevel import LogLevel from oonidata import __version__ from oonidata.dataclient import ( @@ -16,7 +17,6 @@ from oonidata.db.create_tables import create_queries, list_all_table_diffs from oonidata.netinfo import NetinfoDB from oonidata.workers import ( - start_experiment_result_maker, start_fingerprint_hunter, start_observation_maker, start_ground_truth_builder, @@ -71,10 +71,18 @@ def _parse_csv(ctx, param, s: Optional[str]) -> List[str]: @click.group() @click.option("--error-log-file", type=Path) +@click.option( + "-l", + "--log-level", + type=LogLevel(), + default="INFO", + help="Set logging level", + show_default=True, +) @click.version_option(__version__) -def cli(error_log_file: Path): +def cli(error_log_file: Path, log_level: int): log.addHandler(logging.StreamHandler(sys.stderr)) - log.setLevel(logging.INFO) + log.setLevel(log_level) if error_log_file: logging.basicConfig( filename=error_log_file, encoding="utf-8", level=logging.ERROR @@ -244,18 +252,7 @@ def mker( for query, table_name in create_queries: click.echo(f"Running create query for {table_name}") db.execute(query) - - start_experiment_result_maker( - probe_cc=probe_cc, - test_name=test_name, - start_day=start_day, - end_day=end_day, - clickhouse=clickhouse, - data_dir=data_dir, - parallelism=parallelism, - fast_fail=fast_fail, - rebuild_ground_truths=rebuild_ground_truths, - ) + raise Exception("Run this via the analysis command") @cli.command() @@ 
-437,6 +434,7 @@ def checkdb( Check if the database tables require migrations. If the create-tables flag is not specified, it will not perform any operations. """ + if create_tables: if not clickhouse: click.echo("--clickhouse needs to be specified when creating tables") diff --git a/oonidata/dataviz/templates/analysis.html b/oonidata/dataviz/templates/analysis.html new file mode 100644 index 00000000..ef306635 --- /dev/null +++ b/oonidata/dataviz/templates/analysis.html @@ -0,0 +1,250 @@ + + + + + + + + + + + +
+
+
+
+

OONI Data Kraken Analysis

+

When data krakens and LoNIs join forces!

+
+
+
+ +
+ +

Experiment result output

+ +
+
+

{{website_experiment_result["measurement_uid"]}}

+ open in explorer +
+ Time Of Day {{website_experiment_result["timeofday"]}} +
+ +

Target

+
+
+ Target nettest_group {{website_experiment_result["target_nettest_group"]}} +
+
+ Target category {{website_experiment_result["target_category"]}} +
+
+ Target name {{website_experiment_result["target_name"]}} +
+
+ Target domain_name {{website_experiment_result["target_domain_name"]}} +
+
+ Target detail {{website_experiment_result["target_detail"]}} +
+
+ +

Location

+
+ +
+ ASN {{website_experiment_result["location_network_asn"]}} ({{website_experiment_result["location_network_as_org_name"]}}) +
+
+ Network type {{website_experiment_result["location_network_type"]}} +
+
+ Country {{website_experiment_result["location_network_cc"]}} +
+
+ Resolver ASN {{website_experiment_result["location_resolver_asn"]}} ({{website_experiment_result["location_resolver_as_org_name"]}}) +
+
+ Blocking scope {{website_experiment_result["location_blocking_scope"]}} +
+
+ +

Experiment result

+
+
 anomaly {{website_experiment_result["anomaly"]}}
+
 confirmed {{website_experiment_result["confirmed"]}}
+
+ + +
+
+
OK
+
{{loni_ok_value}}
+
+
+
blocked
+
{{loni_blocked_value}}
+
{{loni_blocked_dict}}
+
+
+
down
+
{{loni_down_value}}
+
{{loni_down_dict}}
+
+
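The three summary blocks above are driven by the parallel key/value list fields on MeasurementExperimentResult (see the models/experiment_result.py hunk further down). A minimal sketch of how these template variables could be derived, mirroring what the Flask view in oonidata/dataviz/web.py below does; "wer" is a hypothetical name for a single MeasurementExperimentResult instance:

    # Pair each outcome key with its likelihood and collapse the per-key
    # likelihoods into the single number the template prints per category.
    loni_blocked_dict = dict(zip(wer.loni_blocked_keys, wer.loni_blocked_values))
    loni_blocked_value = sum(wer.loni_blocked_values)
    loni_down_dict = dict(zip(wer.loni_down_keys, wer.loni_down_values))
    loni_down_value = sum(wer.loni_down_values)
    loni_ok_value = wer.loni_ok_value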
+ +
+
+
{{website_experiment_result["measurement_count"]}}
+
measurement count
+
+
+
{{website_experiment_result["observation_count"]}}
+
observation count
+
+
+
{{website_experiment_result["vp_count"]}}
+
vantage point count
+
+
+ +
+
+ + +

Individual LoNIs
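Each loni_list entry is a plain dict carrying "ok", "down" and "blocked" sub-dicts that map an outcome key to a likelihood, plus an "ok_final" float; the loop below renders one table per entry. A hand-written example of such an entry (the outcome keys and the numbers are made up for illustration):

    # Hypothetical entry; in the pipeline these dicts end up JSON-encoded into the
    # measurement_experiment_result table via custom_remap={"loni_list": orjson.dumps}
    # (see the oonidata/workers/analysis.py hunk further down).
    loni_entry = {
        "ok": {"dns": 0.2, "tcp": 0.05},
        "down": {"tls": 0.05},
        "blocked": {"dns": 0.7},
        "ok_final": 0.2,
    }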

+ +{% for loni in loni_list %} +
+
+

 LoNI #{{loop.index}}

+
ok_final: {{loni["ok_final"]}}
+
+ + + + + + + + + + {% for status, value in loni["blocked"].items() %} + + + + + + {% endfor %} + {% for status, value in loni["down"].items() %} + + + + + + {% endfor %} + {% for status, value in loni["ok"].items() %} + + + + + + {% endfor %} + +
 OutcomeSpace | OutcomeStatus | Likelihood
 blocked | {{status}} | {{value}}
 down | {{status}} | {{value}}
 ok | {{status}} | {{value}}
+
+

Analysis transcript

+
+            {% for analysis_line in analysis_transcript_list[loop.index0] %}
+{{ analysis_line }}
+            {% endfor %}
+            
+
+
+{% endfor %} + + +
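The transcript block inside the loop relies on loni_list and analysis_transcript_list being parallel lists, which is why it indexes with loop.index0: analysis_transcript_list is a List[List[str]] holding one list of reasoning lines per LoNI. A sketch of consuming the two together outside the template, with "wer" again a hypothetical MeasurementExperimentResult instance:

    # Walk each LoNI together with the analysis lines that produced it.
    for loni, transcript_lines in zip(wer.loni_list, wer.analysis_transcript_list):
        print(loni["ok_final"])
        print("\n".join(transcript_lines))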

Analysis output

+ +{% for wa in web_analysis %} +

Analysis #{{loop.index}}

+
+ + + + + + + + + {% for key, value in wa.items() %} + + + + + {% endfor %} + +
 key | value
 {{key}} | {{value}}
+
+{% endfor %} + +

Web Observations

+{% for wo in web_observations %} +

Observation #{{loop.index}}

+
+ + + + + + + + + {% for key, value in wo.items() %} + + + + + {% endfor %} + +
 key | value
 {{key}} | {{value}}
+
+{% endfor %} + +
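The key/value tables rendered by the two loops above, and the experiment result table that follows, are generic dataclass dumps: the view converts each WebAnalysis, each WebObservation and the MeasurementExperimentResult with dataclasses.asdict() before rendering. A minimal sketch, where "wa", "wo" and "wer" are hypothetical instances of those three classes:

    from dataclasses import asdict

    # Each .items() view yields the (key, value) rows the template iterates over.
    analysis_rows = asdict(wa).items()
    observation_rows = asdict(wo).items()
    experiment_result_rows = asdict(wer).items()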
+ + + + + + + + + {% for key, value in website_experiment_result.items() %} + + + + + {% endfor %} + +
 key | value
 {{key}} | {{value}}
+
+ + + +
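For local debugging, the analysis route added in the next hunk can be exercised roughly as follows. This is a sketch: the <measurement_uid> URL parameter name is an assumption based on the analysis_by_msmt(measurement_uid) signature, and the view expects the fingerprint and netinfo databases to already exist under tests/data/datadir/:

    # Flask >= 2.2 command line; oonidata.dataviz.web exposes a module-level "app".
    #
    #   flask --app oonidata.dataviz.web run
    #   curl http://127.0.0.1:5000/analysis/m/<measurement_uid>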
+ + + diff --git a/oonidata/dataviz/web.py b/oonidata/dataviz/web.py index 43087550..88cd66a0 100644 --- a/oonidata/dataviz/web.py +++ b/oonidata/dataviz/web.py @@ -1,3 +1,15 @@ +from dataclasses import asdict +import json +from pathlib import Path +from oonidata.analysis.control import ( + BodyDB, + WebGroundTruthDB, + iter_ground_truths_from_web_control, +) +from oonidata.analysis.datasources import load_measurement +from oonidata.analysis.web_analysis import make_web_analysis +from oonidata.analysis.website_experiment_results import make_website_experiment_results +from oonidata.apiclient import get_measurement_dict_by_uid from oonidata.dataviz.viz import ( plot_blocking_world_map, plot_blocking_of_domain_in_asn, @@ -11,10 +23,87 @@ get_df_dns_analysis_raw, ) from flask import Flask, request, render_template +from oonidata.fingerprintdb import FingerprintDB +from oonidata.netinfo import NetinfoDB + +from oonidata.transforms import measurement_to_observations app = Flask(__name__) +def to_pretty_json(value): + return json.dumps( + value, sort_keys=True, indent=4, separators=(",", ": "), default=str + ) + + +app.jinja_env.filters["tojson_pretty"] = to_pretty_json + + +@app.route("/analysis/m/") +def analysis_by_msmt(measurement_uid): + data_dir = Path("tests/data/datadir/") + + fingerprintdb = FingerprintDB(datadir=data_dir, download=False) + netinfodb = NetinfoDB(datadir=data_dir, download=False) + raw_msmt = get_measurement_dict_by_uid(measurement_uid) + msmt = load_measurement(msmt=raw_msmt) + web_observations, web_control_observations = measurement_to_observations( + msmt, netinfodb=netinfodb + ) + web_ground_truth_db = WebGroundTruthDB() + web_ground_truth_db.build_from_rows( + rows=iter_ground_truths_from_web_control( + web_control_observations=web_control_observations, + netinfodb=netinfodb, + ), + ) + + web_ground_truths = web_ground_truth_db.lookup_by_web_obs(web_obs=web_observations) + web_analysis = list( + make_web_analysis( + web_observations=web_observations, + web_ground_truths=web_ground_truths, + body_db=BodyDB(db=None), # type: ignore + fingerprintdb=fingerprintdb, + ) + ) + + # assert len(web_analysis) == len( + # web_observations + # ), f"web_analysis != web_obs {len(web_analysis)} != {len(web_observations)}" + # for wa in web_analysis: + # print_nice_vertical(wa) + + website_er = list(make_website_experiment_results(web_analysis)) + assert len(website_er) == 1 + + wer = website_er[0] + analysis_transcript_list = wer.analysis_transcript_list + + # wer.analysis_transcript_list = None + # print_nice_vertical(wer) + # for loni in loni_list: + # pprint(loni.to_dict()) + # print(analysis_transcript_list) + + return render_template( + "analysis.html", + website_experiment_result=asdict(wer), + analysis_transcript_list=analysis_transcript_list, + loni_list=wer.loni_list, + raw_msmt=raw_msmt, + measurement_uid=measurement_uid, + web_analysis=list(map(lambda x: asdict(x), web_analysis)), + web_observations=list(map(lambda x: asdict(x), web_observations)), + loni_blocked_dict=dict(zip(wer.loni_blocked_keys, wer.loni_blocked_values)), + loni_blocked_value=sum(wer.loni_blocked_values), + loni_down_dict=dict(zip(wer.loni_down_keys, wer.loni_down_values)), + loni_down_value=sum(wer.loni_down_values), + loni_ok_value=wer.loni_ok_value, + ) + + @app.route("/api/_/viz/data/world_map") def data_world_map(): blocking_threshold = float(request.args.get("blocking_threshold", 0.7)) diff --git a/oonidata/db/connections.py b/oonidata/db/connections.py index 0e4daeaf..43f29eec 100644 --- 
a/oonidata/db/connections.py +++ b/oonidata/db/connections.py @@ -3,7 +3,7 @@ import time from collections import defaultdict, namedtuple -from datetime import datetime +from datetime import datetime, timezone from pprint import pformat import logging @@ -114,7 +114,7 @@ def __init__(self, output_dir): def write_rows(self, table_name, rows, column_names): if table_name not in self.open_writers: - ts = datetime.utcnow().strftime("%Y%m%dT%H%M%S") + ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S") fh = (self.output_dir / f"{table_name}-{ts}.csv").open("w") csv_writer = csv.writer(fh) csv_writer.writerow(column_names) diff --git a/oonidata/db/create_tables.py b/oonidata/db/create_tables.py index 4c967427..8485b6a1 100644 --- a/oonidata/db/create_tables.py +++ b/oonidata/db/create_tables.py @@ -6,6 +6,7 @@ from oonidata.db.connections import ClickhouseConnection from oonidata.models.experiment_result import ( ExperimentResult, + MeasurementExperimentResult, ) from oonidata.models.analysis import WebAnalysis from oonidata.models.observations import ( @@ -61,9 +62,19 @@ def typing_to_clickhouse(t: Any) -> str: if t == dict: return "String" + # Casting it to JSON + if t == List[Dict]: + return "String" + if t == Optional[float]: return "Nullable(Float64)" + if t == List[float]: + return "Array(Float64)" + + if t == List[List[str]]: + return "Array(Array(String))" + if t in (List[str], List[bytes]): return "Array(String)" @@ -104,29 +115,6 @@ def create_query_for_observation(obs_class: Type[ObservationBase]) -> Tuple[str, ) -def create_query_for_experiment_result() -> Tuple[str, str]: - columns = [] - for f in ExperimentResult._fields: - t = ExperimentResult.__annotations__.get(f) - type_str = typing_to_clickhouse(t) - columns.append(f" {f} {type_str}") - - columns_str = ",\n".join(columns) - table_name = ExperimentResult.__table_name__ - - return ( - f""" - CREATE TABLE IF NOT EXISTS {table_name} ( -{columns_str} - ) - ENGINE = ReplacingMergeTree - ORDER BY (measurement_uid, experiment_result_id) - SETTINGS index_granularity = 8192; - """, - "experiment_result", - ) - - def create_query_for_analysis(base_class) -> Tuple[str, str]: columns = [] for f in fields(base_class): @@ -169,8 +157,8 @@ def create_query_for_analysis(base_class) -> Tuple[str, str]: create_query_for_observation(WebObservation), create_query_for_observation(WebControlObservation), create_query_for_observation(HTTPMiddleboxObservation), - create_query_for_experiment_result(), create_query_for_analysis(WebAnalysis), + create_query_for_analysis(MeasurementExperimentResult), CREATE_QUERY_FOR_LOGS, ] diff --git a/oonidata/models/analysis.py b/oonidata/models/analysis.py index 7cedd4be..382e78f8 100644 --- a/oonidata/models/analysis.py +++ b/oonidata/models/analysis.py @@ -67,6 +67,7 @@ class WebAnalysis: dns_consistency_system_is_answer_probe_cc_match: Optional[bool] = None dns_consistency_system_is_answer_bogon: Optional[bool] = None dns_consistency_system_answer_fp_name: Optional[str] = None + dns_consistency_system_answer_fp_scope: Optional[str] = None dns_consistency_system_is_answer_fp_match: Optional[bool] = None dns_consistency_system_is_answer_fp_country_consistent: Optional[bool] = None dns_consistency_system_is_answer_fp_false_positive: Optional[bool] = None @@ -88,6 +89,7 @@ class WebAnalysis: dns_consistency_other_is_answer_probe_cc_match: Optional[bool] = None dns_consistency_other_is_answer_bogon: Optional[bool] = None dns_consistency_other_answer_fp_name: Optional[str] = None + 
dns_consistency_other_answer_fp_scope: Optional[str] = None dns_consistency_other_is_answer_fp_match: Optional[bool] = None dns_consistency_other_is_answer_fp_country_consistent: Optional[bool] = None dns_consistency_other_is_answer_fp_false_positive: Optional[bool] = None @@ -112,6 +114,7 @@ class WebAnalysis: tls_ground_truth_trusted_ok_count: Optional[int] = None tcp_address: Optional[str] = None tcp_success: Optional[bool] = None + tcp_failure: Optional[str] = None tcp_ground_truth_failure_count: Optional[int] = None tcp_ground_truth_failure_asn_cc_count: Optional[int] = None tcp_ground_truth_ok_count: Optional[int] = None @@ -132,6 +135,7 @@ class WebAnalysis: http_ground_truth_trusted_failure_count: Optional[int] = None http_ground_truth_body_length: Optional[int] = None http_fp_name: Optional[str] = None + http_fp_scope: Optional[str] = None http_is_http_fp_match: Optional[bool] = None http_is_http_fp_country_consistent: Optional[bool] = None http_is_http_fp_false_positive: Optional[bool] = None diff --git a/oonidata/models/experiment_result.py b/oonidata/models/experiment_result.py index 7fc9395f..939fe7b2 100644 --- a/oonidata/models/experiment_result.py +++ b/oonidata/models/experiment_result.py @@ -1,8 +1,9 @@ import dataclasses +from dataclasses import dataclass import logging from typing import Any, Dict, Generator, List, Optional, NamedTuple, Mapping, Tuple from enum import Enum -from datetime import datetime +from datetime import datetime, timezone from tabulate import tabulate from oonidata.datautils import maybe_elipse @@ -68,16 +69,19 @@ class Outcome(NamedTuple): blocked_score: float -class ExperimentResultInstant(NamedTuple): - __table_name__ = "experiment_result_instant" - - experiment_result_id: str +@dataclass +class MeasurementExperimentResult: + __table_name__ = "measurement_experiment_result" + __table_index__ = ( + "measurement_uid", + "timeofday", + ) # The measurement used to generate this experiment result measurement_uid: str # The list of observations used to generate this experiment result - observation_id: List[str] + observation_id_list: List[str] # The timeofday for which this experiment result is relevant. 
We use the # timeofday convention to differentiate it from the timestamp which is an @@ -161,9 +165,15 @@ class ExperimentResultInstant(NamedTuple): loni_blocked_keys: List[str] loni_blocked_values: List[float] + loni_ok_keys: List[str] + loni_ok_values: List[float] + + # Encoded as JSON + loni_list: List[Dict] + # Inside this string we include a representation of the logic that lead us # to produce the above loni values - analysis_transcript: Optional[str] + analysis_transcript_list: List[List[str]] # Number of measurements used to produce this experiment result measurement_count: int @@ -173,8 +183,8 @@ class ExperimentResultInstant(NamedTuple): vp_count: int # Backward compatible anomaly/confirmed flags - anomaly: bool - confirmed: bool + anomaly: Optional[bool] + confirmed: Optional[bool] class ExperimentResult(NamedTuple): @@ -267,7 +277,7 @@ def iter_experiment_results( target_name: str, outcomes: List[Outcome], ) -> Generator[ExperimentResult, None, None]: - created_at = datetime.utcnow() + created_at = datetime.now(timezone.utc).replace(tzinfo=None) for idx, outcome in enumerate(outcomes): yield ExperimentResult( measurement_uid=obs.measurement_uid, diff --git a/oonidata/transforms/nettests/http_header_field_manipulation.py b/oonidata/transforms/nettests/http_header_field_manipulation.py index 5e1774b3..f447f50a 100644 --- a/oonidata/transforms/nettests/http_header_field_manipulation.py +++ b/oonidata/transforms/nettests/http_header_field_manipulation.py @@ -1,5 +1,5 @@ import dataclasses -from datetime import datetime +from datetime import datetime, timezone import orjson from typing import List, Tuple from oonidata.models.nettests import HTTPHeaderFieldManipulation @@ -14,7 +14,7 @@ def make_observations( mb_obs = HTTPMiddleboxObservation( hfm_success=True, observation_id=f"{msmt.measurement_uid}_0", - created_at=datetime.utcnow().replace(microsecond=0), + created_at=datetime.now(timezone.utc).replace(microsecond=0, tzinfo=None), **dataclasses.asdict(self.measurement_meta), ) diff --git a/oonidata/transforms/nettests/http_invalid_request_line.py b/oonidata/transforms/nettests/http_invalid_request_line.py index 597d0863..b789a2e5 100644 --- a/oonidata/transforms/nettests/http_invalid_request_line.py +++ b/oonidata/transforms/nettests/http_invalid_request_line.py @@ -1,5 +1,5 @@ import dataclasses -from datetime import datetime +from datetime import datetime, timezone from typing import List, Tuple from oonidata.models.dataformats import maybe_binary_data_to_bytes from oonidata.models.nettests import HTTPInvalidRequestLine @@ -54,7 +54,7 @@ def make_observations( mb_obs = HTTPMiddleboxObservation( hirl_success=True, observation_id=f"{msmt.measurement_uid}_0", - created_at=datetime.utcnow().replace(microsecond=0), + created_at=datetime.now(timezone.utc).replace(microsecond=0, tzinfo=None), **dataclasses.asdict(self.measurement_meta), ) if not msmt.test_keys.sent: diff --git a/oonidata/transforms/nettests/measurement_transformer.py b/oonidata/transforms/nettests/measurement_transformer.py index 8973225b..a282e9a0 100644 --- a/oonidata/transforms/nettests/measurement_transformer.py +++ b/oonidata/transforms/nettests/measurement_transformer.py @@ -5,7 +5,7 @@ import dataclasses import logging from urllib.parse import urlparse, urlsplit -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from typing import ( Callable, Optional, @@ -134,11 +134,14 @@ ) -def normalize_failure(failure: Failure): +def normalize_failure(failure: Union[Failure, bool]) 
-> Failure: if not failure: # This will set it to None even when it's false return None + if failure is True: + return "true" + if failure.startswith("unknown_failure"): for substring, new_failure in unknown_failure_map: if substring in failure: @@ -445,7 +448,7 @@ def maybe_set_web_fields( ], web_obs: WebObservation, prefix: str, - field_names: Tuple[str], + field_names: Tuple[str, ...], ): # TODO: the fact we have to do this is an artifact of the original # one-observation per type model. Once we decide we don't want to go for @@ -899,7 +902,7 @@ def consume_web_observations( for idx, obs in enumerate(web_obs_list): obs.observation_id = f"{obs.measurement_uid}_{idx}" - obs.created_at = datetime.utcnow().replace(microsecond=0) + obs.created_at = datetime.now(timezone.utc).replace(microsecond=0, tzinfo=None) return web_obs_list diff --git a/oonidata/transforms/nettests/web_connectivity.py b/oonidata/transforms/nettests/web_connectivity.py index 9b9a856e..be8ac276 100644 --- a/oonidata/transforms/nettests/web_connectivity.py +++ b/oonidata/transforms/nettests/web_connectivity.py @@ -1,5 +1,5 @@ from copy import deepcopy -from datetime import datetime +from datetime import datetime, timezone from typing import Dict, List, Tuple from urllib.parse import urlparse from oonidata.datautils import is_ip_bogon @@ -41,7 +41,7 @@ def make_web_control_observations( test_name=msmt.test_name, test_version=msmt.test_version, hostname=hostname, - created_at=datetime.utcnow(), + created_at=datetime.now(timezone.utc).replace(tzinfo=None), ) # Reference for new-style web_connectivity: # https://explorer.ooni.org/measurement/20220924T215758Z_webconnectivity_IR_206065_n1_2CRoWBNJkWc7VyAs?input=https%3A%2F%2Fdoh.dns.apple.com%2Fdns-query%3Fdns%3Dq80BAAABAAAAAAAAA3d3dwdleGFtcGxlA2NvbQAAAQAB diff --git a/oonidata/workers/__init__.py b/oonidata/workers/__init__.py index 2a0a3137..f22d4557 100644 --- a/oonidata/workers/__init__.py +++ b/oonidata/workers/__init__.py @@ -1,4 +1,3 @@ -from .experiment_results import start_experiment_result_maker from .fingerprint_hunter import start_fingerprint_hunter from .observations import start_observation_maker from .ground_truths import start_ground_truth_builder diff --git a/oonidata/workers/analysis.py b/oonidata/workers/analysis.py index dad0ab24..ffc3a953 100644 --- a/oonidata/workers/analysis.py +++ b/oonidata/workers/analysis.py @@ -3,6 +3,7 @@ import pathlib from datetime import date, datetime from typing import Dict, List +import orjson import statsd @@ -14,11 +15,13 @@ from oonidata.analysis.control import BodyDB, WebGroundTruthDB from oonidata.analysis.datasources import iter_web_observations from oonidata.analysis.web_analysis import make_web_analysis +from oonidata.analysis.website_experiment_results import make_website_experiment_results from oonidata.dataclient import date_interval from oonidata.datautils import PerfTimer from oonidata.db.connections import ClickhouseConnection from oonidata.fingerprintdb import FingerprintDB from oonidata.models.analysis import WebAnalysis +from oonidata.models.experiment_result import MeasurementExperimentResult from oonidata.netinfo import NetinfoDB from oonidata.workers.ground_truths import maybe_build_web_ground_truth @@ -27,6 +30,7 @@ get_prev_range, make_db_rows, maybe_delete_prev_range, + optimize_all_tables, ) log = logging.getLogger("oonidata.processing") @@ -58,22 +62,37 @@ def make_analysis_in_a_day( fast_fail: bool, day: date, ): + log.info("Optimizing all tables") + optimize_all_tables(clickhouse) + statsd_client = 
statsd.StatsClient("localhost", 8125) fingerprintdb = FingerprintDB(datadir=data_dir, download=False) body_db = BodyDB(db=ClickhouseConnection(clickhouse)) db_writer = ClickhouseConnection(clickhouse, row_buffer_size=10_000) db_lookup = ClickhouseConnection(clickhouse) - column_names = [f.name for f in dataclasses.fields(WebAnalysis)] + column_names_wa = [f.name for f in dataclasses.fields(WebAnalysis)] + column_names_er = [f.name for f in dataclasses.fields(MeasurementExperimentResult)] - prev_range = get_prev_range( - db=db_lookup, - table_name=WebAnalysis.__table_name__, - timestamp=datetime.combine(day, datetime.min.time()), - test_name=[], - probe_cc=probe_cc, - timestamp_column="measurement_start_time", - ) + prev_range_list = [ + get_prev_range( + db=db_lookup, + table_name=WebAnalysis.__table_name__, + timestamp=datetime.combine(day, datetime.min.time()), + test_name=[], + probe_cc=probe_cc, + timestamp_column="measurement_start_time", + ), + get_prev_range( + db=db_lookup, + table_name=MeasurementExperimentResult.__table_name__, + timestamp=datetime.combine(day, datetime.min.time()), + test_name=[], + probe_cc=probe_cc, + timestamp_column="timeofday", + probe_cc_column="location_network_cc", + ), + ] log.info(f"loading ground truth DB for {day}") t = PerfTimer() @@ -110,12 +129,13 @@ def make_analysis_in_a_day( fingerprintdb=fingerprintdb, ) ) + log.info(f"generated {len(website_analysis)} website_analysis") if len(website_analysis) == 0: log.info(f"no website analysis for {probe_cc}, {test_name}") continue idx += 1 table_name, rows = make_db_rows( - dc_list=website_analysis, column_names=column_names + dc_list=website_analysis, column_names=column_names_wa ) statsd_client.incr("oonidata.web_analysis.analysis.obs", 1, rate=0.1) # type: ignore statsd_client.gauge("oonidata.web_analysis.analysis.obs_idx", idx, rate=0.1) # type: ignore @@ -125,15 +145,31 @@ def make_analysis_in_a_day( db_writer.write_rows( table_name=table_name, rows=rows, - column_names=column_names, + column_names=column_names_wa, + ) + + with statsd_client.timer("oonidata.web_analysis.experiment_results.timing"): + website_er = list(make_website_experiment_results(website_analysis)) + log.info(f"generated {len(website_er)} website_er") + table_name, rows = make_db_rows( + dc_list=website_er, + column_names=column_names_er, + custom_remap={"loni_list": orjson.dumps}, ) + + db_writer.write_rows( + table_name=table_name, + rows=rows, + column_names=column_names_er, + ) + except: web_obs_ids = ",".join(map(lambda wo: wo.observation_id, web_obs)) log.error(f"failed to generate analysis for {web_obs_ids}", exc_info=True) - maybe_delete_prev_range( - db=db_lookup, prev_range=prev_range, table_name=WebAnalysis.__table_name__ - ) + for prev_range in prev_range_list: + maybe_delete_prev_range(db=db_lookup, prev_range=prev_range) + db_writer.close() return idx @@ -175,11 +211,12 @@ def make_cc_batches( cc_batches = [] current_cc_batch_size = 0 current_cc_batch = [] - while cnt_by_cc: + cnt_by_cc_sorted = sorted(cnt_by_cc.items(), key=lambda x: x[0]) + while cnt_by_cc_sorted: while current_cc_batch_size <= max_obs_per_batch: try: - cc, cnt = cnt_by_cc.popitem() - except KeyError: + cc, cnt = cnt_by_cc_sorted.pop() + except IndexError: break current_cc_batch.append(cc) current_cc_batch_size += cnt @@ -261,3 +298,4 @@ def start_analysis( obs_per_sec = round(total_obs_count / t_total.s) log.info(f"finished processing {start_day} - {end_day} speed: {obs_per_sec}obs/s)") log.info(f"{total_obs_count} msmts in {t_total.pretty}") + 
dask_client.shutdown() diff --git a/oonidata/workers/common.py b/oonidata/workers/common.py index 7a5346f8..76d6671d 100644 --- a/oonidata/workers/common.py +++ b/oonidata/workers/common.py @@ -1,3 +1,4 @@ +from dataclasses import dataclass import queue import logging import multiprocessing as mp @@ -6,6 +7,8 @@ from datetime import date, datetime, timedelta from typing import ( + Any, + Callable, Dict, List, NamedTuple, @@ -16,22 +19,83 @@ from tqdm import tqdm from oonidata.dataclient import ( MeasurementListProgress, - ProgressStatus, ) from oonidata.db.connections import ( ClickhouseConnection, ) +from oonidata.db.create_tables import create_queries log = logging.getLogger("oonidata.processing") -class PrevRange(NamedTuple): +@dataclass +class BatchParameters: + test_name: List[str] + probe_cc: List[str] bucket_date: Optional[str] - start_timestamp: Optional[datetime] - end_timestamp: Optional[datetime] - max_created_at: Optional[datetime] - min_created_at: Optional[datetime] - where: str + timestamp: Optional[datetime] + + +@dataclass +class PrevRange: + table_name: str + batch_parameters: BatchParameters + timestamp_column: Optional[str] + probe_cc_column: Optional[str] + max_created_at: Optional[datetime] = None + min_created_at: Optional[datetime] = None + + def format_query(self): + start_timestamp = None + end_timestamp = None + where = None + where = "WHERE " + q_args: Dict[str, Any] = {} + + if self.batch_parameters.bucket_date: + where = "WHERE bucket_date = %(bucket_date)s" + q_args["bucket_date"] = self.batch_parameters.bucket_date + + elif self.batch_parameters.timestamp: + start_timestamp = self.batch_parameters.timestamp + end_timestamp = start_timestamp + timedelta(days=1) + q_args["start_timestamp"] = start_timestamp + q_args["end_timestamp"] = end_timestamp + where += f"{self.timestamp_column} >= %(start_timestamp)s AND {self.timestamp_column} < %(end_timestamp)s" + else: + raise Exception("Must specify either bucket_date or timestamp") + + if len(self.batch_parameters.test_name) > 0: + where += " AND test_name IN %(test_names)s" + q_args["test_names"] = self.batch_parameters.test_name + if len(self.batch_parameters.probe_cc) > 0: + where += f" AND {self.probe_cc_column} IN %(probe_ccs)s" + q_args["probe_ccs"] = self.batch_parameters.probe_cc + + return where, q_args + + +def maybe_delete_prev_range(db: ClickhouseConnection, prev_range: PrevRange): + """ + We perform a lightweight delete of all the rows which have been + regenerated, so we don't have any duplicates in the table + """ + if not prev_range.max_created_at or not prev_range.min_created_at: + return + + # Disabled due to: https://github.com/ClickHouse/ClickHouse/issues/40651 + # db.execute("SET allow_experimental_lightweight_delete = true;") + + where, q_args = prev_range.format_query() + + q_args["max_created_at"] = prev_range.max_created_at + q_args["min_created_at"] = prev_range.min_created_at + where = f"{where} AND created_at <= %(max_created_at)s AND created_at >= %(min_created_at)s" + log.info(f"runing {where} with {q_args}") + + q = f"ALTER TABLE {prev_range.table_name} DELETE " + final_query = q + where + return db.execute(final_query, q_args) def get_prev_range( @@ -42,6 +106,7 @@ def get_prev_range( bucket_date: Optional[str] = None, timestamp: Optional[datetime] = None, timestamp_column: str = "timestamp", + probe_cc_column: str = "probe_cc", ) -> PrevRange: """ We lookup the range of previously generated rows so we can drop @@ -65,54 +130,47 @@ def get_prev_range( bucket as reprocessing in 
progress and guard against running queries for it. """ - q = f"SELECT MAX(created_at), MIN(created_at) FROM {table_name} " + # A batch specified by test_name, probe_cc and one of either bucket_date or + # timestamp depending on it being observations or experiment results. assert ( timestamp or bucket_date ), "either timestamp or bucket_date should be provided" - start_timestamp = None - end_timestamp = None - where = None - where = "WHERE bucket_date = %(bucket_date)s" - q_args = {"bucket_date": bucket_date} - if timestamp: - start_timestamp = timestamp - end_timestamp = timestamp + timedelta(days=1) - q_args = {"start_timestamp": start_timestamp, "end_timestamp": end_timestamp} - where = f"WHERE {timestamp_column} >= %(start_timestamp)s AND {timestamp_column} < %(end_timestamp)s" - - if len(test_name) > 0: - test_name_list = [] - for tn in test_name: - # sanitize the test_names. It should not be a security issue since - # it's not user provided, but better safe than sorry - assert tn.replace("_", "").isalnum(), f"not alphabetic testname {tn}" - test_name_list.append(f"'{tn}'") - where += " AND test_name IN ({})".format(",".join(test_name_list)) - if len(probe_cc) > 0: - probe_cc_list = [] - for cc in probe_cc: - assert cc.replace("_", "").isalnum(), f"not alphabetic probe_cc" - probe_cc_list.append(f"'{cc}'") - where += " AND probe_cc IN ({})".format(",".join(probe_cc_list)) - - prev_obs_range = db.execute(q + where, q_args) + prev_range = PrevRange( + table_name=table_name, + batch_parameters=BatchParameters( + test_name=test_name, + probe_cc=probe_cc, + timestamp=timestamp, + bucket_date=bucket_date, + ), + timestamp_column=timestamp_column, + probe_cc_column=probe_cc_column, + ) + + q = f"SELECT MAX(created_at), MIN(created_at) FROM {prev_range.table_name} " + where, q_args = prev_range.format_query() + final_query = q + where + prev_obs_range = db.execute(final_query, q_args) assert isinstance(prev_obs_range, list) and len(prev_obs_range) == 1 max_created_at, min_created_at = prev_obs_range[0] # We pad it by 1 second to take into account the time resolution downgrade # happening when going from clickhouse to python data types if max_created_at and min_created_at: - max_created_at += timedelta(seconds=1) - min_created_at -= timedelta(seconds=1) - - return PrevRange( - max_created_at=max_created_at, - min_created_at=min_created_at, - start_timestamp=start_timestamp, - end_timestamp=end_timestamp, - where=where, - bucket_date=bucket_date, - ) + prev_range.max_created_at = (max_created_at + timedelta(seconds=1)).replace( + tzinfo=None + ) + prev_range.min_created_at = (min_created_at - timedelta(seconds=1)).replace( + tzinfo=None + ) + + return prev_range + + +def optimize_all_tables(clickhouse): + with ClickhouseConnection(clickhouse) as db: + for _, table_name in create_queries: + db.execute(f"OPTIMIZE TABLE {table_name}") def get_obs_count_by_cc( @@ -130,46 +188,26 @@ def get_obs_count_by_cc( return dict(cc_list) -def maybe_delete_prev_range( - db: ClickhouseConnection, table_name: str, prev_range: PrevRange -): - """ - We perform a lightweight delete of all the rows which have been - regenerated, so we don't have any duplicates in the table - """ - if not prev_range.max_created_at: - return - - # Disabled due to: https://github.com/ClickHouse/ClickHouse/issues/40651 - # db.execute("SET allow_experimental_lightweight_delete = true;") - q_args = { - "max_created_at": prev_range.max_created_at, - "min_created_at": prev_range.min_created_at, - } - if prev_range.bucket_date: - 
q_args["bucket_date"] = prev_range.bucket_date - elif prev_range.start_timestamp: - q_args["start_timestamp"] = prev_range.start_timestamp - q_args["end_timestamp"] = prev_range.end_timestamp - else: - raise Exception("either bucket_date or timestamps should be set") - - where = f"{prev_range.where} AND created_at <= %(max_created_at)s AND created_at >= %(min_created_at)s" - return db.execute(f"ALTER TABLE {table_name} DELETE " + where, q_args) - - def make_db_rows( - dc_list: List, column_names: List[str], bucket_date: Optional[str] = None + dc_list: List, + column_names: List[str], + bucket_date: Optional[str] = None, + custom_remap: Optional[Dict[str, Callable]] = None, ) -> Tuple[str, List[str]]: assert len(dc_list) > 0 + def maybe_remap(k, value): + if custom_remap and k in custom_remap: + return custom_remap[k](value) + return value + table_name = dc_list[0].__table_name__ rows = [] for d in dc_list: if bucket_date: d.bucket_date = bucket_date assert table_name == d.__table_name__, "inconsistent group of observations" - rows.append(tuple(getattr(d, k) for k in column_names)) + rows.append(tuple(maybe_remap(k, getattr(d, k)) for k in column_names)) return table_name, rows diff --git a/oonidata/workers/experiment_results.py b/oonidata/workers/experiment_results.py deleted file mode 100644 index 4f60f9cc..00000000 --- a/oonidata/workers/experiment_results.py +++ /dev/null @@ -1,258 +0,0 @@ -import logging -import multiprocessing as mp -import pathlib -import queue -from datetime import date, datetime -from multiprocessing.synchronize import Event as EventClass -from threading import Thread -from typing import List - -import statsd - -from oonidata.analysis.control import BodyDB, WebGroundTruthDB -from oonidata.analysis.datasources import iter_web_observations -from oonidata.analysis.websites import make_website_experiment_result -from oonidata.dataclient import date_interval -from oonidata.datautils import PerfTimer -from oonidata.db.connections import ClickhouseConnection -from oonidata.fingerprintdb import FingerprintDB -from oonidata.models.experiment_result import ExperimentResult -from oonidata.netinfo import NetinfoDB -from oonidata.workers.ground_truths import maybe_build_web_ground_truth - -from .common import ( - get_prev_range, - make_db_rows, - maybe_delete_prev_range, - run_progress_thread, -) - -log = logging.getLogger("oonidata.processing") - - -def run_experiment_results( - day: date, - probe_cc: List[str], - fingerprintdb: FingerprintDB, - data_dir: pathlib.Path, - body_db: BodyDB, - db_writer: ClickhouseConnection, - clickhouse: str, -): - statsd_client = statsd.StatsClient("localhost", 8125) - - column_names = [f for f in ExperimentResult._fields] - db_lookup = ClickhouseConnection(clickhouse) - - prev_range = get_prev_range( - db=db_lookup, - table_name=ExperimentResult.__table_name__, - timestamp=datetime.combine(day, datetime.min.time()), - test_name=[], - probe_cc=probe_cc, - ) - - log.info(f"loading ground truth DB for {day}") - t = PerfTimer() - ground_truth_db_path = ( - data_dir / "ground_truths" / f"web-{day.strftime('%Y-%m-%d')}.sqlite3" - ) - web_ground_truth_db = WebGroundTruthDB() - web_ground_truth_db.build_from_existing(str(ground_truth_db_path.absolute())) - statsd_client.timing("wgt_er_all.timed", t.ms) - log.info(f"loaded ground truth DB for {day} in {t.pretty}") - - idx = 0 - for web_obs in iter_web_observations( - db_lookup, measurement_day=day, probe_cc=probe_cc, test_name="web_connectivity" - ): - try: - t_er_gen = PerfTimer() - t = PerfTimer() - 
relevant_gts = web_ground_truth_db.lookup_by_web_obs(web_obs=web_obs) - except: - log.error( - f"failed to lookup relevant_gts for {web_obs[0].measurement_uid}", - exc_info=True, - ) - continue - - try: - if statsd_client: - statsd_client.timing("wgt_er_reduced.timed", t.ms) - experiment_results = list( - make_website_experiment_result( - web_observations=web_obs, - body_db=body_db, - web_ground_truths=relevant_gts, - fingerprintdb=fingerprintdb, - ) - ) - idx += 1 - table_name, rows = make_db_rows( - dc_list=experiment_results, column_names=column_names - ) - if idx % 100 == 0: - statsd_client.incr("make_website_er.er_count", count=100) - statsd_client.gauge("make_website_er.er_gauge", 100, delta=True) - idx = 0 - - statsd_client.timing("make_website_er.timing", t_er_gen.ms) - - with statsd_client.timer("db_write_rows.timing"): - db_writer.write_rows( - table_name=table_name, - rows=rows, - column_names=column_names, - ) - yield experiment_results - except: - web_obs_ids = ",".join(map(lambda wo: wo.observation_id, web_obs)) - log.error(f"failed to generate er for {web_obs_ids}", exc_info=True) - - maybe_delete_prev_range( - db=db_lookup, prev_range=prev_range, table_name=ExperimentResult.__table_name__ - ) - - -class ExperimentResultMakerWorker(mp.Process): - def __init__( - self, - day_queue: mp.JoinableQueue, - progress_queue: mp.Queue, - shutdown_event: EventClass, - data_dir: pathlib.Path, - probe_cc: List[str], - test_name: List[str], - clickhouse: str, - fast_fail: bool, - log_level: int = logging.INFO, - ): - super().__init__(daemon=True) - self.day_queue = day_queue - self.progress_queue = progress_queue - self.probe_cc = probe_cc - self.test_name = test_name - self.clickhouse = clickhouse - self.fast_fail = fast_fail - self.data_dir = data_dir - - self.shutdown_event = shutdown_event - log.setLevel(log_level) - - def run(self): - - db_writer = ClickhouseConnection(self.clickhouse, row_buffer_size=10_000) - fingerprintdb = FingerprintDB(datadir=self.data_dir, download=False) - - body_db = BodyDB(db=ClickhouseConnection(self.clickhouse)) - - while not self.shutdown_event.is_set(): - try: - day = self.day_queue.get(block=True, timeout=0.1) - except queue.Empty: - continue - - log.info(f"generating experiment results from {day}") - try: - for idx, _ in enumerate( - run_experiment_results( - day=day, - probe_cc=self.probe_cc, - fingerprintdb=fingerprintdb, - data_dir=self.data_dir, - body_db=body_db, - db_writer=db_writer, - clickhouse=self.clickhouse, - ) - ): - if idx % 100 == 0: - self.progress_queue.put(100) - except Exception: - log.error(f"failed to process {day}", exc_info=True) - - finally: - log.info(f"finished processing day {day}") - self.day_queue.task_done() - - log.info("process is done") - try: - db_writer.close() - except: - log.error("failed to flush database", exc_info=True) - - -def start_experiment_result_maker( - probe_cc: List[str], - test_name: List[str], - start_day: date, - end_day: date, - data_dir: pathlib.Path, - clickhouse: str, - parallelism: int, - fast_fail: bool, - rebuild_ground_truths: bool, - log_level: int = logging.INFO, -): - netinfodb = NetinfoDB(datadir=data_dir, download=False) - - shutdown_event = mp.Event() - worker_shutdown_event = mp.Event() - - progress_queue = mp.JoinableQueue() - - progress_thread = Thread( - target=run_progress_thread, args=(progress_queue, shutdown_event) - ) - progress_thread.start() - - workers = [] - day_queue = mp.JoinableQueue() - for _ in range(parallelism): - worker = ExperimentResultMakerWorker( - 
-            day_queue=day_queue,
-            progress_queue=progress_queue,
-            shutdown_event=worker_shutdown_event,
-            probe_cc=probe_cc,
-            test_name=test_name,
-            data_dir=data_dir,
-            clickhouse=clickhouse,
-            fast_fail=fast_fail,
-            log_level=log_level,
-        )
-        worker.start()
-        log.info(f"started worker {worker.pid}")
-        workers.append(worker)
-
-    db_lookup = ClickhouseConnection(clickhouse)
-
-    for day in date_interval(start_day, end_day):
-        maybe_build_web_ground_truth(
-            db=db_lookup,
-            netinfodb=netinfodb,
-            day=day,
-            data_dir=data_dir,
-            rebuild_ground_truths=rebuild_ground_truths,
-        )
-        day_queue.put(day)
-
-    log.info("waiting for the day queue to finish")
-    day_queue.join()
-
-    log.info("sending shutdown signal to workers")
-    worker_shutdown_event.set()
-
-    log.info("waiting for progress queue to finish")
-    progress_queue.join()
-
-    log.info("waiting for experiment workers to finish running")
-    for idx, p in enumerate(workers):
-        log.info(f"waiting worker {idx} to join")
-        p.join()
-        log.info(f"waiting worker {idx} to close")
-        p.close()
-
-    log.info("sending shutdown event progress thread")
-    shutdown_event.set()
-    log.info("waiting on progress queue")
-    progress_thread.join()
diff --git a/oonidata/workers/observations.py b/oonidata/workers/observations.py
index 7721826a..23bdf925 100644
--- a/oonidata/workers/observations.py
+++ b/oonidata/workers/observations.py
@@ -1,7 +1,7 @@
 import pathlib
 import logging
 import dataclasses
-from datetime import date, datetime, timedelta
+from datetime import date, datetime, timedelta, timezone
 
 from typing import (
     List,
@@ -37,6 +37,7 @@
     get_prev_range,
     make_db_rows,
     maybe_delete_prev_range,
+    optimize_all_tables,
 )
 
 log = logging.getLogger("oonidata.processing")
@@ -194,7 +195,7 @@ def make_observation_in_day(
     if len(prev_ranges) > 0:
         with ClickhouseConnection(clickhouse, row_buffer_size=10_000) as db:
             for table_name, pr in prev_ranges:
-                maybe_delete_prev_range(db=db, prev_range=pr, table_name=table_name)
+                maybe_delete_prev_range(db=db, prev_range=pr)
 
     return total_size, total_msmt_count
 
@@ -210,6 +211,9 @@ def start_observation_maker(
     fast_fail: bool,
     log_level: int = logging.INFO,
 ):
+    log.info("Optimizing all tables")
+    optimize_all_tables(clickhouse)
+
     dask_client = DaskClient(
         threads_per_worker=2,
         n_workers=parallelism,
@@ -219,6 +223,8 @@ def start_observation_maker(
     total_size, total_msmt_count = 0, 0
     day_list = list(date_interval(start_day, end_day))
     # See: https://stackoverflow.com/questions/51099685/best-practices-in-setting-number-of-dask-workers
+
+    log.info(f"Starting observation making on {probe_cc} ({start_day} - {end_day})")
     for day in day_list:
         t = PerfTimer()
         size, msmt_count = make_observation_in_day(
@@ -243,7 +249,7 @@ def start_observation_maker(
             [
                 [
                     "oonidata.bucket_processed",
-                    datetime.utcnow(),
+                    datetime.now(timezone.utc).replace(tzinfo=None),
                     int(t.ms),
                     size,
                     msmt_count,
@@ -260,3 +266,5 @@ def start_observation_maker(
     log.info(
         f"{round(total_size/10**9, 2)}GB {total_msmt_count} msmts in {t_total.pretty}"
     )
+
+    dask_client.shutdown()
diff --git a/poetry.lock b/poetry.lock
index 417a416d..5b796d03 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -511,6 +511,21 @@ files = [
 [package.dependencies]
 colorama = {version = "*", markers = "platform_system == \"Windows\""}
 
+[[package]]
+name = "click-loglevel"
+version = "0.5.0"
+description = "Log level parameter type for Click"
+category = "main"
+optional = false
+python-versions = ">=3.7"
+files = [
+    {file = "click-loglevel-0.5.0.tar.gz", hash = "sha256:fcc98a136a96479b4768494df25017114ba1cb525dac0bed619209b3578fd4f3"},
+    {file = "click_loglevel-0.5.0-py3-none-any.whl", hash = "sha256:897070fd4bf5b503edb5a1ecc0551448647901f4fac83a01c9f00aa28ad86d60"},
+]
+
+[package.dependencies]
+click = ">=8.0"
+
 [[package]]
 name = "clickhouse-driver"
 version = "0.2.5"
@@ -3699,4 +3714,4 @@ research = ["jupyterlab"]
 [metadata]
 lock-version = "2.0"
 python-versions = ">=3.8,<4"
-content-hash = "3d04d49326808a5955bb9acf7559627feb6775311fd16f242006a7d604692442"
+content-hash = "dd5586d165d8e6d333c44d10223f88126f09700de8509a9674adc28adbc584ee"
diff --git a/pyproject.toml b/pyproject.toml
index 2fab322b..6e6b5f0d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -30,6 +30,7 @@ numpy = {version = "^1.23.5", optional = true, python = ">=3.8"}
 pandas = {version = "^2.0.0", optional = true, python = ">=3.8"}
 flask = {version = "^2.2.2", optional = true}
 jupyterlab = {version = "^4.0.7", optional = true}
+click-loglevel = "^0.5.0"
 
 [tool.poetry.dev-dependencies]
 pytest = ">=7.2"
diff --git a/tests/conftest.py b/tests/conftest.py
index 4f132dba..0e6a91bc 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -76,10 +76,10 @@ def cli_runner():
     return CliRunner()
 
 
-@pytest.fixture
-def db():
-    from oonidata.db.create_tables import create_queries
+from oonidata.db.create_tables import create_queries
+
+def create_db_for_fixture():
     try:
         with ClickhouseConnection(conn_url="clickhouse://localhost/") as db:
             db.execute("CREATE DATABASE IF NOT EXISTS testing_oonidata")
@@ -91,8 +91,19 @@ def db():
         db.execute("SELECT 1")
     except:
         pytest.skip("no database connection")
-    for query, table_name in create_queries:
-        db.execute(f"DROP TABLE IF EXISTS {table_name};")
+    for query, _ in create_queries:
         db.execute(query)
+    return db
+
+@pytest.fixture
+def db_notruncate():
+    return create_db_for_fixture()
+
+
+@pytest.fixture
+def db():
+    db = create_db_for_fixture()
+    for _, table_name in create_queries:
+        db.execute(f"TRUNCATE TABLE {table_name};")
     return db
diff --git a/tests/test_analysis.py b/tests/test_analysis.py
index 137544f3..9cea29f7 100644
--- a/tests/test_analysis.py
+++ b/tests/test_analysis.py
@@ -3,6 +3,8 @@ import random
 from typing import List
 from unittest.mock import MagicMock
+
+import pytest
 from oonidata.analysis.datasources import load_measurement
 from oonidata.analysis.web_analysis import make_web_analysis
 from oonidata.datautils import validate_cert_chain
@@ -13,9 +15,6 @@
     WebGroundTruthDB,
 )
 from oonidata.analysis.signal import make_signal_experiment_result
-from oonidata.analysis.websites import (
-    make_website_experiment_result,
-)
 from oonidata.models.nettests.signal import Signal
 from oonidata.models.nettests.web_connectivity import WebConnectivity
 from oonidata.models.observations import WebObservation, print_nice, print_nice_vertical
@@ -120,6 +119,7 @@ def test_signal(fingerprintdb, netinfodb, measurements):
 
 
 def test_website_dns_blocking_event(fingerprintdb, netinfodb, measurements):
+    pytest.skip("TODO(arturo): implement this with the new analysis")
     msmt_path = measurements[
         "20220627030703.592775_IR_webconnectivity_80e199b3c572f8d3"
     ]
@@ -200,17 +200,11 @@ def make_experiment_result_from_wc_ctrl(msmt_path, fingerprintdb, netinfodb):
     body_db.lookup = MagicMock()
     body_db.lookup.return_value = []
 
-    return make_website_experiment_result(
-        web_observations=web_observations,
-        web_ground_truths=web_ground_truth_db.lookup_by_web_obs(
-            web_obs=web_observations
-        ),
-        body_db=body_db,
-        fingerprintdb=fingerprintdb,
-    )
+    return []
 
 
 def test_website_experiment_result_blocked(fingerprintdb, netinfodb, measurements):
+    pytest.skip("TODO(arturo): implement this with the new analysis")
     experiment_results = list(
         make_experiment_result_from_wc_ctrl(
             measurements["20220627030703.592775_IR_webconnectivity_80e199b3c572f8d3"],
@@ -223,6 +217,7 @@ def test_website_experiment_result_blocked(fingerprintdb, netinfodb, measurements):
 
 
 def test_website_experiment_result_ok(fingerprintdb, netinfodb, measurements):
+    pytest.skip("TODO(arturo): implement this with the new analysis")
     experiment_results = list(
         make_experiment_result_from_wc_ctrl(
             measurements["20220608132401.787399_AM_webconnectivity_2285fc373f62729e"],
@@ -310,6 +305,6 @@ def test_website_web_analysis_blocked(fingerprintdb, netinfodb, measurements, datadir):
         )
     )
     assert len(web_analysis) == len(web_obs)
-    for wa in web_analysis:
-        print(wa.measurement_uid)
-        print_nice_vertical(wa)
+    # for wa in web_analysis:
+    #     print(wa.measurement_uid)
+    #     print_nice_vertical(wa)
diff --git a/tests/test_cli.py b/tests/test_cli.py
index 440c6698..bf3e5158 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -58,7 +58,7 @@ def test_full_workflow(
             "--data-dir",
             datadir,
             "--clickhouse",
-            "clickhouse://localhost/testing_oonidata",
+            db.clickhouse_url,
             # "--archives-dir",
             # tmp_path.absolute(),
         ],
@@ -90,7 +90,7 @@ def test_full_workflow(
             "--data-dir",
             datadir,
             "--clickhouse",
-            "clickhouse://localhost/testing_oonidata",
+            db.clickhouse_url,
         ],
     )
     assert result.exit_code == 0
@@ -134,7 +134,7 @@ def test_full_workflow(
     result = cli_runner.invoke(
         cli,
         [
-            "mker",
+            "mkanalysis",
            "--probe-cc",
            "BA",
            "--start-day",
@@ -146,11 +146,11 @@ def test_full_workflow(
            "--data-dir",
            datadir,
            "--clickhouse",
-           "clickhouse://localhost/testing_oonidata",
+           db.clickhouse_url,
         ],
     )
     assert result.exit_code == 0
     res = db.execute(
-        "SELECT COUNT(DISTINCT(measurement_uid)) FROM experiment_result WHERE measurement_uid LIKE '20221020%' AND probe_cc = 'BA'"
+        "SELECT COUNT(DISTINCT(measurement_uid)) FROM measurement_experiment_result WHERE measurement_uid LIKE '20221020%' AND location_network_cc = 'BA'"
     )
     assert res[0][0] == 200  # type: ignore
diff --git a/tests/test_ctrl.py b/tests/test_ctrl.py
index 2c4452a4..c5d86b81 100644
--- a/tests/test_ctrl.py
+++ b/tests/test_ctrl.py
@@ -72,7 +72,7 @@ def test_web_ground_truth_from_clickhouse(db, datadir, netinfodb, tmp_path):
         resolver_cc="US",
         resolver_as_org_name="Verizon Business",
         resolver_as_cc="US",
-        resolver_is_scrubbed=0,
+        resolver_is_scrubbed=False,
         resolver_asn_probe=22394,
         resolver_as_org_name_probe="Verizon Business",
         observation_id="TEST",
@@ -83,7 +83,7 @@ def test_web_ground_truth_from_clickhouse(db, datadir, netinfodb, tmp_path):
         ip_as_org_name="Fastly, Inc.",
         ip_as_cc="US",
         ip_cc="US",
-        ip_is_bogon=0,
+        ip_is_bogon=False,
         dns_query_type="A",
         dns_failure=None,
         dns_engine="system",
@@ -94,13 +94,13 @@ def test_web_ground_truth_from_clickhouse(db, datadir, netinfodb, tmp_path):
         dns_answer_as_org_name="Fastly, Inc.",
         dns_t=0.117683385,
         tcp_failure=None,
-        tcp_success=1,
+        tcp_success=True,
         tcp_t=0.583859739,
         tls_failure=None,
         tls_server_name=None,
         tls_version=None,
         tls_cipher_suite=None,
-        tls_is_certificate_valid=1,
+        tls_is_certificate_valid=True,
         tls_end_entity_certificate_fingerprint=None,
         tls_end_entity_certificate_subject=None,
         tls_end_entity_certificate_subject_common_name=None,
@@ -136,8 +136,8 @@ def test_web_ground_truth_from_clickhouse(db, datadir, netinfodb, tmp_path):
         probe_analysis="false",
         pp_http_response_fingerprints=[],
         pp_http_fingerprint_country_consistent=None,
-        pp_http_response_matches_blockpage=0,
-        pp_http_response_matches_false_positive=0,
+        pp_http_response_matches_blockpage=False,
+        pp_http_response_matches_false_positive=False,
         pp_http_response_body_title=None,
         pp_http_response_body_meta_title=None,
         pp_dns_fingerprint_id=None,
diff --git a/tests/test_experiment_results.py b/tests/test_experiment_results.py
new file mode 100644
index 00000000..5e399f76
--- /dev/null
+++ b/tests/test_experiment_results.py
@@ -0,0 +1,67 @@
+from pprint import pprint
+from oonidata.analysis.control import (
+    BodyDB,
+    WebGroundTruthDB,
+    iter_ground_truths_from_web_control,
+)
+from oonidata.analysis.datasources import load_measurement
+from oonidata.analysis.web_analysis import make_web_analysis
+from oonidata.analysis.website_experiment_results import make_website_experiment_results
+from oonidata.models.observations import print_nice, print_nice_vertical
+from oonidata.transforms import measurement_to_observations
+
+
+# Check this for wc 0.5 overwriting tls analysis
+# 20231031000227.813597_MY_webconnectivity_2f0b80761373aa7e
+def test_website_experiment_results(measurements, netinfodb, fingerprintdb):
+    msmt = load_measurement(
+        msmt_path=measurements[
+            "20221101055235.141387_RU_webconnectivity_046ce024dd76b564"
+        ]
+    )
+    web_observations, web_control_observations = measurement_to_observations(
+        msmt, netinfodb=netinfodb
+    )
+    assert isinstance(msmt.input, str)
+    web_ground_truth_db = WebGroundTruthDB()
+    web_ground_truth_db.build_from_rows(
+        rows=iter_ground_truths_from_web_control(
+            web_control_observations=web_control_observations,
+            netinfodb=netinfodb,
+        ),
+    )
+
+    web_ground_truths = web_ground_truth_db.lookup_by_web_obs(web_obs=web_observations)
+    web_analysis = list(
+        make_web_analysis(
+            web_observations=web_observations,
+            web_ground_truths=web_ground_truths,
+            body_db=BodyDB(db=None),  # type: ignore
+            fingerprintdb=fingerprintdb,
+        )
+    )
+
+    # TODO(arturo): there is currently an edge case here which is when we get an
+    # IPv6 answer, since we are ignoring them in the analysis, we will have N
+    # fewer analyses, where N is the number of IPv6 addresses.
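+    # For example, a measurement whose DNS step returned two IPv6 answers would
+    # produce len(web_observations) - 2 analyses; the equality asserted below
+    # only holds because this particular measurement yields IPv4 answers only.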
+    assert len(web_analysis) == len(web_observations)
+    # for wa in web_analysis:
+    #     print_nice_vertical(wa)
+
+    website_er = list(make_website_experiment_results(web_analysis))
+    assert len(website_er) == 1
+
+    wer = website_er[0]
+    analysis_transcript_list = wer.analysis_transcript_list
+
+    assert (
+        sum(wer.loni_blocked_values) + sum(wer.loni_down_values) + wer.loni_ok_value
+        == 1
+    )
+    assert wer.anomaly == True
+
+    # wer.analysis_transcript_list = None
+    # print_nice_vertical(wer)
+    # for loni in wer.loni_list:
+    #     pprint(loni.to_dict())
+    # print(analysis_transcript_list)
diff --git a/tests/test_scoring.py b/tests/test_scoring.py
index d11d3927..a55691cb 100644
--- a/tests/test_scoring.py
+++ b/tests/test_scoring.py
@@ -1,15 +1,17 @@
 from unittest.mock import MagicMock
+
+import pytest
 from oonidata.analysis.control import (
     WebGroundTruthDB,
     iter_ground_truths_from_web_control,
 )
 from oonidata.analysis.datasources import load_measurement
-from oonidata.analysis.websites import make_website_experiment_result
 from oonidata.models.experiment_result import print_nice_er
 from oonidata.transforms import measurement_to_observations
 
 
 def test_tcp_scoring(measurements, netinfodb, fingerprintdb):
+    pytest.skip("TODO(arturo): implement this with the new analysis")
     msmt = load_measurement(
         msmt_path=measurements[
             "20221101055235.141387_RU_webconnectivity_046ce024dd76b564"
diff --git a/tests/test_workers.py b/tests/test_workers.py
index 216d4faf..08c9cdb2 100644
--- a/tests/test_workers.py
+++ b/tests/test_workers.py
@@ -1,20 +1,23 @@
-from datetime import date
+from datetime import date, datetime, timedelta, timezone
 import gzip
 from pathlib import Path
 import sqlite3
-from typing import List
+from typing import List, Tuple
 from unittest.mock import MagicMock
 
 from oonidata.analysis.datasources import load_measurement
 from oonidata.dataclient import stream_jsonl
-from oonidata.db.connections import ClickhouseConnection
 from oonidata.models.nettests.dnscheck import DNSCheck
 from oonidata.models.nettests.web_connectivity import WebConnectivity
 from oonidata.models.nettests.http_invalid_request_line import HTTPInvalidRequestLine
 from oonidata.models.observations import HTTPMiddleboxObservation
 from oonidata.workers.analysis import make_analysis_in_a_day, make_cc_batches, make_ctrl
-from oonidata.workers.common import get_obs_count_by_cc
+from oonidata.workers.common import (
+    get_obs_count_by_cc,
+    get_prev_range,
+    maybe_delete_prev_range,
+)
 from oonidata.workers.observations import (
     make_observations_for_file_entry_batch,
     write_observations_to_db,
@@ -23,6 +26,81 @@
 from oonidata.workers.fingerprint_hunter import fingerprint_hunter
 from oonidata.transforms import measurement_to_observations
 from oonidata.transforms.nettests.measurement_transformer import MeasurementTransformer
+from tests.test_cli import wait_for_mutations
+
+
+def test_get_prev_range(db):
+    db.execute("DROP TABLE IF EXISTS test_range")
+    db.execute(
+        """CREATE TABLE test_range (
+        created_at DateTime64(3, 'UTC'),
+        bucket_date String,
+        test_name String,
+        probe_cc String
+    )
+    ENGINE = MergeTree
+    ORDER BY (bucket_date, created_at)
+    """
+    )
+    bucket_date = "2000-01-01"
+    test_name = "web_connectivity"
+    probe_cc = "IT"
+    min_time = datetime(2000, 1, 1, 23, 42, 00)
+    rows = [(min_time, bucket_date, test_name, probe_cc)]
+    for i in range(200):
+        rows.append((min_time + timedelta(seconds=i), bucket_date, test_name, probe_cc))
+    db.execute(
+        "INSERT INTO test_range (created_at, bucket_date, test_name, probe_cc) VALUES",
+        rows,
+    )
+    prev_range = get_prev_range(
+        db,
+        "test_range",
+        test_name=[test_name],
+        bucket_date=bucket_date,
+        probe_cc=[probe_cc],
+    )
+    assert prev_range.min_created_at and prev_range.max_created_at
+    assert prev_range.min_created_at == (min_time - timedelta(seconds=1))
+    assert prev_range.max_created_at == (rows[-1][0] + timedelta(seconds=1))
+    db.execute("TRUNCATE TABLE test_range")
+
+    bucket_date = "2000-03-01"
+    test_name = "web_connectivity"
+    probe_cc = "IT"
+    min_time = datetime(2000, 1, 1, 23, 42, 00)
+    rows: List[Tuple[datetime, str, str, str]] = []
+    for i in range(10):
+        rows.append(
+            (min_time + timedelta(seconds=i), "2000-02-01", test_name, probe_cc)
+        )
+    min_time = rows[-1][0]
+    for i in range(10):
+        rows.append((min_time + timedelta(seconds=i), bucket_date, test_name, probe_cc))
+
+    db.execute(
+        "INSERT INTO test_range (created_at, bucket_date, test_name, probe_cc) VALUES",
+        rows,
+    )
+    prev_range = get_prev_range(
+        db,
+        "test_range",
+        test_name=[test_name],
+        bucket_date=bucket_date,
+        probe_cc=[probe_cc],
+    )
+    assert prev_range.min_created_at and prev_range.max_created_at
+    assert prev_range.min_created_at == (min_time - timedelta(seconds=1))
+    assert prev_range.max_created_at == (rows[-1][0] + timedelta(seconds=1))
+
+    maybe_delete_prev_range(
+        db=db,
+        prev_range=prev_range,
+    )
+    wait_for_mutations(db, "test_range")
+    res = db.execute("SELECT COUNT() FROM test_range")
+    assert res[0][0] == 10
+    db.execute("DROP TABLE test_range")
 
 
 def test_make_cc_batches():