From 2daf34c59ca4933092adb7628b95ce6ffe700c2e Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Thu, 14 Dec 2023 08:40:37 +0530 Subject: [PATCH] chore: adding partitions for user defined source (#88) Signed-off-by: Yashash H L --- .../examples/source/simple/SimpleSource.java | 14 +++++++--- .../io/numaproj/numaflow/sinker/Sinker.java | 1 + .../io/numaproj/numaflow/sourcer/Offset.java | 14 ++++++++-- .../io/numaproj/numaflow/sourcer/Service.java | 21 +++++++++++++++ .../io/numaproj/numaflow/sourcer/Sourcer.java | 27 +++++++++++++++++++ src/main/proto/source/v1/source.proto | 17 +++++++++++- .../numaflow/sourcer/ServerErrTest.java | 7 +++++ .../numaproj/numaflow/sourcer/ServerTest.java | 11 +++++++- 8 files changed, 104 insertions(+), 8 deletions(-) diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/source/simple/SimpleSource.java b/examples/src/main/java/io/numaproj/numaflow/examples/source/simple/SimpleSource.java index fffc73ab..575f302c 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/source/simple/SimpleSource.java +++ b/examples/src/main/java/io/numaproj/numaflow/examples/source/simple/SimpleSource.java @@ -1,5 +1,6 @@ package io.numaproj.numaflow.examples.source.simple; +import com.google.common.primitives.Longs; import io.numaproj.numaflow.sourcer.AckRequest; import io.numaproj.numaflow.sourcer.Message; import io.numaproj.numaflow.sourcer.Offset; @@ -8,8 +9,8 @@ import io.numaproj.numaflow.sourcer.Server; import io.numaproj.numaflow.sourcer.Sourcer; -import java.nio.ByteBuffer; import java.time.Instant; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -41,9 +42,9 @@ public void read(ReadRequest request, OutputObserver observer) { return; } // create a message with increasing offset - Offset offset = new Offset(ByteBuffer.allocate(4).putLong(readIndex).array(), "0"); + Offset offset = new Offset(Longs.toByteArray(readIndex)); Message message = new Message( - ByteBuffer.allocate(4).putLong(readIndex).array(), + Long.toString(readIndex).getBytes(), offset, Instant.now()); // send the message to the observer @@ -58,7 +59,7 @@ public void read(ReadRequest request, OutputObserver observer) { public void ack(AckRequest request) { // remove the acknowledged messages from the map for (Offset offset : request.getOffsets()) { - messages.remove(ByteBuffer.wrap(offset.getValue()).getLong()); + messages.remove(Longs.fromByteArray(offset.getValue())); } } @@ -67,4 +68,9 @@ public long getPending() { // pending messages will be zero for a simple source return 0; } + + @Override + public List getPartitions() { + return Sourcer.defaultPartitions(); + } } diff --git a/src/main/java/io/numaproj/numaflow/sinker/Sinker.java b/src/main/java/io/numaproj/numaflow/sinker/Sinker.java index 644d0db3..075cdec3 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Sinker.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Sinker.java @@ -14,6 +14,7 @@ public abstract class Sinker { * response list should be returned. * * @param datumStream stream of messages to be processed + * @return response list */ public abstract ResponseList processMessages(DatumIterator datumStream); } diff --git a/src/main/java/io/numaproj/numaflow/sourcer/Offset.java b/src/main/java/io/numaproj/numaflow/sourcer/Offset.java index 60c6df31..03adc15a 100644 --- a/src/main/java/io/numaproj/numaflow/sourcer/Offset.java +++ b/src/main/java/io/numaproj/numaflow/sourcer/Offset.java @@ -10,7 +10,7 @@ @Setter public class Offset { private final byte[] value; - private final String partitionId; + private final Integer partitionId; /** * used to create Offset with value and partitionId. @@ -18,8 +18,18 @@ public class Offset { * @param value offset value * @param partitionId offset partitionId */ - public Offset(byte[] value, String partitionId) { + public Offset(byte[] value, Integer partitionId) { this.value = value; this.partitionId = partitionId; } + + /** + * used to create Offset with value and default partitionId. + * + * @param value offset value + */ + public Offset(byte[] value) { + this.value = value; + this.partitionId = Sourcer.defaultPartitions().get(0); + } } diff --git a/src/main/java/io/numaproj/numaflow/sourcer/Service.java b/src/main/java/io/numaproj/numaflow/sourcer/Service.java index 38253252..e63ea969 100644 --- a/src/main/java/io/numaproj/numaflow/sourcer/Service.java +++ b/src/main/java/io/numaproj/numaflow/sourcer/Service.java @@ -122,4 +122,25 @@ public void isReady( responseObserver.onNext(SourceOuterClass.ReadyResponse.newBuilder().setReady(true).build()); responseObserver.onCompleted(); } + + @Override + public void partitionsFn( + Empty request, + StreamObserver responseObserver) { + + if (this.sourcer == null) { + io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall( + getPendingFnMethod(), + responseObserver); + return; + } + + List partitions = this.sourcer.getPartitions(); + responseObserver.onNext(SourceOuterClass.PartitionsResponse.newBuilder() + .setResult( + SourceOuterClass.PartitionsResponse.Result.newBuilder() + .addAllPartitions(partitions)). + build()); + responseObserver.onCompleted(); + } } diff --git a/src/main/java/io/numaproj/numaflow/sourcer/Sourcer.java b/src/main/java/io/numaproj/numaflow/sourcer/Sourcer.java index ecb9064c..26b262c2 100644 --- a/src/main/java/io/numaproj/numaflow/sourcer/Sourcer.java +++ b/src/main/java/io/numaproj/numaflow/sourcer/Sourcer.java @@ -1,5 +1,8 @@ package io.numaproj.numaflow.sourcer; +import java.util.Collections; +import java.util.List; + /** * Sourcer exposes method for reading messages from source. * Implementations should override the read method which will be used @@ -30,4 +33,28 @@ public abstract class Sourcer { * @return number of pending messages */ public abstract long getPending(); + + /** + * method returns the partitions associated with the source, will be used by the platform to determine + * the partitions to which the watermark should be published. If the source doesn't have partitions, + * `defaultPartitions()` can be used to return the default partitions. + * In most cases, the defaultPartitions() should be enough; the cases where we need to implement custom getPartitions() + * is in a case like Kafka, where a reader can read from multiple Kafka partitions. + * + * @return list of partitions + */ + public abstract List getPartitions(); + + /** + * method returns default partitions for the source. + * It can be used in the getPartitions() function of the Sourcer interface only + * if the source doesn't have partitions. DefaultPartition will be the pod replica + * index of the source. + * + * @return list of partitions + */ + public static List defaultPartitions() { + String partition = System.getenv().getOrDefault("NUMAFLOW_REPLICA", "0"); + return Collections.singletonList(Integer.parseInt(partition)); + } } diff --git a/src/main/proto/source/v1/source.proto b/src/main/proto/source/v1/source.proto index e2ec042b..b757fc1e 100644 --- a/src/main/proto/source/v1/source.proto +++ b/src/main/proto/source/v1/source.proto @@ -23,6 +23,9 @@ service Source { // PendingFn returns the number of pending records at the user defined source. rpc PendingFn(google.protobuf.Empty) returns (PendingResponse); + // PartitionsFn returns the list of partitions for the user defined source. + rpc PartitionsFn(google.protobuf.Empty) returns (PartitionsResponse); + // IsReady is the heartbeat endpoint for user defined source gRPC. rpc IsReady(google.protobuf.Empty) returns (ReadyResponse); } @@ -120,6 +123,18 @@ message PendingResponse { Result result = 1; } +/* + * PartitionsResponse is the response for the partitions request. + */ +message PartitionsResponse { + message Result { + // Required field holding the list of partitions. + repeated int32 partitions = 1; + } + // Required field holding the result. + Result result = 1; +} + /* * Offset is the offset of the datum. */ @@ -132,5 +147,5 @@ message Offset { // Optional partition_id indicates which partition of the source the datum belongs to. // It is useful for sources that have multiple partitions. e.g. Kafka. // If the partition_id is not specified, it is assumed that the source has a single partition. - string partition_id = 2; + int32 partition_id = 2; } diff --git a/src/test/java/io/numaproj/numaflow/sourcer/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/sourcer/ServerErrTest.java index 9ad142bf..9eb347a5 100644 --- a/src/test/java/io/numaproj/numaflow/sourcer/ServerErrTest.java +++ b/src/test/java/io/numaproj/numaflow/sourcer/ServerErrTest.java @@ -18,6 +18,8 @@ import org.junit.Rule; import org.junit.Test; +import java.util.List; + import static org.junit.Assert.assertEquals; public class ServerErrTest { @@ -124,6 +126,11 @@ public void ack(AckRequest request) { } + @Override + public List getPartitions() { + return Sourcer.defaultPartitions(); + } + @Override public long getPending() { throw new RuntimeException("unknown exception"); diff --git a/src/test/java/io/numaproj/numaflow/sourcer/ServerTest.java b/src/test/java/io/numaproj/numaflow/sourcer/ServerTest.java index 05fce365..03f9a0e7 100644 --- a/src/test/java/io/numaproj/numaflow/sourcer/ServerTest.java +++ b/src/test/java/io/numaproj/numaflow/sourcer/ServerTest.java @@ -92,6 +92,10 @@ public void TestSourcer() { var pending = stub.pendingFn(Empty.newBuilder().build()); assertEquals(5, pending.getResult().getCount()); + var partitions = stub.partitionsFn(Empty.newBuilder().build()); + assertEquals(1, partitions.getResult().getPartitionsCount()); + assertEquals(0, partitions.getResult().getPartitions(0)); + // ack the 5 messages var ackResponse = stub.ackFn(ackRequestBuilder.build()); assertEquals(Empty.newBuilder().build(), ackResponse.getResult().getSuccess()); @@ -137,7 +141,7 @@ public TestSourcer() { for (int i = 0; i < 10; i++) { messages.add(new Message( ByteBuffer.allocate(4).putInt(i).array(), - new Offset(ByteBuffer.allocate(4).putInt(i).array(), "0"), + new Offset(ByteBuffer.allocate(4).putInt(i).array(), 0), eventTime )); eventTime = eventTime.plusMillis(1000L); @@ -159,6 +163,11 @@ public void read(ReadRequest request, OutputObserver observer) { } } + @Override + public List getPartitions() { + return Sourcer.defaultPartitions(); + } + @Override public void ack(AckRequest request) { for (Offset offset : request.getOffsets()) {