async tracking #970

Merged
merged 6 commits into from
Jun 27, 2024
5 changes: 0 additions & 5 deletions .ci/setup.sh
@@ -21,11 +21,6 @@ if [[ ${TASK} != "pre-commit" ]]; then
         -r requirements-test.txt
 fi

-if [[ ${TASK} == "async" ]]; then
-    pip install \
-        -r plugin_tests/h_async/requirements-test.txt
-fi
-
 if [[ ${TASK} == "pyspark" ]]; then
     if [[ ${OPERATING_SYSTEM} == "Linux" ]]; then
         sudo apt-get install \
24 changes: 0 additions & 24 deletions .circleci/config.yml
@@ -155,27 +155,3 @@ workflows:
           name: integrations-py312
           python-version: '3.12'
           task: integrations
-      - test:
-          requires:
-            - check_for_changes
-          name: asyncio-py39
-          python-version: '3.9'
-          task: async
-      - test:
-          requires:
-            - check_for_changes
-          name: asyncio-py310
-          python-version: '3.10'
-          task: async
-      - test:
-          requires:
-            - check_for_changes
-          name: asyncio-py311
-          python-version: '3.11'
-          task: async
-      - test:
-          requires:
-            - check_for_changes
-          name: asyncio-py312
-          python-version: '3.12'
-          task: async
4 changes: 2 additions & 2 deletions docs/code-comparisons/langchain_snippets/hamilton_async.py
@@ -38,9 +38,9 @@ async def joke_response(
     import hamilton_async

     from hamilton import base
-    from hamilton.experimental import h_async
+    from hamilton import async_driver

-    dr = h_async.AsyncDriver(
+    dr = async_driver.AsyncDriver(
         {},
         hamilton_async,
         result_builder=base.DictResult()
13 changes: 11 additions & 2 deletions docs/reference/drivers/AsyncDriver.rst
@@ -1,7 +1,16 @@
 AsyncDriver
-______________
+___________
 Use this driver in an async context. E.g. for use with FastAPI.

-.. autoclass:: hamilton.experimental.h_async.AsyncDriver
+.. autoclass:: hamilton.async_driver.AsyncDriver
    :special-members: __init__
    :members:
+
+Async Builder
+-------------
+
+Builds a driver in an async context -- use ``await builder....build()``.
+
+.. autoclass:: hamilton.async_driver.Builder
+   :special-members: __init__
+   :members:
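
For reference, a minimal sketch of what the documented `await builder....build()` pattern might look like end to end. `Builder().with_modules(...)`, `build()`, and `execute([...])` are used the way `examples/async/fastapi_example.py` in this PR uses them. The node functions `raw_value` and `doubled`, the module name `example_dataflow`, and the use of `ad_hoc_utils.create_temporary_module` to bundle them are assumptions made only to keep the sketch in a single file; a real project would pass its own module.

```python
import asyncio


async def raw_value() -> int:
    """Async node: stands in for an awaited I/O call."""
    await asyncio.sleep(0)
    return 21


def doubled(raw_value: int) -> int:
    """Sync node that depends on the async node above."""
    return raw_value * 2


async def build_and_run() -> dict:
    """Build an AsyncDriver with the async Builder and request one node."""
    # Imported here so the node functions above stay importable without Hamilton.
    from hamilton import ad_hoc_utils, async_driver

    # Assumption: bundle the two functions into a throwaway module for the demo;
    # a real project would pass its own module to with_modules().
    dataflow = ad_hoc_utils.create_temporary_module(
        raw_value, doubled, module_name="example_dataflow"
    )
    # Unlike the synchronous Builder, build() is awaited here.
    dr = await async_driver.Builder().with_modules(dataflow).build()
    return await dr.execute(["doubled"])
```

From a script this could be run with `asyncio.run(build_and_run())`; inside an already running event loop (for example a FastAPI lifespan handler) it would be awaited directly.
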
2 changes: 1 addition & 1 deletion docs/reference/graph-adapters/AsyncGraphAdapter.rst
@@ -3,7 +3,7 @@ h_async.AsyncGraphAdapter
 =========================


-.. autoclass:: hamilton.experimental.h_async.AsyncGraphAdapter
+.. autoclass:: hamilton.async_driver.AsyncGraphAdapter
    :special-members: __init__
    :members:
    :inherited-members:
24 changes: 24 additions & 0 deletions examples/async/README.md
@@ -29,6 +29,30 @@ You should get the following result:
 {"pipeline":{"computation1":false,"computation2":true}}
 ```

+## Tracking
+
+This has an additional endpoint that will use the async tracker if the [ui](https://hamilton.dagworks.io/en/latest/concepts/ui/)
+is running on port 8241 -- see [fastapi_example.py](fastapi_example.py) for the code.
+If the UI is not running, the server will proceed anyway without tracking.
+
+You can run it with:
+
+```bash
+curl -X 'POST' \
+  'http://localhost:8000/execute_tracker' \
+  -H 'accept: application/json' \
+  -d '{}'
+```
+
+Recall, to get the server running, you'll have to run the following:
+
+```bash
+pip install sf-hamilton[ui]
+hamilton ui
+```
+
+This assumes a project with ID 1 exists -- if you want a different one, you can go to the UI and create one and/or set it in the code.
+

 ## How it works
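
As a rough Python counterpart to the `curl` call in the README section above, the following sketch posts an empty JSON body to the untracked and tracked endpoints and times each call. The paths `/execute` and `/execute_tracker` and port 8000 come from this PR; the helper names `build_request`, `post_json`, and `compare_endpoints` are hypothetical, and a single timed call per endpoint is only a crude indication of tracker overhead, not a benchmark.

```python
import json
import time
import urllib.request


def build_request(url: str, payload: dict) -> urllib.request.Request:
    """Create a JSON POST request, mirroring the curl example's headers."""
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"accept": "application/json", "content-type": "application/json"},
        method="POST",
    )


def post_json(url: str, payload: dict, timeout: float = 10.0) -> tuple:
    """POST payload to url; return (status, parsed JSON body, seconds elapsed).

    urlopen raises urllib.error.HTTPError on 4xx/5xx responses, for example
    if the tracked endpoint is called while the tracking driver is unset.
    """
    start = time.perf_counter()
    with urllib.request.urlopen(build_request(url, payload), timeout=timeout) as resp:
        status = resp.status
        body = json.loads(resp.read().decode("utf-8"))
    return status, body, time.perf_counter() - start


def compare_endpoints(base_url: str = "http://localhost:8000") -> dict:
    """Call each endpoint once and collect the status code and wall-clock time."""
    timings = {}
    for path in ("/execute", "/execute_tracker"):
        status, _body, elapsed = post_json(base_url + path, {})
        timings[path] = {"status": status, "seconds": round(elapsed, 3)}
    return timings
```

With the FastAPI app and the Hamilton UI both running, `print(compare_endpoints())` would show the two timings side by side.
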
10 changes: 5 additions & 5 deletions examples/async/async_module.py
@@ -24,17 +24,17 @@ def bar(request_raw: dict) -> str:
     return request_raw.get("bar", "baz")


-async def computation1(foo: str, some_data: dict) -> bool:
-    await asyncio.sleep(1)
-    return False
-
-
 async def some_data() -> dict:
     async with aiohttp.ClientSession() as session:
         async with session.get("http://httpbin.org/get") as resp:
             return await resp.json()


+async def computation1(foo: str, some_data: dict) -> bool:
+    await asyncio.sleep(1)
+    return False
+
+
 async def computation2(bar: str) -> bool:
     await asyncio.sleep(1)
     return True
81 changes: 72 additions & 9 deletions examples/async/fastapi_example.py
@@ -1,22 +1,85 @@
+import logging
+from contextlib import asynccontextmanager
+
+import aiohttp
 import async_module
 import fastapi
+from aiohttp import client_exceptions
+from hamilton_sdk import adapters

-from hamilton import base
-from hamilton.experimental import h_async
+from hamilton import async_driver

-app = fastapi.FastAPI()
+logger = logging.getLogger(__name__)

 # can instantiate a driver once for the life of the app:
-dr = h_async.AsyncDriver({}, async_module, result_builder=base.DictResult())
+dr_with_tracking: async_driver.AsyncDriver = None
+dr_without_tracking: async_driver.AsyncDriver = None


+async def _tracking_server_running():
+    """Quickly tells if the tracking server is up and running"""
+    async with aiohttp.ClientSession() as session:
+        try:
+            async with session.get("http://localhost:8241/api/v1/ping") as response:
+                if response.status == 200:
+                    return True
+                else:
+                    return False
+        except client_exceptions.ClientConnectionError:
+            return False
+
+
+@asynccontextmanager
+async def lifespan(app: fastapi.FastAPI):

Review comment (Collaborator): It would be nice to have a second endpoint to compare latency/penalty for the tracker.

+    """Fast API lifespan context manager for setting up the driver and tracking adapters
+    This has to be done async as there are initializers
+    """
+    global dr_with_tracking
+    global dr_without_tracking
+    builder = async_driver.Builder().with_modules(async_module)
+    is_server_running = await _tracking_server_running()
+    dr_without_tracking = await builder.build()
+    dr_with_tracking = (
+        await builder.with_adapters(
+            adapters.AsyncHamiltonTracker(
+                project_id=1,
+                username="elijah",
+                dag_name="async_tracker",
+            )
+        ).build()
+        if is_server_running
+        else None
+    )
+    if not is_server_running:
+        logger.warning(
+            "Tracking server is not running, skipping telemetry. To run the telemetry server, run hamilton ui. "
+            "Note you must have a project with ID 1 if it is running -- if not, you can change the project "
+            "ID in this file or create a new one from the UI. Then make sure to restart this server."
+        )
+    yield
+
+
+app = fastapi.FastAPI(lifespan=lifespan)
+
+
 @app.post("/execute")
-async def call(request: fastapi.Request) -> dict:
-    """Handler for pipeline call"""
+async def call_without_tracker(request: fastapi.Request) -> dict:
+    """Handler for pipeline call -- this does not track in the Hamilton UI"""
+    input_data = {"request": request}
+    result = await dr_without_tracking.execute(["pipeline"], inputs=input_data)
+    # dr.visualize_execution(["pipeline"], "./pipeline.dot", {"format": "png"}, inputs=input_data)
+    return result
+
+
+@app.post("/execute_tracker")
+async def call_with_tracker(request: fastapi.Request) -> dict:
+    """Handler for pipeline call -- this does track in the Hamilton UI."""
     input_data = {"request": request}
-    # Can instantiate a driver within a request as well:
-    # dr = h_async.AsyncDriver({}, async_module, result_builder=base.DictResult())
-    result = await dr.execute(["pipeline"], inputs=input_data)
+    if dr_with_tracking is None:
+        raise ValueError(
+            "Tracking driver not initialized -- you must have the tracking server running at app startup to use this endpoint."
+        )
+    result = await dr_with_tracking.execute(["pipeline"], inputs=input_data)
     # dr.visualize_execution(["pipeline"], "./pipeline.dot", {"format": "png"}, inputs=input_data)
     return result

@@ -17,8 +17,7 @@
 import pandas as pd
 import pydantic

-from hamilton import base
-from hamilton.experimental import h_async
+from hamilton import async_driver, base

 app = fastapi.FastAPI()

@@ -56,7 +55,7 @@ def fake_model_predict(df: pd.DataFrame) -> pd.Series:

 # We instantiate an async driver once for the life of the app. We use the AsyncDriver here because under the hood
 # FastAPI is async. If you were using Flask, you could use the regular Hamilton driver without issue.
-dr = h_async.AsyncDriver(
+dr = async_driver.AsyncDriver(
     {},  # no config/invariant inputs in this example.
     features,  # the module that contains the common feature definitions.
     result_builder=base.SimplePythonDataFrameGraphAdapter(),