Skip to content
This repository has been archived by the owner on Dec 14, 2022. It is now read-only.

Commit

Permalink
Bump pulsar version to 2.8.0 (#359)
Browse files Browse the repository at this point in the history
* bump pulsar version to 2.8.0
  • Loading branch information
Jianyun Zhao authored Jun 21, 2021
1 parent a1a11a4 commit 2a57809
Show file tree
Hide file tree
Showing 14 changed files with 208 additions and 65 deletions.
9 changes: 8 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
<testRetryCount>3</testRetryCount>

<!-- use Pulsar stable version -->
<pulsar.version>2.7.2</pulsar.version>
<pulsar.version>2.8.0</pulsar.version>
<flink.version>1.12.3</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<lombok.version>1.18.18</lombok.version>
Expand All @@ -90,6 +90,7 @@
<mockito.version>3.7.7</mockito.version>
<hamcrest.version>1.3</hamcrest.version>
<junit.version>4.13.2</junit.version>
<jna.version>5.7.0</jna.version>

<!-- plugin dependencies -->
<maven.version>3.5.4</maven.version>
Expand All @@ -114,6 +115,12 @@
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
<version>${jna.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public TypeInformation<OUT> getProducedType() {
public SourceReader<OUT, PulsarPartitionSplit> createReader(SourceReaderContext readerContext) {
FutureCompletingBlockingQueue<RecordsWithSplitIds<ParsedMessage<OUT>>> elementsQueue =
new FutureCompletingBlockingQueue<>();
ExecutorProvider listenerExecutor = new ExecutorProvider(1, r -> new Thread(r, "Pulsar listener executor"));
ExecutorProvider listenerExecutor = new ExecutorProvider(1, "Pulsar listener executor");
Closer splitCloser = Closer.create();
splitCloser.register(listenerExecutor::shutdownNow);
Supplier<SplitReader<ParsedMessage<OUT>, PulsarPartitionSplit>> splitReaderSupplier = () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.FunctionWithException;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.internal.PulsarAdminImpl;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -52,7 +52,7 @@ public static <T, R, C extends CompletableFuture<R>, E extends Exception> void p

for (int index = 0; index < asyncFutures.size(); index++) {
try {
R result = asyncFutures.get(index).get(PulsarAdmin.DEFAULT_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS);
R result = asyncFutures.get(index).get(PulsarAdminImpl.DEFAULT_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS);
consumer.accept(elements.get(index), result);
} catch (ExecutionException e) {
E cause = exceptionClass.cast(e.getCause());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ protected void recoverAndCommit(PulsarTransactionState<T> transaction) {
log.debug("transaction {} is recoverAndCommit...", transaction.transactionalId);
TransactionCoordinatorClientImpl tcClient = CachedPulsarClient.getOrCreate(clientConfigurationData).getTcClient();
TxnID transactionalId = transaction.transactionalId;
tcClient.commit(transactionalId, transaction.pendingMessages);
tcClient.commit(transactionalId);
} catch (PulsarClientException executionException) {
log.error("Failed to getOrCreate a PulsarClient");
throw new RuntimeException(executionException);
Expand All @@ -522,7 +522,7 @@ protected void recoverAndAbort(PulsarTransactionState<T> transaction) {
log.debug("transaction {} is recoverAndAbort...", transaction.transactionalId);
TransactionCoordinatorClientImpl tcClient = CachedPulsarClient.getOrCreate(clientConfigurationData).getTcClient();
TxnID transactionalId = transaction.transactionalId;
tcClient.abort(transactionalId, transaction.pendingMessages);
tcClient.abort(transactionalId);
} catch (PulsarClientException executionException) {
log.error("Failed to getOrCreate a PulsarClient");
throw new RuntimeException(executionException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -114,7 +115,7 @@ public void putSchema(ObjectPath tablePath, CatalogBaseTable table, String forma

// Writing schemaInfo#properties causes the client to fail to consume it when it is a Pulsar native type.
if (!StringUtils.equals(format, AtomicRowDataFormatFactory.IDENTIFIER)) {
schemaInfo.setProperties(extractedProperties(table));
((SchemaInfoImpl) schemaInfo).setProperties(extractedProperties(table));
}
pulsarMetadataReader.putSchema(topicName, schemaInfo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,9 +290,9 @@ public MessageId getPositionFromSubscription(TopicRange topic, MessageId default
try {
String subscriptionName = subscriptionNameFrom(topic);
TopicStats topicStats = admin.topics().getStats(topic.getTopic());
if (topicStats.subscriptions.containsKey(subscriptionName)) {
SubscriptionStats subStats = topicStats.subscriptions.get(subscriptionName);
if (subStats.consumers.size() != 0) {
if (topicStats.getSubscriptions().containsKey(subscriptionName)) {
SubscriptionStats subStats = topicStats.getSubscriptions().get(subscriptionName);
if (subStats.getConsumers().size() != 0) {
throw new RuntimeException("Subscription been actively used by other consumers, " +
"in this situation, the exactly-once semantics cannot be guaranteed.");
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
Expand Down Expand Up @@ -123,15 +124,15 @@ public static boolean compatibleSchema(SchemaInfo s1, SchemaInfo s2) {

static GenericSchema<GenericRecord> avroSchema2PulsarSchema(Schema avroSchema) {
byte[] schemaBytes = avroSchema.toString().getBytes(StandardCharsets.UTF_8);
SchemaInfo si = new SchemaInfo();
SchemaInfoImpl si = new SchemaInfoImpl();
si.setName("Avro");
si.setSchema(schemaBytes);
si.setType(SchemaType.AVRO);
return org.apache.pulsar.client.api.Schema.generic(si);
}

public static SchemaInfo emptySchemaInfo() {
return SchemaInfo.builder()
public static SchemaInfoImpl emptySchemaInfo() {
return SchemaInfoImpl.builder()
.name("empty")
.type(SchemaType.NONE)
.schema(new byte[0])
Expand Down Expand Up @@ -160,7 +161,7 @@ public static SchemaInfo buildRowSchema(DataType dataType,
RecordSchemaType recordSchemaType) {
org.apache.avro.Schema avroSchema = AvroSchemaConverter.convertToSchema(dataType.getLogicalType());
byte[] schemaBytes = avroSchema.toString().getBytes(StandardCharsets.UTF_8);
SchemaInfo si = new SchemaInfo();
SchemaInfoImpl si = new SchemaInfoImpl();
si.setSchema(schemaBytes);
switch (recordSchemaType) {
case AVRO:
Expand Down Expand Up @@ -247,9 +248,9 @@ private static <T extends GeneratedMessageV3> SchemaInfo getProtobufSchemaInfo(S
}
}

public static SchemaInfo getSchemaInfo(SchemaType type, DataType dataType) {
public static SchemaInfoImpl getSchemaInfo(SchemaType type, DataType dataType) {
byte[] schemaBytes = getAvroSchema(dataType).toString().getBytes(StandardCharsets.UTF_8);
return SchemaInfo.builder()
return SchemaInfoImpl.builder()
.name("Record")
.schema(schemaBytes)
.type(type)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.protobuf.Descriptors;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.client.impl.schema.generic.GenericProtobufNativeSchema;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
Expand Down Expand Up @@ -98,7 +99,7 @@ public static org.apache.pulsar.client.api.Schema sqlType2PulsarSchema(DataType

static GenericSchema<GenericRecord> avroSchema2PulsarSchema(Schema avroSchema) {
byte[] schemaBytes = avroSchema.toString().getBytes(StandardCharsets.UTF_8);
SchemaInfo si = new SchemaInfo();
SchemaInfoImpl si = new SchemaInfoImpl();
si.setName("Avro");
si.setSchema(schemaBytes);
si.setType(SchemaType.AVRO);
Expand Down Expand Up @@ -215,7 +216,7 @@ private static Schema sqlType2AvroSchema(DataType flinkType, boolean nullable,
}

public static SchemaInfo emptySchemaInfo() {
return SchemaInfo.builder()
return SchemaInfoImpl.builder()
.name("empty")
.type(SchemaType.NONE)
.schema(new byte[0])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,11 @@ public void testDatabases() throws Exception {

try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(getAdminUrl()).build()) {
admin.tenants().createTenant("tn1",
new TenantInfo(Sets.newHashSet(), Sets.newHashSet("standalone")));
TenantInfo.builder()
.adminRoles(Sets.newHashSet())
.allowedClusters(Sets.newHashSet("standalone"))
.build()
);
for (String ns : namespaces) {
admin.namespaces().createNamespace(ns);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class PulsarAuthTest {
@BeforeClass
public static void prepare() throws Exception {
log.info(" Starting PulsarTestBase ");
final String pulsarImage = System.getProperty("pulsar.systemtest.image", "apachepulsar/pulsar:2.7.0");
final String pulsarImage = System.getProperty("pulsar.systemtest.image", "apachepulsar/pulsar:2.8.0");
DockerImageName pulsar = DockerImageName.parse(pulsarImage)
.asCompatibleSubstituteFor("apachepulsar/pulsar");
pulsarService = new PulsarContainer(pulsar);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public static void prepare() throws Exception {
if (StringUtils.isNotBlank(adminUrl) && StringUtils.isNotBlank(serviceUrl)) {
log.info(" Use extend Pulsar Service ");
} else {
final String pulsarImage = System.getProperty("pulsar.systemtest.image", "apachepulsar/pulsar:2.7.0");
final String pulsarImage = System.getProperty("pulsar.systemtest.image", "apachepulsar/pulsar:2.8.0");
DockerImageName pulsar = DockerImageName.parse(pulsarImage)
.asCompatibleSubstituteFor("apachepulsar/pulsar");
pulsarService = new PulsarContainer(pulsar);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,21 @@
import org.apache.flink.streaming.connectors.pulsar.table.PulsarSinkSemantic;
import org.apache.flink.streaming.connectors.pulsar.testutils.FailingIdentityMapper;
import org.apache.flink.streaming.connectors.pulsar.testutils.IntegerSource;
import org.apache.flink.streaming.connectors.pulsar.testutils.TestUtils;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.streaming.util.serialization.PulsarSerializationSchemaWrapper;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.test.util.TestUtils;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.compress.utils.Sets;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.PulsarContainer;
Expand Down Expand Up @@ -77,7 +72,7 @@ public class PulsarTransactionalSinkTest {
public static void prepare() throws Exception {
log.info(" Starting PulsarTestBase ");

final String pulsarImage = System.getProperty("pulsar.systemtest.image", "apachepulsar/pulsar:2.7.0");
final String pulsarImage = System.getProperty("pulsar.systemtest.image", "apachepulsar/pulsar:2.8.0");
DockerImageName pulsar = DockerImageName.parse(pulsarImage)
.asCompatibleSubstituteFor("apachepulsar/pulsar");
pulsarService = new PulsarContainer(pulsar);
Expand Down Expand Up @@ -117,14 +112,7 @@ public static void shutDownServices() throws Exception {
* Tests the exactly-once semantic for the simple writes into Pulsar.
*/
@Test
@Ignore("Pulsar 2.7.1 does not support the use of standalone transactions, requires Pulsar 2.8 version")
public void testExactlyOnceRegularSink() throws Exception {
admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build();
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
new TenantInfo(Sets.newHashSet("app1"), Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 16);

testExactlyOnce(1);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,13 @@
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.common.schema.SchemaType;
import org.junit.Test;

Expand Down Expand Up @@ -137,7 +138,7 @@ public void testAvroSerializeDeserialize() throws Exception {
final CompletableFuture<byte[]> consumer = autoConsumer(topicName
);

RowData newRowData = deserializationSchema.deserialize(consumer.get(2000, TimeUnit.MILLISECONDS));
RowData newRowData = deserializationSchema.deserialize(consumer.get(10000, TimeUnit.MILLISECONDS));
assertEquals(rowData, newRowData);
}

Expand Down Expand Up @@ -167,7 +168,7 @@ public void testJsonSerializeDeserialize() throws Exception {
rowData);
final CompletableFuture<byte[]> consumer = autoConsumer(topicName);

RowData newRowData = deserializationSchema.deserialize(consumer.get(2000, TimeUnit.MILLISECONDS));
RowData newRowData = deserializationSchema.deserialize(consumer.get(10000, TimeUnit.MILLISECONDS));
assertEquals(rowData, newRowData);
}

Expand Down Expand Up @@ -206,7 +207,7 @@ public void testProtoBufSerializeDeserialize() throws Exception {
sendMessage(topicName, rowDataFlinkSchema, rowData);
final CompletableFuture<byte[]> consumer = autoConsumer(topicName);

RowData newRowData = deserializationSchema.deserialize(consumer.get(2000, TimeUnit.MILLISECONDS));
RowData newRowData = deserializationSchema.deserialize(consumer.get(10000, TimeUnit.MILLISECONDS));
newRowData = validatePbRow(
newRowData, PbRowTypeInformation.generateRowType(SimpleTest.getDescriptor()));
assertEquals(9, newRowData.getArity());
Expand Down Expand Up @@ -292,7 +293,7 @@ public <T> org.apache.pulsar.client.api.Schema<T> toPulsarSchema(SchemaType sche
SerializationSchema<T> serializationSchema,
DeserializationSchema<T> deserializationSchema) {
byte[] schemaBytes = avroSchema.toString().getBytes(StandardCharsets.UTF_8);
SchemaInfo si = new SchemaInfo();
SchemaInfoImpl si = new SchemaInfoImpl();
si.setName("Record");
si.setSchema(schemaBytes);
si.setType(schemaType);
Expand All @@ -301,11 +302,14 @@ public <T> org.apache.pulsar.client.api.Schema<T> toPulsarSchema(SchemaType sche

public void sendMessage(String topic, org.apache.pulsar.client.api.Schema<RowData> schema, RowData data)
throws Exception {
try (
PulsarClient pulsarClient = PulsarClient.builder()
try (PulsarAdmin admin = getPulsarAdmin()){
admin.schemas().createSchema(topic, schema.getSchemaInfo());
}
try (
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(serviceUrl)
.build();
final Producer<RowData> producer = pulsarClient.newProducer(schema)
final Producer<RowData> producer = pulsarClient.newProducer(schema)
.topic(topic)
.create()) {
pulsarClient
Expand Down
Loading

0 comments on commit 2a57809

Please sign in to comment.