Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrigozhou committed Mar 3, 2025
1 parent 1e69607 commit 171f29a
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 23 deletions.
6 changes: 3 additions & 3 deletions docker/github/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ version: '3.5'

services:
elasticsearch:
image: elasticsearch:7.16.2
logging:
driver: none
image: elasticsearch:7.10.1
# logging:
# driver: none
expose:
- 9200
environment:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,17 @@
import java.io.IOException;
import java.util.Objects;

final class ExecutionId {
public final class ExecutionId {

private final String namespace;
private final WorkflowExecution execution;

ExecutionId(String namespace, WorkflowExecution execution) {
public ExecutionId(String namespace, WorkflowExecution execution) {
this.namespace = Objects.requireNonNull(namespace);
this.execution = Objects.requireNonNull(execution);
}

ExecutionId(String namespace, String workflowId, String runId) {
public ExecutionId(String namespace, String workflowId, String runId) {
this(
namespace,
WorkflowExecution.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1748,8 +1748,16 @@ private void processWorkflowCompletionCallbacks(RequestContext ctx) {
.build())
.build());

service.completeNexusOperation(
ref, ctx.getExecution().getWorkflowId(), startLink, completionEvent.get());
try {
service.completeNexusOperation(
ref, ctx.getExecution().getWorkflowId(), startLink, completionEvent.get());
} catch (StatusRuntimeException e) {
// Callback destination not found should not block processing the callbacks nor
// completing the workflow.
if (e.getStatus().getCode() != Status.Code.NOT_FOUND) {
throw e;
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import static java.util.UUID.randomUUID;

import com.google.protobuf.ByteString;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.temporal.api.common.v1.Callback;
Expand All @@ -40,6 +39,8 @@
import io.temporal.api.workflowservice.v1.StartWorkflowExecutionResponse;
import io.temporal.client.*;
import io.temporal.common.WorkflowExecutionHistory;
import io.temporal.internal.testservice.ExecutionId;
import io.temporal.internal.testservice.NexusOperationRef;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.testserver.functional.common.TestWorkflows;
import io.temporal.workflow.Workflow;
Expand Down Expand Up @@ -97,6 +98,11 @@ public void conflictPolicyUseExisting() {

// Different request ID should still work but update history
String newRequestId = randomUUID().toString();
NexusOperationRef ref =
new NexusOperationRef(
new ExecutionId(
"some-random-namespace", "some-random-workflow-id", "some-random-run-id"),
1);
StartWorkflowExecutionRequest request2 =
request1.toBuilder()
.setRequestId(newRequestId)
Expand All @@ -107,15 +113,17 @@ public void conflictPolicyUseExisting() {
.setAttachLinks(true))
.addCompletionCallbacks(
Callback.newBuilder()
.setInternal(
Callback.Internal.newBuilder()
.setData(ByteString.copyFromUtf8("some-random-callback-data"))))
.setNexus(
Callback.Nexus.newBuilder()
.setUrl("http://localhost/test")
.putHeader("operation-reference", ref.toBytes().toStringUtf8())))
.addLinks(
Link.newBuilder()
.setWorkflowEvent(
Link.WorkflowEvent.newBuilder()
.setNamespace("some-random-namespace")
.setWorkflowId("some-random-workflow-id")))
.setWorkflowId("some-random-workflow-id")
.setRunId("some-random-run-id")))
.build();

StartWorkflowExecutionResponse response2 =
Expand Down Expand Up @@ -143,14 +151,13 @@ public void conflictPolicyUseExisting() {
// up at this point because there a workflow task running. So, I'm signaling
// the workflow so it will complete.
workflowStub.signal();
WorkflowStub.fromTyped(workflowStub).getResult(Void.class);
workflowStub.execute();

WorkflowExecutionHistory history = testWorkflowRule.getExecutionHistory(workflowId);
List<HistoryEvent> events =
history.getEvents().stream()
.filter(
item ->
item.getEventType() == EventType.EVENT_TYPE_WORKFLOW_EXECUTION_OPTIONS_UPDATED)
ev -> ev.getEventType() == EventType.EVENT_TYPE_WORKFLOW_EXECUTION_OPTIONS_UPDATED)
.collect(Collectors.toList());
Assert.assertEquals(1, events.size());
HistoryEvent event = events.get(0);
Expand All @@ -161,19 +168,19 @@ public void conflictPolicyUseExisting() {
Assert.assertEquals(newRequestId, attrs.getAttachedRequestId());
Assert.assertEquals(1, attrs.getAttachedCompletionCallbacksCount());
Assert.assertEquals(
"some-random-callback-data",
attrs.getAttachedCompletionCallbacks(0).getInternal().getData().toStringUtf8());
"http://localhost/test", attrs.getAttachedCompletionCallbacks(0).getNexus().getUrl());
Assert.assertEquals(1, event.getLinksCount());
Assert.assertEquals(
"some-random-namespace", event.getLinks(0).getWorkflowEvent().getNamespace());
Assert.assertEquals(
"some-random-workflow-id", event.getLinks(0).getWorkflowEvent().getWorkflowId());
Assert.assertEquals("some-random-run-id", event.getLinks(0).getWorkflowEvent().getRunId());

DescribeWorkflowAsserter asserter = describe(we);
Assert.assertEquals(1, asserter.getActual().getCallbacksCount());
Assert.assertEquals(
"some-random-callback-data",
asserter.getActual().getCallbacks(0).getCallback().getInternal().getData().toStringUtf8());
"http://localhost/test",
asserter.getActual().getCallbacks(0).getCallback().getNexus().getUrl());
}

@Test
Expand All @@ -185,18 +192,20 @@ public void conflictPolicyFail() {
.setTaskQueue(testWorkflowRule.getTaskQueue())
.build();

TestWorkflows.WorkflowWithSignal workflowStub1 =
TestWorkflows.WorkflowWithSignal workflowStub =
testWorkflowRule
.getWorkflowClient()
.newWorkflowStub(TestWorkflows.WorkflowWithSignal.class, options);
WorkflowClient.start(workflowStub1::execute);
WorkflowClient.start(workflowStub::execute);

// Same workflow ID with conflict policy FAIL
StartWorkflowExecutionRequest request1 =
StartWorkflowExecutionRequest.newBuilder()
.setNamespace(testWorkflowRule.getWorkflowClient().getOptions().getNamespace())
.setWorkflowId(workflowId)
.setWorkflowIdConflictPolicy(WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_FAIL)
.setWorkflowType(WorkflowType.newBuilder().setName("WorkflowWithSignal"))
.setTaskQueue(TaskQueue.newBuilder().setName(testWorkflowRule.getTaskQueue()))
.build();

StatusRuntimeException e =
Expand Down Expand Up @@ -226,7 +235,6 @@ public void conflictPolicyFail() {
.getWorkflowServiceStubs()
.blockingStub()
.startWorkflowExecution(request2));

Assert.assertEquals(Status.Code.ALREADY_EXISTS, e.getStatus().getCode());
}

Expand Down

0 comments on commit 171f29a

Please sign in to comment.