Skip to content

Commit

Permalink
doc: comments
Browse files Browse the repository at this point in the history
Signed-off-by: Vigith Maurice <[email protected]>
  • Loading branch information
vigith committed Jan 15, 2025
1 parent 8728f09 commit 9431405
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 12 deletions.
7 changes: 7 additions & 0 deletions rust/serving/src/app/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,24 @@ use state::State as CallbackState;
/// store for storing the state
pub(crate) mod store;

/// As message passes through each component (map, transformer, sink, etc.). it emits a beacon via callback
/// to inform that message has been processed by this component.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub(crate) struct Callback {
pub(crate) id: String,
pub(crate) vertex: String,
pub(crate) cb_time: u64,
pub(crate) from_vertex: String,
/// Due to flat-map operation, we can have 0 or more responses.
// TODO: Arc<[T]>
pub(crate) responses: Vec<Response>,
}

/// It contains details about the `To` vertex via tags (conditional forwarding).
#[derive(Debug, Serialize, Deserialize, Clone)]

Check warning on line 29 in rust/serving/src/app/callback.rs

View workflow job for this annotation

GitHub Actions / Unit Tests

Diff in /home/runner/work/numaflow/numaflow/rust/serving/src/app/callback.rs
pub(crate) struct Response {
/// If tags is None, the message is forwarded to all vertices, if len(Vec) == 0, it means that
/// the message has been dropped.
pub(crate) tags: Option<Vec<String>>,
}

Expand Down
33 changes: 21 additions & 12 deletions rust/serving/src/app/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ fn compare_slice(operator: &OperatorType, a: &[String], b: &[String]) -> bool {
}
}

/// hash map of vertex and its edges. The key of the HashMap and `Edge.from` are same, `Edge.to` will
/// help find the adjacent vertex.
type Graph = HashMap<String, Vec<Edge>>;

#[derive(Serialize, Deserialize, Debug)]
Expand Down Expand Up @@ -47,10 +49,12 @@ struct CallbackRequestWrapper {
}

impl MessageGraph {
/// This function generates a sub graph from a list of callbacks.
/// Generates a sub graph from a list of [Callback].
/// It first creates a HashMap to map each vertex to its corresponding callbacks.
/// Then it finds the source vertex by checking if the vertex and from_vertex fields are the same.
/// Finally, it calls the `generate_subgraph` function to generate the subgraph from the source vertex.
/// NOTE: it finds checks whether the callback is from the originating source vertex by checking
/// if the vertex and from_vertex fields are the same.
/// Finally, it calls the [Self::generate_subgraph] function to generate the subgraph from the source
/// vertex.
pub(crate) fn generate_subgraph_from_callbacks(
&self,
id: String,
Expand Down Expand Up @@ -113,10 +117,13 @@ impl MessageGraph {
}
}

// generate_subgraph function is a recursive function that generates the sub graph from the source vertex for
// the given list of callbacks. The function returns true if the subgraph is generated successfully(if we are
// able to find a subgraph for the message using the given callbacks), it
// updates the subgraph with the path from the source vertex to the downstream vertices.
/// generate_subgraph function is a recursive function that generates the sub graph from the source
/// vertex for the given list of callbacks. The function returns true if the subgraph is
/// generated successfully (if we are able to find a subgraph for the message using the given
/// callbacks), it updates the subgraph with the path from the source vertex to the downstream
/// vertices.
/// It uses the pipeline DAG [Graph] and the [Callback]'s HashMap to check whether sub-graph is
/// complete.
fn generate_subgraph(
&self,
current: String,
Expand Down Expand Up @@ -167,7 +174,7 @@ impl MessageGraph {
// more than one response, so we need to make sure all the responses are processed.
// For example a -> b -> c, lets say vertex a has 2 responses. We will have to recursively
// find the subgraph for both the responses.
for response in current_callback.responses.clone() {
for response in current_callback.responses.iter() {
// recursively invoke the downstream vertices of the current vertex, if any
if let Some(edges) = self.dag.get(&current) {
for edge in edges {
Expand All @@ -176,7 +183,7 @@ impl MessageGraph {
// if there are conditions, we should check the tags of the current callback
// with the tags of the edge and the operator of the tags to decide if we should
// proceed with the edge
let should_proceed = edge
let will_continue_the_path = edge
.conditions
.as_ref()
// If the edge has conditions, get the tags
Expand Down Expand Up @@ -204,12 +211,14 @@ impl MessageGraph {
)
});

// if the conditions are not met, then proceed to the next edge
if !should_proceed {
// if the conditions are not met, then proceed to the next edge because conditions
// for forwarding the message did not match the conditional forwarding requirements.
if !will_continue_the_path {
// let's move on to the next edge
continue;
}

// proceed to the downstream vertex
// recursively proceed to the next downstream vertex
// if any of the downstream vertex returns false, then we should return false.
if !self.generate_subgraph(
edge.to.clone(),
Expand Down

0 comments on commit 9431405

Please sign in to comment.