diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineRunner.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineRunner.java new file mode 100644 index 0000000000..be7a4595cc --- /dev/null +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineRunner.java @@ -0,0 +1,5 @@ +package org.opensearch.dataprepper.core.pipeline; + +public interface PipelineRunner { + void runAllProcessorsAndPublishToSinks(); +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/buffer/AbstractZeroBuffer.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/buffer/AbstractZeroBuffer.java new file mode 100644 index 0000000000..ca7ea822b8 --- /dev/null +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/buffer/AbstractZeroBuffer.java @@ -0,0 +1,26 @@ +package org.opensearch.dataprepper.core.pipeline.buffer; + +import org.opensearch.dataprepper.core.pipeline.PipelineRunner; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.record.Record; + +/** + * Represents the base class for zero buffer implementation and implements {@link Buffer} interface. + * It provides a common functionality to run all processors and sinks within the same thread context. 
+ */ +public abstract class AbstractZeroBuffer<T extends Record<?>> implements Buffer<T> { + private PipelineRunner pipelineRunner; + + protected void runAllProcessorsAndPublishToSinks() { + // TODO : Implement functionality to call the processors and sinks within the same context + getPipelineRunner().runAllProcessorsAndPublishToSinks(); + } + + public PipelineRunner getPipelineRunner() { + return pipelineRunner; + } + + public void setPipelineRunner(PipelineRunner pipelineRunner) { + this.pipelineRunner = pipelineRunner; + } +} diff --git a/data-prepper-plugins/zero-buffer/build.gradle b/data-prepper-plugins/zero-buffer/build.gradle new file mode 100644 index 0000000000..5309b57130 --- /dev/null +++ b/data-prepper-plugins/zero-buffer/build.gradle @@ -0,0 +1,27 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +plugins { + id 'java' +} + +dependencies { + implementation project(':data-prepper-api') + implementation project(':data-prepper-core') + implementation 'io.micrometer:micrometer-core' + implementation 'org.projectlombok:lombok:1.18.30' + annotationProcessor 'org.projectlombok:lombok:1.18.30' +} + +jacocoTestCoverageVerification { + dependsOn jacocoTestReport + violationRules { + rule { //in addition to core projects rule + limit { + minimum = 0.90 + } + } + } +} \ No newline at end of file diff --git a/data-prepper-plugins/zero-buffer/src/main/java/org/opensearch/dataprepper/plugins/buffer/zerobuffer/ZeroBuffer.java b/data-prepper-plugins/zero-buffer/src/main/java/org/opensearch/dataprepper/plugins/buffer/zerobuffer/ZeroBuffer.java new file mode 100644 index 0000000000..17d7f797ed --- /dev/null +++ b/data-prepper-plugins/zero-buffer/src/main/java/org/opensearch/dataprepper/plugins/buffer/zerobuffer/ZeroBuffer.java @@ -0,0 +1,102 @@ +package org.opensearch.dataprepper.plugins.buffer.zerobuffer; + +import org.opensearch.dataprepper.core.pipeline.buffer.AbstractZeroBuffer; +import org.opensearch.dataprepper.metrics.MetricNames; +import 
org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.CheckpointState; +import org.opensearch.dataprepper.model.configuration.PipelineDescription; +import org.opensearch.dataprepper.model.record.Record; +import io.micrometer.core.instrument.Counter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import lombok.Getter; +import lombok.AccessLevel; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.TimeoutException; + +@DataPrepperPlugin(name = "zero_buffer", pluginType = Buffer.class) +public class ZeroBuffer<T extends Record<?>> extends AbstractZeroBuffer<T> { + private static final Logger LOG = LoggerFactory.getLogger(ZeroBuffer.class); + private static final String PLUGIN_COMPONENT_ID = "ZeroBuffer"; + private final PluginMetrics pluginMetrics; + private final ThreadLocal<Collection<T>> threadLocalStore; + @Getter(value = AccessLevel.PACKAGE) + private final String pipelineName; + @Getter(value = AccessLevel.PACKAGE) + private final Counter writeRecordsCounter; + @Getter(value = AccessLevel.PACKAGE) + private final Counter readRecordsCounter; + + @DataPrepperPluginConstructor + public ZeroBuffer(PipelineDescription pipelineDescription) { + this.pluginMetrics = PluginMetrics.fromNames(PLUGIN_COMPONENT_ID, pipelineDescription.getPipelineName()); + this.pipelineName = pipelineDescription.getPipelineName(); + this.threadLocalStore = new ThreadLocal<>(); + this.writeRecordsCounter = pluginMetrics.counter(MetricNames.RECORDS_WRITTEN); + this.readRecordsCounter = pluginMetrics.counter(MetricNames.RECORDS_READ); + } + + @Override + public void write(T record, int timeoutInMillis) throws TimeoutException { + if (record == null) { + throw new 
NullPointerException("The write record cannot be null"); + } + + if (threadLocalStore.get() == null) { + threadLocalStore.set(new ArrayList<>()); + } + + threadLocalStore.get().add(record); + writeRecordsCounter.increment(); + + runAllProcessorsAndPublishToSinks(); + } + + @Override + public void writeAll(Collection<T> records, int timeoutInMillis) throws Exception { + if (records == null) { + throw new NullPointerException("The write records cannot be null"); + } + + if (threadLocalStore.get() == null) { + threadLocalStore.set(records); + } else { + // Add the new records to the existing records + threadLocalStore.get().addAll(records); + } + + writeRecordsCounter.increment(records.size() * 1.0); + runAllProcessorsAndPublishToSinks(); + } + + @Override + public Map.Entry<Collection<T>, CheckpointState> read(int timeoutInMillis) { + if (threadLocalStore.get() == null) { + threadLocalStore.set(new ArrayList<>()); + } + + Collection<T> storedRecords = threadLocalStore.get(); + CheckpointState checkpointState = new CheckpointState(0); + if (storedRecords!= null && !storedRecords.isEmpty()) { + checkpointState = new CheckpointState(storedRecords.size()); + threadLocalStore.remove(); + readRecordsCounter.increment(storedRecords.size() * 1.0); + } + + return new AbstractMap.SimpleEntry<>(storedRecords, checkpointState); + } + + @Override + public void checkpoint(CheckpointState checkpointState) {} + + @Override + public boolean isEmpty() { + return (this.threadLocalStore.get() == null || this.threadLocalStore.get().isEmpty()); + } +} diff --git a/data-prepper-plugins/zero-buffer/src/test/java/org/opensearch/dataprepper/plugins/buffer/zerobuffer/ZeroBufferTests.java b/data-prepper-plugins/zero-buffer/src/test/java/org/opensearch/dataprepper/plugins/buffer/zerobuffer/ZeroBufferTests.java new file mode 100644 index 0000000000..f426ee0536 --- /dev/null +++ b/data-prepper-plugins/zero-buffer/src/test/java/org/opensearch/dataprepper/plugins/buffer/zerobuffer/ZeroBufferTests.java @@ -0,0 +1,235 @@ 
+package org.opensearch.dataprepper.plugins.buffer.zerobuffer; + +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.core.pipeline.PipelineRunner; +import org.opensearch.dataprepper.model.CheckpointState; +import org.opensearch.dataprepper.model.configuration.PipelineDescription; +import org.opensearch.dataprepper.model.record.Record; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.TimeoutException; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class ZeroBufferTests { + private static final Logger LOG = LoggerFactory.getLogger(ZeroBufferTests.class); + private static final String MOCK_PIPELINE_NAME = "mock-pipeline"; + private static final int WRITE_TIMEOUT = 100; + private static final int READ_TIMEOUT = 500; + private static final String SINGLE_RECORD_DATA_FORMAT = "{\"message\":\"test\"}"; + private static final String BATCH_RECORDS_DATA_FORMAT = "{\"message\":\"test-%d\"}"; + + @BeforeEach + public void setup() { + Metrics.globalRegistry.getRegistries().forEach(Metrics.globalRegistry::remove); + Metrics.globalRegistry.getMeters().forEach(Metrics.globalRegistry::remove); + Metrics.addRegistry(new SimpleMeterRegistry()); + } + + /*-------------------------Tests for Writing to ZeroBuffer---------------------------*/ + @Test + public void testSingleWriteAndReadReturnsCorrectRecord() throws Exception { + ZeroBuffer<Record<String>> zeroBuffer = setupAndInitializeZeroBuffer(); + + zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT); + assertEquals(zeroBuffer.getWriteRecordsCounter().count(), 1); + + Collection<Record<String>> readRecords = 
zeroBuffer.read(READ_TIMEOUT).getKey(); + assertEquals(readRecords.size(), 1); + assertEquals(zeroBuffer.getReadRecordsCounter().count(), 1); + assertEquals(readRecords.iterator().next().getData(), SINGLE_RECORD_DATA_FORMAT); + + zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT); + readRecords = zeroBuffer.read(READ_TIMEOUT).getKey(); + assertEquals(readRecords.size(), 1); + } + + @Test + public void testMultipleWriteAndReadReturnsCorrectRecord() throws Exception { + ZeroBuffer<Record<String>> zeroBuffer = setupAndInitializeZeroBuffer(); + + zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT); + zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT); + assertEquals(zeroBuffer.getWriteRecordsCounter().count(), 2); + + Collection<Record<String>> readRecords = zeroBuffer.read(READ_TIMEOUT).getKey(); + assertEquals(readRecords.size(), 2); + assertEquals(zeroBuffer.getReadRecordsCounter().count(), 2); + assertEquals(readRecords.iterator().next().getData(), SINGLE_RECORD_DATA_FORMAT); + assertEquals(readRecords.iterator().next().getData(), SINGLE_RECORD_DATA_FORMAT); + } + + @Test + public void testWriteAllAndReadReturnsAllRecords() throws Exception { + ZeroBuffer<Record<String>> zeroBuffer = setupAndInitializeZeroBuffer(); + + Collection<Record<String>> writeRecords = generateRecords(IntStream.range(0, 10) + .mapToObj(i -> String.format(BATCH_RECORDS_DATA_FORMAT, i)) + .collect(Collectors.toList())); + zeroBuffer.writeAll(writeRecords, WRITE_TIMEOUT); + + Map.Entry<Collection<Record<String>>, CheckpointState> readRecordsMap = zeroBuffer.read(READ_TIMEOUT); + Collection<Record<String>> readRecords = readRecordsMap.getKey(); + for (Record<String> record : readRecords) { + LOG.debug(record.getData()); + } + + // Ensure that the write records are the same as the read records + assertEquals(writeRecords.size(), readRecords.size()); + } + + @Test + public void testWriteNullRecordThrowsException() { + ZeroBuffer<Record<String>> zeroBuffer = setupAndInitializeZeroBuffer(); + + Exception writeException = 
assertThrows(NullPointerException.class, () -> { + zeroBuffer.write(null, WRITE_TIMEOUT); + }); + + Exception writeAllException = assertThrows(NullPointerException.class, () -> { + zeroBuffer.writeAll(null, WRITE_TIMEOUT); + }); + + assertEquals(writeException.getMessage(), "The write record cannot be null"); + assertEquals(writeAllException.getMessage(), "The write records cannot be null"); + } + + @Test + public void testWriteEmptyRecordDoesNotThrowException() { + ZeroBuffer<Record<String>> zeroBuffer = setupAndInitializeZeroBuffer(); + + Record<String> emptyRecord = generateRecord(null); + Collection<Record<String>> emptyRecordCollection = generateRecords(new ArrayList<>()); + + assertDoesNotThrow(() -> zeroBuffer.write(emptyRecord, WRITE_TIMEOUT)); + assertDoesNotThrow(() -> zeroBuffer.writeAll(emptyRecordCollection, WRITE_TIMEOUT)); + } + + @Test + public void testThreadReadAndWriteIsolation() throws Exception { + final ZeroBuffer<Record<String>> zeroBuffer = initializeZeroBufferWithPipelineName(); + + Thread workerThread = new Thread(() -> { + try { + PipelineRunner pipelineRunnerMock = mock(PipelineRunner.class); + zeroBuffer.setPipelineRunner(pipelineRunnerMock); + doNothing().when(pipelineRunnerMock).runAllProcessorsAndPublishToSinks(); + zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT); + } catch (TimeoutException e) { + fail("Timeout exception occurred"); + } + }); + workerThread.start(); + workerThread.join(); + + // Ensure that main thread does not share the same records store as the worker thread + assertEquals(zeroBuffer.read(READ_TIMEOUT).getKey().size(), 0); + assertTrue(zeroBuffer.isEmpty()); + } + + @Test + public void testWriteAndWriteAllReturnsCorrectRecords() throws Exception { + ZeroBuffer<Record<String>> zeroBuffer = setupAndInitializeZeroBuffer(); + + zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT); + zeroBuffer.writeAll(generateRecords(IntStream.range(0, 10) + .mapToObj(i -> String.format(BATCH_RECORDS_DATA_FORMAT, i)) + .collect(Collectors.toList())), 
WRITE_TIMEOUT); + + Collection<Record<String>> readRecords = zeroBuffer.read(READ_TIMEOUT).getKey(); + for (Record<String> record : readRecords) { + LOG.debug(record.getData()); + } + assertEquals(readRecords.size(), 11); + } + + /*-------------------------Tests for Reading From ZeroBuffer---------------------------*/ + @Test + public void testReadFromNonEmptyBufferReturnsCorrectRecords() throws Exception { + ZeroBuffer<Record<String>> zeroBuffer = setupAndInitializeZeroBuffer(); + + zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT); + + Collection<Record<String>> initialReadRecords = zeroBuffer.read(READ_TIMEOUT).getKey(); + Collection<Record<String>> secondAttemptToReadRecords = zeroBuffer.read(READ_TIMEOUT).getKey(); + + assertEquals(initialReadRecords.size(), 1); + assertEquals(initialReadRecords.iterator().next().getData(), SINGLE_RECORD_DATA_FORMAT); + + assertEquals(secondAttemptToReadRecords.size(), 0); + } + + @Test + public void testReadFromEmptyBufferReturnsNoRecords() { + ZeroBuffer<Record<String>> zeroBuffer = setupAndInitializeZeroBuffer(); + + Map.Entry<Collection<Record<String>>, CheckpointState> readRecordsMap = zeroBuffer.read(READ_TIMEOUT); + assertTrue(readRecordsMap.getKey().isEmpty()); + } + + /*-------------------------Tests for Empty Buffer---------------------------*/ + @Test + public void testIsEmptyReturnsTrueWhenBufferIsEmpty() { + ZeroBuffer<Record<String>> zeroBuffer = initializeZeroBufferWithPipelineName(); + assertTrue(zeroBuffer.isEmpty()); + } + + @Test + public void testIsEmptyReturnsFalseWhenBufferIsNotEmpty() throws Exception { + ZeroBuffer<Record<String>> zeroBuffer = setupAndInitializeZeroBuffer(); + + zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT); + + assertFalse(zeroBuffer.isEmpty()); + } + + /*---------------------------Other Tests-----------------------------*/ + @Test + public void testCreateZeroBufferWithPipelineName() { + ZeroBuffer<Record<String>> zeroBuffer = initializeZeroBufferWithPipelineName(); + assertEquals(zeroBuffer.getPipelineName(), MOCK_PIPELINE_NAME); + } + + @Test + public void 
testCheckpointDoesNotThrowException() { + ZeroBuffer<Record<String>> zeroBuffer = setupAndInitializeZeroBuffer(); + assertDoesNotThrow(() -> zeroBuffer.checkpoint(null)); + assertDoesNotThrow(() -> zeroBuffer.checkpoint(new CheckpointState(0))); + } + + /*-------------------------Private Helper Methods---------------------------*/ + private <T> Record<T> generateRecord(final T data) { + return new Record<>(data); + } + + private <T> Collection<Record<T>> generateRecords(Collection<T> data) { + Collection<Record<T>> records = new ArrayList<>(); + for (T recordData : data) { + Record<T> record = new Record<>(recordData); + records.add(record); + } + return records; + } + + private ZeroBuffer<Record<String>> setupAndInitializeZeroBuffer() { + ZeroBuffer<Record<String>> zeroBuffer = initializeZeroBufferWithPipelineName(); + PipelineRunner pipelineRunnerMock = mock(PipelineRunner.class); + zeroBuffer.setPipelineRunner(pipelineRunnerMock); + doNothing().when(pipelineRunnerMock).runAllProcessorsAndPublishToSinks(); + return zeroBuffer; + } + + private ZeroBuffer<Record<String>> initializeZeroBufferWithPipelineName() { + PipelineDescription pipelineDescription = mock(PipelineDescription.class); + when(pipelineDescription.getPipelineName()).thenReturn(MOCK_PIPELINE_NAME); + return new ZeroBuffer<>(pipelineDescription); + } +} diff --git a/settings.gradle b/settings.gradle index d2aa09b52c..e99ee7049a 100644 --- a/settings.gradle +++ b/settings.gradle @@ -190,4 +190,5 @@ include 'data-prepper-plugins:opensearch-api-source' include 'data-prepper-plugins:saas-source-plugins' include 'data-prepper-plugins:saas-source-plugins:source-crawler' include 'data-prepper-plugins:saas-source-plugins:jira-source' +include 'data-prepper-plugins:zero-buffer'