Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement Sourcer traits for serving source #2301

Merged
merged 18 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 159 additions & 53 deletions rust/Cargo.lock

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,12 @@ numaflow-core = { path = "numaflow-core" }
numaflow-models = { path = "numaflow-models" }
backoff = { path = "backoff" }
numaflow-pb = { path = "numaflow-pb" }
numaflow-pulsar = {path = "extns/numaflow-pulsar"}
numaflow-pulsar = { path = "extns/numaflow-pulsar" }
tokio = "1.41.1"
bytes = "1.7.1"
tracing = "0.1.40"
axum = "0.7.5"
axum-server = { version = "0.7.1", features = ["tls-rustls"] }
serde = { version = "1.0.204", features = ["derive"] }
rustls = { version = "0.23.12", features = ["aws_lc_rs"] }
reqwest = "0.12.12"
11 changes: 7 additions & 4 deletions rust/numaflow-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ serving.workspace = true
backoff.workspace = true
axum.workspace = true
axum-server.workspace = true
bytes.workspace = true
serde.workspace = true
rustls.workspace = true
tonic = "0.12.3"
bytes = "1.7.1"
thiserror = "2.0.3"
tokio-util = "0.7.11"
tokio-stream = "0.1.15"
Expand All @@ -35,8 +37,6 @@ tower = "0.4.13"
serde_json = "1.0.122"
trait-variant = "0.1.2"
rcgen = "0.13.1"
rustls = { version = "0.23.12", features = ["aws_lc_rs"] }
serde = { version = "1.0.204", features = ["derive"] }
semver = "1.0"
pep440_rs = "0.6.6"
parking_lot = "0.12.3"
Expand All @@ -50,6 +50,9 @@ async-nats = "0.38.0"
[dev-dependencies]
tempfile = "3.11.0"
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", rev = "9ca9362ad511084501520e5a37d40cdcd0cdc9d9" }
pulsar = { version = "6.3.0", default-features = false, features = ["tokio-rustls-runtime"] }
pulsar = { version = "6.3.0", default-features = false, features = [
"tokio-rustls-runtime",
] }
reqwest = { workspace = true, features = ["json"] }

[build-dependencies]
16 changes: 10 additions & 6 deletions rust/numaflow-core/src/config/components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub(crate) mod source {

use std::collections::HashMap;
use std::env;
use std::sync::Arc;
use std::{fmt::Debug, time::Duration};

use bytes::Bytes;
Expand Down Expand Up @@ -37,7 +38,9 @@ pub(crate) mod source {
Generator(GeneratorConfig),
UserDefined(UserDefinedConfig),
Pulsar(PulsarSourceConfig),
Serving(serving::Settings),
// Serving source starts an Axum HTTP server in the background.
// The settings will be used as application state which gets cloned in each handler on each request.
Serving(Arc<serving::Settings>),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why Arc, is it for easy cloning? how come this is different from the rest? please document the reason?

}

impl From<Box<GeneratorSource>> for SourceType {
Expand Down Expand Up @@ -110,10 +113,7 @@ pub(crate) mod source {
// There should be only one option (user-defined) to define the settings.
fn try_from(cfg: Box<numaflow_models::models::ServingSource>) -> Result<Self> {
let env_vars = env::vars().collect::<HashMap<String, String>>();

let mut settings: serving::Settings = env_vars
.try_into()
.map_err(|e: serving::Error| Error::Config(e.to_string()))?;
let mut settings: serving::Settings = env_vars.try_into()?;

settings.tid_header = cfg.msg_id_header_key;

Expand Down Expand Up @@ -148,7 +148,7 @@ pub(crate) mod source {
}
settings.redis.addr = cfg.store.url;

Ok(SourceType::Serving(settings))
Ok(SourceType::Serving(Arc::new(settings)))
}
}

Expand All @@ -168,6 +168,10 @@ pub(crate) mod source {
return pulsar.try_into();
}

if let Some(serving) = source.serving.take() {
return serving.try_into();
}

Err(Error::Config(format!("Invalid source type: {source:?}")))
}
}
Expand Down
8 changes: 4 additions & 4 deletions rust/numaflow-core/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub(crate) struct Message {
}

