From 07f977b347aa181c450c2a17e8ba2a66cc1a4fce Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Thu, 11 Jan 2024 10:27:57 -0500 Subject: [PATCH] . Signed-off-by: Keran Yang --- .../numaflow/reducer/ReduceSupervisorActor.java | 4 ++++ .../io/numaproj/numaflow/reducer/ServerTest.java | 12 ++++++------ 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/src/main/java/io/numaproj/numaflow/reducer/ReduceSupervisorActor.java b/src/main/java/io/numaproj/numaflow/reducer/ReduceSupervisorActor.java index 23774709..f39fd6fc 100644 --- a/src/main/java/io/numaproj/numaflow/reducer/ReduceSupervisorActor.java +++ b/src/main/java/io/numaproj/numaflow/reducer/ReduceSupervisorActor.java @@ -93,6 +93,8 @@ private void invokeActors(ReduceOuterClass.ReduceRequest reduceRequest) { ReduceOuterClass.ReduceRequest.Payload payload = reduceRequest.getPayload(); String[] keys = payload.getKeysList().toArray(new String[0]); // TODO - do we need to include window information in the keyStr? + // for aligned reducer, there is always single window. + // but at the same time, would like to be consistent with GO SDK implementation. String keyStr = String.join(Constants.DELIMITER, keys); if (!actorsMap.containsKey(keyStr)) { Reducer reduceHandler = reducerFactory.createReducer(); @@ -122,6 +124,8 @@ private void responseListener(ActorResponse actorResponse) { responseObserver.onNext(actorResponse.getResponse()); // TODO - do we need to include window information for aligned windows? + // for aligned reducer, there is always single window. + // but at the same time, would like to be consistent with GO SDK implementation. actorsMap.remove(String.join(Constants.DELIMITER, actorResponse.getKeys())); if (actorsMap.isEmpty()) { responseObserver.onCompleted(); diff --git a/src/test/java/io/numaproj/numaflow/reducer/ServerTest.java b/src/test/java/io/numaproj/numaflow/reducer/ServerTest.java index 4df6425a..8a83787d 100644 --- a/src/test/java/io/numaproj/numaflow/reducer/ServerTest.java +++ b/src/test/java/io/numaproj/numaflow/reducer/ServerTest.java @@ -17,7 +17,6 @@ import io.numaproj.numaflow.reduce.v1.ReduceOuterClass; import io.numaproj.numaflow.shared.GrpcServerUtils; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -27,6 +26,7 @@ import static io.numaproj.numaflow.shared.GrpcServerUtils.WIN_END_KEY; import static io.numaproj.numaflow.shared.GrpcServerUtils.WIN_START_KEY; +import static org.junit.Assert.assertEquals; public class ServerTest { public static final Metadata.Key DATUM_METADATA_WIN_START = io.grpc.Metadata.Key.of( @@ -123,8 +123,8 @@ public void given_inputReduceRequestsShareSameKey_when_serverStarts_then_allRequ ByteString expectedValue = ByteString.copyFromUtf8(String.valueOf(55)); while (!outputStreamObserver.completed.get()) ; - Assert.assertEquals(1, outputStreamObserver.resultDatum.get().size()); - Assert.assertEquals( + assertEquals(1, outputStreamObserver.resultDatum.get().size()); + assertEquals( expectedKeys, outputStreamObserver.resultDatum .get() @@ -132,7 +132,7 @@ public void given_inputReduceRequestsShareSameKey_when_serverStarts_then_allRequ .getResult() .getKeysList() .toArray(new String[0])); - Assert.assertEquals( + assertEquals( expectedValue, outputStreamObserver.resultDatum .get() @@ -180,9 +180,9 @@ public void given_inputReduceRequestsHaveDifferentKeySets_when_serverStarts_then while (!outputStreamObserver.completed.get()) ; List result = outputStreamObserver.resultDatum.get(); // the outputStreamObserver should have observed keyCount responses, each of which has value 55. - Assert.assertEquals(keyCount, result.size()); + assertEquals(keyCount, result.size()); result.forEach(response -> { - Assert.assertEquals(expectedValue, response.getResult().getValue()); + assertEquals(expectedValue, response.getResult().getValue()); }); } }