From 632aedcf4ff7e718bf1295fe5132ebaf1d3f92a0 Mon Sep 17 00:00:00 2001
From: Sushant Mahajan
Date: Wed, 29 Jan 2025 22:33:43 +0530
Subject: [PATCH] KAFKA-18632: Multibroker test improvements. (#18718)

Reviewers: Andrew Schofield
---
 ...areCoordinatorMetadataCacheHelperImpl.java |  57 ++--
 .../kafka/test/api/ShareConsumerTest.java     | 274 ++++++++++++------
 2 files changed, 213 insertions(+), 118 deletions(-)

diff --git a/core/src/main/scala/kafka/server/metadata/ShareCoordinatorMetadataCacheHelperImpl.java b/core/src/main/scala/kafka/server/metadata/ShareCoordinatorMetadataCacheHelperImpl.java
index 28148eab7ffc4..40dcac8ca0bc7 100644
--- a/core/src/main/scala/kafka/server/metadata/ShareCoordinatorMetadataCacheHelperImpl.java
+++ b/core/src/main/scala/kafka/server/metadata/ShareCoordinatorMetadataCacheHelperImpl.java
@@ -20,6 +20,7 @@
 import kafka.server.MetadataCache;
 
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
 import org.apache.kafka.common.message.MetadataResponseData;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.Errors;
@@ -27,6 +28,9 @@
 import org.apache.kafka.server.share.SharePartitionKey;
 import org.apache.kafka.server.share.persister.ShareCoordinatorMetadataCacheHelper;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
@@ -41,6 +45,7 @@ public class ShareCoordinatorMetadataCacheHelperImpl implements ShareCoordinatorMetadataCacheHelper {
     private final MetadataCache metadataCache;
     private final Function<SharePartitionKey, Integer> keyToPartitionMapper;
     private final ListenerName interBrokerListenerName;
+    private final Logger log = LoggerFactory.getLogger(ShareCoordinatorMetadataCacheHelperImpl.class);
 
     public ShareCoordinatorMetadataCacheHelperImpl(
         MetadataCache metadataCache,
@@ -63,35 +68,39 @@ public boolean containsTopic(String topic) {
 
     @Override
     public Node getShareCoordinator(SharePartitionKey key, String internalTopicName) {
-        if (metadataCache.contains(internalTopicName)) {
-            Set<String> topicSet = new HashSet<>();
-            topicSet.add(internalTopicName);
-
-            List<MetadataResponseData.MetadataResponseTopic> topicMetadata = CollectionConverters.asJava(
-                metadataCache.getTopicMetadata(
-                    CollectionConverters.asScala(topicSet),
-                    interBrokerListenerName,
-                    false,
-                    false
-                )
-            );
+        try {
+            if (metadataCache.contains(internalTopicName)) {
+                Set<String> topicSet = new HashSet<>();
+                topicSet.add(internalTopicName);
 
-            if (topicMetadata == null || topicMetadata.isEmpty() || topicMetadata.get(0).errorCode() != Errors.NONE.code()) {
-                return Node.noNode();
-            } else {
-                int partition = keyToPartitionMapper.apply(key);
-                Optional<MetadataResponseData.MetadataResponsePartition> response = topicMetadata.get(0).partitions().stream()
-                    .filter(responsePart -> responsePart.partitionIndex() == partition
-                        && responsePart.leaderId() != MetadataResponse.NO_LEADER_ID)
-                    .findFirst();
+                List<MetadataResponseData.MetadataResponseTopic> topicMetadata = CollectionConverters.asJava(
+                    metadataCache.getTopicMetadata(
+                        CollectionConverters.asScala(topicSet),
+                        interBrokerListenerName,
+                        false,
+                        false
+                    )
+                );
 
-                if (response.isPresent()) {
-                    return OptionConverters.toJava(metadataCache.getAliveBrokerNode(response.get().leaderId(), interBrokerListenerName))
-                        .orElse(Node.noNode());
-                } else {
+                if (topicMetadata == null || topicMetadata.isEmpty() || topicMetadata.get(0).errorCode() != Errors.NONE.code()) {
                     return Node.noNode();
+                } else {
+                    int partition = keyToPartitionMapper.apply(key);
+                    Optional<MetadataResponseData.MetadataResponsePartition> response = topicMetadata.get(0).partitions().stream()
+                        .filter(responsePart -> responsePart.partitionIndex() == partition
+                            && responsePart.leaderId() != MetadataResponse.NO_LEADER_ID)
+                        .findFirst();
+
+                    if (response.isPresent()) {
+                        return OptionConverters.toJava(metadataCache.getAliveBrokerNode(response.get().leaderId(), interBrokerListenerName))
+                            .orElse(Node.noNode());
+                    } else {
+                        return Node.noNode();
+                    }
                 }
             }
+        } catch (CoordinatorNotAvailableException e) {
+            log.warn("Coordinator not available", e);
         }
         return Node.noNode();
     }
diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
index 0de4f18044437..f5761003e103b 100644
--- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java
+++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
@@ -55,6 +55,8 @@
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.test.ClusterInstance;
 import org.apache.kafka.common.test.api.ClusterConfigProperty;
 import org.apache.kafka.common.test.api.ClusterTest;
@@ -88,6 +90,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -1832,86 +1835,142 @@ public void testShareAutoOffsetResetByDurationInvalidFormat() throws Exception {
             @ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true")
         }
     )
+    @Timeout(90)
     public void testShareConsumerAfterCoordinatorMovement() throws Exception {
         setup();
         String topicName = "multipart";
         String groupId = "multipartGrp";
         Uuid topicId = createTopic(topicName, 3, 3);
         alterShareAutoOffsetReset(groupId, "earliest");
-
-        try (Admin admin = createAdminClient()) {
-            TopicPartition tpMulti = new TopicPartition(topicName, 0);
-
-            // produce some messages
-            try (Producer<byte[], byte[]> producer = createProducer()) {
-                ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
-                    tpMulti.topic(),
-                    tpMulti.partition(),
-                    null,
-                    "key".getBytes(),
-                    "value".getBytes()
-                );
-                IntStream.range(0, 10).forEach(__ -> producer.send(record));
-                producer.flush();
-            }
-
-            // consume messages
-            try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId)) {
-                shareConsumer.subscribe(List.of(topicName));
-                ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
-                assertEquals(10, records.count());
+        ScheduledExecutorService service = Executors.newScheduledThreadPool(5);
+
+        TopicPartition tpMulti = new TopicPartition(topicName, 0);
+
+        // produce some messages
+        ClientState prodState = new ClientState();
+        final Set<String> produced = new HashSet<>();
+        service.execute(() -> {
+            int i = 0;
+            try (Producer<String, String> producer = createProducer(Map.of(
+                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
+                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()
+            ))) {
+                while (!prodState.done().get()) {
+                    String key = "key-" + (i++);
+                    ProducerRecord<String, String> record = new ProducerRecord<>(
+                        tpMulti.topic(),
+                        tpMulti.partition(),
+                        null,
+                        key,
+                        "value"
+                    );
+                    try {
+                        producer.send(record);
+                        producer.flush();
+                        // count only correctly produced records
+                        prodState.count().incrementAndGet();
+                        produced.add(key);
+                    } catch (Exception e) {
+                        // ignore
+                    }
+                }
             }
+        });
 
-            // get current share coordinator node
-            SharePartitionKey key = SharePartitionKey.getInstance(groupId, new TopicIdPartition(topicId, tpMulti));
-            int shareGroupStateTp = Utils.abs(key.asCoordinatorKey().hashCode()) % 3;
-            List<Integer> curShareCoordNodeId = admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME)).allTopicNames().get().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME)
-                .partitions().stream()
-                .filter(info -> info.partition() == shareGroupStateTp)
-                .map(info -> info.leader().id())
-                .toList();
-
-            assertEquals(1, curShareCoordNodeId.size());
+        // consume messages - start after a small delay
+        ClientState consState = new ClientState();
+        // using a map here in case we want to debug specific keys
+        Map<String, Integer> consumed = new HashMap<>();
+        service.schedule(() -> {
+            try (ShareConsumer<String, String> shareConsumer = createShareConsumer(groupId, Map.of(
+                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(),
+                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()
+            ))) {
+                shareConsumer.subscribe(List.of(topicName));
+                while (!consState.done().get()) {
+                    ConsumerRecords<String, String> records = shareConsumer.poll(Duration.ofMillis(2000L));
+                    consState.count().addAndGet(records.count());
+                    records.forEach(rec -> consumed.compute(rec.key(), (k, v) -> v == null ? 1 : v + 1));
+                    if (prodState.done().get() && records.count() == 0) {
+                        consState.done().set(true);
+                    }
+                }
+            }
+        }, 100L, TimeUnit.MILLISECONDS
+        );
 
-            // shutdown the coordinator
-            KafkaBroker broker = cluster.brokers().get(curShareCoordNodeId.get(0));
-            cluster.shutdownBroker(curShareCoordNodeId.get(0));
+        // To be closer to real-world scenarios, we only execute this after
+        // some time has elapsed since the producer and consumer started
+        // working.
+        service.schedule(() -> {
+            // Get the current node hosting the __share_group_state partition
+            // on which tpMulti is hosted. Then shut down this node and wait
+            // for it to be gracefully shut down. Then fetch the coordinator again
+            // and verify that it has moved to some other broker.
+            try (Admin admin = createAdminClient()) {
+                SharePartitionKey key = SharePartitionKey.getInstance(groupId, new TopicIdPartition(topicId, tpMulti));
+                int shareGroupStateTp = Utils.abs(key.asCoordinatorKey().hashCode()) % 3;
+                List<Integer> curShareCoordNodeId = null;
+                try {
+                    curShareCoordNodeId = admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME)).allTopicNames().get().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME)
+                        .partitions().stream()
+                        .filter(info -> info.partition() == shareGroupStateTp)
+                        .map(info -> info.leader().id())
+                        .toList();
+                } catch (Exception e) {
+                    fail(e);
+                }
+                assertEquals(1, curShareCoordNodeId.size());
+
+                // shutdown the coordinator
+                KafkaBroker broker = cluster.brokers().get(curShareCoordNodeId.get(0));
+                cluster.shutdownBroker(curShareCoordNodeId.get(0));
+
+                // wait for it to be completely shut down
+                broker.awaitShutdown();
+
+                List<Integer> newShareCoordNodeId = null;
+                try {
+                    newShareCoordNodeId = admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME)).allTopicNames().get().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME)
+                        .partitions().stream()
+                        .filter(info -> info.partition() == shareGroupStateTp)
+                        .map(info -> info.leader().id())
+                        .toList();
+                } catch (Exception e) {
+                    fail(e);
+                }
 
