Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add OnConflictOptions Support #2415

Open
wants to merge 23 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,33 +20,18 @@

package io.temporal.internal.testservice;

import static io.temporal.internal.common.LinkConverter.*;
import static io.temporal.internal.testservice.StateMachines.Action.CANCEL;
import static io.temporal.internal.testservice.StateMachines.Action.COMPLETE;
import static io.temporal.internal.testservice.StateMachines.Action.CONTINUE_AS_NEW;
import static io.temporal.internal.testservice.StateMachines.Action.FAIL;
import static io.temporal.internal.testservice.StateMachines.Action.INITIATE;
import static io.temporal.internal.testservice.StateMachines.Action.QUERY;
import static io.temporal.internal.testservice.StateMachines.Action.REQUEST_CANCELLATION;
import static io.temporal.internal.testservice.StateMachines.Action.START;
import static io.temporal.internal.testservice.StateMachines.Action.TERMINATE;
import static io.temporal.internal.testservice.StateMachines.Action.TIME_OUT;
import static io.temporal.internal.testservice.StateMachines.Action.UPDATE;
import static io.temporal.internal.testservice.StateMachines.Action.UPDATE_WORKFLOW_EXECUTION;
import static io.temporal.internal.testservice.StateMachines.State.CANCELED;
import static io.temporal.internal.testservice.StateMachines.State.CANCELLATION_REQUESTED;
import static io.temporal.internal.testservice.StateMachines.State.COMPLETED;
import static io.temporal.internal.testservice.StateMachines.State.CONTINUED_AS_NEW;
import static io.temporal.internal.testservice.StateMachines.State.FAILED;
import static io.temporal.internal.testservice.StateMachines.State.INITIATED;
import static io.temporal.internal.testservice.StateMachines.State.NONE;
import static io.temporal.internal.testservice.StateMachines.State.STARTED;
import static io.temporal.internal.testservice.StateMachines.State.TERMINATED;
import static io.temporal.internal.testservice.StateMachines.State.TIMED_OUT;
import static io.temporal.internal.common.LinkConverter.nexusLinkToWorkflowEvent;
import static io.temporal.internal.common.LinkConverter.workflowEventToNexusLink;
import static io.temporal.internal.testservice.StateMachines.Action.*;
import static io.temporal.internal.testservice.StateMachines.State.*;
import static java.util.Optional.ofNullable;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.protobuf.*;
import com.google.protobuf.Any;
import com.google.protobuf.Duration;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Durations;
import com.google.protobuf.util.Timestamps;
import io.grpc.Status;
Expand All @@ -66,7 +51,8 @@
import io.temporal.api.query.v1.WorkflowQueryResult;
import io.temporal.api.taskqueue.v1.StickyExecutionAttributes;
import io.temporal.api.taskqueue.v1.TaskQueue;
import io.temporal.api.update.v1.*;
import io.temporal.api.update.v1.Acceptance;
import io.temporal.api.update.v1.Outcome;
import io.temporal.api.update.v1.Request;
import io.temporal.api.update.v1.Response;
import io.temporal.api.workflowservice.v1.*;
Expand Down Expand Up @@ -148,6 +134,8 @@ static final class WorkflowData {

Functions.Proc runTimerCancellationHandle;

private final Set<String> requestIds = new HashSet<>();

WorkflowData(
Optional<TestServiceRetryState> retryState,
Duration backoffStartInterval,
Expand All @@ -169,6 +157,14 @@ static final class WorkflowData {
this.lastFailure = Objects.requireNonNull(lastFailure);
}

boolean hasRequestId(@Nonnull String requestId) {
return requestIds.contains(requestId);
}

void addRequestId(@Nonnull String requestId) {
requestIds.add(requestId);
}

@Override
public String toString() {
return "WorkflowData{"
Expand All @@ -189,6 +185,9 @@ public String toString() {
+ '\''
+ ", continuedExecutionRunId="
+ continuedExecutionRunId
+ '\''
+ ", requestIds="
+ requestIds
+ '}';
}
}
Expand Down Expand Up @@ -246,7 +245,7 @@ void clear() {
}

Optional<UpdateWorkflowExecution> getUpdateRequest(String protocolInstanceId) {
return Optional.ofNullable(
return ofNullable(
updateRequest.getOrDefault(
protocolInstanceId, updateRequestBuffer.get(protocolInstanceId)));
}
Expand Down Expand Up @@ -2000,7 +1999,7 @@ private static State timeoutActivityTask(

// chaining with the previous run failure if we are preparing the final failure
Failure failure =
newTimeoutFailure(timeoutType, Optional.ofNullable(data.heartbeatDetails), previousFailure);
newTimeoutFailure(timeoutType, ofNullable(data.heartbeatDetails), previousFailure);

RetryState retryState;
switch (timeoutType) {
Expand Down Expand Up @@ -2047,7 +2046,7 @@ private static State timeoutActivityTask(
failure =
newTimeoutFailure(
TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE,
Optional.ofNullable(data.heartbeatDetails),
ofNullable(data.heartbeatDetails),
cause);
}
break;
Expand Down Expand Up @@ -2098,7 +2097,7 @@ private static RetryState attemptActivityRetry(
}
if (info.get().hasNextRetryDelay()) {
nextRetryDelay =
Optional.ofNullable(ProtobufTimeUtils.toJavaDuration(info.get().getNextRetryDelay()));
ofNullable(ProtobufTimeUtils.toJavaDuration(info.get().getNextRetryDelay()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ void startWorkflowTask(

void completeWorkflowTask(int historySize, RespondWorkflowTaskCompletedRequest request);

StartWorkflowExecutionResponse applyOnConflictOptions(StartWorkflowExecutionRequest request);

void reportCancelRequested(ExternalWorkflowExecutionCancelRequestedEventAttributes a);

void completeSignalExternalWorkflowExecution(String signalId, String runId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -613,6 +614,59 @@ public void completeWorkflowTask(
request.hasStickyAttributes() ? request.getStickyAttributes() : null);
}

@Override
public StartWorkflowExecutionResponse applyOnConflictOptions(
@Nonnull StartWorkflowExecutionRequest request) {
lock.lock();
try {
StateMachines.WorkflowData data = workflow.getData();
OnConflictOptions options = request.getOnConflictOptions();

StartWorkflowExecutionResponse response =
StartWorkflowExecutionResponse.newBuilder()
.setRunId(getExecutionId().getExecution().getRunId())
.build();
if (options.getAttachRequestId()) {
boolean hasRequestId = data.hasRequestId(request.getRequestId());
if (hasRequestId) {
return response;
}
}

WorkflowExecutionOptionsUpdatedEventAttributes.Builder attrs =
WorkflowExecutionOptionsUpdatedEventAttributes.newBuilder();
if (options.getAttachRequestId()) {
attrs.setAttachedRequestId(request.getRequestId());
}

if (options.getAttachCompletionCallbacks()
&& !request.getCompletionCallbacksList().isEmpty()) {
attrs.addAllAttachedCompletionCallbacks(request.getCompletionCallbacksList());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we resolve all these callbacks we are attaching? I can't find it anywhere in this PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

applyOnConflictOptions calls ctx.commitChanges(store); The save implementation makes a call to fireCallbacks. Is that where we would expect callbacks to be invoked?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was assuming that we would trigger any callback attached to the workflow, that should be validated when tests are added.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's already resolving. I just had to change to get the callbacks from this new attribute I added to TestWorkflowMutableStateImpl instead of reading from the startRequest.

}

if (options.getAttachLinks()) {
HistoryEvent event =
HistoryEvent.newBuilder()
.setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_OPTIONS_UPDATED)
.setWorkflowExecutionOptionsUpdatedEventAttributes(attrs)
.addAllLinks(request.getLinksList())
.build();

RequestContext ctx = new RequestContext(clock, this, nextEventId);
ctx.addEvent(event);
store.save(ctx);
}

if (options.getAttachRequestId()) {
data.addRequestId(request.getRequestId());
}

return response;
} finally {
lock.unlock();
}
}

private void failWorkflowTaskWithAReason(
WorkflowTaskFailedCause failedCause,
ServerFailure eventAttributesFailure,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package io.temporal.internal.testservice;

import static io.temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED;
import static io.temporal.api.enums.v1.WorkflowExecutionStatus.*;
import static io.temporal.api.enums.v1.WorkflowIdReusePolicy.*;
import static io.temporal.api.workflowservice.v1.ExecuteMultiOperationRequest.Operation.OperationCase.START_WORKFLOW;
import static io.temporal.api.workflowservice.v1.ExecuteMultiOperationRequest.Operation.OperationCase.UPDATE_WORKFLOW;
import static io.temporal.internal.testservice.CronUtils.getBackoffInterval;
Expand Down Expand Up @@ -248,7 +250,7 @@ StartWorkflowExecutionResponse startWorkflowExecutionImpl(
WorkflowIdReusePolicy reusePolicy = startRequest.getWorkflowIdReusePolicy();
WorkflowIdConflictPolicy conflictPolicy = startRequest.getWorkflowIdConflictPolicy();
if (conflictPolicy != WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_UNSPECIFIED
&& reusePolicy == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING) {
&& reusePolicy == WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING) {
throw createInvalidArgument(
"Invalid WorkflowIDReusePolicy: WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING cannot be used together with a WorkflowIDConflictPolicy.");
}
Expand All @@ -261,40 +263,41 @@ StartWorkflowExecutionResponse startWorkflowExecutionImpl(
if (existing != null) {
WorkflowExecutionStatus status = existing.getWorkflowExecutionStatus();

if (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING
&& (reusePolicy == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING
|| conflictPolicy
== WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING)) {
existing.terminateWorkflowExecution(
TerminateWorkflowExecutionRequest.newBuilder()
.setNamespace(startRequest.getNamespace())
.setWorkflowExecution(existing.getExecutionId().getExecution())
.setReason("TerminateIfRunning WorkflowIdReusePolicy Policy")
.setIdentity("history-service")
.setDetails(
Payloads.newBuilder()
.addPayloads(
Payload.newBuilder()
.setData(
ByteString.copyFromUtf8(
String.format("terminated by new runID: %s", newRunId)))
.build())
.build())
.build());
} else if (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING
&& conflictPolicy
== WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING) {
return StartWorkflowExecutionResponse.newBuilder()
.setStarted(false)
.setRunId(existing.getExecutionId().getExecution().getRunId())
.build();
} else if (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING
|| reusePolicy == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE) {
return throwDuplicatedWorkflow(startRequest, existing);
} else if (reusePolicy
== WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY
&& (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED
|| status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW)) {
if (status == WORKFLOW_EXECUTION_STATUS_RUNNING) {
if (reusePolicy == WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's some edge cases that we handled in the server (or at least planned to) for reuse policy terminate if running and the different conflict policies.

Someone can address that in a follow up PR though.

CC @rodrigozhou

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rodrigozhou have you reviewed this code and verified the different edge cases?
Can wait for a follow up PR.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll do in a follow up PR, this one is already quite big.

|| conflictPolicy
== WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING) {
existing.terminateWorkflowExecution(
TerminateWorkflowExecutionRequest.newBuilder()
.setNamespace(startRequest.getNamespace())
.setWorkflowExecution(existing.getExecutionId().getExecution())
.setReason("TerminateIfRunning WorkflowIdReusePolicy Policy")
.setIdentity("history-service")
.setDetails(
Payloads.newBuilder()
.addPayloads(
Payload.newBuilder()
.setData(
ByteString.copyFromUtf8(
String.format("terminated by new runID: %s", newRunId)))
.build())
.build())
.build());
} else if (conflictPolicy
== WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING) {
if (startRequest.hasOnConflictOptions()) {
return existing.applyOnConflictOptions(startRequest);
}
return StartWorkflowExecutionResponse.newBuilder()
.setRunId(existing.getExecutionId().getExecution().getRunId())
.build();
} else {
return throwDuplicatedWorkflow(startRequest, existing);
}
} else if (reusePolicy == WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE
|| (reusePolicy == WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY
&& (status == WORKFLOW_EXECUTION_STATUS_COMPLETED
|| status == WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW))) {
return throwDuplicatedWorkflow(startRequest, existing);
}
}
Expand Down
Loading
Loading