-
Notifications
You must be signed in to change notification settings - Fork 284
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor events publisher #1818
Conversation
25985fc
to
f1bf435
Compare
ae8df9c
to
a23f668
Compare
""" | ||
logger.info("Declaring exchange %s", self.exchange_name) | ||
if len(self.unsent_events): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if len(self.unsent_events): | |
if self.unsent_events: |
broker_connection = self.get_connection() | ||
sent_events = 0 | ||
logger.info("Sending %i not sent messages", len(self.unsent_events)) | ||
for unsent_message in list(self.unsent_events): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
list(self.unsent_events)
This is to prevent race conditions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was by mistake, I don't know why... but you touch a very interesting topic because two or more threads can call this function and duplicate events, I'll handle it.
if self.unsent_events: | ||
# Avoid race conditions | ||
unsent_events = self.unsent_events | ||
self.unsent_events = [] | ||
broker_connection = self.get_connection() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Get the connection before unsent_events = self.unsent_events
, in case there's a problem getting the connection
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very good catch!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Close #1814
General description
The focus of this pull request is to improve the communication between safe-transaction-service and the general-
RabbitMQ
, used as events broker.The communication issues were:
RabbitMQ
from celery taskConnection issues with
RabbitMQ
After some research, we could see that the connection issues were shared socket resources between
gevent greenlets
fromcelery
andpika greenlet
, causing a loop of tries connecting and disconnecting.ConcurrentObjectUseError('This socket is already used by another greenlet: <bound method Waiter.switch of <gevent._gevent_c_waiter.Waiter object at 0xffff9d30a9d0>>')
Some messages/events were lost
After create a test that send (delayed with celery) 10.000 of events with an incremental id and forcing to close the client connection from RabbitMQ management, I observed delayed event disconnection. During this delay several messages were sent without detecting the connection issue, more or less 500 events were lost.
This could be handled forcing ACK to each message from RabbitMQ, but I don't focus in this solution because the previous point.
Delayed events when we have high volume of events
Publish events from a task is slower than publish directly, the reason can be that to add a task that publish an event is as follows:
RabbitMQ
the task to publish the eventgeneral-rabbitmq
the event.vs:
general-rabbitMQ
Proposed solution on this PR
BlockingConnection
instead asynchronous pikaGeventConnection
RabbitMQ
BlockingConnection
cannot be shared between threads we must to create a pool of connections.Benchmark
To do this benchmark we use asynchronous connection with ACK and synchronous connection with ACK.
The time calculation is the difference of instant time of last message sent and acknowledge less time instant of first message.
Asynchronous 100K messages takes 22 seconds
Synchronous 100K messages takes 27 seconds
It is important to mention and in asynchronous connection ACK is handled in other thread that means publish this quantity of messages takes around 800ms vs 27 seconds of
BlockingConnection
.