Support marking columns as system columns via Field's metadata #14362

Open
wants to merge 8 commits into
base: main

adriangb
Contributor

@adriangb adriangb commented Jan 29, 2025

@github-actions github-actions bot added logical-expr Logical plan and expressions core Core DataFusion crate labels Jan 29, 2025
@adriangb
Contributor Author

adriangb commented Jan 29, 2025

I don't know if there's any other "known" metadata, but I feel like it would be good to have an extension trait along the lines of:

/// Extension of [`Field`] to manage DataFusion specific field metadata.
pub trait DataFusionFieldInfo {
    /// A nice docstring!
    fn is_system_column(&self) -> bool;
    /// Another nice docstring :)
    fn as_system_column(self) -> Self;
}

impl DataFusionFieldInfo for Field { ... }

And then we can call field.is_system_column() from anywhere that needs to check.
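A minimal sketch of how such an extension trait could be backed by field metadata. It is only an illustration: `Field` here is a stand-in struct rather than arrow's type, and the `SYSTEM_COLUMN_KEY` name and its `"true"` value are assumptions, not necessarily what this PR uses.

```rust
use std::collections::HashMap;

/// Stand-in for arrow's `Field`, reduced to what this sketch needs.
#[derive(Debug, Clone, PartialEq)]
pub struct Field {
    pub name: String,
    pub metadata: HashMap<String, String>,
}

impl Field {
    pub fn new(name: &str) -> Self {
        Field {
            name: name.to_string(),
            metadata: HashMap::new(),
        }
    }
}

/// Hypothetical metadata key; the key used by the actual change may differ.
pub const SYSTEM_COLUMN_KEY: &str = "datafusion.system_column";

pub trait DataFusionFieldInfo {
    /// Returns true if the field is marked as a system column.
    fn is_system_column(&self) -> bool;
    /// Consumes the field and returns it marked as a system column.
    fn as_system_column(self) -> Self;
}

impl DataFusionFieldInfo for Field {
    fn is_system_column(&self) -> bool {
        // Only an explicit "true" counts; a missing key means a regular column.
        self.metadata.get(SYSTEM_COLUMN_KEY).map(String::as_str) == Some("true")
    }

    fn as_system_column(mut self) -> Self {
        // Existing metadata entries under other keys are left untouched.
        self.metadata
            .insert(SYSTEM_COLUMN_KEY.to_string(), "true".to_string());
        self
    }
}
```

With this shape, callers only need the trait in scope: `Field::new("_rowid").as_system_column().is_system_column()` evaluates to `true`, while a freshly built field reports `false`.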

Edit: done!

@github-actions github-actions bot added the common Related to common crate label Jan 29, 2025
@adriangb adriangb changed the title Support marking columns as system columns via metadata Support marking columns as system columns via Field's metadata Jan 29, 2025
Comment on lines -743 to +816
- let flat_name = format!("{}.{}", qualifier, field.name());
+ let flat_name =
+     Column::new(Some(qualifier.clone()), field.name())
+         .flat_name();
Contributor Author


Small nit of the existing code. I'd rather re-use the logic for creating flat names at a trivial cost just to avoid having arbitrary string formatting going on here.
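To illustrate the point, here is a rough sketch of keeping the naming rule in one place. `Column` below is a simplified stand-in (the qualifier is a plain string), not DataFusion's actual type, so treat the signatures as assumptions.

```rust
/// Stand-in for a qualified column reference.
pub struct Column {
    pub relation: Option<String>,
    pub name: String,
}

impl Column {
    pub fn new(relation: Option<&str>, name: &str) -> Self {
        Column {
            relation: relation.map(str::to_string),
            name: name.to_string(),
        }
    }

    /// The single place that decides how a qualified name is rendered.
    pub fn flat_name(&self) -> String {
        match &self.relation {
            Some(relation) => format!("{}.{}", relation, self.name),
            None => self.name.clone(),
        }
    }
}
```

Call sites then build a `Column` and ask it for the name, so a future change to the naming rule only has to happen in `flat_name`.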

@adriangb
Contributor Author

@chenkovsky @jayzhan211 could you please review?

@chenkovsky

As described in the Spark documentation, https://spark.apache.org/docs/3.5.1/api/java/org/apache/spark/sql/connector/catalog/SupportsMetadataColumns.html

If a table column and a metadata column have the same name, the conflict is resolved by either renaming or suppressing the metadata column. I think we should also guarantee that the metadata column is suppressed regardless of the order of the fields.

@adriangb
Contributor Author

@chenkovsky could you maybe translate that into a test that we can add? I'm having trouble imagining in what sorts of situations this would apply. Generally in SQL if you have two columns with the same name it's an error (and this is enforced by DFSchema).

@chenkovsky

Something like this:

#[tokio::test]
async fn test_name_conflict() {
    let batch = record_batch!(
        ("_rowid", UInt32, [0, 1, 2]),
        ("_rowid", Utf8, ["file-0", "file-1", "file-2"])
    )
    .unwrap();
    let batch = batch
        .with_schema(Arc::new(Schema::new(vec![
            Field::new("_rowid", DataType::UInt32, true).to_system_column(),
            Field::new("_rowid", DataType::Utf8, true),
        ])))
        .unwrap();

    let ctx = SessionContext::new_with_config(
        SessionConfig::new().with_information_schema(true),
    );
    let _ = ctx.register_batch("test", batch);
    let select = "SELECT _rowid FROM test";
    let df = ctx.sql(select).await.unwrap();
    let batches = df.collect().await.unwrap();
    // The regular `_rowid` column should win over the system column of the same name.
    let expected = [
        "+--------+",
        "| _rowid |",
        "+--------+",
        "| file-0 |",
        "| file-1 |",
        "| file-2 |",
        "+--------+",
    ];
    assert_batches_sorted_eq!(expected, &batches);
}

Currently a DuplicateQualifiedField error is thrown, but this is valid in Spark.
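The suppression rule I have in mind can be sketched roughly as follows. This is only an illustration with stand-in types: `Field` carries a plain `is_system` flag instead of metadata, and `resolve_column` is a hypothetical helper, not an existing DataFusion function.

```rust
/// Stand-in for a schema field; `is_system` replaces the metadata lookup.
#[derive(Debug, Clone, PartialEq)]
pub struct Field {
    pub name: String,
    pub is_system: bool,
}

impl Field {
    pub fn new(name: &str, is_system: bool) -> Self {
        Field {
            name: name.to_string(),
            is_system,
        }
    }
}

/// Hypothetical helper: returns the index of the field that `name` resolves to.
/// A regular column always wins over a system column with the same name,
/// regardless of the order in which the fields appear.
pub fn resolve_column(fields: &[Field], name: &str) -> Option<usize> {
    let mut system_match = None;
    for (idx, field) in fields.iter().enumerate() {
        if field.name != name {
            continue;
        }
        if !field.is_system {
            // A regular column shadows any system column of the same name.
            return Some(idx);
        }
        if system_match.is_none() {
            system_match = Some(idx);
        }
    }
    // Fall back to the system column only when no regular column matched.
    system_match
}
```

For the schema in the test above (system `_rowid` first, regular `_rowid` second), this resolves `_rowid` to index 1, and swapping the two fields still resolves to the regular column.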

@adriangb
Contributor Author

Is this that important to support? The example seems a bit contrived, I think it'd be more reasonable if it occurred naturally as part of a join or something where a user could unexpectedly run into it. Otherwise it seems to me like something very spark specific.

Postgres for example doesn't let you duplicate system columns:

create table t (ctid int);
ERROR:  column name "ctid" conflicts with a system column name

@adriangb
Contributor Author

I will still take a shot at adding that feature tomorrow. But I'm not sold on the behavior being ideal even if that's what Spark does. Besides, if it's an error now, we can always make it not an error in the future.

@chenkovsky

> Is this that important to support? The example seems a bit contrived, I think it'd be more reasonable if it occurred naturally as part of a join or something where a user could unexpectedly run into it. Otherwise it seems to me like something very spark specific.
>
> Postgres for example doesn't let you duplicate system columns:
>
> create table t (ctid int);
> ERROR:  column name "ctid" conflicts with a system column name

Because Spark is not a database; it's a compute engine. A database such as Postgres manages its schema and data itself, so it's fine for it to forbid name conflicts between system columns and normal columns. But Spark's data comes from other systems, so it cannot guarantee that the data doesn't contain these conflicting fields.

@adriangb
Contributor Author

Was that working in #14057? I didn't see a test for it.

Hypothetically speaking, we could do something in DFSchema to deduplicate, but I worry that won't be enough to make it work; e.g., we'll end up with mismatched fields and arrays.

@chenkovsky

Yes, it is supported. That was my fault; I have added the missing unit test.

@chenkovsky

By the way, I also encapsulated METADATA_OFFSET.

@adriangb
Contributor Author

adriangb commented Jan 30, 2025

@chenkovsky does 3cfce67 satisfy the use case?

@chenkovsky

> @chenkovsky does 3cfce67 satisfy the use case?

Yes, that's the case. But here you decide explicitly whether a field is a system field; it should be decided automatically by the table provider.

@adriangb
Contributor Author

adriangb commented Jan 30, 2025

> it should be decided by table provider automatically

I'm not sure I follow. At some point you have to mark a field as a system column. I used explicit batches / schemas just to make the tests easier, but of course you can do it via a TableProvider and register that; you just have to return the right schema from TableProvider::schema. I can cook up an example of a TableProvider with system columns if you'd like.

@chenkovsky

chenkovsky commented Jan 30, 2025

I don't mean that this approach can't reach the goal. But when I use the Spark-style API I don't need to take care of it: it's batteries-included, with no need to call an extra API.

Please correct me if I'm wrong.
The benefit of this approach is:

  1. No need to change the API.

The drawbacks are:

  1. Table provider developers have to take care of this and call an API to merge system fields and normal fields.
  2. HashMap performance is not good.
  3. A HashMap takes more effort to maintain. For example, we may later add another method that also changes the HashMap, and we would have to make sure that method doesn't change this special key/value pair.

Another thing I haven't checked for this approach: system columns don't only apply to scans. They can also apply to:

Projection

Join

SubqueryAlias

@adriangb
Contributor Author

I don't think (2) or (3) are real concerns. Things manipulating metadata should already be careful about not clobbering existing metadata and I really, really don't think the performance of inserting into a small HashMap will ever be an issue.
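As a rough sketch of what "careful about not clobbering" can look like, assuming metadata is a plain `HashMap<String, String>` (the `merge_metadata` helper is hypothetical, made up for illustration):

```rust
use std::collections::HashMap;

/// Hypothetical helper: merges `incoming` into `existing` without
/// overwriting any key that is already present.
pub fn merge_metadata(
    existing: &mut HashMap<String, String>,
    incoming: HashMap<String, String>,
) {
    for (key, value) in incoming {
        // `or_insert` only writes when the key is absent, so a marker
        // such as a system-column flag survives the merge.
        existing.entry(key).or_insert(value);
    }
}
```

Any code path that adds metadata through something like this leaves an existing system-column marker intact, and the cost is a handful of hash lookups per field.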

@adriangb
Contributor Author

For (1), yes, implementers of TableProvider will have to decide what fields they return, but isn't that already the case? I don't know how DataFusion would automatically determine what is a system column or not. It really depends on the data source.

@adriangb
Contributor Author

> Another thing I haven't checked for this approach is that system column doesn't only apply on scan. it can also be applied on Projection, Join, SubqueryAlias

Could you give example test cases that you think are relevant? I have a hunch that the current implementation may already handle these correctly.

@chenkovsky

> > Another thing I haven't checked for this approach is that system column doesn't only apply on scan. it can also be applied on Projection, Join, SubqueryAlias
>
> Could you give example test cases that you think are relevant? I have a hunch that the current implementation may already handle these correctly.

Sorry, I have no tests. I haven't implemented these features yet. Because my implementation changes some APIs, I think it's better to do it once the APIs are stable.
