Skip to content

Commit

Permalink
Implementation for default non-blocking buffer.
Browse files Browse the repository at this point in the history
Signed-off-by: Souvik Bose <[email protected]>
  • Loading branch information
sb2k16 committed Jun 17, 2024
1 parent 2a076ab commit f77f5dc
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 40 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.opensearch.dataprepper.model.buffer;

import java.util.Optional;

public interface DefinesBuffer {

Optional<Buffer> getDefaultBuffer();

This comment has been minimized.

Copy link
@dlvenable

dlvenable Jun 18, 2024

The logic below forces a buffer. So we might want to rename this to getBuffer() or getDefinedBuffer().

This comment has been minimized.

Copy link
@sb2k16

sb2k16 Jun 18, 2024

Author Owner

Thank you for your comment. I will follow this.

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.opensearch.dataprepper.breaker.CircuitBreakerManager;
import org.opensearch.dataprepper.model.annotations.SingleThread;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.buffer.DefinesBuffer;
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.peerforwarder.RequiresPeerForwarding;
Expand All @@ -30,7 +31,7 @@
import org.opensearch.dataprepper.pipeline.parser.model.SinkContextPluginSetting;
import org.opensearch.dataprepper.pipeline.router.Router;
import org.opensearch.dataprepper.pipeline.router.RouterFactory;
import org.opensearch.dataprepper.pipeline.buffer.AbstractSynchronizedBuffer;
import org.opensearch.dataprepper.pipeline.buffer.AbstractNonBlockingBuffer;
import org.opensearch.dataprepper.sourcecoordination.SourceCoordinatorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -118,7 +119,13 @@ private void buildPipelineFromConfiguration(
pluginFactory.loadPlugin(Source.class, sourceSetting));

LOG.info("Building buffer for the pipeline [{}]", pipelineName);
final Buffer pipelineDefinedBuffer = pluginFactory.loadPlugin(Buffer.class, pipelineConfiguration.getBufferPluginSetting(), source.getDecoder());
Optional<Buffer> defaultBuffer = Optional.empty();
if (source instanceof DefinesBuffer) {
defaultBuffer = ((DefinesBuffer) source).getDefaultBuffer();
}

final Buffer pipelineDefinedBuffer = defaultBuffer.orElseGet(() ->
pluginFactory.loadPlugin(Buffer.class, pipelineConfiguration.getBufferPluginSetting(), source.getDecoder()));

LOG.info("Building processors for the pipeline [{}]", pipelineName);
final int processorThreads = pipelineConfiguration.getWorkers();
Expand Down Expand Up @@ -149,7 +156,6 @@ private void buildPipelineFromConfiguration(
LOG.info("Constructing MultiBufferDecorator with [{}] secondary buffers for pipeline [{}]", secondaryBuffers.size(), pipelineName);
final MultiBufferDecorator multiBufferDecorator = new MultiBufferDecorator(pipelineDefinedBuffer, secondaryBuffers);


final Buffer buffer = applyCircuitBreakerToBuffer(source, multiBufferDecorator);

final Router router = routerFactory.createRouter(pipelineConfiguration.getRoutes());
Expand All @@ -159,8 +165,8 @@ private void buildPipelineFromConfiguration(
dataPrepperConfiguration.getProcessorShutdownTimeout(), dataPrepperConfiguration.getSinkShutdownTimeout(),
getPeerForwarderDrainTimeout(dataPrepperConfiguration));

if (pipelineDefinedBuffer instanceof AbstractSynchronizedBuffer) {
((AbstractSynchronizedBuffer<?>) pipelineDefinedBuffer).setPipelineRunner(new PipelineRunner(pipeline));
if (pipelineDefinedBuffer instanceof AbstractNonBlockingBuffer) {
((AbstractNonBlockingBuffer<?>) pipelineDefinedBuffer).setPipelineRunner(new PipelineRunner(pipeline));
}

pipelineMap.put(pipelineName, pipeline);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.pipeline.PipelineRunner;

public abstract class AbstractSynchronizedBuffer<T extends Record<?>> implements Buffer<T> {
public abstract class AbstractNonBlockingBuffer<T extends Record<?>> implements Buffer<T> {
private PipelineRunner pipelineRunner;

public PipelineRunner getPipelineRunner() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.pipeline.buffer;

import io.micrometer.core.instrument.Counter;
import org.apache.commons.collections.buffer.SynchronizedBuffer;
import org.opensearch.dataprepper.metrics.MetricNames;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.CheckpointState;
Expand All @@ -23,16 +24,15 @@

import static com.google.common.base.Preconditions.checkNotNull;

@DataPrepperPlugin(name = "synchronized_buffer", pluginType = Buffer.class)
public class SynchronizedBuffer<T extends Record<?>> extends AbstractSynchronizedBuffer<T> {
private static final Logger LOG = LoggerFactory.getLogger(SynchronizedBuffer.class);
public class NonBlockingBuffer<T extends Record<?>> extends AbstractNonBlockingBuffer<T> {
private static final Logger LOG = LoggerFactory.getLogger(NonBlockingBuffer.class);
private static final String SYNCHRONIZED_BUFFER = "SynchronizedBuffer";
private final String pipelineName;
private final ThreadLocal<Collection<T>> threadLocalStore;
private final Counter recordsWrittenCounter;
private final Counter recordsReadCounter;

public SynchronizedBuffer(final String pipelineName) {
public NonBlockingBuffer(final String pipelineName) {

This comment has been minimized.

Copy link
@kkondaka

kkondaka Jun 20, 2024

NonBlockBuffer a good name? It still indicates there is some buffer. Isn't ZeroBuffer better name?

this.pipelineName = pipelineName;
this.threadLocalStore = new ThreadLocal<>();

Expand All @@ -41,7 +41,7 @@ public SynchronizedBuffer(final String pipelineName) {
this.recordsReadCounter = pluginMetrics.counter(MetricNames.RECORDS_READ);
}

public SynchronizedBuffer(final PluginSetting pluginSetting) {
public NonBlockingBuffer(final PluginSetting pluginSetting) {
this(checkNotNull(pluginSetting, "PluginSetting cannot be null").getPipelineName());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class SynchronizedBufferTests {
public class NoBlockingBufferTests {
private static final String TEST_PIPELINE_NAME = "test-pipeline";
private static final int TEST_WRITE_TIMEOUT = 10;
private static final int TEST_BATCH_READ_TIMEOUT = 500;
Expand All @@ -54,78 +54,78 @@ public void setup() {

@Test
public void testCreationUsingPipelineDescription() {
final SynchronizedBuffer<Record<String>> synchronizedBuffer = new SynchronizedBuffer<>(pipelineDescription);
synchronizedBuffer.setPipelineRunner(mockPipelineRunner);
assertThat(synchronizedBuffer, notNullValue());
final NonBlockingBuffer<Record<String>> NonBlockingBuffer = new NonBlockingBuffer<>(pipelineDescription);
NonBlockingBuffer.setPipelineRunner(mockPipelineRunner);
assertThat(NonBlockingBuffer, notNullValue());
}

@Test
public void testCreationUsingNullPipelineDescription() {
try {
new SynchronizedBuffer<Record<String>>((PluginSetting) null);
new NonBlockingBuffer<Record<String>>((PluginSetting) null);
} catch (NullPointerException ex) {
assertThat(ex.getMessage(), is(equalTo("PipelineDescription cannot be null")));
}
}

@Test
public void testCreationUsingValues() {
final SynchronizedBuffer<Record<String>> synchronizedBuffer = new SynchronizedBuffer<>(TEST_PIPELINE_NAME);
synchronizedBuffer.setPipelineRunner(mockPipelineRunner);
final NonBlockingBuffer<Record<String>> NonBlockingBuffer = new NonBlockingBuffer<>(TEST_PIPELINE_NAME);
NonBlockingBuffer.setPipelineRunner(mockPipelineRunner);

assertThat(synchronizedBuffer, notNullValue());
assertThat(NonBlockingBuffer, notNullValue());
}

@Test
public void testInsertNull() {
final SynchronizedBuffer<Record<String>> synchronizedBuffer = new SynchronizedBuffer<>(TEST_PIPELINE_NAME);
synchronizedBuffer.setPipelineRunner(mockPipelineRunner);
final NonBlockingBuffer<Record<String>> NonBlockingBuffer = new NonBlockingBuffer<>(TEST_PIPELINE_NAME);
NonBlockingBuffer.setPipelineRunner(mockPipelineRunner);

assertThat(synchronizedBuffer, notNullValue());
assertThrows(NullPointerException.class, () -> synchronizedBuffer.write(null, TEST_WRITE_TIMEOUT));
assertThat(NonBlockingBuffer, notNullValue());
assertThrows(NullPointerException.class, () -> NonBlockingBuffer.write(null, TEST_WRITE_TIMEOUT));
}

@Test
public void testReadEmptyBuffer() {
final SynchronizedBuffer<Record<String>> synchronizedBuffer = new SynchronizedBuffer<>(TEST_PIPELINE_NAME);
synchronizedBuffer.setPipelineRunner(mockPipelineRunner);
final NonBlockingBuffer<Record<String>> NonBlockingBuffer = new NonBlockingBuffer<>(TEST_PIPELINE_NAME);
NonBlockingBuffer.setPipelineRunner(mockPipelineRunner);

assertThat(synchronizedBuffer, notNullValue());
final Map.Entry<Collection<Record<String>>, CheckpointState> readResult = synchronizedBuffer.read(TEST_BATCH_READ_TIMEOUT);
assertThat(NonBlockingBuffer, notNullValue());
final Map.Entry<Collection<Record<String>>, CheckpointState> readResult = NonBlockingBuffer.read(TEST_BATCH_READ_TIMEOUT);
assertThat(readResult.getKey().size(), is(0));
}

@Test
public void testBufferIsEmpty() {
final SynchronizedBuffer<Record<String>> synchronizedBuffer = new SynchronizedBuffer<>(pipelineDescription);
synchronizedBuffer.setPipelineRunner(mockPipelineRunner);
final NonBlockingBuffer<Record<String>> NonBlockingBuffer = new NonBlockingBuffer<>(pipelineDescription);
NonBlockingBuffer.setPipelineRunner(mockPipelineRunner);

assertTrue(synchronizedBuffer.isEmpty());
assertTrue(NonBlockingBuffer.isEmpty());
}

@Test
public void testBufferIsNotEmpty() {
doNothing().when(mockPipelineRunner).runAllProcessorsAndPublishToSinks();

final SynchronizedBuffer<Record<String>> synchronizedBuffer = new SynchronizedBuffer<>(pipelineDescription);
synchronizedBuffer.setPipelineRunner(mockPipelineRunner);
final NonBlockingBuffer<Record<String>> NonBlockingBuffer = new NonBlockingBuffer<>(pipelineDescription);
NonBlockingBuffer.setPipelineRunner(mockPipelineRunner);

Record<String> record = new Record<>("TEST");
synchronizedBuffer.write(record, TEST_WRITE_TIMEOUT);
NonBlockingBuffer.write(record, TEST_WRITE_TIMEOUT);

assertFalse(synchronizedBuffer.isEmpty());
assertFalse(NonBlockingBuffer.isEmpty());
}

@Test
void testNonZeroBatchDelayReturnsAllRecords() throws Exception {
final SynchronizedBuffer<Record<String>> synchronizedBuffer = new SynchronizedBuffer<>(pipelineDescription);
synchronizedBuffer.setPipelineRunner(mockPipelineRunner);
final NonBlockingBuffer<Record<String>> NonBlockingBuffer = new NonBlockingBuffer<>(pipelineDescription);
NonBlockingBuffer.setPipelineRunner(mockPipelineRunner);

assertThat(synchronizedBuffer, notNullValue());
assertThat(NonBlockingBuffer, notNullValue());

final Collection<Record<String>> testRecords = generateBatchRecords();
synchronizedBuffer.writeAll(testRecords, TEST_WRITE_TIMEOUT);
final Map.Entry<Collection<Record<String>>, CheckpointState> readResult = synchronizedBuffer.read(TEST_BATCH_READ_TIMEOUT);
NonBlockingBuffer.writeAll(testRecords, TEST_WRITE_TIMEOUT);
final Map.Entry<Collection<Record<String>>, CheckpointState> readResult = NonBlockingBuffer.read(TEST_BATCH_READ_TIMEOUT);
final Collection<Record<String>> records = readResult.getKey();
final CheckpointState checkpointState = readResult.getValue();
assertThat(records.size(), is(testRecords.size()));
Expand Down
1 change: 1 addition & 0 deletions data-prepper-plugins/opensearch-api-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ plugins {

dependencies {
implementation project(':data-prepper-api')
implementation project(path: ':data-prepper-core')
implementation project(':data-prepper-plugins:blocking-buffer')
implementation project(':data-prepper-plugins:http-source-common')
implementation project(':data-prepper-plugins:common')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.buffer.DefinesBuffer;
import org.opensearch.dataprepper.model.codec.ByteDecoder;
import org.opensearch.dataprepper.model.codec.JsonDecoder;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
Expand All @@ -28,6 +29,7 @@
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.pipeline.buffer.NonBlockingBuffer;
import org.opensearch.dataprepper.plugins.certificate.CertificateProvider;
import org.opensearch.dataprepper.plugins.certificate.model.Certificate;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
Expand All @@ -45,7 +47,7 @@
import java.util.function.Function;

@DataPrepperPlugin(name = "opensearch_api", pluginType = Source.class, pluginConfigurationType = OpenSearchAPISourceConfig.class)
public class OpenSearchAPISource implements Source<Record<Event>> {
public class OpenSearchAPISource implements Source<Record<Event>>, DefinesBuffer {
private static final Logger LOG = LoggerFactory.getLogger(OpenSearchAPISource.class);
private static final String PIPELINE_NAME_PLACEHOLDER = "${pipelineName}";
public static final String REGEX_HEALTH = "regex:^/(?!health$).*$";
Expand All @@ -60,6 +62,7 @@ public class OpenSearchAPISource implements Source<Record<Event>> {
private final PluginMetrics pluginMetrics;
private static final String HTTP_HEALTH_CHECK_PATH = "/health";
private ByteDecoder byteDecoder;
private final Buffer<Record<Event>> defaultBuffer;

@DataPrepperPluginConstructor
public OpenSearchAPISource(final OpenSearchAPISourceConfig sourceConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory,
Expand Down Expand Up @@ -87,10 +90,12 @@ public OpenSearchAPISource(final OpenSearchAPISourceConfig sourceConfig, final P
authenticationPluginSetting.setPipelineName(pipelineName);
authenticationProvider = pluginFactory.loadPlugin(ArmeriaHttpAuthenticationProvider.class, authenticationPluginSetting);
httpRequestExceptionHandler = new HttpRequestExceptionHandler(pluginMetrics);
defaultBuffer = new NonBlockingBuffer(pipelineName);
}

@Override
public void start(final Buffer<Record<Event>> buffer) {

if (buffer == null) {
throw new IllegalStateException("Buffer provided is null");
}
Expand Down Expand Up @@ -195,4 +200,9 @@ public void stop() {
}
LOG.info("Stopped OpenSearch API source.");
}

@Override
public Optional<Buffer> getDefaultBuffer() {
return Optional.of(defaultBuffer);
}
}

0 comments on commit f77f5dc

Please sign in to comment.