From 78e3924fcfff04719d7ebbdedc8bb73147f20274 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Mon, 12 Oct 2015 13:54:20 +0300 Subject: [PATCH] fix output stream batching The current code for output stream batching may execute two writes to the underlying fd in parallel, which is illegal. This can happen because the callback invoked by the poller and the output stream user may run in parallel and do not synchronize with one another. This patch aims to add such synchronization. If the ostream user writes while the poller is running, it waits for the poller to complete; otherwise it cancels the poller and writes directly. --- core/iostream-impl.hh | 112 ++++++++++++++++++++++++++++-------- core/iostream.hh | 18 ++++-- core/reactor.cc | 17 ++---- core/reactor.hh | 8 +-- net/native-stack-impl.hh | 2 +- net/posix-stack.cc | 2 +- rpc/rpc_impl.hh | 4 +- tests/output_stream_test.cc | 5 +- 8 files changed, 113 insertions(+), 55 deletions(-) diff --git a/core/iostream-impl.hh b/core/iostream-impl.hh index 45c2cfaae..d8c510475 100644 --- a/core/iostream-impl.hh +++ b/core/iostream-impl.hh @@ -186,7 +186,7 @@ output_stream::split_and_put(temporary_buffer buf) { auto chunk = buf.share(0, _size); buf.trim_front(_size); - return _fd.put(std::move(chunk)).then([this, buf = std::move(buf)] () mutable { + return put(std::move(chunk)).then([this, buf = std::move(buf)] () mutable { return split_and_put(std::move(buf)); }); } @@ -202,11 +202,13 @@ output_stream::write(const char_type* buf, size_t n) { _end = _size; temporary_buffer tmp = _fd.allocate_buffer(n - now); std::copy(buf + now, buf + n, tmp.get_write()); - return push_out().then([this, tmp = std::move(tmp)]() mutable { + _buf.trim(_end); + _end = 0; + return put(std::move(_buf)).then([this, tmp = std::move(tmp)]() mutable { if (_trim_to_size) { return split_and_put(std::move(tmp)); } else { - return _fd.put(std::move(tmp)); + return put(std::move(tmp)); } }); } else { @@ -215,7 +217,7 @@ output_stream::write(const char_type* buf, size_t n) { if 
(_trim_to_size) { return split_and_put(std::move(tmp)); } else { - return _fd.put(std::move(tmp)); + return put(std::move(tmp)); } } } @@ -234,41 +236,103 @@ output_stream::write(const char_type* buf, size_t n) { std::copy(buf + now, buf + n, next.get_write()); _end = n - now; std::swap(next, _buf); - return _fd.put(std::move(next)); + return put(std::move(next)); } } template future<> -output_stream::push_out() { - if (_end) { - _buf.trim(_end); - _end = 0; - return _fd.put(std::move(_buf)); +output_stream::flush() { + if (!_batch_flushes) { + if (_end) { + _buf.trim(_end); + _end = 0; + return put(std::move(_buf)).then([this] { + return _fd.flush(); + }); + } } else { - return make_ready_future<>(); + if (_ex) { + // flush is a good time to deliver outstanding errors + return make_exception_future<>(std::move(_ex)); + } else { + _flush = true; + if (!_in_batch) { + add_to_flush_poller(this); + _in_batch = promise<>(); + } + } } + return make_ready_future<>(); } +void add_to_flush_poller(output_stream* x); + template future<> -output_stream::flush() { - return push_out().then([this] { +output_stream::put(temporary_buffer buf) { + // if flush is scheduled, disable it, so it will not try to write in parallel + _flush = false; + if (_flushing) { + // flush in progress, wait for it to end before continuing + return _in_batch.value().get_future().then([this, buf = std::move(buf)] () mutable { + return _fd.put(std::move(buf)); + }); + } else { + return _fd.put(std::move(buf)); + } +} + +template +void +output_stream::poll_flush() { + if (!_flush) { + // flush was canceled, do nothing + _flushing = false; + _in_batch.value().set_value(); + _in_batch = std::experimental::nullopt; + return; + } + + auto f = make_ready_future(); + _flush = false; + _flushing = true; // make whoever wants to write into the fd to wait for flush to complete + + if (_end) { + // send whatever is in the buffer right now + _buf.trim(_end); + _end = 0; + f = _fd.put(std::move(_buf)); + } + + 
f.then([this] { return _fd.flush(); + }).then_wrapped([this] (future<> f) { + try { + f.get(); + } catch (...) { + _ex = std::current_exception(); + } + // if flush() was called while flushing flush once more + poll_flush(); }); } -future<> add_to_flush_poller(output_stream& x); - template -void -output_stream::batch_flush() { - if (_in_batch_flush.available()) { - if (!_in_batch_flush.failed()) { - // do not destroy exceptional future - // the stream is in error state and cannot be used - // any more, so just skip adding it to the poller - _in_batch_flush = add_to_flush_poller(*this); +future<> +output_stream::close() { + return flush().then([this] { + if (_in_batch) { + return _in_batch.value().get_future(); + } else { + return make_ready_future(); } - } + }).then([this] { + // report final exception as close error + if (_ex) { + std::rethrow_exception(_ex); + } + }).finally([this] { + return _fd.close(); + }); } diff --git a/core/iostream.hh b/core/iostream.hh index adba4f950..1e9cf8e7c 100644 --- a/core/iostream.hh +++ b/core/iostream.hh @@ -176,19 +176,25 @@ class output_stream final { size_t _begin = 0; size_t _end = 0; bool _trim_to_size = false; - future<> _in_batch_flush = make_ready_future<>(); + bool _batch_flushes = false; + std::experimental::optional> _in_batch; + bool _flush = false; + bool _flushing = false; + std::exception_ptr _ex; private: size_t available() const { return _end - _begin; } size_t possibly_available() const { return _size - _begin; } future<> split_and_put(temporary_buffer buf); - future<> push_out(); + future<> put(temporary_buffer buf); + void poll_flush(); public: using char_type = CharType; output_stream() = default; - output_stream(data_sink fd, size_t size, bool trim_to_size = false) - : _fd(std::move(fd)), _size(size), _trim_to_size(trim_to_size) {} + output_stream(data_sink fd, size_t size, bool trim_to_size = false, bool batch_flushes = false) + : _fd(std::move(fd)), _size(size), _trim_to_size(trim_to_size), 
_batch_flushes(batch_flushes) {} output_stream(output_stream&&) = default; output_stream& operator=(output_stream&&) = default; + ~output_stream() { assert(!_in_batch); } future<> write(const char_type* buf, size_t n); future<> write(const char_type* buf); @@ -199,9 +205,9 @@ public: future<> write(net::packet p); future<> write(scattered_message msg); future<> flush(); - void batch_flush(); - future<> close() { return _in_batch_flush.then([this] { return flush(); }).finally([this] { return _fd.close(); }); } + future<> close(); private: + friend class reactor; }; #include "iostream-impl.hh" diff --git a/core/reactor.cc b/core/reactor.cc index a0e50accb..8a84fa80e 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -1204,15 +1204,9 @@ int reactor::run() { poller batch_flush_poller([this] { bool work = _flush_batching.size(); while (!_flush_batching.empty()) { - auto e = std::move(_flush_batching.front()); + auto os = std::move(_flush_batching.front()); _flush_batching.pop_front(); - e.os.flush().then_wrapped([p = std::move(e.done)] (future<> f) mutable { - try { - p.set_value(f.get()); - } catch(...) 
{ - p.set_exception(std::current_exception()); - } - }); + os->poll_flush(); } return work; }); @@ -2309,10 +2303,7 @@ future<> later() { return f; } -future<> add_to_flush_poller(output_stream& os) { - promise<> p; - auto f = p.get_future(); - engine()._flush_batching.emplace_back(reactor::flush_batch_entry{std::move(p), os}); - return f; +void add_to_flush_poller(output_stream* os) { + engine()._flush_batching.emplace_back(os); } diff --git a/core/reactor.hh b/core/reactor.hh index 6ac9f4092..092bf3019 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -741,11 +741,7 @@ private: const bool _reuseport; circular_buffer _loads; double _load = 0; - struct flush_batch_entry { - promise<> done; - output_stream& os; - }; - circular_buffer _flush_batching; + circular_buffer* > _flush_batching; private: static void clear_task_quota(int); bool flush_pending_aio(); @@ -902,7 +898,7 @@ private: friend class smp; friend class smp_message_queue; friend class poller; - friend future<> add_to_flush_poller(output_stream& os); + friend void add_to_flush_poller(output_stream* os); public: bool wait_and_process() { return _backend.wait_and_process(); diff --git a/net/native-stack-impl.hh b/net/native-stack-impl.hh index dda888462..41ecaf781 100644 --- a/net/native-stack-impl.hh +++ b/net/native-stack-impl.hh @@ -138,7 +138,7 @@ template output_stream native_connected_socket_impl::output() { data_sink ds(std::make_unique(_conn)); - return output_stream(std::move(ds), 8192); + return output_stream(std::move(ds), 8192, false, true); } template diff --git a/net/posix-stack.cc b/net/posix-stack.cc index d2f9df488..10a566523 100644 --- a/net/posix-stack.cc +++ b/net/posix-stack.cc @@ -33,7 +33,7 @@ class posix_connected_socket_impl final : public connected_socket_impl { explicit posix_connected_socket_impl(pollable_fd fd) : _fd(std::move(fd)) {} public: virtual input_stream input() override { return input_stream(posix_data_source(_fd)); } - virtual output_stream output() override { 
return output_stream(posix_data_sink(_fd), 8192); } + virtual output_stream output() override { return output_stream(posix_data_sink(_fd), 8192, false, true); } virtual void shutdown_input() override { _fd.shutdown(SHUT_RD); } diff --git a/rpc/rpc_impl.hh b/rpc/rpc_impl.hh index f37ee0bdf..8a45f4e79 100644 --- a/rpc/rpc_impl.hh +++ b/rpc/rpc_impl.hh @@ -293,7 +293,7 @@ auto send_helper(MsgType xt, signature xsig) { dst.out_ready() = do_with(std::move(data), [&dst] (sstring& data) { return dst.out_ready().then([&dst, &data] () mutable { return dst.out().write(data).then([&dst] { - dst.out().batch_flush(); + return dst.out().flush(); }); }); }).finally([&dst] () { @@ -327,7 +327,7 @@ protocol::server::connection::respond(int64_t msg_id, sstri *unaligned_cast(p + 8) = net::hton(data.size() - 16); return do_with(std::move(data), this->shared_from_this(), [msg_id] (const sstring& data, lw_shared_ptr::server::connection> conn) { return conn->out().write(data.begin(), data.size()).then([conn] { - conn->out().batch_flush(); + return conn->out().flush(); }); }); } diff --git a/tests/output_stream_test.cc b/tests/output_stream_test.cc index 8f2d8e7da..cdfb9e5f1 100644 --- a/tests/output_stream_test.cc +++ b/tests/output_stream_test.cc @@ -72,7 +72,7 @@ future<> assert_split(StreamConstructor stream_maker, std::initializer_list w return do_for_each(sh_write_calls->begin(), sh_write_calls->end(), [out, sh_write_calls] (auto&& chunk) { return out->write(chunk); }).then([out, v, sh_expected_splits] { - return out->flush().then([out, v, sh_expected_splits] { + return out->close().then([out, v, sh_expected_splits] { BOOST_REQUIRE_EQUAL(v->size(), sh_expected_splits->size()); int i = 0; for (auto&& chunk : *sh_expected_splits) { @@ -125,5 +125,6 @@ SEASTAR_TEST_CASE(test_flush_on_empty_buffer_does_not_push_empty_packet_down_str return out->flush().then([v, out] { BOOST_REQUIRE(v->empty()); - }); + return out->close(); + }).finally([out]{}); }