diff --git a/data-prepper-plugins/decompress-processor/build.gradle b/data-prepper-plugins/decompress-processor/build.gradle index 4ef54f729c..9d67cffc3b 100644 --- a/data-prepper-plugins/decompress-processor/build.gradle +++ b/data-prepper-plugins/decompress-processor/build.gradle @@ -3,11 +3,8 @@ * SPDX-License-Identifier: Apache-2.0 */ -plugins { - id 'java' -} - dependencies { + implementation 'commons-io:commons-io:2.15.1' implementation project(':data-prepper-api') implementation project(':data-prepper-plugins:common') implementation 'com.fasterxml.jackson.core:jackson-databind' diff --git a/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessor.java b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessor.java index 7c731123c9..61e7b7e812 100644 --- a/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessor.java +++ b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessor.java @@ -7,11 +7,13 @@ import com.google.common.base.Charsets; import io.micrometer.core.instrument.Counter; +import org.apache.commons.io.IOUtils; import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.processor.AbstractProcessor; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.record.Record; @@ -19,18 +21,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedReader; import java.io.ByteArrayInputStream; -import java.io.IOException; import java.io.InputStream; -import java.io.InputStreamReader; import java.util.Collection; @DataPrepperPlugin(name = "decompress", pluginType = Processor.class, pluginConfigurationType = DecompressProcessorConfig.class) public class DecompressProcessor extends AbstractProcessor, Record> { private static final Logger LOG = LoggerFactory.getLogger(DecompressProcessor.class); - static final String DECOMPRESSION_PROCESSING_ERRORS = "decompressionProcessingErrors"; + static final String DECOMPRESSION_PROCESSING_ERRORS = "processingErrors"; private final DecompressProcessorConfig decompressProcessorConfig; private final ExpressionEvaluator expressionEvaluator; @@ -45,6 +44,13 @@ public DecompressProcessor(final PluginMetrics pluginMetrics, this.decompressProcessorConfig = decompressProcessorConfig; this.expressionEvaluator = expressionEvaluator; this.decompressionProcessingErrors = pluginMetrics.counter(DECOMPRESSION_PROCESSING_ERRORS); + + if (decompressProcessorConfig.getDecompressWhen() != null + && !expressionEvaluator.isValidExpressionStatement(decompressProcessorConfig.getDecompressWhen())) { + throw new InvalidPluginConfigurationException( + String.format("decompress_when value of %s is not a valid expression statement. " + + "See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax.", decompressProcessorConfig.getDecompressWhen())); + } } @Override @@ -66,10 +72,10 @@ public Collection> doExecute(final Collection> recor final byte[] compressedValueAsBytes = decompressProcessorConfig.getEncodingType().getDecoderEngine().decode(compressedValue); - try (final InputStream inputStream = decompressProcessorConfig.getDecompressionType().getDecompressionEngine().createInputStream(new ByteArrayInputStream(compressedValueAsBytes)); - final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, Charsets.UTF_8)) - ){ - record.getData().put(key, getDecompressedString(bufferedReader)); + try (final InputStream inputStream = decompressProcessorConfig.getDecompressionType().getDecompressionEngine().createInputStream(new ByteArrayInputStream(compressedValueAsBytes));){ + + final String decompressedString = IOUtils.toString(inputStream, Charsets.UTF_8); + record.getData().put(key, decompressedString); } catch (final Exception e) { LOG.error("Unable to decompress key {} using decompression type {}:", key, decompressProcessorConfig.getDecompressionType(), e); @@ -105,15 +111,4 @@ public boolean isReadyForShutdown() { public void shutdown() { } - - private String getDecompressedString(final BufferedReader bufferedReader) throws IOException { - final StringBuilder stringBuilder = new StringBuilder(); - String line; - - while ((line = bufferedReader.readLine()) != null) { - stringBuilder.append(line); - } - - return stringBuilder.toString(); - } } diff --git a/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessorConfig.java b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessorConfig.java index 81000f07b3..ce2d985277 100644 --- a/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessorConfig.java +++ b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessorConfig.java @@ -10,7 +10,7 @@ import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotNull; import org.opensearch.dataprepper.plugins.processor.decompress.encoding.EncodingType; -import org.opensearch.dataprepper.plugins.processor.decompress.encoding.IEncodingType; +import org.opensearch.dataprepper.plugins.processor.decompress.encoding.DecoderEngineFactory; import java.util.List; @@ -37,11 +37,11 @@ public List getKeys() { return keys; } - public IDecompressionType getDecompressionType() { + public DecompressionEngineFactory getDecompressionType() { return decompressionType; } - public IEncodingType getEncodingType() { return encodingType; } + public DecoderEngineFactory getEncodingType() { return encodingType; } public String getDecompressWhen() { return decompressWhen; diff --git a/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/IDecompressionType.java b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressionEngineFactory.java similarity index 85% rename from data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/IDecompressionType.java rename to data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressionEngineFactory.java index 6853eb3228..581b3a1970 100644 --- a/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/IDecompressionType.java +++ b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressionEngineFactory.java @@ -7,6 +7,6 @@ import org.opensearch.dataprepper.model.codec.DecompressionEngine; -public interface IDecompressionType { +public interface DecompressionEngineFactory { public DecompressionEngine getDecompressionEngine(); } diff --git a/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressionType.java b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressionType.java index ad127c7290..88f64a52e1 100644 --- a/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressionType.java +++ b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressionType.java @@ -13,7 +13,7 @@ import java.util.Map; import java.util.stream.Collectors; -public enum DecompressionType implements IDecompressionType { +public enum DecompressionType implements DecompressionEngineFactory { GZIP("gzip"); private final String option; diff --git a/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/IEncodingType.java b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/DecoderEngineFactory.java similarity index 83% rename from data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/IEncodingType.java rename to data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/DecoderEngineFactory.java index 3b513523c7..89d8a6334c 100644 --- a/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/IEncodingType.java +++ b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/DecoderEngineFactory.java @@ -5,6 +5,6 @@ package org.opensearch.dataprepper.plugins.processor.decompress.encoding; -public interface IEncodingType { +public interface DecoderEngineFactory { DecoderEngine getDecoderEngine(); } diff --git a/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/EncodingType.java b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/EncodingType.java index 43b9f18fa7..1e88412682 100644 --- a/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/EncodingType.java +++ b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/EncodingType.java @@ -11,7 +11,7 @@ import java.util.Map; import java.util.stream.Collectors; -public enum EncodingType implements IEncodingType { +public enum EncodingType implements DecoderEngineFactory { BASE64("base64"); private final String option; diff --git a/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/ITDecompressProcessorTest.java b/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessorIT.java similarity index 98% rename from data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/ITDecompressProcessorTest.java rename to data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessorIT.java index 4950893af8..f0b6a7da42 100644 --- a/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/ITDecompressProcessorTest.java +++ b/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessorIT.java @@ -28,7 +28,7 @@ import static org.opensearch.dataprepper.plugins.processor.decompress.DecompressProcessorTest.buildRecordWithEvent; @ExtendWith(MockitoExtension.class) -public class ITDecompressProcessorTest { +public class DecompressProcessorIT { private List keys; diff --git a/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessorTest.java b/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessorTest.java index 542058c2a2..8ad5cfc657 100644 --- a/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessorTest.java +++ b/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessorTest.java @@ -18,9 +18,10 @@ import org.opensearch.dataprepper.model.codec.DecompressionEngine; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.processor.decompress.encoding.DecoderEngine; -import org.opensearch.dataprepper.plugins.processor.decompress.encoding.IEncodingType; +import org.opensearch.dataprepper.plugins.processor.decompress.encoding.DecoderEngineFactory; import org.opensearch.dataprepper.plugins.processor.decompress.exceptions.DecodingException; import java.io.ByteArrayInputStream; @@ -33,6 +34,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; @@ -65,10 +67,10 @@ public class DecompressProcessorTest { private DecompressProcessorConfig decompressProcessorConfig; @Mock - private IDecompressionType decompressionType; + private DecompressionEngineFactory decompressionType; @Mock - private IEncodingType encodingType; + private DecoderEngineFactory encodingType; private DecompressProcessor createObjectUnderTest() { return new DecompressProcessor(pluginMetrics, decompressProcessorConfig, expressionEvaluator); @@ -180,6 +182,7 @@ void exception_from_expression_evaluator_adds_tags_and_increments_error_metric() when(decompressProcessorConfig.getTagsOnFailure()).thenReturn(List.of(tagForFailure)); when(decompressProcessorConfig.getDecompressWhen()).thenReturn(decompressWhen); + when(expressionEvaluator.isValidExpressionStatement(decompressWhen)).thenReturn(true); when(expressionEvaluator.evaluateConditional(eq(decompressWhen), any(Event.class))) .thenThrow(RuntimeException.class); @@ -201,6 +204,16 @@ void exception_from_expression_evaluator_adds_tags_and_increments_error_metric() verify(decompressionProcessingErrors).increment(); } + @Test + void invalid_expression_statement_throws_InvalidPluginConfigurationException() { + + final String decompressWhen = UUID.randomUUID().toString(); + when(decompressProcessorConfig.getDecompressWhen()).thenReturn(decompressWhen); + when(expressionEvaluator.isValidExpressionStatement(decompressWhen)).thenReturn(false); + + assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest); + } + static Record buildRecordWithEvent(final Map data) { return new Record<>(JacksonEvent.builder() .withData(data) diff --git a/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressionTypeTest.java b/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressionTypeTest.java new file mode 100644 index 0000000000..287e0cdb1d --- /dev/null +++ b/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressionTypeTest.java @@ -0,0 +1,54 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.decompress; + +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ArgumentsProvider; +import org.junit.jupiter.params.provider.ArgumentsSource; +import org.opensearch.dataprepper.model.codec.DecompressionEngine; +import org.opensearch.dataprepper.plugins.codec.GZipDecompressionEngine; + +import java.util.stream.Stream; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.jupiter.params.provider.Arguments.arguments; + +public class DecompressionTypeTest { + + @ParameterizedTest + @ArgumentsSource(EnumToStringNameArgumentsProvider.class) + void fromOptionValue_returns_expected_DecompressionType(final DecompressionType expectedEnumValue, final String enumName) { + assertThat(DecompressionType.fromOptionValue(enumName), equalTo(expectedEnumValue)); + } + + @ParameterizedTest + @ArgumentsSource(EnumToDecompressionEngineClassArgumentsProvider.class) + void getDecompressionEngine_returns_expected_DecompressionEngine(final DecompressionType enumValue, final Class decompressionEngineClass) { + assertThat(enumValue.getDecompressionEngine(), instanceOf(decompressionEngineClass)); + } + + private static class EnumToStringNameArgumentsProvider implements ArgumentsProvider { + @Override + public Stream provideArguments(final ExtensionContext context) { + return Stream.of( + arguments(DecompressionType.GZIP, "gzip") + ); + } + } + + private static class EnumToDecompressionEngineClassArgumentsProvider implements ArgumentsProvider { + @Override + public Stream provideArguments(final ExtensionContext context) { + return Stream.of( + arguments(DecompressionType.GZIP, GZipDecompressionEngine.class) + ); + } + } +} diff --git a/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/EncodingTypeTest.java b/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/EncodingTypeTest.java new file mode 100644 index 0000000000..8a6075eb6c --- /dev/null +++ b/data-prepper-plugins/decompress-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/decompress/encoding/EncodingTypeTest.java @@ -0,0 +1,52 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.decompress.encoding; + +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ArgumentsProvider; +import org.junit.jupiter.params.provider.ArgumentsSource; + +import java.util.stream.Stream; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.jupiter.params.provider.Arguments.arguments; + +public class EncodingTypeTest { + + @ParameterizedTest + @ArgumentsSource(EnumToStringNameArgumentsProvider.class) + void fromOptionValue_returns_expected_DecompressionType(final EncodingType expectedEnumValue, final String enumName) { + assertThat(EncodingType.fromOptionValue(enumName), equalTo(expectedEnumValue)); + } + + @ParameterizedTest + @ArgumentsSource(EnumToDecoderEngineClassArgumentsProvider.class) + void getDecompressionEngine_returns_expected_DecompressionEngine(final EncodingType enumValue, final Class decoderEngineClass) { + assertThat(enumValue.getDecoderEngine(), instanceOf(decoderEngineClass)); + } + + private static class EnumToStringNameArgumentsProvider implements ArgumentsProvider { + @Override + public Stream provideArguments(final ExtensionContext context) { + return Stream.of( + arguments(EncodingType.BASE64, "base64") + ); + } + } + + private static class EnumToDecoderEngineClassArgumentsProvider implements ArgumentsProvider { + @Override + public Stream provideArguments(final ExtensionContext context) { + return Stream.of( + arguments(EncodingType.BASE64, Base64DecoderEngine.class) + ); + } + } +}