Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create decompress processor to decompress gzipped keys #4118

Merged
merged 2 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions data-prepper-plugins/decompress-processor/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

// Build dependencies of the decompress-processor plugin.
dependencies {
// Supplies org.apache.commons.io.IOUtils, which the processor uses to read the decompressed stream into a String.
implementation 'commons-io:commons-io:2.15.1'
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:common')
implementation 'com.fasterxml.jackson.core:jackson-databind'
// Supplies io.micrometer.core.instrument.Counter, the type of the processor's error counter.
implementation 'io.micrometer:micrometer-core'
testImplementation testLibs.mockito.inline
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.decompress;

import com.google.common.base.Charsets;
import io.micrometer.core.instrument.Counter;
import org.apache.commons.io.IOUtils;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.processor.decompress.exceptions.DecodingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.Collection;

@DataPrepperPlugin(name = "decompress", pluginType = Processor.class, pluginConfigurationType = DecompressProcessorConfig.class)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please open a documentation issue so that we track the need to document this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

public class DecompressProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {

private static final Logger LOG = LoggerFactory.getLogger(DecompressProcessor.class);
static final String DECOMPRESSION_PROCESSING_ERRORS = "processingErrors";

private final DecompressProcessorConfig decompressProcessorConfig;
private final ExpressionEvaluator expressionEvaluator;

private final Counter decompressionProcessingErrors;

/**
 * Creates the processor, registers its error counter and validates the optional
 * {@code decompress_when} expression.
 *
 * @param pluginMetrics used to create the {@code processingErrors} counter
 * @param decompressProcessorConfig the processor configuration
 * @param expressionEvaluator used to validate (and later evaluate) {@code decompress_when}
 * @throws InvalidPluginConfigurationException if {@code decompress_when} is set but is not a
 *         valid expression statement according to the evaluator
 */
@DataPrepperPluginConstructor
public DecompressProcessor(final PluginMetrics pluginMetrics,
final DecompressProcessorConfig decompressProcessorConfig,
final ExpressionEvaluator expressionEvaluator) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please validate the expression in the constructor.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it better to do the validation in the Config.java? I thought that's the convention we follow. I am ok with this too. Just checking.

super(pluginMetrics);
this.decompressProcessorConfig = decompressProcessorConfig;
this.expressionEvaluator = expressionEvaluator;
this.decompressionProcessingErrors = pluginMetrics.counter(DECOMPRESSION_PROCESSING_ERRORS);

// Fail at construction time on an invalid condition instead of on every processed event.
if (decompressProcessorConfig.getDecompressWhen() != null
&& !expressionEvaluator.isValidExpressionStatement(decompressProcessorConfig.getDecompressWhen())) {
throw new InvalidPluginConfigurationException(
String.format("decompress_when value of %s is not a valid expression statement. " +
"See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax.", decompressProcessorConfig.getDecompressWhen()));
}
}

/**
 * Decodes and decompresses the configured keys of each event in place.
 *
 * <p>Events for which the optional {@code decompress_when} condition evaluates to false are
 * left untouched. Any failure tags the event with the configured failure tags and increments
 * the error counter; the batch itself is always returned.
 *
 * @param records the batch of records to process
 * @return the same collection that was passed in
 */
@Override
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
    for (final Record<Event> eventRecord : records) {
        try {
            final Event event = eventRecord.getData();
            if (isExcludedByCondition(event)) {
                continue;
            }

            for (final String key : decompressProcessorConfig.getKeys()) {
                decompressKey(event, key);
            }
        } catch (final DecodingException e) {
            // A decoding failure stops processing of the remaining keys of this event.
            LOG.error("Unable to decode key with base64: {}", e.getMessage());
            markFailure(eventRecord);
        } catch (final Exception e) {
            LOG.error("An uncaught exception occurred while decompressing Events", e);
            markFailure(eventRecord);
        }
    }

    return records;
}

/** Returns true when a decompress_when condition is configured and evaluates to false for the event. */
private boolean isExcludedByCondition(final Event event) {
    final String decompressWhen = decompressProcessorConfig.getDecompressWhen();
    if (decompressWhen == null) {
        return false;
    }
    return !expressionEvaluator.evaluateConditional(decompressWhen, event);
}

