diff --git a/async-nats/src/jetstream/consumer/pull.rs b/async-nats/src/jetstream/consumer/pull.rs index b3711842f..3e4cd9d06 100644 --- a/async-nats/src/jetstream/consumer/pull.rs +++ b/async-nats/src/jetstream/consumer/pull.rs @@ -444,57 +444,47 @@ impl<'a> futures::Stream for Sequence<'a> { mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - match self.next.as_mut() { - None => { - let context = self.context.clone(); - let subject = self.subject.clone(); - let request = self.request.clone(); - let pending_messages = self.pending_messages; - - let next = self.next.insert(Box::pin(async move { - let inbox = context.client.new_inbox(); - let subscriber = context - .client - .subscribe(inbox.clone()) - .await - .map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?; + let this = self.as_mut().get_mut(); + + let next = this.next.get_or_insert_with(|| { + let context = this.context.clone(); + let subject = this.subject.clone(); + let request = this.request.clone(); + let pending_messages = this.pending_messages; + let inbox = context.client.new_inbox(); + + Box::pin(async move { + let subscriber = context + .client + .subscribe(inbox.clone()) + .await + .map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?; + + context + .client + .publish_with_reply(subject, inbox, request) + .await + .map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?; + + // TODO(tp): Add timeout config and defaults. + Ok(Batch { + pending_messages, + subscriber, + context, + terminated: false, + timeout: Some(Box::pin(tokio::time::sleep(Duration::from_secs(60)))), + }) + }) + }); - context - .client - .publish_with_reply(subject, inbox, request) - .await - .map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?; - - // TODO(tp): Add timeout config and defaults. - Ok(Batch { - pending_messages, - subscriber, - context, - terminated: false, - timeout: Some(Box::pin(tokio::time::sleep(Duration::from_secs(60)))), - }) - })); - - match next.as_mut().poll(cx) { - Poll::Ready(result) => { - self.next = None; - Poll::Ready(Some(result.map_err(|err| { - MessagesError::with_source(MessagesErrorKind::Pull, err) - }))) - } - Poll::Pending => Poll::Pending, - } + match next.poll_unpin(cx) { + Poll::Ready(result) => { + this.next = None; + Poll::Ready(Some(result.map_err(|err| { + MessagesError::with_source(MessagesErrorKind::Pull, err) + }))) } - - Some(next) => match next.as_mut().poll(cx) { - Poll::Ready(result) => { - self.next = None; - Poll::Ready(Some(result.map_err(|err| { - MessagesError::with_source(MessagesErrorKind::Pull, err) - }))) - } - Poll::Pending => Poll::Pending, - }, + Poll::Pending => Poll::Pending, } } } @@ -751,34 +741,32 @@ impl<'a> futures::Stream for Ordered<'a> { // Poll messages if let Some(stream) = self.stream.as_mut() { match stream.poll_next_unpin(cx) { - Poll::Ready(message) => match message { - Some(message) => { - // Do we bail out on all errors? - // Or we want to handle some? (like consumer deleted?) - let message = message?; - let info = message.info().map_err(|err| { - OrderedError::with_source(OrderedErrorKind::Other, err) - })?; - trace!("consumer sequence: {:?}, stream sequence {:?}, consumer sequence in message: {:?} stream sequence in message: {:?}", + Poll::Ready(Some(message)) => { + // Do we bail out on all errors? + // Or we want to handle some? (like consumer deleted?) + let message = message?; + let info = message + .info() + .map_err(|err| OrderedError::with_source(OrderedErrorKind::Other, err))?; + trace!("consumer sequence: {:?}, stream sequence {:?}, consumer sequence in message: {:?} stream sequence in message: {:?}", self.consumer_sequence, self.stream_sequence, info.consumer_sequence, info.stream_sequence); - if info.consumer_sequence != self.consumer_sequence + 1 { - debug!( - "ordered consumer mismatch. current {}, info: {}", - self.consumer_sequence, info.consumer_sequence - ); - recreate = true; - self.consumer_sequence = 0; - } else { - self.stream_sequence = info.stream_sequence; - self.consumer_sequence = info.consumer_sequence; - return Poll::Ready(Some(Ok(message))); - } + if info.consumer_sequence != self.consumer_sequence + 1 { + debug!( + "ordered consumer mismatch. current {}, info: {}", + self.consumer_sequence, info.consumer_sequence + ); + recreate = true; + self.consumer_sequence = 0; + } else { + self.stream_sequence = info.stream_sequence; + self.consumer_sequence = info.consumer_sequence; + return Poll::Ready(Some(Ok(message))); } - None => return Poll::Ready(None), - }, + } + Poll::Ready(None) => return Poll::Ready(None), Poll::Pending => (), } } @@ -792,7 +780,7 @@ impl<'a> futures::Stream for Ordered<'a> { let consumer_name = self.consumer_name.clone(); let sequence = self.consumer_sequence; async move { - recreate_consumer_stream(context, config, stream_name, consumer_name, sequence) + recreate_consumer_stream(context, config, stream_name, &consumer_name, sequence) .await } })) @@ -800,19 +788,17 @@ impl<'a> futures::Stream for Ordered<'a> { // check for recreation future if let Some(result) = self.create_stream.as_mut() { match result.poll_unpin(cx) { - Poll::Ready(result) => match result { - Ok(stream) => { - self.create_stream = None; - self.stream = Some(stream); - return self.poll_next(cx); - } - Err(err) => { - return Poll::Ready(Some(Err(OrderedError::with_source( - OrderedErrorKind::Recreate, - err, - )))) - } - }, + Poll::Ready(Ok(stream)) => { + self.create_stream = None; + self.stream = Some(stream); + return self.poll_next(cx); + } + Poll::Ready(Err(err)) => { + return Poll::Ready(Some(Err(OrderedError::with_source( + OrderedErrorKind::Recreate, + err, + )))) + } Poll::Pending => (), } } @@ -909,7 +895,7 @@ impl Stream { debug!("expired pull request")}, } - let request = serde_json::to_vec(&batch).map(Bytes::from).unwrap(); + let request = Bytes::from(serde_json::to_vec(&batch).unwrap()); let result = context .client .publish_with_reply(subject.clone(), inbox.clone(), request.clone()) @@ -1048,20 +1034,17 @@ impl futures::Stream for Stream { if !self.batch_config.idle_heartbeat.is_zero() { trace!("setting hearbeats"); let timeout = self.batch_config.idle_heartbeat.saturating_mul(2); - self.heartbeat_timeout + let heartbeat_timeout = self + .heartbeat_timeout .get_or_insert_with(|| Box::pin(tokio::time::sleep(timeout))); trace!("checking idle hearbeats"); - if let Some(hearbeat) = self.heartbeat_timeout.as_mut() { - match hearbeat.poll_unpin(cx) { - Poll::Ready(_) => { - self.heartbeat_timeout = None; - return Poll::Ready(Some(Err(MessagesError::new( - MessagesErrorKind::MissingHeartbeat, - )))); - } - Poll::Pending => (), - } + if heartbeat_timeout.poll_unpin(cx).is_ready() { + self.heartbeat_timeout = None; + + return Poll::Ready(Some(Err(MessagesError::new( + MessagesErrorKind::MissingHeartbeat, + )))); } } @@ -1078,30 +1061,26 @@ impl futures::Stream for Stream { } match self.request_result_rx.poll_recv(cx) { - Poll::Ready(resp) => match resp { - Some(resp) => match resp { - Ok(reset) => { - trace!("request response: {:?}", reset); - debug!("request sent, setting pending messages"); - if reset { - self.pending_messages = self.batch_config.batch; - self.pending_bytes = self.batch_config.max_bytes; - } else { - self.pending_messages += self.batch_config.batch; - self.pending_bytes += self.batch_config.max_bytes; - } - self.pending_request = false; - continue; - } - Err(err) => { - return Poll::Ready(Some(Err(MessagesError::with_source( - MessagesErrorKind::Pull, - err, - )))) - } - }, - None => return Poll::Ready(None), - }, + Poll::Ready(Some(Ok(reset))) => { + trace!("request response: {:?}", reset); + debug!("request sent, setting pending messages"); + if reset { + self.pending_messages = self.batch_config.batch; + self.pending_bytes = self.batch_config.max_bytes; + } else { + self.pending_messages += self.batch_config.batch; + self.pending_bytes += self.batch_config.max_bytes; + } + self.pending_request = false; + continue; + } + Poll::Ready(Some(Err(err))) => { + return Poll::Ready(Some(Err(MessagesError::with_source( + MessagesErrorKind::Pull, + err, + )))) + } + Poll::Ready(None) => return Poll::Ready(None), Poll::Pending => { trace!("pending result"); } @@ -1111,6 +1090,7 @@ impl futures::Stream for Stream { match self.subscriber.receiver.poll_recv(cx) { Poll::Ready(maybe_message) => { self.heartbeat_timeout = None; + match maybe_message { Some(message) => match message.status.unwrap_or(StatusCode::OK) { StatusCode::TIMEOUT | StatusCode::REQUEST_TERMINATED => { @@ -2260,7 +2240,7 @@ async fn recreate_consumer_stream( context: Context, config: Config, stream_name: String, - consumer_name: String, + consumer_name: &str, sequence: u64, ) -> Result { // TODO(jarema): retry whole operation few times? @@ -2270,20 +2250,15 @@ async fn recreate_consumer_stream( .map_err(|err| { ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::GetStream, err) })?; - stream - .delete_consumer(&consumer_name) - .await - .map_err(|err| { - ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err) - })?; - - let deliver_policy = { - if sequence == 0 { - DeliverPolicy::All - } else { - DeliverPolicy::ByStartSequence { - start_sequence: sequence + 1, - } + stream.delete_consumer(consumer_name).await.map_err(|err| { + ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err) + })?; + + let deliver_policy = if sequence == 0 { + DeliverPolicy::All + } else { + DeliverPolicy::ByStartSequence { + start_sequence: sequence + 1, } }; tokio::time::timeout( diff --git a/async-nats/src/jetstream/consumer/push.rs b/async-nats/src/jetstream/consumer/push.rs index e4278f685..19edd8190 100644 --- a/async-nats/src/jetstream/consumer/push.rs +++ b/async-nats/src/jetstream/consumer/push.rs @@ -22,7 +22,7 @@ use crate::{ }; use bytes::Bytes; -use futures::future::BoxFuture; +use futures::{future::BoxFuture, FutureExt}; use serde::{Deserialize, Serialize}; #[cfg(feature = "server_2_10")] use std::collections::HashMap; @@ -384,15 +384,16 @@ impl FromConsumer for OrderedConfig { where Self: Sized, { - if config.deliver_subject.is_none() { - return Err(Box::new(io::Error::new( + let deliver_subject = config.deliver_subject.ok_or_else(|| { + Box::new(io::Error::new( ErrorKind::Other, "push consumer must have delivery subject", - ))); - } + )) + })?; + Ok(OrderedConfig { name: config.name, - deliver_subject: config.deliver_subject.unwrap(), + deliver_subject, description: config.description, filter_subject: config.filter_subject, #[cfg(feature = "server_2_10")] @@ -558,159 +559,135 @@ impl<'a> futures::Stream for Ordered<'a> { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { - loop { - match self.shutdown.try_recv() { - Ok(err) => { - return Poll::Ready(Some(Err(OrderedError::with_source( - OrderedErrorKind::Other, - err, - )))) - } - Err(TryRecvError::Closed) => { - return Poll::Ready(Some(Err(OrderedError::with_source( - OrderedErrorKind::Other, - "consumer task closed", - )))) - } - Err(TryRecvError::Empty) => {} + match self.shutdown.try_recv() { + Ok(err) => { + return Poll::Ready(Some(Err(OrderedError::with_source( + OrderedErrorKind::Other, + err, + )))) } - if self.subscriber.is_none() { - match self.subscriber_future.as_mut() { + Err(TryRecvError::Closed) => { + return Poll::Ready(Some(Err(OrderedError::with_source( + OrderedErrorKind::Other, + "consumer task closed", + )))) + } + Err(TryRecvError::Empty) => {} + } + + loop { + let subscriber = match &mut self.subscriber { + Some(subscriber) => subscriber, + None => match &mut self.subscriber_future { + Some(subscriber_future) => match subscriber_future.poll_unpin(cx) { + Poll::Ready(subscriber) => { + self.subscriber_future = None; + self.consumer_sequence.store(0, Ordering::Relaxed); + self.subscriber.insert(subscriber.map_err(|err| { + OrderedError::with_source(OrderedErrorKind::Recreate, err) + })?) + } + Poll::Pending => return Poll::Pending, + }, None => { - trace!( - "subscriber and subscriber future are None. Recreating the consumer" - ); let context = self.context.clone(); let sequence = self.stream_sequence.clone(); let config = self.consumer.config.clone(); let stream_name = self.consumer.info.stream_name.clone(); - let subscriber_future = - self.subscriber_future.insert(Box::pin(async move { - recreate_consumer_and_subscription( - context, - config, - stream_name, - sequence.load(Ordering::Relaxed), - ) - .await - })); - match subscriber_future.as_mut().poll(cx) { - Poll::Ready(subscriber) => { - self.subscriber_future = None; - self.consumer_sequence.store(0, Ordering::Relaxed); - self.subscriber = Some(subscriber.map_err(|err| { - OrderedError::with_source(OrderedErrorKind::Recreate, err) - })?); + + self.subscriber_future = Some(Box::pin(async move { + recreate_consumer_and_subscription( + context, + config, + stream_name, + sequence.load(Ordering::Relaxed), + ) + .await + })); + continue; + } + }, + }; + + match subscriber.receiver.poll_recv(cx) { + Poll::Ready(Some(message)) => { + *self.last_seen.lock().unwrap() = Instant::now(); + + match message.status { + Some(StatusCode::IDLE_HEARTBEAT) => { + debug!("received idle heartbeats"); + if let Some(headers) = message.headers.as_ref() { + if let Some(sequence) = + headers.get(crate::header::NATS_LAST_CONSUMER) + { + let sequence: u64 = + sequence.iter().next().unwrap().parse().map_err(|err| { + OrderedError::with_source(OrderedErrorKind::Other, err) + })?; + + let last_sequence = + self.consumer_sequence.load(Ordering::Relaxed); + + if sequence != last_sequence { + debug!("hearbeats sequence mismatch. got {}, expected {}, resetting consumer", sequence, last_sequence); + self.subscriber = None; + } + } } - Poll::Pending => { - return Poll::Pending; + // flow control. + if let Some(subject) = message.reply.clone() { + trace!("received flow control message"); + let client = self.context.client.clone(); + tokio::task::spawn(async move { + client.publish(subject, Bytes::from_static(b"")).await.ok(); + client.flush().await.ok(); + }); } + continue; } - } - Some(subscriber) => match subscriber.as_mut().poll(cx) { - Poll::Ready(subscriber) => { - self.subscriber_future = None; - self.consumer_sequence.store(0, Ordering::Relaxed); - self.subscriber = Some(subscriber.map_err(|err| { - OrderedError::with_source(OrderedErrorKind::Recreate, err) - })?); - } - Poll::Pending => { - return Poll::Pending; + Some(status) => { + debug!("received status message: {}", status); + continue; } - }, - } - } - if let Some(subscriber) = self.subscriber.as_mut() { - match subscriber.receiver.poll_recv(cx) { - Poll::Ready(maybe_message) => match maybe_message { - Some(message) => { - *self.last_seen.lock().unwrap() = Instant::now(); - match message.status { - Some(StatusCode::IDLE_HEARTBEAT) => { - debug!("received idle heartbeats"); - if let Some(headers) = message.headers.as_ref() { - if let Some(sequence) = - headers.get(crate::header::NATS_LAST_CONSUMER) - { - let sequence: u64 = - sequence.iter().next().unwrap().parse().map_err( - |err| { - OrderedError::with_source( - OrderedErrorKind::Other, - err, - ) - }, - )?; - - let last_sequence = - self.consumer_sequence.load(Ordering::Relaxed); - - if sequence != last_sequence { - debug!("hearbeats sequence mismatch. got {}, expected {}, resetting consumer", sequence, last_sequence); - self.subscriber = None; - } - } - } - // flow control. - if let Some(subject) = message.reply.clone() { - trace!("received flow control message"); - let client = self.context.client.clone(); - tokio::task::spawn(async move { - client - .publish(subject, Bytes::from_static(b"")) - .await - .ok(); - client.flush().await.ok(); - }); - } - continue; - } - Some(status) => { - debug!("received status message: {}", status); - continue; - } - None => { - trace!("received a message"); - let jetstream_message = jetstream::message::Message { - message, - context: self.context.clone(), - }; - - let info = jetstream_message.info().map_err(|err| { - OrderedError::with_source(OrderedErrorKind::Other, err) - })?; - trace!("consumer sequence: {:?}, stream sequence {:?}, consumer sequence in message: {:?} stream sequence in message: {:?}", + None => { + trace!("received a message"); + let jetstream_message = jetstream::message::Message { + message, + context: self.context.clone(), + }; + + let info = jetstream_message.info().map_err(|err| { + OrderedError::with_source(OrderedErrorKind::Other, err) + })?; + trace!("consumer sequence: {:?}, stream sequence {:?}, consumer sequence in message: {:?} stream sequence in message: {:?}", self.consumer_sequence, self.stream_sequence, info.consumer_sequence, info.stream_sequence); - if info.consumer_sequence - != self.consumer_sequence.load(Ordering::Relaxed) + 1 - { - debug!( - "ordered consumer mismatch. current {}, info: {}", - self.consumer_sequence.load(Ordering::Relaxed), - info.consumer_sequence - ); - self.subscriber = None; - self.consumer_sequence.store(0, Ordering::Relaxed); - continue; - } - self.stream_sequence - .store(info.stream_sequence, Ordering::Relaxed); - self.consumer_sequence - .store(info.consumer_sequence, Ordering::Relaxed); - return Poll::Ready(Some(Ok(jetstream_message))); - } + if info.consumer_sequence + != self.consumer_sequence.load(Ordering::Relaxed) + 1 + { + debug!( + "ordered consumer mismatch. current {}, info: {}", + self.consumer_sequence.load(Ordering::Relaxed), + info.consumer_sequence + ); + self.subscriber = None; + self.consumer_sequence.store(0, Ordering::Relaxed); + continue; } + self.stream_sequence + .store(info.stream_sequence, Ordering::Relaxed); + self.consumer_sequence + .store(info.consumer_sequence, Ordering::Relaxed); + return Poll::Ready(Some(Ok(jetstream_message))); } - None => { - return Poll::Ready(None); - } - }, - Poll::Pending => return Poll::Pending, + } } + Poll::Ready(None) => { + return Poll::Ready(None); + } + Poll::Pending => return Poll::Pending, } } }