Compute.R 3.3 KB
Newer Older
Compute <- function(workflow, chunks = 'auto',
                    threads_load = 1, threads_compute = 1, 
                    cluster = NULL, ecflow_suite_dir = NULL,
                    ecflow_server = NULL, silent = FALSE, debug = FALSE,
Nicolau Manubens's avatar
Nicolau Manubens committed
                    wait = TRUE) {
  # Check workflow
  if (!any(c('startR_header', 'startR_workflow') %in% class(workflow))) {
    stop("Parameter 'workflow' must be an object of class 'startR_header' as ",
         "returned by Start or of class 'startR_workflow' as returned by ",
         "AddStep.")
  }

  if ('startR_header' %in% class(workflow)) {
    #machine_free_ram <- 1000000000
    #max_ram_ratio <- 0.5
    #data_size <- prod(c(attr(workflow, 'Dimensions'), 8))
    #if (data_size > (machine_free_ram * max_ram_ratio)) {
    #  stop("It is not possible to fit the requested data (", data_size, 
    #       " bytes) into the maximum allowed free ram (", max_ram_ratio, 
    #       " x ", machine_free_ram, ").")
    #}
    eval(workflow)
Nicolau Manubens's avatar
Nicolau Manubens committed
  } else {
    # TODO:
    # - Explore the tree of operations and identify the set of operations that
    #   reduces dimensionality as much as possible while still fitting either
    #   in the cluster (exploiting the number of available nodes) or in the
    #   local machine.
    # - Combine that set of operations into a single function.
    # Goal: manually build a function following this pattern:
    #operation <- function(input1, input2) {
    #  fun1 <- workflow$fun
    #  fun1(input1, input2, names(workflow$params)[1] = workflow$params[[1]])
    #}
    op_text <- "function("
    op_text <- paste0(op_text,
                      paste(paste0('input', 1:length(workflow$inputs)), 
                            collapse = ', '))
    op_text <- paste0(op_text, ") {")
    op_text <- paste0(op_text, "\n  fun1 <- ", paste(deparse(workflow$fun), collapse = '\n'))
    op_text <- paste0(op_text, "\n  res <- fun1(", 
                      paste(paste0('input', 1:length(workflow$inputs)),
                            collapse = ", "))
Nicolau Manubens's avatar
Nicolau Manubens committed
    if (length(workflow$params) > 0) {
      for (j in 1:length(workflow$params)) {
Nicolau Manubens's avatar
Nicolau Manubens committed
        op_text <- paste0(op_text, ", ")
Nicolau Manubens's avatar
Nicolau Manubens committed
        op_text <- paste0(op_text, names(workflow$params)[j], " = ",
                          paste(deparse(workflow$params[[j]]), collapse = '\n'))
      }
    }
    op_text <- paste0(op_text, ")")
    op_text <- paste0(op_text, "\n}")
    operation <- eval(parse(text = op_text))
    operation <- Step(operation, 
                      attr(workflow$fun, 'TargetDims'), 
Nicolau Manubens's avatar
Nicolau Manubens committed
                      attr(workflow$fun, 'OutputDims'),
                      attr(workflow$fun, 'UseLibraries'),
                      attr(workflow$fun, 'UseAttributes'))
Nicolau Manubens's avatar
Nicolau Manubens committed
    if (!all(sapply(workflow$inputs, class) == 'startR_header')) {
      stop("Workflows with only one step supported by now.")
    }
    # Run ByChunks with the combined operation
    res <- ByChunks(step_fun = operation, 
Nicolau Manubens's avatar
Nicolau Manubens committed
                    cube_headers = workflow$inputs,
                    chunks = chunks, 
                    threads_load = threads_load,
                    threads_compute = threads_compute,
                    cluster = cluster, 
                    ecflow_suite_dir = ecflow_suite_dir,
                    ecflow_server = ecflow_server,
                    silent = silent, debug = debug, wait = wait)
    # TODO: carry out remaining steps locally, using multiApply
    # Return results
    res