From 90351b092f98a5439a726c7d67ef5cbd0c758bad Mon Sep 17 00:00:00 2001 From: Anna Levenberg Date: Mon, 3 Jun 2024 15:18:16 -0400 Subject: [PATCH] impl(bigquery/read): add avro example (#339) * impl(bigquery/read): add avro example * checkers * Apply suggestions from code review * ci: add bigquery/read * style * add flex dep * fix dockerfile * add bison --- CMakeLists.txt | 2 + bigquery/read/README.md | 50 +++++++- bigquery/read/{ => arrow}/CMakeLists.txt | 2 +- bigquery/read/{ => arrow}/arrow_read.cc | 3 +- bigquery/read/{ => arrow}/vcpkg.json | 4 +- bigquery/read/avro/CMakeLists.txt | 30 +++++ bigquery/read/avro/avro_read.cc | 154 +++++++++++++++++++++++ bigquery/read/avro/vcpkg.json | 14 +++ ci/devtools.Dockerfile | 20 ++- 9 files changed, 267 insertions(+), 12 deletions(-) rename bigquery/read/{ => arrow}/CMakeLists.txt (95%) rename bigquery/read/{ => arrow}/arrow_read.cc (99%) rename bigquery/read/{ => arrow}/vcpkg.json (83%) create mode 100644 bigquery/read/avro/CMakeLists.txt create mode 100644 bigquery/read/avro/avro_read.cc create mode 100644 bigquery/read/avro/vcpkg.json diff --git a/CMakeLists.txt b/CMakeLists.txt index 8a57890..167fed5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -24,6 +24,8 @@ set(samples batch/cpp_application batch/parallel/application batch/simple + bigquery/read/arrow + bigquery/read/avro bigquery/write cloud-run-hello-world gcs-fast-transfers diff --git a/bigquery/read/README.md b/bigquery/read/README.md index 4eb2a16..e6b8cf0 100644 --- a/bigquery/read/README.md +++ b/bigquery/read/README.md @@ -9,7 +9,8 @@ session for the table via the BigQuery Storage library and read the rows from the table. This example shows how to create a query job using the BigQuery v2 Python API, -and then read the data from the table using the BigQuery Storage C++ API. +and then read the data from the table using the BigQuery Storage C++ API. There +are two examples for reading the data: one using Avro and one using Arrow. If you are not familiar with the BigQuery v2 API or the BigQuery Storage Read API, we recommend you first read the [API overview] before starting this guide. @@ -57,8 +58,19 @@ apt update && apt install -y build-essential cmake git ninja-build pkg-config g+ In this directory compile the dependencies and the code, this can take as long as an hour, depending on the performance of your workstation: +### Arrow read + ```shell -cd cpp-samples/bigquery/read +cd cpp-samples/bigquery/read/arrow +cmake -S . -B .build -DCMAKE_BUILD_TYPE=Release \ + -DCMAKE_TOOLCHAIN_FILE=$HOME/vcpkg/scripts/buildsystems/vcpkg.cmake +cmake --build .build +``` + +### Avro read + +```shell +cd cpp-samples/bigquery/read/avro cmake -S . 
-B .build -DCMAKE_BUILD_TYPE=Release \ -DCMAKE_TOOLCHAIN_FILE=$HOME/vcpkg/scripts/buildsystems/vcpkg.cmake cmake --build .build @@ -69,7 +81,10 @@ cmake --build .build Run the example, replace the `[PROJECT ID]` placeholder with the id of your project: +### Arrow read + ```shell +cd cpp-samples/bigquery/read/arrow .build/arrow_read [PROJECT ID] [DATASET_NAME] [TABLE_NAME] ``` @@ -77,10 +92,6 @@ project: .build/arrow_read [PROJECT ID] usa_names top10_names ``` -## Output - -Your output should look like the following: - ``` Schema is: name: string @@ -99,6 +110,33 @@ Row 9: Charles 2244693 Read 1 record batch(es) and 10 total row(s) from table: projects/[PROJECT-ID]/datasets/usa_names/tables/top10_names ``` +### Avro read + +```shell +cd cpp-samples/bigquery/read/avro +.build/avro_read [PROJECT ID] [DATASET_NAME] [TABLE_NAME] +``` + +```shell +.build/avro_read [PROJECT ID] usa_names top10_names +``` + +The output should look like: + +``` +Row 0 (2): James 4942431 +Row 1 (2): John 4834422 +Row 2 (2): Robert 4718787 +Row 3 (2): Michael 4297230 +Row 4 (2): William 3822209 +Row 5 (2): Mary 3737679 +Row 6 (2): David 3549801 +Row 7 (2): Richard 2531924 +Row 8 (2): Joseph 2472917 +Row 9 (2): Charles 2244693 +Read 1 response(s) and 10 total row(s) from table: projects/[PROJECT-ID]/datasets/usa_names/tables/top10_names +``` + ## Cleanup Remove the table and dataset: diff --git a/bigquery/read/CMakeLists.txt b/bigquery/read/arrow/CMakeLists.txt similarity index 95% rename from bigquery/read/CMakeLists.txt rename to bigquery/read/arrow/CMakeLists.txt index 3c32bdb..50f5475 100644 --- a/bigquery/read/CMakeLists.txt +++ b/bigquery/read/arrow/CMakeLists.txt @@ -20,7 +20,7 @@ set(CMAKE_CXX_STANDARD 20) # Define the project name and where to report bugs. set(PACKAGE_BUGREPORT "https://github.com/GoogleCloudPlatform/cpp-samples/issues") -project(cpp-samples-bigquery-read CXX) +project(cpp-samples-bigquery-read-arrow CXX) find_package(google_cloud_cpp_bigquery REQUIRED) find_package(Arrow REQUIRED) diff --git a/bigquery/read/arrow_read.cc b/bigquery/read/arrow/arrow_read.cc similarity index 99% rename from bigquery/read/arrow_read.cc rename to bigquery/read/arrow/arrow_read.cc index 8174682..a0eaa30 100644 --- a/bigquery/read/arrow_read.cc +++ b/bigquery/read/arrow/arrow_read.cc @@ -107,7 +107,6 @@ void ProcessRecordBatch(std::shared_ptr schema, // need to be handled. 
       default:
         std::cout << std::left << std::setw(15) << "UNDEFINED ";
-        << " ";
     }
   }
   std::cout << "\n";
@@ -165,7 +164,7 @@ int main(int argc, char* argv[]) try {
     }
     ProcessRecordBatch(schema, record_batch, num_rows);
-    num_rows += row->row_count();
+    num_rows += read_rows_response->row_count();
     ++record_batch_count;
   }
 }
diff --git a/bigquery/read/vcpkg.json b/bigquery/read/arrow/vcpkg.json
similarity index 83%
rename from bigquery/read/vcpkg.json
rename to bigquery/read/arrow/vcpkg.json
index eb8ec84..8a132c3 100644
--- a/bigquery/read/vcpkg.json
+++ b/bigquery/read/arrow/vcpkg.json
@@ -1,8 +1,8 @@
 {
-  "name": "gcp-cpp-samples-bigquery-read",
+  "name": "gcp-cpp-samples-bigquery-read-arrow",
   "version-string": "unversioned",
   "homepage": "https://github.com/GoogleCloudPlatform/cpp-samples/",
-  "description": "An example using the BigQuery Storage Read API",
+  "description": "An example using the BigQuery Storage Read API and Arrow",
   "dependencies": [
     {
       "name": "google-cloud-cpp",
diff --git a/bigquery/read/avro/CMakeLists.txt b/bigquery/read/avro/CMakeLists.txt
new file mode 100644
index 0000000..a6ea255
--- /dev/null
+++ b/bigquery/read/avro/CMakeLists.txt
@@ -0,0 +1,30 @@
+# ~~~
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ~~~
+
+cmake_minimum_required(VERSION 3.20)
+set(CMAKE_CXX_STANDARD 14)
+
+# Define the project name and where to report bugs.
+set(PACKAGE_BUGREPORT
+    "https://github.com/GoogleCloudPlatform/cpp-samples/issues")
+project(cpp-samples-bigquery-read-avro CXX)
+
+find_package(google_cloud_cpp_bigquery REQUIRED)
+find_package(unofficial-avro-cpp CONFIG REQUIRED)
+
+add_executable(avro_read avro_read.cc)
+target_link_libraries(avro_read PRIVATE google-cloud-cpp::bigquery
+                                        unofficial::avro-cpp::avrocpp)
diff --git a/bigquery/read/avro/avro_read.cc b/bigquery/read/avro/avro_read.cc
new file mode 100644
index 0000000..331e13b
--- /dev/null
+++ b/bigquery/read/avro/avro_read.cc
@@ -0,0 +1,154 @@
+// Copyright 2024 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "google/cloud/bigquery/storage/v1/bigquery_read_client.h"
+#include "google/cloud/project.h"
+#include <avro/Compiler.hh>
+#include <avro/Decoder.hh>
+#include <avro/Generic.hh>
+#include <avro/Schema.hh>
+#include <avro/Specific.hh>
+#include <avro/Stream.hh>
+#include <avro/ValidSchema.hh>
+#include <fstream>
+#include <iomanip>
+#include <iostream>
+#include <sstream>
+
+namespace {
+
+avro::ValidSchema GetAvroSchema(
+    ::google::cloud::bigquery::storage::v1::AvroSchema const& schema) {
+  // Create a valid reader schema.
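+  // The read session reports the table schema as a JSON string;
+  // avro::compileJsonSchema parses it into an avro::ValidSchema that the
+  // decoder used below can validate rows against.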
+  std::istringstream schema_bytes(schema.schema(), std::ios::binary);
+  avro::ValidSchema valid_schema;
+  avro::compileJsonSchema(schema_bytes, valid_schema);
+
+  // [optional] Write the schema to a file. This could be useful if you want to
+  // re-use the schema elsewhere.
+  std::ofstream output("schema.avsc");
+  if (output.is_open()) {
+    valid_schema.toJson(output);
+    output.close();
+  } else {
+    std::cerr << "Error opening the file!" << std::endl;
+  }
+  return valid_schema;
+}
+
+void ProcessRowsInAvroFormat(
+    avro::ValidSchema const& valid_schema,
+    ::google::cloud::bigquery::storage::v1::AvroRows const& rows,
+    std::int64_t row_count) {
+  // Get an avro decoder.
+  std::stringstream row_bytes(rows.serialized_binary_rows(), std::ios::binary);
+  std::unique_ptr<avro::InputStream> in = avro::istreamInputStream(row_bytes);
+  avro::DecoderPtr decoder =
+      avro::validatingDecoder(valid_schema, avro::binaryDecoder());
+  decoder->init(*in);
+
+  for (auto i = 0; i < row_count; ++i) {
+    std::cout << "Row " << i << " ";
+    avro::GenericDatum datum(valid_schema);
+    avro::decode(*decoder, datum);
+    if (datum.type() == avro::AVRO_RECORD) {
+      const avro::GenericRecord& record = datum.value<avro::GenericRecord>();
+      std::cout << "(" << record.fieldCount() << "): ";
+      for (auto i = 0; i < record.fieldCount(); i++) {
+        const avro::GenericDatum& datum = record.fieldAt(i);
+
+        switch (datum.type()) {
+          case avro::AVRO_STRING:
+            std::cout << std::left << std::setw(15)
+                      << datum.value<std::string>();
+            break;
+          case avro::AVRO_INT:
+            std::cout << std::left << std::setw(15) << datum.value<int32_t>();
+            break;
+          case avro::AVRO_LONG:
+            std::cout << std::left << std::setw(15) << datum.value<int64_t>();
+            break;
+          // Depending on the table you are reading, you might need to add
+          // cases for other datatypes here. The schema will tell you what
+          // datatypes need to be handled.
+          default:
+            std::cout << std::left << std::setw(15) << "UNDEFINED";
+        }
+        std::cout << "\t";
+      }
+    }
+    std::cout << "\n";
+  }
+}
+
+}  // namespace
+
+int main(int argc, char* argv[]) try {
+  if (argc != 4) {
+    std::cerr << "Usage: " << argv[0]
+              << " <project-id> <dataset-name> <table-name>\n";
+    return 1;
+  }
+
+  std::string const project_id = argv[1];
+  std::string const dataset_name = argv[2];
+  std::string const table_name = argv[3];
+
+  std::string const table_id = "projects/" + project_id + "/datasets/" +
+                               dataset_name + "/tables/" + table_name;
+
+  // Create a namespace alias to make the code easier to read.
+  namespace bigquery_storage = ::google::cloud::bigquery_storage_v1;
+  constexpr int kMaxReadStreams = 1;
+  // Create the ReadSession.
+  auto client = bigquery_storage::BigQueryReadClient(
+      bigquery_storage::MakeBigQueryReadConnection());
+  ::google::cloud::bigquery::storage::v1::ReadSession read_session;
+  read_session.set_data_format(
+      google::cloud::bigquery::storage::v1::DataFormat::AVRO);
+  read_session.set_table(table_id);
+  auto session =
+      client.CreateReadSession(google::cloud::Project(project_id).FullName(),
+                               read_session, kMaxReadStreams);
+  if (!session) throw std::move(session).status();
+
+  // Get Avro schema.
+  avro::ValidSchema valid_schema = GetAvroSchema(session->avro_schema());
+
+  // Read rows from the ReadSession.
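+  // ReadRows returns a stream of responses; each response carries a block of
+  // rows serialized in Avro binary format together with the number of rows in
+  // that block.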
+  constexpr int kRowOffset = 0;
+  auto read_rows = client.ReadRows(session->streams(0).name(), kRowOffset);
+
+  std::int64_t num_rows = 0;
+  std::int64_t num_responses = 0;
+  for (auto const& read_rows_response : read_rows) {
+    if (read_rows_response.ok()) {
+      num_rows += read_rows_response->row_count();
+      ProcessRowsInAvroFormat(valid_schema, read_rows_response->avro_rows(),
+                              read_rows_response->row_count());
+      ++num_responses;
+    }
+  }
+
+  std::cout << "Read " << num_responses << " response(s) and " << num_rows
+            << " total row(s) from table: " << table_id << "\n";
+
+  return 0;
+} catch (google::cloud::Status const& status) {
+  std::cerr << "google::cloud::Status thrown: " << status << "\n";
+  return 1;
+} catch (avro::Exception const& e) {
+  std::cerr << "avro::Exception thrown: " << e.what() << "\n";
+  return 1;
+}
diff --git a/bigquery/read/avro/vcpkg.json b/bigquery/read/avro/vcpkg.json
new file mode 100644
index 0000000..ccf6983
--- /dev/null
+++ b/bigquery/read/avro/vcpkg.json
@@ -0,0 +1,14 @@
+{
+  "name": "gcp-cpp-samples-bigquery-read-avro",
+  "version-string": "unversioned",
+  "homepage": "https://github.com/GoogleCloudPlatform/cpp-samples/",
+  "description": "An example using the BigQuery Storage Read API and Avro",
+  "dependencies": [
+    {
+      "name": "google-cloud-cpp",
+      "default-features": false,
+      "features": ["bigquery"]
+    },
+    "avro-cpp"
+  ]
+}
diff --git a/ci/devtools.Dockerfile b/ci/devtools.Dockerfile
index cc1a886..7ff3b61 100644
--- a/ci/devtools.Dockerfile
+++ b/ci/devtools.Dockerfile
@@ -15,8 +15,26 @@ FROM ubuntu:24.04
 
 ENV DEBIAN_FRONTEND=noninteractive
 
+# bigquery/read/arrow: bison is for thrift, which is a dependency for arrow
+# bigquery/read/arrow: flex is for thrift, which is a dependency for arrow
 RUN apt update \
-    && apt install -y build-essential git gcc g++ clang llvm cmake ninja-build pkg-config python3 tar zip unzip curl
+    && apt install -y \
+    bison \
+    build-essential \
+    git \
+    gcc \
+    g++ \
+    clang \
+    cmake \
+    curl \
+    flex \
+    llvm \
+    ninja-build \
+    pkg-config \
+    python3 \
+    tar \
+    zip \
+    unzip
 
 RUN echo "deb [signed-by=/usr/share/keyrings/cloud.google.gpg] http://packages.cloud.google.com/apt cloud-sdk main" | \
     tee -a /etc/apt/sources.list.d/google-cloud-sdk.list \