Skip to content
This repository has been archived by the owner on Dec 14, 2022. It is now read-only.

Commit

Permalink
Bump flink 1.13.1 (#360)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jianyun Zhao authored Jun 21, 2021
1 parent 2a57809 commit 113afca
Show file tree
Hide file tree
Showing 29 changed files with 337 additions and 553 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

<groupId>io.streamnative.connectors</groupId>
<artifactId>pulsar-flink-parent</artifactId>
<version>2.7.7-SNAPSHOT</version>
<version>1.13.1.0</version>

<name>StreamNative :: Pulsar Flink Connector :: Root</name>
<url>https://pulsar.apache.org</url>
Expand Down Expand Up @@ -74,7 +74,7 @@

<!-- use Pulsar stable version -->
<pulsar.version>2.8.0</pulsar.version>
<flink.version>1.12.3</flink.version>
<flink.version>1.13.1</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<lombok.version>1.18.18</lombok.version>
<aircompressor.version>0.16</aircompressor.version>
Expand Down
2 changes: 1 addition & 1 deletion pulsar-flink-connector-shade-2.11/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>io.streamnative.connectors</groupId>
<artifactId>pulsar-flink-parent</artifactId>
<version>2.7.7-SNAPSHOT</version>
<version>1.13.1.0</version>
</parent>

<artifactId>pulsar-flink-connector_2.11</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion pulsar-flink-connector-shade-2.12/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>io.streamnative.connectors</groupId>
<artifactId>pulsar-flink-parent</artifactId>
<version>2.7.7-SNAPSHOT</version>
<version>1.13.1.0</version>
</parent>

<artifactId>pulsar-flink-connector_2.12</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion pulsar-flink-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>io.streamnative.connectors</groupId>
<artifactId>pulsar-flink-parent</artifactId>
<version>2.7.7-SNAPSHOT</version>
<version>1.13.1.0</version>
</parent>

<artifactId>pulsar-flink-connector-origin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public void addReader(int subtaskId) {
}

@Override
public PulsarSourceEnumeratorState snapshotState() throws Exception {
public PulsarSourceEnumeratorState snapshotState(long l) throws Exception {
return new PulsarSourceEnumeratorState(readerIdToSplitAssignments);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.flink.table.catalog.config.CatalogConfig.FLINK_PROPERTY_PREFIX;
import static org.apache.flink.table.catalog.CatalogPropertiesUtil.FLINK_PROPERTY_PREFIX;

/**
* catalog support.
Expand Down Expand Up @@ -87,7 +88,7 @@ public List<String> getTopics(String databaseName) throws PulsarAdminException {
return pulsarMetadataReader.getTopics(databaseName);
}

public CatalogTableImpl getTableSchema(ObjectPath tablePath,
public CatalogTable getTableSchema(ObjectPath tablePath,
Map<String, String> properties)
throws PulsarAdminException, IncompatibleSchemaException {
String topicName = objectPath2TopicName(tablePath);
Expand Down Expand Up @@ -163,7 +164,7 @@ private SchemaInfo tableSchemaToPulsarSchema(String format, TableSchema schema,
return SchemaUtils.tableSchemaToSchemaInfo(format, physicalRowDataType, options);
}

private CatalogTableImpl schemaToCatalogTable(SchemaInfo pulsarSchema,
private CatalogTable schemaToCatalogTable(SchemaInfo pulsarSchema,
ObjectPath tablePath,
Map<String, String> flinkProperties)
throws IncompatibleSchemaException {
Expand All @@ -182,10 +183,20 @@ private CatalogTableImpl schemaToCatalogTable(SchemaInfo pulsarSchema,
properties.putAll(flinkProperties);
properties.remove(IS_CATALOG_TOPIC);
String comment = properties.remove(PulsarCatalogSupport.COMMENT);
return new CatalogTableImpl(tableSchema, partitionKeys, properties, comment);
return CatalogTable.of(
tableSchema.toSchema(),
comment,
partitionKeys,
properties
);
} else {
final TableSchema tableSchema = schemaTranslator.pulsarSchemaToTableSchema(pulsarSchema);
return new CatalogTableImpl(tableSchema, flinkProperties, "");
return CatalogTable.of(
tableSchema.toSchema(),
"",
Collections.emptyList(),
flinkProperties
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

package org.apache.flink.streaming.connectors.pulsar.util;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.AtomicDataType;
import org.apache.flink.table.types.DataType;

Expand All @@ -33,8 +32,4 @@ public static <T> Optional<Class<T>> extractType(DataType dataType) {
}
return Optional.empty();
}

public static DataType toDataType(Class<?> clazz) {
return DataTypes.RAW(TypeInformationUtils.getTypesAsRow(clazz));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public Schema<V> getSchema() {

@Override
public V deserialize(Message<V> message) throws IOException {
return valueDeserializer.deserialize(message.getData());
return message.getValue();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
package org.apache.flink.streaming.util.serialization;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.connectors.pulsar.util.TypeInformationUtils;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

import org.apache.pulsar.client.api.Schema;
Expand Down Expand Up @@ -60,9 +58,6 @@ public PulsarDeserializationSchemaBuilder<V> setRecordClass(Class<V> recordClass
}

public PulsarDeserializationSchema<V> build() {
if (dataType == null) {
dataType = DataTypes.RAW(TypeInformationUtils.getTypesAsRow(recordClass)).bridgedTo(recordClass);
}
return new PulsarDeserializationSchemaWrapper<>(valueDeserializer, dataType);
throw new UnsupportedOperationException("PulsarDeserializationSchemaBuilder is deprecated, use PulsarDeserializationSchema#valueOnly.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ public class PulsarCatalog extends GenericInMemoryCatalog {

private PulsarCatalogSupport catalogSupport;

public static final String DEFAULT_DB = "public/default";

public PulsarCatalog(String adminUrl, String catalogName, Map<String, String> props, String defaultDatabase) {
super(catalogName, defaultDatabase);
this.adminUrl = adminUrl;
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit 113afca

Please sign in to comment.