diff --git a/Cargo.lock b/Cargo.lock
index 8cb377b1e19fb..6ade1fe6ace97 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5830,6 +5830,7 @@ dependencies = [
  "re_format_arrow",
  "re_log",
  "re_log_types",
+ "re_sorbet",
  "re_tracing",
  "re_tuid",
  "re_types_core",
@@ -6587,6 +6588,7 @@ name = "re_sorbet"
 version = "0.23.0-alpha.1+dev"
 dependencies = [
  "arrow",
+ "itertools 0.13.0",
  "re_format_arrow",
  "re_log",
  "re_log_types",
diff --git a/crates/store/re_chunk/Cargo.toml b/crates/store/re_chunk/Cargo.toml
index e8b617cac5fe7..cc4c3873e0c4d 100644
--- a/crates/store/re_chunk/Cargo.toml
+++ b/crates/store/re_chunk/Cargo.toml
@@ -32,10 +32,11 @@ serde = ["re_log_types/serde", "re_tuid/serde", "re_types_core/serde"]
 re_arrow_util.workspace = true
 re_byte_size.workspace = true
 re_error.workspace = true
-re_format.workspace = true
 re_format_arrow.workspace = true
-re_log.workspace = true
+re_format.workspace = true
 re_log_types.workspace = true
+re_log.workspace = true
+re_sorbet.workspace = true
 re_tracing.workspace = true
 re_tuid.workspace = true
 re_types_core.workspace = true
diff --git a/crates/store/re_chunk/src/chunk.rs b/crates/store/re_chunk/src/chunk.rs
index 436914b460421..7d63e7e39f58d 100644
--- a/crates/store/re_chunk/src/chunk.rs
+++ b/crates/store/re_chunk/src/chunk.rs
@@ -45,6 +45,15 @@ pub enum ChunkError {
 
     #[error("Deserialization: {0}")]
     Deserialization(#[from] DeserializationError),
+
+    #[error(transparent)]
+    UnsupportedTimeType(#[from] re_sorbet::UnsupportedTimeType),
+
+    #[error(transparent)]
+    WrongDatatypeError(#[from] re_sorbet::WrongDatatypeError),
+
+    #[error(transparent)]
+    MismatchedChunkSchemaError(#[from] re_sorbet::MismatchedChunkSchemaError),
 }
 
 pub type ChunkResult<T> = Result<T, ChunkError>;
diff --git a/crates/store/re_chunk/src/transport.rs b/crates/store/re_chunk/src/transport.rs
index 19cd375a72e3b..8669e86d64ba8 100644
--- a/crates/store/re_chunk/src/transport.rs
+++ b/crates/store/re_chunk/src/transport.rs
@@ -390,6 +390,116 @@ impl TransportChunk {
     }
 }
 
+impl Chunk {
+    /// Prepare the [`Chunk`] for transport.
+    ///
+    /// It is probably a good idea to sort the chunk first.
+    pub fn to_chunk_batch(&self) -> ChunkResult<re_sorbet::ChunkBatch> {
+        self.sanity_check()?;
+
+        re_tracing::profile_function!(format!(
+            "num_columns={} num_rows={}",
+            self.num_columns(),
+            self.num_rows()
+        ));
+
+        let heap_size_bytes = self.heap_size_bytes();
+        let Self {
+            id,
+            entity_path,
+            heap_size_bytes: _, // use the method instead because of lazy initialization
+            is_sorted,
+            row_ids,
+            timelines,
+            components,
+        } = self;
+
+        let row_id_schema = re_sorbet::RowIdColumnDescriptor::try_from(RowId::arrow_datatype())?;
+
+        let (index_schemas, index_arrays): (Vec<_>, Vec<_>) = {
+            re_tracing::profile_scope!("timelines");
+
+            let mut timelines = timelines
+                .iter()
+                .map(|(timeline, info)| {
+                    let TimeColumn {
+                        timeline: _,
+                        times: _,
+                        is_sorted,
+                        time_range: _,
+                    } = info;
+
+                    let array = info.times_array();
+                    let schema = re_sorbet::TimeColumnDescriptor {
+                        timeline: *timeline,
+                        datatype: array.data_type().clone(),
+                        is_sorted: *is_sorted,
+                    };
+
+                    (schema, into_arrow_ref(array))
+                })
+                .collect_vec();
+
+            timelines.sort_by(|(schema_a, _), (schema_b, _)| schema_a.cmp(schema_b));
+
+            timelines.into_iter().unzip()
+        };
+
+        let (data_schemas, data_arrays): (Vec<_>, Vec<_>) = {
+            re_tracing::profile_scope!("components");
+
+            let mut components = components
+                .values()
+                .flat_map(|per_desc| per_desc.iter())
+                .map(|(component_desc, list_array)| {
+                    let list_array = ArrowListArray::from(list_array.clone());
+                    let ComponentDescriptor {
+                        archetype_name,
+                        archetype_field_name,
+                        component_name,
+                    } = *component_desc;
+
+                    let schema = re_sorbet::ComponentColumnDescriptor {
+                        store_datatype: list_array.data_type().clone(),
+                        entity_path: entity_path.clone(),
+
+                        archetype_name,
+                        archetype_field_name,
+                        component_name,
+
+                        is_static: false,             // TODO
+                        is_indicator: false,          // TODO
+                        is_tombstone: false,          // TODO
+                        is_semantically_empty: false, // TODO
+                    };
+                    (schema, into_arrow_ref(list_array))
+                })
+                .collect_vec();
+
+            components.sort_by(|(schema_a, _), (schema_b, _)| schema_a.cmp(schema_b));
+
+            components.into_iter().unzip()
+        };
+
+        let schema = re_sorbet::ChunkSchema::new(
+            *id,
+            entity_path.clone(),
+            row_id_schema,
+            index_schemas,
+            data_schemas,
+        )
+        .with_heap_size_bytes(heap_size_bytes)
+        .with_sorted(*is_sorted);
+
+        Ok(re_sorbet::ChunkBatch::try_new(
+            schema,
+            into_arrow_ref(row_ids.clone()),
+            index_arrays,
+            data_arrays,
+        )?)
+    }
+}
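+
+// A minimal sketch of the intended round-trip (hypothetical usage; `chunk` is
+// any valid, ideally pre-sorted `Chunk`), mirroring the new test added below:
+//
+//     let chunk_batch = chunk.to_chunk_batch()?;
+//     let chunk_after = Chunk::from_record_batch(chunk_batch.into())?;
+//     assert!(chunk.are_equal(&chunk_after));
+
 impl Chunk {
     /// Prepare the [`Chunk`] for transport.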
     ///
@@ -735,7 +845,7 @@ mod tests {
         let entity_path = EntityPath::parse_forgiving("a/b/c");
 
         let timeline1 = Timeline::new_temporal("log_time");
-        let timelines1 = std::iter::once((
+        let timelines1: IntMap<_, _> = std::iter::once((
             timeline1,
             TimeColumn::new(Some(true), timeline1, vec![42, 43, 44, 45].into()),
         ))
@@ -788,7 +898,7 @@
         let row_ids = vec![RowId::new(), RowId::new(), RowId::new(), RowId::new()];
 
-        for timelines in [timelines1, timelines2] {
+        for timelines in [timelines1.clone(), timelines2.clone()] {
             let chunk_original = Chunk::from_native_row_ids(
                 ChunkId::new(),
                 entity_path.clone(),
@@ -860,6 +970,35 @@
             }
         }
 
+        for timelines in [timelines1, timelines2] {
+            let chunk_before = Chunk::from_native_row_ids(
+                ChunkId::new(),
+                entity_path.clone(),
+                None,
+                &row_ids,
+                timelines.clone(),
+                components.clone().into_iter().collect(),
+            )
+            .unwrap();
+
+            let chunk_batch = chunk_before.to_chunk_batch().unwrap();
+
+            assert_eq!(chunk_before.num_columns(), chunk_batch.num_columns());
+            assert_eq!(chunk_before.num_rows(), chunk_batch.num_rows());
+
+            let chunk_after = Chunk::from_record_batch(chunk_batch.into()).unwrap();
+
+            assert_eq!(chunk_before.entity_path(), chunk_after.entity_path());
+            assert_eq!(
+                chunk_before.heap_size_bytes(),
+                chunk_after.heap_size_bytes(),
+            );
+            assert_eq!(chunk_before.num_columns(), chunk_after.num_columns());
+            assert_eq!(chunk_before.num_rows(), chunk_after.num_rows());
+            assert!(chunk_before.are_equal(&chunk_after));
+            assert_eq!(chunk_before, chunk_after);
+        }
+
         Ok(())
     }
 }
diff --git a/crates/store/re_sorbet/Cargo.toml b/crates/store/re_sorbet/Cargo.toml
index 42d50270c9151..929fcfa20188d 100644
--- a/crates/store/re_sorbet/Cargo.toml
+++ b/crates/store/re_sorbet/Cargo.toml
@@ -27,6 +27,7 @@ re_tuid.workspace = true
 re_types_core.workspace = true
 
 arrow.workspace = true
+itertools.workspace = true
 thiserror.workspace = true
 
 # Keep this crate simple, with few dependencies.
diff --git a/crates/store/re_sorbet/src/chunk_batch.rs b/crates/store/re_sorbet/src/chunk_batch.rs
index 24fd116b53641..e6cc158a6fad7 100644
--- a/crates/store/re_sorbet/src/chunk_batch.rs
+++ b/crates/store/re_sorbet/src/chunk_batch.rs
@@ -1,12 +1,29 @@
 use arrow::{
-    array::RecordBatch as ArrowRecordBatch,
+    array::{ArrayRef as ArrowArrayRef, RecordBatch as ArrowRecordBatch, RecordBatchOptions},
     datatypes::{Fields as ArrowFields, Schema as ArrowSchema},
 };
 
 use re_log_types::EntityPath;
 use re_types_core::ChunkId;
 
-use crate::{chunk_schema::InvalidChunkSchema, ArrowBatchMetadata, ChunkSchema};
+use crate::{
+    chunk_schema::InvalidChunkSchema, ArrowBatchMetadata, ChunkSchema, WrongDatatypeError,
+};
+
+#[derive(thiserror::Error, Debug)]
+pub enum MismatchedChunkSchemaError {
+    #[error("{0}")]
+    Custom(String),
+
+    #[error(transparent)]
+    WrongDatatypeError(#[from] WrongDatatypeError),
+}
+
+impl MismatchedChunkSchemaError {
+    pub fn custom(s: impl Into<String>) -> Self {
+        Self::Custom(s.into())
+    }
+}
 
 /// The arrow [`ArrowRecordBatch`] representation of a Rerun chunk.
 ///
@@ -17,9 +34,76 @@ use crate::{chunk_schema::InvalidChunkSchema, ArrowBatchMetadata, ChunkSchema};
 #[derive(Debug, Clone)]
 pub struct ChunkBatch {
     schema: ChunkSchema,
+
+    // TODO: should we store a record batch here, or just the parsed columns?
     batch: ArrowRecordBatch,
 }
 
+impl ChunkBatch {
+    pub fn try_new(
+        schema: ChunkSchema,
+        row_ids: ArrowArrayRef,
+        index_arrays: Vec<ArrowArrayRef>,
+        data_arrays: Vec<ArrowArrayRef>,
+    ) -> Result<Self, MismatchedChunkSchemaError> {
+        let row_count = row_ids.len();
+
+        WrongDatatypeError::compare_expected_actual(
+            &schema.row_id_column.datatype(),
+            row_ids.data_type(),
+        )?;
+
+        if index_arrays.len() != schema.index_columns.len() {
+            return Err(MismatchedChunkSchemaError::custom(format!(
+                "Schema had {} index columns, but got {}",
+                schema.index_columns.len(),
+                index_arrays.len()
+            )));
+        }
+        for (schema, array) in itertools::izip!(&schema.index_columns, &index_arrays) {
+            WrongDatatypeError::compare_expected_actual(schema.datatype(), array.data_type())?;
+            if array.len() != row_count {
+                return Err(MismatchedChunkSchemaError::custom(format!(
+                    "Index column {:?} had {} rows, but we got {} row IDs",
+                    schema.name(),
+                    array.len(),
+                    row_count
+                )));
+            }
+        }
+
+        if data_arrays.len() != schema.data_columns.len() {
+            return Err(MismatchedChunkSchemaError::custom(format!(
+                "Schema had {} data columns, but got {}",
+                schema.data_columns.len(),
+                data_arrays.len()
+            )));
+        }
+        for (schema, array) in itertools::izip!(&schema.data_columns, &data_arrays) {
+            WrongDatatypeError::compare_expected_actual(&schema.store_datatype, array.data_type())?;
+            if array.len() != row_count {
+                return Err(MismatchedChunkSchemaError::custom(format!(
+                    "Data column {:?} had {} rows, but we got {} row IDs",
+                    schema.column_name(),
+                    array.len(),
+                    row_count
+                )));
+            }
+        }
+
+        let arrow_columns = itertools::chain!(Some(row_ids), index_arrays, data_arrays).collect();
+
+        let batch = ArrowRecordBatch::try_new_with_options(
+            std::sync::Arc::new(ArrowSchema::from(&schema)),
+            arrow_columns,
+            &RecordBatchOptions::default().with_row_count(Some(row_count)),
+        )
+        .unwrap();
+
+        Ok(Self { schema, batch })
+    }
+}
+
 impl ChunkBatch {
     /// The parsed rerun schema of this chunk.
     #[inline]
diff --git a/crates/store/re_sorbet/src/chunk_schema.rs b/crates/store/re_sorbet/src/chunk_schema.rs
index a10b40dcb6cd6..b80784621244a 100644
--- a/crates/store/re_sorbet/src/chunk_schema.rs
+++ b/crates/store/re_sorbet/src/chunk_schema.rs
@@ -1,11 +1,12 @@
 use arrow::datatypes::{Field as ArrowField, Schema as ArrowSchema};
+use itertools::Itertools as _;
 
 use re_log_types::EntityPath;
 use re_types_core::ChunkId;
 
 use crate::{
-    ArrowBatchMetadata, ComponentColumnDescriptor, MetadataExt as _, MissingFieldMetadata,
-    MissingMetadataKey, RowIdColumnDescriptor, WrongDatatypeError,
+    ArrowBatchMetadata, ColumnDescriptor, ColumnError, ComponentColumnDescriptor, MetadataExt as _,
+    MissingMetadataKey, RowIdColumnDescriptor, TimeColumnDescriptor, WrongDatatypeError,
 };
 
 #[derive(thiserror::Error, Debug)]
@@ -19,11 +20,14 @@ pub enum InvalidChunkSchema {
     #[error("Bad column '{field_name}': {error}")]
     BadColumn {
         field_name: String,
-        error: MissingFieldMetadata,
+        error: ColumnError,
     },
 
     #[error("Bad chunk schema: {reason}")]
     Custom { reason: String },
+
+    #[error("The data columns were not the last columns. Index columns must come before any data columns.")]
+    UnorderedIndexAndDataColumns,
 }
 
 impl InvalidChunkSchema {
@@ -41,22 +45,25 @@
 #[derive(Debug, Clone)]
 pub struct ChunkSchema {
     /// The globally unique ID of this chunk.
-    chunk_id: ChunkId,
+    pub chunk_id: ChunkId,
 
     /// Which entity is this chunk for?
-    entity_path: EntityPath,
+    pub entity_path: EntityPath,
 
     /// The heap size of this chunk in bytes, if known.
-    heap_size_bytes: Option<u64>,
+    pub heap_size_bytes: Option<u64>,
 
     /// Are we sorted by the row id column?
-    is_sorted: bool,
+    pub is_sorted: bool,
 
     /// The primary row id column.
-    row_id_column: RowIdColumnDescriptor,
+    pub row_id_column: RowIdColumnDescriptor,
+
+    /// Index columns (timelines).
+    pub index_columns: Vec<TimeColumnDescriptor>,
 
-    /// All other columns (indices and data).
-    columns: Vec<ComponentColumnDescriptor>,
+    /// The actual component data.
+    pub data_columns: Vec<ComponentColumnDescriptor>,
 }
 
 /// ## Metadata keys for the record batch metadata
@@ -68,35 +75,72 @@ impl ChunkSchema {
     const CHUNK_METADATA_VERSION: &'static str = "1";
 }
 
+/// ## Builders
 impl ChunkSchema {
-    /// The globally unique ID of this chunk.
+    pub fn new(
+        chunk_id: ChunkId,
+        entity_path: EntityPath,
+        row_id_column: RowIdColumnDescriptor,
+        index_columns: Vec<TimeColumnDescriptor>,
+        data_columns: Vec<ComponentColumnDescriptor>,
+    ) -> Self {
+        Self {
+            chunk_id,
+            entity_path,
+            heap_size_bytes: None,
+            is_sorted: false, // assume the worst
+            row_id_column,
+            index_columns,
+            data_columns,
+        }
+    }
+
+    #[inline]
+    pub fn with_heap_size_bytes(mut self, heap_size_bytes: u64) -> Self {
+        self.heap_size_bytes = Some(heap_size_bytes);
+        self
+    }
+
+    #[inline]
+    pub fn with_sorted(mut self, sorted: bool) -> Self {
+        self.is_sorted = sorted;
+        self
+    }
+}
+
+/// ## Accessors
+impl ChunkSchema {
+    /// The globally unique ID of this chunk.
     #[inline]
     pub fn chunk_id(&self) -> ChunkId {
         self.chunk_id
     }
 
     /// Which entity is this chunk for?
-    #[inline]
     pub fn entity_path(&self) -> &EntityPath {
         &self.entity_path
     }
 
     /// The heap size of this chunk in bytes, if known.
-    #[inline]
     pub fn heap_size_bytes(&self) -> Option<u64> {
         self.heap_size_bytes
     }
 
     /// Are we sorted by the row id column?
-    #[inline]
     pub fn is_sorted(&self) -> bool {
         self.is_sorted
     }
 
+    /// Total number of columns in this chunk,
+    /// including the row id column, the index columns,
+    /// and the data columns.
+    pub fn num_columns(&self) -> usize {
+        1 + self.index_columns.len() + self.data_columns.len()
+    }
+
     pub fn arrow_batch_metadata(&self) -> ArrowBatchMetadata {
         let Self {
             chunk_id,
@@ -104,7 +148,8 @@
             heap_size_bytes,
             is_sorted,
             row_id_column: _,
-            columns: _,
+            index_columns: _,
+            data_columns: _,
         } = self;
 
         let mut arrow_metadata = ArrowBatchMetadata::from([
@@ -129,18 +174,22 @@
     }
 }
 
-impl From<ChunkSchema> for ArrowSchema {
-    fn from(chunk_schema: ChunkSchema) -> Self {
+impl From<&ChunkSchema> for ArrowSchema {
+    fn from(chunk_schema: &ChunkSchema) -> Self {
         let metadata = chunk_schema.arrow_batch_metadata();
+        let num_columns = chunk_schema.num_columns();
 
         let ChunkSchema {
             row_id_column,
-            columns,
+            index_columns,
+            data_columns,
             ..
         } = chunk_schema;
 
-        let mut fields: Vec<ArrowField> = Vec::with_capacity(1 + columns.len());
+
+        let mut fields: Vec<ArrowField> = Vec::with_capacity(num_columns);
         fields.push(row_id_column.to_arrow_field());
-        fields.extend(columns.iter().map(|column| column.to_arrow_field()));
+        fields.extend(index_columns.iter().map(|column| column.to_arrow_field()));
+        fields.extend(data_columns.iter().map(|column| column.to_arrow_field()));
 
         Self {
             metadata,
@@ -195,7 +244,7 @@ impl TryFrom<&ArrowSchema> for ChunkSchema {
             .iter()
             .skip(1)
             .map(|field| {
-                ComponentColumnDescriptor::try_from(field.as_ref()).map_err(|err| {
+                ColumnDescriptor::try_from(field.as_ref()).map_err(|err| {
                     InvalidChunkSchema::BadColumn {
                         field_name: field.name().to_owned(),
                         error: err,
@@ -205,13 +254,36 @@
             .collect();
         let columns = columns?;
 
+        // Index columns should always come first:
+        let num_index_columns = columns.partition_point(|p| matches!(p, ColumnDescriptor::Time(_)));
+
+        let index_columns = columns[0..num_index_columns]
+            .iter()
+            .filter_map(|c| match c {
+                ColumnDescriptor::Time(column) => Some(column.clone()),
+                ColumnDescriptor::Component(_) => None,
+            })
+            .collect_vec();
+        let data_columns = columns[num_index_columns..]
+            .iter()
+            .filter_map(|c| match c {
+                ColumnDescriptor::Time(_) => None,
+                ColumnDescriptor::Component(column) => Some(column.clone()),
+            })
+            .collect_vec();
+
+        if index_columns.len() + data_columns.len() < columns.len() {
+            return Err(InvalidChunkSchema::UnorderedIndexAndDataColumns);
+        }
+
         Ok(Self {
             chunk_id,
             entity_path,
             heap_size_bytes,
             is_sorted,
             row_id_column,
-            columns,
+            index_columns,
+            data_columns,
         })
     }
 }
diff --git a/crates/store/re_sorbet/src/data_column_schema.rs b/crates/store/re_sorbet/src/data_column_schema.rs
index 3c24c9c614948..26659ec158f66 100644
--- a/crates/store/re_sorbet/src/data_column_schema.rs
+++ b/crates/store/re_sorbet/src/data_column_schema.rs
@@ -196,8 +196,7 @@ impl ComponentColumnDescriptor {
         self.store_datatype.clone()
     }
 
-    #[inline]
-    pub fn to_arrow_field(&self) -> ArrowField {
+    pub fn column_name(&self) -> String {
         let entity_path = &self.entity_path;
         let descriptor = ComponentDescriptor {
             archetype_name: self.archetype_name,
@@ -205,16 +204,19 @@
             component_name: self.component_name,
         };
 
-        ArrowField::new(
-            // NOTE: Uncomment this to expose fully-qualified names in the Dataframe APIs!
-            // I'm not doing that right now, to avoid breaking changes (and we need to talk about
-            // what the syntax for these fully-qualified paths need to look like first).
-            format!("{}:{}", entity_path, descriptor.component_name.short_name()),
-            // format!("{entity_path}@{}", descriptor.short_name()),
-            self.returned_datatype(),
-            true, /* nullable */
-        )
-        .with_metadata(self.metadata())
+        format!("{}:{}", entity_path, descriptor.component_name.short_name())
+
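+        // For example (hypothetical values, for illustration): an entity path
+        // "world/points" with component "rerun.components.Position3D" yields
+        // the column name "world/points:Position3D".
+
+        // NOTE: Uncomment this to expose fully-qualified names in the Dataframe APIs!
+        // I'm not doing that right now, to avoid breaking changes (and we need to talk about
+        // what the syntax for these fully-qualified paths need to look like first).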
+        // format!("{entity_path}@{}", descriptor.short_name())
+    }
+
+    #[inline]
+    pub fn to_arrow_field(&self) -> ArrowField {
+        let nullable = true;
+        ArrowField::new(self.column_name(), self.returned_datatype(), nullable)
+            .with_metadata(self.metadata())
+    }
 }
diff --git a/crates/store/re_sorbet/src/index_column_schema.rs b/crates/store/re_sorbet/src/index_column_schema.rs
index c782edf0ecaf4..29ca0db2a3c1b 100644
--- a/crates/store/re_sorbet/src/index_column_schema.rs
+++ b/crates/store/re_sorbet/src/index_column_schema.rs
@@ -1,6 +1,9 @@
 use arrow::datatypes::{DataType as ArrowDatatype, Field as ArrowField};
+
 use re_log_types::{Timeline, TimelineName};
 
+use crate::MetadataExt as _;
+
 #[derive(thiserror::Error, Debug)]
 #[error("Unsupported time type: {datatype:?}")]
 pub struct UnsupportedTimeType {
@@ -15,6 +18,9 @@ pub struct TimeColumnDescriptor {
 
     /// The Arrow datatype of the column.
     pub datatype: ArrowDatatype,
+
+    /// Are the indices in this column sorted?
+    pub is_sorted: bool,
 }
 
 impl PartialOrd for TimeColumnDescriptor {
@@ -30,6 +36,7 @@ impl Ord for TimeColumnDescriptor {
         let Self {
             timeline,
             datatype: _,
+            is_sorted: _,
         } = self;
         timeline.cmp(&other.timeline)
     }
@@ -44,6 +51,7 @@ impl TimeColumnDescriptor {
             // It doesn't matter, only the name will remain in the Arrow schema anyhow.
             timeline: Timeline::new_sequence(name),
             datatype: ArrowDatatype::Null,
+            is_sorted: true,
         }
     }
 
@@ -69,17 +77,21 @@ impl TimeColumnDescriptor {
     #[inline]
     pub fn to_arrow_field(&self) -> ArrowField {
-        let Self { timeline, datatype } = self;
+        let Self {
+            timeline,
+            datatype,
+            is_sorted,
+        } = self;
 
         let nullable = true; // Time column must be nullable since static data doesn't have a time.
 
-        let metadata = [
-            Some(("rerun.kind".to_owned(), "index".to_owned())),
-            Some(("rerun.index_name".to_owned(), timeline.name().to_string())),
-        ]
-        .into_iter()
-        .flatten()
-        .collect();
+        let mut metadata = std::collections::HashMap::from([
+            ("rerun.kind".to_owned(), "index".to_owned()),
+            ("rerun.index_name".to_owned(), timeline.name().to_string()),
+        ]);
+        if *is_sorted {
+            metadata.insert("rerun.is_sorted".to_owned(), "true".to_owned());
+        }
 
         ArrowField::new(timeline.name().to_string(), datatype.clone(), nullable)
             .with_metadata(metadata)
@@ -91,6 +103,7 @@ impl From<Timeline> for TimeColumnDescriptor {
         Self {
             timeline,
             datatype: timeline.datatype(),
+            is_sorted: false, // assume the worst
         }
     }
 }
@@ -114,6 +127,10 @@ impl TryFrom<&ArrowField> for TimeColumnDescriptor {
 
         let timeline = Timeline::new(name, time_type);
 
-        Ok(Self { timeline, datatype })
+        Ok(Self {
+            timeline,
+            datatype,
+            is_sorted: field.metadata().get_bool("rerun.is_sorted"),
+        })
     }
 }
diff --git a/crates/store/re_sorbet/src/lib.rs b/crates/store/re_sorbet/src/lib.rs
index 9544b32d6afe6..fb1c754d96041 100644
--- a/crates/store/re_sorbet/src/lib.rs
+++ b/crates/store/re_sorbet/src/lib.rs
@@ -12,7 +12,7 @@ mod metadata;
 mod row_id_column_schema;
 
 pub use self::{
-    chunk_batch::ChunkBatch,
+    chunk_batch::{ChunkBatch, MismatchedChunkSchemaError},
     chunk_schema::ChunkSchema,
     column_schema::{ColumnDescriptor, ColumnError},
     data_column_schema::ComponentColumnDescriptor,
diff --git a/crates/store/re_sorbet/src/row_id_column_schema.rs b/crates/store/re_sorbet/src/row_id_column_schema.rs
index 3f8746434704d..fae2a3b84f119 100644
--- a/crates/store/re_sorbet/src/row_id_column_schema.rs
+++ b/crates/store/re_sorbet/src/row_id_column_schema.rs
@@ -8,6 +8,22 @@ pub struct WrongDatatypeError {
     pub actual: ArrowDatatype,
 }
 
+impl WrongDatatypeError {
+    pub fn compare_expected_actual(
+        expected: &ArrowDatatype,
+        actual: &ArrowDatatype,
+    ) -> Result<(), Self> {
+        if expected == actual {
+            Ok(())
+        } else {
+            Err(Self {
+                expected: expected.clone(),
+                actual: actual.clone(),
+            })
+        }
+    }
+}
+
 /// Describes the [`RowId`]
 #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
 pub struct RowIdColumnDescriptor {}
@@ -50,13 +66,24 @@ impl TryFrom<&ArrowField> for RowIdColumnDescriptor {
 
     fn try_from(field: &ArrowField) -> Result<Self, Self::Error> {
         // TODO: check `rerun.kind`
-        if field.data_type() == &RowId::arrow_datatype() {
-            Ok(Self {})
-        } else {
-            Err(WrongDatatypeError {
-                expected: RowId::arrow_datatype(),
-                actual: field.data_type().clone(),
-            })
-        }
+        Self::try_from(field.data_type())
+    }
+}
+
+impl TryFrom<&ArrowDatatype> for RowIdColumnDescriptor {
+    type Error = WrongDatatypeError;
+
+    fn try_from(data_type: &ArrowDatatype) -> Result<Self, Self::Error> {
+        WrongDatatypeError::compare_expected_actual(&RowId::arrow_datatype(), data_type)?;
+        Ok(Self {})
+    }
+}
+
+impl TryFrom<ArrowDatatype> for RowIdColumnDescriptor {
+    type Error = WrongDatatypeError;
+
+    fn try_from(data_type: ArrowDatatype) -> Result<Self, Self::Error> {
+        WrongDatatypeError::compare_expected_actual(&RowId::arrow_datatype(), &data_type)?;
+        Ok(Self {})
+    }
 }
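
For illustration, a minimal sketch of how the new conversions compose (hypothetical usage, assuming `RowId`, `RowIdColumnDescriptor`, and `ArrowDatatype` are in scope):

    // Succeeds: this is exactly the datatype the row id column expects.
    assert!(RowIdColumnDescriptor::try_from(RowId::arrow_datatype()).is_ok());

    // Fails: any other datatype is rejected with a `WrongDatatypeError`
    // carrying both the expected and the actual datatype.
    let err = RowIdColumnDescriptor::try_from(ArrowDatatype::Int64).unwrap_err();
    assert_eq!(err.expected, RowId::arrow_datatype());
    assert_eq!(err.actual, ArrowDatatype::Int64);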