Skip to content

Commit

Permalink
GP-4544 fix provision out recovery where all or some operations have succeeded
Browse files Browse the repository at this point in the history
  • Loading branch information
Heather Armstrong authored and callunity committed Feb 10, 2025
1 parent c89a62c commit 8082e9d
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 98 deletions.
1 change: 1 addition & 0 deletions changes/fix_p4_recovery.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix recovery of the provision-out phase when all or some operations have already succeeded
224 changes: 126 additions & 98 deletions vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/BaseProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.lang.System.Logger.Level;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -35,7 +36,7 @@
import java.util.stream.Stream;

public abstract class BaseProcessor<
W extends ActiveWorkflow<PO, TX>, PO extends ActiveOperation<TX>, TX>
W extends ActiveWorkflow<PO, TX>, PO extends ActiveOperation<TX>, TX>
implements TransactionManager<TX>, FileResolver {

<State extends Record, Output> OperationControlFlow<State, Output> createNext(
Expand Down Expand Up @@ -70,6 +71,7 @@ private interface PhaseManager<W, R, N, PO> {
}

interface TerminalHandler<Output> {

void failed();

JsonNode serialize(Output output);
Expand All @@ -79,6 +81,7 @@ interface TerminalHandler<Output> {

private final class TerminalOperationControlFlow<State extends Record, Output>
implements OperationControlFlow<State, Output> {

private boolean finished;
private final PO operation;
private final TerminalHandler<Output> handler;
Expand Down Expand Up @@ -167,6 +170,7 @@ public W workflow() {
}

private class Phase1Preflight implements PhaseManager<W, Boolean, JsonMutation, PO> {

private final W activeWorkflow;
private final WorkflowDefinition definition;
private boolean ok;
Expand Down Expand Up @@ -229,9 +233,9 @@ public void release(Boolean result) {
p ->
activeWorkflow.arguments().has(p.name())
? p.type()
.apply(
new ExtractRetryValues(
mapper(), activeWorkflow.arguments().get(p.name())))
.apply(
new ExtractRetryValues(
mapper(), activeWorkflow.arguments().get(p.name())))
: Stream.empty())
.distinct()
.collect(Collectors.toMap(Function.identity(), i -> new ArrayList<>()));
Expand Down Expand Up @@ -282,22 +286,22 @@ public void release(Boolean result) {
parameter ->
activeWorkflow.arguments().has(parameter.name())
? parameter
.type()
.apply(
new ExtractInputExternalIds(
mapper(),
activeWorkflow.arguments().get(parameter.name()),
BaseProcessor.this))
.type()
.apply(
new ExtractInputExternalIds(
mapper(),
activeWorkflow.arguments().get(parameter.name()),
BaseProcessor.this))
: Stream.empty())
.map(
ei ->
new ExternalId(
((ExternalId) ei).getProvider(), ((ExternalId) ei).getId()))
.collect(Collectors.toSet());
if (activeWorkflow
.extraInputIdsHandled() // Set to true when in Remaining or All case
.extraInputIdsHandled() // Set to true when in Remaining or All case
? discoveredExternalIds.containsAll(
outputRequestedExternalIds) // Doesn't need to be equal in this case
outputRequestedExternalIds) // Doesn't need to be equal in this case
: discoveredExternalIds.equals(outputRequestedExternalIds)) {
startNextPhase(this, provisionInTasks, transaction);
} else {
Expand Down Expand Up @@ -581,7 +585,8 @@ public JsonNode serialize(ProvisionData result) {
.visit(
new ResultVisitor() {
@Override
public void file(String storagePath, String checksum, String checksumType, long size, String metatype) {
public void file(String storagePath, String checksum, String checksumType,
long size, String metatype) {
final var file = node.putObject("result");
file.put("path", storagePath);
file.put("checksum", checksum);
Expand Down Expand Up @@ -610,7 +615,8 @@ public void succeeded(ProvisionData result) {
new ResultVisitor() {
@Override
public void file(
String storagePath, String checksum, String checksumType, long size, String metatype) {
String storagePath, String checksum, String checksumType, long size,
String metatype) {
workflow()
.provisionFile(
result.ids(),
Expand Down Expand Up @@ -680,7 +686,8 @@ private Phase5Cleanup(WorkflowDefinition definition, W activeWorkflow) {
public TerminalHandler<Void> createTerminal(PO operation) {
return new TerminalHandler<>() {
@Override
public void failed() {}
public void failed() {
}

@Override
public JsonNode serialize(Void result) {
Expand Down Expand Up @@ -759,12 +766,12 @@ public static Stream<String> validateInput(
o ->
metadata.has(o.name())
? o.type()
.apply(
new ValidateOutputMetadata(
mapper,
target,
"\"" + o.name() + "\"",
metadata.get(o.name())))
.apply(
new ValidateOutputMetadata(
mapper,
target,
"\"" + o.name() + "\"",
metadata.get(o.name())))
: Stream.of("Missing metadata attribute " + o.name())),
workflow
.parameters()
Expand Down Expand Up @@ -796,8 +803,8 @@ public static Stream<String> validateInput(
.orElseGet(
() ->
engineParameters == null
|| engineParameters.isNull()
|| engineParameters.isEmpty()
|| engineParameters.isNull()
|| engineParameters.isEmpty()
? Stream.empty()
: Stream.of(
"Workflow engine does not support engine parameters, but they"
Expand Down Expand Up @@ -832,44 +839,46 @@ protected void recover(
final var p1 =
new Phase1Preflight(
target, activeOperations.size(), workflow, definition, workflow.isPreflightOkay());
if (activeOperations.stream().allMatch(o -> o.status().equals(OperationStatus.SUCCEEDED))){
if (activeOperations.stream().allMatch(o -> o.status().equals(OperationStatus.SUCCEEDED))) {
p1.release(true);
} else {
for (final var operation : activeOperations) {
TaskStarter.of(
operation.type(),
target
.provisionerFor(WorkflowOutputDataType.valueOf(operation.type()).format())
.runPreflight()
.recover(operation.recoveryState()))
.start(this, operation, p1.createTerminal(operation));
}}
for (final var operation : activeOperations) {
TaskStarter.of(
operation.type(),
target
.provisionerFor(WorkflowOutputDataType.valueOf(operation.type()).format())
.runPreflight()
.recover(operation.recoveryState()))
.start(this, operation, p1.createTerminal(operation));
}
}
break;
case PROVISION_IN:
final var p2 = new Phase2ProvisionIn(target, activeOperations.size(), definition, workflow);
if (activeOperations.stream().allMatch(o -> o.status().equals(OperationStatus.SUCCEEDED))){
if (activeOperations.stream().allMatch(o -> o.status().equals(OperationStatus.SUCCEEDED))) {
inTransaction(
transaction -> {
startNextPhase(p2, List.of(TaskStarter.launch(p2.definition(), p2.activeWorkflow,
target.engine(), p2.activeWorkflow.realInputs().get(0))), transaction);
}
);
} else {
for (final var operation : activeOperations) {
PrepareInputProvisioning.recover(
definition.language(),
operation,
target.provisionerFor(
InputProvisionFormat.valueOf(operation.type().substring(1))))
.start(this, operation, p2.createTerminal(operation));
}}
for (final var operation : activeOperations) {
PrepareInputProvisioning.recover(
definition.language(),
operation,
target.provisionerFor(
InputProvisionFormat.valueOf(operation.type().substring(1))))
.start(this, operation, p2.createTerminal(operation));
}
}
break;
case RUNNING:
/*
* The constructor of Phase3Run enforces that activeOperations.size() >1 is illegal
*/
final var p3 = new Phase3Run(target, definition, activeOperations.size(), workflow);
if (activeOperations.stream().allMatch(o -> o.status().equals(OperationStatus.SUCCEEDED))){
if (activeOperations.stream().allMatch(o -> o.status().equals(OperationStatus.SUCCEEDED))) {
inTransaction(
transaction -> {
// if cleanup state is not null, do cleanup
Expand All @@ -885,38 +894,38 @@ protected void recover(
Set<ExternalId> remainingIds = new HashSet<>(allIds);
remainingIds.removeAll(workflow.requestedExternalIds());
JsonNode workflowRunUrl = recovery.get("workflowRunUrl");
if (null != workflowRunUrl && !(workflowRunUrl instanceof NullNode)){
if (null != workflowRunUrl && !(workflowRunUrl instanceof NullNode)) {
target.runtimeProvisioners().forEach(
p ->
tasks.add(
TaskStarter.launch(p, allIds, Map.of(), workflowRunUrl.asText())
)
);

if(definition.outputs().allMatch(
if (definition.outputs().allMatch(
output -> {
final AtomicBoolean isOk = new AtomicBoolean(true);
JsonNode jsonOutput = recovery.get("output");
if(jsonOutput != null && !(jsonOutput instanceof NullNode)
&& jsonOutput.has(output.name())){
if (jsonOutput != null && !(jsonOutput instanceof NullNode)
&& jsonOutput.has(output.name())) {
output.type().apply(
new PrepareOutputProvisioning(
mapper(),
target,
jsonOutput.get(output.name()),
workflow.metadata().get(output.name()),
allIds,
remainingIds,
() -> isOk.set(false),
workflow.id()))
new PrepareOutputProvisioning(
mapper(),
target,
jsonOutput.get(output.name()),
workflow.metadata().get(output.name()),
allIds,
remainingIds,
() -> isOk.set(false),
workflow.id()))
.forEach(tasks::add);
return isOk.get();
} else {
return false;
}
}
)){
// launch phase 4
)) {
// launch phase 4
startNextPhase(p3, tasks, transaction);
} else {
workflow.phase(Phase.FAILED, Collections.emptyList(), transaction);
Expand All @@ -925,57 +934,76 @@ protected void recover(
}
);
} else {
for (final var operation : activeOperations) {
TaskStarter.of(
"",
target
.engine()
.run()
.map(WorkflowEngine.Result::serialize)
.recover(operation.recoveryState()))
.start(this, operation, p3.createTerminal(operation));
}}
for (final var operation : activeOperations) {
TaskStarter.of(
"",
target
.engine()
.run()
.map(WorkflowEngine.Result::serialize)
.recover(operation.recoveryState()))
.start(this, operation, p3.createTerminal(operation));
}
}
break;
case PROVISION_OUT:
final var p4 =
new Phase4ProvisionOut(target, definition, activeOperations.size(), workflow);
if (activeOperations.stream().allMatch(o -> o.status().equals(OperationStatus.SUCCEEDED))){
// TODO advance p4
// but we also don't have the result here...
if (activeOperations.stream().allMatch(o -> o.status().equals(OperationStatus.SUCCEEDED))) {
// SUCCEEDED means we've already created the file entries in the db, so all that's left to do is clean up
inTransaction(
transaction -> {
final var cleanup = workflow.cleanup();
if (cleanup == null) {
workflow.succeeded(transaction);
} else {
startNextPhase(
p4,
List.of(TaskStarter.launchCleanup(target.engine(), cleanup)),
transaction
);
}
});
} else {
for (final var operation : activeOperations) {
if (operation.type().startsWith("$")) {
TaskStarter.of(
operation.type(),
recoverType.prepare(
TaskStarter.wrapRuntimeProvisioner(
target
.runtimeProvisioners()
.filter(p -> p.name().equals(operation.type().substring(1)))
.findAny()
.orElseThrow()),
operation.recoveryState()))
.start(this, operation, p4.createTerminal(operation));
} else {
TaskStarter.of(
operation.type(),
recoverType.prepare(
TaskStarter.wrapOutputProvisioner(
target.provisionerFor(OutputProvisionFormat.valueOf(operation.type()))),
operation.recoveryState()))
.start(this, operation, p4.createTerminal(operation));
for (final var operation : activeOperations) {
if (operation.status().equals(OperationStatus.SUCCEEDED)) {
// only launch the non-succeeded operations
continue;
}
if (operation.type().startsWith("$")) {
TaskStarter.of(
operation.type(),
recoverType.prepare(
TaskStarter.wrapRuntimeProvisioner(
target
.runtimeProvisioners()
.filter(p -> p.name().equals(operation.type().substring(1)))
.findAny()
.orElseThrow()),
operation.recoveryState()))
.start(this, operation, p4.createTerminal(operation));
} else {
TaskStarter.of(
operation.type(),
recoverType.prepare(
TaskStarter.wrapOutputProvisioner(
target.provisionerFor(OutputProvisionFormat.valueOf(operation.type()))),
operation.recoveryState()))
.start(this, operation, p4.createTerminal(operation));
}
}
}}
}
break;
case CLEANUP:
final var p5 = new Phase5Cleanup(definition, workflow);
if (activeOperations.stream().allMatch(o -> o.status().equals(OperationStatus.SUCCEEDED))){
if (activeOperations.stream().allMatch(o -> o.status().equals(OperationStatus.SUCCEEDED))) {
inTransaction(workflow::succeeded);
} else {
for (final var operation : activeOperations) {
TaskStarter.of("", target.engine().cleanup().recover(operation.recoveryState()))
.start(this, operation, p5.createTerminal(operation));
}}
for (final var operation : activeOperations) {
TaskStarter.of("", target.engine().cleanup().recover(operation.recoveryState()))
.start(this, operation, p5.createTerminal(operation));
}
}
break;
case FAILED:
throw new UnsupportedOperationException();
Expand Down

0 comments on commit 8082e9d

Please sign in to comment.