Skip to content

Commit

Permalink
Add PgListener::next_buffered(), to support batch processing of notif…
Browse files Browse the repository at this point in the history
…ications (#3560)

* Implement and test PgListener::try_recv_buffered().

* rustfmt

* Fix warnings.

* Fix test.

* Rename try_recv_buffered() -> next_buffered().
  • Loading branch information
chanks authored Nov 27, 2024
1 parent 503a72c commit 4f10962
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 2 deletions.
17 changes: 15 additions & 2 deletions sqlx-postgres/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,8 @@ impl PgListener {
pub async fn try_recv(&mut self) -> Result<Option<PgNotification>, Error> {
// Flush the buffer first, if anything
// This would only fill up if this listener is used as a connection
if let Ok(Some(notification)) = self.buffer_rx.try_next() {
return Ok(Some(PgNotification(notification)));
if let Some(notification) = self.next_buffered() {
return Ok(Some(notification));
}

// Fetch our `CloseEvent` listener, if applicable.
Expand Down Expand Up @@ -319,6 +319,19 @@ impl PgListener {
}
}

/// Receives the next notification that already exists in the connection buffer, if any.
///
/// This is similar to `try_recv`, except it will not wait if the connection has not yet received a notification.
///
/// This is helpful if you want to retrieve all buffered notifications and process them in batches.
pub fn next_buffered(&mut self) -> Option<PgNotification> {
if let Ok(Some(notification)) = self.buffer_rx.try_next() {
Some(PgNotification(notification))
} else {
None
}
}

/// Consume this listener, returning a `Stream` of notifications.
///
/// The backing connection will be automatically reconnected should it be lost.
Expand Down
69 changes: 69 additions & 0 deletions tests/postgres/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1057,6 +1057,75 @@ async fn test_listener_cleanup() -> anyhow::Result<()> {
Ok(())
}

#[sqlx_macros::test]
async fn test_listener_try_recv_buffered() -> anyhow::Result<()> {
use sqlx_core::rt::timeout;

use sqlx::pool::PoolOptions;
use sqlx::postgres::PgListener;

// Create a connection on which to send notifications
let mut notify_conn = new::<Postgres>().await?;

let pool = PoolOptions::<Postgres>::new()
.min_connections(1)
.max_connections(1)
.test_before_acquire(true)
.connect(&env::var("DATABASE_URL")?)
.await?;

let mut listener = PgListener::connect_with(&pool).await?;
listener.listen("test_channel2").await?;

// Checks for a notification on the test channel
async fn try_recv(listener: &mut PgListener) -> anyhow::Result<bool> {
match timeout(Duration::from_millis(100), listener.recv()).await {
Ok(res) => {
res?;
Ok(true)
}
Err(_) => Ok(false),
}
}

// Check no notification is buffered, since we haven't sent one.
assert!(listener.next_buffered().is_none());

// Send five notifications transactionally, so they all arrive at once.
{
let mut txn = notify_conn.begin().await?;
for i in 0..5 {
txn.execute(format!("NOTIFY test_channel2, 'payload {i}'").as_str())
.await?;
}
txn.commit().await?;
}

// Still no notifications buffered, since we haven't awaited the listener yet.
assert!(listener.next_buffered().is_none());

// Activate connection.
sqlx::query!("SELECT 1 AS one")
.fetch_all(&mut listener)
.await?;

// The next five notifications should now be buffered.
for i in 0..5 {
assert!(
listener.next_buffered().is_some(),
"Notification {i} was not buffered"
);
}

// Should be no more.
assert!(listener.next_buffered().is_none());

// Even if we wait.
assert!(!try_recv(&mut listener).await?, "Notification received");

Ok(())
}

#[sqlx_macros::test]
async fn test_pg_listener_allows_pool_to_close() -> anyhow::Result<()> {
let pool = pool::<Postgres>().await?;
Expand Down

0 comments on commit 4f10962

Please sign in to comment.