
Commit

add document, refine test
chenkovsky committed Jan 12, 2025
1 parent a4dee3e commit 1ab8c7d
Showing 3 changed files with 120 additions and 55 deletions.
5 changes: 5 additions & 0 deletions datafusion/catalog/src/table.rs
@@ -56,6 +56,11 @@ pub trait TableProvider: Debug + Sync + Send {
fn schema(&self) -> SchemaRef;

/// Get the metadata columns of this table.
/// See also: [`datafusion_common::DFSchema::metadata`]
///
/// Returns:
/// - `None` for tables that do not have metadata columns.
/// - `Some(SchemaRef)` for tables that have metadata columns.
fn metadata_columns(&self) -> Option<SchemaRef> {
None
}
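// Illustrative sketch (not part of this commit): a provider that exposes a
// hidden "_rowid" metadata column might override this method roughly as
// follows, assuming it keeps a prebuilt `SchemaRef` for its metadata fields
// (the `metadata_schema` field name here is hypothetical):
//
// fn metadata_columns(&self) -> Option<SchemaRef> {
//     Some(Arc::clone(&self.metadata_schema))
// }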
16 changes: 15 additions & 1 deletion datafusion/common/src/dfschema.rs
@@ -109,12 +109,20 @@ pub struct DFSchema {
inner: QualifiedSchema,
/// Stores functional dependencies in the schema.
functional_dependencies: FunctionalDependencies,
/// metadata columns
/// Metadata columns are data columns for a table that are not in the table's schema.
/// For example, a file source could expose a "file" column that contains the path of the file from which each row was read.
/// See also: [Spark SupportsMetadataColumns](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsMetadataColumns.java)
metadata: Option<QualifiedSchema>,
}

/// The starting point of the metadata column index.
/// An index less than this value refers to an ordinary column;
/// an index greater than or equal to this value refers to a metadata column.
pub const METADATA_OFFSET: usize = usize::MAX >> 1;
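// Illustrative sketch (not part of this commit): one way the offset could be
// interpreted, assuming the i-th metadata column is addressed as
// `METADATA_OFFSET + i`. The helper below is hypothetical and exists only to
// explain the encoding.
//
// fn metadata_field_position(idx: usize) -> Option<usize> {
//     // `Some(i)` = the i-th metadata column; `None` = an ordinary column index.
//     idx.checked_sub(METADATA_OFFSET)
// }
//
// metadata_field_position(3)                   // -> None (ordinary column)
// metadata_field_position(METADATA_OFFSET)     // -> Some(0) (first metadata column)
// metadata_field_position(METADATA_OFFSET + 1) // -> Some(1)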

/// QualifiedSchema wraps an Arrow schema and field qualifiers.
/// Some fields may be qualified and some unqualified. A qualified field is a field that has a
/// relation name associated with it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QualifiedSchema {
/// Inner Arrow schema reference.
@@ -251,6 +259,11 @@ impl DFSchema {
}
}

/// Return a reference to the qualified metadata schema.
///
/// Returns:
/// - `&None` for tables that do not have metadata columns.
/// - `&Some(QualifiedSchema)` for tables that have metadata columns.
pub fn metadata_schema(&self) -> &Option<QualifiedSchema> {
&self.metadata
}
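// Illustrative usage (not part of this commit), sketching how a caller might
// branch on the accessor; `df_schema` is a hypothetical `DFSchema` value:
//
// if let Some(meta) = df_schema.metadata_schema() {
//     // the table exposes metadata columns such as "_rowid" or "_file"
//     println!("metadata columns: {:?}", meta);
// } else {
//     // ordinary table with no metadata columns
// }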
@@ -269,6 +282,7 @@ impl DFSchema {
&self.inner.schema
}

/// Set the metadata schema to the provided value.
pub fn with_metadata_schema(
mut self,
metadata_schema: Option<QualifiedSchema>,
154 changes: 100 additions & 54 deletions datafusion/core/tests/sql/metadata_columns.rs
@@ -19,15 +19,12 @@ use std::any::Any;
use std::fmt::{self, Debug, Formatter};
use std::sync::{Arc, Mutex};

use arrow::compute::concat_batches;
use arrow_array::{ArrayRef, UInt64Array};
use arrow_array::{ArrayRef, StringArray, UInt64Array};
use arrow_schema::SchemaBuilder;
use async_trait::async_trait;
use datafusion::arrow::array::{UInt64Builder, UInt8Builder};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::file_format::csv::CsvSerializer;
use datafusion::datasource::file_format::write::BatchSerializer;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::Result;
use datafusion::execution::context::TaskContext;
@@ -38,7 +35,7 @@ use datafusion::physical_plan::{
project_schema, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
PlanProperties, SendableRecordBatchStream,
};
use datafusion::prelude::*;
use datafusion::{assert_batches_sorted_eq, prelude::*};

use datafusion::catalog::Session;
use datafusion_common::METADATA_OFFSET;
@@ -108,6 +105,10 @@ impl Default for CustomDataSource {
"_rowid",
DataType::UInt64,
false,
),
Field::new(
"_file",
DataType::Utf8,
false,
)])),
}
}
@@ -258,6 +259,7 @@ impl ExecutionPlan for CustomExec {
let id_array = id_array.finish();
let account_array = account_array.finish();
let rowid_array = UInt64Array::from_iter_values(0_u64..len);
let file_array = StringArray::from_iter_values((0_u64..len).map(|i| format!("file-{}", i)));

let arrays = self
.projected_schema
@@ -267,6 +269,7 @@
"_rowid" => Arc::new(rowid_array.clone()) as ArrayRef,
"id" => Arc::new(id_array.clone()) as ArrayRef,
"bank_account" => Arc::new(account_array.clone()) as ArrayRef,
"_file" => Arc::new(file_array.clone()) as ArrayRef,
_ => panic!("cannot reach here"),
})
.collect();
@@ -287,75 +290,118 @@ async fn select_metadata_column() {
);
let db = CustomDataSource::default();
db.populate_users();
// ctx.sql("CREATE TABLE test (x int)").await.unwrap();
ctx.register_table("test", Arc::new(db)).unwrap();
// disallow ddl
let options = SQLOptions::new().with_allow_ddl(false);

let show_columns = "show columns from test;";
let df_columns = ctx.sql_with_options(show_columns, options).await.unwrap();
let all_batchs = df_columns
let batchs = df_columns
.select(vec![col("column_name"), col("data_type")])
.unwrap()
.collect()
.await
.unwrap();
let batch = concat_batches(&all_batchs[0].schema(), &all_batchs).unwrap();
assert_eq!(batch.num_rows(), 2);
let serializer = CsvSerializer::new().with_header(false);
let bytes = serializer.serialize(batch, true).unwrap();
assert_eq!(bytes, "id,UInt8\nbank_account,UInt64\n");
let select0 = "SELECT * FROM test order by id";
let df0 = ctx.sql_with_options(select0, options).await.unwrap();
assert!(!df0.schema().has_column_with_unqualified_name("_rowid"));
let expected = [
"+--------------+-----------+",
"| column_name | data_type |",
"+--------------+-----------+",
"| id | UInt8 |",
"| bank_account | UInt64 |",
"+--------------+-----------+",
];
assert_batches_sorted_eq!(expected, &batchs);

let all_batchs = df0.collect().await.unwrap();
let batch = concat_batches(&all_batchs[0].schema(), &all_batchs).unwrap();
let bytes = serializer.serialize(batch, true).unwrap();
assert_eq!(bytes, "1,9000\n2,100\n3,1000\n");
let select0 = "SELECT * FROM test order by id";
let df = ctx.sql_with_options(select0, options).await.unwrap();
let batchs = df.collect().await.unwrap();
let expected = [
"+----+--------------+",
"| id | bank_account |",
"+----+--------------+",
"| 1 | 9000 |",
"| 2 | 100 |",
"| 3 | 1000 |",
"+----+--------------+",
];
assert_batches_sorted_eq!(expected, &batchs);

let select1 = "SELECT _rowid FROM test order by _rowid";
let df1 = ctx.sql_with_options(select1, options).await.unwrap();
assert_eq!(df1.schema().field_names(), vec!["test._rowid"]);

let all_batchs = df1.collect().await.unwrap();
let batch = concat_batches(&all_batchs[0].schema(), &all_batchs).unwrap();
let bytes = serializer.serialize(batch, true).unwrap();
assert_eq!(bytes, "0\n1\n2\n");
let df = ctx.sql_with_options(select1, options).await.unwrap();
let batchs = df.collect().await.unwrap();
let expected = [
"+--------+",
"| _rowid |",
"+--------+",
"| 0 |",
"| 1 |",
"| 2 |",
"+--------+",
];
assert_batches_sorted_eq!(expected, &batchs);

let select2 = "SELECT _rowid, id FROM test order by _rowid";
let df2 = ctx.sql_with_options(select2, options).await.unwrap();
assert_eq!(df2.schema().field_names(), vec!["test._rowid", "test.id"]);

let all_batchs = df2.collect().await.unwrap();
let batch = concat_batches(&all_batchs[0].schema(), &all_batchs).unwrap();
let bytes = serializer.serialize(batch, true).unwrap();
assert_eq!(bytes, "0,1\n1,2\n2,3\n");
let df = ctx.sql_with_options(select2, options).await.unwrap();
let batchs = df.collect().await.unwrap();
let expected = [
"+--------+----+",
"| _rowid | id |",
"+--------+----+",
"| 0 | 1 |",
"| 1 | 2 |",
"| 2 | 3 |",
"+--------+----+",
];
assert_batches_sorted_eq!(expected, &batchs);

let select3 = "SELECT _rowid, id FROM test WHERE _rowid = 0";
let df3 = ctx.sql_with_options(select3, options).await.unwrap();
assert_eq!(df3.schema().field_names(), vec!["test._rowid", "test.id"]);

let all_batchs = df3.collect().await.unwrap();
let batch = concat_batches(&all_batchs[0].schema(), &all_batchs).unwrap();
let bytes = serializer.serialize(batch, true).unwrap();
assert_eq!(bytes, "0,1\n");
let df = ctx.sql_with_options(select3, options).await.unwrap();
let batchs = df.collect().await.unwrap();
let expected = [
"+--------+----+",
"| _rowid | id |",
"+--------+----+",
"| 0 | 1 |",
"+--------+----+",
];
assert_batches_sorted_eq!(expected, &batchs);

let select4 = "SELECT _rowid FROM test LIMIT 1";
let df4 = ctx.sql_with_options(select4, options).await.unwrap();
assert_eq!(df4.schema().field_names(), vec!["test._rowid"]);

let all_batchs = df4.collect().await.unwrap();
let batch = concat_batches(&all_batchs[0].schema(), &all_batchs).unwrap();
let bytes = serializer.serialize(batch, true).unwrap();
assert_eq!(bytes, "0\n");
let df = ctx.sql_with_options(select4, options).await.unwrap();
let batchs = df.collect().await.unwrap();
let expected = [
"+--------+",
"| _rowid |",
"+--------+",
"| 0 |",
"+--------+",
];
assert_batches_sorted_eq!(expected, &batchs);

let select5 = "SELECT _rowid, id FROM test WHERE _rowid % 2 = 1";
let df5 = ctx.sql_with_options(select5, options).await.unwrap();
assert_eq!(df5.schema().field_names(), vec!["test._rowid", "test.id"]);

let all_batchs = df5.collect().await.unwrap();
let batch = concat_batches(&all_batchs[0].schema(), &all_batchs).unwrap();
let bytes = serializer.serialize(batch, true).unwrap();
assert_eq!(bytes, "1,2\n");
let df = ctx.sql_with_options(select5, options).await.unwrap();
let batchs = df.collect().await.unwrap();
let expected = [
"+--------+----+",
"| _rowid | id |",
"+--------+----+",
"| 1 | 2 |",
"+--------+----+",
];
assert_batches_sorted_eq!(expected, &batchs);


let select6 = "SELECT _rowid, _file FROM test order by _rowid";
let df = ctx.sql_with_options(select6, options).await.unwrap();
let batchs = df.collect().await.unwrap();
let expected = [
"+--------+--------+",
"| _rowid | _file |",
"+--------+--------+",
"| 0 | file-0 |",
"| 1 | file-1 |",
"| 2 | file-2 |",
"+--------+--------+",
];
assert_batches_sorted_eq!(expected, &batchs);
}
