From cd6fd96ccf41fe82ba9cf4d3432f70ce38671f79 Mon Sep 17 00:00:00 2001 From: adarsh0728 Date: Wed, 5 Feb 2025 21:29:35 +0530 Subject: [PATCH 1/5] chore: formalize gRPC error for transformer Signed-off-by: adarsh0728 --- .../TransformSupervisorActor.java | 41 +++++++++++++++---- .../sourcetransformer/ServerErrTest.java | 11 +++-- 2 files changed, 40 insertions(+), 12 deletions(-) diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java index ec37b4ed..d91b2558 100644 --- a/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java +++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java @@ -8,11 +8,17 @@ import akka.actor.SupervisorStrategy; import akka.japi.pf.DeciderBuilder; import akka.japi.pf.ReceiveBuilder; -import io.grpc.Status; import io.grpc.stub.StreamObserver; +import io.grpc.Status; +import com.google.protobuf.Any; +import com.google.rpc.Code; +import com.google.rpc.DebugInfo; +import io.grpc.protobuf.StatusProto; import io.numaproj.numaflow.sourcetransformer.v1.Sourcetransformer; import lombok.extern.slf4j.Slf4j; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -49,6 +55,7 @@ class TransformSupervisorActor extends AbstractActor { private final CompletableFuture shutdownSignal; private int activeTransformersCount; private Exception userException; + private static final String ERR_TRANSFORMER_EXCEPTION = "UDF_EXECUTION_ERROR(transformer)"; /** * Constructor for TransformSupervisorActor. @@ -136,19 +143,38 @@ public Receive createReceive() { * @param e The exception to be handled. */ private void handleFailure(Exception e) { - log.error("Encountered error in sourceTransformFn - {}", e.getMessage()); + String stackTrace = getStackTrace(e); + log.error("Exception in sourceTransformFn: {} {}", e.getMessage(), stackTrace); if (userException == null) { userException = e; // only send the very first exception to the client // one exception should trigger a container restart - responseObserver.onError(Status.INTERNAL - .withDescription(e.getMessage()) - .withCause(e) - .asException()); + + // Build gRPC Status [error] + com.google.rpc.Status status = com.google.rpc.Status.newBuilder() + .setCode(Code.INTERNAL.getNumber()) + .setMessage(ERR_TRANSFORMER_EXCEPTION + ": " + (e.getMessage() != null ? e.getMessage() : "")) + .addDetails(Any.pack(DebugInfo.newBuilder() + .setDetail(stackTrace) + .build())) + .build(); + responseObserver.onError(StatusProto.toStatusRuntimeException(status)); } activeTransformersCount--; } + /** + * Converts the stack trace of an exception into a String. + * + * @param e the exception to extract the stack trace from + * @return the stack trace as a String + */ + private String getStackTrace(Exception e) { + StringWriter sw = new StringWriter(); + e.printStackTrace(new PrintWriter(sw)); + return sw.toString(); + } + /** * Sends the SourceTransformResponse back to the client. * @@ -214,7 +240,6 @@ public SupervisorStrategy supervisorStrategy() { .asException()); return SupervisorStrategy.stop(); }) - .build() - ); + .build()); } } diff --git a/src/test/java/io/numaproj/numaflow/sourcetransformer/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/sourcetransformer/ServerErrTest.java index 03ab6ee0..c3ce92f4 100644 --- a/src/test/java/io/numaproj/numaflow/sourcetransformer/ServerErrTest.java +++ b/src/test/java/io/numaproj/numaflow/sourcetransformer/ServerErrTest.java @@ -14,8 +14,9 @@ import java.util.List; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; public class ServerErrTest { @@ -87,9 +88,11 @@ public void testSourceTransformerFailure() { responseObserver.done.get(); fail("Expected exception not thrown"); } catch (Exception e) { - assertEquals( - "io.grpc.StatusRuntimeException: INTERNAL: unknown exception", - e.getMessage()); + String expectedSubstring = "UDF_EXECUTION_ERROR(transformer)"; + String actualMessage = e.getMessage(); + assertNotNull("Error message should not be null", actualMessage); + assertTrue("Expected substring '" + expectedSubstring + "' not found in error message: " + actualMessage, + actualMessage.contains(expectedSubstring)); } } From fd1f19a1c3930b2914580e3787e3fe12c50dccae Mon Sep 17 00:00:00 2001 From: adarsh0728 Date: Thu, 6 Feb 2025 10:56:56 +0530 Subject: [PATCH 2/5] handle error case for source exception Signed-off-by: adarsh0728 --- .../numaflow/shared/ExceptionUtils.java | 24 ++++++++++++++ .../io/numaproj/numaflow/sourcer/Service.java | 32 +++++++++++++------ .../TransformSupervisorActor.java | 20 ++---------- 3 files changed, 49 insertions(+), 27 deletions(-) create mode 100644 src/main/java/io/numaproj/numaflow/shared/ExceptionUtils.java diff --git a/src/main/java/io/numaproj/numaflow/shared/ExceptionUtils.java b/src/main/java/io/numaproj/numaflow/shared/ExceptionUtils.java new file mode 100644 index 00000000..20a065e7 --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/shared/ExceptionUtils.java @@ -0,0 +1,24 @@ +package io.numaproj.numaflow.shared; + +import java.io.PrintWriter; +import java.io.StringWriter; + +public class ExceptionUtils { + /** + * Formalized exception error strings + */ + public static final String ERR_SOURCE_EXCEPTION = "UDF_EXECUTION_ERROR(source)"; + public static final String ERR_TRANSFORMER_EXCEPTION = "UDF_EXECUTION_ERROR(transformer)"; + + /** + * Converts the stack trace of an exception into a String. + * + * @param e the exception to extract the stack trace from + * @return the stack trace as a String + */ + public static String getStackTrace(Exception e) { + StringWriter sw = new StringWriter(); + e.printStackTrace(new PrintWriter(sw)); + return sw.toString(); + } +} diff --git a/src/main/java/io/numaproj/numaflow/sourcer/Service.java b/src/main/java/io/numaproj/numaflow/sourcer/Service.java index c65d3c66..559da750 100644 --- a/src/main/java/io/numaproj/numaflow/sourcer/Service.java +++ b/src/main/java/io/numaproj/numaflow/sourcer/Service.java @@ -1,8 +1,13 @@ package io.numaproj.numaflow.sourcer; +import com.google.protobuf.Any; import com.google.protobuf.Empty; +import com.google.rpc.Code; +import com.google.rpc.DebugInfo; import io.grpc.Status; +import io.grpc.protobuf.StatusProto; import io.grpc.stub.StreamObserver; +import io.numaproj.numaflow.shared.ExceptionUtils; import io.numaproj.numaflow.source.v1.SourceGrpc; import io.numaproj.numaflow.source.v1.SourceOuterClass; import lombok.AllArgsConstructor; @@ -15,7 +20,6 @@ import static io.numaproj.numaflow.source.v1.SourceGrpc.getPendingFnMethod; - /** * Implementation of the gRPC service for the sourcer. */ @@ -31,7 +35,8 @@ class Service extends SourceGrpc.SourceImplBase { * @param responseObserver the response observer */ @Override - public StreamObserver readFn(final StreamObserver responseObserver) { + public StreamObserver readFn( + final StreamObserver responseObserver) { return new StreamObserver<>() { private boolean handshakeDone = false; @@ -77,12 +82,18 @@ public void onNext(SourceOuterClass.ReadRequest request) { responseObserver.onNext(response); } catch (Exception e) { - log.error("Encountered error in readFn onNext - {}", e.getMessage()); + String stackTrace = ExceptionUtils.getStackTrace(e); + log.error("Encountered error in readFn onNext - {} {}", e.getMessage(), stackTrace); shutdownSignal.completeExceptionally(e); - responseObserver.onError(Status.INTERNAL - .withDescription(e.getMessage()) - .withCause(e) - .asException()); + // Build gRPC Status [error] + com.google.rpc.Status status = com.google.rpc.Status.newBuilder() + .setCode(Code.INTERNAL.getNumber()) + .setMessage(ExceptionUtils.ERR_SOURCE_EXCEPTION + ": " + (e.getMessage() != null ? e.getMessage() : "")) + .addDetails(Any.pack(DebugInfo.newBuilder() + .setDetail(stackTrace) + .build())) + .build(); + responseObserver.onError(StatusProto.toStatusRuntimeException(status)); } } @@ -200,7 +211,8 @@ public void pendingFn( SourceOuterClass.PendingResponse.Result .newBuilder() .setCount(this.sourcer.getPending()) - .build()).build()); + .build()) + .build()); responseObserver.onCompleted(); } @@ -235,8 +247,8 @@ public void partitionsFn( responseObserver.onNext(SourceOuterClass.PartitionsResponse.newBuilder() .setResult( SourceOuterClass.PartitionsResponse.Result.newBuilder() - .addAllPartitions(partitions)). - build()); + .addAllPartitions(partitions)) + .build()); responseObserver.onCompleted(); } } diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java index d91b2558..76194b80 100644 --- a/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java +++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java @@ -14,11 +14,10 @@ import com.google.rpc.Code; import com.google.rpc.DebugInfo; import io.grpc.protobuf.StatusProto; +import io.numaproj.numaflow.shared.ExceptionUtils; import io.numaproj.numaflow.sourcetransformer.v1.Sourcetransformer; import lombok.extern.slf4j.Slf4j; -import java.io.PrintWriter; -import java.io.StringWriter; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -55,7 +54,6 @@ class TransformSupervisorActor extends AbstractActor { private final CompletableFuture shutdownSignal; private int activeTransformersCount; private Exception userException; - private static final String ERR_TRANSFORMER_EXCEPTION = "UDF_EXECUTION_ERROR(transformer)"; /** * Constructor for TransformSupervisorActor. @@ -143,7 +141,7 @@ public Receive createReceive() { * @param e The exception to be handled. */ private void handleFailure(Exception e) { - String stackTrace = getStackTrace(e); + String stackTrace = ExceptionUtils.getStackTrace(e); log.error("Exception in sourceTransformFn: {} {}", e.getMessage(), stackTrace); if (userException == null) { userException = e; @@ -153,7 +151,7 @@ private void handleFailure(Exception e) { // Build gRPC Status [error] com.google.rpc.Status status = com.google.rpc.Status.newBuilder() .setCode(Code.INTERNAL.getNumber()) - .setMessage(ERR_TRANSFORMER_EXCEPTION + ": " + (e.getMessage() != null ? e.getMessage() : "")) + .setMessage(ExceptionUtils.ERR_TRANSFORMER_EXCEPTION + ": " + (e.getMessage() != null ? e.getMessage() : "")) .addDetails(Any.pack(DebugInfo.newBuilder() .setDetail(stackTrace) .build())) @@ -163,18 +161,6 @@ private void handleFailure(Exception e) { activeTransformersCount--; } - /** - * Converts the stack trace of an exception into a String. - * - * @param e the exception to extract the stack trace from - * @return the stack trace as a String - */ - private String getStackTrace(Exception e) { - StringWriter sw = new StringWriter(); - e.printStackTrace(new PrintWriter(sw)); - return sw.toString(); - } - /** * Sends the SourceTransformResponse back to the client. * From 3da64a032a143132a5c5ffba2989b59198f1abda Mon Sep 17 00:00:00 2001 From: adarsh0728 Date: Thu, 6 Feb 2025 17:38:29 +0530 Subject: [PATCH 3/5] update: tests and gRPC errors for other components Signed-off-by: adarsh0728 --- .../numaflow/batchmapper/Service.java | 21 ++++++++++++++----- .../numaflow/mapper/MapSupervisorActor.java | 21 ++++++++++++++----- .../numaflow/mapstreamer/Service.java | 21 ++++++++++++++----- .../numaflow/shared/ExceptionUtils.java | 5 +++++ .../io/numaproj/numaflow/sinker/Service.java | 18 ++++++++++++++-- .../numaflow/batchmapper/ServerErrTest.java | 10 ++++++--- .../numaflow/mapper/ServerErrTest.java | 2 +- .../numaflow/mapstreamer/ServerErrTest.java | 2 +- 8 files changed, 78 insertions(+), 22 deletions(-) diff --git a/src/main/java/io/numaproj/numaflow/batchmapper/Service.java b/src/main/java/io/numaproj/numaflow/batchmapper/Service.java index 0d320a19..90c9093c 100644 --- a/src/main/java/io/numaproj/numaflow/batchmapper/Service.java +++ b/src/main/java/io/numaproj/numaflow/batchmapper/Service.java @@ -1,11 +1,16 @@ package io.numaproj.numaflow.batchmapper; +import com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.google.protobuf.Empty; +import com.google.rpc.Code; +import com.google.rpc.DebugInfo; import io.grpc.Status; +import io.grpc.protobuf.StatusProto; import io.grpc.stub.StreamObserver; import io.numaproj.numaflow.map.v1.MapGrpc; import io.numaproj.numaflow.map.v1.MapOuterClass; +import io.numaproj.numaflow.shared.ExceptionUtils; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -96,12 +101,18 @@ public void onNext(MapOuterClass.MapRequest mapRequest) { datumStream.writeMessage(constructHandlerDatum(mapRequest)); } } catch (Exception e) { - log.error("Encountered an error in batch map onNext - {}", e.getMessage()); + String stackTrace = ExceptionUtils.getStackTrace(e); + log.error("Exception in batch map onNext - {} {}", e.getMessage(), stackTrace); shutdownSignal.completeExceptionally(e); - responseObserver.onError(Status.INTERNAL - .withDescription(e.getMessage()) - .withCause(e) - .asException()); + // Build gRPC Status [error] + com.google.rpc.Status status = com.google.rpc.Status.newBuilder() + .setCode(Code.INTERNAL.getNumber()) + .setMessage(ExceptionUtils.ERR_BATCH_MAP_EXCEPTION + ": " + (e.getMessage() != null ? e.getMessage() : "")) + .addDetails(Any.pack(DebugInfo.newBuilder() + .setDetail(stackTrace) + .build())) + .build(); + responseObserver.onError(StatusProto.toStatusRuntimeException(status)); } } diff --git a/src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java b/src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java index 3ce663bd..f209a04f 100644 --- a/src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java +++ b/src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java @@ -9,8 +9,13 @@ import akka.japi.pf.DeciderBuilder; import akka.japi.pf.ReceiveBuilder; import io.grpc.Status; +import com.google.protobuf.Any; +import com.google.rpc.Code; +import com.google.rpc.DebugInfo; +import io.grpc.protobuf.StatusProto; import io.grpc.stub.StreamObserver; import io.numaproj.numaflow.map.v1.MapOuterClass; +import io.numaproj.numaflow.shared.ExceptionUtils; import lombok.extern.slf4j.Slf4j; import java.util.Optional; @@ -98,15 +103,21 @@ public Receive createReceive() { } private void handleFailure(Exception e) { - log.error("Encountered error in mapFn - {}", e.getMessage()); + String stackTrace = ExceptionUtils.getStackTrace(e); + log.error("Exception in mapFn - {} {}", e.getMessage(), stackTrace); if (userException == null) { userException = e; // only send the very first exception to the client // one exception should trigger a container restart - responseObserver.onError(Status.INTERNAL - .withDescription(e.getMessage()) - .withCause(e) - .asException()); + // Build gRPC Status [error] + com.google.rpc.Status status = com.google.rpc.Status.newBuilder() + .setCode(Code.INTERNAL.getNumber()) + .setMessage(ExceptionUtils.ERR_MAP_EXCEPTION + ": " + (e.getMessage() != null ? e.getMessage() : "")) + .addDetails(Any.pack(DebugInfo.newBuilder() + .setDetail(stackTrace) + .build())) + .build(); + responseObserver.onError(StatusProto.toStatusRuntimeException(status)); } activeMapperCount--; } diff --git a/src/main/java/io/numaproj/numaflow/mapstreamer/Service.java b/src/main/java/io/numaproj/numaflow/mapstreamer/Service.java index a0961439..ecb0c1ba 100644 --- a/src/main/java/io/numaproj/numaflow/mapstreamer/Service.java +++ b/src/main/java/io/numaproj/numaflow/mapstreamer/Service.java @@ -1,10 +1,15 @@ package io.numaproj.numaflow.mapstreamer; +import com.google.protobuf.Any; import com.google.protobuf.Empty; +import com.google.rpc.Code; +import com.google.rpc.DebugInfo; import io.grpc.Status; +import io.grpc.protobuf.StatusProto; import io.grpc.stub.StreamObserver; import io.numaproj.numaflow.map.v1.MapGrpc; import io.numaproj.numaflow.map.v1.MapOuterClass; +import io.numaproj.numaflow.shared.ExceptionUtils; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -59,12 +64,18 @@ public void onNext(MapOuterClass.MapRequest request) { constructHandlerDatum(request), new OutputObserverImpl(responseObserver)); } catch (Exception e) { - log.error("Encountered error in mapFn onNext - {}", e.getMessage()); + String stackTrace = ExceptionUtils.getStackTrace(e); + log.error("Exception in mapStreamFn onNext - {} {}", e.getMessage(), stackTrace); shutdownSignal.completeExceptionally(e); - responseObserver.onError(Status.INTERNAL - .withDescription(e.getMessage()) - .withCause(e) - .asException()); + // Build gRPC Status [error] + com.google.rpc.Status status = com.google.rpc.Status.newBuilder() + .setCode(Code.INTERNAL.getNumber()) + .setMessage(ExceptionUtils.ERR_MAP_STREAM_EXCEPTION + ": " + (e.getMessage() != null ? e.getMessage() : "")) + .addDetails(Any.pack(DebugInfo.newBuilder() + .setDetail(stackTrace) + .build())) + .build(); + responseObserver.onError(StatusProto.toStatusRuntimeException(status)); return; } diff --git a/src/main/java/io/numaproj/numaflow/shared/ExceptionUtils.java b/src/main/java/io/numaproj/numaflow/shared/ExceptionUtils.java index 20a065e7..6f698b36 100644 --- a/src/main/java/io/numaproj/numaflow/shared/ExceptionUtils.java +++ b/src/main/java/io/numaproj/numaflow/shared/ExceptionUtils.java @@ -9,6 +9,11 @@ public class ExceptionUtils { */ public static final String ERR_SOURCE_EXCEPTION = "UDF_EXECUTION_ERROR(source)"; public static final String ERR_TRANSFORMER_EXCEPTION = "UDF_EXECUTION_ERROR(transformer)"; + public static final String ERR_SINK_EXCEPTION = "UDF_EXECUTION_ERROR(sink)"; + public static final String ERR_MAP_STREAM_EXCEPTION = "UDF_EXECUTION_ERROR(mapstream)"; + public static final String ERR_MAP_EXCEPTION = "UDF_EXECUTION_ERROR(map)"; + public static final String ERR_BATCH_MAP_EXCEPTION = "UDF_EXECUTION_ERROR(batchmap)"; + /** * Converts the stack trace of an exception into a String. diff --git a/src/main/java/io/numaproj/numaflow/sinker/Service.java b/src/main/java/io/numaproj/numaflow/sinker/Service.java index 95f105b8..d360519d 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Service.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Service.java @@ -1,8 +1,13 @@ package io.numaproj.numaflow.sinker; +import com.google.protobuf.Any; import com.google.protobuf.Empty; +import com.google.rpc.Code; +import com.google.rpc.DebugInfo; import io.grpc.Status; +import io.grpc.protobuf.StatusProto; import io.grpc.stub.StreamObserver; +import io.numaproj.numaflow.shared.ExceptionUtils; import io.numaproj.numaflow.sink.v1.SinkGrpc; import io.numaproj.numaflow.sink.v1.SinkOuterClass; import lombok.AllArgsConstructor; @@ -95,9 +100,18 @@ public void onNext(SinkOuterClass.SinkRequest request) { datumStream.writeMessage(constructHandlerDatum(request)); } } catch (Exception e) { - log.error("Encountered error in sinkFn onNext - {}", e.getMessage()); + String stackTrace = ExceptionUtils.getStackTrace(e); + log.error("Exception in sinkFn onNext - {} {}", e.getMessage(), stackTrace); shutdownSignal.completeExceptionally(e); - responseObserver.onError(Status.INTERNAL.withDescription(e.getMessage()).asException()); + // Build gRPC Status [error] + com.google.rpc.Status status = com.google.rpc.Status.newBuilder() + .setCode(Code.INTERNAL.getNumber()) + .setMessage(ExceptionUtils.ERR_SINK_EXCEPTION + ": " + (e.getMessage() != null ? e.getMessage() : "")) + .addDetails(Any.pack(DebugInfo.newBuilder() + .setDetail(stackTrace) + .build())) + .build(); + responseObserver.onError(StatusProto.toStatusRuntimeException(status)); } } diff --git a/src/test/java/io/numaproj/numaflow/batchmapper/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/batchmapper/ServerErrTest.java index 23e11a5a..fc767870 100644 --- a/src/test/java/io/numaproj/numaflow/batchmapper/ServerErrTest.java +++ b/src/test/java/io/numaproj/numaflow/batchmapper/ServerErrTest.java @@ -17,6 +17,8 @@ import java.util.concurrent.ExecutionException; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class ServerErrTest { @@ -85,9 +87,11 @@ public void testErrorFromUDF() { outputStreamObserver.done.get(); fail("Expected exception not thrown"); } catch (InterruptedException | ExecutionException e) { - assertEquals( - "INTERNAL: java.lang.RuntimeException: unknown exception", - e.getCause().getMessage()); + String expectedSubstring = "UDF_EXECUTION_ERROR(batchmap)"; + String actualMessage = e.getMessage(); + assertNotNull("Error message should not be null", actualMessage); + assertTrue("Expected substring '" + expectedSubstring + "' not found in error message: " + actualMessage, + actualMessage.contains(expectedSubstring)); } } diff --git a/src/test/java/io/numaproj/numaflow/mapper/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/mapper/ServerErrTest.java index 3fc18f4d..8fa1dece 100644 --- a/src/test/java/io/numaproj/numaflow/mapper/ServerErrTest.java +++ b/src/test/java/io/numaproj/numaflow/mapper/ServerErrTest.java @@ -127,7 +127,7 @@ public void testMapperFailure() { fail("Expected exception not thrown"); } catch (Exception e) { assertEquals( - "io.grpc.StatusRuntimeException: INTERNAL: unknown exception", + "io.grpc.StatusRuntimeException: INTERNAL: UDF_EXECUTION_ERROR(map): unknown exception", e.getMessage()); } } diff --git a/src/test/java/io/numaproj/numaflow/mapstreamer/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/mapstreamer/ServerErrTest.java index 43355d04..47d5da94 100644 --- a/src/test/java/io/numaproj/numaflow/mapstreamer/ServerErrTest.java +++ b/src/test/java/io/numaproj/numaflow/mapstreamer/ServerErrTest.java @@ -79,7 +79,7 @@ public void TestMapStreamerErr() { fail("Should have thrown an exception"); } catch (Exception e) { assertEquals( - "io.grpc.StatusRuntimeException: INTERNAL: unknown exception", + "io.grpc.StatusRuntimeException: INTERNAL: UDF_EXECUTION_ERROR(mapstream): unknown exception", e.getMessage()); } } From 42972db5c7afee3eee45f49bdda6094560ee3834 Mon Sep 17 00:00:00 2001 From: adarsh0728 Date: Sun, 9 Feb 2025 18:19:47 +0530 Subject: [PATCH 4/5] revert sideInput case, handle in sep PR Signed-off-by: adarsh0728 --- .../numaflow/batchmapper/Service.java | 2 +- .../numaflow/mapper/MapSupervisorActor.java | 2 +- .../numaflow/mapstreamer/Service.java | 2 +- .../numaproj/numaflow/sideinput/Server.java | 26 ++------------- .../numaproj/numaflow/sideinput/Service.java | 33 +++---------------- .../io/numaproj/numaflow/sinker/Service.java | 2 +- .../io/numaproj/numaflow/sourcer/Service.java | 2 +- .../TransformSupervisorActor.java | 2 +- 8 files changed, 13 insertions(+), 58 deletions(-) diff --git a/src/main/java/io/numaproj/numaflow/batchmapper/Service.java b/src/main/java/io/numaproj/numaflow/batchmapper/Service.java index 6df94f96..d9764cbd 100644 --- a/src/main/java/io/numaproj/numaflow/batchmapper/Service.java +++ b/src/main/java/io/numaproj/numaflow/batchmapper/Service.java @@ -103,7 +103,7 @@ public void onNext(MapOuterClass.MapRequest mapRequest) { } catch (Exception e) { log.error("Encountered an error in batch map onNext", e); shutdownSignal.completeExceptionally(e); - // Build gRPC Status [error] + // Build gRPC Status com.google.rpc.Status status = com.google.rpc.Status.newBuilder() .setCode(Code.INTERNAL.getNumber()) .setMessage(ExceptionUtils.ERR_BATCH_MAP_EXCEPTION + ": " + (e.getMessage() != null ? e.getMessage() : "")) diff --git a/src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java b/src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java index 153bc06a..4d0cfb2d 100644 --- a/src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java +++ b/src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java @@ -111,7 +111,7 @@ private void handleFailure(Exception e) { userException = e; // only send the very first exception to the client // one exception should trigger a container restart - // Build gRPC Status [error] + // Build gRPC Status com.google.rpc.Status status = com.google.rpc.Status.newBuilder() .setCode(Code.INTERNAL.getNumber()) .setMessage(ExceptionUtils.ERR_MAP_EXCEPTION + ": " + (e.getMessage() != null ? e.getMessage() : "")) diff --git a/src/main/java/io/numaproj/numaflow/mapstreamer/Service.java b/src/main/java/io/numaproj/numaflow/mapstreamer/Service.java index 7ca76522..2302245f 100644 --- a/src/main/java/io/numaproj/numaflow/mapstreamer/Service.java +++ b/src/main/java/io/numaproj/numaflow/mapstreamer/Service.java @@ -66,7 +66,7 @@ public void onNext(MapOuterClass.MapRequest request) { } catch (Exception e) { log.error("Encountered error in mapFn onNext", e); shutdownSignal.completeExceptionally(e); - // Build gRPC Status [error] + // Build gRPC Status com.google.rpc.Status status = com.google.rpc.Status.newBuilder() .setCode(Code.INTERNAL.getNumber()) .setMessage(ExceptionUtils.ERR_MAP_STREAM_EXCEPTION + ": " + (e.getMessage() != null ? e.getMessage() : "")) diff --git a/src/main/java/io/numaproj/numaflow/sideinput/Server.java b/src/main/java/io/numaproj/numaflow/sideinput/Server.java index ce02bcc1..be4394e2 100644 --- a/src/main/java/io/numaproj/numaflow/sideinput/Server.java +++ b/src/main/java/io/numaproj/numaflow/sideinput/Server.java @@ -10,8 +10,6 @@ import io.numaproj.numaflow.shared.GrpcServerWrapper; import lombok.extern.slf4j.Slf4j; -import java.util.concurrent.CompletableFuture; - /** * Server is the gRPC server for retrieving side input. */ @@ -19,7 +17,6 @@ public class Server { private final GRPCConfig grpcConfig; - private final CompletableFuture shutdownSignal; private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper()); private final GrpcServerWrapper server; @@ -39,20 +36,18 @@ public Server(SideInputRetriever sideInputRetriever) { * @param sideInputRetriever to retrieve the side input */ public Server(SideInputRetriever sideInputRetriever, GRPCConfig grpcConfig) { - this.shutdownSignal = new CompletableFuture<>(); this.grpcConfig = grpcConfig; - this.server = new GrpcServerWrapper(this.grpcConfig, new Service(sideInputRetriever, this.shutdownSignal)); + this.server = new GrpcServerWrapper(this.grpcConfig, new Service(sideInputRetriever)); } @VisibleForTesting protected Server(GRPCConfig grpcConfig, SideInputRetriever service, ServerInterceptor interceptor, String serverName) { - this.shutdownSignal = new CompletableFuture<>(); this.grpcConfig = grpcConfig; this.server = new GrpcServerWrapper( interceptor, serverName, - new Service(service, this.shutdownSignal)); + new Service(service)); } /** @@ -77,8 +72,7 @@ public void start() throws Exception { // register shutdown hook to gracefully shut down the server Runtime.getRuntime().addShutdownHook(new Thread(() -> { - // Use stderr here since the logger may have been reset by its JVM shutdown - // hook. + // Use stderr here since the logger may have been reset by its JVM shutdown hook. try { this.stop(); } catch (InterruptedException e) { @@ -86,20 +80,6 @@ public void start() throws Exception { e.printStackTrace(System.err); } })); - - // if there are any exceptions, shutdown the server gracefully. - this.shutdownSignal.whenCompleteAsync((v, e) -> { // Add this block - if (e != null) { - System.err.println( - "*** shutting down side input gRPC server because of an exception - " + e.getMessage()); - try { - this.stop(); - } catch (InterruptedException ex) { - Thread.interrupted(); - ex.printStackTrace(System.err); - } - } - }); } /** diff --git a/src/main/java/io/numaproj/numaflow/sideinput/Service.java b/src/main/java/io/numaproj/numaflow/sideinput/Service.java index b9c62f19..d503ec39 100644 --- a/src/main/java/io/numaproj/numaflow/sideinput/Service.java +++ b/src/main/java/io/numaproj/numaflow/sideinput/Service.java @@ -1,10 +1,6 @@ package io.numaproj.numaflow.sideinput; -import com.google.protobuf.Any; import com.google.protobuf.Empty; -import com.google.rpc.Code; -import com.google.rpc.DebugInfo; -import io.grpc.protobuf.StatusProto; import io.numaproj.numaflow.shared.ExceptionUtils; import io.grpc.stub.StreamObserver; import com.google.protobuf.ByteString; @@ -13,14 +9,11 @@ import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; -import java.util.concurrent.CompletableFuture; - @Slf4j @AllArgsConstructor class Service extends SideInputGrpc.SideInputImplBase { private final SideInputRetriever sideInputRetriever; - private final CompletableFuture shutdownSignal; /** * Invokes the side input retriever to retrieve side input. @@ -36,28 +29,10 @@ public void retrieveSideInput( responseObserver); return; } - try { - // process request - Message message = sideInputRetriever.retrieveSideInput(); - // set response - responseObserver.onNext(buildResponse(message)); - - } catch (Exception e) { - log.error("Encountered error in retrieveSideInput", e); - shutdownSignal.completeExceptionally(e); - // Build gRPC Status [error] - com.google.rpc.Status status = com.google.rpc.Status.newBuilder() - .setCode(Code.INTERNAL.getNumber()) - .setMessage( - ExceptionUtils.ERR_SIDE_INPUT_EXCEPTION + ": " - + (e.getMessage() != null ? e.getMessage() : "")) - .addDetails(Any.pack(DebugInfo.newBuilder() - .setDetail(ExceptionUtils.getStackTrace(e)) - .build())) - .build(); - responseObserver.onError(StatusProto.toStatusRuntimeException(status)); - return; - } + // process request + Message message = sideInputRetriever.retrieveSideInput(); + // set response + responseObserver.onNext(buildResponse(message)); responseObserver.onCompleted(); } diff --git a/src/main/java/io/numaproj/numaflow/sinker/Service.java b/src/main/java/io/numaproj/numaflow/sinker/Service.java index ac4bd90c..7d334b18 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Service.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Service.java @@ -105,7 +105,7 @@ public void onNext(SinkOuterClass.SinkRequest request) { responseObserver.onError(Status.INTERNAL .withDescription(e.getMessage()) .asException()); - // Build gRPC Status [error] + // Build gRPC Status com.google.rpc.Status status = com.google.rpc.Status.newBuilder() .setCode(Code.INTERNAL.getNumber()) .setMessage(ExceptionUtils.ERR_SINK_EXCEPTION + ": " + (e.getMessage() != null ? e.getMessage() : "")) diff --git a/src/main/java/io/numaproj/numaflow/sourcer/Service.java b/src/main/java/io/numaproj/numaflow/sourcer/Service.java index 7ad1b3b5..01e1aba5 100644 --- a/src/main/java/io/numaproj/numaflow/sourcer/Service.java +++ b/src/main/java/io/numaproj/numaflow/sourcer/Service.java @@ -85,7 +85,7 @@ public void onNext(SourceOuterClass.ReadRequest request) { } catch (Exception e) { log.error("Encountered error in readFn onNext", e); shutdownSignal.completeExceptionally(e); - // Build gRPC Status [error] + // Build gRPC Status com.google.rpc.Status status = com.google.rpc.Status.newBuilder() .setCode(Code.INTERNAL.getNumber()) .setMessage(ExceptionUtils.ERR_SOURCE_EXCEPTION + ": " + (e.getMessage() != null ? e.getMessage() : "")) diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java index 1b7093e9..fd1c3b25 100644 --- a/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java +++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java @@ -150,7 +150,7 @@ private void handleFailure(Exception e) { // only send the very first exception to the client // one exception should trigger a container restart - // Build gRPC Status [error] + // Build gRPC Status com.google.rpc.Status status = com.google.rpc.Status.newBuilder() .setCode(Code.INTERNAL.getNumber()) .setMessage(ExceptionUtils.ERR_TRANSFORMER_EXCEPTION + ": " + (e.getMessage() != null ? e.getMessage() : "")) From 60a3386653d743c911e396c74444eec7f7ae2bb1 Mon Sep 17 00:00:00 2001 From: adarsh0728 Date: Sun, 9 Feb 2025 20:36:41 +0530 Subject: [PATCH 5/5] test for stacktrace fn Signed-off-by: adarsh0728 --- .../numaflow/shared/ExceptionUtils.java | 1 - .../numaproj/numaflow/sideinput/Service.java | 1 - .../numaflow/shared/ExceptionUtilsTest.java | 23 +++++++++++++++++++ 3 files changed, 23 insertions(+), 2 deletions(-) create mode 100644 src/test/java/io/numaproj/numaflow/shared/ExceptionUtilsTest.java diff --git a/src/main/java/io/numaproj/numaflow/shared/ExceptionUtils.java b/src/main/java/io/numaproj/numaflow/shared/ExceptionUtils.java index fd53b207..641d660b 100644 --- a/src/main/java/io/numaproj/numaflow/shared/ExceptionUtils.java +++ b/src/main/java/io/numaproj/numaflow/shared/ExceptionUtils.java @@ -13,7 +13,6 @@ public class ExceptionUtils { public static final String ERR_MAP_STREAM_EXCEPTION = "UDF_EXECUTION_ERROR(mapstream)"; public static final String ERR_MAP_EXCEPTION = "UDF_EXECUTION_ERROR(map)"; public static final String ERR_BATCH_MAP_EXCEPTION = "UDF_EXECUTION_ERROR(batchmap)"; - public static final String ERR_SIDE_INPUT_EXCEPTION = "UDF_EXECUTION_ERROR(sideinput)"; /** * Converts the stack trace of an exception into a String. diff --git a/src/main/java/io/numaproj/numaflow/sideinput/Service.java b/src/main/java/io/numaproj/numaflow/sideinput/Service.java index d503ec39..b558e10a 100644 --- a/src/main/java/io/numaproj/numaflow/sideinput/Service.java +++ b/src/main/java/io/numaproj/numaflow/sideinput/Service.java @@ -1,7 +1,6 @@ package io.numaproj.numaflow.sideinput; import com.google.protobuf.Empty; -import io.numaproj.numaflow.shared.ExceptionUtils; import io.grpc.stub.StreamObserver; import com.google.protobuf.ByteString; import io.numaproj.numaflow.sideinput.v1.SideInputGrpc; diff --git a/src/test/java/io/numaproj/numaflow/shared/ExceptionUtilsTest.java b/src/test/java/io/numaproj/numaflow/shared/ExceptionUtilsTest.java new file mode 100644 index 00000000..5dc464a9 --- /dev/null +++ b/src/test/java/io/numaproj/numaflow/shared/ExceptionUtilsTest.java @@ -0,0 +1,23 @@ +package io.numaproj.numaflow.shared; + +import org.junit.Test; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; + +public class ExceptionUtilsTest { + + @Test + public void testGetStackTrace_NullException() { + String result = ExceptionUtils.getStackTrace(null); + assertEquals("No exception provided.", result); + } + + @Test + public void testGetStackTrace_ValidException() { + Exception exception = new Exception("Test exception"); + String result = ExceptionUtils.getStackTrace(exception); + assertTrue(result.contains("Test exception")); + assertTrue(result.contains("ExceptionUtilsTest.testGetStackTrace_ValidException")); + } + +}