From bd289a5b63f887b232bb11dc249817810f850272 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Tue, 19 Nov 2024 11:08:56 +0100 Subject: [PATCH 1/4] Encapsulate create table/view construction --- .../core/src/catalog_common/listing_schema.rs | 25 +- .../src/datasource/listing_table_factory.rs | 52 +- datafusion/core/src/execution/context/mod.rs | 14 +- datafusion/expr/src/logical_plan/ddl.rs | 489 +++++++++++++++++- datafusion/expr/src/logical_plan/mod.rs | 8 +- datafusion/proto/src/logical_plan/mod.rs | 97 ++-- datafusion/sql/src/query.rs | 15 +- datafusion/sql/src/statement.rs | 82 +-- 8 files changed, 624 insertions(+), 158 deletions(-) diff --git a/datafusion/core/src/catalog_common/listing_schema.rs b/datafusion/core/src/catalog_common/listing_schema.rs index dc55a07ef82d..6e74baf64f42 100644 --- a/datafusion/core/src/catalog_common/listing_schema.rs +++ b/datafusion/core/src/catalog_common/listing_schema.rs @@ -25,9 +25,7 @@ use std::sync::{Arc, Mutex}; use crate::catalog::{SchemaProvider, TableProvider, TableProviderFactory}; use crate::execution::context::SessionState; -use datafusion_common::{ - Constraints, DFSchema, DataFusionError, HashMap, TableReference, -}; +use datafusion_common::{DFSchema, DataFusionError, HashMap, TableReference}; use datafusion_expr::CreateExternalTable; use async_trait::async_trait; @@ -131,21 +129,12 @@ impl ListingSchemaProvider { .factory .create( state, - &CreateExternalTable { - schema: Arc::new(DFSchema::empty()), - name, - location: table_url, - file_type: self.format.clone(), - table_partition_cols: vec![], - if_not_exists: false, - temporary: false, - definition: None, - order_exprs: vec![], - unbounded: false, - options: Default::default(), - constraints: Constraints::empty(), - column_defaults: Default::default(), - }, + &CreateExternalTable::builder() + .schema(Arc::new(DFSchema::empty())) + .name(name) + .location(table_url) + .file_type(self.format.clone()) + .build()?, ) .await?; let _ = diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 636d1623c5e9..b05a3caafa7d 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -176,10 +176,10 @@ mod tests { datasource::file_format::csv::CsvFormat, execution::context::SessionContext, }; - use datafusion_common::{Constraints, DFSchema, TableReference}; + use datafusion_common::{DFSchema, TableReference}; #[tokio::test] - async fn test_create_using_non_std_file_ext() { + async fn test_create_using_non_std_file_ext() -> Result<()> { let csv_file = tempfile::Builder::new() .prefix("foo") .suffix(".tbl") @@ -190,21 +190,13 @@ mod tests { let context = SessionContext::new(); let state = context.state(); let name = TableReference::bare("foo"); - let cmd = CreateExternalTable { - name, - location: csv_file.path().to_str().unwrap().to_string(), - file_type: "csv".to_string(), - schema: Arc::new(DFSchema::empty()), - table_partition_cols: vec![], - if_not_exists: false, - temporary: false, - definition: None, - order_exprs: vec![], - unbounded: false, - options: HashMap::from([("format.has_header".into(), "true".into())]), - constraints: Constraints::empty(), - column_defaults: HashMap::new(), - }; + let cmd = CreateExternalTable::builder() + .name(name) + .location(csv_file.path().to_str().unwrap().to_string()) + .file_type("csv".to_string()) + .schema(Arc::new(DFSchema::empty())) + .options(HashMap::from([("format.has_header".into(), 
"true".into())])) + .build()?; let table_provider = factory.create(&state, &cmd).await.unwrap(); let listing_table = table_provider .as_any() @@ -212,10 +204,11 @@ mod tests { .unwrap(); let listing_options = listing_table.options(); assert_eq!(".tbl", listing_options.file_extension); + Ok(()) } #[tokio::test] - async fn test_create_using_non_std_file_ext_csv_options() { + async fn test_create_using_non_std_file_ext_csv_options() -> Result<()> { let csv_file = tempfile::Builder::new() .prefix("foo") .suffix(".tbl") @@ -230,21 +223,13 @@ mod tests { let mut options = HashMap::new(); options.insert("format.schema_infer_max_rec".to_owned(), "1000".to_owned()); options.insert("format.has_header".into(), "true".into()); - let cmd = CreateExternalTable { - name, - location: csv_file.path().to_str().unwrap().to_string(), - file_type: "csv".to_string(), - schema: Arc::new(DFSchema::empty()), - table_partition_cols: vec![], - if_not_exists: false, - temporary: false, - definition: None, - order_exprs: vec![], - unbounded: false, - options, - constraints: Constraints::empty(), - column_defaults: HashMap::new(), - }; + let cmd = CreateExternalTable::builder() + .name(name) + .location(csv_file.path().to_str().unwrap().to_string()) + .file_type("csv".to_string()) + .schema(Arc::new(DFSchema::empty())) + .options(options) + .build()?; let table_provider = factory.create(&state, &cmd).await.unwrap(); let listing_table = table_provider .as_any() @@ -257,5 +242,6 @@ mod tests { assert_eq!(csv_options.schema_infer_max_rec, Some(1000)); let listing_options = listing_table.options(); assert_eq!(".tbl", listing_options.file_extension); + Ok(()) } } diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 5f01d41c31e7..bd50d5872e1d 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -41,9 +41,9 @@ use crate::{ logical_expr::ScalarUDF, logical_expr::{ CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction, - CreateMemoryTable, CreateView, DropCatalogSchema, DropFunction, DropTable, - DropView, Execute, LogicalPlan, LogicalPlanBuilder, Prepare, SetVariable, - TableType, UNNAMED_TABLE, + CreateMemoryTable, CreateMemoryTableFields, CreateView, CreateViewFields, + DropCatalogSchema, DropFunction, DropTable, DropView, Execute, LogicalPlan, + LogicalPlanBuilder, Prepare, SetVariable, TableType, UNNAMED_TABLE, }, physical_expr::PhysicalExpr, physical_plan::ExecutionPlan, @@ -792,7 +792,7 @@ impl SessionContext { } async fn create_memory_table(&self, cmd: CreateMemoryTable) -> Result { - let CreateMemoryTable { + let CreateMemoryTableFields { name, input, if_not_exists, @@ -800,7 +800,7 @@ impl SessionContext { constraints, column_defaults, temporary, - } = cmd; + } = cmd.into_fields(); let input = Arc::unwrap_or_clone(input); let input = self.state().optimize(&input)?; @@ -852,13 +852,13 @@ impl SessionContext { } async fn create_view(&self, cmd: CreateView) -> Result { - let CreateView { + let CreateViewFields { name, input, or_replace, definition, temporary, - } = cmd; + } = cmd.into_fields(); let view = self.table(name.clone()).await; diff --git a/datafusion/expr/src/logical_plan/ddl.rs b/datafusion/expr/src/logical_plan/ddl.rs index 8c64a017988e..54d3cdf0f5fb 100644 --- a/datafusion/expr/src/logical_plan/ddl.rs +++ b/datafusion/expr/src/logical_plan/ddl.rs @@ -192,11 +192,12 @@ impl DdlStatement { /// Creates an external table. 
#[derive(Debug, Clone, PartialEq, Eq)] +#[non_exhaustive] pub struct CreateExternalTable { - /// The table schema - pub schema: DFSchemaRef, /// The table name pub name: TableReference, + /// The table schema + pub schema: DFSchemaRef, /// The physical location pub location: String, /// The file type of physical file @@ -224,8 +225,8 @@ pub struct CreateExternalTable { // Hashing refers to a subset of fields considered in PartialEq. impl Hash for CreateExternalTable { fn hash(&self, state: &mut H) { - self.schema.hash(state); self.name.hash(state); + self.schema.hash(state); self.location.hash(state); self.file_type.hash(state); self.table_partition_cols.hash(state); @@ -288,8 +289,233 @@ impl PartialOrd for CreateExternalTable { } } +impl CreateExternalTable { + pub fn new(fields: CreateExternalTableFields) -> Result { + let CreateExternalTableFields { + name, + schema, + location, + file_type, + table_partition_cols, + if_not_exists, + temporary, + definition, + order_exprs, + unbounded, + options, + constraints, + column_defaults, + } = fields; + Ok(Self { + name, + schema, + location, + file_type, + table_partition_cols, + if_not_exists, + temporary, + definition, + order_exprs, + unbounded, + options, + constraints, + column_defaults, + }) + } + + pub fn into_fields(self) -> CreateExternalTableFields { + let Self { + name, + schema, + location, + file_type, + table_partition_cols, + if_not_exists, + temporary, + definition, + order_exprs, + unbounded, + options, + constraints, + column_defaults, + } = self; + CreateExternalTableFields { + name, + schema, + location, + file_type, + table_partition_cols, + if_not_exists, + temporary, + definition, + order_exprs, + unbounded, + options, + constraints, + column_defaults, + } + } + + pub fn builder() -> CreateExternalTableBuilder { + CreateExternalTableBuilder::new() + } +} + +/// A struct with same fields as [`CreateExternalTable`] struct so that the DDL can be conveniently +/// destructed with validation that each field is handled, while still requiring that all +/// construction goes through the [`CreateExternalTable::new`] constructor or the builder. +pub struct CreateExternalTableFields { + /// The table name + pub name: TableReference, + /// The table schema + pub schema: DFSchemaRef, + /// The physical location + pub location: String, + /// The file type of physical file + pub file_type: String, + /// Partition Columns + pub table_partition_cols: Vec, + /// Option to not error if table already exists + pub if_not_exists: bool, + /// Whether the table is a temporary table + pub temporary: bool, + /// SQL used to create the table, if available + pub definition: Option, + /// Order expressions supplied by user + pub order_exprs: Vec>, + /// Whether the table is an infinite streams + pub unbounded: bool, + /// Table(provider) specific options + pub options: HashMap, + /// The list of constraints in the schema, such as primary key, unique, etc. + pub constraints: Constraints, + /// Default values for columns + pub column_defaults: HashMap, +} + +/// A builder or [`CreateExternalTable`]. Use [`CreateExternalTable::builder`] to obtain a new builder instance. 
+pub struct CreateExternalTableBuilder { + name: Option, + schema: Option, + location: Option, + file_type: Option, + table_partition_cols: Vec, + if_not_exists: bool, + temporary: bool, + definition: Option, + order_exprs: Vec>, + unbounded: bool, + options: HashMap, + constraints: Constraints, + column_defaults: HashMap, +} + +impl CreateExternalTableBuilder { + fn new() -> Self { + Self { + name: None, + schema: None, + location: None, + file_type: None, + table_partition_cols: vec![], + if_not_exists: false, + temporary: false, + definition: None, + order_exprs: vec![], + unbounded: false, + options: HashMap::new(), + constraints: Constraints::empty(), + column_defaults: HashMap::new(), + } + } + + pub fn name(mut self, name: TableReference) -> Self { + self.name = Some(name); + self + } + + pub fn schema(mut self, schema: DFSchemaRef) -> Self { + self.schema = Some(schema); + self + } + + pub fn location(mut self, location: String) -> Self { + self.location = Some(location); + self + } + + pub fn file_type(mut self, file_type: String) -> Self { + self.file_type = Some(file_type); + self + } + + pub fn table_partition_cols(mut self, table_partition_cols: Vec) -> Self { + self.table_partition_cols = table_partition_cols; + self + } + + pub fn if_not_exists(mut self, if_not_exists: bool) -> Self { + self.if_not_exists = if_not_exists; + self + } + + pub fn temporary(mut self, temporary: bool) -> Self { + self.temporary = temporary; + self + } + + pub fn definition(mut self, definition: Option) -> Self { + self.definition = definition; + self + } + + pub fn order_exprs(mut self, order_exprs: Vec>) -> Self { + self.order_exprs = order_exprs; + self + } + + pub fn unbounded(mut self, unbounded: bool) -> Self { + self.unbounded = unbounded; + self + } + + pub fn options(mut self, options: HashMap) -> Self { + self.options = options; + self + } + + pub fn constraints(mut self, constraints: Constraints) -> Self { + self.constraints = constraints; + self + } + + pub fn column_defaults(mut self, column_defaults: HashMap) -> Self { + self.column_defaults = column_defaults; + self + } + + pub fn build(self) -> Result { + CreateExternalTable::new(CreateExternalTableFields { + name: self.name.expect("name is required"), + schema: self.schema.expect("schema is required"), + location: self.location.expect("location is required"), + file_type: self.file_type.expect("file_type is required"), + table_partition_cols: self.table_partition_cols, + if_not_exists: self.if_not_exists, + temporary: self.temporary, + definition: self.definition, + order_exprs: self.order_exprs, + unbounded: self.unbounded, + options: self.options, + constraints: self.constraints, + column_defaults: self.column_defaults, + }) + } +} + /// Creates an in memory table. 
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)] +#[non_exhaustive] pub struct CreateMemoryTable { /// The table name pub name: TableReference, @@ -303,12 +529,153 @@ pub struct CreateMemoryTable { pub or_replace: bool, /// Default values for columns pub column_defaults: Vec<(String, Expr)>, - /// Wheter the table is `TableType::Temporary` + /// Whether the table is `TableType::Temporary` + pub temporary: bool, +} + +impl CreateMemoryTable { + pub fn new(fields: CreateMemoryTableFields) -> Result { + let CreateMemoryTableFields { + name, + constraints, + input, + if_not_exists, + or_replace, + column_defaults, + temporary, + } = fields; + Ok(Self { + name, + constraints, + input, + if_not_exists, + or_replace, + column_defaults, + temporary, + }) + } + + pub fn into_fields(self) -> CreateMemoryTableFields { + let Self { + name, + constraints, + input, + if_not_exists, + or_replace, + column_defaults, + temporary, + } = self; + CreateMemoryTableFields { + name, + constraints, + input, + if_not_exists, + or_replace, + column_defaults, + temporary, + } + } + + pub fn builder() -> CreateMemoryTableBuilder { + CreateMemoryTableBuilder::new() + } +} + +/// A struct with same fields as [`CreateMemoryTable`] struct so that the DDL can be conveniently +/// destructed with validation that each field is handled, while still requiring that all +/// construction goes through the [`CreateMemoryTable::new`] constructor or the builder. +pub struct CreateMemoryTableFields { + /// The table name + pub name: TableReference, + /// The list of constraints in the schema, such as primary key, unique, etc. + pub constraints: Constraints, + /// The logical plan + pub input: Arc, + /// Option to not error if table already exists + pub if_not_exists: bool, + /// Option to replace table content if table already exists + pub or_replace: bool, + /// Default values for columns + pub column_defaults: Vec<(String, Expr)>, + /// Whether the table is `TableType::Temporary` pub temporary: bool, } +/// A builder or [`CreateMemoryTable`]. Use [`CreateMemoryTable::builder`] to obtain a new builder instance. 
+pub struct CreateMemoryTableBuilder { + name: Option, + constraints: Constraints, + input: Option>, + if_not_exists: bool, + or_replace: bool, + column_defaults: Vec<(String, Expr)>, + temporary: bool, +} + +impl CreateMemoryTableBuilder { + fn new() -> Self { + Self { + name: None, + constraints: Constraints::empty(), + input: None, + if_not_exists: false, + or_replace: false, + column_defaults: vec![], + temporary: false, + } + } + + pub fn name(mut self, name: TableReference) -> Self { + self.name = Some(name); + self + } + + pub fn constraints(mut self, constraints: Constraints) -> Self { + self.constraints = constraints; + self + } + + pub fn input(mut self, input: Arc) -> Self { + self.input = Some(input); + self + } + + pub fn if_not_exists(mut self, if_not_exists: bool) -> Self { + self.if_not_exists = if_not_exists; + self + } + + pub fn or_replace(mut self, or_replace: bool) -> Self { + self.or_replace = or_replace; + self + } + + pub fn column_defaults(mut self, column_defaults: Vec<(String, Expr)>) -> Self { + self.column_defaults = column_defaults; + self + } + + pub fn temporary(mut self, temporary: bool) -> Self { + self.temporary = temporary; + self + } + + pub fn build(self) -> Result { + CreateMemoryTable::new(CreateMemoryTableFields { + name: self.name.expect("name is required"), + constraints: self.constraints, + input: self.input.expect("input is required"), + if_not_exists: self.if_not_exists, + or_replace: self.or_replace, + column_defaults: self.column_defaults, + temporary: self.temporary, + }) + } +} + /// Creates a view. #[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Hash)] +#[non_exhaustive] pub struct CreateView { /// The table name pub name: TableReference, @@ -318,10 +685,122 @@ pub struct CreateView { pub or_replace: bool, /// SQL used to create the view, if available pub definition: Option, - /// Wheter the view is ephemeral + /// Whether the view is ephemeral pub temporary: bool, } +impl CreateView { + pub fn new(fields: CreateViewFields) -> Result { + let CreateViewFields { + name, + input, + or_replace, + definition, + temporary, + } = fields; + Ok(Self { + name, + input, + or_replace, + definition, + temporary, + }) + } + + pub fn into_fields(self) -> CreateViewFields { + let Self { + name, + input, + or_replace, + definition, + temporary, + } = self; + CreateViewFields { + name, + input, + or_replace, + definition, + temporary, + } + } + + pub fn builder() -> CreateViewBuilder { + CreateViewBuilder::new() + } +} + +/// A struct with same fields as [`CreateView`] struct so that the DDL can be conveniently +/// destructed with validation that each field is handled, while still requiring that all +/// construction goes through the [`CreateView::new`] constructor or the builder. +pub struct CreateViewFields { + /// The table name + pub name: TableReference, + /// The logical plan + pub input: Arc, + /// Option to not error if table already exists + pub or_replace: bool, + /// SQL used to create the view, if available + pub definition: Option, + /// Whether the view is ephemeral + pub temporary: bool, +} + +/// A builder or [`CreateView`]. Use [`CreateView::builder`] to obtain a new builder instance. 
+pub struct CreateViewBuilder { + name: Option, + input: Option>, + or_replace: bool, + definition: Option, + temporary: bool, +} + +impl CreateViewBuilder { + fn new() -> Self { + Self { + name: None, + input: None, + or_replace: false, + definition: None, + temporary: false, + } + } + + pub fn name(mut self, name: TableReference) -> Self { + self.name = Some(name); + self + } + + pub fn input(mut self, input: Arc) -> Self { + self.input = Some(input); + self + } + + pub fn or_replace(mut self, or_replace: bool) -> Self { + self.or_replace = or_replace; + self + } + + pub fn definition(mut self, definition: Option) -> Self { + self.definition = definition; + self + } + + pub fn temporary(mut self, temporary: bool) -> Self { + self.temporary = temporary; + self + } + + pub fn build(self) -> Result { + CreateView::new(CreateViewFields { + name: self.name.expect("name is required"), + input: self.input.expect("input is required"), + or_replace: self.or_replace, + definition: self.definition, + temporary: self.temporary, + }) + } +} + /// Creates a catalog (aka "Database"). #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct CreateCatalog { diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs index 5d613d4e80db..6be7fc2e25c1 100644 --- a/datafusion/expr/src/logical_plan/mod.rs +++ b/datafusion/expr/src/logical_plan/mod.rs @@ -29,9 +29,11 @@ pub use builder::{ LogicalPlanBuilder, LogicalTableSource, UNNAMED_TABLE, }; pub use ddl::{ - CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction, - CreateFunctionBody, CreateIndex, CreateMemoryTable, CreateView, DdlStatement, - DropCatalogSchema, DropFunction, DropTable, DropView, OperateFunctionArg, + CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateExternalTableBuilder, + CreateExternalTableFields, CreateFunction, CreateFunctionBody, CreateIndex, + CreateMemoryTable, CreateMemoryTableBuilder, CreateMemoryTableFields, CreateView, + CreateViewBuilder, CreateViewFields, DdlStatement, DropCatalogSchema, DropFunction, + DropTable, DropView, OperateFunctionArg, }; pub use dml::{DmlStatement, WriteOp}; pub use plan::{ diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 50636048ebc9..49e20540fb3d 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -62,9 +62,9 @@ use datafusion_expr::{ dml, logical_plan::{ builder::project, Aggregate, CreateCatalog, CreateCatalogSchema, - CreateExternalTable, CreateView, DdlStatement, Distinct, EmptyRelation, - Extension, Join, JoinConstraint, Prepare, Projection, Repartition, Sort, - SubqueryAlias, TableScan, Values, Window, + CreateExternalTable, CreateExternalTableFields, CreateView, CreateViewFields, + DdlStatement, Distinct, EmptyRelation, Extension, Join, JoinConstraint, Prepare, + Projection, Repartition, Sort, SubqueryAlias, TableScan, Values, Window, }, DistinctOn, DropView, Expr, LogicalPlan, LogicalPlanBuilder, ScalarUDF, SortExpr, Statement, WindowUDF, @@ -568,7 +568,7 @@ impl AsLogicalPlan for LogicalPlanNode { } Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable( - CreateExternalTable { + CreateExternalTable::new(CreateExternalTableFields { schema: pb_schema.try_into()?, name: from_table_reference( create_extern_table.name.as_ref(), @@ -587,7 +587,7 @@ impl AsLogicalPlan for LogicalPlanNode { options: create_extern_table.options.clone(), constraints: constraints.into(), column_defaults, - }, + })?, ))) } 
LogicalPlanType::CreateView(create_view) => { @@ -602,13 +602,18 @@ impl AsLogicalPlan for LogicalPlanNode { None }; - Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView { - name: from_table_reference(create_view.name.as_ref(), "CreateView")?, - temporary: create_view.temporary, - input: Arc::new(plan), - or_replace: create_view.or_replace, - definition, - }))) + Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView::new( + CreateViewFields { + name: from_table_reference( + create_view.name.as_ref(), + "CreateView", + )?, + temporary: create_view.temporary, + input: Arc::new(plan), + or_replace: create_view.or_replace, + definition, + }, + )?))) } LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => { let pb_schema = (create_catalog_schema.schema.clone()).ok_or_else(|| { @@ -1398,7 +1403,9 @@ impl AsLogicalPlan for LogicalPlanNode { )), }), LogicalPlan::Ddl(DdlStatement::CreateExternalTable( - CreateExternalTable { + create_external_table, + )) => { + let CreateExternalTableFields { name, location, file_type, @@ -1412,12 +1419,11 @@ impl AsLogicalPlan for LogicalPlanNode { constraints, column_defaults, temporary, - }, - )) => { + } = create_external_table.clone().into_fields(); let mut converted_order_exprs: Vec = vec![]; for order in order_exprs { let temp = SortExprNodeCollection { - sort_expr_nodes: serialize_sorts(order, extension_codec)?, + sort_expr_nodes: serialize_sorts(&order, extension_codec)?, }; converted_order_exprs.push(temp); } @@ -1425,8 +1431,10 @@ impl AsLogicalPlan for LogicalPlanNode { let mut converted_column_defaults = HashMap::with_capacity(column_defaults.len()); for (col_name, expr) in column_defaults { - converted_column_defaults - .insert(col_name.clone(), serialize_expr(expr, extension_codec)?); + converted_column_defaults.insert( + col_name.clone(), + serialize_expr(&expr, extension_codec)?, + ); } Ok(LogicalPlanNode { @@ -1435,13 +1443,13 @@ impl AsLogicalPlan for LogicalPlanNode { name: Some(name.clone().into()), location: location.clone(), file_type: file_type.clone(), - schema: Some(df_schema.try_into()?), + schema: Some(df_schema.as_ref().try_into()?), table_partition_cols: table_partition_cols.clone(), - if_not_exists: *if_not_exists, - temporary: *temporary, + if_not_exists, + temporary, order_exprs: converted_order_exprs, definition: definition.clone().unwrap_or_default(), - unbounded: *unbounded, + unbounded, options: options.clone(), constraints: Some(constraints.clone().into()), column_defaults: converted_column_defaults, @@ -1449,26 +1457,31 @@ impl AsLogicalPlan for LogicalPlanNode { )), }) } - LogicalPlan::Ddl(DdlStatement::CreateView(CreateView { - name, - input, - or_replace, - definition, - temporary, - })) => Ok(LogicalPlanNode { - logical_plan_type: Some(LogicalPlanType::CreateView(Box::new( - protobuf::CreateViewNode { - name: Some(name.clone().into()), - input: Some(Box::new(LogicalPlanNode::try_from_logical_plan( - input, - extension_codec, - )?)), - or_replace: *or_replace, - temporary: *temporary, - definition: definition.clone().unwrap_or_default(), - }, - ))), - }), + LogicalPlan::Ddl(DdlStatement::CreateView(create_view)) => { + let CreateViewFields { + name, + input, + or_replace, + definition, + temporary, + } = create_view.clone().into_fields(); + Ok(LogicalPlanNode { + logical_plan_type: Some(LogicalPlanType::CreateView(Box::new( + protobuf::CreateViewNode { + name: Some(name.clone().into()), + input: Some(Box::new( + LogicalPlanNode::try_from_logical_plan( + &input, + extension_codec, + )?, + )), + or_replace, + 
temporary, + definition: definition.clone().unwrap_or_default(), + }, + ))), + }) + } LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema( CreateCatalogSchema { schema_name, diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index 740f9ad3b42c..851d583afec1 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; -use datafusion_common::{not_impl_err, Constraints, DFSchema, Result}; +use datafusion_common::{not_impl_err, DFSchema, Result}; use datafusion_expr::expr::Sort; use datafusion_expr::{ CreateMemoryTable, DdlStatement, Distinct, LogicalPlan, LogicalPlanBuilder, @@ -134,15 +134,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { ) -> Result { match select_into { Some(into) => Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable( - CreateMemoryTable { - name: self.object_name_to_table_reference(into.name)?, - constraints: Constraints::empty(), - input: Arc::new(plan), - if_not_exists: false, - or_replace: false, - temporary: false, - column_defaults: vec![], - }, + CreateMemoryTable::builder() + .name(self.object_name_to_table_reference(into.name)?) + .input(Arc::new(plan)) + .build()?, ))), _ => Ok(plan), } diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 31b836f32b24..be26a832be17 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -444,15 +444,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { )?; Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable( - CreateMemoryTable { - name: self.object_name_to_table_reference(name)?, - constraints, - input: Arc::new(plan), - if_not_exists, - or_replace, - column_defaults, - temporary, - }, + CreateMemoryTable::builder() + .name(self.object_name_to_table_reference(name)?) + .constraints(constraints) + .input(Arc::new(plan)) + .if_not_exists(if_not_exists) + .or_replace(or_replace) + .column_defaults(column_defaults) + .temporary(temporary) + .build()?, ))) } @@ -467,15 +467,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { plan.schema(), )?; Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable( - CreateMemoryTable { - name: self.object_name_to_table_reference(name)?, - constraints, - input: Arc::new(plan), - if_not_exists, - or_replace, - column_defaults, - temporary, - }, + CreateMemoryTable::builder() + .name(self.object_name_to_table_reference(name)?) + .constraints(constraints) + .input(Arc::new(plan)) + .if_not_exists(if_not_exists) + .or_replace(or_replace) + .column_defaults(column_defaults) + .temporary(temporary) + .build()?, ))) } } @@ -530,13 +530,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let mut plan = self.query_to_plan(*query, &mut PlannerContext::new())?; plan = self.apply_expr_alias(plan, columns)?; - Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView { - name: self.object_name_to_table_reference(name)?, - input: Arc::new(plan), - or_replace, - definition: sql, - temporary, - }))) + Ok(LogicalPlan::Ddl(DdlStatement::CreateView( + CreateView::builder() + .name(self.object_name_to_table_reference(name)?) 
+ .input(Arc::new(plan)) + .or_replace(or_replace) + .definition(sql) + .temporary(temporary) + .build()?, + ))) } Statement::ShowCreate { obj_type, obj_name } => match obj_type { ShowCreateObject::Table => self.show_create_table_to_plan(obj_name), @@ -1289,21 +1291,21 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let constraints = Self::new_constraint_from_table_constraints(&all_constraints, &df_schema)?; Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable( - PlanCreateExternalTable { - schema: df_schema, - name, - location, - file_type, - table_partition_cols, - if_not_exists, - temporary, - definition, - order_exprs: ordered_exprs, - unbounded, - options: options_map, - constraints, - column_defaults, - }, + PlanCreateExternalTable::builder() + .schema(df_schema) + .name(name) + .location(location) + .file_type(file_type) + .table_partition_cols(table_partition_cols) + .if_not_exists(if_not_exists) + .temporary(temporary) + .definition(definition) + .order_exprs(ordered_exprs) + .unbounded(unbounded) + .options(options_map) + .constraints(constraints) + .column_defaults(column_defaults) + .build()?, ))) } From 7dafc6f3702a938e9a33a80f7889c443194adbf3 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Wed, 20 Nov 2024 15:28:27 +0100 Subject: [PATCH 2/4] Reject CREATE TABLE/VIEW with duplicate column names DFSchema checks column for uniqueness, but allows duplicate column names when they are qualified differently. This is because DFSchema plays central role during query planning as a identifier resolution scope. Those checks in their current form should not be there, since they prevent execution of queries with duplicate column aliases, which is legal in SQL. But even with these checks present, they are not sufficient to ensure CREATE TABLE/VIEW is well structured. Table or view columns need to have unique names and there is no qualification involved. This commit adds necessary checks in CREATE TABLE/VIEW DDL structs, ensuring that CREATE TABLE/VIEW logical plans are valid in that regard. --- datafusion/common/src/error.rs | 15 ++ datafusion/expr/src/logical_plan/ddl.rs | 135 +++++++++++++++++- .../sqllogictest/test_files/create_table.slt | 20 +++ 3 files changed, 168 insertions(+), 2 deletions(-) create mode 100644 datafusion/sqllogictest/test_files/create_table.slt diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs index 4fac7298c455..2043fbf5db7a 100644 --- a/datafusion/common/src/error.rs +++ b/datafusion/common/src/error.rs @@ -150,6 +150,13 @@ pub enum SchemaError { qualifier: Box, name: String, }, + /// Schema duplicate qualified fields with duplicate unqualified names. This is an error + /// for schema in CREATE TABLE/VIEW statements, because the final object is going to retain + /// and return the unqualified names only. 
+ QualifiedFieldWithDuplicateName { + qualifier: Box, + name: String, + }, /// Schema contains duplicate unqualified field name DuplicateUnqualifiedField { name: String }, /// No field with this name @@ -188,6 +195,14 @@ impl Display for SchemaError { quote_identifier(name) ) } + Self::QualifiedFieldWithDuplicateName { qualifier, name } => { + write!( + f, + "Schema contains qualified fields with duplicate unqualified names {}.{}", + qualifier.to_quoted_string(), + quote_identifier(name) + ) + } Self::DuplicateUnqualifiedField { name } => { write!( f, diff --git a/datafusion/expr/src/logical_plan/ddl.rs b/datafusion/expr/src/logical_plan/ddl.rs index 54d3cdf0f5fb..fc787c81597c 100644 --- a/datafusion/expr/src/logical_plan/ddl.rs +++ b/datafusion/expr/src/logical_plan/ddl.rs @@ -17,7 +17,7 @@ use crate::{Expr, LogicalPlan, SortExpr, Volatility}; use std::cmp::Ordering; -use std::collections::HashMap; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::sync::Arc; use std::{ fmt::{self, Display}, @@ -28,7 +28,8 @@ use crate::expr::Sort; use arrow::datatypes::DataType; use datafusion_common::tree_node::{Transformed, TreeNodeContainer, TreeNodeRecursion}; use datafusion_common::{ - Constraints, DFSchemaRef, Result, SchemaReference, TableReference, + schema_err, Column, Constraints, DFSchema, DFSchemaRef, Result, + SchemaError, SchemaReference, TableReference, }; use sqlparser::ast::Ident; @@ -306,6 +307,7 @@ impl CreateExternalTable { constraints, column_defaults, } = fields; + check_fields_unique(&schema)?; Ok(Self { name, schema, @@ -544,6 +546,7 @@ impl CreateMemoryTable { column_defaults, temporary, } = fields; + check_fields_unique(input.schema())?; Ok(Self { name, constraints, @@ -698,6 +701,7 @@ impl CreateView { definition, temporary, } = fields; + check_fields_unique(input.schema())?; Ok(Self { name, input, @@ -800,6 +804,48 @@ impl CreateViewBuilder { }) } } +fn check_fields_unique(schema: &DFSchema) -> Result<()> { + // Use tree set for deterministic error messages + let mut qualified_names = BTreeSet::new(); + let mut unqualified_names = HashSet::new(); + let mut name_occurrences: HashMap<&String, usize> = HashMap::new(); + + for (qualifier, field) in schema.iter() { + if let Some(qualifier) = qualifier { + // Check for duplicate qualified field names + if !qualified_names.insert((qualifier, field.name())) { + return schema_err!(SchemaError::DuplicateQualifiedField { + qualifier: Box::new(qualifier.clone()), + name: field.name().to_string(), + }); + } + // Check for duplicate unqualified field names + } else if !unqualified_names.insert(field.name()) { + return schema_err!(SchemaError::DuplicateUnqualifiedField { + name: field.name().to_string() + }); + } + *name_occurrences.entry(field.name()).or_default() += 1; + } + + for (qualifier, name) in qualified_names { + // Check for duplicate between qualified and unqualified field names + if unqualified_names.contains(name) { + return schema_err!(SchemaError::AmbiguousReference { + field: Column::new(Some(qualifier.clone()), name) + }); + } + // Check for duplicates between qualified names as the qualification will be stripped off + if name_occurrences[name] > 1 { + return schema_err!(SchemaError::QualifiedFieldWithDuplicateName { + qualifier: Box::new(qualifier.clone()), + name: name.to_owned(), + }); + } + } + + Ok(()) +} /// Creates a catalog (aka "Database"). 
#[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -1085,7 +1131,9 @@ impl PartialOrd for CreateIndex { #[cfg(test)] mod test { + use super::*; use crate::{CreateCatalog, DdlStatement, DropView}; + use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::{DFSchema, DFSchemaRef, TableReference}; use std::cmp::Ordering; @@ -1112,4 +1160,87 @@ mod test { assert_eq!(drop_view.partial_cmp(&catalog), Some(Ordering::Greater)); } + + #[test] + fn test_check_fields_unique() -> Result<()> { + // no duplicate fields, unqualified schema + check_fields_unique(&DFSchema::try_from(Schema::new(vec![ + Field::new("c100", DataType::Boolean, true), + Field::new("c101", DataType::Boolean, true), + ]))?)?; + + // no duplicate fields, qualified schema + check_fields_unique(&DFSchema::try_from_qualified_schema( + "t1", + &Schema::new(vec![ + Field::new("c100", DataType::Boolean, true), + Field::new("c101", DataType::Boolean, true), + ]), + )?)?; + + // duplicate unqualified field name + assert_eq!( + check_fields_unique(&DFSchema::try_from(Schema::new(vec![ + Field::new("c0", DataType::Boolean, true), + Field::new("c1", DataType::Boolean, true), + Field::new("c1", DataType::Boolean, true), + Field::new("c2", DataType::Boolean, true), + ]))?) + .unwrap_err() + .strip_backtrace() + .to_string(), + "Schema error: Schema contains duplicate unqualified field name c1" + ); + + // duplicate qualified field with same qualifier + assert_eq!( + DFSchema::try_from_qualified_schema( + "t1", + &Schema::new(vec![ + Field::new("c1", DataType::Boolean, true), + Field::new("c1", DataType::Boolean, true), + ]) + ) + // if schema construction succeeds (due to future changes in DFSchema), call check_fields_unique on it + .unwrap_err() + .strip_backtrace() + .to_string(), + "Schema error: Schema contains duplicate qualified field name t1.c1" + ); + + // duplicate qualified and unqualified field + assert_eq!( + DFSchema::from_field_specific_qualified_schema( + vec![ + None, + Some(TableReference::from("t1")), + ], + &Arc::new(Schema::new(vec![ + Field::new("c1", DataType::Boolean, true), + Field::new("c1", DataType::Boolean, true), + ])) + ) + // if schema construction succeeds (due to future changes in DFSchema), call check_fields_unique on it + .unwrap_err().strip_backtrace().to_string(), + "Schema error: Schema contains qualified field name t1.c1 and unqualified field name c1 which would be ambiguous" + ); + + // qualified fields with duplicate unqualified names + assert_eq!( + check_fields_unique(&DFSchema::from_field_specific_qualified_schema( + vec![ + Some(TableReference::from("t1")), + Some(TableReference::from("t2")), + ], + &Arc::new(Schema::new(vec![ + Field::new("c1", DataType::Boolean, true), + Field::new("c1", DataType::Boolean, true), + ])) + )?) + .unwrap_err().strip_backtrace().to_string(), + "Schema error: Schema contains qualified fields with duplicate unqualified names t1.c1" + ); + + Ok(()) + } } diff --git a/datafusion/sqllogictest/test_files/create_table.slt b/datafusion/sqllogictest/test_files/create_table.slt new file mode 100644 index 000000000000..3e30d7486f29 --- /dev/null +++ b/datafusion/sqllogictest/test_files/create_table.slt @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Issue https://github.com/apache/datafusion/issues/13487 +statement error DataFusion error: Schema error: Schema contains qualified fields with duplicate unqualified names l\.id +CREATE TABLE t AS SELECT * FROM (SELECT 1 AS id, 'Foo' AS name) l JOIN (SELECT 1 AS id, 'Bar' as name) r ON l.id = r.id; From 7d4b3e74eb3e93e6d2726a26d6370066f5805a9a Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Sat, 23 Nov 2024 13:54:42 +0100 Subject: [PATCH 3/4] fmt --- datafusion/expr/src/logical_plan/ddl.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/expr/src/logical_plan/ddl.rs b/datafusion/expr/src/logical_plan/ddl.rs index fc787c81597c..a2fe7a2ef82f 100644 --- a/datafusion/expr/src/logical_plan/ddl.rs +++ b/datafusion/expr/src/logical_plan/ddl.rs @@ -28,8 +28,8 @@ use crate::expr::Sort; use arrow::datatypes::DataType; use datafusion_common::tree_node::{Transformed, TreeNodeContainer, TreeNodeRecursion}; use datafusion_common::{ - schema_err, Column, Constraints, DFSchema, DFSchemaRef, Result, - SchemaError, SchemaReference, TableReference, + schema_err, Column, Constraints, DFSchema, DFSchemaRef, Result, SchemaError, + SchemaReference, TableReference, }; use sqlparser::ast::Ident; From e0ecbb86d4d4c86e6f7cef5742dc09719df54c9c Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Thu, 28 Nov 2024 22:17:27 +0100 Subject: [PATCH 4/4] Rename builders' new() to try_new() --- datafusion/expr/src/logical_plan/ddl.rs | 18 +++++++++--------- datafusion/proto/src/logical_plan/mod.rs | 10 +++++----- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/datafusion/expr/src/logical_plan/ddl.rs b/datafusion/expr/src/logical_plan/ddl.rs index a2fe7a2ef82f..9bf67fd727f2 100644 --- a/datafusion/expr/src/logical_plan/ddl.rs +++ b/datafusion/expr/src/logical_plan/ddl.rs @@ -291,7 +291,7 @@ impl PartialOrd for CreateExternalTable { } impl CreateExternalTable { - pub fn new(fields: CreateExternalTableFields) -> Result { + pub fn try_new(fields: CreateExternalTableFields) -> Result { let CreateExternalTableFields { name, schema, @@ -365,7 +365,7 @@ impl CreateExternalTable { /// A struct with same fields as [`CreateExternalTable`] struct so that the DDL can be conveniently /// destructed with validation that each field is handled, while still requiring that all -/// construction goes through the [`CreateExternalTable::new`] constructor or the builder. +/// construction goes through the [`CreateExternalTable::try_new`] constructor or the builder. 
pub struct CreateExternalTableFields { /// The table name pub name: TableReference, @@ -497,7 +497,7 @@ impl CreateExternalTableBuilder { } pub fn build(self) -> Result { - CreateExternalTable::new(CreateExternalTableFields { + CreateExternalTable::try_new(CreateExternalTableFields { name: self.name.expect("name is required"), schema: self.schema.expect("schema is required"), location: self.location.expect("location is required"), @@ -536,7 +536,7 @@ pub struct CreateMemoryTable { } impl CreateMemoryTable { - pub fn new(fields: CreateMemoryTableFields) -> Result { + pub fn try_new(fields: CreateMemoryTableFields) -> Result { let CreateMemoryTableFields { name, constraints, @@ -586,7 +586,7 @@ impl CreateMemoryTable { /// A struct with same fields as [`CreateMemoryTable`] struct so that the DDL can be conveniently /// destructed with validation that each field is handled, while still requiring that all -/// construction goes through the [`CreateMemoryTable::new`] constructor or the builder. +/// construction goes through the [`CreateMemoryTable::try_new`] constructor or the builder. pub struct CreateMemoryTableFields { /// The table name pub name: TableReference, @@ -664,7 +664,7 @@ impl CreateMemoryTableBuilder { } pub fn build(self) -> Result { - CreateMemoryTable::new(CreateMemoryTableFields { + CreateMemoryTable::try_new(CreateMemoryTableFields { name: self.name.expect("name is required"), constraints: self.constraints, input: self.input.expect("input is required"), @@ -693,7 +693,7 @@ pub struct CreateView { } impl CreateView { - pub fn new(fields: CreateViewFields) -> Result { + pub fn try_new(fields: CreateViewFields) -> Result { let CreateViewFields { name, input, @@ -735,7 +735,7 @@ impl CreateView { /// A struct with same fields as [`CreateView`] struct so that the DDL can be conveniently /// destructed with validation that each field is handled, while still requiring that all -/// construction goes through the [`CreateView::new`] constructor or the builder. +/// construction goes through the [`CreateView::try_new`] constructor or the builder. 
pub struct CreateViewFields { /// The table name pub name: TableReference, @@ -795,7 +795,7 @@ impl CreateViewBuilder { } pub fn build(self) -> Result { - CreateView::new(CreateViewFields { + CreateView::try_new(CreateViewFields { name: self.name.expect("name is required"), input: self.input.expect("input is required"), or_replace: self.or_replace, diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 49e20540fb3d..3c295905b6cf 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -568,7 +568,7 @@ impl AsLogicalPlan for LogicalPlanNode { } Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable( - CreateExternalTable::new(CreateExternalTableFields { + CreateExternalTable::try_new(CreateExternalTableFields { schema: pb_schema.try_into()?, name: from_table_reference( create_extern_table.name.as_ref(), @@ -602,8 +602,8 @@ impl AsLogicalPlan for LogicalPlanNode { None }; - Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView::new( - CreateViewFields { + Ok(LogicalPlan::Ddl(DdlStatement::CreateView( + CreateView::try_new(CreateViewFields { name: from_table_reference( create_view.name.as_ref(), "CreateView", @@ -612,8 +612,8 @@ impl AsLogicalPlan for LogicalPlanNode { input: Arc::new(plan), or_replace: create_view.or_replace, definition, - }, - )?))) + })?, + ))) } LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => { let pb_schema = (create_catalog_schema.schema.clone()).ok_or_else(|| {
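Taken together, these patches funnel all construction of the CREATE TABLE/VIEW plan nodes through validating constructors. As a rough usage sketch (not part of the patch itself): code that previously filled in the struct literal field by field would now go through the builder, and the final build() call is where the duplicate-column check from the second patch runs. The function and table name below are illustrative placeholders; the builder methods and their defaults (Constraints::empty(), no column defaults, non-temporary) are the ones introduced above.

    use std::sync::Arc;
    use datafusion_common::{Result, TableReference};
    use datafusion_expr::{CreateMemoryTable, DdlStatement, LogicalPlan};

    // Hypothetical helper: wrap an existing plan as a CTAS-style plan node.
    fn create_table_plan(input: LogicalPlan) -> Result<LogicalPlan> {
        let create = CreateMemoryTable::builder()
            .name(TableReference::bare("my_table"))
            .input(Arc::new(input))
            .if_not_exists(true)
            // build() delegates to CreateMemoryTable::try_new, which calls
            // check_fields_unique on the input schema.
            .build()?;
        Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(create)))
    }

If the input schema would expose two columns with the same unqualified name (for example the join in the new create_table.slt test), build() returns the "qualified fields with duplicate unqualified names" schema error instead of producing an invalid plan.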