diff --git a/pom.xml b/pom.xml
index 4f697903..2ccf7289 100644
--- a/pom.xml
+++ b/pom.xml
@@ -73,7 +73,7 @@
     3
-    <pulsar.version>2.7.2</pulsar.version>
+    <pulsar.version>2.8.0</pulsar.version>
     1.12.3
     2.11
     1.18.18
@@ -90,6 +90,7 @@
     3.7.7
     1.3
     <junit.version>4.13.2</junit.version>
+    <jna.version>5.7.0</jna.version>
     3.5.4
@@ -114,6 +115,12 @@
       <version>${junit.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>net.java.dev.jna</groupId>
+      <artifactId>jna</artifactId>
+      <version>${jna.version}</version>
+      <scope>test</scope>
+    </dependency>
diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java b/pulsar-flink-connector/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java
index 7032208b..d5cd7aaa 100644
--- a/pulsar-flink-connector/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java
+++ b/pulsar-flink-connector/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java
@@ -127,7 +127,7 @@ public TypeInformation getProducedType() {
     public SourceReader createReader(SourceReaderContext readerContext) {
         FutureCompletingBlockingQueue elementsQueue = new FutureCompletingBlockingQueue<>();
-        ExecutorProvider listenerExecutor = new ExecutorProvider(1, r -> new Thread(r, "Pulsar listener executor"));
+        ExecutorProvider listenerExecutor = new ExecutorProvider(1, "Pulsar listener executor");
         Closer splitCloser = Closer.create();
         splitCloser.register(listenerExecutor::shutdownNow);
         Supplier splitReaderSupplier = () -> {
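Pulsar 2.8 reworked the client's ExecutorProvider to take a pool name instead of a ThreadFactory; the provider now names its own threads. A minimal sketch of the new constructor in isolation (assumes the 2.8 pulsar-client; the import path and the getExecutor() call are the client's own as I understand them, the demo class and pool name are illustrative):

```java
import org.apache.pulsar.client.util.ExecutorProvider;

public class ListenerExecutorSketch {
    public static void main(String[] args) {
        // 2.7 style: new ExecutorProvider(1, r -> new Thread(r, "Pulsar listener executor"))
        // 2.8 style: pass only the pool name; the provider builds the thread factory.
        ExecutorProvider listenerExecutor = new ExecutorProvider(1, "Pulsar listener executor");
        listenerExecutor.getExecutor().execute(
                () -> System.out.println(Thread.currentThread().getName()));
        listenerExecutor.shutdownNow();
    }
}
```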
PulsarClient"); throw new RuntimeException(executionException); @@ -522,7 +522,7 @@ protected void recoverAndAbort(PulsarTransactionState transaction) { log.debug("transaction {} is recoverAndAbort...", transaction.transactionalId); TransactionCoordinatorClientImpl tcClient = CachedPulsarClient.getOrCreate(clientConfigurationData).getTcClient(); TxnID transactionalId = transaction.transactionalId; - tcClient.abort(transactionalId, transaction.pendingMessages); + tcClient.abort(transactionalId); } catch (PulsarClientException executionException) { log.error("Failed to getOrCreate a PulsarClient"); throw new RuntimeException(executionException); diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarCatalogSupport.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarCatalogSupport.java index fe7a7b8b..3bdee765 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarCatalogSupport.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarCatalogSupport.java @@ -29,6 +29,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; @@ -114,7 +115,7 @@ public void putSchema(ObjectPath tablePath, CatalogBaseTable table, String forma // Writing schemaInfo#properties causes the client to fail to consume it when it is a Pulsar native type. 
diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarCatalogSupport.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarCatalogSupport.java
index fe7a7b8b..3bdee765 100644
--- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarCatalogSupport.java
+++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarCatalogSupport.java
@@ -29,6 +29,7 @@
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
@@ -114,7 +115,7 @@ public void putSchema(ObjectPath tablePath, CatalogBaseTable table, String forma
         // Writing schemaInfo#properties causes the client to fail to consume it when it is a Pulsar native type.
         if (!StringUtils.equals(format, AtomicRowDataFormatFactory.IDENTIFIER)) {
-            schemaInfo.setProperties(extractedProperties(table));
+            ((SchemaInfoImpl) schemaInfo).setProperties(extractedProperties(table));
         }
         pulsarMetadataReader.putSchema(topicName, schemaInfo);
     }
diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarMetadataReader.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarMetadataReader.java
index ebf46ae7..497cd297 100644
--- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarMetadataReader.java
+++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarMetadataReader.java
@@ -290,9 +290,9 @@ public MessageId getPositionFromSubscription(TopicRange topic, MessageId default
         try {
             String subscriptionName = subscriptionNameFrom(topic);
             TopicStats topicStats = admin.topics().getStats(topic.getTopic());
-            if (topicStats.subscriptions.containsKey(subscriptionName)) {
-                SubscriptionStats subStats = topicStats.subscriptions.get(subscriptionName);
-                if (subStats.consumers.size() != 0) {
+            if (topicStats.getSubscriptions().containsKey(subscriptionName)) {
+                SubscriptionStats subStats = topicStats.getSubscriptions().get(subscriptionName);
+                if (subStats.getConsumers().size() != 0) {
                     throw new RuntimeException("Subscription been actively used by other consumers, " +
                             "in this situation, the exactly-once semantics cannot be guaranteed.");
                 } else {
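The admin stats beans (TopicStats, SubscriptionStats) became interfaces in 2.8, so direct field reads such as topicStats.subscriptions turn into getter calls. A small sketch of the subscription check the connector performs, assuming a 2.8 PulsarAdmin (the helper class and method name are illustrative):

```java
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;

final class SubscriptionProbe {
    // True when the subscription exists and still has connected consumers —
    // the case the connector rejects to preserve exactly-once semantics.
    static boolean hasActiveConsumers(PulsarAdmin admin, String topic, String subscription)
            throws PulsarAdminException {
        TopicStats stats = admin.topics().getStats(topic);
        SubscriptionStats sub = stats.getSubscriptions().get(subscription);
        return sub != null && !sub.getConsumers().isEmpty();
    }
}
```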
diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/SchemaUtils.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/SchemaUtils.java
index 24efa04c..d331114e 100644
--- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/SchemaUtils.java
+++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/SchemaUtils.java
@@ -28,6 +28,7 @@
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.client.internal.DefaultImplementation;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
@@ -123,15 +124,15 @@ public static boolean compatibleSchema(SchemaInfo s1, SchemaInfo s2) {
     static GenericSchema avroSchema2PulsarSchema(Schema avroSchema) {
         byte[] schemaBytes = avroSchema.toString().getBytes(StandardCharsets.UTF_8);
-        SchemaInfo si = new SchemaInfo();
+        SchemaInfoImpl si = new SchemaInfoImpl();
         si.setName("Avro");
         si.setSchema(schemaBytes);
         si.setType(SchemaType.AVRO);
         return org.apache.pulsar.client.api.Schema.generic(si);
     }

-    public static SchemaInfo emptySchemaInfo() {
-        return SchemaInfo.builder()
+    public static SchemaInfoImpl emptySchemaInfo() {
+        return SchemaInfoImpl.builder()
                 .name("empty")
                 .type(SchemaType.NONE)
                 .schema(new byte[0])
@@ -160,7 +161,7 @@ public static SchemaInfo buildRowSchema(DataType dataType, RecordSchemaType recordSchemaType) {
         org.apache.avro.Schema avroSchema = AvroSchemaConverter.convertToSchema(dataType.getLogicalType());
         byte[] schemaBytes = avroSchema.toString().getBytes(StandardCharsets.UTF_8);
-        SchemaInfo si = new SchemaInfo();
+        SchemaInfoImpl si = new SchemaInfoImpl();
         si.setSchema(schemaBytes);
         switch (recordSchemaType) {
             case AVRO:
@@ -247,9 +248,9 @@ private static SchemaInfo getProtobufSchemaInfo(S
         }
     }

-    public static SchemaInfo getSchemaInfo(SchemaType type, DataType dataType) {
+    public static SchemaInfoImpl getSchemaInfo(SchemaType type, DataType dataType) {
         byte[] schemaBytes = getAvroSchema(dataType).toString().getBytes(StandardCharsets.UTF_8);
-        return SchemaInfo.builder()
+        return SchemaInfoImpl.builder()
                 .name("Record")
                 .schema(schemaBytes)
                 .type(type)
diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/SimpleSchemaTranslator.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/SimpleSchemaTranslator.java
index 3fafb47e..f3ff746e 100644
--- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/SimpleSchemaTranslator.java
+++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/SimpleSchemaTranslator.java
@@ -30,6 +30,7 @@
 import com.google.protobuf.Descriptors;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.client.impl.schema.generic.GenericProtobufNativeSchema;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -98,7 +99,7 @@ public static org.apache.pulsar.client.api.Schema sqlType2PulsarSchema(DataType
     static GenericSchema avroSchema2PulsarSchema(Schema avroSchema) {
         byte[] schemaBytes = avroSchema.toString().getBytes(StandardCharsets.UTF_8);
-        SchemaInfo si = new SchemaInfo();
+        SchemaInfoImpl si = new SchemaInfoImpl();
         si.setName("Avro");
         si.setSchema(schemaBytes);
         si.setType(SchemaType.AVRO);
@@ -215,7 +216,7 @@ private static Schema sqlType2AvroSchema(DataType flinkType, boolean nullable,
     }

     public static SchemaInfo emptySchemaInfo() {
-        return SchemaInfo.builder()
+        return SchemaInfoImpl.builder()
                 .name("empty")
                 .type(SchemaType.NONE)
                 .schema(new byte[0])
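2.8 splits schema metadata into the read-only SchemaInfo and a concrete SchemaInfoImpl that carries the setters and the builder, which is why the connector now instantiates the impl, or casts to it, wherever it mutates a schema. A minimal sketch of both styles, assuming the 2.8 pulsar-client (class and method names here are illustrative):

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

final class SchemaInfoSketch {
    // Builder style, as in emptySchemaInfo()/getSchemaInfo() above.
    static SchemaInfo fromAvroJson(String avroSchemaJson) {
        return SchemaInfoImpl.builder()
                .name("Avro")
                .type(SchemaType.AVRO)
                .schema(avroSchemaJson.getBytes(StandardCharsets.UTF_8))
                .build();
    }

    // Setter style, as in avroSchema2PulsarSchema(); the setters live on the
    // impl, so a plain SchemaInfo reference has to be cast first.
    static void attachProperties(SchemaInfo info, Map<String, String> props) {
        ((SchemaInfoImpl) info).setProperties(props);
    }
}
```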
diff --git a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/CatalogITest.java b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/CatalogITest.java
index c60024ff..22e0f889 100644
--- a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/CatalogITest.java
+++ b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/CatalogITest.java
@@ -132,7 +132,11 @@ public void testDatabases() throws Exception {
         try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(getAdminUrl()).build()) {
             admin.tenants().createTenant("tn1",
-                    new TenantInfo(Sets.newHashSet(), Sets.newHashSet("standalone")));
+                    TenantInfo.builder()
+                            .adminRoles(Sets.newHashSet())
+                            .allowedClusters(Sets.newHashSet("standalone"))
+                            .build()
+            );
             for (String ns : namespaces) {
                 admin.namespaces().createNamespace(ns);
             }
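TenantInfo likewise lost its public constructor in 2.8 in favour of a builder, as the CatalogITest change above shows. The same bootstrap in isolation (tenant and cluster names are the test's own; plain java.util sets are used here to stay library-neutral):

```java
import java.util.Collections;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.policies.data.TenantInfo;

final class TenantSetupSketch {
    // 2.7 style was: new TenantInfo(adminRoles, allowedClusters)
    static void createTenant(PulsarAdmin admin) throws PulsarAdminException {
        admin.tenants().createTenant("tn1",
                TenantInfo.builder()
                        .adminRoles(Collections.emptySet())
                        .allowedClusters(Collections.singleton("standalone"))
                        .build());
    }
}
```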
diff --git a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAuthTest.java b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAuthTest.java
index 1b923004..70a79c40 100644
--- a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAuthTest.java
+++ b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAuthTest.java
@@ -66,7 +66,7 @@ public class PulsarAuthTest {
     @BeforeClass
     public static void prepare() throws Exception {
         log.info("    Starting PulsarTestBase ");
-        final String pulsarImage = System.getProperty("pulsar.systemtest.image", "apachepulsar/pulsar:2.7.0");
+        final String pulsarImage = System.getProperty("pulsar.systemtest.image", "apachepulsar/pulsar:2.8.0");
         DockerImageName pulsar = DockerImageName.parse(pulsarImage)
                 .asCompatibleSubstituteFor("apachepulsar/pulsar");
         pulsarService = new PulsarContainer(pulsar);
diff --git a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarTestBase.java b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarTestBase.java
index e0d6a6c8..4bc45c0a 100644
--- a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarTestBase.java
+++ b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarTestBase.java
@@ -111,7 +111,7 @@ public static void prepare() throws Exception {
         if (StringUtils.isNotBlank(adminUrl) && StringUtils.isNotBlank(serviceUrl)) {
             log.info("    Use extend Pulsar Service ");
         } else {
-            final String pulsarImage = System.getProperty("pulsar.systemtest.image", "apachepulsar/pulsar:2.7.0");
+            final String pulsarImage = System.getProperty("pulsar.systemtest.image", "apachepulsar/pulsar:2.8.0");
             DockerImageName pulsar = DockerImageName.parse(pulsarImage)
                     .asCompatibleSubstituteFor("apachepulsar/pulsar");
             pulsarService = new PulsarContainer(pulsar);
diff --git a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarTransactionalSinkTest.java b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarTransactionalSinkTest.java
index 3c2d4ff2..eb7b2502 100644
--- a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarTransactionalSinkTest.java
+++ b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarTransactionalSinkTest.java
@@ -22,13 +22,12 @@
 import org.apache.flink.streaming.connectors.pulsar.table.PulsarSinkSemantic;
 import org.apache.flink.streaming.connectors.pulsar.testutils.FailingIdentityMapper;
 import org.apache.flink.streaming.connectors.pulsar.testutils.IntegerSource;
+import org.apache.flink.streaming.connectors.pulsar.testutils.TestUtils;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.streaming.util.serialization.PulsarSerializationSchemaWrapper;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.test.util.TestUtils;

 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.compress.utils.Sets;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -36,12 +35,8 @@
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.common.naming.NamespaceName;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.testcontainers.containers.BindMode;
 import org.testcontainers.containers.PulsarContainer;
@@ -77,7 +72,7 @@ public class PulsarTransactionalSinkTest {
     public static void prepare() throws Exception {
         log.info("    Starting PulsarTestBase ");
-        final String pulsarImage = System.getProperty("pulsar.systemtest.image", "apachepulsar/pulsar:2.7.0");
+        final String pulsarImage = System.getProperty("pulsar.systemtest.image", "apachepulsar/pulsar:2.8.0");
         DockerImageName pulsar = DockerImageName.parse(pulsarImage)
                 .asCompatibleSubstituteFor("apachepulsar/pulsar");
         pulsarService = new PulsarContainer(pulsar);
@@ -117,14 +112,7 @@ public static void shutDownServices() throws Exception {
      * Tests the exactly-once semantic for the simple writes into Pulsar.
      */
     @Test
-    @Ignore("Pulsar 2.7.1 does not support the use of standalone transactions, requires Pulsar 2.8 version")
     public void testExactlyOnceRegularSink() throws Exception {
-        admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build();
-        admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
-                new TenantInfo(Sets.newHashSet("app1"), Sets.newHashSet(CLUSTER_NAME)));
-        admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-        admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 16);
-
         testExactlyOnce(1);
     }
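The three test bases bump the default broker image to 2.8.0 while keeping the -Dpulsar.systemtest.image override, and testExactlyOnceRegularSink drops its @Ignore: a 2.8 standalone with the transaction coordinator enabled provisions the coordinator metadata itself, so the manual tenant/namespace/TRANSACTION_COORDINATOR_ASSIGN bootstrap can go. The container bootstrap in isolation, mirroring the test code (the helper class is illustrative):

```java
import org.testcontainers.containers.PulsarContainer;
import org.testcontainers.utility.DockerImageName;

final class TestBrokerSketch {
    // Start the broker container; the image tag is overridable from the
    // command line, defaulting to the 2.8.0 release used by this PR.
    static PulsarContainer start() {
        String image = System.getProperty("pulsar.systemtest.image", "apachepulsar/pulsar:2.8.0");
        PulsarContainer pulsarService = new PulsarContainer(
                DockerImageName.parse(image).asCompatibleSubstituteFor("apachepulsar/pulsar"));
        pulsarService.start();
        return pulsarService;
    }
}
```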
 assertEquals(9, newRowData.getArity());
diff --git a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/RowDataDerSerializationSchemaTest.java b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/RowDataDerSerializationSchemaTest.java
index 66951098..6e6b0d8a 100644
--- a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/RowDataDerSerializationSchemaTest.java
+++ b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/RowDataDerSerializationSchemaTest.java
@@ -56,12 +56,13 @@
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
-import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.junit.Test;
@@ -137,7 +138,7 @@ public void testAvroSerializeDeserialize() throws Exception {
         final CompletableFuture consumer = autoConsumer(topicName);
-        RowData newRowData = deserializationSchema.deserialize(consumer.get(2000, TimeUnit.MILLISECONDS));
+        RowData newRowData = deserializationSchema.deserialize(consumer.get(10000, TimeUnit.MILLISECONDS));
         assertEquals(rowData, newRowData);
     }
@@ -167,7 +168,7 @@ public void testJsonSerializeDeserialize() throws Exception {
                 rowData);
         final CompletableFuture consumer = autoConsumer(topicName);
-        RowData newRowData = deserializationSchema.deserialize(consumer.get(2000, TimeUnit.MILLISECONDS));
+        RowData newRowData = deserializationSchema.deserialize(consumer.get(10000, TimeUnit.MILLISECONDS));
         assertEquals(rowData, newRowData);
     }
@@ -206,7 +207,7 @@ public void testProtoBufSerializeDeserialize() throws Exception {
         sendMessage(topicName, rowDataFlinkSchema, rowData);
         final CompletableFuture consumer = autoConsumer(topicName);
-        RowData newRowData = deserializationSchema.deserialize(consumer.get(2000, TimeUnit.MILLISECONDS));
+        RowData newRowData = deserializationSchema.deserialize(consumer.get(10000, TimeUnit.MILLISECONDS));
         newRowData = validatePbRow(newRowData, PbRowTypeInformation.generateRowType(SimpleTest.getDescriptor()));
         assertEquals(9, newRowData.getArity());
@@ -292,7 +293,7 @@ public org.apache.pulsar.client.api.Schema toPulsarSchema(SchemaType sche
             SerializationSchema serializationSchema,
             DeserializationSchema deserializationSchema) {
         byte[] schemaBytes = avroSchema.toString().getBytes(StandardCharsets.UTF_8);
-        SchemaInfo si = new SchemaInfo();
+        SchemaInfoImpl si = new SchemaInfoImpl();
         si.setName("Record");
         si.setSchema(schemaBytes);
         si.setType(schemaType);
@@ -301,11 +302,14 @@ public org.apache.pulsar.client.api.Schema toPulsarSchema(SchemaType sche
     public void sendMessage(String topic, org.apache.pulsar.client.api.Schema schema, RowData data) throws Exception {
+        try (PulsarAdmin admin = getPulsarAdmin()) {
+            admin.schemas().createSchema(topic, schema.getSchemaInfo());
+        }
-        try (
-                PulsarClient pulsarClient = PulsarClient.builder()
+        try (
+                PulsarClient pulsarClient = PulsarClient.builder()
                         .serviceUrl(serviceUrl)
                         .build();
-                final Producer producer = pulsarClient.newProducer(schema)
+                final Producer producer = pulsarClient.newProducer(schema)
                         .topic(topic)
                         .create()) {
             pulsarClient
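These tests consume with AUTO_CONSUME, which resolves the topic schema from the broker's registry; registering the schema before producing (together with the widened 10 s timeouts) makes the first read deterministic. A sketch of the pre-registration step, assuming a PulsarAdmin handle like the test's getPulsarAdmin():

```java
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Schema;

final class SchemaBootstrapSketch {
    // Upload the producer-side schema so an AUTO_CONSUME subscriber can decode
    // the very first message instead of waiting for schema discovery.
    static void register(PulsarAdmin admin, String topic, Schema<?> schema)
            throws PulsarAdminException {
        admin.schemas().createSchema(topic, schema.getSchemaInfo());
    }
}
```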
diff --git a/pulsar-flink-connector/src/test/resources/pulsar/txnStandalone.conf b/pulsar-flink-connector/src/test/resources/pulsar/txnStandalone.conf
index a7cd1bb9..2c541ad2 100644
--- a/pulsar-flink-connector/src/test/resources/pulsar/txnStandalone.conf
+++ b/pulsar-flink-connector/src/test/resources/pulsar/txnStandalone.conf
@@ -15,8 +15,10 @@
 ### --- General broker settings --- ###

 # Zookeeper quorum connection string
+zookeeperServers=

 # Configuration Store connection string
+configurationStoreServers=

 brokerServicePort=6650
@@ -27,19 +29,26 @@ webServicePort=8080
 bindAddress=0.0.0.0

 # Hostname or IP address the service advertises to the outside world. If not set, the value of InetAddress.getLocalHost().getHostName() is used.
+advertisedAddress=
+
+# Enable or disable the HAProxy protocol.
+haProxyProtocolEnabled=false

 # Number of threads to use for Netty IO. Default is set to 2 * Runtime.getRuntime().availableProcessors()
+numIOThreads=

 # Number of threads to use for ordered executor. The ordered executor is used to operate with zookeeper,
 # such as init zookeeper client, get namespace policies from zookeeper etc. It also used to split bundle. Default is 8
 numOrderedExecutorThreads=8

 # Number of threads to use for HTTP requests processing. Default is set to 2 * Runtime.getRuntime().availableProcessors()
+numHttpServerThreads=

 # Number of thread pool size to use for pulsar broker service.
 # The executor in thread pool will do basic broker operation like load/unload bundle, update managedLedgerConfig,
 # update topic/subscription/replicator message dispatch rate, do leader election etc.
 # Default is Runtime.getRuntime().availableProcessors()
+numExecutorThreadPoolSize=

 # Number of thread pool size to use for pulsar zookeeper callback service
 # The cache executor thread pool is used for restarting global zookeeper session.
@@ -79,6 +88,9 @@ backlogQuotaCheckIntervalInSeconds=60
 # Default per-topic backlog quota limit
 backlogQuotaDefaultLimitGB=10

+# Default per-topic backlog quota time limit in second, less than 0 means no limitation. default is -1.
+backlogQuotaDefaultLimitSecond=-1
+
 # Default ttl for namespaces if ttl is not already configured at namespace policies. (disable default-ttl with value 0)
 ttlDurationDefaultInSeconds=0
@@ -90,7 +102,7 @@ brokerDeleteInactiveTopicsFrequencySeconds=60

 # Max pending publish requests per connection to avoid keeping large number of pending
 # requests in memory. Default: 1000
-maxPendingPublishdRequestsPerConnection=1000
+maxPendingPublishRequestsPerConnection=1000

 # How frequently to proactively check and purge expired messages
 messageExpiryCheckIntervalInMinutes=5
@@ -139,6 +151,12 @@ brokerDeduplicationProducerInactivityTimeoutMinutes=360
 # value will be used as the default
 defaultNumberOfNamespaceBundles=4

+# Max number of topics allowed to be created in the namespace. When the topics reach the max topics of the namespace,
+# the broker should reject the new topic request(include topic auto-created by the producer or consumer)
+# until the number of connected consumers decrease.
+# Using a value of 0, is disabling maxTopicsPerNamespace-limit check.
+maxTopicsPerNamespace=0
+
 # Enable check for minimum allowed client library version
 clientLibraryVersionCheckEnabled=false
@@ -173,6 +191,9 @@ maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16
 # it uses more CPU to perform frequent check. (Disable publish throttling with value 0)
 topicPublisherThrottlingTickTimeMillis=2

+# Enable precise rate limit for topic publish
+preciseTopicPublishRateLimiterEnable=false
+
 # Tick time to schedule task that checks broker publish rate limiting across all topics
 # Reducing to lower value can give more accuracy while throttling publish but
 # it uses more CPU to perform frequent check. (Disable publish throttling with value 0)
@@ -229,6 +250,12 @@ enableNonPersistentTopics=true
 # Using a value of 0, is disabling maxProducersPerTopic-limit check.
 maxProducersPerTopic=0

+# Max number of producers with the same IP address allowed to connect to topic.
+# Once this limit reaches, Broker will reject new producers until the number of
+# connected producers with the same IP address decrease.
+# Using a value of 0, is disabling maxSameAddressProducersPerTopic-limit check.
+maxSameAddressProducersPerTopic=0
+
 # Enforce producer to publish encrypted messages.(default disable).
 encryptionRequireOnProducer=false
@@ -237,6 +264,12 @@ encryptionRequireOnProducer=false
 # Using a value of 0, is disabling maxConsumersPerTopic-limit check.
 maxConsumersPerTopic=0

+# Max number of consumers with the same IP address allowed to connect to topic.
+# Once this limit reaches, Broker will reject new consumers until the number of
+# connected consumers with the same IP address decrease.
+# Using a value of 0, is disabling maxSameAddressConsumersPerTopic-limit check.
+maxSameAddressConsumersPerTopic=0
+
 # Max number of subscriptions allowed to subscribe to topic. Once this limit reaches, broker will reject
 # new subscription until the number of subscribed subscriptions decrease.
 # Using a value of 0, is disabling maxSubscriptionsPerTopic limit check.
@@ -259,13 +292,16 @@ tlsEnabled=false
 tlsCertRefreshCheckDurationSec=300

 # Path for the TLS certificate file
+tlsCertificateFilePath=

 # Path for the TLS private key file
+tlsKeyFilePath=

 # Path for the trusted TLS certificate file.
 # This cert is used to verify that any certs presented by connecting clients
 # are signed by a certificate authority. If this verification
 # fails, then the certs are untrusted and the connections are dropped.
+tlsTrustCertsFilePath=

 # Accept untrusted TLS certificate from client.
 # If true, a client with a cert which cannot be verified with the
@@ -275,11 +311,13 @@ tlsAllowInsecureConnection=false

 # Specify the tls protocols the broker will use to negotiate during TLS handshake
 # (a comma-separated list of protocol names).
-# Examples:- [TLSv1.2, TLSv1.1, TLSv1]
+# Examples:- [TLSv1.3, TLSv1.2]
+tlsProtocols=

 # Specify the tls cipher the broker will use to negotiate during TLS Handshake
 # (a comma-separated list of ciphers).
 # Examples:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]
+tlsCiphers=

 # Trusted client certificates are required for to connect TLS
 # Reject the Connection if the Client Certificate is not trusted.
@@ -292,25 +330,31 @@ tlsRequireTrustedClientCertOnConnect=false
 tlsEnabledWithKeyStore=false

 # TLS Provider for KeyStore type
+tlsProvider=

 # TLS KeyStore type configuration in broker: JKS, PKCS12
 tlsKeyStoreType=JKS

 # TLS KeyStore path in broker
+tlsKeyStore=

 # TLS KeyStore password for broker
+tlsKeyStorePassword=

 # TLS TrustStore type configuration in broker: JKS, PKCS12
 tlsTrustStoreType=JKS

 # TLS TrustStore path in broker
+tlsTrustStore=

 # TLS TrustStore password for broker
+tlsTrustStorePassword=

 # Whether internal client use KeyStore type to authenticate with Pulsar brokers
 brokerClientTlsEnabledWithKeyStore=false

 # The TLS Provider used by internal client to authenticate with other Pulsar brokers
+brokerClientSslProvider=

 # TLS TrustStore type configuration for internal client: JKS, PKCS12
 # used by the internal client to authenticate with Pulsar brokers
@@ -318,26 +362,30 @@ brokerClientTlsTrustStoreType=JKS

 # TLS TrustStore path for internal client
 # used by the internal client to authenticate with Pulsar brokers
+brokerClientTlsTrustStore=

 # TLS TrustStore password for internal client,
 # used by the internal client to authenticate with Pulsar brokers
+brokerClientTlsTrustStorePassword=

 # Specify the tls cipher the internal client will use to negotiate during TLS Handshake
 # (a comma-separated list of ciphers)
 # e.g.  [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256].
 # used by the internal client to authenticate with Pulsar brokers
+brokerClientTlsCiphers=

 # Specify the tls protocols the broker will use to negotiate during TLS handshake
 # (a comma-separated list of protocol names).
-# e.g.  [TLSv1.2, TLSv1.1, TLSv1]
+# e.g.  [TLSv1.3, TLSv1.2]
 # used by the internal client to authenticate with Pulsar brokers
+brokerClientTlsProtocols=

 # Enable or disable system topic
 systemTopicEnabled=true

 # Enable or disable topic level policies, topic level policies depends on the system topic
 # Please enable the system topic first.
-topicLevelPoliciesEnabled=true
+topicLevelPoliciesEnabled=false

 # If a topic remains fenced for this number of seconds, it will be closed forcefully.
 # If it is set to 0 or a negative number, the fenced topic will not be closed.
@@ -346,6 +394,7 @@ topicFencingTimeoutSeconds=0

 ### --- Authentication --- ###
 # Role names that are treated as "proxy roles". If the broker sees a request with
 #role as proxyRoles - it will demand to see a valid original principal.
+proxyRoles=

 # If this flag is set then the broker authenticates the original Auth data
 # else it just accepts the originalPrincipal and authorizes it (if required).
@@ -354,7 +403,8 @@ authenticateOriginalAuthData=false
 # Enable authentication
 authenticationEnabled=false

-# Autentication provider name list, which is comma separated list of class names
+# Authentication provider name list, which is comma separated list of class names
+authenticationProviders=

 # Enforce authorization
 authorizationEnabled=false
@@ -369,30 +419,63 @@ authorizationAllowWildcardsMatching=false

 # Role names that are treated as "super-user", meaning they will be able to do all admin
 # operations and publish/consume from all topics
+superUserRoles=

 # Authentication settings of the broker itself. Used when the broker connects to other brokers,
 # either in same or other clusters
+brokerClientAuthenticationPlugin=
+brokerClientAuthenticationParameters=

 # Supported Athenz provider domain names(comma separated) for authentication
+athenzDomainNames=

 # When this parameter is not empty, unauthenticated users perform as anonymousUserRole
+anonymousUserRole=
+
+
+### --- Token Authentication Provider --- ###
+
+## Symmetric key
+# Configure the secret key to be used to validate auth tokens
+# The key can be specified like:
+# tokenSecretKey=data:;base64,xxxxxxxxx
+# tokenSecretKey=file:///my/secret.key  ( Note: key file must be DER-encoded )
+tokenSecretKey=
+
+## Asymmetric public/private key pair
+# Configure the public key to be used to validate auth tokens
+# The key can be specified like:
+# tokenPublicKey=data:;base64,xxxxxxxxx
+# tokenPublicKey=file:///my/public.key  ( Note: key file must be DER-encoded )
+tokenPublicKey=
+
 # The token "claim" that will be interpreted as the authentication "role" or "principal" by AuthenticationProviderToken (defaults to "sub" if blank)
+tokenAuthClaim=

 # The token audience "claim" name, e.g. "aud", that will be used to get the audience from token.
 # If not set, audience will not be verified.
+tokenAudienceClaim=

 # The token audience stands for this broker. The field `tokenAudienceClaim` of a valid token, need contains this.
+tokenAudience=

 ### --- BookKeeper Client --- ###

 # Authentication plugin to use when connecting to bookies
+bookkeeperClientAuthenticationPlugin=

 # BookKeeper auth plugin implementatation specifics parameters name and values
+bookkeeperClientAuthenticationParametersName=
+bookkeeperClientAuthenticationParameters=

 # Timeout for BK add / read operations
 bookkeeperClientTimeoutInSeconds=30

+# Number of BookKeeper client worker threads
+# Default is Runtime.getRuntime().availableProcessors()
+bookkeeperClientNumWorkerThreads=
+
 # Speculative reads are initiated if a read request doesn't complete within a certain time
 # Using a value of 0, is disabling the speculative reads
 bookkeeperClientSpeculativeReadTimeoutInMillis=0
@@ -438,12 +521,15 @@ bookkeeperClientReorderReadSequenceEnabled=false

 # Enable bookie isolation by specifying a list of bookie groups to choose from. Any bookie
 # outside the specified groups will not be used by the broker
+bookkeeperClientIsolationGroups=

 # Enable bookie secondary-isolation group if bookkeeperClientIsolationGroups doesn't
 # have enough bookie available.
+bookkeeperClientSecondaryIsolationGroups=

 # Minimum bookies that should be available as part of bookkeeperClientIsolationGroups
 # else broker will include bookkeeperClientSecondaryIsolationGroups bookies in isolated list.
+bookkeeperClientMinAvailableBookiesInIsolationGroups=

 # Set the client security provider factory class name.
 # Default: org.apache.bookkeeper.tls.TLSContextFactory
@@ -459,14 +545,19 @@ bookkeeperTLSKeyFileType=PEM
 bookkeeperTLSTrustCertTypes=PEM

 # Path to file containing keystore password, if the client keystore is password protected.
+bookkeeperTLSKeyStorePasswordPath=

 # Path to file containing truststore password, if the client truststore is password protected.
+bookkeeperTLSTrustStorePasswordPath=

 # Path for the TLS private key file
+bookkeeperTLSKeyFilePath=

 # Path for the TLS certificate file
+bookkeeperTLSCertificateFilePath=

 # Path for the trusted TLS certificate file
+bookkeeperTLSTrustCertsFilePath=

 # Enable/disable disk weight based placement. Default is false
 bookkeeperDiskWeightBasedPlacementEnabled=false
@@ -509,6 +600,7 @@ managedLedgerNumSchedulerThreads=4
 # Amount of memory to use for caching data payload in managed ledger. This memory
 # is allocated from JVM direct memory and it's shared across all the topics
 # running in the same broker. By default, uses 1/5th of available direct memory
+managedLedgerCacheSizeMB=

 # Whether we should make a copy of the entry payloads when inserting in cache
 managedLedgerCacheCopyEntries=false
@@ -532,7 +624,7 @@ managedLedgerDefaultMarkDeleteRateLimit=0.1
 # Max number of entries to append to a ledger before triggering a rollover
 # A ledger rollover is triggered on these conditions
 #  * Either the max rollover time has been reached
-#  * or max entries have been written to the ledged and at least min-time
+#  * or max entries have been written to the ledger and at least min-time
 #    has passed
 managedLedgerMaxEntriesPerLedger=50000
@@ -593,6 +685,15 @@ managedLedgerPrometheusStatsLatencyRolloverSeconds=60
 # Whether trace managed ledger task execution time
 managedLedgerTraceTaskExecution=true

+# If you want to custom bookie ID or use a dynamic network address for the bookie,
+# you can set this option.
+# Bookie advertises itself using bookieId rather than
+# BookieSocketAddress (hostname:port or IP:port).
+# bookieId is a non empty string that can contain ASCII digits and letters ([a-zA-Z9-0]),
+# colons, dashes, and dots.
+# For more information about bookieId, see http://bookkeeper.apache.org/bps/BP-41-bookieid/.
+# bookieId=
+
 ### --- Load balancer --- ###

 loadManagerClassName=org.apache.pulsar.broker.loadbalance.NoopLoadManager
@@ -644,37 +745,37 @@ loadBalancerNamespaceBundleMaxBandwidthMbytes=100
 loadBalancerNamespaceMaximumBundles=128

 # The broker resource usage threshold.
-# When the broker resource usage is gratter than the pulsar cluster average resource usge,
+# When the broker resource usage is greater than the pulsar cluster average resource usage,
 # the threshold shedder will be triggered to offload bundles from the broker.
-# It only take effect in ThresholdSheddler strategy.
+# It only takes effect in the ThresholdShedder strategy.
 loadBalancerBrokerThresholdShedderPercentage=10

 # When calculating new resource usage, the history usage accounts for.
-# It only take effect in ThresholdSheddler strategy.
+# It only takes effect in the ThresholdShedder strategy.
 loadBalancerHistoryResourcePercentage=0.9

-# The BandWithIn usage weight when calculating new resourde usage.
-# It only take effect in ThresholdShedder strategy.
+# The BandWithIn usage weight when calculating new resource usage.
+# It only takes effect in the ThresholdShedder strategy.
 loadBalancerBandwithInResourceWeight=1.0

-# The BandWithOut usage weight when calculating new resourde usage.
-# It only take effect in ThresholdShedder strategy.
+# The BandWithOut usage weight when calculating new resource usage.
+# It only takes effect in the ThresholdShedder strategy.
 loadBalancerBandwithOutResourceWeight=1.0

-# The CPU usage weight when calculating new resourde usage.
-# It only take effect in ThresholdShedder strategy.
+# The CPU usage weight when calculating new resource usage.
+# It only takes effect in the ThresholdShedder strategy.
 loadBalancerCPUResourceWeight=1.0

-# The heap memory usage weight when calculating new resourde usage.
-# It only take effect in ThresholdShedder strategy.
+# The heap memory usage weight when calculating new resource usage.
+# It only takes effect in the ThresholdShedder strategy.
 loadBalancerMemoryResourceWeight=1.0

-# The direct memory usage weight when calculating new resourde usage.
-# It only take effect in ThresholdShedder strategy.
+# The direct memory usage weight when calculating new resource usage.
+# It only takes effect in the ThresholdShedder strategy.
 loadBalancerDirectMemoryResourceWeight=1.0

-# Bundle unload minimum throughput threshold (MB), avoding bundle unload frequently.
-# It only take effect in ThresholdShedder strategy.
+# Bundle unload minimum throughput threshold (MB), avoiding bundle unload frequently.
+# It only takes effect in the ThresholdShedder strategy.
 loadBalancerBundleUnloadMinThroughputThreshold=10

 ### --- Replication --- ###
@@ -725,7 +826,13 @@ webSocketMaxTextFrameSize=1048576
 # Enable topic level metrics
 exposeTopicLevelMetricsInPrometheus=true

+# Time in milliseconds that metrics endpoint would time out. Default is 30s.
+# Increase it if there are a lot of topics to expose topic-level metrics.
+# Set it to 0 to disable timeout.
+metricsServletTimeoutMs=30000
+
 # Classname of Pluggable JVM GC metrics logger that can log GC specific metrics
+# jvmGCMetricsLoggerClassName=

 ### --- Broker Web Stats --- ###
@@ -740,6 +847,7 @@ exposePreciseBacklogInPrometheus=false

 ### --- Deprecated config variables --- ###

 # Deprecated. Use configurationStoreServers
+globalZookeeperServers=

 # Deprecated. Use brokerDeleteInactiveTopicsFrequencySeconds
 brokerServicePurgeInactiveFrequencyInSeconds=60
@@ -756,10 +864,12 @@ nettyMaxFrameSizeBytes=5253120
 # For good performance, it should be big enough to hold a substantial amount
 # of entries in the flush interval
 # By default it will be allocated to 1/4th of the available direct memory
+dbStorage_writeCacheMaxSizeMb=

 # Size of Read cache. Memory is allocated from JVM direct memory.
 # This read cache is pre-filled doing read-ahead whenever a cache miss happens
 # By default it will be allocated to 1/4th of the available direct memory
+dbStorage_readAheadCacheMaxSizeMb=

 # How many entries to pre-fill in cache after a read cache miss
 dbStorage_readAheadCacheBatchSize=1000
@@ -774,6 +884,7 @@ flushInterval=60000
 # should be big enough to hold a significant portion of the index
 # database which can reach ~2GB in some cases
 # Default is to use 10% of the direct memory size
+dbStorage_rocksDB_blockCacheSize=

 # Other RocksDB specific tunables
 dbStorage_rocksDB_writeBufferSizeMB=4
@@ -839,5 +950,31 @@ allowAutoSubscriptionCreation=true
 defaultNumPartitions=1

 ### --- Transaction config variables --- ###
-; transactionMetadataStoreProviderClassName=org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider
+# Enable transaction coordinator in broker
 transactionCoordinatorEnabled=true
+; transactionMetadataStoreProviderClassName=org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider
+transactionMetadataStoreProviderClassName=org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider
+
+# Transaction buffer take snapshot transaction count
+transactionBufferSnapshotMaxTransactionCount=1000
+
+# Transaction buffer take snapshot interval time
+# Unit : millisecond
+transactionBufferSnapshotMinTimeInMillis=5000
+
+### --- Packages management service configuration variables (begin) --- ###
+
+# Enable the packages management service or not
+enablePackagesManagement=false
+
+# The packages management service storage service provide
+packagesManagementStorageProvider=org.apache.pulsar.packages.management.storage.bookkeeper.BookKeeperPackagesStorageProvider
+
+# When the packages storage provider is bookkeeper, you can use this configuration to
+# control the number of replicas for storing the package
+packagesReplicas=1
+
+# The bookkeeper ledger root path
+packagesManagementLedgerRootPath=/ledgers
+
+### --- Packages management service configuration variables (end) --- ###
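With transactionCoordinatorEnabled=true in the config above, a client can exercise transactions end to end against the standalone. A hedged smoke-test sketch (service URL and topic are placeholders; assumes the 2.8 client and a broker running this config):

```java
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.Transaction;

public class TxnSmokeTest {
    public static void main(String[] args) throws Exception {
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650") // adjust to the test container
                .enableTransaction(true)
                .build();
             Producer<String> producer = client.newProducer(Schema.STRING)
                     .topic("txn-smoke")
                     .sendTimeout(0, TimeUnit.MILLISECONDS) // transactional producers disable the send timeout
                     .create()) {
            Transaction txn = client.newTransaction()
                    .withTransactionTimeout(1, TimeUnit.MINUTES)
                    .build()
                    .get();
            producer.newMessage(txn).value("hello").send();
            txn.commit().get();
        }
    }
}
```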