Skip to content

Commit

Permalink
chore: formalize gRPC errors in case of UDF exceptions (#166)
Browse files Browse the repository at this point in the history
Signed-off-by: adarsh0728 <[email protected]>
  • Loading branch information
adarsh0728 authored Feb 13, 2025
1 parent 097467c commit d1cdb8f
Show file tree
Hide file tree
Showing 12 changed files with 164 additions and 47 deletions.
18 changes: 14 additions & 4 deletions src/main/java/io/numaproj/numaflow/batchmapper/Service.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package io.numaproj.numaflow.batchmapper;

import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import com.google.rpc.Code;
import com.google.rpc.DebugInfo;
import io.grpc.Status;
import io.grpc.protobuf.StatusProto;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.map.v1.MapGrpc;
import io.numaproj.numaflow.map.v1.MapOuterClass;
import io.numaproj.numaflow.shared.ExceptionUtils;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -98,10 +103,15 @@ public void onNext(MapOuterClass.MapRequest mapRequest) {
} catch (Exception e) {
log.error("Encountered an error in batch map onNext", e);
shutdownSignal.completeExceptionally(e);
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asException());
// Build gRPC Status
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
.setCode(Code.INTERNAL.getNumber())
.setMessage(ExceptionUtils.ERR_BATCH_MAP_EXCEPTION + ": " + (e.getMessage() != null ? e.getMessage() : ""))
.addDetails(Any.pack(DebugInfo.newBuilder()
.setDetail(ExceptionUtils.getStackTrace(e))
.build()))
.build();
responseObserver.onError(StatusProto.toStatusRuntimeException(status));
}
}

Expand Down
18 changes: 14 additions & 4 deletions src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,13 @@
import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.ReceiveBuilder;
import io.grpc.Status;
import com.google.protobuf.Any;
import com.google.rpc.Code;
import com.google.rpc.DebugInfo;
import io.grpc.protobuf.StatusProto;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.map.v1.MapOuterClass;
import io.numaproj.numaflow.shared.ExceptionUtils;
import lombok.extern.slf4j.Slf4j;

import java.util.Optional;
Expand Down Expand Up @@ -106,10 +111,15 @@ private void handleFailure(Exception e) {
userException = e;
// only send the very first exception to the client
// one exception should trigger a container restart
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asException());
// Build gRPC Status
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
.setCode(Code.INTERNAL.getNumber())
.setMessage(ExceptionUtils.ERR_MAP_EXCEPTION + ": " + (e.getMessage() != null ? e.getMessage() : ""))
.addDetails(Any.pack(DebugInfo.newBuilder()
.setDetail(ExceptionUtils.getStackTrace(e))
.build()))
.build();
responseObserver.onError(StatusProto.toStatusRuntimeException(status));
}
activeMapperCount--;
}
Expand Down
31 changes: 31 additions & 0 deletions src/main/java/io/numaproj/numaflow/shared/ExceptionUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.numaproj.numaflow.shared;

import java.io.PrintWriter;
import java.io.StringWriter;

