Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: abstract gRPC server #156

Merged
merged 6 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@
<goal>dockerBuild</goal>
</goals>
<configuration>
<from>
<image>amazoncorretto:11</image>
</from>
<container>
<mainClass>
io.numaproj.numaflow.examples.map.evenodd.EvenOddFunction
Expand All @@ -156,6 +159,9 @@
<goal>dockerBuild</goal>
</goals>
<configuration>
<from>
<image>amazoncorretto:11</image>
</from>
<container>
<mainClass>io.numaproj.numaflow.examples.sink.simple.SimpleSink
</mainClass>
Expand All @@ -172,6 +178,9 @@
<goal>dockerBuild</goal>
</goals>
<configuration>
<from>
<image>amazoncorretto:11</image>
</from>
<container>
<mainClass>
io.numaproj.numaflow.examples.reduce.sum.SumFactory
Expand All @@ -189,6 +198,9 @@
<goal>dockerBuild</goal>
</goals>
<configuration>
<from>
<image>amazoncorretto:11</image>
</from>
<container>
<mainClass>
io.numaproj.numaflow.examples.reducestreamer.sum.SumFactory
Expand Down Expand Up @@ -228,6 +240,9 @@
<goal>dockerBuild</goal>
</goals>
<configuration>
<from>
<image>amazoncorretto:11</image>
</from>
<container>
<mainClass>
io.numaproj.numaflow.examples.reduce.count.CounterFactory
Expand All @@ -245,6 +260,9 @@
<goal>dockerBuild</goal>
</goals>
<configuration>
<from>
<image>amazoncorretto:11</image>
</from>
<container>
<mainClass>
io.numaproj.numaflow.examples.sideinput.simple.SimpleSideInput
Expand All @@ -263,6 +281,9 @@
<goal>dockerBuild</goal>
</goals>
<configuration>
<from>
<image>amazoncorretto:11</image>
</from>
<container>
<mainClass>
io.numaproj.numaflow.examples.sideinput.udf.SimpleMapWithSideInput
Expand Down Expand Up @@ -302,6 +323,9 @@
<goal>dockerBuild</goal>
</goals>
<configuration>
<from>
<image>amazoncorretto:11</image>
</from>
<container>
<mainClass>
io.numaproj.numaflow.examples.reducesession.counter.CountFactory
Expand Down
4 changes: 4 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-inprocess</artifactId>
</dependency>
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.numaproj.numaflow.batchmapper;

import io.numaproj.numaflow.shared.GrpcConfigRetriever;
import lombok.Builder;
import lombok.Getter;

Expand All @@ -8,7 +9,7 @@
*/
@Getter
@Builder(builderMethodName = "newBuilder")
public class GRPCConfig {
public class GRPCConfig implements GrpcConfigRetriever {
@Builder.Default
private String socketPath = Constants.DEFAULT_SOCKET_PATH;

Expand Down
78 changes: 22 additions & 56 deletions src/main/java/io/numaproj/numaflow/batchmapper/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.ServerBuilder;
import io.grpc.ServerInterceptor;
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
import io.numaproj.numaflow.shared.GrpcServerHelper;
import io.numaproj.numaflow.shared.GrpcServerUtils;
import io.numaproj.numaflow.shared.GrpcServerWrapper;
import lombok.extern.slf4j.Slf4j;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
* Server is the gRPC server for executing batch map operation.
Expand All @@ -24,8 +23,7 @@
private final Service service;
private final CompletableFuture<Void> shutdownSignal;
private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper());
private io.grpc.Server server;
private final GrpcServerHelper grpcServerHelper;
private final GrpcServerWrapper server;

/**
* constructor to create sink gRPC server.
Expand All @@ -36,6 +34,14 @@
this(batchMapper, GRPCConfig.defaultGrpcConfig());
}

@VisibleForTesting
protected Server(GRPCConfig grpcConfig, BatchMapper service, ServerInterceptor interceptor, String serverName) {
this.grpcConfig = grpcConfig;
this.shutdownSignal = new CompletableFuture<>();
this.service = new Service(service, this.shutdownSignal);
this.server = new GrpcServerWrapper(interceptor, serverName, this.service);
}

/**
* constructor to create sink gRPC server with gRPC config.
*
Expand All @@ -46,7 +52,7 @@
this.shutdownSignal = new CompletableFuture<>();
this.service = new Service(batchMapper, this.shutdownSignal);
this.grpcConfig = grpcConfig;
this.grpcServerHelper = new GrpcServerHelper();
this.server = new GrpcServerWrapper(this.grpcConfig, this.service);

Check warning on line 55 in src/main/java/io/numaproj/numaflow/batchmapper/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/batchmapper/Server.java#L55

Added line #L55 was not covered by tests
}

/**
Expand All @@ -56,56 +62,34 @@
*/
public void start() throws Exception {
GrpcServerUtils.writeServerInfo(
serverInfoAccessor,
grpcConfig.getSocketPath(),
grpcConfig.getInfoFilePath(),
this.serverInfoAccessor,
this.grpcConfig.getSocketPath(),
this.grpcConfig.getInfoFilePath(),
ContainerType.MAPPER,
Collections.singletonMap(Constants.MAP_MODE_KEY, Constants.MAP_MODE));

if (this.server == null) {
this.server = grpcServerHelper.createServer(
grpcConfig.getSocketPath(),
grpcConfig.getMaxMessageSize(),
grpcConfig.isLocal(),
grpcConfig.getPort(),
this.service);
}

server.start();
this.server.start();

log.info(
"server started, listening on socket path: " + grpcConfig.getSocketPath());
log.info("server started, listening on socket path: {}", this.grpcConfig.getSocketPath());

// register shutdown hook to gracefully shut down the server
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
if (server != null && server.isTerminated()) {
return;
}
try {
Server.this.stop();
log.info("gracefully shutting down event loop groups");
this.grpcServerHelper.gracefullyShutdownEventLoopGroups();
this.stop();
} catch (InterruptedException e) {
Thread.interrupted();
e.printStackTrace(System.err);
}
}));

