feat: Implement Sourcer traits for serving source #2301
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Please resolve conflicts?
Signed-off-by: Sreekanth <[email protected]>
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2301      +/-   ##
==========================================
+ Coverage   67.08%   67.11%   +0.02%
==========================================
  Files         349      351       +2
  Lines       45055    45263     +208
==========================================
+ Hits        30227    30378     +151
- Misses      13756    13818      +62
+ Partials     1072     1067       -5
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
824ae59 to 76c2371
Why is the test coverage too low?
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
rust/Cargo.toml
Outdated
@@ -40,8 +40,8 @@ verbose_file_reads = "warn"
# This profile optimizes for runtime performance and small binary size at the expense of longer build times.
# Compared to default release profile, this profile reduced binary size from 29MB to 21MB
# and increased build time (with only one line change in code) from 12 seconds to 133 seconds (tested on Mac M2 Max).
[profile.release]
lto = "fat"
# [profile.release]
Is this no longer valid?
Reverted. Had commented it out for faster image builds on laptop.
@@ -37,7 +38,7 @@ pub(crate) mod source {
    Generator(GeneratorConfig),
    UserDefined(UserDefinedConfig),
    Pulsar(PulsarSourceConfig),
    Serving(serving::Settings),
    Serving(Arc<serving::Settings>),
Why `Arc`? Is it for easy cloning? How come this is different from the rest? Please document the reason.
rust/numaflow-core/src/lib.rs
Outdated
@@ -55,6 +55,7 @@ mod tracker;
mod mapper;

pub async fn run() -> Result<()> {
    let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
What is this for? Please document.
SourceType::Serving(_) => {
    unimplemented!("Serving as built-in source is not yet implemented")
SourceType::Serving(config) => {
    let serving = ServingSource::new(Arc::clone(config), batch_size, read_timeout).await?;
- let serving = ServingSource::new(Arc::clone(config), batch_size, read_timeout).await?;
+ let serving = ServingSource::new(config, batch_size, read_timeout).await?;
Why clone? Can't we give the ownership?
The `match` is on a reference, so the `config` here is a `&Arc<Settings>`.
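For anyone reading this thread later, here is a minimal std-only sketch of the point above. `Settings`, `SourceType`, `new_serving_source` and `build` are simplified, hypothetical stand-ins for the types in the PR, not the actual numaflow code. It shows that matching on a `&SourceType` binds `config` as `&Arc<Settings>`, so passing an owned `Arc` on requires `Arc::clone`, which only increments the reference count.

```rust
use std::sync::Arc;

// Hypothetical, simplified stand-ins for the config types in the PR.
#[derive(Debug)]
pub struct Settings {
    pub port: u16,
}

pub enum SourceType {
    Generator,
    Serving(Arc<Settings>),
}

/// Takes an owned `Arc`, like the constructor call in the diff above.
pub fn new_serving_source(settings: Arc<Settings>) -> u16 {
    settings.port
}

pub fn build(source: &SourceType) -> Option<u16> {
    match source {
        // `source` is a reference, so `config` binds as `&Arc<Settings>`.
        // `Arc::clone` bumps the reference count; `Settings` itself is not copied.
        SourceType::Serving(config) => Some(new_serving_source(Arc::clone(config))),
        SourceType::Generator => None,
    }
}
```

Giving away ownership instead would require matching on the enum by value, which consumes it at the call site.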
let offset = Offset::String(StringOffset::new(message.id.clone(), 0));

Ok(Message {
    keys: Arc::from(vec![]),
- keys: Arc::from(vec![]),
+ // we do not support keys from HTTP client
+ keys: Arc::from(vec![]),
}

fn partitions(&self) -> Vec<u16> {
    vec![]
We should have at least one partition?
}

impl super::SourceAcker for ServingSource {
    async fn ack(&mut self, offsets: Vec<Offset>) -> Result<()> {
- async fn ack(&mut self, offsets: Vec<Offset>) -> Result<()> {
+ /// HTTP response is sent only once we have confirmation that the message has been written to the ISB.
+ async fn ack(&mut self, offsets: Vec<Offset>) -> Result<()> {
    }
}

impl super::SourceAcker for ServingSource {
- impl super::SourceAcker for ServingSource {
+ // TODO: implement for async and sync, currently it is only for async
+ impl super::SourceAcker for ServingSource {
    },
};

proxy_state.message.send(message).await.unwrap(); // FIXME:
FIXME :)
Signed-off-by: Vigith Maurice <[email protected]>
rust/serving/src/source.rs
Outdated
) -> Result<()> {
    let (messages_tx, messages_rx) = mpsc::channel(10000);
    // Create a redis store to store the callbacks and the custom responses
10000 is way too much, should we restrict it to 2 * batch size?
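A rough sketch of what sizing the buffer from the batch size could look like. This uses `std::sync::mpsc::sync_channel` as a std-only stand-in for the tokio `mpsc::channel` in the diff, and `message_channel` and `fill_until_full` are hypothetical helpers written for this illustration only.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical helper: bound the buffer at twice the read batch size
/// instead of a fixed 10000. The `max(1)` guards against a zero batch size.
pub fn message_channel<T>(batch_size: usize) -> (SyncSender<T>, Receiver<T>) {
    sync_channel((2 * batch_size).max(1))
}

/// Pushes values without blocking until the channel reports it is full
/// (or closed) and returns how many were accepted.
pub fn fill_until_full(tx: &SyncSender<u32>) -> usize {
    let mut accepted = 0;
    loop {
        match tx.try_send(accepted as u32) {
            Ok(()) => accepted += 1,
            Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => return accepted,
        }
    }
}
```

With a batch size of 4 the channel accepts 8 messages before producers see back-pressure, so at most two batches are buffered ahead of the reader.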
rust/serving/src/source.rs
Outdated
tokio::spawn(async move {
    let mut serving_actor = ServingSourceActor {
        messages: messages_rx,
        handler_rx,
        tracker: HashMap::new(),
    };
    serving_actor.run().await;
});
let app = crate::AppState {
    message: messages_tx,
    settings,
    callback_state,
};
tokio::spawn(async move {
    crate::serve(app).await.unwrap();
});
Make sure these tokio tasks are aborted during shutdown (structured concurrency :)).
This might need more work. Created an issue for it: #2309
rust/serving/src/source.rs
Outdated
}

struct ServingSourceActor {
    messages: mpsc::Receiver<MessageWrapper>,
    handler_rx: mpsc::Receiver<ActorMessage>,
    tracker: HashMap<String, oneshot::Sender<()>>,
}
doc
rust/serving/src/source.rs
Outdated

async fn read(&mut self, count: usize, timeout_at: Instant) -> Vec<Message> {
    let mut messages = vec![];
    loop {
Return `Result<Vec<Message>>` to fix the FIXME :). The source reader trait returns a result.
if messages.len() >= count || Instant::now() >= timeout_at {
    break;
}
document
rust/serving/src/source.rs
Outdated
let offset = offset
    .strip_suffix("-0")
    .expect("offset does not end with '-0'"); // FIXME: we hardcode 0 as the partition index when constructing offset
let confirm_save_tx = self
Use the vertex replica as the partition index in case of source.
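If the partition index stops being hardcoded, the `strip_suffix("-0")` above would no longer match. One possible shape, assuming the string form of the offset is `<id>-<partition index>` (inferred only from the `-0` suffix in the diff), is to split on the last `-` and return an error instead of panicking. `split_offset` is a hypothetical helper, not something from the PR.

```rust
/// Splits an offset of the assumed form "<id>-<partition index>" into its parts.
/// Splitting on the last '-' lets the id itself contain dashes.
pub fn split_offset(offset: &str) -> Result<(&str, u16), String> {
    let (id, partition) = offset
        .rsplit_once('-')
        .ok_or_else(|| format!("offset {offset:?} has no partition suffix"))?;
    let partition = partition
        .parse::<u16>()
        .map_err(|e| format!("invalid partition index in offset {offset:?}: {e}"))?;
    Ok((id, partition))
}
```

For example, `split_offset("req-abc-3")` yields `("req-abc", 3)`, while an offset without a numeric suffix produces an `Err` the caller can propagate.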
rust/serving/src/source.rs
Outdated
let (actor_tx, actor_rx) = mpsc::channel(1000);
ServingSourceActor::start(settings, actor_rx).await?;
Make the channel size configurable; it can be 2 * batch size.
rust/serving/src/source.rs
Outdated
    timeout_at: Instant,
    reply_to: oneshot::Sender<Vec<Message>>,
},
Make the oneshot sender return `Result`, to handle critical errors?
rust/serving/src/source.rs
Outdated
},
Ack {
    offsets: Vec<String>,
    reply_to: oneshot::Sender<()>,
Same here, make the oneshot sender return `Result`?
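To make the suggestion concrete, a small sketch of a reply channel that carries a `Result`. It uses a plain `std::sync::mpsc` channel as a stand-in for tokio's `oneshot`, and `SourceError`, `handle` and `ack` are hypothetical names introduced for this illustration; the PR's actual types may look different.

```rust
use std::collections::HashSet;
use std::sync::mpsc::{channel, Sender};

// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
pub enum SourceError {
    UnknownOffset(String),
    ActorGone,
}

pub enum ActorMessage {
    Ack {
        offsets: Vec<String>,
        // The reply carries a Result so the caller can see critical errors.
        reply_to: Sender<Result<(), SourceError>>,
    },
}

/// Actor side: process the message and report success or failure to the caller.
pub fn handle(msg: ActorMessage, tracker: &mut HashSet<String>) {
    match msg {
        ActorMessage::Ack { offsets, reply_to } => {
            let result = offsets.iter().try_for_each(|offset| {
                if tracker.remove(offset) {
                    Ok(())
                } else {
                    Err(SourceError::UnknownOffset(offset.clone()))
                }
            });
            // The caller may have gone away; ignoring the send error is deliberate.
            let _ = reply_to.send(result);
        }
    }
}

/// Caller side: a dropped reply channel is surfaced as an error, not a panic.
pub fn ack(offsets: Vec<String>, tracker: &mut HashSet<String>) -> Result<(), SourceError> {
    let (reply_to, reply_rx) = channel();
    handle(ActorMessage::Ack { offsets, reply_to }, tracker);
    reply_rx.recv().unwrap_or(Err(SourceError::ActorGone))
}
```

The same pattern would apply to the `Read` variant, with `Result<Vec<Message>>` as the reply payload.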
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
rust/serving/src/source.rs
Outdated
// Stop if the read timeout has reached or if we have collected the requested number of messages
if messages.len() >= count || Instant::now() >= timeout_at {
    break;
Can we use `ReceiverStream` instead of the `mpsc` channel to store the messages? That way you can chunk the stream using `chunks_timeout()`.
I can't use `ReceiverStream` here since `chunks_timeout` consumes `self`. This results in:
cannot move out of `self.messages` which is behind a mutable reference
`ReceiverStream.chunks_timeout()` would have worked if this were a function that accepts the `ReceiverStream` and uses it in a `loop {}` to consume it. In this case, the `Receiver` end is a struct member.
Signed-off-by: Sreekanth <[email protected]>
// The error will happen on all the subsequent read attempts too.
if messages.is_empty() {
Why do we need to send an error when we don't have any messages?
If the code reaches here, all the `Sender`s are already dropped. Any subsequent read requests will also fail.
Unless there is some bug, this code path should never be reached.
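As a reference for the behaviour discussed in this thread, a std-only sketch of the read loop: collect up to `count` messages until the deadline, and return an error only when every sender is gone and nothing was collected. `std::sync::mpsc` stands in for the tokio channel here, and `read_batch` and `ChannelClosed` are hypothetical names, not the PR's actual code.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Instant;

// Hypothetical error for "all senders dropped and nothing left to read".
#[derive(Debug, PartialEq)]
pub struct ChannelClosed;

pub fn read_batch<T>(
    rx: &Receiver<T>,
    count: usize,
    timeout_at: Instant,
) -> Result<Vec<T>, ChannelClosed> {
    let mut messages = Vec::with_capacity(count);
    loop {
        // Stop once the batch is full or the deadline has passed.
        let now = Instant::now();
        if messages.len() >= count || now >= timeout_at {
            break;
        }
        match rx.recv_timeout(timeout_at.saturating_duration_since(now)) {
            Ok(message) => messages.push(message),
            Err(RecvTimeoutError::Timeout) => break,
            Err(RecvTimeoutError::Disconnected) => {
                // Every sender is gone. Hand back what was collected; if there is
                // nothing, report the error, since later reads would fail too.
                if messages.is_empty() {
                    return Err(ChannelClosed);
                }
                break;
            }
        }
    }
    Ok(messages)
}
```

Because the function only borrows the receiver, it can be called from a method that holds the receiver as a struct member behind `&mut self`.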
LGTM!
Signed-off-by: Sreekanth <[email protected]> Signed-off-by: Vigith Maurice <[email protected]> Co-authored-by: Vigith Maurice <[email protected]>
closes: #2281