Skip to content

Commit

Permalink
Fixes up the async example to use the tracker
Browse files Browse the repository at this point in the history
But only if the server is running
  • Loading branch information
elijahbenizzy committed Jun 26, 2024
1 parent 6549130 commit c330c85
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 11 deletions.
24 changes: 24 additions & 0 deletions examples/async/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,30 @@ You should get the following result:
{"pipeline":{"computation1":false,"computation2":true}}
```

## Tracking

This adds an endpoint, `/execute_tracker`, that uses the async tracker when the [ui](https://hamilton.dagworks.io/en/latest/concepts/ui/)
is running on port 8241 -- see [fastapi_example.py](fastapi_example.py) for the code.
If the UI is not running when the app starts, the app proceeds anyway without tracking: `/execute` keeps working, but `/execute_tracker` will return an error.

You can run it with:

```bash
curl -X 'POST' \
'http://localhost:8000/execute_tracker' \
-H 'accept: application/json' \
-d '{}'
```

Recall, to get the server running, you'll have to run the following:

```bash
pip install sf-hamilton[ui]
hamilton ui
```

This assumes a project with ID 1 exists -- if you want a different one, you can go to the UI and create one and/or set the project ID in the code.


## How it works

Expand Down
10 changes: 5 additions & 5 deletions examples/async/async_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@ def bar(request_raw: dict) -> str:
return request_raw.get("bar", "baz")


async def computation1(foo: str, some_data: dict) -> bool:
    """Simulate a slow asynchronous computation that always yields False.

    Neither ``foo`` nor ``some_data`` is read in the body.
    NOTE(review): presumably they are declared so the framework treats them
    as upstream dependencies of this node -- confirm.
    """
    outcome = False
    await asyncio.sleep(1)  # stand-in for real asynchronous work
    return outcome


async def some_data() -> dict:
    """Fetch http://httpbin.org/get and return the decoded JSON body.

    A new aiohttp client session is opened for the call and closed again
    before the function returns.
    """
    async with aiohttp.ClientSession() as http:
        async with http.get("http://httpbin.org/get") as reply:
            payload = await reply.json()
    return payload


async def computation1(foo: str, some_data: dict) -> bool:
    """Simulate a slow asynchronous computation that always yields False.

    Neither ``foo`` nor ``some_data`` is read in the body.
    NOTE(review): presumably they are declared so the framework treats them
    as upstream dependencies of this node -- confirm.
    """
    outcome = False
    await asyncio.sleep(1)  # stand-in for real asynchronous work
    return outcome


async def computation2(bar: str) -> bool:
    """Simulate a slow asynchronous computation that always yields True.

    ``bar`` is not read in the body.
    NOTE(review): presumably it is declared so the framework treats it as an
    upstream dependency of this node -- confirm.
    """
    outcome = True
    await asyncio.sleep(1)  # stand-in for real asynchronous work
    return outcome
Expand Down
79 changes: 73 additions & 6 deletions examples/async/fastapi_example.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,89 @@
import logging
from contextlib import asynccontextmanager

import aiohttp
import async_module
import fastapi
from aiohttp import client_exceptions
from hamilton_sdk import adapters

from hamilton import base
from hamilton.experimental import h_async

app = fastapi.FastAPI()
logger = logging.getLogger(__name__)

# can instantiate a driver once for the life of the app:
dr = h_async.AsyncDriver({}, async_module, result_builder=base.DictResult())
# Drivers shared by the request handlers. Both start out as None and are
# assigned by `lifespan` when the app starts; `dr_with_tracking` is left as
# None when the tracking server was not reachable at that point.
dr_with_tracking: h_async.AsyncDriver = None
dr_without_tracking: h_async.AsyncDriver = None


async def _tracking_server_running() -> bool:
    """Quickly tell whether the tracking server is up and running.

    Sends a single GET to the ping endpoint on localhost:8241.

    Returns:
        True if the endpoint answered with HTTP 200. False for any other
        status, for any aiohttp client error (e.g. connection refused), or
        when no answer arrives within the probe timeout.
    """
    import asyncio  # local import: only needed for the timeout exception type

    # Bound the probe explicitly: without this, a port that accepts the
    # connection but never answers would stall app startup for as long as
    # aiohttp's (much longer) default timeout allows.
    probe_timeout = aiohttp.ClientTimeout(total=2)
    try:
        async with aiohttp.ClientSession(timeout=probe_timeout) as session:
            async with session.get("http://localhost:8241/api/v1/ping") as response:
                return response.status == 200
    except (client_exceptions.ClientError, asyncio.TimeoutError):
        # Unreachable, misbehaving, or too slow -- all mean "not usable".
        return False


@asynccontextmanager
async def lifespan(app: fastapi.FastAPI):
    """FastAPI lifespan hook that builds the module-level Hamilton drivers.

    Runs before the app starts serving: it always builds the untracked
    driver, and builds the tracked driver only when the tracking server
    answered the ping. Building is awaited, which is why this lives in an
    async lifespan hook rather than at import time.

    Side effects:
        Assigns the globals ``dr_without_tracking`` and ``dr_with_tracking``
        (the latter is set to None when the tracking server is not running)
        and logs a warning in that case.
    """
    global dr_with_tracking, dr_without_tracking

    builder = h_async.Builder().with_modules(async_module)
    is_server_running = await _tracking_server_running()

    # The untracked driver is built before any adapter is added to the builder.
    # NOTE(review): order kept in case with_adapters mutates the builder -- confirm.
    dr_without_tracking = await builder.build()

    if is_server_running:
        tracker = adapters.AsyncHamiltonTracker(
            project_id=1,
            username="elijah",
            dag_name="async_tracker",
        )
        dr_with_tracking = await builder.with_adapters(tracker).build()
    else:
        dr_with_tracking = None
        logger.warning(
            "Tracking server is not running, skipping telemetry. To run the telemetry server, run hamilton ui. "
            "Note you must have a project with ID 1 if it is running -- if not, you can change the project "
            "ID in this file or create a new one from the UI. Then make sure to restart this server."
        )
    yield


# Create the app with the lifespan hook so the drivers are built at startup.
app = fastapi.FastAPI(lifespan=lifespan)


@app.post("/execute")
async def call_without_tracker(request: fastapi.Request) -> dict:
    """Handler for pipeline call -- this does not track in the Hamilton UI.

    Args:
        request: The incoming FastAPI request; it is passed to the DAG as the
            ``request`` input.

    Returns:
        Whatever the untracked driver returns for the ``pipeline`` output.
    """
    input_data = {"request": request}
    # Can instantiate a driver within a request as well:
    # dr = h_async.AsyncDriver({}, async_module, result_builder=base.DictResult())
    result = await dr_without_tracking.execute(["pipeline"], inputs=input_data)
    # dr.visualize_execution(["pipeline"], "./pipeline.dot", {"format": "png"}, inputs=input_data)
    return result


@app.post("/execute_tracker")
async def call_with_tracker(request: fastapi.Request) -> dict:
    """Handler for pipeline call -- this does track in the Hamilton UI.

    Args:
        request: The incoming FastAPI request; it is passed to the DAG as the
            ``request`` input.

    Returns:
        Whatever the tracked driver returns for the ``pipeline`` output.

    Raises:
        fastapi.HTTPException: 503 when the tracked driver was never built,
            i.e. the tracking server was not reachable at app startup.
    """
    # Check before doing any work: the tracked driver only exists if the
    # tracking server answered the ping during startup.
    if dr_with_tracking is None:
        # Report a clear 503 to the client instead of letting a bare
        # exception surface as an opaque 500.
        raise fastapi.HTTPException(
            status_code=503,
            detail="Tracking driver not initialized -- you must have the tracking server running at app startup to use this endpoint.",
        )
    input_data = {"request": request}
    # Can instantiate a driver within a request as well:
    # dr = h_async.AsyncDriver({}, async_module, result_builder=base.DictResult())
    result = await dr_with_tracking.execute(["pipeline"], inputs=input_data)
    # dr.visualize_execution(["pipeline"], "./pipeline.dot", {"format": "png"}, inputs=input_data)
    return result

Expand Down

0 comments on commit c330c85

Please sign in to comment.