// if there are any exceptions, shutdown the server gracefully.
shutdownSignal.whenCompleteAsync((v, e) -> {
if (server != null && server.isTerminated()) {
return;
}

this.shutdownSignal.whenCompleteAsync((v, e) -> {
if (e != null) {
System.err.println("*** shutting down batch map gRPC server because of an exception - " + e.getMessage());
try {
log.info("stopping server");
Server.this.stop();
log.info("gracefully shutting down event loop groups");
this.grpcServerHelper.gracefullyShutdownEventLoopGroups();
this.stop();
} catch (InterruptedException ex) {
Thread.interrupted();
ex.printStackTrace(System.err);
Expand All @@ -123,7 +107,7 @@
*/
public void awaitTermination() throws InterruptedException {
log.info("batch map server is waiting for termination");
server.awaitTermination();
this.server.awaitTermination();

Check warning on line 110 in src/main/java/io/numaproj/numaflow/batchmapper/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/batchmapper/Server.java#L110

Added line #L110 was not covered by tests
log.info("batch map server has terminated");
}

Expand All @@ -134,25 +118,7 @@
* @throws InterruptedException if shutdown is interrupted
*/
public void stop() throws InterruptedException {
this.server.gracefullyShutdown();
this.service.shutDown();
if (server != null) {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
// force shutdown if not terminated
if (!server.isTerminated()) {
server.shutdownNow();
}
}
}

/**
* Set server builder for testing.
*
* @param serverBuilder in process server builder can be used for testing
*/
@VisibleForTesting
public void setServerBuilder(ServerBuilder<?> serverBuilder) {
this.server = serverBuilder
.addService(this.service)
.build();
}
}
7 changes: 4 additions & 3 deletions src/main/java/io/numaproj/numaflow/batchmapper/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,15 +185,16 @@
);
}

// Shuts down the executor service which is used for batch map
// Shuts down the executor service
public void shutDown() {
this.mapTaskExecutor.shutdown();
try {
if (!mapTaskExecutor.awaitTermination(SHUTDOWN_TIME, TimeUnit.SECONDS)) {
log.error("BatchMap executor did not terminate in the specified time.");
List<Runnable> droppedTasks = mapTaskExecutor.shutdownNow();
log.error("BatchMap executor was abruptly shut down. " + droppedTasks.size()
+ " tasks will not be executed.");
log.error(

Check warning on line 195 in src/main/java/io/numaproj/numaflow/batchmapper/Service.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/batchmapper/Service.java#L195

Added line #L195 was not covered by tests
"BatchMap executor was abruptly shut down. {} tasks will not be executed.",
droppedTasks.size());

Check warning on line 197 in src/main/java/io/numaproj/numaflow/batchmapper/Service.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/batchmapper/Service.java#L197

Added line #L197 was not covered by tests
} else {
log.info("BatchMap executor was terminated.");
}
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/numaproj/numaflow/mapper/GRPCConfig.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.numaproj.numaflow.mapper;

import io.numaproj.numaflow.shared.GrpcConfigRetriever;
import lombok.Builder;
import lombok.Getter;

Expand All @@ -8,7 +9,7 @@
*/
@Getter
@Builder(builderMethodName = "newBuilder")
public class GRPCConfig {
public class GRPCConfig implements GrpcConfigRetriever {
@Builder.Default
private String socketPath = Constants.DEFAULT_SOCKET_PATH;

Expand Down
Loading
Loading