diff --git a/DESCRIPTION b/DESCRIPTION index 476529832..f84b7bc59 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -93,6 +93,7 @@ Imports: Suggests: abind, callr, + clustermq, DBI, MASS, methods, diff --git a/NAMESPACE b/NAMESPACE index f3d10e88f..388c638ba 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -8,6 +8,7 @@ export(as_file) export(available_hash_algos) export(backend) export(bind_plans) +export(build_clustermq) export(build_drake_graph) export(build_graph) export(build_times) diff --git a/NEWS.md b/NEWS.md index c9c690c1e..e4b0ac4d0 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,6 +1,7 @@ # Version 5.2.1.9000 -- Add `make(parallelism = "future_lapply_staged")`, a `future`-based staged parallelism backend (see https://github.com/ropensci/drake/pull/450). +- Implement `make(parallelism = "clustermq_staged")`, a `clustermq`-based staged parallelism backend (see https://github.com/ropensci/drake/pull/452). +- Implement `make(parallelism = "future_lapply_staged")`, a `future`-based staged parallelism backend (see https://github.com/ropensci/drake/pull/450). - Depend on `codetools` rather than `CodeDepends` for finding global variables. - Skip more tests on CRAN. White-list tests instead of blacklisting them in order to try to keep check time under the official 10-minute cap. - Disallow wildcard names to grep-match other wildcard names or any replacement values. This will prevent careless mistakes and confusion when generating `drake_plan()`s. diff --git a/R/build.R b/R/build.R index d3857cf18..f1eee5360 100644 --- a/R/build.R +++ b/R/build.R @@ -1,9 +1,6 @@ #' @title Build/process a single target or import. #' @export -#' @description For internal use only. -#' the only reason this function is exported -#' is to set up parallel socket (PSOCK) clusters -#' without much of a fuss. +#' @description Also load the target's dependencies beforehand. #' @return The value of the target right after it is built. #' @param target name of the target #' @param meta list of metadata that tell which diff --git a/R/config.R b/R/config.R index db2f16fa8..c49cbd9bd 100644 --- a/R/config.R +++ b/R/config.R @@ -488,7 +488,7 @@ drake_config <- function( } add_packages_to_prework <- function(packages, prework) { - packages <- unique(c("methods", packages)) + packages <- unique(c("methods", "drake", packages)) paste0("if(!R.utils::isPackageLoaded(\"", packages, "\")) require(", packages, ")", sep = "") %>% c(prework) } diff --git a/R/parallel_ui.R b/R/parallel_ui.R index d54143991..36577a037 100644 --- a/R/parallel_ui.R +++ b/R/parallel_ui.R @@ -105,6 +105,7 @@ parallelism_choices <- function(distributed_only = FALSE) { "parLapply_staged" ) distributed <- c( + "clustermq_staged", "future", "future_lapply", "future_lapply_staged", diff --git a/R/staged.R b/R/staged.R index c126f16d9..031400ea9 100644 --- a/R/staged.R +++ b/R/staged.R @@ -160,3 +160,118 @@ run_future_lapply_staged <- function(config){ } invisible() } + +run_clustermq_staged <- function(config){ + if (!requireNamespace("clustermq")){ + # nocov start + drake_error( + "to use make(parallelism = \"clustermq_staged\"), ", + "you must install the clustermq package: ", + "https://github.com/mschubert/clustermq.", + config = config + ) + # nocov end + } + schedule <- config$schedule + workers <- clustermq::workers(n_jobs = config$jobs) + on.exit(workers$finalize()) + while (length(V(schedule)$name)){ + stage <- next_stage(config = config, schedule = schedule) + schedule <- stage$schedule + if (!length(stage$targets)){ + break + } else if (any(stage$targets %in% config$plan$target)){ + set_attempt_flag(key = "_attempt", config = config) + } + prune_envir( + targets = stage$targets, + config = config, + jobs = config$jobs_imports + ) + export <- list() + if (identical(config$envir, globalenv())){ + export <- as.list(config$envir, all.names = TRUE) # nocov + } + export$config <- config + export$meta_list <- stage$meta_list + meta_list <- NULL + tmp <- lightly_parallelize( + X = stage$targets, + FUN = function(target){ + announce_build( + target = target, + meta = stage$meta_list[[target]], + config = config + ) + }, + jobs = config$jobs_imports + ) + builds <- clustermq::Q( + stage$targets, + fun = function(target){ + # This call is actually tested in tests/testthat/test-clustermq.R. + # nocov start + drake::build_clustermq( + target = target, + meta_list = meta_list, + config = config + ) + # nocov end + }, + workers = workers, + export = export + ) + tmp <- lightly_parallelize( + X = builds, + FUN = function(build){ + wait_for_file(build = build, config = config) + conclude_build( + target = build$target, + value = build$value, + meta = build$meta, + config = config + ) + }, + jobs = config$jobs_imports + ) + } + invisible() +} + +#' @title Build a target using the clustermq backend +#' @description For internal use only +#' @export +#' @keywords internal +#' @inheritParams drake_build +#' @param meta_list list of metadata +build_clustermq <- function(target, meta_list, config){ + # This function is actually tested in tests/testthat/test-clustermq.R. + # nocov start + do_prework(config = config, verbose_packages = FALSE) + build <- just_build( + target = target, + meta = meta_list[[target]], + config = config + ) + if (is_file(target)){ + build$checksum <- rehash_file(target, config = config) + } + build + # nocov end +} + +wait_for_file <- function(build, config){ + if (!length(build$checksum)){ + return() + } + R.utils::withTimeout({ + while (!file.exists(drake_unquote(build$target))){ + Sys.sleep(mc_wait) # nocov + } + while (!identical(rehash_file(build$target, config), build$checksum)){ + Sys.sleep(mc_wait) # nocov + } + }, + timeout = 60 + ) +} diff --git a/man/build_clustermq.Rd b/man/build_clustermq.Rd new file mode 100644 index 000000000..14e63106c --- /dev/null +++ b/man/build_clustermq.Rd @@ -0,0 +1,19 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/staged.R +\name{build_clustermq} +\alias{build_clustermq} +\title{Build a target using the clustermq backend} +\usage{ +build_clustermq(target, meta_list, config) +} +\arguments{ +\item{target}{name of the target} + +\item{meta_list}{list of metadata} + +\item{config}{internal configuration list} +} +\description{ +For internal use only +} +\keyword{internal} diff --git a/man/drake_build.Rd b/man/drake_build.Rd index 11cc8c485..b515076d3 100644 --- a/man/drake_build.Rd +++ b/man/drake_build.Rd @@ -37,10 +37,7 @@ will not be replaced.} The value of the target right after it is built. } \description{ -For internal use only. -the only reason this function is exported -is to set up parallel socket (PSOCK) clusters -without much of a fuss. +Also load the target's dependencies beforehand. } \examples{ \dontrun{ diff --git a/tests/testthat/test-clustermq.R b/tests/testthat/test-clustermq.R new file mode 100644 index 000000000..2c346dda9 --- /dev/null +++ b/tests/testthat/test-clustermq.R @@ -0,0 +1,32 @@ +drake_context("clustermq") + +test_with_dir("clustermq_staged parallelism", { + skip_on_cran() + if ("package:clustermq" %in% search()){ + detach("package:clustermq", unload = TRUE) + } + options(clustermq.scheduler = "multicore") + skip_if_not_installed("clustermq") + skip_on_os("windows") + scenario <- get_testing_scenario() + e <- eval(parse(text = scenario$envir)) + jobs <- scenario$jobs + load_mtcars_example(envir = e) + make( + e$my_plan, + parallelism = "clustermq_staged", + jobs = jobs, + envir = e, + verbose = 4 + ) + config <- drake_config(e$my_plan, envir = e) + expect_equal(outdated(config), character(0)) + make( + e$my_plan, + parallelism = "clustermq_staged", + jobs = jobs, + envir = e, + verbose = 4 + ) + expect_equal(justbuilt(config), character(0)) +}) diff --git a/tests/testthat/test-future.R b/tests/testthat/test-future.R index 95afc09a7..9b3da2467 100644 --- a/tests/testthat/test-future.R +++ b/tests/testthat/test-future.R @@ -26,16 +26,18 @@ test_with_dir("future package functionality", { } # Stuff is already up to date. - config <- make( - e$my_plan, - envir = e, - parallelism = backends[3], - caching = caching[3], - jobs = 1, - verbose = FALSE, - session_info = FALSE - ) - expect_equal(justbuilt(config), character(0)) + for (i in 1:4){ + config <- make( + e$my_plan, + envir = e, + parallelism = backends[i], + caching = caching[i], + jobs = 1, + verbose = FALSE, + session_info = FALSE + ) + expect_equal(justbuilt(config), character(0)) + } # Workers can wait for dependencies. e$my_plan$command[2] <- "Sys.sleep(2); simulate(48)"