/** Decodes and decompresses one key of the event in place; a missing key is skipped. */
private void decompressKey(final Event event, final String key) {
    final String encodedValue = event.get(key, String.class);
    if (encodedValue == null) {
        return;
    }

    // Decoding happens outside the try below, so a DecodingException reaches the caller.
    final byte[] compressedBytes = decompressProcessorConfig.getEncodingType()
            .getDecoderEngine()
            .decode(encodedValue);

    try (final InputStream decompressedStream = decompressProcessorConfig.getDecompressionType()
            .getDecompressionEngine()
            .createInputStream(new ByteArrayInputStream(compressedBytes))) {
        event.put(key, IOUtils.toString(decompressedStream, Charsets.UTF_8));
    } catch (final Exception e) {
        // A decompression failure is handled here, so the remaining keys are still processed.
        LOG.error("Unable to decompress key {} using decompression type {}:",
                key, decompressProcessorConfig.getDecompressionType(), e);
        event.getMetadata().addTags(decompressProcessorConfig.getTagsOnFailure());
        decompressionProcessingErrors.increment();
    }
}

/** Adds the configured failure tags to the record's event and increments the error counter. */
private void markFailure(final Record<Event> eventRecord) {
    eventRecord.getData().getMetadata().addTags(decompressProcessorConfig.getTagsOnFailure());
    decompressionProcessingErrors.increment();
}

@Override
public void prepareForShutdown() {
// Intentionally empty: this method performs no work.
}

/**
 * @return always {@code true}
 */
@Override
public boolean isReadyForShutdown() {
return true;
}

