Skip to content

Commit

Permalink
feat: Add: 1.jackson module.
Browse files Browse the repository at this point in the history
Signed-off-by: photowey <[email protected]>
  • Loading branch information
photowey committed Apr 6, 2024
1 parent 363a5aa commit f490621
Show file tree
Hide file tree
Showing 8 changed files with 250 additions and 0 deletions.
40 changes: 40 additions & 0 deletions kafka-plus-jackson/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.github.photowey</groupId>
<artifactId>kafka-plus</artifactId>
<version>3.7.0.1.0-SNAPSHOT</version>
</parent>

<artifactId>kafka-plus-jackson</artifactId>

<name>${project.groupId}:${project.artifactId}</name>
<description>The jackson core module of project kafka-plus</description>

<!-- @formatter:off -->
<properties>
</properties>
<!-- @formatter:on -->

<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright © 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.photowey.kafka.plus.core.jackson.serialization;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

/**
* {@code ApplyObjectMapper}
*
* @author photowey
* @date 2024/04/06
* @since 1.0.0
*/
public interface ApplyObjectMapper {

default ObjectMapper initObjectMapper() {
ObjectMapper objectMapper = new ObjectMapper();
this.applyObjectMapper(objectMapper);

return objectMapper;
}

default void applyObjectMapper(ObjectMapper objectMapper) {
objectMapper
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, true)
.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false)
.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
.setSerializationInclusion(JsonInclude.Include.NON_NULL)
.registerModule(new JavaTimeModule());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright © 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.photowey.kafka.plus.core.jackson.serialization.deserializer;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.github.photowey.kafka.plus.core.jackson.serialization.ApplyObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Utils;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
* {@code JacksonDeserializer}
*
* @author photowey
* @date 2024/04/06
* @since 1.0.0
*/
public class JacksonDeserializer<T> implements Deserializer<Object>, ApplyObjectMapper {

private final ObjectMapper objectMapper;
private String encoding = StandardCharsets.UTF_8.name();

public JacksonDeserializer() {
this.objectMapper = this.initObjectMapper();
}

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
Object encodingValue = configs.get(propertyName);
if (encodingValue == null) {
encodingValue = configs.get("deserializer.encoding");
}
if (encodingValue instanceof String) {
encoding = (String) encodingValue;
}
}

@Override
public Object deserialize(String topic, byte[] data) {
try {
if (data == null) {
return null;
}

return this.objectMapper.readValue(data, Object.class);
} catch (Exception e) {
throw new SerializationException("Error when deserializing byte[] to Object by [com.fasterxml.jackson.databind.ObjectMapper].");
}
}

@Override
public Object deserialize(String topic, Headers headers, ByteBuffer data) {
if (data == null) {
return null;
}

try {
if (data.hasArray()) {
String json = new String(data.array(), data.position() + data.arrayOffset(), data.remaining(), encoding);
return this.objectMapper.readValue(json, Object.class);
}

String json = new String(Utils.toArray(data), encoding);
return this.objectMapper.readValue(json, Object.class);
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when deserializing ByteBuffer to string due to unsupported encoding " + encoding);
} catch (Exception e) {
throw new SerializationException("Error when deserializing ByteBuffer to Object by [com.fasterxml.jackson.databind.ObjectMapper].");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright © 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.photowey.kafka.plus.core.jackson.serialization.serializer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.github.photowey.kafka.plus.core.jackson.serialization.ApplyObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

/**
* {@code JacksonSerializer}
*
* @author photowey
* @date 2024/04/06
* @since 1.0.0
*/
public class JacksonSerializer implements Serializer<Object>, ApplyObjectMapper {

private final ObjectMapper objectMapper;

public JacksonSerializer() {
this.objectMapper = this.initObjectMapper();
}

@Override
public byte[] serialize(String topic, Object data) {
try {
if (data == null) {
return null;
}

return this.objectMapper.writeValueAsBytes(data);
} catch (JsonProcessingException e) {
throw new SerializationException("Error when serializing object to byte[] by [com.fasterxml.jackson.databind.ObjectMapper].");
}
}
}
Empty file.
Empty file.
Empty file.
19 changes: 19 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
<modules>
<module>kafka-plus-core</module>
<module>kafka-plus-engine</module>
<module>kafka-plus-jackson</module>
<module>kafka-plus-runtime</module>
</modules>

Expand Down Expand Up @@ -71,6 +72,7 @@
<maven-git-commit-id-plugin.version>4.9.10</maven-git-commit-id-plugin.version>

<kafka.version>3.7.0</kafka.version>
<jackson.version>2.14.2</jackson.version>

<io.github.photowey.project.url>https://github.com/photowey/kafka-plus</io.github.photowey.project.url>
</properties>
Expand All @@ -97,11 +99,28 @@
<artifactId>kafka-plus-engine</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.github.photowey</groupId>
<artifactId>kafka-plus-jackson</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.github.photowey</groupId>
<artifactId>kafka-plus-runtime</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>${jackson.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down

0 comments on commit f490621

Please sign in to comment.