Skip to content

Commit

Permalink
Increase batch size x4
Browse files Browse the repository at this point in the history
  • Loading branch information
Dandandan committed Dec 1, 2023
1 parent 8aae82c commit 845c5d2
Show file tree
Hide file tree
Showing 40 changed files with 302 additions and 302 deletions.
2 changes: 1 addition & 1 deletion benchmarks/src/tpch/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub struct ConvertOpt {
partitions: usize,

/// Batch size when reading CSV or Parquet files
#[structopt(short = "s", long = "batch-size", default_value = "8192")]
#[structopt(short = "s", long = "batch-size", default_value = "32768")]
batch_size: usize,
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ config_namespace! {
/// Default batch size while creating new batches, it's especially useful for
/// buffer-in-memory batches since creating tiny batches would result in too much
/// metadata memory consumption
pub batch_size: usize, default = 8192
pub batch_size: usize, default = 32768

/// When set to true, record batches will be examined between each operator and
/// small batches will be coalesced into larger batches. This is helpful when there
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,15 +474,15 @@ mod tests {
let expected_input = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=8192",
" CoalesceBatchesExec: target_batch_size=32768",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=8192",
" CoalesceBatchesExec: target_batch_size=32768",
" FilterExec: c@1 > 3",
" SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
Expand Down Expand Up @@ -511,19 +511,19 @@ mod tests {
let expected_input = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=8192",
" CoalesceBatchesExec: target_batch_size=32768",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" CoalesceBatchesExec: target_batch_size=8192",
" CoalesceBatchesExec: target_batch_size=32768",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=8192",
" CoalesceBatchesExec: target_batch_size=32768",
" FilterExec: c@1 > 3",
" SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
" CoalesceBatchesExec: target_batch_size=8192",
" CoalesceBatchesExec: target_batch_size=32768",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
Expand All @@ -546,15 +546,15 @@ mod tests {

let expected_input = [
"CoalescePartitionsExec",
" CoalesceBatchesExec: target_batch_size=8192",
" CoalesceBatchesExec: target_batch_size=32768",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized = [
"CoalescePartitionsExec",
" CoalesceBatchesExec: target_batch_size=8192",
" CoalesceBatchesExec: target_batch_size=32768",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
Expand Down Expand Up @@ -583,7 +583,7 @@ mod tests {
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST]",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" CoalesceBatchesExec: target_batch_size=8192",
" CoalesceBatchesExec: target_batch_size=32768",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
Expand All @@ -592,7 +592,7 @@ mod tests {
let expected_optimized = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
" CoalesceBatchesExec: target_batch_size=8192",
" CoalesceBatchesExec: target_batch_size=32768",
" FilterExec: c@1 > 3",
" SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
Expand Down Expand Up @@ -887,7 +887,7 @@ mod tests {
}

fn coalesce_batches_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
Arc::new(CoalesceBatchesExec::new(input, 8192))
Arc::new(CoalesceBatchesExec::new(input, 32768))
}

fn coalesce_partitions_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/tests/sql/joins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,11 @@ async fn join_change_in_planner() -> Result<()> {
let expected = {
[
"SymmetricHashJoinExec: mode=Partitioned, join_type=Full, on=[(a2@1, a2@1)], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10",
" CoalesceBatchesExec: target_batch_size=8192",
" CoalesceBatchesExec: target_batch_size=32768",
" RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
// " CsvExec: file_groups={1 group: [[tempdir/left.csv]]}, projection=[a1, a2], has_header=false",
" CoalesceBatchesExec: target_batch_size=8192",
" CoalesceBatchesExec: target_batch_size=32768",
" RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
// " CsvExec: file_groups={1 group: [[tempdir/right.csv]]}, projection=[a1, a2], has_header=false"
Expand Down Expand Up @@ -181,11 +181,11 @@ async fn join_change_in_planner_without_sort() -> Result<()> {
let expected = {
[
"SymmetricHashJoinExec: mode=Partitioned, join_type=Full, on=[(a2@1, a2@1)], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10",
" CoalesceBatchesExec: target_batch_size=8192",
" CoalesceBatchesExec: target_batch_size=32768",
" RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
// " CsvExec: file_groups={1 group: [[tempdir/left.csv]]}, projection=[a1, a2], has_header=false",
" CoalesceBatchesExec: target_batch_size=8192",
" CoalesceBatchesExec: target_batch_size=32768",
" RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
// " CsvExec: file_groups={1 group: [[tempdir/right.csv]]}, projection=[a1, a2], has_header=false"
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/tests/sql/order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,15 +207,15 @@ ORDER BY 1, 2;
" InterleaveExec",
" ProjectionExec: expr=[Int64(0)@0 as m, t@1 as t]",
" AggregateExec: mode=FinalPartitioned, gby=[Int64(0)@0 as Int64(0), t@1 as t], aggr=[]",
" CoalesceBatchesExec: target_batch_size=8192",
" CoalesceBatchesExec: target_batch_size=32768",
" RepartitionExec: partitioning=Hash([Int64(0)@0, t@1], 2), input_partitions=2",
" RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" AggregateExec: mode=Partial, gby=[0 as Int64(0), t@0 as t], aggr=[]",
" ProjectionExec: expr=[column1@0 as t]",
" ValuesExec",
" ProjectionExec: expr=[Int64(1)@0 as m, t@1 as t]",
" AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as Int64(1), t@1 as t], aggr=[]",
" CoalesceBatchesExec: target_batch_size=8192",
" CoalesceBatchesExec: target_batch_size=32768",
" RepartitionExec: partitioning=Hash([Int64(1)@0, t@1], 2), input_partitions=2",
" RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" AggregateExec: mode=Partial, gby=[1 as Int64(1), t@0 as t], aggr=[]",
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
///
/// ```text
/// ProjectionExec: expr=[a]
/// CoalesceBatchesExec: target_batch_size=8192
/// CoalesceBatchesExec: target_batch_size=32768
/// FilterExec: a < 5
/// RepartitionExec: partitioning=RoundRobinBatch(16)
/// CsvExec: source=...",
Expand Down
12 changes: 6 additions & 6 deletions datafusion/sqllogictest/test_files/aggregate.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2486,7 +2486,7 @@ GlobalLimitExec: skip=0, fetch=4
--SortPreservingMergeExec: [MAX(traces.timestamp)@1 DESC], fetch=4
----SortExec: TopK(fetch=4), expr=[MAX(traces.timestamp)@1 DESC]
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
--------CoalesceBatchesExec: target_batch_size=8192
--------CoalesceBatchesExec: target_batch_size=32768
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
Expand Down Expand Up @@ -2541,7 +2541,7 @@ GlobalLimitExec: skip=0, fetch=4
--SortPreservingMergeExec: [MAX(traces.timestamp)@1 DESC], fetch=4
----SortExec: TopK(fetch=4), expr=[MAX(traces.timestamp)@1 DESC]
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)], lim=[4]
--------CoalesceBatchesExec: target_batch_size=8192
--------CoalesceBatchesExec: target_batch_size=32768
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)], lim=[4]
Expand All @@ -2560,7 +2560,7 @@ GlobalLimitExec: skip=0, fetch=4
--SortPreservingMergeExec: [MIN(traces.timestamp)@1 DESC], fetch=4
----SortExec: TopK(fetch=4), expr=[MIN(traces.timestamp)@1 DESC]
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MIN(traces.timestamp)]
--------CoalesceBatchesExec: target_batch_size=8192
--------CoalesceBatchesExec: target_batch_size=32768
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MIN(traces.timestamp)]
Expand All @@ -2579,7 +2579,7 @@ GlobalLimitExec: skip=0, fetch=4
--SortPreservingMergeExec: [MAX(traces.timestamp)@1 ASC NULLS LAST], fetch=4
----SortExec: TopK(fetch=4), expr=[MAX(traces.timestamp)@1 ASC NULLS LAST]
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
--------CoalesceBatchesExec: target_batch_size=8192
--------CoalesceBatchesExec: target_batch_size=32768
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
Expand All @@ -2598,7 +2598,7 @@ GlobalLimitExec: skip=0, fetch=4
--SortPreservingMergeExec: [trace_id@0 ASC NULLS LAST], fetch=4
----SortExec: TopK(fetch=4), expr=[trace_id@0 ASC NULLS LAST]
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
--------CoalesceBatchesExec: target_batch_size=8192
--------CoalesceBatchesExec: target_batch_size=32768
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
Expand Down Expand Up @@ -2733,7 +2733,7 @@ GlobalLimitExec: skip=0, fetch=4
------------AggregateExec: mode=Final, gby=[c2@0 as c2, c3@1 as c3], aggr=[]
--------------CoalescePartitionsExec
----------------AggregateExec: mode=Partial, gby=[c2@0 as c2, c3@1 as c3], aggr=[]
------------------CoalesceBatchesExec: target_batch_size=8192
------------------CoalesceBatchesExec: target_batch_size=32768
--------------------FilterExec: c3@1 >= 10 AND c3@1 <= 20
----------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], has_header=true
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/distinct_on.slt
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ ProjectionExec: expr=[FIRST_VALUE(aggregate_test_100.c3) ORDER BY [aggregate_tes
--SortPreservingMergeExec: [c1@0 ASC NULLS LAST]
----SortExec: expr=[c1@0 ASC NULLS LAST]
------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)]
--------CoalesceBatchesExec: target_batch_size=8192
--------CoalesceBatchesExec: target_batch_size=32768
----------RepartitionExec: partitioning=Hash([c1@0], 4), input_partitions=4
------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)], ordering_mode=Sorted
--------------SortExec: expr=[c1@0 ASC NULLS LAST,c3@2 ASC NULLS LAST]
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/explain.slt
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ Projection: aggregate_test_100.c1
----TableScan: aggregate_test_100 projection=[c1, c2], partial_filters=[aggregate_test_100.c2 > Int8(10)]
physical_plan
ProjectionExec: expr=[c1@0 as c1]
--CoalesceBatchesExec: target_batch_size=8192
--CoalesceBatchesExec: target_batch_size=32768
----FilterExec: c2@1 > 10
------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2], has_header=true
Expand Down
14 changes: 7 additions & 7 deletions datafusion/sqllogictest/test_files/groupby.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2017,16 +2017,16 @@ SortPreservingMergeExec: [col0@0 ASC NULLS LAST]
--SortExec: expr=[col0@0 ASC NULLS LAST]
----ProjectionExec: expr=[col0@0 as col0, LAST_VALUE(r.col1) ORDER BY [r.col0 ASC NULLS LAST]@3 as last_col1]
------AggregateExec: mode=FinalPartitioned, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)]
--------CoalesceBatchesExec: target_batch_size=8192
--------CoalesceBatchesExec: target_batch_size=32768
----------RepartitionExec: partitioning=Hash([col0@0, col1@1, col2@2], 4), input_partitions=4
------------AggregateExec: mode=Partial, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)], ordering_mode=PartiallySorted([0])
--------------SortExec: expr=[col0@3 ASC NULLS LAST]
----------------CoalesceBatchesExec: target_batch_size=8192
----------------CoalesceBatchesExec: target_batch_size=32768
------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col0@0, col0@0)]
--------------------CoalesceBatchesExec: target_batch_size=8192
--------------------CoalesceBatchesExec: target_batch_size=32768
----------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1
------------------------MemoryExec: partitions=1, partition_sizes=[3]
--------------------CoalesceBatchesExec: target_batch_size=8192
--------------------CoalesceBatchesExec: target_batch_size=32768
----------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1
------------------------MemoryExec: partitions=1, partition_sizes=[3]

