Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
FrankYang0529 committed Jan 18, 2025
1 parent e124d39 commit 6811b8b
Show file tree
Hide file tree
Showing 5 changed files with 2 additions and 37 deletions.
3 changes: 0 additions & 3 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
def quotaConfig: QuotaConfig = _quotaConfig

/** ********* General Configuration ***********/
val brokerIdGenerationEnable: Boolean = getBoolean(ServerConfigs.BROKER_ID_GENERATION_ENABLE_CONFIG)
val maxReservedBrokerId: Int = getInt(ServerConfigs.RESERVED_BROKER_MAX_ID_CONFIG)
var brokerId: Int = getInt(ServerConfigs.BROKER_ID_CONFIG)
val nodeId: Int = getInt(KRaftConfigs.NODE_ID_CONFIG)
val initialRegistrationTimeoutMs: Int = getInt(KRaftConfigs.INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_CONFIG)
Expand All @@ -231,7 +229,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
val controllerPerformanceSamplePeriodMs: Long = getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS)
val controllerPerformanceAlwaysLogThresholdMs: Long = getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS)

def requiresZookeeper: Boolean = processRoles.isEmpty
def usesSelfManagedQuorum: Boolean = processRoles.nonEmpty

private def parseProcessRoles(): Set[ProcessRole] = {
Expand Down
14 changes: 0 additions & 14 deletions core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1182,8 +1182,6 @@ class KafkaConfigTest {
defaults.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
// For ZkConnectionTimeoutMs
defaults.setProperty(ZkConfigs.ZK_SESSION_TIMEOUT_MS_CONFIG, "1234")
defaults.setProperty(ServerConfigs.BROKER_ID_GENERATION_ENABLE_CONFIG, "false")
defaults.setProperty(ServerConfigs.RESERVED_BROKER_MAX_ID_CONFIG, "1")
defaults.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
defaults.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://127.0.0.1:1122")
defaults.setProperty(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG, "127.0.0.1:2, 127.0.0.2:3")
Expand All @@ -1199,8 +1197,6 @@ class KafkaConfigTest {

val config = KafkaConfig.fromProps(defaults)
assertEquals(1234, config.zkConnectionTimeoutMs)
assertEquals(false, config.brokerIdGenerationEnable)
assertEquals(1, config.maxReservedBrokerId)
assertEquals(1, config.brokerId)
assertEquals(Seq("PLAINTEXT://127.0.0.1:1122"), config.effectiveAdvertisedBrokerListeners.map(_.connectionString))
assertEquals(Map("127.0.0.1" -> 2, "127.0.0.2" -> 3), config.maxConnectionsPerIpOverrides)
Expand Down Expand Up @@ -1458,16 +1454,6 @@ class KafkaConfigTest {
assertFalse(isValidKafkaConfig(props))
}

@Test
def testRejectsNegativeNodeIdForRaftBasedCaseWithAutoGenDisabled(): Unit = {
// -1 is the default for both node.id and broker.id
val props = new Properties()
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
props.setProperty(ServerConfigs.BROKER_ID_GENERATION_ENABLE_CONFIG, "false")
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
assertFalse(isValidKafkaConfig(props))
}

@Test
def testCustomMetadataLogDir(): Unit = {
val metadataDir = "/path/to/metadata/dir"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import java.util
import java.util.concurrent.atomic.AtomicReference
import kafka.utils.{CoreUtils, TestUtils}
import org.apache.kafka.common.metrics.{KafkaMetric, MetricsContext, MetricsReporter}
import org.apache.kafka.server.config.ServerConfigs
import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs}
import org.apache.kafka.server.metrics.MetricConfigs
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.api.Assertions._
Expand Down Expand Up @@ -73,8 +73,7 @@ class KafkaMetricsReporterTest extends QuorumTestHarness {
super.setUp(testInfo)
val props = TestUtils.createBrokerConfig(1)
props.setProperty(MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG, "kafka.server.KafkaMetricsReporterTest$MockMetricsReporter")
props.setProperty(ServerConfigs.BROKER_ID_GENERATION_ENABLE_CONFIG, "true")
props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "1")
config = KafkaConfig.fromProps(props)
broker = createBroker(config, threadNamePrefix = Option(this.getClass.getName))
broker.startup()
Expand Down
1 change: 0 additions & 1 deletion core/src/test/scala/unit/kafka/utils/TestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,6 @@ object TestUtils extends Logging {
props.put(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true")
props.setProperty(KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_CONFIG, TimeUnit.MINUTES.toMillis(10).toString)
props.put(KRaftConfigs.NODE_ID_CONFIG, nodeId.toString)
props.put(ServerConfigs.BROKER_ID_CONFIG, nodeId.toString)
props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, listeners)
props.put(SocketServerConfigs.LISTENERS_CONFIG, listeners)
props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,7 @@

public class ServerConfigs {
/** ********* General Configuration ***********/
public static final String RESERVED_BROKER_MAX_ID_CONFIG = "reserved.broker.max.id";
public static final int RESERVED_BROKER_MAX_ID_DEFAULT = 1000;
public static final String RESERVED_BROKER_MAX_ID_DOC = "Max number that can be used for a broker.id";

public static final String BROKER_ID_GENERATION_ENABLE_CONFIG = "broker.id.generation.enable";
public static final boolean BROKER_ID_GENERATION_ENABLE_DEFAULT = true;
public static final String BROKER_ID_GENERATION_ENABLE_DOC = "Enable automatic broker id generation on the server. When enabled the value configured for " + RESERVED_BROKER_MAX_ID_CONFIG + " should be reviewed.";


public static final String BROKER_ID_CONFIG = "broker.id";
public static final int BROKER_ID_DEFAULT = -1;
public static final String BROKER_ID_DOC = "The broker id for this server. If unset, a unique broker id will be generated." +
"To avoid conflicts between ZooKeeper generated broker id's and user configured broker id's, generated broker ids " +
"start from " + RESERVED_BROKER_MAX_ID_CONFIG + " + 1.";

public static final String MESSAGE_MAX_BYTES_CONFIG = "message.max.bytes";
public static final String MESSAGE_MAX_BYTES_DOC = TopicConfig.MAX_MESSAGE_BYTES_DOC +
Expand Down Expand Up @@ -131,9 +118,6 @@ public class ServerConfigs {
"the StandardAuthorizer (which stores ACLs in the metadata log.) By default, all listeners included in controller.listener.names " +
"will also be early start listeners. A listener should not appear in this list if it accepts external traffic.";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(BROKER_ID_GENERATION_ENABLE_CONFIG, BOOLEAN, BROKER_ID_GENERATION_ENABLE_DEFAULT, MEDIUM, BROKER_ID_GENERATION_ENABLE_DOC)
.define(RESERVED_BROKER_MAX_ID_CONFIG, INT, RESERVED_BROKER_MAX_ID_DEFAULT, atLeast(0), MEDIUM, RESERVED_BROKER_MAX_ID_DOC)
.define(BROKER_ID_CONFIG, INT, BROKER_ID_DEFAULT, HIGH, BROKER_ID_DOC)
.define(MESSAGE_MAX_BYTES_CONFIG, INT, LogConfig.DEFAULT_MAX_MESSAGE_BYTES, atLeast(0), HIGH, MESSAGE_MAX_BYTES_DOC)
.define(NUM_IO_THREADS_CONFIG, INT, NUM_IO_THREADS_DEFAULT, atLeast(1), HIGH, NUM_IO_THREADS_DOC)
.define(NUM_REPLICA_ALTER_LOG_DIRS_THREADS_CONFIG, INT, null, HIGH, NUM_REPLICA_ALTER_LOG_DIRS_THREADS_DOC)
Expand Down

0 comments on commit 6811b8b

Please sign in to comment.