Add support for listing files in S3 by hour
hellais committed Jan 20, 2025
1 parent 4058e3f commit 6d4eaea
Showing 2 changed files with 65 additions and 15 deletions.
56 changes: 42 additions & 14 deletions oonidata/src/oonidata/dataclient.py
@@ -370,16 +370,18 @@ def get_v2_search_prefixes(testnames: Set[str], ccs: Set[str]) -> List[Prefix]:


 def get_v2_prefixes(
-    ccs: Set[str], testnames: Set[str], start_day: date, end_day: date
+    ccs: Set[str], testnames: Set[str], start_day: date, end_day: date, from_cans=False
 ) -> List[Prefix]:
-    legacy_prefixes = [
+    # At this wide prefix we have both the new jsonl files and postcans
+    new_jsonl_and_postcans = [
         Prefix(bucket_name=MC_BUCKET_NAME, prefix=f"raw/{d:%Y%m%d}")
         for d in date_interval(max(date(2020, 10, 20), start_day), end_day)
     ]
-    if not testnames:
-        testnames = list_all_testnames()
     prefixes = []
-    if start_day < date(2020, 10, 21):
+    # Prior to 2020-10-21 we had a different format for JSONL files
+    if start_day < date(2020, 10, 21) and not from_cans:
+        if not testnames:
+            testnames = list_all_testnames()
         prefixes = get_v2_search_prefixes(testnames, ccs)
         combos = list(itertools.product(prefixes, date_interval(start_day, end_day)))
         # This results in a faster listing in cases where we need only a small time
@@ -391,7 +393,7 @@ def get_v2_prefixes(
         for p, d in combos
     ]

-    return prefixes + legacy_prefixes
+    return prefixes + new_jsonl_and_postcans


 def get_can_prefixes(start_day: date, end_day: date) -> List[Prefix]:
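With from_cans=True, get_v2_prefixes now skips the per-testname search prefixes that are only needed for the pre-2020-10-21 JSONL layout and returns just the day-wide raw/YYYYMMDD prefixes. A minimal sketch of the shape of those prefixes; this is not the library's code: Prefix and MC_BUCKET_NAME are replaced by stand-ins, and it assumes date_interval() is end-exclusive, which is consistent with the end-day rounding later in this commit.

from datetime import date, timedelta
from typing import Iterator, Tuple


def raw_day_prefixes(start_day: date, end_day: date) -> Iterator[Tuple[str, str]]:
    # Simplified stand-in for the list comprehension above: one prefix per day,
    # clamped to 2020-10-20 when the raw/ layout first appears. "BUCKET" is a
    # placeholder for the real MC_BUCKET_NAME constant.
    d = max(date(2020, 10, 20), start_day)
    while d < end_day:  # assumes date_interval() excludes end_day
        yield ("BUCKET", f"raw/{d:%Y%m%d}")
        d += timedelta(days=1)


print(list(raw_day_prefixes(date(2022, 8, 1), date(2022, 8, 2))))
# [('BUCKET', 'raw/20220801')]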
@@ -514,9 +516,9 @@ def ccs_set(probe_cc: CSVList) -> Set[str]:
     return set()


-def get_file_entries(
-    start_day: date,
-    end_day: date,
+def get_file_entries_hourly(
+    start_hour: datetime,
+    end_hour: datetime,
     probe_cc: CSVList,
     test_name: CSVList,
     from_cans: bool,
@@ -525,14 +527,19 @@ def get_file_entries(
     ccs = ccs_set(probe_cc)
     testnames = testnames_set(test_name)

-    start_timestamp = datetime.combine(start_day, datetime.min.time())
-    end_timestamp = datetime.combine(end_day, datetime.min.time())
+    start_day = start_hour.date()
+    end_day = end_hour.date()
+    # if the end_hour is not at midnight, we need to include the next day
+    if end_hour.hour > 0:
+        end_day = end_hour.date() + timedelta(days=1)

-    prefix_list = get_v2_prefixes(ccs, testnames, start_day, end_day)
+    prefix_list = get_v2_prefixes(
+        ccs, testnames, start_day, end_day, from_cans=from_cans
+    )
     if from_cans == True:
         prefix_list = get_can_prefixes(start_day, end_day) + prefix_list

-    log.debug(f"using prefix list {prefix_list}")
+    log.info(f"using prefix list {prefix_list}")
     file_entries = []
     prefix_idx = 0
     total_prefixes = len(prefix_list)
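Because S3 prefixes are listed per day, the hourly window is widened to whole days: the start day is the date of start_hour, and any end_hour past midnight pulls in that day's prefix as well (only the hour component is checked). A quick worked sketch mirroring the rounding above, using only the standard library:

from datetime import datetime, timedelta


def day_window(start_hour: datetime, end_hour: datetime):
    # Mirrors the rounding in get_file_entries_hourly above.
    start_day = start_hour.date()
    end_day = end_hour.date()
    if end_hour.hour > 0:
        end_day = end_hour.date() + timedelta(days=1)
    return start_day, end_day


print(day_window(datetime(2022, 8, 1, 11), datetime(2022, 8, 1, 12)))
# (datetime.date(2022, 8, 1), datetime.date(2022, 8, 2)) -- the 2022-08-01
# prefix is listed, then matches_filter() narrows results to 11:00-12:00.
print(day_window(datetime(2022, 8, 1, 0), datetime(2022, 8, 2, 0)))
# (datetime.date(2022, 8, 1), datetime.date(2022, 8, 2)) -- a midnight
# end_hour adds no extra day.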
@@ -549,7 +556,7 @@ def get_file_entries(

     for prefix in prefix_list:
         for fe in iter_file_entries(prefix):
-            if not fe.matches_filter(ccs, testnames, start_timestamp, end_timestamp):
+            if not fe.matches_filter(ccs, testnames, start_hour, end_hour):
                 continue

             if from_cans == True and not fe.is_can:
@@ -571,6 +578,27 @@ def get_file_entries(
     return file_entries


+def get_file_entries(
+    start_day: date,
+    end_day: date,
+    probe_cc: CSVList,
+    test_name: CSVList,
+    from_cans: bool,
+    progress_callback: Optional[Callable[[MeasurementListProgress], None]] = None,
+) -> List[FileEntry]:
+    start_timestamp = datetime.combine(start_day, datetime.min.time())
+    end_timestamp = datetime.combine(end_day, datetime.min.time())
+
+    return get_file_entries_hourly(
+        start_hour=start_timestamp,
+        end_hour=end_timestamp,
+        probe_cc=probe_cc,
+        test_name=test_name,
+        from_cans=from_cans,
+        progress_callback=progress_callback,
+    )
+
+
 def list_file_entries_batches(
     start_day: Union[date, str],
     end_day: Union[date, str],
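Taken together, callers can now list file entries for a sub-day window, while the existing daily get_file_entries keeps its signature and simply delegates to the hourly variant at midnight boundaries. A minimal usage sketch, mirroring the argument values of the integration test below (actually running it needs network access to the OONI S3 buckets):

from datetime import datetime

from oonidata.dataclient import get_file_entries_hourly

# List one hour of Italian Web Connectivity entries from the cans; the values
# mirror the integration test added in this commit.
fe_list = get_file_entries_hourly(
    start_hour=datetime(2022, 8, 1, 11),
    end_hour=datetime(2022, 8, 1, 12),
    probe_cc="IT",
    test_name="webconnectivity",
    from_cans=True,
)
print(len(fe_list))  # the new integration test expects 4 entries for this window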
24 changes: 23 additions & 1 deletion oonidata/tests/integration/test_dataclient.py
@@ -2,6 +2,7 @@

 from oonidata.dataclient import (
     date_interval,
+    get_file_entries_hourly,
     iter_file_entries,
     get_v2_prefixes,
     iter_measurements,
@@ -90,6 +91,27 @@ def progress_callback(p):
     assert len(fe_list) == 454


+def test_get_file_entries_by_hour():
+    from oonidata.dataclient import ProgressStatus
+
+    def progress_callback(p):
+        assert p.total_prefixes == 1
+        assert (
+            p.progress_status == ProgressStatus.LISTING_BEGIN
+            or p.progress_status == ProgressStatus.LISTING
+        )
+
+    fe_list = get_file_entries_hourly(
+        probe_cc="IT",
+        test_name="webconnectivity",
+        start_hour=datetime(2022, 8, 1, 11),
+        end_hour=datetime(2022, 8, 1, 12),
+        from_cans=True,
+        progress_callback=progress_callback,
+    )
+    assert len(fe_list) == 4
+
+
 def test_get_can_prefixes():
     # print(get_can_prefixes(set(), set(), date(2019, 6, 2), date(2020, 10, 21)))
     # print(get_can_prefixes(set(), set(), date(2020, 6, 2), date(2020, 10, 21)))
@@ -173,4 +195,4 @@ def test_iter_measurements(caplog):
         progress_callback=lambda x: print(x),
     ):
         count += 1
-    assert count == 200
\ No newline at end of file
+    assert count == 200
