diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 5e8317c081d9..1a4a6068a3ad 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -256,6 +256,9 @@ config_namespace! {
         /// query (i.e. [`Span`](sqlparser::tokenizer::Span)) will be collected
         /// and recorded in the logical plan nodes.
         pub collect_spans: bool, default = false
+
+        /// Specifies the recursion depth limit when parsing complex SQL Queries
+        pub recursion_limit: usize, default = 50
     }
 }

diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs
index f1abf30c0c54..fb90dda25414 100644
--- a/datafusion/core/src/execution/session_state.rs
+++ b/datafusion/core/src/execution/session_state.rs
@@ -483,12 +483,21 @@ impl SessionState {
                      MsSQL, ClickHouse, BigQuery, Ansi."
             )
         })?;
-        let mut statements = DFParser::parse_sql_with_dialect(sql, dialect.as_ref())?;
+
+        let recursion_limit = self.config.options().sql_parser.recursion_limit;
+
+        let mut statements = DFParser::parse_sql_with_dialect_limit(
+            sql,
+            dialect.as_ref(),
+            recursion_limit,
+        )?;
+
         if statements.len() > 1 {
             return not_impl_err!(
                 "The context currently only supports a single SQL statement"
             );
         }
+
         let statement = statements.pop_front().ok_or_else(|| {
             plan_datafusion_err!("No SQL statements were provided in the query string")
         })?;
@@ -522,7 +531,12 @@ impl SessionState {
             )
         })?;

-        let expr = DFParser::parse_sql_into_expr_with_dialect(sql, dialect.as_ref())?;
+        let recursion_limit = self.config.options().sql_parser.recursion_limit;
+        let expr = DFParser::parse_sql_into_expr_with_dialect_limit(
+            sql,
+            dialect.as_ref(),
+            recursion_limit,
+        )?;
         Ok(expr)
     }

diff --git a/datafusion/physical-plan/src/common.rs b/datafusion/physical-plan/src/common.rs
index b83641acf2ce..a8d4a3ddf3d1 100644
--- a/datafusion/physical-plan/src/common.rs
+++ b/datafusion/physical-plan/src/common.rs
@@ -180,7 +180,7 @@ pub fn compute_record_batch_statistics(
     }
 }

-/// Write in Arrow IPC format.
+/// Write in Arrow IPC File format.
 pub struct IPCWriter {
     /// Path
     pub path: PathBuf,
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs
index a3e835c64131..cc5fec6af716 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -59,7 +59,7 @@ use arrow::compute::{
 };
 use arrow::datatypes::{DataType, SchemaRef, TimeUnit};
 use arrow::error::ArrowError;
-use arrow::ipc::reader::FileReader;
+use arrow::ipc::reader::StreamReader;
 use datafusion_common::{
     exec_err, internal_err, not_impl_err, plan_err, DataFusionError, HashSet,
     JoinSide, JoinType, Result,
@@ -1394,7 +1394,7 @@ impl SortMergeJoinStream {

             if let Some(batch) = buffered_batch.batch {
                 spill_record_batches(
-                    vec![batch],
+                    &[batch],
                     spill_file.path().into(),
                     Arc::clone(&self.buffered_schema),
                 )?;
@@ -2270,7 +2270,7 @@ fn fetch_right_columns_from_batch_by_idxs(
                 Vec::with_capacity(buffered_indices.len());

             let file = BufReader::new(File::open(spill_file.path())?);
-            let reader = FileReader::try_new(file, None)?;
+            let reader = StreamReader::try_new(file, None)?;

             for batch in reader {
                 batch?.columns().iter().for_each(|column| {
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index 649468260e56..c06fff354a5a 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -24,7 +24,7 @@ use std::fmt;
 use std::fmt::{Debug, Formatter};
 use std::sync::Arc;

-use crate::common::{spawn_buffered, IPCWriter};
+use crate::common::spawn_buffered;
 use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType};
 use crate::expressions::PhysicalSortExpr;
 use crate::limit::LimitStream;
@@ -35,6 +35,7 @@ use crate::projection::{make_with_child, update_expr, ProjectionExec};
 use crate::sorts::streaming_merge::StreamingMergeBuilder;
 use crate::spill::{
     get_record_batch_memory_size, read_spill_as_stream, spill_record_batches,
+    IPCStreamWriter,
 };
 use crate::stream::RecordBatchStreamAdapter;
 use crate::topk::TopK;
@@ -402,7 +403,7 @@ impl ExternalSorter {
         let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?;
         let batches = std::mem::take(&mut self.in_mem_batches);
         let (spilled_rows, spilled_bytes) = spill_record_batches(
-            batches,
+            &batches,
             spill_file.path().into(),
             Arc::clone(&self.schema),
         )?;
@@ -439,36 +440,35 @@ impl ExternalSorter {
         // `self.in_mem_batches` is already taken away by the sort_stream, now it is empty.
         // We'll gradually collect the sorted stream into self.in_mem_batches, or directly
         // write sorted batches to disk when the memory is insufficient.
-        let mut spill_writer: Option<IPCWriter> = None;
+        let mut spill_writer: Option<IPCStreamWriter> = None;
         while let Some(batch) = sorted_stream.next().await {
             let batch = batch?;
-            match &mut spill_writer {
-                None => {
-                    let sorted_size = get_reserved_byte_for_record_batch(&batch);
-                    if self.reservation.try_grow(sorted_size).is_err() {
-                        // Directly write in_mem_batches as well as all the remaining batches in
-                        // sorted_stream to disk. Further batches fetched from `sorted_stream` will
-                        // be handled by the `Some(writer)` matching arm.
-                        let spill_file =
-                            self.runtime.disk_manager.create_tmp_file("Sorting")?;
-                        let mut writer = IPCWriter::new(spill_file.path(), &self.schema)?;
-                        // Flush everything in memory to the spill file
-                        for batch in self.in_mem_batches.drain(..) {
-                            writer.write(&batch)?;
-                        }
-                        // as well as the newly sorted batch
-                        writer.write(&batch)?;
-                        spill_writer = Some(writer);
-                        self.reservation.free();
-                        self.spills.push(spill_file);
-                    } else {
-                        self.in_mem_batches.push(batch);
-                        self.in_mem_batches_sorted = true;
-                    }
-                }
-                Some(writer) => {
+
+            // If we've started spilling, just keep spilling
+            if let Some(spill_writer) = &mut spill_writer {
+                spill_writer.write(&batch)?;
+                continue;
+            }
+
+            let sorted_size = get_reserved_byte_for_record_batch(&batch);
+            if self.reservation.try_grow(sorted_size).is_err() {
+                // Directly write in_mem_batches as well as all the remaining batches in
+                // sorted_stream to disk. Further batches fetched from `sorted_stream` will
+                // be handled by the `Some(writer)` matching arm.
+                let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?;
+                let mut writer = IPCStreamWriter::new(spill_file.path(), &self.schema)?;
+                // Flush everything in memory to the spill file
+                for batch in self.in_mem_batches.drain(..) {
                     writer.write(&batch)?;
                 }
+                // as well as the newly sorted batch
+                writer.write(&batch)?;
+                spill_writer = Some(writer);
+                self.reservation.free();
+                self.spills.push(spill_file);
+            } else {
+                self.in_mem_batches.push(batch);
+                self.in_mem_batches_sorted = true;
             }
         }

diff --git a/datafusion/physical-plan/src/spill.rs b/datafusion/physical-plan/src/spill.rs
index b45353ae13f0..330b302552ea 100644
--- a/datafusion/physical-plan/src/spill.rs
+++ b/datafusion/physical-plan/src/spill.rs
@@ -23,8 +23,8 @@ use std::path::{Path, PathBuf};
 use std::ptr::NonNull;

 use arrow::array::ArrayData;
-use arrow::datatypes::SchemaRef;
-use arrow::ipc::reader::FileReader;
+use arrow::datatypes::{Schema, SchemaRef};
+use arrow::ipc::{reader::StreamReader, writer::StreamWriter};
 use arrow::record_batch::RecordBatch;
 use log::debug;
 use tokio::sync::mpsc::Sender;
@@ -34,7 +34,6 @@ use datafusion_execution::disk_manager::RefCountedTempFile;
 use datafusion_execution::memory_pool::human_readable_size;
 use datafusion_execution::SendableRecordBatchStream;

-use crate::common::IPCWriter;
 use crate::stream::RecordBatchReceiverStream;

 /// Read spilled batches from the disk
@@ -59,13 +58,13 @@ pub(crate) fn read_spill_as_stream(
 ///
 /// Returns total number of the rows spilled to disk.
 pub(crate) fn spill_record_batches(
-    batches: Vec<RecordBatch>,
+    batches: &[RecordBatch],
     path: PathBuf,
     schema: SchemaRef,
 ) -> Result<(usize, usize)> {
-    let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?;
+    let mut writer = IPCStreamWriter::new(path.as_ref(), schema.as_ref())?;
     for batch in batches {
-        writer.write(&batch)?;
+        writer.write(batch)?;
     }
     writer.finish()?;
     debug!(
@@ -79,7 +78,7 @@ pub(crate) fn spill_record_batches(

 fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
     let file = BufReader::new(File::open(path)?);
-    let reader = FileReader::try_new(file, None)?;
+    let reader = StreamReader::try_new(file, None)?;
     for batch in reader {
         sender
             .blocking_send(batch.map_err(Into::into))
@@ -98,7 +97,7 @@ pub fn spill_record_batch_by_size(
 ) -> Result<()> {
     let mut offset = 0;
     let total_rows = batch.num_rows();
-    let mut writer = IPCWriter::new(&path, schema.as_ref())?;
+    let mut writer = IPCStreamWriter::new(&path, schema.as_ref())?;

     while offset < total_rows {
         let length = std::cmp::min(total_rows - offset, batch_size_rows);
@@ -130,7 +129,7 @@ pub fn spill_record_batch_by_size(
 /// {xxxxxxxxxxxxxxxxxxx} <--- buffer
 ///      ^    ^  ^    ^
 ///      |    |  |    |
-/// col1->{    }  | |
+/// col1->{    }  |  |
 /// col2--------->{  }
 ///
 /// In the above case, `get_record_batch_memory_size` will return the size of
@@ -179,6 +178,51 @@ fn count_array_data_memory_size(
     }
 }

+/// Write in Arrow IPC Stream format to a file.
+///
+/// Stream format is used for spill because it supports dictionary replacement, and the random
+/// access of IPC File format is not needed (IPC File format doesn't support dictionary replacement).
+pub(crate) struct IPCStreamWriter {
+    /// Inner writer
+    pub writer: StreamWriter<File>,
+    /// Batches written
+    pub num_batches: usize,
+    /// Rows written
+    pub num_rows: usize,
+    /// Bytes written
+    pub num_bytes: usize,
+}
+
+impl IPCStreamWriter {
+    /// Create new writer
+    pub fn new(path: &Path, schema: &Schema) -> Result<Self> {
+        let file = File::create(path).map_err(|e| {
+            exec_datafusion_err!("Failed to create partition file at {path:?}: {e:?}")
+        })?;
+        Ok(Self {
+            num_batches: 0,
+            num_rows: 0,
+            num_bytes: 0,
+            writer: StreamWriter::try_new(file, schema)?,
+        })
+    }
+
+    /// Write one single batch
+    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+        self.writer.write(batch)?;
+        self.num_batches += 1;
+        self.num_rows += batch.num_rows();
+        let num_bytes: usize = batch.get_array_memory_size();
+        self.num_bytes += num_bytes;
+        Ok(())
+    }
+
+    /// Finish the writer
+    pub fn finish(&mut self) -> Result<()> {
+        self.writer.finish().map_err(Into::into)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -190,6 +234,7 @@ mod tests {
     use datafusion_common::Result;
     use datafusion_execution::disk_manager::DiskManagerConfig;
     use datafusion_execution::DiskManager;
+    use itertools::Itertools;
     use std::fs::File;
     use std::io::BufReader;
     use std::sync::Arc;
@@ -214,18 +259,20 @@ mod tests {
         let schema = batch1.schema();
         let num_rows = batch1.num_rows() + batch2.num_rows();
         let (spilled_rows, _) = spill_record_batches(
-            vec![batch1, batch2],
+            &[batch1, batch2],
             spill_file.path().into(),
             Arc::clone(&schema),
         )?;
         assert_eq!(spilled_rows, num_rows);

         let file = BufReader::new(File::open(spill_file.path())?);
-        let reader = FileReader::try_new(file, None)?;
+        let reader = StreamReader::try_new(file, None)?;

-        assert_eq!(reader.num_batches(), 2);
         assert_eq!(reader.schema(), schema);

+        let batches = reader.collect_vec();
+        assert!(batches.len() == 2);
+
         Ok(())
     }

@@ -249,11 +296,13 @@ mod tests {
         )?;

         let file = BufReader::new(File::open(spill_file.path())?);
-        let reader = FileReader::try_new(file, None)?;
+        let reader = StreamReader::try_new(file, None)?;

-        assert_eq!(reader.num_batches(), 4);
         assert_eq!(reader.schema(), schema);

+        let batches = reader.collect_vec();
+        assert!(batches.len() == 4);
+
         Ok(())
     }

diff --git a/datafusion/sql/src/parser.rs b/datafusion/sql/src/parser.rs
index 9725166b8ae0..c167e211f0d7 100644
--- a/datafusion/sql/src/parser.rs
+++ b/datafusion/sql/src/parser.rs
@@ -257,6 +257,9 @@ fn ensure_not_set<T>(field: &Option<T>, name: &str) -> Result<(), ParserError>
     Ok(())
 }

+/// Same as `sqlparser`
+const DEFAULT_RECURSION_LIMIT: usize = 50;
+
 /// DataFusion SQL Parser based on [`sqlparser`]
 ///
 /// Parses DataFusion's SQL dialect, often delegating to [`sqlparser`]'s [`Parser`].
@@ -282,12 +285,23 @@ impl<'a> DFParser<'a> {
     pub fn new_with_dialect(
         sql: &str,
         dialect: &'a dyn Dialect,
+    ) -> Result<Self, ParserError> {
+        DFParser::new_with_dialect_limit(sql, dialect, DEFAULT_RECURSION_LIMIT)
+    }
+    /// Create a new parser for the specified tokens with the
+    /// specified dialect and recursion limit
+    pub fn new_with_dialect_limit(
+        sql: &str,
+        dialect: &'a dyn Dialect,
+        recursion_limit: usize,
     ) -> Result<Self, ParserError> {
         let mut tokenizer = Tokenizer::new(dialect, sql);
         let tokens = tokenizer.tokenize_with_location()?;

         Ok(DFParser {
-            parser: Parser::new(dialect).with_tokens_with_locations(tokens),
+            parser: Parser::new(dialect)
+                .with_tokens_with_locations(tokens)
+                .with_recursion_limit(recursion_limit),
         })
     }

@@ -295,7 +309,7 @@ impl<'a> DFParser<'a> {
     /// [`GenericDialect`].
     pub fn parse_sql(sql: &str) -> Result<VecDeque<Statement>, ParserError> {
         let dialect = &GenericDialect {};
-        DFParser::parse_sql_with_dialect(sql, dialect)
+        DFParser::parse_sql_with_dialect_limit(sql, dialect, DEFAULT_RECURSION_LIMIT)
     }

     /// Parse a SQL string and produce one or more [`Statement`]s with
@@ -304,7 +318,17 @@ impl<'a> DFParser<'a> {
         sql: &str,
         dialect: &dyn Dialect,
     ) -> Result<VecDeque<Statement>, ParserError> {
-        let mut parser = DFParser::new_with_dialect(sql, dialect)?;
+        DFParser::parse_sql_with_dialect_limit(sql, dialect, DEFAULT_RECURSION_LIMIT)
+    }
+
+    /// Parse a SQL string and produce one or more [`Statement`]s with
+    /// with the specified dialect and recursion limit
+    pub fn parse_sql_with_dialect_limit(
+        sql: &str,
+        dialect: &dyn Dialect,
+        recursion_limit: usize,
+    ) -> Result<VecDeque<Statement>, ParserError> {
+        let mut parser = DFParser::new_with_dialect_limit(sql, dialect, recursion_limit)?;
         let mut stmts = VecDeque::new();
         let mut expecting_statement_delimiter = false;
         loop {
@@ -331,7 +355,19 @@ impl<'a> DFParser<'a> {
         sql: &str,
         dialect: &dyn Dialect,
     ) -> Result<ExprWithAlias, ParserError> {
-        let mut parser = DFParser::new_with_dialect(sql, dialect)?;
+        DFParser::parse_sql_into_expr_with_dialect_limit(
+            sql,
+            dialect,
+            DEFAULT_RECURSION_LIMIT,
+        )
+    }
+
+    pub fn parse_sql_into_expr_with_dialect_limit(
+        sql: &str,
+        dialect: &dyn Dialect,
+        recursion_limit: usize,
+    ) -> Result<ExprWithAlias, ParserError> {
+        let mut parser = DFParser::new_with_dialect_limit(sql, dialect, recursion_limit)?;
         parser.parse_expr()
     }

diff --git a/datafusion/sql/src/unparser/expr.rs b/datafusion/sql/src/unparser/expr.rs
index 7c56969d47cd..87f78d661f51 100644
--- a/datafusion/sql/src/unparser/expr.rs
+++ b/datafusion/sql/src/unparser/expr.rs
@@ -1624,9 +1624,7 @@ impl Unparser<'_> {
             DataType::Union(_, _) => {
                 not_impl_err!("Unsupported DataType: conversion: {data_type:?}")
             }
-            DataType::Dictionary(_, _) => {
-                not_impl_err!("Unsupported DataType: conversion: {data_type:?}")
-            }
+            DataType::Dictionary(_, val) => self.arrow_dtype_to_ast_dtype(val),
             DataType::Decimal128(precision, scale)
             | DataType::Decimal256(precision, scale) => {
                 let mut new_precision = *precision as u64;
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt
index 5a1caad46732..f1f2785b370e 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -262,6 +262,7 @@ datafusion.sql_parser.dialect generic
 datafusion.sql_parser.enable_ident_normalization true
 datafusion.sql_parser.enable_options_value_normalization false
 datafusion.sql_parser.parse_float_as_decimal false
+datafusion.sql_parser.recursion_limit 50
 datafusion.sql_parser.support_varchar_with_length true

 # show all variables with verbose
@@ -357,6 +358,7 @@ datafusion.sql_parser.dialect generic Configure the SQL dialect used by DataFusi
 datafusion.sql_parser.enable_ident_normalization true When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted)
 datafusion.sql_parser.enable_options_value_normalization false When set to true, SQL parser will normalize options value (convert value to lowercase). Note that this option is ignored and will be removed in the future. All case-insensitive values are normalized automatically.
 datafusion.sql_parser.parse_float_as_decimal false When set to true, SQL parser will parse float as decimal type
+datafusion.sql_parser.recursion_limit 50 Specifies the recursion depth limit when parsing complex SQL Queries
 datafusion.sql_parser.support_varchar_with_length true If true, permit lengths for `VARCHAR` such as `VARCHAR(20)`, but ignore the length. If false, error if a `VARCHAR` with a length is specified. The Arrow type system does not have a notion of maximum string length and thus DataFusion can not enforce such limits.

 # show_variable_in_config_options
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 999735f4c059..0ee136387d38 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -127,3 +127,4 @@ Environment variables are read during `SessionConfig` initialisation so they mus
 | datafusion.sql_parser.dialect | generic | Configure the SQL dialect used by DataFusion's parser; supported values include: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, and Ansi. |
 | datafusion.sql_parser.support_varchar_with_length | true | If true, permit lengths for `VARCHAR` such as `VARCHAR(20)`, but ignore the length. If false, error if a `VARCHAR` with a length is specified. The Arrow type system does not have a notion of maximum string length and thus DataFusion can not enforce such limits. |
 | datafusion.sql_parser.collect_spans | false | When set to true, the source locations relative to the original SQL query (i.e. [`Span`](sqlparser::tokenizer::Span)) will be collected and recorded in the logical plan nodes. |
+| datafusion.sql_parser.recursion_limit | 50 | Specifies the recursion depth limit when parsing complex SQL Queries |
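
As a quick illustration of the new parser entry point, the sketch below parses a nested expression with an explicit recursion limit. The query shape, the limit values, and the `main` wrapper are illustrative assumptions only; at the session level the same knob is the `datafusion.sql_parser.recursion_limit` option shown in the config changes (e.g. via a statement like `SET datafusion.sql_parser.recursion_limit = 100;`).

```rust
use datafusion_sql::parser::DFParser;
use sqlparser::dialect::GenericDialect;

fn main() {
    let dialect = GenericDialect {};

    // A query whose expression nests 20 levels deep; each level of
    // parentheses consumes roughly one unit of parser recursion.
    let sql = format!("SELECT {}1{}", "(".repeat(20), ")".repeat(20));

    // With the default limit of 50 this is expected to parse successfully.
    let parsed = DFParser::parse_sql_with_dialect_limit(&sql, &dialect, 50);
    assert!(parsed.is_ok());

    // A deliberately tiny limit should make the parser bail out with a
    // recursion-limit error instead of recursing without bound.
    let too_deep = DFParser::parse_sql_with_dialect_limit(&sql, &dialect, 5);
    assert!(too_deep.is_err());
}
```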