From f4906219b7ae11ef56aca24f7004509ff29179b1 Mon Sep 17 00:00:00 2001 From: photowey Date: Sat, 6 Apr 2024 14:46:23 +0800 Subject: [PATCH] feat: Add: 1.jackson module. Signed-off-by: photowey --- kafka-plus-jackson/pom.xml | 40 ++++++++ .../serialization/ApplyObjectMapper.java | 49 ++++++++++ .../deserializer/JacksonDeserializer.java | 91 +++++++++++++++++++ .../serializer/JacksonSerializer.java | 51 +++++++++++ kafka-plus-jackson/src/main/resources/.Keep | 0 .../io/github/photowey/kafka/plus/core/.Keep | 0 kafka-plus-jackson/src/test/resources/.Keep | 0 pom.xml | 19 ++++ 8 files changed, 250 insertions(+) create mode 100644 kafka-plus-jackson/pom.xml create mode 100644 kafka-plus-jackson/src/main/java/io/github/photowey/kafka/plus/core/jackson/serialization/ApplyObjectMapper.java create mode 100644 kafka-plus-jackson/src/main/java/io/github/photowey/kafka/plus/core/jackson/serialization/deserializer/JacksonDeserializer.java create mode 100644 kafka-plus-jackson/src/main/java/io/github/photowey/kafka/plus/core/jackson/serialization/serializer/JacksonSerializer.java create mode 100644 kafka-plus-jackson/src/main/resources/.Keep create mode 100644 kafka-plus-jackson/src/test/java/io/github/photowey/kafka/plus/core/.Keep create mode 100644 kafka-plus-jackson/src/test/resources/.Keep diff --git a/kafka-plus-jackson/pom.xml b/kafka-plus-jackson/pom.xml new file mode 100644 index 0000000..e724ee7 --- /dev/null +++ b/kafka-plus-jackson/pom.xml @@ -0,0 +1,40 @@ + + + + 4.0.0 + + + io.github.photowey + kafka-plus + 3.7.0.1.0-SNAPSHOT + + + kafka-plus-jackson + + ${project.groupId}:${project.artifactId} + The jackson core module of project kafka-plus + + + + + + + + + org.apache.kafka + kafka-clients + + + + com.fasterxml.jackson.core + jackson-databind + + + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + + + + \ No newline at end of file diff --git a/kafka-plus-jackson/src/main/java/io/github/photowey/kafka/plus/core/jackson/serialization/ApplyObjectMapper.java b/kafka-plus-jackson/src/main/java/io/github/photowey/kafka/plus/core/jackson/serialization/ApplyObjectMapper.java new file mode 100644 index 0000000..ebd60b7 --- /dev/null +++ b/kafka-plus-jackson/src/main/java/io/github/photowey/kafka/plus/core/jackson/serialization/ApplyObjectMapper.java @@ -0,0 +1,49 @@ +/* + * Copyright © 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.photowey.kafka.plus.core.jackson.serialization; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; + +/** + * {@code ApplyObjectMapper} + * + * @author photowey + * @date 2024/04/06 + * @since 1.0.0 + */ +public interface ApplyObjectMapper { + + default ObjectMapper initObjectMapper() { + ObjectMapper objectMapper = new ObjectMapper(); + this.applyObjectMapper(objectMapper); + + return objectMapper; + } + + default void applyObjectMapper(ObjectMapper objectMapper) { + objectMapper + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, true) + .configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false) + .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS) + .setSerializationInclusion(JsonInclude.Include.NON_NULL) + .registerModule(new JavaTimeModule()); + } +} diff --git a/kafka-plus-jackson/src/main/java/io/github/photowey/kafka/plus/core/jackson/serialization/deserializer/JacksonDeserializer.java b/kafka-plus-jackson/src/main/java/io/github/photowey/kafka/plus/core/jackson/serialization/deserializer/JacksonDeserializer.java new file mode 100644 index 0000000..2d9d515 --- /dev/null +++ b/kafka-plus-jackson/src/main/java/io/github/photowey/kafka/plus/core/jackson/serialization/deserializer/JacksonDeserializer.java @@ -0,0 +1,91 @@ +/* + * Copyright © 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.photowey.kafka.plus.core.jackson.serialization.deserializer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.github.photowey.kafka.plus.core.jackson.serialization.ApplyObjectMapper; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.utils.Utils; + +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Map; + +/** + * {@code JacksonDeserializer} + * + * @author photowey + * @date 2024/04/06 + * @since 1.0.0 + */ +public class JacksonDeserializer implements Deserializer, ApplyObjectMapper { + + private final ObjectMapper objectMapper; + private String encoding = StandardCharsets.UTF_8.name(); + + public JacksonDeserializer() { + this.objectMapper = this.initObjectMapper(); + } + + @Override + public void configure(Map configs, boolean isKey) { + String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding"; + Object encodingValue = configs.get(propertyName); + if (encodingValue == null) { + encodingValue = configs.get("deserializer.encoding"); + } + if (encodingValue instanceof String) { + encoding = (String) encodingValue; + } + } + + @Override + public Object deserialize(String topic, byte[] data) { + try { + if (data == null) { + return null; + } + + return this.objectMapper.readValue(data, Object.class); + } catch (Exception e) { + throw new SerializationException("Error when deserializing byte[] to Object by [com.fasterxml.jackson.databind.ObjectMapper]."); + } + } + + @Override + public Object deserialize(String topic, Headers headers, ByteBuffer data) { + if (data == null) { + return null; + } + + try { + if (data.hasArray()) { + String json = new String(data.array(), data.position() + data.arrayOffset(), data.remaining(), encoding); + return this.objectMapper.readValue(json, Object.class); + } + + String json = new String(Utils.toArray(data), encoding); + return this.objectMapper.readValue(json, Object.class); + } catch (UnsupportedEncodingException e) { + throw new SerializationException("Error when deserializing ByteBuffer to string due to unsupported encoding " + encoding); + } catch (Exception e) { + throw new SerializationException("Error when deserializing ByteBuffer to Object by [com.fasterxml.jackson.databind.ObjectMapper]."); + } + } +} \ No newline at end of file diff --git a/kafka-plus-jackson/src/main/java/io/github/photowey/kafka/plus/core/jackson/serialization/serializer/JacksonSerializer.java b/kafka-plus-jackson/src/main/java/io/github/photowey/kafka/plus/core/jackson/serialization/serializer/JacksonSerializer.java new file mode 100644 index 0000000..bd81e54 --- /dev/null +++ b/kafka-plus-jackson/src/main/java/io/github/photowey/kafka/plus/core/jackson/serialization/serializer/JacksonSerializer.java @@ -0,0 +1,51 @@ +/* + * Copyright © 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.photowey.kafka.plus.core.jackson.serialization.serializer; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.github.photowey.kafka.plus.core.jackson.serialization.ApplyObjectMapper; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Serializer; + +/** + * {@code JacksonSerializer} + * + * @author photowey + * @date 2024/04/06 + * @since 1.0.0 + */ +public class JacksonSerializer implements Serializer, ApplyObjectMapper { + + private final ObjectMapper objectMapper; + + public JacksonSerializer() { + this.objectMapper = this.initObjectMapper(); + } + + @Override + public byte[] serialize(String topic, Object data) { + try { + if (data == null) { + return null; + } + + return this.objectMapper.writeValueAsBytes(data); + } catch (JsonProcessingException e) { + throw new SerializationException("Error when serializing object to byte[] by [com.fasterxml.jackson.databind.ObjectMapper]."); + } + } +} \ No newline at end of file diff --git a/kafka-plus-jackson/src/main/resources/.Keep b/kafka-plus-jackson/src/main/resources/.Keep new file mode 100644 index 0000000..e69de29 diff --git a/kafka-plus-jackson/src/test/java/io/github/photowey/kafka/plus/core/.Keep b/kafka-plus-jackson/src/test/java/io/github/photowey/kafka/plus/core/.Keep new file mode 100644 index 0000000..e69de29 diff --git a/kafka-plus-jackson/src/test/resources/.Keep b/kafka-plus-jackson/src/test/resources/.Keep new file mode 100644 index 0000000..e69de29 diff --git a/pom.xml b/pom.xml index a224eb5..1a180fb 100644 --- a/pom.xml +++ b/pom.xml @@ -25,6 +25,7 @@ kafka-plus-core kafka-plus-engine + kafka-plus-jackson kafka-plus-runtime @@ -71,6 +72,7 @@ 4.9.10 3.7.0 + 2.14.2 https://github.com/photowey/kafka-plus @@ -97,11 +99,28 @@ kafka-plus-engine ${project.version} + + io.github.photowey + kafka-plus-jackson + ${project.version} + io.github.photowey kafka-plus-runtime ${project.version} + + + com.fasterxml.jackson.core + jackson-databind + ${jackson.version} + + + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + ${jackson.version} +