Commit

Adds ConsumerRecord offset to the event metadata fields when reading from Kafka.

Signed-off-by: Josh Crean <[email protected]>
Josh Crean committed Jan 16, 2025
1 parent 2196382 commit 025a9bd
Showing 2 changed files with 36 additions and 0 deletions.
@@ -459,6 +459,7 @@ private <T> Record<Event> getRecord(ConsumerRecord<String, T> consumerRecord, in
eventMetadata.setAttribute("kafka_timestamp_type", consumerRecord.timestampType().toString());
eventMetadata.setAttribute("kafka_topic", topicName);
eventMetadata.setAttribute("kafka_partition", String.valueOf(partition));
eventMetadata.setAttribute("kafka_offset", consumerRecord.offset());
eventMetadata.setExternalOriginationTime(Instant.ofEpochMilli(receivedTimeStamp));
event.getEventHandle().setExternalOriginationTime(Instant.ofEpochMilli(receivedTimeStamp));

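The added line stores the offset as a long (consumerRecord.offset()), whereas the neighboring kafka_partition attribute is stored as a String. Not part of the commit, but for context, a minimal sketch of how downstream code might read these fields, using only the Event metadata accessors exercised in the test below; the helper name and message format are illustrative, and the import path assumes Data Prepper's model package:

import org.opensearch.dataprepper.model.event.Event;

// Illustrative helper, not from this commit: summarizes the Kafka origin
// fields that getRecord(...) attaches above. kafka_topic and kafka_partition
// are stored as Strings; kafka_offset is stored as a Long.
static String describeKafkaOrigin(final Event event) {
    final Object topic = event.getMetadata().getAttribute("kafka_topic");
    final Object partition = event.getMetadata().getAttribute("kafka_partition");
    final Object offset = event.getMetadata().getAttribute("kafka_offset");
    return topic + "-" + partition + " @ offset " + offset;
}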
@@ -272,6 +272,41 @@ public void testBufferOverflowPauseResume() throws InterruptedException, Excepti
             Assertions.assertNotNull(event.getEventHandle().getExternalOriginationTime());
         }
     }
+
+    @Test
+    public void testKafkaMetadata() {
+        String topic = topicConfig.getName();
+        consumerRecords = createPlainTextRecords(topic, 0L);
+        when(kafkaConsumer.poll(any(Duration.class))).thenReturn(consumerRecords);
+        consumer = createObjectUnderTest("plaintext", false);
+
+        try {
+            consumer.onPartitionsAssigned(List.of(new TopicPartition(topic, testPartition)));
+            consumer.consumeRecords();
+        } catch (Exception e) {}
+        final Map.Entry<Collection<Record<Event>>, CheckpointState> bufferRecords = buffer.read(1000);
+        ArrayList<Record<Event>> bufferedRecords = new ArrayList<>(bufferRecords.getKey());
+        Assertions.assertEquals(consumerRecords.count(), bufferedRecords.size());
+        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = consumer.getOffsetsToCommit();
+        Assertions.assertEquals(offsetsToCommit.size(), 1);
+        offsetsToCommit.forEach((topicPartition, offsetAndMetadata) -> {
+            Assertions.assertEquals(topicPartition.partition(), testPartition);
+            Assertions.assertEquals(topicPartition.topic(), topic);
+            Assertions.assertEquals(offsetAndMetadata.offset(), 2L);
+        });
+        Assertions.assertEquals(consumer.getNumRecordsCommitted(), 2L);
+
+        for (int offset = 0; offset < bufferedRecords.size(); offset++) {
+            Record<Event> record = bufferedRecords.get(offset);
+            Event event = record.getData();
+            Assertions.assertNotNull(event.getMetadata().getAttribute("kafka_timestamp"));
+            Assertions.assertNotNull(event.getMetadata().getAttribute("kafka_timestamp_type"));
+            Assertions.assertEquals(TOPIC_NAME, event.getMetadata().getAttribute("kafka_topic"));
+            Assertions.assertEquals(String.valueOf(testPartition), event.getMetadata().getAttribute("kafka_partition"));
+            Assertions.assertEquals(Long.valueOf(offset), event.getMetadata().getAttribute("kafka_offset"));
+        }
+    }
+
     @Test
     public void testPlainTextConsumeRecords() throws InterruptedException {
         String topic = topicConfig.getName();
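The createPlainTextRecords fixture is not shown in this diff; the asserted offsets (kafka_offset values 0 and 1, with 2 committed as the next offset to consume) depend on how it builds the records. A hedged sketch of what such a helper could look like with the plain kafka-clients API; the keys and values are assumptions, not the repository's actual fixture:

import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

// Hypothetical fixture: two plaintext records at consecutive offsets on one
// partition, so the loop above sees kafka_offset values 0 and 1 and the
// consumer commits offset 2 (the next position to read).
private ConsumerRecords<String, Object> createPlainTextRecords(final String topic, final long startOffset) {
    final TopicPartition tp = new TopicPartition(topic, testPartition);
    final List<ConsumerRecord<String, Object>> records = List.of(
            new ConsumerRecord<>(topic, testPartition, startOffset, "key-1", "hello"),
            new ConsumerRecord<>(topic, testPartition, startOffset + 1, "key-2", "world"));
    return new ConsumerRecords<>(Map.of(tp, records));
}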
