From 1607a0d3f78a6e60c44d62c5326dc36dedb972ea Mon Sep 17 00:00:00 2001 From: Josh Mock Date: Mon, 5 Feb 2024 23:58:21 -0600 Subject: [PATCH] Fix hang in bulk helper semaphore when server responses are slower than flushInterval (#2027) * Set version to 8.10.1 * Add tests for bulk helper with various flush and server timeouts * Copy and empty bulkBody when flushBytes is reached Before it was waiting until after semaphore resolved, then sending with a reference to bulkBody. If flushInterval is reached after `await semaphore()` but before `send(bulkBody)`, onFlushTimeout is "stealing" bulkBody so that there is nothing left in bulkBody for the flushBytes block to send, causing an indefinite hang for a promise that does not resolve. * comment typo fixes --------- Co-authored-by: Quentin Pradet --- src/helpers.ts | 13 +-- test/unit/helpers/bulk.test.ts | 149 +++++++++++++++++++++++++++++++++ 2 files changed, 156 insertions(+), 6 deletions(-) diff --git a/src/helpers.ts b/src/helpers.ts index efad8b49b..fbf4ff334 100644 --- a/src/helpers.ts +++ b/src/helpers.ts @@ -624,7 +624,7 @@ export default class Helpers { let chunkBytes = 0 timeoutRef = setTimeout(onFlushTimeout, flushInterval) // eslint-disable-line - // @ts-expect-error datasoruce is an iterable + // @ts-expect-error datasource is an iterable for await (const chunk of datasource) { if (shouldAbort) break timeoutRef.refresh() @@ -656,15 +656,16 @@ export default class Helpers { if (chunkBytes >= flushBytes) { stats.bytes += chunkBytes - const send = await semaphore() - send(bulkBody.slice()) + const bulkBodyCopy = bulkBody.slice() bulkBody.length = 0 chunkBytes = 0 + const send = await semaphore() + send(bulkBodyCopy) } } clearTimeout(timeoutRef) - // In some cases the previos http call does not have finished, + // In some cases the previous http call has not finished, // or we didn't reach the flush bytes threshold, so we force one last operation. if (!shouldAbort && chunkBytes > 0) { const send = await semaphore() @@ -708,8 +709,8 @@ export default class Helpers { // to guarantee that no more than the number of operations // allowed to run at the same time are executed. // It returns a semaphore function which resolves in the next tick - // if we didn't reach the maximim concurrency yet, otherwise it returns - // a promise that resolves as soon as one of the running request has finshed. + // if we didn't reach the maximum concurrency yet, otherwise it returns + // a promise that resolves as soon as one of the running requests has finished. // The semaphore function resolves a send function, which will be used // to send the actual bulk request. // It also returns a finish function, which returns a promise that is resolved diff --git a/test/unit/helpers/bulk.test.ts b/test/unit/helpers/bulk.test.ts index 2c3229ce9..62c297ebf 100644 --- a/test/unit/helpers/bulk.test.ts +++ b/test/unit/helpers/bulk.test.ts @@ -23,9 +23,11 @@ import { createReadStream } from 'fs' import * as http from 'http' import { join } from 'path' import split from 'split2' +import { Readable } from 'stream' import { test } from 'tap' import { Client, errors } from '../../../' import { buildServer, connection } from '../../utils' +const { sleep } = require('../../integration/helper') let clientVersion: string = require('../../../package.json').version // eslint-disable-line if (clientVersion.includes('-')) { @@ -1594,3 +1596,150 @@ test('Flush interval', t => { t.end() }) + +test(`flush timeout does not lock process when flushInterval is less than server timeout`, async t => { + const flushInterval = 500 + + async function handler (req: http.IncomingMessage, res: http.ServerResponse) { + setTimeout(() => { + res.writeHead(200, { 'content-type': 'application/json' }) + res.end(JSON.stringify({ errors: false, items: [{}] })) + }, 1000) + } + + const [{ port }, server] = await buildServer(handler) + const client = new Client({ node: `http://localhost:${port}` }) + + async function * generator () { + const data = dataset.slice() + for (const doc of data) { + await sleep(flushInterval) + yield doc + } + } + + const result = await client.helpers.bulk({ + datasource: Readable.from(generator()), + flushBytes: 1, + flushInterval: flushInterval, + concurrency: 1, + onDocument (_) { + return { + index: { _index: 'test' } + } + }, + onDrop (_) { + t.fail('This should never be called') + } + }) + + t.type(result.time, 'number') + t.type(result.bytes, 'number') + t.match(result, { + total: 3, + successful: 3, + retry: 0, + failed: 0, + aborted: false + }) + + server.stop() +}) + +test(`flush timeout does not lock process when flushInterval is greater than server timeout`, async t => { + const flushInterval = 500 + + async function handler (req: http.IncomingMessage, res: http.ServerResponse) { + setTimeout(() => { + res.writeHead(200, { 'content-type': 'application/json' }) + res.end(JSON.stringify({ errors: false, items: [{}] })) + }, 250) + } + + const [{ port }, server] = await buildServer(handler) + const client = new Client({ node: `http://localhost:${port}` }) + + async function * generator () { + const data = dataset.slice() + for (const doc of data) { + await sleep(flushInterval) + yield doc + } + } + + const result = await client.helpers.bulk({ + datasource: Readable.from(generator()), + flushBytes: 1, + flushInterval: flushInterval, + concurrency: 1, + onDocument (_) { + return { + index: { _index: 'test' } + } + }, + onDrop (_) { + t.fail('This should never be called') + } + }) + + t.type(result.time, 'number') + t.type(result.bytes, 'number') + t.match(result, { + total: 3, + successful: 3, + retry: 0, + failed: 0, + aborted: false + }) + + server.stop() +}) + +test(`flush timeout does not lock process when flushInterval is equal to server timeout`, async t => { + const flushInterval = 500 + + async function handler (req: http.IncomingMessage, res: http.ServerResponse) { + setTimeout(() => { + res.writeHead(200, { 'content-type': 'application/json' }) + res.end(JSON.stringify({ errors: false, items: [{}] })) + }, flushInterval) + } + + const [{ port }, server] = await buildServer(handler) + const client = new Client({ node: `http://localhost:${port}` }) + + async function * generator () { + const data = dataset.slice() + for (const doc of data) { + await sleep(flushInterval) + yield doc + } + } + + const result = await client.helpers.bulk({ + datasource: Readable.from(generator()), + flushBytes: 1, + flushInterval: flushInterval, + concurrency: 1, + onDocument (_) { + return { + index: { _index: 'test' } + } + }, + onDrop (_) { + t.fail('This should never be called') + } + }) + + t.type(result.time, 'number') + t.type(result.bytes, 'number') + t.match(result, { + total: 3, + successful: 3, + retry: 0, + failed: 0, + aborted: false + }) + + server.stop() +})