From 6d4eaeae1f36d3d5d402f718fa6976543e8a6fdc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arturo=20Filast=C3=B2?= Date: Mon, 20 Jan 2025 17:55:57 +0100 Subject: [PATCH] Add support for listing files in s3 by hour --- oonidata/src/oonidata/dataclient.py | 56 ++++++++++++++----- oonidata/tests/integration/test_dataclient.py | 24 +++++++- 2 files changed, 65 insertions(+), 15 deletions(-) diff --git a/oonidata/src/oonidata/dataclient.py b/oonidata/src/oonidata/dataclient.py index b039c9b1..e5ebf023 100644 --- a/oonidata/src/oonidata/dataclient.py +++ b/oonidata/src/oonidata/dataclient.py @@ -370,16 +370,18 @@ def get_v2_search_prefixes(testnames: Set[str], ccs: Set[str]) -> List[Prefix]: def get_v2_prefixes( - ccs: Set[str], testnames: Set[str], start_day: date, end_day: date + ccs: Set[str], testnames: Set[str], start_day: date, end_day: date, from_cans=False ) -> List[Prefix]: - legacy_prefixes = [ + # At this wide prefix we have both the new jsonl files and postcans + new_jsonl_and_postcans = [ Prefix(bucket_name=MC_BUCKET_NAME, prefix=f"raw/{d:%Y%m%d}") for d in date_interval(max(date(2020, 10, 20), start_day), end_day) ] - if not testnames: - testnames = list_all_testnames() prefixes = [] - if start_day < date(2020, 10, 21): + # Prior to 2020-10-21 we had a different format for JSONL files + if start_day < date(2020, 10, 21) and not from_cans: + if not testnames: + testnames = list_all_testnames() prefixes = get_v2_search_prefixes(testnames, ccs) combos = list(itertools.product(prefixes, date_interval(start_day, end_day))) # This results in a faster listing in cases where we need only a small time @@ -391,7 +393,7 @@ def get_v2_prefixes( for p, d in combos ] - return prefixes + legacy_prefixes + return prefixes + new_jsonl_and_postcans def get_can_prefixes(start_day: date, end_day: date) -> List[Prefix]: @@ -514,9 +516,9 @@ def ccs_set(probe_cc: CSVList) -> Set[str]: return set() -def get_file_entries( - start_day: date, - end_day: date, +def get_file_entries_hourly( + start_hour: datetime, + end_hour: datetime, probe_cc: CSVList, test_name: CSVList, from_cans: bool, @@ -525,14 +527,19 @@ def get_file_entries( ccs = ccs_set(probe_cc) testnames = testnames_set(test_name) - start_timestamp = datetime.combine(start_day, datetime.min.time()) - end_timestamp = datetime.combine(end_day, datetime.min.time()) + start_day = start_hour.date() + end_day = end_hour.date() + # if the end_hour is not at midnight, we need to include the next day + if end_hour.hour > 0: + end_day = end_hour.date() + timedelta(days=1) - prefix_list = get_v2_prefixes(ccs, testnames, start_day, end_day) + prefix_list = get_v2_prefixes( + ccs, testnames, start_day, end_day, from_cans=from_cans + ) if from_cans == True: prefix_list = get_can_prefixes(start_day, end_day) + prefix_list - log.debug(f"using prefix list {prefix_list}") + log.info(f"using prefix list {prefix_list}") file_entries = [] prefix_idx = 0 total_prefixes = len(prefix_list) @@ -549,7 +556,7 @@ def get_file_entries( for prefix in prefix_list: for fe in iter_file_entries(prefix): - if not fe.matches_filter(ccs, testnames, start_timestamp, end_timestamp): + if not fe.matches_filter(ccs, testnames, start_hour, end_hour): continue if from_cans == True and not fe.is_can: @@ -571,6 +578,27 @@ def get_file_entries( return file_entries +def get_file_entries( + start_day: date, + end_day: date, + probe_cc: CSVList, + test_name: CSVList, + from_cans: bool, + progress_callback: Optional[Callable[[MeasurementListProgress], None]] = None, +) -> List[FileEntry]: + start_timestamp = datetime.combine(start_day, datetime.min.time()) + end_timestamp = datetime.combine(end_day, datetime.min.time()) + + return get_file_entries_hourly( + start_hour=start_timestamp, + end_hour=end_timestamp, + probe_cc=probe_cc, + test_name=test_name, + from_cans=from_cans, + progress_callback=progress_callback, + ) + + def list_file_entries_batches( start_day: Union[date, str], end_day: Union[date, str], diff --git a/oonidata/tests/integration/test_dataclient.py b/oonidata/tests/integration/test_dataclient.py index 8fb96f32..913e2e85 100644 --- a/oonidata/tests/integration/test_dataclient.py +++ b/oonidata/tests/integration/test_dataclient.py @@ -2,6 +2,7 @@ from oonidata.dataclient import ( date_interval, + get_file_entries_hourly, iter_file_entries, get_v2_prefixes, iter_measurements, @@ -90,6 +91,27 @@ def progress_callback(p): assert len(fe_list) == 454 +def test_get_file_entries_by_hour(): + from oonidata.dataclient import ProgressStatus + + def progress_callback(p): + assert p.total_prefixes == 1 + assert ( + p.progress_status == ProgressStatus.LISTING_BEGIN + or p.progress_status == ProgressStatus.LISTING + ) + + fe_list = get_file_entries_hourly( + probe_cc="IT", + test_name="webconnectivity", + start_hour=datetime(2022, 8, 1, 11), + end_hour=datetime(2022, 8, 1, 12), + from_cans=True, + progress_callback=progress_callback, + ) + assert len(fe_list) == 4 + + def test_get_can_prefixes(): # print(get_can_prefixes(set(), set(), date(2019, 6, 2), date(2020, 10, 21))) # print(get_can_prefixes(set(), set(), date(2020, 6, 2), date(2020, 10, 21))) @@ -173,4 +195,4 @@ def test_iter_measurements(caplog): progress_callback=lambda x: print(x), ): count += 1 - assert count == 200 \ No newline at end of file + assert count == 200