Roundtrip it
emilk committed Feb 8, 2025
1 parent f1b5d46 commit 7e171a3
Showing 5 changed files with 158 additions and 13 deletions.
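
In short: this commit adds the reverse conversion Chunk::from_chunk_batch, plus the column accessors it needs on re_sorbet::ChunkBatch, and extends the transport test to round-trip a chunk through the sorbet/Arrow representation. A minimal sketch of the round-trip the updated test exercises (names taken from the diff below; the per-field assertions are elided):

    let chunk_batch = chunk_before.to_chunk_batch().unwrap();                  // Chunk -> re_sorbet::ChunkBatch
    let record_batch = ArrowRecordBatch::from(&chunk_batch);                   // ChunkBatch -> arrow RecordBatch
    let chunk_batch = re_sorbet::ChunkBatch::try_from(record_batch).unwrap();  // RecordBatch -> ChunkBatch
    let chunk_after = Chunk::from_chunk_batch(&chunk_batch).unwrap();          // ChunkBatch -> Chunk (new in this commit)
    assert_eq!(chunk_before.entity_path(), chunk_after.entity_path());
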
1 change: 1 addition & 0 deletions Cargo.lock
@@ -6589,6 +6589,7 @@ version = "0.23.0-alpha.1+dev"
dependencies = [
"arrow",
"itertools 0.13.0",
"re_arrow_util",
"re_format_arrow",
"re_log",
"re_log_types",
115 changes: 111 additions & 4 deletions crates/store/re_chunk/src/transport.rs
@@ -657,6 +657,102 @@ impl Chunk {
Ok(TransportChunk::from(record_batch))
}

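    /// Create a [`Chunk`] from a [`re_sorbet::ChunkBatch`].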
pub fn from_chunk_batch(batch: &re_sorbet::ChunkBatch) -> ChunkResult<Self> {
re_tracing::profile_function!(format!(
"num_columns={} num_rows={}",
batch.num_columns(),
batch.num_rows()
));

// Metadata
let (id, entity_path, is_sorted) = (
batch.chunk_id(),
batch.entity_path().clone(),
batch.is_sorted(),
);

let row_ids = batch.row_id_column().1.clone();

let timelines = {
re_tracing::profile_scope!("timelines");

let mut timelines = IntMap::default();

for (schema, column) in batch.index_columns() {
let timeline = schema.timeline();

let times =
TimeColumn::read_array(&as_array_ref(column.clone())).map_err(|err| {
ChunkError::Malformed {
reason: format!("Bad time column '{}': {err}", schema.name()),
}
})?;

let time_column =
TimeColumn::new(schema.is_sorted.then_some(true), timeline, times);
if timelines.insert(timeline, time_column).is_some() {
return Err(ChunkError::Malformed {
reason: format!(
"time column '{}' was specified more than once",
schema.name(),
),
});
}
}

timelines
};

let components = {
let mut components = ChunkComponents::default();

for (schema, column) in batch.data_columns() {
let column = column
.downcast_array_ref::<ArrowListArray>()
.ok_or_else(|| ChunkError::Malformed {
reason: format!(
"The outer array in a chunked component batch must be a sparse list, got {:?}",
column.data_type(),
),
})?;

let component_desc = ComponentDescriptor {
archetype_name: schema.archetype_name,
archetype_field_name: schema.archetype_field_name,
component_name: schema.component_name,
};

if components
.insert_descriptor(component_desc, column.clone())
.is_some()
{
return Err(ChunkError::Malformed {
reason: format!(
"component column '{schema:?}' was specified more than once"
),
});
}
}

components
};

let mut res = Self::new(
id,
entity_path,
is_sorted.then_some(true),
row_ids,
timelines,
components,
)?;

if let Some(heap_size_bytes) = batch.heap_size_bytes() {
res.heap_size_bytes = heap_size_bytes.into();
}

Ok(res)
}

pub fn from_record_batch(batch: ArrowRecordBatch) -> ChunkResult<Self> {
Self::from_transport(&TransportChunk::from(batch))
}
@@ -831,6 +927,7 @@ impl Chunk {
#[cfg(test)]
mod tests {
use nohash_hasher::IntMap;
use similar_asserts::assert_eq;

use re_arrow_util::arrow_util;
use re_log_types::{
@@ -981,12 +1078,22 @@ mod tests {
)
.unwrap();

let chunk_batch = chunk_before.to_chunk_batch().unwrap();
let chunk_batch_before = chunk_before.to_chunk_batch().unwrap();

assert_eq!(chunk_before.num_columns(), chunk_batch_before.num_columns());
assert_eq!(chunk_before.num_rows(), chunk_batch_before.num_rows());

assert_eq!(chunk_before.num_columns(), chunk_batch.num_columns());
assert_eq!(chunk_before.num_rows(), chunk_batch.num_rows());
let arrow_record_batch = ArrowRecordBatch::from(&chunk_batch_before);

let chunk_batch_after = re_sorbet::ChunkBatch::try_from(arrow_record_batch).unwrap();

assert_eq!(
chunk_batch_before.chunk_schema(),
chunk_batch_after.chunk_schema()
);
assert_eq!(chunk_batch_before.num_rows(), chunk_batch_after.num_rows());

let chunk_after = Chunk::from_record_batch(chunk_batch.into()).unwrap();
let chunk_after = Chunk::from_chunk_batch(&chunk_batch_after).unwrap();

assert_eq!(chunk_before.entity_path(), chunk_after.entity_path());
assert_eq!(
1 change: 1 addition & 0 deletions crates/store/re_sorbet/Cargo.toml
@@ -20,6 +20,7 @@ all-features = true


[dependencies]
re_arrow_util.workspace = true
re_format_arrow.workspace = true
re_log_types.workspace = true
re_log.workspace = true
39 changes: 37 additions & 2 deletions crates/store/re_sorbet/src/chunk_batch.rs
@@ -1,13 +1,17 @@
use arrow::{
array::{ArrayRef as ArrowArrayRef, RecordBatch as ArrowRecordBatch, RecordBatchOptions},
array::{
Array as _, ArrayRef as ArrowArrayRef, AsArray, RecordBatch as ArrowRecordBatch,
RecordBatchOptions, StructArray as ArrowStructArray,
},
datatypes::{Fields as ArrowFields, Schema as ArrowSchema},
};

use re_log_types::EntityPath;
use re_types_core::ChunkId;

use crate::{
chunk_schema::InvalidChunkSchema, ArrowBatchMetadata, ChunkSchema, WrongDatatypeError,
chunk_schema::InvalidChunkSchema, ArrowBatchMetadata, ChunkSchema, ComponentColumnDescriptor,
RowIdColumnDescriptor, TimeColumnDescriptor, WrongDatatypeError,
};

#[derive(thiserror::Error, Debug)]
@@ -144,6 +148,37 @@ impl ChunkBatch {
pub fn arrow_bacth_metadata(&self) -> &ArrowBatchMetadata {
&self.batch.schema_ref().metadata
}

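    /// The row-id column: always the first column of the batch.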
pub fn row_id_column(&self) -> (&RowIdColumnDescriptor, &ArrowStructArray) {
// The first column is always the row IDs.
(
&self.schema.row_id_column,
self.batch.columns()[0]
.as_struct_opt()
.expect("Row IDs should be encoded as struct"),
)
}

/// The columns of the indices (timelines).
pub fn index_columns(&self) -> impl Iterator<Item = (&TimeColumnDescriptor, &ArrowArrayRef)> {
itertools::izip!(
&self.schema.index_columns,
self.batch.columns().iter().skip(1) // skip row IDs
)
}

    /// The columns of the component data.
pub fn data_columns(
&self,
) -> impl Iterator<Item = (&ComponentColumnDescriptor, &ArrowArrayRef)> {
itertools::izip!(
&self.schema.data_columns,
self.batch
.columns()
.iter()
.skip(1 + self.schema.index_columns.len()) // skip row IDs and indices
)
}
}

impl std::fmt::Display for ChunkBatch {
15 changes: 8 additions & 7 deletions crates/store/re_sorbet/src/chunk_schema.rs
@@ -42,7 +42,7 @@ impl InvalidChunkSchema {
///
/// This does NOT preserve custom arrow metadata.
/// It only contains the metadata used by Rerun.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkSchema {
/// The globally unique ID of this chunk.
pub chunk_id: ChunkId,
@@ -240,7 +240,7 @@ impl TryFrom<&ArrowSchema> for ChunkSchema {
let row_id_column = RowIdColumnDescriptor::try_from(first_field.as_ref())
.map_err(InvalidChunkSchema::BadRowIdColumn)?;

let columns: Result<Vec<_>, _> = fields
let index_and_data_columns: Result<Vec<_>, _> = fields
.iter()
.skip(1)
.map(|field| {
@@ -252,27 +252,28 @@
})
})
.collect();
let columns = columns?;
let index_and_data_columns = index_and_data_columns?;

// Index columns should always come first:
let num_index_columns = columns.partition_point(|p| matches!(p, ColumnDescriptor::Time(_)));
let num_index_columns =
index_and_data_columns.partition_point(|p| matches!(p, ColumnDescriptor::Time(_)));

let index_columns = columns[0..num_index_columns]
let index_columns = index_and_data_columns[0..num_index_columns]
.iter()
.filter_map(|c| match c {
ColumnDescriptor::Time(column) => Some(column.clone()),
ColumnDescriptor::Component(_) => None,
})
.collect_vec();
let data_columns = columns[0..num_index_columns]
let data_columns = index_and_data_columns[num_index_columns..]
.iter()
.filter_map(|c| match c {
ColumnDescriptor::Time(_) => None,
ColumnDescriptor::Component(column) => Some(column.clone()),
})
.collect_vec();

if index_columns.len() + data_columns.len() < columns.len() {
if index_columns.len() + data_columns.len() != index_and_data_columns.len() {
return Err(InvalidChunkSchema::UnorderedIndexAndDataColumns);
}

