Skip to content

Commit

Permalink
feat(workflow): allow send event with no output (run-llama#1479)
Browse files Browse the repository at this point in the history
  • Loading branch information
himself65 authored Nov 14, 2024
1 parent 1d47036 commit ee20c44
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 102 deletions.
6 changes: 6 additions & 0 deletions .changeset/perfect-zebras-live.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@llamaindex/unit-test": patch
"@llamaindex/workflow": patch
---

feat(workflow): allow send event with no output
197 changes: 102 additions & 95 deletions packages/workflow/src/workflow-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,18 @@ export type StepHandler<
AnyWorkflowEventConstructor | StartEventConstructor,
...(AnyWorkflowEventConstructor | StopEventConstructor)[],
] = [AnyWorkflowEventConstructor | StartEventConstructor],
Out extends [
AnyWorkflowEventConstructor | StartEventConstructor,
...(AnyWorkflowEventConstructor | StopEventConstructor)[],
] = [AnyWorkflowEventConstructor | StopEventConstructor],
Out extends (AnyWorkflowEventConstructor | StopEventConstructor)[] = [],
> = (
context: HandlerContext<Data>,
...events: {
[K in keyof Inputs]: InstanceType<Inputs[K]>;
}
) => Promise<
{
[K in keyof Out]: InstanceType<Out[K]>;
}[number]
Out extends []
? void
: {
[K in keyof Out]: InstanceType<Out[K]>;
}[number]
>;

