Newer
Older
Compute <- function(fun, cube_headers, ..., chunks = 'auto',
ncores = 1, cluster = NULL, shared_dir = NULL,
ecflow_host = NULL, silent = FALSE, debug = FALSE,
wait = TRUE) {
library(multiApply, lib.loc = '~nmanuben/multiApply/multiApply.Rcheck')
MergeArrays <- startR:::.MergeArrays
# Check input headers
if ('startR_header' %in% class(cube_headers)) {
cube_headers <- list(cube_headers)
}
if (!all(sapply(lapply(cube_headers, class),
function(x) 'startR_header' %in% x))) {
stop("All objects passed in 'cube_headers' must be of class 'startR_header', ",
"as returned by Start().")
}
# Check fun
if (!is.function(fun)) {
stop("Parameter 'fun' must be a function.")
}
# Check cores
if (!is.numeric(ncores)) {
stop("Parameter 'ncores' must be a numeric value.")
}
ncores <- round(ncores)
cores_per_job = 1,
job_wallclock = '01:00:00',
if (!is.null(cluster)) {
if (!is.list(cluster)) {
stop("Parameter 'cluster' must be a named list.")
}
if (is.null(names(cluster))) {
stop("Parameter 'cluster' must be a named list.")
}
if (any(!(names(cluster) %in% c('queue_host', 'node_memory', 'cores_per_job', 'job_wallclock', 'max_jobs')))) {
stop("Found invalid component names in parameter 'cluster'.")
}
default_cluster[names(cluster)] <- cluster
}
if (on_cluster) {
if (Sys.which('ecflow_client') == '') {
stop("ecFlow must be installed in order to run the computation on clusters.")
}
if (is.null(cluster[['queue_host']])) {
stop("The component 'queue_host' of the parameter 'cluster' must be specified.")
}
suite_id <- sample(10 ^ 10, 1)
shared_dir_suite <- ''
if (on_cluster) {
if (is.null(shared_dir)) {
stop("Parameter 'shared_dir' must be specified when a cluster is specified.")
} else {
shared_dir_suite <- paste0(shared_dir, '/STARTR_CHUNKING_', suite_id, '/')
dir.create(shared_dir_suite, recursive = TRUE)
if (!dir.exists(shared_dir_suite)) stop("Could not find or create the directory in ",
"parameter 'shared_dir'.")
}
}
# Check silent
if (!is.logical(silent)) {
stop("Parameter 'silent' must be logical.")
}
# Check debug
if (!is.logical(debug)) {
stop("Parameter 'debug' must be logical.")
}
if (silent) {
debug <- FALSE
}
# Check wait
if (!is.logical(wait)) {
stop("Parameter 'wait' must be logical.")
}
# Work out chunked dimensions and target dimensions
all_dims <- lapply(cube_headers, attr, 'Dimensions')
all_dims_merged <- NULL
for (i in all_dims) {
if (is.null(all_dims_merged)) {
all_dims_merged <- startR:::.MergeArrayDims(all_dims_merged, i)[[1]]
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
}
}
all_dimnames <- names(all_dims_merged)
target_dims_indices <- which(all_dimnames %in% unlist(attr(fun, 'target_dims')))
target_dims <- NULL
if (length(target_dims_indices) > 0) {
target_dims <- all_dimnames[target_dims_indices]
}
chunked_dims <- all_dimnames
if (length(target_dims_indices) > 0) {
chunked_dims <- chunked_dims[-target_dims_indices]
}
if (length(chunked_dims) < 1) {
stop("Not possible to process input by chunks. All input dimensions are ",
"target dimensions.")
}
if (length(cube_headers) != length(attr(fun, 'target_dims'))) {
stop("Number of inputs in parameter 'cube_headers' must be equal to the ",
"number of inputs expected by the function 'fun'.")
}
# Check all input headers have matching dimensions
cube_index <- 1
for (cube_header in cube_headers) {
if (!all(attr(cube_header, 'Dimensions') == unlist(all_dims)[names(attr(cube_header, 'Dimensions'))])) {
stop("All provided 'cube_headers' must have matching dimension lengths ",
"with each other.")
}
if (!all(attr(fun, 'target_dims')[[cube_index]] %in% names(attr(cube_header, 'Dimensions')))) {
stop("All provided 'cube_headers' must contain at least the target dimensions ",
"expected by 'fun'.")
}
cube_index <- cube_index + 1
# work out expected result dimensions
}
# Check chunks
default_chunks <- as.list(rep(1, length(chunked_dims)))
names(default_chunks) <- chunked_dims
chunks <- default_chunks
}
if (!is.list(chunks)) {
stop("Parameter 'chunks' must be a named list or 'auto'.")
}
if (is.null(names(chunks))) {
stop("Parameter 'chunks' must be a named list or 'auto'.")
}
stop("All names in parameter 'chunks' must be one of the non-target dimensions ",
"present in the cubes in 'cube_headers'. The target dimensions are ",
paste(paste0("'", target_dims, "'"), collapse = ', '), ". The non-target ",
"dimensions (margins) are ", paste(paste0("'", chunked_dims, "'"), collapse = ', '), ".")
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
}
if (any(!(((unlist(chunks) %% 1) == 0) | (unlist(chunks) == 'all')))) {
stop("All values in parameter 'chunks' must take a numeric value or 'all'.")
}
if (any(unlist(chunks) < 1)) {
stop("All values in parameter 'chunks' must be >= 1.")
}
for (chunk_spec in 1:length(chunks)) {
if (chunks[[chunk_spec]] > unlist(all_dims)[names(chunks)[chunk_spec]]) {
stop("Too many chunks requested for the dimension ", names(chunks)[chunk_spec],
". Maximum allowed is ", unlist(all_dims)[names(chunks)[chunk_spec]])
}
}
default_chunks[names(chunks)] <- chunks
chunks <- default_chunks
# Check fun
if (!('startR_step' %in% class(fun))) {
stop("Parameter 'fun' must be of the class 'startR_step', as returned ",
"by the function Step.")
}
# Replace 'all's
chunks_all <- which(unlist(chunks) == 'all')
if (length(chunks_all) > 0) {
chunks[chunks_all] <- all_dims[names(chunks)[chunks_all]]
}
# Mount the ecFlow suite
if (on_cluster) {
# Copy load_process_save_chunk.R into shared folder
chunk_script <- file(system.file('chunking/load_process_save_chunk.R',
package = 'startR'))
chunk_script_lines <- readLines(chunk_script)
chunk_script_lines <- gsub('^shared_dir <- *', paste0('shared_dir <- ',
paste(deparse(shared_dir_suite), collapse = '\n')), chunk_script_lines)
chunk_script_lines <- gsub('^debug <- *', paste0('debug <- ', paste(deparse(debug), collapse = '\n')),
chunk_script_lines)
deparsed_calls <- paste0('start_calls <- list(')
for (cube_header in 1:length(cube_headers)) {
deparsed_calls <- paste0(deparsed_calls, '\nquote(',
paste(deparse(cube_headers[[cube_header]]), collapse = '\n'),
')')
if (cube_header < length(cube_headers)) {
deparsed_calls <- paste0(deparsed_calls, ', ')
}
}
deparsed_calls <- paste0(deparsed_calls, '\n)')
chunk_script_lines <- gsub('^start_calls <- *', deparsed_calls, chunk_script_lines)
chunk_script_lines <- gsub('^start_calls_dims <- *', paste0('start_calls_dims <- ', paste(deparse(lapply(cube_headers, attr, 'Dimensions')), collapse = '\n')),
chunk_script_lines)
chunk_script_lines <- gsub('^param_dimnames <- *', paste0('param_dimnames <- ', paste(deparse(chunked_dims), collapse = '\n')),
chunk_script_lines)
chunk_script_lines <- gsub('^cores_per_job <- *', paste0('cores_per_job <- ', cluster[['cores_per_job']]),
chunk_script_lines)
chunk_script_lines <- gsub('^fun <- *', paste0('fun <- ', paste(deparse(fun), collapse = '\n')),
chunk_script_lines)
chunk_script_lines <- gsub('^params <- *', paste0('params <- ', paste(deparse(list(...)), collapse = '\n')),
chunk_script_lines)
writeLines(chunk_script_lines, paste0(shared_dir_suite, '/load_process_save_chunk.R'))
# Copy Chunk.ecf into shared folder
chunk_ecf_script <- file(system.file('chunking/Chunk.ecf',
package = 'startR'))
chunk_ecf_script_lines <- readLines(chunk_ecf_script)
ecf_vars <- paste(paste0('%', as.vector(sapply(chunked_dims,
function(x) {
c(toupper(x), paste0(toupper(x), '_N'))
})), '%'), collapse = ' ')
chunk_ecf_script_lines <- gsub('^Rscript load_process_save_chunk.R --args *',
paste0('Rscript load_process_save_chunk.R --args ', ecf_vars),
chunk_ecf_script_lines)
writeLines(chunk_ecf_script_lines, paste0(shared_dir_suite, '/Chunk.ecf'))
# Copy merge_chunks.R into tmp folder
# merge_script <- file(system.file('chunking/merge_chunks.R',
# package = 'startR'))
# merge_script_lines <- readLines(merge_script)
# close(merge_script)
# merge_script_lines <- gsub('^shared_dir <- *', paste0('shared_dir <- ',
# paste(deparse(shared_dir_suite), collapse = '\n')), merge_script_lines)
# writeLines(merge_script_lines, paste0(shared_dir_suite, '/merge_chunks.R'))
# Copy Merge.ecf into tmp folder
#TODO: Modify chain of parameters sent to r script when merging
#chunks progressively
# merge_ecf_script <- file(system.file('chunking/Merge.ecf',
# package = 'startR'))
# merge_ecf_script_lines <- readLines(merge_ecf_script)
# close(merge_ecf_script)
# writeLines(merge_ecf_script_lines, paste0(shared_dir_suite, '/Merge.ecf'))
# Copy headers
file.copy(system.file('chunking/head.h', package = 'startR'),
file.copy(system.file('chunking/tail.h', package = 'startR'),
file.copy(system.file('chunking/slurm.h', package = 'startR'),
}
add_line <- function(suite, line, tabs) {
c(suite, paste0(paste(rep(' ', tabs), collapse = ''), line))
}
suite <- NULL
tabs <- 0
suite <- add_line(suite, paste0('suite STARTR_CHUNKING_', suite_id), tabs)
suite <- add_line(suite, paste0("edit ECF_HOME '", shared_dir_suite, "'"), tabs)
suite <- add_line(suite, paste0("edit ECF_FILES '", shared_dir_suite, "'"), tabs)
suite <- add_line(suite, paste0("edit ECF_INCLUDE '", shared_dir_suite, "'"), tabs)
suite <- add_line(suite, paste0("edit CORES_PER_JOB ", cluster[['cores_per_job']], ""), tabs)
suite <- add_line(suite, paste0("edit JOB_WALLCLOCK '", cluster[['job_wallclock']], "'"), tabs)
suite <- add_line(suite, paste0("limit max_jobs ", cluster[['max_jobs']]), tabs)
suite <- add_line(suite, paste0("inlimit max_jobs"), tabs)
suite <- add_line(suite, paste0("edit HOST '", cluster[['queue_host']], "'"), tabs)
suite <- add_line(suite, "edit ECF_JOB_CMD 'ssh %HOST% 'sbatch %ECF_JOB% > %ECF_JOBOUT% 2>&1 &''", tabs)
suite <- add_line(suite, "family computation", tabs)
tabs <- tabs + 2
if (!silent) {
startR:::.message(paste0("Processing chunks... ",
"remaining time estimate soon..."))
}
time_before_first_chunk <- Sys.time()
time_after_first_chunk <- NULL
# Open nested ecFlow families
suite <- add_line(suite, paste0('family ', chunked_dims[i], '_CHUNK_', 1), tabs)
suite <- add_line(suite, paste0('edit ', toupper(chunked_dims[i]), ' ', 1), tabs)
suite <- add_line(suite, paste0('edit ', toupper(chunked_dims[i]), '_N ', chunks[[chunked_dims[i]]]), tabs)
chunk_array <- array(1:prod(unlist(chunks)), dim = (unlist(chunks)))
arrays_of_results <- vector('list', length(attr(fun, 'output_dims')))
names(arrays_of_results) <- names(attr(fun, 'output_dims'))
for (component in 1:length(arrays_of_results)) {
arrays_of_results[[component]] <- vector('list', prod((unlist(chunks))))
dim(arrays_of_results[[component]]) <- (unlist(chunks))
}
for (i in 1:length(chunk_array)) {
chunk_indices <- which(chunk_array == i, arr.ind = TRUE)[1, ]
names(chunk_indices) <- names(dim(chunk_array))
# ADD CHUNK SCRIPT TO SUITE
families_to_jump <- which(chunk_indices != previous_chunk_indices)
if (length(families_to_jump) > 0) {
families_to_jump <- max(families_to_jump)
# Close ecFlow families
for (j in 1:families_to_jump) {
tabs <- tabs - 2
suite <- add_line(suite, paste0('endfamily'), tabs)
}
# Open ecFlow families
for (j in families_to_jump:1) {
suite <- add_line(suite, paste0('family ', (chunked_dims)[j], '_CHUNK_', chunk_indices[j]), tabs)
tabs <- tabs + 2
suite <- add_line(suite, paste0('edit ', toupper((chunked_dims)[j]), ' ', chunk_indices[j]), tabs)
suite <- add_line(suite, paste0('edit ', toupper((chunked_dims)[j]), '_N ', chunks[[(chunked_dims)[j]]]), tabs)
}
}
suite <- add_line(suite, "task Chunk", tabs)
if (!on_cluster) {
if (!silent) {
startR:::.message(paste("Loading chunk", i,
"out of", length(chunk_array), "..."))
}
data <- vector('list', length(cube_headers))
for (input in 1:length(data)) {
start_call <- cube_headers[[input]]
dims_to_alter <- which(names(attr(start_call, 'Dimensions')) %in% names(chunks))
if (length(dims_to_alter) > 0) {
for (call_dim in names(attr(start_call, 'Dimensions'))[dims_to_alter]) {
start_call[[call_dim]] <- chunk(chunk_indices[call_dim], chunks[[call_dim]],
eval(start_call[[call_dim]]))
}
data[[input]] <- eval(start_call)
}
if (!silent) {
startR:::.message(paste("Processing..."))
}
result <- multiApply::Apply(data,
target_dims = attr(fun, 'target_dims'),
AtomicFun = deparse(substitute(fun)), ...,
output_dims = attr(fun, 'output_dims'),
parallel = ncores > 1,
ncores = ncores)
if (!found_first_result) {
names(arrays_of_results) <- names(result)
found_first_result <- TRUE
}
for (component in 1:length(result)) {
arrays_of_results[[component]][[i]] <- result[[component]]
}
rm(data)
gc()
}
# Time estimate
if (!on_cluster) {
if (is.null(time_after_first_chunk)) {
time_after_first_chunk <- Sys.time()
if (!silent) {
estimate <- (time_after_first_chunk -
time_before_first_chunk) *
(length(chunk_array) - 1)
units(estimate) <- 'mins'
startR:::.message(
paste0("Remaining time estimate (at ", format(time_after_first_chunk), ") ",
"(neglecting merge time): ", format(estimate))
)
}
# Close nested ecFlow families
for (i in length(chunked_dims):1) {
tabs <- tabs - 2
suite <- add_line(suite, paste0('endfamily'), tabs)
}
# Close the ecFlow suite
tabs <- tabs - 2
suite <- add_line(suite, paste0('endfamily'), tabs)
# suite <- add_line(suite, "family merge", tabs)
# tabs <- tabs + 2
# suite <- add_line(suite, "trigger computation == complete", tabs)
# suite <- add_line(suite, "edit ECF_JOB_CMD 'bash %ECF_JOB% > %ECF_JOBOUT% 2>&1 &'", tabs)
# suite <- add_line(suite, "task Merge", tabs)
# tabs <- tabs - 2
# suite <- add_line(suite, paste0('endfamily'), tabs)
tabs <- tabs - 2
suite <- add_line(suite, "endsuite", tabs)
# Run ecFlow suite if needed
if (on_cluster) {
suite_file <- paste0(shared_dir_suite, '/startR_chunking.def')
suite_file_o <- file(suite_file)
writeLines(suite, suite_file_o)
close(suite_file_o)
default_ecflow_host <- list(name = 'localhost', port = '5678')
if (is.null(ecflow_host)) {
.warning("Parameter 'ecflow_host' has not been specified but execution on ",
"cluster has been requested. An ecFlow server instance will ",
"be created on localhost:5678.")
} else {
default_ecflow_host[names(ecflow_host)] <- ecflow_host
}
ecflow_host <- default_ecflow_host
system(paste0("ecflow_start.sh -p ", ecflow_host[['port']]))
system(paste0("ecflow_client --load=", suite_file, " --host=",
ecflow_host[['name']], " --port=", ecflow_host[['port']]))
system(paste0("ecflow_client --begin=STARTR_CHUNKING_", suite_id,
" --host=", ecflow_host[['name']], " --port=",
ecflow_host[['port']]))
startr_exec <- list(cluster = cluster, ecflow_host = ecflow_host,
shared_dir = shared_dir, suite_id = suite_id)
class(startr_exec) <- 'startR_exec'
if (wait) {
if (!silent) {
while (is.null(time_after_first_chunk)) {
if (any(grepl('.*\\.Rds$', list.files(shared_dir_suite)))) {
time_after_first_chunk <- Sys.time()
estimate <- (time_after_first_chunk -
time_before_first_chunk) *
ceiling((prod(unlist(chunks)) - cluster[['max_jobs']]) /
cluster[['max_jobs']])
units(estimate) <- 'mins'
startR:::.message(
paste0('Remaining time estimate (neglecting queue and ',
'merge time) (at ', format(time_after_first_chunk),
'): ', format(estimate))
)
}
result <- Collect(startr_exec, wait = TRUE)
startR:::.message("Computation ended successfully.")
result
} else {
startr_exec
for (component in 1:length(arrays_of_results)) {
arrays_of_results[[component]] <- startR:::.MergeArrayOfArrays(arrays_of_results[[component]])
}