KAFKA-18383: Remove reserved.broker.max.id, broker.id.generation.enable, and broker.id #18478

Status: Open · wants to merge 1 commit into trunk
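
For context: in KRaft mode a server's identity is node.id, and broker.id survived only as a legacy alias, while broker.id.generation.enable and reserved.broker.max.id existed solely to support ZooKeeper-based automatic id generation. With all three removed, the operator-facing change in server.properties looks roughly like this (a sketch assuming a plain KRaft broker; values illustrative):

    # Removed by this PR:
    # broker.id=0
    # broker.id.generation.enable=true
    # reserved.broker.max.id=1000

    # KRaft identity config, now the only way to set the id:
    node.id=0
    process.roles=broker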
28 changes: 14 additions & 14 deletions core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -48,7 +48,7 @@
import org.apache.kafka.server.common.CheckpointFile;
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.common.StopPartition;
- import org.apache.kafka.server.config.ServerConfigs;
+ import org.apache.kafka.server.config.KRaftConfigs;
import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.quota.RLMQuotaManager;
import org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig;
@@ -163,7 +163,7 @@ public class RemoteLogManager implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogManager.class);
private static final String REMOTE_LOG_READER_THREAD_NAME_PATTERN = "remote-log-reader-%d";
private final RemoteLogManagerConfig rlmConfig;
- private final int brokerId;
+ private final int nodeId;
private final String logDir;
private final Time time;
private final Function<TopicPartition, Optional<UnifiedLog>> fetchLog;
@@ -212,7 +212,7 @@ public class RemoteLogManager implements Closeable {
* Creates RemoteLogManager instance with the given arguments.
*
* @param rlmConfig Configuration required for remote logging subsystem(tiered storage) at the broker level.
- * @param brokerId id of the current broker.
+ * @param nodeId id of the current node.
* @param logDir directory of Kafka log segments.
* @param time Time instance.
* @param clusterId The cluster id.
@@ -223,7 +223,7 @@ public class RemoteLogManager implements Closeable {
*/
@SuppressWarnings({"this-escape"})
public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
- int brokerId,
+ int nodeId,
String logDir,
String clusterId,
Time time,
@@ -232,7 +232,7 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
BrokerTopicStats brokerTopicStats,
Metrics metrics) throws IOException {
this.rlmConfig = rlmConfig;
- this.brokerId = brokerId;
+ this.nodeId = nodeId;
this.logDir = logDir;
this.clusterId = clusterId;
this.time = time;
@@ -376,7 +376,7 @@ RemoteStorageManager createRemoteStorageManager() {

private void configureRSM() {
final Map<String, Object> rsmProps = new HashMap<>(rlmConfig.remoteStorageManagerProps());
- rsmProps.put(ServerConfigs.BROKER_ID_CONFIG, brokerId);
+ rsmProps.put(KRaftConfigs.NODE_ID_CONFIG, nodeId);
remoteLogStorageManager.configure(rsmProps);
}
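
Both configureRSM() here and configureRLMM() below now inject the id under KRaftConfigs.NODE_ID_CONFIG ("node.id") instead of ServerConfigs.BROKER_ID_CONFIG ("broker.id"), so plugin implementations that read the id out of the configure() map must switch keys. A minimal sketch of the plugin side, assuming a custom RemoteStorageManager (class name and field are illustrative):

    import java.util.Map;

    import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;

    // Illustrative skeleton: only configure() is shown; the data-plane methods
    // (copyLogSegmentData, fetchLogSegment, ...) stay abstract.
    public abstract class NodeIdAwareStorageManager implements RemoteStorageManager {
        private int nodeId;

        @Override
        public void configure(Map<String, ?> configs) {
            // The broker now puts the raw int under "node.id"; before this PR
            // the same value arrived under "broker.id".
            this.nodeId = (Integer) configs.get("node.id");
        }
    }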

@@ -406,7 +406,7 @@ private void configureRLMM() {
// update the remoteLogMetadataProps here to override endpoint config if any
rlmmProps.putAll(rlmConfig.remoteLogMetadataManagerProps());

- rlmmProps.put(ServerConfigs.BROKER_ID_CONFIG, brokerId);
+ rlmmProps.put(KRaftConfigs.NODE_ID_CONFIG, nodeId);
rlmmProps.put(LOG_DIR_CONFIG, logDir);
rlmmProps.put("cluster.id", clusterId);

@@ -570,7 +570,7 @@ private void deleteRemoteLogPartition(TopicIdPartition partition) throws RemoteS
List<RemoteLogSegmentMetadataUpdate> deleteSegmentStartedEvents = metadataList.stream()
.map(metadata ->
new RemoteLogSegmentMetadataUpdate(metadata.remoteLogSegmentId(), time.milliseconds(),
- metadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId))
+ metadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_STARTED, nodeId))
.collect(Collectors.toList());
publishEvents(deleteSegmentStartedEvents).get();

@@ -585,7 +585,7 @@ private void deleteRemoteLogPartition(TopicIdPartition partition) throws RemoteS
List<RemoteLogSegmentMetadataUpdate> deleteSegmentFinishedEvents = metadataList.stream()
.map(metadata ->
new RemoteLogSegmentMetadataUpdate(metadata.remoteLogSegmentId(), time.milliseconds(),
- metadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId))
+ metadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, nodeId))
.collect(Collectors.toList());
publishEvents(deleteSegmentFinishedEvents).get();
}
@@ -800,7 +800,7 @@ public RLMTask(TopicIdPartition topicIdPartition) {
}

protected LogContext getLogContext() {
- return new LogContext("[RemoteLogManager=" + brokerId + " partition=" + topicIdPartition + "] ");
+ return new LogContext("[RemoteLogManager=" + nodeId + " partition=" + topicIdPartition + "] ");
}

public void run() {
@@ -1014,7 +1014,7 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, RemoteLogSegment

boolean isTxnIdxEmpty = segment.txnIndex().isEmpty();
RemoteLogSegmentMetadata copySegmentStartedRlsm = new RemoteLogSegmentMetadata(segmentId, segment.baseOffset(), endOffset,
- segment.largestTimestamp(), brokerId, time.milliseconds(), segment.log().sizeInBytes(),
+ segment.largestTimestamp(), nodeId, time.milliseconds(), segment.log().sizeInBytes(),
segmentLeaderEpochs, isTxnIdxEmpty);

remoteLogMetadataManager.addRemoteLogSegmentMetadata(copySegmentStartedRlsm).get();
@@ -1041,7 +1041,7 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, RemoteLogSegment
}

RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(),
- customMetadata, RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+ customMetadata, RemoteLogSegmentState.COPY_SEGMENT_FINISHED, nodeId);

if (customMetadata.isPresent()) {
long customMetadataSize = customMetadata.get().value().length;
@@ -1486,7 +1486,7 @@ private boolean deleteRemoteLogSegment(
// Publish delete segment started event.
remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
- segmentMetadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+ segmentMetadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_STARTED, nodeId)).get();

brokerTopicStats.topicStats(topic).remoteDeleteRequestRate().mark();
brokerTopicStats.allTopicsStats().remoteDeleteRequestRate().mark();
@@ -1503,7 +1503,7 @@ private boolean deleteRemoteLogSegment(
// Publish delete segment finished event.
remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
- segmentMetadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+ segmentMetadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, nodeId)).get();
LOGGER.debug("Deleted remote log segment {}", segmentMetadata.remoteLogSegmentId());
return true;
}
core/src/main/java/kafka/server/MetadataVersionConfigValidator.java
@@ -33,7 +33,7 @@ public MetadataVersionConfigValidator(
KafkaConfig config,
FaultHandler faultHandler
) {
- int id = config.brokerId();
+ int id = config.nodeId();
this.name = "MetadataVersionPublisher(id=" + id + ")";
this.config = config;
this.faultHandler = faultHandler;
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/cluster/Partition.scala
@@ -153,7 +153,7 @@ object Partition {
new Partition(topicPartition,
_topicId = topicId,
replicaLagTimeMaxMs = replicaManager.config.replicaLagTimeMaxMs,
- localBrokerId = replicaManager.config.brokerId,
+ localBrokerId = replicaManager.config.nodeId,
localBrokerEpochSupplier = replicaManager.brokerEpochSupplier,
time = time,
alterPartitionListener = isrChangeListener,
@@ -1364,7 +1364,7 @@ class Partition(val topicPartition: TopicPartition,
if (inSyncSize < minIsr && requiredAcks == -1) {
throw new NotEnoughReplicasException(s"The size of the current ISR : $inSyncSize " +
s"is insufficient to satisfy the min.isr requirement of $minIsr for partition $topicPartition, " +
s"live replica(s) broker.id are : $inSyncReplicaIds")
s"live replica(s) node.id are : $inSyncReplicaIds")
}

val info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, origin,
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -1773,8 +1773,8 @@ object GroupCoordinator {
time: Time,
metrics: Metrics
): GroupCoordinator = {
- val heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", config.brokerId)
- val rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", config.brokerId)
+ val heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", config.nodeId)
+ val rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", config.nodeId)
GroupCoordinator(config, replicaManager, heartbeatPurgatory, rebalancePurgatory, time, metrics)
}

@@ -1804,8 +1804,8 @@ object GroupCoordinator {
groupMaxSize = config.groupCoordinatorConfig.classicGroupMaxSize,
groupInitialRebalanceDelayMs = config.groupCoordinatorConfig.classicGroupInitialRebalanceDelayMs)

- val groupMetadataManager = new GroupMetadataManager(config.brokerId, offsetConfig, replicaManager, time, metrics)
- new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupMetadataManager, heartbeatPurgatory,
+ val groupMetadataManager = new GroupMetadataManager(config.nodeId, offsetConfig, replicaManager, time, metrics)
+ new GroupCoordinator(config.nodeId, groupConfig, offsetConfig, groupMetadataManager, heartbeatPurgatory,
rebalancePurgatory, time, metrics)
}

core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -56,10 +56,10 @@ object TransactionCoordinator {
config.transactionStateManagerConfig.transactionRemoveExpiredTransactionalIdCleanupIntervalMs,
config.requestTimeoutMs)

- val txnStateManager = new TransactionStateManager(config.brokerId, scheduler, replicaManager, metadataCache, txnConfig,
+ val txnStateManager = new TransactionStateManager(config.nodeId, scheduler, replicaManager, metadataCache, txnConfig,
time, metrics)

- val logContext = new LogContext(s"[TransactionCoordinator id=${config.brokerId}] ")
+ val logContext = new LogContext(s"[TransactionCoordinator id=${config.nodeId}] ")
val txnMarkerChannelManager = TransactionMarkerChannelManager(config, metrics, metadataCache, txnStateManager,
time, logContext)

core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -82,7 +82,7 @@ object TransactionMarkerChannelManager {
val networkClient = new NetworkClient(
selector,
new ManualMetadataUpdater(),
s"broker-${config.brokerId}-txn-marker-sender",
s"broker-${config.nodeId}-txn-marker-sender",
1,
50,
50,
@@ -161,12 +161,12 @@ class TransactionMarkerChannelManager(
networkClient: NetworkClient,
txnStateManager: TransactionStateManager,
time: Time
- ) extends InterBrokerSendThread("TxnMarkerSenderThread-" + config.brokerId, networkClient, config.requestTimeoutMs, time)
+ ) extends InterBrokerSendThread("TxnMarkerSenderThread-" + config.nodeId, networkClient, config.requestTimeoutMs, time)
with Logging {

private val metricsGroup = new KafkaMetricsGroup(this.getClass)

- this.logIdent = "[Transaction Marker Channel Manager " + config.brokerId + "]: "
+ this.logIdent = "[Transaction Marker Channel Manager " + config.nodeId + "]: "

private val interBrokerListenerName: ListenerName = config.interBrokerListenerName

2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/network/SocketServer.scala
@@ -84,7 +84,7 @@ class SocketServer(

private val maxQueuedRequests = config.queuedMaxRequests

- protected val nodeId: Int = config.brokerId
+ protected val nodeId: Int = config.nodeId

private val logContext = new LogContext(s"[SocketServer listenerType=${apiVersionManager.listenerType}, nodeId=$nodeId] ")

core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala
@@ -89,7 +89,7 @@ class AddPartitionsToTxnManager(
partitionFor: String => Int,
time: Time
) extends InterBrokerSendThread(
"AddPartitionsToTxnSenderThread-" + config.brokerId,
"AddPartitionsToTxnSenderThread-" + config.nodeId,
client,
config.requestTimeoutMs,
time
core/src/main/scala/kafka/server/AlterPartitionManager.scala
@@ -91,7 +91,7 @@ object AlterPartitionManager {
controllerChannelManager = channelManager,
scheduler = scheduler,
time = time,
- brokerId = config.brokerId,
+ brokerId = config.nodeId,
brokerEpochSupplier = brokerEpochSupplier,
metadataVersionSupplier = () => metadataCache.metadataVersion()
)
16 changes: 8 additions & 8 deletions core/src/main/scala/kafka/server/BrokerServer.scala
@@ -291,7 +291,7 @@ class BrokerServer(
)
alterPartitionManager.start()

- val addPartitionsLogContext = new LogContext(s"[AddPartitionsToTxnManager broker=${config.brokerId}]")
+ val addPartitionsLogContext = new LogContext(s"[AddPartitionsToTxnManager broker=${config.nodeId}]")
val addPartitionsToTxnNetworkClient = NetworkUtils.buildNetworkClient("AddPartitionsManager", config, metrics, time, addPartitionsLogContext)
val addPartitionsToTxnManager = new AddPartitionsToTxnManager(
config,
@@ -315,7 +315,7 @@ class BrokerServer(
assignmentsManager = new AssignmentsManager(
time,
assignmentsChannelManager,
- config.brokerId,
+ config.nodeId,
() => metadataCache.getImage(),
(directoryId: Uuid) => logManager.directoryPath(directoryId).
getOrElse("[unknown directory path]")
@@ -370,7 +370,7 @@ class BrokerServer(
groupCoordinator = createGroupCoordinator()

val producerIdManagerSupplier = () => ProducerIdManager.rpc(
- config.brokerId,
+ config.nodeId,
time,
() => lifecycleManager.brokerEpoch,
clientToControllerChannelManager
@@ -534,7 +534,7 @@ class BrokerServer(
if (e != null) brokerMetadataPublisher.firstPublishFuture.completeExceptionally(e)
})
metadataPublishers.add(brokerMetadataPublisher)
- brokerRegistrationTracker = new BrokerRegistrationTracker(config.brokerId,
+ brokerRegistrationTracker = new BrokerRegistrationTracker(config.nodeId,
() => lifecycleManager.resendBrokerRegistrationUnlessZkMode())
metadataPublishers.add(brokerRegistrationTracker)

@@ -631,7 +631,7 @@ class BrokerServer(
val writer = new CoordinatorPartitionWriter(
replicaManager
)
- new GroupCoordinatorService.Builder(config.brokerId, config.groupCoordinatorConfig)
+ new GroupCoordinatorService.Builder(config.nodeId, config.groupCoordinatorConfig)
.withTime(time)
.withTimer(timer)
.withLoader(loader)
@@ -669,7 +669,7 @@ class BrokerServer(
val writer = new CoordinatorPartitionWriter(
replicaManager
)
- Some(new ShareCoordinatorService.Builder(config.brokerId, config.shareCoordinatorConfig)
+ Some(new ShareCoordinatorService.Builder(config.nodeId, config.shareCoordinatorConfig)
.withTimer(timer)
.withTime(time)
.withLoader(loader)
@@ -691,7 +691,7 @@ class BrokerServer(
klass.getConstructor(classOf[PersisterStateManager])
.newInstance(
new PersisterStateManager(
NetworkUtils.buildNetworkClient("Persister", config, metrics, Time.SYSTEM, new LogContext(s"[Persister broker=${config.brokerId}]")),
NetworkUtils.buildNetworkClient("Persister", config, metrics, Time.SYSTEM, new LogContext(s"[Persister broker=${config.nodeId}]")),
new ShareCoordinatorMetadataCacheHelperImpl(metadataCache, key => shareCoordinator.get.partitionFor(key), config.interBrokerListenerName),
Time.SYSTEM,
new SystemTimerReaper(
@@ -717,7 +717,7 @@ class BrokerServer(

protected def createRemoteLogManager(): Option[RemoteLogManager] = {
if (config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) {
- Some(new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time,
+ Some(new RemoteLogManager(config.remoteLogManagerConfig, config.nodeId, config.logDirs.head, clusterId, time,
(tp: TopicPartition) => logManager.getLog(tp).toJava,
(tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => {
logManager.getLog(tp).foreach { log =>
10 changes: 5 additions & 5 deletions core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -115,12 +115,12 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager,

def updateThrottledList(prop: String, quotaManager: ReplicationQuotaManager): Unit = {
if (topicConfig.containsKey(prop) && topicConfig.getProperty(prop).nonEmpty) {
- val partitions = parseThrottledPartitions(topicConfig, kafkaConfig.brokerId, prop)
+ val partitions = parseThrottledPartitions(topicConfig, kafkaConfig.nodeId, prop)
quotaManager.markThrottled(topic, partitions.map(Integer.valueOf).asJava)
debug(s"Setting $prop on broker ${kafkaConfig.brokerId} for topic: $topic and partitions $partitions")
debug(s"Setting $prop on broker ${kafkaConfig.nodeId} for topic: $topic and partitions $partitions")
} else {
quotaManager.removeThrottle(topic)
debug(s"Removing $prop from broker ${kafkaConfig.brokerId} for topic $topic")
debug(s"Removing $prop from broker ${kafkaConfig.nodeId} for topic $topic")
}
}
updateThrottledList(QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG, quotas.leader)
@@ -206,8 +206,8 @@ class BrokerConfigHandler(private val brokerConfig: KafkaConfig,
def processConfigChanges(brokerId: String, properties: Properties): Unit = {
if (brokerId == ZooKeeperInternals.DEFAULT_STRING)
brokerConfig.dynamicConfig.updateDefaultConfig(properties)
- else if (brokerConfig.brokerId == brokerId.trim.toInt) {
- brokerConfig.dynamicConfig.updateBrokerConfig(brokerConfig.brokerId, properties)
+ else if (brokerConfig.nodeId == brokerId.trim.toInt) {
+ brokerConfig.dynamicConfig.updateBrokerConfig(brokerConfig.nodeId, properties)
}
val updatedDynamicBrokerConfigs = brokerConfig.dynamicConfig.currentDynamicBrokerConfigs
val updatedDynamicDefaultConfigs = brokerConfig.dynamicConfig.currentDynamicDefaultConfigs
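
BrokerConfigHandler now matches incoming per-broker dynamic config updates against brokerConfig.nodeId, so dynamic configs keep targeting the same integer id that operators already pass as the entity name. A minimal sketch of setting one through the Admin API (bootstrap address and the chosen config are illustrative):

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    public class DynamicBrokerConfigExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // illustrative
            try (Admin admin = Admin.create(props)) {
                // The entity name is the node id that BrokerConfigHandler
                // compares against brokerConfig.nodeId.
                ConfigResource broker0 = new ConfigResource(ConfigResource.Type.BROKER, "0");
                AlterConfigOp op = new AlterConfigOp(
                    new ConfigEntry("log.cleaner.threads", "2"),
                    AlterConfigOp.OpType.SET);
                admin.incrementalAlterConfigs(Map.of(broker0, List.of(op))).all().get();
            }
        }
    }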