diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/JSONOptions.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/JSONOptions.java index 22865211..986327d3 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/JSONOptions.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/JSONOptions.java @@ -27,7 +27,6 @@ import java.util.Map; import java.util.TimeZone; import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Function; /** * Various options for decoding a JSON record. @@ -48,7 +47,6 @@ public class JSONOptions implements Serializable { private final String columnNameOfCorruptRecord; private final boolean dropFieldIfAllNull; private final ConcurrentHashMap computedTimeZones; - private final Function computeTimeZone; private final TimeZone timeZone; private final FastDateFormat dateFormat; private final FastDateFormat timestampFormat; @@ -125,7 +123,6 @@ public JSONOptions( } this.computedTimeZones = new ConcurrentHashMap<>(); - this.computeTimeZone = timezoneId -> TimeZone.getTimeZone(timezoneId); this.timeZone = getTimeZone(parameters.getOrDefault("timezone", defaultTimeZoneId)); @@ -139,8 +136,8 @@ protected String checkEncoding(String enc) { return enc; } - private TimeZone getTimeZone(String timeZoneeId) { - return computedTimeZones.computeIfAbsent(timeZoneeId, computeTimeZone); + private TimeZone getTimeZone(String timeZoneId) { + return computedTimeZones.computeIfAbsent(timeZoneId, TimeZone::getTimeZone); } /** Sets config options on a Jackson [[JsonFactory]]. */ diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/JacksonRecordParser.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/JacksonRecordParser.java index 6e89493a..b5dfee8c 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/JacksonRecordParser.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/JacksonRecordParser.java @@ -30,19 +30,18 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.text.ParseException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.function.BiFunction; -import java.util.function.Function; /** * JSON record parser. */ @Slf4j -public class JacksonRecordParser { +public class JacksonRecordParser implements Serializable{ private final DataType schema; @@ -558,7 +557,7 @@ public Object parseJsonToken(JsonParser parser, DataType dataType, PartialFunc f } } - interface PartialFunc { + interface PartialFunc extends Serializable{ boolean isDefinedAt(JsonToken token); Object apply(JsonToken token); @@ -579,11 +578,19 @@ default Object applyOrElse(JsonToken token, JsonParser parser, DataType dataType } } - interface BiFunctionWithException { + interface Function extends Serializable{ + R apply(T t); + } + + interface BiFunction extends Serializable{ + R apply(T t, U u); + } + + interface BiFunctionWithException extends Serializable{ R apply(T t, U u) throws BadRecordException; } - static class FailureSafeRecordParser { + static class FailureSafeRecordParser implements Serializable { private final BiFunctionWithException rawParser; private final ParseMode mode; private final FieldsDataType schema; diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarDeserializer.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarDeserializer.java index 80dd1dce..a5edec38 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarDeserializer.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarDeserializer.java @@ -56,7 +56,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.function.BiFunction; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -117,7 +116,7 @@ public PulsarDeserializer(SchemaInfo schemaInfo, JSONOptions parsedOptions, bool case JSON: FieldsDataType fdt = (FieldsDataType) rootDataType; - BiFunction createParser = + JacksonRecordParser.BiFunction createParser = (jsonFactory, s) -> { try { return jsonFactory.createParser(s); diff --git a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarTableITest.java b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarTableITest.java index 385736dd..ba3f81a3 100644 --- a/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarTableITest.java +++ b/pulsar-flink-connector/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarTableITest.java @@ -142,7 +142,76 @@ public void testStructTypesInAvro() throws Exception { try { see.execute("test struct in avro"); } catch (Exception e) { + } + SingletonStreamSink.compareWithList( + fooList.subList(0, fooList.size() - 1).stream().map(Objects::toString).collect(Collectors.toList())); + } + + @Test + public void testStructTypesInJson() throws Exception { + StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); + see.setParallelism(1); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(see); + + String table = newTopic() + "_json"; + + sendTypedMessages(table, SchemaType.JSON, fooList, Optional.empty(), SchemaData.Foo.class); + + tEnv + .connect(getPulsarDescriptor(table)) + .inAppendMode() + .registerTableSource(table); + + Table t = tEnv.scan(table).select("i, f, bar"); + tEnv.toAppendStream(t, t.getSchema().toRowType()) + .map(new FailingIdentityMapper(fooList.size())) + .addSink(new SingletonStreamSink.StringSink<>()).setParallelism(1); + + try { + see.execute("test struct in json"); + } catch (Exception ignore) { + } + SingletonStreamSink.compareWithList( + fooList.subList(0, fooList.size() - 1).stream().map(Objects::toString).collect(Collectors.toList())); + } + @Test + public void testStructTypesInJsonBySql() throws Exception { + StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); + see.setParallelism(1); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(see); + + String tableName = "topic_1_sql"; + sendTypedMessages(tableName, SchemaType.JSON, fooList, Optional.empty(), SchemaData.Foo.class); + + String createSql = "create table `" + tableName + "`(\n" + + "`b` BOOLEAN,\n" + + "`s` STRING\n" + + ") with (\n" + + "'connector.type' ='pulsar',\n" + + "'connector.topic' ='persistent://public/default/" + tableName + "',\n" + + "'connector.service-url' ='" + serviceUrl + "',\n" + + "'connector.admin-url' ='" + adminUrl + "',\n" + + "'connector.startup-mode' ='earliest',\n" + + "'connector.properties.0.key' ='pulsar.reader.readerName',\n" + + "'connector.properties.0.value' ='testStructTypesInJsonBySql',\n" + + "'connector.properties.1.key' ='partitiondiscoveryintervalmillis',\n" + + "'connector.properties.1.value' = '5000',\n" + + "'format.derive-schema' ='true',\n" + + "'format.ignore-parse-errors' ='true',\n" + + "'update-mode' ='append'\n" + + ")"; + tEnv.sqlUpdate(createSql); + + String querySql = "select i,f,bar from " + tableName; + Table table = tEnv.sqlQuery(querySql); + tEnv.toAppendStream(table, table.getSchema().toRowType()) + .map(new FailingIdentityMapper(fooList.size())) + .addSink(new SingletonStreamSink.StringSink<>()).setParallelism(1); + + try { + see.execute("test struct in json by sql"); + } catch (Exception ignore) { } SingletonStreamSink.compareWithList( fooList.subList(0, fooList.size() - 1).stream().map(Objects::toString).collect(Collectors.toList()));