Skip to content

Commit

Permalink
优化: 延迟对 SOCKET 的关闭; 优化: 复用连接扫描全部匹配连接;
Browse files Browse the repository at this point in the history
  • Loading branch information
terrywh committed Nov 18, 2022
1 parent d9f7eaa commit ec6fbfc
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 61 deletions.
2 changes: 1 addition & 1 deletion include/xbond/math/rand.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace detail {
std::random_device& device();
} // namespace detail

template <class Int>
template <class Int, typename = std::is_integral<Int>>
auto integer(Int begin, Int end) -> typename std::decay<Int>::type {
std::uniform_int_distribution< typename std::decay<Int>::type > dist { begin, end };
return dist( detail::device() );
Expand Down
29 changes: 15 additions & 14 deletions include/xbond/net/http/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,28 +37,29 @@ class client {
manager_->close();
}
// 执行请求
template <class RequestBody, class ResponseBody, class ExecuteHandler>
template <class RequestBody, class ResponseBody>
void execute(const address& addr, boost::beast::http::request<RequestBody>& req,
boost::beast::http::response<ResponseBody>& rsp, ExecuteHandler&& handler) {

boost::beast::http::response_parser<ResponseBody> parser { std::move(rsp) };
parser.header_limit(option_.header_limit);
parser.body_limit(option_.body_limit);

execute(addr, req, parser, std::forward<ExecuteHandler>(handler));
rsp = parser.release();
boost::beast::http::response<ResponseBody>& rsp, xbond::coroutine_handler& ch) {
boost::asio::async_compose<xbond::coroutine_handler&, void(boost::system::error_code)>(
detail::client_execute<decltype(ch.executor()), RequestBody, ResponseBody>(
manager_,
std::make_shared<detail::client_execute_context<decltype(ch.executor()), RequestBody, ResponseBody, BufferSize>>(
ch.executor(), addr, option_.timeout, req, rsp // 使用当前协程对应的 STRAND 统一执行上下文
)
), ch, ch.executor()
);
}

// 执行请求
template <class RequestBody, class ResponseBody, class ExecuteHandler>
void execute(const address& addr, boost::beast::http::request<RequestBody>& req,
boost::beast::http::response_parser<ResponseBody>& rsp, ExecuteHandler&& handler) {
boost::beast::http::response<ResponseBody>& rsp, ExecuteHandler&& handler) {
boost::asio::async_compose<ExecuteHandler, void(boost::system::error_code)>(
detail::client_execute<RequestBody, ResponseBody>(
detail::client_execute<boost::asio::io_context, RequestBody, ResponseBody>(
manager_,
std::make_shared<detail::client_execute_context<RequestBody, ResponseBody, BufferSize>>(
std::make_shared<detail::client_execute_context<boost::asio::io_context, RequestBody, ResponseBody, BufferSize>>(
io_, addr, option_.timeout, req, rsp
)
), handler, io_
), handler
);
}
};
Expand Down
16 changes: 8 additions & 8 deletions include/xbond/net/http/detail/client_execute.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ namespace net {
namespace http {
namespace detail {

template <class RequestBody, class ResponseBody>
template <class Executor, class RequestBody, class ResponseBody>
class client_execute: public boost::asio::coroutine {

public:
using context_type = client_execute_context<RequestBody, ResponseBody>;
using context_type = client_execute_context<Executor, RequestBody, ResponseBody>;

private:
std::shared_ptr<detail::client_socket_manager> manager_;
Expand Down Expand Up @@ -44,14 +44,14 @@ class client_execute: public boost::asio::coroutine {
BOOST_ASIO_CORO_YIELD boost::beast::http::async_read(*context_->stream, context_->buffer, context_->response, std::move(self));
DONE:
context_->stream->expires_never();
if (error) { // 发生异常(本身可能就是超时)
self.complete(error);
context_->error = error == boost::asio::error::operation_aborted ? boost::beast::error::timeout : error;

if (error || context_->response.need_eof()) { // 发生异常(本身可能就是超时)
BOOST_ASIO_CORO_YIELD manager_->closing(context_, std::move(self));
} else {
if (!context_->response.need_eof()) { // 连接回收复用
BOOST_ASIO_CORO_YIELD manager_->release(context_, std::move(self));
}
self.complete({});
BOOST_ASIO_CORO_YIELD manager_->release(context_, std::move(self));
}
self.complete(context_->error);
}}
};

Expand Down
58 changes: 40 additions & 18 deletions include/xbond/net/http/detail/client_socket_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,30 @@ class client;

namespace detail {
// 请求执行上下文
template <class RequestBody, class ResponseBody, std::size_t BufferSize = 16 * 1024>
template <class Executor, class RequestBody, class ResponseBody, std::size_t BufferSize = 16 * 1024>
struct client_execute_context {
boost::asio::strand<boost::asio::io_context::executor_type> strand;
Executor& executor;
net::address address;
std::chrono::steady_clock::duration timeout;
boost::beast::http::request<RequestBody>& request;
boost::beast::http::response_parser<ResponseBody>& response;
boost::beast::http::response<ResponseBody>& response;
boost::beast::flat_static_buffer<BufferSize> buffer;
std::unique_ptr<boost::beast::tcp_stream> stream;
boost::system::error_code error;

client_execute_context(boost::asio::io_context& io, net::address addr,
client_execute_context(Executor& e, net::address addr,
std::chrono::steady_clock::duration to,
boost::beast::http::request<RequestBody>& req,
boost::beast::http::response_parser<ResponseBody>& rsp)
: strand(boost::asio::make_strand(io))
boost::beast::http::response<ResponseBody>& rsp)
: executor(e)
, address(addr), timeout(to)
, request(req), response(rsp) {

}

// ~client_execute_context() {
// std::cout << "X:" << this << std::endl;
// }
};
// 连接管理器,支持简单的复用机制
class client_socket_manager : public std::enable_shared_from_this<client_socket_manager> {
Expand All @@ -62,38 +67,55 @@ class client_socket_manager : public std::enable_shared_from_this<client_socket_
void start();
void close();

template <class RequestBody, class ResponseBody, std::size_t BufferSize, class AcquireHandler>
void acquire(std::shared_ptr<client_execute_context<RequestBody, ResponseBody, BufferSize>> context, AcquireHandler&& handler) {
template <class Executor, class RequestBody, class ResponseBody, std::size_t BufferSize, class AcquireHandler>
void acquire(std::shared_ptr<client_execute_context<Executor, RequestBody, ResponseBody, BufferSize>> context, AcquireHandler&& handler) {
boost::asio::post(strand_, [this, context, handler = std::move(handler), self = shared_from_this()] () mutable {
// 找到还在有效期内的同目标地址的链接
if (auto i = cache_.find(context->address); i != cache_.end()) {
auto r = cache_.equal_range(context->address);
for (auto i = r.first; i != r.second; ) {
if (std::chrono::steady_clock::now() < i->second.expire) {
context->stream = std::make_unique<boost::beast::tcp_stream>( std::move(*i->second.socket) );
cache_.erase(i);
boost::asio::post(context->strand, std::move(handler));
boost::asio::post(context->executor, std::move(handler));
return;
}
cache_.erase(i); // 无效连接移除(然后重新分配)
i = cache_.erase(i); // 无效连接移除(然后重新分配)
}
// 建立新的连接:使用 context::strand 保持其超时
context->stream = std::make_unique<boost::beast::tcp_stream>(context->strand);
boost::asio::async_compose<AcquireHandler, void(boost::system::error_code)>(
// 建立新的连接(注意回调回到 context->executor 上下文)
auto on_connect = [context, handler = std::move(handler)] (boost::system::error_code error) mutable {
boost::asio::post(context->executor, [handler = std::move(handler), error] () mutable {
handler(error);
});
};
context->stream = std::make_unique<boost::beast::tcp_stream>(io_);
boost::asio::async_compose<decltype(on_connect), void(boost::system::error_code)>(
net::detail::socket_connect<boost::asio::ip::tcp>(context->stream->socket(), context->address, resolver_),
handler, *context->stream, resolver_, context->strand
on_connect, *context->stream, resolver_, context->executor
);
});
}
// 释放链接(保存已备复用)
// 注意:释放后该 stream 不可用
template <class RequestBody, class ResponseBody, std::size_t BufferSize, class AcquireHandler>
void release(std::shared_ptr<client_execute_context<RequestBody, ResponseBody, BufferSize>> context, AcquireHandler&& handler) {
template <class Executor, class RequestBody, class ResponseBody, std::size_t BufferSize, class ReleaseHandler>
void release(std::shared_ptr<client_execute_context<Executor, RequestBody, ResponseBody, BufferSize>> context, ReleaseHandler&& handler) {
boost::asio::post(strand_, [this, context, handler = std::move(handler), self = shared_from_this()] () mutable {
auto now = std::chrono::steady_clock::now();
cache_.emplace(std::make_pair(context->address, cached_socket{
std::make_unique<boost::asio::ip::tcp::socket>(std::move(context->stream->release_socket())), now + ttl_}));
boost::asio::post(context->strand, std::move(handler));
boost::asio::post(context->executor, std::move(handler));
});
}
// 延迟关闭
template <class Executor, class RequestBody, class ResponseBody, std::size_t BufferSize, class ClosingHandler>
void closing(std::shared_ptr<client_execute_context<Executor, RequestBody, ResponseBody, BufferSize>> context, ClosingHandler&& handler) {
boost::asio::post(strand_, [this, context, handler = std::move(handler), self = shared_from_this()] () mutable {
auto now = std::chrono::steady_clock::now();
cache_.emplace(std::make_pair(context->address, cached_socket{
std::make_unique<boost::asio::ip::tcp::socket>(std::move(context->stream->release_socket())), now}));
boost::asio::post(context->executor, std::move(handler));
});
}

template <std::size_t BufferSize>
friend class xbond::net::http::client;
};
Expand Down
42 changes: 22 additions & 20 deletions test/net_http_client.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <xbond/net/http/client.hpp>
#include <xbond/thread_pool.hpp>
#include <xbond/time/sleep_for.hpp>
#include <xbond/math/rand.hpp>
#include <iostream>
using namespace xbond;

Expand All @@ -9,44 +10,45 @@ using namespace xbond;
int net_http_client_test(int argc, char* argv[]) {
LOGGER() << __func__ << "\n";
boost::asio::io_context io;
net::http::client<>::option opt;
opt.timeout = std::chrono::milliseconds(1000);
net::http::client cli{io, opt};
net::address addr1 {"127.0.0.1",3000};
net::address addr2 {"www.qq.com",80};

std::srand(std::time(nullptr));
coroutine::start(io, [&io] (coroutine_handler& ch) {
net::http::client cli{io};

boost::beast::http::request<boost::beast::http::empty_body> req {boost::beast::http::verb::get, "/", 11};
req.set(boost::beast::http::field::host, "www.qq.com");
req.keep_alive(true);

std::cout << req << std::endl;
boost::beast::http::response<boost::beast::http::string_body> rsp {};
boost::system::error_code error;
cli.execute(net::address{"www.qq.com",80}, req, rsp, ch[error]);
std::cout << error << ": " << error.message() << std::endl;
boost::beast::http::request<boost::beast::http::empty_body> req {boost::beast::http::verb::get, "/", 11};
req.set(boost::beast::http::field::host, "www.qq.com");
req.keep_alive(true);

std::cout << req << std::endl;
boost::beast::http::response<boost::beast::http::string_body> rsp {};
boost::system::error_code error;
cli.execute(addr1, req, rsp, [&rsp] (boost::system::error_code error) {
std::cout << error << " / " << error.message() << std::endl;
std::cout << rsp << std::endl;
rsp.body().clear();
});

for (int i=0;i<2;++i) {
coroutine::start(io, [&io, i] (coroutine_handler& ch) {
net::http::client cli{io};
for (int j=0;j<100;++j) {
time::sleep_for(std::chrono::milliseconds(i * 2500 + 5000), ch);
for (int i=0;i<64;++i) {
coroutine::start(io, [&cli, &addr1, &addr2, i] (coroutine_handler& ch) {
for (int j=0;j<1000;++j) {
time::sleep_for(std::chrono::milliseconds(math::rand::integer(0, 50)), ch);
boost::beast::http::request<boost::beast::http::empty_body> req {boost::beast::http::verb::get, "/", 11};
req.set(boost::beast::http::field::host, "www.qq.com");
req.keep_alive(true);
boost::beast::http::response<boost::beast::http::string_body> rsp {};
boost::system::error_code error;
cli.execute(net::address{"www.qq.com",80}, req, rsp, ch[error]);
std::cout << "." << std::flush;
cli.execute(addr1, req, rsp, ch[error]);
rsp.body().clear();
}
});
}
thread_pool pool(4, [&io] () {
thread_pool pool(2, [&io] () {
io.run();
});
pool.wait();
std::cout << "\n\t\t\tdone" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(10));
return 0;
}

0 comments on commit ec6fbfc

Please sign in to comment.