Skip to content

Commit

Permalink
rpc: server connection shutdown fix
Browse files Browse the repository at this point in the history
Server side of a connection should be closed in the same way client side
close its end.
  • Loading branch information
Gleb Natapov authored and avikivity committed Oct 12, 2015
1 parent c2e86d5 commit f4d29d1
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 11 deletions.
1 change: 0 additions & 1 deletion rpc/rpc.hh
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ public:
client_info _info;
public:
connection(server& s, connected_socket&& fd, socket_address&& addr, protocol& proto);
~connection() { this->_output_ready.ignore_ready_future(); }
future<> process();
future<> respond(int64_t msg_id, sstring&& data);
auto& info() { return _info; }
Expand Down
30 changes: 20 additions & 10 deletions rpc/rpc_impl.hh
Original file line number Diff line number Diff line change
Expand Up @@ -389,9 +389,11 @@ auto recv_helper(signature<Ret (InArgs...)> sig, Func&& func, WantClientInfo wci
// note: apply is executed asynchronously with regards to networking so we cannot chain futures here by doing "return apply()"
apply(func, client->info(), WantClientInfo(), signature(), std::move(args)).then_wrapped(
[client, msg_id] (futurize_t<typename signature::ret_type> ret) {
client->out_ready() = client->out_ready().then([client, msg_id, ret = std::move(ret)] () mutable {
return reply<Serializer, MsgType>(wait_style(), std::move(ret), msg_id, *client);
});
if (!client->error()) {
client->out_ready() = client->out_ready().then([client, msg_id, ret = std::move(ret)] () mutable {
return reply<Serializer, MsgType>(wait_style(), std::move(ret), msg_id, *client);
});
}
});
};
}
Expand Down Expand Up @@ -571,14 +573,22 @@ future<> protocol<Serializer, MsgType>::server::connection::process() {
this->_error = true;
}
});
}).finally([this, conn_ptr = this->shared_from_this()] () {
}).finally([this] {
this->_error = true;
return this->out_ready().then_wrapped([this] (future<> f) {
f.ignore_ready_future();
return this->_write_buf.close();
}).then_wrapped([this] (future<> f) {
f.ignore_ready_future();
if (!this->_server._stopping) {
// if server is stopping do not remove connection
// since it may invalidate _conns iterators
this->_server._conns.erase(this);
}
this->_stopped.set_value();
});
}).finally([conn_ptr = this->shared_from_this()] {
// hold onto connection pointer until do_until() exists
if (!this->_server._stopping) {
// if server is stopping do not remove connection
// since it may invalidate _conns iterators
this->_server._conns.erase(this);
}
this->_stopped.set_value();
});
}

Expand Down

0 comments on commit f4d29d1

Please sign in to comment.