forked from temporalio/samples-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathinterceptor.py
85 lines (71 loc) · 3.76 KB
/
interceptor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
from dataclasses import asdict, is_dataclass
from typing import Any, Optional, Type, Union
from temporalio import activity, workflow
from temporalio.worker import (
ActivityInboundInterceptor,
ExecuteActivityInput,
ExecuteWorkflowInput,
Interceptor,
WorkflowInboundInterceptor,
WorkflowInterceptorClassInput,
)
with workflow.unsafe.imports_passed_through():
from sentry_sdk import Hub, capture_exception, set_context, set_tag
def _set_common_workflow_tags(info: Union[workflow.Info, activity.Info]):
set_tag("temporal.workflow.type", info.workflow_type)
set_tag("temporal.workflow.id", info.workflow_id)
class _SentryActivityInboundInterceptor(ActivityInboundInterceptor):
async def execute_activity(self, input: ExecuteActivityInput) -> Any:
# https://docs.sentry.io/platforms/python/troubleshooting/#addressing-concurrency-issues
with Hub(Hub.current):
set_tag("temporal.execution_type", "activity")
set_tag("module", input.fn.__module__ + "." + input.fn.__qualname__)
activity_info = activity.info()
_set_common_workflow_tags(activity_info)
set_tag("temporal.activity.id", activity_info.activity_id)
set_tag("temporal.activity.type", activity_info.activity_type)
set_tag("temporal.activity.task_queue", activity_info.task_queue)
set_tag("temporal.workflow.namespace", activity_info.workflow_namespace)
set_tag("temporal.workflow.run_id", activity_info.workflow_run_id)
try:
return await super().execute_activity(input)
except Exception as e:
if len(input.args) == 1 and is_dataclass(input.args[0]):
set_context("temporal.activity.input", asdict(input.args[0]))
set_context("temporal.activity.info", activity.info().__dict__)
capture_exception()
raise e
class _SentryWorkflowInterceptor(WorkflowInboundInterceptor):
async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any:
# https://docs.sentry.io/platforms/python/troubleshooting/#addressing-concurrency-issues
with Hub(Hub.current):
set_tag("temporal.execution_type", "workflow")
set_tag("module", input.run_fn.__module__ + "." + input.run_fn.__qualname__)
workflow_info = workflow.info()
_set_common_workflow_tags(workflow_info)
set_tag("temporal.workflow.task_queue", workflow_info.task_queue)
set_tag("temporal.workflow.namespace", workflow_info.namespace)
set_tag("temporal.workflow.run_id", workflow_info.run_id)
try:
return await super().execute_workflow(input)
except Exception as e:
if len(input.args) == 1 and is_dataclass(input.args[0]):
set_context("temporal.workflow.input", asdict(input.args[0]))
set_context("temporal.workflow.info", workflow.info().__dict__)
if not workflow.unsafe.is_replaying():
with workflow.unsafe.sandbox_unrestricted():
capture_exception()
raise e
class SentryInterceptor(Interceptor):
"""Temporal Interceptor class which will report workflow & activity exceptions to Sentry"""
def intercept_activity(
self, next: ActivityInboundInterceptor
) -> ActivityInboundInterceptor:
"""Implementation of
:py:meth:`temporalio.worker.Interceptor.intercept_activity`.
"""
return _SentryActivityInboundInterceptor(super().intercept_activity(next))
def workflow_interceptor_class(
self, input: WorkflowInterceptorClassInput
) -> Optional[Type[WorkflowInboundInterceptor]]:
return _SentryWorkflowInterceptor