-            // give some breathing time
-            broker.awaitShutdown();
+                assertEquals(1, newShareCoordNodeId.size());
+                assertNotEquals(curShareCoordNodeId.get(0), newShareCoordNodeId.get(0));
+            }
+        }, 5L, TimeUnit.SECONDS
+        );
 
-            List<Integer> newShareCoordNodeId = admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME)).allTopicNames().get().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME)
-                .partitions().stream()
-                .filter(info -> info.partition() == shareGroupStateTp)
-                .map(info -> info.leader().id())
-                .toList();
+        // stop the producer after some time (but after coordinator shutdown)
+        service.schedule(() -> {
+            prodState.done().set(true);
+        }, 10L, TimeUnit.SECONDS
+        );
 
-            assertEquals(1, newShareCoordNodeId.size());
-            assertNotEquals(curShareCoordNodeId.get(0), newShareCoordNodeId.get(0));
+        // wait for both producer and consumer to finish
+        TestUtils.waitForCondition(
+            () -> prodState.done().get() && consState.done().get(),
+            45_000L,
+            500L,
+            () -> "prod/cons not done yet"
+        );
 
-            // again produce to same topic partition
-            try (Producer<byte[], byte[]> producer = createProducer()) {
-                ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
-                    tpMulti.topic(),
-                    tpMulti.partition(),
-                    null,
-                    "key".getBytes(),
-                    "value".getBytes()
-                );
-                IntStream.range(0, 10).forEach(__ -> producer.send(record));
-                producer.flush();
-            }
+        // Make sure we consumed all records. Consumed records could be higher
+        // due to re-delivery, but that is expected since we only guarantee
+        // at-least-once semantics.
+        assertTrue(prodState.count().get() <= consState.count().get());
+        Set<String> consumedKeys = consumed.keySet();
+        assertTrue(produced.containsAll(consumedKeys) && consumedKeys.containsAll(produced));
 
