Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create parse xml processor #4191

Merged
merged 1 commit into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions data-prepper-plugins/parse-json-processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ dependencies {
implementation project(':data-prepper-plugins:common')
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-ion'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml'
implementation 'org.apache.parquet:parquet-common:1.13.1'
testImplementation project(':data-prepper-test-common')
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
}

final String message = event.get(source, String.class);
if (Objects.isNull(message)) {
if (Objects.isNull(message) && !doUsePointer) {
continue;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package org.opensearch.dataprepper.plugins.processor.parse.xml;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.dataformat.xml.XmlMapper;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Optional;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;

@DataPrepperPlugin(name = "parse_xml", pluginType =Processor.class, pluginConfigurationType =ParseXmlProcessorConfig.class)
public class ParseXmlProcessor extends AbstractParseProcessor {
private static final Logger LOG = LoggerFactory.getLogger(ParseXmlProcessor.class);

private final XmlMapper xmlMapper = new XmlMapper();

@DataPrepperPluginConstructor
public ParseXmlProcessor(final PluginMetrics pluginMetrics,
final ParseXmlProcessorConfig parseXmlProcessorConfig,
final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics, parseXmlProcessorConfig, expressionEvaluator);
}

@Override
protected Optional<HashMap<String, Object>> readValue(final String message, final Event context) {
try {
return Optional.of(xmlMapper.readValue(message, new TypeReference<>() {}));
} catch (JsonProcessingException e) {
LOG.error(EVENT, "An exception occurred due to invalid XML while reading event [{}]", context, e);
return Optional.empty();
} catch (Exception e) {
LOG.error(EVENT, "An exception occurred while using the parse_xml processor on Event [{}]", context, e);
return Optional.empty();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package org.opensearch.dataprepper.plugins.processor.parse.xml;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.NotBlank;
import org.opensearch.dataprepper.plugins.processor.parse.CommonParseConfig;

import java.util.List;
import java.util.Objects;

/**
 * Configuration for the {@code parse_xml} processor, bound from the pipeline definition
 * through the {@link JsonProperty} names declared on each field.
 */
public class ParseXmlProcessorConfig implements CommonParseConfig {
    /** Value of {@code source} when the option is not configured. */
    static final String DEFAULT_SOURCE = "message";

    @NotBlank
    @JsonProperty("source")
    private String source = DEFAULT_SOURCE;

    @JsonProperty("destination")
    private String destination;

    @JsonProperty("pointer")
    private String pointer;

    @JsonProperty("parse_when")
    private String parseWhen;

    @JsonProperty("tags_on_failure")
    private List<String> tagsOnFailure;

    // Overwriting is the default unless the option is explicitly set.
    @JsonProperty("overwrite_if_destination_exists")
    private boolean overwriteIfDestinationExists = true;

    /** @return the configured {@code source}; {@link #DEFAULT_SOURCE} unless overridden */
    @Override
    public String getSource() {
        return source;
    }

    /** @return the configured {@code destination}, or {@code null} when not set */
    @Override
    public String getDestination() {
        return destination;
    }

    /** @return the configured {@code pointer}, or {@code null} when not set */
    @Override
    public String getPointer() {
        return pointer;
    }

    /** @return the configured {@code tags_on_failure} list, or {@code null} when not set */
    @Override
    public List<String> getTagsOnFailure() {
        return tagsOnFailure;
    }

    /** @return the configured {@code parse_when} value, or {@code null} when not set */
    @Override
    public String getParseWhen() {
        return parseWhen;
    }

    /** @return the {@code overwrite_if_destination_exists} flag; {@code true} unless overridden */
    @Override
    public boolean getOverwriteIfDestinationExists() {
        return overwriteIfDestinationExists;
    }

    /**
     * Bean-validation check for {@code destination}.
     *
     * @return {@code true} when no destination is configured, or when the trimmed destination
     *         is neither empty nor exactly {@code "/"}; {@code false} otherwise
     */
    @AssertTrue(message = "destination cannot be empty, whitespace, or a front slash (/)")
    boolean isValidDestination() {
        if (Objects.nonNull(destination)) {
            final String candidate = destination.trim();
            final boolean blank = candidate.isEmpty();
            final boolean rootOnly = "/".equals(candidate);
            return !(blank || rootOnly);
        }
        // An absent destination is always acceptable.
        return true;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package org.opensearch.dataprepper.plugins.processor.parse.xml;

import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;

import java.util.List;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField;

/**
 * Unit tests for {@link ParseXmlProcessorConfig}: default values and destination validation.
 */
public class ParseXmlProcessorConfigTest {

    private ParseXmlProcessorConfig createObjectUnderTest() {
        return new ParseXmlProcessorConfig();
    }

    /** A freshly constructed config exposes the documented defaults and nulls for unset options. */
    @Test
    public void test_when_defaultParseXmlProcessorConfig_then_returns_default_values() {
        final ParseXmlProcessorConfig defaults = createObjectUnderTest();

        assertThat(defaults.getSource(), equalTo(ParseXmlProcessorConfig.DEFAULT_SOURCE));
        assertThat(defaults.getDestination(), equalTo(null));
        assertThat(defaults.getPointer(), equalTo(null));
        assertThat(defaults.getTagsOnFailure(), equalTo(null));
        assertThat(defaults.getOverwriteIfDestinationExists(), equalTo(true));
    }

    @Nested
    class Validation {
        final ParseXmlProcessorConfig config = createObjectUnderTest();

        /**
         * Walks the destination through one accepted value and then each rejected value,
         * and finally checks that a reflectively set tag list is returned by the getter.
         */
        @Test
        void test_when_destinationIsWhiteSpaceOrFrontSlash_then_isValidDestinationFalse()
                throws NoSuchFieldException, IllegalAccessException {
            assignDestination("good destination");
            assertThat(config.isValidDestination(), equalTo(true));

            // Empty, whitespace-only, and a padded front slash must all be rejected.
            for (final String rejected : List.of("", " ", " / ")) {
                assignDestination(rejected);
                assertThat(config.isValidDestination(), equalTo(false));
            }

            final List<String> expectedTags = List.of("tag1", "tag2");
            setField(ParseXmlProcessorConfig.class, config, "tagsOnFailure", expectedTags);
            assertThat(config.getTagsOnFailure(), equalTo(expectedTags));
        }

        /** Reflectively overwrites the private {@code destination} field of the config under test. */
        private void assignDestination(final String value)
                throws NoSuchFieldException, IllegalAccessException {
            setField(ParseXmlProcessorConfig.class, config, "destination", value);
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package org.opensearch.dataprepper.plugins.processor.parse.xml;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.processor.parse.xml.ParseXmlProcessorConfig.DEFAULT_SOURCE;


/**
 * Unit tests for {@link ParseXmlProcessor}, driven through {@code doExecute} with a mocked
 * configuration.
 */
@ExtendWith(MockitoExtension.class)
public class ParseXmlProcessorTest {

    @Mock
    private ParseXmlProcessorConfig processorConfig;

    @Mock
    private PluginMetrics pluginMetrics;

    @Mock
    private ExpressionEvaluator expressionEvaluator;

    private AbstractParseProcessor parseXmlProcessor;

    /** Stubs the config values shared by every test: default source, no condition, overwrite on. */
    @BeforeEach
    public void setup() {
        when(processorConfig.getSource()).thenReturn(DEFAULT_SOURCE);
        when(processorConfig.getParseWhen()).thenReturn(null);
        when(processorConfig.getOverwriteIfDestinationExists()).thenReturn(true);
    }

    protected AbstractParseProcessor createObjectUnderTest() {
        return new ParseXmlProcessor(pluginMetrics, processorConfig, expressionEvaluator);
    }

    /** Child elements of a well-formed document show up as string-valued keys on the event. */
    @Test
    void test_when_using_xml_features_then_processorParsesCorrectly() {
        parseXmlProcessor = createObjectUnderTest();

        final Event result = createAndParseMessageEvent("<Person><name>John Doe</name><age>30</age></Person>");

        assertThat(result.get("name", String.class), equalTo("John Doe"));
        assertThat(result.get("age", String.class), equalTo("30"));
    }

    /** Input that is not XML leaves the event tagged with the configured failure tag. */
    @Test
    void test_when_using_invalid_xml_tags_correctly() {
        final String failureTag = UUID.randomUUID().toString();
        when(processorConfig.getTagsOnFailure()).thenReturn(List.of(failureTag));

        parseXmlProcessor = createObjectUnderTest();

        final Event result = createAndParseMessageEvent("invalidXml");

        assertThat(result.getMetadata().hasTags(List.of(failureTag)), equalTo(true));
    }

    /** Runs a single-record batch through the processor and returns the first resulting event. */
    private Event createAndParseMessageEvent(final String message) {
        final List<Record<Event>> input = Collections.singletonList(createMessageEvent(message));
        final List<Record<Event>> output = (List<Record<Event>>) parseXmlProcessor.doExecute(input);
        return output.get(0).getData();
    }

    /** Wraps {@code message} in a record whose event stores it under the configured source key. */
    private Record<Event> createMessageEvent(final String message) {
        final Map<String, Object> payload = new HashMap<>();
        payload.put(processorConfig.getSource(), message);

        final Event event = JacksonEvent.builder()
                .withData(payload)
                .withEventType("event")
                .build();
        return new Record<>(event);
    }
}
Loading