Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Even less arrow2 #8719

Merged
merged 12 commits into from
Jan 17, 2025
Prev Previous commit
Next Next commit
Convert re_dataframe/src/qeury.rs
  • Loading branch information
emilk committed Jan 16, 2025
commit 0efbc086f6e7c01a3b90ff1085452cb6e2df021a
78 changes: 19 additions & 59 deletions crates/store/re_dataframe/src/query.rs
Original file line number Diff line number Diff line change
@@ -7,10 +7,12 @@ use std::{
};

use arrow::{
array::RecordBatch as ArrowRecordBatch,
array::{ArrayRef as ArrowArrayRef, RecordBatch as ArrowRecordBatch},
buffer::ScalarBuffer as ArrowScalarBuffer,
datatypes::SchemaRef as ArrowSchemaRef,
datatypes::{DataType as ArrowDataType, Fields as ArrowFields, Schema as ArrowSchema},
datatypes::{
DataType as ArrowDataType, Fields as ArrowFields, Schema as ArrowSchema,
SchemaRef as ArrowSchemaRef,
},
};
use arrow2::{
array::{
@@ -792,39 +794,7 @@ impl<E: StorageEngineLike> QueryHandle<E> {
#[inline]
pub fn next_row(&self) -> Option<Vec<ArrayRef>> {
self.engine
.with(|store, cache| self._next_row_arrow2(store, cache))
.map(|vec| vec.into_iter().map(|a| a.into()).collect())
}

/// Returns the next row's worth of data.
///
/// The returned vector of Arrow arrays strictly follows the schema specified by [`Self::schema`].
/// Columns that do not yield any data will still be present in the results, filled with null values.
///
/// Each cell in the result corresponds to the latest _locally_ known value at that particular point in
/// the index, for each respective `ColumnDescriptor`.
/// See [`QueryExpression::sparse_fill_strategy`] to go beyond local resolution.
///
/// Example:
/// ```ignore
/// while let Some(row) = query_handle.next_row() {
/// // …
/// }
/// ```
///
/// ## Pagination
///
/// Use [`Self::seek_to_row`]:
/// ```ignore
/// query_handle.seek_to_row(42);
/// for row in query_handle.into_iter().take(len) {
/// // …
/// }
/// ```
#[inline]
fn next_row_arrow2(&self) -> Option<Vec<Box<dyn Arrow2Array>>> {
self.engine
.with(|store, cache| self._next_row_arrow2(store, cache))
.with(|store, cache| self._next_row(store, cache))
}

/// Asynchronously returns the next row's worth of data.
@@ -843,15 +813,13 @@ impl<E: StorageEngineLike> QueryHandle<E> {
/// }
/// ```
#[cfg(not(target_arch = "wasm32"))]
pub fn next_row_async_arrow2(
&self,
) -> impl std::future::Future<Output = Option<Vec<Box<dyn Arrow2Array>>>>
pub fn next_row_async(&self) -> impl std::future::Future<Output = Option<Vec<ArrayRef>>>
where
E: 'static + Send + Clone,
{
let res: Option<Option<_>> = self
.engine
.try_with(|store, cache| self._next_row_arrow2(store, cache));
.try_with(|store, cache| self._next_row(store, cache));

let engine = self.engine.clone();
std::future::poll_fn(move |cx| {
@@ -881,11 +849,7 @@ impl<E: StorageEngineLike> QueryHandle<E> {
})
}

pub fn _next_row_arrow2(
&self,
store: &ChunkStore,
cache: &QueryCache,
) -> Option<Vec<Box<dyn Arrow2Array>>> {
pub fn _next_row(&self, store: &ChunkStore, cache: &QueryCache) -> Option<Vec<ArrowArrayRef>> {
re_tracing::profile_function!();

/// Temporary state used to resolve the streaming join for the current iteration.
@@ -1238,20 +1202,17 @@ impl<E: StorageEngineLike> QueryHandle<E> {
.map(|(view_idx, column)| match column {
ColumnDescriptor::Time(descr) => {
max_value_per_index.get(&descr.timeline()).map_or_else(
|| arrow2::array::new_null_array(column.arrow_datatype().into(), 1),
|(_time, time_sliced)| {
descr.typ().make_arrow_array(time_sliced.clone()).into()
},
|| arrow::array::new_null_array(&column.arrow_datatype(), 1),
|(_time, time_sliced)| descr.typ().make_arrow_array(time_sliced.clone()),
)
}

ColumnDescriptor::Component(_descr) => view_sliced_arrays
.get(*view_idx)
.cloned()
.flatten()
.unwrap_or_else(|| {
arrow2::array::new_null_array(column.arrow_datatype().into(), 1)
}),
.map(|a| a.into())
.unwrap_or_else(|| arrow::array::new_null_array(&column.arrow_datatype(), 1)),
})
.collect_vec();

@@ -1278,28 +1239,27 @@ impl<E: StorageEngineLike> QueryHandle<E> {
where
E: 'static + Send + Clone,
{
let row = self.next_row_async_arrow2().await?;
let row = self.next_row_async().await?;

// If we managed to get a row, then the state must be initialized already.
#[allow(clippy::unwrap_used)]
let schema = self.state.get().unwrap().arrow_schema.clone();

// TODO(#3741): remove the collect
ArrowRecordBatch::try_new(schema, row.into_iter().map(|a| a.into()).collect()).ok()
ArrowRecordBatch::try_new(schema, row).ok()
}
}

impl<E: StorageEngineLike> QueryHandle<E> {
/// Returns an iterator backed by [`Self::next_row`].
#[allow(clippy::should_implement_trait)] // we need an anonymous closure, this won't work
pub fn iter(&self) -> impl Iterator<Item = Vec<Box<dyn Arrow2Array>>> + '_ {
std::iter::from_fn(move || self.next_row_arrow2())
pub fn iter(&self) -> impl Iterator<Item = Vec<ArrowArrayRef>> + '_ {
std::iter::from_fn(move || self.next_row())
}

/// Returns an iterator backed by [`Self::next_row`].
#[allow(clippy::should_implement_trait)] // we need an anonymous closure, this won't work
pub fn into_iter(self) -> impl Iterator<Item = Vec<Box<dyn Arrow2Array>>> {
std::iter::from_fn(move || self.next_row_arrow2())
pub fn into_iter(self) -> impl Iterator<Item = Vec<ArrowArrayRef>> {
std::iter::from_fn(move || self.next_row())
}

/// Returns an iterator backed by [`Self::next_row_batch`].
Loading