Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement Sourcer traits for serving source #2301

Merged
merged 18 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 159 additions & 53 deletions rust/Cargo.lock

Large diffs are not rendered by default.

10 changes: 7 additions & 3 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ verbose_file_reads = "warn"
# This profile optimizes for runtime performance and small binary size at the expense of longer build times.
# Compared to default release profile, this profile reduced binary size from 29MB to 21MB
# and increased build time (with only one line change in code) from 12 seconds to 133 seconds (tested on Mac M2 Max).
[profile.release]
lto = "fat"
# [profile.release]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no longer valid?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverted. Had commented it out for faster image builds on laptop.

# lto = "fat"

# This profile optimizes for short build times at the expense of larger binary size and slower runtime performance.
# If you have to rebuild image often, in Dockerfile you may replace `--release` passed to cargo command with `--profile quick-release`
Expand All @@ -58,8 +58,12 @@ numaflow-core = { path = "numaflow-core" }
numaflow-models = { path = "numaflow-models" }
backoff = { path = "backoff" }
numaflow-pb = { path = "numaflow-pb" }
numaflow-pulsar = {path = "extns/numaflow-pulsar"}
numaflow-pulsar = { path = "extns/numaflow-pulsar" }
tokio = "1.41.1"
bytes = "1.7.1"
tracing = "0.1.40"
axum = "0.7.5"
axum-server = { version = "0.7.1", features = ["tls-rustls"] }
serde = { version = "1.0.204", features = ["derive"] }
rustls = { version = "0.23.12", features = ["aws_lc_rs"] }
reqwest = "0.12.12"
11 changes: 7 additions & 4 deletions rust/numaflow-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ serving.workspace = true
backoff.workspace = true
axum.workspace = true
axum-server.workspace = true
bytes.workspace = true
serde.workspace = true
rustls.workspace = true
tonic = "0.12.3"
bytes = "1.7.1"
thiserror = "2.0.3"
tokio-util = "0.7.11"
tokio-stream = "0.1.15"
Expand All @@ -35,8 +37,6 @@ tower = "0.4.13"
serde_json = "1.0.122"
trait-variant = "0.1.2"
rcgen = "0.13.1"
rustls = { version = "0.23.12", features = ["aws_lc_rs"] }
serde = { version = "1.0.204", features = ["derive"] }
semver = "1.0"
pep440_rs = "0.6.6"
parking_lot = "0.12.3"
Expand All @@ -50,6 +50,9 @@ async-nats = "0.38.0"
[dev-dependencies]
tempfile = "3.11.0"
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", rev = "9ca9362ad511084501520e5a37d40cdcd0cdc9d9" }
pulsar = { version = "6.3.0", default-features = false, features = ["tokio-rustls-runtime"] }
pulsar = { version = "6.3.0", default-features = false, features = [
"tokio-rustls-runtime",
] }
reqwest = { workspace = true, features = ["json"] }

[build-dependencies]
14 changes: 8 additions & 6 deletions rust/numaflow-core/src/config/components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub(crate) mod source {

use std::collections::HashMap;
use std::env;
use std::sync::Arc;
use std::{fmt::Debug, time::Duration};

use bytes::Bytes;
Expand Down Expand Up @@ -37,7 +38,7 @@ pub(crate) mod source {
Generator(GeneratorConfig),
UserDefined(UserDefinedConfig),
Pulsar(PulsarSourceConfig),
Serving(serving::Settings),
Serving(Arc<serving::Settings>),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why Arc, is it for easy cloning? how come this is different from the rest? please document the reason?

}

impl From<Box<GeneratorSource>> for SourceType {
Expand Down Expand Up @@ -110,10 +111,7 @@ pub(crate) mod source {
// There should be only one option (user-defined) to define the settings.
fn try_from(cfg: Box<numaflow_models::models::ServingSource>) -> Result<Self> {
let env_vars = env::vars().collect::<HashMap<String, String>>();

let mut settings: serving::Settings = env_vars
.try_into()
.map_err(|e: serving::Error| Error::Config(e.to_string()))?;
let mut settings: serving::Settings = env_vars.try_into()?;

settings.tid_header = cfg.msg_id_header_key;

Expand Down Expand Up @@ -148,7 +146,7 @@ pub(crate) mod source {
}
settings.redis.addr = cfg.store.url;

Ok(SourceType::Serving(settings))
Ok(SourceType::Serving(Arc::new(settings)))
}
}

Expand All @@ -168,6 +166,10 @@ pub(crate) mod source {
return pulsar.try_into();
}

if let Some(serving) = source.serving.take() {
return serving.try_into();
}

Err(Error::Config(format!("Invalid source type: {source:?}")))
}
}
Expand Down
1 change: 1 addition & 0 deletions rust/numaflow-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ mod tracker;
mod mapper;

