Skip to content

Commit

Permalink
[FLINK-37284][runtime] Fix ForwardForConsecutiveHashPartitioner cannot be chained in Adaptive batch.
Browse files Browse the repository at this point in the history
  • Loading branch information
JunRuiLee committed Feb 11, 2025
1 parent ea28cad commit f13c1d8
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.streaming.api.graph.util.JobVertexBuildContext;
import org.apache.flink.streaming.api.graph.util.OperatorChainInfo;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.streaming.runtime.partitioner.ForwardForConsecutiveHashPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardForUnspecifiedPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
Expand Down Expand Up @@ -618,6 +619,15 @@ && isChainableSource(
streamGraph.getStreamNode(edge.getSourceId()), streamGraph))
|| isChainable(edge, streamGraph)) {
edge.setPartitioner(new ForwardPartitioner<>());

// ForwardForConsecutiveHashPartitioner may use BATCH exchange mode, which prevents
// operator chaining. To enable chaining for edges using this partitioner, we need
// to set their exchange mode to UNDEFINED.
if (partitioner instanceof ForwardForConsecutiveHashPartitioner
&& edge.getExchangeMode() == StreamExchangeMode.BATCH) {
edge.setExchangeMode(StreamExchangeMode.UNDEFINED);
}

// Currently, edges with ForwardForUnspecifiedPartitioner have no intra-input key
// correlation, so we need to set it to false.
if (partitioner instanceof ForwardForUnspecifiedPartitioner) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public class StreamEdge implements Serializable {
/** The name of the operator in the target vertex. */
private final String targetOperatorName;

private final StreamExchangeMode exchangeMode;
private StreamExchangeMode exchangeMode;

private long bufferTimeout;

Expand Down Expand Up @@ -195,6 +195,10 @@ public StreamExchangeMode getExchangeMode() {
return exchangeMode;
}

/**
 * Replaces the exchange mode stored on this edge.
 *
 * <p>No access modifier is declared, so this setter is package-private and can only be called
 * from classes in the same package.
 *
 * @param exchangeMode the exchange mode that {@link #getExchangeMode()} returns afterwards
 */
void setExchangeMode(StreamExchangeMode exchangeMode) {
this.exchangeMode = exchangeMode;
}

public void setPartitioner(StreamPartitioner<?> partitioner) {
configureKeyCorrelation(partitioner);
this.outputPartitioner = partitioner;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.streaming.api.graph;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
Expand All @@ -32,6 +33,10 @@
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.streaming.runtime.partitioner.ForwardForConsecutiveHashPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;

import org.apache.flink.shaded.guava32.com.google.common.collect.Iterables;

Expand Down Expand Up @@ -235,6 +240,33 @@ void testSourceChain() {
assertThat(jobGraph.getVerticesSortedTopologicallyFromSources().size()).isEqualTo(4);
}

/**
 * Builds a batch job whose source is connected to a print sink through a {@link
 * ForwardForConsecutiveHashPartitioner} edge declared with {@link StreamExchangeMode#BATCH},
 * then checks that the job graph obtained via {@code generateJobGraphInLazilyMode} is
 * equivalent to the one obtained via {@code StreamingJobGraphGenerator.createJobGraph}.
 */
@Test
void testForwardForConsecutiveHashPartitionerChain() {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    env.setParallelism(1);

    final DataStream<Integer> input = env.fromData(1, 2, 3);

    // Wrap the source in a partition transformation that uses the partitioner under test
    // together with an explicit BATCH exchange mode.
    final PartitionTransformation<Integer> batchForward =
            new PartitionTransformation<>(
                    input.getTransformation(),
                    new ForwardForConsecutiveHashPartitioner<>(new RebalancePartitioner<>()),
                    StreamExchangeMode.BATCH);
    final DataStream<Integer> forwarded = new DataStream<>(env, batchForward);
    forwarded.print();

    // First job graph: produced by the lazily-mode helper from a dynamic stream graph.
    final StreamGraph lazyStreamGraph = env.getStreamGraph(false);
    lazyStreamGraph.setDynamic(true);
    final JobGraph lazyJobGraph = generateJobGraphInLazilyMode(lazyStreamGraph);

    // Second job graph: a fresh stream graph from the same environment, converted directly
    // by StreamingJobGraphGenerator.
    final StreamGraph eagerStreamGraph = env.getStreamGraph(false);
    eagerStreamGraph.setDynamic(true);
    final JobGraph eagerJobGraph = StreamingJobGraphGenerator.createJobGraph(eagerStreamGraph);

    // Both generation paths must agree on the resulting job graph.
    assertThat(isJobGraphEquivalent(lazyJobGraph, eagerJobGraph)).isTrue();
}

private static JobGraph generateJobGraphInLazilyMode(StreamGraph streamGraph) {
AdaptiveGraphManager adaptiveGraphManager =
new AdaptiveGraphManager(
Expand Down

0 comments on commit f13c1d8

Please sign in to comment.