Skip to content

Commit

Permalink
Support trpc protocol (aeraki-mesh#123)
Browse files Browse the repository at this point in the history
* support trpc protocol

Signed-off-by: Huabing Zhao <[email protected]>

remove files

Signed-off-by: huabing zhao <[email protected]>

* fix checkout

Signed-off-by: huabing zhao <[email protected]>

---------

Signed-off-by: huabing zhao <[email protected]>
Signed-off-by: CTakanashiRikka <[email protected]>
  • Loading branch information
zhaohuabing authored and Chenrujie-85 committed Sep 20, 2024
1 parent 1810b9d commit c755d1a
Show file tree
Hide file tree
Showing 18 changed files with 1,777 additions and 7 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/buildci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:
run: |
sudo rm -rf /home/ubuntu/.cache/bazel
- name: Checkout
uses: actions/checkout@v2
uses: actions/checkout@v4
- name: install dependency
run: |
./bazel/setup_clang.sh /home/ubuntu/clang+llvm-10.0.0-linux-gnu
Expand Down
1 change: 1 addition & 0 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ envoy_cc_binary(
"//src/application_protocols/dubbo:config",
"//src/application_protocols/thrift:config",
"//src/application_protocols/brpc:config",
"//src/application_protocols/trpc:config",
"@io_istio_proxy//extensions/access_log_policy:access_log_policy_lib",
"@io_istio_proxy//extensions/metadata_exchange:metadata_exchange_lib",
"@io_istio_proxy//extensions/stackdriver:stackdriver_plugin",
Expand Down
12 changes: 6 additions & 6 deletions clang.bazelrc
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
# Generated file, do not edit. If you want to disable clang, just delete this file.
build:clang --action_env='PATH=/root/clang+llvm-10.0.0-linux-gnu/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/root/clang+llvm-10.0.0-linux-gnu/bin'
build:clang --action_env='PATH=/home/ubuntu/clang+llvm-10.0.0-linux-gnu/bin:/home/linuxbrew/.linuxbrew/bin:/home/linuxbrew/.linuxbrew/sbin:/home/ubuntu/clang+llvm-14/bin:/usr/local/go/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/usr/local/go/bin:/home/ubuntu/go/bin/'
build:clang --action_env=CC=clang
build:clang --action_env=CXX=clang++
build:clang --action_env='LLVM_CONFIG=/root/clang+llvm-10.0.0-linux-gnu//bin/llvm-config'
build:clang --repo_env='LLVM_CONFIG=/root/clang+llvm-10.0.0-linux-gnu//bin/llvm-config'
build:clang --linkopt='-L/root/clang+llvm-10.0.0-linux-gnu/lib'
build:clang --linkopt='-Wl,-rpath,/root/clang+llvm-10.0.0-linux-gnu/lib'
build:clang --action_env='LLVM_CONFIG=/home/ubuntu/clang+llvm-10.0.0-linux-gnu/bin/llvm-config'
build:clang --repo_env='LLVM_CONFIG=/home/ubuntu/clang+llvm-10.0.0-linux-gnu/bin/llvm-config'
build:clang --linkopt='-L/home/ubuntu/clang+llvm-10.0.0-linux-gnu/lib'
build:clang --linkopt='-Wl,-rpath,/home/ubuntu/clang+llvm-10.0.0-linux-gnu/lib'

build:clang-asan --action_env=ENVOY_UBSAN_VPTR=1
build:clang-asan --copt=-fsanitize=vptr,function
build:clang-asan --linkopt=-fsanitize=vptr,function
build:clang-asan --linkopt='-L/root/clang+llvm-10.0.0-linux-gnu/lib/clang/10.0.0/lib/x86_64-unknown-linux-gnu'
build:clang-asan --linkopt='-L/home/ubuntu/clang+llvm-10.0.0-linux-gnu/lib/clang/10.0.0/lib/x86_64-unknown-linux-gnu'
build:clang-asan --linkopt=-l:libclang_rt.ubsan_standalone.a
build:clang-asan --linkopt=-l:libclang_rt.ubsan_standalone_cxx.a

86 changes: 86 additions & 0 deletions src/application_protocols/trpc/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
load(
"@envoy//bazel:envoy_build_system.bzl",
"envoy_cc_library",
)

# compile proto
load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package")

api_proto_package(
deps = ["@com_github_cncf_udpa//udpa/annotations:pkg"],
)

envoy_cc_library(
name = "config",
visibility = ["//:__pkg__"],
repository = "@envoy",
srcs = ["config.cc"],
hdrs = ["config.h"],
deps = [
":pkg_cc_proto",
":codec_lib",
"@envoy//envoy/registry",
"//src/meta_protocol_proxy/codec:factory_lib",
],
)

envoy_cc_library(
name = "codec_lib",
repository = "@envoy",
srcs = ["trpc_codec.cc"],
hdrs = ["trpc_codec.h"],
deps = [
"@envoy//envoy/buffer:buffer_interface",
"@envoy//source/common/common:logger_lib",
"@envoy//source/common/buffer:buffer_lib",
"//src/meta_protocol_proxy/codec:codec_interface",
":codec_checker",
":protocol",
],
)

envoy_cc_library(
name = "codec_checker",
srcs = [
"codec_checker.cc",
],
hdrs = [
"codec_checker.h",
],
repository = "@envoy",
deps = [
":pkg_cc_proto",
":protocol",
"@envoy//envoy/buffer:buffer_interface",
"@envoy//envoy/server:filter_config_interface",
"@envoy//source/common/buffer:buffer_lib",
"@envoy//source/common/common:assert_lib",
"@envoy//source/common/common:minimal_logger_lib",
],
)

envoy_cc_library(
name = "metadata",
srcs = ["metadata.cc"],
hdrs = ["metadata.h"],
repository = "@envoy",
deps = [
":pkg_cc_proto",
"@envoy//source/common/http:header_map_lib",
],
)

envoy_cc_library(
name = "protocol",
srcs = ["protocol.cc"],
hdrs = ["protocol.h"],
repository = "@envoy",
deps = [
":metadata",
":pkg_cc_proto",
"@envoy//source/common/buffer:buffer_lib",
"@envoy//source/common/common:minimal_logger_lib",
"//src/meta_protocol_proxy/codec:codec_interface",
],
)

135 changes: 135 additions & 0 deletions src/application_protocols/trpc/codec_checker.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
#include "src/application_protocols/trpc/codec_checker.h"

#include <cstdlib>
#include <string>
#include <utility>

#include "source/common/common/assert.h"

#include "src/application_protocols/trpc/protocol.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace MetaProtocolProxy {
namespace Trpc {

CodecChecker::DecodeStage CodecChecker::onData(Buffer::Instance& buffer) {
ENVOY_LOG(debug, "decoder onData: {}", buffer.length());

while (decode_stage_ != DecodeStage::kDecodeDone) {
auto state = handleState(buffer);
if (state == DecodeStage::kWaitForData) {
return DecodeStage::kWaitForData;
}
decode_stage_ = state;
}

ASSERT(decode_stage_ == DecodeStage::kDecodeDone);

reset();
ENVOY_LOG(debug, "trpc decoder: data length {}", buffer.length());
return DecodeStage::kDecodeDone;
}

CodecChecker::DecodeStage CodecChecker::handleState(Buffer::Instance& buffer) {
switch (decode_stage_) {
case DecodeStage::kDecodeFixedHeader:
return decodeFixedHeader(buffer);
case DecodeStage::kDecodeUnaryProtocolHeader:
return decodeUnaryProtocolHeader(buffer);
case DecodeStage::KDecodeStreamFrame:
return decodeStreamFrame(buffer);
case DecodeStage::kDecodePayload:
return decodePayload(buffer);
default:
PANIC("not reached");
}
return DecodeStage::kDecodeDone;
}

CodecChecker::DecodeStage CodecChecker::decodeFixedHeader(Buffer::Instance& buffer) {
ENVOY_LOG(debug, "decoder FixedHeader: {}", buffer.length());
if (buffer.length() < TrpcFixedHeader::TRPC_PROTO_PREFIX_SPACE) {
ENVOY_LOG(debug, "continue {}", buffer.length());
return DecodeStage::kWaitForData;
}

std::unique_ptr<TrpcFixedHeader> fixed_header = std::make_unique<TrpcFixedHeader>();
if (!fixed_header->decode(buffer, false)) {
throw EnvoyException(fmt::format("protocol invalid"));
}

total_size_ = fixed_header->data_frame_size;
protocol_header_size_ = fixed_header->pb_header_size;

auto frame_type = fixed_header->stream_frame_type;
call_backs_.onFixedHeaderDecoded(std::move(fixed_header));
if (frame_type == trpc::TrpcStreamFrameType::TRPC_UNARY) {
return DecodeStage::kDecodeUnaryProtocolHeader;
}

return DecodeStage::KDecodeStreamFrame;
}

CodecChecker::DecodeStage CodecChecker::decodeUnaryProtocolHeader(Buffer::Instance& buffer) {
ENVOY_LOG(debug, "decoder ProtocolHeader: {}", buffer.length());

// 数据不全,继续收包
if (buffer.length() < TrpcFixedHeader::TRPC_PROTO_PREFIX_SPACE + protocol_header_size_) {
ENVOY_LOG(debug, "continue {}", buffer.length());
return DecodeStage::kWaitForData;
}
std::string header_raw;
header_raw.reserve(protocol_header_size_);
header_raw.resize(protocol_header_size_);
buffer.copyOut(TrpcFixedHeader::TRPC_PROTO_PREFIX_SPACE, protocol_header_size_, &(header_raw[0]));

if (!call_backs_.onUnaryHeader(std::move(header_raw))) {
throw EnvoyException("parse header failed");
}

return DecodeStage::kDecodePayload;
}

CodecChecker::DecodeStage CodecChecker::decodeStreamFrame(Buffer::Instance& buffer) {
ENVOY_LOG(debug, "decoder stream frame {} ? {}", total_size_, buffer.length());

if (buffer.length() < total_size_) {
ENVOY_LOG(debug, "continue {}", buffer.length());
return DecodeStage::kWaitForData;
}

std::string header_raw;
auto frame_size = total_size_ - TrpcFixedHeader::TRPC_PROTO_PREFIX_SPACE;
header_raw.reserve(frame_size);
header_raw.resize(frame_size);
buffer.copyOut(TrpcFixedHeader::TRPC_PROTO_PREFIX_SPACE, frame_size, &(header_raw[0]));

if (!call_backs_.onStreamFrame(std::move(header_raw))) {
throw EnvoyException("parse header failed");
}

return DecodeStage::kDecodePayload;
}

CodecChecker::DecodeStage CodecChecker::decodePayload(Buffer::Instance& buffer) {
ENVOY_LOG(debug, "decoder payload {} ? {}", total_size_, buffer.length());

if (buffer.length() < total_size_) {
ENVOY_LOG(debug, "continue {}", buffer.length());
return DecodeStage::kWaitForData;
}
std::unique_ptr<Buffer::OwnedImpl> msg = std::make_unique<Buffer::OwnedImpl>();
msg->move(buffer, static_cast<uint64_t>(total_size_));

call_backs_.onCompleted(std::move(msg));

return DecodeStage::kDecodeDone;
}

} // namespace Trpc
} // namespace MetaProtocolProxy
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
Loading

0 comments on commit c755d1a

Please sign in to comment.