diff --git a/datafusion-testing b/datafusion-testing
index 5cc59ceceeeb..5b424aefd7f6 160000
--- a/datafusion-testing
+++ b/datafusion-testing
@@ -1 +1 @@
-Subproject commit 5cc59ceceeebeea6b39861210b6d1cd27e66648a
+Subproject commit 5b424aefd7f6bf198220c37f59d39dbb25b47695
diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs
index 2b477462da68..e3996f3c3188 100644
--- a/datafusion/catalog/src/table.rs
+++ b/datafusion/catalog/src/table.rs
@@ -55,12 +55,47 @@ pub trait TableProvider: Debug + Sync + Send {
     /// Get a reference to the schema for this table
     fn schema(&self) -> SchemaRef;
 
-    /// Get metadata columns of this table.
-    /// See Also: [`datafusion_common::DFSchema::metadata`]
+    /// Return a reference to the schema for metadata columns.
+    ///
+    /// Metadata columns are columns that are meant to be semi-public stores of the table's internal details.
+    /// For example, `ctid` in Postgres would be considered a metadata column
+    /// (Postgres calls these "system columns"; see [the Postgres docs](https://www.postgresql.org/docs/current/ddl-system-columns.html) for more information and examples).
+    /// Spark has a `_metadata` column that it uses to include details about each file read in a query (see [Spark's docs](https://docs.databricks.com/en/ingestion/file-metadata-column.html)).
+    ///
+    /// You can use this method to declare which columns in the table are "metadata" columns.
+    /// See `datafusion/core/tests/sql/metadata_columns.rs` for an example of this in action.
+    ///
+    /// As an example of how this works in practice, if you have the following Postgres table:
+    ///
+    /// ```sql
+    /// CREATE TABLE t (x int);
+    /// INSERT INTO t VALUES (1);
+    /// ```
+    ///
+    /// Then `SELECT * FROM t` returns the following result:
+    ///
+    /// ```text
+    /// +---+
+    /// | x |
+    /// +---+
+    /// | 1 |
+    /// +---+
+    /// ```
+    ///
+    /// But `SELECT ctid, * FROM t` returns the following result (the value of `ctid` is just an example):
+    ///
+    /// ```text
+    /// +------+---+
+    /// | ctid | x |
+    /// +------+---+
+    /// | 0    | 1 |
+    /// +------+---+
+    /// ```
     ///
     /// Returns:
     /// - `None` for tables that do not have metadata columns.
     /// - `Some(SchemaRef)` for tables having metadata columns.
+    ///   The returned schema should be the schema of _only_ the metadata columns, not the full schema.
     fn metadata_columns(&self) -> Option<SchemaRef> {
         None
     }
diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs
index 787ea299201c..699a5b37cc6b 100644
--- a/datafusion/common/src/dfschema.rs
+++ b/datafusion/common/src/dfschema.rs
@@ -104,12 +104,36 @@ pub type DFSchemaRef = Arc<DFSchema>;
 /// let schema = Schema::from(df_schema);
 /// assert_eq!(schema.fields().len(), 1);
 /// ```
+///
+/// DFSchema also supports metadata columns.
+/// Metadata columns are columns that are meant to be semi-public stores of the table's internal details.
+/// For example, the [`ctid` column in Postgres](https://www.postgresql.org/docs/current/ddl-system-columns.html)
+/// or the [`_metadata` column in Spark](https://docs.databricks.com/en/ingestion/file-metadata-column.html).
+///
+/// These columns are stored in a separate schema from the main schema, which can be accessed using [DFSchema::metadata_schema].
+/// To build a schema with metadata columns, use [DFSchema::new_with_metadata] and [DFSchema::with_metadata_schema]:
+/// ```rust
+/// use std::collections::HashMap;
+/// use std::sync::Arc;
+///
+/// use datafusion_common::{DFSchema, QualifiedSchema, TableReference};
+/// use arrow_schema::{Field, Schema};
+/// use arrow::datatypes::DataType;
+///
+/// let schema = Schema::new(vec![
+///     Field::new("c1", DataType::Int32, false),
+/// ]);
+/// let metadata_schema = Schema::new(vec![
+///     Field::new("file", DataType::Utf8, false),
+/// ]);
+///
+/// let df_schema = DFSchema::new_with_metadata(
+///     vec![(None, Field::new("c1", DataType::Int32, false).into())],
+///     HashMap::new(),
+/// )
+/// .unwrap()
+/// // declare `file` as a metadata column belonging to table `t`
+/// .with_metadata_schema(Some(QualifiedSchema::new_with_table(
+///     Arc::new(metadata_schema),
+///     &TableReference::from("t"),
+/// )));
+///
+/// assert_eq!(df_schema.fields().len(), schema.fields().len());
+/// ```
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct DFSchema {
     inner: QualifiedSchema,
     /// Stores functional dependencies in the schema.
     functional_dependencies: FunctionalDependencies,
-    /// metadata columns are data columns for a table that are not in the table schema.
+    /// Metadata columns are data columns for a table that are not in the table schema.
     /// For example, a file source could expose a "file" column that contains the path of the file that contained each row.
     /// See Also: [Spark SupportsMetadataColumns]:
     metadata: Option<QualifiedSchema>,
@@ -121,8 +145,9 @@ pub struct DFSchema {
 pub const METADATA_OFFSET: usize = usize::MAX >> 1;
 
 /// QualifiedSchema wraps an Arrow schema and field qualifiers.
-/// Some fields may be qualified and some unqualified. A qualified field is a field that has a
-/// relation name associated with it.
+/// Some fields may be qualified and some unqualified.
+/// A qualified field is a field that has a relation name associated with it.
+/// For example, a qualified field would be `table_name.column_name` and an unqualified field would be just `column_name`.
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct QualifiedSchema {
     /// Inner Arrow schema reference.
@@ -132,7 +157,11 @@ pub struct QualifiedSchema {
     field_qualifiers: Vec<Option<TableReference>>,
 }
 
+/// A table schema that holds not just column names but also the name of the table each column belongs to.
+/// For example, consider `table_name.column_name` (qualified) vs. just `column_name` (unqualified).
 impl QualifiedSchema {
+
+    /// Creates an empty `QualifiedSchema`.
     pub fn empty() -> Self {
         Self {
             schema: Arc::new(Schema::new([])),
@@ -140,30 +169,67 @@ impl QualifiedSchema {
         }
     }
 
-    pub fn new(schema: SchemaRef, field_qualifiers: Vec<Option<TableReference>>) -> Self {
-        QualifiedSchema {
-            schema,
-            field_qualifiers,
+    /// Creates a new `QualifiedSchema` from an Arrow schema and a list of field qualifiers (table references).
+    /// The qualifiers must have the same length as the schema's fields and
+    /// follow the same order.
+    pub fn new(schema: SchemaRef, field_qualifiers: Vec<Option<TableReference>>) -> Result<Self> {
+        if schema.fields().len() != field_qualifiers.len() {
+            return _schema_err!(SchemaError::UnmatchedFieldQualifiers {
+                field_count: schema.fields().len(),
+                qualifier_count: field_qualifiers.len(),
+            });
         }
+        Ok(QualifiedSchema {
+            schema,
+            field_qualifiers,
+        })
     }
 
+    /// Create a new `QualifiedSchema` from a list of Arrow [Field]s that all share the same [TableReference].
+    ///
+    /// For example, to create a schema for a table with all fields qualified by `table_name`:
+    /// ```rust
+    /// use datafusion_common::{QualifiedSchema, TableReference};
+    /// use arrow_schema::{Field, Schema};
+    /// use arrow::datatypes::DataType;
+    /// let schema = Schema::new(vec![
+    ///     Field::new("c1", DataType::Int32, false),
+    /// ]);
+    /// let table_name = TableReference::from("table_name");
+    /// let qualified_schema = QualifiedSchema::new_with_table(schema.into(), &table_name);
+    /// ```
+    ///
+    /// To create a schema where fields have different qualifiers, use [QualifiedSchema::new].
     pub fn new_with_table(schema: SchemaRef, table_name: &TableReference) -> Self {
         let field_qualifiers = schema
             .fields()
             .iter()
             .map(|_| Some(table_name.clone()))
             .collect();
-        Self::new(schema, field_qualifiers)
+        Self::new(schema, field_qualifiers).expect("field qualifier length should match schema")
     }
 
+    /// Checks if the schema is empty.
+    ///
+    /// Returns:
+    /// - `true` if the schema has no fields
+    /// - `false` if it has any fields, qualified or unqualified
     pub fn is_empty(&self) -> bool {
         self.schema.fields.is_empty()
     }
 
+    /// Returns the number of fields in the schema, be they qualified or unqualified.
    pub fn len(&self) -> usize {
         self.schema.fields.len()
     }
 
+    /// Look up a field by its unqualified name.
+    ///
+    /// This returns a Vec of fields and their qualifiers for all fields that have the given unqualified name.
+    /// For example, given the fields `table1.a`, `table1.b` and `table2.a`, searching for `a` will return `table1.a` and `table2.a`
+    /// as [(`table1`, `a`), (`table2`, `a`)].
     pub fn qualified_fields_with_unqualified_name(
         &self,
         name: &str,
@@ -174,7 +240,7 @@ impl QualifiedSchema {
             .collect()
     }
 
-    /// Iterate over the qualifiers and fields in the DFSchema
+    /// Iterate over the qualifiers and fields in the DFSchema.
     pub fn iter(&self) -> impl Iterator<Item = (Option<&TableReference>, &FieldRef)> {
         self.field_qualifiers
             .iter()
@@ -182,6 +248,7 @@ impl QualifiedSchema {
             .map(|(qualifier, field)| (qualifier.as_ref(), field))
     }
 
+    /// Similar to [Self::qualified_fields_with_unqualified_name] but discards the qualifier in the result.
     pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&Field> {
         self.fields()
             .iter()
@@ -201,12 +268,17 @@ impl QualifiedSchema {
         &self.schema.fields[i]
     }
 
-    /// Returns an immutable reference of a specific `Field` instance selected using an
+    /// Returns an immutable reference to a specific `Field` and its qualifier, selected using an
     /// offset within the internal `fields` vector and its qualifier
     pub fn qualified_field(&self, i: usize) -> (Option<&TableReference>, &Field) {
         (self.field_qualifiers[i].as_ref(), self.field(i))
     }
 
+    /// Search for a field using its qualified name.
+    ///
+    /// This will return the field if it exists, otherwise it will return `None`.
+    ///
+    /// For example, given the fields `table1.a`, `table1.b` and `table2.a`, searching for (`table1`, `a`) will return the [Field] for `a`.
     pub fn field_with_qualified_name(
         &self,
         qualifier: &TableReference,
@@ -222,6 +294,10 @@ impl QualifiedSchema {
         matches.next()
     }
 
+    /// Get the internal index of a column using its unqualified name.
+    /// If multiple columns have the same unqualified name, the index of the first one is returned.
+    /// If no column is found, `None` is returned.
+    /// This index can be used to access the column via [Self::field] or [Self::qualified_field].
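+    ///
+    /// A rough sketch of a lookup (the schema contents are only illustrative):
+    ///
+    /// ```rust,ignore
+    /// // `schema` holds the fields `t.a` and `t.b`
+    /// let i = schema.index_of_column_by_name(None, "b").unwrap();
+    /// let (qualifier, field) = schema.qualified_field(i);
+    /// assert_eq!(field.name(), "b");
+    /// ```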
     pub fn index_of_column_by_name(
         &self,
         qualifier: Option<&TableReference>,
@@ -244,13 +320,14 @@
         matches.next()
     }
 
+    /// Get only the qualifier of a field using its internal index.
     pub fn field_qualifier(&self, i: usize) -> Option<&TableReference> {
         self.field_qualifiers[i].as_ref()
     }
 }
 
 impl DFSchema {
-    /// Creates an empty `DFSchema`
+    /// Creates an empty `DFSchema` with no fields and no metadata columns.
     pub fn empty() -> Self {
         Self {
             inner: QualifiedSchema::empty(),
@@ -259,10 +336,17 @@ impl DFSchema {
         }
     }
 
-    /// Return a reference to the qualified metadata schema
-    ///
-    /// Returns:
-    /// - `&None` for tables that do not have metadata columns.
+    /// Return a reference to the schema for metadata columns.
+    ///
+    /// Metadata columns are columns that are meant to be semi-public stores of the table's internal details.
+    /// For example, the [`ctid` column in Postgres](https://www.postgresql.org/docs/current/ddl-system-columns.html)
+    /// or the [`_metadata` column in Spark](https://docs.databricks.com/en/ingestion/file-metadata-column.html).
+    ///
+    /// Implementers of [`datafusion::datasource::TableProvider`] can use this to declare which columns in the table are "metadata" columns.
+    /// See also [`datafusion::datasource::TableProvider::metadata_columns`] for more information, or `datafusion/core/tests/sql/metadata_columns.rs` for a full example.
+    ///
+    /// Returns:
+    /// - `&None` for tables that do not have metadata columns.
     /// - `&Some(QualifiedSchema)` for tables having metadata columns.
     pub fn metadata_schema(&self) -> &Option<QualifiedSchema> {
         &self.metadata
@@ -282,7 +366,9 @@ impl DFSchema {
         &self.inner.schema
     }
 
-    /// Set metadata schema to provided value
+    /// Set the metadata schema for an existing [`DFSchema`].
+    /// Note that this is the schema for the metadata columns (see [DFSchema::metadata_schema]),
+    /// not to be confused with the metadata of the schema itself (see [Schema::with_metadata]).
     pub fn with_metadata_schema(
         mut self,
         metadata_schema: Option<QualifiedSchema>,
@@ -291,7 +377,10 @@ impl DFSchema {
         self
     }
 
-    /// Create a `DFSchema` from an Arrow schema where all the fields have a given qualifier
+    /// Create a `DFSchema` from an Arrow schema where all the fields have a given qualifier and the schema itself has the given metadata.
+    /// This is not to be confused with the _metadata schema_ or _metadata columns_, which are a completely different concept.
+    /// In this method, `metadata` refers to the metadata of the schema itself: arbitrary key-value pairs.
+    /// See [Schema::with_metadata] for more information.
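+    ///
+    /// A rough sketch of a call (the field name and metadata key are only illustrative):
+    ///
+    /// ```rust,ignore
+    /// let schema_metadata = HashMap::from([("source".to_string(), "example".to_string())]);
+    /// let df_schema = DFSchema::new_with_metadata(
+    ///     vec![(None, Field::new("c1", DataType::Int32, false).into())],
+    ///     schema_metadata,
+    /// )?;
+    /// ```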
     pub fn new_with_metadata(
         qualified_fields: Vec<(Option<TableReference>, Arc<Field>)>,
         metadata: HashMap<String, String>,
     ) -> Result<Self> {
@@ -302,7 +391,7 @@
         let schema = Arc::new(Schema::new_with_metadata(fields, metadata));
 
         let dfschema = Self {
-            inner: QualifiedSchema::new(schema, qualifiers),
+            inner: QualifiedSchema::new(schema, qualifiers).expect("qualifiers and fields should have the same length, we just unzipped them"),
             functional_dependencies: FunctionalDependencies::empty(),
             metadata: None,
         };
@@ -331,7 +420,7 @@ impl DFSchema {
         let field_count = fields.len();
         let schema = Arc::new(Schema::new_with_metadata(fields, metadata));
         let dfschema = Self {
-            inner: QualifiedSchema::new(schema, vec![None; field_count]),
+            inner: QualifiedSchema::new(schema, vec![None; field_count]).expect("qualifiers length is hardcoded to be the same as fields length"),
             functional_dependencies: FunctionalDependencies::empty(),
             metadata: None,
         };
@@ -352,7 +441,7 @@ impl DFSchema {
             inner: QualifiedSchema::new(
                 schema.clone().into(),
                 vec![Some(qualifier); schema.fields.len()],
-            ),
+            ).expect("qualifiers and fields have the same length"),
             functional_dependencies: FunctionalDependencies::empty(),
             metadata: None,
         };
@@ -366,7 +455,7 @@ impl DFSchema {
         schema: &SchemaRef,
     ) -> Result<Self> {
         let dfschema = Self {
-            inner: QualifiedSchema::new(Arc::clone(schema), qualifiers),
+            inner: QualifiedSchema::new(Arc::clone(schema), qualifiers)?,
             functional_dependencies: FunctionalDependencies::empty(),
             metadata: None,
         };
@@ -442,7 +531,7 @@ impl DFSchema {
             inner: QualifiedSchema::new(
                 Arc::new(new_schema_with_metadata),
                 new_qualifiers,
-            ),
+            )?,
             functional_dependencies: FunctionalDependencies::empty(),
             metadata: None,
         };
@@ -517,6 +606,9 @@ impl DFSchema {
         self.inner.qualified_field(i)
     }
 
+    /// Get the internal index of a column using its unqualified name and an optional qualifier.
+    /// If a non-metadata column is found, its index is returned.
+    /// If a metadata column is found, its index is returned with an offset of `METADATA_OFFSET`.
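+    ///
+    /// A rough sketch (the column names are only illustrative):
+    ///
+    /// ```rust,ignore
+    /// // `a` is an ordinary column and `file` is a metadata column
+    /// assert_eq!(df_schema.index_of_column_by_name(None, "a"), Some(0));
+    /// assert_eq!(df_schema.index_of_column_by_name(None, "file"), Some(METADATA_OFFSET));
+    /// ```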
     pub fn index_of_column_by_name(
         &self,
         qualifier: Option<&TableReference>,
@@ -962,7 +1054,7 @@ impl DFSchema {
     pub fn strip_qualifiers(self) -> Self {
         let len = self.inner.len();
         DFSchema {
-            inner: QualifiedSchema::new(self.inner.schema, vec![None; len]),
+            inner: QualifiedSchema::new(self.inner.schema, vec![None; len]).expect("qualifier length is hardcoded to be the same as fields length"),
             functional_dependencies: self.functional_dependencies,
             metadata: self.metadata,
         }
@@ -973,7 +1065,7 @@ impl DFSchema {
         let qualifier = qualifier.into();
         let len = self.inner.len();
         DFSchema {
-            inner: QualifiedSchema::new(self.inner.schema, vec![Some(qualifier); len]),
+            inner: QualifiedSchema::new(self.inner.schema, vec![Some(qualifier); len]).expect("qualifier length is hardcoded to be the same as fields length"),
             functional_dependencies: self.functional_dependencies,
             metadata: self.metadata,
         }
@@ -1050,7 +1142,7 @@ impl TryFrom<SchemaRef> for DFSchema {
     fn try_from(schema: SchemaRef) -> Result<Self, Self::Error> {
         let field_count = schema.fields.len();
         let dfschema = Self {
-            inner: QualifiedSchema::new(schema, vec![None; field_count]),
+            inner: QualifiedSchema::new(schema, vec![None; field_count])?,
             functional_dependencies: FunctionalDependencies::empty(),
             metadata: None,
         };
@@ -1106,7 +1198,7 @@ impl ToDFSchema for Vec<Field> {
             metadata: HashMap::new(),
         };
         let dfschema = DFSchema {
-            inner: QualifiedSchema::new(schema.into(), vec![None; field_count]),
+            inner: QualifiedSchema::new(schema.into(), vec![None; field_count])?,
             functional_dependencies: FunctionalDependencies::empty(),
             metadata: None,
         };
@@ -1470,7 +1562,7 @@ mod tests {
             inner: QualifiedSchema::new(
                 Arc::clone(&arrow_schema_ref),
                 vec![None; arrow_schema_ref.fields.len()],
-            ),
+            ).unwrap(),
             functional_dependencies: FunctionalDependencies::empty(),
             metadata: None,
         };
@@ -1519,7 +1611,7 @@ mod tests {
             inner: QualifiedSchema::new(
                 Arc::clone(&schema),
                 vec![None; schema.fields.len()],
-            ),
+            ).unwrap(),
             functional_dependencies: FunctionalDependencies::empty(),
             metadata: None,
         };
diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs
index 1012c4cd2270..e5b5ee34096a 100644
--- a/datafusion/common/src/error.rs
+++ b/datafusion/common/src/error.rs
@@ -157,6 +157,11 @@ pub enum SchemaError {
         field: Box<Column>,
         valid_fields: Vec<Column>,
     },
+    /// Schema contains a different number of fields and field qualifiers
+    UnmatchedFieldQualifiers {
+        field_count: usize,
+        qualifier_count: usize,
+    },
 }
 
 impl Display for SchemaError {
@@ -211,6 +216,16 @@ impl Display for SchemaError {
                     )
                 }
             }
+            Self::UnmatchedFieldQualifiers {
+                field_count,
+                qualifier_count,
+            } => {
+                write!(
+                    f,
+                    "Schema contains a different number of fields ({}) and field qualifiers ({})",
+                    field_count, qualifier_count
+                )
+            }
         }
     }
 }
diff --git a/datafusion/core/src/datasource/default_table_source.rs b/datafusion/core/src/datasource/default_table_source.rs
index ff411311f609..c15ce8c9646f 100644
--- a/datafusion/core/src/datasource/default_table_source.rs
+++ b/datafusion/core/src/datasource/default_table_source.rs
@@ -56,6 +56,9 @@ impl TableSource for DefaultTableSource {
         self.table_provider.schema()
     }
 
+    /// Get a reference to the metadata columns for this table.
+    /// By default this delegates to the table provider, but it can be overridden by the table source.
+    /// See [`TableProvider::metadata_columns`] for more information.
     fn metadata_columns(&self) -> Option<SchemaRef> {
         self.table_provider.metadata_columns()
     }
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index 8214b44ebe66..1b08b0fd5881 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -370,6 +370,10 @@ impl LogicalPlan {
         }
     }
 
+    /// Gather the schema representing the metadata columns that this plan outputs.
+    /// This is done by recursively traversing the plan and collecting the metadata
+    /// columns that inner nodes output.
+    /// See [`TableProvider::metadata_columns`] for more information on metadata columns in general.
     pub fn metadata_schema(&self) -> &Option<QualifiedSchema> {
         match self {
             LogicalPlan::TableScan(TableScan {
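To make the new trait hook concrete, here is a rough sketch of a `TableProvider` that declares a metadata column. The struct, table and column names, and the `scan` body are illustrative only; the remaining signatures and import paths are assumed to match current DataFusion (an `async_trait`-based trait with a `Session`-based `scan`).

```rust
use std::any::Any;
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use async_trait::async_trait;
use datafusion::catalog::{Session, TableProvider};
use datafusion::error::Result;
use datafusion::logical_expr::{Expr, TableType};
use datafusion::physical_plan::ExecutionPlan;

/// A toy table with one user-visible column (`x`) and one metadata column (`row_id`).
#[derive(Debug)]
struct ExampleTable {
    schema: SchemaRef,
    metadata_schema: SchemaRef,
}

impl ExampleTable {
    fn new() -> Self {
        Self {
            // the "public" columns returned by `SELECT *`
            schema: Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)])),
            // the metadata-only columns, similar to Postgres's `ctid`
            metadata_schema: Arc::new(Schema::new(vec![Field::new(
                "row_id",
                DataType::UInt64,
                false,
            )])),
        }
    }
}

#[async_trait]
impl TableProvider for ExampleTable {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    /// Declare the metadata columns separately from `schema()`.
    fn metadata_columns(&self) -> Option<SchemaRef> {
        Some(Arc::clone(&self.metadata_schema))
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    async fn scan(
        &self,
        _state: &dyn Session,
        _projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // A real implementation would also produce values for `row_id`
        // when the planner asks for the metadata column.
        unimplemented!("left out of this sketch")
    }
}
```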