gRPC spec update: unify data and metadata API responses (#8328)
Part 2 of API unification that simplifies dealing with data streams
coming from Data Platform.
zehiko authored Dec 9, 2024
1 parent 7107267 commit 36cf7e0
Showing 5 changed files with 132 additions and 251 deletions.
17 changes: 17 additions & 0 deletions crates/store/re_protos/proto/rerun/v0/common.proto
@@ -2,6 +2,23 @@ syntax = "proto3";

package rerun.remote_store.v0;

// supported encoder versions for encoding data
// See `RerunChunk` and `DataframePart` for usage
enum EncoderVersion {
V0 = 0;
}

// RerunChunk is an Arrow IPC encoded RecordBatch that has
// rerun-specific semantic constraints and can be directly
// converted to a Rerun Chunk (`re_chunk::Chunk`)
message RerunChunk {
// encoder version used to encode the data
EncoderVersion encoder_version = 1;

// Data payload is Arrow IPC encoded RecordBatch
bytes payload = 1000;
}
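The payload is plain Arrow IPC bytes. As a rough editorial sketch (not part of this change), producing such a payload with arrow2's IPC stream writer might look as follows, mirroring the `write_arrow_to_bytes` helper in `codec.rs` further down; the helper name `encode_record_batch` and the choice of the IPC stream format are assumptions here:

use arrow2::array::Array;
use arrow2::chunk::Chunk as Arrow2Chunk;
use arrow2::datatypes::Schema as Arrow2Schema;
use arrow2::io::ipc::write;

// Hypothetical helper: serialize one schema + record batch into Arrow IPC bytes,
// suitable for the `payload` field of `RerunChunk` (or `DataframePart`).
fn encode_record_batch(
    schema: &Arrow2Schema,
    batch: &Arrow2Chunk<Box<dyn Array>>,
) -> Result<Vec<u8>, arrow2::error::Error> {
    let mut buf = Vec::new();
    {
        let options = write::WriteOptions { compression: None };
        let mut writer = write::StreamWriter::new(&mut buf, options);
        writer.start(schema, None)?;  // write the schema message
        writer.write(batch, None)?;   // write the single record batch
        writer.finish()?;             // write the end-of-stream marker
    }
    Ok(buf)
}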

// unique recording identifier. At this point in time it is the same id as the ChunkStore's StoreId
message RecordingId {
string id = 1;
54 changes: 19 additions & 35 deletions crates/store/re_protos/proto/rerun/v0/remote_store.proto
@@ -6,13 +6,26 @@ import "rerun/v0/common.proto";

service StorageNode {
// data API calls
rpc Query(QueryRequest) returns (stream QueryResponse) {}
rpc FetchRecording(FetchRecordingRequest) returns (stream FetchRecordingResponse) {}
rpc Query(QueryRequest) returns (stream DataframePart) {}
rpc FetchRecording(FetchRecordingRequest) returns (stream RerunChunk) {}

// metadata API calls
rpc QueryCatalog(QueryCatalogRequest) returns (stream QueryCatalogResponse) {}
rpc QueryCatalog(QueryCatalogRequest) returns (stream DataframePart) {}
rpc UpdateCatalog(UpdateCatalogRequest) returns (UpdateCatalogResponse) {}
rpc RegisterRecording(RegisterRecordingRequest) returns (RegisterRecordingResponse) {}
// TODO(zehiko) support registering more than one recording at a time
rpc RegisterRecording(RegisterRecordingRequest) returns (DataframePart) {}
}
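Since all three data-returning RPCs now stream the same message shapes, a client needs only one decoding path. A rough sketch of consuming the unified `Query` stream with a tonic-generated client follows; the module path, endpoint address, and use of a default request are illustrative assumptions, not part of this change:

use re_protos::v0::{storage_node_client::StorageNodeClient, QueryRequest};

// Sketch only: connect to a storage node and drain the DataframePart stream.
async fn query_example() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = StorageNodeClient::connect("http://127.0.0.1:51234").await?;

    let mut stream = client
        .query(QueryRequest::default()) // real callers fill in recording_id and query
        .await?
        .into_inner();

    while let Some(part) = stream.message().await? {
        // every item is a DataframePart: an encoder version plus an Arrow IPC payload
        println!(
            "got {} payload bytes (encoder version {})",
            part.payload.len(),
            part.encoder_version
        );
    }
    Ok(())
}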


// ---------------- Common response message ------------------

// DataframePart is an Arrow IPC encoded RecordBatch
message DataframePart {
// encoder version used to encode the data
EncoderVersion encoder_version = 1;

// Data payload is Arrow IPC encoded RecordBatch
bytes payload = 1000;
}
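Decoding goes the other way: a `DataframePart` (or `RerunChunk`) payload can be read back into a schema plus record batches with arrow2's IPC stream reader, which is roughly what `decode` / `read_arrow_from_bytes` in `codec.rs` do. A minimal sketch, assuming the IPC stream format and `V0` encoding; `decode_payload` is an illustrative name:

use arrow2::array::Array;
use arrow2::chunk::Chunk as Arrow2Chunk;
use arrow2::datatypes::Schema as Arrow2Schema;
use arrow2::io::ipc::read;

// Sketch: decode a V0 payload (Arrow IPC encoded RecordBatch) into schema + chunks.
fn decode_payload(
    payload: &[u8],
) -> Result<(Arrow2Schema, Vec<Arrow2Chunk<Box<dyn Array>>>), arrow2::error::Error> {
    let mut cursor = std::io::Cursor::new(payload);
    let metadata = read::read_stream_metadata(&mut cursor)?;
    let schema = metadata.schema.clone();

    let mut chunks = Vec::new();
    for state in read::StreamReader::new(cursor, metadata, None) {
        match state? {
            read::StreamState::Some(chunk) => chunks.push(chunk),
            // a complete in-memory payload should never leave the reader waiting
            read::StreamState::Waiting => break,
        }
    }
    Ok((schema, chunks))
}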

// ---------------- RegisterRecording ------------------
@@ -26,29 +39,14 @@ message RegisterRecordingRequest {
RecordingType typ = 3;
// (optional) any additional metadata that should be associated with the recording
// You can associate any arbitrary number of columns with a specific recording
RecordingMetadata metadata = 4;
}

// Recording metadata is single row arrow record batch
message RecordingMetadata {
EncoderVersion encoder_version = 1;
bytes payload = 2;
}

message RegisterRecordingResponse {
RecordingId id = 1;
// Note / TODO(zehiko): this implies we read the record (for example go through entire .rrd file
// chunk by chunk) and extract the metadata. So we might want to 1/ not do this i.e.
// only do it as part of explicit GetMetadata request or 2/ do it if Request has "include_metadata=true"
// or 3/ do it always
RecordingMetadata metadata = 2;
DataframePart metadata = 4;
}

// ---------------- UpdateCatalog -----------------

message UpdateCatalogRequest {
RecordingId recording_id = 1;
RecordingMetadata metadata = 2;
DataframePart metadata = 2;
}

message UpdateCatalogResponse {}
@@ -62,20 +60,6 @@ message QueryRequest {
Query query = 3;
}

message QueryResponse {
// TODO(zehiko) we need to expand this to become something like 'encoder options'
// as we will need to specify additional options like compression, including schema
// in payload, etc.
EncoderVersion encoder_version = 1;
// payload is raw bytes that the relevant codec can interpret
bytes payload = 2;
}


enum EncoderVersion {
V0 = 0;
}


// ----------------- QueryCatalog -----------------

135 changes: 1 addition & 134 deletions crates/store/re_protos/src/codec.rs
@@ -5,7 +5,7 @@ use arrow2::error::Error as Arrow2Error;
use arrow2::io::ipc::{read, write};
use re_dataframe::TransportChunk;

use crate::v0::{EncoderVersion, RecordingMetadata};
use crate::v0::EncoderVersion;

#[derive(Debug, thiserror::Error)]
pub enum CodecError {
@@ -138,79 +138,6 @@ pub fn decode(version: EncoderVersion, data: &[u8]) -> Result<Option<TransportCh
}
}

impl RecordingMetadata {
/// Create `RecordingMetadata` from `TransportChunk`. We rely on `TransportChunk` until
/// we migrate from arrow2 to arrow.
pub fn try_from(
version: EncoderVersion,
metadata: &TransportChunk,
) -> Result<Self, CodecError> {
if metadata.data.len() != 1 {
return Err(CodecError::InvalidArgument(format!(
"metadata record batch can only have a single row, batch with {} rows given",
metadata.data.len()
)));
};

match version {
EncoderVersion::V0 => {
let mut data: Vec<u8> = Vec::new();
write_arrow_to_bytes(&mut data, &metadata.schema, &metadata.data)?;

Ok(Self {
encoder_version: version as i32,
payload: data,
})
}
}
}

/// Get metadata as arrow data
pub fn data(&self) -> Result<TransportChunk, CodecError> {
let mut reader = std::io::Cursor::new(self.payload.clone());

let encoder_version = EncoderVersion::try_from(self.encoder_version)
.map_err(|err| CodecError::InvalidArgument(err.to_string()))?;

match encoder_version {
EncoderVersion::V0 => {
let (schema, data) = read_arrow_from_bytes(&mut reader)?;
Ok(TransportChunk { schema, data })
}
}
}

/// Returns unique id of the recording
pub fn id(&self) -> Result<re_log_types::StoreId, CodecError> {
let metadata = self.data()?;
let id_pos = metadata
.schema
.fields
.iter()
// TODO(zehiko) we need to figure out where mandatory fields live
.position(|field| field.name == "id")
.ok_or_else(|| CodecError::InvalidArgument("missing id field in schema".to_owned()))?;

use arrow2::array::Utf8Array as Arrow2Utf8Array;

let id = metadata.data.columns()[id_pos]
.as_any()
.downcast_ref::<Arrow2Utf8Array<i32>>()
.ok_or_else(|| {
CodecError::InvalidArgument(format!(
"Unexpected type for id with position {id_pos} in schema: {:?}",
metadata.schema
))
})?
.value(0);

Ok(re_log_types::StoreId::from_string(
re_log_types::StoreKind::Recording,
id.to_owned(),
))
}
}

/// Helper function that serializes given arrow schema and record batch into bytes
/// using Arrow IPC format.
fn write_arrow_to_bytes<W: std::io::Write>(
@@ -253,18 +180,9 @@ fn read_arrow_from_bytes<R: std::io::Read>(

#[cfg(test)]
mod tests {
use arrow2::array::Utf8Array as Arrow2Utf8Array;
use arrow2::chunk::Chunk as Arrow2Chunk;
use arrow2::{
array::Int32Array as Arrow2Int32Array, datatypes::Field as Arrow2Field,
datatypes::Schema as Arrow2Schema,
};
use re_dataframe::external::re_chunk::{Chunk, RowId};
use re_dataframe::TransportChunk;
use re_log_types::StoreId;
use re_log_types::{example_components::MyPoint, Timeline};

use crate::v0::RecordingMetadata;
use crate::{
codec::{decode, encode, CodecError, TransportMessageV0},
v0::EncoderVersion,
@@ -356,55 +274,4 @@ mod tests {

assert_eq!(expected_chunk, decoded_chunk);
}

#[test]
fn test_recording_metadata_serialization() {
let expected_schema = Arrow2Schema::from(vec![
Arrow2Field::new("id", arrow2::datatypes::DataType::Utf8, false),
Arrow2Field::new("my_int", arrow2::datatypes::DataType::Int32, false),
]);

let id = Arrow2Utf8Array::<i32>::from_slice(["some_id"]);
let my_ints = Arrow2Int32Array::from_slice([42]);
let expected_chunk = Arrow2Chunk::new(vec![Box::new(id) as _, Box::new(my_ints) as _]);
let metadata_tc = TransportChunk {
schema: expected_schema.clone(),
data: expected_chunk.clone(),
};

let metadata = RecordingMetadata::try_from(EncoderVersion::V0, &metadata_tc).unwrap();
assert_eq!(
StoreId::from_string(re_log_types::StoreKind::Recording, "some_id".to_owned()),
metadata.id().unwrap()
);

let tc = metadata.data().unwrap();

assert_eq!(expected_schema, tc.schema);
assert_eq!(expected_chunk, tc.data);
}

#[test]
fn test_recording_metadata_fails_with_non_unit_batch() {
let expected_schema = Arrow2Schema::from(vec![Arrow2Field::new(
"my_int",
arrow2::datatypes::DataType::Int32,
false,
)]);
// more than 1 row in the batch
let my_ints = Arrow2Int32Array::from_slice([41, 42]);

let expected_chunk = Arrow2Chunk::new(vec![Box::new(my_ints) as _]);
let metadata_tc = TransportChunk {
schema: expected_schema.clone(),
data: expected_chunk,
};

let metadata = RecordingMetadata::try_from(EncoderVersion::V0, &metadata_tc);

assert!(matches!(
metadata.err().unwrap(),
CodecError::InvalidArgument(_)
));
}
}