Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix provision out recovery when some or all operations are succeeded #389

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/fix_p4_recovery.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
recovery of provision out phase when all or some operations are succeeded
224 changes: 126 additions & 98 deletions vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/BaseProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.lang.System.Logger.Level;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -35,7 +36,7 @@
import java.util.stream.Stream;

public abstract class BaseProcessor<
W extends ActiveWorkflow<PO, TX>, PO extends ActiveOperation<TX>, TX>
W extends ActiveWorkflow<PO, TX>, PO extends ActiveOperation<TX>, TX>
implements TransactionManager<TX>, FileResolver {

<State extends Record, Output> OperationControlFlow<State, Output> createNext(
Expand Down Expand Up @@ -70,6 +71,7 @@ private interface PhaseManager<W, R, N, PO> {
}

interface TerminalHandler<Output> {

void failed();

JsonNode serialize(Output output);
Expand All @@ -79,6 +81,7 @@ interface TerminalHandler<Output> {

private final class TerminalOperationControlFlow<State extends Record, Output>
implements OperationControlFlow<State, Output> {

private boolean finished;
private final PO operation;
private final TerminalHandler<Output> handler;
Expand Down Expand Up @@ -167,6 +170,7 @@ public W workflow() {
}

private class Phase1Preflight implements PhaseManager<W, Boolean, JsonMutation, PO> {

private final W activeWorkflow;
private final WorkflowDefinition definition;
private boolean ok;
Expand Down Expand Up @@ -229,9 +233,9 @@ public void release(Boolean result) {
p ->
activeWorkflow.arguments().has(p.name())
? p.type()
.apply(
new ExtractRetryValues(
mapper(), activeWorkflow.arguments().get(p.name())))
.apply(
new ExtractRetryValues(
mapper(), activeWorkflow.arguments().get(p.name())))
: Stream.empty())
.distinct()
.collect(Collectors.toMap(Function.identity(), i -> new ArrayList<>()));
Expand Down Expand Up @@ -282,22 +286,22 @@ public void release(Boolean result) {
parameter ->
activeWorkflow.arguments().has(parameter.name())
? parameter
.type()
.apply(
new ExtractInputExternalIds(
mapper(),
activeWorkflow.arguments().get(parameter.name()),
BaseProcessor.this))
.type()
.apply(
new ExtractInputExternalIds(
mapper(),
activeWorkflow.arguments().get(parameter.name()),
BaseProcessor.this))
: Stream.empty())
.map(
ei ->
new ExternalId(
((ExternalId) ei).getProvider(), ((ExternalId) ei).getId()))
.collect(Collectors.toSet());
if (activeWorkflow
.extraInputIdsHandled() // Set to true when in Remaining or All case
.extraInputIdsHandled() // Set to true when in Remaining or All case
? discoveredExternalIds.containsAll(
outputRequestedExternalIds) // Doesn't need to be equal in this case
outputRequestedExternalIds) // Doesn't need to be equal in this case
: discoveredExternalIds.equals(outputRequestedExternalIds)) {
startNextPhase(this, provisionInTasks, transaction);
} else {
Expand Down Expand Up @@ -581,7 +585,8 @@ public JsonNode serialize(ProvisionData result) {
.visit(
new ResultVisitor() {
@Override
public void file(String storagePath, String checksum, String checksumType, long size, String metatype) {
public void file(String storagePath, String checksum, String checksumType,
long size, String metatype) {
final var file = node.putObject("result");
file.put("path", storagePath);
file.put("checksum", checksum);
Expand Down Expand Up @@ -610,7 +615,8 @@ public void succeeded(ProvisionData result) {
new ResultVisitor() {
@Override
public void file(
String storagePath, String checksum, String checksumType, long size, String metatype) {
String storagePath, String checksum, String checksumType, long size,
String metatype) {
workflow()
.provisionFile(
result.ids(),
Expand Down Expand Up @@ -680,7 +686,8 @@ private Phase5Cleanup(WorkflowDefinition definition, W activeWorkflow) {
public TerminalHandler<Void> createTerminal(PO operation) {
return new TerminalHandler<>() {
@Override
public void failed() {}
public void failed() {
}

@Override
public JsonNode serialize(Void result) {
Expand Down Expand Up @@ -759,12 +766,12 @@ public static Stream<String> validateInput(
o ->
metadata.has(o.name())
? o.type()
.apply(
new ValidateOutputMetadata(
mapper,
target,
"\"" + o.name() + "\"",
metadata.get(o.name())))
.apply(
new ValidateOutputMetadata(
mapper,
target,
"\"" + o.name() + "\"",
metadata.get(o.name())))
: Stream.of("Missing metadata attribute " + o.name())),
workflow
.parameters()
Expand Down Expand Up @@ -796,8 +803,8 @@ public static Stream<String> validateInput(
.orElseGet(
() ->
engineParameters == null
|| engineParameters.isNull()
|| engineParameters.isEmpty()
|| engineParameters.isNull()
|| engineParameters.isEmpty()
? Stream.empty()
: Stream.of(
"Workflow engine does not support engine parameters, but they"
Expand Down Expand Up @@ -832,44 +839,46 @@ protected void recover(
final var p1 =
new Phase1Preflight(
target, activeOperations.size(), workflow, definition, workflow.isPreflightOkay());
if (activeOperations.stream().allMatch(o -> o.status().equals(OperationStatus.SUCCEEDED))){
if (activeOperations.stream().allMatch(o -> o.status().equals(OperationStatus.SUCCEEDED))) {
p1.release(true);
} else {
for (final var operation : activeOperations) {
TaskStarter.of(
operation.type(),
target
.provisionerFor(WorkflowOutputDataType.valueOf(operation.type()).format())
.runPreflight()
.recover(operation.recoveryState()))
.start(this, operation, p1.createTerminal(operation));
}}
for (final var operation : activeOperations) {
TaskStarter.of(
operation.type(),
target
.provisionerFor(WorkflowOutputDataType.valueOf(operation.type()).format())
.runPreflight()
.recover(operation.recoveryState()))
.start(this, operation, p1.createTerminal(operation));
}
}
break;
case PROVISION_IN:
final var p2 = new Phase2ProvisionIn(target, activeOperations.size(), definition, workflow);
if (activeOperations.stream().allMatch(o -> o.status().equals(OperationStatus.SUCCEEDED))){
if (activeOperations.stream().allMatch(o -> o.status().equals(OperationStatus.SUCCEEDED))) {
inTransaction(
transaction -> {
startNextPhase(p2, List.of(TaskStarter.launch(p2.definition(), p2.activeWorkflow,
target.engine(), p2.activeWorkflow.realInputs().get(0))), transaction);
}
);
} else {
for (final var operation : activeOperations) {
PrepareInputProvisioning.recover(
definition.language(),
operation,
target.provisionerFor(
InputProvisionFormat.valueOf(operation.type().substring(1))))
.start(this, operation, p2.createTerminal(operation));
}}
for (final var operation : activeOperations) {
PrepareInputProvisioning.recover(
definition.language(),
operation,
target.provisionerFor(
InputProvisionFormat.valueOf(operation.type().substring(1))))
.start(this, operation, p2.createTerminal(operation));
}
}
break;
case RUNNING:
/*
* The constructor of Phase3Run enforces that activeOperations.size() >1 is illegal
*/
final var p3 = new Phase3Run(target, definition, activeOperations.size(), workflow);
if (activeOperations.stream().allMatch(o -> o.status().equals(OperationStatus.SUCCEEDED))){
if (activeOperations.stream().allMatch(o -> o.status().equals(OperationStatus.SUCCEEDED))) {
inTransaction(
transaction -> {
// if cleanup state is not null, do cleanup
Expand All @@ -885,38 +894,38 @@ protected void recover(
Set<ExternalId> remainingIds = new HashSet<>(allIds);
remainingIds.removeAll(workflow.requestedExternalIds());
JsonNode workflowRunUrl = recovery.get("workflowRunUrl");
if (null != workflowRunUrl && !(workflowRunUrl instanceof NullNode)){
if (null != workflowRunUrl && !(workflowRunUrl instanceof NullNode)) {
target.runtimeProvisioners().forEach(
p ->
tasks.add(
TaskStarter.launch(p, allIds, Map.of(), workflowRunUrl.asText())
)
);

if(definition.outputs().allMatch(
if (definition.outputs().allMatch(
output -> {
final AtomicBoolean isOk = new AtomicBoolean(true);
JsonNode jsonOutput = recovery.get("output");
if(jsonOutput != null && !(jsonOutput instanceof NullNode)
&& jsonOutput.has(output.name())){
if (jsonOutput != null && !(jsonOutput instanceof NullNode)
&& jsonOutput.has(output.name())) {
output.type().apply(
new PrepareOutputProvisioning(
mapper(),
target,
jsonOutput.get(output.name()),
workflow.metadata().get(output.name()),
allIds,
remainingIds,
() -> isOk.set(false),
workflow.id()))
new PrepareOutputProvisioning(
mapper(),
target,
jsonOutput.get(output.name()),
workflow.metadata().get(output.name()),
allIds,
remainingIds,
() -> isOk.set(false),
workflow.id()))
.forEach(tasks::add);
return isOk.get();
} else {
return false;
}
}
)){
// launch phase 4
)) {
// launch phase 4
startNextPhase(p3, tasks, transaction);
} else {
workflow.phase(Phase.FAILED, Collections.emptyList(), transaction);
Expand All @@ -925,57 +934,76 @@ protected void recover(
}
);
} else {
for (final var operation : activeOperations) {
TaskStarter.of(
"",
target
.engine()
.run()
.map(WorkflowEngine.Result::serialize)
.recover(operation.recoveryState()))
.start(this, operation, p3.createTerminal(operation));
}}
for (final var operation : activeOperations) {
TaskStarter.of(
"",
target
.engine()
.run()
.map(WorkflowEngine.Result::serialize)
.recover(operation.recoveryState()))
.start(this, operation, p3.createTerminal(operation));
}
}
break;
case PROVISION_OUT:
final var p4 =
new Phase4ProvisionOut(target, definition, activeOperations.size(), workflow);
if (activeOperations.stream().allMatch(o -> o.status().equals(OperationStatus.SUCCEEDED))){
// TODO advance p4
// but we also don't have the result here...
if (activeOperations.stream().allMatch(o -> o.status().equals(OperationStatus.SUCCEEDED))) {
// SUCCEEDED means we've already created the file entries in the db, so all that's left to do is clean up
inTransaction(
transaction -> {
final var cleanup = workflow.cleanup();
if (cleanup == null) {
workflow.succeeded(transaction);
} else {
startNextPhase(
p4,
List.of(TaskStarter.launchCleanup(target.engine(), cleanup)),
transaction
);
}
});
} else {
for (final var operation : activeOperations) {
if (operation.type().startsWith("$")) {
TaskStarter.of(
operation.type(),
recoverType.prepare(
TaskStarter.wrapRuntimeProvisioner(
target
.runtimeProvisioners()
.filter(p -> p.name().equals(operation.type().substring(1)))
.findAny()
.orElseThrow()),
operation.recoveryState()))
.start(this, operation, p4.createTerminal(operation));
} else {
TaskStarter.of(
operation.type(),
recoverType.prepare(
TaskStarter.wrapOutputProvisioner(
target.provisionerFor(OutputProvisionFormat.valueOf(operation.type()))),
operation.recoveryState()))
.start(this, operation, p4.createTerminal(operation));
for (final var operation : activeOperations) {
if (operation.status().equals(OperationStatus.SUCCEEDED)) {
// only launch the non-succeeded operations
continue;
}
if (operation.type().startsWith("$")) {
TaskStarter.of(
operation.type(),
recoverType.prepare(
TaskStarter.wrapRuntimeProvisioner(
target
.runtimeProvisioners()
.filter(p -> p.name().equals(operation.type().substring(1)))
.findAny()
.orElseThrow()),
operation.recoveryState()))
.start(this, operation, p4.createTerminal(operation));
} else {
TaskStarter.of(
operation.type(),
recoverType.prepare(
TaskStarter.wrapOutputProvisioner(
target.provisionerFor(OutputProvisionFormat.valueOf(operation.type()))),
operation.recoveryState()))
.start(this, operation, p4.createTerminal(operation));
}
}
}}
}
break;
case CLEANUP:
final var p5 = new Phase5Cleanup(definition, workflow);
if (activeOperations.stream().allMatch(o -> o.status().equals(OperationStatus.SUCCEEDED))){
if (activeOperations.stream().allMatch(o -> o.status().equals(OperationStatus.SUCCEEDED))) {
inTransaction(workflow::succeeded);
} else {
for (final var operation : activeOperations) {
TaskStarter.of("", target.engine().cleanup().recover(operation.recoveryState()))
.start(this, operation, p5.createTerminal(operation));
}}
for (final var operation : activeOperations) {
TaskStarter.of("", target.engine().cleanup().recover(operation.recoveryState()))
.start(this, operation, p5.createTerminal(operation));
}
}
break;
case FAILED:
throw new UnsupportedOperationException();
Expand Down