Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

impl(pubsub-open-telemetry): add a publisher executable #268

Merged
merged 14 commits into from
Dec 1, 2023
32 changes: 32 additions & 0 deletions pubsub-open-telemetry/.bazelrc
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,35 @@ build --experimental_convenience_symlinks=ignore
# Enable OpenTelemetry tracing instrumentation for google-cloud-cpp.
build --@io_opentelemetry_cpp//api:with_abseil
build --@google_cloud_cpp//:enable_opentelemetry


# Clang Sanitizers, use with (for example):
#
# --client_env=CXX=clang++ --client_env=CC=clang --config asan
#

# --config sanitizer refactors comment settings for all sanitizers
build:sanitizer --strip=never
build:sanitizer --copt=-Og
build:sanitizer --copt=-g
build:sanitizer --copt=-fno-omit-frame-pointer

# --config asan: Address Sanitizer
build:asan --config=sanitizer
build:asan --copt=-fsanitize=address
# Protobuf enables -Werror by default. We are building with GCC 13. With
# -Werror and Address Sanitizer the compiler emits warnings in some of the
# Protobuf code, and that stops the build. It may be a compiler bug, or Protobuf
# may be assuming that some compiler flags are set when compiling with Address
# Sanitizier (-DADDRESS_SANITIZER=1 is one possibility).
build:asan --per_file_copt=com_google_protobuf//@-Wno-error
build:asan --linkopt=-fsanitize=address
build:asan --action_env=ASAN_OPTIONS=detect_leaks=1:color=always
build:asan --action_env=LSAN_OPTIONS=report_objects=1

# --config tsan: Thread Sanitizer
build:tsan --config=sanitizer
build:tsan --copt=-fsanitize=thread
build:tsan --linkopt=-fsanitize=thread
# report_atomic_races=0: https://github.com/google/sanitizers/issues/953
build:tsan --action_env=TSAN_OPTIONS=halt_on_error=1:second_deadlock_stack=1:report_atomic_races=0
21 changes: 21 additions & 0 deletions pubsub-open-telemetry/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,27 @@ package(default_visibility = ["//visibility:private"])

licenses(["notice"]) # Apache 2.0

cc_library(
name = "parse_args",
hdrs = ["parse_args.h"],
srcs = ["parse_args.cc"],
deps = [
"@boost//:program_options",
"@google_cloud_cpp//:pubsub",
"@google_cloud_cpp//:opentelemetry",
],
)

cc_binary(
name = "publisher",
srcs = ["publisher.cc"],
deps = [
"@google_cloud_cpp//:opentelemetry",
"@google_cloud_cpp//:pubsub",
":parse_args",
],
)

