diff --git a/.npmignore b/.npmignore
new file mode 100644
index 0000000..becc004
--- /dev/null
+++ b/.npmignore
@@ -0,0 +1,72 @@
+# Logs
+logs
+*.log
+npm-debug.log*
+
+# Runtime data
+pids
+*.pid
+*.seed
+
+# Directory for instrumented libs generated by jscoverage/JSCover
+lib-cov
+
+# Coverage directory used by tools like istanbul
+coverage
+
+# coverage output
+coverage.lcov
+
+# nyc test coverage
+.nyc_output
+
+# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files)
+.grunt
+
+# node-waf configuration
+.lock-wscript
+
+# Compiled binary addons (http://nodejs.org/api/addons.html)
+build/Release
+
+# Dependency directories
+node_modules
+jspm_packages
+
+# Optional npm cache directory
+.npm
+
+# Optional REPL history
+.node_repl_history
+
+# mac files
+.DS_Store
+
+# vim swap files
+*.swp
+
+package-lock.json
+
+# elasticsearch repo or binary files
+elasticsearch*
+
+# Generated typings, we don't commit them
+# because they should be copied into the main .d.ts file
+api/generated.d.ts
+
+# Ignore doc folder
+docs
+
+# Ignore test folder
+test
+
+# Ignore scripts folder
+scripts
+
+# ci configuration
+.ci
+.travis.yml
+certs
+.github
+CODE_OF_CONDUCT.md
+CONTRIBUTING.md
diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md
new file mode 100644
index 0000000..c286a31
--- /dev/null
+++ b/CODE_OF_CONDUCT.md
@@ -0,0 +1,3 @@
+303 See Other
+
+Location: https://www.elastic.co/community/codeofconduct
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
new file mode 100644
index 0000000..13a6bb3
--- /dev/null
+++ b/CONTRIBUTING.md
@@ -0,0 +1,100 @@
+# Contributing to the Elasticsearch Node.js client
+
+The Elasticsearch Node.js client is open source and we love to receive contributions from our community — you!
+
+There are many ways to contribute,
+from writing tutorials or blog posts,
+improving the documentation,
+submitting bug reports and feature requests or writing code.
+
+## Repository structure
+The `master` branch is considered unstable, and it's compatible with Elasticsearch master. Unless you are patching an issue, new features should always be sent to the `master` branch; in the case of a bugfix, the target branch depends on which release lines the bug affects.
+There is a branch for every supported release line, such as `7.x` or `6.x`. We release bugfixes as soon as possible, while minor and major releases are published at the same time as the Elastic Stack.
+
+Usually for every release line there will be a *published* version and a *next* version. For example: the `7.x` branch contains the version published on npm and should receive bugfixes, while `7.2` *(assuming that 7.1.x is the released version)* contains the next version and should receive new features.
+
+## Code contributions
+
+If you have a bugfix or new feature that you would like to contribute,
+please find or open an issue about it first.
+Talk about what you would like to do.
+It may be that somebody is already working on it,
+or that there are particular issues that you should know about before implementing the change.
+
+Note that we strictly follow the [Elastic EOL schedule](https://www.elastic.co/support/eol).
+
+### Submitting your changes
+
+Generally, we require that you test any code you are adding or modifying.
+Once your changes are ready to submit for review:
+
+1. Test your changes
+
+ Run the test suite to make sure that nothing is broken.
+ Usually running `npm test` is enough; our CI will take care of running the integration tests. If you want to run the integration tests yourself, see the *Testing* section below.
+
+2. Submit a pull request
+
+ Push your local changes to your forked copy of the repository and [submit a pull request](https://help.github.com/articles/using-pull-requests).
+ In the pull request,
+ choose a title which sums up the changes that you have made,
+ and in the body provide more details about what your changes do.
+ Also mention the number of the issue where discussion has taken place,
+ eg "Closes #123".
+
+3. Sign the Contributor License Agreement
+
+ Please make sure you have signed our [Contributor License Agreement](https://www.elastic.co/contributor-agreement/).
+ We are not asking you to assign copyright to us,
+ but to give us the right to distribute your code without restriction.
+ We ask this of all contributors in order to assure our users of the origin and continuing existence of the code.
+ You only need to sign the CLA once.
+
+4. Be patient
+
+ We might not be able to review your code as fast as we would like to,
+ but we'll do our best to give it the attention it deserves.
+ Your effort is much appreciated!
+
+### Code generation
+
+The entire content of the API folder is generated, as is the `docs/reference.asciidoc` file.
+If you want to run the code generation you should run the following command:
+```sh
+node scripts/generate --tag
+# or
+node scripts/generate --branch
+```
+Then you should copy the content of `api/generated.d.ts` into the `index.d.ts` file *(automating this step would be a nice PR!)*.
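+
+For example, you can review the freshly generated typings before merging them by hand *(a sketch; adapt it to your editor workflow)*:
+```sh
+diff api/generated.d.ts index.d.ts | less
+```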
+
+### Testing
+There are different test scripts. Usually during development you only need to run `npm test`, but if you want you can run just a part of the suite. Below you will find all the testing scripts and what they do.
+
+| Script | Description |
+|---|---|
+| `npm run test:unit` | Runs the content of the `test/unit` folder. |
+| `npm run test:behavior` | Runs the content of the `test/behavior` folder. |
+| `npm run test:types` | Runs the content of the `test/types` folder. |
+| `npm run test:unit -- --cov --coverage-report=html` | Runs the content of the `test/unit` folder and calculates the code coverage. |
+| `npm run test:integration` | Runs the integration test runner. *Note: it requires a living instance of Elasticsearch.* |
+| `npm run lint` | Runs the [linter](https://standardjs.com/). |
+| `npm run lint:fix` | Fixes the lint errors. |
+| `npm test` | Runs the lint, unit, behavior, and types tests. |
+
+#### Integration test
+The integration tests are generated on the fly by the runner you will find inside `test/integration`. Once you execute it, it will clone the Elasticsearch repository and check out the correct version to grab the [OSS yaml files](https://github.com/elastic/elasticsearch/tree/master/rest-api-spec/src/main/resources/rest-api-spec/test) and the [Elastic licensed yaml files](https://github.com/elastic/elasticsearch/tree/master/x-pack/plugin/src/test/resources/rest-api-spec/test) that will be used for generating the tests.
+
+Usually this step is executed by CI since it takes some time, but you can easily run it yourself! Just follow these steps:
+1. Boot an Elasticsearch instance; you can do that by running `./scripts/es-docker.sh` or `./scripts/es-docker-platinum.sh`. The first one works only with the OSS APIs, while the second also works with the Elastic licensed APIs;
+1. If you are running the OSS tests, use `npm run test:integration`; otherwise use `TEST_ES_SERVER=https://elastic:changeme@localhost:9200 npm run test:integration`. You can also pass a `-b` parameter if you want the tests to bail out at the first failure: `npm run test:integration -- -b`;
+1. Grab a coffee, it will take some time ;)
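+
+Putting it all together, a typical OSS run might look like this *(a sketch; the exact invocation for your setup may differ)*:
+```sh
+./scripts/es-docker.sh &        # boot an OSS Elasticsearch instance
+npm run test:integration -- -b  # run the suite, bailing out at the first failure
+```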
+
+### Releasing
+
+If you have access to make releases, the process is as follows:
+
+1. Update the version in `package.json` according to the scale of the change (major, minor, or patch).
+1. Commit changes with message `Bumped vx.y.z` where `x.y.z` is the version in `package.json`
+1. Create a release via the GitHub UI.
+1. Wait for CI to finish running the tests.
+1. Publish to npm with `npm publish` *(see [publish](https://docs.npmjs.com/cli/publish) and [dist-tag](https://docs.npmjs.com/cli/dist-tag) docs)*
diff --git a/index.d.ts b/index.d.ts
new file mode 100644
index 0000000..790aabf
--- /dev/null
+++ b/index.d.ts
@@ -0,0 +1,71 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import Transport, {
+ ApiError,
+ ApiResponse,
+ RequestEvent,
+ TransportRequestParams,
+ TransportRequestOptions,
+ nodeFilterFn,
+ nodeSelectorFn,
+ generateRequestIdFn,
+ TransportRequestCallback,
+ TransportRequestPromise,
+ RequestBody,
+ RequestNDBody,
+ Context
+} from './lib/Transport';
+import Connection from './lib/Connection'
+import { ConnectionPool, CloudConnectionPool, BaseConnectionPool } from './lib/pool'
+import Serializer from './lib/Serializer'
+import * as errors from './lib/errors'
+
+declare const events: {
+ SERIALIZATION: string
+ REQUEST: string
+ DESERIALIZATION: string
+ RESPONSE: string
+ SNIFF: string
+ RESURRECT: string
+}
+
+export {
+ Transport,
+ ConnectionPool,
+ BaseConnectionPool,
+ CloudConnectionPool,
+ Connection,
+ Serializer,
+ events,
+ errors,
+ ApiError,
+ ApiResponse,
+ RequestEvent,
+ TransportRequestParams,
+ TransportRequestOptions,
+ nodeFilterFn,
+ nodeSelectorFn,
+ generateRequestIdFn,
+ TransportRequestCallback,
+ TransportRequestPromise,
+ RequestBody,
+ RequestNDBody,
+ Context
+}
diff --git a/index.js b/index.js
new file mode 100644
index 0000000..4902d53
--- /dev/null
+++ b/index.js
@@ -0,0 +1,45 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+'use strict'
+
+const Transport = require('./lib/Transport')
+const Connection = require('./lib/Connection')
+const { ConnectionPool, CloudConnectionPool } = require('./lib/pool')
+const Serializer = require('./lib/Serializer')
+const errors = require('./lib/errors')
+
+const events = {
+ RESPONSE: 'response',
+ REQUEST: 'request',
+ SNIFF: 'sniff',
+ RESURRECT: 'resurrect',
+ SERIALIZATION: 'serialization',
+ DESERIALIZATION: 'deserialization'
+}
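+
+// These names can be used with the client's event emitter, for example:
+// client.on(events.RESPONSE, (err, result) => { /* observability hook */ })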
+
+module.exports = {
+ Transport,
+ Connection,
+ ConnectionPool,
+ CloudConnectionPool,
+ Serializer,
+ errors,
+ events
+}
diff --git a/lib/Connection.d.ts b/lib/Connection.d.ts
new file mode 100644
index 0000000..933a6a8
--- /dev/null
+++ b/lib/Connection.d.ts
@@ -0,0 +1,98 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/// <reference types="node" />
+
+import { URL } from 'url';
+import { inspect, InspectOptions } from 'util'
+import { Readable as ReadableStream } from 'stream';
+import { ApiKeyAuth, BasicAuth } from './pool'
+import * as http from 'http'
+import * as https from 'https'
+import * as hpagent from 'hpagent'
+import { ConnectionOptions as TlsConnectionOptions } from 'tls'
+
+export declare type agentFn = (opts: ConnectionOptions) => any;
+
+export interface ConnectionOptions {
+ url: URL;
+ ssl?: TlsConnectionOptions;
+ id?: string;
+ headers?: Record<string, any>;
+ agent?: AgentOptions | agentFn;
+ status?: string;
+ roles?: ConnectionRoles;
+ auth?: BasicAuth | ApiKeyAuth;
+ proxy?: string | URL;
+}
+
+interface ConnectionRoles {
+ master?: boolean
+ data?: boolean
+ ingest?: boolean
+ ml?: boolean
+}
+
+interface RequestOptions extends http.ClientRequestArgs {
+ asStream?: boolean;
+ body?: string | Buffer | ReadableStream | null;
+ querystring?: string;
+}
+
+export interface AgentOptions {
+ keepAlive?: boolean;
+ keepAliveMsecs?: number;
+ maxSockets?: number;
+ maxFreeSockets?: number;
+}
+
+export default class Connection {
+ static statuses: {
+ ALIVE: string;
+ DEAD: string;
+ };
+ static roles: {
+ MASTER: string;
+ DATA: string;
+ INGEST: string;
+ ML: string;
+ };
+ url: URL
+ ssl: TlsConnectionOptions | null
+ id: string
+ headers: Record<string, any>
+ status: string
+ roles: ConnectionRoles
+ deadCount: number
+ resurrectTimeout: number
+ makeRequest: any
+ _openRequests: number
+ _status: string
+ _agent: http.Agent | https.Agent | hpagent.HttpProxyAgent | hpagent.HttpsProxyAgent
+ constructor(opts?: ConnectionOptions)
+ request(params: RequestOptions, callback: (err: Error | null, response: http.IncomingMessage | null) => void): http.ClientRequest
+ close(): Connection
+ setRole(role: string, enabled: boolean): Connection
+ buildRequestObject(params: any): http.ClientRequestArgs
+ // @ts-ignore
+ [inspect.custom](object: any, options: InspectOptions): string
+ toJSON(): any
+}
+
+export {};
diff --git a/lib/Connection.js b/lib/Connection.js
new file mode 100644
index 0000000..0116443
--- /dev/null
+++ b/lib/Connection.js
@@ -0,0 +1,342 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+'use strict'
+
+const assert = require('assert')
+const { inspect } = require('util')
+const hpagent = require('hpagent')
+const http = require('http')
+const https = require('https')
+const debug = require('debug')('elasticsearch')
+const pump = require('pump')
+const INVALID_PATH_REGEX = /[^\u0021-\u00ff]/
+const {
+ ConnectionError,
+ RequestAbortedError,
+ TimeoutError,
+ ConfigurationError
+} = require('./errors')
+
+class Connection {
+ constructor (opts) {
+ this.url = opts.url
+ this.ssl = opts.ssl || null
+ this.id = opts.id || stripAuth(opts.url.href)
+ this.headers = prepareHeaders(opts.headers, opts.auth)
+ this.deadCount = 0
+ this.resurrectTimeout = 0
+
+ this._openRequests = 0
+ this._status = opts.status || Connection.statuses.ALIVE
+ this.roles = Object.assign({}, defaultRoles, opts.roles)
+
+ if (!['http:', 'https:'].includes(this.url.protocol)) {
+ throw new ConfigurationError(`Invalid protocol: '${this.url.protocol}'`)
+ }
+
+ if (typeof opts.agent === 'function') {
+ this.agent = opts.agent(opts)
+ } else if (opts.agent === false) {
+ this.agent = undefined
+ } else {
+ const agentOptions = Object.assign({}, {
+ keepAlive: true,
+ keepAliveMsecs: 1000,
+ maxSockets: 256,
+ maxFreeSockets: 256,
+ scheduling: 'lifo'
+ }, opts.agent)
+ if (opts.proxy) {
+ agentOptions.proxy = opts.proxy
+ this.agent = this.url.protocol === 'http:'
+ ? new hpagent.HttpProxyAgent(agentOptions)
+ : new hpagent.HttpsProxyAgent(Object.assign({}, agentOptions, this.ssl))
+ } else {
+ this.agent = this.url.protocol === 'http:'
+ ? new http.Agent(agentOptions)
+ : new https.Agent(Object.assign({}, agentOptions, this.ssl))
+ }
+ }
+
+ this.makeRequest = this.url.protocol === 'http:'
+ ? http.request
+ : https.request
+ }
+
+ request (params, callback) {
+ this._openRequests++
+ let cleanedListeners = false
+
+ const requestParams = this.buildRequestObject(params)
+ // https://github.com/nodejs/node/commit/b961d9fd83
+ if (INVALID_PATH_REGEX.test(requestParams.path) === true) {
+ callback(new TypeError(`ERR_UNESCAPED_CHARACTERS: ${requestParams.path}`), null)
+ /* istanbul ignore next */
+ return { abort: () => {} }
+ }
+
+ debug('Starting a new request', params)
+ const request = this.makeRequest(requestParams)
+
+ const onResponse = response => {
+ cleanListeners()
+ this._openRequests--
+ callback(null, response)
+ }
+
+ const onTimeout = () => {
+ cleanListeners()
+ this._openRequests--
+ request.once('error', () => {}) // we need to catch the request aborted error
+ request.abort()
+ callback(new TimeoutError('Request timed out', params), null)
+ }
+
+ const onError = err => {
+ cleanListeners()
+ this._openRequests--
+ callback(new ConnectionError(err.message), null)
+ }
+
+ const onAbort = () => {
+ cleanListeners()
+ request.once('error', () => {}) // we need to catch the request aborted error
+ debug('Request aborted', params)
+ this._openRequests--
+ callback(new RequestAbortedError(), null)
+ }
+
+ request.on('response', onResponse)
+ request.on('timeout', onTimeout)
+ request.on('error', onError)
+ request.on('abort', onAbort)
+
+ // Disables the Nagle algorithm
+ request.setNoDelay(true)
+
+ // starts the request
+ if (isStream(params.body) === true) {
+ pump(params.body, request, err => {
+ /* istanbul ignore if */
+ if (err != null && cleanedListeners === false) {
+ cleanListeners()
+ this._openRequests--
+ callback(err, null)
+ }
+ })
+ } else {
+ request.end(params.body)
+ }
+
+ return request
+
+ function cleanListeners () {
+ request.removeListener('response', onResponse)
+ request.removeListener('timeout', onTimeout)
+ request.removeListener('error', onError)
+ request.removeListener('abort', onAbort)
+ cleanedListeners = true
+ }
+ }
+
+ // TODO: write a better closing logic
+ close (callback = () => {}) {
+ debug('Closing connection', this.id)
+ if (this._openRequests > 0) {
+ setTimeout(() => this.close(callback), 1000)
+ } else {
+ if (this.agent !== undefined) {
+ this.agent.destroy()
+ }
+ callback()
+ }
+ }
+
+ setRole (role, enabled) {
+ if (validRoles.indexOf(role) === -1) {
+ throw new ConfigurationError(`Unsupported role: '${role}'`)
+ }
+ if (typeof enabled !== 'boolean') {
+ throw new ConfigurationError('enabled should be a boolean')
+ }
+
+ this.roles[role] = enabled
+ return this
+ }
+
+ get status () {
+ return this._status
+ }
+
+ set status (status) {
+ assert(
+ ~validStatuses.indexOf(status),
+ `Unsupported status: '${status}'`
+ )
+ this._status = status
+ }
+
+ buildRequestObject (params) {
+ const url = this.url
+ const request = {
+ protocol: url.protocol,
+ hostname: url.hostname[0] === '['
+ ? url.hostname.slice(1, -1)
+ : url.hostname,
+ hash: url.hash,
+ search: url.search,
+ pathname: url.pathname,
+ path: '',
+ href: url.href,
+ origin: url.origin,
+ // https://github.com/elastic/elasticsearch-js/issues/843
+ port: url.port !== '' ? url.port : undefined,
+ headers: this.headers,
+ agent: this.agent
+ }
+
+ const paramsKeys = Object.keys(params)
+ for (var i = 0, len = paramsKeys.length; i < len; i++) {
+ var key = paramsKeys[i]
+ if (key === 'path') {
+ request.pathname = resolve(request.pathname, params[key])
+ } else if (key === 'querystring' && !!params[key] === true) {
+ if (request.search === '') {
+ request.search = '?' + params[key]
+ } else {
+ request.search += '&' + params[key]
+ }
+ } else if (key === 'headers') {
+ request.headers = Object.assign({}, request.headers, params.headers)
+ } else {
+ request[key] = params[key]
+ }
+ }
+
+ request.path = request.pathname + request.search
+
+ return request
+ }
+
+ // Handles console.log and utils.inspect invocations.
+ // We want to hide `auth`, `agent` and `ssl` since they make
+ // the logs very hard to read. The user can still
+ // access them with `instance.agent` and `instance.ssl`.
+ [inspect.custom] (depth, options) {
+ const {
+ authorization,
+ ...headers
+ } = this.headers
+
+ return {
+ url: stripAuth(this.url.toString()),
+ id: this.id,
+ headers,
+ deadCount: this.deadCount,
+ resurrectTimeout: this.resurrectTimeout,
+ _openRequests: this._openRequests,
+ status: this.status,
+ roles: this.roles
+ }
+ }
+
+ toJSON () {
+ const {
+ authorization,
+ ...headers
+ } = this.headers
+
+ return {
+ url: stripAuth(this.url.toString()),
+ id: this.id,
+ headers,
+ deadCount: this.deadCount,
+ resurrectTimeout: this.resurrectTimeout,
+ _openRequests: this._openRequests,
+ status: this.status,
+ roles: this.roles
+ }
+ }
+}
+
+Connection.statuses = {
+ ALIVE: 'alive',
+ DEAD: 'dead'
+}
+
+Connection.roles = {
+ MASTER: 'master',
+ DATA: 'data',
+ INGEST: 'ingest',
+ ML: 'ml'
+}
+
+const defaultRoles = {
+ [Connection.roles.MASTER]: true,
+ [Connection.roles.DATA]: true,
+ [Connection.roles.INGEST]: true,
+ [Connection.roles.ML]: false
+}
+
+const validStatuses = Object.keys(Connection.statuses)
+ .map(k => Connection.statuses[k])
+const validRoles = Object.keys(Connection.roles)
+ .map(k => Connection.roles[k])
+
+function stripAuth (url) {
+ if (url.indexOf('@') === -1) return url
+ return url.slice(0, url.indexOf('//') + 2) + url.slice(url.indexOf('@') + 1)
+}
+
+function isStream (obj) {
+ return obj != null && typeof obj.pipe === 'function'
+}
+
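+// Joins a host and a path avoiding double or missing slashes, e.g.:
+// resolve('http://localhost:9200/', '/_search') === 'http://localhost:9200/_search'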
+function resolve (host, path) {
+ const hostEndWithSlash = host[host.length - 1] === '/'
+ const pathStartsWithSlash = path[0] === '/'
+
+ if (hostEndWithSlash === true && pathStartsWithSlash === true) {
+ return host + path.slice(1)
+ } else if (hostEndWithSlash !== pathStartsWithSlash) {
+ return host + path
+ } else {
+ return host + '/' + path
+ }
+}
+
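+// Builds the default headers from the auth option, for example (a sketch):
+// prepareHeaders({}, { username: 'elastic', password: 'changeme' })
+// // => { authorization: 'Basic ZWxhc3RpYzpjaGFuZ2VtZQ==' }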
+function prepareHeaders (headers = {}, auth) {
+ if (auth != null && headers.authorization == null) {
+ /* istanbul ignore else */
+ if (auth.apiKey) {
+ if (typeof auth.apiKey === 'object') {
+ headers.authorization = 'ApiKey ' + Buffer.from(`${auth.apiKey.id}:${auth.apiKey.api_key}`).toString('base64')
+ } else {
+ headers.authorization = `ApiKey ${auth.apiKey}`
+ }
+ } else if (auth.username && auth.password) {
+ headers.authorization = 'Basic ' + Buffer.from(`${auth.username}:${auth.password}`).toString('base64')
+ }
+ }
+ return headers
+}
+
+module.exports = Connection
+module.exports.internals = { prepareHeaders }
diff --git a/lib/Helpers.d.ts b/lib/Helpers.d.ts
new file mode 100644
index 0000000..234b481
--- /dev/null
+++ b/lib/Helpers.d.ts
@@ -0,0 +1,122 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { Readable as ReadableStream } from 'stream'
+import { TransportRequestOptions, ApiError, ApiResponse, RequestBody, Context } from './Transport'
+import { Search, Msearch, Bulk } from '../api/requestParams'
+
+export default class Helpers {
+ search<TDocument = unknown, TRequestBody extends RequestBody = Record<string, any>>(params: Search<TRequestBody>, options?: TransportRequestOptions): Promise<TDocument[]>
+ scrollSearch<TDocument = unknown, TResponse = Record<string, any>, TRequestBody extends RequestBody = Record<string, any>, TContext = Context>(params: Search<TRequestBody>, options?: TransportRequestOptions): AsyncIterable<ScrollSearchResponse<TDocument, TResponse, TContext>>
+ scrollDocuments<TDocument = unknown, TRequestBody extends RequestBody = Record<string, any>>(params: Search<TRequestBody>, options?: TransportRequestOptions): AsyncIterable<TDocument>
+ msearch(options?: MsearchHelperOptions): MsearchHelper
+ bulk<TDocument = unknown>(options: BulkHelperOptions<TDocument>): BulkHelper<BulkStats>
+}
+
+export interface ScrollSearchResponse<TDocument = unknown, TResponse = Record<string, any>, TContext = Context> extends ApiResponse<TResponse, TContext> {
+ clear: () => Promise
+ documents: TDocument[]
+}
+
+export interface BulkHelper<T> extends Promise<T> {
+ abort: () => BulkHelper<T>
+}
+
+export interface BulkStats {
+ total: number
+ failed: number
+ retry: number
+ successful: number
+ time: number
+ bytes: number
+ aborted: boolean
+}
+
+interface IndexAction {
+ index: {
+ _index: string
+ [key: string]: any
+ }
+}
+
+interface CreateAction {
+ create: {
+ _index: string
+ [key: string]: any
+ }
+}
+
+interface UpdateActionOperation {
+ update: {
+ _index: string
+ [key: string]: any
+ }
+}
+
+interface DeleteAction {
+ delete: {
+ _index: string
+ [key: string]: any
+ }
+}
+
+type UpdateAction = [UpdateActionOperation, Record<string, any>]
+type Action = IndexAction | CreateAction | UpdateAction | DeleteAction
+type Omit<T, K extends keyof T> = Pick<T, Exclude<keyof T, K>>
+
+export interface BulkHelperOptions<TDocument = unknown> extends Omit<Bulk, 'body'> {
+ datasource: TDocument[] | Buffer | ReadableStream | AsyncIterator<TDocument>
+ onDocument: (doc: TDocument) => Action
+ flushBytes?: number
+ flushInterval?: number
+ concurrency?: number
+ retries?: number
+ wait?: number
+ onDrop?: (doc: OnDropDocument<TDocument>) => void
+ refreshOnCompletion?: boolean | string
+}
+
+export interface OnDropDocument<TDocument = unknown> {
+ status: number
+ error: {
+ type: string,
+ reason: string,
+ caused_by: {
+ type: string,
+ reason: string
+ }
+ }
+ document: TDocument
+ retried: boolean
+}
+
+export interface MsearchHelperOptions extends Omit<Msearch, 'body'> {
+ operations?: number
+ flushInterval?: number
+ concurrency?: number
+ retries?: number
+ wait?: number
+}
+
+declare type callbackFn<Response, Context> = (err: ApiError, result: ApiResponse<Response, Context>) => void;
+export interface MsearchHelper extends Promise<void> {
+ stop(error?: Error): void
+ search<TResponse = Record<string, any>, TRequestBody extends RequestBody = Record<string, any>, TContext = Context>(header: Omit<Search, 'body'>, body: TRequestBody): Promise<ApiResponse<TResponse, TContext>>
+ search<TResponse = Record<string, any>, TRequestBody extends RequestBody = Record<string, any>, TContext = Context>(header: Omit<Search, 'body'>, body: TRequestBody, callback: callbackFn<TResponse, TContext>): void
+}
diff --git a/lib/Helpers.js b/lib/Helpers.js
new file mode 100644
index 0000000..bd0eeea
--- /dev/null
+++ b/lib/Helpers.js
@@ -0,0 +1,756 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+'use strict'
+
+/* eslint camelcase: 0 */
+
+const { Readable } = require('stream')
+const { promisify } = require('util')
+const { ResponseError, ConfigurationError } = require('./errors')
+
+const pImmediate = promisify(setImmediate)
+const sleep = promisify(setTimeout)
+const kClient = Symbol('elasticsearch-client')
+const kMetaHeader = Symbol('meta header')
+/* istanbul ignore next */
+const noop = () => {}
+
+class Helpers {
+ constructor (opts) {
+ this[kClient] = opts.client
+ this[kMetaHeader] = opts.metaHeader
+ this.maxRetries = opts.maxRetries
+ }
+
+ /**
+ * Runs a search operation. The only difference between client.search and this utility
+ * is that here we only return the hits to the user and not the full ES response.
+ * This helper automatically adds `filter_path=hits.hits._source` to the querystring,
+ * as it only needs the documents' source.
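+ * A minimal usage sketch *(hypothetical index name)*:
+ * ```js
+ * const documents = await client.helpers.search({
+ *   index: 'my-index',
+ *   body: { query: { match_all: {} } }
+ * })
+ * ```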
+ * @param {object} params - The Elasticsearch search parameters.
+ * @param {object} options - The client optional configuration for this request.
+ * @return {array} The documents that matched the request.
+ */
+ async search (params, options) {
+ appendFilterPath('hits.hits._source', params, true)
+ const { body } = await this[kClient].search(params, options)
+ if (body.hits && body.hits.hits) {
+ return body.hits.hits.map(d => d._source)
+ }
+ return []
+ }
+
+ /**
+ * Runs a scroll search operation. This function returns an async iterator, allowing
+ * the user to use a for await loop to get all the results of a given search.
+ * ```js
+ * for await (const result of client.helpers.scrollSearch({ params })) {
+ * console.log(result)
+ * }
+ * ```
+ * Each result represents the entire body of a single scroll search request,
+ * if you just need to scroll the results, use scrollDocuments.
+ * This function automatically handles retries on a 429 status code.
+ * @param {object} params - The Elasticsearch search parameters.
+ * @param {object} options - The client optional configuration for this request.
+ * @return {iterator} the async iterator
+ */
+ async * scrollSearch (params, options = {}) {
+ if (this[kMetaHeader] !== null) {
+ options.headers = options.headers || {}
+ options.headers['x-elastic-client-meta'] = this[kMetaHeader] + ',h=s'
+ }
+ // TODO: study scroll search slices
+ const wait = options.wait || 5000
+ const maxRetries = options.maxRetries || this.maxRetries
+ if (Array.isArray(options.ignore)) {
+ options.ignore.push(429)
+ } else {
+ options.ignore = [429]
+ }
+ params.scroll = params.scroll || '1m'
+ appendFilterPath('_scroll_id', params, false)
+ const { method, body, index, ...querystring } = params
+
+ let response = null
+ for (let i = 0; i <= maxRetries; i++) {
+ response = await this[kClient].search(params, options)
+ if (response.statusCode !== 429) break
+ await sleep(wait)
+ }
+ if (response.statusCode === 429) {
+ throw new ResponseError(response)
+ }
+
+ let scroll_id = response.body._scroll_id
+ let stop = false
+ const clear = async () => {
+ stop = true
+ await this[kClient].clearScroll(
+ { body: { scroll_id } },
+ { ignore: [400], ...options }
+ )
+ }
+
+ while (response.body.hits && response.body.hits.hits.length > 0) {
+ // scroll id is always present in the response, but it might
+ // change over time based on the number of shards
+ scroll_id = response.body._scroll_id
+ response.clear = clear
+ addDocumentsGetter(response)
+
+ yield response
+
+ if (stop === true) {
+ break
+ }
+
+ for (let i = 0; i <= maxRetries; i++) {
+ response = await this[kClient].scroll({
+ scroll: querystring.scroll,
+ rest_total_hits_as_int: querystring.rest_total_hits_as_int || querystring.restTotalHitsAsInt,
+ body: { scroll_id }
+ }, options)
+ if (response.statusCode !== 429) break
+ await sleep(wait)
+ }
+ if (response.statusCode === 429) {
+ throw new ResponseError(response)
+ }
+ }
+
+ if (stop === false) {
+ await clear()
+ }
+ }
+
+ /**
+ * Runs a scroll search operation. This function returns an async iterator, allowing
+ * the user to use a for await loop to get all the documents of a given search.
+ * ```js
+ * for await (const document of client.helpers.scrollSearch({ params })) {
+ * console.log(document)
+ * }
+ * ```
+ * Each document is what you will find by running a scrollSearch and iterating on the hits array.
+ * This helper automatically adds `filter_path=hits.hits._source` to the querystring,
+ * as it will only need the documents source.
+ * @param {object} params - The Elasticsearch search parameters.
+ * @param {object} options - The client optional configuration for this request.
+ * @return {iterator} the async iterator
+ */
+ async * scrollDocuments (params, options) {
+ appendFilterPath('hits.hits._source', params, true)
+ for await (const { documents } of this.scrollSearch(params, options)) {
+ for (const document of documents) {
+ yield document
+ }
+ }
+ }
+
+ /**
+ * Creates a msearch helper instance. Once you configure it, you can use the provided
+ * `search` method to add new searches in the queue.
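+ * A minimal usage sketch *(hypothetical index and query)*:
+ * ```js
+ * const m = client.helpers.msearch()
+ * m.search({ index: 'my-index' }, { query: { match: { foo: 'bar' } } })
+ *   .then(result => console.log(result.documents))
+ * ```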
+ * @param {object} options - The configuration of the msearch operations.
+ * @return {object} The possible operations to run.
+ */
+ msearch (options = {}) {
+ const client = this[kClient]
+ const {
+ operations = 5,
+ concurrency = 5,
+ flushInterval = 500,
+ retries = this.maxRetries,
+ wait = 5000,
+ ...msearchOptions
+ } = options
+
+ let stopReading = false
+ let stopError = null
+ let timeoutRef = null
+ const operationsStream = new Readable({
+ objectMode: true,
+ read (size) {}
+ })
+
+ const p = iterate()
+ const helper = {
+ then (onFulfilled, onRejected) {
+ return p.then(onFulfilled, onRejected)
+ },
+ catch (onRejected) {
+ return p.catch(onRejected)
+ },
+ stop (error = null) {
+ if (stopReading === true) return
+ stopReading = true
+ stopError = error
+ operationsStream.push(null)
+ },
+ // TODO: support aborting a single search?
+ // NOTE: the validation checks are synchronous and the callback/promise will
+ // be resolved in the same tick. We might want to fix this in the future.
+ search (header, body, callback) {
+ if (stopReading === true) {
+ const error = stopError === null
+ ? new ConfigurationError('The msearch processor has been stopped')
+ : stopError
+ return callback ? callback(error, {}) : Promise.reject(error)
+ }
+
+ if (!(typeof header === 'object' && header !== null && !Array.isArray(header))) {
+ const error = new ConfigurationError('The header should be an object')
+ return callback ? callback(error, {}) : Promise.reject(error)
+ }
+
+ if (!(typeof body === 'object' && body !== null && !Array.isArray(body))) {
+ const error = new ConfigurationError('The body should be an object')
+ return callback ? callback(error, {}) : Promise.reject(error)
+ }
+
+ let promise = null
+ if (callback === undefined) {
+ let onFulfilled = null
+ let onRejected = null
+ promise = new Promise((resolve, reject) => {
+ onFulfilled = resolve
+ onRejected = reject
+ })
+ callback = function callback (err, result) {
+ err ? onRejected(err) : onFulfilled(result)
+ }
+ }
+
+ operationsStream.push([header, body, callback])
+
+ if (promise !== null) {
+ return promise
+ }
+ }
+ }
+
+ return helper
+
+ async function iterate () {
+ const { semaphore, finish } = buildSemaphore()
+ const msearchBody = []
+ const callbacks = []
+ let loadedOperations = 0
+ timeoutRef = setTimeout(onFlushTimeout, flushInterval)
+
+ for await (const operation of operationsStream) {
+ timeoutRef.refresh()
+ loadedOperations += 1
+ msearchBody.push(operation[0], operation[1])
+ callbacks.push(operation[2])
+ if (loadedOperations >= operations) {
+ const send = await semaphore()
+ send(msearchBody.slice(), callbacks.slice())
+ msearchBody.length = 0
+ callbacks.length = 0
+ loadedOperations = 0
+ }
+ }
+
+ clearTimeout(timeoutRef)
+ // In some cases the previous http call has not finished yet,
+ // or we didn't reach the flush threshold, so we force one last operation.
+ if (loadedOperations > 0) {
+ const send = await semaphore()
+ send(msearchBody, callbacks)
+ }
+
+ await finish()
+
+ if (stopError !== null) {
+ throw stopError
+ }
+
+ async function onFlushTimeout () {
+ if (loadedOperations === 0) return
+ const msearchBodyCopy = msearchBody.slice()
+ const callbacksCopy = callbacks.slice()
+ msearchBody.length = 0
+ callbacks.length = 0
+ loadedOperations = 0
+ try {
+ const send = await semaphore()
+ send(msearchBodyCopy, callbacksCopy)
+ } catch (err) {
+ /* istanbul ignore next */
+ helper.stop(err)
+ }
+ }
+ }
+
+ // This function builds a semaphore using the concurrency
+ // options of the msearch helper. It is used inside the iterator
+ // to guarantee that no more than the number of operations
+ // allowed to run at the same time are executed.
+ // It returns a semaphore function which resolves in the next tick
+ // if we didn't reach the maximum concurrency yet, otherwise it returns
+ // a promise that resolves as soon as one of the running requests has finished.
+ // The semaphore function resolves a send function, which will be used
+ // to send the actual msearch request.
+ // It also returns a finish function, which returns a promise that is resolved
+ // when there are no longer requests running.
+ function buildSemaphore () {
+ let resolveSemaphore = null
+ let resolveFinish = null
+ let running = 0
+
+ return { semaphore, finish }
+
+ function finish () {
+ return new Promise((resolve, reject) => {
+ if (running === 0) {
+ resolve()
+ } else {
+ resolveFinish = resolve
+ }
+ })
+ }
+
+ function semaphore () {
+ if (running < concurrency) {
+ running += 1
+ return pImmediate(send)
+ } else {
+ return new Promise((resolve, reject) => {
+ resolveSemaphore = resolve
+ })
+ }
+ }
+
+ function send (msearchBody, callbacks) {
+ /* istanbul ignore if */
+ if (running > concurrency) {
+ throw new Error('Max concurrency reached')
+ }
+ msearchOperation(msearchBody, callbacks, () => {
+ running -= 1
+ if (resolveSemaphore) {
+ running += 1
+ resolveSemaphore(send)
+ resolveSemaphore = null
+ } else if (resolveFinish && running === 0) {
+ resolveFinish()
+ }
+ })
+ }
+ }
+
+ function msearchOperation (msearchBody, callbacks, done) {
+ let retryCount = retries
+
+ // Instead of going full on async-await, which would make the code easier to read,
+ // we have decided to use callback style instead.
+ // This is because every time we use async/await, V8 will create multiple promises
+ // behind the scenes, making the code slightly slower.
+ tryMsearch(msearchBody, callbacks, retrySearch)
+ function retrySearch (msearchBody, callbacks) {
+ if (msearchBody.length > 0 && retryCount > 0) {
+ retryCount -= 1
+ setTimeout(tryMsearch, wait, msearchBody, callbacks, retrySearch)
+ return
+ }
+
+ done()
+ }
+
+ // This function never returns an error; if the msearch operation fails,
+ // the error is dispatched to all search executors.
+ function tryMsearch (msearchBody, callbacks, done) {
+ client.msearch(Object.assign({}, msearchOptions, { body: msearchBody }), (err, results) => {
+ const retryBody = []
+ const retryCallbacks = []
+ if (err) {
+ addDocumentsGetter(results)
+ for (const callback of callbacks) {
+ callback(err, results)
+ }
+ return done(retryBody, retryCallbacks)
+ }
+ const { responses } = results.body
+ for (let i = 0, len = responses.length; i < len; i++) {
+ const response = responses[i]
+ if (response.status === 429 && retryCount > 0) {
+ retryBody.push(msearchBody[i * 2])
+ retryBody.push(msearchBody[(i * 2) + 1])
+ retryCallbacks.push(callbacks[i])
+ continue
+ }
+ const result = { ...results, body: response }
+ addDocumentsGetter(result)
+ if (response.status >= 400) {
+ callbacks[i](new ResponseError(result), result)
+ } else {
+ callbacks[i](null, result)
+ }
+ }
+ done(retryBody, retryCallbacks)
+ })
+ }
+ }
+ }
+
+ /**
+ * Creates a bulk helper instance. Once you configure it, you can pick which operation
+ * to execute with the given dataset, index, create, update, and delete.
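+ * A minimal usage sketch *(hypothetical index and datasource)*:
+ * ```js
+ * const stats = await client.helpers.bulk({
+ *   datasource: [{ user: 'foo' }, { user: 'bar' }],
+ *   onDocument (doc) {
+ *     return { index: { _index: 'my-index' } }
+ *   }
+ * })
+ * ```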
+ * @param {object} options - The configuration of the bulk operation.
+ * @return {object} The possible operations to run with the datasource.
+ */
+ bulk (options) {
+ const client = this[kClient]
+ const { serialize, deserialize } = client.serializer
+ const reqOptions = this[kMetaHeader] !== null ? { headers: { 'x-elastic-client-meta': this[kMetaHeader] + ',h=bp' } } : {}
+ const {
+ datasource,
+ onDocument,
+ flushBytes = 5000000,
+ flushInterval = 30000,
+ concurrency = 5,
+ retries = this.maxRetries,
+ wait = 5000,
+ onDrop = noop,
+ refreshOnCompletion = false,
+ ...bulkOptions
+ } = options
+
+ if (datasource === undefined) {
+ return Promise.reject(new ConfigurationError('bulk helper: the datasource is required'))
+ }
+ if (!(Array.isArray(datasource) || Buffer.isBuffer(datasource) || typeof datasource.pipe === 'function' || datasource[Symbol.asyncIterator])) {
+ return Promise.reject(new ConfigurationError('bulk helper: the datasource must be an array or a buffer or a readable stream or an async generator'))
+ }
+ if (onDocument === undefined) {
+ return Promise.reject(new ConfigurationError('bulk helper: the onDocument callback is required'))
+ }
+
+ let shouldAbort = false
+ let timeoutRef = null
+ const stats = {
+ total: 0,
+ failed: 0,
+ retry: 0,
+ successful: 0,
+ time: 0,
+ bytes: 0,
+ aborted: false
+ }
+
+ const p = iterate()
+ const helper = {
+ then (onFulfilled, onRejected) {
+ return p.then(onFulfilled, onRejected)
+ },
+ catch (onRejected) {
+ return p.catch(onRejected)
+ },
+ abort () {
+ clearTimeout(timeoutRef)
+ shouldAbort = true
+ stats.aborted = true
+ return this
+ }
+ }
+
+ return helper
+
+ /**
+ * Function that iterates over the given datasource and starts a bulk operation as soon
+ * as it reaches the configured bulk size. It's designed to use the Node.js asynchronous
+ * model at its maximum capacity, as it will collect the next body to send while there is
+ * a running http call. In this way, the CPU time will be used carefully.
+ * The objects will be serialized right away, to approximate the byte length of the body.
+ * It creates an array of strings instead of an ndjson string because the bulkOperation
+ * will navigate the body to match failed operations with the original documents.
+ */
+ async function iterate () {
+ const { semaphore, finish } = buildSemaphore()
+ const startTime = Date.now()
+ const bulkBody = []
+ let actionBody = ''
+ let payloadBody = ''
+ let chunkBytes = 0
+ timeoutRef = setTimeout(onFlushTimeout, flushInterval)
+
+ for await (const chunk of datasource) {
+ if (shouldAbort === true) break
+ timeoutRef.refresh()
+ const action = onDocument(chunk)
+ const operation = Array.isArray(action)
+ ? Object.keys(action[0])[0]
+ : Object.keys(action)[0]
+ if (operation === 'index' || operation === 'create') {
+ actionBody = serialize(action)
+ payloadBody = typeof chunk === 'string' ? chunk : serialize(chunk)
+ chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody)
+ bulkBody.push(actionBody, payloadBody)
+ } else if (operation === 'update') {
+ actionBody = serialize(action[0])
+ payloadBody = typeof chunk === 'string'
+ ? `{"doc":${chunk}}`
+ : serialize({ doc: chunk, ...action[1] })
+ chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody)
+ bulkBody.push(actionBody, payloadBody)
+ } else if (operation === 'delete') {
+ actionBody = serialize(action)
+ chunkBytes += Buffer.byteLength(actionBody)
+ bulkBody.push(actionBody)
+ } else {
+ clearTimeout(timeoutRef)
+ throw new ConfigurationError(`Bulk helper invalid action: '${operation}'`)
+ }
+
+ if (chunkBytes >= flushBytes) {
+ stats.bytes += chunkBytes
+ const send = await semaphore()
+ send(bulkBody.slice())
+ bulkBody.length = 0
+ chunkBytes = 0
+ }
+ }
+
+ clearTimeout(timeoutRef)
+ // In some cases the previous http call has not finished yet,
+ // or we didn't reach the flush bytes threshold, so we force one last operation.
+ if (shouldAbort === false && chunkBytes > 0) {
+ const send = await semaphore()
+ stats.bytes += chunkBytes
+ send(bulkBody)
+ }
+
+ await finish()
+
+ if (refreshOnCompletion) {
+ await client.indices.refresh({
+ index: typeof refreshOnCompletion === 'string'
+ ? refreshOnCompletion
+ : '_all'
+ })
+ }
+
+ stats.time = Date.now() - startTime
+ stats.total = stats.successful + stats.failed
+
+ return stats
+
+ async function onFlushTimeout () {
+ if (chunkBytes === 0) return
+ stats.bytes += chunkBytes
+ const bulkBodyCopy = bulkBody.slice()
+ bulkBody.length = 0
+ chunkBytes = 0
+ try {
+ const send = await semaphore()
+ send(bulkBodyCopy)
+ } catch (err) {
+ /* istanbul ignore next */
+ helper.abort()
+ }
+ }
+ }
+
+ // This function builds a semaphore using the concurrency
+ // options of the bulk helper. It is used inside the iterator
+ // to guarantee that no more than the number of operations
+ // allowed to run at the same time are executed.
+ // It returns a semaphore function which resolves in the next tick
+ // if we didn't reach the maximum concurrency yet, otherwise it returns
+ // a promise that resolves as soon as one of the running requests has finished.
+ // The semaphore function resolves a send function, which will be used
+ // to send the actual bulk request.
+ // It also returns a finish function, which returns a promise that is resolved
+ // when there are no longer requests running. It rejects with an error if one
+ // of the requests has failed for some reason.
+ function buildSemaphore () {
+ let resolveSemaphore = null
+ let resolveFinish = null
+ let rejectFinish = null
+ let error = null
+ let running = 0
+
+ return { semaphore, finish }
+
+ function finish () {
+ return new Promise((resolve, reject) => {
+ if (running === 0) {
+ if (error) {
+ reject(error)
+ } else {
+ resolve()
+ }
+ } else {
+ resolveFinish = resolve
+ rejectFinish = reject
+ }
+ })
+ }
+
+ function semaphore () {
+ if (running < concurrency) {
+ running += 1
+ return pImmediate(send)
+ } else {
+ return new Promise((resolve, reject) => {
+ resolveSemaphore = resolve
+ })
+ }
+ }
+
+ function send (bulkBody) {
+ /* istanbul ignore if */
+ if (running > concurrency) {
+ throw new Error('Max concurrency reached')
+ }
+ bulkOperation(bulkBody, err => {
+ running -= 1
+ if (err) {
+ shouldAbort = true
+ error = err
+ }
+ if (resolveSemaphore) {
+ running += 1
+ resolveSemaphore(send)
+ resolveSemaphore = null
+ } else if (resolveFinish && running === 0) {
+ if (error) {
+ rejectFinish(error)
+ } else {
+ resolveFinish()
+ }
+ }
+ })
+ }
+ }
+
+ function bulkOperation (bulkBody, callback) {
+ let retryCount = retries
+ let isRetrying = false
+
+ // Instead of going full on async-await, which would make the code easier to read,
+ // we have decided to use callback style instead.
+ // This is because every time we use async/await, V8 will create multiple promises
+ // behind the scenes, making the code slightly slower.
+ tryBulk(bulkBody, retryDocuments)
+ function retryDocuments (err, bulkBody) {
+ if (err) return callback(err)
+ if (shouldAbort === true) return callback()
+
+ if (bulkBody.length > 0) {
+ if (retryCount > 0) {
+ isRetrying = true
+ retryCount -= 1
+ stats.retry += bulkBody.length
+ setTimeout(tryBulk, wait, bulkBody, retryDocuments)
+ return
+ }
+ for (let i = 0, len = bulkBody.length; i < len; i = i + 2) {
+ const operation = Object.keys(deserialize(bulkBody[i]))[0]
+ onDrop({
+ status: 429,
+ error: null,
+ operation: deserialize(bulkBody[i]),
+ document: operation !== 'delete'
+ ? deserialize(bulkBody[i + 1])
+ /* istanbul ignore next */
+ : null,
+ retried: isRetrying
+ })
+ stats.failed += 1
+ }
+ }
+ callback()
+ }
+
+ function tryBulk (bulkBody, callback) {
+ if (shouldAbort === true) return callback(null, [])
+ client.bulk(Object.assign({}, bulkOptions, { body: bulkBody }), reqOptions, (err, { body }) => {
+ if (err) return callback(err, null)
+ if (body.errors === false) {
+ stats.successful += body.items.length
+ return callback(null, [])
+ }
+ const retry = []
+ const { items } = body
+ for (let i = 0, len = items.length; i < len; i++) {
+ const action = items[i]
+ const operation = Object.keys(action)[0]
+ const { status } = action[operation]
+ const indexSlice = operation !== 'delete' ? i * 2 : i
+
+ if (status >= 400) {
+ // 429 is the only status code where we might want to retry
+ // a document, because it was not an error in the document itself,
+ // but the ES node was handling too many operations.
+ if (status === 429) {
+ retry.push(bulkBody[indexSlice])
+ /* istanbul ignore next */
+ if (operation !== 'delete') {
+ retry.push(bulkBody[indexSlice + 1])
+ }
+ } else {
+ onDrop({
+ status: status,
+ error: action[operation].error,
+ operation: deserialize(bulkBody[indexSlice]),
+ document: operation !== 'delete'
+ ? deserialize(bulkBody[indexSlice + 1])
+ : null,
+ retried: isRetrying
+ })
+ stats.failed += 1
+ }
+ } else {
+ stats.successful += 1
+ }
+ }
+ callback(null, retry)
+ })
+ }
+ }
+ }
+}
+
+// Using a getter will improve the overall performance of the code,
+// as we will read the documents only if needed.
+function addDocumentsGetter (result) {
+ Object.defineProperty(result, 'documents', {
+ get () {
+ if (this.body.hits && this.body.hits.hits) {
+ return this.body.hits.hits.map(d => d._source)
+ }
+ return []
+ }
+ })
+}
+
+function appendFilterPath (filter, params, force) {
+ if (params.filter_path !== undefined) {
+ params.filter_path += ',' + filter
+ } else if (params.filterPath !== undefined) {
+ params.filterPath += ',' + filter
+ } else if (force === true) {
+ params.filter_path = filter
+ }
+}
+
+module.exports = Helpers
diff --git a/lib/Serializer.d.ts b/lib/Serializer.d.ts
new file mode 100644
index 0000000..bf8453a
--- /dev/null
+++ b/lib/Serializer.d.ts
@@ -0,0 +1,25 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+export default class Serializer {
+ serialize(object: any): string;
+ deserialize(json: string): any;
+ ndserialize(array: any[]): string;
+ qserialize(object: any): string;
+}
diff --git a/lib/Serializer.js b/lib/Serializer.js
new file mode 100644
index 0000000..c2cd41b
--- /dev/null
+++ b/lib/Serializer.js
@@ -0,0 +1,83 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+'use strict'
+
+const { stringify } = require('querystring')
+const debug = require('debug')('elasticsearch')
+const sjson = require('secure-json-parse')
+const { SerializationError, DeserializationError } = require('./errors')
+
+class Serializer {
+ serialize (object) {
+ debug('Serializing', object)
+ try {
+ var json = JSON.stringify(object)
+ } catch (err) {
+ throw new SerializationError(err.message, object)
+ }
+ return json
+ }
+
+ deserialize (json) {
+ debug('Deserializing', json)
+ try {
+ var object = sjson.parse(json)
+ } catch (err) {
+ throw new DeserializationError(err.message, json)
+ }
+ return object
+ }
+
+ ndserialize (array) {
+ debug('ndserialize', array)
+ if (Array.isArray(array) === false) {
+ throw new SerializationError('The argument provided is not an array')
+ }
+ var ndjson = ''
+ for (var i = 0, len = array.length; i < len; i++) {
+ if (typeof array[i] === 'string') {
+ ndjson += array[i] + '\n'
+ } else {
+ ndjson += this.serialize(array[i]) + '\n'
+ }
+ }
+ return ndjson
+ }
+
+ qserialize (object) {
+ debug('qserialize', object)
+ if (object == null) return ''
+ if (typeof object === 'string') return object
+ // arrays should be serialized as a comma-separated list
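+ // e.g. qserialize({ q: 'foo', _source: ['a', 'b'] }) === 'q=foo&_source=a%2Cb'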
+ const keys = Object.keys(object)
+ for (var i = 0, len = keys.length; i < len; i++) {
+ var key = keys[i]
+ // elasticsearch will complain about keys without a value
+ if (object[key] === undefined) {
+ delete object[key]
+ } else if (Array.isArray(object[key]) === true) {
+ object[key] = object[key].join(',')
+ }
+ }
+ return stringify(object)
+ }
+}
+
+module.exports = Serializer
diff --git a/lib/Transport.d.ts b/lib/Transport.d.ts
new file mode 100644
index 0000000..979393a
--- /dev/null
+++ b/lib/Transport.d.ts
@@ -0,0 +1,160 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { Readable as ReadableStream } from 'stream';
+import { ConnectionPool, CloudConnectionPool } from './pool';
+import Connection from './Connection';
+import Serializer from './Serializer';
+import * as errors from './errors';
+
+export type ApiError = errors.ConfigurationError | errors.ConnectionError |
+ errors.DeserializationError | errors.SerializationError |
+ errors.NoLivingConnectionsError | errors.ResponseError |
+ errors.TimeoutError | errors.RequestAbortedError
+
+export type Context = Record<string, unknown> | null
+
+export interface nodeSelectorFn {
+ (connections: Connection[]): Connection;
+}
+
+export interface nodeFilterFn {
+ (connection: Connection): boolean;
+}
+
+export interface generateRequestIdFn {
+ (params: TransportRequestParams, options: TransportRequestOptions): any;
+}
+
+interface TransportOptions {
+ emit: (event: string | symbol, ...args: any[]) => boolean;
+ connectionPool: ConnectionPool | CloudConnectionPool;
+ serializer: Serializer;
+ maxRetries: number;
+ requestTimeout: number | string;
+ suggestCompression?: boolean;
+ compression?: 'gzip';
+ sniffInterval?: number;
+ sniffOnConnectionFault?: boolean;
+ sniffEndpoint: string;
+ sniffOnStart?: boolean;
+ nodeFilter?: nodeFilterFn;
+ nodeSelector?: string | nodeSelectorFn;
+ headers?: Record<string, any>;
+ generateRequestId?: generateRequestIdFn;
+ name?: string;
+ opaqueIdPrefix?: string;
+}
+
+export interface RequestEvent<TResponse = Record<string, any>, TContext = Context> {
+ body: TResponse;
+ statusCode: number | null;
+ headers: Record<string, any> | null;
+ warnings: string[] | null;
+ meta: {
+ context: TContext;
+ name: string | symbol;
+ request: {
+ params: TransportRequestParams;
+ options: TransportRequestOptions;
+ id: any;
+ };
+ connection: Connection;
+ attempts: number;
+ aborted: boolean;
+ sniff?: {
+ hosts: any[];
+ reason: string;
+ };
+ };
+}
+
+// ApiResponse and RequestEvent are the same thing
+// we are doing this to have more descriptive names
+export interface ApiResponse<TResponse = Record<string, any>, TContext = Context> extends RequestEvent<TResponse, TContext> {}
+
+export type RequestBody<T = Record<string, any>> = T | string | Buffer | ReadableStream
+export type RequestNDBody<T = Record<string, any>[]> = T | string | string[] | Buffer | ReadableStream
+
+export interface TransportRequestParams {
+ method: string;
+ path: string;
+ body?: RequestBody;
+ bulkBody?: RequestNDBody;
+ querystring?: Record<string, any> | string;
+}
+
+export interface TransportRequestOptions {
+ ignore?: number[];
+ requestTimeout?: number | string;
+ maxRetries?: number;
+ asStream?: boolean;
+ headers?: Record<string, any>;
+ querystring?: Record<string, any>;
+ compression?: 'gzip';
+ id?: any;
+ context?: Context;
+ warnings?: string[];
+ opaqueId?: string;
+}
+
+export interface TransportRequestCallback {
+ abort: () => void;
+}
+
+export interface TransportRequestPromise<T> extends Promise<T> {
+ abort: () => void;
+}
+
+export interface TransportGetConnectionOptions {
+ requestId: string;
+}
+
+export interface TransportSniffOptions {
+ reason: string;
+ requestId?: string;
+}
+
+export default class Transport {
+ static sniffReasons: {
+ SNIFF_ON_START: string;
+ SNIFF_INTERVAL: string;
+ SNIFF_ON_CONNECTION_FAULT: string;
+ DEFAULT: string;
+ };
+ emit: (event: string | symbol, ...args: any[]) => boolean;
+ connectionPool: ConnectionPool | CloudConnectionPool;
+ serializer: Serializer;
+ maxRetries: number;
+ requestTimeout: number;
+ suggestCompression: boolean;
+ compression: 'gzip' | false;
+ sniffInterval: number;
+ sniffOnConnectionFault: boolean;
+ opaqueIdPrefix: string | null;
+ sniffEndpoint: string;
+ _sniffEnabled: boolean;
+ _nextSniff: number;
+ _isSniffing: boolean;
+ constructor(opts: TransportOptions);
+ request(params: TransportRequestParams, options?: TransportRequestOptions): TransportRequestPromise<ApiResponse>;
+ request(params: TransportRequestParams, options?: TransportRequestOptions, callback?: (err: ApiError, result: ApiResponse) => void): TransportRequestCallback;
+ getConnection(opts: TransportGetConnectionOptions): Connection | null;
+ sniff(opts?: TransportSniffOptions, callback?: (...args: any[]) => void): void;
+}
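+
+// Usage sketch (assumes an already-constructed `transport` instance; not part
+// of this declaration file's API surface): the promise-style overload returns
+// a TransportRequestPromise, which also exposes `abort()`:
+//
+//   const p = transport.request({ method: 'GET', path: '/_cat/health' })
+//   p.abort() // the pending promise will typically reject with a RequestAbortedError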
diff --git a/lib/Transport.js b/lib/Transport.js
new file mode 100644
index 0000000..46c1e39
--- /dev/null
+++ b/lib/Transport.js
@@ -0,0 +1,566 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+'use strict'
+
+const debug = require('debug')('elasticsearch')
+const os = require('os')
+const { gzip, unzip, createGzip } = require('zlib')
+const buffer = require('buffer')
+const ms = require('ms')
+const {
+ ConnectionError,
+ RequestAbortedError,
+ NoLivingConnectionsError,
+ ResponseError,
+ ConfigurationError
+} = require('./errors')
+
+const noop = () => {}
+
+const clientVersion = require('../package.json').version
+const userAgent = `elasticsearch-js/${clientVersion} (${os.platform()} ${os.release()}-${os.arch()}; Node.js ${process.version})`
+const MAX_BUFFER_LENGTH = buffer.constants.MAX_LENGTH
+const MAX_STRING_LENGTH = buffer.constants.MAX_STRING_LENGTH
+
+class Transport {
+ constructor (opts) {
+ if (typeof opts.compression === 'string' && opts.compression !== 'gzip') {
+ throw new ConfigurationError(`Invalid compression: '${opts.compression}'`)
+ }
+
+ this.emit = opts.emit
+ this.connectionPool = opts.connectionPool
+ this.serializer = opts.serializer
+ this.maxRetries = opts.maxRetries
+ this.requestTimeout = toMs(opts.requestTimeout)
+ this.suggestCompression = opts.suggestCompression === true
+ this.compression = opts.compression || false
+ this.context = opts.context || null
+ this.headers = Object.assign({},
+ { 'user-agent': userAgent },
+ opts.suggestCompression === true ? { 'accept-encoding': 'gzip,deflate' } : null,
+ lowerCaseHeaders(opts.headers)
+ )
+ this.sniffInterval = opts.sniffInterval
+ this.sniffOnConnectionFault = opts.sniffOnConnectionFault
+ this.sniffEndpoint = opts.sniffEndpoint
+ this.generateRequestId = opts.generateRequestId || generateRequestId()
+ this.name = opts.name
+ this.opaqueIdPrefix = opts.opaqueIdPrefix
+
+ this.nodeFilter = opts.nodeFilter || defaultNodeFilter
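+ // `nodeSelector` accepts one of the named strategies ('round-robin',
+ // 'random') or a custom function, e.g. connections => connections[0]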
+ if (typeof opts.nodeSelector === 'function') {
+ this.nodeSelector = opts.nodeSelector
+ } else if (opts.nodeSelector === 'round-robin') {
+ this.nodeSelector = roundRobinSelector()
+ } else if (opts.nodeSelector === 'random') {
+ this.nodeSelector = randomSelector
+ } else {
+ this.nodeSelector = roundRobinSelector()
+ }
+
+ this._sniffEnabled = typeof this.sniffInterval === 'number'
+ this._nextSniff = this._sniffEnabled ? (Date.now() + this.sniffInterval) : 0
+ this._isSniffing = false
+
+ if (opts.sniffOnStart === true) {
+ this.sniff({ reason: Transport.sniffReasons.SNIFF_ON_START })
+ }
+ }
+
+ request (params, options, callback) {
+ options = options || {}
+ if (typeof options === 'function') {
+ callback = options
+ options = {}
+ }
+ var p = null
+
+ // promises support
+ if (callback === undefined) {
+ let onFulfilled = null
+ let onRejected = null
+ p = new Promise((resolve, reject) => {
+ onFulfilled = resolve
+ onRejected = reject
+ })
+ callback = function callback (err, result) {
+ err ? onRejected(err) : onFulfilled(result)
+ }
+ }
+
+ const meta = {
+ context: null,
+ request: {
+ params: null,
+ options: null,
+ id: options.id || this.generateRequestId(params, options)
+ },
+ name: this.name,
+ connection: null,
+ attempts: 0,
+ aborted: false
+ }
+
+ if (this.context != null && options.context != null) {
+ meta.context = Object.assign({}, this.context, options.context)
+ } else if (this.context != null) {
+ meta.context = this.context
+ } else if (options.context != null) {
+ meta.context = options.context
+ }
+
+ const result = {
+ body: null,
+ statusCode: null,
+ headers: null,
+ meta
+ }
+
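+ // The `warning` header may hold several comma-separated warnings; the
+ // regex below splits on commas that are not inside a quoted string.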
+ Object.defineProperty(result, 'warnings', {
+ get () {
+ return this.headers && this.headers.warning
+ ? this.headers.warning.split(/(?!\B"[^"]*),(?![^"]*"\B)/)
+ : null
+ }
+ })
+
+ // We should not retry if we are sending a stream body, because to retry
+ // we would need to store an in-memory copy of the stream to send it again;
+ // since we don't know the size of the stream in advance, we risk using too
+ // much memory. Furthermore, copying the stream every time is very expensive.
+ const maxRetries = isStream(params.body) || isStream(params.bulkBody)
+ ? 0 : (typeof options.maxRetries === 'number' ? options.maxRetries : this.maxRetries)
+ const compression = options.compression !== undefined ? options.compression : this.compression
+ var request = { abort: noop }
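+ // The value returned to the caller is a thenable wrapping the internal
+ // promise (used when no callback is provided) that also exposes abort(),
+ // so requests can be cancelled in both the promise and callback styles.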
+ const transportReturn = {
+ then (onFulfilled, onRejected) {
+ return p.then(onFulfilled, onRejected)
+ },
+ catch (onRejected) {
+ return p.catch(onRejected)
+ },
+ abort () {
+ meta.aborted = true
+ request.abort()
+ debug('Aborting request', params)
+ return this
+ }
+ }
+
+ const makeRequest = () => {
+ if (meta.aborted === true) {
+ return process.nextTick(callback, new RequestAbortedError(), result)
+ }
+ meta.connection = this.getConnection({ requestId: meta.request.id })
+ if (meta.connection == null) {
+ return process.nextTick(callback, new NoLivingConnectionsError(), result)
+ }
+ this.emit('request', null, result)
+ // perform the actual http request
+ request = meta.connection.request(params, onResponse)
+ }
+
+ const onConnectionError = (err) => {
+ if (err.name !== 'RequestAbortedError') {
+ // if there is an error in the connection
+ // let's mark the connection as dead
+ this.connectionPool.markDead(meta.connection)
+
+ if (this.sniffOnConnectionFault === true) {
+ this.sniff({
+ reason: Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT,
+ requestId: meta.request.id
+ })
+ }
+
+ // retry logic
+ if (meta.attempts < maxRetries) {
+ meta.attempts++
+ debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)
+ makeRequest()
+ return
+ }
+ }
+
+ err.meta = result
+ this.emit('response', err, result)
+ return callback(err, result)
+ }
+
+ const onResponse = (err, response) => {
+ if (err !== null) {
+ return onConnectionError(err)
+ }
+
+ result.statusCode = response.statusCode
+ result.headers = response.headers
+
+ if (options.asStream === true) {
+ result.body = response
+ this.emit('response', null, result)
+ callback(null, result)
+ return
+ }
+
+ const contentEncoding = (result.headers['content-encoding'] || '').toLowerCase()
+ const isCompressed = contentEncoding.indexOf('gzip') > -1 || contentEncoding.indexOf('deflate') > -1
+
+ /* istanbul ignore else */
+ if (result.headers['content-length'] !== undefined) {
+ const contentLength = Number(result.headers['content-length'])
+ if (isCompressed && contentLength > MAX_BUFFER_LENGTH) {
+ response.destroy()
+ return onConnectionError(
+ new RequestAbortedError(`The content length (${contentLength}) is bigger than the maximum allowed buffer (${MAX_BUFFER_LENGTH})`, result)
+ )
+ } else if (contentLength > MAX_STRING_LENGTH) {
+ response.destroy()
+ return onConnectionError(
+ new RequestAbortedError(`The content length (${contentLength}) is bigger than the maximum allowed string (${MAX_STRING_LENGTH})`, result)
+ )
+ }
+ }
+ // if the response is compressed, we must handle it
+ // as a buffer to allow decompression later
+ let payload = isCompressed ? [] : ''
+ const onData = isCompressed
+ ? chunk => { payload.push(chunk) }
+ : chunk => { payload += chunk }
+ const onEnd = err => {
+ response.removeListener('data', onData)
+ response.removeListener('end', onEnd)
+ response.removeListener('error', onEnd)
+ response.removeListener('aborted', onAbort)
+
+ if (err) {
+ return onConnectionError(new ConnectionError(err.message))
+ }
+
+ if (isCompressed) {
+ unzip(Buffer.concat(payload), onBody)
+ } else {
+ onBody(null, payload)
+ }
+ }
+
+ const onAbort = () => {
+ response.destroy()
+ onEnd(new Error('Response aborted while reading the body'))
+ }
+
+ if (!isCompressed) {
+ response.setEncoding('utf8')
+ }
+
+ this.emit('deserialization', null, result)
+ response.on('data', onData)
+ response.on('error', onEnd)
+ response.on('end', onEnd)
+ response.on('aborted', onAbort)
+ }
+
+ const onBody = (err, payload) => {
+ if (err) {
+ this.emit('response', err, result)
+ return callback(err, result)
+ }
+ if (Buffer.isBuffer(payload)) {
+ payload = payload.toString()
+ }
+ const isHead = params.method === 'HEAD'
+ // we should attempt the payload deserialization only if:
+ // - a `content-type` is defined and is equal to `application/json`
+ // - the request is not a HEAD request
+ // - the payload is not an empty string
+ if (result.headers['content-type'] !== undefined &&
+ result.headers['content-type'].indexOf('application/json') > -1 &&
+ isHead === false &&
+ payload !== ''
+ ) {
+ try {
+ result.body = this.serializer.deserialize(payload)
+ } catch (err) {
+ this.emit('response', err, result)
+ return callback(err, result)
+ }
+ } else {
+ // cast to boolean if the request method was HEAD
+ result.body = isHead === true ? true : payload
+ }
+
+ // we should ignore the statusCode if the user has configured the `ignore` field with
+ // the statusCode we just got or if the request method is HEAD and the statusCode is 404
+ const ignoreStatusCode = (Array.isArray(options.ignore) && options.ignore.indexOf(result.statusCode) > -1) ||
+ (isHead === true && result.statusCode === 404)
+
+ if (ignoreStatusCode === false &&
+ (result.statusCode === 502 || result.statusCode === 503 || result.statusCode === 504)) {
+ // if the statusCode is 502/3/4 we should run our retry strategy
+ // and mark the connection as dead
+ this.connectionPool.markDead(meta.connection)
+ // retry logic (we should not retry on "429 - Too Many Requests")
+ if (meta.attempts < maxRetries && result.statusCode !== 429) {
+ meta.attempts++
+ debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)
+ makeRequest()
+ return
+ }
+ } else {
+ // everything has worked as expected, let's mark
+ // the connection as alive (or confirm it)
+ this.connectionPool.markAlive(meta.connection)
+ }
+
+ if (ignoreStatusCode === false && result.statusCode >= 400) {
+ const error = new ResponseError(result)
+ this.emit('response', error, result)
+ callback(error, result)
+ } else {
+ // cast to boolean if the request method was HEAD
+ if (isHead === true && result.statusCode === 404) {
+ result.body = false
+ }
+ this.emit('response', null, result)
+ callback(null, result)
+ }
+ }
+
+ this.emit('serialization', null, result)
+ const headers = Object.assign({}, this.headers, lowerCaseHeaders(options.headers))
+
+ if (options.opaqueId !== undefined) {
+ headers['x-opaque-id'] = this.opaqueIdPrefix !== null
+ ? this.opaqueIdPrefix + options.opaqueId
+ : options.opaqueId
+ }
+
+ // handle json body
+ if (params.body != null) {
+ if (shouldSerialize(params.body) === true) {
+ try {
+ params.body = this.serializer.serialize(params.body)
+ } catch (err) {
+ this.emit('request', err, result)
+ process.nextTick(callback, err, result)
+ return transportReturn
+ }
+ }
+
+ if (params.body !== '') {
+ headers['content-type'] = headers['content-type'] || 'application/json'
+ }
+
+ // handle ndjson body
+ } else if (params.bulkBody != null) {
+ if (shouldSerialize(params.bulkBody) === true) {
+ try {
+ params.body = this.serializer.ndserialize(params.bulkBody)
+ } catch (err) {
+ this.emit('request', err, result)
+ process.nextTick(callback, err, result)
+ return transportReturn
+ }
+ } else {
+ params.body = params.bulkBody
+ }
+ if (params.body !== '') {
+ headers['content-type'] = headers['content-type'] || 'application/x-ndjson'
+ }
+ }
+
+ params.headers = headers
+ // serializes the querystring
+ if (options.querystring == null) {
+ params.querystring = this.serializer.qserialize(params.querystring)
+ } else {
+ params.querystring = this.serializer.qserialize(
+ Object.assign({}, params.querystring, options.querystring)
+ )
+ }
+
+ // handles request timeout
+ params.timeout = toMs(options.requestTimeout || this.requestTimeout)
+ if (options.asStream === true) params.asStream = true
+ meta.request.params = params
+ meta.request.options = options
+
+ // handle compression
+ if (params.body !== '' && params.body != null) {
+ if (isStream(params.body) === true) {
+ if (compression === 'gzip') {
+ params.headers['content-encoding'] = compression
+ params.body = params.body.pipe(createGzip())
+ }
+ makeRequest()
+ } else if (compression === 'gzip') {
+ gzip(params.body, (err, buffer) => {
+ /* istanbul ignore next */
+ if (err) {
+ this.emit('request', err, result)
+ return callback(err, result)
+ }
+ params.headers['content-encoding'] = compression
+ params.headers['content-length'] = '' + Buffer.byteLength(buffer)
+ params.body = buffer
+ makeRequest()
+ })
+ } else {
+ params.headers['content-length'] = '' + Buffer.byteLength(params.body)
+ makeRequest()
+ }
+ } else {
+ makeRequest()
+ }
+
+ return transportReturn
+ }
+
+ getConnection (opts) {
+ const now = Date.now()
+ if (this._sniffEnabled === true && now > this._nextSniff) {
+ this.sniff({ reason: Transport.sniffReasons.SNIFF_INTERVAL, requestId: opts.requestId })
+ }
+ return this.connectionPool.getConnection({
+ filter: this.nodeFilter,
+ selector: this.nodeSelector,
+ requestId: opts.requestId,
+ name: this.name,
+ now
+ })
+ }
+
+ sniff (opts, callback = noop) {
+ if (this._isSniffing === true) return
+ this._isSniffing = true
+ debug('Started sniffing request')
+
+ if (typeof opts === 'function') {
+ callback = opts
+ opts = { reason: Transport.sniffReasons.DEFAULT }
+ }
+
+ const { reason } = opts
+
+ const request = {
+ method: 'GET',
+ path: this.sniffEndpoint
+ }
+
+ this.request(request, { id: opts.requestId }, (err, result) => {
+ this._isSniffing = false
+ if (this._sniffEnabled === true) {
+ this._nextSniff = Date.now() + this.sniffInterval
+ }
+
+ if (err != null) {
+ debug('Sniffing errored', err)
+ result.meta.sniff = { hosts: [], reason }
+ this.emit('sniff', err, result)
+ return callback(err)
+ }
+
+ debug('Sniffing ended successfully', result.body)
+ const protocol = result.meta.connection.url.protocol || /* istanbul ignore next */ 'http:'
+ const hosts = this.connectionPool.nodesToHost(result.body.nodes, protocol)
+ this.connectionPool.update(hosts)
+
+ result.meta.sniff = { hosts, reason }
+ this.emit('sniff', null, result)
+ callback(null, hosts)
+ })
+ }
+}
+
+Transport.sniffReasons = {
+ SNIFF_ON_START: 'sniff-on-start',
+ SNIFF_INTERVAL: 'sniff-interval',
+ SNIFF_ON_CONNECTION_FAULT: 'sniff-on-connection-fault',
+ // TODO: find a better name
+ DEFAULT: 'default'
+}
+
+function toMs (time) {
+ if (typeof time === 'string') {
+ return ms(time)
+ }
+ return time
+}
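+
+// e.g. toMs('30s') === 30000, while numbers pass through unchanged: toMs(500) === 500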
+
+function shouldSerialize (obj) {
+ return typeof obj !== 'string' &&
+ typeof obj.pipe !== 'function' &&
+ Buffer.isBuffer(obj) === false
+}
+
+function isStream (obj) {
+ return obj != null && typeof obj.pipe === 'function'
+}
+
+function defaultNodeFilter (node) {
+ // avoid master only nodes
+ if (node.roles.master === true &&
+ node.roles.data === false &&
+ node.roles.ingest === false) {
+ return false
+ }
+ return true
+}
+
+function roundRobinSelector () {
+ var current = -1
+ return function _roundRobinSelector (connections) {
+ if (++current >= connections.length) {
+ current = 0
+ }
+ return connections[current]
+ }
+}
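+
+// e.g. given connections [a, b, c], successive calls of the returned
+// selector yield a, b, c, a, ...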
+
+function randomSelector (connections) {
+ const index = Math.floor(Math.random() * connections.length)
+ return connections[index]
+}
+
+function generateRequestId () {
+ var maxInt = 2147483647
+ var nextReqId = 0
+ return function genReqId (params, options) {
+ return (nextReqId = (nextReqId + 1) & maxInt)
+ }
+}
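+
+// The factory returns a stateful closure: ids grow monotonically (1, 2, 3, ...)
+// and wrap back to 0 past 2 ** 31 - 1, thanks to the bitwise AND.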
+
+function lowerCaseHeaders (oldHeaders) {
+ if (oldHeaders == null) return oldHeaders
+ const newHeaders = {}
+ for (const header in oldHeaders) {
+ newHeaders[header.toLowerCase()] = oldHeaders[header]
+ }
+ return newHeaders
+}
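+
+// e.g. lowerCaseHeaders({ 'X-Opaque-Id': 'foo' }) => { 'x-opaque-id': 'foo' }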
+
+module.exports = Transport
+module.exports.internals = {
+ defaultNodeFilter,
+ roundRobinSelector,
+ randomSelector,
+ generateRequestId,
+ lowerCaseHeaders
+}
diff --git a/lib/errors.d.ts b/lib/errors.d.ts
new file mode 100644
index 0000000..12241e4
--- /dev/null
+++ b/lib/errors.d.ts
@@ -0,0 +1,83 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { ApiResponse, Context } from './Transport'
+
+export declare class ElasticsearchClientError extends Error {
+ name: string;
+ message: string;
+}
+
+export declare class TimeoutError<TResponse = Record<string, any>, TContext = Context> extends ElasticsearchClientError {
+ name: string;
+ message: string;
+ meta: ApiResponse<TResponse, TContext>;
+ constructor(message: string, meta: ApiResponse<TResponse, TContext>);
+}
+
+export declare class ConnectionError<TResponse = Record<string, any>, TContext = Context> extends ElasticsearchClientError {
+ name: string;
+ message: string;
+ meta: ApiResponse<TResponse, TContext>;
+ constructor(message: string, meta: ApiResponse<TResponse, TContext>);
+}
+
+export declare class NoLivingConnectionsError<TResponse = Record<string, any>, TContext = Context> extends ElasticsearchClientError {
+ name: string;
+ message: string;
+ meta: ApiResponse<TResponse, TContext>;
+ constructor(message: string, meta: ApiResponse<TResponse, TContext>);
+}
+
+export declare class SerializationError extends ElasticsearchClientError {
+ name: string;
+ message: string;
+ data: any;
+ constructor(message: string, data: any);
+}
+
+export declare class DeserializationError extends ElasticsearchClientError {
+ name: string;
+ message: string;
+ data: string;
+ constructor(message: string, data: string);
+}
+
+export declare class ConfigurationError extends ElasticsearchClientError {
+ name: string;
+ message: string;
+ constructor(message: string);
+}
+
+export declare class ResponseError<TResponse = Record<string, any>, TContext = Context> extends ElasticsearchClientError {
+ name: string;
+ message: string;
+ meta: ApiResponse<TResponse, TContext>;
+ body: TResponse;
+ statusCode: number;
+ headers: Record<string, any>;
+ constructor(meta: ApiResponse<TResponse, TContext>);
+}
+
+export declare class RequestAbortedError<TResponse = Record<string, any>, TContext = Context> extends ElasticsearchClientError {
+ name: string;
+ message: string;
+ meta: ApiResponse<TResponse, TContext>;
+ constructor(message: string, meta: ApiResponse<TResponse, TContext>);
+}
diff --git a/lib/errors.js b/lib/errors.js
new file mode 100644
index 0000000..5f6d199
--- /dev/null
+++ b/lib/errors.js
@@ -0,0 +1,133 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+'use strict'
+
+class ElasticsearchClientError extends Error {
+ constructor (message) {
+ super(message)
+ this.name = 'ElasticsearchClientError'
+ }
+}
+
+class TimeoutError extends ElasticsearchClientError {
+ constructor (message, meta) {
+ super(message)
+ Error.captureStackTrace(this, TimeoutError)
+ this.name = 'TimeoutError'
+ this.message = message || 'Timeout Error'
+ this.meta = meta
+ }
+}
+
+class ConnectionError extends ElasticsearchClientError {
+ constructor (message, meta) {
+ super(message)
+ Error.captureStackTrace(this, ConnectionError)
+ this.name = 'ConnectionError'
+ this.message = message || 'Connection Error'
+ this.meta = meta
+ }
+}
+
+class NoLivingConnectionsError extends ElasticsearchClientError {
+ constructor (message, meta) {
+ super(message)
+ Error.captureStackTrace(this, NoLivingConnectionsError)
+ this.name = 'NoLivingConnectionsError'
+ this.message = message || 'Given the configuration, the ConnectionPool was not able to find a usable Connection for this request.'
+ this.meta = meta
+ }
+}
+
+class SerializationError extends ElasticsearchClientError {
+ constructor (message, data) {
+ super(message, data)
+ Error.captureStackTrace(this, SerializationError)
+ this.name = 'SerializationError'
+ this.message = message || 'Serialization Error'
+ this.data = data
+ }
+}
+
+class DeserializationError extends ElasticsearchClientError {
+ constructor (message, data) {
+ super(message, data)
+ Error.captureStackTrace(this, DeserializationError)
+ this.name = 'DeserializationError'
+ this.message = message || 'Deserialization Error'
+ this.data = data
+ }
+}
+
+class ConfigurationError extends ElasticsearchClientError {
+ constructor (message) {
+ super(message)
+ Error.captureStackTrace(this, ConfigurationError)
+ this.name = 'ConfigurationError'
+ this.message = message || 'Configuration Error'
+ }
+}
+
+class ResponseError extends ElasticsearchClientError {
+ constructor (meta) {
+ super('Response Error')
+ Error.captureStackTrace(this, ResponseError)
+ this.name = 'ResponseError'
+ this.message = (meta.body && meta.body.error && meta.body.error.type) || 'Response Error'
+ this.meta = meta
+ }
+
+ get body () {
+ return this.meta.body
+ }
+
+ get statusCode () {
+ if (this.meta.body && typeof this.meta.body.status === 'number') {
+ return this.meta.body.status
+ }
+ return this.meta.statusCode
+ }
+
+ get headers () {
+ return this.meta.headers
+ }
+}
+
+class RequestAbortedError extends ElasticsearchClientError {
+ constructor (message, meta) {
+ super(message)
+ Error.captureStackTrace(this, RequestAbortedError)
+ this.name = 'RequestAbortedError'
+ this.message = message || 'Request aborted'
+ this.meta = meta
+ }
+}
+
+module.exports = {
+ ElasticsearchClientError,
+ TimeoutError,
+ ConnectionError,
+ NoLivingConnectionsError,
+ SerializationError,
+ DeserializationError,
+ ConfigurationError,
+ ResponseError,
+ RequestAbortedError
+}
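+
+// Usage sketch (the `transport` instance here is hypothetical):
+//
+//   transport.request({ method: 'GET', path: '/_missing' }, (err, result) => {
+//     if (err instanceof ResponseError) {
+//       // body, statusCode and headers are getters over err.meta
+//       console.log(err.statusCode, err.body)
+//     }
+//   })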
diff --git a/lib/pool/BaseConnectionPool.js b/lib/pool/BaseConnectionPool.js
new file mode 100644
index 0000000..8975ea2
--- /dev/null
+++ b/lib/pool/BaseConnectionPool.js
@@ -0,0 +1,259 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+'use strict'
+
+const { URL } = require('url')
+const debug = require('debug')('elasticsearch')
+const Connection = require('../Connection')
+const noop = () => {}
+
+class BaseConnectionPool {
+ constructor (opts) {
+ // list of nodes and weights
+ this.connections = []
+ // how many nodes we have in our scheduler
+ this.size = this.connections.length
+ this.Connection = opts.Connection
+ this.emit = opts.emit || noop
+ this.auth = opts.auth || null
+ this._ssl = opts.ssl
+ this._agent = opts.agent
+ this._proxy = opts.proxy || null
+ }
+
+ getConnection () {
+ throw new Error('getConnection must be implemented')
+ }
+
+ markAlive () {
+ return this
+ }
+
+ markDead () {
+ return this
+ }
+
+ /**
+ * Creates a new connection instance.
+ */
+ createConnection (opts) {
+ if (typeof opts === 'string') {
+ opts = this.urlToHost(opts)
+ }
+
+ if (this.auth !== null) {
+ opts.auth = this.auth
+ } else if (opts.url.username !== '' && opts.url.password !== '') {
+ opts.auth = {
+ username: decodeURIComponent(opts.url.username),
+ password: decodeURIComponent(opts.url.password)
+ }
+ }
+
+ if (opts.ssl == null) opts.ssl = this._ssl
+ /* istanbul ignore else */
+ if (opts.agent == null) opts.agent = this._agent
+ /* istanbul ignore else */
+ if (opts.proxy == null) opts.proxy = this._proxy
+
+ const connection = new this.Connection(opts)
+
+ for (const conn of this.connections) {
+ if (conn.id === connection.id) {
+ throw new Error(`Connection with id '${connection.id}' is already present`)
+ }
+ }
+
+ return connection
+ }
+
+ /**
+ * Adds a new connection to the pool.
+ *
+ * @param {object|string} host
+ * @returns {ConnectionPool}
+ */
+ addConnection (opts) {
+ if (Array.isArray(opts)) {
+ return opts.forEach(o => this.addConnection(o))
+ }
+
+ if (typeof opts === 'string') {
+ opts = this.urlToHost(opts)
+ }
+
+ const connectionById = this.connections.find(c => c.id === opts.id)
+ const connectionByUrl = this.connections.find(c => c.id === opts.url.href)
+
+ if (connectionById || connectionByUrl) {
+ throw new Error(`Connection with id '${opts.id || opts.url.href}' is already present`)
+ }
+
+ this.update([...this.connections, opts])
+ return this.connections[this.size - 1]
+ }
+
+ /**
+ * Removes a connection from the pool.
+ *
+ * @param {object} connection
+ * @returns {ConnectionPool}
+ */
+ removeConnection (connection) {
+ debug('Removing connection', connection)
+ return this.update(this.connections.filter(c => c.id !== connection.id))
+ }
+
+ /**
+ * Empties the connection pool.
+ *
+ * @returns {ConnectionPool}
+ */
+ empty (callback) {
+ debug('Emptying the connection pool')
+ var openConnections = this.size
+ this.connections.forEach(connection => {
+ connection.close(() => {
+ if (--openConnections === 0) {
+ this.connections = []
+ this.size = this.connections.length
+ callback()
+ }
+ })
+ })
+ }
+
+ /**
+ * Update the ConnectionPool with new connections.
+ *
+ * @param {array} array of connections
+ * @returns {ConnectionPool}
+ */
+ update (nodes) {
+ debug('Updating the connection pool')
+ const newConnections = []
+ const oldConnections = []
+
+ for (const node of nodes) {
+ // if we already have a given connection in the pool
+ // we mark it as alive and we do not close the connection
+ // to avoid socket issues
+ const connectionById = this.connections.find(c => c.id === node.id)
+ const connectionByUrl = this.connections.find(c => c.id === node.url.href)
+ if (connectionById) {
+ debug(`The connection with id '${node.id}' is already present`)
+ this.markAlive(connectionById)
+ newConnections.push(connectionById)
+ // in case the user has passed a single url (or an array of urls),
+ // the connection id will be the full href; to avoid closing valid
+ // connections that are not present in the pool, we also check the
+ // node url, and if it is already present we update its id with the
+ // one provided by ES.
+ } else if (connectionByUrl) {
+ connectionByUrl.id = node.id
+ this.markAlive(connectionByUrl)
+ newConnections.push(connectionByUrl)
+ } else {
+ newConnections.push(this.createConnection(node))
+ }
+ }
+
+ const ids = nodes.map(c => c.id)
+ // remove all the dead connections and old connections
+ for (const connection of this.connections) {
+ if (ids.indexOf(connection.id) === -1) {
+ oldConnections.push(connection)
+ }
+ }
+
+ // close old connections
+ oldConnections.forEach(connection => connection.close())
+
+ this.connections = newConnections
+ this.size = this.connections.length
+
+ return this
+ }
+
+ /**
+ * Transforms the nodes objects to a host object.
+ *
+ * @param {object} nodes
+ * @returns {array} hosts
+ */
+ nodesToHost (nodes, protocol) {
+ const ids = Object.keys(nodes)
+ const hosts = []
+
+ for (var i = 0, len = ids.length; i < len; i++) {
+ const node = nodes[ids[i]]
+ // If there is no protocol in
+ // the `publish_address`, `new URL` will throw.
+ // The publish_address can have two forms:
+ // - ip:port
+ // - hostname/ip:port
+ // if we encounter the second form, we should
+ // use the hostname instead of the ip
+ var address = node.http.publish_address
+ const parts = address.split('/')
+ // the url is in the form of hostname/ip:port
+ if (parts.length > 1) {
+ const hostname = parts[0]
+ const port = parts[1].match(/((?::))(?:[0-9]+)$/g)[0].slice(1)
+ address = `${hostname}:${port}`
+ }
+
+ address = address.slice(0, 4) === 'http'
+ /* istanbul ignore next */
+ ? address
+ : `${protocol}//${address}`
+ const roles = node.roles.reduce((acc, role) => {
+ acc[role] = true
+ return acc
+ }, {})
+
+ hosts.push({
+ url: new URL(address),
+ id: ids[i],
+ roles: Object.assign({
+ [Connection.roles.MASTER]: false,
+ [Connection.roles.DATA]: false,
+ [Connection.roles.INGEST]: false,
+ [Connection.roles.ML]: false
+ }, roles)
+ })
+ }
+
+ return hosts
+ }
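+
+ // For reference, the two `publish_address` forms handled above map as
+ // follows (illustrative values, assuming an 'http:' protocol):
+ //   '127.0.0.1:9200'              -> new URL('http://127.0.0.1:9200')
+ //   'example.com/127.0.0.1:9200'  -> new URL('http://example.com:9200')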
+
+ /**
+ * Transforms a URL string into a host object
+ *
+ * @param {string} url
+ * @returns {object} host
+ */
+ urlToHost (url) {
+ return {
+ url: new URL(url)
+ }
+ }
+}
+
+module.exports = BaseConnectionPool
diff --git a/lib/pool/CloudConnectionPool.js b/lib/pool/CloudConnectionPool.js
new file mode 100644
index 0000000..6f68f61
--- /dev/null
+++ b/lib/pool/CloudConnectionPool.js
@@ -0,0 +1,64 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+'use strict'
+
+const BaseConnectionPool = require('./BaseConnectionPool')
+
+class CloudConnectionPool extends BaseConnectionPool {
+ constructor (opts) {
+ super(opts)
+ this.cloudConnection = null
+ }
+
+ /**
+ * Returns the only cloud connection.
+ *
+ * @returns {object} connection
+ */
+ getConnection () {
+ return this.cloudConnection
+ }
+
+ /**
+ * Empties the connection pool.
+ *
+ * @returns {ConnectionPool}
+ */
+ empty (callback) {
+ super.empty(() => {
+ this.cloudConnection = null
+ callback()
+ })
+ }
+
+ /**
+ * Update the ConnectionPool with new connections.
+ *
+ * @param {array} array of connections
+ * @returns {ConnectionPool}
+ */
+ update (connections) {
+ super.update(connections)
+ this.cloudConnection = this.connections[0]
+ return this
+ }
+}
+
+module.exports = CloudConnectionPool
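+
+// A cloud deployment exposes a single endpoint, so this pool only ever
+// tracks one connection: update() stores the first connection it receives
+// as `cloudConnection`, and getConnection() always returns it.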
diff --git a/lib/pool/ConnectionPool.js b/lib/pool/ConnectionPool.js
new file mode 100644
index 0000000..5a6c09a
--- /dev/null
+++ b/lib/pool/ConnectionPool.js
@@ -0,0 +1,247 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+'use strict'
+
+const BaseConnectionPool = require('./BaseConnectionPool')
+const assert = require('assert')
+const debug = require('debug')('elasticsearch')
+const Connection = require('../Connection')
+const noop = () => {}
+
+class ConnectionPool extends BaseConnectionPool {
+ constructor (opts) {
+ super(opts)
+
+ this.dead = []
+ // the resurrect timeout is 60s
+ this.resurrectTimeout = 1000 * 60
+ // number of consecutive failures after which
+ // the timeout doesn't increase
+ this.resurrectTimeoutCutoff = 5
+ this.pingTimeout = opts.pingTimeout
+ this._sniffEnabled = opts.sniffEnabled || false
+
+ const resurrectStrategy = opts.resurrectStrategy || 'ping'
+ this.resurrectStrategy = ConnectionPool.resurrectStrategies[resurrectStrategy]
+ assert(
+ this.resurrectStrategy != null,
+ `Invalid resurrection strategy: '${resurrectStrategy}'`
+ )
+ }
+
+ /**
+ * Marks a connection as 'alive'.
+ * If needed removes the connection from the dead list
+ * and then resets the `deadCount`.
+ *
+ * @param {object} connection
+ */
+ markAlive (connection) {
+ const { id } = connection
+ debug(`Marking as 'alive' connection '${id}'`)
+ const index = this.dead.indexOf(id)
+ if (index > -1) this.dead.splice(index, 1)
+ connection.status = Connection.statuses.ALIVE
+ connection.deadCount = 0
+ connection.resurrectTimeout = 0
+ return this
+ }
+
+ /**
+ * Marks a connection as 'dead'.
+ * If needed adds the connection to the dead list
+ * and then increments the `deadCount`.
+ *
+ * @param {object} connection
+ */
+ markDead (connection) {
+ const { id } = connection
+ debug(`Marking as 'dead' connection '${id}'`)
+ if (this.dead.indexOf(id) === -1) {
+ // It might happen that `markDead` is called just after
+ // a pool update, in which case we would add to the dead
+ // list a node that no longer exists. The following check
+ // verifies that the connection is still part of the pool
+ // before marking it as dead.
+ for (var i = 0; i < this.size; i++) {
+ if (this.connections[i].id === id) {
+ this.dead.push(id)
+ break
+ }
+ }
+ }
+ connection.status = Connection.statuses.DEAD
+ connection.deadCount++
+ // resurrectTimeout formula:
+ // `resurrectTimeout * 2 ** min(deadCount - 1, resurrectTimeoutCutoff)`
+ connection.resurrectTimeout = Date.now() + this.resurrectTimeout * Math.pow(
+ 2, Math.min(connection.deadCount - 1, this.resurrectTimeoutCutoff)
+ )
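+ // e.g. with the 60s base timeout and a cutoff of 5, a connection becomes
+ // eligible again 60s, 120s, 240s, ... after being marked dead, capped at
+ // 1920s (32 minutes) once deadCount reaches 6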
+
+ // sort the dead list in ascending order
+ // based on the resurrectTimeout
+ this.dead.sort((a, b) => {
+ const conn1 = this.connections.find(c => c.id === a)
+ const conn2 = this.connections.find(c => c.id === b)
+ return conn1.resurrectTimeout - conn2.resurrectTimeout
+ })
+
+ return this
+ }
+
+ /**
+ * If enabled, tries to resurrect a connection with the given
+ * resurrect strategy ('ping', 'optimistic', 'none').
+ *
+ * @param {object} { now, requestId }
+ * @param {function} callback (isAlive, connection)
+ */
+ resurrect (opts, callback = noop) {
+ if (this.resurrectStrategy === 0 || this.dead.length === 0) {
+ debug('Nothing to resurrect')
+ callback(null, null)
+ return
+ }
+
+ // the dead list is sorted in ascending order based on the timeout,
+ // so the first element will always be the one with the smallest timeout
+ const connection = this.connections.find(c => c.id === this.dead[0])
+ if ((opts.now || Date.now()) < connection.resurrectTimeout) {
+ debug('Nothing to resurrect')
+ callback(null, null)
+ return
+ }
+
+ const { id } = connection
+
+ // ping strategy
+ if (this.resurrectStrategy === 1) {
+ connection.request({
+ method: 'HEAD',
+ path: '/',
+ timeout: this.pingTimeout
+ }, (err, response) => {
+ var isAlive = true
+ const statusCode = response !== null ? response.statusCode : 0
+ if (err != null ||
+ (statusCode === 502 || statusCode === 503 || statusCode === 504)) {
+ debug(`Resurrect: connection '${id}' is still dead`)
+ this.markDead(connection)
+ isAlive = false
+ } else {
+ debug(`Resurrect: connection '${id}' is now alive`)
+ this.markAlive(connection)
+ }
+ this.emit('resurrect', null, {
+ strategy: 'ping',
+ name: opts.name,
+ request: { id: opts.requestId },
+ isAlive,
+ connection
+ })
+ callback(isAlive, connection)
+ })
+ // optimistic strategy
+ } else {
+ debug(`Resurrect: optimistic resurrection for connection '${id}'`)
+ this.dead.splice(this.dead.indexOf(id), 1)
+ connection.status = Connection.statuses.ALIVE
+ this.emit('resurrect', null, {
+ strategy: 'optimistic',
+ name: opts.name,
+ request: { id: opts.requestId },
+ isAlive: true,
+ connection
+ })
+ // eslint-disable-next-line standard/no-callback-literal
+ callback(true, connection)
+ }
+ }
+
+ /**
+ * Returns an alive connection if present,
+ * otherwise returns a dead connection.
+ * By default it filters out the `master` only nodes.
+ * It uses the selector to choose which
+ * connection to return.
+ *
+ * @param {object} options (filter and selector)
+ * @returns {object|null} connection
+ */
+ getConnection (opts = {}) {
+ const filter = opts.filter || (() => true)
+ const selector = opts.selector || (c => c[0])
+
+ this.resurrect({
+ now: opts.now,
+ requestId: opts.requestId,
+ name: opts.name
+ })
+
+ const noAliveConnections = this.size === this.dead.length
+
+ // TODO: can we cache this?
+ const connections = []
+ for (var i = 0; i < this.size; i++) {
+ const connection = this.connections[i]
+ if (noAliveConnections || connection.status === Connection.statuses.ALIVE) {
+ if (filter(connection) === true) {
+ connections.push(connection)
+ }
+ }
+ }
+
+ if (connections.length === 0) return null
+
+ return selector(connections)
+ }
+
+ /**
+ * Empties the connection pool.
+ *
+ * @returns {ConnectionPool}
+ */
+ empty (callback) {
+ super.empty(() => {
+ this.dead = []
+ callback()
+ })
+ }
+
+ /**
+ * Update the ConnectionPool with new connections.
+ *
+ * @param {array} array of connections
+ * @returns {ConnectionPool}
+ */
+ update (connections) {
+ super.update(connections)
+ this.dead = []
+ return this
+ }
+}
+
+ConnectionPool.resurrectStrategies = {
+ none: 0,
+ ping: 1,
+ optimistic: 2
+}
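+// 'ping' (the default, value 1) probes the dead node whose timeout expires
+// first with a HEAD / request before reusing it, 'optimistic' (2) marks it
+// alive without checking, and 'none' (0) disables resurrection entirely.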
+
+module.exports = ConnectionPool
diff --git a/lib/pool/index.d.ts b/lib/pool/index.d.ts
new file mode 100644
index 0000000..246f88d
--- /dev/null
+++ b/lib/pool/index.d.ts
@@ -0,0 +1,214 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/// <reference types="node" />
+
+import { URL } from 'url'
+import { SecureContextOptions } from 'tls';
+import Connection, { AgentOptions } from '../Connection';
+import { nodeFilterFn, nodeSelectorFn } from '../Transport';
+
+interface BaseConnectionPoolOptions {
+ ssl?: SecureContextOptions;
+ agent?: AgentOptions;
+ proxy?: string | URL;
+ auth?: BasicAuth | ApiKeyAuth;
+ emit: (event: string | symbol, ...args: any[]) => boolean;
+ Connection: typeof Connection;
+}
+
+interface ConnectionPoolOptions extends BaseConnectionPoolOptions {
+ pingTimeout?: number;
+ resurrectStrategy?: 'ping' | 'optimistic' | 'none';
+ sniffEnabled?: boolean;
+}
+
+interface getConnectionOptions {
+ filter?: nodeFilterFn;
+ selector?: nodeSelectorFn;
+ requestId?: string | number;
+ name?: string;
+ now?: number;
+}
+
+interface ApiKeyAuth {
+ apiKey:
+ | string
+ | {
+ id: string;
+ api_key: string;
+ }
+}
+
+interface BasicAuth {
+ username: string;
+ password: string;
+}
+
+interface resurrectOptions {
+ now?: number;
+ requestId: string;
+ name: string;
+}
+
+interface ResurrectEvent {
+ strategy: string;
+ isAlive: boolean;
+ connection: Connection;
+ name: string;
+ request: {
+ id: any;
+ };
+}
+
+
+declare class BaseConnectionPool {
+ connections: Connection[];
+ size: number;
+ emit: (event: string | symbol, ...args: any[]) => boolean;
+ _ssl: SecureContextOptions | null;
+ _agent: AgentOptions | null;
+ _proxy: string | URL;
+ auth: BasicAuth | ApiKeyAuth;
+ Connection: typeof Connection;
+ constructor(opts?: BaseConnectionPoolOptions);
+ /**
+ * Marks a connection as 'alive'.
+ * If needed removes the connection from the dead list
+ * and then resets the `deadCount`.
+ *
+ * @param {object} connection
+ */
+ markAlive(connection: Connection): this;
+ /**
+ * Marks a connection as 'dead'.
+ * If needed adds the connection to the dead list
+ * and then increments the `deadCount`.
+ *
+ * @param {object} connection
+ */
+ markDead(connection: Connection): this;
+ /**
+ * Returns an alive connection if present,
+ * otherwise returns a dead connection.
+ * By default it filters out the `master` only nodes.
+ * It uses the selector to choose which
+ * connection to return.
+ *
+ * @param {object} options (filter and selector)
+ * @returns {object|null} connection
+ */
+ getConnection(opts?: getConnectionOptions): Connection | null;
+ /**
+ * Adds a new connection to the pool.
+ *
+ * @param {object|string} host
+ * @returns {ConnectionPool}
+ */
+ addConnection(opts: any): Connection;
+ /**
+ * Removes a connection from the pool.
+ *
+ * @param {object} connection
+ * @returns {ConnectionPool}
+ */
+ removeConnection(connection: Connection): this;
+ /**
+ * Empties the connection pool.
+ *
+ * @returns {ConnectionPool}
+ */
+ empty(): this;
+ /**
+ * Update the ConnectionPool with new connections.
+ *
+ * @param {array} array of connections
+ * @returns {ConnectionPool}
+ */
+ update(connections: any[]): this;
+ /**
+ * Transforms the nodes objects to a host object.
+ *
+ * @param {object} nodes
+ * @returns {array} hosts
+ */
+ nodesToHost(nodes: any, protocol: string): any[];
+ /**
+ * Transforms a URL string into a host object
+ *
+ * @param {string} url
+ * @returns {object} host
+ */
+ urlToHost(url: string): { url: URL };
+}
+
+declare class ConnectionPool extends BaseConnectionPool {
+ static resurrectStrategies: {
+ none: number;
+ ping: number;
+ optimistic: number;
+ };
+ dead: string[];
+ _sniffEnabled: boolean;
+ resurrectTimeout: number;
+ resurrectTimeoutCutoff: number;
+ pingTimeout: number;
+ resurrectStrategy: number;
+ constructor(opts?: ConnectionPoolOptions);
+
+ /**
+ * If enabled, tries to resurrect a connection with the given
+ * resurrect strategy ('ping', 'optimistic', 'none').
+ *
+ * @param {object} { now, requestId, name }
+ * @param {function} callback (isAlive, connection)
+ */
+ resurrect(opts: resurrectOptions, callback?: (isAlive: boolean | null, connection: Connection | null) => void): void;
+}
+
+declare class CloudConnectionPool extends BaseConnectionPool {
+ cloudConnection: Connection | null
+ constructor(opts?: BaseConnectionPoolOptions);
+ getConnection(): Connection | null;
+}
+
+declare function defaultNodeFilter(node: Connection): boolean;
+declare function roundRobinSelector(): (connections: Connection[]) => Connection;
+declare function randomSelector(connections: Connection[]): Connection;
+
+declare const internals: {
+ defaultNodeFilter: typeof defaultNodeFilter;
+ roundRobinSelector: typeof roundRobinSelector;
+ randomSelector: typeof randomSelector;
+};
+
+export {
+ // Interfaces
+ ConnectionPoolOptions,
+ getConnectionOptions,
+ ApiKeyAuth,
+ BasicAuth,
+ internals,
+ resurrectOptions,
+ ResurrectEvent,
+ // Classes
+ BaseConnectionPool,
+ ConnectionPool,
+ CloudConnectionPool
+};
diff --git a/lib/pool/index.js b/lib/pool/index.js
new file mode 100644
index 0000000..59fc74b
--- /dev/null
+++ b/lib/pool/index.js
@@ -0,0 +1,30 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+'use strict'
+
+const BaseConnectionPool = require('./BaseConnectionPool')
+const ConnectionPool = require('./ConnectionPool')
+const CloudConnectionPool = require('./CloudConnectionPool')
+
+module.exports = {
+ BaseConnectionPool,
+ ConnectionPool,
+ CloudConnectionPool
+}
diff --git a/package.json b/package.json
new file mode 100644
index 0000000..bca0fe7
--- /dev/null
+++ b/package.json
@@ -0,0 +1,60 @@
+{
+ "name": "@elastic/transport",
+ "version": "1.0.0",
+ "description": "",
+ "main": "index.js",
+ "directories": {
+ "test": "test"
+ },
+ "scripts": {
+ "test": "npm run lint && tap test/{unit,acceptance}/{*,**/*}.test.js && npm run test:types",
+ "test:unit": "tap test/unit/{*,**/*}.test.js",
+ "test:acceptance": "tap test/acceptance/*.test.js",
+ "test:types": "tsd",
+ "test:coverage-100": "tap test/{unit,acceptance}/{*,**/*}.test.js --coverage --100",
+ "test:coverage-report": "tap test/{unit,acceptance}/{*,**/*}.test.js --coverage && nyc report --reporter=text-lcov > coverage.lcov",
+ "test:coverage-ui": "tap test/{unit,acceptance}/{*,**/*}.test.js --coverage --coverage-report=html",
+ "lint": "standard",
+ "lint:fix": "standard --fix",
+ "license-checker": "license-checker --production --onlyAllow='MIT;Apache-2.0;Apache1.1;ISC;BSD-3-Clause;BSD-2-Clause'"
+ },
+ "repository": {
+ "type": "git",
+ "url": "git+https://github.com/elastic/elastic-transport-js.git"
+ },
+ "keywords": [],
+ "author": {
+ "name": "Tomas Della Vedova",
+ "company": "Elastic BV"
+ },
+ "license": "Apache-2.0",
+ "bugs": {
+ "url": "https://github.com/elastic/elastic-transport-js/issues"
+ },
+ "homepage": "https://github.com/elastic/elastic-transport-js#readme",
+ "engines": {
+ "node": ">=10"
+ },
+ "tsd": {
+ "directory": "test/types"
+ },
+ "devDependencies": {
+ "@sinonjs/fake-timers": "github:sinonjs/fake-timers#0bfffc1",
+ "@types/node": "^12.6.2",
+ "into-stream": "^5.1.1",
+ "license-checker": "^25.0.1",
+ "proxy": "^1.0.2",
+ "standard": "^13.0.2",
+ "stoppable": "^1.1.0",
+ "tap": "^14.4.1",
+ "tsd": "^0.13.1",
+ "workq": "^2.1.0"
+ },
+ "dependencies": {
+ "debug": "^4.1.1",
+ "hpagent": "^0.1.1",
+ "ms": "^2.1.1",
+ "pump": "^3.0.0",
+ "secure-json-parse": "^2.1.0"
+ }
+}