Skip to content

Commit

Permalink
Support marking columns as system columns via metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
adriangb committed Jan 29, 2025
1 parent d051731 commit 5359935
Show file tree
Hide file tree
Showing 5 changed files with 411 additions and 2 deletions.
1 change: 1 addition & 0 deletions datafusion-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }
tonic = "0.12.1"
url = { workspace = true }
uuid = "1.7"
itertools = { workspace = true }

[target.'cfg(not(target_os = "windows"))'.dev-dependencies]
nix = { version = "0.28.0", features = ["fs"] }
177 changes: 177 additions & 0 deletions datafusion-examples/examples/system_columns.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::sync::Arc;

use arrow::array::record_batch;
use datafusion::arrow::datatypes::{DataType, Field, Schema};

use datafusion::{assert_batches_eq, prelude::*};

/// This example shows how to mark fields as system columns.
/// System columns are columns which are meant to be semi-public stores of the internal details of the table.
/// For example, `ctid` in Postgres would be considered a system column
/// (Postgres calls these "system columns", see [the Postgres docs](https://www.postgresql.org/docs/current/ddl-system-columns.html) for more information and examples).
/// Spark has a `_metadata` column that it uses to include details about each file read in a query (see [Spark's docs](https://docs.databricks.com/en/ingestion/file-metadata-column.html)).
///
/// DataFusion allows fields to be declared as system columns by setting the `datafusion.system_column` key in the field's metadata
/// to `true`.
///
/// As an example of how this works in practice, if you have the following Postgres table:
///
/// ```sql
/// CREATE TABLE t (x int);
/// INSERT INTO t VALUES (1);
/// ```
///
/// And you do a `SELECT * FROM t`, you would get the following schema:
///
/// ```text
/// +---+
/// | x |
/// +---+
/// | 1 |
/// +---+
/// ```
///
/// But if you do `SELECT ctid, * FROM t`, you would get the following schema (ignore the meaning of the value of `ctid`, this is just an example):
///
/// ```text
/// +-----+---+
/// | ctid| x |
/// +-----+---+
/// | 0 | 1 |
/// +-----+---+
/// ```
#[tokio::main]
async fn main() {
    // Build a batch with two ordinary columns and a `_row_num` column that
    // will be marked as a system column below.
    let batch = record_batch!(
        ("a", Int32, [1, 2, 3]),
        ("b", Utf8, ["foo", "bar", "baz"]),
        ("_row_num", UInt32, [1, 2, 3])
    )
    .unwrap();
    // Re-attach a schema whose `_row_num` field carries the
    // `datafusion.system_column = "true"` metadata marker; this is what tells
    // DataFusion to hide the column from `SELECT *` expansion.
    let batch = batch
        .with_schema(Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("_row_num", DataType::UInt32, true).with_metadata(
                HashMap::from_iter([(
                    "datafusion.system_column".to_string(),
                    "true".to_string(),
                )]),
            ),
        ])))
        .unwrap();

    let ctx = SessionContext::new();
    // Fail loudly if registration fails rather than silently discarding the error.
    ctx.register_batch("t", batch).unwrap();

    // Explicitly selected regular columns behave as usual.
    let res = ctx
        .sql("SELECT a, b FROM t")
        .await
        .unwrap()
        .collect()
        .await
        .unwrap();
    #[rustfmt::skip]
    let expected: Vec<&str> = vec![
        "+---+-----+",
        "| a | b   |",
        "+---+-----+",
        "| 1 | foo |",
        "| 2 | bar |",
        "| 3 | baz |",
        "+---+-----+",
    ];
    assert_batches_eq!(expected, &res);

    // A system column can be selected when named explicitly.
    let res = ctx
        .sql("SELECT _row_num FROM t")
        .await
        .unwrap()
        .collect()
        .await
        .unwrap();
    #[rustfmt::skip]
    let expected: Vec<&str> = vec![
        "+----------+",
        "| _row_num |",
        "+----------+",
        "| 1        |",
        "| 2        |",
        "| 3        |",
        "+----------+",
    ];
    assert_batches_eq!(expected, &res);

    // `SELECT *` does NOT include system columns.
    let res = ctx
        .sql("SELECT * FROM t")
        .await
        .unwrap()
        .collect()
        .await
        .unwrap();
    #[rustfmt::skip]
    let expected: Vec<&str> = vec![
        "+---+-----+",
        "| a | b   |",
        "+---+-----+",
        "| 1 | foo |",
        "| 2 | bar |",
        "| 3 | baz |",
        "+---+-----+",
    ];
    assert_batches_eq!(expected, &res);

    // But `*` can be combined with an explicit reference to a system column.
    let res = ctx
        .sql("SELECT *, _row_num FROM t")
        .await
        .unwrap()
        .collect()
        .await
        .unwrap();
    #[rustfmt::skip]
    let expected: Vec<&str> = vec![
        "+---+-----+----------+",
        "| a | b   | _row_num |",
        "+---+-----+----------+",
        "| 1 | foo | 1        |",
        "| 2 | bar | 2        |",
        "| 3 | baz | 3        |",
        "+---+-----+----------+",
    ];
    assert_batches_eq!(expected, &res);

    // Qualified references (`t._row_num`) work the same way.
    let res = ctx
        .sql("SELECT t._row_num FROM t")
        .await
        .unwrap()
        .collect()
        .await
        .unwrap();
    #[rustfmt::skip]
    let expected: Vec<&str> = vec![
        "+----------+",
        "| _row_num |",
        "+----------+",
        "| 1        |",
        "| 2        |",
        "| 3        |",
        "+----------+",
    ];
    assert_batches_eq!(expected, &res);

    // Qualified wildcard expansion (`t.*`) also excludes system columns.
    let res = ctx
        .sql("SELECT t.* FROM t")
        .await
        .unwrap()
        .collect()
        .await
        .unwrap();
    #[rustfmt::skip]
    let expected: Vec<&str> = vec![
        "+---+-----+",
        "| a | b   |",
        "+---+-----+",
        "| 1 | foo |",
        "| 2 | bar |",
        "| 3 | baz |",
        "+---+-----+",
    ];
    assert_batches_eq!(expected, &res);

    // And `t.*` can likewise be combined with an explicit system column.
    let res = ctx
        .sql("SELECT t.*, _row_num FROM t")
        .await
        .unwrap()
        .collect()
        .await
        .unwrap();
    #[rustfmt::skip]
    let expected: Vec<&str> = vec![
        "+---+-----+----------+",
        "| a | b   | _row_num |",
        "+---+-----+----------+",
        "| 1 | foo | 1        |",
        "| 2 | bar | 2        |",
        "| 3 | baz | 3        |",
        "+---+-----+----------+",
    ];
    assert_batches_eq!(expected, &res);
}
5 changes: 5 additions & 0 deletions datafusion/core/src/catalog_common/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ impl InformationSchemaConfig {
for (field_position, field) in
table.schema().fields().iter().enumerate()
{
if let Some(v) = field.metadata().get("datafusion.system_column") {
if v.to_lowercase().starts_with("t") {
continue;
}
}
builder.add_column(
&catalog_name,
&schema_name,
Expand Down
155 changes: 155 additions & 0 deletions datafusion/core/tests/sql/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;

use super::*;
use datafusion_common::ScalarValue;

Expand Down Expand Up @@ -350,3 +352,156 @@ async fn test_version_function() {

assert_eq!(version.value(0), expected_version);
}

#[tokio::test]
async fn test_select_system_column() {
let batch = record_batch!(
("id", UInt8, [1, 2, 3]),
("bank_account", UInt64, [9000, 100, 1000]),
("_rowid", UInt32, [0, 1, 2]),
("_file", Utf8, ["file-0", "file-1", "file-2"])
).unwrap();
let batch = batch.with_schema(
Arc::new(
Schema::new(vec![
Field::new("id", DataType::UInt8, true),
Field::new("bank_account", DataType::UInt64, true),
Field::new("_rowid", DataType::UInt32, true).with_metadata(HashMap::from_iter([
("datafusion.system_column".to_string(), "true".to_string()),
])),
Field::new("_file", DataType::Utf8, true).with_metadata(HashMap::from_iter([
("datafusion.system_column".to_string(), "true".to_string()),
])),
])
)
).unwrap();

let ctx = SessionContext::new_with_config(
SessionConfig::new().with_information_schema(true),
);
let _ = ctx.register_batch("test", batch);

let select0 = "SELECT * FROM test order by id";
let df = ctx.sql(select0).await.unwrap();
let batchs = df.collect().await.unwrap();
let expected = [
"+----+--------------+",
"| id | bank_account |",
"+----+--------------+",
"| 1 | 9000 |",
"| 2 | 100 |",
"| 3 | 1000 |",
"+----+--------------+",
];
assert_batches_sorted_eq!(expected, &batchs);

let select1 = "SELECT _rowid FROM test order by _rowid";
let df = ctx.sql(select1).await.unwrap();
let batchs = df.collect().await.unwrap();
let expected = [
"+--------+",
"| _rowid |",
"+--------+",
"| 0 |",
"| 1 |",
"| 2 |",
"+--------+",
];
assert_batches_sorted_eq!(expected, &batchs);

let select2 = "SELECT _rowid, id FROM test order by _rowid";
let df = ctx.sql(select2).await.unwrap();
let batchs = df.collect().await.unwrap();
let expected = [
"+--------+----+",
"| _rowid | id |",
"+--------+----+",
"| 0 | 1 |",
"| 1 | 2 |",
"| 2 | 3 |",
"+--------+----+",
];
assert_batches_sorted_eq!(expected, &batchs);

let select3 = "SELECT _rowid, id FROM test WHERE _rowid = 0";
let df = ctx.sql(select3).await.unwrap();
let batchs = df.collect().await.unwrap();
let expected = [
"+--------+----+",
"| _rowid | id |",
"+--------+----+",
"| 0 | 1 |",
"+--------+----+",
];
assert_batches_sorted_eq!(expected, &batchs);

let select4 = "SELECT _rowid FROM test LIMIT 1";
let df = ctx.sql(select4).await.unwrap();
let batchs = df.collect().await.unwrap();
let expected = [
"+--------+",
"| _rowid |",
"+--------+",
"| 0 |",
"+--------+",
];
assert_batches_sorted_eq!(expected, &batchs);

let select5 = "SELECT _rowid, id FROM test WHERE _rowid % 2 = 1";
let df = ctx.sql(select5).await.unwrap();
let batchs = df.collect().await.unwrap();
let expected = [
"+--------+----+",
"| _rowid | id |",
"+--------+----+",
"| 1 | 2 |",
"+--------+----+",
];
assert_batches_sorted_eq!(expected, &batchs);

let select6 = "SELECT _rowid, _file FROM test order by _rowid";
let df = ctx.sql(select6).await.unwrap();
let batchs = df.collect().await.unwrap();
let expected = [
"+--------+--------+",
"| _rowid | _file |",
"+--------+--------+",
"| 0 | file-0 |",
"| 1 | file-1 |",
"| 2 | file-2 |",
"+--------+--------+",
];
assert_batches_sorted_eq!(expected, &batchs);

let select6 = "SELECT id FROM test order by _rowid desc";
let df = ctx.sql(select6).await.unwrap();
let batchs = df.collect().await.unwrap();
let expected = [
"+----+",
"| id |",
"+----+",
"| 3 |",
"| 2 |",
"| 1 |",
"+----+",
];
assert_batches_sorted_eq!(expected, &batchs);

let show_columns = "show columns from test;";
let df_columns = ctx.sql(show_columns).await.unwrap();
let batchs = df_columns
.select(vec![col("column_name"), col("data_type")])
.unwrap()
.collect()
.await
.unwrap();
let expected = [
"+--------------+-----------+",
"| column_name | data_type |",
"+--------------+-----------+",
"| id | UInt8 |",
"| bank_account | UInt64 |",
"+--------------+-----------+",
];
assert_batches_sorted_eq!(expected, &batchs);
}
Loading

0 comments on commit 5359935

Please sign in to comment.