From 4aa00e03e1fac5385387dfa7cfd6e811d5307028 Mon Sep 17 00:00:00 2001
From: Josh Mock
Date: Tue, 2 Apr 2024 14:38:09 -0500
Subject: [PATCH] onSuccess function for bulk helper (#2199)

* Bulk helper onSuccess callback

For https://github.com/elastic/elasticsearch-js/issues/2090

Includes a refactor of the tryBulk result-processing code, to make
iterating over bulk response data easier to understand.

* Add onSuccess tests for each datasource type

* Cleanup, additional comments

* Add documentation for onSuccess callback

* Update changelog

* Drop link to 8.14 release notes. Page not yet published, breaking docs build.
---
 docs/changelog.asciidoc        |   8 +
 docs/helpers.asciidoc          |  11 +
 src/helpers.ts                 |  92 +++++--
 test/unit/helpers/bulk.test.ts | 447 +++++++++++++++++++++++----------
 4 files changed, 407 insertions(+), 151 deletions(-)

diff --git a/docs/changelog.asciidoc b/docs/changelog.asciidoc
index 33b6128c9..dc918e32f 100644
--- a/docs/changelog.asciidoc
+++ b/docs/changelog.asciidoc
@@ -1,6 +1,14 @@
 [[changelog-client]]
 == Release notes
 
+[discrete]
+=== 8.14.0
+
+[discrete]
+===== `onSuccess` callback added to bulk helper
+
+The bulk helper now supports an `onSuccess` callback that is called for each successful operation. https://github.com/elastic/elasticsearch-js/pull/2199[#2199]
+
 [discrete]
 === 8.13.0
 
diff --git a/docs/helpers.asciidoc b/docs/helpers.asciidoc
index 4815ebc4a..2ecf8cd30 100644
--- a/docs/helpers.asciidoc
+++ b/docs/helpers.asciidoc
@@ -98,6 +98,17 @@ const b = client.helpers.bulk({
 })
 ----
 
+|`onSuccess`
+a|A function that is called for each successful operation in the bulk request. It receives the result from Elasticsearch for the operation, along with the original document that was sent, or `null` for delete operations.
+[source,js]
+----
+const b = client.helpers.bulk({
+  onSuccess ({ result, document }) {
+    console.log(`SUCCESS: Document ${result.index._id} indexed to ${result.index._index}`)
+  }
+})
+----
+
 |`flushBytes`
 a|The size of the bulk body in bytes to reach before to send it. Default of 5MB. +
 _Default:_ `5000000`
diff --git a/src/helpers.ts b/src/helpers.ts
index fbf4ff334..94d59b062 100644
--- a/src/helpers.ts
+++ b/src/helpers.ts
@@ -103,6 +103,24 @@ export interface OnDropDocument<TDocument = unknown> {
   retried: boolean
 }
 
+type BulkResponseItem = Partial<Record<T.BulkOperationType, T.BulkResponseItem>>
+
+export interface OnSuccessDocument<TDocument = unknown> {
+  result: BulkResponseItem
+  document?: TDocument
+}
+
+interface ZippedResult<TDocument = unknown> {
+  result: BulkResponseItem
+  raw: {
+    action: string
+    document?: string
+  }
+  // this is a function so that deserialization is only done when needed
+  // to avoid a performance hit
+  document?: () => TDocument
+}
+
 export interface BulkHelperOptions<TDocument = unknown> extends T.BulkRequest {
   datasource: TDocument[] | Buffer | Readable | AsyncIterator<TDocument>
   onDocument: (doc: TDocument) => Action
@@ -112,6 +130,7 @@ export interface BulkHelperOptions<TDocument = unknown> extends T.BulkRequest {
   retries?: number
   wait?: number
   onDrop?: (doc: OnDropDocument<TDocument>) => void
+  onSuccess?: (doc: OnSuccessDocument<TDocument>) => void
   refreshOnCompletion?: boolean | string
 }
 
@@ -551,6 +570,9 @@ export default class Helpers {
       retries = this[kMaxRetries],
       wait = 5000,
       onDrop = noop,
+      // onSuccess does not default to noop, to avoid the performance hit
+      // of deserializing every document in the bulk request
+      onSuccess,
       refreshOnCompletion = false,
       ...bulkOptions
     } = options
@@ -817,57 +839,93 @@ export default class Helpers {
       callback()
     }
 
+    /**
+     * Zips bulk response items (the action's result) with the original document body.
+     * The raw string versions of the action and document lines are also included.
+     */
+    function zipBulkResults (responseItems: BulkResponseItem[], bulkBody: string[]): ZippedResult[] {
+      const zipped = []
+      let indexSlice = 0
+      for (let i = 0, len = responseItems.length; i < len; i++) {
+        const result = responseItems[i]
+        const operation = Object.keys(result)[0]
+        let zipResult
+
+        if (operation === 'delete') {
+          zipResult = {
+            result,
+            raw: { action: bulkBody[indexSlice] }
+          }
+          indexSlice += 1
+        } else {
+          const document = bulkBody[indexSlice + 1]
+          zipResult = {
+            result,
+            raw: { action: bulkBody[indexSlice], document },
+            // this is a function so that deserialization is only done when needed
+            // to avoid a performance hit
+            document: () => serializer.deserialize(document)
+          }
+          indexSlice += 2
+        }
+
+        zipped.push(zipResult as ZippedResult)
+      }
+
+      return zipped
+    }
+
     function tryBulk (bulkBody: string[], callback: (err: Error | null, bulkBody: string[]) => void): void {
       if (shouldAbort) return callback(null, [])
       client.bulk(Object.assign({}, bulkOptions, { body: bulkBody }), reqOptions as TransportRequestOptionsWithMeta)
         .then(response => {
           const result = response.body
+          const results = zipBulkResults(result.items, bulkBody)
+
           if (!result.errors) {
             stats.successful += result.items.length
-            for (const item of result.items) {
-              if (item.update?.result === 'noop') {
+            for (const item of results) {
+              const { result, document = noop } = item
+              if (result.update?.result === 'noop') {
                 stats.noop++
               }
+              if (onSuccess != null) onSuccess({ result, document: document() })
             }
             return callback(null, [])
           }
           const retry = []
-          const { items } = result
-          let indexSlice = 0
-          for (let i = 0, len = items.length; i < len; i++) {
-            const action = items[i]
-            const operation = Object.keys(action)[0]
+          for (const item of results) {
+            const { result, raw, document = noop } = item
+            const operation = Object.keys(result)[0]
             // @ts-expect-error
-            const responseItem = action[operation as keyof T.BulkResponseItemContainer]
+            const responseItem = result[operation as keyof T.BulkResponseItemContainer]
             assert(responseItem !== undefined, 'The responseItem is undefined, please file a bug report')
             if (responseItem.status >= 400) {
-              // 429 is the only staus code where we might want to retry
+              // 429 is the only status code where we might want to retry
               // a document, because it was not an error in the document itself,
-              // but the ES node were handling too many operations.
+              // but the ES node was handling too many operations.
               if (responseItem.status === 429) {
-                retry.push(bulkBody[indexSlice])
+                retry.push(raw.action)
                 /* istanbul ignore next */
                 if (operation !== 'delete') {
-                  retry.push(bulkBody[indexSlice + 1])
+                  retry.push(raw.document ?? '')
                 }
               } else {
                 onDrop({
                   status: responseItem.status,
                   error: responseItem.error ?? null,
-                  operation: serializer.deserialize(bulkBody[indexSlice]),
+                  operation: serializer.deserialize(raw.action),
                   // @ts-expect-error
-                  document: operation !== 'delete'
-                    ? serializer.deserialize(bulkBody[indexSlice + 1])
-                    : null,
+                  document: document(),
                   retried: isRetrying
                 })
                 stats.failed += 1
               }
             } else {
               stats.successful += 1
+              if (onSuccess != null) onSuccess({ result, document: document() })
             }
-            operation === 'delete' ? indexSlice += 1 : indexSlice += 2
           }
           callback(null, retry)
         })
diff --git a/test/unit/helpers/bulk.test.ts b/test/unit/helpers/bulk.test.ts
index 62c297ebf..0a15c3fc6 100644
--- a/test/unit/helpers/bulk.test.ts
+++ b/test/unit/helpers/bulk.test.ts
@@ -514,7 +514,7 @@ test('bulk index', t => {
 
   t.test('Server error', async t => {
     const MockConnection = connection.buildMockConnection({
-      onRequest (params) {
+      onRequest (_params) {
         return {
           statusCode: 500,
           body: { somothing: 'went wrong' }
@@ -530,12 +530,12 @@ test('bulk index', t => {
       datasource: dataset.slice(),
       flushBytes: 1,
       concurrency: 1,
-      onDocument (doc) {
+      onDocument (_doc) {
         return {
           index: { _index: 'test' }
         }
       },
-      onDrop (doc) {
+      onDrop (_doc) {
         t.fail('This should never be called')
       }
     })
@@ -550,7 +550,7 @@ test('bulk index', t => {
 
   t.test('Server error (high flush size, to trigger the finish error)', async t => {
     const MockConnection = connection.buildMockConnection({
-      onRequest (params) {
+      onRequest (_params) {
         return {
           statusCode: 500,
           body: { somothing: 'went wrong' }
@@ -566,12 +566,12 @@ test('bulk index', t => {
       datasource: dataset.slice(),
       flushBytes: 5000000,
       concurrency: 1,
-      onDocument (doc) {
+      onDocument (_doc) {
         return {
           index: { _index: 'test' }
         }
       },
-      onDrop (doc) {
+      onDrop (_doc) {
         t.fail('This should never be called')
       }
     })
@@ -625,12 +625,12 @@ test('bulk index', t => {
       flushBytes: 1,
       concurrency: 1,
       wait: 10,
-      onDocument (doc) {
+      onDocument (_doc) {
         return {
           index: { _index: 'test' }
         }
       },
-      onDrop (doc) {
+      onDrop (_doc) {
         b.abort()
       }
     })
@@ -651,7 +651,7 @@ test('bulk index', t => {
   t.test('Invalid operation', t => {
     t.plan(2)
     const MockConnection = connection.buildMockConnection({
-      onRequest (params) {
+      onRequest (_params) {
         return { body: { errors: false, items: [{}] } }
       }
     })
@@ -666,7 +666,7 @@ test('bulk index', t => {
       flushBytes: 1,
       concurrency: 1,
       // @ts-expect-error
-      onDocument (doc) {
+      onDocument (_doc) {
         return {
           foo: { _index: 'test' }
         }
@@ -678,6 +678,43 @@ test('bulk index', t => {
     })
   })
 
+  t.test('should call onSuccess callback for each indexed document', async t => {
+    const MockConnection = connection.buildMockConnection({
+      onRequest (params) {
+        // @ts-expect-error
+        let [action] = params.body.split('\n')
+        action = JSON.parse(action)
+        return { body: { errors: false, items: [action] } }
+      }
+    })
+
+    const client = new Client({
+      node: 'http://localhost:9200',
+      Connection: MockConnection
+    })
+
+    let count = 0
+    await client.helpers.bulk({
+      datasource: dataset.slice(),
+      flushBytes: 1,
+      concurrency: 1,
+      onDocument (_doc) {
+        return {
+          index: { _index: 'test' }
+        }
+      },
+      onSuccess ({ result, document }) {
+        t.same(result, { index: { _index: 'test' }})
+        t.same(document, dataset[count++])
+      },
+      onDrop (_doc) {
+        t.fail('This should never be called')
+      }
+    })
+    t.equal(count, 3)
+    t.end()
+  })
+
   t.end()
 })
 
@@ -731,6 +768,44 @@ test('bulk index', t => {
     })
   })
 
+  t.test('onSuccess is called for each indexed document', async t => {
+    const MockConnection = connection.buildMockConnection({
+      onRequest (params) {
+        // @ts-expect-error
+        let [action] = params.body.split('\n')
+        action = JSON.parse(action)
+        return { body: { errors: false, items: [action] } }
+      }
+    })
+
+    const client = new Client({
+      node: 'http://localhost:9200',
+      Connection: MockConnection
+    })
+    const stream = createReadStream(join(__dirname, '..', '..', 'fixtures', 'small-dataset.ndjson'), 'utf8')
+
+    let count = 0
+    await client.helpers.bulk({
+      datasource: stream.pipe(split()),
+      flushBytes: 1,
+      concurrency: 1,
+      onDocument (_doc) {
+        return {
+          index: { _index: 'test' }
+        }
+      },
+      onSuccess ({ result, document }) {
+        t.same(result, { index: { _index: 'test' }})
+        t.same(document, dataset[count++])
+      },
+      onDrop (_doc) {
+        t.fail('This should never be called')
+      }
+    })
+    t.equal(count, 3)
+    t.end()
+  })
+
   t.end()
 })
 
@@ -785,6 +860,50 @@ test('bulk index', t => {
       aborted: false
     })
   })
+
+  t.test('onSuccess is called for each indexed document', async t => {
+    const MockConnection = connection.buildMockConnection({
+      onRequest (params) {
+        // @ts-expect-error
+        let [action] = params.body.split('\n')
+        action = JSON.parse(action)
+        return { body: { errors: false, items: [action] } }
+      }
+    })
+
+    const client = new Client({
+      node: 'http://localhost:9200',
+      Connection: MockConnection
+    })
+
+    async function * generator () {
+      const data = dataset.slice()
+      for (const doc of data) {
+        yield doc
+      }
+    }
+
+    let count = 0
+    await client.helpers.bulk({
+      datasource: generator(),
+      flushBytes: 1,
+      concurrency: 1,
+      onDocument (_doc) {
+        return {
+          index: { _index: 'test' }
+        }
+      },
+      onSuccess ({ result, document }) {
+        t.same(result, { index: { _index: 'test' }})
+        t.same(document, dataset[count++])
+      },
+      onDrop (_doc) {
+        t.fail('This should never be called')
+      }
+    })
+    t.equal(count, 3)
+    t.end()
+  })
 
   t.end()
 })
@@ -943,6 +1062,8 @@ test('bulk create', t => {
     })
   })
+
+  t.end()
 })
@@ -1279,6 +1400,63 @@ test('bulk delete', t => {
 
     server.stop()
   })
 
+  t.test('should call onSuccess callback with delete action object', async t => {
+    const MockConnection = connection.buildMockConnection({
+      onRequest (params) {
+        // @ts-expect-error
+        let [action, payload] = params.body.split('\n')
+        action = JSON.parse(action)
+        return { body: { errors: false, items: [action] } }
+      }
+    })
+
+    const client = new Client({
+      node: 'http://localhost:9200',
+      Connection: MockConnection
+    })
+
+    let docCount = 0
+    let successCount = 0
+    await client.helpers.bulk({
+      datasource: dataset.slice(),
+      flushBytes: 1,
+      concurrency: 1,
+      onDocument (_doc) {
+        if (docCount++ === 1) {
+          return {
+            delete: {
+              _index: 'test',
+              _id: String(docCount)
+            }
+          }
+        } else {
+          return {
+            index: { _index: 'test' }
+          }
+        }
+      },
+      onSuccess ({ result, document }) {
+        const item = dataset[successCount]
+        if (successCount++ === 1) {
+          t.same(result, {
+            delete: {
+              _index: 'test',
+              _id: String(successCount)
+            }
+          })
+        } else {
+          t.same(result, { index: { _index: 'test' }})
+          t.same(document, item)
+        }
+      },
+      onDrop (_doc) {
+        t.fail('This should never be called')
+      }
+    })
+
+    t.end()
+  })
+
   t.end()
 })
@@ -1594,152 +1772,153 @@ test('Flush interval', t => {
     })
   })
 
-  t.end()
-})
-
-test(`flush timeout does not lock process when flushInterval is less than server timeout`, async t => {
-  const flushInterval = 500
+  test(`flush timeout does not lock process when flushInterval is less than server timeout`, async t => {
+    const flushInterval = 500
 
-  async function handler (req: http.IncomingMessage, res: http.ServerResponse) {
-    setTimeout(() => {
-      res.writeHead(200, { 'content-type': 'application/json' })
-      res.end(JSON.stringify({ errors: false, items: [{}] }))
-    }, 1000)
-  }
+    async function handler (req: http.IncomingMessage, res: http.ServerResponse) {
+      setTimeout(() => {
+        res.writeHead(200, { 'content-type': 'application/json' })
+        res.end(JSON.stringify({ errors: false, items: [{}] }))
+      }, 1000)
+    }
 
-  const [{ port }, server] = await buildServer(handler)
-  const client = new Client({ node: `http://localhost:${port}` })
+    const [{ port }, server] = await buildServer(handler)
+    const client = new Client({ node: `http://localhost:${port}` })
 
-  async function * generator () {
-    const data = dataset.slice()
-    for (const doc of data) {
-      await sleep(flushInterval)
-      yield doc
-    }
-  }
+    async function * generator () {
+      const data = dataset.slice()
+      for (const doc of data) {
+        await sleep(flushInterval)
+        yield doc
+      }
+    }
 
-  const result = await client.helpers.bulk({
-    datasource: Readable.from(generator()),
-    flushBytes: 1,
-    flushInterval: flushInterval,
-    concurrency: 1,
-    onDocument (_) {
-      return {
-        index: { _index: 'test' }
-      }
-    },
-    onDrop (_) {
-      t.fail('This should never be called')
-    }
-  })
+    const result = await client.helpers.bulk({
+      datasource: Readable.from(generator()),
+      flushBytes: 1,
+      flushInterval: flushInterval,
+      concurrency: 1,
+      onDocument (_) {
+        return {
+          index: { _index: 'test' }
+        }
+      },
+      onDrop (_) {
+        t.fail('This should never be called')
+      }
+    })
 
-  t.type(result.time, 'number')
-  t.type(result.bytes, 'number')
-  t.match(result, {
-    total: 3,
-    successful: 3,
-    retry: 0,
-    failed: 0,
-    aborted: false
-  })
-
-  server.stop()
-})
+    t.type(result.time, 'number')
+    t.type(result.bytes, 'number')
+    t.match(result, {
+      total: 3,
+      successful: 3,
+      retry: 0,
+      failed: 0,
+      aborted: false
+    })
 
-test(`flush timeout does not lock process when flushInterval is greater than server timeout`, async t => {
-  const flushInterval = 500
+    server.stop()
+  })
 
-  async function handler (req: http.IncomingMessage, res: http.ServerResponse) {
-    setTimeout(() => {
-      res.writeHead(200, { 'content-type': 'application/json' })
-      res.end(JSON.stringify({ errors: false, items: [{}] }))
-    }, 250)
-  }
+  test(`flush timeout does not lock process when flushInterval is greater than server timeout`, async t => {
+    const flushInterval = 500
 
-  const [{ port }, server] = await buildServer(handler)
-  const client = new Client({ node: `http://localhost:${port}` })
+    async function handler (req: http.IncomingMessage, res: http.ServerResponse) {
+      setTimeout(() => {
+        res.writeHead(200, { 'content-type': 'application/json' })
+        res.end(JSON.stringify({ errors: false, items: [{}] }))
+      }, 250)
+    }
 
-  async function * generator () {
-    const data = dataset.slice()
-    for (const doc of data) {
-      await sleep(flushInterval)
-      yield doc
-    }
-  }
+    const [{ port }, server] = await buildServer(handler)
+    const client = new Client({ node: `http://localhost:${port}` })
 
-  const result = await client.helpers.bulk({
-    datasource: Readable.from(generator()),
-    flushBytes: 1,
-    flushInterval: flushInterval,
-    concurrency: 1,
-    onDocument (_) {
-      return {
-        index: { _index: 'test' }
-      }
-    },
-    onDrop (_) {
-      t.fail('This should never be called')
-    }
-  })
+    async function * generator () {
+      const data = dataset.slice()
+      for (const doc of data) {
+        await sleep(flushInterval)
+        yield doc
+      }
+    }
 
-  t.type(result.time, 'number')
-  t.type(result.bytes, 'number')
-  t.match(result, {
-    total: 3,
-    successful: 3,
-    retry: 0,
-    failed: 0,
-    aborted: false
-  })
-
-  server.stop()
-})
+    const result = await client.helpers.bulk({
+      datasource: Readable.from(generator()),
+      flushBytes: 1,
+      flushInterval: flushInterval,
+      concurrency: 1,
+      onDocument (_) {
+        return {
+          index: { _index: 'test' }
+        }
+      },
+      onDrop (_) {
+        t.fail('This should never be called')
+      }
+    })
 
-test(`flush timeout does not lock process when flushInterval is equal to server timeout`, async t => {
-  const flushInterval = 500
+    t.type(result.time, 'number')
+    t.type(result.bytes, 'number')
+    t.match(result, {
+      total: 3,
+      successful: 3,
+      retry: 0,
+      failed: 0,
+      aborted: false
+    })
 
-  async function handler (req: http.IncomingMessage, res: http.ServerResponse) {
-    setTimeout(() => {
-      res.writeHead(200, { 'content-type': 'application/json' })
-      res.end(JSON.stringify({ errors: false, items: [{}] }))
-    }, flushInterval)
-  }
+    server.stop()
+  })
 
-  const [{ port }, server] = await buildServer(handler)
-  const client = new Client({ node: `http://localhost:${port}` })
+  test(`flush timeout does not lock process when flushInterval is equal to server timeout`, async t => {
+    const flushInterval = 500
 
-  async function * generator () {
-    const data = dataset.slice()
-    for (const doc of data) {
-      await sleep(flushInterval)
-      yield doc
-    }
-  }
+    async function handler (req: http.IncomingMessage, res: http.ServerResponse) {
+      setTimeout(() => {
+        res.writeHead(200, { 'content-type': 'application/json' })
+        res.end(JSON.stringify({ errors: false, items: [{}] }))
+      }, flushInterval)
+    }
 
-  const result = await client.helpers.bulk({
-    datasource: Readable.from(generator()),
-    flushBytes: 1,
-    flushInterval: flushInterval,
-    concurrency: 1,
-    onDocument (_) {
-      return {
-        index: { _index: 'test' }
-      }
-    },
-    onDrop (_) {
-      t.fail('This should never be called')
-    }
-  })
+    const [{ port }, server] = await buildServer(handler)
+    const client = new Client({ node: `http://localhost:${port}` })
 
-  t.type(result.time, 'number')
-  t.type(result.bytes, 'number')
-  t.match(result, {
-    total: 3,
-    successful: 3,
-    retry: 0,
-    failed: 0,
-    aborted: false
-  })
+    async function * generator () {
+      const data = dataset.slice()
+      for (const doc of data) {
+        await sleep(flushInterval)
+        yield doc
+      }
+    }
 
-  server.stop()
-})
+    const result = await client.helpers.bulk({
+      datasource: Readable.from(generator()),
+      flushBytes: 1,
+      flushInterval: flushInterval,
+      concurrency: 1,
+      onDocument (_) {
+        return {
+          index: { _index: 'test' }
+        }
+      },
+      onDrop (_) {
+        t.fail('This should never be called')
+      }
+    })
+
+    t.type(result.time, 'number')
+    t.type(result.bytes, 'number')
+    t.match(result, {
+      total: 3,
+      successful: 3,
+      retry: 0,
+      failed: 0,
+      aborted: false
+    })
 
-  server.stop()
-})
+    server.stop()
+  })
+
+  t.end()
+})
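
Example usage of the new `onSuccess` callback (an illustrative sketch only, not part of the patch; the index name and sample documents below are hypothetical):

[source,ts]
----
import { Client } from '@elastic/elasticsearch'

const client = new Client({ node: 'http://localhost:9200' })

// Hypothetical sample documents; any array, Buffer, stream, or async
// iterator can be used as the datasource.
const docs = [
  { user: 'jon', age: 23 },
  { user: 'arya', age: 18 }
]

async function run (): Promise<void> {
  const stats = await client.helpers.bulk({
    datasource: docs,
    onDocument () {
      // 'my-index' is a placeholder index name
      return { index: { _index: 'my-index' } }
    },
    // Added by this patch: called once per successful operation. `result` is
    // the bulk response item for that operation, and `document` is the
    // original source document (absent for deletes, which have no body).
    onSuccess ({ result, document }) {
      console.log(`indexed ${result.index?._id ?? '(unknown id)'}`, document)
    },
    onDrop (doc) {
      console.error('dropped:', doc.status, doc.error)
    }
  })

  console.log(stats) // { total, successful, failed, retry, time, bytes, aborted, noop }
}

run().catch(console.error)
----

Because the helper deserializes the original document lazily (via the `document` thunk on `ZippedResult`), leaving `onSuccess` unset preserves the existing fast path: no per-item deserialization cost is paid unless a callback is provided.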