Skip to content

Commit

Permalink
Merge pull request r-dbi#237 from lentinj/listen-notify-support
Browse files Browse the repository at this point in the history
- New `postgresWaitForNotify()` adds `LISTEN/NOTIFY` support (r-dbi#237, @lentinj).
  • Loading branch information
krlmlr authored Oct 18, 2020
2 parents 12c3d3d + 259497b commit 8bcf03e
Show file tree
Hide file tree
Showing 12 changed files with 469 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ env:

#addons
addons:
postgresql: "9.4"
postgresql: "9.5"
apt:
packages:
- libpq-dev
Expand Down
5 changes: 5 additions & 0 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,17 @@ Imports:
Rcpp (>= 0.11.4.2),
withr
Suggests:
callr,
DBItest (>= 1.7.0),
knitr,
rmarkdown,
testthat
LinkingTo:
BH,
plogr (>= 0.2.0),
Rcpp
VignetteBuilder:
knitr
Encoding: UTF-8
LazyLoad: true
Roxygen: list(markdown = TRUE)
Expand Down
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ export(Postgres)
export(Redshift)
export(postgresDefault)
export(postgresHasDefault)
export(postgresWaitForNotify)
exportClasses(PqConnection)
exportClasses(PqDriver)
exportClasses(PqResult)
Expand Down
50 changes: 50 additions & 0 deletions R/PqConnection.R
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,56 @@ setMethod("dbDisconnect", "PqConnection", function(conn, ...) {
invisible(TRUE)
})

#' Wait for and return any notifications that return within timeout
#'
#' Once you subscribe to notifications with LISTEN, use this to wait for
#' responses on each channel.
#'
#' @export
#' @param conn a [PqConnection-class] object, produced by
#' [DBI::dbConnect()]
#' @param timeout How long to wait, in seconds. Default 1
#' @return If a notification was available, a list of:
#' \describe{
#' \item{channel}{Name of channel}
#' \item{pid}{PID of notifying server process}
#' \item{payload}{Content of notification}
#' }
#' If no notifications are available, return NULL
#' @examples
#' # For running the examples on systems without PostgreSQL connection:
#' if (postgresHasDefault()) {
#' library(DBI)
#' library(callr)
#'
#' # listen for messages on the grapevine
#' db_listen <- dbConnect(RPostgres::Postgres())
#' dbExecute(db_listen, "LISTEN grapevine")
#'
#' # Start another process, which sends a message after a delay
#' rp <- r_bg(function () {
#' library(DBI)
#' Sys.sleep(0.3)
#' db_notify <- dbConnect(RPostgres::Postgres())
#' dbExecute(db_notify, "NOTIFY grapevine, 'psst'")
#' dbDisconnect(db_notify)
#' })
#'
#' # Sleep until we get the message
#' n <- NULL
#' while (is.null(n)) {
#' n <- RPostgres::postgresWaitForNotify(db_listen, 60)
#' }
#' stopifnot(n$payload == 'psst')
#'
#' # Tidy up
#' rp$wait()
#' dbDisconnect(db_listen)
#' }
postgresWaitForNotify <- function (conn, timeout = 1) {
out <- connection_wait_for_notify(conn@ptr, timeout)
if ('pid' %in% names(out)) out else NULL
}

#' Determine database type for R vector.
#'
Expand Down
4 changes: 4 additions & 0 deletions R/RcppExports.R
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ connection_copy_data <- function(con, sql, df) {
invisible(.Call(`_RPostgres_connection_copy_data`, con, sql, df))
}

connection_wait_for_notify <- function(con, timeout_secs) {
.Call(`_RPostgres_connection_wait_for_notify`, con, timeout_secs)
}

encode_vector <- function(x) {
.Call(`_RPostgres_encode_vector`, x)
}
Expand Down
58 changes: 58 additions & 0 deletions man/postgresWaitForNotify.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 43 additions & 0 deletions src/DbConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
#include "encode.h"
#include "DbResult.h"

#ifdef _WIN32
#include <winsock2.h>
#endif

DbConnection::DbConnection(std::vector<std::string> keys, std::vector<std::string> values,
bool check_interrupts) :
Expand Down Expand Up @@ -254,3 +257,43 @@ void DbConnection::cleanup_query() {
}
finish_query(pConn_);
}

