Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into openSearch-sink-configura…
Browse files Browse the repository at this point in the history
…tion-changes
  • Loading branch information
Galactus22625 authored Jan 6, 2025
2 parents 21b2b97 + 795401f commit 6d87001
Show file tree
Hide file tree
Showing 100 changed files with 4,301 additions and 10,623 deletions.
82 changes: 82 additions & 0 deletions .github/workflows/kinesis-source-integration-tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
name: kinesis-source-integration-tests

on:
push:
paths:
- 'data-prepper-plugins/kinesis-source/**'
- '*gradle*'
pull_request:
paths:
- 'data-prepper-plugins/kinesis-source/**'
- '*gradle*'

workflow_dispatch:

jobs:
build:
strategy:
matrix:
java: [ 11, 17, 21, docker ]
fail-fast: false

runs-on: ubuntu-latest

steps:

- name: Git clone the repository
uses: actions/checkout@v4
with:
ref: ${{ github.event.pull_request.head.sha }}

- name: configure aws credentials
id: creds
uses: aws-actions/configure-aws-credentials@v4
with:
role-to-assume: ${{ secrets.TEST_IAM_ROLE_ARN }}
aws-region: ${{ secrets.TEST_REGION }}
output-credentials: true

- name: get caller identity 1
run: |
aws sts get-caller-identity
- name: Configure AWS Credentials file
run: |
aws configure set default.region ${{ secrets.TEST_REGION }}
aws configure set default.aws_access_key_id ${{ steps.creds.outputs.aws-access-key-id }}
aws configure set default.aws_secret_access_key ${{ steps.creds.outputs.aws-secret-access-key }}
aws configure set default.aws_session_token ${{ steps.creds.outputs.aws-session-token }}
- name: Set up JDK ${{ matrix.java }}
uses: actions/setup-java@v1
with:
java-version: ${{ matrix.java }}

- name: Checkout Data Prepper
uses: actions/checkout@v2

- name: Run Kinesis Source integration tests
run: |
./gradlew data-prepper-plugins:kinesis-source:integrationTest \
-Dtests.kinesis.source.aws.region=us-east-1 --tests KinesisSourceIT
- name: Upload Unit Test Results
if: always()
uses: actions/upload-artifact@v3
with:
name: data-prepper-kinesis-source-integration-tests-java-${{ matrix.java }}
path: '**/test-results/**/*.xml'

publish-test-results:
name: "Publish Unit Tests Results"
needs: build
runs-on: ubuntu-latest
if: always()

steps:
- name: Download Artifacts
uses: actions/download-artifact@v3
with:
path: test-results

