From e0513ca056b5caa7de0fb637e9acccec747fceab Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Mon, 4 Mar 2024 15:37:22 -0600 Subject: [PATCH] Create parse xml processor (#4191) Signed-off-by: Taylor Gray --- .../parse-json-processor/build.gradle | 1 + .../parse/AbstractParseProcessor.java | 2 +- .../parse/xml/ParseXmlProcessor.java | 46 +++++++++ .../parse/xml/ParseXmlProcessorConfig.java | 70 ++++++++++++++ .../xml/ParseXmlProcessorConfigTest.java | 57 +++++++++++ .../parse/xml/ParseXmlProcessorTest.java | 96 +++++++++++++++++++ 6 files changed, 271 insertions(+), 1 deletion(-) create mode 100644 data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessor.java create mode 100644 data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfig.java create mode 100644 data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfigTest.java create mode 100644 data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorTest.java diff --git a/data-prepper-plugins/parse-json-processor/build.gradle b/data-prepper-plugins/parse-json-processor/build.gradle index cc2602d862..8ddb6af3c6 100644 --- a/data-prepper-plugins/parse-json-processor/build.gradle +++ b/data-prepper-plugins/parse-json-processor/build.gradle @@ -12,6 +12,7 @@ dependencies { implementation project(':data-prepper-plugins:common') implementation 'com.fasterxml.jackson.core:jackson-databind' implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-ion' + implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml' implementation 'org.apache.parquet:parquet-common:1.13.1' testImplementation project(':data-prepper-test-common') } diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/AbstractParseProcessor.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/AbstractParseProcessor.java index b113534983..a2b984d070 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/AbstractParseProcessor.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/AbstractParseProcessor.java @@ -72,7 +72,7 @@ public Collection> doExecute(final Collection> recor } final String message = event.get(source, String.class); - if (Objects.isNull(message)) { + if (Objects.isNull(message) && !doUsePointer) { continue; } diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessor.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessor.java new file mode 100644 index 0000000000..840515afb2 --- /dev/null +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessor.java @@ -0,0 +1,46 @@ +package org.opensearch.dataprepper.plugins.processor.parse.xml; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.dataformat.xml.XmlMapper; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.processor.Processor; +import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Optional; + +import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT; + +@DataPrepperPlugin(name = "parse_xml", pluginType =Processor.class, pluginConfigurationType =ParseXmlProcessorConfig.class) +public class ParseXmlProcessor extends AbstractParseProcessor { + private static final Logger LOG = LoggerFactory.getLogger(ParseXmlProcessor.class); + + private final XmlMapper xmlMapper = new XmlMapper(); + + @DataPrepperPluginConstructor + public ParseXmlProcessor(final PluginMetrics pluginMetrics, + final ParseXmlProcessorConfig parseXmlProcessorConfig, + final ExpressionEvaluator expressionEvaluator) { + super(pluginMetrics, parseXmlProcessorConfig, expressionEvaluator); + } + + @Override + protected Optional> readValue(final String message, final Event context) { + try { + return Optional.of(xmlMapper.readValue(message, new TypeReference<>() {})); + } catch (JsonProcessingException e) { + LOG.error(EVENT, "An exception occurred due to invalid XML while reading event [{}]", context, e); + return Optional.empty(); + } catch (Exception e) { + LOG.error(EVENT, "An exception occurred while using the parse_xml processor on Event [{}]", context, e); + return Optional.empty(); + } + } +} diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfig.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfig.java new file mode 100644 index 0000000000..df4fabc397 --- /dev/null +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfig.java @@ -0,0 +1,70 @@ +package org.opensearch.dataprepper.plugins.processor.parse.xml; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.AssertTrue; +import jakarta.validation.constraints.NotBlank; +import org.opensearch.dataprepper.plugins.processor.parse.CommonParseConfig; + +import java.util.List; +import java.util.Objects; + +public class ParseXmlProcessorConfig implements CommonParseConfig { + static final String DEFAULT_SOURCE = "message"; + + @NotBlank + @JsonProperty("source") + private String source = DEFAULT_SOURCE; + + @JsonProperty("destination") + private String destination; + + @JsonProperty("pointer") + private String pointer; + + @JsonProperty("parse_when") + private String parseWhen; + + @JsonProperty("tags_on_failure") + private List tagsOnFailure; + + @JsonProperty("overwrite_if_destination_exists") + private boolean overwriteIfDestinationExists = true; + + @Override + public String getSource() { + return source; + } + + @Override + public String getDestination() { + return destination; + } + + @Override + public String getPointer() { + return pointer; + } + + @Override + public List getTagsOnFailure() { + return tagsOnFailure; + } + + @Override + public String getParseWhen() { + return parseWhen; + } + + @Override + public boolean getOverwriteIfDestinationExists() { + return overwriteIfDestinationExists; + } + + @AssertTrue(message = "destination cannot be empty, whitespace, or a front slash (/)") + boolean isValidDestination() { + if (Objects.isNull(destination)) return true; + + final String trimmedDestination = destination.trim(); + return trimmedDestination.length() != 0 && !(trimmedDestination.equals("/")); + } +} diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfigTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfigTest.java new file mode 100644 index 0000000000..d5e7e1ec43 --- /dev/null +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfigTest.java @@ -0,0 +1,57 @@ +package org.opensearch.dataprepper.plugins.processor.parse.xml; + +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField; + +public class ParseXmlProcessorConfigTest { + + private ParseXmlProcessorConfig createObjectUnderTest() { + return new ParseXmlProcessorConfig(); + } + + @Test + public void test_when_defaultParseXmlProcessorConfig_then_returns_default_values() { + final ParseXmlProcessorConfig objectUnderTest = createObjectUnderTest(); + + assertThat(objectUnderTest.getSource(), equalTo(ParseXmlProcessorConfig.DEFAULT_SOURCE)); + assertThat(objectUnderTest.getDestination(), equalTo(null)); + assertThat(objectUnderTest.getPointer(), equalTo(null)); + assertThat(objectUnderTest.getTagsOnFailure(), equalTo(null)); + assertThat(objectUnderTest.getOverwriteIfDestinationExists(), equalTo(true)); + } + + @Nested + class Validation { + final ParseXmlProcessorConfig config = createObjectUnderTest(); + + @Test + void test_when_destinationIsWhiteSpaceOrFrontSlash_then_isValidDestinationFalse() + throws NoSuchFieldException, IllegalAccessException { + setField(ParseXmlProcessorConfig.class, config, "destination", "good destination"); + + assertThat(config.isValidDestination(), equalTo(true)); + + setField(ParseXmlProcessorConfig.class, config, "destination", ""); + + assertThat(config.isValidDestination(), equalTo(false)); + + setField(ParseXmlProcessorConfig.class, config, "destination", " "); + + assertThat(config.isValidDestination(), equalTo(false)); + + setField(ParseXmlProcessorConfig.class, config, "destination", " / "); + + assertThat(config.isValidDestination(), equalTo(false)); + List tagsList = List.of("tag1", "tag2"); + setField(ParseXmlProcessorConfig.class, config, "tagsOnFailure", tagsList); + + assertThat(config.getTagsOnFailure(), equalTo(tagsList)); + } + } +} diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorTest.java new file mode 100644 index 0000000000..51de35ca70 --- /dev/null +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorTest.java @@ -0,0 +1,96 @@ +package org.opensearch.dataprepper.plugins.processor.parse.xml; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.processor.parse.xml.ParseXmlProcessorConfig.DEFAULT_SOURCE; + + +@ExtendWith(MockitoExtension.class) +public class ParseXmlProcessorTest { + + @Mock + private ParseXmlProcessorConfig processorConfig; + + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private ExpressionEvaluator expressionEvaluator; + + private AbstractParseProcessor parseXmlProcessor; + + @BeforeEach + public void setup() { + when(processorConfig.getSource()).thenReturn(DEFAULT_SOURCE); + when(processorConfig.getParseWhen()).thenReturn(null); + when(processorConfig.getOverwriteIfDestinationExists()).thenReturn(true); + } + + protected AbstractParseProcessor createObjectUnderTest() { + return new ParseXmlProcessor(pluginMetrics, processorConfig, expressionEvaluator); + } + + @Test + void test_when_using_xml_features_then_processorParsesCorrectly() { + parseXmlProcessor = createObjectUnderTest(); + + final String serializedMessage = "John Doe30"; + final Event parsedEvent = createAndParseMessageEvent(serializedMessage); + + assertThat(parsedEvent.get("name", String.class), equalTo("John Doe")); + assertThat(parsedEvent.get("age", String.class), equalTo("30")); + } + + @Test + void test_when_using_invalid_xml_tags_correctly() { + + final String tagOnFailure = UUID.randomUUID().toString(); + when(processorConfig.getTagsOnFailure()).thenReturn(List.of(tagOnFailure)); + + parseXmlProcessor = createObjectUnderTest(); + + final String serializedMessage = "invalidXml"; + final Event parsedEvent = createAndParseMessageEvent(serializedMessage); + + assertThat(parsedEvent.getMetadata().hasTags(List.of(tagOnFailure)), equalTo(true)); + } + + private Event createAndParseMessageEvent(final String message) { + final Record eventUnderTest = createMessageEvent(message); + final List> editedEvents = (List>) parseXmlProcessor.doExecute( + Collections.singletonList(eventUnderTest)); + return editedEvents.get(0).getData(); + } + + private Record createMessageEvent(final String message) { + final Map eventData = new HashMap<>(); + eventData.put(processorConfig.getSource(), message); + return buildRecordWithEvent(eventData); + } + + private Record buildRecordWithEvent(final Map data) { + return new Record<>(JacksonEvent.builder() + .withData(data) + .withEventType("event") + .build()); + } +}