Fixes a bug where the Kafka buffer inverted the relationship for the create_topic configuration #4114

Merged (1 commit) on Feb 13, 2024
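This pull request does three things, all visible in the diff below: it corrects an inverted create_topic check in KafkaCustomProducerFactory (the buffer created topics exactly when the configuration said not to, and skipped creation when asked for it), it introduces a TopicServiceFactory so that TopicService construction can be mocked in unit tests, and it removes the now-unused PluginFactory and PluginSetting parameters from the KafkaBuffer constructor and from createProducer. A minimal sketch of the core fix, with the topic-creation body elided:

    // Before: the check was negated, so the topic was created when create_topic was false.
    if (!topic.isCreateTopic()) {
        final TopicService topicService = new TopicService(kafkaProducerConfig);
        // ... create the topic ...
    }

    // After: the topic is created only when create_topic is true,
    // and the TopicService comes from an injectable factory.
    if (topic.isCreateTopic()) {
        final TopicService topicService = topicServiceFactory.createTopicService(kafkaProducerConfig);
        // ... create the topic ...
    }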
========================================
@@ -127,7 +127,7 @@ void setUp() {
     }
 
     private KafkaBuffer createObjectUnderTest() {
-        return new KafkaBuffer(pluginSetting, kafkaBufferConfig, pluginFactory, acknowledgementSetManager, null, null, null);
+        return new KafkaBuffer(pluginSetting, kafkaBufferConfig, acknowledgementSetManager, null, null, null);
     }
 
     @Test
@@ -159,7 +159,7 @@ void write_and_read_max_request_test() throws TimeoutException, NoSuchFieldExcep
         final Map<String, Object> topicConfigMap = Map.of(
                 "name", topicName,
                 "group_id", "buffergroup-" + RandomStringUtils.randomAlphabetic(6),
-                "create_topic", false
+                "create_topic", true
        );
         final Map<String, Object> bufferConfigMap = Map.of(
                 "topics", List.of(topicConfigMap),
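Note the flipped expectation in this integration test: the topic still has to exist for the test to run, but with the corrected logic the topic is only auto-created when create_topic is true, so the fixture changes from false to true.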
========================================
@@ -295,7 +295,7 @@ private void validateMetric(Event event) {
 
     @Test
     void test_otel_metrics_with_kafka_buffer() throws Exception {
-        KafkaBuffer kafkaBuffer = new KafkaBuffer(pluginSetting, kafkaBufferConfig, pluginFactory, acknowledgementSetManager, new OTelMetricDecoder(), null, null);
+        KafkaBuffer kafkaBuffer = new KafkaBuffer(pluginSetting, kafkaBufferConfig, acknowledgementSetManager, new OTelMetricDecoder(), null, null);
         buffer = new KafkaDelegatingBuffer(kafkaBuffer);
         final ExportMetricsServiceRequest request = createExportMetricsServiceRequest();
         buffer.writeBytes(request.toByteArray(), null, 10_000);
@@ -367,7 +367,7 @@ private void validateLog(OpenTelemetryLog logRecord) throws Exception {
 
     @Test
     void test_otel_logs_with_kafka_buffer() throws Exception {
-        KafkaBuffer kafkaBuffer = new KafkaBuffer(pluginSetting, kafkaBufferConfig, pluginFactory, acknowledgementSetManager, new OTelLogsDecoder(), null, null);
+        KafkaBuffer kafkaBuffer = new KafkaBuffer(pluginSetting, kafkaBufferConfig, acknowledgementSetManager, new OTelLogsDecoder(), null, null);
         buffer = new KafkaDelegatingBuffer(kafkaBuffer);
         final ExportLogsServiceRequest request = createExportLogsRequest();
         buffer.writeBytes(request.toByteArray(), null, 10_000);
@@ -453,7 +453,7 @@ private void validateSpan(Span span) throws Exception {
 
     @Test
     void test_otel_traces_with_kafka_buffer() throws Exception {
-        KafkaBuffer kafkaBuffer = new KafkaBuffer(pluginSetting, kafkaBufferConfig, pluginFactory, acknowledgementSetManager, new OTelTraceDecoder(), null, null);
+        KafkaBuffer kafkaBuffer = new KafkaBuffer(pluginSetting, kafkaBufferConfig, acknowledgementSetManager, new OTelTraceDecoder(), null, null);
         buffer = new KafkaDelegatingBuffer(kafkaBuffer);
         final ExportTraceServiceRequest request = createExportTraceRequest();
         buffer.writeBytes(request.toByteArray(), null, 10_000);
========================================
@@ -124,7 +124,7 @@ void setUp() {
     }
 
     private KafkaBuffer createObjectUnderTest() {
-        return new KafkaBuffer(pluginSetting, kafkaBufferConfig, pluginFactory, acknowledgementSetManager, null, ignored -> DefaultCredentialsProvider.create(), null);
+        return new KafkaBuffer(pluginSetting, kafkaBufferConfig, acknowledgementSetManager, null, ignored -> DefaultCredentialsProvider.create(), null);
     }
 
     @Nested
========================================
@@ -18,7 +18,6 @@
 import org.opensearch.dataprepper.model.buffer.SizeOverflowException;
 import org.opensearch.dataprepper.model.configuration.PluginSetting;
 import org.opensearch.dataprepper.model.event.Event;
-import org.opensearch.dataprepper.model.plugin.PluginFactory;
 import org.opensearch.dataprepper.model.record.Record;
 import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer;
 import org.apache.kafka.common.errors.RecordTooLargeException;
@@ -31,6 +30,7 @@
 import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumerFactory;
 import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducer;
 import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducerFactory;
+import org.opensearch.dataprepper.plugins.kafka.service.TopicServiceFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,17 +62,17 @@ public class KafkaBuffer extends AbstractBuffer<Record<Event>> {
     private ByteDecoder byteDecoder;
 
     @DataPrepperPluginConstructor
-    public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig kafkaBufferConfig, final PluginFactory pluginFactory,
+    public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig kafkaBufferConfig,
                        final AcknowledgementSetManager acknowledgementSetManager,
                        final ByteDecoder byteDecoder, final AwsCredentialsSupplier awsCredentialsSupplier,
                        final CircuitBreaker circuitBreaker) {
         super(kafkaBufferConfig.getCustomMetricPrefix().orElse(pluginSetting.getName()), pluginSetting.getPipelineName());
         final SerializationFactory serializationFactory = new BufferSerializationFactory(new CommonSerializationFactory());
-        final KafkaCustomProducerFactory kafkaCustomProducerFactory = new KafkaCustomProducerFactory(serializationFactory, awsCredentialsSupplier);
+        final KafkaCustomProducerFactory kafkaCustomProducerFactory = new KafkaCustomProducerFactory(serializationFactory, awsCredentialsSupplier, new TopicServiceFactory());
         this.byteDecoder = byteDecoder;
         final String metricPrefixName = kafkaBufferConfig.getCustomMetricPrefix().orElse(pluginSetting.getName());
         final PluginMetrics producerMetrics = PluginMetrics.fromNames(metricPrefixName + WRITE, pluginSetting.getPipelineName());
-        producer = kafkaCustomProducerFactory.createProducer(kafkaBufferConfig, pluginFactory, pluginSetting, null, null, producerMetrics, null, false);
+        producer = kafkaCustomProducerFactory.createProducer(kafkaBufferConfig, null, null, producerMetrics, null, false);
         final KafkaCustomConsumerFactory kafkaCustomConsumerFactory = new KafkaCustomConsumerFactory(serializationFactory, awsCredentialsSupplier);
         innerBuffer = new BlockingBuffer<>(INNER_BUFFER_CAPACITY, INNER_BUFFER_BATCH_SIZE, pluginSetting.getPipelineName());
         this.shutdownInProgress = new AtomicBoolean(false);
========================================
@@ -11,8 +11,6 @@
 import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
 import org.opensearch.dataprepper.expression.ExpressionEvaluator;
 import org.opensearch.dataprepper.metrics.PluginMetrics;
-import org.opensearch.dataprepper.model.configuration.PluginSetting;
-import org.opensearch.dataprepper.model.plugin.PluginFactory;
 import org.opensearch.dataprepper.model.sink.SinkContext;
 import org.opensearch.dataprepper.plugins.kafka.common.KafkaDataConfig;
 import org.opensearch.dataprepper.plugins.kafka.common.KafkaDataConfigAdapter;
@@ -22,12 +20,12 @@
 import org.opensearch.dataprepper.plugins.kafka.common.serialization.SerializationFactory;
 import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerConfig;
 import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerProperties;
-
-import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig;
 import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig;
+import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig;
 import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumerFactory;
 import org.opensearch.dataprepper.plugins.kafka.service.SchemaService;
 import org.opensearch.dataprepper.plugins.kafka.service.TopicService;
+import org.opensearch.dataprepper.plugins.kafka.service.TopicServiceFactory;
 import org.opensearch.dataprepper.plugins.kafka.sink.DLQSink;
 import org.opensearch.dataprepper.plugins.kafka.util.KafkaSecurityConfigurer;
 import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicProducerMetrics;
@@ -43,13 +41,18 @@ public class KafkaCustomProducerFactory {
     private static final Logger LOG = LoggerFactory.getLogger(KafkaCustomConsumerFactory.class);
     private final SerializationFactory serializationFactory;
     private final AwsCredentialsSupplier awsCredentialsSupplier;
+    private final TopicServiceFactory topicServiceFactory;
 
-    public KafkaCustomProducerFactory(final SerializationFactory serializationFactory, AwsCredentialsSupplier awsCredentialsSupplier) {
+    public KafkaCustomProducerFactory(
+            final SerializationFactory serializationFactory,
+            final AwsCredentialsSupplier awsCredentialsSupplier,
+            final TopicServiceFactory topicServiceFactory) {
         this.serializationFactory = serializationFactory;
         this.awsCredentialsSupplier = awsCredentialsSupplier;
+        this.topicServiceFactory = topicServiceFactory;
     }
 
-    public KafkaCustomProducer createProducer(final KafkaProducerConfig kafkaProducerConfig, final PluginFactory pluginFactory, final PluginSetting pluginSetting,
+    public KafkaCustomProducer createProducer(final KafkaProducerConfig kafkaProducerConfig,
                                               final ExpressionEvaluator expressionEvaluator, final SinkContext sinkContext, final PluginMetrics pluginMetrics,
                                               final DLQSink dlqSink,
                                               final boolean topicNameInMetrics) {
@@ -102,8 +105,8 @@ private void prepareTopicAndSchema(final KafkaProducerConfig kafkaProducerConfig
 
     private void checkTopicCreationCriteriaAndCreateTopic(final KafkaProducerConfig kafkaProducerConfig, final Integer maxRequestSize) {
         final TopicProducerConfig topic = kafkaProducerConfig.getTopic();
-        if (!topic.isCreateTopic()) {
-            final TopicService topicService = new TopicService(kafkaProducerConfig);
+        if (topic.isCreateTopic()) {
+            final TopicService topicService = topicServiceFactory.createTopicService(kafkaProducerConfig);
             Long maxMessageBytes = null;
             if (maxRequestSize != null) {
                 maxMessageBytes = Long.valueOf(maxRequestSize);
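Two things change in this hunk: the negated create_topic check is corrected, and the TopicService is now obtained from the injected topicServiceFactory instead of being constructed directly. The indirection matters because TopicService's constructor immediately creates a Kafka AdminClient (see the next file), which a plain unit test cannot stand up; the factory gives the new KafkaCustomProducerFactoryTest at the end of this diff a seam for substituting a mock.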
========================================
@@ -21,7 +21,7 @@ public class TopicService {
     private final AdminClient adminClient;
 
 
-    public TopicService(final KafkaProducerConfig kafkaProducerConfig) {
+    TopicService(final KafkaProducerConfig kafkaProducerConfig) {
         this.adminClient = AdminClient.create(SinkPropertyConfigurer.getPropertiesForAdminClient(kafkaProducerConfig));
     }
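Narrowing the constructor to package-private visibility steers callers to TopicServiceFactory.createTopicService, which lives in the same package, so the AdminClient-creating path is no longer reachable from outside code except through the mockable factory.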
========================================
@@ -0,0 +1,14 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.kafka.service;
+
+import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerConfig;
+
+public class TopicServiceFactory {
+    public TopicService createTopicService(final KafkaProducerConfig kafkaProducerConfig) {
+        return new TopicService(kafkaProducerConfig);
+    }
+}
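This one-method class exists for testability: since TopicService cannot be instantiated without a live AdminClient, the factory is the seam that unit tests mock. A minimal sketch of the pattern, using the fixture names from the new test at the end of this diff:

    // topicServiceFactory is injected into the KafkaCustomProducerFactory under test.
    final TopicService topicService = mock(TopicService.class);
    when(topicServiceFactory.createTopicService(kafkaProducerConfig)).thenReturn(topicService);
    // ... invoke createProducer with create_topic enabled, then:
    verify(topicService).createTopic(topicName, numberOfPartitions, replicationFactor, null);
    verify(topicService).closeAdminClient();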
========================================
@@ -27,6 +27,7 @@
 import org.opensearch.dataprepper.plugins.kafka.producer.ProducerWorker;
 import org.opensearch.dataprepper.plugins.kafka.service.SchemaService;
 import org.opensearch.dataprepper.plugins.kafka.service.TopicService;
+import org.opensearch.dataprepper.plugins.kafka.service.TopicServiceFactory;
 import org.opensearch.dataprepper.plugins.kafka.util.RestUtils;
 //import org.opensearch.dataprepper.plugins.kafka.sink.DLQSink;
 import org.slf4j.Logger;
@@ -50,6 +51,7 @@ public class KafkaSink extends AbstractSink<Record<Event>> {
 
     private final KafkaSinkConfig kafkaSinkConfig;
     private final KafkaCustomProducerFactory kafkaCustomProducerFactory;
+    private final TopicServiceFactory topicServiceFactory;
 
     private volatile boolean sinkInitialized;
 
@@ -86,7 +88,8 @@ public KafkaSink(final PluginSetting pluginSetting, final KafkaSinkConfig kafkaS
         this.sinkContext = sinkContext;
 
         SerializationFactory serializationFactory = new CommonSerializationFactory();
-        kafkaCustomProducerFactory = new KafkaCustomProducerFactory(serializationFactory, awsCredentialsSupplier);
+        topicServiceFactory = new TopicServiceFactory();
+        kafkaCustomProducerFactory = new KafkaCustomProducerFactory(serializationFactory, awsCredentialsSupplier, topicServiceFactory);
 
     }
 
@@ -154,7 +157,7 @@ private void prepareTopicAndSchema() {
     private void checkTopicCreationCriteriaAndCreateTopic() {
         final TopicProducerConfig topic = kafkaSinkConfig.getTopic();
         if (topic.isCreateTopic()) {
-            final TopicService topicService = new TopicService(kafkaSinkConfig);
+            final TopicService topicService = topicServiceFactory.createTopicService(kafkaSinkConfig);
             topicService.createTopic(kafkaSinkConfig.getTopic().getName(), topic.getNumberOfPartitions(), topic.getReplicationFactor(), null);
             topicService.closeAdminClient();
         }
@@ -164,7 +167,7 @@ private void checkTopicCreationCriteriaAndCreateTopic() {
 
     private KafkaCustomProducer createProducer() {
         final DLQSink dlqSink = new DLQSink(pluginFactory, kafkaSinkConfig, pluginSetting);
-        return kafkaCustomProducerFactory.createProducer(kafkaSinkConfig, pluginFactory, pluginSetting, expressionEvaluator, sinkContext, pluginMetrics, dlqSink, true);
+        return kafkaCustomProducerFactory.createProducer(kafkaSinkConfig, expressionEvaluator, sinkContext, pluginMetrics, dlqSink, true);
     }
 
========================================
@@ -137,7 +137,7 @@ public KafkaBuffer createObjectUnderTest(final List<KafkaCustomConsumer> consume
         final MockedConstruction<KafkaCustomProducerFactory> producerFactoryMock =
                 mockConstruction(KafkaCustomProducerFactory.class, (mock, context) -> {
                     producerFactory = mock;
-                    when(producerFactory.createProducer(any() ,any(), any(), isNull(), isNull(), any(), any(), anyBoolean())).thenReturn(producer);
+                    when(producerFactory.createProducer(any(), isNull(), isNull(), any(), any(), anyBoolean())).thenReturn(producer);
                 });
         final MockedConstruction<KafkaCustomConsumerFactory> consumerFactoryMock =
                 mockConstruction(KafkaCustomConsumerFactory.class, (mock, context) -> {
@@ -152,7 +152,7 @@ public KafkaBuffer createObjectUnderTest(final List<KafkaCustomConsumer> consume
                 })) {
 
             executorsMockedStatic.when(() -> Executors.newFixedThreadPool(anyInt())).thenReturn(executorService);
-            return new KafkaBuffer(pluginSetting, bufferConfig, pluginFactory, acknowledgementSetManager, null, awsCredentialsSupplier, circuitBreaker);
+            return new KafkaBuffer(pluginSetting, bufferConfig, acknowledgementSetManager, null, awsCredentialsSupplier, circuitBreaker);
         }
     }
 
========================================
@@ -0,0 +1,118 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.kafka.producer;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.MockedConstruction;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
+import org.opensearch.dataprepper.expression.ExpressionEvaluator;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
+import org.opensearch.dataprepper.model.sink.SinkContext;
+import org.opensearch.dataprepper.plugins.kafka.common.serialization.SerializationFactory;
+import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig;
+import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerConfig;
+import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig;
+import org.opensearch.dataprepper.plugins.kafka.service.TopicService;
+import org.opensearch.dataprepper.plugins.kafka.service.TopicServiceFactory;
+import org.opensearch.dataprepper.plugins.kafka.sink.DLQSink;
+
+import java.util.Collections;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class KafkaCustomProducerFactoryTest {
+    private static final Random RANDOM = new Random();
+    @Mock
+    private SerializationFactory serializationFactory;
+    @Mock
+    private AwsCredentialsSupplier awsCredentialsSupplier;
+    @Mock
+    private TopicServiceFactory topicServiceFactory;
+
+    @Mock
+    private KafkaProducerConfig kafkaProducerConfig;
+    @Mock
+    private ExpressionEvaluator expressionEvaluator;
+    @Mock
+    private SinkContext sinkContext;
+    @Mock
+    private PluginMetrics pluginMetrics;
+    @Mock
+    private DLQSink dlqSink;
+
+    @Mock
+    private TopicProducerConfig topicProducerConfig;
+    @Mock
+    private EncryptionConfig encryptionConfig;
+
+    @BeforeEach
+    void setUp() {
+        when(kafkaProducerConfig.getTopic()).thenReturn(topicProducerConfig);
+        when(kafkaProducerConfig.getEncryptionConfig()).thenReturn(encryptionConfig);
+        when(kafkaProducerConfig.getBootstrapServers()).thenReturn(Collections.singletonList(UUID.randomUUID().toString()));
+
+        final Serializer serializer = mock(Serializer.class);
+        when(serializationFactory.getSerializer(any())).thenReturn(serializer);
+    }
+
+    private KafkaCustomProducerFactory createObjectUnderTest() {
+        return new KafkaCustomProducerFactory(serializationFactory, awsCredentialsSupplier, topicServiceFactory);
+    }
+
+    @Test
+    void createProducer_does_not_create_TopicService_when_createTopic_is_false() {
+        final KafkaCustomProducerFactory objectUnderTest = createObjectUnderTest();
+        try (final MockedConstruction<KafkaProducer> ignored = mockConstruction(KafkaProducer.class)) {
+            objectUnderTest.createProducer(kafkaProducerConfig, expressionEvaluator, sinkContext, pluginMetrics, dlqSink, false);
+        }
+
+        verify(topicServiceFactory, never()).createTopicService(any());
+    }
+
+    @Test
+    void createProducer_creates_TopicService_and_creates_topic_when_createTopic_is_true() {
+        final TopicService topicService = mock(TopicService.class);
+        when(topicServiceFactory.createTopicService(kafkaProducerConfig)).thenReturn(topicService);
+
+        final String topicName = UUID.randomUUID().toString();
+        final int numberOfPartitions = RANDOM.nextInt(1000);
+        final short replicationFactor = (short) RANDOM.nextInt(1000);
+        when(topicProducerConfig.getName()).thenReturn(topicName);
+        when(topicProducerConfig.getNumberOfPartitions()).thenReturn(numberOfPartitions);
+        when(topicProducerConfig.getReplicationFactor()).thenReturn(replicationFactor);
+
+        final KafkaCustomProducerFactory objectUnderTest = createObjectUnderTest();
+
+        when(topicProducerConfig.isCreateTopic()).thenReturn(true);
+
+        try (final MockedConstruction<KafkaProducer> ignored = mockConstruction(KafkaProducer.class)) {
+            objectUnderTest.createProducer(kafkaProducerConfig, expressionEvaluator, sinkContext, pluginMetrics, dlqSink, false);
+        }
+
+        final InOrder inOrder = inOrder(topicService);
+        inOrder.verify(topicService).createTopic(topicName, numberOfPartitions, replicationFactor, null);
+        inOrder.verify(topicService).closeAdminClient();
+    }
+}