Skip to content
This repository has been archived by the owner on Dec 14, 2022. It is now read-only.

Commit

Permalink
fix pulsar admin auth for PulsarProtobufNativeFormatFactory (#391)
Browse files Browse the repository at this point in the history
(cherry picked from commit d682122)
  • Loading branch information
Jianyun Zhao authored and jianyun8023 committed Aug 12, 2021
1 parent 58b4e53 commit c6e0cac
Showing 1 changed file with 2 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarClientUtils;
import org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
Expand All @@ -41,12 +42,10 @@

import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;

import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.ADMIN_URL;
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.PROPERTIES;
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.TOPIC;
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.TOPIC_PATTERN;

Expand All @@ -65,12 +64,7 @@ public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(Dynam

String topic = extractTopicName(context);
String adminUrl = tableConf.get(ADMIN_URL);
Optional<Map<String, String>> stringMap = tableConf.getOptional(PROPERTIES);
Properties properties = stringMap.map((map) -> {
final Properties prop = new Properties();
prop.putAll(map);
return prop;
}).orElse(new Properties());
Properties properties = PulsarTableOptions.getPulsarProperties(context.getCatalogTable().getOptions());

SerializableSupplier<Descriptors.Descriptor> loadDescriptor = () -> {
SchemaInfo schemaInfo = null;
Expand Down

0 comments on commit c6e0cac

Please sign in to comment.