From 62e27030730362fd71ed9c47a78cc9855374f14b Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Wed, 13 Mar 2024 20:18:25 -0400 Subject: [PATCH] feat: add pause consumer Signed-off-by: Yordis Prieto --- nats/src/jetstream/mod.rs | 70 +++++++++++++++++++++++++++++++++++++ nats/src/jetstream/types.rs | 19 ++++++++++ nats/tests/jetstream.rs | 28 +++++++++++++++ 3 files changed, 117 insertions(+) diff --git a/nats/src/jetstream/mod.rs b/nats/src/jetstream/mod.rs index 6c6615263..4af4c7b29 100644 --- a/nats/src/jetstream/mod.rs +++ b/nats/src/jetstream/mod.rs @@ -1764,6 +1764,76 @@ impl JetStream { .map(|dr| dr.success) } + /// Pause a `JetStream` consumer until the given time. + pub fn pause_consumer(&self, stream: S, consumer: C, pause_until: DateTime) -> io::Result + where + S: AsRef, + C: AsRef, + { + let stream = stream.as_ref(); + if stream.is_empty() { + return Err(io::Error::new( + ErrorKind::InvalidInput, + "the stream name must not be empty", + )); + } + let consumer = consumer.as_ref(); + if consumer.is_empty() { + return Err(io::Error::new( + ErrorKind::InvalidInput, + "the consumer name must not be empty", + )); + } + + let subject = format!( + "{}CONSUMER.PAUSE.{}.{}", + self.api_prefix(), + stream, + consumer + ); + + let req = serde_json::ser::to_vec(&PauseConsumerRequest { + pause_until: Some(pause_until), + })?; + + self.js_request::(&subject, &req) + } + + /// Resume a `JetStream` consumer. + pub fn resume_consumer(&self, stream: S, consumer: C) -> io::Result + where + S: AsRef, + C: AsRef, + { + let stream = stream.as_ref(); + if stream.is_empty() { + return Err(io::Error::new( + ErrorKind::InvalidInput, + "the stream name must not be empty", + )); + } + let consumer = consumer.as_ref(); + if consumer.is_empty() { + return Err(io::Error::new( + ErrorKind::InvalidInput, + "the consumer name must not be empty", + )); + } + + let subject = format!( + "{}CONSUMER.PAUSE.{}.{}", + self.api_prefix(), + stream, + consumer + ); + + let req = serde_json::ser::to_vec(&PauseConsumerRequest { + pause_until: None, + })?; + + self.js_request::(&subject, &req) + } + /// Query `JetStream` consumer information. pub fn consumer_info(&self, stream: S, consumer: C) -> io::Result where diff --git a/nats/src/jetstream/types.rs b/nats/src/jetstream/types.rs index 2740ec5b8..196170555 100644 --- a/nats/src/jetstream/types.rs +++ b/nats/src/jetstream/types.rs @@ -121,6 +121,18 @@ pub(crate) struct DeleteResponse { pub success: bool, } +#[derive(Deserialize)] +pub(crate) struct PauseResponse { + pub paused: bool, + pub pause_until: Option, + pub pause_remaining: Option, +} + +#[derive(Debug, Default, Serialize, Deserialize)] +pub(crate) struct PauseConsumerRequest { + pub pause_until: Option, +} + #[derive(Debug, Default, Serialize, Deserialize)] pub(crate) struct CreateConsumerRequest { pub stream_name: String, @@ -252,6 +264,8 @@ pub struct ConsumerConfig { /// Threshold for ephemeral consumer inactivity #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")] pub inactive_threshold: Duration, + /// PauseUntil is for suspending the consumer until the deadline. + pub pause_until: Option, } pub(crate) enum ConsumerKind { @@ -680,6 +694,11 @@ pub struct ConsumerInfo { /// Indicates if any client is connected and receiving messages from a push consumer #[serde(default)] pub push_bound: bool, + /// Paused indicates whether the consumer is paused. + pub paused: bool, + /// PauseRemaining contains the amount of time left until the consumer unpauses. It will only + /// be non-zero if the consumer is currently paused. + pub pause_remaining: Option, } /// Information about the stream's, consumer's associated `JetStream` cluster diff --git a/nats/tests/jetstream.rs b/nats/tests/jetstream.rs index 89a8a8f98..f2d765fbe 100644 --- a/nats/tests/jetstream.rs +++ b/nats/tests/jetstream.rs @@ -791,3 +791,31 @@ pub fn run_basic_jetstream() -> (nats_server::Server, Connection, JetStream) { (s, nc, js) } + +#[test] +fn jetstream_create_paused_consumer() { + const CONSUMER_NAME: &str = "CONSUMER1"; + + let s = nats_server::run_server("tests/configs/jetstream.conf"); + let nc = nats::Options::new() + .error_callback(|err| println!("error!: {err}")) + .connect(s.client_url()) + .unwrap(); + let js = nats::jetstream::new(nc.clone()); + let pause_until = time::OffsetDateTime::now_utc() + Duration::from_secs(100); + + js.add_consumer( + "TEST", + ConsumerConfig { + durable_name: Some(CONSUMER_NAME.to_string()), + pause_until: Some(pause_until), + ..Default::default() + }, + ) + .unwrap(); + + + let info = js.consumer_info("TEST", CONSUMER_NAME.to_string()).unwrap(); + assert_eq!(info.paused, true); + assert_ne!(info.pause_remaining, Option::None); +}