Buffer batches to use interleave
Dandandan committed Nov 30, 2023
1 parent d22ed69 commit 52fbb08
Showing 1 changed file with 56 additions and 39 deletions.
95 changes: 56 additions & 39 deletions datafusion/physical-plan/src/repartition/mod.rs
@@ -27,6 +27,7 @@ use std::{any::Any, vec};
use arrow::array::{ArrayRef, UInt64Builder};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
+ use arrow_array::Array;
use futures::stream::Stream;
use futures::{FutureExt, StreamExt};
use hashbrown::HashMap;
@@ -130,11 +131,11 @@ impl BatchPartitioner {
///
/// The time spent repartitioning, not including time spent in `f` will be recorded
/// to the [`metrics::Time`] provided on construction
- pub fn partition<F>(&mut self, batch: RecordBatch, mut f: F) -> Result<()>
+ pub fn partition<F>(&mut self, batches: Vec<RecordBatch>, mut f: F) -> Result<()>
where
F: FnMut(usize, RecordBatch) -> Result<()>,
{
-     self.partition_iter(batch)?.try_for_each(|res| match res {
+     self.partition_iter(batches)?.try_for_each(|res| match res {
Ok((partition, batch)) => f(partition, batch),
Err(e) => Err(e),
})
@@ -147,7 +148,7 @@ impl BatchPartitioner {
/// this (so we don't need to clone the entire implementation).
fn partition_iter(
&mut self,
-     batch: RecordBatch,
+     mut batches: Vec<RecordBatch>,
) -> Result<impl Iterator<Item = Result<(usize, RecordBatch)>> + Send + '_> {
let it: Box<dyn Iterator<Item = Result<(usize, RecordBatch)>> + Send> =
match &mut self.state {
@@ -157,7 +158,8 @@
} => {
let idx = *next_idx;
*next_idx = (*next_idx + 1) % *num_partitions;
-     Box::new(std::iter::once(Ok((idx, batch))))
+     assert_eq!(batches.len(), 1);
+     Box::new(std::iter::once(Ok((idx, batches.swap_remove(0)))))
}
BatchPartitionerState::Hash {
random_state,
@@ -166,46 +168,52 @@
hash_buffer,
} => {
let timer = self.timer.timer();

-     let arrays = exprs
-         .iter()
-         .map(|expr| expr.evaluate(&batch)?.into_array(batch.num_rows()))
-         .collect::<Result<Vec<_>>>()?;
-
-     hash_buffer.clear();
-     hash_buffer.resize(batch.num_rows(), 0);
-
-     create_hashes(&arrays, random_state, hash_buffer)?;
-
-     let mut indices: Vec<_> = (0..*partitions)
-         .map(|_| UInt64Builder::with_capacity(batch.num_rows()))
-         .collect();
-
-     for (index, hash) in hash_buffer.iter().enumerate() {
-         indices[(*hash % *partitions as u64) as usize]
-             .append_value(index as u64);
+     let mut indices = vec![vec![]; *partitions];
+
+     for (i, batch) in batches.iter().enumerate() {
+         let arrays = exprs
+             .iter()
+             .map(|expr| {
+                 expr.evaluate(&batch)?.into_array(batch.num_rows())
+             })
+             .collect::<Result<Vec<_>>>()?;
+         hash_buffer.clear();
+         hash_buffer.resize(batch.num_rows(), 0);
+         create_hashes(&arrays, random_state, hash_buffer)?;
+
+         for (index, hash) in hash_buffer.iter().enumerate() {
+             let p = (*hash % *partitions as u64);
+             indices[p as usize].push((i, index))
+         }
}

let it = indices
.into_iter()
.enumerate()
-     .filter_map(|(partition, mut indices)| {
-         let indices = indices.finish();
+     .filter_map(|(partition, indices)| {
(!indices.is_empty()).then_some((partition, indices))
})
.map(move |(partition, indices)| {
// Produce batches based on indices
-     let columns = batch
-         .columns()
-         .iter()
-         .map(|c| {
-             arrow::compute::take(c.as_ref(), &indices, None)
-                 .map_err(DataFusionError::ArrowError)
-         })
-         .collect::<Result<Vec<ArrayRef>>>()?;
+     let num_columns = batches[0].num_columns();
+     let mut cols = vec![];
+     for c in 0..num_columns {
+         let to_interleave_columns: Vec<&dyn Array> = batches
+             .iter()
+             .map(|x| x.columns()[c].as_ref())
+             .collect();
+
+         let res = arrow::compute::interleave(
+             to_interleave_columns.as_ref(),
+             &indices,
+         )
+         .map_err(DataFusionError::ArrowError)?;
+
+         cols.push(res);
+     }

let batch =
-         RecordBatch::try_new(batch.schema(), columns).unwrap();
+         RecordBatch::try_new(batches[0].schema(), cols).unwrap();

// bind timer so it drops w/ this iterator
let _ = &timer;
@@ -686,14 +694,14 @@ impl RepartitionExec {
metrics: RepartitionMetrics,
context: Arc<TaskContext>,
) -> Result<()> {
+ let is_hash_partitioning = matches!(partitioning, Partitioning::Hash(_, _));
let mut partitioner =
BatchPartitioner::try_new(partitioning, metrics.repartition_time.clone())?;

// execute the child operator
let timer = metrics.fetch_time.timer();
let mut stream = input.execute(partition, context)?;
timer.done();

+ let mut batches_buffer = vec![];
// While there are still outputs to send to, keep pulling inputs
let mut batches_until_yield = partitioner.num_partitions();
while !output_channels.is_empty() {
@@ -703,12 +711,21 @@
timer.done();

// Input is done
-     let batch = match result {
-         Some(result) => result?,
-         None => break,
+     let _ = match result {
+         Some(result) => {
+             batches_buffer.push(result?);
+             if is_hash_partitioning
+                 && batches_buffer.len() < partitioner.num_partitions()
+             {
+                 // Keep buffering batches
+                 continue;
+             }
+         }
+         None if batches_buffer.is_empty() => break,
+         None => {}
};

-     for res in partitioner.partition_iter(batch)? {
+     for res in partitioner.partition_iter(batches_buffer.clone())? {
let (partition, batch) = res?;
let size = batch.get_array_memory_size();


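A minimal, self-contained sketch (illustration only, not code from this commit) of how arrow::compute::interleave assembles an output array from (batch_index, row_index) pairs — the same index shape the new hash-partitioning path collects per partition. The arrays and index values below are made-up example data.

use arrow::array::{Array, ArrayRef, Int32Array};
use arrow::compute::interleave;
use arrow::error::ArrowError;

fn main() -> Result<(), ArrowError> {
    // Two buffered "batches", represented here by a single Int32 column each.
    let batch_a = Int32Array::from(vec![10, 20, 30]);
    let batch_b = Int32Array::from(vec![40, 50]);
    let sources: Vec<&dyn Array> = vec![&batch_a, &batch_b];

    // (batch_index, row_index) pairs, e.g. the rows that hashed to one partition.
    let indices = [(0, 2), (1, 0), (0, 0)];

    // One interleave call gathers the selected rows across all source arrays,
    // rather than one `take` per source batch.
    let out: ArrayRef = interleave(&sources, &indices)?;
    let out_i32 = out.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(out_i32.value(0), 30); // row 2 of batch_a
    assert_eq!(out_i32.value(1), 40); // row 0 of batch_b
    assert_eq!(out_i32.value(2), 10); // row 0 of batch_a
    Ok(())
}

The commit switches from take on a single batch to interleave over all buffered batches, so each emitted partition batch can combine rows from several buffered input batches in one pass per output column.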