Skip to content

Commit

Permalink
Remove Nexus sync client handler (#2403)
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns authored Feb 7, 2025
1 parent fd65ea9 commit 32df8d4
Show file tree
Hide file tree
Showing 29 changed files with 158 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowClientOptions;
import io.temporal.client.WorkflowOptions;
import io.temporal.nexus.Nexus;
import io.temporal.nexus.WorkflowClientOperationHandlers;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.worker.WorkerFactoryOptions;
Expand Down Expand Up @@ -83,10 +84,14 @@ public class TestNexusServiceImpl {
@OperationImpl
public OperationHandler<String, String> operation() {
return WorkflowClientOperationHandlers.fromWorkflowMethod(
(context, details, client, input) ->
client.newWorkflowStub(
TestOtherWorkflow.class,
WorkflowOptions.newBuilder().setWorkflowId(details.getRequestId()).build())
(context, details, input) ->
Nexus.getOperationContext()
.getWorkflowClient()
.newWorkflowStub(
TestOtherWorkflow.class,
WorkflowOptions.newBuilder()
.setWorkflowId(details.getRequestId())
.build())
::workflow);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
* Intercepts inbound calls to a Nexus operation on the worker side.
*
* <p>An instance should be created in {@link
* WorkerInterceptor#interceptNexusOperation(NexusOperationInboundCallsInterceptor)}.
* WorkerInterceptor#interceptNexusOperation(OperationContext,
* NexusOperationInboundCallsInterceptor)}.
*
* <p>Prefer extending {@link NexusOperationInboundCallsInterceptorBase} and overriding only the
* methods you need instead of implementing this interface directly. {@link
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package io.temporal.common.interceptors;

import com.uber.m3.tally.Scope;
import io.temporal.client.WorkflowClient;
import io.temporal.common.Experimental;

/**
Expand All @@ -41,4 +42,7 @@
public interface NexusOperationOutboundCallsInterceptor {
/** Intercepts call to get the metric scope in a Nexus operation. */
Scope getMetricsScope();

/** Intercepts call to get the workflow client in a Nexus operation. */
WorkflowClient getWorkflowClient();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package io.temporal.common.interceptors;

import com.uber.m3.tally.Scope;
import io.temporal.client.WorkflowClient;
import io.temporal.common.Experimental;

/** Convenience base class for {@link NexusOperationOutboundCallsInterceptor} implementations. */
Expand All @@ -37,4 +38,9 @@ public NexusOperationOutboundCallsInterceptorBase(NexusOperationOutboundCallsInt
public Scope getMetricsScope() {
return next.getMetricsScope();
}

@Override
public WorkflowClient getWorkflowClient() {
return next.getWorkflowClient();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
* used directly.
*/
public final class CurrentNexusOperationContext {
private static final ThreadLocal<NexusOperationContextImpl> CURRENT = new ThreadLocal<>();
private static final ThreadLocal<InternalNexusOperationContext> CURRENT = new ThreadLocal<>();

public static NexusOperationContextImpl get() {
NexusOperationContextImpl result = CURRENT.get();
public static InternalNexusOperationContext get() {
InternalNexusOperationContext result = CURRENT.get();
if (result == null) {
throw new IllegalStateException(
"NexusOperationContext can be used only inside of nexus operation handler "
Expand All @@ -37,7 +37,7 @@ public static NexusOperationContextImpl get() {
return CURRENT.get();
}

public static void set(NexusOperationContextImpl context) {
public static void set(InternalNexusOperationContext context) {
if (context == null) {
throw new IllegalArgumentException("null context");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,23 @@
import io.temporal.common.interceptors.NexusOperationOutboundCallsInterceptor;
import io.temporal.nexus.NexusOperationContext;

public class NexusOperationContextImpl implements NexusOperationContext {
public class InternalNexusOperationContext {
private final String namespace;
private final String taskQueue;
private final Scope metricScope;
private final WorkflowClient client;
NexusOperationOutboundCallsInterceptor outboundCalls;

public NexusOperationContextImpl(
String namespace,
String taskQueue,
WorkflowClient client,
NexusOperationOutboundCallsInterceptor outboundCalls) {
public InternalNexusOperationContext(
String namespace, String taskQueue, Scope metricScope, WorkflowClient client) {
this.namespace = namespace;
this.taskQueue = taskQueue;
this.metricScope = metricScope;
this.client = client;
this.outboundCalls = outboundCalls;
}

@Override
public Scope getMetricsScope() {
return outboundCalls.getMetricsScope();
return metricScope;
}

public WorkflowClient getWorkflowClient() {
Expand All @@ -62,4 +59,23 @@ public String getNamespace() {
public void setOutboundInterceptor(NexusOperationOutboundCallsInterceptor outboundCalls) {
this.outboundCalls = outboundCalls;
}

public NexusOperationContext getUserFacingContext() {
if (outboundCalls == null) {
throw new IllegalStateException("Outbound interceptor is not set");
}
return new NexusOperationContextImpl();
}

private class NexusOperationContextImpl implements NexusOperationContext {
@Override
public Scope getMetricsScope() {
return outboundCalls.getMetricsScope();
}

@Override
public WorkflowClient getWorkflowClient() {
return outboundCalls.getWorkflowClient();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ public final class NexusInternal {
private NexusInternal() {}

public static NexusOperationContext getOperationContext() {
return CurrentNexusOperationContext.get();
return CurrentNexusOperationContext.get().getUserFacingContext();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,7 @@ public Result handle(NexusTask task, Scope metricsScope) throws TimeoutException
}

CurrentNexusOperationContext.set(
new NexusOperationContextImpl(
namespace,
taskQueue,
client,
new RootNexusOperationOutboundCallsInterceptor(metricsScope)));
new InternalNexusOperationContext(namespace, taskQueue, metricsScope, client));

switch (request.getVariantCase()) {
case START_OPERATION:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,26 @@
package io.temporal.internal.nexus;

import com.uber.m3.tally.Scope;
import io.temporal.client.WorkflowClient;
import io.temporal.common.interceptors.NexusOperationOutboundCallsInterceptor;

public class RootNexusOperationOutboundCallsInterceptor
implements NexusOperationOutboundCallsInterceptor {
private final Scope scope;
private final WorkflowClient client;

RootNexusOperationOutboundCallsInterceptor(Scope scope) {
RootNexusOperationOutboundCallsInterceptor(Scope scope, WorkflowClient client) {
this.scope = scope;
this.client = client;
}

@Override
public Scope getMetricsScope() {
return scope;
}

@Override
public WorkflowClient getWorkflowClient() {
return client;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ public OperationHandler<Object, Object> intercept(
interceptor.interceptNexusOperation(context, inboundCallsInterceptor);
}

InternalNexusOperationContext temporalNexusContext = CurrentNexusOperationContext.get();
inboundCallsInterceptor.init(
new RootNexusOperationOutboundCallsInterceptor(
CurrentNexusOperationContext.get().getMetricsScope()));
temporalNexusContext.getMetricsScope(), temporalNexusContext.getWorkflowClient()));
return new OperationInterceptorConverter(inboundCallsInterceptor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package io.temporal.nexus;

import com.uber.m3.tally.Scope;
import io.temporal.client.WorkflowClient;
import io.temporal.common.Experimental;
import io.temporal.serviceclient.WorkflowServiceStubsOptions;

Expand All @@ -39,4 +40,10 @@ public interface NexusOperationContext {
* WorkflowServiceStubsOptions.Builder#setMetricsScope(Scope)} when a worker starts up.
*/
Scope getMetricsScope();

/**
* Get a {@link WorkflowClient} that can be used to start interact with the Temporal service from
* a Nexus handler.
*/
WorkflowClient getWorkflowClient();
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,11 @@

import io.nexusrpc.handler.*;
import io.nexusrpc.handler.OperationHandler;
import io.temporal.client.WorkflowClient;
import io.temporal.common.Experimental;
import io.temporal.internal.nexus.CurrentNexusOperationContext;
import io.temporal.internal.nexus.NexusOperationContextImpl;

/** WorkflowClientOperationHandlers can be used to create Temporal specific OperationHandlers */
@Experimental
public final class WorkflowClientOperationHandlers {
/**
* Helper to create {@link io.nexusrpc.handler.OperationHandler} instances that take a {@link
* io.temporal.client.WorkflowClient}.
*/
public static <T, R> OperationHandler<T, R> sync(
SynchronousWorkflowClientOperationFunction<T, R> func) {
return io.nexusrpc.handler.OperationHandler.sync(
(OperationContext ctx, OperationStartDetails details, T input) -> {
NexusOperationContextImpl nexusCtx = CurrentNexusOperationContext.get();
return func.apply(ctx, details, nexusCtx.getWorkflowClient(), input);
});
}

/**
* Maps a workflow method to an {@link io.nexusrpc.handler.OperationHandler}.
*
Expand All @@ -52,9 +36,8 @@ public static <T, R> OperationHandler<T, R> sync(
public static <T, R> OperationHandler<T, R> fromWorkflowMethod(
WorkflowMethodFactory<T, R> startMethod) {
return new RunWorkflowOperation<>(
(OperationContext context, OperationStartDetails details, WorkflowClient client, T input) ->
WorkflowHandle.fromWorkflowMethod(
startMethod.apply(context, details, client, input), input));
(OperationContext context, OperationStartDetails details, T input) ->
WorkflowHandle.fromWorkflowMethod(startMethod.apply(context, details, input), input));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,5 @@ public interface WorkflowHandleFactory<T, R> {
* through the provided {@link WorkflowClient}.
*/
@Nullable
WorkflowHandle<R> apply(
OperationContext context, OperationStartDetails details, WorkflowClient client, T input);
WorkflowHandle<R> apply(OperationContext context, OperationStartDetails details, T input);
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,5 @@ public interface WorkflowMethodFactory<T, R> {
* provided {@link WorkflowClient}.
*/
@Nullable
Functions.Func1<T, R> apply(
OperationContext context, OperationStartDetails details, WorkflowClient client, T input);
Functions.Func1<T, R> apply(OperationContext context, OperationStartDetails details, T input);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import io.temporal.internal.client.NexusStartWorkflowRequest;
import io.temporal.internal.client.WorkflowClientInternal;
import io.temporal.internal.nexus.CurrentNexusOperationContext;
import io.temporal.internal.nexus.NexusOperationContextImpl;
import io.temporal.internal.nexus.InternalNexusOperationContext;
import io.temporal.workflow.Functions;

class WorkflowMethodMethodInvoker implements WorkflowHandleInvoker {
Expand All @@ -36,7 +36,7 @@ public WorkflowMethodMethodInvoker(Functions.Proc workflow) {

@Override
public WorkflowExecution invoke(NexusStartWorkflowRequest request) {
NexusOperationContextImpl nexusCtx = CurrentNexusOperationContext.get();
InternalNexusOperationContext nexusCtx = CurrentNexusOperationContext.get();
return ((WorkflowClientInternal) nexusCtx.getWorkflowClient().getInternal())
.startNexus(request, workflow);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import io.temporal.client.WorkflowClient;
import io.temporal.internal.client.NexusStartWorkflowRequest;
import io.temporal.internal.nexus.CurrentNexusOperationContext;
import io.temporal.internal.nexus.NexusOperationContextImpl;
import io.temporal.internal.nexus.InternalNexusOperationContext;
import java.net.URISyntaxException;

class RunWorkflowOperation<T, R> implements OperationHandler<T, R> {
Expand All @@ -45,10 +45,9 @@ class RunWorkflowOperation<T, R> implements OperationHandler<T, R> {
@Override
public OperationStartResult<R> start(
OperationContext ctx, OperationStartDetails operationStartDetails, T input) {
NexusOperationContextImpl nexusCtx = CurrentNexusOperationContext.get();
InternalNexusOperationContext nexusCtx = CurrentNexusOperationContext.get();

WorkflowHandle handle =
handleFactory.apply(ctx, operationStartDetails, nexusCtx.getWorkflowClient(), input);
WorkflowHandle handle = handleFactory.apply(ctx, operationStartDetails, input);

NexusStartWorkflowRequest nexusRequest =
new NexusStartWorkflowRequest(
Expand All @@ -73,7 +72,7 @@ public OperationStartResult<R> start(
io.temporal.api.nexus.v1.Link nexusLink = workflowEventToNexusLink(workflowEventLink);
try {
OperationStartResult.Builder<R> result =
OperationStartResult.<R>newAsyncBuilder(workflowExec.getWorkflowId());
OperationStartResult.newAsyncBuilder(workflowExec.getWorkflowId());
if (nexusLink != null) {
result.addLink(nexusProtoLinkToLink(nexusLink));
}
Expand Down
Loading

0 comments on commit 32df8d4

Please sign in to comment.