Skip to content
This repository was archived by the owner on Sep 26, 2024. It is now read-only.

Commit

Permalink
KAFKA-16860; [2/2] Introduce group.version feature flag (apache#16149)
Browse files Browse the repository at this point in the history
This patch updates the system tests to correctly enable the new consumer protocol/coordinator in the tests requiring them.

I went with the simplest approach for now. Long term, I think that we should refactor the tests to better handle features and non-production features.

I got a successful run of the consumer system tests with this patch combined with apache#16120: https://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/system-test-kafka-branch-builder--1717155071--dajac--KAFKA-16860-2--29028ae0dd/2024-05-31--001./2024-05-31--001./report.html.

Reviewers: Justine Olshan <[email protected]>
  • Loading branch information
dajac authored May 31, 2024
1 parent ba61ff0 commit 190dd79
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 1 deletion.
3 changes: 3 additions & 0 deletions tests/kafkatest/services/kafka/config_property.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@
SASL_ENABLED_MECHANISMS="sasl.enabled.mechanisms"

NEW_GROUP_COORDINATOR_ENABLE="group.coordinator.new.enable"
GROUP_COORDINATOR_REBALANCE_PROTOCOLS="group.coordinator.rebalance.protocols"

UNSTABLE_FEATURE_VERSIONS_ENABLE="unstable.feature.versions.enable"

"""
From KafkaConfig.scala
Expand Down
16 changes: 15 additions & 1 deletion tests/kafkatest/services/kafka/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,12 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI
self.interbroker_sasl_mechanism = interbroker_sasl_mechanism
self._security_config = None

# When the new group coordinator is enabled, the new consumer rebalance
# protocol is enabled too.
rebalance_protocols = "classic"
if self.use_new_coordinator:
rebalance_protocols = "classic,consumer"

for node in self.nodes:
node_quorum_info = quorum.NodeQuorumInfo(self.quorum_info, node)

Expand All @@ -422,7 +428,9 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI
kraft_broker_configs = {
config_property.PORT: config_property.FIRST_BROKER_PORT,
config_property.NODE_ID: self.idx(node),
config_property.NEW_GROUP_COORDINATOR_ENABLE: use_new_coordinator
config_property.UNSTABLE_FEATURE_VERSIONS_ENABLE: use_new_coordinator,
config_property.NEW_GROUP_COORDINATOR_ENABLE: use_new_coordinator,
config_property.GROUP_COORDINATOR_REBALANCE_PROTOCOLS: rebalance_protocols
}
kraft_broker_plus_zk_configs = kraft_broker_configs.copy()
kraft_broker_plus_zk_configs.update(zk_broker_configs)
Expand Down Expand Up @@ -781,7 +789,9 @@ def prop_file(self, node):
override_configs[config_property.ZOOKEEPER_SSL_CLIENT_ENABLE] = 'false'

if self.use_new_coordinator:
override_configs[config_property.UNSTABLE_FEATURE_VERSIONS_ENABLE] = 'true'
override_configs[config_property.NEW_GROUP_COORDINATOR_ENABLE] = 'true'
override_configs[config_property.GROUP_COORDINATOR_REBALANCE_PROTOCOLS] = 'classic,consumer'

for prop in self.server_prop_overrides:
override_configs[prop[0]] = prop[1]
Expand Down Expand Up @@ -884,6 +894,10 @@ def start_node(self, node, timeout_sec=60, **kwargs):
# format log directories if necessary
kafka_storage_script = self.path.script("kafka-storage.sh", node)
cmd = "%s format --ignore-formatted --config %s --cluster-id %s" % (kafka_storage_script, KafkaService.CONFIG_FILE, config_property.CLUSTER_ID)

if self.use_new_coordinator:
cmd += " -f group.version=1"

self.logger.info("Running log directory format command...\n%s" % cmd)
node.account.ssh(cmd)

Expand Down

0 comments on commit 190dd79

Please sign in to comment.