From a056aa117625686737dd619fe48aab0817851458 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Mon, 13 Jan 2025 18:23:41 +0100 Subject: [PATCH] address more PR comments --- .../cli/src/__tests__/workflow-runner.test.ts | 2 +- .../execution-lifecycle-hooks-factory.test.ts | 64 +++++++++---------- .../execution-lifecycle-hooks-factory.ts | 24 +++---- .../executions/execution-recovery.service.ts | 2 +- packages/cli/src/scaling/job-processor.ts | 2 +- packages/cli/src/workflow-runner.ts | 12 ++-- packages/core/src/NodeExecuteFunctions.ts | 2 +- packages/core/src/TriggersAndPollers.ts | 8 ++- packages/core/src/WorkflowExecute.ts | 18 +++--- .../execution-lifecycle-hooks.test.ts | 24 ++++--- .../core/src/execution-lifecycle-hooks.ts | 41 ++++++------ .../node-execution-context/execute-context.ts | 2 +- .../supply-data-context.ts | 4 +- .../core/test/NodeExecuteFunctions.test.ts | 10 +-- packages/core/test/TriggersAndPollers.test.ts | 6 +- packages/core/test/WorkflowExecute.test.ts | 4 +- packages/core/test/helpers/index.ts | 4 +- packages/nodes-base/test/nodes/Helpers.ts | 4 +- 18 files changed, 117 insertions(+), 116 deletions(-) diff --git a/packages/cli/src/__tests__/workflow-runner.test.ts b/packages/cli/src/__tests__/workflow-runner.test.ts index 2ea22f1e395ae..fed27d282f311 100644 --- a/packages/cli/src/__tests__/workflow-runner.test.ts +++ b/packages/cli/src/__tests__/workflow-runner.test.ts @@ -64,7 +64,7 @@ describe('processError', () => { workflow = await createWorkflow({}, owner); execution = await createExecution({ status: 'success', finished: true }, workflow); hooks = new core.ExecutionLifecycleHooks('webhook', execution.id, workflow); - hooks.addHook('workflowExecuteAfter', watcher.workflowExecuteAfter); + hooks.addCallback('workflowExecuteAfter', watcher.workflowExecuteAfter); }); test('processError should return early in Bull stalled edge case', async () => { diff --git a/packages/cli/src/execution-lifecycle-hooks/__tests__/execution-lifecycle-hooks-factory.test.ts b/packages/cli/src/execution-lifecycle-hooks/__tests__/execution-lifecycle-hooks-factory.test.ts index b949326d2cb75..86cc4b9a6972b 100644 --- a/packages/cli/src/execution-lifecycle-hooks/__tests__/execution-lifecycle-hooks-factory.test.ts +++ b/packages/cli/src/execution-lifecycle-hooks/__tests__/execution-lifecycle-hooks-factory.test.ts @@ -95,11 +95,11 @@ describe('ExecutionLifecycleHooksFactory', () => { const hooks = hooksFactory.forMainProcess(executionData, executionId); // Test workflowExecuteBefore hook - await hooks.executeHook('workflowExecuteBefore', []); + await hooks.runHook('workflowExecuteBefore', []); expect(externalHooks.run).toHaveBeenCalledWith('workflow.preExecute', [undefined, 'manual']); // Test nodeExecuteBefore hook - await hooks.executeHook('nodeExecuteBefore', ['testNode']); + await hooks.runHook('nodeExecuteBefore', ['testNode']); expect(push.send).toHaveBeenCalledWith( { type: 'nodeExecuteBefore', @@ -113,7 +113,7 @@ describe('ExecutionLifecycleHooksFactory', () => { const taskData = mock({}); const runExecutionData: IRunExecutionData = { resultData: { runData: {} } }; - await hooks.executeHook('nodeExecuteAfter', ['testNode', taskData, runExecutionData]); + await hooks.runHook('nodeExecuteAfter', ['testNode', taskData, runExecutionData]); expect(push.send).toHaveBeenCalledWith( { @@ -125,7 +125,7 @@ describe('ExecutionLifecycleHooksFactory', () => { push.send.mockClear(); // Test workflowExecuteAfter hook - await hooks.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]); + await hooks.runHook('workflowExecuteAfter', [fullRunData, newStaticData]); expect(push.send).toHaveBeenCalledWith( { @@ -148,7 +148,7 @@ describe('ExecutionLifecycleHooksFactory', () => { it('should handle waiting status in workflowExecuteAfter', async () => { const hooks = hooksFactory.forMainProcess(executionData, executionId); - await hooks.executeHook('workflowExecuteAfter', [ + await hooks.runHook('workflowExecuteAfter', [ { ...fullRunData, status: 'waiting', @@ -167,7 +167,7 @@ describe('ExecutionLifecycleHooksFactory', () => { it('should not update for manual executions', async () => { const hooks = hooksFactory.forMainProcess(executionData, executionId); - await hooks.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]); + await hooks.runHook('workflowExecuteAfter', [fullRunData, newStaticData]); expect(workflowStaticDataService.saveStaticDataById).not.toHaveBeenCalled(); }); @@ -181,7 +181,7 @@ describe('ExecutionLifecycleHooksFactory', () => { executionId, ); - await hooks.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]); + await hooks.runHook('workflowExecuteAfter', [fullRunData, newStaticData]); expect(workflowStaticDataService.saveStaticDataById).toHaveBeenCalledWith( workflowData.id, @@ -197,7 +197,7 @@ describe('ExecutionLifecycleHooksFactory', () => { // Mock workflow settings to not save manual executions workflowData.settings = { saveManualExecutions: false }; - await hooks.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]); + await hooks.runHook('workflowExecuteAfter', [fullRunData, newStaticData]); expect(executionRepository.softDelete).toHaveBeenCalledWith(executionId); }); @@ -223,7 +223,7 @@ describe('ExecutionLifecycleHooksFactory', () => { .calledWith(executionId) .mockResolvedValue(fullExecutionData); - await hooks.executeHook('nodeExecuteAfter', ['testNode', taskData, runExecutionData]); + await hooks.runHook('nodeExecuteAfter', ['testNode', taskData, runExecutionData]); expect(fullExecutionData.data.resultData.lastNodeExecuted).toBe('testNode'); expect(executionRepository.updateExistingExecution).toHaveBeenCalledWith( @@ -262,7 +262,7 @@ describe('ExecutionLifecycleHooksFactory', () => { }, }; - await hooks.executeHook('workflowExecuteAfter', [failedRunData, newStaticData]); + await hooks.runHook('workflowExecuteAfter', [failedRunData, newStaticData]); // Verify error workflow execution expect(executeErrorWorkflowSpy).toHaveBeenCalledWith( @@ -288,7 +288,7 @@ describe('ExecutionLifecycleHooksFactory', () => { const error = new Error('Static data save failed'); workflowStaticDataService.saveStaticDataById.mockRejectedValueOnce(error); - await hooks.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]); + await hooks.runHook('workflowExecuteAfter', [fullRunData, newStaticData]); expect(errorReporter.error).toHaveBeenCalledWith(error); }); @@ -299,7 +299,7 @@ describe('ExecutionLifecycleHooksFactory', () => { const error = new Error('DB save failed'); executionRepository.updateExistingExecution.mockRejectedValueOnce(error); - await hooks.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]); + await hooks.runHook('workflowExecuteAfter', [fullRunData, newStaticData]); expect(errorReporter.error).toHaveBeenCalledWith(error); }); @@ -321,7 +321,7 @@ describe('ExecutionLifecycleHooksFactory', () => { }, }; - await hooks.executeHook('workflowExecuteAfter', [runDataWithMetadata, newStaticData]); + await hooks.runHook('workflowExecuteAfter', [runDataWithMetadata, newStaticData]); expect(executionMetadataService.save).toHaveBeenCalledWith(executionId, { someMetadata: 'value', @@ -346,7 +346,7 @@ describe('ExecutionLifecycleHooksFactory', () => { }, }; - await hooks.executeHook('workflowExecuteAfter', [runDataWithMetadata, newStaticData]); + await hooks.runHook('workflowExecuteAfter', [runDataWithMetadata, newStaticData]); expect(errorReporter.error).toHaveBeenCalledWith(error); }); @@ -380,7 +380,7 @@ describe('ExecutionLifecycleHooksFactory', () => { optionalParameters, ); - await hooks.executeHook('workflowExecuteBefore', []); + await hooks.runHook('workflowExecuteBefore', []); expect(externalHooks.run).toHaveBeenCalledWith('workflow.preExecute', [undefined, 'manual']); }); @@ -396,7 +396,7 @@ describe('ExecutionLifecycleHooksFactory', () => { finished: false, }; - await hooks.executeHook('workflowExecuteAfter', [unfinishedRunData]); + await hooks.runHook('workflowExecuteAfter', [unfinishedRunData]); expect(executionRepository.hardDelete).not.toHaveBeenCalled(); }); @@ -415,7 +415,7 @@ describe('ExecutionLifecycleHooksFactory', () => { finished: true, }; - await hooks.executeHook('workflowExecuteAfter', [successRunData]); + await hooks.runHook('workflowExecuteAfter', [successRunData]); expect(executionRepository.hardDelete).toHaveBeenCalledWith({ workflowId, @@ -437,7 +437,7 @@ describe('ExecutionLifecycleHooksFactory', () => { finished: true, }; - await hooks.executeHook('workflowExecuteAfter', [errorRunData]); + await hooks.runHook('workflowExecuteAfter', [errorRunData]); expect(executionRepository.hardDelete).toHaveBeenCalledWith({ workflowId, @@ -459,7 +459,7 @@ describe('ExecutionLifecycleHooksFactory', () => { finished: true, }; - await hooks.executeHook('workflowExecuteAfter', [successRunData]); + await hooks.runHook('workflowExecuteAfter', [successRunData]); expect(executionRepository.hardDelete).not.toHaveBeenCalled(); }); @@ -477,7 +477,7 @@ describe('ExecutionLifecycleHooksFactory', () => { finished: true, }; - await hooks.executeHook('workflowExecuteAfter', [runData]); + await hooks.runHook('workflowExecuteAfter', [runData]); expect(runData.status).toBe('success'); }, @@ -513,7 +513,7 @@ describe('ExecutionLifecycleHooksFactory', () => { optionalParameters, ); - await hooks.executeHook('workflowExecuteBefore', []); + await hooks.runHook('workflowExecuteBefore', []); expect(externalHooks.run).toHaveBeenCalledWith('workflow.preExecute', [ undefined, @@ -547,7 +547,7 @@ describe('ExecutionLifecycleHooksFactory', () => { .calledWith(executionId) .mockResolvedValue(fullExecutionData); - await hooks.executeHook('nodeExecuteAfter', ['testNode', taskData, runExecutionData]); + await hooks.runHook('nodeExecuteAfter', ['testNode', taskData, runExecutionData]); expect(fullExecutionData.data.resultData.lastNodeExecuted).toBe('testNode'); expect(executionRepository.updateExistingExecution).toHaveBeenCalledWith( @@ -566,14 +566,14 @@ describe('ExecutionLifecycleHooksFactory', () => { optionalParameters, ); - await hooks.executeHook('nodeExecuteBefore', ['testNode']); + await hooks.runHook('nodeExecuteBefore', ['testNode']); expect(eventService.emit).toHaveBeenCalledWith('node-pre-execute', { executionId, workflow: workflowData, nodeName: 'testNode', }); - await hooks.executeHook('nodeExecuteAfter', ['testNode']); + await hooks.runHook('nodeExecuteAfter', ['testNode']); expect(eventService.emit).toHaveBeenCalledWith('node-post-execute', { executionId, workflow: workflowData, @@ -589,7 +589,7 @@ describe('ExecutionLifecycleHooksFactory', () => { optionalParameters, ); - await hooks.executeHook('workflowExecuteBefore', []); + await hooks.runHook('workflowExecuteBefore', []); expect(eventService.emit).toHaveBeenCalledWith('workflow-pre-execute', { executionId, data: workflowData, @@ -605,7 +605,7 @@ describe('ExecutionLifecycleHooksFactory', () => { ); const testNode = { name: 'Test Node' }; - await hooks.executeHook('nodeFetchedData', [workflowId, testNode]); + await hooks.runHook('nodeFetchedData', [workflowId, testNode]); expect(workflowStatisticsService.emit).toHaveBeenCalledWith('nodeFetchedData', { workflowId, node: testNode, @@ -630,7 +630,7 @@ describe('ExecutionLifecycleHooksFactory', () => { finished: true, }; - await hooks.executeHook('workflowExecuteAfter', [successRunData, {}]); + await hooks.runHook('workflowExecuteAfter', [successRunData, {}]); expect(executionRepository.updateExistingExecution).toHaveBeenCalled(); expect(workflowStatisticsService.emit).toHaveBeenCalledWith('workflowExecutionCompleted', { @@ -661,7 +661,7 @@ describe('ExecutionLifecycleHooksFactory', () => { finished: true, }; - await hooks.executeHook('workflowExecuteAfter', [errorRunData, {}]); + await hooks.runHook('workflowExecuteAfter', [errorRunData, {}]); expect(executionRepository.updateExistingExecution).toHaveBeenCalled(); expect(workflowStatisticsService.emit).toHaveBeenCalledWith('workflowExecutionCompleted', { @@ -693,7 +693,7 @@ describe('ExecutionLifecycleHooksFactory', () => { finished: true, }; - await hooks.executeHook('workflowExecuteAfter', [runDataWithMetadata, {}]); + await hooks.runHook('workflowExecuteAfter', [runDataWithMetadata, {}]); expect(executionMetadataService.save).toHaveBeenCalledWith(executionId, { parameter: 'test', @@ -709,7 +709,7 @@ describe('ExecutionLifecycleHooksFactory', () => { ); const newStaticData = { newKey: 'newValue' }; - await hooks.executeHook('workflowExecuteAfter', [ + await hooks.runHook('workflowExecuteAfter', [ { data: { resultData: { runData: {} } }, mode: 'webhook', @@ -739,7 +739,7 @@ describe('ExecutionLifecycleHooksFactory', () => { const error = new Error('Save failed'); executionRepository.updateExistingExecution.mockRejectedValueOnce(error); - await hooks.executeHook('workflowExecuteAfter', [ + await hooks.runHook('workflowExecuteAfter', [ { data: { resultData: { runData: {} } }, mode: 'manual', @@ -777,7 +777,7 @@ describe('ExecutionLifecycleHooksFactory', () => { finished: true, }; - await hooks.executeHook('workflowExecuteAfter', [runDataWithMetadata, {}]); + await hooks.runHook('workflowExecuteAfter', [runDataWithMetadata, {}]); expect(errorReporter.error).toHaveBeenCalledWith(error); }); diff --git a/packages/cli/src/execution-lifecycle-hooks/execution-lifecycle-hooks-factory.ts b/packages/cli/src/execution-lifecycle-hooks/execution-lifecycle-hooks-factory.ts index e97ec7319fccc..0b17b454ad899 100644 --- a/packages/cli/src/execution-lifecycle-hooks/execution-lifecycle-hooks-factory.ts +++ b/packages/cli/src/execution-lifecycle-hooks/execution-lifecycle-hooks-factory.ts @@ -80,7 +80,7 @@ export class ExecutionLifecycleHooksFactory { const { executionRepository } = this; // TODO: >>> clear all nodeExecuteAfter hooks <<< // hookFunctions.nodeExecuteAfter = []; - hooks.addHook('workflowExecuteAfter', async function (fullRunData) { + hooks.addCallback('workflowExecuteAfter', async function (fullRunData) { // Don't delete executions before they are finished if (!fullRunData.finished) return; @@ -124,12 +124,12 @@ export class ExecutionLifecycleHooksFactory { private addPreExecuteHooks(hooks: ExecutionLifecycleHooks) { const { externalHooks } = this; - hooks.addHook('workflowExecuteBefore', async function (workflow) { + hooks.addCallback('workflowExecuteBefore', async function (workflow) { await externalHooks.run('workflow.preExecute', [workflow, this.mode]); }); // TODO: skip this if saveSettings.progress is not true - hooks.addHook('nodeExecuteAfter', async function (nodeName, data, executionData) { + hooks.addCallback('nodeExecuteAfter', async function (nodeName, data, executionData) { await saveExecutionProgress( this.workflowData, this.executionId, @@ -143,22 +143,22 @@ export class ExecutionLifecycleHooksFactory { private addEventHooks(hooks: ExecutionLifecycleHooks) { const { eventService, workflowStatisticsService } = this; - hooks.addHook('nodeExecuteBefore', async function (nodeName) { + hooks.addCallback('nodeExecuteBefore', async function (nodeName) { const { executionId, workflowData: workflow } = this; eventService.emit('node-pre-execute', { executionId, workflow, nodeName }); }); - hooks.addHook('nodeExecuteAfter', async function (nodeName) { + hooks.addCallback('nodeExecuteAfter', async function (nodeName) { const { executionId, workflowData: workflow } = this; eventService.emit('node-post-execute', { executionId, workflow, nodeName }); }); - hooks.addHook('workflowExecuteBefore', async function () { + hooks.addCallback('workflowExecuteBefore', async function () { const { executionId, workflowData } = this; eventService.emit('workflow-pre-execute', { executionId, data: workflowData }); }); - hooks.addHook('nodeFetchedData', async (workflowId, node) => { + hooks.addCallback('nodeFetchedData', async (workflowId, node) => { workflowStatisticsService.emit('nodeFetchedData', { workflowId, node }); }); } @@ -176,7 +176,7 @@ export class ExecutionLifecycleHooksFactory { const factory = this; // eslint-disable-next-line complexity - hooks.addHook('workflowExecuteAfter', async function (fullRunData, newStaticData) { + hooks.addCallback('workflowExecuteAfter', async function (fullRunData, newStaticData) { logger.debug('Executing hook (hookFunctionsSave)', { executionId: this.executionId, workflowId: this.workflowData.id, @@ -303,7 +303,7 @@ export class ExecutionLifecycleHooksFactory { private addPushHooks(hooks: ExecutionLifecycleHooks) { const { logger, push } = this; - hooks.addHook('nodeExecuteBefore', async function (nodeName) { + hooks.addCallback('nodeExecuteBefore', async function (nodeName) { const { pushRef, executionId } = this; // Push data to session which started workflow before each // node which starts rendering @@ -320,7 +320,7 @@ export class ExecutionLifecycleHooksFactory { push.send({ type: 'nodeExecuteBefore', data: { executionId, nodeName } }, pushRef); }); - hooks.addHook('nodeExecuteAfter', async function (nodeName, data) { + hooks.addCallback('nodeExecuteAfter', async function (nodeName, data) { const { pushRef, executionId } = this; // Push data to session which started workflow after each rendered node if (pushRef === undefined) { @@ -336,7 +336,7 @@ export class ExecutionLifecycleHooksFactory { push.send({ type: 'nodeExecuteAfter', data: { executionId, nodeName, data } }, pushRef); }); - hooks.addHook('workflowExecuteBefore', async function (_workflow, data) { + hooks.addCallback('workflowExecuteBefore', async function (_workflow, data) { const { pushRef, executionId } = this; const { id: workflowId, name: workflowName } = this.workflowData; logger.debug('Executing hook (hookFunctionsPush)', { @@ -367,7 +367,7 @@ export class ExecutionLifecycleHooksFactory { ); }); - hooks.addHook('workflowExecuteAfter', async function (fullRunData) { + hooks.addCallback('workflowExecuteAfter', async function (fullRunData) { const { pushRef, executionId } = this; if (pushRef === undefined) return; diff --git a/packages/cli/src/executions/execution-recovery.service.ts b/packages/cli/src/executions/execution-recovery.service.ts index e7a097d37f79c..06c733bc62f35 100644 --- a/packages/cli/src/executions/execution-recovery.service.ts +++ b/packages/cli/src/executions/execution-recovery.service.ts @@ -205,6 +205,6 @@ export class ExecutionRecoveryService { status: execution.status, }; - await lifecycleHooks.executeHook('workflowExecuteAfter', [run]); + await lifecycleHooks.runHook('workflowExecuteAfter', [run]); } } diff --git a/packages/cli/src/scaling/job-processor.ts b/packages/cli/src/scaling/job-processor.ts index 9ae54055e3bca..d92a6eee28d4a 100644 --- a/packages/cli/src/scaling/job-processor.ts +++ b/packages/cli/src/scaling/job-processor.ts @@ -138,7 +138,7 @@ export class JobProcessor { additionalData.sendDataToUI = WorkflowExecuteAdditionalData.sendDataToUI.bind({ pushRef }); } - additionalData.hooks.addHook('sendResponse', async (response) => { + additionalData.hooks.addCallback('sendResponse', async (response) => { const msg: RespondToWebhookMessage = { kind: 'respond-to-webhook', executionId, diff --git a/packages/cli/src/workflow-runner.ts b/packages/cli/src/workflow-runner.ts index 9af554cc17e75..0e1533977398f 100644 --- a/packages/cli/src/workflow-runner.ts +++ b/packages/cli/src/workflow-runner.ts @@ -122,7 +122,7 @@ export class WorkflowRunner { // set the execution to failed. this.activeExecutions.finalizeExecution(executionId, fullRunData); - await hooks?.executeHook('workflowExecuteAfter', [fullRunData]); + await hooks?.runHook('workflowExecuteAfter', [fullRunData]); } /** Run the workflow @@ -146,8 +146,8 @@ export class WorkflowRunner { // Create a failed execution with the data for the node, save it and abort execution const runData = generateFailedExecutionFromError(data.executionMode, error, error.node); const hooks = executionLifecycleHooksFactory.forMainProcess(data, executionId); - await hooks.executeHook('workflowExecuteBefore', [undefined, data.executionData]); - await hooks.executeHook('workflowExecuteAfter', [runData]); + await hooks.runHook('workflowExecuteBefore', [undefined, data.executionData]); + await hooks.runHook('workflowExecuteAfter', [runData]); responsePromise?.reject(error); this.activeExecutions.finalizeExecution(executionId); return executionId; @@ -274,7 +274,7 @@ export class WorkflowRunner { const executionLifecycleHooksFactory = Container.get(ExecutionLifecycleHooksFactory); additionalData.hooks = executionLifecycleHooksFactory.forMainProcess(data, executionId); - additionalData.hooks.addHook('sendResponse', async (response) => { + additionalData.hooks.addCallback('sendResponse', async (response) => { this.activeExecutions.resolveResponsePromise(executionId, response); }); @@ -381,7 +381,7 @@ export class WorkflowRunner { // Normally also workflow should be supplied here but as it only used for sending // data to editor-UI is not needed. - await lifecycleHooks.executeHook('workflowExecuteBefore', [undefined, data.executionData]); + await lifecycleHooks.runHook('workflowExecuteBefore', [undefined, data.executionData]); } catch (error) { // We use "getWorkflowHooksWorkerExecuter" as "getLifecycleHooksForWorkerMain" does not contain the // "workflowExecuteAfter" which we require. @@ -453,7 +453,7 @@ export class WorkflowRunner { // Normally also static data should be supplied here but as it only used for sending // data to editor-UI is not needed. - await lifecycleHooks.executeHook('workflowExecuteAfter', [runData]); + await lifecycleHooks.runHook('workflowExecuteAfter', [runData]); resolve(runData); }, diff --git a/packages/core/src/NodeExecuteFunctions.ts b/packages/core/src/NodeExecuteFunctions.ts index 30c1213d7eca1..2ff5b9269a93f 100644 --- a/packages/core/src/NodeExecuteFunctions.ts +++ b/packages/core/src/NodeExecuteFunctions.ts @@ -773,7 +773,7 @@ export async function proxyRequestToAxios( } else if (body === '') { body = axiosConfig.responseType === 'arraybuffer' ? Buffer.alloc(0) : undefined; } - await additionalData?.hooks?.executeHook('nodeFetchedData', [workflow?.id, node]); + await additionalData?.hooks?.runHook('nodeFetchedData', [workflow?.id, node]); return configObject.resolveWithFullResponse ? { body, diff --git a/packages/core/src/TriggersAndPollers.ts b/packages/core/src/TriggersAndPollers.ts index a55e241904902..e9e09a9b1ca29 100644 --- a/packages/core/src/TriggersAndPollers.ts +++ b/packages/core/src/TriggersAndPollers.ts @@ -55,10 +55,12 @@ export class TriggersAndPollers { donePromise?: IDeferredPromise, ) => { if (responsePromise) { - hooks.addHook('sendResponse', async (response) => responsePromise.resolve(response)); + hooks.addCallback('sendResponse', async (response) => + responsePromise.resolve(response), + ); } if (donePromise) { - hooks.addHook('workflowExecuteAfter', async (runData) => + hooks.addCallback('workflowExecuteAfter', async (runData) => donePromise.resolve(runData), ); } @@ -70,7 +72,7 @@ export class TriggersAndPollers { (rejectEmit) => (error: Error, responsePromise?: IDeferredPromise) => { if (responsePromise) { - hooks.addHook('sendResponse', async () => responsePromise.reject(error)); + hooks.addCallback('sendResponse', async () => responsePromise.reject(error)); } rejectEmit(error); diff --git a/packages/core/src/WorkflowExecute.ts b/packages/core/src/WorkflowExecute.ts index c99b837744fd9..80a57dd9c920b 100644 --- a/packages/core/src/WorkflowExecute.ts +++ b/packages/core/src/WorkflowExecute.ts @@ -1270,14 +1270,14 @@ export class WorkflowExecute { this.status = 'canceled'; this.abortController.abort(); const fullRunData = this.getFullRunData(startedAt); - void this.additionalData.hooks?.executeHook('workflowExecuteAfter', [fullRunData]); + void this.additionalData.hooks?.runHook('workflowExecuteAfter', [fullRunData]); }); // eslint-disable-next-line complexity const returnPromise = (async () => { try { if (!this.additionalData.restartExecutionId) { - await this.additionalData.hooks?.executeHook('workflowExecuteBefore', [ + await this.additionalData.hooks?.runHook('workflowExecuteBefore', [ workflow, this.runExecutionData, ]); @@ -1364,7 +1364,7 @@ export class WorkflowExecute { node: executionNode.name, workflowId: workflow.id, }); - await this.additionalData.hooks?.executeHook('nodeExecuteBefore', [executionNode.name]); + await this.additionalData.hooks?.runHook('nodeExecuteBefore', [executionNode.name]); // Get the index of the current run runIndex = 0; @@ -1655,7 +1655,7 @@ export class WorkflowExecute { this.runExecutionData.executionData!.nodeExecutionStack.unshift(executionData); // Only execute the nodeExecuteAfter hook if the node did not get aborted if (!this.isCancelled) { - await this.additionalData.hooks?.executeHook('nodeExecuteAfter', [ + await this.additionalData.hooks?.runHook('nodeExecuteAfter', [ executionNode.name, taskData, this.runExecutionData, @@ -1697,7 +1697,7 @@ export class WorkflowExecute { this.runExecutionData.resultData.runData[executionNode.name].push(taskData); if (this.runExecutionData.waitTill) { - await this.additionalData.hooks?.executeHook('nodeExecuteAfter', [ + await this.additionalData.hooks?.runHook('nodeExecuteAfter', [ executionNode.name, taskData, this.runExecutionData, @@ -1716,7 +1716,7 @@ export class WorkflowExecute { ) { // Before stopping, make sure we are executing hooks so // That frontend is notified for example for manual executions. - await this.additionalData.hooks?.executeHook('nodeExecuteAfter', [ + await this.additionalData.hooks?.runHook('nodeExecuteAfter', [ executionNode.name, taskData, this.runExecutionData, @@ -1826,7 +1826,7 @@ export class WorkflowExecute { // Execute hooks now to make sure that all hooks are executed properly // Await is needed to make sure that we don't fall into concurrency problems // When saving node execution data - await this.additionalData.hooks?.executeHook('nodeExecuteAfter', [ + await this.additionalData.hooks?.runHook('nodeExecuteAfter', [ executionNode.name, taskData, this.runExecutionData, @@ -2030,7 +2030,7 @@ export class WorkflowExecute { this.moveNodeMetadata(); await this.additionalData.hooks - ?.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]) + ?.runHook('workflowExecuteAfter', [fullRunData, newStaticData]) .catch( // eslint-disable-next-line @typescript-eslint/no-shadow (error) => { @@ -2124,7 +2124,7 @@ export class WorkflowExecute { this.moveNodeMetadata(); // Prevent from running the hook if the error is an abort error as it was already handled if (!this.isCancelled) { - await this.additionalData.hooks?.executeHook('workflowExecuteAfter', [ + await this.additionalData.hooks?.runHook('workflowExecuteAfter', [ fullRunData, newStaticData, ]); diff --git a/packages/core/src/__tests__/execution-lifecycle-hooks.test.ts b/packages/core/src/__tests__/execution-lifecycle-hooks.test.ts index e4b0c0c97c801..75d54e98657d8 100644 --- a/packages/core/src/__tests__/execution-lifecycle-hooks.test.ts +++ b/packages/core/src/__tests__/execution-lifecycle-hooks.test.ts @@ -10,7 +10,7 @@ import type { Workflow, } from 'n8n-workflow'; -import type { ExecutionLifecycleHookName, RegisteredHooks } from '@/execution-lifecycle-hooks'; +import type { ExecutionLifecycleHookName, Callbacks } from '@/execution-lifecycle-hooks'; import { ExecutionLifecycleHooks } from '@/execution-lifecycle-hooks'; describe('ExecutionLifecycleHooks', () => { @@ -36,7 +36,7 @@ describe('ExecutionLifecycleHooks', () => { expect(hooks.pushRef).toBe(pushRef); expect(hooks.retryOf).toBe(retryOf); // @ts-expect-error private property - expect(hooks.registered).toEqual({ + expect(hooks.callbacks).toEqual({ nodeExecuteAfter: [], nodeExecuteBefore: [], nodeFetchedData: [], @@ -50,7 +50,7 @@ describe('ExecutionLifecycleHooks', () => { describe('addHook()', () => { const hooksHandler = mock<{ - [K in keyof RegisteredHooks]: RegisteredHooks[K][number]; + [K in keyof Callbacks]: Callbacks[K][number]; }>(); const testCases: Array<{ hook: ExecutionLifecycleHookName; args: unknown[] }> = [ @@ -66,8 +66,8 @@ describe('ExecutionLifecycleHooks', () => { ]; test.each(testCases)('should add and process $hook hooks', async ({ hook, args }) => { - hooks.addHook(hook, hooksHandler[hook]); - await hooks.executeHook(hook, args); + hooks.addCallback(hook, hooksHandler[hook]); + await hooks.runHook(hook, args); expect(hooksHandler[hook]).toHaveBeenCalledWith(...args); }); }); @@ -82,8 +82,8 @@ describe('ExecutionLifecycleHooks', () => { executionOrder.push('hook2'); }); - hooks.addHook('nodeExecuteBefore', hook1, hook2); - await hooks.executeHook('nodeExecuteBefore', ['testNode']); + hooks.addCallback('nodeExecuteBefore', hook1, hook2); + await hooks.runHook('nodeExecuteBefore', ['testNode']); expect(executionOrder).toEqual(['hook1', 'hook2']); expect(hook1).toHaveBeenCalled(); @@ -96,19 +96,17 @@ describe('ExecutionLifecycleHooks', () => { expect(this.mode).toBe('internal'); }); - hooks.addHook('nodeExecuteBefore', hook); - await hooks.executeHook('nodeExecuteBefore', ['testNode']); + hooks.addCallback('nodeExecuteBefore', hook); + await hooks.runHook('nodeExecuteBefore', ['testNode']); expect(hook).toHaveBeenCalled(); }); it('should handle errors in hooks', async () => { const errorHook = jest.fn().mockRejectedValue(new Error('Hook failed')); - hooks.addHook('nodeExecuteBefore', errorHook); + hooks.addCallback('nodeExecuteBefore', errorHook); - await expect(hooks.executeHook('nodeExecuteBefore', ['testNode'])).rejects.toThrow( - 'Hook failed', - ); + await expect(hooks.runHook('nodeExecuteBefore', ['testNode'])).rejects.toThrow('Hook failed'); }); }); }); diff --git a/packages/core/src/execution-lifecycle-hooks.ts b/packages/core/src/execution-lifecycle-hooks.ts index 186708b86260a..ebf30fd20f88f 100644 --- a/packages/core/src/execution-lifecycle-hooks.ts +++ b/packages/core/src/execution-lifecycle-hooks.ts @@ -10,7 +10,7 @@ import type { WorkflowExecuteMode, } from 'n8n-workflow'; -export interface RegisteredHooks { +export type Callbacks = { nodeExecuteBefore: Array<(this: ExecutionLifecycleHooks, nodeName: string) => Promise>; nodeExecuteAfter: Array< @@ -35,13 +35,13 @@ export interface RegisteredHooks { (this: ExecutionLifecycleHooks, response: IExecuteResponsePromiseData) => Promise >; - /** Executed when a node fetches data */ + /** Executed after a node fetches data */ nodeFetchedData: Array< (this: ExecutionLifecycleHooks, workflowId: string, node: INode) => Promise >; -} +}; -export type ExecutionLifecycleHookName = keyof RegisteredHooks; +export type ExecutionLifecycleHookName = keyof Callbacks; export interface ExecutionHooksOptionalParameters { retryOf?: string; @@ -49,8 +49,7 @@ export interface ExecutionHooksOptionalParameters { } /** - * This class serves as a container for execution lifecycle hooks that get triggered during different stages of an execution. - * It manages and executes callback functions registered for specific execution events. + * Contains hooks that trigger at specific events in an execution's lifecycle. Every hook has an array of callbacks to run. * * Common use cases include: * - Saving execution progress to database @@ -63,16 +62,18 @@ export interface ExecutionHooksOptionalParameters { * ```typescript * const hooks = new ExecutionLifecycleHooks(mode, executionId, workflowData); * hooks.add('workflowExecuteAfter, async function(fullRunData) { - * await saveToDatabase(this.executionId, fullRunData); + * await saveToDatabase(executionId, fullRunData); *}); * ``` */ export class ExecutionLifecycleHooks { + /** Session ID of the client that started the execution. */ pushRef?: string; + /** Execution ID of a precious execution, if this is a retry of that execution. */ retryOf?: string; - private readonly registered: RegisteredHooks = { + private readonly callbacks: Callbacks = { nodeExecuteAfter: [], nodeExecuteBefore: [], nodeFetchedData: [], @@ -92,11 +93,19 @@ export class ExecutionLifecycleHooks { this.retryOf = optionalParameters.retryOf ?? undefined; } - async executeHook< - Hook extends keyof RegisteredHooks, - Params extends unknown[] = Parameters[number]>, + addCallback( + hookName: Hook, + ...hookFunctions: Array + ): void { + // @ts-expect-error FIX THIS + this.callbacks[hookName].push(...hookFunctions); + } + + async runHook< + Hook extends keyof Callbacks, + Params extends unknown[] = Parameters[number]>, >(hookName: Hook, parameters: Params) { - const hooks = this.registered[hookName]; + const hooks = this.callbacks[hookName]; for (const hookFunction of hooks) { const typedHookFunction = hookFunction as unknown as ( this: ExecutionLifecycleHooks, @@ -105,12 +114,4 @@ export class ExecutionLifecycleHooks { await typedHookFunction.apply(this, parameters); } } - - addHook( - hookName: Hook, - ...hookFunctions: Array - ): void { - // @ts-expect-error FIX THIS - this.registered[hookName].push(...hookFunctions); - } } diff --git a/packages/core/src/node-execution-context/execute-context.ts b/packages/core/src/node-execution-context/execute-context.ts index 7390e4e52f24a..13de1ecd8d0bd 100644 --- a/packages/core/src/node-execution-context/execute-context.ts +++ b/packages/core/src/node-execution-context/execute-context.ts @@ -192,7 +192,7 @@ export class ExecuteContext extends BaseExecuteContext implements IExecuteFuncti } async sendResponse(response: IExecuteResponsePromiseData): Promise { - await this.additionalData.hooks?.executeHook('sendResponse', [response]); + await this.additionalData.hooks?.runHook('sendResponse', [response]); } /** @deprecated use ISupplyDataFunctions.addInputData */ diff --git a/packages/core/src/node-execution-context/supply-data-context.ts b/packages/core/src/node-execution-context/supply-data-context.ts index d92b788a2c9cc..1fe16761a6d94 100644 --- a/packages/core/src/node-execution-context/supply-data-context.ts +++ b/packages/core/src/node-execution-context/supply-data-context.ts @@ -256,12 +256,12 @@ export class SupplyDataContext extends BaseExecuteContext implements ISupplyData } runExecutionData.resultData.runData[nodeName][currentNodeRunIndex] = taskData; - await additionalData.hooks?.executeHook('nodeExecuteBefore', [nodeName]); + await additionalData.hooks?.runHook('nodeExecuteBefore', [nodeName]); } else { // Outputs taskData.executionTime = new Date().getTime() - taskData.startTime; - await additionalData.hooks?.executeHook('nodeExecuteAfter', [ + await additionalData.hooks?.runHook('nodeExecuteAfter', [ nodeName, taskData, this.runExecutionData, diff --git a/packages/core/test/NodeExecuteFunctions.test.ts b/packages/core/test/NodeExecuteFunctions.test.ts index af1ef8084cdd9..1c12610328f81 100644 --- a/packages/core/test/NodeExecuteFunctions.test.ts +++ b/packages/core/test/NodeExecuteFunctions.test.ts @@ -406,7 +406,7 @@ describe('NodeExecuteFunctions', () => { const node = mock(); beforeEach(() => { - hooks.executeHook.mockClear(); + hooks.runHook.mockClear(); }); test('should rethrow an error with `status` property', async () => { @@ -422,7 +422,7 @@ describe('NodeExecuteFunctions', () => { test('should not throw if the response status is 200', async () => { nock(baseUrl).get('/test').reply(200); await proxyRequestToAxios(workflow, additionalData, node, `${baseUrl}/test`); - expect(hooks.executeHook).toHaveBeenCalledWith('nodeFetchedData', [workflow.id, node]); + expect(hooks.runHook).toHaveBeenCalledWith('nodeFetchedData', [workflow.id, node]); }); test('should throw if the response status is 403', async () => { @@ -442,7 +442,7 @@ describe('NodeExecuteFunctions', () => { expect(error.config).toBeUndefined(); expect(error.message).toEqual('403 - "Forbidden"'); } - expect(hooks.executeHook).not.toHaveBeenCalled(); + expect(hooks.runHook).not.toHaveBeenCalled(); }); test('should not throw if the response status is 404, but `simple` option is set to `false`', async () => { @@ -453,7 +453,7 @@ describe('NodeExecuteFunctions', () => { }); expect(response).toEqual('Not Found'); - expect(hooks.executeHook).toHaveBeenCalledWith('nodeFetchedData', [workflow.id, node]); + expect(hooks.runHook).toHaveBeenCalledWith('nodeFetchedData', [workflow.id, node]); }); test('should return full response when `resolveWithFullResponse` is set to true', async () => { @@ -470,7 +470,7 @@ describe('NodeExecuteFunctions', () => { statusCode: 404, statusMessage: null, }); - expect(hooks.executeHook).toHaveBeenCalledWith('nodeFetchedData', [workflow.id, node]); + expect(hooks.runHook).toHaveBeenCalledWith('nodeFetchedData', [workflow.id, node]); }); describe('redirects', () => { diff --git a/packages/core/test/TriggersAndPollers.test.ts b/packages/core/test/TriggersAndPollers.test.ts index daf7e19c77e8f..6e399147e404f 100644 --- a/packages/core/test/TriggersAndPollers.test.ts +++ b/packages/core/test/TriggersAndPollers.test.ts @@ -91,7 +91,7 @@ describe('TriggersAndPollers', () => { getMockTriggerFunctions()?.emit?.(mockEmitData, responsePromise); - await hooks.executeHook('sendResponse', [{ testResponse: true }]); + await hooks.runHook('sendResponse', [{ testResponse: true }]); expect(responsePromise.resolve).toHaveBeenCalledWith({ testResponse: true }); }); @@ -103,10 +103,10 @@ describe('TriggersAndPollers', () => { await runTriggerHelper('manual'); getMockTriggerFunctions()?.emit?.(mockEmitData, responsePromise, donePromise); - await hooks.executeHook('sendResponse', [{ testResponse: true }]); + await hooks.runHook('sendResponse', [{ testResponse: true }]); expect(responsePromise.resolve).toHaveBeenCalledWith({ testResponse: true }); - await hooks.executeHook('workflowExecuteAfter', [mockRunData, {}]); + await hooks.runHook('workflowExecuteAfter', [mockRunData, {}]); expect(donePromise.resolve).toHaveBeenCalledWith(mockRunData); }); }); diff --git a/packages/core/test/WorkflowExecute.test.ts b/packages/core/test/WorkflowExecute.test.ts index daef555cdf470..17ad834288fe2 100644 --- a/packages/core/test/WorkflowExecute.test.ts +++ b/packages/core/test/WorkflowExecute.test.ts @@ -1226,7 +1226,7 @@ describe('WorkflowExecute', () => { additionalData.hooks = mock(); workflowExecute = new WorkflowExecute(additionalData, 'manual', runExecutionData); - jest.spyOn(additionalData.hooks, 'executeHook').mockResolvedValue(undefined); + jest.spyOn(additionalData.hooks, 'runHook').mockResolvedValue(undefined); jest.spyOn(workflowExecute, 'moveNodeMetadata').mockImplementation(); }); @@ -1294,7 +1294,7 @@ describe('WorkflowExecute', () => { // Verify static data handling expect(result).toBeDefined(); expect(workflowExecute.moveNodeMetadata).toHaveBeenCalled(); - expect(additionalData.hooks!.executeHook).toHaveBeenCalledWith('workflowExecuteAfter', [ + expect(additionalData.hooks!.runHook).toHaveBeenCalledWith('workflowExecuteAfter', [ result, workflow.staticData, ]); diff --git a/packages/core/test/helpers/index.ts b/packages/core/test/helpers/index.ts index 9d36b51f0569a..0b26731319581 100644 --- a/packages/core/test/helpers/index.ts +++ b/packages/core/test/helpers/index.ts @@ -54,10 +54,10 @@ export function WorkflowExecuteAdditionalData( nodeExecutionOrder: string[], ): IWorkflowExecuteAdditionalData { const hooks = new ExecutionLifecycleHooks('trigger', '1', mock()); - hooks.addHook('nodeExecuteAfter', async (nodeName): Promise => { + hooks.addCallback('nodeExecuteAfter', async (nodeName): Promise => { nodeExecutionOrder.push(nodeName); }); - hooks.addHook('workflowExecuteAfter', async (fullRunData): Promise => { + hooks.addCallback('workflowExecuteAfter', async (fullRunData): Promise => { waitPromise.resolve(fullRunData); }); return mock({ hooks }); diff --git a/packages/nodes-base/test/nodes/Helpers.ts b/packages/nodes-base/test/nodes/Helpers.ts index 2f3159c6ff207..21e4f8d255773 100644 --- a/packages/nodes-base/test/nodes/Helpers.ts +++ b/packages/nodes-base/test/nodes/Helpers.ts @@ -157,10 +157,10 @@ export function WorkflowExecuteAdditionalData( nodeExecutionOrder: string[], ): IWorkflowExecuteAdditionalData { const hooks = new ExecutionLifecycleHooks('trigger', '1', mock()); - hooks.addHook('nodeExecuteAfter', async (nodeName) => { + hooks.addCallback('nodeExecuteAfter', async (nodeName) => { nodeExecutionOrder.push(nodeName); }); - hooks.addHook('workflowExecuteAfter', async (fullRunData) => { + hooks.addCallback('workflowExecuteAfter', async (fullRunData) => { waitPromise.resolve(fullRunData); });