Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OpenTelemetry support #104

Merged
merged 8 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ export type {

export type {
TransportOptions,
TransportRequestMetadata,
TransportRequestParams,
TransportRequestOptions,
TransportRequestOptionsWithMeta,
Expand Down
5 changes: 4 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"node": ">=18"
},
"devDependencies": {
"@opentelemetry/sdk-trace-base": "^1.25.0",
"@sinonjs/fake-timers": "github:sinonjs/fake-timers#0bfffc1",
"@tapjs/clock": "^1.1.24",
"@types/debug": "^4.1.7",
Expand All @@ -58,6 +59,7 @@
"workq": "^3.0.0"
},
"dependencies": {
"@opentelemetry/api": "1.x",
"debug": "^4.3.4",
"hpagent": "^1.0.0",
"ms": "^2.1.3",
Expand All @@ -68,7 +70,8 @@
"tap": {
"allow-incomplete-coverage": true,
"plugin": [
"@tapjs/clock"
"@tapjs/clock",
"@tapjs/before"
]
}
}
84 changes: 78 additions & 6 deletions src/Transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,11 @@ import {
kNdjsonContentType,
kAcceptHeader,
kRedaction,
kRetryBackoff
kRetryBackoff,
kOtelTracer
} from './symbols'
import { setTimeout as setTimeoutPromise } from 'node:timers/promises'
import opentelemetry, { Attributes, Exception, SpanKind, SpanStatusCode, Span, Tracer } from '@opentelemetry/api'

const { version: clientVersion } = require('../package.json') // eslint-disable-line
const debug = Debug('elasticsearch')
Expand Down Expand Up @@ -119,12 +121,18 @@ export interface TransportOptions {
retryBackoff?: (min: number, max: number, attempt: number) => number
}

export interface TransportRequestMetadata {
name: string
pathParts?: Record<string, any>
}

export interface TransportRequestParams {
method: string
path: string
body?: RequestBody
bulkBody?: RequestNDBody
querystring?: Record<string, any> | string
meta?: TransportRequestMetadata
}

export interface TransportRequestOptions {
Expand Down Expand Up @@ -221,6 +229,7 @@ export default class Transport {
[kAcceptHeader]: string
[kRedaction]: RedactionOptions
[kRetryBackoff]: (min: number, max: number, attempt: number) => number
[kOtelTracer]: Tracer

static sniffReasons = {
SNIFF_ON_START: 'sniff-on-start',
Expand Down Expand Up @@ -283,6 +292,7 @@ export default class Transport {
this[kAcceptHeader] = opts.vendoredHeaders?.accept ?? 'application/json, text/plain'
this[kRedaction] = opts.redaction ?? { type: 'replace', additionalKeys: [] }
this[kRetryBackoff] = opts.retryBackoff ?? retryBackoff
this[kOtelTracer] = opentelemetry.trace.getTracer('@elastic/transport', clientVersion)

if (opts.sniffOnStart === true) {
this.sniff({
Expand Down Expand Up @@ -327,10 +337,10 @@ export default class Transport {
return this[kDiagnostic]
}

async request<TResponse = unknown> (params: TransportRequestParams, options?: TransportRequestOptionsWithOutMeta): Promise<TResponse>
async request<TResponse = unknown, TContext = any> (params: TransportRequestParams, options?: TransportRequestOptionsWithMeta): Promise<TransportResult<TResponse, TContext>>
async request<TResponse = unknown> (params: TransportRequestParams, options?: TransportRequestOptions): Promise<TResponse>
async request (params: TransportRequestParams, options: TransportRequestOptions = {}): Promise<any> {
private async _request<TResponse = unknown> (params: TransportRequestParams, options?: TransportRequestOptionsWithOutMeta, otelSpan?: Span): Promise<TResponse>
private async _request<TResponse = unknown, TContext = any> (params: TransportRequestParams, options?: TransportRequestOptionsWithMeta, otelSpan?: Span): Promise<TransportResult<TResponse, TContext>>
private async _request<TResponse = unknown> (params: TransportRequestParams, options?: TransportRequestOptions, otelSpan?: Span): Promise<TResponse>
private async _request (params: TransportRequestParams, options: TransportRequestOptions = {}, otelSpan?: Span): Promise<any> {
const connectionParams: ConnectionRequestParams = {
method: params.method,
path: params.path
Expand Down Expand Up @@ -370,7 +380,6 @@ export default class Transport {
if (this.headers?.warning == null) {
return null
}

const { warning } = this.headers
// if multiple HTTP headers have the same name, Undici represents them as an array
const warnings: string[] = Array.isArray(warning) ? warning : [warning]
Expand Down Expand Up @@ -489,6 +498,22 @@ export default class Transport {
throw new NoLivingConnectionsError('There are no living connections', result, errorOptions)
}

// generate required OpenTelemetry attributes from the request URL
const requestUrl = meta.connection.url
otelSpan?.setAttributes({
'url.full': requestUrl.toString(),
'server.address': requestUrl.hostname
})
if (requestUrl.port === '') {
if (requestUrl.protocol === 'https:') {
otelSpan?.setAttribute('server.port', 443)
} else if (requestUrl.protocol === 'http:') {
otelSpan?.setAttribute('server.port', 80)
}
} else if (requestUrl.port !== '9200') {
otelSpan?.setAttribute('server.port', parseInt(requestUrl.port, 10))
}

this[kDiagnostic].emit('request', null, result)

// perform the actual http request
Expand All @@ -505,6 +530,14 @@ export default class Transport {
result.statusCode = statusCode
result.headers = headers

if (headers['x-found-handling-cluster'] != null) {
otelSpan?.setAttribute('db.elasticsearch.cluster.name', headers['x-found-handling-cluster'])
}

if (headers['x-found-handling-instance'] != null) {
otelSpan?.setAttribute('db.elasticsearch.node.name', headers['x-found-handling-instance'])
}

if (this[kProductCheck] != null && headers['x-elastic-product'] !== this[kProductCheck] && statusCode >= 200 && statusCode < 300) {
/* eslint-disable @typescript-eslint/prefer-ts-expect-error */
// @ts-ignore
Expand Down Expand Up @@ -647,6 +680,45 @@ export default class Transport {
return returnMeta ? result : result.body
}

async request<TResponse = unknown> (params: TransportRequestParams, options?: TransportRequestOptionsWithOutMeta): Promise<TResponse>
async request<TResponse = unknown, TContext = any> (params: TransportRequestParams, options?: TransportRequestOptionsWithMeta): Promise<TransportResult<TResponse, TContext>>
async request<TResponse = unknown> (params: TransportRequestParams, options?: TransportRequestOptions): Promise<TResponse>
async request (params: TransportRequestParams, options: TransportRequestOptions = {}): Promise<any> {
// wrap in OpenTelemetry span
if (params.meta?.name != null) {
// gather OpenTelemetry attributes
const attributes: Attributes = {
'db.system': 'elasticsearch',
'http.request.method': params.method,
'db.operation.name': params.meta?.name
}
if (params.meta?.pathParts != null) {
for (const key of Object.keys(params.meta.pathParts)) {
attributes[`db.elasticsearch.path_parts.${key}`] = params.meta.pathParts[key]
}
}

return await this[kOtelTracer].startActiveSpan(params.meta.name, { attributes, kind: SpanKind.CLIENT }, async (otelSpan: Span) => {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JoshMock Hi there!

Why is elastic creating an otel span by default without Otel being activated/used? 🤔

I have not used --require '@opentelemetry/auto-instrumentations-node/register' but I can see while debugging that the otel spans are getting created.

Could you please explain to me why elastic is approaching this? Thanks so much!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the OpenTelemetry docs, the code will create no-op traces if the OpenTelemetry SDK has not been initialized. They'll still show up in the stack trace, but they won't go anywhere.

If you need to disable OTel collection for this process, you can set the environment variable OTEL_SDK_DISABLED=true.

let response
try {
response = await this._request(params, options, otelSpan)
} catch (err: any) {
otelSpan.recordException(err as Exception)
otelSpan.setStatus({ code: SpanStatusCode.ERROR })
otelSpan.setAttribute('error.type', err.name ?? 'Error')

throw err
} finally {
otelSpan.end()
}

return response
})
} else {
return await this._request(params, options)
}
}

getConnection (opts: GetConnectionOptions): Connection | null {
const now = Date.now()
if (this[kSniffEnabled] && now > this[kNextSniff]) {
Expand Down
1 change: 1 addition & 0 deletions src/symbols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,4 @@ export const kNdjsonContentType = Symbol('ndjson content type')
export const kAcceptHeader = Symbol('accept header')
export const kRedaction = Symbol('redaction')
export const kRetryBackoff = Symbol('retry backoff')
export const kOtelTracer = Symbol('opentelemetry tracer')
121 changes: 121 additions & 0 deletions test/unit/transport.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import os from 'os'
import { Readable } from 'stream'
import intoStream from 'into-stream'
import * as http from 'http'
import { BasicTracerProvider, InMemorySpanExporter, SimpleSpanProcessor } from '@opentelemetry/sdk-trace-base'
import {
Transport,
Serializer,
Expand Down Expand Up @@ -2269,3 +2270,123 @@ test('redaction does not get leaked to original object', async t => {
}
server.stop()
})

test('OpenTelemetry', t => {
let processor: SimpleSpanProcessor
let provider: BasicTracerProvider
let exporter: InMemorySpanExporter

t.before(() => {
exporter = new InMemorySpanExporter()
processor = new SimpleSpanProcessor(exporter)
provider = new BasicTracerProvider()
provider.addSpanProcessor(processor)
provider.register()
})

t.afterEach(async () => {
await provider.forceFlush()
exporter.reset()
})

t.after(async () => {
await provider.shutdown()
})

t.test('basic details', async t => {
t.plan(2)

function handler (req: http.IncomingMessage, res: http.ServerResponse) {
res.end('ok')
}

const [{ port }, server] = await buildServer(handler)
const pool = new WeightedConnectionPool({ Connection: UndiciConnection })
pool.addConnection(`http://localhost:${port}`)
const transport = new Transport({ connectionPool: pool })

await transport.request({
path: '/hello',
method: 'GET',
meta: { name: 'hello' },
})

const spans = exporter.getFinishedSpans()

t.same(spans[0].attributes, {
'db.system': 'elasticsearch',
'http.request.method': 'GET',
'db.operation.name': 'hello',
'url.full': `http://localhost:${port}/`,
'server.address': 'localhost',
'server.port': port,
})
t.equal(spans[0].status.code, 0)

server.stop()
})

t.test('cloud cluster and instance details', async t => {
t.plan(2)

function handler (_req: http.IncomingMessage, res: http.ServerResponse) {
res.setHeader('x-found-handling-cluster', 'foobar')
res.setHeader('x-found-handling-instance', 'instance-1')
res.end('ok')
}

const [{ port }, server] = await buildServer(handler)
const pool = new WeightedConnectionPool({ Connection: UndiciConnection })
pool.addConnection(`http://localhost:${port}`)
const transport = new Transport({ connectionPool: pool })

await transport.request({
path: '/hello2',
method: 'GET',
meta: { name: 'hello.2' },
})

const spans = exporter.getFinishedSpans()
t.same(spans[0].attributes, {
'db.system': 'elasticsearch',
'http.request.method': 'GET',
'db.operation.name': 'hello.2',
'url.full': `http://localhost:${port}/`,
'server.address': 'localhost',
'server.port': port,
'db.elasticsearch.cluster.name': 'foobar',
'db.elasticsearch.node.name': 'instance-1',
})
t.equal(spans[0].status.code, 0)

server.stop()
})

t.test('span records error state', async t => {
t.plan(3)

const pool = new WeightedConnectionPool({ Connection: MockConnectionTimeout })
pool.addConnection('http://localhost:9200')

const transport = new Transport({
connectionPool: pool,
})

try {
await transport.request({
path: '/hello2',
method: 'GET',
meta: { name: 'hello.2' },
})
} catch (err: any) {
t.ok(err instanceof Error)
}

const spans = exporter.getFinishedSpans()

t.equal(spans[0].attributes['error.type'], 'TimeoutError')
t.not(spans[0].status.code, 0)
})

t.end()
})
Loading