Kafka buffer integration test to verify that data in the buffer is correctly read #3973

Merged · 1 commit · Jan 17, 2024
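This PR adds an integration test verifying that data already in the Kafka buffer is read correctly: it encrypts an event with a predefined AES key, wraps the ciphertext in the KafkaBufferMessage protobuf format, publishes it straight to the buffer topic using a new TestProducer utility, and asserts that KafkaBuffer.read() returns the original event. It also splits the test's single cipher field into distinct encryptCipher and decryptCipher instances.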
@@ -8,6 +8,7 @@
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.google.protobuf.ByteString;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.BeforeEach;
@@ -27,6 +28,8 @@
import org.opensearch.dataprepper.plugins.kafka.util.TestConsumer;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerProperties;
import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField;

import org.opensearch.dataprepper.plugins.kafka.util.TestProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -283,16 +286,19 @@ void writeBytes_puts_correctly_formatted_data_in_protobuf_wrapper() throws Exception

@Nested
class Encrypted {
-private Cipher cipher;
+private Cipher decryptCipher;
+private Cipher encryptCipher;

@BeforeEach
void setUp() throws NoSuchAlgorithmException, InvalidKeyException, NoSuchPaddingException {
final KeyGenerator aesKeyGenerator = KeyGenerator.getInstance("AES");
aesKeyGenerator.init(256);
final SecretKey secretKey = aesKeyGenerator.generateKey();

cipher = Cipher.getInstance("AES");
cipher.init(Cipher.DECRYPT_MODE, secretKey);
decryptCipher = Cipher.getInstance("AES");
decryptCipher.init(Cipher.DECRYPT_MODE, secretKey);
encryptCipher = Cipher.getInstance("AES");
encryptCipher.init(Cipher.ENCRYPT_MODE, secretKey);
final byte[] base64Bytes = Base64.getEncoder().encode(secretKey.getEncoded());
final String aesKey = new String(base64Bytes);

@@ -359,7 +365,7 @@ void write_puts_correctly_formatted_and_encrypted_data_in_Kafka_topic() throws T
assertThat(innerData, notNullValue());
assertThrows(JsonParseException.class, () -> objectMapper.readValue(innerData, Map.class));

-final byte[] deserializedBytes = cipher.doFinal(innerData);
+final byte[] deserializedBytes = decryptCipher.doFinal(innerData);

final Map<String, Object> actualEventData = objectMapper.readValue(deserializedBytes, Map.class);
assertThat(actualEventData, notNullValue());
@@ -401,10 +407,47 @@ void writeBytes_puts_correctly_formatted_and_encrypted_data_in_Kafka_topic() thr
assertThat(innerData, notNullValue());
assertThat(innerData, not(equalTo(writtenBytes)));

-final byte[] decryptedBytes = cipher.doFinal(innerData);
+final byte[] decryptedBytes = decryptCipher.doFinal(innerData);

assertThat(decryptedBytes, equalTo(writtenBytes));
}

@Test
void read_decrypts_data_from_the_predefined_key() throws IllegalBlockSizeException, BadPaddingException {
final KafkaBuffer objectUnderTest = createObjectUnderTest();
final TestProducer testProducer = new TestProducer(bootstrapServersCommaDelimited, topicName);

final Record<Event> record = createRecord();
final byte[] unencryptedBytes = record.getData().toJsonString().getBytes();
final byte[] encryptedBytes = encryptCipher.doFinal(unencryptedBytes);

final KafkaBufferMessage.BufferData bufferedData = KafkaBufferMessage.BufferData.newBuilder()
.setMessageFormat(KafkaBufferMessage.MessageFormat.MESSAGE_FORMAT_BYTES)
.setData(ByteString.copyFrom(encryptedBytes))
.build();

final byte[] unencryptedKeyBytes = createRandomBytes();
final byte[] encryptedKeyBytes = encryptCipher.doFinal(unencryptedKeyBytes);

final KafkaBufferMessage.BufferData keyData = KafkaBufferMessage.BufferData.newBuilder()
.setMessageFormat(KafkaBufferMessage.MessageFormat.MESSAGE_FORMAT_BYTES)
.setData(ByteString.copyFrom(encryptedKeyBytes))
.build();

testProducer.publishRecord(keyData.toByteArray(), bufferedData.toByteArray());

final Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = objectUnderTest.read(10_000);

assertThat(readResult, notNullValue());
assertThat(readResult.getKey(), notNullValue());
assertThat(readResult.getKey().size(), equalTo(1));

final Record<Event> onlyResult = readResult.getKey().stream().iterator().next();

assertThat(onlyResult, notNullValue());
assertThat(onlyResult.getData(), notNullValue());
assertThat(onlyResult.getData().toMap(), equalTo(record.getData().toMap()));
}
}

private byte[] createRandomBytes() {
New file: org/opensearch/dataprepper/plugins/kafka/util/TestProducer.java
@@ -0,0 +1,43 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafka.util;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import java.util.Properties;

/**
* Utility class to produce data to Kafka to help with testing.
*/
public class TestProducer {
private final Producer<byte[], byte[]> kafkaProducer;
private final String topicName;

public TestProducer(final String bootstrapServersCommaDelimited, final String topicName) {
this.topicName = topicName;
// A producer only needs the bootstrap servers; group.id and
// auto.offset.reset are consumer settings and are ignored by producers.
final Properties kafkaProperties = new Properties();
kafkaProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersCommaDelimited);
kafkaProducer = new KafkaProducer<>(kafkaProperties, new ByteArraySerializer(), new ByteArraySerializer());
}

/**
* Publishes a single record to Kafka and flushes the producer,
* so the record has been sent by the time this method returns.
*
* @param key The key as a byte[]
* @param value The value as a byte[]
*/
public void publishRecord(final byte[] key, final byte[] value) {
final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(topicName, key, value);
kafkaProducer.send(producerRecord);
kafkaProducer.flush();
}
}
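For reference, a minimal usage sketch of this utility, mirroring how the integration test above publishes a pre-encrypted, protobuf-wrapped record directly to the buffer topic. The bootstrap address, topic name, and byte payloads here are placeholder values, not ones taken from the test:

// Hypothetical usage; any byte[] key/value pair can be published.
final TestProducer testProducer = new TestProducer("localhost:9092", "buffer-topic");
final byte[] key = "example-key".getBytes();
final byte[] value = "example-value".getBytes();
// publishRecord() sends and then flushes, so the record is visible
// to consumers as soon as the call returns.
testProducer.publishRecord(key, value);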