diff --git a/data-prepper-plugins/decompress-processor/build.gradle b/data-prepper-plugins/decompress-processor/build.gradle new file mode 100644 index 0000000000..9d67cffc3b --- /dev/null +++ b/data-prepper-plugins/decompress-processor/build.gradle @@ -0,0 +1,13 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +dependencies { + implementation 'commons-io:commons-io:2.15.1' + implementation project(':data-prepper-api') + implementation project(':data-prepper-plugins:common') + implementation 'com.fasterxml.jackson.core:jackson-databind' + implementation 'io.micrometer:micrometer-core' + testImplementation testLibs.mockito.inline +} \ No newline at end of file diff --git a/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessor.java b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessor.java new file mode 100644 index 0000000000..61e7b7e812 --- /dev/null +++ b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessor.java @@ -0,0 +1,114 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.decompress; + +import com.google.common.base.Charsets; +import io.micrometer.core.instrument.Counter; +import org.apache.commons.io.IOUtils; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; +import org.opensearch.dataprepper.model.processor.AbstractProcessor; +import org.opensearch.dataprepper.model.processor.Processor; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.processor.decompress.exceptions.DecodingException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.Collection; + +@DataPrepperPlugin(name = "decompress", pluginType = Processor.class, pluginConfigurationType = DecompressProcessorConfig.class) +public class DecompressProcessor extends AbstractProcessor, Record> { + + private static final Logger LOG = LoggerFactory.getLogger(DecompressProcessor.class); + static final String DECOMPRESSION_PROCESSING_ERRORS = "processingErrors"; + + private final DecompressProcessorConfig decompressProcessorConfig; + private final ExpressionEvaluator expressionEvaluator; + + private final Counter decompressionProcessingErrors; + + @DataPrepperPluginConstructor + public DecompressProcessor(final PluginMetrics pluginMetrics, + final DecompressProcessorConfig decompressProcessorConfig, + final ExpressionEvaluator expressionEvaluator) { + super(pluginMetrics); + this.decompressProcessorConfig = decompressProcessorConfig; + this.expressionEvaluator = expressionEvaluator; + this.decompressionProcessingErrors = pluginMetrics.counter(DECOMPRESSION_PROCESSING_ERRORS); + + if (decompressProcessorConfig.getDecompressWhen() != null + && !expressionEvaluator.isValidExpressionStatement(decompressProcessorConfig.getDecompressWhen())) { + throw new InvalidPluginConfigurationException( + String.format("decompress_when value of %s is not a valid expression statement. " + + "See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax.", decompressProcessorConfig.getDecompressWhen())); + } + } + + @Override + public Collection> doExecute(final Collection> records) { + for (final Record record : records) { + + try { + if (decompressProcessorConfig.getDecompressWhen() != null && !expressionEvaluator.evaluateConditional(decompressProcessorConfig.getDecompressWhen(), record.getData())) { + continue; + } + + for (final String key : decompressProcessorConfig.getKeys()) { + + final String compressedValue = record.getData().get(key, String.class); + + if (compressedValue == null) { + continue; + } + + final byte[] compressedValueAsBytes = decompressProcessorConfig.getEncodingType().getDecoderEngine().decode(compressedValue); + + try (final InputStream inputStream = decompressProcessorConfig.getDecompressionType().getDecompressionEngine().createInputStream(new ByteArrayInputStream(compressedValueAsBytes));){ + + final String decompressedString = IOUtils.toString(inputStream, Charsets.UTF_8); + record.getData().put(key, decompressedString); + } catch (final Exception e) { + LOG.error("Unable to decompress key {} using decompression type {}:", + key, decompressProcessorConfig.getDecompressionType(), e); + record.getData().getMetadata().addTags(decompressProcessorConfig.getTagsOnFailure()); + decompressionProcessingErrors.increment(); + } + } + } catch (final DecodingException e) { + LOG.error("Unable to decode key with base64: {}", e.getMessage()); + record.getData().getMetadata().addTags(decompressProcessorConfig.getTagsOnFailure()); + decompressionProcessingErrors.increment(); + } catch (final Exception e) { + LOG.error("An uncaught exception occurred while decompressing Events", e); + record.getData().getMetadata().addTags(decompressProcessorConfig.getTagsOnFailure()); + decompressionProcessingErrors.increment(); + } + } + + return records; + } + + @Override + public void prepareForShutdown() { + + } + + @Override + public boolean isReadyForShutdown() { + return true; + } + + @Override + public void shutdown() { + + } +} diff --git a/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessorConfig.java b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessorConfig.java new file mode 100644 index 0000000000..ce2d985277 --- /dev/null +++ b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessorConfig.java @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.decompress; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; +import org.opensearch.dataprepper.plugins.processor.decompress.encoding.EncodingType; +import org.opensearch.dataprepper.plugins.processor.decompress.encoding.DecoderEngineFactory; + +import java.util.List; + +public class DecompressProcessorConfig { + + @JsonProperty("keys") + @NotEmpty + private List keys; + + @JsonProperty("type") + @NotNull + private DecompressionType decompressionType; + + @JsonProperty("decompress_when") + private String decompressWhen; + + @JsonProperty("tags_on_failure") + private List tagsOnFailure = List.of("_decompression_failure"); + + @JsonIgnore + private final EncodingType encodingType = EncodingType.BASE64; + + public List getKeys() { + return keys; + } + + public DecompressionEngineFactory getDecompressionType() { + return decompressionType; + } + + public DecoderEngineFactory getEncodingType() { return encodingType; } + + public String getDecompressWhen() { + return decompressWhen; + } + + public List getTagsOnFailure() { + return tagsOnFailure; + } +} diff --git a/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressionEngineFactory.java b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressionEngineFactory.java new file mode 100644 index 0000000000..581b3a1970 --- /dev/null +++ b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressionEngineFactory.java @@ -0,0 +1,12 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.decompress; + +import org.opensearch.dataprepper.model.codec.DecompressionEngine; + +public interface DecompressionEngineFactory { + public DecompressionEngine getDecompressionEngine(); +} diff --git a/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressionType.java b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressionType.java new file mode 100644 index 0000000000..88f64a52e1 --- /dev/null +++ b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressionType.java @@ -0,0 +1,44 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.decompress; + +import com.fasterxml.jackson.annotation.JsonCreator; +import org.opensearch.dataprepper.model.codec.DecompressionEngine; +import org.opensearch.dataprepper.plugins.codec.GZipDecompressionEngine; + +import java.util.Arrays; +import java.util.Map; +import java.util.stream.Collectors; + +public enum DecompressionType implements DecompressionEngineFactory { + GZIP("gzip"); + + private final String option; + + private static final Map OPTIONS_MAP = Arrays.stream(DecompressionType.values()) + .collect(Collectors.toMap( + value -> value.option, + value -> value + )); + + private static final Map DECOMPRESSION_ENGINE_MAP = Map.of( + "gzip", new GZipDecompressionEngine() + ); + + DecompressionType(final String option) { + this.option = option; + } + + @JsonCreator + static DecompressionType fromOptionValue(final String option) { + return OPTIONS_MAP.get(option); + } + + @Override + public DecompressionEngine getDecompressionEngine() { + return DECOMPRESSION_ENGINE_MAP.get(this.option); + } +} diff --git a/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/Base64DecoderEngine.java b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/Base64DecoderEngine.java new file mode 100644 index 0000000000..a6d59d84ed --- /dev/null +++ b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/Base64DecoderEngine.java @@ -0,0 +1,21 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.decompress.encoding; + +import org.opensearch.dataprepper.plugins.processor.decompress.exceptions.DecodingException; + +import java.util.Base64; + +public class Base64DecoderEngine implements DecoderEngine { + @Override + public byte[] decode(final String encodedValue) { + try { + return Base64.getDecoder().decode(encodedValue); + } catch (final Exception e) { + throw new DecodingException(String.format("There was an error decoding with the base64 encoding type: %s", e.getMessage())); + } + } +} diff --git a/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/DecoderEngine.java b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/DecoderEngine.java new file mode 100644 index 0000000000..ef443c273f --- /dev/null +++ b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/DecoderEngine.java @@ -0,0 +1,12 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.decompress.encoding; + +import org.opensearch.dataprepper.plugins.processor.decompress.exceptions.DecodingException; + +public interface DecoderEngine { + byte[] decode(final String encodedValue) throws DecodingException; +} diff --git a/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/DecoderEngineFactory.java b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/DecoderEngineFactory.java new file mode 100644 index 0000000000..89d8a6334c --- /dev/null +++ b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/DecoderEngineFactory.java @@ -0,0 +1,10 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.decompress.encoding; + +public interface DecoderEngineFactory { + DecoderEngine getDecoderEngine(); +} diff --git a/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/EncodingType.java b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/EncodingType.java new file mode 100644 index 0000000000..1e88412682 --- /dev/null +++ b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/EncodingType.java @@ -0,0 +1,42 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.decompress.encoding; + +import com.fasterxml.jackson.annotation.JsonCreator; + +import java.util.Arrays; +import java.util.Map; +import java.util.stream.Collectors; + +public enum EncodingType implements DecoderEngineFactory { + BASE64("base64"); + + private final String option; + + private static final Map OPTIONS_MAP = Arrays.stream(EncodingType.values()) + .collect(Collectors.toMap( + value -> value.option, + value -> value + )); + + private static final Map DECODER_ENGINE_MAP = Map.of( + "base64", new Base64DecoderEngine() + ); + + EncodingType(final String option) { + this.option = option; + } + + @JsonCreator + static EncodingType fromOptionValue(final String option) { + return OPTIONS_MAP.get(option); + } + + @Override + public DecoderEngine getDecoderEngine() { + return DECODER_ENGINE_MAP.get(this.option); + } +} diff --git a/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/exceptions/DecodingException.java b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/exceptions/DecodingException.java new file mode 100644 index 0000000000..f14e67d92c --- /dev/null +++ b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/exceptions/DecodingException.java @@ -0,0 +1,12 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.decompress.exceptions; + +public class DecodingException extends RuntimeException { + public DecodingException(final String message) { + super(message); + } +} diff --git a/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessorIT.java b/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessorIT.java new file mode 100644 index 0000000000..f0b6a7da42 --- /dev/null +++ b/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessorIT.java @@ -0,0 +1,72 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.decompress; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.processor.decompress.encoding.EncodingType; + +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.processor.decompress.DecompressProcessorTest.buildRecordWithEvent; + +@ExtendWith(MockitoExtension.class) +public class DecompressProcessorIT { + + private List keys; + + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private ExpressionEvaluator expressionEvaluator; + + @Mock + private DecompressProcessorConfig decompressProcessorConfig; + + private DecompressProcessor createObjectUnderTest() { + return new DecompressProcessor(pluginMetrics, decompressProcessorConfig, expressionEvaluator); + } + + @BeforeEach + void setup() { + keys = List.of(UUID.randomUUID().toString()); + when(decompressProcessorConfig.getKeys()).thenReturn(keys); + } + + @ParameterizedTest + @CsvSource({"H4sIAAAAAAAAAPNIzcnJVyjPL8pJAQBSntaLCwAAAA==,Hello world", + "H4sIAAAAAAAAAwvJyCxWAKJEhYKcxMy8ktSKEoXikqLMvHQAkJ3GfRoAAAA=,This is a plaintext string"}) + void base64_encoded_gzip_is_decompressed_successfully(final String compressedValue, final String expectedDecompressedValue) { + when(decompressProcessorConfig.getEncodingType()).thenReturn(EncodingType.BASE64); + when(decompressProcessorConfig.getDecompressionType()).thenReturn(DecompressionType.GZIP); + + final DecompressProcessor objectUnderTest = createObjectUnderTest(); + final List> records = List.of(buildRecordWithEvent(Map.of(keys.get(0), compressedValue))); + + final List> result = (List>) objectUnderTest.doExecute(records); + + assertThat(result, notNullValue()); + assertThat(result.size(), equalTo(1)); + assertThat(result.get(0), notNullValue()); + assertThat(result.get(0).getData(), notNullValue()); + assertThat(result.get(0).getData().get(keys.get(0), String.class), equalTo(expectedDecompressedValue)); + } +} diff --git a/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessorTest.java b/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessorTest.java new file mode 100644 index 0000000000..8ad5cfc657 --- /dev/null +++ b/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessorTest.java @@ -0,0 +1,223 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.decompress; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Timer; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.metrics.MetricNames; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.codec.DecompressionEngine; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.processor.decompress.encoding.DecoderEngine; +import org.opensearch.dataprepper.plugins.processor.decompress.encoding.DecoderEngineFactory; +import org.opensearch.dataprepper.plugins.processor.decompress.exceptions.DecodingException; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.processor.decompress.DecompressProcessor.DECOMPRESSION_PROCESSING_ERRORS; + +@ExtendWith(MockitoExtension.class) +public class DecompressProcessorTest { + + private String key; + + @Mock + private DecompressionEngine decompressionEngine; + + @Mock + private DecoderEngine decoderEngine; + + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private Counter decompressionProcessingErrors; + + @Mock + private ExpressionEvaluator expressionEvaluator; + + @Mock + private DecompressProcessorConfig decompressProcessorConfig; + + @Mock + private DecompressionEngineFactory decompressionType; + + @Mock + private DecoderEngineFactory encodingType; + + private DecompressProcessor createObjectUnderTest() { + return new DecompressProcessor(pluginMetrics, decompressProcessorConfig, expressionEvaluator); + } + + @BeforeEach + void setup() { + key = UUID.randomUUID().toString(); + + when(pluginMetrics.counter(MetricNames.RECORDS_IN)).thenReturn(mock(Counter.class)); + when(pluginMetrics.counter(MetricNames.RECORDS_OUT)).thenReturn(mock(Counter.class)); + when(pluginMetrics.timer(MetricNames.TIME_ELAPSED)).thenReturn(mock(Timer.class)); + when(pluginMetrics.counter(DECOMPRESSION_PROCESSING_ERRORS)).thenReturn(decompressionProcessingErrors); + } + + @Test + void decompression_returns_expected_output() throws IOException { + final String compressedValue = UUID.randomUUID().toString(); + final String expectedResult = UUID.randomUUID().toString(); + final byte[] decodedValue = expectedResult.getBytes(); + + when(decompressProcessorConfig.getKeys()).thenReturn(List.of(key)); + when(encodingType.getDecoderEngine()).thenReturn(decoderEngine); + when(decompressProcessorConfig.getEncodingType()).thenReturn(encodingType); + when(decompressProcessorConfig.getDecompressionType()).thenReturn(decompressionType); + when(decompressionType.getDecompressionEngine()).thenReturn(decompressionEngine); + when(decoderEngine.decode(compressedValue)).thenReturn(decodedValue); + when(decompressionEngine.createInputStream(any(InputStream.class))).thenReturn(new ByteArrayInputStream(decodedValue)); + + final List> records = List.of(buildRecordWithEvent(Map.of(key, compressedValue))); + + final DecompressProcessor objectUnderTest = createObjectUnderTest(); + final List> result = (List>) objectUnderTest.doExecute(records); + + assertThat(result, notNullValue()); + assertThat(result.size(), equalTo(1)); + assertThat(result.get(0), notNullValue()); + assertThat(result.get(0).getData(), notNullValue()); + assertThat(result.get(0).getData().get(key, String.class), equalTo(expectedResult)); + } + + @Test + void decompression_with_decoding_error_adds_tags_and_increments_error_metric() { + final String compressedValue = UUID.randomUUID().toString(); + final String tagForFailure = UUID.randomUUID().toString(); + + when(decompressProcessorConfig.getKeys()).thenReturn(List.of(key)); + when(encodingType.getDecoderEngine()).thenReturn(decoderEngine); + when(decompressProcessorConfig.getEncodingType()).thenReturn(encodingType); + when(decompressProcessorConfig.getTagsOnFailure()).thenReturn(List.of(tagForFailure)); + when(decoderEngine.decode(compressedValue)).thenThrow(DecodingException.class); + + final List> records = List.of(buildRecordWithEvent(Map.of(key, compressedValue))); + + final DecompressProcessor objectUnderTest = createObjectUnderTest(); + final List> result = (List>) objectUnderTest.doExecute(records); + + assertThat(result, notNullValue()); + assertThat(result.size(), equalTo(1)); + assertThat(result.get(0), notNullValue()); + assertThat(result.get(0).getData(), notNullValue()); + assertThat(result.get(0).getData().get(key, String.class), equalTo(compressedValue)); + assertThat(result.get(0).getData().getMetadata().getTags(), notNullValue()); + assertThat(result.get(0).getData().getMetadata().getTags().size(), equalTo(1)); + assertThat(result.get(0).getData().getMetadata().getTags().contains(tagForFailure), equalTo(true)); + + verifyNoInteractions(decompressionEngine); + verify(decompressionProcessingErrors).increment(); + } + + @Test + void exception_from_DecompressionEngine_adds_tags_and_increments_error_metric() throws IOException { + final String compressedValue = UUID.randomUUID().toString(); + final String expectedResult = UUID.randomUUID().toString(); + final byte[] decodedValue = expectedResult.getBytes(); + final String tagForFailure = UUID.randomUUID().toString(); + + when(decompressProcessorConfig.getKeys()).thenReturn(List.of(key)); + when(encodingType.getDecoderEngine()).thenReturn(decoderEngine); + when(decompressProcessorConfig.getEncodingType()).thenReturn(encodingType); + when(decompressProcessorConfig.getTagsOnFailure()).thenReturn(List.of(tagForFailure)); + when(decompressProcessorConfig.getDecompressionType()).thenReturn(decompressionType); + when(decompressionType.getDecompressionEngine()).thenReturn(decompressionEngine); + when(decoderEngine.decode(compressedValue)).thenReturn(decodedValue); + when(decompressionEngine.createInputStream(any(InputStream.class))).thenThrow(RuntimeException.class); + + final List> records = List.of(buildRecordWithEvent(Map.of(key, compressedValue))); + + final DecompressProcessor objectUnderTest = createObjectUnderTest(); + final List> result = (List>) objectUnderTest.doExecute(records); + + assertThat(result, notNullValue()); + assertThat(result.size(), equalTo(1)); + assertThat(result.get(0), notNullValue()); + assertThat(result.get(0).getData(), notNullValue()); + assertThat(result.get(0).getData().get(key, String.class), equalTo(compressedValue)); + assertThat(result.get(0).getData().getMetadata().getTags(), notNullValue()); + assertThat(result.get(0).getData().getMetadata().getTags().size(), equalTo(1)); + assertThat(result.get(0).getData().getMetadata().getTags().contains(tagForFailure), equalTo(true)); + + verify(decompressionProcessingErrors).increment(); + } + + @Test + void exception_from_expression_evaluator_adds_tags_and_increments_error_metric() { + final String decompressWhen = UUID.randomUUID().toString(); + final String compressedValue = UUID.randomUUID().toString(); + final String tagForFailure = UUID.randomUUID().toString(); + + when(decompressProcessorConfig.getTagsOnFailure()).thenReturn(List.of(tagForFailure)); + when(decompressProcessorConfig.getDecompressWhen()).thenReturn(decompressWhen); + when(expressionEvaluator.isValidExpressionStatement(decompressWhen)).thenReturn(true); + when(expressionEvaluator.evaluateConditional(eq(decompressWhen), any(Event.class))) + .thenThrow(RuntimeException.class); + + final List> records = List.of(buildRecordWithEvent(Map.of(key, compressedValue))); + + final DecompressProcessor objectUnderTest = createObjectUnderTest(); + final List> result = (List>) objectUnderTest.doExecute(records); + + assertThat(result, notNullValue()); + assertThat(result.size(), equalTo(1)); + assertThat(result.get(0), notNullValue()); + assertThat(result.get(0).getData(), notNullValue()); + assertThat(result.get(0).getData().get(key, String.class), equalTo(compressedValue)); + assertThat(result.get(0).getData().getMetadata().getTags(), notNullValue()); + assertThat(result.get(0).getData().getMetadata().getTags().size(), equalTo(1)); + assertThat(result.get(0).getData().getMetadata().getTags().contains(tagForFailure), equalTo(true)); + + verifyNoInteractions(decoderEngine, decompressionEngine); + verify(decompressionProcessingErrors).increment(); + } + + @Test + void invalid_expression_statement_throws_InvalidPluginConfigurationException() { + + final String decompressWhen = UUID.randomUUID().toString(); + when(decompressProcessorConfig.getDecompressWhen()).thenReturn(decompressWhen); + when(expressionEvaluator.isValidExpressionStatement(decompressWhen)).thenReturn(false); + + assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest); + } + + static Record buildRecordWithEvent(final Map data) { + return new Record<>(JacksonEvent.builder() + .withData(data) + .withEventType("event") + .build()); + } +} diff --git a/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressionTypeTest.java b/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressionTypeTest.java new file mode 100644 index 0000000000..287e0cdb1d --- /dev/null +++ b/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressionTypeTest.java @@ -0,0 +1,54 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.decompress; + +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ArgumentsProvider; +import org.junit.jupiter.params.provider.ArgumentsSource; +import org.opensearch.dataprepper.model.codec.DecompressionEngine; +import org.opensearch.dataprepper.plugins.codec.GZipDecompressionEngine; + +import java.util.stream.Stream; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.jupiter.params.provider.Arguments.arguments; + +public class DecompressionTypeTest { + + @ParameterizedTest + @ArgumentsSource(EnumToStringNameArgumentsProvider.class) + void fromOptionValue_returns_expected_DecompressionType(final DecompressionType expectedEnumValue, final String enumName) { + assertThat(DecompressionType.fromOptionValue(enumName), equalTo(expectedEnumValue)); + } + + @ParameterizedTest + @ArgumentsSource(EnumToDecompressionEngineClassArgumentsProvider.class) + void getDecompressionEngine_returns_expected_DecompressionEngine(final DecompressionType enumValue, final Class decompressionEngineClass) { + assertThat(enumValue.getDecompressionEngine(), instanceOf(decompressionEngineClass)); + } + + private static class EnumToStringNameArgumentsProvider implements ArgumentsProvider { + @Override + public Stream provideArguments(final ExtensionContext context) { + return Stream.of( + arguments(DecompressionType.GZIP, "gzip") + ); + } + } + + private static class EnumToDecompressionEngineClassArgumentsProvider implements ArgumentsProvider { + @Override + public Stream provideArguments(final ExtensionContext context) { + return Stream.of( + arguments(DecompressionType.GZIP, GZipDecompressionEngine.class) + ); + } + } +} diff --git a/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/Base64DecoderEngineTest.java b/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/Base64DecoderEngineTest.java new file mode 100644 index 0000000000..989a4a067a --- /dev/null +++ b/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/Base64DecoderEngineTest.java @@ -0,0 +1,52 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.decompress.encoding; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.mockito.MockedStatic; +import org.opensearch.dataprepper.plugins.processor.decompress.exceptions.DecodingException; + +import java.util.Base64; +import java.util.UUID; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; + +public class Base64DecoderEngineTest { + + @ParameterizedTest + @CsvSource(value = {"Hello world,SGVsbG8gd29ybGQ=", "Test123,VGVzdDEyMw=="}) + void decode_correctly_decodes_base64(final String expectedDecodedValue, final String base64EncodedValue) { + final byte[] expectedDecodedBytes = expectedDecodedValue.getBytes(); + + final DecoderEngine objectUnderTest = new Base64DecoderEngine(); + + final byte[] decodedBytes = objectUnderTest.decode(base64EncodedValue); + + assertThat(decodedBytes, equalTo(expectedDecodedBytes)); + } + + @Test + void decode_throws_DecodingException_when_decoding_base64_throws_exception() { + final String encodedValue = UUID.randomUUID().toString(); + final Base64.Decoder decoder = mock(Base64.Decoder.class); + when(decoder.decode(encodedValue)).thenThrow(RuntimeException.class); + + try(final MockedStatic base64MockedStatic = mockStatic(Base64.class)) { + base64MockedStatic.when(Base64::getDecoder).thenReturn(decoder); + + final DecoderEngine objectUnderTest = new Base64DecoderEngine(); + + assertThrows(DecodingException.class, () -> objectUnderTest.decode(encodedValue)); + } + } +} diff --git a/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/EncodingTypeTest.java b/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/EncodingTypeTest.java new file mode 100644 index 0000000000..8a6075eb6c --- /dev/null +++ b/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/EncodingTypeTest.java @@ -0,0 +1,52 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.decompress.encoding; + +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ArgumentsProvider; +import org.junit.jupiter.params.provider.ArgumentsSource; + +import java.util.stream.Stream; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.jupiter.params.provider.Arguments.arguments; + +public class EncodingTypeTest { + + @ParameterizedTest + @ArgumentsSource(EnumToStringNameArgumentsProvider.class) + void fromOptionValue_returns_expected_DecompressionType(final EncodingType expectedEnumValue, final String enumName) { + assertThat(EncodingType.fromOptionValue(enumName), equalTo(expectedEnumValue)); + } + + @ParameterizedTest + @ArgumentsSource(EnumToDecoderEngineClassArgumentsProvider.class) + void getDecompressionEngine_returns_expected_DecompressionEngine(final EncodingType enumValue, final Class decoderEngineClass) { + assertThat(enumValue.getDecoderEngine(), instanceOf(decoderEngineClass)); + } + + private static class EnumToStringNameArgumentsProvider implements ArgumentsProvider { + @Override + public Stream provideArguments(final ExtensionContext context) { + return Stream.of( + arguments(EncodingType.BASE64, "base64") + ); + } + } + + private static class EnumToDecoderEngineClassArgumentsProvider implements ArgumentsProvider { + @Override + public Stream provideArguments(final ExtensionContext context) { + return Stream.of( + arguments(EncodingType.BASE64, Base64DecoderEngine.class) + ); + } + } +} diff --git a/settings.gradle b/settings.gradle index c5d0d6c916..1aa72e7e12 100644 --- a/settings.gradle +++ b/settings.gradle @@ -150,3 +150,4 @@ include 'data-prepper-plugins:buffer-common' //include 'data-prepper-plugins:prometheus-sink' include 'data-prepper-plugins:dissect-processor' include 'data-prepper-plugins:dynamodb-source' +include 'data-prepper-plugins:decompress-processor'