Skip to content

Commit

Permalink
feat: Enhance Serving To Support Flatmap Operations (#2324)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
  • Loading branch information
yhl25 and vigith authored Jan 15, 2025
1 parent 376a602 commit 33b992b
Show file tree
Hide file tree
Showing 8 changed files with 752 additions and 308 deletions.
1 change: 0 additions & 1 deletion rust/numaflow-core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,6 @@ struct TimestampedPending {
#[derive(Clone)]
pub(crate) enum LagReader {
Source(Source),
// TODO: Arc<[T]>
ISB(Vec<JetstreamReader>), // multiple partitions
}

Expand Down
33 changes: 22 additions & 11 deletions rust/serving/src/app/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,23 @@ use state::State as CallbackState;
/// store for storing the state
pub(crate) mod store;

#[derive(Debug, Serialize, Deserialize, Clone)]
pub(crate) struct CallbackRequest {
/// As message passes through each component (map, transformer, sink, etc.). it emits a beacon via callback
/// to inform that message has been processed by this component.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct Callback {
pub(crate) id: String,
pub(crate) vertex: String,
pub(crate) cb_time: u64,
pub(crate) from_vertex: String,
/// Due to flat-map operation, we can have 0 or more responses.
pub(crate) responses: Vec<Response>,
}

/// It contains details about the `To` vertex via tags (conditional forwarding).
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct Response {
/// If tags is None, the message is forwarded to all vertices, if len(Vec) == 0, it means that
/// the message has been dropped.
pub(crate) tags: Option<Vec<String>>,
}

Expand Down Expand Up @@ -68,7 +79,7 @@ async fn callback_save<T: Send + Sync + Clone + Store>(

async fn callback<T: Send + Sync + Clone + Store>(
State(app_state): State<CallbackAppState<T>>,
Json(payload): Json<Vec<CallbackRequest>>,
Json(payload): Json<Vec<Callback>>,
) -> Result<(), ApiError> {
app_state
.callback_state
Expand Down Expand Up @@ -107,12 +118,12 @@ mod tests {
let state = CallbackState::new(msg_graph, store).await.unwrap();
let app = callback_handler("ID".to_owned(), state);

let payload = vec![CallbackRequest {
let payload = vec![Callback {
id: "test_id".to_string(),
vertex: "in".to_string(),
cb_time: 12345,
from_vertex: "in".to_string(),
tags: None,
responses: vec![Response { tags: None }],
}];

let res = Request::builder()
Expand Down Expand Up @@ -143,26 +154,26 @@ mod tests {
let app = callback_handler("ID".to_owned(), state);

let payload = vec![
CallbackRequest {
Callback {
id: "test_id".to_string(),
vertex: "in".to_string(),
cb_time: 12345,
from_vertex: "in".to_string(),
tags: None,
responses: vec![Response { tags: None }],
},
CallbackRequest {
Callback {
id: "test_id".to_string(),
vertex: "cat".to_string(),
cb_time: 12345,
from_vertex: "in".to_string(),
tags: None,
responses: vec![Response { tags: None }],
},
CallbackRequest {
Callback {
id: "test_id".to_string(),
vertex: "out".to_string(),
cb_time: 12345,
from_vertex: "cat".to_string(),
tags: None,
responses: vec![Response { tags: None }],
},
];

Expand Down
68 changes: 32 additions & 36 deletions rust/serving/src/app/callback/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ use std::{
use tokio::sync::oneshot;

use super::store::Store;
use crate::app::callback::{store::PayloadToSave, CallbackRequest};
use crate::app::callback::{store::PayloadToSave, Callback};
use crate::app::tracker::MessageGraph;
use crate::Error;

struct RequestState {
// Channel to notify when all callbacks for a message is received
tx: oneshot::Sender<Result<String, Error>>,
// CallbackRequest is immutable, while vtx_visited can grow.
vtx_visited: Vec<Arc<CallbackRequest>>,
vtx_visited: Vec<Arc<Callback>>,
}

#[derive(Clone)]
Expand Down Expand Up @@ -81,15 +81,14 @@ where
/// insert_callback_requests is used to insert the callback requests.
pub(crate) async fn insert_callback_requests(
&mut self,
cb_requests: Vec<CallbackRequest>,
cb_requests: Vec<Callback>,
) -> Result<(), Error> {
/*
TODO: should we consider batching the requests and then processing them?
that way algorithm can be invoked only once for a batch of requests
instead of invoking it for each request.
*/
let cb_requests: Vec<Arc<CallbackRequest>> =
cb_requests.into_iter().map(Arc::new).collect();
let cb_requests: Vec<Arc<Callback>> = cb_requests.into_iter().map(Arc::new).collect();
let redis_payloads: Vec<PayloadToSave> = cb_requests
.iter()
.cloned()
Expand Down Expand Up @@ -153,23 +152,18 @@ where
id: &str,
) -> Result<String, Error> {
// If the id is not found in the in-memory store, fetch from Redis
let callbacks: Vec<Arc<CallbackRequest>> =
match self.retrieve_callbacks_from_storage(id).await {
Ok(callbacks) => callbacks,
Err(e) => {
return Err(e);
}
};
let callbacks: Vec<Arc<Callback>> = match self.retrieve_callbacks_from_storage(id).await {
Ok(callbacks) => callbacks,
Err(e) => {
return Err(e);
}
};
// check if the sub graph can be generated
self.get_subgraph(id.to_string(), callbacks)
}

// Generate subgraph from the given callbacks
fn get_subgraph(
&self,
id: String,
callbacks: Vec<Arc<CallbackRequest>>,
) -> Result<String, Error> {
fn get_subgraph(&self, id: String, callbacks: Vec<Arc<Callback>>) -> Result<String, Error> {
match self
.msg_graph_generator
.generate_subgraph_from_callbacks(id, callbacks)
Expand Down Expand Up @@ -204,7 +198,7 @@ where

// Get the Callback value for the given ID
// TODO: Generate json serialized data here itself to avoid cloning.
fn get_callbacks_from_memory(&self, id: &str) -> Option<Vec<Arc<CallbackRequest>>> {
fn get_callbacks_from_memory(&self, id: &str) -> Option<Vec<Arc<Callback>>> {
let guard = self.callbacks.lock().expect("Getting lock on State");
guard.get(id).map(|state| state.vtx_visited.clone())
}
Expand All @@ -213,9 +207,9 @@ where
async fn retrieve_callbacks_from_storage(
&mut self,
id: &str,
) -> Result<Vec<Arc<CallbackRequest>>, Error> {
) -> Result<Vec<Arc<Callback>>, Error> {
// If the id is not found in the in-memory store, fetch from Redis
let callbacks: Vec<Arc<CallbackRequest>> = match self.store.retrieve_callbacks(id).await {
let callbacks: Vec<Arc<Callback>> = match self.store.retrieve_callbacks(id).await {
Ok(response) => response.into_iter().collect(),
Err(e) => {
return Err(e);
Expand All @@ -232,11 +226,11 @@ where

#[cfg(test)]
mod tests {
use axum::body::Bytes;

use super::*;
use crate::app::callback::store::memstore::InMemoryStore;
use crate::app::callback::Response;
use crate::pipeline::PipelineDCG;
use axum::body::Bytes;

const PIPELINE_SPEC_ENCODED: &str = "eyJ2ZXJ0aWNlcyI6W3sibmFtZSI6ImluIiwic291cmNlIjp7InNlcnZpbmciOnsiYXV0aCI6bnVsbCwic2VydmljZSI6dHJ1ZSwibXNnSURIZWFkZXJLZXkiOiJYLU51bWFmbG93LUlkIiwic3RvcmUiOnsidXJsIjoicmVkaXM6Ly9yZWRpczo2Mzc5In19fSwiY29udGFpbmVyVGVtcGxhdGUiOnsicmVzb3VyY2VzIjp7fSwiaW1hZ2VQdWxsUG9saWN5IjoiTmV2ZXIiLCJlbnYiOlt7Im5hbWUiOiJSVVNUX0xPRyIsInZhbHVlIjoiZGVidWcifV19LCJzY2FsZSI6eyJtaW4iOjF9LCJ1cGRhdGVTdHJhdGVneSI6eyJ0eXBlIjoiUm9sbGluZ1VwZGF0ZSIsInJvbGxpbmdVcGRhdGUiOnsibWF4VW5hdmFpbGFibGUiOiIyNSUifX19LHsibmFtZSI6InBsYW5uZXIiLCJ1ZGYiOnsiY29udGFpbmVyIjp7ImltYWdlIjoiYXNjaWk6MC4xIiwiYXJncyI6WyJwbGFubmVyIl0sInJlc291cmNlcyI6e30sImltYWdlUHVsbFBvbGljeSI6Ik5ldmVyIn0sImJ1aWx0aW4iOm51bGwsImdyb3VwQnkiOm51bGx9LCJjb250YWluZXJUZW1wbGF0ZSI6eyJyZXNvdXJjZXMiOnt9LCJpbWFnZVB1bGxQb2xpY3kiOiJOZXZlciJ9LCJzY2FsZSI6eyJtaW4iOjF9LCJ1cGRhdGVTdHJhdGVneSI6eyJ0eXBlIjoiUm9sbGluZ1VwZGF0ZSIsInJvbGxpbmdVcGRhdGUiOnsibWF4VW5hdmFpbGFibGUiOiIyNSUifX19LHsibmFtZSI6InRpZ2VyIiwidWRmIjp7ImNvbnRhaW5lciI6eyJpbWFnZSI6ImFzY2lpOjAuMSIsImFyZ3MiOlsidGlnZXIiXSwicmVzb3VyY2VzIjp7fSwiaW1hZ2VQdWxsUG9saWN5IjoiTmV2ZXIifSwiYnVpbHRpbiI6bnVsbCwiZ3JvdXBCeSI6bnVsbH0sImNvbnRhaW5lclRlbXBsYXRlIjp7InJlc291cmNlcyI6e30sImltYWdlUHVsbFBvbGljeSI6Ik5ldmVyIn0sInNjYWxlIjp7Im1pbiI6MX0sInVwZGF0ZVN0cmF0ZWd5Ijp7InR5cGUiOiJSb2xsaW5nVXBkYXRlIiwicm9sbGluZ1VwZGF0ZSI6eyJtYXhVbmF2YWlsYWJsZSI6IjI1JSJ9fX0seyJuYW1lIjoiZG9nIiwidWRmIjp7ImNvbnRhaW5lciI6eyJpbWFnZSI6ImFzY2lpOjAuMSIsImFyZ3MiOlsiZG9nIl0sInJlc291cmNlcyI6e30sImltYWdlUHVsbFBvbGljeSI6Ik5ldmVyIn0sImJ1aWx0aW4iOm51bGwsImdyb3VwQnkiOm51bGx9LCJjb250YWluZXJUZW1wbGF0ZSI6eyJyZXNvdXJjZXMiOnt9LCJpbWFnZVB1bGxQb2xpY3kiOiJOZXZlciJ9LCJzY2FsZSI6eyJtaW4iOjF9LCJ1cGRhdGVTdHJhdGVneSI6eyJ0eXBlIjoiUm9sbGluZ1VwZGF0ZSIsInJvbGxpbmdVcGRhdGUiOnsibWF4VW5hdmFpbGFibGUiOiIyNSUifX19LHsibmFtZSI6ImVsZXBoYW50IiwidWRmIjp7ImNvbnRhaW5lciI6eyJpbWFnZSI6ImFzY2lpOjAuMSIsImFyZ3MiOlsiZWxlcGhhbnQiXSwicmVzb3VyY2VzIjp7fSwiaW1hZ2VQdWxsUG9saWN5IjoiTmV2ZXIifSwiYnVpbHRpbiI6bnVsbCwiZ3JvdXBCeSI6bnVsbH0sImNvbnRhaW5lclRlbXBsYXRlIjp7InJlc291cmNlcyI6e30sImltYWdlUHVsbFBvbGljeSI6Ik5ldmVyIn0sInNjYWxlIjp7Im1pbiI6MX0sInVwZGF0ZVN0cmF0ZWd5Ijp7InR5cGUiOiJSb2xsaW5nVXBkYXRlIiwicm9sbGluZ1VwZGF0ZSI6eyJtYXhVbmF2YWlsYWJsZSI6IjI1JSJ9fX0seyJuYW1lIjoiYXNjaWlhcnQiLCJ1ZGYiOnsiY29udGFpbmVyIjp7ImltYWdlIjoiYXNjaWk6MC4xIiwiYXJncyI6WyJhc2NpaWFydCJdLCJyZXNvdXJjZXMiOnt9LCJpbWFnZVB1bGxQb2xpY3kiOiJOZXZlciJ9LCJidWlsdGluIjpudWxsLCJncm91cEJ5IjpudWxsfSwiY29udGFpbmVyVGVtcGxhdGUiOnsicmVzb3VyY2VzIjp7fSwiaW1hZ2VQdWxsUG9saWN5IjoiTmV2ZXIifSwic2NhbGUiOnsibWluIjoxfSwidXBkYXRlU3RyYXRlZ3kiOnsidHlwZSI6IlJvbGxpbmdVcGRhdGUiLCJyb2xsaW5nVXBkYXRlIjp7Im1heFVuYXZhaWxhYmxlIjoiMjUlIn19fSx7Im5hbWUiOiJzZXJ2ZS1zaW5rIiwic2luayI6eyJ1ZHNpbmsiOnsiY29udGFpbmVyIjp7ImltYWdlIjoic2VydmVzaW5rOjAuMSIsImVudiI6W3sibmFtZSI6Ik5VTUFGTE9XX0NBTExCQUNLX1VSTF9LRVkiLCJ2YWx1ZSI6IlgtTnVtYWZsb3ctQ2FsbGJhY2stVXJsIn0seyJuYW1lIjoiTlVNQUZMT1dfTVNHX0lEX0hFQURFUl9LRVkiLCJ2YWx1ZSI6IlgtTnVtYWZsb3ctSWQifV0sInJlc291cmNlcyI6e30sImltYWdlUHVsbFBvbGljeSI6Ik5ldmVyIn19LCJyZXRyeVN0cmF0ZWd5Ijp7fX0sImNvbnRhaW5lclRlbXBsYXRlIjp7InJlc291cmNlcyI6e30sImltYWdlUHVsbFBvbGljeSI6Ik5ldmVyIn0sInNjYWxlIjp7Im1pbiI6MX0sInVwZGF0ZVN0cmF0ZWd5Ijp7InR5cGUiOiJSb2xsaW5nVXBkYXRlIiwicm9sbGluZ1VwZGF0ZSI6eyJtYXhVbmF2YWlsYWJsZSI6IjI1JSJ9fX0seyJuYW1lIjoiZXJyb3Itc2luayIsInNpbmsiOnsidWRzaW5rIjp7ImNvbnRhaW5lciI6eyJpbWFnZSI6InNlcnZlc2luazowLjEiLCJlbnYiOlt7Im5hbWUiOiJOVU1BRkxPV19DQUxMQkFDS19VUkxfS0VZIiwidmFsdWUiOiJYLU51bWFmbG93LUNhbGxiYWNrLVVybCJ9LHsibmFtZSI6Ik5VTUFGTE9XX01TR19JRF9IRUFERVJfS0VZIiwidmFsdWUiOiJYLU51bWFmbG93LUlkIn1dLCJyZXNvdXJjZXMiOnt9LCJpbWFnZVB1bGxQb2xpY3kiOiJOZXZlciJ9fSwicmV0cnlTdHJhdGVneSI6e319LCJjb250YWluZXJUZW1wbGF0ZSI6eyJyZXNvdXJjZXMiOnt9LCJpbWFnZVB1bGxQb2xpY3kiOiJOZXZlciJ9LCJzY2FsZSI6eyJtaW4iOjF9LCJ1cGRhdGVTdHJhdGVneSI6eyJ0eXBlIjoiUm9sbGluZ1VwZGF0ZSIsInJvbGxpbmdVcGRhdGUiOnsibWF4VW5hdmFpbGFibGUiOiIyNSUifX19XSwiZWRnZXMiOlt7ImZyb20iOiJpbiIsInRvIjoicGxhbm5lciIsImNvbmRpdGlvbnMiOm51bGx9LHsiZnJvbSI6InBsYW5uZXIiLCJ0byI6ImFzY2lpYXJ0IiwiY29uZGl0aW9ucyI6eyJ0YWdzIjp7Im9wZXJhdG9yIjoib3IiLCJ2YWx1ZXMiOlsiYXNjaWlhcnQiXX19fSx7ImZyb20iOiJwbGFubmVyIiwidG8iOiJ0aWdlciIsImNvbmRpdGlvbnMiOnsidGFncyI6eyJvcGVyYXRvciI6Im9yIiwidmFsdWVzIjpbInRpZ2VyIl19fX0seyJmcm9tIjoicGxhbm5lciIsInRvIjoiZG9nIiwiY29uZGl0aW9ucyI6eyJ0YWdzIjp7Im9wZXJhdG9yIjoib3IiLCJ2YWx1ZXMiOlsiZG9nIl19fX0seyJmcm9tIjoicGxhbm5lciIsInRvIjoiZWxlcGhhbnQiLCJjb25kaXRpb25zIjp7InRhZ3MiOnsib3BlcmF0b3IiOiJvciIsInZhbHVlcyI6WyJlbGVwaGFudCJdfX19LHsiZnJvbSI6InRpZ2VyIiwidG8iOiJzZXJ2ZS1zaW5rIiwiY29uZGl0aW9ucyI6bnVsbH0seyJmcm9tIjoiZG9nIiwidG8iOiJzZXJ2ZS1zaW5rIiwiY29uZGl0aW9ucyI6bnVsbH0seyJmcm9tIjoiZWxlcGhhbnQiLCJ0byI6InNlcnZlLXNpbmsiLCJjb25kaXRpb25zIjpudWxsfSx7ImZyb20iOiJhc2NpaWFydCIsInRvIjoic2VydmUtc2luayIsImNvbmRpdGlvbnMiOm51bGx9LHsiZnJvbSI6InBsYW5uZXIiLCJ0byI6ImVycm9yLXNpbmsiLCJjb25kaXRpb25zIjp7InRhZ3MiOnsib3BlcmF0b3IiOiJvciIsInZhbHVlcyI6WyJlcnJvciJdfX19XSwibGlmZWN5Y2xlIjp7fSwid2F0ZXJtYXJrIjp7fX0=";

Expand Down Expand Up @@ -271,47 +265,49 @@ mod tests {

// Test insert_callback_requests
let cbs = vec![
CallbackRequest {
Callback {
id: id.clone(),
vertex: "in".to_string(),
cb_time: 12345,
from_vertex: "in".to_string(),
tags: None,
responses: vec![Response { tags: None }],
},
CallbackRequest {
Callback {
id: id.clone(),
vertex: "planner".to_string(),
cb_time: 12345,
from_vertex: "in".to_string(),
tags: Some(vec!["tiger".to_owned(), "asciiart".to_owned()]),
responses: vec![Response {
tags: Some(vec!["tiger".to_owned(), "asciiart".to_owned()]),
}],
},
CallbackRequest {
Callback {
id: id.clone(),
vertex: "tiger".to_string(),
cb_time: 12345,
from_vertex: "planner".to_string(),
tags: None,
responses: vec![Response { tags: None }],
},
CallbackRequest {
Callback {
id: id.clone(),
vertex: "asciiart".to_string(),
cb_time: 12345,
from_vertex: "planner".to_string(),
tags: None,
responses: vec![Response { tags: None }],
},
CallbackRequest {
Callback {
id: id.clone(),
vertex: "serve-sink".to_string(),
cb_time: 12345,
from_vertex: "tiger".to_string(),
tags: None,
responses: vec![Response { tags: None }],
},
CallbackRequest {
Callback {
id: id.clone(),
vertex: "serve-sink".to_string(),
cb_time: 12345,
from_vertex: "asciiart".to_string(),
tags: None,
responses: vec![Response { tags: None }],
},
];
state.insert_callback_requests(cbs).await.unwrap();
Expand Down Expand Up @@ -345,12 +341,12 @@ mod tests {
let store = InMemoryStore::new();
let mut state = State::new(msg_graph, store).await.unwrap();

let cbs = vec![CallbackRequest {
let cbs = vec![Callback {
id: "nonexistent_id".to_string(),
vertex: "in".to_string(),
cb_time: 12345,
from_vertex: "in".to_string(),
tags: None,
responses: vec![Response { tags: None }],
}];

// Try to insert callback requests for an ID that hasn't been registered
Expand Down
12 changes: 3 additions & 9 deletions rust/serving/src/app/callback/store.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use crate::app::callback::CallbackRequest;
use crate::app::callback::Callback;

// in-memory store
pub(crate) mod memstore;
Expand All @@ -9,10 +9,7 @@ pub(crate) mod redisstore;

pub(crate) enum PayloadToSave {
/// Callback as sent by Numaflow to track the progression
Callback {
key: String,
value: Arc<CallbackRequest>,
},
Callback { key: String, value: Arc<Callback> },
/// Data sent by the Numaflow pipeline which is to be delivered as the response
DatumFromPipeline {
key: String,
Expand All @@ -26,10 +23,7 @@ pub(crate) enum PayloadToSave {
pub(crate) trait LocalStore {
async fn save(&mut self, messages: Vec<PayloadToSave>) -> crate::Result<()>;
/// retrieve the callback payloads
async fn retrieve_callbacks(
&mut self,
id: &str,
) -> Result<Vec<Arc<CallbackRequest>>, crate::Error>;
async fn retrieve_callbacks(&mut self, id: &str) -> Result<Vec<Arc<Callback>>, crate::Error>;
async fn retrieve_datum(&mut self, id: &str) -> Result<Vec<Vec<u8>>, crate::Error>;
async fn ready(&mut self) -> bool;
}
16 changes: 8 additions & 8 deletions rust/serving/src/app/callback/store/memstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::sync::Arc;

use super::PayloadToSave;
use crate::app::callback::CallbackRequest;
use crate::app::callback::Callback;
use crate::consts::SAVED;
use crate::Error;

Expand Down Expand Up @@ -55,14 +55,14 @@ impl super::Store for InMemoryStore {

/// Retrieves callbacks for a given id from the `HashMap`.
/// Each callback is deserialized from bytes into a `CallbackRequest`.
async fn retrieve_callbacks(&mut self, id: &str) -> Result<Vec<Arc<CallbackRequest>>, Error> {
async fn retrieve_callbacks(&mut self, id: &str) -> Result<Vec<Arc<Callback>>, Error> {
let data = self.data.lock().unwrap();
match data.get(id) {
Some(result) => {
let messages: Result<Vec<_>, _> = result
.iter()
.map(|msg| {
let cbr: CallbackRequest = serde_json::from_slice(msg).map_err(|_| {
let cbr: Callback = serde_json::from_slice(msg).map_err(|_| {
Error::StoreRead(
"Failed to parse CallbackRequest from bytes".to_string(),
)
Expand Down Expand Up @@ -98,18 +98,18 @@ mod tests {

use super::*;
use crate::app::callback::store::{PayloadToSave, Store};
use crate::app::callback::CallbackRequest;
use crate::app::callback::{Callback, Response};

#[tokio::test]
async fn test_save_and_retrieve_callbacks() {
let mut store = InMemoryStore::new();
let key = "test_key".to_string();
let value = Arc::new(CallbackRequest {
let value = Arc::new(Callback {
id: "test_id".to_string(),
vertex: "in".to_string(),
cb_time: 12345,
from_vertex: "in".to_string(),
tags: None,
responses: vec![Response { tags: None }],
});

// Save a callback
Expand Down Expand Up @@ -179,12 +179,12 @@ mod tests {
#[tokio::test]
async fn test_save_invalid_callback() {
let mut store = InMemoryStore::new();
let value = Arc::new(CallbackRequest {
let value = Arc::new(Callback {
id: "test_id".to_string(),
vertex: "in".to_string(),
cb_time: 12345,
from_vertex: "in".to_string(),
tags: None,
responses: vec![Response { tags: None }],
});

// Try to save a callback with an invalid key
Expand Down
Loading

0 comments on commit 33b992b

Please sign in to comment.