patches 2025-02-25 #7

Closed · wants to merge 3 commits
3 changes: 3 additions & 0 deletions datafusion/common/src/config.rs
@@ -256,6 +256,9 @@ config_namespace! {
/// query (i.e. [`Span`](sqlparser::tokenizer::Span)) will be collected
/// and recorded in the logical plan nodes.
pub collect_spans: bool, default = false

/// Specifies the recursion depth limit when parsing complex SQL queries.
pub recursion_limit: usize, default = 50
}
}
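For context, here is a minimal sketch (not part of this patch) of how the new option could be set through the public SessionConfig API; the config key follows the `datafusion.sql_parser` namespace this option is declared in, and the value 100 is purely illustrative.

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Raise the parser recursion limit from the default of 50 to 100.
    let config = SessionConfig::new()
        .set_usize("datafusion.sql_parser.recursion_limit", 100);
    let ctx = SessionContext::new_with_config(config);

    // Deeply nested expressions now parse up to the configured depth
    // instead of aborting with a stack overflow.
    ctx.sql("SELECT ((((((1))))))").await?.show().await?;
    Ok(())
}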

18 changes: 16 additions & 2 deletions datafusion/core/src/execution/session_state.rs
@@ -483,12 +483,21 @@ impl SessionState {
MsSQL, ClickHouse, BigQuery, Ansi."
)
})?;
let mut statements = DFParser::parse_sql_with_dialect(sql, dialect.as_ref())?;

let recursion_limit = self.config.options().sql_parser.recursion_limit;

let mut statements = DFParser::parse_sql_with_dialect_limit(
sql,
dialect.as_ref(),
recursion_limit,
)?;

if statements.len() > 1 {
return not_impl_err!(
"The context currently only supports a single SQL statement"
);
}

let statement = statements.pop_front().ok_or_else(|| {
plan_datafusion_err!("No SQL statements were provided in the query string")
})?;
@@ -522,7 +531,12 @@
)
})?;

let expr = DFParser::parse_sql_into_expr_with_dialect(sql, dialect.as_ref())?;
let recursion_limit = self.config.options().sql_parser.recursion_limit;
let expr = DFParser::parse_sql_into_expr_with_dialect_limit(
sql,
dialect.as_ref(),
recursion_limit,
)?;

Ok(expr)
}
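As a usage sketch, the `_limit` parser entry points called above could also be exercised directly; the dialect, the queries, and the limit of 50 below are illustrative assumptions, not code from this patch.

use datafusion_sql::parser::DFParser;
use datafusion_sql::sqlparser::dialect::GenericDialect;

fn main() {
    let dialect = GenericDialect {};

    // A query nested well within the limit parses as before.
    let statements =
        DFParser::parse_sql_with_dialect_limit("SELECT ((1 + 2))", &dialect, 50)
            .expect("query should parse within the recursion limit");
    assert_eq!(statements.len(), 1);

    // Pathologically nested input now fails with a parser error
    // instead of overflowing the stack.
    let deep = format!("SELECT {}1{}", "(".repeat(1_000), ")".repeat(1_000));
    assert!(DFParser::parse_sql_with_dialect_limit(&deep, &dialect, 50).is_err());
}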
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/common.rs
@@ -180,7 +180,7 @@ pub fn compute_record_batch_statistics(
}
}

/// Write in Arrow IPC format.
/// Write in Arrow IPC File format.
pub struct IPCWriter {
/// Path
pub path: PathBuf,
6 changes: 3 additions & 3 deletions datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -59,7 +59,7 @@ use arrow::compute::{
};
use arrow::datatypes::{DataType, SchemaRef, TimeUnit};
use arrow::error::ArrowError;
use arrow::ipc::reader::FileReader;
use arrow::ipc::reader::StreamReader;
use datafusion_common::{
exec_err, internal_err, not_impl_err, plan_err, DataFusionError, HashSet, JoinSide,
JoinType, Result,
@@ -1394,7 +1394,7 @@ impl SortMergeJoinStream {

if let Some(batch) = buffered_batch.batch {
spill_record_batches(
vec![batch],
&[batch],
spill_file.path().into(),
Arc::clone(&self.buffered_schema),
)?;
@@ -2270,7 +2270,7 @@ fn fetch_right_columns_from_batch_by_idxs(
Vec::with_capacity(buffered_indices.len());

let file = BufReader::new(File::open(spill_file.path())?);
let reader = FileReader::try_new(file, None)?;
let reader = StreamReader::try_new(file, None)?;

for batch in reader {
batch?.columns().iter().for_each(|column| {
56 changes: 28 additions & 28 deletions datafusion/physical-plan/src/sorts/sort.rs
@@ -24,7 +24,7 @@ use std::fmt;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use crate::common::{spawn_buffered, IPCWriter};
use crate::common::spawn_buffered;
use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType};
use crate::expressions::PhysicalSortExpr;
use crate::limit::LimitStream;
@@ -35,6 +35,7 @@ use crate::projection::{make_with_child, update_expr, ProjectionExec};
use crate::sorts::streaming_merge::StreamingMergeBuilder;
use crate::spill::{
get_record_batch_memory_size, read_spill_as_stream, spill_record_batches,
IPCStreamWriter,
};
use crate::stream::RecordBatchStreamAdapter;
use crate::topk::TopK;
@@ -402,7 +403,7 @@ impl ExternalSorter {
let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?;
let batches = std::mem::take(&mut self.in_mem_batches);
let (spilled_rows, spilled_bytes) = spill_record_batches(
batches,
&batches,
spill_file.path().into(),
Arc::clone(&self.schema),
)?;
@@ -439,36 +440,35 @@ impl ExternalSorter {
// `self.in_mem_batches` is already taken away by the sort_stream, now it is empty.
// We'll gradually collect the sorted stream into self.in_mem_batches, or directly
// write sorted batches to disk when the memory is insufficient.
let mut spill_writer: Option<IPCWriter> = None;
let mut spill_writer: Option<IPCStreamWriter> = None;
while let Some(batch) = sorted_stream.next().await {
let batch = batch?;
match &mut spill_writer {
None => {
let sorted_size = get_reserved_byte_for_record_batch(&batch);
if self.reservation.try_grow(sorted_size).is_err() {
// Directly write in_mem_batches as well as all the remaining batches in
// sorted_stream to disk. Further batches fetched from `sorted_stream` will
// be handled by the `Some(writer)` matching arm.
let spill_file =
self.runtime.disk_manager.create_tmp_file("Sorting")?;
let mut writer = IPCWriter::new(spill_file.path(), &self.schema)?;
// Flush everything in memory to the spill file
for batch in self.in_mem_batches.drain(..) {
writer.write(&batch)?;
}
// as well as the newly sorted batch
writer.write(&batch)?;
spill_writer = Some(writer);
self.reservation.free();
self.spills.push(spill_file);
} else {
self.in_mem_batches.push(batch);
self.in_mem_batches_sorted = true;
}
}
Some(writer) => {

// If we've started spilling, just keep spilling
if let Some(spill_writer) = &mut spill_writer {
spill_writer.write(&batch)?;
continue;
}

let sorted_size = get_reserved_byte_for_record_batch(&batch);
if self.reservation.try_grow(sorted_size).is_err() {
// Directly write in_mem_batches as well as all the remaining batches in
// sorted_stream to disk. Further batches fetched from `sorted_stream` will
// be handled by the `if let Some(spill_writer)` branch above.
let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?;
let mut writer = IPCStreamWriter::new(spill_file.path(), &self.schema)?;
// Flush everything in memory to the spill file
for batch in self.in_mem_batches.drain(..) {
writer.write(&batch)?;
}
// as well as the newly sorted batch
writer.write(&batch)?;
spill_writer = Some(writer);
self.reservation.free();
self.spills.push(spill_file);
} else {
self.in_mem_batches.push(batch);
self.in_mem_batches_sorted = true;
}
}

77 changes: 63 additions & 14 deletions datafusion/physical-plan/src/spill.rs
@@ -23,8 +23,8 @@ use std::path::{Path, PathBuf};
use std::ptr::NonNull;

use arrow::array::ArrayData;
use arrow::datatypes::SchemaRef;
use arrow::ipc::reader::FileReader;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::ipc::{reader::StreamReader, writer::StreamWriter};
use arrow::record_batch::RecordBatch;
use log::debug;
use tokio::sync::mpsc::Sender;
@@ -34,7 +34,6 @@ use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::memory_pool::human_readable_size;
use datafusion_execution::SendableRecordBatchStream;

use crate::common::IPCWriter;
use crate::stream::RecordBatchReceiverStream;

/// Read spilled batches from the disk
@@ -59,13 +58,13 @@ pub(crate) fn read_spill_as_stream(
///
/// Returns total number of the rows spilled to disk.
pub(crate) fn spill_record_batches(
batches: Vec<RecordBatch>,
batches: &[RecordBatch],
path: PathBuf,
schema: SchemaRef,
) -> Result<(usize, usize)> {
let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?;
let mut writer = IPCStreamWriter::new(path.as_ref(), schema.as_ref())?;
for batch in batches {
writer.write(&batch)?;
writer.write(batch)?;
}
writer.finish()?;
debug!(
@@ -79,7 +78,7 @@

fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
let file = BufReader::new(File::open(path)?);
let reader = FileReader::try_new(file, None)?;
let reader = StreamReader::try_new(file, None)?;
for batch in reader {
sender
.blocking_send(batch.map_err(Into::into))
@@ -98,7 +97,7 @@ pub fn spill_record_batch_by_size(
) -> Result<()> {
let mut offset = 0;
let total_rows = batch.num_rows();
let mut writer = IPCWriter::new(&path, schema.as_ref())?;
let mut writer = IPCStreamWriter::new(&path, schema.as_ref())?;

while offset < total_rows {
let length = std::cmp::min(total_rows - offset, batch_size_rows);
@@ -130,7 +129,7 @@ pub fn spill_record_batch_by_size(
/// {xxxxxxxxxxxxxxxxxxx} <--- buffer
/// ^ ^ ^ ^
/// | | | |
/// col1->{ } | |
/// col1->{ } | |
/// col2--------->{ }
///
/// In the above case, `get_record_batch_memory_size` will return the size of
@@ -179,6 +178,51 @@
}
}

/// Write in Arrow IPC Stream format to a file.
///
/// The stream format is used for spill files because it supports dictionary replacement,
/// which the IPC File format does not, and the random access offered by the File format
/// is not needed here.
pub(crate) struct IPCStreamWriter {
/// Inner writer
pub writer: StreamWriter<File>,
/// Batches written
pub num_batches: usize,
/// Rows written
pub num_rows: usize,
/// Bytes written
pub num_bytes: usize,
}

impl IPCStreamWriter {
/// Create new writer
pub fn new(path: &Path, schema: &Schema) -> Result<Self> {
let file = File::create(path).map_err(|e| {
exec_datafusion_err!("Failed to create partition file at {path:?}: {e:?}")
})?;
Ok(Self {
num_batches: 0,
num_rows: 0,
num_bytes: 0,
writer: StreamWriter::try_new(file, schema)?,
})
}

/// Write one single batch
pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
self.writer.write(batch)?;
self.num_batches += 1;
self.num_rows += batch.num_rows();
let num_bytes: usize = batch.get_array_memory_size();
self.num_bytes += num_bytes;
Ok(())
}

/// Finish the writer
pub fn finish(&mut self) -> Result<()> {
self.writer.finish().map_err(Into::into)
}
}
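For illustration, a standalone sketch (independent of this patch) of the Arrow IPC Stream round trip that `IPCStreamWriter` builds on: batches are written with `arrow::ipc::writer::StreamWriter` and read back with `StreamReader`. Unlike the file-format `FileReader`, the stream reader exposes no `num_batches()` and is consumed as an iterator, which is why the tests below collect the batches instead.

use std::fs::File;
use std::io::BufReader;
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::ipc::{reader::StreamReader, writer::StreamWriter};
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;

    // Write two batches in the stream format; dictionaries may be replaced
    // between batches here, which the IPC File format does not allow.
    let path = std::env::temp_dir().join("spill_stream_example.arrow");
    let mut writer = StreamWriter::try_new(File::create(&path)?, &schema)?;
    writer.write(&batch)?;
    writer.write(&batch)?;
    writer.finish()?;

    // Read the batches back; StreamReader is an Iterator over Result<RecordBatch>.
    let reader = StreamReader::try_new(BufReader::new(File::open(&path)?), None)?;
    assert_eq!(reader.count(), 2);
    Ok(())
}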

#[cfg(test)]
mod tests {
use super::*;
@@ -190,6 +234,7 @@ mod tests {
use datafusion_common::Result;
use datafusion_execution::disk_manager::DiskManagerConfig;
use datafusion_execution::DiskManager;
use itertools::Itertools;
use std::fs::File;
use std::io::BufReader;
use std::sync::Arc;
@@ -214,18 +259,20 @@
let schema = batch1.schema();
let num_rows = batch1.num_rows() + batch2.num_rows();
let (spilled_rows, _) = spill_record_batches(
vec![batch1, batch2],
&[batch1, batch2],
spill_file.path().into(),
Arc::clone(&schema),
)?;
assert_eq!(spilled_rows, num_rows);

let file = BufReader::new(File::open(spill_file.path())?);
let reader = FileReader::try_new(file, None)?;
let reader = StreamReader::try_new(file, None)?;

assert_eq!(reader.num_batches(), 2);
assert_eq!(reader.schema(), schema);

let batches = reader.collect_vec();
assert!(batches.len() == 2);

Ok(())
}

@@ -249,11 +296,13 @@
)?;

let file = BufReader::new(File::open(spill_file.path())?);
let reader = FileReader::try_new(file, None)?;
let reader = StreamReader::try_new(file, None)?;

assert_eq!(reader.num_batches(), 4);
assert_eq!(reader.schema(), schema);

let batches = reader.collect_vec();
assert!(batches.len() == 4);

Ok(())
}
