import logging
import inspect
import json
import os
import psutil as ps
import signal
import time
import multiprocessing as mp
from botocore.exceptions import ClientError, EndpointConnectionError, NoCredentialsError, NoRegionError
from dataactcore.aws.sqsHandler import sqs_queue
from dataactvalidator.validator_logging import log_job_message
from dataactvalidator.queue_exceptions import (
QueueWorkDispatcherError,
QueueWorkerProcessError,
ExecutionTimeout,
)
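
# NOTE (illustrative assumption): ExecutionTimeout is used below as a context manager that raises
# TimeoutError if its body runs longer than the given number of seconds. A minimal signal.alarm-based
# sketch of such a context manager (not necessarily the project's actual implementation):
#
#     class ExecutionTimeout:
#         def __init__(self, seconds=0):
#             self.seconds = seconds
#
#         def __enter__(self):
#             # Arrange for SIGALRM to fire after the allotted seconds and raise in this process
#             signal.signal(signal.SIGALRM, self._raise_timeout)
#             signal.alarm(self.seconds)
#
#         def __exit__(self, *args):
#             signal.alarm(0)  # cancel any pending alarm on the way out
#
#         def _raise_timeout(self, signum, frame):
#             raise TimeoutError("Execution took longer than the allotted {} seconds".format(self.seconds))
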
# Not a complete list of signals
# NOTE: 1-15 are relatively standard on unix platforms; > 15 can change from platform to platform
BSD_SIGNALS = {
1: "SIGHUP [1] (Hangup detected on controlling terminal or death of controlling process)",
2: "SIGINT [2] (Interrupt from keyboard)",
3: "SIGQUIT [3] (Quit from keyboard)",
4: "SIGILL [4] (Illegal Instruction)",
6: "SIGABRT [6] (Abort signal from abort(3))",
8: "SIGFPE [8] (Floating point exception)",
9: "SIGKILL [9] (Non-catchable, non-ignorable kill)",
11: "SIGSEGV [11] (Invalid memory reference)",
14: "SIGALRM [14] (Timer signal from alarm(2))",
15: "SIGTERM [15] (Software termination signal)",
19: "SIGSTOP [19] (Suspend process execution)", # NOTE: 17 on Mac OSX, 19 on RHEL
20: "SIGTSTP [20] (Interrupt from keyboard to suspend (CTRL-Z)", # NOTE: 18 on Mac OSX, 20 on RHEL
}


class SQSWorkDispatcher:
""" SQSWorkDispatcher object that is used to pull work from an SQS queue, and then dispatch it to be
executed on a child worker process.
This has the benefit of the worker process terminating after completion of the work, which will clear all
resources used during its execution, and return memory to the operating system. It also allows for periodically
monitoring the progress of the worker process from the parent process, and extending the SQS
    VisibilityTimeout, so that the message does not prematurely become visible again and/or get moved to the
    Dead Letter Queue
Attributes:
        EXIT_SIGNALS (List[int]): Delineates each of the signals we want to handle, which represent cases where
            the work being performed is prematurely halted and the process will exit
"""
_logger = logging.getLogger(__name__)
EXIT_SIGNALS = [signal.SIGHUP, signal.SIGABRT, signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
# NOTE: We are not handling signal.SIGSTOP or signal.SIGTSTP because those are suspensions.
# There may be valid cases to suspend and then later resume the process.
    # It is even done here via psutil's Process.suspend() to suspend during cleanup.
    # So we do NOT want to handle that

def __init__(self, sqs_queue_instance, worker_process_name=None, default_visibility_timeout=60,
long_poll_seconds=None, monitor_sleep_time=5, exit_handling_timeout=30,
worker_can_start_child_processes=False):
"""
Args:
sqs_queue_instance (SQS.Queue): the SQS queue to get work from
worker_process_name (str): the name to give to the worker process. It will use the name of the callable
job to be executed if not provided
default_visibility_timeout (int): how long until the message is made visible in the queue again,
                for other consumers. If the queue only allows 1 receive attempt, this may end up directing the
                message to the Dead Letter queue when the next consumer attempts to receive it.
            long_poll_seconds (int): how many seconds to wait for a message to arrive when polling the queue.
                Defaults to None, which means it should honor the value configured on the SQS queue
            monitor_sleep_time (float): how often, in seconds, to check up on the status of the worker process
            exit_handling_timeout (int): expected window of time during which cleanup should complete (not
                guaranteed). For example, this would be the time to finish cleanup before the message is re-queued
                or DLQ'd
worker_can_start_child_processes (bool): Controls the multiprocessing.Process daemon attribute. When
``daemon=True``, it would ensure that if the parent dispatcher dies, it will kill its child
worker. However, from the docs "a daemonic process is not allowed to create child processes",
because termination is not cascaded, so a terminated parent killing a direct child would leave
orphaned grand-children. So if the worker process needs to start its own child processes in order to
do its work, set this to ``True``, so that ``Process.daemon`` will be set to ``False``,
allowing the creation of grandchild processes from the dispatcher's child worker process.
"""
self.sqs_queue_instance = sqs_queue_instance
self.worker_process_name = worker_process_name
self._default_visibility_timeout = default_visibility_timeout
self._monitor_sleep_time = monitor_sleep_time
self._exit_handling_timeout = exit_handling_timeout
self._worker_can_start_child_processes = worker_can_start_child_processes
self._current_sqs_message = None
self._worker_process = None
self._job_args = ()
self._job_kwargs = {}
self._exit_handler = None
self._parent_dispatcher_pid = os.getppid()
self._sqs_heartbeat_log_period_seconds = 15 # log the heartbeat extension of visibility at most this often
self._long_poll_seconds = 0 # if nothing is set anywhere, it defaults to 0 (short-polling)
if long_poll_seconds:
self._long_poll_seconds = long_poll_seconds
else:
receive_message_wait_time_seconds = self.sqs_queue_instance.attributes.get("ReceiveMessageWaitTimeSeconds")
if receive_message_wait_time_seconds:
self._long_poll_seconds = int(receive_message_wait_time_seconds)
# Flag used to prevent handling an exit signal more than once, if multiple signals come in succession
# True when the parent dispatcher process is handling one of the signals in EXIT_SIGNALS, otherwise False
self._handling_exit_signal = False
# Flagged True when exit handling should lead to an exit of the parent dispatcher process
self._dispatcher_exiting = False
if self._monitor_sleep_time >= self._default_visibility_timeout:
msg = "_monitor_sleep_time must be less than _default_visibility_timeout. " \
"Otherwise job duplication can occur"
raise QueueWorkDispatcherError(
msg, worker_process_name=self.worker_process_name, queue_message=self._current_sqs_message
)
# Map handler functions for each of the exit signals we want to handle on the parent dispatcher process
for sig in self.EXIT_SIGNALS:
            signal.signal(sig, self._handle_exit_signal)

@property
def is_exiting(self):
""" bool: True when this parent dispatcher process has received a signal that will lead to the process
exiting
"""
        return self._dispatcher_exiting

@property
def allow_retries(self):
""" bool: Determine from the provided queue if retries of messages should be performed
If False, the message will not be returned to the queue for retries by other consumers
"""
redrive_policy = self.sqs_queue_instance.attributes.get("RedrivePolicy")
if not redrive_policy:
return True # Messages in queues without a redrive policy basically have endless retries
redrive_json = json.loads(redrive_policy)
retries = redrive_json.get("maxReceiveCount")
        return bool(retries and retries > 1)

@property
def message_try_attempt(self):
""" int: Give an approximation of which attempt this is to process this message
Will return None if the message has not been received yet or if the receive count is not being tracked
This can be used to determine if this is a retry of this message (if return value > 1)
"""
if not self._current_sqs_message:
return None
# get the non-user-defined (queue-defined) attributes on the message
q_msg_attr = self._current_sqs_message.attributes
if q_msg_attr.get("ApproximateReceiveCount") is None:
return None
else:
            return int(q_msg_attr.get("ApproximateReceiveCount"))

def _dispatch(self, job, worker_process_name=None, exit_handler=None, *job_args, **job_kwargs):
""" Dispatch work to be performed in a newly started worker process.
Execute the callable job that will run in a separate child worker process. The worker process
will be monitored so that heartbeats can be sent to SQS to allow it to keep going.
Args:
job (Callable): Callable to use as the target of a new child process
worker_process_name (str): Name given to the newly created child process. If not already set,
defaults to the name of the provided job callable
exit_handler: a callable to be called when handling an :attr:`EXIT_SIGNALS` signal, giving the
opportunity to perform cleanup before the process exits. When invoked, it gets the same args passed
to it that were passed to the :param:`job` callable.
job_args: Zero or many variadic args that are unnamed (not keyword) that can be
passed to the callable job. NOTE: Passing args to the job this
way can be done when needed, but it is preferred to use keyword-args through :param:`job_kwargs`
to be more explicit what arg values are going to what params.
job_kwargs: Zero or many variadic keyword-args that can be passed to the callable job
Returns:
bool: True if a message was found on the queue and dispatched to completion, without error. Otherwise it
is an error condition and will raise an exception.
Raises:
AttributeError: If this is called before setting self._current_sqs_message
Others: Exceptions raised by :meth:`_monitor_work_progress`
"""
if self._current_sqs_message is None:
raise AttributeError("Cannot dispatch work when there is no current message in self._current_sqs_message")
self.worker_process_name = worker_process_name or self.worker_process_name or job.__name__
log_job_message(
logger=self._logger,
message="Creating and starting worker process named [{}] to invoke callable [{}] "
"with args={} and kwargs={}".format(self.worker_process_name, job, job_args, job_kwargs)
)
# Set the exit_handler function if provided, or reset to None
# Also save the job_args, and job_kwargs, which will be passed to the exit_handler
self._exit_handler = exit_handler
self._job_args = job_args
self._job_kwargs = job_kwargs
# Use the 'fork' method to create a new child process.
# This shares the same python interpreter and memory space and references as the parent process
# A side-effect of that is that it inherits the signal-handlers of the parent process. So wrap the job to be
# executed with some before-advice that resets the signal-handlers to python-defaults within the child process
ctx = mp.get_context("fork")
def signal_reset_wrapper(func, args, kwargs):
# Reset signal handling to defaults in process that calls this
for sig in self.EXIT_SIGNALS:
signal.signal(sig, signal.SIG_DFL)
# Then execute the given function with the given args
func(*args, **kwargs)
self._worker_process = ctx.Process(
name=self.worker_process_name,
target=signal_reset_wrapper,
args=(job, job_args, job_kwargs),
daemon=not self._worker_can_start_child_processes,
)
self._worker_process.start()
log_job_message(
logger=self._logger,
message="Worker process named [{}] started with process ID [{}]".format(
self.worker_process_name,
self._worker_process.pid
),
is_debug=True
)
self._monitor_work_progress()
        return True

def dispatch(self, job, *additional_job_args, message_transformer=lambda msg: msg.body, worker_process_name=None,
exit_handler=None, **additional_job_kwargs):
""" Get work from the queue and dispatch it in a newly started worker process.
Poll the queue for a message to work on. Any additional (not named in the signature) args or additional
(not named in the signature) keyword-args will be combined with the args/kwargs provided by the
message_transformer. This complete set of arguments is provided to the callable :param:`job` that will be
executed in a separate worker process. The worker process will be monitored so that heartbeats can be
sent to the SQS to allow it to keep going.
Args:
job (Callable[]): the callable to execute as work for the job
additional_job_args: Zero or many variadic args that are unnamed (not keyword) that may be
passed along with those from the message to the callable job. NOTE: Passing args to the job this
way can be done when needed, but it is preferred to use keyword-args through
:param:`additional_job_kwargs` to be more explicit what arg values are going to what params.
message_transformer: A lambda to extract arguments to be passed to the callable from the queue message.
By default, a function that just returns the message body is used. A provided transformer can
return a single object as its arg, a tuple of args, or a dictionary of named args (which will
be used as keyword-args in the job invocation). This is a keyword-only argument that can only be
passed with its name: ``message_transformer=xxx``
worker_process_name: Name given to the newly created child process. If not already set, defaults to
the name of the provided job callable. This is a keyword-only argument that can only be
passed with its name: ``worker_process_name=abc``
exit_handler: a callable to be called when handling an :attr:`EXIT_SIGNALS` signal, giving the
opportunity to perform cleanup before the process exits. When invoked, it gets the same args passed
to it that were passed to the :param:`job` callable.
additional_job_kwargs: Zero or many variadic keyword-args that may be passed along with those from
the message to the callable job
Examples:
Given job function::
def my_job(a, b, c, d):
pass # do something
The preferred way to dispatch it is with a message_transformer that returns a dictionary and,
if necessary, additional args passed as keyword args::
dispatcher.dispatch(my_job,
message_transformer=lambda msg: {"a": msg.body, "b": db.get_org(msg.body)},
c=some_tracking_id,
d=datetime.datetime.now())
Returns:
bool: True if a message was found on the queue and dispatched, otherwise False if nothing on the queue
Raises:
SystemExit(1): If it can't connect to the queue or receive messages
QueueWorkerProcessError: Under various conditions where the child worker process fails to run the job
QueueWorkDispatcherError: Under various conditions where the parent dispatcher process can't
orchestrate work execution, monitoring, or exit-signal-handling
"""
self._dequeue_message(self._long_poll_seconds)
if self._current_sqs_message is None:
return False
job_args = ()
job_kwargs = additional_job_kwargs # important that this be set as default, in case msg_args is not also a dict
msg_args = message_transformer(self._current_sqs_message)
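        # Illustrative transformer-result shapes (hypothetical values) and how they map to the job call:
        #   msg.body          -> job(msg.body, *additional_job_args, **additional_job_kwargs)
        #   (msg.body, "x")   -> job(msg.body, "x", *additional_job_args, **additional_job_kwargs)
        #   {"a": msg.body}   -> job(a=msg.body, **additional_job_kwargs)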
if isinstance(msg_args, list) or isinstance(msg_args, tuple):
job_args = tuple(msg_args) + additional_job_args
elif isinstance(msg_args, dict):
if msg_args.keys() & additional_job_kwargs.keys():
raise KeyError("message_transformer produces named keyword args that are duplicative of those in "
"additional_job_kwargs and would be overwritten")
job_kwargs = {**msg_args, **additional_job_kwargs}
else:
job_args = (msg_args,) + additional_job_args
        return self._dispatch(job, worker_process_name, exit_handler, *job_args, **job_kwargs)

def dispatch_by_message_attribute(self, message_transformer, *additional_job_args, worker_process_name=None,
**additional_job_kwargs):
""" Use a provided function to derive the callable job and its arguments from attributes within the queue
message
Args:
message_transformer (Callable[[SQS.Message], dict]): A callable function that takes in the SQS
message as its argument and returns a dict of::
{
'_job': Callable, # Required. The job to run
'_exit_handler': Callable # Optional. A callable to be called when handling an
:attr:`EXIT_SIGNALS` signal, giving the opportunity to
perform cleanup before the process exits. Gets the ``job_args``
and any other items in this dict as args passed to it when run.
'_job_args': tuple, # Optional. Partial or full collection of job args as a list or
tuple.
'named_job_arg1': Any, # Optional. A named argument to be used as a keyword arg when
calling the job. ``named_job_arg1`` is representative, and the
actual names of the ``_job``'s params should be used here.
This is the preferred way to pass args to the ``_job``
                    'named_job_argN': Any    # Optional. Same as above. As many as are needed.
}
worker_process_name (str): Name given to the newly created child process. If not already set, defaults
to the name of the provided job Callable
additional_job_args: Zero or many variadic args that are unnamed (not keyword) that may be
passed along with those from the message to the callable job. NOTE: Passing args to the job this
way can be done when needed, but it is preferred to use keyword-args through
:param:`additional_job_kwargs` to be more explicit what arg values are going to what params.
additional_job_kwargs: Zero or many variadic keyword-args that may be passed along with those from
the message to the callable job
Examples:
Given job function::
def my_job(a, b, c, d):
pass # do something
The preferred way to dispatch it is with a ``message_transformer`` that returns a dictionary and,
if necessary, additional args passed as keyword args::
def work_routing_message_transformer(msg):
job = job_strategy_factory.from(msg.message_attributes["origin"]["StringValue"])
return {"_job": job,
"a": msg.body,
"b": db.get_org(msg.body)}
dispatcher.dispatch_by_message_attribute(work_routing_message_transformer,
c=some_tracking_id,
d=datetime.datetime.now())
Returns:
bool: True if a message was found on the queue and dispatched, otherwise False if nothing on the queue
Raises:
SystemExit(1): If it can't connect to the queue or receive messages
QueueWorkerProcessError: Under various conditions where the child worker process fails to run the job
QueueWorkDispatcherError: Under various conditions where the parent dispatcher process can't
orchestrate work execution, monitoring, or exit-signal-handling
"""
self._dequeue_message(self._long_poll_seconds)
if self._current_sqs_message is None:
return False
job_args = ()
job_kwargs = {}
results = message_transformer(self._current_sqs_message)
def parse_message_transformer_results(_job, _exit_handler=None, _job_args=(), **_job_kwargs):
""" Use to map dictionary items to this functions params, and packs up remaining items in a separate
dictionary. Then returns the organized data as a 4-tuple
"""
return _job, _exit_handler, _job_args, _job_kwargs
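
        # Illustrative (hypothetical values): a message_transformer result of
        #   {"_job": my_job, "_job_args": (1, 2), "batch_id": 42}
        # unpacks above to (my_job, None, (1, 2), {"batch_id": 42})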
# Parse the result components from the returned dictionary
job, exit_handler, msg_args, msg_kwargs = parse_message_transformer_results(**results)
if isinstance(msg_args, list) or isinstance(msg_args, tuple):
job_args = tuple(msg_args) + additional_job_args
else:
job_args = (msg_args,) + additional_job_args # single element
if msg_kwargs.keys() & additional_job_kwargs.keys():
raise KeyError("message_transformer produces named keyword args that are duplicative of those in "
"additional_job_kwargs and would be overwritten")
job_kwargs = {**msg_kwargs, **additional_job_kwargs}
        return self._dispatch(job, worker_process_name, exit_handler, *job_args, **job_kwargs)

def _dequeue_message(self, wait_time):
""" Attempt to get a single message from the queue.
It will set this message in the `self._current_sqs_message` field if received, otherwise it leaves that None
Args:
wait_time: If no message is readily available, wait for this many seconds for one to arrive before
returning
"""
try:
# NOTE: Forcing MaxNumberOfMessages=1
# This will pull at most 1 message off the queue, or no messages. This dispatcher is built to dispatch
# one queue message at a time for work, such that one job at a time is handled by the child worker
# process. The best way to then scale up work-throughput is to have multiple consumers (e.g. multiple
# parent dispatcher processes on one machine, or multiple machines each running a parent dispatcher process)
# This may not be an ideal configuration for jobs that may expect to complete with sub-second performance,
# and handle a massive amount of message-throughput (e.g. 10+ messages/second, or 1M+ messages/day),
# where the added latency of connecting to the queue to fetch each message could add up.
received_messages = self.sqs_queue_instance.receive_messages(
WaitTimeSeconds=wait_time,
AttributeNames=["All"],
MessageAttributeNames=["All"],
VisibilityTimeout=self._default_visibility_timeout,
MaxNumberOfMessages=1,
)
except (EndpointConnectionError, ClientError, NoCredentialsError, NoRegionError) as conn_exc:
log_job_message(logger=self._logger, message="SQS connection issue. See Traceback and investigate settings",
is_exception=True)
raise SystemExit(1) from conn_exc
except Exception as exc:
log_job_message(logger=self._logger, message="Unknown error occurred when attempting to receive messages "
"from the queue. See Traceback",
is_exception=True)
raise SystemExit(1) from exc
if received_messages:
self._current_sqs_message = received_messages[0]
            log_job_message(logger=self._logger, message="Message received: {}".format(self._current_sqs_message.body))

def delete_message_from_queue(self):
""" Deletes the message from SQS. This is usually treated as a *successful* culmination of message handling,
so long as it was not previously copied to the Dead Letter Queue before deletion
Raises:
QueueWorkDispatcherError: If for some reason the boto3 delete() operation caused an error
"""
if self._current_sqs_message is None:
log_job_message(
logger=self._logger,
message="Message to delete does not exist. "
"Message might have previously been moved, released, or deleted",
is_warning=True
)
return
try:
self._current_sqs_message.delete()
self._current_sqs_message = None
except Exception as exc:
message = "Unable to delete SQS message from queue. " \
"Message might have previously been deleted upon completion or failure"
log_job_message(
logger=self._logger, message=message, is_exception=True)
raise QueueWorkDispatcherError(
message, worker_process_name=self.worker_process_name, queue_message=self._current_sqs_message
            ) from exc

def surrender_message_to_other_consumers(self, delay=0):
""" Return the message back into its original queue for other consumers to process it.
Does this by making it immediately visible (it was always there, just invisible).
See Also:
https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html#terminating-message-visibility-timeout # noqa
Args:
delay (int): How long to wait in seconds before message becomes visible to other consumers. Default
is 0 seconds (immediately consumable)
"""
# Protect against the scenario where one of many consumers is in the midst of a long-poll when an exit signal
# is received. If they are in this position, and the message is returned to the queue, they will dequeue it
# (receive it) when they should not have, because they should have been responding to the signal.
# They will respond to the signal, but not until after they return from receiving the message, and the message
# will be lost.
        # So, only make the message visible after any long-poll has returned
if self._long_poll_seconds and self._long_poll_seconds > 0:
delay = self._long_poll_seconds + 1
if self._current_sqs_message:
retried = 0
if self._current_sqs_message.attributes.get('ApproximateReceiveCount') is not None:
retried = int(self._current_sqs_message.attributes.get('ApproximateReceiveCount'))
log_job_message(
logger=self._logger,
message="Returning message to its queue for retry number {} in {} seconds. "
"If this exceeds the configured number of retries in the queue, it "
"will be moved to its Dead Letter Queue".format(retried + 1, delay),
)
            self._set_message_visibility(delay)

def move_message_to_dead_letter_queue(self):
""" Move the current message to the Dead Letter Queue.
Moves the message to the Dead Letter Queue associated with this worker's SQS queue via its RedrivePolicy.
        Then deletes the message from its originating queue.
Raises:
QueueWorkDispatcherError: Raise an error if there is no RedrivePolicy or a Dead Letter Queue is not
configured, specified, or connectible for this queue.
"""
if self._current_sqs_message is None:
log_job_message(
logger=self._logger,
message="Unable to move SQS message to the dead letter queue. No current message exists. "
"Message might have previously been moved, released, or deleted",
is_warning=True
)
return
dlq_name = "not discovered yet"
try:
redrive_policy = self.sqs_queue_instance.attributes.get("RedrivePolicy")
if not redrive_policy:
error = "Failed to move message to dead letter queue. Cannot get RedrivePolicy for SQS queue \"{}\". " \
"It was not set, or was not included as an attribute to be " \
"retrieved with this queue.".format(self.sqs_queue_instance)
log_job_message(logger=self._logger, message=error, is_error=True)
raise QueueWorkDispatcherError(
error, worker_process_name=self.worker_process_name, queue_message=self._current_sqs_message,
)
redrive_json = json.loads(redrive_policy)
dlq_arn = redrive_json.get("deadLetterTargetArn")
if not dlq_arn:
error = "Failed to move message to dead letter queue. " \
"Cannot find a dead letter queue in the RedrivePolicy " \
"for SQS queue \"{}\"".format(self.sqs_queue_instance)
log_job_message(logger=self._logger, message=error, is_error=True)
raise QueueWorkDispatcherError(
error, worker_process_name=self.worker_process_name, queue_message=self._current_sqs_message,
)
dlq_name = dlq_arn.split(':')[-1]
# Copy the message to the designated dead letter queue
dlq = sqs_queue(queue_name=dlq_name)
message_attr = self._current_sqs_message.message_attributes.copy() if \
self._current_sqs_message.message_attributes else {}
dlq_response = dlq.send_message(
MessageBody=self._current_sqs_message.body,
MessageAttributes=message_attr
)
except Exception as exc:
error = "Error occurred when parent dispatcher with PID [{}] tried to move the message " \
"for worker process with PID [{}] " \
"to the dead letter queue [{}].".format(os.getpid(), self._worker_process.pid, dlq_name)
log_job_message(logger=self._logger, message=error, is_exception=True)
raise QueueWorkDispatcherError(
error, worker_process_name=self.worker_process_name, queue_message=self._current_sqs_message,
) from exc
log_job_message(
logger=self._logger,
message="Message sent to dead letter queue \"{}\" "
"with [{}] response code. "
"Now deleting message from origin queue".format(dlq_name,
dlq_response['ResponseMetadata']['HTTPStatusCode']),
is_debug=True
)
        self.delete_message_from_queue()

def _monitor_work_progress(self):
""" Tracks the running or exit status of the child worker process.
There are four scenarios it detects when monitoring:
            1. It extends the message's VisibilityTimeout at the queue if it detects that the child worker
                process is still running
2. It sees the child worker process exited with a 0 exit code (success). It deletes the message from
the queue and logs success.
3. It sees the child worker process exited with a > 0 exit code. Handles this and raises
:exc:`QueueWorkerProcessError` to alert the caller.
4. It sees the child worker process exited with a < 0 exit code, indicating it received some kind of
signal. It calls the signal-handling code to handle this accordingly.
Raises:
QueueWorkerProcessError: When the child worker process was found to have exited with a > 0 exit
code.
Others: Anything raised by :meth:`_set_message_visibility`, :meth:`delete_message_from_queue`,
or :meth:`_handle_exit_signal`
"""
monitor_process = True
heartbeats = 0
while monitor_process:
monitor_process = False
if self._worker_process.is_alive():
# Process still working. Send "heartbeat" to SQS so it may continue
if (heartbeats * self._monitor_sleep_time) >= self._sqs_heartbeat_log_period_seconds:
log_job_message(
logger=self._logger,
message="Job worker process with PID [{}] is still running. "
"Renewing VisibilityTimeout of {} seconds".format(self._worker_process.pid,
self._default_visibility_timeout),
is_debug=True
)
heartbeats = 0
else:
heartbeats += 1
self._set_message_visibility(self._default_visibility_timeout)
time.sleep(self._monitor_sleep_time)
monitor_process = True
elif self._worker_process.exitcode == 0:
# If process exits with 0: success! Remove from queue
log_job_message(
logger=self._logger,
message="Job worker process with PID [{}] completed with 0 for exit code (success). Deleting "
"message from the queue".format(self._worker_process.pid)
)
self.delete_message_from_queue()
elif self._worker_process.exitcode > 0:
# If process exits with positive code, an ERROR occurred within the worker process.
# Don't delete the message, don't force retry, and don't force into dead letter queue.
# Let the VisibilityTimeout expire, and the queue will handle whether to retry or put in
# the DLQ based on its queue configuration.
# Raise an exception to give control back to the caller to allow any error-handling to take place there
message = "Job worker process with PID [{}] errored with exit code: {}.".format(
self._worker_process.pid,
self._worker_process.exitcode
)
log_job_message(logger=self._logger, message=message, is_error=True)
raise QueueWorkerProcessError(
message, worker_process_name=self.worker_process_name, queue_message=self._current_sqs_message,
)
elif self._worker_process.exitcode < 0:
                # If the process exits with a negative code, it was terminated by a signal, since
                # a multiprocessing.Process reports the negative value of that signal as its exitcode.
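                # e.g., an exitcode of -15 means the worker was terminated by SIGTERM (signal 15)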
signum = self._worker_process.exitcode * -1
# In the rare case where the child worker process's exit signal is detected before its parent
# process received the same signal, proceed with handling the child's exit signal
                self._handle_exit_signal(signum=signum, frame=None, parent_dispatcher_signaled=False)

def _handle_exit_signal(self, signum, frame, parent_dispatcher_signaled=True, is_retry=False):
""" Attempt to gracefully handle the exiting of the job as a result of receiving an exit signal.
NOTE: This handler is only expected to be run from the parent dispatcher process. It is an error
condition if it is invoked from the child worker process. Signals received in the child worker
process should not have this handler registered for those signals. Because signal handlers registered for a
parent process are inherited by forked child processes (see: "man 7 signal" docs and search for
"inherited"), this handler will initially be registered for each of this class's EXIT_SIGNALS; however,
those handlers are reset to their default as the worker process is started by a wrapper function around
the job to execute.
The signal very likely indicates a non-error failure scenario from which the job might be restarted or
rerun, if allowed. Handle cleanup, logging, job retry logic, etc.
NOTE: Order is important:
1. "Lock" (guard) this exit handler, to handle one exit signal only
2. Extend VisibilityTimeout by _exit_handling_timeout, to allow time for cleanup
3. Suspend child worker process, if alive, else skip (to not interfere with cleanup)
4. Execute pre-exit cleanup
5. Move the message (back to the queue, or to the dead letter queue)
6. Kill child worker process
7. Exit this parent dispatcher process if it received an exit signal
Args:
signum: number representing the signal received
frame: Frame passed in with the signal
parent_dispatcher_signaled: Assumed that this handler is being called as a result of signaling that
happened on the parent dispatcher process. If this was instead the parent detecting that the child
exited due to a signal, set this to False. Defaults to True.
            is_retry: If this is the 2nd and last try to handle the signal and perform pre-exit cleanup
Raises:
QueueWorkerProcessError: When the _exit_handling function cannot be completed in time to perform
pre-exit cleanup
QueueWorkerProcessError: If only the child process died (not the parent dispatcher), but the message it
was working was moved to the Dead Letter Queue because the queue is not configured to allow retries,
raise an exception so the parent process might mark the job as failed with its normal error-handling
process
            SystemExit: If the parent dispatcher process received an exit signal, this should be raised
                after handling the signal, as part of running os.kill on that process using the signal
                number that was originally handled
"""
# If the process handling this signal is the same as the started worker process, this *is* the worker process
is_worker_process = self._worker_process and self._worker_process.pid == os.getpid()
# If the signal is in BSD_SIGNALS, use the human-readable string, otherwise use the signal value
signal_or_human = BSD_SIGNALS.get(signum, signum)
if is_worker_process:
log_job_message(
logger=self._logger,
message="Worker process with PID [{}] received signal [{}] while running. "
"It should not be handled in this exit signal handler, "
"as it is meant to run from the parent dispatcher process."
"Exiting worker process with original exit signal.".format(os.getpid(), signal_or_human),
is_warning=True
)
self._kill_worker(with_signal=signum)
elif not parent_dispatcher_signaled:
log_job_message(
logger=self._logger,
message="Job worker process with PID [{}] exited due to signal with exit code: [{}], "
"after receiving exit signal [{}]. "
"Gracefully handling exit of job".format(
self._worker_process.pid,
self._worker_process.exitcode,
signal_or_human
),
is_error=True
)
            # Attempt to clean up children, but likely will be a no-op because the worker has already exited and
            # orphaned them
self._kill_worker(with_signal=signum, just_kill_descendants=True)
else: # dispatcher signaled
log_job_message(
logger=self._logger,
message="Parent dispatcher process with PID [{}] received signal [{}] while running. "
"Gracefully stopping job being worked".format(os.getpid(), signal_or_human),
is_error=True
)
# Make sure the parent dispatcher process exits after handling the signal by setting this flag
self._dispatcher_exiting = True
if self._handling_exit_signal and not is_retry:
# Don't allow the parent dispatcher process to handle more than one exit signal, unless this is a retry
# of the first attempt
return
self._handling_exit_signal = True
exit_handling_failed = False # flag to mark this kind of end-state of this handler
try:
            # Extend message visibility for as long as is given for the exit handling to process, so the message
            # does not get prematurely returned to the queue or moved to the dead letter queue.
# Give it a 5 second buffer in case retry mechanics put it over the timeout
self._set_message_visibility(self._exit_handling_timeout + 5)
if self._worker_process.is_alive():
worker = ps.Process(self._worker_process.pid)
if not is_retry:
# Suspend the child worker process so it does not conflict with doing cleanup in the exit_handler
log_job_message(
logger=self._logger,
message="Suspending worker process with PID [{}] during first try of exit_handler".format(
self._worker_process.pid),
is_debug=True
)
worker.suspend()
else:
# This is try 2. The cleanup was not able to complete on try 1 with the worker process merely
# suspended. There could be some kind of transaction deadlock. Kill the worker to clear the way
# for cleanup retry
log_job_message(
logger=self._logger,
message="Killing worker process with PID [{}] during second try of exit_handler".format(
self._worker_process.pid),
is_debug=True
)
self._kill_worker()
else:
try_attempt = "second" if is_retry else "first"
log_job_message(
logger=self._logger,
message="Worker process with PID [{}] is not alive during {} try of exit_handler".format(
self._worker_process.pid, try_attempt),
is_debug=True
)
if self._exit_handler is not None:
# Execute cleanup procedures to handle exiting of the worker process
# Wrap cleanup in a fixed timeout, and a retry (one time)
try:
with ExecutionTimeout(self._exit_handling_timeout):
# Call _exit_handler callable to do cleanup
queue_message_param = inspect.signature(self._exit_handler).parameters.get("queue_message")
arg_spec = inspect.getfullargspec(self._exit_handler)
if queue_message_param or arg_spec.varkw == "kwargs":
# it defines a param named "queue_message", or accepts kwargs,
# so pass along the message as "queue_message" in case it's needed
log_job_message(
logger=self._logger,
message="Invoking job exit_handler [{}] with args={}, kwargs={}, "
"and queue_message={}".format(self._exit_handler, self._job_args,
self._job_kwargs, self._current_sqs_message)
)
self._exit_handler(*self._job_args, **self._job_kwargs,
queue_message=self._current_sqs_message)
else:
log_job_message(
logger=self._logger,
message="Invoking job exit_handler [{}] with args={} "
"and kwargs={}".format(self._exit_handler, self._job_args, self._job_kwargs)
)
self._exit_handler(*self._job_args, **self._job_kwargs)
except TimeoutError:
exit_handling_failed = True
if not is_retry:
log_job_message(
logger=self._logger,
message="Could not perform cleanup during exiting of job in allotted "
"_exit_handling_timeout ({}s). "
"Retrying once.".format(self._exit_handling_timeout),
is_warning=True
)
# Attempt retry
self._handle_exit_signal(signum, frame, parent_dispatcher_signaled, is_retry=True)
else:
message = "Could not perform cleanup during exiting of job in allotted " \
"_exit_handling_timeout ({}s) after 2 tries. " \
"Raising exception.".format(self._exit_handling_timeout)
log_job_message(
logger=self._logger,
message=message,
is_error=True
)
raise QueueWorkDispatcherError(
message,
worker_process_name=self.worker_process_name,
queue_message=self._current_sqs_message
)
except Exception as exc:
exit_handling_failed = True
message = "Execution of exit_handler failed for unknown reason. See Traceback."
log_job_message(
logger=self._logger,
message=message,
is_exception=True
)
raise QueueWorkDispatcherError(
message, worker_process_name=self.worker_process_name, queue_message=self._current_sqs_message
) from exc
if self.allow_retries:
log_job_message(
logger=self._logger,
message="Dispatcher for this job allows retries of the message. "
"Attempting to return message to its queue."
)
self.surrender_message_to_other_consumers()
else:
# Otherwise, attempt to send message directly to the dead letter queue
log_job_message(
logger=self._logger,
message="Dispatcher for this job does not allow message to be retried. "
"Attempting to move message to its Dead Letter Queue."
)
self.move_message_to_dead_letter_queue()
# Now that SQS message has been handled, ensure worker process is dead
if self._worker_process.is_alive():
log_job_message(
logger=self._logger,
message="Message handled. Now killing worker process with PID [{}] "
"as it is alive but has no more work to do.".format(self._worker_process.pid),
is_debug=True
)
self._kill_worker()
finally:
if exit_handling_failed:
# Don't perform final logic if we got here after failing to execute the exit handler
if is_retry:
                    # Second time we've failed to process the exit_handler - log it and move the message to the DLQ
log_job_message(
logger=self._logger,
message="Graceful exit_handling failed after 2 attempts for job being worked by worker process "
"with PID [{}]. An attempt to put message directly on the Dead Letter Queue will be "
"made, because cleanup logic in the exit_handler could not be guaranteed "
"(retrying a message without prior cleanup "
"may compound problems).".format(self._worker_process.pid),
is_error=True
)
self.move_message_to_dead_letter_queue()
else:
# exit_handler not provided, or processed successfully
self._handling_exit_signal = False
if self._dispatcher_exiting:
# An exit signal was received by the parent dispatcher process.
# Continue with exiting the parent process, as per the original signal, after having handled it
log_job_message(
logger=self._logger,
message="Exiting from parent dispatcher process with PID [{}] "
"to culminate exit-handling of signal [{}]".format(os.getpid(), signal_or_human),
is_debug=True
)
                    # Simply doing sys.exit or raising SystemExit, even with a negative exit code, won't exit this
                    # process with the negative signal value, which is the Python convention for signal-caused exits.
                    # So override the assigned handler for the signal we're handling, resetting it back to default,
                    # and kill the process using that signal to get it to exit as it would if we never handled it
signal.signal(signum, signal.SIG_DFL)
os.kill(os.getpid(), signum)
elif not self.allow_retries:
# If only the child process died, but the message it was working was moved to the Dead Letter Queue
# because the queue is not configured to allow retries, raise an exception so the caller
# might mark the job as failed with its normal error-handling process
raise QueueWorkerProcessError(
"Worker process with PID [{}] was not able to complete its job due to "
"interruption from exit signal [{}]. The queue message for that job "
"was moved to the Dead Letter Queue, where it can be reviewed for a "
"manual restart, or other remediation.".format(self._worker_process.pid, signal_or_human))
def _set_message_visibility(self, new_visibility):
""" Sends a request back to the SQS API via boto3 to refresh the time that the currently-in-flight message
will be *invisible* to other consumers of the queue, such that they cannot also process it.
Examples:
If :param:`new_visibility` is 60, then in 60 seconds from when SQS receives the request, so long as
the ``VisibilityTimeout`` is not adjusted again, the in-flight message will reappear on the queue. It
will remain invisible until that time.
Raises:
QueueWorkDispatcherError: If for some reason the boto3 change_visibility() operation caused an error
"""
if self._current_sqs_message is None:
log_job_message(
logger=self._logger,
message="No SQS message to change visibility of. Message might have previously been released, "
"deleted, or moved to dead letter queue.",
is_warning=True
)
return
try:
self._current_sqs_message.change_visibility(VisibilityTimeout=new_visibility)
except Exception as exc:
message = "Unable to set VisibilityTimeout. " \
"Message might have previously been deleted upon completion or failure"
log_job_message(logger=self._logger, message=message, is_exception=True)
raise QueueWorkDispatcherError(
message, worker_process_name=self.worker_process_name, queue_message=self._current_sqs_message
            ) from exc

def _kill_worker(self, with_signal=None, just_kill_descendants=False):
"""
Cleanup (kill) a worker process and its spawned descendant processes
Args:
with_signal: Use this termination signal when killing. Otherwise, hard kill (-9)
just_kill_descendants: Set to True to leave the worker and only kill its descendants
"""
if not self._worker_process or not ps.pid_exists(self._worker_process.pid):
return
try:
worker = ps.Process(self._worker_process.pid)
except ps.NoSuchProcess:
return
if self._worker_can_start_child_processes:
# First kill any other processes the worker may have spawned
for spawn_of_worker in worker.children(recursive=True):
if with_signal:
log_job_message(
logger=self._logger,
message="Attempting to terminate child process with PID {} and name {} using signal {}"
                        .format(spawn_of_worker.pid, spawn_of_worker.name(), with_signal),
is_warning=True,
)
try:
spawn_of_worker.send_signal(with_signal)
except ps.NoSuchProcess:
pass
else:
log_job_message(
logger=self._logger,
message="Attempting to terminate child process with PID {} and name {}"
                        .format(spawn_of_worker.pid, spawn_of_worker.name()),
is_warning=True,
)
try:
spawn_of_worker.kill()
except ps.NoSuchProcess:
pass
if not just_kill_descendants:
if with_signal:
# Ensure the worker exits as would have occurred if not handling its signals
signal.signal(with_signal, signal.SIG_DFL)
worker.send_signal(with_signal)
else:
try:
worker.kill()
except ps.NoSuchProcess:
pass
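
# --- Illustrative usage (a minimal sketch; the queue name, job, and message format below are
# --- hypothetical and not defined by this module) ---
#
# if __name__ == "__main__":
#     def do_work(job_id):
#         ...  # perform the actual work for the given job
#
#     dispatcher = SQSWorkDispatcher(sqs_queue(queue_name="my-job-queue"))
#     # Polls the queue once and, if a message is found, runs do_work(job_id=<message body>)
#     # in a child worker process; returns False if the queue had no message to dispatch
#     dispatched = dispatcher.dispatch(do_work, message_transformer=lambda msg: {"job_id": msg.body})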