From a2cd1a23b59a0639cc6e0c1da5c161a95b37076c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EA=B9=80=EC=A4=80=EC=98=81?= <96804462+junyoungcross@users.noreply.github.com> Date: Thu, 12 Dec 2024 16:49:25 +0900 Subject: [PATCH] Fix: modify createKafkaSink (#87) * Fix: modify createKafkaSink * Fix: remove annotation --- .../chainbase/manuscript/ETLProcessor.java | 39 ++++++++++--------- 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/flink/runner/src/main/java/com/chainbase/manuscript/ETLProcessor.java b/flink/runner/src/main/java/com/chainbase/manuscript/ETLProcessor.java index bb727a5..4f6fb9d 100644 --- a/flink/runner/src/main/java/com/chainbase/manuscript/ETLProcessor.java +++ b/flink/runner/src/main/java/com/chainbase/manuscript/ETLProcessor.java @@ -329,26 +329,29 @@ private void createKafkaSink(Map sink) { logger.info("Creating Kafka sink..."); String flinkSchema = getSchemaFromTransform(sink.get("from").toString()); String sql = String.format( - "CREATE TABLE %s (%s) WITH (" + - " 'connector' = 'kafka'," + - " 'topoc' = '%s'," + - " 'properties.bootstrap.servers' = '%s'," + - " 'properties.security.protocol' = 'SASL_SSL'," + - " 'properties.ssl.truststore.location' = '%s'," + - " 'properties.ssl.truststore.password' = '%s'," + - " 'properties.sasl.mechanism' = 'PLAIN'," + - " 'properties.ssl.endpoint.identification.algorithm' = ''," + - " 'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";'," - + - ")", - sink.get("name"), sink.get("kafka_servers"), - sink.get("truststore_location"), sink.get("truststore_password"), - ((Map) sink.get("config")).get("username"), - ((Map) sink.get("config")).get("password") + "CREATE TABLE %s (%s) WITH (" + + " 'connector' = 'kafka'," + + " 'topic' = '%s'," + + " 'properties.bootstrap.servers' = '%s'," + + " 'properties.security.protocol' = 'SASL_SSL'," + + " 'properties.ssl.truststore.location' = '%s'," + + " 'properties.ssl.truststore.password' = '%s'," + + " 'properties.sasl.mechanism' = 'PLAIN'," + + " 'properties.ssl.endpoint.identification.algorithm' = ''," + + " 'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\"'" + + ")", + sink.get("name"), flinkSchema, + sink.get("topic"), + sink.get("kafka_servers"), + sink.get("truststore_location"), + sink.get("truststore_password"), + ((Map) sink.get("config")).get("username"), + ((Map) sink.get("config")).get("password") ); - logger.info("Executing SQL for StarRocks sink: {}", sql); + + logger.info("Executing SQL for Kafka sink: {}", sql); tEnv.executeSql(sql); - logger.info("StarRocks sink created successfully."); + logger.info("Kafka sink created successfully."); } private void createPostgresSink(Map sink) {