Skip to content

Commit

Permalink
Fix #2068 (#2080)
Browse files Browse the repository at this point in the history
* Fix #2068

* Add unit test
  • Loading branch information
yhirose authored Feb 21, 2025
1 parent ebe7efa commit 71ba7e7
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 0 deletions.
10 changes: 10 additions & 0 deletions httplib.h
Original file line number Diff line number Diff line change
Expand Up @@ -7333,6 +7333,16 @@ inline ClientImpl::ClientImpl(const std::string &host, int port,
client_cert_path_(client_cert_path), client_key_path_(client_key_path) {}

inline ClientImpl::~ClientImpl() {
// Wait until all the requests in flight are handled.
size_t retry_count = 10;
while (retry_count-- > 0) {
{
std::lock_guard<std::mutex> guard(socket_mutex_);
if (socket_requests_in_flight_ == 0) { break; }
}
std::this_thread::sleep_for(std::chrono::milliseconds{1});
}

std::lock_guard<std::mutex> guard(socket_mutex_);
shutdown_socket(socket_);
close_socket(socket_);
Expand Down
83 changes: 83 additions & 0 deletions test/test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8351,3 +8351,86 @@ TEST(MaxTimeoutTest, ContentStreamSSL) {
max_timeout_test(svr, cli, timeout, threshold);
}
#endif

class EventDispatcher {
public:
EventDispatcher() {}

void wait_event(DataSink *sink) {
unique_lock<mutex> lk(m_);
int id = id_;
cv_.wait(lk, [&] { return cid_ == id; });
sink->write(message_.data(), message_.size());
}

void send_event(const string &message) {
lock_guard<mutex> lk(m_);
cid_ = id_++;
message_ = message;
cv_.notify_all();
}

private:
mutex m_;
condition_variable cv_;
atomic_int id_{0};
atomic_int cid_{-1};
string message_;
};

TEST(ClientInThreadTest, Issue2068) {
EventDispatcher ed;

Server svr;
svr.Get("/event1", [&](const Request & /*req*/, Response &res) {
res.set_chunked_content_provider("text/event-stream",
[&](size_t /*offset*/, DataSink &sink) {
ed.wait_event(&sink);
return true;
});
});

auto listen_thread = std::thread([&svr]() { svr.listen(HOST, PORT); });

svr.wait_until_ready();

thread event_thread([&] {
int id = 0;
while (svr.is_running()) {
this_thread::sleep_for(chrono::milliseconds(500));

std::stringstream ss;
ss << "data: " << id << "\n\n";
ed.send_event(ss.str());
id++;
}
});

auto se = detail::scope_exit([&] {
svr.stop();

listen_thread.join();
event_thread.join();

ASSERT_FALSE(svr.is_running());
});

{
auto client = detail::make_unique<Client>(HOST, PORT);
client->set_read_timeout(std::chrono::minutes(10));

std::atomic<bool> stop{false};

std::thread t([&] {
client->Get("/event1",
[&](const char *, size_t) -> bool { return !stop; });
});

std::this_thread::sleep_for(std::chrono::seconds(2));
stop = true;
client->stop();
client.reset();

t.join();
}
}

0 comments on commit 71ba7e7

Please sign in to comment.