diff --git a/temporal-opentracing/src/test/java/io/temporal/opentracing/NexusOperationTest.java b/temporal-opentracing/src/test/java/io/temporal/opentracing/NexusOperationTest.java index 9b34f9f59..e383611a9 100644 --- a/temporal-opentracing/src/test/java/io/temporal/opentracing/NexusOperationTest.java +++ b/temporal-opentracing/src/test/java/io/temporal/opentracing/NexusOperationTest.java @@ -34,6 +34,7 @@ import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowClientOptions; import io.temporal.client.WorkflowOptions; +import io.temporal.nexus.Nexus; import io.temporal.nexus.WorkflowClientOperationHandlers; import io.temporal.testing.internal.SDKTestWorkflowRule; import io.temporal.worker.WorkerFactoryOptions; @@ -83,10 +84,14 @@ public class TestNexusServiceImpl { @OperationImpl public OperationHandler operation() { return WorkflowClientOperationHandlers.fromWorkflowMethod( - (context, details, client, input) -> - client.newWorkflowStub( - TestOtherWorkflow.class, - WorkflowOptions.newBuilder().setWorkflowId(details.getRequestId()).build()) + (context, details, input) -> + Nexus.getOperationContext() + .getWorkflowClient() + .newWorkflowStub( + TestOtherWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId(details.getRequestId()) + .build()) ::workflow); } } diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationInboundCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationInboundCallsInterceptor.java index cf8af8d95..fda964191 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationInboundCallsInterceptor.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationInboundCallsInterceptor.java @@ -28,7 +28,8 @@ * Intercepts inbound calls to a Nexus operation on the worker side. * *