-            // consume messages should only be possible if partition and share coord has moved
-            // from shutdown broker since we are only producing to partition 0 of topic.
-            try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId)) {
-                shareConsumer.subscribe(List.of(topicName));
-                ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
-                assertEquals(20, records.count());
-            }
+        shutdownExecutorService(service);
 
-            verifyShareGroupStateTopicRecordsProduced();
-        }
+        verifyShareGroupStateTopicRecordsProduced();
     }
 
     @ClusterTest(
@@ -1929,6 +1988,8 @@ public void testShareConsumerAfterCoordinatorMovement() throws Exception {
             @ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true")
         }
     )
+    @Timeout(150)
+    @Flaky("KAFKA-18665")
     public void testComplexShareConsumer() throws Exception {
         setup();
         String topicName = "multipart";
@@ -1936,19 +1997,18 @@ public void testComplexShareConsumer() throws Exception {
         createTopic(topicName, 3, 3);
         TopicPartition multiTp = new TopicPartition(topicName, 0);
 
-        ExecutorService executer = Executors.newCachedThreadPool();
+        ScheduledExecutorService service = Executors.newScheduledThreadPool(5);
 
-        AtomicBoolean prodDone = new AtomicBoolean(false);
-        AtomicInteger sentCount = new AtomicInteger(0);
+        ClientState prodState = new ClientState();
 
         // produce messages until we want
-        executer.execute(() -> {
+        service.execute(() -> {
             try (Producer<byte[], byte[]> producer = createProducer()) {
-                while (!prodDone.get()) {
+                while (!prodState.done().get()) {
                     ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(multiTp.topic(), multiTp.partition(), null, "key".getBytes(), "value".getBytes());
                     producer.send(record);
                     producer.flush();
-                    sentCount.incrementAndGet();
+                    prodState.count().incrementAndGet();
                 }
             }
         });
@@ -1961,28 +2021,45 @@ public void testComplexShareConsumer() throws Exception {
             Map.of()
         );
 
-        executer.execute(complexCons1);
+        service.schedule(
+            complexCons1,
+            100L,
+            TimeUnit.MILLISECONDS
+        );
 
         // let the complex consumer read the messages
-        executer.execute(() -> {
-            try {
-                TimeUnit.SECONDS.sleep(10L);
-                prodDone.set(true);
-            } catch (InterruptedException e) {
-                // ignore
-            }
-        });
+        service.schedule(() -> {
+            prodState.done().set(true);
+        }, 10L, TimeUnit.SECONDS
+        );
 
         // all messages which can be read are read, some would be redelivered
-        TestUtils.waitForCondition(complexCons1::isDone, 30_000L, () -> "did not close!");
-        assertTrue(sentCount.get() < complexCons1.recordsRead());
+        TestUtils.waitForCondition(complexCons1::isDone, 45_000L, () -> "did not close!");
 
-        executer.shutdown();
-        executer.shutdownNow();
+        assertTrue(prodState.count().get() < complexCons1.recordsRead());
+
+        shutdownExecutorService(service);
 
         verifyShareGroupStateTopicRecordsProduced();
     }
 
+    /**
+     * Util class to encapsulate state for a consumer/producer
+     * being executed by an {@link ExecutorService}.
+     */
+    private static class ClientState {
+        private final AtomicBoolean done = new AtomicBoolean(false);
+        private final AtomicInteger count = new AtomicInteger(0);
+
+        AtomicBoolean done() {
+            return done;
+        }
+
+        AtomicInteger count() {
+            return count;
+        }
+    }
+
     private int produceMessages(int messageCount) {
         try (Producer<byte[], byte[]> producer = createProducer()) {
             ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
@@ -2217,9 +2294,7 @@ private static class ComplexShareConsumer<K, V> implements Runnable {
 
         private final String topicName;
         private final Map<String, Object> configs = new HashMap<>();
-        private final AtomicBoolean isDone = new AtomicBoolean(false);
-        private final AtomicBoolean shouldLoop = new AtomicBoolean(true);
-        private final AtomicInteger readCount = new AtomicInteger(0);
+        private final ClientState state = new ClientState();
 
         private final Predicate<ConsumerRecords<K, V>> exitCriteria;
         private final BiConsumer<ShareConsumer<K, V>, ConsumerRecord<K, V>> processFunc;
@@ -2267,31 +2342,42 @@ private static class ComplexShareConsumer<K, V> implements Runnable {
         }
 
         void stop() {
-            shouldLoop.set(false);
+            state.done().set(true);
        }
 
         @Override
         public void run() {
             try (ShareConsumer<K, V> consumer = new KafkaShareConsumer<>(configs)) {
                 consumer.subscribe(Set.of(this.topicName));
-                while (shouldLoop.get()) {
+                while (!state.done().get()) {
                     ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
-                    readCount.addAndGet(records.count());
+                    state.count().addAndGet(records.count());
                     if (exitCriteria.test(records)) {
-                        break;
+                        state.done().set(true);
                     }
                     records.forEach(record -> processFunc.accept(consumer, record));
                 }
             }
-            isDone.set(true);
         }
 
         boolean isDone() {
-            return isDone.get();
+            return state.done().get();
         }
 
         int recordsRead() {
-            return readCount.get();
+            return state.count().get();
+        }
+    }
+
+    private void shutdownExecutorService(ExecutorService service) {
+        service.shutdown();
+        try {
+            if (!service.awaitTermination(5L, TimeUnit.SECONDS)) {
+                service.shutdownNow();
+            }
+        } catch (Exception e) {
+            service.shutdownNow();
+            Thread.currentThread().interrupt();
         }
     }
 }