Compute.R 9.5 KB
Newer Older
aho's avatar
aho committed
#'Specify the execution parameters and trigger the execution 
#'
#'The step of the startR workflow after the complete workflow is defined by 
#'AddStep(). This function specifies the execution parameters and triggers the
#'execution. The execution can be operated locally or on a remote machine. If 
#'it is the latter case, the configuration of the machine needs to be 
#'sepecified in the function, and the EC-Flow server is required to be 
aho's avatar
aho committed
#'installed.\cr\cr
aho's avatar
aho committed
#'The execution can be operated by chunks to avoid overloading the RAM memory.
#'After all the chunks are finished, Compute() will gather and merge them, and 
#'return a single data object, including one or multiple multidimensional data 
#'arrays and additional metadata.
#'
#'@param workflow A list of the class 'startR_workflow' returned by function 
#'  AddSteop() or of class 'startR_cube' returned by function Start(). It 
#'  contains all the objects needed for the execution.
#'@param chunks A named list of dimensions which to split the data along and 
#'  the number of chunks to make for each. The chunked dimension can only be 
#'  those not required as the target dimension in function Step(). The default
#'  value is 'auto', which lists all the non-target dimensions and each one has
#'  one chunk.
#'@param threads_load An integer indicating the number of execution threads to 
#'  use for the data retrieval stage. The default value is 1.
#'@param threads_compute An integer indicating the number of execution threads
#'  to use for the computation. The default value is 1.
#'@param cluster A list of components that define the configuration of the 
#'  machine to be run on. The comoponents vary from the different machines.
#'  Check \href{https://earth.bsc.es/gitlab/es/startR/-/blob/master/inst/doc/practical_guide.md}{Practical guide on GitLab} for more 
aho's avatar
aho committed
#'  details and examples. Only needed when the computation is not run locally. 
#'  The default value is NULL.
#'@param workflow_manager Can be NULL, 'ecFlow' or 'Autosubmit'. The default is 'ecFlow'.
aho's avatar
aho committed
#'@param ecflow_suite_dir A character string indicating the path to a folder in
#'  the local workstation where to store temporary files generated for the 
#'  automatic management of the workflow. Only needed when the execution is run
#'  remotely. The default value is NULL.
#'@param ecflow_server A named vector indicating the host and port of the 
#'  EC-Flow server. The vector form should be 
#'  \code{c(host = 'hostname', port = port_number)}. Only needed when the 
#'  execution is run#'  remotely. The default value is NULL.
#'@param silent A logical value deciding whether to print the computation 
#'  progress (FALSE) on the R session or not (TRUE). It only works when the 
#'  execution runs locally or the parameter 'wait' is TRUE. The default value
#'  is FALSE.
#'@param debug A logical value deciding whether to return detailed messages on 
#'  the progress and operations in a Compute() call (TRUE) or not (FALSE). 
#'  Automatically changed to FALSE if parameter 'silent' is TRUE. The default 
#'  value is FALSE.
#'@param wait A logical value deciding whether the R session waits for the 
#'  Compute() call to finish (TRUE) or not (FALSE). If FALSE, it will return an
#'  object with all the information of the startR execution that can be stored
#'  in your disk. After that, the R session can be closed and the results can
aho's avatar
aho committed
#'  be collected later with the Collect() function. The default value is TRUE.
aho's avatar
aho committed
#'
#'@return A list of data arrays for the output returned by the last step in the
aho's avatar
aho committed
#'  specified workflow (wait = TRUE), or an object with information about the 
#'  startR execution (wait = FALSE). The configuration details and profiling 
#'  information are attached as attributes to the returned list of arrays.
aho's avatar
aho committed
#'@examples
aho's avatar
aho committed
#'  data_path <- system.file('extdata', package = 'startR')
aho's avatar
aho committed
#'  path_obs <- file.path(data_path, 'obs/monthly_mean/$var$/$var$_$sdate$.nc')
#'  sdates <- c('200011', '200012')
aho's avatar
aho committed
#'  data <- Start(dat = list(list(path = path_obs)),
aho's avatar
aho committed
#'                var = 'tos',
#'                sdate = sdates,
#'                time = 'all',
#'                latitude = 'all',
#'                longitude = 'all',
#'                return_vars = list(latitude = 'dat',
#'                                   longitude = 'dat',
#'                                   time = 'sdate'),
#'                retrieve = FALSE)
#'  fun <- function(x) {
#'            lat = attributes(x)$Variables$dat1$latitude
#'            weight = sqrt(cos(lat * pi / 180))
#'            corrected = Apply(list(x), target_dims = "latitude",
#'                              fun = function(x) {x * weight})
#'          }
#'  step <- Step(fun = fun,
#'               target_dims = 'latitude',
#'               output_dims = 'latitude',
#'               use_libraries = c('multiApply'),
#'               use_attributes = list(data = "Variables"))
#'  wf <- AddStep(data, step)
#'  res <- Compute(wf, chunks = list(longitude = 4, sdate = 2))
#'
aho's avatar
aho committed
#'@importFrom methods is
aho's avatar
aho committed
#'@export
aho's avatar
aho committed
Compute <- function(workflow, chunks = 'auto', workflow_manager = 'ecFlow',
                    threads_load = 1, threads_compute = 1,
                    cluster = NULL, ecflow_suite_dir = NULL, ecflow_server = NULL,
                    autosubmit_suite_dir = NULL, autosubmit_server = NULL,
                    silent = FALSE, debug = FALSE, wait = TRUE) {
  # Check workflow
aho's avatar
aho committed
  if (!is(workflow, 'startR_cube') & !is(workflow, 'startR_workflow')) {
    stop("Parameter 'workflow' must be an object of class 'startR_cube' as ",
         "returned by Start or of class 'startR_workflow' as returned by ",
         "AddStep.")
  }
  
aho's avatar
aho committed
  if (is(workflow, 'startR_cube')) {
    #machine_free_ram <- 1000000000
    #max_ram_ratio <- 0.5
    #data_size <- prod(c(attr(workflow, 'Dimensions'), 8))
    #if (data_size > (machine_free_ram * max_ram_ratio)) {
    #  stop("It is not possible to fit the requested data (", data_size, 
    #       " bytes) into the maximum allowed free ram (", max_ram_ratio, 
    #       " x ", machine_free_ram, ").")
    #}
    eval(workflow)
  } else {
    # TODO:
    #explore tree of operations and identify set of operations that reduce dimensionality as much as possible
    #  while being able to fit in (cluster and to exploit number of available nodes) | (machine)
    #combine set of operations into a single function
    #Goal: to build manually a function following this pattern:
    #operation <- function(input1, input2) {
    #  fun1 <- workflow$fun
    #  fun1(input1, input2, names(workflow$params)[1] = workflow$params[[1]])
    #}
    op_text <- "function("
    op_text <- paste0(op_text,
                      paste(paste0('input', 1:length(workflow$inputs)), 
                            collapse = ', '))
    op_text <- paste0(op_text, ") {")
    op_text <- paste0(op_text, "\n  fun1 <- ", paste(deparse(workflow$fun), collapse = '\n'))
    op_text <- paste0(op_text, "\n  res <- fun1(", 
                      paste(paste0('input', 1:length(workflow$inputs)),
                            collapse = ", "))
    if (length(workflow$params) > 0) {
      for (j in 1:length(workflow$params)) {
        op_text <- paste0(op_text, ", ")
        op_text <- paste0(op_text, names(workflow$params)[j], " = ",
                          paste(deparse(workflow$params[[j]]), collapse = '\n'))
      }
    }
    op_text <- paste0(op_text, ")")
    op_text <- paste0(op_text, "\n}")
    operation <- eval(parse(text = op_text))
    operation <- Step(operation, 
                      attr(workflow$fun, 'TargetDims'), 
                      attr(workflow$fun, 'OutputDims'),
                      attr(workflow$fun, 'UseLibraries'),
                      attr(workflow$fun, 'UseAttributes'))
    
    if (!all(sapply(workflow$inputs, class) == 'startR_cube')) {
      stop("Workflows with only one step supported by now.")
    }
    # Run ByChunks with the chosen operation
aho's avatar
aho committed
    if (!is.null(cluster)) {
      if (is.null(workflow_manager)) {
        stop("Specify parameter 'workflow_manager' as 'ecFlow' or 'Autosubmit'.")
      } else if (!tolower(workflow_manager) %in% c('ecflow', 'autosubmit')) {
aho's avatar
aho committed
        stop("Parameter 'workflow_manager' can only be 'ecFlow' or 'Autosubmit'.")
      }
    } else { # run locally
      workflow_manager <- 'ecflow'
    if (tolower(workflow_manager) == 'ecflow') {
aho's avatar
aho committed
      # ecFlow or run locally
      res <- ByChunks_ecflow(step_fun = operation,
                             cube_headers = workflow$inputs,
                             chunks = chunks,
                             threads_load = threads_load,
                             threads_compute = threads_compute,
                             cluster = cluster,
                             ecflow_suite_dir = ecflow_suite_dir,
                             ecflow_server = ecflow_server,
                             silent = silent, debug = debug, wait = wait)
    } else {
      res <- ByChunks_autosubmit(step_fun = operation,
                      cube_headers = workflow$inputs,
                      chunks = chunks,
                      threads_load = threads_load,
                      threads_compute = threads_compute,
                      cluster = cluster,
                      autosubmit_suite_dir = autosubmit_suite_dir,
                      autosubmit_server = autosubmit_server,
                      silent = silent, debug = debug, wait = wait)

    } 
    # TODO: carry out remaining steps locally, using multiApply
    # Return results
    res
  }
}