ByChunks.R 18.1 KB
Newer Older
Nicolau Manubens's avatar
Nicolau Manubens committed
ByChunks <- function(fun, cube_headers, ..., chunks = 'auto',
Nicolau Manubens's avatar
Nicolau Manubens committed
                     ncores = 1, cluster = NULL, shared_dir = NULL, 
Nicolau Manubens's avatar
Nicolau Manubens committed
                     ecflow_host = NULL, silent = FALSE, debug = FALSE,
                     wait = TRUE) {
Nicolau Manubens's avatar
Nicolau Manubens committed
  library(multiApply, lib.loc = '~nmanuben/multiApply/multiApply.Rcheck')
  MergeArrays <- startR:::.MergeArrays

  # Check input headers
  if ('startR_header' %in% class(cube_headers)) {
    cube_headers <- list(cube_headers)
  }
  if (!all(sapply(lapply(cube_headers, class), 
                  function(x) 'startR_header' %in% x))) {
Nicolau Manubens's avatar
Nicolau Manubens committed
    stop("All objects passed in 'cube_headers' must be of class 'startR_header', ",
         "as returned by Start().")
  }

  # Check fun
  if (!is.function(fun)) {
    stop("Parameter 'fun' must be a function.")
  }

  # Check cores
  if (!is.numeric(ncores)) {
    stop("Parameter 'ncores' must be a numeric value.")
  }
  ncores <- round(ncores)

Nicolau Manubens's avatar
Nicolau Manubens committed
  on_cluster <- !is.null(cluster)
Nicolau Manubens's avatar
Nicolau Manubens committed
  default_cluster <- list(queue_host = NULL,
Nicolau Manubens's avatar
Nicolau Manubens committed
                          node_memory = NULL,
                          cores_per_job = 1,
                          job_wallclock = '01:00:00',
Nicolau Manubens's avatar
Nicolau Manubens committed
                          max_jobs = 6)
  if (!is.null(cluster)) {
    if (!is.list(cluster)) {
      stop("Parameter 'cluster' must be a named list.")
    }
    if (is.null(names(cluster))) {
      stop("Parameter 'cluster' must be a named list.")
    }
    if (any(!(names(cluster) %in% c('queue_host', 'node_memory', 'cores_per_job', 'job_wallclock', 'max_jobs')))) {
      stop("Found invalid component names in parameter 'cluster'.")
    }
    default_cluster[names(cluster)] <- cluster
  }
Nicolau Manubens's avatar
Nicolau Manubens committed
  cluster <- default_cluster
  if (on_cluster) {
    if (Sys.which('ecflow_client') == '') {
      stop("ecFlow must be installed in order to run the computation on clusters.")
    }
Nicolau Manubens's avatar
Nicolau Manubens committed
    if (is.null(cluster[['queue_host']])) {
      stop("The component 'queue_host' of the parameter 'cluster' must be specified.")
    }
Nicolau Manubens's avatar
Nicolau Manubens committed
  suite_id <- sample(10 ^ 10, 1)
  shared_dir_suite <- ''
  if (on_cluster) {
    if (is.null(shared_dir)) {
      stop("Parameter 'shared_dir' must be specified when a cluster is specified.")
    } else {
Nicolau Manubens's avatar
Nicolau Manubens committed
      shared_dir_suite <- paste0(shared_dir, '/STARTR_CHUNKING_', suite_id, '/')
      dir.create(shared_dir_suite, recursive = TRUE)
      if (!dir.exists(shared_dir_suite)) stop("Could not find or create the directory in ",
                                        "parameter 'shared_dir'.")
    }
  }

  # Check silent
  if (!is.logical(silent)) {
    stop("Parameter 'silent' must be logical.")
  }

  # Check debug
  if (!is.logical(debug)) {
    stop("Parameter 'debug' must be logical.")
  }
  if (silent) {
    debug <- FALSE
  }

Nicolau Manubens's avatar
Nicolau Manubens committed
  # Check wait
  if (!is.logical(wait)) {
    stop("Parameter 'wait' must be logical.")
  }

  # Work out chunked dimensions and target dimensions
  all_dims <- lapply(cube_headers, attr, 'Dimensions')
  all_dims_merged <- NULL
  for (i in all_dims) {
    if (is.null(all_dims_merged)) {
Nicolau Manubens's avatar
Nicolau Manubens committed
      all_dims_merged <- i
    } else {
Nicolau Manubens's avatar
Nicolau Manubens committed
      all_dims_merged <- startR:::.MergeArrayDims(all_dims_merged, i)[[1]]
    }
  }
  all_dimnames <- names(all_dims_merged)

  target_dims_indices <- which(all_dimnames %in% unlist(attr(fun, 'target_dims')))
  target_dims <- NULL
  if (length(target_dims_indices) > 0) {
    target_dims <- all_dimnames[target_dims_indices]
  }

  chunked_dims <- all_dimnames
  if (length(target_dims_indices) > 0) {
    chunked_dims <- chunked_dims[-target_dims_indices]
  }
  if (length(chunked_dims) < 1) {
    stop("Not possible to process input by chunks. All input dimensions are ",
         "target dimensions.")
  }

  if (length(cube_headers) != length(attr(fun, 'target_dims'))) {
    stop("Number of inputs in parameter 'cube_headers' must be equal to the ",
         "number of inputs expected by the function 'fun'.")
  }
  # Check all input headers have matching dimensions
  cube_index <- 1
  for (cube_header in cube_headers) {
Nicolau Manubens's avatar
Nicolau Manubens committed
    if (!all(attr(cube_header, 'Dimensions') == unlist(all_dims)[names(attr(cube_header, 'Dimensions'))])) {
      stop("All provided 'cube_headers' must have matching dimension lengths ",
           "with each other.")
    }
Nicolau Manubens's avatar
Nicolau Manubens committed
    if (!all(attr(fun, 'target_dims')[[cube_index]] %in% names(attr(cube_header, 'Dimensions')))) {
      stop("All provided 'cube_headers' must contain at least the target dimensions ",
           "expected by 'fun'.")
    }
    cube_index <- cube_index + 1
    # work out expected result dimensions
  }

  # Check chunks
  default_chunks <- as.list(rep(1, length(chunked_dims)))
  names(default_chunks) <- chunked_dims
Nicolau Manubens's avatar
Nicolau Manubens committed
  if (length(chunks) == 1 && chunks == 'auto') {
    chunks <- default_chunks
  }
  if (!is.list(chunks)) {
    stop("Parameter 'chunks' must be a named list or 'auto'.")
  }
  if (is.null(names(chunks))) {
    stop("Parameter 'chunks' must be a named list or 'auto'.")
  }
Nicolau Manubens's avatar
Nicolau Manubens committed
  if (any(!(names(chunks) %in% chunked_dims))) {
    stop("All names in parameter 'chunks' must be one of the non-target dimensions ",
         "present in the cubes in 'cube_headers'. The target dimensions are ", 
         paste(paste0("'", target_dims, "'"), collapse = ', '), ". The non-target ",
Nicolau Manubens's avatar
Nicolau Manubens committed
         "dimensions (margins) are ", paste(paste0("'", chunked_dims, "'"), collapse = ', '), ".")
  }
  if (any(!(((unlist(chunks) %% 1) == 0) | (unlist(chunks) == 'all')))) {
    stop("All values in parameter 'chunks' must take a numeric value or 'all'.")
  }
  if (any(unlist(chunks) < 1)) {
    stop("All values in parameter 'chunks' must be >= 1.")
  }
  for (chunk_spec in 1:length(chunks)) {
    if (chunks[[chunk_spec]] > unlist(all_dims)[names(chunks)[chunk_spec]]) {
      stop("Too many chunks requested for the dimension ", names(chunks)[chunk_spec], 
           ". Maximum allowed is ", unlist(all_dims)[names(chunks)[chunk_spec]])
    }
  }
  default_chunks[names(chunks)] <- chunks
  chunks <- default_chunks

  # Check fun
  if (!('startR_step' %in% class(fun))) {
    stop("Parameter 'fun' must be of the class 'startR_step', as returned ",
         "by the function Step.")
  }

  # Replace 'all's
  chunks_all <- which(unlist(chunks) == 'all')
  if (length(chunks_all) > 0) {
    chunks[chunks_all] <- all_dims[names(chunks)[chunks_all]] 
  }
  # Mount the ecFlow suite
  if (on_cluster) {
    # Copy load_process_save_chunk.R into shared folder
    chunk_script <- file(system.file('chunking/load_process_save_chunk.R', 
                                     package = 'startR'))
    chunk_script_lines <- readLines(chunk_script)
Nicolau Manubens's avatar
Nicolau Manubens committed
    close(chunk_script)
    chunk_script_lines <- gsub('^shared_dir <- *', paste0('shared_dir <- ', 
Nicolau Manubens's avatar
Nicolau Manubens committed
                               paste(deparse(shared_dir_suite), collapse = '\n')), chunk_script_lines)
    chunk_script_lines <- gsub('^debug <- *', paste0('debug <- ', paste(deparse(debug), collapse = '\n')), 
                               chunk_script_lines)
Nicolau Manubens's avatar
Nicolau Manubens committed
    deparsed_calls <- paste0('start_calls <- list(')
    for (cube_header in 1:length(cube_headers)) {
      deparsed_calls <- paste0(deparsed_calls, '\nquote(',
        paste(deparse(cube_headers[[cube_header]]), collapse = '\n'), 
        ')')
      if (cube_header < length(cube_headers)) {
        deparsed_calls <- paste0(deparsed_calls, ', ')
      }
    }
    deparsed_calls <- paste0(deparsed_calls, '\n)')
    chunk_script_lines <- gsub('^start_calls <- *', deparsed_calls, chunk_script_lines)
Nicolau Manubens's avatar
Nicolau Manubens committed
    chunk_script_lines <- gsub('^start_calls_dims <- *', paste0('start_calls_dims <- ', paste(deparse(lapply(cube_headers, attr, 'Dimensions')), collapse = '\n')), 
                               chunk_script_lines)
    chunk_script_lines <- gsub('^param_dimnames <- *', paste0('param_dimnames <- ', paste(deparse(chunked_dims), collapse = '\n')), 
                               chunk_script_lines)
    chunk_script_lines <- gsub('^cores_per_job <- *', paste0('cores_per_job <- ', cluster[['cores_per_job']]), 
                               chunk_script_lines)
    chunk_script_lines <- gsub('^fun <- *', paste0('fun <- ', paste(deparse(fun), collapse = '\n')), 
                               chunk_script_lines)
    chunk_script_lines <- gsub('^params <- *', paste0('params <- ', paste(deparse(list(...)), collapse = '\n')), 
                               chunk_script_lines)
Nicolau Manubens's avatar
Nicolau Manubens committed
    writeLines(chunk_script_lines, paste0(shared_dir_suite, '/load_process_save_chunk.R'))
  
    # Copy Chunk.ecf into shared folder
    chunk_ecf_script <- file(system.file('chunking/Chunk.ecf',
                                         package = 'startR'))
    chunk_ecf_script_lines <- readLines(chunk_ecf_script)
Nicolau Manubens's avatar
Nicolau Manubens committed
    close(chunk_ecf_script)
    ecf_vars <- paste(paste0('%', as.vector(sapply(chunked_dims,
      function(x) {
        c(toupper(x), paste0(toupper(x), '_N'))
      })), '%'), collapse = ' ')
    chunk_ecf_script_lines <- gsub('^Rscript load_process_save_chunk.R --args *', 
                                   paste0('Rscript load_process_save_chunk.R --args ', ecf_vars),
                                   chunk_ecf_script_lines)
Nicolau Manubens's avatar
Nicolau Manubens committed
    writeLines(chunk_ecf_script_lines, paste0(shared_dir_suite, '/Chunk.ecf'))

    # Copy merge_chunks.R into tmp folder
Nicolau Manubens's avatar
Nicolau Manubens committed
#    merge_script <- file(system.file('chunking/merge_chunks.R',
#                                     package = 'startR'))
#    merge_script_lines <- readLines(merge_script)
#    close(merge_script)
#    merge_script_lines <- gsub('^shared_dir <- *', paste0('shared_dir <- ', 
#                               paste(deparse(shared_dir_suite), collapse = '\n')), merge_script_lines)
#    writeLines(merge_script_lines, paste0(shared_dir_suite, '/merge_chunks.R'))
  
    # Copy Merge.ecf into tmp folder
    #TODO: Modify chain of parameters sent to r script when merging 
    #chunks progressively
Nicolau Manubens's avatar
Nicolau Manubens committed
#    merge_ecf_script <- file(system.file('chunking/Merge.ecf',
#                                         package = 'startR'))
#    merge_ecf_script_lines <- readLines(merge_ecf_script)
#    close(merge_ecf_script)
#    writeLines(merge_ecf_script_lines, paste0(shared_dir_suite, '/Merge.ecf')) 

    # Copy headers
    file.copy(system.file('chunking/head.h', package = 'startR'),
Nicolau Manubens's avatar
Nicolau Manubens committed
              shared_dir_suite)
    file.copy(system.file('chunking/tail.h', package = 'startR'),
Nicolau Manubens's avatar
Nicolau Manubens committed
              shared_dir_suite)
    file.copy(system.file('chunking/slurm.h', package = 'startR'),
Nicolau Manubens's avatar
Nicolau Manubens committed
              shared_dir_suite)
  }

  add_line <- function(suite, line, tabs) {
    c(suite, paste0(paste(rep(' ', tabs), collapse = ''), line))
  }
  suite <- NULL
  tabs <- 0
Nicolau Manubens's avatar
Nicolau Manubens committed
  suite <- add_line(suite, paste0('suite STARTR_CHUNKING_', suite_id), tabs)
  tabs <- tabs + 2
Nicolau Manubens's avatar
Nicolau Manubens committed
  suite <- add_line(suite, paste0("edit ECF_HOME '", shared_dir_suite, "'"), tabs)
  suite <- add_line(suite, paste0("edit ECF_FILES '", shared_dir_suite, "'"), tabs)
  suite <- add_line(suite, paste0("edit ECF_INCLUDE '", shared_dir_suite, "'"), tabs)
  suite <- add_line(suite, paste0("edit CORES_PER_JOB ", cluster[['cores_per_job']], ""), tabs)
  suite <- add_line(suite, paste0("edit JOB_WALLCLOCK '", cluster[['job_wallclock']], "'"), tabs)
  suite <- add_line(suite, paste0("limit max_jobs ", cluster[['max_jobs']]), tabs)
  suite <- add_line(suite, paste0("inlimit max_jobs"), tabs)
  suite <- add_line(suite, paste0("edit HOST '", cluster[['queue_host']], "'"), tabs)
  suite <- add_line(suite, "edit ECF_JOB_CMD 'ssh %HOST% 'sbatch %ECF_JOB% > %ECF_JOBOUT% 2>&1 &''", tabs)
  suite <- add_line(suite, "family computation", tabs)
  tabs <- tabs + 2

  if (!silent) {
    startR:::.message(paste0("Processing chunks... ",
                             "remaining time estimate soon..."))
  }
  time_before_first_chunk <- Sys.time()
  time_after_first_chunk <- NULL

  # Empty list to collect results
  result <- vector('list', length(attr(fun, 'output_dims')))
Nicolau Manubens's avatar
Nicolau Manubens committed
  names(result) <- names(attr(fun, 'output_dims'))

  # Open nested ecFlow families
Nicolau Manubens's avatar
Nicolau Manubens committed
  for (i in length(chunked_dims):1) {
Nicolau Manubens's avatar
Nicolau Manubens committed
    suite <- add_line(suite, paste0('family ', chunked_dims[i], '_CHUNK_', 1), tabs)
    tabs <- tabs + 2
Nicolau Manubens's avatar
Nicolau Manubens committed
    suite <- add_line(suite, paste0('edit ', toupper(chunked_dims[i]), ' ', 1), tabs)
    suite <- add_line(suite, paste0('edit ', toupper(chunked_dims[i]), '_N ', chunks[[chunked_dims[i]]]), tabs)
Nicolau Manubens's avatar
Nicolau Manubens committed

  # Iterate through chunks
Nicolau Manubens's avatar
Nicolau Manubens committed
  chunk_array <- array(1:prod(unlist(chunks)), dim = (unlist(chunks)))
  array_of_results <- vector('list', prod((unlist(chunks))))
  dim(array_of_results) <- (unlist(chunks))
  previous_chunk_indices <- rep(1, length(chunks))
  for (i in 1:length(chunk_array)) {
Nicolau Manubens's avatar
Nicolau Manubens committed
    chunk_indices <- which(chunk_array == i, arr.ind = TRUE)[1, ]
    names(chunk_indices) <- names(dim(chunk_array))
    # ADD CHUNK SCRIPT TO SUITE
Nicolau Manubens's avatar
Nicolau Manubens committed
    families_to_jump <- which(chunk_indices != previous_chunk_indices)
    if (length(families_to_jump) > 0) {
      families_to_jump <- max(families_to_jump)
      # Close ecFlow families
      for (j in 1:families_to_jump) {
        tabs <- tabs - 2
        suite <- add_line(suite, paste0('endfamily'), tabs)
      }
      # Open ecFlow families
      for (j in families_to_jump:1) {
Nicolau Manubens's avatar
Nicolau Manubens committed
        suite <- add_line(suite, paste0('family ', (chunked_dims)[j], '_CHUNK_', chunk_indices[j]), tabs)
Nicolau Manubens's avatar
Nicolau Manubens committed
        tabs <- tabs + 2
        suite <- add_line(suite, paste0('edit ', toupper((chunked_dims)[j]), ' ', chunk_indices[j]), tabs)
        suite <- add_line(suite, paste0('edit ', toupper((chunked_dims)[j]), '_N ', chunks[[(chunked_dims)[j]]]), tabs)
      }
    }
    suite <- add_line(suite, "task Chunk", tabs)

    if (!on_cluster) {
      if (!silent) {
        startR:::.message(paste("Loading chunk", i, 
                                "out of", length(chunk_array), "..."))
      }
      data <- vector('list', length(cube_headers))
      for (input in 1:length(data)) {
        start_call <- cube_headers[[input]]
Nicolau Manubens's avatar
Nicolau Manubens committed
        dims_to_alter <- which(names(attr(start_call, 'Dimensions')) %in% names(chunks))
        if (length(dims_to_alter) > 0) {
          for (call_dim in names(attr(start_call, 'Dimensions'))[dims_to_alter]) {
            start_call[[call_dim]] <- chunk(chunk_indices[call_dim], chunks[[call_dim]], 
                                            eval(start_call[[call_dim]]))
          }
Nicolau Manubens's avatar
Nicolau Manubens committed
        start_call[['silent']] <- !debug
        data[[input]] <- eval(start_call)
      }
      if (!silent) {
        startR:::.message(paste("Processing..."))
      }
Nicolau Manubens's avatar
Nicolau Manubens committed
      array_of_results[[i]] <- multiApply::Apply(data, 
                                                 target_dims = attr(fun, 'target_dims'), 
                                                 AtomicFun = deparse(substitute(fun)), ..., 
                                                 output_dims = attr(fun, 'output_dims'),
Nicolau Manubens's avatar
Nicolau Manubens committed
                                                 parallel = ncores > 1,
                                                 ncores = ncores)
Nicolau Manubens's avatar
Nicolau Manubens committed
    # Time estimate
    if (!on_cluster) {
      if (is.null(time_after_first_chunk)) {
        time_after_first_chunk <- Sys.time()
        if (!silent) {
          estimate <- (time_after_first_chunk - 
                       time_before_first_chunk) *
                       (length(chunk_array) - 1)
          units(estimate) <- 'mins'
          startR:::.message(
            paste0("Remaining time estimate (at ", format(time_after_first_chunk), ") ", 
                   "(neglecting merge time): ", format(estimate))
          )
Nicolau Manubens's avatar
Nicolau Manubens committed
    previous_chunk_indices <- chunk_indices
  }

  # Close nested ecFlow families
  for (i in length(chunked_dims):1) {
    tabs <- tabs - 2
    suite <- add_line(suite, paste0('endfamily'), tabs)
  }

  # Close the ecFlow suite
Nicolau Manubens's avatar
Nicolau Manubens committed
  tabs <- tabs - 2
  suite <- add_line(suite, paste0('endfamily'), tabs)
Nicolau Manubens's avatar
Nicolau Manubens committed
#  suite <- add_line(suite, "family merge", tabs)
#  tabs <- tabs + 2
#  suite <- add_line(suite, "trigger computation == complete", tabs)
#  suite <- add_line(suite, "edit ECF_JOB_CMD 'bash %ECF_JOB% > %ECF_JOBOUT% 2>&1 &'", tabs)
#  suite <- add_line(suite, "task Merge", tabs)
#  tabs <- tabs - 2
#  suite <- add_line(suite, paste0('endfamily'), tabs)

  tabs <- tabs - 2
  suite <- add_line(suite, "endsuite", tabs)

  # Run ecFlow suite if needed
  if (on_cluster) {
Nicolau Manubens's avatar
Nicolau Manubens committed
    suite_file <- paste0(shared_dir_suite, '/startR_chunking.def')
    suite_file_o <- file(suite_file)
    writeLines(suite, suite_file_o)
    close(suite_file_o)
Nicolau Manubens's avatar
Nicolau Manubens committed

Nicolau Manubens's avatar
Nicolau Manubens committed
    default_ecflow_host <- list(name = 'localhost', port = '5678')
Nicolau Manubens's avatar
Nicolau Manubens committed
    if (is.null(ecflow_host)) {
      .warning("Parameter 'ecflow_host' has not been specified but execution on ",
               "cluster has been requested. An ecFlow server instance will ",
               "be created on localhost:5678.")
    } else {
      default_ecflow_host[names(ecflow_host)] <- ecflow_host
    }
    ecflow_host <- default_ecflow_host
    system(paste0("ecflow_start.sh -p ", ecflow_host[['port']]))
    system(paste0("ecflow_client --load=", suite_file, " --host=",
                  ecflow_host[['name']], " --port=", ecflow_host[['port']]))
    system(paste0("ecflow_client --begin=STARTR_CHUNKING_", suite_id, 
                  " --host=", ecflow_host[['name']], " --port=", 
                  ecflow_host[['port']]))
Nicolau Manubens's avatar
Nicolau Manubens committed
     
    startr_exec <- list(cluster = cluster, ecflow_host = ecflow_host, 
                        shared_dir = shared_dir, suite_id = suite_id)
    class(startr_exec) <- 'startR_exec'
    if (wait) {
      if (!silent) {
        while (is.null(time_after_first_chunk)) {
          if (any(grepl('.*\\.Rds$', list.files(shared_dir_suite)))) {
            time_after_first_chunk <- Sys.time()
            estimate <- (time_after_first_chunk - 
                         time_before_first_chunk) * 
                        ceiling((prod(unlist(chunks)) - cluster[['max_jobs']]) / 
                                cluster[['max_jobs']])
            units(estimate) <- 'mins'
            startR:::.message(
              paste0('Remaining time estimate (neglecting queue and ',
                     'merge time) (at ', format(time_after_first_chunk),
                     '): ', format(estimate))
            )
          }
Nicolau Manubens's avatar
Nicolau Manubens committed
      result <- Collect(startr_exec, wait = TRUE)
      startR:::.message("Computation ended successfully.")
      result
    } else {
      startr_exec
Nicolau Manubens's avatar
Nicolau Manubens committed
  } else {
    array_of_results <- startR:::.MergeArrayOfArrays(array_of_results)
    startR:::.message("Computation ended successfully.")
    array_of_results
Nicolau Manubens's avatar
Nicolau Manubens committed
  #TODO: check result dimensions match expected dimensions