diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java index 852892cd5c..8eb83f4dc2 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java @@ -119,9 +119,8 @@ public void doCheckpoint(CheckpointState checkpointState) { @Override public boolean isEmpty() { final boolean areTopicsEmpty = emptyCheckingConsumers.stream() - .allMatch(KafkaCustomConsumer::isEmpty); + .allMatch(KafkaCustomConsumer::isTopicEmpty); - // TODO: check Kafka topic is empty as well. return areTopicsEmpty && innerBuffer.isEmpty(); } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java index 6687bc363e..ee68e64c4c 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java @@ -531,7 +531,7 @@ final String getTopicPartitionOffset(final Map offsetMap, return Objects.isNull(offset) ? "-" : offset.toString(); } - public boolean isEmpty() { + public boolean isTopicEmpty() { final List partitions = consumer.partitionsFor(topicName); final List topicPartitions = partitions.stream() .map(partitionInfo -> new TopicPartition(topicName, partitionInfo.partition())) @@ -543,8 +543,6 @@ public boolean isEmpty() { for (TopicPartition topicPartition : topicPartitions) { final OffsetAndMetadata offsetAndMetadata = committedOffsets.get(topicPartition); final Long endOffset = endOffsets.get(topicPartition); - LOG.info("Partition {} offsets: endOffset: {}, committedOffset: {}", - topicPartition, endOffset, Objects.isNull(offsetAndMetadata) ? "-" : offsetAndMetadata.offset()); // If there is data in the partition if (endOffset != 0L) { diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java index 6747ab4894..64fb4c1681 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java @@ -25,6 +25,8 @@ import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaBufferConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; +import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumer; +import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumerFactory; import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducer; import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducerFactory; import org.opensearch.dataprepper.plugins.kafka.producer.ProducerWorker; @@ -33,6 +35,7 @@ import java.time.Duration; import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.Objects; import java.util.Random; import java.util.UUID; @@ -55,7 +58,9 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockConstruction; import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; import static org.opensearch.dataprepper.plugins.kafka.buffer.KafkaBuffer.EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT; @@ -103,6 +108,12 @@ class KafkaBufferTest { @Mock KafkaCustomProducer producer; + @Mock + private KafkaCustomConsumerFactory consumerFactory; + + @Mock + private KafkaCustomConsumer consumer; + @Mock BlockingBuffer> blockingBuffer; @@ -110,7 +121,10 @@ class KafkaBufferTest { private AwsCredentialsSupplier awsCredentialsSupplier; public KafkaBuffer> createObjectUnderTest() { + return createObjectUnderTest(List.of(consumer)); + } + public KafkaBuffer> createObjectUnderTest(final List consumers) { try ( final MockedStatic executorsMockedStatic = mockStatic(Executors.class); final MockedConstruction producerFactoryMock = @@ -118,6 +132,11 @@ public KafkaBuffer> createObjectUnderTest() { producerFactory = mock; when(producerFactory.createProducer(any() ,any(), any(), isNull(), isNull())).thenReturn(producer); }); + final MockedConstruction consumerFactoryMock = + mockConstruction(KafkaCustomConsumerFactory.class, (mock, context) -> { + consumerFactory = mock; + when(consumerFactory.createConsumersForTopic(any(), any(), any(), any(), any(), any(), any())).thenReturn(consumers); + }); final MockedConstruction blockingBufferMock = mockConstruction(BlockingBuffer.class, (mock, context) -> { blockingBuffer = mock; @@ -206,12 +225,100 @@ void test_kafkaBuffer_doWriteAll() throws Exception { } @Test - void test_kafkaBuffer_isEmpty() { + void test_kafkaBuffer_isEmpty_True() { + kafkaBuffer = createObjectUnderTest(); + assertTrue(Objects.nonNull(kafkaBuffer)); + when(blockingBuffer.isEmpty()).thenReturn(true); + when(consumer.isTopicEmpty()).thenReturn(true); + + final boolean result = kafkaBuffer.isEmpty(); + assertThat(result, equalTo(true)); + + verify(blockingBuffer).isEmpty(); + verify(consumer).isTopicEmpty(); + } + + @Test + void test_kafkaBuffer_isEmpty_BufferNotEmpty() { + kafkaBuffer = createObjectUnderTest(); + assertTrue(Objects.nonNull(kafkaBuffer)); + when(blockingBuffer.isEmpty()).thenReturn(false); + when(consumer.isTopicEmpty()).thenReturn(true); + + final boolean result = kafkaBuffer.isEmpty(); + assertThat(result, equalTo(false)); + + verify(blockingBuffer).isEmpty(); + verify(consumer).isTopicEmpty(); + } + + @Test + void test_kafkaBuffer_isEmpty_TopicNotEmpty() { kafkaBuffer = createObjectUnderTest(); assertTrue(Objects.nonNull(kafkaBuffer)); + when(blockingBuffer.isEmpty()).thenReturn(true); + when(consumer.isTopicEmpty()).thenReturn(false); + + final boolean result = kafkaBuffer.isEmpty(); + assertThat(result, equalTo(false)); + + verifyNoInteractions(blockingBuffer); + verify(consumer).isTopicEmpty(); + } + + @Test + void test_kafkaBuffer_isEmpty_MultipleTopics_AllNotEmpty() { + kafkaBuffer = createObjectUnderTest(List.of(consumer, consumer)); + assertTrue(Objects.nonNull(kafkaBuffer)); + when(blockingBuffer.isEmpty()).thenReturn(true); + when(consumer.isTopicEmpty()).thenReturn(false).thenReturn(false); + + final boolean result = kafkaBuffer.isEmpty(); + assertThat(result, equalTo(false)); + + verifyNoInteractions(blockingBuffer); + verify(consumer).isTopicEmpty(); + } + + @Test + void test_kafkaBuffer_isEmpty_MultipleTopics_SomeNotEmpty() { + kafkaBuffer = createObjectUnderTest(List.of(consumer, consumer)); + assertTrue(Objects.nonNull(kafkaBuffer)); + when(blockingBuffer.isEmpty()).thenReturn(true); + when(consumer.isTopicEmpty()).thenReturn(true).thenReturn(false); + + final boolean result = kafkaBuffer.isEmpty(); + assertThat(result, equalTo(false)); + + verifyNoInteractions(blockingBuffer); + verify(consumer, times(2)).isTopicEmpty(); + } + + @Test + void test_kafkaBuffer_isEmpty_MultipleTopics_AllEmpty() { + kafkaBuffer = createObjectUnderTest(List.of(consumer, consumer)); + assertTrue(Objects.nonNull(kafkaBuffer)); + when(blockingBuffer.isEmpty()).thenReturn(true); + when(consumer.isTopicEmpty()).thenReturn(true).thenReturn(true); + + final boolean result = kafkaBuffer.isEmpty(); + assertThat(result, equalTo(true)); + + verify(blockingBuffer).isEmpty(); + verify(consumer, times(2)).isTopicEmpty(); + } + + @Test + void test_kafkaBuffer_isEmpty_ZeroTopics() { + kafkaBuffer = createObjectUnderTest(Collections.emptyList()); + assertTrue(Objects.nonNull(kafkaBuffer)); + when(blockingBuffer.isEmpty()).thenReturn(true); + + final boolean result = kafkaBuffer.isEmpty(); + assertThat(result, equalTo(true)); - kafkaBuffer.isEmpty(); verify(blockingBuffer).isEmpty(); + verifyNoInteractions(consumer); } @Test diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java index 0d443e4413..8f2a4918f5 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java @@ -14,6 +14,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.RecordDeserializationException; import org.junit.jupiter.api.Assertions; @@ -42,26 +43,38 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasEntry; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyCollection; +import static org.mockito.ArgumentMatchers.anySet; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) public class KafkaCustomConsumerTest { + private static final String TOPIC_NAME = "topic1"; + private static final Random RANDOM = new Random(); @Mock private KafkaConsumer kafkaConsumer; @@ -79,6 +92,11 @@ public class KafkaCustomConsumerTest { @Mock private TopicConfig topicConfig; + @Mock + private PartitionInfo partitionInfo; + @Mock + private OffsetAndMetadata offsetAndMetadata; + @Mock private KafkaTopicMetrics topicMetrics; @@ -141,7 +159,7 @@ public void setUp() { sourceConfig = mock(KafkaSourceConfig.class); buffer = getBuffer(); shutdownInProgress = new AtomicBoolean(false); - when(topicConfig.getName()).thenReturn("topic1"); + when(topicConfig.getName()).thenReturn(TOPIC_NAME); } public KafkaCustomConsumer createObjectUnderTest(String schemaType, boolean acknowledgementsEnabled) { @@ -455,6 +473,146 @@ public void testAwsGlueErrorWithAcknowledgements() throws Exception { }); } + @Test + public void isTopicEmpty_OnePartition_IsEmpty() { + final Long offset = RANDOM.nextLong(); + final List topicPartitions = buildTopicPartitions(1); + + consumer = createObjectUnderTest("json", true); + when(kafkaConsumer.partitionsFor(TOPIC_NAME)).thenReturn(List.of(partitionInfo)); + when(partitionInfo.partition()).thenReturn(0); + when(kafkaConsumer.committed(anySet())).thenReturn(getTopicPartitionToMap(topicPartitions, offsetAndMetadata)); + when(kafkaConsumer.endOffsets(anyCollection())).thenReturn(getTopicPartitionToMap(topicPartitions, offset)); + when(offsetAndMetadata.offset()).thenReturn(offset); + + assertThat(consumer.isTopicEmpty(), equalTo(true)); + + verify(kafkaConsumer).partitionsFor(TOPIC_NAME); + verify(kafkaConsumer).committed(new HashSet<>(topicPartitions)); + verify(kafkaConsumer).endOffsets(topicPartitions); + verify(partitionInfo).partition(); + verify(offsetAndMetadata).offset(); + } + + @Test + public void isTopicEmpty_OnePartition_PartitionNeverHadData() { + final Long offset = 0L; + final List topicPartitions = buildTopicPartitions(1); + + consumer = createObjectUnderTest("json", true); + when(kafkaConsumer.partitionsFor(TOPIC_NAME)).thenReturn(List.of(partitionInfo)); + when(partitionInfo.partition()).thenReturn(0); + when(kafkaConsumer.committed(anySet())).thenReturn(getTopicPartitionToMap(topicPartitions, offsetAndMetadata)); + when(kafkaConsumer.endOffsets(anyCollection())).thenReturn(getTopicPartitionToMap(topicPartitions, offset)); + when(offsetAndMetadata.offset()).thenReturn(offset - 1); + + assertThat(consumer.isTopicEmpty(), equalTo(true)); + + verify(kafkaConsumer).partitionsFor(TOPIC_NAME); + verify(kafkaConsumer).committed(new HashSet<>(topicPartitions)); + verify(kafkaConsumer).endOffsets(topicPartitions); + verify(partitionInfo).partition(); + } + + @Test + public void isTopicEmpty_OnePartition_IsNotEmpty() { + final Long offset = RANDOM.nextLong(); + final List topicPartitions = buildTopicPartitions(1); + + consumer = createObjectUnderTest("json", true); + when(kafkaConsumer.partitionsFor(TOPIC_NAME)).thenReturn(List.of(partitionInfo)); + when(partitionInfo.partition()).thenReturn(0); + when(kafkaConsumer.committed(anySet())).thenReturn(getTopicPartitionToMap(topicPartitions, offsetAndMetadata)); + when(kafkaConsumer.endOffsets(anyCollection())).thenReturn(getTopicPartitionToMap(topicPartitions, offset)); + when(offsetAndMetadata.offset()).thenReturn(offset - 1); + + assertThat(consumer.isTopicEmpty(), equalTo(false)); + + verify(kafkaConsumer).partitionsFor(TOPIC_NAME); + verify(kafkaConsumer).committed(new HashSet<>(topicPartitions)); + verify(kafkaConsumer).endOffsets(topicPartitions); + verify(partitionInfo).partition(); + verify(offsetAndMetadata).offset(); + } + + @Test + public void isTopicEmpty_OnePartition_NoCommittedPartition() { + final Long offset = RANDOM.nextLong(); + final List topicPartitions = buildTopicPartitions(1); + + consumer = createObjectUnderTest("json", true); + when(kafkaConsumer.partitionsFor(TOPIC_NAME)).thenReturn(List.of(partitionInfo)); + when(partitionInfo.partition()).thenReturn(0); + when(kafkaConsumer.committed(anySet())).thenReturn(Collections.emptyMap()); + when(kafkaConsumer.endOffsets(anyCollection())).thenReturn(getTopicPartitionToMap(topicPartitions, offset)); + + assertThat(consumer.isTopicEmpty(), equalTo(false)); + + verify(kafkaConsumer).partitionsFor(TOPIC_NAME); + verify(kafkaConsumer).committed(new HashSet<>(topicPartitions)); + verify(kafkaConsumer).endOffsets(topicPartitions); + verify(partitionInfo).partition(); + } + + @Test + public void isTopicEmpty_MultiplePartitions_AllEmpty() { + final Long offset1 = RANDOM.nextLong(); + final Long offset2 = RANDOM.nextLong(); + final List topicPartitions = buildTopicPartitions(2); + + consumer = createObjectUnderTest("json", true); + when(kafkaConsumer.partitionsFor(TOPIC_NAME)).thenReturn(List.of(partitionInfo, partitionInfo)); + when(partitionInfo.partition()).thenReturn(0).thenReturn(1); + when(kafkaConsumer.committed(anySet())).thenReturn(getTopicPartitionToMap(topicPartitions, offsetAndMetadata)); + final Map endOffsets = getTopicPartitionToMap(topicPartitions, offset1); + endOffsets.put(topicPartitions.get(1), offset2); + when(kafkaConsumer.endOffsets(anyCollection())).thenReturn(endOffsets); + when(offsetAndMetadata.offset()).thenReturn(offset1).thenReturn(offset2); + + assertThat(consumer.isTopicEmpty(), equalTo(true)); + + verify(kafkaConsumer).partitionsFor(TOPIC_NAME); + verify(kafkaConsumer).committed(new HashSet<>(topicPartitions)); + verify(kafkaConsumer).endOffsets(topicPartitions); + verify(partitionInfo, times(2)).partition(); + verify(offsetAndMetadata, times(2)).offset(); + } + + @Test + public void isTopicEmpty_MultiplePartitions_OneNotEmpty() { + final Long offset1 = RANDOM.nextLong(); + final Long offset2 = RANDOM.nextLong(); + final List topicPartitions = buildTopicPartitions(2); + + consumer = createObjectUnderTest("json", true); + when(kafkaConsumer.partitionsFor(TOPIC_NAME)).thenReturn(List.of(partitionInfo, partitionInfo)); + when(partitionInfo.partition()).thenReturn(0).thenReturn(1); + when(kafkaConsumer.committed(anySet())).thenReturn(getTopicPartitionToMap(topicPartitions, offsetAndMetadata)); + final Map endOffsets = getTopicPartitionToMap(topicPartitions, offset1); + endOffsets.put(topicPartitions.get(1), offset2); + when(kafkaConsumer.endOffsets(anyCollection())).thenReturn(endOffsets); + when(offsetAndMetadata.offset()).thenReturn(offset1).thenReturn(offset2 - 1); + + assertThat(consumer.isTopicEmpty(), equalTo(false)); + + verify(kafkaConsumer).partitionsFor(TOPIC_NAME); + verify(kafkaConsumer).committed(new HashSet<>(topicPartitions)); + verify(kafkaConsumer).endOffsets(topicPartitions); + verify(partitionInfo, times(2)).partition(); + verify(offsetAndMetadata, times(2)).offset(); + } + + private List buildTopicPartitions(final int partitionCount) { + return IntStream.range(0, partitionCount) + .mapToObj(i -> new TopicPartition(TOPIC_NAME, i)) + .collect(Collectors.toList()); + } + + private Map getTopicPartitionToMap(final List topicPartitions, final T value) { + return topicPartitions.stream() + .collect(Collectors.toMap(i -> i, i -> value)); + } + private ConsumerRecords createPlainTextRecords(String topic, final long startOffset) { Map> records = new HashMap<>(); ConsumerRecord record1 = new ConsumerRecord<>(topic, testPartition, startOffset, testKey1, testValue1);