Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add clustermq-based staged scheduling #452

Merged
merged 10 commits into from
Jul 3, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ Imports:
Suggests:
abind,
callr,
clustermq,
DBI,
MASS,
methods,
Expand Down
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export(as_file)
export(available_hash_algos)
export(backend)
export(bind_plans)
export(build_clustermq)
export(build_drake_graph)
export(build_graph)
export(build_times)
Expand Down
3 changes: 2 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Version 5.2.1.9000

- Add `make(parallelism = "future_lapply_staged")`, a `future`-based staged parallelism backend (see https://github.com/ropensci/drake/pull/450).
- Implement `make(parallelism = "clustermq_staged")`, a `clustermq`-based staged parallelism backend (see https://github.com/ropensci/drake/pull/452).
- Implement `make(parallelism = "future_lapply_staged")`, a `future`-based staged parallelism backend (see https://github.com/ropensci/drake/pull/450).
- Depend on `codetools` rather than `CodeDepends` for finding global variables.
- Skip more tests on CRAN. White-list tests instead of blacklisting them in order to try to keep check time under the official 10-minute cap.
- Disallow wildcard names to grep-match other wildcard names or any replacement values. This will prevent careless mistakes and confusion when generating `drake_plan()`s.
Expand Down
5 changes: 1 addition & 4 deletions R/build.R
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
#' @title Build/process a single target or import.
#' @export
#' @description For internal use only.
#' the only reason this function is exported
#' is to set up parallel socket (PSOCK) clusters
#' without much of a fuss.
#' @description Also load the target's dependencies beforehand.
#' @return The value of the target right after it is built.
#' @param target name of the target
#' @param meta list of metadata that tell which
Expand Down
2 changes: 1 addition & 1 deletion R/config.R
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ drake_config <- function(
}

add_packages_to_prework <- function(packages, prework) {
packages <- unique(c("methods", packages))
packages <- unique(c("methods", "drake", packages))
paste0("if(!R.utils::isPackageLoaded(\"", packages, "\")) require(",
packages, ")", sep = "") %>% c(prework)
}
Expand Down
1 change: 1 addition & 0 deletions R/parallel_ui.R
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ parallelism_choices <- function(distributed_only = FALSE) {
"parLapply_staged"
)
distributed <- c(
"clustermq_staged",
"future",
"future_lapply",
"future_lapply_staged",
Expand Down
115 changes: 115 additions & 0 deletions R/staged.R
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,118 @@ run_future_lapply_staged <- function(config){
}
invisible()
}

run_clustermq_staged <- function(config){
if (!requireNamespace("clustermq")){
# nocov start
drake_error(
"to use make(parallelism = \"clustermq_staged\"), ",
"you must install the clustermq package: ",
"https://github.com/mschubert/clustermq.",
config = config
)
# nocov end
}
schedule <- config$schedule
workers <- clustermq::workers(n_jobs = config$jobs)
on.exit(workers$finalize())
while (length(V(schedule)$name)){
stage <- next_stage(config = config, schedule = schedule)
schedule <- stage$schedule
if (!length(stage$targets)){
break
} else if (any(stage$targets %in% config$plan$target)){
set_attempt_flag(key = "_attempt", config = config)
}
prune_envir(
targets = stage$targets,
config = config,
jobs = config$jobs_imports
)
export <- list()
if (identical(config$envir, globalenv())){
export <- as.list(config$envir, all.names = TRUE) # nocov
}
export$config <- config
export$meta_list <- stage$meta_list
meta_list <- NULL
tmp <- lightly_parallelize(
X = stage$targets,
FUN = function(target){
announce_build(
target = target,
meta = stage$meta_list[[target]],
config = config
)
},
jobs = config$jobs_imports
)
builds <- clustermq::Q(
stage$targets,
fun = function(target){
# This call is actually tested in tests/testthat/test-clustermq.R.
# nocov start
drake::build_clustermq(
target = target,
meta_list = meta_list,
config = config
)
# nocov end
},
workers = workers,
export = export
)
tmp <- lightly_parallelize(
X = builds,
FUN = function(build){
wait_for_file(build = build, config = config)
conclude_build(
target = build$target,
value = build$value,
meta = build$meta,
config = config
)
},
jobs = config$jobs_imports
)
}
invisible()
}

#' @title Build a target using the clustermq backend
#' @description For internal use only
#' @export
#' @keywords internal
#' @inheritParams drake_build
#' @param meta_list list of metadata
build_clustermq <- function(target, meta_list, config){
# This function is actually tested in tests/testthat/test-clustermq.R.
# nocov start
do_prework(config = config, verbose_packages = FALSE)
build <- just_build(
target = target,
meta = meta_list[[target]],
config = config
)
if (is_file(target)){
build$checksum <- rehash_file(target, config = config)
}
build
# nocov end
}

wait_for_file <- function(build, config){
if (!length(build$checksum)){
return()
}
R.utils::withTimeout({
while (!file.exists(drake_unquote(build$target))){
Sys.sleep(mc_wait) # nocov
}
while (!identical(rehash_file(build$target, config), build$checksum)){
Sys.sleep(mc_wait) # nocov
}
},
timeout = 60
)
}
19 changes: 19 additions & 0 deletions man/build_clustermq.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 1 addition & 4 deletions man/drake_build.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions tests/testthat/test-clustermq.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
drake_context("clustermq")

test_with_dir("clustermq_staged parallelism", {
skip_on_cran()
if ("package:clustermq" %in% search()){
detach("package:clustermq", unload = TRUE)
}
options(clustermq.scheduler = "multicore")
skip_if_not_installed("clustermq")
skip_on_os("windows")
scenario <- get_testing_scenario()
e <- eval(parse(text = scenario$envir))
jobs <- scenario$jobs
load_mtcars_example(envir = e)
make(
e$my_plan,
parallelism = "clustermq_staged",
jobs = jobs,
envir = e,
verbose = 4
)
config <- drake_config(e$my_plan, envir = e)
expect_equal(outdated(config), character(0))
make(
e$my_plan,
parallelism = "clustermq_staged",
jobs = jobs,
envir = e,
verbose = 4
)
expect_equal(justbuilt(config), character(0))
})
22 changes: 12 additions & 10 deletions tests/testthat/test-future.R
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,18 @@ test_with_dir("future package functionality", {
}

# Stuff is already up to date.
config <- make(
e$my_plan,
envir = e,
parallelism = backends[3],
caching = caching[3],
jobs = 1,
verbose = FALSE,
session_info = FALSE
)
expect_equal(justbuilt(config), character(0))
for (i in 1:4){
config <- make(
e$my_plan,
envir = e,
parallelism = backends[i],
caching = caching[i],
jobs = 1,
verbose = FALSE,
session_info = FALSE
)
expect_equal(justbuilt(config), character(0))
}

# Workers can wait for dependencies.
e$my_plan$command[2] <- "Sys.sleep(2); simulate(48)"
Expand Down