Skip to content

Commit

Permalink
address more PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
netroy committed Jan 13, 2025
1 parent cebc665 commit a056aa1
Show file tree
Hide file tree
Showing 18 changed files with 117 additions and 116 deletions.
2 changes: 1 addition & 1 deletion packages/cli/src/__tests__/workflow-runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ describe('processError', () => {
workflow = await createWorkflow({}, owner);
execution = await createExecution({ status: 'success', finished: true }, workflow);
hooks = new core.ExecutionLifecycleHooks('webhook', execution.id, workflow);
hooks.addHook('workflowExecuteAfter', watcher.workflowExecuteAfter);
hooks.addCallback('workflowExecuteAfter', watcher.workflowExecuteAfter);
});

test('processError should return early in Bull stalled edge case', async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ describe('ExecutionLifecycleHooksFactory', () => {
const hooks = hooksFactory.forMainProcess(executionData, executionId);

// Test workflowExecuteBefore hook
await hooks.executeHook('workflowExecuteBefore', []);
await hooks.runHook('workflowExecuteBefore', []);
expect(externalHooks.run).toHaveBeenCalledWith('workflow.preExecute', [undefined, 'manual']);

// Test nodeExecuteBefore hook
await hooks.executeHook('nodeExecuteBefore', ['testNode']);
await hooks.runHook('nodeExecuteBefore', ['testNode']);
expect(push.send).toHaveBeenCalledWith(
{
type: 'nodeExecuteBefore',
Expand All @@ -113,7 +113,7 @@ describe('ExecutionLifecycleHooksFactory', () => {
const taskData = mock<ITaskData>({});
const runExecutionData: IRunExecutionData = { resultData: { runData: {} } };

await hooks.executeHook('nodeExecuteAfter', ['testNode', taskData, runExecutionData]);
await hooks.runHook('nodeExecuteAfter', ['testNode', taskData, runExecutionData]);

expect(push.send).toHaveBeenCalledWith(
{
Expand All @@ -125,7 +125,7 @@ describe('ExecutionLifecycleHooksFactory', () => {
push.send.mockClear();

// Test workflowExecuteAfter hook
await hooks.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]);
await hooks.runHook('workflowExecuteAfter', [fullRunData, newStaticData]);

expect(push.send).toHaveBeenCalledWith(
{
Expand All @@ -148,7 +148,7 @@ describe('ExecutionLifecycleHooksFactory', () => {
it('should handle waiting status in workflowExecuteAfter', async () => {
const hooks = hooksFactory.forMainProcess(executionData, executionId);

await hooks.executeHook('workflowExecuteAfter', [
await hooks.runHook('workflowExecuteAfter', [
{
...fullRunData,
status: 'waiting',
Expand All @@ -167,7 +167,7 @@ describe('ExecutionLifecycleHooksFactory', () => {
it('should not update for manual executions', async () => {
const hooks = hooksFactory.forMainProcess(executionData, executionId);

await hooks.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]);
await hooks.runHook('workflowExecuteAfter', [fullRunData, newStaticData]);

expect(workflowStaticDataService.saveStaticDataById).not.toHaveBeenCalled();
});
Expand All @@ -181,7 +181,7 @@ describe('ExecutionLifecycleHooksFactory', () => {
executionId,
);

await hooks.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]);
await hooks.runHook('workflowExecuteAfter', [fullRunData, newStaticData]);

expect(workflowStaticDataService.saveStaticDataById).toHaveBeenCalledWith(
workflowData.id,
Expand All @@ -197,7 +197,7 @@ describe('ExecutionLifecycleHooksFactory', () => {
// Mock workflow settings to not save manual executions
workflowData.settings = { saveManualExecutions: false };

await hooks.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]);
await hooks.runHook('workflowExecuteAfter', [fullRunData, newStaticData]);

expect(executionRepository.softDelete).toHaveBeenCalledWith(executionId);
});
Expand All @@ -223,7 +223,7 @@ describe('ExecutionLifecycleHooksFactory', () => {
.calledWith(executionId)
.mockResolvedValue(fullExecutionData);

await hooks.executeHook('nodeExecuteAfter', ['testNode', taskData, runExecutionData]);
await hooks.runHook('nodeExecuteAfter', ['testNode', taskData, runExecutionData]);

expect(fullExecutionData.data.resultData.lastNodeExecuted).toBe('testNode');
expect(executionRepository.updateExistingExecution).toHaveBeenCalledWith(
Expand Down Expand Up @@ -262,7 +262,7 @@ describe('ExecutionLifecycleHooksFactory', () => {
},
};

await hooks.executeHook('workflowExecuteAfter', [failedRunData, newStaticData]);
await hooks.runHook('workflowExecuteAfter', [failedRunData, newStaticData]);

// Verify error workflow execution
expect(executeErrorWorkflowSpy).toHaveBeenCalledWith(
Expand All @@ -288,7 +288,7 @@ describe('ExecutionLifecycleHooksFactory', () => {
const error = new Error('Static data save failed');
workflowStaticDataService.saveStaticDataById.mockRejectedValueOnce(error);

await hooks.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]);
await hooks.runHook('workflowExecuteAfter', [fullRunData, newStaticData]);

expect(errorReporter.error).toHaveBeenCalledWith(error);
});
Expand All @@ -299,7 +299,7 @@ describe('ExecutionLifecycleHooksFactory', () => {
const error = new Error('DB save failed');
executionRepository.updateExistingExecution.mockRejectedValueOnce(error);

await hooks.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]);
await hooks.runHook('workflowExecuteAfter', [fullRunData, newStaticData]);

expect(errorReporter.error).toHaveBeenCalledWith(error);
});
Expand All @@ -321,7 +321,7 @@ describe('ExecutionLifecycleHooksFactory', () => {
},
};

await hooks.executeHook('workflowExecuteAfter', [runDataWithMetadata, newStaticData]);
await hooks.runHook('workflowExecuteAfter', [runDataWithMetadata, newStaticData]);

expect(executionMetadataService.save).toHaveBeenCalledWith(executionId, {
someMetadata: 'value',
Expand All @@ -346,7 +346,7 @@ describe('ExecutionLifecycleHooksFactory', () => {
},
};

await hooks.executeHook('workflowExecuteAfter', [runDataWithMetadata, newStaticData]);
await hooks.runHook('workflowExecuteAfter', [runDataWithMetadata, newStaticData]);

expect(errorReporter.error).toHaveBeenCalledWith(error);
});
Expand Down Expand Up @@ -380,7 +380,7 @@ describe('ExecutionLifecycleHooksFactory', () => {
optionalParameters,
);

await hooks.executeHook('workflowExecuteBefore', []);
await hooks.runHook('workflowExecuteBefore', []);
expect(externalHooks.run).toHaveBeenCalledWith('workflow.preExecute', [undefined, 'manual']);
});

Expand All @@ -396,7 +396,7 @@ describe('ExecutionLifecycleHooksFactory', () => {
finished: false,
};

await hooks.executeHook('workflowExecuteAfter', [unfinishedRunData]);
await hooks.runHook('workflowExecuteAfter', [unfinishedRunData]);

expect(executionRepository.hardDelete).not.toHaveBeenCalled();
});
Expand All @@ -415,7 +415,7 @@ describe('ExecutionLifecycleHooksFactory', () => {
finished: true,
};

await hooks.executeHook('workflowExecuteAfter', [successRunData]);
await hooks.runHook('workflowExecuteAfter', [successRunData]);

expect(executionRepository.hardDelete).toHaveBeenCalledWith({
workflowId,
Expand All @@ -437,7 +437,7 @@ describe('ExecutionLifecycleHooksFactory', () => {
finished: true,
};

await hooks.executeHook('workflowExecuteAfter', [errorRunData]);
await hooks.runHook('workflowExecuteAfter', [errorRunData]);

expect(executionRepository.hardDelete).toHaveBeenCalledWith({
workflowId,
Expand All @@ -459,7 +459,7 @@ describe('ExecutionLifecycleHooksFactory', () => {
finished: true,
};

await hooks.executeHook('workflowExecuteAfter', [successRunData]);
await hooks.runHook('workflowExecuteAfter', [successRunData]);

expect(executionRepository.hardDelete).not.toHaveBeenCalled();
});
Expand All @@ -477,7 +477,7 @@ describe('ExecutionLifecycleHooksFactory', () => {
finished: true,
};

await hooks.executeHook('workflowExecuteAfter', [runData]);
await hooks.runHook('workflowExecuteAfter', [runData]);

expect(runData.status).toBe('success');
},
Expand Down Expand Up @@ -513,7 +513,7 @@ describe('ExecutionLifecycleHooksFactory', () => {
optionalParameters,
);

await hooks.executeHook('workflowExecuteBefore', []);
await hooks.runHook('workflowExecuteBefore', []);

expect(externalHooks.run).toHaveBeenCalledWith('workflow.preExecute', [
undefined,
Expand Down Expand Up @@ -547,7 +547,7 @@ describe('ExecutionLifecycleHooksFactory', () => {
.calledWith(executionId)
.mockResolvedValue(fullExecutionData);

await hooks.executeHook('nodeExecuteAfter', ['testNode', taskData, runExecutionData]);
await hooks.runHook('nodeExecuteAfter', ['testNode', taskData, runExecutionData]);

expect(fullExecutionData.data.resultData.lastNodeExecuted).toBe('testNode');
expect(executionRepository.updateExistingExecution).toHaveBeenCalledWith(
Expand All @@ -566,14 +566,14 @@ describe('ExecutionLifecycleHooksFactory', () => {
optionalParameters,
);

await hooks.executeHook('nodeExecuteBefore', ['testNode']);
await hooks.runHook('nodeExecuteBefore', ['testNode']);
expect(eventService.emit).toHaveBeenCalledWith('node-pre-execute', {
executionId,
workflow: workflowData,
nodeName: 'testNode',
});

await hooks.executeHook('nodeExecuteAfter', ['testNode']);
await hooks.runHook('nodeExecuteAfter', ['testNode']);
expect(eventService.emit).toHaveBeenCalledWith('node-post-execute', {
executionId,
workflow: workflowData,
Expand All @@ -589,7 +589,7 @@ describe('ExecutionLifecycleHooksFactory', () => {
optionalParameters,
);

await hooks.executeHook('workflowExecuteBefore', []);
await hooks.runHook('workflowExecuteBefore', []);
expect(eventService.emit).toHaveBeenCalledWith('workflow-pre-execute', {
executionId,
data: workflowData,
Expand All @@ -605,7 +605,7 @@ describe('ExecutionLifecycleHooksFactory', () => {
);
const testNode = { name: 'Test Node' };

await hooks.executeHook('nodeFetchedData', [workflowId, testNode]);
await hooks.runHook('nodeFetchedData', [workflowId, testNode]);
expect(workflowStatisticsService.emit).toHaveBeenCalledWith('nodeFetchedData', {
workflowId,
node: testNode,
Expand All @@ -630,7 +630,7 @@ describe('ExecutionLifecycleHooksFactory', () => {
finished: true,
};

await hooks.executeHook('workflowExecuteAfter', [successRunData, {}]);
await hooks.runHook('workflowExecuteAfter', [successRunData, {}]);

expect(executionRepository.updateExistingExecution).toHaveBeenCalled();
expect(workflowStatisticsService.emit).toHaveBeenCalledWith('workflowExecutionCompleted', {
Expand Down Expand Up @@ -661,7 +661,7 @@ describe('ExecutionLifecycleHooksFactory', () => {
finished: true,
};

await hooks.executeHook('workflowExecuteAfter', [errorRunData, {}]);
await hooks.runHook('workflowExecuteAfter', [errorRunData, {}]);

expect(executionRepository.updateExistingExecution).toHaveBeenCalled();
expect(workflowStatisticsService.emit).toHaveBeenCalledWith('workflowExecutionCompleted', {
Expand Down Expand Up @@ -693,7 +693,7 @@ describe('ExecutionLifecycleHooksFactory', () => {
finished: true,
};

await hooks.executeHook('workflowExecuteAfter', [runDataWithMetadata, {}]);
await hooks.runHook('workflowExecuteAfter', [runDataWithMetadata, {}]);

expect(executionMetadataService.save).toHaveBeenCalledWith(executionId, {
parameter: 'test',
Expand All @@ -709,7 +709,7 @@ describe('ExecutionLifecycleHooksFactory', () => {
);
const newStaticData = { newKey: 'newValue' };

await hooks.executeHook('workflowExecuteAfter', [
await hooks.runHook('workflowExecuteAfter', [
{
data: { resultData: { runData: {} } },
mode: 'webhook',
Expand Down Expand Up @@ -739,7 +739,7 @@ describe('ExecutionLifecycleHooksFactory', () => {
const error = new Error('Save failed');
executionRepository.updateExistingExecution.mockRejectedValueOnce(error);

await hooks.executeHook('workflowExecuteAfter', [
await hooks.runHook('workflowExecuteAfter', [
{
data: { resultData: { runData: {} } },
mode: 'manual',
Expand Down Expand Up @@ -777,7 +777,7 @@ describe('ExecutionLifecycleHooksFactory', () => {
finished: true,
};

await hooks.executeHook('workflowExecuteAfter', [runDataWithMetadata, {}]);
await hooks.runHook('workflowExecuteAfter', [runDataWithMetadata, {}]);

expect(errorReporter.error).toHaveBeenCalledWith(error);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ export class ExecutionLifecycleHooksFactory {
const { executionRepository } = this;
// TODO: >>> clear all nodeExecuteAfter hooks <<<
// hookFunctions.nodeExecuteAfter = [];
hooks.addHook('workflowExecuteAfter', async function (fullRunData) {
hooks.addCallback('workflowExecuteAfter', async function (fullRunData) {
// Don't delete executions before they are finished
if (!fullRunData.finished) return;

Expand Down Expand Up @@ -124,12 +124,12 @@ export class ExecutionLifecycleHooksFactory {
private addPreExecuteHooks(hooks: ExecutionLifecycleHooks) {
const { externalHooks } = this;

hooks.addHook('workflowExecuteBefore', async function (workflow) {
hooks.addCallback('workflowExecuteBefore', async function (workflow) {
await externalHooks.run('workflow.preExecute', [workflow, this.mode]);
});

// TODO: skip this if saveSettings.progress is not true
hooks.addHook('nodeExecuteAfter', async function (nodeName, data, executionData) {
hooks.addCallback('nodeExecuteAfter', async function (nodeName, data, executionData) {
await saveExecutionProgress(
this.workflowData,
this.executionId,
Expand All @@ -143,22 +143,22 @@ export class ExecutionLifecycleHooksFactory {

private addEventHooks(hooks: ExecutionLifecycleHooks) {
const { eventService, workflowStatisticsService } = this;
hooks.addHook('nodeExecuteBefore', async function (nodeName) {
hooks.addCallback('nodeExecuteBefore', async function (nodeName) {
const { executionId, workflowData: workflow } = this;
eventService.emit('node-pre-execute', { executionId, workflow, nodeName });
});

hooks.addHook('nodeExecuteAfter', async function (nodeName) {
hooks.addCallback('nodeExecuteAfter', async function (nodeName) {
const { executionId, workflowData: workflow } = this;
eventService.emit('node-post-execute', { executionId, workflow, nodeName });
});

hooks.addHook('workflowExecuteBefore', async function () {
hooks.addCallback('workflowExecuteBefore', async function () {
const { executionId, workflowData } = this;
eventService.emit('workflow-pre-execute', { executionId, data: workflowData });
});

hooks.addHook('nodeFetchedData', async (workflowId, node) => {
hooks.addCallback('nodeFetchedData', async (workflowId, node) => {
workflowStatisticsService.emit('nodeFetchedData', { workflowId, node });
});
}
Expand All @@ -176,7 +176,7 @@ export class ExecutionLifecycleHooksFactory {
const factory = this;

// eslint-disable-next-line complexity
hooks.addHook('workflowExecuteAfter', async function (fullRunData, newStaticData) {
hooks.addCallback('workflowExecuteAfter', async function (fullRunData, newStaticData) {
logger.debug('Executing hook (hookFunctionsSave)', {
executionId: this.executionId,
workflowId: this.workflowData.id,
Expand Down Expand Up @@ -303,7 +303,7 @@ export class ExecutionLifecycleHooksFactory {
private addPushHooks(hooks: ExecutionLifecycleHooks) {
const { logger, push } = this;

hooks.addHook('nodeExecuteBefore', async function (nodeName) {
hooks.addCallback('nodeExecuteBefore', async function (nodeName) {
const { pushRef, executionId } = this;
// Push data to session which started workflow before each
// node which starts rendering
Expand All @@ -320,7 +320,7 @@ export class ExecutionLifecycleHooksFactory {
push.send({ type: 'nodeExecuteBefore', data: { executionId, nodeName } }, pushRef);
});

hooks.addHook('nodeExecuteAfter', async function (nodeName, data) {
hooks.addCallback('nodeExecuteAfter', async function (nodeName, data) {
const { pushRef, executionId } = this;
// Push data to session which started workflow after each rendered node
if (pushRef === undefined) {
Expand All @@ -336,7 +336,7 @@ export class ExecutionLifecycleHooksFactory {
push.send({ type: 'nodeExecuteAfter', data: { executionId, nodeName, data } }, pushRef);
});

hooks.addHook('workflowExecuteBefore', async function (_workflow, data) {
hooks.addCallback('workflowExecuteBefore', async function (_workflow, data) {
const { pushRef, executionId } = this;
const { id: workflowId, name: workflowName } = this.workflowData;
logger.debug('Executing hook (hookFunctionsPush)', {
Expand Down Expand Up @@ -367,7 +367,7 @@ export class ExecutionLifecycleHooksFactory {
);
});

hooks.addHook('workflowExecuteAfter', async function (fullRunData) {
hooks.addCallback('workflowExecuteAfter', async function (fullRunData) {
const { pushRef, executionId } = this;
if (pushRef === undefined) return;

Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/executions/execution-recovery.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,6 @@ export class ExecutionRecoveryService {
status: execution.status,
};

await lifecycleHooks.executeHook('workflowExecuteAfter', [run]);
await lifecycleHooks.runHook('workflowExecuteAfter', [run]);
}
}
Loading

0 comments on commit a056aa1

Please sign in to comment.