Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add valkey instrumentation support #12003

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/COMMIT_TEMPLATE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ feat/fix/docs/refactor/ci(xxx): commit title here
# mysqlpython, openai, opentelemetry, opentracer, profile, psycopg, pylibmc, pymemcache,
# pymongo, pymysql, pynamodb, pyodbc, pyramid, pytest, redis, rediscluster, requests, rq,
# sanic, snowflake, sourcecode, sqlalchemy, starlette, stdlib, structlog, subprocess,
# telemetry, test_logging, tornado, tracer, unittest, urllib3, vendor, vertica, wsgi,
# telemetry, test_logging, tornado, tracer, unittest, urllib3, valkey, vendor, vertica, wsgi,
# yaaredis
26 changes: 26 additions & 0 deletions .riot/requirements/11ac941.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#
# This file is autogenerated by pip-compile with Python 3.8
# by the following command:
#
# pip-compile --allow-unsafe --no-annotate .riot/requirements/11ac941.in
#
async-timeout==5.0.1
attrs==24.3.0
coverage[toml]==7.6.1
exceptiongroup==1.2.2
hypothesis==6.45.0
importlib-metadata==8.5.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==24.2
pluggy==1.5.0
pytest==8.3.4
pytest-asyncio==0.23.7
pytest-cov==5.0.0
pytest-mock==3.14.0
pytest-randomly==3.15.0
sortedcontainers==2.4.0
tomli==2.2.1
valkey==6.0.2
zipp==3.20.2
26 changes: 26 additions & 0 deletions .riot/requirements/1e98e9b.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#
# This file is autogenerated by pip-compile with Python 3.9
# by the following command:
#
# pip-compile --allow-unsafe --no-annotate .riot/requirements/1e98e9b.in
#
async-timeout==5.0.1
attrs==24.3.0
coverage[toml]==7.6.10
exceptiongroup==1.2.2
hypothesis==6.45.0
importlib-metadata==8.5.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==24.2
pluggy==1.5.0
pytest==8.3.4
pytest-asyncio==0.23.7
pytest-cov==6.0.0
pytest-mock==3.14.0
pytest-randomly==3.16.0
sortedcontainers==2.4.0
tomli==2.2.1
valkey==6.0.2
zipp==3.21.0
22 changes: 22 additions & 0 deletions .riot/requirements/4aa2a2a.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#
# This file is autogenerated by pip-compile with Python 3.11
# by the following command:
#
# pip-compile --allow-unsafe --no-annotate .riot/requirements/4aa2a2a.in
#
async-timeout==5.0.1
attrs==24.3.0
coverage[toml]==7.6.10
hypothesis==6.45.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==24.2
pluggy==1.5.0
pytest==8.3.4
pytest-asyncio==0.23.7
pytest-cov==6.0.0
pytest-mock==3.14.0
pytest-randomly==3.16.0
sortedcontainers==2.4.0
valkey==6.0.2
21 changes: 21 additions & 0 deletions .riot/requirements/7219cf4.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#
# This file is autogenerated by pip-compile with Python 3.13
# by the following command:
#
# pip-compile --allow-unsafe --no-annotate .riot/requirements/7219cf4.in
#
attrs==24.3.0
coverage[toml]==7.6.10
hypothesis==6.45.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==24.2
pluggy==1.5.0
pytest==8.3.4
pytest-asyncio==0.23.7
pytest-cov==6.0.0
pytest-mock==3.14.0
pytest-randomly==3.16.0
sortedcontainers==2.4.0
valkey==6.0.2
21 changes: 21 additions & 0 deletions .riot/requirements/b96b665.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#
# This file is autogenerated by pip-compile with Python 3.12
# by the following command:
#
# pip-compile --allow-unsafe --no-annotate .riot/requirements/b96b665.in
#
attrs==24.3.0
coverage[toml]==7.6.10
hypothesis==6.45.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==24.2
pluggy==1.5.0
pytest==8.3.4
pytest-asyncio==0.23.7
pytest-cov==6.0.0
pytest-mock==3.14.0
pytest-randomly==3.16.0
sortedcontainers==2.4.0
valkey==6.0.2
24 changes: 24 additions & 0 deletions .riot/requirements/dd68acc.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#
# This file is autogenerated by pip-compile with Python 3.10
# by the following command:
#
# pip-compile --allow-unsafe --no-annotate .riot/requirements/dd68acc.in
#
async-timeout==5.0.1
attrs==24.3.0
coverage[toml]==7.6.10
exceptiongroup==1.2.2
hypothesis==6.45.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==24.2
pluggy==1.5.0
pytest==8.3.4
pytest-asyncio==0.23.7
pytest-cov==6.0.0
pytest-mock==3.14.0
pytest-randomly==3.16.0
sortedcontainers==2.4.0
tomli==2.2.1
valkey==6.0.2
1 change: 1 addition & 0 deletions ddtrace/_monkey.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
"unittest": True,
"coverage": False,
"selenium": True,
"valkey": True,
}


