Skip to content

Commit

Permalink
grpc: Google gRPC C++ client implementation. (envoyproxy#2444)
Browse files Browse the repository at this point in the history
This PR introduces Grpc::GoogleAsyncClientImpl, a Google C++ client library implementation of the
Grpc::AsyncClient implementation. This is a MVP implementation, deliberately kept simple for review.
As noted in various TODO comments, there's a bunch of optimizations we should pursue once we are
convinced of the initial correctness aspect of this (reducing thread creation/teardown, avoid
copies).

Also made some small change to the onReceiveTrailingMetadata() semantics to match Envoy gRPC and
Google gRPC client semantics; we now always deliver trailing metadata prior to onRemoteClose().

Testing: Added ClientType::GoogleGrpc to existing gRPC client integration tests.
Risk Level: Low (no users of the Google gRPC implementation yet).

Signed-off-by: Harvey Tuch <[email protected]>
  • Loading branch information
htuch authored Jan 29, 2018
1 parent 899ed8a commit 3ed4e70
Show file tree
Hide file tree
Showing 18 changed files with 821 additions and 162 deletions.
2 changes: 2 additions & 0 deletions RAW_RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,7 @@ final version.
* Added `gateway-error` retry-on policy.
* Added support for building envoy with exported symbols
This change allows scripts loaded with the lua filter to load shared object libraries such as those installed via luarocks.
* The Google gRPC C++ library client is now supported as specified in the :ref:`gRPC services
overview <arch_overview_grpc_services>` and :ref:`GrpcService <envoy_api_msg_GrpcService>`.
* Added support for more granular weighted cluster routing by allowing the total weight to be specified in configuration.
* Added support for custom request/response headers with mixed static and dynamic values.
6 changes: 4 additions & 2 deletions include/envoy/grpc/async_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ class AsyncStreamCallbacks {
virtual void onCreateInitialMetadata(Http::HeaderMap& metadata) PURE;

/**
* Called when initial metadata is recevied.
* Called when initial metadata is received. This will be called with empty metadata on a
* trailers-only response, followed by onReceiveTrailingMetadata() with the trailing metadata.
* @param metadata initial metadata reference.
*/
virtual void onReceiveInitialMetadata(Http::HeaderMapPtr&& metadata) PURE;
Expand All @@ -141,7 +142,8 @@ class AsyncStreamCallbacks {
virtual void onReceiveMessageUntyped(ProtobufTypes::MessagePtr&& message) PURE;

/**
* Called when trailing metadata is recevied.
* Called when trailing metadata is recevied. This will also be called on non-Ok grpc-status
* stream termination.
* @param metadata trailing metadata reference.
*/
virtual void onReceiveTrailingMetadata(Http::HeaderMapPtr&& metadata) PURE;
Expand Down
3 changes: 3 additions & 0 deletions include/envoy/grpc/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ class Status {
// The RPC does not have required credentials for the RPC to succeed.
Unauthenticated = 16,

// Maximum value of valid status codes.
MaximumValid = Unauthenticated,

// This is a non-GRPC error code, indicating the status code in gRPC headers
// was invalid.
InvalidCode = -1,
Expand Down
3 changes: 2 additions & 1 deletion source/common/common/logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ namespace Logger {
FUNCTION(runtime) \
FUNCTION(testing) \
FUNCTION(tracing) \
FUNCTION(upstream)
FUNCTION(upstream) \
FUNCTION(grpc)

enum class Id {
ALL_LOGGER_IDS(GENERATE_ENUM)
Expand Down
18 changes: 17 additions & 1 deletion source/common/grpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ load(
"//bazel:envoy_build_system.bzl",
"envoy_cc_library",
"envoy_package",
"envoy_select_google_grpc",
)

envoy_package()
Expand Down Expand Up @@ -31,7 +32,7 @@ envoy_cc_library(
"//include/envoy/singleton:manager_interface",
"//include/envoy/thread_local:thread_local_interface",
"//include/envoy/upstream:cluster_manager_interface",
],
] + envoy_select_google_grpc([":google_async_client_lib"]),
)

envoy_cc_library(
Expand All @@ -41,6 +42,7 @@ envoy_cc_library(
deps = [
"//include/envoy/buffer:buffer_interface",
"//source/common/buffer:buffer_lib",
"//source/common/common:logger_lib",
],
)

Expand Down Expand Up @@ -70,6 +72,20 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "google_async_client_lib",
srcs = ["google_async_client_impl.cc"],
hdrs = ["google_async_client_impl.h"],
external_deps = ["grpc"],
deps = [
"//include/envoy/grpc:async_client_interface",
"//source/common/common:empty_string",
"//source/common/common:linked_object",
"//source/common/common:thread_lib",
"//source/common/tracing:http_tracer_lib",
],
)

envoy_cc_library(
name = "http1_bridge_filter_lib",
srcs = ["http1_bridge_filter.cc"],
Expand Down
41 changes: 21 additions & 20 deletions source/common/grpc/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ AsyncRequest* AsyncClientImpl::send(const Protobuf::MethodDescriptor& service_me
AsyncStream* AsyncClientImpl::start(const Protobuf::MethodDescriptor& service_method,
AsyncStreamCallbacks& callbacks) {
const Optional<std::chrono::milliseconds> no_timeout;
std::unique_ptr<AsyncStreamImpl> grpc_stream{
new AsyncStreamImpl(*this, service_method, callbacks, no_timeout)};
auto grpc_stream =
std::make_unique<AsyncStreamImpl>(*this, service_method, callbacks, no_timeout);

grpc_stream->initialize(false);
if (grpc_stream->hasResetStream()) {
Expand Down Expand Up @@ -70,22 +70,26 @@ void AsyncStreamImpl::initialize(bool buffer_body_for_retry) {
return;
}

// TODO(htuch): match Google gRPC base64 encoding behavior for *-bin headers, see
// https://github.com/envoyproxy/envoy/pull/2444#discussion_r163914459.
headers_message_ = Common::prepareHeaders(
parent_.remote_cluster_name_, service_method_.service()->full_name(), service_method_.name());
callbacks_.onCreateInitialMetadata(headers_message_->headers());
stream_->sendHeaders(headers_message_->headers(), false);
}

// TODO(htuch): match Google gRPC base64 encoding behavior for *-bin headers, see
// https://github.com/envoyproxy/envoy/pull/2444#discussion_r163914459.
void AsyncStreamImpl::onHeaders(Http::HeaderMapPtr&& headers, bool end_stream) {
const auto http_response_status = Http::Utility::getResponseStatus(*headers);
const auto grpc_status = Common::getGrpcStatus(*headers);
const std::string grpc_message = Common::getGrpcMessage(*headers);
callbacks_.onReceiveInitialMetadata(std::move(headers));
callbacks_.onReceiveInitialMetadata(end_stream ? std::make_unique<Http::HeaderMapImpl>()
: std::move(headers));
if (http_response_status != enumToInt(Http::Code::OK)) {
// https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md requires that
// grpc-status be used if available.
if (end_stream && grpc_status.valid()) {
trailerResponse(grpc_status, grpc_message);
onTrailers(std::move(headers));
return;
}
// Technically this should be
Expand All @@ -96,7 +100,7 @@ void AsyncStreamImpl::onHeaders(Http::HeaderMapPtr&& headers, bool end_stream) {
return;
}
if (end_stream) {
trailerResponse(grpc_status, grpc_message);
onTrailers(std::move(headers));
}
}

Expand All @@ -123,32 +127,29 @@ void AsyncStreamImpl::onData(Buffer::Instance& data, bool end_stream) {

if (end_stream) {
Http::HeaderMapPtr empty_trailers = std::make_unique<Http::HeaderMapImpl>();
callbacks_.onReceiveTrailingMetadata(std::move(empty_trailers));
streamError(Status::GrpcStatus::Unknown);
}
}

// TODO(htuch): match Google gRPC base64 encoding behavior for *-bin headers, see
// https://github.com/envoyproxy/envoy/pull/2444#discussion_r163914459.
void AsyncStreamImpl::onTrailers(Http::HeaderMapPtr&& trailers) {
const auto grpc_status = Common::getGrpcStatus(*trailers);
auto grpc_status = Common::getGrpcStatus(*trailers);
const std::string grpc_message = Common::getGrpcMessage(*trailers);
callbacks_.onReceiveTrailingMetadata(std::move(trailers));
trailerResponse(grpc_status, grpc_message);
}

void AsyncStreamImpl::trailerResponse(Optional<Status::GrpcStatus> grpc_status,
const std::string& grpc_message) {
if (!grpc_status.valid()) {
streamError(Status::GrpcStatus::Unknown);
return;
grpc_status.value(Status::GrpcStatus::Unknown);
}
if (grpc_status.value() != Status::GrpcStatus::Ok) {
streamError(grpc_status.value(), grpc_message);
return;
}
callbacks_.onRemoteClose(Status::GrpcStatus::Ok, EMPTY_STRING);
callbacks_.onRemoteClose(grpc_status.value(), grpc_message);
cleanup();
}

void AsyncStreamImpl::streamError(Status::GrpcStatus grpc_status, const std::string& message) {
callbacks_.onReceiveTrailingMetadata(std::make_unique<Http::HeaderMapImpl>());
callbacks_.onRemoteClose(grpc_status, message);
resetStream();
}

void AsyncStreamImpl::onReset() {
if (http_reset_) {
return;
Expand Down
6 changes: 1 addition & 5 deletions source/common/grpc/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,7 @@ class AsyncStreamImpl : public AsyncStream,
bool hasResetStream() const { return http_reset_; }

private:
void streamError(Status::GrpcStatus grpc_status, const std::string& message) {
callbacks_.onRemoteClose(grpc_status, message);
resetStream();
}

void streamError(Status::GrpcStatus grpc_status, const std::string& message);
void streamError(Status::GrpcStatus grpc_status) { streamError(grpc_status, EMPTY_STRING); }

void cleanup();
Expand Down
35 changes: 30 additions & 5 deletions source/common/grpc/async_client_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

#include "common/grpc/async_client_impl.h"

#ifdef ENVOY_GOOGLE_GRPC
#include "common/grpc/google_async_client_impl.h"
#endif

namespace Envoy {
namespace Grpc {

Expand All @@ -18,22 +22,43 @@ AsyncClientFactoryImpl::AsyncClientFactoryImpl(Upstream::ClusterManager& cm,
}
}

AsyncClientManagerImpl::AsyncClientManagerImpl(Upstream::ClusterManager& cm,
ThreadLocal::Instance& tls)
: cm_(cm), tls_(tls) {}

AsyncClientPtr AsyncClientFactoryImpl::create() {
return std::make_unique<AsyncClientImpl>(cm_, cluster_name_);
}

AsyncClientManagerImpl::AsyncClientManagerImpl(Upstream::ClusterManager& cm,
ThreadLocal::Instance& /*tls*/)
: cm_(cm) {}
GoogleAsyncClientFactoryImpl::GoogleAsyncClientFactoryImpl(
ThreadLocal::Instance& tls, Stats::Scope& scope,
const envoy::api::v2::GrpcService::GoogleGrpc& config)
: tls_(tls), scope_(scope.createScope(fmt::format("grpc.{}.", config.stat_prefix()))),
config_(config) {
#ifndef ENVOY_GOOGLE_GRPC
UNREFERENCED_PARAMETER(tls_);
UNREFERENCED_PARAMETER(scope_);
UNREFERENCED_PARAMETER(config_);
throw EnvoyException("Google C++ gRPC client is not linked");
#endif
}

AsyncClientPtr GoogleAsyncClientFactoryImpl::create() {
#ifdef ENVOY_GOOGLE_GRPC
return std::make_unique<GoogleAsyncClientImpl>(tls_.dispatcher(), *scope_, config_);
#else
return nullptr;
#endif
}

AsyncClientFactoryPtr
AsyncClientManagerImpl::factoryForGrpcService(const envoy::api::v2::GrpcService& grpc_service,
Stats::Scope& /*scope*/) {
Stats::Scope& scope) {
switch (grpc_service.target_specifier_case()) {
case envoy::api::v2::GrpcService::kEnvoyGrpc:
return std::make_unique<AsyncClientFactoryImpl>(cm_, grpc_service.envoy_grpc().cluster_name());
case envoy::api::v2::GrpcService::kGoogleGrpc:
throw EnvoyException("Google C++ gRPC client is not implemented yet");
return std::make_unique<GoogleAsyncClientFactoryImpl>(tls_, scope, grpc_service.google_grpc());
default:
NOT_REACHED;
}
Expand Down
14 changes: 14 additions & 0 deletions source/common/grpc/async_client_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,19 @@ class AsyncClientFactoryImpl : public AsyncClientFactory {
const std::string cluster_name_;
};

class GoogleAsyncClientFactoryImpl : public AsyncClientFactory {
public:
GoogleAsyncClientFactoryImpl(ThreadLocal::Instance& tls, Stats::Scope& scope,
const envoy::api::v2::GrpcService::GoogleGrpc& config);

AsyncClientPtr create() override;

private:
ThreadLocal::Instance& tls_;
Stats::ScopePtr scope_;
const envoy::api::v2::GrpcService::GoogleGrpc config_;
};

class AsyncClientManagerImpl : public AsyncClientManager {
public:
AsyncClientManagerImpl(Upstream::ClusterManager& cm, ThreadLocal::Instance& tls);
Expand All @@ -29,6 +42,7 @@ class AsyncClientManagerImpl : public AsyncClientManager {

private:
Upstream::ClusterManager& cm_;
ThreadLocal::Instance& tls_;
};

} // namespace Grpc
Expand Down
Loading

0 comments on commit 3ed4e70

Please sign in to comment.