Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into repartition_poc_2
Browse files Browse the repository at this point in the history
  • Loading branch information
Dandandan committed Dec 9, 2024
2 parents bb22271 + d3cfc45 commit 1b8e770
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 25 deletions.
27 changes: 15 additions & 12 deletions datafusion/physical-plan/src/joins/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,18 +190,21 @@ async fn load_left_input(

// Load all batches and count the rows
let (batches, _metrics, reservation) = stream
.try_fold((Vec::new(), metrics, reservation), |mut acc, batch| async {
let batch_size = batch.get_array_memory_size();
// Reserve memory for incoming batch
acc.2.try_grow(batch_size)?;
// Update metrics
acc.1.build_mem_used.add(batch_size);
acc.1.build_input_batches.add(1);
acc.1.build_input_rows.add(batch.num_rows());
// Push batch to output
acc.0.push(batch);
Ok(acc)
})
.try_fold(
(Vec::new(), metrics, reservation),
|(mut batches, metrics, mut reservation), batch| async {
let batch_size = batch.get_array_memory_size();
// Reserve memory for incoming batch
reservation.try_grow(batch_size)?;
// Update metrics
metrics.build_mem_used.add(batch_size);
metrics.build_input_batches.add(1);
metrics.build_input_rows.add(batch.num_rows());
// Push batch to output
batches.push(batch);
Ok((batches, metrics, reservation))
},
)
.await?;

let merged_batch = concat_batches(&left_schema, &batches)?;
Expand Down
24 changes: 11 additions & 13 deletions datafusion/physical-plan/src/joins/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use arrow::array::{BooleanBufferBuilder, UInt32Array, UInt64Array};
use arrow::compute::concat_batches;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow::util::bit_util;
use datafusion_common::{
exec_datafusion_err, internal_err, JoinSide, Result, Statistics,
};
Expand Down Expand Up @@ -440,17 +439,17 @@ async fn collect_left_input(
let (batches, metrics, mut reservation) = stream
.try_fold(
(Vec::new(), join_metrics, reservation),
|mut acc, batch| async {
|(mut batches, metrics, mut reservation), batch| async {
let batch_size = batch.get_array_memory_size();
// Reserve memory for incoming batch
acc.2.try_grow(batch_size)?;
reservation.try_grow(batch_size)?;
// Update metrics
acc.1.build_mem_used.add(batch_size);
acc.1.build_input_batches.add(1);
acc.1.build_input_rows.add(batch.num_rows());
metrics.build_mem_used.add(batch_size);
metrics.build_input_batches.add(1);
metrics.build_input_rows.add(batch.num_rows());
// Push batch to output
acc.0.push(batch);
Ok(acc)
batches.push(batch);
Ok((batches, metrics, reservation))
},
)
.await?;
Expand All @@ -459,14 +458,13 @@ async fn collect_left_input(

// Reserve memory for visited_left_side bitmap if required by join type
let visited_left_side = if with_visited_left_side {
// TODO: Replace `ceil` wrapper with stable `div_cell` after
// https://github.com/rust-lang/rust/issues/88581
let buffer_size = bit_util::ceil(merged_batch.num_rows(), 8);
let n_rows = merged_batch.num_rows();
let buffer_size = n_rows.div_ceil(8);
reservation.try_grow(buffer_size)?;
metrics.build_mem_used.add(buffer_size);

let mut buffer = BooleanBufferBuilder::new(merged_batch.num_rows());
buffer.append_n(merged_batch.num_rows(), false);
let mut buffer = BooleanBufferBuilder::new(n_rows);
buffer.append_n(n_rows, false);
buffer
} else {
BooleanBufferBuilder::new(0)
Expand Down

0 comments on commit 1b8e770

Please sign in to comment.