# Collect.R
# Collect the merged result of a chunked computation tracked as an ecFlow suite.
#
# Arguments:
#   startr_exec: object of class 'startR_exec', as returned by
#                ByChunks(..., wait = FALSE). The elements 'ecflow_host'
#                (with entries 'name' and 'port'), 'suite_id' and
#                'shared_dir' are read from it.
#   wait:        if TRUE, keep polling the ecFlow server until the suite
#                STARTR_CHUNKING_<suite_id> is reported as complete. If FALSE,
#                poll once and raise an error when the suite is not complete.
#   remove:      if TRUE, a warning is issued, the flag is forwarded to
#                .MergeChunks(), and afterwards the suite is deleted from the
#                ecFlow server and its directory under 'shared_dir' is removed.
#
# Returns:
#   Whatever startR:::.MergeChunks(shared_dir, suite_id, remove) returns.
#
# Errors:
#   - 'startr_exec' is not of class 'startR_exec'.
#   - wait = FALSE and the suite is not (yet) reported as complete.
Collect <- function(startr_exec, wait = TRUE, remove = TRUE) {
  if (!inherits(startr_exec, 'startR_exec')) {
    stop("Parameter 'startr_exec' must be an object of the class ",
         "'startR_exec', as returned by ByChunks(..., wait = FALSE).")
  }
  ecflow_host <- startr_exec[['ecflow_host']]
  suite_id <- startr_exec[['suite_id']]
  shared_dir <- startr_exec[['shared_dir']]

  # Pieces shared by every ecflow_client invocation below; built once instead
  # of on each polling iteration.
  suite_name <- paste0("STARTR_CHUNKING_", suite_id)
  server_args <- paste0(" --host=", ecflow_host[['name']],
                        " --port=", ecflow_host[['port']])
  state_command <- paste0("ecflow_client --get_state=", suite_name,
                          server_args)
  complete_pattern <- paste0("suite ", suite_name, " #.* state:complete")

  # Poll the server until the suite is complete. The pause before each query
  # grows as sqrt(attempt) and is capped at 5 seconds.
  done <- FALSE
  attempt <- 1
  while (!done) {
    Sys.sleep(min(sqrt(attempt), 5))
    status <- system(state_command, intern = TRUE)
    if (any(grepl(complete_pattern, status))) {
      done <- TRUE
    } else if (!wait) {
      stop("Computation in progress...")
    }
    attempt <- attempt + 1
  }

  if (remove) {
    # .warning() is not defined in this function; presumably a package-internal
    # helper -- verify it is in scope where this file is loaded.
    .warning("ATTENTION: The source chunks will be removed from the ",
             "system. Store the result after Collect() ends if needed.")
  }
  result <- startR:::.MergeChunks(shared_dir, suite_id, remove)

  if (remove) {
    # Drop the suite from the ecFlow server, then its directory on disk.
    system(paste0("ecflow_client --delete=force yes /", suite_name,
                  server_args))
    unlink(file.path(shared_dir, suite_name), recursive = TRUE)
  }

  # Disabled steps retained from the original source:
  #system("ecflow_client --shutdown --port=5678")
  #system("ecflow_stop.sh -p 5678")
  #result <- readRDS(paste0(shared_dir, '/result.Rds'))
  #file.remove(paste0(shared_dir, '/result.Rds'))
  result
}