Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(tracing): resolve OpenTelemetry warnings and token type error #3889

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/promptflow-tracing/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# promptflow-tracing package

## v1.16.4 (2024.19.14)
- Fix Open Telemetry warnings
- Fix token count issue

## v1.16.3 (2024.12.14)

- Fix token count issue when the value is None or it is a Dict
Expand Down
136 changes: 86 additions & 50 deletions src/promptflow-tracing/promptflow/tracing/_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,56 +109,92 @@ def start_as_current_span(


class TokenCollector:
_lock = Lock()

def __init__(self):
self._span_id_to_tokens = {}

def collect_openai_tokens(self, span, output):
span_id = span.get_span_context().span_id
if not inspect.isgenerator(output) and hasattr(output, "usage") and output.usage is not None:
tokens = output.usage.dict()
if tokens:
with self._lock:
self._span_id_to_tokens[span_id] = tokens

def collect_openai_tokens_for_streaming(self, span, inputs, output, is_chat):
span_id = span.get_span_context().span_id
calculator = OpenAIMetricsCalculator()
if is_chat:
tokens = calculator.get_openai_metrics_for_chat_api(inputs, output)
else:
tokens = calculator.get_openai_metrics_for_completion_api(inputs, output)
with self._lock:
self._span_id_to_tokens[span_id] = tokens

def collect_openai_tokens_for_parent_span(self, span):
tokens = self.try_get_openai_tokens(span.get_span_context().span_id)
if tokens:
if not hasattr(span, "parent") or span.parent is None:
return
parent_span_id = span.parent.span_id
with self._lock:
if parent_span_id in self._span_id_to_tokens:
merged_tokens = {}
for key in set(self._span_id_to_tokens[parent_span_id]) | set(tokens):
parent_value = self._span_id_to_tokens[parent_span_id].get(key, 0)
token_value = tokens.get(key, 0)
if isinstance(parent_value, dict) and isinstance(token_value, dict):
# Handle the case where both values are dictionaries
merged_tokens[key] = {**parent_value, **token_value}
elif isinstance(parent_value, dict) or isinstance(token_value, dict):
# Handle the case where one value is a dictionary and the other is not
merged_tokens[key] = parent_value if isinstance(parent_value, dict) else token_value
else:
merged_tokens[key] = int(parent_value or 0) + int(token_value or 0)
self._span_id_to_tokens[parent_span_id] = merged_tokens
else:
self._span_id_to_tokens[parent_span_id] = tokens

def try_get_openai_tokens(self, span_id):
with self._lock:
return self._span_id_to_tokens.get(span_id, None)
_lock = Lock()

def __init__(self):
self._span_id_to_tokens = {}

def collect_openai_tokens(self, span, output):
span_id = span.get_span_context().span_id
if not inspect.isgenerator(output) and hasattr(output, "usage") and output.usage is not None:
try:
tokens = output.usage.dict()
if tokens:
safe_tokens = {
k: float(v) if isinstance(v, (int, float)) else 0
for k, v in tokens.items()
}
with self._lock:
self._span_id_to_tokens[span_id] = safe_tokens
except:
logging.warning("Failed to process token usage")

def collect_openai_tokens_for_streaming(self, span, inputs, output, is_chat):
span_id = span.get_span_context().span_id
calculator = OpenAIMetricsCalculator()
try:
if is_chat:
tokens = calculator.get_openai_metrics_for_chat_api(inputs, output)
else:
tokens = calculator.get_openai_metrics_for_completion_api(inputs, output)
safe_tokens = {
k: float(v) if isinstance(v, (int, float)) else 0
for k, v in tokens.items()
}
with self._lock:
self._span_id_to_tokens[span_id] = safe_tokens
except Exception as e:
logging.warning(f"Failed to collect streaming tokens: {e}")

def collect_openai_tokens_for_parent_span(self, span):
def safe_token_value(value):
if value is None:
return 0
if isinstance(value, dict):
try:
return sum(safe_token_value(v) for v in value.values())
except:
return 0
if isinstance(value, (int, float)):
return float(value)
try:
return float(value)
except:
return 0

tokens = self.try_get_openai_tokens(span.get_span_context().span_id)
if not tokens:
return

if not hasattr(span, "parent") or span.parent is None:
return

parent_span_id = span.parent.span_id
with self._lock:
if parent_span_id in self._span_id_to_tokens:
current_tokens = self._span_id_to_tokens[parent_span_id]
merged_tokens = {
key: safe_token_value(current_tokens.get(key, 0)) + safe_token_value(tokens.get(key, 0))
for key in set(current_tokens.keys()) | set(tokens.keys())
}
for field in ['completion_tokens', 'prompt_tokens', 'total_tokens']:
if field not in merged_tokens:
merged_tokens[field] = 0.0
else:
merged_tokens[field] = float(merged_tokens[field])
self._span_id_to_tokens[parent_span_id] = merged_tokens
else:
safe_tokens = {k: safe_token_value(v) for k, v in tokens.items()}
for field in ['completion_tokens', 'prompt_tokens', 'total_tokens']:
if field not in safe_tokens:
safe_tokens[field] = 0.0
else:
safe_tokens[field] = float(safe_tokens[field])
self._span_id_to_tokens[parent_span_id] = safe_tokens

def try_get_openai_tokens(self, span_id):
with self._lock:
return self._span_id_to_tokens.get(span_id, None)


token_collector = TokenCollector()
Expand Down
Loading