diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 24f227d8a535..6ce2b4828eb7 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -27,6 +27,7 @@ use std::{any::Any, vec}; use arrow::array::{ArrayRef, UInt64Builder}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; +use arrow_array::Array; use futures::stream::Stream; use futures::{FutureExt, StreamExt}; use hashbrown::HashMap; @@ -130,11 +131,11 @@ impl BatchPartitioner { /// /// The time spent repartitioning, not including time spent in `f` will be recorded /// to the [`metrics::Time`] provided on construction - pub fn partition(&mut self, batch: RecordBatch, mut f: F) -> Result<()> + pub fn partition(&mut self, batches: Vec, mut f: F) -> Result<()> where F: FnMut(usize, RecordBatch) -> Result<()>, { - self.partition_iter(batch)?.try_for_each(|res| match res { + self.partition_iter(batches)?.try_for_each(|res| match res { Ok((partition, batch)) => f(partition, batch), Err(e) => Err(e), }) @@ -147,7 +148,7 @@ impl BatchPartitioner { /// this (so we don't need to clone the entire implementation). fn partition_iter( &mut self, - batch: RecordBatch, + mut batches: Vec, ) -> Result> + Send + '_> { let it: Box> + Send> = match &mut self.state { @@ -157,7 +158,8 @@ impl BatchPartitioner { } => { let idx = *next_idx; *next_idx = (*next_idx + 1) % *num_partitions; - Box::new(std::iter::once(Ok((idx, batch)))) + assert_eq!(batches.len(), 1); + Box::new(std::iter::once(Ok((idx, batches.swap_remove(0))))) } BatchPartitionerState::Hash { random_state, @@ -166,46 +168,52 @@ impl BatchPartitioner { hash_buffer, } => { let timer = self.timer.timer(); - - let arrays = exprs - .iter() - .map(|expr| expr.evaluate(&batch)?.into_array(batch.num_rows())) - .collect::>>()?; - - hash_buffer.clear(); - hash_buffer.resize(batch.num_rows(), 0); - - create_hashes(&arrays, random_state, hash_buffer)?; - - let mut indices: Vec<_> = (0..*partitions) - .map(|_| UInt64Builder::with_capacity(batch.num_rows())) - .collect(); - - for (index, hash) in hash_buffer.iter().enumerate() { - indices[(*hash % *partitions as u64) as usize] - .append_value(index as u64); + let mut indices = vec![vec![]; *partitions]; + + for (i, batch) in batches.iter().enumerate() { + let arrays = exprs + .iter() + .map(|expr| { + expr.evaluate(&batch)?.into_array(batch.num_rows()) + }) + .collect::>>()?; + hash_buffer.clear(); + hash_buffer.resize(batch.num_rows(), 0); + create_hashes(&arrays, random_state, hash_buffer)?; + + for (index, hash) in hash_buffer.iter().enumerate() { + let p = (*hash % *partitions as u64); + indices[p as usize].push((i, index)) + } } let it = indices .into_iter() .enumerate() - .filter_map(|(partition, mut indices)| { - let indices = indices.finish(); + .filter_map(|(partition, indices)| { (!indices.is_empty()).then_some((partition, indices)) }) .map(move |(partition, indices)| { // Produce batches based on indices - let columns = batch - .columns() - .iter() - .map(|c| { - arrow::compute::take(c.as_ref(), &indices, None) - .map_err(DataFusionError::ArrowError) - }) - .collect::>>()?; + let num_columns = batches[0].num_columns(); + let mut cols = vec![]; + for c in 0..num_columns { + let to_interleave_columns: Vec<&dyn Array> = batches + .iter() + .map(|x| x.columns()[c].as_ref()) + .collect(); + + let res = arrow::compute::interleave( + to_interleave_columns.as_ref(), + &indices, + ) + .map_err(DataFusionError::ArrowError)?; + + cols.push(res); + } let batch = - RecordBatch::try_new(batch.schema(), columns).unwrap(); + RecordBatch::try_new(batches[0].schema(), cols).unwrap(); // bind timer so it drops w/ this iterator let _ = &timer; @@ -686,14 +694,14 @@ impl RepartitionExec { metrics: RepartitionMetrics, context: Arc, ) -> Result<()> { + let is_hash_partitioning = matches!(partitioning, Partitioning::Hash(_, _)); let mut partitioner = BatchPartitioner::try_new(partitioning, metrics.repartition_time.clone())?; - // execute the child operator let timer = metrics.fetch_time.timer(); let mut stream = input.execute(partition, context)?; timer.done(); - + let mut batches_buffer = vec![]; // While there are still outputs to send to, keep pulling inputs let mut batches_until_yield = partitioner.num_partitions(); while !output_channels.is_empty() { @@ -703,12 +711,21 @@ impl RepartitionExec { timer.done(); // Input is done - let batch = match result { - Some(result) => result?, - None => break, + let _ = match result { + Some(result) => { + batches_buffer.push(result?); + if is_hash_partitioning + && batches_buffer.len() < partitioner.num_partitions() + { + // Keep buffering batches + continue; + } + } + None if batches_buffer.is_empty() => break, + None => {} }; - for res in partitioner.partition_iter(batch)? { + for res in partitioner.partition_iter(batches_buffer.clone())? { let (partition, batch) = res?; let size = batch.get_array_memory_size();