diff --git a/target_bigquery/sinks.py b/target_bigquery/sinks.py index e6bb367..683481a 100644 --- a/target_bigquery/sinks.py +++ b/target_bigquery/sinks.py @@ -28,7 +28,6 @@ PARTITIONS = "partitions" - class BigQuerySink(BatchSink): """BigQuery target sink class.""" @@ -357,14 +356,18 @@ def transform_record(record): with open(temp_file, "wb") as tempfile: elapsed_time = stop_timer(start) - self.logger.info(f"[{self.stream_name}][{batch_id}] open(temp_file) wb... elapsed time: {elapsed_time} ms") + self.logger.info( + f"[{self.stream_name}][{batch_id}] open(temp_file) wb... elapsed time: {elapsed_time} ms" + ) writer(tempfile, self.parsed_schema, avro_records) self.logger.info(f"[{self.stream_name}][{batch_id}] Uploading LoadJob...") with open(temp_file, "r+b") as tempfile: elapsed_time = stop_timer(start) - self.logger.info(f"[{self.stream_name}][{batch_id}] open(temp_file) r+b... elapsed time: {elapsed_time} ms") + self.logger.info( + f"[{self.stream_name}][{batch_id}] open(temp_file) r+b... elapsed time: {elapsed_time} ms" + ) load_job = self.load_table_from_file(tempfile, batch_id) # Delete temp file once we are done with it @@ -384,22 +387,28 @@ def transform_record(record): # Await job to finish elapsed_time = stop_timer(start) - self.logger.info(f"[{self.stream_name}][{batch_id}] before load_job.result() - time - {elapsed_time} ms") + self.logger.info( + f"[{self.stream_name}][{batch_id}] before load_job.result() - time - {elapsed_time} ms" + ) load_job.result() elapsed_time = stop_timer(start) - self.logger.info(f"[{self.stream_name}][{batch_id}] after load_job.result() - time - {elapsed_time} ms") + self.logger.info( + f"[{self.stream_name}][{batch_id}] after load_job.result() - time - {elapsed_time} ms" + ) queries.append(self.update_from_temp_table(batch_id, batch_meta)) queries.append(self.drop_temp_table(batch_id)) - self.logger.info(f"[{self.stream_name}][{batch_id}] before query - time - {elapsed_time} ms") + self.logger.info( + f"[{self.stream_name}][{batch_id}] before query - time - {elapsed_time} ms" + ) self.query(queries) - self.logger.info(f"[{self.stream_name}][{batch_id}] after query - time - {elapsed_time} ms") - - self.logger.info(f"[{self.stream_name}][{batch_id}] Finished batch") + self.logger.info( + f"[{self.stream_name}][{batch_id}] Finished batch - time - {elapsed_time} ms" + ) def process_batch_files( self, @@ -439,8 +448,10 @@ def process_batch_files( f"Unsupported batch encoding format: {encoding.format}" ) + def start_timer(): return time.time() + def stop_timer(start_time): - return (time.time() - start_time) * 1000 \ No newline at end of file + return (time.time() - start_time) * 1000