Zero Buffer Implementation and Tests #5416

Open · wants to merge 1 commit into base: main

@@ -0,0 +1,5 @@
package org.opensearch.dataprepper.core.pipeline;

public interface PipelineRunner {
    void runAllProcessorsAndPublishToSinks();
}
@@ -0,0 +1,26 @@
package org.opensearch.dataprepper.core.pipeline.buffer;

import org.opensearch.dataprepper.core.pipeline.PipelineRunner;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.record.Record;

/**
 * Represents the base class for zero buffer implementations and implements the {@link Buffer} interface.
 * It provides common functionality to run all processors and sinks within the same thread context.
 */
public abstract class AbstractZeroBuffer<T extends Record<?>> implements Buffer<T> {
Review comment (Member):
Rather than an abstract class, make this an interface. Maybe something like SupportsPipelineRunner.

interface SupportsPipelineRunner {
  PipelineRunner getPipelineRunner();
  void setPipelineRunner(PipelineRunner pipelineRunner);
}

Then move the implementation into ZeroBuffer.

    private PipelineRunner pipelineRunner;

    protected void runAllProcessorsAndPublishToSinks() {
        // TODO : Implement functionality to call the processors and sinks within the same context
        getPipelineRunner().runAllProcessorsAndPublishToSinks();
    }

    public PipelineRunner getPipelineRunner() {
        return pipelineRunner;
    }

    public void setPipelineRunner(PipelineRunner pipelineRunner) {
        this.pipelineRunner = pipelineRunner;
    }
}
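If the review suggestion above is adopted, the abstract class would go away and ZeroBuffer (shown later in this PR) would implement the proposed interface directly. A rough sketch of just the affected portion, with the Buffer methods from this PR elided; SupportsPipelineRunner is the reviewer's tentative name, not an existing type:

public class ZeroBuffer<T extends Record<?>> implements Buffer<T>, SupportsPipelineRunner {
    private PipelineRunner pipelineRunner;

    @Override
    public PipelineRunner getPipelineRunner() {
        return pipelineRunner;
    }

    @Override
    public void setPipelineRunner(PipelineRunner pipelineRunner) {
        this.pipelineRunner = pipelineRunner;
    }

    // write/writeAll/read/checkpoint/isEmpty remain as in this PR and call
    // getPipelineRunner().runAllProcessorsAndPublishToSinks() where the abstract class did.
}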
27 changes: 27 additions & 0 deletions data-prepper-plugins/zero-buffer/build.gradle
@@ -0,0 +1,27 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java'
}

dependencies {
    implementation project(':data-prepper-api')
    implementation project(':data-prepper-core')
Review comment (Member):
I don't think we should depend on data-prepper-core here.

I think it may make the most sense to move this project (zero-buffer) into data-prepper-core.

    implementation 'io.micrometer:micrometer-core'
    implementation 'org.projectlombok:lombok:1.18.30'
    annotationProcessor 'org.projectlombok:lombok:1.18.30'
}

jacocoTestCoverageVerification {
    dependsOn jacocoTestReport
    violationRules {
        rule { // in addition to core projects rule
            limit {
                minimum = 0.90
            }
        }
    }
}
@@ -0,0 +1,102 @@
package org.opensearch.dataprepper.plugins.buffer.zerobuffer;

import org.opensearch.dataprepper.core.pipeline.buffer.AbstractZeroBuffer;
import org.opensearch.dataprepper.metrics.MetricNames;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.record.Record;
import io.micrometer.core.instrument.Counter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.Getter;
import lombok.AccessLevel;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeoutException;

@DataPrepperPlugin(name = "zero_buffer", pluginType = Buffer.class)
Review comment (Member):
Should this just be named zero? We have moved away from including the type suffix. For example, we went from otel_trace_source to otel_trace.

public class ZeroBuffer<T extends Record<?>> extends AbstractZeroBuffer<T> {
    private static final Logger LOG = LoggerFactory.getLogger(ZeroBuffer.class);
    private static final String PLUGIN_COMPONENT_ID = "ZeroBuffer";
    private final PluginMetrics pluginMetrics;
    private final ThreadLocal<Collection<T>> threadLocalStore;
    @Getter(value = AccessLevel.PACKAGE)
    private final String pipelineName;
    @Getter(value = AccessLevel.PACKAGE)
    private final Counter writeRecordsCounter;
    @Getter(value = AccessLevel.PACKAGE)
    private final Counter readRecordsCounter;

    @DataPrepperPluginConstructor
    public ZeroBuffer(PipelineDescription pipelineDescription) {
        this.pluginMetrics = PluginMetrics.fromNames(PLUGIN_COMPONENT_ID, pipelineDescription.getPipelineName());
Review comment (Member):
I think we can now get this from dependency injection. @san81 , is that correct?

        this.pipelineName = pipelineDescription.getPipelineName();
        this.threadLocalStore = new ThreadLocal<>();
        this.writeRecordsCounter = pluginMetrics.counter(MetricNames.RECORDS_WRITTEN);
        this.readRecordsCounter = pluginMetrics.counter(MetricNames.RECORDS_READ);
    }
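If PluginMetrics can indeed be obtained through dependency injection, as the comment above asks, the constructor might simplify to something like the sketch below. This assumes the plugin framework supplies a PluginMetrics instance as a constructor argument; the framework-provided metric scope may differ from the manual PLUGIN_COMPONENT_ID scope used here.

@DataPrepperPluginConstructor
public ZeroBuffer(final PipelineDescription pipelineDescription, final PluginMetrics pluginMetrics) {
    // PluginMetrics injected by the framework instead of PluginMetrics.fromNames(...)
    this.pluginMetrics = pluginMetrics;
    this.pipelineName = pipelineDescription.getPipelineName();
    this.threadLocalStore = new ThreadLocal<>();
    this.writeRecordsCounter = pluginMetrics.counter(MetricNames.RECORDS_WRITTEN);
    this.readRecordsCounter = pluginMetrics.counter(MetricNames.RECORDS_READ);
}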

    @Override
    public void write(T record, int timeoutInMillis) throws TimeoutException {
        if (record == null) {
            throw new NullPointerException("The write record cannot be null");
        }

        if (threadLocalStore.get() == null) {
            threadLocalStore.set(new ArrayList<>());
        }

        threadLocalStore.get().add(record);
        writeRecordsCounter.increment();

        runAllProcessorsAndPublishToSinks();
    }

    @Override
    public void writeAll(Collection<T> records, int timeoutInMillis) throws Exception {
        if (records == null) {
            throw new NullPointerException("The write records cannot be null");
        }

        if (threadLocalStore.get() == null) {
            threadLocalStore.set(records);
Review comment (Member):
As you are possibly reusing the thread local store, you actually are modifying this input list. You should make a copy of this. new ArrayList<>(records).

        } else {
            // Add the new records to the existing records
            threadLocalStore.get().addAll(records);
        }

        writeRecordsCounter.increment(records.size() * 1.0);
        runAllProcessorsAndPublishToSinks();
    }
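With the defensive copy suggested above, the first branch of writeAll would instead be (sketch):

if (threadLocalStore.get() == null) {
    // Copy the caller's collection so later additions to the thread-local store
    // do not modify the input list.
    threadLocalStore.set(new ArrayList<>(records));
} else {
    // Add the new records to the existing records
    threadLocalStore.get().addAll(records);
}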

    @Override
    public Map.Entry<Collection<T>, CheckpointState> read(int timeoutInMillis) {
        if (threadLocalStore.get() == null) {
            threadLocalStore.set(new ArrayList<>());
        }

        Collection<T> storedRecords = threadLocalStore.get();
        CheckpointState checkpointState = new CheckpointState(0);
        if (storedRecords != null && !storedRecords.isEmpty()) {
            checkpointState = new CheckpointState(storedRecords.size());
            threadLocalStore.remove();
            readRecordsCounter.increment(storedRecords.size() * 1.0);
Review comment (Member):
Why * 1.0? If it is to make a double, you may just cast.

        }

        return new AbstractMap.SimpleEntry<>(storedRecords, checkpointState);
Review comment (Member):
Please use Map.entry(storedRecords, checkpointState) instead.

    }
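Folding in the last two comments (no * 1.0, and Map.entry instead of AbstractMap.SimpleEntry), read() might look like this sketch:

@Override
public Map.Entry<Collection<T>, CheckpointState> read(int timeoutInMillis) {
    if (threadLocalStore.get() == null) {
        threadLocalStore.set(new ArrayList<>());
    }

    Collection<T> storedRecords = threadLocalStore.get();
    CheckpointState checkpointState = new CheckpointState(0);
    if (storedRecords != null && !storedRecords.isEmpty()) {
        checkpointState = new CheckpointState(storedRecords.size());
        threadLocalStore.remove();
        // An int argument widens to double, so neither * 1.0 nor an explicit cast is needed.
        readRecordsCounter.increment(storedRecords.size());
    }

    return Map.entry(storedRecords, checkpointState);
}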

    @Override
    public void checkpoint(CheckpointState checkpointState) {}

    @Override
    public boolean isEmpty() {
        return (this.threadLocalStore.get() == null || this.threadLocalStore.get().isEmpty());
    }
}