Expand Down
8 changes: 8 additions & 0 deletions ddtrace/_trace/trace_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,11 @@ def _on_redis_command_post(ctx: core.ExecutionContext, rowcount):
ctx.span.set_metric(db.ROWCOUNT, rowcount)


def _on_valkey_command_post(ctx: core.ExecutionContext, rowcount):
if rowcount is not None:
ctx.span.set_metric(db.ROWCOUNT, rowcount)


def _on_test_visibility_enable(config) -> None:
from ddtrace.internal.ci_visibility import CIVisibility

Expand Down Expand Up @@ -758,6 +763,8 @@ def listen():
core.on("botocore.kinesis.GetRecords.post", _on_botocore_kinesis_getrecords_post)
core.on("redis.async_command.post", _on_redis_command_post)
core.on("redis.command.post", _on_redis_command_post)
core.on("valkey.async_command.post", _on_valkey_command_post)
core.on("valkey.command.post", _on_valkey_command_post)
core.on("azure.functions.request_call_modifier", _on_azure_functions_request_span_modifier)
core.on("azure.functions.start_response", _on_azure_functions_start_response)

Expand Down Expand Up @@ -786,6 +793,7 @@ def listen():
"botocore.patched_stepfunctions_api_call",
"botocore.patched_bedrock_api_call",
"redis.command",
"valkey.command",
"rq.queue.enqueue_job",
"rq.traced_queue_fetch_job",
"rq.worker.perform_job",
Expand Down
96 changes: 96 additions & 0 deletions ddtrace/_trace/utils_valkey.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
"""
Some utils used by the dogtrace valkey integration
"""

from contextlib import contextmanager
from typing import List
from typing import Optional

from ddtrace.constants import _ANALYTICS_SAMPLE_RATE_KEY
from ddtrace.constants import SPAN_KIND
from ddtrace.constants import SPAN_MEASURED_KEY
from ddtrace.contrib import trace_utils
from ddtrace.contrib.valkey_utils import _extract_conn_tags
from ddtrace.ext import SpanKind
from ddtrace.ext import SpanTypes
from ddtrace.ext import db
from ddtrace.ext import valkey as valkeyx
from ddtrace.internal import core
from ddtrace.internal.constants import COMPONENT
from ddtrace.internal.schema import schematize_cache_operation
from ddtrace.internal.utils.formats import stringify_cache_args


format_command_args = stringify_cache_args


def _set_span_tags(
span, pin, config_integration, args: Optional[List], instance, query: Optional[List], is_cluster: bool = False
):
span.set_tag_str(SPAN_KIND, SpanKind.CLIENT)
span.set_tag_str(COMPONENT, config_integration.integration_name)
span.set_tag_str(db.SYSTEM, valkeyx.APP)
span.set_tag(SPAN_MEASURED_KEY)
if query is not None:
span_name = schematize_cache_operation(valkeyx.RAWCMD, cache_provider=valkeyx.APP) # type: ignore[operator]
span.set_tag_str(span_name, query)
if pin.tags:
span.set_tags(pin.tags)
# some valkey clients do not have a connection_pool attribute (ex. aiovalkey v1.3)
if not is_cluster and hasattr(instance, "connection_pool"):
span.set_tags(_extract_conn_tags(instance.connection_pool.connection_kwargs))
if args is not None:
span.set_metric(valkeyx.ARGS_LEN, len(args))
else:
for attr in ("command_stack", "_command_stack"):
if hasattr(instance, attr):
span.set_metric(valkeyx.PIPELINE_LEN, len(getattr(instance, attr)))
# set analytics sample rate if enabled
span.set_tag(_ANALYTICS_SAMPLE_RATE_KEY, config_integration.get_analytics_sample_rate())