export type ReadonlyStepMap<Data> = ReadonlyMap<
Expand Down Expand Up @@ -275,7 +274,7 @@ export class WorkflowContext<Start = string, Stop = string, Data = unknown>
*/
#createStreamEvents(): AsyncIterableIterator<WorkflowEvent<unknown>> {
const isPendingEvents = new WeakSet<WorkflowEvent<unknown>>();
const pendingTasks = new Set<Promise<WorkflowEvent<unknown>>>();
const pendingTasks = new Set<Promise<WorkflowEvent<unknown> | void>>();
const enqueuedEvents = new Set<WorkflowEvent<unknown>>();
const stream = new ReadableStream<WorkflowEvent<unknown>>({
start: async (controller) => {
Expand Down Expand Up @@ -325,102 +324,104 @@ export class WorkflowContext<Start = string, Stop = string, Data = unknown>
}
const [steps, inputsMap, outputsMap] =
this.#getStepFunction(event);
const nextEventPromises: Promise<WorkflowEvent<unknown>>[] = [
...steps,
]
.map((step) => {
const inputs = [...(inputsMap.get(step) ?? [])];
const acceptableInputs: WorkflowEvent<unknown>[] =
this.#pendingInputQueue.filter((event) =>
inputs.some((input) => event instanceof input),
);
const events: WorkflowEvent<unknown>[] = flattenEvents(
inputs,
[event, ...acceptableInputs],
);
// remove the event from the queue, in case of infinite loop
events.forEach((event) => {
const protocolIdx = this.#queue.findIndex(
(protocol) =>
protocol.type === "event" &&
protocol.event === event,
const nextEventPromises: Promise<WorkflowEvent<unknown> | void>[] =
[...steps]
.map((step) => {
const inputs = [...(inputsMap.get(step) ?? [])];
const acceptableInputs: WorkflowEvent<unknown>[] =
this.#pendingInputQueue.filter((event) =>
inputs.some((input) => event instanceof input),
);
const events: WorkflowEvent<unknown>[] = flattenEvents(
inputs,
[event, ...acceptableInputs],
);
if (protocolIdx !== -1) {
this.#queue.splice(protocolIdx, 1);
// remove the event from the queue, in case of infinite loop
events.forEach((event) => {
const protocolIdx = this.#queue.findIndex(
(protocol) =>
protocol.type === "event" &&
protocol.event === event,
);
if (protocolIdx !== -1) {
this.#queue.splice(protocolIdx, 1);
}
});
if (events.length !== inputs.length) {
if (this.#verbose) {
console.log(
`Not enough inputs for step ${step.name}, waiting for more events`,
);
}
// not enough to run the step, push back to the queue
this.#sendEvent(event);
isPendingEvents.add(event);
return null;
}
if (isPendingEvents.has(event)) {
isPendingEvents.delete(event);
}
});
if (events.length !== inputs.length) {
if (this.#verbose) {
console.log(
`Not enough inputs for step ${step.name}, waiting for more events`,
`Running step ${step.name} with inputs ${events}`,
);
}
// not enough to run the step, push back to the queue
this.#sendEvent(event);
isPendingEvents.add(event);
return null;
}
if (isPendingEvents.has(event)) {
isPendingEvents.delete(event);
}
if (this.#verbose) {
console.log(
`Running step ${step.name} with inputs ${events}`,
);
}
const data = this.data;
return (step as StepHandler<Data>)
.call(
null,
{
get data() {
return data;
const data = this.data;
return (step as StepHandler<Data>)
.call(
null,
{
get data() {
return data;
},
sendEvent: this.#sendEvent,
requireEvent: this.#requireEvent,
},
sendEvent: this.#sendEvent,
requireEvent: this.#requireEvent,
},
// @ts-expect-error IDK why
...events.sort((a, b) => {
const aIndex = inputs.indexOf(
a.constructor as AnyWorkflowEventConstructor,
);
const bIndex = inputs.indexOf(
b.constructor as AnyWorkflowEventConstructor,
);
return aIndex - bIndex;
}),
)
.then((nextEvent) => {
if (this.#verbose) {
console.log(
`Step ${step.name} completed, next event is ${nextEvent}`,
);
}
const outputs = outputsMap.get(step) ?? [];
if (
!outputs.some(
(output) => nextEvent.constructor === output,
)
) {
if (this.#strict) {
const error = Error(
`Step ${step.name} returned an unexpected output event ${nextEvent}`,
// @ts-expect-error IDK why
...events.sort((a, b) => {
const aIndex = inputs.indexOf(
a.constructor as AnyWorkflowEventConstructor,
);
controller.error(error);
} else {
console.warn(
`Step ${step.name} returned an unexpected output event ${nextEvent}`,
const bIndex = inputs.indexOf(
b.constructor as AnyWorkflowEventConstructor,
);
return aIndex - bIndex;
}),
)
.then((nextEvent: void | WorkflowEvent<unknown>) => {
if (nextEvent === undefined) {
return;
}
}
if (!(nextEvent instanceof StopEvent)) {
this.#pendingInputQueue.unshift(nextEvent);
this.#sendEvent(nextEvent);
}
return nextEvent;
});
})
.filter((promise) => promise !== null);
if (this.#verbose) {
console.log(
`Step ${step.name} completed, next event is ${nextEvent}`,
);
}
const outputs = outputsMap.get(step) ?? [];
if (
!outputs.some(
(output) => nextEvent.constructor === output,
)
) {
if (this.#strict) {
const error = Error(
`Step ${step.name} returned an unexpected output event ${nextEvent}`,
);
controller.error(error);
} else {
console.warn(
`Step ${step.name} returned an unexpected output event ${nextEvent}`,
);
}
}
if (!(nextEvent instanceof StopEvent)) {
this.#pendingInputQueue.unshift(nextEvent);
this.#sendEvent(nextEvent);
}
return nextEvent;
});
})
.filter((promise) => promise !== null);
nextEventPromises.forEach((promise) => {
pendingTasks.add(promise);
promise
Expand All @@ -433,6 +434,9 @@ export class WorkflowContext<Start = string, Stop = string, Data = unknown>
});
Promise.race(nextEventPromises)
.then((fastestNextEvent) => {
if (fastestNextEvent === undefined) {
return;
}
if (!enqueuedEvents.has(fastestNextEvent)) {
controller.enqueue(fastestNextEvent);
enqueuedEvents.add(fastestNextEvent);
Expand All @@ -441,7 +445,10 @@ export class WorkflowContext<Start = string, Stop = string, Data = unknown>
})
.then(async (fastestNextEvent) =>
Promise.all(nextEventPromises).then((nextEvents) => {
for (const nextEvent of nextEvents) {
const events = nextEvents.filter(
(event) => event !== undefined,
);
for (const nextEvent of events) {
// do not enqueue the same event twice
if (fastestNextEvent !== nextEvent) {
if (!enqueuedEvents.has(nextEvent)) {
Expand Down
13 changes: 6 additions & 7 deletions packages/workflow/src/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,7 @@ export class Workflow<ContextData, Start, Stop> {
AnyWorkflowEventConstructor | StartEventConstructor,
...(AnyWorkflowEventConstructor | StopEventConstructor)[],
],
const Out extends [
AnyWorkflowEventConstructor | StopEventConstructor,
...(AnyWorkflowEventConstructor | StopEventConstructor)[],
],
const Out extends (AnyWorkflowEventConstructor | StopEventConstructor)[],
>(
parameters: StepParameters<In, Out>,
stepFn: (
Expand All @@ -69,9 +66,11 @@ export class Workflow<ContextData, Start, Stop> {
[K in keyof In]: InstanceType<In[K]>;
}
) => Promise<
{
[K in keyof Out]: InstanceType<Out[K]>;
}[number]
Out extends []
? void
: {
[K in keyof Out]: InstanceType<Out[K]>;
}[number]
>,
): this {
const { inputs, outputs } = parameters;
Expand Down
15 changes: 15 additions & 0 deletions unit/workflow/workflow.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,21 @@ describe("workflow basic", () => {
const result = await myWorkflow.run("start");
expect(result.data).toBe("query result");
});

test("allow output with send event", async () => {
const myFlow = new Workflow<unknown, string, string>({ verbose: true });
myFlow.addStep(
{
inputs: [StartEvent<string>],
outputs: [],
},
async (context, ev) => {
context.sendEvent(new StopEvent(`Hello ${ev.data}!`));
},
);
const result = myFlow.run("world");
expect((await result).data).toBe("Hello world!");
});
});

describe("workflow event loop", () => {
Expand Down

0 comments on commit ee20c44

Please sign in to comment.