cc_binary(
name = "quickstart",
srcs = ["quickstart.cc"],
Expand Down
13 changes: 13 additions & 0 deletions pubsub-open-telemetry/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,19 @@ project(pubsub-open-telemetry CXX)

find_package(google_cloud_cpp_pubsub CONFIG REQUIRED)
find_package(google_cloud_cpp_opentelemetry CONFIG REQUIRED)
find_package(Boost 1.66 REQUIRED COMPONENTS program_options)

add_library(parse_args STATIC parse_args.cc parse_args.h)
target_compile_features(parse_args PUBLIC cxx_std_14)
target_link_libraries(
parse_args PUBLIC Boost::program_options google-cloud-cpp::pubsub
google-cloud-cpp::opentelemetry)

add_executable(publisher publisher.cc)
target_compile_features(publisher PRIVATE cxx_std_14)
target_link_libraries(
publisher PRIVATE google-cloud-cpp::pubsub google-cloud-cpp::opentelemetry
parse_args)

add_executable(quickstart quickstart.cc)
target_compile_features(quickstart PRIVATE cxx_std_14)
Expand Down
82 changes: 79 additions & 3 deletions pubsub-open-telemetry/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,33 @@ In v2.19 release[^1], we added instrumentation for the Google Cloud Pub/Sub C++

## Overview

The quickstart installs a Cloud Trace exporter. The application creates a Pub/Sub client with tracing enabled that publishes 5 messages and sends the collected traces to Cloud Trace.
### Quickstart
The quickstart creates a tracing enabled Pub/Sub Publisher client that publishes 5 messages and sends the collected traces to Cloud Trace.

### Example traces
#### Example traces

To find the traces, navigate to the Cloud Trace UI.

![Screenshot of the Cloud Trace UI after running this quickstart.](assets/quickstart.png)

For an overview of the Cloud Trace UI, see: [View traces overview].

### Publisher

The publisher application lets the user configure a tracing enabled Pub/Sub Publisher client to see how different configuration settings change the produced telemetry data.

#### Example traces

To find the traces, navigate to the Cloud Trace UI.

##### Publish trace

![Screenshot of the publish span in the Cloud Trace UI running publisher.](assets/publish_span.png)

##### Create trace

![Screenshot of the create span in the Cloud Trace UI running publisher.](assets/create_span.png)

## Prerequisites

### 1. Create a project in the Google Cloud Platform Console
Expand Down Expand Up @@ -90,10 +107,60 @@ cmake --build .build

### 4. Run the examples

#### Run the quickstart

```shell
.build/quickstart [project-name] [topic-id]
```

#### Run basic publisher examples
```shell
.build/publisher [project-name] [topic-id]
.build/publisher [project-name] [topic-id] -n 1000
.build/publisher [project-name] [topic-id] --message-size 0
.build/publisher [project-name] [topic-id] --tracing-rate 0.01 -n 10
```

#### Flow control example
```shell
.build/publisher [project-name] [topic-id] -n 5 --max-pending-messages 2 --publisher-action reject
.build/publisher [project-name] [topic-id] -n 5 --max-pending-messages 2 --publisher-action block
.build/publisher [project-name] [topic-id] -n 5 --max-pending-messages 2 --publisher-action ignore
.build/publisher [project-name] [topic-id] -n 5 --message-size 10 --max-batch-bytes 60 --publisher-action block
```

#### Batching example
```shell
.build/publisher [project-name] [topic-id] -n 5 --max-batch-messages 2 --max-hold-time 100
.build/publisher [project-name] [topic-id] -n 5 --message-size 10 --max-batch-bytes 60 --max-hold-time 1000
```

#### To see all options

```shell
❯ .build/publisher --help
alevenberg marked this conversation as resolved.
Show resolved Hide resolved
Usage: .build/publisher <project-id> <topic-id>
A simple publisher application with Open Telemetery enabled:
-h [ --help ] produce help message
--project-id arg the name of the Google Cloud project
--topic-id arg the name of the Google Cloud topic
--tracing-rate arg (=1) otel::BasicTracingRateOption value
--max-queue-size arg (=0) If set to 0, uses the default tracing
configuration.
-n [ --message-count ] arg (=1) the number of messages to publish
--message-size arg (=1) the desired message payload size
--enable-ordering-keys arg (=0) If set to true, the messages will be sent
with ordering keys. There will be 3 possible
ordering keys and they will be set randomly
--max-pending-messages arg pubsub::MaxPendingMessagesOption value
--max-pending-bytes arg pubsub::MaxPendingBytesOption value
--publisher-action arg pubsub::FullPublisherAction value
(block|ignore|reject)
--max-hold-time arg pubsub::MaxHoldTimeOption value in us
--max-batch-bytes arg pubsub::MaxBatchBytesOption value
--max-batch-messages arg pubsub::MaxBatchMessagesOption value
```

## Build and run using Bazel

### 1. Download or clone this repo
Expand All @@ -111,8 +178,17 @@ bazel build //:quickstart

### 3. Run these examples

#### Run the quickstart
```shell
bazel run //:quickstart [project-name] [topic-id]
```

#### Run basic publisher examples
```shell
bazel run //:quickstart [project-name] [topic-id]
bazel run //:publisher [project-name] [topic-id]
bazel run //:publisher -- [project-name] [topic-id] -n 1000
bazel run //:publisher -- [project-name] [topic-id] --message_size 0
bazel run //:publisher -- [project-name] [topic-id] --tracing-rate 0.01 -n 10
```

#### Run with a local version of google-cloud-cpp
Expand Down
11 changes: 11 additions & 0 deletions pubsub-open-telemetry/WORKSPACE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,14 @@ load("@com_github_grpc_grpc//bazel:grpc_deps.bzl", "grpc_deps")
grpc_deps()
load("@com_github_grpc_grpc//bazel:grpc_extra_deps.bzl", "grpc_extra_deps")
grpc_extra_deps()

# Boost
http_archive(
name = "com_github_nelhage_rules_boost",
url = "https://github.com/nelhage/rules_boost/archive/57be00c2e5f30bc8698be2453b3f278909e8eb2f.tar.gz",
strip_prefix = "rules_boost-57be00c2e5f30bc8698be2453b3f278909e8eb2f",
sha256 = "3faec27d6ad2d176ee7674298e549263fceb62198ec55a4821a402c20a9a70b8"
)

load("@com_github_nelhage_rules_boost//:boost/boost.bzl", "boost_deps")
boost_deps()
alevenberg marked this conversation as resolved.
Show resolved Hide resolved
Binary file added pubsub-open-telemetry/assets/create_span.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added pubsub-open-telemetry/assets/publish_span.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
168 changes: 168 additions & 0 deletions pubsub-open-telemetry/parse_args.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
// Copyright 2023 Google Inc.
alevenberg marked this conversation as resolved.
Show resolved Hide resolved
alevenberg marked this conversation as resolved.
Show resolved Hide resolved
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "parse_args.h"
#include "google/cloud/opentelemetry/configure_basic_tracing.h"
#include "google/cloud/opentelemetry_options.h"
#include "google/cloud/pubsub/options.h"
#include <boost/program_options.hpp>
#include <algorithm>
#include <cctype>
#include <stdexcept>
#include <string>

namespace po = ::boost::program_options;

ParseResult ParseArguments(int argc, char* argv[]) {
po::positional_options_description positional;
positional.add("project-id", 1);
positional.add("topic-id", 2);
po::options_description desc(
"A simple publisher application with Open Telemetery enabled");
// The following empty line comments are for readability.
desc.add_options()
//
("help,h", "produce help message")
//
("project-id", po::value<std::string>()->required(),
"the name of the Google Cloud project")
//
("topic-id", po::value<std::string>()->required(),
"the name of the Google Cloud topic")
// Tracing options
("tracing-rate", po::value<double>()->default_value(1.0),
"otel::BasicTracingRateOption value")
// Processor options
("max-queue-size", po::value<int>()->default_value(0),
"If set to 0, uses the default tracing configuration.")
// Message options
("message-count,n", po::value<int>()->default_value(1),
"the number of messages to publish")
//
("message-size", po::value<int>()->default_value(1),
"the desired message payload size")
// Flow control options
("max-pending-messages", po::value<std::size_t>(),
"pubsub::MaxPendingMessagesOption value")
//
("max-pending-bytes", po::value<std::size_t>(),
"pubsub::MaxPendingBytesOption value")
//
("publisher-action", po::value<std::string>(),
"pubsub::FullPublisherAction value "
"(block|ignore|reject)")
// Batching options
("max-hold-time", po::value<int>(),
"pubsub::MaxHoldTimeOption value in us")
//
("max-batch-bytes", po::value<std::size_t>(),
"pubsub::MaxBatchBytesOption value")
//
("max-batch-messages", po::value<std::size_t>(),
"pubsub::MaxBatchMessagesOption value");

po::variables_map vm;
po::store(po::command_line_parser(argc, argv)
.options(desc)
.positional(positional)
.run(),
vm);

ParseResult result;
if (vm.count("help") || argc == 1) {
std::cerr << "Usage: " << argv[0] << " <project-id> <topic-id>\n";
std::cerr << desc;
return result;
}

// This must come before po::notify which raises any errors when parsing the
// arguments. This ensures if --help is passed, the program does not raise any
// issues about missing required arguments.
po::notify(vm);
// Get arguments that are required or optional and have defaults set
auto const project_id = vm["project-id"].as<std::string>();
auto const topic_id = vm["topic-id"].as<std::string>();
auto const tracing_rate = vm["tracing-rate"].as<double>();
auto const message_count = vm["message-count"].as<int>();
auto const message_size = vm["message-size"].as<int>();
auto const max_queue_size = vm["max-queue-size"].as<int>();

// Validate the command-line options.
if (project_id.empty()) {
throw std::runtime_error("The project-id cannot be empty");
}
if (topic_id.empty()) {
throw std::runtime_error("The topic-id cannot be empty");
}
if (tracing_rate == 0) {
throw std::runtime_error(
"Setting the tracing rate to 0 will produce zero traces.");
}
if (message_count == 0) {
throw std::runtime_error(
"Setting the message count to 0 will produce zero traces.");
}
result.project_id = project_id;
result.topic_id = topic_id;
result.message_count = message_count;
result.message_size = message_size;
result.max_queue_size = max_queue_size;
result.otel_options =
gc::Options{}.set<gc::otel::BasicTracingRateOption>(tracing_rate);
result.publisher_options =
gc::Options{}.set<gc::OpenTelemetryTracingOption>(true);
if (vm.count("max-pending-messages")) {
auto const max_pending_messages =
vm["max-pending-messages"].as<std::size_t>();
result.publisher_options.set<gc::pubsub::MaxPendingMessagesOption>(
max_pending_messages);
}
if (vm.count("max-pending-bytes")) {
auto const max_pending_bytes = vm["max-pending-bytes"].as<std::size_t>();
result.publisher_options.set<gc::pubsub::MaxPendingBytesOption>(
max_pending_bytes);
}
if (vm.count("publisher-action")) {
auto const publisher_action = vm["publisher-action"].as<std::string>();
gc::pubsub::FullPublisherAction action;
if (publisher_action == "reject") {
action = gc::pubsub::FullPublisherAction::kRejects;
} else if (publisher_action == "block") {
action = gc::pubsub::FullPublisherAction::kBlocks;
} else if (publisher_action == "ignore") {
action = gc::pubsub::FullPublisherAction::kIgnored;
} else {
throw std::runtime_error(
"publisher-action is invalid. it must be one of the three values: "
"block|ignore|reject");
}
result.publisher_options.set<gc::pubsub::FullPublisherActionOption>(action);
}
if (vm.count("max-hold-time")) {
auto const max_hold_time = vm["max-hold-time"].as<int>();
result.publisher_options.set<gc::pubsub::MaxHoldTimeOption>(
std::chrono::microseconds(max_hold_time));
}
if (vm.count("max-batch-bytes")) {
auto const max_batch_bytes = vm["max-batch-bytes"].as<std::size_t>();
result.publisher_options.set<gc::pubsub::MaxBatchBytesOption>(
max_batch_bytes);
}
if (vm.count("max-batch-messages")) {
auto const max_batch_messages = vm["max-batch-messages"].as<std::size_t>();
result.publisher_options.set<gc::pubsub::MaxBatchMessagesOption>(
max_batch_messages);
}
return result;
}
Loading
Loading