Add OpenTelemetry tracing to Transport.request

What the patch does:
  * Transport.request() becomes a thin wrapper around a new private
    _request(). When params.meta.name is set, the call runs inside an
    OpenTelemetry CLIENT span named after it; otherwise _request() is
    called directly and no span is created.
  * Span attributes: db.system, http.request.method, db.operation.name,
    db.elasticsearch.path_parts.* (from params.meta.pathParts), url.full,
    server.address, server.port (left unset when the port is 9200), and
    db.elasticsearch.cluster.name / db.elasticsearch.node.name taken from
    the x-found-handling-cluster / x-found-handling-instance headers.
  * On failure the span records the exception, gets status ERROR and an
    error.type attribute, and the error is rethrown. The span is always
    ended.
  * Adds @opentelemetry/api as a dependency, @opentelemetry/sdk-trace-base
    as a dev dependency, and the @tapjs/before tap plugin for the tests.

Review notes for this revision:
  * The patch had been collapsed onto a handful of lines. Hunks are restored
    to one source line per diff line; blank context/added lines were placed
    so that every hunk matches its @@ line counts.
  * Generic type arguments (Record<...>, Promise<...>, request<...>) were
    missing from the collapsed text and are reconstructed here.
    NOTE(review): confirm them, and the leading indentation of context
    lines, against the target tree before applying.
  * url.full no longer exposes credentials embedded in the node URL; the
    user and password parts are replaced with REDACTED. Hunk offsets after
    that change were adjusted accordingly.
  * error.type is read with err?.name so that a thrown null/undefined does
    not raise a TypeError inside the catch block and hide the original error.
  * The blob ids on the "index" lines are carried over from the original
    patch and were not recomputed for the modified src/Transport.ts.

