Expanding Lifecycle Adapters for Dynamic DAGs / Parallel Execution #1196

Open
cswartzvi opened this issue Oct 19, 2024 · 2 comments
Labels
Dynamic DAGs enhancement New feature or request

Comments

@cswartzvi
Contributor

cswartzvi commented Oct 19, 2024

Is your feature request related to a problem? Please describe.

I hit a bit of a snag while creating some custom multi-level progress bar lifecycle adapters for task-based parallel DAGs (with rich for the curious). Currently, for task-based DAGs, TaskExecutionHook will only fire before and after a task is executed. The hooks have no knowledge of the overall task landscape, including:

  1. Number (and index) of tasks in the current group
  2. Overall groups in the graph
  3. Details about the expander task parameterization
  4. Type of current task (expander, collector, etc.)
  5. Spawning task ID (if available)

Note: Item 1 was originally discussed on Slack: https://hamilton-opensource.slack.com/archives/C03MANME6G5/p1728403433108319

Describe the solution you'd like

After speaking with @elijahbenizzy, an initial implementation for item 1 was suggested: modify the TaskImplementation object to store the current task index and the total number of tasks. This information would then be wired through various methods in the ExecutionState class and eventually be passed to the lifecycle hooks run_after_task_execution and run_before_task_execution on TaskExecutionHook.
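To make the idea for item 1 concrete, here is a minimal, self-contained sketch. It does not use Hamilton's real classes: TaskInfo, RecordingHook, expand, run, group_index, and group_total are all hypothetical names chosen for illustration. It only shows the shape of the change, namely a task object that carries its index and group size, and an executor that forwards both to the hook as keyword arguments.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class TaskInfo:
    """Hypothetical stand-in for the data a task could carry to its hooks."""

    task_id: str
    # Position of this task within its group; None when it is not in a group.
    group_index: Optional[int] = None
    # Number of tasks in the group; None when the size is not known up front.
    group_total: Optional[int] = None


class RecordingHook:
    """Toy hook that records what it was told before each task."""

    def __init__(self) -> None:
        self.seen: List[str] = []

    def run_before_task_execution(self, *, task_id: str, **kwargs: Any) -> None:
        index = kwargs.get("group_index")
        total = kwargs.get("group_total")
        if index is None or total is None:
            self.seen.append(task_id)
        else:
            self.seen.append(f"{task_id} ({index + 1}/{total})")


def expand(base_id: str, parameters: Dict[str, Any]) -> List[TaskInfo]:
    """Create one task per parameter set, stamping each with index and total."""
    total = len(parameters)
    return [
        TaskInfo(task_id=f"{base_id}.{key}", group_index=i, group_total=total)
        for i, key in enumerate(parameters)
    ]


def run(tasks: List[TaskInfo], hook: RecordingHook) -> None:
    """Forward each task's index and total to the hook as keyword arguments."""
    for task in tasks:
        hook.run_before_task_execution(
            task_id=task.task_id,
            group_index=task.group_index,
            group_total=task.group_total,
        )


hook = RecordingHook()
run([TaskInfo("setup")] + expand("block", {"a": 1, "b": 2, "c": 3}), hook)
```

Keeping both fields optional leaves room for tasks that are not part of a group, or for groups whose size is not known ahead of time.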

While implementing the above in a test branch (https://github.com/cswartzvi/hamilton/tree/update_task_execution_hook), I found that it was still difficult to create a multi-level progress bar without some of the information in items 2-5. To that end, I also added:

  • spawning_task_id and purpose parameters on the methods and hooks associated with TaskExecutionHook
  • A new hook, post_task_group, that runs after the tasks are grouped
  • A new hook, post_task_expand, that runs after the expander task is parameterized

With these additional changes (also in the branch above) I was able to create my coveted multi-level progress bar:

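# Import paths are assumed; TaskGroupingHook comes from the branch linked above.
from typing import Any, List

import rich.console
import rich.progress
from hamilton.execution.grouping import NodeGroupPurpose, TaskSpec
from hamilton.lifecycle import GraphExecutionHook, TaskExecutionHook, TaskGroupingHook

class TaskProgressHook(TaskExecutionHook, TaskGroupingHook, GraphExecutionHook):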

    def __init__(self) -> None:
        self._console = rich.console.Console()
        self._progress = rich.progress.Progress(console=self._console)

    def run_before_graph_execution(self, **kwargs: Any):
        pass

    def run_after_graph_execution(self, **kwargs: Any):
        self._progress.stop()  # in case progress thread is lagging

    def run_after_task_grouping(self, *, tasks: List[TaskSpec], **kwargs):
        self._progress.add_task("Running Task Groups:", total=len(tasks))
        self._progress.start()

    def run_after_task_expansion(self, *, parameters: dict[str, Any], **kwargs):
        self._progress.add_task("Running Parallelizable:", total=len(parameters))

    def run_before_task_execution(self, *, purpose: NodeGroupPurpose, **kwargs):
        if purpose == NodeGroupPurpose.GATHER:
            self._progress.advance(self._progress.task_ids[0])
            self._progress.stop_task(self._progress.task_ids[-1])

    def run_after_task_execution(self, *, purpose: NodeGroupPurpose, **kwargs):
        if purpose == NodeGroupPurpose.EXECUTE_BLOCK:
            self._progress.advance(self._progress.task_ids[-1])
        else:
            self._progress.advance(self._progress.task_ids[0])
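To check how the two bars advance, the branching in the hook above can be replayed without rich or Hamilton. The sketch below is a simulation under assumptions: Purpose is a stand-in enum (not Hamilton's NodeGroupPurpose), the bars are plain counters, and the call order in simulate (expansion reported right after the expander task finishes) is a guess at what the executor does, not something confirmed here. The outer bar is advanced once before the gather task, which reads as accounting for the finished parallel block.

```python
from enum import Enum
from typing import Any, Dict, List


class Purpose(Enum):
    """Stand-in for the task purposes referenced by the hook above."""

    EXPAND = "expand"
    EXECUTE_BLOCK = "execute_block"
    GATHER = "gather"
    EXECUTE_SINGLE = "execute_single"


class CountingProgressHook:
    """Same branching as the hook above, with counters instead of rich bars."""

    def __init__(self) -> None:
        # Each bar is [completed, total]; bars[0] is the outer (group) bar.
        self.bars: List[List[int]] = []

    def run_after_task_grouping(self, *, tasks: List[Any], **kwargs: Any) -> None:
        self.bars.append([0, len(tasks)])

    def run_after_task_expansion(self, *, parameters: Dict[str, Any], **kwargs: Any) -> None:
        self.bars.append([0, len(parameters)])

    def run_before_task_execution(self, *, purpose: Purpose, **kwargs: Any) -> None:
        if purpose == Purpose.GATHER:
            # Advance the outer bar once for the parallel block that just ended.
            self.bars[0][0] += 1

    def run_after_task_execution(self, *, purpose: Purpose, **kwargs: Any) -> None:
        if purpose == Purpose.EXECUTE_BLOCK:
            self.bars[-1][0] += 1  # inner bar: one parallel block finished
        else:
            self.bars[0][0] += 1  # outer bar: one task group finished


def simulate(hook: CountingProgressHook, parameters: Dict[str, Any]) -> None:
    """Drive the hook in an assumed executor order for a four-group graph."""
    groups = [Purpose.EXPAND, Purpose.EXECUTE_BLOCK, Purpose.GATHER, Purpose.EXECUTE_SINGLE]
    hook.run_after_task_grouping(tasks=groups)
    for purpose in groups:
        if purpose == Purpose.EXECUTE_BLOCK:
            # The block group runs once per expanded parameter set.
            for _ in parameters:
                hook.run_before_task_execution(purpose=purpose)
                hook.run_after_task_execution(purpose=purpose)
            continue
        hook.run_before_task_execution(purpose=purpose)
        hook.run_after_task_execution(purpose=purpose)
        if purpose == Purpose.EXPAND:
            hook.run_after_task_expansion(parameters=parameters)


counting_hook = CountingProgressHook()
simulate(counting_hook, {"a": 1, "b": 2, "c": 3})
```

With four task groups and three parameter sets, both counters end at their totals, which suggests the advance logic is at least self-consistent for this simple graph shape.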

[Image: Multi-Level-Progress]

Maybe I reached a little too far with this for my own selfish goals 😄. Either way, please let me know if you would be interested in a PR for any, or all, of the changes to the task lifecycle adapters (heck, I would also be willing to add rich plugins if you'd like that as well). Thanks!

Additional context

Currently, the built-in lifecycle adapter ProgressBar has an indeterminate length for task-based DAGs.

@zilto
Collaborator

zilto commented Oct 21, 2024

I'll let Elijah give you feedback. Just wanted to say that the progress bar looks awesome!

@elijahbenizzy while reviewing this code, we can think about how this could help with caching. The NodeGroupPurpose seems to match the NodeRoleInTaskExecution (used here), and the spawning parent would help determine the role of dependencies when creating the cache key (here).

@elijahbenizzy
Collaborator

Nice! That progress bar is beautiful. I think that, on the surface, these are quite reasonable, and I'm happy to move towards merging these! I'll list the requirements we'll want to meet to get this into production.

The way to think about hooks is as a contract for the executor to follow. In this case, we only have one general execution system (the dynamic one), but as we update it (e.g., add async support for parallelism), we'll have more. So as long as the parameters make sense, or have reasonable default values, they're fine to add!
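As an illustration of why parameters with sensible defaults are safe to add, here is a small, self-contained sketch. The classes and the call_hooks helper are hypothetical and are not Hamilton's API; spawning_task_id is borrowed from the proposal above. The point is only that a hook written with keyword-only parameters and a catch-all `**future_kwargs` keeps working when the executor starts passing more information.

```python
from typing import Any, List, Optional, Tuple


class OldHook:
    """Written before the new parameter existed; ignores anything it does not know."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def run_before_task_execution(self, *, task_id: str, **future_kwargs: Any) -> None:
        self.calls.append(task_id)


class NewHook:
    """Opts in to the new parameter, with a default for executors that omit it."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, Optional[str]]] = []

    def run_before_task_execution(
        self,
        *,
        task_id: str,
        spawning_task_id: Optional[str] = None,
        **future_kwargs: Any,
    ) -> None:
        self.calls.append((task_id, spawning_task_id))


def call_hooks(hooks: List[Any], **kwargs: Any) -> None:
    """Hypothetical executor side: pass the same keyword arguments to every hook."""
    for hook in hooks:
        hook.run_before_task_execution(**kwargs)


old_hook, new_hook = OldHook(), NewHook()
# An executor that knows about the new parameter...
call_hooks([old_hook, new_hook], task_id="block.0", spawning_task_id="expand")
# ...and one that does not supply it.
call_hooks([old_hook, new_hook], task_id="final")
```

Both hooks work with both executors, which is the backwards-compatibility property described above.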

Looking at your code you've done it correctly -- added both layers (internal), and added parameters in the backwards compatible way. The other thing you did well is not pass in the list of task specs to the generator task, which still could be an iterator (if we want to get fancy and do lazy execution).

I think the fact that the code was so easy to write/simple indicates the abstractions are right (for this problem at least).

Note that you could also add node-level progress if you're not using Ray. You could also have it show which task is currently running.

Anyway, open a PR! Things for getting this to production:

  • Tests to ensure the post_task_group is called
  • Documentation -- ensure they're in the docs + add to the references section
  • Add an install target for progress bars (maybe rich and tqdm) and group them? pip install sf-hamilton[progress_bars] or something like that
  • (optional) -- write a very short guest blog post on the progress bar -- a quick note on how you built it with lifecycle hooks and a nice snazzy gif/demo
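If the progress bar plugins end up behind an optional install target, the plugin module would likely need to check for its backends at import or construction time. The following is a minimal sketch of such a guard using only the standard library. The helper names are hypothetical, and the `progress_bars` extra is the tentative name floated in the list above, not an existing install target.

```python
import importlib.util
from typing import Iterable, List


def missing_packages(names: Iterable[str]) -> List[str]:
    """Return the top-level package names that cannot be found for import."""
    return [name for name in names if importlib.util.find_spec(name) is None]


def require_progress_bar_backends(names: Iterable[str] = ("rich", "tqdm")) -> None:
    """Raise a helpful error when an optional progress bar backend is absent."""
    missing = missing_packages(names)
    if missing:
        # The extra name below is an assumption taken from the suggestion above.
        raise ImportError(
            f"Missing optional dependencies: {', '.join(missing)}. "
            'Try: pip install "sf-hamilton[progress_bars]"'
        )
```

A plugin could call require_progress_bar_backends(("rich",)) before constructing its console, so users who skipped the extra get an actionable message instead of a bare import failure.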

Should be easy to get out. Really appreciate this! And yes @zilto I think we could simplify the caching code with this.

3 participants