Skip to content

Commit

Permalink
Add support for spilt event processor(#4089)
Browse files Browse the repository at this point in the history
Signed-off-by: srigovs <[email protected]>
  • Loading branch information
srikanthjg committed Feb 21, 2024
1 parent 2ef6aca commit 7d94919
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 18 deletions.
8 changes: 0 additions & 8 deletions data-prepper-plugins/split-event-processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,6 @@
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java'
}

jacocoTestCoverageVerification {
dependsOn jacocoTestReport
violationRules {
Expand All @@ -23,8 +19,4 @@ dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:common')
implementation 'com.fasterxml.jackson.core:jackson-databind'
}

test {
useJUnitPlatform()
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,17 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.function.Function;
import java.util.regex.Pattern;


@DataPrepperPlugin(name = "split_event", pluginType = Processor.class, pluginConfigurationType = SplitEventProcessorConfig.class)
public class SplitEventProcessor extends AbstractProcessor<Record<Event>, Record<Event>>{
final Pattern pattern;
final String delimiter;
final String delimiterRegex;
final String field;
final Pattern pattern;
private final Function<String, String[]> splitter;

@DataPrepperPluginConstructor
public SplitEventProcessor(final PluginMetrics pluginMetrics, final SplitEventProcessorConfig config) {
Expand All @@ -49,8 +51,10 @@ public SplitEventProcessor(final PluginMetrics pluginMetrics, final SplitEventPr

if(delimiterRegex != null && !delimiterRegex.isEmpty()) {
pattern = Pattern.compile(delimiterRegex);
splitter = pattern::split;
} else {
pattern = Pattern.compile(Pattern.quote(delimiter));
splitter = inputString -> inputString.split(delimiter);
pattern = null;
}
}

Expand All @@ -61,20 +65,18 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
final Event recordEvent = record.getData();

if (!recordEvent.containsKey(field)) {
Record newRecord = new Record<>(recordEvent);
newRecords.add(newRecord);
newRecords.add(record);
continue;
}

final Object value = recordEvent.get(field, Object.class);

//split record according to delimiter
final String[] splitValues = pattern.split((String) value);
final String[] splitValues = splitter.apply((String) value);

// when no splits or empty value modify the original event
// when no splits or empty value use the original record
if(splitValues.length <= 1) {
Record newRecord = new Record<>(recordEvent);
newRecords.add(newRecord);
newRecords.add(record);
continue;
}

Expand All @@ -87,8 +89,7 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor

// Modify original event to hold the last split
recordEvent.put(field, splitValues[splitValues.length-1]);
Record newRecord = new Record<>(recordEvent);
newRecords.add(newRecord);
newRecords.add(record);
}
return newRecords;
}
Expand Down

0 comments on commit 7d94919

Please sign in to comment.