diff --git a/index.d.ts b/index.d.ts
index 2a74b83..ecb1c27 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -50,6 +50,7 @@ export type {
 
 export type {
   TransportOptions,
+  TransportRequestMetadata,
   TransportRequestParams,
   TransportRequestOptions,
   TransportRequestOptionsWithMeta,
diff --git a/package.json b/package.json
index 78de221..522db73 100644
--- a/package.json
+++ b/package.json
@@ -37,6 +37,7 @@
     "node": ">=18"
   },
   "devDependencies": {
+    "@opentelemetry/sdk-trace-base": "^1.25.0",
     "@sinonjs/fake-timers": "github:sinonjs/fake-timers#0bfffc1",
     "@tapjs/clock": "^1.1.24",
     "@types/debug": "^4.1.7",
@@ -58,6 +59,7 @@
     "workq": "^3.0.0"
   },
   "dependencies": {
+    "@opentelemetry/api": "1.x",
     "debug": "^4.3.4",
     "hpagent": "^1.0.0",
     "ms": "^2.1.3",
@@ -68,7 +70,8 @@
   "tap": {
     "allow-incomplete-coverage": true,
     "plugin": [
-      "@tapjs/clock"
+      "@tapjs/clock",
+      "@tapjs/before"
     ]
   }
 }
diff --git a/src/Transport.ts b/src/Transport.ts
index 8d39cca..3c5d76a 100644
--- a/src/Transport.ts
+++ b/src/Transport.ts
@@ -76,9 +76,11 @@ import {
   kNdjsonContentType,
   kAcceptHeader,
   kRedaction,
-  kRetryBackoff
+  kRetryBackoff,
+  kOtelTracer
 } from './symbols'
 import { setTimeout as setTimeoutPromise } from 'node:timers/promises'
+import opentelemetry, { Attributes, Exception, SpanKind, SpanStatusCode, Span, Tracer } from '@opentelemetry/api'
 
 const { version: clientVersion } = require('../package.json') // eslint-disable-line
 const debug = Debug('elasticsearch')
@@ -119,12 +121,18 @@ export interface TransportOptions {
   retryBackoff?: (min: number, max: number, attempt: number) => number
 }
 
+export interface TransportRequestMetadata {
+  name: string
+  pathParts?: Record<string, any>
+}
+
 export interface TransportRequestParams {
   method: string
   path: string
   body?: RequestBody
   bulkBody?: RequestNDBody
   querystring?: Record<string, any> | string
+  meta?: TransportRequestMetadata
 }
 
 export interface TransportRequestOptions {
@@ -221,6 +229,7 @@ export default class Transport {
   [kAcceptHeader]: string
   [kRedaction]: RedactionOptions
   [kRetryBackoff]: (min: number, max: number, attempt: number) => number
+  [kOtelTracer]: Tracer
 
   static sniffReasons = {
     SNIFF_ON_START: 'sniff-on-start',
@@ -283,6 +292,7 @@ export default class Transport {
     this[kAcceptHeader] = opts.vendoredHeaders?.accept ?? 'application/json, text/plain'
     this[kRedaction] = opts.redaction ?? { type: 'replace', additionalKeys: [] }
     this[kRetryBackoff] = opts.retryBackoff ?? retryBackoff
+    this[kOtelTracer] = opentelemetry.trace.getTracer('@elastic/transport', clientVersion)
 
     if (opts.sniffOnStart === true) {
       this.sniff({
@@ -327,10 +337,10 @@ export default class Transport {
     return this[kDiagnostic]
   }
 
-  async request<TResponse = unknown> (params: TransportRequestParams, options?: TransportRequestOptionsWithOutMeta): Promise<TResponse>
-  async request<TResponse = unknown, TContext = any> (params: TransportRequestParams, options?: TransportRequestOptionsWithMeta): Promise<TransportResult<TResponse, TContext>>
-  async request<TResponse = unknown> (params: TransportRequestParams, options?: TransportRequestOptions): Promise<TResponse>
-  async request (params: TransportRequestParams, options: TransportRequestOptions = {}): Promise<any> {
+  private async _request<TResponse = unknown> (params: TransportRequestParams, options?: TransportRequestOptionsWithOutMeta, otelSpan?: Span): Promise<TResponse>
+  private async _request<TResponse = unknown, TContext = any> (params: TransportRequestParams, options?: TransportRequestOptionsWithMeta, otelSpan?: Span): Promise<TransportResult<TResponse, TContext>>
+  private async _request<TResponse = unknown> (params: TransportRequestParams, options?: TransportRequestOptions, otelSpan?: Span): Promise<TResponse>
+  private async _request (params: TransportRequestParams, options: TransportRequestOptions = {}, otelSpan?: Span): Promise<any> {
     const connectionParams: ConnectionRequestParams = {
       method: params.method,
       path: params.path
@@ -370,7 +380,6 @@ export default class Transport {
         if (this.headers?.warning == null) {
           return null
         }
-
         const { warning } = this.headers
         // if multiple HTTP headers have the same name, Undici represents them as an array
         const warnings: string[] = Array.isArray(warning) ? warning : [warning]
@@ -489,6 +498,30 @@ export default class Transport {
           throw new NoLivingConnectionsError('There are no living connections', result, errorOptions)
         }
 
+        // generate required OpenTelemetry attributes from the request URL
+        const requestUrl = meta.connection.url
+        let fullUrl = requestUrl.toString()
+        if (requestUrl.username !== '' || requestUrl.password !== '') {
+          // url.full must never expose credentials embedded in the node URL
+          const redactedUrl = new URL(fullUrl)
+          if (redactedUrl.username !== '') redactedUrl.username = 'REDACTED'
+          if (redactedUrl.password !== '') redactedUrl.password = 'REDACTED'
+          fullUrl = redactedUrl.toString()
+        }
+        otelSpan?.setAttributes({
+          'url.full': fullUrl,
+          'server.address': requestUrl.hostname
+        })
+        if (requestUrl.port === '') {
+          if (requestUrl.protocol === 'https:') {
+            otelSpan?.setAttribute('server.port', 443)
+          } else if (requestUrl.protocol === 'http:') {
+            otelSpan?.setAttribute('server.port', 80)
+          }
+        } else if (requestUrl.port !== '9200') {
+          otelSpan?.setAttribute('server.port', parseInt(requestUrl.port, 10))
+        }
+
         this[kDiagnostic].emit('request', null, result)
 
         // perform the actual http request
@@ -505,6 +538,14 @@ export default class Transport {
         result.statusCode = statusCode
         result.headers = headers
 
+        if (headers['x-found-handling-cluster'] != null) {
+          otelSpan?.setAttribute('db.elasticsearch.cluster.name', headers['x-found-handling-cluster'])
+        }
+
+        if (headers['x-found-handling-instance'] != null) {
+          otelSpan?.setAttribute('db.elasticsearch.node.name', headers['x-found-handling-instance'])
+        }
+
         if (this[kProductCheck] != null && headers['x-elastic-product'] !== this[kProductCheck] && statusCode >= 200 && statusCode < 300) {
           /* eslint-disable @typescript-eslint/prefer-ts-expect-error */
           // @ts-ignore
@@ -647,6 +688,45 @@ export default class Transport {
     return returnMeta ? result : result.body
   }
 
+  async request<TResponse = unknown> (params: TransportRequestParams, options?: TransportRequestOptionsWithOutMeta): Promise<TResponse>
+  async request<TResponse = unknown, TContext = any> (params: TransportRequestParams, options?: TransportRequestOptionsWithMeta): Promise<TransportResult<TResponse, TContext>>
+  async request<TResponse = unknown> (params: TransportRequestParams, options?: TransportRequestOptions): Promise<TResponse>
+  async request (params: TransportRequestParams, options: TransportRequestOptions = {}): Promise<any> {
+    // wrap in OpenTelemetry span
+    if (params.meta?.name != null) {
+      // gather OpenTelemetry attributes
+      const attributes: Attributes = {
+        'db.system': 'elasticsearch',
+        'http.request.method': params.method,
+        'db.operation.name': params.meta?.name
+      }
+      if (params.meta?.pathParts != null) {
+        for (const key of Object.keys(params.meta.pathParts)) {
+          attributes[`db.elasticsearch.path_parts.${key}`] = params.meta.pathParts[key]
+        }
+      }
+
+      return await this[kOtelTracer].startActiveSpan(params.meta.name, { attributes, kind: SpanKind.CLIENT }, async (otelSpan: Span) => {
+        let response
+        try {
+          response = await this._request(params, options, otelSpan)
+        } catch (err: any) {
+          otelSpan.recordException(err as Exception)
+          otelSpan.setStatus({ code: SpanStatusCode.ERROR })
+          otelSpan.setAttribute('error.type', err?.name ?? 'Error')
+
+          throw err
+        } finally {
+          otelSpan.end()
+        }
+
+        return response
+      })
+    } else {
+      return await this._request(params, options)
+    }
+  }
+
   getConnection (opts: GetConnectionOptions): Connection | null {
     const now = Date.now()
     if (this[kSniffEnabled] && now > this[kNextSniff]) {
diff --git a/src/symbols.ts b/src/symbols.ts
index a6ae316..ccfce13 100644
--- a/src/symbols.ts
+++ b/src/symbols.ts
@@ -48,3 +48,4 @@ export const kNdjsonContentType = Symbol('ndjson content type')
 export const kAcceptHeader = Symbol('accept header')
 export const kRedaction = Symbol('redaction')
 export const kRetryBackoff = Symbol('retry backoff')
+export const kOtelTracer = Symbol('opentelemetry tracer')
diff --git a/test/unit/transport.test.ts b/test/unit/transport.test.ts
index 4242243..a7e79cb 100644
--- a/test/unit/transport.test.ts
+++ b/test/unit/transport.test.ts
@@ -26,6 +26,7 @@ import os from 'os'
 import { Readable } from 'stream'
 import intoStream from 'into-stream'
 import * as http from 'http'
+import { BasicTracerProvider, InMemorySpanExporter, SimpleSpanProcessor } from '@opentelemetry/sdk-trace-base'
 import {
   Transport,
   Serializer,
@@ -2269,3 +2270,123 @@ test('redaction does not get leaked to original object', async t => {
   }
   server.stop()
 })
+
+test('OpenTelemetry', t => {
+  let processor: SimpleSpanProcessor
+  let provider: BasicTracerProvider
+  let exporter: InMemorySpanExporter
+
+  t.before(() => {
+    exporter = new InMemorySpanExporter()
+    processor = new SimpleSpanProcessor(exporter)
+    provider = new BasicTracerProvider()
+    provider.addSpanProcessor(processor)
+    provider.register()
+  })
+
+  t.afterEach(async () => {
+    await provider.forceFlush()
+    exporter.reset()
+  })
+
+  t.after(async () => {
+    await provider.shutdown()
+  })
+
+  t.test('basic details', async t => {
+    t.plan(2)
+
+    function handler (req: http.IncomingMessage, res: http.ServerResponse) {
+      res.end('ok')
+    }
+
+    const [{ port }, server] = await buildServer(handler)
+    const pool = new WeightedConnectionPool({ Connection: UndiciConnection })
+    pool.addConnection(`http://localhost:${port}`)
+    const transport = new Transport({ connectionPool: pool })
+
+    await transport.request({
+      path: '/hello',
+      method: 'GET',
+      meta: { name: 'hello' },
+    })
+
+    const spans = exporter.getFinishedSpans()
+
+    t.same(spans[0].attributes, {
+      'db.system': 'elasticsearch',
+      'http.request.method': 'GET',
+      'db.operation.name': 'hello',
+      'url.full': `http://localhost:${port}/`,
+      'server.address': 'localhost',
+      'server.port': port,
+    })
+    t.equal(spans[0].status.code, 0)
+
+    server.stop()
+  })
+
+  t.test('cloud cluster and instance details', async t => {
+    t.plan(2)
+
+    function handler (_req: http.IncomingMessage, res: http.ServerResponse) {
+      res.setHeader('x-found-handling-cluster', 'foobar')
+      res.setHeader('x-found-handling-instance', 'instance-1')
+      res.end('ok')
+    }
+
+    const [{ port }, server] = await buildServer(handler)
+    const pool = new WeightedConnectionPool({ Connection: UndiciConnection })
+    pool.addConnection(`http://localhost:${port}`)
+    const transport = new Transport({ connectionPool: pool })
+
+    await transport.request({
+      path: '/hello2',
+      method: 'GET',
+      meta: { name: 'hello.2' },
+    })
+
+    const spans = exporter.getFinishedSpans()
+    t.same(spans[0].attributes, {
+      'db.system': 'elasticsearch',
+      'http.request.method': 'GET',
+      'db.operation.name': 'hello.2',
+      'url.full': `http://localhost:${port}/`,
+      'server.address': 'localhost',
+      'server.port': port,
+      'db.elasticsearch.cluster.name': 'foobar',
+      'db.elasticsearch.node.name': 'instance-1',
+    })
+    t.equal(spans[0].status.code, 0)
+
+    server.stop()
+  })
+
+  t.test('span records error state', async t => {
+    t.plan(3)
+
+    const pool = new WeightedConnectionPool({ Connection: MockConnectionTimeout })
+    pool.addConnection('http://localhost:9200')
+
+    const transport = new Transport({
+      connectionPool: pool,
+    })
+
+    try {
+      await transport.request({
+        path: '/hello2',
+        method: 'GET',
+        meta: { name: 'hello.2' },
+      })
+    } catch (err: any) {
+      t.ok(err instanceof Error)
+    }
+
+    const spans = exporter.getFinishedSpans()
+
+    t.equal(spans[0].attributes['error.type'], 'TimeoutError')
+    t.not(spans[0].status.code, 0)
+  })
+
+  t.end()
+})