pub async fn run() -> Result<()> {
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is this for? please document

let cln_token = CancellationToken::new();
let shutdown_cln_token = cln_token.clone();

Expand Down
8 changes: 4 additions & 4 deletions rust/numaflow-core/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub(crate) struct Message {
}

/// Offset of the message which will be used to acknowledge the message.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub(crate) enum Offset {
Int(IntOffset),
String(StringOffset),
Expand All @@ -62,7 +62,7 @@ impl Message {
}

/// IntOffset is integer based offset enum type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct IntOffset {
pub(crate) offset: u64,
pub(crate) partition_idx: u16,
Expand All @@ -84,7 +84,7 @@ impl fmt::Display for IntOffset {
}

/// StringOffset is string based offset enum type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub(crate) struct StringOffset {
/// offset could be a complex base64 string.
pub(crate) offset: Bytes,
Expand Down Expand Up @@ -120,7 +120,7 @@ pub(crate) enum ReadAck {
}

/// Message ID which is used to uniquely identify a message. It cheap to clone this.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub(crate) struct MessageID {
pub(crate) vertex_name: Bytes,
pub(crate) offset: Bytes,
Expand Down
2 changes: 0 additions & 2 deletions rust/numaflow-core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,8 +600,6 @@ pub(crate) async fn start_metrics_https_server(
addr: SocketAddr,
metrics_state: UserDefinedContainerState,
) -> crate::Result<()> {
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();

// Generate a self-signed certificate
let CertifiedKey { cert, key_pair } = generate_simple_self_signed(vec!["localhost".into()])
.map_err(|e| Error::Metrics(format!("Generating self-signed certificate: {}", e)))?;
Expand Down
15 changes: 13 additions & 2 deletions rust/numaflow-core/src/shared/create_components.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::sync::Arc;
use std::time::Duration;

use numaflow_pb::clients::map::map_client::MapClient;
use numaflow_pb::clients::sink::sink_client::SinkClient;
use numaflow_pb::clients::source::source_client::SourceClient;
use numaflow_pb::clients::sourcetransformer::source_transform_client::SourceTransformClient;
use serving::ServingSource;
use tokio_util::sync::CancellationToken;
use tonic::transport::Channel;

Expand Down Expand Up @@ -334,8 +336,17 @@ pub async fn create_source(
None,
))
}
SourceType::Serving(_) => {
unimplemented!("Serving as built-in source is not yet implemented")
SourceType::Serving(config) => {
let serving = ServingSource::new(Arc::clone(config), batch_size, read_timeout).await?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let serving = ServingSource::new(Arc::clone(config), batch_size, read_timeout).await?;
let serving = ServingSource::new(config, batch_size, read_timeout).await?;

why clone, can't we give the ownership?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The match is on a reference, so the config here is a &Arc<Settings>.

Ok((
Source::new(
batch_size,
source::SourceType::Serving(serving),
tracker_handle,
source_config.read_ahead,
),
None,
))
}
}
}
Expand Down
11 changes: 11 additions & 0 deletions rust/numaflow-core/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ pub(crate) mod generator;
/// [Pulsar]: https://numaflow.numaproj.io/user-guide/sources/pulsar/
pub(crate) mod pulsar;

pub(crate) mod serving;
use serving::ServingSource;

/// Set of Read related items that has to be implemented to become a Source.
pub(crate) trait SourceReader {
#[allow(dead_code)]
Expand Down Expand Up @@ -68,6 +71,7 @@ pub(crate) enum SourceType {
generator::GeneratorLagReader,
),
Pulsar(PulsarSource),
Serving(ServingSource),
}

enum ActorMessage {
Expand Down Expand Up @@ -182,6 +186,13 @@ impl Source {
actor.run().await;
});
}
SourceType::Serving(serving) => {
tokio::spawn(async move {
let actor =
SourceActor::new(receiver, serving.clone(), serving.clone(), serving);
actor.run().await;
});
}
};
Self {
read_batch_size: batch_size,
Expand Down
Loading
Loading