From 458b5de2dcaed99dfa1b90a9492979e2c1fecd53 Mon Sep 17 00:00:00 2001 From: srigovs Date: Tue, 20 Feb 2024 15:22:04 -0800 Subject: [PATCH] Add support for spilt event processor Signed-off-by: srigovs --- .../split-event-processor/build.gradle | 30 ++ .../splitevent/SplitEventProcessor.java | 127 ++++++ .../splitevent/SplitEventProcessorConfig.java | 42 ++ .../splitevent/SplitEventProcessorTest.java | 395 ++++++++++++++++++ settings.gradle | 1 + 5 files changed, 595 insertions(+) create mode 100644 data-prepper-plugins/split-event-processor/build.gradle create mode 100644 data-prepper-plugins/split-event-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessor.java create mode 100644 data-prepper-plugins/split-event-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessorConfig.java create mode 100644 data-prepper-plugins/split-event-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessorTest.java diff --git a/data-prepper-plugins/split-event-processor/build.gradle b/data-prepper-plugins/split-event-processor/build.gradle new file mode 100644 index 0000000000..7b6c9c934b --- /dev/null +++ b/data-prepper-plugins/split-event-processor/build.gradle @@ -0,0 +1,30 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +plugins { + id 'java' +} + +jacocoTestCoverageVerification { + dependsOn jacocoTestReport + violationRules { + rule { + limit { + minimum = 1.0 + } + } + } +} + + +dependencies { + implementation project(':data-prepper-api') + implementation project(':data-prepper-plugins:common') + implementation 'com.fasterxml.jackson.core:jackson-databind' +} + +test { + useJUnitPlatform() +} \ No newline at end of file diff --git a/data-prepper-plugins/split-event-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessor.java b/data-prepper-plugins/split-event-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessor.java new file mode 100644 index 0000000000..f304cd5f3a --- /dev/null +++ b/data-prepper-plugins/split-event-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessor.java @@ -0,0 +1,127 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.processor.splitevent; + +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.event.DefaultEventHandle; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.processor.AbstractProcessor; +import org.opensearch.dataprepper.model.processor.Processor; +import org.opensearch.dataprepper.model.record.Record; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Pattern; + + +@DataPrepperPlugin(name = "split_event", pluginType = Processor.class, pluginConfigurationType = SplitEventProcessorConfig.class) +public class SplitEventProcessor extends AbstractProcessor, Record>{ + final Pattern pattern; + final String delimiter; + final String delimiterRegex; + final String field; + + @DataPrepperPluginConstructor + public SplitEventProcessor(final PluginMetrics pluginMetrics, final SplitEventProcessorConfig config) { + super(pluginMetrics); + this.delimiter = config.getDelimiter(); + this.delimiterRegex = config.getDelimiterRegex(); + this.field = config.getField(); + + if(delimiterRegex != null && !delimiterRegex.isEmpty() + && delimiter != null && !delimiter.isEmpty()) { + throw new IllegalArgumentException("delimiter and delimiter_regex cannot be defined at the same time"); + } else if((delimiterRegex == null || delimiterRegex.isEmpty()) && + (delimiter == null || delimiter.isEmpty())) { + throw new IllegalArgumentException("delimiter or delimiter_regex needs to be defined"); + } + + if(delimiterRegex != null && !delimiterRegex.isEmpty()) { + pattern = Pattern.compile(delimiterRegex); + } else { + pattern = Pattern.compile(Pattern.quote(delimiter)); + } + } + + @Override + public Collection> doExecute(final Collection> records) { + Collection> newRecords = new ArrayList<>(); + for(final Record record : records) { + final Event recordEvent = record.getData(); + + if (!recordEvent.containsKey(field)) { + Record newRecord = new Record<>(recordEvent); + newRecords.add(newRecord); + continue; + } + + final Object value = recordEvent.get(field, Object.class); + + //split record according to delimiter + final String[] splitValues = pattern.split((String) value); + + // when no splits or empty value modify the original event + if(splitValues.length <= 1) { + Record newRecord = new Record<>(recordEvent); + newRecords.add(newRecord); + continue; + } + + //create new events for the splits + for (int i = 0; i < splitValues.length-1 ; i++) { + Record newRecord = createNewRecordFromEvent(recordEvent, splitValues[i]); + addToAcknowledgementSetFromOriginEvent((Event) newRecord.getData(), recordEvent); + newRecords.add(newRecord); + } + + // Modify original event to hold the last split + recordEvent.put(field, splitValues[splitValues.length-1]); + Record newRecord = new Record<>(recordEvent); + newRecords.add(newRecord); + } + return newRecords; + } + + protected Record createNewRecordFromEvent(final Event recordEvent, String splitValue) { + Record newRecord; + JacksonEvent newRecordEvent; + + newRecordEvent = JacksonEvent.fromEvent(recordEvent); + newRecordEvent.put(field,(Object) splitValue); + newRecord = new Record<>(newRecordEvent); + return newRecord; + } + + protected void addToAcknowledgementSetFromOriginEvent(Event recordEvent, Event originRecordEvent) { + DefaultEventHandle eventHandle = (DefaultEventHandle) originRecordEvent.getEventHandle(); + if (eventHandle != null && eventHandle.getAcknowledgementSet() != null) { + eventHandle.getAcknowledgementSet().add(recordEvent); + } + } + + @Override + public void prepareForShutdown() { + } + + @Override + public boolean isReadyForShutdown() { + return true; + } + + @Override + public void shutdown() { + } +} diff --git a/data-prepper-plugins/split-event-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessorConfig.java b/data-prepper-plugins/split-event-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessorConfig.java new file mode 100644 index 0000000000..c4af96a3d4 --- /dev/null +++ b/data-prepper-plugins/split-event-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessorConfig.java @@ -0,0 +1,42 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.processor.splitevent; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; +import jakarta.validation.constraints.Size; + + +public class SplitEventProcessorConfig { + @NotEmpty + @NotNull + @JsonProperty("field") + private String field; + + @JsonProperty("delimiter_regex") + private String delimiterRegex; + + @Size(min = 1, max = 1) + private String delimiter; + + public String getField() { + return field; + } + + public String getDelimiterRegex() { + return delimiterRegex; + } + + public String getDelimiter() { + return delimiter; + } +} diff --git a/data-prepper-plugins/split-event-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessorTest.java b/data-prepper-plugins/split-event-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessorTest.java new file mode 100644 index 0000000000..fcbd6ee527 --- /dev/null +++ b/data-prepper-plugins/split-event-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessorTest.java @@ -0,0 +1,395 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.processor.splitevent; + + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import static org.mockito.ArgumentMatchers.any; +import org.mockito.Mock; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.spy; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.event.DefaultEventHandle; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventMetadata; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.record.Record; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; + + +@ExtendWith(MockitoExtension.class) +public class SplitEventProcessorTest { + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private SplitEventProcessorConfig mockConfig; + + @Mock + private AcknowledgementSet mockAcknowledgementSet; + + private SplitEventProcessor splitEventProcessor; + + + private Record createTestRecord(final Map data) { + + Event event = JacksonEvent.builder() + .withData(data) + .withEventType("event") + .build(); + + DefaultEventHandle eventHandle = (DefaultEventHandle) event.getEventHandle(); + + eventHandle.setAcknowledgementSet(mockAcknowledgementSet); + return new Record<>(event); + } + + @BeforeEach + void setup() { + when(mockConfig.getField()).thenReturn("k1"); + when(mockConfig.getDelimiter()).thenReturn(" "); + + splitEventProcessor = new SplitEventProcessor(pluginMetrics, mockConfig); + } + + private static Stream provideMaps() { + return Stream.of( + Arguments.of( + Map.of( + "k1", "", + "k2", "v2" + ) + ), + Arguments.of( + Map.of( + "k1", "v1", + "k2", "v2" + ) + ), + Arguments.of( + Map.of("k1", "v1 v2", + "k2", "v2" + ) + ), + Arguments.of( + Map.of("k1", "v1 v2 v3", + "k2", "v2" + ) + ) + ); + } + + @Test + void testHappyPathWithSpaceDelimiter() { + final Map testData = new HashMap<>(); + testData.put("k1", "v1 v2"); + final Record record = createTestRecord(testData); + when(mockConfig.getDelimiter()).thenReturn(" "); + + final SplitEventProcessor objectUnderTest = new SplitEventProcessor(pluginMetrics, mockConfig); + final List> editedRecords = (List>) objectUnderTest.doExecute(Collections.singletonList(record)); + + for(Record r: editedRecords){ + Event event = (Event) r.getData(); + } + + assertThat(editedRecords.size(), equalTo(2)); + assertThat(editedRecords.get(0).getData().toMap(), equalTo(Map.of("k1","v1"))); + assertThat(editedRecords.get(1).getData().toMap(), equalTo(Map.of("k1","v2"))); + } + + @Test + void testHappyPathWithSpaceDelimiterForStringWithMultipleSpaces() { + final Map testData = new HashMap<>(); + testData.put("k1", "v1 v2"); + final Record record = createTestRecord(testData); + when(mockConfig.getDelimiter()).thenReturn(" "); + + final SplitEventProcessor objectUnderTest = new SplitEventProcessor(pluginMetrics, mockConfig); + final List> editedRecords = (List>) objectUnderTest.doExecute(Collections.singletonList(record)); + for(Record r: editedRecords){ + Event event = (Event) r.getData(); + } + + assertThat(editedRecords.size(), equalTo(3)); + assertThat(editedRecords.get(0).getData().toMap(), equalTo(Map.of("k1","v1"))); + assertThat(editedRecords.get(1).getData().toMap(), equalTo(Map.of("k1",""))); + assertThat(editedRecords.get(2).getData().toMap(), equalTo(Map.of("k1","v2"))); + } + + @Test + void testHappyPathWithSemiColonDelimiter() { + final Map testData = new HashMap<>(); + testData.put("k1", "v1;v2"); + final Record record = createTestRecord(testData); + when(mockConfig.getDelimiter()).thenReturn(";"); + + final SplitEventProcessor objectUnderTest = new SplitEventProcessor(pluginMetrics, mockConfig); + final List> editedRecords = (List>) objectUnderTest.doExecute(Collections.singletonList(record)); + + for(Record r: editedRecords){ + Event event = (Event) r.getData(); + } + + assertThat(editedRecords.size(), equalTo(2)); + assertThat(editedRecords.get(0).getData().toMap(), equalTo(Map.of("k1","v1"))); + assertThat(editedRecords.get(1).getData().toMap(), equalTo(Map.of("k1","v2"))); + } + + @Test + void testHappyPathWithSpaceDelimiterRegex() { + final Map testData = new HashMap<>(); + testData.put("k1", "v1 v2"); + final Record record = createTestRecord(testData); + when(mockConfig.getDelimiter()).thenReturn(null); + when(mockConfig.getDelimiterRegex()).thenReturn("\\s+"); + + final SplitEventProcessor objectUnderTest = new SplitEventProcessor(pluginMetrics, mockConfig); + final List> editedRecords = (List>) objectUnderTest.doExecute(Collections.singletonList(record)); + + for(Record r: editedRecords){ + Event event = (Event) r.getData(); + } + + assertThat(editedRecords.size(), equalTo(2)); + assertThat(editedRecords.get(0).getData().toMap(), equalTo(Map.of("k1","v1"))); + assertThat(editedRecords.get(1).getData().toMap(), equalTo(Map.of("k1","v2"))); + } + + @Test + void testHappyPathWithColonDelimiterRegex() { + final Map testData = new HashMap<>(); + testData.put("k1", "v1:v2"); + final Record record = createTestRecord(testData); + when(mockConfig.getDelimiter()).thenReturn(null); + when(mockConfig.getDelimiterRegex()).thenReturn(":"); + + final SplitEventProcessor objectUnderTest = new SplitEventProcessor(pluginMetrics, mockConfig); + final List> editedRecords = (List>) objectUnderTest.doExecute(Collections.singletonList(record)); + + for(Record r: editedRecords){ + Event event = (Event) r.getData(); + } + + assertThat(editedRecords.size(), equalTo(2)); + assertThat(editedRecords.get(0).getData().toMap(), equalTo(Map.of("k1","v1"))); + assertThat(editedRecords.get(1).getData().toMap(), equalTo(Map.of("k1","v2"))); + } + + @Test + void testFailureWithBothDelimiterRegexAndDelimiterDefined() { + when(mockConfig.getDelimiter()).thenReturn(" "); + when(mockConfig.getDelimiterRegex()).thenReturn("\\s+"); + assertThrows(IllegalArgumentException.class, () -> new SplitEventProcessor(pluginMetrics, mockConfig)); + } + + @Test + void testFailureWithDelimiterRegexAndDelimiterDefinedMissing() { + when(mockConfig.getDelimiter()).thenReturn(null); + when(mockConfig.getDelimiterRegex()).thenReturn(null); + assertThrows(IllegalArgumentException.class, () -> new SplitEventProcessor(pluginMetrics, mockConfig)); + } + + @ParameterizedTest + @MethodSource("provideMaps") + void testSplitEventsBelongToTheSameAcknowledgementSet(Map inputMap1) { + final Record testRecord = createTestRecord(inputMap1); + Event originalEvent = testRecord.getData(); + DefaultEventHandle originalEventHandle = (DefaultEventHandle) originalEvent.getEventHandle(); + AcknowledgementSet originalAcknowledgementSet= originalEventHandle.getAcknowledgementSet(); + + final SplitEventProcessor objectUnderTest = new SplitEventProcessor(pluginMetrics, mockConfig); + final List> editedRecords = (List>) objectUnderTest.doExecute(Collections.singletonList(testRecord)); + + DefaultEventHandle eventHandle = (DefaultEventHandle) originalEvent.getEventHandle(); + AcknowledgementSet acknowledgementSet; + for(Record record: editedRecords) { + Event event = testRecord.getData(); + eventHandle = (DefaultEventHandle) event.getEventHandle(); + acknowledgementSet = eventHandle.getAcknowledgementSet(); + assertEquals(originalAcknowledgementSet, acknowledgementSet); + } + } + + @Test + void testSplitEventsWhenNoSplits() { + final Map testData = new HashMap<>(); + List> records = new ArrayList<>(); + + testData.put("k1", "v1"); + testData.put("k2", "v2"); + final Record record1 = createTestRecord(testData); + + final Map testData2 = new HashMap<>(); + testData2.put("k1", ""); + testData2.put("k3", "v3"); + final Record record2 = createTestRecord(testData2); + + records.add(record1); + records.add(record2); + final List> editedRecords = (List>) splitEventProcessor.doExecute(records); + + assertThat(editedRecords.size(), equalTo(2)); + assertThat(editedRecords.get(0).getData().toMap(), equalTo(Map.of("k1","v1", "k2", "v2"))); + assertThat(editedRecords.get(1).getData().toMap(), equalTo(Map.of("k1","", "k3", "v3"))); + } + + + @Test + void testSplitEventsWhenOneSplit() { + List> records = new ArrayList<>(); + final Map testData = new HashMap<>(); + testData.put("k1", "v1 v2"); + testData.put("k2", "v3"); + final Record record = createTestRecord(testData); + records.add(record); + final List> editedRecords = (List>) splitEventProcessor.doExecute(records); + + assertThat(editedRecords.size(), equalTo(2)); + assertThat(editedRecords.get(0).getData().toMap(), equalTo(Map.of("k1","v1", "k2", "v3"))); + assertThat(editedRecords.get(1).getData().toMap(), equalTo(Map.of("k1","v2", "k2", "v3"))); + } + + + @Test + void testSplitEventsWhenMultipleSplits() { + List> records = new ArrayList<>(); + final Map testData = new HashMap<>(); + testData.put("k1", "v1 v2 v3 v4"); + testData.put("k2", "v5"); + final Record record = createTestRecord(testData); + records.add(record); + final List> editedRecords = (List>) splitEventProcessor.doExecute(records); + + assertThat(editedRecords.size(), equalTo(4)); + assertThat(editedRecords.get(0).getData().toMap(), equalTo(Map.of("k1","v1", "k2", "v5"))); + assertThat(editedRecords.get(1).getData().toMap(), equalTo(Map.of("k1","v2", "k2", "v5"))); + assertThat(editedRecords.get(2).getData().toMap(), equalTo(Map.of("k1","v3", "k2", "v5"))); + assertThat(editedRecords.get(3).getData().toMap(), equalTo(Map.of("k1","v4", "k2", "v5"))); + } + + @Test + void testSplitEventsWhenMultipleSplitsMultipleRecords() { + List> records = new ArrayList<>(); + final Map testData1 = new HashMap<>(); + testData1.put("k1", "v1 v2"); + testData1.put("k2", "v5"); + final Record record = createTestRecord(testData1); + records.add(record); + + final Map testData2 = new HashMap<>(); + testData2.put("k1", "v1"); + testData2.put("k2", "v3"); + final Record record2 = createTestRecord(testData2); + records.add(record2); + final List> editedRecords = (List>) splitEventProcessor.doExecute(records); + + assertThat(editedRecords.size(), equalTo(3)); + assertThat(editedRecords.get(0).getData().toMap(), equalTo(Map.of("k1","v1", "k2", "v5"))); + assertThat(editedRecords.get(1).getData().toMap(), equalTo(Map.of("k1","v2", "k2", "v5"))); + assertThat(editedRecords.get(2).getData().toMap(), equalTo(Map.of("k1","v1", "k2", "v3"))); + } + + @Test + void testSplitEventsWhenNoKeyPresentInEvent() { + List> records = new ArrayList<>(); + final Map testData1 = new HashMap<>(); + testData1.put("k2", "v5 v6"); + final Record record = createTestRecord(testData1); + records.add(record); + + final Map testData2 = new HashMap<>(); + testData2.put("k3", "v3 v5"); + final Record record2 = createTestRecord(testData2); + records.add(record2); + final List> editedRecords = (List>) splitEventProcessor.doExecute(records); + + assertThat(editedRecords.size(), equalTo(2)); + assertThat(editedRecords.get(0).getData().toMap(), equalTo(Map.of("k2", "v5 v6"))); + assertThat(editedRecords.get(1).getData().toMap(), equalTo(Map.of("k3", "v3 v5"))); + } + + @Test + public void testCreateNewRecordFromEvent() { + Event recordEvent = mock(Event.class); + + Map eventData = new HashMap<>(); + eventData.put("someField", "someValue"); + when(recordEvent.toMap()).thenReturn(eventData); + when(recordEvent.getMetadata()).thenReturn(mock(EventMetadata.class)); + String splitValue = "splitValue"; + + Record resultRecord = splitEventProcessor.createNewRecordFromEvent(recordEvent, splitValue); + Event editedEvent = (Event) resultRecord.getData(); + // Assertions + assertEquals(editedEvent.getMetadata(),recordEvent.getMetadata()); + } + + @Test + public void testAddToAcknowledgementSetFromOriginEvent() { + Map data = Map.of("k1","v1"); + EventMetadata eventMetadata = mock(EventMetadata.class); + Event originRecordEvent = JacksonEvent.builder() + .withEventMetadata(eventMetadata) + .withEventType("event") + .withData(data) + .build(); + Event spyEvent = spy(originRecordEvent); + + DefaultEventHandle mockEventHandle = mock(DefaultEventHandle.class); + when(spyEvent.getEventHandle()).thenReturn(mockEventHandle); + + Record record = splitEventProcessor + .createNewRecordFromEvent(spyEvent, "v1"); + + Event recordEvent = (Event) record.getData(); + splitEventProcessor.addToAcknowledgementSetFromOriginEvent(recordEvent, spyEvent); + + DefaultEventHandle spyEventHandle = (DefaultEventHandle) spyEvent.getEventHandle(); + // Verify that the add method is called on the acknowledgement set + verify(spyEventHandle).getAcknowledgementSet(); + + AcknowledgementSet spyAckSet = spyEventHandle.getAcknowledgementSet(); + DefaultEventHandle eventHandle = (DefaultEventHandle) recordEvent.getEventHandle(); + AcknowledgementSet ackSet1 = eventHandle.getAcknowledgementSet(); + + assertEquals(spyAckSet, ackSet1); + } + + @Test + void testIsReadyForShutdown() { + assertTrue(splitEventProcessor.isReadyForShutdown()); + } + +} diff --git a/settings.gradle b/settings.gradle index 8edc547186..0742f43cbb 100644 --- a/settings.gradle +++ b/settings.gradle @@ -150,3 +150,4 @@ include 'data-prepper-plugins:buffer-common' include 'data-prepper-plugins:dissect-processor' include 'data-prepper-plugins:dynamodb-source' include 'data-prepper-plugins:decompress-processor' +include 'data-prepper-plugins:split-event-processor'