Skip to content

Commit

Permalink
Fix AsyncHandlingAutoCommitKafkaConsumerTest tests.
Browse files Browse the repository at this point in the history
Tests were failing with Kafka 3.6.1 because of changes to
the MockConsumer.rebalance() method.
The rebalance-related tests have been changed to now
work with the MockConsumer.rebalance() method (removing the
Hono KafkaMockConsumer override of that method).
  • Loading branch information
calohmn authored and sophokles73 committed May 22, 2024
1 parent 16dddde commit 157836f
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 140 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2021, 2022 Contributors to the Eclipse Foundation
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -63,9 +63,10 @@
* completed gets committed. This prevents incompletely handled records and thereby enables at-least-once semantics.
* <p>
* In terms of when offsets are committed, the behaviour is similar to the one used for a consumer with
* <em>enable.auto.commit</em>. Commits are done periodically (using <em>commitAsync</em>) and when a rebalance
* happens or the consumer is stopped (using <em>commitSync</em>). The periodic commit interval is defined via
* the standard <em>auto.commit.interval.ms</em> configuration property.
* <em>enable.auto.commit</em>. Commits are done periodically (using <em>commitAsync</em>), and they are also done
* (using <em>commitSync</em>) when the consumer is stopped or a rebalance happens (with eager rebalancing
* or a non-empty revoked partitions list).
* The periodic commit interval is defined via the standard <em>auto.commit.interval.ms</em> configuration property.
* <p>
* In order to not fall behind with the position of the committed offset vs. the last received offset, users of this
* class have to make sure that the record handling function, which provides the completion Future, is completed in time.
Expand All @@ -90,7 +91,7 @@
* still done) but record fetching from all assigned topic partitions is suspended until the throttling threshold is
* reached again.
* The overall limit, i.e. the maximum number of incomplete record handler result futures at a given point in time, is
* calculated from the above mentioned throttling threshold plus the maximum number of records per poll operation.
* calculated from the above-mentioned throttling threshold plus the maximum number of records per poll operation.
*
* @param <V> The type of record payload this consumer can process.
*/
Expand Down Expand Up @@ -653,7 +654,7 @@ public OffsetsQueueEntry addOffset(final long offset) {
* the 'skipOffsetRecommitPeriodSeconds'. Note that for the actual commit, {@code 1} has to be added to the
* returned value.
* <p>
* Otherwise an empty Optional is returned.
* Otherwise, an empty Optional is returned.
*
* @return The offset wrapped in an Optional or an empty Optional if no offset commit is needed.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2021, 2023 Contributors to the Eclipse Foundation
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -720,7 +720,8 @@ public void onPartitionsAssigned(final Collection<org.apache.kafka.common.TopicP
// invoked on the Kafka polling thread, not the event loop thread!
final Set<TopicPartition> partitionsSet = Helper.from(partitions);
if (LOG.isDebugEnabled()) {
LOG.debug("partitions assigned: [{}]", HonoKafkaConsumerHelper.getPartitionsDebugString(partitions));
LOG.debug("partitions assigned: [{}] [client-id: {}]",
HonoKafkaConsumerHelper.getPartitionsDebugString(partitions), getClientId());
}
ensurePositionsHaveBeenSetIfNeeded(partitionsSet);
updateSubscribedTopicPatternTopicsAndRemoveMetrics();
Expand All @@ -744,7 +745,8 @@ public void onPartitionsRevoked(final Collection<org.apache.kafka.common.TopicPa
// invoked on the Kafka polling thread, not the event loop thread!
final Set<TopicPartition> partitionsSet = Helper.from(partitions);
if (LOG.isDebugEnabled()) {
LOG.debug("partitions revoked: [{}]", HonoKafkaConsumerHelper.getPartitionsDebugString(partitions));
LOG.debug("partitions revoked: [{}] [client-id: {}]",
HonoKafkaConsumerHelper.getPartitionsDebugString(partitions), getClientId());
}
onPartitionsRevokedBlocking(partitionsSet);
context.runOnContext(v -> HonoKafkaConsumer.this.onPartitionsRevoked(partitionsSet));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,15 +331,18 @@ public void testConsumerCommitsOffsetsOnRebalance(final VertxTestContext ctx) th
consumer = new AsyncHandlingAutoCommitKafkaConsumer<>(vertx, Set.of(TOPIC), handler, consumerConfig);
consumer.setKafkaConsumerSupplier(() -> mockConsumer);
consumer.addOnKafkaConsumerReadyHandler(readyTracker);
consumer.start()
.compose(ok -> readyTracker.future())
.onComplete(ctx.succeeding(v2 -> {
mockConsumer.schedulePollTask(() -> {
IntStream.range(0, numTestRecords).forEach(offset -> {
mockConsumer.addRecord(new ConsumerRecord<>(TOPIC, PARTITION, offset, "key_" + offset, Buffer.buffer()));
});
});
}));
final Context consumerVertxContext = vertx.getOrCreateContext();
consumerVertxContext.runOnContext(v -> {
consumer.start()
.compose(ok -> readyTracker.future())
.onComplete(ctx.succeeding(v2 -> {
mockConsumer.schedulePollTask(() -> {
IntStream.range(0, numTestRecords).forEach(offset -> {
mockConsumer.addRecord(new ConsumerRecord<>(TOPIC, PARTITION, offset, "key_" + offset, Buffer.buffer()));
});
});
}));
});
assertWithMessage("records received in 5s")
.that(receivedRecordsCtx.awaitCompletion(5, TimeUnit.SECONDS))
.isTrue();
Expand All @@ -356,66 +359,23 @@ public void testConsumerCommitsOffsetsOnRebalance(final VertxTestContext ctx) th
final AtomicInteger latestFullyHandledOffset = new AtomicInteger(1);
recordsHandlingPromiseMap.get(4L).complete();

// define VertxTestContexts for 3 checks (3x rebalance/commit)
final var checkIndex = new AtomicInteger(0);
final var commitCheckContexts = IntStream.range(0, 3)
.mapToObj(i -> new VertxTestContext()).toList();
final var commitCheckpoints = commitCheckContexts.stream()
.map(c -> c.laxCheckpoint(1)).toList();
final InterruptableSupplier<Boolean> waitForCurrentCommitCheckResult = () -> {
final var checkContext = commitCheckContexts.get(checkIndex.get());
assertWithMessage("partition assigned in 5s for checking of commits")
.that(checkContext.awaitCompletion(5, TimeUnit.SECONDS))
.isTrue();
if (checkContext.failed()) {
ctx.failNow(checkContext.causeOfFailure());
return false;
}
return true;
};
final Checkpoint commitCheckDone = ctx.checkpoint(1);

consumer.setOnPartitionsAssignedHandler(partitions -> {
LOG.debug("onPartitionsAssignedHandler invoked [check index: {}, newly assigned partitions: {}]",
checkIndex.get(), partitions.stream().map(t -> t.toString()).collect(Collectors.joining(", ")));
final var committedPartitions = mockConsumer.committed(Set.of(TOPIC_PARTITION));
final var offsetAndMetadata = committedPartitions.get(TOPIC_PARTITION);
LOG.debug("committed partition [name: {}, offset: {}, expected offset: {}]",
TOPIC_PARTITION, offsetAndMetadata.offset(), latestFullyHandledOffset.get() + 1L);
ctx.verify(() -> {
assertThat(offsetAndMetadata).isNotNull();
// assertThat(offsetAndMetadata.offset()).isEqualTo(latestFullyHandledOffset.get() + 1L);
});
if (offsetAndMetadata.offset() == latestFullyHandledOffset.get() + 1L) {
commitCheckpoints.get(checkIndex.get()).flag();
if (!partitions.isEmpty()) {
final var committedPartitions = mockConsumer.committed(Set.of(TOPIC_PARTITION));
final var offsetAndMetadata = committedPartitions.get(TOPIC_PARTITION);
ctx.verify(() -> {
assertThat(offsetAndMetadata).isNotNull();
assertThat(offsetAndMetadata.offset()).isEqualTo(latestFullyHandledOffset.get() + 1L);
});
commitCheckDone.flag();
}
});
// now force a rebalance which should trigger the above onPartitionsAssignedHandler
LOG.debug("force rebalance 1");
mockConsumer.rebalance(List.of(TOPIC_PARTITION));
if (!waitForCurrentCommitCheckResult.get()) {
return;
}
checkIndex.incrementAndGet();

// now another rebalance (ie. commit trigger) - no change in offsets
LOG.debug("force rebalance 2");
mockConsumer.rebalance(List.of(TOPIC_PARTITION));
if (!waitForCurrentCommitCheckResult.get()) {
return;
}
checkIndex.incrementAndGet();

// now complete some more promises
recordsHandlingPromiseMap.get(2L).complete();
recordsHandlingPromiseMap.get(3L).complete();
// offset 4 already complete
latestFullyHandledOffset.set(4);
// again rebalance/commit
LOG.debug("force rebalance 3");
LOG.debug("force rebalance");
mockConsumer.rebalance(List.of());
mockConsumer.rebalance(List.of(TOPIC_PARTITION));
if (waitForCurrentCommitCheckResult.get()) {
ctx.completeNow();
}
}

/**
Expand Down Expand Up @@ -448,6 +408,8 @@ public void testConsumerCommitsOffsetsOnRebalanceAfterWaitingForRecordCompletion

mockConsumer.updateBeginningOffsets(Map.of(TOPIC_PARTITION, 0L));
mockConsumer.updateEndOffsets(Map.of(TOPIC_PARTITION, 0L));
mockConsumer.updateBeginningOffsets(Map.of(TOPIC2_PARTITION, 0L));
mockConsumer.updateEndOffsets(Map.of(TOPIC2_PARTITION, 0L));
mockConsumer.updatePartitions(TOPIC_PARTITION, KafkaMockConsumer.DEFAULT_NODE);
mockConsumer.setRebalancePartitionAssignmentAfterSubscribe(List.of(TOPIC_PARTITION));
final AtomicReference<Handler<Void>> onNextPartitionsRevokedBlockingHandlerRef = new AtomicReference<>();
Expand Down Expand Up @@ -483,38 +445,59 @@ protected void onPartitionsRevokedBlocking(
ctx.failNow(receivedRecordsCtx.causeOfFailure());
return;
}
// records received, complete the handling of all except the first 2 records
LOG.debug("all records received, complete the handling of all except the first 2 records");
LongStream.range(2, numTestRecords).forEach(offset -> recordsHandlingPromiseMap.get(offset).complete());
ctx.verify(() -> assertThat(recordsHandlingPromiseMap.get(1L).future().isComplete()).isFalse());
ctx.verify(() -> {
LongStream.range(0, 2).forEach(offset -> {
assertThat(recordsHandlingPromiseMap.get(offset).future().isComplete()).isFalse();
});
});
final Checkpoint commitCheckDone = ctx.checkpoint(1);

// partitions revoked handler shall get called after the blocking partitions-revoked handling has waited for the records to be marked as completed
consumer.setOnPartitionsRevokedHandler(s -> {
ctx.verify(() -> assertThat(recordsHandlingPromiseMap.get(1L).future().isComplete()).isTrue());
// (3) this partitions revoked handler is called after the blocking partitions-revoked handling (2)
// has waited for the records to be marked as completed and after the offsets were committed
// (we can't check for committed offsets of the just revoked partition here because
// mockConsumer.committed() only returns offsets of assigned partitions)
ctx.verify(() -> {
LongStream.range(0, 2).forEach(offset -> {
assertThat(recordsHandlingPromiseMap.get(offset).future().isComplete()).isTrue();
});
});
});
final Checkpoint commitCheckDone = ctx.laxCheckpoint(1);
consumer.setOnPartitionsAssignedHandler(partitions -> {
final var committed = mockConsumer.committed(Set.of(TOPIC_PARTITION));
final var offsetAndMetadata = committed.get(TOPIC_PARTITION);
if (partitions.isEmpty()) {
// (4) ignore if invoked when all partitions got revoked (1); only the subsequent invocation with assigned partitions is relevant
return;
}
// (5) ensure all offsets were committed
final var committedOffsets = mockConsumer.committed(Set.of(TOPIC_PARTITION));
LOG.debug("committed partition offsets: {}", committedOffsets);
final var offsetAndMetadata = committedOffsets.get(TOPIC_PARTITION);
ctx.verify(() -> {
assertThat(offsetAndMetadata).isNotNull();
assertThat(offsetAndMetadata.offset()).isEqualTo(numTestRecords);
});
if (offsetAndMetadata.offset() == numTestRecords) {
commitCheckDone.flag();
}
commitCheckDone.flag();
});
// trigger a rebalance where the currently assigned partition is revoked
// (and then assigned again - otherwise its offset wouldn't be returned by mockConsumer.committed())
// the remaining 2 records are to be marked as completed with some delay

onNextPartitionsRevokedBlockingHandlerRef.set(v -> {
// (2) handler to complete the remaining record handling promises (on the Kafka polling thread; invoked before the OnPartitionsRevokedHandler)
consumerVertxContext.runOnContext(v2 -> {
LOG.debug("complete remaining record handling promises");
recordsHandlingPromiseMap.get(0L).complete();
recordsHandlingPromiseMap.get(1L).complete();
});
// trigger another rebalance; this time the partition is assigned again;
// this means we can then (see (5)) check the committed offsets (only available from MockConsumer for currently assigned partitions)
mockConsumer.setNextPollRebalancePartitionAssignment(List.of(TOPIC_PARTITION, TOPIC2_PARTITION));
});
mockConsumer.setRevokeAllOnRebalance(true);
mockConsumer.updateBeginningOffsets(Map.of(TOPIC2_PARTITION, 0L));
mockConsumer.updateEndOffsets(Map.of(TOPIC2_PARTITION, 0L));
mockConsumer.setNextPollRebalancePartitionAssignment(List.of(TOPIC_PARTITION, TOPIC2_PARTITION));
// (1) Trigger a rebalance where the currently assigned partition is revoked (via mockConsumer.setNextPollRebalancePartitionAssignment(List.of())).
// Since the records at offsets 0 and 1 are not yet completely handled here, the revocation logic will
// wait some time (up to 300ms by default) for the handling to be marked as completed, until committing offsets of completed records.
// The wait time will not be long here, since the blocking partitions-revoked handler above (2) will complete
// the remaining 2 record-handling promises shortly after the partition revocation (via consumerVertxContext.runOnContext()).
mockConsumer.setNextPollRebalancePartitionAssignment(List.of());
}

/**
Expand Down Expand Up @@ -674,7 +657,7 @@ public void testConsumerCommitsInitialOffset(final VertxTestContext ctx) throws
mockConsumer.setRebalancePartitionAssignmentAfterSubscribe(List.of(TOPIC_PARTITION));

final VertxTestContext consumerStartedCtx = new VertxTestContext();
final Checkpoint consumerStartedCheckpoint = consumerStartedCtx.laxCheckpoint(2);
final Checkpoint consumerStartedCheckpoint = consumerStartedCtx.checkpoint(2);
consumer = new AsyncHandlingAutoCommitKafkaConsumer<>(vertx, Set.of(TOPIC), handler, consumerConfig);
consumer.setKafkaConsumerSupplier(() -> mockConsumer);
consumer.setOnRebalanceDoneHandler(s -> {
Expand Down Expand Up @@ -816,7 +799,7 @@ public void testScenarioWithPartitionRevokedWhileHandlingIncomplete(final VertxT
recordsHandlingPromiseMap.get(1L).complete();
recordsHandlingPromiseMap.get(2L).complete();

final Checkpoint commitCheckDone = ctx.laxCheckpoint(1);
final Checkpoint commitCheckDone = ctx.checkpoint(1);
consumer.setOnPartitionsAssignedHandler(partitions -> {
LOG.info("rebalancing ...");
final Map<TopicPartition, OffsetAndMetadata> committed = mockConsumer.committed(Set.of(TOPIC_PARTITION));
Expand Down Expand Up @@ -895,13 +878,15 @@ protected void onRecordHandlerSkippedForExpiredRecord(final KafkaConsumerRecord<
final Checkpoint commitCheckpoint = commitCheckContext.checkpoint(1);

consumer.setOnPartitionsAssignedHandler(partitions -> {
final Map<TopicPartition, OffsetAndMetadata> committed = mockConsumer.committed(Set.of(TOPIC_PARTITION));
ctx.verify(() -> {
final OffsetAndMetadata offsetAndMetadata = committed.get(TOPIC_PARTITION);
assertThat(offsetAndMetadata).isNotNull();
assertThat(offsetAndMetadata.offset()).isEqualTo(latestFullyHandledOffset + 1L);
});
commitCheckpoint.flag();
if (!partitions.isEmpty()) {
final Map<TopicPartition, OffsetAndMetadata> committed = mockConsumer.committed(Set.of(TOPIC_PARTITION));
ctx.verify(() -> {
final OffsetAndMetadata offsetAndMetadata = committed.get(TOPIC_PARTITION);
assertThat(offsetAndMetadata).isNotNull();
assertThat(offsetAndMetadata.offset()).isEqualTo(latestFullyHandledOffset + 1L);
});
commitCheckpoint.flag();
}
});
// now force a rebalance which should trigger the above onPartitionsAssignedHandler
// (rebalance is done as part of the poll() invocation; the vert.x consumer will schedule that invocation
Expand All @@ -910,6 +895,7 @@ protected void onRecordHandlerSkippedForExpiredRecord(final KafkaConsumerRecord<
final CountDownLatch latch = new CountDownLatch(1);
consumerVertxContext.runOnContext(v -> latch.countDown());
latch.await();
mockConsumer.rebalance(List.of());
mockConsumer.rebalance(List.of(TOPIC_PARTITION));
assertWithMessage("partition assigned in 5s for checking of commits")
.that(commitCheckContext.awaitCompletion(5, TimeUnit.SECONDS))
Expand Down Expand Up @@ -941,18 +927,4 @@ private ConsumerRecord<String, Buffer> createRecordWithElapsedTtl() {
new RecordHeaders(new Header[] { ttl, creationTime }),
Optional.empty());
}

/**
* Supplier whose get() method might throw an {@link InterruptedException}.
* @param <T> The type of results supplied by this supplier.
*/
@FunctionalInterface
interface InterruptableSupplier<T> {
/**
* Gets a result.
* @return The result.
* @throws InterruptedException If getting the result was interrupted.
*/
T get() throws InterruptedException;
}
}
Loading

0 comments on commit 157836f

Please sign in to comment.