Skip to content

Commit

Permalink
small clean up
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang committed Jan 10, 2024
1 parent 7a8f856 commit 1f20979
Show file tree
Hide file tree
Showing 6 changed files with 3 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
@Getter
@AllArgsConstructor
class ActorResponse {
// TODO - do we need keys? they seem already present in the ReduceResponse
// FIXME - with the latest proto update, each response has a single set of keys, hence we can remove the keys field here.
String[] keys;
ReduceOuterClass.ReduceResponse response;
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ private void getResult(String eof) {
}

private ActorResponse buildActorResponse(Message message) {
// TODO(nit) - this transformation is complex, we can create a local model of ReduceResponse and handle transformation between gRPC response and ReduceResponse at a central place.
ReduceOuterClass.ReduceResponse.Builder responseBuilder = ReduceOuterClass.ReduceResponse.newBuilder();
// set the window using the actor metadata.
responseBuilder.setWindow(ReduceOuterClass.Window.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ public Receive createReceive() {
private void invokeActors(ReduceOuterClass.ReduceRequest reduceRequest) {
ReduceOuterClass.ReduceRequest.Payload payload = reduceRequest.getPayload();
String[] keys = payload.getKeysList().toArray(new String[0]);
// TODO - do we need to include window information in the keyStr?
String keyStr = String.join(Constants.DELIMITER, keys);
System.out.println("kerantest keyStr: " + keyStr);
if (!actorsMap.containsKey(keyStr)) {
Reducer reduceHandler = reducerFactory.createReducer();
ActorRef actorRef = getContext()
Expand All @@ -103,7 +103,6 @@ private void invokeActors(ReduceOuterClass.ReduceRequest reduceRequest) {

HandlerDatum handlerDatum = constructHandlerDatum(payload);
actorsMap.get(keyStr).tell(handlerDatum, getSelf());
System.out.println("kerantest number of actors: " + actorsMap.size());
}

private void sendEOF(String EOF) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ public void addMessage(String[] keys, Datum datum, Metadata md) {

@Override
public MessageList getOutput(String[] keys, Metadata md) {
System.out.println("returning number: " + sum);
String[] updatedKeys = Arrays
.stream(keys)
.map(c -> c + "-processed")
Expand Down
2 changes: 0 additions & 2 deletions src/test/java/io/numaproj/numaflow/reducer/ServerErrTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,6 @@ public void given_reducerThrows_when_serverRuns_then_outputStreamContainsThrowab
try {
t.join();
} catch (InterruptedException e) {
// the thread should always finish successfully with test assertion passing.
// if not, fail the test.
fail("Thread got interrupted before test assertion.");
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/io/numaproj/numaflow/reducer/ServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public void given_inputReduceRequestsHaveDifferentKeySets_when_serverStarts_then
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata))
.reduceFn(outputStreamObserver);

// send messages with 3 different keys
// send messages with keyCount different keys
for (int j = 0; j < keyCount; j++) {
for (int i = 1; i <= 10; i++) {
ReduceOuterClass.ReduceRequest request = ReduceOuterClass.ReduceRequest
Expand Down

0 comments on commit 1f20979

Please sign in to comment.