From 4e06b8dd351a4c9c3b3f2809d034bc0f04034746 Mon Sep 17 00:00:00 2001 From: Josh Crean Date: Thu, 16 Jan 2025 10:02:34 -0500 Subject: [PATCH] Let Exception bubble out of consumeRecords() Signed-off-by: Josh Crean Signed-off-by: Josh Crean --- .../plugins/kafka/consumer/KafkaCustomConsumerTest.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java index 69ab72810a..ef6b2fbab2 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java @@ -274,16 +274,13 @@ public void testBufferOverflowPauseResume() throws InterruptedException, Excepti } @Test - public void testKafkaMetadata() { + public void testKafkaMetadata() throws Exception { String topic = topicConfig.getName(); consumerRecords = createPlainTextRecords(topic, 0L); when(kafkaConsumer.poll(any(Duration.class))).thenReturn(consumerRecords); consumer = createObjectUnderTest("plaintext", false); - - try { - consumer.onPartitionsAssigned(List.of(new TopicPartition(topic, testPartition))); - consumer.consumeRecords(); - } catch (Exception e){} + consumer.onPartitionsAssigned(List.of(new TopicPartition(topic, testPartition))); + consumer.consumeRecords(); final Map.Entry>, CheckpointState> bufferRecords = buffer.read(1000); ArrayList> bufferedRecords = new ArrayList<>(bufferRecords.getKey()); Assertions.assertEquals(consumerRecords.count(), bufferedRecords.size());