public class ExceptionUtils {
/**
* Formalized exception error strings
*/
public static final String ERR_SOURCE_EXCEPTION = "UDF_EXECUTION_ERROR(source)";
public static final String ERR_TRANSFORMER_EXCEPTION = "UDF_EXECUTION_ERROR(transformer)";
public static final String ERR_SINK_EXCEPTION = "UDF_EXECUTION_ERROR(sink)";
public static final String ERR_MAP_STREAM_EXCEPTION = "UDF_EXECUTION_ERROR(mapstream)";
public static final String ERR_MAP_EXCEPTION = "UDF_EXECUTION_ERROR(map)";
public static final String ERR_BATCH_MAP_EXCEPTION = "UDF_EXECUTION_ERROR(batchmap)";

/**
* Converts the stack trace of an exception into a String.
*
* @param e the exception to extract the stack trace from
* @return the stack trace as a String
*/
public static String getStackTrace(Throwable t) {
if (t == null) {
return "No exception provided.";
}
StringWriter sw = new StringWriter();
t.printStackTrace(new PrintWriter(sw));
return sw.toString();
}
}
20 changes: 12 additions & 8 deletions src/main/java/io/numaproj/numaflow/sideinput/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public Server(SideInputRetriever sideInputRetriever) {
/**
* constructor to create gRPC server with gRPC config.
*
* @param grpcConfig to configure the max message size for grpc
* @param grpcConfig to configure the max message size for grpc
* @param sideInputRetriever to retrieve the side input
*/
public Server(SideInputRetriever sideInputRetriever, GRPCConfig grpcConfig) {
Expand All @@ -41,7 +41,8 @@ public Server(SideInputRetriever sideInputRetriever, GRPCConfig grpcConfig) {
}

@VisibleForTesting
protected Server(GRPCConfig grpcConfig, SideInputRetriever service, ServerInterceptor interceptor, String serverName) {
protected Server(GRPCConfig grpcConfig, SideInputRetriever service, ServerInterceptor interceptor,
String serverName) {
this.grpcConfig = grpcConfig;
this.server = new GrpcServerWrapper(
interceptor,
Expand All @@ -67,8 +68,7 @@ public void start() throws Exception {

log.info(
"server started, listening on {}",
this.grpcConfig.isLocal() ?
"localhost:" + this.grpcConfig.getPort() : this.grpcConfig.getSocketPath());
this.grpcConfig.isLocal() ? "localhost:" + this.grpcConfig.getPort() : this.grpcConfig.getSocketPath());

// register shutdown hook to gracefully shut down the server
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Expand All @@ -83,11 +83,14 @@ public void start() throws Exception {
}

/**
* Blocks until the server has terminated. If the server is already terminated, this method
* will return immediately. If the server is not yet terminated, this method will block the
* Blocks until the server has terminated. If the server is already terminated,
* this method
* will return immediately. If the server is not yet terminated, this method
* will block the
* calling thread until the server has terminated.
*
* @throws InterruptedException if the current thread is interrupted while waiting
* @throws InterruptedException if the current thread is interrupted while
* waiting
*/
public void awaitTermination() throws InterruptedException {
log.info("side input server is waiting for termination");
Expand All @@ -96,7 +99,8 @@ public void awaitTermination() throws InterruptedException {
}

/**
* Stop serving requests and shutdown resources. Await termination on the main thread since the
* Stop serving requests and shutdown resources. Await termination on the main
* thread since the
* grpc library uses daemon threads.
*
* @throws InterruptedException if shutdown is interrupted
Expand Down
11 changes: 4 additions & 7 deletions src/main/java/io/numaproj/numaflow/sideinput/Service.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package io.numaproj.numaflow.sideinput;

import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import io.grpc.stub.StreamObserver;
import com.google.protobuf.ByteString;
import io.numaproj.numaflow.sideinput.v1.SideInputGrpc;
import io.numaproj.numaflow.sideinput.v1.Sideinput;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;


@Slf4j
@AllArgsConstructor
class Service extends SideInputGrpc.SideInputImplBase {
Expand All @@ -29,11 +28,8 @@ public void retrieveSideInput(
responseObserver);
return;
}


// process request
Message message = sideInputRetriever.retrieveSideInput();

// set response
responseObserver.onNext(buildResponse(message));
responseObserver.onCompleted();
Expand All @@ -50,8 +46,9 @@ public void isReady(Empty request, StreamObserver<Sideinput.ReadyResponse> respo

private Sideinput.SideInputResponse buildResponse(Message message) {
return Sideinput.SideInputResponse.newBuilder()
.setValue(message.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(
message.getValue()))
.setValue(message.getValue() == null ? ByteString.EMPTY
: ByteString.copyFrom(
message.getValue()))
.setNoBroadcast(message.isNoBroadcast())
.build();
}
Expand Down
14 changes: 14 additions & 0 deletions src/main/java/io/numaproj/numaflow/sinker/Service.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package io.numaproj.numaflow.sinker;

import com.google.protobuf.Any;
import com.google.protobuf.Empty;
import com.google.rpc.Code;
import com.google.rpc.DebugInfo;
import io.grpc.Status;
import io.grpc.protobuf.StatusProto;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.shared.ExceptionUtils;
import io.numaproj.numaflow.sink.v1.SinkGrpc;
import io.numaproj.numaflow.sink.v1.SinkOuterClass;
import lombok.AllArgsConstructor;
Expand Down Expand Up @@ -100,6 +105,15 @@ public void onNext(SinkOuterClass.SinkRequest request) {
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.asException());
// Build gRPC Status
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
.setCode(Code.INTERNAL.getNumber())
.setMessage(ExceptionUtils.ERR_SINK_EXCEPTION + ": " + (e.getMessage() != null ? e.getMessage() : ""))
.addDetails(Any.pack(DebugInfo.newBuilder()
.setDetail(ExceptionUtils.getStackTrace(e))
.build()))
.build();
responseObserver.onError(StatusProto.toStatusRuntimeException(status));
}
}

Expand Down
29 changes: 20 additions & 9 deletions src/main/java/io/numaproj/numaflow/sourcer/Service.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package io.numaproj.numaflow.sourcer;

import com.google.protobuf.Any;
import com.google.protobuf.Empty;
import com.google.rpc.Code;
import com.google.rpc.DebugInfo;
import io.grpc.Status;
import io.grpc.protobuf.StatusProto;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.shared.ExceptionUtils;
import io.numaproj.numaflow.source.v1.SourceGrpc;
import io.numaproj.numaflow.source.v1.SourceOuterClass;
import lombok.AllArgsConstructor;
Expand All @@ -15,7 +20,6 @@

import static io.numaproj.numaflow.source.v1.SourceGrpc.getPendingFnMethod;


/**
* Implementation of the gRPC service for the sourcer.
*/
Expand All @@ -31,7 +35,8 @@ class Service extends SourceGrpc.SourceImplBase {
* @param responseObserver the response observer
*/
@Override
public StreamObserver<SourceOuterClass.ReadRequest> readFn(final StreamObserver<SourceOuterClass.ReadResponse> responseObserver) {
public StreamObserver<SourceOuterClass.ReadRequest> readFn(
final StreamObserver<SourceOuterClass.ReadResponse> responseObserver) {
return new StreamObserver<>() {
private boolean handshakeDone = false;

Expand Down Expand Up @@ -80,10 +85,15 @@ public void onNext(SourceOuterClass.ReadRequest request) {
} catch (Exception e) {
log.error("Encountered error in readFn onNext", e);
shutdownSignal.completeExceptionally(e);
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asException());
// Build gRPC Status
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
.setCode(Code.INTERNAL.getNumber())
.setMessage(ExceptionUtils.ERR_SOURCE_EXCEPTION + ": " + (e.getMessage() != null ? e.getMessage() : ""))
.addDetails(Any.pack(DebugInfo.newBuilder()
.setDetail(ExceptionUtils.getStackTrace(e))
.build()))
.build();
responseObserver.onError(StatusProto.toStatusRuntimeException(status));
}
}

Expand Down Expand Up @@ -201,7 +211,8 @@ public void pendingFn(
SourceOuterClass.PendingResponse.Result
.newBuilder()
.setCount(this.sourcer.getPending())
.build()).build());
.build())
.build());
responseObserver.onCompleted();
}

Expand Down Expand Up @@ -236,8 +247,8 @@ public void partitionsFn(
responseObserver.onNext(SourceOuterClass.PartitionsResponse.newBuilder()
.setResult(
SourceOuterClass.PartitionsResponse.Result.newBuilder()
.addAllPartitions(partitions)).
build());
.addAllPartitions(partitions))
.build());
responseObserver.onCompleted();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,13 @@
import akka.actor.SupervisorStrategy;
import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.ReceiveBuilder;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.grpc.Status;
import com.google.protobuf.Any;
import com.google.rpc.Code;
import com.google.rpc.DebugInfo;
import io.grpc.protobuf.StatusProto;
import io.numaproj.numaflow.shared.ExceptionUtils;
import io.numaproj.numaflow.sourcetransformer.v1.Sourcetransformer;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -144,10 +149,16 @@ private void handleFailure(Exception e) {
userException = e;
// only send the very first exception to the client
// one exception should trigger a container restart
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asException());

// Build gRPC Status
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
.setCode(Code.INTERNAL.getNumber())
.setMessage(ExceptionUtils.ERR_TRANSFORMER_EXCEPTION + ": " + (e.getMessage() != null ? e.getMessage() : ""))
.addDetails(Any.pack(DebugInfo.newBuilder()
.setDetail(ExceptionUtils.getStackTrace(e))
.build()))
.build();
responseObserver.onError(StatusProto.toStatusRuntimeException(status));
}
activeTransformersCount--;
}
Expand Down Expand Up @@ -217,7 +228,6 @@ public SupervisorStrategy supervisorStrategy() {
.asException());
return SupervisorStrategy.stop();
})
.build()
);
.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import java.util.concurrent.ExecutionException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class ServerErrTest {
Expand Down Expand Up @@ -85,9 +87,11 @@ public void testErrorFromUDF() {
outputStreamObserver.done.get();
fail("Expected exception not thrown");
} catch (InterruptedException | ExecutionException e) {
assertEquals(
"INTERNAL: java.lang.RuntimeException: unknown exception",
e.getCause().getMessage());
String expectedSubstring = "UDF_EXECUTION_ERROR(batchmap)";
String actualMessage = e.getMessage();
assertNotNull("Error message should not be null", actualMessage);
assertTrue("Expected substring '" + expectedSubstring + "' not found in error message: " + actualMessage,
actualMessage.contains(expectedSubstring));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public void testMapperFailure() {
fail("Expected exception not thrown");
} catch (Exception e) {
assertEquals(
"io.grpc.StatusRuntimeException: INTERNAL: unknown exception",
"io.grpc.StatusRuntimeException: INTERNAL: UDF_EXECUTION_ERROR(map): unknown exception",
e.getMessage());
}
}
Expand Down
Loading

0 comments on commit d1cdb8f

Please sign in to comment.