From c330c852635c143638d437c7a1afc389d1d9a3bf Mon Sep 17 00:00:00 2001 From: elijahbenizzy Date: Mon, 24 Jun 2024 22:03:34 -0700 Subject: [PATCH] Fixes up the async example to use the tracker But only if the server is running --- examples/async/README.md | 24 ++++++++++ examples/async/async_module.py | 10 ++-- examples/async/fastapi_example.py | 79 ++++++++++++++++++++++++++++--- 3 files changed, 102 insertions(+), 11 deletions(-) diff --git a/examples/async/README.md b/examples/async/README.md index 140f906ce..8ef0fb490 100644 --- a/examples/async/README.md +++ b/examples/async/README.md @@ -29,6 +29,30 @@ You should get the following result: {"pipeline":{"computation1":false,"computation2":true}} ``` +## Tracking + +This has an additional endpoint that will use the async tracker if the [ui](https://hamilton.dagworks.io/en/latest/concepts/ui/) +is running on port 8241 -- see [fastapi_example.py](fastapi_example.py) for the code. +If it is not running it will proceed anyway without tracking. + +You can run it with: + +```bash +curl -X 'POST' \ + 'http://localhost:8000/execute' \ + -H 'accept: application/json' \ + -d '{}' +``` + +Recall, to get the server running, you'll have to run the following: + +```bash +pip install sf-hamilton[ui] +hamilton ui +``` + +This assumes a project (1) exists -- if you want a different one you can go the the UI and create one and/or set it in the code. + ## How it works diff --git a/examples/async/async_module.py b/examples/async/async_module.py index 33432b8c7..ad52c7a1b 100644 --- a/examples/async/async_module.py +++ b/examples/async/async_module.py @@ -24,17 +24,17 @@ def bar(request_raw: dict) -> str: return request_raw.get("bar", "baz") -async def computation1(foo: str, some_data: dict) -> bool: - await asyncio.sleep(1) - return False - - async def some_data() -> dict: async with aiohttp.ClientSession() as session: async with session.get("http://httpbin.org/get") as resp: return await resp.json() +async def computation1(foo: str, some_data: dict) -> bool: + await asyncio.sleep(1) + return False + + async def computation2(bar: str) -> bool: await asyncio.sleep(1) return True diff --git a/examples/async/fastapi_example.py b/examples/async/fastapi_example.py index 88c87ca07..bf73b0d0f 100644 --- a/examples/async/fastapi_example.py +++ b/examples/async/fastapi_example.py @@ -1,22 +1,89 @@ +import logging +from contextlib import asynccontextmanager + +import aiohttp import async_module import fastapi +from aiohttp import client_exceptions +from hamilton_sdk import adapters -from hamilton import base from hamilton.experimental import h_async -app = fastapi.FastAPI() +logger = logging.getLogger(__name__) # can instantiate a driver once for the life of the app: -dr = h_async.AsyncDriver({}, async_module, result_builder=base.DictResult()) +dr_with_tracking: h_async.AsyncDriver = None +dr_without_tracking: h_async.AsyncDriver = None + + +async def _tracking_server_running(): + """Quickly tells if the tracking server is up and running""" + async with aiohttp.ClientSession() as session: + try: + async with session.get("http://localhost:8241/api/v1/ping") as response: + if response.status == 200: + return True + else: + return False + except client_exceptions.ClientConnectionError: + return False + + +@asynccontextmanager +async def lifespan(app: fastapi.FastAPI): + """Fast API lifespan context manager for setting up the driver and tracking adapters + This has to be done async as there are initializers + """ + global dr_with_tracking + global dr_without_tracking + builder = h_async.Builder().with_modules(async_module) + is_server_running = await _tracking_server_running() + dr_without_tracking = await builder.build() + dr_with_tracking = ( + await builder.with_adapters( + adapters.AsyncHamiltonTracker( + project_id=1, + username="elijah", + dag_name="async_tracker", + ) + ).build() + if is_server_running + else None + ) + if not is_server_running: + logger.warning( + "Tracking server is not running, skipping telemetry. To run the telemetry server, run hamilton ui. " + "Note you must have a project with ID 1 if it is running -- if not, you can change the project " + "ID in this file or create a new one from the UI. Then make sure to restart this server." + ) + yield + + +app = fastapi.FastAPI(lifespan=lifespan) @app.post("/execute") -async def call(request: fastapi.Request) -> dict: - """Handler for pipeline call""" +async def call_without_tracker(request: fastapi.Request) -> dict: + """Handler for pipeline call -- this does not track in the Hamilton UI""" + input_data = {"request": request} + # Can instantiate a driver within a request as well: + # dr = h_async.AsyncDriver({}, async_module, result_builder=base.DictResult()) + result = await dr_without_tracking.execute(["pipeline"], inputs=input_data) + # dr.visualize_execution(["pipeline"], "./pipeline.dot", {"format": "png"}, inputs=input_data) + return result + + +@app.post("/execute_tracker") +async def call_with_tracker(request: fastapi.Request) -> dict: + """Handler for pipeline call -- this does track in the Hamilton UI.""" input_data = {"request": request} # Can instantiate a driver within a request as well: # dr = h_async.AsyncDriver({}, async_module, result_builder=base.DictResult()) - result = await dr.execute(["pipeline"], inputs=input_data) + if dr_with_tracking is None: + raise ValueError( + "Tracking driver not initialized -- you must have the tracking server running at app startup to use this endpoint." + ) + result = await dr_with_tracking.execute(["pipeline"], inputs=input_data) # dr.visualize_execution(["pipeline"], "./pipeline.dot", {"format": "png"}, inputs=input_data) return result