KAFKA-18383: Remove reserved.broker.max.id, broker.id.generation.enable, and broker.id

Signed-off-by: PoAn Yang <[email protected]>
FrankYang0529 committed Jan 18, 2025
1 parent 6811b8b commit 418d94a
Showing 85 changed files with 319 additions and 375 deletions.
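
In short: after this commit a broker derives its identity solely from node.id; broker.id, broker.id.generation.enable, and reserved.broker.max.id are no longer read. A minimal sketch of constructing a KafkaConfig without broker.id (hedged: the property values are illustrative and abbreviated, not taken from this diff):

    import java.util.Properties
    import kafka.server.KafkaConfig

    val props = new Properties()
    props.put("process.roles", "broker")
    props.put("node.id", "1") // the only identity key now
    props.put("controller.quorum.voters", "1@localhost:9093")
    props.put("controller.listener.names", "CONTROLLER")
    props.put("listener.security.protocol.map", "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
    val config = new KafkaConfig(props)
    assert(config.nodeId == 1) // the brokerId field is gone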
6 changes: 3 additions & 3 deletions core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -48,7 +48,7 @@
import org.apache.kafka.server.common.CheckpointFile;
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.common.StopPartition;
-import org.apache.kafka.server.config.ServerConfigs;
+import org.apache.kafka.server.config.KRaftConfigs;
import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.quota.RLMQuotaManager;
import org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig;
@@ -376,7 +376,7 @@ RemoteStorageManager createRemoteStorageManager() {

private void configureRSM() {
final Map<String, Object> rsmProps = new HashMap<>(rlmConfig.remoteStorageManagerProps());
-rsmProps.put(ServerConfigs.BROKER_ID_CONFIG, brokerId);
+rsmProps.put(KRaftConfigs.NODE_ID_CONFIG, brokerId);
remoteLogStorageManager.configure(rsmProps);
}

@@ -406,7 +406,7 @@ private void configureRLMM() {
// update the remoteLogMetadataProps here to override endpoint config if any
rlmmProps.putAll(rlmConfig.remoteLogMetadataManagerProps());

-rlmmProps.put(ServerConfigs.BROKER_ID_CONFIG, brokerId);
+rlmmProps.put(KRaftConfigs.NODE_ID_CONFIG, brokerId);
rlmmProps.put(LOG_DIR_CONFIG, logDir);
rlmmProps.put("cluster.id", clusterId);

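A practical consequence of the hunks above (a hedged sketch, not code from this diff): plugins configured by RemoteLogManager now receive the broker's id under node.id rather than broker.id, so a custom RemoteStorageManager should read KRaftConfigs.NODE_ID_CONFIG in configure(). The class below is hypothetical:

    import java.util.{Map => JMap}
    import org.apache.kafka.server.config.KRaftConfigs

    class MyRemoteStorageManager /* extends RemoteStorageManager */ {
      private var nodeId: Int = -1

      def configure(configs: JMap[String, _]): Unit = {
        // RemoteLogManager now puts "node.id" (not "broker.id") into rsmProps
        nodeId = Option(configs.get(KRaftConfigs.NODE_ID_CONFIG)).map(_.toString.toInt).getOrElse(-1)
      }
    }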
@@ -33,7 +33,7 @@ public MetadataVersionConfigValidator(
KafkaConfig config,
FaultHandler faultHandler
) {
-int id = config.brokerId();
+int id = config.nodeId();
this.name = "MetadataVersionPublisher(id=" + id + ")";
this.config = config;
this.faultHandler = faultHandler;
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/cluster/Partition.scala
@@ -154,7 +154,7 @@ object Partition {
new Partition(topicPartition,
_topicId = topicId,
replicaLagTimeMaxMs = replicaManager.config.replicaLagTimeMaxMs,
-localBrokerId = replicaManager.config.brokerId,
+localBrokerId = replicaManager.config.nodeId,
localBrokerEpochSupplier = replicaManager.brokerEpochSupplier,
time = time,
alterPartitionListener = isrChangeListener,
@@ -1773,8 +1773,8 @@ object GroupCoordinator {
time: Time,
metrics: Metrics
): GroupCoordinator = {
-val heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", config.brokerId)
-val rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", config.brokerId)
+val heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", config.nodeId)
+val rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", config.nodeId)
GroupCoordinator(config, replicaManager, heartbeatPurgatory, rebalancePurgatory, time, metrics)
}

@@ -1804,9 +1804,9 @@ object GroupCoordinator {
groupMaxSize = config.groupCoordinatorConfig.classicGroupMaxSize,
groupInitialRebalanceDelayMs = config.groupCoordinatorConfig.classicGroupInitialRebalanceDelayMs)

-val groupMetadataManager = new GroupMetadataManager(config.brokerId, config.interBrokerProtocolVersion,
+val groupMetadataManager = new GroupMetadataManager(config.nodeId, config.interBrokerProtocolVersion,
offsetConfig, replicaManager, time, metrics)
-new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupMetadataManager, heartbeatPurgatory,
+new GroupCoordinator(config.nodeId, groupConfig, offsetConfig, groupMetadataManager, heartbeatPurgatory,
rebalancePurgatory, time, metrics)
}

@@ -56,10 +56,10 @@ object TransactionCoordinator {
config.transactionStateManagerConfig.transactionRemoveExpiredTransactionalIdCleanupIntervalMs,
config.requestTimeoutMs)

-val txnStateManager = new TransactionStateManager(config.brokerId, scheduler, replicaManager, metadataCache, txnConfig,
+val txnStateManager = new TransactionStateManager(config.nodeId, scheduler, replicaManager, metadataCache, txnConfig,
time, metrics)

-val logContext = new LogContext(s"[TransactionCoordinator id=${config.brokerId}] ")
+val logContext = new LogContext(s"[TransactionCoordinator id=${config.nodeId}] ")
val txnMarkerChannelManager = TransactionMarkerChannelManager(config, metrics, metadataCache, txnStateManager,
time, logContext)

@@ -82,7 +82,7 @@ object TransactionMarkerChannelManager {
val networkClient = new NetworkClient(
selector,
new ManualMetadataUpdater(),
s"broker-${config.brokerId}-txn-marker-sender",
s"broker-${config.nodeId}-txn-marker-sender",
1,
50,
50,
@@ -161,12 +161,12 @@ class TransactionMarkerChannelManager(
networkClient: NetworkClient,
txnStateManager: TransactionStateManager,
time: Time
-) extends InterBrokerSendThread("TxnMarkerSenderThread-" + config.brokerId, networkClient, config.requestTimeoutMs, time)
+) extends InterBrokerSendThread("TxnMarkerSenderThread-" + config.nodeId, networkClient, config.requestTimeoutMs, time)
with Logging {

private val metricsGroup = new KafkaMetricsGroup(this.getClass)

this.logIdent = "[Transaction Marker Channel Manager " + config.brokerId + "]: "
this.logIdent = "[Transaction Marker Channel Manager " + config.nodeId + "]: "

private val interBrokerListenerName: ListenerName = config.interBrokerListenerName

2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/network/SocketServer.scala
@@ -84,7 +84,7 @@ class SocketServer(

private val maxQueuedRequests = config.queuedMaxRequests

-protected val nodeId: Int = config.brokerId
+protected val nodeId: Int = config.nodeId

private val logContext = new LogContext(s"[SocketServer listenerType=${apiVersionManager.listenerType}, nodeId=$nodeId] ")

@@ -89,7 +89,7 @@ class AddPartitionsToTxnManager(
partitionFor: String => Int,
time: Time
) extends InterBrokerSendThread(
"AddPartitionsToTxnSenderThread-" + config.brokerId,
"AddPartitionsToTxnSenderThread-" + config.nodeId,
client,
config.requestTimeoutMs,
time
@@ -91,7 +91,7 @@ object AlterPartitionManager {
controllerChannelManager = channelManager,
scheduler = scheduler,
time = time,
-brokerId = config.brokerId,
+brokerId = config.nodeId,
brokerEpochSupplier = brokerEpochSupplier,
metadataVersionSupplier = () => metadataCache.metadataVersion()
)
16 changes: 8 additions & 8 deletions core/src/main/scala/kafka/server/BrokerServer.scala
@@ -291,7 +291,7 @@ class BrokerServer(
)
alterPartitionManager.start()

-val addPartitionsLogContext = new LogContext(s"[AddPartitionsToTxnManager broker=${config.brokerId}]")
+val addPartitionsLogContext = new LogContext(s"[AddPartitionsToTxnManager broker=${config.nodeId}]")
val addPartitionsToTxnNetworkClient = NetworkUtils.buildNetworkClient("AddPartitionsManager", config, metrics, time, addPartitionsLogContext)
val addPartitionsToTxnManager = new AddPartitionsToTxnManager(
config,
@@ -315,7 +315,7 @@
assignmentsManager = new AssignmentsManager(
time,
assignmentsChannelManager,
-config.brokerId,
+config.nodeId,
() => metadataCache.getImage(),
(directoryId: Uuid) => logManager.directoryPath(directoryId).
getOrElse("[unknown directory path]")
@@ -370,7 +370,7 @@ class BrokerServer(
groupCoordinator = createGroupCoordinator()

val producerIdManagerSupplier = () => ProducerIdManager.rpc(
-config.brokerId,
+config.nodeId,
time,
() => lifecycleManager.brokerEpoch,
clientToControllerChannelManager
@@ -534,7 +534,7 @@ class BrokerServer(
if (e != null) brokerMetadataPublisher.firstPublishFuture.completeExceptionally(e)
})
metadataPublishers.add(brokerMetadataPublisher)
-brokerRegistrationTracker = new BrokerRegistrationTracker(config.brokerId,
+brokerRegistrationTracker = new BrokerRegistrationTracker(config.nodeId,
() => lifecycleManager.resendBrokerRegistrationUnlessZkMode())
metadataPublishers.add(brokerRegistrationTracker)

@@ -631,7 +631,7 @@ class BrokerServer(
val writer = new CoordinatorPartitionWriter(
replicaManager
)
-new GroupCoordinatorService.Builder(config.brokerId, config.groupCoordinatorConfig)
+new GroupCoordinatorService.Builder(config.nodeId, config.groupCoordinatorConfig)
.withTime(time)
.withTimer(timer)
.withLoader(loader)
@@ -669,7 +669,7 @@ class BrokerServer(
val writer = new CoordinatorPartitionWriter(
replicaManager
)
-Some(new ShareCoordinatorService.Builder(config.brokerId, config.shareCoordinatorConfig)
+Some(new ShareCoordinatorService.Builder(config.nodeId, config.shareCoordinatorConfig)
.withTimer(timer)
.withTime(time)
.withLoader(loader)
@@ -691,7 +691,7 @@
klass.getConstructor(classOf[PersisterStateManager])
.newInstance(
new PersisterStateManager(
NetworkUtils.buildNetworkClient("Persister", config, metrics, Time.SYSTEM, new LogContext(s"[Persister broker=${config.brokerId}]")),
NetworkUtils.buildNetworkClient("Persister", config, metrics, Time.SYSTEM, new LogContext(s"[Persister broker=${config.nodeId}]")),
new ShareCoordinatorMetadataCacheHelperImpl(metadataCache, key => shareCoordinator.get.partitionFor(key), config.interBrokerListenerName),
Time.SYSTEM,
new SystemTimerReaper(
@@ -717,7 +717,7 @@

protected def createRemoteLogManager(): Option[RemoteLogManager] = {
if (config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) {
-Some(new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time,
+Some(new RemoteLogManager(config.remoteLogManagerConfig, config.nodeId, config.logDirs.head, clusterId, time,
(tp: TopicPartition) => logManager.getLog(tp).toJava,
(tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => {
logManager.getLog(tp).foreach { log =>
10 changes: 5 additions & 5 deletions core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -115,12 +115,12 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager,

def updateThrottledList(prop: String, quotaManager: ReplicationQuotaManager): Unit = {
if (topicConfig.containsKey(prop) && topicConfig.getProperty(prop).nonEmpty) {
-val partitions = parseThrottledPartitions(topicConfig, kafkaConfig.brokerId, prop)
+val partitions = parseThrottledPartitions(topicConfig, kafkaConfig.nodeId, prop)
quotaManager.markThrottled(topic, partitions.map(Integer.valueOf).asJava)
debug(s"Setting $prop on broker ${kafkaConfig.brokerId} for topic: $topic and partitions $partitions")
debug(s"Setting $prop on broker ${kafkaConfig.nodeId} for topic: $topic and partitions $partitions")
} else {
quotaManager.removeThrottle(topic)
debug(s"Removing $prop from broker ${kafkaConfig.brokerId} for topic $topic")
debug(s"Removing $prop from broker ${kafkaConfig.nodeId} for topic $topic")
}
}
updateThrottledList(QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG, quotas.leader)
@@ -206,8 +206,8 @@ class BrokerConfigHandler(private val brokerConfig: KafkaConfig,
def processConfigChanges(brokerId: String, properties: Properties): Unit = {
if (brokerId == ZooKeeperInternals.DEFAULT_STRING)
brokerConfig.dynamicConfig.updateDefaultConfig(properties)
-else if (brokerConfig.brokerId == brokerId.trim.toInt) {
-brokerConfig.dynamicConfig.updateBrokerConfig(brokerConfig.brokerId, properties)
+else if (brokerConfig.nodeId == brokerId.trim.toInt) {
+brokerConfig.dynamicConfig.updateBrokerConfig(brokerConfig.nodeId, properties)
}
val updatedDynamicBrokerConfigs = brokerConfig.dynamicConfig.currentDynamicBrokerConfigs
val updatedDynamicDefaultConfigs = brokerConfig.dynamicConfig.currentDynamicDefaultConfigs
8 changes: 4 additions & 4 deletions core/src/main/scala/kafka/server/ConfigHelper.scala
@@ -118,17 +118,17 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo
if (resource.resourceName == null || resource.resourceName.isEmpty)
createResponseConfig(config.dynamicConfig.currentDynamicDefaultConfigs,
createBrokerConfigEntry(perBrokerConfig = false, includeSynonyms, includeDocumentation))
-else if (resourceNameToBrokerId(resource.resourceName) == config.brokerId)
+else if (resourceNameToBrokerId(resource.resourceName) == config.nodeId)
createResponseConfig(allConfigs(config),
createBrokerConfigEntry(perBrokerConfig = true, includeSynonyms, includeDocumentation))
else
throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId} or empty string, but received ${resource.resourceName}")
throw new InvalidRequestException(s"Unexpected broker id, expected ${config.nodeId} or empty string, but received ${resource.resourceName}")

case ConfigResource.Type.BROKER_LOGGER =>
if (resource.resourceName == null || resource.resourceName.isEmpty)
throw new InvalidRequestException("Broker id must not be empty")
-else if (resourceNameToBrokerId(resource.resourceName) != config.brokerId)
-throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId} but received ${resource.resourceName}")
+else if (resourceNameToBrokerId(resource.resourceName) != config.nodeId)
+throw new InvalidRequestException(s"Unexpected broker id, expected ${config.nodeId} but received ${resource.resourceName}")
else
createResponseConfig(Log4jController.loggers,
(name, value) => new DescribeConfigsResponseData.DescribeConfigsResourceResult().setName(name)
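The ConfigHelper hunks keep the existing request contract, now phrased in terms of nodeId: a DescribeConfigs request for a BROKER resource must name this node's id, or use the empty string for cluster defaults. A hedged sketch from the admin-client side (bootstrap address and id are illustrative):

    import java.util.Properties
    import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
    import org.apache.kafka.common.config.ConfigResource

    val adminProps = new Properties()
    adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    val admin = Admin.create(adminProps)
    // The resource name must match the broker's node.id ("1" here) or be "".
    val resource = new ConfigResource(ConfigResource.Type.BROKER, "1")
    val described = admin.describeConfigs(java.util.List.of(resource)).all().get()
    admin.close()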
@@ -100,7 +100,7 @@ object DelegationTokenManager {
class DelegationTokenManager(val config: KafkaConfig,
val tokenCache: DelegationTokenCache,
val time: Time) extends Logging {
this.logIdent = s"[Token Manager on Node ${config.brokerId}]: "
this.logIdent = s"[Token Manager on Node ${config.nodeId}]: "

protected val lock = new Object()

6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -38,7 +38,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.security.PasswordEncoder
import org.apache.kafka.server.ProcessRole
-import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
+import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, MetricConfigs}
import org.apache.kafka.server.telemetry.ClientTelemetry
@@ -246,7 +246,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
case _ =>
}
addReconfigurable(kafkaServer.kafkaYammerMetrics)
-addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer.config, kafkaServer.metrics, kafkaServer.clusterId))
+addReconfigurable(new DynamicMetricsReporters(kafkaConfig.nodeId, kafkaServer.config, kafkaServer.metrics, kafkaServer.clusterId))
addReconfigurable(new DynamicClientQuotaCallback(kafkaServer.quotaManagers, kafkaServer.config))

addBrokerReconfigurable(new BrokerDynamicThreadPool(kafkaServer))
@@ -820,7 +820,7 @@ class DynamicMetricsReporters(brokerId: Int, config: KafkaConfig, metrics: Metri

class DynamicMetricReporterState(brokerId: Int, config: KafkaConfig, metrics: Metrics, clusterId: String) {
private[server] val dynamicConfig = config.dynamicConfig
-private val propsOverride = Map[String, AnyRef](ServerConfigs.BROKER_ID_CONFIG -> brokerId.toString)
+private val propsOverride = Map[String, AnyRef](KRaftConfigs.NODE_ID_CONFIG -> brokerId.toString)
private[server] val currentReporters = mutable.Map[String, MetricsReporter]()
createReporters(config, clusterId, metricsReporterClasses(dynamicConfig.currentKafkaConfig.values()).asJava,
Collections.emptyMap[String, Object])
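Because DynamicMetricReporterState now overrides node.id instead of broker.id, a dynamically configured MetricsReporter sees the broker's id under the new key. A hedged sketch (the reporter class is hypothetical):

    import java.util.{List => JList, Map => JMap}
    import org.apache.kafka.common.metrics.{KafkaMetric, MetricsReporter}

    class NodeTaggingReporter extends MetricsReporter {
      private var nodeId: String = _

      override def configure(configs: JMap[String, _]): Unit =
        nodeId = Option(configs.get("node.id")).map(_.toString).getOrElse("") // was "broker.id" before this commit

      override def init(metrics: JList[KafkaMetric]): Unit = ()
      override def metricChange(metric: KafkaMetric): Unit = ()
      override def metricRemoval(metric: KafkaMetric): Unit = ()
      override def close(): Unit = ()
    }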
23 changes: 2 additions & 21 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -129,21 +129,6 @@ object KafkaConfig {
}
if (maybeSensitive) Password.HIDDEN else value
}

-/**
-* Copy a configuration map, populating some keys that we want to treat as synonyms.
-*/
-def populateSynonyms(input: util.Map[_, _]): util.Map[Any, Any] = {
-val output = new util.HashMap[Any, Any](input)
-val brokerId = output.get(ServerConfigs.BROKER_ID_CONFIG)
-val nodeId = output.get(KRaftConfigs.NODE_ID_CONFIG)
-if (brokerId == null && nodeId != null) {
-output.put(ServerConfigs.BROKER_ID_CONFIG, nodeId)
-} else if (brokerId != null && nodeId == null) {
-output.put(KRaftConfigs.NODE_ID_CONFIG, brokerId)
-}
-output
-}
}

/**
@@ -155,8 +140,8 @@
class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
extends AbstractKafkaConfig(KafkaConfig.configDef, props, Utils.castToStringObjectMap(props), doLog) with Logging {

-def this(props: java.util.Map[_, _]) = this(true, KafkaConfig.populateSynonyms(props))
-def this(props: java.util.Map[_, _], doLog: Boolean) = this(doLog, KafkaConfig.populateSynonyms(props))
+def this(props: java.util.Map[_, _]) = this(true, props)
+def this(props: java.util.Map[_, _], doLog: Boolean) = this(doLog, props)

// Cache the current config to avoid acquiring read lock to access from dynamicConfig
@volatile private var currentConfig = this
@@ -221,7 +206,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
def quotaConfig: QuotaConfig = _quotaConfig

/** ********* General Configuration ***********/
-var brokerId: Int = getInt(ServerConfigs.BROKER_ID_CONFIG)
val nodeId: Int = getInt(KRaftConfigs.NODE_ID_CONFIG)
val initialRegistrationTimeoutMs: Int = getInt(KRaftConfigs.INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_CONFIG)
val brokerHeartbeatIntervalMs: Int = getInt(KRaftConfigs.BROKER_HEARTBEAT_INTERVAL_MS_CONFIG)
@@ -630,9 +614,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
validateValues()

private def validateValues(): Unit = {
-if (nodeId != brokerId) {
-throw new ConfigException(s"You must set `${KRaftConfigs.NODE_ID_CONFIG}` to the same value as `${ServerConfigs.BROKER_ID_CONFIG}`.")
-}
require(logRollTimeMillis >= 1, "log.roll.ms must be greater than or equal to 1")
require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be greater than or equal to 0")
require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms must be unlimited (-1) or, greater than or equal to 1")
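For readers skimming the removal above: populateSynonyms used to mirror broker.id and node.id into each other before construction, and validateValues() then rejected any config where the two keys disagreed. Restated in minimal Scala (an illustration of the deleted behavior, not current code):

    // Old semantics, now gone: the two keys were kept in lockstep.
    def populateSynonymsOld(input: Map[String, String]): Map[String, String] =
      (input.get("broker.id"), input.get("node.id")) match {
        case (None, Some(n)) => input + ("broker.id" -> n) // node.id filled in broker.id
        case (Some(b), None) => input + ("node.id" -> b)   // broker.id filled in node.id
        case _               => input                      // both or neither set
      }

With only node.id read, neither the copying nor the equality check is needed.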
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -94,7 +94,7 @@ class KafkaRaftServer(
// the controller endpoints are passed to the KRaft manager
controller.foreach(_.startup())
broker.foreach(_.startup())
-AppInfoParser.registerAppInfo(Server.MetricsPrefix, config.brokerId.toString, metrics, time.milliseconds())
+AppInfoParser.registerAppInfo(Server.MetricsPrefix, config.nodeId.toString, metrics, time.milliseconds())
info(KafkaBroker.STARTED_MESSAGE)
}

@@ -104,7 +104,7 @@
// stops the raft client early on, which would disrupt broker shutdown.
broker.foreach(_.shutdown())
controller.foreach(_.shutdown())
-CoreUtils.swallow(AppInfoParser.unregisterAppInfo(Server.MetricsPrefix, config.brokerId.toString, metrics), this)
+CoreUtils.swallow(AppInfoParser.unregisterAppInfo(Server.MetricsPrefix, config.nodeId.toString, metrics), this)
}

override def awaitShutdown(): Unit = {
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
@@ -50,7 +50,7 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
replicaManager: ReplicaManager,
quota: ReplicaQuota) extends LeaderEndPoint with Logging {

-private val replicaId = brokerConfig.brokerId
+private val replicaId = brokerConfig.nodeId
private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
private val fetchSize = brokerConfig.replicaFetchMaxBytes
private var inProgressPartition: Option[TopicPartition] = None
(The remaining changed files are not shown here.)
