From 113afcac069867efa3be3eb4be6743d2fc41e412 Mon Sep 17 00:00:00 2001 From: Jianyun Zhao Date: Mon, 21 Jun 2021 12:21:59 +0800 Subject: [PATCH] Bump flink 1.13.1 (#360) --- pom.xml | 4 +- pulsar-flink-connector-shade-2.11/pom.xml | 2 +- pulsar-flink-connector-shade-2.12/pom.xml | 2 +- pulsar-flink-connector/pom.xml | 2 +- .../enumerator/PulsarSourceEnumerator.java | 2 +- .../pulsar/internal/PulsarCatalogSupport.java | 21 ++- .../connectors/pulsar/util/DataTypeUtils.java | 5 - .../PulsarDeserializationSchema.java | 2 +- .../PulsarDeserializationSchemaBuilder.java | 7 +- .../table/catalog/pulsar/PulsarCatalog.java | 2 + .../pulsar/PulsarCatalogDescriptor.java | 55 ------- .../pulsar/PulsarCatalogValidator.java | 59 -------- .../descriptors/PulsarCatalogDescriptor.java | 55 ------- .../descriptors/PulsarCatalogValidator.java | 53 ------- .../factories/PulsarCatalogFactory.java | 103 ++++++------- .../PulsarCatalogFactoryOptions.java | 50 +++++++ .../org.apache.flink.table.factories.Factory | 1 + ....apache.flink.table.factories.TableFactory | 15 -- .../connectors/pulsar/CatalogITest.java | 62 ++++++-- .../connectors/pulsar/FlinkPulsarITest.java | 4 +- .../pulsar/FlinkPulsarTableITest.java | 74 ++++------ .../connectors/pulsar/PulsarTestBase.java | 12 -- .../pulsar/PulsarTestBaseWithFlink.java | 17 +++ .../RowDataDerSerializationSchemaTest.java | 4 +- .../connectors/pulsar/SchemaITest.java | 25 ++-- .../table/PulsarDynamicTableFactoryTest.java | 109 ++++++-------- .../UpsertPulsarDynamicTableFactoryTest.java | 139 +++++++----------- pulsar-flink-sql-connector-2.11/pom.xml | 2 +- pulsar-flink-sql-connector-2.12/pom.xml | 2 +- 29 files changed, 337 insertions(+), 553 deletions(-) delete mode 100644 pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/PulsarCatalogDescriptor.java delete mode 100644 pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/PulsarCatalogValidator.java delete mode 100644 pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/descriptors/PulsarCatalogDescriptor.java delete mode 100644 pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/descriptors/PulsarCatalogValidator.java create mode 100644 pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/factories/PulsarCatalogFactoryOptions.java delete mode 100644 pulsar-flink-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory diff --git a/pom.xml b/pom.xml index 2ccf7289..055e2b5b 100644 --- a/pom.xml +++ b/pom.xml @@ -33,7 +33,7 @@ io.streamnative.connectors pulsar-flink-parent - 2.7.7-SNAPSHOT + 1.13.1.0 StreamNative :: Pulsar Flink Connector :: Root https://pulsar.apache.org @@ -74,7 +74,7 @@ 2.8.0 - 1.12.3 + 1.13.1 2.11 1.18.18 0.16 diff --git a/pulsar-flink-connector-shade-2.11/pom.xml b/pulsar-flink-connector-shade-2.11/pom.xml index 6ccea88f..d4fce5e9 100644 --- a/pulsar-flink-connector-shade-2.11/pom.xml +++ b/pulsar-flink-connector-shade-2.11/pom.xml @@ -21,7 +21,7 @@ io.streamnative.connectors pulsar-flink-parent - 2.7.7-SNAPSHOT + 1.13.1.0 pulsar-flink-connector_2.11 diff --git a/pulsar-flink-connector-shade-2.12/pom.xml b/pulsar-flink-connector-shade-2.12/pom.xml index 7db9f17c..c67ec0d4 100644 --- a/pulsar-flink-connector-shade-2.12/pom.xml +++ b/pulsar-flink-connector-shade-2.12/pom.xml @@ -21,7 +21,7 @@ io.streamnative.connectors pulsar-flink-parent - 2.7.7-SNAPSHOT + 1.13.1.0 pulsar-flink-connector_2.12 diff --git a/pulsar-flink-connector/pom.xml b/pulsar-flink-connector/pom.xml index d065a9a5..e147b6b1 100644 --- a/pulsar-flink-connector/pom.xml +++ b/pulsar-flink-connector/pom.xml @@ -21,7 +21,7 @@ io.streamnative.connectors pulsar-flink-parent - 2.7.7-SNAPSHOT + 1.13.1.0 pulsar-flink-connector-origin diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java b/pulsar-flink-connector/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java index c3e483c4..9fe38e7f 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java @@ -142,7 +142,7 @@ public void addReader(int subtaskId) { } @Override - public PulsarSourceEnumeratorState snapshotState() throws Exception { + public PulsarSourceEnumeratorState snapshotState(long l) throws Exception { return new PulsarSourceEnumeratorState(readerIdToSplitAssignments); } diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarCatalogSupport.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarCatalogSupport.java index 3bdee765..9d7c9dc0 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarCatalogSupport.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarCatalogSupport.java @@ -36,12 +36,13 @@ import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import static org.apache.flink.table.catalog.config.CatalogConfig.FLINK_PROPERTY_PREFIX; +import static org.apache.flink.table.catalog.CatalogPropertiesUtil.FLINK_PROPERTY_PREFIX; /** * catalog support. @@ -87,7 +88,7 @@ public List getTopics(String databaseName) throws PulsarAdminException { return pulsarMetadataReader.getTopics(databaseName); } - public CatalogTableImpl getTableSchema(ObjectPath tablePath, + public CatalogTable getTableSchema(ObjectPath tablePath, Map properties) throws PulsarAdminException, IncompatibleSchemaException { String topicName = objectPath2TopicName(tablePath); @@ -163,7 +164,7 @@ private SchemaInfo tableSchemaToPulsarSchema(String format, TableSchema schema, return SchemaUtils.tableSchemaToSchemaInfo(format, physicalRowDataType, options); } - private CatalogTableImpl schemaToCatalogTable(SchemaInfo pulsarSchema, + private CatalogTable schemaToCatalogTable(SchemaInfo pulsarSchema, ObjectPath tablePath, Map flinkProperties) throws IncompatibleSchemaException { @@ -182,10 +183,20 @@ private CatalogTableImpl schemaToCatalogTable(SchemaInfo pulsarSchema, properties.putAll(flinkProperties); properties.remove(IS_CATALOG_TOPIC); String comment = properties.remove(PulsarCatalogSupport.COMMENT); - return new CatalogTableImpl(tableSchema, partitionKeys, properties, comment); + return CatalogTable.of( + tableSchema.toSchema(), + comment, + partitionKeys, + properties + ); } else { final TableSchema tableSchema = schemaTranslator.pulsarSchemaToTableSchema(pulsarSchema); - return new CatalogTableImpl(tableSchema, flinkProperties, ""); + return CatalogTable.of( + tableSchema.toSchema(), + "", + Collections.emptyList(), + flinkProperties + ); } } diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/util/DataTypeUtils.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/util/DataTypeUtils.java index 3e87dc8a..6d4a48e6 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/util/DataTypeUtils.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/util/DataTypeUtils.java @@ -14,7 +14,6 @@ package org.apache.flink.streaming.connectors.pulsar.util; -import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.types.AtomicDataType; import org.apache.flink.table.types.DataType; @@ -33,8 +32,4 @@ public static Optional> extractType(DataType dataType) { } return Optional.empty(); } - - public static DataType toDataType(Class clazz) { - return DataTypes.RAW(TypeInformationUtils.getTypesAsRow(clazz)); - } } diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/util/serialization/PulsarDeserializationSchema.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/util/serialization/PulsarDeserializationSchema.java index dbcf43c4..3bb049b0 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/util/serialization/PulsarDeserializationSchema.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/util/serialization/PulsarDeserializationSchema.java @@ -55,7 +55,7 @@ public Schema getSchema() { @Override public V deserialize(Message message) throws IOException { - return valueDeserializer.deserialize(message.getData()); + return message.getValue(); } @Override diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/util/serialization/PulsarDeserializationSchemaBuilder.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/util/serialization/PulsarDeserializationSchemaBuilder.java index 46087af2..4bc6eed5 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/util/serialization/PulsarDeserializationSchemaBuilder.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/util/serialization/PulsarDeserializationSchemaBuilder.java @@ -15,8 +15,6 @@ package org.apache.flink.streaming.util.serialization; import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.streaming.connectors.pulsar.util.TypeInformationUtils; -import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.types.DataType; import org.apache.pulsar.client.api.Schema; @@ -60,9 +58,6 @@ public PulsarDeserializationSchemaBuilder setRecordClass(Class recordClass } public PulsarDeserializationSchema build() { - if (dataType == null) { - dataType = DataTypes.RAW(TypeInformationUtils.getTypesAsRow(recordClass)).bridgedTo(recordClass); - } - return new PulsarDeserializationSchemaWrapper<>(valueDeserializer, dataType); + throw new UnsupportedOperationException("PulsarDeserializationSchemaBuilder is deprecated, use PulsarDeserializationSchema#valueOnly."); } } diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/PulsarCatalog.java b/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/PulsarCatalog.java index aac71799..20ad3472 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/PulsarCatalog.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/PulsarCatalog.java @@ -68,6 +68,8 @@ public class PulsarCatalog extends GenericInMemoryCatalog { private PulsarCatalogSupport catalogSupport; + public static final String DEFAULT_DB = "public/default"; + public PulsarCatalog(String adminUrl, String catalogName, Map props, String defaultDatabase) { super(catalogName, defaultDatabase); this.adminUrl = adminUrl; diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/PulsarCatalogDescriptor.java b/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/PulsarCatalogDescriptor.java deleted file mode 100644 index 952e1d0a..00000000 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/PulsarCatalogDescriptor.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.catalog.pulsar; - -import org.apache.flink.table.descriptors.CatalogDescriptor; -import org.apache.flink.table.descriptors.DescriptorProperties; -import org.apache.flink.util.Preconditions; -import org.apache.flink.util.StringUtils; - -import java.util.Map; - -import static org.apache.flink.table.catalog.pulsar.PulsarCatalogValidator.CATALOG_PULSAR_VERSION; -import static org.apache.flink.table.catalog.pulsar.PulsarCatalogValidator.CATALOG_TYPE_VALUE_PULSAR; - -/** - * Pulsar {@CatalogDescriptor}. - */ -public class PulsarCatalogDescriptor extends CatalogDescriptor { - - private String pulsarVersion; - - public PulsarCatalogDescriptor() { - super(CATALOG_TYPE_VALUE_PULSAR, 1, "public/default"); - } - - public PulsarCatalogDescriptor pulsarVersion(String pulsarVersion) { - Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(pulsarVersion)); - this.pulsarVersion = pulsarVersion; - - return this; - } - - @Override - protected Map toCatalogProperties() { - DescriptorProperties props = new DescriptorProperties(); - - if (pulsarVersion != null) { - props.putString(CATALOG_PULSAR_VERSION, pulsarVersion); - } - - return props.asMap(); - } -} diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/PulsarCatalogValidator.java b/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/PulsarCatalogValidator.java deleted file mode 100644 index 577cb3a6..00000000 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/PulsarCatalogValidator.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.catalog.pulsar; - -import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions; -import org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions; -import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.descriptors.CatalogDescriptorValidator; -import org.apache.flink.table.descriptors.DescriptorProperties; -import org.apache.flink.table.descriptors.FormatDescriptorValidator; - -/** - * Pulsar {@CatalogDescriptorValidator}. - */ -public class PulsarCatalogValidator extends CatalogDescriptorValidator { - - public static final String CATALOG_TYPE_VALUE_PULSAR = "pulsar"; - public static final String CATALOG_PULSAR_VERSION = "pulsar-version"; - public static final String CATALOG_SERVICE_URL = - PulsarTableOptions.SERVICE_URL.key(); - public static final String CATALOG_ADMIN_URL = - PulsarTableOptions.ADMIN_URL.key(); - public static final String CATALOG_STARTUP_MODE = - PulsarTableOptions.SCAN_STARTUP_MODE.key(); - public static final String CATALOG_DEFAULT_PARTITIONS = PulsarOptions.DEFAULT_PARTITIONS; - - @Override - public void validate(DescriptorProperties properties) { - super.validate(properties); - properties.validateValue(CATALOG_TYPE, CATALOG_TYPE_VALUE_PULSAR, false); - properties.validateString(CATALOG_PULSAR_VERSION, true, 1); - properties.validateString(CATALOG_SERVICE_URL, false, 1); - properties.validateString(CATALOG_ADMIN_URL, false, 1); - properties.validateInt(CATALOG_DEFAULT_PARTITIONS, true, 1); - properties.validateString(FormatDescriptorValidator.FORMAT, false); - validateStartingOffsets(properties); - } - - private void validateStartingOffsets(DescriptorProperties properties) { - if (properties.containsKey(CATALOG_STARTUP_MODE)) { - String v = properties.getString(CATALOG_STARTUP_MODE); - if (!v.equals("earliest") && !v.equals("latest")) { - throw new ValidationException(CATALOG_STARTUP_MODE + " should be either earliest or latest"); - } - } - } -} diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/descriptors/PulsarCatalogDescriptor.java b/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/descriptors/PulsarCatalogDescriptor.java deleted file mode 100644 index d133c228..00000000 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/descriptors/PulsarCatalogDescriptor.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.catalog.pulsar.descriptors; - -import org.apache.flink.table.descriptors.CatalogDescriptor; -import org.apache.flink.table.descriptors.DescriptorProperties; -import org.apache.flink.util.Preconditions; -import org.apache.flink.util.StringUtils; - -import java.util.Map; - -import static org.apache.flink.table.catalog.pulsar.descriptors.PulsarCatalogValidator.CATALOG_PULSAR_VERSION; -import static org.apache.flink.table.catalog.pulsar.descriptors.PulsarCatalogValidator.CATALOG_TYPE_VALUE_PULSAR; - -/** - * Pulsar {@CatalogDescriptor}. - */ -public class PulsarCatalogDescriptor extends CatalogDescriptor { - - private String pulsarVersion; - - public PulsarCatalogDescriptor() { - super(CATALOG_TYPE_VALUE_PULSAR, 1, "public/default"); - } - - public PulsarCatalogDescriptor pulsarVersion(String pulsarVersion) { - Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(pulsarVersion)); - this.pulsarVersion = pulsarVersion; - - return this; - } - - @Override - protected Map toCatalogProperties() { - DescriptorProperties props = new DescriptorProperties(); - - if (pulsarVersion != null) { - props.putString(CATALOG_PULSAR_VERSION, pulsarVersion); - } - - return props.asMap(); - } -} diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/descriptors/PulsarCatalogValidator.java b/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/descriptors/PulsarCatalogValidator.java deleted file mode 100644 index b1686036..00000000 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/descriptors/PulsarCatalogValidator.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.catalog.pulsar.descriptors; - -import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions; -import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.descriptors.CatalogDescriptorValidator; -import org.apache.flink.table.descriptors.DescriptorProperties; - -/** - * Pulsar {@CatalogDescriptorValidator}. - */ -public class PulsarCatalogValidator extends CatalogDescriptorValidator { - - public static final String CATALOG_TYPE_VALUE_PULSAR = "pulsar"; - public static final String CATALOG_PULSAR_VERSION = "pulsar-version"; - public static final String CATALOG_SERVICE_URL = PulsarOptions.SERVICE_URL_OPTION_KEY; - public static final String CATALOG_ADMIN_URL = PulsarOptions.ADMIN_URL_OPTION_KEY; - public static final String CATALOG_STARTUP_MODE = PulsarOptions.STARTUP_MODE_OPTION_KEY; - public static final String CATALOG_DEFAULT_PARTITIONS = PulsarOptions.DEFAULT_PARTITIONS; - - @Override - public void validate(DescriptorProperties properties) { - super.validate(properties); - properties.validateValue(CATALOG_TYPE, CATALOG_TYPE_VALUE_PULSAR, false); - properties.validateString(CATALOG_PULSAR_VERSION, true, 1); - properties.validateString(CATALOG_SERVICE_URL, false, 1); - properties.validateString(CATALOG_ADMIN_URL, false, 1); - properties.validateInt(CATALOG_DEFAULT_PARTITIONS, true, 1); - validateStartingOffsets(properties); - } - - private void validateStartingOffsets(DescriptorProperties properties) { - if (properties.containsKey(CATALOG_STARTUP_MODE)) { - String v = properties.getString(CATALOG_STARTUP_MODE); - if (!v.equals("earliest") && !v.equals("latest")) { - throw new ValidationException(CATALOG_STARTUP_MODE + " should be either earliest or latest"); - } - } - } -} diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/factories/PulsarCatalogFactory.java b/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/factories/PulsarCatalogFactory.java index 00e5fc8b..83e4500a 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/factories/PulsarCatalogFactory.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/factories/PulsarCatalogFactory.java @@ -14,29 +14,30 @@ package org.apache.flink.table.catalog.pulsar.factories; -import org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions; +import org.apache.flink.configuration.ConfigOption; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.pulsar.PulsarCatalog; -import org.apache.flink.table.catalog.pulsar.PulsarCatalogValidator; -import org.apache.flink.table.descriptors.DescriptorProperties; -import org.apache.flink.table.descriptors.FormatDescriptorValidator; import org.apache.flink.table.factories.CatalogFactory; +import org.apache.flink.table.factories.FactoryUtil; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.HashSet; +import java.util.Set; -import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.PROPERTIES_PREFIX; -import static org.apache.flink.table.catalog.pulsar.PulsarCatalogValidator.CATALOG_ADMIN_URL; -import static org.apache.flink.table.catalog.pulsar.PulsarCatalogValidator.CATALOG_DEFAULT_PARTITIONS; -import static org.apache.flink.table.catalog.pulsar.PulsarCatalogValidator.CATALOG_PULSAR_VERSION; -import static org.apache.flink.table.catalog.pulsar.PulsarCatalogValidator.CATALOG_SERVICE_URL; -import static org.apache.flink.table.catalog.pulsar.PulsarCatalogValidator.CATALOG_STARTUP_MODE; -import static org.apache.flink.table.catalog.pulsar.PulsarCatalogValidator.CATALOG_TYPE_VALUE_PULSAR; -import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE; -import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION; -import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.ADMIN_URL; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.KEY_FIELDS; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.KEY_FIELDS_PREFIX; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.KEY_FORMAT; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.PROPERTIES; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SCAN_STARTUP_MODE; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SERVICE_URL; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SINK_SEMANTIC; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.VALUE_FIELDS_INCLUDE; +import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.VALUE_FORMAT; +import static org.apache.flink.table.catalog.pulsar.factories.PulsarCatalogFactoryOptions.DEFAULT_DATABASE; +import static org.apache.flink.table.catalog.pulsar.factories.PulsarCatalogFactoryOptions.DEFAULT_PARTITIONS; +import static org.apache.flink.table.catalog.pulsar.factories.PulsarCatalogFactoryOptions.IDENTIFIER; +import static org.apache.flink.table.catalog.pulsar.factories.PulsarCatalogFactoryOptions.PULSAR_VERSION; +import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION; /** * Pulsar {@CatalogFactory}. @@ -44,46 +45,46 @@ public class PulsarCatalogFactory implements CatalogFactory { @Override - public Catalog createCatalog(String name, Map properties) { - DescriptorProperties dp = getValidateProperties(properties); - String defaultDB = dp.getOptionalString(CATALOG_DEFAULT_DATABASE).orElse("public/default"); - String adminUrl = dp.getString(CATALOG_ADMIN_URL); - return new PulsarCatalog(adminUrl, name, dp.asMap(), defaultDB); + public String factoryIdentifier() { + return IDENTIFIER; } @Override - public Map requiredContext() { - HashMap context = new HashMap<>(); - context.put(CATALOG_TYPE, CATALOG_TYPE_VALUE_PULSAR); - context.put(CATALOG_PROPERTY_VERSION, "1"); - return context; + public Catalog createCatalog(Context context) { + final FactoryUtil.CatalogFactoryHelper helper = + FactoryUtil.createCatalogFactoryHelper(this, context); + helper.validate(); + return new PulsarCatalog( + helper.getOptions().get(ADMIN_URL), + context.getName(), + context.getOptions(), + helper.getOptions().get(DEFAULT_DATABASE)); } @Override - public List supportedProperties() { - List props = new ArrayList(); - props.add(CATALOG_DEFAULT_DATABASE); - props.add(CATALOG_PULSAR_VERSION); - props.add(CATALOG_SERVICE_URL); - props.add(CATALOG_ADMIN_URL); - props.add(CATALOG_STARTUP_MODE); - props.add(CATALOG_DEFAULT_PARTITIONS); - props.add(PulsarTableOptions.KEY_FORMAT.key()); - props.add(PulsarTableOptions.KEY_FIELDS.key()); - props.add(PulsarTableOptions.KEY_FIELDS_PREFIX.key()); - props.add(PulsarTableOptions.VALUE_FORMAT.key()); - props.add(PulsarTableOptions.VALUE_FIELDS_INCLUDE.key()); - props.add(PulsarTableOptions.SINK_SEMANTIC.key()); - props.add(FormatDescriptorValidator.FORMAT); - props.add(FormatDescriptorValidator.FORMAT + ".*"); - props.add(PROPERTIES_PREFIX + "*"); - return props; + public Set> requiredOptions() { + final Set> options = new HashSet<>(); + options.add(ADMIN_URL); + options.add(SERVICE_URL); + return options; } - private DescriptorProperties getValidateProperties(Map properties) { - DescriptorProperties dp = new DescriptorProperties(); - dp.putProperties(properties); - new PulsarCatalogValidator().validate(dp); - return dp; + @Override + public Set> optionalOptions() { + Set> props = new HashSet<>(); + props.add(DEFAULT_DATABASE); + props.add(PROPERTY_VERSION); + props.add(SCAN_STARTUP_MODE); + props.add(DEFAULT_PARTITIONS); + props.add(KEY_FORMAT); + props.add(KEY_FIELDS); + props.add(KEY_FIELDS_PREFIX); + props.add(VALUE_FORMAT); + props.add(VALUE_FIELDS_INCLUDE); + props.add(SINK_SEMANTIC); + props.add(PULSAR_VERSION); + props.add(FactoryUtil.FORMAT); + props.add(PROPERTIES); + return props; } } diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/factories/PulsarCatalogFactoryOptions.java b/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/factories/PulsarCatalogFactoryOptions.java new file mode 100644 index 00000000..ad5d1aa3 --- /dev/null +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/table/catalog/pulsar/factories/PulsarCatalogFactoryOptions.java @@ -0,0 +1,50 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.pulsar.factories; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.catalog.pulsar.PulsarCatalog; + +import org.apache.pulsar.PulsarVersion; + +/** + * {@link ConfigOption}s for {@link PulsarCatalog}. + */ +@Internal +public final class PulsarCatalogFactoryOptions { + + public static final String IDENTIFIER = "pulsar"; + + public static final ConfigOption DEFAULT_DATABASE = + ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY) + .stringType() + .defaultValue(PulsarCatalog.DEFAULT_DB); + + public static final ConfigOption DEFAULT_PARTITIONS = + ConfigOptions.key("table-default-partitions") + .intType() + .defaultValue(5); + + public static final ConfigOption PULSAR_VERSION = + ConfigOptions.key("pulsar-version") + .stringType() + .defaultValue(PulsarVersion.getVersion()); + + private PulsarCatalogFactoryOptions() { + } +} diff --git a/pulsar-flink-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/pulsar-flink-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index b187e544..a2f93b8a 100644 --- a/pulsar-flink-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/pulsar-flink-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -14,4 +14,5 @@ org.apache.flink.streaming.connectors.pulsar.table.PulsarDynamicTableFactory org.apache.flink.streaming.connectors.pulsar.table.UpsertPulsarDynamicTableFactory +org.apache.flink.table.catalog.pulsar.factories.PulsarCatalogFactory org.apache.flink.formats.atomic.AtomicRowDataFormatFactory \ No newline at end of file diff --git a/pulsar-flink-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/pulsar-flink-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory deleted file mode 100644 index abba4aac..00000000 --- a/pulsar-flink-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory +++ /dev/null @@ -1,15 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -org.apache.flink.table.catalog.pulsar.factories.PulsarCatalogFactory diff --git a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/CatalogITest.java b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/CatalogITest.java index 22e0f889..a8f269f0 100644 --- a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/CatalogITest.java +++ b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/CatalogITest.java @@ -15,8 +15,9 @@ package org.apache.flink.streaming.connectors.pulsar; import org.apache.flink.client.cli.DefaultCLI; -import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader; +import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.pulsar.testutils.EnvironmentFileUtil; import org.apache.flink.streaming.connectors.pulsar.testutils.FailingIdentityMapper; @@ -28,13 +29,13 @@ import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.pulsar.PulsarCatalog; import org.apache.flink.table.client.config.Environment; -import org.apache.flink.table.client.gateway.SessionContext; -import org.apache.flink.table.client.gateway.local.ExecutionContext; +import org.apache.flink.table.client.gateway.context.DefaultContext; +import org.apache.flink.table.client.gateway.context.ExecutionContext; +import org.apache.flink.table.client.gateway.context.SessionContext; import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; import org.apache.flink.shaded.guava18.com.google.common.collect.Sets; -import org.apache.commons.cli.Options; import org.apache.commons.io.IOUtils; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -83,10 +84,13 @@ public class CatalogITest extends PulsarTestBaseWithFlink { private static final String CATALOGS_ENVIRONMENT_FILE = "test-sql-client-pulsar-catalog.yaml"; private static final String CATALOGS_ENVIRONMENT_FILE_START = "test-sql-client-pulsar-start-catalog.yaml"; + private static ClusterClient clusterClient; + @Before public void clearStates() { SingletonStreamSink.clear(); FailingIdentityMapper.failedBefore = false; + clusterClient = flink.getClusterClient(); } @Test(timeout = 40 * 1000L) @@ -222,6 +226,7 @@ public void run() { Thread.sleep(2000); SingletonStreamSink.compareWithList( INTEGER_LIST.subList(0, INTEGER_LIST.size() - 1).stream().map(Objects::toString) + .map(s -> "+I[" + s + "]") .collect(Collectors.toList())); } @@ -265,6 +270,7 @@ public void run() { SingletonStreamSink.compareWithList( INTEGER_LIST.subList(0, INTEGER_LIST.size() - 1).stream().map(Objects::toString) + .map(s -> "+I[" + s + "]") .collect(Collectors.toList())); } @@ -312,6 +318,8 @@ public void testAvroTableSink() throws Exception { ExecutionContext context = createExecutionContext(CATALOGS_ENVIRONMENT_FILE_START, conf); TableEnvironment tableEnv = context.getTableEnvironment(); + tableEnv.getConfig() + .addConfiguration(new Configuration().set(CoreOptions.DEFAULT_PARALLELISM, 1)); tableEnv.useCatalog(pulsarCatalog1); @@ -382,7 +390,6 @@ public void testJsonTableSink() throws Exception { ExecutionContext context = createExecutionContext(CATALOGS_ENVIRONMENT_FILE_START, conf); TableEnvironment tableEnv = context.getTableEnvironment(); - tableEnv.useCatalog(pulsarCatalog1); String sinkDDL = "create table " + tableSinkName + "(\n" + @@ -396,7 +403,8 @@ public void testJsonTableSink() throws Exception { " ('oid3', 30, 'cid3'),\n" + " ('oid4', 10, 'cid4')"; - tableEnv.executeSql(sinkDDL).print(); + tableEnv.executeSql(sinkDDL).await(10, TimeUnit.SECONDS); + tableEnv.executeSql(insertQ); List result = consumeMessage(tableSinkName, Schema.AUTO_CONSUME(), 4, 10); @@ -404,6 +412,30 @@ public void testJsonTableSink() throws Exception { assertEquals(4, result.size()); } + @Test(timeout = 40 * 10000L) + public void testCreateTopic() throws Exception { + + String tableSinkTopic = newTopic("tableSink"); + String tableSinkName = TopicName.get(tableSinkTopic).getLocalName(); + String pulsarCatalog1 = "pulsarcatalog3"; + + Map conf = getStreamingConfs(); + conf.put("$VAR_STARTING", "earliest"); + conf.put("$VAR_FORMAT", "json"); + + ExecutionContext context = createExecutionContext(CATALOGS_ENVIRONMENT_FILE_START, conf); + TableEnvironment tableEnv = context.getTableEnvironment(); + tableEnv.useCatalog(pulsarCatalog1); + + String sinkDDL = "create table " + tableSinkName + "(\n" + + " oid STRING,\n" + + " totalprice INT,\n" + + " customerid STRING\n" + + ")"; + tableEnv.executeSql(sinkDDL).await(10, TimeUnit.SECONDS); + assertTrue(Arrays.asList(tableEnv.listTables()).contains(tableSinkName)); + } + @NotNull private List consumeMessage(String topic, Schema schema, int count, int timeout) throws InterruptedException, ExecutionException, TimeoutException { @@ -466,15 +498,15 @@ private ExecutionContext createExecutionContext(String file, Map final Environment env = EnvironmentFileUtil.parseModified( file, replaceVars); - final Configuration flinkConfig = new Configuration(); - return ExecutionContext.builder( - env, - new SessionContext("test-session", new Environment()), - Collections.emptyList(), - flinkConfig, - new DefaultClusterClientServiceLoader(), - new Options(), - Collections.singletonList(new DefaultCLI())).build(); + + DefaultContext defaultContext = + new DefaultContext( + env, + new ArrayList<>(), + clusterClient.getFlinkConfiguration(), + Collections.singletonList(new DefaultCLI())); + SessionContext sessionContext = SessionContext.create(defaultContext, "test-session"); + return sessionContext.getExecutionContext(); } private Map getStreamingConfs() { diff --git a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarITest.java b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarITest.java index a3070384..a12eaf55 100644 --- a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarITest.java +++ b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarITest.java @@ -26,10 +26,10 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.formats.atomic.AtomicRowDataDeserializationSchema; +import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.formats.json.JsonOptions; import org.apache.flink.formats.json.JsonRowDataDeserializationSchema; import org.apache.flink.formats.json.JsonRowDataSerializationSchema; -import org.apache.flink.formats.json.TimestampFormat; import org.apache.flink.runtime.client.JobCancellationException; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.streaming.api.datastream.DataStream; @@ -1090,7 +1090,7 @@ private void produceIntoPulsar(DataStream stream, DataType dt, Properti public static SerializationSchema getJsonSerializationSchema(RowType rowType) { return new JsonRowDataSerializationSchema(rowType, TimestampFormat.ISO_8601, JsonOptions.MapNullKeyMode.DROP, - ""); + "" , true); } private class AssertSink extends FlinkPulsarSink { diff --git a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarTableITest.java b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarTableITest.java index ae90ecb0..4783286c 100644 --- a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarTableITest.java +++ b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarTableITest.java @@ -28,6 +28,7 @@ import org.apache.flink.streaming.connectors.pulsar.testutils.FailingIdentityMapper; import org.apache.flink.streaming.connectors.pulsar.testutils.PulsarTableTestUtils; import org.apache.flink.streaming.connectors.pulsar.testutils.SingletonStreamSink; +import org.apache.flink.streaming.connectors.pulsar.testutils.TestUtils; import org.apache.flink.streaming.util.serialization.PulsarSerializationSchemaWrapper; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableColumn; @@ -36,7 +37,6 @@ import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.data.RowData; import org.apache.flink.test.util.SuccessException; -import org.apache.flink.test.util.TestUtils; import org.apache.flink.types.Row; import com.google.protobuf.ByteString; @@ -116,18 +116,12 @@ public void testBasicFunctioning() throws Exception { tEnv.executeSql(createTableSql(tableName, table, tSchema, "atomic")).print(); - Table t = tEnv.scan(tableName).select("value"); - - tEnv.toAppendStream(t, t.getSchema().toRowType()) + Table t = tEnv.sqlQuery("select `value` from " + tableName); + tEnv.toDataStream(t, Boolean.class) .map(new FailingIdentityMapper<>(BOOLEAN_LIST.size())) .addSink(new SingletonStreamSink.StringSink<>()).setParallelism(1); - try { - see.execute("basic functionality"); - } catch (Exception e) { - - } - + TestUtils.tryExecute(see, "basic functionality"); SingletonStreamSink.compareWithList( BOOLEAN_LIST.subList(0, BOOLEAN_LIST.size() - 1).stream().map(Objects::toString) .collect(Collectors.toList())); @@ -162,15 +156,11 @@ public void testWriteThenRead() throws Exception { tEnv.executeSql(createTableSql(tableName, tp, tSchema, "json")).print(); Table t = tEnv.sqlQuery("select i, f, bar from " + tableName); - tEnv.toAppendStream(t, t.getSchema().toRowType()) - .map(new FailingIdentityMapper(fooList.size())) + tEnv.toDataStream(t, SchemaData.Foo.class) + .map(new FailingIdentityMapper<>(fooList.size())) .addSink(new SingletonStreamSink.StringSink<>()).setParallelism(1); - try { - env.execute("count elements from topics"); - } catch (Exception e) { - - } + TestUtils.tryExecute(env, "count elements from topics"); SingletonStreamSink.compareWithList( fooList.subList(0, fooList.size() - 1).stream().map(Objects::toString).collect(Collectors.toList())); } @@ -188,12 +178,12 @@ public void testStructTypesInJson() throws Exception { TableSchema tSchema = getTableSchema(table); tEnv.executeSql(createTableSql(tableName, table, tSchema, "json")).print(); - Table t = tEnv.scan(tableName).select("i, f, bar"); - tEnv.toAppendStream(t, t.getSchema().toRowType()) - .map(new FailingIdentityMapper(fooList.size())) + Table t = tEnv.sqlQuery("select i, f, bar from " + tableName); + tEnv.toDataStream(t, SchemaData.Foo.class) + .map(new FailingIdentityMapper<>(fooList.size())) .addSink(new SingletonStreamSink.StringSink<>()).setParallelism(1); - TestUtils.tryExecute(see, "test struct in avro"); + TestUtils.tryExecute(see, "test struct in json"); SingletonStreamSink.compareWithList( fooList.subList(0, fooList.size() - 1).stream().map(Objects::toString).collect(Collectors.toList())); } @@ -211,16 +201,11 @@ public void testStructTypesWithJavaList() throws Exception { TableSchema tSchema = getTableSchema(table); tEnv.executeSql(createTableSql(tableName, table, tSchema, "json")).print(); - Table t = tEnv.scan(tableName).select("l"); - tEnv.toAppendStream(t, t.getSchema().toRowType()) - .map(new FailingIdentityMapper(flList.size())) + Table t = tEnv.sqlQuery("select l from " + tableName); + tEnv.toDataStream(t, SchemaData.FL.class) + .map(new FailingIdentityMapper<>(flList.size())) .addSink(new SingletonStreamSink.StringSink<>()).setParallelism(1); - - try { - see.execute("test struct in avro"); - } catch (Exception e) { - log.error("", e); - } + TestUtils.tryExecute(see, "test struct in json"); SingletonStreamSink.compareWithList( flList.subList(0, flList.size() - 1).stream().map(Objects::toString).collect(Collectors.toList())); } @@ -250,16 +235,13 @@ public void testStructTypesWithJavaArray() throws Exception { tEnv.executeSql(createTableSql(tableName, table, tSchema, "json")).print(); - Table t = tEnv.scan(tableName).select("l"); - tEnv.toAppendStream(t, t.getSchema().toRowType()) - .map(new FailingIdentityMapper(faList.size())) + Table t = tEnv.sqlQuery("select l from " + tableName); + tEnv.toDataStream(t, SchemaData.FA.class) + .map(new FailingIdentityMapper<>(faList.size())) .addSink(new SingletonStreamSink.StringSink<>()).setParallelism(1); - try { - see.execute("test struct in avro"); - } catch (Exception e) { + TestUtils.tryExecute(see, "test struct in avro"); - } SingletonStreamSink.compareWithList( faList.subList(0, faList.size() - 1).stream().map(Objects::toString).collect(Collectors.toList())); } @@ -278,17 +260,13 @@ public void testStructTypesWithJavaMap() throws Exception { tEnv.executeSql(createTableSql(tableName, table, tSchema, "json")).print(); - Table t = tEnv.scan(tableName).select("m"); - - tEnv.toAppendStream(t, t.getSchema().toRowType()) - .map(new FailingIdentityMapper(faList.size())) + Table t = tEnv.sqlQuery("select m from " + tableName); + tEnv.toDataStream(t, SchemaData.FM.class) + .map(new FailingIdentityMapper<>(faList.size())) .addSink(new SingletonStreamSink.StringSink<>()).setParallelism(1); - try { - see.execute("test struct in avro"); - } catch (Exception e) { + TestUtils.tryExecute(see, "test struct in avro"); - } SingletonStreamSink.compareWithList( fmList.subList(0, fmList.size() - 1).stream().map(Objects::toString).collect(Collectors.toList())); } @@ -304,7 +282,7 @@ public void testSimpleSQLWork() throws Exception { Assert.fail(); } catch (ValidationException e) { // success - } catch (Exception e){ + } catch (Exception e) { log.error("test fail", e); Assert.fail(e.getMessage()); } @@ -322,7 +300,7 @@ public void testProtobufSQLWork() throws Exception { Map map = new HashMap<>(); map.put("protobuf.message-class-name", SimpleTest.class.getCanonicalName()); String extendParamStr = ""; - if (map != null && !map.isEmpty()){ + if (map != null && !map.isEmpty()) { extendParamStr = map.entrySet().stream() .map(e -> String.format("'%s' = '%s'", e.getKey(), e.getValue())) .collect(Collectors.joining(",\n")); @@ -394,7 +372,7 @@ public void testSimpleSQL(String format, Map extend) throws Exce String topic = newTopic(); final String createTable; String extendParamStr = ""; - if (extend != null && !extend.isEmpty()){ + if (extend != null && !extend.isEmpty()) { extendParamStr = extend.entrySet().stream() .map(e -> String.format("'%s' = '%s'", e.getKey(), e.getValue())) .collect(Collectors.joining(",\n")); diff --git a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarTestBase.java b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarTestBase.java index 4bc45c0a..8296733c 100644 --- a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarTestBase.java +++ b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarTestBase.java @@ -14,15 +14,12 @@ package org.apache.flink.streaming.connectors.pulsar; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.connector.pulsar.source.BrokerPartition; import org.apache.flink.connector.pulsar.source.StartOffsetInitializer; import org.apache.flink.connector.pulsar.source.StopCondition; import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; import org.apache.flink.connector.pulsar.source.util.PulsarAdminUtils; -import org.apache.flink.metrics.jmx.JMXReporter; import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions; import org.apache.flink.streaming.connectors.pulsar.internal.TopicRange; import org.apache.flink.streaming.util.TestStreamEnvironment; @@ -161,15 +158,6 @@ public static void shutDownServices() throws Exception { log.info("-------------------------------------------------------------------------"); } - protected static Configuration getFlinkConfiguration() { - Configuration flinkConfig = new Configuration(); - - flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), "16m"); - flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + - ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName()); - return flinkConfig; - } - public static List sendTypedMessages( String topic, SchemaType type, diff --git a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarTestBaseWithFlink.java b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarTestBaseWithFlink.java index f107f860..d42f6356 100644 --- a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarTestBaseWithFlink.java +++ b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarTestBaseWithFlink.java @@ -16,6 +16,11 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.configuration.WebOptions; +import org.apache.flink.metrics.jmx.JMXReporter; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.table.api.TableColumn; @@ -57,6 +62,18 @@ public void noJobIsRunning() throws Exception { waitUntilNoJobIsRunning(client); } + protected static Configuration getFlinkConfiguration() { + Configuration flinkConfig = new Configuration(); + + flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), "16m"); + flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName()); + flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); + flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, TM_SLOTS); + flinkConfig.setBoolean(WebOptions.SUBMIT_ENABLE, false); + return flinkConfig; + } + public static void waitUntilJobIsRunning(ClusterClient client) throws Exception { while (getRunningJobs(client).isEmpty()) { Thread.sleep(50); diff --git a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/RowDataDerSerializationSchemaTest.java b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/RowDataDerSerializationSchemaTest.java index 6e6b0d8a..227f4ab0 100644 --- a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/RowDataDerSerializationSchemaTest.java +++ b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/RowDataDerSerializationSchemaTest.java @@ -20,10 +20,10 @@ import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema; import org.apache.flink.formats.avro.AvroRowDataSerializationSchema; import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter; +import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.formats.json.JsonOptions; import org.apache.flink.formats.json.JsonRowDataDeserializationSchema; import org.apache.flink.formats.json.JsonRowDataSerializationSchema; -import org.apache.flink.formats.json.TimestampFormat; import org.apache.flink.formats.protobuf.PbRowTypeInformation; import org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema; import org.apache.flink.formats.protobuf.serialize.PbRowDataSerializationSchema; @@ -157,7 +157,7 @@ public void testJsonSerializeDeserialize() throws Exception { JsonRowDataSerializationSchema serializationSchema = new JsonRowDataSerializationSchema(rowType, TimestampFormat.ISO_8601, JsonOptions.MapNullKeyMode.DROP, - ""); + "", true); serializationSchema.open(null); JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(rowType, typeInfo, false, false, TimestampFormat.ISO_8601); diff --git a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/SchemaITest.java b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/SchemaITest.java index 8ecf8749..16e1f784 100644 --- a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/SchemaITest.java +++ b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/SchemaITest.java @@ -29,8 +29,8 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.types.DataType; import org.apache.flink.test.util.TestUtils; +import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; -import org.apache.flink.util.StringUtils; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.schema.SchemaType; @@ -77,7 +77,7 @@ public void testBooleanRead() throws Exception { @Test(timeout = 100 * 1000L) public void testBooleanWrite() throws Exception { - checkWrite(SchemaType.BOOLEAN, DataTypes.BOOLEAN(), BOOLEAN_LIST, null, null); + checkWrite(SchemaType.BOOLEAN, DataTypes.BOOLEAN(), BOOLEAN_LIST, obj -> Row.of(obj).toString(), null); } @Test(timeout = 100 * 1000L) @@ -87,7 +87,7 @@ public void testINT32Read() throws Exception { @Test(timeout = 100 * 1000L) public void testINT32Write() throws Exception { - checkWrite(SchemaType.INT32, DataTypes.INT(), INTEGER_LIST, null, null); + checkWrite(SchemaType.INT32, DataTypes.INT(), INTEGER_LIST, obj -> Row.of(obj).toString(), null); } @Test(timeout = 100 * 1000L) @@ -97,7 +97,7 @@ public void testINT64Read() throws Exception { @Test(timeout = 100 * 1000L) public void testINT64Write() throws Exception { - checkWrite(SchemaType.INT64, DataTypes.BIGINT(), INT_64_LIST, null, null); + checkWrite(SchemaType.INT64, DataTypes.BIGINT(), INT_64_LIST, obj -> Row.of(obj).toString(), null); } @Test(timeout = 100 * 1000L) @@ -107,7 +107,8 @@ public void testStringRead() throws Exception { @Test(timeout = 100 * 1000L) public void testStringWrite() throws Exception { - checkWrite(SchemaType.STRING, DataTypes.STRING(), STRING_LIST, null, null); + + checkWrite(SchemaType.STRING, DataTypes.STRING(), STRING_LIST, obj -> Row.of(obj).toString(), null); } @Test(timeout = 100 * 1000L) @@ -117,7 +118,7 @@ public void testByteRead() throws Exception { @Test public void testByteWrite() throws Exception { - checkWrite(SchemaType.INT8, DataTypes.TINYINT(), INT_8_LIST, null, null); + checkWrite(SchemaType.INT8, DataTypes.TINYINT(), INT_8_LIST, obj -> Row.of(obj).toString(), null); } @Test(timeout = 100 * 1000L) @@ -127,7 +128,7 @@ public void testShortRead() throws Exception { @Test(timeout = 100 * 1000L) public void testShortWrite() throws Exception { - checkWrite(SchemaType.INT16, DataTypes.SMALLINT(), INT_16_LIST, null, null); + checkWrite(SchemaType.INT16, DataTypes.SMALLINT(), INT_16_LIST, obj -> Row.of(obj).toString(), null); } @Test(timeout = 100 * 1000L) @@ -137,7 +138,7 @@ public void testFloatRead() throws Exception { @Test(timeout = 100 * 1000L) public void testFloatWrite() throws Exception { - checkWrite(SchemaType.FLOAT, DataTypes.FLOAT(), FLOAT_LIST, null, null); + checkWrite(SchemaType.FLOAT, DataTypes.FLOAT(), FLOAT_LIST, obj -> Row.of(obj).toString(), null); } @Test(timeout = 100 * 1000L) @@ -147,7 +148,7 @@ public void testDoubleRead() throws Exception { @Test(timeout = 100 * 1000L) public void testDoubleWrite() throws Exception { - checkWrite(SchemaType.DOUBLE, DataTypes.DOUBLE(), DOUBLE_LIST, null, null); + checkWrite(SchemaType.DOUBLE, DataTypes.DOUBLE(), DOUBLE_LIST, obj -> Row.of(obj).toString(), null); } @Test(timeout = 100 * 1000L) @@ -160,7 +161,7 @@ public void testDateRead() throws Exception { public void testDateWrite() throws Exception { checkWrite(SchemaType.LOCAL_DATE, DataTypes.DATE(), - localDateList, null, null); + localDateList, obj -> Row.of(obj).toString(), null); } @Test(timeout = 100 * 1000L) @@ -172,7 +173,7 @@ public void testTimestampRead() throws Exception { @Test(timeout = 100 * 1000L) public void testTimestampWrite() throws Exception { checkWrite(SchemaType.LOCAL_DATE_TIME, - DataTypes.TIMESTAMP(3), localDateTimeList, null, null); + DataTypes.TIMESTAMP(3), localDateTimeList, obj -> Row.of(obj).toString(), null); } @Test(timeout = 100 * 1000L) @@ -182,7 +183,7 @@ public void testByteArrayRead() throws Exception { @Test(timeout = 100 * 1000L) public void testByteArrayWrite() throws Exception { - checkWrite(SchemaType.BYTES, DataTypes.BYTES(), BYTES_LIST, t -> StringUtils.arrayAwareToString(t), null); + checkWrite(SchemaType.BYTES, DataTypes.BYTES(), BYTES_LIST, obj -> Row.of(obj).toString(), null); } private void checkRead(SchemaType type, DataType dt, List datas, Function toStr, Class tClass) diff --git a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarDynamicTableFactoryTest.java b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarDynamicTableFactoryTest.java index f3c8e374..7e245220 100644 --- a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarDynamicTableFactoryTest.java +++ b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarDynamicTableFactoryTest.java @@ -16,7 +16,6 @@ import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; -import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSink; @@ -24,12 +23,11 @@ import org.apache.flink.streaming.connectors.pulsar.config.StartupMode; import org.apache.flink.streaming.connectors.pulsar.util.KeyHashMessageRouterImpl; import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.api.TableColumn; -import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.catalog.CatalogTable; -import org.apache.flink.table.catalog.CatalogTableImpl; -import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.UniqueConstraint; +import org.apache.flink.table.catalog.WatermarkSpec; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.format.DecodingFormat; import org.apache.flink.table.connector.format.EncodingFormat; @@ -39,7 +37,7 @@ import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.connector.source.SourceFunctionProvider; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.expressions.utils.ResolvedExpressionMock; import org.apache.flink.table.factories.TestFormatFactory; import org.apache.flink.table.factories.TestFormatFactory.DecodingFormatMock; import org.apache.flink.table.factories.TestFormatFactory.EncodingFormatMock; @@ -68,9 +66,12 @@ import java.util.function.Consumer; import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; +import static org.apache.flink.table.descriptors.DescriptorProperties.METADATA; +import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink; +import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource; import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -115,20 +116,33 @@ public class PulsarDynamicTableFactoryTest extends TestLogger { private static final String PROPS_SCAN_OFFSETS = MessageId.earliest.toString(); - private static final TableSchema SCHEMA = TableSchema.builder() - .add(TableColumn.physical(NAME, DataTypes.STRING())) - .add(TableColumn.physical(COUNT, DataTypes.DECIMAL(38, 18))) - .add(TableColumn.physical(EVENT_TIME, DataTypes.TIMESTAMP(3))) - .add(TableColumn.computed(COMPUTED_COLUMN_NAME, COMPUTED_COLUMN_DATATYPE, COMPUTED_COLUMN_EXPRESSION)) - .watermark(EVENT_TIME, WATERMARK_EXPRESSION, WATERMARK_DATATYPE) - .build(); - - private static final TableSchema SCHEMA_WITH_METADATA = TableSchema.builder() - .add(TableColumn.physical(NAME, DataTypes.STRING())) - .add(TableColumn.physical(COUNT, DataTypes.DECIMAL(38, 18))) - .add(TableColumn.metadata(EVENT_TIME, DataTypes.TIMESTAMP(3), "eventTime")) - .add(TableColumn.metadata(METADATA_TOPIC, DataTypes.STRING(), "value.metadata_2")) - .build(); + private static final ResolvedSchema SCHEMA = + new ResolvedSchema( + Arrays.asList( + Column.physical(NAME, DataTypes.STRING().notNull()), + Column.physical(COUNT, DataTypes.DECIMAL(38, 18)), + Column.physical(EVENT_TIME, DataTypes.TIMESTAMP(3)), + Column.computed( + COMPUTED_COLUMN_NAME, + ResolvedExpressionMock.of( + COMPUTED_COLUMN_DATATYPE, COMPUTED_COLUMN_EXPRESSION))), + Collections.singletonList( + WatermarkSpec.of( + EVENT_TIME, + ResolvedExpressionMock.of( + WATERMARK_DATATYPE, WATERMARK_EXPRESSION))), + null); + + private static final ResolvedSchema SCHEMA_WITH_METADATA = + new ResolvedSchema( + Arrays.asList( + Column.physical(NAME, DataTypes.STRING()), + Column.physical(COUNT, DataTypes.DECIMAL(38, 18)), + Column.metadata(EVENT_TIME, DataTypes.TIMESTAMP(3), "eventTime", false), + Column.metadata( + METADATA, DataTypes.STRING(), "value.metadata_2", false)), + Collections.emptyList(), + null); private static final DataType SCHEMA_DATA_TYPE = SCHEMA.toPhysicalRowDataType(); @@ -248,7 +262,7 @@ public void testTableSourceWithKeyValueAndMetadata() { final PulsarDynamicTableSource actualPulsarSource = (PulsarDynamicTableSource) actualSource; // initialize stateful testing formats actualPulsarSource.applyReadableMetadata(Arrays.asList("eventTime", "value.metadata_2"), - SCHEMA_WITH_METADATA.toRowDataType()); + SCHEMA_WITH_METADATA.toSourceRowDataType()); actualPulsarSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); final DecodingFormatMock expectedKeyFormat = new DecodingFormatMock( @@ -293,7 +307,7 @@ public void testTableSourceWithKeyValueAndMetadata() { PULSAR_SOURCE_PROPERTIES, startupOptions ); - expectedPulsarSource.producedDataType = SCHEMA_WITH_METADATA.toRowDataType(); + expectedPulsarSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType(); expectedPulsarSource.metadataKeys = Collections.singletonList("eventTime"); assertEquals(actualSource, expectedPulsarSource); @@ -457,14 +471,11 @@ public void testSinkWithTopicListOrTopicPattern() { @Test public void testPrimaryKeyValidation() { - final TableSchema pkSchema = TableSchema.builder() - .field(NAME, DataTypes.STRING().notNull()) - .field(COUNT, DataTypes.DECIMAL(38, 18)) - .field(EVENT_TIME, DataTypes.TIMESTAMP(3)) - .field(COMPUTED_COLUMN_NAME, COMPUTED_COLUMN_DATATYPE, COMPUTED_COLUMN_EXPRESSION) - .watermark(EVENT_TIME, WATERMARK_EXPRESSION, WATERMARK_DATATYPE) - .primaryKey(NAME) - .build(); + final ResolvedSchema pkSchema = + new ResolvedSchema( + SCHEMA.getColumns(), + SCHEMA.getWatermarkSpecs(), + UniqueConstraint.primaryKey(NAME, Collections.singletonList(NAME))); Map options1 = getModifiedOptions( getBasicSourceOptions(), @@ -483,7 +494,7 @@ public void testPrimaryKeyValidation() { createTableSink(pkSchema, getBasicSinkOptions()); fail(); } catch (Throwable t) { - String error = "The Pulsar table 'default.default.sinkTable' with 'test-format' format" + + String error = "The Pulsar table 'default.default.t1' with 'test-format' format" + " doesn't support defining PRIMARY KEY constraint on the table, because it can't" + " guarantee the semantic of primary key."; assertEquals(error, t.getCause().getMessage()); @@ -493,7 +504,7 @@ public void testPrimaryKeyValidation() { createTableSource(pkSchema, getBasicSinkOptions()); fail(); } catch (Throwable t) { - String error = "The Pulsar table 'default.default.scanTable' with 'test-format' format" + + String error = "The Pulsar table 'default.default.t1' with 'test-format' format" + " doesn't support defining PRIMARY KEY constraint on the table, because it can't" + " guarantee the semantic of primary key."; assertEquals(error, t.getCause().getMessage()); @@ -563,36 +574,6 @@ private static PulsarDynamicTableSink createExpectedSink( KeyHashMessageRouterImpl.INSTANCE); } - private static DynamicTableSource createTableSource(TableSchema schema, Map options) { - final ObjectIdentifier objectIdentifier = ObjectIdentifier.of( - "default", - "default", - "scanTable"); - final CatalogTable catalogTable = new CatalogTableImpl(schema, options, "scanTable"); - return FactoryUtil.createTableSource( - null, - objectIdentifier, - catalogTable, - new Configuration(), - Thread.currentThread().getContextClassLoader(), - false); - } - - private static DynamicTableSink createTableSink(TableSchema schema, Map options) { - final ObjectIdentifier objectIdentifier = ObjectIdentifier.of( - "default", - "default", - "sinkTable"); - final CatalogTable catalogTable = new CatalogTableImpl(schema, options, "sinkTable"); - return FactoryUtil.createTableSink( - null, - objectIdentifier, - catalogTable, - new Configuration(), - Thread.currentThread().getContextClassLoader(), - false); - } - /** * Returns the full options modified by the given consumer {@code optionModifier}. * diff --git a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/table/UpsertPulsarDynamicTableFactoryTest.java b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/table/UpsertPulsarDynamicTableFactoryTest.java index 6864e904..e76213f2 100644 --- a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/table/UpsertPulsarDynamicTableFactoryTest.java +++ b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/table/UpsertPulsarDynamicTableFactoryTest.java @@ -16,7 +16,6 @@ import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; -import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSink; @@ -24,11 +23,10 @@ import org.apache.flink.streaming.connectors.pulsar.config.StartupMode; import org.apache.flink.streaming.connectors.pulsar.util.KeyHashMessageRouterImpl; import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.catalog.CatalogTable; -import org.apache.flink.table.catalog.CatalogTableImpl; -import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.UniqueConstraint; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.format.DecodingFormat; import org.apache.flink.table.connector.format.EncodingFormat; @@ -38,7 +36,6 @@ import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.connector.source.SourceFunctionProvider; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.factories.TestFormatFactory; import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext; import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext; @@ -52,6 +49,7 @@ import org.junit.Test; import org.junit.rules.ExpectedException; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -59,9 +57,11 @@ import java.util.function.Consumer; import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; +import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink; +import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource; import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; /** @@ -77,22 +77,28 @@ public class UpsertPulsarDynamicTableFactoryTest extends TestLogger { private static final String ADMIN_URL = "http://127.0.0.1:8080"; - private static final TableSchema SOURCE_SCHEMA = TableSchema.builder() - .field("window_start", DataTypes.STRING().notNull()) - .field("region", DataTypes.STRING().notNull()) - .field("view_count", DataTypes.BIGINT()) - .primaryKey("window_start", "region") - .build(); + private static final ResolvedSchema SOURCE_SCHEMA = + new ResolvedSchema( + Arrays.asList( + Column.physical("window_start", DataTypes.STRING().notNull()), + Column.physical("region", DataTypes.STRING().notNull()), + Column.physical("view_count", DataTypes.BIGINT())), + Collections.emptyList(), + UniqueConstraint.primaryKey("name", Arrays.asList("window_start", "region"))); + private static final int[] SOURCE_KEY_FIELDS = new int[]{0, 1}; private static final int[] SOURCE_VALUE_FIELDS = new int[]{0, 1, 2}; - private static final TableSchema SINK_SCHEMA = TableSchema.builder() - .field("region", new AtomicDataType(new VarCharType(false, 100))) - .field("view_count", DataTypes.BIGINT()) - .primaryKey("region") - .build(); + private static final ResolvedSchema SINK_SCHEMA = + new ResolvedSchema( + Arrays.asList( + Column.physical( + "region", new AtomicDataType(new VarCharType(false, 100))), + Column.physical("view_count", DataTypes.BIGINT())), + Collections.emptyList(), + UniqueConstraint.primaryKey("name", Collections.singletonList("region"))); private static final int[] SINK_KEY_FIELDS = new int[]{0}; @@ -109,23 +115,28 @@ public class UpsertPulsarDynamicTableFactoryTest extends TestLogger { // UPSERT_PULSAR_SINK_PROPERTIES.setProperty("service-url", SERVICE_URL); } + protected static DecodingFormat> keyDecodingFormat = + new TestFormatFactory.DecodingFormatMock( + ",", true, ChangelogMode.insertOnly(), Collections.emptyMap()); + + protected static DecodingFormat> valueDecodingFormat = + new TestFormatFactory.DecodingFormatMock( + ",", true, ChangelogMode.insertOnly(), Collections.emptyMap()); + + protected static EncodingFormat> keyEncodingFormat = + new TestFormatFactory.EncodingFormatMock(",", ChangelogMode.insertOnly()); + protected static EncodingFormat> valueEncodingFormat = + new TestFormatFactory.EncodingFormatMock(",", ChangelogMode.insertOnly()); + + @Rule public ExpectedException thrown = ExpectedException.none(); @Test public void testTableSource() { final DataType producedDataType = SOURCE_SCHEMA.toPhysicalRowDataType(); - - DecodingFormat> keyDecodingFormat = - new TestFormatFactory.DecodingFormatMock( - ",", true, ChangelogMode.insertOnly(), Collections.emptyMap()); - - DecodingFormat> valueDecodingFormat = - new TestFormatFactory.DecodingFormatMock( - ",", true, ChangelogMode.insertOnly(), Collections.emptyMap()); - // Construct table source using options and table source factory - final DynamicTableSource actualSource = createActualSource(SOURCE_SCHEMA, getFullSourceOptions()); + final DynamicTableSource actualSource = createTableSource(SOURCE_SCHEMA, getFullSourceOptions()); final PulsarDynamicTableSource expectedSource = createExpectedScanSource( producedDataType, @@ -149,13 +160,9 @@ public void testTableSource() { @Test public void testTableSink() { - EncodingFormat> keyEncodingFormat = - new TestFormatFactory.EncodingFormatMock(",", ChangelogMode.insertOnly()); - EncodingFormat> valueEncodingFormat = - new TestFormatFactory.EncodingFormatMock(",", ChangelogMode.insertOnly()); // Construct table sink using options and table sink factory. - final DynamicTableSink actualSink = createActualSink(SINK_SCHEMA, getFullSinkOptions()); + final DynamicTableSink actualSink = createTableSink(SINK_SCHEMA, getFullSinkOptions()); final DynamicTableSink expectedSink = createExpectedSink( SINK_SCHEMA.toPhysicalRowDataType(), @@ -188,12 +195,7 @@ public void testTableSinkWithParallelism() { final Map modifiedOptions = getModifiedOptions( getFullSinkOptions(), options -> options.put("sink.parallelism", "100")); - final DynamicTableSink actualSink = createActualSink(SINK_SCHEMA, modifiedOptions); - - EncodingFormat> keyEncodingFormat = - new TestFormatFactory.EncodingFormatMock(",", ChangelogMode.insertOnly()); - EncodingFormat> valueEncodingFormat = - new TestFormatFactory.EncodingFormatMock(",", ChangelogMode.insertOnly()); + final DynamicTableSink actualSink = createTableSink(SINK_SCHEMA, modifiedOptions); final DynamicTableSink expectedSink = createExpectedSink( SINK_SCHEMA.toPhysicalRowDataType(), @@ -228,12 +230,12 @@ public void testCreateSourceTableWithoutPK() { "The PRIMARY KEY specifies which columns should be read from or write to the Pulsar message key. " + "The PRIMARY KEY also defines records in the 'upsert-pulsar' table should update or delete on which keys."))); - TableSchema illegalSchema = TableSchema.builder() - .field("window_start", DataTypes.STRING()) - .field("region", DataTypes.STRING()) - .field("view_count", DataTypes.BIGINT()) - .build(); - createActualSource(illegalSchema, getFullSinkOptions()); + ResolvedSchema illegalSchema = + ResolvedSchema.of( + Column.physical("window_start", DataTypes.STRING()), + Column.physical("region", DataTypes.STRING()), + Column.physical("view_count", DataTypes.BIGINT())); + createTableSource(illegalSchema, getFullSourceOptions()); } @Test @@ -243,11 +245,11 @@ public void testCreateSinkTableWithoutPK() { "The PRIMARY KEY specifies which columns should be read from or write to the Pulsar message key. " + "The PRIMARY KEY also defines records in the 'upsert-pulsar' table should update or delete on which keys."))); - TableSchema illegalSchema = TableSchema.builder() - .field("region", DataTypes.STRING()) - .field("view_count", DataTypes.BIGINT()) - .build(); - createActualSink(illegalSchema, getFullSinkOptions()); + ResolvedSchema illegalSchema = + ResolvedSchema.of( + Column.physical("region", DataTypes.STRING()), + Column.physical("view_count", DataTypes.BIGINT())); + createTableSink(illegalSchema, getFullSinkOptions()); } @Test @@ -260,7 +262,7 @@ public void testSerWithCDCFormatAsValue() { TestFormatFactory.IDENTIFIER, TestFormatFactory.IDENTIFIER )))); - createActualSink(SINK_SCHEMA, + createTableSink(SINK_SCHEMA, getModifiedOptions( getFullSinkOptions(), options -> options.put( @@ -279,7 +281,7 @@ public void testDeserWithCDCFormatAsValue() { TestFormatFactory.IDENTIFIER )))); - createActualSource(SOURCE_SCHEMA, + createTableSource(SOURCE_SCHEMA, getModifiedOptions( getFullSinkOptions(), options -> options.put( @@ -352,39 +354,6 @@ private static Map getFullSinkOptions() { return options; } - private static DynamicTableSource createActualSource(TableSchema schema, Map options) { - ObjectIdentifier objectIdentifier = ObjectIdentifier.of( - "default", - "default", - "sourceTable"); - final CatalogTable sourceTable = - new CatalogTableImpl(schema, options, "sinkTable"); - - return FactoryUtil.createTableSource( - null, - objectIdentifier, - sourceTable, - new Configuration(), - Thread.currentThread().getContextClassLoader(), - false); - } - - private static DynamicTableSink createActualSink(TableSchema schema, Map options) { - ObjectIdentifier objectIdentifier = ObjectIdentifier.of( - "default", - "default", - "sinkTable"); - final CatalogTable sinkTable = - new CatalogTableImpl(schema, options, "sinkTable"); - return FactoryUtil.createTableSink( - null, - objectIdentifier, - sinkTable, - new Configuration(), - Thread.currentThread().getContextClassLoader(), - false); - } - private static PulsarDynamicTableSource createExpectedScanSource( DataType producedDataType, DecodingFormat> keyDecodingFormat, diff --git a/pulsar-flink-sql-connector-2.11/pom.xml b/pulsar-flink-sql-connector-2.11/pom.xml index f1f69f9e..816010d6 100644 --- a/pulsar-flink-sql-connector-2.11/pom.xml +++ b/pulsar-flink-sql-connector-2.11/pom.xml @@ -21,7 +21,7 @@ io.streamnative.connectors pulsar-flink-parent - 2.7.7-SNAPSHOT + 1.13.1.0 pulsar-flink-sql-connector_2.11 diff --git a/pulsar-flink-sql-connector-2.12/pom.xml b/pulsar-flink-sql-connector-2.12/pom.xml index 4ae4a749..daeef6c5 100644 --- a/pulsar-flink-sql-connector-2.12/pom.xml +++ b/pulsar-flink-sql-connector-2.12/pom.xml @@ -21,7 +21,7 @@ io.streamnative.connectors pulsar-flink-parent - 2.7.7-SNAPSHOT + 1.13.1.0 pulsar-flink-sql-connector_2.12