From 8823d0386eb90c4987d5682b0a33929d3a141c25 Mon Sep 17 00:00:00 2001 From: Anna Levenberg Date: Thu, 30 May 2024 14:32:19 -0400 Subject: [PATCH 1/5] impl(bigquery/read): add arrow bq read example --- bigquery/read/CMakeLists.txt | 30 ++++++ bigquery/read/README.md | 91 ++++++++++++++++ bigquery/read/arrow_read.cc | 165 ++++++++++++++++++++++++++++++ bigquery/read/create_query_job.py | 67 ++++++++++++ bigquery/read/requirements.txt | 23 +++++ bigquery/read/vcpkg.json | 14 +++ 6 files changed, 390 insertions(+) create mode 100644 bigquery/read/CMakeLists.txt create mode 100644 bigquery/read/README.md create mode 100644 bigquery/read/arrow_read.cc create mode 100644 bigquery/read/create_query_job.py create mode 100644 bigquery/read/requirements.txt create mode 100644 bigquery/read/vcpkg.json diff --git a/bigquery/read/CMakeLists.txt b/bigquery/read/CMakeLists.txt new file mode 100644 index 0000000..3c32bdb --- /dev/null +++ b/bigquery/read/CMakeLists.txt @@ -0,0 +1,30 @@ +# ~~~ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ~~~ + +cmake_minimum_required(VERSION 3.20) +set(CMAKE_CXX_STANDARD 20) + +# Define the project name and where to report bugs. +set(PACKAGE_BUGREPORT + "https://github.com/GoogleCloudPlatform/cpp-samples/issues") +project(cpp-samples-bigquery-read CXX) + +find_package(google_cloud_cpp_bigquery REQUIRED) +find_package(Arrow REQUIRED) + +add_executable(arrow_read arrow_read.cc) +target_link_libraries(arrow_read PRIVATE google-cloud-cpp::bigquery + Arrow::arrow_static) diff --git a/bigquery/read/README.md b/bigquery/read/README.md new file mode 100644 index 0000000..f9b3809 --- /dev/null +++ b/bigquery/read/README.md @@ -0,0 +1,91 @@ +# Using BigQuery Storage Read + +Cloud BigQuery is a data platform that allows users to easily create, manage, +share, and query data using SQL. When you want to access your data, you can read +directly from a table. However, if you want to transform the data in a table by +mapping, filtering, or joining, you need to first make a query. When you make a +query, you can specify a table to store the results. Then you can start a read +session for the table via the BigQuery Storage library and read the rows from +the table. + +This example shows how to create a query job using the BigQuery v2 Python API, +and then read the data from the table using the BigQuery Storage C++ API. + +If you are not familiar with the BigQuery v2 API or the BigQuery Storage Read +API, we recommend you first read the [API overview] before starting this guide. + +## Pre-requisites + +You are going to need a Google Cloud project to host the BigQuery dataset and +table used in this example. You will need to install and configure the BigQuery +CLI tool. Follow the [Google Cloud CLI install][install-sdk] instructions, and +then the [quickstart][bigquery cli tool] for the BigQuery CLI tool. + +Verify the CLI is working using a simple command to list the active project: + +```shell +bq show +``` + +### Creating the query job + +The following script uses the BigQuery v2 python client to create a dataset (if +it does not already exist) and a query job. + +``` +python3 -m venv env +source env/bin/activate +pip3 install -r requirements.txt +python3 create_query_job.py --project_id alevenb-test --dataset_name usa_names --table_name top10_names +``` + +## Compiling the Example + +This project uses `vcpkg` to install its dependencies. Clone `vcpkg` in your +`$HOME`: + +```shell +git clone -C $HOME https://github.com/microsoft/vcpkg.git +``` + +Install the typical development tools, on Ubuntu you would use: + +```shell +apt update && apt install -y build-essential cmake git ninja-build pkg-config g++ curl tar zip unzip +``` + +In this directory compile the dependencies and the code, this can take as long +as an hour, depending on the performance of your workstation: + +```shell +cd cpp-samples/bigquery/read +cmake -S . -B .build -DCMAKE_BUILD_TYPE=Release \ + -DCMAKE_TOOLCHAIN_FILE=$HOME/vcpkg/scripts/buildsystems/vcpkg.cmake +cmake --build .build +``` + +## Run the sample + +Run the example, replace the `[PROJECT ID]` placeholder with the id of your +project: + +```shell +.build/arrow_read [PROJECT ID] [DATASET_NAME] [TABLE_NAME] +``` + +```shell +.build/arrow_read [PROJECT ID] usa_names top10_names +``` + +## Cleanup + +Remove the table and dataset: + +```shell +bq rm -f usa_names.top10 +bq rm -f usa_names +``` + +[api overview]: https://cloud.google.com/bigquery/docs/reference/storage +[bigquery cli tool]: https://cloud.google.com/bigquery/docs/bq-command-line-tool +[install-sdk]: https://cloud.google.com/sdk/docs/install-sdk diff --git a/bigquery/read/arrow_read.cc b/bigquery/read/arrow_read.cc new file mode 100644 index 0000000..54d47d9 --- /dev/null +++ b/bigquery/read/arrow_read.cc @@ -0,0 +1,165 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "google/cloud/bigquery/storage/v1/bigquery_read_client.h" +#include "google/cloud/project.h" +#include +#include +#include +#include +#include +#include +#include +#include + +namespace { + +std::shared_ptr GetArrowSchema( + ::google::cloud::bigquery::storage::v1::ArrowSchema const& schema_in) { + std::shared_ptr buffer = + std::make_shared(schema_in.serialized_schema()); + arrow::io::BufferReader buffer_reader(buffer); + arrow::ipc::DictionaryMemo dictionary_memo; + auto result = arrow::ipc::ReadSchema(&buffer_reader, &dictionary_memo); + if (!result.ok()) { + std::cout << "Unable to parse schema\n"; + throw result.status(); + } + std::shared_ptr schema = result.ValueOrDie(); + return schema; +} + +std::shared_ptr GetArrowRecordBatch( + ::google::cloud::bigquery::storage::v1::ArrowRecordBatch const& + record_batch_in, + std::shared_ptr schema) { + std::shared_ptr buffer = std::make_shared( + record_batch_in.serialized_record_batch()); + arrow::io::BufferReader buffer_reader(buffer); + arrow::ipc::DictionaryMemo dictionary_memo; + arrow::ipc::IpcReadOptions read_options; + auto result = arrow::ipc::ReadRecordBatch(schema, &dictionary_memo, + read_options, &buffer_reader); + if (!result.ok()) { + std::cout << "Unable to parse record batch\n"; + throw result.status(); + } + std::shared_ptr record_batch = result.ValueOrDie(); + return record_batch; +} + +void ProcessRowsInArrowFormat( + ::google::cloud::bigquery::storage::v1::ArrowSchema const& schema_in, + ::google::cloud::bigquery::storage::v1::ArrowRecordBatch const& + record_batch_in) { + std::shared_ptr schema = GetArrowSchema(schema_in); + + std::shared_ptr record_batch = + GetArrowRecordBatch(record_batch_in, schema); + + // Print information about the record batch. + std::cout << std::format("Record batch schema is:\n {}\n", + record_batch->schema()->ToString()); + std::cout << std::format("Record batch has {} cols and {} rows\n", + record_batch->num_columns(), + record_batch->num_rows()); + + // Print each row and column in the record batch. + std::cout << std::setfill(' ') << std::setw(7) << ""; + for (std::int64_t col = 0; col < record_batch->num_columns(); ++col) { + std::cout << std::left << std::setw(12) << record_batch->column_name(col); + } + std::cout << "\n"; + // If you want to see what the result looks like without parsing the + // datatypes, use `record_batch->ToString()` for quick debugging. + for (std::int64_t row = 0; row < record_batch->num_rows(); ++row) { + std::cout << std::format("Row {}: ", row); + + for (std::int64_t col = 0; col < record_batch->num_columns(); ++col) { + std::shared_ptr column = record_batch->column(col); + arrow::Result > result = + column->GetScalar(row); + if (!result.ok()) { + std::cout << "Unable to parse scalar\n"; + throw result.status(); + } + + std::shared_ptr scalar = result.ValueOrDie(); + if (scalar->type->id() == arrow::Type::INT64) { + std::shared_ptr int64_scalar = + std::dynamic_pointer_cast(scalar); + std::cout << std::left << std::setw(9) << int64_scalar->value << " "; + } else if (scalar->type->id() == arrow::Type::STRING) { + std::shared_ptr string_scalar = + std::dynamic_pointer_cast(scalar); + std::cout << std::left << std::setw(9) << string_scalar->view() << " "; + } + } + std::cout << "\n"; + } +} + +} // namespace + +int main(int argc, char* argv[]) try { + if (argc != 4) { + std::cerr << "Usage: " << argv[0] + << " \n"; + return 1; + } + + std::string const project_id = argv[1]; + std::string const dataset_name = argv[2]; + std::string const table_name = argv[3]; + + std::string const table_id = "projects/" + project_id + "/datasets/" + + dataset_name + "/tables/" + table_name; + + // Create a namespace alias to make the code easier to read. + namespace bigquery_storage = ::google::cloud::bigquery_storage_v1; + constexpr int kMaxReadStreams = 1; + // Create the ReadSession. + auto client = bigquery_storage::BigQueryReadClient( + bigquery_storage::MakeBigQueryReadConnection()); + ::google::cloud::bigquery::storage::v1::ReadSession read_session; + read_session.set_data_format( + google::cloud::bigquery::storage::v1::DataFormat::ARROW); + read_session.set_table(table_id); + auto session = + client.CreateReadSession(google::cloud::Project(project_id).FullName(), + read_session, kMaxReadStreams); + if (!session) throw std::move(session).status(); + + // Read rows from the ReadSession. + constexpr int kRowOffset = 0; + auto read_rows = client.ReadRows(session->streams(0).name(), kRowOffset); + + std::int64_t num_rows = 0; + for (auto const& row : read_rows) { + if (row.ok()) { + num_rows += row->row_count(); + ProcessRowsInArrowFormat(session->arrow_schema(), + row->arrow_record_batch()); + } + } + + std::cout << std::format("Read {} rows from table: {}\n", num_rows, table_id); + return 0; +} catch (google::cloud::Status const& status) { + std::cerr << "google::cloud::Status thrown: " << status << "\n"; + return 1; +} catch (arrow::Status const& status) { + std::cerr << "arrow::Status thrown: " << status << "\n"; + return 1; +} diff --git a/bigquery/read/create_query_job.py b/bigquery/read/create_query_job.py new file mode 100644 index 0000000..17c190a --- /dev/null +++ b/bigquery/read/create_query_job.py @@ -0,0 +1,67 @@ +from google.cloud import bigquery +import sys +import argparse +from google.api_core.exceptions import Conflict, NotFound + +def main(args): + # Construct a BigQuery client object. + client = bigquery.Client() + # If it does not already exist, create dataset to store table. + dataset_id = f"{args.project_id}.{args.dataset_name}" + dataset = bigquery.Dataset(dataset_id) + dataset.location = args.dataset_location + try: + dataset = client.create_dataset(dataset, timeout=30) + print("Created dataset {}.{}".format(client.project, dataset.dataset_id)) + except Conflict as e: + if ("ALREADY_EXISTS" in e.details[0]['detail']): + print(f"Dataset {dataset_id} already exists.") + else: + print(f"Unable to create dataset. Error with code {e.code} and message {e.message}") + return + except Exception as e: + print(f"Unable to create dataset. Error with code {e.code} and message {e.message}") + return + + # Verify table exists. + table_id = f"{dataset_id}.{args.table_name}" + try: + table = client.get_table(table_id) + print(f"Table {table_id} already exists. Run script with a new --table_name argument.") + return + except NotFound: + pass + except Exception as e: + print(f"Unable to verify if table exists. Error with code {e.code} and message {e.message}") + return + + # Create query job that writes the top 10 names to a table. + job_config = bigquery.QueryJobConfig(destination=table_id) + sql = """ + SELECT + name, + SUM(number) AS total + FROM + `bigquery-public-data.usa_names.usa_1910_2013` + GROUP BY + name + ORDER BY + total DESC + LIMIT + 10; + """ + + # Start the query, passing in the extra configuration. + query_job = client.query(sql, job_config=job_config) # Make an API request. + query_job.result() # Wait for the job to complete. + + print(f"Query results loaded to the table {table_id}") + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="A script to create BigQuery query job.") + parser.add_argument("-p","--project_id", type=str,help="GCP project id") + parser.add_argument("--dataset_name", type=str,help="Dataset name to store the table in") + parser.add_argument("--dataset_location",type=str, default="US") + parser.add_argument("--table_name", type=str,help="Table name to write the query results to") + args = parser.parse_args() + main(args) diff --git a/bigquery/read/requirements.txt b/bigquery/read/requirements.txt new file mode 100644 index 0000000..e750ae8 --- /dev/null +++ b/bigquery/read/requirements.txt @@ -0,0 +1,23 @@ +cachetools==5.3.3 +certifi==2024.2.2 +charset-normalizer==3.3.2 +google-api-core==2.19.0 +google-auth==2.29.0 +google-cloud-bigquery==3.23.1 +google-cloud-core==2.4.1 +google-crc32c==1.5.0 +google-resumable-media==2.7.0 +googleapis-common-protos==1.63.0 +grpcio==1.64.0 +grpcio-status==1.62.2 +idna==3.7 +packaging==24.0 +proto-plus==1.23.0 +protobuf==4.25.3 +pyasn1==0.6.0 +pyasn1_modules==0.4.0 +python-dateutil==2.9.0.post0 +requests==2.32.3 +rsa==4.9 +six==1.16.0 +urllib3==2.2.1 diff --git a/bigquery/read/vcpkg.json b/bigquery/read/vcpkg.json new file mode 100644 index 0000000..eb8ec84 --- /dev/null +++ b/bigquery/read/vcpkg.json @@ -0,0 +1,14 @@ +{ + "name": "gcp-cpp-samples-bigquery-read", + "version-string": "unversioned", + "homepage": "https://github.com/GoogleCloudPlatform/cpp-samples/", + "description": "An example using the BigQuery Storage Read API", + "dependencies": [ + { + "name": "google-cloud-cpp", + "default-features": false, + "features": ["bigquery"] + }, + "arrow" + ] +} From 855960af32338015760077ca2ea7211e0b91ae42 Mon Sep 17 00:00:00 2001 From: Anna Levenberg Date: Fri, 31 May 2024 11:50:19 -0400 Subject: [PATCH 2/5] address comments --- bigquery/read/README.md | 24 ++++++++++- bigquery/read/arrow_read.cc | 81 ++++++++++++++++++++++--------------- 2 files changed, 72 insertions(+), 33 deletions(-) diff --git a/bigquery/read/README.md b/bigquery/read/README.md index f9b3809..4eb2a16 100644 --- a/bigquery/read/README.md +++ b/bigquery/read/README.md @@ -36,7 +36,7 @@ it does not already exist) and a query job. python3 -m venv env source env/bin/activate pip3 install -r requirements.txt -python3 create_query_job.py --project_id alevenb-test --dataset_name usa_names --table_name top10_names +python3 create_query_job.py --project_id [PROJECT-ID] --dataset_name usa_names --table_name top10_names ``` ## Compiling the Example @@ -77,6 +77,28 @@ project: .build/arrow_read [PROJECT ID] usa_names top10_names ``` +## Output + +Your output should look like the following: + +``` +Schema is: + name: string +total: int64 + name total +Row 0: James 4942431 +Row 1: John 4834422 +Row 2: Robert 4718787 +Row 3: Michael 4297230 +Row 4: William 3822209 +Row 5: Mary 3737679 +Row 6: David 3549801 +Row 7: Richard 2531924 +Row 8: Joseph 2472917 +Row 9: Charles 2244693 +Read 1 record batch(es) and 10 total row(s) from table: projects/[PROJECT-ID]/datasets/usa_names/tables/top10_names +``` + ## Cleanup Remove the table and dataset: diff --git a/bigquery/read/arrow_read.cc b/bigquery/read/arrow_read.cc index 54d47d9..c094185 100644 --- a/bigquery/read/arrow_read.cc +++ b/bigquery/read/arrow_read.cc @@ -37,6 +37,7 @@ std::shared_ptr GetArrowSchema( throw result.status(); } std::shared_ptr schema = result.ValueOrDie(); + std::cout << std::format("Schema is:\n {}\n", schema->ToString()); return schema; } @@ -59,32 +60,24 @@ std::shared_ptr GetArrowRecordBatch( return record_batch; } -void ProcessRowsInArrowFormat( - ::google::cloud::bigquery::storage::v1::ArrowSchema const& schema_in, - ::google::cloud::bigquery::storage::v1::ArrowRecordBatch const& - record_batch_in) { - std::shared_ptr schema = GetArrowSchema(schema_in); - - std::shared_ptr record_batch = - GetArrowRecordBatch(record_batch_in, schema); - - // Print information about the record batch. - std::cout << std::format("Record batch schema is:\n {}\n", - record_batch->schema()->ToString()); - std::cout << std::format("Record batch has {} cols and {} rows\n", - record_batch->num_columns(), - record_batch->num_rows()); - - // Print each row and column in the record batch. +void PrintColumnNames(std::shared_ptr record_batch) { + // Print each column name for the record batch. std::cout << std::setfill(' ') << std::setw(7) << ""; for (std::int64_t col = 0; col < record_batch->num_columns(); ++col) { - std::cout << std::left << std::setw(12) << record_batch->column_name(col); + std::cout << std::left << std::setw(16) << record_batch->column_name(col); } std::cout << "\n"; +} + +void ProcessRecordBatch(std::shared_ptr schema, + std::shared_ptr record_batch, + std::int64_t num_rows) { // If you want to see what the result looks like without parsing the // datatypes, use `record_batch->ToString()` for quick debugging. + // Note: you might need to adjust the formatting depending on how big the data + // in your table is. for (std::int64_t row = 0; row < record_batch->num_rows(); ++row) { - std::cout << std::format("Row {}: ", row); + std::cout << std::format("Row {}: ", row + num_rows); for (std::int64_t col = 0; col < record_batch->num_columns(); ++col) { std::shared_ptr column = record_batch->column(col); @@ -96,14 +89,24 @@ void ProcessRowsInArrowFormat( } std::shared_ptr scalar = result.ValueOrDie(); - if (scalar->type->id() == arrow::Type::INT64) { - std::shared_ptr int64_scalar = - std::dynamic_pointer_cast(scalar); - std::cout << std::left << std::setw(9) << int64_scalar->value << " "; - } else if (scalar->type->id() == arrow::Type::STRING) { - std::shared_ptr string_scalar = - std::dynamic_pointer_cast(scalar); - std::cout << std::left << std::setw(9) << string_scalar->view() << " "; + switch (scalar->type->id()) { + case arrow::Type::INT64: + std::cout + << std::left << std::setw(15) + << std::dynamic_pointer_cast(scalar)->value + << " "; + break; + case arrow::Type::STRING: + std::cout + << std::left << std::setw(15) + << std::dynamic_pointer_cast(scalar)->view() + << " "; + break; + // Depending on the table you are reading, you might need to add cases + // for other datatypes here. The schema will tell you what datatypes + // need to be handled. + default: + std::cout << std::left << std::setw(15) << "UNDEFINED " << " "; } } std::cout << "\n"; @@ -141,20 +144,34 @@ int main(int argc, char* argv[]) try { read_session, kMaxReadStreams); if (!session) throw std::move(session).status(); + // Get schema. + std::shared_ptr schema = + GetArrowSchema(session->arrow_schema()); + // Read rows from the ReadSession. constexpr int kRowOffset = 0; auto read_rows = client.ReadRows(session->streams(0).name(), kRowOffset); std::int64_t num_rows = 0; - for (auto const& row : read_rows) { - if (row.ok()) { + std::int64_t record_batch_count = 0; + for (auto const& read_rows_response : read_rows) { + if (read_rows_response.ok()) { + std::shared_ptr record_batch = + GetArrowRecordBatch(read_rows_response->arrow_record_batch(), schema); + + if (record_batch_count == 0) { + PrintColumnNames(record_batch); + } + + ProcessRecordBatch(schema, record_batch, num_rows); num_rows += row->row_count(); - ProcessRowsInArrowFormat(session->arrow_schema(), - row->arrow_record_batch()); + ++record_batch_count; } } - std::cout << std::format("Read {} rows from table: {}\n", num_rows, table_id); + std::cout << std::format( + "Read {} record batch(es) and {} total row(s) from table: {}\n", + record_batch_count, num_rows, table_id); return 0; } catch (google::cloud::Status const& status) { std::cerr << "google::cloud::Status thrown: " << status << "\n"; From 9d55b36ab40116e5b39d1fecefae6a167bf75fce Mon Sep 17 00:00:00 2001 From: Anna Levenberg Date: Fri, 31 May 2024 11:52:13 -0400 Subject: [PATCH 3/5] fix formatting --- bigquery/read/arrow_read.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bigquery/read/arrow_read.cc b/bigquery/read/arrow_read.cc index c094185..cbe20d4 100644 --- a/bigquery/read/arrow_read.cc +++ b/bigquery/read/arrow_read.cc @@ -106,7 +106,8 @@ void ProcessRecordBatch(std::shared_ptr schema, // for other datatypes here. The schema will tell you what datatypes // need to be handled. default: - std::cout << std::left << std::setw(15) << "UNDEFINED " << " "; + std::cout << std::left << std::setw(15) << "UNDEFINED "; + << " "; } } std::cout << "\n"; From fa5ad81fb68e93f96378116874abcfa980d404ec Mon Sep 17 00:00:00 2001 From: Anna Levenberg Date: Fri, 31 May 2024 12:07:25 -0400 Subject: [PATCH 4/5] fix formatting again --- bigquery/read/arrow_read.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigquery/read/arrow_read.cc b/bigquery/read/arrow_read.cc index cbe20d4..8174682 100644 --- a/bigquery/read/arrow_read.cc +++ b/bigquery/read/arrow_read.cc @@ -107,7 +107,7 @@ void ProcessRecordBatch(std::shared_ptr schema, // need to be handled. default: std::cout << std::left << std::setw(15) << "UNDEFINED "; - << " "; + << " "; } } std::cout << "\n"; From 8bd409294538b4b9bb1f43bfc5febf3b243a9150 Mon Sep 17 00:00:00 2001 From: Anna Levenberg Date: Fri, 31 May 2024 12:14:00 -0400 Subject: [PATCH 5/5] add copyright to py file --- bigquery/read/create_query_job.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/bigquery/read/create_query_job.py b/bigquery/read/create_query_job.py index 17c190a..94a543f 100644 --- a/bigquery/read/create_query_job.py +++ b/bigquery/read/create_query_job.py @@ -1,3 +1,17 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from google.cloud import bigquery import sys import argparse