From e32f1433df5511bd092f8971b8ff85248780801e Mon Sep 17 00:00:00 2001 From: Josh Mock Date: Tue, 11 Jun 2024 15:10:51 -0500 Subject: [PATCH 1/8] OpenTelemetry support --- index.d.ts | 1 + package.json | 4 + src/Transport.ts | 602 ++++++++++++++++++++++++++--------------------- 3 files changed, 341 insertions(+), 266 deletions(-) diff --git a/index.d.ts b/index.d.ts index 2a74b83..ecb1c27 100644 --- a/index.d.ts +++ b/index.d.ts @@ -50,6 +50,7 @@ export type { export type { TransportOptions, + TransportRequestMetadata, TransportRequestParams, TransportRequestOptions, TransportRequestOptionsWithMeta, diff --git a/package.json b/package.json index 78de221..722297b 100644 --- a/package.json +++ b/package.json @@ -65,6 +65,10 @@ "tslib": "^2.4.0", "undici": "^6.12.0" }, + "peerDependencies": { + "@opentelemetry/api": "1.x", + "@opentelemetry/sdk-trace-base": "1.x" + }, "tap": { "allow-incomplete-coverage": true, "plugin": [ diff --git a/src/Transport.ts b/src/Transport.ts index 8d39cca..a0c0d46 100644 --- a/src/Transport.ts +++ b/src/Transport.ts @@ -79,6 +79,7 @@ import { kRetryBackoff } from './symbols' import { setTimeout as setTimeoutPromise } from 'node:timers/promises' +import opentelemetry, { Exception, SpanStatusCode, Span } from '@opentelemetry/api' const { version: clientVersion } = require('../package.json') // eslint-disable-line const debug = Debug('elasticsearch') @@ -119,12 +120,18 @@ export interface TransportOptions { retryBackoff?: (min: number, max: number, attempt: number) => number } +export interface TransportRequestMetadata { + name: string + pathParts?: Record +} + export interface TransportRequestParams { method: string path: string body?: RequestBody bulkBody?: RequestNDBody querystring?: Record | string + meta?: TransportRequestMetadata } export interface TransportRequestOptions { @@ -331,320 +338,383 @@ export default class Transport { async request (params: TransportRequestParams, options?: TransportRequestOptionsWithMeta): Promise> async request (params: TransportRequestParams, options?: TransportRequestOptions): Promise async request (params: TransportRequestParams, options: TransportRequestOptions = {}): Promise { - const connectionParams: ConnectionRequestParams = { - method: params.method, - path: params.path - } + const otelTracer = opentelemetry.trace.getTracer('elastic-transport') + let otelSpan: Span - const meta: TransportResult['meta'] = { - context: null, - request: { - params: connectionParams, - options: options, - id: options.id ?? this[kGenerateRequestId](params, options) - }, - name: this[kName], - connection: null, - attempts: 0, - aborted: false - } + const _request = async (): Promise => { + const connectionParams: ConnectionRequestParams = { + method: params.method, + path: params.path + } + + const meta: TransportResult['meta'] = { + context: null, + request: { + params: connectionParams, + options: options, + id: options.id ?? this[kGenerateRequestId](params, options) + }, + name: this[kName], + connection: null, + attempts: 0, + aborted: false + } - const returnMeta = options.meta ?? false + const returnMeta = options.meta ?? false - if (this[kContext] != null && options.context != null) { - meta.context = Object.assign({}, this[kContext], options.context) - } else if (this[kContext] !== null) { - meta.context = this[kContext] - } else if (options.context != null) { - meta.context = options.context - } + if (this[kContext] != null && options.context != null) { + meta.context = Object.assign({}, this[kContext], options.context) + } else if (this[kContext] !== null) { + meta.context = this[kContext] + } else if (options.context != null) { + meta.context = options.context + } - const result: TransportResult = { - // the default body value can't be `null` - // as it's a valid JSON value - body: undefined, - statusCode: 0, - headers: {}, - meta, - get warnings () { - if (this.headers?.warning == null) { - return null + const result: TransportResult = { + // the default body value can't be `null` + // as it's a valid JSON value + body: undefined, + statusCode: 0, + headers: {}, + meta, + get warnings () { + if (this.headers?.warning == null) { + return null + } + const { warning } = this.headers + // if multiple HTTP headers have the same name, Undici represents them as an array + const warnings: string[] = Array.isArray(warning) ? warning : [warning] + return warnings + .flatMap(w => w.split(/(?!\B"[^"]*),(?![^"]*"\B)/)) + .filter((warning) => warning.match(/^\d\d\d Elasticsearch-/)) } - - const { warning } = this.headers - // if multiple HTTP headers have the same name, Undici represents them as an array - const warnings: string[] = Array.isArray(warning) ? warning : [warning] - return warnings - .flatMap(w => w.split(/(?!\B"[^"]*),(?![^"]*"\B)/)) - .filter((warning) => warning.match(/^\d\d\d Elasticsearch-/)) } - } - // We should not retry if we are sending a stream body, because we should store in memory - // a copy of the stream to be able to send it again, but since we don't know in advance - // the size of the stream, we risk to take too much memory. - // Furthermore, copying every time the stream is very a expensive operation. - const maxRetries = isStream(params.body ?? params.bulkBody) ? 0 : (typeof options.maxRetries === 'number' ? options.maxRetries : this[kMaxRetries]) - const compression = typeof options.compression === 'boolean' ? options.compression : this[kCompression] - const signal = options.signal - const maxResponseSize = options.maxResponseSize ?? this[kMaxResponseSize] - const maxCompressedResponseSize = options.maxCompressedResponseSize ?? this[kMaxCompressedResponseSize] - - const errorOptions: ErrorOptions = { - redaction: typeof options.redaction === 'object' ? options.redaction : this[kRedaction] - } + // We should not retry if we are sending a stream body, because we should store in memory + // a copy of the stream to be able to send it again, but since we don't know in advance + // the size of the stream, we risk to take too much memory. + // Furthermore, copying every time the stream is very a expensive operation. + const maxRetries = isStream(params.body ?? params.bulkBody) ? 0 : (typeof options.maxRetries === 'number' ? options.maxRetries : this[kMaxRetries]) + const compression = typeof options.compression === 'boolean' ? options.compression : this[kCompression] + const signal = options.signal + const maxResponseSize = options.maxResponseSize ?? this[kMaxResponseSize] + const maxCompressedResponseSize = options.maxCompressedResponseSize ?? this[kMaxCompressedResponseSize] + + const errorOptions: ErrorOptions = { + redaction: typeof options.redaction === 'object' ? options.redaction : this[kRedaction] + } - this[kDiagnostic].emit('serialization', null, result) - const headers = Object.assign({}, this[kHeaders], lowerCaseHeaders(options.headers)) + this[kDiagnostic].emit('serialization', null, result) + const headers = Object.assign({}, this[kHeaders], lowerCaseHeaders(options.headers)) - if (options.opaqueId !== undefined) { - headers['x-opaque-id'] = typeof this[kOpaqueIdPrefix] === 'string' - ? this[kOpaqueIdPrefix] + options.opaqueId // eslint-disable-line - : options.opaqueId - } + if (options.opaqueId !== undefined) { + headers['x-opaque-id'] = typeof this[kOpaqueIdPrefix] === 'string' + ? this[kOpaqueIdPrefix] + options.opaqueId // eslint-disable-line + : options.opaqueId + } - // handle json body - if (params.body != null) { - if (shouldSerialize(params.body)) { - try { - connectionParams.body = this[kSerializer].serialize(params.body) - } catch (err: any) { - this[kDiagnostic].emit('request', err, result) - throw err + // handle json body + if (params.body != null) { + if (shouldSerialize(params.body)) { + try { + connectionParams.body = this[kSerializer].serialize(params.body) + } catch (err: any) { + this[kDiagnostic].emit('request', err, result) + throw err + } + headers['content-type'] = headers['content-type'] ?? this[kJsonContentType] + headers.accept = headers.accept ?? this[kJsonContentType] + } else { + if (params.body !== '') { + headers['content-type'] = headers['content-type'] ?? 'text/plain' + headers.accept = headers.accept ?? this[kAcceptHeader] + } + connectionParams.body = params.body } - headers['content-type'] = headers['content-type'] ?? this[kJsonContentType] - headers.accept = headers.accept ?? this[kJsonContentType] - } else { - if (params.body !== '') { - headers['content-type'] = headers['content-type'] ?? 'text/plain' - headers.accept = headers.accept ?? this[kAcceptHeader] + + // handle ndjson body + } else if (params.bulkBody != null) { + if (shouldSerialize(params.bulkBody)) { + try { + connectionParams.body = this[kSerializer].ndserialize(params.bulkBody as Array>) + } catch (err: any) { + this[kDiagnostic].emit('request', err, result) + throw err + } + } else { + connectionParams.body = params.bulkBody } - connectionParams.body = params.body - } - // handle ndjson body - } else if (params.bulkBody != null) { - if (shouldSerialize(params.bulkBody)) { - try { - connectionParams.body = this[kSerializer].ndserialize(params.bulkBody as Array>) - } catch (err: any) { - this[kDiagnostic].emit('request', err, result) - throw err + if (connectionParams.body !== '') { + headers['content-type'] = headers['content-type'] ?? this[kNdjsonContentType] + headers.accept = headers.accept ?? this[kJsonContentType] } - } else { - connectionParams.body = params.bulkBody } - if (connectionParams.body !== '') { - headers['content-type'] = headers['content-type'] ?? this[kNdjsonContentType] - headers.accept = headers.accept ?? this[kJsonContentType] + // serializes the querystring + if (options.querystring == null) { + connectionParams.querystring = this[kSerializer].qserialize(params.querystring) + } else { + connectionParams.querystring = this[kSerializer].qserialize( + Object.assign({}, params.querystring, options.querystring) + ) } - } - // serializes the querystring - if (options.querystring == null) { - connectionParams.querystring = this[kSerializer].qserialize(params.querystring) - } else { - connectionParams.querystring = this[kSerializer].qserialize( - Object.assign({}, params.querystring, options.querystring) - ) - } - - // handle compression - if (connectionParams.body !== '' && connectionParams.body != null) { - if (isStream(connectionParams.body)) { - if (compression) { + // handle compression + if (connectionParams.body !== '' && connectionParams.body != null) { + if (isStream(connectionParams.body)) { + if (compression) { + headers['content-encoding'] = 'gzip' + connectionParams.body = connectionParams.body.pipe(createGzip()) + } + } else if (compression) { + try { + connectionParams.body = await gzip(connectionParams.body) + } catch (err: any) { + /* istanbul ignore next */ + this[kDiagnostic].emit('request', err, result) + /* istanbul ignore next */ + throw err + } headers['content-encoding'] = 'gzip' - connectionParams.body = connectionParams.body.pipe(createGzip()) - } - } else if (compression) { - try { - connectionParams.body = await gzip(connectionParams.body) - } catch (err: any) { - /* istanbul ignore next */ - this[kDiagnostic].emit('request', err, result) - /* istanbul ignore next */ - throw err + headers['content-length'] = '' + Buffer.byteLength(connectionParams.body) // eslint-disable-line + } else { + headers['content-length'] = '' + Buffer.byteLength(connectionParams.body) // eslint-disable-line } - headers['content-encoding'] = 'gzip' - headers['content-length'] = '' + Buffer.byteLength(connectionParams.body) // eslint-disable-line - } else { - headers['content-length'] = '' + Buffer.byteLength(connectionParams.body) // eslint-disable-line } - } - headers.accept = headers.accept ?? this[kAcceptHeader] - connectionParams.headers = headers - while (meta.attempts <= maxRetries) { - try { - if (signal?.aborted) { // eslint-disable-line - throw new RequestAbortedError('Request has been aborted by the user', result, errorOptions) - } + headers.accept = headers.accept ?? this[kAcceptHeader] + connectionParams.headers = headers + while (meta.attempts <= maxRetries) { + try { + if (signal?.aborted) { // eslint-disable-line + throw new RequestAbortedError('Request has been aborted by the user', result, errorOptions) + } - meta.connection = this.getConnection({ - requestId: meta.request.id, - context: meta.context - }) - if (meta.connection === null) { - throw new NoLivingConnectionsError('There are no living connections', result, errorOptions) - } + meta.connection = this.getConnection({ + requestId: meta.request.id, + context: meta.context + }) + if (meta.connection === null) { + throw new NoLivingConnectionsError('There are no living connections', result, errorOptions) + } - this[kDiagnostic].emit('request', null, result) - - // perform the actual http request - let { statusCode, headers, body } = await meta.connection.request(connectionParams, { - requestId: meta.request.id, - name: this[kName], - context: meta.context, - maxResponseSize, - maxCompressedResponseSize, - signal, - timeout: toMs(options.requestTimeout != null ? options.requestTimeout : this[kRequestTimeout]), - ...(options.asStream === true ? { asStream: true } : null) - }) - result.statusCode = statusCode - result.headers = headers - - if (this[kProductCheck] != null && headers['x-elastic-product'] !== this[kProductCheck] && statusCode >= 200 && statusCode < 300) { - /* eslint-disable @typescript-eslint/prefer-ts-expect-error */ - // @ts-ignore - throw new ProductNotSupportedError(this[kProductCheck], result, errorOptions) - /* eslint-enable @typescript-eslint/prefer-ts-expect-error */ - } + // generate required OpenTelemetry attributes from the request URL + const requestUrl = meta.connection.url + otelSpan?.setAttributes({ + 'url.full': requestUrl.toString(), + 'server.address': requestUrl.hostname + }) + if (requestUrl.port === '') { + if (requestUrl.protocol === 'https:') { + otelSpan?.setAttribute('server.port', 443) + } else if (requestUrl.protocol === 'http:') { + otelSpan?.setAttribute('server.port', 80) + } + } else if (requestUrl.port !== '9200') { + otelSpan?.setAttribute('server.port', parseInt(requestUrl.port, 10)) + } - if (options.asStream === true) { - result.body = body - this[kDiagnostic].emit('response', null, result) - return returnMeta ? result : body - } + this[kDiagnostic].emit('request', null, result) + + // perform the actual http request + let { statusCode, headers, body } = await meta.connection.request(connectionParams, { + requestId: meta.request.id, + name: this[kName], + context: meta.context, + maxResponseSize, + maxCompressedResponseSize, + signal, + timeout: toMs(options.requestTimeout != null ? options.requestTimeout : this[kRequestTimeout]), + ...(options.asStream === true ? { asStream: true } : null) + }) + result.statusCode = statusCode + result.headers = headers + + if (headers['x-found-handling-cluster'] != null) { + otelSpan?.setAttribute('db.elasticsearch.cluster.name', headers['x-found-handling-cluster']) + } - const contentEncoding = (headers['content-encoding'] ?? '').toLowerCase() - if (contentEncoding.includes('gzip') || contentEncoding.includes('deflate')) { - body = await unzip(body) - } + if (headers['x-found-handling-instance'] != null) { + otelSpan?.setAttribute('db.elasticsearch.node.name', headers['x-found-handling-instance']) + } - const isVectorTile = (headers['content-type'] ?? '').includes('application/vnd.mapbox-vector-tile') - if (Buffer.isBuffer(body) && !isVectorTile) { - body = body.toString() - } + if (this[kProductCheck] != null && headers['x-elastic-product'] !== this[kProductCheck] && statusCode >= 200 && statusCode < 300) { + /* eslint-disable @typescript-eslint/prefer-ts-expect-error */ + // @ts-ignore + throw new ProductNotSupportedError(this[kProductCheck], result, errorOptions) + /* eslint-enable @typescript-eslint/prefer-ts-expect-error */ + } - const isHead = params.method === 'HEAD' - // we should attempt the payload deserialization only if: - // - a `content-type` is defined and is equal to `application/json` - // - the request is not a HEAD request - // - the payload is not an empty string - if (headers['content-type'] !== undefined && - (headers['content-type']?.includes('application/json') || - headers['content-type']?.includes('application/vnd.elasticsearch+json')) && - !isHead && body !== '') { // eslint-disable-line - result.body = this[kSerializer].deserialize(body as string) - } else { - // cast to boolean if the request method was HEAD and there was no error - result.body = isHead && statusCode < 400 ? true : body - } + if (options.asStream === true) { + result.body = body + this[kDiagnostic].emit('response', null, result) + return returnMeta ? result : body + } - // we should ignore the statusCode if the user has configured the `ignore` field with - // the statusCode we just got or if the request method is HEAD and the statusCode is 404 - const ignoreStatusCode = (Array.isArray(options.ignore) && options.ignore.includes(statusCode)) || - (isHead && statusCode === 404) - - if (!ignoreStatusCode && (statusCode === 502 || statusCode === 503 || statusCode === 504)) { - // if the statusCode is 502/3/4 we should run our retry strategy - // and mark the connection as dead - this[kConnectionPool].markDead(meta.connection) - // retry logic - if (meta.attempts < maxRetries) { - meta.attempts++ - debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params) - continue + const contentEncoding = (headers['content-encoding'] ?? '').toLowerCase() + if (contentEncoding.includes('gzip') || contentEncoding.includes('deflate')) { + body = await unzip(body) } - } else { - // everything has worked as expected, let's mark - // the connection as alive (or confirm it) - this[kConnectionPool].markAlive(meta.connection) - } - if (!ignoreStatusCode && statusCode >= 400) { - throw new ResponseError(result, errorOptions) - } else { - // cast to boolean if the request method was HEAD - if (isHead && statusCode === 404) { - result.body = false + const isVectorTile = (headers['content-type'] ?? '').includes('application/vnd.mapbox-vector-tile') + if (Buffer.isBuffer(body) && !isVectorTile) { + body = body.toString() } - this[kDiagnostic].emit('response', null, result) - return returnMeta ? result : result.body - } - } catch (error: any) { - switch (error.name) { - // should not retry - case 'ProductNotSupportedError': - case 'NoLivingConnectionsError': - case 'DeserializationError': - case 'ResponseError': - this[kDiagnostic].emit('response', error, result) - throw error - case 'RequestAbortedError': { - meta.aborted = true - // Wrap the error to get a clean stack trace - const wrappedError = new RequestAbortedError(error.message, result, errorOptions) - this[kDiagnostic].emit('response', wrappedError, result) - throw wrappedError + + const isHead = params.method === 'HEAD' + // we should attempt the payload deserialization only if: + // - a `content-type` is defined and is equal to `application/json` + // - the request is not a HEAD request + // - the payload is not an empty string + if (headers['content-type'] !== undefined && + (headers['content-type']?.includes('application/json') || + headers['content-type']?.includes('application/vnd.elasticsearch+json')) && + !isHead && body !== '') { // eslint-disable-line + result.body = this[kSerializer].deserialize(body as string) + } else { + // cast to boolean if the request method was HEAD and there was no error + result.body = isHead && statusCode < 400 ? true : body } - // should maybe retry - // @ts-expect-error `case` fallthrough is intentional: should retry if retryOnTimeout is true - case 'TimeoutError': - if (!this[kRetryOnTimeout]) { - const wrappedError = new TimeoutError(error.message, result, errorOptions) - this[kDiagnostic].emit('response', wrappedError, result) - throw wrappedError - } - // should retry - // eslint-disable-next-line no-fallthrough - case 'ConnectionError': { - // if there is an error in the connection - // let's mark the connection as dead - this[kConnectionPool].markDead(meta.connection as Connection) - - if (this[kSniffOnConnectionFault]) { - this.sniff({ - reason: Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT, - requestId: meta.request.id, - context: meta.context - }) - } + // we should ignore the statusCode if the user has configured the `ignore` field with + // the statusCode we just got or if the request method is HEAD and the statusCode is 404 + const ignoreStatusCode = (Array.isArray(options.ignore) && options.ignore.includes(statusCode)) || + (isHead && statusCode === 404) + + if (!ignoreStatusCode && (statusCode === 502 || statusCode === 503 || statusCode === 504)) { + // if the statusCode is 502/3/4 we should run our retry strategy + // and mark the connection as dead + this[kConnectionPool].markDead(meta.connection) // retry logic if (meta.attempts < maxRetries) { meta.attempts++ debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params) + continue + } + } else { + // everything has worked as expected, let's mark + // the connection as alive (or confirm it) + this[kConnectionPool].markAlive(meta.connection) + } - // exponential backoff on retries, with jitter - const backoff = options.retryBackoff ?? this[kRetryBackoff] - const backoffWait = backoff(0, 4, meta.attempts) - if (backoffWait > 0) { - await setTimeoutPromise(backoffWait * 1000) + if (!ignoreStatusCode && statusCode >= 400) { + throw new ResponseError(result, errorOptions) + } else { + // cast to boolean if the request method was HEAD + if (isHead && statusCode === 404) { + result.body = false + } + this[kDiagnostic].emit('response', null, result) + return returnMeta ? result : result.body + } + } catch (error: any) { + switch (error.name) { + // should not retry + case 'ProductNotSupportedError': + case 'NoLivingConnectionsError': + case 'DeserializationError': + case 'ResponseError': + this[kDiagnostic].emit('response', error, result) + throw error + case 'RequestAbortedError': { + meta.aborted = true + // Wrap the error to get a clean stack trace + const wrappedError = new RequestAbortedError(error.message, result, errorOptions) + this[kDiagnostic].emit('response', wrappedError, result) + throw wrappedError + } + // should maybe retry + // @ts-expect-error `case` fallthrough is intentional: should retry if retryOnTimeout is true + case 'TimeoutError': + if (!this[kRetryOnTimeout]) { + const wrappedError = new TimeoutError(error.message, result, errorOptions) + this[kDiagnostic].emit('response', wrappedError, result) + throw wrappedError + } + // should retry + // eslint-disable-next-line no-fallthrough + case 'ConnectionError': { + // if there is an error in the connection + // let's mark the connection as dead + this[kConnectionPool].markDead(meta.connection as Connection) + + if (this[kSniffOnConnectionFault]) { + this.sniff({ + reason: Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT, + requestId: meta.request.id, + context: meta.context + }) } - continue + // retry logic + if (meta.attempts < maxRetries) { + meta.attempts++ + debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params) + + // exponential backoff on retries, with jitter + const backoff = options.retryBackoff ?? this[kRetryBackoff] + const backoffWait = backoff(0, 4, meta.attempts) + if (backoffWait > 0) { + await setTimeoutPromise(backoffWait * 1000) + } + + continue + } + + // Wrap the error to get a clean stack trace + const wrappedError = error.name === 'TimeoutError' + ? new TimeoutError(error.message, result, errorOptions) + : new ConnectionError(error.message, result, errorOptions) + this[kDiagnostic].emit('response', wrappedError, result) + throw wrappedError } - // Wrap the error to get a clean stack trace - const wrappedError = error.name === 'TimeoutError' - ? new TimeoutError(error.message, result, errorOptions) - : new ConnectionError(error.message, result, errorOptions) - this[kDiagnostic].emit('response', wrappedError, result) - throw wrappedError + // edge cases, such as bad compression + default: + this[kDiagnostic].emit('response', error, result) + throw error } - - // edge cases, such as bad compression - default: - this[kDiagnostic].emit('response', error, result) - throw error } } + + return returnMeta ? result : result.body } - return returnMeta ? result : result.body + // wrap in OpenTelemetry span + if (params.meta?.name != null) { + return await otelTracer.startActiveSpan(params.meta.name, async (span) => { + otelSpan = span + otelSpan.setAttributes({ + 'db.system': 'elasticsearch', + 'http.request.method': params.method, + 'db.operation.name': params.meta?.name + }) + + if (params.meta?.pathParts != null) { + for (const key of Object.keys(params.meta.pathParts)) { + otelSpan.setAttribute(`db.elasticsearch.path_parts.${key}`, params.meta.pathParts[key]) + } + } + + let response + try { + response = await _request() + } catch (err: any) { + otelSpan.recordException(err as Exception) + otelSpan.setStatus({ code: SpanStatusCode.ERROR }) + otelSpan.setAttribute('error.type', err.name ?? 'Error') + + throw err + } finally { + otelSpan.end() + } + + return response + }) + } else { + return await _request() + } } getConnection (opts: GetConnectionOptions): Connection | null { From 46ceb2352c49bd83f598264f527d411ea253cfa5 Mon Sep 17 00:00:00 2001 From: Josh Mock Date: Tue, 18 Jun 2024 13:44:07 -0500 Subject: [PATCH 2/8] OpenTelemetry tests Add test for OpenTelemetry error tracking --- package.json | 4 +- test/unit/transport.test.ts | 120 ++++++++++++++++++++++++++++++++++++ 2 files changed, 123 insertions(+), 1 deletion(-) diff --git a/package.json b/package.json index 722297b..00d65f6 100644 --- a/package.json +++ b/package.json @@ -37,6 +37,7 @@ "node": ">=18" }, "devDependencies": { + "@opentelemetry/sdk-trace-base": "^1.25.0", "@sinonjs/fake-timers": "github:sinonjs/fake-timers#0bfffc1", "@tapjs/clock": "^1.1.24", "@types/debug": "^4.1.7", @@ -72,7 +73,8 @@ "tap": { "allow-incomplete-coverage": true, "plugin": [ - "@tapjs/clock" + "@tapjs/clock", + "@tapjs/before" ] } } diff --git a/test/unit/transport.test.ts b/test/unit/transport.test.ts index 4242243..e304257 100644 --- a/test/unit/transport.test.ts +++ b/test/unit/transport.test.ts @@ -26,6 +26,7 @@ import os from 'os' import { Readable } from 'stream' import intoStream from 'into-stream' import * as http from 'http' +import { BasicTracerProvider, InMemorySpanExporter, SimpleSpanProcessor } from '@opentelemetry/sdk-trace-base' import { Transport, Serializer, @@ -2269,3 +2270,122 @@ test('redaction does not get leaked to original object', async t => { } server.stop() }) + +test('OpenTelemetry', t => { + let processor: SimpleSpanProcessor + let provider: BasicTracerProvider + let exporter: InMemorySpanExporter + + t.before(() => { + exporter = new InMemorySpanExporter() + processor = new SimpleSpanProcessor(exporter) + provider = new BasicTracerProvider() + provider.addSpanProcessor(processor) + provider.register() + }) + + t.afterEach(async () => { + await provider.forceFlush() + exporter.reset() + }) + + t.after(async () => { + await provider.shutdown() + }) + + t.test('basic details', async t => { + t.plan(2) + + function handler (req: http.IncomingMessage, res: http.ServerResponse) { + res.end('ok') + } + + const [{ port }, server] = await buildServer(handler) + const pool = new WeightedConnectionPool({ Connection: UndiciConnection }) + pool.addConnection(`http://localhost:${port}`) + const transport = new Transport({ connectionPool: pool }) + + await transport.request({ + path: '/hello', + method: 'GET', + meta: { name: 'hello' }, + }) + + const spans = exporter.getFinishedSpans() + + t.same(spans[0].attributes, { + 'db.system': 'elasticsearch', + 'http.request.method': 'GET', + 'db.operation.name': 'hello', + 'url.full': `http://localhost:${port}/`, + 'server.address': 'localhost', + 'server.port': port, + }) + t.equal(spans[0].status.code, 0) + + server.stop() + }) + + t.test('cloud cluster and instance details', async t => { + t.plan(2) + + function handler (_req: http.IncomingMessage, res: http.ServerResponse) { + res.setHeader('x-found-handling-cluster', 'foobar') + res.setHeader('x-found-handling-instance', 'instance-1') + res.end('ok') + } + + const [{ port }, server] = await buildServer(handler) + const pool = new WeightedConnectionPool({ Connection: UndiciConnection }) + pool.addConnection(`http://localhost:${port}`) + const transport = new Transport({ connectionPool: pool }) + + await transport.request({ + path: '/hello2', + method: 'GET', + meta: { name: 'hello.2' }, + }) + + const spans = exporter.getFinishedSpans() + t.same(spans[0].attributes, { + 'db.system': 'elasticsearch', + 'http.request.method': 'GET', + 'db.operation.name': 'hello.2', + 'url.full': `http://localhost:${port}/`, + 'server.address': 'localhost', + 'server.port': port, + 'db.elasticsearch.cluster.name': 'foobar', + 'db.elasticsearch.node.name': 'instance-1', + }) + t.equal(spans[0].status.code, 0) + + server.stop() + }) + + t.test('span records error state', async t => { + t.plan(2) + + const pool = new WeightedConnectionPool({ Connection: MockConnectionTimeout }) + pool.addConnection('http://localhost:9200') + + const transport = new Transport({ + connectionPool: pool, + }) + + try { + await transport.request({ + path: '/hello2', + method: 'GET', + meta: { name: 'hello.2' }, + }) + } catch (err: any) { + t.ok(err instanceof Error) + } + + const spans = exporter.getFinishedSpans() + + t.not(spans[0].status.code, 0) + }) + + t.end() +}) From e914c47d015520ceab944af4eb97bbba6d4fb675 Mon Sep 17 00:00:00 2001 From: Josh Mock Date: Tue, 18 Jun 2024 17:27:03 -0500 Subject: [PATCH 3/8] Set error.type on OpenTelemetry spans --- test/unit/transport.test.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/unit/transport.test.ts b/test/unit/transport.test.ts index e304257..a7e79cb 100644 --- a/test/unit/transport.test.ts +++ b/test/unit/transport.test.ts @@ -2363,7 +2363,7 @@ test('OpenTelemetry', t => { }) t.test('span records error state', async t => { - t.plan(2) + t.plan(3) const pool = new WeightedConnectionPool({ Connection: MockConnectionTimeout }) pool.addConnection('http://localhost:9200') @@ -2384,6 +2384,7 @@ test('OpenTelemetry', t => { const spans = exporter.getFinishedSpans() + t.equal(spans[0].attributes['error.type'], 'TimeoutError') t.not(spans[0].status.code, 0) }) From be182ddd18ba7e46886648567f67b1b691718468 Mon Sep 17 00:00:00 2001 From: Josh Mock Date: Thu, 27 Jun 2024 13:05:50 -0500 Subject: [PATCH 4/8] Provide client version when getting tracer Co-authored-by: Trent Mick --- src/Transport.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Transport.ts b/src/Transport.ts index a0c0d46..1ad2b84 100644 --- a/src/Transport.ts +++ b/src/Transport.ts @@ -338,7 +338,7 @@ export default class Transport { async request (params: TransportRequestParams, options?: TransportRequestOptionsWithMeta): Promise> async request (params: TransportRequestParams, options?: TransportRequestOptions): Promise async request (params: TransportRequestParams, options: TransportRequestOptions = {}): Promise { - const otelTracer = opentelemetry.trace.getTracer('elastic-transport') + const otelTracer = opentelemetry.trace.getTracer('@elastic/transport', clientVersion) let otelSpan: Span const _request = async (): Promise => { From ec782d636482749df8ec2386aa399869f7a6569a Mon Sep 17 00:00:00 2001 From: Josh Mock Date: Thu, 27 Jun 2024 13:11:36 -0500 Subject: [PATCH 5/8] Represent OTel dependencies more accurately --- package.json | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/package.json b/package.json index 00d65f6..522db73 100644 --- a/package.json +++ b/package.json @@ -59,6 +59,7 @@ "workq": "^3.0.0" }, "dependencies": { + "@opentelemetry/api": "1.x", "debug": "^4.3.4", "hpagent": "^1.0.0", "ms": "^2.1.3", @@ -66,10 +67,6 @@ "tslib": "^2.4.0", "undici": "^6.12.0" }, - "peerDependencies": { - "@opentelemetry/api": "1.x", - "@opentelemetry/sdk-trace-base": "1.x" - }, "tap": { "allow-incomplete-coverage": true, "plugin": [ From 96cb20ff4f5e81a6786288d1aa3b12ed36a59864 Mon Sep 17 00:00:00 2001 From: Josh Mock Date: Thu, 27 Jun 2024 13:26:14 -0500 Subject: [PATCH 6/8] Fetch OTel tracer at instantiation --- src/Transport.ts | 9 ++++++--- src/symbols.ts | 1 + 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/Transport.ts b/src/Transport.ts index 1ad2b84..1c2b7af 100644 --- a/src/Transport.ts +++ b/src/Transport.ts @@ -76,10 +76,11 @@ import { kNdjsonContentType, kAcceptHeader, kRedaction, - kRetryBackoff + kRetryBackoff, + kOtelTracer } from './symbols' import { setTimeout as setTimeoutPromise } from 'node:timers/promises' -import opentelemetry, { Exception, SpanStatusCode, Span } from '@opentelemetry/api' +import opentelemetry, { Exception, SpanStatusCode, Span, Tracer } from '@opentelemetry/api' const { version: clientVersion } = require('../package.json') // eslint-disable-line const debug = Debug('elasticsearch') @@ -228,6 +229,7 @@ export default class Transport { [kAcceptHeader]: string [kRedaction]: RedactionOptions [kRetryBackoff]: (min: number, max: number, attempt: number) => number + [kOtelTracer]: Tracer static sniffReasons = { SNIFF_ON_START: 'sniff-on-start', @@ -290,6 +292,7 @@ export default class Transport { this[kAcceptHeader] = opts.vendoredHeaders?.accept ?? 'application/json, text/plain' this[kRedaction] = opts.redaction ?? { type: 'replace', additionalKeys: [] } this[kRetryBackoff] = opts.retryBackoff ?? retryBackoff + this[kOtelTracer] = opentelemetry.trace.getTracer('@elastic/transport', clientVersion) if (opts.sniffOnStart === true) { this.sniff({ @@ -338,7 +341,7 @@ export default class Transport { async request (params: TransportRequestParams, options?: TransportRequestOptionsWithMeta): Promise> async request (params: TransportRequestParams, options?: TransportRequestOptions): Promise async request (params: TransportRequestParams, options: TransportRequestOptions = {}): Promise { - const otelTracer = opentelemetry.trace.getTracer('@elastic/transport', clientVersion) + const otelTracer = this[kOtelTracer] let otelSpan: Span const _request = async (): Promise => { diff --git a/src/symbols.ts b/src/symbols.ts index a6ae316..ccfce13 100644 --- a/src/symbols.ts +++ b/src/symbols.ts @@ -48,3 +48,4 @@ export const kNdjsonContentType = Symbol('ndjson content type') export const kAcceptHeader = Symbol('accept header') export const kRedaction = Symbol('redaction') export const kRetryBackoff = Symbol('retry backoff') +export const kOtelTracer = Symbol('opentelemetry tracer') From 1cae465a681a83935d9b566a39c3dce3acead714 Mon Sep 17 00:00:00 2001 From: Josh Mock Date: Thu, 27 Jun 2024 13:34:24 -0500 Subject: [PATCH 7/8] Refactor _request into private method --- src/Transport.ts | 601 ++++++++++++++++++++++++----------------------- 1 file changed, 302 insertions(+), 299 deletions(-) diff --git a/src/Transport.ts b/src/Transport.ts index 1c2b7af..3e31dea 100644 --- a/src/Transport.ts +++ b/src/Transport.ts @@ -337,356 +337,359 @@ export default class Transport { return this[kDiagnostic] } - async request (params: TransportRequestParams, options?: TransportRequestOptionsWithOutMeta): Promise - async request (params: TransportRequestParams, options?: TransportRequestOptionsWithMeta): Promise> - async request (params: TransportRequestParams, options?: TransportRequestOptions): Promise - async request (params: TransportRequestParams, options: TransportRequestOptions = {}): Promise { - const otelTracer = this[kOtelTracer] - let otelSpan: Span - - const _request = async (): Promise => { - const connectionParams: ConnectionRequestParams = { - method: params.method, - path: params.path - } + private async _request (params: TransportRequestParams, options?: TransportRequestOptionsWithOutMeta, otelSpan?: Span): Promise + private async _request (params: TransportRequestParams, options?: TransportRequestOptionsWithMeta, otelSpan?: Span): Promise> + private async _request (params: TransportRequestParams, options?: TransportRequestOptions, otelSpan?: Span): Promise + private async _request (params: TransportRequestParams, options: TransportRequestOptions = {}, otelSpan?: Span): Promise { + const connectionParams: ConnectionRequestParams = { + method: params.method, + path: params.path + } - const meta: TransportResult['meta'] = { - context: null, - request: { - params: connectionParams, - options: options, - id: options.id ?? this[kGenerateRequestId](params, options) - }, - name: this[kName], - connection: null, - attempts: 0, - aborted: false - } + const meta: TransportResult['meta'] = { + context: null, + request: { + params: connectionParams, + options: options, + id: options.id ?? this[kGenerateRequestId](params, options) + }, + name: this[kName], + connection: null, + attempts: 0, + aborted: false + } - const returnMeta = options.meta ?? false + const returnMeta = options.meta ?? false - if (this[kContext] != null && options.context != null) { - meta.context = Object.assign({}, this[kContext], options.context) - } else if (this[kContext] !== null) { - meta.context = this[kContext] - } else if (options.context != null) { - meta.context = options.context - } + if (this[kContext] != null && options.context != null) { + meta.context = Object.assign({}, this[kContext], options.context) + } else if (this[kContext] !== null) { + meta.context = this[kContext] + } else if (options.context != null) { + meta.context = options.context + } - const result: TransportResult = { - // the default body value can't be `null` - // as it's a valid JSON value - body: undefined, - statusCode: 0, - headers: {}, - meta, - get warnings () { - if (this.headers?.warning == null) { - return null - } - const { warning } = this.headers - // if multiple HTTP headers have the same name, Undici represents them as an array - const warnings: string[] = Array.isArray(warning) ? warning : [warning] - return warnings - .flatMap(w => w.split(/(?!\B"[^"]*),(?![^"]*"\B)/)) - .filter((warning) => warning.match(/^\d\d\d Elasticsearch-/)) + const result: TransportResult = { + // the default body value can't be `null` + // as it's a valid JSON value + body: undefined, + statusCode: 0, + headers: {}, + meta, + get warnings () { + if (this.headers?.warning == null) { + return null } + const { warning } = this.headers + // if multiple HTTP headers have the same name, Undici represents them as an array + const warnings: string[] = Array.isArray(warning) ? warning : [warning] + return warnings + .flatMap(w => w.split(/(?!\B"[^"]*),(?![^"]*"\B)/)) + .filter((warning) => warning.match(/^\d\d\d Elasticsearch-/)) } + } - // We should not retry if we are sending a stream body, because we should store in memory - // a copy of the stream to be able to send it again, but since we don't know in advance - // the size of the stream, we risk to take too much memory. - // Furthermore, copying every time the stream is very a expensive operation. - const maxRetries = isStream(params.body ?? params.bulkBody) ? 0 : (typeof options.maxRetries === 'number' ? options.maxRetries : this[kMaxRetries]) - const compression = typeof options.compression === 'boolean' ? options.compression : this[kCompression] - const signal = options.signal - const maxResponseSize = options.maxResponseSize ?? this[kMaxResponseSize] - const maxCompressedResponseSize = options.maxCompressedResponseSize ?? this[kMaxCompressedResponseSize] - - const errorOptions: ErrorOptions = { - redaction: typeof options.redaction === 'object' ? options.redaction : this[kRedaction] - } + // We should not retry if we are sending a stream body, because we should store in memory + // a copy of the stream to be able to send it again, but since we don't know in advance + // the size of the stream, we risk to take too much memory. + // Furthermore, copying every time the stream is very a expensive operation. + const maxRetries = isStream(params.body ?? params.bulkBody) ? 0 : (typeof options.maxRetries === 'number' ? options.maxRetries : this[kMaxRetries]) + const compression = typeof options.compression === 'boolean' ? options.compression : this[kCompression] + const signal = options.signal + const maxResponseSize = options.maxResponseSize ?? this[kMaxResponseSize] + const maxCompressedResponseSize = options.maxCompressedResponseSize ?? this[kMaxCompressedResponseSize] + + const errorOptions: ErrorOptions = { + redaction: typeof options.redaction === 'object' ? options.redaction : this[kRedaction] + } - this[kDiagnostic].emit('serialization', null, result) - const headers = Object.assign({}, this[kHeaders], lowerCaseHeaders(options.headers)) + this[kDiagnostic].emit('serialization', null, result) + const headers = Object.assign({}, this[kHeaders], lowerCaseHeaders(options.headers)) - if (options.opaqueId !== undefined) { - headers['x-opaque-id'] = typeof this[kOpaqueIdPrefix] === 'string' - ? this[kOpaqueIdPrefix] + options.opaqueId // eslint-disable-line - : options.opaqueId - } + if (options.opaqueId !== undefined) { + headers['x-opaque-id'] = typeof this[kOpaqueIdPrefix] === 'string' + ? this[kOpaqueIdPrefix] + options.opaqueId // eslint-disable-line + : options.opaqueId + } - // handle json body - if (params.body != null) { - if (shouldSerialize(params.body)) { - try { - connectionParams.body = this[kSerializer].serialize(params.body) - } catch (err: any) { - this[kDiagnostic].emit('request', err, result) - throw err - } - headers['content-type'] = headers['content-type'] ?? this[kJsonContentType] - headers.accept = headers.accept ?? this[kJsonContentType] - } else { - if (params.body !== '') { - headers['content-type'] = headers['content-type'] ?? 'text/plain' - headers.accept = headers.accept ?? this[kAcceptHeader] - } - connectionParams.body = params.body + // handle json body + if (params.body != null) { + if (shouldSerialize(params.body)) { + try { + connectionParams.body = this[kSerializer].serialize(params.body) + } catch (err: any) { + this[kDiagnostic].emit('request', err, result) + throw err } - - // handle ndjson body - } else if (params.bulkBody != null) { - if (shouldSerialize(params.bulkBody)) { - try { - connectionParams.body = this[kSerializer].ndserialize(params.bulkBody as Array>) - } catch (err: any) { - this[kDiagnostic].emit('request', err, result) - throw err - } - } else { - connectionParams.body = params.bulkBody + headers['content-type'] = headers['content-type'] ?? this[kJsonContentType] + headers.accept = headers.accept ?? this[kJsonContentType] + } else { + if (params.body !== '') { + headers['content-type'] = headers['content-type'] ?? 'text/plain' + headers.accept = headers.accept ?? this[kAcceptHeader] } + connectionParams.body = params.body + } - if (connectionParams.body !== '') { - headers['content-type'] = headers['content-type'] ?? this[kNdjsonContentType] - headers.accept = headers.accept ?? this[kJsonContentType] + // handle ndjson body + } else if (params.bulkBody != null) { + if (shouldSerialize(params.bulkBody)) { + try { + connectionParams.body = this[kSerializer].ndserialize(params.bulkBody as Array>) + } catch (err: any) { + this[kDiagnostic].emit('request', err, result) + throw err } + } else { + connectionParams.body = params.bulkBody } - // serializes the querystring - if (options.querystring == null) { - connectionParams.querystring = this[kSerializer].qserialize(params.querystring) - } else { - connectionParams.querystring = this[kSerializer].qserialize( - Object.assign({}, params.querystring, options.querystring) - ) + if (connectionParams.body !== '') { + headers['content-type'] = headers['content-type'] ?? this[kNdjsonContentType] + headers.accept = headers.accept ?? this[kJsonContentType] } + } - // handle compression - if (connectionParams.body !== '' && connectionParams.body != null) { - if (isStream(connectionParams.body)) { - if (compression) { - headers['content-encoding'] = 'gzip' - connectionParams.body = connectionParams.body.pipe(createGzip()) - } - } else if (compression) { - try { - connectionParams.body = await gzip(connectionParams.body) - } catch (err: any) { - /* istanbul ignore next */ - this[kDiagnostic].emit('request', err, result) - /* istanbul ignore next */ - throw err - } + // serializes the querystring + if (options.querystring == null) { + connectionParams.querystring = this[kSerializer].qserialize(params.querystring) + } else { + connectionParams.querystring = this[kSerializer].qserialize( + Object.assign({}, params.querystring, options.querystring) + ) + } + + // handle compression + if (connectionParams.body !== '' && connectionParams.body != null) { + if (isStream(connectionParams.body)) { + if (compression) { headers['content-encoding'] = 'gzip' - headers['content-length'] = '' + Buffer.byteLength(connectionParams.body) // eslint-disable-line - } else { - headers['content-length'] = '' + Buffer.byteLength(connectionParams.body) // eslint-disable-line + connectionParams.body = connectionParams.body.pipe(createGzip()) } + } else if (compression) { + try { + connectionParams.body = await gzip(connectionParams.body) + } catch (err: any) { + /* istanbul ignore next */ + this[kDiagnostic].emit('request', err, result) + /* istanbul ignore next */ + throw err + } + headers['content-encoding'] = 'gzip' + headers['content-length'] = '' + Buffer.byteLength(connectionParams.body) // eslint-disable-line + } else { + headers['content-length'] = '' + Buffer.byteLength(connectionParams.body) // eslint-disable-line } + } - headers.accept = headers.accept ?? this[kAcceptHeader] - connectionParams.headers = headers - while (meta.attempts <= maxRetries) { - try { - if (signal?.aborted) { // eslint-disable-line - throw new RequestAbortedError('Request has been aborted by the user', result, errorOptions) - } + headers.accept = headers.accept ?? this[kAcceptHeader] + connectionParams.headers = headers + while (meta.attempts <= maxRetries) { + try { + if (signal?.aborted) { // eslint-disable-line + throw new RequestAbortedError('Request has been aborted by the user', result, errorOptions) + } - meta.connection = this.getConnection({ - requestId: meta.request.id, - context: meta.context - }) - if (meta.connection === null) { - throw new NoLivingConnectionsError('There are no living connections', result, errorOptions) - } + meta.connection = this.getConnection({ + requestId: meta.request.id, + context: meta.context + }) + if (meta.connection === null) { + throw new NoLivingConnectionsError('There are no living connections', result, errorOptions) + } - // generate required OpenTelemetry attributes from the request URL - const requestUrl = meta.connection.url - otelSpan?.setAttributes({ - 'url.full': requestUrl.toString(), - 'server.address': requestUrl.hostname - }) - if (requestUrl.port === '') { - if (requestUrl.protocol === 'https:') { - otelSpan?.setAttribute('server.port', 443) - } else if (requestUrl.protocol === 'http:') { - otelSpan?.setAttribute('server.port', 80) - } - } else if (requestUrl.port !== '9200') { - otelSpan?.setAttribute('server.port', parseInt(requestUrl.port, 10)) + // generate required OpenTelemetry attributes from the request URL + const requestUrl = meta.connection.url + otelSpan?.setAttributes({ + 'url.full': requestUrl.toString(), + 'server.address': requestUrl.hostname + }) + if (requestUrl.port === '') { + if (requestUrl.protocol === 'https:') { + otelSpan?.setAttribute('server.port', 443) + } else if (requestUrl.protocol === 'http:') { + otelSpan?.setAttribute('server.port', 80) } + } else if (requestUrl.port !== '9200') { + otelSpan?.setAttribute('server.port', parseInt(requestUrl.port, 10)) + } - this[kDiagnostic].emit('request', null, result) - - // perform the actual http request - let { statusCode, headers, body } = await meta.connection.request(connectionParams, { - requestId: meta.request.id, - name: this[kName], - context: meta.context, - maxResponseSize, - maxCompressedResponseSize, - signal, - timeout: toMs(options.requestTimeout != null ? options.requestTimeout : this[kRequestTimeout]), - ...(options.asStream === true ? { asStream: true } : null) - }) - result.statusCode = statusCode - result.headers = headers - - if (headers['x-found-handling-cluster'] != null) { - otelSpan?.setAttribute('db.elasticsearch.cluster.name', headers['x-found-handling-cluster']) - } + this[kDiagnostic].emit('request', null, result) + + // perform the actual http request + let { statusCode, headers, body } = await meta.connection.request(connectionParams, { + requestId: meta.request.id, + name: this[kName], + context: meta.context, + maxResponseSize, + maxCompressedResponseSize, + signal, + timeout: toMs(options.requestTimeout != null ? options.requestTimeout : this[kRequestTimeout]), + ...(options.asStream === true ? { asStream: true } : null) + }) + result.statusCode = statusCode + result.headers = headers - if (headers['x-found-handling-instance'] != null) { - otelSpan?.setAttribute('db.elasticsearch.node.name', headers['x-found-handling-instance']) - } + if (headers['x-found-handling-cluster'] != null) { + otelSpan?.setAttribute('db.elasticsearch.cluster.name', headers['x-found-handling-cluster']) + } - if (this[kProductCheck] != null && headers['x-elastic-product'] !== this[kProductCheck] && statusCode >= 200 && statusCode < 300) { - /* eslint-disable @typescript-eslint/prefer-ts-expect-error */ - // @ts-ignore - throw new ProductNotSupportedError(this[kProductCheck], result, errorOptions) - /* eslint-enable @typescript-eslint/prefer-ts-expect-error */ - } + if (headers['x-found-handling-instance'] != null) { + otelSpan?.setAttribute('db.elasticsearch.node.name', headers['x-found-handling-instance']) + } - if (options.asStream === true) { - result.body = body - this[kDiagnostic].emit('response', null, result) - return returnMeta ? result : body - } + if (this[kProductCheck] != null && headers['x-elastic-product'] !== this[kProductCheck] && statusCode >= 200 && statusCode < 300) { + /* eslint-disable @typescript-eslint/prefer-ts-expect-error */ + // @ts-ignore + throw new ProductNotSupportedError(this[kProductCheck], result, errorOptions) + /* eslint-enable @typescript-eslint/prefer-ts-expect-error */ + } - const contentEncoding = (headers['content-encoding'] ?? '').toLowerCase() - if (contentEncoding.includes('gzip') || contentEncoding.includes('deflate')) { - body = await unzip(body) - } + if (options.asStream === true) { + result.body = body + this[kDiagnostic].emit('response', null, result) + return returnMeta ? result : body + } - const isVectorTile = (headers['content-type'] ?? '').includes('application/vnd.mapbox-vector-tile') - if (Buffer.isBuffer(body) && !isVectorTile) { - body = body.toString() - } + const contentEncoding = (headers['content-encoding'] ?? '').toLowerCase() + if (contentEncoding.includes('gzip') || contentEncoding.includes('deflate')) { + body = await unzip(body) + } - const isHead = params.method === 'HEAD' - // we should attempt the payload deserialization only if: - // - a `content-type` is defined and is equal to `application/json` - // - the request is not a HEAD request - // - the payload is not an empty string - if (headers['content-type'] !== undefined && - (headers['content-type']?.includes('application/json') || - headers['content-type']?.includes('application/vnd.elasticsearch+json')) && - !isHead && body !== '') { // eslint-disable-line - result.body = this[kSerializer].deserialize(body as string) - } else { - // cast to boolean if the request method was HEAD and there was no error - result.body = isHead && statusCode < 400 ? true : body - } + const isVectorTile = (headers['content-type'] ?? '').includes('application/vnd.mapbox-vector-tile') + if (Buffer.isBuffer(body) && !isVectorTile) { + body = body.toString() + } - // we should ignore the statusCode if the user has configured the `ignore` field with - // the statusCode we just got or if the request method is HEAD and the statusCode is 404 - const ignoreStatusCode = (Array.isArray(options.ignore) && options.ignore.includes(statusCode)) || - (isHead && statusCode === 404) + const isHead = params.method === 'HEAD' + // we should attempt the payload deserialization only if: + // - a `content-type` is defined and is equal to `application/json` + // - the request is not a HEAD request + // - the payload is not an empty string + if (headers['content-type'] !== undefined && + (headers['content-type']?.includes('application/json') || + headers['content-type']?.includes('application/vnd.elasticsearch+json')) && + !isHead && body !== '') { // eslint-disable-line + result.body = this[kSerializer].deserialize(body as string) + } else { + // cast to boolean if the request method was HEAD and there was no error + result.body = isHead && statusCode < 400 ? true : body + } - if (!ignoreStatusCode && (statusCode === 502 || statusCode === 503 || statusCode === 504)) { - // if the statusCode is 502/3/4 we should run our retry strategy - // and mark the connection as dead - this[kConnectionPool].markDead(meta.connection) - // retry logic - if (meta.attempts < maxRetries) { - meta.attempts++ - debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params) - continue - } - } else { - // everything has worked as expected, let's mark - // the connection as alive (or confirm it) - this[kConnectionPool].markAlive(meta.connection) + // we should ignore the statusCode if the user has configured the `ignore` field with + // the statusCode we just got or if the request method is HEAD and the statusCode is 404 + const ignoreStatusCode = (Array.isArray(options.ignore) && options.ignore.includes(statusCode)) || + (isHead && statusCode === 404) + + if (!ignoreStatusCode && (statusCode === 502 || statusCode === 503 || statusCode === 504)) { + // if the statusCode is 502/3/4 we should run our retry strategy + // and mark the connection as dead + this[kConnectionPool].markDead(meta.connection) + // retry logic + if (meta.attempts < maxRetries) { + meta.attempts++ + debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params) + continue } + } else { + // everything has worked as expected, let's mark + // the connection as alive (or confirm it) + this[kConnectionPool].markAlive(meta.connection) + } - if (!ignoreStatusCode && statusCode >= 400) { - throw new ResponseError(result, errorOptions) - } else { - // cast to boolean if the request method was HEAD - if (isHead && statusCode === 404) { - result.body = false - } - this[kDiagnostic].emit('response', null, result) - return returnMeta ? result : result.body + if (!ignoreStatusCode && statusCode >= 400) { + throw new ResponseError(result, errorOptions) + } else { + // cast to boolean if the request method was HEAD + if (isHead && statusCode === 404) { + result.body = false } - } catch (error: any) { - switch (error.name) { - // should not retry - case 'ProductNotSupportedError': - case 'NoLivingConnectionsError': - case 'DeserializationError': - case 'ResponseError': - this[kDiagnostic].emit('response', error, result) - throw error - case 'RequestAbortedError': { - meta.aborted = true - // Wrap the error to get a clean stack trace - const wrappedError = new RequestAbortedError(error.message, result, errorOptions) + this[kDiagnostic].emit('response', null, result) + return returnMeta ? result : result.body + } + } catch (error: any) { + switch (error.name) { + // should not retry + case 'ProductNotSupportedError': + case 'NoLivingConnectionsError': + case 'DeserializationError': + case 'ResponseError': + this[kDiagnostic].emit('response', error, result) + throw error + case 'RequestAbortedError': { + meta.aborted = true + // Wrap the error to get a clean stack trace + const wrappedError = new RequestAbortedError(error.message, result, errorOptions) + this[kDiagnostic].emit('response', wrappedError, result) + throw wrappedError + } + // should maybe retry + // @ts-expect-error `case` fallthrough is intentional: should retry if retryOnTimeout is true + case 'TimeoutError': + if (!this[kRetryOnTimeout]) { + const wrappedError = new TimeoutError(error.message, result, errorOptions) this[kDiagnostic].emit('response', wrappedError, result) throw wrappedError } - // should maybe retry - // @ts-expect-error `case` fallthrough is intentional: should retry if retryOnTimeout is true - case 'TimeoutError': - if (!this[kRetryOnTimeout]) { - const wrappedError = new TimeoutError(error.message, result, errorOptions) - this[kDiagnostic].emit('response', wrappedError, result) - throw wrappedError - } - // should retry - // eslint-disable-next-line no-fallthrough - case 'ConnectionError': { - // if there is an error in the connection - // let's mark the connection as dead - this[kConnectionPool].markDead(meta.connection as Connection) - - if (this[kSniffOnConnectionFault]) { - this.sniff({ - reason: Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT, - requestId: meta.request.id, - context: meta.context - }) - } - - // retry logic - if (meta.attempts < maxRetries) { - meta.attempts++ - debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params) + // should retry + // eslint-disable-next-line no-fallthrough + case 'ConnectionError': { + // if there is an error in the connection + // let's mark the connection as dead + this[kConnectionPool].markDead(meta.connection as Connection) + + if (this[kSniffOnConnectionFault]) { + this.sniff({ + reason: Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT, + requestId: meta.request.id, + context: meta.context + }) + } - // exponential backoff on retries, with jitter - const backoff = options.retryBackoff ?? this[kRetryBackoff] - const backoffWait = backoff(0, 4, meta.attempts) - if (backoffWait > 0) { - await setTimeoutPromise(backoffWait * 1000) - } + // retry logic + if (meta.attempts < maxRetries) { + meta.attempts++ + debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params) - continue + // exponential backoff on retries, with jitter + const backoff = options.retryBackoff ?? this[kRetryBackoff] + const backoffWait = backoff(0, 4, meta.attempts) + if (backoffWait > 0) { + await setTimeoutPromise(backoffWait * 1000) } - // Wrap the error to get a clean stack trace - const wrappedError = error.name === 'TimeoutError' - ? new TimeoutError(error.message, result, errorOptions) - : new ConnectionError(error.message, result, errorOptions) - this[kDiagnostic].emit('response', wrappedError, result) - throw wrappedError + continue } - // edge cases, such as bad compression - default: - this[kDiagnostic].emit('response', error, result) - throw error + // Wrap the error to get a clean stack trace + const wrappedError = error.name === 'TimeoutError' + ? new TimeoutError(error.message, result, errorOptions) + : new ConnectionError(error.message, result, errorOptions) + this[kDiagnostic].emit('response', wrappedError, result) + throw wrappedError } + + // edge cases, such as bad compression + default: + this[kDiagnostic].emit('response', error, result) + throw error } } - - return returnMeta ? result : result.body } + return returnMeta ? result : result.body + } + + async request (params: TransportRequestParams, options?: TransportRequestOptionsWithOutMeta): Promise + async request (params: TransportRequestParams, options?: TransportRequestOptionsWithMeta): Promise> + async request (params: TransportRequestParams, options?: TransportRequestOptions): Promise + async request (params: TransportRequestParams, options: TransportRequestOptions = {}): Promise { + const otelTracer = this[kOtelTracer] + let otelSpan: Span + // wrap in OpenTelemetry span if (params.meta?.name != null) { - return await otelTracer.startActiveSpan(params.meta.name, async (span) => { + return await otelTracer.startActiveSpan(params.meta.name, async (span: Span) => { otelSpan = span otelSpan.setAttributes({ 'db.system': 'elasticsearch', @@ -702,7 +705,7 @@ export default class Transport { let response try { - response = await _request() + response = await this._request(params, options, otelSpan) } catch (err: any) { otelSpan.recordException(err as Exception) otelSpan.setStatus({ code: SpanStatusCode.ERROR }) @@ -716,7 +719,7 @@ export default class Transport { return response }) } else { - return await _request() + return await this._request(params, options) } } From 264d99e232a967bd407b22cac571570f43df2219 Mon Sep 17 00:00:00 2001 From: Josh Mock Date: Thu, 27 Jun 2024 13:47:49 -0500 Subject: [PATCH 8/8] Collect OTel attributes before starting span --- src/Transport.ts | 28 ++++++++++++---------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/src/Transport.ts b/src/Transport.ts index 3e31dea..3c5d76a 100644 --- a/src/Transport.ts +++ b/src/Transport.ts @@ -80,7 +80,7 @@ import { kOtelTracer } from './symbols' import { setTimeout as setTimeoutPromise } from 'node:timers/promises' -import opentelemetry, { Exception, SpanStatusCode, Span, Tracer } from '@opentelemetry/api' +import opentelemetry, { Attributes, Exception, SpanKind, SpanStatusCode, Span, Tracer } from '@opentelemetry/api' const { version: clientVersion } = require('../package.json') // eslint-disable-line const debug = Debug('elasticsearch') @@ -684,25 +684,21 @@ export default class Transport { async request (params: TransportRequestParams, options?: TransportRequestOptionsWithMeta): Promise> async request (params: TransportRequestParams, options?: TransportRequestOptions): Promise async request (params: TransportRequestParams, options: TransportRequestOptions = {}): Promise { - const otelTracer = this[kOtelTracer] - let otelSpan: Span - // wrap in OpenTelemetry span if (params.meta?.name != null) { - return await otelTracer.startActiveSpan(params.meta.name, async (span: Span) => { - otelSpan = span - otelSpan.setAttributes({ - 'db.system': 'elasticsearch', - 'http.request.method': params.method, - 'db.operation.name': params.meta?.name - }) - - if (params.meta?.pathParts != null) { - for (const key of Object.keys(params.meta.pathParts)) { - otelSpan.setAttribute(`db.elasticsearch.path_parts.${key}`, params.meta.pathParts[key]) - } + // gather OpenTelemetry attributes + const attributes: Attributes = { + 'db.system': 'elasticsearch', + 'http.request.method': params.method, + 'db.operation.name': params.meta?.name + } + if (params.meta?.pathParts != null) { + for (const key of Object.keys(params.meta.pathParts)) { + attributes[`db.elasticsearch.path_parts.${key}`] = params.meta.pathParts[key] } + } + return await this[kOtelTracer].startActiveSpan(params.meta.name, { attributes, kind: SpanKind.CLIENT }, async (otelSpan: Span) => { let response try { response = await this._request(params, options, otelSpan)