Reduce source scope to bounded
jayehwhyehentee committed Jan 27, 2025
1 parent bce6d8a commit 9257b93
Showing 22 changed files with 94 additions and 1,449 deletions.
README.md (121 changes: 35 additions & 86 deletions)

Large diffs are not rendered by default.

cloudbuild/nightly/cloudbuild.yaml (16 changes: 0 additions & 16 deletions)
@@ -155,22 +155,6 @@ steps:
- 'CLUSTER_SMALL_TEST_FILE=${_CLUSTER_SMALL_TEST_FILE}'
- 'SINK_PARALLELISM_SMALL_BOUNDED_JOB=${_SINK_PARALLELISM_SMALL_BOUNDED_JOB}'

-# 7. Start the query read e2e test.
-- name: 'gcr.io/$PROJECT_ID/dataproc-flink-bigquery-connector-nightly'
-id: 'e2e-bounded-query-test'
-waitFor: ['e2e-bounded-table-api-all-datatypes-test']
-entrypoint: 'bash'
-args: ['/workspace/cloudbuild/nightly/nightly.sh', 'e2e_bounded_query_test']
-env:
-- 'GCS_JAR_LOCATION_FILE=${_GCS_JAR_LOCATION_FILE}'
-- 'PROJECT_ID=${_PROJECT_ID}'
-- 'PROJECT_NAME=${_PROJECT_NAME}'
-- 'DATASET_NAME=${_WRITE_DATASET_NAME}'
-- 'QUERY=${_QUERY}'
-- 'PROPERTIES_SMALL_BOUNDED_JOB=${_PROPERTIES_SMALL_BOUNDED_JOB}'
-- 'REGION_SMALL_TEST_FILE=${_REGION_SMALL_TEST_FILE}'
-- 'CLUSTER_SMALL_TEST_FILE=${_CLUSTER_SMALL_TEST_FILE}'

# 8. Start the large table e2e test.
- name: 'gcr.io/$PROJECT_ID/dataproc-flink-bigquery-connector-nightly'
id: 'e2e-bounded-large-table-test'
cloudbuild/nightly/nightly.sh (10 changes: 2 additions & 8 deletions)
@@ -48,15 +48,15 @@ run_read_only_test(){
DATASET_NAME=$5
TABLE_NAME=$6
AGG_PROP_NAME=$7
-QUERY=$8
+# Deprecated: QUERY=$8
MODE=$9
PROPERTIES=${10}
# Get the final region and the cluster name.
export REGION=$(cat "$REGION_FILE")
export CLUSTER_NAME=$(cat "$CLUSTER_FILE")
export GCS_JAR_LOCATION=$(cat "$GCS_JAR_LOCATION_FILE")
# Run the simple bounded table test.
-source cloudbuild/nightly/scripts/table_read.sh "$PROJECT_ID" "$CLUSTER_NAME" "$REGION" "$PROJECT_NAME" "$DATASET_NAME" "$TABLE_NAME" "$AGG_PROP_NAME" "$QUERY" "$MODE" "$PROPERTIES"
+source cloudbuild/nightly/scripts/table_read.sh "$PROJECT_ID" "$CLUSTER_NAME" "$REGION" "$PROJECT_NAME" "$DATASET_NAME" "$TABLE_NAME" "$AGG_PROP_NAME" "" "$MODE" "$PROPERTIES"
}

# Function to run the test to check BQ Table Read.
@@ -176,12 +176,6 @@ case $STEP in
exit
;;

-# Run the query bounded e2e test.
-e2e_bounded_query_test)
-run_read_only_test_delete_cluster "$PROJECT_ID" "$REGION_SMALL_TEST_FILE" "$CLUSTER_SMALL_TEST_FILE" "$PROJECT_NAME" "$DATASET_NAME" "" "" "$QUERY" "bounded" "$PROPERTIES_SMALL_BOUNDED_JOB"
-exit
-;;

# Run the large table bounded e2e test.
e2e_bounded_large_table_test)
# Run the large table test.
cloudbuild/nightly/scripts/table_read.sh (4 changes: 2 additions & 2 deletions)
@@ -22,7 +22,7 @@ PROJECT_NAME=$4
DATASET_NAME=$5
TABLE_NAME=$6
AGG_PROP_NAME=$7
-QUERY_STRING=$8
+# Deprecated: QUERY_STRING=$8
MODE=$9
PROPERTIES=${10}

@@ -54,7 +54,7 @@ fi

# Now check the success of the job
# Mode helps in checking for unbounded job separately.
-python3 cloudbuild/nightly/scripts/python-scripts/parse_logs.py -- --job_id "$JOB_ID" --project_id "$PROJECT_ID" --cluster_name "$CLUSTER_NAME" --region "$REGION" --project_name "$PROJECT_NAME" --dataset_name "$DATASET_NAME" --table_name "$TABLE_NAME" --query "$QUERY_STRING" --mode "$MODE"
+python3 cloudbuild/nightly/scripts/python-scripts/parse_logs.py -- --job_id "$JOB_ID" --project_id "$PROJECT_ID" --cluster_name "$CLUSTER_NAME" --region "$REGION" --project_name "$PROJECT_NAME" --dataset_name "$DATASET_NAME" --table_name "$TABLE_NAME" --mode "$MODE"
ret=$?
if [ $ret -ne 0 ]
then

Large diffs are not rendered by default.

@@ -26,7 +26,6 @@
import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
@@ -93,7 +92,7 @@
* This module tests the following cases:
*
* <ol>
-* <li>Bounded Jobs: Involve reading from and writing to a BigQuery Table in the <i>bounded</i>
+* <li>Bounded Job: Involve reading from and writing to a BigQuery Table in the <i>bounded</i>
* mode.<br>
* The arguments given in this case would be:
* <ul>
@@ -102,7 +101,6 @@
* table} <br>
* <li>--bq-source-table {required; name of Source BigQuery table to read} <br>
* <li>--agg-prop {required; record property to aggregate in Flink job} <br>
-* <li>--query {optional; SQL query to fetch data from BigQuery table}
* <li>--gcp-dest-project {optional; project ID which contains the Destination BigQuery
* table}
* <li>--bq-dest-dataset {optional; name of Destination BigQuery dataset containing the
Expand All @@ -117,10 +115,6 @@
* <br>
* The records read are passed to a map which increments the "number" field in the BQ table by
* 1, and writes this modified record back to another (specified) BigQuery Table. <br>
-* If a query is set, it is executed first and records obtained are streamed via a map which
-* counts the total number of records read (the number of records observed by map operation)
-* and logs this count at the end. It also logs the "HOUR" and "DAY" value of the obtained
-* rows in order to verify the query correctness. <br>
* In case the <code>is-sql</code> flag is set to true, Flink's Table API's <code>
* .select($(*))</code> method is executed. Which is responsible for reading a source table.
* These read records are then pass through a <code>addOrReplaceColumns()</code> method which
@@ -204,17 +198,10 @@ public static void main(String[] args) throws Exception {
+ " --exactly-once <set for sink via 'EXACTLY ONCE' approach>"
+ " --enable-table-creation <set for creating BQ table in sink>"
+ " --mode <source type>"
+ " --query <SQL query to get data from BQ table>"
+ " --file-discovery-interval <minutes between checking new files>");
return;
}
String sourceGcpProjectName = parameterTool.getRequired("gcp-source-project");
-String query = parameterTool.get("query", "");

-if (!query.isEmpty()) {
-runQueryFlinkJob(sourceGcpProjectName, query);
-return;
-}

// Add Sink Parameters as well. (Optional)
String destGcpProjectName = parameterTool.get("gcp-dest-project");
@@ -335,22 +322,6 @@ public static void main(String[] args) throws Exception {
}
}

-private static void runQueryFlinkJob(String projectName, String query) throws Exception {
-
-final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.setRestartStrategy(RESTART_STRATEGY);
-env.enableCheckpointing(CHECKPOINT_INTERVAL);
-
-BigQuerySource<GenericRecord> bqSource =
-BigQuerySource.readAvrosFromQuery(query, projectName);
-
-env.fromSource(bqSource, WatermarkStrategy.noWatermarks(), "BigQueryQuerySource")
-.map(new Mapper())
-.print();
-
-env.execute("Flink BigQuery Query Integration Test");
-}
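For contrast with the removed query job above, here is a rough sketch of the bounded table read that remains the connector's only source mode after this commit. It is a sketch only: it assumes a readAvros entry point plus BigQueryReadOptions/BigQueryConnectOptions builders on the connector side, and the package paths, builder method names, and project/dataset/table values are illustrative placeholders rather than taken from this diff. The "number" field mirrors the one the Javadoc above says the test increments.

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Assumed connector classes; package paths and method names are illustrative.
import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
import com.google.cloud.flink.bigquery.source.BigQuerySource;
import com.google.cloud.flink.bigquery.source.config.BigQueryReadOptions;

public class BoundedReadSketch {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L);

        // Bounded read of a whole table: no SQL query is involved, and the
        // source finishes once the table's read streams are exhausted.
        BigQuerySource<GenericRecord> bqSource =
                BigQuerySource.readAvros(
                        BigQueryReadOptions.builder()
                                .setBigQueryConnectOptions(
                                        BigQueryConnectOptions.builder()
                                                .setProjectId("your-project")   // placeholder
                                                .setDataset("your_dataset")     // placeholder
                                                .setTable("your_table")         // placeholder
                                                .build())
                                .build());

        env.fromSource(bqSource, WatermarkStrategy.noWatermarks(), "BigQueryBoundedSource")
                .map(new MapFunction<GenericRecord, String>() {
                    @Override
                    public String map(GenericRecord record) {
                        // Touch one field per record, as the integration test does.
                        return String.valueOf(record.get("number"));
                    }
                })
                .print();

        env.execute("Flink BigQuery Bounded Read Sketch");
    }
}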

private static void runBoundedFlinkJobWithSink(
String sourceGcpProjectName,
String sourceDatasetName,
@@ -635,7 +606,6 @@ private static void runBoundedSQLFlinkJob(
.dataset(sourceDatasetName)
.table(sourceTableName)
.testMode(false)
-.boundedness(Boundedness.BOUNDED)
.build();

// Register the Source Table
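The .boundedness(Boundedness.BOUNDED) call removed in the last hunk sat on the Table API source config used by the is-sql path the Javadoc describes. Below is a minimal sketch of how that source-table registration might read once boundedness is no longer set explicitly; the config and helper class names (BigQueryReadTableConfig, BigQueryTableSchemaProvider), the .project(...) builder method, and the registered table name are assumptions for illustration, not confirmed by this diff.

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

// Assumed connector classes; names are illustrative.
import com.google.cloud.flink.bigquery.table.config.BigQueryReadTableConfig;
import com.google.cloud.flink.bigquery.common.utils.BigQueryTableSchemaProvider;

public class BoundedSqlReadSketch {

    static Table readSourceTable(
            StreamTableEnvironment tEnv,
            String project, String dataset, String table) throws Exception {
        // With the query source gone, a plain table config suffices; no explicit
        // boundedness is configured for the bounded read.
        BigQueryReadTableConfig readConfig =
                BigQueryReadTableConfig.builder()
                        .project(project)
                        .dataset(dataset)
                        .table(table)
                        .testMode(false)
                        .build();

        // Register the Source Table, then read it with the plain select the Javadoc mentions.
        tEnv.createTable(
                "bigQuerySourceTable",
                BigQueryTableSchemaProvider.getTableDescriptor(readConfig));
        return tEnv.from("bigQuerySourceTable").select($("*"));
    }
}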
