#'Specify the execution parameters and trigger the execution
#'
#'This is the final step of the startR workflow, coming after the complete
#'workflow has been defined by AddStep(). This function specifies the execution
#'parameters and triggers the execution. The execution can be run locally or on
#'a remote machine. In the latter case, the configuration of the machine needs
#'to be specified in the function, and a workflow manager (EC-Flow or
#'Autosubmit) needs to be available.
#'The execution can be split into chunks to avoid overloading the RAM. After
#'all the chunks are finished, Compute() gathers and merges them, and returns a
#'single data object, including one or multiple multidimensional data arrays
#'and additional metadata.
#'
#'@param workflow A list of the class 'startR_workflow' returned by function
#' AddStep() or of class 'startR_cube' returned by function Start(). It
#' contains all the objects needed for the execution.
#'@param chunks A named list of dimensions along which to split the data, and
#' the number of chunks to make for each. The chunked dimensions can only be
#' those not required as target dimensions in function Step(). The default
#' value is 'auto', which lists all the non-target dimensions, with one chunk
#' each.
#'@param threads_load An integer indicating the number of execution processes
#' to use for the data retrieval. The default value is 1.
#'@param threads_compute An integer indicating the number of execution processes
#' to use for the computation. The default value is 1.
#'@param cluster A list of components that define the configuration of the
#' machine to be run on. The components vary depending on the machine.
#' Check \href{https://earth.bsc.es/gitlab/es/startR/-/blob/master/inst/doc/practical_guide.md}{Practical guide on GitLab} for more
#' details and examples. Only needed when the computation is not run locally.
#' The default value is NULL.
#'@param workflow_manager A character string, either 'ecFlow' or 'Autosubmit',
#' indicating the workflow manager to use when the computation is run remotely.
#' The default value is 'ecFlow'.
#'@param ecflow_suite_dir A character string indicating the path to a folder in
#' the local workstation where to store temporary files generated for the
#' automatic management of the workflow. Only needed when the execution is run
#' remotely. The default value is NULL.
#'@param ecflow_server A named vector indicating the host and port of the
#' EC-Flow server. The vector form should be
#' \code{c(host = 'hostname', port = port_number)}. Only needed when the
#' execution is run remotely. The default value is NULL.
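#'@param autosubmit_suite_dir A character string indicating the path to a
#' folder in the local workstation where to store temporary files generated for
#' the automatic management of the workflow. Only needed when the execution is
#' run remotely with Autosubmit. The default value is NULL.
#'@param autosubmit_server A character string indicating the login node of the
#' Autosubmit machine. Only needed when the execution is run remotely with
#' Autosubmit. The default value is NULL.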
#'@param silent A logical value deciding whether to print the computation
#' progress (FALSE) on the R session or not (TRUE). It only works when the
#' execution runs locally or the parameter 'wait' is TRUE. The default value
#' is FALSE.
#'@param debug A logical value deciding whether to return detailed messages on
#' the progress and operations in a Compute() call (TRUE) or not (FALSE).
#' Automatically changed to FALSE if parameter 'silent' is TRUE. The default
#' value is FALSE.
#'@param wait A logical value deciding whether the R session waits for the
#' Compute() call to finish (TRUE) or not (FALSE). If FALSE, it will return an
#' object with all the information of the startR execution, which can be stored
#' on disk. After that, the R session can be closed and the results can
#' be collected later with the Collect() function. The default value is TRUE.
#'
#'@return A list of data arrays for the output returned by the last step in the
#' specified workflow (wait = TRUE), or an object with information about the
#' startR execution (wait = FALSE). The configuration details and profiling
#' information are attached as attributes to the returned list of arrays.
#'@examples
#' data_path <- system.file('extdata', package = 'startR')
#' path_obs <- file.path(data_path, 'obs/monthly_mean/$var$/$var$_$sdate$.nc')
#' sdates <- c('200011', '200012')
#' data <- Start(dat = list(list(path = path_obs)),
#'               var = 'tos',
#'               sdate = sdates,
#'               time = 'all',
#'               latitude = 'all',
#'               longitude = 'all',
#'               return_vars = list(latitude = 'dat',
#'                                  longitude = 'dat',
#'                                  time = 'sdate'),
#'               retrieve = FALSE)
#' fun <- function(x) {
#'   lat = attributes(x)$Variables$dat1$latitude
#'   weight = sqrt(cos(lat * pi / 180))
#'   corrected = Apply(list(x), target_dims = "latitude",
#'                     fun = function(x) {x * weight})
#' }
#' step <- Step(fun = fun,
#'              target_dims = 'latitude',
#'              output_dims = 'latitude',
#'              use_libraries = c('multiApply'),
#'              use_attributes = list(data = "Variables"))
#' wf <- AddStep(data, step)
#' res <- Compute(wf, chunks = list(longitude = 4, sdate = 2))
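#'
#' # A sketch of a remote execution on an HPC cluster, not run here. All the
#' # values below (host names, paths, 'cluster' component values) are
#' # placeholders; check the practical guide linked above for the components
#' # accepted by 'cluster' on each machine.
#' \dontrun{
#' res <- Compute(wf,
#'                chunks = list(longitude = 4, sdate = 2),
#'                threads_load = 2,
#'                threads_compute = 4,
#'                cluster = list(queue_host = 'hostname',
#'                               queue_type = 'slurm',
#'                               temp_dir = '/tmp/startR_tmp/',
#'                               cores_per_job = 2,
#'                               job_wait_time = '10m',
#'                               max_jobs = 4,
#'                               polling_period = 10),
#'                ecflow_suite_dir = '/home/user/ecflow_suites/',
#'                ecflow_server = c(host = 'localhost', port = 5678),
#'                wait = TRUE)
#' }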
#'
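#' # A sketch of an asynchronous call: with 'wait = FALSE', Compute() returns
#' # immediately and the results can be gathered later with Collect(). Again
#' # illustrative only; it requires a remote configuration as above.
#' \dontrun{
#' res <- Compute(wf,
#'                chunks = list(longitude = 4, sdate = 2),
#'                cluster = cluster_config,  # a 'cluster' list as above
#'                ecflow_suite_dir = '/home/user/ecflow_suites/',
#'                wait = FALSE)
#' # ...the R session can be closed and reopened here...
#' result <- Collect(res, wait = TRUE)
#' }
#'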
Compute <- function(workflow, chunks = 'auto', workflow_manager = 'ecFlow',
threads_load = 1, threads_compute = 1,
cluster = NULL, ecflow_suite_dir = NULL, ecflow_server = NULL,
autosubmit_suite_dir = NULL, autosubmit_server = NULL,
silent = FALSE, debug = FALSE, wait = TRUE) {
if (!is(workflow, 'startR_cube') & !is(workflow, 'startR_workflow')) {
stop("Parameter 'workflow' must be an object of class 'startR_cube' as ",
"returned by Start or of class 'startR_workflow' as returned by ",
"AddStep.")
}
  if (is(workflow, 'startR_cube')) {
#machine_free_ram <- 1000000000
#max_ram_ratio <- 0.5
#data_size <- prod(c(attr(workflow, 'Dimensions'), 8))
#if (data_size > (machine_free_ram * max_ram_ratio)) {
# stop("It is not possible to fit the requested data (", data_size,
# " bytes) into the maximum allowed free ram (", max_ram_ratio,
# " x ", machine_free_ram, ").")
#}
eval(workflow)
} else {
# TODO:
#explore tree of operations and identify set of operations that reduce dimensionality as much as possible
# while being able to fit in (cluster and to exploit number of available nodes) | (machine)
#combine set of operations into a single function
#Goal: to build manually a function following this pattern:
#operation <- function(input1, input2) {
# fun1 <- workflow$fun
# fun1(input1, input2, names(workflow$params)[1] = workflow$params[[1]])
#}
op_text <- "function("
op_text <- paste0(op_text,
paste(paste0('input', 1:length(workflow$inputs)),
collapse = ', '))
op_text <- paste0(op_text, ") {")
op_text <- paste0(op_text, "\n fun1 <- ", paste(deparse(workflow$fun), collapse = '\n'))
op_text <- paste0(op_text, "\n res <- fun1(",
paste(paste0('input', 1:length(workflow$inputs)),
collapse = ", "))
if (length(workflow$params) > 0) {
for (j in 1:length(workflow$params)) {
op_text <- paste0(op_text, ", ")
op_text <- paste0(op_text, names(workflow$params)[j], " = ",
paste(deparse(workflow$params[[j]]), collapse = '\n'))
}
}
op_text <- paste0(op_text, ")")
op_text <- paste0(op_text, "\n}")
operation <- eval(parse(text = op_text))
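    # For instance, for the single-input, no-parameter workflow in the
    # examples above, 'op_text' would read (schematically):
    #   function(input1) {
    #     fun1 <- function(x) { ... }
    #     res <- fun1(input1)
    #   }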
operation <- Step(operation,
attr(workflow$fun, 'TargetDims'),
attr(workflow$fun, 'OutputDims'),
attr(workflow$fun, 'UseLibraries'),
attr(workflow$fun, 'UseAttributes'))
if (!all(sapply(workflow$inputs, class) == 'startR_cube')) {
stop("Workflows with only one step supported by now.")
}
    # Run ByChunks with the chosen operation
    if (!is.null(cluster)) {
      if (is.null(workflow_manager)) {
        stop("Specify parameter 'workflow_manager' as 'ecFlow' or 'Autosubmit'.")
      } else if (!tolower(workflow_manager) %in% c('ecflow', 'autosubmit')) {
        stop("Parameter 'workflow_manager' can only be 'ecFlow' or 'Autosubmit'.")
      }
    } else { # run locally
      workflow_manager <- 'ecflow'
    }
    if (tolower(workflow_manager) == 'ecflow') {
res <- ByChunks_ecflow(step_fun = operation,
cube_headers = workflow$inputs,
chunks = chunks,
threads_load = threads_load,
threads_compute = threads_compute,
cluster = cluster,
ecflow_suite_dir = ecflow_suite_dir,
ecflow_server = ecflow_server,
silent = silent, debug = debug, wait = wait)
} else {
res <- ByChunks_autosubmit(step_fun = operation,
cube_headers = workflow$inputs,
chunks = chunks,
threads_load = threads_load,
threads_compute = threads_compute,
cluster = cluster,
autosubmit_suite_dir = autosubmit_suite_dir,
autosubmit_server = autosubmit_server,
silent = silent, debug = debug, wait = wait)
    }
    return(res)
  }
}