Skip to content

Commit

Permalink
Merge branch 'main' into wantsui/snowflake
Browse files Browse the repository at this point in the history
  • Loading branch information
wantsui authored Jan 10, 2025
2 parents 392b1c6 + a58f139 commit c878159
Show file tree
Hide file tree
Showing 34 changed files with 1,177 additions and 415 deletions.
52 changes: 48 additions & 4 deletions ddtrace/appsec/_iast/_ast/ast_patching.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from sys import version_info
import textwrap
from types import ModuleType
from typing import Iterable
from typing import Optional
from typing import Text
from typing import Tuple
Expand Down Expand Up @@ -327,6 +328,49 @@
log = get_logger(__name__)


class _TrieNode:
__slots__ = ("children", "is_end")

def __init__(self):
self.children = {}
self.is_end = False

def __iter__(self):
if self.is_end:
yield ("", None)
else:
for k, v in self.children.items():
yield (k, dict(v))


def build_trie(words: Iterable[str]) -> _TrieNode:
root = _TrieNode()
for word in words:
node = root
for char in word:
if char not in node.children:
node.children[char] = _TrieNode()
node = node.children[char]
node.is_end = True
return root


_TRIE_ALLOWLIST = build_trie(IAST_ALLOWLIST)
_TRIE_DENYLIST = build_trie(IAST_DENYLIST)


def _trie_has_prefix_for(trie: _TrieNode, string: str) -> bool:
node = trie
for char in string:
node = node.children.get(char)
if not node:
return False

if node.is_end:
return True
return node.is_end


