Convert Chunk to ChunkBatch
emilk committed Feb 7, 2025
1 parent f651a64 commit f1b5d46
Showing 11 changed files with 412 additions and 58 deletions.
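
For orientation, a minimal sketch of the round trip this commit enables, based on the `to_chunk_batch` and `Chunk::from_record_batch` calls exercised by the new test in transport.rs (exact signatures and error types are approximate):

fn roundtrip(chunk: &re_chunk::Chunk) -> re_chunk::ChunkResult<re_chunk::Chunk> {
    // Convert the native chunk into a schema-checked re_sorbet::ChunkBatch …
    let chunk_batch = chunk.to_chunk_batch()?;
    // … which converts into a plain Arrow record batch for transport …
    let record_batch: arrow::array::RecordBatch = chunk_batch.into();
    // … and can be parsed back into a Chunk on the receiving side.
    re_chunk::Chunk::from_record_batch(record_batch)
}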
2 changes: 2 additions & 0 deletions Cargo.lock
@@ -5830,6 +5830,7 @@ dependencies = [
"re_format_arrow",
"re_log",
"re_log_types",
"re_sorbet",
"re_tracing",
"re_tuid",
"re_types_core",
@@ -6587,6 +6588,7 @@ name = "re_sorbet"
version = "0.23.0-alpha.1+dev"
dependencies = [
"arrow",
"itertools 0.13.0",
"re_format_arrow",
"re_log",
"re_log_types",
5 changes: 3 additions & 2 deletions crates/store/re_chunk/Cargo.toml
@@ -32,10 +32,11 @@ serde = ["re_log_types/serde", "re_tuid/serde", "re_types_core/serde"]
re_arrow_util.workspace = true
re_byte_size.workspace = true
re_error.workspace = true
re_format.workspace = true
re_format_arrow.workspace = true
re_log.workspace = true
re_format.workspace = true
re_log_types.workspace = true
re_log.workspace = true
re_sorbet.workspace = true
re_tracing.workspace = true
re_tuid.workspace = true
re_types_core.workspace = true
9 changes: 9 additions & 0 deletions crates/store/re_chunk/src/chunk.rs
@@ -45,6 +45,15 @@ pub enum ChunkError {

#[error("Deserialization: {0}")]
Deserialization(#[from] DeserializationError),

#[error(transparent)]
UnsupportedTimeType(#[from] re_sorbet::UnsupportedTimeType),

#[error(transparent)]
WrongDatatypeError(#[from] re_sorbet::WrongDatatypeError),

#[error(transparent)]
MismatchedChunkSchemaError(#[from] re_sorbet::MismatchedChunkSchemaError),
}

pub type ChunkResult<T> = Result<T, ChunkError>;
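
These transparent `#[from]` variants keep the underlying re_sorbet errors intact, so `?` can convert them into `ChunkError` inside re_chunk while callers can still match on the original error. A hypothetical illustration (not part of this commit):

fn describe_failure(err: ChunkError) -> String {
    // The transparent variants added above wrap the re_sorbet error unchanged,
    // so it can be inspected or re-formatted by the caller.
    match err {
        ChunkError::WrongDatatypeError(err) => format!("datatype mismatch: {err}"),
        ChunkError::MismatchedChunkSchemaError(err) => format!("schema mismatch: {err}"),
        other => other.to_string(),
    }
}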
143 changes: 141 additions & 2 deletions crates/store/re_chunk/src/transport.rs
@@ -390,6 +390,116 @@ impl TransportChunk {
}
}

impl Chunk {
/// Prepare the [`Chunk`] for transport.
///
/// It is probably a good idea to sort the chunk first.
pub fn to_chunk_batch(&self) -> ChunkResult<re_sorbet::ChunkBatch> {
self.sanity_check()?;

re_tracing::profile_function!(format!(
"num_columns={} num_rows={}",
self.num_columns(),
self.num_rows()
));

let heap_size_bytes = self.heap_size_bytes();
let Self {
id,
entity_path,
heap_size_bytes: _, // use the method instead because of lazy initialization
is_sorted,
row_ids,
timelines,
components,
} = self;

let row_id_schema = re_sorbet::RowIdColumnDescriptor::try_from(RowId::arrow_datatype())?;

let (index_schemas, index_arrays): (Vec<_>, Vec<_>) = {
re_tracing::profile_scope!("timelines");

let mut timelines = timelines
.iter()
.map(|(timeline, info)| {
let TimeColumn {
timeline: _,
times: _,
is_sorted,
time_range: _,
} = info;

let array = info.times_array();
let schema = re_sorbet::TimeColumnDescriptor {
timeline: *timeline,
datatype: array.data_type().clone(),
is_sorted: *is_sorted,
};

(schema, into_arrow_ref(array))
})
.collect_vec();

timelines.sort_by(|(schema_a, _), (schema_b, _)| schema_a.cmp(schema_b));

timelines.into_iter().unzip()
};

let (data_schemas, data_arrays): (Vec<_>, Vec<_>) = {
re_tracing::profile_scope!("components");

let mut components = components
.values()
.flat_map(|per_desc| per_desc.iter())
.map(|(component_desc, list_array)| {
let list_array = ArrowListArray::from(list_array.clone());
let ComponentDescriptor {
archetype_name,
archetype_field_name,
component_name,
} = *component_desc;

let schema = re_sorbet::ComponentColumnDescriptor {
store_datatype: list_array.data_type().clone(),
entity_path: entity_path.clone(),

archetype_name,
archetype_field_name,
component_name,

is_static: false, // TODO
is_indicator: false, // TODO
is_tombstone: false, // TODO
is_semantically_empty: false, // TODO
};
(schema, into_arrow_ref(list_array))
})
.collect_vec();

components.sort_by(|(schema_a, _), (schema_b, _)| schema_a.cmp(schema_b));

components.into_iter().unzip()
};

let schema = re_sorbet::ChunkSchema::new(
*id,
entity_path.clone(),
row_id_schema,
index_schemas,
data_schemas,
)
.with_heap_size_bytes(heap_size_bytes)
.with_sorted(*is_sorted);

Ok(re_sorbet::ChunkBatch::try_new(
schema,
into_arrow_ref(row_ids.clone()),
index_arrays,
data_arrays,
)?)
}
}

impl Chunk {
/// Prepare the [`Chunk`] for transport.
///
@@ -735,7 +845,7 @@ mod tests {
let entity_path = EntityPath::parse_forgiving("a/b/c");

let timeline1 = Timeline::new_temporal("log_time");
let timelines1 = std::iter::once((
let timelines1: IntMap<_, _> = std::iter::once((
timeline1,
TimeColumn::new(Some(true), timeline1, vec![42, 43, 44, 45].into()),
))
@@ -788,7 +898,7 @@

let row_ids = vec![RowId::new(), RowId::new(), RowId::new(), RowId::new()];

for timelines in [timelines1, timelines2] {
for timelines in [timelines1.clone(), timelines2.clone()] {
let chunk_original = Chunk::from_native_row_ids(
ChunkId::new(),
entity_path.clone(),
@@ -860,6 +970,35 @@
}
}

for timelines in [timelines1, timelines2] {
let chunk_before = Chunk::from_native_row_ids(
ChunkId::new(),
entity_path.clone(),
None,
&row_ids,
timelines.clone(),
components.clone().into_iter().collect(),
)
.unwrap();

let chunk_batch = chunk_before.to_chunk_batch().unwrap();

assert_eq!(chunk_before.num_columns(), chunk_batch.num_columns());
assert_eq!(chunk_before.num_rows(), chunk_batch.num_rows());

let chunk_after = Chunk::from_record_batch(chunk_batch.into()).unwrap();

assert_eq!(chunk_before.entity_path(), chunk_after.entity_path());
assert_eq!(
chunk_before.heap_size_bytes(),
chunk_after.heap_size_bytes(),
);
assert_eq!(chunk_before.num_columns(), chunk_after.num_columns());
assert_eq!(chunk_before.num_rows(), chunk_after.num_rows());
assert!(chunk_before.are_equal(&chunk_after));
assert_eq!(chunk_before, chunk_after);
}

Ok(())
}
}
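
Because `to_chunk_batch` sorts both the index and the component descriptors before assembling the schema, the resulting batch has a deterministic column layout: the row-ID column first, then the index (time) columns, then the data columns. A hedged sketch for inspecting that layout, assuming `ChunkBatch` converts into an Arrow `RecordBatch` as in the test above:

fn print_columns(chunk: &Chunk) -> ChunkResult<()> {
    let record_batch: arrow::array::RecordBatch = chunk.to_chunk_batch()?.into();
    for field in record_batch.schema().fields() {
        // Expected order: row-ID column, then time/index columns, then component
        // columns, with each group sorted by its column descriptor.
        println!("{}: {:?}", field.name(), field.data_type());
    }
    Ok(())
}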
1 change: 1 addition & 0 deletions crates/store/re_sorbet/Cargo.toml
@@ -27,6 +27,7 @@ re_tuid.workspace = true
re_types_core.workspace = true

arrow.workspace = true
itertools.workspace = true
thiserror.workspace = true

# Keep this crate simple, with few dependencies.
88 changes: 86 additions & 2 deletions crates/store/re_sorbet/src/chunk_batch.rs
@@ -1,12 +1,29 @@
use arrow::{
array::RecordBatch as ArrowRecordBatch,
array::{ArrayRef as ArrowArrayRef, RecordBatch as ArrowRecordBatch, RecordBatchOptions},
datatypes::{Fields as ArrowFields, Schema as ArrowSchema},
};

use re_log_types::EntityPath;
use re_types_core::ChunkId;

use crate::{chunk_schema::InvalidChunkSchema, ArrowBatchMetadata, ChunkSchema};
use crate::{
chunk_schema::InvalidChunkSchema, ArrowBatchMetadata, ChunkSchema, WrongDatatypeError,
};

#[derive(thiserror::Error, Debug)]
pub enum MismatchedChunkSchemaError {
#[error("{0}")]
Custom(String),

#[error(transparent)]
WrongDatatypeError(#[from] WrongDatatypeError),
}

impl MismatchedChunkSchemaError {
pub fn custom(s: impl Into<String>) -> Self {
Self::Custom(s.into())
}
}

/// The arrow [`ArrowRecordBatch`] representation of a Rerun chunk.
///
@@ -17,9 +34,76 @@ use crate::{chunk_schema::InvalidChunkSchema, ArrowBatchMetadata, ChunkSchema};
#[derive(Debug, Clone)]
pub struct ChunkBatch {
schema: ChunkSchema,

// TODO: should we store a record batch here, or just the parsed columns?
batch: ArrowRecordBatch,
}

impl ChunkBatch {
pub fn try_new(
schema: ChunkSchema,
row_ids: ArrowArrayRef,
index_arrays: Vec<ArrowArrayRef>,
data_arrays: Vec<ArrowArrayRef>,
) -> Result<Self, MismatchedChunkSchemaError> {
let row_count = row_ids.len();

WrongDatatypeError::compare_expected_actual(
&schema.row_id_column.datatype(),
row_ids.data_type(),
)?;

if index_arrays.len() != schema.index_columns.len() {
return Err(MismatchedChunkSchemaError::custom(format!(
"Schema had {} index columns, but got {}",
schema.index_columns.len(),
index_arrays.len()
)));
}
for (schema, array) in itertools::izip!(&schema.index_columns, &index_arrays) {
WrongDatatypeError::compare_expected_actual(schema.datatype(), array.data_type())?;
if array.len() != row_count {
return Err(MismatchedChunkSchemaError::custom(format!(
"Index column {:?} had {} rows, but we got {} row IDs",
schema.name(),
array.len(),
row_count
)));
}
}

if data_arrays.len() != schema.data_columns.len() {
return Err(MismatchedChunkSchemaError::custom(format!(
"Schema had {} data columns, but got {}",
schema.data_columns.len(),
data_arrays.len()
)));
}
for (schema, array) in itertools::izip!(&schema.data_columns, &data_arrays) {
WrongDatatypeError::compare_expected_actual(&schema.store_datatype, array.data_type())?;
if array.len() != row_count {
return Err(MismatchedChunkSchemaError::custom(format!(
"Data column {:?} had {} rows, but we got {} row IDs",
schema.column_name(),
array.len(),
row_count
)));
}
}

let arrow_columns = itertools::chain!(Some(row_ids), index_arrays, data_arrays).collect();

let batch = ArrowRecordBatch::try_new_with_options(
std::sync::Arc::new(ArrowSchema::from(&schema)),
arrow_columns,
&RecordBatchOptions::default().with_row_count(Some(row_count)),
)
.unwrap();

Ok(Self { schema, batch })
}
}

impl ChunkBatch {
/// The parsed rerun schema of this chunk.
#[inline]
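
The checks in `try_new` above enforce a simple invariant: every index and data column must match the datatype declared in the schema and contain exactly one value per row ID. A standalone illustration of the row-count half of that invariant (illustrative only, not code from this commit):

use arrow::array::ArrayRef;

fn check_row_counts(row_count: usize, columns: &[ArrayRef]) -> Result<(), String> {
    for (i, column) in columns.iter().enumerate() {
        if column.len() != row_count {
            return Err(format!(
                "column {i} has {} rows, expected {row_count}",
                column.len()
            ));
        }
    }
    Ok(())
}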