Skip to content

Commit

Permalink
Add Flink execution engine support 2
Browse files Browse the repository at this point in the history
  • Loading branch information
GSHF committed Dec 26, 2024
1 parent cf6e836 commit 2ea7254
Show file tree
Hide file tree
Showing 18 changed files with 40 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.datavines.engine.flink.core.config;
package io.datavines.engine.flink.config;

import io.datavines.common.config.EnvConfig;
import io.datavines.common.config.SinkConfig;
Expand All @@ -40,7 +40,12 @@
import static io.datavines.common.ConfigConstants.*;
import static io.datavines.common.ConfigConstants.TABLE;


/**
*
*
* @author dataVines
* @since 2021-07-01
*/
@Slf4j
public abstract class BaseFlinkConfigurationBuilder extends BaseJobConfigurationBuilder {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.datavines.engine.flink.core.config;
package io.datavines.engine.flink.config;

import io.datavines.common.config.Config;
import io.datavines.common.config.CheckResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.datavines.engine.flink.core.config;
package io.datavines.engine.flink.config;

import io.datavines.common.config.Config;
import io.datavines.common.config.EnvConfig;
import io.datavines.common.config.SinkConfig;
import io.datavines.common.config.SourceConfig;
Expand All @@ -32,12 +31,11 @@
import java.util.List;
import java.util.Map;

import static io.datavines.common.CommonConstants.*;
import static io.datavines.common.ConfigConstants.*;
import static io.datavines.common.ConfigConstants.TABLE;

public class FlinkSingleTableConfigurationBuilder extends BaseFlinkConfigurationBuilder {


@Override
public void buildEnvConfig() {
EnvConfig envConfig = new EnvConfig();
Expand All @@ -59,7 +57,7 @@ public void buildSinkConfigs() throws DataVinesException {
}

metricInputParameter.put(METRIC_UNIQUE_KEY, metricUniqueKey);
String expectedType = jobExecutionInfo.getEngineType() + "_" + parameter.getExpectedType();
String expectedType = "local_" + parameter.getExpectedType();
ExpectedValue expectedValue = PluginLoader
.getPluginLoader(ExpectedValue.class)
.getNewPlugin(expectedType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.datavines.engine.flink.core.config;
package io.datavines.engine.flink.config;

public class FlinkSinkSqlBuilder {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
flink=io.datavines.engine.flink.FlinkEngineExecutor
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
flink_single_table=io.datavines.engine.flink.config.FlinkSingleTableConfigurationBuilder

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
local_daily_avg=io.datavines.metric.expected.plugin.DailyAvg
spark_daily_avg=io.datavines.metric.expected.plugin.SparkDailyAvg
livy_daily_avg=io.datavines.metric.expected.plugin.SparkDailyAvg
livy_daily_avg=io.datavines.metric.expected.plugin.SparkDailyAvg
flink_daily_avg=io.datavines.metric.expected.plugin.DailyAvg
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
local_fix_value=io.datavines.metric.expected.plugin.FixValue
spark_fix_value=io.datavines.metric.expected.plugin.FixValue
livy_fix_value=io.datavines.metric.expected.plugin.FixValue
livy_fix_value=io.datavines.metric.expected.plugin.FixValue
flink_fix_value=io.datavines.metric.expected.plugin.FixValue
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
local_last_30d_avg=io.datavines.metric.expected.plugin.Last30DayAvg
spark_last_30d_avg=io.datavines.metric.expected.plugin.SparkLast30DayAvg
livy_last_30d_avg=io.datavines.metric.expected.plugin.SparkLast30DayAvg
livy_last_30d_avg=io.datavines.metric.expected.plugin.SparkLast30DayAvg
flink_last_30d_avg=io.datavines.metric.expected.plugin.Last30DayAvg
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
local_last_7d_avg=io.datavines.metric.expected.plugin.Last7DayAvg
spark_last_7d_avg=io.datavines.metric.expected.plugin.SparkLast7DayAvg
livy_last_7d_avg=io.datavines.metric.expected.plugin.SparkLast7DayAvg
livy_last_7d_avg=io.datavines.metric.expected.plugin.SparkLast7DayAvg
flink_last_7d_avg=io.datavines.metric.expected.plugin.Last7DayAvg
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
local_monthly_avg=io.datavines.metric.expected.plugin.MonthlyAvg
spark_monthly_avg=io.datavines.metric.expected.plugin.SparkMonthlyAvg
livy_monthly_avg=io.datavines.metric.expected.plugin.SparkMonthlyAvg
livy_monthly_avg=io.datavines.metric.expected.plugin.SparkMonthlyAvg
flink_monthly_avg=io.datavines.metric.expected.plugin.MonthlyAvg
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
local_none=io.datavines.metric.expected.plugin.None
spark_none=io.datavines.metric.expected.plugin.SparkNone
livy_none=io.datavines.metric.expected.plugin.SparkNone
livy_none=io.datavines.metric.expected.plugin.SparkNone
flink_none=io.datavines.metric.expected.plugin.None
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
local_table_total_rows=io.datavines.metric.expected.plugin.TableTotalRows
spark_table_total_rows=io.datavines.metric.expected.plugin.TableTotalRows
livy_table_total_rows=io.datavines.metric.expected.plugin.TableTotalRows
livy_table_total_rows=io.datavines.metric.expected.plugin.TableTotalRows
flink_table_total_rows=io.datavines.metric.expected.plugin.TableTotalRows
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
local_target_table_total_rows=io.datavines.metric.expected.plugin.TargetTableTotalRows
spark_target_table_total_rows=io.datavines.metric.expected.plugin.TargetTableTotalRows
livy_target_table_total_rows=io.datavines.metric.expected.plugin.TargetTableTotalRows
livy_target_table_total_rows=io.datavines.metric.expected.plugin.TargetTableTotalRows
flink_target_table_total_rows=io.datavines.metric.expected.plugin.TargetTableTotalRows
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
local_weekly_avg=io.datavines.metric.expected.plugin.WeeklyAvg
spark_weekly_avg=io.datavines.metric.expected.plugin.SparkWeeklyAvg
livy_weekly_avg=io.datavines.metric.expected.plugin.SparkWeeklyAvg
livy_weekly_avg=io.datavines.metric.expected.plugin.SparkWeeklyAvg
flink_weekly_avg=io.datavines.metric.expected.plugin.WeeklyAvg
6 changes: 6 additions & 0 deletions datavines-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,12 @@
</exclusions>
</dependency>

<dependency>
<groupId>io.datavines</groupId>
<artifactId>datavines-engine-flink-core</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.datavines</groupId>
<artifactId>datavines-engine-local-config</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ public Object getExpectedTypeList(@PathVariable("type") String type) {
afterFilterSet = expectedValueList.stream()
.map(it ->it.replace("local_", "")
.replace("spark_","")
.replace("livy_",""))
.replace("livy_","")
.replace("flink_",""))
.collect(Collectors.toSet());

List<Item> items = new ArrayList<>();
Expand Down

0 comments on commit 2ea7254

Please sign in to comment.