diff --git a/debian/changelog b/debian/changelog index e06ebfcfc..48dd80cda 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,10 @@ +cocaine-core (0.12.15.0) unstable; urgency=low + + * Fixed: typo + * Fixed (BREAKING): race between socket close and async calls by stream + + -- Kirill Smorodinnikov Thu, 19 Apr 2018 11:31:41 +0300 + cocaine-core (0.12.14.25) unstable; urgency=low * Fixed: typo diff --git a/include/cocaine/rpc/asio/readable_stream.hpp b/include/cocaine/rpc/asio/readable_stream.hpp index 1b499b324..6715e2a63 100644 --- a/include/cocaine/rpc/asio/readable_stream.hpp +++ b/include/cocaine/rpc/asio/readable_stream.hpp @@ -22,6 +22,7 @@ #define COCAINE_IO_BUFFERED_READABLE_STREAM_HPP #include "cocaine/errors.hpp" +#include "cocaine/locked_ptr.hpp" #include "cocaine/memory.hpp" #include @@ -55,6 +56,8 @@ class readable_stream: decoder_type m_decoder; + synchronized m_stopped{false}; + public: explicit readable_stream(const std::shared_ptr& socket): @@ -66,6 +69,11 @@ class readable_stream: void read(message_type& message, handler_type handle) { + auto stopped = m_stopped.synchronize(); + if (*stopped) { + return; + } + std::error_code ec; const size_t @@ -90,7 +98,7 @@ class readable_stream: if(bytes_pending * 2 >= m_ring.size()) { // The total size of unprocessed data in larger than half the size of the ring, so grow - // the ring in order to accomodate more data. + // the ring in order to accommodate more data. m_ring.resize(m_ring.size() * 2); } @@ -107,9 +115,21 @@ class readable_stream: return m_ring.size(); } + void + stop() { + m_stopped.apply([] (bool& stopped) { + stopped = true; + }); + } + private: void fill(message_type& message, handler_type handle, const std::error_code& ec, size_t bytes_read) { + auto stopped = m_stopped.synchronize(); + if (*stopped) { + return; + } + if(ec) { if(ec == asio::error::operation_aborted) { return; diff --git a/include/cocaine/rpc/asio/transport.hpp b/include/cocaine/rpc/asio/transport.hpp index 62377fa7a..b2eb70444 100644 --- a/include/cocaine/rpc/asio/transport.hpp +++ b/include/cocaine/rpc/asio/transport.hpp @@ -57,6 +57,8 @@ struct transport { ~transport() { try { + reader->stop(); + writer->stop(); socket->shutdown(socket_type::shutdown_both); socket->close(); } catch(...) { diff --git a/include/cocaine/rpc/asio/writable_stream.hpp b/include/cocaine/rpc/asio/writable_stream.hpp index 666e874f7..741407005 100644 --- a/include/cocaine/rpc/asio/writable_stream.hpp +++ b/include/cocaine/rpc/asio/writable_stream.hpp @@ -22,6 +22,7 @@ #define COCAINE_IO_BUFFERED_WRITABLE_STREAM_HPP #include "cocaine/errors.hpp" +#include "cocaine/locked_ptr.hpp" #include "cocaine/trace/trace.hpp" #include @@ -56,6 +57,8 @@ class writable_stream: encoder_type encoder; + synchronized m_stopped{false}; + public: explicit writable_stream(const std::shared_ptr& socket): @@ -65,6 +68,11 @@ class writable_stream: void write(const message_type& message, handler_type handle) { + auto stopped = m_stopped.synchronize(); + if (*stopped) { + return; + } + size_t bytes_written = 0; auto encoded = encoder.encode(message); @@ -103,9 +111,20 @@ class writable_stream: return asio::buffer_size(m_messages); } + void stop() { + m_stopped.apply([] (bool& stopped) { + stopped = true; + }); + } + private: void flush(const std::error_code& ec, size_t bytes_written) { + auto stopped = m_stopped.synchronize(); + if (*stopped) { + return; + } + if(ec) { if(ec == asio::error::operation_aborted) { return;