From 4b138b1abb1a5e5699502edec79bef34bfe45eef Mon Sep 17 00:00:00 2001 From: Jamie Lentin Date: Thu, 5 Dec 2019 17:50:08 +0000 Subject: [PATCH 01/11] PqConnection: Add pqWaitForNotify To be able to use LISTEN/NOTIFY from R, you need to be able to wait for incoming asynchronous messages. --- NAMESPACE | 1 + R/PqConnection.R | 31 +++++++++++++++++++++++++++++++ R/RcppExports.R | 4 ++++ man/pqWaitForNotify.Rd | 39 +++++++++++++++++++++++++++++++++++++++ src/DbConnection.cpp | 40 ++++++++++++++++++++++++++++++++++++++++ src/DbConnection.h | 1 + src/RcppExports.cpp | 13 +++++++++++++ src/connection.cpp | 5 +++++ 8 files changed, 134 insertions(+) create mode 100644 man/pqWaitForNotify.Rd diff --git a/NAMESPACE b/NAMESPACE index de9ab7f4..8c2aade5 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -5,6 +5,7 @@ export(Id) export(Postgres) export(postgresDefault) export(postgresHasDefault) +export(pqWaitForNotify) exportClasses(PqConnection) exportClasses(PqDriver) exportClasses(PqResult) diff --git a/R/PqConnection.R b/R/PqConnection.R index 4707d887..b9815a16 100644 --- a/R/PqConnection.R +++ b/R/PqConnection.R @@ -193,6 +193,37 @@ setMethod("dbDisconnect", "PqConnection", function(conn, ...) { invisible(TRUE) }) +#' Wait for and return any notifications that return within timeout +#' +#' Once you subscribe to notifications with LISTEN, use this to wait for +#' responses on each channel. +#' +#' @export +#' @param conn a [PqConnection-class] object, produced by +#' [DBI::dbConnect()] +#' @param timeout How long to wait, in seconds. Default 1 +#' @return If a notification was available, a list of: +#' \describe{ +#' \item{channel}{Name of channel} +#' \item{pid}{PID of notifying server process} +#' \item{payload}{Content of notification} +#' } +#' If no notifications are available, return NULL +#' @examples +#' # For running the examples on systems without PostgreSQL connection: +#' if (postgresHasDefault()) { +#' library(DBI) +#' db <- dbConnect(RPostgres::Postgres()) +#' dbSendStatement(db, "LISTEN channel") +#' # In another connection:- +#' # dbSendStatement(db2, "NOTIFY channel, 'hello'") +#' n <- RPostgres::pqWaitForNotify(db) +#' if (!is.null(n)) writeLines(c("Got a message:-", n$payload)) +#' } +pqWaitForNotify <- function (conn, timeout = 1) { + out <- connection_wait_for_notify(conn@ptr, timeout) + if ('pid' %in% names(out)) out else NULL +} #' Determine database type for R vector. #' diff --git a/R/RcppExports.R b/R/RcppExports.R index 0d5ab4ac..71bb7a8d 100644 --- a/R/RcppExports.R +++ b/R/RcppExports.R @@ -37,6 +37,10 @@ connection_copy_data <- function(con, sql, df) { invisible(.Call(`_RPostgres_connection_copy_data`, con, sql, df)) } +connection_wait_for_notify <- function(con, timeout_secs) { + .Call(`_RPostgres_connection_wait_for_notify`, con, timeout_secs) +} + encode_vector <- function(x) { .Call(`_RPostgres_encode_vector`, x) } diff --git a/man/pqWaitForNotify.Rd b/man/pqWaitForNotify.Rd new file mode 100644 index 00000000..d1f7517e --- /dev/null +++ b/man/pqWaitForNotify.Rd @@ -0,0 +1,39 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/PqConnection.R +\name{pqWaitForNotify} +\alias{pqWaitForNotify} +\title{Wait for and return any notifications that return within timeout} +\usage{ +pqWaitForNotify(conn, timeout = 1) +} +\arguments{ +\item{conn}{a \linkS4class{PqConnection} object, produced by +\code{\link[DBI:dbConnect]{DBI::dbConnect()}}} + +\item{timeout}{How long to wait, in seconds. Default 1} +} +\value{ +If a notification was available, a list of: +\describe{ +\item{channel}{Name of channel} +\item{pid}{PID of notifying server process} +\item{payload}{Content of notification} +} +If no notifications are available, return NULL +} +\description{ +Once you subscribe to notifications with LISTEN, use this to wait for +responses on each channel. +} +\examples{ +# For running the examples on systems without PostgreSQL connection: +if (postgresHasDefault()) { + library(DBI) + db <- dbConnect(RPostgres::Postgres()) + dbSendStatement(db, "LISTEN channel") + # In another connection:- + # dbSendStatement(db2, "NOTIFY channel, 'hello'") + n <- RPostgres::pqWaitForNotify(db) + if (!is.null(n)) writeLines(c("Got a message:-", n$payload)) +} +} diff --git a/src/DbConnection.cpp b/src/DbConnection.cpp index dd753740..7bb26ad9 100644 --- a/src/DbConnection.cpp +++ b/src/DbConnection.cpp @@ -254,3 +254,43 @@ void DbConnection::cleanup_query() { } finish_query(pConn_); } + +List DbConnection::wait_for_notify(__time_t timeout_secs) { + PGnotify *notify; + List out; + int socket = -1; + fd_set input; + + while (TRUE) { + // See if there's a notification waiting, if so return it + if (!PQconsumeInput(pConn_)) { + stop("Failed to consume input from the server"); + } + if ((notify = PQnotifies(pConn_)) != NULL) { + out = Rcpp::List::create( + _["channel"] = CharacterVector::create(notify->relname), + _["pid"] = IntegerVector::create(notify->be_pid), + _["payload"] = CharacterVector::create(notify->extra) + ); + PQfreemem(notify); + return out; + } + + if (socket != -1) { + // Socket open, so already been round once, give up. + return NULL; + } + + // Open DB socket and wait for new data for at most (timeout_secs) seconds + if ((socket = PQsocket(pConn_)) < 0) { + stop("Failed to get connection socket"); + } + FD_ZERO(&input); + FD_SET(socket, &input); + timeval timeout = {0, 0}; + timeout.tv_sec = timeout_secs; + if (select(socket + 1, &input, NULL, NULL, &timeout) < 0) { + stop("select() on the connection failed"); + } + } +} diff --git a/src/DbConnection.h b/src/DbConnection.h index c2c0627d..10ca48a2 100644 --- a/src/DbConnection.h +++ b/src/DbConnection.h @@ -52,6 +52,7 @@ class DbConnection : boost::noncopyable { void cleanup_query(); static void finish_query(PGconn* pConn); + List wait_for_notify(__time_t timeout_secs); private: void cancel_query(); diff --git a/src/RcppExports.cpp b/src/RcppExports.cpp index b58a01ae..1bc55309 100644 --- a/src/RcppExports.cpp +++ b/src/RcppExports.cpp @@ -109,6 +109,18 @@ BEGIN_RCPP return R_NilValue; END_RCPP } +// connection_wait_for_notify +List connection_wait_for_notify(DbConnection* con, __time_t timeout_secs); +RcppExport SEXP _RPostgres_connection_wait_for_notify(SEXP conSEXP, SEXP timeout_secsSEXP) { +BEGIN_RCPP + Rcpp::RObject rcpp_result_gen; + Rcpp::RNGScope rcpp_rngScope_gen; + Rcpp::traits::input_parameter< DbConnection* >::type con(conSEXP); + Rcpp::traits::input_parameter< __time_t >::type timeout_secs(timeout_secsSEXP); + rcpp_result_gen = Rcpp::wrap(connection_wait_for_notify(con, timeout_secs)); + return rcpp_result_gen; +END_RCPP +} // encode_vector std::string encode_vector(RObject x); RcppExport SEXP _RPostgres_encode_vector(SEXP xSEXP) { @@ -264,6 +276,7 @@ static const R_CallMethodDef CallEntries[] = { {"_RPostgres_connection_is_transacting", (DL_FUNC) &_RPostgres_connection_is_transacting, 1}, {"_RPostgres_connection_set_transacting", (DL_FUNC) &_RPostgres_connection_set_transacting, 2}, {"_RPostgres_connection_copy_data", (DL_FUNC) &_RPostgres_connection_copy_data, 3}, + {"_RPostgres_connection_wait_for_notify", (DL_FUNC) &_RPostgres_connection_wait_for_notify, 2}, {"_RPostgres_encode_vector", (DL_FUNC) &_RPostgres_encode_vector, 1}, {"_RPostgres_encode_data_frame", (DL_FUNC) &_RPostgres_encode_data_frame, 1}, {"_RPostgres_encrypt_password", (DL_FUNC) &_RPostgres_encrypt_password, 2}, diff --git a/src/connection.cpp b/src/connection.cpp index b73fa9ce..0548a0c4 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -94,6 +94,11 @@ void connection_copy_data(DbConnection* con, std::string sql, List df) { return con->copy_data(sql, df); } +// [[Rcpp::export]] +List connection_wait_for_notify(DbConnection* con, __time_t timeout_secs) { + return con->wait_for_notify(timeout_secs); +} + // as() override From c6f945a255d698ec53aaf46f56591ff85568bae8 Mon Sep 17 00:00:00 2001 From: Jamie Lentin Date: Wed, 18 Dec 2019 09:44:07 +0000 Subject: [PATCH 02/11] Rename pqWaitForNotify --> postgresWaitForNotify --- NAMESPACE | 2 +- R/PqConnection.R | 4 ++-- man/{pqWaitForNotify.Rd => postgresWaitForNotify.Rd} | 8 ++++---- 3 files changed, 7 insertions(+), 7 deletions(-) rename man/{pqWaitForNotify.Rd => postgresWaitForNotify.Rd} (87%) diff --git a/NAMESPACE b/NAMESPACE index 8c2aade5..d7df8883 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -5,7 +5,7 @@ export(Id) export(Postgres) export(postgresDefault) export(postgresHasDefault) -export(pqWaitForNotify) +export(postgresWaitForNotify) exportClasses(PqConnection) exportClasses(PqDriver) exportClasses(PqResult) diff --git a/R/PqConnection.R b/R/PqConnection.R index b9815a16..9b3be052 100644 --- a/R/PqConnection.R +++ b/R/PqConnection.R @@ -217,10 +217,10 @@ setMethod("dbDisconnect", "PqConnection", function(conn, ...) { #' dbSendStatement(db, "LISTEN channel") #' # In another connection:- #' # dbSendStatement(db2, "NOTIFY channel, 'hello'") -#' n <- RPostgres::pqWaitForNotify(db) +#' n <- RPostgres::postgresWaitForNotify(db) #' if (!is.null(n)) writeLines(c("Got a message:-", n$payload)) #' } -pqWaitForNotify <- function (conn, timeout = 1) { +postgresWaitForNotify <- function (conn, timeout = 1) { out <- connection_wait_for_notify(conn@ptr, timeout) if ('pid' %in% names(out)) out else NULL } diff --git a/man/pqWaitForNotify.Rd b/man/postgresWaitForNotify.Rd similarity index 87% rename from man/pqWaitForNotify.Rd rename to man/postgresWaitForNotify.Rd index d1f7517e..dd9181f8 100644 --- a/man/pqWaitForNotify.Rd +++ b/man/postgresWaitForNotify.Rd @@ -1,10 +1,10 @@ % Generated by roxygen2: do not edit by hand % Please edit documentation in R/PqConnection.R -\name{pqWaitForNotify} -\alias{pqWaitForNotify} +\name{postgresWaitForNotify} +\alias{postgresWaitForNotify} \title{Wait for and return any notifications that return within timeout} \usage{ -pqWaitForNotify(conn, timeout = 1) +postgresWaitForNotify(conn, timeout = 1) } \arguments{ \item{conn}{a \linkS4class{PqConnection} object, produced by @@ -33,7 +33,7 @@ if (postgresHasDefault()) { dbSendStatement(db, "LISTEN channel") # In another connection:- # dbSendStatement(db2, "NOTIFY channel, 'hello'") - n <- RPostgres::pqWaitForNotify(db) + n <- RPostgres::postgresWaitForNotify(db) if (!is.null(n)) writeLines(c("Got a message:-", n$payload)) } } From d514e9b757844c0f4dbf211af856f02794aa3b71 Mon Sep 17 00:00:00 2001 From: Jamie Lentin Date: Wed, 18 Dec 2019 09:58:36 +0000 Subject: [PATCH 03/11] PqConnection: Rewrite postgresWaitForNotify example Before it wouldn't actually get a message but wait, slowing example runs down. Create 2 database connections so we get a message through. --- R/PqConnection.R | 15 ++++++++++----- man/postgresWaitForNotify.Rd | 15 ++++++++++----- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/R/PqConnection.R b/R/PqConnection.R index 9b3be052..3c33b9ec 100644 --- a/R/PqConnection.R +++ b/R/PqConnection.R @@ -213,11 +213,16 @@ setMethod("dbDisconnect", "PqConnection", function(conn, ...) { #' # For running the examples on systems without PostgreSQL connection: #' if (postgresHasDefault()) { #' library(DBI) -#' db <- dbConnect(RPostgres::Postgres()) -#' dbSendStatement(db, "LISTEN channel") -#' # In another connection:- -#' # dbSendStatement(db2, "NOTIFY channel, 'hello'") -#' n <- RPostgres::postgresWaitForNotify(db) +#' # DB1 listens for messages on the grapevine +#' db1 <- dbConnect(RPostgres::Postgres()) +#' dbExecute(db1, "LISTEN grapevine") +#' +#' # DB2 sends one (NB: Normally part of another process) +#' db2 <- dbConnect(RPostgres::Postgres()) +#' dbExecute(db2, "NOTIFY grapevine, 'psst'") +#' +#' # DB1 waits for the message to come +#' n <- RPostgres::postgresWaitForNotify(db1) #' if (!is.null(n)) writeLines(c("Got a message:-", n$payload)) #' } postgresWaitForNotify <- function (conn, timeout = 1) { diff --git a/man/postgresWaitForNotify.Rd b/man/postgresWaitForNotify.Rd index dd9181f8..87d787e9 100644 --- a/man/postgresWaitForNotify.Rd +++ b/man/postgresWaitForNotify.Rd @@ -29,11 +29,16 @@ responses on each channel. # For running the examples on systems without PostgreSQL connection: if (postgresHasDefault()) { library(DBI) - db <- dbConnect(RPostgres::Postgres()) - dbSendStatement(db, "LISTEN channel") - # In another connection:- - # dbSendStatement(db2, "NOTIFY channel, 'hello'") - n <- RPostgres::postgresWaitForNotify(db) + # DB1 listens for messages on the grapevine + db1 <- dbConnect(RPostgres::Postgres()) + dbExecute(db1, "LISTEN grapevine") + + # DB2 sends one (NB: Normally part of another process) + db2 <- dbConnect(RPostgres::Postgres()) + dbExecute(db2, "NOTIFY grapevine, 'psst'") + + # DB1 waits for the message to come + n <- RPostgres::postgresWaitForNotify(db1) if (!is.null(n)) writeLines(c("Got a message:-", n$payload)) } } From 7ce3682520f5f449db678e085aba92541910618d Mon Sep 17 00:00:00 2001 From: Jamie Lentin Date: Thu, 19 Dec 2019 12:26:22 +0000 Subject: [PATCH 04/11] PqConnection: Use callr in postgresWaitForNotify example --- DESCRIPTION | 1 + R/PqConnection.R | 38 ++++++++++++++++++++++++------------ man/postgresWaitForNotify.Rd | 34 ++++++++++++++++++++++---------- 3 files changed, 51 insertions(+), 22 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index 7f6ee268..618caee1 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -39,6 +39,7 @@ Imports: Rcpp (>= 0.11.4.2), withr Suggests: + callr, DBItest, testthat LinkingTo: diff --git a/R/PqConnection.R b/R/PqConnection.R index 3c33b9ec..37fe3869 100644 --- a/R/PqConnection.R +++ b/R/PqConnection.R @@ -212,18 +212,32 @@ setMethod("dbDisconnect", "PqConnection", function(conn, ...) { #' @examples #' # For running the examples on systems without PostgreSQL connection: #' if (postgresHasDefault()) { -#' library(DBI) -#' # DB1 listens for messages on the grapevine -#' db1 <- dbConnect(RPostgres::Postgres()) -#' dbExecute(db1, "LISTEN grapevine") -#' -#' # DB2 sends one (NB: Normally part of another process) -#' db2 <- dbConnect(RPostgres::Postgres()) -#' dbExecute(db2, "NOTIFY grapevine, 'psst'") -#' -#' # DB1 waits for the message to come -#' n <- RPostgres::postgresWaitForNotify(db1) -#' if (!is.null(n)) writeLines(c("Got a message:-", n$payload)) +#' library(DBI) +#' library(callr) +#' +#' # listen for messages on the grapevine +#' db_listen <- dbConnect(RPostgres::Postgres()) +#' dbExecute(db_listen, "LISTEN grapevine") +#' +#' # Start another process, which sends a message after a delay +#' rp <- r_bg(function () { +#' library(DBI) +#' Sys.sleep(0.3) +#' db_notify <- dbConnect(RPostgres::Postgres()) +#' dbExecute(db_notify, "NOTIFY grapevine, 'psst'") +#' dbDisconnect(db_notify) +#' }) +#' +#' # Sleep until we get the message +#' n <- NULL +#' while (is.null(n)) { +#' n <- RPostgres::postgresWaitForNotify(db_listen, 60) +#' } +#' stopifnot(n$payload == 'psst') +#' +#' # Tidy up +#' rp$wait() +#' dbDisconnect(db_listen) #' } postgresWaitForNotify <- function (conn, timeout = 1) { out <- connection_wait_for_notify(conn@ptr, timeout) diff --git a/man/postgresWaitForNotify.Rd b/man/postgresWaitForNotify.Rd index 87d787e9..6e843ae0 100644 --- a/man/postgresWaitForNotify.Rd +++ b/man/postgresWaitForNotify.Rd @@ -28,17 +28,31 @@ responses on each channel. \examples{ # For running the examples on systems without PostgreSQL connection: if (postgresHasDefault()) { - library(DBI) - # DB1 listens for messages on the grapevine - db1 <- dbConnect(RPostgres::Postgres()) - dbExecute(db1, "LISTEN grapevine") + library(DBI) + library(callr) - # DB2 sends one (NB: Normally part of another process) - db2 <- dbConnect(RPostgres::Postgres()) - dbExecute(db2, "NOTIFY grapevine, 'psst'") + # listen for messages on the grapevine + db_listen <- dbConnect(RPostgres::Postgres()) + dbExecute(db_listen, "LISTEN grapevine") - # DB1 waits for the message to come - n <- RPostgres::postgresWaitForNotify(db1) - if (!is.null(n)) writeLines(c("Got a message:-", n$payload)) + # Start another process, which sends a message after a delay + rp <- r_bg(function () { + library(DBI) + Sys.sleep(0.3) + db_notify <- dbConnect(RPostgres::Postgres()) + dbExecute(db_notify, "NOTIFY grapevine, 'psst'") + dbDisconnect(db_notify) + }) + + # Sleep until we get the message + n <- NULL + while (is.null(n)) { + n <- RPostgres::postgresWaitForNotify(db_listen, 60) + } + stopifnot(n$payload == 'psst') + + # Tidy up + rp$wait() + dbDisconnect(db_listen) } } From 0ca1927bef8692488a568013d2f5c895270df1c6 Mon Sep 17 00:00:00 2001 From: Jamie Lentin Date: Thu, 19 Dec 2019 14:13:32 +0000 Subject: [PATCH 05/11] DbConnection: Fix compiler warnings under other OSes * __time_t isn't available under windows * timeval requires winsock2.h * Converting NULL to list pointer emits a warning, use R_NilValue --- src/DbConnection.cpp | 7 +++++-- src/DbConnection.h | 2 +- src/RcppExports.cpp | 4 ++-- src/connection.cpp | 2 +- 4 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/DbConnection.cpp b/src/DbConnection.cpp index 7bb26ad9..60e5984f 100644 --- a/src/DbConnection.cpp +++ b/src/DbConnection.cpp @@ -3,6 +3,9 @@ #include "encode.h" #include "DbResult.h" +#ifdef _WIN32 +#include +#endif DbConnection::DbConnection(std::vector keys, std::vector values, bool check_interrupts) : @@ -255,7 +258,7 @@ void DbConnection::cleanup_query() { finish_query(pConn_); } -List DbConnection::wait_for_notify(__time_t timeout_secs) { +List DbConnection::wait_for_notify(int timeout_secs) { PGnotify *notify; List out; int socket = -1; @@ -278,7 +281,7 @@ List DbConnection::wait_for_notify(__time_t timeout_secs) { if (socket != -1) { // Socket open, so already been round once, give up. - return NULL; + return R_NilValue; } // Open DB socket and wait for new data for at most (timeout_secs) seconds diff --git a/src/DbConnection.h b/src/DbConnection.h index 10ca48a2..ff4c435c 100644 --- a/src/DbConnection.h +++ b/src/DbConnection.h @@ -52,7 +52,7 @@ class DbConnection : boost::noncopyable { void cleanup_query(); static void finish_query(PGconn* pConn); - List wait_for_notify(__time_t timeout_secs); + List wait_for_notify(int timeout_secs); private: void cancel_query(); diff --git a/src/RcppExports.cpp b/src/RcppExports.cpp index e4bad1c5..5a9c22a3 100644 --- a/src/RcppExports.cpp +++ b/src/RcppExports.cpp @@ -120,13 +120,13 @@ BEGIN_RCPP END_RCPP } // connection_wait_for_notify -List connection_wait_for_notify(DbConnection* con, __time_t timeout_secs); +List connection_wait_for_notify(DbConnection* con, int timeout_secs); RcppExport SEXP _RPostgres_connection_wait_for_notify(SEXP conSEXP, SEXP timeout_secsSEXP) { BEGIN_RCPP Rcpp::RObject rcpp_result_gen; Rcpp::RNGScope rcpp_rngScope_gen; Rcpp::traits::input_parameter< DbConnection* >::type con(conSEXP); - Rcpp::traits::input_parameter< __time_t >::type timeout_secs(timeout_secsSEXP); + Rcpp::traits::input_parameter< int >::type timeout_secs(timeout_secsSEXP); rcpp_result_gen = Rcpp::wrap(connection_wait_for_notify(con, timeout_secs)); return rcpp_result_gen; END_RCPP diff --git a/src/connection.cpp b/src/connection.cpp index f008cbc0..dc547fd2 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -100,7 +100,7 @@ void connection_copy_data(DbConnection* con, std::string sql, List df) { } // [[Rcpp::export]] -List connection_wait_for_notify(DbConnection* con, __time_t timeout_secs) { +List connection_wait_for_notify(DbConnection* con, int timeout_secs) { return con->wait_for_notify(timeout_secs); } From a84724bf720c10b2b56e37a833b63573c83ce6af Mon Sep 17 00:00:00 2001 From: Jamie Lentin Date: Thu, 19 Dec 2019 20:30:07 +0000 Subject: [PATCH 06/11] waitForNotify: Add unit tests --- tests/testthat/test-waitForNotify.R | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) create mode 100644 tests/testthat/test-waitForNotify.R diff --git a/tests/testthat/test-waitForNotify.R b/tests/testthat/test-waitForNotify.R new file mode 100644 index 00000000..156f4f80 --- /dev/null +++ b/tests/testthat/test-waitForNotify.R @@ -0,0 +1,17 @@ +context("waitForNotify") + +test_that("WaitForNotify without anything to do returns NULL", { + db <- postgresDefault() + n <- RPostgres::postgresWaitForNotify(db, 1) + dbDisconnect(db) + expect_null(n) +}) + +test_that("WaitForNotify with a waiting message returns message", { + db <- postgresDefault() + dbExecute(db, 'LISTEN grapevine') + dbExecute(db, "NOTIFY grapevine, 'psst'") + n <- RPostgres::postgresWaitForNotify(db, 1) + dbDisconnect(db) + expect_identical(n$payload, 'psst') +}) From a1b982549a037fa7a1dc1514e1f82c55902113fa Mon Sep 17 00:00:00 2001 From: Jamie Lentin Date: Tue, 13 Oct 2020 12:22:13 +0100 Subject: [PATCH 07/11] DESCRIPTION: Add knitr as the vignette builder --- DESCRIPTION | 3 +++ 1 file changed, 3 insertions(+) diff --git a/DESCRIPTION b/DESCRIPTION index 67fa2121..7ea505de 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -42,11 +42,14 @@ Imports: Suggests: callr, DBItest (>= 1.7.0), + knitr, testthat LinkingTo: BH, plogr (>= 0.2.0), Rcpp +VignetteBuilder: + knitr Encoding: UTF-8 LazyLoad: true Roxygen: list(markdown = TRUE) From 4baf9b9f968da9687da678b2b9ee195e69f24415 Mon Sep 17 00:00:00 2001 From: Jamie Lentin Date: Tue, 13 Oct 2020 12:22:36 +0100 Subject: [PATCH 08/11] vignettes/work-queue: A full LISTEN/NOTIFY use case Show what you might do with LISTEN/NOTIFY support in PostgreSQL. --- vignettes/work-queue.Rmd | 265 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 265 insertions(+) create mode 100644 vignettes/work-queue.Rmd diff --git a/vignettes/work-queue.Rmd b/vignettes/work-queue.Rmd new file mode 100644 index 00000000..5769efa4 --- /dev/null +++ b/vignettes/work-queue.Rmd @@ -0,0 +1,265 @@ +--- +title: "Implementing a Work Queue using RPostgres" +author: "Jamie Lentin" +output: rmarkdown::html_vignette +vignette: > + %\VignetteIndexEntry{Implementing a Work Queue using RPostgres} + %\VignetteEngine{knitr::rmarkdown} + \usepackage[utf8]{inputenc} +--- + +```{r, echo = FALSE} +library(DBI) +knitr::opts_chunk$set(collapse = TRUE, comment = "#>") +con <- NULL +rp <- NULL +rs <- NULL +``` + +Imagine you have an R process that is relatively intensive, based on user input. + +To keep things as fast as possible, you may want to use several servers to all process incoming requests for square roots. +However, to do this you need to co-ordinate between all of your servers (or workers). +How do you decide which server works on what? What if one server dies mid-way? +To decide this, we need a work queue, also known as a job queue or task queue. +This document will show show you how to build a work queue system using R and PostgreSQL that would ordinarily require an external tool, +like [RabbitMQ](https://www.rabbitmq.com/). + +In this example, our work will be generating square roots. We'll keep track of the results in a table: + +```{r} +con <- dbConnect(RPostgres::Postgres()) + +dbExecute(con, "DROP TABLE IF EXISTS sqroot_vignette_example;") +dbExecute(con, " + CREATE TABLE sqroot_vignette_example ( + in_val INTEGER PRIMARY KEY, + out_val DOUBLE PRECISION NULL + ) +") +``` + +When a client wants a square root value, it can insert a new row into a table, filling ``in_val``. +We'll then have a bunch of workers that will calculate the results for the client, and fill in ``out_val``. + +To manage these workers, we will combine 2 PostgreSQL concepts: + +```{r, echo = FALSE} +if (!is.null(rs)) { dbClearResult(rs) ; rs <- NULL } +if (!is.null(con)) { dbDisconnect(con) ; con <- NULL } +if (!is.null(rp)) { rp$wait() ; rp <- NULL } +``` + +## LISTEN / NOTIFY + +The Postgres ``LISTEN`` and ``NOTIFY`` commands allow you to send and receive messages between clients connected to a PostgreSQL database. +This is known as a publish/subscribe architecture. + +We tell Postgres that we are interested in receiving messages using ``LISTEN``. For example: + +```{r} +con <- dbConnect(RPostgres::Postgres()) +dbExecute(con, "LISTEN grapevine") +``` + +...in this case, "grapevine" is arbitary, we don't need to create channels ahead of time. +To make sure we have something to receive, we can start a separate R process using [callr](https://CRAN.R-project.org/package=callr). +Ordinarily this would be part of another R script, maybe on another computer. +This will wait a bit, and use ``NOTIFY`` to send a message, then finish: + +```{r} +rp <- callr::r_bg(function () { + library(DBI) + Sys.sleep(0.3) + db_notify <- dbConnect(RPostgres::Postgres()) + dbExecute(db_notify, "NOTIFY grapevine, 'psst'") + dbDisconnect(db_notify) +}) +``` + +Finally, we should wait for any incoming messages. To do this, use ``postgresWaitForNotify``. +The payload will contain the message from the other R process: + +```{r} +# Sleep until we get the message +n <- NULL +while (is.null(n)) { + n <- RPostgres::postgresWaitForNotify(con) +} +n$payload +``` + +## SKIP LOCKED + +We can use LISTEN/NOTIFY to inform all workers that there is something to be done, but how do we decide which worker actually does the work? +This is done using ``SKIP LOCKED``. + +We notify all workers that the input ``99`` is ready for processing. +After receiving this, they all do the following: + +```{r} +rs <- dbSendQuery(con, " + SELECT in_val + FROM sqroot_vignette_example + WHERE in_val = $1 + FOR UPDATE + SKIP LOCKED +", params = list(99)) +``` + +One lucky worker will get a row back, but thanks to ``FOR UPDATE``, the row is now locked. +For any other worker, as the row is now locked, they will skip over it (``SKIP LOCKED``) and find something else to do. +If there are no other jobs available, then nothing will be returned. + +Using SKIP LOCKED is discussed in more detail [in this article](https://www.2ndquadrant.com/en/blog/what-is-select-skip-locked-for-in-postgresql-9-5/). + +```{r, echo = FALSE} +if (!is.null(rs)) { dbClearResult(rs) ; rs <- NULL } +if (!is.null(con)) { dbDisconnect(con) ; con <- NULL } +if (!is.null(rp)) { rp$wait() ; rp <- NULL } +``` + +## Implementing our worker + +Now we can put the concepts together. +The following implements our worker as a function (again, this would be running as a script on several servers): + +```{r} +worker <- function () { + library(DBI) + db_worker <- dbConnect(RPostgres::Postgres()) + on.exit(dbDisconnect(db_worker)) + dbExecute(db_worker, "LISTEN sqroot") + dbExecute(db_worker, "LISTEN sqroot_shutdown") + + while(TRUE) { + # Wait for new work to do + n <- RPostgres::postgresWaitForNotify(db_worker, 60) + if (is.null(n)) { + # If nothing to do, send notifications of any not up-to-date work + dbExecute(db_worker, " + SELECT pg_notify('sqroot', in_val::TEXT) + FROM sqroot_vignette_example + WHERE out_val IS NULL + ") + next + } + + # If we've been told to shutdown, stop right away + if (n$channel == 'sqroot_shutdown') { + writeLines("Shutting down.") + break + } + + in_val <- strtoi(n$payload) + tryCatch({ + dbWithTransaction(db_worker, { + # Try and fetch the item we got notified about + rs <- dbSendQuery(db_worker, " + SELECT in_val + FROM sqroot_vignette_example + WHERE out_val IS NULL -- if another worker already finished, don't reprocess + AND in_val = $1 + FOR UPDATE SKIP LOCKED -- Don't let another worker work on this at the same time + ", params = list(in_val)) + in_val <- dbFetch(rs)[1,1] + dbClearResult(rs) + + if (!is.na(in_val)) { + # Actually do the sqrt + writeLines(paste("Sqroot-ing", in_val, "... ")) + Sys.sleep(in_val * 0.1) + out_val <- sqrt(in_val) + + # Update the datbase with the result + dbExecute(db_worker, " + UPDATE sqroot_vignette_example + SET out_val = $1 + WHERE in_val = $2 + ", params = list(out_val, in_val)) + } else { + writeLines(paste("Not sqroot-ing as another worker got there first")) + } + }) + }, error = function (e) { + # Something went wrong. Report error and carry on + writeLines(paste("Failed to sqroot:", e$message)) + }) + } +} +``` + +The worker connects to the database, starts listening and loops indefinitely. + +* First, we wait for new notifications. +* If there aren't any notifications, then we search for any old items and generate new notifications. + This allows items to be picked up again if they didn't get processed the first time around, + e.g. because there were no workers listening. +* If we got a shutdown message, stop. +* Try to grab the row for the new item, if we win, and only one worker will, then fill in the square root. + +Let's use callr again to start 2 workers: + +```{r} +stdout_1 <- tempfile() +stdout_2 <- tempfile() +rp <- callr::r_bg(worker, stdout = stdout_1, stderr = stdout_1) +rp <- callr::r_bg(worker, stdout = stdout_2, stderr = stdout_2) +Sys.sleep(1) # Give workers a chance to set themselves up +``` + +Now our client can add some values to our table and notify the workers that there's something to do: + +```{r} +con <- dbConnect(RPostgres::Postgres()) + +add_sqroot <- function (in_val) { + dbExecute(con, " + INSERT INTO sqroot_vignette_example (in_val) VALUES ($1) + ", params = list(in_val)) + dbExecute(con, " + SELECT pg_notify('sqroot', $1) + ", params = list(in_val)) +} + +add_sqroot(7) +add_sqroot(8) +add_sqroot(9) +``` + +...after a wait, the answers should have been populated by the workers for us: + +```{r} +Sys.sleep(3) +rs <- dbSendQuery(con, "SELECT * FROM sqroot_vignette_example ORDER BY in_val") +dbFetch(rs) +dbClearResult(rs) ; rs <- NULL +``` + +Finally, we can use ``NOTIFY`` to stop all the workers: + +```{r} +dbExecute(con, "NOTIFY sqroot_shutdown, ''") +``` + +And see what messages were printed as they run: + +```{r} +# We can't control which worker will process the first entry, +# so we sort the results so the vignette output stays the same. +outputs <- sort(c( + paste(readLines(con = stdout_1), collapse = "\n"), + paste(readLines(con = stdout_2), collapse = "\n"))) + +writeLines(outputs[[1]]) +writeLines(outputs[[2]]) +``` + +Notice that the work has been shared between the 2 workers. +If these 2 weren't enough, we could happily add more to keep the system going. + +```{r, echo = FALSE} +if (!is.null(rs)) { dbClearResult(rs) ; rs <- NULL } +if (!is.null(con)) { dbDisconnect(con) ; con <- NULL } +if (!is.null(rp)) { rp$wait() ; rp <- NULL } +``` From 95eb3e73ddb7a35eea68624995f42a48ef8726c3 Mon Sep 17 00:00:00 2001 From: Jamie Lentin Date: Tue, 13 Oct 2020 12:49:31 +0100 Subject: [PATCH 09/11] DESCRIPTION: Also require rmarkdown for vignettes --- DESCRIPTION | 1 + 1 file changed, 1 insertion(+) diff --git a/DESCRIPTION b/DESCRIPTION index 7ea505de..011bbb2f 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -43,6 +43,7 @@ Suggests: callr, DBItest (>= 1.7.0), knitr, + rmarkdown, testthat LinkingTo: BH, From 9c8f18fae967cc1ea3385593175b742221809587 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kirill=20M=C3=BCller?= Date: Sun, 18 Oct 2020 07:19:39 +0200 Subject: [PATCH 10/11] Bump Postgres server to supported version required for SKIP LOCKED --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index a8169e5d..08e6661e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -55,7 +55,7 @@ env: #addons addons: - postgresql: "9.4" + postgresql: "9.5" apt: packages: - libpq-dev From 259497b6103d3f812801d8cb241861dc4726151c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kirill=20M=C3=BCller?= Date: Sun, 18 Oct 2020 07:47:47 +0200 Subject: [PATCH 11/11] Tweaks --- vignettes/work-queue.Rmd | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/vignettes/work-queue.Rmd b/vignettes/work-queue.Rmd index 5769efa4..fa3abb4e 100644 --- a/vignettes/work-queue.Rmd +++ b/vignettes/work-queue.Rmd @@ -10,7 +10,11 @@ vignette: > ```{r, echo = FALSE} library(DBI) -knitr::opts_chunk$set(collapse = TRUE, comment = "#>") +knitr::opts_chunk$set( + collapse = TRUE, + comment = "#>", + eval = RPostgres::postgresHasDefault() +) con <- NULL rp <- NULL rs <- NULL @@ -28,6 +32,8 @@ like [RabbitMQ](https://www.rabbitmq.com/). In this example, our work will be generating square roots. We'll keep track of the results in a table: ```{r} +library(DBI) + con <- dbConnect(RPostgres::Postgres()) dbExecute(con, "DROP TABLE IF EXISTS sqroot_vignette_example;")