name conflict test & refine metadata index
chenkovsky committed Jan 30, 2025
1 parent 190db1f commit 912a35a
Showing 7 changed files with 229 additions and 155 deletions.
62 changes: 23 additions & 39 deletions datafusion-examples/examples/metadata_columns.rs
@@ -21,7 +21,6 @@ use std::sync::{Arc, Mutex};
 use std::time::Duration;
 
 use arrow::array::{ArrayRef, StringArray, UInt64Array};
-use arrow_schema::SchemaBuilder;
 use async_trait::async_trait;
 use datafusion::arrow::array::{UInt64Builder, UInt8Builder};
 use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
@@ -33,14 +32,14 @@ use datafusion::physical_expr::EquivalenceProperties;
 use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion::physical_plan::memory::MemoryStream;
 use datafusion::physical_plan::{
-    project_schema, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
-    PlanProperties, SendableRecordBatchStream,
+    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
+    SendableRecordBatchStream,
 };
 
 use datafusion::prelude::*;
 
 use datafusion::catalog::Session;
-use datafusion_common::METADATA_OFFSET;
+use datafusion_common::{extract_field_index, FieldIndex};
 use itertools::Itertools;
 use tokio::time::timeout;

@@ -120,9 +119,8 @@ impl CustomDataSource {
     pub(crate) async fn create_physical_plan(
         &self,
         projections: Option<&Vec<usize>>,
-        schema: SchemaRef,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        Ok(Arc::new(CustomExec::new(projections, schema, self.clone())))
+        Ok(Arc::new(CustomExec::new(projections, self.clone())))
     }
 
     pub(crate) fn populate_users(&self) {
@@ -189,33 +187,7 @@ impl TableProvider for CustomDataSource {
         _filters: &[Expr],
         _limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let mut schema = self.schema();
-        let size = schema.fields.len();
-        if let Some(metadata) = self.metadata_columns() {
-            let mut builder = SchemaBuilder::from(schema.as_ref());
-            for f in metadata.fields.iter() {
-                builder.try_merge(f)?;
-            }
-            schema = Arc::new(builder.finish());
-        }
-
-        let projection = match projection {
-            Some(projection) => {
-                let projection = projection
-                    .iter()
-                    .map(|idx| {
-                        if *idx >= METADATA_OFFSET {
-                            *idx - METADATA_OFFSET + size
-                        } else {
-                            *idx
-                        }
-                    })
-                    .collect_vec();
-                Some(projection)
-            }
-            None => None,
-        };
-        return self.create_physical_plan(projection.as_ref(), schema).await;
+        return self.create_physical_plan(projection).await;
     }
 }

@@ -227,12 +199,24 @@ struct CustomExec {
 }
 
 impl CustomExec {
-    fn new(
-        projections: Option<&Vec<usize>>,
-        schema: SchemaRef,
-        db: CustomDataSource,
-    ) -> Self {
-        let projected_schema = project_schema(&schema, projections).unwrap();
+    fn new(projections: Option<&Vec<usize>>, db: CustomDataSource) -> Self {
+        let schema = db.schema();
+        let metadata_schema = db.metadata_columns();
+        let projected_schema = match projections {
+            Some(projection) => {
+                let projection = projection
+                    .iter()
+                    .map(|idx| match extract_field_index(*idx) {
+                        FieldIndex::NormalIndex(i) => Arc::new(schema.field(i).clone()),
+                        FieldIndex::MetadataIndex(i) => {
+                            Arc::new(metadata_schema.as_ref().unwrap().field(i).clone())
+                        }
+                    })
+                    .collect_vec();
+                Arc::new(Schema::new(projection))
+            }
+            None => schema,
+        };
         let cache = Self::compute_properties(projected_schema.clone());
         Self {
             db,
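The new `CustomExec::new` above resolves each projected index directly against either the normal schema or the metadata schema, replacing the old approach of merging the two schemas in `scan` and offsetting indices by hand (`*idx - METADATA_OFFSET + size`). For readers following the arithmetic, below is a minimal sketch of the index encoding this implies. `METADATA_OFFSET`, `extract_field_index`, and `FieldIndex` are real `datafusion_common` items, but the constant's value and the function body here are assumptions reconstructed from the removed `*idx >= METADATA_OFFSET` check, not the actual source.

```rust
// Sketch only: these names exist in datafusion_common, but the bodies below
// are assumptions inferred from the removed offset arithmetic.

/// Projection indices at or above this offset address metadata columns, so
/// they can never collide with indices into the normal schema.
/// (Illustrative value; the real constant is defined in datafusion_common.)
const METADATA_OFFSET: usize = usize::MAX >> 1;

/// A raw projection index decoded into the schema it points at.
#[derive(Debug, PartialEq)]
enum FieldIndex {
    /// Index into the table's normal schema.
    NormalIndex(usize),
    /// Index into the table's metadata-column schema.
    MetadataIndex(usize),
}

/// Split a raw projection index into its normal or metadata half.
fn extract_field_index(idx: usize) -> FieldIndex {
    if idx >= METADATA_OFFSET {
        FieldIndex::MetadataIndex(idx - METADATA_OFFSET)
    } else {
        FieldIndex::NormalIndex(idx)
    }
}

fn main() {
    // An ordinary column index passes through unchanged...
    assert_eq!(extract_field_index(1), FieldIndex::NormalIndex(1));
    // ...while an offset index decodes to a position in the metadata schema.
    assert_eq!(
        extract_field_index(METADATA_OFFSET + 1),
        FieldIndex::MetadataIndex(1)
    );
}
```

Reserving the upper index range means a single projection `Vec<usize>` can mix normal and metadata columns without collisions, which is why `scan` can now pass the projection straight through to `create_physical_plan`.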
14 changes: 7 additions & 7 deletions datafusion/catalog/src/table.rs
@@ -56,34 +56,34 @@ pub trait TableProvider: Debug + Sync + Send {
     fn schema(&self) -> SchemaRef;
 
     /// Return a reference to the schema for metadata columns.
-    /// 
+    ///
     /// Metadata columns are columns which are meant to be semi-public stores of the internal details of the table.
     /// For example, `ctid` in Postgres would be considered a metadata column
     /// (Postgres calls these "system columns"; see [the Postgres docs](https://www.postgresql.org/docs/current/ddl-system-columns.html) for more information and examples).
     /// Spark has a `_metadata` column that it uses to include details about each file read in a query (see [Spark's docs](https://docs.databricks.com/en/ingestion/file-metadata-column.html)).
     ///
     /// You can use this method to declare which columns in the table are "metadata" columns.
     /// See `datafusion/core/tests/sql/metadata_columns.rs` for an example of this in action.
-    /// 
+    ///
     /// As an example of how this works in practice, if you have the following Postgres table:
-    /// 
+    ///
    /// ```sql
     /// CREATE TABLE t (x int);
     /// INSERT INTO t VALUES (1);
     /// ```
-    /// 
+    ///
     /// And you do a `SELECT * FROM t`, you would get the following schema:
-    /// 
+    ///
     /// ```text
     /// +---+
     /// | x |
     /// +---+
     /// | 1 |
     /// +---+
     /// ```
-    /// 
+    ///
     /// But if you do `SELECT ctid, * FROM t`, you would get the following schema (ignore the meaning of the value of `ctid`, this is just an example):
-    /// 
+    ///
     /// ```text
     /// +-----+---+
     /// | ctid| x |
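To make the doc comment concrete, here is a minimal sketch of a provider declaring a `ctid`-like metadata column. The `_rowid` name and the standalone method are illustrative assumptions; the real hook is `TableProvider::metadata_columns` (returning `Option<SchemaRef>`, as the `metadata_schema.as_ref().unwrap()` call in `CustomExec::new` above suggests), and the full wiring lives in `datafusion/core/tests/sql/metadata_columns.rs`.

```rust
// Sketch only: a table with one normal column (`x`) and one metadata column
// (`_rowid`, analogous to Postgres' `ctid`). The method is shown standalone
// rather than inside a TableProvider impl to keep the example self-contained.
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema, SchemaRef};

#[derive(Debug)]
struct RowIdTable {
    schema: SchemaRef,
}

impl RowIdTable {
    fn new() -> Self {
        Self {
            schema: Arc::new(Schema::new(vec![Field::new(
                "x",
                DataType::Int32,
                false,
            )])),
        }
    }

    /// Declare `_rowid` as a metadata column: it is kept out of the normal
    /// schema, so `SELECT *` does not return it, but an explicit
    /// `SELECT _rowid, *` can.
    fn metadata_columns(&self) -> Option<SchemaRef> {
        Some(Arc::new(Schema::new(vec![Field::new(
            "_rowid",
            DataType::UInt64,
            false,
        )])))
    }
}

fn main() {
    let table = RowIdTable::new();
    // The metadata schema is kept separate from the normal schema.
    assert_eq!(table.schema.fields().len(), 1);
    assert_eq!(table.metadata_columns().unwrap().fields().len(), 1);
}
```

Keeping the two schemas separate is what makes the refined index encoding above necessary: a projection needs a way to say which of the two schemas each index refers to.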
