Commit 4b84812

Apply suggestions from code review
Co-authored-by: kirkrodrigues <[email protected]>
haiqi96 and kirkrodrigues authored Nov 17, 2024
1 parent 3d86291 commit 4b84812
Showing 1 changed file with 30 additions and 29 deletions.
@@ -75,6 +75,7 @@

# Dictionary that maps IDs of clp-s archives being extracted to IDs of jobs waiting for them
active_archive_json_extractions: Dict[str, List[str]] = {}
+
reducer_connection_queue: Optional[asyncio.Queue] = None


@@ -110,7 +111,7 @@ def __init__(self, job_id: str, job_config: Dict[str, Any], db_conn):
db_conn, self._job_config
)
if self._archive_id is None:
- raise ValueError("Job parameters does not resolve to an existing archive")
+ raise ValueError("Job parameters don't resolve to an existing archive")

self._job_config.file_split_id = self._file_split_id

@@ -134,7 +135,7 @@ def mark_job_as_waiting(self) -> None:

def create_stream_extraction_job(self) -> QueryJob:
logger.info(
- f"Creating ir extraction job {self._job_id} on file_split: {self._file_split_id}"
+ f"Creating IR extraction job {self._job_id} for file_split: {self._file_split_id}"
)
return ExtractIrJob(
id=self._job_id,
@@ -149,7 +150,7 @@ def __init__(self, job_id: str, job_config: Dict[str, Any], db_conn):
self._job_config = ExtractJsonJobConfig.parse_obj(job_config)
self._archive_id = self._job_config.archive_id
if not check_if_archive_exists(db_conn, self._archive_id):
- raise ValueError(f"archive {self._archive_id} does not exist")
+ raise ValueError(f"Archive {self._archive_id} doesn't exist")

def get_stream_id(self) -> Optional[str]:
return self._archive_id
@@ -451,7 +452,6 @@ def check_if_archive_exists(
db_conn,
archive_id: str,
) -> bool:
-
query = f"""SELECT 1
FROM {CLP_METADATA_TABLE_PREFIX}archives WHERE
id = %s
@@ -665,7 +665,7 @@ def handle_pending_query_jobs(
else:
job_handle = JsonExtractionHandle(job_id, job_config, db_conn)
except Exception:
- logger.exception("Failed to initialize extraction job")
+ logger.exception("Failed to initialize extraction job handle")
if not set_job_or_task_status(
db_conn,
QUERY_JOBS_TABLE_NAME,
@@ -679,28 +679,29 @@
logger.error(f"Failed to set job {job_id} as failed")
continue

- # NOTE: The following two if blocks of `is_stream_extraction_active` and
+ # NOTE: The following two if blocks for `is_stream_extraction_active` and
# `is_stream_extracted` should not be reordered.
- # We must:
- # 1. First, check if the stream is in the process of being extracted
- # (`is_stream_extraction_active`).
- # 2. Then, check if the stream has already been extracted
- # (`is_stream_extracted`).
- #
- # This order ensures correctness because `is_stream_extracted` returns True if
- # any chunk of the stream has been extracted, but it does not guarantee that *all*
- # chunks are extracted. If `is_stream_extracted` is checked first and the job is
- # marked as successful based on its result, it is possible that the extraction job
- # is still in progress, meaning the specific chunk requested by the web UI might
- # not yet be ready.
-
- # Check if the target is currently being extracted; if so, add the job ID to the
- # list of jobs waiting for it.
+ # The logic below works as follows:
+ # 1. It checks if a stream is already being extracted
+ # (`is_stream_extraction_active`) and if so, it marks the new job as waiting for
+ # the old job to finish.
+ # 2. Otherwise, it checks if a stream has already been extracted
+ # (`is_stream_extracted`) and if so, it marks the new job as complete.
+ # 3. Otherwise, it creates a new stream extraction job.
+ #
+ # `is_stream_extracted` only checks if a single stream has been extracted rather
+ # than whether all required streams have been extracted. This means that we can't
+ # use it to check if the old job is complete; instead, we need to employ the
+ # aforementioned logic.
+
+ # Check if the required streams are currently being extracted; if so, add the job ID
+ # to the list of jobs waiting for it.
if job_handle.is_stream_extraction_active():
job_handle.mark_job_as_waiting()
logger.info(
- f"target {job_handle.get_stream_id()} is being extracted, "
- f"so mark job {job_id} as running"
+ f"Stream {job_handle.get_stream_id()} is already being extracted,"
+ f" so mark job {job_id} as running."
)
if not set_job_or_task_status(
db_conn,
@@ -714,11 +715,11 @@
logger.error(f"Failed to set job {job_id} as running")
continue

- # Check if a stream file in the target has already been extracted
+ # Check if a required stream file has already been extracted
if job_handle.is_stream_extracted(results_cache_uri, stream_collection_name):
logger.info(
- f"target {job_handle.get_stream_id()} already extracted, "
- f"so mark job {job_id} as done"
+ f"Stream {job_handle.get_stream_id()} already extracted,"
+ f" so mark job {job_id} as succeeded."
)
if not set_job_or_task_status(
db_conn,
@@ -746,7 +747,7 @@ def handle_pending_query_jobs(

job_handle.mark_job_as_waiting()
active_jobs[job_id] = next_stream_extraction_job
- logger.info(f"Dispatched stream extraction job {job_id} on archive: {archive_id}")
+ logger.info(f"Dispatched stream extraction job {job_id} for archive: {archive_id}")

else:
# NOTE: We're skipping the job for this iteration, but its status will remain
@@ -927,13 +928,13 @@ async def handle_finished_stream_extraction_job(
task_id = task_result.task_id
if not QueryJobStatus.SUCCEEDED == task_result.status:
logger.error(
- f"extraction task job-{job_id}-task-{task_id} failed. "
+ f"Extraction task job-{job_id}-task-{task_id} failed. "
f"Check {task_result.error_log_path} for details."
)
new_job_status = QueryJobStatus.FAILED
else:
logger.info(
f"extraction task job-{job_id}-task-{task_id} succeeded in "
f"Extraction task job-{job_id}-task-{task_id} succeeded in "
f"{task_result.duration} second(s)."
)

@@ -1011,7 +1012,7 @@ async def check_job_status_and_update_db(db_conn_pool, results_cache_uri):
await handle_finished_search_job(
db_conn, search_job, returned_results, results_cache_uri
)
- elif job_type in [QueryJobType.EXTRACT_JSON, QueryJobType.EXTRACT_IR]:
+ elif job_type in (QueryJobType.EXTRACT_JSON, QueryJobType.EXTRACT_IR):
await handle_finished_stream_extraction_job(db_conn, job, returned_results)
else:
logger.error(f"Unexpected job type: {job_type}, skipping job {job_id}")
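
For readers tracing the NOTE in the diff above, the following is a minimal, hypothetical Python sketch of the dispatch order it describes. FakeHandle, dispatch, and the waiting dict are simplified stand-ins invented for illustration; they are not the scheduler's actual classes or signatures (for example, the real is_stream_extracted takes the results-cache URI and stream collection name).

# Hypothetical sketch of the dispatch order described in the NOTE above;
# these names are invented stand-ins, not the actual CLP scheduler code.
from typing import Dict, List


class FakeHandle:
    """Stand-in for a job handle; the real one queries the DB and results cache."""

    def __init__(self, stream_id: str, active: bool, extracted: bool) -> None:
        self.stream_id = stream_id
        self._active = active
        self._extracted = extracted

    def is_stream_extraction_active(self) -> bool:
        return self._active

    def is_stream_extracted(self) -> bool:
        return self._extracted


def dispatch(job_id: str, handle: FakeHandle, waiting: Dict[str, List[str]]) -> str:
    stream_id = handle.stream_id

    # 1. Another job is still extracting this stream, so the new job waits for it.
    if handle.is_stream_extraction_active():
        waiting.setdefault(stream_id, []).append(job_id)
        return "running"

    # 2. Checked only after step 1: True here proves some chunk of the stream
    #    exists, not that an in-flight job has produced every chunk yet.
    if handle.is_stream_extracted():
        return "succeeded"

    # 3. No active or finished extraction, so create a new extraction job.
    waiting[stream_id] = [job_id]
    return "dispatched"


waiting: Dict[str, List[str]] = {}
print(dispatch("job-1", FakeHandle("stream-A", active=False, extracted=False), waiting))  # dispatched
print(dispatch("job-2", FakeHandle("stream-A", active=True, extracted=False), waiting))  # running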
