# Collect the results of a chunked startR execution.
#
# Waits for (or checks once on) the ecFlow suite identified by `startr_exec`,
# merges the per-chunk result files through startR:::.MergeChunks() and
# returns the merged result.
#
# Arguments:
#   startr_exec  Object of class 'startR_exec'. The elements 'cluster',
#                'ecflow_server', 'suite_id', 'chunks' and
#                'ecflow_suite_dir' are read from it.
#   wait         If TRUE (default), poll until the computation is complete.
#                If FALSE, stop with "Computation in progress..." when the
#                results are not yet complete.
#   remove       If TRUE (default), the suite is deleted from the ecFlow
#                server and the local suite directory is removed after the
#                chunks have been merged. The flag is also forwarded to
#                .MergeChunks().
#
# Returns the value produced by startR:::.MergeChunks().
#
# Errors are raised when `startr_exec` has the wrong class, when the
# 'ecflow_client' executable is not found, when polling mode is requested
# without a remote working directory, or when `wait = FALSE` and the
# computation has not finished.
Collect <- function(startr_exec, wait = TRUE, remove = TRUE) {
  if (!inherits(startr_exec, 'startR_exec')) {
    stop("Parameter 'startr_exec' must be an object of the class ",
         "'startR_exec', as returned by Compute(..., wait = FALSE).")
  }
  if (Sys.which('ecflow_client') == '') {
    stop("ecFlow must be installed in order to collect results from a ",
         "Compute() execution.")
  }

  cluster <- startr_exec[['cluster']]
  ecflow_server <- startr_exec[['ecflow_server']]
  suite_id <- startr_exec[['suite_id']]
  chunks <- startr_exec[['chunks']]
  ecflow_suite_dir <- startr_exec[['ecflow_suite_dir']]

  # The local suite directory lives under 'ecflow_suite_dir', which is the
  # directory later handed to .MergeChunks(). It is always defined so the
  # final clean-up works whether or not a remote 'temp_dir' is configured.
  ecflow_suite_dir_suite <- paste0(ecflow_suite_dir, '/STARTR_CHUNKING_',
                                   suite_id, '/')
  # The remote suite directory only exists when the cluster declares a
  # 'temp_dir'.
  remote_ecflow_suite_dir_suite <- NULL
  if (!is.null(cluster[['temp_dir']])) {
    remote_ecflow_suite_dir_suite <- paste0(cluster[['temp_dir']],
                                            '/STARTR_CHUNKING_',
                                            suite_id, '/')
  }

  done <- FALSE
  attempt <- 1
  while (!done) {
    if (cluster[['bidirectional']]) {
      # Ask the ecFlow server for the suite state, backing off up to 5 s
      # between attempts.
      Sys.sleep(min(sqrt(attempt), 5))
      status <- system(paste0("ecflow_client --get_state=STARTR_CHUNKING_",
                              suite_id,
                              " --host=", ecflow_server[['host']],
                              " --port=", ecflow_server[['port']]),
                       intern = TRUE)
      if (any(grepl(paste0("suite STARTR_CHUNKING_", suite_id,
                           " #.* state:complete"), status))) {
        done <- TRUE
      } else if (!wait) {
        stop("Computation in progress...")
      }
    } else {
      # Polling mode: pull finished chunk files from the remote host and
      # count how many have arrived.
      if (is.null(remote_ecflow_suite_dir_suite)) {
        stop("The cluster configuration must define 'temp_dir' in order ",
             "to collect results from a non-bidirectional cluster.")
      }
      Sys.sleep(cluster[['polling_period']])
      # The handler's return value carries the failure flag: an assignment
      # inside the handler would only modify a variable local to it.
      failed <- tryCatch({
        system(paste0("rsync -rav ", cluster[['queue_host']], ":",
                      remote_ecflow_suite_dir_suite, "/*.Rds ",
                      ecflow_suite_dir_suite, "/"),
               intern = TRUE)
        FALSE
      }, error = function(e) {
        message("Warning: rsync from remote server to collect results ",
                "failed. Retrying soon.")
        TRUE
      })
      if (!failed) {
        # Count the received '.Rds' files; the computation is complete when
        # there is one per chunk.
        received_chunks <- length(list.files(ecflow_suite_dir_suite,
                                             pattern = 'Rds$'))
        if (received_chunks == prod(unlist(chunks))) {
          done <- TRUE
        } else if (!wait) {
          stop("Computation in progress...")
        }
      }
    }
    attempt <- attempt + 1
  }

  # Clean up the remote working directory once everything has been fetched.
  if (!is.null(cluster[['temp_dir']])) {
    system(paste0('ssh ', cluster[['queue_host']], ' "rm -rf ',
                  remote_ecflow_suite_dir_suite, '"'))
  }
  if (remove) {
    .warning("ATTENTION: The source chunks will be removed from the ",
             "system. Store the result after Collect() ends if needed.")
  }
  result <- startR:::.MergeChunks(ecflow_suite_dir, suite_id, remove)
  if (remove) {
    system(paste0("ecflow_client --delete=force yes /STARTR_CHUNKING_",
                  suite_id, " --host=", ecflow_server[['host']],
                  " --port=", ecflow_server[['port']]))
    unlink(ecflow_suite_dir_suite, recursive = TRUE)
  }
  # Disabled legacy clean-up, kept for reference:
  #system("ecflow_client --shutdown --port=5678")
  #system("ecflow_stop.sh -p 5678")
  #result <- readRDS(paste0(ecflow_output_dir, '/result.Rds'))
  #file.remove(paste0(ecflow_output_dir, '/result.Rds'))
  result
}