diff --git a/DESCRIPTION b/DESCRIPTION index 9f5f8e2..6e9773f 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Package: paramvalf Type: Package Title: Parameter Value Analysis Framework -Version: 2.4.1 +Version: 2.5.0 Author: Martin Ueding Authors@R: person("Ueding", "Martin", email = 'dev@martin-ueding.de', role = c('aut', 'cre')) Maintainer: Martin Ueding diff --git a/NAMESPACE b/NAMESPACE index 2878d47..7cee756 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -6,7 +6,9 @@ export(filter_paramval) export(get_root_dir) export(inner_outer_join) export(inner_outer_join_impl) +export(lazy_value) export(list_transpose) +export(load.lazy_value) export(make_filename) export(make_name) export(make_summary) diff --git a/R/call.R b/R/call.R index 0d1b913..fe6020b 100644 --- a/R/call.R +++ b/R/call.R @@ -16,12 +16,15 @@ #' @param dynamic_scheduling Logical, if true the each work packet will be #' assigned to a newly forked process. This provides best load balancing with #' a high cost of overhead. Should only be used for expensive tasks. +#' @param filename `NULL` or character. If a filename is given, then the +#' individual `value` objects are stored in +#' `output/values/FILENAME/value-%d.Rdata`. #' #' @return PV container with the results, same number of rows as the #' intermediate PV container. 
#' #' @export -pv_call <- function(func, ..., serial = FALSE, convert = c(), dynamic_scheduling = FALSE) { +pv_call <- function(func, ..., serial = FALSE, convert = c(), dynamic_scheduling = FALSE, filename = NULL) { stopifnot(inherits(func, 'function')) if (exists('paramval_rval')) { @@ -43,7 +46,19 @@ pv_call <- function(func, ..., serial = FALSE, convert = c(), dynamic_scheduling param_row <- get_row(joined$param, i) value_row <- joined$value[[i]] - result <- func(param_row, value_row) + value_loaded <- load_lazy_value.list(value_row) + + result <- func(param_row, value_loaded) + + rm(value_loaded) + + if (store && + object.size(result) * length(indices) >= get_lazy_threshold() && + !(length(names(result)) == 1 && names(result) == c('summary'))) { + for (name in names(result)) { + result[[name]] <- lazy_value(result[[name]], cluster, rvar_name, i, name) + } + } return (result) } @@ -54,6 +69,8 @@ pv_call <- function(func, ..., serial = FALSE, convert = c(), dynamic_scheduling stop('There are no results. 
This could be because every single function call returned `NA` and was therefore discarded.') } + delete_rdata_directory(filename) + result <- list(param = joined$param[pp$not_na, , drop = FALSE], value = pp$value) @@ -65,6 +82,10 @@ pv_call <- function(func, ..., serial = FALSE, convert = c(), dynamic_scheduling e <- parent.frame() e[[rvar_name]] <- result + if (store) { + pv_save(cluster, rvar, name = rvar_name) + } + invisible(result) } diff --git a/R/io.R b/R/io.R index f69eba2..e8388d3 100644 --- a/R/io.R +++ b/R/io.R @@ -90,8 +90,68 @@ pv_load <- function (cluster, x, name, envir = NULL) { vars <- load(filename, envir = envir) + if (eager) { + for (var in vars) { + e[[var]]$value <- lapply(e[[var]]$value, load_lazy_value.list) + } + } + end_time <- Sys.time() if (want_verbose()) { cat(' took', sprintf('%.2f', end_time - start_time), 'seconds.\n') } } + +#' Create a lazy value +#' +#' The given value is stored on disk (using `save`) and a handle for retrieving +#' it will be returned. 
+#'
+#' @param sub_value Object to store; it is saved under the name `sub_value`.
+#' @param cluster Character, directory component directly below `output/`.
+#' @param name Character, base name of the `NAME.Rdata.dir` directory.
+#' @param index Integer-valued number, formatted with `%d` into the file name.
+#' @param value_name Character, second component of the file name.
+#'
+#' @return A list with the single element `path` that carries the additional
+#'   class `lazy_value`.
+#'
+#' @export
+lazy_value <- function (sub_value, cluster, name, index, value_name) {
+    path <- sprintf('%s/output/%s/%s.Rdata.dir/%d-%s.Rdata', get_root_dir(), cluster, name, index, value_name)
+
+    # Create missing parent directories as well. The return value of
+    # `dir.create` is deliberately not asserted: another process may create
+    # the directory between the existence check and the creation, so only the
+    # final existence of the directory matters.
+    target_dir <- dirname(path)
+    if (!dir.exists(target_dir)) {
+        dir.create(target_dir, recursive = TRUE, showWarnings = FALSE)
+    }
+    stopifnot(dir.exists(target_dir))
+
+    save(sub_value, file = path)
+
+    self <- list(path = path)
+    class(self) <- append(class(self), 'lazy_value')
+    return (self)
+
+    #rlang::env_bind_exprs(environment(), lv = { load.lazy_value(self) })
+    #return (lv)
+}
+
+#' Load a lazy value
+#'
+#' @param self Object of class `lazy_value`, as returned by `lazy_value`.
+#'
+#' @return The object stored under the name `sub_value` in the file
+#'   `self$path`.
+#'
+#' @export
+load.lazy_value <- function (self) {
+    stopifnot(inherits(self, 'lazy_value'))
+
+    # Load into a private environment so that the contents of the file cannot
+    # overwrite local variables of this function.
+    loaded <- new.env(parent = emptyenv())
+    vars <- load(self$path, envir = loaded)
+    stopifnot('sub_value' %in% vars)
+
+    return (loaded[['sub_value']])
+}
+
+# Replace every `lazy_value` element of the list `self` by the object it
+# refers to; all other elements are passed through unchanged.
+load_lazy_value.list <- function (self) {
+    lapply(self, function (x) {
+        if (inherits(x, 'lazy_value')) {
+            load.lazy_value(x)
+        } else {
+            x
+        }
+    })
+}
+
+# Value of the option `paramvalf_lazy_threshold`, falling back to
+# `1000 * 2^20`. `pv_call` compares `object.size(result) * length(indices)`
+# against this number.
+get_lazy_threshold <- function () {
+    getOption('paramvalf_lazy_threshold', 1000 * 2^20)
+}
+
+# Remove the directory `output/CLUSTER/NAME.Rdata.dir` below the root
+# directory together with its contents, if it exists. Stops when `unlink`
+# reports a failure.
+# NOTE(review): both arguments are required, but the call added to `pv_call`
+# in this patch passes a single argument (`filename`) -- confirm the intended
+# call.
+delete_rdata_directory <- function (cluster, name) {
+    path <- sprintf('%s/output/%s/%s.Rdata.dir', get_root_dir(), cluster, name)
+    if (dir.exists(path)) {
+        stopifnot(unlink(path, recursive = TRUE) == 0)
+    }
+}