Gracefully stopping consumers? #517
Comments
The way we've handled this in fedora-messaging is to register signal handlers. I'd recommend that approach. Handle the signal by setting a "stop" flag, and if that flag is already set (e.g. the signal got sent twice) stop immediately.
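A minimal sketch of the stop-flag pattern described above, using only the standard library. `StopController` and the `force_stop` callback are hypothetical names chosen for illustration; this is not the fedora-messaging implementation.

```python
import signal
import threading


class StopController:
    """Records shutdown requests delivered by signals (hypothetical helper)."""

    def __init__(self, force_stop):
        # Set on the first signal; the work loop is expected to poll it.
        self.stop_requested = threading.Event()
        # Callable invoked when a signal arrives while a stop is already pending.
        self._force_stop = force_stop

    def handle(self, signum, frame):
        if self.stop_requested.is_set():
            # Signal sent twice: stop immediately.
            self._force_stop()
            return
        # First signal: only set the flag and let in-progress work finish.
        self.stop_requested.set()

    def install(self, signums=(signal.SIGTERM, signal.SIGINT)):
        # signal.signal() must be called from the main thread.
        for signum in signums:
            signal.signal(signum, self.handle)
```

The handler itself does almost nothing, which keeps it safe to run at an arbitrary point in the main thread; the consumer decides when to act on `stop_requested`.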
@jeremycline, ah that's clever because I can do that on my consumer directly (I think) without having to mess with fedmsg/moksha. Thanks, I'll look into that.
I think I'm getting closer to this. I've added a signal handler like this in my consumer:

    class MyConsumer(fedmsg.consumers.FedmsgConsumer):
        def __init__(self, hub):
            ...
            signal.signal(signal.SIGTERM, self._on_sigterm)

        def _on_sigterm(self, signum, frame):
            log.info('Received signal %d', signum)
            if signum != signal.SIGTERM:
                log.warning('Signal is unexpected, doing nothing')
                return
            # Do not consume more messages from UMB
            self.hub.unsubscribe(self._consume)
            # Finish up what's already in queue
            while not self.incoming.empty():
                # TODO: Break if configured timeout expires
                time.sleep(1)
            # TODO: Ensure that there are no active queue items being processed - not sure what to check
            # Disconnect from hub and do some cleanup (Yikes! this is ugly)
            self.stop()
            self.hub.stop()
            from moksha.hub.reactor import reactor
            reactor.stop()

After calling ... The only missing piece is checking for running threads before killing the hub/reactor. The reason is that I'd like to publish a message at the end of my consume method. If I kill the hub/reactor first, no messages will be published. I'll keep digging to see if there's a way.
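One possible way to know whether any message is still being processed is to count in-flight work yourself. The sketch below is an assumption, not part of fedmsg or moksha: `InFlightTracker` is a hypothetical helper that a consume method would enter with a `with` block, and that the shutdown path would wait on before stopping the hub and reactor.

```python
import threading


class InFlightTracker:
    """Counts units of work currently being processed (hypothetical helper)."""

    def __init__(self):
        self._cond = threading.Condition()
        self._active = 0

    def __enter__(self):
        with self._cond:
            self._active += 1
        return self

    def __exit__(self, exc_type, exc, tb):
        with self._cond:
            self._active -= 1
            # Wake anyone blocked in wait_idle().
            self._cond.notify_all()
        return False  # do not swallow exceptions from the work itself

    def wait_idle(self, timeout=None):
        """Block until nothing is in flight; return False if the timeout expires."""
        with self._cond:
            return self._cond.wait_for(lambda: self._active == 0, timeout=timeout)
```

Under this assumption, the consume method would wrap its body in `with tracker:` (including the final publish), and the shutdown code would call `tracker.wait_idle(timeout=...)` from a thread other than the one doing the work, before stopping anything.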
I have a consumer that takes about 20 minutes to process a received message. During this period the consumer should not be stopped. If I need to deploy a new version of the consumer, for example, things become challenging.
I'd like a way to signal fedmsg to stop consuming new messages; wait for consumers to finish processing current messages; then finally stop fedmsg. Ideally a configurable max wait timeout would control this.
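The flow requested here (stop taking new messages, let the current one finish, give up after a maximum wait) can be sketched generically with a worker thread and a stop flag. This is an illustration with the standard library only; `worker_loop`, `shutdown`, and their parameters are hypothetical names, not fedmsg APIs.

```python
import queue
import threading


def worker_loop(inbox, stop_requested, handle, poll_interval=0.1):
    """Process messages until a stop is requested."""
    while not stop_requested.is_set():
        try:
            message = inbox.get(timeout=poll_interval)
        except queue.Empty:
            continue
        # The flag is only checked between messages, so a message that has
        # started processing is never interrupted.
        handle(message)


def shutdown(worker, stop_requested, max_wait):
    """Request a stop and wait up to max_wait seconds for the worker.

    Returns True if the worker finished in time, False if the caller
    should fall back to a forced stop.
    """
    stop_requested.set()
    worker.join(timeout=max_wait)
    return not worker.is_alive()
```

With a 20-minute handler, `max_wait` would need to be at least that long for a clean stop; messages still sitting in the inbox are left unprocessed in this sketch.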
Before I delve into the code base, does this approach make sense?