Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][Connector] Improve hive connector with error data sink #490

Merged
merged 3 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class ConfigConstants {
public static final String DATA_DATE = "data_date";
public static final String REGEXP_PATTERN = "regexp_pattern";
public static final String ERROR_OUTPUT_PATH = "error_output_path";
public static final String ERROR_DATA_CONNECTOR_TYPE = "error_data_connector_type";
public static final String INDEX = "index";
public static final String PATH = "path";
public static final String HDFS_FILE = "hdfs_file";
Expand Down Expand Up @@ -111,7 +112,6 @@ public class ConfigConstants {
public static final String DATA_DIR = "data_dir";

public static final String ENABLE_SPARK_HIVE_SUPPORT = "enable_spark_hive_support";
public static final String ENABLE_USE_VIEW = "enable_use_view";

public static final String FILE = "file";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public static String formatSql(String sql) {
return sql;
}

private static Map<String, Object> getResultObjectMap(ResultSet rs, ResultSetMetaData metaData) throws SQLException {
public static Map<String, Object> getResultObjectMap(ResultSet rs, ResultSetMetaData metaData) throws SQLException {
Map<String, Object> map = new LinkedHashMap<>();

for (int i = 1; i <= metaData.getColumnCount(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@

import io.datavines.common.enums.DataType;
import io.datavines.common.utils.StringUtils;
import io.datavines.connector.api.entity.ResultList;
import io.datavines.connector.api.entity.StructField;
import org.apache.commons.collections4.CollectionUtils;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -138,4 +142,6 @@ default String getInsertAsSelectStatementFromSql(String srcTable, String targetD
String getErrorDataScript(Map<String, String> configMap);

String getValidateResultDataScript(Map<String, String> configMap);

ResultList getPageFromResultSet(Statement sourceConnectionStatement, ResultSet rs, String sourceTable, int start, int end) throws SQLException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.datavines.engine.local.api.entity;
package io.datavines.connector.api.entity;

public class QueryColumn {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.datavines.engine.local.api.entity;
package io.datavines.connector.api.entity;

import java.io.Serializable;
import java.util.ArrayList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.datavines.engine.local.api.entity;
package io.datavines.connector.api.entity;

import java.util.List;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@

import io.datavines.common.utils.StringUtils;
import io.datavines.connector.api.Dialect;
import io.datavines.connector.api.entity.ResultList;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -61,4 +65,9 @@ public String getValidateResultDataScript(Map<String, String> configMap) {
}
return null;
}

/**
 * Default paging hook for this dialect.
 *
 * <p>This implementation does not read from {@code rs} or run any query; it
 * always returns {@code null}.
 * NOTE(review): presumably concrete dialects override this to return real
 * pages, and callers must cope with a {@code null} result — confirm against callers.
 *
 * @param sourceConnectionStatement statement on the source connection (unused here)
 * @param rs result set to page through (unused here)
 * @param sourceTable name of the table the data comes from (unused here)
 * @param start start position of the requested page (unused here)
 * @param end end position of the requested page (unused here)
 * @return always {@code null}
 * @throws SQLException declared by the interface; never thrown by this implementation
 */
@Override
public ResultList getPageFromResultSet(Statement sourceConnectionStatement, ResultSet rs, String sourceTable, int start, int end) throws SQLException {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,21 @@
*/
package io.datavines.connector.plugin;

import io.datavines.common.datasource.jdbc.utils.HiveSqlUtils;
import io.datavines.connector.api.entity.ResultList;
import lombok.extern.slf4j.Slf4j;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static io.datavines.common.ConfigConstants.STRING_TYPE;

@Slf4j
public class HiveDialect extends JdbcDialect {

@Override
Expand All @@ -35,7 +46,12 @@ public String getDriver() {
}

@Override
public boolean invalidateItemCanOutput() {
return false;
/**
 * Reads one page of rows from {@code sourceTable} by running a dedicated
 * {@code select * ... LIMIT offset, count} query on the supplied statement.
 *
 * <p>The incoming {@code rs} is intentionally not used: the page is served by
 * the new query, so both the rows and the column metadata are taken from the
 * result set that query returns.
 *
 * @param sourceConnectionStatement statement used to execute the paging query
 * @param rs result set supplied by the caller; not read by this implementation
 * @param sourceTable table to page through; concatenated into the SQL text as-is,
 *                    so it must be a trusted identifier
 * @param start row offset of the page
 * @param end exclusive end offset; {@code end - start} rows are requested
 * @return the rows of the page, one map per row keyed as produced by
 *         {@code HiveSqlUtils.getResultObjectMap}
 * @throws SQLException if executing the query or reading a row fails
 */
public ResultList getPageFromResultSet(Statement sourceConnectionStatement, ResultSet rs, String sourceTable, int start, int end) throws SQLException {
    List<Map<String, Object>> resultList = new ArrayList<>();
    String sql = "select * from " + sourceTable + " LIMIT " + start + ", " + (end - start);
    // try-with-resources so the paging result set is released even when reading fails.
    try (ResultSet errorDataResultSet = sourceConnectionStatement.executeQuery(sql)) {
        // Metadata must describe the result set being read; the caller's rs may have a
        // different shape, and may already be closed if it came from this same statement.
        ResultSetMetaData metaData = errorDataResultSet.getMetaData();
        // Advance the cursor before each read and collect every row of the page.
        while (errorDataResultSet.next()) {
            resultList.add(HiveSqlUtils.getResultObjectMap(errorDataResultSet, metaData));
        }
    }
    return new ResultList(resultList);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,7 @@ protected InputParam getInputParam(String field, String title, String placeholde

// Supplies the connector's additional plugin parameters; isEn selects English
// or Chinese text for the title and placeholder.
// NOTE(review): this span is a rendered diff that appears to interleave two
// versions of the body: one that builds a list holding a single
// "enable_use_view" input (default "false"), and one that is just `return null;`.
// Confirm which version is current before relying on either behavior; if the
// null-returning version is the live one, callers must tolerate a null list.
protected List<PluginParams> getOtherParams(boolean isEn) {

List<PluginParams> list = new ArrayList<>();

InputParam enableExternalCatalog = getInputParam("enable_use_view",
isEn ? "enable.use.view" : "允许使用视图",
isEn ? "please enter true or false" : "请填入 true 或者 false", 2, null,
"false");

list.add(enableExternalCatalog);
return list;
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@

import io.datavines.common.utils.StringUtils;
import io.datavines.connector.api.Dialect;
import io.datavines.connector.api.entity.ResultList;
import io.datavines.connector.plugin.utils.SqlUtils;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -80,4 +85,8 @@ public String getValidateResultDataScript(Map<String, String> configMap) {
return null;
}

/**
 * Returns the requested page by handing the caller's result set to
 * {@code SqlUtils.getPageFromResultSet}.
 *
 * <p>Only {@code rs}, {@code start} and {@code end} are forwarded; the
 * statement and table name are not used by this implementation.
 *
 * @param sourceConnectionStatement statement on the source connection (unused here)
 * @param rs result set to take the page from
 * @param sourceTable source table name (unused here)
 * @param start start position passed through to {@code SqlUtils}
 * @param end end position passed through to {@code SqlUtils}
 * @return whatever {@code SqlUtils.getPageFromResultSet(rs, start, end)} returns
 * @throws SQLException if the delegate fails while reading {@code rs}
 */
@Override
public ResultList getPageFromResultSet(Statement sourceConnectionStatement, ResultSet rs, String sourceTable, int start, int end) throws SQLException {
    final ResultList page = SqlUtils.getPageFromResultSet(rs, start, end);
    return page;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ public static List<StructField> getSchema(ResultSet resultSet, Dialect dialect,
boolean isNullable = metaData.isNullable(i + 1) != ResultSetMetaData.columnNoNulls;

StructField field = new StructField();
String[] columns = columnName.split("\\.");
if (columns.length > 1) {
columnName = columns[columns.length - 1];
}
field.setName(columnName.toLowerCase());
field.setDataType(typeConverter.convert(typeName));
field.setNullable(isNullable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.datavines.engine.local.api.utils;
package io.datavines.connector.plugin.utils;

import io.datavines.common.utils.StringUtils;
import io.datavines.engine.local.api.entity.QueryColumn;
import io.datavines.engine.local.api.entity.ResultList;
import io.datavines.engine.local.api.entity.ResultListWithColumns;
import io.datavines.connector.api.entity.QueryColumn;
import io.datavines.connector.api.entity.ResultList;
import io.datavines.connector.api.entity.ResultListWithColumns;
import lombok.extern.slf4j.Slf4j;
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.expression.Alias;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.schema.Table;
import net.sf.jsqlparser.statement.select.*;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;

import java.sql.*;
import java.util.*;

import static io.datavines.common.CommonConstants.DOT;
import static org.apache.commons.lang3.StringUtils.EMPTY;

@Slf4j
public class SqlUtils {

protected static Logger log = LoggerFactory.getLogger(SqlUtils.class);

public static ResultListWithColumns getListWithHeaderFromResultSet(ResultSet rs, Set<String> queryFromsAndJoins) throws SQLException {

ResultListWithColumns resultListWithColumns = new ResultListWithColumns();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@
import io.datavines.common.exception.DataVinesException;
import io.datavines.common.utils.StringUtils;
import io.datavines.engine.api.env.Execution;
import io.datavines.engine.local.api.entity.ResultList;
import io.datavines.connector.api.entity.ResultList;
import io.datavines.engine.local.api.utils.LoggerFactory;
import io.datavines.engine.local.api.utils.SqlUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;

Expand Down Expand Up @@ -58,7 +57,6 @@ public void execute(List<LocalSource> sources, List<LocalTransform> transforms,
return;
}

List<String> invalidItemTableSet = new ArrayList<>();
String preSql = null;
String postSql = null;
try {
Expand Down Expand Up @@ -116,14 +114,11 @@ public void execute(List<LocalSource> sources, List<LocalTransform> transforms,

List<ResultList> taskResult = new ArrayList<>();
List<ResultList> actualValue = new ArrayList<>();
transforms.forEach(localTransform -> {
for (LocalTransform localTransform : transforms) {
if (localRuntimeEnvironment.isStop()) {
break;
}
switch (TransformType.of(localTransform.getConfig().getString(PLUGIN_TYPE))){
case INVALIDATE_ITEMS:
if (StringUtils.isNotEmpty(localTransform.getConfig().getString(INVALIDATE_ITEMS_TABLE))) {
invalidItemTableSet.add(localTransform.getConfig().getString(INVALIDATE_ITEMS_TABLE));
}
localTransform.process(localRuntimeEnvironment);
break;
case ACTUAL_VALUE:
ResultList actualValueResult = localTransform.process(localRuntimeEnvironment);
actualValue.add(actualValueResult);
Expand All @@ -138,9 +133,12 @@ public void execute(List<LocalSource> sources, List<LocalTransform> transforms,
default:
break;
}
});
}

for (LocalSink localSink : sinks) {
if (localRuntimeEnvironment.isStop()) {
break;
}
switch (SinkType.of(localSink.getConfig().getString(PLUGIN_TYPE))){
case ERROR_DATA:
localSink.output(null, localRuntimeEnvironment);
Expand All @@ -159,14 +157,6 @@ public void execute(List<LocalSource> sources, List<LocalTransform> transforms,
} catch (Exception e) {
log.error("execute error", e);
throw e;
} finally {
for (String invalidItemTable : invalidItemTableSet) {
try {
SqlUtils.dropView(invalidItemTable, localRuntimeEnvironment.getSourceConnection().getConnection());
} catch (SQLException sqlException) {
log.error("drop view error: ", sqlException);
}
}
}

post(postSql);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import io.datavines.engine.api.env.RuntimeEnvironment;
import io.datavines.engine.local.api.entity.ConnectionHolder;
import io.datavines.engine.local.api.utils.LoggerFactory;
import lombok.Getter;
import lombok.Setter;
import org.slf4j.Logger;

import java.sql.Statement;
Expand All @@ -30,14 +32,24 @@ public class LocalRuntimeEnvironment implements RuntimeEnvironment {

protected Logger log = LoggerFactory.getLogger(LocalRuntimeEnvironment.class);

@Setter
@Getter
private ConnectionHolder sourceConnection;

@Setter
@Getter
private ConnectionHolder targetConnection;

@Setter
@Getter
private ConnectionHolder metadataConnection;

@Setter
private Statement currentStatement;

@Getter
private boolean stop;

@Override
public void prepare() {

Expand All @@ -63,33 +75,9 @@ public CheckResult checkConfig() {
return null;
}

public ConnectionHolder getSourceConnection() {
return sourceConnection;
}

public void setSourceConnection(ConnectionHolder sourceConnection) {
this.sourceConnection = sourceConnection;
}

public ConnectionHolder getMetadataConnection() {
return metadataConnection;
}

public void setMetadataConnection(ConnectionHolder metadataConnection) {
this.metadataConnection = metadataConnection;
}

public ConnectionHolder getTargetConnection() {
return targetConnection;
}

public void setTargetConnection(ConnectionHolder targetConnection) {
this.targetConnection = targetConnection;
}

public void close() throws Exception {
if (currentStatement != null) {
currentStatement.close();
currentStatement.cancel();
}

if (sourceConnection != null) {
Expand All @@ -103,9 +91,8 @@ public void close() throws Exception {
if (metadataConnection != null) {
metadataConnection.close();
}
}

public void setCurrentStatement(Statement statement) {
this.currentStatement = statement;
stop = true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import io.datavines.common.config.Config;
import io.datavines.common.utils.StringUtils;
import io.datavines.engine.api.component.Component;
import io.datavines.engine.local.api.entity.ResultList;
import io.datavines.connector.api.entity.ResultList;

import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
*/
package io.datavines.engine.local.api;

import io.datavines.common.exception.DataVinesException;
import io.datavines.engine.api.component.Component;
import io.datavines.engine.local.api.entity.ResultList;
import io.datavines.connector.api.entity.ResultList;

/**
 * A transform component of the local engine: given the local runtime
 * environment, {@code process} yields a {@link ResultList}.
 *
 * NOTE(review): this span is a rendered diff that appears to show two versions
 * of the same declaration, one without a throws clause and one declaring
 * {@code DataVinesException}. Only one of them can exist in the real interface;
 * confirm which before implementing.
 */
public interface LocalTransform extends Component {

// Declaration without a throws clause.
ResultList process(LocalRuntimeEnvironment env);
// Declaration that lets implementations signal failure via DataVinesException.
ResultList process(LocalRuntimeEnvironment env) throws DataVinesException;
}
Loading
Loading