Skip to content

Commit

Permalink
[Refactor][Engine] Refactor flink engine (#509)
Browse files Browse the repository at this point in the history
  • Loading branch information
zixi0825 authored Feb 9, 2025
1 parent 46c31e9 commit fdb9d32
Show file tree
Hide file tree
Showing 91 changed files with 2,000 additions and 1,509 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Base64;
import java.util.*;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -46,6 +46,8 @@ public static String getMd5(String src, boolean isUpper) {

if (isUpper) {
md5 = md5.toUpperCase();
} else {
md5 = md5.toLowerCase();
}

return md5;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ default String quoteIdentifier(String entity) {
return entity;
}

default String getQuoteIdentifier() {
return "";
}

default String getTableExistsQuery(String table) {
return String.format("SELECT * FROM %s WHERE 1=0", table);
}
Expand All @@ -121,9 +125,8 @@ default String getCreateTableAsSelectStatementFromSql(String srcTable, String ta

default String getCreateTableStatement(String table, List<StructField> fields, TypeConverter typeConverter) {
if (CollectionUtils.isNotEmpty(fields)) {
String columns = fields.stream().map(field -> {
return quoteIdentifier(field.getName()) + " " + typeConverter.convertToOriginType(field.getDataType());
}).collect(Collectors.joining(","));
String columns = fields.stream().map(field -> quoteIdentifier(field.getName()) + " " + typeConverter.convertToOriginType(field.getDataType()))
.collect(Collectors.joining(","));

return String.format("CREATE TABLE IF NOT EXISTS %s (%s)", table, columns);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.datavines.connector.plugin.utils;
package io.datavines.connector.api.utils;

import io.datavines.common.utils.StringUtils;
import io.datavines.connector.api.Dialect;
import io.datavines.connector.api.TypeConverter;
import io.datavines.connector.api.entity.JdbcOptions;
Expand All @@ -40,6 +39,7 @@ public static boolean tableExists(Connection connection, JdbcOptions options, Di
statement.setQueryTimeout(options.getQueryTimeout());
statement.execute();
} catch (SQLException e) {
log.error("tableExists error : ", e);
return false;
} finally {
if (statement != null) {
Expand Down Expand Up @@ -67,13 +67,9 @@ public static String getCreateTableStatement(String table, List<StructField> fie

public static String getInsertStatement(String table, List<StructField> fields, Dialect dialect) {
if (CollectionUtils.isNotEmpty(fields)) {
String columns = fields.stream().map(field -> {
return dialect.quoteIdentifier(field.getName());
}).collect(Collectors.joining(","));
String columns = fields.stream().map(field -> dialect.quoteIdentifier(field.getName())).collect(Collectors.joining(","));

String placeholders = fields.stream().map(field -> {
return "?";
}).collect(Collectors.joining(","));
String placeholders = fields.stream().map(field -> "?").collect(Collectors.joining(","));

return String.format("INSERT INTO %s (%s) VALUES (%s)", table, columns, placeholders);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.datavines.connector.plugin.utils;
package io.datavines.connector.api.utils;

import io.datavines.common.exception.DataVinesException;
import io.datavines.common.utils.StringUtils;
import io.datavines.connector.api.entity.QueryColumn;
import io.datavines.connector.api.entity.ResultList;
import io.datavines.connector.api.entity.ResultListWithColumns;
import lombok.extern.slf4j.Slf4j;
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.expression.Alias;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.schema.Table;
import net.sf.jsqlparser.statement.select.*;
import net.sf.jsqlparser.util.TablesNamesFinder;
import org.apache.commons.collections4.CollectionUtils;

import java.sql.*;
import java.util.*;

import static io.datavines.common.CommonConstants.DOT;
import static org.apache.commons.lang3.StringUtils.EMPTY;

@Slf4j
Expand Down Expand Up @@ -163,84 +159,6 @@ private static String getColumnLabel(Set<String> columnPrefixes, String columnLa
return columnLabel;
}

public static Set<String> getQueryFromsAndJoins(String sql) {
Set<String> columnPrefixes = new HashSet<>();
try {
net.sf.jsqlparser.statement.Statement parse = CCJSqlParserUtil.parse(sql);
Select select = (Select) parse;
Select selectBody = select.getSelectBody();
if (selectBody instanceof PlainSelect) {
PlainSelect plainSelect = (PlainSelect) selectBody;
columnPrefixExtractor(columnPrefixes, plainSelect);
}

if (selectBody instanceof SetOperationList) {
SetOperationList setOperationList = (SetOperationList) selectBody;
List<Select> selects = setOperationList.getSelects();
for (Select optSelectBody : selects) {
PlainSelect plainSelect = (PlainSelect) optSelectBody;
columnPrefixExtractor(columnPrefixes, plainSelect);
}
}

} catch (JSQLParserException e) {
log.error("get column error: ", e);
}

return columnPrefixes;
}

private static void columnPrefixExtractor(Set<String> columnPrefixes, PlainSelect plainSelect) {
getFromItemName(columnPrefixes, plainSelect.getFromItem());
List<Join> joins = plainSelect.getJoins();
if (!CollectionUtils.isEmpty(joins)) {
joins.forEach(join -> getFromItemName(columnPrefixes, join.getRightItem()));
}
}

private static void getFromItemName(Set<String> columnPrefixes, FromItem fromItem) {
if (fromItem == null) {
return;
}
Alias alias = fromItem.getAlias();
if (alias != null) {
if (alias.isUseAs()) {
columnPrefixes.add(alias.getName().trim() + DOT);
} else {
columnPrefixes.add(alias.toString().trim() + DOT);
}
} else {
fromItem.accept(getFromItemTableName(columnPrefixes));
}
}

private static FromItemVisitor getFromItemTableName(Set<String> set) {
return new FromItemVisitor() {
@Override
public void visit(Table tableName) {
set.add(tableName.getName() + DOT);
}

@Override
public void visit(ParenthesedSelect parenthesedSelect) {

}

@Override
public void visit(LateralSubSelect lateralSubSelect) {
}

@Override
public void visit(TableFunction tableFunction) {
}

@Override
public void visit(ParenthesedFromItem parenthesedFromItem) {

}
};
}

public static void dropView(String viewName, Connection connection) {
try (Statement statement = connection.createStatement()){
if (!StringUtils.isEmptyOrNullStr(viewName)) {
Expand Down Expand Up @@ -290,4 +208,15 @@ public static void closeConnection(Connection connection) {
}
}
}

public static List<String> extractTablesFromSelect(String sql) {
List<String> tables;
try {
// 解析 SQL 查询语句
tables = new ArrayList<>(TablesNamesFinder.findTables(sql));
} catch (Exception e) {
throw new DataVinesException("extract tables from select error", e);
}
return tables;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.datavines</groupId>
<artifactId>datavines-connector-flink</artifactId>
<version>${project.version}</version>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,9 @@ public boolean invalidateItemCanOutput() {
public String quoteIdentifier(String entity) {
return "\"" + entity + "\"";
}

@Override
public String getQuoteIdentifier() {
return "\"";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.datavines</groupId>
<artifactId>datavines-connector-plugins</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>

<artifactId>datavines-connector-flink</artifactId>

<dependencies>
<dependency>
<groupId>io.datavines</groupId>
<artifactId>datavines-connector-jdbc</artifactId>
<version>${project.version}</version>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -14,42 +14,39 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.datavines.engine.flink.config;
package io.datavines.connector.plugin;

import io.datavines.common.config.BaseConfig;
import io.datavines.connector.api.*;

public class FlinkConfiguration extends BaseConfig {

private String jobName;
private String checkpointPath;
private int checkpointInterval = 10000; // default 10s

@Override
public String getType() {
return "FLINK";
}
public class FlinkConnectorFactory extends AbstractJdbcConnectorFactory {

public String getJobName() {
return jobName;
@Override
public Connector getConnector() {
return null;
}

public void setJobName(String jobName) {
this.jobName = jobName;
@Override
public Dialect getDialect() {
return new FlinkDialect();
}

public String getCheckpointPath() {
return checkpointPath;
@Override
public ParameterConverter getConnectorParameterConverter() {
return null;
}

public void setCheckpointPath(String checkpointPath) {
this.checkpointPath = checkpointPath;
@Override
public Executor getExecutor() {
return null;
}

public int getCheckpointInterval() {
return checkpointInterval;
@Override
public MetricScript getMetricScript() {
return new FlinkMetricScript();
}

public void setCheckpointInterval(int checkpointInterval) {
this.checkpointInterval = checkpointInterval;
@Override
public Boolean showInFrontend() {
return false;
}
}
Loading

0 comments on commit fdb9d32

Please sign in to comment.