diff --git a/NOTICE b/NOTICE index 6c7dc983f8..4a671c614a 100644 --- a/NOTICE +++ b/NOTICE @@ -10,3 +10,7 @@ Foundation (http://www.apache.org/). This product includes software developed by Joda.org (http://www.joda.org/). + +This product includes software developed by +Twilio Inc. (https://www.twilio.com/). +Copyright 2024 Twilio Inc. diff --git a/data-prepper-plugins/common/README.md b/data-prepper-plugins/common/README.md index 96e5e560a2..c774d72e95 100644 --- a/data-prepper-plugins/common/README.md +++ b/data-prepper-plugins/common/README.md @@ -35,6 +35,8 @@ A source plugin to read input data from the specified file path. The file source Temporarily, `type` can either be `event` or `string`. If you would like to use the file source for log analytics use cases like grok, change this to `event`. +* `compression` (String): The source file compression format, if any. Valid options are `none`, `gzip` and `snappy`. Default is `none`. + ## `file` (sink) A sink plugin to write output data to the specified file path. 
diff --git a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/file/FileSource.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/file/FileSource.java index a0da7461f1..9698144097 100644 --- a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/file/FileSource.java +++ b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/file/FileSource.java @@ -12,6 +12,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.codec.DecompressionEngine; import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; @@ -23,11 +24,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import java.io.BufferedReader; -import java.io.FileInputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; import java.util.HashMap; import java.util.Map; @@ -48,6 +52,7 @@ public class FileSource implements Source> { private final FileSourceConfig fileSourceConfig; private final FileStrategy fileStrategy; private final EventFactory eventFactory; + private final DecompressionEngine decompressionEngine; private Thread readThread; @@ -63,6 +68,7 @@ public FileSource( this.fileSourceConfig = fileSourceConfig; this.isStopRequested = false; this.writeTimeout = FileSourceConfig.DEFAULT_TIMEOUT; + this.decompressionEngine = fileSourceConfig.getCompression().getDecompressionEngine(); if(fileSourceConfig.getCodec() != null) { fileStrategy = new 
CodecFileStrategy(pluginFactory); @@ -104,7 +110,8 @@ private interface FileStrategy { private class ClassicFileStrategy implements FileStrategy { @Override public void start(Buffer> buffer) { - try (BufferedReader reader = Files.newBufferedReader(Paths.get(fileSourceConfig.getFilePathToRead()), StandardCharsets.UTF_8)) { + Path filePath = Paths.get(fileSourceConfig.getFilePathToRead()); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(decompressionEngine.createInputStream(Files.newInputStream(filePath)), StandardCharsets.UTF_8))) { String line; while ((line = reader.readLine()) != null && !isStopRequested) { writeLineAsEventOrString(line, buffer); @@ -166,13 +173,13 @@ private class CodecFileStrategy implements FileStrategy { final PluginModel codecConfiguration = fileSourceConfig.getCodec(); final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings()); codec = pluginFactory.loadPlugin(InputCodec.class, codecPluginSettings); - } @Override public void start(final Buffer> buffer) { - try { - codec.parse(new FileInputStream(fileSourceConfig.getFilePathToRead()), eventRecord -> { + Path filePath = Paths.get(fileSourceConfig.getFilePathToRead()); + try(InputStream is = decompressionEngine.createInputStream(Files.newInputStream(filePath))) { + codec.parse(is, eventRecord -> { try { buffer.write((Record) eventRecord, writeTimeout); } catch (TimeoutException e) { @@ -186,4 +193,4 @@ public void start(final Buffer> buffer) { } } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/file/FileSourceConfig.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/file/FileSourceConfig.java index 255857a4bb..9eb8dd961d 100644 --- a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/file/FileSourceConfig.java +++ 
b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/file/FileSourceConfig.java @@ -10,6 +10,7 @@ import com.google.common.base.Preconditions; import jakarta.validation.constraints.AssertTrue; import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.plugins.codec.CompressionOption; import java.util.Objects; @@ -35,6 +36,9 @@ public class FileSourceConfig { @JsonProperty("codec") private PluginModel codec; + @JsonProperty("compression") + private CompressionOption compression = CompressionOption.NONE; + public String getFilePathToRead() { return filePathToRead; } @@ -52,6 +56,10 @@ public PluginModel getCodec() { return codec; } + public CompressionOption getCompression() { + return compression; + } + void validate() { Objects.requireNonNull(filePathToRead, "File path is required"); Preconditions.checkArgument(recordType.equals(EVENT_TYPE) || recordType.equals(DEFAULT_TYPE), "Invalid type: must be either [event] or [string]"); diff --git a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/file/FileSourceTests.java b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/file/FileSourceTests.java index aedacdcbb2..e1111207d4 100644 --- a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/file/FileSourceTests.java +++ b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/file/FileSourceTests.java @@ -18,13 +18,16 @@ import org.opensearch.dataprepper.event.TestEventFactory; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.codec.DecompressionEngine; import org.opensearch.dataprepper.model.codec.InputCodec; +import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; import 
org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventBuilder; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer; +import org.opensearch.dataprepper.plugins.codec.CompressionOption; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,6 +88,31 @@ private FileSource createObjectUnderTest() { return new FileSource(fileSourceConfig, pluginMetrics, pluginFactory, TestEventFactory.getTestEventFactory()); } + /** + * Variant of createObjectUnderTest that uses mocks for the configuration instead of object mapper, so we can + * pass concrete mocks to the FileSource through the FileSourceConfig. + * @param codec the codec to use in the configuration + * @param engine the {@link DecompressionEngine} to use in the configuration + * @return the constructed {@code FileSource} under test + */ + private FileSource createObjectUnderTest(PluginModel codec, DecompressionEngine engine) { + FileSourceConfig fileSourceConfig = mock(FileSourceConfig.class); + + when(fileSourceConfig.getFilePathToRead()).thenReturn(TEST_FILE_PATH_PLAIN); + + if (codec != null) { + when(fileSourceConfig.getCodec()).thenReturn(codec); + } + + if (engine != null) { + CompressionOption compressionOption = mock(CompressionOption.class); + when(compressionOption.getDecompressionEngine()).thenReturn(engine); + when(fileSourceConfig.getCompression()).thenReturn(compressionOption); + } + + return new FileSource(fileSourceConfig, pluginMetrics, pluginFactory, TestEventFactory.getTestEventFactory()); + } + @Nested class WithRecord { private static final String TEST_PIPELINE_NAME = "pipeline"; @@ -278,6 +306,9 @@ class WithCodec { @Mock private Buffer buffer; + @Mock + private DecompressionEngine decompressionEngine; + @BeforeEach void setUp() { Map codecConfiguration = Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()); @@ -290,21 +321,18 @@ void
setUp() { @Test void start_will_parse_codec_with_correct_inputStream() throws IOException { - createObjectUnderTest().start(buffer); + final FileInputStream decompressedStream = new FileInputStream(TEST_FILE_PATH_PLAIN); + DecompressionEngine mockEngine = mock(DecompressionEngine.class); + when(mockEngine.createInputStream(any(InputStream.class))).thenReturn(decompressedStream); - final ArgumentCaptor inputStreamArgumentCaptor = ArgumentCaptor.forClass(InputStream.class); + PluginModel fakeCodec = mock(PluginModel.class); + when(fakeCodec.getPluginName()).thenReturn("fake_codec"); + when(fakeCodec.getPluginSettings()).thenReturn(Map.of()); - await().atMost(2, TimeUnit.SECONDS) - .untilAsserted(() -> verify(inputCodec).parse(any(InputStream.class), any(Consumer.class))); - verify(inputCodec).parse(inputStreamArgumentCaptor.capture(), any(Consumer.class)); - - final InputStream actualInputStream = inputStreamArgumentCaptor.getValue(); + createObjectUnderTest(fakeCodec, mockEngine).start(buffer); - final byte[] actualBytes = actualInputStream.readAllBytes(); - final FileInputStream fileInputStream = new FileInputStream(TEST_FILE_PATH_PLAIN); - final byte[] expectedBytes = fileInputStream.readAllBytes(); - - assertThat(actualBytes, equalTo(expectedBytes)); + await().atMost(2, TimeUnit.SECONDS) + .untilAsserted(() -> verify(inputCodec).parse(eq(decompressedStream), any(Consumer.class))); } @Test