From 912a35a2b563df6985aa0c14a5e0ece5ce222e03 Mon Sep 17 00:00:00 2001
From: Chongchen Chen
Date: Thu, 30 Jan 2025 18:20:54 +0800
Subject: [PATCH] name conflict test & refine metadata index

---
 .../examples/metadata_columns.rs              |  62 +++-----
 datafusion/catalog/src/table.rs               |  14 +-
 datafusion/common/src/dfschema.rs             | 106 ++++++++-----
 datafusion/common/src/lib.rs                  |   4 +-
 .../core/src/execution/session_state.rs       |   6 +-
 datafusion/core/tests/sql/metadata_columns.rs | 144 +++++++++++++-----
 datafusion/expr/src/logical_plan/plan.rs      |  48 +++---
 7 files changed, 229 insertions(+), 155 deletions(-)

diff --git a/datafusion-examples/examples/metadata_columns.rs b/datafusion-examples/examples/metadata_columns.rs
index 503fb7c403ab..130835b67c8c 100644
--- a/datafusion-examples/examples/metadata_columns.rs
+++ b/datafusion-examples/examples/metadata_columns.rs
@@ -21,7 +21,6 @@ use std::sync::{Arc, Mutex};
 use std::time::Duration;
 
 use arrow::array::{ArrayRef, StringArray, UInt64Array};
-use arrow_schema::SchemaBuilder;
 use async_trait::async_trait;
 use datafusion::arrow::array::{UInt64Builder, UInt8Builder};
 use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
@@ -33,14 +32,14 @@ use datafusion::physical_expr::EquivalenceProperties;
 use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion::physical_plan::memory::MemoryStream;
 use datafusion::physical_plan::{
-    project_schema, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
-    PlanProperties, SendableRecordBatchStream,
+    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
+    SendableRecordBatchStream,
 };
 use datafusion::prelude::*;
 
 use datafusion::catalog::Session;
-use datafusion_common::METADATA_OFFSET;
+use datafusion_common::{extract_field_index, FieldIndex};
 use itertools::Itertools;
 use tokio::time::timeout;
 
@@ -120,9 +119,8 @@ impl CustomDataSource {
     pub(crate) async fn create_physical_plan(
         &self,
         projections: Option<&Vec<usize>>,
-        schema: SchemaRef,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        Ok(Arc::new(CustomExec::new(projections, schema, self.clone())))
+        Ok(Arc::new(CustomExec::new(projections, self.clone())))
     }
 
     pub(crate) fn populate_users(&self) {
@@ -189,33 +187,7 @@ impl TableProvider for CustomDataSource {
         _filters: &[Expr],
         _limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let mut schema = self.schema();
-        let size = schema.fields.len();
-        if let Some(metadata) = self.metadata_columns() {
-            let mut builder = SchemaBuilder::from(schema.as_ref());
-            for f in metadata.fields.iter() {
-                builder.try_merge(f)?;
-            }
-            schema = Arc::new(builder.finish());
-        }
-
-        let projection = match projection {
-            Some(projection) => {
-                let projection = projection
-                    .iter()
-                    .map(|idx| {
-                        if *idx >= METADATA_OFFSET {
-                            *idx - METADATA_OFFSET + size
-                        } else {
-                            *idx
-                        }
-                    })
-                    .collect_vec();
-                Some(projection)
-            }
-            None => None,
-        };
-        return self.create_physical_plan(projection.as_ref(), schema).await;
+        return self.create_physical_plan(projection).await;
     }
 }
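// Editorial sketch (not part of this patch): with the simplified `scan` above, the
// planner's projection is forwarded untouched, and any metadata-tagged indexes are
// decoded later in `CustomExec::new` (next hunk) via `extract_field_index`.
// End-to-end, the example provider is used roughly like this (names match the
// surrounding example; error handling elided, so this is illustrative only):
//
//     let ctx = SessionContext::new();
//     let db = CustomDataSource::default();
//     db.populate_users();
//     ctx.register_table("users", Arc::new(db))?;
//     // "_rowid" and "_file" come from metadata_columns(), not schema()
//     let df = ctx.sql("SELECT _rowid, id FROM users").await?;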
@@ -227,12 +199,24 @@ struct CustomExec {
 }
 
 impl CustomExec {
-    fn new(
-        projections: Option<&Vec<usize>>,
-        schema: SchemaRef,
-        db: CustomDataSource,
-    ) -> Self {
-        let projected_schema = project_schema(&schema, projections).unwrap();
+    fn new(projections: Option<&Vec<usize>>, db: CustomDataSource) -> Self {
+        let schema = db.schema();
+        let metadata_schema = db.metadata_columns();
+        let projected_schema = match projections {
+            Some(projection) => {
+                let projection = projection
+                    .iter()
+                    .map(|idx| match extract_field_index(*idx) {
+                        FieldIndex::NormalIndex(i) => Arc::new(schema.field(i).clone()),
+                        FieldIndex::MetadataIndex(i) => {
+                            Arc::new(metadata_schema.as_ref().unwrap().field(i).clone())
+                        }
+                    })
+                    .collect_vec();
+                Arc::new(Schema::new(projection))
+            }
+            None => schema,
+        };
         let cache = Self::compute_properties(projected_schema.clone());
         Self {
             db,
diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs
index e3996f3c3188..9da6f9ab2048 100644
--- a/datafusion/catalog/src/table.rs
+++ b/datafusion/catalog/src/table.rs
@@ -56,7 +56,7 @@ pub trait TableProvider: Debug + Sync + Send {
     fn schema(&self) -> SchemaRef;
 
     /// Return a reference to the schema for metadata columns.
-    /// 
+    ///
     /// Metadata columns are columns which are meant to be semi-public stores of the internal details of the table.
     /// For example, `ctid` in Postgres would be considered a metadata column
     /// (Postgres calls these "system columns"; see [the Postgres docs](https://www.postgresql.org/docs/current/ddl-system-columns.html) for more information and examples).
     ///
     /// You can use this method to declare which columns in the table are "metadata" columns.
     /// See `datafusion/core/tests/sql/metadata_columns.rs` for an example of this in action.
-    /// 
+    ///
     /// As an example of how this works in practice, if you have the following Postgres table:
-    /// 
+    ///
     /// ```sql
     /// CREATE TABLE t (x int);
     /// INSERT INTO t VALUES (1);
     /// ```
-    /// 
+    ///
     /// And you do a `SELECT * FROM t`, you would get the following schema:
-    /// 
+    ///
     /// ```text
     /// +---+
     /// | x |
     /// +---+
     /// | 1 |
     /// +---+
     /// ```
-    /// 
+    ///
     /// But if you do `SELECT ctid, * FROM t`, you would get the following schema (ignore the meaning of the value of `ctid`, this is just an example):
-    /// 
+    ///
     /// ```text
     /// +-----+---+
     /// | ctid| x |
     /// +-----+---+
     /// | ... | 1 |
     /// +-----+---+
     /// ```
diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs
index 699a5b37cc6b..abdc18356147 100644
--- a/datafusion/common/src/dfschema.rs
+++ b/datafusion/common/src/dfschema.rs
@@ -104,12 +104,12 @@ pub type DFSchemaRef = Arc<DFSchema>;
 /// let schema = Schema::from(df_schema);
 /// assert_eq!(schema.fields().len(), 1);
 /// ```
-/// 
+///
 /// DFSchema also supports metadata columns.
 /// Metadata columns are columns which are meant to be semi-public stores of the internal details of the table.
 /// For example, the [`ctid` column in Postgres](https://www.postgresql.org/docs/current/ddl-system-columns.html)
 /// or the [`_metadata` column in Spark](https://docs.databricks.com/en/ingestion/file-metadata-column.html).
-/// 
+///
 /// These columns are stored in a separate schema from the main schema, which can be accessed using [DFSchema::metadata_schema].
 /// To build a schema with metadata columns, use [DFSchema::new_with_metadata]:
 /// ```rust
@@ -117,17 +117,17 @@ pub type DFSchemaRef = Arc<DFSchema>;
 /// use datafusion_common::DFSchema;
 /// use arrow_schema::{Field, Schema};
 /// use arrow::datatypes::DataType;
 /// use std::collections::HashMap;
-/// 
+///
 /// let schema = Schema::new(vec![
 ///     Field::new("c1", DataType::Int32, false),
 /// ]);
 /// let metadata_schema = Schema::new(vec![
 ///     Field::new("file", DataType::Utf8, false),
 /// ]);
-/// 
+///
 /// let df_schema = DFSchema::new_with_metadata(
 ///     vec![(None, Field::new("c1", DataType::Int32, false).into())],
-/// 
+///
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct DFSchema {
     inner: QualifiedSchema,
@@ -142,7 +142,26 @@ pub struct DFSchema {
     /// The starting point of the metadata column index.
     /// If an index is less than this value, then this index is for an ordinary column.
     /// If it is greater than or equal to this value, then this index is for a metadata column.
-pub const METADATA_OFFSET: usize = usize::MAX >> 1;
+const METADATA_OFFSET: usize = usize::MAX >> 1;
+
+pub enum FieldIndex {
+    MetadataIndex(usize),
+    NormalIndex(usize),
+}
+
+/// There are two types of fields: normal fields and metadata fields.
+/// Extract the real field index from a (possibly offset-tagged) projection index.
+///
+/// Returns:
+/// - `FieldIndex::MetadataIndex` if the index refers to a metadata field
+/// - `FieldIndex::NormalIndex` if the index refers to a normal field
+pub fn extract_field_index(index: usize) -> FieldIndex {
+    if index >= METADATA_OFFSET {
+        FieldIndex::MetadataIndex(index - METADATA_OFFSET)
+    } else {
+        FieldIndex::NormalIndex(index)
+    }
+}
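// Editorial sketch (not part of this patch): decoding a projection index with
// `extract_field_index` above. Since METADATA_OFFSET is now private, matching
// on `FieldIndex` is the supported way to tell the two kinds of index apart.
fn describe_index(idx: usize) -> String {
    use datafusion_common::{extract_field_index, FieldIndex};
    match extract_field_index(idx) {
        // indexes below METADATA_OFFSET address the normal schema
        FieldIndex::NormalIndex(i) => format!("normal column #{i}"),
        // indexes at or above METADATA_OFFSET address the metadata schema
        FieldIndex::MetadataIndex(i) => format!("metadata column #{i}"),
    }
}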
 
 /// QualifiedSchema wraps an Arrow schema and field qualifiers.
 /// Some fields may be qualified and some unqualified.
@@ -160,7 +179,6 @@ pub struct QualifiedSchema {
 /// A table schema that holds not just column names but also the name of the table they belong to.
 /// For example, consider `table_name.column_name` (qualified) vs. just `column_name` (unqualified).
 impl QualifiedSchema {
-
     /// Creates an empty `QualifiedSchema`.
     pub fn empty() -> Self {
         Self {
@@ -172,23 +190,24 @@ impl QualifiedSchema {
     /// Creates a new `QualifiedSchema` from an Arrow schema and a list of table references.
     /// The table references must be of the same length as the fields in the schema and
     /// follow the same order.
-    pub fn new(schema: SchemaRef, field_qualifiers: Vec<Option<TableReference>>) -> Result<Self> {
+    pub fn new(
+        schema: SchemaRef,
+        field_qualifiers: Vec<Option<TableReference>>,
+    ) -> Result<Self> {
         if schema.fields().len() != field_qualifiers.len() {
             return _schema_err!(SchemaError::UnmatchedFieldQualifiers {
                 field_count: schema.fields().len(),
                 qualifier_count: field_qualifiers.len(),
             });
         }
-        Ok(
-            QualifiedSchema {
-                schema,
-                field_qualifiers,
-            }
-        )
+        Ok(QualifiedSchema {
+            schema,
+            field_qualifiers,
+        })
     }
 
     /// Create a new `QualifiedSchema` from a list of Arrow [Field]s where they all share the same [TableReference].
-    /// 
+    ///
     /// For example, to create a schema for a table with all fields qualified by `table_name`:
     /// ```rust
     /// use datafusion_common::{QualifiedSchema, TableReference};
     ///
     /// let table_name = TableReference::from("table_name");
     /// let qualified_schema = QualifiedSchema::new_with_table(schema, &table_name);
     /// ```
-    /// 
+    ///
     /// To create a schema where fields have different qualifiers, use [QualifiedSchema::new].
     pub fn new_with_table(schema: SchemaRef, table_name: &TableReference) -> Self {
         let field_qualifiers = schema
             .fields()
             .iter()
             .map(|_| Some(table_name.clone()))
             .collect();
-        Self::new(schema, field_qualifiers).expect("field qualifier length should match schema")
+        Self::new(schema, field_qualifiers)
+            .expect("field qualifier length should match schema")
     }
 
     /// Checks if the schema is empty.
-    /// 
+    ///
     /// Returns:
     /// - `true` if the schema has no fields
     /// - `false` if it has any fields, qualified or unqualified
@@ -226,7 +246,7 @@ impl QualifiedSchema {
     }
 
     /// Look up the field by its unqualified name.
-    /// 
+    ///
     /// This returns a Vec of fields and their qualifier for any fields that have the given unqualified name.
     /// For example, given the fields `table1.a`, `table1.b` and `table2.a`, if you search for `a` you will get `table1.a` and `table2.a`
     /// as [(`table1`, `a`), (`table2`, `a`)].
@@ -275,9 +295,9 @@ impl QualifiedSchema {
     }
 
     /// Search for a field using its qualified name.
-    /// 
+    ///
     /// This will return the field if it exists, otherwise it will return `None`.
-    /// 
+    ///
     /// For example, given the fields `table1.a`, `table1.b` and `table2.a`, if you search for (`table1`, `a`) you will get the [Field] for `a` back.
     pub fn field_with_qualified_name(
         &self,
@@ -336,17 +356,17 @@ impl DFSchema {
         }
     }
 
-    /// Return a reference to the schema for metadata columns. 
-    /// 
-    /// Metadata columns are columns which meant to be semi-public stores of the internal details of the table. 
+    /// Return a reference to the schema for metadata columns.
+    ///
+    /// Metadata columns are columns which are meant to be semi-public stores of the internal details of the table.
     /// For example, the [`ctid` column in Postgres](https://www.postgresql.org/docs/current/ddl-system-columns.html)
     /// or the [`_metadata` column in Spark](https://docs.databricks.com/en/ingestion/file-metadata-column.html).
-    /// 
+    ///
     /// Implementers of [`datafusion::datasource::TableProvider`] can use this to declare which columns in the table are "metadata" columns.
     /// See also [`datafusion::datasource::TableProvider::metadata_columns`] for more information or `datafusion/core/tests/sql/metadata_columns.rs` for a full example.
-    /// 
-    /// Returns: 
-    /// - `&None` for tables that do not have metadata columns. 
+    ///
+    /// Returns:
+    /// - `&None` for tables that do not have metadata columns.
     /// - `&Some(QualifiedSchema)` for tables having metadata columns.
     pub fn metadata_schema(&self) -> &Option<QualifiedSchema> {
         &self.metadata
@@ -420,7 +440,8 @@ impl DFSchema {
         let field_count = fields.len();
         let schema = Arc::new(Schema::new_with_metadata(fields, metadata));
         let dfschema = Self {
-            inner: QualifiedSchema::new(schema, vec![None; field_count]).expect("qualifiers length is hardcoded to be the same as fields length"),
+            inner: QualifiedSchema::new(schema, vec![None; field_count])
+                .expect("qualifiers length is hardcoded to be the same as fields length"),
             functional_dependencies: FunctionalDependencies::empty(),
             metadata: None,
         };
@@ -441,7 +462,8 @@ impl DFSchema {
             inner: QualifiedSchema::new(
                 schema.clone().into(),
                 vec![Some(qualifier); schema.fields.len()],
-            ).expect("qualifiers and fields have the same length"),
+            )
+            .expect("qualifiers and fields have the same length"),
             functional_dependencies: FunctionalDependencies::empty(),
             metadata: None,
         };
@@ -710,8 +732,10 @@ impl DFSchema {
     /// Find all fields that match the given name
     pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&Field> {
         let mut fields: Vec<&Field> = self.inner.fields_with_unqualified_name(name);
-        if let Some(schema) = self.metadata_schema() {
-            fields.extend(schema.fields_with_unqualified_name(name));
+        if fields.is_empty() {
+            if let Some(schema) = self.metadata_schema() {
+                fields.extend(schema.fields_with_unqualified_name(name));
+            }
         }
         fields
     }
@@ -723,8 +747,10 @@ impl DFSchema {
     ) -> Vec<(Option<&TableReference>, &Field)> {
         let mut fields: Vec<(Option<&TableReference>, &Field)> =
             self.inner.qualified_fields_with_unqualified_name(name);
-        if let Some(schema) = self.metadata_schema() {
-            fields.extend(schema.qualified_fields_with_unqualified_name(name));
+        if fields.is_empty() {
+            if let Some(schema) = self.metadata_schema() {
+                fields.extend(schema.qualified_fields_with_unqualified_name(name));
+            }
         }
         fields
     }
@@ -1054,7 +1080,8 @@ impl DFSchema {
     pub fn strip_qualifiers(self) -> Self {
        let len = self.inner.len();
         DFSchema {
-            inner: QualifiedSchema::new(self.inner.schema, vec![None; len]).expect("qualifier length is hardcoded to be the same as fields length"),
+            inner: QualifiedSchema::new(self.inner.schema, vec![None; len])
+                .expect("qualifier length is hardcoded to be the same as fields length"),
             functional_dependencies: self.functional_dependencies,
             metadata: self.metadata,
         }
@@ -1065,7 +1092,8 @@ impl DFSchema {
         let qualifier = qualifier.into();
         let len = self.inner.len();
         DFSchema {
-            inner: QualifiedSchema::new(self.inner.schema, vec![Some(qualifier); len]).expect("qualifier length is hardcoded to be the same as fields length"),
+            inner: QualifiedSchema::new(self.inner.schema, vec![Some(qualifier); len])
+                .expect("qualifier length is hardcoded to be the same as fields length"),
             functional_dependencies: self.functional_dependencies,
             metadata: self.metadata,
         }
@@ -1562,7 +1590,8 @@ mod tests {
             inner: QualifiedSchema::new(
                 Arc::clone(&arrow_schema_ref),
                 vec![None; arrow_schema_ref.fields.len()],
-            ).unwrap(),
+            )
+            .unwrap(),
             functional_dependencies: FunctionalDependencies::empty(),
             metadata: None,
         };
@@ -1611,7 +1640,8 @@ mod tests {
             inner: QualifiedSchema::new(
                 Arc::clone(&schema),
                 vec![None; schema.fields.len()],
-            ).unwrap(),
+            )
+            .unwrap(),
             functional_dependencies: FunctionalDependencies::empty(),
             metadata: None,
         };
diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs
index 106f28103523..a44d7ce0fa90 100644
--- a/datafusion/common/src/lib.rs
+++ b/datafusion/common/src/lib.rs
@@ -52,8 +52,8 @@ pub mod utils;
 pub use arrow;
 pub use column::Column;
 pub use dfschema::{
-    qualified_name, DFSchema, DFSchemaRef, ExprSchema, QualifiedSchema, SchemaExt,
-    ToDFSchema, METADATA_OFFSET,
+    extract_field_index, qualified_name, DFSchema, DFSchemaRef, ExprSchema, FieldIndex,
+    QualifiedSchema, SchemaExt, ToDFSchema,
 };
 pub use error::{
     field_not_found, unqualified_field_not_found, DataFusionError, Result, SchemaError,
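// Editorial note (not part of this patch): after the re-export change above,
// downstream crates pull the new helpers from the crate root; the raw offset
// constant itself is no longer public.
#[allow(unused_imports)]
use datafusion_common::{extract_field_index, FieldIndex};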
diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs
index c5874deb6ed5..f3f551cd52b7 100644
--- a/datafusion/core/src/execution/session_state.rs
+++ b/datafusion/core/src/execution/session_state.rs
@@ -105,11 +105,11 @@ use uuid::Uuid;
 /// # #[tokio::main]
 /// # async fn main() -> Result<()> {
 /// let state = SessionStateBuilder::new()
-///     .with_config(SessionConfig::new())  
+///     .with_config(SessionConfig::new())
 ///     .with_runtime_env(Arc::new(RuntimeEnv::default()))
 ///     .with_default_features()
 ///     .build();
-/// Ok(())  
+/// Ok(())
 /// # }
 /// ```
 ///
@@ -1326,7 +1326,7 @@ impl SessionStateBuilder {
     ///     let url = Url::try_from("file://").unwrap();
     ///     let object_store = object_store::local::LocalFileSystem::new();
     ///     let state = SessionStateBuilder::new()
-    ///         .with_config(SessionConfig::new())  
+    ///         .with_config(SessionConfig::new())
     ///         .with_object_store(&url, Arc::new(object_store))
     ///         .with_default_features()
     ///         .build();
diff --git a/datafusion/core/tests/sql/metadata_columns.rs b/datafusion/core/tests/sql/metadata_columns.rs
index f8c52940afbc..b099433ca444 100644
--- a/datafusion/core/tests/sql/metadata_columns.rs
+++ b/datafusion/core/tests/sql/metadata_columns.rs
@@ -20,7 +20,6 @@ use std::fmt::{self, Debug, Formatter};
 use std::sync::{Arc, Mutex};
 
 use arrow_array::{ArrayRef, StringArray, UInt64Array};
-use arrow_schema::SchemaBuilder;
 use async_trait::async_trait;
 use datafusion::arrow::array::{UInt64Builder, UInt8Builder};
 use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
@@ -32,13 +31,13 @@ use datafusion::physical_expr::EquivalenceProperties;
 use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion::physical_plan::memory::MemoryStream;
 use datafusion::physical_plan::{
-    project_schema, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
-    PlanProperties, SendableRecordBatchStream,
+    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
+    SendableRecordBatchStream,
 };
 use datafusion::{assert_batches_sorted_eq, prelude::*};
 
 use datafusion::catalog::Session;
-use datafusion_common::METADATA_OFFSET;
+use datafusion_common::{extract_field_index, FieldIndex};
 use itertools::Itertools;
 
 /// A User, with an id and a bank account
@@ -51,6 +50,7 @@ struct User {
 /// A custom datasource, used to represent a datastore with a single index
 #[derive(Clone)]
 pub struct CustomDataSource {
+    test_conflict_name: bool,
     inner: Arc<Mutex<CustomDataSourceInner>>,
     metadata_columns: SchemaRef,
 }
@@ -69,9 +69,12 @@ impl CustomDataSource {
     pub(crate) async fn create_physical_plan(
         &self,
         projections: Option<&Vec<usize>>,
-        schema: SchemaRef,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        Ok(Arc::new(CustomExec::new(projections, schema, self.clone())))
+        Ok(Arc::new(CustomExec::new(
+            self.test_conflict_name,
+            projections,
+            self.clone(),
+        )))
     }
 
     pub(crate) fn populate_users(&self) {
@@ -93,11 +96,20 @@ impl CustomDataSource {
         let mut inner = self.inner.lock().unwrap();
         inner.data.push(user);
     }
+
+    fn with_conflict_name(&self) -> Self {
+        CustomDataSource {
+            test_conflict_name: true,
+            inner: self.inner.clone(),
+            metadata_columns: self.metadata_columns.clone(),
+        }
+    }
 }
 
 impl Default for CustomDataSource {
     fn default() -> Self {
         CustomDataSource {
+            test_conflict_name: false,
             inner: Arc::new(Mutex::new(CustomDataSourceInner {
                 data: Default::default(),
             })),
@@ -116,10 +128,17 @@ impl TableProvider for CustomDataSource {
     }
 
     fn schema(&self) -> SchemaRef {
-        SchemaRef::new(Schema::new(vec![
-            Field::new("id", DataType::UInt8, false),
-            Field::new("bank_account", DataType::UInt64, true),
-        ]))
+        if self.test_conflict_name {
+            SchemaRef::new(Schema::new(vec![
+                Field::new("_file", DataType::UInt8, false),
+                Field::new("bank_account", DataType::UInt64, true),
+            ]))
+        } else {
+            SchemaRef::new(Schema::new(vec![
+                Field::new("id", DataType::UInt8, false),
+                Field::new("bank_account", DataType::UInt64, true),
+            ]))
+        }
     }
 
     fn metadata_columns(&self) -> Option<SchemaRef> {
@@ -138,38 +157,13 @@ impl TableProvider for CustomDataSource {
         _filters: &[Expr],
         _limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let mut schema = self.schema();
-        let size = schema.fields.len();
-        if let Some(metadata) = self.metadata_columns() {
-            let mut builder = SchemaBuilder::from(schema.as_ref());
-            for f in metadata.fields.iter() {
-                builder.try_merge(f)?;
-            }
-            schema = Arc::new(builder.finish());
-        }
-
-        let projection = match projection {
-            Some(projection) => {
-                let projection = projection
-                    .iter()
-                    .map(|idx| {
-                        if *idx >= METADATA_OFFSET {
-                            *idx - METADATA_OFFSET + size
-                        } else {
-                            *idx
-                        }
-                    })
-                    .collect_vec();
-                Some(projection)
-            }
-            None => None,
-        };
-        return self.create_physical_plan(projection.as_ref(), schema).await;
+        return self.create_physical_plan(projection).await;
     }
 }
 
 #[derive(Debug, Clone)]
 struct CustomExec {
+    test_conflict_name: bool,
     db: CustomDataSource,
     projected_schema: SchemaRef,
     cache: PlanProperties,
@@ -177,13 +171,30 @@ struct CustomExec {
 
 impl CustomExec {
     fn new(
+        test_conflict_name: bool,
         projections: Option<&Vec<usize>>,
-        schema: SchemaRef,
         db: CustomDataSource,
     ) -> Self {
-        let projected_schema = project_schema(&schema, projections).unwrap();
+        let schema = db.schema();
+        let metadata_schema = db.metadata_columns();
+        let projected_schema = match projections {
+            Some(projection) => {
+                let projection = projection
+                    .iter()
+                    .map(|idx| match extract_field_index(*idx) {
+                        FieldIndex::NormalIndex(i) => Arc::new(schema.field(i).clone()),
+                        FieldIndex::MetadataIndex(i) => {
+                            Arc::new(metadata_schema.as_ref().unwrap().field(i).clone())
+                        }
+                    })
+                    .collect_vec();
+                Arc::new(Schema::new(projection))
+            }
+            None => schema,
+        };
         let cache = Self::compute_properties(projected_schema.clone());
         Self {
+            test_conflict_name,
             db,
             projected_schema,
             cache,
@@ -265,7 +276,13 @@ impl ExecutionPlan for CustomExec {
                     "_rowid" => Arc::new(rowid_array.clone()) as ArrayRef,
                     "id" => Arc::new(id_array.clone()) as ArrayRef,
                     "bank_account" => Arc::new(account_array.clone()) as ArrayRef,
-                    "_file" => Arc::new(file_array.clone()) as ArrayRef,
+                    "_file" => {
+                        if self.test_conflict_name {
+                            Arc::new(id_array.clone()) as ArrayRef
+                        } else {
+                            Arc::new(file_array.clone()) as ArrayRef
+                        }
+                    }
                     _ => panic!("cannot reach here"),
                 })
                 .collect();
@@ -278,6 +295,53 @@ impl ExecutionPlan for CustomExec {
     }
 }
 
+#[tokio::test]
+async fn select_conflict_name() {
+    // When reading CSV, JSON, or Parquet, a normal column name may be the same as a
+    // metadata column name; in that case the metadata column should be suppressed.
+    let ctx = SessionContext::new_with_config(
+        SessionConfig::new().with_information_schema(true),
+    );
+    let db = CustomDataSource::default().with_conflict_name();
+    db.populate_users();
+    ctx.register_table("test", Arc::new(db)).unwrap();
+    // disallow ddl
+    let options = SQLOptions::new().with_allow_ddl(false);
+
+    let show_columns = "show columns from test;";
+    let df_columns = ctx.sql_with_options(show_columns, options).await.unwrap();
+
+    let batches = df_columns
+        .select(vec![col("column_name"), col("data_type")])
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+    let expected = [
+        "+--------------+-----------+",
+        "| column_name  | data_type |",
+        "+--------------+-----------+",
+        "| _file        | UInt8     |",
+        "| bank_account | UInt64    |",
+        "+--------------+-----------+",
+    ];
+    assert_batches_sorted_eq!(expected, &batches);
+
+    let select0 = "SELECT _file FROM test";
+    let df = ctx.sql_with_options(select0, options).await.unwrap();
+    let batches = df.collect().await.unwrap();
+    let expected = [
+        "+-------+",
+        "| _file |",
+        "+-------+",
+        "| 1     |",
+        "| 2     |",
+        "| 3     |",
+        "+-------+",
+    ];
+    assert_batches_sorted_eq!(expected, &batches);
+}
+
 #[tokio::test]
 async fn select_metadata_column() {
     // Verify SessionContext::with_sql_options errors appropriately
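// Editorial sketch (not part of this patch): the encode/decode invariant that
// TableScan below relies on. The local constant mirrors the now-private
// METADATA_OFFSET (usize::MAX >> 1) purely for illustration.
#[cfg(test)]
mod metadata_index_sketch {
    use datafusion_common::{extract_field_index, FieldIndex};

    const OFFSET: usize = usize::MAX >> 1; // mirrors METADATA_OFFSET

    #[test]
    fn round_trip() {
        // A metadata column i is addressed as OFFSET + i ...
        match extract_field_index(OFFSET + 3) {
            // ... and extract_field_index strips the tag again.
            FieldIndex::MetadataIndex(i) => assert_eq!(i, 3),
            FieldIndex::NormalIndex(_) => unreachable!("index was offset-tagged"),
        }
        // Plain indexes pass through as normal columns.
        assert!(matches!(extract_field_index(3), FieldIndex::NormalIndex(3)));
    }
}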
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index 1b08b0fd5881..69bfabe530c6 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -53,10 +53,10 @@ use datafusion_common::tree_node::{
     Transformed, TreeNode, TreeNodeContainer, TreeNodeRecursion,
 };
 use datafusion_common::{
-    aggregate_functional_dependencies, internal_err, plan_err, Column, Constraints,
-    DFSchema, DFSchemaRef, DataFusionError, Dependency, FunctionalDependence,
-    FunctionalDependencies, ParamValues, QualifiedSchema, Result, ScalarValue,
-    TableReference, UnnestOptions, METADATA_OFFSET,
+    aggregate_functional_dependencies, extract_field_index, internal_err, plan_err,
+    Column, Constraints, DFSchema, DFSchemaRef, DataFusionError, Dependency, FieldIndex,
+    FunctionalDependence, FunctionalDependencies, ParamValues, QualifiedSchema, Result,
+    ScalarValue, TableReference, UnnestOptions,
 };
 use indexmap::IndexSet;
@@ -2619,29 +2619,25 @@ impl TableScan {
             .map(|p| {
                 let projected_func_dependencies =
                     func_dependencies.project_functional_dependencies(p, p.len());
-                let qualified_fields: Result<Vec<_>, _> =
-                    p.iter()
-                        .map(|i| {
-                            if *i >= METADATA_OFFSET {
-                                if let Some(metadata) = &metadata {
-                                    return Ok((
-                                        Some(table_name.clone()),
-                                        Arc::new(
-                                            metadata.field(*i - METADATA_OFFSET).clone(),
-                                        ),
-                                    ));
-                                } else {
-                                    return plan_err!(
-                                        "table doesn't support metadata column"
-                                    );
-                                }
-                            }
-                            Ok((
-                                Some(table_name.clone()),
-                                Arc::new(schema.field(*i).clone()),
-                            ))
-                        })
-                        .collect();
+                let qualified_fields: Result<Vec<_>, _> = p
+                    .iter()
+                    .map(|i| match extract_field_index(*i) {
+                        FieldIndex::MetadataIndex(i) => {
+                            if let Some(metadata) = &metadata {
+                                Ok((
+                                    Some(table_name.clone()),
+                                    Arc::new(metadata.field(i).clone()),
+                                ))
+                            } else {
+                                plan_err!("table doesn't support metadata column")
+                            }
+                        }
+                        FieldIndex::NormalIndex(i) => Ok((
+                            Some(table_name.clone()),
+                            Arc::new(schema.field(i).clone()),
+                        )),
+                    })
+                    .collect();
                 let df_schema = DFSchema::new_with_metadata(
                     qualified_fields?,
                     schema.metadata.clone(),