Skip to content

Commit

Permalink
[Fix][Engine] Fix flink engine issue (#516)
Browse files Browse the repository at this point in the history
  • Loading branch information
zixi0825 authored Feb 15, 2025
1 parent dd31be1 commit 8cbeadd
Show file tree
Hide file tree
Showing 11 changed files with 280 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ default String getQuoteIdentifier() {
}

default String getTableExistsQuery(String table) {
return String.format("SELECT * FROM %s WHERE 1=0", table);
return String.format("SELECT 1 FROM %s WHERE 1=0", table);
}

default String getSchemaQuery(String table) {
return String.format("SELECT * FROM %s WHERE 1=0", table);
return String.format("SELECT 1 FROM %s WHERE 1=0", table);
}

default String getCountQuery(String table) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ public class HiveParameterConverter extends JdbcParameterConverter {

@Override
protected String getUrl(Map<String, Object> parameter) {

StringBuilder address = new StringBuilder();
address.append("jdbc:hive2://");
Object port = parameter.get(PORT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,4 @@ protected String getSeparator() {
return ";";
}

@Override
public String getJdbcUrl() {
return super.getJdbcUrl();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@
import io.datavines.engine.flink.api.FlinkRuntimeEnvironment;

public interface FlinkStreamSink extends Component {

/**
* 输出数据流
*/
void output(DataStream<Row> dataStream, FlinkRuntimeEnvironment environment);

void output(DataStream<Row> dataStream, FlinkRuntimeEnvironment environment) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,13 @@ protected List<SourceConfig> getSourceConfigs() throws DataVinesException {
String tableAlias = getTableAlias(metricInputParameter.get(DATABASE), metricInputParameter.get(SCHEMA), metricInputParameter.get(TABLE), "1");
connectorParameterMap.put(OUTPUT_TABLE, outputTable);
connectorParameterMap.put(DRIVER, connectorFactory.getDialect().getDriver());
connectorParameterMap.put(SRC_CONNECTOR_TYPE, connectorParameter.getType());
table2OutputTable.put(table, outputTable);

metricInputParameter.put(TABLE, outputTable);
metricInputParameter.put(TABLE_ALIAS, tableAlias);
metricInputParameter.put(COLUMN, metricInputParameter.get(COLUMN));
metricInputParameter.put(SRC_CONNECTOR_TYPE, connectorParameter.getType());
// metricInputParameter.put(ENGINE_TYPE, SPARK);

String connectorUUID = connectorFactory.getConnectorParameterConverter().getConnectorUUID(connectorParameterMap);

Expand Down Expand Up @@ -174,12 +174,12 @@ protected List<SourceConfig> getSourceConfigs() throws DataVinesException {
String tableAlias = getTableAlias(metricInputParameter.get(DATABASE), metricInputParameter.get(SCHEMA), metricInputParameter.get(TABLE), "1");
connectorParameterMap.put(OUTPUT_TABLE, outputTable);
connectorParameterMap.put(DRIVER, connectorFactory.getDialect().getDriver());
connectorParameterMap.put(SRC_CONNECTOR_TYPE, connectorParameter.getType());

metricInputParameter.put(TABLE, outputTable);
metricInputParameter.put(TABLE_ALIAS, tableAlias);
metricInputParameter.put(COLUMN, metricInputParameter.get(COLUMN));
metricInputParameter.put(SRC_CONNECTOR_TYPE, connectorParameter.getType());
// metricInputParameter.put(ENGINE_TYPE, SPARK);

String connectorUUID = connectorFactory.getConnectorParameterConverter().getConnectorUUID(connectorParameterMap);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,11 @@
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
</dependency>

<dependency>
<groupId>io.datavines</groupId>
<artifactId>datavines-engine-common</artifactId>
<version>${project.version}</version>
</dependency>

</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
*/
package io.datavines.engine.flink.jdbc.sink;

import io.datavines.common.utils.StringUtils;
import io.datavines.common.utils.ThreadUtils;
import io.datavines.engine.common.utils.ParserUtils;
import io.datavines.engine.flink.api.entity.FLinkColumnInfo;
import io.datavines.engine.flink.jdbc.utils.FlinkTableUtils;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -30,9 +33,14 @@
import io.datavines.engine.flink.api.FlinkRuntimeEnvironment;
import io.datavines.engine.flink.api.stream.FlinkStreamSink;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

import static io.datavines.common.ConfigConstants.*;
Expand All @@ -46,7 +54,7 @@ public class JdbcSink implements FlinkStreamSink {
private final List<FLinkColumnInfo> columns = new ArrayList<>();

@Override
public void output(DataStream<Row> dataStream, FlinkRuntimeEnvironment environment) {
public void output(DataStream<Row> dataStream, FlinkRuntimeEnvironment environment) throws Exception{
String sql = config.getString(SQL).replace("\\n", " ").replaceAll("/", "");
Table table = environment.getTableEnv().sqlQuery(sql);
ResolvedSchema schema = table.getResolvedSchema();
Expand All @@ -56,9 +64,10 @@ public void output(DataStream<Row> dataStream, FlinkRuntimeEnvironment environme
columnInfo.setDataType(column.getDataType().getLogicalType().asSerializableString());
columns.add(columnInfo);
});

checkTableNotExistAndCreate();

String createTableSql = FlinkTableUtils.generateCreateTableStatement(config.getString(OUTPUT_TABLE), config.getString(TABLE), columns, config);
String createTableSql = FlinkTableUtils.generateCreateTableStatement(config.getString(DATABASE), config.getString(OUTPUT_TABLE), config.getString(TABLE), columns, config);
log.info("sink create table sql: {}", createTableSql);
environment.getTableEnv().executeSql(createTableSql);
table.executeInsert(config.getString(OUTPUT_TABLE));
Expand Down Expand Up @@ -101,8 +110,58 @@ public CheckResult checkConfig() {
public void prepare(RuntimeEnvironment env) throws Exception {
}

private void checkTableNotExistAndCreate() {
// Check if the table exists
// If not, create the table
private void checkTableNotExistAndCreate() throws Exception {
    // Probe the sink table with a zero-row query; if the probe still fails after
    // all retries, create the table from the columns resolved in output().
    String jdbcUrl = config.getString(URL);
    String user = config.getString(USER);
    String password = config.getString(PASSWORD);
    // "WHERE 1=0" returns no rows, so this only tests existence cheaply.
    // SELECT 1 (not *) matches the dialect's getTableExistsQuery convention.
    String query = String.format("SELECT 1 FROM %s WHERE 1=0", config.getString(DATABASE) + "." + config.getString(TABLE));

    Properties properties = new Properties();
    properties.setProperty(USER, user);
    if (!StringUtils.isEmptyOrNullStr(password)) {
        // Stored passwords are encoded; decode before handing them to the driver.
        properties.setProperty(PASSWORD, ParserUtils.decode(password));
    }

    // Extra connection properties may be appended to the url as ?k1=v1&k2=v2.
    String[] url2Array = jdbcUrl.split("\\?");
    String url = url2Array[0];
    if (url2Array.length > 1) {
        for (String prop : url2Array[1].split("&")) {
            String[] values = prop.split("=");
            properties.setProperty(values[0], values[1]);
        }
    }

    boolean tableExists = false;
    // try-with-resources: the Connection was previously never closed (leak).
    // DriverManager.getConnection never returns null, so no null check is needed.
    try (Connection conn = DriverManager.getConnection(url, properties)) {
        int retryTimes = 3;
        while (retryTimes > 0 && !tableExists) {
            // try-with-resources also replaces the manual finally/close block.
            try (PreparedStatement statement = conn.prepareStatement(query)) {
                // setQueryTimeout is in SECONDS; the previous 30000 was ~8.3 hours.
                statement.setQueryTimeout(30);
                statement.execute();
                tableExists = true;
            } catch (SQLException e) {
                log.error("tableExists error : ", e);
                retryTimes--;
                ThreadUtils.sleep(2000);
            }
        }

        if (!tableExists) {
            String createTableSql = FlinkTableUtils.generateCreateTableStatement(config.getString(DATABASE), config.getString(TABLE), columns);
            try (PreparedStatement create = conn.prepareStatement(createTableSql)) {
                create.execute();
            }
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.datavines.engine.flink.jdbc.source;

import io.datavines.common.utils.StringUtils;
import io.datavines.engine.common.utils.ParserUtils;
import io.datavines.engine.flink.api.entity.FLinkColumnInfo;
import io.datavines.engine.flink.jdbc.utils.FlinkTableUtils;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -46,7 +47,7 @@ public class JdbcSource implements FlinkStreamSource {
@Override
public DataStream<Row> getData(FlinkRuntimeEnvironment environment) throws Exception {
getRowTypeInfo(config.getString(URL), config.getString(USER), config.getString(PASSWORD), "select * from " + config.getString(TABLE));
String createTableSql = FlinkTableUtils.generateCreateTableStatement(config.getString(OUTPUT_TABLE), config.getString(TABLE), columns, config);
String createTableSql = FlinkTableUtils.generateCreateTableStatement(config.getString(DATABASE), config.getString(OUTPUT_TABLE), config.getString(TABLE), columns, config);
log.info("source create table sql: {}", createTableSql);
environment.getTableEnv().executeSql(createTableSql);
return null;
Expand All @@ -56,7 +57,7 @@ private void getRowTypeInfo(String jdbcUrl, String user, String password, String
Properties properties = new Properties();
properties.setProperty(USER, user);
if (!StringUtils.isEmptyOrNullStr(password)) {
properties.setProperty(PASSWORD, password);
properties.setProperty(PASSWORD, ParserUtils.decode(password));
}

String[] url2Array = jdbcUrl.split("\\?");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.datavines.engine.flink.jdbc.utils;

import java.util.regex.*;

/**
 * Rewrites the database (or schema) segment of a JDBC url to point at a new database,
 * handling the url layout of each supported connector type.
 *
 * <p>All methods are static and stateless; {@link Pattern} instances are thread-safe,
 * so this class is safe for concurrent use.
 */
public class DatabaseUrlReplacer {

    // Compiled once instead of per call; Pattern is immutable and thread-safe.
    // Generic "jdbc:<scheme>://host[:port][/db][?params]" shape. Previously this was
    // anchored on "jdbc:mysql://", so PostgreSQL urls (and the unknown-scheme
    // fallback) were silently returned unchanged.
    private static final Pattern PATH_STYLE_PATTERN =
            Pattern.compile("^(jdbc:[^/]+//[^/?]+)(/[^?]*)?(\\?.*)?$");
    private static final Pattern ORACLE_SID_PATTERN =
            Pattern.compile("(jdbc:oracle:thin:@[^:]+:\\d+:)(\\w+)");
    private static final Pattern ORACLE_SERVICE_PATTERN =
            Pattern.compile("(jdbc:oracle:thin:@//[^/]+/)(\\w+)");
    private static final Pattern CLICKHOUSE_PATTERN =
            Pattern.compile("(jdbc:clickhouse://[^/]+)(/[^?]*)?(\\?.*)?");

    /**
     * Returns {@code url} with its database/schema part replaced by {@code newDb}.
     *
     * @param url   a JDBC url (mysql, postgresql, sqlserver, oracle, clickhouse,
     *              trino, presto, hive2 or dm); unknown schemes are treated as
     *              path-style urls ("/db" after the authority)
     * @param newDb the database (schema for DM) name to substitute
     * @return the rewritten url; if the url does not match the expected shape for
     *         its scheme it is returned unchanged
     */
    public static String replaceDatabase(String url, String newDb) throws IllegalArgumentException {
        if (url.startsWith("jdbc:mysql:")) {
            return processPathStyle(url, newDb);
        } else if (url.startsWith("jdbc:postgresql:")) {
            return processPathStyle(url, newDb);
        } else if (url.startsWith("jdbc:sqlserver:")) {
            return processSqlServer(url, newDb);
        } else if (url.startsWith("jdbc:oracle:")) {
            return processOracle(url, newDb);
        } else if (url.startsWith("jdbc:clickhouse:")) {
            return processClickhouse(url, newDb);
        } else if (url.startsWith("jdbc:trino:")) {
            return processTrino(url, newDb);
        } else if (url.startsWith("jdbc:presto:")) {
            return processPresto(url, newDb);
        } else if (url.startsWith("jdbc:hive2:")) {
            return processHive(url, newDb);
        } else if (url.startsWith("jdbc:dm:")) {
            return processDm(url, newDb);
        } else {
            // Best-effort fallback for unknown schemes that use a path-style db.
            return processPathStyle(url, newDb);
        }
    }

    /** Replaces the "/db" path segment of a path-style url, preserving any query string. */
    private static String processPathStyle(String url, String newDb) {
        Matcher matcher = PATH_STYLE_PATTERN.matcher(url);
        if (matcher.find()) {
            String protocol = matcher.group(1);
            String query = matcher.group(3) != null ? matcher.group(3) : "";
            return protocol + "/" + newDb + query;
        }
        return url;
    }

    /**
     * Rewrites the {@code databaseName} property of a SQL Server url
     * ("jdbc:sqlserver://host;databaseName=db;k=v"), appending it when absent.
     */
    private static String processSqlServer(String url, String newDb) {
        String[] parts = url.split(";", 2);
        String hostPart = parts[0];
        String params = parts.length > 1 ? parts[1] : "";
        StringBuilder newParams = new StringBuilder();
        boolean replaced = false;
        for (String param : params.split(";")) {
            if (!param.contains("=")) {
                continue;
            }
            // Re-insert the ';' separator; it was previously dropped, mangling
            // any url with more than one property.
            if (newParams.length() > 0) {
                newParams.append(';');
            }
            String[] paramParts = param.split("=", 2);
            if (paramParts[0].equalsIgnoreCase("databaseName")) {
                newParams.append("databaseName=").append(newDb);
                replaced = true;
            } else {
                newParams.append(param);
            }
        }
        if (!replaced) {
            if (newParams.length() > 0) {
                newParams.append(';');
            }
            newParams.append("databaseName=").append(newDb);
        }
        return hostPart + ";" + newParams;
    }

    /** Replaces the SID ("host:port:sid") or service name ("//host:port/service") part. */
    private static String processOracle(String url, String newDb) {
        Matcher sidMatcher = ORACLE_SID_PATTERN.matcher(url);
        if (sidMatcher.find()) {
            return sidMatcher.group(1) + newDb;
        }
        Matcher serviceMatcher = ORACLE_SERVICE_PATTERN.matcher(url);
        if (serviceMatcher.find()) {
            return serviceMatcher.group(1) + newDb;
        }
        return url;
    }

    /**
     * Replaces the path database of a ClickHouse url and strips any redundant
     * {@code database=} query parameter so the two cannot disagree.
     */
    private static String processClickhouse(String url, String newDb) {
        Matcher matcher = CLICKHOUSE_PATTERN.matcher(url);
        if (matcher.find()) {
            String protocol = matcher.group(1);
            String query = matcher.group(3) != null ? matcher.group(3) : "";
            query = query.replaceAll("([?&])database=[^&]*", "");
            // Stripping "?database=x" from "?database=x&a=1" leaves "&a=1";
            // restore the leading '?' so the url stays well-formed.
            if (!query.isEmpty() && query.charAt(0) == '&') {
                query = "?" + query.substring(1);
            }
            return protocol + "/" + newDb + query;
        }
        return url;
    }

    /** Replaces the final path segment (the schema) of a Trino url. */
    private static String processTrino(String url, String newDb) {
        return url.replaceAll("(jdbc:trino://[^/]+(/[^/]+)*/)([^/?#]+)", "$1" + newDb);
    }

    /** Replaces the final path segment (the schema) of a Presto url. */
    private static String processPresto(String url, String newDb) {
        return url.replaceAll("(jdbc:presto://[^/]+(/[^/]+)*/)([^/?#]+)", "$1" + newDb);
    }

    /** Replaces the database segment of a HiveServer2 url ("jdbc:hive2://host:port/db"). */
    private static String processHive(String url, String newDb) {
        return url.replaceAll("(jdbc:hive2://[^/]+/)([^/?]*)", "$1" + newDb);
    }

    /**
     * Rewrites the {@code schema} property of a DM (Dameng) url, appending it when
     * absent. NOTE(review): properties are split on '?' as in the original code —
     * confirm against the DM JDBC url format actually used by the connectors.
     */
    private static String processDm(String url, String newDb) {
        String[] parts = url.split(";", 2);
        String hostPart = parts[0];
        String params = parts.length > 1 ? parts[1] : "";
        StringBuilder newParams = new StringBuilder();
        boolean replaced = false;
        for (String param : params.split("\\?")) {
            if (!param.contains("=")) {
                continue;
            }
            // Separator was previously dropped between properties.
            if (newParams.length() > 0) {
                newParams.append(';');
            }
            String[] paramParts = param.split("=", 2);
            if (paramParts[0].equalsIgnoreCase("schema")) {
                newParams.append("schema=").append(newDb);
                replaced = true;
            } else {
                newParams.append(param);
            }
        }
        if (!replaced) {
            if (newParams.length() > 0) {
                newParams.append(';');
            }
            newParams.append("schema=").append(newDb);
        }
        return hostPart + ";" + newParams;
    }
}
Loading

0 comments on commit 8cbeadd

Please sign in to comment.