Skip to content

Commit

Permalink
fix(tracing): resolve OpenTelemetry token type warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Gaias Malagurti committed Dec 19, 2024
1 parent e8641e6 commit 1210f77
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 50 deletions.
4 changes: 4 additions & 0 deletions src/promptflow-tracing/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# promptflow-tracing package

## v1.16.4 (2024.19.14)
- Fix Open Telemetry warnings
- Fix token count issue

## v1.16.3 (2024.12.14)

- Fix token count issue when the value is None or it is a Dict
Expand Down
136 changes: 86 additions & 50 deletions src/promptflow-tracing/promptflow/tracing/_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,56 +109,92 @@ def start_as_current_span(


class TokenCollector:
_lock = Lock()

def __init__(self):
self._span_id_to_tokens = {}

def collect_openai_tokens(self, span, output):
span_id = span.get_span_context().span_id
if not inspect.isgenerator(output) and hasattr(output, "usage") and output.usage is not None:
tokens = output.usage.dict()
if tokens:
with self._lock:
self._span_id_to_tokens[span_id] = tokens

def collect_openai_tokens_for_streaming(self, span, inputs, output, is_chat):
span_id = span.get_span_context().span_id
calculator = OpenAIMetricsCalculator()
if is_chat:
tokens = calculator.get_openai_metrics_for_chat_api(inputs, output)
else:
tokens = calculator.get_openai_metrics_for_completion_api(inputs, output)
with self._lock:
self._span_id_to_tokens[span_id] = tokens

def collect_openai_tokens_for_parent_span(self, span):
tokens = self.try_get_openai_tokens(span.get_span_context().span_id)
if tokens:
if not hasattr(span, "parent") or span.parent is None:
return
parent_span_id = span.parent.span_id
with self._lock:
if parent_span_id in self._span_id_to_tokens:
merged_tokens = {}
for key in set(self._span_id_to_tokens[parent_span_id]) | set(tokens):
parent_value = self._span_id_to_tokens[parent_span_id].get(key, 0)
token_value = tokens.get(key, 0)
if isinstance(parent_value, dict) and isinstance(token_value, dict):
# Handle the case where both values are dictionaries
merged_tokens[key] = {**parent_value, **token_value}
elif isinstance(parent_value, dict) or isinstance(token_value, dict):
# Handle the case where one value is a dictionary and the other is not
merged_tokens[key] = parent_value if isinstance(parent_value, dict) else token_value
else:
merged_tokens[key] = int(parent_value or 0) + int(token_value or 0)
self._span_id_to_tokens[parent_span_id] = merged_tokens
else:
self._span_id_to_tokens[parent_span_id] = tokens

def try_get_openai_tokens(self, span_id):
with self._lock:
return self._span_id_to_tokens.get(span_id, None)
_lock = Lock()

def __init__(self):
self._span_id_to_tokens = {}

def collect_openai_tokens(self, span, output):
span_id = span.get_span_context().span_id
if not inspect.isgenerator(output) and hasattr(output, "usage") and output.usage is not None:
try:
tokens = output.usage.dict()
if tokens:
safe_tokens = {
k: float(v) if isinstance(v, (int, float)) else 0
for k, v in tokens.items()
}
with self._lock:
self._span_id_to_tokens[span_id] = safe_tokens
except:
logging.warning("Failed to process token usage")

def collect_openai_tokens_for_streaming(self, span, inputs, output, is_chat):
span_id = span.get_span_context().span_id
calculator = OpenAIMetricsCalculator()
try:
if is_chat:
tokens = calculator.get_openai_metrics_for_chat_api(inputs, output)
else:
tokens = calculator.get_openai_metrics_for_completion_api(inputs, output)
safe_tokens = {
k: float(v) if isinstance(v, (int, float)) else 0
for k, v in tokens.items()
}
with self._lock:
self._span_id_to_tokens[span_id] = safe_tokens
except Exception as e:
logging.warning(f"Failed to collect streaming tokens: {e}")

def collect_openai_tokens_for_parent_span(self, span):
def safe_token_value(value):
if value is None:
return 0
if isinstance(value, dict):
try:
return sum(safe_token_value(v) for v in value.values())
except:
return 0
if isinstance(value, (int, float)):
return float(value)
try:
return float(value)
except:
return 0

tokens = self.try_get_openai_tokens(span.get_span_context().span_id)
if not tokens:
return

if not hasattr(span, "parent") or span.parent is None:
return

parent_span_id = span.parent.span_id
with self._lock:
if parent_span_id in self._span_id_to_tokens:
current_tokens = self._span_id_to_tokens[parent_span_id]
merged_tokens = {
key: safe_token_value(current_tokens.get(key, 0)) + safe_token_value(tokens.get(key, 0))
for key in set(current_tokens.keys()) | set(tokens.keys())
}
for field in ['completion_tokens', 'prompt_tokens', 'total_tokens']:
if field not in merged_tokens:
merged_tokens[field] = 0.0
else:
merged_tokens[field] = float(merged_tokens[field])
self._span_id_to_tokens[parent_span_id] = merged_tokens
else:
safe_tokens = {k: safe_token_value(v) for k, v in tokens.items()}
for field in ['completion_tokens', 'prompt_tokens', 'total_tokens']:
if field not in safe_tokens:
safe_tokens[field] = 0.0
else:
safe_tokens[field] = float(safe_tokens[field])
self._span_id_to_tokens[parent_span_id] = safe_tokens

def try_get_openai_tokens(self, span_id):
with self._lock:
return self._span_id_to_tokens.get(span_id, None)


token_collector = TokenCollector()
Expand Down

0 comments on commit 1210f77

Please sign in to comment.