Skip to content

Commit

Permalink
Initial implementation of Pipeline DLQ
Browse files Browse the repository at this point in the history
Signed-off-by: Kondaka <[email protected]>
  • Loading branch information
kkondaka committed Dec 20, 2024
1 parent 76b06ef commit fd78dc1
Show file tree
Hide file tree
Showing 27 changed files with 178 additions and 44 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model;

import org.opensearch.dataprepper.model.source.Source;

/**
 * Minimal view of a pipeline exposed to components (such as sinks) that must
 * interact with another pipeline without depending on the core {@code Pipeline}
 * implementation class.
 */
public interface PipelineIf {
    /**
     * @return the {@link Source} that feeds this pipeline
     */
    Source getSource();
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.failures;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;

import java.util.Collection;
/**
 * Contract for a failure (dead-letter) pipeline entry point: a component that
 * accepts records which some other pipeline failed to process.
 */
public interface FailurePipeline {
    /**
     * Hands the given failed records over to the failure pipeline.
     *
     * @param events the records that could not be processed by the originating pipeline
     */
    void sendFailedEvents(Collection<Record<Event>> events);
}


Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import io.micrometer.core.instrument.Timer;
import org.opensearch.dataprepper.metrics.MetricNames;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.PipelineIf;
import org.opensearch.dataprepper.model.configuration.PipelineExtensions;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;
Expand Down Expand Up @@ -64,16 +66,16 @@ public void initialize() {
* @param records the records to write to the sink.
*/
@Override
public void output(Collection<T> records) {
public void output(Collection<T> records, final PipelineIf failurePipeline) {
recordsInCounter.increment(records.size()*1.0);
timeElapsedTimer.record(() -> doOutput(records));
timeElapsedTimer.record(() -> doOutput(records, failurePipeline));
}

/**
* This method should implement the output logic
* @param records Records to be output
*/
public abstract void doOutput(Collection<T> records);
public abstract void doOutput(Collection<T> records, final PipelineIf failurePipeline);

@Override
public void shutdown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.model.sink;

import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.PipelineIf;

import java.util.Collection;

Expand All @@ -20,7 +21,11 @@ public interface Sink<T extends Record<?>> {
*
* @param records the records to write to the sink.
*/
void output(Collection<T> records);
    /**
     * Writes the records to the sink with no failure pipeline attached.
     * Convenience overload that delegates to
     * {@link #output(Collection, PipelineIf)} with a {@code null} failure pipeline.
     *
     * @param records the records to write to the sink
     */
    default void output(Collection<T> records) {
        output(records, null);
    }

    /**
     * Writes the records to the sink.
     *
     * @param records the records to write to the sink
     * @param failurePipeline the pipeline to which unprocessable records may be
     *                        routed; may be {@code null} when no failure pipeline
     *                        is configured
     */
    void output(Collection<T> records, PipelineIf failurePipeline);

/**
* Prepare sink for shutdown, by cleaning up resources and threads.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.model.sink;

import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.PipelineIf;
import org.junit.jupiter.api.Test;

import java.util.Collection;
Expand All @@ -28,7 +29,7 @@ public void initialize() {
}

@Override
public void output(Collection<Record<?>> records) {
public void output(Collection<Record<?>> records, PipelineIf failurePipeline) {
}

};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.PipelineIf;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.record.Record;
Expand All @@ -34,7 +35,7 @@ public InMemorySink(final InMemoryConfig inMemoryConfig,
}

@Override
public void output(final Collection<Record<Event>> records) {
public void output(final Collection<Record<Event>> records, final PipelineIf failurePipeline) {
inMemorySinkAccessor.addEvents(testingKey, records);
boolean result = inMemorySinkAccessor.getResult();
records.stream().forEach((record) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.opensearch.dataprepper.DataPrepperShutdownListener;
import org.opensearch.dataprepper.DataPrepperShutdownOptions;
import org.opensearch.dataprepper.core.parser.PipelineTransformer;
import org.opensearch.dataprepper.core.parser.model.DataPrepperConfiguration;
import org.opensearch.dataprepper.core.peerforwarder.server.PeerForwarderServer;
import org.opensearch.dataprepper.core.pipeline.Pipeline;
import org.opensearch.dataprepper.core.pipeline.PipelineObserver;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.dataprepper.core.peerforwarder.PeerForwarderProvider;
import org.opensearch.dataprepper.core.peerforwarder.PeerForwardingProcessorDecorator;
import org.opensearch.dataprepper.core.pipeline.Pipeline;
import org.opensearch.dataprepper.core.pipeline.FailurePipelineSource;
import org.opensearch.dataprepper.core.pipeline.PipelineConnector;
import org.opensearch.dataprepper.core.pipeline.router.Router;
import org.opensearch.dataprepper.core.pipeline.router.RouterFactory;
Expand Down Expand Up @@ -59,6 +60,7 @@ public class PipelineTransformer {
"See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax.";
private static final String PIPELINE_TYPE = "pipeline";
private static final String ATTRIBUTE_NAME = "name";
//private static final String FAILURE_PIPELINE_NAME = "dlq";
private final PipelinesDataFlowModel pipelinesDataFlowModel;
private final RouterFactory routerFactory;
private final DataPrepperConfiguration dataPrepperConfiguration;
Expand Down Expand Up @@ -128,24 +130,30 @@ private void buildPipelineFromConfiguration(
final Map<String, Pipeline> pipelineMap) {
final PipelineConfiguration pipelineConfiguration = pipelineConfigurationMap.get(pipelineName);
LOG.info("Building pipeline [{}] from provided configuration", pipelineName);
final String failurePipelineName = dataPrepperConfiguration.getFailurePipelineName();
try {
Source source;
final PluginSetting sourceSetting = pipelineConfiguration.getSourcePluginSetting();
final Optional<Source> pipelineSource = getSourceIfPipelineType(pipelineName, sourceSetting,
pipelineMap, pipelineConfigurationMap);
final Source source = pipelineSource.orElseGet(() -> {
try {
return pluginFactory.loadPlugin(Source.class, sourceSetting);
} catch (Exception e) {
final PluginError pluginError = PluginError.builder()
.componentType(PipelineModel.SOURCE_PLUGIN_TYPE)
.pipelineName(pipelineName)
.pluginName(sourceSetting.getName())
.exception(e)
.build();
pluginErrorCollector.collectPluginError(pluginError);
return null;
}
});
if (!pipelineName.equals(failurePipelineName)) {
final Optional<Source> pipelineSource = getSourceIfPipelineType(pipelineName, sourceSetting,
pipelineMap, pipelineConfigurationMap);
source = pipelineSource.orElseGet(() -> {
try {
return pluginFactory.loadPlugin(Source.class, sourceSetting);
} catch (Exception e) {
final PluginError pluginError = PluginError.builder()
.componentType(PipelineModel.SOURCE_PLUGIN_TYPE)
.pipelineName(pipelineName)
.pluginName(sourceSetting.getName())
.exception(e)
.build();
pluginErrorCollector.collectPluginError(pluginError);
return null;
}
});
} else {
source = new FailurePipelineSource();
}

LOG.info("Building buffer for the pipeline [{}]", pipelineName);
Buffer pipelineDefinedBuffer = null;
Expand Down Expand Up @@ -234,7 +242,14 @@ private void buildPipelineFromConfiguration(
"pipelines", pipelineName, ex);
processRemoveIfRequired(pipelineName, pipelineConfigurationMap, pipelineMap);
}

final Pipeline failurePipeline = pipelineMap.get(failurePipelineName);
if (failurePipeline != null) {
for (Map.Entry<String, Pipeline> pipelineEntry : pipelineMap.entrySet()) {
if (!(pipelineEntry.getKey().equals(failurePipelineName))) {
pipelineEntry.getValue().setFailurePipeline(failurePipeline);
}
}
}
}

private List<IdentifiedComponent<Processor>> newProcessor(final PluginSetting pluginSetting) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
public class DataPrepperConfiguration implements ExtensionsConfiguration, EventConfigurationContainer {
static final Duration DEFAULT_SHUTDOWN_DURATION = Duration.ofSeconds(30L);

static final String DEFAULT_FAILURE_PIPELINE_NAME = "dlq";
private static final String DEFAULT_SOURCE_COORDINATION_STORE = "in_memory";

static final int MAX_TAGS_NUMBER = 3;
Expand Down Expand Up @@ -213,6 +214,8 @@ public void setMetricTagFilters(final List<MetricTagFilter> metricTagFilters) {
}
}

public String getFailurePipelineName() { return DEFAULT_FAILURE_PIPELINE_NAME; }

public Duration getProcessorShutdownTimeout() {
return processorShutdownTimeout;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.core.pipeline;

import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.model.failures.FailurePipeline;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Synthetic {@link Source} that serves as the entry point of the failure
 * ("dlq") pipeline. It produces no data on its own; instead, other pipelines
 * push their failed records into this source's buffer via
 * {@link #sendFailedEvents(Collection)}.
 *
 * <p>Thread-safety: {@code start} is invoked by the pipeline's execution
 * machinery while {@code sendFailedEvents} may be called concurrently from
 * sink worker threads, so {@code buffer} is {@code volatile} to make its
 * publication visible across threads.
 */
public class FailurePipelineSource implements Source<Record<Event>>, FailurePipeline {
    private static final Logger LOG = LoggerFactory.getLogger(FailurePipelineSource.class);

    // writeAll takes a timeout in milliseconds; MAX_VALUE means "effectively block
    // until the buffer accepts the records".
    private static final int DEFAULT_WRITE_TIMEOUT = Integer.MAX_VALUE;

    // Assigned once by start(); volatile so sink threads observe the assignment.
    private volatile Buffer<Record<Event>> buffer;
    private final AtomicBoolean isStopRequested;

    public FailurePipelineSource() {
        this.isStopRequested = new AtomicBoolean(false);
    }

    @Override
    public void start(final Buffer<Record<Event>> buffer) {
        this.buffer = buffer;
    }

    @Override
    public void stop() {
        isStopRequested.set(true);
    }

    /**
     * Writes the failed records into the failure pipeline's buffer.
     * Best-effort: any failure is logged and never rethrown to the caller,
     * since the caller is itself handling a failure.
     *
     * @param records the failed records to route into the failure pipeline
     */
    @Override
    public void sendFailedEvents(final Collection<Record<Event>> records) {
        final Buffer<Record<Event>> localBuffer = buffer;
        if (localBuffer == null) {
            // start() has not run yet; previously this surfaced as an NPE that was
            // swallowed by the catch below. Log explicitly instead.
            LOG.error("Dropping failed records: failure pipeline source has not been started");
            return;
        }
        try {
            localBuffer.writeAll(records, DEFAULT_WRITE_TIMEOUT);
        } catch (final Exception e) {
            // Include the cause; the original log line discarded the exception entirely.
            LOG.error("Failed to write {} records to failure pipeline", records.size(), e);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.dataprepper.core.pipeline.router.RouterGetRecordStrategy;
import org.opensearch.dataprepper.core.sourcecoordination.SourceCoordinatorFactory;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.PipelineIf;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.EventFactory;
import org.opensearch.dataprepper.model.processor.Processor;
Expand Down Expand Up @@ -52,7 +53,7 @@
* {@link Processor} and outputs the transformed (or original) data to {@link Sink}.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public class Pipeline {
public class Pipeline implements PipelineIf {
private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class);
private static final int SINK_LOGGING_FREQUENCY = (int) Duration.ofSeconds(60).toMillis();
private final PipelineShutdown pipelineShutdown;
Expand All @@ -73,6 +74,7 @@ public class Pipeline {
private final ExecutorService processorExecutorService;
private final ExecutorService sinkExecutorService;
private final EventFactory eventFactory;
private Pipeline failurePipeline;
private final AcknowledgementSetManager acknowledgementSetManager;
private final List<PipelineObserver> observers = Collections.synchronizedList(new LinkedList<>());

Expand Down Expand Up @@ -119,6 +121,7 @@ public Pipeline(
this.name = name;
this.source = source;
this.buffer = buffer;
this.failurePipeline = null;
this.processorSets = processorSets;
this.sinks = sinks;
this.router = router;
Expand Down Expand Up @@ -165,6 +168,14 @@ public Buffer getBuffer() {
return this.buffer;
}

    /**
     * Attaches the failure (dead-letter) pipeline that receives records this
     * pipeline fails to process.
     *
     * @param failurePipeline the failure pipeline; set after construction by the
     *                        pipeline transformer once all pipelines are built
     */
    public void setFailurePipeline(Pipeline failurePipeline) {
        this.failurePipeline = failurePipeline;
    }

    /**
     * @return the attached failure pipeline, or {@code null} when none is configured
     */
    public Pipeline getFailurePipeline() {
        return failurePipeline;
    }

/**
* @return {@link Sink} of this pipeline.
*/
Expand Down Expand Up @@ -360,7 +371,7 @@ List<Future<Void>> publishToSinks(final Collection<Record> records) {
router.route(records, sinks, getRecordStrategy, (sink, events) ->
sinkFutures.add(sinkExecutorService.submit(() -> {
sink.updateLatencyMetrics(events);
sink.output(events);
sink.output(events, failurePipeline);
}, null))
);
return sinkFutures;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.Sink;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.model.PipelineIf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -73,7 +74,7 @@ public boolean isReady() {
}

@Override
public void output(final Collection<T> records) {
public void output(final Collection<T> records, final PipelineIf failurePipeline) {
if (buffer != null && !isStopRequested.get()) {
for (T record : records) {
while (true) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ private void doRun() {
if (inputEvents != null) {
processAcknowledgements(inputEvents, Collections.emptyList());
}
if (pipeline.getFailurePipeline() != null) {
((FailurePipelineSource)(pipeline.getFailurePipeline().getSource())).sendFailedEvents(records);
}

records = Collections.emptyList();
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.PipelineIf;
import org.opensearch.dataprepper.model.sink.Sink;

import java.time.Duration;
Expand Down Expand Up @@ -44,7 +45,7 @@ public TestSink(boolean failSinkForTest) {
}

@Override
public void output(Collection<Record<String>> records) {
public void output(Collection<Record<String>> records, PipelineIf failurePipeline) {
if(failSinkForTest) {
throw new RuntimeException("Sink is expected to fail");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.PipelineIf;
import org.opensearch.dataprepper.model.sink.AbstractSink;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;
import org.opensearch.dataprepper.model.sink.Sink;
Expand Down Expand Up @@ -145,7 +146,7 @@ private void doInitializeInternal() {
* @param records Records to be output
*/
@Override
public void doOutput(final Collection<Record<Event>> records) {
public void doOutput(final Collection<Record<Event>> records, final PipelineIf failurePipeline) {
if (records.isEmpty()) {
return;
}
Expand Down
Loading

0 comments on commit fd78dc1

Please sign in to comment.