diff --git a/src/main/java/io/numaproj/numaflow/reducer/ActorResponse.java b/src/main/java/io/numaproj/numaflow/reducer/ActorResponse.java index 57b1af8f..3833ef75 100644 --- a/src/main/java/io/numaproj/numaflow/reducer/ActorResponse.java +++ b/src/main/java/io/numaproj/numaflow/reducer/ActorResponse.java @@ -10,7 +10,7 @@ @Getter @AllArgsConstructor class ActorResponse { - // TODO - do we need keys? they seem already present in the ReduceResponse + // FIXME - with the latest proto update, each response has a single set of keys, hence we can remove the keys field here. String[] keys; ReduceOuterClass.ReduceResponse response; } diff --git a/src/main/java/io/numaproj/numaflow/reducer/ReduceActor.java b/src/main/java/io/numaproj/numaflow/reducer/ReduceActor.java index 5b4eb3a2..0d6e045b 100644 --- a/src/main/java/io/numaproj/numaflow/reducer/ReduceActor.java +++ b/src/main/java/io/numaproj/numaflow/reducer/ReduceActor.java @@ -50,7 +50,6 @@ private void getResult(String eof) { } private ActorResponse buildActorResponse(Message message) { - // TODO(nit) - this transformation is complex, we can create a local model of ReduceResponse and handle transformation between gRPC response and ReduceResponse at a central place. ReduceOuterClass.ReduceResponse.Builder responseBuilder = ReduceOuterClass.ReduceResponse.newBuilder(); // set the window using the actor metadata. responseBuilder.setWindow(ReduceOuterClass.Window.newBuilder() diff --git a/src/main/java/io/numaproj/numaflow/reducer/ReduceSupervisorActor.java b/src/main/java/io/numaproj/numaflow/reducer/ReduceSupervisorActor.java index 198a7b2e..23774709 100644 --- a/src/main/java/io/numaproj/numaflow/reducer/ReduceSupervisorActor.java +++ b/src/main/java/io/numaproj/numaflow/reducer/ReduceSupervisorActor.java @@ -92,8 +92,8 @@ public Receive createReceive() { private void invokeActors(ReduceOuterClass.ReduceRequest reduceRequest) { ReduceOuterClass.ReduceRequest.Payload payload = reduceRequest.getPayload(); String[] keys = payload.getKeysList().toArray(new String[0]); + // TODO - do we need to include window information in the keyStr? String keyStr = String.join(Constants.DELIMITER, keys); - System.out.println("kerantest keyStr: " + keyStr); if (!actorsMap.containsKey(keyStr)) { Reducer reduceHandler = reducerFactory.createReducer(); ActorRef actorRef = getContext() @@ -103,7 +103,6 @@ private void invokeActors(ReduceOuterClass.ReduceRequest reduceRequest) { HandlerDatum handlerDatum = constructHandlerDatum(payload); actorsMap.get(keyStr).tell(handlerDatum, getSelf()); - System.out.println("kerantest number of actors: " + actorsMap.size()); } private void sendEOF(String EOF) { diff --git a/src/test/java/io/numaproj/numaflow/reducer/ReduceTestFactory.java b/src/test/java/io/numaproj/numaflow/reducer/ReduceTestFactory.java index ee79e880..f983a221 100644 --- a/src/test/java/io/numaproj/numaflow/reducer/ReduceTestFactory.java +++ b/src/test/java/io/numaproj/numaflow/reducer/ReduceTestFactory.java @@ -19,7 +19,6 @@ public void addMessage(String[] keys, Datum datum, Metadata md) { @Override public MessageList getOutput(String[] keys, Metadata md) { - System.out.println("returning number: " + sum); String[] updatedKeys = Arrays .stream(keys) .map(c -> c + "-processed") diff --git a/src/test/java/io/numaproj/numaflow/reducer/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/reducer/ServerErrTest.java index 4c2772cb..33f42655 100644 --- a/src/test/java/io/numaproj/numaflow/reducer/ServerErrTest.java +++ b/src/test/java/io/numaproj/numaflow/reducer/ServerErrTest.java @@ -135,8 +135,6 @@ public void given_reducerThrows_when_serverRuns_then_outputStreamContainsThrowab try { t.join(); } catch (InterruptedException e) { - // the thread should always finish successfully with test assertion passing. - // if not, fail the test. fail("Thread got interrupted before test assertion."); } } diff --git a/src/test/java/io/numaproj/numaflow/reducer/ServerTest.java b/src/test/java/io/numaproj/numaflow/reducer/ServerTest.java index 95174923..4df6425a 100644 --- a/src/test/java/io/numaproj/numaflow/reducer/ServerTest.java +++ b/src/test/java/io/numaproj/numaflow/reducer/ServerTest.java @@ -158,7 +158,7 @@ public void given_inputReduceRequestsHaveDifferentKeySets_when_serverStarts_then .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)) .reduceFn(outputStreamObserver); - // send messages with 3 different keys + // send messages with keyCount different keys for (int j = 0; j < keyCount; j++) { for (int i = 1; i <= 10; i++) { ReduceOuterClass.ReduceRequest request = ReduceOuterClass.ReduceRequest