# Run a startR workflow.
#
# Parameters:
#   workflow: an object of class 'startR_cube' or of class 'startR_workflow'.
#     A 'startR_cube' is simply evaluated with eval(). A 'startR_workflow' is
#     expected to provide the elements 'fun', 'params' and 'inputs', which are
#     combined into a single operation and handed to ByChunks().
#   chunks, ncores, cluster, ecflow_output_dir, is_ecflow_output_dir_shared,
#   ecflow_server, silent, debug, wait: forwarded unchanged to ByChunks().
#     They are not used when 'workflow' is a 'startR_cube'.
#
# Returns:
#   The value of eval(workflow) for a 'startR_cube', otherwise the value
#   returned by ByChunks().
#
# Errors:
#   - if 'workflow' is of neither supported class;
#   - if any element of workflow$inputs is not a 'startR_header' (only
#     single-step workflows are supported by now).
Compute <- function(workflow, chunks = 'auto',
                    ncores = 1, cluster = NULL, ecflow_output_dir = NULL,
                    is_ecflow_output_dir_shared = FALSE,
                    ecflow_server = NULL, silent = FALSE, debug = FALSE,
                    wait = TRUE) {
  # Check workflow
  if (!inherits(workflow, c('startR_cube', 'startR_workflow'))) {
    stop("Parameter 'workflow' must be an object of class 'startR_cube' as ",
         "returned by Start or of class 'startR_workflow' as returned by ",
         "AddStep.")
  }

  if (inherits(workflow, 'startR_cube')) {
    #machine_free_ram <- 1000000000
    #max_ram_ratio <- 0.5
    #data_size <- prod(c(attr(workflow, 'Dimensions'), 8))
    #if (data_size > (machine_free_ram * max_ram_ratio)) {
    #  stop("It is not possible to fit the requested data (", data_size,
    #       " bytes) into the maximum allowed free ram (", max_ram_ratio,
    #       " x ", machine_free_ram, ").")
    #}
    return(eval(workflow))
  }

  # TODO:
  #explore tree of operations and identify set of operations that reduce
  #dimensionality as much as possible while being able to fit in (cluster and
  #to exploit number of available nodes) | (machine)
  #combine set of operations into a single function
  #Goal: to build manually a function following this pattern:
  #operation <- function(input1, input2) {
  #  fun1 <- workflow$fun
  #  fun1(input1, input2, names(workflow$params)[1] = workflow$params[[1]])
  #}

  # One formal argument per workflow input: input1, input2, ...
  # sprintf() yields character(0) for an empty input list, whereas
  # 1:length(x) would yield c(1, 0) and paste0() would still emit one name.
  input_names <- sprintf('input%d', seq_along(workflow$inputs))

  # One 'name = value' argument per workflow parameter, values deparsed to
  # source text.
  param_args <- vapply(
    seq_along(workflow$params),
    function(j) {
      paste0(names(workflow$params)[j], " = ",
             paste(deparse(workflow$params[[j]]), collapse = '\n'))
    },
    character(1)
  )

  # Inputs and parameters are joined in a single step so that no dangling
  # separator is produced when either of the two sets is empty.
  op_text <- paste0(
    "function(", paste(input_names, collapse = ', '), ") {",
    "\n  fun1 <- ", paste(deparse(workflow$fun), collapse = '\n'),
    "\n  res <- fun1(", paste(c(input_names, param_args), collapse = ", "),
    ")",
    "\n}"
  )

  # NOTE(review): the operation is assembled as source text and parsed,
  # presumably so that the function handed to ByChunks() carries the step
  # function and its parameter values inline -- confirm before replacing
  # this with a closure.
  operation <- eval(parse(text = op_text))
  operation <- Step(operation,
                    attr(workflow$fun, 'TargetDims'),
                    attr(workflow$fun, 'OutputDims'))

  # inherits() keeps the check element-wise even when a header carries more
  # than one class (sapply(..., class) would then return a matrix or a list).
  are_headers <- vapply(workflow$inputs, inherits, logical(1),
                        what = 'startR_header')
  if (!all(are_headers)) {
    stop("Workflows with only one step supported by now.")
  }

  # Run ByChunks with the combined operation
  res <- ByChunks(step_fun = operation,
                  cube_headers = workflow$inputs,
                  chunks = chunks,
                  ncores = ncores,
                  cluster = cluster,
                  ecflow_output_dir = ecflow_output_dir,
                  is_ecflow_output_dir_shared = is_ecflow_output_dir_shared,
                  ecflow_server = ecflow_server,
                  silent = silent, debug = debug, wait = wait)

  # TODO: carry out remaining steps locally, using multiApply

  # Return results
  res
}