Skip to content

Commit

Permalink
Fix lambda metrics (opensearch-project#5201)
Browse files Browse the repository at this point in the history
* Addressed review comments.

Signed-off-by: Kondaka <[email protected]>

* Fixed test cases

Signed-off-by: Kondaka <[email protected]>

* Fixed code coverage error

Signed-off-by: Kondaka <[email protected]>

---------

Signed-off-by: Kondaka <[email protected]>
  • Loading branch information
kkondaka authored Nov 19, 2024
1 parent 8645c1e commit 72fa423
Show file tree
Hide file tree
Showing 7 changed files with 223 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@ public class PluginMetrics {

private final String metricsPrefix;

public static PluginMetrics fromPluginSetting(final PluginSetting pluginSetting) {
public static PluginMetrics fromPluginSetting(final PluginSetting pluginSetting, final String name) {
if(pluginSetting.getPipelineName() == null) {
throw new IllegalArgumentException("PluginSetting.pipelineName must not be null");
}
return PluginMetrics.fromNames(pluginSetting.getName(), pluginSetting.getPipelineName());
return PluginMetrics.fromNames(name, pluginSetting.getPipelineName());
}

public static PluginMetrics fromPluginSetting(final PluginSetting pluginSetting) {
return fromPluginSetting(pluginSetting, pluginSetting.getName());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,19 @@ public void testCounterWithMetricsPrefix() {
counter.getId().getName());
}

@Test
public void testCounterWithMetricsPrefixWithCustomMetricsName() {
final String customName = PLUGIN_NAME + "_custom";
objectUnderTest = PluginMetrics.fromPluginSetting(pluginSetting, customName);

final Counter counter = objectUnderTest.counter("counter");
assertEquals(
pluginSetting.getPipelineName() + MetricNames.DELIMITER +
customName + MetricNames.DELIMITER +
"counter",
counter.getId().getName());
}

@Test
public void testCounter() {
final Counter counter = objectUnderTest.counter("counter");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import static org.mockito.Mockito.lenient;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Timer;

import java.util.ArrayList;
Expand Down Expand Up @@ -88,11 +89,21 @@ public class LambdaProcessorSinkIT {
@Mock
private ExpressionEvaluator expressionEvaluator;
@Mock
private Counter testCounter;
private Counter numberOfRecordsSuccessCounter;
@Mock
private Counter numberOfRecordsFailedCounter;
@Mock
private Counter numberOfRequestsSuccessCounter;
@Mock
private Counter numberOfRequestsFailedCounter;
@Mock
private Counter sinkSuccessCounter;
@Mock
private Timer testTimer;
private Timer lambdaLatencyMetric;
@Mock
private DistributionSummary requestPayloadMetric;
@Mock
private DistributionSummary responsePayloadMetric;
@Mock
InvocationType invocationType;

Expand All @@ -103,21 +114,34 @@ public class LambdaProcessorSinkIT {
private AcknowledgementSet acknowledgementSet;

private LambdaProcessor createLambdaProcessor(LambdaProcessorConfig processorConfig) {
return new LambdaProcessor(pluginFactory, pluginMetrics, processorConfig, awsCredentialsSupplier, expressionEvaluator);
return new LambdaProcessor(pluginFactory, pluginSetting, processorConfig, awsCredentialsSupplier, expressionEvaluator);
}

private LambdaSink createLambdaSink(LambdaSinkConfig lambdaSinkConfig) {
return new LambdaSink(pluginSetting, lambdaSinkConfig, pluginFactory, null, awsCredentialsSupplier, expressionEvaluator);

}

private void setPrivateField(Object targetObject, String fieldName, Object value) throws Exception {
Field field = targetObject.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
field.set(targetObject, value);
}

@BeforeEach
public void setup() {
lambdaRegion = System.getProperty("tests.lambda.processor.region");
functionName = System.getProperty("tests.lambda.processor.functionName");
role = System.getProperty("tests.lambda.processor.sts_role_arn");
successCount = new AtomicLong();
numEventHandlesReleased = new AtomicLong();
numberOfRecordsSuccessCounter = mock(Counter.class);
numberOfRecordsFailedCounter = mock(Counter.class);
numberOfRequestsSuccessCounter = mock(Counter.class);
numberOfRequestsFailedCounter = mock(Counter.class);
lambdaLatencyMetric = mock(Timer.class);
requestPayloadMetric = mock(DistributionSummary.class);
responsePayloadMetric = mock(DistributionSummary.class);

acknowledgementSet = mock(AcknowledgementSet.class);
try {
Expand All @@ -132,7 +156,6 @@ public void setup() {
}).when(acknowledgementSet).release(any(EventHandle.class), any(Boolean.class));
} catch (Exception e){ }
pluginMetrics = mock(PluginMetrics.class);
when(pluginMetrics.gauge(any(), any(AtomicLong.class))).thenReturn(new AtomicLong());
sinkSuccessCounter = mock(Counter.class);
try {
lenient().doAnswer(args -> {
Expand All @@ -141,26 +164,22 @@ public void setup() {
return null;
}).when(sinkSuccessCounter).increment(any(Double.class));
} catch (Exception e){ }
testCounter = mock(Counter.class);
try {
lenient().doAnswer(args -> {
return null;
}).when(testCounter).increment(any(Double.class));
}).when(numberOfRecordsSuccessCounter).increment(any(Double.class));
} catch (Exception e){}
try {
lenient().doAnswer(args -> {
return null;
}).when(testCounter).increment();
}).when(numberOfRecordsFailedCounter).increment();
} catch (Exception e){}
try {
lenient().doAnswer(args -> {
return null;
}).when(testTimer).record(any(Long.class), any(TimeUnit.class));
}).when(lambdaLatencyMetric).record(any(Long.class), any(TimeUnit.class));
} catch (Exception e){}
when(pluginMetrics.counter(any())).thenReturn(testCounter);

testTimer = mock(Timer.class);
when(pluginMetrics.timer(any())).thenReturn(testTimer);
lambdaProcessorConfig = mock(LambdaProcessorConfig.class);
expressionEvaluator = mock(ExpressionEvaluator.class);
awsCredentialsProvider = DefaultCredentialsProvider.create();
Expand Down Expand Up @@ -212,29 +231,31 @@ public void setup() {

}

private void setPrivateField(Object targetObject, String fieldName, Object value)
throws Exception {
Field field = targetObject.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
field.set(targetObject, value);
private void setPrivateFields(final LambdaProcessor lambdaProcessor) throws Exception {
setPrivateField(lambdaProcessor, "numberOfRecordsSuccessCounter", numberOfRecordsSuccessCounter);
setPrivateField(lambdaProcessor, "numberOfRecordsFailedCounter", numberOfRecordsFailedCounter);
setPrivateField(lambdaProcessor, "numberOfRequestsSuccessCounter", numberOfRequestsSuccessCounter);
setPrivateField(lambdaProcessor, "numberOfRequestsFailedCounter", numberOfRequestsFailedCounter);
setPrivateField(lambdaProcessor, "lambdaLatencyMetric", lambdaLatencyMetric);
setPrivateField(lambdaProcessor, "requestPayloadMetric", requestPayloadMetric);
setPrivateField(lambdaProcessor, "responsePayloadMetric", responsePayloadMetric);
}

@ParameterizedTest
@ValueSource(ints = {11})
public void testLambdaProcessorAndLambdaSink(int numRecords) {
public void testLambdaProcessorAndLambdaSink(int numRecords) throws Exception {
when(invocationType.getAwsLambdaValue()).thenReturn(InvocationType.REQUEST_RESPONSE.getAwsLambdaValue());
when(lambdaProcessorConfig.getResponseEventsMatch()).thenReturn(true);
lambdaProcessor = createLambdaProcessor(lambdaProcessorConfig);
setPrivateFields(lambdaProcessor);
List<Record<Event>> records = createRecords(numRecords);

Collection<Record<Event>> results = lambdaProcessor.doExecute(records);

assertThat(results.size(), equalTo(numRecords));
validateStrictModeResults(records, results);
LambdaSink lambdaSink = createLambdaSink(lambdaSinkConfig);
try {
setPrivateField(lambdaSink, "numberOfRecordsSuccessCounter", sinkSuccessCounter);
} catch (Exception e){}
setPrivateField(lambdaSink, "numberOfRecordsSuccessCounter", sinkSuccessCounter);
lambdaSink.output(results);
assertThat(successCount.get(), equalTo((long)numRecords));
assertThat(numEventHandlesReleased.get(), equalTo((long)numRecords));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import static org.mockito.Mockito.spy;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.event.EventMetadata;
Expand Down Expand Up @@ -43,9 +42,12 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.lenient;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Timer;
import software.amazon.awssdk.services.lambda.model.InvokeResponse;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -71,40 +73,78 @@ public class LambdaProcessorIT {
@Mock
private PluginFactory pluginFactory;
@Mock
private PluginMetrics pluginMetrics;
private PluginSetting pluginSetting;
@Mock
private ExpressionEvaluator expressionEvaluator;
@Mock
private Counter testCounter;
private Counter numberOfRecordsSuccessCounter;
@Mock
private Counter numberOfRecordsFailedCounter;
@Mock
private Counter numberOfRequestsSuccessCounter;
@Mock
private Counter numberOfRequestsFailedCounter;
@Mock
private Timer testTimer;
private Counter sinkSuccessCounter;
@Mock
private Timer lambdaLatencyMetric;
@Mock
private DistributionSummary requestPayloadMetric;
@Mock
private DistributionSummary responsePayloadMetric;
@Mock
InvocationType invocationType;
private LambdaProcessor createObjectUnderTest(LambdaProcessorConfig processorConfig) {
return new LambdaProcessor(pluginFactory, pluginMetrics, processorConfig, awsCredentialsSupplier, expressionEvaluator);
return new LambdaProcessor(pluginFactory, pluginSetting, processorConfig, awsCredentialsSupplier, expressionEvaluator);
}

private void setPrivateField(Object targetObject, String fieldName, Object value) throws Exception {
Field field = targetObject.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
field.set(targetObject, value);
}

private void setPrivateFields(final LambdaProcessor lambdaProcessor) throws Exception {
setPrivateField(lambdaProcessor, "numberOfRecordsSuccessCounter", numberOfRecordsSuccessCounter);
setPrivateField(lambdaProcessor, "numberOfRecordsFailedCounter", numberOfRecordsFailedCounter);
setPrivateField(lambdaProcessor, "numberOfRequestsSuccessCounter", numberOfRequestsSuccessCounter);
setPrivateField(lambdaProcessor, "numberOfRequestsFailedCounter", numberOfRequestsFailedCounter);
setPrivateField(lambdaProcessor, "lambdaLatencyMetric", lambdaLatencyMetric);
setPrivateField(lambdaProcessor, "requestPayloadMetric", requestPayloadMetric);
setPrivateField(lambdaProcessor, "responsePayloadMetric", responsePayloadMetric);
}

@BeforeEach
public void setup() {
lambdaRegion = System.getProperty("tests.lambda.processor.region");
functionName = System.getProperty("tests.lambda.processor.functionName");
role = System.getProperty("tests.lambda.processor.sts_role_arn");
pluginMetrics = mock(PluginMetrics.class);
//when(pluginMetrics.gauge(any(), any(AtomicLong.class))).thenReturn(new AtomicLong());
//testCounter = mock(Counter.class);
pluginSetting = mock(PluginSetting.class);
when(pluginSetting.getPipelineName()).thenReturn("pipeline");
when(pluginSetting.getName()).thenReturn("name");
numberOfRecordsSuccessCounter = mock(Counter.class);
numberOfRecordsFailedCounter = mock(Counter.class);
numberOfRequestsSuccessCounter = mock(Counter.class);
numberOfRequestsFailedCounter = mock(Counter.class);
lambdaLatencyMetric = mock(Timer.class);
requestPayloadMetric = mock(DistributionSummary.class);
responsePayloadMetric = mock(DistributionSummary.class);
try {
lenient().doAnswer(args -> {
return null;
}).when(numberOfRecordsSuccessCounter).increment(any(Double.class));
} catch (Exception e){}
try {
lenient().doAnswer(args -> {
return null;
}).when(testCounter).increment(any(Double.class));
}).when(numberOfRecordsFailedCounter).increment();
} catch (Exception e){}
try {
lenient().doAnswer(args -> {
return null;
}).when(testTimer).record(any(Long.class), any(TimeUnit.class));
}).when(lambdaLatencyMetric).record(any(Long.class), any(TimeUnit.class));
} catch (Exception e){}
when(pluginMetrics.counter(any())).thenReturn(testCounter);
testTimer = mock(Timer.class);
when(pluginMetrics.timer(any())).thenReturn(testTimer);

lambdaProcessorConfig = mock(LambdaProcessorConfig.class);
expressionEvaluator = mock(ExpressionEvaluator.class);
awsCredentialsProvider = DefaultCredentialsProvider.create();
Expand Down Expand Up @@ -166,7 +206,7 @@ public void testRequestResponseWithMatchingEventsAggregateMode(int numRecords) {

@ParameterizedTest
@ValueSource(ints = {1000})
public void testRequestResponse_WithMatchingEvents_StrictMode_WithMultipleThreads(int numRecords) throws InterruptedException {
public void testRequestResponse_WithMatchingEvents_StrictMode_WithMultipleThreads(int numRecords) throws Exception {
when(invocationType.getAwsLambdaValue()).thenReturn(InvocationType.REQUEST_RESPONSE.getAwsLambdaValue());
when(lambdaProcessorConfig.getResponseEventsMatch()).thenReturn(true);
lambdaProcessor = createObjectUnderTest(lambdaProcessorConfig);
Expand All @@ -191,7 +231,7 @@ public void testRequestResponse_WithMatchingEvents_StrictMode_WithMultipleThread

@ParameterizedTest
@ValueSource(strings = {"RequestResponse", "Event"})
public void testDifferentInvocationTypes(String invocationType) {
public void testDifferentInvocationTypes(String invocationType) throws Exception {
when(this.invocationType.getAwsLambdaValue()).thenReturn(invocationType);
when(lambdaProcessorConfig.getResponseEventsMatch()).thenReturn(true);
lambdaProcessor = createObjectUnderTest(lambdaProcessorConfig);
Expand All @@ -207,7 +247,7 @@ public void testDifferentInvocationTypes(String invocationType) {
}

@Test
public void testWithFailureTags() {
public void testWithFailureTags() throws Exception {
when(invocationType.getAwsLambdaValue()).thenReturn(InvocationType.REQUEST_RESPONSE.getAwsLambdaValue());
when(lambdaProcessorConfig.getResponseEventsMatch()).thenReturn(false);
when(lambdaProcessorConfig.getTagsOnFailure()).thenReturn(Collections.singletonList("lambda_failure"));
Expand Down
Loading

0 comments on commit 72fa423

Please sign in to comment.