-
Notifications
You must be signed in to change notification settings - Fork 213
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Zero Buffer Implementation and Tests #5416
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
package org.opensearch.dataprepper.core.pipeline; | ||
|
||
public interface PipelineRunner { | ||
void runAllProcessorsAndPublishToSinks(); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
package org.opensearch.dataprepper.core.pipeline.buffer; | ||
|
||
import org.opensearch.dataprepper.core.pipeline.PipelineRunner; | ||
import org.opensearch.dataprepper.model.buffer.Buffer; | ||
import org.opensearch.dataprepper.model.record.Record; | ||
|
||
/** | ||
* Represents the base class for zero buffer implementation and implements {@link Buffer} interface. | ||
* It provides a common functionality to run all processors and sinks within the same thread context. | ||
*/ | ||
public abstract class AbstractZeroBuffer <T extends Record<?>> implements Buffer<T> { | ||
private PipelineRunner pipelineRunner; | ||
|
||
protected void runAllProcessorsAndPublishToSinks() { | ||
// TODO : Implement functionality to call the processors and sinks within the same context | ||
getPipelineRunner().runAllProcessorsAndPublishToSinks(); | ||
} | ||
|
||
public PipelineRunner getPipelineRunner() { | ||
return pipelineRunner; | ||
} | ||
|
||
public void setPipelineRunner(PipelineRunner pipelineRunner) { | ||
this.pipelineRunner = pipelineRunner; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
plugins { | ||
id 'java' | ||
} | ||
|
||
dependencies { | ||
implementation project(':data-prepper-api') | ||
implementation project(':data-prepper-core') | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think we should depend on data-prepper-core here. I think it may make the most sense to move this project ( |
||
implementation 'io.micrometer:micrometer-core' | ||
implementation 'org.projectlombok:lombok:1.18.30' | ||
annotationProcessor 'org.projectlombok:lombok:1.18.30' | ||
} | ||
|
||
jacocoTestCoverageVerification { | ||
dependsOn jacocoTestReport | ||
violationRules { | ||
rule { //in addition to core projects rule | ||
limit { | ||
minimum = 0.90 | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
package org.opensearch.dataprepper.plugins.buffer.zerobuffer; | ||
|
||
import org.opensearch.dataprepper.core.pipeline.buffer.AbstractZeroBuffer; | ||
import org.opensearch.dataprepper.metrics.MetricNames; | ||
import org.opensearch.dataprepper.metrics.PluginMetrics; | ||
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; | ||
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; | ||
import org.opensearch.dataprepper.model.buffer.Buffer; | ||
import org.opensearch.dataprepper.model.CheckpointState; | ||
import org.opensearch.dataprepper.model.configuration.PipelineDescription; | ||
import org.opensearch.dataprepper.model.record.Record; | ||
import io.micrometer.core.instrument.Counter; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import lombok.Getter; | ||
import lombok.AccessLevel; | ||
import java.util.AbstractMap; | ||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.Map; | ||
import java.util.concurrent.TimeoutException; | ||
|
||
@DataPrepperPlugin(name = "zero_buffer", pluginType = Buffer.class) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this just be named |
||
public class ZeroBuffer<T extends Record<?>> extends AbstractZeroBuffer<T> { | ||
private static final Logger LOG = LoggerFactory.getLogger(ZeroBuffer.class); | ||
private static final String PLUGIN_COMPONENT_ID = "ZeroBuffer"; | ||
private final PluginMetrics pluginMetrics; | ||
private final ThreadLocal<Collection<T>> threadLocalStore; | ||
@Getter(value = AccessLevel.PACKAGE) | ||
private final String pipelineName; | ||
@Getter(value = AccessLevel.PACKAGE) | ||
private final Counter writeRecordsCounter; | ||
@Getter(value = AccessLevel.PACKAGE) | ||
private final Counter readRecordsCounter; | ||
|
||
@DataPrepperPluginConstructor | ||
public ZeroBuffer(PipelineDescription pipelineDescription) { | ||
this.pluginMetrics = PluginMetrics.fromNames(PLUGIN_COMPONENT_ID, pipelineDescription.getPipelineName()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we can now get this from dependency injection. @san81 , is that correct? |
||
this.pipelineName = pipelineDescription.getPipelineName(); | ||
this.threadLocalStore = new ThreadLocal<>(); | ||
this.writeRecordsCounter = pluginMetrics.counter(MetricNames.RECORDS_WRITTEN); | ||
this.readRecordsCounter = pluginMetrics.counter(MetricNames.RECORDS_READ); | ||
} | ||
|
||
@Override | ||
public void write(T record, int timeoutInMillis) throws TimeoutException { | ||
if (record == null) { | ||
throw new NullPointerException("The write record cannot be null"); | ||
} | ||
|
||
if (threadLocalStore.get() == null) { | ||
threadLocalStore.set(new ArrayList<>()); | ||
} | ||
|
||
threadLocalStore.get().add(record); | ||
writeRecordsCounter.increment(); | ||
|
||
runAllProcessorsAndPublishToSinks(); | ||
} | ||
|
||
@Override | ||
public void writeAll(Collection<T> records, int timeoutInMillis) throws Exception { | ||
if (records == null) { | ||
throw new NullPointerException("The write records cannot be null"); | ||
} | ||
|
||
if (threadLocalStore.get() == null) { | ||
threadLocalStore.set(records); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As you are possibly reusing the thread local store, you actually are modifying this input list. You should make a copy of this. |
||
} else { | ||
// Add the new records to the existing records | ||
threadLocalStore.get().addAll(records); | ||
} | ||
|
||
writeRecordsCounter.increment(records.size() * 1.0); | ||
runAllProcessorsAndPublishToSinks(); | ||
} | ||
|
||
@Override | ||
public Map.Entry<Collection<T>, CheckpointState> read(int timeoutInMillis) { | ||
if (threadLocalStore.get() == null) { | ||
threadLocalStore.set(new ArrayList<>()); | ||
} | ||
|
||
Collection<T> storedRecords = threadLocalStore.get(); | ||
CheckpointState checkpointState = new CheckpointState(0); | ||
if (storedRecords!= null && !storedRecords.isEmpty()) { | ||
checkpointState = new CheckpointState(storedRecords.size()); | ||
threadLocalStore.remove(); | ||
readRecordsCounter.increment(storedRecords.size() * 1.0); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why |
||
} | ||
|
||
return new AbstractMap.SimpleEntry<>(storedRecords, checkpointState); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please use |
||
} | ||
|
||
@Override | ||
public void checkpoint(CheckpointState checkpointState) {} | ||
|
||
@Override | ||
public boolean isEmpty() { | ||
return (this.threadLocalStore.get() == null || this.threadLocalStore.get().isEmpty()); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rather than an abstract class, make this an interface. Maybe something like
SupportsPipelineRunner
.Then move the implementation into
ZeroBuffer
.