diff --git a/pom.xml b/pom.xml
index 2ccf7289..055e2b5b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -33,7 +33,7 @@
<groupId>io.streamnative.connectors</groupId>
<artifactId>pulsar-flink-parent</artifactId>
- <version>2.7.7-SNAPSHOT</version>
+ <version>1.13.1.0</version>
<name>StreamNative :: Pulsar Flink Connector :: Root</name>
<url>https://pulsar.apache.org</url>
@@ -74,7 +74,7 @@
<pulsar.version>2.8.0</pulsar.version>
- <flink.version>1.12.3</flink.version>
+ <flink.version>1.13.1</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<lombok.version>1.18.18</lombok.version>
0.16
diff --git a/pulsar-flink-connector-shade-2.11/pom.xml b/pulsar-flink-connector-shade-2.11/pom.xml
index 6ccea88f..d4fce5e9 100644
--- a/pulsar-flink-connector-shade-2.11/pom.xml
+++ b/pulsar-flink-connector-shade-2.11/pom.xml
@@ -21,7 +21,7 @@
<groupId>io.streamnative.connectors</groupId>
<artifactId>pulsar-flink-parent</artifactId>
- <version>2.7.7-SNAPSHOT</version>
+ <version>1.13.1.0</version>
<artifactId>pulsar-flink-connector_2.11</artifactId>
diff --git a/pulsar-flink-connector-shade-2.12/pom.xml b/pulsar-flink-connector-shade-2.12/pom.xml
index 7db9f17c..c67ec0d4 100644
--- a/pulsar-flink-connector-shade-2.12/pom.xml
+++ b/pulsar-flink-connector-shade-2.12/pom.xml
@@ -21,7 +21,7 @@
<groupId>io.streamnative.connectors</groupId>
<artifactId>pulsar-flink-parent</artifactId>
- <version>2.7.7-SNAPSHOT</version>
+ <version>1.13.1.0</version>
<artifactId>pulsar-flink-connector_2.12</artifactId>
diff --git a/pulsar-flink-connector/pom.xml b/pulsar-flink-connector/pom.xml
index d065a9a5..e147b6b1 100644
--- a/pulsar-flink-connector/pom.xml
+++ b/pulsar-flink-connector/pom.xml
@@ -21,7 +21,7 @@
<groupId>io.streamnative.connectors</groupId>
<artifactId>pulsar-flink-parent</artifactId>
- <version>2.7.7-SNAPSHOT</version>
+ <version>1.13.1.0</version>
<artifactId>pulsar-flink-connector-origin</artifactId>
diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java b/pulsar-flink-connector/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
index c3e483c4..9fe38e7f 100644
--- a/pulsar-flink-connector/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
+++ b/pulsar-flink-connector/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
@@ -142,7 +142,7 @@ public void addReader(int subtaskId) {
}
@Override
- public PulsarSourceEnumeratorState snapshotState() throws Exception {
+ public PulsarSourceEnumeratorState snapshotState(long checkpointId) throws Exception {
return new PulsarSourceEnumeratorState(readerIdToSplitAssignments);
}
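Note for reviewers: Flink 1.13 changed the FLIP-27 SplitEnumerator contract so that the checkpoint id is passed into the snapshot callback; the override above has to accept it even though, as here, the snapshot does not depend on it. The interface method, for reference:

    // org.apache.flink.api.connector.source.SplitEnumerator (Flink 1.13)
    CheckpointT snapshotState(long checkpointId) throws Exception;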
diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarCatalogSupport.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarCatalogSupport.java
index 3bdee765..9d7c9dc0 100644
--- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarCatalogSupport.java
+++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarCatalogSupport.java
@@ -36,12 +36,13 @@
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import static org.apache.flink.table.catalog.config.CatalogConfig.FLINK_PROPERTY_PREFIX;
+import static org.apache.flink.table.catalog.CatalogPropertiesUtil.FLINK_PROPERTY_PREFIX;
/**
* catalog support.
@@ -87,7 +88,7 @@ public List<String> getTopics(String databaseName) throws PulsarAdminException {
return pulsarMetadataReader.getTopics(databaseName);
}
- public CatalogTableImpl getTableSchema(ObjectPath tablePath,
+ public CatalogTable getTableSchema(ObjectPath tablePath,
Map<String, String> properties)
throws PulsarAdminException, IncompatibleSchemaException {
String topicName = objectPath2TopicName(tablePath);
@@ -163,7 +164,7 @@ private SchemaInfo tableSchemaToPulsarSchema(String format, TableSchema schema,
return SchemaUtils.tableSchemaToSchemaInfo(format, physicalRowDataType, options);
}
- private CatalogTableImpl schemaToCatalogTable(SchemaInfo pulsarSchema,
+ private CatalogTable schemaToCatalogTable(SchemaInfo pulsarSchema,
ObjectPath tablePath,
Map<String, String> flinkProperties)
throws IncompatibleSchemaException {
@@ -182,10 +183,20 @@ private CatalogTableImpl schemaToCatalogTable(SchemaInfo pulsarSchema,
properties.putAll(flinkProperties);
properties.remove(IS_CATALOG_TOPIC);
String comment = properties.remove(PulsarCatalogSupport.COMMENT);
- return new CatalogTableImpl(tableSchema, partitionKeys, properties, comment);
+ return CatalogTable.of(
+ tableSchema.toSchema(),
+ comment,
+ partitionKeys,
+ properties
+ );
} else {
final TableSchema tableSchema = schemaTranslator.pulsarSchemaToTableSchema(pulsarSchema);
- return new CatalogTableImpl(tableSchema, flinkProperties, "");
+ return CatalogTable.of(
+ tableSchema.toSchema(),
+ "",
+ Collections.emptyList(),
+ flinkProperties
+ );
}
}
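Background on this change: the CatalogTableImpl constructors are deprecated in Flink 1.13, and CatalogTable.of(...) is the replacement; it takes the new unresolved Schema type, hence the tableSchema.toSchema() bridge above. Its shape, for reference:

    // org.apache.flink.table.catalog.CatalogTable (Flink 1.13)
    static CatalogTable of(
            Schema schema,               // from TableSchema#toSchema()
            @Nullable String comment,
            List<String> partitionKeys,
            Map<String, String> options);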
diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/util/DataTypeUtils.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/util/DataTypeUtils.java
index 3e87dc8a..6d4a48e6 100644
--- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/util/DataTypeUtils.java
+++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/util/DataTypeUtils.java
@@ -14,7 +14,6 @@
package org.apache.flink.streaming.connectors.pulsar.util;
-import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.AtomicDataType;
import org.apache.flink.table.types.DataType;
@@ -33,8 +32,4 @@ public static Optional<Class<?>> extractType(DataType dataType) {
}
return Optional.empty();
}
-
- public static DataType toDataType(Class<?> clazz) {
- return DataTypes.RAW(TypeInformationUtils.getTypesAsRow(clazz));
- }
}
diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/util/serialization/PulsarDeserializationSchema.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/util/serialization/PulsarDeserializationSchema.java
index dbcf43c4..3bb049b0 100644
--- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/util/serialization/PulsarDeserializationSchema.java
+++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/util/serialization/PulsarDeserializationSchema.java
@@ -55,7 +55,7 @@ public Schema<V> getSchema() {
@Override
public V deserialize(Message<V> message) throws IOException {
- return valueDeserializer.deserialize(message.getData());
+ return message.getValue();
}
@Override
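Rationale for the deserialize change above: Message#getValue() lets the Pulsar client decode the payload with the schema the consumer was created with, instead of re-running the wrapped Flink valueDeserializer over the raw bytes. A minimal sketch of the two paths, using the names from the class above:

    V viaPulsarSchema = message.getValue();                            // schema-aware decode
    V viaRawBytes = valueDeserializer.deserialize(message.getData());  // old path: raw bytes only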
diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/util/serialization/PulsarDeserializationSchemaBuilder.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/util/serialization/PulsarDeserializationSchemaBuilder.java
index 46087af2..4bc6eed5 100644
--- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/util/serialization/PulsarDeserializationSchemaBuilder.java
+++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/util/serialization/PulsarDeserializationSchemaBuilder.java
@@ -15,8 +15,6 @@
package org.apache.flink.streaming.util.serialization;
import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.connectors.pulsar.util.TypeInformationUtils;
-import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.pulsar.client.api.Schema;
@@ -60,9 +58,6 @@ public PulsarDeserializationSchemaBuilder<V> setRecordClass(Class<V> recordClass
}
public PulsarDeserializationSchema<V> build() {
- if (dataType == null) {
- dataType = DataTypes.RAW(TypeInformationUtils.getTypesAsRow(recordClass)).bridgedTo(recordClass);
- }
- return new PulsarDeserializationSchemaWrapper<>(valueDeserializer, dataType);
+ throw new UnsupportedOperationException("PulsarDeserializationSchemaBuilder is deprecated, use PulsarDeserializationSchema#valueOnly.");
}
}
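Callers of the now-unsupported builder should switch to the valueOnly factory named in the exception message. A hedged usage sketch, assuming valueOnly(DeserializationSchema) keeps its current shape:

    PulsarDeserializationSchema<String> schema =
            PulsarDeserializationSchema.valueOnly(new SimpleStringSchema());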
diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/PulsarCatalog.java b/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/PulsarCatalog.java
index aac71799..20ad3472 100644
--- a/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/PulsarCatalog.java
+++ b/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/PulsarCatalog.java
@@ -68,6 +68,8 @@ public class PulsarCatalog extends GenericInMemoryCatalog {
private PulsarCatalogSupport catalogSupport;
+ public static final String DEFAULT_DB = "public/default";
+
public PulsarCatalog(String adminUrl, String catalogName, Map<String, String> props, String defaultDatabase) {
super(catalogName, defaultDatabase);
this.adminUrl = adminUrl;
diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/PulsarCatalogDescriptor.java b/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/PulsarCatalogDescriptor.java
deleted file mode 100644
index 952e1d0a..00000000
--- a/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/PulsarCatalogDescriptor.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog.pulsar;
-
-import org.apache.flink.table.descriptors.CatalogDescriptor;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.StringUtils;
-
-import java.util.Map;
-
-import static org.apache.flink.table.catalog.pulsar.PulsarCatalogValidator.CATALOG_PULSAR_VERSION;
-import static org.apache.flink.table.catalog.pulsar.PulsarCatalogValidator.CATALOG_TYPE_VALUE_PULSAR;
-
-/**
- * Pulsar {@CatalogDescriptor}.
- */
-public class PulsarCatalogDescriptor extends CatalogDescriptor {
-
- private String pulsarVersion;
-
- public PulsarCatalogDescriptor() {
- super(CATALOG_TYPE_VALUE_PULSAR, 1, "public/default");
- }
-
- public PulsarCatalogDescriptor pulsarVersion(String pulsarVersion) {
- Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(pulsarVersion));
- this.pulsarVersion = pulsarVersion;
-
- return this;
- }
-
- @Override
- protected Map<String, String> toCatalogProperties() {
- DescriptorProperties props = new DescriptorProperties();
-
- if (pulsarVersion != null) {
- props.putString(CATALOG_PULSAR_VERSION, pulsarVersion);
- }
-
- return props.asMap();
- }
-}
diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/PulsarCatalogValidator.java b/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/PulsarCatalogValidator.java
deleted file mode 100644
index 577cb3a6..00000000
--- a/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/PulsarCatalogValidator.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog.pulsar;
-
-import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions;
-import org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.descriptors.CatalogDescriptorValidator;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.descriptors.FormatDescriptorValidator;
-
-/**
- * Pulsar {@CatalogDescriptorValidator}.
- */
-public class PulsarCatalogValidator extends CatalogDescriptorValidator {
-
- public static final String CATALOG_TYPE_VALUE_PULSAR = "pulsar";
- public static final String CATALOG_PULSAR_VERSION = "pulsar-version";
- public static final String CATALOG_SERVICE_URL =
- PulsarTableOptions.SERVICE_URL.key();
- public static final String CATALOG_ADMIN_URL =
- PulsarTableOptions.ADMIN_URL.key();
- public static final String CATALOG_STARTUP_MODE =
- PulsarTableOptions.SCAN_STARTUP_MODE.key();
- public static final String CATALOG_DEFAULT_PARTITIONS = PulsarOptions.DEFAULT_PARTITIONS;
-
- @Override
- public void validate(DescriptorProperties properties) {
- super.validate(properties);
- properties.validateValue(CATALOG_TYPE, CATALOG_TYPE_VALUE_PULSAR, false);
- properties.validateString(CATALOG_PULSAR_VERSION, true, 1);
- properties.validateString(CATALOG_SERVICE_URL, false, 1);
- properties.validateString(CATALOG_ADMIN_URL, false, 1);
- properties.validateInt(CATALOG_DEFAULT_PARTITIONS, true, 1);
- properties.validateString(FormatDescriptorValidator.FORMAT, false);
- validateStartingOffsets(properties);
- }
-
- private void validateStartingOffsets(DescriptorProperties properties) {
- if (properties.containsKey(CATALOG_STARTUP_MODE)) {
- String v = properties.getString(CATALOG_STARTUP_MODE);
- if (!v.equals("earliest") && !v.equals("latest")) {
- throw new ValidationException(CATALOG_STARTUP_MODE + " should be either earliest or latest");
- }
- }
- }
-}
diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/descriptors/PulsarCatalogDescriptor.java b/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/descriptors/PulsarCatalogDescriptor.java
deleted file mode 100644
index d133c228..00000000
--- a/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/descriptors/PulsarCatalogDescriptor.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog.pulsar.descriptors;
-
-import org.apache.flink.table.descriptors.CatalogDescriptor;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.StringUtils;
-
-import java.util.Map;
-
-import static org.apache.flink.table.catalog.pulsar.descriptors.PulsarCatalogValidator.CATALOG_PULSAR_VERSION;
-import static org.apache.flink.table.catalog.pulsar.descriptors.PulsarCatalogValidator.CATALOG_TYPE_VALUE_PULSAR;
-
-/**
- * Pulsar {@CatalogDescriptor}.
- */
-public class PulsarCatalogDescriptor extends CatalogDescriptor {
-
- private String pulsarVersion;
-
- public PulsarCatalogDescriptor() {
- super(CATALOG_TYPE_VALUE_PULSAR, 1, "public/default");
- }
-
- public PulsarCatalogDescriptor pulsarVersion(String pulsarVersion) {
- Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(pulsarVersion));
- this.pulsarVersion = pulsarVersion;
-
- return this;
- }
-
- @Override
- protected Map<String, String> toCatalogProperties() {
- DescriptorProperties props = new DescriptorProperties();
-
- if (pulsarVersion != null) {
- props.putString(CATALOG_PULSAR_VERSION, pulsarVersion);
- }
-
- return props.asMap();
- }
-}
diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/descriptors/PulsarCatalogValidator.java b/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/descriptors/PulsarCatalogValidator.java
deleted file mode 100644
index b1686036..00000000
--- a/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/descriptors/PulsarCatalogValidator.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog.pulsar.descriptors;
-
-import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.descriptors.CatalogDescriptorValidator;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-
-/**
- * Pulsar {@CatalogDescriptorValidator}.
- */
-public class PulsarCatalogValidator extends CatalogDescriptorValidator {
-
- public static final String CATALOG_TYPE_VALUE_PULSAR = "pulsar";
- public static final String CATALOG_PULSAR_VERSION = "pulsar-version";
- public static final String CATALOG_SERVICE_URL = PulsarOptions.SERVICE_URL_OPTION_KEY;
- public static final String CATALOG_ADMIN_URL = PulsarOptions.ADMIN_URL_OPTION_KEY;
- public static final String CATALOG_STARTUP_MODE = PulsarOptions.STARTUP_MODE_OPTION_KEY;
- public static final String CATALOG_DEFAULT_PARTITIONS = PulsarOptions.DEFAULT_PARTITIONS;
-
- @Override
- public void validate(DescriptorProperties properties) {
- super.validate(properties);
- properties.validateValue(CATALOG_TYPE, CATALOG_TYPE_VALUE_PULSAR, false);
- properties.validateString(CATALOG_PULSAR_VERSION, true, 1);
- properties.validateString(CATALOG_SERVICE_URL, false, 1);
- properties.validateString(CATALOG_ADMIN_URL, false, 1);
- properties.validateInt(CATALOG_DEFAULT_PARTITIONS, true, 1);
- validateStartingOffsets(properties);
- }
-
- private void validateStartingOffsets(DescriptorProperties properties) {
- if (properties.containsKey(CATALOG_STARTUP_MODE)) {
- String v = properties.getString(CATALOG_STARTUP_MODE);
- if (!v.equals("earliest") && !v.equals("latest")) {
- throw new ValidationException(CATALOG_STARTUP_MODE + " should be either earliest or latest");
- }
- }
- }
-}
diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/factories/PulsarCatalogFactory.java b/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/factories/PulsarCatalogFactory.java
index 00e5fc8b..83e4500a 100644
--- a/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/factories/PulsarCatalogFactory.java
+++ b/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/factories/PulsarCatalogFactory.java
@@ -14,29 +14,30 @@
package org.apache.flink.table.catalog.pulsar.factories;
-import org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions;
+import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.pulsar.PulsarCatalog;
-import org.apache.flink.table.catalog.pulsar.PulsarCatalogValidator;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.descriptors.FormatDescriptorValidator;
import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.factories.FactoryUtil;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
-import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.PROPERTIES_PREFIX;
-import static org.apache.flink.table.catalog.pulsar.PulsarCatalogValidator.CATALOG_ADMIN_URL;
-import static org.apache.flink.table.catalog.pulsar.PulsarCatalogValidator.CATALOG_DEFAULT_PARTITIONS;
-import static org.apache.flink.table.catalog.pulsar.PulsarCatalogValidator.CATALOG_PULSAR_VERSION;
-import static org.apache.flink.table.catalog.pulsar.PulsarCatalogValidator.CATALOG_SERVICE_URL;
-import static org.apache.flink.table.catalog.pulsar.PulsarCatalogValidator.CATALOG_STARTUP_MODE;
-import static org.apache.flink.table.catalog.pulsar.PulsarCatalogValidator.CATALOG_TYPE_VALUE_PULSAR;
-import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE;
-import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION;
-import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE;
+import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.ADMIN_URL;
+import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.KEY_FIELDS;
+import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.KEY_FIELDS_PREFIX;
+import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.KEY_FORMAT;
+import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.PROPERTIES;
+import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SCAN_STARTUP_MODE;
+import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SERVICE_URL;
+import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SINK_SEMANTIC;
+import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.VALUE_FIELDS_INCLUDE;
+import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.VALUE_FORMAT;
+import static org.apache.flink.table.catalog.pulsar.factories.PulsarCatalogFactoryOptions.DEFAULT_DATABASE;
+import static org.apache.flink.table.catalog.pulsar.factories.PulsarCatalogFactoryOptions.DEFAULT_PARTITIONS;
+import static org.apache.flink.table.catalog.pulsar.factories.PulsarCatalogFactoryOptions.IDENTIFIER;
+import static org.apache.flink.table.catalog.pulsar.factories.PulsarCatalogFactoryOptions.PULSAR_VERSION;
+import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
/**
* Pulsar {@CatalogFactory}.
@@ -44,46 +45,46 @@
public class PulsarCatalogFactory implements CatalogFactory {
@Override
- public Catalog createCatalog(String name, Map<String, String> properties) {
- DescriptorProperties dp = getValidateProperties(properties);
- String defaultDB = dp.getOptionalString(CATALOG_DEFAULT_DATABASE).orElse("public/default");
- String adminUrl = dp.getString(CATALOG_ADMIN_URL);
- return new PulsarCatalog(adminUrl, name, dp.asMap(), defaultDB);
+ public String factoryIdentifier() {
+ return IDENTIFIER;
}
@Override
- public Map<String, String> requiredContext() {
- HashMap<String, String> context = new HashMap<>();
- context.put(CATALOG_TYPE, CATALOG_TYPE_VALUE_PULSAR);
- context.put(CATALOG_PROPERTY_VERSION, "1");
- return context;
+ public Catalog createCatalog(Context context) {
+ final FactoryUtil.CatalogFactoryHelper helper =
+ FactoryUtil.createCatalogFactoryHelper(this, context);
+ helper.validate();
+ return new PulsarCatalog(
+ helper.getOptions().get(ADMIN_URL),
+ context.getName(),
+ context.getOptions(),
+ helper.getOptions().get(DEFAULT_DATABASE));
}
@Override
- public List<String> supportedProperties() {
- List<String> props = new ArrayList<String>();
- props.add(CATALOG_DEFAULT_DATABASE);
- props.add(CATALOG_PULSAR_VERSION);
- props.add(CATALOG_SERVICE_URL);
- props.add(CATALOG_ADMIN_URL);
- props.add(CATALOG_STARTUP_MODE);
- props.add(CATALOG_DEFAULT_PARTITIONS);
- props.add(PulsarTableOptions.KEY_FORMAT.key());
- props.add(PulsarTableOptions.KEY_FIELDS.key());
- props.add(PulsarTableOptions.KEY_FIELDS_PREFIX.key());
- props.add(PulsarTableOptions.VALUE_FORMAT.key());
- props.add(PulsarTableOptions.VALUE_FIELDS_INCLUDE.key());
- props.add(PulsarTableOptions.SINK_SEMANTIC.key());
- props.add(FormatDescriptorValidator.FORMAT);
- props.add(FormatDescriptorValidator.FORMAT + ".*");
- props.add(PROPERTIES_PREFIX + "*");
- return props;
+ public Set<ConfigOption<?>> requiredOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(ADMIN_URL);
+ options.add(SERVICE_URL);
+ return options;
}
- private DescriptorProperties getValidateProperties(Map<String, String> properties) {
- DescriptorProperties dp = new DescriptorProperties();
- dp.putProperties(properties);
- new PulsarCatalogValidator().validate(dp);
- return dp;
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ Set<ConfigOption<?>> props = new HashSet<>();
+ props.add(DEFAULT_DATABASE);
+ props.add(PROPERTY_VERSION);
+ props.add(SCAN_STARTUP_MODE);
+ props.add(DEFAULT_PARTITIONS);
+ props.add(KEY_FORMAT);
+ props.add(KEY_FIELDS);
+ props.add(KEY_FIELDS_PREFIX);
+ props.add(VALUE_FORMAT);
+ props.add(VALUE_FIELDS_INCLUDE);
+ props.add(SINK_SEMANTIC);
+ props.add(PULSAR_VERSION);
+ props.add(FactoryUtil.FORMAT);
+ props.add(PROPERTIES);
+ return props;
}
}
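What this rewrite does: the factory moves from the old TableFactory-style contract (requiredContext/supportedProperties over flat string maps) to the Flink 1.13 CatalogFactory interface, which validates typed ConfigOptions through FactoryUtil. The new contract, for reference:

    // org.apache.flink.table.factories.CatalogFactory (Flink 1.13)
    String factoryIdentifier();
    Catalog createCatalog(Context context);
    Set<ConfigOption<?>> requiredOptions();
    Set<ConfigOption<?>> optionalOptions();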
diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/factories/PulsarCatalogFactoryOptions.java b/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/factories/PulsarCatalogFactoryOptions.java
new file mode 100644
index 00000000..ad5d1aa3
--- /dev/null
+++ b/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/factories/PulsarCatalogFactoryOptions.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.pulsar.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.catalog.pulsar.PulsarCatalog;
+
+import org.apache.pulsar.PulsarVersion;
+
+/**
+ * {@link ConfigOption}s for {@link PulsarCatalog}.
+ */
+@Internal
+public final class PulsarCatalogFactoryOptions {
+
+ public static final String IDENTIFIER = "pulsar";
+
+ public static final ConfigOption<String> DEFAULT_DATABASE =
+ ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY)
+ .stringType()
+ .defaultValue(PulsarCatalog.DEFAULT_DB);
+
+ public static final ConfigOption<Integer> DEFAULT_PARTITIONS =
+ ConfigOptions.key("table-default-partitions")
+ .intType()
+ .defaultValue(5);
+
+ public static final ConfigOption<String> PULSAR_VERSION =
+ ConfigOptions.key("pulsar-version")
+ .stringType()
+ .defaultValue(PulsarVersion.getVersion());
+
+ private PulsarCatalogFactoryOptions() {
+ }
+}
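With these options registered, the catalog is created through SQL. A hedged usage sketch; the 'service-url' and 'admin-url' keys assume the key names behind PulsarTableOptions.SERVICE_URL and ADMIN_URL, and the endpoints are placeholders:

    TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inStreamingMode().build());
    tEnv.executeSql(
            "CREATE CATALOG pulsar WITH ("
                    + " 'type' = 'pulsar',"
                    + " 'service-url' = 'pulsar://localhost:6650',"
                    + " 'admin-url' = 'http://localhost:8080')");
    tEnv.useCatalog("pulsar");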
diff --git a/pulsar-flink-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/pulsar-flink-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index b187e544..a2f93b8a 100644
--- a/pulsar-flink-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/pulsar-flink-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -14,4 +14,5 @@
org.apache.flink.streaming.connectors.pulsar.table.PulsarDynamicTableFactory
org.apache.flink.streaming.connectors.pulsar.table.UpsertPulsarDynamicTableFactory
+org.apache.flink.table.catalog.pulsar.factories.PulsarCatalogFactory
org.apache.flink.formats.atomic.AtomicRowDataFormatFactory
\ No newline at end of file
diff --git a/pulsar-flink-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/pulsar-flink-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
deleted file mode 100644
index abba4aac..00000000
--- a/pulsar-flink-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ /dev/null
@@ -1,15 +0,0 @@
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.flink.table.catalog.pulsar.factories.PulsarCatalogFactory
diff --git a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/CatalogITest.java b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/CatalogITest.java
index 22e0f889..a8f269f0 100644
--- a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/CatalogITest.java
+++ b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/CatalogITest.java
@@ -15,8 +15,9 @@
package org.apache.flink.streaming.connectors.pulsar;
import org.apache.flink.client.cli.DefaultCLI;
-import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
+import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.testutils.EnvironmentFileUtil;
import org.apache.flink.streaming.connectors.pulsar.testutils.FailingIdentityMapper;
@@ -28,13 +29,13 @@
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.pulsar.PulsarCatalog;
import org.apache.flink.table.client.config.Environment;
-import org.apache.flink.table.client.gateway.SessionContext;
-import org.apache.flink.table.client.gateway.local.ExecutionContext;
+import org.apache.flink.table.client.gateway.context.DefaultContext;
+import org.apache.flink.table.client.gateway.context.ExecutionContext;
+import org.apache.flink.table.client.gateway.context.SessionContext;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
-import org.apache.commons.cli.Options;
import org.apache.commons.io.IOUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -83,10 +84,13 @@ public class CatalogITest extends PulsarTestBaseWithFlink {
private static final String CATALOGS_ENVIRONMENT_FILE = "test-sql-client-pulsar-catalog.yaml";
private static final String CATALOGS_ENVIRONMENT_FILE_START = "test-sql-client-pulsar-start-catalog.yaml";
+ private static ClusterClient<?> clusterClient;
+
@Before
public void clearStates() {
SingletonStreamSink.clear();
FailingIdentityMapper.failedBefore = false;
+ clusterClient = flink.getClusterClient();
}
@Test(timeout = 40 * 1000L)
@@ -222,6 +226,7 @@ public void run() {
Thread.sleep(2000);
SingletonStreamSink.compareWithList(
INTEGER_LIST.subList(0, INTEGER_LIST.size() - 1).stream().map(Objects::toString)
+ .map(s -> "+I[" + s + "]")
.collect(Collectors.toList()));
}
@@ -265,6 +270,7 @@ public void run() {
SingletonStreamSink.compareWithList(
INTEGER_LIST.subList(0, INTEGER_LIST.size() - 1).stream().map(Objects::toString)
+ .map(s -> "+I[" + s + "]")
.collect(Collectors.toList()));
}
@@ -312,6 +318,8 @@ public void testAvroTableSink() throws Exception {
ExecutionContext context = createExecutionContext(CATALOGS_ENVIRONMENT_FILE_START, conf);
TableEnvironment tableEnv = context.getTableEnvironment();
+ tableEnv.getConfig()
+ .addConfiguration(new Configuration().set(CoreOptions.DEFAULT_PARALLELISM, 1));
tableEnv.useCatalog(pulsarCatalog1);
@@ -382,7 +390,6 @@ public void testJsonTableSink() throws Exception {
ExecutionContext context = createExecutionContext(CATALOGS_ENVIRONMENT_FILE_START, conf);
TableEnvironment tableEnv = context.getTableEnvironment();
-
tableEnv.useCatalog(pulsarCatalog1);
String sinkDDL = "create table " + tableSinkName + "(\n" +
@@ -396,7 +403,8 @@ public void testJsonTableSink() throws Exception {
" ('oid3', 30, 'cid3'),\n" +
" ('oid4', 10, 'cid4')";
- tableEnv.executeSql(sinkDDL).print();
+ tableEnv.executeSql(sinkDDL).await(10, TimeUnit.SECONDS);
+
tableEnv.executeSql(insertQ);
List<GenericRecord> result = consumeMessage(tableSinkName, Schema.AUTO_CONSUME(), 4, 10);
@@ -404,6 +412,30 @@ public void testJsonTableSink() throws Exception {
assertEquals(4, result.size());
}
+ @Test(timeout = 40 * 1000L)
+ public void testCreateTopic() throws Exception {
+
+ String tableSinkTopic = newTopic("tableSink");
+ String tableSinkName = TopicName.get(tableSinkTopic).getLocalName();
+ String pulsarCatalog1 = "pulsarcatalog3";
+
+ Map<String, String> conf = getStreamingConfs();
+ conf.put("$VAR_STARTING", "earliest");
+ conf.put("$VAR_FORMAT", "json");
+
+ ExecutionContext context = createExecutionContext(CATALOGS_ENVIRONMENT_FILE_START, conf);
+ TableEnvironment tableEnv = context.getTableEnvironment();
+ tableEnv.useCatalog(pulsarCatalog1);
+
+ String sinkDDL = "create table " + tableSinkName + "(\n" +
+ " oid STRING,\n" +
+ " totalprice INT,\n" +
+ " customerid STRING\n" +
+ ")";
+ tableEnv.executeSql(sinkDDL).await(10, TimeUnit.SECONDS);
+ assertTrue(Arrays.asList(tableEnv.listTables()).contains(tableSinkName));
+ }
+
@NotNull
private <T> List<T> consumeMessage(String topic, Schema<T> schema, int count, int timeout)
throws InterruptedException, ExecutionException, TimeoutException {
@@ -466,15 +498,15 @@ private ExecutionContext createExecutionContext(String file, Map<String, String> replaceVars)
final Environment env = EnvironmentFileUtil.parseModified(
file,
replaceVars);
- final Configuration flinkConfig = new Configuration();
- return ExecutionContext.builder(
- env,
- new SessionContext("test-session", new Environment()),
- Collections.emptyList(),
- flinkConfig,
- new DefaultClusterClientServiceLoader(),
- new Options(),
- Collections.singletonList(new DefaultCLI())).build();
+
+ DefaultContext defaultContext =
+ new DefaultContext(
+ env,
+ new ArrayList<>(),
+ clusterClient.getFlinkConfiguration(),
+ Collections.singletonList(new DefaultCLI()));
+ SessionContext sessionContext = SessionContext.create(defaultContext, "test-session");
+ return sessionContext.getExecutionContext();
}
private Map<String, String> getStreamingConfs() {
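A note on the executeSql(...).await(...) pattern introduced in this test class: executeSql submits asynchronously and returns a TableResult, and await(timeout, unit) blocks until the statement has finished, which makes the DDL-then-assert sequence in testCreateTopic deterministic instead of racy. Minimal sketch:

    TableResult result = tableEnv.executeSql(sinkDDL);
    result.await(10, TimeUnit.SECONDS);  // throws TimeoutException if not done in time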
diff --git a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarITest.java b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarITest.java
index a3070384..a12eaf55 100644
--- a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarITest.java
+++ b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarITest.java
@@ -26,10 +26,10 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.atomic.AtomicRowDataDeserializationSchema;
+import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonOptions;
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
-import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.runtime.client.JobCancellationException;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -1090,7 +1090,7 @@ private void produceIntoPulsar(DataStream<Row> stream, DataType dt, Properties
public static SerializationSchema<RowData> getJsonSerializationSchema(RowType rowType) {
return new JsonRowDataSerializationSchema(rowType, TimestampFormat.ISO_8601, JsonOptions.MapNullKeyMode.DROP,
- "");
+ "" , true);
}
private class AssertSink extends FlinkPulsarSink<Row> {
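About the extra boolean passed to JsonRowDataSerializationSchema here and in RowDataDerSerializationSchemaTest below: Flink 1.13 added the encodeDecimalAsPlainNumber flag (surfaced in SQL as 'json.encode.decimal-as-plain-number'); true keeps decimals in plain notation rather than scientific notation. The 1.13 constructor call, for reference:

    new JsonRowDataSerializationSchema(
            rowType, TimestampFormat.ISO_8601, JsonOptions.MapNullKeyMode.DROP,
            "",     // mapNullKeyLiteral
            true);  // encodeDecimalAsPlainNumber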
diff --git a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarTableITest.java b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarTableITest.java
index ae90ecb0..4783286c 100644
--- a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarTableITest.java
+++ b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarTableITest.java
@@ -28,6 +28,7 @@
import org.apache.flink.streaming.connectors.pulsar.testutils.FailingIdentityMapper;
import org.apache.flink.streaming.connectors.pulsar.testutils.PulsarTableTestUtils;
import org.apache.flink.streaming.connectors.pulsar.testutils.SingletonStreamSink;
+import org.apache.flink.streaming.connectors.pulsar.testutils.TestUtils;
import org.apache.flink.streaming.util.serialization.PulsarSerializationSchemaWrapper;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableColumn;
@@ -36,7 +37,6 @@
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.test.util.SuccessException;
-import org.apache.flink.test.util.TestUtils;
import org.apache.flink.types.Row;
import com.google.protobuf.ByteString;
@@ -116,18 +116,12 @@ public void testBasicFunctioning() throws Exception {
tEnv.executeSql(createTableSql(tableName, table, tSchema, "atomic")).print();
- Table t = tEnv.scan(tableName).select("value");
-
- tEnv.toAppendStream(t, t.getSchema().toRowType())
+ Table t = tEnv.sqlQuery("select `value` from " + tableName);
+ tEnv.toDataStream(t, Boolean.class)
.map(new FailingIdentityMapper<>(BOOLEAN_LIST.size()))
.addSink(new SingletonStreamSink.StringSink<>()).setParallelism(1);
- try {
- see.execute("basic functionality");
- } catch (Exception e) {
-
- }
-
+ TestUtils.tryExecute(see, "basic functionality");
SingletonStreamSink.compareWithList(
BOOLEAN_LIST.subList(0, BOOLEAN_LIST.size() - 1).stream().map(Objects::toString)
.collect(Collectors.toList()));
@@ -162,15 +156,11 @@ public void testWriteThenRead() throws Exception {
tEnv.executeSql(createTableSql(tableName, tp, tSchema, "json")).print();
Table t = tEnv.sqlQuery("select i, f, bar from " + tableName);
- tEnv.toAppendStream(t, t.getSchema().toRowType())
- .map(new FailingIdentityMapper(fooList.size()))
+ tEnv.toDataStream(t, SchemaData.Foo.class)
+ .map(new FailingIdentityMapper<>(fooList.size()))
.addSink(new SingletonStreamSink.StringSink<>()).setParallelism(1);
- try {
- env.execute("count elements from topics");
- } catch (Exception e) {
-
- }
+ TestUtils.tryExecute(env, "count elements from topics");
SingletonStreamSink.compareWithList(
fooList.subList(0, fooList.size() - 1).stream().map(Objects::toString).collect(Collectors.toList()));
}
@@ -188,12 +178,12 @@ public void testStructTypesInJson() throws Exception {
TableSchema tSchema = getTableSchema(table);
tEnv.executeSql(createTableSql(tableName, table, tSchema, "json")).print();
- Table t = tEnv.scan(tableName).select("i, f, bar");
- tEnv.toAppendStream(t, t.getSchema().toRowType())
- .map(new FailingIdentityMapper(fooList.size()))
+ Table t = tEnv.sqlQuery("select i, f, bar from " + tableName);
+ tEnv.toDataStream(t, SchemaData.Foo.class)
+ .map(new FailingIdentityMapper<>(fooList.size()))
.addSink(new SingletonStreamSink.StringSink<>()).setParallelism(1);
- TestUtils.tryExecute(see, "test struct in avro");
+ TestUtils.tryExecute(see, "test struct in json");
SingletonStreamSink.compareWithList(
fooList.subList(0, fooList.size() - 1).stream().map(Objects::toString).collect(Collectors.toList()));
}
@@ -211,16 +201,11 @@ public void testStructTypesWithJavaList() throws Exception {
TableSchema tSchema = getTableSchema(table);
tEnv.executeSql(createTableSql(tableName, table, tSchema, "json")).print();
- Table t = tEnv.scan(tableName).select("l");
- tEnv.toAppendStream(t, t.getSchema().toRowType())
- .map(new FailingIdentityMapper(flList.size()))
+ Table t = tEnv.sqlQuery("select l from " + tableName);
+ tEnv.toDataStream(t, SchemaData.FL.class)
+ .map(new FailingIdentityMapper<>(flList.size()))
.addSink(new SingletonStreamSink.StringSink<>()).setParallelism(1);
-
- try {
- see.execute("test struct in avro");
- } catch (Exception e) {
- log.error("", e);
- }
+ TestUtils.tryExecute(see, "test struct in json");
SingletonStreamSink.compareWithList(
flList.subList(0, flList.size() - 1).stream().map(Objects::toString).collect(Collectors.toList()));
}
@@ -250,16 +235,13 @@ public void testStructTypesWithJavaArray() throws Exception {
tEnv.executeSql(createTableSql(tableName, table, tSchema, "json")).print();
- Table t = tEnv.scan(tableName).select("l");
- tEnv.toAppendStream(t, t.getSchema().toRowType())
- .map(new FailingIdentityMapper(faList.size()))
+ Table t = tEnv.sqlQuery("select l from " + tableName);
+ tEnv.toDataStream(t, SchemaData.FA.class)
+ .map(new FailingIdentityMapper<>(faList.size()))
.addSink(new SingletonStreamSink.StringSink<>()).setParallelism(1);
- try {
- see.execute("test struct in avro");
- } catch (Exception e) {
+ TestUtils.tryExecute(see, "test struct in avro");
- }
SingletonStreamSink.compareWithList(
faList.subList(0, faList.size() - 1).stream().map(Objects::toString).collect(Collectors.toList()));
}
@@ -278,17 +260,13 @@ public void testStructTypesWithJavaMap() throws Exception {
tEnv.executeSql(createTableSql(tableName, table, tSchema, "json")).print();
- Table t = tEnv.scan(tableName).select("m");
-
- tEnv.toAppendStream(t, t.getSchema().toRowType())
- .map(new FailingIdentityMapper(faList.size()))
+ Table t = tEnv.sqlQuery("select m from " + tableName);
+ tEnv.toDataStream(t, SchemaData.FM.class)
+ .map(new FailingIdentityMapper<>(faList.size()))
.addSink(new SingletonStreamSink.StringSink<>()).setParallelism(1);
- try {
- see.execute("test struct in avro");
- } catch (Exception e) {
+ TestUtils.tryExecute(see, "test struct in avro");
- }
SingletonStreamSink.compareWithList(
fmList.subList(0, fmList.size() - 1).stream().map(Objects::toString).collect(Collectors.toList()));
}
@@ -304,7 +282,7 @@ public void testSimpleSQLWork() throws Exception {
Assert.fail();
} catch (ValidationException e) {
// success
- } catch (Exception e){
+ } catch (Exception e) {
log.error("test fail", e);
Assert.fail(e.getMessage());
}
@@ -322,7 +300,7 @@ public void testProtobufSQLWork() throws Exception {
Map<String, String> map = new HashMap<>();
map.put("protobuf.message-class-name", SimpleTest.class.getCanonicalName());
String extendParamStr = "";
- if (map != null && !map.isEmpty()){
+ if (map != null && !map.isEmpty()) {
extendParamStr = map.entrySet().stream()
.map(e -> String.format("'%s' = '%s'", e.getKey(), e.getValue()))
.collect(Collectors.joining(",\n"));
@@ -394,7 +372,7 @@ public void testSimpleSQL(String format, Map<String, String> extend) throws Exception {
String topic = newTopic();
final String createTable;
String extendParamStr = "";
- if (extend != null && !extend.isEmpty()){
+ if (extend != null && !extend.isEmpty()) {
extendParamStr = extend.entrySet().stream()
.map(e -> String.format("'%s' = '%s'", e.getKey(), e.getValue()))
.collect(Collectors.joining(",\n"));
diff --git a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarTestBase.java b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarTestBase.java
index 4bc45c0a..8296733c 100644
--- a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarTestBase.java
+++ b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarTestBase.java
@@ -14,15 +14,12 @@
package org.apache.flink.streaming.connectors.pulsar;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.connector.pulsar.source.BrokerPartition;
import org.apache.flink.connector.pulsar.source.StartOffsetInitializer;
import org.apache.flink.connector.pulsar.source.StopCondition;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.flink.connector.pulsar.source.util.PulsarAdminUtils;
-import org.apache.flink.metrics.jmx.JMXReporter;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions;
import org.apache.flink.streaming.connectors.pulsar.internal.TopicRange;
import org.apache.flink.streaming.util.TestStreamEnvironment;
@@ -161,15 +158,6 @@ public static void shutDownServices() throws Exception {
log.info("-------------------------------------------------------------------------");
}
- protected static Configuration getFlinkConfiguration() {
- Configuration flinkConfig = new Configuration();
-
- flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), "16m");
- flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." +
- ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
- return flinkConfig;
- }
-
public static List sendTypedMessages(
String topic,
SchemaType type,
diff --git a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarTestBaseWithFlink.java b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarTestBaseWithFlink.java
index f107f860..d42f6356 100644
--- a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarTestBaseWithFlink.java
+++ b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarTestBaseWithFlink.java
@@ -16,6 +16,11 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.metrics.jmx.JMXReporter;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.table.api.TableColumn;
@@ -57,6 +62,18 @@ public void noJobIsRunning() throws Exception {
waitUntilNoJobIsRunning(client);
}
+ protected static Configuration getFlinkConfiguration() {
+ Configuration flinkConfig = new Configuration();
+
+ flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), "16m");
+ flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." +
+ ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
+ flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
+ flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, TM_SLOTS);
+ flinkConfig.setBoolean(WebOptions.SUBMIT_ENABLE, false);
+ return flinkConfig;
+ }
+
public static void waitUntilJobIsRunning(ClusterClient<?> client) throws Exception {
while (getRunningJobs(client).isEmpty()) {
Thread.sleep(50);
diff --git a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/RowDataDerSerializationSchemaTest.java b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/RowDataDerSerializationSchemaTest.java
index 6e6b0d8a..227f4ab0 100644
--- a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/RowDataDerSerializationSchemaTest.java
+++ b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/RowDataDerSerializationSchemaTest.java
@@ -20,10 +20,10 @@
import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema;
import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonOptions;
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
-import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.formats.protobuf.PbRowTypeInformation;
import org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema;
import org.apache.flink.formats.protobuf.serialize.PbRowDataSerializationSchema;
@@ -157,7 +157,7 @@ public void testJsonSerializeDeserialize() throws Exception {
JsonRowDataSerializationSchema serializationSchema =
new JsonRowDataSerializationSchema(rowType, TimestampFormat.ISO_8601, JsonOptions.MapNullKeyMode.DROP,
- "");
+ "", true);
serializationSchema.open(null);
JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(rowType, typeInfo,
false, false, TimestampFormat.ISO_8601);
diff --git a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/SchemaITest.java b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/SchemaITest.java
index 8ecf8749..16e1f784 100644
--- a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/SchemaITest.java
+++ b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/SchemaITest.java
@@ -29,8 +29,8 @@
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.test.util.TestUtils;
+import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
-import org.apache.flink.util.StringUtils;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.SchemaType;
@@ -77,7 +77,7 @@ public void testBooleanRead() throws Exception {
@Test(timeout = 100 * 1000L)
public void testBooleanWrite() throws Exception {
- checkWrite(SchemaType.BOOLEAN, DataTypes.BOOLEAN(), BOOLEAN_LIST, null, null);
+ checkWrite(SchemaType.BOOLEAN, DataTypes.BOOLEAN(), BOOLEAN_LIST, obj -> Row.of(obj).toString(), null);
}
@Test(timeout = 100 * 1000L)
@@ -87,7 +87,7 @@ public void testINT32Read() throws Exception {
@Test(timeout = 100 * 1000L)
public void testINT32Write() throws Exception {
- checkWrite(SchemaType.INT32, DataTypes.INT(), INTEGER_LIST, null, null);
+ checkWrite(SchemaType.INT32, DataTypes.INT(), INTEGER_LIST, obj -> Row.of(obj).toString(), null);
}
@Test(timeout = 100 * 1000L)
@@ -97,7 +97,7 @@ public void testINT64Read() throws Exception {
@Test(timeout = 100 * 1000L)
public void testINT64Write() throws Exception {
- checkWrite(SchemaType.INT64, DataTypes.BIGINT(), INT_64_LIST, null, null);
+ checkWrite(SchemaType.INT64, DataTypes.BIGINT(), INT_64_LIST, obj -> Row.of(obj).toString(), null);
}
@Test(timeout = 100 * 1000L)
@@ -107,7 +107,8 @@ public void testStringRead() throws Exception {
@Test(timeout = 100 * 1000L)
public void testStringWrite() throws Exception {
- checkWrite(SchemaType.STRING, DataTypes.STRING(), STRING_LIST, null, null);
+
+ checkWrite(SchemaType.STRING, DataTypes.STRING(), STRING_LIST, obj -> Row.of(obj).toString(), null);
}
@Test(timeout = 100 * 1000L)
@@ -117,7 +118,7 @@ public void testByteRead() throws Exception {
@Test
public void testByteWrite() throws Exception {
- checkWrite(SchemaType.INT8, DataTypes.TINYINT(), INT_8_LIST, null, null);
+ checkWrite(SchemaType.INT8, DataTypes.TINYINT(), INT_8_LIST, obj -> Row.of(obj).toString(), null);
}
@Test(timeout = 100 * 1000L)
@@ -127,7 +128,7 @@ public void testShortRead() throws Exception {
@Test(timeout = 100 * 1000L)
public void testShortWrite() throws Exception {
- checkWrite(SchemaType.INT16, DataTypes.SMALLINT(), INT_16_LIST, null, null);
+ checkWrite(SchemaType.INT16, DataTypes.SMALLINT(), INT_16_LIST, obj -> Row.of(obj).toString(), null);
}
@Test(timeout = 100 * 1000L)
@@ -137,7 +138,7 @@ public void testFloatRead() throws Exception {
@Test(timeout = 100 * 1000L)
public void testFloatWrite() throws Exception {
- checkWrite(SchemaType.FLOAT, DataTypes.FLOAT(), FLOAT_LIST, null, null);
+ checkWrite(SchemaType.FLOAT, DataTypes.FLOAT(), FLOAT_LIST, obj -> Row.of(obj).toString(), null);
}
@Test(timeout = 100 * 1000L)
@@ -147,7 +148,7 @@ public void testDoubleRead() throws Exception {
@Test(timeout = 100 * 1000L)
public void testDoubleWrite() throws Exception {
- checkWrite(SchemaType.DOUBLE, DataTypes.DOUBLE(), DOUBLE_LIST, null, null);
+ checkWrite(SchemaType.DOUBLE, DataTypes.DOUBLE(), DOUBLE_LIST, obj -> Row.of(obj).toString(), null);
}
@Test(timeout = 100 * 1000L)
@@ -160,7 +161,7 @@ public void testDateRead() throws Exception {
public void testDateWrite() throws Exception {
checkWrite(SchemaType.LOCAL_DATE,
DataTypes.DATE(),
- localDateList, null, null);
+ localDateList, obj -> Row.of(obj).toString(), null);
}
@Test(timeout = 100 * 1000L)
@@ -172,7 +173,7 @@ public void testTimestampRead() throws Exception {
@Test(timeout = 100 * 1000L)
public void testTimestampWrite() throws Exception {
checkWrite(SchemaType.LOCAL_DATE_TIME,
- DataTypes.TIMESTAMP(3), localDateTimeList, null, null);
+ DataTypes.TIMESTAMP(3), localDateTimeList, obj -> Row.of(obj).toString(), null);
}
@Test(timeout = 100 * 1000L)
@@ -182,7 +183,7 @@ public void testByteArrayRead() throws Exception {
@Test(timeout = 100 * 1000L)
public void testByteArrayWrite() throws Exception {
- checkWrite(SchemaType.BYTES, DataTypes.BYTES(), BYTES_LIST, t -> StringUtils.arrayAwareToString(t), null);
+ checkWrite(SchemaType.BYTES, DataTypes.BYTES(), BYTES_LIST, obj -> Row.of(obj).toString(), null);
}
private <T> void checkRead(SchemaType type, DataType dt, List<T> datas, Function<T, String> toStr, Class<T> tClass)
diff --git a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarDynamicTableFactoryTest.java b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarDynamicTableFactoryTest.java
index f3c8e374..7e245220 100644
--- a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarDynamicTableFactoryTest.java
+++ b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarDynamicTableFactoryTest.java
@@ -16,7 +16,6 @@
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSink;
@@ -24,12 +23,11 @@
import org.apache.flink.streaming.connectors.pulsar.config.StartupMode;
import org.apache.flink.streaming.connectors.pulsar.util.KeyHashMessageRouterImpl;
import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableColumn;
-import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogTableImpl;
-import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.catalog.WatermarkSpec;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
@@ -39,7 +37,7 @@
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.expressions.utils.ResolvedExpressionMock;
import org.apache.flink.table.factories.TestFormatFactory;
import org.apache.flink.table.factories.TestFormatFactory.DecodingFormatMock;
import org.apache.flink.table.factories.TestFormatFactory.EncodingFormatMock;
@@ -68,9 +66,12 @@
import java.util.function.Consumer;
import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
+import static org.apache.flink.table.descriptors.DescriptorProperties.METADATA;
+import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
+import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -115,20 +116,33 @@ public class PulsarDynamicTableFactoryTest extends TestLogger {
private static final String PROPS_SCAN_OFFSETS =
MessageId.earliest.toString();
- private static final TableSchema SCHEMA = TableSchema.builder()
- .add(TableColumn.physical(NAME, DataTypes.STRING()))
- .add(TableColumn.physical(COUNT, DataTypes.DECIMAL(38, 18)))
- .add(TableColumn.physical(EVENT_TIME, DataTypes.TIMESTAMP(3)))
- .add(TableColumn.computed(COMPUTED_COLUMN_NAME, COMPUTED_COLUMN_DATATYPE, COMPUTED_COLUMN_EXPRESSION))
- .watermark(EVENT_TIME, WATERMARK_EXPRESSION, WATERMARK_DATATYPE)
- .build();
-
- private static final TableSchema SCHEMA_WITH_METADATA = TableSchema.builder()
- .add(TableColumn.physical(NAME, DataTypes.STRING()))
- .add(TableColumn.physical(COUNT, DataTypes.DECIMAL(38, 18)))
- .add(TableColumn.metadata(EVENT_TIME, DataTypes.TIMESTAMP(3), "eventTime"))
- .add(TableColumn.metadata(METADATA_TOPIC, DataTypes.STRING(), "value.metadata_2"))
- .build();
+ private static final ResolvedSchema SCHEMA =
+ new ResolvedSchema(
+ Arrays.asList(
+ Column.physical(NAME, DataTypes.STRING().notNull()),
+ Column.physical(COUNT, DataTypes.DECIMAL(38, 18)),
+ Column.physical(EVENT_TIME, DataTypes.TIMESTAMP(3)),
+ Column.computed(
+ COMPUTED_COLUMN_NAME,
+ ResolvedExpressionMock.of(
+ COMPUTED_COLUMN_DATATYPE, COMPUTED_COLUMN_EXPRESSION))),
+ Collections.singletonList(
+ WatermarkSpec.of(
+ EVENT_TIME,
+ ResolvedExpressionMock.of(
+ WATERMARK_DATATYPE, WATERMARK_EXPRESSION))),
+ null);
+
+ private static final ResolvedSchema SCHEMA_WITH_METADATA =
+ new ResolvedSchema(
+ Arrays.asList(
+ Column.physical(NAME, DataTypes.STRING()),
+ Column.physical(COUNT, DataTypes.DECIMAL(38, 18)),
+ Column.metadata(EVENT_TIME, DataTypes.TIMESTAMP(3), "eventTime", false),
+ Column.metadata(
+ METADATA, DataTypes.STRING(), "value.metadata_2", false)),
+ Collections.emptyList(),
+ null);
private static final DataType SCHEMA_DATA_TYPE = SCHEMA.toPhysicalRowDataType();
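Note: ResolvedSchema (new in Flink 1.13) replaces the TableSchema builder and takes the column list, watermark specs, and an optional primary key directly. Unlike TableSchema, computed columns and watermarks must already carry resolved expressions, which factory tests stub via the flink-table-common test-jar. A sketch of that stubbing (column name and SQL text here are illustrative):

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.catalog.Column;
    import org.apache.flink.table.expressions.utils.ResolvedExpressionMock;

    // ResolvedExpressionMock.of(outputType, sqlString) stands in for a
    // planner-resolved expression in factory tests.
    Column computed = Column.computed(
            "ts3", ResolvedExpressionMock.of(DataTypes.TIMESTAMP(3), "CAST(ts AS TIMESTAMP(3))"));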
@@ -248,7 +262,7 @@ public void testTableSourceWithKeyValueAndMetadata() {
final PulsarDynamicTableSource actualPulsarSource = (PulsarDynamicTableSource) actualSource;
// initialize stateful testing formats
actualPulsarSource.applyReadableMetadata(Arrays.asList("eventTime", "value.metadata_2"),
- SCHEMA_WITH_METADATA.toRowDataType());
+ SCHEMA_WITH_METADATA.toSourceRowDataType());
actualPulsarSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
final DecodingFormatMock expectedKeyFormat = new DecodingFormatMock(
@@ -293,7 +307,7 @@ public void testTableSourceWithKeyValueAndMetadata() {
PULSAR_SOURCE_PROPERTIES,
startupOptions
);
- expectedPulsarSource.producedDataType = SCHEMA_WITH_METADATA.toRowDataType();
+ expectedPulsarSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedPulsarSource.metadataKeys = Collections.singletonList("eventTime");
assertEquals(actualSource, expectedPulsarSource);
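Note: ResolvedSchema has no single toRowDataType(); it exposes one projection per use case, which is why the produced data type above moves to toSourceRowDataType(). Roughly, against the 1.13 API:

    // physical columns only (what the value format (de)serializes)
    DataType physical = SCHEMA_WITH_METADATA.toPhysicalRowDataType();
    // physical + metadata columns, computed columns excluded
    // (what a scan source produces, hence the change above)
    DataType source = SCHEMA_WITH_METADATA.toSourceRowDataType();
    // physical + persisted metadata columns (what a sink consumes)
    DataType sink = SCHEMA_WITH_METADATA.toSinkRowDataType();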
@@ -457,14 +471,11 @@ public void testSinkWithTopicListOrTopicPattern() {
@Test
public void testPrimaryKeyValidation() {
- final TableSchema pkSchema = TableSchema.builder()
- .field(NAME, DataTypes.STRING().notNull())
- .field(COUNT, DataTypes.DECIMAL(38, 18))
- .field(EVENT_TIME, DataTypes.TIMESTAMP(3))
- .field(COMPUTED_COLUMN_NAME, COMPUTED_COLUMN_DATATYPE, COMPUTED_COLUMN_EXPRESSION)
- .watermark(EVENT_TIME, WATERMARK_EXPRESSION, WATERMARK_DATATYPE)
- .primaryKey(NAME)
- .build();
+ final ResolvedSchema pkSchema =
+ new ResolvedSchema(
+ SCHEMA.getColumns(),
+ SCHEMA.getWatermarkSpecs(),
+ UniqueConstraint.primaryKey(NAME, Collections.singletonList(NAME)));
Map<String, String> options1 = getModifiedOptions(
getBasicSourceOptions(),
@@ -483,7 +494,7 @@ public void testPrimaryKeyValidation() {
createTableSink(pkSchema, getBasicSinkOptions());
fail();
} catch (Throwable t) {
- String error = "The Pulsar table 'default.default.sinkTable' with 'test-format' format" +
+ String error = "The Pulsar table 'default.default.t1' with 'test-format' format" +
" doesn't support defining PRIMARY KEY constraint on the table, because it can't" +
" guarantee the semantic of primary key.";
assertEquals(error, t.getCause().getMessage());
@@ -493,7 +504,7 @@ public void testPrimaryKeyValidation() {
createTableSource(pkSchema, getBasicSinkOptions());
fail();
} catch (Throwable t) {
- String error = "The Pulsar table 'default.default.scanTable' with 'test-format' format" +
+ String error = "The Pulsar table 'default.default.t1' with 'test-format' format" +
" doesn't support defining PRIMARY KEY constraint on the table, because it can't" +
" guarantee the semantic of primary key.";
assertEquals(error, t.getCause().getMessage());
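Note: the expected table path changes to default.default.t1 because the statically imported FactoryMocks helpers register every test table under one fixed identifier, instead of the hand-rolled scanTable/sinkTable names removed below. Paraphrasing the 1.13 test-jar:

    // org.apache.flink.table.factories.utils.FactoryMocks
    public static final ObjectIdentifier IDENTIFIER =
            ObjectIdentifier.of("default", "default", "t1");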
@@ -563,36 +574,6 @@ private static PulsarDynamicTableSink createExpectedSink(
KeyHashMessageRouterImpl.INSTANCE);
}
- private static DynamicTableSource createTableSource(TableSchema schema, Map<String, String> options) {
- final ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
- "default",
- "default",
- "scanTable");
- final CatalogTable catalogTable = new CatalogTableImpl(schema, options, "scanTable");
- return FactoryUtil.createTableSource(
- null,
- objectIdentifier,
- catalogTable,
- new Configuration(),
- Thread.currentThread().getContextClassLoader(),
- false);
- }
-
- private static DynamicTableSink createTableSink(TableSchema schema, Map<String, String> options) {
- final ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
- "default",
- "default",
- "sinkTable");
- final CatalogTable catalogTable = new CatalogTableImpl(schema, options, "sinkTable");
- return FactoryUtil.createTableSink(
- null,
- objectIdentifier,
- catalogTable,
- new Configuration(),
- Thread.currentThread().getContextClassLoader(),
- false);
- }
-
/**
* Returns the full options modified by the given consumer {@code optionModifier}.
*
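Note: the deleted helpers are superseded by FactoryMocks.createTableSource/createTableSink from the flink-table-common test-jar, which accept a ResolvedSchema plus the options map and run full factory discovery, hiding the ObjectIdentifier, CatalogTable, and class-loader plumbing. Usage, as in the rewritten tests:

    import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
    import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;

    DynamicTableSource source = createTableSource(SCHEMA, getBasicSourceOptions());
    DynamicTableSink sink = createTableSink(SCHEMA, getBasicSinkOptions());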
diff --git a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/table/UpsertPulsarDynamicTableFactoryTest.java b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/table/UpsertPulsarDynamicTableFactoryTest.java
index 6864e904..e76213f2 100644
--- a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/table/UpsertPulsarDynamicTableFactoryTest.java
+++ b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/table/UpsertPulsarDynamicTableFactoryTest.java
@@ -16,7 +16,6 @@
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSink;
@@ -24,11 +23,10 @@
import org.apache.flink.streaming.connectors.pulsar.config.StartupMode;
import org.apache.flink.streaming.connectors.pulsar.util.KeyHashMessageRouterImpl;
import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogTableImpl;
-import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
@@ -38,7 +36,6 @@
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.TestFormatFactory;
import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
@@ -52,6 +49,7 @@
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -59,9 +57,11 @@
import java.util.function.Consumer;
import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
+import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
+import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
/**
@@ -77,22 +77,28 @@ public class UpsertPulsarDynamicTableFactoryTest extends TestLogger {
private static final String ADMIN_URL = "http://127.0.0.1:8080";
- private static final TableSchema SOURCE_SCHEMA = TableSchema.builder()
- .field("window_start", DataTypes.STRING().notNull())
- .field("region", DataTypes.STRING().notNull())
- .field("view_count", DataTypes.BIGINT())
- .primaryKey("window_start", "region")
- .build();
+ private static final ResolvedSchema SOURCE_SCHEMA =
+ new ResolvedSchema(
+ Arrays.asList(
+ Column.physical("window_start", DataTypes.STRING().notNull()),
+ Column.physical("region", DataTypes.STRING().notNull()),
+ Column.physical("view_count", DataTypes.BIGINT())),
+ Collections.emptyList(),
+ UniqueConstraint.primaryKey("name", Arrays.asList("window_start", "region")));
+
private static final int[] SOURCE_KEY_FIELDS = new int[]{0, 1};
private static final int[] SOURCE_VALUE_FIELDS = new int[]{0, 1, 2};
- private static final TableSchema SINK_SCHEMA = TableSchema.builder()
- .field("region", new AtomicDataType(new VarCharType(false, 100)))
- .field("view_count", DataTypes.BIGINT())
- .primaryKey("region")
- .build();
+ private static final ResolvedSchema SINK_SCHEMA =
+ new ResolvedSchema(
+ Arrays.asList(
+ Column.physical(
+ "region", new AtomicDataType(new VarCharType(false, 100))),
+ Column.physical("view_count", DataTypes.BIGINT())),
+ Collections.emptyList(),
+ UniqueConstraint.primaryKey("name", Collections.singletonList("region")));
private static final int[] SINK_KEY_FIELDS = new int[]{0};
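Note: in UniqueConstraint.primaryKey(String, List<String>) the first argument is the constraint's name, not a key column; "name" above is only a label. Sketch:

    // primaryKey(constraintName, columnNames)
    UniqueConstraint pk =
            UniqueConstraint.primaryKey("name", Arrays.asList("window_start", "region"));
    pk.getColumns(); // ["window_start", "region"]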
@@ -109,23 +115,28 @@ public class UpsertPulsarDynamicTableFactoryTest extends TestLogger {
// UPSERT_PULSAR_SINK_PROPERTIES.setProperty("service-url", SERVICE_URL);
}
+ protected static DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat =
+ new TestFormatFactory.DecodingFormatMock(
+ ",", true, ChangelogMode.insertOnly(), Collections.emptyMap());
+
+ protected static DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
+ new TestFormatFactory.DecodingFormatMock(
+ ",", true, ChangelogMode.insertOnly(), Collections.emptyMap());
+
+ protected static EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat =
+ new TestFormatFactory.EncodingFormatMock(",", ChangelogMode.insertOnly());
+ protected static EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat =
+ new TestFormatFactory.EncodingFormatMock(",", ChangelogMode.insertOnly());
+
+
@Rule
public ExpectedException thrown = ExpectedException.none();
@Test
public void testTableSource() {
final DataType producedDataType = SOURCE_SCHEMA.toPhysicalRowDataType();
-
- DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat =
- new TestFormatFactory.DecodingFormatMock(
- ",", true, ChangelogMode.insertOnly(), Collections.emptyMap());
-
- DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
- new TestFormatFactory.DecodingFormatMock(
- ",", true, ChangelogMode.insertOnly(), Collections.emptyMap());
-
// Construct table source using options and table source factory
- final DynamicTableSource actualSource = createActualSource(SOURCE_SCHEMA, getFullSourceOptions());
+ final DynamicTableSource actualSource = createTableSource(SOURCE_SCHEMA, getFullSourceOptions());
final PulsarDynamicTableSource expectedSource = createExpectedScanSource(
producedDataType,
@@ -149,13 +160,9 @@ public void testTableSource() {
@Test
public void testTableSink() {
- EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat =
- new TestFormatFactory.EncodingFormatMock(",", ChangelogMode.insertOnly());
- EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat =
- new TestFormatFactory.EncodingFormatMock(",", ChangelogMode.insertOnly());
// Construct table sink using options and table sink factory.
- final DynamicTableSink actualSink = createActualSink(SINK_SCHEMA, getFullSinkOptions());
+ final DynamicTableSink actualSink = createTableSink(SINK_SCHEMA, getFullSinkOptions());
final DynamicTableSink expectedSink = createExpectedSink(
SINK_SCHEMA.toPhysicalRowDataType(),
@@ -188,12 +195,7 @@ public void testTableSinkWithParallelism() {
final Map<String, String> modifiedOptions = getModifiedOptions(
getFullSinkOptions(),
options -> options.put("sink.parallelism", "100"));
- final DynamicTableSink actualSink = createActualSink(SINK_SCHEMA, modifiedOptions);
-
- EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat =
- new TestFormatFactory.EncodingFormatMock(",", ChangelogMode.insertOnly());
- EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat =
- new TestFormatFactory.EncodingFormatMock(",", ChangelogMode.insertOnly());
+ final DynamicTableSink actualSink = createTableSink(SINK_SCHEMA, modifiedOptions);
final DynamicTableSink expectedSink = createExpectedSink(
SINK_SCHEMA.toPhysicalRowDataType(),
@@ -228,12 +230,12 @@ public void testCreateSourceTableWithoutPK() {
"The PRIMARY KEY specifies which columns should be read from or write to the Pulsar message key. " +
"The PRIMARY KEY also defines records in the 'upsert-pulsar' table should update or delete on which keys.")));
- TableSchema illegalSchema = TableSchema.builder()
- .field("window_start", DataTypes.STRING())
- .field("region", DataTypes.STRING())
- .field("view_count", DataTypes.BIGINT())
- .build();
- createActualSource(illegalSchema, getFullSinkOptions());
+ ResolvedSchema illegalSchema =
+ ResolvedSchema.of(
+ Column.physical("window_start", DataTypes.STRING()),
+ Column.physical("region", DataTypes.STRING()),
+ Column.physical("view_count", DataTypes.BIGINT()));
+ createTableSource(illegalSchema, getFullSourceOptions());
}
@Test
@@ -243,11 +245,11 @@ public void testCreateSinkTableWithoutPK() {
"The PRIMARY KEY specifies which columns should be read from or write to the Pulsar message key. " +
"The PRIMARY KEY also defines records in the 'upsert-pulsar' table should update or delete on which keys.")));
- TableSchema illegalSchema = TableSchema.builder()
- .field("region", DataTypes.STRING())
- .field("view_count", DataTypes.BIGINT())
- .build();
- createActualSink(illegalSchema, getFullSinkOptions());
+ ResolvedSchema illegalSchema =
+ ResolvedSchema.of(
+ Column.physical("region", DataTypes.STRING()),
+ Column.physical("view_count", DataTypes.BIGINT()));
+ createTableSink(illegalSchema, getFullSinkOptions());
}
@Test
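Note: ResolvedSchema.of(Column...) used in these negative tests is the physical-columns shorthand; in 1.13 it expands to the full constructor with no watermarks and no primary key, which is exactly what should trip the PRIMARY KEY validation here:

    // ResolvedSchema.of(cols) is equivalent to (cols is illustrative):
    ResolvedSchema schema =
            new ResolvedSchema(Arrays.asList(cols), Collections.emptyList(), null);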
@@ -260,7 +262,7 @@ public void testSerWithCDCFormatAsValue() {
TestFormatFactory.IDENTIFIER, TestFormatFactory.IDENTIFIER
))));
- createActualSink(SINK_SCHEMA,
+ createTableSink(SINK_SCHEMA,
getModifiedOptions(
getFullSinkOptions(),
options -> options.put(
@@ -279,7 +281,7 @@ public void testDeserWithCDCFormatAsValue() {
TestFormatFactory.IDENTIFIER
))));
- createActualSource(SOURCE_SCHEMA,
+ createTableSource(SOURCE_SCHEMA,
getModifiedOptions(
getFullSinkOptions(),
options -> options.put(
@@ -352,39 +354,6 @@ private static Map<String, String> getFullSinkOptions() {
return options;
}
- private static DynamicTableSource createActualSource(TableSchema schema, Map<String, String> options) {
- ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
- "default",
- "default",
- "sourceTable");
- final CatalogTable sourceTable =
- new CatalogTableImpl(schema, options, "sinkTable");
-
- return FactoryUtil.createTableSource(
- null,
- objectIdentifier,
- sourceTable,
- new Configuration(),
- Thread.currentThread().getContextClassLoader(),
- false);
- }
-
- private static DynamicTableSink createActualSink(TableSchema schema, Map<String, String> options) {
- ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
- "default",
- "default",
- "sinkTable");
- final CatalogTable sinkTable =
- new CatalogTableImpl(schema, options, "sinkTable");
- return FactoryUtil.createTableSink(
- null,
- objectIdentifier,
- sinkTable,
- new Configuration(),
- Thread.currentThread().getContextClassLoader(),
- false);
- }
-
private static PulsarDynamicTableSource createExpectedScanSource(
DataType producedDataType,
DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
diff --git a/pulsar-flink-sql-connector-2.11/pom.xml b/pulsar-flink-sql-connector-2.11/pom.xml
index f1f69f9e..816010d6 100644
--- a/pulsar-flink-sql-connector-2.11/pom.xml
+++ b/pulsar-flink-sql-connector-2.11/pom.xml
@@ -21,7 +21,7 @@
io.streamnative.connectors
pulsar-flink-parent
- 2.7.7-SNAPSHOT
+ 1.13.1.0
pulsar-flink-sql-connector_2.11
diff --git a/pulsar-flink-sql-connector-2.12/pom.xml b/pulsar-flink-sql-connector-2.12/pom.xml
index 4ae4a749..daeef6c5 100644
--- a/pulsar-flink-sql-connector-2.12/pom.xml
+++ b/pulsar-flink-sql-connector-2.12/pom.xml
@@ -21,7 +21,7 @@
io.streamnative.connectors
pulsar-flink-parent
- 2.7.7-SNAPSHOT
+ 1.13.1.0
pulsar-flink-sql-connector_2.12