feat: Support On-Demand Repartition #14411

Open
wants to merge 12 commits into
base: main
from

Conversation

Weijun-H
Member

@Weijun-H Weijun-H commented Feb 2, 2025

Which issue does this PR close?

Closes #14287

Rationale for this change

  • Introduce prefer_round_robin_repartititon in the optimizer config; when it is false, every RoundRobinBatch repartition is replaced with OnDemandRepartition
  • Use an MPMC channel so that the repartition operator pulls one batch only when an output partition requests it, instead of pre-assigning batches to partitions
/// The OnDemandRepartitionExec operator repartitions the input data based on a push-based model.
/// It is similar to the RepartitionExec operator, but it doesn't distribute the data to the output
/// partitions until the output partitions request the data.
///
/// When polled, an output partition sends its partition number to the single partition channel; the prefetch buffer then distributes the data in the order in which the partition numbers arrive.
/// Each input stream has a prefetch buffer (channel) that distributes the data to the output partitions.
///
/// The following diagram illustrates the data flow of the OnDemandRepartitionExec operator with 3 output partitions for the input stream 1:
/// ```text
///         /\                     /\                     /\
///         ││                     ││                     ││
///         ││                     ││                     ││
///         ││                     ││                     ││
/// ┌───────┴┴────────┐    ┌───────┴┴────────┐    ┌───────┴┴────────┐
/// │     Stream      │    │     Stream      │    │     Stream      │
/// │       (1)       │    │       (2)       │    │       (3)       │
/// └────────┬────────┘    └───────┬─────────┘    └────────┬────────┘
///          │                     │                       │    / \
///          │                     │                       │    | |
///          │                     │                       │    | |
///          └────────────────┐    │    ┌──────────────────┘    | |
///                           │    │    │                       | |
///                           ▼    ▼    ▼                       | |
///                       ┌─────────────────┐                   | |
///  Send the partition   │partition channel│                   | |
///  number when polling  │                 │                   | |
///                       └────────┬────────┘                   | |
///                                │                            | |
///                                │                            | |
///                                │  Get the partition number  | |
///                                ▼  then send data            | |
///                       ┌─────────────────┐                   | |
///                       │ Prefetch Buffer │───────────────────┘ |
///                       │       (1)       │─────────────────────┘
///                       └─────────────────┘ Distribute data to the output partitions
///
/// ```
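
The request-then-send flow in the diagram can be sketched with standard-library channels. This is a minimal illustration under stated assumptions, not the implementation in this PR: it uses OS threads and `std::sync::mpsc` in place of the async MPMC channel mentioned above, it models a single input stream, and `Batch` and `run_on_demand` are hypothetical names chosen for the example.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for a record batch.
type Batch = u32;

/// Runs one input stream against `num_outputs` output partitions and returns,
/// for each output partition, the batches it received.
fn run_on_demand(input: Vec<Batch>, num_outputs: usize) -> Vec<Vec<Batch>> {
    // Partition channel: an output sends its partition number to request one batch.
    let (req_tx, req_rx) = mpsc::channel::<usize>();

    // One data channel per output partition; `None` signals end of input.
    let mut data_txs = Vec::new();
    let mut data_rxs = Vec::new();
    for _ in 0..num_outputs {
        let (tx, rx) = mpsc::channel::<Option<Batch>>();
        data_txs.push(tx);
        data_rxs.push(rx);
    }

    // Distributor (the "prefetch buffer" role): hands out a batch only when
    // a partition number arrives, in the order the requests arrive.
    let distributor = thread::spawn(move || {
        let mut batches = input.into_iter();
        // `recv` fails once every request sender has been dropped.
        while let Ok(partition) = req_rx.recv() {
            let _ = data_txs[partition].send(batches.next());
        }
    });

    // Output partitions: request one batch, wait for it, repeat.
    let consumers: Vec<_> = data_rxs
        .into_iter()
        .enumerate()
        .map(|(partition, rx)| {
            let req_tx = req_tx.clone();
            thread::spawn(move || {
                let mut received = Vec::new();
                loop {
                    if req_tx.send(partition).is_err() {
                        break;
                    }
                    match rx.recv() {
                        Ok(Some(batch)) => received.push(batch),
                        _ => break, // input exhausted or distributor gone
                    }
                }
                received
            })
        })
        .collect();
    // Drop the original sender so the distributor can observe shutdown.
    drop(req_tx);

    let out: Vec<Vec<Batch>> = consumers.into_iter().map(|h| h.join().unwrap()).collect();
    distributor.join().unwrap();
    out
}
```

Calling `run_on_demand((0..10).collect(), 3)` delivers every batch to exactly one output partition. How many batches each partition gets depends on how quickly it asks, which is the point of requesting data rather than pre-assigning it.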

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

Benchmark

--------------------
Benchmark clickbench_partitioned.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃      main ┃ on-demand-repartition-with-config ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │    1.45ms │                            1.36ms │ +1.06x faster │
│ QQuery 1     │   25.36ms │                           21.41ms │ +1.18x faster │
│ QQuery 2     │   67.53ms │                           61.03ms │ +1.11x faster │
│ QQuery 3     │   57.35ms │                           52.41ms │ +1.09x faster │
│ QQuery 4     │  534.77ms │                          478.96ms │ +1.12x faster │
│ QQuery 5     │  595.20ms │                          528.32ms │ +1.13x faster │
│ QQuery 6     │   24.09ms │                           21.91ms │ +1.10x faster │
│ QQuery 7     │   27.90ms │                           25.10ms │ +1.11x faster │
│ QQuery 8     │  598.24ms │                          527.70ms │ +1.13x faster │
│ QQuery 9     │  826.05ms │                          770.30ms │ +1.07x faster │
│ QQuery 10    │  181.87ms │                          166.84ms │ +1.09x faster │
│ QQuery 11    │  203.73ms │                          184.91ms │ +1.10x faster │
│ QQuery 12    │  618.12ms │                          568.15ms │ +1.09x faster │
│ QQuery 13    │  953.41ms │                          847.28ms │ +1.13x faster │
│ QQuery 14    │  597.70ms │                          536.09ms │ +1.11x faster │
│ QQuery 15    │  656.94ms │                          579.37ms │ +1.13x faster │
│ QQuery 16    │ 1143.55ms │                         1143.37ms │     no change │
│ QQuery 17    │ 1062.59ms │                         1059.22ms │     no change │
│ QQuery 18    │ 2679.79ms │                         2946.97ms │  1.10x slower │
│ QQuery 19    │   45.29ms │                           43.45ms │     no change │
│ QQuery 20    │  731.38ms │                          751.88ms │     no change │
│ QQuery 21    │  921.35ms │                          948.15ms │     no change │
│ QQuery 22    │ 1776.19ms │                         1793.34ms │     no change │
│ QQuery 23    │ 6025.93ms │                         6083.57ms │     no change │
│ QQuery 24    │  325.23ms │                          314.10ms │     no change │
│ QQuery 25    │  270.52ms │                          272.40ms │     no change │
│ QQuery 26    │  353.91ms │                          363.50ms │     no change │
│ QQuery 27    │ 1116.80ms │                         1134.67ms │     no change │
│ QQuery 28    │ 9262.02ms │                         9267.22ms │     no change │
│ QQuery 29    │  413.75ms │                          412.71ms │     no change │
│ QQuery 30    │  517.75ms │                          519.10ms │     no change │
│ QQuery 31    │  507.80ms │                          513.83ms │     no change │
│ QQuery 32    │ 3793.91ms │                         4300.86ms │  1.13x slower │
│ QQuery 33    │ 4359.10ms │                         4613.47ms │  1.06x slower │
│ QQuery 34    │ 4594.42ms │                         4679.83ms │     no change │
│ QQuery 35    │  782.20ms │                          784.82ms │     no change │
│ QQuery 36    │  102.11ms │                          102.32ms │     no change │
│ QQuery 37    │   49.16ms │                           48.60ms │     no change │
│ QQuery 38    │   68.82ms │                           67.89ms │     no change │
│ QQuery 39    │  189.32ms │                          184.44ms │     no change │
│ QQuery 40    │   22.48ms │                           23.17ms │     no change │
│ QQuery 41    │   20.51ms │                           20.36ms │     no change │
│ QQuery 42    │   28.20ms │                           27.98ms │     no change │
└──────────────┴───────────┴───────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                                ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main)                                │ 47133.76ms │
│ Total Time (on-demand-repartition-with-config)   │ 47792.34ms │
│ Average Time (main)                              │  1096.13ms │
│ Average Time (on-demand-repartition-with-config) │  1111.45ms │
│ Queries Faster                                   │         16 │
│ Queries Slower                                   │          3 │
│ Queries with No Change                           │         24 │
└──────────────────────────────────────────────────┴────────────┘
--------------------
Benchmark tpch_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃     main ┃ on-demand-repartition-with-config ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │  99.09ms │                           88.80ms │ +1.12x faster │
│ QQuery 2     │  20.87ms │                           19.74ms │ +1.06x faster │
│ QQuery 3     │  37.25ms │                           38.27ms │     no change │
│ QQuery 4     │  23.81ms │                           23.95ms │     no change │
│ QQuery 5     │  53.52ms │                           54.23ms │     no change │
│ QQuery 6     │  18.25ms │                           17.62ms │     no change │
│ QQuery 7     │  73.79ms │                           73.01ms │     no change │
│ QQuery 8     │  48.66ms │                           46.68ms │     no change │
│ QQuery 9     │  69.25ms │                           70.53ms │     no change │
│ QQuery 10    │  55.81ms │                           58.42ms │     no change │
│ QQuery 11    │  13.62ms │                           14.20ms │     no change │
│ QQuery 12    │  35.37ms │                           36.85ms │     no change │
│ QQuery 13    │  32.74ms │                           33.21ms │     no change │
│ QQuery 14    │  28.51ms │                           29.01ms │     no change │
│ QQuery 15    │  41.11ms │                           42.72ms │     no change │
│ QQuery 16    │  14.57ms │                           15.28ms │     no change │
│ QQuery 17    │  90.69ms │                           90.80ms │     no change │
│ QQuery 18    │ 117.01ms │                          117.39ms │     no change │
│ QQuery 19    │  46.54ms │                           47.67ms │     no change │
│ QQuery 20    │  39.28ms │                           39.74ms │     no change │
│ QQuery 21    │  93.51ms │                           90.00ms │     no change │
│ QQuery 22    │  15.31ms │                           15.92ms │     no change │
└──────────────┴──────────┴───────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary                                ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (main)                                │ 1068.59ms │
│ Total Time (on-demand-repartition-with-config)   │ 1064.05ms │
│ Average Time (main)                              │   48.57ms │
│ Average Time (on-demand-repartition-with-config) │   48.37ms │
│ Queries Faster                                   │         2 │
│ Queries Slower                                   │         0 │
│ Queries with No Change                           │        20 │
└──────────────────────────────────────────────────┴───────────┘
--------------------
Benchmark tpch_sf10.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Query        ┃      main ┃ on-demand-repartition-with-config ┃    Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ QQuery 1     │  819.81ms │                          836.43ms │ no change │
│ QQuery 2     │  124.99ms │                          123.28ms │ no change │
│ QQuery 3     │  399.40ms │                          390.62ms │ no change │
│ QQuery 4     │  195.41ms │                          194.33ms │ no change │
│ QQuery 5     │  591.84ms │                          598.59ms │ no change │
│ QQuery 6     │  138.58ms │                          138.75ms │ no change │
│ QQuery 7     │  870.74ms │                          868.79ms │ no change │
│ QQuery 8     │  615.54ms │                          610.69ms │ no change │
│ QQuery 9     │  981.82ms │                          992.46ms │ no change │
│ QQuery 10    │  543.70ms │                          546.70ms │ no change │
│ QQuery 11    │   84.80ms │                           86.21ms │ no change │
│ QQuery 12    │  281.75ms │                          290.11ms │ no change │
│ QQuery 13    │  414.65ms │                          425.19ms │ no change │
│ QQuery 14    │  228.08ms │                          228.42ms │ no change │
│ QQuery 15    │  381.08ms │                          389.09ms │ no change │
│ QQuery 16    │   91.66ms │                           91.75ms │ no change │
│ QQuery 17    │ 1067.02ms │                         1074.84ms │ no change │
│ QQuery 18    │ 1593.63ms │                         1563.52ms │ no change │
│ QQuery 19    │  393.60ms │                          396.00ms │ no change │
│ QQuery 20    │  373.34ms │                          369.92ms │ no change │
│ QQuery 21    │ 1337.07ms │                         1331.86ms │ no change │
│ QQuery 22    │  129.59ms │                          125.32ms │ no change │
└──────────────┴───────────┴───────────────────────────────────┴───────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                                ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main)                                │ 11658.11ms │
│ Total Time (on-demand-repartition-with-config)   │ 11672.87ms │
│ Average Time (main)                              │   529.91ms │
│ Average Time (on-demand-repartition-with-config) │   530.58ms │
│ Queries Faster                                   │          0 │
│ Queries Slower                                   │          0 │
│ Queries with No Change                           │         22 │
└──────────────────────────────────────────────────┴────────────┘
--------------------
Benchmark tpch_sf50.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃       main ┃ on-demand-repartition-with-config ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │  4056.73ms │                         4084.32ms │     no change │
│ QQuery 2     │   821.39ms │                          808.06ms │     no change │
│ QQuery 3     │  2003.29ms │                         2001.88ms │     no change │
│ QQuery 4     │   960.06ms │                          970.89ms │     no change │
│ QQuery 5     │  3328.71ms │                         3334.79ms │     no change │
│ QQuery 6     │   672.96ms │                          660.16ms │     no change │
│ QQuery 7     │  9750.95ms │                         9392.03ms │     no change │
│ QQuery 8     │  3829.53ms │                         3295.35ms │ +1.16x faster │
│ QQuery 9     │  6594.97ms │                         6034.65ms │ +1.09x faster │
│ QQuery 10    │  2779.14ms │                         2814.19ms │     no change │
│ QQuery 11    │   767.51ms │                          775.13ms │     no change │
│ QQuery 12    │  1328.17ms │                         1361.47ms │     no change │
│ QQuery 13    │  2438.08ms │                         2500.79ms │     no change │
│ QQuery 14    │  1061.81ms │                         1061.12ms │     no change │
│ QQuery 15    │  2412.39ms │                         2490.02ms │     no change │
│ QQuery 16    │   412.71ms │                          403.75ms │     no change │
│ QQuery 17    │  5928.70ms │                         5931.86ms │     no change │
│ QQuery 18    │ 15010.01ms │                        15308.33ms │     no change │
│ QQuery 19    │  1765.70ms │                         1743.14ms │     no change │
│ QQuery 20    │  2134.86ms │                         2167.34ms │     no change │
│ QQuery 21    │  8523.05ms │                         8515.56ms │     no change │
│ QQuery 22    │   670.66ms │                          674.19ms │     no change │
└──────────────┴────────────┴───────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                                ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main)                                │ 77251.37ms │
│ Total Time (on-demand-repartition-with-config)   │ 76329.02ms │
│ Average Time (main)                              │  3511.43ms │
│ Average Time (on-demand-repartition-with-config) │  3469.50ms │
│ Queries Faster                                   │          2 │
│ Queries Slower                                   │          0 │
│ Queries with No Change                           │         20 │
└──────────────────────────────────────────────────┴────────────┘
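
To compare the summary totals across the four benchmarks, the relative change can be computed directly from the "Total Time" rows above. The sketch below is only a convenience calculation; `percent_change`, `TOTALS`, and `report` are hypothetical names, and the figures are copied from the tables. By this measure the branch is roughly 1.4% slower in total on clickbench_partitioned and roughly 1.2% faster on tpch_sf50, with tpch_sf1 and tpch_sf10 within about half a percent.

```rust
/// Relative change of the branch versus main, in percent.
/// Negative values mean the branch is faster.
fn percent_change(main_ms: f64, branch_ms: f64) -> f64 {
    (branch_ms - main_ms) / main_ms * 100.0
}

/// Totals copied from the "Benchmark Summary" tables:
/// (benchmark, main total in ms, branch total in ms).
const TOTALS: [(&str, f64, f64); 4] = [
    ("clickbench_partitioned", 47133.76, 47792.34),
    ("tpch_sf1", 1068.59, 1064.05),
    ("tpch_sf10", 11658.11, 11672.87),
    ("tpch_sf50", 77251.37, 76329.02),
];

/// Formats one line per benchmark with the signed percentage change.
fn report() -> Vec<String> {
    TOTALS
        .iter()
        .map(|(name, main_ms, branch_ms)| {
            format!("{name}: {:+.2}%", percent_change(*main_ms, *branch_ms))
        })
        .collect()
}
```

Totals are dominated by the longest-running queries, so the per-query "Change" column remains the better guide to where the operator helps or hurts.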

@github-actions github-actions bot added the physical-expr, optimizer, core, sqllogictest, common, and proto labels Feb 2, 2025
@Weijun-H Weijun-H force-pushed the on-demand-repartition-with-config branch from 54db067 to 6ffe62c Compare February 2, 2025 16:23
@github-actions github-actions bot added the documentation label Feb 2, 2025
@ozankabak
Contributor

@Weijun-H has been working on this with the Synnada team for a while. The initial benchmark results were promising, so we decided to continue development while receiving community feedback 🚀

@ozankabak
Contributor

This is still in somewhat early stages, and there is work to do. But it might be good to get feedback early on from the community as the performance of this code is somewhat sensitive to idioms used with channels etc.

Contributor

@mertak-synnada mertak-synnada left a comment

Thanks for all the work! I just left some comments.

@Weijun-H Weijun-H force-pushed the on-demand-repartition-with-config branch 3 times, most recently from 69a3c4f to f6934d1 Compare February 6, 2025 14:42
@Weijun-H Weijun-H marked this pull request as ready for review February 6, 2025 15:20
@alamb
Contributor

alamb commented Feb 6, 2025

> This is still in somewhat early stages, and there is work to do. But it might be good to get feedback early on from the community as the performance of this code is somewhat sensitive to idioms used with channels etc.

Maybe I am missing something, but the benchmark numbers reported above don't really show much of an improvement

For example, this branch appears to be basically the same

│ Total Time (main)                                │ 11767.43ms │
│ Total Time (on-demand-repartition-with-config)   │ 11787.12ms │

Are there any benchmarks that show a performance benefit of all this new code?

@ozankabak
Contributor

@Weijun-H did some benchmarks a while back and the approach seemed promising in TPCH/SF50.

@mertak-synnada will do a detailed review of this tomorrow and then @Weijun-H can run the latest benchmarks for us to see how the performance changes

@berkaysynnada
Contributor

> Maybe I am missing something, but the benchmark numbers reported above don't really show much of an improvement

This might be a silly question, but did you set the config flag for the on-demand-repartition-with-config branch? 😅

@Weijun-H Weijun-H force-pushed the on-demand-repartition-with-config branch from fa91ea3 to beacced Compare February 7, 2025 04:18
@Weijun-H
Member Author

Weijun-H commented Feb 7, 2025

I updated the latest benchmark results. It appears that the OnDemandRepartition improved performance on clickbench_partitioned and large datasets such as tpch_sf50. For tpch_sf1 and tpch_sf10, the results are similar. I will check again to ensure everything is functioning correctly in the coming days.
@ozankabak @alamb @berkaysynnada @mertak-synnada

@Weijun-H Weijun-H force-pushed the on-demand-repartition-with-config branch from 62726ec to 3d4ee00 Compare February 7, 2025 04:59
@Weijun-H Weijun-H force-pushed the on-demand-repartition-with-config branch from 2ac6849 to df119c3 Compare February 7, 2025 05:14
Labels
common, core, documentation, optimizer, physical-expr, proto, sqllogictest
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Alternative approaches to "fan-out" style RepartitionExec
5 participants