Skip to content

Commit

Permalink
Merge branch 'master' into anytype
Browse files Browse the repository at this point in the history
  • Loading branch information
Victor Lindell committed May 7, 2024
2 parents bcf85d7 + b7ef37e commit bd93efe
Showing 1 changed file with 21 additions and 10 deletions.
31 changes: 21 additions & 10 deletions target_bigquery/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
PARTITIONS = "partitions"



class BigQuerySink(BatchSink):
"""BigQuery target sink class."""

Expand Down Expand Up @@ -357,14 +356,18 @@ def transform_record(record):

with open(temp_file, "wb") as tempfile:
elapsed_time = stop_timer(start)
self.logger.info(f"[{self.stream_name}][{batch_id}] open(temp_file) wb... elapsed time: {elapsed_time} ms")
self.logger.info(
f"[{self.stream_name}][{batch_id}] open(temp_file) wb... elapsed time: {elapsed_time} ms"
)
writer(tempfile, self.parsed_schema, avro_records)

self.logger.info(f"[{self.stream_name}][{batch_id}] Uploading LoadJob...")

with open(temp_file, "r+b") as tempfile:
elapsed_time = stop_timer(start)
self.logger.info(f"[{self.stream_name}][{batch_id}] open(temp_file) r+b... elapsed time: {elapsed_time} ms")
self.logger.info(
f"[{self.stream_name}][{batch_id}] open(temp_file) r+b... elapsed time: {elapsed_time} ms"
)
load_job = self.load_table_from_file(tempfile, batch_id)

# Delete temp file once we are done with it
Expand All @@ -384,22 +387,28 @@ def transform_record(record):

# Await job to finish
elapsed_time = stop_timer(start)
self.logger.info(f"[{self.stream_name}][{batch_id}] before load_job.result() - time - {elapsed_time} ms")
self.logger.info(
f"[{self.stream_name}][{batch_id}] before load_job.result() - time - {elapsed_time} ms"
)
load_job.result()
elapsed_time = stop_timer(start)

self.logger.info(f"[{self.stream_name}][{batch_id}] after load_job.result() - time - {elapsed_time} ms")
self.logger.info(
f"[{self.stream_name}][{batch_id}] after load_job.result() - time - {elapsed_time} ms"
)

queries.append(self.update_from_temp_table(batch_id, batch_meta))
queries.append(self.drop_temp_table(batch_id))

self.logger.info(f"[{self.stream_name}][{batch_id}] before query - time - {elapsed_time} ms")
self.logger.info(
f"[{self.stream_name}][{batch_id}] before query - time - {elapsed_time} ms"
)

self.query(queries)

self.logger.info(f"[{self.stream_name}][{batch_id}] after query - time - {elapsed_time} ms")

self.logger.info(f"[{self.stream_name}][{batch_id}] Finished batch")
self.logger.info(
f"[{self.stream_name}][{batch_id}] Finished batch - time - {elapsed_time} ms"
)

def process_batch_files(
self,
Expand Down Expand Up @@ -439,8 +448,10 @@ def process_batch_files(
f"Unsupported batch encoding format: {encoding.format}"
)


def start_timer():
    """Capture the current wall-clock time to use as a timing reference.

    Returns:
        The value of ``time.time()`` (seconds, as a float) at the moment of
        the call. Hand it to ``stop_timer`` to obtain the elapsed time.
    """
    started_at = time.time()
    return started_at


def stop_timer(start_time: float) -> float:
    """Return the time elapsed since ``start_time``, in milliseconds.

    Args:
        start_time: Reference timestamp in seconds, on the same clock as
            ``time.time()`` (this is what ``start_timer`` returns).

    Returns:
        The difference between the current ``time.time()`` and
        ``start_time``, scaled from seconds to milliseconds.
    """
    # time.time() reports seconds; callers log the result with an "ms" unit.
    # NOTE(review): time.time() is wall-clock time, so a system clock
    # adjustment between start and stop can skew (or even negate) the result.
    return (time.time() - start_time) * 1000

0 comments on commit bd93efe

Please sign in to comment.