Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrigozhou committed Feb 25, 2025
1 parent c6b9925 commit 44f562e
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,6 @@ static final class WorkflowData {

Functions.Proc runTimerCancellationHandle;

private final Set<String> requestIds = new HashSet<>();

WorkflowData(
Optional<TestServiceRetryState> retryState,
Duration backoffStartInterval,
Expand All @@ -155,14 +153,6 @@ static final class WorkflowData {
this.lastFailure = Objects.requireNonNull(lastFailure);
}

boolean hasRequestId(@Nonnull String requestId) {
return requestIds.contains(requestId);
}

void addRequestId(@Nonnull String requestId) {
requestIds.add(requestId);
}

@Override
public String toString() {
return "WorkflowData{"
Expand All @@ -183,9 +173,6 @@ public String toString() {
+ '\''
+ ", continuedExecutionRunId="
+ continuedExecutionRunId
+ '\''
+ ", requestIds="
+ requestIds
+ '}';
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,5 +146,5 @@ PollWorkflowExecutionUpdateResponse pollUpdateWorkflowExecution(

boolean isTerminalState();

boolean hasRequestId(String requestId);
boolean isRequestIdAttached(String requestId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ private interface UpdateProcedure {
new ConcurrentHashMap<>();
public StickyExecutionAttributes stickyExecutionAttributes;
private Map<String, Payload> currentMemo;
private final Set<String> attachedRequestIds = new HashSet<>();
private final List<Callback> completionCallbacks = new ArrayList<>();

/**
* @param retryState present if workflow is a retry
Expand Down Expand Up @@ -185,6 +187,7 @@ private interface UpdateProcedure {
this.workflow = StateMachines.newWorkflowStateMachine(data);
this.workflowTaskStateMachine = StateMachines.newWorkflowTaskStateMachine(store, startRequest);
this.currentMemo = new HashMap(startRequest.getMemo().getFieldsMap());
this.completionCallbacks.addAll(startRequest.getCompletionCallbacksList());
}

/** Based on overrideStartWorkflowExecutionRequest from historyEngine.go */
Expand Down Expand Up @@ -616,48 +619,25 @@ public void completeWorkflowTask(

@Override
public void applyOnConflictOptions(@Nonnull StartWorkflowExecutionRequest request) {
OnConflictOptions options = request.getOnConflictOptions();

if (options.getAttachCompletionCallbacks() && !options.getAttachRequestId()) {
throw Status.INVALID_ARGUMENT
.withDescription(
"on_conflict_options: attach_completion_callbacks cannot be 'true' if attach_request_id is 'false'.")
.asRuntimeException();
}

WorkflowExecutionOptionsUpdatedEventAttributes.Builder attrs =
WorkflowExecutionOptionsUpdatedEventAttributes.newBuilder();

lock.lock();
try {
if (options.getAttachRequestId()) {
attrs.setAttachedRequestId(request.getRequestId());
}

if (options.getAttachCompletionCallbacks()
&& !request.getCompletionCallbacksList().isEmpty()) {
attrs.addAllAttachedCompletionCallbacks(request.getCompletionCallbacksList());
}

HistoryEvent.Builder eventBuilder =
HistoryEvent.newBuilder()
.setEventType(EVENT_TYPE_WORKFLOW_EXECUTION_OPTIONS_UPDATED)
.setWorkflowExecutionOptionsUpdatedEventAttributes(attrs);
if (options.getAttachLinks()) {
eventBuilder.addAllLinks(request.getLinksList());
}
update(
ctx -> {
OnConflictOptions options = request.getOnConflictOptions();
String requestId = null;
List<Callback> completionCallbacks = null;
List<Link> links = null;

RequestContext ctx = new RequestContext(clock, this, nextEventId);
ctx.addEvent(eventBuilder.build());
nextEventId = ctx.commitChanges(store);
if (options.getAttachRequestId()) {
requestId = request.getRequestId();
}
if (options.getAttachCompletionCallbacks()) {
completionCallbacks = request.getCompletionCallbacksList();
}
if (options.getAttachLinks()) {
links = request.getLinksList();
}

StateMachines.WorkflowData data = workflow.getData();
if (options.getAttachRequestId()) {
data.addRequestId(request.getRequestId());
}
} finally {
lock.unlock();
}
addWorkflowExecutionOptionsUpdatedEvent(ctx, requestId, completionCallbacks, links);
});
}

private void failWorkflowTaskWithAReason(
Expand Down Expand Up @@ -1743,7 +1723,7 @@ private void processWorkflowCompletionCallbacks(RequestContext ctx) {
}
});

for (Callback cb : startRequest.getCompletionCallbacksList()) {
for (Callback cb : completionCallbacks) {
if (!cb.hasNexus()) {
// test server only supports nexus callbacks currently
log.warn("skipping non-nexus completion callback");
Expand Down Expand Up @@ -2033,8 +2013,8 @@ public boolean isTerminalState() {
}

@Override
public boolean hasRequestId(@Nonnull String requestId) {
return workflow.getData().hasRequestId(requestId);
public boolean isRequestIdAttached(@Nonnull String requestId) {
return attachedRequestIds.contains(requestId);
}

private void updateHeartbeatTimer(
Expand Down Expand Up @@ -3481,6 +3461,31 @@ private void addExecutionSignaledByExternalEvent(
ctx.addEvent(executionSignaled);
}

private void addWorkflowExecutionOptionsUpdatedEvent(
RequestContext ctx, String requestId, List<Callback> completionCallbacks, List<Link> links) {
WorkflowExecutionOptionsUpdatedEventAttributes.Builder attrs =
WorkflowExecutionOptionsUpdatedEventAttributes.newBuilder();
if (requestId != null) {
attrs.setAttachedRequestId(requestId);
this.attachedRequestIds.add(requestId);
}
if (completionCallbacks != null) {
attrs.addAllAttachedCompletionCallbacks(completionCallbacks);
this.completionCallbacks.addAll(completionCallbacks);
}

HistoryEvent.Builder event =
HistoryEvent.newBuilder()
.setWorkerMayIgnore(true)
.setEventType(EVENT_TYPE_WORKFLOW_EXECUTION_OPTIONS_UPDATED)
.setWorkflowExecutionOptionsUpdatedEventAttributes(attrs);
if (links != null) {
event.addAllLinks(links);
}

ctx.addEvent(event.build());
}

private StateMachine<ActivityTaskData> getPendingActivityById(String activityId) {
Long scheduledEventId = activityById.get(activityId);
if (scheduledEventId == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import io.temporal.api.testservice.v1.SleepRequest;
import io.temporal.api.testservice.v1.TestServiceGrpc;
import io.temporal.api.testservice.v1.UnlockTimeSkippingRequest;
import io.temporal.api.workflow.v1.OnConflictOptions;
import io.temporal.api.workflow.v1.WorkflowExecutionInfo;
import io.temporal.api.workflowservice.v1.*;
import io.temporal.internal.common.ProtoUtils;
Expand Down Expand Up @@ -258,17 +259,18 @@ StartWorkflowExecutionResponse startWorkflowExecutionImpl(
"Invalid WorkflowIDReusePolicy: WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING cannot be used together with a WorkflowIDConflictPolicy.");
}

validateOnConflictOptions(startRequest);

TestWorkflowMutableState existing;
lock.lock();
try {
String newRunId = UUID.randomUUID().toString();

existing = executionsByWorkflowId.get(workflowId);
if (existing != null) {
WorkflowExecutionStatus status = existing.getWorkflowExecutionStatus();

if (status == WORKFLOW_EXECUTION_STATUS_RUNNING) {
StartWorkflowExecutionResponse dedupedResponse = dedupedRequest(startRequest, existing);
StartWorkflowExecutionResponse dedupedResponse = dedupeRequest(startRequest, existing);
if (dedupedResponse != null) {
return dedupedResponse;
}
Expand Down Expand Up @@ -298,6 +300,7 @@ StartWorkflowExecutionResponse startWorkflowExecutionImpl(
existing.applyOnConflictOptions(startRequest);
}
return StartWorkflowExecutionResponse.newBuilder()
.setStarted(false)
.setRunId(existing.getExecutionId().getExecution().getRunId())
.build();
} else {
Expand Down Expand Up @@ -370,6 +373,17 @@ private StartWorkflowExecutionResponse throwDuplicatedWorkflow(
WorkflowExecutionAlreadyStartedFailure.getDescriptor());
}

private void validateOnConflictOptions(StartWorkflowExecutionRequest startRequest) {
if (!startRequest.hasOnConflictOptions()) {
return;
}
OnConflictOptions options = startRequest.getOnConflictOptions();
if (options.getAttachCompletionCallbacks() && !options.getAttachRequestId()) {
throw createInvalidArgument(
"Invalid OnConflictOptions: AttachCompletionCallbacks cannot be 'true' if AttachRequestId is 'false'.");
}
}

private StartWorkflowExecutionResponse startWorkflowExecutionNoRunningCheckLocked(
StartWorkflowExecutionRequest startRequest,
@Nonnull String runId,
Expand Down Expand Up @@ -1898,7 +1912,7 @@ public WorkflowServiceStubs newClientStub() {
return workflowServiceStubs;
}

private StartWorkflowExecutionResponse dedupedRequest(
private StartWorkflowExecutionResponse dedupeRequest(
StartWorkflowExecutionRequest startRequest, TestWorkflowMutableState existingWorkflow) {
String requestId = startRequest.getRequestId();
String existingRequestId = existingWorkflow.getStartRequest().getRequestId();
Expand All @@ -1909,8 +1923,9 @@ private StartWorkflowExecutionResponse dedupedRequest(
.build();
}

if (existingWorkflow.hasRequestId(requestId)) {
if (existingWorkflow.isRequestIdAttached(requestId)) {
return StartWorkflowExecutionResponse.newBuilder()
.setStarted(false)
.setRunId(existingWorkflow.getExecutionId().getExecution().getRunId())
.build();
}
Expand Down

0 comments on commit 44f562e

Please sign in to comment.