diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 8105ada59f5c..e938b38978de 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -586,6 +586,12 @@ pub trait ExprSchema: std::fmt::Debug { /// What is the datatype of this column? fn data_type(&self, col: &Column) -> Result<&DataType>; + + /// Is this column reference dict_is_ordered? + fn dict_is_ordered(&self, col: &Column) -> Result; + + /// What is the dict_id of this column? + fn dict_id(&self, col: &Column) -> Result; } // Implement `ExprSchema` for `Arc` @@ -597,6 +603,14 @@ impl + std::fmt::Debug> ExprSchema for P { fn data_type(&self, col: &Column) -> Result<&DataType> { self.as_ref().data_type(col) } + + fn dict_is_ordered(&self, col: &Column) -> Result { + self.as_ref().dict_is_ordered(col) + } + + fn dict_id(&self, col: &Column) -> Result { + self.as_ref().dict_id(col) + } } impl ExprSchema for DFSchema { @@ -607,6 +621,20 @@ impl ExprSchema for DFSchema { fn data_type(&self, col: &Column) -> Result<&DataType> { Ok(self.field_from_column(col)?.data_type()) } + + fn dict_is_ordered(&self, col: &Column) -> Result { + match self.field_from_column(col)?.field().dict_is_ordered() { + Some(dict_id_ordered) => Ok(dict_id_ordered), + _ => Ok(false), + } + } + + fn dict_id(&self, col: &Column) -> Result { + match self.field_from_column(col)?.field().dict_id() { + Some(dict_id_ordered) => Ok(dict_id_ordered), + _ => Ok(0), + } + } } /// DFField wraps an Arrow field and adds an optional qualifier @@ -639,6 +667,46 @@ impl DFField { field: Arc::new(Field::new(name, data_type, nullable)), } } + /// Creates a new `DFField` with dict + pub fn new_dict>( + qualifier: Option, + name: &str, + data_type: DataType, + nullable: bool, + dict_id: i64, + dict_is_ordered: bool, + ) -> Self { + DFField { + qualifier: qualifier.map(|s| s.into()), + field: Arc::new(Field::new_dict( + name, + data_type, + nullable, + dict_id, + dict_is_ordered, + )), + } + } + + /// Convenience method for creating new `DFField` without a qualifier with dict type + pub fn new_unqualified_dict( + name: &str, + data_type: DataType, + nullable: bool, + dict_id: i64, + dict_is_ordered: bool, + ) -> Self { + DFField { + qualifier: None, + field: Arc::new(Field::new_dict( + name, + data_type, + nullable, + dict_id, + dict_is_ordered, + )), + } + } /// Create a qualified field from an existing Arrow field pub fn from_qualified<'a>( diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs index ba37cf6d45b8..69db90b81d11 100644 --- a/datafusion/expr/src/expr_schema.rs +++ b/datafusion/expr/src/expr_schema.rs @@ -43,6 +43,12 @@ pub trait ExprSchemable { /// cast to a type with respect to a schema fn cast_to(self, cast_to_type: &DataType, schema: &S) -> Result; + + /// given a schema, return the dict id of the expr + fn get_dict_id(&self, schema: &S) -> Result; + + /// given a schema, return the dict_is_ordered of the expr + fn dict_is_ordered(&self, input_schema: &S) -> Result; } impl ExprSchemable for Expr { @@ -262,23 +268,58 @@ impl ExprSchemable for Expr { } } + fn dict_is_ordered(&self, input_schema: &S) -> Result { + match self { + // TODO Handle more types + Expr::Column(c) => input_schema.dict_is_ordered(c), + _ => Ok(false), + } + } + + fn get_dict_id(&self, schema: &S) -> Result { + match self { + // TODO Handle more types + Expr::Column(c) => schema.dict_id(c), + _ => Ok(0), + } + } + /// Returns a [arrow::datatypes::Field] compatible with this expression. /// /// So for example, a projected expression `col(c1) + col(c2)` is /// placed in an output field **named** col("c1 + c2") fn to_field(&self, input_schema: &DFSchema) -> Result { match self { - Expr::Column(c) => Ok(DFField::new( - c.relation.clone(), - &c.name, - self.get_type(input_schema)?, - self.nullable(input_schema)?, - )), - _ => Ok(DFField::new_unqualified( - &self.display_name()?, - self.get_type(input_schema)?, - self.nullable(input_schema)?, - )), + Expr::Column(c) => Ok(match self.get_type(input_schema)? { + DataType::Dictionary(_, _) => DFField::new_dict( + c.relation.clone(), + &c.name, + self.get_type(input_schema)?, + self.nullable(input_schema)?, + self.get_dict_id(input_schema)?, + self.dict_is_ordered(input_schema)?, + ), + _ => DFField::new( + c.relation.clone(), + &c.name, + self.get_type(input_schema)?, + self.nullable(input_schema)?, + ), + }), + _ => Ok(match self.get_type(input_schema)? { + DataType::Dictionary(_, _) => DFField::new_unqualified_dict( + &self.display_name()?, + self.get_type(input_schema)?, + self.nullable(input_schema)?, + self.get_dict_id(input_schema)?, + self.dict_is_ordered(input_schema)?, + ), + _ => DFField::new_unqualified( + &self.display_name()?, + self.get_type(input_schema)?, + self.nullable(input_schema)?, + ), + }), } } @@ -347,11 +388,58 @@ pub fn cast_subquery(subquery: Subquery, cast_to_type: &DataType) -> Result::new(), + ) + .unwrap(); + let expr = vec![col("dictionary_column1"), col("dictionary_column2")]; + for i in 0..dffield.len() { + assert_eq!(expr[i].to_field(&dfschema).unwrap(), dffield[i]); + assert_eq!( + expr[i].to_field(&dfschema).unwrap().field().dict_id(), + dffield[i].field().dict_id() + ); + assert_eq!( + expr[i] + .to_field(&dfschema) + .unwrap() + .field() + .dict_is_ordered(), + dffield[i].field().dict_is_ordered() + ); + } + } + #[test] fn expr_schema_nullability() { let expr = col("foo").eq(lit(1)); @@ -404,5 +492,13 @@ mod tests { fn data_type(&self, _col: &Column) -> Result<&DataType> { Ok(&self.data_type) } + + fn dict_id(&self, _col: &Column) -> Result { + Ok(0) + } + + fn dict_is_ordered(&self, _col: &Column) -> Result { + Ok(false) + } } }