Skip to content

Commit

Permalink
Try to prototype new decorator
Browse files Browse the repository at this point in the history
  • Loading branch information
dandavison committed May 30, 2024
1 parent 377ec89 commit be50780
Showing 1 changed file with 43 additions and 29 deletions.
72 changes: 43 additions & 29 deletions update/job_runner_I1_native.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
from contextlib import asynccontextmanager
from dataclasses import dataclass
from datetime import datetime, timedelta
import inspect
import logging
from typing import Awaitable, Callable, Optional, Type

from temporalio import common, workflow, activity
from temporalio.client import Client, WorkflowHandle
from temporalio.worker import Worker
from temporalio.workflow import UpdateMethodMultiParam


JobID = str
Expand Down Expand Up @@ -42,6 +42,13 @@ class JobOutput:
UpdateID = str
Workflow = Type


@dataclass
class Update:
id: UpdateID
arg: I


_sdk_internals_pending_tasks_count = 0
_sdk_internals_handler_mutex = asyncio.Lock()

Expand All @@ -67,41 +74,44 @@ async def _sdk_internals__track_pending__wait_until_ready__serialize_execution(
_sdk_internals_pending_tasks_count -= 1


class SDKInternals:
# Here, the SDK is wrapping the user's update handlers with the required wait-until-ready,
# pending tasks tracking, and synchronization functionality. This is a fake implementation: the
# real implementation will automatically inspect and wrap the user's declared update handlers.
_original_workflow_update_decorator = workflow.update

def ready_to_execute(self, arg: I) -> bool:
# Implemented by user
return True

@workflow.update
async def run_shell_script_job(self, arg: I) -> O:
handler = getattr(self, "_" + inspect.currentframe().f_code.co_name)
async with _sdk_internals__track_pending__wait_until_ready__serialize_execution(
lambda: self.ready_to_execute(arg)
):
return await handler(arg)
def _new_workflow_update_decorator(
execute_condition: Callable[[Workflow, Update], bool], **kwargs
) -> Callable[
[Callable[[Workflow, I], Awaitable[O]]],
UpdateMethodMultiParam[[Workflow, I], O],
]:
def decorator(
handler: Callable[[Workflow, I], Awaitable[O]]
) -> UpdateMethodMultiParam:
async def wrapped_handler(self: Workflow, arg: I):
async with _sdk_internals__track_pending__wait_until_ready__serialize_execution(
lambda: execute_condition(self, Update(arg.id, arg))
):
return await handler(self, arg)

dec = (
_original_workflow_update_decorator(**kwargs)
if kwargs
else _original_workflow_update_decorator
)
return dec(wrapped_handler)

@workflow.update
async def run_python_job(self, arg: I) -> O:
handler = getattr(self, "_" + inspect.currentframe().f_code.co_name)
async with _sdk_internals__track_pending__wait_until_ready__serialize_execution(
lambda: self.ready_to_execute(arg)
):
return await handler(arg)
return decorator


# Monkey-patch proposed new public API
setattr(workflow, "all_handlers_completed", _sdk_internals_all_handlers_completed)
setattr(workflow, "update", _new_workflow_update_decorator)
##
## END SDK internals prototype
##


@workflow.defn
class JobRunner(SDKInternals):
class JobRunner:
"""
Jobs must be executed in order dictated by job dependency graph (see `job.depends_on`) and
not before `job.after_time`.
Expand All @@ -115,12 +125,13 @@ async def run(self):
await workflow.wait_condition(
lambda: (
workflow.info().is_continue_as_new_suggested()
and self.all_handlers_completed()
and workflow.all_handlers_completed()
)
)
workflow.continue_as_new()

def ready_to_execute(self, job: Job) -> bool:
def ready_to_execute(self, update: Update) -> bool:
job = update.arg
if not set(job.depends_on) <= self.completed_tasks:
return False
if after_time := job.after_time:
Expand All @@ -131,8 +142,11 @@ def ready_to_execute(self, job: Job) -> bool:
# These are the real handler functions. When we implement SDK support, these will use the
# @workflow.update decorator and will not use an underscore prefix.

# @workflow.update
async def _run_shell_script_job(self, job: Job) -> JobOutput:
# @workflow.update(
# execute_condition=lambda self, update: self.ready_to_execute(update)
# )
@_new_workflow_update_decorator(execute_condition=ready_to_execute)
async def run_shell_script_job(self, job: Job) -> JobOutput:
try:
if security_errors := await workflow.execute_activity(
run_shell_script_security_linter,
Expand All @@ -148,8 +162,8 @@ async def _run_shell_script_job(self, job: Job) -> JobOutput:
# FIXME: unbounded memory usage
self.completed_tasks.add(job.id)

# @workflow.update
async def _run_python_job(self, job: Job) -> JobOutput:
@_new_workflow_update_decorator(execute_condition=ready_to_execute)
async def run_python_job(self, job: Job) -> JobOutput:
try:
if not await workflow.execute_activity(
check_python_interpreter_version,
Expand Down

0 comments on commit be50780

Please sign in to comment.