Commit
Pydantic converter sample (#44)
Fixes #25
cretz authored Jan 23, 2023
1 parent 1196d28 commit 636af75
Showing 11 changed files with 331 additions and 3 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -25,7 +25,7 @@ jobs:
        with:
          python-version: ${{ matrix.python }}
      - run: python -m pip install --upgrade wheel poetry poethepoet
      - run: poetry install
      - run: poetry install --with pydantic
      - run: poe lint
      - run: poe test -s -o log_cli_level=DEBUG
      - run: poe test -s -o log_cli_level=DEBUG --workflow-environment time-skipping
1 change: 1 addition & 0 deletions README.md
@@ -54,6 +54,7 @@ Some examples require extra dependencies. See each sample's directory for specif
* [custom_decorator](custom_decorator) - Custom decorator to auto-heartbeat a long-running activity.
* [encryption](encryption) - Apply end-to-end encryption for all input/output.
* [open_telemetry](open_telemetry) - Trace workflows with OpenTelemetry.
* [pydantic_converter](pydantic_converter) - Data converter for using Pydantic models.
* [sentry](sentry) - Report errors to Sentry.

## Test
55 changes: 54 additions & 1 deletion poetry.lock

Some generated files are not rendered by default.

31 changes: 31 additions & 0 deletions pydantic_converter/README.md
@@ -0,0 +1,31 @@
# Pydantic Converter Sample

This sample shows how to create a custom Pydantic converter to properly serialize Pydantic models.

For this sample, the optional `pydantic` dependency group must be included. To include it, run:

    poetry install --with pydantic

To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the
worker:

    poetry run python worker.py

This will start the worker. Then, in another terminal, run the following to execute the workflow:

    poetry run python starter.py

In the worker terminal, the workflow and its activity will log that they received the Pydantic models. In the starter
terminal, the Pydantic models in the workflow result will be logged.

### Notes

This is the preferred way to use Pydantic models with Temporal Python SDK. The converter code is small and meant to
embed into other projects.

This sample also demonstrates use of `datetime` inside of Pydantic models. Due to a known issue with the Temporal
sandbox, Pydantic sees this class as `date` instead of `datetime` upon deserialization. This is due to a
[known Python issue](https://github.com/python/cpython/issues/89010): when we proxy the `datetime` class in the
sandbox to prevent non-deterministic calls like `now()`, `issubclass` fails for the proxy type, causing Pydantic to
treat it as a `date` instead. In `worker.py`, we show a workaround that disables restrictions on the `datetime`
module; this solves the issue but no longer protects against workflow developers making non-deterministic calls in
that module.
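The `date`-vs-`datetime` confusion above can be illustrated in miniature. Note that `ProxiedDatetime` and `choose_validator` below are hypothetical stand-ins, not sandbox or Pydantic APIs: the sandbox's proxy still behaves like a `date`, but an `issubclass` check against `datetime` fails, so dispatch like Pydantic v1's falls back to `date` handling:

```python
from datetime import date, datetime


class ProxiedDatetime(date):
    """Hypothetical stand-in for the sandbox's datetime proxy: it still
    registers as a date, but the issubclass check against datetime fails."""


def choose_validator(tp: type) -> str:
    # Rough sketch of issubclass-based type dispatch for date/datetime fields
    if issubclass(tp, datetime):
        return "datetime"
    if issubclass(tp, date):
        return "date"
    return "other"


print(choose_validator(datetime))         # datetime
print(choose_validator(ProxiedDatetime))  # date
```

Disabling the `datetime` restriction means the real class, not a proxy, is visible in the sandbox, so the first check succeeds again.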
Empty file added pydantic_converter/__init__.py
Empty file.
56 changes: 56 additions & 0 deletions pydantic_converter/converter.py
@@ -0,0 +1,56 @@
import json
from typing import Any, Optional

from pydantic.json import pydantic_encoder
from temporalio.api.common.v1 import Payload
from temporalio.converter import (
    CompositePayloadConverter,
    DataConverter,
    DefaultPayloadConverter,
    JSONPlainPayloadConverter,
)


class PydanticJSONPayloadConverter(JSONPlainPayloadConverter):
    """Pydantic JSON payload converter.

    This extends the :py:class:`JSONPlainPayloadConverter` to override
    :py:meth:`to_payload` using the Pydantic encoder.
    """

    def to_payload(self, value: Any) -> Optional[Payload]:
        """Convert all values with Pydantic encoder or fail.

        Like the base class, we fail if we cannot convert. This payload
        converter is expected to be the last in the chain, so it can fail if
        unable to convert.
        """
        # We let JSON conversion errors be thrown to caller
        return Payload(
            metadata={"encoding": self.encoding.encode()},
            data=json.dumps(
                value, separators=(",", ":"), sort_keys=True, default=pydantic_encoder
            ).encode(),
        )


class PydanticPayloadConverter(CompositePayloadConverter):
    """Payload converter that replaces Temporal JSON conversion with Pydantic
    JSON conversion.
    """

    def __init__(self) -> None:
        super().__init__(
            *(
                c
                if not isinstance(c, JSONPlainPayloadConverter)
                else PydanticJSONPayloadConverter()
                for c in DefaultPayloadConverter.default_encoding_payload_converters
            )
        )


pydantic_data_converter = DataConverter(
    payload_converter_class=PydanticPayloadConverter
)
"""Data converter using Pydantic JSON conversion."""
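The `to_payload` override above boils down to `json.dumps` with a custom `default` hook. A minimal, Pydantic-free sketch of that pattern follows; `demo_encoder` is a hypothetical stand-in for `pydantic_encoder`, handling only the two field types this sample uses:

```python
import json
from datetime import datetime
from ipaddress import IPv4Address


def demo_encoder(obj):
    # Hypothetical stand-in for pydantic_encoder: convert the non-JSON-native
    # types used in this sample into JSON-friendly strings.
    if isinstance(obj, datetime):
        return obj.isoformat()
    if isinstance(obj, IPv4Address):
        return str(obj)
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


# Same json.dumps arguments as PydanticJSONPayloadConverter.to_payload
payload_data = json.dumps(
    {"some_ip": IPv4Address("127.0.0.1"), "some_date": datetime(2000, 1, 2, 3, 4, 5)},
    separators=(",", ":"),
    sort_keys=True,
    default=demo_encoder,
).encode()
print(payload_data)  # b'{"some_date":"2000-01-02T03:04:05","some_ip":"127.0.0.1"}'
```

Because the `default` hook is only consulted for types the encoder does not natively handle, plain dicts, lists, strings, and numbers still serialize exactly as before.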
39 changes: 39 additions & 0 deletions pydantic_converter/starter.py
@@ -0,0 +1,39 @@
import asyncio
import logging
from datetime import datetime
from ipaddress import IPv4Address

from temporalio.client import Client

from pydantic_converter.converter import pydantic_data_converter
from pydantic_converter.worker import MyPydanticModel, MyWorkflow


async def main():
    logging.basicConfig(level=logging.INFO)
    # Connect client using the Pydantic converter
    client = await Client.connect(
        "localhost:7233", data_converter=pydantic_data_converter
    )

    # Run workflow
    result = await client.execute_workflow(
        MyWorkflow.run,
        [
            MyPydanticModel(
                some_ip=IPv4Address("127.0.0.1"),
                some_date=datetime(2000, 1, 2, 3, 4, 5),
            ),
            MyPydanticModel(
                some_ip=IPv4Address("127.0.0.2"),
                some_date=datetime(2001, 2, 3, 4, 5, 6),
            ),
        ],
        id="pydantic_converter-workflow-id",
        task_queue="pydantic_converter-task-queue",
    )
    logging.info("Got models from client: %s", result)


if __name__ == "__main__":
    asyncio.run(main())
98 changes: 98 additions & 0 deletions pydantic_converter/worker.py
@@ -0,0 +1,98 @@
import asyncio
import dataclasses
import logging
from datetime import datetime, timedelta
from ipaddress import IPv4Address
from typing import List

from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker
from temporalio.worker.workflow_sandbox import (
    SandboxedWorkflowRunner,
    SandboxRestrictions,
)

# We always want to pass through external modules to the sandbox that we know
# are safe for workflow use
with workflow.unsafe.imports_passed_through():
    from pydantic import BaseModel

    from pydantic_converter.converter import pydantic_data_converter


class MyPydanticModel(BaseModel):
    some_ip: IPv4Address
    some_date: datetime


@activity.defn
async def my_activity(models: List[MyPydanticModel]) -> List[MyPydanticModel]:
    activity.logger.info("Got models in activity: %s", models)
    return models


@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self, models: List[MyPydanticModel]) -> List[MyPydanticModel]:
        workflow.logger.info("Got models in workflow: %s", models)
        return await workflow.execute_activity(
            my_activity, models, start_to_close_timeout=timedelta(minutes=1)
        )


# Due to known issues with Pydantic's use of issubclass and our inability to
# override the check in the sandbox, Pydantic will think datetime is actually
# date in the sandbox. At the expense of protecting against datetime.now() use
# in workflows, we're going to remove datetime module restrictions. See the
# sdk-python README's discussion of known sandbox issues for more details.
def new_sandbox_runner() -> SandboxedWorkflowRunner:
    # TODO(cretz): Use with_child_unrestricted when https://github.com/temporalio/sdk-python/issues/254
    # is fixed and released
    invalid_module_member_children = dict(
        SandboxRestrictions.invalid_module_members_default.children
    )
    del invalid_module_member_children["datetime"]
    return SandboxedWorkflowRunner(
        restrictions=dataclasses.replace(
            SandboxRestrictions.default,
            invalid_module_members=dataclasses.replace(
                SandboxRestrictions.invalid_module_members_default,
                children=invalid_module_member_children,
            ),
        )
    )


interrupt_event = asyncio.Event()


async def main():
    logging.basicConfig(level=logging.INFO)
    # Connect client using the Pydantic converter
    client = await Client.connect(
        "localhost:7233", data_converter=pydantic_data_converter
    )

    # Run a worker for the workflow
    async with Worker(
        client,
        task_queue="pydantic_converter-task-queue",
        workflows=[MyWorkflow],
        activities=[my_activity],
        workflow_runner=new_sandbox_runner(),
    ):
        # Wait until interrupted
        print("Worker started, ctrl+c to exit")
        await interrupt_event.wait()
        print("Shutting down")


if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main())
    except KeyboardInterrupt:
        interrupt_event.set()
        loop.run_until_complete(loop.shutdown_asyncgens())
6 changes: 5 additions & 1 deletion pyproject.toml
@@ -39,14 +39,18 @@ optional = true
temporalio = { version = "*", extras = ["opentelemetry"] }
opentelemetry-exporter-jaeger-thrift = "^1.13.0"

[tool.poetry.group.pydantic]
optional = true
dependencies = { pydantic = "^1.10.4" }

[tool.poetry.group.sentry]
optional = true
dependencies = { sentry-sdk = "^1.11.0" }

[tool.poe.tasks]
format = [{cmd = "black ."}, {cmd = "isort ."}]
lint = [{cmd = "black --check ."}, {cmd = "isort --check-only ."}, {ref = "lint-types" }]
lint-types = "mypy --check-untyped-defs ."
lint-types = "mypy --check-untyped-defs --namespace-packages ."
test = "pytest"

[build-system]
Empty file.
46 changes: 46 additions & 0 deletions tests/pydantic_converter/workflow_test.py
@@ -0,0 +1,46 @@
import uuid
from datetime import datetime
from ipaddress import IPv4Address

from temporalio.client import Client
from temporalio.worker import Worker

from pydantic_converter.converter import pydantic_data_converter
from pydantic_converter.worker import (
    MyPydanticModel,
    MyWorkflow,
    my_activity,
    new_sandbox_runner,
)


async def test_workflow_with_pydantic_model(client: Client):
    # Replace data converter in client
    new_config = client.config()
    new_config["data_converter"] = pydantic_data_converter
    client = Client(**new_config)
    task_queue_name = str(uuid.uuid4())

    orig_models = [
        MyPydanticModel(
            some_ip=IPv4Address("127.0.0.1"), some_date=datetime(2000, 1, 2, 3, 4, 5)
        ),
        MyPydanticModel(
            some_ip=IPv4Address("127.0.0.2"), some_date=datetime(2001, 2, 3, 4, 5, 6)
        ),
    ]

    async with Worker(
        client,
        task_queue=task_queue_name,
        workflows=[MyWorkflow],
        activities=[my_activity],
        workflow_runner=new_sandbox_runner(),
    ):
        result = await client.execute_workflow(
            MyWorkflow.run,
            orig_models,
            id=str(uuid.uuid4()),
            task_queue=task_queue_name,
        )
        assert orig_models == result
