From 44f562e1f962874d52b79431e0efecfdb95771dd Mon Sep 17 00:00:00 2001 From: rodrigozhou Date: Tue, 25 Feb 2025 14:33:18 -0800 Subject: [PATCH] fixes --- .../internal/testservice/StateMachines.java | 13 --- .../testservice/TestWorkflowMutableState.java | 2 +- .../TestWorkflowMutableStateImpl.java | 91 ++++++++++--------- .../testservice/TestWorkflowService.java | 23 ++++- 4 files changed, 68 insertions(+), 61 deletions(-) diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java index 2527923c2..b34379825 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java @@ -132,8 +132,6 @@ static final class WorkflowData { Functions.Proc runTimerCancellationHandle; - private final Set requestIds = new HashSet<>(); - WorkflowData( Optional retryState, Duration backoffStartInterval, @@ -155,14 +153,6 @@ static final class WorkflowData { this.lastFailure = Objects.requireNonNull(lastFailure); } - boolean hasRequestId(@Nonnull String requestId) { - return requestIds.contains(requestId); - } - - void addRequestId(@Nonnull String requestId) { - requestIds.add(requestId); - } - @Override public String toString() { return "WorkflowData{" @@ -183,9 +173,6 @@ public String toString() { + '\'' + ", continuedExecutionRunId=" + continuedExecutionRunId - + '\'' - + ", requestIds=" - + requestIds + '}'; } } diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java index ee1877f1b..e00c31420 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java @@ -146,5 +146,5 @@ PollWorkflowExecutionUpdateResponse pollUpdateWorkflowExecution( boolean isTerminalState(); - boolean hasRequestId(String requestId); + boolean isRequestIdAttached(String requestId); } diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java index f548b23d4..f1fe72be5 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java @@ -137,6 +137,8 @@ private interface UpdateProcedure { new ConcurrentHashMap<>(); public StickyExecutionAttributes stickyExecutionAttributes; private Map currentMemo; + private final Set attachedRequestIds = new HashSet<>(); + private final List completionCallbacks = new ArrayList<>(); /** * @param retryState present if workflow is a retry @@ -185,6 +187,7 @@ private interface UpdateProcedure { this.workflow = StateMachines.newWorkflowStateMachine(data); this.workflowTaskStateMachine = StateMachines.newWorkflowTaskStateMachine(store, startRequest); this.currentMemo = new HashMap(startRequest.getMemo().getFieldsMap()); + this.completionCallbacks.addAll(startRequest.getCompletionCallbacksList()); } /** Based on overrideStartWorkflowExecutionRequest from historyEngine.go */ @@ -616,48 +619,25 @@ public void completeWorkflowTask( @Override public void applyOnConflictOptions(@Nonnull StartWorkflowExecutionRequest request) { - OnConflictOptions options = request.getOnConflictOptions(); - - if (options.getAttachCompletionCallbacks() && !options.getAttachRequestId()) { - throw Status.INVALID_ARGUMENT - .withDescription( - "on_conflict_options: attach_completion_callbacks cannot be 'true' if attach_request_id is 'false'.") - .asRuntimeException(); - } - - WorkflowExecutionOptionsUpdatedEventAttributes.Builder attrs = - WorkflowExecutionOptionsUpdatedEventAttributes.newBuilder(); - - lock.lock(); - try { - if (options.getAttachRequestId()) { - attrs.setAttachedRequestId(request.getRequestId()); - } - - if (options.getAttachCompletionCallbacks() - && !request.getCompletionCallbacksList().isEmpty()) { - attrs.addAllAttachedCompletionCallbacks(request.getCompletionCallbacksList()); - } - - HistoryEvent.Builder eventBuilder = - HistoryEvent.newBuilder() - .setEventType(EVENT_TYPE_WORKFLOW_EXECUTION_OPTIONS_UPDATED) - .setWorkflowExecutionOptionsUpdatedEventAttributes(attrs); - if (options.getAttachLinks()) { - eventBuilder.addAllLinks(request.getLinksList()); - } + update( + ctx -> { + OnConflictOptions options = request.getOnConflictOptions(); + String requestId = null; + List completionCallbacks = null; + List links = null; - RequestContext ctx = new RequestContext(clock, this, nextEventId); - ctx.addEvent(eventBuilder.build()); - nextEventId = ctx.commitChanges(store); + if (options.getAttachRequestId()) { + requestId = request.getRequestId(); + } + if (options.getAttachCompletionCallbacks()) { + completionCallbacks = request.getCompletionCallbacksList(); + } + if (options.getAttachLinks()) { + links = request.getLinksList(); + } - StateMachines.WorkflowData data = workflow.getData(); - if (options.getAttachRequestId()) { - data.addRequestId(request.getRequestId()); - } - } finally { - lock.unlock(); - } + addWorkflowExecutionOptionsUpdatedEvent(ctx, requestId, completionCallbacks, links); + }); } private void failWorkflowTaskWithAReason( @@ -1743,7 +1723,7 @@ private void processWorkflowCompletionCallbacks(RequestContext ctx) { } }); - for (Callback cb : startRequest.getCompletionCallbacksList()) { + for (Callback cb : completionCallbacks) { if (!cb.hasNexus()) { // test server only supports nexus callbacks currently log.warn("skipping non-nexus completion callback"); @@ -2033,8 +2013,8 @@ public boolean isTerminalState() { } @Override - public boolean hasRequestId(@Nonnull String requestId) { - return workflow.getData().hasRequestId(requestId); + public boolean isRequestIdAttached(@Nonnull String requestId) { + return attachedRequestIds.contains(requestId); } private void updateHeartbeatTimer( @@ -3481,6 +3461,31 @@ private void addExecutionSignaledByExternalEvent( ctx.addEvent(executionSignaled); } + private void addWorkflowExecutionOptionsUpdatedEvent( + RequestContext ctx, String requestId, List completionCallbacks, List links) { + WorkflowExecutionOptionsUpdatedEventAttributes.Builder attrs = + WorkflowExecutionOptionsUpdatedEventAttributes.newBuilder(); + if (requestId != null) { + attrs.setAttachedRequestId(requestId); + this.attachedRequestIds.add(requestId); + } + if (completionCallbacks != null) { + attrs.addAllAttachedCompletionCallbacks(completionCallbacks); + this.completionCallbacks.addAll(completionCallbacks); + } + + HistoryEvent.Builder event = + HistoryEvent.newBuilder() + .setWorkerMayIgnore(true) + .setEventType(EVENT_TYPE_WORKFLOW_EXECUTION_OPTIONS_UPDATED) + .setWorkflowExecutionOptionsUpdatedEventAttributes(attrs); + if (links != null) { + event.addAllLinks(links); + } + + ctx.addEvent(event.build()); + } + private StateMachine getPendingActivityById(String activityId) { Long scheduledEventId = activityById.get(activityId); if (scheduledEventId == null) { diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java index 5320be304..b8a5a5078 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java @@ -55,6 +55,7 @@ import io.temporal.api.testservice.v1.SleepRequest; import io.temporal.api.testservice.v1.TestServiceGrpc; import io.temporal.api.testservice.v1.UnlockTimeSkippingRequest; +import io.temporal.api.workflow.v1.OnConflictOptions; import io.temporal.api.workflow.v1.WorkflowExecutionInfo; import io.temporal.api.workflowservice.v1.*; import io.temporal.internal.common.ProtoUtils; @@ -258,17 +259,18 @@ StartWorkflowExecutionResponse startWorkflowExecutionImpl( "Invalid WorkflowIDReusePolicy: WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING cannot be used together with a WorkflowIDConflictPolicy."); } + validateOnConflictOptions(startRequest); + TestWorkflowMutableState existing; lock.lock(); try { String newRunId = UUID.randomUUID().toString(); - existing = executionsByWorkflowId.get(workflowId); if (existing != null) { WorkflowExecutionStatus status = existing.getWorkflowExecutionStatus(); if (status == WORKFLOW_EXECUTION_STATUS_RUNNING) { - StartWorkflowExecutionResponse dedupedResponse = dedupedRequest(startRequest, existing); + StartWorkflowExecutionResponse dedupedResponse = dedupeRequest(startRequest, existing); if (dedupedResponse != null) { return dedupedResponse; } @@ -298,6 +300,7 @@ StartWorkflowExecutionResponse startWorkflowExecutionImpl( existing.applyOnConflictOptions(startRequest); } return StartWorkflowExecutionResponse.newBuilder() + .setStarted(false) .setRunId(existing.getExecutionId().getExecution().getRunId()) .build(); } else { @@ -370,6 +373,17 @@ private StartWorkflowExecutionResponse throwDuplicatedWorkflow( WorkflowExecutionAlreadyStartedFailure.getDescriptor()); } + private void validateOnConflictOptions(StartWorkflowExecutionRequest startRequest) { + if (!startRequest.hasOnConflictOptions()) { + return; + } + OnConflictOptions options = startRequest.getOnConflictOptions(); + if (options.getAttachCompletionCallbacks() && !options.getAttachRequestId()) { + throw createInvalidArgument( + "Invalid OnConflictOptions: AttachCompletionCallbacks cannot be 'true' if AttachRequestId is 'false'."); + } + } + private StartWorkflowExecutionResponse startWorkflowExecutionNoRunningCheckLocked( StartWorkflowExecutionRequest startRequest, @Nonnull String runId, @@ -1898,7 +1912,7 @@ public WorkflowServiceStubs newClientStub() { return workflowServiceStubs; } - private StartWorkflowExecutionResponse dedupedRequest( + private StartWorkflowExecutionResponse dedupeRequest( StartWorkflowExecutionRequest startRequest, TestWorkflowMutableState existingWorkflow) { String requestId = startRequest.getRequestId(); String existingRequestId = existingWorkflow.getStartRequest().getRequestId(); @@ -1909,8 +1923,9 @@ private StartWorkflowExecutionResponse dedupedRequest( .build(); } - if (existingWorkflow.hasRequestId(requestId)) { + if (existingWorkflow.isRequestIdAttached(requestId)) { return StartWorkflowExecutionResponse.newBuilder() + .setStarted(false) .setRunId(existingWorkflow.getExecutionId().getExecution().getRunId()) .build(); }