diff --git a/oonidata/analysis/web_analysis.py b/oonidata/analysis/web_analysis.py
index 041977a0..3921ba65 100644
--- a/oonidata/analysis/web_analysis.py
+++ b/oonidata/analysis/web_analysis.py
@@ -1,7 +1,7 @@
 from collections import defaultdict
 from dataclasses import dataclass
 import dataclasses
-from datetime import datetime
+from datetime import datetime, timezone
 import ipaddress
 from typing import (
     Generator,
@@ -674,7 +674,7 @@ def make_web_analysis(
         fingerprintdb=fingerprintdb,
     )
 
-    created_at = datetime.utcnow()
+    created_at = datetime.now(timezone.utc).replace(tzinfo=None)
     website_analysis = WebAnalysis(
         measurement_uid=web_o.measurement_uid,
         observation_id=web_o.observation_id,
diff --git a/oonidata/analysis/website_experiment_results.py b/oonidata/analysis/website_experiment_results.py
index 3c3f8431..42047e68 100644
--- a/oonidata/analysis/website_experiment_results.py
+++ b/oonidata/analysis/website_experiment_results.py
@@ -1,5 +1,5 @@
 from dataclasses import dataclass
-from datetime import datetime
+from datetime import datetime, timezone
 import logging
 from typing import Dict, Generator, List, NamedTuple, Optional, Tuple
@@ -796,7 +796,6 @@ def make_website_experiment_results(
     Takes as input a list of web_analysis and outputs a list of ExperimentResults
     for the website.
     """
-    experiment_result_id = "XXXX"
 
     observation_id_list = []
     first_analysis = web_analysis[0]
@@ -974,7 +973,7 @@ def get_agg_outcome(loni_list, category, agg_func) -> Optional[OutcomeStatus]:
         measurement_uid=measurement_uid,
         observation_id_list=observation_id_list,
         timeofday=timeofday,
-        created_at=datetime.utcnow(),
+        created_at=datetime.now(timezone.utc).replace(tzinfo=None),
         location_network_type=first_analysis.network_type,
         location_network_asn=first_analysis.probe_asn,
         location_network_cc=first_analysis.probe_cc,
diff --git a/oonidata/cli/command.py b/oonidata/cli/command.py
index fe3f62a3..60b664d2 100644
--- a/oonidata/cli/command.py
+++ b/oonidata/cli/command.py
@@ -7,6 +7,7 @@
 from typing import List, Optional
 
 import click
+from click_loglevel import LogLevel
 
 from oonidata import __version__
 from oonidata.dataclient import (
@@ -70,10 +71,18 @@ def _parse_csv(ctx, param, s: Optional[str]) -> List[str]:
 
 @click.group()
 @click.option("--error-log-file", type=Path)
+@click.option(
+    "-l",
+    "--log-level",
+    type=LogLevel(),
+    default="INFO",
+    help="Set logging level",
+    show_default=True,
+)
 @click.version_option(__version__)
-def cli(error_log_file: Path):
+def cli(error_log_file: Path, log_level: int):
     log.addHandler(logging.StreamHandler(sys.stderr))
-    log.setLevel(logging.INFO)
+    log.setLevel(log_level)
     if error_log_file:
         logging.basicConfig(
             filename=error_log_file, encoding="utf-8", level=logging.ERROR
diff --git a/oonidata/db/connections.py b/oonidata/db/connections.py
index 0e4daeaf..43f29eec 100644
--- a/oonidata/db/connections.py
+++ b/oonidata/db/connections.py
@@ -3,7 +3,7 @@
 import time
 
 from collections import defaultdict, namedtuple
-from datetime import datetime
+from datetime import datetime, timezone
 from pprint import pformat
 
 import logging
@@ -114,7 +114,7 @@ def __init__(self, output_dir):
 
     def write_rows(self, table_name, rows, column_names):
         if table_name not in self.open_writers:
-            ts = datetime.utcnow().strftime("%Y%m%dT%H%M%S")
+            ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S")
             fh = (self.output_dir / f"{table_name}-{ts}.csv").open("w")
             csv_writer = csv.writer(fh)
             csv_writer.writerow(column_names)
diff --git a/oonidata/models/experiment_result.py b/oonidata/models/experiment_result.py
index f97c8a20..939fe7b2 100644
--- a/oonidata/models/experiment_result.py
+++ b/oonidata/models/experiment_result.py
@@ -3,7 +3,7 @@
 import logging
 from typing import Any, Dict, Generator, List, Optional, NamedTuple, Mapping, Tuple
 from enum import Enum
-from datetime import datetime
+from datetime import datetime, timezone
 from tabulate import tabulate
 
 from oonidata.datautils import maybe_elipse
@@ -277,7 +277,7 @@ def iter_experiment_results(
     target_name: str,
     outcomes: List[Outcome],
 ) -> Generator[ExperimentResult, None, None]:
-    created_at = datetime.utcnow()
+    created_at = datetime.now(timezone.utc).replace(tzinfo=None)
     for idx, outcome in enumerate(outcomes):
         yield ExperimentResult(
             measurement_uid=obs.measurement_uid,
diff --git a/oonidata/transforms/nettests/http_header_field_manipulation.py b/oonidata/transforms/nettests/http_header_field_manipulation.py
index 5e1774b3..f447f50a 100644
--- a/oonidata/transforms/nettests/http_header_field_manipulation.py
+++ b/oonidata/transforms/nettests/http_header_field_manipulation.py
@@ -1,5 +1,5 @@
 import dataclasses
-from datetime import datetime
+from datetime import datetime, timezone
 import orjson
 from typing import List, Tuple
 from oonidata.models.nettests import HTTPHeaderFieldManipulation
@@ -14,7 +14,7 @@ def make_observations(
         mb_obs = HTTPMiddleboxObservation(
             hfm_success=True,
             observation_id=f"{msmt.measurement_uid}_0",
-            created_at=datetime.utcnow().replace(microsecond=0),
+            created_at=datetime.now(timezone.utc).replace(microsecond=0, tzinfo=None),
             **dataclasses.asdict(self.measurement_meta),
         )
diff --git a/oonidata/transforms/nettests/http_invalid_request_line.py b/oonidata/transforms/nettests/http_invalid_request_line.py
index 597d0863..b789a2e5 100644
--- a/oonidata/transforms/nettests/http_invalid_request_line.py
+++ b/oonidata/transforms/nettests/http_invalid_request_line.py
@@ -1,5 +1,5 @@
 import dataclasses
-from datetime import datetime
+from datetime import datetime, timezone
 from typing import List, Tuple
 from oonidata.models.dataformats import maybe_binary_data_to_bytes
 from oonidata.models.nettests import HTTPInvalidRequestLine
@@ -54,7 +54,7 @@ def make_observations(
         mb_obs = HTTPMiddleboxObservation(
             hirl_success=True,
             observation_id=f"{msmt.measurement_uid}_0",
-            created_at=datetime.utcnow().replace(microsecond=0),
+            created_at=datetime.now(timezone.utc).replace(microsecond=0, tzinfo=None),
             **dataclasses.asdict(self.measurement_meta),
         )
         if not msmt.test_keys.sent:
diff --git a/oonidata/transforms/nettests/measurement_transformer.py b/oonidata/transforms/nettests/measurement_transformer.py
index 8973225b..a282e9a0 100644
--- a/oonidata/transforms/nettests/measurement_transformer.py
+++ b/oonidata/transforms/nettests/measurement_transformer.py
@@ -5,7 +5,7 @@
 import dataclasses
 import logging
 from urllib.parse import urlparse, urlsplit
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 from typing import (
     Callable,
     Optional,
@@ -134,11 +134,14 @@
 )
 
 
-def normalize_failure(failure: Failure):
+def normalize_failure(failure: Union[Failure, bool]) -> Failure:
     if not failure:
         # This will set it to None even when it's false
         return None
 
+    if failure is True:
+        return "true"
+
     if failure.startswith("unknown_failure"):
         for substring, new_failure in unknown_failure_map:
             if substring in failure:
@@ -445,7 +448,7 @@ def maybe_set_web_fields(
     ],
     web_obs: WebObservation,
     prefix: str,
-    field_names: Tuple[str],
+    field_names: Tuple[str, ...],
 ):
     # TODO: the fact we have to do this is an artifact of the original
     # one-observation per type model. Once we decide we don't want to go for
@@ -899,7 +902,7 @@ def consume_web_observations(
 
     for idx, obs in enumerate(web_obs_list):
         obs.observation_id = f"{obs.measurement_uid}_{idx}"
-        obs.created_at = datetime.utcnow().replace(microsecond=0)
+        obs.created_at = datetime.now(timezone.utc).replace(microsecond=0, tzinfo=None)
 
     return web_obs_list
diff --git a/oonidata/transforms/nettests/web_connectivity.py b/oonidata/transforms/nettests/web_connectivity.py
index 9b9a856e..be8ac276 100644
--- a/oonidata/transforms/nettests/web_connectivity.py
+++ b/oonidata/transforms/nettests/web_connectivity.py
@@ -1,5 +1,5 @@
 from copy import deepcopy
-from datetime import datetime
+from datetime import datetime, timezone
 from typing import Dict, List, Tuple
 from urllib.parse import urlparse
 from oonidata.datautils import is_ip_bogon
@@ -41,7 +41,7 @@ def make_web_control_observations(
         test_name=msmt.test_name,
         test_version=msmt.test_version,
         hostname=hostname,
-        created_at=datetime.utcnow(),
+        created_at=datetime.now(timezone.utc).replace(tzinfo=None),
     )
     # Reference for new-style web_connectivity:
     # https://explorer.ooni.org/measurement/20220924T215758Z_webconnectivity_IR_206065_n1_2CRoWBNJkWc7VyAs?input=https%3A%2F%2Fdoh.dns.apple.com%2Fdns-query%3Fdns%3Dq80BAAABAAAAAAAAA3d3dwdleGFtcGxlA2NvbQAAAQAB
diff --git a/oonidata/workers/analysis.py b/oonidata/workers/analysis.py
index f6bd9ad3..ffc3a953 100644
--- a/oonidata/workers/analysis.py
+++ b/oonidata/workers/analysis.py
@@ -74,14 +74,25 @@ def make_analysis_in_a_day(
     column_names_wa = [f.name for f in dataclasses.fields(WebAnalysis)]
     column_names_er = [f.name for f in dataclasses.fields(MeasurementExperimentResult)]
 
-    prev_range = get_prev_range(
-        db=db_lookup,
-        table_name=WebAnalysis.__table_name__,
-        timestamp=datetime.combine(day, datetime.min.time()),
-        test_name=[],
-        probe_cc=probe_cc,
-        timestamp_column="measurement_start_time",
-    )
+    prev_range_list = [
+        get_prev_range(
+            db=db_lookup,
+            table_name=WebAnalysis.__table_name__,
+            timestamp=datetime.combine(day, datetime.min.time()),
+            test_name=[],
+            probe_cc=probe_cc,
+            timestamp_column="measurement_start_time",
+        ),
+        get_prev_range(
+            db=db_lookup,
+            table_name=MeasurementExperimentResult.__table_name__,
+            timestamp=datetime.combine(day, datetime.min.time()),
+            test_name=[],
+            probe_cc=probe_cc,
+            timestamp_column="timeofday",
+            probe_cc_column="location_network_cc",
+        ),
+    ]
 
     log.info(f"loading ground truth DB for {day}")
     t = PerfTimer()
@@ -118,6 +129,7 @@ def make_analysis_in_a_day(
                     fingerprintdb=fingerprintdb,
                 )
             )
+            log.info(f"generated {len(website_analysis)} website_analysis")
            if len(website_analysis) == 0:
                 log.info(f"no website analysis for {probe_cc}, {test_name}")
                 continue
@@ -138,6 +150,7 @@ def make_analysis_in_a_day(
             with statsd_client.timer("oonidata.web_analysis.experiment_results.timing"):
                 website_er = list(make_website_experiment_results(website_analysis))
+                log.info(f"generated {len(website_er)} website_er")
                 table_name, rows = make_db_rows(
                     dc_list=website_er,
                     column_names=column_names_er,
@@ -154,9 +167,9 @@ def make_analysis_in_a_day(
             web_obs_ids = ",".join(map(lambda wo: wo.observation_id, web_obs))
             log.error(f"failed to generate analysis for {web_obs_ids}", exc_info=True)
 
-    maybe_delete_prev_range(
-        db=db_lookup, prev_range=prev_range, table_name=WebAnalysis.__table_name__
-    )
+    for prev_range in prev_range_list:
+        maybe_delete_prev_range(db=db_lookup, prev_range=prev_range)
+
     db_writer.close()
     return idx
@@ -285,3 +298,4 @@ def start_analysis(
     obs_per_sec = round(total_obs_count / t_total.s)
     log.info(f"finished processing {start_day} - {end_day} speed: {obs_per_sec}obs/s)")
     log.info(f"{total_obs_count} msmts in {t_total.pretty}")
+    dask_client.shutdown()
diff --git a/oonidata/workers/common.py b/oonidata/workers/common.py
index 35187ed6..76d6671d 100644
--- a/oonidata/workers/common.py
+++ b/oonidata/workers/common.py
@@ -1,3 +1,4 @@
+from dataclasses import dataclass
 import queue
 import logging
 import multiprocessing as mp
@@ -18,7 +19,6 @@
 from tqdm import tqdm
 from oonidata.dataclient import (
     MeasurementListProgress,
-    ProgressStatus,
 )
 from oonidata.db.connections import (
     ClickhouseConnection,
@@ -28,13 +28,74 @@
 log = logging.getLogger("oonidata.processing")
 
 
-class PrevRange(NamedTuple):
+@dataclass
+class BatchParameters:
+    test_name: List[str]
+    probe_cc: List[str]
     bucket_date: Optional[str]
-    start_timestamp: Optional[datetime]
-    end_timestamp: Optional[datetime]
-    max_created_at: Optional[datetime]
-    min_created_at: Optional[datetime]
-    where: str
+    timestamp: Optional[datetime]
+
+
+@dataclass
+class PrevRange:
+    table_name: str
+    batch_parameters: BatchParameters
+    timestamp_column: Optional[str]
+    probe_cc_column: Optional[str]
+    max_created_at: Optional[datetime] = None
+    min_created_at: Optional[datetime] = None
+
+    def format_query(self):
+        start_timestamp = None
+        end_timestamp = None
+        where = None
+        where = "WHERE "
+        q_args: Dict[str, Any] = {}
+
+        if self.batch_parameters.bucket_date:
+            where = "WHERE bucket_date = %(bucket_date)s"
+            q_args["bucket_date"] = self.batch_parameters.bucket_date
+
+        elif self.batch_parameters.timestamp:
+            start_timestamp = self.batch_parameters.timestamp
+            end_timestamp = start_timestamp + timedelta(days=1)
+            q_args["start_timestamp"] = start_timestamp
+            q_args["end_timestamp"] = end_timestamp
+            where += f"{self.timestamp_column} >= %(start_timestamp)s AND {self.timestamp_column} < %(end_timestamp)s"
+        else:
+            raise Exception("Must specify either bucket_date or timestamp")
+
+        if len(self.batch_parameters.test_name) > 0:
+            where += " AND test_name IN %(test_names)s"
+            q_args["test_names"] = self.batch_parameters.test_name
+        if len(self.batch_parameters.probe_cc) > 0:
+            where += f" AND {self.probe_cc_column} IN %(probe_ccs)s"
+            q_args["probe_ccs"] = self.batch_parameters.probe_cc
+
+        return where, q_args
+
+
+def maybe_delete_prev_range(db: ClickhouseConnection, prev_range: PrevRange):
+    """
+    We perform a lightweight delete of all the rows which have been
+    regenerated, so we don't have any duplicates in the table
+    """
+    if not prev_range.max_created_at or not prev_range.min_created_at:
+        return
+
+    # Disabled due to: https://github.com/ClickHouse/ClickHouse/issues/40651
+    # db.execute("SET allow_experimental_lightweight_delete = true;")
+
+    where, q_args = prev_range.format_query()
+
+    q_args["max_created_at"] = prev_range.max_created_at
+    q_args["min_created_at"] = prev_range.min_created_at
+    where = f"{where} AND created_at <= %(max_created_at)s AND created_at >= %(min_created_at)s"
+    log.info(f"runing {where} with {q_args}")
+
+    q = f"ALTER TABLE {prev_range.table_name} DELETE "
+    final_query = q + where
+    return db.execute(final_query, q_args)
 
 
 def get_prev_range(
@@ -45,6 +106,7 @@ def get_prev_range(
     bucket_date: Optional[str] = None,
     timestamp: Optional[datetime] = None,
     timestamp_column: str = "timestamp",
+    probe_cc_column: str = "probe_cc",
 ) -> PrevRange:
     """
     We lookup the range of previously generated rows so we can drop
@@ -68,40 +130,26 @@ def get_prev_range(
     bucket as reprocessing in progress and guard against running queries for
     it.
     """
-    q = f"SELECT MAX(created_at), MIN(created_at) FROM {table_name} "
+    # A batch specified by test_name, probe_cc and one of either bucket_date or
+    # timestamp depending on it being observations or experiment results.
     assert (
         timestamp or bucket_date
     ), "either timestamp or bucket_date should be provided"
 
-    start_timestamp = None
-    end_timestamp = None
-    where = None
-    where = "WHERE bucket_date = %(bucket_date)s"
-    q_args: Dict[str, Any] = {"bucket_date": bucket_date}
-    if timestamp:
-        start_timestamp = timestamp
-        end_timestamp = timestamp + timedelta(days=1)
-        q_args["start_timestamp"] = start_timestamp
-        q_args["end_timestamp"] = end_timestamp
-        where += f" AND {timestamp_column} >= %(start_timestamp)s AND {timestamp_column} < %(end_timestamp)s"
-
-    if len(test_name) > 0:
-        test_name_list = []
-        for tn in test_name:
-            # sanitize the test_names. It should not be a security issue since
-            # it's not user provided, but better safe than sorry
-            assert tn.replace("_", "").isalnum(), f"not alphabetic testname {tn}"
-            test_name_list.append(f"'{tn}'")
-        where += " AND test_name IN ({})".format(",".join(test_name_list))
-    if len(probe_cc) > 0:
-        probe_cc_list = []
-        for cc in probe_cc:
-            assert cc.replace("_", "").isalnum(), f"not alphabetic probe_cc"
-            probe_cc_list.append(f"'{cc}'")
-        where += " AND probe_cc IN ({})".format(",".join(probe_cc_list))
+    prev_range = PrevRange(
+        table_name=table_name,
+        batch_parameters=BatchParameters(
+            test_name=test_name,
+            probe_cc=probe_cc,
+            timestamp=timestamp,
+            bucket_date=bucket_date,
+        ),
+        timestamp_column=timestamp_column,
+        probe_cc_column=probe_cc_column,
+    )
 
+    q = f"SELECT MAX(created_at), MIN(created_at) FROM {prev_range.table_name} "
+    where, q_args = prev_range.format_query()
     final_query = q + where
-    print(final_query)
-    print(q_args)
     prev_obs_range = db.execute(final_query, q_args)
     assert isinstance(prev_obs_range, list) and len(prev_obs_range) == 1
     max_created_at, min_created_at = prev_obs_range[0]
@@ -109,17 +157,14 @@ def get_prev_range(
     # We pad it by 1 second to take into account the time resolution downgrade
     # happening when going from clickhouse to python data types
     if max_created_at and min_created_at:
-        max_created_at += timedelta(seconds=1)
-        min_created_at -= timedelta(seconds=1)
-
-    return PrevRange(
-        max_created_at=max_created_at,
-        min_created_at=min_created_at,
-        start_timestamp=start_timestamp,
-        end_timestamp=end_timestamp,
-        where=where,
-        bucket_date=bucket_date,
-    )
+        prev_range.max_created_at = (max_created_at + timedelta(seconds=1)).replace(
+            tzinfo=None
+        )
+        prev_range.min_created_at = (min_created_at - timedelta(seconds=1)).replace(
+            tzinfo=None
+        )
+
+    return prev_range
 
 
 def optimize_all_tables(clickhouse):
@@ -143,34 +188,6 @@ def get_obs_count_by_cc(
     return dict(cc_list)
 
 
-def maybe_delete_prev_range(
-    db: ClickhouseConnection, table_name: str, prev_range: PrevRange
-):
-    """
-    We perform a lightweight delete of all the rows which have been
-    regenerated, so we don't have any duplicates in the table
-    """
-    if not prev_range.max_created_at:
-        return
-
-    # Disabled due to: https://github.com/ClickHouse/ClickHouse/issues/40651
-    # db.execute("SET allow_experimental_lightweight_delete = true;")
-    q_args = {
-        "max_created_at": prev_range.max_created_at,
-        "min_created_at": prev_range.min_created_at,
-    }
-    if prev_range.bucket_date:
-        q_args["bucket_date"] = prev_range.bucket_date
-    elif prev_range.start_timestamp:
-        q_args["start_timestamp"] = prev_range.start_timestamp
-        q_args["end_timestamp"] = prev_range.end_timestamp
-    else:
-        raise Exception("either bucket_date or timestamps should be set")
-
-    where = f"{prev_range.where} AND created_at <= %(max_created_at)s AND created_at >= %(min_created_at)s"
-    return db.execute(f"ALTER TABLE {table_name} DELETE " + where, q_args)
-
-
 def make_db_rows(
     dc_list: List,
     column_names: List[str],
diff --git a/oonidata/workers/observations.py b/oonidata/workers/observations.py
index c518bb2c..23bdf925 100644
--- a/oonidata/workers/observations.py
+++ b/oonidata/workers/observations.py
@@ -1,7 +1,7 @@
 import pathlib
 import logging
 import dataclasses
-from datetime import date, datetime, timedelta
+from datetime import date, datetime, timedelta, timezone
 
 from typing import (
     List,
@@ -195,7 +195,7 @@ def make_observation_in_day(
 
     if len(prev_ranges) > 0:
         with ClickhouseConnection(clickhouse, row_buffer_size=10_000) as db:
             for table_name, pr in prev_ranges:
-                maybe_delete_prev_range(db=db, prev_range=pr, table_name=table_name)
+                maybe_delete_prev_range(db=db, prev_range=pr)
 
     return total_size, total_msmt_count
@@ -249,7 +249,7 @@ def start_observation_maker(
                 [
                     [
                         "oonidata.bucket_processed",
-                        datetime.utcnow(),
+                        datetime.now(timezone.utc).replace(tzinfo=None),
                         int(t.ms),
                         size,
                         msmt_count,
@@ -266,3 +266,5 @@ def start_observation_maker(
     log.info(
         f"{round(total_size/10**9, 2)}GB {total_msmt_count} msmts in {t_total.pretty}"
     )
+
+    dask_client.shutdown()
diff --git a/poetry.lock b/poetry.lock
index 417a416d..5b796d03 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -511,6 +511,21 @@ files = [
 [package.dependencies]
 colorama = {version = "*", markers = "platform_system == \"Windows\""}
 
+[[package]]
+name = "click-loglevel"
+version = "0.5.0"
+description = "Log level parameter type for Click"
+category = "main"
+optional = false
+python-versions = ">=3.7"
+files = [
+    {file = "click-loglevel-0.5.0.tar.gz", hash = "sha256:fcc98a136a96479b4768494df25017114ba1cb525dac0bed619209b3578fd4f3"},
+    {file = "click_loglevel-0.5.0-py3-none-any.whl", hash = "sha256:897070fd4bf5b503edb5a1ecc0551448647901f4fac83a01c9f00aa28ad86d60"},
+]
+
+[package.dependencies]
+click = ">=8.0"
+
 [[package]]
 name = "clickhouse-driver"
 version = "0.2.5"
@@ -3699,4 +3714,4 @@ research = ["jupyterlab"]
 
 [metadata]
 lock-version = "2.0"
 python-versions = ">=3.8,<4"
-content-hash = "3d04d49326808a5955bb9acf7559627feb6775311fd16f242006a7d604692442"
+content-hash = "dd5586d165d8e6d333c44d10223f88126f09700de8509a9674adc28adbc584ee"
diff --git a/pyproject.toml b/pyproject.toml
index 2fab322b..6e6b5f0d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -30,6 +30,7 @@ numpy = {version = "^1.23.5", optional = true, python = ">=3.8"}
 pandas = {version = "^2.0.0", optional = true, python = ">=3.8"}
 flask = {version = "^2.2.2", optional = true}
 jupyterlab = {version = "^4.0.7", optional = true}
+click-loglevel = "^0.5.0"
 
 [tool.poetry.dev-dependencies]
 pytest = ">=7.2"
diff --git a/tests/conftest.py b/tests/conftest.py
index 4f132dba..0e6a91bc 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -76,10 +76,10 @@ def cli_runner():
     return CliRunner()
 
 
-@pytest.fixture
-def db():
-    from oonidata.db.create_tables import create_queries
+from oonidata.db.create_tables import create_queries
 
+
+def create_db_for_fixture():
     try:
         with ClickhouseConnection(conn_url="clickhouse://localhost/") as db:
             db.execute("CREATE DATABASE IF NOT EXISTS testing_oonidata")
@@ -91,8 +91,19 @@ def db():
         db.execute("SELECT 1")
     except:
         pytest.skip("no database connection")
-    for query, table_name in create_queries:
-        db.execute(f"DROP TABLE IF EXISTS {table_name};")
+    for query, _ in create_queries:
         db.execute(query)
+    return db
+
+
+@pytest.fixture
+def db_notruncate():
+    return create_db_for_fixture()
+
+
+@pytest.fixture
+def db():
+    db = create_db_for_fixture()
+    for _, table_name in create_queries:
+        db.execute(f"TRUNCATE TABLE {table_name};")
     return db
diff --git a/tests/test_workers.py b/tests/test_workers.py
index 4b97d58d..08c9cdb2 100644
--- a/tests/test_workers.py
+++ b/tests/test_workers.py
@@ -13,7 +13,11 @@
 from oonidata.models.nettests.http_invalid_request_line import HTTPInvalidRequestLine
 from oonidata.models.observations import HTTPMiddleboxObservation
 from oonidata.workers.analysis import make_analysis_in_a_day, make_cc_batches, make_ctrl
-from oonidata.workers.common import get_obs_count_by_cc, get_prev_range
+from oonidata.workers.common import (
+    get_obs_count_by_cc,
+    get_prev_range,
+    maybe_delete_prev_range,
+)
 from oonidata.workers.observations import (
     make_observations_for_file_entry_batch,
     write_observations_to_db,
@@ -22,6 +26,7 @@
 from oonidata.workers.fingerprint_hunter import fingerprint_hunter
 from oonidata.transforms import measurement_to_observations
 from oonidata.transforms.nettests.measurement_transformer import MeasurementTransformer
+from tests.test_cli import wait_for_mutations
 
 
 def test_get_prev_range(db):
@@ -40,7 +45,7 @@ def test_get_prev_range(db):
     bucket_date = "2000-01-01"
     test_name = "web_connectivity"
     probe_cc = "IT"
-    min_time = datetime(2000, 1, 1, 23, 42, 00, tzinfo=timezone.utc)
+    min_time = datetime(2000, 1, 1, 23, 42, 00)
     rows = [(min_time, bucket_date, test_name, probe_cc)]
     for i in range(200):
         rows.append((min_time + timedelta(seconds=i), bucket_date, test_name, probe_cc))
@@ -63,7 +68,7 @@ def test_get_prev_range(db):
     bucket_date = "2000-03-01"
     test_name = "web_connectivity"
     probe_cc = "IT"
-    min_time = datetime(2000, 1, 1, 23, 42, 00, tzinfo=timezone.utc)
+    min_time = datetime(2000, 1, 1, 23, 42, 00)
     rows: List[Tuple[datetime, str, str, str]] = []
     for i in range(10):
         rows.append(
@@ -87,6 +92,14 @@ def test_get_prev_range(db):
     assert prev_range.min_created_at and prev_range.max_created_at
     assert prev_range.min_created_at == (min_time - timedelta(seconds=1))
     assert prev_range.max_created_at == (rows[-1][0] + timedelta(seconds=1))
+
+    maybe_delete_prev_range(
+        db=db,
+        prev_range=prev_range,
+    )
+    wait_for_mutations(db, "test_range")
+    res = db.execute("SELECT COUNT() FROM test_range")
+    assert res[0][0] == 10
     db.execute("DROP TABLE test_range")