diff --git a/DESCRIPTION b/DESCRIPTION
index 60fa08cdacb88cba6abc5814af42af6020084162..90b03a7c0e083b39c793eeea36148a939e960426 100644
--- a/DESCRIPTION
+++ b/DESCRIPTION
@@ -42,5 +42,5 @@ URL: https://earth.bsc.es/gitlab/es/startR/
 BugReports: https://earth.bsc.es/gitlab/es/startR/-/issues
 SystemRequirements: cdo ecFlow
 Encoding: UTF-8
-RoxygenNote: 7.2.0
+RoxygenNote: 7.2.3
 Config/testthat/edition: 3
diff --git a/R/Collect.R b/R/Collect.R
index 62e105eef8dae986b7b6752d0ceafe91283ff2cf..1bb864825adb752568c9b3d71522a531a1de53a2 100644
--- a/R/Collect.R
+++ b/R/Collect.R
@@ -22,6 +22,9 @@
 #' folder under 'ecflow_suite_dir' or 'autosubmit_suite_dir'. To preserve the
 #' data and Collect() them as many times as desired, set remove to FALSE. The
 #' default value is TRUE.
+#' @param on_remote A logical value deciding whether the function is run
+#'   locally and syncs the outputs back from the HPC (FALSE, default), or is
+#'   run on the HPC (TRUE).
 #'@return A list of merged data array.
 #'
 #'@examples
@@ -72,8 +75,7 @@
 #' }
 #'
 #'@export
-Collect <- function(startr_exec, wait = TRUE, remove = TRUE) {
-
+Collect <- function(startr_exec, wait = TRUE, remove = TRUE, on_remote = FALSE) {
   # Parameter checks
   if (!is(startr_exec, 'startR_exec')) {
     stop("Parameter 'startr_exec' must be an object of the class ",
@@ -88,20 +90,22 @@ Collect <- function(startr_exec, wait = TRUE, remove = TRUE) {
   if (!is.logical(remove)) {
     stop("Parameter 'remove' must be logical.")
   }
+  if (!is.logical(on_remote)) {
+    stop("Parameter 'on_remote' must be logical.")
+  }
 
   if (tolower(startr_exec$workflow_manager) == 'ecflow') {
-    res <- Collect_ecflow(startr_exec, wait = wait, remove = remove)
+    res <- Collect_ecflow(startr_exec, wait = wait, remove = remove, on_remote = on_remote)
   } else if (tolower(startr_exec$workflow_manager) == 'autosubmit') {
-    res <- Collect_autosubmit(startr_exec, wait = wait, remove = remove)
+    res <- Collect_autosubmit(startr_exec, wait = wait, remove = remove, on_remote = on_remote)
   }
 
   return(res)
 }
 
+Collect_ecflow <- function(startr_exec, wait = TRUE, remove = TRUE, on_remote = FALSE) {
 
-Collect_ecflow <- function(startr_exec, wait = TRUE, remove = TRUE) {
-
-  if (Sys.which('ecflow_client') == '') {
+  if (!on_remote && Sys.which('ecflow_client') == '') {
     stop("ecFlow must be installed in order to collect results from a ",
          "Compute() execution.")
   }
@@ -114,7 +118,8 @@ Collect_ecflow <- function(startr_exec, wait = TRUE, remove = TRUE) {
   timings <- startr_exec[['timings']]
   ecflow_suite_dir_suite <- paste0(ecflow_suite_dir, '/STARTR_CHUNKING_',
                                    suite_id, '/')
-  if (!is.null(cluster[['temp_dir']])) {
+  if (!is.null(cluster[['temp_dir']])) { #NOTE: Which case doesn't have temp_dir?
+    remote_ecflow_suite_dir <- cluster[['temp_dir']]
     remote_ecflow_suite_dir_suite <- paste0(cluster[['temp_dir']],
                                             '/STARTR_CHUNKING_',
                                             suite_id, '/')
@@ -141,8 +146,12 @@ Collect_ecflow <- function(startr_exec, wait = TRUE, remove = TRUE) {
   }
   done <- FALSE
   attempt <- 1
-  sum_received_chunks <- sum(grepl('output.*\\.Rds',
-                                   list.files(ecflow_suite_dir_suite)))
+  if (!on_remote) {
+    #TODO: Is this correct? Not all the cases have file names beginning with "output".
+    sum_received_chunks <- sum(grepl('output.*\\.Rds',
+                                     list.files(ecflow_suite_dir_suite)))
+  }
+
   if (cluster[['bidirectional']]) {
     t_transfer_back <- NA
   } else {
@@ -156,92 +165,19 @@ Collect_ecflow <- function(startr_exec, wait = TRUE, remove = TRUE) {
   writeLines(rsync_petition_file_lines, rsync_petition_file)
   Sys.sleep(2)
   while (!done) {
-    failed <- FALSE
-    if (cluster[['bidirectional']]) {
-      status <- system(paste0("ecflow_client --get_state=STARTR_CHUNKING_",
-                              suite_id, " --host=",
-                              ecflow_server[['host']], " --port=", ecflow_server[['port']]),
-                       intern = TRUE)
-      if (any(grepl(paste0("suite STARTR_CHUNKING_", suite_id, " #.* state:complete"), status))) {
-        done <- TRUE
-      } else if (!wait) {
-        stop("Computation in progress...")
-      }
-      if (!first_chunk_received) {
-        if (any(grepl('state:complete', status))) {
-          if (!is.null(time_before_first_chunk)) {
-            time_after_first_chunk <- Sys.time()
-            estimate <- (time_after_first_chunk -
-                         time_before_first_chunk) *
-                        ceiling((prod(unlist(chunks)) - cluster[['max_jobs']]) /
-                                cluster[['max_jobs']])
-            units(estimate) <- 'mins'
-            .message(
-              paste0('Remaining time estimate (neglecting queue and ',
-                     'merge time) (at ', format(time_after_first_chunk),
-                     '): ', format(estimate), ' (',
-                     format(time_after_first_chunk -
-                            time_before_first_chunk), ' per chunk)')
-            )
-          }
-          first_chunk_received <- TRUE
-        }
-      }
-      Sys.sleep(min(sqrt(attempt), 5))
-    } else {
-      #if (sum_received_chunks == 0) {
-      #  # Accounting for the fist chunk received in ByChunks and
-      #  # setting it to complete
-      #  # ByChunks needs the first chunk to calculate remaining time
-      #  received_files <- list.files(ecflow_suite_dir_suite)
-      #  received_chunks <- received_files[grepl('Rds$',
-      #                                          received_files)]
-      #}
-      t_begin_transfer_back <- Sys.time()
-      rsync_output <- tryCatch({
-        system(paste0("rsync -rav --include-from=", rsync_petition_file, " '",
-               cluster[['queue_host']], ":", remote_ecflow_suite_dir_suite, "' ",
-               ecflow_suite_dir_suite, "/"), intern = TRUE)
-      }, error = function(e) {
-        message("Warning: rsync from remote server to collect results failed. ",
", - "Retrying soon.") - failed <- TRUE - }) - t_end_transfer_back <- Sys.time() - t_transfer_back <- t_transfer_back + as.numeric(difftime(t_end_transfer_back, - t_begin_transfer_back, units = 'secs')) - if (!failed) { - #if (sum_received_chunks == 0) { - # rsync_output <- c(rsync_output, received_chunks) - #} - received_running <- grepl('running$', rsync_output) - for (received_chunk_index in which(received_running)) { - file_name <- rsync_output[received_chunk_index] - task_name <- find_task_name(file_name) - system(paste0('ecflow_client --force=active recursive ', - task_name, - " --host=", ecflow_server[['host']], - " --port=", ecflow_server[['port']])) - } - received_crashed <- grepl('crashed$', rsync_output) - for (received_chunk_index in which(received_crashed)) { - file_name <- rsync_output[received_chunk_index] - task_name <- find_task_name(file_name) - system(paste0('ecflow_client --force=aborted recursive ', - task_name, - " --host=", ecflow_server[['host']], - " --port=", ecflow_server[['port']])) + if (!on_remote) { + if (cluster[['bidirectional']]) { + status <- system(paste0("ecflow_client --get_state=STARTR_CHUNKING_", + suite_id, " --host=", + ecflow_server[['host']], " --port=", ecflow_server[['port']]), + intern = TRUE) + if (any(grepl(paste0("suite STARTR_CHUNKING_", suite_id, " #.* state:complete"), status))) { + done <- TRUE + } else if (!wait) { + stop("Computation in progress...") } - received_chunks <- grepl('Rds$', rsync_output) - for (received_chunk_index in which(received_chunks)) { - file_name <- rsync_output[received_chunk_index] - task_name <- find_task_name(file_name) - system(paste0('ecflow_client --force=complete recursive ', - task_name, - " --host=", ecflow_server[['host']], - " --port=", ecflow_server[['port']])) - sum_received_chunks <- sum_received_chunks + 1 - if (!first_chunk_received) { + if (!first_chunk_received) { + if (any(grepl('state:complete', status))) { if (!is.null(time_before_first_chunk)) { time_after_first_chunk <- Sys.time() estimate <- (time_after_first_chunk - @@ -252,27 +188,117 @@ Collect_ecflow <- function(startr_exec, wait = TRUE, remove = TRUE) { .message( paste0('Remaining time estimate (neglecting queue and ', 'merge time) (at ', format(time_after_first_chunk), - '): ', format(estimate), ' (', - format(time_after_first_chunk - + '): ', format(estimate), ' (', + format(time_after_first_chunk - time_before_first_chunk), ' per chunk)') ) } first_chunk_received <- TRUE } } - if (sum_received_chunks / num_outputs == prod(unlist(chunks))) { - done <- TRUE - } else if (!wait) { - stop("Computation in progress...") + Sys.sleep(min(sqrt(attempt), 5)) + } else { + #if (sum_received_chunks == 0) { + # # Accounting for the fist chunk received in ByChunks and + # # setting it to complete + # # ByChunks needs the first chunk to calculate remaining time + # received_files <- list.files(ecflow_suite_dir_suite) + # received_chunks <- received_files[grepl('Rds$', + # received_files)] + #} + failed <- FALSE + t_begin_transfer_back <- Sys.time() + rsync_output <- tryCatch({ + system(paste0("rsync -rav --include-from=", rsync_petition_file, " '", + cluster[['queue_host']], ":", remote_ecflow_suite_dir_suite, "' ", + ecflow_suite_dir_suite, "/"), intern = TRUE) + }, error = function(e) { + message("Warning: rsync from remote server to collect results failed. 
", + "Retrying soon.") + failed <- TRUE + }) + t_end_transfer_back <- Sys.time() + t_transfer_back <- t_transfer_back + as.numeric(difftime(t_end_transfer_back, + t_begin_transfer_back, units = 'secs')) + if (!failed) { + #if (sum_received_chunks == 0) { + # rsync_output <- c(rsync_output, received_chunks) + #} + received_running <- grepl('running$', rsync_output) + for (received_chunk_index in which(received_running)) { + file_name <- rsync_output[received_chunk_index] + task_name <- find_task_name(file_name) + system(paste0('ecflow_client --force=active recursive ', + task_name, + " --host=", ecflow_server[['host']], + " --port=", ecflow_server[['port']])) + } + received_crashed <- grepl('crashed$', rsync_output) + for (received_chunk_index in which(received_crashed)) { + file_name <- rsync_output[received_chunk_index] + task_name <- find_task_name(file_name) + system(paste0('ecflow_client --force=aborted recursive ', + task_name, + " --host=", ecflow_server[['host']], + " --port=", ecflow_server[['port']])) + } + received_chunks <- grepl('Rds$', rsync_output) + for (received_chunk_index in which(received_chunks)) { + file_name <- rsync_output[received_chunk_index] + task_name <- find_task_name(file_name) + system(paste0('ecflow_client --force=complete recursive ', + task_name, + " --host=", ecflow_server[['host']], + " --port=", ecflow_server[['port']])) + sum_received_chunks <- sum_received_chunks + 1 + if (!first_chunk_received) { + if (!is.null(time_before_first_chunk)) { + time_after_first_chunk <- Sys.time() + estimate <- (time_after_first_chunk - + time_before_first_chunk) * + ceiling((prod(unlist(chunks)) - cluster[['max_jobs']]) / + cluster[['max_jobs']]) + units(estimate) <- 'mins' + .message( + paste0('Remaining time estimate (neglecting queue and ', + 'merge time) (at ', format(time_after_first_chunk), + '): ', format(estimate), ' (', + format(time_after_first_chunk - + time_before_first_chunk), ' per chunk)') + ) + } + first_chunk_received <- TRUE + } + } + if (sum_received_chunks / num_outputs == prod(unlist(chunks))) { + done <- TRUE + } else if (!wait) { + stop("Computation in progress...") + } } + Sys.sleep(cluster[['polling_period']]) } - Sys.sleep(cluster[['polling_period']]) + + } else { # on_remote + + sum_received_chunks <- sum(grepl('.*\\.Rds$', list.files(remote_ecflow_suite_dir_suite ))) + + if (sum_received_chunks / num_outputs == prod(unlist(chunks))) { + done <- TRUE + } else if (!wait) { + stop("Computation in progress...") + } else { + message("Computation in progress, ", sum_received_chunks, " of ", prod(unlist(chunks)), " chunks are done.") + message("Will try again after polling_period...") + Sys.sleep(cluster[['polling_period']]) + } + } attempt <- attempt + 1 } file.remove(rsync_petition_file) timings[['transfer_back']] <- t_transfer_back - if (!is.null(cluster[['temp_dir']])) { + if (!on_remote && !is.null(cluster[['temp_dir']])) { system(paste0('ssh ', cluster[['queue_host']], ' "rm -rf ', remote_ecflow_suite_dir_suite, '"')) } @@ -280,11 +306,18 @@ Collect_ecflow <- function(startr_exec, wait = TRUE, remove = TRUE) { .warning("ATTENTION: The source chunks will be removed from the ", "system. 
              "system. Store the result after Collect() ends if needed.")
   }
+  if (!on_remote) {
+    target_folder <- ecflow_suite_dir
+    target_folder_suite <- ecflow_suite_dir_suite
+  } else {
+    target_folder <- remote_ecflow_suite_dir
+    target_folder_suite <- remote_ecflow_suite_dir_suite
+  }
   t_begin_merge <- Sys.time()
-  result <- .MergeChunks(ecflow_suite_dir, suite_id, remove)
+  result <- .MergeChunks(target_folder, suite_id, remove)
   t_end_merge <- Sys.time()
   timings[['merge']] <- as.numeric(difftime(t_end_merge, t_begin_merge, units = 'secs'))
-  received_files <- list.files(ecflow_suite_dir_suite, full.names = TRUE)
+  received_files <- list.files(target_folder_suite, full.names = TRUE)
   received_timings_files <- received_files[grepl('timings$', received_files)]
   for (timings_file in received_timings_files) {
     times <- readRDS(timings_file)
@@ -294,11 +327,12 @@ Collect_ecflow <- function(startr_exec, wait = TRUE, remove = TRUE) {
     timings[['compute']] <- c(timings[['compute']], times['compute'])
   }
   if (remove) {
-    system(paste0("ecflow_client --delete=force yes /STARTR_CHUNKING_",
-                  suite_id, " --host=", ecflow_server[['host']],
-                  " --port=", ecflow_server[['port']]))
-    unlink(paste0(ecflow_suite_dir_suite),
-           recursive = TRUE)
+    if (!on_remote) {
+      system(paste0("ecflow_client --delete=force yes /STARTR_CHUNKING_",
+                    suite_id, " --host=", ecflow_server[['host']],
+                    " --port=", ecflow_server[['port']]))
+    }
+    unlink(target_folder_suite, recursive = TRUE)
   }
   if (attempt > 2) {
     t_end_total <- Sys.time()
@@ -374,7 +408,7 @@
 
 
 
-Collect_autosubmit <- function(startr_exec, wait = TRUE, remove = TRUE) {
+Collect_autosubmit <- function(startr_exec, wait = TRUE, remove = TRUE, on_remote = FALSE) {
 
   suite_id <- startr_exec[['suite_id']]
   chunks <- startr_exec[['chunks']]
diff --git a/man/Collect.Rd b/man/Collect.Rd
index d90cacaf8367095c5f4505fa8371151a1fdf4060..e701a00d7a6602dacb5bcb84ab0c723c9ad10561 100644
--- a/man/Collect.Rd
+++ b/man/Collect.Rd
@@ -4,7 +4,7 @@
 \alias{Collect}
 \title{Collect and merge the computation results}
 \usage{
-Collect(startr_exec, wait = TRUE, remove = TRUE)
+Collect(startr_exec, wait = TRUE, remove = TRUE, on_remote = FALSE)
 }
 \arguments{
 \item{startr_exec}{An R object returned by Compute() when the parameter 'wait'
@@ -25,6 +25,10 @@ received from the HPC after data being collected, as well as the local job
 folder under 'ecflow_suite_dir' or 'autosubmit_suite_dir'. To preserve the
 data and Collect() them as many times as desired, set remove to FALSE. The
 default value is TRUE.}
+
+\item{on_remote}{A logical value deciding whether the function is run locally
+and syncs the outputs back from the HPC (FALSE, default), or is run on the
+HPC (TRUE).}
 }
 \value{
 A list of merged data array.
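
Usage note: with this patch, Collect() can run either on the local workstation (default; chunk outputs are rsync-ed back from the HPC) or directly on the HPC where the chunks were written. A minimal sketch of both call patterns, assuming a Compute() execution was dispatched earlier with wait = FALSE and its startR_exec object saved to 'compute_job.Rds' (the file name is illustrative, not part of the patch):

    # Collect locally, syncing the chunk outputs back from the HPC (default):
    result <- Collect(startr_exec = readRDS('compute_job.Rds'),
                      wait = TRUE, remove = TRUE)

    # Collect on the HPC itself: chunks are read from the remote ecFlow suite
    # directory, so neither ecflow_client nor rsync is needed on this side.
    result <- Collect(startr_exec = readRDS('compute_job.Rds'),
                      wait = TRUE, remove = TRUE, on_remote = TRUE)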