Skip to content

Commit

Permalink
Merge pull request #544 from ropensci/539
Browse files Browse the repository at this point in the history
Optionally load targets in parallel workers
  • Loading branch information
wlandau-lilly authored Oct 12, 2018
2 parents 93df07a + f6befe7 commit 21f0ba1
Show file tree
Hide file tree
Showing 12 changed files with 208 additions and 128 deletions.
2 changes: 2 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
- Call `path.expand()` on the `file` argument to `render_drake_graph()` and `render_sankey_drake_graph()`. That way, tildes in file paths no longer interfere with the rendering of static image files. Compensates for https://github.com/wch/webshot.
- Skip tests and examples if the required "Suggests" packages are not installed.
- Stop checking for non-standard columns. Previously, warnings about non-standard columns were incorrectly triggered by `evaluate_plan(trace = TRUE)` followed by `expand_plan()`, `gather_plan()`, `reduce_plan()`, `gather_by()`, or `reduce_by()`. The more relaxed behavior also gives users more options about how to construct and maintain their workflow plan data frames.
- Use checksums in `"future"` parallelism to make sure files travel over network file systems before proceeding to downstream targets.
- Refactor and clean up checksum code.

## Enhancements

Expand Down
106 changes: 106 additions & 0 deletions R/checksums.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
mc_get_checksum <- function(target, config){
paste(
safe_get_hash(
key = target,
namespace = config$cache$default_namespace,
config = config
),
safe_get_hash(key = target, namespace = "kernels", config = config),
safe_get_hash(key = target, namespace = "meta", config = config),
safe_get_hash(key = target, namespace = "attempt", config = config),
mc_get_outfile_checksum(target, config),
sep = " "
)
}

mc_get_outfile_checksum <- function(target, config){
deps <- vertex_attr(
graph = config$graph,
name = "deps",
index = target
)[[1]]
files <- sort(unique(as.character(deps$file_out)))
vapply(
X = files,
FUN = rehash_file,
FUN.VALUE = character(1),
config = config
) %>%
paste(collapse = "") %>%
digest::digest(algo = config$long_hash_algo, serialize = FALSE)
}

mc_is_good_checksum <- function(target, checksum, config){
# Not actually reached, but may come in handy later.
# nocov start
if (!length(checksum)){
mc_warn_no_checksum(target = target, config = config)
return(TRUE)
}
if (identical("failed", get_progress_single(target, cache = config$cache))){
return(TRUE) # covered with parallel processes # nocov
}
# nocov end
local_checksum <- mc_get_checksum(target = target, config = config)
if (!identical(local_checksum, checksum)){
return(FALSE)
}
all(
vapply(
X = unlist(strsplit(local_checksum, " "))[1:3], # Exclude attempt flag (often NA). # nolint
config$cache$exists_object,
FUN.VALUE = logical(1)
)
)
}

mc_is_good_outfile_checksum <- function(target, checksum, config){
if (!length(checksum)){
mc_warn_no_checksum(target = target, config = config)
return(TRUE)
}
if (identical("failed", get_progress_single(target, cache = config$cache))){
return(TRUE) # covered with parallel processes # nocov
}
identical(checksum, mc_get_outfile_checksum(target = target, config = config))
}

mc_wait_checksum <- function(
target,
checksum,
config,
timeout = 300,
criterion = mc_is_good_checksum
){
i <- 0
while (i < timeout / mc_wait){
if (criterion(target, checksum, config)){
return()
} else {
Sys.sleep(mc_wait)
}
i <- i + 1
}
drake_error(
"Target `", target, "` did not download from your ",
"network file system. Checksum verification timed out after about ",
timeout, " seconds.", config = config
)
}

mc_wait_outfile_checksum <- function(target, checksum, config, timeout = 300){
mc_wait_checksum(
target = target,
checksum = checksum,
config = config,
timeout = timeout,
criterion = mc_is_good_outfile_checksum
)
}

