-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
116 lines (95 loc) · 3.94 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
"""
Hello world server to test observability features with OpenTelemetry
and Jaeger.
Some quick start guides:
https://uptrace.dev/opentelemetry/distributed-tracing.html
https://opentelemetry.io/docs/concepts/observability-primer/
"""
from fastapi import FastAPI
from opentelemetry import trace
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
import uvicorn
import time
import json
import subprocess
import os
# Create the FastAPI application instance that the route below is registered on.
app: FastAPI = FastAPI()
# Attach the OpenTelemetry instrumentation wrapper to the application.
# Instrumentation libraries add OpenTelemetry support automatically, avoiding
# manual changes to the application code.
# Some examples: https://github.com/open-telemetry/opentelemetry-python/tree/main/docs/examples
#
# Instrumentation package for FastAPI:
# https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/fastapi/fastapi.html
FastAPIInstrumentor.instrument_app(app=app)
# Module-level tracer, requested under the name "FastAPI"; the
# `start_as_current_span` decorators in this file are created from it.
tracer = trace.get_tracer("FastAPI")
# Working directory captured once at import time; it is used as the `cwd` of
# the child batch process and printed when the module is run as a script.
current_path = os.getcwd()
@tracer.start_as_current_span(name="custom_action")
def custom_action():
    """Mimic a slow auxiliary step, wrapped in the "custom_action" span.

    Blocks the calling thread for two seconds and then returns a fixed
    completion message.
    """
    delay_seconds = 2
    time.sleep(delay_seconds)
    outcome = "Custom Action Finished"
    return outcome
@tracer.start_as_current_span(name="custom_action_2")
def custom_action_2():
    """Mimic a second, slower auxiliary step in the "custom_action_2" span.

    Blocks the calling thread for five seconds and then returns a fixed
    completion message.
    """
    delay_seconds = 5
    time.sleep(delay_seconds)
    outcome = "Custom Action v2 Finished"
    return outcome
@tracer.start_as_current_span(name="store_current_context")
def store_span_context(carrier: dict, filename: str):
    """Write the propagation carrier to ``filename`` as indented JSON.

    The call runs inside a span named "store_current_context".

    Args:
        carrier: Mapping to serialize (the caller passes the dict filled in
            by the trace-context propagator).
        filename: Path of the file to create or overwrite (UTF-8 text).
    """
    # Mode "w" truncates any previous content, so each call replaces the file.
    with open(filename, "w", encoding="utf-8") as context_file:
        json.dump(carrier, context_file, indent=4)
# Some test endpoints
# Decorator order matters: `app.get` registers whatever callable it receives,
# so the span decorator has to be applied first (closest to `def`) for the
# registered route handler to actually run inside the "parent_endpoint" span.
@app.get(path="/")
@tracer.start_as_current_span(name="parent_endpoint")
def hello():
    """Annotate the current span, persist its context and run the batch job.

    Returns:
        A dict with the messages returned by the two auxiliary actions under
        the keys ``action_1`` and ``action_2``.
    """
    # Retrieve the current span and add some extra info.
    # Events - They can be seen as logs for the current span
    # Attributes - Custom metadata for the span
    # https://opentelemetry.io/docs/concepts/signals/traces/#attributes
    span = trace.get_current_span()
    span.set_attribute("action", "Custom Action Override")
    span.add_event("custom-event", {"msg": "Custom Event Message"})

    # There are several ways to achieve distributed tracing. Its purpose is to
    # collect runtime execution context from several microservices so that
    # developers and operators can follow a workflow's execution over time.
    #
    # Here the context is persisted manually using the W3C Trace Context
    # format. OpenTelemetry also provides instrumentation libraries that
    # enable distributed tracing without manual configuration:
    #
    # https://opentelemetry.io/docs/concepts/signals/traces/#context-propagation
    # https://opentelemetry.io/docs/instrumentation/python/automatic/
    #
    # Those automatic connectors generally share the parent span context via
    # request metadata, for example HTTP headers. This demo instead stores the
    # parent span's tracing information in a JSON file; the second process of
    # the trace is a batch process written in Python.
    carrier = {}
    TraceContextTextMapPropagator().inject(carrier=carrier)
    store_span_context(carrier=carrier, filename="test.json")

    # The following dummy functions mimic auxiliary work done by the endpoint.
    c_a_v1 = custom_action()
    c_a_v2 = custom_action_2()

    # Launch the batch process. NOTE: `subprocess.call` blocks until the
    # script exits, so this is not truly fire-and-forget; switch to
    # `subprocess.Popen` if the response must not wait for the child.
    command: list[str] = ["/bin/bash", "./start-child.sh"]
    subprocess.call(
        args=command,
        cwd=current_path,
        # The output is never read here. A PIPE that nobody drains makes the
        # child block once the OS pipe buffer fills up, hanging this request,
        # so the output is discarded explicitly instead.
        stdout=subprocess.DEVNULL,
        stderr=subprocess.STDOUT,
    )

    # HTTP response
    return {
        "action_1": c_a_v1,
        "action_2": c_a_v2,
    }
if __name__ == "__main__":
    # Show the directory captured at import time before the server starts.
    startup_message = f"Current working path: {current_path}"
    print(startup_message)
    # Serve the application with uvicorn on localhost:9000.
    uvicorn.run(app=app, host="localhost", port=9000)