Skip to content

Commit

Permalink
refactor: simplify add_multiple_work method by delegating to add_work
Browse files Browse the repository at this point in the history
  • Loading branch information
provos committed Jan 28, 2025
1 parent ab9833c commit b37081d
Showing 1 changed file with 1 addition and 18 deletions.
19 changes: 1 addition & 18 deletions src/planai/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -533,25 +533,8 @@ def add_work(self, worker: TaskWorker, task: Task):
self._add_to_queue(worker, task_copy)

def add_multiple_work(self, work_items: List[Tuple[TaskWorker, Task]]):
filtered_work_items = []
for worker, task in work_items:
if not self._is_provenance_aborted(worker, task._provenance):
filtered_work_items.append((worker, task))
else:
logging.info(
"Skipping task %s with %s due to aborted provenance chain",
task.name,
task._provenance,
)
work_items = filtered_work_items

# the ordering of adding provenance first is important for join tasks to
# work correctly. Otherwise, caching may lead to fast execution of tasks
# before all the provenance is added.
for worker, task in work_items:
worker._graph._provenance_tracker._add_provenance(task)
for worker, task in work_items:
self._add_to_queue(worker, task)
self.add_work(worker, task)

def _add_to_queue(self, worker: TaskWorker, task: Task):
inheritance_chain = get_inheritance_chain(worker.__class__)
Expand Down

0 comments on commit b37081d

Please sign in to comment.