Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zero buffer impl #1

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.opensearch.dataprepper.model.buffer;

import java.util.Optional;

/**
 * A component (typically a {@code Source}) that can supply its own default {@link Buffer}
 * for the pipeline, bypassing the buffer declared in the pipeline configuration.
 */
public interface DefinesBuffer {

/**
 * Returns the default {@link Buffer} this component provides, or
 * {@link Optional#empty()} if the pipeline-configured buffer should be used instead.
 */
Optional<Buffer> getDefaultBuffer();
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,4 @@ void testWriteBytes() {
assertThrows(UnsupportedOperationException.class, () -> buffer.writeBytes(bytes, "", 10));

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.opensearch.dataprepper.breaker.CircuitBreakerManager;
import org.opensearch.dataprepper.model.annotations.SingleThread;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.buffer.DefinesBuffer;
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.peerforwarder.RequiresPeerForwarding;
Expand All @@ -24,11 +25,13 @@
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.pipeline.Pipeline;
import org.opensearch.dataprepper.pipeline.PipelineConnector;
import org.opensearch.dataprepper.pipeline.PipelineRunner;
import org.opensearch.dataprepper.pipeline.parser.PipelineConfigurationValidator;
import org.opensearch.dataprepper.pipeline.parser.model.PipelineConfiguration;
import org.opensearch.dataprepper.pipeline.parser.model.SinkContextPluginSetting;
import org.opensearch.dataprepper.pipeline.router.Router;
import org.opensearch.dataprepper.pipeline.router.RouterFactory;
import org.opensearch.dataprepper.pipeline.buffer.AbstractNonBlockingBuffer;
import org.opensearch.dataprepper.sourcecoordination.SourceCoordinatorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -116,7 +119,13 @@ private void buildPipelineFromConfiguration(
pluginFactory.loadPlugin(Source.class, sourceSetting));

LOG.info("Building buffer for the pipeline [{}]", pipelineName);
final Buffer pipelineDefinedBuffer = pluginFactory.loadPlugin(Buffer.class, pipelineConfiguration.getBufferPluginSetting(), source.getDecoder());
Optional<Buffer> defaultBuffer = Optional.empty();
if (source instanceof DefinesBuffer) {
defaultBuffer = ((DefinesBuffer) source).getDefaultBuffer();
}

final Buffer pipelineDefinedBuffer = defaultBuffer.orElseGet(() ->
pluginFactory.loadPlugin(Buffer.class, pipelineConfiguration.getBufferPluginSetting(), source.getDecoder()));

LOG.info("Building processors for the pipeline [{}]", pipelineName);
final int processorThreads = pipelineConfiguration.getWorkers();
Expand Down Expand Up @@ -147,7 +156,6 @@ private void buildPipelineFromConfiguration(
LOG.info("Constructing MultiBufferDecorator with [{}] secondary buffers for pipeline [{}]", secondaryBuffers.size(), pipelineName);
final MultiBufferDecorator multiBufferDecorator = new MultiBufferDecorator(pipelineDefinedBuffer, secondaryBuffers);


final Buffer buffer = applyCircuitBreakerToBuffer(source, multiBufferDecorator);

final Router router = routerFactory.createRouter(pipelineConfiguration.getRoutes());
Expand All @@ -156,6 +164,11 @@ private void buildPipelineFromConfiguration(
eventFactory, acknowledgementSetManager, sourceCoordinatorFactory, processorThreads, readBatchDelay,
dataPrepperConfiguration.getProcessorShutdownTimeout(), dataPrepperConfiguration.getSinkShutdownTimeout(),
getPeerForwarderDrainTimeout(dataPrepperConfiguration));

if (pipelineDefinedBuffer instanceof AbstractNonBlockingBuffer) {
((AbstractNonBlockingBuffer<?>) pipelineDefinedBuffer).setPipelineRunner(new PipelineRunner(pipeline));
}

pipelineMap.put(pipelineName, pipeline);
} catch (Exception ex) {
//If pipeline construction errors out, we will skip that pipeline and proceed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
@SuppressWarnings({"rawtypes", "unchecked"})
public class Pipeline {
private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class);
private final boolean acknowledgementsEnabled;
private volatile AtomicBoolean stopRequested;

private final String name;
Expand Down Expand Up @@ -137,6 +138,7 @@ public Pipeline(
new PipelineThreadFactory(format("%s-sink-worker", name)), this);

stopRequested = new AtomicBoolean(false);
this.acknowledgementsEnabled = source.areAcknowledgementsEnabled() || buffer.areAcknowledgementsEnabled();
}

AcknowledgementSetManager getAcknowledgementSetManager() {
Expand Down Expand Up @@ -178,6 +180,10 @@ public boolean isStopRequested() {
return stopRequested.get();
}

/**
 * Returns whether end-to-end acknowledgements are enabled for this pipeline,
 * i.e. whether the source or the buffer enables them.
 */
public boolean isAcknowledgementsEnabled() {
    // Return the value computed once in the constructor instead of re-evaluating
    // source/buffer on every call; the previously-added acknowledgementsEnabled
    // field was otherwise never read. NOTE(review): this assumes the source's and
    // buffer's acknowledgement settings do not change after construction — confirm.
    return this.acknowledgementsEnabled;
}

/** Returns the configured drain timeout for the peer forwarder. */
public Duration getPeerForwarderDrainTimeout() {
return peerForwarderDrainTimeout;
}
Expand Down Expand Up @@ -226,7 +232,7 @@ private synchronized void startSourceAndProcessors() {
}
}
).collect(Collectors.toList());
processorExecutorService.submit(new ProcessWorker(buffer, processors, this));
processorExecutorService.submit(new ProcessWorker(buffer, processors, this, new PipelineRunner(this)));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package org.opensearch.dataprepper.pipeline;

import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.InternalEventHandle;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.pipeline.common.FutureHelper;
import org.opensearch.dataprepper.pipeline.common.FutureHelperResult;
import org.opensearch.dataprepper.pipeline.exceptions.InvalidEventHandleException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

/**
 * Drives one iteration of a pipeline's processing loop: reads a batch from the
 * pipeline's buffer, runs it through the processors (handling acknowledgements),
 * publishes the results to the sinks, and checkpoints the buffer.
 *
 * <p>Extracted from {@code ProcessWorker} so that both worker threads and
 * non-blocking buffers can trigger processing. Not thread-safe: the
 * {@code isEmptyRecordsLogged} flag is unsynchronized mutable state.
 */
public class PipelineRunner {
    // Fixed: was LoggerFactory.getLogger(Pipeline.class), which mis-attributed
    // every log line from this class to Pipeline.
    private static final Logger LOG = LoggerFactory.getLogger(PipelineRunner.class);
    private final Pipeline pipeline;
    // TODO Hacky way to avoid logging continuously - Will be removed as part of metrics implementation
    private boolean isEmptyRecordsLogged = false;

