From d4bb6a51441709dae332fd4bb343bdfa74a0b522 Mon Sep 17 00:00:00 2001
From: ncclementi
Date: Thu, 30 Sep 2021 15:40:46 +0000
Subject: [PATCH 01/14] add possibility to provide cred via filename

---
 dask_bigquery/core.py | 22 ++++++++++++++++++----
 1 file changed, 18 insertions(+), 4 deletions(-)

diff --git a/dask_bigquery/core.py b/dask_bigquery/core.py
index 1fd396d..dcbe346 100644
--- a/dask_bigquery/core.py
+++ b/dask_bigquery/core.py
@@ -12,12 +12,13 @@
 from google.api_core import client_info as rest_client_info
 from google.api_core.gapic_v1 import client_info as grpc_client_info
 from google.cloud import bigquery, bigquery_storage
+from google.oauth2 import service_account
 
 import dask_bigquery
 
 
 @contextmanager
-def bigquery_clients(project_id):
+def bigquery_clients(project_id, cred_fpath):
     """This context manager is a temporary solution until there is an
     upstream solution to handle this.
     See googleapis/google-cloud-python#9457
@@ -30,7 +31,14 @@
         user_agent=f"dask-bigquery/{dask_bigquery.__version__}"
     )
 
-    with bigquery.Client(project_id, client_info=bq_client_info) as bq_client:
+    if cred_fpath:
+        credentials = service_account.Credentials.from_service_account_file(cred_fpath)
+    else:
+        credentials = cred_fpath  # cred_fpath is None here, so default credentials will be used
+
+    with bigquery.Client(
+        project_id, credentials=credentials, client_info=bq_client_info
+    ) as bq_client:
         bq_storage_client = bigquery_storage.BigQueryReadClient(
             credentials=bq_client._credentials,
             client_info=bqstorage_client_info,
@@ -53,6 +61,7 @@ def _stream_to_dfs(bqs_client, stream_name, schema, read_kwargs):
 def bigquery_read(
     make_create_read_session_request: callable,
     project_id: str,
+    cred_fpath: str,
     read_kwargs: dict,
     stream_name: str,
 ) -> pd.DataFrame:
@@ -71,7 +80,7 @@ def bigquery_read(
         NOTE: Please set if reading from Storage API without any `row_restriction`.
             https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1beta1#stream
     """
-    with bigquery_clients(project_id) as (_, bqs_client):
+    with bigquery_clients(project_id, cred_fpath) as (_, bqs_client):
         session = bqs_client.create_read_session(make_create_read_session_request())
         schema = pyarrow.ipc.read_schema(
             pyarrow.py_buffer(session.arrow_schema.serialized_schema)
@@ -89,6 +98,8 @@ def read_gbq(
     dataset_id: str,
     table_id: str,
     row_filter="",
+    *,
+    cred_fpath: str = None,
     read_kwargs: dict = None,
 ):
     """Read table as dask dataframe using BigQuery Storage API via Arrow format.
@@ -104,6 +115,8 @@ def read_gbq(
         BigQuery table within dataset
     row_filter: str
         SQL text filtering statement to pass to `row_restriction`
+    cred_fpath: str
+        Path to the service account key JSON file.
 read_kwargs: dict
         kwargs to pass to read_rows()
 
@@ -112,7 +125,7 @@
     Dask DataFrame
     """
     read_kwargs = read_kwargs or {}
-    with bigquery_clients(project_id) as (bq_client, bqs_client):
+    with bigquery_clients(project_id, cred_fpath) as (bq_client, bqs_client):
         table_ref = bq_client.get_table(f"{dataset_id}.{table_id}")
         if table_ref.table_type == "VIEW":
             raise TypeError("Table type VIEW not supported")
@@ -157,6 +170,7 @@ def make_create_read_session_request(row_filter=""):
             bigquery_read,
             make_create_read_session_request,
             project_id,
+            cred_fpath,
             read_kwargs,
         ),
         label=label,

From 7b874e8c08b306b6f182377bff6c44d382ffec64 Mon Sep 17 00:00:00 2001
From: ncclementi
Date: Thu, 30 Sep 2021 17:31:24 +0000
Subject: [PATCH 02/14] pass credentials not cred file, broken cannot pickle

---
 dask_bigquery/core.py | 22 ++++++++++----------
 1 file changed, 12 insertions(+), 10 deletions(-)

diff --git a/dask_bigquery/core.py b/dask_bigquery/core.py
index dcbe346..efed643 100644
--- a/dask_bigquery/core.py
+++ b/dask_bigquery/core.py
@@ -13,12 +13,13 @@
 from google.api_core.gapic_v1 import client_info as grpc_client_info
 from google.cloud import bigquery, bigquery_storage
 from google.oauth2 import service_account
+from google.oauth2.service_account import Credentials
 
 import dask_bigquery
 
 
 @contextmanager
-def bigquery_clients(project_id, cred_fpath):
+def bigquery_clients(project_id, credentials):
     """This context manager is a temporary solution until there is an
     upstream solution to handle this.
     See googleapis/google-cloud-python#9457
@@ -31,11 +32,6 @@ def bigquery_clients(project_id, credentials):
         user_agent=f"dask-bigquery/{dask_bigquery.__version__}"
     )
 
-    if cred_fpath:
-        credentials = service_account.Credentials.from_service_account_file(cred_fpath)
-    else:
-        credentials = cred_fpath  # cred_fpath is None here, so default credentials will be used
-
     with bigquery.Client(
         project_id, credentials=credentials, client_info=bq_client_info
     ) as bq_client:
         bq_storage_client = bigquery_storage.BigQueryReadClient(
             credentials=bq_client._credentials,
             client_info=bqstorage_client_info,
@@ -61,7 +57,7 @@ def _stream_to_dfs(bqs_client, stream_name, schema, read_kwargs):
 def bigquery_read(
     make_create_read_session_request: callable,
     project_id: str,
-    cred_fpath: str,
+    credentials: Credentials,
     read_kwargs: dict,
     stream_name: str,
 ) -> pd.DataFrame:
@@ -80,7 +76,7 @@ def bigquery_read(
         NOTE: Please set if reading from Storage API without any `row_restriction`.
https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1beta1#stream
     """
-    with bigquery_clients(project_id, cred_fpath) as (_, bqs_client):
+    with bigquery_clients(project_id, credentials) as (_, bqs_client):
         session = bqs_client.create_read_session(make_create_read_session_request())
         schema = pyarrow.ipc.read_schema(
             pyarrow.py_buffer(session.arrow_schema.serialized_schema)
@@ -125,7 +121,13 @@
     Dask DataFrame
     """
     read_kwargs = read_kwargs or {}
-    with bigquery_clients(project_id, cred_fpath) as (bq_client, bqs_client):
+
+    if cred_fpath:
+        credentials = service_account.Credentials.from_service_account_file(cred_fpath)
+    else:
+        credentials = cred_fpath  # cred_fpath is None here, so default credentials will be used
+
+    with bigquery_clients(project_id, credentials) as (bq_client, bqs_client):
         table_ref = bq_client.get_table(f"{dataset_id}.{table_id}")
         if table_ref.table_type == "VIEW":
             raise TypeError("Table type VIEW not supported")
@@ -170,7 +172,7 @@ def make_create_read_session_request(row_filter=""):
             bigquery_read,
             make_create_read_session_request,
             project_id,
-            cred_fpath,
+            credentials,
             read_kwargs,
         ),
         label=label,

From 1b4c744991734b2565385e3175712817645f503e Mon Sep 17 00:00:00 2001
From: ncclementi
Date: Tue, 5 Oct 2021 16:51:22 -0400
Subject: [PATCH 03/14] add read cred from json

---
 dask_bigquery/core.py | 24 ++++++++++++++----------
 1 file changed, 14 insertions(+), 10 deletions(-)

diff --git a/dask_bigquery/core.py b/dask_bigquery/core.py
index efed643..e471cd1 100644
--- a/dask_bigquery/core.py
+++ b/dask_bigquery/core.py
@@ -1,5 +1,7 @@
 from __future__ import annotations
 
+import json
+import os
 from contextlib import contextmanager
 from functools import partial
 
@@ -13,7 +15,6 @@
 from google.api_core.gapic_v1 import client_info as grpc_client_info
 from google.cloud import bigquery, bigquery_storage
 from google.oauth2 import service_account
-from google.oauth2.service_account import Credentials
 
 import dask_bigquery
 
@@ -57,8 +58,8 @@ def _stream_to_dfs(bqs_client, stream_name, schema, read_kwargs):
 def bigquery_read(
     make_create_read_session_request: callable,
     project_id: str,
-    credentials: Credentials,
     read_kwargs: dict,
+    creds: dict,
     stream_name: str,
 ) -> pd.DataFrame:
     """Read a single batch of rows via BQ Storage API, in Arrow binary format.
@@ -71,11 +72,14 @@
         Name of the BigQuery project.
     read_kwargs: dict
         kwargs to pass to read_rows()
+    creds: dict
+        credentials dictionary
     stream_name: str
         BigQuery Storage API Stream "name"
         NOTE: Please set if reading from Storage API without any `row_restriction`.
             https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1beta1#stream
     """
+    credentials = service_account.Credentials.from_service_account_info(creds)
     with bigquery_clients(project_id, credentials) as (_, bqs_client):
         session = bqs_client.create_read_session(make_create_read_session_request())
         schema = pyarrow.ipc.read_schema(
@@ -95,7 +99,6 @@ def read_gbq(
     table_id: str,
     row_filter="",
     *,
-    cred_fpath: str = None,
     read_kwargs: dict = None,
 ):
     """Read table as dask dataframe using BigQuery Storage API via Arrow format.
@@ -111,8 +114,6 @@
         BigQuery table within dataset
     row_filter: str
         SQL text filtering statement to pass to `row_restriction`
-    cred_fpath: str
-        Path to the service account key JSON file.
read_kwargs: dict kwargs to pass to read_rows() @@ -122,10 +123,13 @@ def read_gbq( """ read_kwargs = read_kwargs or {} - if cred_fpath: - credentials = service_account.Credentials.from_service_account_file(cred_fpath) - else: - credentials = cred_fpath # if no path set to None to try read default + creds_path = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS") + if creds_path is None: + raise ValueError("No credentials found") + with open(creds_path) as f: + creds = json.load(f) + + credentials = service_account.Credentials.from_service_account_file(creds_path) with bigquery_clients(project_id, credentials) as (bq_client, bqs_client): table_ref = bq_client.get_table(f"{dataset_id}.{table_id}") @@ -172,8 +176,8 @@ def make_create_read_session_request(row_filter=""): bigquery_read, make_create_read_session_request, project_id, - credentials, read_kwargs, + creds, ), label=label, ) From 1ec5dea8e5f0c260182c65d1fc708a7833aaa40e Mon Sep 17 00:00:00 2001 From: ncclementi Date: Tue, 5 Oct 2021 16:52:07 -0400 Subject: [PATCH 04/14] modify read to include credentials setup --- README.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index f55eb48..d35316c 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,11 @@ Read data from Google BigQuery with Dask ## Example -`dask-bigquery` assumes that you are already authenticated. +`dask-bigquery` assumes that you are already authenticated and have an environment variable + +``` +GOOGLE_APPLICATION_CREDENTIALS=path_to/creds.json +``` ```python import dask_bigquery From 8de4d1ef946c0aee46093ac727cffa370642b877 Mon Sep 17 00:00:00 2001 From: ncclementi Date: Thu, 7 Oct 2021 16:43:49 -0400 Subject: [PATCH 05/14] use pyjwt to create token --- dask_bigquery/core.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/dask_bigquery/core.py b/dask_bigquery/core.py index e471cd1..5fbc2e4 100644 --- a/dask_bigquery/core.py +++ b/dask_bigquery/core.py @@ -5,6 +5,7 @@ from contextlib import contextmanager from functools import partial +import jwt import pandas as pd import pyarrow from dask.base import tokenize @@ -59,7 +60,7 @@ def bigquery_read( make_create_read_session_request: callable, project_id: str, read_kwargs: dict, - creds: dict, + cred_token, stream_name: str, ) -> pd.DataFrame: """Read a single batch of rows via BQ Storage API, in Arrow binary format. @@ -79,6 +80,8 @@ def bigquery_read( NOTE: Please set if reading from Storage API without any `row_restriction`. 
https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1beta1#stream
     """
+    creds = jwt.decode(cred_token, "secret", algorithms=["HS256"])
+
     credentials = service_account.Credentials.from_service_account_info(creds)
     with bigquery_clients(project_id, credentials) as (_, bqs_client):
         session = bqs_client.create_read_session(make_create_read_session_request())
         schema = pyarrow.ipc.read_schema(
@@ -129,6 +132,8 @@ def read_gbq(
     with open(creds_path) as f:
         creds = json.load(f)
 
+    cred_token = jwt.encode(creds, "secret", algorithm="HS256")
+
     credentials = service_account.Credentials.from_service_account_file(creds_path)
 
     with bigquery_clients(project_id, credentials) as (bq_client, bqs_client):
@@ -177,7 +182,7 @@ def make_create_read_session_request(row_filter=""):
             make_create_read_session_request,
             project_id,
             read_kwargs,
-            creds,
+            cred_token,
         ),
         label=label,
     )

From 8d9c026a464af6524286b1c796019a25fb6e263d Mon Sep 17 00:00:00 2001
From: ncclementi
Date: Thu, 7 Oct 2021 19:02:24 -0400
Subject: [PATCH 06/14] add type to cred_token variable

---
 dask_bigquery/core.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dask_bigquery/core.py b/dask_bigquery/core.py
index 5fbc2e4..09f192b 100644
--- a/dask_bigquery/core.py
+++ b/dask_bigquery/core.py
@@ -60,7 +60,7 @@ def bigquery_read(
     make_create_read_session_request: callable,
     project_id: str,
     read_kwargs: dict,
-    cred_token,
+    cred_token: str,
     stream_name: str,
 ) -> pd.DataFrame:
     """Read a single batch of rows via BQ Storage API, in Arrow binary format.

From 5d1b57165c4e7e03070b668e27d089f7d5e3887f Mon Sep 17 00:00:00 2001
From: ncclementi
Date: Fri, 29 Oct 2021 12:52:26 -0400
Subject: [PATCH 08/14] use google auth bearer token to authenticate

---
 dask_bigquery/core.py | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)

diff --git a/dask_bigquery/core.py b/dask_bigquery/core.py
index c36191b..79ef99e 100644
--- a/dask_bigquery/core.py
+++ b/dask_bigquery/core.py
@@ -1,11 +1,11 @@
 from __future__ import annotations
 
-import json
 import os
 from contextlib import contextmanager
 from functools import partial
 
-import jwt
+import google.auth.transport.requests
+import google.oauth2.credentials
 import pandas as pd
 import pyarrow
 from dask.base import tokenize
@@ -80,9 +80,9 @@ def bigquery_read(
         NOTE: Please set if reading from Storage API without any `row_restriction`.
https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1beta1#stream """ - creds = jwt.decode(cred_token, "secret", algorithms=["HS256"]) - credentials = service_account.Credentials.from_service_account_info(creds) + credentials = google.oauth2.credentials.Credentials(cred_token) + with bigquery_clients(project_id, credentials) as (_, bqs_client): session = bqs_client.create_read_session(make_create_read_session_request()) schema = pyarrow.ipc.read_schema( @@ -131,12 +131,14 @@ def read_gbq( creds_path = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS") if creds_path is None: raise ValueError("No credentials found") - with open(creds_path) as f: - creds = json.load(f) - cred_token = jwt.encode(creds, "secret", algorithm="HS256") + credentials = service_account.Credentials.from_service_account_file( + creds_path, scopes=["https://www.googleapis.com/auth/cloud-platform"] + ) - credentials = service_account.Credentials.from_service_account_file(creds_path) + auth_req = google.auth.transport.requests.Request() + credentials.refresh(auth_req) + cred_token = credentials.token with bigquery_clients(project_id, credentials) as (bq_client, bqs_client): table_ref = bq_client.get_table(f"{dataset_id}.{table_id}") From 1832d39223453f3cbb31df8eac001ab7a4433cef Mon Sep 17 00:00:00 2001 From: ncclementi Date: Mon, 1 Nov 2021 16:45:51 -0400 Subject: [PATCH 09/14] change scope to bigquery read only --- dask_bigquery/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_bigquery/core.py b/dask_bigquery/core.py index 79ef99e..a4a83a6 100644 --- a/dask_bigquery/core.py +++ b/dask_bigquery/core.py @@ -133,7 +133,7 @@ def read_gbq( raise ValueError("No credentials found") credentials = service_account.Credentials.from_service_account_file( - creds_path, scopes=["https://www.googleapis.com/auth/cloud-platform"] + creds_path, scopes=["https://www.googleapis.com/auth/bigquery.readonly"] ) auth_req = google.auth.transport.requests.Request() From 4673b8206ad9edf82ee8b723cfd1c23eea38611b Mon Sep 17 00:00:00 2001 From: ncclementi Date: Mon, 1 Nov 2021 17:16:21 -0400 Subject: [PATCH 10/14] Fix readme mismatch --- README.md | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/README.md b/README.md index a2c163c..a734a2a 100644 --- a/README.md +++ b/README.md @@ -14,11 +14,7 @@ pip install dask-bigquery ## Example -`dask-bigquery` assumes that you are already authenticated and have an environment variable - -``` -GOOGLE_APPLICATION_CREDENTIALS=path_to/creds.json -``` +`dask-bigquery` assumes that you are already authenticated. ```python import dask_bigquery From 1ab71c2e7eda7d0a13705cd66fd71cdb36579a55 Mon Sep 17 00:00:00 2001 From: ncclementi Date: Wed, 3 Nov 2021 17:26:10 -0400 Subject: [PATCH 11/14] add fwd_creds flag to send credentials --- dask_bigquery/core.py | 45 ++++++++++++++++++++------------ dask_bigquery/tests/test_core.py | 22 ++++++++++++---- 2 files changed, 46 insertions(+), 21 deletions(-) diff --git a/dask_bigquery/core.py b/dask_bigquery/core.py index a4a83a6..39a6da1 100644 --- a/dask_bigquery/core.py +++ b/dask_bigquery/core.py @@ -21,7 +21,7 @@ @contextmanager -def bigquery_clients(project_id, credentials): +def bigquery_clients(project_id, credentials=None): """This context manager is a temporary solution until there is an upstream solution to handle this. See googleapis/google-cloud-python#9457 @@ -73,17 +73,20 @@ def bigquery_read( Name of the BigQuery project. 
read_kwargs: dict kwargs to pass to read_rows() - creds: dict - credentials dictionary stream_name: str BigQuery Storage API Stream "name" NOTE: Please set if reading from Storage API without any `row_restriction`. https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1beta1#stream + cred_token: str + google_auth bearer token """ - credentials = google.oauth2.credentials.Credentials(cred_token) + if cred_token: + credentials = google.oauth2.credentials.Credentials(cred_token) + else: + credentials = None - with bigquery_clients(project_id, credentials) as (_, bqs_client): + with bigquery_clients(project_id, credentials=credentials) as (_, bqs_client): session = bqs_client.create_read_session(make_create_read_session_request()) schema = pyarrow.ipc.read_schema( pyarrow.py_buffer(session.arrow_schema.serialized_schema) @@ -103,6 +106,7 @@ def read_gbq( row_filter: str = "", columns: list[str] = None, read_kwargs: dict = None, + fwd_creds: bool = False, ): """Read table as dask dataframe using BigQuery Storage API via Arrow format. Partitions will be approximately balanced according to BigQuery stream allocation logic. @@ -121,6 +125,8 @@ def read_gbq( list of columns to load from the table read_kwargs: dict kwargs to pass to read_rows() + fwd_creds: bool + Set to True if user desires to forward credentials to the workers. Default to False. Returns ------- @@ -128,19 +134,26 @@ def read_gbq( """ read_kwargs = read_kwargs or {} - creds_path = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS") - if creds_path is None: - raise ValueError("No credentials found") + if fwd_creds: + creds_path = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS") + if creds_path is None: + raise ValueError("No credentials found") - credentials = service_account.Credentials.from_service_account_file( - creds_path, scopes=["https://www.googleapis.com/auth/bigquery.readonly"] - ) - - auth_req = google.auth.transport.requests.Request() - credentials.refresh(auth_req) - cred_token = credentials.token + credentials = service_account.Credentials.from_service_account_file( + creds_path, scopes=["https://www.googleapis.com/auth/bigquery.readonly"] + ) - with bigquery_clients(project_id, credentials) as (bq_client, bqs_client): + auth_req = google.auth.transport.requests.Request() + credentials.refresh(auth_req) + cred_token = credentials.token + else: + credentials = None + cred_token = None + + with bigquery_clients(project_id, credentials=credentials) as ( + bq_client, + bqs_client, + ): table_ref = bq_client.get_table(f"{dataset_id}.{table_id}") if table_ref.table_type == "VIEW": raise TypeError("Table type VIEW not supported") diff --git a/dask_bigquery/tests/test_core.py b/dask_bigquery/tests/test_core.py index 82b922b..749930f 100644 --- a/dask_bigquery/tests/test_core.py +++ b/dask_bigquery/tests/test_core.py @@ -51,22 +51,30 @@ def dataset(df): ) -def test_read_gbq(df, dataset, client): +@pytest.mark.parametrize("fwd_creds", [False, True]) +def test_read_gbq(df, dataset, fwd_creds, client): project_id, dataset_id, table_id = dataset - ddf = read_gbq(project_id=project_id, dataset_id=dataset_id, table_id=table_id) + ddf = read_gbq( + project_id=project_id, + dataset_id=dataset_id, + table_id=table_id, + fwd_creds=fwd_creds, + ) assert list(ddf.columns) == ["name", "number", "idx"] assert ddf.npartitions == 2 assert assert_eq(ddf.set_index("idx"), df.set_index("idx")) -def test_read_row_filter(df, dataset, client): +@pytest.mark.parametrize("fwd_creds", [False, True]) +def 
test_read_row_filter(df, dataset, fwd_creds, client): project_id, dataset_id, table_id = dataset ddf = read_gbq( project_id=project_id, dataset_id=dataset_id, table_id=table_id, row_filter="idx < 5", + fwd_creds=fwd_creds, ) assert list(ddf.columns) == ["name", "number", "idx"] @@ -74,20 +82,23 @@ def test_read_row_filter(df, dataset, client): assert assert_eq(ddf.set_index("idx").loc[:4], df.set_index("idx").loc[:4]) -def test_read_kwargs(dataset, client): +@pytest.mark.parametrize("fwd_creds", [False, True]) +def test_read_kwargs(dataset, fwd_creds, client): project_id, dataset_id, table_id = dataset ddf = read_gbq( project_id=project_id, dataset_id=dataset_id, table_id=table_id, read_kwargs={"timeout": 1e-12}, + fwd_creds=fwd_creds, ) with pytest.raises(Exception, match="Deadline Exceeded"): ddf.compute() -def test_read_columns(df, dataset, client): +@pytest.mark.parametrize("fwd_creds", [False, True]) +def test_read_columns(df, dataset, fwd_creds, client): project_id, dataset_id, table_id = dataset assert df.shape[1] > 1, "Test data should have multiple columns" @@ -97,5 +108,6 @@ def test_read_columns(df, dataset, client): dataset_id=dataset_id, table_id=table_id, columns=columns, + fwd_creds=fwd_creds, ) assert list(ddf.columns) == columns From c73db5bd3bcbe654a0481c7b088258a59fe4a7cf Mon Sep 17 00:00:00 2001 From: ncclementi Date: Thu, 18 Nov 2021 16:55:01 -0500 Subject: [PATCH 12/14] use of default credentials in fwd_creds instead of env variable --- dask_bigquery/core.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/dask_bigquery/core.py b/dask_bigquery/core.py index 39a6da1..0f746a9 100644 --- a/dask_bigquery/core.py +++ b/dask_bigquery/core.py @@ -1,6 +1,5 @@ from __future__ import annotations -import os from contextlib import contextmanager from functools import partial @@ -15,7 +14,6 @@ from google.api_core import client_info as rest_client_info from google.api_core.gapic_v1 import client_info as grpc_client_info from google.cloud import bigquery, bigquery_storage -from google.oauth2 import service_account import dask_bigquery @@ -135,13 +133,12 @@ def read_gbq( read_kwargs = read_kwargs or {} if fwd_creds: - creds_path = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS") - if creds_path is None: - raise ValueError("No credentials found") - - credentials = service_account.Credentials.from_service_account_file( - creds_path, scopes=["https://www.googleapis.com/auth/bigquery.readonly"] - ) + try: + credentials, _ = google.auth.default( + scopes=["https://www.googleapis.com/auth/bigquery.readonly"] + ) + except google.auth.exceptions.DefaultCredentialsError: + print("No credentials found") auth_req = google.auth.transport.requests.Request() credentials.refresh(auth_req) From 9c885ac9ccd66d6611f02150a4da19a57d399491 Mon Sep 17 00:00:00 2001 From: ncclementi Date: Thu, 18 Nov 2021 17:53:17 -0500 Subject: [PATCH 13/14] let google.auth.default raise error if no credentials --- dask_bigquery/core.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/dask_bigquery/core.py b/dask_bigquery/core.py index 0f746a9..5c96f9e 100644 --- a/dask_bigquery/core.py +++ b/dask_bigquery/core.py @@ -133,12 +133,9 @@ def read_gbq( read_kwargs = read_kwargs or {} if fwd_creds: - try: - credentials, _ = google.auth.default( - scopes=["https://www.googleapis.com/auth/bigquery.readonly"] - ) - except google.auth.exceptions.DefaultCredentialsError: - print("No credentials found") + credentials, _ = google.auth.default( + 
scopes=["https://www.googleapis.com/auth/bigquery.readonly"]
+        )
         auth_req = google.auth.transport.requests.Request()
         credentials.refresh(auth_req)
         cred_token = credentials.token

From 742526cd29c248567058b6dab63f6a1dfe106dfa Mon Sep 17 00:00:00 2001
From: ncclementi
Date: Thu, 18 Nov 2021 18:19:13 -0500
Subject: [PATCH 14/14] add test for when we have no credentials

---
 dask_bigquery/tests/test_core.py | 21 +++++++++++++++++++++
 1 file changed, 21 insertions(+)

diff --git a/dask_bigquery/tests/test_core.py b/dask_bigquery/tests/test_core.py
index 749930f..16dcfbe 100644
--- a/dask_bigquery/tests/test_core.py
+++ b/dask_bigquery/tests/test_core.py
@@ -111,3 +111,24 @@ def test_read_columns(df, dataset, fwd_creds, client):
         fwd_creds=fwd_creds,
     )
     assert list(ddf.columns) == columns
+
+
+@pytest.mark.parametrize("fwd_creds", [False, True])
+def test_read_gbq_no_creds_fail(dataset, fwd_creds, monkeypatch, client):
+    """This test checks that authentication fails
+    when no credentials are available.
+    """
+    project_id, dataset_id, table_id = dataset
+
+    def mock_auth(scopes=["https://www.googleapis.com/auth/bigquery.readonly"]):
+        raise google.auth.exceptions.DefaultCredentialsError()
+
+    monkeypatch.setattr(google.auth, "default", mock_auth)
+
+    with pytest.raises(google.auth.exceptions.DefaultCredentialsError):
+        read_gbq(
+            project_id=project_id,
+            dataset_id=dataset_id,
+            table_id=table_id,
+            fwd_creds=fwd_creds,
+        )
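
The series converges on forwarding a short-lived bearer token instead of pickling credential objects (patch 02 is the dead end that motivates this: `service_account.Credentials` cannot be pickled, so it cannot be shipped to workers). A minimal self-contained sketch of the final pattern, assuming `google-auth` is installed and application default credentials are available; it mirrors the calls used in the patches above but is illustrative, not part of the library:

```python
import google.auth
import google.auth.transport.requests
import google.oauth2.credentials

# Client side: resolve application default credentials with a read-only
# BigQuery scope, then refresh them to obtain a bearer token. The token is
# a plain string, so it pickles cleanly and can be sent to Dask workers.
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/bigquery.readonly"]
)
credentials.refresh(google.auth.transport.requests.Request())
cred_token = credentials.token

# Worker side: rebuild a Credentials object from the forwarded token string.
worker_credentials = google.oauth2.credentials.Credentials(cred_token)
```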
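
With `fwd_creds=True`, that token exchange happens inside `read_gbq` itself, so callers only toggle the flag. A hypothetical end-to-end call, where the project, dataset, and table identifiers are placeholders:

```python
import dask_bigquery

# Forward a bearer token to the workers instead of relying on
# each worker having its own default credentials.
ddf = dask_bigquery.read_gbq(
    project_id="your_project",
    dataset_id="your_dataset",
    table_id="your_table",
    fwd_creds=True,
)
print(ddf.head())
```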