diff --git a/.travis.yml b/.travis.yml index ce0b8083..f51dc65f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -57,7 +57,7 @@ env: #addons addons: - postgresql: "9.4" + postgresql: "9.5" apt: packages: - libpq-dev diff --git a/DESCRIPTION b/DESCRIPTION index 063b333b..80365e63 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -40,12 +40,17 @@ Imports: Rcpp (>= 0.11.4.2), withr Suggests: + callr, DBItest (>= 1.7.0), + knitr, + rmarkdown, testthat LinkingTo: BH, plogr (>= 0.2.0), Rcpp +VignetteBuilder: + knitr Encoding: UTF-8 LazyLoad: true Roxygen: list(markdown = TRUE) diff --git a/NAMESPACE b/NAMESPACE index 78197cbd..e0098385 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -6,6 +6,7 @@ export(Postgres) export(Redshift) export(postgresDefault) export(postgresHasDefault) +export(postgresWaitForNotify) exportClasses(PqConnection) exportClasses(PqDriver) exportClasses(PqResult) diff --git a/R/PqConnection.R b/R/PqConnection.R index 790e0397..69c2bc9c 100644 --- a/R/PqConnection.R +++ b/R/PqConnection.R @@ -200,6 +200,56 @@ setMethod("dbDisconnect", "PqConnection", function(conn, ...) { invisible(TRUE) }) +#' Wait for and return any notifications that return within timeout +#' +#' Once you subscribe to notifications with LISTEN, use this to wait for +#' responses on each channel. +#' +#' @export +#' @param conn a [PqConnection-class] object, produced by +#' [DBI::dbConnect()] +#' @param timeout How long to wait, in seconds. 
Default 1 +#' @return If a notification was available, a list of: +#' \describe{ +#' \item{channel}{Name of channel} +#' \item{pid}{PID of notifying server process} +#' \item{payload}{Content of notification} +#' } +#' If no notifications are available, return NULL +#' @examples +#' # For running the examples on systems without PostgreSQL connection: +#' if (postgresHasDefault()) { +#' library(DBI) +#' library(callr) +#' +#' # listen for messages on the grapevine +#' db_listen <- dbConnect(RPostgres::Postgres()) +#' dbExecute(db_listen, "LISTEN grapevine") +#' +#' # Start another process, which sends a message after a delay +#' rp <- r_bg(function () { +#' library(DBI) +#' Sys.sleep(0.3) +#' db_notify <- dbConnect(RPostgres::Postgres()) +#' dbExecute(db_notify, "NOTIFY grapevine, 'psst'") +#' dbDisconnect(db_notify) +#' }) +#' +#' # Sleep until we get the message +#' n <- NULL +#' while (is.null(n)) { +#' n <- RPostgres::postgresWaitForNotify(db_listen, 60) +#' } +#' stopifnot(n$payload == 'psst') +#' +#' # Tidy up +#' rp$wait() +#' dbDisconnect(db_listen) +#' } +postgresWaitForNotify <- function (conn, timeout = 1) { + out <- connection_wait_for_notify(conn@ptr, timeout) + if ('pid' %in% names(out)) out else NULL +} #' Determine database type for R vector. 
#' diff --git a/R/RcppExports.R b/R/RcppExports.R index 38e7fddd..68d1e44c 100644 --- a/R/RcppExports.R +++ b/R/RcppExports.R @@ -41,6 +41,10 @@ connection_copy_data <- function(con, sql, df) { invisible(.Call(`_RPostgres_connection_copy_data`, con, sql, df)) } +connection_wait_for_notify <- function(con, timeout_secs) { + .Call(`_RPostgres_connection_wait_for_notify`, con, timeout_secs) +} + encode_vector <- function(x) { .Call(`_RPostgres_encode_vector`, x) } diff --git a/man/postgresWaitForNotify.Rd b/man/postgresWaitForNotify.Rd new file mode 100644 index 00000000..6e843ae0 --- /dev/null +++ b/man/postgresWaitForNotify.Rd @@ -0,0 +1,58 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/PqConnection.R +\name{postgresWaitForNotify} +\alias{postgresWaitForNotify} +\title{Wait for and return any notifications that return within timeout} +\usage{ +postgresWaitForNotify(conn, timeout = 1) +} +\arguments{ +\item{conn}{a \linkS4class{PqConnection} object, produced by +\code{\link[DBI:dbConnect]{DBI::dbConnect()}}} + +\item{timeout}{How long to wait, in seconds. Default 1} +} +\value{ +If a notification was available, a list of: +\describe{ +\item{channel}{Name of channel} +\item{pid}{PID of notifying server process} +\item{payload}{Content of notification} +} +If no notifications are available, return NULL +} +\description{ +Once you subscribe to notifications with LISTEN, use this to wait for +responses on each channel. 
+} +\examples{ +# For running the examples on systems without PostgreSQL connection: +if (postgresHasDefault()) { + library(DBI) + library(callr) + + # listen for messages on the grapevine + db_listen <- dbConnect(RPostgres::Postgres()) + dbExecute(db_listen, "LISTEN grapevine") + + # Start another process, which sends a message after a delay + rp <- r_bg(function () { + library(DBI) + Sys.sleep(0.3) + db_notify <- dbConnect(RPostgres::Postgres()) + dbExecute(db_notify, "NOTIFY grapevine, 'psst'") + dbDisconnect(db_notify) + }) + + # Sleep until we get the message + n <- NULL + while (is.null(n)) { + n <- RPostgres::postgresWaitForNotify(db_listen, 60) + } + stopifnot(n$payload == 'psst') + + # Tidy up + rp$wait() + dbDisconnect(db_listen) +} +} diff --git a/src/DbConnection.cpp b/src/DbConnection.cpp index dd753740..60e5984f 100644 --- a/src/DbConnection.cpp +++ b/src/DbConnection.cpp @@ -3,6 +3,9 @@ #include "encode.h" #include "DbResult.h" +#ifdef _WIN32 +#include +#endif DbConnection::DbConnection(std::vector keys, std::vector values, bool check_interrupts) : @@ -254,3 +257,43 @@ void DbConnection::cleanup_query() { } finish_query(pConn_); } + +List DbConnection::wait_for_notify(int timeout_secs) { + PGnotify *notify; + List out; + int socket = -1; + fd_set input; + + while (TRUE) { + // See if there's a notification waiting, if so return it + if (!PQconsumeInput(pConn_)) { + stop("Failed to consume input from the server"); + } + if ((notify = PQnotifies(pConn_)) != NULL) { + out = Rcpp::List::create( + _["channel"] = CharacterVector::create(notify->relname), + _["pid"] = IntegerVector::create(notify->be_pid), + _["payload"] = CharacterVector::create(notify->extra) + ); + PQfreemem(notify); + return out; + } + + if (socket != -1) { + // Socket open, so already been round once, give up. 
+ return R_NilValue; + } + + // Open DB socket and wait for new data for at most (timeout_secs) seconds + if ((socket = PQsocket(pConn_)) < 0) { + stop("Failed to get connection socket"); + } + FD_ZERO(&input); + FD_SET(socket, &input); + timeval timeout = {0, 0}; + timeout.tv_sec = timeout_secs; + if (select(socket + 1, &input, NULL, NULL, &timeout) < 0) { + stop("select() on the connection failed"); + } + } +} diff --git a/src/DbConnection.h b/src/DbConnection.h index c2c0627d..ff4c435c 100644 --- a/src/DbConnection.h +++ b/src/DbConnection.h @@ -52,6 +52,7 @@ class DbConnection : boost::noncopyable { void cleanup_query(); static void finish_query(PGconn* pConn); + List wait_for_notify(int timeout_secs); private: void cancel_query(); diff --git a/src/RcppExports.cpp b/src/RcppExports.cpp index 5ca11b7d..5a9c22a3 100644 --- a/src/RcppExports.cpp +++ b/src/RcppExports.cpp @@ -119,6 +119,18 @@ BEGIN_RCPP return R_NilValue; END_RCPP } +// connection_wait_for_notify +List connection_wait_for_notify(DbConnection* con, int timeout_secs); +RcppExport SEXP _RPostgres_connection_wait_for_notify(SEXP conSEXP, SEXP timeout_secsSEXP) { +BEGIN_RCPP + Rcpp::RObject rcpp_result_gen; + Rcpp::RNGScope rcpp_rngScope_gen; + Rcpp::traits::input_parameter< DbConnection* >::type con(conSEXP); + Rcpp::traits::input_parameter< int >::type timeout_secs(timeout_secsSEXP); + rcpp_result_gen = Rcpp::wrap(connection_wait_for_notify(con, timeout_secs)); + return rcpp_result_gen; +END_RCPP +} // encode_vector std::string encode_vector(RObject x); RcppExport SEXP _RPostgres_encode_vector(SEXP xSEXP) { @@ -275,6 +287,7 @@ static const R_CallMethodDef CallEntries[] = { {"_RPostgres_connection_is_transacting", (DL_FUNC) &_RPostgres_connection_is_transacting, 1}, {"_RPostgres_connection_set_transacting", (DL_FUNC) &_RPostgres_connection_set_transacting, 2}, {"_RPostgres_connection_copy_data", (DL_FUNC) &_RPostgres_connection_copy_data, 3}, + {"_RPostgres_connection_wait_for_notify", (DL_FUNC) 
&_RPostgres_connection_wait_for_notify, 2}, {"_RPostgres_encode_vector", (DL_FUNC) &_RPostgres_encode_vector, 1}, {"_RPostgres_encode_data_frame", (DL_FUNC) &_RPostgres_encode_data_frame, 1}, {"_RPostgres_encrypt_password", (DL_FUNC) &_RPostgres_encrypt_password, 2}, diff --git a/src/connection.cpp b/src/connection.cpp index e751522d..dc547fd2 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -99,6 +99,11 @@ void connection_copy_data(DbConnection* con, std::string sql, List df) { return con->copy_data(sql, df); } +// [[Rcpp::export]] +List connection_wait_for_notify(DbConnection* con, int timeout_secs) { + return con->wait_for_notify(timeout_secs); +} + // as() override diff --git a/tests/testthat/test-waitForNotify.R b/tests/testthat/test-waitForNotify.R new file mode 100644 index 00000000..156f4f80 --- /dev/null +++ b/tests/testthat/test-waitForNotify.R @@ -0,0 +1,17 @@ +context("waitForNotify") + +test_that("WaitForNotify without anything to do returns NULL", { + db <- postgresDefault() + n <- RPostgres::postgresWaitForNotify(db, 1) + dbDisconnect(db) + expect_null(n) +}) + +test_that("WaitForNotify with a waiting message returns message", { + db <- postgresDefault() + dbExecute(db, 'LISTEN grapevine') + dbExecute(db, "NOTIFY grapevine, 'psst'") + n <- RPostgres::postgresWaitForNotify(db, 1) + dbDisconnect(db) + expect_identical(n$payload, 'psst') +}) diff --git a/vignettes/work-queue.Rmd b/vignettes/work-queue.Rmd new file mode 100644 index 00000000..fa3abb4e --- /dev/null +++ b/vignettes/work-queue.Rmd @@ -0,0 +1,271 @@ +--- +title: "Implementing a Work Queue using RPostgres" +author: "Jamie Lentin" +output: rmarkdown::html_vignette +vignette: > + %\VignetteIndexEntry{Implementing a Work Queue using RPostgres} + %\VignetteEngine{knitr::rmarkdown} + \usepackage[utf8]{inputenc} +--- + +```{r, echo = FALSE} +library(DBI) +knitr::opts_chunk$set( + collapse = TRUE, + comment = "#>", + eval = RPostgres::postgresHasDefault() +) +con <- NULL +rp <- NULL +rs 
<- NULL +``` + +Imagine you have an R process that is relatively intensive, based on user input. + +To keep things as fast as possible, you may want to use several servers to all process incoming requests for square roots. +However, to do this you need to co-ordinate between all of your servers (or workers). +How do you decide which server works on what? What if one server dies mid-way? +To decide this, we need a work queue, also known as a job queue or task queue. +This document will show you how to build a work queue system using R and PostgreSQL that would ordinarily require an external tool, +like [RabbitMQ](https://www.rabbitmq.com/). + +In this example, our work will be generating square roots. We'll keep track of the results in a table: + +```{r} +library(DBI) + +con <- dbConnect(RPostgres::Postgres()) + +dbExecute(con, "DROP TABLE IF EXISTS sqroot_vignette_example;") +dbExecute(con, " + CREATE TABLE sqroot_vignette_example ( + in_val INTEGER PRIMARY KEY, + out_val DOUBLE PRECISION NULL + ) +") +``` + +When a client wants a square root value, it can insert a new row into a table, filling ``in_val``. +We'll then have a bunch of workers that will calculate the results for the client, and fill in ``out_val``. + +To manage these workers, we will combine 2 PostgreSQL concepts: + +```{r, echo = FALSE} +if (!is.null(rs)) { dbClearResult(rs) ; rs <- NULL } +if (!is.null(con)) { dbDisconnect(con) ; con <- NULL } +if (!is.null(rp)) { rp$wait() ; rp <- NULL } +``` + +## LISTEN / NOTIFY + +The Postgres ``LISTEN`` and ``NOTIFY`` commands allow you to send and receive messages between clients connected to a PostgreSQL database. +This is known as a publish/subscribe architecture. + +We tell Postgres that we are interested in receiving messages using ``LISTEN``. For example: + +```{r} +con <- dbConnect(RPostgres::Postgres()) +dbExecute(con, "LISTEN grapevine") +``` + +...in this case, "grapevine" is arbitrary, we don't need to create channels ahead of time. 
+To make sure we have something to receive, we can start a separate R process using [callr](https://CRAN.R-project.org/package=callr). +Ordinarily this would be part of another R script, maybe on another computer. +This will wait a bit, and use ``NOTIFY`` to send a message, then finish: + +```{r} +rp <- callr::r_bg(function () { + library(DBI) + Sys.sleep(0.3) + db_notify <- dbConnect(RPostgres::Postgres()) + dbExecute(db_notify, "NOTIFY grapevine, 'psst'") + dbDisconnect(db_notify) +}) +``` + +Finally, we should wait for any incoming messages. To do this, use ``postgresWaitForNotify``. +The payload will contain the message from the other R process: + +```{r} +# Sleep until we get the message +n <- NULL +while (is.null(n)) { + n <- RPostgres::postgresWaitForNotify(con) +} +n$payload +``` + +## SKIP LOCKED + +We can use LISTEN/NOTIFY to inform all workers that there is something to be done, but how do we decide which worker actually does the work? +This is done using ``SKIP LOCKED``. + +We notify all workers that the input ``99`` is ready for processing. +After receiving this, they all do the following: + +```{r} +rs <- dbSendQuery(con, " + SELECT in_val + FROM sqroot_vignette_example + WHERE in_val = $1 + FOR UPDATE + SKIP LOCKED +", params = list(99)) +``` + +One lucky worker will get a row back, but thanks to ``FOR UPDATE``, the row is now locked. +For any other worker, as the row is now locked, they will skip over it (``SKIP LOCKED``) and find something else to do. +If there are no other jobs available, then nothing will be returned. + +Using SKIP LOCKED is discussed in more detail [in this article](https://www.2ndquadrant.com/en/blog/what-is-select-skip-locked-for-in-postgresql-9-5/). + +```{r, echo = FALSE} +if (!is.null(rs)) { dbClearResult(rs) ; rs <- NULL } +if (!is.null(con)) { dbDisconnect(con) ; con <- NULL } +if (!is.null(rp)) { rp$wait() ; rp <- NULL } +``` + +## Implementing our worker + +Now we can put the concepts together. 
+The following implements our worker as a function (again, this would be running as a script on several servers): + +```{r} +worker <- function () { + library(DBI) + db_worker <- dbConnect(RPostgres::Postgres()) + on.exit(dbDisconnect(db_worker)) + dbExecute(db_worker, "LISTEN sqroot") + dbExecute(db_worker, "LISTEN sqroot_shutdown") + + while(TRUE) { + # Wait for new work to do + n <- RPostgres::postgresWaitForNotify(db_worker, 60) + if (is.null(n)) { + # If nothing to do, send notifications of any not up-to-date work + dbExecute(db_worker, " + SELECT pg_notify('sqroot', in_val::TEXT) + FROM sqroot_vignette_example + WHERE out_val IS NULL + ") + next + } + + # If we've been told to shutdown, stop right away + if (n$channel == 'sqroot_shutdown') { + writeLines("Shutting down.") + break + } + + in_val <- strtoi(n$payload) + tryCatch({ + dbWithTransaction(db_worker, { + # Try and fetch the item we got notified about + rs <- dbSendQuery(db_worker, " + SELECT in_val + FROM sqroot_vignette_example + WHERE out_val IS NULL -- if another worker already finished, don't reprocess + AND in_val = $1 + FOR UPDATE SKIP LOCKED -- Don't let another worker work on this at the same time + ", params = list(in_val)) + in_val <- dbFetch(rs)[1,1] + dbClearResult(rs) + + if (!is.na(in_val)) { + # Actually do the sqrt + writeLines(paste("Sqroot-ing", in_val, "... ")) + Sys.sleep(in_val * 0.1) + out_val <- sqrt(in_val) + + # Update the database with the result + dbExecute(db_worker, " + UPDATE sqroot_vignette_example + SET out_val = $1 + WHERE in_val = $2 + ", params = list(out_val, in_val)) + } else { + writeLines(paste("Not sqroot-ing as another worker got there first")) + } + }) + }, error = function (e) { + # Something went wrong. Report error and carry on + writeLines(paste("Failed to sqroot:", e$message)) + }) + } +} +``` + +The worker connects to the database, starts listening and loops indefinitely. + +* First, we wait for new notifications. 
+* If there aren't any notifications, then we search for any old items and generate new notifications. + This allows items to be picked up again if they didn't get processed the first time around, + e.g. because there were no workers listening. +* If we got a shutdown message, stop. +* Try to grab the row for the new item, if we win, and only one worker will, then fill in the square root. + +Let's use callr again to start 2 workers: + +```{r} +stdout_1 <- tempfile() +stdout_2 <- tempfile() +rp <- callr::r_bg(worker, stdout = stdout_1, stderr = stdout_1) +rp <- callr::r_bg(worker, stdout = stdout_2, stderr = stdout_2) +Sys.sleep(1) # Give workers a chance to set themselves up +``` + +Now our client can add some values to our table and notify the workers that there's something to do: + +```{r} +con <- dbConnect(RPostgres::Postgres()) + +add_sqroot <- function (in_val) { + dbExecute(con, " + INSERT INTO sqroot_vignette_example (in_val) VALUES ($1) + ", params = list(in_val)) + dbExecute(con, " + SELECT pg_notify('sqroot', $1) + ", params = list(in_val)) +} + +add_sqroot(7) +add_sqroot(8) +add_sqroot(9) +``` + +...after a wait, the answers should have been populated by the workers for us: + +```{r} +Sys.sleep(3) +rs <- dbSendQuery(con, "SELECT * FROM sqroot_vignette_example ORDER BY in_val") +dbFetch(rs) +dbClearResult(rs) ; rs <- NULL +``` + +Finally, we can use ``NOTIFY`` to stop all the workers: + +```{r} +dbExecute(con, "NOTIFY sqroot_shutdown, ''") +``` + +And see what messages were printed as they run: + +```{r} +# We can't control which worker will process the first entry, +# so we sort the results so the vignette output stays the same. +outputs <- sort(c( + paste(readLines(con = stdout_1), collapse = "\n"), + paste(readLines(con = stdout_2), collapse = "\n"))) + +writeLines(outputs[[1]]) +writeLines(outputs[[2]]) +``` + +Notice that the work has been shared between the 2 workers. +If these 2 weren't enough, we could happily add more to keep the system going. 
+ +```{r, echo = FALSE} +if (!is.null(rs)) { dbClearResult(rs) ; rs <- NULL } +if (!is.null(con)) { dbDisconnect(con) ; con <- NULL } +if (!is.null(rp)) { rp$wait() ; rp <- NULL } +```