Add possibility to provide cred #11

Open · wants to merge 18 commits into base: main
Changes from 2 commits
24 changes: 20 additions & 4 deletions dask_bigquery/core.py
@@ -12,12 +12,14 @@
from google.api_core import client_info as rest_client_info
from google.api_core.gapic_v1 import client_info as grpc_client_info
from google.cloud import bigquery, bigquery_storage
from google.oauth2 import service_account
from google.oauth2.service_account import Credentials

import dask_bigquery


@contextmanager
def bigquery_clients(project_id):
def bigquery_clients(project_id, credentials):
"""This context manager is a temporary solution until there is an
upstream solution to handle this.
See googleapis/google-cloud-python#9457
@@ -30,7 +32,9 @@ def bigquery_clients(project_id):
user_agent=f"dask-bigquery/{dask_bigquery.__version__}"
)

with bigquery.Client(project_id, client_info=bq_client_info) as bq_client:
with bigquery.Client(
project_id, credentials=credentials, client_info=bq_client_info
) as bq_client:
bq_storage_client = bigquery_storage.BigQueryReadClient(
credentials=bq_client._credentials,
client_info=bqstorage_client_info,
@@ -53,6 +57,7 @@ def _stream_to_dfs(bqs_client, stream_name, schema, read_kwargs):
def bigquery_read(
make_create_read_session_request: callable,
project_id: str,
credentials: Credentials,
Review comment:
Ideally you'd allow a wider set of credentials than this. google.auth.credentials.Credentials is going to be the most flexible.

There are two types of credentials that come to mind that you'll want to support that aren't service account credentials:

  • External Account Credentials -- These are useful for folks running on other clouds.
  • OAuth 2.0 Credentials -- Also known as end-user credentials. These are useful for folks running locally. It's the "open a browser window and use your Google account" flow.
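
A minimal sketch of what the broader `google.auth.credentials.Credentials` annotation would allow. The key path is a placeholder, and pydata-google-auth is just one illustrative way to run the browser flow, not a dependency of this PR:

```python
import google.auth
import pydata_google_auth
from google.oauth2 import service_account

# Service account key file -- what this PR wires through today.
sa_creds = service_account.Credentials.from_service_account_file(
    "/path/to/key.json"  # illustrative path
)

# Application Default Credentials -- google.auth.default() also picks up
# external account (workload identity federation) credentials when they
# are configured, covering the "running on another cloud" case.
adc_creds, _project = google.auth.default()

# End-user OAuth 2.0 flow ("open a browser window and use your Google
# account"), here via the pydata-google-auth helper library.
user_creds = pydata_google_auth.get_user_credentials(
    ["https://www.googleapis.com/auth/bigquery"]
)

# All three objects are instances of google.auth.credentials.Credentials,
# so annotating the parameter with that base class accepts any of them.
```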

read_kwargs: dict,
stream_name: str,
) -> pd.DataFrame:
@@ -71,7 +76,7 @@ def bigquery_read(
NOTE: Please set if reading from Storage API without any `row_restriction`.
https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1beta1#stream
"""
with bigquery_clients(project_id) as (_, bqs_client):
with bigquery_clients(project_id, credentials) as (_, bqs_client):
session = bqs_client.create_read_session(make_create_read_session_request())
schema = pyarrow.ipc.read_schema(
pyarrow.py_buffer(session.arrow_schema.serialized_schema)
@@ -89,6 +94,8 @@ def read_gbq(
dataset_id: str,
table_id: str,
row_filter="",
*,
cred_fpath: str = None,
Review comment:
A credentials object is going to be the most flexible. Not everyone has access to the local file system, such as if they're running from a Colab notebook.
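
For illustration, a hedged sketch of the alternative the reviewer suggests. `resolve_credentials` is a hypothetical helper, not part of this PR:

```python
import google.auth
from google.auth.credentials import Credentials


def resolve_credentials(credentials: Credentials = None) -> Credentials:
    """Hypothetical helper: accept any google-auth credentials object and
    fall back to Application Default Credentials when none is given."""
    if credentials is None:
        credentials, _project = google.auth.default()
    return credentials
```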

read_kwargs: dict = None,
):
"""Read table as dask dataframe using BigQuery Storage API via Arrow format.
@@ -104,6 +111,8 @@
BigQuery table within dataset
row_filter: str
SQL text filtering statement to pass to `row_restriction`
cred_fpath: str
Path to the service account key JSON file. If omitted, default credentials are used.
read_kwargs: dict
kwargs to pass to read_rows()

@@ -112,7 +121,13 @@ def read_gbq(
Dask DataFrame
"""
read_kwargs = read_kwargs or {}
with bigquery_clients(project_id) as (bq_client, bqs_client):

if cred_fpath:
credentials = service_account.Credentials.from_service_account_file(cred_fpath)
else:
credentials = None  # no key file given; fall back to default credentials

with bigquery_clients(project_id, credentials) as (bq_client, bqs_client):
table_ref = bq_client.get_table(f"{dataset_id}.{table_id}")
if table_ref.table_type == "VIEW":
raise TypeError("Table type VIEW not supported")
@@ -157,6 +172,7 @@ def make_create_read_session_request(row_filter=""):
bigquery_read,
make_create_read_session_request,
project_id,
credentials,
read_kwargs,
),
label=label,
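
A usage sketch of `read_gbq` as of this commit; the project, dataset, table, and key path are placeholders:

```python
import dask_bigquery

# Placeholder identifiers -- substitute your own project, dataset, and table.
ddf = dask_bigquery.read_gbq(
    project_id="my-project",
    dataset_id="my_dataset",
    table_id="my_table",
    cred_fpath="/path/to/service-account-key.json",  # omit to use default credentials
)
print(ddf.head())
```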