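# Compute runs a startR request: a 'startR_cube' is evaluated directly, while a
# single-step 'startR_workflow' is compiled into one operation and dispatched
# piecewise through ByChunks, either on the local machine or on a cluster.
#
# Rough usage sketch (illustrative only: the path, variable, function and chunk
# specification below are made up, and argument details may differ):
#   data <- Start(dat = '/path/to/experiment_$var$.nc', var = 'tas',
#                 retrieve = FALSE)
#   step <- Step(function(x) x * 2, 'time', 'time')
#   workflow <- AddStep(data, step)
#   result <- Compute(workflow, chunks = list(latitude = 2, longitude = 2))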
Compute <- function(workflow, chunks = 'auto',
                    ncores = 1, cluster = NULL, shared_dir = NULL,
                    ecflow_host = NULL, silent = FALSE, debug = FALSE,
                    wait = TRUE) {
  # Check workflow
  if (!any(c('startR_cube', 'startR_workflow') %in% class(workflow))) {
    stop("Parameter 'workflow' must be an object of class 'startR_cube' as ",
         "returned by Start or of class 'startR_workflow' as returned by ",
         "AddStep.")
  }
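  # A 'startR_cube' already describes a complete data request and can be run
  # as is; a 'startR_workflow' must first be compiled into a single operation.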
  if ('startR_cube' %in% class(workflow)) {
    #machine_free_ram <- 1000000000
    #max_ram_ratio <- 0.5
    #data_size <- prod(c(attr(workflow, 'Dimensions'), 8))
    #if (data_size > (machine_free_ram * max_ram_ratio)) {
    #  stop("It is not possible to fit the requested data (", data_size,
    #       " bytes) into the maximum allowed free ram (", max_ram_ratio,
    #       " x ", machine_free_ram, ").")
    #}
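    # Evaluating the cube presumably triggers the data request built by Start.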
    eval(workflow)
  } else {
    # TODO:
    # - Explore the tree of operations and identify the set of operations that
    #   reduces dimensionality as much as possible, while still being able to
    #   fit either on the cluster (exploiting the number of available nodes)
    #   or on the local machine.
    # - Combine that set of operations into a single function.
    # Goal: to build manually a function following this pattern:
    #operation <- function(input1, input2) {
    #  fun1 <- workflow$fun
    #  fun1(input1, input2, names(workflow$params)[1] = workflow$params[[1]])
    #}
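    # The code below assembles such a wrapper as a text string and then parses
    # it. For an illustrative workflow with two inputs and a single parameter
    # 'na.rm = TRUE' (hypothetical names), the assembled text would read:
    #   function(input1, input2) {
    #    fun1 <- <deparsed workflow$fun>
    #    res <- fun1(input1, input2, na.rm = TRUE)
    #   }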
    op_text <- "function("
    op_text <- paste0(op_text,
                      paste(paste0('input', 1:length(workflow$inputs)),
                            collapse = ', '))
    op_text <- paste0(op_text, ") {")
    op_text <- paste0(op_text, "\n fun1 <- ", paste(deparse(workflow$fun), collapse = '\n'))
    op_text <- paste0(op_text, "\n res <- fun1(",
                      paste(paste0('input', 1:length(workflow$inputs)),
                            collapse = ", "))
    if (length(workflow$params) > 0) {
      for (j in 1:length(workflow$params)) {
        op_text <- paste0(op_text, ", ")
        op_text <- paste0(op_text, names(workflow$params)[j], " = ",
                          paste(deparse(workflow$params[[j]]), collapse = '\n'))
      }
    }
    op_text <- paste0(op_text, ")")
    op_text <- paste0(op_text, "\n}")
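    # Turn the assembled text into an actual R function.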
    operation <- eval(parse(text = op_text))
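    # Re-declare the composed function as a Step, preserving the target and
    # output dimensions declared for the original workflow function.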
    operation <- Step(operation,
                      attr(workflow$fun, 'TargetDims'),
                      attr(workflow$fun, 'OutputDims'))
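    # Every input must already be a 'startR_header' (i.e. not the result of
    # another workflow step), since only single-step workflows are handled.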
    if (!all(sapply(workflow$inputs, class) == 'startR_header')) {
      stop("Only workflows with a single step are supported for now.")
    }
    # Run ByChunks with the combined operation
    res <- ByChunks(step_fun = operation,
                    chunks = chunks, ncores = ncores,
                    cluster = cluster, shared_dir = shared_dir,
                    ecflow_host = ecflow_host, silent = silent,
                    debug = debug, wait = wait)
    # TODO: carry out remaining steps locally, using multiApply
    # Return results
    res