Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix join on arrays of unhashable types and allow hash join on all types supported at run-time #13388

Merged
merged 4 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions datafusion/common/src/hash_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use arrow_buffer::IntervalMonthDayNano;
use crate::cast::{
as_binary_view_array, as_boolean_array, as_fixed_size_list_array,
as_generic_binary_array, as_large_list_array, as_list_array, as_map_array,
as_primitive_array, as_string_array, as_string_view_array, as_struct_array,
as_string_array, as_string_view_array, as_struct_array,
};
use crate::error::Result;
#[cfg(not(feature = "force_hash_collisions"))]
Expand Down Expand Up @@ -392,14 +392,6 @@ pub fn create_hashes<'a>(
let array: &FixedSizeBinaryArray = array.as_any().downcast_ref().unwrap();
hash_array(array, random_state, hashes_buffer, rehash)
}
DataType::Decimal128(_, _) => {
findepi marked this conversation as resolved.
Show resolved Hide resolved
let array = as_primitive_array::<Decimal128Type>(array)?;
hash_array_primitive(array, random_state, hashes_buffer, rehash)
}
DataType::Decimal256(_, _) => {
let array = as_primitive_array::<Decimal256Type>(array)?;
hash_array_primitive(array, random_state, hashes_buffer, rehash)
}
DataType::Dictionary(_, _) => downcast_dictionary_array! {
array => hash_dictionary(array, random_state, hashes_buffer, rehash)?,
_ => unreachable!()
Expand Down
59 changes: 42 additions & 17 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::{
};
use datafusion_expr_common::signature::{Signature, TypeSignature};

use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
};
Expand Down Expand Up @@ -958,7 +958,7 @@ pub(crate) fn find_column_indexes_referenced_by_expr(

/// Can this data type be used in hash join equal conditions?
/// Data types here come from function 'equal_rows', if more data types are supported
/// in equal_rows(hash join), add those data types here to generate join logical plan.
/// in create_hashes, add those data types here to generate join logical plan.
pub fn can_hash(data_type: &DataType) -> bool {
match data_type {
DataType::Null => true,
Expand All @@ -971,31 +971,38 @@ pub fn can_hash(data_type: &DataType) -> bool {
DataType::UInt16 => true,
DataType::UInt32 => true,
DataType::UInt64 => true,
DataType::Float16 => true,
DataType::Float32 => true,
DataType::Float64 => true,
DataType::Timestamp(time_unit, _) => match time_unit {
TimeUnit::Second => true,
TimeUnit::Millisecond => true,
TimeUnit::Microsecond => true,
TimeUnit::Nanosecond => true,
},
DataType::Decimal128(_, _) => true,
DataType::Decimal256(_, _) => true,
DataType::Timestamp(_, _) => true,
DataType::Utf8 => true,
DataType::LargeUtf8 => true,
DataType::Utf8View => true,
DataType::Decimal128(_, _) => true,
DataType::Binary => true,
DataType::LargeBinary => true,
DataType::BinaryView => true,
DataType::Date32 => true,
DataType::Date64 => true,
DataType::Time32(_) => true,
DataType::Time64(_) => true,
DataType::Duration(_) => true,
DataType::Interval(_) => true,
DataType::FixedSizeBinary(_) => true,
DataType::Dictionary(key_type, value_type)
if *value_type.as_ref() == DataType::Utf8 =>
{
DataType::is_dictionary_key_type(key_type)
DataType::Dictionary(key_type, value_type) => {
DataType::is_dictionary_key_type(key_type) && can_hash(value_type)
}
DataType::List(_) => true,
DataType::LargeList(_) => true,
DataType::FixedSizeList(_, _) => true,
DataType::List(value_type) => can_hash(value_type.data_type()),
DataType::LargeList(value_type) => can_hash(value_type.data_type()),
DataType::FixedSizeList(value_type, _) => can_hash(value_type.data_type()),
DataType::Map(map_struct, true | false) => can_hash(map_struct.data_type()),
DataType::Struct(fields) => fields.iter().all(|f| can_hash(f.data_type())),
_ => false,

DataType::ListView(_)
| DataType::LargeListView(_)
| DataType::Union(_, _)
| DataType::RunEndEncoded(_, _) => false,
}
}

Expand Down Expand Up @@ -1403,6 +1410,7 @@ mod tests {
test::function_stub::max_udaf, test::function_stub::min_udaf,
test::function_stub::sum_udaf, Cast, ExprFunctionExt, WindowFunctionDefinition,
};
use arrow::datatypes::{UnionFields, UnionMode};

#[test]
fn test_group_window_expr_by_sort_keys_empty_case() -> Result<()> {
Expand Down Expand Up @@ -1805,4 +1813,21 @@ mod tests {
assert!(accum.contains(&Column::from_name("a")));
Ok(())
}

#[test]
fn test_can_hash() {
    // Build a two-member sparse union; `can_hash` reports union types as
    // not hashable.
    let int_member = Arc::new(Field::new("A", DataType::Int32, true));
    let float_member = Arc::new(Field::new("B", DataType::Float64, true));
    let members: UnionFields =
        [(0, int_member), (1, float_member)].into_iter().collect();

    let sparse_union = DataType::Union(members, UnionMode::Sparse);
    assert!(!can_hash(&sparse_union));

    // A list is only hashable when its element type is, so wrapping the
    // union in a list must be rejected as well.
    let list_item = Field::new("my_union", sparse_union, true);
    let list_of_union = DataType::List(Arc::new(list_item));
    assert!(!can_hash(&list_of_union));
}
}
9 changes: 7 additions & 2 deletions datafusion/sqllogictest/src/test_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ impl TestContext {
let example_udf = create_example_udf();
test_ctx.ctx.register_udf(example_udf);
register_partition_table(&mut test_ctx).await;
info!("Registering table with many types");
register_table_with_many_types(test_ctx.session_ctx()).await;
}
"metadata.slt" => {
info!("Registering metadata table tables");
Expand Down Expand Up @@ -251,8 +253,11 @@ pub async fn register_table_with_many_types(ctx: &SessionContext) {
.unwrap();
ctx.register_catalog("my_catalog", Arc::new(catalog));

ctx.register_table("my_catalog.my_schema.t2", table_with_many_types())
.unwrap();
ctx.register_table(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

"my_catalog.my_schema.table_with_many_types",
table_with_many_types(),
)
.unwrap();
}

pub async fn register_table_with_map(ctx: &SessionContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,17 @@ query TTTTITTTIIIIIIT rowsort
SELECT * from information_schema.columns;
----
my_catalog my_schema t1 i 0 NULL YES Int32 NULL NULL 32 2 NULL NULL NULL
my_catalog my_schema t2 binary_col 4 NULL NO Binary NULL 2147483647 NULL NULL NULL NULL NULL
my_catalog my_schema t2 float64_col 1 NULL YES Float64 NULL NULL 24 2 NULL NULL NULL
my_catalog my_schema t2 int32_col 0 NULL NO Int32 NULL NULL 32 2 NULL NULL NULL
my_catalog my_schema t2 large_binary_col 5 NULL NO LargeBinary NULL 9223372036854775807 NULL NULL NULL NULL NULL
my_catalog my_schema t2 large_utf8_col 3 NULL NO LargeUtf8 NULL 9223372036854775807 NULL NULL NULL NULL NULL
my_catalog my_schema t2 timestamp_nanos 6 NULL NO Timestamp(Nanosecond, None) NULL NULL NULL NULL NULL NULL NULL
my_catalog my_schema t2 utf8_col 2 NULL YES Utf8 NULL 2147483647 NULL NULL NULL NULL NULL
my_catalog my_schema table_with_many_types binary_col 4 NULL NO Binary NULL 2147483647 NULL NULL NULL NULL NULL
my_catalog my_schema table_with_many_types float64_col 1 NULL YES Float64 NULL NULL 24 2 NULL NULL NULL
my_catalog my_schema table_with_many_types int32_col 0 NULL NO Int32 NULL NULL 32 2 NULL NULL NULL
my_catalog my_schema table_with_many_types large_binary_col 5 NULL NO LargeBinary NULL 9223372036854775807 NULL NULL NULL NULL NULL
my_catalog my_schema table_with_many_types large_utf8_col 3 NULL NO LargeUtf8 NULL 9223372036854775807 NULL NULL NULL NULL NULL
my_catalog my_schema table_with_many_types timestamp_nanos 6 NULL NO Timestamp(Nanosecond, None) NULL NULL NULL NULL NULL NULL NULL
my_catalog my_schema table_with_many_types utf8_col 2 NULL YES Utf8 NULL 2147483647 NULL NULL NULL NULL NULL

# Cleanup
statement ok
drop table t1

statement ok
drop table t2
drop table table_with_many_types
21 changes: 21 additions & 0 deletions datafusion/sqllogictest/test_files/joins.slt
Original file line number Diff line number Diff line change
Expand Up @@ -4292,3 +4292,24 @@ query T
select * from table1 as t1 natural join table1_stringview as t2;
----
foo

query TT
EXPLAIN SELECT count(*)
FROM my_catalog.my_schema.table_with_many_types AS l
JOIN my_catalog.my_schema.table_with_many_types AS r ON l.binary_col = r.binary_col
----
logical_plan
01)Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
02)--Projection:
03)----Inner Join: l.binary_col = r.binary_col
04)------SubqueryAlias: l
05)--------TableScan: my_catalog.my_schema.table_with_many_types projection=[binary_col]
06)------SubqueryAlias: r
07)--------TableScan: my_catalog.my_schema.table_with_many_types projection=[binary_col]
physical_plan
01)AggregateExec: mode=Single, gby=[], aggr=[count(*)]
02)--ProjectionExec: expr=[]
03)----CoalesceBatchesExec: target_batch_size=3
04)------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(binary_col@0, binary_col@0)]
05)--------MemoryExec: partitions=1, partition_sizes=[1]
06)--------MemoryExec: partitions=1, partition_sizes=[1]