@Override
public void shutdown() {
// Intentionally empty: this method performs no work.
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.decompress;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import org.opensearch.dataprepper.plugins.processor.decompress.encoding.EncodingType;
import org.opensearch.dataprepper.plugins.processor.decompress.encoding.DecoderEngineFactory;

import java.util.List;

/**
 * Configuration model for the {@code decompress} processor.
 *
 * <p>Fields are bound from the {@code keys}, {@code type}, {@code decompress_when} and
 * {@code tags_on_failure} properties. The encoding type is not bound from configuration.
 */
public class DecompressProcessorConfig {

    /** Keys whose values the processor decompresses; must not be empty. */
    @NotEmpty
    @JsonProperty("keys")
    private List<String> keys;

    /** Decompression algorithm to apply; required. */
    @NotNull
    @JsonProperty("type")
    private DecompressionType decompressionType;

    /** Optional conditional expression; {@code null} when not configured. */
    @JsonProperty("decompress_when")
    private String decompressWhen;

    /** Tags added to an event when processing fails. */
    @JsonProperty("tags_on_failure")
    private List<String> tagsOnFailure = List.of("_decompression_failure");

    /** Encoding of the compressed values; fixed to base64 and excluded from JSON binding. */
    @JsonIgnore
    private final EncodingType encodingType = EncodingType.BASE64;

    /**
     * @return the configured keys
     */
    public List<String> getKeys() {
        return keys;
    }

    /**
     * @return the configured decompression type, exposed as an engine factory
     */
    public DecompressionEngineFactory getDecompressionType() {
        return decompressionType;
    }

    /**
     * @return the encoding type, exposed as a decoder engine factory
     */
    public DecoderEngineFactory getEncodingType() {
        return encodingType;
    }

    /**
     * @return the {@code decompress_when} expression, or {@code null} if none was configured
     */
    public String getDecompressWhen() {
        return decompressWhen;
    }

    /**
     * @return the tags to add to events on failure
     */
    public List<String> getTagsOnFailure() {
        return tagsOnFailure;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.decompress;

import org.opensearch.dataprepper.model.codec.DecompressionEngine;

/**
 * Supplies the {@link DecompressionEngine} to use for decompression.
 */
public interface DecompressionEngineFactory {

    /**
     * @return the decompression engine provided by this factory
     */
    DecompressionEngine getDecompressionEngine();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.decompress;

import com.fasterxml.jackson.annotation.JsonCreator;
import org.opensearch.dataprepper.model.codec.DecompressionEngine;
import org.opensearch.dataprepper.plugins.codec.GZipDecompressionEngine;

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Supported decompression algorithms, each identified by its configuration option value
 * and paired with the engine that performs the decompression.
 */
public enum DecompressionType implements DecompressionEngineFactory {
    GZIP("gzip", new GZipDecompressionEngine());

    /** Lookup from configuration option value to enum constant. */
    private static final Map<String, DecompressionType> OPTIONS_MAP = Arrays.stream(values())
            .collect(Collectors.toMap(type -> type.option, type -> type));

    private final String option;
    private final DecompressionEngine decompressionEngine;

    DecompressionType(final String option, final DecompressionEngine decompressionEngine) {
        this.option = option;
        this.decompressionEngine = decompressionEngine;
    }

    /**
     * Resolves a configuration option value to its constant.
     *
     * @param option the option value, e.g. {@code "gzip"}
     * @return the matching constant, or {@code null} when the value is not recognised
     */
    @JsonCreator
    static DecompressionType fromOptionValue(final String option) {
        return OPTIONS_MAP.get(option);
    }

    /**
     * @return the engine associated with this decompression type
     */
    @Override
    public DecompressionEngine getDecompressionEngine() {
        return decompressionEngine;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.decompress.encoding;

import org.opensearch.dataprepper.plugins.processor.decompress.exceptions.DecodingException;

import java.util.Base64;

/**
 * {@link DecoderEngine} that decodes base64 text using the JDK's basic {@link Base64} decoder.
 */
public class Base64DecoderEngine implements DecoderEngine {

    /**
     * Decodes a base64 string into its raw bytes.
     *
     * @param encodedValue the base64-encoded text
     * @return the decoded bytes
     * @throws DecodingException if the value cannot be decoded; the original exception is
     *         attached as the cause
     */
    @Override
    public byte[] decode(final String encodedValue) {
        try {
            return Base64.getDecoder().decode(encodedValue);
        } catch (final Exception e) {
            final DecodingException decodingException = new DecodingException(
                    String.format("There was an error decoding with the base64 encoding type: %s", e.getMessage()));
            // Keep the original exception (and its stack trace) reachable for diagnostics.
            decodingException.initCause(e);
            throw decodingException;
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.decompress.encoding;

import org.opensearch.dataprepper.plugins.processor.decompress.exceptions.DecodingException;

/**
 * Decodes an encoded text value into raw bytes.
 */
public interface DecoderEngine {

    /**
     * @param encodedValue the encoded text to decode
     * @return the decoded bytes
     * @throws DecodingException if the value cannot be decoded
     */
    byte[] decode(String encodedValue) throws DecodingException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.decompress.encoding;

/**
 * Supplies the {@link DecoderEngine} to use for decoding.
 */
public interface DecoderEngineFactory {
/**
 * @return the decoder engine provided by this factory
 */
DecoderEngine getDecoderEngine();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.decompress.encoding;

import com.fasterxml.jackson.annotation.JsonCreator;

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Supported encodings, each identified by its option value and paired with the engine
 * that decodes it.
 */
public enum EncodingType implements DecoderEngineFactory {
    BASE64("base64", new Base64DecoderEngine());

    /** Lookup from option value to enum constant. */
    private static final Map<String, EncodingType> OPTIONS_MAP = Arrays.stream(values())
            .collect(Collectors.toMap(type -> type.option, type -> type));

    private final String option;
    private final DecoderEngine decoderEngine;

    EncodingType(final String option, final DecoderEngine decoderEngine) {
        this.option = option;
        this.decoderEngine = decoderEngine;
    }

    /**
     * Resolves an option value to its constant.
     *
     * @param option the option value, e.g. {@code "base64"}
     * @return the matching constant, or {@code null} when the value is not recognised
     */
    @JsonCreator
    static EncodingType fromOptionValue(final String option) {
        return OPTIONS_MAP.get(option);
    }

    /**
     * @return the decoder engine associated with this encoding type
     */
    @Override
    public DecoderEngine getDecoderEngine() {
        return decoderEngine;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.decompress.exceptions;

/**
 * Unchecked exception signalling that an encoded value could not be decoded.
 */
public class DecodingException extends RuntimeException {

    /**
     * @param message description of the decoding failure
     */
    public DecodingException(final String message) {
        super(message);
    }

    /**
     * Creates an exception that keeps the underlying failure as its cause.
     *
     * @param message description of the decoding failure
     * @param cause the exception that triggered the failure
     */
    public DecodingException(final String message, final Throwable cause) {
        super(message, cause);
    }
}
Loading
Loading