- name: Publish Unit Test Results
uses: EnricoMi/publish-unit-test-result-action@v1
with:
files: "test-results/**/*.xml"
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,17 @@ public interface ExpressionEvaluator {
Object evaluate(final String statement, final Event context);

default Boolean evaluateConditional(final String statement, final Event context) {
final Object result = evaluate(statement, context);
if (result instanceof Boolean) {
return (Boolean) result;
} else {
throw new ClassCastException("Unexpected expression return type of " + result.getClass());
Object result;
try {
result = evaluate(statement, context);
if (result instanceof Boolean) {
return (Boolean) result;
}
throw new ClassCastException("Unexpected expression return value of " + result);
} catch (ExpressionParsingException e) {
throw e;
} catch (ExpressionEvaluationException e) {
return false;
}
}

Expand All @@ -42,4 +48,4 @@ default Boolean evaluateConditional(final String statement, final Event context)
List<String> extractDynamicKeysFromFormatExpression(final String format);

List<String> extractDynamicExpressionsFromFormatExpression(final String format);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.expression;

/**
* @since 2.11
* Wrapper exception for any exception thrown while evaluating a statement
*/
public class ExpressionParsingException extends ExpressionEvaluationException {
public ExpressionParsingException(final String message, final Throwable cause) {
super(message, cause);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ public Builder<T> withTimeReceived(final Instant timeReceived) {
* @return returns the builder
* @since 2.10
*/
protected Builder<T> withEventHandle(final EventHandle eventHandle) {
public Builder<T> withEventHandle(final EventHandle eventHandle) {
this.eventHandle = eventHandle;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ public interface Processor<InputRecord extends Record<?>, OutputRecord extends R
*/
void prepareForShutdown();

/**
* @since 2.11
* Indicates if the processor holds the events or not
* Holding events indicates that the events are not ready to be released.
*/
default boolean holdsEvents() {
return false;
}

/**
* @since 1.2
* Returns true if the Processor's internal state is safe to be shutdown.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,29 @@
public class ExpressionEvaluatorTest {
private ExpressionEvaluator expressionEvaluator;
class TestExpressionEvaluator implements ExpressionEvaluator {
private final boolean throwsExpressionEvaluationException;
private final boolean throwsExpressionParsingException;
private final boolean returnNull;
public TestExpressionEvaluator() {
throwsExpressionEvaluationException = false;
throwsExpressionParsingException = false;
returnNull = false;
}

public TestExpressionEvaluator(boolean throwsExpressionEvaluationException, boolean throwsExpressionParsingException, boolean returnNull) {
this.throwsExpressionEvaluationException = throwsExpressionEvaluationException;
this.throwsExpressionParsingException = throwsExpressionParsingException;
this.returnNull = returnNull;
}

public Object evaluate(final String statement, final Event event) {
if (throwsExpressionEvaluationException) {
throw new ExpressionEvaluationException("Expression Evaluation Exception", new RuntimeException("runtime exception"));
} else if (throwsExpressionParsingException) {
throw new ExpressionParsingException("Expression Parsing Exception", new RuntimeException("runtime exception"));
} else if (returnNull) {
return null;
}
return event.get(statement, Object.class);
}

Expand Down Expand Up @@ -48,7 +70,30 @@ public List<String> extractDynamicExpressionsFromFormatExpression(String format)
public void testDefaultEvaluateConditional() {
expressionEvaluator = new TestExpressionEvaluator();
assertThat(expressionEvaluator.evaluateConditional("/status", event("{\"status\":true}")), equalTo(true));

}

@Test
public void testEvaluateReturningException() {
expressionEvaluator = new TestExpressionEvaluator();
assertThrows(ClassCastException.class, () -> expressionEvaluator.evaluateConditional("/status", event("{\"nostatus\":true}")));
}

@Test
public void testThrowExpressionEvaluationException() {
expressionEvaluator = new TestExpressionEvaluator(true, false, false);
assertThat(expressionEvaluator.evaluateConditional("/status", event("{\"nostatus\":true}")), equalTo(false));
}

@Test
public void testThrowExpressionParsingException() {
expressionEvaluator = new TestExpressionEvaluator(false, true, false);
assertThrows(ExpressionParsingException.class, () -> expressionEvaluator.evaluateConditional("/status", event("{\"nostatus\":true}")));
}

@Test
public void testExpressionEvaluationReturnsNull() {
expressionEvaluator = new TestExpressionEvaluator(false, false, true);
assertThrows(ClassCastException.class, () -> expressionEvaluator.evaluateConditional("/status", event("{\"nostatus\":true}")));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.dataprepper.expression;

import org.junit.jupiter.api.Test;

import java.util.UUID;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

class ExpressionParsingExceptionTest {

@Test
void testExpressionParsingException() {
final ExpressionParsingException exception = new ExpressionParsingException(UUID.randomUUID().toString(), null);
assertThat(exception instanceof RuntimeException, is(true));
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.processor;

import org.junit.jupiter.api.Test;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class ProcessorTest {

@Test
public void testDefault() {
Processor processor = mock(Processor.class);
when(processor.holdsEvents()).thenCallRealMethod();
assertThat(processor.holdsEvents(), equalTo(false));
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ private void shutdownExecutorService(final ExecutorService executorService, fina
* @param records records that needs to published to each sink
* @return List of Future, each future for each sink
*/
List<Future<Void>> publishToSinks(final Collection<Record> records) {
public List<Future<Void>> publishToSinks(final Collection<Record> records) {
final int sinksSize = sinks.size();
final List<Future<Void>> sinkFutures = new ArrayList<>(sinksSize);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ private void doRun() {

try {
records = processor.execute(records);
if (inputEvents != null) {
// acknowledge missing events only if the processor is not holding events
if (!processor.holdsEvents() && inputEvents != null) {
processAcknowledgements(inputEvents, records);
}
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.atLeast;
import org.opensearch.dataprepper.core.pipeline.common.FutureHelper;
import org.opensearch.dataprepper.core.pipeline.common.FutureHelperResult;
import org.opensearch.dataprepper.model.CheckpointState;
Expand Down Expand Up @@ -132,6 +134,61 @@ void testProcessWorkerHappyPathWithAcknowledgments() {
}
}

@Test
void testProcessWorkerWithProcessorsNotHoldingEvents() {
DefaultEventHandle eventHandle = mock(DefaultEventHandle.class);
Event event = mock(Event.class);
Record record = mock(Record.class);
when(eventHandle.release(true)).thenReturn(true);
lenient().when(event.getEventHandle()).thenReturn(eventHandle);
when(record.getData()).thenReturn(event);
final List<Record> records = List.of(record);
final CheckpointState checkpointState = mock(CheckpointState.class);
final Map.Entry<Collection, CheckpointState> readResult = Map.entry(records, checkpointState);
when(buffer.read(pipeline.getReadBatchTimeoutInMillis())).thenReturn(readResult);

final Processor processor1 = mock(Processor.class);
when(processor1.holdsEvents()).thenReturn(false);
when(processor1.execute(records)).thenReturn(List.of());
when(processor1.isReadyForShutdown()).thenReturn(true);
processors = List.of(processor1);
when(source.areAcknowledgementsEnabled()).thenReturn(true);

final ProcessWorker processWorker = createObjectUnderTest();

processWorker.run();

verify(eventHandle, atLeast(1)).release(true);
}


@Test
void testProcessWorkerWithProcessorsHoldingEvents() {
EventHandle eventHandle = mock(EventHandle.class);
Event event = mock(Event.class);
Record record = mock(Record.class);
lenient().when(event.getEventHandle()).thenReturn(eventHandle);
when(record.getData()).thenReturn(event);
final List<Record> records = List.of(record);
final CheckpointState checkpointState = mock(CheckpointState.class);
final Map.Entry<Collection, CheckpointState> readResult = Map.entry(records, checkpointState);
when(buffer.read(pipeline.getReadBatchTimeoutInMillis())).thenReturn(readResult);

final Processor processor1 = mock(Processor.class);
when(processor1.holdsEvents()).thenReturn(true);
when(processor1.execute(records)).thenReturn(List.of());
when(processor1.isReadyForShutdown()).thenReturn(true);

processors = List.of(processor1);
when(source.areAcknowledgementsEnabled()).thenReturn(true);

final ProcessWorker processWorker = createObjectUnderTest();

processWorker.run();

verify(eventHandle, never()).release(true);
}

@Test
void testProcessWorkerWithProcessorThrowingExceptionIsCaughtProperly() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ class AddBinaryOperator implements Operator<Object> {
private final String displayName;
private final Map<Class<? extends Number>, Map<Class<? extends Number>, BiFunction<Object, Object, Number>>> operandsToOperationMap;


@Override
public boolean isBooleanOperator() {
return false;
}

public AddBinaryOperator(final int symbol,
final Map<Class<? extends Number>, Map<Class<? extends Number>, BiFunction<Object, Object, Number>>> operandsToOperationMap) {
this.symbol = symbol;
Expand All @@ -41,6 +47,7 @@ public Object evaluate(final Object ... args) {
checkArgument(args.length == 2, displayName + " requires operands length needs to be 2.");
final Object leftValue = args[0];
final Object rightValue = args[1];
checkArgument(leftValue != null && rightValue != null, displayName + " requires operands length needs to be non-null.");
final Class<?> leftValueClass = leftValue.getClass();
final Class<?> rightValueClass = rightValue.getClass();
if (leftValue instanceof String && rightValue instanceof String) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ class AndOperator implements Operator<Boolean> {
private static final String DISPLAY_NAME = DataPrepperExpressionParser.VOCABULARY
.getDisplayName(DataPrepperExpressionParser.AND);

@Override
public boolean isBooleanOperator() {
return true;
}

@Override
public boolean shouldEvaluate(final RuleContext ctx) {
return ctx.getRuleIndex() == DataPrepperExpressionParser.RULE_conditionalExpression;
Expand Down
Loading

0 comments on commit 6d87001

Please sign in to comment.