Skip to content

Commit

Permalink
Revert changes in UnixStreamSocketHandle because we are not ready f…
Browse files Browse the repository at this point in the history
…or them
  • Loading branch information
ibc committed May 3, 2024
1 parent 09e1b83 commit 3345f2f
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 59 deletions.
4 changes: 1 addition & 3 deletions worker/include/handles/UnixStreamSocketHandle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class UnixStreamSocketHandle
virtual ~UnixStreamSocketHandle();

public:
void Close();
bool IsClosed() const
{
return this->closed;
Expand All @@ -50,9 +51,6 @@ class UnixStreamSocketHandle
uint32_t GetRecvBufferSize() const;
void SetRecvBufferSize(uint32_t size);

private:
void InternalClose();

/* Callbacks fired by UV events. */
public:
void OnUvReadAlloc(size_t suggestedSize, uv_buf_t* buf);
Expand Down
9 changes: 5 additions & 4 deletions worker/src/Channel/ChannelSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ namespace Channel
{
Close();
}

delete this->consumerSocket;
delete this->producerSocket;
}

void ChannelSocket::Close()
Expand All @@ -102,14 +105,12 @@ namespace Channel

if (this->consumerSocket)
{
delete this->consumerSocket;
this->consumerSocket = nullptr;
this->consumerSocket->Close();
}

if (this->producerSocket)
{
delete this->producerSocket;
this->producerSocket = nullptr;
this->producerSocket->Close();
}
}

Expand Down
106 changes: 54 additions & 52 deletions worker/src/handles/UnixStreamSocketHandle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,62 @@ UnixStreamSocketHandle::~UnixStreamSocketHandle()

if (!this->closed)
{
InternalClose();
Close();
}

delete[] this->buffer;
}

// NOTE: In UnixStreamSocketHandle we need a poublic Close() method and cannot
// just rely on the destructor plus a private InternalClose() method.
void UnixStreamSocketHandle::Close()
{
MS_TRACE_STD();

if (this->closed)
{
return;
}

int err;

this->closed = true;

// Tell the UV handle that the UnixStreamSocketHandle has been closed.
this->uvHandle->data = nullptr;

if (this->role == UnixStreamSocketHandle::Role::CONSUMER)
{
// Don't read more.
err = uv_read_stop(reinterpret_cast<uv_stream_t*>(this->uvHandle));

if (err != 0)
{
MS_ABORT("uv_read_stop() failed: %s", uv_strerror(err));
}
}

// If there is no error and the peer didn't close its pipe side then close gracefully.
if (this->role == UnixStreamSocketHandle::Role::PRODUCER && !this->hasError && !this->isClosedByPeer)
{
// Use uv_shutdown() so pending data to be written will be sent to the peer before closing.
auto* req = new uv_shutdown_t;
req->data = static_cast<void*>(this);
err = uv_shutdown(
req, reinterpret_cast<uv_stream_t*>(this->uvHandle), static_cast<uv_shutdown_cb>(onShutdown));

if (err != 0)
{
MS_ABORT("uv_shutdown() failed: %s", uv_strerror(err));
}
}
// Otherwise directly close the socket.
else
{
uv_close(reinterpret_cast<uv_handle_t*>(this->uvHandle), static_cast<uv_close_cb>(onClosePipe));
}
}

void UnixStreamSocketHandle::Write(const uint8_t* data, size_t len)
{
MS_TRACE_STD();
Expand Down Expand Up @@ -270,54 +320,6 @@ void UnixStreamSocketHandle::SetRecvBufferSize(uint32_t size)
}
}

void UnixStreamSocketHandle::InternalClose()
{
MS_TRACE_STD();

if (this->closed)
{
return;
}

int err;

this->closed = true;

// Tell the UV handle that the UnixStreamSocketHandle has been closed.
this->uvHandle->data = nullptr;

if (this->role == UnixStreamSocketHandle::Role::CONSUMER)
{
// Don't read more.
err = uv_read_stop(reinterpret_cast<uv_stream_t*>(this->uvHandle));

if (err != 0)
{
MS_ABORT("uv_read_stop() failed: %s", uv_strerror(err));
}
}

// If there is no error and the peer didn't close its pipe side then close gracefully.
if (this->role == UnixStreamSocketHandle::Role::PRODUCER && !this->hasError && !this->isClosedByPeer)
{
// Use uv_shutdown() so pending data to be written will be sent to the peer before closing.
auto* req = new uv_shutdown_t;
req->data = static_cast<void*>(this);
err = uv_shutdown(
req, reinterpret_cast<uv_stream_t*>(this->uvHandle), static_cast<uv_shutdown_cb>(onShutdown));

if (err != 0)
{
MS_ABORT("uv_shutdown() failed: %s", uv_strerror(err));
}
}
// Otherwise directly close the socket.
else
{
uv_close(reinterpret_cast<uv_handle_t*>(this->uvHandle), static_cast<uv_close_cb>(onClosePipe));
}
}

inline void UnixStreamSocketHandle::OnUvReadAlloc(size_t /*suggestedSize*/, uv_buf_t* buf)
{
MS_TRACE_STD();
Expand Down Expand Up @@ -368,7 +370,7 @@ inline void UnixStreamSocketHandle::OnUvRead(ssize_t nread, const uv_buf_t* /*bu
this->isClosedByPeer = true;

// Close local side of the pipe.
InternalClose();
Close();

// Notify the subclass.
UserOnUnixStreamSocketClosed();
Expand All @@ -381,7 +383,7 @@ inline void UnixStreamSocketHandle::OnUvRead(ssize_t nread, const uv_buf_t* /*bu
this->hasError = true;

// Close the socket.
InternalClose();
Close();

// Notify the subclass.
UserOnUnixStreamSocketClosed();
Expand All @@ -399,7 +401,7 @@ inline void UnixStreamSocketHandle::OnUvWriteError(int error)

MS_ERROR_STD("write error, closing the pipe: %s", uv_strerror(error));

InternalClose();
Close();

// Notify the subclass.
UserOnUnixStreamSocketClosed();
Expand Down

0 comments on commit 3345f2f

Please sign in to comment.