diff --git a/src/main/java/io/numaproj/numaflow/batchmapper/Service.java b/src/main/java/io/numaproj/numaflow/batchmapper/Service.java index a72a5bf3..d9764cbd 100644 --- a/src/main/java/io/numaproj/numaflow/batchmapper/Service.java +++ b/src/main/java/io/numaproj/numaflow/batchmapper/Service.java @@ -1,11 +1,16 @@ package io.numaproj.numaflow.batchmapper; +import com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.google.protobuf.Empty; +import com.google.rpc.Code; +import com.google.rpc.DebugInfo; import io.grpc.Status; +import io.grpc.protobuf.StatusProto; import io.grpc.stub.StreamObserver; import io.numaproj.numaflow.map.v1.MapGrpc; import io.numaproj.numaflow.map.v1.MapOuterClass; +import io.numaproj.numaflow.shared.ExceptionUtils; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -98,10 +103,15 @@ public void onNext(MapOuterClass.MapRequest mapRequest) { } catch (Exception e) { log.error("Encountered an error in batch map onNext", e); shutdownSignal.completeExceptionally(e); - responseObserver.onError(Status.INTERNAL - .withDescription(e.getMessage()) - .withCause(e) - .asException()); + // Build gRPC Status + com.google.rpc.Status status = com.google.rpc.Status.newBuilder() + .setCode(Code.INTERNAL.getNumber()) + .setMessage(ExceptionUtils.ERR_BATCH_MAP_EXCEPTION + ": " + (e.getMessage() != null ? 
e.getMessage() : "")) + .addDetails(Any.pack(DebugInfo.newBuilder() + .setDetail(ExceptionUtils.getStackTrace(e)) + .build())) + .build(); + responseObserver.onError(StatusProto.toStatusRuntimeException(status)); } } diff --git a/src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java b/src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java index cb02e0b3..70234e6b 100644 --- a/src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java +++ b/src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java @@ -9,8 +9,13 @@ import akka.japi.pf.DeciderBuilder; import akka.japi.pf.ReceiveBuilder; import io.grpc.Status; +import com.google.protobuf.Any; +import com.google.rpc.Code; +import com.google.rpc.DebugInfo; +import io.grpc.protobuf.StatusProto; import io.grpc.stub.StreamObserver; import io.numaproj.numaflow.map.v1.MapOuterClass; +import io.numaproj.numaflow.shared.ExceptionUtils; import lombok.extern.slf4j.Slf4j; import java.util.Optional; @@ -106,10 +111,15 @@ private void handleFailure(Exception e) { userException = e; // only send the very first exception to the client // one exception should trigger a container restart - responseObserver.onError(Status.INTERNAL - .withDescription(e.getMessage()) - .withCause(e) - .asException()); + // Build gRPC Status + com.google.rpc.Status status = com.google.rpc.Status.newBuilder() + .setCode(Code.INTERNAL.getNumber()) + .setMessage(ExceptionUtils.ERR_MAP_EXCEPTION + ": " + (e.getMessage() != null ? 
e.getMessage() : "")) + .addDetails(Any.pack(DebugInfo.newBuilder() + .setDetail(ExceptionUtils.getStackTrace(e)) + .build())) + .build(); + responseObserver.onError(StatusProto.toStatusRuntimeException(status)); } activeMapperCount--; } diff --git a/src/main/java/io/numaproj/numaflow/shared/ExceptionUtils.java b/src/main/java/io/numaproj/numaflow/shared/ExceptionUtils.java new file mode 100644 index 00000000..641d660b --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/shared/ExceptionUtils.java @@ -0,0 +1,31 @@ +package io.numaproj.numaflow.shared; + +import java.io.PrintWriter; +import java.io.StringWriter; + +public class ExceptionUtils { + /** + * Formalized exception error strings + */ + public static final String ERR_SOURCE_EXCEPTION = "UDF_EXECUTION_ERROR(source)"; + public static final String ERR_TRANSFORMER_EXCEPTION = "UDF_EXECUTION_ERROR(transformer)"; + public static final String ERR_SINK_EXCEPTION = "UDF_EXECUTION_ERROR(sink)"; + public static final String ERR_MAP_STREAM_EXCEPTION = "UDF_EXECUTION_ERROR(mapstream)"; + public static final String ERR_MAP_EXCEPTION = "UDF_EXECUTION_ERROR(map)"; + public static final String ERR_BATCH_MAP_EXCEPTION = "UDF_EXECUTION_ERROR(batchmap)"; + + /** + * Converts the stack trace of an exception into a String. 
+ * + * @param t the throwable to extract the stack trace from + * @return the stack trace as a String, or a fixed placeholder message if {@code t} is null + */ + public static String getStackTrace(Throwable t) { + if (t == null) { + return "No exception provided."; + } + StringWriter sw = new StringWriter(); + t.printStackTrace(new PrintWriter(sw)); + return sw.toString(); + } +} diff --git a/src/main/java/io/numaproj/numaflow/sideinput/Server.java b/src/main/java/io/numaproj/numaflow/sideinput/Server.java index acfe3297..be4394e2 100644 --- a/src/main/java/io/numaproj/numaflow/sideinput/Server.java +++ b/src/main/java/io/numaproj/numaflow/sideinput/Server.java @@ -32,7 +32,7 @@ public Server(SideInputRetriever sideInputRetriever) { /** * constructor to create gRPC server with gRPC config. * - * @param grpcConfig to configure the max message size for grpc + * @param grpcConfig to configure the max message size for grpc * @param sideInputRetriever to retrieve the side input */ public Server(SideInputRetriever sideInputRetriever, GRPCConfig grpcConfig) { @@ -41,7 +41,8 @@ public Server(SideInputRetriever sideInputRetriever, GRPCConfig grpcConfig) { } @VisibleForTesting - protected Server(GRPCConfig grpcConfig, SideInputRetriever service, ServerInterceptor interceptor, String serverName) { + protected Server(GRPCConfig grpcConfig, SideInputRetriever service, ServerInterceptor interceptor, + String serverName) { this.grpcConfig = grpcConfig; this.server = new GrpcServerWrapper( interceptor, @@ -67,8 +68,7 @@ public void start() throws Exception { log.info( "server started, listening on {}", - this.grpcConfig.isLocal() ? - "localhost:" + this.grpcConfig.getPort() : this.grpcConfig.getSocketPath()); + this.grpcConfig.isLocal() ? 
"localhost:" + this.grpcConfig.getPort() : this.grpcConfig.getSocketPath()); // register shutdown hook to gracefully shut down the server Runtime.getRuntime().addShutdownHook(new Thread(() -> { @@ -83,11 +83,14 @@ public void start() throws Exception { } /** - * Blocks until the server has terminated. If the server is already terminated, this method - * will return immediately. If the server is not yet terminated, this method will block the + * Blocks until the server has terminated. If the server is already terminated, + * this method + * will return immediately. If the server is not yet terminated, this method + * will block the * calling thread until the server has terminated. * - * @throws InterruptedException if the current thread is interrupted while waiting + * @throws InterruptedException if the current thread is interrupted while + * waiting */ public void awaitTermination() throws InterruptedException { log.info("side input server is waiting for termination"); @@ -96,7 +99,8 @@ public void awaitTermination() throws InterruptedException { } /** - * Stop serving requests and shutdown resources. Await termination on the main thread since the + * Stop serving requests and shutdown resources. Await termination on the main + * thread since the * grpc library uses daemon threads. 
* * @throws InterruptedException if shutdown is interrupted diff --git a/src/main/java/io/numaproj/numaflow/sideinput/Service.java b/src/main/java/io/numaproj/numaflow/sideinput/Service.java index 18506737..b558e10a 100644 --- a/src/main/java/io/numaproj/numaflow/sideinput/Service.java +++ b/src/main/java/io/numaproj/numaflow/sideinput/Service.java @@ -1,14 +1,13 @@ package io.numaproj.numaflow.sideinput; -import com.google.protobuf.ByteString; import com.google.protobuf.Empty; import io.grpc.stub.StreamObserver; +import com.google.protobuf.ByteString; import io.numaproj.numaflow.sideinput.v1.SideInputGrpc; import io.numaproj.numaflow.sideinput.v1.Sideinput; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; - @Slf4j @AllArgsConstructor class Service extends SideInputGrpc.SideInputImplBase { @@ -29,11 +28,8 @@ public void retrieveSideInput( responseObserver); return; } - - // process request Message message = sideInputRetriever.retrieveSideInput(); - // set response responseObserver.onNext(buildResponse(message)); responseObserver.onCompleted(); @@ -50,8 +46,9 @@ public void isReady(Empty request, StreamObserver respo private Sideinput.SideInputResponse buildResponse(Message message) { return Sideinput.SideInputResponse.newBuilder() - .setValue(message.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom( - message.getValue())) + .setValue(message.getValue() == null ? 
ByteString.EMPTY + : ByteString.copyFrom( + message.getValue())) .setNoBroadcast(message.isNoBroadcast()) .build(); } diff --git a/src/main/java/io/numaproj/numaflow/sinker/Service.java b/src/main/java/io/numaproj/numaflow/sinker/Service.java index 8a21548e..7d334b18 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Service.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Service.java @@ -1,8 +1,13 @@ package io.numaproj.numaflow.sinker; +import com.google.protobuf.Any; import com.google.protobuf.Empty; +import com.google.rpc.Code; +import com.google.rpc.DebugInfo; import io.grpc.Status; +import io.grpc.protobuf.StatusProto; import io.grpc.stub.StreamObserver; +import io.numaproj.numaflow.shared.ExceptionUtils; import io.numaproj.numaflow.sink.v1.SinkGrpc; import io.numaproj.numaflow.sink.v1.SinkOuterClass; import lombok.AllArgsConstructor; @@ -100,6 +105,12 @@ public void onNext(SinkOuterClass.SinkRequest request) { - responseObserver.onError(Status.INTERNAL - .withDescription(e.getMessage()) - .asException()); + // Build gRPC Status + com.google.rpc.Status status = com.google.rpc.Status.newBuilder() + .setCode(Code.INTERNAL.getNumber()) + .setMessage(ExceptionUtils.ERR_SINK_EXCEPTION + ": " + (e.getMessage() != null ? 
e.getMessage() : "")) + .addDetails(Any.pack(DebugInfo.newBuilder() + .setDetail(ExceptionUtils.getStackTrace(e)) + .build())) + .build(); + responseObserver.onError(StatusProto.toStatusRuntimeException(status)); } } diff --git a/src/main/java/io/numaproj/numaflow/sourcer/Service.java b/src/main/java/io/numaproj/numaflow/sourcer/Service.java index 8d7850b7..01e1aba5 100644 --- a/src/main/java/io/numaproj/numaflow/sourcer/Service.java +++ b/src/main/java/io/numaproj/numaflow/sourcer/Service.java @@ -1,8 +1,13 @@ package io.numaproj.numaflow.sourcer; +import com.google.protobuf.Any; import com.google.protobuf.Empty; +import com.google.rpc.Code; +import com.google.rpc.DebugInfo; import io.grpc.Status; +import io.grpc.protobuf.StatusProto; import io.grpc.stub.StreamObserver; +import io.numaproj.numaflow.shared.ExceptionUtils; import io.numaproj.numaflow.source.v1.SourceGrpc; import io.numaproj.numaflow.source.v1.SourceOuterClass; import lombok.AllArgsConstructor; @@ -15,7 +20,6 @@ import static io.numaproj.numaflow.source.v1.SourceGrpc.getPendingFnMethod; - /** * Implementation of the gRPC service for the sourcer. 
*/ @@ -31,7 +35,8 @@ class Service extends SourceGrpc.SourceImplBase { * @param responseObserver the response observer */ @Override - public StreamObserver readFn(final StreamObserver responseObserver) { + public StreamObserver readFn( + final StreamObserver responseObserver) { return new StreamObserver<>() { private boolean handshakeDone = false; @@ -80,10 +85,15 @@ public void onNext(SourceOuterClass.ReadRequest request) { } catch (Exception e) { log.error("Encountered error in readFn onNext", e); shutdownSignal.completeExceptionally(e); - responseObserver.onError(Status.INTERNAL - .withDescription(e.getMessage()) - .withCause(e) - .asException()); + // Build gRPC Status + com.google.rpc.Status status = com.google.rpc.Status.newBuilder() + .setCode(Code.INTERNAL.getNumber()) + .setMessage(ExceptionUtils.ERR_SOURCE_EXCEPTION + ": " + (e.getMessage() != null ? e.getMessage() : "")) + .addDetails(Any.pack(DebugInfo.newBuilder() + .setDetail(ExceptionUtils.getStackTrace(e)) + .build())) + .build(); + responseObserver.onError(StatusProto.toStatusRuntimeException(status)); } } @@ -201,7 +211,8 @@ public void pendingFn( SourceOuterClass.PendingResponse.Result .newBuilder() .setCount(this.sourcer.getPending()) - .build()).build()); + .build()) + .build()); responseObserver.onCompleted(); } @@ -236,8 +247,8 @@ public void partitionsFn( responseObserver.onNext(SourceOuterClass.PartitionsResponse.newBuilder() .setResult( SourceOuterClass.PartitionsResponse.Result.newBuilder() - .addAllPartitions(partitions)). 
- build()); + .addAllPartitions(partitions)) + .build()); responseObserver.onCompleted(); } } diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java index 36335898..fd1c3b25 100644 --- a/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java +++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java @@ -8,8 +8,13 @@ import akka.actor.SupervisorStrategy; import akka.japi.pf.DeciderBuilder; import akka.japi.pf.ReceiveBuilder; -import io.grpc.Status; import io.grpc.stub.StreamObserver; +import io.grpc.Status; +import com.google.protobuf.Any; +import com.google.rpc.Code; +import com.google.rpc.DebugInfo; +import io.grpc.protobuf.StatusProto; +import io.numaproj.numaflow.shared.ExceptionUtils; import io.numaproj.numaflow.sourcetransformer.v1.Sourcetransformer; import lombok.extern.slf4j.Slf4j; @@ -144,10 +149,16 @@ private void handleFailure(Exception e) { userException = e; // only send the very first exception to the client // one exception should trigger a container restart - responseObserver.onError(Status.INTERNAL - .withDescription(e.getMessage()) - .withCause(e) - .asException()); + + // Build gRPC Status + com.google.rpc.Status status = com.google.rpc.Status.newBuilder() + .setCode(Code.INTERNAL.getNumber()) + .setMessage(ExceptionUtils.ERR_TRANSFORMER_EXCEPTION + ": " + (e.getMessage() != null ? 
e.getMessage() : "")) + .addDetails(Any.pack(DebugInfo.newBuilder() + .setDetail(ExceptionUtils.getStackTrace(e)) + .build())) + .build(); + responseObserver.onError(StatusProto.toStatusRuntimeException(status)); } activeTransformersCount--; } @@ -217,7 +228,6 @@ public SupervisorStrategy supervisorStrategy() { .asException()); return SupervisorStrategy.stop(); }) - .build() - ); + .build()); } } diff --git a/src/test/java/io/numaproj/numaflow/batchmapper/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/batchmapper/ServerErrTest.java index 23e11a5a..fc767870 100644 --- a/src/test/java/io/numaproj/numaflow/batchmapper/ServerErrTest.java +++ b/src/test/java/io/numaproj/numaflow/batchmapper/ServerErrTest.java @@ -17,6 +17,8 @@ import java.util.concurrent.ExecutionException; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class ServerErrTest { @@ -85,9 +87,11 @@ public void testErrorFromUDF() { outputStreamObserver.done.get(); fail("Expected exception not thrown"); } catch (InterruptedException | ExecutionException e) { - assertEquals( - "INTERNAL: java.lang.RuntimeException: unknown exception", - e.getCause().getMessage()); + String expectedSubstring = "UDF_EXECUTION_ERROR(batchmap)"; + String actualMessage = e.getMessage(); + assertNotNull("Error message should not be null", actualMessage); + assertTrue("Expected substring '" + expectedSubstring + "' not found in error message: " + actualMessage, + actualMessage.contains(expectedSubstring)); } } diff --git a/src/test/java/io/numaproj/numaflow/mapper/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/mapper/ServerErrTest.java index 3fc18f4d..8fa1dece 100644 --- a/src/test/java/io/numaproj/numaflow/mapper/ServerErrTest.java +++ b/src/test/java/io/numaproj/numaflow/mapper/ServerErrTest.java @@ -127,7 +127,7 @@ public void testMapperFailure() { fail("Expected exception not 
thrown"); } catch (Exception e) { assertEquals( - "io.grpc.StatusRuntimeException: INTERNAL: unknown exception", + "io.grpc.StatusRuntimeException: INTERNAL: UDF_EXECUTION_ERROR(map): unknown exception", e.getMessage()); } } diff --git a/src/test/java/io/numaproj/numaflow/shared/ExceptionUtilsTest.java b/src/test/java/io/numaproj/numaflow/shared/ExceptionUtilsTest.java new file mode 100644 index 00000000..5dc464a9 --- /dev/null +++ b/src/test/java/io/numaproj/numaflow/shared/ExceptionUtilsTest.java @@ -0,0 +1,23 @@ +package io.numaproj.numaflow.shared; + +import org.junit.Test; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; + +public class ExceptionUtilsTest { + + @Test + public void testGetStackTrace_NullException() { + String result = ExceptionUtils.getStackTrace(null); + assertEquals("No exception provided.", result); + } + + @Test + public void testGetStackTrace_ValidException() { + Exception exception = new Exception("Test exception"); + String result = ExceptionUtils.getStackTrace(exception); + assertTrue(result.contains("Test exception")); + assertTrue(result.contains("ExceptionUtilsTest.testGetStackTrace_ValidException")); + } + +} diff --git a/src/test/java/io/numaproj/numaflow/sourcetransformer/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/sourcetransformer/ServerErrTest.java index 03ab6ee0..c3ce92f4 100644 --- a/src/test/java/io/numaproj/numaflow/sourcetransformer/ServerErrTest.java +++ b/src/test/java/io/numaproj/numaflow/sourcetransformer/ServerErrTest.java @@ -14,8 +14,9 @@ import java.util.List; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; public class ServerErrTest { @@ -87,9 +88,11 @@ public void testSourceTransformerFailure() { responseObserver.done.get(); fail("Expected exception not thrown"); } catch (Exception e) { - assertEquals( - "io.grpc.StatusRuntimeException: 
INTERNAL: unknown exception", - e.getMessage()); + String expectedSubstring = "UDF_EXECUTION_ERROR(transformer)"; + String actualMessage = e.getMessage(); + assertNotNull("Error message should not be null", actualMessage); + assertTrue("Expected substring '" + expectedSubstring + "' not found in error message: " + actualMessage, + actualMessage.contains(expectedSubstring)); } }