From 2e82b56fff5a539630e93290a8f6128c79f017b9 Mon Sep 17 00:00:00 2001
From: aho
Date: Fri, 1 Dec 2023 16:49:50 +0100
Subject: [PATCH 1/2] Enable Collect() to run on HPCs and return combined array
 there

---
 DESCRIPTION    |  2 +-
 R/Collect.R    | 90 +++++++++++++++++++++++++++++++++++++++-----------
 man/Collect.Rd |  6 +++-
 3 files changed, 76 insertions(+), 22 deletions(-)

diff --git a/DESCRIPTION b/DESCRIPTION
index 60fa08c..90b03a7 100644
--- a/DESCRIPTION
+++ b/DESCRIPTION
@@ -42,5 +42,5 @@ URL: https://earth.bsc.es/gitlab/es/startR/
 BugReports: https://earth.bsc.es/gitlab/es/startR/-/issues
 SystemRequirements: cdo ecFlow
 Encoding: UTF-8
-RoxygenNote: 7.2.0
+RoxygenNote: 7.2.3
 Config/testthat/edition: 3
diff --git a/R/Collect.R b/R/Collect.R
index 6d752f5..9baa264 100644
--- a/R/Collect.R
+++ b/R/Collect.R
@@ -22,6 +22,9 @@
 #' folder under 'ecflow_suite_dir' or 'autosubmit_suite_dir'. To preserve the
 #' data and Collect() them as many times as desired, set remove to FALSE. The
 #' default value is TRUE.
+#' @param on_remote A logical value deciding whether the function is run
+#' locally and syncs the outputs back from the HPC (FALSE, default), or
+#' whether it is run on the HPC (TRUE).
 #'@return A list of merged data array.
 #'
 #'@examples
@@ -72,8 +75,9 @@
 #' }
 #'
 #'@export
-Collect <- function(startr_exec, wait = TRUE, remove = TRUE) {
-
+#--------NEW-------
+Collect <- function(startr_exec, wait = TRUE, remove = TRUE, on_remote = FALSE) {
+#-------NEW_END-----------
   # Parameter checks
   if (!is(startr_exec, 'startR_exec')) {
     stop("Parameter 'startr_exec' must be an object of the class ",
@@ -88,23 +92,29 @@ Collect <- function(startr_exec, wait = TRUE, remove = TRUE) {
   if (!is.logical(remove)) {
     stop("Parameter 'remove' must be logical.")
   }
+#------NEW---------
+  if (!is.logical(on_remote)) {
+    stop("Parameter 'on_remote' must be logical.")
+  }

   if (tolower(startr_exec$workflow_manager) == 'ecflow') {
-    res <- Collect_ecflow(startr_exec, wait = wait, remove = remove)
+    res <- Collect_ecflow(startr_exec, wait = wait, remove = remove, on_remote = on_remote)
   } else if (tolower(startr_exec$workflow_manager) == 'autosubmit') {
-    res <- Collect_autosubmit(startr_exec, wait = wait, remove = remove)
+    res <- Collect_autosubmit(startr_exec, wait = wait, remove = remove, on_remote = on_remote)
   }
+#-------NEW_END----------

   return(res)
 }

+#------NEW---------
+Collect_ecflow <- function(startr_exec, wait = TRUE, remove = TRUE, on_remote = FALSE) {

-Collect_ecflow <- function(startr_exec, wait = TRUE, remove = TRUE) {
-
-  if (Sys.which('ecflow_client') == '') {
+  if (!on_remote && Sys.which('ecflow_client') == '') {
     stop("ecFlow must be installed in order to collect results from a ",
          "Compute() execution.")
   }
+#-------NEW_END-----------
   cluster <- startr_exec[['cluster']]
   ecflow_server <- startr_exec[['ecflow_server']]
   suite_id <- startr_exec[['suite_id']]
@@ -114,7 +124,9 @@ Collect_ecflow <- function(startr_exec, wait = TRUE, remove = TRUE) {
   timings <- startr_exec[['timings']]
   ecflow_suite_dir_suite <- paste0(ecflow_suite_dir, '/STARTR_CHUNKING_',
                                    suite_id, '/')
-  if (!is.null(cluster[['temp_dir']])) {
+  if (!is.null(cluster[['temp_dir']])) { #NOTE: Which case doesn't have temp_dir?
+#-------NEW---------
+    remote_ecflow_suite_dir <- cluster[['temp_dir']]
     remote_ecflow_suite_dir_suite <- paste0(cluster[['temp_dir']], '/STARTR_CHUNKING_',
                                             suite_id, '/')
@@ -141,8 +153,13 @@ Collect_ecflow <- function(startr_exec, wait = TRUE, remove = TRUE) {
   }
   done <- FALSE
   attempt <- 1
-  sum_received_chunks <- sum(grepl('output.*\\.Rds',
-                                   list.files(ecflow_suite_dir_suite)))
+#--------NEW-----------
+  if (!on_remote) {
+    #TODO: Is it correct? Not all the cases have "output" as beginning
+    sum_received_chunks <- sum(grepl('output.*\\.Rds',
+                                     list.files(ecflow_suite_dir_suite)))
+  }
+#---------NEW_END--------
   if (cluster[['bidirectional']]) {
     t_transfer_back <- NA
   } else {
@@ -156,7 +173,9 @@ Collect_ecflow <- function(startr_exec, wait = TRUE, remove = TRUE) {
   writeLines(rsync_petition_file_lines, rsync_petition_file)
   Sys.sleep(2)
   while (!done) {
-    failed <- FALSE
+#-------NEW-----------
+    if (!on_remote) {
+#------NEW_END----------
     if (cluster[['bidirectional']]) {
       status <- system(paste0("ecflow_client --get_state=STARTR_CHUNKING_",
                               suite_id, " --host=",
@@ -197,6 +216,7 @@ Collect_ecflow <- function(startr_exec, wait = TRUE, remove = TRUE) {
       #  received_chunks <- received_files[grepl('Rds$',
       #                                           received_files)]
       #}
+      failed <- FALSE
       t_begin_transfer_back <- Sys.time()
       rsync_output <- tryCatch({
         system(paste0("rsync -rav --include-from=", rsync_petition_file, " '",
@@ -268,11 +288,30 @@ Collect_ecflow <- function(startr_exec, wait = TRUE, remove = TRUE) {
       }
       Sys.sleep(cluster[['polling_period']])
     }
+#--------NEW----------
+    } else { # on_remote
+
+      sum_received_chunks <- sum(grepl('.*\\.Rds$', list.files(remote_ecflow_suite_dir_suite )))
+
+      if (sum_received_chunks / num_outputs == prod(unlist(chunks))) {
+        done <- TRUE
+      } else if (!wait) {
+        stop("Computation in progress...")
+      } else {
+        message("Computation in progress, ", sum_received_chunks, " of ", prod(unlist(chunks)), " chunks are done.")
+        message("Will try again after polling_period...")
+        Sys.sleep(cluster[['polling_period']])
+      }
+
+    }
+#-------NEW_END---------
     attempt <- attempt + 1
   }
   file.remove(rsync_petition_file)
   timings[['transfer_back']] <- t_transfer_back
-  if (!is.null(cluster[['temp_dir']])) {
+  #------NEW--------
+  if (!on_remote && !is.null(cluster[['temp_dir']])) {
+  #-------NEW_END--------
     system(paste0('ssh ', cluster[['queue_host']], ' "rm -rf ',
                   remote_ecflow_suite_dir_suite, '"'))
   }
@@ -280,11 +319,19 @@ Collect_ecflow <- function(startr_exec, wait = TRUE, remove = TRUE) {
     .warning("ATTENTION: The source chunks will be removed from the ",
              "system. Store the result after Collect() ends if needed.")
   }
+#-------NEW-------
+  if (!on_remote) {
+    target_folder <- ecflow_suite_dir
+    target_folder_suite <- ecflow_suite_dir_suite
+  } else {
+    target_folder <- remote_ecflow_suite_dir
+    target_folder_suite <- remote_ecflow_suite_dir_suite
+  }
   t_begin_merge <- Sys.time()
-  result <- .MergeChunks(ecflow_suite_dir, suite_id, remove)
+  result <- .MergeChunks(target_folder, suite_id, remove)
   t_end_merge <- Sys.time()
   timings[['merge']] <- as.numeric(difftime(t_end_merge, t_begin_merge, units = 'secs'))
-  received_files <- list.files(ecflow_suite_dir_suite, full.names = TRUE)
+  received_files <- list.files(target_folder_suite, full.names = TRUE)
   received_timings_files <- received_files[grepl('timings$', received_files)]
   for (timings_file in received_timings_files) {
     times <- readRDS(timings_file)
@@ -294,11 +341,14 @@ Collect_ecflow <- function(startr_exec, wait = TRUE, remove = TRUE) {
     timings[['compute']] <- c(timings[['compute']], times['compute'])
   }
   if (remove) {
-    system(paste0("ecflow_client --delete=force yes /STARTR_CHUNKING_",
-                  suite_id, " --host=", ecflow_server[['host']],
-                  " --port=", ecflow_server[['port']]))
-    unlink(paste0(ecflow_suite_dir_suite),
-           recursive = TRUE)
+#--------NEW--------------
+    if (!on_remote) {
+      system(paste0("ecflow_client --delete=force yes /STARTR_CHUNKING_",
+                    suite_id, " --host=", ecflow_server[['host']],
+                    " --port=", ecflow_server[['port']]))
+    }
+    unlink(target_folder_suite, recursive = TRUE)
+#---------NEW_END-----------
   }
   if (attempt > 2) {
     t_end_total <- Sys.time()
@@ -374,7 +424,7 @@ Collect_ecflow <- function(startr_exec, wait = TRUE, remove = TRUE) {



-Collect_autosubmit <- function(startr_exec, wait = TRUE, remove = TRUE) {
+Collect_autosubmit <- function(startr_exec, wait = TRUE, remove = TRUE, on_remote = FALSE) {

   suite_id <- startr_exec[['suite_id']]
   chunks <- startr_exec[['chunks']]
diff --git a/man/Collect.Rd b/man/Collect.Rd
index d90caca..e701a00 100644
--- a/man/Collect.Rd
+++ b/man/Collect.Rd
@@ -4,7 +4,7 @@
 \alias{Collect}
 \title{Collect and merge the computation results}
 \usage{
-Collect(startr_exec, wait = TRUE, remove = TRUE)
+Collect(startr_exec, wait = TRUE, remove = TRUE, on_remote = FALSE)
 }
 \arguments{
 \item{startr_exec}{An R object returned by Compute() when the parameter 'wait'
@@ -25,6 +25,10 @@ received from the HPC after data being collected, as well as the local job
 folder under 'ecflow_suite_dir' or 'autosubmit_suite_dir'. To preserve the
 data and Collect() them as many times as desired, set remove to FALSE. The
 default value is TRUE.}
+
+\item{on_remote}{A logical value deciding whether the function is run locally
+and syncs the outputs back from the HPC (FALSE, default), or whether it is
+run on the HPC (TRUE).}
 }
 \value{
 A list of merged data array.
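
Usage sketch (illustrative only, not part of the patch): it shows how the new 'on_remote' argument documented above is intended to be called, once from the local workstation and once from an R session on the HPC. 'res' is assumed to be the object returned by a non-blocking Compute() call; the paths, host names and cluster settings in the comments are hypothetical placeholders, and only 'startr_exec', 'wait', 'remove' and 'on_remote' are actual Collect() arguments.

# Illustrative sketch; all paths, hosts and cluster settings are placeholders.
library(startR)

# 'res' is assumed to come from a non-blocking Compute() call, e.g.
# res <- Compute(wf, chunks = list(latitude = 2, longitude = 3),
#                cluster = list(queue_host = 'hpc-login',
#                               temp_dir = '/scratch/user/startR_hpc/',
#                               bidirectional = FALSE, polling_period = 10),
#                ecflow_suite_dir = '/home/user/startR_local/', wait = FALSE)

# Default: run Collect() on the local machine; chunk results are synced back
# from the HPC and merged locally (with the ecFlow workflow manager this
# requires ecflow_client to be available).
result <- Collect(res, wait = TRUE, remove = TRUE)

# New with this patch: run Collect() in an R session on the HPC itself; the
# chunks already sitting under cluster[['temp_dir']] are merged there, with no
# transfer back and no ecflow_client needed.
result <- Collect(res, wait = TRUE, remove = TRUE, on_remote = TRUE)
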
--
GitLab

From c32d9286ec9f217bb0dd0b34cc4446a8342f6406 Mon Sep 17 00:00:00 2001
From: aho
Date: Mon, 11 Dec 2023 14:32:08 +0100
Subject: [PATCH 2/2] Clean code

---
 R/Collect.R | 206 ++++++++++++++++++++++++----------------------------
 1 file changed, 95 insertions(+), 111 deletions(-)

diff --git a/R/Collect.R b/R/Collect.R
index 05ed1b4..1bb8648 100644
--- a/R/Collect.R
+++ b/R/Collect.R
@@ -75,9 +75,7 @@
 #' }
 #'
 #'@export
-#--------NEW-------
 Collect <- function(startr_exec, wait = TRUE, remove = TRUE, on_remote = FALSE) {
-#-------NEW_END-----------
   # Parameter checks
   if (!is(startr_exec, 'startR_exec')) {
     stop("Parameter 'startr_exec' must be an object of the class ",
@@ -92,7 +90,6 @@ Collect <- function(startr_exec, wait = TRUE, remove = TRUE, on_remote = FALSE)
   if (!is.logical(remove)) {
     stop("Parameter 'remove' must be logical.")
   }
-#------NEW---------
   if (!is.logical(on_remote)) {
     stop("Parameter 'on_remote' must be logical.")
   }
@@ -102,19 +99,16 @@ Collect <- function(startr_exec, wait = TRUE, remove = TRUE, on_remote = FALSE)
   } else if (tolower(startr_exec$workflow_manager) == 'autosubmit') {
     res <- Collect_autosubmit(startr_exec, wait = wait, remove = remove, on_remote = on_remote)
   }
-#-------NEW_END----------

   return(res)
 }

-#------NEW---------
 Collect_ecflow <- function(startr_exec, wait = TRUE, remove = TRUE, on_remote = FALSE) {

   if (!on_remote && Sys.which('ecflow_client') == '') {
     stop("ecFlow must be installed in order to collect results from a ",
          "Compute() execution.")
   }
-#-------NEW_END-----------
   cluster <- startr_exec[['cluster']]
   ecflow_server <- startr_exec[['ecflow_server']]
   suite_id <- startr_exec[['suite_id']]
@@ -125,7 +119,6 @@ Collect_ecflow <- function(startr_exec, wait = TRUE, remove = TRUE, on_remote =
   ecflow_suite_dir_suite <- paste0(ecflow_suite_dir, '/STARTR_CHUNKING_',
                                    suite_id, '/')
   if (!is.null(cluster[['temp_dir']])) { #NOTE: Which case doesn't have temp_dir?
-#-------NEW---------
     remote_ecflow_suite_dir <- cluster[['temp_dir']]
     remote_ecflow_suite_dir_suite <- paste0(cluster[['temp_dir']], '/STARTR_CHUNKING_',
                                             suite_id, '/')
@@ -153,13 +146,12 @@ Collect_ecflow <- function(startr_exec, wait = TRUE, remove = TRUE, on_remote =
   }
   done <- FALSE
   attempt <- 1
-#--------NEW-----------
   if (!on_remote) {
     #TODO: Is it correct? Not all the cases have "output" as beginning
     sum_received_chunks <- sum(grepl('output.*\\.Rds',
                                      list.files(ecflow_suite_dir_suite)))
   }
-#---------NEW_END--------
+
   if (cluster[['bidirectional']]) {
     t_transfer_back <- NA
   } else {
@@ -173,95 +165,19 @@ Collect_ecflow <- function(startr_exec, wait = TRUE, remove = TRUE, on_remote =
   writeLines(rsync_petition_file_lines, rsync_petition_file)
   Sys.sleep(2)
   while (!done) {
-#-------NEW-----------
     if (!on_remote) {
-#------NEW_END----------
-    if (cluster[['bidirectional']]) {
-      status <- system(paste0("ecflow_client --get_state=STARTR_CHUNKING_",
-                              suite_id, " --host=",
-                              ecflow_server[['host']], " --port=", ecflow_server[['port']]),
-                       intern = TRUE)
-      if (any(grepl(paste0("suite STARTR_CHUNKING_", suite_id, " #.* state:complete"), status))) {
-        done <- TRUE
-      } else if (!wait) {
-        stop("Computation in progress...")
-      }
-      if (!first_chunk_received) {
-        if (any(grepl('state:complete', status))) {
-          if (!is.null(time_before_first_chunk)) {
-            time_after_first_chunk <- Sys.time()
-            estimate <- (time_after_first_chunk -
-                         time_before_first_chunk) *
-                        ceiling((prod(unlist(chunks)) - cluster[['max_jobs']]) /
-                                cluster[['max_jobs']])
-            units(estimate) <- 'mins'
-            .message(
-              paste0('Remaining time estimate (neglecting queue and ',
-                     'merge time) (at ', format(time_after_first_chunk),
-                     '): ', format(estimate), ' (',
-                     format(time_after_first_chunk -
-                            time_before_first_chunk), ' per chunk)')
-            )
-          }
-          first_chunk_received <- TRUE
-        }
-      }
-      Sys.sleep(min(sqrt(attempt), 5))
-    } else {
-      #if (sum_received_chunks == 0) {
-      #  # Accounting for the fist chunk received in ByChunks and
-      #  # setting it to complete
-      #  # ByChunks needs the first chunk to calculate remaining time
-      #  received_files <- list.files(ecflow_suite_dir_suite)
-      #  received_chunks <- received_files[grepl('Rds$',
-      #                                           received_files)]
-      #}
-      failed <- FALSE
-      t_begin_transfer_back <- Sys.time()
-      rsync_output <- tryCatch({
-        system(paste0("rsync -rav --include-from=", rsync_petition_file, " '",
-               cluster[['queue_host']], ":", remote_ecflow_suite_dir_suite, "' ",
-               ecflow_suite_dir_suite, "/"), intern = TRUE)
-      }, error = function(e) {
-        message("Warning: rsync from remote server to collect results failed. ",
-                "Retrying soon.")
-        failed <- TRUE
-      })
-      t_end_transfer_back <- Sys.time()
-      t_transfer_back <- t_transfer_back + as.numeric(difftime(t_end_transfer_back,
-                                                      t_begin_transfer_back, units = 'secs'))
-      if (!failed) {
-        #if (sum_received_chunks == 0) {
-        #  rsync_output <- c(rsync_output, received_chunks)
-        #}
-        received_running <- grepl('running$', rsync_output)
-        for (received_chunk_index in which(received_running)) {
-          file_name <- rsync_output[received_chunk_index]
-          task_name <- find_task_name(file_name)
-          system(paste0('ecflow_client --force=active recursive ',
-                        task_name,
-                        " --host=", ecflow_server[['host']],
-                        " --port=", ecflow_server[['port']]))
-        }
-        received_crashed <- grepl('crashed$', rsync_output)
-        for (received_chunk_index in which(received_crashed)) {
-          file_name <- rsync_output[received_chunk_index]
-          task_name <- find_task_name(file_name)
-          system(paste0('ecflow_client --force=aborted recursive ',
-                        task_name,
-                        " --host=", ecflow_server[['host']],
-                        " --port=", ecflow_server[['port']]))
+      if (cluster[['bidirectional']]) {
+        status <- system(paste0("ecflow_client --get_state=STARTR_CHUNKING_",
+                                suite_id, " --host=",
+                                ecflow_server[['host']], " --port=", ecflow_server[['port']]),
+                         intern = TRUE)
+        if (any(grepl(paste0("suite STARTR_CHUNKING_", suite_id, " #.* state:complete"), status))) {
+          done <- TRUE
+        } else if (!wait) {
+          stop("Computation in progress...")
         }
-        received_chunks <- grepl('Rds$', rsync_output)
-        for (received_chunk_index in which(received_chunks)) {
-          file_name <- rsync_output[received_chunk_index]
-          task_name <- find_task_name(file_name)
-          system(paste0('ecflow_client --force=complete recursive ',
-                        task_name,
-                        " --host=", ecflow_server[['host']],
-                        " --port=", ecflow_server[['port']]))
-          sum_received_chunks <- sum_received_chunks + 1
-          if (!first_chunk_received) {
+        if (!first_chunk_received) {
+          if (any(grepl('state:complete', status))) {
             if (!is.null(time_before_first_chunk)) {
               time_after_first_chunk <- Sys.time()
               estimate <- (time_after_first_chunk -
@@ -272,23 +188,97 @@ Collect_ecflow <- function(startr_exec, wait = TRUE, remove = TRUE, on_remote =
             .message(
               paste0('Remaining time estimate (neglecting queue and ',
                      'merge time) (at ', format(time_after_first_chunk),
-                     '): ', format(estimate), ' (',
-                     format(time_after_first_chunk -
+                      '): ', format(estimate), ' (',
+                      format(time_after_first_chunk -
                             time_before_first_chunk), ' per chunk)')
             )
           }
           first_chunk_received <- TRUE
         }
       }
+        Sys.sleep(min(sqrt(attempt), 5))
+      } else {
+        #if (sum_received_chunks == 0) {
+        #  # Accounting for the fist chunk received in ByChunks and
+        #  # setting it to complete
+        #  # ByChunks needs the first chunk to calculate remaining time
+        #  received_files <- list.files(ecflow_suite_dir_suite)
+        #  received_chunks <- received_files[grepl('Rds$',
+        #                                           received_files)]
+        #}
+        failed <- FALSE
+        t_begin_transfer_back <- Sys.time()
+        rsync_output <- tryCatch({
+          system(paste0("rsync -rav --include-from=", rsync_petition_file, " '",
+                 cluster[['queue_host']], ":", remote_ecflow_suite_dir_suite, "' ",
+                 ecflow_suite_dir_suite, "/"), intern = TRUE)
+        }, error = function(e) {
+          message("Warning: rsync from remote server to collect results failed. ",
+                  "Retrying soon.")
+          failed <- TRUE
+        })
+        t_end_transfer_back <- Sys.time()
+        t_transfer_back <- t_transfer_back + as.numeric(difftime(t_end_transfer_back,
+                                                        t_begin_transfer_back, units = 'secs'))
+        if (!failed) {
+          #if (sum_received_chunks == 0) {
+          #  rsync_output <- c(rsync_output, received_chunks)
+          #}
+          received_running <- grepl('running$', rsync_output)
+          for (received_chunk_index in which(received_running)) {
+            file_name <- rsync_output[received_chunk_index]
+            task_name <- find_task_name(file_name)
+            system(paste0('ecflow_client --force=active recursive ',
+                          task_name,
+                          " --host=", ecflow_server[['host']],
+                          " --port=", ecflow_server[['port']]))
+          }
+          received_crashed <- grepl('crashed$', rsync_output)
+          for (received_chunk_index in which(received_crashed)) {
+            file_name <- rsync_output[received_chunk_index]
+            task_name <- find_task_name(file_name)
+            system(paste0('ecflow_client --force=aborted recursive ',
+                          task_name,
+                          " --host=", ecflow_server[['host']],
+                          " --port=", ecflow_server[['port']]))
+          }
+          received_chunks <- grepl('Rds$', rsync_output)
+          for (received_chunk_index in which(received_chunks)) {
+            file_name <- rsync_output[received_chunk_index]
+            task_name <- find_task_name(file_name)
+            system(paste0('ecflow_client --force=complete recursive ',
+                          task_name,
+                          " --host=", ecflow_server[['host']],
+                          " --port=", ecflow_server[['port']]))
+            sum_received_chunks <- sum_received_chunks + 1
+            if (!first_chunk_received) {
+              if (!is.null(time_before_first_chunk)) {
+                time_after_first_chunk <- Sys.time()
+                estimate <- (time_after_first_chunk -
+                             time_before_first_chunk) *
+                            ceiling((prod(unlist(chunks)) - cluster[['max_jobs']]) /
+                                    cluster[['max_jobs']])
+                units(estimate) <- 'mins'
+                .message(
+                  paste0('Remaining time estimate (neglecting queue and ',
+                         'merge time) (at ', format(time_after_first_chunk),
+                         '): ', format(estimate), ' (',
+                         format(time_after_first_chunk -
+                                time_before_first_chunk), ' per chunk)')
+                )
+              }
+              first_chunk_received <- TRUE
+            }
+          }
+          if (sum_received_chunks / num_outputs == prod(unlist(chunks))) {
+            done <- TRUE
+          } else if (!wait) {
+            stop("Computation in progress...")
+          }
         }
+        Sys.sleep(cluster[['polling_period']])
       }
-      Sys.sleep(cluster[['polling_period']])
-    }
-#--------NEW----------
+    } else { # on_remote

       sum_received_chunks <- sum(grepl('.*\\.Rds$', list.files(remote_ecflow_suite_dir_suite )))

@@ -304,14 +294,11 @@ Collect_ecflow <- function(startr_exec, wait = TRUE, remove = TRUE, on_remote =
       }

     }
-#-------NEW_END---------
     attempt <- attempt + 1
   }
   file.remove(rsync_petition_file)
   timings[['transfer_back']] <- t_transfer_back
-  #------NEW--------
   if (!on_remote && !is.null(cluster[['temp_dir']])) {
-  #-------NEW_END--------
     system(paste0('ssh ', cluster[['queue_host']], ' "rm -rf ',
                   remote_ecflow_suite_dir_suite, '"'))
   }
@@ -319,7 +306,6 @@ Collect_ecflow <- function(startr_exec, wait = TRUE, remove = TRUE, on_remote =
     .warning("ATTENTION: The source chunks will be removed from the ",
              "system. Store the result after Collect() ends if needed.")
   }
-#-------NEW-------
   if (!on_remote) {
     target_folder <- ecflow_suite_dir
     target_folder_suite <- ecflow_suite_dir_suite
@@ -341,14 +327,12 @@ Collect_ecflow <- function(startr_exec, wait = TRUE, remove = TRUE, on_remote =
     timings[['compute']] <- c(timings[['compute']], times['compute'])
   }
   if (remove) {
-#--------NEW--------------
     if (!on_remote) {
       system(paste0("ecflow_client --delete=force yes /STARTR_CHUNKING_",
                     suite_id, " --host=", ecflow_server[['host']],
                     " --port=", ecflow_server[['port']]))
     }
     unlink(target_folder_suite, recursive = TRUE)
-#---------NEW_END-----------
   }
   if (attempt > 2) {
     t_end_total <- Sys.time()
--
GitLab
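
For reference, a minimal sketch of the completion test the on_remote branch relies on (the same expression appears in Collect_ecflow() above). The chunk and output counts here are made-up example values; in the package they come from the 'startr_exec' object.

# Hypothetical example values; in Collect_ecflow() they come from the
# startr_exec object (chunks, num_outputs).
chunks <- list(latitude = 2, longitude = 3)
num_outputs <- 2

# One .Rds file is expected per output of every chunk.
n_chunks <- prod(unlist(chunks))            # 6 chunk jobs in total
n_expected_files <- n_chunks * num_outputs  # 12 .Rds files expected

# Count of .Rds files found in the remote suite directory, e.g.
# sum(grepl('.*\\.Rds$', list.files(remote_ecflow_suite_dir_suite))).
sum_received_chunks <- 12

# Same condition as in the code: collection is done once every chunk has
# delivered all of its outputs.
done <- (sum_received_chunks / num_outputs) == n_chunks
done  # TRUE
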