diff --git a/changes/fix_p4_recovery.md b/changes/fix_p4_recovery.md new file mode 100644 index 00000000..c61aa19f --- /dev/null +++ b/changes/fix_p4_recovery.md @@ -0,0 +1 @@ +recovery of provision out phase when all or some operations are succeeded diff --git a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/BaseProcessor.java b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/BaseProcessor.java index 91202642..2f2ecb72 100644 --- a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/BaseProcessor.java +++ b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/BaseProcessor.java @@ -19,6 +19,7 @@ import java.lang.System.Logger.Level; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -35,7 +36,7 @@ import java.util.stream.Stream; public abstract class BaseProcessor< - W extends ActiveWorkflow, PO extends ActiveOperation, TX> + W extends ActiveWorkflow, PO extends ActiveOperation, TX> implements TransactionManager, FileResolver { OperationControlFlow createNext( @@ -70,6 +71,7 @@ private interface PhaseManager { } interface TerminalHandler { + void failed(); JsonNode serialize(Output output); @@ -79,6 +81,7 @@ interface TerminalHandler { private final class TerminalOperationControlFlow implements OperationControlFlow { + private boolean finished; private final PO operation; private final TerminalHandler handler; @@ -167,6 +170,7 @@ public W workflow() { } private class Phase1Preflight implements PhaseManager { + private final W activeWorkflow; private final WorkflowDefinition definition; private boolean ok; @@ -229,9 +233,9 @@ public void release(Boolean result) { p -> activeWorkflow.arguments().has(p.name()) ? p.type() - .apply( - new ExtractRetryValues( - mapper(), activeWorkflow.arguments().get(p.name()))) + .apply( + new ExtractRetryValues( + mapper(), activeWorkflow.arguments().get(p.name()))) : Stream.empty()) .distinct() .collect(Collectors.toMap(Function.identity(), i -> new ArrayList<>())); @@ -282,12 +286,12 @@ public void release(Boolean result) { parameter -> activeWorkflow.arguments().has(parameter.name()) ? parameter - .type() - .apply( - new ExtractInputExternalIds( - mapper(), - activeWorkflow.arguments().get(parameter.name()), - BaseProcessor.this)) + .type() + .apply( + new ExtractInputExternalIds( + mapper(), + activeWorkflow.arguments().get(parameter.name()), + BaseProcessor.this)) : Stream.empty()) .map( ei -> @@ -295,9 +299,9 @@ public void release(Boolean result) { ((ExternalId) ei).getProvider(), ((ExternalId) ei).getId())) .collect(Collectors.toSet()); if (activeWorkflow - .extraInputIdsHandled() // Set to true when in Remaining or All case + .extraInputIdsHandled() // Set to true when in Remaining or All case ? discoveredExternalIds.containsAll( - outputRequestedExternalIds) // Doesn't need to be equal in this case + outputRequestedExternalIds) // Doesn't need to be equal in this case : discoveredExternalIds.equals(outputRequestedExternalIds)) { startNextPhase(this, provisionInTasks, transaction); } else { @@ -581,7 +585,8 @@ public JsonNode serialize(ProvisionData result) { .visit( new ResultVisitor() { @Override - public void file(String storagePath, String checksum, String checksumType, long size, String metatype) { + public void file(String storagePath, String checksum, String checksumType, + long size, String metatype) { final var file = node.putObject("result"); file.put("path", storagePath); file.put("checksum", checksum); @@ -610,7 +615,8 @@ public void succeeded(ProvisionData result) { new ResultVisitor() { @Override public void file( - String storagePath, String checksum, String checksumType, long size, String metatype) { + String storagePath, String checksum, String checksumType, long size, + String metatype) { workflow() .provisionFile( result.ids(), @@ -680,7 +686,8 @@ private Phase5Cleanup(WorkflowDefinition definition, W activeWorkflow) { public TerminalHandler createTerminal(PO operation) { return new TerminalHandler<>() { @Override - public void failed() {} + public void failed() { + } @Override public JsonNode serialize(Void result) { @@ -759,12 +766,12 @@ public static Stream validateInput( o -> metadata.has(o.name()) ? o.type() - .apply( - new ValidateOutputMetadata( - mapper, - target, - "\"" + o.name() + "\"", - metadata.get(o.name()))) + .apply( + new ValidateOutputMetadata( + mapper, + target, + "\"" + o.name() + "\"", + metadata.get(o.name()))) : Stream.of("Missing metadata attribute " + o.name())), workflow .parameters() @@ -796,8 +803,8 @@ public static Stream validateInput( .orElseGet( () -> engineParameters == null - || engineParameters.isNull() - || engineParameters.isEmpty() + || engineParameters.isNull() + || engineParameters.isEmpty() ? Stream.empty() : Stream.of( "Workflow engine does not support engine parameters, but they" @@ -832,22 +839,23 @@ protected void recover( final var p1 = new Phase1Preflight( target, activeOperations.size(), workflow, definition, workflow.isPreflightOkay()); - if (activeOperations.stream().allMatch(o -> o.status().equals(OperationStatus.SUCCEEDED))){ + if (activeOperations.stream().allMatch(o -> o.status().equals(OperationStatus.SUCCEEDED))) { p1.release(true); } else { - for (final var operation : activeOperations) { - TaskStarter.of( - operation.type(), - target - .provisionerFor(WorkflowOutputDataType.valueOf(operation.type()).format()) - .runPreflight() - .recover(operation.recoveryState())) - .start(this, operation, p1.createTerminal(operation)); - }} + for (final var operation : activeOperations) { + TaskStarter.of( + operation.type(), + target + .provisionerFor(WorkflowOutputDataType.valueOf(operation.type()).format()) + .runPreflight() + .recover(operation.recoveryState())) + .start(this, operation, p1.createTerminal(operation)); + } + } break; case PROVISION_IN: final var p2 = new Phase2ProvisionIn(target, activeOperations.size(), definition, workflow); - if (activeOperations.stream().allMatch(o -> o.status().equals(OperationStatus.SUCCEEDED))){ + if (activeOperations.stream().allMatch(o -> o.status().equals(OperationStatus.SUCCEEDED))) { inTransaction( transaction -> { startNextPhase(p2, List.of(TaskStarter.launch(p2.definition(), p2.activeWorkflow, @@ -855,21 +863,22 @@ protected void recover( } ); } else { - for (final var operation : activeOperations) { - PrepareInputProvisioning.recover( - definition.language(), - operation, - target.provisionerFor( - InputProvisionFormat.valueOf(operation.type().substring(1)))) - .start(this, operation, p2.createTerminal(operation)); - }} + for (final var operation : activeOperations) { + PrepareInputProvisioning.recover( + definition.language(), + operation, + target.provisionerFor( + InputProvisionFormat.valueOf(operation.type().substring(1)))) + .start(this, operation, p2.createTerminal(operation)); + } + } break; case RUNNING: /* * The constructor of Phase3Run enforces that activeOperations.size() >1 is illegal */ final var p3 = new Phase3Run(target, definition, activeOperations.size(), workflow); - if (activeOperations.stream().allMatch(o -> o.status().equals(OperationStatus.SUCCEEDED))){ + if (activeOperations.stream().allMatch(o -> o.status().equals(OperationStatus.SUCCEEDED))) { inTransaction( transaction -> { // if cleanup state is not null, do cleanup @@ -885,7 +894,7 @@ protected void recover( Set remainingIds = new HashSet<>(allIds); remainingIds.removeAll(workflow.requestedExternalIds()); JsonNode workflowRunUrl = recovery.get("workflowRunUrl"); - if (null != workflowRunUrl && !(workflowRunUrl instanceof NullNode)){ + if (null != workflowRunUrl && !(workflowRunUrl instanceof NullNode)) { target.runtimeProvisioners().forEach( p -> tasks.add( @@ -893,30 +902,30 @@ protected void recover( ) ); - if(definition.outputs().allMatch( + if (definition.outputs().allMatch( output -> { final AtomicBoolean isOk = new AtomicBoolean(true); JsonNode jsonOutput = recovery.get("output"); - if(jsonOutput != null && !(jsonOutput instanceof NullNode) - && jsonOutput.has(output.name())){ + if (jsonOutput != null && !(jsonOutput instanceof NullNode) + && jsonOutput.has(output.name())) { output.type().apply( - new PrepareOutputProvisioning( - mapper(), - target, - jsonOutput.get(output.name()), - workflow.metadata().get(output.name()), - allIds, - remainingIds, - () -> isOk.set(false), - workflow.id())) + new PrepareOutputProvisioning( + mapper(), + target, + jsonOutput.get(output.name()), + workflow.metadata().get(output.name()), + allIds, + remainingIds, + () -> isOk.set(false), + workflow.id())) .forEach(tasks::add); return isOk.get(); } else { return false; } } - )){ - // launch phase 4 + )) { + // launch phase 4 startNextPhase(p3, tasks, transaction); } else { workflow.phase(Phase.FAILED, Collections.emptyList(), transaction); @@ -925,57 +934,76 @@ protected void recover( } ); } else { - for (final var operation : activeOperations) { - TaskStarter.of( - "", - target - .engine() - .run() - .map(WorkflowEngine.Result::serialize) - .recover(operation.recoveryState())) - .start(this, operation, p3.createTerminal(operation)); - }} + for (final var operation : activeOperations) { + TaskStarter.of( + "", + target + .engine() + .run() + .map(WorkflowEngine.Result::serialize) + .recover(operation.recoveryState())) + .start(this, operation, p3.createTerminal(operation)); + } + } break; case PROVISION_OUT: final var p4 = new Phase4ProvisionOut(target, definition, activeOperations.size(), workflow); - if (activeOperations.stream().allMatch(o -> o.status().equals(OperationStatus.SUCCEEDED))){ - // TODO advance p4 - // but we also don't have the result here... + if (activeOperations.stream().allMatch(o -> o.status().equals(OperationStatus.SUCCEEDED))) { + // SUCCEEDED means we've already created the file entries in the db, so all that's left to do is clean up + inTransaction( + transaction -> { + final var cleanup = workflow.cleanup(); + if (cleanup == null) { + workflow.succeeded(transaction); + } else { + startNextPhase( + p4, + List.of(TaskStarter.launchCleanup(target.engine(), cleanup)), + transaction + ); + } + }); } else { - for (final var operation : activeOperations) { - if (operation.type().startsWith("$")) { - TaskStarter.of( - operation.type(), - recoverType.prepare( - TaskStarter.wrapRuntimeProvisioner( - target - .runtimeProvisioners() - .filter(p -> p.name().equals(operation.type().substring(1))) - .findAny() - .orElseThrow()), - operation.recoveryState())) - .start(this, operation, p4.createTerminal(operation)); - } else { - TaskStarter.of( - operation.type(), - recoverType.prepare( - TaskStarter.wrapOutputProvisioner( - target.provisionerFor(OutputProvisionFormat.valueOf(operation.type()))), - operation.recoveryState())) - .start(this, operation, p4.createTerminal(operation)); + for (final var operation : activeOperations) { + if (operation.status().equals(OperationStatus.SUCCEEDED)) { + // only launch the non-succeeded operations + continue; + } + if (operation.type().startsWith("$")) { + TaskStarter.of( + operation.type(), + recoverType.prepare( + TaskStarter.wrapRuntimeProvisioner( + target + .runtimeProvisioners() + .filter(p -> p.name().equals(operation.type().substring(1))) + .findAny() + .orElseThrow()), + operation.recoveryState())) + .start(this, operation, p4.createTerminal(operation)); + } else { + TaskStarter.of( + operation.type(), + recoverType.prepare( + TaskStarter.wrapOutputProvisioner( + target.provisionerFor(OutputProvisionFormat.valueOf(operation.type()))), + operation.recoveryState())) + .start(this, operation, p4.createTerminal(operation)); + } } - }} + } break; case CLEANUP: final var p5 = new Phase5Cleanup(definition, workflow); - if (activeOperations.stream().allMatch(o -> o.status().equals(OperationStatus.SUCCEEDED))){ + if (activeOperations.stream().allMatch(o -> o.status().equals(OperationStatus.SUCCEEDED))) { inTransaction(workflow::succeeded); } else { - for (final var operation : activeOperations) { - TaskStarter.of("", target.engine().cleanup().recover(operation.recoveryState())) - .start(this, operation, p5.createTerminal(operation)); - }} + for (final var operation : activeOperations) { + TaskStarter.of("", target.engine().cleanup().recover(operation.recoveryState())) + .start(this, operation, p5.createTerminal(operation)); + } + } break; case FAILED: throw new UnsupportedOperationException();