Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

impl(pubsub-open-telemetry): add a publisher executable #268

Merged
merged 14 commits into from
Dec 1, 2023
21 changes: 21 additions & 0 deletions pubsub-open-telemetry/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,27 @@ package(default_visibility = ["//visibility:private"])

licenses(["notice"]) # Apache 2.0

cc_library(
name = "parse_args",
hdrs = ["parse_args.h"],
srcs = ["parse_args.cc"],
deps = [
"@boost//:program_options",
"@google_cloud_cpp//:pubsub",
"@google_cloud_cpp//:opentelemetry",
],
)

cc_binary(
name = "publisher",
srcs = ["publisher.cc"],
deps = [
"@google_cloud_cpp//:opentelemetry",
"@google_cloud_cpp//:pubsub",
":parse_args",
],
)

cc_binary(
name = "quickstart",
srcs = ["quickstart.cc"],
Expand Down
13 changes: 13 additions & 0 deletions pubsub-open-telemetry/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,19 @@ project(pubsub-open-telemetry CXX)

find_package(google_cloud_cpp_pubsub CONFIG REQUIRED)
find_package(google_cloud_cpp_opentelemetry CONFIG REQUIRED)
find_package(Boost 1.66 REQUIRED COMPONENTS program_options)

add_library(parse_args STATIC parse_args.cc parse_args.h)
target_compile_features(parse_args PUBLIC cxx_std_14)
target_link_libraries(
parse_args PUBLIC Boost::program_options google-cloud-cpp::pubsub
google-cloud-cpp::opentelemetry)

add_executable(publisher publisher.cc)
target_compile_features(publisher PRIVATE cxx_std_14)
target_link_libraries(
publisher PRIVATE google-cloud-cpp::pubsub google-cloud-cpp::opentelemetry
parse_args)

add_executable(quickstart quickstart.cc)
target_compile_features(quickstart PRIVATE cxx_std_14)
Expand Down
82 changes: 79 additions & 3 deletions pubsub-open-telemetry/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,33 @@ In v2.19 release[^1], we added instrumentation for the Google Cloud Pub/Sub C++

## Overview

The quickstart installs a Cloud Trace exporter. The application creates a Pub/Sub client with tracing enabled that publishes 5 messages and sends the collected traces to Cloud Trace.
### Quickstart
The quickstart creates a tracing enabled Pub/Sub Publisher client that publishes 5 messages and sends the collected traces to Cloud Trace.

### Example traces
#### Example traces

To find the traces, navigate to the Cloud Trace UI.

![Screenshot of the Cloud Trace UI after running this quickstart.](assets/quickstart.png)

For an overview of the Cloud Trace UI, see: [View traces overview].

### Publisher

The publisher application lets the user configure a tracing enabled Pub/Sub Publisher client to see how different configuration settings change the produced telemetry data.

#### Example traces

To find the traces, navigate to the Cloud Trace UI.

##### Publish trace

![Screenshot of the publish span in the Cloud Trace UI running publisher.](assets/publish_span.png)

##### Create trace

![Screenshot of the create span in the Cloud Trace UI running publisher.](assets/create_span.png)

## Prerequisites

### 1. Create a project in the Google Cloud Platform Console
Expand Down Expand Up @@ -90,10 +107,60 @@ cmake --build .build

### 4. Run the examples

#### Run the quickstart

```shell
.build/quickstart [project-name] [topic-id]
```

#### Run basic publisher examples
```shell
.build/publisher [project-name] [topic-id]
.build/publisher [project-name] [topic-id] -n 1000
.build/publisher [project-name] [topic-id] --message-size 0
.build/publisher [project-name] [topic-id] --tracing-rate 0.01 -n 10
```

#### Flow control example
```shell
.build/publisher [project-name] [topic-id] -n 5 --max-pending-messages 2 --publisher-action reject
.build/publisher [project-name] [topic-id] -n 5 --max-pending-messages 2 --publisher-action block
.build/publisher [project-name] [topic-id] -n 5 --max-pending-messages 2 --publisher-action ignore
.build/publisher [project-name] [topic-id] -n 5 --message-size 10 --max-batch-bytes 60 --publisher-action block
```

#### Batching example
```shell
.build/publisher [project-name] [topic-id] -n 5 --max-batch-messages 2 --max-hold-time 100
.build/publisher [project-name] [topic-id] -n 5 --message-size 10 --max-batch-bytes 60 --max-hold-time 1000
```

#### To see all options

```shell
❯ .build/publisher --help
alevenberg marked this conversation as resolved.
Show resolved Hide resolved
Usage: .build/publisher <project-id> <topic-id>
A simple publisher application with Open Telemetery enabled:
-h [ --help ] produce help message
--project-id arg the name of the Google Cloud project
--topic-id arg the name of the Google Cloud topic
--tracing-rate arg (=1) otel::BasicTracingRateOption value
--max-queue-size arg (=0) If set to 0, uses the default tracing
configuration.
-n [ --message-count ] arg (=1) the number of messages to publish
--message-size arg (=1) the desired message payload size
--enable-ordering-keys arg (=0) If set to true, the messages will be sent
with ordering keys. There will be 3 possible
ordering keys and they will be set randomly
--max-pending-messages arg pubsub::MaxPendingMessagesOption value
--max-pending-bytes arg pubsub::MaxPendingBytesOption value
--publisher-action arg pubsub::FullPublisherAction value
(block|ignore|reject)
--max-hold-time arg pubsub::MaxHoldTimeOption value in us
--max-batch-bytes arg pubsub::MaxBatchBytesOption value
--max-batch-messages arg pubsub::MaxBatchMessagesOption value
```

## Build and run using Bazel

### 1. Download or clone this repo
Expand All @@ -111,8 +178,17 @@ bazel build //:quickstart

### 3. Run these examples

#### Run the quickstart
```shell
bazel run //:quickstart [project-name] [topic-id]
```

#### Run basic publisher examples
```shell
bazel run //:quickstart [project-name] [topic-id]
bazel run //:publisher [project-name] [topic-id]
bazel run //:publisher -- [project-name] [topic-id] -n 1000
bazel run //:publisher -- [project-name] [topic-id] --message_size 0
bazel run //:publisher -- [project-name] [topic-id] --tracing-rate 0.01 -n 10
```

#### Run with a local version of google-cloud-cpp
Expand Down
11 changes: 11 additions & 0 deletions pubsub-open-telemetry/WORKSPACE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,14 @@ load("@com_github_grpc_grpc//bazel:grpc_deps.bzl", "grpc_deps")
grpc_deps()
load("@com_github_grpc_grpc//bazel:grpc_extra_deps.bzl", "grpc_extra_deps")
grpc_extra_deps()

# Boost
http_archive(
name = "com_github_nelhage_rules_boost",
url = "https://github.com/nelhage/rules_boost/archive/57be00c2e5f30bc8698be2453b3f278909e8eb2f.tar.gz",
strip_prefix = "rules_boost-57be00c2e5f30bc8698be2453b3f278909e8eb2f",
sha256 = "3faec27d6ad2d176ee7674298e549263fceb62198ec55a4821a402c20a9a70b8"
)

load("@com_github_nelhage_rules_boost//:boost/boost.bzl", "boost_deps")
boost_deps()
alevenberg marked this conversation as resolved.
Show resolved Hide resolved
Binary file added pubsub-open-telemetry/assets/create_span.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added pubsub-open-telemetry/assets/publish_span.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
169 changes: 169 additions & 0 deletions pubsub-open-telemetry/parse_args.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// Copyright 2023 Google Inc.
alevenberg marked this conversation as resolved.
Show resolved Hide resolved
alevenberg marked this conversation as resolved.
Show resolved Hide resolved
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "parse_args.h"
#include "google/cloud/opentelemetry/configure_basic_tracing.h"
#include "google/cloud/opentelemetry_options.h"
#include "google/cloud/pubsub/options.h"
#include <boost/program_options.hpp>
#include <algorithm>
#include <cctype>
#include <stdexcept>
#include <string>

namespace po = ::boost::program_options;

ParseResult ParseArguments(int argc, char* argv[]) {
po::positional_options_description positional;
positional.add("project-id", 1);
positional.add("topic-id", 2);
po::options_description desc(
"A simple publisher application with Open Telemetery enabled");
// The following empty line comments are for readability.
desc.add_options()
//
("help,h", "produce help message")
//
("project-id", po::value<std::string>()->required(),
"the name of the Google Cloud project")
//
("topic-id", po::value<std::string>()->required(),
"the name of the Google Cloud topic")
// Tracing options
("tracing-rate", po::value<double>()->default_value(1.0),
"otel::BasicTracingRateOption value")
// Processor options
("max-queue-size", po::value<int>()->default_value(0),
"If set to 0, uses the default tracing configuration.")
// Message options
("message-count,n", po::value<int>()->default_value(1),
"the number of messages to publish")
//
("message-size", po::value<int>()->default_value(1),
"the desired message payload size")
// Flow control options
("max-pending-messages", po::value<std::size_t>(),
"pubsub::MaxPendingMessagesOption value")
//
("max-pending-bytes", po::value<std::size_t>(),
"pubsub::MaxPendingBytesOption value")
//
("publisher-action", po::value<std::string>(),
"pubsub::FullPublisherAction value "
"(block|ignore|reject)")
// Batching options
("max-hold-time", po::value<int>(),
"pubsub::MaxHoldTimeOption value in us")
//
("max-batch-bytes", po::value<std::size_t>(),
"pubsub::MaxBatchBytesOption value")
//
("max-batch-messages", po::value<std::size_t>(),
"pubsub::MaxBatchMessagesOption value");

po::variables_map vm;
po::store(po::command_line_parser(argc, argv)
.options(desc)
.positional(positional)
.run(),
vm);

ParseResult result;
if (vm.count("help") || argc == 1) {
std::cerr << "Usage: " << argv[0] << " <project-id> <topic-id>\n";
std::cerr << desc;
return result;
}

// This must come before po::notify which raises any errors when parsing the
// arguments. This ensures if --help is passed, the program does not raise any
// issues about missing required arguments.
po::notify(vm);
// Get arguments that are required or optional and have defaults set
auto const project_id = vm["project-id"].as<std::string>();
auto const topic_id = vm["topic-id"].as<std::string>();
auto const tracing_rate = vm["tracing-rate"].as<double>();
auto const message_count = vm["message-count"].as<int>();
auto const message_size = vm["message-size"].as<int>();
auto const max_queue_size = vm["max-queue-size"].as<int>();

// Validate the command-line options.
if (project_id.empty()) {
throw std::runtime_error("The project-id cannot be empty");
}
if (topic_id.empty()) {
throw std::runtime_error("The topic-id cannot be empty");
}
if (tracing_rate == 0) {
throw std::runtime_error(
"Setting the tracing rate to 0 will produce zero traces.");
}
if (message_count == 0) {
throw std::runtime_error(
"Setting the message count to 0 will produce zero traces.");
}
result.project_id = project_id;
result.topic_id = topic_id;
result.message_count = message_count;
result.message_size = message_size;
result.max_queue_size = max_queue_size;
result.otel_options =
gc::Options{}.set<gc::otel::BasicTracingRateOption>(tracing_rate);
result.publisher_options =
gc::Options{}.set<gc::OpenTelemetryTracingOption>(true);
std::cout << static_cast<bool>(result.publisher_options.get<gc::OpenTelemetryTracingOption>()) << "\n";
if (vm.count("max-pending-messages")) {
auto const max_pending_messages =
vm["max-pending-messages"].as<std::size_t>();
result.publisher_options.set<gc::pubsub::MaxPendingMessagesOption>(
max_pending_messages);
}
if (vm.count("max-pending-bytes")) {
auto const max_pending_bytes = vm["max-pending-bytes"].as<std::size_t>();
result.publisher_options.set<gc::pubsub::MaxPendingBytesOption>(
max_pending_bytes);
}
if (vm.count("publisher-action")) {
auto const publisher_action = vm["publisher-action"].as<std::string>();
gc::pubsub::FullPublisherAction action;
if (publisher_action == "reject") {
action = gc::pubsub::FullPublisherAction::kRejects;
} else if (publisher_action == "block") {
action = gc::pubsub::FullPublisherAction::kBlocks;
} else if (publisher_action == "ignore") {
action = gc::pubsub::FullPublisherAction::kIgnored;
} else {
throw std::runtime_error(
"publisher-action is invalid. it must be one of the three values: "
"block|ignore|reject");
}
result.publisher_options.set<gc::pubsub::FullPublisherActionOption>(action);
}
if (vm.count("max-hold-time")) {
auto const max_hold_time = vm["max-hold-time"].as<int>();
result.publisher_options.set<gc::pubsub::MaxHoldTimeOption>(
std::chrono::microseconds(max_hold_time));
}
if (vm.count("max-batch-bytes")) {
auto const max_batch_bytes = vm["max-batch-bytes"].as<std::size_t>();
result.publisher_options.set<gc::pubsub::MaxBatchBytesOption>(
max_batch_bytes);
}
if (vm.count("max-batch-messages")) {
auto const max_batch_messages = vm["max-batch-messages"].as<std::size_t>();
result.publisher_options.set<gc::pubsub::MaxBatchMessagesOption>(
max_batch_messages);
}
return result;
}
41 changes: 41 additions & 0 deletions pubsub-open-telemetry/parse_args.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2023 Google Inc.
alevenberg marked this conversation as resolved.
Show resolved Hide resolved
alevenberg marked this conversation as resolved.
Show resolved Hide resolved
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef CPP_SAMPLES_PUBSUB_OPEN_TELEMETRY_PARSE_ARGUMENTS_H
#define CPP_SAMPLES_PUBSUB_OPEN_TELEMETRY_PARSE_ARGUMENTS_H

#include "google/cloud/options.h"
#include <optional>
#include <string>

namespace gc = ::google::cloud;

// Parse the command line arguments.
struct ParseResult {
// Required.
std::string project_id;
std::string topic_id;

// Optional with defaults set.
int message_count;
int message_size;
int max_queue_size;

gc::Options otel_options;
gc::Options publisher_options;
};

ParseResult ParseArguments(int argc, char* argv[]);

#endif // CPP_SAMPLES_PUBSUB_OPEN_TELEMETRY_PARSE_ARGUMENTS_H
Loading
Loading