Expand Down Expand Up @@ -2710,7 +2710,7 @@ SortExec: expr=[sn@2 ASC NULLS LAST]
----AggregateExec: mode=Single, gby=[sn@2 as sn, zip_code@0 as zip_code, country@1 as country, ts@3 as ts, currency@4 as currency], aggr=[LAST_VALUE(e.amount)]
------SortExec: expr=[sn@5 ASC NULLS LAST]
--------ProjectionExec: expr=[zip_code@0 as zip_code, country@1 as country, sn@2 as sn, ts@3 as ts, currency@4 as currency, sn@5 as sn, amount@8 as amount]
----------CoalesceBatchesExec: target_batch_size=8192
----------CoalesceBatchesExec: target_batch_size=32768
------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(currency@4, currency@2)], filter=ts@0 >= ts@1
--------------MemoryExec: partitions=1, partition_sizes=[1]
--------------MemoryExec: partitions=1, partition_sizes=[1]
Expand Down Expand Up @@ -2754,7 +2754,7 @@ SortPreservingMergeExec: [country@0 ASC NULLS LAST]
--SortExec: expr=[country@0 ASC NULLS LAST]
----ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@2 as fv2]
------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
--------CoalesceBatchesExec: target_batch_size=8192
--------CoalesceBatchesExec: target_batch_size=32768
----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8
------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
--------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
Expand Down Expand Up @@ -2791,7 +2791,7 @@ SortPreservingMergeExec: [country@0 ASC NULLS LAST]
--SortExec: expr=[country@0 ASC NULLS LAST]
----ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as fv2]
------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)]
--------CoalesceBatchesExec: target_batch_size=8192
--------CoalesceBatchesExec: target_batch_size=32768
----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8
------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
--------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)]
Expand Down
Loading

0 comments on commit 845c5d2

Please sign in to comment.