Collect.R 3.14 KB
Newer Older
Collect <- function(startr_exec, wait = TRUE, remove = TRUE) {
Nicolau Manubens's avatar
Nicolau Manubens committed
  if (!('startR_exec' %in% class(startr_exec))) {
    stop("Parameter 'startr_exec' must be an object of the class ",
Nicolau Manubens's avatar
Nicolau Manubens committed
         "'startR_exec', as returned by Collect(..., wait = FALSE).")
Nicolau Manubens's avatar
Nicolau Manubens committed
  }
  if (Sys.which('ecflow_client') == '') {
    stop("ecFlow must be installed in order to collect results from a ",
         "Compute() execution.")
  }
Nicolau Manubens's avatar
Nicolau Manubens committed
  cluster <- startr_exec[['cluster']]
  ecflow_server <- startr_exec[['ecflow_server']]
Nicolau Manubens's avatar
Nicolau Manubens committed
  suite_id <- startr_exec[['suite_id']]
  chunks <- startr_exec[['chunks']]
  ecflow_suite_dir <- startr_exec[['ecflow_suite_dir']]
Nicolau Manubens's avatar
Nicolau Manubens committed
  done <- FALSE
  attempt <- 1
  while (!done) {
    failed <- FALSE
    if (cluster[['bidirectional']]) {
      Sys.sleep(min(sqrt(attempt), 5))
    } else {
      Sys.sleep(cluster[['polling_period']])
      remote_ecflow_suite_dir_suite <- paste0(cluster[['temp_dir']], '/STARTR_CHUNKING_', 
                                              suite_id, '/')
      rsync_output <- tryCatch({
        system(paste0("rsync -rav ", cluster[['queue_host']], ":", 
                      remote_ecflow_suite_dir_suite, "/*.Rds ", 
                      ecflow_suite_dir, "/"), intern = TRUE)
      }, error = function(e) {
        message("Warning: rsync from remote server to collect results failed. ",
                "Retrying soon.")
        failed <- TRUE
      })
    }
    if (!failed) {
      if (cluster[['bidirectional']]) {
        status <- system(paste0("ecflow_client --get_state=STARTR_CHUNKING_",
                         suite_id, " --host=",
                         ecflow_server[['host']], " --port=", ecflow_server[['port']]),
                         intern = TRUE)
        if (any(grepl(paste0("suite STARTR_CHUNKING_", suite_id, " #.* state:complete"), status))) {
          done <- TRUE
        } else if (!wait) {
          stop("Computation in progress...")
        }
      } else {
        received_chunks <- grepl('Rds$', list.files(ecflow_suite_dir))
        if (received_chunks == prod(unlist(chunks))) {
          done <- TRUE
        } else if (!wait) {
          stop("Computation in progress...")
        }
      }
Nicolau Manubens's avatar
Nicolau Manubens committed
    }
    attempt <- attempt + 1
  }
  if (!is.null(cluster[['temp_dir']])) {
    ecflow_suite_dir_suite <- paste0(cluster[['temp_dir']], '/STARTR_CHUNKING_', suite_id)
    system(paste0('ssh ', cluster[['queue_host']], ' "rm -rf ', 
                  ecflow_suite_dir_suite, '"'))
  }
Nicolau Manubens's avatar
Nicolau Manubens committed
  if (remove) {
    .warning("ATTENTION: The source chunks will be removed from the ",
             "system. Store the result after Collect() ends if needed.")
  }
  result <- startR:::.MergeChunks(ecflow_suite_dir, suite_id, remove)
Nicolau Manubens's avatar
Nicolau Manubens committed
  if (remove) {
    system(paste0("ecflow_client --delete=force yes /STARTR_CHUNKING_",
                  suite_id, " --host=", ecflow_server[['host']],
                  " --port=", ecflow_server[['port']]))
Nicolau Manubens's avatar
Nicolau Manubens committed
    unlink(paste0(ecflow_suite_dir, '/STARTR_CHUNKING_', suite_id), 
Nicolau Manubens's avatar
Nicolau Manubens committed
           recursive = TRUE)
  }
Nicolau Manubens's avatar
Nicolau Manubens committed
  #system("ecflow_client --shutdown --port=5678")
  #system("ecflow_stop.sh -p 5678")
  #result <- readRDS(paste0(ecflow_output_dir, '/result.Rds'))
  #file.remove(paste0(ecflow_output_dir, '/result.Rds'))
Nicolau Manubens's avatar
Nicolau Manubens committed
  result
}