From d051d11c18e8af0d7be3593fa947482b14481d73 Mon Sep 17 00:00:00 2001 From: Ward Harris Date: Sat, 31 Jul 2021 02:28:05 +0800 Subject: [PATCH] fix: pulsar with flink debezium-json format bug (#379) (cherry picked from commit a38b8095d3bd96470af07f4fd20d830983c0a07c) --- .../util/serialization/ThreadSafeDeserializationSchema.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/util/serialization/ThreadSafeDeserializationSchema.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/util/serialization/ThreadSafeDeserializationSchema.java index 1391e930..72b91f27 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/util/serialization/ThreadSafeDeserializationSchema.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/util/serialization/ThreadSafeDeserializationSchema.java @@ -16,6 +16,7 @@ import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.util.Collector; import java.io.IOException; @@ -47,6 +48,11 @@ public synchronized T deserialize(byte[] bytes) throws IOException { return deserializationSchema.deserialize(bytes); } + @Override + public synchronized void deserialize(byte[] message, Collector out) throws IOException { + deserializationSchema.deserialize(message, out); + } + @Override public synchronized boolean isEndOfStream(T object) { return deserializationSchema.isEndOfStream(object);