diff --git a/docs/cn/combo_channel.md b/docs/cn/combo_channel.md index 76c0b23c98..e11c79b46b 100644 --- a/docs/cn/combo_channel.md +++ b/docs/cn/combo_channel.md @@ -19,7 +19,11 @@ ParallelChannel (有时被称为“pchan”)同时访问其包含的sub channel 示例代码见[example/parallel_echo_c++](https://github.com/apache/brpc/tree/master/example/parallel_echo_c++/)。 -任何brpc::ChannelBase的子类都可以加入ParallelChannel,包括ParallelChannel和其他组合Channel。用户可以设置ParallelChannelOptions.fail_limit来控制访问的最大失败次数,当失败的访问达到这个数目时,RPC会立刻结束而不等待超时。 +任何brpc::ChannelBase的子类都可以加入ParallelChannel,包括ParallelChannel和其他组合Channel。 + +用户可以设置ParallelChannelOptions.fail_limit来控制访问的最大失败次数,当失败的访问达到这个数目时,RPC会立刻结束而不等待超时。 + +用户可以设置ParallelChannelOptions.success_limit来控制访问的最大成功次数,当成功的访问达到这个数目时,RPC会立刻结束。ParallelChannelOptions.fail_limit的优先级高于ParallelChannelOptions.success_limit,只有未设置fail_limit时,success_limit才会生效。 一个sub channel可多次加入同一个ParallelChannel。当你需要对同一个服务发起多次异步访问并等待它们完成的话,这很有用。 diff --git a/docs/en/combo_channel.md b/docs/en/combo_channel.md index c87dc5ca7a..686fad59c1 100644 --- a/docs/en/combo_channel.md +++ b/docs/en/combo_channel.md @@ -19,7 +19,11 @@ We need a better abstraction. If several channels are combined into a larger one Check [example/parallel_echo_c++](https://github.com/apache/brpc/tree/master/example/parallel_echo_c++/) for an example. -Any subclasses of `brpc::ChannelBase` can be added into `ParallelChannel`, including `ParallelChannel` and other combo channels. Set `ParallelChannelOptions.fail_limit` to control maximum number of failures. When number of failed responses reaches the limit, the RPC is ended immediately rather than waiting for timeout. +Any subclasses of `brpc::ChannelBase` can be added into `ParallelChannel`, including `ParallelChannel` and other combo channels. + +Set `ParallelChannelOptions.fail_limit` to control maximum number of failures. When number of failed responses reaches the limit, the RPC is ended immediately rather than waiting for timeout. 
+ +Set `ParallelChannelOptions.success_limit` to control maximum number of successful responses. When number of successful responses reaches the limit, the RPC is ended immediately. `ParallelChannelOptions.fail_limit` has a higher priority than `ParallelChannelOptions.success_limit`. `success_limit` takes effect only when `fail_limit` is not set. A sub channel can be added to the same `ParallelChannel` more than once, which is useful when you need to initiate multiple asynchronous RPC to the same service and wait for their completions. diff --git a/src/brpc/parallel_channel.cpp b/src/brpc/parallel_channel.cpp index c697221135..130712bfb9 100644 --- a/src/brpc/parallel_channel.cpp +++ b/src/brpc/parallel_channel.cpp @@ -24,14 +24,8 @@ #include "brpc/details/controller_private_accessor.h" #include "brpc/parallel_channel.h" - namespace brpc { -ParallelChannelOptions::ParallelChannelOptions() - : timeout_ms(500) - , fail_limit(-1) { -} - DECLARE_bool(usercode_in_pthread); // Not see difference when memory is cached. @@ -45,12 +39,15 @@ static __thread Memory tls_cached_pchan_mem = { 0, NULL }; class ParallelChannelDone : public google::protobuf::Closure { private: - ParallelChannelDone(int fail_limit, int ndone, int nchan, int memsize, + ParallelChannelDone(int fail_limit, int success_limit, + int ndone, int nchan, int memsize, Controller* cntl, google::protobuf::Closure* user_done) : _fail_limit(fail_limit) + , _success_limit(success_limit) , _ndone(ndone) , _nchan(nchan) , _memsize(memsize) + , _current_success(0) , _current_fail(0) , _current_done(0) , _cntl(cntl) @@ -59,15 +56,13 @@ class ParallelChannelDone : public google::protobuf::Closure { , _callmethod_pthread(0) { } - ~ParallelChannelDone() { } - public: class SubDone : public google::protobuf::Closure { public: SubDone() : shared_data(NULL) { } - ~SubDone() { + ~SubDone() override { // Can't delete request/response in ~SubCall because the // object is copyable. 
if (ap.flags & DELETE_REQUEST) { @@ -78,7 +73,7 @@ class ParallelChannelDone : public google::protobuf::Closure { } } - void Run() { + void Run() override { shared_data->OnSubDoneRun(this); } @@ -89,7 +84,8 @@ class ParallelChannelDone : public google::protobuf::Closure { }; static ParallelChannelDone* Create( - int fail_limit, int ndone, const SubCall* aps, int nchan, + int fail_limit, int success_limit, + int ndone, const SubCall* aps, int nchan, Controller* cntl, google::protobuf::Closure* user_done) { // We need to create the object in this way because _sub_done is // dynamically allocated. @@ -130,8 +126,8 @@ class ParallelChannelDone : public google::protobuf::Closure { return NULL; } #endif - ParallelChannelDone* d = new (mem) ParallelChannelDone( - fail_limit, ndone, nchan, memsize, cntl, user_done); + auto d = new (mem) ParallelChannelDone( + fail_limit, success_limit, ndone, nchan, memsize, cntl, user_done); // Apply client settings of _cntl to controllers of sub calls, except // timeout. If we let sub channel do their timeout separately, when @@ -183,7 +179,7 @@ class ParallelChannelDone : public google::protobuf::Closure { } } - void Run() { + void Run() override { const int ec = _cntl->ErrorCode(); if (ec == EPCHANFINISH) { // all sub calls finished. Clear the error and we'll set @@ -220,14 +216,25 @@ class ParallelChannelDone : public google::protobuf::Closure { if (fin != NULL) { // [ called from SubDone::Run() ] - // Count failed sub calls, if fail_limit is reached, cancel others. - if (fin->cntl.FailedInline() && - _current_fail.fetch_add(1, butil::memory_order_relaxed) + 1 - == _fail_limit) { + int error_code = fin->cntl.ErrorCode(); + // EPCHANFINISH is not an error of sub calls. + bool fail = 0 != error_code && EPCHANFINISH != error_code; + bool cancel = + // Count failed sub calls, if `fail_limit' is reached, cancel others. 
+ (fail && _current_fail.fetch_add(1, butil::memory_order_relaxed) + 1 + == _fail_limit) || + // Count successful sub calls, if `success_limit' is reached, cancel others. + (0 == error_code && + _current_success.fetch_add(1, butil::memory_order_relaxed) + 1 + == _success_limit); + + if (cancel) { + // Only cancel once by `fail_limit' or `success_limit'. for (int i = 0; i < _ndone; ++i) { SubDone* sd = sub_done(i); if (fin != sd) { - bthread_id_error(sd->cntl.call_id(), ECANCELED); + bthread_id_error( + sd->cntl.call_id(), fail ? ECANCELED : EPCHANFINISH); } } } @@ -423,6 +430,7 @@ class ParallelChannelDone : public google::protobuf::Closure { private: int _fail_limit; + int _success_limit; int _ndone; int _nchan; #if defined(__clang__) @@ -430,6 +438,7 @@ class ParallelChannelDone : public google::protobuf::Closure { #else int _memsize; #endif + butil::atomic _current_success; butil::atomic _current_fail; butil::atomic _current_done; Controller* _cntl; @@ -602,6 +611,7 @@ void ParallelChannel::CallMethod( ParallelChannelDone* d = NULL; int ndone = nchan; int fail_limit = 1; + int success_limit = 1; DEFINE_SMALL_ARRAY(SubCall, aps, nchan, 64); if (cntl->FailedInline()) { @@ -655,9 +665,21 @@ void ParallelChannel::CallMethod( fail_limit = ndone; } } - - d = ParallelChannelDone::Create(fail_limit, ndone, aps, nchan, - cntl, done); + + // `success_limit' is only valid when `fail_limit' is not set. 
+ if (_options.fail_limit >= 0 || _options.success_limit < 0) { + success_limit = ndone; + } else { + success_limit = _options.success_limit; + if (success_limit < 1) { + success_limit = 1; + } else if (success_limit > ndone) { + success_limit = ndone; + } + } + + d = ParallelChannelDone::Create( + fail_limit, success_limit, ndone, aps, nchan, cntl, done); if (NULL == d) { cntl->SetFailed(ENOMEM, "Fail to new ParallelChannelDone"); goto FAIL; diff --git a/src/brpc/parallel_channel.h b/src/brpc/parallel_channel.h index df85c9acce..84e5f342cb 100644 --- a/src/brpc/parallel_channel.h +++ b/src/brpc/parallel_channel.h @@ -112,7 +112,7 @@ class CallMapper : public SharedObject { } // Only callable by subclasses and butil::intrusive_ptr - virtual ~CallMapper() {} + ~CallMapper() override = default; }; // Clone req_base typed `Req'. @@ -140,12 +140,11 @@ class ResponseMerger : public SharedObject { FAIL_ALL }; - ResponseMerger() { } virtual Result Merge(google::protobuf::Message* response, const google::protobuf::Message* sub_response) = 0; protected: // Only callable by subclasses and butil::intrusive_ptr - virtual ~ResponseMerger() { } + ~ResponseMerger() override = default; }; struct ParallelChannelOptions { @@ -156,7 +155,7 @@ struct ParallelChannelOptions { // Overridable by Controller.set_timeout_ms(). // Default: 500 (milliseconds) // Maximum: 0x7fffffff (roughly 30 days) - int32_t timeout_ms; + int32_t timeout_ms{500}; // The RPC is considered to be successful if number of failed sub RPC // does not reach this limit. Even if the RPC is timedout or canceled, @@ -165,10 +164,14 @@ struct ParallelChannelOptions { // the timeout) when the limit is reached. // Default: number of sub channels, meaning that the RPC to ParallChannel // does not fail unless all sub RPC failed. - int fail_limit; + int fail_limit{-1}; - // Construct with default options. 
- ParallelChannelOptions(); + // The RPC is considered to be successful when the number of successful sub + // RPCs reaches this limit. + // Default: number of sub channels, meaning that the RPC to ParallelChannel + // does not return unless all sub RPCs succeed. + // Note: `success_limit' is only valid when `fail_limit' is not set. + int success_limit{ -1}; }; // ParallelChannel(aka "pchan") accesses all sub channels simultaneously with @@ -185,8 +188,7 @@ struct ParallelChannelOptions { class ParallelChannel : public ChannelBase { friend class Controller; public: - ParallelChannel() { } - ~ParallelChannel(); + ~ParallelChannel() override; // Initialize ParallelChannel with `options'. // NOTE: Currently this function always returns 0. @@ -234,7 +236,7 @@ friend class Controller; google::protobuf::RpcController* controller, const google::protobuf::Message* request, google::protobuf::Message* response, - google::protobuf::Closure* done); + google::protobuf::Closure* done) override; // Number of sub channels. size_t channel_count() const { return _chans.size(); } @@ -245,10 +247,10 @@ friend class Controller; // Minimum weight of sub channels. // FIXME(gejun): be minimum of top(nchan-fail_limit) - int Weight(); + int Weight() override; // Put description into `os'. 
- void Describe(std::ostream& os, const DescribeOptions&) const; + void Describe(std::ostream& os, const DescribeOptions&) const override; public: struct SubChan { @@ -263,7 +265,7 @@ friend class Controller; protected: static void* RunDoneAndDestroy(void* arg); - int CheckHealth(); + int CheckHealth() override; ParallelChannelOptions _options; ChannelList _chans; diff --git a/test/brpc_channel_unittest.cpp b/test/brpc_channel_unittest.cpp index 8814b0bcad..7b98896b41 100644 --- a/test/brpc_channel_unittest.cpp +++ b/test/brpc_channel_unittest.cpp @@ -569,6 +569,24 @@ class ChannelTest : public ::testing::Test{ } }; + class SuccessLimitCallMapper : public brpc::CallMapper { + public: + brpc::SubCall Map(int channel_index, + const google::protobuf::MethodDescriptor* method, + const google::protobuf::Message* req_base, + google::protobuf::Message* response) override { + auto req = brpc::Clone(req_base); + req->set_code(channel_index + 1/*non-zero*/); + if (_index++ > 0) { + req->set_sleep_us(5 * 1000); + } + return brpc::SubCall(method, req, response->New(), + brpc::DELETE_REQUEST | brpc::DELETE_RESPONSE); + } + private: + size_t _index{0}; + }; + class MergeNothing : public brpc::ResponseMerger { Result Merge(google::protobuf::Message* /*response*/, const google::protobuf::Message* /*sub_response*/) { @@ -826,7 +844,60 @@ class ChannelTest : public ::testing::Test{ } StopAndJoin(); } - + + void TestSuccessLimitParallel(bool single_server, bool async, bool short_connection) { + std::cout << " *** single=" << single_server + << " async=" << async + << " short=" << short_connection << std::endl; + + ASSERT_EQ(0, StartAccept(_ep)); + const size_t NCHANS = 8; + brpc::Channel subchans[NCHANS]; + brpc::ParallelChannel channel; + brpc::ParallelChannelOptions options; + // Only care about the first successful response. 
+ options.success_limit = 1; + channel.Init(&options); + butil::intrusive_ptr fast_call_mapper(new SuccessLimitCallMapper); + for (size_t i = 0; i < NCHANS; ++i) { + SetUpChannel(&subchans[i], single_server, short_connection); + ASSERT_EQ(0, channel.AddChannel( + &subchans[i], brpc::DOESNT_OWN_CHANNEL, fast_call_mapper, NULL)); + } + brpc::Controller cntl; + test::EchoRequest req; + test::EchoResponse res; + req.set_message(__FUNCTION__); + req.set_code(23); + CallMethod(&channel, &cntl, &req, &res, async); + + EXPECT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText(); + EXPECT_EQ(NCHANS, (size_t)cntl.sub_count()); + for (int i = 0; i < cntl.sub_count(); ++i) { + EXPECT_TRUE(cntl.sub(i)) << "i=" << i; + if (0 == i) { + EXPECT_TRUE(!cntl.sub(i)->Failed()) << "i=" << i; + } else { + EXPECT_TRUE(cntl.sub(i)->Failed()) << "i=" << i; + EXPECT_EQ(brpc::EPCHANFINISH, cntl.sub(i)->ErrorCode()) << "i=" << i; + } + } + EXPECT_EQ("received " + std::string(__FUNCTION__), res.message()); + ASSERT_EQ(1, res.code_list_size()); + ASSERT_EQ((int)1, res.code_list(0)); + if (short_connection) { + // Sleep to let `_messenger' detect `Socket' being `SetFailed' + const int64_t start_time = butil::gettimeofday_us(); + while (_messenger.ConnectionCount() != 0) { + EXPECT_LT(butil::gettimeofday_us(), start_time + 100000L/*100ms*/); + bthread_usleep(1000); + } + } else { + EXPECT_GE(1ul, _messenger.ConnectionCount()); + } + StopAndJoin(); + } + struct CancelerArg { int64_t sleep_before_cancel_us; brpc::CallId cid; @@ -2382,7 +2453,7 @@ TEST_F(ChannelTest, success_parallel) { } TEST_F(ChannelTest, success_duplicated_parallel) { - for (int i = 0; i <= 1; ++i) { // Flag SingleServer + for (int i = 0; i <= 1; ++i) { // Flag SingleServer for (int j = 0; j <= 1; ++j) { // Flag Asynchronous for (int k = 0; k <=1; ++k) { // Flag ShortConnection TestSuccessDuplicatedParallel(i, j, k); @@ -2421,6 +2492,16 @@ TEST_F(ChannelTest, success_parallel2) { } } +TEST_F(ChannelTest, success_limit_parallel) { + for 
(int i = 0; i <= 1; ++i) { // Flag SingleServer + for (int j = 0; j <= 1; ++j) { // Flag Asynchronous + for (int k = 0; k <=1; ++k) { // Flag ShortConnection + TestSuccessLimitParallel(i, j, k); + } + } + } +} + TEST_F(ChannelTest, cancel_before_callmethod) { for (int i = 0; i <= 1; ++i) { // Flag SingleServer for (int j = 0; j <= 1; ++j) { // Flag Asynchronous