From 95149d6e155391fbccb21bc046552a14d7400f97 Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Fri, 8 Nov 2024 16:58:19 -0500 Subject: [PATCH] refactor: abstract gRPC server (#156) Signed-off-by: Keran Yang --- examples/pom.xml | 24 +++++ pom.xml | 4 + .../numaflow/batchmapper/GRPCConfig.java | 3 +- .../numaproj/numaflow/batchmapper/Server.java | 78 +++++------------ .../numaflow/batchmapper/Service.java | 7 +- .../numaproj/numaflow/mapper/GRPCConfig.java | 3 +- .../io/numaproj/numaflow/mapper/Server.java | 87 ++++++------------- .../numaflow/mapstreamer/GRPCConfig.java | 3 +- .../numaproj/numaflow/mapstreamer/Server.java | 82 +++++------------ .../numaproj/numaflow/reducer/GRPCConfig.java | 3 +- .../io/numaproj/numaflow/reducer/Server.java | 70 +++++---------- .../numaflow/reducestreamer/GRPCConfig.java | 3 +- .../numaflow/reducestreamer/Server.java | 74 +++++----------- .../numaflow/sessionreducer/GRPCConfig.java | 3 +- .../numaflow/sessionreducer/Server.java | 74 +++++----------- .../numaflow/shared/GrpcConfigRetriever.java | 13 +++ .../numaflow/shared/GrpcServerUtils.java | 9 -- ...rverHelper.java => GrpcServerWrapper.java} | 77 +++++++++++++++- .../numaflow/sideinput/GRPCConfig.java | 3 +- .../numaproj/numaflow/sideinput/Server.java | 74 +++++----------- .../numaproj/numaflow/sinker/GRPCConfig.java | 4 +- .../io/numaproj/numaflow/sinker/Server.java | 65 ++++---------- .../io/numaproj/numaflow/sinker/Service.java | 7 +- .../numaproj/numaflow/sourcer/GRPCConfig.java | 3 +- .../io/numaproj/numaflow/sourcer/Server.java | 85 ++++++------------ .../sourcetransformer/GRPCConfig.java | 3 +- .../numaflow/sourcetransformer/Server.java | 87 ++++++------------- .../numaflow/batchmapper/ServerErrTest.java | 55 +----------- .../numaflow/batchmapper/ServerTest.java | 7 +- .../numaflow/mapper/ServerErrTest.java | 8 +- .../numaproj/numaflow/mapper/ServerTest.java | 7 +- .../numaflow/mapstreamer/ServerErrTest.java | 55 +----------- .../numaflow/mapstreamer/ServerTest.java | 7 +- .../numaflow/reducer/ServerErrTest.java | 8 +- .../numaproj/numaflow/reducer/ServerTest.java | 8 +- .../reducestreamer/ServerErrTest.java | 8 +- .../numaflow/reducestreamer/ServerTest.java | 8 +- .../sessionreducer/ServerErrTest.java | 25 +----- .../numaflow/sessionreducer/ServerTest.java | 25 +----- .../numaflow/sideinput/ServerTest.java | 9 +- .../numaflow/sinker/ServerErrTest.java | 7 +- .../numaproj/numaflow/sinker/ServerTest.java | 7 +- .../numaflow/sourcer/ServerErrTest.java | 55 +----------- .../numaproj/numaflow/sourcer/ServerTest.java | 7 +- .../sourcetransformer/ServerErrTest.java | 56 +----------- .../sourcetransformer/ServerTest.java | 7 +- 46 files changed, 435 insertions(+), 882 deletions(-) create mode 100644 src/main/java/io/numaproj/numaflow/shared/GrpcConfigRetriever.java rename src/main/java/io/numaproj/numaflow/shared/{GrpcServerHelper.java => GrpcServerWrapper.java} (61%) diff --git a/examples/pom.xml b/examples/pom.xml index a7e7b35f..cf921537 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -139,6 +139,9 @@ dockerBuild + + amazoncorretto:11 + io.numaproj.numaflow.examples.map.evenodd.EvenOddFunction @@ -156,6 +159,9 @@ dockerBuild + + amazoncorretto:11 + io.numaproj.numaflow.examples.sink.simple.SimpleSink @@ -172,6 +178,9 @@ dockerBuild + + amazoncorretto:11 + io.numaproj.numaflow.examples.reduce.sum.SumFactory @@ -189,6 +198,9 @@ dockerBuild + + amazoncorretto:11 + io.numaproj.numaflow.examples.reducestreamer.sum.SumFactory @@ -228,6 +240,9 @@ dockerBuild + + amazoncorretto:11 + io.numaproj.numaflow.examples.reduce.count.CounterFactory @@ -245,6 +260,9 @@ dockerBuild + + amazoncorretto:11 + io.numaproj.numaflow.examples.sideinput.simple.SimpleSideInput @@ -263,6 +281,9 @@ dockerBuild + + amazoncorretto:11 + io.numaproj.numaflow.examples.sideinput.udf.SimpleMapWithSideInput @@ -302,6 +323,9 @@ dockerBuild + + amazoncorretto:11 + io.numaproj.numaflow.examples.reducesession.counter.CountFactory diff --git a/pom.xml b/pom.xml index 6959b053..16f70f6d 100644 --- a/pom.xml +++ b/pom.xml @@ -137,6 +137,10 @@ io.grpc grpc-stub + + io.grpc + grpc-inprocess + javax.annotation javax.annotation-api diff --git a/src/main/java/io/numaproj/numaflow/batchmapper/GRPCConfig.java b/src/main/java/io/numaproj/numaflow/batchmapper/GRPCConfig.java index b3b9bbbf..e338f5d0 100644 --- a/src/main/java/io/numaproj/numaflow/batchmapper/GRPCConfig.java +++ b/src/main/java/io/numaproj/numaflow/batchmapper/GRPCConfig.java @@ -1,5 +1,6 @@ package io.numaproj.numaflow.batchmapper; +import io.numaproj.numaflow.shared.GrpcConfigRetriever; import lombok.Builder; import lombok.Getter; @@ -8,7 +9,7 @@ */ @Getter @Builder(builderMethodName = "newBuilder") -public class GRPCConfig { +public class GRPCConfig implements GrpcConfigRetriever { @Builder.Default private String socketPath = Constants.DEFAULT_SOCKET_PATH; diff --git a/src/main/java/io/numaproj/numaflow/batchmapper/Server.java b/src/main/java/io/numaproj/numaflow/batchmapper/Server.java index bca4d4fa..f74d1b90 100644 --- a/src/main/java/io/numaproj/numaflow/batchmapper/Server.java +++ b/src/main/java/io/numaproj/numaflow/batchmapper/Server.java @@ -2,17 +2,16 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; -import io.grpc.ServerBuilder; +import io.grpc.ServerInterceptor; import io.numaproj.numaflow.info.ContainerType; import io.numaproj.numaflow.info.ServerInfoAccessor; import io.numaproj.numaflow.info.ServerInfoAccessorImpl; -import io.numaproj.numaflow.shared.GrpcServerHelper; import io.numaproj.numaflow.shared.GrpcServerUtils; +import io.numaproj.numaflow.shared.GrpcServerWrapper; import lombok.extern.slf4j.Slf4j; import java.util.Collections; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; /** * Server is the gRPC server for executing batch map operation. @@ -24,8 +23,7 @@ public class Server { private final Service service; private final CompletableFuture shutdownSignal; private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper()); - private io.grpc.Server server; - private final GrpcServerHelper grpcServerHelper; + private final GrpcServerWrapper server; /** * constructor to create sink gRPC server. @@ -36,6 +34,14 @@ public Server(BatchMapper batchMapper) { this(batchMapper, GRPCConfig.defaultGrpcConfig()); } + @VisibleForTesting + protected Server(GRPCConfig grpcConfig, BatchMapper service, ServerInterceptor interceptor, String serverName) { + this.grpcConfig = grpcConfig; + this.shutdownSignal = new CompletableFuture<>(); + this.service = new Service(service, this.shutdownSignal); + this.server = new GrpcServerWrapper(interceptor, serverName, this.service); + } + /** * constructor to create sink gRPC server with gRPC config. * @@ -46,7 +52,7 @@ public Server(BatchMapper batchMapper, GRPCConfig grpcConfig) { this.shutdownSignal = new CompletableFuture<>(); this.service = new Service(batchMapper, this.shutdownSignal); this.grpcConfig = grpcConfig; - this.grpcServerHelper = new GrpcServerHelper(); + this.server = new GrpcServerWrapper(this.grpcConfig, this.service); } /** @@ -56,37 +62,22 @@ public Server(BatchMapper batchMapper, GRPCConfig grpcConfig) { */ public void start() throws Exception { GrpcServerUtils.writeServerInfo( - serverInfoAccessor, - grpcConfig.getSocketPath(), - grpcConfig.getInfoFilePath(), + this.serverInfoAccessor, + this.grpcConfig.getSocketPath(), + this.grpcConfig.getInfoFilePath(), ContainerType.MAPPER, Collections.singletonMap(Constants.MAP_MODE_KEY, Constants.MAP_MODE)); - if (this.server == null) { - this.server = grpcServerHelper.createServer( - grpcConfig.getSocketPath(), - grpcConfig.getMaxMessageSize(), - grpcConfig.isLocal(), - grpcConfig.getPort(), - this.service); - } - - server.start(); + this.server.start(); - log.info( - "server started, listening on socket path: " + grpcConfig.getSocketPath()); + log.info("server started, listening on socket path: {}", this.grpcConfig.getSocketPath()); // register shutdown hook to gracefully shut down the server Runtime.getRuntime().addShutdownHook(new Thread(() -> { // Use stderr here since the logger may have been reset by its JVM shutdown hook. System.err.println("*** shutting down gRPC server since JVM is shutting down"); - if (server != null && server.isTerminated()) { - return; - } try { - Server.this.stop(); - log.info("gracefully shutting down event loop groups"); - this.grpcServerHelper.gracefullyShutdownEventLoopGroups(); + this.stop(); } catch (InterruptedException e) { Thread.interrupted(); e.printStackTrace(System.err); @@ -94,18 +85,11 @@ public void start() throws Exception { })); // if there are any exceptions, shutdown the server gracefully. - shutdownSignal.whenCompleteAsync((v, e) -> { - if (server != null && server.isTerminated()) { - return; - } - + this.shutdownSignal.whenCompleteAsync((v, e) -> { if (e != null) { System.err.println("*** shutting down batch map gRPC server because of an exception - " + e.getMessage()); try { - log.info("stopping server"); - Server.this.stop(); - log.info("gracefully shutting down event loop groups"); - this.grpcServerHelper.gracefullyShutdownEventLoopGroups(); + this.stop(); } catch (InterruptedException ex) { Thread.interrupted(); ex.printStackTrace(System.err); @@ -123,7 +107,7 @@ public void start() throws Exception { */ public void awaitTermination() throws InterruptedException { log.info("batch map server is waiting for termination"); - server.awaitTermination(); + this.server.awaitTermination(); log.info("batch map server has terminated"); } @@ -134,25 +118,7 @@ public void awaitTermination() throws InterruptedException { * @throws InterruptedException if shutdown is interrupted */ public void stop() throws InterruptedException { + this.server.gracefullyShutdown(); this.service.shutDown(); - if (server != null) { - server.shutdown().awaitTermination(30, TimeUnit.SECONDS); - // force shutdown if not terminated - if (!server.isTerminated()) { - server.shutdownNow(); - } - } - } - - /** - * Set server builder for testing. - * - * @param serverBuilder in process server builder can be used for testing - */ - @VisibleForTesting - public void setServerBuilder(ServerBuilder serverBuilder) { - this.server = serverBuilder - .addService(this.service) - .build(); } } diff --git a/src/main/java/io/numaproj/numaflow/batchmapper/Service.java b/src/main/java/io/numaproj/numaflow/batchmapper/Service.java index fd967cd2..0d320a19 100644 --- a/src/main/java/io/numaproj/numaflow/batchmapper/Service.java +++ b/src/main/java/io/numaproj/numaflow/batchmapper/Service.java @@ -185,15 +185,16 @@ private HandlerDatum constructHandlerDatum(MapOuterClass.MapRequest d) { ); } - // Shuts down the executor service which is used for batch map + // Shuts down the executor service public void shutDown() { this.mapTaskExecutor.shutdown(); try { if (!mapTaskExecutor.awaitTermination(SHUTDOWN_TIME, TimeUnit.SECONDS)) { log.error("BatchMap executor did not terminate in the specified time."); List droppedTasks = mapTaskExecutor.shutdownNow(); - log.error("BatchMap executor was abruptly shut down. " + droppedTasks.size() - + " tasks will not be executed."); + log.error( + "BatchMap executor was abruptly shut down. {} tasks will not be executed.", + droppedTasks.size()); } else { log.info("BatchMap executor was terminated."); } diff --git a/src/main/java/io/numaproj/numaflow/mapper/GRPCConfig.java b/src/main/java/io/numaproj/numaflow/mapper/GRPCConfig.java index ba2b65e2..495d4105 100644 --- a/src/main/java/io/numaproj/numaflow/mapper/GRPCConfig.java +++ b/src/main/java/io/numaproj/numaflow/mapper/GRPCConfig.java @@ -1,5 +1,6 @@ package io.numaproj.numaflow.mapper; +import io.numaproj.numaflow.shared.GrpcConfigRetriever; import lombok.Builder; import lombok.Getter; @@ -8,7 +9,7 @@ */ @Getter @Builder(builderMethodName = "newBuilder") -public class GRPCConfig { +public class GRPCConfig implements GrpcConfigRetriever { @Builder.Default private String socketPath = Constants.DEFAULT_SOCKET_PATH; diff --git a/src/main/java/io/numaproj/numaflow/mapper/Server.java b/src/main/java/io/numaproj/numaflow/mapper/Server.java index b7376d49..c5f638b8 100644 --- a/src/main/java/io/numaproj/numaflow/mapper/Server.java +++ b/src/main/java/io/numaproj/numaflow/mapper/Server.java @@ -1,17 +1,17 @@ package io.numaproj.numaflow.mapper; import com.fasterxml.jackson.databind.ObjectMapper; -import io.grpc.ServerBuilder; +import com.google.common.annotations.VisibleForTesting; +import io.grpc.ServerInterceptor; import io.numaproj.numaflow.info.ContainerType; import io.numaproj.numaflow.info.ServerInfoAccessor; import io.numaproj.numaflow.info.ServerInfoAccessorImpl; -import io.numaproj.numaflow.shared.GrpcServerHelper; import io.numaproj.numaflow.shared.GrpcServerUtils; +import io.numaproj.numaflow.shared.GrpcServerWrapper; import lombok.extern.slf4j.Slf4j; import java.util.Collections; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; /** * Server is the gRPC server for executing map operation. @@ -20,11 +20,9 @@ public class Server { private final GRPCConfig grpcConfig; - private final Service service; private final CompletableFuture shutdownSignal; private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper()); - private io.grpc.Server server; - private final GrpcServerHelper grpcServerHelper; + private final GrpcServerWrapper server; /** * constructor to create gRPC server. @@ -43,9 +41,18 @@ public Server(Mapper mapper) { */ public Server(Mapper mapper, GRPCConfig grpcConfig) { this.shutdownSignal = new CompletableFuture<>(); - this.service = new Service(mapper, this.shutdownSignal); this.grpcConfig = grpcConfig; - this.grpcServerHelper = new GrpcServerHelper(); + this.server = new GrpcServerWrapper(this.grpcConfig, new Service(mapper, this.shutdownSignal)); + } + + @VisibleForTesting + protected Server(GRPCConfig grpcConfig, Mapper service, ServerInterceptor interceptor, String serverName) { + this.grpcConfig = grpcConfig; + this.shutdownSignal = new CompletableFuture<>(); + this.server = new GrpcServerWrapper( + interceptor, + serverName, + new Service(service, this.shutdownSignal)); } /** @@ -56,43 +63,28 @@ public Server(Mapper mapper, GRPCConfig grpcConfig) { * @throws Exception if the server fails to start */ public void start() throws Exception { - - if (!grpcConfig.isLocal()) { + if (!this.grpcConfig.isLocal()) { GrpcServerUtils.writeServerInfo( - serverInfoAccessor, - grpcConfig.getSocketPath(), - grpcConfig.getInfoFilePath(), + this.serverInfoAccessor, + this.grpcConfig.getSocketPath(), + this.grpcConfig.getInfoFilePath(), ContainerType.MAPPER, Collections.singletonMap(Constants.MAP_MODE_KEY, Constants.MAP_MODE)); } - if (this.server == null) { - this.server = this.grpcServerHelper.createServer( - grpcConfig.getSocketPath(), - grpcConfig.getMaxMessageSize(), - grpcConfig.isLocal(), - grpcConfig.getPort(), - this.service); - } - - server.start(); + this.server.start(); log.info( "server started, listening on {}", - grpcConfig.isLocal() ? - "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath()); + this.grpcConfig.isLocal() ? + "localhost:" + this.grpcConfig.getPort() : this.grpcConfig.getSocketPath()); // register shutdown hook to gracefully shut down the server Runtime.getRuntime().addShutdownHook(new Thread(() -> { // Use stderr here since the logger may have been reset by its JVM shutdown hook. System.err.println("*** shutting down gRPC server since JVM is shutting down"); - if (server != null && server.isTerminated()) { - return; - } try { - Server.this.stop(); - log.info("gracefully shutting down event loop groups"); - this.grpcServerHelper.gracefullyShutdownEventLoopGroups(); + this.stop(); // FIXME - this is a workaround to immediately terminate the JVM process // The correct way to do this is to stop all the actors and wait for them to terminate System.exit(0); @@ -103,18 +95,11 @@ public void start() throws Exception { })); // if there are any exceptions, shutdown the server gracefully. - shutdownSignal.whenCompleteAsync((v, e) -> { - if (server.isTerminated()) { - return; - } - + this.shutdownSignal.whenCompleteAsync((v, e) -> { if (e != null) { System.err.println("*** shutting down mapper gRPC server because of an exception - " + e.getMessage()); try { - log.info("stopping server"); - Server.this.stop(); - log.info("gracefully shutting down event loop groups"); - this.grpcServerHelper.gracefullyShutdownEventLoopGroups(); + this.stop(); // FIXME - this is a workaround to immediately terminate the JVM process // The correct way to do this is to stop all the actors and wait for them to terminate System.exit(0); @@ -135,7 +120,7 @@ public void start() throws Exception { */ public void awaitTermination() throws InterruptedException { log.info("mapper server is waiting for termination"); - server.awaitTermination(); + this.server.awaitTermination(); log.info("mapper server has terminated"); } @@ -146,24 +131,6 @@ public void awaitTermination() throws InterruptedException { * @throws InterruptedException if shutdown is interrupted */ public void stop() throws InterruptedException { - if (server != null) { - server.shutdown().awaitTermination(30, TimeUnit.SECONDS); - // force shutdown if not terminated - if (!server.isTerminated()) { - server.shutdownNow(); - } - } - } - - /** - * Sets the server builder. This method can be used for testing purposes to provide a different - * grpc server builder. - * - * @param serverBuilder the server builder to be used - */ - public void setServerBuilder(ServerBuilder serverBuilder) { - this.server = serverBuilder - .addService(this.service) - .build(); + this.server.gracefullyShutdown(); } } diff --git a/src/main/java/io/numaproj/numaflow/mapstreamer/GRPCConfig.java b/src/main/java/io/numaproj/numaflow/mapstreamer/GRPCConfig.java index 686d4a9e..a0316133 100644 --- a/src/main/java/io/numaproj/numaflow/mapstreamer/GRPCConfig.java +++ b/src/main/java/io/numaproj/numaflow/mapstreamer/GRPCConfig.java @@ -1,5 +1,6 @@ package io.numaproj.numaflow.mapstreamer; +import io.numaproj.numaflow.shared.GrpcConfigRetriever; import lombok.Builder; import lombok.Getter; @@ -8,7 +9,7 @@ */ @Getter @Builder(builderMethodName = "newBuilder") -public class GRPCConfig { +public class GRPCConfig implements GrpcConfigRetriever { @Builder.Default private String socketPath = Constants.DEFAULT_SOCKET_PATH; diff --git a/src/main/java/io/numaproj/numaflow/mapstreamer/Server.java b/src/main/java/io/numaproj/numaflow/mapstreamer/Server.java index 543a5707..2f286761 100644 --- a/src/main/java/io/numaproj/numaflow/mapstreamer/Server.java +++ b/src/main/java/io/numaproj/numaflow/mapstreamer/Server.java @@ -2,17 +2,16 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; -import io.grpc.ServerBuilder; +import io.grpc.ServerInterceptor; import io.numaproj.numaflow.info.ContainerType; import io.numaproj.numaflow.info.ServerInfoAccessor; import io.numaproj.numaflow.info.ServerInfoAccessorImpl; -import io.numaproj.numaflow.shared.GrpcServerHelper; import io.numaproj.numaflow.shared.GrpcServerUtils; +import io.numaproj.numaflow.shared.GrpcServerWrapper; import lombok.extern.slf4j.Slf4j; import java.util.Collections; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; /** * Server is the gRPC server for executing map operation. @@ -21,11 +20,9 @@ public class Server { private final GRPCConfig grpcConfig; - private final Service service; private final CompletableFuture shutdownSignal; private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper()); - private io.grpc.Server server; - private final GrpcServerHelper grpcServerHelper; + private final GrpcServerWrapper server; /** * constructor to create sink gRPC server. @@ -44,9 +41,18 @@ public Server(MapStreamer mapStreamer) { */ public Server(MapStreamer mapStreamer, GRPCConfig grpcConfig) { this.shutdownSignal = new CompletableFuture<>(); - this.service = new Service(mapStreamer, this.shutdownSignal); this.grpcConfig = grpcConfig; - this.grpcServerHelper = new GrpcServerHelper(); + this.server = new GrpcServerWrapper(this.grpcConfig, new Service(mapStreamer, this.shutdownSignal)); + } + + @VisibleForTesting + protected Server(GRPCConfig grpcConfig, MapStreamer service, ServerInterceptor interceptor, String serverName) { + this.grpcConfig = grpcConfig; + this.shutdownSignal = new CompletableFuture<>(); + this.server = new GrpcServerWrapper( + interceptor, + serverName, + new Service(service, this.shutdownSignal)); } /** @@ -56,37 +62,22 @@ public Server(MapStreamer mapStreamer, GRPCConfig grpcConfig) { */ public void start() throws Exception { GrpcServerUtils.writeServerInfo( - serverInfoAccessor, - grpcConfig.getSocketPath(), - grpcConfig.getInfoFilePath(), + this.serverInfoAccessor, + this.grpcConfig.getSocketPath(), + this.grpcConfig.getInfoFilePath(), ContainerType.MAPPER, Collections.singletonMap(Constants.MAP_MODE_KEY, Constants.MAP_MODE)); - if (this.server == null) { - this.server = this.grpcServerHelper.createServer( - grpcConfig.getSocketPath(), - grpcConfig.getMaxMessageSize(), - grpcConfig.isLocal(), - grpcConfig.getPort(), - this.service); - } - - server.start(); + this.server.start(); - log.info( - "server started, listening on socket path: " + grpcConfig.getSocketPath()); + log.info("server started, listening on socket path: {}", this.grpcConfig.getSocketPath()); // register shutdown hook to gracefully shut down the server Runtime.getRuntime().addShutdownHook(new Thread(() -> { // Use stderr here since the logger may have been reset by its JVM shutdown hook. System.err.println("*** shutting down map streamer gRPC server since JVM is shutting down"); - if (server != null && server.isTerminated()) { - return; - } try { - Server.this.stop(); - log.info("gracefully shutting down event loop groups"); - this.grpcServerHelper.gracefullyShutdownEventLoopGroups(); + this.stop(); } catch (InterruptedException e) { Thread.interrupted(); e.printStackTrace(System.err); @@ -94,18 +85,11 @@ public void start() throws Exception { })); // if there are any exceptions, shutdown the server gracefully. - shutdownSignal.whenCompleteAsync((v, e) -> { - if (server.isTerminated()) { - return; - } - + this.shutdownSignal.whenCompleteAsync((v, e) -> { if (e != null) { System.err.println("*** shutting down map streamer gRPC server because of an exception - " + e.getMessage()); try { - log.info("stopping server"); - Server.this.stop(); - log.info("gracefully shutting down event loop groups"); - this.grpcServerHelper.gracefullyShutdownEventLoopGroups(); + this.stop(); } catch (InterruptedException ex) { Thread.interrupted(); ex.printStackTrace(System.err); @@ -123,7 +107,7 @@ public void start() throws Exception { */ public void awaitTermination() throws InterruptedException { log.info("map stream server is waiting for termination"); - server.awaitTermination(); + this.server.awaitTermination(); log.info("map stream server has terminated"); } @@ -134,24 +118,6 @@ public void awaitTermination() throws InterruptedException { * @throws InterruptedException if shutdown is interrupted */ public void stop() throws InterruptedException { - if (server != null) { - server.shutdown().awaitTermination(30, TimeUnit.SECONDS); - // force shutdown if not terminated - if (!server.isTerminated()) { - server.shutdownNow(); - } - } - } - - /** - * Set server builder for testing. - * - * @param serverBuilder in process server builder can be used for testing - */ - @VisibleForTesting - public void setServerBuilder(ServerBuilder serverBuilder) { - this.server = serverBuilder - .addService(this.service) - .build(); + this.server.gracefullyShutdown(); } } diff --git a/src/main/java/io/numaproj/numaflow/reducer/GRPCConfig.java b/src/main/java/io/numaproj/numaflow/reducer/GRPCConfig.java index e3f8bb62..68a74f59 100644 --- a/src/main/java/io/numaproj/numaflow/reducer/GRPCConfig.java +++ b/src/main/java/io/numaproj/numaflow/reducer/GRPCConfig.java @@ -1,5 +1,6 @@ package io.numaproj.numaflow.reducer; +import io.numaproj.numaflow.shared.GrpcConfigRetriever; import lombok.Builder; import lombok.Getter; @@ -8,7 +9,7 @@ */ @Getter @Builder(builderMethodName = "newBuilder") -public class GRPCConfig { +public class GRPCConfig implements GrpcConfigRetriever { @Builder.Default private String socketPath = Constants.DEFAULT_SOCKET_PATH; diff --git a/src/main/java/io/numaproj/numaflow/reducer/Server.java b/src/main/java/io/numaproj/numaflow/reducer/Server.java index 478be133..2aaad86d 100644 --- a/src/main/java/io/numaproj/numaflow/reducer/Server.java +++ b/src/main/java/io/numaproj/numaflow/reducer/Server.java @@ -2,16 +2,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; -import io.grpc.ServerBuilder; +import io.grpc.ServerInterceptor; import io.numaproj.numaflow.info.ContainerType; import io.numaproj.numaflow.info.ServerInfoAccessor; import io.numaproj.numaflow.info.ServerInfoAccessorImpl; -import io.numaproj.numaflow.shared.GrpcServerHelper; import io.numaproj.numaflow.shared.GrpcServerUtils; +import io.numaproj.numaflow.shared.GrpcServerWrapper; import lombok.extern.slf4j.Slf4j; -import java.util.concurrent.TimeUnit; - /** * Server is the gRPC server for executing reduce operation. */ @@ -19,10 +17,8 @@ public class Server { private final GRPCConfig grpcConfig; - private final Service service; private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper()); - private io.grpc.Server server; - private final GrpcServerHelper grpcServerHelper; + private final GrpcServerWrapper server; /** * constructor to create gRPC server. @@ -40,9 +36,17 @@ public Server(ReducerFactory reducerFactory) { * @param reducerFactory to process the message */ public Server(ReducerFactory reducerFactory, GRPCConfig grpcConfig) { - this.service = new Service(reducerFactory); this.grpcConfig = grpcConfig; - this.grpcServerHelper = new GrpcServerHelper(); + this.server = new GrpcServerWrapper(this.grpcConfig, new Service(reducerFactory)); + } + + @VisibleForTesting + protected Server(GRPCConfig grpcConfig, ReducerFactory service, ServerInterceptor interceptor, String serverName) { + this.grpcConfig = grpcConfig; + this.server = new GrpcServerWrapper( + interceptor, + serverName, + new Service(service)); } /** @@ -51,24 +55,15 @@ public Server(ReducerFactory reducerFactory, GRPCConfig grpcC * @throws Exception if server fails to start */ public void start() throws Exception { - if (!grpcConfig.isLocal()) { + if (!this.grpcConfig.isLocal()) { GrpcServerUtils.writeServerInfo( - serverInfoAccessor, - grpcConfig.getSocketPath(), - grpcConfig.getInfoFilePath(), + this.serverInfoAccessor, + this.grpcConfig.getSocketPath(), + this.grpcConfig.getInfoFilePath(), ContainerType.REDUCER); } - if (this.server == null) { - this.server = this.grpcServerHelper.createServer( - grpcConfig.getSocketPath(), - grpcConfig.getMaxMessageSize(), - grpcConfig.isLocal(), - grpcConfig.getPort(), - this.service); - } - - server.start(); + this.server.start(); log.info( "server started, listening on {}", @@ -79,13 +74,8 @@ public void start() throws Exception { Runtime.getRuntime().addShutdownHook(new Thread(() -> { // Use stderr here since the logger may have been reset by its JVM shutdown hook. System.err.println("*** shutting down gRPC server since JVM is shutting down"); - if (server != null && server.isTerminated()) { - return; - } try { - Server.this.stop(); - log.info("gracefully shutting down event loop groups"); - this.grpcServerHelper.gracefullyShutdownEventLoopGroups(); + this.stop(); } catch (InterruptedException e) { Thread.interrupted(); e.printStackTrace(System.err); @@ -102,7 +92,7 @@ public void start() throws Exception { */ public void awaitTermination() throws InterruptedException { log.info("reducer server is waiting for termination"); - server.awaitTermination(); + this.server.awaitTermination(); log.info("reducer server has terminated"); } @@ -113,24 +103,6 @@ public void awaitTermination() throws InterruptedException { * @throws InterruptedException if shutdown is interrupted */ public void stop() throws InterruptedException { - if (server != null) { - server.shutdown().awaitTermination(30, TimeUnit.SECONDS); - // force shutdown if not terminated - if (!server.isTerminated()) { - server.shutdownNow(); - } - } - } - - /** - * Set server builder for testing. - * - * @param serverBuilder - */ - @VisibleForTesting - void setServerBuilder(ServerBuilder serverBuilder) { - this.server = serverBuilder - .addService(this.service) - .build(); + this.server.gracefullyShutdown(); } } diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/GRPCConfig.java b/src/main/java/io/numaproj/numaflow/reducestreamer/GRPCConfig.java index c008b5b0..7ec40aa8 100644 --- a/src/main/java/io/numaproj/numaflow/reducestreamer/GRPCConfig.java +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/GRPCConfig.java @@ -1,5 +1,6 @@ package io.numaproj.numaflow.reducestreamer; +import io.numaproj.numaflow.shared.GrpcConfigRetriever; import lombok.Builder; import lombok.Getter; @@ -8,7 +9,7 @@ */ @Getter @Builder(builderMethodName = "newBuilder") -public class GRPCConfig { +public class GRPCConfig implements GrpcConfigRetriever { @Builder.Default private String socketPath = Constants.DEFAULT_SOCKET_PATH; @Builder.Default diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/Server.java b/src/main/java/io/numaproj/numaflow/reducestreamer/Server.java index 941292d5..a0946df8 100644 --- a/src/main/java/io/numaproj/numaflow/reducestreamer/Server.java +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/Server.java @@ -2,28 +2,24 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; -import io.grpc.ServerBuilder; +import io.grpc.ServerInterceptor; import io.numaproj.numaflow.info.ContainerType; import io.numaproj.numaflow.info.ServerInfoAccessor; import io.numaproj.numaflow.info.ServerInfoAccessorImpl; import io.numaproj.numaflow.reducestreamer.model.ReduceStreamer; import io.numaproj.numaflow.reducestreamer.model.ReduceStreamerFactory; -import io.numaproj.numaflow.shared.GrpcServerHelper; import io.numaproj.numaflow.shared.GrpcServerUtils; +import io.numaproj.numaflow.shared.GrpcServerWrapper; import lombok.extern.slf4j.Slf4j; -import java.util.concurrent.TimeUnit; - /** * Server is the gRPC server for executing reduce stream operation. */ @Slf4j public class Server { private final GRPCConfig grpcConfig; - private final Service service; private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper()); - private io.grpc.Server server; - private final GrpcServerHelper grpcServerHelper; + private final GrpcServerWrapper server; /** * constructor to create gRPC server. @@ -43,9 +39,17 @@ public Server(ReduceStreamerFactory reduceStreamerFact public Server( ReduceStreamerFactory reduceStreamerFactory, GRPCConfig grpcConfig) { - this.service = new Service(reduceStreamerFactory); this.grpcConfig = grpcConfig; - this.grpcServerHelper = new GrpcServerHelper(); + this.server = new GrpcServerWrapper(this.grpcConfig, new Service(reduceStreamerFactory)); + } + + @VisibleForTesting + protected Server(GRPCConfig grpcConfig, ReduceStreamerFactory service, ServerInterceptor interceptor, String serverName) { + this.grpcConfig = grpcConfig; + this.server = new GrpcServerWrapper( + interceptor, + serverName, + new Service(service)); } /** @@ -54,41 +58,27 @@ public Server( * @throws Exception if server fails to start */ public void start() throws Exception { - if (!grpcConfig.isLocal()) { + if (!this.grpcConfig.isLocal()) { GrpcServerUtils.writeServerInfo( - serverInfoAccessor, - grpcConfig.getSocketPath(), - grpcConfig.getInfoFilePath(), + this.serverInfoAccessor, + this.grpcConfig.getSocketPath(), + this.grpcConfig.getInfoFilePath(), ContainerType.REDUCE_STREAMER); } - if (this.server == null) { - this.server = this.grpcServerHelper.createServer( - grpcConfig.getSocketPath(), - grpcConfig.getMaxMessageSize(), - grpcConfig.isLocal(), - grpcConfig.getPort(), - this.service); - } - - server.start(); + this.server.start(); log.info( "server started, listening on {}", - grpcConfig.isLocal() ? - "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath()); + this.grpcConfig.isLocal() ? + "localhost:" + this.grpcConfig.getPort() : this.grpcConfig.getSocketPath()); // register shutdown hook to gracefully shut down the server Runtime.getRuntime().addShutdownHook(new Thread(() -> { // Use stderr here since the logger may have been reset by its JVM shutdown hook. System.err.println("*** shutting down gRPC server since JVM is shutting down"); - if (server != null && server.isTerminated()) { - return; - } try { - Server.this.stop(); - log.info("gracefully shutting down event loop groups"); - this.grpcServerHelper.gracefullyShutdownEventLoopGroups(); + this.stop(); } catch (InterruptedException e) { Thread.interrupted(); e.printStackTrace(System.err); @@ -105,7 +95,7 @@ public void start() throws Exception { */ public void awaitTermination() throws InterruptedException { log.info("reduce stream server is waiting for termination"); - server.awaitTermination(); + this.server.awaitTermination(); log.info("reduce stream server terminated"); } @@ -116,24 +106,6 @@ public void awaitTermination() throws InterruptedException { * @throws InterruptedException if shutdown is interrupted */ public void stop() throws InterruptedException { - if (server != null) { - server.shutdown().awaitTermination(30, TimeUnit.SECONDS); - // force shutdown if not terminated - if (!server.isTerminated()) { - server.shutdownNow(); - } - } - } - - /** - * Set server builder for testing. - * - * @param serverBuilder - */ - @VisibleForTesting - void setServerBuilder(ServerBuilder serverBuilder) { - this.server = serverBuilder - .addService(this.service) - .build(); + this.server.gracefullyShutdown(); } } diff --git a/src/main/java/io/numaproj/numaflow/sessionreducer/GRPCConfig.java b/src/main/java/io/numaproj/numaflow/sessionreducer/GRPCConfig.java index 2c348286..675c4a87 100644 --- a/src/main/java/io/numaproj/numaflow/sessionreducer/GRPCConfig.java +++ b/src/main/java/io/numaproj/numaflow/sessionreducer/GRPCConfig.java @@ -1,5 +1,6 @@ package io.numaproj.numaflow.sessionreducer; +import io.numaproj.numaflow.shared.GrpcConfigRetriever; import lombok.Builder; import lombok.Getter; @@ -8,7 +9,7 @@ */ @Getter @Builder(builderMethodName = "newBuilder") -public class GRPCConfig { +public class GRPCConfig implements GrpcConfigRetriever { @Builder.Default private String socketPath = Constants.DEFAULT_SOCKET_PATH; diff --git a/src/main/java/io/numaproj/numaflow/sessionreducer/Server.java b/src/main/java/io/numaproj/numaflow/sessionreducer/Server.java index 4c268b97..eb0a4308 100644 --- a/src/main/java/io/numaproj/numaflow/sessionreducer/Server.java +++ b/src/main/java/io/numaproj/numaflow/sessionreducer/Server.java @@ -2,28 +2,24 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; -import io.grpc.ServerBuilder; +import io.grpc.ServerInterceptor; import io.numaproj.numaflow.info.ContainerType; import io.numaproj.numaflow.info.ServerInfoAccessor; import io.numaproj.numaflow.info.ServerInfoAccessorImpl; import io.numaproj.numaflow.sessionreducer.model.SessionReducer; import io.numaproj.numaflow.sessionreducer.model.SessionReducerFactory; -import io.numaproj.numaflow.shared.GrpcServerHelper; import io.numaproj.numaflow.shared.GrpcServerUtils; +import io.numaproj.numaflow.shared.GrpcServerWrapper; import lombok.extern.slf4j.Slf4j; -import java.util.concurrent.TimeUnit; - /** * Server is the gRPC server for executing session reduce operations. */ @Slf4j public class Server { private final GRPCConfig grpcConfig; - private final Service service; private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper()); - private io.grpc.Server server; - private final GrpcServerHelper grpcServerHelper; + private final GrpcServerWrapper server; /** * constructor to create gRPC server. @@ -43,9 +39,17 @@ public Server(SessionReducerFactory sessionReducerFact public Server( SessionReducerFactory sessionReducerFactory, GRPCConfig grpcConfig) { - this.service = new Service(sessionReducerFactory); this.grpcConfig = grpcConfig; - this.grpcServerHelper = new GrpcServerHelper(); + this.server = new GrpcServerWrapper(this.grpcConfig, new Service(sessionReducerFactory)); + } + + @VisibleForTesting + protected Server(GRPCConfig grpcConfig, SessionReducerFactory service, ServerInterceptor interceptor, String serverName) { + this.grpcConfig = grpcConfig; + this.server = new GrpcServerWrapper( + interceptor, + serverName, + new Service(service)); } /** @@ -54,41 +58,27 @@ public Server( * @throws Exception if server fails to start */ public void start() throws Exception { - if (!grpcConfig.isLocal()) { + if (!this.grpcConfig.isLocal()) { GrpcServerUtils.writeServerInfo( - serverInfoAccessor, - grpcConfig.getSocketPath(), - grpcConfig.getInfoFilePath(), + this.serverInfoAccessor, + this.grpcConfig.getSocketPath(), + this.grpcConfig.getInfoFilePath(), ContainerType.SESSION_REDUCER); } - if (this.server == null) { - this.server = this.grpcServerHelper.createServer( - grpcConfig.getSocketPath(), - grpcConfig.getMaxMessageSize(), - grpcConfig.isLocal(), - grpcConfig.getPort(), - this.service); - } - - server.start(); + this.server.start(); log.info( "server started, listening on {}", - grpcConfig.isLocal() ? - "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath()); + this.grpcConfig.isLocal() ? + "localhost:" + this.grpcConfig.getPort() : this.grpcConfig.getSocketPath()); // register shutdown hook to gracefully shut down the server Runtime.getRuntime().addShutdownHook(new Thread(() -> { // Use stderr here since the logger may have been reset by its JVM shutdown hook. System.err.println("*** shutting down gRPC server since JVM is shutting down"); - if (server != null && server.isTerminated()) { - return; - } try { - Server.this.stop(); - log.info("gracefully shutting down event loop groups"); - this.grpcServerHelper.gracefullyShutdownEventLoopGroups(); + this.stop(); } catch (InterruptedException e) { Thread.interrupted(); e.printStackTrace(System.err); @@ -105,7 +95,7 @@ public void start() throws Exception { */ public void awaitTermination() throws InterruptedException { log.info("session reduce server is waiting for termination"); - server.awaitTermination(); + this.server.awaitTermination(); log.info("session reduce server terminated"); } @@ -116,24 +106,6 @@ public void awaitTermination() throws InterruptedException { * @throws InterruptedException if shutdown is interrupted */ public void stop() throws InterruptedException { - if (server != null) { - server.shutdown().awaitTermination(30, TimeUnit.SECONDS); - // force shutdown if not terminated - if (!server.isTerminated()) { - server.shutdownNow(); - } - } - } - - /** - * Set server builder for testing. - * - * @param serverBuilder for building the server - */ - @VisibleForTesting - void setServerBuilder(ServerBuilder serverBuilder) { - this.server = serverBuilder - .addService(this.service) - .build(); + this.server.gracefullyShutdown(); } } diff --git a/src/main/java/io/numaproj/numaflow/shared/GrpcConfigRetriever.java b/src/main/java/io/numaproj/numaflow/shared/GrpcConfigRetriever.java new file mode 100644 index 00000000..a144853c --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/shared/GrpcConfigRetriever.java @@ -0,0 +1,13 @@ +package io.numaproj.numaflow.shared; + +// Currently each of the UDFs (Mapper, BatchMapper, Sourcer) has its own GrpcConfig class. +// To start a gRPC server, we need to pass gRPC configurations to the GrpcServerWrapper. +// In order to make the GrpcServerWrapper more generic, we create this GrpcConfigRetriever interface, +// which is implemented by the UDFs' GrpcConfig classes. +public interface GrpcConfigRetriever { + String getSocketPath(); + int getMaxMessageSize(); + String getInfoFilePath(); + int getPort(); + boolean isLocal(); +} diff --git a/src/main/java/io/numaproj/numaflow/shared/GrpcServerUtils.java b/src/main/java/io/numaproj/numaflow/shared/GrpcServerUtils.java index 9f90c737..cd85dfa3 100644 --- a/src/main/java/io/numaproj/numaflow/shared/GrpcServerUtils.java +++ b/src/main/java/io/numaproj/numaflow/shared/GrpcServerUtils.java @@ -1,15 +1,7 @@ package io.numaproj.numaflow.shared; import io.grpc.Context; -import io.grpc.Contexts; -import io.grpc.ForwardingServerCallListener; import io.grpc.Metadata; -import io.grpc.ServerBuilder; -import io.grpc.ServerCall; -import io.grpc.ServerCallHandler; -import io.grpc.ServerInterceptor; -import io.grpc.Status; -import io.grpc.netty.NettyServerBuilder; import io.netty.channel.EventLoopGroup; import io.netty.channel.ServerChannel; import io.netty.channel.epoll.EpollEventLoopGroup; @@ -17,7 +9,6 @@ import io.netty.channel.kqueue.KQueue; import io.netty.channel.kqueue.KQueueEventLoopGroup; import io.netty.channel.kqueue.KQueueServerDomainSocketChannel; -import io.netty.channel.unix.DomainSocketAddress; import io.numaproj.numaflow.info.ContainerType; import io.numaproj.numaflow.info.Language; import io.numaproj.numaflow.info.Protocol; diff --git a/src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java b/src/main/java/io/numaproj/numaflow/shared/GrpcServerWrapper.java similarity index 61% rename from src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java rename to src/main/java/io/numaproj/numaflow/shared/GrpcServerWrapper.java index 9fe59cda..39a6580c 100644 --- a/src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java +++ b/src/main/java/io/numaproj/numaflow/shared/GrpcServerWrapper.java @@ -1,5 +1,6 @@ package io.numaproj.numaflow.shared; +import com.google.common.annotations.VisibleForTesting; import io.grpc.BindableService; import io.grpc.Context; import io.grpc.Contexts; @@ -10,20 +11,90 @@ import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; import io.grpc.Status; +import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.netty.NettyServerBuilder; import io.netty.channel.EventLoopGroup; import io.netty.channel.unix.DomainSocketAddress; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.TimeUnit; import static io.numaproj.numaflow.shared.GrpcServerUtils.DATUM_METADATA_WIN_END; import static io.numaproj.numaflow.shared.GrpcServerUtils.DATUM_METADATA_WIN_START; import static io.numaproj.numaflow.shared.GrpcServerUtils.WINDOW_END_TIME; import static io.numaproj.numaflow.shared.GrpcServerUtils.WINDOW_START_TIME; -public class GrpcServerHelper { +/** + * GrpcServerWrapper is a wrapper class for gRPC server. + * It takes care of creating, starting and gracefully shutting down the server. + */ +@Slf4j +public class GrpcServerWrapper { + private final Server server; private EventLoopGroup bossEventLoopGroup; private EventLoopGroup workerEventLoopGroup; - public void gracefullyShutdownEventLoopGroups() { + public GrpcServerWrapper( + GrpcConfigRetriever grpcConfigRetriever, + BindableService service) { + this.server = createServer( + grpcConfigRetriever.getSocketPath(), + grpcConfigRetriever.getMaxMessageSize(), + grpcConfigRetriever.isLocal(), + grpcConfigRetriever.getPort(), + service); + } + + @VisibleForTesting + public GrpcServerWrapper( + ServerInterceptor interceptor, + String serverName, + BindableService service) { + if (interceptor == null) { + this.server = InProcessServerBuilder.forName(serverName) + .directExecutor() + .addService(service) + .build(); + return; + } + this.server = InProcessServerBuilder.forName(serverName) + .intercept(interceptor) + .directExecutor() + .addService(service) + .build(); + } + + public void start() throws Exception { + if (this.server == null) { + throw new IllegalStateException("Server is not initialized"); + } + this.server.start(); + } + + public void awaitTermination() throws InterruptedException { + this.server.awaitTermination(); + // if the server has been terminated, we should expect the event loop groups to be terminated as well. + if (!(this.workerEventLoopGroup.awaitTermination(30, TimeUnit.SECONDS) && + this.bossEventLoopGroup.awaitTermination(30, TimeUnit.SECONDS))) { + log.error("Timed out to gracefully shutdown event loop groups"); + throw new InterruptedException("Timed out to gracefully shutdown event loop groups"); + } + } + + public void gracefullyShutdown() throws InterruptedException{ + if (this.server == null || this.server.isTerminated()) { + return; + } + log.info("stopping gRPC server..."); + this.server.shutdown().awaitTermination(30, TimeUnit.SECONDS); + if (!this.server.isTerminated()) { + this.server.shutdownNow(); + } + log.info("gracefully shutting down event loop groups..."); + this.gracefullyShutdownEventLoopGroups(); + } + + private void gracefullyShutdownEventLoopGroups() { if (this.bossEventLoopGroup != null) { this.bossEventLoopGroup.shutdownGracefully(); } @@ -32,7 +103,7 @@ public void gracefullyShutdownEventLoopGroups() { } } - public Server createServer( + private Server createServer( String socketPath, int maxMessageSize, boolean isLocal, diff --git a/src/main/java/io/numaproj/numaflow/sideinput/GRPCConfig.java b/src/main/java/io/numaproj/numaflow/sideinput/GRPCConfig.java index 048e0961..49cab82e 100644 --- a/src/main/java/io/numaproj/numaflow/sideinput/GRPCConfig.java +++ b/src/main/java/io/numaproj/numaflow/sideinput/GRPCConfig.java @@ -1,5 +1,6 @@ package io.numaproj.numaflow.sideinput; +import io.numaproj.numaflow.shared.GrpcConfigRetriever; import lombok.Builder; import lombok.Getter; @@ -8,7 +9,7 @@ */ @Getter @Builder(builderMethodName = "newBuilder") -public class GRPCConfig { +public class GRPCConfig implements GrpcConfigRetriever { @Builder.Default private String socketPath = Constants.DEFAULT_SOCKET_PATH; diff --git a/src/main/java/io/numaproj/numaflow/sideinput/Server.java b/src/main/java/io/numaproj/numaflow/sideinput/Server.java index 1407d6b1..acfe3297 100644 --- a/src/main/java/io/numaproj/numaflow/sideinput/Server.java +++ b/src/main/java/io/numaproj/numaflow/sideinput/Server.java @@ -2,16 +2,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; -import io.grpc.ServerBuilder; +import io.grpc.ServerInterceptor; import io.numaproj.numaflow.info.ContainerType; import io.numaproj.numaflow.info.ServerInfoAccessor; import io.numaproj.numaflow.info.ServerInfoAccessorImpl; -import io.numaproj.numaflow.shared.GrpcServerHelper; import io.numaproj.numaflow.shared.GrpcServerUtils; +import io.numaproj.numaflow.shared.GrpcServerWrapper; import lombok.extern.slf4j.Slf4j; -import java.util.concurrent.TimeUnit; - /** * Server is the gRPC server for retrieving side input. */ @@ -19,10 +17,8 @@ public class Server { private final GRPCConfig grpcConfig; - private final Service service; private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper()); - private io.grpc.Server server; - private final GrpcServerHelper grpcServerHelper; + private final GrpcServerWrapper server; /** * constructor to create gRPC server. @@ -40,9 +36,17 @@ public Server(SideInputRetriever sideInputRetriever) { * @param sideInputRetriever to retrieve the side input */ public Server(SideInputRetriever sideInputRetriever, GRPCConfig grpcConfig) { - this.service = new Service(sideInputRetriever); this.grpcConfig = grpcConfig; - this.grpcServerHelper = new GrpcServerHelper(); + this.server = new GrpcServerWrapper(this.grpcConfig, new Service(sideInputRetriever)); + } + + @VisibleForTesting + protected Server(GRPCConfig grpcConfig, SideInputRetriever service, ServerInterceptor interceptor, String serverName) { + this.grpcConfig = grpcConfig; + this.server = new GrpcServerWrapper( + interceptor, + serverName, + new Service(service)); } /** @@ -51,41 +55,26 @@ public Server(SideInputRetriever sideInputRetriever, GRPCConfig grpcConfig) { * @throws Exception if server fails to start */ public void start() throws Exception { - if (!grpcConfig.isLocal()) { + if (!this.grpcConfig.isLocal()) { GrpcServerUtils.writeServerInfo( - serverInfoAccessor, - grpcConfig.getSocketPath(), - grpcConfig.getInfoFilePath(), + this.serverInfoAccessor, + this.grpcConfig.getSocketPath(), + this.grpcConfig.getInfoFilePath(), ContainerType.SIDEINPUT); } - if (this.server == null) { - this.server = grpcServerHelper.createServer( - grpcConfig.getSocketPath(), - grpcConfig.getMaxMessageSize(), - grpcConfig.isLocal(), - grpcConfig.getPort(), - this.service); - } - - server.start(); + this.server.start(); log.info( "server started, listening on {}", - grpcConfig.isLocal() ? - "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath()); + this.grpcConfig.isLocal() ? + "localhost:" + this.grpcConfig.getPort() : this.grpcConfig.getSocketPath()); // register shutdown hook to gracefully shut down the server Runtime.getRuntime().addShutdownHook(new Thread(() -> { // Use stderr here since the logger may have been reset by its JVM shutdown hook. - System.err.println("*** shutting down gRPC server since JVM is shutting down"); - if (server != null && server.isTerminated()) { - return; - } try { - Server.this.stop(); - log.info("gracefully shutting down event loop groups"); - this.grpcServerHelper.gracefullyShutdownEventLoopGroups(); + this.stop(); } catch (InterruptedException e) { Thread.interrupted(); e.printStackTrace(System.err); @@ -102,7 +91,7 @@ public void start() throws Exception { */ public void awaitTermination() throws InterruptedException { log.info("side input server is waiting for termination"); - server.awaitTermination(); + this.server.awaitTermination(); log.info("side input server has terminated"); } @@ -113,23 +102,6 @@ public void awaitTermination() throws InterruptedException { * @throws InterruptedException if shutdown is interrupted */ public void stop() throws InterruptedException { - if (server != null) { - server.shutdown().awaitTermination(30, TimeUnit.SECONDS); - if (!server.isTerminated()) { - server.shutdownNow(); - } - } - } - - /** - * Set server builder for testing. - * - * @param serverBuilder in process server builder can be used for testing - */ - @VisibleForTesting - public void setServerBuilder(ServerBuilder serverBuilder) { - this.server = serverBuilder - .addService(this.service) - .build(); + this.server.gracefullyShutdown(); } } diff --git a/src/main/java/io/numaproj/numaflow/sinker/GRPCConfig.java b/src/main/java/io/numaproj/numaflow/sinker/GRPCConfig.java index 673339a0..c528657e 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/GRPCConfig.java +++ b/src/main/java/io/numaproj/numaflow/sinker/GRPCConfig.java @@ -3,12 +3,14 @@ import lombok.Builder; import lombok.Getter; +import io.numaproj.numaflow.shared.GrpcConfigRetriever; + /** * GRPCConfig is used to provide configurations for gRPC server. */ @Getter @Builder(builderMethodName = "newBuilder") -public class GRPCConfig { +public class GRPCConfig implements GrpcConfigRetriever { @Builder.Default private String socketPath = Constants.DEFAULT_SOCKET_PATH; diff --git a/src/main/java/io/numaproj/numaflow/sinker/Server.java b/src/main/java/io/numaproj/numaflow/sinker/Server.java index 65c8d7da..f18b23f7 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Server.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Server.java @@ -1,16 +1,19 @@ package io.numaproj.numaflow.sinker; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import io.grpc.BindableService; import io.grpc.ServerBuilder; +import io.grpc.ServerInterceptor; +import io.grpc.inprocess.InProcessServerBuilder; import io.numaproj.numaflow.info.ContainerType; import io.numaproj.numaflow.info.ServerInfoAccessor; import io.numaproj.numaflow.info.ServerInfoAccessorImpl; -import io.numaproj.numaflow.shared.GrpcServerHelper; +import io.numaproj.numaflow.shared.GrpcServerWrapper; import io.numaproj.numaflow.shared.GrpcServerUtils; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; /** * Server is the gRPC server for executing user defined sinks. @@ -19,11 +22,10 @@ public class Server { private final GRPCConfig grpcConfig; - private final Service service; private final CompletableFuture shutdownSignal; private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper()); - private io.grpc.Server server; - private final GrpcServerHelper grpcServerHelper; + private final Service service; + private final GrpcServerWrapper server; /** * constructor to create sink gRPC server. @@ -42,9 +44,9 @@ public Server(Sinker sinker) { */ public Server(Sinker sinker, GRPCConfig grpcConfig) { this.shutdownSignal = new CompletableFuture<>(); - this.service = new Service(sinker, this.shutdownSignal); this.grpcConfig = grpcConfig; - this.grpcServerHelper = new GrpcServerHelper(); + this.service = new Service(sinker, this.shutdownSignal); + this.server = new GrpcServerWrapper(grpcConfig, this.service); } /** @@ -61,15 +63,6 @@ public void start() throws Exception { ContainerType.SINKER); } - if (this.server == null) { - this.server = this.grpcServerHelper.createServer( - grpcConfig.getSocketPath(), - grpcConfig.getMaxMessageSize(), - grpcConfig.isLocal(), - grpcConfig.getPort(), - this.service); - } - server.start(); log.info( @@ -81,13 +74,8 @@ public void start() throws Exception { Runtime.getRuntime().addShutdownHook(new Thread(() -> { // Use stderr here since the logger may have been reset by its JVM shutdown hook. System.err.println("*** shutting down sink gRPC server since JVM is shutting down"); - if (server != null && server.isTerminated()) { - return; - } try { - Server.this.stop(); - log.info("gracefully shutting down event loop groups"); - this.grpcServerHelper.gracefullyShutdownEventLoopGroups(); + this.stop(); } catch (InterruptedException e) { Thread.interrupted(); e.printStackTrace(System.err); @@ -96,17 +84,10 @@ public void start() throws Exception { // if there are any exceptions, shutdown the server gracefully. shutdownSignal.whenCompleteAsync((v, e) -> { - if (server.isTerminated()) { - return; - } - if (e != null) { System.err.println("*** shutting down sink gRPC server because of an exception - " + e.getMessage()); try { - log.info("stopping server"); - Server.this.stop(); - log.info("gracefully shutting down event loop groups"); - this.grpcServerHelper.gracefullyShutdownEventLoopGroups(); + this.stop(); } catch (InterruptedException ex) { Thread.interrupted(); ex.printStackTrace(System.err); @@ -135,25 +116,15 @@ public void awaitTermination() throws InterruptedException { * @throws InterruptedException if shutdown is interrupted */ public void stop() throws InterruptedException { + server.gracefullyShutdown(); this.service.shutDown(); - if (server != null) { - server.shutdown().awaitTermination(30, TimeUnit.SECONDS); - // force shutdown if not terminated - if (!server.isTerminated()) { - server.shutdownNow(); - } - } } - /** - * Sets the server builder. This method can be used for testing purposes to provide a different - * grpc server builder. - * - * @param serverBuilder the server builder to be used - */ - public void setServerBuilder(ServerBuilder serverBuilder) { - this.server = serverBuilder - .addService(this.service) - .build(); + @VisibleForTesting + protected Server(GRPCConfig grpcConfig, Sinker sinker, ServerInterceptor interceptor, String serverName) { + this.grpcConfig = grpcConfig; + this.shutdownSignal = new CompletableFuture<>(); + this.service = new Service(sinker, this.shutdownSignal); + this.server = new GrpcServerWrapper(interceptor, serverName, this.service); } } diff --git a/src/main/java/io/numaproj/numaflow/sinker/Service.java b/src/main/java/io/numaproj/numaflow/sinker/Service.java index 1cd52014..95f105b8 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Service.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Service.java @@ -154,7 +154,7 @@ private HandlerDatum constructHandlerDatum(SinkOuterClass.SinkRequest d) { ); } - // shuts down the executor service which is used for reduce + // shuts down the executor service public void shutDown() { this.sinkTaskExecutor.shutdown(); try { @@ -166,8 +166,9 @@ public void shutDown() { if (!sinkTaskExecutor.awaitTermination(SHUTDOWN_TIME, TimeUnit.SECONDS)) { log.error("Sink executor did not terminate in the specified time."); List droppedTasks = sinkTaskExecutor.shutdownNow(); - log.error("Sink executor was abruptly shut down. " + droppedTasks.size() - + " tasks will not be executed."); + log.error( + "Sink executor was abruptly shut down. {} tasks will not be executed.", + droppedTasks.size()); } else { log.info("Sink executor was terminated."); } diff --git a/src/main/java/io/numaproj/numaflow/sourcer/GRPCConfig.java b/src/main/java/io/numaproj/numaflow/sourcer/GRPCConfig.java index 46b2c28e..2977f9a0 100644 --- a/src/main/java/io/numaproj/numaflow/sourcer/GRPCConfig.java +++ b/src/main/java/io/numaproj/numaflow/sourcer/GRPCConfig.java @@ -1,5 +1,6 @@ package io.numaproj.numaflow.sourcer; +import io.numaproj.numaflow.shared.GrpcConfigRetriever; import lombok.Builder; import lombok.Getter; @@ -8,7 +9,7 @@ */ @Getter @Builder(builderMethodName = "newBuilder") -public class GRPCConfig { +public class GRPCConfig implements GrpcConfigRetriever { @Builder.Default private String socketPath = Constants.DEFAULT_SOCKET_PATH; diff --git a/src/main/java/io/numaproj/numaflow/sourcer/Server.java b/src/main/java/io/numaproj/numaflow/sourcer/Server.java index a91eb5fb..25978766 100644 --- a/src/main/java/io/numaproj/numaflow/sourcer/Server.java +++ b/src/main/java/io/numaproj/numaflow/sourcer/Server.java @@ -2,16 +2,15 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; -import io.grpc.ServerBuilder; +import io.grpc.ServerInterceptor; import io.numaproj.numaflow.info.ContainerType; import io.numaproj.numaflow.info.ServerInfoAccessor; import io.numaproj.numaflow.info.ServerInfoAccessorImpl; -import io.numaproj.numaflow.shared.GrpcServerHelper; import io.numaproj.numaflow.shared.GrpcServerUtils; +import io.numaproj.numaflow.shared.GrpcServerWrapper; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; /** * Server is the gRPC server for reading from source. @@ -20,11 +19,9 @@ public class Server { private final GRPCConfig grpcConfig; - private final Service service; private final CompletableFuture shutdownSignal; private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper()); - private io.grpc.Server server; - private final GrpcServerHelper grpcServerHelper; + private final GrpcServerWrapper server; /** * constructor to create gRPC server. @@ -43,9 +40,18 @@ public Server(Sourcer sourcer) { */ public Server(Sourcer sourcer, GRPCConfig grpcConfig) { this.shutdownSignal = new CompletableFuture<>(); - this.service = new Service(sourcer, this.shutdownSignal); this.grpcConfig = grpcConfig; - this.grpcServerHelper = new GrpcServerHelper(); + this.server = new GrpcServerWrapper(this.grpcConfig, new Service(sourcer, this.shutdownSignal)); + } + + @VisibleForTesting + protected Server(GRPCConfig grpcConfig, Sourcer service, ServerInterceptor interceptor, String serverName) { + this.shutdownSignal = new CompletableFuture<>(); + this.grpcConfig = grpcConfig; + this.server = new GrpcServerWrapper( + interceptor, + serverName, + new Service(service, this.shutdownSignal)); } /** @@ -54,41 +60,27 @@ public Server(Sourcer sourcer, GRPCConfig grpcConfig) { * @throws Exception if server fails to start */ public void start() throws Exception { - if (!grpcConfig.isLocal()) { + if (!this.grpcConfig.isLocal()) { GrpcServerUtils.writeServerInfo( - serverInfoAccessor, - grpcConfig.getSocketPath(), - grpcConfig.getInfoFilePath(), + this.serverInfoAccessor, + this.grpcConfig.getSocketPath(), + this.grpcConfig.getInfoFilePath(), ContainerType.SOURCER); } - if (this.server == null) { - this.server = grpcServerHelper.createServer( - grpcConfig.getSocketPath(), - grpcConfig.getMaxMessageSize(), - grpcConfig.isLocal(), - grpcConfig.getPort(), - this.service); - } - - server.start(); + this.server.start(); log.info( "server started, listening on {}", - grpcConfig.isLocal() ? - "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath()); + this.grpcConfig.isLocal() ? + "localhost:" + this.grpcConfig.getPort() : this.grpcConfig.getSocketPath()); // register shutdown hook to gracefully shut down the server Runtime.getRuntime().addShutdownHook(new Thread(() -> { // Use stderr here since the logger may have been reset by its JVM shutdown hook. System.err.println("*** shutting down source gRPC server since JVM is shutting down"); - if (server != null && server.isTerminated()) { - return; - } try { - Server.this.stop(); - log.info("gracefully shutting down event loop groups"); - this.grpcServerHelper.gracefullyShutdownEventLoopGroups(); + this.stop(); } catch (InterruptedException e) { Thread.interrupted(); e.printStackTrace(System.err); @@ -96,18 +88,11 @@ public void start() throws Exception { })); // if there are any exceptions, shutdown the server gracefully. - shutdownSignal.whenCompleteAsync((v, e) -> { - if (server.isTerminated()) { - return; - } - + this.shutdownSignal.whenCompleteAsync((v, e) -> { if (e != null) { System.err.println("*** shutting down source gRPC server because of an exception - " + e.getMessage()); try { - log.info("stopping server"); - Server.this.stop(); - log.info("gracefully shutting down event loop groups"); - this.grpcServerHelper.gracefullyShutdownEventLoopGroups(); + this.stop(); } catch (InterruptedException ex) { Thread.interrupted(); ex.printStackTrace(System.err); @@ -125,7 +110,7 @@ public void start() throws Exception { */ public void awaitTermination() throws InterruptedException { log.info("waiting for server to terminate"); - server.awaitTermination(); + this.server.awaitTermination(); log.info("server has terminated"); } @@ -136,24 +121,6 @@ public void awaitTermination() throws InterruptedException { * @throws InterruptedException if shutdown is interrupted */ public void stop() throws InterruptedException { - if (server != null) { - server.shutdown().awaitTermination(30, TimeUnit.SECONDS); - // force shutdown if not terminated - if (!server.isTerminated()) { - server.shutdownNow(); - } - } - } - - /** - * Set server builder for testing. - * - * @param serverBuilder in process server builder can be used for testing - */ - @VisibleForTesting - public void setServerBuilder(ServerBuilder serverBuilder) { - this.server = serverBuilder - .addService(this.service) - .build(); + this.server.gracefullyShutdown(); } } diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/GRPCConfig.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/GRPCConfig.java index 445a0f48..056a17fb 100644 --- a/src/main/java/io/numaproj/numaflow/sourcetransformer/GRPCConfig.java +++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/GRPCConfig.java @@ -1,5 +1,6 @@ package io.numaproj.numaflow.sourcetransformer; +import io.numaproj.numaflow.shared.GrpcConfigRetriever; import lombok.Builder; import lombok.Getter; @@ -8,7 +9,7 @@ */ @Getter @Builder(builderMethodName = "newBuilder") -public class GRPCConfig { +public class GRPCConfig implements GrpcConfigRetriever { @Builder.Default private String socketPath = Constants.DEFAULT_SOCKET_PATH; diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java index fcf42bcd..43731c17 100644 --- a/src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java +++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java @@ -2,16 +2,15 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; -import io.grpc.ServerBuilder; +import io.grpc.ServerInterceptor; import io.numaproj.numaflow.info.ContainerType; import io.numaproj.numaflow.info.ServerInfoAccessor; import io.numaproj.numaflow.info.ServerInfoAccessorImpl; -import io.numaproj.numaflow.shared.GrpcServerHelper; import io.numaproj.numaflow.shared.GrpcServerUtils; +import io.numaproj.numaflow.shared.GrpcServerWrapper; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; /** * Server is the gRPC server for executing source transformer operation. @@ -20,11 +19,9 @@ public class Server { private final GRPCConfig grpcConfig; - private final Service service; private final CompletableFuture shutdownSignal; private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper()); - private io.grpc.Server server; - private final GrpcServerHelper grpcServerHelper; + private final GrpcServerWrapper server; /** * constructor to create gRPC server. @@ -43,9 +40,18 @@ public Server(SourceTransformer sourceTransformer) { */ public Server(SourceTransformer sourceTransformer, GRPCConfig grpcConfig) { this.shutdownSignal = new CompletableFuture<>(); - this.service = new Service(sourceTransformer, this.shutdownSignal); this.grpcConfig = grpcConfig; - this.grpcServerHelper = new GrpcServerHelper(); + this.server = new GrpcServerWrapper(this.grpcConfig, new Service(sourceTransformer, this.shutdownSignal)); + } + + @VisibleForTesting + protected Server(GRPCConfig grpcConfig, SourceTransformer service, ServerInterceptor interceptor, String serverName) { + this.shutdownSignal = new CompletableFuture<>(); + this.grpcConfig = grpcConfig; + this.server = new GrpcServerWrapper( + interceptor, + serverName, + new Service(service, this.shutdownSignal)); } /** @@ -54,41 +60,27 @@ public Server(SourceTransformer sourceTransformer, GRPCConfig grpcConfig) { * @throws Exception if server fails to start */ public void start() throws Exception { - if (!grpcConfig.isLocal()) { + if (!this.grpcConfig.isLocal()) { GrpcServerUtils.writeServerInfo( - serverInfoAccessor, - grpcConfig.getSocketPath(), - grpcConfig.getInfoFilePath(), + this.serverInfoAccessor, + this.grpcConfig.getSocketPath(), + this.grpcConfig.getInfoFilePath(), ContainerType.SOURCE_TRANSFORMER); } - if (this.server == null) { - this.server = this.grpcServerHelper.createServer( - grpcConfig.getSocketPath(), - grpcConfig.getMaxMessageSize(), - grpcConfig.isLocal(), - grpcConfig.getPort(), - this.service); - } - - server.start(); + this.server.start(); log.info( "server started, listening on {}", - grpcConfig.isLocal() ? - "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath()); + this.grpcConfig.isLocal() ? + "localhost:" + this.grpcConfig.getPort() : this.grpcConfig.getSocketPath()); // register shutdown hook to gracefully shut down the server Runtime.getRuntime().addShutdownHook(new Thread(() -> { // Use stderr here since the logger may have been reset by its JVM shutdown hook. System.err.println("*** shutting down gRPC server since JVM is shutting down"); - if (server != null && server.isTerminated()) { - return; - } - try { - Server.this.stop(); - log.info("gracefully shutting down event loop groups"); - this.grpcServerHelper.gracefullyShutdownEventLoopGroups(); + try { + this.stop(); // FIXME - this is a workaround to immediately terminate the JVM process // The correct way to do this is to stop all the actors and wait for them to terminate System.exit(0); @@ -99,18 +91,11 @@ public void start() throws Exception { })); // if there are any exceptions, shutdown the server gracefully. - shutdownSignal.whenCompleteAsync((v, e) -> { - if (server.isTerminated()) { - return; - } - + this.shutdownSignal.whenCompleteAsync((v, e) -> { if (e != null) { System.err.println("*** shutting down sink gRPC server because of an exception - " + e.getMessage()); try { - log.info("stopping server"); - Server.this.stop(); - log.info("gracefully shutting down event loop groups"); - this.grpcServerHelper.gracefullyShutdownEventLoopGroups(); + this.stop(); // FIXME - this is a workaround to immediately terminate the JVM process // The correct way to do this is to stop all the actors and wait for them to terminate System.exit(0); @@ -131,7 +116,7 @@ public void start() throws Exception { */ public void awaitTermination() throws InterruptedException { log.info("transformer server is waiting for termination"); - server.awaitTermination(); + this.server.awaitTermination(); log.info("transformer server has terminated"); } @@ -142,24 +127,6 @@ public void awaitTermination() throws InterruptedException { * @throws InterruptedException if shutdown is interrupted */ public void stop() throws InterruptedException { - if (server != null) { - server.shutdown().awaitTermination(30, TimeUnit.SECONDS); - // force shutdown if not terminated - if (!server.isTerminated()) { - server.shutdownNow(); - } - } - } - - /** - * Set server builder for testing. - * - * @param serverBuilder in process server builder can be used for testing - */ - @VisibleForTesting - public void setServerBuilder(ServerBuilder serverBuilder) { - this.server = serverBuilder - .addService(this.service) - .build(); + this.server.gracefullyShutdown(); } } diff --git a/src/test/java/io/numaproj/numaflow/batchmapper/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/batchmapper/ServerErrTest.java index 3ba1fd13..23e11a5a 100644 --- a/src/test/java/io/numaproj/numaflow/batchmapper/ServerErrTest.java +++ b/src/test/java/io/numaproj/numaflow/batchmapper/ServerErrTest.java @@ -1,14 +1,7 @@ package io.numaproj.numaflow.batchmapper; import com.google.protobuf.ByteString; -import io.grpc.Context; -import io.grpc.Contexts; -import io.grpc.ForwardingServerCallListener; import io.grpc.ManagedChannel; -import io.grpc.ServerCall; -import io.grpc.ServerCallHandler; -import io.grpc.ServerInterceptor; -import io.grpc.Status; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.stub.StreamObserver; @@ -35,46 +28,6 @@ public class ServerErrTest { @Before public void setUp() throws Exception { - ServerInterceptor interceptor = new ServerInterceptor() { - @Override - public ServerCall.Listener interceptCall( - ServerCall call, - io.grpc.Metadata headers, - ServerCallHandler next) { - - final var context = - Context.current(); - ServerCall.Listener listener = Contexts.interceptCall( - context, - call, - headers, - next); - return new ForwardingServerCallListener.SimpleForwardingServerCallListener<>( - listener) { - @Override - public void onHalfClose() { - try { - super.onHalfClose(); - } catch (RuntimeException ex) { - handleException(ex, call, headers); - throw ex; - } - } - - private void handleException( - RuntimeException e, - ServerCall serverCall, - io.grpc.Metadata headers) { - // Currently, we only have application level exceptions. - // Translate it to UNKNOWN status. - var status = Status.UNKNOWN.withDescription(e.getMessage()).withCause(e); - var newStatus = Status.fromThrowable(status.asException()); - serverCall.close(newStatus, headers); - } - }; - } - }; - String serverName = InProcessServerBuilder.generateName(); GRPCConfig grpcServerConfig = GRPCConfig.newBuilder() @@ -84,12 +37,10 @@ private void handleException( .build(); server = new Server( + grpcServerConfig, new TestMapFn(), - grpcServerConfig); - - server.setServerBuilder(InProcessServerBuilder.forName(serverName) - .intercept(interceptor) - .directExecutor()); + null, + serverName); server.start(); diff --git a/src/test/java/io/numaproj/numaflow/batchmapper/ServerTest.java b/src/test/java/io/numaproj/numaflow/batchmapper/ServerTest.java index 3b4f5385..f15a9eb2 100644 --- a/src/test/java/io/numaproj/numaflow/batchmapper/ServerTest.java +++ b/src/test/java/io/numaproj/numaflow/batchmapper/ServerTest.java @@ -41,11 +41,10 @@ public void setUp() throws Exception { .build(); server = new Server( + grpcServerConfig, new TestMapFn(), - grpcServerConfig); - - server.setServerBuilder(InProcessServerBuilder.forName(serverName) - .directExecutor()); + null, + serverName); server.start(); diff --git a/src/test/java/io/numaproj/numaflow/mapper/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/mapper/ServerErrTest.java index ec398c2c..3fc18f4d 100644 --- a/src/test/java/io/numaproj/numaflow/mapper/ServerErrTest.java +++ b/src/test/java/io/numaproj/numaflow/mapper/ServerErrTest.java @@ -82,12 +82,10 @@ private void handleException( .build(); server = new Server( + grpcServerConfig, new TestMapFnErr(), - grpcServerConfig); - - server.setServerBuilder(InProcessServerBuilder.forName(serverName) - .intercept(interceptor) - .directExecutor()); + interceptor, + serverName); server.start(); diff --git a/src/test/java/io/numaproj/numaflow/mapper/ServerTest.java b/src/test/java/io/numaproj/numaflow/mapper/ServerTest.java index 553a8b6d..93a32f8f 100644 --- a/src/test/java/io/numaproj/numaflow/mapper/ServerTest.java +++ b/src/test/java/io/numaproj/numaflow/mapper/ServerTest.java @@ -39,11 +39,10 @@ public void setUp() throws Exception { .build(); server = new Server( + grpcServerConfig, new TestMapFn(), - grpcServerConfig); - - server.setServerBuilder(InProcessServerBuilder.forName(serverName) - .directExecutor()); + null, + serverName); server.start(); diff --git a/src/test/java/io/numaproj/numaflow/mapstreamer/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/mapstreamer/ServerErrTest.java index 9559e2a8..43355d04 100644 --- a/src/test/java/io/numaproj/numaflow/mapstreamer/ServerErrTest.java +++ b/src/test/java/io/numaproj/numaflow/mapstreamer/ServerErrTest.java @@ -1,14 +1,7 @@ package io.numaproj.numaflow.mapstreamer; import com.google.protobuf.ByteString; -import io.grpc.Context; -import io.grpc.Contexts; -import io.grpc.ForwardingServerCallListener; import io.grpc.ManagedChannel; -import io.grpc.ServerCall; -import io.grpc.ServerCallHandler; -import io.grpc.ServerInterceptor; -import io.grpc.Status; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.testing.GrpcCleanupRule; @@ -31,46 +24,6 @@ public class ServerErrTest { @Before public void setUp() throws Exception { - ServerInterceptor interceptor = new ServerInterceptor() { - @Override - public ServerCall.Listener interceptCall( - ServerCall call, - io.grpc.Metadata headers, - ServerCallHandler next) { - - final var context = - Context.current(); - ServerCall.Listener listener = Contexts.interceptCall( - context, - call, - headers, - next); - return new ForwardingServerCallListener.SimpleForwardingServerCallListener<>( - listener) { - @Override - public void onHalfClose() { - try { - super.onHalfClose(); - } catch (RuntimeException ex) { - handleException(ex, call, headers); - throw ex; - } - } - - private void handleException( - RuntimeException e, - ServerCall serverCall, - io.grpc.Metadata headers) { - // Currently, we only have application level exceptions. - // Translate it to UNKNOWN status. - var status = Status.UNKNOWN.withDescription(e.getMessage()).withCause(e); - var newStatus = Status.fromThrowable(status.asException()); - serverCall.close(newStatus, headers); - } - }; - } - }; - String serverName = InProcessServerBuilder.generateName(); GRPCConfig grpcServerConfig = GRPCConfig.newBuilder() @@ -80,12 +33,10 @@ private void handleException( .build(); server = new Server( + grpcServerConfig, new TestMapStreamFnErr(), - grpcServerConfig); - - server.setServerBuilder(InProcessServerBuilder.forName(serverName) - .intercept(interceptor) - .directExecutor()); + null, + serverName); server.start(); diff --git a/src/test/java/io/numaproj/numaflow/mapstreamer/ServerTest.java b/src/test/java/io/numaproj/numaflow/mapstreamer/ServerTest.java index 5e65bc11..6f448b7a 100644 --- a/src/test/java/io/numaproj/numaflow/mapstreamer/ServerTest.java +++ b/src/test/java/io/numaproj/numaflow/mapstreamer/ServerTest.java @@ -40,11 +40,10 @@ public void setUp() throws Exception { .build(); server = new Server( + grpcServerConfig, new TestMapStreamFn(), - grpcServerConfig); - - server.setServerBuilder(InProcessServerBuilder.forName(serverName) - .directExecutor()); + null, + serverName); server.start(); diff --git a/src/test/java/io/numaproj/numaflow/reducer/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/reducer/ServerErrTest.java index 2efe8259..6d4b15e0 100644 --- a/src/test/java/io/numaproj/numaflow/reducer/ServerErrTest.java +++ b/src/test/java/io/numaproj/numaflow/reducer/ServerErrTest.java @@ -67,12 +67,10 @@ public ServerCall.Listener interceptCall( .build(); server = new Server( + grpcServerConfig, new ReduceErrTestFactory(), - grpcServerConfig); - - server.setServerBuilder(InProcessServerBuilder.forName(serverName) - .intercept(interceptor) - .directExecutor()); + interceptor, + serverName); server.start(); diff --git a/src/test/java/io/numaproj/numaflow/reducer/ServerTest.java b/src/test/java/io/numaproj/numaflow/reducer/ServerTest.java index 14164476..880c2f0e 100644 --- a/src/test/java/io/numaproj/numaflow/reducer/ServerTest.java +++ b/src/test/java/io/numaproj/numaflow/reducer/ServerTest.java @@ -69,12 +69,10 @@ public ServerCall.Listener interceptCall( .build(); server = new Server( + grpcServerConfig, new ReduceTestFactory(), - grpcServerConfig); - - server.setServerBuilder(InProcessServerBuilder.forName(serverName) - .intercept(interceptor) - .directExecutor()); + interceptor, + serverName); server.start(); diff --git a/src/test/java/io/numaproj/numaflow/reducestreamer/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/reducestreamer/ServerErrTest.java index cbd13c48..97c7d709 100644 --- a/src/test/java/io/numaproj/numaflow/reducestreamer/ServerErrTest.java +++ b/src/test/java/io/numaproj/numaflow/reducestreamer/ServerErrTest.java @@ -71,12 +71,10 @@ public ServerCall.Listener interceptCall( .build(); server = new Server( + grpcServerConfig, new ReduceStreamerErrTestFactory(), - grpcServerConfig); - - server.setServerBuilder(InProcessServerBuilder.forName(serverName) - .intercept(interceptor) - .directExecutor()); + interceptor, + serverName); server.start(); diff --git a/src/test/java/io/numaproj/numaflow/reducestreamer/ServerTest.java b/src/test/java/io/numaproj/numaflow/reducestreamer/ServerTest.java index ec5b28d4..9f8faefa 100644 --- a/src/test/java/io/numaproj/numaflow/reducestreamer/ServerTest.java +++ b/src/test/java/io/numaproj/numaflow/reducestreamer/ServerTest.java @@ -75,12 +75,10 @@ public ServerCall.Listener interceptCall( .build(); server = new Server( + grpcServerConfig, new ReduceStreamerTestFactory(), - grpcServerConfig); - - server.setServerBuilder(InProcessServerBuilder.forName(serverName) - .intercept(interceptor) - .directExecutor()); + interceptor, + serverName); server.start(); diff --git a/src/test/java/io/numaproj/numaflow/sessionreducer/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/sessionreducer/ServerErrTest.java index 630d0ca1..5e7b63b1 100644 --- a/src/test/java/io/numaproj/numaflow/sessionreducer/ServerErrTest.java +++ b/src/test/java/io/numaproj/numaflow/sessionreducer/ServerErrTest.java @@ -2,13 +2,7 @@ import com.google.protobuf.ByteString; import com.google.protobuf.Timestamp; -import io.grpc.Context; -import io.grpc.Contexts; import io.grpc.ManagedChannel; -import io.grpc.Metadata; -import io.grpc.ServerCall; -import io.grpc.ServerCallHandler; -import io.grpc.ServerInterceptor; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.stub.StreamObserver; @@ -38,17 +32,6 @@ public class ServerErrTest { @Before public void setUp() throws Exception { - ServerInterceptor interceptor = new ServerInterceptor() { - @Override - public ServerCall.Listener interceptCall( - ServerCall call, - Metadata headers, - ServerCallHandler next) { - final var context = Context.current(); - return Contexts.interceptCall(context, call, headers, next); - } - }; - String serverName = InProcessServerBuilder.generateName(); GRPCConfig grpcServerConfig = GRPCConfig.newBuilder() @@ -58,12 +41,10 @@ public ServerCall.Listener interceptCall( .build(); server = new Server( + grpcServerConfig, new SessionReducerErrTestFactory(), - grpcServerConfig); - - server.setServerBuilder(InProcessServerBuilder.forName(serverName) - .intercept(interceptor) - .directExecutor()); + null, + serverName); server.start(); diff --git a/src/test/java/io/numaproj/numaflow/sessionreducer/ServerTest.java b/src/test/java/io/numaproj/numaflow/sessionreducer/ServerTest.java index 93775e9a..e6ac3d9a 100644 --- a/src/test/java/io/numaproj/numaflow/sessionreducer/ServerTest.java +++ b/src/test/java/io/numaproj/numaflow/sessionreducer/ServerTest.java @@ -2,13 +2,7 @@ import com.google.protobuf.ByteString; import com.google.protobuf.Timestamp; -import io.grpc.Context; -import io.grpc.Contexts; import io.grpc.ManagedChannel; -import io.grpc.Metadata; -import io.grpc.ServerCall; -import io.grpc.ServerCallHandler; -import io.grpc.ServerInterceptor; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.stub.StreamObserver; @@ -40,17 +34,6 @@ public class ServerTest { @Before public void setUp() throws Exception { - ServerInterceptor interceptor = new ServerInterceptor() { - @Override - public ServerCall.Listener interceptCall( - ServerCall call, - Metadata headers, - ServerCallHandler next) { - final var context = Context.current(); - return Contexts.interceptCall(context, call, headers, next); - } - }; - String serverName = InProcessServerBuilder.generateName(); GRPCConfig grpcServerConfig = GRPCConfig.newBuilder() @@ -60,12 +43,10 @@ public ServerCall.Listener interceptCall( .build(); server = new Server( + grpcServerConfig, new SessionReducerTestFactory(), - grpcServerConfig); - - server.setServerBuilder(InProcessServerBuilder.forName(serverName) - .intercept(interceptor) - .directExecutor()); + null, + serverName); server.start(); diff --git a/src/test/java/io/numaproj/numaflow/sideinput/ServerTest.java b/src/test/java/io/numaproj/numaflow/sideinput/ServerTest.java index aaa33083..02a7aa3a 100644 --- a/src/test/java/io/numaproj/numaflow/sideinput/ServerTest.java +++ b/src/test/java/io/numaproj/numaflow/sideinput/ServerTest.java @@ -1,6 +1,5 @@ package io.numaproj.numaflow.sideinput; -import com.google.protobuf.ByteString; import com.google.protobuf.Empty; import io.grpc.ManagedChannel; import io.grpc.inprocess.InProcessChannelBuilder; @@ -32,11 +31,10 @@ public void setUp() throws Exception { .build(); server = new Server( + grpcServerConfig, new TestSideInput(), - grpcServerConfig); - - server.setServerBuilder(InProcessServerBuilder.forName(serverName) - .directExecutor()); + null, + serverName); server.start(); @@ -53,7 +51,6 @@ public void tearDown() throws Exception { @Test public void TestSideInputRetriever() { - ByteString inValue = ByteString.copyFromUtf8("invalue"); var stub = SideInputGrpc.newBlockingStub(inProcessChannel); // First call should return the broadcast message diff --git a/src/test/java/io/numaproj/numaflow/sinker/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/sinker/ServerErrTest.java index da88c8fb..61bfc51f 100644 --- a/src/test/java/io/numaproj/numaflow/sinker/ServerErrTest.java +++ b/src/test/java/io/numaproj/numaflow/sinker/ServerErrTest.java @@ -41,11 +41,10 @@ public void setUp() throws Exception { .build(); server = new Server( + grpcServerConfig, new TestSinkFnErr(), - grpcServerConfig); - - server.setServerBuilder(InProcessServerBuilder.forName(serverName) - .directExecutor()); + null, + serverName); server.start(); diff --git a/src/test/java/io/numaproj/numaflow/sinker/ServerTest.java b/src/test/java/io/numaproj/numaflow/sinker/ServerTest.java index aba89491..62734c68 100644 --- a/src/test/java/io/numaproj/numaflow/sinker/ServerTest.java +++ b/src/test/java/io/numaproj/numaflow/sinker/ServerTest.java @@ -42,11 +42,10 @@ public void setUp() throws Exception { .build(); server = new Server( + grpcServerConfig, new TestSinkFn(), - grpcServerConfig); - - server.setServerBuilder(InProcessServerBuilder.forName(serverName) - .directExecutor()); + null, + serverName); server.start(); diff --git a/src/test/java/io/numaproj/numaflow/sourcer/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/sourcer/ServerErrTest.java index 4a59006e..57b3c6ea 100644 --- a/src/test/java/io/numaproj/numaflow/sourcer/ServerErrTest.java +++ b/src/test/java/io/numaproj/numaflow/sourcer/ServerErrTest.java @@ -1,13 +1,6 @@ package io.numaproj.numaflow.sourcer; -import io.grpc.Context; -import io.grpc.Contexts; -import io.grpc.ForwardingServerCallListener; import io.grpc.ManagedChannel; -import io.grpc.ServerCall; -import io.grpc.ServerCallHandler; -import io.grpc.ServerInterceptor; -import io.grpc.Status; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.stub.StreamObserver; @@ -33,46 +26,6 @@ public class ServerErrTest { @Before public void setUp() throws Exception { - ServerInterceptor interceptor = new ServerInterceptor() { - @Override - public ServerCall.Listener interceptCall( - ServerCall call, - io.grpc.Metadata headers, - ServerCallHandler next) { - - final var context = - Context.current(); - ServerCall.Listener listener = Contexts.interceptCall( - context, - call, - headers, - next); - return new ForwardingServerCallListener.SimpleForwardingServerCallListener<>( - listener) { - @Override - public void onHalfClose() { - try { - super.onHalfClose(); - } catch (RuntimeException ex) { - handleException(ex, call, headers); - throw ex; - } - } - - private void handleException( - RuntimeException e, - ServerCall serverCall, - io.grpc.Metadata headers) { - // Currently, we only have application level exceptions. - // Translate it to UNKNOWN status. - var status = Status.UNKNOWN.withDescription(e.getMessage()).withCause(e); - var newStatus = Status.fromThrowable(status.asException()); - serverCall.close(newStatus, headers); - } - }; - } - }; - String serverName = InProcessServerBuilder.generateName(); GRPCConfig grpcServerConfig = GRPCConfig.newBuilder() @@ -82,12 +35,10 @@ private void handleException( .build(); server = new Server( + grpcServerConfig, new TestSourcerErr(), - grpcServerConfig); - - server.setServerBuilder(InProcessServerBuilder.forName(serverName) - .intercept(interceptor) - .directExecutor()); + null, + serverName); server.start(); diff --git a/src/test/java/io/numaproj/numaflow/sourcer/ServerTest.java b/src/test/java/io/numaproj/numaflow/sourcer/ServerTest.java index 932d8e76..2e1796ed 100644 --- a/src/test/java/io/numaproj/numaflow/sourcer/ServerTest.java +++ b/src/test/java/io/numaproj/numaflow/sourcer/ServerTest.java @@ -43,11 +43,10 @@ public void setUp() throws Exception { .build(); server = new Server( + grpcServerConfig, new TestSourcer(), - grpcServerConfig); - - server.setServerBuilder(InProcessServerBuilder.forName(serverName) - .directExecutor()); + null, + serverName); server.start(); diff --git a/src/test/java/io/numaproj/numaflow/sourcetransformer/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/sourcetransformer/ServerErrTest.java index cc8a44da..03ab6ee0 100644 --- a/src/test/java/io/numaproj/numaflow/sourcetransformer/ServerErrTest.java +++ b/src/test/java/io/numaproj/numaflow/sourcetransformer/ServerErrTest.java @@ -1,18 +1,10 @@ package io.numaproj.numaflow.sourcetransformer; import com.google.protobuf.ByteString; -import io.grpc.Context; -import io.grpc.Contexts; -import io.grpc.ForwardingServerCallListener; import io.grpc.ManagedChannel; -import io.grpc.ServerCall; -import io.grpc.ServerCallHandler; -import io.grpc.ServerInterceptor; -import io.grpc.Status; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.testing.GrpcCleanupRule; -import io.numaproj.numaflow.map.v1.MapGrpc; import io.numaproj.numaflow.sourcetransformer.v1.SourceTransformGrpc; import io.numaproj.numaflow.sourcetransformer.v1.Sourcetransformer; import org.junit.After; @@ -34,46 +26,6 @@ public class ServerErrTest { @Before public void setUp() throws Exception { - ServerInterceptor interceptor = new ServerInterceptor() { - @Override - public ServerCall.Listener interceptCall( - ServerCall call, - io.grpc.Metadata headers, - ServerCallHandler next) { - - final var context = - Context.current(); - ServerCall.Listener listener = Contexts.interceptCall( - context, - call, - headers, - next); - return new ForwardingServerCallListener.SimpleForwardingServerCallListener<>( - listener) { - @Override - public void onHalfClose() { - try { - super.onHalfClose(); - } catch (RuntimeException ex) { - handleException(ex, call, headers); - throw ex; - } - } - - private void handleException( - RuntimeException e, - ServerCall serverCall, - io.grpc.Metadata headers) { - // Currently, we only have application level exceptions. - // Translate it to UNKNOWN status. - var status = Status.UNKNOWN.withDescription(e.getMessage()).withCause(e); - var newStatus = Status.fromThrowable(status.asException()); - serverCall.close(newStatus, headers); - } - }; - } - }; - String serverName = InProcessServerBuilder.generateName(); GRPCConfig grpcServerConfig = GRPCConfig.newBuilder() @@ -83,12 +35,10 @@ private void handleException( .build(); server = new Server( + grpcServerConfig, new SourceTransformerTestErr(), - grpcServerConfig); - - server.setServerBuilder(InProcessServerBuilder.forName(serverName) - .intercept(interceptor) - .directExecutor()); + null, + serverName); server.start(); diff --git a/src/test/java/io/numaproj/numaflow/sourcetransformer/ServerTest.java b/src/test/java/io/numaproj/numaflow/sourcetransformer/ServerTest.java index 666775c8..4975c8a6 100644 --- a/src/test/java/io/numaproj/numaflow/sourcetransformer/ServerTest.java +++ b/src/test/java/io/numaproj/numaflow/sourcetransformer/ServerTest.java @@ -41,11 +41,10 @@ public void setUp() throws Exception { .build(); server = new Server( + grpcServerConfig, new TestSourceTransformer(), - grpcServerConfig); - - server.setServerBuilder(InProcessServerBuilder.forName(serverName) - .directExecutor()); + null, + serverName); server.start();