    public PipelineRunner(@Nonnull final Pipeline pipeline) {
        this.pipeline = pipeline;
    }

    /**
     * Reads one batch from the pipeline buffer, runs it through the given
     * processors, publishes the results to the sinks, and checkpoints the batch.
     *
     * @param processors processors to apply, in order
     */
    public void runProcessorsAndPublishToSinks(final List<Processor> processors) {
        final boolean acknowledgementsEnabled = pipeline.isAcknowledgementsEnabled();
        final Map.Entry<Collection, CheckpointState> readResult =
                pipeline.getBuffer().read(pipeline.getReadBatchTimeoutInMillis());
        Collection records = readResult.getKey();
        final CheckpointState checkpointState = readResult.getValue();
        //TODO Hacky way to avoid logging continuously - Will be removed as part of metrics implementation
        if (records.isEmpty()) {
            if (!isEmptyRecordsLogged) {
                LOG.debug(" {} Worker: No records received from buffer", pipeline.getName());
                isEmptyRecordsLogged = true;
            }
        } else {
            LOG.debug(" {} Worker: Processing {} records from buffer", pipeline.getName(), records.size());
        }

        // An empty batch is still sent through the processors because stateful
        // processors expect to be invoked on every iteration.
        records = runProcessorsAndHandleAcknowledgements(processors, records, acknowledgementsEnabled);

        postToSink(records);
        // Checkpoint the current batch read from the buffer after being processed by processors and sinks.
        pipeline.getBuffer().checkpoint(checkpointState);
    }

    /** Runs one batch through every processor set the pipeline defines, flattened into a single list. */
    public void runAllProcessorsAndPublishToSinks() {
        List<Processor> processors = pipeline.getProcessorSets().stream()
                .flatMap(Collection::stream)
                .collect(Collectors.toList());
        runProcessorsAndPublishToSinks(processors);
    }

