Skip to content

Commit

Permalink
Async tracking
Browse files Browse the repository at this point in the history
Break into commits
1. Fixes up async hooks
2. Implements async tracking
3. Adds async builder
4. Adds async tracking documentation
  • Loading branch information
elijahbenizzy committed Jun 25, 2024
1 parent 82ae9ed commit 362f645
Show file tree
Hide file tree
Showing 10 changed files with 638 additions and 108 deletions.
6 changes: 6 additions & 0 deletions examples/async/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ You should get the following result:
{"pipeline":{"computation1":false,"computation2":true}}
```

## Tracking

This uses the async tracker if the [ui](https://hamilton.dagworks.io/en/latest/concepts/ui/)
is running on port 8241 -- see [fastapi_example.py](fastapi_example.py) for the code.
If it is not running it will proceed anyway without tracking.


## How it works

Expand Down
10 changes: 5 additions & 5 deletions examples/async/async_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@ def bar(request_raw: dict) -> str:
return request_raw.get("bar", "baz")


async def computation1(foo: str, some_data: dict) -> bool:
await asyncio.sleep(1)
return False


async def some_data() -> dict:
async with aiohttp.ClientSession() as session:
async with session.get("http://httpbin.org/get") as resp:
return await resp.json()


async def computation1(foo: str, some_data: dict) -> bool:
await asyncio.sleep(1)
return False


async def computation2(bar: str) -> bool:
await asyncio.sleep(1)
return True
Expand Down
54 changes: 51 additions & 3 deletions examples/async/fastapi_example.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,61 @@
import logging
from contextlib import asynccontextmanager

import aiohttp
import async_module
import fastapi
import hamilton_sdk.adapters
from aiohttp import client_exceptions

from hamilton import base
from hamilton.experimental import h_async

app = fastapi.FastAPI()
logger = logging.getLogger(__name__)

# can instantiate a driver once for the life of the app:
dr = h_async.AsyncDriver({}, async_module, result_builder=base.DictResult())
dr = None


async def _tracking_server_running():
"""Quickly tells if the tracking server is up and running"""
async with aiohttp.ClientSession() as session:
try:
async with session.get("http://localhost:8241/api/v1/ping") as response:
if response.status == 200:
return True
else:
return False
except client_exceptions.ClientConnectionError:
return False


@asynccontextmanager
async def lifespan(app: fastapi.FastAPI):
global dr
is_server_running = await _tracking_server_running()
if not is_server_running:
logger.warning(
"Tracking server is not running, skipping telemetry. To run the telemetry server, run hamilton ui. "
"Note you must have a project with ID 1 if it is running -- if not, you can change the project "
"ID in this file or create a new one from the UI"
)
adapters = []
if is_server_running:
tracker_async = hamilton_sdk.adapters.AsyncHamiltonTracker(
project_id=1,
username="elijah",
dag_name="async_tracker",
)
tracker_sync = hamilton_sdk.adapters.HamiltonTracker(
project_id=1,
username="elijah",
dag_name="sync_tracker_dont_use_this",
)
adapters = [tracker_async, tracker_sync]
dr = await h_async.Builder().with_modules(async_module).with_adapters(*adapters).build()
yield


app = fastapi.FastAPI(lifespan=lifespan)


@app.post("/execute")
Expand Down
1 change: 1 addition & 0 deletions hamilton/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ def __init__(
if _graph_executor is None:
_graph_executor = DefaultGraphExecutor(self.adapter)
self.graph_executor = _graph_executor
self.config = config
except Exception as e:
error = telemetry.sanitize_error(*sys.exc_info())
logger.error(SLACK_ERROR_MESSAGE)
Expand Down
Loading

0 comments on commit 362f645

Please sign in to comment.