mc_warn_no_checksum <- function(target, config){
drake_warning(
"No checksum available for target ", target, ".",
config = config
)
}
20 changes: 14 additions & 6 deletions R/clustermq.R
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
run_clustermq <- function(config){
assert_pkg("clustermq", version = "0.8.4.99")
assert_pkg("clustermq", version = "0.8.5")
config$queue <- new_priority_queue(
config = config,
jobs = config$jobs_imports
Expand Down Expand Up @@ -70,8 +70,12 @@ cmq_send_target <- function(config){
# nocov end
meta$start <- proc.time()
announce_build(target = target, meta = meta, config = config)
prune_envir(targets = target, config = config, jobs = 1)
deps <- cmq_deps_list(target = target, config = config)
if (identical(config$caching, "master")){
prune_envir(targets = target, config = config, jobs = 1)
deps <- cmq_deps_list(target = target, config = config)
} else {
deps <- NULL
}
config$workers$send_call(
expr = drake::cmq_build(
target = target,
Expand Down Expand Up @@ -109,12 +113,16 @@ cmq_build <- function(target, meta, deps, config){
gc()
}
do_prework(config = config, verbose_packages = FALSE)
for (dep in names(deps)){
config$envir[[dep]] <- deps[[dep]]
if (identical(config$caching, "master")){
for (dep in names(deps)){
config$envir[[dep]] <- deps[[dep]]
}
} else {
prune_envir(targets = target, config = config, jobs = 1)
}
build <- just_build(target = target, meta = meta, config = config)
if (identical(config$caching, "master")){
build$checksum <- mc_output_file_checksum(target, config)
build$checksum <- mc_get_outfile_checksum(target, config)
return(build)
}
conclude_build(
Expand Down
55 changes: 42 additions & 13 deletions R/future.R
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,24 @@ run_future <- function(config){
#' @param config [drake_config()] list
#' @param protect Names of targets that still need their
#' dependencies available in `config$envir`.
drake_future_task <- function(target, meta, config){
drake_future_task <- function(target, meta, config, protect){
if (identical(config$caching, "worker")){
prune_envir(targets = target, config = config, downstream = protect)
}
do_prework(config = config, verbose_packages = FALSE)
if (config$caching == "worker"){
build_and_store(
target = target, meta = meta, config = config, announce = FALSE)
} else {
just_build(target = target, meta = meta, config = config)
build <- just_build(target = target, meta = meta, config = config)
if (identical(config$caching, "master")){
build$checksum <- mc_get_outfile_checksum(target, config)
return(build)
}
conclude_build(
target = build$target,
value = build$value,
meta = build$meta,
config = config
)
set_attempt_flag(key = build$target, config = config)
list(target = target, checksum = mc_get_checksum(target, config))
}

new_worker <- function(id, target, config, protect){
Expand All @@ -65,14 +75,21 @@ new_worker <- function(id, target, config, protect){
)){
return(empty_worker(target = target))
}
prune_envir(targets = target, config = config, downstream = protect)
if (identical(config$caching, "master")){
prune_envir(targets = target, config = config, downstream = protect)
}
meta$start <- proc.time()
config$cache$flush_cache() # Less data to pass this way.
DRAKE_GLOBALS__ <- NULL # Fixes warning about undefined globals.
# Avoid potential name conflicts with other globals.
# When we solve #296, the need for such a clumsy workaround
# should go away.
globals <- future_globals(target = target, meta = meta, config = config)
globals <- future_globals(
target = target,
meta = meta,
config = config,
protect = protect
)
evaluator <- drake_plan_override(
target = target,
field = "evaluator",
Expand All @@ -85,7 +102,8 @@ new_worker <- function(id, target, config, protect){
expr = drake_future_task(
target = DRAKE_GLOBALS__$target,
meta = DRAKE_GLOBALS__$meta,
config = DRAKE_GLOBALS__$config
config = DRAKE_GLOBALS__$config,
protect = DRAKE_GLOBALS__$protect
),
packages = "drake",
globals = globals,
Expand All @@ -95,12 +113,13 @@ new_worker <- function(id, target, config, protect){
)
}

future_globals <- function(target, meta, config){
future_globals <- function(target, meta, config, protect){
globals <- list(
DRAKE_GLOBALS__ = list(
target = target,
meta = meta,
config = config
config = config,
protect = protect
)
)
if (identical(config$envir, globalenv())){
Expand Down Expand Up @@ -206,11 +225,21 @@ conclude_worker <- function(worker, config, queue){
if (is_empty_worker(worker)){
return(out)
}
set_attempt_flag(key = "_attempt", config = config)
build <- resolve_worker_value(worker = worker, config = config)
if (config$caching == "worker"){
if (identical(config$caching, "worker")){
mc_wait_checksum(
target = build$target,
checksum = build$checksum,
config = config
)
return(out)
}
set_attempt_flag(key = build$target, config = config)
mc_wait_outfile_checksum(
target = build$target,
checksum = build$checksum,
config = config
)
conclude_build(
target = build$target,
value = build$value,
Expand Down
84 changes: 0 additions & 84 deletions R/mc_utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -163,90 +163,6 @@ mc_worker_id <- function(x){
paste0("worker_", x)
}

mc_get_checksum <- function(target, config){
paste(
safe_get_hash(
key = target,
namespace = config$cache$default_namespace,
config = config
),
safe_get_hash(key = target, namespace = "kernels", config = config),
safe_get_hash(key = target, namespace = "meta", config = config),
safe_get_hash(key = target, namespace = "attempt", config = config),
mc_output_file_checksum(target, config),
sep = " "
)
}

mc_output_file_checksum <- function(target, config){
deps <- vertex_attr(
graph = config$graph,
name = "deps",
index = target
)[[1]]
files <- sort(unique(as.character(deps$file_out)))
vapply(
X = files,
FUN = rehash_file,
FUN.VALUE = character(1),
config = config
) %>%
paste(collapse = "") %>%
digest::digest(algo = config$long_hash_algo, serialize = FALSE)
}

mc_is_good_checksum <- function(target, checksum, config){
local_checksum <- mc_get_checksum(target = target, config = config)
if (!identical(local_checksum, checksum)){
return(FALSE)
}
if (identical("failed", get_progress_single(target, cache = config$cache))){
return(TRUE) # covered with parallel processes # nocov
}
all(
vapply(
X = unlist(strsplit(local_checksum, " "))[1:3], # Exclude attempt flag (often NA). # nolint
config$cache$exists_object,
FUN.VALUE = logical(1)
)
)
}

mc_wait_checksum <- function(target, checksum, config, timeout = 300){
i <- 0
while (i < timeout / mc_wait){
if (mc_is_good_checksum(target, checksum, config)){
return()
} else {
Sys.sleep(mc_wait)
}
i <- i + 1
}
drake_error(
"Target `", target, "` did not download from your ",
"network file system. Checksum verification timed out after about ",
timeout, " seconds.", config = config
)
}

mc_wait_outfile_checksum <- function(target, checksum, config, timeout = 300){
i <- 0
while (i < timeout / mc_wait){
local_checksum <- mc_output_file_checksum(target, config)
if (identical(local_checksum, checksum)){
return()
} else {
Sys.sleep(mc_wait)
}
i <- i + 1
}
drake_error(
"Target `", target, "` did not download from your ",
"network file system. Checksum verification timed out after about ",
timeout, " seconds.", config = config
)
}

mc_abort_with_errored_workers <- function(config){
if (length(failed_workers <- config$cache$list("mc_error"))){
if (!identical(config$keep_going, TRUE)){
Expand Down
2 changes: 1 addition & 1 deletion R/staged.R
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ cmq_staged_build <- function(target, meta_list, config){
config = config
)
if (identical(config$caching, "master")){
build$checksum <- mc_output_file_checksum(target, config)
build$checksum <- mc_get_outfile_checksum(target, config)
return(build)
}
conclude_build(
Expand Down
4 changes: 3 additions & 1 deletion appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ artifacts:
environment:
NOT_CRAN: true
global:
WARNINGS_ARE_ERRORS: 1
R_REMOTES_NO_ERRORS_FROM_WARNINGS: true
USE_RTOOLS: true
WARNINGS_ARE_ERRORS: 1

matrix:
- R_VERSION: release
arch: x64
2 changes: 1 addition & 1 deletion man/deps_code.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion man/drake_future_task.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 21f0ba1

Please sign in to comment.