    /**
     * Applies each processor in order to {@code records}. When acknowledgements are
     * enabled, events dropped by a processor are positively acknowledged. If a
     * processor throws, the whole batch is dropped (and acknowledged) and an empty
     * collection is returned.
     */
    private Collection runProcessorsAndHandleAcknowledgements(List<Processor> processors,
                                                              Collection records,
                                                              final boolean acknowledgementsEnabled) {
        for (final Processor processor : processors) {
            // Snapshot the input events so dropped ones can be acknowledged after execution.
            List<Event> inputEvents = null;
            if (acknowledgementsEnabled) {
                inputEvents = ((List<Record<Event>>) records).stream().map(Record::getData).collect(Collectors.toList());
            }

            try {
                records = processor.execute(records);
                if (inputEvents != null) {
                    processAcknowledgements(inputEvents, records);
                }
            } catch (final Exception e) {
                LOG.error("A processor threw an exception. This batch of Events will be dropped, and their EventHandles will be released: ", e);
                if (inputEvents != null) {
                    // Every input event was dropped; acknowledge against an empty output.
                    processAcknowledgements(inputEvents, Collections.emptyList());
                }

                records = Collections.emptyList();
                break;
            }
        }
        return records;
    }

    /**
     * For each input event absent from the output, releases its handle with a
     * positive acknowledgement (when it carries an acknowledgement set).
     *
     * @throws InvalidEventHandleException if an event carries a handle that is not a
     *         {@link DefaultEventHandle}
     */
    private void processAcknowledgements(List<Event> inputEvents, Collection<Record<Event>> outputRecords) {
        Set<Event> outputEventsSet = outputRecords.stream().map(Record::getData).collect(Collectors.toSet());
        // For each event in the input events list that is not present in the output events, send positive acknowledgement, if acknowledgements are enabled for it
        inputEvents.forEach(event -> {
            EventHandle eventHandle = event.getEventHandle();
            if (eventHandle instanceof DefaultEventHandle) {
                InternalEventHandle internalEventHandle = (InternalEventHandle) eventHandle;
                if (internalEventHandle.getAcknowledgementSet() != null && !outputEventsSet.contains(event)) {
                    eventHandle.release(true);
                }
            } else if (eventHandle != null) {
                throw new InvalidEventHandleException("Unexpected EventHandle");
            }
        });
    }