/// Offset of the message which will be used to acknowledge the message.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub(crate) enum Offset {
Int(IntOffset),
String(StringOffset),
Expand All @@ -62,7 +62,7 @@ impl Message {
}

/// IntOffset is integer based offset enum type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct IntOffset {
pub(crate) offset: u64,
pub(crate) partition_idx: u16,
Expand All @@ -84,7 +84,7 @@ impl fmt::Display for IntOffset {
}

/// StringOffset is string based offset enum type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub(crate) struct StringOffset {
/// offset could be a complex base64 string.
pub(crate) offset: Bytes,
Expand Down Expand Up @@ -120,7 +120,7 @@ pub(crate) enum ReadAck {
}

/// Message ID which is used to uniquely identify a message. It cheap to clone this.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub(crate) struct MessageID {
pub(crate) vertex_name: Bytes,
pub(crate) offset: Bytes,
Expand Down
2 changes: 0 additions & 2 deletions rust/numaflow-core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,8 +600,6 @@ pub(crate) async fn start_metrics_https_server(
addr: SocketAddr,
metrics_state: UserDefinedContainerState,
) -> crate::Result<()> {
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();

// Generate a self-signed certificate
let CertifiedKey { cert, key_pair } = generate_simple_self_signed(vec!["localhost".into()])
.map_err(|e| Error::Metrics(format!("Generating self-signed certificate: {}", e)))?;
Expand Down
22 changes: 20 additions & 2 deletions rust/numaflow-core/src/shared/create_components.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
use std::sync::Arc;
use std::time::Duration;

use numaflow_pb::clients::map::map_client::MapClient;
use numaflow_pb::clients::sink::sink_client::SinkClient;
use numaflow_pb::clients::source::source_client::SourceClient;
use numaflow_pb::clients::sourcetransformer::source_transform_client::SourceTransformClient;
use serving::ServingSource;
use tokio_util::sync::CancellationToken;
use tonic::transport::Channel;

use crate::config::components::sink::{SinkConfig, SinkType};
use crate::config::components::source::{SourceConfig, SourceType};
use crate::config::components::transformer::TransformerConfig;
use crate::config::get_vertex_replica;
use crate::config::pipeline::map::{MapMode, MapType, MapVtxConfig};
use crate::config::pipeline::{DEFAULT_BATCH_MAP_SOCKET, DEFAULT_STREAM_MAP_SOCKET};
use crate::error::Error;
Expand Down Expand Up @@ -334,8 +337,23 @@ pub async fn create_source(
None,
))
}
SourceType::Serving(_) => {
unimplemented!("Serving as built-in source is not yet implemented")
SourceType::Serving(config) => {
let serving = ServingSource::new(
Arc::clone(config),
batch_size,
read_timeout,
*get_vertex_replica(),
)
.await?;
Ok((
Source::new(
batch_size,
source::SourceType::Serving(serving),
tracker_handle,
source_config.read_ahead,
),
None,
))
}
}
}
Expand Down
11 changes: 11 additions & 0 deletions rust/numaflow-core/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ pub(crate) mod generator;
/// [Pulsar]: https://numaflow.numaproj.io/user-guide/sources/pulsar/
pub(crate) mod pulsar;

pub(crate) mod serving;
use serving::ServingSource;

/// Set of Read related items that has to be implemented to become a Source.
pub(crate) trait SourceReader {
#[allow(dead_code)]
Expand Down Expand Up @@ -68,6 +71,7 @@ pub(crate) enum SourceType {
generator::GeneratorLagReader,
),
Pulsar(PulsarSource),
Serving(ServingSource),
}

enum ActorMessage {
Expand Down Expand Up @@ -182,6 +186,13 @@ impl Source {
actor.run().await;
});
}
SourceType::Serving(serving) => {
tokio::spawn(async move {
let actor =
SourceActor::new(receiver, serving.clone(), serving.clone(), serving);
actor.run().await;
});
}
};
Self {
read_batch_size: batch_size,
Expand Down
Loading
Loading