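# Collect() gathers the results of a chunked Compute() call that was launched
# with wait = FALSE. On bidirectional clusters it queries the ecFlow server
# directly; otherwise it pulls the result files back with rsync and updates
# the ecFlow task states accordingly, before merging the chunks.
#
# Illustrative usage sketch (the workflow, chunking and cluster objects below
# are placeholders, not defined in this file):
#   res <- Compute(wf, chunks = list(latitude = 2, longitude = 2),
#                  cluster = cluster_config, ecflow_suite_dir = suite_dir,
#                  wait = FALSE)
#   result <- Collect(res, wait = TRUE)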
Collect <- function(startr_exec, wait = TRUE, remove = TRUE) {
  if (!('startR_exec' %in% class(startr_exec))) {
    stop("Parameter 'startr_exec' must be an object of the class ",
         "'startR_exec', as returned by Collect(..., wait = FALSE).")
  }
  if (Sys.which('ecflow_client') == '') {
    stop("ecFlow must be installed in order to collect results from a ",
         "Compute() execution.")
  }
  cluster <- startr_exec[['cluster']]
  ecflow_server <- startr_exec[['ecflow_server']]
  suite_id <- startr_exec[['suite_id']]
  chunks <- startr_exec[['chunks']]
  ecflow_suite_dir <- startr_exec[['ecflow_suite_dir']]
  timings <- startr_exec[['timings']]
  ecflow_suite_dir_suite <- paste0(ecflow_suite_dir, '/STARTR_CHUNKING_', 
                                   suite_id, '/')
  if (!is.null(cluster[['temp_dir']])) {
    remote_ecflow_suite_dir_suite <- paste0(cluster[['temp_dir']], 
                                            '/STARTR_CHUNKING_', 
                                            suite_id, '/')
  }
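  # Helper: map a received result file name (chunk indices are encoded
  # between '__' separators) back to the full path of the corresponding
  # ecFlow task in the STARTR_CHUNKING suite.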
  find_task_name <- function(received_file) {
    file_name <- received_file
    parts <- strsplit(file_name, '__')[[1]]
    parts <- parts[c(2:(length(parts) - 1))]
    chunk_indices <- rev(sapply(parts, function(x) {
      as.numeric(strsplit(x, '_')[[1]][2])
    }))
    task_pattern <- paste(paste0('*_', chunk_indices, '/'), 
                          collapse = '')
    task_glob <- paste0(ecflow_suite_dir_suite, '/*/*/', 
                        task_pattern)
    task_path <- Sys.glob(task_glob)
    if (length(task_path) != 1) {
      stop("Unexpected error while receiving results.")
    }
    task_name <- strsplit(task_path, 'computation')[[1]][2]
    task_name <- paste0('/STARTR_CHUNKING_', suite_id, 
                        '/computation', task_name)
    task_name
  }
  done <- FALSE
  attempt <- 1
  sum_received_chunks <- 0
  if (cluster[['bidirectional']]) {
    t_transfer_back <- NA
  } else {
    t_transfer_back <- 0
  }
  time_before_first_chunk <- startr_exec[['t_begin_first_chunk']]
  first_chunk_received <- FALSE
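  # rsync filter: only pull result (.Rds), timing (.timings) and status
  # marker (.crashed, .running) files back from the remote suite directory.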
  rsync_petition_file_lines <- c('+ *.Rds', '+ *.timings', '+ *.crashed', 
                                 '+ *.running', '- *')
  rsync_petition_file <- tempfile()
  writeLines(rsync_petition_file_lines, rsync_petition_file)
  Sys.sleep(2)
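  # Poll until all chunks are complete. On bidirectional clusters the ecFlow
  # server is queried directly; otherwise new files are rsync-ed back and the
  # matching ecFlow tasks are forced to the corresponding state.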
  while (!done) {
    failed <- FALSE
    if (cluster[['bidirectional']]) {
      status <- system(paste0("ecflow_client --get_state=STARTR_CHUNKING_",
                       suite_id, " --host=",
                       ecflow_server[['host']], " --port=", ecflow_server[['port']]),
                       intern = TRUE)
      if (any(grepl(paste0("suite STARTR_CHUNKING_", suite_id, " #.* state:complete"), status))) {
        done <- TRUE
      } else if (!wait) {
        stop("Computation in progress...")
      }
      if (!first_chunk_received) {
        if (any(grepl('state:complete', status))) {
          if (!is.null(time_before_first_chunk)) {
            time_after_first_chunk <- Sys.time()
            estimate <- (time_after_first_chunk -
                         time_before_first_chunk) *
                        ceiling((prod(unlist(chunks)) - cluster[['max_jobs']]) /
                                cluster[['max_jobs']])
            units(estimate) <- 'mins'
            startR:::.message(
              paste0('Remaining time estimate (neglecting queue and ',
                     'merge time) (at ', format(time_after_first_chunk),
                     '): ', format(estimate), ' (',
                     format(time_after_first_chunk -
                           time_before_first_chunk), ' per chunk)')
            )
          }
          first_chunk_received <- TRUE
        }
      }
      Sys.sleep(min(sqrt(attempt), 5))
    } else {
      #if (sum_received_chunks == 0) {
      #  # Accounting for the fist chunk received in ByChunks and
      #  # setting it to complete
      #  # ByChunks needs the first chunk to calculate remaining time
      #  received_files <- list.files(ecflow_suite_dir_suite)
      #  received_chunks <- received_files[grepl('Rds$', 
      #                                          received_files)]
      #}
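      # Pull any new result and status files from the remote suite directory.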
      t_begin_transfer_back <- Sys.time()
      rsync_output <- tryCatch({
        system(paste0("rsync -rav --include-from=", rsync_petition_file, " '", 
                      cluster[['queue_host']], ":", remote_ecflow_suite_dir_suite, "' ",  
                      ecflow_suite_dir_suite, "/"), intern = TRUE)
      }, error = function(e) {
        message("Warning: rsync from remote server to collect results failed. ",
                "Retrying soon.")
        failed <<- TRUE
      })
      t_end_transfer_back <- Sys.time()
      t_transfer_back <- t_transfer_back + as.numeric(difftime(t_end_transfer_back, 
                                                      t_begin_transfer_back, units = 'secs'))
      if (!failed) {
        #if (sum_received_chunks == 0) {
        #  rsync_output <- c(rsync_output, received_chunks)
        #}
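        # Reflect the received marker files in the ecFlow suite:
        # '.running' -> active, '.crashed' -> aborted, '.Rds' -> complete.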
        received_running <- grepl('running$', rsync_output)
        for (received_chunk_index in which(received_running)) {
          file_name <- rsync_output[received_chunk_index]
          task_name <- find_task_name(file_name)
          system(paste0('ecflow_client --force=active recursive ',
                        task_name, 
                        " --host=", ecflow_server[['host']],
                        " --port=", ecflow_server[['port']]))
        }
        received_crashed <- grepl('crashed$', rsync_output)
        for (received_chunk_index in which(received_crashed)) {
          file_name <- rsync_output[received_chunk_index]
          task_name <- find_task_name(file_name)
          system(paste0('ecflow_client --force=aborted recursive ',
                        task_name, 
                        " --host=", ecflow_server[['host']],
                        " --port=", ecflow_server[['port']]))
        }
        received_chunks <- grepl('Rds$', rsync_output)
        for (received_chunk_index in which(received_chunks)) {
          file_name <- rsync_output[received_chunk_index]
          task_name <- find_task_name(file_name)
          system(paste0('ecflow_client --force=complete recursive ',
                        task_name, 
                        " --host=", ecflow_server[['host']],
                        " --port=", ecflow_server[['port']]))
          sum_received_chunks <- sum_received_chunks + 1
          if (!first_chunk_received) {
            if (!is.null(time_before_first_chunk)) {
              time_after_first_chunk <- Sys.time()
              estimate <- (time_after_first_chunk -
                           time_before_first_chunk) *
                          ceiling((prod(unlist(chunks)) - cluster[['max_jobs']]) /
                                  cluster[['max_jobs']])
              units(estimate) <- 'mins'
              startR:::.message(
                paste0('Remaining time estimate (neglecting queue and ',
                       'merge time) (at ', format(time_after_first_chunk),
                       '): ', format(estimate), ' (', 
                       format(time_after_first_chunk - 
                             time_before_first_chunk), ' per chunk)')
              )
            }
            first_chunk_received <- TRUE
          }
        }
        if (sum_received_chunks == prod(unlist(chunks))) {
          done <- TRUE
        } else if (!wait) {
          stop("Computation in progress...")
        }
      }
      Sys.sleep(cluster[['polling_period']])
    }
    attempt <- attempt + 1
  }
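  # All chunks received: remove the temporary rsync filter file, record the
  # transfer time and, if a remote temporary directory was used, delete it.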
  file.remove(rsync_petition_file)
  timings[['transfer_back']] <- t_transfer_back
  if (!is.null(cluster[['temp_dir']])) {
    system(paste0('ssh ', cluster[['queue_host']], ' "rm -rf ',
                  remote_ecflow_suite_dir_suite, '"'))
  }
  if (remove) {
    .warning("ATTENTION: The source chunks will be removed from the ",
             "system. Store the result after Collect() ends if needed.")
  }
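  # Merge the per-chunk .Rds files into the final result; 'remove' controls
  # whether the source chunk files are deleted afterwards.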
  t_begin_merge <- Sys.time()
  result <- startR:::.MergeChunks(ecflow_suite_dir, suite_id, remove)
  t_end_merge <- Sys.time()
  timings[['merge']] <- as.numeric(difftime(t_end_merge, t_begin_merge, units = 'secs'))
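  # Gather per-chunk timings (queue, job setup, load, compute) from the
  # received .timings files.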
  received_files <- list.files(ecflow_suite_dir_suite, full.names = TRUE)
  received_timings_files <- received_files[grepl('timings$', received_files)]
  for (timings_file in received_timings_files) {
    times <- readRDS(timings_file)
    timings[['queue']] <- c(timings[['queue']], times['queue'])
    timings[['job_setup']] <- c(timings[['job_setup']], times['job_setup'])
    timings[['load']] <- c(timings[['load']], times['load'])
    timings[['compute']] <- c(timings[['compute']], times['compute'])
  }
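  # Optionally delete the ecFlow suite and the local suite directory.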
  if (remove) {
    system(paste0("ecflow_client --delete=force yes /STARTR_CHUNKING_",
                  suite_id, " --host=", ecflow_server[['host']],
                  " --port=", ecflow_server[['port']]))
    unlink(ecflow_suite_dir_suite, recursive = TRUE)
  }
  if (attempt > 2) {
    t_end_total <- Sys.time()
    timings[['total']] <- as.numeric(difftime(t_end_total, timings[['total']], units = 'secs'))
  } else {
    # When attempt <= 2, all results were already available, possibly from
    # long ago, so it is not straightforward to work out the total time.
    timings[['total']] <- NA
  }
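  # Print a profiling summary of the execution.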
    message(paste0("* Computation ended successfully."))
    message(paste0("*   Number of chunks: ",
                             timings[['nchunks']]))
    message(paste0("*   Max. number of concurrent chunks (jobs): ",
                             timings[['concurrent_chunks']]))
    message(paste0("*   Requested cores per job: ",  
                             timings[['cores_per_job']]))
    message(paste0("*   Load threads per chunk: ",   
                             timings[['threads_load']]))
    message(paste0("*   Compute threads per chunk: ", 
                             timings[['threads_compute']]))
    message(paste0("*   Total time (s): ",
                             timings[['total']]))
    message(paste0("*     Chunking setup: ",
                             timings[['bychunks_setup']]))
    message(paste0("*     Data upload to cluster: ",
                             timings[['transfer']]))
    message(paste0("*     All chunks: ",
                             timings[['total']] -
                             timings[['bychunks_setup']] -
                             timings[['transfer']] -
                             timings[['transfer_back']] -
                             timings[['merge']]))
    message(paste0("*     Transfer results from cluster: ",
                             timings[['transfer_back']]))
    message(paste0("*     Merge: ",
                             timings[['merge']]))
    message(paste0("*     Each chunk: "))
    message(paste0("*       queue: "))
    message(paste0("*         mean: ",
                             mean(timings[['queue']])))
    message(paste0("*         min: ",
                             min(timings[['queue']])))
    message(paste0("*         max: ",
                             max(timings[['queue']])))
    message(paste0("*       job setup: "))
    message(paste0("*         mean: ",
                             mean(timings[['job_setup']])))
    message(paste0("*         min: ",
                             min(timings[['job_setup']])))
    message(paste0("*         max: ",
                             max(timings[['job_setup']])))
    message(paste0("*       load: "))
    message(paste0("*         mean: ",
                             mean(timings[['load']])))
    message(paste0("*         min: ",
                             min(timings[['load']])))
    message(paste0("*         max: ",
                             max(timings[['load']])))
    message(paste0("*       compute: "))
    message(paste0("*         mean: ",
                             mean(timings[['compute']])))
    message(paste0("*         min: ",
                             min(timings[['compute']])))
    message(paste0("*         max: ",
                             max(timings[['compute']])))
  #system("ecflow_client --shutdown --port=5678")
  #system("ecflow_stop.sh -p 5678")
  #result <- readRDS(paste0(ecflow_output_dir, '/result.Rds'))
  #file.remove(paste0(ecflow_output_dir, '/result.Rds'))
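  # Attach the collected timings so they can be retrieved later with
  # attr(result, 'startR_compute_profiling').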
  attr(result, 'startR_compute_profiling') <- timings
  result
}