diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java index 2d76dd488a..7989f5dd8d 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java @@ -8,6 +8,7 @@ import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import com.google.protobuf.ByteString; import org.apache.commons.lang3.RandomStringUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.BeforeEach; @@ -27,6 +28,8 @@ import org.opensearch.dataprepper.plugins.kafka.util.TestConsumer; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerProperties; import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField; + +import org.opensearch.dataprepper.plugins.kafka.util.TestProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -283,7 +286,8 @@ void writeBytes_puts_correctly_formatted_data_in_protobuf_wrapper() throws Excep @Nested class Encrypted { - private Cipher cipher; + private Cipher decryptCipher; + private Cipher encryptCipher; @BeforeEach void setUp() throws NoSuchAlgorithmException, InvalidKeyException, NoSuchPaddingException { @@ -291,8 +295,10 @@ void setUp() throws NoSuchAlgorithmException, InvalidKeyException, NoSuchPadding aesKeyGenerator.init(256); final SecretKey secretKey = aesKeyGenerator.generateKey(); - cipher = Cipher.getInstance("AES"); - cipher.init(Cipher.DECRYPT_MODE, secretKey); + decryptCipher = Cipher.getInstance("AES"); + decryptCipher.init(Cipher.DECRYPT_MODE, secretKey); + encryptCipher = Cipher.getInstance("AES"); + encryptCipher.init(Cipher.ENCRYPT_MODE, secretKey); final byte[] base64Bytes = Base64.getEncoder().encode(secretKey.getEncoded()); final String aesKey = new String(base64Bytes); @@ -359,7 +365,7 @@ void write_puts_correctly_formatted_and_encrypted_data_in_Kafka_topic() throws T assertThat(innerData, notNullValue()); assertThrows(JsonParseException.class, () -> objectMapper.readValue(innerData, Map.class)); - final byte[] deserializedBytes = cipher.doFinal(innerData); + final byte[] deserializedBytes = decryptCipher.doFinal(innerData); final Map actualEventData = objectMapper.readValue(deserializedBytes, Map.class); assertThat(actualEventData, notNullValue()); @@ -401,10 +407,47 @@ void writeBytes_puts_correctly_formatted_and_encrypted_data_in_Kafka_topic() thr assertThat(innerData, notNullValue()); assertThat(innerData, not(equalTo(writtenBytes))); - final byte[] decryptedBytes = cipher.doFinal(innerData); + final byte[] decryptedBytes = decryptCipher.doFinal(innerData); assertThat(decryptedBytes, equalTo(writtenBytes)); } + + @Test + void read_decrypts_data_from_the_predefined_key() throws IllegalBlockSizeException, BadPaddingException { + final KafkaBuffer objectUnderTest = createObjectUnderTest(); + final TestProducer testProducer = new TestProducer(bootstrapServersCommaDelimited, topicName); + + final Record record = createRecord(); + final byte[] unencryptedBytes = record.getData().toJsonString().getBytes(); + final byte[] encryptedBytes = encryptCipher.doFinal(unencryptedBytes); + + final KafkaBufferMessage.BufferData bufferedData = KafkaBufferMessage.BufferData.newBuilder() + .setMessageFormat(KafkaBufferMessage.MessageFormat.MESSAGE_FORMAT_BYTES) + .setData(ByteString.copyFrom(encryptedBytes)) + .build(); + + final byte[] unencryptedKeyBytes = createRandomBytes(); + final byte[] encryptedKeyBytes = encryptCipher.doFinal(unencryptedKeyBytes); + + final KafkaBufferMessage.BufferData keyData = KafkaBufferMessage.BufferData.newBuilder() + .setMessageFormat(KafkaBufferMessage.MessageFormat.MESSAGE_FORMAT_BYTES) + .setData(ByteString.copyFrom(encryptedKeyBytes)) + .build(); + + testProducer.publishRecord(keyData.toByteArray(), bufferedData.toByteArray()); + + final Map.Entry>, CheckpointState> readResult = objectUnderTest.read(10_000); + + assertThat(readResult, notNullValue()); + assertThat(readResult.getKey(), notNullValue()); + assertThat(readResult.getKey().size(), equalTo(1)); + + final Record onlyResult = readResult.getKey().stream().iterator().next(); + + assertThat(onlyResult, notNullValue()); + assertThat(onlyResult.getData(), notNullValue()); + assertThat(onlyResult.getData().toMap(), equalTo(record.getData().toMap())); + } } private byte[] createRandomBytes() { diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/util/TestProducer.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/util/TestProducer.java new file mode 100644 index 0000000000..54bb30e59c --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/util/TestProducer.java @@ -0,0 +1,46 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.kafka.util; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.ByteArraySerializer; + +import java.util.Properties; +import java.util.UUID; + +/** + * Utility class to produce data to Kafka to help with testing. + */ +public class TestProducer { + private final Producer kafkaProducer; + private final String topicName; + + public TestProducer(final String bootstrapServersCommaDelimited, final String topicName) { + this.topicName = topicName; + final String testGroupId = UUID.randomUUID().toString(); + final Properties kafkaProperties = new Properties(); + kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersCommaDelimited); + kafkaProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId); + kafkaProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + kafkaProducer = new KafkaProducer<>(kafkaProperties, new ByteArraySerializer(), new ByteArraySerializer()); + kafkaProducer.flush(); + } + + /** + * Publishes a single record to Kafka. + * + * @param key The key as a byte[] + * @param value The value as a byte[] + */ + public void publishRecord(final byte[] key, final byte[] value) { + final ProducerRecord producerRecord = new ProducerRecord<>(topicName, key, value); + kafkaProducer.send(producerRecord); + kafkaProducer.flush(); + } +}