Skip to content

Commit

Permalink
Add bq job labels to job configs
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex-Krykun committed Jan 31, 2025
1 parent a219818 commit 30e0be2
Showing 1 changed file with 15 additions and 4 deletions.
19 changes: 15 additions & 4 deletions dbt/adapters/bigquery/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,12 @@ def get_table_from_response(cls, resp) -> "agate.Table":
column_names = [field.name for field in resp.schema]
return agate_helper.table_from_data_flat(resp, column_names)

def get_job_labels(self):
labels = self.get_labels_from_query_comment()
labels["dbt_invocation_id"] = get_invocation_id()

return labels

def get_labels_from_query_comment(cls):
if (
hasattr(cls.profile, "query_comment")
Expand Down Expand Up @@ -244,9 +250,7 @@ def raw_execute(

fire_event(SQLQuery(conn_name=conn.name, sql=sql, node_info=get_node_info()))

labels = self.get_labels_from_query_comment()

labels["dbt_invocation_id"] = get_invocation_id()
labels = self.get_job_labels()

job_params = {
"use_legacy_sql": use_legacy_sql,
Expand Down Expand Up @@ -424,6 +428,7 @@ def copy_bq_table(self, source, destination, write_disposition) -> None:
destination_ref = self.table_ref(
destination.database, destination.schema, destination.table
)
labels = self.get_job_labels()

logger.debug(
'Copying table(s) "{}" to "{}" with disposition: "{}"',
Expand All @@ -440,7 +445,7 @@ def copy_bq_table(self, source, destination, write_disposition) -> None:
copy_job = client.copy_table(
source_ref_array,
destination_ref,
job_config=CopyJobConfig(write_disposition=write_disposition),
job_config=CopyJobConfig(write_disposition=write_disposition, labels=labels),
retry=self._retry.create_reopen_with_deadline(conn),
)
copy_job.result(timeout=self._retry.create_job_execution_timeout(fallback=300))
Expand All @@ -456,8 +461,10 @@ def write_dataframe_to_table(
field_delimiter: str,
fallback_timeout: Optional[float] = None,
) -> None:
labels = self.get_job_labels()
load_config = LoadJobConfig(
skip_leading_rows=1,
labels=labels,
schema=table_schema,
field_delimiter=field_delimiter,
)
Expand All @@ -477,6 +484,10 @@ def write_file_to_table(
config = kwargs["kwargs"]
if "schema" in config:
config["schema"] = json.load(config["schema"])

if "labels" not in config:
config["labels"] = self.get_job_labels()

load_config = LoadJobConfig(**config)
table = self.table_ref(database, schema, identifier)
self._write_file_to_table(client, file_path, table, load_config, fallback_timeout)
Expand Down

0 comments on commit 30e0be2

Please sign in to comment.