#'Collect and merge the computation results
#'
#'The final step of the startR workflow after the data operation. It is used
#'when the parameter 'wait' of Compute() is FALSE. It updates the job status
#'shown on the EC-Flow GUI and, once the execution is done, collects all the
#'chunks of results into one data array. See more details on
#'\href{https://earth.bsc.es/gitlab/es/startR/}{startR GitLab}.
#'
#'@param startr_exec An R object returned by Compute() when the parameter
#'  'wait' of Compute() is FALSE. It can come directly from a Compute() call
#'  or be read from an RDS file.
#'@param wait A logical value deciding whether the R session waits for the
#'  Collect() call to finish (TRUE) or not (FALSE). If TRUE, the call blocks:
#'  Collect() retrieves information from the HPC, including signals and
#'  outputs, every 'polling_period' seconds, and the status can be monitored
#'  on the EC-Flow GUI. Collect() does not return until the results of all
#'  chunks have been received. If FALSE, Collect() throws an error if the
#'  execution has not finished yet; otherwise, it returns the merged array.
#'  The default value is TRUE.
#'@param remove A logical value deciding whether to remove all the data
#'  results received from the HPC (and stored under 'ecflow_suite_dir', the
#'  parameter in Compute()) after they have been collected. To preserve the
#'  data so that Collect() can be called on it as many times as desired, set
#'  'remove' to FALSE. The default value is TRUE.
#'@return A list of merged data arrays.
#'
#'@examples
#'  data_path <- system.file('extdata', package = 'startR')
#'  path_obs <- file.path(data_path, 'obs/monthly_mean/$var$/$var$_$sdate$.nc')
#'  sdates <- c('200011', '200012')
#'  data <- Start(dat = list(list(path = path_obs)),
#'                var = 'tos',
#'                sdate = sdates,
#'                time = 'all',
#'                latitude = 'all',
#'                longitude = 'all',
#'                return_vars = list(latitude = 'dat',
#'                                   longitude = 'dat',
#'                                   time = 'sdate'),
#'                retrieve = FALSE)
#'  fun <- function(x) {
#'            lat = attributes(x)$Variables$dat1$latitude
#'            weight = sqrt(cos(lat * pi / 180))
#'            corrected = Apply(list(x), target_dims = "latitude",
#'                              fun = function(x) {x * weight})
#'          }
#'  step <- Step(fun = fun,
#'               target_dims = 'latitude',
#'               output_dims = 'latitude',
#'               use_libraries = c('multiApply'),
#'               use_attributes = list(data = "Variables"))
#'  wf <- AddStep(data, step)
#'  \dontrun{
#'  res <- Compute(wf, chunks = list(longitude = 2, sdate = 2),
#'                 threads_load = 1,
#'                 threads_compute = 4,
#'                 cluster = list(queue_host = 'nord3',
#'                                queue_type = 'lsf',
#'                                temp_dir = '/on_hpc/tmp_dir/',
#'                                cores_per_job = 2,
#'                                job_wallclock = '05:00',
#'                                max_jobs = 4,
#'                                extra_queue_params = list('#BSUB -q bsc_es'),
#'                                bidirectional = FALSE,
#'                                polling_period = 10
#'                 ),
#'                 ecflow_suite_dir = '/on_local_machine/username/ecflow_dir/',
#'                 wait = FALSE)
#'  saveRDS(res, file = 'test_collect.Rds')
#'  collect_info <- readRDS('test_collect.Rds')
#'  result <- Collect(collect_info, wait = TRUE)
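#'  # Alternatively, a non-blocking check: if the execution has not finished
#'  # yet, Collect() throws an error instead of waiting for the results.
#'  # result <- Collect(collect_info, wait = FALSE)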
#'  }
#'
#'@export
Collect <- function(startr_exec, wait = TRUE, remove = TRUE) {
  if (!('startR_exec' %in% class(startr_exec))) {
    stop("Parameter 'startr_exec' must be an object of the class ",
         "'startR_exec', as returned by Collect(..., wait = FALSE).")
  }
  cluster <- startr_exec[['cluster']]
  ecflow_server <- startr_exec[['ecflow_server']]
  suite_id <- startr_exec[['suite_id']]
  chunks <- startr_exec[['chunks']]
  num_outputs <- startr_exec[['num_outputs']]
  ecflow_suite_dir <- startr_exec[['ecflow_suite_dir']]
  timings <- startr_exec[['timings']]
  ecflow_suite_dir_suite <- paste0(ecflow_suite_dir, '/STARTR_CHUNKING_', 
                                   suite_id, '/')
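  # Map a received chunk file back to its EC-Flow task. The file name is
  # assumed to embed the chunk indices as '__<dim>_<index>__' segments (the
  # exact shape is inferred from the parsing below); those indices are
  # matched against the suite directory tree to recover the task path.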
  find_task_name <- function(received_file) {
    file_name <- received_file
    parts <- strsplit(file_name, '__')[[1]]
    parts <- parts[c(2:(length(parts) - 1))]
    chunk_indices <- rev(sapply(parts, function(x) {
      as.numeric(strsplit(x, '_')[[1]][2])
    }))
    task_pattern <- paste(paste0('*_', chunk_indices, '/'), 
                          collapse = '')
    task_glob <- paste0(ecflow_suite_dir_suite, '/*/*/', 
                        task_pattern)
    task_path <- Sys.glob(task_glob)
    if (length(task_path) != 1) {
      stop("Unexpected error while receiving results.")
    }
    task_name <- strsplit(task_path, 'computation')[[1]][2]
    task_name <- paste0('/STARTR_CHUNKING_', suite_id, 
                        '/computation', task_name)
    task_name
  }
  done <- FALSE
  attempt <- 1
  if (Sys.which('ecflow_client') == '') {
    startR:::.warning("Retrieving the data from the remote cluster without an ecFlow installation.")
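    # Without ecFlow, the chunk files are read directly from 'temp_dir';
    # this assumes that path is reachable from the local R session.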
    if (timings[['nchunks']] != sum(grepl('output.*\\.Rds', 
                                    list.files(paste0(cluster[['temp_dir']],
                                              '/STARTR_CHUNKING_',
                                              suite_id, '/'))))) {
      stop("Check all jobs jave finished successfully.")
    }
    # NOTE: remove = FALSE here so the chunk files are kept and the results
    # can be inspected or collected again.
    t_begin_merge <- Sys.time()
    result <- startR:::.MergeChunks(cluster[['temp_dir']], suite_id, remove = FALSE)
    t_end_merge <- Sys.time()
    timings[['merge']] <- as.numeric(difftime(t_end_merge, t_begin_merge, units = 'secs'))
    received_files <- list.files(paste0(cluster[['temp_dir']], '/STARTR_CHUNKING_',
                                        suite_id, '/'), full.names = TRUE)
    received_timings_files <- received_files[grepl('timings$', received_files)]
    for (timings_file in received_timings_files) {
      times <- readRDS(timings_file)
      timings[['queue']] <- c(timings[['queue']], times['queue'])
      timings[['job_setup']] <- c(timings[['job_setup']], times['job_setup'])
      timings[['load']] <- c(timings[['load']], times['load'])
      timings[['compute']] <- c(timings[['compute']], times['compute'])
    }
  } else {
    if (!is.null(cluster[['temp_dir']])) {
      remote_ecflow_suite_dir_suite <- paste0(cluster[['temp_dir']],
                                              '/STARTR_CHUNKING_',
                                              suite_id, '/')
    }

  sum_received_chunks <- sum(grepl('output.*\\.Rds', 
                                   list.files(ecflow_suite_dir_suite)))
  if (cluster[['bidirectional']]) {
    t_transfer_back <- NA
  } else {
    t_transfer_back <- 0
  }
  time_before_first_chunk <- startr_exec[['t_begin_first_chunk']]
  first_chunk_received <- FALSE
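  # rsync filter rules: transfer only result (.Rds), timing and status
  # (.crashed/.running) files; everything else on the remote side is skipped.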
  rsync_petition_file_lines <- c('+ *.Rds', '+ *.timings', '+ *.crashed', 
                                 '+ *.running', '- *')
  rsync_petition_file <- tempfile()
  writeLines(rsync_petition_file_lines, rsync_petition_file)
  Sys.sleep(2)
  while (!done) {
    failed <- FALSE
    if (cluster[['bidirectional']]) {
      status <- system(paste0("ecflow_client --get_state=STARTR_CHUNKING_",
                              suite_id, " --host=",
                              ecflow_server[['host']], " --port=", ecflow_server[['port']]),
                       intern = TRUE)
      if (any(grepl(paste0("suite STARTR_CHUNKING_", suite_id, " #.* state:complete"), status))) {
        done <- TRUE
      } else if (!wait) {
        stop("Computation in progress...")
      }
      if (!first_chunk_received) {
        if (any(grepl('state:complete', status))) {
          if (!is.null(time_before_first_chunk)) {
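            # Rough estimate: elapsed time of the first completed chunk,
            # times the number of remaining batches of up to 'max_jobs'
            # concurrently running chunks.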
            time_after_first_chunk <- Sys.time()
            estimate <- (time_after_first_chunk -
                           time_before_first_chunk) *
              ceiling((prod(unlist(chunks)) - cluster[['max_jobs']]) /
                        cluster[['max_jobs']])
            units(estimate) <- 'mins'
            .message(
              paste0('Remaining time estimate (neglecting queue and ',
                     'merge time) (at ', format(time_after_first_chunk),
                     '): ', format(estimate), ' (',
                     format(time_after_first_chunk -
                              time_before_first_chunk), ' per chunk)')
            )
          }
          first_chunk_received <- TRUE
        }
      }
      Sys.sleep(min(sqrt(attempt), 5))
    } else {
      #if (sum_received_chunks == 0) {
      #  # Accounting for the first chunk received in ByChunks and
      #  # setting it to complete
      #  # ByChunks needs the first chunk to calculate remaining time
      #  received_files <- list.files(ecflow_suite_dir_suite)
      #  received_chunks <- received_files[grepl('Rds$', 
      #                                          received_files)]
      #}
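      # Fetch newly produced result/status files from the remote suite
      # directory; a failed rsync is tolerated and retried on the next pass.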
      t_begin_transfer_back <- Sys.time()
      rsync_output <- tryCatch({
        system(paste0("rsync -rav --include-from=", rsync_petition_file, " '", 
                      cluster[['queue_host']], ":", remote_ecflow_suite_dir_suite, "' ",  
                      ecflow_suite_dir_suite, "/"), intern = TRUE)
      }, error = function(e) {
        message("Warning: rsync from remote server to collect results failed. ",
                "Retrying soon.")
        # Use <<- so the flag is set in the enclosing environment, where it
        # is checked below; a plain <- would only set a handler-local variable.
        failed <<- TRUE
      })
      t_end_transfer_back <- Sys.time()
      t_transfer_back <- t_transfer_back + as.numeric(difftime(t_end_transfer_back, 
                                                               t_begin_transfer_back, units = 'secs'))
      if (!failed) {
        #if (sum_received_chunks == 0) {
        #  rsync_output <- c(rsync_output, received_chunks)
        #}
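        # Mirror the remote chunk status onto the EC-Flow suite:
        # '.running' -> active, '.crashed' -> aborted, '.Rds' -> complete.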
        received_running <- grepl('running$', rsync_output)
        for (received_chunk_index in which(received_running)) {
          file_name <- rsync_output[received_chunk_index]
          task_name <- find_task_name(file_name)
          system(paste0('ecflow_client --force=active recursive ',
                        task_name, 
                        " --host=", ecflow_server[['host']],
                        " --port=", ecflow_server[['port']]))
        }
        received_crashed <- grepl('crashed$', rsync_output)
        for (received_chunk_index in which(received_crashed)) {
          file_name <- rsync_output[received_chunk_index]
          task_name <- find_task_name(file_name)
          system(paste0('ecflow_client --force=aborted recursive ',
                        task_name, 
                        " --host=", ecflow_server[['host']],
                        " --port=", ecflow_server[['port']]))
        }
        received_chunks <- grepl('Rds$', rsync_output)
        for (received_chunk_index in which(received_chunks)) {
          file_name <- rsync_output[received_chunk_index]
          task_name <- find_task_name(file_name)
          system(paste0('ecflow_client --force=complete recursive ',
                        task_name, 
                        " --host=", ecflow_server[['host']],
                        " --port=", ecflow_server[['port']]))
          sum_received_chunks <- sum_received_chunks + 1
          if (!first_chunk_received) {
            if (!is.null(time_before_first_chunk)) {
              time_after_first_chunk <- Sys.time()
              estimate <- (time_after_first_chunk -
                             time_before_first_chunk) *
                ceiling((prod(unlist(chunks)) - cluster[['max_jobs']]) /
                          cluster[['max_jobs']])
              units(estimate) <- 'mins'
              .message(
                paste0('Remaining time estimate (neglecting queue and ',
                       'merge time) (at ', format(time_after_first_chunk),
                       '): ', format(estimate), ' (', 
                       format(time_after_first_chunk - 
                                time_before_first_chunk), ' per chunk)')
              )
            }
            first_chunk_received <- TRUE
          }
        }
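        # Each chunk produces 'num_outputs' result files, so everything has
        # arrived once the received count reaches num_outputs * total chunks.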
        if (sum_received_chunks / num_outputs == prod(unlist(chunks))) {
          done <- TRUE
        } else if (!wait) {
          stop("Computation in progress...")
        }
      }
      Sys.sleep(cluster[['polling_period']])
    }
    attempt <- attempt + 1
  }
  file.remove(rsync_petition_file)
  timings[['transfer_back']] <- t_transfer_back
  if (!is.null(cluster[['temp_dir']])) {
    system(paste0('ssh ', cluster[['queue_host']], ' "rm -rf ', 
                  remote_ecflow_suite_dir_suite, '"'))
  }
  if (remove) {
    .warning("ATTENTION: The source chunks will be removed from the ",
             "system. Store the result after Collect() ends if needed.")
  }
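  # Merge all received chunks into the final array(s); 'remove' is forwarded
  # so the per-chunk files can be cleaned up after merging.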
  t_begin_merge <- Sys.time()
  result <- .MergeChunks(ecflow_suite_dir, suite_id, remove)
  t_end_merge <- Sys.time()
  timings[['merge']] <- as.numeric(difftime(t_end_merge, t_begin_merge, units = 'secs'))
  received_files <- list.files(ecflow_suite_dir_suite, full.names = TRUE)
  received_timings_files <- received_files[grepl('timings$', received_files)]
  for (timings_file in received_timings_files) {
    times <- readRDS(timings_file)
    timings[['queue']] <- c(timings[['queue']], times['queue'])
    timings[['job_setup']] <- c(timings[['job_setup']], times['job_setup'])
    timings[['load']] <- c(timings[['load']], times['load'])
    timings[['compute']] <- c(timings[['compute']], times['compute'])
  }
  if (remove) {
    system(paste0("ecflow_client --delete=force yes /STARTR_CHUNKING_",
                  suite_id, " --host=", ecflow_server[['host']],
                  " --port=", ecflow_server[['port']]))
    unlink(ecflow_suite_dir_suite, recursive = TRUE)
  }
  } # closes the 'else' branch entered when ecflow_client is available
  if (attempt > 2) {
    t_end_total <- Sys.time()
    timings[['total']] <- as.numeric(difftime(t_end_total, timings[['total']], units = 'secs'))
  } else {
    # When attempt <= 2, all results were possibly ready long ago, so it is
    # not straightforward to work out the total time.
    timings[['total']] <- NA
  }
  message(paste0("* Computation ended successfully."))
  message(paste0("*   Number of chunks: ",
                 timings[['nchunks']]))
  message(paste0("*   Max. number of concurrent chunks (jobs): ",
                 timings[['concurrent_chunks']]))
  message(paste0("*   Requested cores per job: ",  
                 timings[['cores_per_job']]))
  message(paste0("*   Load threads per chunk: ",   
                 timings[['threads_load']]))
  message(paste0("*   Compute threads per chunk: ", 
                 timings[['threads_compute']]))
  message(paste0("*   Total time (s): ",
                 timings[['total']]))
  message(paste0("*     Chunking setup: ",
                 timings[['bychunks_setup']]))
  message(paste0("*     Data upload to cluster: ",
                 timings[['transfer']]))
  message(paste0("*     All chunks: ",
                 timings[['total']] -
                   timings[['bychunks_setup']] -
                   timings[['transfer']] -
                   timings[['transfer_back']] -
                   timings[['merge']]))
  message(paste0("*     Transfer results from cluster: ",
                 timings[['transfer_back']]))
  message(paste0("*     Merge: ",
                 timings[['merge']]))
  message(paste0("*     Each chunk: "))
  message(paste0("*       queue: "))
  message(paste0("*         mean: ",
                 mean(timings[['queue']])))
  message(paste0("*         min: ",
                 min(timings[['queue']])))
  message(paste0("*         max: ",
                 max(timings[['queue']])))
  message(paste0("*       job setup: "))
  message(paste0("*         mean: ",
                 mean(timings[['job_setup']])))
  message(paste0("*         min: ",
                 min(timings[['job_setup']])))
  message(paste0("*         max: ",
                 max(timings[['job_setup']])))
  message(paste0("*       load: "))
  message(paste0("*         mean: ",
                 mean(timings[['load']])))
  message(paste0("*         min: ",
                 min(timings[['load']])))
  message(paste0("*         max: ",
                 max(timings[['load']])))
  message(paste0("*       compute: "))
  message(paste0("*         mean: ",
                 mean(timings[['compute']])))
  message(paste0("*         min: ",
                 min(timings[['compute']])))
  message(paste0("*         max: ",
                 max(timings[['compute']])))
  #system("ecflow_client --shutdown --port=5678")
  #system("ecflow_stop.sh -p 5678")
  #result <- readRDS(paste0(ecflow_output_dir, '/result.Rds'))
  #file.remove(paste0(ecflow_output_dir, '/result.Rds'))
  attr(result, 'startR_compute_profiling') <- timings
  result
}