Skip to content

Commit

Permalink
onSuccess function for bulk helper (#2199)
Browse files Browse the repository at this point in the history
* Bulk helper onSuccess callback

For #2090

Includes refactor of the tryBulk result processing code, to make
iterating over bulk response data easier to understand.

* Add onSuccess tests for each datasource type

* Cleanup, additional comments

* Add documentation for onSuccess callback

* Update changelog

* Drop link to 8.14 release notes.

Page not yet published, breaking docs build.
  • Loading branch information
JoshMock authored Apr 2, 2024
1 parent e2974b0 commit 4aa00e0
Show file tree
Hide file tree
Showing 4 changed files with 407 additions and 151 deletions.
8 changes: 8 additions & 0 deletions docs/changelog.asciidoc
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
[[changelog-client]]
== Release notes

[discrete]
=== 8.14.0

[discrete]
===== `onSuccess` callback added to bulk helper

The bulk helper now supports an `onSuccess` callback that will be called for each successful operation. https://github.com/elastic/elasticsearch-js/pull/2199[#2199]

[discrete]
=== 8.13.0

Expand Down
11 changes: 11 additions & 0 deletions docs/helpers.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,17 @@ const b = client.helpers.bulk({
})
----

|`onSuccess`
a|A function that is called for each successful operation in the bulk request, which includes the result from Elasticsearch along with the original document that was sent, or `null` for delete operations.
[source,js]
----
const b = client.helpers.bulk({
onSuccess ({ result, document }) {
console.log(`SUCCESS: Document ${result.index._id} indexed to ${result.index._index}`)
}
})
----

|`flushBytes`
a|The size of the bulk body in bytes to reach before to send it. Default of 5MB. +
_Default:_ `5000000`
Expand Down
92 changes: 75 additions & 17 deletions src/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,24 @@ export interface OnDropDocument<TDocument = unknown> {
retried: boolean
}

type BulkResponseItem = Partial<Record<T.BulkOperationType, T.BulkResponseItem>>

export interface OnSuccessDocument<TDocument = unknown> {
result: BulkResponseItem
document?: TDocument
}

interface ZippedResult<TDocument = unknown> {
result: BulkResponseItem
raw: {
action: string
document?: string
}
// this is a function so that deserialization is only done when needed
// to avoid a performance hit
document?: () => TDocument
}

export interface BulkHelperOptions<TDocument = unknown> extends T.BulkRequest {
datasource: TDocument[] | Buffer | Readable | AsyncIterator<TDocument>
onDocument: (doc: TDocument) => Action
Expand All @@ -112,6 +130,7 @@ export interface BulkHelperOptions<TDocument = unknown> extends T.BulkRequest {
retries?: number
wait?: number
onDrop?: (doc: OnDropDocument<TDocument>) => void
onSuccess?: (doc: OnSuccessDocument) => void
refreshOnCompletion?: boolean | string
}

Expand Down Expand Up @@ -551,6 +570,9 @@ export default class Helpers {
retries = this[kMaxRetries],
wait = 5000,
onDrop = noop,
// onSuccess does not default to noop, to avoid the performance hit
// of deserializing every document in the bulk request
onSuccess,
refreshOnCompletion = false,
...bulkOptions
} = options
Expand Down Expand Up @@ -817,57 +839,93 @@ export default class Helpers {
callback()
}

/**
* Zips bulk response items (the action's result) with the original document body.
* The raw string version of action and document lines are also included.
*/
function zipBulkResults (responseItems: BulkResponseItem[], bulkBody: string[]): ZippedResult[] {
const zipped = []
let indexSlice = 0
for (let i = 0, len = responseItems.length; i < len; i++) {
const result = responseItems[i]
const operation = Object.keys(result)[0]
let zipResult

if (operation === 'delete') {
zipResult = {
result,
raw: { action: bulkBody[indexSlice] }
}
indexSlice += 1
} else {
const document = bulkBody[indexSlice + 1]
zipResult = {
result,
raw: { action: bulkBody[indexSlice], document },
// this is a function so that deserialization is only done when needed
// to avoid a performance hit
document: () => serializer.deserialize(document)
}
indexSlice += 2
}

zipped.push(zipResult as ZippedResult)
}

return zipped
}

function tryBulk (bulkBody: string[], callback: (err: Error | null, bulkBody: string[]) => void): void {
if (shouldAbort) return callback(null, [])
client.bulk(Object.assign({}, bulkOptions, { body: bulkBody }), reqOptions as TransportRequestOptionsWithMeta)
.then(response => {
const result = response.body
const results = zipBulkResults(result.items, bulkBody)

if (!result.errors) {
stats.successful += result.items.length
for (const item of result.items) {
if (item.update?.result === 'noop') {
for (const item of results) {
const { result, document = noop } = item
if (result.update?.result === 'noop') {
stats.noop++
}
if (onSuccess != null) onSuccess({ result, document: document() })
}
return callback(null, [])
}
const retry = []
const { items } = result
let indexSlice = 0
for (let i = 0, len = items.length; i < len; i++) {
const action = items[i]
const operation = Object.keys(action)[0]
for (const item of results) {
const { result, raw, document = noop } = item
const operation = Object.keys(result)[0]
// @ts-expect-error
const responseItem = action[operation as keyof T.BulkResponseItemContainer]
const responseItem = result[operation as keyof T.BulkResponseItemContainer]
assert(responseItem !== undefined, 'The responseItem is undefined, please file a bug report')

if (responseItem.status >= 400) {
// 429 is the only staus code where we might want to retry
// 429 is the only status code where we might want to retry
// a document, because it was not an error in the document itself,
// but the ES node were handling too many operations.
// but the ES node was handling too many operations.
if (responseItem.status === 429) {
retry.push(bulkBody[indexSlice])
retry.push(raw.action)
/* istanbul ignore next */
if (operation !== 'delete') {
retry.push(bulkBody[indexSlice + 1])
retry.push(raw.document ?? '')
}
} else {
onDrop({
status: responseItem.status,
error: responseItem.error ?? null,
operation: serializer.deserialize(bulkBody[indexSlice]),
operation: serializer.deserialize(raw.action),
// @ts-expect-error
document: operation !== 'delete'
? serializer.deserialize(bulkBody[indexSlice + 1])
: null,
document: document(),
retried: isRetrying
})
stats.failed += 1
}
} else {
stats.successful += 1
if (onSuccess != null) onSuccess({ result, document: document() })
}
operation === 'delete' ? indexSlice += 1 : indexSlice += 2
}
callback(null, retry)
})
Expand Down
Loading

0 comments on commit 4aa00e0

Please sign in to comment.