Skip to content

Commit

Permalink
finish constructing executor from proto
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Feb 5, 2025
1 parent 8acd0fd commit ae7d462
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 17 deletions.
1 change: 1 addition & 0 deletions src/stream/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ pub use simple_agg::SimpleAggExecutor;
pub use sink::SinkExecutor;
pub use sort::*;
pub use stateless_simple_agg::StatelessSimpleAggExecutor;
pub use sync_kv_log_store::SyncedKvLogStoreExecutor;
pub use temporal_join::TemporalJoinExecutor;
pub use top_n::{
AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor,
Expand Down
45 changes: 28 additions & 17 deletions src/stream/src/from_proto/sync_log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
use risingwave_pb::stream_plan::SyncLogStoreNode;
use risingwave_storage::StateStore;

use crate::common::log_store_impl::kv_log_store::{KV_LOG_STORE_V2_INFO, KvLogStoreMetrics};
use crate::common::log_store_impl::kv_log_store::serde::LogStoreRowSerde;
use crate::common::log_store_impl::kv_log_store::{
KvLogStoreMetrics, FIRST_SEQ_ID, KV_LOG_STORE_V2_INFO,
};
use crate::error::StreamResult;
use crate::executor::Executor;
use crate::executor::{Executor, SyncedKvLogStoreExecutor};
use crate::from_proto::ExecutorBuilder;
use crate::task::ExecutorParams;

Expand All @@ -33,7 +35,8 @@ impl ExecutorBuilder for SyncLogStoreExecutorBuilder {
store: impl StateStore,
) -> StreamResult<Executor> {
let actor_context = params.actor_context.clone();
let table_id = 1;
let table = node.log_store_table.as_ref().unwrap().clone();
let table_id = table.id;

let metrics = {
let streaming_metrics = actor_context.streaming_metrics.as_ref();
Expand All @@ -50,20 +53,28 @@ impl ExecutorBuilder for SyncLogStoreExecutorBuilder {
)
};

let table = node.log_store_table.as_ref().unwrap().clone();
let serde = LogStoreRowSerde::new(&table, params.vnode_bitmap.map(|b| b.into()), &KV_LOG_STORE_V2_INFO);
let serde = LogStoreRowSerde::new(
&table,
params.vnode_bitmap.map(|b| b.into()),
&KV_LOG_STORE_V2_INFO,
);
// FIXME(kwannoel): Make configurable
let buffer_max_size = 1000;
let [upstream] = params.input.try_into().unwrap();

let seq_id = FIRST_SEQ_ID;

// let table = node.get_table()?;
// let table = table.clone();
// let table = table.into_sync_log_store();
// let table = Arc::new(table);
// let table = table as Arc<dyn StateTable>;
//
// let executor = SyncedKvLogStoreExecutor {
// base: ExecutorBase::new(params, node, store),
// table,
// };
// Ok(Box::new(executor))
todo!()
let executor = SyncedKvLogStoreExecutor::new(
actor_context,
table_id,
metrics,
serde,
seq_id,
store,
buffer_max_size,
upstream,
)
.await;
Ok((params.info, executor).into())
}
}

0 comments on commit ae7d462

Please sign in to comment.