# NOTE(review): the original lines here read "Newer" / "Older" — these were
# repository diff-viewer UI artifacts captured during extraction, not R code.
# Collect the results of a Compute() execution dispatched to an HPC cluster
# through ecFlow: poll until all chunks are available, merge them, and return
# the merged array with profiling timings attached.
#
# NOTE(review): this chunk is corrupted. The stop() call just below and the
# paste0() calls at the bottom are left unterminated, and 'suite_id' /
# 'cluster' are used before any visible definition — the missing lines were
# lost in extraction. Recover them from the upstream startR::Collect()
# implementation before use.
#
# Args (as visible here):
#   startr_exec: object of class 'startR_exec' returned by Compute(),
#                carrying ecFlow server/suite info and accumulated timings.
#   wait:        if FALSE, raise an error instead of blocking until results
#                are ready.
#   remove:      if TRUE, delete the source chunk files once collected.
Collect <- function(startr_exec, wait = TRUE, remove = TRUE) {
# Validate the handle object's class.
if (!('startR_exec' %in% class(startr_exec))) {
stop("Parameter 'startr_exec' must be an object of the class ",
# NOTE(review): the stop() above is unterminated — its remaining arguments
# and closing parenthesis/brace are among the lost lines.
# The ecflow_client binary is required to query and force task states below.
if (Sys.which('ecflow_client') == '') {
stop("ecFlow must be installed in order to collect results from a ",
"Compute() execution.")
}
# Unpack execution metadata carried by the Compute() handle.
ecflow_server <- startr_exec[['ecflow_server']]
ecflow_suite_dir <- startr_exec[['ecflow_suite_dir']]
timings <- startr_exec[['timings']]
# Local and remote directories holding this suite's chunk files.
# NOTE(review): 'suite_id' and 'cluster' are presumably unpacked from
# startr_exec in the lost lines — confirm against upstream startR.
ecflow_suite_dir_suite <- paste0(ecflow_suite_dir, '/STARTR_CHUNKING_',
suite_id, '/')
remote_ecflow_suite_dir_suite <- paste0(cluster[['temp_dir']],
'/STARTR_CHUNKING_',
find_task_name <- function(received_file) {
  # Map a received chunk file name back to its ecFlow task path.
  # Chunk file names look like <prefix>__dim1_i__dim2_j__...__<suffix>:
  # each middle '__' segment encodes one chunk index per dimension.
  name_parts <- strsplit(received_file, '__')[[1]]
  # Drop the leading prefix and trailing suffix segments.
  name_parts <- name_parts[2:(length(name_parts) - 1)]
  # Extract the numeric index from each 'dim_index' segment; reversed
  # because the task tree nests dimensions in the opposite order.
  indices <- rev(vapply(name_parts, function(segment) {
    as.numeric(strsplit(segment, '_')[[1]][2])
  }, numeric(1)))
  # Build a glob over the suite directory matching exactly this chunk's
  # nested task directories, e.g. '*_3/*_1/'.
  glob_pattern <- paste0(ecflow_suite_dir_suite, '/*/*/',
                         paste(paste0('*_', indices, '/'), collapse = ''))
  matches <- Sys.glob(glob_pattern)
  if (length(matches) != 1) {
    stop("Unexpected error while receiving results.")
  }
  # Keep everything after the 'computation' component and rebuild the
  # absolute ecFlow node path for this suite.
  relative_part <- strsplit(matches, 'computation')[[1]][2]
  paste0('/STARTR_CHUNKING_', suite_id, '/computation', relative_part)
}
# Counters and timers for the result-collection polling loop.
sum_received_chunks <- 0
# In bidirectional mode results are already on this machine, so there is no
# transfer-back time to measure.
if (cluster[['bidirectional']]) {
t_transfer_back <- NA
} else {
t_transfer_back <- 0
}
time_before_first_chunk <- startr_exec[['t_begin_first_chunk']]
first_chunk_received <- FALSE
# rsync filter file: pull only result (.Rds), timing, crash-marker and
# running-marker files from the remote suite directory.
rsync_petition_file_lines <- c('+ *.Rds', '+ *.timings', '+ *.crashed',
'+ *.running', '- *')
rsync_petition_file <- tempfile()
writeLines(rsync_petition_file_lines, rsync_petition_file)
failed <- FALSE
# NOTE(review): the polling-loop header (and the 'attempt' counter used in
# the Sys.sleep below) is not visible in this chunk — presumably lost in
# extraction; confirm against upstream startR::Collect().
if (cluster[['bidirectional']]) {
# Bidirectional cluster: query the ecFlow server for the suite state
# directly instead of transferring files back.
status <- system(paste0("ecflow_client --get_state=STARTR_CHUNKING_",
suite_id, " --host=",
ecflow_server[['host']], " --port=", ecflow_server[['port']]),
intern = TRUE)
if (any(grepl(paste0("suite STARTR_CHUNKING_", suite_id, " #.* state:complete"), status))) {
done <- TRUE
} else if (!wait) {
# Non-blocking mode: results not ready, bail out with an error.
stop("Computation in progress...")
}
if (!first_chunk_received) {
# On the first completed chunk, estimate the remaining runtime by
# extrapolating the first chunk's duration over the pending chunks.
if (any(grepl('state:complete', status))) {
if (!is.null(time_before_first_chunk)) {
time_after_first_chunk <- Sys.time()
# NOTE(review): 'chunks' is not defined in this visible chunk —
# presumably unpacked from startr_exec in the lost lines.
estimate <- (time_after_first_chunk -
time_before_first_chunk) *
ceiling((prod(unlist(chunks)) - cluster[['max_jobs']]) /
cluster[['max_jobs']])
units(estimate) <- 'mins'
startR:::.message(
paste0('Remaining time estimate (neglecting queue and ',
'merge time) (at ', format(time_after_first_chunk),
'): ', format(estimate), ' (',
format(time_after_first_chunk -
time_before_first_chunk), ' per chunk)')
)
}
first_chunk_received <- TRUE
}
# Back off polling: sqrt of the attempt number, capped at 5 seconds.
Sys.sleep(min(sqrt(attempt), 5))
} else {
#if (sum_received_chunks == 0) {
# # Accounting for the fist chunk received in ByChunks and
# # setting it to complete
# # ByChunks needs the first chunk to calculate remaining time
# received_files <- list.files(ecflow_suite_dir_suite)
# received_chunks <- received_files[grepl('Rds$',
# received_files)]
#}
# Unidirectional cluster: rsync the result files back from the remote
# suite directory, timing the transfer.
t_begin_transfer_back <- Sys.time()
system(paste0("rsync -rav --include-from=", rsync_petition_file, " '",
cluster[['queue_host']], ":", remote_ecflow_suite_dir_suite, "' ",
# NOTE(review): the rsync command above is unterminated, and the matching
# 'tryCatch({' opening for the error handler below is missing — both were
# lost in extraction.
}, error = function(e) {
message("Warning: rsync from remote server to collect results failed. ",
"Retrying soon.")
failed <- TRUE
})
t_end_transfer_back <- Sys.time()
# Accumulate transfer-back time across polling iterations.
t_transfer_back <- t_transfer_back + as.numeric(difftime(t_end_transfer_back,
t_begin_transfer_back, units = 'secs'))
#if (sum_received_chunks == 0) {
# rsync_output <- c(rsync_output, received_chunks)
#}
# NOTE(review): original source lines 118-143 were lost during extraction —
# only the diff-viewer line-number gutter survived here. They presumably
# contained the tail of the rsync transfer-back block (including the
# assignment of 'rsync_output' used below); recover them from the upstream
# startR::Collect() implementation.
# Classify each received file by its suffix and mirror that state into the
# ecFlow suite, so the server-side task tree matches what actually happened.
# NOTE(review): 'rsync_output' is not assigned in any visible line —
# presumably produced by the rsync/listing code in the lost lines above.
received_running <- grepl('running$', rsync_output)
for (received_chunk_index in which(received_running)) {
file_name <- rsync_output[received_chunk_index]
task_name <- find_task_name(file_name)
# Chunk reported as still running: force its ecFlow task to 'active'.
system(paste0('ecflow_client --force=active recursive ',
task_name,
" --host=", ecflow_server[['host']],
" --port=", ecflow_server[['port']]))
}
received_crashed <- grepl('crashed$', rsync_output)
for (received_chunk_index in which(received_crashed)) {
file_name <- rsync_output[received_chunk_index]
task_name <- find_task_name(file_name)
# Chunk crashed on the cluster: mark its ecFlow task as 'aborted'.
system(paste0('ecflow_client --force=aborted recursive ',
task_name,
" --host=", ecflow_server[['host']],
" --port=", ecflow_server[['port']]))
}
received_chunks <- grepl('Rds$', rsync_output)
for (received_chunk_index in which(received_chunks)) {
file_name <- rsync_output[received_chunk_index]
task_name <- find_task_name(file_name)
# Result file received: mark the ecFlow task as 'complete'.
system(paste0('ecflow_client --force=complete recursive ',
task_name,
" --host=", ecflow_server[['host']],
" --port=", ecflow_server[['port']]))
sum_received_chunks <- sum_received_chunks + 1
if (!first_chunk_received) {
# Same remaining-time extrapolation as in the bidirectional branch.
if (!is.null(time_before_first_chunk)) {
time_after_first_chunk <- Sys.time()
estimate <- (time_after_first_chunk -
time_before_first_chunk) *
ceiling((prod(unlist(chunks)) - cluster[['max_jobs']]) /
cluster[['max_jobs']])
units(estimate) <- 'mins'
startR:::.message(
paste0('Remaining time estimate (neglecting queue and ',
'merge time) (at ', format(time_after_first_chunk),
'): ', format(estimate), ' (',
format(time_after_first_chunk -
time_before_first_chunk), ' per chunk)')
)
}
first_chunk_received <- TRUE
}
# All chunks are in once received-files / outputs-per-chunk reaches the
# total chunk count.
# NOTE(review): 'num_outputs' and 'done' come from lines not visible in
# this chunk — confirm against upstream startR.
if (sum_received_chunks / num_outputs == prod(unlist(chunks))) {
done <- TRUE
} else if (!wait) {
stop("Computation in progress...")
}
}
Sys.sleep(cluster[['polling_period']])
file.remove(rsync_petition_file)
timings[['transfer_back']] <- t_transfer_back
# Clean up the remote temporary suite directory, if one was used.
# NOTE(review): this ssh command is unterminated — its tail was lost in
# extraction.
if (!is.null(cluster[['temp_dir']])) {
system(paste0('ssh ', cluster[['queue_host']], ' "rm -rf ',
if (remove) {
.warning("ATTENTION: The source chunks will be removed from the ",
"system. Store the result after Collect() ends if needed.")
}
# Merge the received chunk files into the final result array, timing it.
t_begin_merge <- Sys.time()
result <- startR:::.MergeChunks(ecflow_suite_dir, suite_id, remove)
t_end_merge <- Sys.time()
timings[['merge']] <- as.numeric(difftime(t_end_merge, t_begin_merge, units = 'secs'))
# Gather the per-chunk timing files and append their measurements to the
# corresponding timing vectors.
received_files <- list.files(ecflow_suite_dir_suite, full.names = TRUE)
received_timings_files <- received_files[grepl('timings$', received_files)]
for (timings_file in received_timings_files) {
times <- readRDS(timings_file)
timings[['queue']] <- c(timings[['queue']], times['queue'])
timings[['job_setup']] <- c(timings[['job_setup']], times['job_setup'])
timings[['load']] <- c(timings[['load']], times['load'])
timings[['compute']] <- c(timings[['compute']], times['compute'])
}
if (remove) {
# Drop the finished suite from the ecFlow server.
system(paste0("ecflow_client --delete=force yes /STARTR_CHUNKING_",
suite_id, " --host=", ecflow_server[['host']],
" --port=", ecflow_server[['port']]))
# Total wall time is only meaningful if we actually had to wait.
if (attempt > 2) {
t_end_total <- Sys.time()
timings[['total']] <- as.numeric(difftime(t_end_total, timings[['total']], units = 'secs'))
} else {
# When attempt <= 2, it means all results were ready possibly from
# long ago, so it is not straightforward to work out total time.
timings[['total']] <- NA
}
# Report a summary of the execution profile to the user.
message(paste0("* Computation ended successfully."))
message(paste0("* Number of chunks: ",
timings[['nchunks']]))
message(paste0("* Max. number of concurrent chunks (jobs): ",
timings[['concurrent_chunks']]))
message(paste0("* Requested cores per job: ",
timings[['cores_per_job']]))
message(paste0("* Load threads per chunk: ",
timings[['threads_load']]))
message(paste0("* Compute threads per chunk: ",
timings[['threads_compute']]))
message(paste0("* Total time (s): ",
timings[['total']]))
message(paste0("* Chunking setup: ",
timings[['bychunks_setup']]))
message(paste0("* Data upload to cluster: ",
timings[['transfer']]))
# Time spent in the chunks themselves = total minus all overheads.
message(paste0("* All chunks: ",
timings[['total']] -
timings[['bychunks_setup']] -
timings[['transfer']] -
timings[['transfer_back']] -
timings[['merge']]))
message(paste0("* Transfer results from cluster: ",
timings[['transfer_back']]))
message(paste0("* Merge: ",
timings[['merge']]))
# NOTE(review): original source lines 239-267 were lost during extraction —
# only the diff-viewer line-number gutter survived here. Recover them from
# the upstream startR::Collect() implementation.
# Per-chunk timing statistics: mean/min/max over the queue, job-setup, load
# and compute phases collected from each chunk's .timings file.
message(paste0("* Each chunk: "))
message(paste0("* queue: "))
message(paste0("* mean: ",
mean(timings[['queue']])))
message(paste0("* min: ",
min(timings[['queue']])))
message(paste0("* max: ",
max(timings[['queue']])))
message(paste0("* job setup: "))
message(paste0("* mean: ",
mean(timings[['job_setup']])))
message(paste0("* min: ",
min(timings[['job_setup']])))
message(paste0("* max: ",
max(timings[['job_setup']])))
message(paste0("* load: "))
message(paste0("* mean: ",
mean(timings[['load']])))
message(paste0("* min: ",
min(timings[['load']])))
message(paste0("* max: ",
max(timings[['load']])))
message(paste0("* compute: "))
message(paste0("* mean: ",
mean(timings[['compute']])))
message(paste0("* min: ",
min(timings[['compute']])))
message(paste0("* max: ",
max(timings[['compute']])))
#system("ecflow_client --shutdown --port=5678")
#system("ecflow_stop.sh -p 5678")
#result <- readRDS(paste0(ecflow_output_dir, '/result.Rds'))
#file.remove(paste0(ecflow_output_dir, '/result.Rds'))
# Attach the collected timing profile to the result for later inspection
# via attr(result, 'startR_compute_profiling').
attr(result, 'startR_compute_profiling') <- timings
# NOTE(review): the visible chunk ends here without returning 'result' or
# closing the Collect function — the tail was lost in extraction.