From 8882a8196573363691c58fba761fc5e9aea59355 Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Wed, 8 Jan 2025 07:26:09 +0100 Subject: [PATCH 01/11] Use `Arrow2` prefix for arrow2 types --- crates/store/re_chunk/src/util.rs | 74 +++++++++++++++---------------- 1 file changed, 37 insertions(+), 37 deletions(-) diff --git a/crates/store/re_chunk/src/util.rs b/crates/store/re_chunk/src/util.rs index c8e64b143c03..279fe2a9d861 100644 --- a/crates/store/re_chunk/src/util.rs +++ b/crates/store/re_chunk/src/util.rs @@ -1,12 +1,12 @@ use arrow2::{ array::{ Array as Arrow2Array, BooleanArray as Arrow2BooleanArray, - DictionaryArray as ArrowDictionaryArray, ListArray as ArrowListArray, + DictionaryArray as Arrow2DictionaryArray, ListArray as Arrow2ListArray, PrimitiveArray as Arrow2PrimitiveArray, }, - bitmap::Bitmap as ArrowBitmap, + bitmap::Bitmap as Arrow2Bitmap, datatypes::DataType as Arrow2Datatype, - offset::Offsets as ArrowOffsets, + offset::Offsets as Arrow2Offsets, }; use itertools::Itertools; @@ -19,7 +19,7 @@ use crate::TransportChunk; /// Semantic emptiness is defined as either one of these: /// * The list is physically empty (literally no data). /// * The list only contains null entries, or empty arrays, or a mix of both. -pub fn is_list_array_semantically_empty(list_array: &ArrowListArray) -> bool { +pub fn is_list_array_semantically_empty(list_array: &Arrow2ListArray) -> bool { let is_physically_empty = || list_array.is_empty(); let is_all_nulls = || { @@ -44,7 +44,7 @@ pub fn is_list_array_semantically_empty(list_array: &ArrowListArray) -> boo #[inline] pub fn arrays_to_list_array_opt( arrays: &[Option<&dyn Arrow2Array>], -) -> Option> { +) -> Option> { let datatype = arrays .iter() .flatten() @@ -61,7 +61,7 @@ pub fn arrays_to_list_array_opt( pub fn arrays_to_list_array( array_datatype: Arrow2Datatype, arrays: &[Option<&dyn Arrow2Array>], -) -> Option> { +) -> Option> { let arrays_dense = arrays.iter().flatten().copied().collect_vec(); let data = if arrays_dense.is_empty() { @@ -76,10 +76,10 @@ pub fn arrays_to_list_array( .ok()? }; - let datatype = ArrowListArray::::default_datatype(array_datatype); + let datatype = Arrow2ListArray::::default_datatype(array_datatype); #[allow(clippy::unwrap_used)] // yes, these are indeed lengths - let offsets = ArrowOffsets::try_from_lengths( + let offsets = Arrow2Offsets::try_from_lengths( arrays .iter() .map(|array| array.map_or(0, |array| array.len())), @@ -87,9 +87,9 @@ pub fn arrays_to_list_array( .unwrap(); #[allow(clippy::from_iter_instead_of_collect)] - let validity = ArrowBitmap::from_iter(arrays.iter().map(Option::is_some)); + let validity = Arrow2Bitmap::from_iter(arrays.iter().map(Option::is_some)); - Some(ArrowListArray::::new( + Some(Arrow2ListArray::::new( datatype, offsets.into(), data, @@ -111,7 +111,7 @@ pub fn arrays_to_list_array( pub fn arrays_to_dictionary( array_datatype: &Arrow2Datatype, arrays: &[Option<(Idx, &dyn Arrow2Array)>], -) -> Option> { +) -> Option> { // Dedupe the input arrays based on the given primary key. let arrays_dense_deduped = arrays .iter() @@ -156,10 +156,10 @@ pub fn arrays_to_dictionary( #[allow(clippy::unwrap_used)] // yes, these are indeed lengths let offsets = - ArrowOffsets::try_from_lengths(arrays_dense_deduped.iter().map(|array| array.len())) + Arrow2Offsets::try_from_lengths(arrays_dense_deduped.iter().map(|array| array.len())) .unwrap(); - ArrowListArray::::new(array_datatype.clone(), offsets.into(), values, None).to_boxed() + Arrow2ListArray::::new(array_datatype.clone(), offsets.into(), values, None).to_boxed() }; let datatype = Arrow2Datatype::Dictionary( @@ -170,7 +170,7 @@ pub fn arrays_to_dictionary( // And finally we build our dictionary, which indexes into our concatenated list-array of // unique values. - ArrowDictionaryArray::try_new( + Arrow2DictionaryArray::try_new( datatype, Arrow2PrimitiveArray::::from(keys), data.to_boxed(), @@ -178,14 +178,14 @@ pub fn arrays_to_dictionary( .ok() } -/// Given a sparse `ArrowListArray` (i.e. an array with a validity bitmap that contains at least -/// one falsy value), returns a dense `ArrowListArray` that only contains the non-null values from +/// Given a sparse `Arrow2ListArray` (i.e. an array with a validity bitmap that contains at least +/// one falsy value), returns a dense `Arrow2ListArray` that only contains the non-null values from /// the original list. /// /// This is a no-op if the original array is already dense. pub fn sparse_list_array_to_dense_list_array( - list_array: &ArrowListArray, -) -> ArrowListArray { + list_array: &Arrow2ListArray, +) -> Arrow2ListArray { if list_array.is_empty() { return list_array.clone(); } @@ -199,10 +199,10 @@ pub fn sparse_list_array_to_dense_list_array( #[allow(clippy::unwrap_used)] // yes, these are indeed lengths let offsets = - ArrowOffsets::try_from_lengths(list_array.iter().flatten().map(|array| array.len())) + Arrow2Offsets::try_from_lengths(list_array.iter().flatten().map(|array| array.len())) .unwrap(); - ArrowListArray::::new( + Arrow2ListArray::::new( list_array.data_type().clone(), offsets.into(), list_array.values().clone(), @@ -214,9 +214,9 @@ pub fn sparse_list_array_to_dense_list_array( /// /// This will share the same child data array buffer, but will create new offset and validity buffers. pub fn pad_list_array_back( - list_array: &ArrowListArray, + list_array: &Arrow2ListArray, target_len: usize, -) -> ArrowListArray { +) -> Arrow2ListArray { let missing_len = target_len.saturating_sub(list_array.len()); if missing_len == 0 { return list_array.clone(); @@ -226,7 +226,7 @@ pub fn pad_list_array_back( let offsets = { #[allow(clippy::unwrap_used)] // yes, these are indeed lengths - ArrowOffsets::try_from_lengths( + Arrow2Offsets::try_from_lengths( list_array .iter() .map(|array| array.map_or(0, |array| array.len())) @@ -240,14 +240,14 @@ pub fn pad_list_array_back( let validity = { if let Some(validity) = list_array.validity() { #[allow(clippy::from_iter_instead_of_collect)] - ArrowBitmap::from_iter( + Arrow2Bitmap::from_iter( validity .iter() .chain(std::iter::repeat(false).take(missing_len)), ) } else { #[allow(clippy::from_iter_instead_of_collect)] - ArrowBitmap::from_iter( + Arrow2Bitmap::from_iter( std::iter::repeat(true) .take(list_array.len()) .chain(std::iter::repeat(false).take(missing_len)), @@ -255,16 +255,16 @@ pub fn pad_list_array_back( } }; - ArrowListArray::new(datatype, offsets.into(), values, Some(validity)) + Arrow2ListArray::new(datatype, offsets.into(), values, Some(validity)) } /// Create a new `ListArray` of target length by appending null values to its front. /// /// This will share the same child data array buffer, but will create new offset and validity buffers. pub fn pad_list_array_front( - list_array: &ArrowListArray, + list_array: &Arrow2ListArray, target_len: usize, -) -> ArrowListArray { +) -> Arrow2ListArray { let missing_len = target_len.saturating_sub(list_array.len()); if missing_len == 0 { return list_array.clone(); @@ -274,7 +274,7 @@ pub fn pad_list_array_front( let offsets = { #[allow(clippy::unwrap_used)] // yes, these are indeed lengths - ArrowOffsets::try_from_lengths( + Arrow2Offsets::try_from_lengths( std::iter::repeat(0).take(missing_len).chain( list_array .iter() @@ -289,14 +289,14 @@ pub fn pad_list_array_front( let validity = { if let Some(validity) = list_array.validity() { #[allow(clippy::from_iter_instead_of_collect)] - ArrowBitmap::from_iter( + Arrow2Bitmap::from_iter( std::iter::repeat(false) .take(missing_len) .chain(validity.iter()), ) } else { #[allow(clippy::from_iter_instead_of_collect)] - ArrowBitmap::from_iter( + Arrow2Bitmap::from_iter( std::iter::repeat(false) .take(missing_len) .chain(std::iter::repeat(true).take(list_array.len())), @@ -304,23 +304,23 @@ pub fn pad_list_array_front( } }; - ArrowListArray::new(datatype, offsets.into(), values, Some(validity)) + Arrow2ListArray::new(datatype, offsets.into(), values, Some(validity)) } -/// Returns a new [`ArrowListArray`] with len `entries`. +/// Returns a new [`Arrow2ListArray`] with len `entries`. /// /// Each entry will be an empty array of the given `child_datatype`. pub fn new_list_array_of_empties( child_datatype: Arrow2Datatype, len: usize, -) -> ArrowListArray { +) -> Arrow2ListArray { let empty_array = arrow2::array::new_empty_array(child_datatype); #[allow(clippy::unwrap_used)] // yes, these are indeed lengths - let offsets = ArrowOffsets::try_from_lengths(std::iter::repeat(0).take(len)).unwrap(); + let offsets = Arrow2Offsets::try_from_lengths(std::iter::repeat(0).take(len)).unwrap(); - ArrowListArray::::new( - ArrowListArray::::default_datatype(empty_array.data_type().clone()), + Arrow2ListArray::::new( + Arrow2ListArray::::default_datatype(empty_array.data_type().clone()), offsets.into(), empty_array.to_boxed(), None, From ef6654cffb19802bc3edc22a1e8613cac9555fdd Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Wed, 8 Jan 2025 07:30:57 +0100 Subject: [PATCH 02/11] Move arrow2-utils into module called `arrow2_util` --- .../re_chunk/src/{util.rs => arrow2_util.rs} | 0 crates/store/re_chunk/src/batcher.rs | 32 ++++++++++--------- crates/store/re_chunk/src/builder.rs | 8 ++--- crates/store/re_chunk/src/lib.rs | 2 +- crates/store/re_chunk/src/merge.rs | 16 ++++++---- crates/store/re_chunk/src/migration.rs | 3 +- crates/store/re_chunk/src/shuffle.rs | 2 +- crates/store/re_chunk/src/slice.rs | 22 ++++++------- crates/store/re_chunk/src/transport.rs | 6 ++-- crates/store/re_chunk/tests/memory_test.rs | 14 ++++---- crates/store/re_chunk_store/src/writes.rs | 2 +- crates/store/re_dataframe/src/lib.rs | 2 +- crates/store/re_dataframe/src/query.rs | 4 +-- crates/store/re_grpc_client/src/lib.rs | 4 +-- 14 files changed, 62 insertions(+), 55 deletions(-) rename crates/store/re_chunk/src/{util.rs => arrow2_util.rs} (100%) diff --git a/crates/store/re_chunk/src/util.rs b/crates/store/re_chunk/src/arrow2_util.rs similarity index 100% rename from crates/store/re_chunk/src/util.rs rename to crates/store/re_chunk/src/arrow2_util.rs diff --git a/crates/store/re_chunk/src/batcher.rs b/crates/store/re_chunk/src/batcher.rs index 44a9ffe6be70..0ef2fadcddf4 100644 --- a/crates/store/re_chunk/src/batcher.rs +++ b/crates/store/re_chunk/src/batcher.rs @@ -12,7 +12,7 @@ use re_byte_size::SizeBytes as _; use re_log_types::{EntityPath, ResolvedTimeRange, TimeInt, TimePoint, Timeline}; use re_types_core::ComponentDescriptor; -use crate::{chunk::ChunkComponents, Chunk, ChunkId, ChunkResult, RowId, TimeColumn}; +use crate::{arrow2_util, chunk::ChunkComponents, Chunk, ChunkId, ChunkResult, RowId, TimeColumn}; // --- @@ -734,7 +734,7 @@ impl PendingRow { let mut per_name = ChunkComponents::default(); for (component_desc, array) in components { - let list_array = crate::util::arrays_to_list_array_opt(&[Some(&*array as _)]); + let list_array = arrow2_util::arrays_to_list_array_opt(&[Some(&*array as _)]); if let Some(list_array) = list_array { per_name.insert_descriptor(component_desc, list_array); } @@ -870,7 +870,7 @@ impl PendingRow { for (component_desc, arrays) in std::mem::take(&mut components) { let list_array = - crate::util::arrays_to_list_array_opt(&arrays); + arrow2_util::arrays_to_list_array_opt(&arrays); if let Some(list_array) = list_array { per_name.insert_descriptor(component_desc, list_array); } @@ -915,7 +915,7 @@ impl PendingRow { { let mut per_name = ChunkComponents::default(); for (component_desc, arrays) in components { - let list_array = crate::util::arrays_to_list_array_opt(&arrays); + let list_array = arrow2_util::arrays_to_list_array_opt(&arrays); if let Some(list_array) = list_array { per_name.insert_descriptor(component_desc, list_array); } @@ -995,6 +995,8 @@ mod tests { use re_log_types::example_components::{MyPoint, MyPoint64}; use re_types_core::{Component as _, Loggable as _}; + use crate::arrow2_util; + use super::*; /// A bunch of rows that don't fit any of the split conditions should end up together. @@ -1060,7 +1062,7 @@ mod tests { )]; let expected_components = [( MyPoint::descriptor(), - crate::util::arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)) + arrow2_util::arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)) .unwrap(), )]; let expected_chunk = Chunk::from_native_row_ids( @@ -1132,7 +1134,7 @@ mod tests { let expected_timelines = []; let expected_components = [( MyPoint::descriptor(), - crate::util::arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)) + arrow2_util::arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)) .unwrap(), )]; let expected_chunk = Chunk::from_native_row_ids( @@ -1216,7 +1218,7 @@ mod tests { )]; let expected_components = [( MyPoint::descriptor(), - crate::util::arrays_to_list_array_opt(&[&*points1, &*points3].map(Some)).unwrap(), + arrow2_util::arrays_to_list_array_opt(&[&*points1, &*points3].map(Some)).unwrap(), )]; let expected_chunk = Chunk::from_native_row_ids( chunks[0].id, @@ -1244,7 +1246,7 @@ mod tests { )]; let expected_components = [( MyPoint::descriptor(), - crate::util::arrays_to_list_array_opt(&[&*points2].map(Some)).unwrap(), + arrow2_util::arrays_to_list_array_opt(&[&*points2].map(Some)).unwrap(), )]; let expected_chunk = Chunk::from_native_row_ids( chunks[1].id, @@ -1331,7 +1333,7 @@ mod tests { )]; let expected_components = [( MyPoint::descriptor(), - crate::util::arrays_to_list_array_opt(&[&*points1].map(Some)).unwrap(), + arrow2_util::arrays_to_list_array_opt(&[&*points1].map(Some)).unwrap(), )]; let expected_chunk = Chunk::from_native_row_ids( chunks[0].id, @@ -1369,7 +1371,7 @@ mod tests { ]; let expected_components = [( MyPoint::descriptor(), - crate::util::arrays_to_list_array_opt(&[&*points2, &*points3].map(Some)).unwrap(), + arrow2_util::arrays_to_list_array_opt(&[&*points2, &*points3].map(Some)).unwrap(), )]; let expected_chunk = Chunk::from_native_row_ids( chunks[1].id, @@ -1452,7 +1454,7 @@ mod tests { )]; let expected_components = [( MyPoint::descriptor(), - crate::util::arrays_to_list_array_opt(&[&*points1, &*points3].map(Some)).unwrap(), + arrow2_util::arrays_to_list_array_opt(&[&*points1, &*points3].map(Some)).unwrap(), )]; let expected_chunk = Chunk::from_native_row_ids( chunks[0].id, @@ -1480,7 +1482,7 @@ mod tests { )]; let expected_components = [( MyPoint::descriptor(), - crate::util::arrays_to_list_array_opt(&[&*points2].map(Some)).unwrap(), + arrow2_util::arrays_to_list_array_opt(&[&*points2].map(Some)).unwrap(), )]; let expected_chunk = Chunk::from_native_row_ids( chunks[1].id, @@ -1591,7 +1593,7 @@ mod tests { ]; let expected_components = [( MyPoint::descriptor(), - crate::util::arrays_to_list_array_opt( + arrow2_util::arrays_to_list_array_opt( &[&*points1, &*points2, &*points3, &*points4].map(Some), ) .unwrap(), @@ -1705,7 +1707,7 @@ mod tests { ]; let expected_components = [( MyPoint::descriptor(), - crate::util::arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)) + arrow2_util::arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)) .unwrap(), )]; let expected_chunk = Chunk::from_native_row_ids( @@ -1744,7 +1746,7 @@ mod tests { ]; let expected_components = [( MyPoint::descriptor(), - crate::util::arrays_to_list_array_opt(&[&*points4].map(Some)).unwrap(), + arrow2_util::arrays_to_list_array_opt(&[&*points4].map(Some)).unwrap(), )]; let expected_chunk = Chunk::from_native_row_ids( chunks[1].id, diff --git a/crates/store/re_chunk/src/builder.rs b/crates/store/re_chunk/src/builder.rs index debfa8297e45..2dcdbcaf0f0b 100644 --- a/crates/store/re_chunk/src/builder.rs +++ b/crates/store/re_chunk/src/builder.rs @@ -9,7 +9,7 @@ use nohash_hasher::IntMap; use re_log_types::{EntityPath, TimeInt, TimePoint, Timeline}; use re_types_core::{AsComponents, ComponentBatch, ComponentDescriptor}; -use crate::{chunk::ChunkComponents, Chunk, ChunkId, ChunkResult, RowId, TimeColumn}; +use crate::{arrow2_util, chunk::ChunkComponents, Chunk, ChunkId, ChunkResult, RowId, TimeColumn}; // --- @@ -236,7 +236,7 @@ impl ChunkBuilder { .into_iter() .filter_map(|(component_desc, arrays)| { let arrays = arrays.iter().map(|array| array.as_deref()).collect_vec(); - crate::util::arrays_to_list_array_opt(&arrays) + arrow2_util::arrays_to_list_array_opt(&arrays) .map(|list_array| (component_desc, list_array)) }) { @@ -295,10 +295,10 @@ impl ChunkBuilder { // If we know the datatype in advance, we're able to keep even fully sparse // columns around. if let Some(datatype) = datatypes.get(&component_desc) { - crate::util::arrays_to_list_array(datatype.clone(), &arrays) + arrow2_util::arrays_to_list_array(datatype.clone(), &arrays) .map(|list_array| (component_desc, list_array)) } else { - crate::util::arrays_to_list_array_opt(&arrays) + arrow2_util::arrays_to_list_array_opt(&arrays) .map(|list_array| (component_desc, list_array)) } }) diff --git a/crates/store/re_chunk/src/lib.rs b/crates/store/re_chunk/src/lib.rs index 6f24bbe8d371..e0a808f035fd 100644 --- a/crates/store/re_chunk/src/lib.rs +++ b/crates/store/re_chunk/src/lib.rs @@ -4,6 +4,7 @@ #![doc = document_features::document_features!()] //! +pub mod arrow2_util; mod builder; mod chunk; mod helpers; @@ -16,7 +17,6 @@ mod range; mod shuffle; mod slice; mod transport; -pub mod util; #[cfg(not(target_arch = "wasm32"))] mod batcher; diff --git a/crates/store/re_chunk/src/merge.rs b/crates/store/re_chunk/src/merge.rs index 056672a1ef2f..cd8ef80f32d2 100644 --- a/crates/store/re_chunk/src/merge.rs +++ b/crates/store/re_chunk/src/merge.rs @@ -5,7 +5,9 @@ use arrow2::array::{ use itertools::{izip, Itertools}; use nohash_hasher::IntMap; -use crate::{chunk::ChunkComponents, Chunk, ChunkError, ChunkId, ChunkResult, TimeColumn}; +use crate::{ + arrow2_util, chunk::ChunkComponents, Chunk, ChunkError, ChunkId, ChunkResult, TimeColumn, +}; // --- @@ -46,7 +48,7 @@ impl Chunk { let row_ids = { re_tracing::profile_scope!("row_ids"); - let row_ids = crate::util::concat_arrays(&[&cl.row_ids, &cr.row_ids])?; + let row_ids = arrow2_util::concat_arrays(&[&cl.row_ids, &cr.row_ids])?; #[allow(clippy::unwrap_used)] // concatenating 2 RowId arrays must yield another RowId array row_ids @@ -105,7 +107,7 @@ impl Chunk { )); let list_array = - crate::util::concat_arrays(&[lhs_list_array, rhs_list_array]).ok()?; + arrow2_util::concat_arrays(&[lhs_list_array, rhs_list_array]).ok()?; let list_array = list_array .as_any() .downcast_ref::>()? @@ -116,7 +118,7 @@ impl Chunk { re_tracing::profile_scope!("pad"); Some(( component_desc.clone(), - crate::util::pad_list_array_back( + arrow2_util::pad_list_array_back( lhs_list_array, self.num_rows() + rhs.num_rows(), ), @@ -147,7 +149,7 @@ impl Chunk { )); let list_array = - crate::util::concat_arrays(&[lhs_list_array, rhs_list_array]).ok()?; + arrow2_util::concat_arrays(&[lhs_list_array, rhs_list_array]).ok()?; let list_array = list_array .as_any() .downcast_ref::>()? @@ -158,7 +160,7 @@ impl Chunk { re_tracing::profile_scope!("pad"); Some(( component_desc.clone(), - crate::util::pad_list_array_front( + arrow2_util::pad_list_array_front( rhs_list_array, self.num_rows() + rhs.num_rows(), ), @@ -285,7 +287,7 @@ impl TimeColumn { let time_range = self.time_range.union(rhs.time_range); - let times = crate::util::concat_arrays(&[&self.times, &rhs.times]).ok()?; + let times = arrow2_util::concat_arrays(&[&self.times, &rhs.times]).ok()?; let times = times .as_any() .downcast_ref::>()? diff --git a/crates/store/re_chunk/src/migration.rs b/crates/store/re_chunk/src/migration.rs index 0728b859edf8..165702c92916 100644 --- a/crates/store/re_chunk/src/migration.rs +++ b/crates/store/re_chunk/src/migration.rs @@ -78,7 +78,8 @@ impl Chunk { .map(|a| a.as_deref() as Option<&dyn Array>) .collect_vec(); - if let Some(list_array_patched) = crate::util::arrays_to_list_array_opt(&arrays) + if let Some(list_array_patched) = + crate::arrow2_util::arrays_to_list_array_opt(&arrays) { *list_array = list_array_patched; } diff --git a/crates/store/re_chunk/src/shuffle.rs b/crates/store/re_chunk/src/shuffle.rs index 3cc07a346f35..ba9639898856 100644 --- a/crates/store/re_chunk/src/shuffle.rs +++ b/crates/store/re_chunk/src/shuffle.rs @@ -279,7 +279,7 @@ impl Chunk { ArrowOffsets::try_from_lengths(sorted_arrays.iter().map(|array| array.len())) .unwrap(); #[allow(clippy::unwrap_used)] // these are slices of the same outer array - let values = crate::util::concat_arrays(&sorted_arrays).unwrap(); + let values = crate::arrow2_util::concat_arrays(&sorted_arrays).unwrap(); let validity = original .validity() .map(|validity| swaps.iter().map(|&from| validity.get_bit(from)).collect()); diff --git a/crates/store/re_chunk/src/slice.rs b/crates/store/re_chunk/src/slice.rs index 00b77a91a34d..b361254c76a9 100644 --- a/crates/store/re_chunk/src/slice.rs +++ b/crates/store/re_chunk/src/slice.rs @@ -9,7 +9,7 @@ use nohash_hasher::IntSet; use re_log_types::Timeline; use re_types_core::{ComponentDescriptor, ComponentName}; -use crate::{Chunk, RowId, TimeColumn}; +use crate::{arrow2_util, Chunk, RowId, TimeColumn}; // --- @@ -369,7 +369,7 @@ impl Chunk { entity_path: entity_path.clone(), heap_size_bytes: Default::default(), is_sorted, - row_ids: crate::util::filter_array(row_ids, &validity_filter), + row_ids: arrow2_util::filter_array(row_ids, &validity_filter), timelines: timelines .iter() .map(|(&timeline, time_column)| (timeline, time_column.filtered(&validity_filter))) @@ -377,7 +377,7 @@ impl Chunk { components: components .iter_flattened() .map(|(component_desc, list_array)| { - let filtered = crate::util::filter_array(list_array, &validity_filter); + let filtered = arrow2_util::filter_array(list_array, &validity_filter); let filtered = if component_desc.component_name == component_name_pov { // Make sure we fully remove the validity bitmap for the densified // component. @@ -546,7 +546,7 @@ impl Chunk { entity_path: self.entity_path.clone(), heap_size_bytes: Default::default(), is_sorted: self.is_sorted, - row_ids: crate::util::take_array(&self.row_ids, &indices), + row_ids: arrow2_util::take_array(&self.row_ids, &indices), timelines: self .timelines .iter() @@ -556,7 +556,7 @@ impl Chunk { .components .iter_flattened() .map(|(component_desc, list_array)| { - let filtered = crate::util::take_array(list_array, &indices); + let filtered = arrow2_util::take_array(list_array, &indices); (component_desc.clone(), filtered) }) .collect(), @@ -619,7 +619,7 @@ impl Chunk { entity_path: entity_path.clone(), heap_size_bytes: Default::default(), is_sorted, - row_ids: crate::util::filter_array(row_ids, filter), + row_ids: arrow2_util::filter_array(row_ids, filter), timelines: timelines .iter() .map(|(&timeline, time_column)| (timeline, time_column.filtered(filter))) @@ -627,7 +627,7 @@ impl Chunk { components: components .iter_flattened() .map(|(component_desc, list_array)| { - let filtered = crate::util::filter_array(list_array, filter); + let filtered = arrow2_util::filter_array(list_array, filter); (component_desc.clone(), filtered) }) .collect(), @@ -699,7 +699,7 @@ impl Chunk { entity_path: entity_path.clone(), heap_size_bytes: Default::default(), is_sorted, - row_ids: crate::util::take_array(row_ids, indices), + row_ids: arrow2_util::take_array(row_ids, indices), timelines: timelines .iter() .map(|(&timeline, time_column)| (timeline, time_column.taken(indices))) @@ -707,7 +707,7 @@ impl Chunk { components: components .iter_flattened() .map(|(component_desc, list_array)| { - let taken = crate::util::take_array(list_array, indices); + let taken = arrow2_util::take_array(list_array, indices); (component_desc.clone(), taken) }) .collect(), @@ -852,7 +852,7 @@ impl TimeColumn { Self::new( is_sorted_opt, *timeline, - crate::util::filter_array(times, filter), + arrow2_util::filter_array(times, filter), ) } @@ -871,7 +871,7 @@ impl TimeColumn { Self::new( Some(*is_sorted), *timeline, - crate::util::take_array(times, indices), + arrow2_util::take_array(times, indices), ) } } diff --git a/crates/store/re_chunk/src/transport.rs b/crates/store/re_chunk/src/transport.rs index c7bf9d52559e..96e1b55e1c6d 100644 --- a/crates/store/re_chunk/src/transport.rs +++ b/crates/store/re_chunk/src/transport.rs @@ -724,6 +724,8 @@ mod tests { Timeline, }; + use crate::arrow2_util; + use super::*; #[test] @@ -763,7 +765,7 @@ mod tests { let components = [ (MyPoint::descriptor(), { - let list_array = crate::util::arrays_to_list_array_opt(&[ + let list_array = arrow2_util::arrays_to_list_array_opt(&[ Some(&*points1), points2, Some(&*points3), @@ -774,7 +776,7 @@ mod tests { list_array }), (MyPoint::descriptor(), { - let list_array = crate::util::arrays_to_list_array_opt(&[ + let list_array = arrow2_util::arrays_to_list_array_opt(&[ Some(&*colors1), Some(&*colors2), colors3, diff --git a/crates/store/re_chunk/tests/memory_test.rs b/crates/store/re_chunk/tests/memory_test.rs index a6c0e7c6df7c..84ae71fd25a3 100644 --- a/crates/store/re_chunk/tests/memory_test.rs +++ b/crates/store/re_chunk/tests/memory_test.rs @@ -62,6 +62,7 @@ use arrow2::{ offset::Offsets as Arrow2Offsets, }; use itertools::Itertools; +use re_chunk::arrow2_util; // --- concat --- @@ -89,8 +90,7 @@ fn concat_does_allocate() { .map(|a| &**a as &dyn Arrow2Array) .collect_vec(); - let concatenated = - memory_use(|| re_chunk::util::concat_arrays(&unconcatenated_refs).unwrap()); + let concatenated = memory_use(|| arrow2_util::concat_arrays(&unconcatenated_refs).unwrap()); (unconcatenated, concatenated) }); @@ -122,7 +122,7 @@ fn concat_single_is_noop() { }); let concatenated = - memory_use(|| re_chunk::util::concat_arrays(&[&*unconcatenated.0]).unwrap()); + memory_use(|| arrow2_util::concat_arrays(&[&*unconcatenated.0]).unwrap()); (unconcatenated, concatenated) }); @@ -185,7 +185,7 @@ fn filter_does_allocate() { let filter = Arrow2BooleanArray::from_slice( (0..unfiltered.0.len()).map(|i| i % 2 == 0).collect_vec(), ); - let filtered = memory_use(|| re_chunk::util::filter_array(&unfiltered.0, &filter)); + let filtered = memory_use(|| arrow2_util::filter_array(&unfiltered.0, &filter)); (unfiltered, filtered) }); @@ -249,7 +249,7 @@ fn filter_empty_or_full_is_noop() { .take(unfiltered.0.len()) .collect_vec(), ); - let filtered = memory_use(|| re_chunk::util::filter_array(&unfiltered.0, &filter)); + let filtered = memory_use(|| arrow2_util::filter_array(&unfiltered.0, &filter)); (unfiltered, filtered) }); @@ -320,7 +320,7 @@ fn take_does_not_allocate() { .filter(|i| i % 2 == 0) .collect_vec(), ); - let taken = memory_use(|| re_chunk::util::take_array(&untaken.0, &indices)); + let taken = memory_use(|| arrow2_util::take_array(&untaken.0, &indices)); (untaken, taken) }); @@ -380,7 +380,7 @@ fn take_empty_or_full_is_noop() { }); let indices = Arrow2PrimitiveArray::from_vec((0..untaken.0.len() as i32).collect_vec()); - let taken = memory_use(|| re_chunk::util::take_array(&untaken.0, &indices)); + let taken = memory_use(|| arrow2_util::take_array(&untaken.0, &indices)); (untaken, taken) }); diff --git a/crates/store/re_chunk_store/src/writes.rs b/crates/store/re_chunk_store/src/writes.rs index 19d14ad72b85..204ad39abc8d 100644 --- a/crates/store/re_chunk_store/src/writes.rs +++ b/crates/store/re_chunk_store/src/writes.rs @@ -387,7 +387,7 @@ impl ChunkStore { }); { let is_semantically_empty = - re_chunk::util::is_list_array_semantically_empty(list_array); + re_chunk::arrow2_util::is_list_array_semantically_empty(list_array); column_metadata_state.is_semantically_empty &= is_semantically_empty; } diff --git a/crates/store/re_dataframe/src/lib.rs b/crates/store/re_dataframe/src/lib.rs index 5713c3413ad2..964ec466c789 100644 --- a/crates/store/re_dataframe/src/lib.rs +++ b/crates/store/re_dataframe/src/lib.rs @@ -9,7 +9,7 @@ pub use self::query::QueryHandle; #[doc(no_inline)] pub use self::external::arrow2::chunk::Chunk as Arrow2Chunk; #[doc(no_inline)] -pub use self::external::re_chunk::{util::concatenate_record_batches, TransportChunk}; +pub use self::external::re_chunk::{arrow2_util::concatenate_record_batches, TransportChunk}; #[doc(no_inline)] pub use self::external::re_chunk_store::{ ChunkStoreConfig, ChunkStoreHandle, ColumnSelector, ComponentColumnSelector, Index, IndexRange, diff --git a/crates/store/re_dataframe/src/query.rs b/crates/store/re_dataframe/src/query.rs index b5c8e9e2d2a2..b3ba686d7705 100644 --- a/crates/store/re_dataframe/src/query.rs +++ b/crates/store/re_dataframe/src/query.rs @@ -260,7 +260,7 @@ impl QueryHandle { archetype_name: descr.archetype_name, archetype_field_name: descr.archetype_field_name, }, - re_chunk::util::new_list_array_of_empties( + re_chunk::arrow2_util::new_list_array_of_empties( child_datatype, chunk.num_rows(), ), @@ -1334,7 +1334,7 @@ mod tests { use std::sync::Arc; use re_chunk::{ - util::concatenate_record_batches, Chunk, ChunkId, RowId, TimePoint, TransportChunk, + arrow2_util::concatenate_record_batches, Chunk, ChunkId, RowId, TimePoint, TransportChunk, }; use re_chunk_store::{ ChunkStore, ChunkStoreConfig, ChunkStoreHandle, ResolvedTimeRange, TimeInt, diff --git a/crates/store/re_grpc_client/src/lib.rs b/crates/store/re_grpc_client/src/lib.rs index 2ce557c5890b..b5a0a70ede79 100644 --- a/crates/store/re_grpc_client/src/lib.rs +++ b/crates/store/re_grpc_client/src/lib.rs @@ -438,7 +438,7 @@ async fn stream_catalog_async( let data_arrays = sliced.iter().map(|e| Some(e.as_ref())).collect::>(); #[allow(clippy::unwrap_used)] // we know we've given the right field type let data_field_array: arrow2::array::ListArray = - re_chunk::util::arrays_to_list_array( + re_chunk::arrow2_util::arrays_to_list_array( data_field_inner.data_type().clone(), &data_arrays, ) @@ -501,7 +501,7 @@ async fn stream_catalog_async( let rec_id_field = Arrow2Field::new("item", arrow2::datatypes::DataType::Utf8, true); #[allow(clippy::unwrap_used)] // we know we've given the right field type - let uris = re_chunk::util::arrays_to_list_array( + let uris = re_chunk::arrow2_util::arrays_to_list_array( rec_id_field.data_type().clone(), &recording_id_arrays, ) From 8d8638acdc961a00a2a1d0240520032a496ce91e Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Wed, 8 Jan 2025 08:06:10 +0100 Subject: [PATCH 03/11] Create an arrow_util.rs --- crates/store/re_chunk/Cargo.toml | 1 + crates/store/re_chunk/src/arrow_util.rs | 317 ++++++++++++++++++++++++ crates/store/re_chunk/src/lib.rs | 1 + 3 files changed, 319 insertions(+) create mode 100644 crates/store/re_chunk/src/arrow_util.rs diff --git a/crates/store/re_chunk/Cargo.toml b/crates/store/re_chunk/Cargo.toml index 90e253be49dc..4e091da8a29e 100644 --- a/crates/store/re_chunk/Cargo.toml +++ b/crates/store/re_chunk/Cargo.toml @@ -52,6 +52,7 @@ ahash.workspace = true anyhow.workspace = true arrow.workspace = true arrow2 = { workspace = true, features = [ + "arrow", "compute_concatenate", "compute_filter", "compute_take", diff --git a/crates/store/re_chunk/src/arrow_util.rs b/crates/store/re_chunk/src/arrow_util.rs new file mode 100644 index 000000000000..856b7dc3ce2c --- /dev/null +++ b/crates/store/re_chunk/src/arrow_util.rs @@ -0,0 +1,317 @@ +use arrow::{ + array::{Array, ArrayRef, ArrowPrimitiveType, BooleanArray, ListArray, PrimitiveArray}, + buffer::{NullBuffer, OffsetBuffer}, + datatypes::{DataType, Field}, +}; +use itertools::Itertools; + +// --- + +/// Returns true if the given `list_array` is semantically empty. +/// +/// Semantic emptiness is defined as either one of these: +/// * The list is physically empty (literally no data). +/// * The list only contains null entries, or empty arrays, or a mix of both. +#[inline] +pub fn is_list_array_semantically_empty(list_array: &ListArray) -> bool { + list_array.values().is_empty() +} + +/// Create a sparse list-array out of an array of arrays. +/// +/// All arrays must have the same datatype. +/// +/// Returns `None` if `arrays` is empty. +#[inline] +pub fn arrays_to_list_array_opt(arrays: &[Option<&dyn Array>]) -> Option { + let datatype = arrays + .iter() + .flatten() + .map(|array| array.data_type().clone()) + .next()?; + arrays_to_list_array(datatype, arrays) +} + +/// An empty array of the given datatype. +pub fn new_empty_array(datatype: &DataType) -> ArrayRef { + let capacity = 0; + arrow::array::make_builder(datatype, capacity).finish() +} + +/// Create a sparse list-array out of an array of arrays. +/// +/// Returns `None` if any of the specified `arrays` doesn't match the given `array_datatype`. +/// +/// Returns an empty list if `arrays` is empty. +pub fn arrays_to_list_array( + array_datatype: DataType, + arrays: &[Option<&dyn Array>], +) -> Option { + let arrays_dense = arrays.iter().flatten().copied().collect_vec(); + + let data = if arrays_dense.is_empty() { + new_empty_array(&array_datatype) + } else { + re_tracing::profile_scope!("concatenate", arrays_dense.len().to_string()); + concat_arrays(&arrays_dense) + .map_err(|err| { + re_log::warn_once!("failed to concatenate arrays: {err}"); + err + }) + .ok()? + }; + + let nullable = true; + let field = Field::new_list_field(array_datatype, nullable); + + let offsets = OffsetBuffer::from_lengths( + arrays + .iter() + .map(|array| array.map_or(0, |array| array.len())), + ); + + #[allow(clippy::from_iter_instead_of_collect)] + let nulls = NullBuffer::from_iter(arrays.iter().map(Option::is_some)); + + Some(ListArray::new(field.into(), offsets, data, nulls.into())) +} + +/// Given a sparse [`ListArray`] (i.e. an array with a nulls bitmap that contains at least +/// one falsy value), returns a dense [`ListArray`] that only contains the non-null values from +/// the original list. +/// +/// This is a no-op if the original array is already dense. +pub fn sparse_list_array_to_dense_list_array(list_array: &ListArray) -> ListArray { + if list_array.is_empty() { + return list_array.clone(); + } + + let is_empty = list_array.nulls().map_or(false, |nulls| nulls.is_empty()); + if is_empty { + return list_array.clone(); + } + + let offsets = OffsetBuffer::from_lengths(list_array.iter().flatten().map(|array| array.len())); + + let fields = list_array_fields(list_array); + + ListArray::new(fields, offsets, list_array.values().clone(), None) +} + +fn list_array_fields(list_array: &arrow::array::GenericListArray) -> std::sync::Arc { + match list_array.data_type() { + DataType::List(fields) | DataType::LargeList(fields) => fields, + _ => unreachable!("The GenericListArray constructor guaranteed we can't get here"), + } + .clone() +} + +/// Create a new [`ListArray`] of target length by appending null values to its back. +/// +/// This will share the same child data array buffer, but will create new offset and nulls buffers. +pub fn pad_list_array_back(list_array: &ListArray, target_len: usize) -> ListArray { + let missing_len = target_len.saturating_sub(list_array.len()); + if missing_len == 0 { + return list_array.clone(); + } + + let fields = list_array_fields(list_array); + + let offsets = { + OffsetBuffer::from_lengths( + list_array + .iter() + .map(|array| array.map_or(0, |array| array.len())) + .chain(std::iter::repeat(0).take(missing_len)), + ) + }; + + let values = list_array.values().clone(); + + let nulls = { + if let Some(nulls) = list_array.nulls() { + #[allow(clippy::from_iter_instead_of_collect)] + NullBuffer::from_iter( + nulls + .iter() + .chain(std::iter::repeat(false).take(missing_len)), + ) + } else { + #[allow(clippy::from_iter_instead_of_collect)] + NullBuffer::from_iter( + std::iter::repeat(true) + .take(list_array.len()) + .chain(std::iter::repeat(false).take(missing_len)), + ) + } + }; + + ListArray::new(fields, offsets, values, Some(nulls)) +} + +/// Create a new [`ListArray`] of target length by appending null values to its front. +/// +/// This will share the same child data array buffer, but will create new offset and nulls buffers. +pub fn pad_list_array_front(list_array: &ListArray, target_len: usize) -> ListArray { + let missing_len = target_len.saturating_sub(list_array.len()); + if missing_len == 0 { + return list_array.clone(); + } + + let fields = list_array_fields(list_array); + + let offsets = { + OffsetBuffer::from_lengths( + std::iter::repeat(0).take(missing_len).chain( + list_array + .iter() + .map(|array| array.map_or(0, |array| array.len())), + ), + ) + }; + + let values = list_array.values().clone(); + + let nulls = { + if let Some(nulls) = list_array.nulls() { + #[allow(clippy::from_iter_instead_of_collect)] + NullBuffer::from_iter( + std::iter::repeat(false) + .take(missing_len) + .chain(nulls.iter()), + ) + } else { + #[allow(clippy::from_iter_instead_of_collect)] + NullBuffer::from_iter( + std::iter::repeat(false) + .take(missing_len) + .chain(std::iter::repeat(true).take(list_array.len())), + ) + } + }; + + ListArray::new(fields, offsets, values, Some(nulls)) +} + +/// Returns a new [[`ListArray`]] with len `entries`. +/// +/// Each entry will be an empty array of the given `child_datatype`. +pub fn new_list_array_of_empties(child_datatype: &DataType, len: usize) -> ListArray { + let empty_array = new_empty_array(child_datatype); + + let offsets = OffsetBuffer::from_lengths(std::iter::repeat(0).take(len)); + + let nullable = true; + ListArray::new( + Field::new_list_field(empty_array.data_type().clone(), nullable).into(), + offsets, + empty_array, + None, + ) +} + +/// Applies a [`arrow::compute::concat`] kernel to the given `arrays`. +/// +/// Early outs where it makes sense (e.g. `arrays.len() == 1`). +/// +/// Returns an error if the arrays don't share the exact same datatype. +pub fn concat_arrays(arrays: &[&dyn Array]) -> arrow::error::Result { + arrow::compute::concat(arrays) +} + +/// Applies a [filter] kernel to the given `array`. +/// +/// Panics iff the length of the filter doesn't match the length of the array. +/// +/// In release builds, filters are allowed to have null entries (they will be interpreted as `false`). +/// In debug builds, null entries will panic. +/// +/// Note: a `filter` kernel _copies_ the data in order to make the resulting arrays contiguous in memory. +/// +/// Takes care of up- and down-casting the data back and forth on behalf of the caller. +/// +/// [filter]: arrow::compute::filter +pub fn filter_array(array: &A, filter: &BooleanArray) -> A { + assert_eq!( + array.len(), filter.len(), + "the length of the filter must match the length of the array (the underlying kernel will panic otherwise)", + ); + debug_assert!( + filter.nulls().is_none(), + "filter masks with nulls bits are technically valid, but generally a sign that something went wrong", + ); + + #[allow(clippy::disallowed_methods)] // that's the whole point + #[allow(clippy::unwrap_used)] + arrow::compute::filter(array, filter) + // Unwrap: this literally cannot fail. + .unwrap() + .as_any() + .downcast_ref::() + // Unwrap: that's initial type that we got. + .unwrap() + .clone() +} + +/// Applies a [take] kernel to the given `array`. +/// +/// In release builds, indices are allowed to have null entries (they will be taken as `null`s). +/// In debug builds, null entries will panic. +/// +/// Note: a `take` kernel _copies_ the data in order to make the resulting arrays contiguous in memory. +/// +/// Takes care of up- and down-casting the data back and forth on behalf of the caller. +/// +/// [take]: arrow::compute::take +// +// TODO(cmc): in an ideal world, a `take` kernel should merely _slice_ the data and avoid any allocations/copies +// where possible (e.g. list-arrays). +// That is not possible with vanilla [`ListArray`]s since they don't expose any way to encode optional lengths, +// in addition to offsets. +// For internal stuff, we could perhaps provide a custom implementation that returns a `DictionaryArray` instead? +pub fn take_array(array: &A, indices: &PrimitiveArray) -> A +where + A: Array + Clone + 'static, + O: ArrowPrimitiveType, + O::Native: std::ops::Add, +{ + use arrow::datatypes::ArrowNativeTypeOp as _; + + debug_assert!( + indices.nulls().is_none(), + "index arrays with nulls bits are technically valid, but generally a sign that something went wrong", + ); + + if indices.len() == array.len() { + let indices = indices.values().as_ref(); + + let starts_at_zero = || indices[0] == O::Native::ZERO; + let is_consecutive = || { + indices + .windows(2) + .all(|values| values[1] == values[0] + O::Native::ONE) + }; + + if starts_at_zero() && is_consecutive() { + #[allow(clippy::unwrap_used)] + return array + .clone() + .as_any() + .downcast_ref::() + // Unwrap: that's initial type that we got. + .unwrap() + .clone(); + } + } + + #[allow(clippy::disallowed_methods)] // that's the whole point + #[allow(clippy::unwrap_used)] + arrow::compute::take(array, indices, Default::default()) + // Unwrap: this literally cannot fail. + .unwrap() + .as_any() + .downcast_ref::() + // Unwrap: that's initial type that we got. + .unwrap() + .clone() +} diff --git a/crates/store/re_chunk/src/lib.rs b/crates/store/re_chunk/src/lib.rs index e0a808f035fd..9ba7b632b86d 100644 --- a/crates/store/re_chunk/src/lib.rs +++ b/crates/store/re_chunk/src/lib.rs @@ -5,6 +5,7 @@ //! pub mod arrow2_util; +pub mod arrow_util; mod builder; mod chunk; mod helpers; From 288552dee5e71b7017dcd051483c7606a9e4a91f Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Wed, 8 Jan 2025 16:25:24 +0100 Subject: [PATCH 04/11] Port PendingRow to arrow-rs --- Cargo.lock | 7 +++-- Cargo.toml | 2 ++ crates/store/re_chunk/src/batcher.rs | 31 ++++++++++++----------- crates/store/re_chunk/src/chunk.rs | 17 +++++++++++++ crates/top/re_sdk/src/recording_stream.rs | 2 +- crates/top/rerun_c/src/lib.rs | 2 +- rerun_py/src/arrow.rs | 2 +- 7 files changed, 41 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 929018b52039..71ba376ac16e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5183,7 +5183,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15" dependencies = [ "bytes", - "heck 0.5.0", + "heck 0.4.1", "itertools 0.13.0", "log", "multimap", @@ -5542,9 +5542,8 @@ dependencies = [ [[package]] name = "re_arrow2" -version = "0.18.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f046c5679b0f305d610f80d93fd51ad702cfc077bbe16d9553a1660a2505160" +version = "0.18.1" +source = "git+https://github.com/rerun-io/re_arrow2.git?branch=emilk/more-arrow-compatibility#0e4b3dd7cd73426b1209ebe0323087452a7c8b91" dependencies = [ "ahash", "arrow-array", diff --git a/Cargo.toml b/Cargo.toml index 6b8eee119203..35b70f963616 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -587,3 +587,5 @@ egui_commonmark = { git = "https://github.com/rerun-io/egui_commonmark.git", bra # walkers = { git = "https://github.com/rerun-io/walkers", rev = "8939cceb3fa49ca8648ee16fe1d8432f5ab0bdcc" } # https://github.com/podusowski/walkers/pull/222 # dav1d = { path = "/home/cmc/dev/rerun-io/rav1d", package = "re_rav1d", version = "0.1.1" } + +re_arrow2 = { git = "https://github.com/rerun-io/re_arrow2.git", branch = "emilk/more-arrow-compatibility" } # TODO : point to main branmch diff --git a/crates/store/re_chunk/src/batcher.rs b/crates/store/re_chunk/src/batcher.rs index 0ef2fadcddf4..814616eabad8 100644 --- a/crates/store/re_chunk/src/batcher.rs +++ b/crates/store/re_chunk/src/batcher.rs @@ -4,7 +4,8 @@ use std::{ time::{Duration, Instant}, }; -use arrow2::array::{Array as Arrow2Array, PrimitiveArray as Arrow2PrimitiveArray}; +use arrow::array::{Array as ArrowArray, ArrayRef}; +use arrow2::array::PrimitiveArray as Arrow2PrimitiveArray; use crossbeam::channel::{Receiver, Sender}; use nohash_hasher::IntMap; @@ -12,7 +13,7 @@ use re_byte_size::SizeBytes as _; use re_log_types::{EntityPath, ResolvedTimeRange, TimeInt, TimePoint, Timeline}; use re_types_core::ComponentDescriptor; -use crate::{arrow2_util, chunk::ChunkComponents, Chunk, ChunkId, ChunkResult, RowId, TimeColumn}; +use crate::{arrow_util, chunk::ChunkComponents, Chunk, ChunkId, ChunkResult, RowId, TimeColumn}; // --- @@ -679,15 +680,12 @@ pub struct PendingRow { /// The component data. /// /// Each array is a single component, i.e. _not_ a list array. - pub components: IntMap>, + pub components: IntMap, } impl PendingRow { #[inline] - pub fn new( - timepoint: TimePoint, - components: IntMap>, - ) -> Self { + pub fn new(timepoint: TimePoint, components: IntMap) -> Self { Self { row_id: RowId::new(), timepoint, @@ -734,9 +732,9 @@ impl PendingRow { let mut per_name = ChunkComponents::default(); for (component_desc, array) in components { - let list_array = arrow2_util::arrays_to_list_array_opt(&[Some(&*array as _)]); + let list_array = arrow_util::arrays_to_list_array_opt(&[Some(&*array as _)]); if let Some(list_array) = list_array { - per_name.insert_descriptor(component_desc, list_array); + per_name.insert_descriptor_arrow1(component_desc, list_array); } } @@ -826,7 +824,7 @@ impl PendingRow { // Create all the logical list arrays that we're going to need, accounting for the // possibility of sparse components in the data. - let mut all_components: IntMap>> = + let mut all_components: IntMap>> = IntMap::default(); for row in &rows { for component_desc in row.components.keys() { @@ -870,9 +868,12 @@ impl PendingRow { for (component_desc, arrays) in std::mem::take(&mut components) { let list_array = - arrow2_util::arrays_to_list_array_opt(&arrays); + arrow_util::arrays_to_list_array_opt(&arrays); if let Some(list_array) = list_array { - per_name.insert_descriptor(component_desc, list_array); + per_name.insert_descriptor_arrow1( + component_desc, + list_array, + ); } } per_name @@ -898,7 +899,7 @@ impl PendingRow { arrays.push( row_components .get(component_desc) - .map(|array| &**array as &dyn Arrow2Array), + .map(|array| &**array as &dyn ArrowArray), ); } } @@ -915,9 +916,9 @@ impl PendingRow { { let mut per_name = ChunkComponents::default(); for (component_desc, arrays) in components { - let list_array = arrow2_util::arrays_to_list_array_opt(&arrays); + let list_array = arrow_util::arrays_to_list_array_opt(&arrays); if let Some(list_array) = list_array { - per_name.insert_descriptor(component_desc, list_array); + per_name.insert_descriptor_arrow1(component_desc, list_array); } } per_name diff --git a/crates/store/re_chunk/src/chunk.rs b/crates/store/re_chunk/src/chunk.rs index ca2605d97dd4..11c8c6baad6c 100644 --- a/crates/store/re_chunk/src/chunk.rs +++ b/crates/store/re_chunk/src/chunk.rs @@ -1,6 +1,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; use ahash::HashMap; +use arrow::array::ListArray as ArrowListArray; use arrow2::{ array::{ Array as Arrow2Array, ListArray as Arrow2ListArray, PrimitiveArray as Arrow2PrimitiveArray, @@ -60,6 +61,22 @@ pub struct ChunkComponents( ); impl ChunkComponents { + /// Like `Self::insert`, but automatically infers the [`ComponentName`] layer. + #[inline] + pub fn insert_descriptor_arrow1( + &mut self, + component_desc: ComponentDescriptor, + list_array: ArrowListArray, + ) -> Option { + // TODO(cmc): revert me + let component_desc = component_desc.untagged(); + self.0 + .entry(component_desc.component_name) + .or_default() + .insert(component_desc, list_array.into()) + .map(|la| la.into()) + } + /// Like `Self::insert`, but automatically infers the [`ComponentName`] layer. #[inline] pub fn insert_descriptor( diff --git a/crates/top/re_sdk/src/recording_stream.rs b/crates/top/re_sdk/src/recording_stream.rs index 7551dc9cca40..54956ff35f97 100644 --- a/crates/top/re_sdk/src/recording_stream.rs +++ b/crates/top/re_sdk/src/recording_stream.rs @@ -1176,7 +1176,7 @@ impl RecordingStream { .into_iter() .map(|comp_batch| { comp_batch - .to_arrow2() + .to_arrow() .map(|array| (comp_batch.descriptor().into_owned(), array)) }) .collect(); diff --git a/crates/top/rerun_c/src/lib.rs b/crates/top/rerun_c/src/lib.rs index d091d9322904..e90100d2a0b6 100644 --- a/crates/top/rerun_c/src/lib.rs +++ b/crates/top/rerun_c/src/lib.rs @@ -820,7 +820,7 @@ fn rr_recording_stream_log_impl( let component_type = component_type_registry.get(*component_type)?; let datatype = component_type.datatype.clone(); let values = unsafe { arrow_array_from_c_ffi(array, datatype) }?; - components.insert(component_type.descriptor.clone(), values); + components.insert(component_type.descriptor.clone(), values.into()); } } diff --git a/rerun_py/src/arrow.rs b/rerun_py/src/arrow.rs index f75e7a04ee30..04c6ea735e03 100644 --- a/rerun_py/src/arrow.rs +++ b/rerun_py/src/arrow.rs @@ -89,7 +89,7 @@ pub fn build_row_from_components( let component_descr = descriptor_to_rust(&component_descr)?; let (list_array, _field) = array_to_rust(&array, &component_descr)?; - components.insert(component_descr, list_array); + components.insert(component_descr, list_array.into()); } Ok(PendingRow { From 55df03196331a5555e22bb7ae91fe7ab622990db Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Wed, 8 Jan 2025 20:38:26 +0100 Subject: [PATCH 05/11] Update tests --- crates/store/re_chunk/src/batcher.rs | 70 ++++++++++++++-------------- crates/store/re_chunk/src/chunk.rs | 27 +++++++++++ 2 files changed, 61 insertions(+), 36 deletions(-) diff --git a/crates/store/re_chunk/src/batcher.rs b/crates/store/re_chunk/src/batcher.rs index 814616eabad8..75d682c34163 100644 --- a/crates/store/re_chunk/src/batcher.rs +++ b/crates/store/re_chunk/src/batcher.rs @@ -996,8 +996,6 @@ mod tests { use re_log_types::example_components::{MyPoint, MyPoint64}; use re_types_core::{Component as _, Loggable as _}; - use crate::arrow2_util; - use super::*; /// A bunch of rows that don't fit any of the split conditions should end up together. @@ -1011,9 +1009,9 @@ mod tests { let timepoint2 = TimePoint::default().with(timeline1, 43); let timepoint3 = TimePoint::default().with(timeline1, 44); - let points1 = MyPoint::to_arrow2([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?; - let points2 = MyPoint::to_arrow2([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?; - let points3 = MyPoint::to_arrow2([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?; + let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?; + let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?; + let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?; let components1 = [(MyPoint::descriptor(), points1.clone())]; let components2 = [(MyPoint::descriptor(), points2.clone())]; @@ -1063,7 +1061,7 @@ mod tests { )]; let expected_components = [( MyPoint::descriptor(), - arrow2_util::arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)) + arrow_util::arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)) .unwrap(), )]; let expected_chunk = Chunk::from_native_row_ids( @@ -1090,9 +1088,9 @@ mod tests { let timeless = TimePoint::default(); - let points1 = MyPoint::to_arrow2([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?; - let points2 = MyPoint::to_arrow2([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?; - let points3 = MyPoint::to_arrow2([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?; + let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?; + let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?; + let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?; let components1 = [(MyPoint::descriptor(), points1.clone())]; let components2 = [(MyPoint::descriptor(), points2.clone())]; @@ -1135,7 +1133,7 @@ mod tests { let expected_timelines = []; let expected_components = [( MyPoint::descriptor(), - arrow2_util::arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)) + arrow_util::arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)) .unwrap(), )]; let expected_chunk = Chunk::from_native_row_ids( @@ -1166,9 +1164,9 @@ mod tests { let timepoint2 = TimePoint::default().with(timeline1, 43); let timepoint3 = TimePoint::default().with(timeline1, 44); - let points1 = MyPoint::to_arrow2([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?; - let points2 = MyPoint::to_arrow2([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?; - let points3 = MyPoint::to_arrow2([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?; + let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?; + let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?; + let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?; let components1 = [(MyPoint::descriptor(), points1.clone())]; let components2 = [(MyPoint::descriptor(), points2.clone())]; @@ -1219,7 +1217,7 @@ mod tests { )]; let expected_components = [( MyPoint::descriptor(), - arrow2_util::arrays_to_list_array_opt(&[&*points1, &*points3].map(Some)).unwrap(), + arrow_util::arrays_to_list_array_opt(&[&*points1, &*points3].map(Some)).unwrap(), )]; let expected_chunk = Chunk::from_native_row_ids( chunks[0].id, @@ -1247,7 +1245,7 @@ mod tests { )]; let expected_components = [( MyPoint::descriptor(), - arrow2_util::arrays_to_list_array_opt(&[&*points2].map(Some)).unwrap(), + arrow_util::arrays_to_list_array_opt(&[&*points2].map(Some)).unwrap(), )]; let expected_chunk = Chunk::from_native_row_ids( chunks[1].id, @@ -1282,9 +1280,9 @@ mod tests { .with(timeline1, 44) .with(timeline2, 1001); - let points1 = MyPoint::to_arrow2([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?; - let points2 = MyPoint::to_arrow2([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?; - let points3 = MyPoint::to_arrow2([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?; + let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?; + let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?; + let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?; let components1 = [(MyPoint::descriptor(), points1.clone())]; let components2 = [(MyPoint::descriptor(), points2.clone())]; @@ -1334,7 +1332,7 @@ mod tests { )]; let expected_components = [( MyPoint::descriptor(), - arrow2_util::arrays_to_list_array_opt(&[&*points1].map(Some)).unwrap(), + arrow_util::arrays_to_list_array_opt(&[&*points1].map(Some)).unwrap(), )]; let expected_chunk = Chunk::from_native_row_ids( chunks[0].id, @@ -1372,7 +1370,7 @@ mod tests { ]; let expected_components = [( MyPoint::descriptor(), - arrow2_util::arrays_to_list_array_opt(&[&*points2, &*points3].map(Some)).unwrap(), + arrow_util::arrays_to_list_array_opt(&[&*points2, &*points3].map(Some)).unwrap(), )]; let expected_chunk = Chunk::from_native_row_ids( chunks[1].id, @@ -1402,10 +1400,10 @@ mod tests { let timepoint2 = TimePoint::default().with(timeline1, 43); let timepoint3 = TimePoint::default().with(timeline1, 44); - let points1 = MyPoint::to_arrow2([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?; + let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?; let points2 = - MyPoint64::to_arrow2([MyPoint64::new(10.0, 20.0), MyPoint64::new(30.0, 40.0)])?; - let points3 = MyPoint::to_arrow2([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?; + MyPoint64::to_arrow([MyPoint64::new(10.0, 20.0), MyPoint64::new(30.0, 40.0)])?; + let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?; let components1 = [(MyPoint::descriptor(), points1.clone())]; let components2 = [(MyPoint::descriptor(), points2.clone())]; // same name, different datatype @@ -1455,7 +1453,7 @@ mod tests { )]; let expected_components = [( MyPoint::descriptor(), - arrow2_util::arrays_to_list_array_opt(&[&*points1, &*points3].map(Some)).unwrap(), + arrow_util::arrays_to_list_array_opt(&[&*points1, &*points3].map(Some)).unwrap(), )]; let expected_chunk = Chunk::from_native_row_ids( chunks[0].id, @@ -1483,7 +1481,7 @@ mod tests { )]; let expected_components = [( MyPoint::descriptor(), - arrow2_util::arrays_to_list_array_opt(&[&*points2].map(Some)).unwrap(), + arrow_util::arrays_to_list_array_opt(&[&*points2].map(Some)).unwrap(), )]; let expected_chunk = Chunk::from_native_row_ids( chunks[1].id, @@ -1527,11 +1525,11 @@ mod tests { .with(timeline2, 1003) .with(timeline1, 45); - let points1 = MyPoint::to_arrow2([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?; - let points2 = MyPoint::to_arrow2([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?; - let points3 = MyPoint::to_arrow2([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?; + let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?; + let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?; + let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?; let points4 = - MyPoint::to_arrow2([MyPoint::new(1000.0, 2000.0), MyPoint::new(3000.0, 4000.0)])?; + MyPoint::to_arrow([MyPoint::new(1000.0, 2000.0), MyPoint::new(3000.0, 4000.0)])?; let components1 = [(MyPoint::descriptor(), points1.clone())]; let components2 = [(MyPoint::descriptor(), points2.clone())]; @@ -1594,7 +1592,7 @@ mod tests { ]; let expected_components = [( MyPoint::descriptor(), - arrow2_util::arrays_to_list_array_opt( + arrow_util::arrays_to_list_array_opt( &[&*points1, &*points2, &*points3, &*points4].map(Some), ) .unwrap(), @@ -1641,11 +1639,11 @@ mod tests { .with(timeline2, 1003) .with(timeline1, 45); - let points1 = MyPoint::to_arrow2([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?; - let points2 = MyPoint::to_arrow2([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?; - let points3 = MyPoint::to_arrow2([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?; + let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?; + let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?; + let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?; let points4 = - MyPoint::to_arrow2([MyPoint::new(1000.0, 2000.0), MyPoint::new(3000.0, 4000.0)])?; + MyPoint::to_arrow([MyPoint::new(1000.0, 2000.0), MyPoint::new(3000.0, 4000.0)])?; let components1 = [(MyPoint::descriptor(), points1.clone())]; let components2 = [(MyPoint::descriptor(), points2.clone())]; @@ -1708,7 +1706,7 @@ mod tests { ]; let expected_components = [( MyPoint::descriptor(), - arrow2_util::arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)) + arrow_util::arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)) .unwrap(), )]; let expected_chunk = Chunk::from_native_row_ids( @@ -1747,7 +1745,7 @@ mod tests { ]; let expected_components = [( MyPoint::descriptor(), - arrow2_util::arrays_to_list_array_opt(&[&*points4].map(Some)).unwrap(), + arrow_util::arrays_to_list_array_opt(&[&*points4].map(Some)).unwrap(), )]; let expected_chunk = Chunk::from_native_row_ids( chunks[1].id, diff --git a/crates/store/re_chunk/src/chunk.rs b/crates/store/re_chunk/src/chunk.rs index 11c8c6baad6c..017272592735 100644 --- a/crates/store/re_chunk/src/chunk.rs +++ b/crates/store/re_chunk/src/chunk.rs @@ -155,6 +155,33 @@ impl std::ops::DerefMut for ChunkComponents { } } +impl FromIterator<(ComponentDescriptor, ArrowListArray)> for ChunkComponents { + #[inline] + fn from_iter>(iter: T) -> Self { + let mut this = Self::default(); + { + for (component_desc, list_array) in iter { + this.insert_descriptor(component_desc, list_array.into()); + } + } + this + } +} + +// TODO(cmc): Kinda disgusting but it makes our lives easier during the interim, as long as we're +// in this weird halfway in-between state where we still have a bunch of things indexed by name only. +impl FromIterator<(ComponentName, ArrowListArray)> for ChunkComponents { + #[inline] + fn from_iter>(iter: T) -> Self { + iter.into_iter() + .map(|(component_name, list_array)| { + let component_desc = ComponentDescriptor::new(component_name); + (component_desc, list_array) + }) + .collect() + } +} + impl FromIterator<(ComponentDescriptor, Arrow2ListArray)> for ChunkComponents { #[inline] fn from_iter)>>( From 32817ba7be48f3eff259a31f11fa9a1674bf9b63 Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Wed, 8 Jan 2025 20:41:27 +0100 Subject: [PATCH 06/11] Revert renamings in `arrow2_util` --- crates/store/re_chunk/src/arrow2_util.rs | 74 ++++++++++++------------ 1 file changed, 37 insertions(+), 37 deletions(-) diff --git a/crates/store/re_chunk/src/arrow2_util.rs b/crates/store/re_chunk/src/arrow2_util.rs index 279fe2a9d861..c8e64b143c03 100644 --- a/crates/store/re_chunk/src/arrow2_util.rs +++ b/crates/store/re_chunk/src/arrow2_util.rs @@ -1,12 +1,12 @@ use arrow2::{ array::{ Array as Arrow2Array, BooleanArray as Arrow2BooleanArray, - DictionaryArray as Arrow2DictionaryArray, ListArray as Arrow2ListArray, + DictionaryArray as ArrowDictionaryArray, ListArray as ArrowListArray, PrimitiveArray as Arrow2PrimitiveArray, }, - bitmap::Bitmap as Arrow2Bitmap, + bitmap::Bitmap as ArrowBitmap, datatypes::DataType as Arrow2Datatype, - offset::Offsets as Arrow2Offsets, + offset::Offsets as ArrowOffsets, }; use itertools::Itertools; @@ -19,7 +19,7 @@ use crate::TransportChunk; /// Semantic emptiness is defined as either one of these: /// * The list is physically empty (literally no data). /// * The list only contains null entries, or empty arrays, or a mix of both. -pub fn is_list_array_semantically_empty(list_array: &Arrow2ListArray) -> bool { +pub fn is_list_array_semantically_empty(list_array: &ArrowListArray) -> bool { let is_physically_empty = || list_array.is_empty(); let is_all_nulls = || { @@ -44,7 +44,7 @@ pub fn is_list_array_semantically_empty(list_array: &Arrow2ListArray) -> bo #[inline] pub fn arrays_to_list_array_opt( arrays: &[Option<&dyn Arrow2Array>], -) -> Option> { +) -> Option> { let datatype = arrays .iter() .flatten() @@ -61,7 +61,7 @@ pub fn arrays_to_list_array_opt( pub fn arrays_to_list_array( array_datatype: Arrow2Datatype, arrays: &[Option<&dyn Arrow2Array>], -) -> Option> { +) -> Option> { let arrays_dense = arrays.iter().flatten().copied().collect_vec(); let data = if arrays_dense.is_empty() { @@ -76,10 +76,10 @@ pub fn arrays_to_list_array( .ok()? }; - let datatype = Arrow2ListArray::::default_datatype(array_datatype); + let datatype = ArrowListArray::::default_datatype(array_datatype); #[allow(clippy::unwrap_used)] // yes, these are indeed lengths - let offsets = Arrow2Offsets::try_from_lengths( + let offsets = ArrowOffsets::try_from_lengths( arrays .iter() .map(|array| array.map_or(0, |array| array.len())), @@ -87,9 +87,9 @@ pub fn arrays_to_list_array( .unwrap(); #[allow(clippy::from_iter_instead_of_collect)] - let validity = Arrow2Bitmap::from_iter(arrays.iter().map(Option::is_some)); + let validity = ArrowBitmap::from_iter(arrays.iter().map(Option::is_some)); - Some(Arrow2ListArray::::new( + Some(ArrowListArray::::new( datatype, offsets.into(), data, @@ -111,7 +111,7 @@ pub fn arrays_to_list_array( pub fn arrays_to_dictionary( array_datatype: &Arrow2Datatype, arrays: &[Option<(Idx, &dyn Arrow2Array)>], -) -> Option> { +) -> Option> { // Dedupe the input arrays based on the given primary key. let arrays_dense_deduped = arrays .iter() @@ -156,10 +156,10 @@ pub fn arrays_to_dictionary( #[allow(clippy::unwrap_used)] // yes, these are indeed lengths let offsets = - Arrow2Offsets::try_from_lengths(arrays_dense_deduped.iter().map(|array| array.len())) + ArrowOffsets::try_from_lengths(arrays_dense_deduped.iter().map(|array| array.len())) .unwrap(); - Arrow2ListArray::::new(array_datatype.clone(), offsets.into(), values, None).to_boxed() + ArrowListArray::::new(array_datatype.clone(), offsets.into(), values, None).to_boxed() }; let datatype = Arrow2Datatype::Dictionary( @@ -170,7 +170,7 @@ pub fn arrays_to_dictionary( // And finally we build our dictionary, which indexes into our concatenated list-array of // unique values. - Arrow2DictionaryArray::try_new( + ArrowDictionaryArray::try_new( datatype, Arrow2PrimitiveArray::::from(keys), data.to_boxed(), @@ -178,14 +178,14 @@ pub fn arrays_to_dictionary( .ok() } -/// Given a sparse `Arrow2ListArray` (i.e. an array with a validity bitmap that contains at least -/// one falsy value), returns a dense `Arrow2ListArray` that only contains the non-null values from +/// Given a sparse `ArrowListArray` (i.e. an array with a validity bitmap that contains at least +/// one falsy value), returns a dense `ArrowListArray` that only contains the non-null values from /// the original list. /// /// This is a no-op if the original array is already dense. pub fn sparse_list_array_to_dense_list_array( - list_array: &Arrow2ListArray, -) -> Arrow2ListArray { + list_array: &ArrowListArray, +) -> ArrowListArray { if list_array.is_empty() { return list_array.clone(); } @@ -199,10 +199,10 @@ pub fn sparse_list_array_to_dense_list_array( #[allow(clippy::unwrap_used)] // yes, these are indeed lengths let offsets = - Arrow2Offsets::try_from_lengths(list_array.iter().flatten().map(|array| array.len())) + ArrowOffsets::try_from_lengths(list_array.iter().flatten().map(|array| array.len())) .unwrap(); - Arrow2ListArray::::new( + ArrowListArray::::new( list_array.data_type().clone(), offsets.into(), list_array.values().clone(), @@ -214,9 +214,9 @@ pub fn sparse_list_array_to_dense_list_array( /// /// This will share the same child data array buffer, but will create new offset and validity buffers. pub fn pad_list_array_back( - list_array: &Arrow2ListArray, + list_array: &ArrowListArray, target_len: usize, -) -> Arrow2ListArray { +) -> ArrowListArray { let missing_len = target_len.saturating_sub(list_array.len()); if missing_len == 0 { return list_array.clone(); @@ -226,7 +226,7 @@ pub fn pad_list_array_back( let offsets = { #[allow(clippy::unwrap_used)] // yes, these are indeed lengths - Arrow2Offsets::try_from_lengths( + ArrowOffsets::try_from_lengths( list_array .iter() .map(|array| array.map_or(0, |array| array.len())) @@ -240,14 +240,14 @@ pub fn pad_list_array_back( let validity = { if let Some(validity) = list_array.validity() { #[allow(clippy::from_iter_instead_of_collect)] - Arrow2Bitmap::from_iter( + ArrowBitmap::from_iter( validity .iter() .chain(std::iter::repeat(false).take(missing_len)), ) } else { #[allow(clippy::from_iter_instead_of_collect)] - Arrow2Bitmap::from_iter( + ArrowBitmap::from_iter( std::iter::repeat(true) .take(list_array.len()) .chain(std::iter::repeat(false).take(missing_len)), @@ -255,16 +255,16 @@ pub fn pad_list_array_back( } }; - Arrow2ListArray::new(datatype, offsets.into(), values, Some(validity)) + ArrowListArray::new(datatype, offsets.into(), values, Some(validity)) } /// Create a new `ListArray` of target length by appending null values to its front. /// /// This will share the same child data array buffer, but will create new offset and validity buffers. pub fn pad_list_array_front( - list_array: &Arrow2ListArray, + list_array: &ArrowListArray, target_len: usize, -) -> Arrow2ListArray { +) -> ArrowListArray { let missing_len = target_len.saturating_sub(list_array.len()); if missing_len == 0 { return list_array.clone(); @@ -274,7 +274,7 @@ pub fn pad_list_array_front( let offsets = { #[allow(clippy::unwrap_used)] // yes, these are indeed lengths - Arrow2Offsets::try_from_lengths( + ArrowOffsets::try_from_lengths( std::iter::repeat(0).take(missing_len).chain( list_array .iter() @@ -289,14 +289,14 @@ pub fn pad_list_array_front( let validity = { if let Some(validity) = list_array.validity() { #[allow(clippy::from_iter_instead_of_collect)] - Arrow2Bitmap::from_iter( + ArrowBitmap::from_iter( std::iter::repeat(false) .take(missing_len) .chain(validity.iter()), ) } else { #[allow(clippy::from_iter_instead_of_collect)] - Arrow2Bitmap::from_iter( + ArrowBitmap::from_iter( std::iter::repeat(false) .take(missing_len) .chain(std::iter::repeat(true).take(list_array.len())), @@ -304,23 +304,23 @@ pub fn pad_list_array_front( } }; - Arrow2ListArray::new(datatype, offsets.into(), values, Some(validity)) + ArrowListArray::new(datatype, offsets.into(), values, Some(validity)) } -/// Returns a new [`Arrow2ListArray`] with len `entries`. +/// Returns a new [`ArrowListArray`] with len `entries`. /// /// Each entry will be an empty array of the given `child_datatype`. pub fn new_list_array_of_empties( child_datatype: Arrow2Datatype, len: usize, -) -> Arrow2ListArray { +) -> ArrowListArray { let empty_array = arrow2::array::new_empty_array(child_datatype); #[allow(clippy::unwrap_used)] // yes, these are indeed lengths - let offsets = Arrow2Offsets::try_from_lengths(std::iter::repeat(0).take(len)).unwrap(); + let offsets = ArrowOffsets::try_from_lengths(std::iter::repeat(0).take(len)).unwrap(); - Arrow2ListArray::::new( - Arrow2ListArray::::default_datatype(empty_array.data_type().clone()), + ArrowListArray::::new( + ArrowListArray::::default_datatype(empty_array.data_type().clone()), offsets.into(), empty_array.to_boxed(), None, From d98fe58fbb7db09d595fdff729c881474c60ec85 Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Wed, 8 Jan 2025 20:44:32 +0100 Subject: [PATCH 07/11] `insert_descriptor_arrow2` --- crates/store/re_chunk/src/batcher.rs | 9 +++------ crates/store/re_chunk/src/builder.rs | 4 ++-- crates/store/re_chunk/src/chunk.rs | 10 +++++----- crates/store/re_chunk/src/merge.rs | 2 +- crates/store/re_chunk/src/migration.rs | 2 +- crates/store/re_chunk/src/transport.rs | 2 +- 6 files changed, 13 insertions(+), 16 deletions(-) diff --git a/crates/store/re_chunk/src/batcher.rs b/crates/store/re_chunk/src/batcher.rs index 75d682c34163..9143124a9ad4 100644 --- a/crates/store/re_chunk/src/batcher.rs +++ b/crates/store/re_chunk/src/batcher.rs @@ -734,7 +734,7 @@ impl PendingRow { for (component_desc, array) in components { let list_array = arrow_util::arrays_to_list_array_opt(&[Some(&*array as _)]); if let Some(list_array) = list_array { - per_name.insert_descriptor_arrow1(component_desc, list_array); + per_name.insert_descriptor(component_desc, list_array); } } @@ -870,10 +870,7 @@ impl PendingRow { let list_array = arrow_util::arrays_to_list_array_opt(&arrays); if let Some(list_array) = list_array { - per_name.insert_descriptor_arrow1( - component_desc, - list_array, - ); + per_name.insert_descriptor(component_desc, list_array); } } per_name @@ -918,7 +915,7 @@ impl PendingRow { for (component_desc, arrays) in components { let list_array = arrow_util::arrays_to_list_array_opt(&arrays); if let Some(list_array) = list_array { - per_name.insert_descriptor_arrow1(component_desc, list_array); + per_name.insert_descriptor(component_desc, list_array); } } per_name diff --git a/crates/store/re_chunk/src/builder.rs b/crates/store/re_chunk/src/builder.rs index 2dcdbcaf0f0b..f0dc99b1cd0e 100644 --- a/crates/store/re_chunk/src/builder.rs +++ b/crates/store/re_chunk/src/builder.rs @@ -240,7 +240,7 @@ impl ChunkBuilder { .map(|list_array| (component_desc, list_array)) }) { - per_name.insert_descriptor(component_desc, list_array); + per_name.insert_descriptor_arrow2(component_desc, list_array); } per_name }; @@ -303,7 +303,7 @@ impl ChunkBuilder { } }) { - per_name.insert_descriptor(component_desc, list_array); + per_name.insert_descriptor_arrow2(component_desc, list_array); } per_name }, diff --git a/crates/store/re_chunk/src/chunk.rs b/crates/store/re_chunk/src/chunk.rs index 017272592735..c5fad08f5bef 100644 --- a/crates/store/re_chunk/src/chunk.rs +++ b/crates/store/re_chunk/src/chunk.rs @@ -63,7 +63,7 @@ pub struct ChunkComponents( impl ChunkComponents { /// Like `Self::insert`, but automatically infers the [`ComponentName`] layer. #[inline] - pub fn insert_descriptor_arrow1( + pub fn insert_descriptor( &mut self, component_desc: ComponentDescriptor, list_array: ArrowListArray, @@ -79,7 +79,7 @@ impl ChunkComponents { /// Like `Self::insert`, but automatically infers the [`ComponentName`] layer. #[inline] - pub fn insert_descriptor( + pub fn insert_descriptor_arrow2( &mut self, component_desc: ComponentDescriptor, list_array: Arrow2ListArray, @@ -161,7 +161,7 @@ impl FromIterator<(ComponentDescriptor, ArrowListArray)> for ChunkComponents { let mut this = Self::default(); { for (component_desc, list_array) in iter { - this.insert_descriptor(component_desc, list_array.into()); + this.insert_descriptor_arrow2(component_desc, list_array.into()); } } this @@ -190,7 +190,7 @@ impl FromIterator<(ComponentDescriptor, Arrow2ListArray)> for ChunkComponen let mut this = Self::default(); { for (component_desc, list_array) in iter { - this.insert_descriptor(component_desc, list_array); + this.insert_descriptor_arrow2(component_desc, list_array); } } this @@ -959,7 +959,7 @@ impl Chunk { list_array: Arrow2ListArray, ) -> ChunkResult<()> { self.components - .insert_descriptor(component_desc, list_array); + .insert_descriptor_arrow2(component_desc, list_array); self.sanity_check() } diff --git a/crates/store/re_chunk/src/merge.rs b/crates/store/re_chunk/src/merge.rs index cd8ef80f32d2..b101e720b07b 100644 --- a/crates/store/re_chunk/src/merge.rs +++ b/crates/store/re_chunk/src/merge.rs @@ -173,7 +173,7 @@ impl Chunk { let components = { let mut per_name = ChunkComponents::default(); for (component_desc, list_array) in components { - per_name.insert_descriptor(component_desc.clone(), list_array); + per_name.insert_descriptor_arrow2(component_desc.clone(), list_array); } per_name }; diff --git a/crates/store/re_chunk/src/migration.rs b/crates/store/re_chunk/src/migration.rs index 165702c92916..fe50b9a53e20 100644 --- a/crates/store/re_chunk/src/migration.rs +++ b/crates/store/re_chunk/src/migration.rs @@ -88,7 +88,7 @@ impl Chunk { } for (desc, list_array) in components_patched { - chunk.components.insert_descriptor(desc, list_array); + chunk.components.insert_descriptor_arrow2(desc, list_array); } chunk diff --git a/crates/store/re_chunk/src/transport.rs b/crates/store/re_chunk/src/transport.rs index 96e1b55e1c6d..10946bb12a9d 100644 --- a/crates/store/re_chunk/src/transport.rs +++ b/crates/store/re_chunk/src/transport.rs @@ -651,7 +651,7 @@ impl Chunk { let component_desc = TransportChunk::component_descriptor_from_field(field); if components - .insert_descriptor(component_desc, column.clone()) + .insert_descriptor_arrow2(component_desc, column.clone()) .is_some() { return Err(ChunkError::Malformed { From 78f676f94dc6a5488b0ef55d23856f583ac428db Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Wed, 8 Jan 2025 20:50:29 +0100 Subject: [PATCH 08/11] Remove `insert_descriptor_arrow2` again --- crates/store/re_chunk/src/builder.rs | 4 ++-- crates/store/re_chunk/src/chunk.rs | 21 +++------------------ crates/store/re_chunk/src/merge.rs | 2 +- crates/store/re_chunk/src/migration.rs | 2 +- crates/store/re_chunk/src/transport.rs | 2 +- 5 files changed, 8 insertions(+), 23 deletions(-) diff --git a/crates/store/re_chunk/src/builder.rs b/crates/store/re_chunk/src/builder.rs index f0dc99b1cd0e..2c7ae1dde8d9 100644 --- a/crates/store/re_chunk/src/builder.rs +++ b/crates/store/re_chunk/src/builder.rs @@ -240,7 +240,7 @@ impl ChunkBuilder { .map(|list_array| (component_desc, list_array)) }) { - per_name.insert_descriptor_arrow2(component_desc, list_array); + per_name.insert_descriptor(component_desc, list_array.into()); } per_name }; @@ -303,7 +303,7 @@ impl ChunkBuilder { } }) { - per_name.insert_descriptor_arrow2(component_desc, list_array); + per_name.insert_descriptor(component_desc, list_array.into()); } per_name }, diff --git a/crates/store/re_chunk/src/chunk.rs b/crates/store/re_chunk/src/chunk.rs index c5fad08f5bef..7380f9ae2810 100644 --- a/crates/store/re_chunk/src/chunk.rs +++ b/crates/store/re_chunk/src/chunk.rs @@ -77,21 +77,6 @@ impl ChunkComponents { .map(|la| la.into()) } - /// Like `Self::insert`, but automatically infers the [`ComponentName`] layer. - #[inline] - pub fn insert_descriptor_arrow2( - &mut self, - component_desc: ComponentDescriptor, - list_array: Arrow2ListArray, - ) -> Option> { - // TODO(cmc): revert me - let component_desc = component_desc.untagged(); - self.0 - .entry(component_desc.component_name) - .or_default() - .insert(component_desc, list_array) - } - /// Returns all list arrays for the given component name. /// /// I.e semantically equivalent to `get("MyComponent:*.*")` @@ -161,7 +146,7 @@ impl FromIterator<(ComponentDescriptor, ArrowListArray)> for ChunkComponents { let mut this = Self::default(); { for (component_desc, list_array) in iter { - this.insert_descriptor_arrow2(component_desc, list_array.into()); + this.insert_descriptor(component_desc, list_array); } } this @@ -190,7 +175,7 @@ impl FromIterator<(ComponentDescriptor, Arrow2ListArray)> for ChunkComponen let mut this = Self::default(); { for (component_desc, list_array) in iter { - this.insert_descriptor_arrow2(component_desc, list_array); + this.insert_descriptor(component_desc, list_array.into()); } } this @@ -959,7 +944,7 @@ impl Chunk { list_array: Arrow2ListArray, ) -> ChunkResult<()> { self.components - .insert_descriptor_arrow2(component_desc, list_array); + .insert_descriptor(component_desc, list_array.into()); self.sanity_check() } diff --git a/crates/store/re_chunk/src/merge.rs b/crates/store/re_chunk/src/merge.rs index b101e720b07b..a804ede244fb 100644 --- a/crates/store/re_chunk/src/merge.rs +++ b/crates/store/re_chunk/src/merge.rs @@ -173,7 +173,7 @@ impl Chunk { let components = { let mut per_name = ChunkComponents::default(); for (component_desc, list_array) in components { - per_name.insert_descriptor_arrow2(component_desc.clone(), list_array); + per_name.insert_descriptor(component_desc.clone(), list_array.into()); } per_name }; diff --git a/crates/store/re_chunk/src/migration.rs b/crates/store/re_chunk/src/migration.rs index fe50b9a53e20..d641fc5f3713 100644 --- a/crates/store/re_chunk/src/migration.rs +++ b/crates/store/re_chunk/src/migration.rs @@ -88,7 +88,7 @@ impl Chunk { } for (desc, list_array) in components_patched { - chunk.components.insert_descriptor_arrow2(desc, list_array); + chunk.components.insert_descriptor(desc, list_array.into()); } chunk diff --git a/crates/store/re_chunk/src/transport.rs b/crates/store/re_chunk/src/transport.rs index 10946bb12a9d..0a2700d78c3c 100644 --- a/crates/store/re_chunk/src/transport.rs +++ b/crates/store/re_chunk/src/transport.rs @@ -651,7 +651,7 @@ impl Chunk { let component_desc = TransportChunk::component_descriptor_from_field(field); if components - .insert_descriptor_arrow2(component_desc, column.clone()) + .insert_descriptor(component_desc, column.clone().into()) .is_some() { return Err(ChunkError::Malformed { From 4be004c05a09a3dac603a202d8822b72ed46b742 Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Wed, 8 Jan 2025 20:54:53 +0100 Subject: [PATCH 09/11] Update list of disallowed methods --- clippy.toml | 16 ++++++++++------ crates/store/re_chunk/src/arrow_util.rs | 2 ++ 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/clippy.toml b/clippy.toml index ec39e9871d72..bec822eb6e8f 100644 --- a/clippy.toml +++ b/clippy.toml @@ -50,13 +50,17 @@ disallowed-methods = [ { path = "std::panic::catch_unwind", reason = "We compile with `panic = 'abort'`" }, { path = "std::thread::spawn", reason = "Use `std::thread::Builder` and name the thread" }, + { path = "arrow::compute::concat", reason = "Use `re_chunk::arrow_util::concat_arrays` instead, which has better memory management" }, + { path = "arrow::compute::filter", reason = "Use `re_chunk::arrow_util::filter_array` instead" }, + { path = "arrow::compute::take", reason = "Use `re_chunk::arrow_util::take_array` instead" }, + # Specify both `arrow2` and `re_arrow2` -- clippy gets lost in all the package renaming happening. - { path = "arrow2::compute::concatenate::concatenate", reason = "Use `re_chunk::util::concat_arrays` instead, which has proper early outs" }, - { path = "arrow2::compute::filter::filter", reason = "Use `re_chunk::util::filter_array` instead, which has proper early outs" }, - { path = "arrow2::compute::take::take", reason = "Use `re_chunk::util::take_array` instead, which has proper early outs" }, - { path = "re_arrow2::compute::concatenate::concatenate", reason = "Use `re_chunk::util::concat_arrays` instead, which has proper early outs" }, - { path = "re_arrow2::compute::filter::filter", reason = "Use `re_chunk::util::filter_array` instead, which has proper early outs" }, - { path = "re_arrow2::compute::take::take", reason = "Use `re_chunk::util::take_array` instead, which has proper early outs" }, + { path = "arrow2::compute::concatenate::concatenate", reason = "Use `re_chunk::arrow2_util::concat_arrays` instead, which has proper early outs" }, + { path = "arrow2::compute::filter::filter", reason = "Use `re_chunk::arrow2_util::filter_array` instead, which has proper early outs" }, + { path = "arrow2::compute::take::take", reason = "Use `re_chunk::arrow2_util::take_array` instead, which has proper early outs" }, + { path = "re_arrow2::compute::concatenate::concatenate", reason = "Use `re_chunk::arrow2_util::concat_arrays` instead, which has proper early outs" }, + { path = "re_arrow2::compute::filter::filter", reason = "Use `re_chunk::arrow2_util::filter_array` instead, which has proper early outs" }, + { path = "re_arrow2::compute::take::take", reason = "Use `re_chunk::arrow2_util::take_array` instead, which has proper early outs" }, # There are many things that aren't allowed on wasm, # but we cannot disable them all here (because of e.g. https://github.com/rust-lang/rust-clippy/issues/10406) diff --git a/crates/store/re_chunk/src/arrow_util.rs b/crates/store/re_chunk/src/arrow_util.rs index 856b7dc3ce2c..46d4dca005eb 100644 --- a/crates/store/re_chunk/src/arrow_util.rs +++ b/crates/store/re_chunk/src/arrow_util.rs @@ -216,7 +216,9 @@ pub fn new_list_array_of_empties(child_datatype: &DataType, len: usize) -> ListA /// /// Returns an error if the arrays don't share the exact same datatype. pub fn concat_arrays(arrays: &[&dyn Array]) -> arrow::error::Result { + #[allow(clippy::disallowed_methods)] // that's the whole point arrow::compute::concat(arrays) + // TODO: call .shrink_to_fit on the result } /// Applies a [filter] kernel to the given `array`. From d056aa0da3f840268619b9a1174e4b4241c12c92 Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Thu, 9 Jan 2025 10:25:54 +0100 Subject: [PATCH 10/11] Use `main` `re_arrow2` --- Cargo.lock | 2 +- Cargo.toml | 2 +- crates/store/re_chunk/src/arrow_util.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 71ba376ac16e..0a0653a26c6e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5543,7 +5543,7 @@ dependencies = [ [[package]] name = "re_arrow2" version = "0.18.1" -source = "git+https://github.com/rerun-io/re_arrow2.git?branch=emilk/more-arrow-compatibility#0e4b3dd7cd73426b1209ebe0323087452a7c8b91" +source = "git+https://github.com/rerun-io/re_arrow2.git?branch=main#c762f392ded92c5978fcdc2195988587a4976417" dependencies = [ "ahash", "arrow-array", diff --git a/Cargo.toml b/Cargo.toml index 35b70f963616..7d4e631b96a0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -588,4 +588,4 @@ egui_commonmark = { git = "https://github.com/rerun-io/egui_commonmark.git", bra # dav1d = { path = "/home/cmc/dev/rerun-io/rav1d", package = "re_rav1d", version = "0.1.1" } -re_arrow2 = { git = "https://github.com/rerun-io/re_arrow2.git", branch = "emilk/more-arrow-compatibility" } # TODO : point to main branmch +re_arrow2 = { git = "https://github.com/rerun-io/re_arrow2.git", branch = "main" } diff --git a/crates/store/re_chunk/src/arrow_util.rs b/crates/store/re_chunk/src/arrow_util.rs index 46d4dca005eb..15ed476e7a51 100644 --- a/crates/store/re_chunk/src/arrow_util.rs +++ b/crates/store/re_chunk/src/arrow_util.rs @@ -218,7 +218,7 @@ pub fn new_list_array_of_empties(child_datatype: &DataType, len: usize) -> ListA pub fn concat_arrays(arrays: &[&dyn Array]) -> arrow::error::Result { #[allow(clippy::disallowed_methods)] // that's the whole point arrow::compute::concat(arrays) - // TODO: call .shrink_to_fit on the result + // TODO(#3741): call .shrink_to_fit on the result } /// Applies a [filter] kernel to the given `array`. From 21d76fd4f9e974ac13259f2a4bad831160e899e7 Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Thu, 9 Jan 2025 10:39:25 +0100 Subject: [PATCH 11/11] Update some tests --- .../store/re_chunk_store/tests/memory_test.rs | 2 +- crates/top/re_sdk/src/recording_stream.rs | 18 +++++++++--------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/crates/store/re_chunk_store/tests/memory_test.rs b/crates/store/re_chunk_store/tests/memory_test.rs index cbbcdfa1b0f9..9af9d5c04ca6 100644 --- a/crates/store/re_chunk_store/tests/memory_test.rs +++ b/crates/store/re_chunk_store/tests/memory_test.rs @@ -100,7 +100,7 @@ fn scalar_memory_overhead() { let entity_path = re_log_types::entity_path!("scalar"); let timepoint = TimePoint::default().with(Timeline::new("log_time", TimeType::Time), i as i64); - let scalars = Scalar::to_arrow2([Scalar::from(i as f64)]).unwrap(); + let scalars = Scalar::to_arrow([Scalar::from(i as f64)]).unwrap(); let row = PendingRow::new( timepoint, diff --git a/crates/top/re_sdk/src/recording_stream.rs b/crates/top/re_sdk/src/recording_stream.rs index 54956ff35f97..bfe2fb0fc3de 100644 --- a/crates/top/re_sdk/src/recording_stream.rs +++ b/crates/top/re_sdk/src/recording_stream.rs @@ -2556,7 +2556,7 @@ mod tests { components: [ ( MyPoint::descriptor(), - ::to_arrow2([ + ::to_arrow([ MyPoint::new(10.0, 10.0), MyPoint::new(20.0, 20.0), ]) @@ -2564,11 +2564,11 @@ mod tests { ), // ( MyColor::descriptor(), - ::to_arrow2([MyColor(0x8080_80FF)]).unwrap(), + ::to_arrow([MyColor(0x8080_80FF)]).unwrap(), ), // ( MyLabel::descriptor(), - ::to_arrow2([] as [MyLabel; 0]).unwrap(), + ::to_arrow([] as [MyLabel; 0]).unwrap(), ), // ] .into_iter() @@ -2583,15 +2583,15 @@ mod tests { components: [ ( MyPoint::descriptor(), - ::to_arrow2([] as [MyPoint; 0]).unwrap(), + ::to_arrow([] as [MyPoint; 0]).unwrap(), ), // ( MyColor::descriptor(), - ::to_arrow2([] as [MyColor; 0]).unwrap(), + ::to_arrow([] as [MyColor; 0]).unwrap(), ), // ( MyLabel::descriptor(), - ::to_arrow2([] as [MyLabel; 0]).unwrap(), + ::to_arrow([] as [MyLabel; 0]).unwrap(), ), // ] .into_iter() @@ -2606,15 +2606,15 @@ mod tests { components: [ ( MyPoint::descriptor(), - ::to_arrow2([] as [MyPoint; 0]).unwrap(), + ::to_arrow([] as [MyPoint; 0]).unwrap(), ), // ( MyColor::descriptor(), - ::to_arrow2([MyColor(0xFFFF_FFFF)]).unwrap(), + ::to_arrow([MyColor(0xFFFF_FFFF)]).unwrap(), ), // ( MyLabel::descriptor(), - ::to_arrow2([MyLabel("hey".into())]).unwrap(), + ::to_arrow([MyLabel("hey".into())]).unwrap(), ), // ] .into_iter()