An instance should be created in {@link - * WorkerInterceptor#interceptNexusOperation(NexusOperationInboundCallsInterceptor)}. + * WorkerInterceptor#interceptNexusOperation(OperationContext, + * NexusOperationInboundCallsInterceptor)}. * *

Prefer extending {@link NexusOperationInboundCallsInterceptorBase} and overriding only the * methods you need instead of implementing this interface directly. {@link diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationOutboundCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationOutboundCallsInterceptor.java index 1f3d80015..65b20ad86 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationOutboundCallsInterceptor.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationOutboundCallsInterceptor.java @@ -21,6 +21,7 @@ package io.temporal.common.interceptors; import com.uber.m3.tally.Scope; +import io.temporal.client.WorkflowClient; import io.temporal.common.Experimental; /** @@ -41,4 +42,7 @@ public interface NexusOperationOutboundCallsInterceptor { /** Intercepts call to get the metric scope in a Nexus operation. */ Scope getMetricsScope(); + + /** Intercepts call to get the workflow client in a Nexus operation. */ + WorkflowClient getWorkflowClient(); } diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationOutboundCallsInterceptorBase.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationOutboundCallsInterceptorBase.java index 26ad01481..1efc78a38 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationOutboundCallsInterceptorBase.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationOutboundCallsInterceptorBase.java @@ -21,6 +21,7 @@ package io.temporal.common.interceptors; import com.uber.m3.tally.Scope; +import io.temporal.client.WorkflowClient; import io.temporal.common.Experimental; /** Convenience base class for {@link NexusOperationOutboundCallsInterceptor} implementations. */ @@ -37,4 +38,9 @@ public NexusOperationOutboundCallsInterceptorBase(NexusOperationOutboundCallsInt public Scope getMetricsScope() { return next.getMetricsScope(); } + + @Override + public WorkflowClient getWorkflowClient() { + return next.getWorkflowClient(); + } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/CurrentNexusOperationContext.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/CurrentNexusOperationContext.java index 99ca97b3e..f670f22e1 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/CurrentNexusOperationContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/CurrentNexusOperationContext.java @@ -25,10 +25,10 @@ * used directly. */ public final class CurrentNexusOperationContext { - private static final ThreadLocal CURRENT = new ThreadLocal<>(); + private static final ThreadLocal CURRENT = new ThreadLocal<>(); - public static NexusOperationContextImpl get() { - NexusOperationContextImpl result = CURRENT.get(); + public static InternalNexusOperationContext get() { + InternalNexusOperationContext result = CURRENT.get(); if (result == null) { throw new IllegalStateException( "NexusOperationContext can be used only inside of nexus operation handler " @@ -37,7 +37,7 @@ public static NexusOperationContextImpl get() { return CURRENT.get(); } - public static void set(NexusOperationContextImpl context) { + public static void set(InternalNexusOperationContext context) { if (context == null) { throw new IllegalArgumentException("null context"); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusOperationContextImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/InternalNexusOperationContext.java similarity index 67% rename from temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusOperationContextImpl.java rename to temporal-sdk/src/main/java/io/temporal/internal/nexus/InternalNexusOperationContext.java index e159d775e..e5268f063 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusOperationContextImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/InternalNexusOperationContext.java @@ -25,26 +25,23 @@ import io.temporal.common.interceptors.NexusOperationOutboundCallsInterceptor; import io.temporal.nexus.NexusOperationContext; -public class NexusOperationContextImpl implements NexusOperationContext { +public class InternalNexusOperationContext { private final String namespace; private final String taskQueue; + private final Scope metricScope; private final WorkflowClient client; NexusOperationOutboundCallsInterceptor outboundCalls; - public NexusOperationContextImpl( - String namespace, - String taskQueue, - WorkflowClient client, - NexusOperationOutboundCallsInterceptor outboundCalls) { + public InternalNexusOperationContext( + String namespace, String taskQueue, Scope metricScope, WorkflowClient client) { this.namespace = namespace; this.taskQueue = taskQueue; + this.metricScope = metricScope; this.client = client; - this.outboundCalls = outboundCalls; } - @Override public Scope getMetricsScope() { - return outboundCalls.getMetricsScope(); + return metricScope; } public WorkflowClient getWorkflowClient() { @@ -62,4 +59,23 @@ public String getNamespace() { public void setOutboundInterceptor(NexusOperationOutboundCallsInterceptor outboundCalls) { this.outboundCalls = outboundCalls; } + + public NexusOperationContext getUserFacingContext() { + if (outboundCalls == null) { + throw new IllegalStateException("Outbound interceptor is not set"); + } + return new NexusOperationContextImpl(); + } + + private class NexusOperationContextImpl implements NexusOperationContext { + @Override + public Scope getMetricsScope() { + return outboundCalls.getMetricsScope(); + } + + @Override + public WorkflowClient getWorkflowClient() { + return outboundCalls.getWorkflowClient(); + } + } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusInternal.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusInternal.java index 56565f4ff..bef253552 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusInternal.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusInternal.java @@ -26,6 +26,6 @@ public final class NexusInternal { private NexusInternal() {} public static NexusOperationContext getOperationContext() { - return CurrentNexusOperationContext.get(); + return CurrentNexusOperationContext.get().getUserFacingContext(); } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java index a31a69f9a..fb7a0d0d3 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java @@ -132,11 +132,7 @@ public Result handle(NexusTask task, Scope metricsScope) throws TimeoutException } CurrentNexusOperationContext.set( - new NexusOperationContextImpl( - namespace, - taskQueue, - client, - new RootNexusOperationOutboundCallsInterceptor(metricsScope))); + new InternalNexusOperationContext(namespace, taskQueue, metricsScope, client)); switch (request.getVariantCase()) { case START_OPERATION: diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/RootNexusOperationOutboundCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/RootNexusOperationOutboundCallsInterceptor.java index 1b16f87fe..961be5c60 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/RootNexusOperationOutboundCallsInterceptor.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/RootNexusOperationOutboundCallsInterceptor.java @@ -21,18 +21,26 @@ package io.temporal.internal.nexus; import com.uber.m3.tally.Scope; +import io.temporal.client.WorkflowClient; import io.temporal.common.interceptors.NexusOperationOutboundCallsInterceptor; public class RootNexusOperationOutboundCallsInterceptor implements NexusOperationOutboundCallsInterceptor { private final Scope scope; + private final WorkflowClient client; - RootNexusOperationOutboundCallsInterceptor(Scope scope) { + RootNexusOperationOutboundCallsInterceptor(Scope scope, WorkflowClient client) { this.scope = scope; + this.client = client; } @Override public Scope getMetricsScope() { return scope; } + + @Override + public WorkflowClient getWorkflowClient() { + return client; + } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/TemporalInterceptorMiddleware.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/TemporalInterceptorMiddleware.java index abc7bc919..b9536f13c 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/TemporalInterceptorMiddleware.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/TemporalInterceptorMiddleware.java @@ -44,9 +44,10 @@ public OperationHandler intercept( interceptor.interceptNexusOperation(context, inboundCallsInterceptor); } + InternalNexusOperationContext temporalNexusContext = CurrentNexusOperationContext.get(); inboundCallsInterceptor.init( new RootNexusOperationOutboundCallsInterceptor( - CurrentNexusOperationContext.get().getMetricsScope())); + temporalNexusContext.getMetricsScope(), temporalNexusContext.getWorkflowClient())); return new OperationInterceptorConverter(inboundCallsInterceptor); } diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/NexusOperationContext.java b/temporal-sdk/src/main/java/io/temporal/nexus/NexusOperationContext.java index 6642c3548..9e027f23c 100644 --- a/temporal-sdk/src/main/java/io/temporal/nexus/NexusOperationContext.java +++ b/temporal-sdk/src/main/java/io/temporal/nexus/NexusOperationContext.java @@ -21,6 +21,7 @@ package io.temporal.nexus; import com.uber.m3.tally.Scope; +import io.temporal.client.WorkflowClient; import io.temporal.common.Experimental; import io.temporal.serviceclient.WorkflowServiceStubsOptions; @@ -39,4 +40,10 @@ public interface NexusOperationContext { * WorkflowServiceStubsOptions.Builder#setMetricsScope(Scope)} when a worker starts up. */ Scope getMetricsScope(); + + /** + * Get a {@link WorkflowClient} that can be used to start interact with the Temporal service from + * a Nexus handler. + */ + WorkflowClient getWorkflowClient(); } diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/SynchronousWorkflowClientOperationFunction.java b/temporal-sdk/src/main/java/io/temporal/nexus/SynchronousWorkflowClientOperationFunction.java deleted file mode 100644 index d3f5c9610..000000000 --- a/temporal-sdk/src/main/java/io/temporal/nexus/SynchronousWorkflowClientOperationFunction.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. - * - * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this material except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.temporal.nexus; - -import io.nexusrpc.OperationUnsuccessfulException; -import io.nexusrpc.handler.OperationContext; -import io.nexusrpc.handler.OperationStartDetails; -import io.temporal.client.WorkflowClient; -import io.temporal.common.Experimental; -import javax.annotation.Nullable; - -/** - * Function interface for {@link WorkflowClientOperationHandlers#sync} representing a call made for - * every operation call that takes a {@link WorkflowClient}. - */ -@FunctionalInterface -@Experimental -public interface SynchronousWorkflowClientOperationFunction { - @Nullable - R apply( - OperationContext ctx, OperationStartDetails details, WorkflowClient client, @Nullable T input) - throws OperationUnsuccessfulException; -} diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowClientOperationHandlers.java b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowClientOperationHandlers.java index c41a9f0e7..62bebf71c 100644 --- a/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowClientOperationHandlers.java +++ b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowClientOperationHandlers.java @@ -22,27 +22,11 @@ import io.nexusrpc.handler.*; import io.nexusrpc.handler.OperationHandler; -import io.temporal.client.WorkflowClient; import io.temporal.common.Experimental; -import io.temporal.internal.nexus.CurrentNexusOperationContext; -import io.temporal.internal.nexus.NexusOperationContextImpl; /** WorkflowClientOperationHandlers can be used to create Temporal specific OperationHandlers */ @Experimental public final class WorkflowClientOperationHandlers { - /** - * Helper to create {@link io.nexusrpc.handler.OperationHandler} instances that take a {@link - * io.temporal.client.WorkflowClient}. - */ - public static OperationHandler sync( - SynchronousWorkflowClientOperationFunction func) { - return io.nexusrpc.handler.OperationHandler.sync( - (OperationContext ctx, OperationStartDetails details, T input) -> { - NexusOperationContextImpl nexusCtx = CurrentNexusOperationContext.get(); - return func.apply(ctx, details, nexusCtx.getWorkflowClient(), input); - }); - } - /** * Maps a workflow method to an {@link io.nexusrpc.handler.OperationHandler}. * @@ -52,9 +36,8 @@ public static OperationHandler sync( public static OperationHandler fromWorkflowMethod( WorkflowMethodFactory startMethod) { return new RunWorkflowOperation<>( - (OperationContext context, OperationStartDetails details, WorkflowClient client, T input) -> - WorkflowHandle.fromWorkflowMethod( - startMethod.apply(context, details, client, input), input)); + (OperationContext context, OperationStartDetails details, T input) -> + WorkflowHandle.fromWorkflowMethod(startMethod.apply(context, details, input), input)); } /** diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowHandleFactory.java b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowHandleFactory.java index a5b0e14be..6e868989b 100644 --- a/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowHandleFactory.java +++ b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowHandleFactory.java @@ -37,6 +37,5 @@ public interface WorkflowHandleFactory { * through the provided {@link WorkflowClient}. */ @Nullable - WorkflowHandle apply( - OperationContext context, OperationStartDetails details, WorkflowClient client, T input); + WorkflowHandle apply(OperationContext context, OperationStartDetails details, T input); } diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowMethodFactory.java b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowMethodFactory.java index f59ff75db..a460e79b6 100644 --- a/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowMethodFactory.java +++ b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowMethodFactory.java @@ -40,6 +40,5 @@ public interface WorkflowMethodFactory { * provided {@link WorkflowClient}. */ @Nullable - Functions.Func1 apply( - OperationContext context, OperationStartDetails details, WorkflowClient client, T input); + Functions.Func1 apply(OperationContext context, OperationStartDetails details, T input); } diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowMethodMethodInvoker.java b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowMethodMethodInvoker.java index 8512646eb..fdf228c47 100644 --- a/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowMethodMethodInvoker.java +++ b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowMethodMethodInvoker.java @@ -24,7 +24,7 @@ import io.temporal.internal.client.NexusStartWorkflowRequest; import io.temporal.internal.client.WorkflowClientInternal; import io.temporal.internal.nexus.CurrentNexusOperationContext; -import io.temporal.internal.nexus.NexusOperationContextImpl; +import io.temporal.internal.nexus.InternalNexusOperationContext; import io.temporal.workflow.Functions; class WorkflowMethodMethodInvoker implements WorkflowHandleInvoker { @@ -36,7 +36,7 @@ public WorkflowMethodMethodInvoker(Functions.Proc workflow) { @Override public WorkflowExecution invoke(NexusStartWorkflowRequest request) { - NexusOperationContextImpl nexusCtx = CurrentNexusOperationContext.get(); + InternalNexusOperationContext nexusCtx = CurrentNexusOperationContext.get(); return ((WorkflowClientInternal) nexusCtx.getWorkflowClient().getInternal()) .startNexus(request, workflow); } diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowRunOperation.java b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowRunOperation.java index 96791ccfe..2221b95d3 100644 --- a/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowRunOperation.java +++ b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowRunOperation.java @@ -32,7 +32,7 @@ import io.temporal.client.WorkflowClient; import io.temporal.internal.client.NexusStartWorkflowRequest; import io.temporal.internal.nexus.CurrentNexusOperationContext; -import io.temporal.internal.nexus.NexusOperationContextImpl; +import io.temporal.internal.nexus.InternalNexusOperationContext; import java.net.URISyntaxException; class RunWorkflowOperation implements OperationHandler { @@ -45,10 +45,9 @@ class RunWorkflowOperation implements OperationHandler { @Override public OperationStartResult start( OperationContext ctx, OperationStartDetails operationStartDetails, T input) { - NexusOperationContextImpl nexusCtx = CurrentNexusOperationContext.get(); + InternalNexusOperationContext nexusCtx = CurrentNexusOperationContext.get(); - WorkflowHandle handle = - handleFactory.apply(ctx, operationStartDetails, nexusCtx.getWorkflowClient(), input); + WorkflowHandle handle = handleFactory.apply(ctx, operationStartDetails, input); NexusStartWorkflowRequest nexusRequest = new NexusStartWorkflowRequest( @@ -73,7 +72,7 @@ public OperationStartResult start( io.temporal.api.nexus.v1.Link nexusLink = workflowEventToNexusLink(workflowEventLink); try { OperationStartResult.Builder result = - OperationStartResult.newAsyncBuilder(workflowExec.getWorkflowId()); + OperationStartResult.newAsyncBuilder(workflowExec.getWorkflowId()); if (nexusLink != null) { result.addLink(nexusProtoLinkToLink(nexusLink)); } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/AsyncWorkflowOperationTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/AsyncWorkflowOperationTest.java index 261e6e967..210477ad2 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/AsyncWorkflowOperationTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/AsyncWorkflowOperationTest.java @@ -24,6 +24,7 @@ import io.nexusrpc.handler.OperationImpl; import io.nexusrpc.handler.ServiceImpl; import io.temporal.client.WorkflowOptions; +import io.temporal.nexus.Nexus; import io.temporal.nexus.WorkflowClientOperationHandlers; import io.temporal.testing.WorkflowReplayer; import io.temporal.testing.internal.SDKTestWorkflowRule; @@ -129,12 +130,14 @@ public class TestNexusServiceImpl { @OperationImpl public OperationHandler operation() { return WorkflowClientOperationHandlers.fromWorkflowMethod( - (context, details, client, input) -> - client.newWorkflowStub( - OperationWorkflow.class, - WorkflowOptions.newBuilder() - .setWorkflowId(WORKFLOW_ID_PREFIX + details.getRequestId()) - .build()) + (context, details, input) -> + Nexus.getOperationContext() + .getWorkflowClient() + .newWorkflowStub( + OperationWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId(WORKFLOW_ID_PREFIX + details.getRequestId()) + .build()) ::execute); } } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/CancelAsyncOperationTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/CancelAsyncOperationTest.java index bb8859748..eb939ce90 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/CancelAsyncOperationTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/CancelAsyncOperationTest.java @@ -29,6 +29,7 @@ import io.temporal.client.WorkflowOptions; import io.temporal.failure.CanceledFailure; import io.temporal.failure.NexusOperationFailure; +import io.temporal.nexus.Nexus; import io.temporal.nexus.WorkflowClientOperationHandlers; import io.temporal.testing.internal.SDKTestWorkflowRule; import io.temporal.testing.internal.TracingWorkerInterceptor; @@ -135,10 +136,14 @@ public class TestNexusServiceImpl { @OperationImpl public OperationHandler operation() { return WorkflowClientOperationHandlers.fromWorkflowMethod( - (context, details, client, input) -> - client.newWorkflowStub( - AsyncWorkflowOperationTest.OperationWorkflow.class, - WorkflowOptions.newBuilder().setWorkflowId(details.getRequestId()).build()) + (context, details, input) -> + Nexus.getOperationContext() + .getWorkflowClient() + .newWorkflowStub( + AsyncWorkflowOperationTest.OperationWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId(details.getRequestId()) + .build()) ::execute); } } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericListOperationTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericListOperationTest.java index 1277111b5..3dd7afff5 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericListOperationTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericListOperationTest.java @@ -26,7 +26,6 @@ import io.nexusrpc.handler.OperationImpl; import io.nexusrpc.handler.ServiceImpl; import io.temporal.common.converter.EncodedValuesTest; -import io.temporal.nexus.WorkflowClientOperationHandlers; import io.temporal.testing.internal.SDKTestWorkflowRule; import io.temporal.workflow.NexusServiceOptions; import io.temporal.workflow.Workflow; @@ -81,8 +80,8 @@ public static class TestNexusServiceImpl { @OperationImpl public OperationHandler, List> operation() { - return WorkflowClientOperationHandlers.sync( - (context, details, client, input) -> { + return OperationHandler.sync( + (context, details, input) -> { return input; }); } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/ParallelWorkflowOperationTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/ParallelWorkflowOperationTest.java index 154e500de..b9458ca1c 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/ParallelWorkflowOperationTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/ParallelWorkflowOperationTest.java @@ -24,6 +24,7 @@ import io.nexusrpc.handler.OperationImpl; import io.nexusrpc.handler.ServiceImpl; import io.temporal.client.WorkflowOptions; +import io.temporal.nexus.Nexus; import io.temporal.nexus.WorkflowClientOperationHandlers; import io.temporal.testing.WorkflowReplayer; import io.temporal.testing.internal.SDKTestWorkflowRule; @@ -111,10 +112,14 @@ public class TestNexusServiceImpl { @OperationImpl public OperationHandler operation() { return WorkflowClientOperationHandlers.fromWorkflowMethod( - (context, details, client, input) -> - client.newWorkflowStub( - OperationWorkflow.class, - WorkflowOptions.newBuilder().setWorkflowId(details.getRequestId()).build()) + (context, details, input) -> + Nexus.getOperationContext() + .getWorkflowClient() + .newWorkflowStub( + OperationWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId(details.getRequestId()) + .build()) ::execute); } } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/ProtoOperationTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/ProtoOperationTest.java index dc386d63d..31df8eb4d 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/ProtoOperationTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/ProtoOperationTest.java @@ -28,7 +28,7 @@ import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest; import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse; -import io.temporal.nexus.WorkflowClientOperationHandlers; +import io.temporal.nexus.Nexus; import io.temporal.testing.internal.SDKTestWorkflowRule; import io.temporal.workflow.*; import io.temporal.workflow.shared.TestWorkflows; @@ -86,9 +86,13 @@ public class TestNexusServiceImpl { @OperationImpl public OperationHandler describeWorkflow() { - return WorkflowClientOperationHandlers.sync( - (context, details, client, input) -> - client.getWorkflowServiceStubs().blockingStub().describeWorkflowExecution(input)); + return OperationHandler.sync( + (context, details, input) -> + Nexus.getOperationContext() + .getWorkflowClient() + .getWorkflowServiceStubs() + .blockingStub() + .describeWorkflowExecution(input)); } } } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncClientOperationTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncClientOperationTest.java index d774875dd..5da665882 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncClientOperationTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncClientOperationTest.java @@ -31,7 +31,6 @@ import io.temporal.common.reporter.TestStatsReporter; import io.temporal.failure.ApplicationFailure; import io.temporal.nexus.Nexus; -import io.temporal.nexus.WorkflowClientOperationHandlers; import io.temporal.serviceclient.MetricsTag; import io.temporal.testing.internal.SDKTestWorkflowRule; import io.temporal.testing.internal.TracingWorkerInterceptor; @@ -158,13 +157,14 @@ public class TestNexusServiceImpl { @OperationImpl public OperationHandler operation() { // Implemented inline - return WorkflowClientOperationHandlers.sync( - (ctx, details, client, id) -> { + return OperationHandler.sync( + (ctx, details, id) -> { if (id.isEmpty()) { throw ApplicationFailure.newNonRetryableFailure("Invalid ID", "TestError"); } Nexus.getOperationContext().getMetricsScope().counter("operation").inc(1); - return client + return Nexus.getOperationContext() + .getWorkflowClient() .newWorkflowStub(TestUpdatedWorkflow.class, id) .update("Update from operation"); }); diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/TerminateWorkflowAsyncOperationTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/TerminateWorkflowAsyncOperationTest.java index 3a231216f..22b5f1d98 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/TerminateWorkflowAsyncOperationTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/TerminateWorkflowAsyncOperationTest.java @@ -30,6 +30,7 @@ import io.temporal.failure.ApplicationFailure; import io.temporal.failure.NexusOperationFailure; import io.temporal.failure.TerminatedFailure; +import io.temporal.nexus.Nexus; import io.temporal.nexus.WorkflowClientOperationHandlers; import io.temporal.testing.internal.SDKTestWorkflowRule; import io.temporal.workflow.*; @@ -109,18 +110,25 @@ public class TestNexusServiceImpl { @OperationImpl public OperationHandler operation() { return WorkflowClientOperationHandlers.fromWorkflowMethod( - (context, details, client, input) -> - client.newWorkflowStub( - AsyncWorkflowOperationTest.OperationWorkflow.class, - WorkflowOptions.newBuilder().setWorkflowId(details.getRequestId()).build()) + (context, details, input) -> + Nexus.getOperationContext() + .getWorkflowClient() + .newWorkflowStub( + AsyncWorkflowOperationTest.OperationWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId(details.getRequestId()) + .build()) ::execute); } @OperationImpl public OperationHandler terminate() { - return WorkflowClientOperationHandlers.sync( - (context, details, client, workflowId) -> { - client.newUntypedWorkflowStub(workflowId).terminate("terminate for test"); + return OperationHandler.sync( + (context, details, workflowId) -> { + Nexus.getOperationContext() + .getWorkflowClient() + .newUntypedWorkflowStub(workflowId) + .terminate("terminate for test"); return "terminated"; }); } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/VoidOperationTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/VoidOperationTest.java index 85e273c4b..4b70ae7f7 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/VoidOperationTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/VoidOperationTest.java @@ -25,7 +25,6 @@ import io.nexusrpc.handler.OperationHandler; import io.nexusrpc.handler.OperationImpl; import io.nexusrpc.handler.ServiceImpl; -import io.temporal.nexus.WorkflowClientOperationHandlers; import io.temporal.testing.internal.SDKTestWorkflowRule; import io.temporal.workflow.NexusServiceOptions; import io.temporal.workflow.NexusServiceStub; @@ -82,7 +81,7 @@ public interface TestNexusService { public static class TestNexusServiceImpl { @OperationImpl public OperationHandler noop() { - return WorkflowClientOperationHandlers.sync((context, details, client, input) -> null); + return OperationHandler.sync((context, details, input) -> null); } } } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/WorkflowHandleFuncTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/WorkflowHandleFuncTest.java index 83ffd0498..d4197ce18 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/WorkflowHandleFuncTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/WorkflowHandleFuncTest.java @@ -25,7 +25,9 @@ import io.nexusrpc.handler.OperationHandler; import io.nexusrpc.handler.OperationImpl; import io.nexusrpc.handler.ServiceImpl; +import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowOptions; +import io.temporal.nexus.Nexus; import io.temporal.nexus.WorkflowClientOperationHandlers; import io.temporal.nexus.WorkflowHandle; import io.temporal.testing.internal.SDKTestWorkflowRule; @@ -85,7 +87,8 @@ public class TestNexusServiceFuncImpl { @OperationImpl public OperationHandler operation() { return WorkflowClientOperationHandlers.fromWorkflowHandle( - (context, details, client, input) -> { + (context, details, input) -> { + WorkflowClient client = Nexus.getOperationContext().getWorkflowClient(); switch (input) { case 0: return WorkflowHandle.fromWorkflowMethod( diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/WorkflowHandleProcTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/WorkflowHandleProcTest.java index c951bf8d7..eb669080a 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/WorkflowHandleProcTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/WorkflowHandleProcTest.java @@ -25,7 +25,9 @@ import io.nexusrpc.handler.OperationHandler; import io.nexusrpc.handler.OperationImpl; import io.nexusrpc.handler.ServiceImpl; +import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowOptions; +import io.temporal.nexus.Nexus; import io.temporal.nexus.WorkflowClientOperationHandlers; import io.temporal.nexus.WorkflowHandle; import io.temporal.testing.internal.SDKTestWorkflowRule; @@ -86,7 +88,8 @@ public class TestNexusServiceFuncImpl { @OperationImpl public OperationHandler operation() { return WorkflowClientOperationHandlers.fromWorkflowHandle( - (context, details, client, input) -> { + (context, details, input) -> { + WorkflowClient client = Nexus.getOperationContext().getWorkflowClient(); switch (input) { case 0: return WorkflowHandle.fromWorkflowMethod( diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/WorkflowHandleStubTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/WorkflowHandleStubTest.java index af45c069b..51074cb4c 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/WorkflowHandleStubTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/WorkflowHandleStubTest.java @@ -25,7 +25,9 @@ import io.nexusrpc.handler.OperationHandler; import io.nexusrpc.handler.OperationImpl; import io.nexusrpc.handler.ServiceImpl; +import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowOptions; +import io.temporal.nexus.Nexus; import io.temporal.nexus.WorkflowClientOperationHandlers; import io.temporal.nexus.WorkflowHandle; import io.temporal.testing.internal.SDKTestWorkflowRule; @@ -86,7 +88,8 @@ public class TestNexusServiceFuncImpl { @OperationImpl public OperationHandler operation() { return WorkflowClientOperationHandlers.fromWorkflowHandle( - (context, details, client, input) -> { + (context, details, input) -> { + WorkflowClient client = Nexus.getOperationContext().getWorkflowClient(); switch (input) { case 0: return WorkflowHandle.fromWorkflowStub( diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/WorkflowOperationLinkingTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/WorkflowOperationLinkingTest.java index fa70ac7ee..e35eb55cc 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/WorkflowOperationLinkingTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/WorkflowOperationLinkingTest.java @@ -30,6 +30,7 @@ import io.temporal.api.history.v1.HistoryEvent; import io.temporal.client.WorkflowOptions; import io.temporal.client.WorkflowStub; +import io.temporal.nexus.Nexus; import io.temporal.nexus.WorkflowClientOperationHandlers; import io.temporal.testing.internal.SDKTestWorkflowRule; import io.temporal.workflow.*; @@ -137,10 +138,14 @@ public class TestNexusServiceImpl { @OperationImpl public OperationHandler operation() { return WorkflowClientOperationHandlers.fromWorkflowMethod( - (context, details, client, input) -> - client.newWorkflowStub( - AsyncWorkflowOperationTest.OperationWorkflow.class, - WorkflowOptions.newBuilder().setWorkflowId(details.getRequestId()).build()) + (context, details, input) -> + Nexus.getOperationContext() + .getWorkflowClient() + .newWorkflowStub( + AsyncWorkflowOperationTest.OperationWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId(details.getRequestId()) + .build()) ::execute); } }