From 71ba7e7b1b328fe0de6cfbd3e94e5e0ddd4b4073 Mon Sep 17 00:00:00 2001 From: yhirose Date: Thu, 20 Feb 2025 23:45:21 -0500 Subject: [PATCH] Fix #2068 (#2080) * Fix #2068 * Add unit test --- httplib.h | 10 +++++++ test/test.cc | 83 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 93 insertions(+) diff --git a/httplib.h b/httplib.h index 6296e2de82..60da33f34e 100644 --- a/httplib.h +++ b/httplib.h @@ -7333,6 +7333,16 @@ inline ClientImpl::ClientImpl(const std::string &host, int port, client_cert_path_(client_cert_path), client_key_path_(client_key_path) {} inline ClientImpl::~ClientImpl() { + // Wait until all the requests in flight are handled. + size_t retry_count = 10; + while (retry_count-- > 0) { + { + std::lock_guard guard(socket_mutex_); + if (socket_requests_in_flight_ == 0) { break; } + } + std::this_thread::sleep_for(std::chrono::milliseconds{1}); + } + std::lock_guard guard(socket_mutex_); shutdown_socket(socket_); close_socket(socket_); diff --git a/test/test.cc b/test/test.cc index 30d9f8c46a..04eb2ea70c 100644 --- a/test/test.cc +++ b/test/test.cc @@ -8351,3 +8351,86 @@ TEST(MaxTimeoutTest, ContentStreamSSL) { max_timeout_test(svr, cli, timeout, threshold); } #endif + +class EventDispatcher { +public: + EventDispatcher() {} + + void wait_event(DataSink *sink) { + unique_lock lk(m_); + int id = id_; + cv_.wait(lk, [&] { return cid_ == id; }); + sink->write(message_.data(), message_.size()); + } + + void send_event(const string &message) { + lock_guard lk(m_); + cid_ = id_++; + message_ = message; + cv_.notify_all(); + } + +private: + mutex m_; + condition_variable cv_; + atomic_int id_{0}; + atomic_int cid_{-1}; + string message_; +}; + +TEST(ClientInThreadTest, Issue2068) { + EventDispatcher ed; + + Server svr; + svr.Get("/event1", [&](const Request & /*req*/, Response &res) { + res.set_chunked_content_provider("text/event-stream", + [&](size_t /*offset*/, DataSink &sink) { + ed.wait_event(&sink); + return true; + }); + }); + + auto listen_thread = std::thread([&svr]() { svr.listen(HOST, PORT); }); + + svr.wait_until_ready(); + + thread event_thread([&] { + int id = 0; + while (svr.is_running()) { + this_thread::sleep_for(chrono::milliseconds(500)); + + std::stringstream ss; + ss << "data: " << id << "\n\n"; + ed.send_event(ss.str()); + id++; + } + }); + + auto se = detail::scope_exit([&] { + svr.stop(); + + listen_thread.join(); + event_thread.join(); + + ASSERT_FALSE(svr.is_running()); + }); + + { + auto client = detail::make_unique(HOST, PORT); + client->set_read_timeout(std::chrono::minutes(10)); + + std::atomic stop{false}; + + std::thread t([&] { + client->Get("/event1", + [&](const char *, size_t) -> bool { return !stop; }); + }); + + std::this_thread::sleep_for(std::chrono::seconds(2)); + stop = true; + client->stop(); + client.reset(); + + t.join(); + } +}