From 0d46b8a57359a02fd5ae913463334df00460cda0 Mon Sep 17 00:00:00 2001 From: delvedor Date: Wed, 3 Mar 2021 09:31:14 +0100 Subject: [PATCH] Improve diagnostics --- index.d.ts | 6 ++-- index.js | 5 +-- src/Diagnostic.ts | 49 +++++++++++++++++++----------- src/Transport.ts | 44 +++++++++++++++++++++------ src/connection/BaseConnection.ts | 10 ++++-- src/connection/HttpConnection.ts | 8 ++--- src/connection/UndiciConnection.ts | 6 ++-- src/connection/index.ts | 1 + src/pool/BaseConnectionPool.ts | 3 ++ src/pool/ClusterConnectionPool.ts | 13 ++++---- 10 files changed, 99 insertions(+), 46 deletions(-) diff --git a/index.d.ts b/index.d.ts index 23a9e16..bc21b67 100644 --- a/index.d.ts +++ b/index.d.ts @@ -17,7 +17,7 @@ * under the License. */ -import Diagnostic from './lib/Diagnostic' +import Diagnostic, { events } from './lib/Diagnostic' import Transport from './lib/Transport' import { BaseConnection, @@ -36,6 +36,7 @@ import * as errors from './lib/errors' export type { Connection, ConnectionOptions, + ConnectionRequestParams, ConnectionRequestOptions, ConnectionRequestResponse } from './lib/connection' @@ -64,5 +65,6 @@ export { HttpConnection, UndiciConnection, Serializer, - errors + errors, + events } diff --git a/index.js b/index.js index 2a19cd1..e775387 100644 --- a/index.js +++ b/index.js @@ -19,7 +19,7 @@ 'use strict' -const Diagnostic = require('./lib/Diagnostic').default +const { default: Diagnostic, events } = require('./lib/Diagnostic') const Transport = require('./lib/Transport').default const { BaseConnection, @@ -46,5 +46,6 @@ module.exports = { CloudConnectionPool, WeightedConnectionPool, Serializer, - errors + errors, + events } diff --git a/src/Diagnostic.ts b/src/Diagnostic.ts index 5067426..075d3dc 100644 --- a/src/Diagnostic.ts +++ b/src/Diagnostic.ts @@ -19,35 +19,50 @@ import { EventEmitter } from 'events' import { ElasticsearchClientError, ConfigurationError } from './errors' +import { ConnectionRequestOptions } from './connection' import { Result } from './types' -export type DiagnosticListener = (err: ElasticsearchClientError | null, meta: Result | null) => void +export type DiagnosticListener = (err: ElasticsearchClientError | null, meta: any | null) => void +export type DiagnosticListenerFull = (err: ElasticsearchClientError | null, meta: Result | null) => void +export type DiagnosticListenerLight = (err: ElasticsearchClientError | null, meta: ConnectionRequestOptions | null) => void -export default class Diagnostic extends EventEmitter { - static events = { - RESPONSE: 'response', - REQUEST: 'request', - SNIFF: 'sniff', - RESURRECT: 'resurrect', - SERIALIZATION: 'serialization', - DESERIALIZATION: 'deserialization' - } +export enum events { + RESPONSE = 'response', + REQUEST = 'request', + SNIFF = 'sniff', + RESURRECT = 'resurrect', + SERIALIZATION = 'serialization', + DESERIALIZATION = 'deserialization' +} - on (event: string, callback: DiagnosticListener): this { +export default class Diagnostic extends EventEmitter { + on (event: events.REQUEST, listener: DiagnosticListenerFull): this + on (event: events.RESPONSE, listener: DiagnosticListenerFull): this + on (event: events.SERIALIZATION, listener: DiagnosticListenerFull): this + on (event: events.SNIFF, listener: DiagnosticListenerFull): this + on (event: events.DESERIALIZATION, listener: DiagnosticListenerLight): this + on (event: events.RESURRECT, listener: DiagnosticListenerLight): this + on (event: string, listener: DiagnosticListener): this { assertSupportedEvent(event) - super.on(event, callback) + super.on(event, listener) return this } - once (event: string, callback: DiagnosticListener): this { + once (event: events.REQUEST, listener: DiagnosticListenerFull): this + once (event: events.RESPONSE, listener: DiagnosticListenerFull): this + once (event: events.SERIALIZATION, listener: DiagnosticListenerFull): this + once (event: events.SNIFF, listener: DiagnosticListenerFull): this + once (event: events.DESERIALIZATION, listener: DiagnosticListenerLight): this + once (event: events.RESURRECT, listener: DiagnosticListenerLight): this + once (event: string, listener: DiagnosticListener): this { assertSupportedEvent(event) - super.once(event, callback) + super.once(event, listener) return this } - off (event: string, callback: DiagnosticListener): this { + off (event: string, listener: DiagnosticListener): this { assertSupportedEvent(event) - super.off(event, callback) + super.off(event, listener) return this } } @@ -59,4 +74,4 @@ function assertSupportedEvent (event: string): void { } // @ts-expect-error -const supportedEvents = Object.keys(Diagnostic.events).map(key => Diagnostic.events[key]) +const supportedEvents: string[] = Object.keys(events).map(key => events[key]) diff --git a/src/Transport.ts b/src/Transport.ts index c3079b4..6b7a960 100644 --- a/src/Transport.ts +++ b/src/Transport.ts @@ -31,7 +31,7 @@ import { ConfigurationError, TimeoutError } from './errors' -import { Connection, ConnectionRequestOptions } from './connection' +import { Connection, ConnectionRequestParams } from './connection' import Diagnostic from './Diagnostic' import Serializer from './Serializer' import AbortController from 'node-abort-controller' @@ -39,7 +39,8 @@ import { Readable as ReadableStream } from 'stream' import { ClusterConnectionPool, CloudConnectionPool, - WeightedConnectionPool + WeightedConnectionPool, + BaseConnectionPool } from './pool' import { nodeFilterFn, @@ -124,12 +125,14 @@ export interface TransportRequestOptions { } export interface GetConnectionOptions { - requestId: string + requestId: string | number + context: any } export interface SniffOptions { - requestId?: string + requestId?: string | number reason: string + context: any } export default class Transport { @@ -199,10 +202,18 @@ export default class Transport { this[kSniffEndpoint] = opts.sniffEndpoint ?? null if (opts.sniffOnStart === true) { - this.sniff({ reason: Transport.sniffReasons.SNIFF_ON_START }) + this.sniff({ + reason: Transport.sniffReasons.SNIFF_ON_START, + requestId: `sniff-on-start-${Math.random()}`, + context: this[kContext] + }) } } + get connectionPool (): BaseConnectionPool { + return this[kConnectionPool] + } + get sniffEnabled (): boolean { return this[kSniffEnabled] } @@ -266,7 +277,7 @@ export default class Transport { } } - const connectionParams: ConnectionRequestOptions = { + const connectionParams: ConnectionRequestParams = { method: params.method, path: params.path } @@ -369,7 +380,10 @@ export default class Transport { throw new RequestAbortedError('Request has been aborted by the user', result) } - meta.connection = this.getConnection({ requestId: meta.request.id }) + meta.connection = this.getConnection({ + requestId: meta.request.id, + context: meta.context + }) if (meta.connection === null) { throw new NoLivingConnectionsError('There are no living connections', result) } @@ -377,7 +391,11 @@ export default class Transport { this[kDiagnostic].emit('request', null, result) // perform the actual http request - let { statusCode, headers, body } = await meta.connection.request(connectionParams) + let { statusCode, headers, body } = await meta.connection.request(connectionParams, { + requestId: meta.request.id, + name: this[kName], + context: meta.context + }) result.statusCode = statusCode result.headers = headers @@ -466,7 +484,8 @@ export default class Transport { if (this[kSniffOnConnectionFault]) { this.sniff({ reason: Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT, - requestId: meta.request.id + requestId: meta.request.id, + context: meta.context }) } @@ -500,13 +519,18 @@ export default class Transport { const now = Date.now() if (this[kSniffEnabled] && now > this[kNextSniff]) { this[kNextSniff] = now + (this[kSniffInterval] as number) - this.sniff({ reason: Transport.sniffReasons.SNIFF_INTERVAL, requestId: opts.requestId }) + this.sniff({ + reason: Transport.sniffReasons.SNIFF_INTERVAL, + requestId: opts.requestId, + context: opts.context + }) } return this[kConnectionPool].getConnection({ filter: this[kNodeFilter], selector: this[kNodeSelector], requestId: opts.requestId, name: this[kName], + context: opts.context, now }) } diff --git a/src/connection/BaseConnection.ts b/src/connection/BaseConnection.ts index 2f1ed1e..9f82a5e 100644 --- a/src/connection/BaseConnection.ts +++ b/src/connection/BaseConnection.ts @@ -49,7 +49,7 @@ export interface ConnectionOptions { proxy?: string | URL } -export interface ConnectionRequestOptions { +export interface ConnectionRequestParams { method: string path: string headers?: http.IncomingHttpHeaders @@ -60,6 +60,12 @@ export interface ConnectionRequestOptions { timeout?: number } +export interface ConnectionRequestOptions { + requestId: string | number + name: string + context: any +} + export interface ConnectionRequestResponse { body: string | Buffer headers: http.IncomingHttpHeaders @@ -118,7 +124,7 @@ export default class BaseConnection { } /* istanbul ignore next */ - async request (params: ConnectionRequestOptions): Promise { + async request (params: ConnectionRequestParams, options: ConnectionRequestOptions): Promise { throw new ConfigurationError('The request method should be implemented by extended classes') } diff --git a/src/connection/HttpConnection.ts b/src/connection/HttpConnection.ts index c467e3f..2a1b484 100644 --- a/src/connection/HttpConnection.ts +++ b/src/connection/HttpConnection.ts @@ -25,6 +25,7 @@ import buffer from 'buffer' import { promisify } from 'util' import BaseConnection, { ConnectionOptions, + ConnectionRequestParams, ConnectionRequestOptions, ConnectionRequestResponse } from './BaseConnection' @@ -85,7 +86,7 @@ export default class HttpConnection extends BaseConnection { : https.request } - async request (params: ConnectionRequestOptions): Promise { + async request (params: ConnectionRequestParams, options: ConnectionRequestOptions): Promise { return await new Promise((resolve, reject) => { this._openRequests++ let cleanedListeners = false @@ -162,8 +163,7 @@ export default class HttpConnection extends BaseConnection { response.setEncoding('utf8') } - // TODO: fixme - // this.diagnostic.emit('deserialization', null, result) + this.diagnostic.emit('deserialization', null, options) response.on('data', onData) response.on('error', onEnd) response.on('end', onEnd) @@ -237,7 +237,7 @@ export default class HttpConnection extends BaseConnection { } } - buildRequestObject (params: ConnectionRequestOptions): http.ClientRequestArgs { + buildRequestObject (params: ConnectionRequestParams): http.ClientRequestArgs { const url = this.url const request = { protocol: url.protocol, diff --git a/src/connection/UndiciConnection.ts b/src/connection/UndiciConnection.ts index b8b2b2c..484bcab 100644 --- a/src/connection/UndiciConnection.ts +++ b/src/connection/UndiciConnection.ts @@ -22,6 +22,7 @@ import Debug from 'debug' import buffer from 'buffer' import BaseConnection, { ConnectionOptions, + ConnectionRequestParams, ConnectionRequestOptions, ConnectionRequestResponse } from './BaseConnection' @@ -76,7 +77,7 @@ export default class Connection extends BaseConnection { }) } - async request (params: ConnectionRequestOptions): Promise { + async request (params: ConnectionRequestParams, options: ConnectionRequestOptions): Promise { const requestParams = { method: params.method, path: params.path + (params.querystring == null || params.querystring === '' ? '' : `?${params.querystring}`), @@ -140,8 +141,7 @@ export default class Connection extends BaseConnection { } } - // TODO: fixme - // this.diagnostic.emit('deserialization', null, result) + this.diagnostic.emit('deserialization', null, options) try { if (isCompressed) { const payload: Buffer[] = [] diff --git a/src/connection/index.ts b/src/connection/index.ts index 9968026..104ff93 100644 --- a/src/connection/index.ts +++ b/src/connection/index.ts @@ -24,6 +24,7 @@ import UndiciConnection from './UndiciConnection' export type Connection = BaseConnection | HttpConnection | UndiciConnection export type { ConnectionOptions, + ConnectionRequestParams, ConnectionRequestOptions, ConnectionRequestResponse } from './BaseConnection' diff --git a/src/pool/BaseConnectionPool.ts b/src/pool/BaseConnectionPool.ts index 35546fe..f97a334 100644 --- a/src/pool/BaseConnectionPool.ts +++ b/src/pool/BaseConnectionPool.ts @@ -57,6 +57,7 @@ export interface GetConnectionOptions { now: number requestId: string | number name: string + context: any } export default class BaseConnectionPool { @@ -119,6 +120,8 @@ export default class BaseConnectionPool { if (opts.agent == null) opts.agent = this._agent /* istanbul ignore else */ if (opts.proxy == null) opts.proxy = this._proxy + /* istanbul ignore else */ + if (opts.diagnostic == null) opts.diagnostic = this.diagnostic const connection = new this.Connection(opts) diff --git a/src/pool/ClusterConnectionPool.ts b/src/pool/ClusterConnectionPool.ts index 5b4109d..9cceb64 100644 --- a/src/pool/ClusterConnectionPool.ts +++ b/src/pool/ClusterConnectionPool.ts @@ -31,6 +31,7 @@ export interface ResurrectOptions { now: number requestId: string | number name: string + context: any } export default class ClusterConnectionPool extends BaseConnectionPool { @@ -149,11 +150,10 @@ export default class ClusterConnectionPool extends BaseConnectionPool { // ping strategy if (this.resurrectStrategy === 1) { - connection.request({ - method: 'HEAD', - path: '/', - timeout: this.pingTimeout - }) + connection.request( + { method: 'HEAD', path: '/', timeout: this.pingTimeout }, + { requestId: opts.requestId, name: opts.name, context: opts.context } + ) .then(({ statusCode }) => { let isAlive = true if (statusCode === 502 || statusCode === 503 || statusCode === 504) { @@ -207,7 +207,8 @@ export default class ClusterConnectionPool extends BaseConnectionPool { this.resurrect({ now: opts.now, requestId: opts.requestId, - name: opts.name + name: opts.name, + context: opts.context }) const noAliveConnections = this.size === this.dead.length