Skip to content

Commit

Permalink
skip already checked out stages; make retries a CLI option
Browse files Browse the repository at this point in the history
  • Loading branch information
PythonFZ committed Jul 18, 2024
1 parent 1235869 commit e68537d
Showing 1 changed file with 13 additions and 7 deletions.
20 changes: 13 additions & 7 deletions paraffin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@
app = typer.Typer()


def run_stage(stage_name: str) -> str:
def run_stage(stage_name: str, max_retries: int) -> str:
"""Run the DVC repro command for a given stage and retry if an error occurs."""
command = ["dvc", "repro", "--single-item", stage_name]
max_retries = 3
for attempt in range(max_retries):
log.debug(f"Attempting {stage_name}, attempt {attempt + 1} of {max_retries}...")
process = subprocess.Popen(command, stderr=subprocess.PIPE, text=True)
Expand All @@ -36,9 +35,6 @@ def run_stage(stage_name: str) -> str:
if process.poll() is not None:
break

# Get the final stderr output
_, stderr = process.communicate()

# Check for errors
if not failed:
return stage_name
Expand Down Expand Up @@ -69,7 +65,11 @@ def get_predecessor_subgraph(


@app.command()
def main(max_workers: Optional[int] = None, targets: List[str] = typer.Argument(None)):
def main(
max_workers: Optional[int] = None,
targets: List[str] = typer.Argument(None),
max_retries: int = 3,
):
if max_workers is None:
max_workers = os.cpu_count()
typer.echo(f"Using {max_workers} workers")
Expand All @@ -93,11 +93,17 @@ def main(max_workers: Optional[int] = None, targets: List[str] = typer.Argument(
for stage in stages:
if stage.addressing in finished:
continue
# check if the stage is finished
if stage.already_cached():
finished.add(stage.addressing)
continue
if all(
pred.addressing in finished
for pred in graph.predecessors(stage)
):
submitted.add(executor.submit(run_stage, stage.addressing))
submitted.add(
executor.submit(run_stage, stage.addressing, max_retries)
)

for future in as_completed(submitted):
finished.add(future.result())
Expand Down

0 comments on commit e68537d

Please sign in to comment.