Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][fn] Record Pulsar Function processing time properly for asynchronous functions #23811

Open
wants to merge 40 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
ef36d8d
delete processTimeStart method in ComponentStatsManager
walkinggo Jan 6, 2025
6f9bffa
change processTimeEnd method in ComponentStatsManager
walkinggo Jan 6, 2025
b4435dd
add startTime in JavaExecutionResult and process endTime in handleResult
walkinggo Jan 6, 2025
6c4c5a1
add field JavaExecutionResult in AsyncFuncRequest and use JavaExecuti…
walkinggo Jan 6, 2025
a776932
use the same executionResult in non asyncPreserveInputOrderForOutputM…
walkinggo Jan 6, 2025
1749db7
fix the bug that do not set result
walkinggo Jan 7, 2025
493c364
fix the bug that do not set exception
walkinggo Jan 7, 2025
0344726
add test case
walkinggo Jan 7, 2025
21816d8
remove LongSupplier timeSupplier
walkinggo Jan 8, 2025
1d34566
remove useless import
walkinggo Jan 8, 2025
ae9bbc6
remove test case
walkinggo Jan 8, 2025
7aba620
pass the start time as parameter and remove result from AsyncFuncRequest
walkinggo Jan 9, 2025
d2db727
Revert "pass the start time as parameter and remove result from Async…
walkinggo Jan 10, 2025
81f2d4b
add bugsExclude to remove EI_EXPOSE_REP
walkinggo Jan 10, 2025
6ae6819
add testFunctionAsyncTime in JavaInstanceRunnableTest
walkinggo Jan 12, 2025
86b4d41
add test in PulsarFunctionE2ETest
walkinggo Jan 13, 2025
c0994d0
Modify the test case to capture the running time of `processTimeEnd`.
walkinggo Jan 13, 2025
3895bd1
Update pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctio…
walkinggo Jan 22, 2025
94454bc
Update pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctio…
walkinggo Jan 22, 2025
659d57c
Update pulsar-functions/instance/src/test/java/org/apache/pulsar/func…
walkinggo Jan 22, 2025
312245d
unformat code in JavaInstanceRunnable
walkinggo Jan 22, 2025
0c2f1f9
unformat code in JavaInstanceRunnable
walkinggo Jan 22, 2025
b958524
unformat code in PulsarFunctionE2ETest
walkinggo Jan 22, 2025
162bf20
unformat code in PulsarFunctionE2ETest
walkinggo Jan 22, 2025
6625e88
unformat code in PulsarFunctionE2ETest
walkinggo Jan 22, 2025
8d0ae62
unformat code in JavaInstance
walkinggo Jan 22, 2025
ed315b4
unformat code in ComponentStatsManager
walkinggo Jan 22, 2025
9910dd2
unformat code in SinkStatsManager
walkinggo Jan 22, 2025
56881a4
unformat code in SourceStatsManager and JavaInstanceRunnableTest
walkinggo Jan 22, 2025
cb34cf9
unformat code in JavaInstanceRunnableTest
walkinggo Jan 22, 2025
ae3ab27
unformat code in JavaInstanceTest
walkinggo Jan 22, 2025
44c8b11
unformat code in JavaInstanceTest
walkinggo Jan 22, 2025
99af7cb
unformat code in JavaInstanceRunnableTest
walkinggo Jan 22, 2025
5d74461
set stats as setMethod in JavaInstanceRunnable
walkinggo Jan 22, 2025
17ca085
use LF
walkinggo Jan 22, 2025
5c9a150
use LF
walkinggo Jan 22, 2025
be533fb
use LF
walkinggo Jan 22, 2025
2d1571b
unformat code in JavaInstanceRunnableTest
walkinggo Jan 22, 2025
4c82591
remove SpendTimeFunction class in PulsarFunctionE2ETest
walkinggo Jan 22, 2025
d1dd15b
remove unused import in JavaInstanceRunnableTest
walkinggo Jan 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
walkinggo marked this conversation as resolved.
Show resolved Hide resolved

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
public class JavaExecutionResult {
private Throwable userException;
private Object result;
private final long startTime = System.nanoTime();

public void reset() {
setUserException(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class JavaInstance implements AutoCloseable {
public static class AsyncFuncRequest {
private final Record record;
private final CompletableFuture processResult;
private final JavaExecutionResult result;
}

@Getter(AccessLevel.PACKAGE)
Expand Down Expand Up @@ -136,7 +137,7 @@ public JavaExecutionResult handleMessage(Record<?> record, Object input,
if (asyncPreserveInputOrderForOutputMessages) {
// Function is in format: Function<I, CompletableFuture<O>>
AsyncFuncRequest request = new AsyncFuncRequest(
record, (CompletableFuture) output
record, (CompletableFuture) output, executionResult
);
pendingAsyncRequests.put(request);
} else {
Expand All @@ -148,13 +149,12 @@ public JavaExecutionResult handleMessage(Record<?> record, Object input,
processAsyncResultsInInputOrder(asyncResultConsumer);
} else {
try {
JavaExecutionResult execResult = new JavaExecutionResult();
if (cause != null) {
execResult.setUserException(FutureUtil.unwrapCompletionException(cause));
executionResult.setUserException(FutureUtil.unwrapCompletionException(cause));
} else {
execResult.setResult(res);
executionResult.setResult(res);
}
asyncResultConsumer.accept(record, execResult);
asyncResultConsumer.accept(record, executionResult);
} finally {
asyncRequestsConcurrencyLimiter.release();
}
Expand Down Expand Up @@ -187,16 +187,14 @@ private void processAsyncResultsInInputOrder(JavaInstanceRunnable.AsyncResultCon
while (asyncResult != null && asyncResult.getProcessResult().isDone()) {
pendingAsyncRequests.remove(asyncResult);

JavaExecutionResult execResult = new JavaExecutionResult();
JavaExecutionResult execResult = asyncResult.getResult();
try {
Object result = asyncResult.getProcessResult().get();
execResult.setResult(result);
} catch (ExecutionException e) {
execResult.setUserException(FutureUtil.unwrapCompletionException(e));
}

resultConsumer.accept(asyncResult.getRecord(), execResult);

// peek the next result
asyncResult = pendingAsyncRequests.peek();
}
Expand Down
walkinggo marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ public JavaInstanceRunnable(InstanceConfig instanceConfig,
this.secretsProvider = secretsProvider;
this.componentClassLoader = componentClassLoader;
this.functionClassLoader = transformFunctionClassLoader != null
? transformFunctionClassLoader
: componentClassLoader;
? transformFunctionClassLoader
: componentClassLoader;
this.metricsLabels = new String[]{
instanceConfig.getFunctionDetails().getTenant(),
String.format("%s/%s", instanceConfig.getFunctionDetails().getTenant(),
Expand Down Expand Up @@ -235,7 +235,7 @@ private synchronized void setup() throws Exception {
ThreadContext.put("instance", instanceConfig.getInstanceName());

log.info("Starting Java Instance {} : \n Details = {}",
instanceConfig.getFunctionDetails().getName(), instanceConfig.getFunctionDetails());
instanceConfig.getFunctionDetails().getName(), instanceConfig.getFunctionDetails());

Object object;
if (instanceConfig.getFunctionDetails().getClassName()
Expand Down Expand Up @@ -293,8 +293,8 @@ ContextImpl setupContext() throws PulsarClientException {
try {
Thread.currentThread().setContextClassLoader(functionClassLoader);
return new ContextImpl(instanceConfig, instanceLog, client, secretsProvider,
collectorRegistry, metricsLabels, this.componentType, this.stats, stateManager,
pulsarAdmin, clientBuilder, fatalHandler, producerCache);
collectorRegistry, metricsLabels, this.componentType, this.stats, stateManager,
pulsarAdmin, clientBuilder, fatalHandler, producerCache);
} finally {
Thread.currentThread().setContextClassLoader(clsLoader);
}
Expand Down Expand Up @@ -334,8 +334,6 @@ public void run() {
// set last invocation time
stats.setLastInvocation(System.currentTimeMillis());

// start time for process latency stat
stats.processTimeStart();

// process the message
Thread.currentThread().setContextClassLoader(functionClassLoader);
Expand All @@ -346,9 +344,6 @@ public void run() {
asyncErrorHandler);
Thread.currentThread().setContextClassLoader(instanceClassLoader);

// register end time
stats.processTimeEnd();

if (result != null) {
// process the synchronous results
handleResult(currentRecord, result);
Expand Down Expand Up @@ -401,9 +396,9 @@ private void setupStateStore() throws Exception {
stateStoreProvider.init(stateStoreProviderConfig);

StateStore store = stateStoreProvider.getStateStore(
instanceConfig.getFunctionDetails().getTenant(),
instanceConfig.getFunctionDetails().getNamespace(),
instanceConfig.getFunctionDetails().getName()
instanceConfig.getFunctionDetails().getTenant(),
instanceConfig.getFunctionDetails().getNamespace(),
instanceConfig.getFunctionDetails().getName()
);
StateStoreContext context = new StateStoreContextImpl();
store.init(context);
Expand Down Expand Up @@ -448,6 +443,8 @@ void handleResult(Record srcRecord, JavaExecutionResult result) throws Exception
// increment total successfully processed
stats.incrTotalProcessedSuccessfully();
}
// handle endTime here
stats.processTimeEnd(result.getStartTime());
}

private void sendOutputMessage(Record srcRecord, Object output) throws Exception {
Expand Down Expand Up @@ -513,7 +510,7 @@ private OutputRecordSinkRecord encodeWithRecordSchemaAndDecodeWithSinkSchema(Rec
if (isKeyValueSeparated && schema instanceof KeyValueSchema) {
KeyValueSchema<?, ?> kvSchema = (KeyValueSchema<?, ?>) schema;
finalSchema = KeyValueSchemaImpl.of(kvSchema.getKeySchema(), kvSchema.getValueSchema(),
KeyValueEncodingType.SEPARATED);
KeyValueEncodingType.SEPARATED);
} else {
finalSchema = schema;
}
Expand Down Expand Up @@ -548,7 +545,7 @@ record = this.source.read();

/**
* NOTE: this method is be synchronized because it is potentially called by two different places
* one inside the run/finally clause and one inside the ThreadRuntime::stop.
* one inside the run/finally clause and one inside the ThreadRuntime::stop.
*/
@Override
public synchronized void close() {
Expand Down Expand Up @@ -678,8 +675,8 @@ private InstanceCommunication.MetricsData internalGetMetrics() {
}

private void internalResetMetrics() {
stats.reset();
javaInstance.resetMetrics();
stats.reset();
javaInstance.resetMetrics();
}

private Builder createMetricsDataBuilder() {
Expand Down Expand Up @@ -834,7 +831,7 @@ private void setupInput(ContextImpl contextImpl) throws Exception {
);

pulsarSourceConfig.setSkipToLatest(
sourceSpec.getSkipToLatest()
sourceSpec.getSkipToLatest()
);

Objects.requireNonNull(contextImpl.getSubscriptionType());
Expand Down Expand Up @@ -874,12 +871,12 @@ private void setupInput(ContextImpl contextImpl) throws Exception {
// check if source is a batch source
if (sourceSpec.getClassName().equals(BatchSourceExecutor.class.getName())) {
object = Reflections.createInstance(
sourceSpec.getClassName(),
this.instanceClassLoader);
sourceSpec.getClassName(),
this.instanceClassLoader);
} else {
object = Reflections.createInstance(
sourceSpec.getClassName(),
this.componentClassLoader);
sourceSpec.getClassName(),
this.componentClassLoader);
}
}

Expand Down Expand Up @@ -911,8 +908,9 @@ private void setupInput(ContextImpl contextImpl) throws Exception {
/**
* Recursively interpolate configured secrets into the config map by calling
* {@link SecretsProvider#interpolateSecretForValue(String)}.
*
* @param secretsProvider - the secrets provider that will convert secret's values into config values.
* @param configs - the connector configuration map, which will be mutated.
* @param configs - the connector configuration map, which will be mutated.
*/
private static void interpolateSecretsIntoConfigs(SecretsProvider secretsProvider,
Map<String, Object> configs) {
Expand All @@ -939,12 +937,13 @@ static Map<String, Object> augmentAndFilterConnectorConfig(String connectorConfi
SecretsProvider secretsProvider,
ClassLoader componentClassLoader,
org.apache.pulsar.functions.proto.Function
.FunctionDetails.ComponentType componentType)
.FunctionDetails.ComponentType componentType)
throws IOException {
final Map<String, Object> config = connectorConfigs.isEmpty() ? new HashMap<>() : ObjectMapperFactory
.getMapper()
.reader()
.forType(new TypeReference<Map<String, Object>>() {})
.forType(new TypeReference<Map<String, Object>>() {
})
.readValue(connectorConfigs);
if (componentType != org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK
&& componentType != org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) {
Expand All @@ -959,7 +958,7 @@ static Map<String, Object> augmentAndFilterConnectorConfig(String connectorConfi
configClassName = ConnectorUtils
.getConnectorDefinition((NarClassLoader) componentClassLoader).getSourceConfigClass();
} else {
configClassName = ConnectorUtils
configClassName = ConnectorUtils
.getConnectorDefinition((NarClassLoader) componentClassLoader).getSinkConfigClass();
}
if (configClassName != null) {
Expand Down Expand Up @@ -1105,7 +1104,7 @@ private static <T> Schema<T> getSinkSchema(Record<?> record, Class<T> clazz) {

case AVRO:
return AvroSchema.of(SchemaDefinition.<T>builder()
.withPojo(clazz).build());
.withPojo(clazz).build());

case JSON:
return JSONSchema.of(SchemaDefinition.<T>builder().withPojo(clazz).build());
Expand All @@ -1131,8 +1130,8 @@ private static SchemaType getSchemaTypeOrDefault(Record<?> record, Class<?> claz
if (GenericObject.class.isAssignableFrom(clazz)) {
return SchemaType.AUTO_CONSUME;
} else if (byte[].class.equals(clazz)
|| ByteBuf.class.equals(clazz)
|| ByteBuffer.class.equals(clazz)) {
|| ByteBuf.class.equals(clazz)
|| ByteBuffer.class.equals(clazz)) {
// if sink uses bytes, we should ignore
return SchemaType.NONE;
} else {
Expand All @@ -1151,8 +1150,8 @@ private static SchemaType getSchemaTypeOrDefault(Record<?> record, Class<?> claz

private static SchemaType getDefaultSchemaType(Class<?> clazz) {
if (byte[].class.equals(clazz)
|| ByteBuf.class.equals(clazz)
|| ByteBuffer.class.equals(clazz)) {
|| ByteBuf.class.equals(clazz)
|| ByteBuffer.class.equals(clazz)) {
return SchemaType.NONE;
} else if (GenericObject.class.isAssignableFrom(clazz)) {
// the sink is taking generic record/object, so we do auto schema detection
Expand Down
walkinggo marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ public abstract class ComponentStatsManager implements AutoCloseable {
}

public static ComponentStatsManager getStatsManager(FunctionCollectorRegistry collectorRegistry,
String[] metricsLabels,
ScheduledExecutorService scheduledExecutorService,
Function.FunctionDetails.ComponentType componentType) {
String[] metricsLabels,
ScheduledExecutorService scheduledExecutorService,
Function.FunctionDetails.ComponentType componentType) {
switch (componentType) {
case FUNCTION:
return new FunctionStatsManager(collectorRegistry, metricsLabels, scheduledExecutorService);
Expand Down Expand Up @@ -100,9 +100,8 @@ public ComponentStatsManager(FunctionCollectorRegistry collectorRegistry,

public abstract void setLastInvocation(long ts);

public abstract void processTimeStart();

public abstract void processTimeEnd();
public abstract void processTimeEnd(long startTime);

public abstract double getTotalProcessedSuccessfully();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,20 +336,13 @@ public void setLastInvocation(long ts) {
statlastInvocationChild.set(ts);
}

private Long processTimeStart;

@Override
public void processTimeStart() {
processTimeStart = System.nanoTime();
}

@Override
public void processTimeEnd() {
if (processTimeStart != null) {
double endTimeMs = ((double) System.nanoTime() - processTimeStart) / 1.0E6D;
public void processTimeEnd(long startTime) {
double endTimeMs = ((double) System.nanoTime() - startTime) / 1.0E6D;
statProcessLatencyChild.observe(endTimeMs);
statProcessLatency1minChild.observe(endTimeMs);
}
}

@Override
Expand Down
Loading
Loading