Refactoring of observation related activities (#86)
* Add support for performing observation generation using multiple cores instead of multiple threads, since the work is CPU-bound (see the sketch below)
* Separate observation activities into distinct smaller activities, allowing for more narrowly scoped scheduling and retry policies
* Change the type of PrevRange so that it can be serialized to JSON, allowing it to be passed as a parameter to activities
* Move update_assets into the observation activity
* Add support for passing the config file via the `CONFIG_FILE` environment variable
* Improvements to the CLI commands
* Drop several CLI arguments that should only be read from the config file
* Other improvements related to typing
hellais authored Sep 6, 2024
1 parent f17863b commit 99bfa0d
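
The first bullet moves the CPU-bound observation generation from a thread pool to separate worker processes. A minimal sketch of that pattern, assuming a hypothetical module-level make_observations(day, probe_cc) worker (not the pipeline's actual function names):

```python
from concurrent.futures import ProcessPoolExecutor
from datetime import date
from typing import List

def make_observations(day: date, probe_cc: str) -> int:
    # Hypothetical CPU-bound worker: parse the measurements for one
    # (day, country) pair and build observation rows from them.
    return 0

def run_all(days: List[date], probe_ccs: List[str]) -> int:
    total = 0
    # A process pool sidesteps the GIL, so CPU-bound work scales with
    # the number of cores, unlike a thread pool.
    with ProcessPoolExecutor() as pool:
        futures = [
            pool.submit(make_observations, day, cc)
            for day in days
            for cc in probe_ccs
        ]
        for fut in futures:
            total += fut.result()
    return total
```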
Showing 25 changed files with 1,301 additions and 1,352 deletions.
8 changes: 4 additions & 4 deletions oonidata/src/oonidata/dataclient.py
```diff
@@ -42,9 +42,6 @@ def create_s3_client():
     return boto3.client("s3", config=botoConfig(signature_version=botoSigUNSIGNED))
 
 
-s3 = create_s3_client()
-
-
 def date_interval(start_day: date, end_day: date):
     """
     A generator for a date_interval.
@@ -243,6 +240,7 @@ def stream_oldcan(body: io.BytesIO, s3path: str) -> Generator[dict, None, None]:
 
 
 def stream_measurements(bucket_name, s3path, ext):
+    s3 = create_s3_client()
     body = s3.get_object(Bucket=bucket_name, Key=s3path)["Body"]
     log.debug(f"streaming file s3://{bucket_name}/{s3path}")
     if ext == "jsonl.gz":
@@ -334,6 +332,7 @@ def from_obj_dict(bucket_name: str, obj_dict: dict) -> "FileEntry":
 
 
 def list_all_testnames() -> Set[str]:
+    s3 = create_s3_client()
     testnames = set()
     paginator = s3.get_paginator("list_objects_v2")
     for r in paginator.paginate(Bucket=MC_BUCKET_NAME, Prefix="jsonl/", Delimiter="/"):
@@ -354,6 +353,7 @@ def get_v2_search_prefixes(testnames: Set[str], ccs: Set[str]) -> List[Prefix]:
     If the ccs list is empty we will return prefixes for all countries for
     which that particular testname as measurements.
     """
+    s3 = create_s3_client()
    prefixes = []
     paginator = s3.get_paginator("list_objects_v2")
     for tn in testnames:
@@ -577,7 +577,7 @@ def list_file_entries_batches(
     probe_cc: CSVList = None,
     test_name: CSVList = None,
     from_cans: bool = True,
-) -> Tuple[List[Tuple], int]:
+) -> Tuple[List[List[Tuple]], int]:
     if isinstance(start_day, str):
         start_day = datetime.strptime(start_day, "%Y-%m-%d").date()
     if isinstance(end_day, str):
```
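
The hunks above drop the module-level s3 client and instead call create_s3_client() inside each function that needs one. One plausible motivation, given the move to worker processes, is that a boto3 client created at import time would otherwise be shared across processes; a minimal sketch of the per-call pattern (create_s3_client mirrors the function shown above, stream_body is illustrative):

```python
import boto3
from botocore import UNSIGNED
from botocore.config import Config

def create_s3_client():
    # Anonymous client for the public OONI S3 buckets, as in dataclient.py.
    return boto3.client("s3", config=Config(signature_version=UNSIGNED))

def stream_body(bucket_name: str, s3path: str):
    # Creating the client inside the function means every worker process
    # gets its own client rather than reusing one created at import time.
    s3 = create_s3_client()
    return s3.get_object(Bucket=bucket_name, Key=s3path)["Body"]
```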
328 changes: 0 additions & 328 deletions oonidata/src/oonidata/s3client.py

This file was deleted.

2 changes: 1 addition & 1 deletion oonipipeline/pyproject.toml
```diff
@@ -36,7 +36,7 @@ dependencies = [
   "opentelemetry-exporter-otlp-proto-grpc ~= 1.18.0",
   "bokeh ~= 3.5.2",
   "uvicorn ~= 0.25.0",
-  "pydantic-settings ~= 2.1.0",
+  "pydantic-settings ~= 2.4.0",
 ]
 
 [tool.hatch.build.targets.sdist]
```
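
The pydantic-settings bump accompanies the new `CONFIG_FILE` support mentioned in the commit message. A minimal sketch of one way such a setting can be wired up; the field names and the dotenv-style file loading are assumptions for illustration, not the pipeline's actual configuration code:

```python
import os
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    # Illustrative fields only; the real pipeline settings differ.
    clickhouse_url: str = "clickhouse://localhost"
    data_dir: str = "/tmp/oonipipeline"

# If CONFIG_FILE points at a dotenv-style file, load settings from it;
# otherwise fall back to environment variables and the defaults above.
config_file = os.environ.get("CONFIG_FILE")
settings = Settings(_env_file=config_file) if config_file else Settings()
```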
1 change: 1 addition & 0 deletions oonipipeline/src/oonipipeline/analysis/control.py
```diff
@@ -208,6 +208,7 @@ def build_from_existing(self, db_str: str):
         with sqlite3.connect(db_str) as src_db:
             self.db = sqlite3.connect(":memory:")
             src_db.backup(self.db)
+            self.db.commit()
 
     def close(self):
         self.db.close()
```
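
The control.py change adds an explicit commit after copying the source database into the in-memory connection. A standalone sketch of that backup-into-memory pattern, using only the standard library:

```python
import sqlite3

def load_into_memory(db_path: str) -> sqlite3.Connection:
    """Copy an on-disk SQLite database into a fresh in-memory connection."""
    mem_db = sqlite3.connect(":memory:")
    with sqlite3.connect(db_path) as src_db:
        # Connection.backup() copies every page of src_db into mem_db.
        src_db.backup(mem_db)
    # Explicit commit on the target connection, mirroring the
    # self.db.commit() added in the diff above.
    mem_db.commit()
    return mem_db
```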