From 22d319d1166776483fc62087ac2cd2f3a604be49 Mon Sep 17 00:00:00 2001
From: David Venable
Date: Wed, 17 Jan 2024 14:30:08 -0600
Subject: [PATCH] Adds a new integration test for the Kafka buffer which
 verifies that data written is correctly read and decrypted. This work will
 be used to verify upcoming changes to the Protobuf model.

Signed-off-by: David Venable
---
 .../plugins/kafka/buffer/KafkaBufferIT.java   | 53 +++++++++++++++++--
 .../plugins/kafka/util/TestProducer.java      | 45 ++++++++++++++++
 2 files changed, 93 insertions(+), 5 deletions(-)
 create mode 100644 data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/util/TestProducer.java

diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java
index 2d76dd488a..7989f5dd8d 100644
--- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java
+++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java
@@ -8,6 +8,7 @@
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import com.google.protobuf.ByteString;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.jupiter.api.BeforeEach;
@@ -27,6 +28,8 @@
 import org.opensearch.dataprepper.plugins.kafka.util.TestConsumer;
 import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerProperties;
 import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField;
+
+import org.opensearch.dataprepper.plugins.kafka.util.TestProducer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -283,7 +286,8 @@ void writeBytes_puts_correctly_formatted_data_in_protobuf_wrapper() throws Excep
 
     @Nested
     class Encrypted {
-        private Cipher cipher;
+        private Cipher decryptCipher;
+        private Cipher encryptCipher;
 
         @BeforeEach
         void setUp() throws NoSuchAlgorithmException, InvalidKeyException, NoSuchPaddingException {
@@ -291,8 +295,10 @@ void setUp() throws NoSuchAlgorithmException, InvalidKeyException, NoSuchPadding
             aesKeyGenerator.init(256);
             final SecretKey secretKey = aesKeyGenerator.generateKey();
 
-            cipher = Cipher.getInstance("AES");
-            cipher.init(Cipher.DECRYPT_MODE, secretKey);
+            decryptCipher = Cipher.getInstance("AES");
+            decryptCipher.init(Cipher.DECRYPT_MODE, secretKey);
+            encryptCipher = Cipher.getInstance("AES");
+            encryptCipher.init(Cipher.ENCRYPT_MODE, secretKey);
 
             final byte[] base64Bytes = Base64.getEncoder().encode(secretKey.getEncoded());
             final String aesKey = new String(base64Bytes);
@@ -359,7 +365,7 @@ void write_puts_correctly_formatted_and_encrypted_data_in_Kafka_topic() throws T
             assertThat(innerData, notNullValue());
             assertThrows(JsonParseException.class, () -> objectMapper.readValue(innerData, Map.class));
 
-            final byte[] deserializedBytes = cipher.doFinal(innerData);
+            final byte[] deserializedBytes = decryptCipher.doFinal(innerData);
 
             final Map<String, Object> actualEventData = objectMapper.readValue(deserializedBytes, Map.class);
             assertThat(actualEventData, notNullValue());
@@ -401,10 +407,47 @@ void writeBytes_puts_correctly_formatted_and_encrypted_data_in_Kafka_topic() thr
             assertThat(innerData, notNullValue());
             assertThat(innerData, not(equalTo(writtenBytes)));
 
-            final byte[] decryptedBytes = cipher.doFinal(innerData);
+            final byte[] decryptedBytes = decryptCipher.doFinal(innerData);
 
             assertThat(decryptedBytes, equalTo(writtenBytes));
         }
+
+        @Test
+        void read_decrypts_data_from_the_predefined_key() throws IllegalBlockSizeException, BadPaddingException {
+            final KafkaBuffer objectUnderTest = createObjectUnderTest();
+            final TestProducer testProducer = new TestProducer(bootstrapServersCommaDelimited, topicName);
+
+            final Record<Event> record = createRecord();
+            final byte[] unencryptedBytes = record.getData().toJsonString().getBytes();
+            final byte[] encryptedBytes = encryptCipher.doFinal(unencryptedBytes);
+
+            final KafkaBufferMessage.BufferData bufferedData = KafkaBufferMessage.BufferData.newBuilder()
+                    .setMessageFormat(KafkaBufferMessage.MessageFormat.MESSAGE_FORMAT_BYTES)
+                    .setData(ByteString.copyFrom(encryptedBytes))
+                    .build();
+
+            final byte[] unencryptedKeyBytes = createRandomBytes();
+            final byte[] encryptedKeyBytes = encryptCipher.doFinal(unencryptedKeyBytes);
+
+            final KafkaBufferMessage.BufferData keyData = KafkaBufferMessage.BufferData.newBuilder()
+                    .setMessageFormat(KafkaBufferMessage.MessageFormat.MESSAGE_FORMAT_BYTES)
+                    .setData(ByteString.copyFrom(encryptedKeyBytes))
+                    .build();
+
+            testProducer.publishRecord(keyData.toByteArray(), bufferedData.toByteArray());
+
+            final Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = objectUnderTest.read(10_000);
+
+            assertThat(readResult, notNullValue());
+            assertThat(readResult.getKey(), notNullValue());
+            assertThat(readResult.getKey().size(), equalTo(1));
+
+            final Record<Event> onlyResult = readResult.getKey().stream().iterator().next();
+
+            assertThat(onlyResult, notNullValue());
+            assertThat(onlyResult.getData(), notNullValue());
+            assertThat(onlyResult.getData().toMap(), equalTo(record.getData().toMap()));
+        }
     }
 
     private byte[] createRandomBytes() {
diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/util/TestProducer.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/util/TestProducer.java
new file mode 100644
index 0000000000..54bb30e59c
--- /dev/null
+++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/util/TestProducer.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.kafka.util;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+
+import java.util.Properties;
+import java.util.UUID;
+
+/**
+ * Utility class to produce data to Kafka to help with testing.
+ */
+public class TestProducer {
+    private final Producer<byte[], byte[]> kafkaProducer;
+    private final String topicName;
+
+    public TestProducer(final String bootstrapServersCommaDelimited, final String topicName) {
+        this.topicName = topicName;
+        final String testClientId = UUID.randomUUID().toString();
+
+        final Properties kafkaProperties = new Properties();
+        kafkaProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersCommaDelimited);
+        kafkaProperties.setProperty(ProducerConfig.CLIENT_ID_CONFIG, testClientId);
+        kafkaProducer = new KafkaProducer<>(kafkaProperties, new ByteArraySerializer(), new ByteArraySerializer());
+    }
+
+    /**
+     * Publishes a single record to Kafka.
+     *
+     * @param key The key as a byte[]
+     * @param value The value as a byte[]
+     */
+    public void publishRecord(final byte[] key, final byte[] value) {
+        final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(topicName, key, value);
+        kafkaProducer.send(producerRecord);
+        kafkaProducer.flush();
+    }
+}
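
A note on the mechanics the new read_decrypts_data_from_the_predefined_key test relies on: KafkaBuffer stores each record AES-encrypted inside a protobuf BufferData wrapper, so the test pre-encrypts the payload with encryptCipher, publishes it through TestProducer, and expects read() to hand back the decrypted event. Below is a minimal standalone sketch of that AES round trip using only JDK crypto (no Kafka required); the class name AesRoundTripSketch and the sample payload are illustrative, not part of this patch.

import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import java.util.Arrays;

public class AesRoundTripSketch {
    public static void main(final String[] args) throws Exception {
        // Generate a 256-bit AES key, as the test's setUp() does.
        final KeyGenerator aesKeyGenerator = KeyGenerator.getInstance("AES");
        aesKeyGenerator.init(256);
        final SecretKey secretKey = aesKeyGenerator.generateKey();

        // A Cipher is bound to one mode per init() call, so use two instances over the same key.
        final Cipher encryptCipher = Cipher.getInstance("AES");
        encryptCipher.init(Cipher.ENCRYPT_MODE, secretKey);
        final Cipher decryptCipher = Cipher.getInstance("AES");
        decryptCipher.init(Cipher.DECRYPT_MODE, secretKey);

        // Round-trip a small JSON payload, as the buffer does with serialized events.
        final byte[] unencryptedBytes = "{\"message\":\"hello\"}".getBytes();
        final byte[] encryptedBytes = encryptCipher.doFinal(unencryptedBytes);
        final byte[] decryptedBytes = decryptCipher.doFinal(encryptedBytes);

        System.out.println(Arrays.equals(unencryptedBytes, decryptedBytes)); // prints: true
    }
}

This single-mode-per-init behavior is also why the patch keeps separate encryptCipher and decryptCipher fields in the Encrypted test class rather than re-initializing one shared instance.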