Skip to content

Commit

Permalink
debug: additional output and error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Victor Lindell committed May 8, 2024
1 parent bd93efe commit 357e37e
Showing 1 changed file with 37 additions and 14 deletions.
51 changes: 37 additions & 14 deletions target_bigquery/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@
from tempfile import mkstemp
from typing import IO, Any, BinaryIO, Dict, List, Optional, Sequence

from google.api_core.exceptions import GoogleAPICallError

from fastavro import parse_schema, writer
from google.cloud import bigquery
# from google.cloud.bigquery import DEFAULT_RETRY

from singer_sdk.helpers._batch import (
BaseBatchFileEncoding,
BatchFileFormat,
Expand Down Expand Up @@ -390,25 +394,44 @@ def transform_record(record):
self.logger.info(
f"[{self.stream_name}][{batch_id}] before load_job.result() - time - {elapsed_time} ms"
)
load_job.result()
elapsed_time = stop_timer(start)

self.logger.info(
f"[{self.stream_name}][{batch_id}] after load_job.result() - time - {elapsed_time} ms"
)
try:
result = load_job.result()
self.logger.info(
f"[{self.stream_name}][{batch_id}] result.eror { result.errors }")
# Suggested alternative (not enabled): call result() with an explicit retry deadline and timeout.
# result = load_job.result(DEFAULT_RETRY.with_deadline(60), 500)

queries.append(self.update_from_temp_table(batch_id, batch_meta))
queries.append(self.drop_temp_table(batch_id))
elapsed_time = stop_timer(start)

self.logger.info(
f"[{self.stream_name}][{batch_id}] before query - time - {elapsed_time} ms"
)
self.logger.info(
f"[{self.stream_name}][{batch_id}] after load_job.result() - time - {elapsed_time} ms"
)

queries.append(self.update_from_temp_table(batch_id, batch_meta))
queries.append(self.drop_temp_table(batch_id))

self.logger.info(
f"[{self.stream_name}][{batch_id}] before query - time - {elapsed_time} ms"
)

self.query(queries)

self.logger.info(
f"[{self.stream_name}][{batch_id}] Finished batch - time - {elapsed_time} ms"
)

except GoogleAPICallError as e:
# Handle Google API call errors
self.logger.info(f"[{self.stream_name}][{batch_id}] Google API call error: {e}")
except TimeoutError:
# Handle timeout errors
self.logger.info(f"[{self.stream_name}][{batch_id}] Timeout error: Job did not complete in the given timeout.")
except Exception as e:
# Handle other unexpected errors
self.logger.info(f"[{self.stream_name}][{batch_id}] An unexpected error occurred: {e}")

self.query(queries)

self.logger.info(
f"[{self.stream_name}][{batch_id}] Finished batch - time - {elapsed_time} ms"
)

def process_batch_files(
self,
Expand Down

0 comments on commit 357e37e

Please sign in to comment.