Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introducing core package in the data-prepper-core module #5056

Merged
merged 20 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.InMemorySinkAccessor;
import org.opensearch.dataprepper.plugins.InMemorySourceAccessor;
import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner;

import java.util.Collections;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,26 @@

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.InMemorySinkAccessor;
import org.opensearch.dataprepper.plugins.InMemorySourceAccessor;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.Assert.assertFalse;
import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.time.Instant;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assert.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

class PipelinesWithAcksIT {
private static final Logger LOG = LoggerFactory.getLogger(PipelinesWithAcksIT.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.acknowledgements.DefaultAcknowledgementSetManager;
import org.opensearch.dataprepper.core.acknowledgements.DefaultAcknowledgementSetManager;
import org.opensearch.dataprepper.core.event.EventFactoryApplicationContextMarker;
import org.opensearch.dataprepper.core.validation.LoggingPluginErrorsHandler;
import org.opensearch.dataprepper.core.validation.PluginErrorCollector;
import org.opensearch.dataprepper.validation.PluginErrorsHandler;
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
Expand All @@ -20,9 +23,6 @@
import org.opensearch.dataprepper.plugins.test.TestComponent;
import org.opensearch.dataprepper.plugins.test.TestDISource;
import org.opensearch.dataprepper.plugins.test.TestPlugin;
import org.opensearch.dataprepper.validation.LoggingPluginErrorsHandler;
import org.opensearch.dataprepper.validation.PluginErrorCollector;
import org.opensearch.dataprepper.validation.PluginErrorsHandler;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import java.util.HashMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,27 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.acknowledgements.DefaultAcknowledgementSetManager;
import org.opensearch.dataprepper.breaker.CircuitBreakerManager;
import org.opensearch.dataprepper.core.acknowledgements.DefaultAcknowledgementSetManager;
import org.opensearch.dataprepper.core.breaker.CircuitBreakerManager;
import org.opensearch.dataprepper.core.event.EventFactoryApplicationContextMarker;
import org.opensearch.dataprepper.core.parser.config.DataPrepperAppConfiguration;
import org.opensearch.dataprepper.core.parser.config.FileStructurePathProvider;
import org.opensearch.dataprepper.core.parser.config.PipelineParserConfiguration;
import org.opensearch.dataprepper.core.peerforwarder.PeerForwarderProvider;
import org.opensearch.dataprepper.core.pipeline.router.RouterFactory;
import org.opensearch.dataprepper.core.sourcecoordination.SourceCoordinatorFactory;
import org.opensearch.dataprepper.core.validation.LoggingPluginErrorsHandler;
import org.opensearch.dataprepper.core.validation.PluginErrorCollector;
import org.opensearch.dataprepper.expression.StartsWithExpressionFunction;
import org.opensearch.dataprepper.validation.PluginErrorsHandler;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.EventFactory;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.parser.config.DataPrepperAppConfiguration;
import org.opensearch.dataprepper.parser.config.FileStructurePathProvider;
import org.opensearch.dataprepper.parser.config.PipelineParserConfiguration;
import org.opensearch.dataprepper.peerforwarder.PeerForwarderProvider;
import org.opensearch.dataprepper.pipeline.router.RouterFactory;
import org.opensearch.dataprepper.plugin.DefaultPluginFactory;
import org.opensearch.dataprepper.plugin.ObjectMapperConfiguration;
import org.opensearch.dataprepper.plugin.TestPluggableInterface;
import org.opensearch.dataprepper.plugins.test.TestExtension;
import org.opensearch.dataprepper.sourcecoordination.SourceCoordinatorFactory;
import org.opensearch.dataprepper.validation.LoggingPluginErrorsHandler;
import org.opensearch.dataprepper.validation.PluginErrorCollector;
import org.opensearch.dataprepper.validation.PluginErrorsHandler;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import java.util.ArrayList;
Expand Down Expand Up @@ -92,6 +93,7 @@ void setUp() {
coreContext.scan(DefaultAcknowledgementSetManager.class.getPackage().getName());

coreContext.scan(DefaultPluginFactory.class.getPackage().getName());
coreContext.scan(StartsWithExpressionFunction.class.getPackage().getName());

when(fileStructurePathProvider.getPipelineConfigFileLocation()).thenReturn(
"src/test/resources/valid_pipeline.yml"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@

package org.opensearch.dataprepper.plugins;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.Sink;
import org.opensearch.dataprepper.model.event.EventHandle;

import java.util.Collection;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,21 @@

package org.opensearch.dataprepper.plugins;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.event.EventFactory;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.time.Duration;

/**
* A Data Prepper source which can receive records via the {@link InMemorySourceAccessor}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
package org.opensearch.dataprepper.test.framework;

import org.opensearch.dataprepper.AbstractContextManager;
import org.opensearch.dataprepper.DataPrepper;
import org.opensearch.dataprepper.parser.config.FileStructurePathProvider;
import org.opensearch.dataprepper.core.DataPrepper;
import org.opensearch.dataprepper.core.parser.config.FileStructurePathProvider;
import org.opensearch.dataprepper.plugins.InMemorySinkAccessor;
import org.opensearch.dataprepper.plugins.InMemorySourceAccessor;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper;

import org.opensearch.dataprepper.core.DataPrepper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
Expand All @@ -24,7 +25,11 @@
public abstract class AbstractContextManager {
private static final Logger LOG = LoggerFactory.getLogger(AbstractContextManager.class);
private static final String BASE_DATA_PREPPER_PACKAGE = "org.opensearch.dataprepper";
private static final String EXPRESSION_PACKAGE = BASE_DATA_PREPPER_PACKAGE + ".expression";
private static final String[] BASE_DATA_PREPPER_PACKAGES = {
BASE_DATA_PREPPER_PACKAGE + ".core",
BASE_DATA_PREPPER_PACKAGE + ".plugin"
};
private static final String EXPRESSION_PACKAGE = BASE_DATA_PREPPER_PACKAGE + ".expression";

private final AnnotationConfigApplicationContext publicApplicationContext;
private final AnnotationConfigApplicationContext coreApplicationContext;
Expand Down Expand Up @@ -53,7 +58,7 @@ private void start() {

publicApplicationContext.refresh();
coreApplicationContext.setParent(publicApplicationContext);
coreApplicationContext.scan(BASE_DATA_PREPPER_PACKAGE);
coreApplicationContext.scan(BASE_DATA_PREPPER_PACKAGES);
preRefreshCoreApplicationContext(coreApplicationContext);

coreApplicationContext.refresh();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper;
package org.opensearch.dataprepper.core;

import io.micrometer.core.instrument.util.StringUtils;
import org.opensearch.dataprepper.DataPrepperShutdownListener;
import org.opensearch.dataprepper.DataPrepperShutdownOptions;
import org.opensearch.dataprepper.core.parser.PipelineTransformer;
import org.opensearch.dataprepper.core.peerforwarder.server.PeerForwarderServer;
import org.opensearch.dataprepper.core.pipeline.Pipeline;
import org.opensearch.dataprepper.core.pipeline.PipelineObserver;
import org.opensearch.dataprepper.core.pipeline.PipelinesProvider;
import org.opensearch.dataprepper.core.pipeline.server.DataPrepperServer;
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.parser.PipelineTransformer;
import org.opensearch.dataprepper.peerforwarder.server.PeerForwarderServer;
import org.opensearch.dataprepper.pipeline.Pipeline;
import org.opensearch.dataprepper.pipeline.PipelineObserver;
import org.opensearch.dataprepper.pipeline.PipelinesProvider;
import org.opensearch.dataprepper.pipeline.server.DataPrepperServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Lazy;
Expand Down Expand Up @@ -106,6 +108,8 @@ private void shutdownPipelines() {

/**
* Triggers the shutdown of all configured valid pipelines.
* @param shutdownOptions {@link DataPrepperShutdownOptions} to control the behavior of the shutdown process
* e.g. timeout, graceful shutdown, etc.
*/
public void shutdownPipelines(final DataPrepperShutdownOptions shutdownOptions) {
transformationPipelines.forEach((name, pipeline) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.acknowledgements;
package org.opensearch.dataprepper.core.acknowledgements;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;

@Configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.acknowledgements;
package org.opensearch.dataprepper.core.acknowledgements;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;

import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.Set;
import java.util.HashSet;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
/**
* AcknowledgementSetMonitor - monitors the acknowledgement sets for completion/expiration
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.acknowledgements;
package org.opensearch.dataprepper.core.acknowledgements;

import java.time.Duration;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.acknowledgements;
package org.opensearch.dataprepper.core.acknowledgements;

import java.util.Objects;
import java.util.concurrent.ThreadFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.acknowledgements;
package org.opensearch.dataprepper.core.acknowledgements;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.ProgressCheck;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.InternalEventHandle;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -19,12 +19,12 @@
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class DefaultAcknowledgementSet implements AcknowledgementSet {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.acknowledgements;
package org.opensearch.dataprepper.core.acknowledgements;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.metrics.PluginMetrics;

import javax.inject.Inject;
import javax.inject.Named;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.acknowledgements;
package org.opensearch.dataprepper.core.acknowledgements;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.metrics.PluginMetrics;

public class DefaultAcknowledgementSetMetrics {
static final String CREATED_METRIC_NAME = "numberOfAcknowledgementSetsCreated";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.acknowledgements;
package org.opensearch.dataprepper.core.acknowledgements;

import org.opensearch.dataprepper.model.acknowledgements.ProgressCheck;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.acknowledgements;
package org.opensearch.dataprepper.core.acknowledgements;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import java.util.function.Consumer;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;

import java.time.Duration;
import java.util.function.Consumer;

public class InactiveAcknowledgementSetManager implements AcknowledgementSetManager {
private static InactiveAcknowledgementSetManager theInstance;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.breaker;
package org.opensearch.dataprepper.core.breaker;

import org.opensearch.dataprepper.core.parser.model.CircuitBreakerConfig;
import org.opensearch.dataprepper.core.parser.model.DataPrepperConfiguration;
import org.opensearch.dataprepper.model.breaker.CircuitBreaker;
import org.opensearch.dataprepper.parser.model.CircuitBreakerConfig;
import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

Expand Down
Loading
Loading