Skip to content

Commit

Permalink
Improve diagnostics
Browse files Browse the repository at this point in the history
  • Loading branch information
delvedor committed Mar 3, 2021
1 parent ef4277e commit 0d46b8a
Show file tree
Hide file tree
Showing 10 changed files with 99 additions and 46 deletions.
6 changes: 4 additions & 2 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

import Diagnostic from './lib/Diagnostic'
import Diagnostic, { events } from './lib/Diagnostic'
import Transport from './lib/Transport'
import {
BaseConnection,
Expand All @@ -36,6 +36,7 @@ import * as errors from './lib/errors'
export type {
Connection,
ConnectionOptions,
ConnectionRequestParams,
ConnectionRequestOptions,
ConnectionRequestResponse
} from './lib/connection'
Expand Down Expand Up @@ -64,5 +65,6 @@ export {
HttpConnection,
UndiciConnection,
Serializer,
errors
errors,
events
}
5 changes: 3 additions & 2 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

'use strict'

const Diagnostic = require('./lib/Diagnostic').default
const { default: Diagnostic, events } = require('./lib/Diagnostic')
const Transport = require('./lib/Transport').default
const {
BaseConnection,
Expand All @@ -46,5 +46,6 @@ module.exports = {
CloudConnectionPool,
WeightedConnectionPool,
Serializer,
errors
errors,
events
}
49 changes: 32 additions & 17 deletions src/Diagnostic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,35 +19,50 @@

import { EventEmitter } from 'events'
import { ElasticsearchClientError, ConfigurationError } from './errors'
import { ConnectionRequestOptions } from './connection'
import { Result } from './types'

export type DiagnosticListener = (err: ElasticsearchClientError | null, meta: Result | null) => void
export type DiagnosticListener = (err: ElasticsearchClientError | null, meta: any | null) => void
export type DiagnosticListenerFull = (err: ElasticsearchClientError | null, meta: Result | null) => void
export type DiagnosticListenerLight = (err: ElasticsearchClientError | null, meta: ConnectionRequestOptions | null) => void

export default class Diagnostic extends EventEmitter {
static events = {
RESPONSE: 'response',
REQUEST: 'request',
SNIFF: 'sniff',
RESURRECT: 'resurrect',
SERIALIZATION: 'serialization',
DESERIALIZATION: 'deserialization'
}
export enum events {
RESPONSE = 'response',
REQUEST = 'request',
SNIFF = 'sniff',
RESURRECT = 'resurrect',
SERIALIZATION = 'serialization',
DESERIALIZATION = 'deserialization'
}

on (event: string, callback: DiagnosticListener): this {
export default class Diagnostic extends EventEmitter {
on (event: events.REQUEST, listener: DiagnosticListenerFull): this
on (event: events.RESPONSE, listener: DiagnosticListenerFull): this
on (event: events.SERIALIZATION, listener: DiagnosticListenerFull): this
on (event: events.SNIFF, listener: DiagnosticListenerFull): this
on (event: events.DESERIALIZATION, listener: DiagnosticListenerLight): this
on (event: events.RESURRECT, listener: DiagnosticListenerLight): this
on (event: string, listener: DiagnosticListener): this {
assertSupportedEvent(event)
super.on(event, callback)
super.on(event, listener)
return this
}

once (event: string, callback: DiagnosticListener): this {
once (event: events.REQUEST, listener: DiagnosticListenerFull): this
once (event: events.RESPONSE, listener: DiagnosticListenerFull): this
once (event: events.SERIALIZATION, listener: DiagnosticListenerFull): this
once (event: events.SNIFF, listener: DiagnosticListenerFull): this
once (event: events.DESERIALIZATION, listener: DiagnosticListenerLight): this
once (event: events.RESURRECT, listener: DiagnosticListenerLight): this
once (event: string, listener: DiagnosticListener): this {
assertSupportedEvent(event)
super.once(event, callback)
super.once(event, listener)
return this
}

off (event: string, callback: DiagnosticListener): this {
off (event: string, listener: DiagnosticListener): this {
assertSupportedEvent(event)
super.off(event, callback)
super.off(event, listener)
return this
}
}
Expand All @@ -59,4 +74,4 @@ function assertSupportedEvent (event: string): void {
}

// @ts-expect-error
const supportedEvents = Object.keys(Diagnostic.events).map(key => Diagnostic.events[key])
const supportedEvents: string[] = Object.keys(events).map(key => events[key])
44 changes: 34 additions & 10 deletions src/Transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,16 @@ import {
ConfigurationError,
TimeoutError
} from './errors'
import { Connection, ConnectionRequestOptions } from './connection'
import { Connection, ConnectionRequestParams } from './connection'
import Diagnostic from './Diagnostic'
import Serializer from './Serializer'
import AbortController from 'node-abort-controller'
import { Readable as ReadableStream } from 'stream'
import {
ClusterConnectionPool,
CloudConnectionPool,
WeightedConnectionPool
WeightedConnectionPool,
BaseConnectionPool
} from './pool'
import {
nodeFilterFn,
Expand Down Expand Up @@ -124,12 +125,14 @@ export interface TransportRequestOptions {
}

export interface GetConnectionOptions {
requestId: string
requestId: string | number
context: any
}

export interface SniffOptions {
requestId?: string
requestId?: string | number
reason: string
context: any
}

export default class Transport {
Expand Down Expand Up @@ -199,10 +202,18 @@ export default class Transport {
this[kSniffEndpoint] = opts.sniffEndpoint ?? null

if (opts.sniffOnStart === true) {
this.sniff({ reason: Transport.sniffReasons.SNIFF_ON_START })
this.sniff({
reason: Transport.sniffReasons.SNIFF_ON_START,
requestId: `sniff-on-start-${Math.random()}`,
context: this[kContext]
})
}
}

get connectionPool (): BaseConnectionPool {
return this[kConnectionPool]
}

get sniffEnabled (): boolean {
return this[kSniffEnabled]
}
Expand Down Expand Up @@ -266,7 +277,7 @@ export default class Transport {
}
}

const connectionParams: ConnectionRequestOptions = {
const connectionParams: ConnectionRequestParams = {
method: params.method,
path: params.path
}
Expand Down Expand Up @@ -369,15 +380,22 @@ export default class Transport {
throw new RequestAbortedError('Request has been aborted by the user', result)
}

meta.connection = this.getConnection({ requestId: meta.request.id })
meta.connection = this.getConnection({
requestId: meta.request.id,
context: meta.context
})
if (meta.connection === null) {
throw new NoLivingConnectionsError('There are no living connections', result)
}

this[kDiagnostic].emit('request', null, result)

// perform the actual http request
let { statusCode, headers, body } = await meta.connection.request(connectionParams)
let { statusCode, headers, body } = await meta.connection.request(connectionParams, {
requestId: meta.request.id,
name: this[kName],
context: meta.context
})
result.statusCode = statusCode
result.headers = headers

Expand Down Expand Up @@ -466,7 +484,8 @@ export default class Transport {
if (this[kSniffOnConnectionFault]) {
this.sniff({
reason: Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT,
requestId: meta.request.id
requestId: meta.request.id,
context: meta.context
})
}

Expand Down Expand Up @@ -500,13 +519,18 @@ export default class Transport {
const now = Date.now()
if (this[kSniffEnabled] && now > this[kNextSniff]) {
this[kNextSniff] = now + (this[kSniffInterval] as number)
this.sniff({ reason: Transport.sniffReasons.SNIFF_INTERVAL, requestId: opts.requestId })
this.sniff({
reason: Transport.sniffReasons.SNIFF_INTERVAL,
requestId: opts.requestId,
context: opts.context
})
}
return this[kConnectionPool].getConnection({
filter: this[kNodeFilter],
selector: this[kNodeSelector],
requestId: opts.requestId,
name: this[kName],
context: opts.context,
now
})
}
Expand Down
10 changes: 8 additions & 2 deletions src/connection/BaseConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export interface ConnectionOptions {
proxy?: string | URL
}

export interface ConnectionRequestOptions {
export interface ConnectionRequestParams {
method: string
path: string
headers?: http.IncomingHttpHeaders
Expand All @@ -60,6 +60,12 @@ export interface ConnectionRequestOptions {
timeout?: number
}

export interface ConnectionRequestOptions {
requestId: string | number
name: string
context: any
}

export interface ConnectionRequestResponse {
body: string | Buffer
headers: http.IncomingHttpHeaders
Expand Down Expand Up @@ -118,7 +124,7 @@ export default class BaseConnection {
}

/* istanbul ignore next */
async request (params: ConnectionRequestOptions): Promise<ConnectionRequestResponse> {
async request (params: ConnectionRequestParams, options: ConnectionRequestOptions): Promise<ConnectionRequestResponse> {
throw new ConfigurationError('The request method should be implemented by extended classes')
}

Expand Down
8 changes: 4 additions & 4 deletions src/connection/HttpConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import buffer from 'buffer'
import { promisify } from 'util'
import BaseConnection, {
ConnectionOptions,
ConnectionRequestParams,
ConnectionRequestOptions,
ConnectionRequestResponse
} from './BaseConnection'
Expand Down Expand Up @@ -85,7 +86,7 @@ export default class HttpConnection extends BaseConnection {
: https.request
}

async request (params: ConnectionRequestOptions): Promise<ConnectionRequestResponse> {
async request (params: ConnectionRequestParams, options: ConnectionRequestOptions): Promise<ConnectionRequestResponse> {
return await new Promise((resolve, reject) => {
this._openRequests++
let cleanedListeners = false
Expand Down Expand Up @@ -162,8 +163,7 @@ export default class HttpConnection extends BaseConnection {
response.setEncoding('utf8')
}

// TODO: fixme
// this.diagnostic.emit('deserialization', null, result)
this.diagnostic.emit('deserialization', null, options)
response.on('data', onData)
response.on('error', onEnd)
response.on('end', onEnd)
Expand Down Expand Up @@ -237,7 +237,7 @@ export default class HttpConnection extends BaseConnection {
}
}

buildRequestObject (params: ConnectionRequestOptions): http.ClientRequestArgs {
buildRequestObject (params: ConnectionRequestParams): http.ClientRequestArgs {
const url = this.url
const request = {
protocol: url.protocol,
Expand Down
6 changes: 3 additions & 3 deletions src/connection/UndiciConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import Debug from 'debug'
import buffer from 'buffer'
import BaseConnection, {
ConnectionOptions,
ConnectionRequestParams,
ConnectionRequestOptions,
ConnectionRequestResponse
} from './BaseConnection'
Expand Down Expand Up @@ -76,7 +77,7 @@ export default class Connection extends BaseConnection {
})
}

async request (params: ConnectionRequestOptions): Promise<ConnectionRequestResponse> {
async request (params: ConnectionRequestParams, options: ConnectionRequestOptions): Promise<ConnectionRequestResponse> {
const requestParams = {
method: params.method,
path: params.path + (params.querystring == null || params.querystring === '' ? '' : `?${params.querystring}`),
Expand Down Expand Up @@ -140,8 +141,7 @@ export default class Connection extends BaseConnection {
}
}

// TODO: fixme
// this.diagnostic.emit('deserialization', null, result)
this.diagnostic.emit('deserialization', null, options)
try {
if (isCompressed) {
const payload: Buffer[] = []
Expand Down
1 change: 1 addition & 0 deletions src/connection/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import UndiciConnection from './UndiciConnection'
export type Connection = BaseConnection | HttpConnection | UndiciConnection
export type {
ConnectionOptions,
ConnectionRequestParams,
ConnectionRequestOptions,
ConnectionRequestResponse
} from './BaseConnection'
Expand Down
3 changes: 3 additions & 0 deletions src/pool/BaseConnectionPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export interface GetConnectionOptions {
now: number
requestId: string | number
name: string
context: any
}

export default class BaseConnectionPool {
Expand Down Expand Up @@ -119,6 +120,8 @@ export default class BaseConnectionPool {
if (opts.agent == null) opts.agent = this._agent
/* istanbul ignore else */
if (opts.proxy == null) opts.proxy = this._proxy
/* istanbul ignore else */
if (opts.diagnostic == null) opts.diagnostic = this.diagnostic

const connection = new this.Connection(opts)

Expand Down
13 changes: 7 additions & 6 deletions src/pool/ClusterConnectionPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export interface ResurrectOptions {
now: number
requestId: string | number
name: string
context: any
}

export default class ClusterConnectionPool extends BaseConnectionPool {
Expand Down Expand Up @@ -149,11 +150,10 @@ export default class ClusterConnectionPool extends BaseConnectionPool {

// ping strategy
if (this.resurrectStrategy === 1) {
connection.request({
method: 'HEAD',
path: '/',
timeout: this.pingTimeout
})
connection.request(
{ method: 'HEAD', path: '/', timeout: this.pingTimeout },
{ requestId: opts.requestId, name: opts.name, context: opts.context }
)
.then(({ statusCode }) => {
let isAlive = true
if (statusCode === 502 || statusCode === 503 || statusCode === 504) {
Expand Down Expand Up @@ -207,7 +207,8 @@ export default class ClusterConnectionPool extends BaseConnectionPool {
this.resurrect({
now: opts.now,
requestId: opts.requestId,
name: opts.name
name: opts.name,
context: opts.context
})

const noAliveConnections = this.size === this.dead.length
Expand Down

0 comments on commit 0d46b8a

Please sign in to comment.