diff --git a/pubsub-open-telemetry/.bazelrc b/pubsub-open-telemetry/.bazelrc index b86565f..c69caaf 100644 --- a/pubsub-open-telemetry/.bazelrc +++ b/pubsub-open-telemetry/.bazelrc @@ -27,3 +27,35 @@ build --experimental_convenience_symlinks=ignore # Enable OpenTelemetry tracing instrumentation for google-cloud-cpp. build --@io_opentelemetry_cpp//api:with_abseil build --@google_cloud_cpp//:enable_opentelemetry + + +# Clang Sanitizers, use with (for example): +# +# --client_env=CXX=clang++ --client_env=CC=clang --config asan +# + +# --config sanitizer refactors comment settings for all sanitizers +build:sanitizer --strip=never +build:sanitizer --copt=-Og +build:sanitizer --copt=-g +build:sanitizer --copt=-fno-omit-frame-pointer + +# --config asan: Address Sanitizer +build:asan --config=sanitizer +build:asan --copt=-fsanitize=address +# Protobuf enables -Werror by default. We are building with GCC 13. With +# -Werror and Address Sanitizer the compiler emits warnings in some of the +# Protobuf code, and that stops the build. It may be a compiler bug, or Protobuf +# may be assuming that some compiler flags are set when compiling with Address +# Sanitizier (-DADDRESS_SANITIZER=1 is one possibility). +build:asan --per_file_copt=com_google_protobuf//@-Wno-error +build:asan --linkopt=-fsanitize=address +build:asan --action_env=ASAN_OPTIONS=detect_leaks=1:color=always +build:asan --action_env=LSAN_OPTIONS=report_objects=1 + +# --config tsan: Thread Sanitizer +build:tsan --config=sanitizer +build:tsan --copt=-fsanitize=thread +build:tsan --linkopt=-fsanitize=thread +# report_atomic_races=0: https://github.com/google/sanitizers/issues/953 +build:tsan --action_env=TSAN_OPTIONS=halt_on_error=1:second_deadlock_stack=1:report_atomic_races=0 diff --git a/pubsub-open-telemetry/BUILD.bazel b/pubsub-open-telemetry/BUILD.bazel index ff89546..e7df967 100644 --- a/pubsub-open-telemetry/BUILD.bazel +++ b/pubsub-open-telemetry/BUILD.bazel @@ -16,6 +16,27 @@ package(default_visibility = ["//visibility:private"]) licenses(["notice"]) # Apache 2.0 +cc_library( + name = "parse_args", + hdrs = ["parse_args.h"], + srcs = ["parse_args.cc"], + deps = [ + "@boost//:program_options", + "@google_cloud_cpp//:pubsub", + "@google_cloud_cpp//:opentelemetry", + ], +) + +cc_binary( + name = "publisher", + srcs = ["publisher.cc"], + deps = [ + "@google_cloud_cpp//:opentelemetry", + "@google_cloud_cpp//:pubsub", + ":parse_args", + ], +) + cc_binary( name = "quickstart", srcs = ["quickstart.cc"], diff --git a/pubsub-open-telemetry/CMakeLists.txt b/pubsub-open-telemetry/CMakeLists.txt index 33e8be4..25da5de 100644 --- a/pubsub-open-telemetry/CMakeLists.txt +++ b/pubsub-open-telemetry/CMakeLists.txt @@ -23,6 +23,19 @@ project(pubsub-open-telemetry CXX) find_package(google_cloud_cpp_pubsub CONFIG REQUIRED) find_package(google_cloud_cpp_opentelemetry CONFIG REQUIRED) +find_package(Boost 1.66 REQUIRED COMPONENTS program_options) + +add_library(parse_args STATIC parse_args.cc parse_args.h) +target_compile_features(parse_args PUBLIC cxx_std_14) +target_link_libraries( + parse_args PUBLIC Boost::program_options google-cloud-cpp::pubsub + google-cloud-cpp::opentelemetry) + +add_executable(publisher publisher.cc) +target_compile_features(publisher PRIVATE cxx_std_14) +target_link_libraries( + publisher PRIVATE google-cloud-cpp::pubsub google-cloud-cpp::opentelemetry + parse_args) add_executable(quickstart quickstart.cc) target_compile_features(quickstart PRIVATE cxx_std_14) diff --git a/pubsub-open-telemetry/README.md b/pubsub-open-telemetry/README.md index 5a2ee5b..fd688ae 100644 --- a/pubsub-open-telemetry/README.md +++ b/pubsub-open-telemetry/README.md @@ -10,9 +10,10 @@ In v2.19 release[^1], we added instrumentation for the Google Cloud Pub/Sub C++ ## Overview -The quickstart installs a Cloud Trace exporter. The application creates a Pub/Sub client with tracing enabled that publishes 5 messages and sends the collected traces to Cloud Trace. +### Quickstart +The quickstart creates a tracing enabled Pub/Sub Publisher client that publishes 5 messages and sends the collected traces to Cloud Trace. -### Example traces +#### Example traces To find the traces, navigate to the Cloud Trace UI. @@ -20,6 +21,22 @@ To find the traces, navigate to the Cloud Trace UI. For an overview of the Cloud Trace UI, see: [View traces overview]. +### Publisher + +The publisher application lets the user configure a tracing enabled Pub/Sub Publisher client to see how different configuration settings change the produced telemetry data. + +#### Example traces + +To find the traces, navigate to the Cloud Trace UI. + +##### Publish trace + +![Screenshot of the publish span in the Cloud Trace UI running publisher.](assets/publish_span.png) + +##### Create trace + +![Screenshot of the create span in the Cloud Trace UI running publisher.](assets/create_span.png) + ## Prerequisites ### 1. Create a project in the Google Cloud Platform Console @@ -90,10 +107,60 @@ cmake --build .build ### 4. Run the examples +#### Run the quickstart + ```shell .build/quickstart [project-name] [topic-id] ``` +#### Run basic publisher examples +```shell +.build/publisher [project-name] [topic-id] +.build/publisher [project-name] [topic-id] -n 1000 +.build/publisher [project-name] [topic-id] --message-size 0 +.build/publisher [project-name] [topic-id] --tracing-rate 0.01 -n 10 +``` + +#### Flow control example +```shell +.build/publisher [project-name] [topic-id] -n 5 --max-pending-messages 2 --publisher-action reject +.build/publisher [project-name] [topic-id] -n 5 --max-pending-messages 2 --publisher-action block +.build/publisher [project-name] [topic-id] -n 5 --max-pending-messages 2 --publisher-action ignore +.build/publisher [project-name] [topic-id] -n 5 --message-size 10 --max-batch-bytes 60 --publisher-action block +``` + +#### Batching example +```shell +.build/publisher [project-name] [topic-id] -n 5 --max-batch-messages 2 --max-hold-time 100 +.build/publisher [project-name] [topic-id] -n 5 --message-size 10 --max-batch-bytes 60 --max-hold-time 1000 +``` + +#### To see all options + +```shell +.build/publisher --help +Usage: .build/publisher +A simple publisher application with Open Telemetery enabled: + -h [ --help ] produce help message + --project-id arg the name of the Google Cloud project + --topic-id arg the name of the Google Cloud topic + --tracing-rate arg (=1) otel::BasicTracingRateOption value + --max-queue-size arg (=0) If set to 0, uses the default tracing + configuration. + -n [ --message-count ] arg (=1) the number of messages to publish + --message-size arg (=1) the desired message payload size + --enable-ordering-keys arg (=0) If set to true, the messages will be sent + with ordering keys. There will be 3 possible + ordering keys and they will be set randomly + --max-pending-messages arg pubsub::MaxPendingMessagesOption value + --max-pending-bytes arg pubsub::MaxPendingBytesOption value + --publisher-action arg pubsub::FullPublisherAction value + (block|ignore|reject) + --max-hold-time arg pubsub::MaxHoldTimeOption value in us + --max-batch-bytes arg pubsub::MaxBatchBytesOption value + --max-batch-messages arg pubsub::MaxBatchMessagesOption value +``` + ## Build and run using Bazel ### 1. Download or clone this repo @@ -111,8 +178,17 @@ bazel build //:quickstart ### 3. Run these examples +#### Run the quickstart +```shell +bazel run //:quickstart [project-name] [topic-id] +``` + +#### Run basic publisher examples ```shell - bazel run //:quickstart [project-name] [topic-id] +bazel run //:publisher [project-name] [topic-id] +bazel run //:publisher -- [project-name] [topic-id] -n 1000 +bazel run //:publisher -- [project-name] [topic-id] --message_size 0 +bazel run //:publisher -- [project-name] [topic-id] --tracing-rate 0.01 -n 10 ``` #### Run with a local version of google-cloud-cpp diff --git a/pubsub-open-telemetry/WORKSPACE.bazel b/pubsub-open-telemetry/WORKSPACE.bazel index ce2dc2f..8716fd1 100644 --- a/pubsub-open-telemetry/WORKSPACE.bazel +++ b/pubsub-open-telemetry/WORKSPACE.bazel @@ -35,3 +35,14 @@ load("@com_github_grpc_grpc//bazel:grpc_deps.bzl", "grpc_deps") grpc_deps() load("@com_github_grpc_grpc//bazel:grpc_extra_deps.bzl", "grpc_extra_deps") grpc_extra_deps() + +# Boost +http_archive( + name = "com_github_nelhage_rules_boost", + url = "https://github.com/nelhage/rules_boost/archive/57be00c2e5f30bc8698be2453b3f278909e8eb2f.tar.gz", + strip_prefix = "rules_boost-57be00c2e5f30bc8698be2453b3f278909e8eb2f", + sha256 = "3faec27d6ad2d176ee7674298e549263fceb62198ec55a4821a402c20a9a70b8" +) + +load("@com_github_nelhage_rules_boost//:boost/boost.bzl", "boost_deps") +boost_deps() diff --git a/pubsub-open-telemetry/assets/create_span.png b/pubsub-open-telemetry/assets/create_span.png new file mode 100644 index 0000000..c12eb49 Binary files /dev/null and b/pubsub-open-telemetry/assets/create_span.png differ diff --git a/pubsub-open-telemetry/assets/publish_span.png b/pubsub-open-telemetry/assets/publish_span.png new file mode 100644 index 0000000..790403c Binary files /dev/null and b/pubsub-open-telemetry/assets/publish_span.png differ diff --git a/pubsub-open-telemetry/parse_args.cc b/pubsub-open-telemetry/parse_args.cc new file mode 100644 index 0000000..a4d7e2b --- /dev/null +++ b/pubsub-open-telemetry/parse_args.cc @@ -0,0 +1,168 @@ +// Copyright 2023 Google LLC. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "parse_args.h" +#include "google/cloud/opentelemetry/configure_basic_tracing.h" +#include "google/cloud/opentelemetry_options.h" +#include "google/cloud/pubsub/options.h" +#include +#include +#include +#include +#include + +namespace po = ::boost::program_options; + +ParseResult ParseArguments(int argc, char* argv[]) { + po::positional_options_description positional; + positional.add("project-id", 1); + positional.add("topic-id", 2); + po::options_description desc( + "A simple publisher application with Open Telemetery enabled"); + // The following empty line comments are for readability. + desc.add_options() + // + ("help,h", "produce help message") + // + ("project-id", po::value()->required(), + "the name of the Google Cloud project") + // + ("topic-id", po::value()->required(), + "the name of the Google Cloud topic") + // Tracing options + ("tracing-rate", po::value()->default_value(1.0), + "otel::BasicTracingRateOption value") + // Processor options + ("max-queue-size", po::value()->default_value(2048), + "set the max queue size for open telemetery") + // Message options + ("message-count,n", po::value()->default_value(1), + "the number of messages to publish") + // + ("message-size", po::value()->default_value(1), + "the desired message payload size") + // Flow control options + ("max-pending-messages", po::value(), + "pubsub::MaxPendingMessagesOption value") + // + ("max-pending-bytes", po::value(), + "pubsub::MaxPendingBytesOption value") + // + ("publisher-action", po::value(), + "pubsub::FullPublisherAction value " + "(block|ignore|reject)") + // Batching options + ("max-hold-time", po::value(), + "pubsub::MaxHoldTimeOption value in us") + // + ("max-batch-bytes", po::value(), + "pubsub::MaxBatchBytesOption value") + // + ("max-batch-messages", po::value(), + "pubsub::MaxBatchMessagesOption value"); + + po::variables_map vm; + po::store(po::command_line_parser(argc, argv) + .options(desc) + .positional(positional) + .run(), + vm); + + ParseResult result; + if (vm.count("help") || argc == 1) { + std::cerr << "Usage: " << argv[0] << " \n"; + std::cerr << desc; + return result; + } + + // This must come before po::notify which raises any errors when parsing the + // arguments. This ensures if --help is passed, the program does not raise any + // issues about missing required arguments. + po::notify(vm); + // Get arguments that are required or optional and have defaults set + auto const project_id = vm["project-id"].as(); + auto const topic_id = vm["topic-id"].as(); + auto const tracing_rate = vm["tracing-rate"].as(); + auto const message_count = vm["message-count"].as(); + auto const message_size = vm["message-size"].as(); + auto const max_queue_size = vm["max-queue-size"].as(); + + // Validate the command-line options. + if (project_id.empty()) { + throw std::runtime_error("The project-id cannot be empty"); + } + if (topic_id.empty()) { + throw std::runtime_error("The topic-id cannot be empty"); + } + if (tracing_rate == 0) { + throw std::runtime_error( + "Setting the tracing rate to 0 will produce zero traces."); + } + if (message_count == 0) { + throw std::runtime_error( + "Setting the message count to 0 will produce zero traces."); + } + result.project_id = project_id; + result.topic_id = topic_id; + result.message_count = message_count; + result.message_size = message_size; + result.max_queue_size = max_queue_size; + result.otel_options = + gc::Options{}.set(tracing_rate); + result.publisher_options = + gc::Options{}.set(true); + if (vm.count("max-pending-messages")) { + auto const max_pending_messages = + vm["max-pending-messages"].as(); + result.publisher_options.set( + max_pending_messages); + } + if (vm.count("max-pending-bytes")) { + auto const max_pending_bytes = vm["max-pending-bytes"].as(); + result.publisher_options.set( + max_pending_bytes); + } + if (vm.count("publisher-action")) { + auto const publisher_action = vm["publisher-action"].as(); + gc::pubsub::FullPublisherAction action; + if (publisher_action == "reject") { + action = gc::pubsub::FullPublisherAction::kRejects; + } else if (publisher_action == "block") { + action = gc::pubsub::FullPublisherAction::kBlocks; + } else if (publisher_action == "ignore") { + action = gc::pubsub::FullPublisherAction::kIgnored; + } else { + throw std::runtime_error( + "publisher-action is invalid. it must be one of the three values: " + "block|ignore|reject"); + } + result.publisher_options.set(action); + } + if (vm.count("max-hold-time")) { + auto const max_hold_time = vm["max-hold-time"].as(); + result.publisher_options.set( + std::chrono::microseconds(max_hold_time)); + } + if (vm.count("max-batch-bytes")) { + auto const max_batch_bytes = vm["max-batch-bytes"].as(); + result.publisher_options.set( + max_batch_bytes); + } + if (vm.count("max-batch-messages")) { + auto const max_batch_messages = vm["max-batch-messages"].as(); + result.publisher_options.set( + max_batch_messages); + } + return result; +} diff --git a/pubsub-open-telemetry/parse_args.h b/pubsub-open-telemetry/parse_args.h new file mode 100644 index 0000000..347db7c --- /dev/null +++ b/pubsub-open-telemetry/parse_args.h @@ -0,0 +1,41 @@ +// Copyright 2023 Google LLC. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef CPP_SAMPLES_PUBSUB_OPEN_TELEMETRY_PARSE_ARGUMENTS_H +#define CPP_SAMPLES_PUBSUB_OPEN_TELEMETRY_PARSE_ARGUMENTS_H + +#include "google/cloud/options.h" +#include +#include + +namespace gc = ::google::cloud; + +// Parse the command line arguments. +struct ParseResult { + // Required. + std::string project_id; + std::string topic_id; + + // Optional with defaults set. + int message_count; + int message_size; + int max_queue_size; + + gc::Options otel_options; + gc::Options publisher_options; +}; + +ParseResult ParseArguments(int argc, char* argv[]); + +#endif // CPP_SAMPLES_PUBSUB_OPEN_TELEMETRY_PARSE_ARGUMENTS_H diff --git a/pubsub-open-telemetry/publisher.cc b/pubsub-open-telemetry/publisher.cc new file mode 100644 index 0000000..a80a997 --- /dev/null +++ b/pubsub-open-telemetry/publisher.cc @@ -0,0 +1,88 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "google/cloud/pubsub/publisher.h" +#include "google/cloud/opentelemetry/configure_basic_tracing.h" +#include "google/cloud/opentelemetry/trace_exporter.h" +#include "google/cloud/opentelemetry_options.h" +#include "parse_args.h" +#include +#include +#include +#include +#include +#include +#include + +// Create a few namespace aliases to make the code easier to read. +namespace gc = ::google::cloud; +namespace pubsub = gc::pubsub; +namespace otel = gc::otel; + +std::string GeneratePayload(int payload_size) { + auto gen = google::cloud::internal::DefaultPRNG(std::random_device{}()); + const std::string charset = "abcdefghijklmnopqrstuvwxyz"; + std::uniform_int_distribution rd(0, charset.size() - 1); + + std::string result(payload_size, '0'); + std::generate(result.begin(), result.end(), + [&rd, &gen, &charset]() { return charset[rd(gen)]; }); + return result; +} + +int main(int argc, char* argv[]) try { + auto args = ParseArguments(argc, argv); + if (args.project_id.empty() && args.topic_id.empty()) { + return 1; + } + + std::cout << "Using project `" << args.project_id << "` and topic `" + << args.topic_id << "`\n"; + auto exporter = otel::MakeTraceExporter(gc::Project(args.project_id)); + opentelemetry::sdk::trace::BatchSpanProcessorOptions span_options; + span_options.max_queue_size = args.max_queue_size; + auto processor = opentelemetry::sdk::trace::BatchSpanProcessorFactory::Create( + std::move(exporter), span_options); + auto provider = opentelemetry::sdk::trace::TracerProviderFactory::Create( + std::move(processor)); + opentelemetry::trace::Provider::SetTracerProvider(std::move(provider)); + + auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection( + pubsub::Topic(args.project_id, args.topic_id), args.publisher_options)); + + std::cout << "Publishing " << std::to_string(args.message_count) + << " message(s) with payload size " + << std::to_string(args.message_size) << "...\n"; + std::vector> ids; + for (int i = 0; i < args.message_count; i++) { + auto id = publisher + .Publish(pubsub::MessageBuilder() + .SetData(GeneratePayload(args.message_size)) + .Build()) + .then([](gc::future> f) { + return f.get().value(); + }); + ids.push_back(std::move(id)); + } + for (auto& id : ids) try { + std::cout << "Sent message with id: " << id.get() << "\n"; + } catch (std::exception const& ex) { + std::cout << "Error in publish: " << ex.what() << "\n"; + } + + return 0; +} catch (google::cloud::Status const& status) { + std::cerr << "google::cloud::Status thrown: " << status << "\n"; + return 1; +} diff --git a/pubsub-open-telemetry/vcpkg.json b/pubsub-open-telemetry/vcpkg.json index a3a2a21..b7d5de3 100644 --- a/pubsub-open-telemetry/vcpkg.json +++ b/pubsub-open-telemetry/vcpkg.json @@ -10,6 +10,7 @@ "pubsub", "opentelemetry" ] - } + }, + "boost-program-options" ] }