    /**
     * TODO Add isolator pattern - Fail if one of the Sink fails [isolator Pattern]
     * Uses the pipeline method to publish to sinks, waits for each of the sink result to be true before attempting to
     * process more records from buffer.
     *
     * @return {@code true} if every sink future completed without a failure reason
     */
    private boolean postToSink(final Collection<Record> records) {
        LOG.debug("Pipeline Worker: Submitting {} processed records to sinks", records.size());
        final List<Future<Void>> sinkFutures = pipeline.publishToSinks(records);
        final FutureHelperResult<Void> futureResults = FutureHelper.awaitFuturesIndefinitely(sinkFutures);
        return futureResults.getFailedReasons().isEmpty();
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,13 @@

import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.InternalEventHandle;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.pipeline.common.FutureHelper;
import org.opensearch.dataprepper.pipeline.common.FutureHelperResult;
import org.opensearch.dataprepper.pipeline.exceptions.InvalidEventHandleException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

@SuppressWarnings({"rawtypes", "unchecked"})
public class ProcessWorker implements Runnable {
Expand All @@ -36,21 +23,21 @@ public class ProcessWorker implements Runnable {
private final Buffer readBuffer;
private final List<Processor> processors;
private final Pipeline pipeline;
private boolean isEmptyRecordsLogged = false;
private final PipelineRunner pipelineRunner;
private PluginMetrics pluginMetrics;
private final Counter invalidEventHandlesCounter;
private boolean acknowledgementsEnabled;

public ProcessWorker(
final Buffer readBuffer,
final List<Processor> processors,
final Pipeline pipeline) {
final Pipeline pipeline,
final PipelineRunner pipelineRunner) {
this.readBuffer = readBuffer;
this.processors = processors;
this.pipeline = pipeline;
this.pluginMetrics = PluginMetrics.fromNames("ProcessWorker", pipeline.getName());
this.invalidEventHandlesCounter = pluginMetrics.counter(INVALID_EVENT_HANDLES);
this.acknowledgementsEnabled = pipeline.getSource().areAcknowledgementsEnabled() || readBuffer.areAcknowledgementsEnabled();
this.pipelineRunner = pipelineRunner;
}

@Override
Expand Down Expand Up @@ -93,80 +80,18 @@ public void run() {
}
}

private void processAcknowledgements(List<Event> inputEvents, Collection<Record<Event>> outputRecords) {
Set<Event> outputEventsSet = outputRecords.stream().map(Record::getData).collect(Collectors.toSet());
// For each event in the input events list that is not present in the output events, send positive acknowledgement, if acknowledgements are enabled for it
inputEvents.forEach(event -> {
EventHandle eventHandle = event.getEventHandle();
if (eventHandle != null && eventHandle instanceof DefaultEventHandle) {
InternalEventHandle internalEventHandle = (InternalEventHandle)(DefaultEventHandle)eventHandle;
if (internalEventHandle.getAcknowledgementSet() != null && !outputEventsSet.contains(event)) {
eventHandle.release(true);
}
} else if (eventHandle != null) {
invalidEventHandlesCounter.increment();
throw new RuntimeException("Unexpected EventHandle");
}
});
}

private void doRun() {
final Map.Entry<Collection, CheckpointState> readResult = readBuffer.read(pipeline.getReadBatchTimeoutInMillis());
Collection records = readResult.getKey();
final CheckpointState checkpointState = readResult.getValue();
//TODO Hacky way to avoid logging continuously - Will be removed as part of metrics implementation
if (records.isEmpty()) {
if(!isEmptyRecordsLogged) {
LOG.debug(" {} Worker: No records received from buffer", pipeline.getName());
isEmptyRecordsLogged = true;
}
} else {
LOG.debug(" {} Worker: Processing {} records from buffer", pipeline.getName(), records.size());
}
//Should Empty list from buffer should be sent to the processors? For now sending as the Stateful processors expects it.
for (final Processor processor : processors) {

List<Event> inputEvents = null;
if (acknowledgementsEnabled) {
inputEvents = ((List<Record<Event>>) records).stream().map(Record::getData).collect(Collectors.toList());
}

try {
records = processor.execute(records);
if (inputEvents != null) {
processAcknowledgements(inputEvents, records);
}
} catch (final Exception e) {
LOG.error("A processor threw an exception. This batch of Events will be dropped, and their EventHandles will be released: ", e);
if (inputEvents != null) {
processAcknowledgements(inputEvents, Collections.emptyList());
}

records = Collections.emptyList();
break;
}
try {
pipelineRunner.runProcessorsAndPublishToSinks(processors);
} catch (InvalidEventHandleException ex) {
invalidEventHandlesCounter.increment();
throw ex;
}

postToSink(records);
// Checkpoint the current batch read from the buffer after being processed by processors and sinks.
readBuffer.checkpoint(checkpointState);
}

/**
 * Returns whether this worker may shut down: the buffer has been drained and
 * every processor reports it is ready for shutdown.
 */
private boolean areComponentsReadyForShutdown() {
    // allMatch with a method reference replaces the redundant
    // map(...).allMatch(result -> result == true) chain.
    return readBuffer.isEmpty() && processors.stream()
            .allMatch(Processor::isReadyForShutdown);
}

/**
* TODO Add isolator pattern - Fail if one of the Sink fails [isolator Pattern]
* Uses the pipeline method to publish to sinks, waits for each of the sink result to be true before attempting to
* process more records from buffer.
*/
private boolean postToSink(final Collection<Record> records) {
LOG.debug("Pipeline Worker: Submitting {} processed records to sinks", records.size());
final List<Future<Void>> sinkFutures = pipeline.publishToSinks(records);
final FutureHelperResult<Void> futureResults = FutureHelper.awaitFuturesIndefinitely(sinkFutures);
return futureResults.getFailedReasons().size() == 0;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.opensearch.dataprepper.pipeline.buffer;

import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.pipeline.PipelineRunner;

/**
 * Base class for buffers that do not block on writes and instead drive the
 * pipeline's processors/sinks themselves via an injected {@link PipelineRunner}.
 *
 * <p>The runner is injected after pipeline construction (via
 * {@link #setPipelineRunner(PipelineRunner)}) because the buffer is created
 * before the {@code Pipeline} object exists.
 */
public abstract class AbstractNonBlockingBuffer<T extends Record<?>> implements Buffer<T> {
// Null until the pipeline builder injects it; subclasses should not use it before then.
private PipelineRunner pipelineRunner;

/** Returns the injected runner, or {@code null} if it has not been set yet. */
public PipelineRunner getPipelineRunner() {
return pipelineRunner;
}

/** Injects the runner used to drive processing for records written to this buffer. */
public void setPipelineRunner(PipelineRunner pipelineRunner) {
this.pipelineRunner = pipelineRunner;
}
}
Loading
Loading