diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/PipelineIf.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/PipelineIf.java new file mode 100644 index 0000000000..8c90be7913 --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/PipelineIf.java @@ -0,0 +1,13 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model; + +import org.opensearch.dataprepper.model.source.Source; + +public interface PipelineIf { + Source getSource(); +} + diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/failures/FailurePipeline.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/failures/FailurePipeline.java new file mode 100644 index 0000000000..b3f0af9e58 --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/failures/FailurePipeline.java @@ -0,0 +1,16 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.failures; + +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; + +import java.util.Collection; +public interface FailurePipeline { + void sendFailedEvents(Collection> events); +} + + diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java index 26dd7e98a6..6b3df197db 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java @@ -9,6 +9,8 @@ import io.micrometer.core.instrument.Timer; import org.opensearch.dataprepper.metrics.MetricNames; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.PipelineIf; +import org.opensearch.dataprepper.model.configuration.PipelineExtensions; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.event.Event; @@ -64,16 +66,16 @@ public void initialize() { * @param records the records to write to the sink. */ @Override - public void output(Collection records) { + public void output(Collection records, final PipelineIf failurePipeline) { recordsInCounter.increment(records.size()*1.0); - timeElapsedTimer.record(() -> doOutput(records)); + timeElapsedTimer.record(() -> doOutput(records, failurePipeline)); } /** * This method should implement the output logic * @param records Records to be output */ - public abstract void doOutput(Collection records); + public abstract void doOutput(Collection records, final PipelineIf failurePipeline); @Override public void shutdown() { diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/Sink.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/Sink.java index 178566ba5b..2c95e21d8b 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/Sink.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/Sink.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.model.sink; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.PipelineIf; import java.util.Collection; @@ -20,7 +21,11 @@ public interface Sink> { * * @param records the records to write to the sink. */ - void output(Collection records); + default void output(Collection records) { + output(records, null); + } + + void output(Collection records, PipelineIf failurePipeline); /** * Prepare sink for shutdown, by cleaning up resources and threads. diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkTest.java index 5f66a623aa..717c26a056 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkTest.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.model.sink; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.PipelineIf; import org.junit.jupiter.api.Test; import java.util.Collection; @@ -28,7 +29,7 @@ public void initialize() { } @Override - public void output(Collection> records) { + public void output(Collection> records, PipelineIf failurePipeline) { } }; diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySink.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySink.java index a98b1c56b8..6727711ea3 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySink.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySink.java @@ -8,6 +8,7 @@ import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.PipelineIf; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.record.Record; @@ -34,7 +35,7 @@ public InMemorySink(final InMemoryConfig inMemoryConfig, } @Override - public void output(final Collection> records) { + public void output(final Collection> records, final PipelineIf failurePipeline) { inMemorySinkAccessor.addEvents(testingKey, records); boolean result = inMemorySinkAccessor.getResult(); records.stream().forEach((record) -> { diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/DataPrepper.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/DataPrepper.java index c6a04bca29..a1c8359fff 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/DataPrepper.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/DataPrepper.java @@ -9,6 +9,7 @@ import org.opensearch.dataprepper.DataPrepperShutdownListener; import org.opensearch.dataprepper.DataPrepperShutdownOptions; import org.opensearch.dataprepper.core.parser.PipelineTransformer; +import org.opensearch.dataprepper.core.parser.model.DataPrepperConfiguration; import org.opensearch.dataprepper.core.peerforwarder.server.PeerForwarderServer; import org.opensearch.dataprepper.core.pipeline.Pipeline; import org.opensearch.dataprepper.core.pipeline.PipelineObserver; diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java index 77cf649535..c02b4c4c64 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java @@ -11,6 +11,7 @@ import org.opensearch.dataprepper.core.peerforwarder.PeerForwarderProvider; import org.opensearch.dataprepper.core.peerforwarder.PeerForwardingProcessorDecorator; import org.opensearch.dataprepper.core.pipeline.Pipeline; +import org.opensearch.dataprepper.core.pipeline.FailurePipelineSource; import org.opensearch.dataprepper.core.pipeline.PipelineConnector; import org.opensearch.dataprepper.core.pipeline.router.Router; import org.opensearch.dataprepper.core.pipeline.router.RouterFactory; @@ -59,6 +60,7 @@ public class PipelineTransformer { "See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax."; private static final String PIPELINE_TYPE = "pipeline"; private static final String ATTRIBUTE_NAME = "name"; + //private static final String FAILURE_PIPELINE_NAME = "dlq"; private final PipelinesDataFlowModel pipelinesDataFlowModel; private final RouterFactory routerFactory; private final DataPrepperConfiguration dataPrepperConfiguration; @@ -128,24 +130,30 @@ private void buildPipelineFromConfiguration( final Map pipelineMap) { final PipelineConfiguration pipelineConfiguration = pipelineConfigurationMap.get(pipelineName); LOG.info("Building pipeline [{}] from provided configuration", pipelineName); + final String failurePipelineName = dataPrepperConfiguration.getFailurePipelineName(); try { + Source source; final PluginSetting sourceSetting = pipelineConfiguration.getSourcePluginSetting(); - final Optional pipelineSource = getSourceIfPipelineType(pipelineName, sourceSetting, - pipelineMap, pipelineConfigurationMap); - final Source source = pipelineSource.orElseGet(() -> { - try { - return pluginFactory.loadPlugin(Source.class, sourceSetting); - } catch (Exception e) { - final PluginError pluginError = PluginError.builder() - .componentType(PipelineModel.SOURCE_PLUGIN_TYPE) - .pipelineName(pipelineName) - .pluginName(sourceSetting.getName()) - .exception(e) - .build(); - pluginErrorCollector.collectPluginError(pluginError); - return null; - } - }); + if (!pipelineName.equals(failurePipelineName)) { + final Optional pipelineSource = getSourceIfPipelineType(pipelineName, sourceSetting, + pipelineMap, pipelineConfigurationMap); + source = pipelineSource.orElseGet(() -> { + try { + return pluginFactory.loadPlugin(Source.class, sourceSetting); + } catch (Exception e) { + final PluginError pluginError = PluginError.builder() + .componentType(PipelineModel.SOURCE_PLUGIN_TYPE) + .pipelineName(pipelineName) + .pluginName(sourceSetting.getName()) + .exception(e) + .build(); + pluginErrorCollector.collectPluginError(pluginError); + return null; + } + }); + } else { + source = new FailurePipelineSource(); + } LOG.info("Building buffer for the pipeline [{}]", pipelineName); Buffer pipelineDefinedBuffer = null; @@ -234,7 +242,14 @@ private void buildPipelineFromConfiguration( "pipelines", pipelineName, ex); processRemoveIfRequired(pipelineName, pipelineConfigurationMap, pipelineMap); } - + final Pipeline failurePipeline = pipelineMap.get(failurePipelineName); + if (failurePipeline != null) { + for (Map.Entry pipelineEntry : pipelineMap.entrySet()) { + if (!(pipelineEntry.getKey().equals(failurePipelineName))) { + pipelineEntry.getValue().setFailurePipeline(failurePipeline); + } + } + } } private List> newProcessor(final PluginSetting pluginSetting) { diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/model/DataPrepperConfiguration.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/model/DataPrepperConfiguration.java index 9212a9943b..e7b72cf22b 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/model/DataPrepperConfiguration.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/model/DataPrepperConfiguration.java @@ -34,6 +34,7 @@ public class DataPrepperConfiguration implements ExtensionsConfiguration, EventConfigurationContainer { static final Duration DEFAULT_SHUTDOWN_DURATION = Duration.ofSeconds(30L); + static final String DEFAULT_FAILURE_PIPELINE_NAME = "dlq"; private static final String DEFAULT_SOURCE_COORDINATION_STORE = "in_memory"; static final int MAX_TAGS_NUMBER = 3; @@ -213,6 +214,8 @@ public void setMetricTagFilters(final List metricTagFilters) { } } + public String getFailurePipelineName() { return DEFAULT_FAILURE_PIPELINE_NAME; } + public Duration getProcessorShutdownTimeout() { return processorShutdownTimeout; } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/FailurePipelineSource.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/FailurePipelineSource.java new file mode 100644 index 0000000000..548e4da0cb --- /dev/null +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/FailurePipelineSource.java @@ -0,0 +1,48 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.core.pipeline; + +import org.opensearch.dataprepper.model.source.Source; +import org.opensearch.dataprepper.model.failures.FailurePipeline; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.concurrent.atomic.AtomicBoolean; + +public class FailurePipelineSource implements Source>, FailurePipeline { + private static final Logger LOG = LoggerFactory.getLogger(FailurePipelineSource.class); + private static final int DEFAULT_WRITE_TIMEOUT = Integer.MAX_VALUE; + private Buffer buffer; + private AtomicBoolean isStopRequested; + + public FailurePipelineSource() { + isStopRequested = new AtomicBoolean(false); + } + + @Override + public void start(Buffer buffer) { + this.buffer = buffer; + } + + @Override + public void stop() { + isStopRequested.set(true); + } + + @Override + public void sendFailedEvents(Collection> records) { + try { + buffer.writeAll(records, DEFAULT_WRITE_TIMEOUT); + } catch (Exception e) { + LOG.error("Failed to write to failure pipeline"); + } + } + +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/Pipeline.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/Pipeline.java index b5d3f812cf..b53f6ffd43 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/Pipeline.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/Pipeline.java @@ -16,6 +16,7 @@ import org.opensearch.dataprepper.core.pipeline.router.RouterGetRecordStrategy; import org.opensearch.dataprepper.core.sourcecoordination.SourceCoordinatorFactory; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.model.PipelineIf; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.EventFactory; import org.opensearch.dataprepper.model.processor.Processor; @@ -52,7 +53,7 @@ * {@link Processor} and outputs the transformed (or original) data to {@link Sink}. */ @SuppressWarnings({"rawtypes", "unchecked"}) -public class Pipeline { +public class Pipeline implements PipelineIf { private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class); private static final int SINK_LOGGING_FREQUENCY = (int) Duration.ofSeconds(60).toMillis(); private final PipelineShutdown pipelineShutdown; @@ -73,6 +74,7 @@ public class Pipeline { private final ExecutorService processorExecutorService; private final ExecutorService sinkExecutorService; private final EventFactory eventFactory; + private Pipeline failurePipeline; private final AcknowledgementSetManager acknowledgementSetManager; private final List observers = Collections.synchronizedList(new LinkedList<>()); @@ -119,6 +121,7 @@ public Pipeline( this.name = name; this.source = source; this.buffer = buffer; + this.failurePipeline = null; this.processorSets = processorSets; this.sinks = sinks; this.router = router; @@ -165,6 +168,14 @@ public Buffer getBuffer() { return this.buffer; } + public void setFailurePipeline(Pipeline failurePipeline) { + this.failurePipeline = failurePipeline; + } + + public Pipeline getFailurePipeline() { + return failurePipeline; + } + /** * @return {@link Sink} of this pipeline. */ @@ -360,7 +371,7 @@ List> publishToSinks(final Collection records) { router.route(records, sinks, getRecordStrategy, (sink, events) -> sinkFutures.add(sinkExecutorService.submit(() -> { sink.updateLatencyMetrics(events); - sink.output(events); + sink.output(events, failurePipeline); }, null)) ); return sinkFutures; diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineConnector.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineConnector.java index 08bcfab642..5981c6dfd3 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineConnector.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/PipelineConnector.java @@ -9,6 +9,7 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.Sink; import org.opensearch.dataprepper.model.source.Source; +import org.opensearch.dataprepper.model.PipelineIf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,7 +74,7 @@ public boolean isReady() { } @Override - public void output(final Collection records) { + public void output(final Collection records, final PipelineIf failurePipeline) { if (buffer != null && !isStopRequested.get()) { for (T record : records) { while (true) { diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/ProcessWorker.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/ProcessWorker.java index e313430b49..c9cedf29ef 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/ProcessWorker.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/ProcessWorker.java @@ -145,6 +145,9 @@ private void doRun() { if (inputEvents != null) { processAcknowledgements(inputEvents, Collections.emptyList()); } + if (pipeline.getFailurePipeline() != null) { + ((FailurePipelineSource)(pipeline.getFailurePipeline().getSource())).sendFailedEvents(records); + } records = Collections.emptyList(); break; diff --git a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugins/test/TestSink.java b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugins/test/TestSink.java index 1e2742a0ba..b215432f34 100644 --- a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugins/test/TestSink.java +++ b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugins/test/TestSink.java @@ -7,6 +7,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.PipelineIf; import org.opensearch.dataprepper.model.sink.Sink; import java.time.Duration; @@ -44,7 +45,7 @@ public TestSink(boolean failSinkForTest) { } @Override - public void output(Collection> records) { + public void output(Collection> records, PipelineIf failurePipeline) { if(failSinkForTest) { throw new RuntimeException("Sink is expected to fail"); } diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSink.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSink.java index d981cf67ca..8da37d892c 100644 --- a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSink.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSink.java @@ -18,6 +18,7 @@ import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.PipelineIf; import org.opensearch.dataprepper.model.sink.AbstractSink; import org.opensearch.dataprepper.model.sink.OutputCodecContext; import org.opensearch.dataprepper.model.sink.Sink; @@ -145,7 +146,7 @@ private void doInitializeInternal() { * @param records Records to be output */ @Override - public void doOutput(final Collection> records) { + public void doOutput(final Collection> records, final PipelineIf failurePipeline) { if (records.isEmpty()) { return; } diff --git a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/FileSink.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/FileSink.java index ae9f52bbf9..98c12aae8d 100644 --- a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/FileSink.java +++ b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/FileSink.java @@ -8,6 +8,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.PipelineIf; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.record.Record; @@ -64,7 +65,7 @@ public FileSink(final FileSinkConfig fileSinkConfig, final SinkContext sinkConte } @Override - public void output(final Collection> records) { + public void output(final Collection> records, final PipelineIf failurePipeline) { lock.lock(); try { if (isStopRequested) diff --git a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/NoopSink.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/NoopSink.java index 0ce0194b31..3e7c638d9a 100644 --- a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/NoopSink.java +++ b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/NoopSink.java @@ -1,6 +1,7 @@ package org.opensearch.dataprepper.plugins.sink; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.PipelineIf; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.Sink; @@ -14,7 +15,7 @@ public class NoopSink implements Sink> { private static final Logger LOG = LoggerFactory.getLogger(NoopSink.class); @Override - public void output(Collection> records) { + public void output(Collection> records, final PipelineIf failurePipeline) { LOG.info("Releasing events for NOOP sink"); for (Record record : records) { Event event = (Event)record.getData(); diff --git a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/StdOutSink.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/StdOutSink.java index 0396af3176..1590a627ae 100644 --- a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/StdOutSink.java +++ b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/StdOutSink.java @@ -8,6 +8,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.PipelineIf; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.record.Record; @@ -43,7 +44,7 @@ public StdOutSink() { this.tagsTargetKey = null; } @Override - public void output(final Collection> records) { + public void output(final Collection> records, final PipelineIf failurePipeline) { for (final Record record : records) { checkTypeAndPrintObject(record.getData()); } diff --git a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/http/service/HttpSinkService.java b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/http/service/HttpSinkService.java index c392bf4ae0..69e7dfb75d 100644 --- a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/http/service/HttpSinkService.java +++ b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/http/service/HttpSinkService.java @@ -16,6 +16,7 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.codec.OutputCodec; import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.PipelineIf; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.record.Record; @@ -162,7 +163,7 @@ public HttpSinkService(final HttpSinkConfiguration httpSinkConfiguration, * This method process buffer records and send to Http End points based on configured codec * @param records Collection of Event */ - public void output(Collection> records) { + public void output(Collection> records, final PipelineIf failurePipeline) { reentrantLock.lock(); if (currentBuffer == null) { this.currentBuffer = bufferFactory.getBuffer(); diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java index 97c93d22de..c3e68fd34c 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java @@ -11,6 +11,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.PipelineIf; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.plugin.PluginFactory; @@ -115,7 +116,7 @@ private void doInitializeInternal() { } @Override - public void doOutput(Collection> records) { + public void doOutput(Collection> records, final PipelineIf failurePipeline) { reentrantLock.lock(); if (records.isEmpty()) { return; diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index 2248ba669a..cc5a43041c 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -26,6 +26,7 @@ import org.opensearch.dataprepper.expression.ExpressionEvaluationException; import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.MetricNames; +import org.opensearch.dataprepper.model.PipelineIf; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.configuration.PluginModel; @@ -380,7 +381,7 @@ private BulkOperation getBulkOperationForAction(final String action, } @Override - public void doOutput(final Collection> records) { + public void doOutput(final Collection> records, final PipelineIf failurePipelineObj) { final long threadId = Thread.currentThread().getId(); if (!bulkRequestMap.containsKey(threadId)) { bulkRequestMap.put(threadId, bulkRequestSupplier.get()); diff --git a/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSink.java b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSink.java index a93e58875c..e667f5d161 100644 --- a/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSink.java +++ b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSink.java @@ -9,6 +9,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.PipelineIf; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; @@ -74,7 +75,7 @@ public void doInitialize() { * @param records Records to be output */ @Override - public void doOutput(final Collection> records) { - personalizeSinkService.output(records); + public void doOutput(final Collection> records, final PipelineIf failurePipeline) { + personalizeSinkService.output(records, failurePipeline); } } \ No newline at end of file diff --git a/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSinkService.java b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSinkService.java index 80ea94bcf1..96279ca7e7 100644 --- a/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSinkService.java +++ b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSinkService.java @@ -8,6 +8,7 @@ import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Timer; import org.opensearch.dataprepper.plugins.sink.personalize.configuration.PersonalizeSinkConfiguration; +import org.opensearch.dataprepper.model.PipelineIf; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; @@ -61,8 +62,8 @@ public PersonalizeSinkService(final PersonalizeSinkConfiguration personalizeSink /** * @param records received records and add into buffer. */ - void output(Collection> records) { + void output(Collection> records, final PipelineIf failurePipeline) { LOG.trace("{} records received", records.size()); return; } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/prometheus-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/prometheus/service/PrometheusSinkService.java b/data-prepper-plugins/prometheus-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/prometheus/service/PrometheusSinkService.java index 10251041a3..09faac0a14 100644 --- a/data-prepper-plugins/prometheus-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/prometheus/service/PrometheusSinkService.java +++ b/data-prepper-plugins/prometheus-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/prometheus/service/PrometheusSinkService.java @@ -20,6 +20,7 @@ import org.apache.hc.core5.util.Timeout; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.PipelineIf; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.metric.JacksonExponentialHistogram; @@ -149,7 +150,7 @@ public PrometheusSinkService(final PrometheusSinkConfiguration prometheusSinkCon * This method process buffer records and send to Http End points based on configured codec * @param records Collection of Event */ - public void output(final Collection> records) { + public void output(final Collection> records, final PipelineIf failurePipeline) { reentrantLock.lock(); try { records.forEach(record -> { @@ -451,4 +452,4 @@ private static void setMetricName(final String metricName, final List> records) { - s3SinkService.output(records); + public void doOutput(final Collection> records, final PipelineIf failurePipeline) { + s3SinkService.output(records, failurePipeline); } } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java index 571a952f01..53d263e0f7 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java @@ -11,6 +11,7 @@ import org.opensearch.dataprepper.model.codec.OutputCodec; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.PipelineIf; import org.opensearch.dataprepper.model.sink.OutputCodecContext; import org.opensearch.dataprepper.model.types.ByteCount; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.Buffer; @@ -100,7 +101,7 @@ public S3SinkService(final S3SinkConfig s3SinkConfig, /** * @param records received records and add into buffer. */ - void output(Collection> records) { + void output(Collection> records, final PipelineIf failurePipeline) { // Don't acquire the lock if there's no work to be done if (records.isEmpty() && s3GroupManager.hasNoGroups()) { return; diff --git a/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkService.java b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkService.java index da29d71e17..d7ebb40568 100644 --- a/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkService.java +++ b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkService.java @@ -14,6 +14,7 @@ import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.PipelineIf; import org.opensearch.dataprepper.plugins.sink.sns.dlq.DlqPushHandler; import org.opensearch.dataprepper.plugins.sink.sns.dlq.SnsSinkFailedDlqData; import org.slf4j.Logger; @@ -115,7 +116,7 @@ public SnsSinkService(final SnsSinkConfig snsSinkConfig, /** * @param records received records and add into buffer. */ - void output(Collection> records) { + void output(Collection> records, final PipelineIf failurePipeline) { reentrantLock.lock(); try { for (Record record : records) {