diff --git a/.gitignore b/.gitignore
index 665db60..bebaff7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,4 +5,6 @@
 build
 
 # IntelliJ IDEA / GoLand project files
-.idea/
\ No newline at end of file
+.idea/
+
+.DS_Store
\ No newline at end of file
diff --git a/README.md b/README.md
index c92d3b8..1fd0e14 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,26 @@
 # kafka-sink-connector
+## Env
+- Java 17
+- Gradle 8.9
+
+## Configuration Properties
+| Required | Name                       | Type   | Description                                                     | Sample        |
+|----------|----------------------------|--------|-----------------------------------------------------------------|---------------|
+| Yes      | topics                     | String | A list of Kafka topics that the sink connector watches.         | my-topic      |
+| Yes      | kafka.sink.topic           | String | The Kafka topic name to which the sink connector writes.        | relay-topic   |
+| Yes      | kafka.sink.bootstrap       | String | The Kafka bootstrap server to which the sink connector writes.  | my-kafka:9092 |
+| No       | producer.max.request.size  | Int    | The maximum size of a request in bytes.                         | 20971520      |
+
+## Build
+```
+$ ./gradlew clean build
+$ tar -czvf kafka-sink-connector.tar.gz app/build/libs/app-all.jar
+
+
+$ rm ./kafka-sink-connector.tar.gz; rm app/build/libs/app-all.jar; ./gradlew clean build; tar -czvf kafka-sink-connector-dev.tar.gz app/build/libs/app-all.jar; rm /Users/kang/Desktop/workspace/labs/static/jars/kafka-sink-connector-dev.tar.gz; mv ./kafka-sink-connector-dev.tar.gz /Users/kang/Desktop/workspace/labs/static/jars/kafka-sink-connector-dev.tar.gz
+```
+
 # JAR (Java ARchive)
 - A JAR file is a file format used to package a Java application or library. It can contain multiple Java class files, metadata, and resource files.
 - Advantages:
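Note: a minimal sketch of how the properties documented above are consumed. The `topics` key is handled by the Connect framework itself, while the `kafka.sink.*` and `producer.*` keys are read through the `SinkConnectorConfig` class updated later in this diff; the connector name and all values are illustrative only.

```kotlin
import org.example.config.SinkConnectorConfig

fun main() {
    // Illustrative connector properties matching the table above.
    val props = mapOf(
        "name" to "my-sink-connector",               // hypothetical connector name
        "topics" to "my-topic",                      // consumed by the Connect framework
        "kafka.sink.topic" to "relay-topic",
        "kafka.sink.bootstrap" to "my-kafka:9092",
        "producer.max.request.size" to "20971520"
    )

    val config = SinkConnectorConfig(props)
    println(config.getString(SinkConnectorConfig.SINK_TOPIC))             // relay-topic
    println(config.getString(SinkConnectorConfig.SINK_BOOTSTRAP_SERVER))  // my-kafka:9092
    println(config.getInt(SinkConnectorConfig.PRODUCER_MAX_REQUEST_SIZE)) // 20971520
}
```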
diff --git a/app/src/main/kotlin/org/example/SinkTask.kt b/app/src/main/kotlin/org/example/SinkTask.kt
index 7ed96bf..e98343b 100644
--- a/app/src/main/kotlin/org/example/SinkTask.kt
+++ b/app/src/main/kotlin/org/example/SinkTask.kt
@@ -4,30 +4,40 @@ import com.fasterxml.jackson.databind.ObjectMapper
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.config.ConfigException
+import org.apache.kafka.connect.data.Schema
 import org.apache.kafka.connect.data.Struct
 import org.apache.kafka.connect.sink.SinkRecord
 import org.apache.kafka.connect.sink.SinkTask
 import org.apache.logging.log4j.LogManager
 import org.apache.logging.log4j.Logger
+import org.example.config.SinkConnectorConfig
 import java.util.*
 
 class SinkTask : SinkTask() {
-
-    private lateinit var destTopic: String
+    private lateinit var connectorName: String
+    private lateinit var sinkTopic: String
     private lateinit var producer: KafkaProducer<String, String>
     private val objectMapper = ObjectMapper()
     private val logger: Logger = LogManager.getLogger(SinkTask::class.java)
 
     override fun start(props: Map<String, String>) {
-        // Task initialization
-        destTopic = props["dest.topic"] ?: throw ConfigException("Destination topic must be set")
+        val config: SinkConnectorConfig
+        connectorName = props["name"].orEmpty()
+        config = SinkConnectorConfig(props)
+
+        sinkTopic = config.getString(SinkConnectorConfig.SINK_TOPIC)
 
         val producerProps = Properties()
-        producerProps["bootstrap.servers"] = props["bootstrap.servers"]
-        producerProps["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
-        producerProps["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
+        producerProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = config.getString(SinkConnectorConfig.SINK_BOOTSTRAP_SERVER)
+        producerProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"
+        producerProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"
+        producerProps[ProducerConfig.MAX_REQUEST_SIZE_CONFIG] = config.getInt(SinkConnectorConfig.PRODUCER_MAX_REQUEST_SIZE)
+
+        // NOTE: Must be unique per Connect producer. Multi-task deployments are not considered.
+        producerProps[ProducerConfig.TRANSACTIONAL_ID_CONFIG] = String.format("kafka-sink-connector-%s-%s-%s", connectorName, config.getString(SinkConnectorConfig.SOURCE_TOPIC), config.getString(SinkConnectorConfig.SINK_TOPIC))
+
+        producer = KafkaProducer(producerProps)
+        producer.initTransactions()
 
         /**
          * NOTE: Settings for exactly-once delivery
@@ -57,55 +67,75 @@ class SinkTask : SinkTask() {
          * - This allows the producer to guarantee exactly-once delivery.
          * - Setting a transactional ID enables transactions.
          */
-        producerProps[ProducerConfig.TRANSACTIONAL_ID_CONFIG] = "TRANSACTIONAL_ID_CONFIG"
-
-        producer = KafkaProducer(producerProps)
-        producer.initTransactions()
+//        producerProps[ProducerConfig.TRANSACTIONAL_ID_CONFIG] = "TRANSACTIONAL_ID_CONFIG"
+//
+//        producer = KafkaProducer(producerProps)
+//        producer.initTransactions()
     }
 
     override fun put(records: Collection<SinkRecord>) {
         producer.beginTransaction()
         try {
-            // Process the data and send it to another Kafka topic
             for (record in records) {
-                logger.info("Received record:\n" +
-                    " topic: ${record.topic()}\n" +
-                    " partition: ${record.kafkaPartition()}\n" +
-                    " offset: ${record.kafkaOffset()}\n" +
-                    " key: ${record.key()}\n" +
-                    " value: ${convertStructToJson(record.value())}\n" +
-                    " timestamp: ${record.timestamp()}\n"
-                )
-
-                val structValue = record.value() as? Struct
-                val afterStruct = structValue?.getStruct("after")
-//                if (afterStruct != null) {
-//                    val eventType = afterStruct.getString("type")
-//                    if (eventType == "KillEvent") {
-//                        throw RuntimeException("Encountered a KillEvent")
-//                    }
-//                }
-
-                val key = convertStructToJson(record.key() ?: "")
-                val value = convertStructToJson(record.value() ?: "")
-
-                producer.send(ProducerRecord(destTopic, key, value))
-                logger.info("Record sent to topic $destTopic: key=$key, value=$value")
+                val transformedRecord = transform(record)
+                if (transformedRecord == null) {
+                    continue
+                }
+
+                val key = createSchemaPayloadJson(transformedRecord.key(), transformedRecord.keySchema())
+                val value = createSchemaPayloadJson(transformedRecord.value(), transformedRecord.valueSchema())
+                producer.send(ProducerRecord(sinkTopic, key, value))
             }
             producer.commitTransaction()
         } catch (e: Exception) {
+            logger.error(e.message + " / " + connectorName, e)
             producer.abortTransaction()
             producer.close()
-            throw e;
+            throw e
+        }
+    }
+
+    // NOTE: Transforms the source message into the desired shape. Depends on the Debezium / KafkaEvent decorator message format.
+    private fun transform(record: SinkRecord): SinkRecord? {
+        val valueStruct = record.value() as Struct
+        val afterStruct = valueStruct.getStruct("after")
+        val metadataString = afterStruct.getString("metadata")
+        if (metadataString == null) {
+            return null
+        }
+
+        val metadata = objectMapper.readTree(metadataString)
+        val prefix = metadata["prefix"]?.asText()
+
+        // NOTE: 1. Apply the EventType prefix (InvoiceConfirmEvent -> PaymentInvoiceConfirmEvent)
+        if (!prefix.isNullOrEmpty()) {
+            val type = afterStruct.getString("type")
+            afterStruct.put("type", String.format("%s%s", prefix, type))
+        }
+
+        return SinkRecord(
+            record.topic(),
+            record.kafkaPartition(),
+            record.keySchema(),
+            record.key(),
+            record.valueSchema(),
+            valueStruct,
+            record.kafkaOffset()
+        )
+    }
+
+    private fun createSchemaPayloadJson(data: Any, schema: Schema): String {
+        val schemaMap = convertSchemaToJson(schema)
+        val payload = convertDataToJson(data)
+        val resultMap = mapOf("schema" to schemaMap, "payload" to payload)
+        return objectMapper.writeValueAsString(resultMap)
     }
 
-    private fun convertStructToJson(data: Any?): String {
+    private fun convertDataToJson(data: Any?): Any? {
         return when (data) {
-            is Struct -> objectMapper.writeValueAsString(structToMap(data))
-            is String -> data
-            else -> data?.toString() ?: ""
+            is Struct -> structToMap(data)
+            else -> data
         }
     }
@@ -119,8 +149,27 @@ class SinkTask : SinkTask() {
         return map
     }
 
+    private fun convertSchemaToJson(schema: Schema): Map<String, Any?> {
+        val schemaMap = mutableMapOf<String, Any?>()
+        schemaMap["type"] = schema.type().name.lowercase()
+        schemaMap["name"] = schema.name()
+        schemaMap["version"] = schema.version()
+        schemaMap["parameters"] = schema.parameters()
+        schemaMap["default"] = schema.defaultValue()
+        schemaMap["optional"] = schema.isOptional
+        if (schema.type() == Schema.Type.STRUCT) {
+            val fields = schema.fields().map { field ->
+                val fieldMap = convertSchemaToJson(field.schema()).toMutableMap()
+                fieldMap["field"] = field.name()
+                fieldMap
+            }
+            schemaMap["fields"] = fields
+        }
+
+        return schemaMap.filterValues { it != null }
+    }
+
     override fun stop() {
-        // Task shutdown
         producer.close()
     }
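Note: the prefix handling in `transform()` above can be illustrated with a small, self-contained sketch; the `after` schema and field values below are hypothetical stand-ins for the Debezium-style envelope the task expects.

```kotlin
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
import org.apache.kafka.connect.data.Struct

fun main() {
    // Hypothetical "after" schema with the two fields transform() reads.
    val afterSchema: Schema = SchemaBuilder.struct().name("after")
        .field("type", Schema.STRING_SCHEMA)
        .field("metadata", Schema.OPTIONAL_STRING_SCHEMA)
        .build()

    val after = Struct(afterSchema)
        .put("type", "InvoiceConfirmEvent")
        .put("metadata", """{"prefix":"Payment"}""")

    // Same rule as transform(): read metadata.prefix and prepend it to "type".
    val prefix = ObjectMapper().readTree(after.getString("metadata"))["prefix"]?.asText()
    if (!prefix.isNullOrEmpty()) {
        after.put("type", String.format("%s%s", prefix, after.getString("type")))
    }

    println(after.getString("type")) // PaymentInvoiceConfirmEvent
}
```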
diff --git a/app/src/main/kotlin/org/example/config/SinkConnectorConfig.kt b/app/src/main/kotlin/org/example/config/SinkConnectorConfig.kt
index b5cb212..ad27703 100644
--- a/app/src/main/kotlin/org/example/config/SinkConnectorConfig.kt
+++ b/app/src/main/kotlin/org/example/config/SinkConnectorConfig.kt
@@ -15,9 +15,14 @@ class SinkConnectorConfig(props: Map<String, String>) : AbstractConfig(CONFIG, props)
         const val SINK_BOOTSTRAP_SERVER: String = "kafka.sink.bootstrap"
         const val SINK_BOOTSTRAP_SERVER_DOC: String = "Define sink bootstrap"
 
+        const val PRODUCER_MAX_REQUEST_SIZE: String = "producer.max.request.size"
+        const val PRODUCER_MAX_REQUEST_SIZE_DEFAULT: Int = 1048576 // https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#max-request-size
+        const val PRODUCER_MAX_REQUEST_SIZE_DOC: String = "producer.max.request.size"
+
         var CONFIG: ConfigDef = ConfigDef()
             .define(SOURCE_TOPIC, ConfigDef.Type.STRING, "", Importance.HIGH, SOURCE_TOPIC_DOC)
             .define(SINK_TOPIC, ConfigDef.Type.STRING, "", Importance.HIGH, SINK_TOPIC_DOC)
-            .define(SINK_BOOTSTRAP_SERVER, ConfigDef.Type.STRING, "", Importance.HIGH, SINK_BOOTSTRAP_SERVER_DOC) // timestamp parsing
+            .define(SINK_BOOTSTRAP_SERVER, ConfigDef.Type.STRING, "", Importance.HIGH, SINK_BOOTSTRAP_SERVER_DOC)
+            .define(PRODUCER_MAX_REQUEST_SIZE, ConfigDef.Type.INT, PRODUCER_MAX_REQUEST_SIZE_DEFAULT, Importance.LOW, PRODUCER_MAX_REQUEST_SIZE_DOC)
     }
 }
\ No newline at end of file
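Note: a quick sketch of the default behaviour of the new `producer.max.request.size` entry; when the key is omitted, `getInt` falls back to `PRODUCER_MAX_REQUEST_SIZE_DEFAULT` (1048576 bytes). Values are illustrative only.

```kotlin
import org.example.config.SinkConnectorConfig

fun main() {
    // Only the sink keys are set; producer.max.request.size is intentionally omitted.
    val config = SinkConnectorConfig(
        mapOf(
            "kafka.sink.topic" to "relay-topic",
            "kafka.sink.bootstrap" to "my-kafka:9092"
        )
    )

    // Resolves to the default defined in the ConfigDef above.
    println(config.getInt(SinkConnectorConfig.PRODUCER_MAX_REQUEST_SIZE)) // 1048576
}
```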