Newer
Older
ByChunks <- function(fun, cube_headers, ...,
chunks = 'auto',
silent = FALSE, debug = FALSE){
MergeArrays <- startR:::.MergeArrays
# Check input headers
if ('startR_header' %in% class(cube_headers)) {
cube_headers <- list(cube_headers)
}
if (!all(sapply(lapply(cube_headers, class),
function(x) 'startR_header' %in% x))) {
stop("All objects passed in 'cube_headers' must be of class 'startR_header', ",
"as returned by Start().")
}
# Check fun
if (!is.function(fun)) {
stop("Parameter 'fun' must be a function.")
}
# Check cluster
on_cluster <- !is.null(cluster)
default_cluster <- list(queue_host = 'bsceslogin01.bsc.es',
node_memory = NULL,
cores_per_node = 1,
max_jobs = 6)
if (!is.null(cluster)) {
if (!is.list(cluster)) {
stop("Parameter 'cluster' must be a named list.")
}
if (is.null(names(cluster))) {
stop("Parameter 'cluster' must be a named list.")
}
if (any(!(names(cluster) %in% c('queue_host', 'node_memory', 'cores_per_node', 'max_jobs')))) {
stop("Found invalid component names in parameter 'cluster'.")
}
default_cluster[names(cluster)] <- cluster
}
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
if (on_cluster) {
if (Sys.which('ecflow_client') == '') {
stop("ecFlow must be installed in order to run the computation on clusters.")
}
}
# Check shared_dir
if (on_cluster) {
if (is.null(shared_dir)) {
stop("Parameter 'shared_dir' must be specified when a cluster is specified.")
} else {
dir.create(shared_dir, recursive = TRUE)
if (!dir.exists(shared_dir)) stop("Could not find or create the directory in ",
"parameter 'shared_dir'.")
}
}
# Check silent
if (!is.logical(silent)) {
stop("Parameter 'silent' must be logical.")
}
# Check debug
if (!is.logical(debug)) {
stop("Parameter 'debug' must be logical.")
}
if (silent) {
debug <- FALSE
}
# Work out chunked dimensions and target dimensions
all_dims <- lapply(cube_headers, attr, 'Dimensions')
all_dims_merged <- NULL
for (i in all_dims) {
if (is.null(all_dims_merged)) {
all_dims_merged <- startR:::.MergeArrayDims(all_dims_merged, all_dims[i])[[1]]
}
}
all_dimnames <- names(all_dims_merged)
target_dims_indices <- which(all_dimnames %in% unlist(attr(fun, 'target_dims')))
target_dims <- NULL
if (length(target_dims_indices) > 0) {
target_dims <- all_dimnames[target_dims_indices]
}
chunked_dims <- all_dimnames
if (length(target_dims_indices) > 0) {
chunked_dims <- chunked_dims[-target_dims_indices]
}
if (length(chunked_dims) < 1) {
stop("Not possible to process input by chunks. All input dimensions are ",
"target dimensions.")
}
if (length(cube_headers) != length(attr(fun, 'target_dims'))) {
stop("Number of inputs in parameter 'cube_headers' must be equal to the ",
"number of inputs expected by the function 'fun'.")
}
# Check all input headers have matching dimensions
cube_index <- 1
for (cube_header in cube_headers) {
if (!all(attr(cube_header, 'Dimensions') == unlist(all_dims)[names(attr(cube_header, 'Dimensions'))])) {
stop("All provided 'cube_headers' must have matching dimension lengths ",
"with each other.")
}
if (!all(attr(fun, 'target_dims')[[cube_index]] %in% names(attr(cube_header, 'Dimensions')))) {
stop("All provided 'cube_headers' must contain at least the target dimensions ",
"expected by 'fun'.")
}
cube_index <- cube_index + 1
# work out expected result dimensions
}
# Check chunks
default_chunks <- as.list(rep(1, length(chunked_dims)))
names(default_chunks) <- chunked_dims
chunks <- default_chunks
}
if (!is.list(chunks)) {
stop("Parameter 'chunks' must be a named list or 'auto'.")
}
if (is.null(names(chunks))) {
stop("Parameter 'chunks' must be a named list or 'auto'.")
}
stop("All names in parameter 'chunks' must be one of the non-target dimensions ",
"present in the cubes in 'cube_headers'. The target dimensions are ",
paste(paste0("'", target_dims, "'"), collapse = ', '), ". The non-target ",
"dimensions (margins) are ", paste(paste0("'", chunked_dims, "'"), collapse = ', '), ".")
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
}
if (any(!(((unlist(chunks) %% 1) == 0) | (unlist(chunks) == 'all')))) {
stop("All values in parameter 'chunks' must take a numeric value or 'all'.")
}
if (any(unlist(chunks) < 1)) {
stop("All values in parameter 'chunks' must be >= 1.")
}
for (chunk_spec in 1:length(chunks)) {
if (chunks[[chunk_spec]] > unlist(all_dims)[names(chunks)[chunk_spec]]) {
stop("Too many chunks requested for the dimension ", names(chunks)[chunk_spec],
". Maximum allowed is ", unlist(all_dims)[names(chunks)[chunk_spec]])
}
}
default_chunks[names(chunks)] <- chunks
chunks <- default_chunks
# Check fun
if (!('startR_step' %in% class(fun))) {
stop("Parameter 'fun' must be of the class 'startR_step', as returned ",
"by the function Step.")
}
# Replace 'all's
chunks_all <- which(unlist(chunks) == 'all')
if (length(chunks_all) > 0) {
chunks[chunks_all] <- all_dims[names(chunks)[chunks_all]]
}
# Mount the ecFlow suite
if (on_cluster) {
# Copy load_process_save_chunk.R into shared folder
chunk_script <- file(system.file('chunking/load_process_save_chunk.R',
package = 'startR'))
chunk_script_lines <- readLines(chunk_script)
chunk_script_lines <- gsub('^shared_dir <- *', paste0('shared_dir <- ',
paste(deparse(shared_dir), collapse = '\n')), chunk_script_lines)
chunk_script_lines <- gsub('^debug <- *', paste0('debug <- ', paste(deparse(debug), collapse = '\n')),
chunk_script_lines)
chunk_script_lines <- gsub('^start_calls <- *', paste0('start_calls <- ', paste(deparse(cube_headers), collapse = '\n')),
chunk_script_lines)
chunk_script_lines <- gsub('^param_dimnames <- *', paste0('param_dimnames <- ', paste(deparse(chunked_dims), collapse = '\n')),
chunk_script_lines)
chunk_script_lines <- gsub('^cores_per_node <- *', paste0('cores_per_node <- ', cluster[['cores_per_node']]),
chunk_script_lines)
chunk_script_lines <- gsub('^fun <- *', paste0('fun <- ', paste(deparse(fun), collapse = '\n')),
chunk_script_lines)
chunk_script_lines <- gsub('^params <- *', paste0('params <- ', paste(deparse(list(...)), collapse = '\n')),
chunk_script_lines)
writeLines(chunk_script_lines, paste0(shared_dir, '/load_process_save_chunk.R'))
# Copy Chunk.ecf into shared folder
chunk_ecf_script <- file(system.file('chunking/Chunk.ecf',
package = 'startR'))
chunk_ecf_script_lines <- readLines(chunk_ecf_script)
ecf_vars <- paste(paste0('%', as.vector(sapply(chunked_dims,
function(x) {
c(toupper(x), paste0(toupper(x), '_N'))
})), '%'), collapse = ' ')
chunk_ecf_script_lines <- gsub('^Rscript load_process_save_chunk.R --args *',
paste0('Rscript load_process_save_chunk.R --args ', ecf_vars),
chunk_ecf_script_lines)
writeLines(chunk_ecf_script_lines, paste0(shared_dir, '/Chunk.ecf'))
# Copy merge_chunks.R into tmp folder
merge_script <- file(system.file('chunking/merge_chunks.R',
package = 'startR'))
merge_script_lines <- readLines(merge_script)
merge_script_lines <- gsub('^shared_dir <- *', paste0('shared_dir <- ',
paste(deparse(shared_dir), collapse = '\n')), merge_script_lines)
writeLines(merge_script_lines, paste0(shared_dir, '/merge_chunks.R'))
# Copy Merge.ecf into tmp folder
#TODO: Modify chain of parameters sent to r script when merging
#chunks progressively
merge_ecf_script <- file(system.file('chunking/Merge.ecf',
package = 'startR'))
merge_ecf_script_lines <- readLines(merge_ecf_script)
writeLines(merge_ecf_script_lines, paste0(shared_dir, '/Merge.ecf'))
# Copy headers
file.copy(system.file('chunking/head.h', package = 'startR'),
shared_dir)
file.copy(system.file('chunking/tail.h', package = 'startR'),
shared_dir)
file.copy(system.file('chunking/slurm.h', package = 'startR'),
shared_dir)
}
add_line <- function(suite, line, tabs) {
c(suite, paste0(paste(rep(' ', tabs), collapse = ''), line))
}
suite <- NULL
tabs <- 0
suite <- add_line(suite, 'suite STARTR_CHUNKING', tabs)
tabs <- tabs + 2
suite <- add_line(suite, paste0("edit ECF_HOME '", shared_dir, "'"), tabs)
suite <- add_line(suite, paste0("edit ECF_FILES '", shared_dir, "'"), tabs)
suite <- add_line(suite, paste0("edit ECF_INCLUDE '", shared_dir, "'"), tabs)
suite <- add_line(suite, paste0("limit max_jobs ", cluster[['max_jobs']]), tabs)
suite <- add_line(suite, paste0("inlimit max_jobs"), tabs)
suite <- add_line(suite, paste0("edit HOST '", cluster[['queue_host']], "'"), tabs)
suite <- add_line(suite, "edit ECF_JOB_CMD 'ssh %HOST% 'sbatch %ECF_JOB% > %ECF_JOBOUT% 2>&1 &''", tabs)
suite <- add_line(suite, "family computation", tabs)
tabs <- tabs + 2
if (!silent) {
startR:::.message(paste0("Processing chunks... ",
"remaining time estimate soon..."))
}
time_before_first_chunk <- Sys.time()
time_after_first_chunk <- NULL
# Empty list to collect results
result <- vector('list', length(attr(fun, 'output_dims')))
# Open nested ecFlow families
for (i in length(chunked_dims):1) {
suite <- add_line(suite, paste0('family ', chunked_dims[i], '_', 1), tabs)
suite <- add_line(suite, paste0('edit ', toupper(chunked_dims[i]), ' ', 1), tabs)
suite <- add_line(suite, paste0('edit ', toupper(chunked_dims[i]), '_N ', chunks[[chunked_dims[i]]]), tabs)
chunk_array <- array(1:prod(unlist(chunks)), dim = (unlist(chunks)))
array_of_results <- vector('list', prod((unlist(chunks))))
dim(array_of_results) <- (unlist(chunks))
previous_chunk_indices <- rep(1, length(chunks))
for (i in 1:length(chunk_array)) {
chunk_indices <- which(chunk_array == i, arr.ind = TRUE)[1, ]
names(chunk_indices) <- names(dim(chunk_array))
# ADD CHUNK SCRIPT TO SUITE
families_to_jump <- which(chunk_indices != previous_chunk_indices)
if (length(families_to_jump) > 0) {
families_to_jump <- max(families_to_jump)
# Close ecFlow families
for (j in 1:families_to_jump) {
tabs <- tabs - 2
suite <- add_line(suite, paste0('endfamily'), tabs)
}
# Open ecFlow families
for (j in families_to_jump:1) {
suite <- add_line(suite, paste0('family ', (chunked_dims)[j], '_', chunk_indices[j]), tabs)
tabs <- tabs + 2
suite <- add_line(suite, paste0('edit ', toupper((chunked_dims)[j]), ' ', chunk_indices[j]), tabs)
suite <- add_line(suite, paste0('edit ', toupper((chunked_dims)[j]), '_N ', chunks[[(chunked_dims)[j]]]), tabs)
}
}
suite <- add_line(suite, "task Chunk", tabs)
if (!on_cluster) {
if (!silent) {
startR:::.message(paste("Loading chunk", i,
"out of", length(chunk_array), "..."))
}
data <- vector('list', length(cube_headers))
for (input in 1:length(data)) {
start_call <- cube_headers[[input]]
dims_to_alter <- which(names(attr(start_call, 'Dimensions')) %in% names(chunks))
if (length(dims_to_alter) > 0) {
for (call_dim in names(attr(start_call, 'Dimensions'))[dims_to_alter]) {
start_call[[call_dim]] <- chunk(chunk_indices[call_dim], chunks[[call_dim]],
eval(start_call[[call_dim]]))
}
data[[input]] <- eval(start_call)
}
if (!silent) {
startR:::.message(paste("Processing..."))
}
array_of_results[[i]] <- list(output1 = multiApply::Apply(data,
inverse_margins = attr(fun, 'target_dims'),
parallel = cluster[['cores_per_node']] > 1,
ncores = ncores)
)
rm(data)
gc()
}
# Time estimate
if (!on_cluster) {
if (is.null(time_after_first_chunk)) {
time_after_first_chunk <- Sys.time()
if (!silent) {
estimate <- (time_after_first_chunk -
time_before_first_chunk) *
(length(chunk_array) - 1)
units(estimate) <- 'mins'
startR:::.message(
paste0("Remaining time estimate (at ", format(time_after_first_chunk), ") ",
"(neglecting merge time): ", format(estimate))
)
}
# Close nested ecFlow families
for (i in length(chunked_dims):1) {
tabs <- tabs - 2
suite <- add_line(suite, paste0('endfamily'), tabs)
}
# Close the ecFlow suite
tabs <- tabs - 2
suite <- add_line(suite, paste0('endfamily'), tabs)
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
suite <- add_line(suite, "family merge", tabs)
tabs <- tabs + 2
suite <- add_line(suite, "trigger computation == complete", tabs)
suite <- add_line(suite, "edit ECF_JOB_CMD 'bash %ECF_JOB% > %ECF_JOBOUT% 2>&1 &'", tabs)
suite <- add_line(suite, "task Merge", tabs)
tabs <- tabs - 2
suite <- add_line(suite, paste0('endfamily'), tabs)
tabs <- tabs - 2
suite <- add_line(suite, "endsuite", tabs)
# Run ecFlow suite if needed
if (on_cluster) {
suite_file <- paste0(shared_dir, '/startR_chunking.def')
suite_file_o <- file(suite_file)
writeLines(suite, suite_file_o)
close(suite_file_o)
system("ecflow_start.sh -p 5678")
system(paste0("ecflow_client --load=", suite_file, ' --port=5678'))
system("ecflow_client --begin=STARTR_CHUNKING --port=5678")
if (!silent) {
while (is.null(time_after_first_chunk)) {
if (any(grepl('.*\\.Rds$', list.files(shared_dir)))) {
time_after_first_chunk <- Sys.time()
estimate <- (time_after_first_chunk -
time_before_first_chunk) *
ceiling((prod(unlist(chunks)) - cluster[['max_jobs']]) /
cluster[['max_jobs']])
units(estimate) <- 'mins'
startR:::.message(
paste0('Remaining time estimate (neglecting queue and ',
'merge time) (at ', format(time_after_first_chunk),
'): ', format(estimate))
)
}
}
}
done <- FALSE
while (!done) {
Sys.sleep(1)
status <- system("ecflow_client --get_state=STARTR_CHUNKING --port=5678",
intern = TRUE)
if (any(grepl("suite STARTR_CHUNKING #.* state:complete", status))) {
done <- TRUE
}
}
system("ecflow_client --delete=force yes /STARTR_CHUNKING --port=5678")
#system("ecflow_client --shutdown --port=5678")
#system("ecflow_stop.sh -p 5678")
result <- readRDS(paste0(shared_dir, '/result.Rds'))
file.remove(paste0(shared_dir, '/result.Rds'))
startR:::.message("Computation ended successfully.")
result
} else {
array_of_results <- startR:::.MergeArrayOfArrays(array_of_results)
startR:::.message("Computation ended successfully.")
array_of_results