@contextmanager
def _instrument_valkey_cmd(pin, config_integration, instance, args):
query = stringify_cache_args(args, cmd_max_len=config_integration.cmd_max_length)
with core.context_with_data(
"valkey.command",
span_name=schematize_cache_operation(valkeyx.CMD, cache_provider=valkeyx.APP),
pin=pin,
service=trace_utils.ext_service(pin, config_integration),
span_type=SpanTypes.VALKEY,
resource=query.split(" ")[0] if config_integration.resource_only_command else query,
) as ctx, ctx.span as span:
_set_span_tags(span, pin, config_integration, args, instance, query)
yield ctx


@contextmanager
def _instrument_valkey_execute_pipeline(pin, config_integration, cmds, instance, is_cluster=False):
cmd_string = resource = "\n".join(cmds)
if config_integration.resource_only_command:
resource = "\n".join([cmd.split(" ")[0] for cmd in cmds])

with pin.tracer.trace(
schematize_cache_operation(valkeyx.CMD, cache_provider=valkeyx.APP),
resource=resource,
service=trace_utils.ext_service(pin, config_integration),
span_type=SpanTypes.VALKEY,
) as span:
_set_span_tags(span, pin, config_integration, None, instance, cmd_string)
yield span


@contextmanager
def _instrument_valkey_execute_async_cluster_pipeline(pin, config_integration, cmds, instance):
cmd_string = resource = "\n".join(cmds)
if config_integration.resource_only_command:
resource = "\n".join([cmd.split(" ")[0] for cmd in cmds])

with pin.tracer.trace(
schematize_cache_operation(valkeyx.CMD, cache_provider=valkeyx.APP),
resource=resource,
service=trace_utils.ext_service(pin, config_integration),
span_type=SpanTypes.VALKEY,
) as span:
_set_span_tags(span, pin, config_integration, None, instance, cmd_string)
yield span
36 changes: 36 additions & 0 deletions ddtrace/contrib/internal/valkey/asyncio_patch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from ddtrace import config
from ddtrace._trace.utils_valkey import _instrument_valkey_cmd
from ddtrace._trace.utils_valkey import _instrument_valkey_execute_async_cluster_pipeline
from ddtrace._trace.utils_valkey import _instrument_valkey_execute_pipeline
from ddtrace.contrib.valkey_utils import _run_valkey_command_async
from ddtrace.internal.utils.formats import stringify_cache_args
from ddtrace.trace import Pin


async def instrumented_async_execute_command(func, instance, args, kwargs):
pin = Pin.get_from(instance)
if not pin or not pin.enabled():
return await func(*args, **kwargs)

with _instrument_valkey_cmd(pin, config.valkey, instance, args) as ctx:
return await _run_valkey_command_async(ctx=ctx, func=func, args=args, kwargs=kwargs)


async def instrumented_async_execute_pipeline(func, instance, args, kwargs):
pin = Pin.get_from(instance)
if not pin or not pin.enabled():
return await func(*args, **kwargs)

cmds = [stringify_cache_args(c, cmd_max_len=config.valkey.cmd_max_length) for c, _ in instance.command_stack]
with _instrument_valkey_execute_pipeline(pin, config.valkey, cmds, instance):
return await func(*args, **kwargs)


async def instrumented_async_execute_cluster_pipeline(func, instance, args, kwargs):
pin = Pin.get_from(instance)
if not pin or not pin.enabled():
return await func(*args, **kwargs)

cmds = [stringify_cache_args(c.args, cmd_max_len=config.valkey.cmd_max_length) for c in instance._command_stack]
with _instrument_valkey_execute_async_cluster_pipeline(pin, config.valkey, cmds, instance):
return await func(*args, **kwargs)
Loading
Loading