Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add support for spilt event processor
Browse files Browse the repository at this point in the history
Signed-off-by: srigovs <srigovs@amazon.com>
srikanthjg committed Feb 20, 2024
1 parent 680ad7a commit 458b5de
Showing 5 changed files with 595 additions and 0 deletions.
30 changes: 30 additions & 0 deletions data-prepper-plugins/split-event-processor/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java'
}

jacocoTestCoverageVerification {
dependsOn jacocoTestReport
violationRules {
rule {
limit {
minimum = 1.0
}
}
}
}


dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:common')
implementation 'com.fasterxml.jackson.core:jackson-databind'
}

test {
useJUnitPlatform()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.processor.splitevent;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;


@DataPrepperPlugin(name = "split_event", pluginType = Processor.class, pluginConfigurationType = SplitEventProcessorConfig.class)
public class SplitEventProcessor extends AbstractProcessor<Record<Event>, Record<Event>>{
final Pattern pattern;
final String delimiter;
final String delimiterRegex;
final String field;

@DataPrepperPluginConstructor
public SplitEventProcessor(final PluginMetrics pluginMetrics, final SplitEventProcessorConfig config) {
super(pluginMetrics);
this.delimiter = config.getDelimiter();
this.delimiterRegex = config.getDelimiterRegex();
this.field = config.getField();

if(delimiterRegex != null && !delimiterRegex.isEmpty()
&& delimiter != null && !delimiter.isEmpty()) {
throw new IllegalArgumentException("delimiter and delimiter_regex cannot be defined at the same time");
} else if((delimiterRegex == null || delimiterRegex.isEmpty()) &&
(delimiter == null || delimiter.isEmpty())) {
throw new IllegalArgumentException("delimiter or delimiter_regex needs to be defined");
}

if(delimiterRegex != null && !delimiterRegex.isEmpty()) {
pattern = Pattern.compile(delimiterRegex);
} else {
pattern = Pattern.compile(Pattern.quote(delimiter));
}
}

@Override
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
Collection<Record<Event>> newRecords = new ArrayList<>();
for(final Record<Event> record : records) {
final Event recordEvent = record.getData();

if (!recordEvent.containsKey(field)) {
Record newRecord = new Record<>(recordEvent);
newRecords.add(newRecord);
continue;
}

final Object value = recordEvent.get(field, Object.class);

//split record according to delimiter
final String[] splitValues = pattern.split((String) value);

// when no splits or empty value modify the original event
if(splitValues.length <= 1) {
Record newRecord = new Record<>(recordEvent);
newRecords.add(newRecord);
continue;
}

//create new events for the splits
for (int i = 0; i < splitValues.length-1 ; i++) {
Record newRecord = createNewRecordFromEvent(recordEvent, splitValues[i]);
addToAcknowledgementSetFromOriginEvent((Event) newRecord.getData(), recordEvent);
newRecords.add(newRecord);
}

// Modify original event to hold the last split
recordEvent.put(field, splitValues[splitValues.length-1]);
Record newRecord = new Record<>(recordEvent);
newRecords.add(newRecord);
}
return newRecords;
}

protected Record createNewRecordFromEvent(final Event recordEvent, String splitValue) {
Record newRecord;
JacksonEvent newRecordEvent;

newRecordEvent = JacksonEvent.fromEvent(recordEvent);
newRecordEvent.put(field,(Object) splitValue);
newRecord = new Record<>(newRecordEvent);
return newRecord;
}

protected void addToAcknowledgementSetFromOriginEvent(Event recordEvent, Event originRecordEvent) {
DefaultEventHandle eventHandle = (DefaultEventHandle) originRecordEvent.getEventHandle();
if (eventHandle != null && eventHandle.getAcknowledgementSet() != null) {
eventHandle.getAcknowledgementSet().add(recordEvent);
}
}

@Override
public void prepareForShutdown() {
}

@Override
public boolean isReadyForShutdown() {
return true;
}

@Override
public void shutdown() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.processor.splitevent;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Size;


public class SplitEventProcessorConfig {
@NotEmpty
@NotNull
@JsonProperty("field")
private String field;

@JsonProperty("delimiter_regex")
private String delimiterRegex;

@Size(min = 1, max = 1)
private String delimiter;

public String getField() {
return field;
}

public String getDelimiterRegex() {
return delimiterRegex;
}

public String getDelimiter() {
return delimiter;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,395 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.processor.splitevent;


import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import static org.mockito.ArgumentMatchers.any;
import org.mockito.Mock;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.spy;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;


@ExtendWith(MockitoExtension.class)
public class SplitEventProcessorTest {
@Mock
private PluginMetrics pluginMetrics;

@Mock
private SplitEventProcessorConfig mockConfig;

@Mock
private AcknowledgementSet mockAcknowledgementSet;

private SplitEventProcessor splitEventProcessor;


private Record<Event> createTestRecord(final Map<String, Object> data) {

Event event = JacksonEvent.builder()
.withData(data)
.withEventType("event")
.build();

DefaultEventHandle eventHandle = (DefaultEventHandle) event.getEventHandle();

eventHandle.setAcknowledgementSet(mockAcknowledgementSet);
return new Record<>(event);
}

@BeforeEach
void setup() {
when(mockConfig.getField()).thenReturn("k1");
when(mockConfig.getDelimiter()).thenReturn(" ");

splitEventProcessor = new SplitEventProcessor(pluginMetrics, mockConfig);
}

private static Stream<Arguments> provideMaps() {
return Stream.of(
Arguments.of(
Map.of(
"k1", "",
"k2", "v2"
)
),
Arguments.of(
Map.of(
"k1", "v1",
"k2", "v2"
)
),
Arguments.of(
Map.of("k1", "v1 v2",
"k2", "v2"
)
),
Arguments.of(
Map.of("k1", "v1 v2 v3",
"k2", "v2"
)
)
);
}

@Test
void testHappyPathWithSpaceDelimiter() {
final Map<String, Object> testData = new HashMap<>();
testData.put("k1", "v1 v2");
final Record<Event> record = createTestRecord(testData);
when(mockConfig.getDelimiter()).thenReturn(" ");

final SplitEventProcessor objectUnderTest = new SplitEventProcessor(pluginMetrics, mockConfig);
final List<Record<Event>> editedRecords = (List<Record<Event>>) objectUnderTest.doExecute(Collections.singletonList(record));

for(Record r: editedRecords){
Event event = (Event) r.getData();
}

assertThat(editedRecords.size(), equalTo(2));
assertThat(editedRecords.get(0).getData().toMap(), equalTo(Map.of("k1","v1")));
assertThat(editedRecords.get(1).getData().toMap(), equalTo(Map.of("k1","v2")));
}

@Test
void testHappyPathWithSpaceDelimiterForStringWithMultipleSpaces() {
final Map<String, Object> testData = new HashMap<>();
testData.put("k1", "v1 v2");
final Record<Event> record = createTestRecord(testData);
when(mockConfig.getDelimiter()).thenReturn(" ");

final SplitEventProcessor objectUnderTest = new SplitEventProcessor(pluginMetrics, mockConfig);
final List<Record<Event>> editedRecords = (List<Record<Event>>) objectUnderTest.doExecute(Collections.singletonList(record));
for(Record r: editedRecords){
Event event = (Event) r.getData();
}

assertThat(editedRecords.size(), equalTo(3));
assertThat(editedRecords.get(0).getData().toMap(), equalTo(Map.of("k1","v1")));
assertThat(editedRecords.get(1).getData().toMap(), equalTo(Map.of("k1","")));
assertThat(editedRecords.get(2).getData().toMap(), equalTo(Map.of("k1","v2")));
}

@Test
void testHappyPathWithSemiColonDelimiter() {
final Map<String, Object> testData = new HashMap<>();
testData.put("k1", "v1;v2");
final Record<Event> record = createTestRecord(testData);
when(mockConfig.getDelimiter()).thenReturn(";");

final SplitEventProcessor objectUnderTest = new SplitEventProcessor(pluginMetrics, mockConfig);
final List<Record<Event>> editedRecords = (List<Record<Event>>) objectUnderTest.doExecute(Collections.singletonList(record));

for(Record r: editedRecords){
Event event = (Event) r.getData();
}

assertThat(editedRecords.size(), equalTo(2));
assertThat(editedRecords.get(0).getData().toMap(), equalTo(Map.of("k1","v1")));
assertThat(editedRecords.get(1).getData().toMap(), equalTo(Map.of("k1","v2")));
}

@Test
void testHappyPathWithSpaceDelimiterRegex() {
final Map<String, Object> testData = new HashMap<>();
testData.put("k1", "v1 v2");
final Record<Event> record = createTestRecord(testData);
when(mockConfig.getDelimiter()).thenReturn(null);
when(mockConfig.getDelimiterRegex()).thenReturn("\\s+");

final SplitEventProcessor objectUnderTest = new SplitEventProcessor(pluginMetrics, mockConfig);
final List<Record<Event>> editedRecords = (List<Record<Event>>) objectUnderTest.doExecute(Collections.singletonList(record));

for(Record r: editedRecords){
Event event = (Event) r.getData();
}

assertThat(editedRecords.size(), equalTo(2));
assertThat(editedRecords.get(0).getData().toMap(), equalTo(Map.of("k1","v1")));
assertThat(editedRecords.get(1).getData().toMap(), equalTo(Map.of("k1","v2")));
}

@Test
void testHappyPathWithColonDelimiterRegex() {
final Map<String, Object> testData = new HashMap<>();
testData.put("k1", "v1:v2");
final Record<Event> record = createTestRecord(testData);
when(mockConfig.getDelimiter()).thenReturn(null);
when(mockConfig.getDelimiterRegex()).thenReturn(":");

final SplitEventProcessor objectUnderTest = new SplitEventProcessor(pluginMetrics, mockConfig);
final List<Record<Event>> editedRecords = (List<Record<Event>>) objectUnderTest.doExecute(Collections.singletonList(record));

for(Record r: editedRecords){
Event event = (Event) r.getData();
}

assertThat(editedRecords.size(), equalTo(2));
assertThat(editedRecords.get(0).getData().toMap(), equalTo(Map.of("k1","v1")));
assertThat(editedRecords.get(1).getData().toMap(), equalTo(Map.of("k1","v2")));
}

@Test
void testFailureWithBothDelimiterRegexAndDelimiterDefined() {
when(mockConfig.getDelimiter()).thenReturn(" ");
when(mockConfig.getDelimiterRegex()).thenReturn("\\s+");
assertThrows(IllegalArgumentException.class, () -> new SplitEventProcessor(pluginMetrics, mockConfig));
}

@Test
void testFailureWithDelimiterRegexAndDelimiterDefinedMissing() {
when(mockConfig.getDelimiter()).thenReturn(null);
when(mockConfig.getDelimiterRegex()).thenReturn(null);
assertThrows(IllegalArgumentException.class, () -> new SplitEventProcessor(pluginMetrics, mockConfig));
}

@ParameterizedTest
@MethodSource("provideMaps")
void testSplitEventsBelongToTheSameAcknowledgementSet(Map<String, Object> inputMap1) {
final Record<Event> testRecord = createTestRecord(inputMap1);
Event originalEvent = testRecord.getData();
DefaultEventHandle originalEventHandle = (DefaultEventHandle) originalEvent.getEventHandle();
AcknowledgementSet originalAcknowledgementSet= originalEventHandle.getAcknowledgementSet();

final SplitEventProcessor objectUnderTest = new SplitEventProcessor(pluginMetrics, mockConfig);
final List<Record<Event>> editedRecords = (List<Record<Event>>) objectUnderTest.doExecute(Collections.singletonList(testRecord));

DefaultEventHandle eventHandle = (DefaultEventHandle) originalEvent.getEventHandle();
AcknowledgementSet acknowledgementSet;
for(Record record: editedRecords) {
Event event = testRecord.getData();
eventHandle = (DefaultEventHandle) event.getEventHandle();
acknowledgementSet = eventHandle.getAcknowledgementSet();
assertEquals(originalAcknowledgementSet, acknowledgementSet);
}
}

@Test
void testSplitEventsWhenNoSplits() {
final Map<String, Object> testData = new HashMap<>();
List<Record<Event>> records = new ArrayList<>();

testData.put("k1", "v1");
testData.put("k2", "v2");
final Record<Event> record1 = createTestRecord(testData);

final Map<String, Object> testData2 = new HashMap<>();
testData2.put("k1", "");
testData2.put("k3", "v3");
final Record<Event> record2 = createTestRecord(testData2);

records.add(record1);
records.add(record2);
final List<Record<Event>> editedRecords = (List<Record<Event>>) splitEventProcessor.doExecute(records);

assertThat(editedRecords.size(), equalTo(2));
assertThat(editedRecords.get(0).getData().toMap(), equalTo(Map.of("k1","v1", "k2", "v2")));
assertThat(editedRecords.get(1).getData().toMap(), equalTo(Map.of("k1","", "k3", "v3")));
}


@Test
void testSplitEventsWhenOneSplit() {
List<Record<Event>> records = new ArrayList<>();
final Map<String, Object> testData = new HashMap<>();
testData.put("k1", "v1 v2");
testData.put("k2", "v3");
final Record<Event> record = createTestRecord(testData);
records.add(record);
final List<Record<Event>> editedRecords = (List<Record<Event>>) splitEventProcessor.doExecute(records);

assertThat(editedRecords.size(), equalTo(2));
assertThat(editedRecords.get(0).getData().toMap(), equalTo(Map.of("k1","v1", "k2", "v3")));
assertThat(editedRecords.get(1).getData().toMap(), equalTo(Map.of("k1","v2", "k2", "v3")));
}


@Test
void testSplitEventsWhenMultipleSplits() {
List<Record<Event>> records = new ArrayList<>();
final Map<String, Object> testData = new HashMap<>();
testData.put("k1", "v1 v2 v3 v4");
testData.put("k2", "v5");
final Record<Event> record = createTestRecord(testData);
records.add(record);
final List<Record<Event>> editedRecords = (List<Record<Event>>) splitEventProcessor.doExecute(records);

assertThat(editedRecords.size(), equalTo(4));
assertThat(editedRecords.get(0).getData().toMap(), equalTo(Map.of("k1","v1", "k2", "v5")));
assertThat(editedRecords.get(1).getData().toMap(), equalTo(Map.of("k1","v2", "k2", "v5")));
assertThat(editedRecords.get(2).getData().toMap(), equalTo(Map.of("k1","v3", "k2", "v5")));
assertThat(editedRecords.get(3).getData().toMap(), equalTo(Map.of("k1","v4", "k2", "v5")));
}

@Test
void testSplitEventsWhenMultipleSplitsMultipleRecords() {
List<Record<Event>> records = new ArrayList<>();
final Map<String, Object> testData1 = new HashMap<>();
testData1.put("k1", "v1 v2");
testData1.put("k2", "v5");
final Record<Event> record = createTestRecord(testData1);
records.add(record);

final Map<String, Object> testData2 = new HashMap<>();
testData2.put("k1", "v1");
testData2.put("k2", "v3");
final Record<Event> record2 = createTestRecord(testData2);
records.add(record2);
final List<Record<Event>> editedRecords = (List<Record<Event>>) splitEventProcessor.doExecute(records);

assertThat(editedRecords.size(), equalTo(3));
assertThat(editedRecords.get(0).getData().toMap(), equalTo(Map.of("k1","v1", "k2", "v5")));
assertThat(editedRecords.get(1).getData().toMap(), equalTo(Map.of("k1","v2", "k2", "v5")));
assertThat(editedRecords.get(2).getData().toMap(), equalTo(Map.of("k1","v1", "k2", "v3")));
}

@Test
void testSplitEventsWhenNoKeyPresentInEvent() {
List<Record<Event>> records = new ArrayList<>();
final Map<String, Object> testData1 = new HashMap<>();
testData1.put("k2", "v5 v6");
final Record<Event> record = createTestRecord(testData1);
records.add(record);

final Map<String, Object> testData2 = new HashMap<>();
testData2.put("k3", "v3 v5");
final Record<Event> record2 = createTestRecord(testData2);
records.add(record2);
final List<Record<Event>> editedRecords = (List<Record<Event>>) splitEventProcessor.doExecute(records);

assertThat(editedRecords.size(), equalTo(2));
assertThat(editedRecords.get(0).getData().toMap(), equalTo(Map.of("k2", "v5 v6")));
assertThat(editedRecords.get(1).getData().toMap(), equalTo(Map.of("k3", "v3 v5")));
}

@Test
public void testCreateNewRecordFromEvent() {
Event recordEvent = mock(Event.class);

Map<String, Object> eventData = new HashMap<>();
eventData.put("someField", "someValue");
when(recordEvent.toMap()).thenReturn(eventData);
when(recordEvent.getMetadata()).thenReturn(mock(EventMetadata.class));
String splitValue = "splitValue";

Record resultRecord = splitEventProcessor.createNewRecordFromEvent(recordEvent, splitValue);
Event editedEvent = (Event) resultRecord.getData();
// Assertions
assertEquals(editedEvent.getMetadata(),recordEvent.getMetadata());
}

@Test
public void testAddToAcknowledgementSetFromOriginEvent() {
Map<String, Object> data = Map.of("k1","v1");
EventMetadata eventMetadata = mock(EventMetadata.class);
Event originRecordEvent = JacksonEvent.builder()
.withEventMetadata(eventMetadata)
.withEventType("event")
.withData(data)
.build();
Event spyEvent = spy(originRecordEvent);

DefaultEventHandle mockEventHandle = mock(DefaultEventHandle.class);
when(spyEvent.getEventHandle()).thenReturn(mockEventHandle);

Record record = splitEventProcessor
.createNewRecordFromEvent(spyEvent, "v1");

Event recordEvent = (Event) record.getData();
splitEventProcessor.addToAcknowledgementSetFromOriginEvent(recordEvent, spyEvent);

DefaultEventHandle spyEventHandle = (DefaultEventHandle) spyEvent.getEventHandle();
// Verify that the add method is called on the acknowledgement set
verify(spyEventHandle).getAcknowledgementSet();

AcknowledgementSet spyAckSet = spyEventHandle.getAcknowledgementSet();
DefaultEventHandle eventHandle = (DefaultEventHandle) recordEvent.getEventHandle();
AcknowledgementSet ackSet1 = eventHandle.getAcknowledgementSet();

assertEquals(spyAckSet, ackSet1);
}

@Test
void testIsReadyForShutdown() {
assertTrue(splitEventProcessor.isReadyForShutdown());
}

}
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
@@ -150,3 +150,4 @@ include 'data-prepper-plugins:buffer-common'
include 'data-prepper-plugins:dissect-processor'
include 'data-prepper-plugins:dynamodb-source'
include 'data-prepper-plugins:decompress-processor'
include 'data-prepper-plugins:split-event-processor'

0 comments on commit 458b5de

Please sign in to comment.