
Commit

Merge branch 'main' into openSearch-sink-configuration-changes
Signed-off-by: Maxwell Brown <[email protected]>
Galactus22625 authored Jan 16, 2025
2 parents 7edfe65 + b2cbbae commit ddc7faf
Showing 153 changed files with 4,059 additions and 1,469 deletions.
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
@@ -1,2 +1,2 @@
# This should match the owning team set up in https://github.com/orgs/opensearch-project/teams
* @sb2k16 @chenqi0805 @engechas @graytaylor0 @dinujoh @kkondaka @KarstenSchnitter @dlvenable @oeyh
* @sb2k16 @chenqi0805 @engechas @san81 @graytaylor0 @dinujoh @kkondaka @KarstenSchnitter @dlvenable @oeyh
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
@@ -106,7 +106,7 @@ If you are modifying existing files with license headers, or including new files
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/
*/
```

### Shell, Python
1 change: 1 addition & 0 deletions MAINTAINERS.md
@@ -9,6 +9,7 @@ This document contains a list of maintainers in this repo. See [opensearch-proje
| Souvik Bose | [sb2k16](https://github.com/sb2k16) | Amazon |
| Qi Chen | [chenqi0805](https://github.com/chenqi0805) | Amazon |
| Chase Engelbrecht | [engechas](https://github.com/engechas) | Amazon |
| Santhosh Gandhe | [san81](https://github.com/san81) | Amazon |
| Taylor Gray | [graytaylor0](https://github.com/graytaylor0) | Amazon |
| Dinu John | [dinujoh](https://github.com/dinujoh) | Amazon |
| Krishna Kondaka | [kkondaka](https://github.com/kkondaka) | Amazon |
7 changes: 4 additions & 3 deletions build.gradle
@@ -350,7 +350,8 @@ coreProjects.each { coreProject ->
def assembleTasks = collectTasksRecursively(coreProject, 'assemble')
def publishTasks = collectTasksRecursively(coreProject, 'publish')

// Add these tasks as dependencies of the release task
release.dependsOn assembleTasks
release.dependsOn publishTasks
// Explicitly declare the release task for better Gradle compatibility
def releaseTask = tasks.named('release').get()
releaseTask.dependsOn assembleTasks
releaseTask.dependsOn publishTasks
}
@@ -0,0 +1,33 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.dataprepper.model.annotations;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Marks a Data Prepper plugin as experimental.
* <p>
* Experimental plugins do not have the same compatibility guarantees as other plugins and may be unstable.
* They may have breaking changes between minor versions and may even be removed.
* <p>
* Data Prepper administrators must enable experimental plugins in order to use them.
* Otherwise, they are not available to use with pipelines.
*
* @since 2.11
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface Experimental {
}
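
For illustration, a plugin author opts in by placing the new annotation alongside @DataPrepperPlugin, mirroring the test plugin added later in this commit. This is a minimal sketch only; the package, interface, plugin name, and class below are hypothetical and not part of this change.

```java
package org.example.dataprepper.plugins; // hypothetical package, for illustration only

import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.Experimental;

// Stand-in plugin interface so this sketch stays self-contained.
interface ExamplePluggableInterface {
}

// With @Experimental present, the plugin factory refuses to load the plugin unless
// experimental plugins are enabled in data-prepper-config.yaml (see the error message
// asserted in DefaultPluginFactoryIT below).
@DataPrepperPlugin(name = "example_experimental_plugin", pluginType = ExamplePluggableInterface.class)
@Experimental
public class ExampleExperimentalPlugin implements ExamplePluggableInterface {
}
```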
@@ -110,6 +110,18 @@ public interface Event extends Serializable {
*/
void clear();

/**
* Merges another Event into the current Event.
* The values from the other Event overwrite the values in the current Event for any key present in both.
* Keys that exist only in the other Event are added to the current Event; keys that exist only in the current Event are left unmodified.
*
* @param other the other Event to merge into this Event
* @throws IllegalArgumentException if the input event is not compatible to merge.
* @throws UnsupportedOperationException if the current Event does not support merging.
* @since 2.11
*/
void merge(Event other);

/**
* Generates a serialized Json string of the entire Event
*
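
A minimal usage sketch of the new merge contract, using the same JacksonEvent builder calls exercised in the tests later in this commit; the event type string and data values are illustrative only.

```java
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;

public class MergeSketch {
    public static void main(String[] args) {
        final Event current = JacksonEvent.builder()
                .withEventType("DOCUMENT")
                .withData("{\"a\": \"original\", \"b\": \"original\"}")
                .build();
        final Event other = JacksonEvent.builder()
                .withEventType("DOCUMENT")
                .withData("{\"a\": \"alpha\", \"c\": \"gamma\"}")
                .build();

        // Keys present in both are overwritten from 'other'; keys only in 'other'
        // are added; keys only in 'current' are left as-is.
        current.merge(other);

        System.out.println(current.toJsonString()); // {"a":"alpha","b":"original","c":"gamma"}
    }
}
```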
@@ -123,7 +123,6 @@ public static Event fromMessage(String message) {
}

private JsonNode getInitialJsonNode(final Object data) {

if (data == null) {
return mapper.valueToTree(new HashMap<>());
} else if (data instanceof String) {
@@ -348,14 +347,30 @@ public void clear() {
}
}

@Override
public void merge(final Event other) {
if(!(other instanceof JacksonEvent))
throw new IllegalArgumentException("Unable to merge the Event. The input Event must be a JacksonEvent.");
final JacksonEvent otherJacksonEvent = (JacksonEvent) other;
if(!(otherJacksonEvent.jsonNode instanceof ObjectNode)) {
throw new IllegalArgumentException("Unable to merge the Event. The input Event must be a JacksonEvent with object data.");
}
final ObjectNode otherObjectNode = (ObjectNode) otherJacksonEvent.jsonNode;

if(!(jsonNode instanceof ObjectNode)) {
throw new UnsupportedOperationException("Unable to merge the Event. The current Event must have object data.");
}

((ObjectNode) jsonNode).setAll(otherObjectNode);
}

@Override
public String toJsonString() {
return jsonNode.toString();
}

@Override
public String getAsJsonString(EventKey key) {

JacksonEventKey jacksonEventKey = asJacksonEventKey(key);
final JsonNode node = getNode(jacksonEventKey);
if (node.isMissingNode()) {
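
The overwrite-and-add behavior of the merge implementation above comes directly from Jackson's ObjectNode.setAll. A small standalone sketch of those semantics, independent of Data Prepper:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class SetAllSketch {
    public static void main(String[] args) {
        final ObjectMapper mapper = new ObjectMapper();
        final ObjectNode current = mapper.createObjectNode()
                .put("a", "original")
                .put("b", "original");
        final ObjectNode other = mapper.createObjectNode()
                .put("a", "alpha")
                .put("c", "gamma");

        // setAll copies every field of 'other' into 'current', overwriting duplicate keys.
        current.setAll(other);

        System.out.println(current); // {"a":"alpha","b":"original","c":"gamma"}
    }
}
```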
@@ -446,6 +446,52 @@ public void testClear() {
assertThat(event.toMap().size(), equalTo(0));
}

@Test
void merge_with_non_JacksonEvent_throws() {
final Event otherEvent = mock(Event.class);
assertThrows(IllegalArgumentException.class, () -> event.merge(otherEvent));
}

@Test
void merge_with_array_JsonNode_throws() {
final JacksonEvent otherEvent = (JacksonEvent) event;
event = JacksonEvent.builder().withEventType(EventType.DOCUMENT.toString()).withData(List.of(UUID.randomUUID().toString())).build();
assertThrows(UnsupportedOperationException.class, () -> event.merge(otherEvent));
}

@Test
void merge_with_array_JsonNode_in_other_throws() {
Event otherEvent = JacksonEvent.builder().withEventType(EventType.DOCUMENT.toString()).withData(List.of(UUID.randomUUID().toString())).build();
assertThrows(IllegalArgumentException.class, () -> event.merge(otherEvent));
}

@Test
void merge_sets_all_values() {
final String jsonString = "{\"a\": \"alpha\", \"info\": {\"ids\": {\"id\":\"idx\"}}}";
event.put("b", "original");
Event otherEvent = JacksonEvent.builder().withEventType(EventType.DOCUMENT.toString()).withData(jsonString).build();
event.merge(otherEvent);

assertThat(event.get("b", Object.class), equalTo("original"));
assertThat(event.get("a", Object.class), equalTo("alpha"));
assertThat(event.containsKey("info"), equalTo(true));
assertThat(event.get("info/ids/id", String.class), equalTo("idx"));
}

@Test
void merge_overrides_existing_values() {
final String jsonString = "{\"a\": \"alpha\", \"info\": {\"ids\": {\"id\":\"idx\"}}}";
event.put("a", "original");
event.put("b", "original");
Event otherEvent = JacksonEvent.builder().withEventType(EventType.DOCUMENT.toString()).withData(jsonString).build();
event.merge(otherEvent);

assertThat(event.get("b", Object.class), equalTo("original"));
assertThat(event.get("a", Object.class), equalTo("alpha"));
assertThat(event.containsKey("info"), equalTo(true));
assertThat(event.get("info/ids/id", String.class), equalTo("idx"));
}

@ParameterizedTest
@ValueSource(strings = {"/", "foo", "/foo", "/foo/bar", "foo/bar", "foo/bar/", "/foo/bar/leaf/key"})
public void testDelete_withNonexistentKey(final String key) {
2 changes: 1 addition & 1 deletion data-prepper-core/build.gradle
@@ -36,7 +36,7 @@ dependencies {
implementation 'io.micrometer:micrometer-registry-cloudwatch2'
implementation 'javax.ws.rs:javax.ws.rs-api:2.1.1'
implementation 'software.amazon.awssdk:cloudwatch'
implementation platform('org.apache.logging.log4j:log4j-bom:2.23.1')
implementation platform('org.apache.logging.log4j:log4j-bom:2.24.3')
implementation 'org.apache.logging.log4j:log4j-core'
implementation 'org.apache.logging.log4j:log4j-slf4j2-impl'
implementation 'javax.inject:javax.inject:1'
@@ -14,6 +14,7 @@
import org.opensearch.dataprepper.core.event.EventFactoryApplicationContextMarker;
import org.opensearch.dataprepper.core.validation.LoggingPluginErrorsHandler;
import org.opensearch.dataprepper.core.validation.PluginErrorCollector;
import org.opensearch.dataprepper.model.plugin.NoPluginFoundException;
import org.opensearch.dataprepper.plugins.configtest.TestComponentWithConfigInject;
import org.opensearch.dataprepper.plugins.configtest.TestDISourceWithConfig;
import org.opensearch.dataprepper.validation.PluginErrorsHandler;
@@ -27,6 +28,7 @@
import org.opensearch.dataprepper.plugins.test.TestPlugin;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@@ -38,6 +40,7 @@
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.when;

/**
* Integration test of the plugin framework. These tests should not mock any portion
@@ -49,6 +52,8 @@ class DefaultPluginFactoryIT {
private PipelinesDataFlowModel pipelinesDataFlowModel;
@Mock
private ExtensionsConfiguration extensionsConfiguration;
@Mock
private ExperimentalConfigurationContainer experimentalConfigurationContainer;
private String pluginName;
private String objectPluginName;
private String pipelineName;
@@ -67,6 +72,8 @@ private DefaultPluginFactory createObjectUnderTest() {
final AnnotationConfigApplicationContext coreContext = new AnnotationConfigApplicationContext();
coreContext.setParent(publicContext);

when(experimentalConfigurationContainer.getExperimental()).thenReturn(ExperimentalConfiguration.defaultConfiguration());

coreContext.scan(EventFactoryApplicationContextMarker.class.getPackage().getName());
coreContext.scan(DefaultAcknowledgementSetManager.class.getPackage().getName());
coreContext.scan(DefaultPluginFactory.class.getPackage().getName());
@@ -75,6 +82,7 @@ private DefaultPluginFactory createObjectUnderTest() {
coreContext.registerBean(PluginErrorsHandler.class, LoggingPluginErrorsHandler::new);
coreContext.registerBean(ExtensionsConfiguration.class, () -> extensionsConfiguration);
coreContext.registerBean(PipelinesDataFlowModel.class, () -> pipelinesDataFlowModel);
coreContext.registerBean(ExperimentalConfigurationContainer.class, () -> experimentalConfigurationContainer);
coreContext.refresh();

return coreContext.getBean(DefaultPluginFactory.class);
@@ -188,6 +196,20 @@ void loadPlugin_should_throw_when_a_plugin_configuration_is_invalid() {
assertThat(actualException.getMessage(), equalTo("Plugin test_plugin in pipeline " + pipelineName + " is configured incorrectly: requiredString must not be null"));
}

@Test
void loadPlugin_should_throw_when_a_plugin_is_experimental_by_default() {
pluginName = "test_experimental_plugin";
final PluginSetting pluginSetting = createPluginSettings(Collections.emptyMap());

final DefaultPluginFactory objectUnderTest = createObjectUnderTest();

final NoPluginFoundException actualException = assertThrows(NoPluginFoundException.class,
() -> objectUnderTest.loadPlugin(TestPluggableInterface.class, pluginSetting));

assertThat(actualException.getMessage(), notNullValue());
assertThat(actualException.getMessage(), equalTo("Unable to create experimental plugin test_experimental_plugin. You must enable experimental plugins in data-prepper-config.yaml in order to use them."));
}

private PluginSetting createPluginSettings(final Map<String, Object> pluginSettingMap) {
final PluginSetting pluginSetting = new PluginSetting(pluginName, pluginSettingMap);
pluginSetting.setPipelineName(pipelineName);
@@ -176,10 +176,6 @@ private void buildPipelineFromConfiguration(
.map(this::buildRoutedSinkOrConnector)
.collect(Collectors.toList());

final List<PluginError> subPipelinePluginErrors = pluginErrorCollector.getPluginErrors()
.stream().filter(pluginError -> pipelineName.equals(pluginError.getPipelineName()))
.collect(Collectors.toList());

final List<PluginError> invalidRouteExpressions = pipelineConfiguration.getRoutes()
.stream().filter(route -> !expressionEvaluator.isValidExpressionStatement(route.getCondition()))
.map(route -> PluginError.builder()
@@ -190,8 +186,12 @@
.build())
.collect(Collectors.toList());

if (!subPipelinePluginErrors.isEmpty() || !invalidRouteExpressions.isEmpty()) {
subPipelinePluginErrors.addAll(invalidRouteExpressions);
invalidRouteExpressions.forEach(pluginErrorCollector::collectPluginError);
final List<PluginError> subPipelinePluginErrors = pluginErrorCollector.getPluginErrors()
.stream().filter(pluginError -> pipelineName.equals(pluginError.getPipelineName()))
.collect(Collectors.toList());

if (!subPipelinePluginErrors.isEmpty()) {
pluginErrorsHandler.handleErrors(subPipelinePluginErrors);
throw new InvalidPluginConfigurationException(
String.format("One or more plugins are not configured correctly in the pipeline: %s.\n",
@@ -18,6 +18,8 @@
import org.opensearch.dataprepper.core.pipeline.PipelineShutdownOption;
import org.opensearch.dataprepper.model.configuration.PipelineExtensions;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.plugin.ExperimentalConfiguration;
import org.opensearch.dataprepper.plugin.ExperimentalConfigurationContainer;
import org.opensearch.dataprepper.plugin.ExtensionsConfiguration;

import java.time.Duration;
@@ -31,7 +33,7 @@
/**
* Class to hold configuration for DataPrepper, including server port and Log4j settings
*/
public class DataPrepperConfiguration implements ExtensionsConfiguration, EventConfigurationContainer {
public class DataPrepperConfiguration implements ExtensionsConfiguration, EventConfigurationContainer, ExperimentalConfigurationContainer {
static final Duration DEFAULT_SHUTDOWN_DURATION = Duration.ofSeconds(30L);

private static final String DEFAULT_SOURCE_COORDINATION_STORE = "in_memory";
@@ -55,6 +57,7 @@ public class DataPrepperConfiguration implements ExtensionsConfiguration, EventC
private PeerForwarderConfiguration peerForwarderConfiguration;
private Duration processorShutdownTimeout;
private Duration sinkShutdownTimeout;
private ExperimentalConfiguration experimental;
private PipelineExtensions pipelineExtensions;

public static final DataPrepperConfiguration DEFAULT_CONFIG = new DataPrepperConfiguration();
@@ -96,6 +99,7 @@ public DataPrepperConfiguration(
@JsonProperty("source_coordination") final SourceCoordinationConfig sourceCoordinationConfig,
@JsonProperty("pipeline_shutdown") final PipelineShutdownOption pipelineShutdown,
@JsonProperty("event") final EventConfiguration eventConfiguration,
@JsonProperty("experimental") final ExperimentalConfiguration experimental,
@JsonProperty("extensions")
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonSetter(nulls = Nulls.SKIP)
@@ -126,6 +130,8 @@ public DataPrepperConfiguration(
if (this.sinkShutdownTimeout.isNegative()) {
throw new IllegalArgumentException("sinkShutdownTimeout must be non-negative.");
}
this.experimental = experimental != null ? experimental : ExperimentalConfiguration.defaultConfiguration();

this.pipelineExtensions = pipelineExtensions;
}

@@ -239,4 +245,9 @@ public EventConfiguration getEventConfiguration() {
public PipelineExtensions getPipelineExtensions() {
return pipelineExtensions;
}

@Override
public ExperimentalConfiguration getExperimental() {
return experimental;
}
}
@@ -140,6 +140,7 @@ void setUp() {
@AfterEach
void tearDown() {
verify(dataPrepperConfiguration).getEventConfiguration();
verify(dataPrepperConfiguration).getExperimental();
verifyNoMoreInteractions(dataPrepperConfiguration);
}

@@ -396,6 +397,7 @@ void parseConfiguration_with_invalid_route_expressions_handles_errors_and_return
final Collection<PluginError> pluginErrorCollection = pluginErrorArgumentCaptor.getValue();
assertThat(pluginErrorCollection, notNullValue());
assertThat(pluginErrorCollection.size(), equalTo(1));
assertThat(pluginErrorCollector.getPluginErrors(), equalTo(pluginErrorCollection));

final PluginError pluginError = pluginErrorCollection.stream().findAny().orElseThrow();
final String expectedErrorMessage = String.format(CONDITIONAL_ROUTE_INVALID_EXPRESSION_FORMAT, "service", "/value == service");
@@ -0,0 +1,19 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.dataprepper.plugins;

import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.Experimental;
import org.opensearch.dataprepper.plugin.TestPluggableInterface;

@DataPrepperPlugin(name = "test_experimental_plugin", pluginType = TestPluggableInterface.class)
@Experimental
public class TestExperimentalPlugin {
}
1 change: 1 addition & 0 deletions data-prepper-plugin-framework/build.gradle
@@ -25,4 +25,5 @@ dependencies {
implementation libs.reflections.core
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'org.apache.commons:commons-text:1.10.0'
testImplementation 'ch.qos.logback:logback-classic:1.5.16'
}