diff --git a/docker/github/docker-compose.yaml b/docker/github/docker-compose.yaml index a2d270b00..f01de7fba 100644 --- a/docker/github/docker-compose.yaml +++ b/docker/github/docker-compose.yaml @@ -2,9 +2,9 @@ version: '3.5' services: elasticsearch: - image: elasticsearch:7.16.2 - logging: - driver: none + image: elasticsearch:7.10.1 + # logging: + # driver: none expose: - 9200 environment: diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/ExecutionId.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/ExecutionId.java index 63333a016..8ec9ae841 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/ExecutionId.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/ExecutionId.java @@ -30,17 +30,17 @@ import java.io.IOException; import java.util.Objects; -final class ExecutionId { +public final class ExecutionId { private final String namespace; private final WorkflowExecution execution; - ExecutionId(String namespace, WorkflowExecution execution) { + public ExecutionId(String namespace, WorkflowExecution execution) { this.namespace = Objects.requireNonNull(namespace); this.execution = Objects.requireNonNull(execution); } - ExecutionId(String namespace, String workflowId, String runId) { + public ExecutionId(String namespace, String workflowId, String runId) { this( namespace, WorkflowExecution.newBuilder() diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java index 627b46f91..4c73e628f 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java @@ -1748,8 +1748,16 @@ private void processWorkflowCompletionCallbacks(RequestContext ctx) { .build()) .build()); - service.completeNexusOperation( - ref, ctx.getExecution().getWorkflowId(), startLink, completionEvent.get()); + try { + service.completeNexusOperation( + ref, ctx.getExecution().getWorkflowId(), startLink, completionEvent.get()); + } catch (StatusRuntimeException e) { + // Callback destination not found should not block processing the callbacks nor + // completing the workflow. + if (e.getStatus().getCode() != Status.Code.NOT_FOUND) { + throw e; + } + } } } diff --git a/temporal-test-server/src/test/java/io/temporal/testserver/functional/WorkflowIdConflictPolicyTest.java b/temporal-test-server/src/test/java/io/temporal/testserver/functional/WorkflowIdConflictPolicyTest.java index 467dc7a2c..3f4d61a9b 100644 --- a/temporal-test-server/src/test/java/io/temporal/testserver/functional/WorkflowIdConflictPolicyTest.java +++ b/temporal-test-server/src/test/java/io/temporal/testserver/functional/WorkflowIdConflictPolicyTest.java @@ -22,7 +22,6 @@ import static java.util.UUID.randomUUID; -import com.google.protobuf.ByteString; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.temporal.api.common.v1.Callback; @@ -40,6 +39,8 @@ import io.temporal.api.workflowservice.v1.StartWorkflowExecutionResponse; import io.temporal.client.*; import io.temporal.common.WorkflowExecutionHistory; +import io.temporal.internal.testservice.ExecutionId; +import io.temporal.internal.testservice.NexusOperationRef; import io.temporal.testing.internal.SDKTestWorkflowRule; import io.temporal.testserver.functional.common.TestWorkflows; import io.temporal.workflow.Workflow; @@ -97,6 +98,11 @@ public void conflictPolicyUseExisting() { // Different request ID should still work but update history String newRequestId = randomUUID().toString(); + NexusOperationRef ref = + new NexusOperationRef( + new ExecutionId( + "some-random-namespace", "some-random-workflow-id", "some-random-run-id"), + 1); StartWorkflowExecutionRequest request2 = request1.toBuilder() .setRequestId(newRequestId) @@ -107,15 +113,17 @@ public void conflictPolicyUseExisting() { .setAttachLinks(true)) .addCompletionCallbacks( Callback.newBuilder() - .setInternal( - Callback.Internal.newBuilder() - .setData(ByteString.copyFromUtf8("some-random-callback-data")))) + .setNexus( + Callback.Nexus.newBuilder() + .setUrl("http://localhost/test") + .putHeader("operation-reference", ref.toBytes().toStringUtf8()))) .addLinks( Link.newBuilder() .setWorkflowEvent( Link.WorkflowEvent.newBuilder() .setNamespace("some-random-namespace") - .setWorkflowId("some-random-workflow-id"))) + .setWorkflowId("some-random-workflow-id") + .setRunId("some-random-run-id"))) .build(); StartWorkflowExecutionResponse response2 = @@ -143,14 +151,13 @@ public void conflictPolicyUseExisting() { // up at this point because there a workflow task running. So, I'm signaling // the workflow so it will complete. workflowStub.signal(); - WorkflowStub.fromTyped(workflowStub).getResult(Void.class); + workflowStub.execute(); WorkflowExecutionHistory history = testWorkflowRule.getExecutionHistory(workflowId); List events = history.getEvents().stream() .filter( - item -> - item.getEventType() == EventType.EVENT_TYPE_WORKFLOW_EXECUTION_OPTIONS_UPDATED) + ev -> ev.getEventType() == EventType.EVENT_TYPE_WORKFLOW_EXECUTION_OPTIONS_UPDATED) .collect(Collectors.toList()); Assert.assertEquals(1, events.size()); HistoryEvent event = events.get(0); @@ -161,19 +168,19 @@ public void conflictPolicyUseExisting() { Assert.assertEquals(newRequestId, attrs.getAttachedRequestId()); Assert.assertEquals(1, attrs.getAttachedCompletionCallbacksCount()); Assert.assertEquals( - "some-random-callback-data", - attrs.getAttachedCompletionCallbacks(0).getInternal().getData().toStringUtf8()); + "http://localhost/test", attrs.getAttachedCompletionCallbacks(0).getNexus().getUrl()); Assert.assertEquals(1, event.getLinksCount()); Assert.assertEquals( "some-random-namespace", event.getLinks(0).getWorkflowEvent().getNamespace()); Assert.assertEquals( "some-random-workflow-id", event.getLinks(0).getWorkflowEvent().getWorkflowId()); + Assert.assertEquals("some-random-run-id", event.getLinks(0).getWorkflowEvent().getRunId()); DescribeWorkflowAsserter asserter = describe(we); Assert.assertEquals(1, asserter.getActual().getCallbacksCount()); Assert.assertEquals( - "some-random-callback-data", - asserter.getActual().getCallbacks(0).getCallback().getInternal().getData().toStringUtf8()); + "http://localhost/test", + asserter.getActual().getCallbacks(0).getCallback().getNexus().getUrl()); } @Test @@ -185,11 +192,11 @@ public void conflictPolicyFail() { .setTaskQueue(testWorkflowRule.getTaskQueue()) .build(); - TestWorkflows.WorkflowWithSignal workflowStub1 = + TestWorkflows.WorkflowWithSignal workflowStub = testWorkflowRule .getWorkflowClient() .newWorkflowStub(TestWorkflows.WorkflowWithSignal.class, options); - WorkflowClient.start(workflowStub1::execute); + WorkflowClient.start(workflowStub::execute); // Same workflow ID with conflict policy FAIL StartWorkflowExecutionRequest request1 = @@ -197,6 +204,8 @@ public void conflictPolicyFail() { .setNamespace(testWorkflowRule.getWorkflowClient().getOptions().getNamespace()) .setWorkflowId(workflowId) .setWorkflowIdConflictPolicy(WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_FAIL) + .setWorkflowType(WorkflowType.newBuilder().setName("WorkflowWithSignal")) + .setTaskQueue(TaskQueue.newBuilder().setName(testWorkflowRule.getTaskQueue())) .build(); StatusRuntimeException e = @@ -226,7 +235,6 @@ public void conflictPolicyFail() { .getWorkflowServiceStubs() .blockingStub() .startWorkflowExecution(request2)); - Assert.assertEquals(Status.Code.ALREADY_EXISTS, e.getStatus().getCode()); }