Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix][Engine] Fix flink engine issue #516

Merged
merged 2 commits into from
Feb 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ default String getQuoteIdentifier() {
}

default String getTableExistsQuery(String table) {
return String.format("SELECT * FROM %s WHERE 1=0", table);
return String.format("SELECT 1 FROM %s WHERE 1=0", table);
}

default String getSchemaQuery(String table) {
return String.format("SELECT * FROM %s WHERE 1=0", table);
return String.format("SELECT 1 FROM %s WHERE 1=0", table);
}

default String getCountQuery(String table) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ public class HiveParameterConverter extends JdbcParameterConverter {

@Override
protected String getUrl(Map<String, Object> parameter) {

StringBuilder address = new StringBuilder();
address.append("jdbc:hive2://");
Object port = parameter.get(PORT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,4 @@ protected String getSeparator() {
return ";";
}

@Override
public String getJdbcUrl() {
return super.getJdbcUrl();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@
import io.datavines.engine.flink.api.FlinkRuntimeEnvironment;

public interface FlinkStreamSink extends Component {

/**
* 输出数据流
*/
void output(DataStream<Row> dataStream, FlinkRuntimeEnvironment environment);

void output(DataStream<Row> dataStream, FlinkRuntimeEnvironment environment) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,13 @@ protected List<SourceConfig> getSourceConfigs() throws DataVinesException {
String tableAlias = getTableAlias(metricInputParameter.get(DATABASE), metricInputParameter.get(SCHEMA), metricInputParameter.get(TABLE), "1");
connectorParameterMap.put(OUTPUT_TABLE, outputTable);
connectorParameterMap.put(DRIVER, connectorFactory.getDialect().getDriver());
connectorParameterMap.put(SRC_CONNECTOR_TYPE, connectorParameter.getType());
table2OutputTable.put(table, outputTable);

metricInputParameter.put(TABLE, outputTable);
metricInputParameter.put(TABLE_ALIAS, tableAlias);
metricInputParameter.put(COLUMN, metricInputParameter.get(COLUMN));
metricInputParameter.put(SRC_CONNECTOR_TYPE, connectorParameter.getType());
// metricInputParameter.put(ENGINE_TYPE, SPARK);

String connectorUUID = connectorFactory.getConnectorParameterConverter().getConnectorUUID(connectorParameterMap);

Expand Down Expand Up @@ -174,12 +174,12 @@ protected List<SourceConfig> getSourceConfigs() throws DataVinesException {
String tableAlias = getTableAlias(metricInputParameter.get(DATABASE), metricInputParameter.get(SCHEMA), metricInputParameter.get(TABLE), "1");
connectorParameterMap.put(OUTPUT_TABLE, outputTable);
connectorParameterMap.put(DRIVER, connectorFactory.getDialect().getDriver());
connectorParameterMap.put(SRC_CONNECTOR_TYPE, connectorParameter.getType());

metricInputParameter.put(TABLE, outputTable);
metricInputParameter.put(TABLE_ALIAS, tableAlias);
metricInputParameter.put(COLUMN, metricInputParameter.get(COLUMN));
metricInputParameter.put(SRC_CONNECTOR_TYPE, connectorParameter.getType());
// metricInputParameter.put(ENGINE_TYPE, SPARK);

String connectorUUID = connectorFactory.getConnectorParameterConverter().getConnectorUUID(connectorParameterMap);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,11 @@
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
</dependency>

<dependency>
<groupId>io.datavines</groupId>
<artifactId>datavines-engine-common</artifactId>
<version>${project.version}</version>
</dependency>

</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
*/
package io.datavines.engine.flink.jdbc.sink;

import io.datavines.common.utils.StringUtils;
import io.datavines.common.utils.ThreadUtils;
import io.datavines.engine.common.utils.ParserUtils;
import io.datavines.engine.flink.api.entity.FLinkColumnInfo;
import io.datavines.engine.flink.jdbc.utils.FlinkTableUtils;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -30,9 +33,14 @@
import io.datavines.engine.flink.api.FlinkRuntimeEnvironment;
import io.datavines.engine.flink.api.stream.FlinkStreamSink;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

import static io.datavines.common.ConfigConstants.*;
Expand All @@ -46,7 +54,7 @@ public class JdbcSink implements FlinkStreamSink {
private final List<FLinkColumnInfo> columns = new ArrayList<>();

@Override
public void output(DataStream<Row> dataStream, FlinkRuntimeEnvironment environment) {
public void output(DataStream<Row> dataStream, FlinkRuntimeEnvironment environment) throws Exception{
String sql = config.getString(SQL).replace("\\n", " ").replaceAll("/", "");
Table table = environment.getTableEnv().sqlQuery(sql);
ResolvedSchema schema = table.getResolvedSchema();
Expand All @@ -56,9 +64,10 @@ public void output(DataStream<Row> dataStream, FlinkRuntimeEnvironment environme
columnInfo.setDataType(column.getDataType().getLogicalType().asSerializableString());
columns.add(columnInfo);
});

checkTableNotExistAndCreate();

String createTableSql = FlinkTableUtils.generateCreateTableStatement(config.getString(OUTPUT_TABLE), config.getString(TABLE), columns, config);
String createTableSql = FlinkTableUtils.generateCreateTableStatement(config.getString(DATABASE), config.getString(OUTPUT_TABLE), config.getString(TABLE), columns, config);
log.info("sink create table sql: {}", createTableSql);
environment.getTableEnv().executeSql(createTableSql);
table.executeInsert(config.getString(OUTPUT_TABLE));
Expand Down Expand Up @@ -101,8 +110,58 @@ public CheckResult checkConfig() {
public void prepare(RuntimeEnvironment env) throws Exception {
}

private void checkTableNotExistAndCreate() {
// Check if the table exists
// If not, create the table
private void checkTableNotExistAndCreate() throws Exception {
String jdbcUrl = config.getString(URL);
String user = config.getString(USER);
String password = config.getString(PASSWORD);
String query = String.format("SELECT * FROM %s WHERE 1=0", config.getString(DATABASE) + "." + config.getString(TABLE));

Properties properties = new Properties();
properties.setProperty(USER, user);
if (!StringUtils.isEmptyOrNullStr(password)) {
properties.setProperty(PASSWORD, ParserUtils.decode(password));
}

String[] url2Array = jdbcUrl.split("\\?");
String url = url2Array[0];
if (url2Array.length > 1) {
String[] keyArray = url2Array[1].split("&");
for (String prop : keyArray) {
String[] values = prop.split("=");
properties.setProperty(values[0], values[1]);
}
}

boolean tableExists = false;
Connection conn = DriverManager.getConnection(url, properties);
int retryTimes = 3;
while (retryTimes > 0 && !tableExists) {
if (conn != null) {
PreparedStatement statement = null;
try {
statement = conn.prepareStatement(query);
statement.setQueryTimeout(30000);
statement.execute();
tableExists = true;
} catch (SQLException e) {
log.error("tableExists error : ", e);
retryTimes--;
ThreadUtils.sleep(2000);
} finally {
if (statement != null) {
try {
statement.close();
} catch (SQLException e){
log.error("close statement error : ", e);
}
}
}
}
}

if (!tableExists) {
String createTableSql = FlinkTableUtils.generateCreateTableStatement(config.getString(DATABASE), config.getString(TABLE), columns);
conn.prepareStatement(createTableSql).execute();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.datavines.engine.flink.jdbc.source;

import io.datavines.common.utils.StringUtils;
import io.datavines.engine.common.utils.ParserUtils;
import io.datavines.engine.flink.api.entity.FLinkColumnInfo;
import io.datavines.engine.flink.jdbc.utils.FlinkTableUtils;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -46,7 +47,7 @@ public class JdbcSource implements FlinkStreamSource {
@Override
public DataStream<Row> getData(FlinkRuntimeEnvironment environment) throws Exception {
getRowTypeInfo(config.getString(URL), config.getString(USER), config.getString(PASSWORD), "select * from " + config.getString(TABLE));
String createTableSql = FlinkTableUtils.generateCreateTableStatement(config.getString(OUTPUT_TABLE), config.getString(TABLE), columns, config);
String createTableSql = FlinkTableUtils.generateCreateTableStatement(config.getString(DATABASE), config.getString(OUTPUT_TABLE), config.getString(TABLE), columns, config);
log.info("source create table sql: {}", createTableSql);
environment.getTableEnv().executeSql(createTableSql);
return null;
Expand All @@ -56,7 +57,7 @@ private void getRowTypeInfo(String jdbcUrl, String user, String password, String
Properties properties = new Properties();
properties.setProperty(USER, user);
if (!StringUtils.isEmptyOrNullStr(password)) {
properties.setProperty(PASSWORD, password);
properties.setProperty(PASSWORD, ParserUtils.decode(password));
}

String[] url2Array = jdbcUrl.split("\\?");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.datavines.engine.flink.jdbc.utils;

import java.util.regex.*;

public class DatabaseUrlReplacer {

public static String replaceDatabase(String url, String newDb) throws IllegalArgumentException {
if (url.startsWith("jdbc:mysql:")) {
return processMysql(url, newDb);
} else if (url.startsWith("jdbc:postgresql:")) {
return processPostgresql(url, newDb);
} else if (url.startsWith("jdbc:sqlserver:")) {
return processSqlServer(url, newDb);
} else if (url.startsWith("jdbc:oracle:")) {
return processOracle(url, newDb);
} else if (url.startsWith("jdbc:clickhouse:")) {
return processClickhouse(url, newDb);
} else if (url.startsWith("jdbc:trino:")) {
return processTrino(url, newDb);
} else if (url.startsWith("jdbc:presto:")) {
return processPresto(url, newDb);
} else if (url.startsWith("jdbc:hive2:")) {
return processHive(url, newDb);
} else if (url.startsWith("jdbc:dm:")) {
return processDm(url, newDb);
} else {
return processMysql(url, newDb);
}
}

private static String processMysql(String url, String newDb) {
Pattern pattern = Pattern.compile("^(jdbc:mysql://[^/]+)(/?[^?]*)?(\\?.*)?$");
Matcher matcher = pattern.matcher(url);
if (matcher.find()) {
String protocol = matcher.group(1);
String path = matcher.group(2) != null ? matcher.group(2) : "";
String query = matcher.group(3) != null ? matcher.group(3) : "";
path = "/" + newDb;
return protocol + path + query;
}
return url;
}

private static String processPostgresql(String url, String newDb) {
return processMysql(url, newDb);
}

private static String processSqlServer(String url, String newDb) {
String[] parts = url.split(";", 2);
String hostPart = parts[0];
String params = parts.length > 1 ? parts[1] : "";
String[] paramsArray = params.split(";");
StringBuilder newParams = new StringBuilder();
for (String param : paramsArray) {
if (param.contains("=")) {
String[] paramParts = param.split("=", 2);
if (paramParts[0].equalsIgnoreCase("databaseName")) {
newParams.append("databaseName=").append(newDb);
} else {
newParams.append(param);
}
}
}
return hostPart + ";" + newParams;
}

private static String processOracle(String url, String newDb) {
Pattern sidPattern = Pattern.compile("(jdbc:oracle:thin:@[^:]+:\\d+:)(\\w+)");
Matcher sidMatcher = sidPattern.matcher(url);
if (sidMatcher.find()) {
return sidMatcher.group(1) + newDb;
}
Pattern servicePattern = Pattern.compile("(jdbc:oracle:thin:@//[^/]+/)(\\w+)");
Matcher serviceMatcher = servicePattern.matcher(url);
if (serviceMatcher.find()) {
return serviceMatcher.group(1) + newDb;
}
return url;
}

private static String processClickhouse(String url, String newDb) {
Pattern pattern = Pattern.compile("(jdbc:clickhouse://[^/]+)(/[^?]*)?(\\?.*)?");
Matcher matcher = pattern.matcher(url);
if (matcher.find()) {
String protocol = matcher.group(1);
String path = matcher.group(2) != null ? matcher.group(2) : "";
String query = matcher.group(3) != null ? matcher.group(3) : "";
path = "/" + newDb;
query = query.replaceAll("([?&])database=[^&]*", "");
return protocol + path + query;
}
return url;
}

private static String processTrino(String url, String newDb) {
return url.replaceAll("(jdbc:trino://[^/]+(/[^/]+)*/)([^/?#]+)", "$1" + newDb);
}

private static String processPresto(String url, String newDb) {
return url.replaceAll("(jdbc:presto://[^/]+(/[^/]+)*/)([^/?#]+)", "$1" + newDb);
}

private static String processHive(String url, String newDb) {
return url.replaceAll("(jdbc:hive2://[^/]+/)([^/?]*)", "$1" + newDb);
}

private static String processDm(String url, String newDb) {
String[] parts = url.split(";", 2);
String hostPart = parts[0];
String params = parts.length > 1 ? parts[1] : "";
String[] paramsArray = params.split("\\?");
StringBuilder newParams = new StringBuilder();
for (String param : paramsArray) {
if (param.contains("=")) {
String[] paramParts = param.split("=", 2);
if (paramParts[0].equalsIgnoreCase("schema")) {
newParams.append("schema=").append(newDb);
} else {
newParams.append(param);
}
}
}
return hostPart + ";" + newParams;
}
}
Loading
Loading