Skip to content

Commit

Permalink
More unit tests for serving
Browse files Browse the repository at this point in the history
Signed-off-by: Sreekanth <[email protected]>
  • Loading branch information
BulkBeing committed Jan 3, 2025
1 parent e893964 commit a45411d
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 7 deletions.
8 changes: 4 additions & 4 deletions rust/numaflow-core/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub(crate) struct Message {
}

/// Offset of the message which will be used to acknowledge the message.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub(crate) enum Offset {
Int(IntOffset),
String(StringOffset),
Expand All @@ -62,7 +62,7 @@ impl Message {
}

/// IntOffset is integer based offset enum type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct IntOffset {
pub(crate) offset: u64,
pub(crate) partition_idx: u16,
Expand All @@ -84,7 +84,7 @@ impl fmt::Display for IntOffset {
}

/// StringOffset is string based offset enum type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub(crate) struct StringOffset {
/// offset could be a complex base64 string.
pub(crate) offset: Bytes,
Expand Down Expand Up @@ -120,7 +120,7 @@ pub(crate) enum ReadAck {
}

/// Message ID which is used to uniquely identify a message. It cheap to clone this.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub(crate) struct MessageID {
pub(crate) vertex_name: Bytes,
pub(crate) offset: Bytes,
Expand Down
53 changes: 53 additions & 0 deletions rust/numaflow-core/src/source/serving.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,56 @@ impl super::LagReader for ServingSource {
Ok(None)
}
}

#[cfg(test)]
mod tests {
use crate::message::{Message, MessageID, Offset, StringOffset};
use std::collections::HashMap;

use bytes::Bytes;

type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

#[test]
fn test_message_conversion() -> Result<()> {
const MSG_ID: &str = "b149ad7a-5690-4f0a";

let mut headers = HashMap::new();
headers.insert("header-key".to_owned(), "header-value".to_owned());

let serving_message = serving::Message {
value: Bytes::from_static(b"test"),
id: MSG_ID.into(),
headers: headers.clone(),
};
let message: Message = serving_message.try_into()?;
assert_eq!(message.value, Bytes::from_static(b"test"));
assert_eq!(
message.offset,
Some(Offset::String(StringOffset::new(MSG_ID.into(), 0)))
);
assert_eq!(
message.id,
MessageID {
vertex_name: Bytes::new(),
offset: format!("{MSG_ID}-0").into(),
index: 0
}
);

assert_eq!(message.headers, headers);

Ok(())
}

#[test]
fn test_error_conversion() {
use crate::error::Error;
let error: Error = serving::Error::ParseConfig("Invalid config".to_owned()).into();
if let Error::Source(val) = error {
assert_eq!(val, "ParseConfig Error - Invalid config".to_owned());
} else {
panic!("Expected Error::Source() variant");
}
}
}
4 changes: 1 addition & 3 deletions rust/serving/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ pub(crate) async fn serve<T>(
where
T: Clone + Send + Sync + Store + 'static,
{
rustls::crypto::aws_lc_rs::default_provider()
.install_default()
.expect("Failed to set crypto provider");
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
let (cert, key) = generate_certs()?;

let tls_config = RustlsConfig::from_pem(cert.pem().into(), key.serialize_pem().into())
Expand Down

0 comments on commit a45411d

Please sign in to comment.