Newer
Older
Collect <- function(startr_exec, wait = TRUE, remove = TRUE) {
if (!('startR_exec' %in% class(startr_exec))) {
stop("Parameter 'startr_exec' must be an object of the class ",
if (Sys.which('ecflow_client') == '') {
stop("ecFlow must be installed in order to collect results from a ",
"Compute() execution.")
}
ecflow_server <- startr_exec[['ecflow_server']]
ecflow_suite_dir <- startr_exec[['ecflow_suite_dir']]
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
failed <- FALSE
if (cluster[['bidirectional']]) {
Sys.sleep(min(sqrt(attempt), 5))
} else {
Sys.sleep(cluster[['polling_period']])
remote_ecflow_suite_dir_suite <- paste0(cluster[['temp_dir']], '/STARTR_CHUNKING_',
suite_id, '/')
rsync_output <- tryCatch({
system(paste0("rsync -rav ", cluster[['queue_host']], ":",
remote_ecflow_suite_dir_suite, "/*.Rds ",
ecflow_suite_dir, "/"), intern = TRUE)
}, error = function(e) {
message("Warning: rsync from remote server to collect results failed. ",
"Retrying soon.")
failed <- TRUE
})
}
if (!failed) {
if (cluster[['bidirectional']]) {
status <- system(paste0("ecflow_client --get_state=STARTR_CHUNKING_",
suite_id, " --host=",
ecflow_server[['host']], " --port=", ecflow_server[['port']]),
intern = TRUE)
if (any(grepl(paste0("suite STARTR_CHUNKING_", suite_id, " #.* state:complete"), status))) {
done <- TRUE
} else if (!wait) {
stop("Computation in progress...")
}
} else {
received_chunks <- grepl('Rds$', list.files(ecflow_suite_dir))
if (received_chunks == prod(unlist(chunks))) {
done <- TRUE
} else if (!wait) {
stop("Computation in progress...")
}
}
if (!is.null(cluster[['temp_dir']])) {
ecflow_suite_dir_suite <- paste0(cluster[['temp_dir']], '/STARTR_CHUNKING_', suite_id)
system(paste0('ssh ', cluster[['queue_host']], ' "rm -rf ',
ecflow_suite_dir_suite, '"'))
}
if (remove) {
.warning("ATTENTION: The source chunks will be removed from the ",
"system. Store the result after Collect() ends if needed.")
}
result <- startR:::.MergeChunks(ecflow_suite_dir, suite_id, remove)
if (remove) {
system(paste0("ecflow_client --delete=force yes /STARTR_CHUNKING_",
suite_id, " --host=", ecflow_server[['host']],
" --port=", ecflow_server[['port']]))
unlink(paste0(ecflow_suite_dir, '/STARTR_CHUNKING_', suite_id),
#system("ecflow_client --shutdown --port=5678")
#system("ecflow_stop.sh -p 5678")
#result <- readRDS(paste0(ecflow_output_dir, '/result.Rds'))
#file.remove(paste0(ecflow_output_dir, '/result.Rds'))