Commit f4721419 authored by nperez's avatar nperez
Browse files

First version to collect result from cluster

parent ec8ebfba
Pipeline #6665 passed with stage
in 58 minutes and 18 seconds
......@@ -76,10 +76,7 @@ Collect <- function(startr_exec, wait = TRUE, remove = TRUE) {
stop("Parameter 'startr_exec' must be an object of the class ",
"'startR_exec', as returned by Collect(..., wait = FALSE).")
}
if (Sys.which('ecflow_client') == '') {
stop("ecFlow must be installed in order to collect results from a ",
"Compute() execution.")
}
cluster <- startr_exec[['cluster']]
ecflow_server <- startr_exec[['ecflow_server']]
suite_id <- startr_exec[['suite_id']]
......@@ -89,11 +86,6 @@ Collect <- function(startr_exec, wait = TRUE, remove = TRUE) {
timings <- startr_exec[['timings']]
ecflow_suite_dir_suite <- paste0(ecflow_suite_dir, '/STARTR_CHUNKING_',
suite_id, '/')
if (!is.null(cluster[['temp_dir']])) {
remote_ecflow_suite_dir_suite <- paste0(cluster[['temp_dir']],
'/STARTR_CHUNKING_',
suite_id, '/')
}
find_task_name <- function(received_file) {
file_name <- received_file
parts <- strsplit(file_name, '__')[[1]]
......@@ -116,8 +108,39 @@ Collect <- function(startr_exec, wait = TRUE, remove = TRUE) {
}
done <- FALSE
attempt <- 1
if (Sys.which('ecflow_client') == '') {
startR:::.warning("Retrieving the data from remote cluster without ecFlow installation.")
if (timings[['nchunks']] != sum(grepl('output.*\\.Rds',
list.files(paste0(cluster[['temp_dir']],
'/STARTR_CHUNKING_',
suite_id, '/'))))) {
stop("Check all jobs jave finished successfully.")
}
# remove false to test the results
t_begin_merge <- Sys.time()
result <- startR:::.MergeChunks(cluster[['temp_dir']], suite_id, remove = FALSE)
t_end_merge <- Sys.time()
timings[['merge']] <- as.numeric(difftime(t_end_merge, t_begin_merge, units = 'secs'))
received_files <- list.files(paste0(cluster[['temp_dir']], '/STARTR_CHUNKING_',
suite_id, '/'), full.names = TRUE)
received_timings_files <- received_files[grepl('timings$', received_files)]
for (timings_file in received_timings_files) {
times <- readRDS(timings_file)
timings[['queue']] <- c(timings[['queue']], times['queue'])
timings[['job_setup']] <- c(timings[['job_setup']], times['job_setup'])
timings[['load']] <- c(timings[['load']], times['load'])
timings[['compute']] <- c(timings[['compute']], times['compute'])
}
} else {
if (!is.null(cluster[['temp_dir']])) {
remote_ecflow_suite_dir_suite <- paste0(cluster[['temp_dir']],
'/STARTR_CHUNKING_',
suite_id, '/')
}
sum_received_chunks <- sum(grepl('output.*\\.Rds',
list.files(ecflow_suite_dir_suite)))
if (cluster[['bidirectional']]) {
t_transfer_back <- NA
} else {
......@@ -283,6 +306,7 @@ Collect <- function(startr_exec, wait = TRUE, remove = TRUE) {
# long ago, so is not straightfowrard to work out total time.
timings[['total']] <- NA
}
}
message(paste0("* Computation ended successfully."))
message(paste0("* Number of chunks: ",
timings[['nchunks']]))
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment