diff --git a/R/Collect.R b/R/Collect.R index bf387297722c15e61e36d6b768624aec0b73c605..33d3a5ed200437f30364483d1ed26a2171b07c3d 100644 --- a/R/Collect.R +++ b/R/Collect.R @@ -76,10 +76,7 @@ Collect <- function(startr_exec, wait = TRUE, remove = TRUE) { stop("Parameter 'startr_exec' must be an object of the class ", "'startR_exec', as returned by Collect(..., wait = FALSE).") } - if (Sys.which('ecflow_client') == '') { - stop("ecFlow must be installed in order to collect results from a ", - "Compute() execution.") - } + cluster <- startr_exec[['cluster']] ecflow_server <- startr_exec[['ecflow_server']] suite_id <- startr_exec[['suite_id']] @@ -89,11 +86,6 @@ Collect <- function(startr_exec, wait = TRUE, remove = TRUE) { timings <- startr_exec[['timings']] ecflow_suite_dir_suite <- paste0(ecflow_suite_dir, '/STARTR_CHUNKING_', suite_id, '/') - if (!is.null(cluster[['temp_dir']])) { - remote_ecflow_suite_dir_suite <- paste0(cluster[['temp_dir']], - '/STARTR_CHUNKING_', - suite_id, '/') - } find_task_name <- function(received_file) { file_name <- received_file parts <- strsplit(file_name, '__')[[1]] @@ -116,8 +108,39 @@ Collect <- function(startr_exec, wait = TRUE, remove = TRUE) { } done <- FALSE attempt <- 1 + if (Sys.which('ecflow_client') == '') { + startR:::.warning("Retrieving the data from remote cluster without ecFlow installation.") + if (timings[['nchunks']] != sum(grepl('output.*\\.Rds', + list.files(paste0(cluster[['temp_dir']], + '/STARTR_CHUNKING_', + suite_id, '/'))))) { + stop("Check all jobs jave finished successfully.") + } + # remove false to test the results + t_begin_merge <- Sys.time() + result <- startR:::.MergeChunks(cluster[['temp_dir']], suite_id, remove = FALSE) + t_end_merge <- Sys.time() + timings[['merge']] <- as.numeric(difftime(t_end_merge, t_begin_merge, units = 'secs')) + received_files <- list.files(paste0(cluster[['temp_dir']], '/STARTR_CHUNKING_', + suite_id, '/'), full.names = TRUE) + received_timings_files <- received_files[grepl('timings$', received_files)] + for (timings_file in received_timings_files) { + times <- readRDS(timings_file) + timings[['queue']] <- c(timings[['queue']], times['queue']) + timings[['job_setup']] <- c(timings[['job_setup']], times['job_setup']) + timings[['load']] <- c(timings[['load']], times['load']) + timings[['compute']] <- c(timings[['compute']], times['compute']) + } + } else { + if (!is.null(cluster[['temp_dir']])) { + remote_ecflow_suite_dir_suite <- paste0(cluster[['temp_dir']], + '/STARTR_CHUNKING_', + suite_id, '/') + } + sum_received_chunks <- sum(grepl('output.*\\.Rds', list.files(ecflow_suite_dir_suite))) + if (cluster[['bidirectional']]) { t_transfer_back <- NA } else { @@ -283,6 +306,7 @@ Collect <- function(startr_exec, wait = TRUE, remove = TRUE) { # long ago, so is not straightfowrard to work out total time. timings[['total']] <- NA } +} message(paste0("* Computation ended successfully.")) message(paste0("* Number of chunks: ", timings[['nchunks']]))