From 2b414e026446723f9a5aaff8a1411c28c18defe2 Mon Sep 17 00:00:00 2001 From: Victor Lindell Date: Wed, 8 May 2024 09:38:51 +0200 Subject: [PATCH] fix: lint --- target_bigquery/sinks.py | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/target_bigquery/sinks.py b/target_bigquery/sinks.py index 8784cc5..7e7a902 100644 --- a/target_bigquery/sinks.py +++ b/target_bigquery/sinks.py @@ -3,8 +3,8 @@ from __future__ import annotations import json -import uuid import time +import uuid from datetime import datetime, timedelta from gzip import GzipFile from gzip import open as gzip_open @@ -12,12 +12,9 @@ from tempfile import mkstemp from typing import IO, Any, BinaryIO, Dict, List, Optional, Sequence -from google.api_core.exceptions import GoogleAPICallError - from fastavro import parse_schema, writer +from google.api_core.exceptions import GoogleAPICallError from google.cloud import bigquery -# from google.cloud.bigquery import DEFAULT_RETRY - from singer_sdk.helpers._batch import ( BaseBatchFileEncoding, BatchFileFormat, @@ -29,6 +26,10 @@ from .avro import avro_schema, fix_recursive_types_in_dict from .bq import column_type, get_client +# from google.cloud.bigquery import DEFAULT_RETRY + + + PARTITIONS = "partitions" @@ -398,7 +399,8 @@ def transform_record(record): try: result = load_job.result() self.logger.info( - f"[{self.stream_name}][{batch_id}] result.eror { result.errors }") + f"[{self.stream_name}][{batch_id}] result.eror { result.errors }" + ) # suggested? # result = load_job.result(DEFAULT_RETRY.with_deadline(60), 500) @@ -423,15 +425,19 @@ def transform_record(record): except GoogleAPICallError as e: # Handle Google API call errors - self.logger.info(f"[{self.stream_name}][{batch_id}] Google API call error: {e}") + self.logger.info( + f"[{self.stream_name}][{batch_id}] Google API call error: {e}" + ) except TimeoutError: # Handle timeout errors - self.logger.info(f"[{self.stream_name}][{batch_id}] Timeout error: Job did not complete in the given timeout.") + self.logger.info( + f"[{self.stream_name}][{batch_id}] Timeout error: Job did not complete in the given timeout." + ) except Exception as e: # Handle other unexpected errors - self.logger.info(f"[{self.stream_name}][{batch_id}] An unexpected error occurred: {e}") - - + self.logger.info( + f"[{self.stream_name}][{batch_id}] An unexpected error occurred: {e}" + ) def process_batch_files( self,