Skip to content

Commit

Permalink
dht, dhtrunner: add shutdown stop argument
Browse files Browse the repository at this point in the history
  • Loading branch information
aberaud committed May 31, 2021
1 parent ec359d5 commit a178a8f
Show file tree
Hide file tree
Showing 10 changed files with 107 additions and 28 deletions.
2 changes: 1 addition & 1 deletion include/opendht/dht.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class OPENDHT_PUBLIC Dht final : public DhtInterface {
/**
* Performs final operations before quitting.
*/
void shutdown(ShutdownCallback cb) override;
void shutdown(ShutdownCallback cb, bool stop = false) override;

/**
* Returns true if the node is running (have access to an open socket).
Expand Down
4 changes: 3 additions & 1 deletion include/opendht/dht_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,10 @@ class OPENDHT_PUBLIC DhtInterface {

/**
* Performs final operations before quitting.
* stop: if true, cancel ongoing operations and call their 'done'
* callbacks synchronously.
*/
virtual void shutdown(ShutdownCallback cb) = 0;
virtual void shutdown(ShutdownCallback cb, bool stop = false) = 0;

/**
* Returns true if the node is running (have access to an open socket).
Expand Down
2 changes: 1 addition & 1 deletion include/opendht/dht_proxy_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class OPENDHT_PUBLIC DhtProxyClient final : public DhtInterface {
/**
* Performs final operations before quitting.
*/
void shutdown(ShutdownCallback cb) override;
void shutdown(ShutdownCallback cb, bool) override;

/**
* Returns true if the node is running (have access to an open socket).
Expand Down
2 changes: 1 addition & 1 deletion include/opendht/dhtrunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ class OPENDHT_PUBLIC DhtRunner {
/**
* Gracefuly disconnect from network.
*/
void shutdown(ShutdownCallback cb = {});
void shutdown(ShutdownCallback cb = {}, bool stop = false);

/**
* Quit and wait for all threads to terminate.
Expand Down
4 changes: 2 additions & 2 deletions include/opendht/securedht.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ class OPENDHT_PUBLIC SecureDht final : public DhtInterface {
/**
* SecureDht to Dht proxy
*/
void shutdown(ShutdownCallback cb) override {
dht_->shutdown(cb);
void shutdown(ShutdownCallback cb, bool stop = false) override {
dht_->shutdown(cb, stop);
}
void dumpTables() const override {
dht_->dumpTables();
Expand Down
18 changes: 17 additions & 1 deletion src/dht.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,27 @@ Dht::Kad::getStatus(time_point now) const
}

void
Dht::shutdown(ShutdownCallback cb)
Dht::shutdown(ShutdownCallback cb, bool stop)
{
if (not persistPath.empty())
saveState(persistPath);

if (stop) {
for (auto dht : {&dht4, &dht6}) {
for (auto& sr : dht->searches) {
for (const auto& r : sr.second->callbacks)
r.second.done_cb(false, {});
sr.second->callbacks.clear();
for (const auto& a : sr.second->announce) {
if (a.callback) a.callback(false, {});
}
sr.second->announce.clear();
sr.second->listeners.clear();
}
}
network_engine.clear();
}

if (not maintain_storage) {
if (cb) cb();
return;
Expand Down
2 changes: 1 addition & 1 deletion src/dht_proxy_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ DhtProxyClient::cancelAllListeners()
}

void
DhtProxyClient::shutdown(ShutdownCallback cb)
DhtProxyClient::shutdown(ShutdownCallback cb, bool)
{
stop();
if (cb)
Expand Down
55 changes: 35 additions & 20 deletions src/dhtrunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,29 +268,31 @@ DhtRunner::run(const Config& config, Context&& context)
}

void
DhtRunner::shutdown(ShutdownCallback cb) {
DhtRunner::shutdown(ShutdownCallback cb, bool stop) {
std::unique_lock<std::mutex> lck(storage_mtx);
auto expected = State::Running;
if (not running.compare_exchange_strong(expected, State::Stopping)) {
if (expected == State::Stopping and ongoing_ops) {
std::lock_guard<std::mutex> lck(storage_mtx);
shutdownCallbacks_.emplace_back(std::move(cb));
}
else if (cb) cb();
else if (cb) {
lck.unlock();
cb();
}
return;
}
if (logger_)
logger_->d("[runner %p] state changed to Stopping, %zu ongoing ops", this, ongoing_ops.load());
std::lock_guard<std::mutex> lck(storage_mtx);
ongoing_ops++;
shutdownCallbacks_.emplace_back(std::move(cb));
pending_ops_prio.emplace([=](SecureDht&) mutable {
pending_ops.emplace([=](SecureDht&) mutable {
auto onShutdown = [this]{ opEnded(); };
#ifdef OPENDHT_PROXY_CLIENT
if (dht_via_proxy_)
dht_via_proxy_->shutdown(onShutdown);
dht_via_proxy_->shutdown(onShutdown, stop);
#endif
if (dht_)
dht_->shutdown(onShutdown);
dht_->shutdown(onShutdown, stop);
});
cv.notify_all();
}
Expand Down Expand Up @@ -319,11 +321,11 @@ DhtRunner::bindOpDoneCallback(DoneCallbackSimple&& cb) {

bool
DhtRunner::checkShutdown() {
if (running != State::Stopping or ongoing_ops)
return false;
decltype(shutdownCallbacks_) cbs;
{
std::lock_guard<std::mutex> lck(storage_mtx);
if (running != State::Stopping or ongoing_ops)
return false;
cbs = std::move(shutdownCallbacks_);
}
for (auto& cb : cbs)
Expand Down Expand Up @@ -355,9 +357,13 @@ DhtRunner::join()

{
std::lock_guard<std::mutex> lck(storage_mtx);
if (ongoing_ops and logger_) {
logger_->w("[runner %p] stopping with %zu remaining ops", this, ongoing_ops.load());
}
pending_ops = decltype(pending_ops)();
pending_ops_prio = decltype(pending_ops_prio)();
ongoing_ops = 0;
shutdownCallbacks_.clear();
}
{
std::lock_guard<std::mutex> lck(dht_mtx);
Expand Down Expand Up @@ -709,11 +715,12 @@ DhtRunner::loop_()
void
DhtRunner::get(InfoHash hash, GetCallback vcb, DoneCallback dcb, Value::Filter f, Where w)
{
std::unique_lock<std::mutex> lck(storage_mtx);
if (running != State::Running) {
lck.unlock();
if (dcb) dcb(false, {});
return;
}
std::lock_guard<std::mutex> lck(storage_mtx);
ongoing_ops++;
pending_ops.emplace([=](SecureDht& dht) mutable {
dht.get(hash, std::move(vcb), bindOpDoneCallback(std::move(dcb)), std::move(f), std::move(w));
Expand All @@ -728,11 +735,12 @@ DhtRunner::get(const std::string& key, GetCallback vcb, DoneCallbackSimple dcb,
}
void
DhtRunner::query(const InfoHash& hash, QueryCallback cb, DoneCallback done_cb, Query q) {
std::unique_lock<std::mutex> lck(storage_mtx);
if (running != State::Running) {
lck.unlock();
if (done_cb) done_cb(false, {});
return;
}
std::lock_guard<std::mutex> lck(storage_mtx);
ongoing_ops++;
pending_ops.emplace([=](SecureDht& dht) mutable {
dht.query(hash, std::move(cb), bindOpDoneCallback(std::move(done_cb)), std::move(q));
Expand All @@ -744,11 +752,12 @@ std::future<size_t>
DhtRunner::listen(InfoHash hash, ValueCallback vcb, Value::Filter f, Where w)
{
auto ret_token = std::make_shared<std::promise<size_t>>();
std::unique_lock<std::mutex> lck(storage_mtx);
if (running != State::Running) {
lck.unlock();
ret_token->set_value(0);
return ret_token->get_future();
}
std::lock_guard<std::mutex> lck(storage_mtx);
pending_ops.emplace([=](SecureDht& dht) mutable {
#ifdef OPENDHT_PROXY_CLIENT
auto tokenbGlobal = listener_token_++;
Expand Down Expand Up @@ -829,11 +838,12 @@ DhtRunner::cancelListen(InfoHash h, std::shared_future<size_t> ftoken)
void
DhtRunner::put(InfoHash hash, Value&& value, DoneCallback cb, time_point created, bool permanent)
{
std::unique_lock<std::mutex> lck(storage_mtx);
if (running != State::Running) {
lck.unlock();
if (cb) cb(false, {});
return;
}
std::lock_guard<std::mutex> lck(storage_mtx);
ongoing_ops++;
pending_ops.emplace([=,
cb = std::move(cb),
Expand All @@ -847,11 +857,12 @@ DhtRunner::put(InfoHash hash, Value&& value, DoneCallback cb, time_point created
void
DhtRunner::put(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb, time_point created, bool permanent)
{
std::unique_lock<std::mutex> lck(storage_mtx);
if (running != State::Running) {
lck.unlock();
if (cb) cb(false, {});
return;
}
std::lock_guard<std::mutex> lck(storage_mtx);
ongoing_ops++;
pending_ops.emplace([=, cb = std::move(cb)](SecureDht& dht) mutable {
dht.put(hash, value, bindOpDoneCallback(std::move(cb)), created, permanent);
Expand Down Expand Up @@ -888,11 +899,12 @@ DhtRunner::cancelPut(const InfoHash& h, const std::shared_ptr<Value>& value)
void
DhtRunner::putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb, bool permanent)
{
std::unique_lock<std::mutex> lck(storage_mtx);
if (running != State::Running) {
lck.unlock();
if (cb) cb(false, {});
return;
}
std::lock_guard<std::mutex> lck(storage_mtx);
ongoing_ops++;
pending_ops.emplace([=,
cb = std::move(cb),
Expand All @@ -918,11 +930,12 @@ DhtRunner::putSigned(const std::string& key, Value&& value, DoneCallbackSimple c
void
DhtRunner::putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallback cb, bool permanent)
{
std::unique_lock<std::mutex> lck(storage_mtx);
if (running != State::Running) {
lck.unlock();
if (cb) cb(false, {});
return;
}
std::lock_guard<std::mutex> lck(storage_mtx);
ongoing_ops++;
pending_ops.emplace([=,
cb = std::move(cb),
Expand Down Expand Up @@ -1008,11 +1021,12 @@ DhtRunner::bootstrap(std::vector<SockAddr> nodes, DoneCallbackSimple&& cb)
void
DhtRunner::bootstrap(const SockAddr& addr, DoneCallbackSimple&& cb)
{
std::unique_lock<std::mutex> lck(storage_mtx);
if (running != State::Running) {
lck.unlock();
if (cb) cb(false);
return;
}
std::lock_guard<std::mutex> lck(storage_mtx);
ongoing_ops++;
pending_ops_prio.emplace([addr, cb = bindOpDoneCallback(std::move(cb))](SecureDht& dht) mutable {
dht.pingNode(std::move(addr), std::move(cb));
Expand All @@ -1023,9 +1037,9 @@ DhtRunner::bootstrap(const SockAddr& addr, DoneCallbackSimple&& cb)
void
DhtRunner::bootstrap(const InfoHash& id, const SockAddr& address)
{
std::lock_guard<std::mutex> lck(storage_mtx);
if (running != State::Running)
return;
std::unique_lock<std::mutex> lck(storage_mtx);
pending_ops_prio.emplace([id, address](SecureDht& dht) mutable {
dht.insertNode(id, address);
});
Expand All @@ -1035,9 +1049,9 @@ DhtRunner::bootstrap(const InfoHash& id, const SockAddr& address)
void
DhtRunner::bootstrap(const std::vector<NodeExport>& nodes)
{
std::lock_guard<std::mutex> lck(storage_mtx);
if (running != State::Running)
return;
std::lock_guard<std::mutex> lck(storage_mtx);
pending_ops_prio.emplace([=](SecureDht& dht) {
for (auto& node : nodes)
dht.insertNode(node);
Expand All @@ -1061,11 +1075,12 @@ DhtRunner::connectivityChanged()

void
DhtRunner::findCertificate(InfoHash hash, std::function<void(const Sp<crypto::Certificate>&)> cb) {
std::unique_lock<std::mutex> lck(storage_mtx);
if (running != State::Running) {
lck.unlock();
cb({});
return;
}
std::lock_guard<std::mutex> lck(storage_mtx);
ongoing_ops++;
pending_ops.emplace([this, hash, cb = std::move(cb)] (SecureDht& dht) {
dht.findCertificate(hash, [this, cb = std::move(cb)](const Sp<crypto::Certificate>& crt){
Expand Down
43 changes: 43 additions & 0 deletions tests/dhtproxytester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,4 +289,47 @@ DhtProxyTester::testFuzzy()
CPPUNIT_ASSERT(value->data == mtu);
}

void
DhtProxyTester::testShutdownStop()
{
constexpr size_t N = 40000;
constexpr unsigned C = 100;

// Arrange
auto key = dht::InfoHash::get("testShutdownStop");
std::vector<std::shared_ptr<dht::Value>> values;
std::vector<uint8_t> mtu;
mtu.reserve(N);
for (size_t i = 0; i < N; i++)
mtu.emplace_back((i % 2) ? 'T' : 'M');

std::atomic_uint callback_count {0};

// Act
for (size_t i = 0; i < C; i++) {
auto nodeTest = std::make_shared<dht::DhtRunner>();
nodeTest->run(0, clientConfig);
nodeTest->put(key, dht::Value(mtu), [&](bool ok) {
callback_count++;
});
nodeTest->get(key, [&](const std::vector<std::shared_ptr<dht::Value>>& vals){
values.insert(values.end(), vals.begin(), vals.end());
return true;
},[&](bool ok){
callback_count++;
});
bool done = false;
std::condition_variable cv;
std::mutex cv_m;
nodeTest->shutdown([&]{
std::lock_guard<std::mutex> lk(cv_m);
done = true;
cv.notify_all();
}, true);
std::unique_lock<std::mutex> lk(cv_m);
CPPUNIT_ASSERT(cv.wait_for(lk, 10s, [&]{ return done; }));
}
CPPUNIT_ASSERT_EQUAL(2*C, callback_count.load());
}

} // namespace test
3 changes: 3 additions & 0 deletions tests/dhtproxytester.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class DhtProxyTester : public CppUnit::TestFixture {
CPPUNIT_TEST(testResubscribeGetValues);
CPPUNIT_TEST(testPutGet40KChars);
CPPUNIT_TEST(testFuzzy);
CPPUNIT_TEST(testShutdownStop);
CPPUNIT_TEST_SUITE_END();

public:
Expand Down Expand Up @@ -68,6 +69,8 @@ class DhtProxyTester : public CppUnit::TestFixture {

void testFuzzy();

void testShutdownStop();

private:
dht::DhtRunner::Config clientConfig {};
dht::DhtRunner nodePeer;
Expand Down

0 comments on commit a178a8f

Please sign in to comment.