Skip to content

Commit

Permalink
Fix: modify createKafkaSink (#87)
Browse files Browse the repository at this point in the history
* Fix: modify createKafkaSink

* Fix: remove annotation
  • Loading branch information
junyoungcross authored Dec 12, 2024
1 parent d1bc0a9 commit a2cd1a2
Showing 1 changed file with 21 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -329,26 +329,29 @@ private void createKafkaSink(Map<String, Object> sink) {
logger.info("Creating Kafka sink...");
String flinkSchema = getSchemaFromTransform(sink.get("from").toString());
String sql = String.format(
"CREATE TABLE %s (%s) WITH (" +
" 'connector' = 'kafka'," +
" 'topoc' = '%s'," +
" 'properties.bootstrap.servers' = '%s'," +
" 'properties.security.protocol' = 'SASL_SSL'," +
" 'properties.ssl.truststore.location' = '%s'," +
" 'properties.ssl.truststore.password' = '%s'," +
" 'properties.sasl.mechanism' = 'PLAIN'," +
" 'properties.ssl.endpoint.identification.algorithm' = ''," +
" 'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";',"
+
")",
sink.get("name"), sink.get("kafka_servers"),
sink.get("truststore_location"), sink.get("truststore_password"),
((Map<String, Object>) sink.get("config")).get("username"),
((Map<String, Object>) sink.get("config")).get("password")
"CREATE TABLE %s (%s) WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = '%s'," +
" 'properties.bootstrap.servers' = '%s'," +
" 'properties.security.protocol' = 'SASL_SSL'," +
" 'properties.ssl.truststore.location' = '%s'," +
" 'properties.ssl.truststore.password' = '%s'," +
" 'properties.sasl.mechanism' = 'PLAIN'," +
" 'properties.ssl.endpoint.identification.algorithm' = ''," +
" 'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\"'" +
")",
sink.get("name"), flinkSchema,
sink.get("topic"),
sink.get("kafka_servers"),
sink.get("truststore_location"),
sink.get("truststore_password"),
((Map<String, Object>) sink.get("config")).get("username"),
((Map<String, Object>) sink.get("config")).get("password")
);
logger.info("Executing SQL for StarRocks sink: {}", sql);

logger.info("Executing SQL for Kafka sink: {}", sql);
tEnv.executeSql(sql);
logger.info("StarRocks sink created successfully.");
logger.info("Kafka sink created successfully.");
}

private void createPostgresSink(Map<String, Object> sink) {
Expand Down

0 comments on commit a2cd1a2

Please sign in to comment.