diff --git a/core/iostream-impl.hh b/core/iostream-impl.hh index 45c2cfaae..d8c510475 100644 --- a/core/iostream-impl.hh +++ b/core/iostream-impl.hh @@ -186,7 +186,7 @@ output_stream::split_and_put(temporary_buffer buf) { auto chunk = buf.share(0, _size); buf.trim_front(_size); - return _fd.put(std::move(chunk)).then([this, buf = std::move(buf)] () mutable { + return put(std::move(chunk)).then([this, buf = std::move(buf)] () mutable { return split_and_put(std::move(buf)); }); } @@ -202,11 +202,13 @@ output_stream::write(const char_type* buf, size_t n) { _end = _size; temporary_buffer tmp = _fd.allocate_buffer(n - now); std::copy(buf + now, buf + n, tmp.get_write()); - return push_out().then([this, tmp = std::move(tmp)]() mutable { + _buf.trim(_end); + _end = 0; + return put(std::move(_buf)).then([this, tmp = std::move(tmp)]() mutable { if (_trim_to_size) { return split_and_put(std::move(tmp)); } else { - return _fd.put(std::move(tmp)); + return put(std::move(tmp)); } }); } else { @@ -215,7 +217,7 @@ output_stream::write(const char_type* buf, size_t n) { if (_trim_to_size) { return split_and_put(std::move(tmp)); } else { - return _fd.put(std::move(tmp)); + return put(std::move(tmp)); } } } @@ -234,41 +236,103 @@ output_stream::write(const char_type* buf, size_t n) { std::copy(buf + now, buf + n, next.get_write()); _end = n - now; std::swap(next, _buf); - return _fd.put(std::move(next)); + return put(std::move(next)); } } template future<> -output_stream::push_out() { - if (_end) { - _buf.trim(_end); - _end = 0; - return _fd.put(std::move(_buf)); +output_stream::flush() { + if (!_batch_flushes) { + if (_end) { + _buf.trim(_end); + _end = 0; + return put(std::move(_buf)).then([this] { + return _fd.flush(); + }); + } } else { - return make_ready_future<>(); + if (_ex) { + // flush is a good time to deliver outstanding errors + return make_exception_future<>(std::move(_ex)); + } else { + _flush = true; + if (!_in_batch) { + add_to_flush_poller(this); + _in_batch = promise<>(); + } + } } + return make_ready_future<>(); } +void add_to_flush_poller(output_stream* x); + template future<> -output_stream::flush() { - return push_out().then([this] { +output_stream::put(temporary_buffer buf) { + // if flush is scheduled, disable it, so it will not try to write in parallel + _flush = false; + if (_flushing) { + // flush in progress, wait for it to end before continuing + return _in_batch.value().get_future().then([this, buf = std::move(buf)] () mutable { + return _fd.put(std::move(buf)); + }); + } else { + return _fd.put(std::move(buf)); + } +} + +template +void +output_stream::poll_flush() { + if (!_flush) { + // flush was canceled, do nothing + _flushing = false; + _in_batch.value().set_value(); + _in_batch = std::experimental::nullopt; + return; + } + + auto f = make_ready_future(); + _flush = false; + _flushing = true; // make whoever wants to write into the fd to wait for flush to complete + + if (_end) { + // send whatever is in the buffer right now + _buf.trim(_end); + _end = 0; + f = _fd.put(std::move(_buf)); + } + + f.then([this] { return _fd.flush(); + }).then_wrapped([this] (future<> f) { + try { + f.get(); + } catch (...) { + _ex = std::current_exception(); + } + // if flush() was called while flushing flush once more + poll_flush(); }); } -future<> add_to_flush_poller(output_stream& x); - template -void -output_stream::batch_flush() { - if (_in_batch_flush.available()) { - if (!_in_batch_flush.failed()) { - // do not destroy exceptional future - // the stream is in error state and cannot be used - // any more, so just skip adding it to the poller - _in_batch_flush = add_to_flush_poller(*this); +future<> +output_stream::close() { + return flush().then([this] { + if (_in_batch) { + return _in_batch.value().get_future(); + } else { + return make_ready_future(); } - } + }).then([this] { + // report final exception as close error + if (_ex) { + std::rethrow_exception(_ex); + } + }).finally([this] { + return _fd.close(); + }); } diff --git a/core/iostream.hh b/core/iostream.hh index adba4f950..1e9cf8e7c 100644 --- a/core/iostream.hh +++ b/core/iostream.hh @@ -176,19 +176,25 @@ class output_stream final { size_t _begin = 0; size_t _end = 0; bool _trim_to_size = false; - future<> _in_batch_flush = make_ready_future<>(); + bool _batch_flushes = false; + std::experimental::optional> _in_batch; + bool _flush = false; + bool _flushing = false; + std::exception_ptr _ex; private: size_t available() const { return _end - _begin; } size_t possibly_available() const { return _size - _begin; } future<> split_and_put(temporary_buffer buf); - future<> push_out(); + future<> put(temporary_buffer buf); + void poll_flush(); public: using char_type = CharType; output_stream() = default; - output_stream(data_sink fd, size_t size, bool trim_to_size = false) - : _fd(std::move(fd)), _size(size), _trim_to_size(trim_to_size) {} + output_stream(data_sink fd, size_t size, bool trim_to_size = false, bool batch_flushes = false) + : _fd(std::move(fd)), _size(size), _trim_to_size(trim_to_size), _batch_flushes(batch_flushes) {} output_stream(output_stream&&) = default; output_stream& operator=(output_stream&&) = default; + ~output_stream() { assert(!_in_batch); } future<> write(const char_type* buf, size_t n); future<> write(const char_type* buf); @@ -199,9 +205,9 @@ public: future<> write(net::packet p); future<> write(scattered_message msg); future<> flush(); - void batch_flush(); - future<> close() { return _in_batch_flush.then([this] { return flush(); }).finally([this] { return _fd.close(); }); } + future<> close(); private: + friend class reactor; }; #include "iostream-impl.hh" diff --git a/core/reactor.cc b/core/reactor.cc index a0e50accb..8a84fa80e 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -1204,15 +1204,9 @@ int reactor::run() { poller batch_flush_poller([this] { bool work = _flush_batching.size(); while (!_flush_batching.empty()) { - auto e = std::move(_flush_batching.front()); + auto os = std::move(_flush_batching.front()); _flush_batching.pop_front(); - e.os.flush().then_wrapped([p = std::move(e.done)] (future<> f) mutable { - try { - p.set_value(f.get()); - } catch(...) { - p.set_exception(std::current_exception()); - } - }); + os->poll_flush(); } return work; }); @@ -2309,10 +2303,7 @@ future<> later() { return f; } -future<> add_to_flush_poller(output_stream& os) { - promise<> p; - auto f = p.get_future(); - engine()._flush_batching.emplace_back(reactor::flush_batch_entry{std::move(p), os}); - return f; +void add_to_flush_poller(output_stream* os) { + engine()._flush_batching.emplace_back(os); } diff --git a/core/reactor.hh b/core/reactor.hh index 6ac9f4092..092bf3019 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -741,11 +741,7 @@ private: const bool _reuseport; circular_buffer _loads; double _load = 0; - struct flush_batch_entry { - promise<> done; - output_stream& os; - }; - circular_buffer _flush_batching; + circular_buffer* > _flush_batching; private: static void clear_task_quota(int); bool flush_pending_aio(); @@ -902,7 +898,7 @@ private: friend class smp; friend class smp_message_queue; friend class poller; - friend future<> add_to_flush_poller(output_stream& os); + friend void add_to_flush_poller(output_stream* os); public: bool wait_and_process() { return _backend.wait_and_process(); diff --git a/net/native-stack-impl.hh b/net/native-stack-impl.hh index dda888462..41ecaf781 100644 --- a/net/native-stack-impl.hh +++ b/net/native-stack-impl.hh @@ -138,7 +138,7 @@ template output_stream native_connected_socket_impl::output() { data_sink ds(std::make_unique(_conn)); - return output_stream(std::move(ds), 8192); + return output_stream(std::move(ds), 8192, false, true); } template diff --git a/net/posix-stack.cc b/net/posix-stack.cc index d2f9df488..10a566523 100644 --- a/net/posix-stack.cc +++ b/net/posix-stack.cc @@ -33,7 +33,7 @@ class posix_connected_socket_impl final : public connected_socket_impl { explicit posix_connected_socket_impl(pollable_fd fd) : _fd(std::move(fd)) {} public: virtual input_stream input() override { return input_stream(posix_data_source(_fd)); } - virtual output_stream output() override { return output_stream(posix_data_sink(_fd), 8192); } + virtual output_stream output() override { return output_stream(posix_data_sink(_fd), 8192, false, true); } virtual void shutdown_input() override { _fd.shutdown(SHUT_RD); } diff --git a/rpc/rpc_impl.hh b/rpc/rpc_impl.hh index f37ee0bdf..8a45f4e79 100644 --- a/rpc/rpc_impl.hh +++ b/rpc/rpc_impl.hh @@ -293,7 +293,7 @@ auto send_helper(MsgType xt, signature xsig) { dst.out_ready() = do_with(std::move(data), [&dst] (sstring& data) { return dst.out_ready().then([&dst, &data] () mutable { return dst.out().write(data).then([&dst] { - dst.out().batch_flush(); + return dst.out().flush(); }); }); }).finally([&dst] () { @@ -327,7 +327,7 @@ protocol::server::connection::respond(int64_t msg_id, sstri *unaligned_cast(p + 8) = net::hton(data.size() - 16); return do_with(std::move(data), this->shared_from_this(), [msg_id] (const sstring& data, lw_shared_ptr::server::connection> conn) { return conn->out().write(data.begin(), data.size()).then([conn] { - conn->out().batch_flush(); + return conn->out().flush(); }); }); } diff --git a/tests/output_stream_test.cc b/tests/output_stream_test.cc index 8f2d8e7da..cdfb9e5f1 100644 --- a/tests/output_stream_test.cc +++ b/tests/output_stream_test.cc @@ -72,7 +72,7 @@ future<> assert_split(StreamConstructor stream_maker, std::initializer_list w return do_for_each(sh_write_calls->begin(), sh_write_calls->end(), [out, sh_write_calls] (auto&& chunk) { return out->write(chunk); }).then([out, v, sh_expected_splits] { - return out->flush().then([out, v, sh_expected_splits] { + return out->close().then([out, v, sh_expected_splits] { BOOST_REQUIRE_EQUAL(v->size(), sh_expected_splits->size()); int i = 0; for (auto&& chunk : *sh_expected_splits) { @@ -125,5 +125,6 @@ SEASTAR_TEST_CASE(test_flush_on_empty_buffer_does_not_push_empty_packet_down_str return out->flush().then([v, out] { BOOST_REQUIRE(v->empty()); - }); + return out->close(); + }).finally([out]{}); }