Newer
Older
ByChunks <- function(fun, cube_headers, ..., chunks = 'auto',
MergeArrays <- startR:::.MergeArrays
# Check input headers
if ('startR_header' %in% class(cube_headers)) {
cube_headers <- list(cube_headers)
}
if (!all(sapply(lapply(cube_headers, class),
function(x) 'startR_header' %in% x))) {
stop("All objects passed in 'cube_headers' must be of class 'startR_header', ",
"as returned by Start().")
}
# Check fun
if (!is.function(fun)) {
stop("Parameter 'fun' must be a function.")
}
# Check cores
if (!is.numeric(ncores)) {
stop("Parameter 'ncores' must be a numeric value.")
}
ncores <- round(ncores)
cores_per_job = 1,
job_wallclock = '01:00:00',
if (!is.null(cluster)) {
if (!is.list(cluster)) {
stop("Parameter 'cluster' must be a named list.")
}
if (is.null(names(cluster))) {
stop("Parameter 'cluster' must be a named list.")
}
if (any(!(names(cluster) %in% c('queue_host', 'node_memory', 'cores_per_job', 'job_wallclock', 'max_jobs')))) {
stop("Found invalid component names in parameter 'cluster'.")
}
default_cluster[names(cluster)] <- cluster
}
if (on_cluster) {
if (Sys.which('ecflow_client') == '') {
stop("ecFlow must be installed in order to run the computation on clusters.")
}
if (is.null(cluster[['queue_host']])) {
stop("The component 'queue_host' of the parameter 'cluster' must be specified.")
}
}
# Check shared_dir
if (on_cluster) {
if (is.null(shared_dir)) {
stop("Parameter 'shared_dir' must be specified when a cluster is specified.")
} else {
shared_dir <- paste0(shared_dir, '/STARTR_CHUNKING_', suite_id, '/')
dir.create(shared_dir, recursive = TRUE)
if (!dir.exists(shared_dir)) stop("Could not find or create the directory in ",
"parameter 'shared_dir'.")
}
}
# Check ecflow_host
if (is.null(ecflow_host)) {
if (on_cluster) {
}
}
# Check silent
if (!is.logical(silent)) {
stop("Parameter 'silent' must be logical.")
}
# Check debug
if (!is.logical(debug)) {
stop("Parameter 'debug' must be logical.")
}
if (silent) {
debug <- FALSE
}
# Work out chunked dimensions and target dimensions
all_dims <- lapply(cube_headers, attr, 'Dimensions')
all_dims_merged <- NULL
for (i in all_dims) {
if (is.null(all_dims_merged)) {
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
all_dims_merged <- startR:::.MergeArrayDims(all_dims_merged, all_dims[i])[[1]]
}
}
all_dimnames <- names(all_dims_merged)
target_dims_indices <- which(all_dimnames %in% unlist(attr(fun, 'target_dims')))
target_dims <- NULL
if (length(target_dims_indices) > 0) {
target_dims <- all_dimnames[target_dims_indices]
}
chunked_dims <- all_dimnames
if (length(target_dims_indices) > 0) {
chunked_dims <- chunked_dims[-target_dims_indices]
}
if (length(chunked_dims) < 1) {
stop("Not possible to process input by chunks. All input dimensions are ",
"target dimensions.")
}
if (length(cube_headers) != length(attr(fun, 'target_dims'))) {
stop("Number of inputs in parameter 'cube_headers' must be equal to the ",
"number of inputs expected by the function 'fun'.")
}
# Check all input headers have matching dimensions
cube_index <- 1
for (cube_header in cube_headers) {
if (!all(attr(cube_header, 'Dimensions') == unlist(all_dims)[names(attr(cube_header, 'Dimensions'))])) {
stop("All provided 'cube_headers' must have matching dimension lengths ",
"with each other.")
}
if (!all(attr(fun, 'target_dims')[[cube_index]] %in% names(attr(cube_header, 'Dimensions')))) {
stop("All provided 'cube_headers' must contain at least the target dimensions ",
"expected by 'fun'.")
}
cube_index <- cube_index + 1
# work out expected result dimensions
}
# Check chunks
default_chunks <- as.list(rep(1, length(chunked_dims)))
names(default_chunks) <- chunked_dims
chunks <- default_chunks
}
if (!is.list(chunks)) {
stop("Parameter 'chunks' must be a named list or 'auto'.")
}
if (is.null(names(chunks))) {
stop("Parameter 'chunks' must be a named list or 'auto'.")
}
stop("All names in parameter 'chunks' must be one of the non-target dimensions ",
"present in the cubes in 'cube_headers'. The target dimensions are ",
paste(paste0("'", target_dims, "'"), collapse = ', '), ". The non-target ",
"dimensions (margins) are ", paste(paste0("'", chunked_dims, "'"), collapse = ', '), ".")
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
}
if (any(!(((unlist(chunks) %% 1) == 0) | (unlist(chunks) == 'all')))) {
stop("All values in parameter 'chunks' must take a numeric value or 'all'.")
}
if (any(unlist(chunks) < 1)) {
stop("All values in parameter 'chunks' must be >= 1.")
}
for (chunk_spec in 1:length(chunks)) {
if (chunks[[chunk_spec]] > unlist(all_dims)[names(chunks)[chunk_spec]]) {
stop("Too many chunks requested for the dimension ", names(chunks)[chunk_spec],
". Maximum allowed is ", unlist(all_dims)[names(chunks)[chunk_spec]])
}
}
default_chunks[names(chunks)] <- chunks
chunks <- default_chunks
# Check fun
if (!('startR_step' %in% class(fun))) {
stop("Parameter 'fun' must be of the class 'startR_step', as returned ",
"by the function Step.")
}
# Replace 'all's
chunks_all <- which(unlist(chunks) == 'all')
if (length(chunks_all) > 0) {
chunks[chunks_all] <- all_dims[names(chunks)[chunks_all]]
}
# Mount the ecFlow suite
if (on_cluster) {
# Copy load_process_save_chunk.R into shared folder
chunk_script <- file(system.file('chunking/load_process_save_chunk.R',
package = 'startR'))
chunk_script_lines <- readLines(chunk_script)
chunk_script_lines <- gsub('^shared_dir <- *', paste0('shared_dir <- ',
paste(deparse(shared_dir), collapse = '\n')), chunk_script_lines)
chunk_script_lines <- gsub('^debug <- *', paste0('debug <- ', paste(deparse(debug), collapse = '\n')),
chunk_script_lines)
chunk_script_lines <- gsub('^start_calls <- *', paste0('start_calls <- ', paste(suppressWarnings(deparse(cube_headers, control = 'quoteExpressions')), collapse = '\n')),
chunk_script_lines)
chunk_script_lines <- gsub('^start_calls_dims <- *', paste0('start_calls_dims <- ', paste(deparse(lapply(cube_headers, attr, 'Dimensions')), collapse = '\n')),
chunk_script_lines)
chunk_script_lines <- gsub('^param_dimnames <- *', paste0('param_dimnames <- ', paste(deparse(chunked_dims), collapse = '\n')),
chunk_script_lines)
chunk_script_lines <- gsub('^cores_per_job <- *', paste0('cores_per_job <- ', cluster[['cores_per_job']]),
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
chunk_script_lines)
chunk_script_lines <- gsub('^fun <- *', paste0('fun <- ', paste(deparse(fun), collapse = '\n')),
chunk_script_lines)
chunk_script_lines <- gsub('^params <- *', paste0('params <- ', paste(deparse(list(...)), collapse = '\n')),
chunk_script_lines)
writeLines(chunk_script_lines, paste0(shared_dir, '/load_process_save_chunk.R'))
# Copy Chunk.ecf into shared folder
chunk_ecf_script <- file(system.file('chunking/Chunk.ecf',
package = 'startR'))
chunk_ecf_script_lines <- readLines(chunk_ecf_script)
ecf_vars <- paste(paste0('%', as.vector(sapply(chunked_dims,
function(x) {
c(toupper(x), paste0(toupper(x), '_N'))
})), '%'), collapse = ' ')
chunk_ecf_script_lines <- gsub('^Rscript load_process_save_chunk.R --args *',
paste0('Rscript load_process_save_chunk.R --args ', ecf_vars),
chunk_ecf_script_lines)
writeLines(chunk_ecf_script_lines, paste0(shared_dir, '/Chunk.ecf'))
# Copy merge_chunks.R into tmp folder
merge_script <- file(system.file('chunking/merge_chunks.R',
package = 'startR'))
merge_script_lines <- readLines(merge_script)
merge_script_lines <- gsub('^shared_dir <- *', paste0('shared_dir <- ',
paste(deparse(shared_dir), collapse = '\n')), merge_script_lines)
writeLines(merge_script_lines, paste0(shared_dir, '/merge_chunks.R'))
# Copy Merge.ecf into tmp folder
#TODO: Modify chain of parameters sent to r script when merging
#chunks progressively
merge_ecf_script <- file(system.file('chunking/Merge.ecf',
package = 'startR'))
merge_ecf_script_lines <- readLines(merge_ecf_script)
writeLines(merge_ecf_script_lines, paste0(shared_dir, '/Merge.ecf'))
# Copy headers
file.copy(system.file('chunking/head.h', package = 'startR'),
shared_dir)
file.copy(system.file('chunking/tail.h', package = 'startR'),
shared_dir)
file.copy(system.file('chunking/slurm.h', package = 'startR'),
shared_dir)
}
add_line <- function(suite, line, tabs) {
c(suite, paste0(paste(rep(' ', tabs), collapse = ''), line))
}
suite <- NULL
tabs <- 0
suite <- add_line(suite, paste0('suite STARTR_CHUNKING_', suite_id), tabs)
tabs <- tabs + 2
suite <- add_line(suite, paste0("edit ECF_HOME '", shared_dir, "'"), tabs)
suite <- add_line(suite, paste0("edit ECF_FILES '", shared_dir, "'"), tabs)
suite <- add_line(suite, paste0("edit ECF_INCLUDE '", shared_dir, "'"), tabs)
suite <- add_line(suite, paste0("edit CORES_PER_JOB ", cluster[['cores_per_job']], ""), tabs)
suite <- add_line(suite, paste0("edit JOB_WALLCLOCK '", cluster[['job_wallclock']], "'"), tabs)
suite <- add_line(suite, paste0("limit max_jobs ", cluster[['max_jobs']]), tabs)
suite <- add_line(suite, paste0("inlimit max_jobs"), tabs)
suite <- add_line(suite, paste0("edit HOST '", cluster[['queue_host']], "'"), tabs)
suite <- add_line(suite, "edit ECF_JOB_CMD 'ssh %HOST% 'sbatch %ECF_JOB% > %ECF_JOBOUT% 2>&1 &''", tabs)
suite <- add_line(suite, "family computation", tabs)
tabs <- tabs + 2
if (!silent) {
startR:::.message(paste0("Processing chunks... ",
"remaining time estimate soon..."))
}
time_before_first_chunk <- Sys.time()
time_after_first_chunk <- NULL
# Empty list to collect results
result <- vector('list', length(attr(fun, 'output_dims')))
# Open nested ecFlow families
suite <- add_line(suite, paste0('family ', chunked_dims[i], '_CHUNK_', 1), tabs)
suite <- add_line(suite, paste0('edit ', toupper(chunked_dims[i]), ' ', 1), tabs)
suite <- add_line(suite, paste0('edit ', toupper(chunked_dims[i]), '_N ', chunks[[chunked_dims[i]]]), tabs)
chunk_array <- array(1:prod(unlist(chunks)), dim = (unlist(chunks)))
array_of_results <- vector('list', prod((unlist(chunks))))
dim(array_of_results) <- (unlist(chunks))
previous_chunk_indices <- rep(1, length(chunks))
for (i in 1:length(chunk_array)) {
chunk_indices <- which(chunk_array == i, arr.ind = TRUE)[1, ]
names(chunk_indices) <- names(dim(chunk_array))
# ADD CHUNK SCRIPT TO SUITE
families_to_jump <- which(chunk_indices != previous_chunk_indices)
if (length(families_to_jump) > 0) {
families_to_jump <- max(families_to_jump)
# Close ecFlow families
for (j in 1:families_to_jump) {
tabs <- tabs - 2
suite <- add_line(suite, paste0('endfamily'), tabs)
}
# Open ecFlow families
for (j in families_to_jump:1) {
suite <- add_line(suite, paste0('family ', (chunked_dims)[j], '_CHUNK_', chunk_indices[j]), tabs)
tabs <- tabs + 2
suite <- add_line(suite, paste0('edit ', toupper((chunked_dims)[j]), ' ', chunk_indices[j]), tabs)
suite <- add_line(suite, paste0('edit ', toupper((chunked_dims)[j]), '_N ', chunks[[(chunked_dims)[j]]]), tabs)
}
}
suite <- add_line(suite, "task Chunk", tabs)
if (!on_cluster) {
if (!silent) {
startR:::.message(paste("Loading chunk", i,
"out of", length(chunk_array), "..."))
}
data <- vector('list', length(cube_headers))
for (input in 1:length(data)) {
start_call <- cube_headers[[input]]
dims_to_alter <- which(names(attr(start_call, 'Dimensions')) %in% names(chunks))
if (length(dims_to_alter) > 0) {
for (call_dim in names(attr(start_call, 'Dimensions'))[dims_to_alter]) {
start_call[[call_dim]] <- chunk(chunk_indices[call_dim], chunks[[call_dim]],
eval(start_call[[call_dim]]))
}
data[[input]] <- eval(start_call)
}
if (!silent) {
startR:::.message(paste("Processing..."))
}
array_of_results[[i]] <- list(output1 = multiApply::Apply(data,
inverse_margins = attr(fun, 'target_dims'),
rm(data)
gc()
}
# Time estimate
if (!on_cluster) {
if (is.null(time_after_first_chunk)) {
time_after_first_chunk <- Sys.time()
if (!silent) {
estimate <- (time_after_first_chunk -
time_before_first_chunk) *
(length(chunk_array) - 1)
units(estimate) <- 'mins'
startR:::.message(
paste0("Remaining time estimate (at ", format(time_after_first_chunk), ") ",
"(neglecting merge time): ", format(estimate))
)
}
# Close nested ecFlow families
for (i in length(chunked_dims):1) {
tabs <- tabs - 2
suite <- add_line(suite, paste0('endfamily'), tabs)
}
# Close the ecFlow suite
tabs <- tabs - 2
suite <- add_line(suite, paste0('endfamily'), tabs)
suite <- add_line(suite, "family merge", tabs)
tabs <- tabs + 2
suite <- add_line(suite, "trigger computation == complete", tabs)
suite <- add_line(suite, "edit ECF_JOB_CMD 'bash %ECF_JOB% > %ECF_JOBOUT% 2>&1 &'", tabs)
suite <- add_line(suite, "task Merge", tabs)
tabs <- tabs - 2
suite <- add_line(suite, paste0('endfamily'), tabs)
tabs <- tabs - 2
suite <- add_line(suite, "endsuite", tabs)
# Run ecFlow suite if needed
if (on_cluster) {
suite_file <- paste0(shared_dir, '/startR_chunking.def')
suite_file_o <- file(suite_file)
writeLines(suite, suite_file_o)
close(suite_file_o)
if (is.null(ecflow_host)) {
default_ecflow_host <- list(host = 'localhost', port = '5678')
if (is.null(ecflow_host)) {
.warning("Parameter 'ecflow_host' has not been specified but execution on ",
"cluster has been requested. An ecFlow server instance will ",
"be created on localhost:5678.")
} else {
default_ecflow_host[names(ecflow_host)] <- ecflow_host
}
ecflow_host <- default_ecflow_host
system(paste0("ecflow_start.sh -p ", ecflow_host[['port']]))
system(paste0("ecflow_client --load=", suite_file, " --host=",
ecflow_host[['name']], " --port=", ecflow_host[['port']]))
system(paste0("ecflow_client --begin=STARTR_CHUNKING_", suite_id,
" --host=", ecflow_host[['name']], " --port=",
ecflow_host[['port']]))
if (!silent) {
while (is.null(time_after_first_chunk)) {
if (any(grepl('.*\\.Rds$', list.files(shared_dir)))) {
time_after_first_chunk <- Sys.time()
estimate <- (time_after_first_chunk -
time_before_first_chunk) *
ceiling((prod(unlist(chunks)) - cluster[['max_jobs']]) /
cluster[['max_jobs']])
units(estimate) <- 'mins'
startR:::.message(
paste0('Remaining time estimate (neglecting queue and ',
'merge time) (at ', format(time_after_first_chunk),
'): ', format(estimate))
)
}
}
}
done <- FALSE
Sys.sleep(min(sqrt(attempt), 5))
status <- system(paste0("ecflow_client --get_state=STARTR_CHUNKING_",
suite_id, " --host=",
ecflow_host[['name']], " --port=", ecflow_host[['port']],
intern = TRUE))
if (any(grepl("suite STARTR_CHUNKING #.* state:complete", status))) {
done <- TRUE
}
system(paste0("ecflow_client --delete=force yes /STARTR_CHUNKING_",
suite_id, " --host=", ecflow_host[['name']],
" --port=", ecflow_host[['port']]))
#system("ecflow_client --shutdown --port=5678")
#system("ecflow_stop.sh -p 5678")
result <- readRDS(paste0(shared_dir, '/result.Rds'))
file.remove(paste0(shared_dir, '/result.Rds'))
startR:::.message("Computation ended successfully.")
result
} else {
array_of_results <- startR:::.MergeArrayOfArrays(array_of_results)
startR:::.message("Computation ended successfully.")
array_of_results