List DbConnection::wait_for_notify(int timeout_secs) {
PGnotify *notify;
List out;
int socket = -1;
fd_set input;

while (TRUE) {
// See if there's a notification waiting, if so return it
if (!PQconsumeInput(pConn_)) {
stop("Failed to consume input from the server");
}
if ((notify = PQnotifies(pConn_)) != NULL) {
out = Rcpp::List::create(
_["channel"] = CharacterVector::create(notify->relname),
_["pid"] = IntegerVector::create(notify->be_pid),
_["payload"] = CharacterVector::create(notify->extra)
);
PQfreemem(notify);
return out;
}

if (socket != -1) {
// Socket open, so already been round once, give up.
return R_NilValue;
}

// Open DB socket and wait for new data for at most (timeout_secs) seconds
if ((socket = PQsocket(pConn_)) < 0) {
stop("Failed to get connection socket");
}
FD_ZERO(&input);
FD_SET(socket, &input);
timeval timeout = {0, 0};
timeout.tv_sec = timeout_secs;
if (select(socket + 1, &input, NULL, NULL, &timeout) < 0) {
stop("select() on the connection failed");
}
}
}
1 change: 1 addition & 0 deletions src/DbConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class DbConnection : boost::noncopyable {

void cleanup_query();
static void finish_query(PGconn* pConn);
List wait_for_notify(int timeout_secs);

private:
void cancel_query();
Expand Down
13 changes: 13 additions & 0 deletions src/RcppExports.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,18 @@ BEGIN_RCPP
return R_NilValue;
END_RCPP
}
// connection_wait_for_notify
List connection_wait_for_notify(DbConnection* con, int timeout_secs);
RcppExport SEXP _RPostgres_connection_wait_for_notify(SEXP conSEXP, SEXP timeout_secsSEXP) {
BEGIN_RCPP
Rcpp::RObject rcpp_result_gen;
Rcpp::RNGScope rcpp_rngScope_gen;
Rcpp::traits::input_parameter< DbConnection* >::type con(conSEXP);
Rcpp::traits::input_parameter< int >::type timeout_secs(timeout_secsSEXP);
rcpp_result_gen = Rcpp::wrap(connection_wait_for_notify(con, timeout_secs));
return rcpp_result_gen;
END_RCPP
}
// encode_vector
std::string encode_vector(RObject x);
RcppExport SEXP _RPostgres_encode_vector(SEXP xSEXP) {
Expand Down Expand Up @@ -275,6 +287,7 @@ static const R_CallMethodDef CallEntries[] = {
{"_RPostgres_connection_is_transacting", (DL_FUNC) &_RPostgres_connection_is_transacting, 1},
{"_RPostgres_connection_set_transacting", (DL_FUNC) &_RPostgres_connection_set_transacting, 2},
{"_RPostgres_connection_copy_data", (DL_FUNC) &_RPostgres_connection_copy_data, 3},
{"_RPostgres_connection_wait_for_notify", (DL_FUNC) &_RPostgres_connection_wait_for_notify, 2},
{"_RPostgres_encode_vector", (DL_FUNC) &_RPostgres_encode_vector, 1},
{"_RPostgres_encode_data_frame", (DL_FUNC) &_RPostgres_encode_data_frame, 1},
{"_RPostgres_encrypt_password", (DL_FUNC) &_RPostgres_encrypt_password, 2},
Expand Down
5 changes: 5 additions & 0 deletions src/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ void connection_copy_data(DbConnection* con, std::string sql, List df) {
return con->copy_data(sql, df);
}

// [[Rcpp::export]]
List connection_wait_for_notify(DbConnection* con, int timeout_secs) {
return con->wait_for_notify(timeout_secs);
}


// as() override

Expand Down
17 changes: 17 additions & 0 deletions tests/testthat/test-waitForNotify.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
context("waitForNotify")

test_that("WaitForNotify without anything to do returns NULL", {
db <- postgresDefault()
n <- RPostgres::postgresWaitForNotify(db, 1)
dbDisconnect(db)
expect_null(n)
})

test_that("WaitForNotify with a waiting message returns message", {
db <- postgresDefault()
dbExecute(db, 'LISTEN grapevine')
dbExecute(db, "NOTIFY grapevine, 'psst'")
n <- RPostgres::postgresWaitForNotify(db, 1)
dbDisconnect(db)
expect_identical(n$payload, 'psst')
})
Loading

0 comments on commit 8bcf03e

Please sign in to comment.