impl(bigquery/read): add arrow bq read example #335

Merged · 5 commits · May 31, 2024

30 changes: 30 additions & 0 deletions bigquery/read/CMakeLists.txt
@@ -0,0 +1,30 @@
# ~~~
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ~~~

cmake_minimum_required(VERSION 3.20)
set(CMAKE_CXX_STANDARD 20)

# Define the project name and where to report bugs.
set(PACKAGE_BUGREPORT
"https://github.com/GoogleCloudPlatform/cpp-samples/issues")
project(cpp-samples-bigquery-read CXX)

find_package(google_cloud_cpp_bigquery REQUIRED)
find_package(Arrow REQUIRED)

add_executable(arrow_read arrow_read.cc)
target_link_libraries(arrow_read PRIVATE google-cloud-cpp::bigquery
Arrow::arrow_static)
113 changes: 113 additions & 0 deletions bigquery/read/README.md
@@ -0,0 +1,113 @@
# Using BigQuery Storage Read

BigQuery is a data platform that lets users create, manage, share, and query
data using SQL. When you want to access your data, you can read directly from a
table. However, if you want to transform the data in a table by mapping,
filtering, or joining, you first need to run a query. When you run a query, you
can specify a destination table to store the results. You can then start a read
session for that table via the BigQuery Storage library and read its rows.

This example shows how to create a query job using the BigQuery v2 Python API,
and then read the data from the table using the BigQuery Storage C++ API.
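
For orientation, the C++ side of this example (see `arrow_read.cc` in this PR)
creates a `ReadSession` for the table and then iterates over the `ReadRows`
stream. The following is a minimal sketch of that flow, with a placeholder
project ID and without the Arrow decoding and error handling of the full
sample:

```cpp
#include "google/cloud/bigquery/storage/v1/bigquery_read_client.h"
#include "google/cloud/project.h"
#include <iostream>

int main() {
  namespace bq = ::google::cloud::bigquery_storage_v1;
  auto client = bq::BigQueryReadClient(bq::MakeBigQueryReadConnection());

  // Placeholder table name; the full sample builds it from the command line.
  ::google::cloud::bigquery::storage::v1::ReadSession read_session;
  read_session.set_data_format(
      ::google::cloud::bigquery::storage::v1::DataFormat::ARROW);
  read_session.set_table(
      "projects/[PROJECT-ID]/datasets/usa_names/tables/top10_names");

  auto session = client.CreateReadSession(
      google::cloud::Project("[PROJECT-ID]").FullName(), read_session,
      /*max_stream_count=*/1);
  if (!session) return 1;

  // Each response in the stream carries one serialized Arrow record batch.
  for (auto const& response :
       client.ReadRows(session->streams(0).name(), /*offset=*/0)) {
    if (response) std::cout << response->row_count() << " rows in batch\n";
  }
  return 0;
}
```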

If you are not familiar with the BigQuery v2 API or the BigQuery Storage Read
API, we recommend you first read the [API overview] before starting this guide.

## Prerequisites

You will need a Google Cloud project to host the BigQuery dataset and table
used in this example, and you will need to install and configure the BigQuery
CLI tool. Follow the [Google Cloud CLI install][install-sdk] instructions, and
then the [quickstart][bigquery cli tool] for the BigQuery CLI tool.

Verify that the CLI is working with a simple command that shows the active
project:

```shell
bq show
```

### Creating the query job

The following script uses the BigQuery v2 Python client to create a dataset (if
it does not already exist) and a query job.

```shell
python3 -m venv env
source env/bin/activate
pip3 install -r requirements.txt
python3 create_query_job.py --project_id [PROJECT-ID] --dataset_name usa_names --table_name top10_names
```

## Compiling the Example

This project uses `vcpkg` to install its dependencies. Clone `vcpkg` in your
`$HOME`:

```shell
git -C $HOME clone https://github.com/microsoft/vcpkg.git
```

Install the typical development tools; on Ubuntu you would use:

```shell
apt update && apt install -y build-essential cmake git ninja-build pkg-config g++ curl tar zip unzip
```

In this directory, compile the dependencies and the code. This can take as long
as an hour, depending on the performance of your workstation:

```shell
cd cpp-samples/bigquery/read
cmake -S . -B .build -DCMAKE_BUILD_TYPE=Release \
-DCMAKE_TOOLCHAIN_FILE=$HOME/vcpkg/scripts/buildsystems/vcpkg.cmake
cmake --build .build
```

## Run the sample

Run the example, replacing the `[PROJECT-ID]` placeholder with the ID of your
project:

```shell
.build/arrow_read [PROJECT-ID] [DATASET_NAME] [TABLE_NAME]
```

For example:

```shell
.build/arrow_read [PROJECT-ID] usa_names top10_names
```

## Output

Your output should look like the following:

```
Schema is:
name: string
total: int64
name total
Row 0: James 4942431
Row 1: John 4834422
Row 2: Robert 4718787
Row 3: Michael 4297230
Row 4: William 3822209
Row 5: Mary 3737679
Row 6: David 3549801
Row 7: Richard 2531924
Row 8: Joseph 2472917
Row 9: Charles 2244693
Read 1 record batch(es) and 10 total row(s) from table: projects/[PROJECT-ID]/datasets/usa_names/tables/top10_names
```

## Cleanup

Remove the table and dataset:

```shell
bq rm -f usa_names.top10_names
bq rm -f usa_names
```

[api overview]: https://cloud.google.com/bigquery/docs/reference/storage
[bigquery cli tool]: https://cloud.google.com/bigquery/docs/bq-command-line-tool
[install-sdk]: https://cloud.google.com/sdk/docs/install-sdk
183 changes: 183 additions & 0 deletions bigquery/read/arrow_read.cc
@@ -0,0 +1,183 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "google/cloud/bigquery/storage/v1/bigquery_read_client.h"
#include "google/cloud/project.h"
#include <arrow/api.h>
#include <arrow/array/data.h>
#include <arrow/io/memory.h>
#include <arrow/ipc/api.h>
#include <arrow/record_batch.h>
#include <arrow/status.h>
#include <format>
#include <iomanip>
#include <iostream>

namespace {

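// Deserialize the Arrow IPC-encoded schema returned in the ReadSession into
// an arrow::Schema that is later used to decode each record batch.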
std::shared_ptr<arrow::Schema> GetArrowSchema(
::google::cloud::bigquery::storage::v1::ArrowSchema const& schema_in) {
std::shared_ptr<arrow::Buffer> buffer =
std::make_shared<arrow::Buffer>(schema_in.serialized_schema());
arrow::io::BufferReader buffer_reader(buffer);
arrow::ipc::DictionaryMemo dictionary_memo;
auto result = arrow::ipc::ReadSchema(&buffer_reader, &dictionary_memo);
if (!result.ok()) {
std::cout << "Unable to parse schema\n";
throw result.status();
}
std::shared_ptr<arrow::Schema> schema = result.ValueOrDie();
std::cout << std::format("Schema is:\n {}\n", schema->ToString());
return schema;
}

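// Deserialize one Arrow IPC-encoded record batch from a ReadRows response,
// using the schema obtained from the ReadSession.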
std::shared_ptr<arrow::RecordBatch> GetArrowRecordBatch(
::google::cloud::bigquery::storage::v1::ArrowRecordBatch const&
record_batch_in,
std::shared_ptr<arrow::Schema> schema) {
std::shared_ptr<arrow::Buffer> buffer = std::make_shared<arrow::Buffer>(
record_batch_in.serialized_record_batch());
arrow::io::BufferReader buffer_reader(buffer);
arrow::ipc::DictionaryMemo dictionary_memo;
arrow::ipc::IpcReadOptions read_options;
auto result = arrow::ipc::ReadRecordBatch(schema, &dictionary_memo,
read_options, &buffer_reader);
if (!result.ok()) {
std::cout << "Unable to parse record batch\n";
throw result.status();
}
std::shared_ptr<arrow::RecordBatch> record_batch = result.ValueOrDie();
return record_batch;
}

void PrintColumnNames(std::shared_ptr<arrow::RecordBatch> record_batch) {
// Print each column name for the record batch.
std::cout << std::setfill(' ') << std::setw(7) << "";
for (std::int64_t col = 0; col < record_batch->num_columns(); ++col) {
std::cout << std::left << std::setw(16) << record_batch->column_name(col);
}
std::cout << "\n";
}

void ProcessRecordBatch(std::shared_ptr<arrow::Schema> schema,
std::shared_ptr<arrow::RecordBatch> record_batch,
std::int64_t num_rows) {
// If you want to see what the result looks like without parsing the
// datatypes, use `record_batch->ToString()` for quick debugging.
// Note: you might need to adjust the formatting depending on how big the data
// in your table is.
for (std::int64_t row = 0; row < record_batch->num_rows(); ++row) {
std::cout << std::format("Row {}: ", row + num_rows);

for (std::int64_t col = 0; col < record_batch->num_columns(); ++col) {
std::shared_ptr<arrow::Array> column = record_batch->column(col);
arrow::Result<std::shared_ptr<arrow::Scalar> > result =
column->GetScalar(row);
if (!result.ok()) {
std::cout << "Unable to parse scalar\n";
throw result.status();
}

std::shared_ptr<arrow::Scalar> scalar = result.ValueOrDie();
switch (scalar->type->id()) {
case arrow::Type::INT64:
std::cout
<< std::left << std::setw(15)
<< std::dynamic_pointer_cast<arrow::Int64Scalar>(scalar)->value
<< " ";
break;
case arrow::Type::STRING:
std::cout
<< std::left << std::setw(15)
<< std::dynamic_pointer_cast<arrow::StringScalar>(scalar)->view()
<< " ";
break;
// Depending on the table you are reading, you might need to add cases
// for other datatypes here. The schema will tell you what datatypes
// need to be handled.
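// For example (illustrative only; the usa_names sample does not need it), a
// table with a FLOAT64 column could be handled like this:
case arrow::Type::DOUBLE:
std::cout
<< std::left << std::setw(15)
<< std::dynamic_pointer_cast<arrow::DoubleScalar>(scalar)->value
<< " ";
break;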
default:
std::cout << std::left << std::setw(15) << "UNDEFINED"
<< " ";
}
}
std::cout << "\n";
}
}

} // namespace

int main(int argc, char* argv[]) try {
if (argc != 4) {
std::cerr << "Usage: " << argv[0]
<< " <project-id> <dataset-name> <table-name>\n";
return 1;
}

std::string const project_id = argv[1];
std::string const dataset_name = argv[2];
std::string const table_name = argv[3];

std::string const table_id = "projects/" + project_id + "/datasets/" +
dataset_name + "/tables/" + table_name;

// Create a namespace alias to make the code easier to read.
namespace bigquery_storage = ::google::cloud::bigquery_storage_v1;
constexpr int kMaxReadStreams = 1;
// Create the ReadSession.
auto client = bigquery_storage::BigQueryReadClient(
bigquery_storage::MakeBigQueryReadConnection());
::google::cloud::bigquery::storage::v1::ReadSession read_session;
read_session.set_data_format(
google::cloud::bigquery::storage::v1::DataFormat::ARROW);
read_session.set_table(table_id);
auto session =
client.CreateReadSession(google::cloud::Project(project_id).FullName(),
read_session, kMaxReadStreams);
if (!session) throw std::move(session).status();

// Get schema.
std::shared_ptr<arrow::Schema> schema =
GetArrowSchema(session->arrow_schema());

// Read rows from the ReadSession.
constexpr int kRowOffset = 0;
auto read_rows = client.ReadRows(session->streams(0).name(), kRowOffset);

std::int64_t num_rows = 0;
std::int64_t record_batch_count = 0;
for (auto const& read_rows_response : read_rows) {
if (read_rows_response.ok()) {
std::shared_ptr<arrow::RecordBatch> record_batch =
GetArrowRecordBatch(read_rows_response->arrow_record_batch(), schema);

if (record_batch_count == 0) {
PrintColumnNames(record_batch);
}

ProcessRecordBatch(schema, record_batch, num_rows);
num_rows += read_rows_response->row_count();
++record_batch_count;
}
}

std::cout << std::format(
"Read {} record batch(es) and {} total row(s) from table: {}\n",
record_batch_count, num_rows, table_id);
return 0;
} catch (google::cloud::Status const& status) {
std::cerr << "google::cloud::Status thrown: " << status << "\n";
return 1;
} catch (arrow::Status const& status) {
std::cerr << "arrow::Status thrown: " << status << "\n";
return 1;
}