Skip to content

Commit

Permalink
feat: allow custom class name for SubGraphWorker via factory function
Browse files Browse the repository at this point in the history
  • Loading branch information
provos committed Dec 4, 2024
1 parent 2c2bdd5 commit ef1d793
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 2 deletions.
2 changes: 2 additions & 0 deletions src/planai/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from .cached_task import CachedTaskWorker
from .graph import Graph
from .graph_task import SubGraphWorker
from .joined_task import InitialTaskWorker, JoinedTaskWorker
from .llm_config import llm_from_config
from .llm_task import CachedLLMTaskWorker, LLMTaskWorker
Expand All @@ -33,4 +34,5 @@
"CachedLLMTaskWorker",
"JoinedTaskWorker",
"PydanticDictWrapper",
"SubGraphWorker",
]
43 changes: 41 additions & 2 deletions src/planai/graph_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
PRIVATE_STATE_KEY = "_graph_task_private_state"


class SubGraphWorker(TaskWorker):
class SubGraphWorkerInternal(TaskWorker):
graph: Graph = Field(
..., description="The graph that will be run as part of this TaskWorker"
)
Expand Down Expand Up @@ -84,6 +84,43 @@ def consume_work(self, task: Task):
self.graph._add_work(self.entry_worker, new_task)


def SubGraphWorker(
*,
graph: Graph,
entry_worker: TaskWorker,
exit_worker: TaskWorker,
name: str = "SubGraphWorker",
) -> SubGraphWorkerInternal:
"""
Factory function to create a SubGraphWorker that manages a subgraph within a larger PlanAI graph.
Parameters
----------
name : str, optional
Custom name for the SubGraphWorker class, defaults to "SubGraphWorker"
graph : Graph
The graph that will be run as part of this TaskWorker
entry_worker : TaskWorker
The entry point worker of the graph that receives initial tasks
exit_worker : TaskWorker
The exit point worker of the graph that produces final outputs
Must have exactly one output type
Returns
-------
SubGraphWorkerInternal
A new instance of SubGraphWorker with the specified configuration
Raises
------
ValueError
If the exit_worker has more than one output type
"""
# Create a new class with the custom name
CustomClass = type(name, (SubGraphWorkerInternal,), {})
return CustomClass(graph=graph, entry_worker=entry_worker, exit_worker=exit_worker)


def main():
import argparse
import random
Expand Down Expand Up @@ -164,7 +201,9 @@ def consume_work(self, task: Task3WorkItem):
sub_graph.set_dependency(task1, task2)

# Create the graph task
graph_task = SubGraphWorker(graph=sub_graph, entry_worker=task1, exit_worker=task2)
graph_task = SubGraphWorker(
name="SubGraph", graph=sub_graph, entry_worker=task1, exit_worker=task2
)

# Create the final consumer
task3 = Task3Worker()
Expand Down

0 comments on commit ef1d793

Please sign in to comment.