diff --git a/examples/LLM_Workflows/observability_openllmetry/README.md b/examples/LLM_Workflows/observability_openllmetry/README.md
new file mode 100644
index 000000000..bf1389ee5
--- /dev/null
+++ b/examples/LLM_Workflows/observability_openllmetry/README.md
@@ -0,0 +1,41 @@
+# Monitor Hamilton with OpenTelemetry, OpenLLMetry and Traceloop
+
+In this simple example, you'll learn how to use the `OpenTelemetryTracer` to emit traces of your Hamilton code in the OpenTelemetry format, in particular for LLM applications.
+
+![Traceloop screenshot](screenshot.png)
+
+[OpenTelemetry](https://opentelemetry.io/) is an open-source, cross-language toolkit to instrument, generate, collect, and export telemetry data (metrics, logs, traces), and it constitutes an industry-recognized standard. Learn more about it in this [Awesome OpenTelemetry repository](https://github.com/magsther/awesome-opentelemetry).
+
+[OpenLLMetry](https://github.com/traceloop/openllmetry) is an open-source Python library that automatically instruments components of your LLM stack with OpenTelemetry, including LLM providers (OpenAI, Anthropic, HuggingFace, Cohere, etc.), vector databases (Weaviate, Qdrant, Chroma, etc.), and frameworks ([Burr](https://github.com/dagworks-inc/burr), Haystack, LangChain, LlamaIndex). Concretely, this means you automatically get detailed traces of API calls, retrieval operations, and text transformations.
+
+Note that OpenTelemetry is middleware: it provides neither a destination to store data nor a dashboard. For this example, we'll use [Traceloop](https://www.traceloop.com/), which is built by the developers of OpenLLMetry. It has a generous free tier and can be conveniently set up in a few lines of code for this demo.
+
+## Set up
+Having access to a [Traceloop account](https://www.traceloop.com/) and an API key is a prerequisite.
+
+1. Create a virtual environment and activate it
+   ```bash
+   python -m venv venv && . venv/bin/activate
+   ```
+
+2. Install requirements
+   ```bash
+   pip install -r requirements.txt
+   ```
+
+3. Set environment variables for your API keys `OPENAI_API_KEY` and `TRACELOOP_API_KEY`
+
+4. Execute the code
+   ```bash
+   python run.py
+   ```
+
+5. Explore results on Traceloop (or your OpenTelemetry destination).
+
+### Without Traceloop
+
+For this example to work without Traceloop, you will need to set up your own [OpenTelemetry destination](https://opentelemetry.io/ecosystem/vendors/). We suggest using [Jaeger](https://www.jaegertracing.io/docs/1.47/getting-started/) and have included commented-out Python code in `run.py` to route telemetry to it. For a vendor-free quick check, see the console-exporter sketch at the end of this README.
+
+## Should I still use the Hamilton UI?
+
+Absolutely! OpenTelemetry focuses on collecting telemetry about the internals of code and external API calls, and is a standard among web services. There's no conflict between the OpenTelemetry tracer and the tracking for the Hamilton UI. In fact, the Hamilton UI captures a superset of what OpenTelemetry allows, tailored to the Hamilton framework: visualizations, data lineage, summary statistics, and more utilities to improve your development experience. In the not-too-distant future, the Hamilton UI could ingest OpenTelemetry data 😉 (contributions welcome!)
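+
+### Local console traces
+
+If you just want to sanity-check spans locally without any telemetry vendor, one option is the OpenTelemetry SDK's console exporter. This is a minimal sketch, not part of `run.py`; it assumes you run it from this directory so that this example's dataflow module is importable as `run`, and the provider is set before the driver is built:
+
+```python
+from opentelemetry import trace
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
+
+from hamilton import driver
+from hamilton.plugins import h_opentelemetry
+
+import run  # this example's dataflow module (assumes you're in this directory)
+
+# Print every span to stdout instead of shipping it to a vendor
+provider = TracerProvider()
+provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
+trace.set_tracer_provider(provider)
+
+dr = (
+    driver.Builder()
+    .with_modules(run)
+    .with_adapters(h_opentelemetry.OpenTelemetryTracer())
+    .build()
+)
+print(dr.execute(["universal_truth"])["universal_truth"])
+```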
diff --git a/examples/LLM_Workflows/observability_openllmetry/notebook.ipynb b/examples/LLM_Workflows/observability_openllmetry/notebook.ipynb
new file mode 100644
index 000000000..abae8a2c3
--- /dev/null
+++ b/examples/LLM_Workflows/observability_openllmetry/notebook.ipynb
@@ -0,0 +1,318 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Monitor Hamilton with OpenTelemetry, OpenLLMetry and Traceloop\n",
+    "\n",
+    "This notebook showcases how the [OpenTelemetry](https://opentelemetry.io/) plugin for Hamilton can trace node execution, and how [OpenLLMetry](https://github.com/traceloop/openllmetry) automatically traces popular LLM components. This example uses [Traceloop](https://www.traceloop.com/) as a destination for telemetry, but other [open-source options are available](https://opentelemetry.io/ecosystem/vendors/) (Jaeger, Elastic, Clickhouse, Grafana, etc.)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 1,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import os\n",
+    "from hamilton import driver\n",
+    "from hamilton.plugins import h_opentelemetry\n",
+    "from traceloop.sdk import Traceloop\n",
+    "\n",
+    "%load_ext hamilton.plugins.jupyter_magic"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Define a dataflow\n",
+    "The next cell creates a Python module named `llm_dataflow` and defines a dataflow that creates an OpenAI client and queries the chat completions endpoint to center an HTML `<div>` tag.\n",
+    "\n",
+    "You'll notice that nothing special is added to enable tracing."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "<graphviz figure: the dataflow DAG, llm_client (OpenAI) -> universal_truth (str), with a legend marking both as functions>"
+      ]
+     },
+     "metadata": {},
+     "output_type": "display_data"
+    }
+   ],
+   "source": [
+    "%%cell_to_module llm_dataflow -d\n",
+    "import openai\n",
+    "\n",
+    "def llm_client() -> openai.OpenAI:\n",
+    "    return openai.OpenAI()\n",
+    "\n",
+    "def universal_truth(llm_client: openai.OpenAI) -> str:\n",
+    "    response = llm_client.chat.completions.create(\n",
+    "        model=\"gpt-4o-mini\",\n",
+    "        messages=[\n",
+    "            {\"role\": \"system\", \"content\": \"You are a benevolent all-knowing being\"},\n",
+    "            {\"role\": \"user\", \"content\": \"Please center my HTML <div> tag\"},\n",
+    "        ],\n",
+    "    )\n",
+    "    return str(response.choices[0].message.content)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Build the dataflow\n",
+    "The Hamilton `Builder` object takes the previously defined dataflow via `.with_modules(llm_dataflow)`. Then we pass the `OpenTelemetryTracer` object via `.with_adapters()` to trace dataflow execution. Make sure to call `Traceloop.init()` before creating the `OpenTelemetryTracer`.\n",
+    "\n",
+    "You'll need to set your `OPENAI_API_KEY` and your `TRACELOOP_API_KEY` to run the cell."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 3,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "os.environ[\"OPENAI_API_KEY\"] = ...\n",
+    "os.environ[\"TRACELOOP_API_KEY\"] = ..."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 6,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "\u001b[32mTraceloop syncing configuration and prompts\u001b[39m\n",
+      "\u001b[32mTraceloop exporting traces to https://api.traceloop.com authenticating with bearer token\n",
+      "\u001b[39m\n"
+     ]
+    }
+   ],
+   "source": [
+    "Traceloop.init()\n",
+    "\n",
+    "dr = (\n",
+    "    driver.Builder()\n",
+    "    .with_modules(llm_dataflow)\n",
+    "    .with_adapters(h_opentelemetry.OpenTelemetryTracer())\n",
+    "    .build()\n",
+    ")\n",
+    "\n",
+    "# If you want to use another OpenTelemetry destination, such as the\n",
+    "# open-source Jaeger, set up the container locally and use the following code:\n",
+    "\n",
+    "# from opentelemetry import trace\n",
+    "# from opentelemetry.sdk.trace import TracerProvider\n",
+    "# from opentelemetry.sdk.trace.export import SimpleSpanProcessor\n",
+    "# from opentelemetry.exporter.jaeger import JaegerExporter\n",
+    "\n",
+    "# jaeger_exporter = JaegerExporter(agent_host_name='localhost', agent_port=5775)\n",
+    "# span_processor = SimpleSpanProcessor(jaeger_exporter)\n",
+    "# provider = TracerProvider(active_span_processor=span_processor)\n",
+    "# trace.set_tracer_provider(provider)\n",
+    "\n",
+    "results = dr.execute([\"universal_truth\"])"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 5,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "To center an HTML `<div>` element, you can use CSS. There are several methods to achieve this, depending on whether you want to center it horizontally, vertically, or both. Here are a few common methods:\n",
+      "\n",
+      "### Centering Horizontally\n",
+      "\n",
+      "One of the simplest ways to center a `<div>` horizontally is to set its width, and then use `margin: auto;`. Here's an example:\n",
+      "\n",
+      "```html\n",
+      "<!DOCTYPE html>\n",
+      "<html>\n",
+      "<head>\n",
+      "  <meta charset=\"UTF-8\">\n",
+      "  <meta name=\"viewport\" content=\"width=device-width, initial-scale=1.0\">\n",
+      "  <title>Center Div</title>\n",
+      "  <style>\n",
+      "    .centered {\n",
+      "      width: 50%;\n",
+      "      margin: auto;\n",
+      "    }\n",
+      "  </style>\n",
+      "</head>\n",
+      "<body>\n",
+      "  <div class=\"centered\">\n",
+      "    This div is centered horizontally!\n",
+      "  </div>\n",
+      "</body>\n",
+      "</html>\n",
+      "```\n",
+      "\n",
+      "### Centering Vertically and Horizontally\n",
+      "\n",
+      "To center a `<div>` both vertically and horizontally, you can use Flexbox or CSS Grid. Here's an example using Flexbox:\n",
+      "\n",
+      "```html\n",
+      "<!DOCTYPE html>\n",
+      "<html>\n",
+      "<head>\n",
+      "  <meta charset=\"UTF-8\">\n",
+      "  <meta name=\"viewport\" content=\"width=device-width, initial-scale=1.0\">\n",
+      "  <title>Center Div</title>\n",
+      "  <style>\n",
+      "    body {\n",
+      "      display: flex;\n",
+      "      justify-content: center;\n",
+      "      align-items: center;\n",
+      "      height: 100vh;\n",
+      "      margin: 0;\n",
+      "    }\n",
+      "  </style>\n",
+      "</head>\n",
+      "<body>\n",
+      "  <div>\n",
+      "    This div is centered both vertically and horizontally!\n",
+      "  </div>\n",
+      "</body>\n",
+      "</html>\n",
+      "```\n",
+      "\n",
+      "### Centering with CSS Grid\n",
+      "\n",
+      "Here's another way to center using CSS Grid:\n",
+      "\n",
+      "```html\n",
+      "<!DOCTYPE html>\n",
+      "<html>\n",
+      "<head>\n",
+      "  <meta charset=\"UTF-8\">\n",
+      "  <meta name=\"viewport\" content=\"width=device-width, initial-scale=1.0\">\n",
+      "  <title>Center Div</title>\n",
+      "  <style>\n",
+      "    body {\n",
+      "      display: grid;\n",
+      "      place-items: center;\n",
+      "      height: 100vh;\n",
+      "      margin: 0;\n",
+      "    }\n",
+      "  </style>\n",
+      "</head>\n",
+      "<body>\n",
+      "  <div>\n",
+      "    This div is centered using CSS Grid!\n",
+      "  </div>\n",
+      "</body>\n",
+      "</html>\n",
+      "```\n",
+      "\n",
+      "Choose the method that best fits your layout needs!\n"
+     ]
+    }
+   ],
+   "source": [
+    "print(results[\"universal_truth\"])"
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "venv",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.11.1"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}
diff --git a/examples/LLM_Workflows/observability_openllmetry/requirements.txt b/examples/LLM_Workflows/observability_openllmetry/requirements.txt
new file mode 100644
index 000000000..d83512fa7
--- /dev/null
+++ b/examples/LLM_Workflows/observability_openllmetry/requirements.txt
@@ -0,0 +1,4 @@
+openai
+opentelemetry-instrumentation-openai
+sf-hamilton
+traceloop-sdk
diff --git a/examples/LLM_Workflows/observability_openllmetry/run.py b/examples/LLM_Workflows/observability_openllmetry/run.py
new file mode 100644
index 000000000..51e5e284e
--- /dev/null
+++ b/examples/LLM_Workflows/observability_openllmetry/run.py
@@ -0,0 +1,51 @@
+import openai
+
+
+def llm_client() -> openai.OpenAI:
+    return openai.OpenAI()
+
+
+def universal_truth(llm_client: openai.OpenAI) -> str:
+    response = llm_client.chat.completions.create(
+        model="gpt-4o-mini",
+        messages=[
+            {"role": "system", "content": "You are a benevolent all-knowing being"},
+            {"role": "user", "content": "Please center my HTML <div> tag"},
+        ],
+    )
+    return str(response.choices[0].message.content)
+
+
+if __name__ == "__main__":
+    import __main__  # noqa: I001
+    from hamilton import driver
+    from hamilton.plugins import h_opentelemetry
+
+    # We're using Traceloop because it can be conveniently set up in two lines of code:
+    # import the `Traceloop` object and initialize it.
+    from traceloop.sdk import Traceloop
+
+    Traceloop.init()
+
+    # If you want to use another OpenTelemetry destination, such as the open-source Jaeger,
+    # set up the container locally and use the following code:
+
+    # from opentelemetry import trace
+    # from opentelemetry.sdk.trace import TracerProvider
+    # from opentelemetry.sdk.trace.export import SimpleSpanProcessor
+    # from opentelemetry.exporter.jaeger import JaegerExporter
+
+    # jaeger_exporter = JaegerExporter(agent_host_name='localhost', agent_port=5775)
+    # span_processor = SimpleSpanProcessor(jaeger_exporter)
+    # provider = TracerProvider(active_span_processor=span_processor)
+    # trace.set_tracer_provider(provider)
+
+    dr = (
+        driver.Builder()
+        .with_modules(__main__)
+        .with_adapters(h_opentelemetry.OpenTelemetryTracer())
+        .build()
+    )
+
+    results = dr.execute(["universal_truth"])
+    print(results["universal_truth"])
diff --git a/examples/LLM_Workflows/observability_openllmetry/screenshot.png b/examples/LLM_Workflows/observability_openllmetry/screenshot.png
new file mode 100644
index 000000000..7ccd91894
Binary files /dev/null and b/examples/LLM_Workflows/observability_openllmetry/screenshot.png differ
diff --git a/examples/opentelemetry/README.md b/examples/opentelemetry/README.md
new file mode 100644
index 000000000..9979c6d6e
--- /dev/null
+++ b/examples/opentelemetry/README.md
@@ -0,0 +1,6 @@
+# Hamilton + OpenTelemetry
+
+You can use the `hamilton.plugins.h_opentelemetry.OpenTelemetryTracer` adapter to send telemetry
+about your Hamilton dataflow execution.
+
+See [the example with LLMs and OpenLLMetry](https://github.com/DAGWorks-Inc/hamilton/tree/main/examples/LLM_Workflows/observability_openllmetry) for more information.
diff --git a/hamilton/plugins/h_opentelemetry.py b/hamilton/plugins/h_opentelemetry.py
new file mode 100644
index 000000000..51f73feb9
--- /dev/null
+++ b/hamilton/plugins/h_opentelemetry.py
@@ -0,0 +1,149 @@
+import json
+import logging
+from contextvars import ContextVar
+from typing import Any, Collection, Dict, List, Optional, Tuple
+
+logger = logging.getLogger(__name__)
+
+
+try:
+    from opentelemetry import context, trace
+    from opentelemetry.sdk.trace import Span
+except ImportError as e:
+    raise ImportError(
+        "Failed to import `opentelemetry`. "
+        "Use `pip install sf-hamilton[opentelemetry]` to install "
+        "dependencies for the `h_opentelemetry` plugin."
+    ) from e
+
+from hamilton.graph_types import HamiltonGraph, HamiltonNode
+from hamilton.lifecycle import GraphExecutionHook, NodeExecutionHook, TaskExecutionHook
+
+# We have to keep track of tokens for the span.
+# As OpenTelemetry has some quirky behavior around context managers, we keep track of the latest spans we started.
+# This way we can pop one off and know where to set the current one (as the parent, when the next one ends).
+token_stack = ContextVar[Optional[List[Tuple[object, Span]]]]("token_stack", default=None)
+
+
+def _exit_span(exc: Optional[Exception] = None):
+    """Counterpart to _enter_span, but for exiting the span. Pops the token off the stack and detaches the context."""
+    stack = token_stack.get()[:]
+    token, span = stack.pop()
+    token_stack.set(stack)
+    context.detach(token)
+    if exc:
+        span.set_status(trace.Status(trace.StatusCode.ERROR, str(exc)))
+    else:
+        span.set_status(trace.Status(trace.StatusCode.OK))
+    span.end()
+    return span
+
+
+def _enter_span(name: str, tracer: trace.Tracer):
+    """Utility function to enter a span: starts it, sets the current context, and adds it to the token stack.
+
+    Note that `start_span` alone does not make the new span current, which is why we attach the context
+    explicitly. We could use `start_as_current_span`, but this is a bit more explicit.
+    """
+    span = tracer.start_span(
+        name=name,
+        record_exception=False,  # we'll handle this ourselves
+        set_status_on_exception=False,
+    )
+    ctx = trace.set_span_in_context(span)
+    token = context.attach(ctx)
+    stack = (token_stack.get() or [])[:]
+    stack.append((token, span))
+    token_stack.set(stack)
+    return span
+
+
+class OpenTelemetryTracer(NodeExecutionHook, GraphExecutionHook, TaskExecutionHook):
+    """Adapter to log Hamilton execution to OpenTelemetry. At a high level, this works as follows:
+
+    1. On any of the pre-execution hooks (run_before_graph, run_before_task, run_before_node), we start a new span.
+    2. On any of the post-execution hooks, we exit the span, recording the error on it if one occurred.
+
+    Spans nest naturally (graph > task > node), so the resulting trace mirrors the structure of the dataflow.
+    Where the spans go is determined by the OpenTelemetry tracer provider/span processor you configure
+    before execution (e.g. via `Traceloop.init()` or the OpenTelemetry SDK).
+    """
+
+    def __init__(self, tracer_name: Optional[str] = None, tracer: Optional[trace.Tracer] = None):
+        if tracer_name and tracer:
+            raise ValueError(
+                f"Only pass in one of tracer_name or tracer, not both, got: tracer_name={tracer_name} and tracer={tracer}"
+            )
+
+        if tracer:
+            self.tracer = tracer
+        elif tracer_name:
+            self.tracer = trace.get_tracer(tracer_name)
+        else:
+            self.tracer = trace.get_tracer(__name__)
+
+        self.graph = None
+
+    def run_before_graph_execution(
+        self,
+        *,
+        graph: HamiltonGraph,
+        final_vars: List[str],
+        inputs: dict,
+        overrides: dict,
+        execution_path: Collection[str],
+        run_id: str,
+        **kwargs,
+    ):
+        # stored so the node hook can look up per-node metadata (e.g. versions)
+        self.graph = graph
+
+        attributes = {
+            "graph_version": graph.version,
+            "final_vars": final_vars,
+            "inputs": list(inputs.keys()) if inputs else [],
+            "overrides": list(overrides.keys()) if overrides else [],
+            "execution_path": list(execution_path),
+        }
+
+        graph_span = _enter_span(run_id, self.tracer)
+        graph_span.set_attributes(attributes)
+
+    def run_before_node_execution(
+        self,
+        *,
+        node_name: str,
+        node_tags: dict,
+        node_return_type: type,
+        **kwargs: Any,
+    ):
+        attributes = {
+            "type": str(node_return_type),
+            "node_version": self.graph[node_name].version,
+            "tags": json.dumps(node_tags),
+        }
+
+        node_span = _enter_span(node_name, self.tracer)
+        node_span.set_attributes(attributes)
+
+    def run_before_task_execution(
+        self,
+        *,
+        task_id: str,
+        nodes: List[HamiltonNode],
+        inputs: Dict[str, Any],
+        overrides: Dict[str, Any],
+        **kwargs,
+    ):
+        attributes = {
+            "nodes": [n.name for n in nodes],
+            "inputs": list(inputs.keys()) if inputs else [],
+            "overrides": list(overrides.keys()) if overrides else [],
+        }
+        task_span = _enter_span(task_id, self.tracer)
+        task_span.set_attributes(attributes)
+
+    def run_after_task_execution(self, *, error: Optional[Exception], **kwargs):
+        _exit_span(error)
+
+    def run_after_node_execution(self, *, error: Optional[Exception], **kwargs):
+        _exit_span(error)
+
+    def run_after_graph_execution(self, *, error: Optional[Exception], **kwargs):
+        _exit_span(error)
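+
+
+# Example usage, for illustration only (`my_dataflow` is a placeholder for a module
+# defining Hamilton functions; configure an OpenTelemetry exporter/provider separately
+# before execution):
+#
+#   from hamilton import driver
+#   from hamilton.plugins import h_opentelemetry
+#
+#   import my_dataflow
+#
+#   dr = (
+#       driver.Builder()
+#       .with_modules(my_dataflow)
+#       .with_adapters(h_opentelemetry.OpenTelemetryTracer())
+#       .build()
+#   )
+#   dr.execute(["some_output"])  # emits nested spans: run_id > (task) > node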