diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index c88a3d0a..4c1634a2 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -358,23 +358,36 @@ def activate( activation_err: Optional[Exception] = None try: # Apply every job, running the loop afterward - is_query = False + no_queries = True for job in act.jobs: if job.HasField("initialize_workflow"): self._workflow_input = self._make_workflow_input( job.initialize_workflow ) elif job.HasField("query_workflow"): - is_query = True + no_queries = False # Let errors bubble out of these to the caller to fail the task self._apply(job) # Conditions are not checked on query activations. Query activations always come without # any other jobs. - self._run_once(check_conditions=not is_query) - # Ensure the main workflow function is called on first task, and called last. - if self._primary_task_initter is not None and self._primary_task is None: - self._primary_task_initter() + first_task = False + try: + self._run_once(check_conditions=no_queries) + finally: + # Ensure the main workflow function task is initialized after a first run of the + # event loop, which might execute before-start signals/updates. This is behind + # a finally because if those handlers fail, we need still need the main routine + # to be initialized in order to fail tasks properly. + if ( + self._primary_task_initter is not None + and self._primary_task is None + ): + self._primary_task_initter() + first_task = True + # Because we want any before-start signals/updates to fully process before running + # the main routine for the first time, we run the loop again if this is the first time. + if first_task: self._run_once(check_conditions=True) except Exception as err: # We want some errors during activation, like those that can happen diff --git a/tests/worker/test_replayer.py b/tests/worker/test_replayer.py index 9f6e6ac5..cece82db 100644 --- a/tests/worker/test_replayer.py +++ b/tests/worker/test_replayer.py @@ -4,7 +4,7 @@ from dataclasses import dataclass from datetime import timedelta from pathlib import Path -from typing import Dict +from typing import Any, Dict, Optional, Type import pytest @@ -12,8 +12,16 @@ from temporalio.client import Client, WorkflowFailureError, WorkflowHistory from temporalio.exceptions import ApplicationError from temporalio.testing import WorkflowEnvironment -from temporalio.worker import Replayer, Worker +from temporalio.worker import ( + ExecuteWorkflowInput, + Interceptor, + Replayer, + Worker, + WorkflowInboundInterceptor, + WorkflowInterceptorClassInput, +) from tests.helpers import assert_eq_eventually +from tests.worker.test_workflow import SignalsActivitiesTimersUpdatesTracingWorkflow @activity.defn @@ -385,3 +393,79 @@ async def test_replayer_command_reordering_backward_compatibility() -> None: await Replayer(workflows=[UpdateCompletionAfterWorkflowReturn]).replay_workflow( WorkflowHistory.from_json("fake", history) ) + + +workflow_res = None + + +class WorkerWorkflowResultInterceptor(Interceptor): + def workflow_interceptor_class( + self, input: WorkflowInterceptorClassInput + ) -> Optional[Type[WorkflowInboundInterceptor]]: + return WorkflowResultInterceptor + + +class WorkflowResultInterceptor(WorkflowInboundInterceptor): + async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any: + global workflow_res + res = await super().execute_workflow(input) + workflow_res = res + return res + + +async def test_replayer_async_ordering() -> None: + """ + This test verifies that the order that asyncio tasks/coroutines are woken up matches the + order they were before changes to apply all jobs and then run the event loop, where previously + the event loop was ran after each "batch" of jobs. + """ + histories_and_expecteds = [ + ( + "test_replayer_event_tracing.json", + [ + "sig-before-sync", + "sig-before-1", + "sig-before-2", + "timer-sync", + "act-sync", + "act-1", + "act-2", + "sig-1-sync", + "sig-1-1", + "sig-1-2", + "update-1-sync", + "update-1-1", + "update-1-2", + "timer-1", + "timer-2", + ], + ), + ( + "test_replayer_event_tracing_double_sig_at_start.json", + [ + "sig-before-sync", + "sig-before-1", + "sig-1-sync", + "sig-1-1", + "sig-before-2", + "sig-1-2", + "timer-sync", + "act-sync", + "update-1-sync", + "update-1-1", + "update-1-2", + "act-1", + "act-2", + "timer-1", + "timer-2", + ], + ), + ] + for history, expected in histories_and_expecteds: + with Path(__file__).with_name(history).open() as f: + history = f.read() + await Replayer( + workflows=[SignalsActivitiesTimersUpdatesTracingWorkflow], + interceptors=[WorkerWorkflowResultInterceptor()], + ).replay_workflow(WorkflowHistory.from_json("fake", history)) + assert workflow_res == expected diff --git a/tests/worker/test_replayer_event_tracing.json b/tests/worker/test_replayer_event_tracing.json new file mode 100644 index 00000000..c7059929 --- /dev/null +++ b/tests/worker/test_replayer_event_tracing.json @@ -0,0 +1,469 @@ +{ + "events": [ + { + "eventId": "1", + "eventTime": "2024-12-30T22:38:44.668149481Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED", + "taskId": "1049178", + "workflowExecutionStartedEventAttributes": { + "workflowType": { + "name": "SignalsActivitiesTimersUpdatesTracingWorkflow" + }, + "taskQueue": { + "name": "tq-a41dde3c-1ed4-4a67-a808-d9d99da337c6", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "workflowTaskTimeout": "10s", + "originalExecutionRunId": "af6b802e-0485-4953-9625-c46e9b2243e6", + "identity": "19041@monolith", + "firstExecutionRunId": "af6b802e-0485-4953-9625-c46e9b2243e6", + "attempt": 1, + "firstWorkflowTaskBackoff": "0s", + "workflowId": "wf-13b9e507-6f00-42e7-b9f3-3c07ba101ff4" + } + }, + { + "eventId": "2", + "eventTime": "2024-12-30T22:38:44.668193778Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1049179", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "tq-a41dde3c-1ed4-4a67-a808-d9d99da337c6", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "3", + "eventTime": "2024-12-30T22:38:44.670247658Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED", + "taskId": "1049184", + "workflowExecutionSignaledEventAttributes": { + "signalName": "dosig", + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "ImJlZm9yZSI=" + } + ] + }, + "identity": "19041@monolith" + } + }, + { + "eventId": "4", + "eventTime": "2024-12-30T22:38:44.773914284Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1049186", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "2", + "identity": "19041@monolith", + "requestId": "550a3619-9085-434a-806e-a1f3f36f0d81", + "historySizeBytes": "432", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + } + } + }, + { + "eventId": "5", + "eventTime": "2024-12-30T22:38:44.855333558Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1049191", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "2", + "startedEventId": "4", + "identity": "19041@monolith", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + }, + "sdkMetadata": { + "coreUsedFlags": [ + 2, + 1, + 3 + ] + }, + "meteringMetadata": {} + } + }, + { + "eventId": "6", + "eventTime": "2024-12-30T22:38:44.855357649Z", + "eventType": "EVENT_TYPE_TIMER_STARTED", + "taskId": "1049192", + "timerStartedEventAttributes": { + "timerId": "1", + "startToFireTimeout": "0.100s", + "workflowTaskCompletedEventId": "5" + } + }, + { + "eventId": "7", + "eventTime": "2024-12-30T22:38:44.855373343Z", + "eventType": "EVENT_TYPE_ACTIVITY_TASK_SCHEDULED", + "taskId": "1049193", + "activityTaskScheduledEventAttributes": { + "activityId": "1", + "activityType": { + "name": "say_hello" + }, + "taskQueue": { + "name": "tq-a41dde3c-1ed4-4a67-a808-d9d99da337c6", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "header": {}, + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IkVuY2hpIg==" + } + ] + }, + "scheduleToCloseTimeout": "5s", + "scheduleToStartTimeout": "5s", + "startToCloseTimeout": "5s", + "heartbeatTimeout": "0s", + "workflowTaskCompletedEventId": "5", + "retryPolicy": { + "initialInterval": "1s", + "backoffCoefficient": 2, + "maximumInterval": "100s" + }, + "useWorkflowBuildId": true + } + }, + { + "eventId": "8", + "eventTime": "2024-12-30T22:38:44.855391748Z", + "eventType": "EVENT_TYPE_ACTIVITY_TASK_STARTED", + "taskId": "1049198", + "activityTaskStartedEventAttributes": { + "scheduledEventId": "7", + "identity": "19041@monolith", + "requestId": "1cf0b17b-c708-4690-88e7-238facf002b1", + "attempt": 1, + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + } + } + }, + { + "eventId": "9", + "eventTime": "2024-12-30T22:38:44.858633171Z", + "eventType": "EVENT_TYPE_ACTIVITY_TASK_COMPLETED", + "taskId": "1049199", + "activityTaskCompletedEventAttributes": { + "result": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IkhlbGxvLCBFbmNoaSEi" + } + ] + }, + "scheduledEventId": "7", + "startedEventId": "8", + "identity": "19041@monolith" + } + }, + { + "eventId": "10", + "eventTime": "2024-12-30T22:38:44.858637777Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1049200", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "19041@monolith-bf301b3177254f0e9f28bbea3fcd3bb5", + "kind": "TASK_QUEUE_KIND_STICKY", + "normalName": "tq-a41dde3c-1ed4-4a67-a808-d9d99da337c6" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "11", + "eventTime": "2024-12-30T22:38:44.859901888Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1049204", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "10", + "identity": "19041@monolith", + "requestId": "f3d5bdba-6830-43fa-a0ca-ee4c49435118", + "historySizeBytes": "1187", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + } + } + }, + { + "eventId": "12", + "eventTime": "2024-12-30T22:38:44.862902108Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1049208", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "10", + "startedEventId": "11", + "identity": "19041@monolith", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + }, + "sdkMetadata": {}, + "meteringMetadata": {} + } + }, + { + "eventId": "13", + "eventTime": "2024-12-30T22:38:44.977333487Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED", + "taskId": "1049210", + "workflowExecutionSignaledEventAttributes": { + "signalName": "dosig", + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IjEi" + } + ] + }, + "identity": "19041@monolith" + } + }, + { + "eventId": "14", + "eventTime": "2024-12-30T22:38:44.977342033Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1049211", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "19041@monolith-bf301b3177254f0e9f28bbea3fcd3bb5", + "kind": "TASK_QUEUE_KIND_STICKY", + "normalName": "tq-a41dde3c-1ed4-4a67-a808-d9d99da337c6" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "15", + "eventTime": "2024-12-30T22:38:44.981296169Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1049215", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "14", + "identity": "19041@monolith", + "requestId": "e0f19a52-c283-4898-a4c2-18b8dbd54e87", + "historySizeBytes": "1603", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + } + } + }, + { + "eventId": "16", + "eventTime": "2024-12-30T22:38:44.984426814Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1049219", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "14", + "startedEventId": "15", + "identity": "19041@monolith", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + }, + "sdkMetadata": {}, + "meteringMetadata": {} + } + }, + { + "eventId": "17", + "eventTime": "2024-12-30T22:38:44.984766655Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1049222", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "19041@monolith-bf301b3177254f0e9f28bbea3fcd3bb5", + "kind": "TASK_QUEUE_KIND_STICKY", + "normalName": "tq-a41dde3c-1ed4-4a67-a808-d9d99da337c6" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "18", + "eventTime": "2024-12-30T22:38:44.984768595Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1049223", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "17", + "identity": "19041@monolith", + "requestId": "request-from-RespondWorkflowTaskCompleted", + "historySizeBytes": "1810", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + } + } + }, + { + "eventId": "19", + "eventTime": "2024-12-30T22:38:44.987111164Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1049224", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "17", + "startedEventId": "18", + "identity": "19041@monolith", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + }, + "sdkMetadata": {}, + "meteringMetadata": {} + } + }, + { + "eventId": "20", + "eventTime": "2024-12-30T22:38:44.987144596Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED", + "taskId": "1049225", + "workflowExecutionUpdateAcceptedEventAttributes": { + "protocolInstanceId": "3899d0f3-2269-485e-bea0-d1a6f23bffc6", + "acceptedRequestMessageId": "3899d0f3-2269-485e-bea0-d1a6f23bffc6/request", + "acceptedRequestSequencingEventId": "17", + "acceptedRequest": { + "meta": { + "updateId": "3899d0f3-2269-485e-bea0-d1a6f23bffc6", + "identity": "19041@monolith" + }, + "input": { + "name": "doupdate", + "args": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IjEi" + } + ] + } + } + } + } + }, + { + "eventId": "21", + "eventTime": "2024-12-30T22:38:44.987169359Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED", + "taskId": "1049226", + "workflowExecutionUpdateCompletedEventAttributes": { + "meta": { + "updateId": "3899d0f3-2269-485e-bea0-d1a6f23bffc6", + "identity": "19041@monolith" + }, + "acceptedEventId": "20", + "outcome": { + "success": { + "payloads": [ + { + "metadata": { + "encoding": "YmluYXJ5L251bGw=" + } + } + ] + } + } + } + }, + { + "eventId": "22", + "eventTime": "2024-12-30T22:38:45.670816108Z", + "eventType": "EVENT_TYPE_TIMER_FIRED", + "taskId": "1049229", + "timerFiredEventAttributes": { + "timerId": "1", + "startedEventId": "6" + } + }, + { + "eventId": "23", + "eventTime": "2024-12-30T22:38:45.670833156Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1049230", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "19041@monolith-bf301b3177254f0e9f28bbea3fcd3bb5", + "kind": "TASK_QUEUE_KIND_STICKY", + "normalName": "tq-a41dde3c-1ed4-4a67-a808-d9d99da337c6" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "24", + "eventTime": "2024-12-30T22:38:45.675574932Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1049234", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "23", + "identity": "19041@monolith", + "requestId": "9d2c2c0b-3f6f-4e67-9693-150e2eec6fc6", + "historySizeBytes": "2640", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + } + } + }, + { + "eventId": "25", + "eventTime": "2024-12-30T22:38:45.690634379Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1049238", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "23", + "startedEventId": "24", + "identity": "19041@monolith", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + }, + "sdkMetadata": {}, + "meteringMetadata": {} + } + }, + { + "eventId": "26", + "eventTime": "2024-12-30T22:38:45.690715818Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED", + "taskId": "1049239", + "workflowExecutionCompletedEventAttributes": { + "result": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "WyJzaWctYmVmb3JlLXN5bmMiLCJzaWctYmVmb3JlLTEiLCJ0aW1lci1zeW5jIiwiYWN0LXN5bmMiLCJzaWctYmVmb3JlLTIiLCJhY3QtMSIsImFjdC0yIiwic2lnLTEtc3luYyIsInNpZy0xLTEiLCJzaWctMS0yIiwidXBkYXRlLTEtc3luYyIsInVwZGF0ZS0xLTEiLCJ1cGRhdGUtMS0yIiwidGltZXItMSIsInRpbWVyLTIiXQ==" + } + ] + }, + "workflowTaskCompletedEventId": "25" + } + } + ] +} \ No newline at end of file diff --git a/tests/worker/test_replayer_event_tracing_double_sig_at_start.json b/tests/worker/test_replayer_event_tracing_double_sig_at_start.json new file mode 100644 index 00000000..77e6d7b1 --- /dev/null +++ b/tests/worker/test_replayer_event_tracing_double_sig_at_start.json @@ -0,0 +1,419 @@ +{ + "events": [ + { + "eventId": "1", + "eventTime": "2024-12-30T23:32:32.093973251Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED", + "taskId": "1049984", + "workflowExecutionStartedEventAttributes": { + "workflowType": { + "name": "SignalsActivitiesTimersUpdatesTracingWorkflow" + }, + "taskQueue": { + "name": "tq-a50c15b0-72ae-4dfb-abb3-8d4889bfe0b9", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "workflowTaskTimeout": "10s", + "originalExecutionRunId": "2de83810-23a0-462e-ac13-bb7396196fe7", + "identity": "45920@monolith", + "firstExecutionRunId": "2de83810-23a0-462e-ac13-bb7396196fe7", + "attempt": 1, + "firstWorkflowTaskBackoff": "0s", + "workflowId": "wf-22dd253f-47e3-4235-975b-1de4722a65ca" + } + }, + { + "eventId": "2", + "eventTime": "2024-12-30T23:32:32.094013742Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1049985", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "tq-a50c15b0-72ae-4dfb-abb3-8d4889bfe0b9", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "3", + "eventTime": "2024-12-30T23:32:32.095679382Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED", + "taskId": "1049990", + "workflowExecutionSignaledEventAttributes": { + "signalName": "dosig", + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "ImJlZm9yZSI=" + } + ] + }, + "identity": "45920@monolith" + } + }, + { + "eventId": "4", + "eventTime": "2024-12-30T23:32:32.196636794Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED", + "taskId": "1049992", + "workflowExecutionSignaledEventAttributes": { + "signalName": "dosig", + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IjEi" + } + ] + }, + "identity": "45920@monolith" + } + }, + { + "eventId": "5", + "eventTime": "2024-12-30T23:32:32.198643629Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1049994", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "2", + "identity": "45920@monolith", + "requestId": "830905ad-736a-49c4-bb60-36636b6bfcbd", + "historySizeBytes": "511", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + } + } + }, + { + "eventId": "6", + "eventTime": "2024-12-30T23:32:32.280201513Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1049999", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "2", + "startedEventId": "5", + "identity": "45920@monolith", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + }, + "sdkMetadata": { + "coreUsedFlags": [ + 3, + 1, + 2 + ] + }, + "meteringMetadata": {} + } + }, + { + "eventId": "7", + "eventTime": "2024-12-30T23:32:32.280226107Z", + "eventType": "EVENT_TYPE_TIMER_STARTED", + "taskId": "1050000", + "timerStartedEventAttributes": { + "timerId": "1", + "startToFireTimeout": "0.100s", + "workflowTaskCompletedEventId": "6" + } + }, + { + "eventId": "8", + "eventTime": "2024-12-30T23:32:32.280241347Z", + "eventType": "EVENT_TYPE_ACTIVITY_TASK_SCHEDULED", + "taskId": "1050001", + "activityTaskScheduledEventAttributes": { + "activityId": "1", + "activityType": { + "name": "say_hello" + }, + "taskQueue": { + "name": "tq-a50c15b0-72ae-4dfb-abb3-8d4889bfe0b9", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "header": {}, + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IkVuY2hpIg==" + } + ] + }, + "scheduleToCloseTimeout": "5s", + "scheduleToStartTimeout": "5s", + "startToCloseTimeout": "5s", + "heartbeatTimeout": "0s", + "workflowTaskCompletedEventId": "6", + "retryPolicy": { + "initialInterval": "1s", + "backoffCoefficient": 2, + "maximumInterval": "100s" + }, + "useWorkflowBuildId": true + } + }, + { + "eventId": "9", + "eventTime": "2024-12-30T23:32:32.280932517Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1050007", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "45920@monolith-4969e79c76e14402a358a6c48afa41fa", + "kind": "TASK_QUEUE_KIND_STICKY", + "normalName": "tq-a50c15b0-72ae-4dfb-abb3-8d4889bfe0b9" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "10", + "eventTime": "2024-12-30T23:32:32.280935332Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1050008", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "9", + "requestId": "request-from-RespondWorkflowTaskCompleted", + "historySizeBytes": "926" + } + }, + { + "eventId": "11", + "eventTime": "2024-12-30T23:32:32.285406568Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1050011", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "9", + "startedEventId": "10", + "identity": "45920@monolith", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + }, + "sdkMetadata": {}, + "meteringMetadata": {} + } + }, + { + "eventId": "12", + "eventTime": "2024-12-30T23:32:32.285444845Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED", + "taskId": "1050012", + "workflowExecutionUpdateAcceptedEventAttributes": { + "protocolInstanceId": "e4b5bdec-c887-422f-b28d-ac217874bc2b", + "acceptedRequestMessageId": "e4b5bdec-c887-422f-b28d-ac217874bc2b/request", + "acceptedRequestSequencingEventId": "9", + "acceptedRequest": { + "meta": { + "updateId": "e4b5bdec-c887-422f-b28d-ac217874bc2b", + "identity": "45920@monolith" + }, + "input": { + "name": "doupdate", + "args": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IjEi" + } + ] + } + } + } + } + }, + { + "eventId": "13", + "eventTime": "2024-12-30T23:32:32.285476357Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED", + "taskId": "1050013", + "workflowExecutionUpdateCompletedEventAttributes": { + "meta": { + "updateId": "e4b5bdec-c887-422f-b28d-ac217874bc2b", + "identity": "45920@monolith" + }, + "acceptedEventId": "12", + "outcome": { + "success": { + "payloads": [ + { + "metadata": { + "encoding": "YmluYXJ5L251bGw=" + } + } + ] + } + } + } + }, + { + "eventId": "14", + "eventTime": "2024-12-30T23:32:32.280263855Z", + "eventType": "EVENT_TYPE_ACTIVITY_TASK_STARTED", + "taskId": "1050014", + "activityTaskStartedEventAttributes": { + "scheduledEventId": "8", + "identity": "45920@monolith", + "requestId": "3a230ac0-ff57-4016-b199-ce716a4eb331", + "attempt": 1, + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + } + } + }, + { + "eventId": "15", + "eventTime": "2024-12-30T23:32:32.284262316Z", + "eventType": "EVENT_TYPE_ACTIVITY_TASK_COMPLETED", + "taskId": "1050015", + "activityTaskCompletedEventAttributes": { + "result": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IkhlbGxvLCBFbmNoaSEi" + } + ] + }, + "scheduledEventId": "8", + "startedEventId": "14", + "identity": "45920@monolith" + } + }, + { + "eventId": "16", + "eventTime": "2024-12-30T23:32:32.285491511Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1050016", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "45920@monolith-4969e79c76e14402a358a6c48afa41fa", + "kind": "TASK_QUEUE_KIND_STICKY", + "normalName": "tq-a50c15b0-72ae-4dfb-abb3-8d4889bfe0b9" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "17", + "eventTime": "2024-12-30T23:32:32.285493889Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1050017", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "16", + "identity": "45920@monolith", + "requestId": "request-from-RespondWorkflowTaskCompleted", + "historySizeBytes": "1126", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + } + } + }, + { + "eventId": "18", + "eventTime": "2024-12-30T23:32:32.288461384Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1050020", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "16", + "startedEventId": "17", + "identity": "45920@monolith", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + }, + "sdkMetadata": {}, + "meteringMetadata": {} + } + }, + { + "eventId": "19", + "eventTime": "2024-12-30T23:32:33.096788296Z", + "eventType": "EVENT_TYPE_TIMER_FIRED", + "taskId": "1050022", + "timerFiredEventAttributes": { + "timerId": "1", + "startedEventId": "7" + } + }, + { + "eventId": "20", + "eventTime": "2024-12-30T23:32:33.096806096Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1050023", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "45920@monolith-4969e79c76e14402a358a6c48afa41fa", + "kind": "TASK_QUEUE_KIND_STICKY", + "normalName": "tq-a50c15b0-72ae-4dfb-abb3-8d4889bfe0b9" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "21", + "eventTime": "2024-12-30T23:32:33.101429716Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1050027", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "20", + "identity": "45920@monolith", + "requestId": "7ceeadc0-a095-4298-80ba-44b7b91d02d8", + "historySizeBytes": "2253", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + } + } + }, + { + "eventId": "22", + "eventTime": "2024-12-30T23:32:33.117619050Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1050031", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "20", + "startedEventId": "21", + "identity": "45920@monolith", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + }, + "sdkMetadata": {}, + "meteringMetadata": {} + } + }, + { + "eventId": "23", + "eventTime": "2024-12-30T23:32:33.117743945Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED", + "taskId": "1050032", + "workflowExecutionCompletedEventAttributes": { + "result": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "WyJzaWctYmVmb3JlLXN5bmMiLCJzaWctYmVmb3JlLTEiLCJzaWctMS1zeW5jIiwic2lnLTEtMSIsInNpZy1iZWZvcmUtMiIsInNpZy0xLTIiLCJ0aW1lci1zeW5jIiwiYWN0LXN5bmMiLCJ1cGRhdGUtMS1zeW5jIiwidXBkYXRlLTEtMSIsInVwZGF0ZS0xLTIiLCJhY3QtMSIsImFjdC0yIiwidGltZXItMSIsInRpbWVyLTIiXQ==" + } + ] + }, + "workflowTaskCompletedEventId": "22" + } + } + ] +} \ No newline at end of file diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 37f10278..a3b677a7 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -6444,10 +6444,10 @@ async def test_concurrent_sleeps_use_proper_options( @workflow.defn class SignalsActivitiesTimersUpdatesTracingWorkflow: def __init__(self) -> None: - self.events = [] + self.events: List[str] = [] @workflow.run - async def run(self) -> list[str]: + async def run(self) -> List[str]: tt = asyncio.create_task(self.run_timer()) at = asyncio.create_task(self.run_act()) await asyncio.gather(tt, at) @@ -6456,7 +6456,7 @@ async def run(self) -> list[str]: @workflow.signal async def dosig(self, name: str): self.events.append(f"sig-{name}-sync") - fut = asyncio.Future() + fut: asyncio.Future[bool] = asyncio.Future() fut.set_result(True) await fut self.events.append(f"sig-{name}-1") @@ -6466,7 +6466,7 @@ async def dosig(self, name: str): @workflow.update async def doupdate(self, name: str): self.events.append(f"update-{name}-sync") - fut = asyncio.Future() + fut: asyncio.Future[bool] = asyncio.Future() fut.set_result(True) await fut self.events.append(f"update-{name}-1") @@ -6476,7 +6476,7 @@ async def doupdate(self, name: str): async def run_timer(self): self.events.append("timer-sync") await workflow.sleep(0.1) - fut = asyncio.Future() + fut: asyncio.Future[bool] = asyncio.Future() fut.set_result(True) await fut self.events.append("timer-1") @@ -6488,7 +6488,7 @@ async def run_act(self): await workflow.execute_activity( say_hello, "Enchi", schedule_to_close_timeout=timedelta(seconds=5) ) - fut = asyncio.Future() + fut: asyncio.Future[bool] = asyncio.Future() fut.set_result(True) await fut self.events.append("act-1") @@ -6497,6 +6497,8 @@ async def run_act(self): async def test_async_loop_ordering(client: Client): + """This test mostly exists to generate histories for test_replayer_async_ordering. + See that test for more.""" task_queue = f"tq-{uuid.uuid4()}" handle = await client.start_workflow( SignalsActivitiesTimersUpdatesTracingWorkflow.run, @@ -6516,21 +6518,4 @@ async def test_async_loop_ordering(client: Client): await handle.execute_update( SignalsActivitiesTimersUpdatesTracingWorkflow.doupdate, "1" ) - expected_old = [ - "sig-before-sync", - "sig-before-1", - "sig-before-2", - "timer-sync", - "act-sync", - "act-1", - "act-2", - "sig-1-sync", - "sig-1-1", - "sig-1-2", - "update-1-sync", - "update-1-1", - "update-1-2", - "timer-1", - "timer-2" - ] - assert await handle.result() == expected_old + await handle.result()