diff --git a/examples/pom.xml b/examples/pom.xml
index a7e7b35f..cf921537 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -139,6 +139,9 @@
dockerBuild
+
+ amazoncorretto:11
+
io.numaproj.numaflow.examples.map.evenodd.EvenOddFunction
@@ -156,6 +159,9 @@
dockerBuild
+
+ amazoncorretto:11
+
io.numaproj.numaflow.examples.sink.simple.SimpleSink
@@ -172,6 +178,9 @@
dockerBuild
+
+ amazoncorretto:11
+
io.numaproj.numaflow.examples.reduce.sum.SumFactory
@@ -189,6 +198,9 @@
dockerBuild
+
+ amazoncorretto:11
+
io.numaproj.numaflow.examples.reducestreamer.sum.SumFactory
@@ -228,6 +240,9 @@
dockerBuild
+
+ amazoncorretto:11
+
io.numaproj.numaflow.examples.reduce.count.CounterFactory
@@ -245,6 +260,9 @@
dockerBuild
+
+ amazoncorretto:11
+
io.numaproj.numaflow.examples.sideinput.simple.SimpleSideInput
@@ -263,6 +281,9 @@
dockerBuild
+
+ amazoncorretto:11
+
io.numaproj.numaflow.examples.sideinput.udf.SimpleMapWithSideInput
@@ -302,6 +323,9 @@
dockerBuild
+
+ amazoncorretto:11
+
io.numaproj.numaflow.examples.reducesession.counter.CountFactory
diff --git a/pom.xml b/pom.xml
index 6959b053..16f70f6d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -137,6 +137,10 @@
io.grpc
grpc-stub
+
+ io.grpc
+ grpc-inprocess
+
javax.annotation
javax.annotation-api
diff --git a/src/main/java/io/numaproj/numaflow/batchmapper/GRPCConfig.java b/src/main/java/io/numaproj/numaflow/batchmapper/GRPCConfig.java
index b3b9bbbf..e338f5d0 100644
--- a/src/main/java/io/numaproj/numaflow/batchmapper/GRPCConfig.java
+++ b/src/main/java/io/numaproj/numaflow/batchmapper/GRPCConfig.java
@@ -1,5 +1,6 @@
package io.numaproj.numaflow.batchmapper;
+import io.numaproj.numaflow.shared.GrpcConfigRetriever;
import lombok.Builder;
import lombok.Getter;
@@ -8,7 +9,7 @@
*/
@Getter
@Builder(builderMethodName = "newBuilder")
-public class GRPCConfig {
+public class GRPCConfig implements GrpcConfigRetriever {
@Builder.Default
private String socketPath = Constants.DEFAULT_SOCKET_PATH;
diff --git a/src/main/java/io/numaproj/numaflow/batchmapper/Server.java b/src/main/java/io/numaproj/numaflow/batchmapper/Server.java
index bca4d4fa..f74d1b90 100644
--- a/src/main/java/io/numaproj/numaflow/batchmapper/Server.java
+++ b/src/main/java/io/numaproj/numaflow/batchmapper/Server.java
@@ -2,17 +2,16 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
-import io.grpc.ServerBuilder;
+import io.grpc.ServerInterceptor;
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
-import io.numaproj.numaflow.shared.GrpcServerHelper;
import io.numaproj.numaflow.shared.GrpcServerUtils;
+import io.numaproj.numaflow.shared.GrpcServerWrapper;
import lombok.extern.slf4j.Slf4j;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
/**
* Server is the gRPC server for executing batch map operation.
@@ -24,8 +23,7 @@ public class Server {
private final Service service;
private final CompletableFuture shutdownSignal;
private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper());
- private io.grpc.Server server;
- private final GrpcServerHelper grpcServerHelper;
+ private final GrpcServerWrapper server;
/**
* constructor to create sink gRPC server.
@@ -36,6 +34,14 @@ public Server(BatchMapper batchMapper) {
this(batchMapper, GRPCConfig.defaultGrpcConfig());
}
+ @VisibleForTesting
+ protected Server(GRPCConfig grpcConfig, BatchMapper service, ServerInterceptor interceptor, String serverName) {
+ this.grpcConfig = grpcConfig;
+ this.shutdownSignal = new CompletableFuture<>();
+ this.service = new Service(service, this.shutdownSignal);
+ this.server = new GrpcServerWrapper(interceptor, serverName, this.service);
+ }
+
/**
* constructor to create sink gRPC server with gRPC config.
*
@@ -46,7 +52,7 @@ public Server(BatchMapper batchMapper, GRPCConfig grpcConfig) {
this.shutdownSignal = new CompletableFuture<>();
this.service = new Service(batchMapper, this.shutdownSignal);
this.grpcConfig = grpcConfig;
- this.grpcServerHelper = new GrpcServerHelper();
+ this.server = new GrpcServerWrapper(this.grpcConfig, this.service);
}
/**
@@ -56,37 +62,22 @@ public Server(BatchMapper batchMapper, GRPCConfig grpcConfig) {
*/
public void start() throws Exception {
GrpcServerUtils.writeServerInfo(
- serverInfoAccessor,
- grpcConfig.getSocketPath(),
- grpcConfig.getInfoFilePath(),
+ this.serverInfoAccessor,
+ this.grpcConfig.getSocketPath(),
+ this.grpcConfig.getInfoFilePath(),
ContainerType.MAPPER,
Collections.singletonMap(Constants.MAP_MODE_KEY, Constants.MAP_MODE));
- if (this.server == null) {
- this.server = grpcServerHelper.createServer(
- grpcConfig.getSocketPath(),
- grpcConfig.getMaxMessageSize(),
- grpcConfig.isLocal(),
- grpcConfig.getPort(),
- this.service);
- }
-
- server.start();
+ this.server.start();
- log.info(
- "server started, listening on socket path: " + grpcConfig.getSocketPath());
+ log.info("server started, listening on socket path: {}", this.grpcConfig.getSocketPath());
// register shutdown hook to gracefully shut down the server
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
- if (server != null && server.isTerminated()) {
- return;
- }
try {
- Server.this.stop();
- log.info("gracefully shutting down event loop groups");
- this.grpcServerHelper.gracefullyShutdownEventLoopGroups();
+ this.stop();
} catch (InterruptedException e) {
Thread.interrupted();
e.printStackTrace(System.err);
@@ -94,18 +85,11 @@ public void start() throws Exception {
}));
// if there are any exceptions, shutdown the server gracefully.
- shutdownSignal.whenCompleteAsync((v, e) -> {
- if (server != null && server.isTerminated()) {
- return;
- }
-
+ this.shutdownSignal.whenCompleteAsync((v, e) -> {
if (e != null) {
System.err.println("*** shutting down batch map gRPC server because of an exception - " + e.getMessage());
try {
- log.info("stopping server");
- Server.this.stop();
- log.info("gracefully shutting down event loop groups");
- this.grpcServerHelper.gracefullyShutdownEventLoopGroups();
+ this.stop();
} catch (InterruptedException ex) {
Thread.interrupted();
ex.printStackTrace(System.err);
@@ -123,7 +107,7 @@ public void start() throws Exception {
*/
public void awaitTermination() throws InterruptedException {
log.info("batch map server is waiting for termination");
- server.awaitTermination();
+ this.server.awaitTermination();
log.info("batch map server has terminated");
}
@@ -134,25 +118,7 @@ public void awaitTermination() throws InterruptedException {
* @throws InterruptedException if shutdown is interrupted
*/
public void stop() throws InterruptedException {
+ this.server.gracefullyShutdown();
this.service.shutDown();
- if (server != null) {
- server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
- // force shutdown if not terminated
- if (!server.isTerminated()) {
- server.shutdownNow();
- }
- }
- }
-
- /**
- * Set server builder for testing.
- *
- * @param serverBuilder in process server builder can be used for testing
- */
- @VisibleForTesting
- public void setServerBuilder(ServerBuilder> serverBuilder) {
- this.server = serverBuilder
- .addService(this.service)
- .build();
}
}
diff --git a/src/main/java/io/numaproj/numaflow/batchmapper/Service.java b/src/main/java/io/numaproj/numaflow/batchmapper/Service.java
index fd967cd2..0d320a19 100644
--- a/src/main/java/io/numaproj/numaflow/batchmapper/Service.java
+++ b/src/main/java/io/numaproj/numaflow/batchmapper/Service.java
@@ -185,15 +185,16 @@ private HandlerDatum constructHandlerDatum(MapOuterClass.MapRequest d) {
);
}
- // Shuts down the executor service which is used for batch map
+ // Shuts down the executor service
public void shutDown() {
this.mapTaskExecutor.shutdown();
try {
if (!mapTaskExecutor.awaitTermination(SHUTDOWN_TIME, TimeUnit.SECONDS)) {
log.error("BatchMap executor did not terminate in the specified time.");
List droppedTasks = mapTaskExecutor.shutdownNow();
- log.error("BatchMap executor was abruptly shut down. " + droppedTasks.size()
- + " tasks will not be executed.");
+ log.error(
+ "BatchMap executor was abruptly shut down. {} tasks will not be executed.",
+ droppedTasks.size());
} else {
log.info("BatchMap executor was terminated.");
}
diff --git a/src/main/java/io/numaproj/numaflow/mapper/GRPCConfig.java b/src/main/java/io/numaproj/numaflow/mapper/GRPCConfig.java
index ba2b65e2..495d4105 100644
--- a/src/main/java/io/numaproj/numaflow/mapper/GRPCConfig.java
+++ b/src/main/java/io/numaproj/numaflow/mapper/GRPCConfig.java
@@ -1,5 +1,6 @@
package io.numaproj.numaflow.mapper;
+import io.numaproj.numaflow.shared.GrpcConfigRetriever;
import lombok.Builder;
import lombok.Getter;
@@ -8,7 +9,7 @@
*/
@Getter
@Builder(builderMethodName = "newBuilder")
-public class GRPCConfig {
+public class GRPCConfig implements GrpcConfigRetriever {
@Builder.Default
private String socketPath = Constants.DEFAULT_SOCKET_PATH;
diff --git a/src/main/java/io/numaproj/numaflow/mapper/Server.java b/src/main/java/io/numaproj/numaflow/mapper/Server.java
index b7376d49..c5f638b8 100644
--- a/src/main/java/io/numaproj/numaflow/mapper/Server.java
+++ b/src/main/java/io/numaproj/numaflow/mapper/Server.java
@@ -1,17 +1,17 @@
package io.numaproj.numaflow.mapper;
import com.fasterxml.jackson.databind.ObjectMapper;
-import io.grpc.ServerBuilder;
+import com.google.common.annotations.VisibleForTesting;
+import io.grpc.ServerInterceptor;
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
-import io.numaproj.numaflow.shared.GrpcServerHelper;
import io.numaproj.numaflow.shared.GrpcServerUtils;
+import io.numaproj.numaflow.shared.GrpcServerWrapper;
import lombok.extern.slf4j.Slf4j;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
/**
* Server is the gRPC server for executing map operation.
@@ -20,11 +20,9 @@
public class Server {
private final GRPCConfig grpcConfig;
- private final Service service;
private final CompletableFuture shutdownSignal;
private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper());
- private io.grpc.Server server;
- private final GrpcServerHelper grpcServerHelper;
+ private final GrpcServerWrapper server;
/**
* constructor to create gRPC server.
@@ -43,9 +41,18 @@ public Server(Mapper mapper) {
*/
public Server(Mapper mapper, GRPCConfig grpcConfig) {
this.shutdownSignal = new CompletableFuture<>();
- this.service = new Service(mapper, this.shutdownSignal);
this.grpcConfig = grpcConfig;
- this.grpcServerHelper = new GrpcServerHelper();
+ this.server = new GrpcServerWrapper(this.grpcConfig, new Service(mapper, this.shutdownSignal));
+ }
+
+ @VisibleForTesting
+ protected Server(GRPCConfig grpcConfig, Mapper service, ServerInterceptor interceptor, String serverName) {
+ this.grpcConfig = grpcConfig;
+ this.shutdownSignal = new CompletableFuture<>();
+ this.server = new GrpcServerWrapper(
+ interceptor,
+ serverName,
+ new Service(service, this.shutdownSignal));
}
/**
@@ -56,43 +63,28 @@ public Server(Mapper mapper, GRPCConfig grpcConfig) {
* @throws Exception if the server fails to start
*/
public void start() throws Exception {
-
- if (!grpcConfig.isLocal()) {
+ if (!this.grpcConfig.isLocal()) {
GrpcServerUtils.writeServerInfo(
- serverInfoAccessor,
- grpcConfig.getSocketPath(),
- grpcConfig.getInfoFilePath(),
+ this.serverInfoAccessor,
+ this.grpcConfig.getSocketPath(),
+ this.grpcConfig.getInfoFilePath(),
ContainerType.MAPPER,
Collections.singletonMap(Constants.MAP_MODE_KEY, Constants.MAP_MODE));
}
- if (this.server == null) {
- this.server = this.grpcServerHelper.createServer(
- grpcConfig.getSocketPath(),
- grpcConfig.getMaxMessageSize(),
- grpcConfig.isLocal(),
- grpcConfig.getPort(),
- this.service);
- }
-
- server.start();
+ this.server.start();
log.info(
"server started, listening on {}",
- grpcConfig.isLocal() ?
- "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath());
+ this.grpcConfig.isLocal() ?
+ "localhost:" + this.grpcConfig.getPort() : this.grpcConfig.getSocketPath());
// register shutdown hook to gracefully shut down the server
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
- if (server != null && server.isTerminated()) {
- return;
- }
try {
- Server.this.stop();
- log.info("gracefully shutting down event loop groups");
- this.grpcServerHelper.gracefullyShutdownEventLoopGroups();
+ this.stop();
// FIXME - this is a workaround to immediately terminate the JVM process
// The correct way to do this is to stop all the actors and wait for them to terminate
System.exit(0);
@@ -103,18 +95,11 @@ public void start() throws Exception {
}));
// if there are any exceptions, shutdown the server gracefully.
- shutdownSignal.whenCompleteAsync((v, e) -> {
- if (server.isTerminated()) {
- return;
- }
-
+ this.shutdownSignal.whenCompleteAsync((v, e) -> {
if (e != null) {
System.err.println("*** shutting down mapper gRPC server because of an exception - " + e.getMessage());
try {
- log.info("stopping server");
- Server.this.stop();
- log.info("gracefully shutting down event loop groups");
- this.grpcServerHelper.gracefullyShutdownEventLoopGroups();
+ this.stop();
// FIXME - this is a workaround to immediately terminate the JVM process
// The correct way to do this is to stop all the actors and wait for them to terminate
System.exit(0);
@@ -135,7 +120,7 @@ public void start() throws Exception {
*/
public void awaitTermination() throws InterruptedException {
log.info("mapper server is waiting for termination");
- server.awaitTermination();
+ this.server.awaitTermination();
log.info("mapper server has terminated");
}
@@ -146,24 +131,6 @@ public void awaitTermination() throws InterruptedException {
* @throws InterruptedException if shutdown is interrupted
*/
public void stop() throws InterruptedException {
- if (server != null) {
- server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
- // force shutdown if not terminated
- if (!server.isTerminated()) {
- server.shutdownNow();
- }
- }
- }
-
- /**
- * Sets the server builder. This method can be used for testing purposes to provide a different
- * grpc server builder.
- *
- * @param serverBuilder the server builder to be used
- */
- public void setServerBuilder(ServerBuilder> serverBuilder) {
- this.server = serverBuilder
- .addService(this.service)
- .build();
+ this.server.gracefullyShutdown();
}
}
diff --git a/src/main/java/io/numaproj/numaflow/mapstreamer/GRPCConfig.java b/src/main/java/io/numaproj/numaflow/mapstreamer/GRPCConfig.java
index 686d4a9e..a0316133 100644
--- a/src/main/java/io/numaproj/numaflow/mapstreamer/GRPCConfig.java
+++ b/src/main/java/io/numaproj/numaflow/mapstreamer/GRPCConfig.java
@@ -1,5 +1,6 @@
package io.numaproj.numaflow.mapstreamer;
+import io.numaproj.numaflow.shared.GrpcConfigRetriever;
import lombok.Builder;
import lombok.Getter;
@@ -8,7 +9,7 @@
*/
@Getter
@Builder(builderMethodName = "newBuilder")
-public class GRPCConfig {
+public class GRPCConfig implements GrpcConfigRetriever {
@Builder.Default
private String socketPath = Constants.DEFAULT_SOCKET_PATH;
diff --git a/src/main/java/io/numaproj/numaflow/mapstreamer/Server.java b/src/main/java/io/numaproj/numaflow/mapstreamer/Server.java
index 543a5707..2f286761 100644
--- a/src/main/java/io/numaproj/numaflow/mapstreamer/Server.java
+++ b/src/main/java/io/numaproj/numaflow/mapstreamer/Server.java
@@ -2,17 +2,16 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
-import io.grpc.ServerBuilder;
+import io.grpc.ServerInterceptor;
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
-import io.numaproj.numaflow.shared.GrpcServerHelper;
import io.numaproj.numaflow.shared.GrpcServerUtils;
+import io.numaproj.numaflow.shared.GrpcServerWrapper;
import lombok.extern.slf4j.Slf4j;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
/**
* Server is the gRPC server for executing map operation.
@@ -21,11 +20,9 @@
public class Server {
private final GRPCConfig grpcConfig;
- private final Service service;
private final CompletableFuture shutdownSignal;
private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper());
- private io.grpc.Server server;
- private final GrpcServerHelper grpcServerHelper;
+ private final GrpcServerWrapper server;
/**
* constructor to create sink gRPC server.
@@ -44,9 +41,18 @@ public Server(MapStreamer mapStreamer) {
*/
public Server(MapStreamer mapStreamer, GRPCConfig grpcConfig) {
this.shutdownSignal = new CompletableFuture<>();
- this.service = new Service(mapStreamer, this.shutdownSignal);
this.grpcConfig = grpcConfig;
- this.grpcServerHelper = new GrpcServerHelper();
+ this.server = new GrpcServerWrapper(this.grpcConfig, new Service(mapStreamer, this.shutdownSignal));
+ }
+
+ @VisibleForTesting
+ protected Server(GRPCConfig grpcConfig, MapStreamer service, ServerInterceptor interceptor, String serverName) {
+ this.grpcConfig = grpcConfig;
+ this.shutdownSignal = new CompletableFuture<>();
+ this.server = new GrpcServerWrapper(
+ interceptor,
+ serverName,
+ new Service(service, this.shutdownSignal));
}
/**
@@ -56,37 +62,22 @@ public Server(MapStreamer mapStreamer, GRPCConfig grpcConfig) {
*/
public void start() throws Exception {
GrpcServerUtils.writeServerInfo(
- serverInfoAccessor,
- grpcConfig.getSocketPath(),
- grpcConfig.getInfoFilePath(),
+ this.serverInfoAccessor,
+ this.grpcConfig.getSocketPath(),
+ this.grpcConfig.getInfoFilePath(),
ContainerType.MAPPER,
Collections.singletonMap(Constants.MAP_MODE_KEY, Constants.MAP_MODE));
- if (this.server == null) {
- this.server = this.grpcServerHelper.createServer(
- grpcConfig.getSocketPath(),
- grpcConfig.getMaxMessageSize(),
- grpcConfig.isLocal(),
- grpcConfig.getPort(),
- this.service);
- }
-
- server.start();
+ this.server.start();
- log.info(
- "server started, listening on socket path: " + grpcConfig.getSocketPath());
+ log.info("server started, listening on socket path: {}", this.grpcConfig.getSocketPath());
// register shutdown hook to gracefully shut down the server
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down map streamer gRPC server since JVM is shutting down");
- if (server != null && server.isTerminated()) {
- return;
- }
try {
- Server.this.stop();
- log.info("gracefully shutting down event loop groups");
- this.grpcServerHelper.gracefullyShutdownEventLoopGroups();
+ this.stop();
} catch (InterruptedException e) {
Thread.interrupted();
e.printStackTrace(System.err);
@@ -94,18 +85,11 @@ public void start() throws Exception {
}));
// if there are any exceptions, shutdown the server gracefully.
- shutdownSignal.whenCompleteAsync((v, e) -> {
- if (server.isTerminated()) {
- return;
- }
-
+ this.shutdownSignal.whenCompleteAsync((v, e) -> {
if (e != null) {
System.err.println("*** shutting down map streamer gRPC server because of an exception - " + e.getMessage());
try {
- log.info("stopping server");
- Server.this.stop();
- log.info("gracefully shutting down event loop groups");
- this.grpcServerHelper.gracefullyShutdownEventLoopGroups();
+ this.stop();
} catch (InterruptedException ex) {
Thread.interrupted();
ex.printStackTrace(System.err);
@@ -123,7 +107,7 @@ public void start() throws Exception {
*/
public void awaitTermination() throws InterruptedException {
log.info("map stream server is waiting for termination");
- server.awaitTermination();
+ this.server.awaitTermination();
log.info("map stream server has terminated");
}
@@ -134,24 +118,6 @@ public void awaitTermination() throws InterruptedException {
* @throws InterruptedException if shutdown is interrupted
*/
public void stop() throws InterruptedException {
- if (server != null) {
- server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
- // force shutdown if not terminated
- if (!server.isTerminated()) {
- server.shutdownNow();
- }
- }
- }
-
- /**
- * Set server builder for testing.
- *
- * @param serverBuilder in process server builder can be used for testing
- */
- @VisibleForTesting
- public void setServerBuilder(ServerBuilder> serverBuilder) {
- this.server = serverBuilder
- .addService(this.service)
- .build();
+ this.server.gracefullyShutdown();
}
}
diff --git a/src/main/java/io/numaproj/numaflow/reducer/GRPCConfig.java b/src/main/java/io/numaproj/numaflow/reducer/GRPCConfig.java
index e3f8bb62..68a74f59 100644
--- a/src/main/java/io/numaproj/numaflow/reducer/GRPCConfig.java
+++ b/src/main/java/io/numaproj/numaflow/reducer/GRPCConfig.java
@@ -1,5 +1,6 @@
package io.numaproj.numaflow.reducer;
+import io.numaproj.numaflow.shared.GrpcConfigRetriever;
import lombok.Builder;
import lombok.Getter;
@@ -8,7 +9,7 @@
*/
@Getter
@Builder(builderMethodName = "newBuilder")
-public class GRPCConfig {
+public class GRPCConfig implements GrpcConfigRetriever {
@Builder.Default
private String socketPath = Constants.DEFAULT_SOCKET_PATH;
diff --git a/src/main/java/io/numaproj/numaflow/reducer/Server.java b/src/main/java/io/numaproj/numaflow/reducer/Server.java
index 478be133..2aaad86d 100644
--- a/src/main/java/io/numaproj/numaflow/reducer/Server.java
+++ b/src/main/java/io/numaproj/numaflow/reducer/Server.java
@@ -2,16 +2,14 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
-import io.grpc.ServerBuilder;
+import io.grpc.ServerInterceptor;
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
-import io.numaproj.numaflow.shared.GrpcServerHelper;
import io.numaproj.numaflow.shared.GrpcServerUtils;
+import io.numaproj.numaflow.shared.GrpcServerWrapper;
import lombok.extern.slf4j.Slf4j;
-import java.util.concurrent.TimeUnit;
-
/**
* Server is the gRPC server for executing reduce operation.
*/
@@ -19,10 +17,8 @@
public class Server {
private final GRPCConfig grpcConfig;
- private final Service service;
private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper());
- private io.grpc.Server server;
- private final GrpcServerHelper grpcServerHelper;
+ private final GrpcServerWrapper server;
/**
* constructor to create gRPC server.
@@ -40,9 +36,17 @@ public Server(ReducerFactory extends Reducer> reducerFactory) {
* @param reducerFactory to process the message
*/
public Server(ReducerFactory extends Reducer> reducerFactory, GRPCConfig grpcConfig) {
- this.service = new Service(reducerFactory);
this.grpcConfig = grpcConfig;
- this.grpcServerHelper = new GrpcServerHelper();
+ this.server = new GrpcServerWrapper(this.grpcConfig, new Service(reducerFactory));
+ }
+
+ @VisibleForTesting
+ protected Server(GRPCConfig grpcConfig, ReducerFactory extends Reducer> service, ServerInterceptor interceptor, String serverName) {
+ this.grpcConfig = grpcConfig;
+ this.server = new GrpcServerWrapper(
+ interceptor,
+ serverName,
+ new Service(service));
}
/**
@@ -51,24 +55,15 @@ public Server(ReducerFactory extends Reducer> reducerFactory, GRPCConfig grpcC
* @throws Exception if server fails to start
*/
public void start() throws Exception {
- if (!grpcConfig.isLocal()) {
+ if (!this.grpcConfig.isLocal()) {
GrpcServerUtils.writeServerInfo(
- serverInfoAccessor,
- grpcConfig.getSocketPath(),
- grpcConfig.getInfoFilePath(),
+ this.serverInfoAccessor,
+ this.grpcConfig.getSocketPath(),
+ this.grpcConfig.getInfoFilePath(),
ContainerType.REDUCER);
}
- if (this.server == null) {
- this.server = this.grpcServerHelper.createServer(
- grpcConfig.getSocketPath(),
- grpcConfig.getMaxMessageSize(),
- grpcConfig.isLocal(),
- grpcConfig.getPort(),
- this.service);
- }
-
- server.start();
+ this.server.start();
log.info(
"server started, listening on {}",
@@ -79,13 +74,8 @@ public void start() throws Exception {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
- if (server != null && server.isTerminated()) {
- return;
- }
try {
- Server.this.stop();
- log.info("gracefully shutting down event loop groups");
- this.grpcServerHelper.gracefullyShutdownEventLoopGroups();
+ this.stop();
} catch (InterruptedException e) {
Thread.interrupted();
e.printStackTrace(System.err);
@@ -102,7 +92,7 @@ public void start() throws Exception {
*/
public void awaitTermination() throws InterruptedException {
log.info("reducer server is waiting for termination");
- server.awaitTermination();
+ this.server.awaitTermination();
log.info("reducer server has terminated");
}
@@ -113,24 +103,6 @@ public void awaitTermination() throws InterruptedException {
* @throws InterruptedException if shutdown is interrupted
*/
public void stop() throws InterruptedException {
- if (server != null) {
- server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
- // force shutdown if not terminated
- if (!server.isTerminated()) {
- server.shutdownNow();
- }
- }
- }
-
- /**
- * Set server builder for testing.
- *
- * @param serverBuilder
- */
- @VisibleForTesting
- void setServerBuilder(ServerBuilder> serverBuilder) {
- this.server = serverBuilder
- .addService(this.service)
- .build();
+ this.server.gracefullyShutdown();
}
}
diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/GRPCConfig.java b/src/main/java/io/numaproj/numaflow/reducestreamer/GRPCConfig.java
index c008b5b0..7ec40aa8 100644
--- a/src/main/java/io/numaproj/numaflow/reducestreamer/GRPCConfig.java
+++ b/src/main/java/io/numaproj/numaflow/reducestreamer/GRPCConfig.java
@@ -1,5 +1,6 @@
package io.numaproj.numaflow.reducestreamer;
+import io.numaproj.numaflow.shared.GrpcConfigRetriever;
import lombok.Builder;
import lombok.Getter;
@@ -8,7 +9,7 @@
*/
@Getter
@Builder(builderMethodName = "newBuilder")
-public class GRPCConfig {
+public class GRPCConfig implements GrpcConfigRetriever {
@Builder.Default
private String socketPath = Constants.DEFAULT_SOCKET_PATH;
@Builder.Default
diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/Server.java b/src/main/java/io/numaproj/numaflow/reducestreamer/Server.java
index 941292d5..a0946df8 100644
--- a/src/main/java/io/numaproj/numaflow/reducestreamer/Server.java
+++ b/src/main/java/io/numaproj/numaflow/reducestreamer/Server.java
@@ -2,28 +2,24 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
-import io.grpc.ServerBuilder;
+import io.grpc.ServerInterceptor;
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
import io.numaproj.numaflow.reducestreamer.model.ReduceStreamer;
import io.numaproj.numaflow.reducestreamer.model.ReduceStreamerFactory;
-import io.numaproj.numaflow.shared.GrpcServerHelper;
import io.numaproj.numaflow.shared.GrpcServerUtils;
+import io.numaproj.numaflow.shared.GrpcServerWrapper;
import lombok.extern.slf4j.Slf4j;
-import java.util.concurrent.TimeUnit;
-
/**
* Server is the gRPC server for executing reduce stream operation.
*/
@Slf4j
public class Server {
private final GRPCConfig grpcConfig;
- private final Service service;
private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper());
- private io.grpc.Server server;
- private final GrpcServerHelper grpcServerHelper;
+ private final GrpcServerWrapper server;
/**
* constructor to create gRPC server.
@@ -43,9 +39,17 @@ public Server(ReduceStreamerFactory extends ReduceStreamer> reduceStreamerFact
public Server(
ReduceStreamerFactory extends ReduceStreamer> reduceStreamerFactory,
GRPCConfig grpcConfig) {
- this.service = new Service(reduceStreamerFactory);
this.grpcConfig = grpcConfig;
- this.grpcServerHelper = new GrpcServerHelper();
+ this.server = new GrpcServerWrapper(this.grpcConfig, new Service(reduceStreamerFactory));
+ }
+
+ @VisibleForTesting
+ protected Server(GRPCConfig grpcConfig, ReduceStreamerFactory extends ReduceStreamer> service, ServerInterceptor interceptor, String serverName) {
+ this.grpcConfig = grpcConfig;
+ this.server = new GrpcServerWrapper(
+ interceptor,
+ serverName,
+ new Service(service));
}
/**
@@ -54,41 +58,27 @@ public Server(
* @throws Exception if server fails to start
*/
public void start() throws Exception {
- if (!grpcConfig.isLocal()) {
+ if (!this.grpcConfig.isLocal()) {
GrpcServerUtils.writeServerInfo(
- serverInfoAccessor,
- grpcConfig.getSocketPath(),
- grpcConfig.getInfoFilePath(),
+ this.serverInfoAccessor,
+ this.grpcConfig.getSocketPath(),
+ this.grpcConfig.getInfoFilePath(),
ContainerType.REDUCE_STREAMER);
}
- if (this.server == null) {
- this.server = this.grpcServerHelper.createServer(
- grpcConfig.getSocketPath(),
- grpcConfig.getMaxMessageSize(),
- grpcConfig.isLocal(),
- grpcConfig.getPort(),
- this.service);
- }
-
- server.start();
+ this.server.start();
log.info(
"server started, listening on {}",
- grpcConfig.isLocal() ?
- "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath());
+ this.grpcConfig.isLocal() ?
+ "localhost:" + this.grpcConfig.getPort() : this.grpcConfig.getSocketPath());
// register shutdown hook to gracefully shut down the server
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
- if (server != null && server.isTerminated()) {
- return;
- }
try {
- Server.this.stop();
- log.info("gracefully shutting down event loop groups");
- this.grpcServerHelper.gracefullyShutdownEventLoopGroups();
+ this.stop();
} catch (InterruptedException e) {
Thread.interrupted();
e.printStackTrace(System.err);
@@ -105,7 +95,7 @@ public void start() throws Exception {
*/
public void awaitTermination() throws InterruptedException {
log.info("reduce stream server is waiting for termination");
- server.awaitTermination();
+ this.server.awaitTermination();
log.info("reduce stream server terminated");
}
@@ -116,24 +106,6 @@ public void awaitTermination() throws InterruptedException {
* @throws InterruptedException if shutdown is interrupted
*/
public void stop() throws InterruptedException {
- if (server != null) {
- server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
- // force shutdown if not terminated
- if (!server.isTerminated()) {
- server.shutdownNow();
- }
- }
- }
-
- /**
- * Set server builder for testing.
- *
- * @param serverBuilder
- */
- @VisibleForTesting
- void setServerBuilder(ServerBuilder> serverBuilder) {
- this.server = serverBuilder
- .addService(this.service)
- .build();
+ this.server.gracefullyShutdown();
}
}
diff --git a/src/main/java/io/numaproj/numaflow/sessionreducer/GRPCConfig.java b/src/main/java/io/numaproj/numaflow/sessionreducer/GRPCConfig.java
index 2c348286..675c4a87 100644
--- a/src/main/java/io/numaproj/numaflow/sessionreducer/GRPCConfig.java
+++ b/src/main/java/io/numaproj/numaflow/sessionreducer/GRPCConfig.java
@@ -1,5 +1,6 @@
package io.numaproj.numaflow.sessionreducer;
+import io.numaproj.numaflow.shared.GrpcConfigRetriever;
import lombok.Builder;
import lombok.Getter;
@@ -8,7 +9,7 @@
*/
@Getter
@Builder(builderMethodName = "newBuilder")
-public class GRPCConfig {
+public class GRPCConfig implements GrpcConfigRetriever {
@Builder.Default
private String socketPath = Constants.DEFAULT_SOCKET_PATH;
diff --git a/src/main/java/io/numaproj/numaflow/sessionreducer/Server.java b/src/main/java/io/numaproj/numaflow/sessionreducer/Server.java
index 4c268b97..eb0a4308 100644
--- a/src/main/java/io/numaproj/numaflow/sessionreducer/Server.java
+++ b/src/main/java/io/numaproj/numaflow/sessionreducer/Server.java
@@ -2,28 +2,24 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
-import io.grpc.ServerBuilder;
+import io.grpc.ServerInterceptor;
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
import io.numaproj.numaflow.sessionreducer.model.SessionReducer;
import io.numaproj.numaflow.sessionreducer.model.SessionReducerFactory;
-import io.numaproj.numaflow.shared.GrpcServerHelper;
import io.numaproj.numaflow.shared.GrpcServerUtils;
+import io.numaproj.numaflow.shared.GrpcServerWrapper;
import lombok.extern.slf4j.Slf4j;
-import java.util.concurrent.TimeUnit;
-
/**
* Server is the gRPC server for executing session reduce operations.
*/
@Slf4j
public class Server {
private final GRPCConfig grpcConfig;
- private final Service service;
private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper());
- private io.grpc.Server server;
- private final GrpcServerHelper grpcServerHelper;
+ private final GrpcServerWrapper server;
/**
* constructor to create gRPC server.
@@ -43,9 +39,17 @@ public Server(SessionReducerFactory extends SessionReducer> sessionReducerFact
public Server(
SessionReducerFactory extends SessionReducer> sessionReducerFactory,
GRPCConfig grpcConfig) {
- this.service = new Service(sessionReducerFactory);
this.grpcConfig = grpcConfig;
- this.grpcServerHelper = new GrpcServerHelper();
+ this.server = new GrpcServerWrapper(this.grpcConfig, new Service(sessionReducerFactory));
+ }
+
+ @VisibleForTesting
+ protected Server(GRPCConfig grpcConfig, SessionReducerFactory extends SessionReducer> service, ServerInterceptor interceptor, String serverName) {
+ this.grpcConfig = grpcConfig;
+ this.server = new GrpcServerWrapper(
+ interceptor,
+ serverName,
+ new Service(service));
}
/**
@@ -54,41 +58,27 @@ public Server(
* @throws Exception if server fails to start
*/
public void start() throws Exception {
- if (!grpcConfig.isLocal()) {
+ if (!this.grpcConfig.isLocal()) {
GrpcServerUtils.writeServerInfo(
- serverInfoAccessor,
- grpcConfig.getSocketPath(),
- grpcConfig.getInfoFilePath(),
+ this.serverInfoAccessor,
+ this.grpcConfig.getSocketPath(),
+ this.grpcConfig.getInfoFilePath(),
ContainerType.SESSION_REDUCER);
}
- if (this.server == null) {
- this.server = this.grpcServerHelper.createServer(
- grpcConfig.getSocketPath(),
- grpcConfig.getMaxMessageSize(),
- grpcConfig.isLocal(),
- grpcConfig.getPort(),
- this.service);
- }
-
- server.start();
+ this.server.start();
log.info(
"server started, listening on {}",
- grpcConfig.isLocal() ?
- "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath());
+ this.grpcConfig.isLocal() ?
+ "localhost:" + this.grpcConfig.getPort() : this.grpcConfig.getSocketPath());
// register shutdown hook to gracefully shut down the server
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
- if (server != null && server.isTerminated()) {
- return;
- }
try {
- Server.this.stop();
- log.info("gracefully shutting down event loop groups");
- this.grpcServerHelper.gracefullyShutdownEventLoopGroups();
+ this.stop();
} catch (InterruptedException e) {
Thread.interrupted();
e.printStackTrace(System.err);
@@ -105,7 +95,7 @@ public void start() throws Exception {
*/
public void awaitTermination() throws InterruptedException {
log.info("session reduce server is waiting for termination");
- server.awaitTermination();
+ this.server.awaitTermination();
log.info("session reduce server terminated");
}
@@ -116,24 +106,6 @@ public void awaitTermination() throws InterruptedException {
* @throws InterruptedException if shutdown is interrupted
*/
public void stop() throws InterruptedException {
- if (server != null) {
- server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
- // force shutdown if not terminated
- if (!server.isTerminated()) {
- server.shutdownNow();
- }
- }
- }
-
- /**
- * Set server builder for testing.
- *
- * @param serverBuilder for building the server
- */
- @VisibleForTesting
- void setServerBuilder(ServerBuilder> serverBuilder) {
- this.server = serverBuilder
- .addService(this.service)
- .build();
+ this.server.gracefullyShutdown();
}
}
diff --git a/src/main/java/io/numaproj/numaflow/shared/GrpcConfigRetriever.java b/src/main/java/io/numaproj/numaflow/shared/GrpcConfigRetriever.java
new file mode 100644
index 00000000..a144853c
--- /dev/null
+++ b/src/main/java/io/numaproj/numaflow/shared/GrpcConfigRetriever.java
@@ -0,0 +1,13 @@
+package io.numaproj.numaflow.shared;
+
+// Currently each of the UDFs (Mapper, BatchMapper, Sourcer) has its own GrpcConfig class.
+// To start a gRPC server, we need to pass gRPC configurations to the GrpcServerWrapper.
+// In order to make the GrpcServerWrapper more generic, we create this GrpcConfigRetriever interface,
+// which is implemented by the UDFs' GrpcConfig classes.
+public interface GrpcConfigRetriever {
+ String getSocketPath();
+ int getMaxMessageSize();
+ String getInfoFilePath();
+ int getPort();
+ boolean isLocal();
+}
diff --git a/src/main/java/io/numaproj/numaflow/shared/GrpcServerUtils.java b/src/main/java/io/numaproj/numaflow/shared/GrpcServerUtils.java
index 9f90c737..cd85dfa3 100644
--- a/src/main/java/io/numaproj/numaflow/shared/GrpcServerUtils.java
+++ b/src/main/java/io/numaproj/numaflow/shared/GrpcServerUtils.java
@@ -1,15 +1,7 @@
package io.numaproj.numaflow.shared;
import io.grpc.Context;
-import io.grpc.Contexts;
-import io.grpc.ForwardingServerCallListener;
import io.grpc.Metadata;
-import io.grpc.ServerBuilder;
-import io.grpc.ServerCall;
-import io.grpc.ServerCallHandler;
-import io.grpc.ServerInterceptor;
-import io.grpc.Status;
-import io.grpc.netty.NettyServerBuilder;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
@@ -17,7 +9,6 @@
import io.netty.channel.kqueue.KQueue;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.kqueue.KQueueServerDomainSocketChannel;
-import io.netty.channel.unix.DomainSocketAddress;
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.Language;
import io.numaproj.numaflow.info.Protocol;
diff --git a/src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java b/src/main/java/io/numaproj/numaflow/shared/GrpcServerWrapper.java
similarity index 61%
rename from src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java
rename to src/main/java/io/numaproj/numaflow/shared/GrpcServerWrapper.java
index 9fe59cda..39a6580c 100644
--- a/src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java
+++ b/src/main/java/io/numaproj/numaflow/shared/GrpcServerWrapper.java
@@ -1,5 +1,6 @@
package io.numaproj.numaflow.shared;
+import com.google.common.annotations.VisibleForTesting;
import io.grpc.BindableService;
import io.grpc.Context;
import io.grpc.Contexts;
@@ -10,20 +11,90 @@
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
+import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.netty.NettyServerBuilder;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.unix.DomainSocketAddress;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.TimeUnit;
import static io.numaproj.numaflow.shared.GrpcServerUtils.DATUM_METADATA_WIN_END;
import static io.numaproj.numaflow.shared.GrpcServerUtils.DATUM_METADATA_WIN_START;
import static io.numaproj.numaflow.shared.GrpcServerUtils.WINDOW_END_TIME;
import static io.numaproj.numaflow.shared.GrpcServerUtils.WINDOW_START_TIME;
-public class GrpcServerHelper {
+/**
+ * GrpcServerWrapper is a wrapper class for gRPC server.
+ * It takes care of creating, starting and gracefully shutting down the server.
+ */
+@Slf4j
+public class GrpcServerWrapper {
+ private final Server server;
private EventLoopGroup bossEventLoopGroup;
private EventLoopGroup workerEventLoopGroup;
- public void gracefullyShutdownEventLoopGroups() {
+ public GrpcServerWrapper(
+ GrpcConfigRetriever grpcConfigRetriever,
+ BindableService service) {
+ this.server = createServer(
+ grpcConfigRetriever.getSocketPath(),
+ grpcConfigRetriever.getMaxMessageSize(),
+ grpcConfigRetriever.isLocal(),
+ grpcConfigRetriever.getPort(),
+ service);
+ }
+
+ @VisibleForTesting
+ public GrpcServerWrapper(
+ ServerInterceptor interceptor,
+ String serverName,
+ BindableService service) {
+ if (interceptor == null) {
+ this.server = InProcessServerBuilder.forName(serverName)
+ .directExecutor()
+ .addService(service)
+ .build();
+ return;
+ }
+ this.server = InProcessServerBuilder.forName(serverName)
+ .intercept(interceptor)
+ .directExecutor()
+ .addService(service)
+ .build();
+ }
+
+ public void start() throws Exception {
+ if (this.server == null) {
+ throw new IllegalStateException("Server is not initialized");
+ }
+ this.server.start();
+ }
+
+ public void awaitTermination() throws InterruptedException {
+ this.server.awaitTermination();
+ // if the server has been terminated, we should expect the event loop groups to be terminated as well.
+ if (!(this.workerEventLoopGroup.awaitTermination(30, TimeUnit.SECONDS) &&
+ this.bossEventLoopGroup.awaitTermination(30, TimeUnit.SECONDS))) {
+ log.error("Timed out to gracefully shutdown event loop groups");
+ throw new InterruptedException("Timed out to gracefully shutdown event loop groups");
+ }
+ }
+
+ public void gracefullyShutdown() throws InterruptedException{
+ if (this.server == null || this.server.isTerminated()) {
+ return;
+ }
+ log.info("stopping gRPC server...");
+ this.server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
+ if (!this.server.isTerminated()) {
+ this.server.shutdownNow();
+ }
+ log.info("gracefully shutting down event loop groups...");
+ this.gracefullyShutdownEventLoopGroups();
+ }
+
+ private void gracefullyShutdownEventLoopGroups() {
if (this.bossEventLoopGroup != null) {
this.bossEventLoopGroup.shutdownGracefully();
}
@@ -32,7 +103,7 @@ public void gracefullyShutdownEventLoopGroups() {
}
}
- public Server createServer(
+ private Server createServer(
String socketPath,
int maxMessageSize,
boolean isLocal,
diff --git a/src/main/java/io/numaproj/numaflow/sideinput/GRPCConfig.java b/src/main/java/io/numaproj/numaflow/sideinput/GRPCConfig.java
index 048e0961..49cab82e 100644
--- a/src/main/java/io/numaproj/numaflow/sideinput/GRPCConfig.java
+++ b/src/main/java/io/numaproj/numaflow/sideinput/GRPCConfig.java
@@ -1,5 +1,6 @@
package io.numaproj.numaflow.sideinput;
+import io.numaproj.numaflow.shared.GrpcConfigRetriever;
import lombok.Builder;
import lombok.Getter;
@@ -8,7 +9,7 @@
*/
@Getter
@Builder(builderMethodName = "newBuilder")
-public class GRPCConfig {
+public class GRPCConfig implements GrpcConfigRetriever {
@Builder.Default
private String socketPath = Constants.DEFAULT_SOCKET_PATH;
diff --git a/src/main/java/io/numaproj/numaflow/sideinput/Server.java b/src/main/java/io/numaproj/numaflow/sideinput/Server.java
index 1407d6b1..acfe3297 100644
--- a/src/main/java/io/numaproj/numaflow/sideinput/Server.java
+++ b/src/main/java/io/numaproj/numaflow/sideinput/Server.java
@@ -2,16 +2,14 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
-import io.grpc.ServerBuilder;
+import io.grpc.ServerInterceptor;
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
-import io.numaproj.numaflow.shared.GrpcServerHelper;
import io.numaproj.numaflow.shared.GrpcServerUtils;
+import io.numaproj.numaflow.shared.GrpcServerWrapper;
import lombok.extern.slf4j.Slf4j;
-import java.util.concurrent.TimeUnit;
-
/**
* Server is the gRPC server for retrieving side input.
*/
@@ -19,10 +17,8 @@
public class Server {
private final GRPCConfig grpcConfig;
- private final Service service;
private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper());
- private io.grpc.Server server;
- private final GrpcServerHelper grpcServerHelper;
+ private final GrpcServerWrapper server;
/**
* constructor to create gRPC server.
@@ -40,9 +36,17 @@ public Server(SideInputRetriever sideInputRetriever) {
* @param sideInputRetriever to retrieve the side input
*/
public Server(SideInputRetriever sideInputRetriever, GRPCConfig grpcConfig) {
- this.service = new Service(sideInputRetriever);
this.grpcConfig = grpcConfig;
- this.grpcServerHelper = new GrpcServerHelper();
+ this.server = new GrpcServerWrapper(this.grpcConfig, new Service(sideInputRetriever));
+ }
+
+ @VisibleForTesting
+ protected Server(GRPCConfig grpcConfig, SideInputRetriever service, ServerInterceptor interceptor, String serverName) {
+ this.grpcConfig = grpcConfig;
+ this.server = new GrpcServerWrapper(
+ interceptor,
+ serverName,
+ new Service(service));
}
/**
@@ -51,41 +55,26 @@ public Server(SideInputRetriever sideInputRetriever, GRPCConfig grpcConfig) {
* @throws Exception if server fails to start
*/
public void start() throws Exception {
- if (!grpcConfig.isLocal()) {
+ if (!this.grpcConfig.isLocal()) {
GrpcServerUtils.writeServerInfo(
- serverInfoAccessor,
- grpcConfig.getSocketPath(),
- grpcConfig.getInfoFilePath(),
+ this.serverInfoAccessor,
+ this.grpcConfig.getSocketPath(),
+ this.grpcConfig.getInfoFilePath(),
ContainerType.SIDEINPUT);
}
- if (this.server == null) {
- this.server = grpcServerHelper.createServer(
- grpcConfig.getSocketPath(),
- grpcConfig.getMaxMessageSize(),
- grpcConfig.isLocal(),
- grpcConfig.getPort(),
- this.service);
- }
-
- server.start();
+ this.server.start();
log.info(
"server started, listening on {}",
- grpcConfig.isLocal() ?
- "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath());
+ this.grpcConfig.isLocal() ?
+ "localhost:" + this.grpcConfig.getPort() : this.grpcConfig.getSocketPath());
// register shutdown hook to gracefully shut down the server
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
- System.err.println("*** shutting down gRPC server since JVM is shutting down");
- if (server != null && server.isTerminated()) {
- return;
- }
try {
- Server.this.stop();
- log.info("gracefully shutting down event loop groups");
- this.grpcServerHelper.gracefullyShutdownEventLoopGroups();
+ this.stop();
} catch (InterruptedException e) {
Thread.interrupted();
e.printStackTrace(System.err);
@@ -102,7 +91,7 @@ public void start() throws Exception {
*/
public void awaitTermination() throws InterruptedException {
log.info("side input server is waiting for termination");
- server.awaitTermination();
+ this.server.awaitTermination();
log.info("side input server has terminated");
}
@@ -113,23 +102,6 @@ public void awaitTermination() throws InterruptedException {
* @throws InterruptedException if shutdown is interrupted
*/
public void stop() throws InterruptedException {
- if (server != null) {
- server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
- if (!server.isTerminated()) {
- server.shutdownNow();
- }
- }
- }
-
- /**
- * Set server builder for testing.
- *
- * @param serverBuilder in process server builder can be used for testing
- */
- @VisibleForTesting
- public void setServerBuilder(ServerBuilder> serverBuilder) {
- this.server = serverBuilder
- .addService(this.service)
- .build();
+ this.server.gracefullyShutdown();
}
}
diff --git a/src/main/java/io/numaproj/numaflow/sinker/GRPCConfig.java b/src/main/java/io/numaproj/numaflow/sinker/GRPCConfig.java
index 673339a0..c528657e 100644
--- a/src/main/java/io/numaproj/numaflow/sinker/GRPCConfig.java
+++ b/src/main/java/io/numaproj/numaflow/sinker/GRPCConfig.java
@@ -3,12 +3,14 @@
import lombok.Builder;
import lombok.Getter;
+import io.numaproj.numaflow.shared.GrpcConfigRetriever;
+
/**
* GRPCConfig is used to provide configurations for gRPC server.
*/
@Getter
@Builder(builderMethodName = "newBuilder")
-public class GRPCConfig {
+public class GRPCConfig implements GrpcConfigRetriever {
@Builder.Default
private String socketPath = Constants.DEFAULT_SOCKET_PATH;
diff --git a/src/main/java/io/numaproj/numaflow/sinker/Server.java b/src/main/java/io/numaproj/numaflow/sinker/Server.java
index 65c8d7da..f18b23f7 100644
--- a/src/main/java/io/numaproj/numaflow/sinker/Server.java
+++ b/src/main/java/io/numaproj/numaflow/sinker/Server.java
@@ -1,16 +1,19 @@
package io.numaproj.numaflow.sinker;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import io.grpc.BindableService;
import io.grpc.ServerBuilder;
+import io.grpc.ServerInterceptor;
+import io.grpc.inprocess.InProcessServerBuilder;
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
-import io.numaproj.numaflow.shared.GrpcServerHelper;
+import io.numaproj.numaflow.shared.GrpcServerWrapper;
import io.numaproj.numaflow.shared.GrpcServerUtils;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
/**
* Server is the gRPC server for executing user defined sinks.
@@ -19,11 +22,10 @@
public class Server {
private final GRPCConfig grpcConfig;
- private final Service service;
private final CompletableFuture shutdownSignal;
private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper());
- private io.grpc.Server server;
- private final GrpcServerHelper grpcServerHelper;
+ private final Service service;
+ private final GrpcServerWrapper server;
/**
* constructor to create sink gRPC server.
@@ -42,9 +44,9 @@ public Server(Sinker sinker) {
*/
public Server(Sinker sinker, GRPCConfig grpcConfig) {
this.shutdownSignal = new CompletableFuture<>();
- this.service = new Service(sinker, this.shutdownSignal);
this.grpcConfig = grpcConfig;
- this.grpcServerHelper = new GrpcServerHelper();
+ this.service = new Service(sinker, this.shutdownSignal);
+ this.server = new GrpcServerWrapper(grpcConfig, this.service);
}
/**
@@ -61,15 +63,6 @@ public void start() throws Exception {
ContainerType.SINKER);
}
- if (this.server == null) {
- this.server = this.grpcServerHelper.createServer(
- grpcConfig.getSocketPath(),
- grpcConfig.getMaxMessageSize(),
- grpcConfig.isLocal(),
- grpcConfig.getPort(),
- this.service);
- }
-
server.start();
log.info(
@@ -81,13 +74,8 @@ public void start() throws Exception {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down sink gRPC server since JVM is shutting down");
- if (server != null && server.isTerminated()) {
- return;
- }
try {
- Server.this.stop();
- log.info("gracefully shutting down event loop groups");
- this.grpcServerHelper.gracefullyShutdownEventLoopGroups();
+ this.stop();
} catch (InterruptedException e) {
Thread.interrupted();
e.printStackTrace(System.err);
@@ -96,17 +84,10 @@ public void start() throws Exception {
// if there are any exceptions, shutdown the server gracefully.
shutdownSignal.whenCompleteAsync((v, e) -> {
- if (server.isTerminated()) {
- return;
- }
-
if (e != null) {
System.err.println("*** shutting down sink gRPC server because of an exception - " + e.getMessage());
try {
- log.info("stopping server");
- Server.this.stop();
- log.info("gracefully shutting down event loop groups");
- this.grpcServerHelper.gracefullyShutdownEventLoopGroups();
+ this.stop();
} catch (InterruptedException ex) {
Thread.interrupted();
ex.printStackTrace(System.err);
@@ -135,25 +116,15 @@ public void awaitTermination() throws InterruptedException {
* @throws InterruptedException if shutdown is interrupted
*/
public void stop() throws InterruptedException {
+ server.gracefullyShutdown();
this.service.shutDown();
- if (server != null) {
- server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
- // force shutdown if not terminated
- if (!server.isTerminated()) {
- server.shutdownNow();
- }
- }
}
- /**
- * Sets the server builder. This method can be used for testing purposes to provide a different
- * grpc server builder.
- *
- * @param serverBuilder the server builder to be used
- */
- public void setServerBuilder(ServerBuilder> serverBuilder) {
- this.server = serverBuilder
- .addService(this.service)
- .build();
+ @VisibleForTesting
+ protected Server(GRPCConfig grpcConfig, Sinker sinker, ServerInterceptor interceptor, String serverName) {
+ this.grpcConfig = grpcConfig;
+ this.shutdownSignal = new CompletableFuture<>();
+ this.service = new Service(sinker, this.shutdownSignal);
+ this.server = new GrpcServerWrapper(interceptor, serverName, this.service);
}
}
diff --git a/src/main/java/io/numaproj/numaflow/sinker/Service.java b/src/main/java/io/numaproj/numaflow/sinker/Service.java
index 1cd52014..95f105b8 100644
--- a/src/main/java/io/numaproj/numaflow/sinker/Service.java
+++ b/src/main/java/io/numaproj/numaflow/sinker/Service.java
@@ -154,7 +154,7 @@ private HandlerDatum constructHandlerDatum(SinkOuterClass.SinkRequest d) {
);
}
- // shuts down the executor service which is used for reduce
+ // shuts down the executor service
public void shutDown() {
this.sinkTaskExecutor.shutdown();
try {
@@ -166,8 +166,9 @@ public void shutDown() {
if (!sinkTaskExecutor.awaitTermination(SHUTDOWN_TIME, TimeUnit.SECONDS)) {
log.error("Sink executor did not terminate in the specified time.");
List droppedTasks = sinkTaskExecutor.shutdownNow();
- log.error("Sink executor was abruptly shut down. " + droppedTasks.size()
- + " tasks will not be executed.");
+ log.error(
+ "Sink executor was abruptly shut down. {} tasks will not be executed.",
+ droppedTasks.size());
} else {
log.info("Sink executor was terminated.");
}
diff --git a/src/main/java/io/numaproj/numaflow/sourcer/GRPCConfig.java b/src/main/java/io/numaproj/numaflow/sourcer/GRPCConfig.java
index 46b2c28e..2977f9a0 100644
--- a/src/main/java/io/numaproj/numaflow/sourcer/GRPCConfig.java
+++ b/src/main/java/io/numaproj/numaflow/sourcer/GRPCConfig.java
@@ -1,5 +1,6 @@
package io.numaproj.numaflow.sourcer;
+import io.numaproj.numaflow.shared.GrpcConfigRetriever;
import lombok.Builder;
import lombok.Getter;
@@ -8,7 +9,7 @@
*/
@Getter
@Builder(builderMethodName = "newBuilder")
-public class GRPCConfig {
+public class GRPCConfig implements GrpcConfigRetriever {
@Builder.Default
private String socketPath = Constants.DEFAULT_SOCKET_PATH;
diff --git a/src/main/java/io/numaproj/numaflow/sourcer/Server.java b/src/main/java/io/numaproj/numaflow/sourcer/Server.java
index a91eb5fb..25978766 100644
--- a/src/main/java/io/numaproj/numaflow/sourcer/Server.java
+++ b/src/main/java/io/numaproj/numaflow/sourcer/Server.java
@@ -2,16 +2,15 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
-import io.grpc.ServerBuilder;
+import io.grpc.ServerInterceptor;
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
-import io.numaproj.numaflow.shared.GrpcServerHelper;
import io.numaproj.numaflow.shared.GrpcServerUtils;
+import io.numaproj.numaflow.shared.GrpcServerWrapper;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
/**
* Server is the gRPC server for reading from source.
@@ -20,11 +19,9 @@
public class Server {
private final GRPCConfig grpcConfig;
- private final Service service;
private final CompletableFuture shutdownSignal;
private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper());
- private io.grpc.Server server;
- private final GrpcServerHelper grpcServerHelper;
+ private final GrpcServerWrapper server;
/**
* constructor to create gRPC server.
@@ -43,9 +40,18 @@ public Server(Sourcer sourcer) {
*/
public Server(Sourcer sourcer, GRPCConfig grpcConfig) {
this.shutdownSignal = new CompletableFuture<>();
- this.service = new Service(sourcer, this.shutdownSignal);
this.grpcConfig = grpcConfig;
- this.grpcServerHelper = new GrpcServerHelper();
+ this.server = new GrpcServerWrapper(this.grpcConfig, new Service(sourcer, this.shutdownSignal));
+ }
+
+ @VisibleForTesting
+ protected Server(GRPCConfig grpcConfig, Sourcer service, ServerInterceptor interceptor, String serverName) {
+ this.shutdownSignal = new CompletableFuture<>();
+ this.grpcConfig = grpcConfig;
+ this.server = new GrpcServerWrapper(
+ interceptor,
+ serverName,
+ new Service(service, this.shutdownSignal));
}
/**
@@ -54,41 +60,27 @@ public Server(Sourcer sourcer, GRPCConfig grpcConfig) {
* @throws Exception if server fails to start
*/
public void start() throws Exception {
- if (!grpcConfig.isLocal()) {
+ if (!this.grpcConfig.isLocal()) {
GrpcServerUtils.writeServerInfo(
- serverInfoAccessor,
- grpcConfig.getSocketPath(),
- grpcConfig.getInfoFilePath(),
+ this.serverInfoAccessor,
+ this.grpcConfig.getSocketPath(),
+ this.grpcConfig.getInfoFilePath(),
ContainerType.SOURCER);
}
- if (this.server == null) {
- this.server = grpcServerHelper.createServer(
- grpcConfig.getSocketPath(),
- grpcConfig.getMaxMessageSize(),
- grpcConfig.isLocal(),
- grpcConfig.getPort(),
- this.service);
- }
-
- server.start();
+ this.server.start();
log.info(
"server started, listening on {}",
- grpcConfig.isLocal() ?
- "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath());
+ this.grpcConfig.isLocal() ?
+ "localhost:" + this.grpcConfig.getPort() : this.grpcConfig.getSocketPath());
// register shutdown hook to gracefully shut down the server
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down source gRPC server since JVM is shutting down");
- if (server != null && server.isTerminated()) {
- return;
- }
try {
- Server.this.stop();
- log.info("gracefully shutting down event loop groups");
- this.grpcServerHelper.gracefullyShutdownEventLoopGroups();
+ this.stop();
} catch (InterruptedException e) {
Thread.interrupted();
e.printStackTrace(System.err);
@@ -96,18 +88,11 @@ public void start() throws Exception {
}));
// if there are any exceptions, shutdown the server gracefully.
- shutdownSignal.whenCompleteAsync((v, e) -> {
- if (server.isTerminated()) {
- return;
- }
-
+ this.shutdownSignal.whenCompleteAsync((v, e) -> {
if (e != null) {
System.err.println("*** shutting down source gRPC server because of an exception - " + e.getMessage());
try {
- log.info("stopping server");
- Server.this.stop();
- log.info("gracefully shutting down event loop groups");
- this.grpcServerHelper.gracefullyShutdownEventLoopGroups();
+ this.stop();
} catch (InterruptedException ex) {
Thread.interrupted();
ex.printStackTrace(System.err);
@@ -125,7 +110,7 @@ public void start() throws Exception {
*/
public void awaitTermination() throws InterruptedException {
log.info("waiting for server to terminate");
- server.awaitTermination();
+ this.server.awaitTermination();
log.info("server has terminated");
}
@@ -136,24 +121,6 @@ public void awaitTermination() throws InterruptedException {
* @throws InterruptedException if shutdown is interrupted
*/
public void stop() throws InterruptedException {
- if (server != null) {
- server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
- // force shutdown if not terminated
- if (!server.isTerminated()) {
- server.shutdownNow();
- }
- }
- }
-
- /**
- * Set server builder for testing.
- *
- * @param serverBuilder in process server builder can be used for testing
- */
- @VisibleForTesting
- public void setServerBuilder(ServerBuilder> serverBuilder) {
- this.server = serverBuilder
- .addService(this.service)
- .build();
+ this.server.gracefullyShutdown();
}
}
diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/GRPCConfig.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/GRPCConfig.java
index 445a0f48..056a17fb 100644
--- a/src/main/java/io/numaproj/numaflow/sourcetransformer/GRPCConfig.java
+++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/GRPCConfig.java
@@ -1,5 +1,6 @@
package io.numaproj.numaflow.sourcetransformer;
+import io.numaproj.numaflow.shared.GrpcConfigRetriever;
import lombok.Builder;
import lombok.Getter;
@@ -8,7 +9,7 @@
*/
@Getter
@Builder(builderMethodName = "newBuilder")
-public class GRPCConfig {
+public class GRPCConfig implements GrpcConfigRetriever {
@Builder.Default
private String socketPath = Constants.DEFAULT_SOCKET_PATH;
diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java
index fcf42bcd..43731c17 100644
--- a/src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java
+++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java
@@ -2,16 +2,15 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
-import io.grpc.ServerBuilder;
+import io.grpc.ServerInterceptor;
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
-import io.numaproj.numaflow.shared.GrpcServerHelper;
import io.numaproj.numaflow.shared.GrpcServerUtils;
+import io.numaproj.numaflow.shared.GrpcServerWrapper;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
/**
* Server is the gRPC server for executing source transformer operation.
@@ -20,11 +19,9 @@
public class Server {
private final GRPCConfig grpcConfig;
- private final Service service;
private final CompletableFuture shutdownSignal;
private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper());
- private io.grpc.Server server;
- private final GrpcServerHelper grpcServerHelper;
+ private final GrpcServerWrapper server;
/**
* constructor to create gRPC server.
@@ -43,9 +40,18 @@ public Server(SourceTransformer sourceTransformer) {
*/
public Server(SourceTransformer sourceTransformer, GRPCConfig grpcConfig) {
this.shutdownSignal = new CompletableFuture<>();
- this.service = new Service(sourceTransformer, this.shutdownSignal);
this.grpcConfig = grpcConfig;
- this.grpcServerHelper = new GrpcServerHelper();
+ this.server = new GrpcServerWrapper(this.grpcConfig, new Service(sourceTransformer, this.shutdownSignal));
+ }
+
+ @VisibleForTesting
+ protected Server(GRPCConfig grpcConfig, SourceTransformer service, ServerInterceptor interceptor, String serverName) {
+ this.shutdownSignal = new CompletableFuture<>();
+ this.grpcConfig = grpcConfig;
+ this.server = new GrpcServerWrapper(
+ interceptor,
+ serverName,
+ new Service(service, this.shutdownSignal));
}
/**
@@ -54,41 +60,27 @@ public Server(SourceTransformer sourceTransformer, GRPCConfig grpcConfig) {
* @throws Exception if server fails to start
*/
public void start() throws Exception {
- if (!grpcConfig.isLocal()) {
+ if (!this.grpcConfig.isLocal()) {
GrpcServerUtils.writeServerInfo(
- serverInfoAccessor,
- grpcConfig.getSocketPath(),
- grpcConfig.getInfoFilePath(),
+ this.serverInfoAccessor,
+ this.grpcConfig.getSocketPath(),
+ this.grpcConfig.getInfoFilePath(),
ContainerType.SOURCE_TRANSFORMER);
}
- if (this.server == null) {
- this.server = this.grpcServerHelper.createServer(
- grpcConfig.getSocketPath(),
- grpcConfig.getMaxMessageSize(),
- grpcConfig.isLocal(),
- grpcConfig.getPort(),
- this.service);
- }
-
- server.start();
+ this.server.start();
log.info(
"server started, listening on {}",
- grpcConfig.isLocal() ?
- "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath());
+ this.grpcConfig.isLocal() ?
+ "localhost:" + this.grpcConfig.getPort() : this.grpcConfig.getSocketPath());
// register shutdown hook to gracefully shut down the server
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
- if (server != null && server.isTerminated()) {
- return;
- }
- try {
- Server.this.stop();
- log.info("gracefully shutting down event loop groups");
- this.grpcServerHelper.gracefullyShutdownEventLoopGroups();
+ try {
+ this.stop();
// FIXME - this is a workaround to immediately terminate the JVM process
// The correct way to do this is to stop all the actors and wait for them to terminate
System.exit(0);
@@ -99,18 +91,11 @@ public void start() throws Exception {
}));
// if there are any exceptions, shutdown the server gracefully.
- shutdownSignal.whenCompleteAsync((v, e) -> {
- if (server.isTerminated()) {
- return;
- }
-
+ this.shutdownSignal.whenCompleteAsync((v, e) -> {
if (e != null) {
System.err.println("*** shutting down sink gRPC server because of an exception - " + e.getMessage());
try {
- log.info("stopping server");
- Server.this.stop();
- log.info("gracefully shutting down event loop groups");
- this.grpcServerHelper.gracefullyShutdownEventLoopGroups();
+ this.stop();
// FIXME - this is a workaround to immediately terminate the JVM process
// The correct way to do this is to stop all the actors and wait for them to terminate
System.exit(0);
@@ -131,7 +116,7 @@ public void start() throws Exception {
*/
public void awaitTermination() throws InterruptedException {
log.info("transformer server is waiting for termination");
- server.awaitTermination();
+ this.server.awaitTermination();
log.info("transformer server has terminated");
}
@@ -142,24 +127,6 @@ public void awaitTermination() throws InterruptedException {
* @throws InterruptedException if shutdown is interrupted
*/
public void stop() throws InterruptedException {
- if (server != null) {
- server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
- // force shutdown if not terminated
- if (!server.isTerminated()) {
- server.shutdownNow();
- }
- }
- }
-
- /**
- * Set server builder for testing.
- *
- * @param serverBuilder in process server builder can be used for testing
- */
- @VisibleForTesting
- public void setServerBuilder(ServerBuilder> serverBuilder) {
- this.server = serverBuilder
- .addService(this.service)
- .build();
+ this.server.gracefullyShutdown();
}
}
diff --git a/src/test/java/io/numaproj/numaflow/batchmapper/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/batchmapper/ServerErrTest.java
index 3ba1fd13..23e11a5a 100644
--- a/src/test/java/io/numaproj/numaflow/batchmapper/ServerErrTest.java
+++ b/src/test/java/io/numaproj/numaflow/batchmapper/ServerErrTest.java
@@ -1,14 +1,7 @@
package io.numaproj.numaflow.batchmapper;
import com.google.protobuf.ByteString;
-import io.grpc.Context;
-import io.grpc.Contexts;
-import io.grpc.ForwardingServerCallListener;
import io.grpc.ManagedChannel;
-import io.grpc.ServerCall;
-import io.grpc.ServerCallHandler;
-import io.grpc.ServerInterceptor;
-import io.grpc.Status;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
@@ -35,46 +28,6 @@ public class ServerErrTest {
@Before
public void setUp() throws Exception {
- ServerInterceptor interceptor = new ServerInterceptor() {
- @Override
- public ServerCall.Listener interceptCall(
- ServerCall call,
- io.grpc.Metadata headers,
- ServerCallHandler next) {
-
- final var context =
- Context.current();
- ServerCall.Listener listener = Contexts.interceptCall(
- context,
- call,
- headers,
- next);
- return new ForwardingServerCallListener.SimpleForwardingServerCallListener<>(
- listener) {
- @Override
- public void onHalfClose() {
- try {
- super.onHalfClose();
- } catch (RuntimeException ex) {
- handleException(ex, call, headers);
- throw ex;
- }
- }
-
- private void handleException(
- RuntimeException e,
- ServerCall serverCall,
- io.grpc.Metadata headers) {
- // Currently, we only have application level exceptions.
- // Translate it to UNKNOWN status.
- var status = Status.UNKNOWN.withDescription(e.getMessage()).withCause(e);
- var newStatus = Status.fromThrowable(status.asException());
- serverCall.close(newStatus, headers);
- }
- };
- }
- };
-
String serverName = InProcessServerBuilder.generateName();
GRPCConfig grpcServerConfig = GRPCConfig.newBuilder()
@@ -84,12 +37,10 @@ private void handleException(
.build();
server = new Server(
+ grpcServerConfig,
new TestMapFn(),
- grpcServerConfig);
-
- server.setServerBuilder(InProcessServerBuilder.forName(serverName)
- .intercept(interceptor)
- .directExecutor());
+ null,
+ serverName);
server.start();
diff --git a/src/test/java/io/numaproj/numaflow/batchmapper/ServerTest.java b/src/test/java/io/numaproj/numaflow/batchmapper/ServerTest.java
index 3b4f5385..f15a9eb2 100644
--- a/src/test/java/io/numaproj/numaflow/batchmapper/ServerTest.java
+++ b/src/test/java/io/numaproj/numaflow/batchmapper/ServerTest.java
@@ -41,11 +41,10 @@ public void setUp() throws Exception {
.build();
server = new Server(
+ grpcServerConfig,
new TestMapFn(),
- grpcServerConfig);
-
- server.setServerBuilder(InProcessServerBuilder.forName(serverName)
- .directExecutor());
+ null,
+ serverName);
server.start();
diff --git a/src/test/java/io/numaproj/numaflow/mapper/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/mapper/ServerErrTest.java
index ec398c2c..3fc18f4d 100644
--- a/src/test/java/io/numaproj/numaflow/mapper/ServerErrTest.java
+++ b/src/test/java/io/numaproj/numaflow/mapper/ServerErrTest.java
@@ -82,12 +82,10 @@ private void handleException(
.build();
server = new Server(
+ grpcServerConfig,
new TestMapFnErr(),
- grpcServerConfig);
-
- server.setServerBuilder(InProcessServerBuilder.forName(serverName)
- .intercept(interceptor)
- .directExecutor());
+ interceptor,
+ serverName);
server.start();
diff --git a/src/test/java/io/numaproj/numaflow/mapper/ServerTest.java b/src/test/java/io/numaproj/numaflow/mapper/ServerTest.java
index 553a8b6d..93a32f8f 100644
--- a/src/test/java/io/numaproj/numaflow/mapper/ServerTest.java
+++ b/src/test/java/io/numaproj/numaflow/mapper/ServerTest.java
@@ -39,11 +39,10 @@ public void setUp() throws Exception {
.build();
server = new Server(
+ grpcServerConfig,
new TestMapFn(),
- grpcServerConfig);
-
- server.setServerBuilder(InProcessServerBuilder.forName(serverName)
- .directExecutor());
+ null,
+ serverName);
server.start();
diff --git a/src/test/java/io/numaproj/numaflow/mapstreamer/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/mapstreamer/ServerErrTest.java
index 9559e2a8..43355d04 100644
--- a/src/test/java/io/numaproj/numaflow/mapstreamer/ServerErrTest.java
+++ b/src/test/java/io/numaproj/numaflow/mapstreamer/ServerErrTest.java
@@ -1,14 +1,7 @@
package io.numaproj.numaflow.mapstreamer;
import com.google.protobuf.ByteString;
-import io.grpc.Context;
-import io.grpc.Contexts;
-import io.grpc.ForwardingServerCallListener;
import io.grpc.ManagedChannel;
-import io.grpc.ServerCall;
-import io.grpc.ServerCallHandler;
-import io.grpc.ServerInterceptor;
-import io.grpc.Status;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.testing.GrpcCleanupRule;
@@ -31,46 +24,6 @@ public class ServerErrTest {
@Before
public void setUp() throws Exception {
- ServerInterceptor interceptor = new ServerInterceptor() {
- @Override
- public ServerCall.Listener interceptCall(
- ServerCall call,
- io.grpc.Metadata headers,
- ServerCallHandler next) {
-
- final var context =
- Context.current();
- ServerCall.Listener listener = Contexts.interceptCall(
- context,
- call,
- headers,
- next);
- return new ForwardingServerCallListener.SimpleForwardingServerCallListener<>(
- listener) {
- @Override
- public void onHalfClose() {
- try {
- super.onHalfClose();
- } catch (RuntimeException ex) {
- handleException(ex, call, headers);
- throw ex;
- }
- }
-
- private void handleException(
- RuntimeException e,
- ServerCall serverCall,
- io.grpc.Metadata headers) {
- // Currently, we only have application level exceptions.
- // Translate it to UNKNOWN status.
- var status = Status.UNKNOWN.withDescription(e.getMessage()).withCause(e);
- var newStatus = Status.fromThrowable(status.asException());
- serverCall.close(newStatus, headers);
- }
- };
- }
- };
-
String serverName = InProcessServerBuilder.generateName();
GRPCConfig grpcServerConfig = GRPCConfig.newBuilder()
@@ -80,12 +33,10 @@ private void handleException(
.build();
server = new Server(
+ grpcServerConfig,
new TestMapStreamFnErr(),
- grpcServerConfig);
-
- server.setServerBuilder(InProcessServerBuilder.forName(serverName)
- .intercept(interceptor)
- .directExecutor());
+ null,
+ serverName);
server.start();
diff --git a/src/test/java/io/numaproj/numaflow/mapstreamer/ServerTest.java b/src/test/java/io/numaproj/numaflow/mapstreamer/ServerTest.java
index 5e65bc11..6f448b7a 100644
--- a/src/test/java/io/numaproj/numaflow/mapstreamer/ServerTest.java
+++ b/src/test/java/io/numaproj/numaflow/mapstreamer/ServerTest.java
@@ -40,11 +40,10 @@ public void setUp() throws Exception {
.build();
server = new Server(
+ grpcServerConfig,
new TestMapStreamFn(),
- grpcServerConfig);
-
- server.setServerBuilder(InProcessServerBuilder.forName(serverName)
- .directExecutor());
+ null,
+ serverName);
server.start();
diff --git a/src/test/java/io/numaproj/numaflow/reducer/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/reducer/ServerErrTest.java
index 2efe8259..6d4b15e0 100644
--- a/src/test/java/io/numaproj/numaflow/reducer/ServerErrTest.java
+++ b/src/test/java/io/numaproj/numaflow/reducer/ServerErrTest.java
@@ -67,12 +67,10 @@ public ServerCall.Listener interceptCall(
.build();
server = new Server(
+ grpcServerConfig,
new ReduceErrTestFactory(),
- grpcServerConfig);
-
- server.setServerBuilder(InProcessServerBuilder.forName(serverName)
- .intercept(interceptor)
- .directExecutor());
+ interceptor,
+ serverName);
server.start();
diff --git a/src/test/java/io/numaproj/numaflow/reducer/ServerTest.java b/src/test/java/io/numaproj/numaflow/reducer/ServerTest.java
index 14164476..880c2f0e 100644
--- a/src/test/java/io/numaproj/numaflow/reducer/ServerTest.java
+++ b/src/test/java/io/numaproj/numaflow/reducer/ServerTest.java
@@ -69,12 +69,10 @@ public ServerCall.Listener interceptCall(
.build();
server = new Server(
+ grpcServerConfig,
new ReduceTestFactory(),
- grpcServerConfig);
-
- server.setServerBuilder(InProcessServerBuilder.forName(serverName)
- .intercept(interceptor)
- .directExecutor());
+ interceptor,
+ serverName);
server.start();
diff --git a/src/test/java/io/numaproj/numaflow/reducestreamer/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/reducestreamer/ServerErrTest.java
index cbd13c48..97c7d709 100644
--- a/src/test/java/io/numaproj/numaflow/reducestreamer/ServerErrTest.java
+++ b/src/test/java/io/numaproj/numaflow/reducestreamer/ServerErrTest.java
@@ -71,12 +71,10 @@ public ServerCall.Listener interceptCall(
.build();
server = new Server(
+ grpcServerConfig,
new ReduceStreamerErrTestFactory(),
- grpcServerConfig);
-
- server.setServerBuilder(InProcessServerBuilder.forName(serverName)
- .intercept(interceptor)
- .directExecutor());
+ interceptor,
+ serverName);
server.start();
diff --git a/src/test/java/io/numaproj/numaflow/reducestreamer/ServerTest.java b/src/test/java/io/numaproj/numaflow/reducestreamer/ServerTest.java
index ec5b28d4..9f8faefa 100644
--- a/src/test/java/io/numaproj/numaflow/reducestreamer/ServerTest.java
+++ b/src/test/java/io/numaproj/numaflow/reducestreamer/ServerTest.java
@@ -75,12 +75,10 @@ public ServerCall.Listener interceptCall(
.build();
server = new Server(
+ grpcServerConfig,
new ReduceStreamerTestFactory(),
- grpcServerConfig);
-
- server.setServerBuilder(InProcessServerBuilder.forName(serverName)
- .intercept(interceptor)
- .directExecutor());
+ interceptor,
+ serverName);
server.start();
diff --git a/src/test/java/io/numaproj/numaflow/sessionreducer/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/sessionreducer/ServerErrTest.java
index 630d0ca1..5e7b63b1 100644
--- a/src/test/java/io/numaproj/numaflow/sessionreducer/ServerErrTest.java
+++ b/src/test/java/io/numaproj/numaflow/sessionreducer/ServerErrTest.java
@@ -2,13 +2,7 @@
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
-import io.grpc.Context;
-import io.grpc.Contexts;
import io.grpc.ManagedChannel;
-import io.grpc.Metadata;
-import io.grpc.ServerCall;
-import io.grpc.ServerCallHandler;
-import io.grpc.ServerInterceptor;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
@@ -38,17 +32,6 @@ public class ServerErrTest {
@Before
public void setUp() throws Exception {
- ServerInterceptor interceptor = new ServerInterceptor() {
- @Override
- public ServerCall.Listener interceptCall(
- ServerCall call,
- Metadata headers,
- ServerCallHandler next) {
- final var context = Context.current();
- return Contexts.interceptCall(context, call, headers, next);
- }
- };
-
String serverName = InProcessServerBuilder.generateName();
GRPCConfig grpcServerConfig = GRPCConfig.newBuilder()
@@ -58,12 +41,10 @@ public ServerCall.Listener interceptCall(
.build();
server = new Server(
+ grpcServerConfig,
new SessionReducerErrTestFactory(),
- grpcServerConfig);
-
- server.setServerBuilder(InProcessServerBuilder.forName(serverName)
- .intercept(interceptor)
- .directExecutor());
+ null,
+ serverName);
server.start();
diff --git a/src/test/java/io/numaproj/numaflow/sessionreducer/ServerTest.java b/src/test/java/io/numaproj/numaflow/sessionreducer/ServerTest.java
index 93775e9a..e6ac3d9a 100644
--- a/src/test/java/io/numaproj/numaflow/sessionreducer/ServerTest.java
+++ b/src/test/java/io/numaproj/numaflow/sessionreducer/ServerTest.java
@@ -2,13 +2,7 @@
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
-import io.grpc.Context;
-import io.grpc.Contexts;
import io.grpc.ManagedChannel;
-import io.grpc.Metadata;
-import io.grpc.ServerCall;
-import io.grpc.ServerCallHandler;
-import io.grpc.ServerInterceptor;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
@@ -40,17 +34,6 @@ public class ServerTest {
@Before
public void setUp() throws Exception {
- ServerInterceptor interceptor = new ServerInterceptor() {
- @Override
- public ServerCall.Listener interceptCall(
- ServerCall call,
- Metadata headers,
- ServerCallHandler next) {
- final var context = Context.current();
- return Contexts.interceptCall(context, call, headers, next);
- }
- };
-
String serverName = InProcessServerBuilder.generateName();
GRPCConfig grpcServerConfig = GRPCConfig.newBuilder()
@@ -60,12 +43,10 @@ public ServerCall.Listener interceptCall(
.build();
server = new Server(
+ grpcServerConfig,
new SessionReducerTestFactory(),
- grpcServerConfig);
-
- server.setServerBuilder(InProcessServerBuilder.forName(serverName)
- .intercept(interceptor)
- .directExecutor());
+ null,
+ serverName);
server.start();
diff --git a/src/test/java/io/numaproj/numaflow/sideinput/ServerTest.java b/src/test/java/io/numaproj/numaflow/sideinput/ServerTest.java
index aaa33083..02a7aa3a 100644
--- a/src/test/java/io/numaproj/numaflow/sideinput/ServerTest.java
+++ b/src/test/java/io/numaproj/numaflow/sideinput/ServerTest.java
@@ -1,6 +1,5 @@
package io.numaproj.numaflow.sideinput;
-import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import io.grpc.ManagedChannel;
import io.grpc.inprocess.InProcessChannelBuilder;
@@ -32,11 +31,10 @@ public void setUp() throws Exception {
.build();
server = new Server(
+ grpcServerConfig,
new TestSideInput(),
- grpcServerConfig);
-
- server.setServerBuilder(InProcessServerBuilder.forName(serverName)
- .directExecutor());
+ null,
+ serverName);
server.start();
@@ -53,7 +51,6 @@ public void tearDown() throws Exception {
@Test
public void TestSideInputRetriever() {
- ByteString inValue = ByteString.copyFromUtf8("invalue");
var stub = SideInputGrpc.newBlockingStub(inProcessChannel);
// First call should return the broadcast message
diff --git a/src/test/java/io/numaproj/numaflow/sinker/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/sinker/ServerErrTest.java
index da88c8fb..61bfc51f 100644
--- a/src/test/java/io/numaproj/numaflow/sinker/ServerErrTest.java
+++ b/src/test/java/io/numaproj/numaflow/sinker/ServerErrTest.java
@@ -41,11 +41,10 @@ public void setUp() throws Exception {
.build();
server = new Server(
+ grpcServerConfig,
new TestSinkFnErr(),
- grpcServerConfig);
-
- server.setServerBuilder(InProcessServerBuilder.forName(serverName)
- .directExecutor());
+ null,
+ serverName);
server.start();
diff --git a/src/test/java/io/numaproj/numaflow/sinker/ServerTest.java b/src/test/java/io/numaproj/numaflow/sinker/ServerTest.java
index aba89491..62734c68 100644
--- a/src/test/java/io/numaproj/numaflow/sinker/ServerTest.java
+++ b/src/test/java/io/numaproj/numaflow/sinker/ServerTest.java
@@ -42,11 +42,10 @@ public void setUp() throws Exception {
.build();
server = new Server(
+ grpcServerConfig,
new TestSinkFn(),
- grpcServerConfig);
-
- server.setServerBuilder(InProcessServerBuilder.forName(serverName)
- .directExecutor());
+ null,
+ serverName);
server.start();
diff --git a/src/test/java/io/numaproj/numaflow/sourcer/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/sourcer/ServerErrTest.java
index 4a59006e..57b3c6ea 100644
--- a/src/test/java/io/numaproj/numaflow/sourcer/ServerErrTest.java
+++ b/src/test/java/io/numaproj/numaflow/sourcer/ServerErrTest.java
@@ -1,13 +1,6 @@
package io.numaproj.numaflow.sourcer;
-import io.grpc.Context;
-import io.grpc.Contexts;
-import io.grpc.ForwardingServerCallListener;
import io.grpc.ManagedChannel;
-import io.grpc.ServerCall;
-import io.grpc.ServerCallHandler;
-import io.grpc.ServerInterceptor;
-import io.grpc.Status;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
@@ -33,46 +26,6 @@ public class ServerErrTest {
@Before
public void setUp() throws Exception {
- ServerInterceptor interceptor = new ServerInterceptor() {
- @Override
- public ServerCall.Listener interceptCall(
- ServerCall call,
- io.grpc.Metadata headers,
- ServerCallHandler next) {
-
- final var context =
- Context.current();
- ServerCall.Listener listener = Contexts.interceptCall(
- context,
- call,
- headers,
- next);
- return new ForwardingServerCallListener.SimpleForwardingServerCallListener<>(
- listener) {
- @Override
- public void onHalfClose() {
- try {
- super.onHalfClose();
- } catch (RuntimeException ex) {
- handleException(ex, call, headers);
- throw ex;
- }
- }
-
- private void handleException(
- RuntimeException e,
- ServerCall serverCall,
- io.grpc.Metadata headers) {
- // Currently, we only have application level exceptions.
- // Translate it to UNKNOWN status.
- var status = Status.UNKNOWN.withDescription(e.getMessage()).withCause(e);
- var newStatus = Status.fromThrowable(status.asException());
- serverCall.close(newStatus, headers);
- }
- };
- }
- };
-
String serverName = InProcessServerBuilder.generateName();
GRPCConfig grpcServerConfig = GRPCConfig.newBuilder()
@@ -82,12 +35,10 @@ private void handleException(
.build();
server = new Server(
+ grpcServerConfig,
new TestSourcerErr(),
- grpcServerConfig);
-
- server.setServerBuilder(InProcessServerBuilder.forName(serverName)
- .intercept(interceptor)
- .directExecutor());
+ null,
+ serverName);
server.start();
diff --git a/src/test/java/io/numaproj/numaflow/sourcer/ServerTest.java b/src/test/java/io/numaproj/numaflow/sourcer/ServerTest.java
index 932d8e76..2e1796ed 100644
--- a/src/test/java/io/numaproj/numaflow/sourcer/ServerTest.java
+++ b/src/test/java/io/numaproj/numaflow/sourcer/ServerTest.java
@@ -43,11 +43,10 @@ public void setUp() throws Exception {
.build();
server = new Server(
+ grpcServerConfig,
new TestSourcer(),
- grpcServerConfig);
-
- server.setServerBuilder(InProcessServerBuilder.forName(serverName)
- .directExecutor());
+ null,
+ serverName);
server.start();
diff --git a/src/test/java/io/numaproj/numaflow/sourcetransformer/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/sourcetransformer/ServerErrTest.java
index cc8a44da..03ab6ee0 100644
--- a/src/test/java/io/numaproj/numaflow/sourcetransformer/ServerErrTest.java
+++ b/src/test/java/io/numaproj/numaflow/sourcetransformer/ServerErrTest.java
@@ -1,18 +1,10 @@
package io.numaproj.numaflow.sourcetransformer;
import com.google.protobuf.ByteString;
-import io.grpc.Context;
-import io.grpc.Contexts;
-import io.grpc.ForwardingServerCallListener;
import io.grpc.ManagedChannel;
-import io.grpc.ServerCall;
-import io.grpc.ServerCallHandler;
-import io.grpc.ServerInterceptor;
-import io.grpc.Status;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.testing.GrpcCleanupRule;
-import io.numaproj.numaflow.map.v1.MapGrpc;
import io.numaproj.numaflow.sourcetransformer.v1.SourceTransformGrpc;
import io.numaproj.numaflow.sourcetransformer.v1.Sourcetransformer;
import org.junit.After;
@@ -34,46 +26,6 @@ public class ServerErrTest {
@Before
public void setUp() throws Exception {
- ServerInterceptor interceptor = new ServerInterceptor() {
- @Override
- public ServerCall.Listener interceptCall(
- ServerCall call,
- io.grpc.Metadata headers,
- ServerCallHandler next) {
-
- final var context =
- Context.current();
- ServerCall.Listener listener = Contexts.interceptCall(
- context,
- call,
- headers,
- next);
- return new ForwardingServerCallListener.SimpleForwardingServerCallListener<>(
- listener) {
- @Override
- public void onHalfClose() {
- try {
- super.onHalfClose();
- } catch (RuntimeException ex) {
- handleException(ex, call, headers);
- throw ex;
- }
- }
-
- private void handleException(
- RuntimeException e,
- ServerCall serverCall,
- io.grpc.Metadata headers) {
- // Currently, we only have application level exceptions.
- // Translate it to UNKNOWN status.
- var status = Status.UNKNOWN.withDescription(e.getMessage()).withCause(e);
- var newStatus = Status.fromThrowable(status.asException());
- serverCall.close(newStatus, headers);
- }
- };
- }
- };
-
String serverName = InProcessServerBuilder.generateName();
GRPCConfig grpcServerConfig = GRPCConfig.newBuilder()
@@ -83,12 +35,10 @@ private void handleException(
.build();
server = new Server(
+ grpcServerConfig,
new SourceTransformerTestErr(),
- grpcServerConfig);
-
- server.setServerBuilder(InProcessServerBuilder.forName(serverName)
- .intercept(interceptor)
- .directExecutor());
+ null,
+ serverName);
server.start();
diff --git a/src/test/java/io/numaproj/numaflow/sourcetransformer/ServerTest.java b/src/test/java/io/numaproj/numaflow/sourcetransformer/ServerTest.java
index 666775c8..4975c8a6 100644
--- a/src/test/java/io/numaproj/numaflow/sourcetransformer/ServerTest.java
+++ b/src/test/java/io/numaproj/numaflow/sourcetransformer/ServerTest.java
@@ -41,11 +41,10 @@ public void setUp() throws Exception {
.build();
server = new Server(
+ grpcServerConfig,
new TestSourceTransformer(),
- grpcServerConfig);
-
- server.setServerBuilder(InProcessServerBuilder.forName(serverName)
- .directExecutor());
+ null,
+ serverName);
server.start();