diff --git a/oonipipeline/src/oonipipeline/temporal/common.py b/oonipipeline/src/oonipipeline/temporal/common.py
index f14c9b79..6e497b43 100644
--- a/oonipipeline/src/oonipipeline/temporal/common.py
+++ b/oonipipeline/src/oonipipeline/temporal/common.py
@@ -86,6 +86,7 @@ def maybe_delete_prev_range(db: ClickhouseConnection, prev_range: PrevRange) ->
     if not prev_range.max_created_at or not prev_range.min_created_at:
         return ""
 
+    wait_for_mutations(db, prev_range.table_name)
     # Disabled due to: https://github.com/ClickHouse/ClickHouse/issues/40651
     # db.execute("SET allow_experimental_lightweight_delete = true;")
 
@@ -99,7 +100,6 @@ def maybe_delete_prev_range(db: ClickhouseConnection, prev_range: PrevRange) ->
     q = f"ALTER TABLE {prev_range.table_name} DELETE "
     final_query = q + where
     db.execute(final_query, q_args)
-    wait_for_mutations(db, prev_range.table_name)
 
     return final_query
 
diff --git a/oonipipeline/src/oonipipeline/temporal/workflows/observations.py b/oonipipeline/src/oonipipeline/temporal/workflows/observations.py
index 2916f636..d354149a 100644
--- a/oonipipeline/src/oonipipeline/temporal/workflows/observations.py
+++ b/oonipipeline/src/oonipipeline/temporal/workflows/observations.py
@@ -70,15 +70,10 @@ async def run(self, params: ObservationsWorkflowParams) -> dict:
 
         await workflow.execute_activity(
             optimize_tables,
-            OptimizeTablesParams(
-                clickhouse=params.clickhouse, table_names=["buffer_obs_web"]
-            ),
+            OptimizeTablesParams(clickhouse=params.clickhouse, table_names=["obs_web"]),
             start_to_close_timeout=timedelta(minutes=20),
             retry_policy=RetryPolicy(maximum_attempts=10),
         )
-        workflow.logger.info(
-            f"finished optimize_tables for bucket_date={params.bucket_date}"
-        )
 
         previous_ranges = await workflow.execute_activity(
             get_previous_range,
@@ -89,8 +84,8 @@ async def run(self, params: ObservationsWorkflowParams) -> dict:
                 probe_cc=params.probe_cc,
                 tables=["obs_web"],
             ),
-            start_to_close_timeout=timedelta(minutes=20),
-            retry_policy=RetryPolicy(maximum_attempts=10),
+            start_to_close_timeout=timedelta(minutes=2),
+            retry_policy=RetryPolicy(maximum_attempts=4),
         )
         workflow.logger.info(
             f"finished get_previous_range for bucket_date={params.bucket_date}"
@@ -108,14 +103,6 @@ async def run(self, params: ObservationsWorkflowParams) -> dict:
             f"{total_t.pretty} speed: {obs_res['mb_per_sec']}MB/s ({obs_res['measurement_per_sec']}msmt/s)"
         )
 
-        await workflow.execute_activity(
-            optimize_tables,
-            OptimizeTablesParams(
-                clickhouse=params.clickhouse, table_names=["buffer_obs_web"]
-            ),
-            start_to_close_timeout=timedelta(minutes=20),
-            retry_policy=RetryPolicy(maximum_attempts=10),
-        )
         workflow.logger.info(
             f"finished optimize_tables for bucket_date={params.bucket_date}"
         )