Skip to content

Commit

Permalink
fix output stream batching
Browse files Browse the repository at this point in the history
Current code for output stream batching may execute two writes to
underlying fd in parallel which is illegal. It may happen since both
callback that is called by the poller and output stream user may run in
parallel and do not synchronize with one another. This patch aims to add
such synchronisation. If ostream user writes while poller is running it
will wait for poller to complete, otherwise cancel poller and write
directly.
  • Loading branch information
Gleb Natapov authored and avikivity committed Oct 12, 2015
1 parent f4d29d1 commit 78e3924
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 55 deletions.
112 changes: 88 additions & 24 deletions core/iostream-impl.hh
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ output_stream<CharType>::split_and_put(temporary_buffer<CharType> buf) {

auto chunk = buf.share(0, _size);
buf.trim_front(_size);
return _fd.put(std::move(chunk)).then([this, buf = std::move(buf)] () mutable {
return put(std::move(chunk)).then([this, buf = std::move(buf)] () mutable {
return split_and_put(std::move(buf));
});
}
Expand All @@ -202,11 +202,13 @@ output_stream<CharType>::write(const char_type* buf, size_t n) {
_end = _size;
temporary_buffer<char> tmp = _fd.allocate_buffer(n - now);
std::copy(buf + now, buf + n, tmp.get_write());
return push_out().then([this, tmp = std::move(tmp)]() mutable {
_buf.trim(_end);
_end = 0;
return put(std::move(_buf)).then([this, tmp = std::move(tmp)]() mutable {
if (_trim_to_size) {
return split_and_put(std::move(tmp));
} else {
return _fd.put(std::move(tmp));
return put(std::move(tmp));
}
});
} else {
Expand All @@ -215,7 +217,7 @@ output_stream<CharType>::write(const char_type* buf, size_t n) {
if (_trim_to_size) {
return split_and_put(std::move(tmp));
} else {
return _fd.put(std::move(tmp));
return put(std::move(tmp));
}
}
}
Expand All @@ -234,41 +236,103 @@ output_stream<CharType>::write(const char_type* buf, size_t n) {
std::copy(buf + now, buf + n, next.get_write());
_end = n - now;
std::swap(next, _buf);
return _fd.put(std::move(next));
return put(std::move(next));
}
}

template <typename CharType>
future<>
output_stream<CharType>::push_out() {
if (_end) {
_buf.trim(_end);
_end = 0;
return _fd.put(std::move(_buf));
output_stream<CharType>::flush() {
if (!_batch_flushes) {
if (_end) {
_buf.trim(_end);
_end = 0;
return put(std::move(_buf)).then([this] {
return _fd.flush();
});
}
} else {
return make_ready_future<>();
if (_ex) {
// flush is a good time to deliver outstanding errors
return make_exception_future<>(std::move(_ex));
} else {
_flush = true;
if (!_in_batch) {
add_to_flush_poller(this);
_in_batch = promise<>();
}
}
}
return make_ready_future<>();
}

void add_to_flush_poller(output_stream<char>* x);

template <typename CharType>
future<>
output_stream<CharType>::flush() {
return push_out().then([this] {
output_stream<CharType>::put(temporary_buffer<CharType> buf) {
// if flush is scheduled, disable it, so it will not try to write in parallel
_flush = false;
if (_flushing) {
// flush in progress, wait for it to end before continuing
return _in_batch.value().get_future().then([this, buf = std::move(buf)] () mutable {
return _fd.put(std::move(buf));
});
} else {
return _fd.put(std::move(buf));
}
}

template <typename CharType>
void
output_stream<CharType>::poll_flush() {
if (!_flush) {
// flush was canceled, do nothing
_flushing = false;
_in_batch.value().set_value();
_in_batch = std::experimental::nullopt;
return;
}

auto f = make_ready_future();
_flush = false;
_flushing = true; // make whoever wants to write into the fd to wait for flush to complete

if (_end) {
// send whatever is in the buffer right now
_buf.trim(_end);
_end = 0;
f = _fd.put(std::move(_buf));
}

f.then([this] {
return _fd.flush();
}).then_wrapped([this] (future<> f) {
try {
f.get();
} catch (...) {
_ex = std::current_exception();
}
// if flush() was called while flushing flush once more
poll_flush();
});
}

future<> add_to_flush_poller(output_stream<char>& x);

template <typename CharType>
void
output_stream<CharType>::batch_flush() {
if (_in_batch_flush.available()) {
if (!_in_batch_flush.failed()) {
// do not destroy exceptional future
// the stream is in error state and cannot be used
// any more, so just skip adding it to the poller
_in_batch_flush = add_to_flush_poller(*this);
future<>
output_stream<CharType>::close() {
return flush().then([this] {
if (_in_batch) {
return _in_batch.value().get_future();
} else {
return make_ready_future();
}
}
}).then([this] {
// report final exception as close error
if (_ex) {
std::rethrow_exception(_ex);
}
}).finally([this] {
return _fd.close();
});
}
18 changes: 12 additions & 6 deletions core/iostream.hh
Original file line number Diff line number Diff line change
Expand Up @@ -176,19 +176,25 @@ class output_stream final {
size_t _begin = 0;
size_t _end = 0;
bool _trim_to_size = false;
future<> _in_batch_flush = make_ready_future<>();
bool _batch_flushes = false;
std::experimental::optional<promise<>> _in_batch;
bool _flush = false;
bool _flushing = false;
std::exception_ptr _ex;
private:
size_t available() const { return _end - _begin; }
size_t possibly_available() const { return _size - _begin; }
future<> split_and_put(temporary_buffer<CharType> buf);
future<> push_out();
future<> put(temporary_buffer<CharType> buf);
void poll_flush();
public:
using char_type = CharType;
output_stream() = default;
output_stream(data_sink fd, size_t size, bool trim_to_size = false)
: _fd(std::move(fd)), _size(size), _trim_to_size(trim_to_size) {}
output_stream(data_sink fd, size_t size, bool trim_to_size = false, bool batch_flushes = false)
: _fd(std::move(fd)), _size(size), _trim_to_size(trim_to_size), _batch_flushes(batch_flushes) {}
output_stream(output_stream&&) = default;
output_stream& operator=(output_stream&&) = default;
~output_stream() { assert(!_in_batch); }
future<> write(const char_type* buf, size_t n);
future<> write(const char_type* buf);

Expand All @@ -199,9 +205,9 @@ public:
future<> write(net::packet p);
future<> write(scattered_message<char_type> msg);
future<> flush();
void batch_flush();
future<> close() { return _in_batch_flush.then([this] { return flush(); }).finally([this] { return _fd.close(); }); }
future<> close();
private:
friend class reactor;
};

#include "iostream-impl.hh"
17 changes: 4 additions & 13 deletions core/reactor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1204,15 +1204,9 @@ int reactor::run() {
poller batch_flush_poller([this] {
bool work = _flush_batching.size();
while (!_flush_batching.empty()) {
auto e = std::move(_flush_batching.front());
auto os = std::move(_flush_batching.front());
_flush_batching.pop_front();
e.os.flush().then_wrapped([p = std::move(e.done)] (future<> f) mutable {
try {
p.set_value(f.get());
} catch(...) {
p.set_exception(std::current_exception());
}
});
os->poll_flush();
}
return work;
});
Expand Down Expand Up @@ -2309,10 +2303,7 @@ future<> later() {
return f;
}

future<> add_to_flush_poller(output_stream<char>& os) {
promise<> p;
auto f = p.get_future();
engine()._flush_batching.emplace_back(reactor::flush_batch_entry{std::move(p), os});
return f;
void add_to_flush_poller(output_stream<char>* os) {
engine()._flush_batching.emplace_back(os);
}

8 changes: 2 additions & 6 deletions core/reactor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -741,11 +741,7 @@ private:
const bool _reuseport;
circular_buffer<double> _loads;
double _load = 0;
struct flush_batch_entry {
promise<> done;
output_stream<char>& os;
};
circular_buffer<flush_batch_entry> _flush_batching;
circular_buffer<output_stream<char>* > _flush_batching;
private:
static void clear_task_quota(int);
bool flush_pending_aio();
Expand Down Expand Up @@ -902,7 +898,7 @@ private:
friend class smp;
friend class smp_message_queue;
friend class poller;
friend future<> add_to_flush_poller(output_stream<char>& os);
friend void add_to_flush_poller(output_stream<char>* os);
public:
bool wait_and_process() {
return _backend.wait_and_process();
Expand Down
2 changes: 1 addition & 1 deletion net/native-stack-impl.hh
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ template <typename Protocol>
output_stream<char>
native_connected_socket_impl<Protocol>::output() {
data_sink ds(std::make_unique<native_data_sink_impl>(_conn));
return output_stream<char>(std::move(ds), 8192);
return output_stream<char>(std::move(ds), 8192, false, true);
}

template <typename Protocol>
Expand Down
2 changes: 1 addition & 1 deletion net/posix-stack.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class posix_connected_socket_impl final : public connected_socket_impl {
explicit posix_connected_socket_impl(pollable_fd fd) : _fd(std::move(fd)) {}
public:
virtual input_stream<char> input() override { return input_stream<char>(posix_data_source(_fd)); }
virtual output_stream<char> output() override { return output_stream<char>(posix_data_sink(_fd), 8192); }
virtual output_stream<char> output() override { return output_stream<char>(posix_data_sink(_fd), 8192, false, true); }
virtual void shutdown_input() override {
_fd.shutdown(SHUT_RD);
}
Expand Down
4 changes: 2 additions & 2 deletions rpc/rpc_impl.hh
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ auto send_helper(MsgType xt, signature<Ret (InArgs...)> xsig) {
dst.out_ready() = do_with(std::move(data), [&dst] (sstring& data) {
return dst.out_ready().then([&dst, &data] () mutable {
return dst.out().write(data).then([&dst] {
dst.out().batch_flush();
return dst.out().flush();
});
});
}).finally([&dst] () {
Expand Down Expand Up @@ -327,7 +327,7 @@ protocol<Serializer, MsgType>::server::connection::respond(int64_t msg_id, sstri
*unaligned_cast<uint64_t*>(p + 8) = net::hton(data.size() - 16);
return do_with(std::move(data), this->shared_from_this(), [msg_id] (const sstring& data, lw_shared_ptr<protocol<Serializer, MsgType>::server::connection> conn) {
return conn->out().write(data.begin(), data.size()).then([conn] {
conn->out().batch_flush();
return conn->out().flush();
});
});
}
Expand Down
5 changes: 3 additions & 2 deletions tests/output_stream_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ future<> assert_split(StreamConstructor stream_maker, std::initializer_list<T> w
return do_for_each(sh_write_calls->begin(), sh_write_calls->end(), [out, sh_write_calls] (auto&& chunk) {
return out->write(chunk);
}).then([out, v, sh_expected_splits] {
return out->flush().then([out, v, sh_expected_splits] {
return out->close().then([out, v, sh_expected_splits] {
BOOST_REQUIRE_EQUAL(v->size(), sh_expected_splits->size());
int i = 0;
for (auto&& chunk : *sh_expected_splits) {
Expand Down Expand Up @@ -125,5 +125,6 @@ SEASTAR_TEST_CASE(test_flush_on_empty_buffer_does_not_push_empty_packet_down_str

return out->flush().then([v, out] {
BOOST_REQUIRE(v->empty());
});
return out->close();
}).finally([out]{});
}

0 comments on commit 78e3924

Please sign in to comment.