Newer
Older
Collect <- function(startr_exec, wait = TRUE, remove = TRUE) {
if (!('startR_exec' %in% class(startr_exec))) {
stop("Parameter 'startr_exec' must be an object of the class ",
"'startR_exec', as returned by ByChunks(..., wait = FALSE).")
}
cluster <- startr_exec[['cluster']]
ecflow_host <- startr_exec[['ecflow_host']]
suite_id <- startr_exec[['suite_id']]
shared_dir <- startr_exec[['shared_dir']]
done <- FALSE
attempt <- 1
while (!done) {
Sys.sleep(min(sqrt(attempt), 5))
status <- system(paste0("ecflow_client --get_state=STARTR_CHUNKING_",
suite_id, " --host=",
ecflow_host[['name']], " --port=", ecflow_host[['port']]),
intern = TRUE)
if (any(grepl(paste0("suite STARTR_CHUNKING_", suite_id, " #.* state:complete"), status))) {
done <- TRUE
} else if (!wait) {
stop("Computation in progress...")
}
attempt <- attempt + 1
}
if (remove) {
.warning("ATTENTION: The source chunks will be removed from the ",
"system. Store the result after Collect() ends if needed.")
}
result <- startR:::.MergeChunks(shared_dir, suite_id, remove)
if (remove) {
system(paste0("ecflow_client --delete=force yes /STARTR_CHUNKING_",
suite_id, " --host=", ecflow_host[['name']],
" --port=", ecflow_host[['port']]))
unlink(paste0(shared_dir, '/STARTR_CHUNKING_', suite_id),
recursive = TRUE)
}