def get_encoding(module_path: Text) -> Text:
"""
First tries to detect the encoding for the file,
Expand All @@ -341,11 +385,11 @@ def get_encoding(module_path: Text) -> Text:
return ENCODING


_NOT_PATCH_MODULE_NAMES = _stdlib_for_python_version() | set(builtin_module_names)
_NOT_PATCH_MODULE_NAMES = {i.lower() for i in _stdlib_for_python_version() | set(builtin_module_names)}


def _in_python_stdlib(module_name: str) -> bool:
return module_name.split(".")[0].lower() in [x.lower() for x in _NOT_PATCH_MODULE_NAMES]
return module_name.split(".")[0].lower() in _NOT_PATCH_MODULE_NAMES


def _should_iast_patch(module_name: Text) -> bool:
Expand All @@ -359,10 +403,10 @@ def _should_iast_patch(module_name: Text) -> bool:
# diff = max_allow - max_deny
# return diff > 0 or (diff == 0 and not _in_python_stdlib_or_third_party(module_name))
dotted_module_name = module_name.lower() + "."
if dotted_module_name.startswith(IAST_ALLOWLIST):
if _trie_has_prefix_for(_TRIE_ALLOWLIST, dotted_module_name):
log.debug("IAST: allowing %s. it's in the IAST_ALLOWLIST", module_name)
return True
if dotted_module_name.startswith(IAST_DENYLIST):
if _trie_has_prefix_for(_TRIE_DENYLIST, dotted_module_name):
log.debug("IAST: denying %s. it's in the IAST_DENYLIST", module_name)
return False
if _in_python_stdlib(module_name):
Expand Down
2 changes: 1 addition & 1 deletion ddtrace/appsec/_python_info/stdlib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@
from .module_names_py312 import STDLIB_MODULE_NAMES


def _stdlib_for_python_version(): # type: () -> set
def _stdlib_for_python_version(): # type: () -> set[str]
return STDLIB_MODULE_NAMES
9 changes: 5 additions & 4 deletions ddtrace/debugging/_safety.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from inspect import CO_VARARGS
from inspect import CO_VARKEYWORDS
from itertools import chain
from types import FrameType
from typing import Any
from typing import Dict
Expand All @@ -23,11 +24,11 @@ def get_args(frame: FrameType) -> Iterator[Tuple[str, Any]]:

def get_locals(frame: FrameType) -> Iterator[Tuple[str, Any]]:
code = frame.f_code
_locals = frame.f_locals
nargs = code.co_argcount + bool(code.co_flags & CO_VARARGS) + bool(code.co_flags & CO_VARKEYWORDS)
names = code.co_varnames[nargs:]
values = (frame.f_locals.get(name) for name in names)

return zip(names, values)
return (
(name, _locals.get(name)) for name in chain(code.co_varnames[nargs:], code.co_freevars, code.co_cellvars)
) # include freevars and cellvars


def get_globals(frame: FrameType) -> Iterator[Tuple[str, Any]]:
Expand Down
213 changes: 213 additions & 0 deletions ddtrace/llmobs/_evaluators/ragas/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
import traceback
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union

from ddtrace.internal.logger import get_logger
from ddtrace.internal.telemetry import telemetry_writer
from ddtrace.internal.telemetry.constants import TELEMETRY_APM_PRODUCT
from ddtrace.internal.telemetry.constants import TELEMETRY_LOG_LEVEL
from ddtrace.internal.utils.version import parse_version
from ddtrace.llmobs._constants import INTERNAL_CONTEXT_VARIABLE_KEYS
from ddtrace.llmobs._constants import INTERNAL_QUERY_VARIABLE_KEYS
from ddtrace.llmobs._constants import RAGAS_ML_APP_PREFIX


logger = get_logger(__name__)


class RagasDependencies:
"""
A helper class to store instances of ragas classes and functions
that may or may not exist in a user's environment.
"""

def __init__(self):
import ragas

self.ragas_version = parse_version(ragas.__version__)
if self.ragas_version >= (0, 2, 0) or self.ragas_version < (0, 1, 10):
raise NotImplementedError(
"Ragas version: {} is not supported".format(self.ragas_version),
)

from ragas.llms import llm_factory

self.llm_factory = llm_factory

from ragas.llms.output_parser import RagasoutputParser

self.RagasoutputParser = RagasoutputParser

from ragas.metrics import context_precision

self.context_precision = context_precision

from ragas.metrics.base import ensembler

self.ensembler = ensembler

from ragas.metrics import faithfulness

self.faithfulness = faithfulness

from ragas.metrics.base import get_segmenter

self.get_segmenter = get_segmenter

from ddtrace.llmobs._evaluators.ragas.models import StatementFaithfulnessAnswers

self.StatementFaithfulnessAnswers = StatementFaithfulnessAnswers

from ddtrace.llmobs._evaluators.ragas.models import StatementsAnswers

self.StatementsAnswers = StatementsAnswers


def _get_ml_app_for_ragas_trace(span_event: dict) -> str:
"""
The `ml_app` spans generated from traces of ragas will be named as `dd-ragas-<ml_app>`
or `dd-ragas` if `ml_app` is not present in the span event.
"""
tags: List[str] = span_event.get("tags", [])
ml_app = None
for tag in tags:
if isinstance(tag, str) and tag.startswith("ml_app:"):
ml_app = tag.split(":")[1]
break
if not ml_app:
return RAGAS_ML_APP_PREFIX
return "{}-{}".format(RAGAS_ML_APP_PREFIX, ml_app)


class BaseRagasEvaluator:
"""A class used by EvaluatorRunner to conduct ragas evaluations
on LLM Observability span events. The job of an Evaluator is to take a span and
submit evaluation metrics based on the span's attributes.
Extenders of this class should only need to implement the `evaluate` method.
"""

LABEL = "ragas"
METRIC_TYPE = "score"

def __init__(self, llmobs_service):
"""
Initialize an evaluator that uses the ragas library to generate a score on finished LLM spans.
:param llmobs_service: An instance of the LLM Observability service used for tracing the evaluation and
submitting evaluation metrics.
Raises: NotImplementedError if the ragas library is not found or if ragas version is not supported.
"""
self.llmobs_service = llmobs_service
self.ragas_version = "unknown"
telemetry_state = "ok"
try:
self.ragas_dependencies = RagasDependencies()
self.ragas_version = self.ragas_dependencies.ragas_version
except ImportError as e:
telemetry_state = "fail_import_error"
raise NotImplementedError("Failed to load dependencies for `{}` evaluator".format(self.LABEL)) from e
except AttributeError as e:
telemetry_state = "fail_attribute_error"
raise NotImplementedError("Failed to load dependencies for `{}` evaluator".format(self.LABEL)) from e
except NotImplementedError as e:
telemetry_state = "fail_not_supported"
raise NotImplementedError("Failed to load dependencies for `{}` evaluator".format(self.LABEL)) from e
except Exception as e:
telemetry_state = "fail_unknown"
raise NotImplementedError("Failed to load dependencies for `{}` evaluator".format(self.LABEL)) from e
finally:
telemetry_writer.add_count_metric(
namespace=TELEMETRY_APM_PRODUCT.LLMOBS,
name="evaluators.init",
value=1,
tags=(
("evaluator_label", self.LABEL),
("state", telemetry_state),
("evaluator_version", self.ragas_version),
),
)
if telemetry_state != "ok":
telemetry_writer.add_log(
level=TELEMETRY_LOG_LEVEL.ERROR,
message="Failed to import Ragas dependencies",
stack_trace=traceback.format_exc(),
tags={"evaluator_version": self.ragas_version},
)

def run_and_submit_evaluation(self, span_event: dict):
if not span_event:
return
score_result_or_failure, metric_metadata = self.evaluate(span_event)
telemetry_writer.add_count_metric(
TELEMETRY_APM_PRODUCT.LLMOBS,
"evaluators.run",
1,
tags=(
("evaluator_label", self.LABEL),
("state", score_result_or_failure if isinstance(score_result_or_failure, str) else "success"),
("evaluator_version", self.ragas_version),
),
)
if isinstance(score_result_or_failure, float):
self.llmobs_service.submit_evaluation(
span_context={"trace_id": span_event.get("trace_id"), "span_id": span_event.get("span_id")},
label=self.LABEL,
metric_type=self.METRIC_TYPE,
value=score_result_or_failure,
metadata=metric_metadata,
)

def evaluate(self, span_event: dict) -> Tuple[Union[float, str], Optional[dict]]:
raise NotImplementedError("evaluate method must be implemented by individual evaluators")

def _extract_evaluation_inputs_from_span(self, span_event: dict) -> Optional[dict]:
"""
Extracts the question, answer, and context used as inputs for a ragas evaluation on a span event.
"""
with self.llmobs_service.workflow("dd-ragas.extract_evaluation_inputs_from_span") as extract_inputs_workflow:
self.llmobs_service.annotate(span=extract_inputs_workflow, input_data=span_event)
question, answer, contexts = None, None, None

meta_io = span_event.get("meta")
if meta_io is None:
return None

meta_input = meta_io.get("input")
meta_output = meta_io.get("output")

if not (meta_input and meta_output):
return None

prompt = meta_input.get("prompt")
if prompt is None:
logger.debug("Failed to extract `prompt` from span for ragas evaluation")
return None
prompt_variables = prompt.get("variables")

input_messages = meta_input.get("messages")

messages = meta_output.get("messages")
if messages is not None and len(messages) > 0:
answer = messages[-1].get("content")

if prompt_variables:
context_keys = prompt.get(INTERNAL_CONTEXT_VARIABLE_KEYS, ["context"])
question_keys = prompt.get(INTERNAL_QUERY_VARIABLE_KEYS, ["question"])
contexts = [prompt_variables.get(key) for key in context_keys if prompt_variables.get(key)]
question = " ".join([prompt_variables.get(key) for key in question_keys if prompt_variables.get(key)])

if not question and input_messages is not None and len(input_messages) > 0:
question = input_messages[-1].get("content")

self.llmobs_service.annotate(
span=extract_inputs_workflow, output_data={"question": question, "contexts": contexts, "answer": answer}
)
if any(field is None for field in (question, contexts, answer)):
logger.debug("Failed to extract inputs required for ragas evaluation")
return None

return {"question": question, "contexts": contexts, "answer": answer}
Loading

0 comments on commit c878159

Please sign in to comment.