Implement the record filtering and transformation features (#1) #2

Open · wants to merge 1 commit into base: main
4 changes: 3 additions & 1 deletion .gitignore
@@ -5,4 +5,6 @@
build

# IntelliJ IDEA / GoLand project files
.idea/
.idea/

.DS_Store
21 changes: 21 additions & 0 deletions README.md
@@ -1,5 +1,26 @@
# kafka-sink-connector

## Env
- Java 17
- Gradle 8.9

## Configuration Properties
| Required | Name                      | Type   | Description                                                    | Sample        |
|----------|---------------------------|--------|----------------------------------------------------------------|---------------|
| Yes      | topics                    | String | A list of Kafka topics that the sink connector watches.        | my-topic      |
| Yes      | kafka.sink.topic          | String | The Kafka topic name to which the sink connector writes.       | relay-topic   |
| Yes      | kafka.sink.bootstrap      | String | The Kafka bootstrap server to which the sink connector writes. | my-kafka:9092 |
| No       | producer.max.request.size | Int    | The maximum size of a request in bytes.                        | 20971520      |
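
For reference, a minimal sketch of a connector properties file using the options above. The `connector.class` value is an assumption; replace it with this project's actual connector class.

```
# Sketch only: connector.class is assumed, adjust to the real class name.
name=kafka-sink-connector
connector.class=org.example.SinkConnector
topics=my-topic
kafka.sink.topic=relay-topic
kafka.sink.bootstrap=my-kafka:9092
# Optional: maximum producer request size in bytes
producer.max.request.size=20971520
```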

## Build
```
$ ./gradlew clean build
$ tar -czvf kafka-sink-connector.tar.gz app/build/libs/app-all.jar


$ rm ./kafka-sink-connector.tar.gz; rm app/build/libs/app-all.jar;./gradlew clean build; tar -czvf kafka-sink-connector-dev.tar.gz app/build/libs/app-all.jar; rm /Users/kang/Desktop/workspace/labs/static/jars/kafka-sink-connector-dev.tar.gz; mv ./kafka-sink-connector-dev.tar.gz /Users/kang/Desktop/workspace/labs/static/jars/kafka-sink-connector-dev.tar.gz
```
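
Once built, the jar is typically placed on the Connect worker's `plugin.path` and the connector is registered through the Kafka Connect REST API. A sketch, assuming the worker listens on `localhost:8083` and the connector class is `org.example.SinkConnector` (both assumptions):

```
$ curl -X POST http://localhost:8083/connectors \
    -H "Content-Type: application/json" \
    -d '{
      "name": "kafka-sink-connector",
      "config": {
        "connector.class": "org.example.SinkConnector",
        "topics": "my-topic",
        "kafka.sink.topic": "relay-topic",
        "kafka.sink.bootstrap": "my-kafka:9092"
      }
    }'
```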

# JAR (Java ARchive)
- A JAR file is a file format used to package Java applications or libraries. It can contain multiple Java class files, along with metadata and resource files.
- Advantages:
135 changes: 92 additions & 43 deletions app/src/main/kotlin/org/example/SinkTask.kt
@@ -4,30 +4,40 @@ import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.sink.SinkRecord
import org.apache.kafka.connect.sink.SinkTask
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import org.example.config.SinkConnectorConfig
import java.util.*

class SinkTask : SinkTask() {

private lateinit var destTopic: String
private lateinit var connectorName: String
private lateinit var sinkTopic: String
private lateinit var producer: KafkaProducer<String, String>
private val objectMapper = ObjectMapper()
private val logger: Logger = LogManager.getLogger(SinkTask::class.java)

override fun start(props: Map<String, String>) {
// Task initialization
destTopic = props["dest.topic"] ?: throw ConfigException("Destination topic must be set")
val config: SinkConnectorConfig
connectorName = props["name"].orEmpty()
config = SinkConnectorConfig(props)

sinkTopic = config.getString(SinkConnectorConfig.SINK_TOPIC)

val producerProps = Properties()
producerProps["bootstrap.servers"] = props["bootstrap.servers"]
producerProps["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
producerProps["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
producerProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = config.getString(SinkConnectorConfig.SINK_BOOTSTRAP_SERVER)
producerProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"
producerProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"
producerProps[ProducerConfig.MAX_REQUEST_SIZE_CONFIG] = config.getInt(SinkConnectorConfig.PRODUCER_MAX_REQUEST_SIZE)

// NOTE: Each Connect producer must be unique. Multi-task setups are not considered.
producerProps[ProducerConfig.TRANSACTIONAL_ID_CONFIG] = String.format("kafka-sink-connector-%s-%s-%s", connectorName, config.getString(SinkConnectorConfig.SOURCE_TOPIC), config.getString(SinkConnectorConfig.SINK_TOPIC))

producer = KafkaProducer(producerProps)
producer.initTransactions()

/**
* NOTE: Configuration for exactly-once delivery
@@ -57,55 +67,75 @@ class SinkTask : SinkTask() {
* - This allows the producer to guarantee exactly-once delivery.
* - Transactions are enabled by setting a transactional ID.
*/
producerProps[ProducerConfig.TRANSACTIONAL_ID_CONFIG] = "TRANSACTIONAL_ID_CONFIG"

producer = KafkaProducer(producerProps)
producer.initTransactions()
// producerProps[ProducerConfig.TRANSACTIONAL_ID_CONFIG] = "TRANSACTIONAL_ID_CONFIG"
//
// producer = KafkaProducer(producerProps)
// producer.initTransactions()
}

override fun put(records: Collection<SinkRecord>) {

override fun put(records: Collection<SinkRecord>) {
producer.beginTransaction()
try {
// Process the records and forward them to another Kafka topic
for (record in records) {
logger.info("Received record:\n" +
" topic: ${record.topic()}\n" +
" partition: ${record.kafkaPartition()}\n" +
" offset: ${record.kafkaOffset()}\n" +
" key: ${record.key()}\n" +
" value: ${convertStructToJson(record.value())}\n" +
" timestamp: ${record.timestamp()}\n"
)

val structValue = record.value() as? Struct
val afterStruct = structValue?.getStruct("after")
// if (afterStruct != null) {
// val eventType = afterStruct.getString("type")
// if (eventType == "KillEvent") {
// throw RuntimeException("Encountered a KillEvent")
// }
// }

val key = convertStructToJson(record.key() ?: "")
val value = convertStructToJson(record.value() ?: "")

producer.send(ProducerRecord(destTopic, key, value))
logger.info("Record sent to topic $destTopic: key=$key, value=$value")
val transformedRecord = transform(record)
if (transformedRecord == null) {
continue
}

val key = createSchemaPayloadJson(transformedRecord.key(), transformedRecord.keySchema())
val value = createSchemaPayloadJson(transformedRecord.value(), transformedRecord.valueSchema())
producer.send(ProducerRecord(sinkTopic, key, value))
}
producer.commitTransaction()
} catch (e: Exception) {
logger.error(e.message + " / " + connectorName, e)
producer.abortTransaction()
producer.close()
throw e;
throw e
}
}

// NOTE: Transforms the original message into the desired shape. Depends on the Debezium / KafkaEvent decorator message format.
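// Illustrative example only (assumed Debezium-style payload shape):
//   in : value.after = { "type": "InvoiceConfirmEvent", "metadata": "{\"prefix\":\"Payment\"}", ... }
//   out: value.after = { "type": "PaymentInvoiceConfirmEvent", "metadata": "{\"prefix\":\"Payment\"}", ... }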
private fun transform(record: SinkRecord): SinkRecord? {
val valueStruct = record.value() as Struct
val afterStruct = valueStruct.getStruct("after")
val metadataString = afterStruct.getString("metadata")
if (metadataString == null) {
return null
}

val metadata = objectMapper.readTree(metadataString)
val prefix = metadata["prefix"]?.asText()

// NOTE: 1. Apply the EventType prefix (InvoiceConfirmEvent -> PaymentInvoiceConfirmEvent)
if (!prefix.isNullOrEmpty()) {
val type = afterStruct.getString("type")
afterStruct.put("type", String.format("%s%s", prefix, type))
}

return SinkRecord(
record.topic(),
record.kafkaPartition(),
record.keySchema(),
record.key(),
record.valueSchema(),
valueStruct,
record.kafkaOffset()
)
}

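// Wraps the converted data in a Connect-style JSON envelope, e.g.
//   {"schema": {"type": "struct", "fields": [...]}, "payload": {...}}
// which matches the shape the Kafka Connect JsonConverter emits when schemas are enabled.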
private fun createSchemaPayloadJson(data: Any, schema: Schema): String {
val schemaMap = convertSchemaToJson(schema)
val payload = convertDataToJson(data)
val resultMap = mapOf("schema" to schemaMap, "payload" to payload)
return objectMapper.writeValueAsString(resultMap)
}

private fun convertStructToJson(data: Any?): String {
private fun convertDataToJson(data: Any?): Any? {
return when (data) {
is Struct -> objectMapper.writeValueAsString(structToMap(data))
is String -> data
else -> data?.toString() ?: ""
is Struct -> structToMap(data)
else -> data
}
}

@@ -119,8 +149,27 @@ class SinkTask : SinkTask() {
return map
}

private fun convertSchemaToJson(schema: Schema): Map<String, Any?> {
val schemaMap = mutableMapOf<String, Any?>()
schemaMap["type"] = schema.type().name.lowercase()
schemaMap["name"] = schema.name()
schemaMap["version"] = schema.version()
schemaMap["parameters"] = schema.parameters()
schemaMap["default"] = schema.defaultValue()
schemaMap["optional"] = schema.isOptional
if (schema.type() == Schema.Type.STRUCT) {
val fields = schema.fields().map { field ->
val fieldMap = convertSchemaToJson(field.schema()).toMutableMap()
fieldMap["field"] = field.name()
fieldMap
}
schemaMap["fields"] = fields
}

return schemaMap.filterValues { it != null }
}

override fun stop() {
// Task shutdown
producer.close()
}

@@ -15,9 +15,14 @@ class SinkConnectorConfig(props: Map<String, String>) : AbstractConfig(CONFIG, p
const val SINK_BOOTSTRAP_SERVER: String = "kafka.sink.bootstrap"
const val SINK_BOOTSTRAP_SERVER_DOC: String = "Define sink bootstrap"

const val PRODUCER_MAX_REQUEST_SIZE: String = "producer.max.request.size"
const val PRODUCER_MAX_REQUEST_SIZE_DEFAULT: Int = 1048576 // https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#max-request-size
const val PRODUCER_MAX_REQUEST_SIZE_DOC: String = "producer.max.request.size"

var CONFIG: ConfigDef = ConfigDef()
.define(SOURCE_TOPIC, ConfigDef.Type.STRING, "", Importance.HIGH, SOURCE_TOPIC_DOC)
.define(SINK_TOPIC, ConfigDef.Type.STRING, "", Importance.HIGH, SINK_TOPIC_DOC)
.define(SINK_BOOTSTRAP_SERVER, ConfigDef.Type.STRING, "", Importance.HIGH, SINK_BOOTSTRAP_SERVER_DOC) // timestamp parsing
.define(SINK_BOOTSTRAP_SERVER, ConfigDef.Type.STRING, "", Importance.HIGH, SINK_BOOTSTRAP_SERVER_DOC)
.define(PRODUCER_MAX_REQUEST_SIZE, ConfigDef.Type.INT, PRODUCER_MAX_REQUEST_SIZE_DEFAULT, Importance.LOW, PRODUCER_MAX_REQUEST_SIZE_DOC)
}
}