Skip to content

Commit

Permalink
Create parse xml processor
Browse files Browse the repository at this point in the history
Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 committed Mar 1, 2024
1 parent 9cdc5af commit 5c5b23b
Show file tree
Hide file tree
Showing 6 changed files with 271 additions and 1 deletion.
1 change: 1 addition & 0 deletions data-prepper-plugins/parse-json-processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ dependencies {
implementation project(':data-prepper-plugins:common')
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-ion'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml'
implementation 'org.apache.parquet:parquet-common:1.13.1'
testImplementation project(':data-prepper-test-common')
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
}

final String message = event.get(source, String.class);
if (Objects.isNull(message)) {
if (Objects.isNull(message) && !doUsePointer) {
continue;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package org.opensearch.dataprepper.plugins.processor.parse.xml;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.dataformat.xml.XmlMapper;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Optional;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;

/**
 * Processor registered under the plugin name {@code parse_xml}.
 *
 * <p>Extends {@link AbstractParseProcessor} and provides the XML-specific
 * {@link #readValue(String, Event)} step, which deserializes the message text into a
 * {@code HashMap<String, Object>} using a Jackson {@link XmlMapper}.
 */
@DataPrepperPlugin(name = "parse_xml", pluginType = Processor.class, pluginConfigurationType = ParseXmlProcessorConfig.class)
public class ParseXmlProcessor extends AbstractParseProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(ParseXmlProcessor.class);

    /** Mapper used for every XML deserialization performed by this processor instance. */
    private final XmlMapper xmlMapper = new XmlMapper();

    /**
     * Creates the processor; all three arguments are forwarded unchanged to the superclass.
     *
     * @param pluginMetrics           metrics handle passed to {@link AbstractParseProcessor}
     * @param parseXmlProcessorConfig configuration passed to {@link AbstractParseProcessor}
     * @param expressionEvaluator     evaluator passed to {@link AbstractParseProcessor}
     */
    @DataPrepperPluginConstructor
    public ParseXmlProcessor(final PluginMetrics pluginMetrics,
                             final ParseXmlProcessorConfig parseXmlProcessorConfig,
                             final ExpressionEvaluator expressionEvaluator) {
        super(pluginMetrics, parseXmlProcessorConfig, expressionEvaluator);
    }

    /**
     * Deserializes {@code message} as XML into a map.
     *
     * @param message the XML text to parse
     * @param context the event being processed; only used in the error log output
     * @return the parsed map, or {@link Optional#empty()} when parsing fails for any reason
     *         (the failure is logged at error level and never propagated)
     */
    @Override
    protected Optional<HashMap<String, Object>> readValue(final String message, final Event context) {
        Optional<HashMap<String, Object>> result = Optional.empty();
        try {
            final HashMap<String, Object> parsed =
                    xmlMapper.readValue(message, new TypeReference<HashMap<String, Object>>() {});
            // Optional.of stays inside the try so that a null parse result is handled by the
            // generic handler below, exactly like any other unexpected failure.
            result = Optional.of(parsed);
        } catch (JsonProcessingException e) {
            // Jackson-reported parse/mapping problems get their own log message.
            LOG.error(EVENT, "An exception occurred due to invalid XML while reading event [{}]", context, e);
        } catch (Exception e) {
            // Anything else is still swallowed so a single bad event cannot abort processing.
            LOG.error(EVENT, "An exception occurred while using the parse_xml processor on Event [{}]", context, e);
        }
        return result;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package org.opensearch.dataprepper.plugins.processor.parse.xml;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.NotBlank;
import org.opensearch.dataprepper.plugins.processor.parse.CommonParseConfig;

import java.util.List;
import java.util.Objects;

/**
 * Configuration for the {@code parse_xml} processor.
 *
 * <p>Each field is bound to a configuration key through its {@link JsonProperty} annotation.
 * The field names are also used reflectively by tests, so they must not be renamed.
 */
public class ParseXmlProcessorConfig implements CommonParseConfig {
    /** Value of {@code source} when the configuration does not specify one. */
    static final String DEFAULT_SOURCE = "message";

    @NotBlank
    @JsonProperty("source")
    private String source = DEFAULT_SOURCE;

    @JsonProperty("destination")
    private String destination;

    @JsonProperty("pointer")
    private String pointer;

    @JsonProperty("parse_when")
    private String parseWhen;

    @JsonProperty("tags_on_failure")
    private List<String> tagsOnFailure;

    @JsonProperty("overwrite_if_destination_exists")
    private boolean overwriteIfDestinationExists = true;

    /** @return the configured {@code source}; {@link #DEFAULT_SOURCE} unless overridden */
    @Override
    public String getSource() {
        return source;
    }

    /** @return the configured {@code destination}, or {@code null} when not set */
    @Override
    public String getDestination() {
        return destination;
    }

    /** @return the configured {@code pointer}, or {@code null} when not set */
    @Override
    public String getPointer() {
        return pointer;
    }

    /** @return the configured {@code tags_on_failure} list, or {@code null} when not set */
    @Override
    public List<String> getTagsOnFailure() {
        return tagsOnFailure;
    }

    /** @return the configured {@code parse_when} value, or {@code null} when not set */
    @Override
    public String getParseWhen() {
        return parseWhen;
    }

    /** @return the {@code overwrite_if_destination_exists} flag; {@code true} unless overridden */
    @Override
    public boolean getOverwriteIfDestinationExists() {
        return overwriteIfDestinationExists;
    }

    /**
     * Validation hook for {@code destination}.
     *
     * @return {@code true} when {@code destination} is unset, or when its trimmed value is
     *         neither empty nor exactly {@code "/"}; {@code false} otherwise
     */
    @AssertTrue(message = "destination cannot be empty, whitespace, or a front slash (/)")
    boolean isValidDestination() {
        if (Objects.nonNull(destination)) {
            final String candidate = destination.trim();
            if (candidate.isEmpty()) {
                return false;
            }
            return !"/".equals(candidate);
        }
        // An absent destination is always acceptable.
        return true;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package org.opensearch.dataprepper.plugins.processor.parse.xml;

import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;

import java.util.List;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField;

/**
 * Unit tests for {@link ParseXmlProcessorConfig}: default values and destination validation.
 */
public class ParseXmlProcessorConfigTest {

    private ParseXmlProcessorConfig createObjectUnderTest() {
        return new ParseXmlProcessorConfig();
    }

    /** A freshly constructed config exposes the documented defaults. */
    @Test
    public void test_when_defaultParseXmlProcessorConfig_then_returns_default_values() {
        final ParseXmlProcessorConfig defaults = createObjectUnderTest();

        assertThat(defaults.getSource(), equalTo(ParseXmlProcessorConfig.DEFAULT_SOURCE));
        assertThat(defaults.getDestination(), equalTo(null));
        assertThat(defaults.getPointer(), equalTo(null));
        assertThat(defaults.getTagsOnFailure(), equalTo(null));
        assertThat(defaults.getOverwriteIfDestinationExists(), equalTo(true));
    }

    @Nested
    class Validation {
        final ParseXmlProcessorConfig config = createObjectUnderTest();

        /**
         * A normal destination is accepted; empty, whitespace-only and slash-only destinations
         * are rejected. Also checks that a reflectively set tag list is returned unchanged.
         */
        @Test
        void test_when_destinationIsWhiteSpaceOrFrontSlash_then_isValidDestinationFalse()
                throws NoSuchFieldException, IllegalAccessException {
            setField(ParseXmlProcessorConfig.class, config, "destination", "good destination");
            assertThat(config.isValidDestination(), equalTo(true));

            // Same three rejected values, checked in the same order as before.
            for (final String rejected : List.of("", " ", " / ")) {
                setField(ParseXmlProcessorConfig.class, config, "destination", rejected);
                assertThat(config.isValidDestination(), equalTo(false));
            }

            final List<String> expectedTags = List.of("tag1", "tag2");
            setField(ParseXmlProcessorConfig.class, config, "tagsOnFailure", expectedTags);
            assertThat(config.getTagsOnFailure(), equalTo(expectedTags));
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package org.opensearch.dataprepper.plugins.processor.parse.xml;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.processor.parse.xml.ParseXmlProcessorConfig.DEFAULT_SOURCE;


/**
 * Unit tests for {@link ParseXmlProcessor}, driven through
 * {@link AbstractParseProcessor#doExecute} with a mocked configuration.
 */
@ExtendWith(MockitoExtension.class)
public class ParseXmlProcessorTest {

    @Mock
    private ParseXmlProcessorConfig processorConfig;

    @Mock
    private PluginMetrics pluginMetrics;

    @Mock
    private ExpressionEvaluator expressionEvaluator;

    private AbstractParseProcessor parseXmlProcessor;

    /** Stubs the configuration values shared by every test case. */
    @BeforeEach
    public void setup() {
        when(processorConfig.getSource()).thenReturn(DEFAULT_SOURCE);
        when(processorConfig.getParseWhen()).thenReturn(null);
        when(processorConfig.getOverwriteIfDestinationExists()).thenReturn(true);
    }

    protected AbstractParseProcessor createObjectUnderTest() {
        return new ParseXmlProcessor(pluginMetrics, processorConfig, expressionEvaluator);
    }

    /** Well-formed XML: child elements become string-valued keys on the event. */
    @Test
    void test_when_using_xml_features_then_processorParsesCorrectly() {
        parseXmlProcessor = createObjectUnderTest();

        final String xml = "<Person><name>John Doe</name><age>30</age></Person>";
        final Event result = createAndParseMessageEvent(xml);

        assertThat(result.get("name", String.class), equalTo("John Doe"));
        assertThat(result.get("age", String.class), equalTo("30"));
    }

    /** Unparseable input: the event is tagged with the configured failure tag. */
    @Test
    void test_when_using_invalid_xml_tags_correctly() {
        final String failureTag = UUID.randomUUID().toString();
        when(processorConfig.getTagsOnFailure()).thenReturn(List.of(failureTag));

        parseXmlProcessor = createObjectUnderTest();

        final Event result = createAndParseMessageEvent("invalidXml");

        assertThat(result.getMetadata().hasTags(List.of(failureTag)), equalTo(true));
    }

    /**
     * Wraps {@code message} in an event under the configured source key, runs it through the
     * processor, and returns the first resulting event.
     */
    private Event createAndParseMessageEvent(final String message) {
        final Map<String, Object> payload = new HashMap<>();
        payload.put(processorConfig.getSource(), message);

        final Event inputEvent = JacksonEvent.builder()
                .withData(payload)
                .withEventType("event")
                .build();
        final Record<Event> inputRecord = new Record<>(inputEvent);

        final List<Record<Event>> processed = (List<Record<Event>>) parseXmlProcessor.doExecute(
                Collections.singletonList(inputRecord));
        return processed.get(0).getData();
    }
}

0 comments on commit 5c5b23b

Please sign in to comment.