Compute <- function(fun, cube_headers, ..., chunks = 'auto', ncores = 1, cluster = NULL, shared_dir = NULL, ecflow_host = NULL, silent = FALSE, debug = FALSE, wait = TRUE) { library(multiApply, lib.loc = '~nmanuben/multiApply/multiApply.Rcheck') MergeArrays <- startR:::.MergeArrays # Check input headers if ('startR_header' %in% class(cube_headers)) { cube_headers <- list(cube_headers) } if (!all(sapply(lapply(cube_headers, class), function(x) 'startR_header' %in% x))) { stop("All objects passed in 'cube_headers' must be of class 'startR_header', ", "as returned by Start().") } # Check fun if (!is.function(fun)) { stop("Parameter 'fun' must be a function.") } # Check cores if (!is.numeric(ncores)) { stop("Parameter 'ncores' must be a numeric value.") } ncores <- round(ncores) # Check cluster on_cluster <- !is.null(cluster) default_cluster <- list(queue_host = NULL, node_memory = NULL, cores_per_job = 1, job_wallclock = '01:00:00', max_jobs = 6) if (!is.null(cluster)) { if (!is.list(cluster)) { stop("Parameter 'cluster' must be a named list.") } if (is.null(names(cluster))) { stop("Parameter 'cluster' must be a named list.") } if (any(!(names(cluster) %in% c('queue_host', 'node_memory', 'cores_per_job', 'job_wallclock', 'max_jobs')))) { stop("Found invalid component names in parameter 'cluster'.") } default_cluster[names(cluster)] <- cluster } cluster <- default_cluster if (on_cluster) { if (Sys.which('ecflow_client') == '') { stop("ecFlow must be installed in order to run the computation on clusters.") } if (is.null(cluster[['queue_host']])) { stop("The component 'queue_host' of the parameter 'cluster' must be specified.") } } # Check shared_dir suite_id <- sample(10 ^ 10, 1) shared_dir_suite <- '' if (on_cluster) { if (is.null(shared_dir)) { stop("Parameter 'shared_dir' must be specified when a cluster is specified.") } else { shared_dir_suite <- paste0(shared_dir, '/STARTR_CHUNKING_', suite_id, '/') dir.create(shared_dir_suite, recursive = TRUE) if (!dir.exists(shared_dir_suite)) stop("Could not find or create the directory in ", "parameter 'shared_dir'.") } } # Check silent if (!is.logical(silent)) { stop("Parameter 'silent' must be logical.") } # Check debug if (!is.logical(debug)) { stop("Parameter 'debug' must be logical.") } if (silent) { debug <- FALSE } # Check wait if (!is.logical(wait)) { stop("Parameter 'wait' must be logical.") } # Work out chunked dimensions and target dimensions all_dims <- lapply(cube_headers, attr, 'Dimensions') all_dims_merged <- NULL for (i in all_dims) { if (is.null(all_dims_merged)) { all_dims_merged <- i } else { all_dims_merged <- startR:::.MergeArrayDims(all_dims_merged, i)[[1]] } } all_dimnames <- names(all_dims_merged) target_dims_indices <- which(all_dimnames %in% unlist(attr(fun, 'target_dims'))) target_dims <- NULL if (length(target_dims_indices) > 0) { target_dims <- all_dimnames[target_dims_indices] } chunked_dims <- all_dimnames if (length(target_dims_indices) > 0) { chunked_dims <- chunked_dims[-target_dims_indices] } if (length(chunked_dims) < 1) { stop("Not possible to process input by chunks. All input dimensions are ", "target dimensions.") } if (length(cube_headers) != length(attr(fun, 'target_dims'))) { stop("Number of inputs in parameter 'cube_headers' must be equal to the ", "number of inputs expected by the function 'fun'.") } # Check all input headers have matching dimensions cube_index <- 1 for (cube_header in cube_headers) { if (!all(attr(cube_header, 'Dimensions') == unlist(all_dims)[names(attr(cube_header, 'Dimensions'))])) { stop("All provided 'cube_headers' must have matching dimension lengths ", "with each other.") } if (!all(attr(fun, 'target_dims')[[cube_index]] %in% names(attr(cube_header, 'Dimensions')))) { stop("All provided 'cube_headers' must contain at least the target dimensions ", "expected by 'fun'.") } cube_index <- cube_index + 1 # work out expected result dimensions } # Check chunks default_chunks <- as.list(rep(1, length(chunked_dims))) names(default_chunks) <- chunked_dims if (length(chunks) == 1 && chunks == 'auto') { chunks <- default_chunks } if (!is.list(chunks)) { stop("Parameter 'chunks' must be a named list or 'auto'.") } if (is.null(names(chunks))) { stop("Parameter 'chunks' must be a named list or 'auto'.") } if (any(!(names(chunks) %in% chunked_dims))) { stop("All names in parameter 'chunks' must be one of the non-target dimensions ", "present in the cubes in 'cube_headers'. The target dimensions are ", paste(paste0("'", target_dims, "'"), collapse = ', '), ". The non-target ", "dimensions (margins) are ", paste(paste0("'", chunked_dims, "'"), collapse = ', '), ".") } if (any(!(((unlist(chunks) %% 1) == 0) | (unlist(chunks) == 'all')))) { stop("All values in parameter 'chunks' must take a numeric value or 'all'.") } if (any(unlist(chunks) < 1)) { stop("All values in parameter 'chunks' must be >= 1.") } for (chunk_spec in 1:length(chunks)) { if (chunks[[chunk_spec]] > unlist(all_dims)[names(chunks)[chunk_spec]]) { stop("Too many chunks requested for the dimension ", names(chunks)[chunk_spec], ". Maximum allowed is ", unlist(all_dims)[names(chunks)[chunk_spec]]) } } default_chunks[names(chunks)] <- chunks chunks <- default_chunks # Check fun if (!('startR_step' %in% class(fun))) { stop("Parameter 'fun' must be of the class 'startR_step', as returned ", "by the function Step.") } # Replace 'all's chunks_all <- which(unlist(chunks) == 'all') if (length(chunks_all) > 0) { chunks[chunks_all] <- all_dims[names(chunks)[chunks_all]] } # Mount the ecFlow suite if (on_cluster) { # Copy load_process_save_chunk.R into shared folder chunk_script <- file(system.file('chunking/load_process_save_chunk.R', package = 'startR')) chunk_script_lines <- readLines(chunk_script) close(chunk_script) chunk_script_lines <- gsub('^shared_dir <- *', paste0('shared_dir <- ', paste(deparse(shared_dir_suite), collapse = '\n')), chunk_script_lines) chunk_script_lines <- gsub('^debug <- *', paste0('debug <- ', paste(deparse(debug), collapse = '\n')), chunk_script_lines) deparsed_calls <- paste0('start_calls <- list(') for (cube_header in 1:length(cube_headers)) { deparsed_calls <- paste0(deparsed_calls, '\nquote(', paste(deparse(cube_headers[[cube_header]]), collapse = '\n'), ')') if (cube_header < length(cube_headers)) { deparsed_calls <- paste0(deparsed_calls, ', ') } } deparsed_calls <- paste0(deparsed_calls, '\n)') chunk_script_lines <- gsub('^start_calls <- *', deparsed_calls, chunk_script_lines) chunk_script_lines <- gsub('^start_calls_attrs <- *', paste0('start_calls_attrs <- ', paste(deparse(lapply(cube_headers, attributes)), collapse = '\n')), chunk_script_lines) chunk_script_lines <- gsub('^param_dimnames <- *', paste0('param_dimnames <- ', paste(deparse(chunked_dims), collapse = '\n')), chunk_script_lines) chunk_script_lines <- gsub('^cores_per_job <- *', paste0('cores_per_job <- ', cluster[['cores_per_job']]), chunk_script_lines) chunk_script_lines <- gsub('^fun <- *', paste0('fun <- ', paste(deparse(fun), collapse = '\n')), chunk_script_lines) chunk_script_lines <- gsub('^params <- *', paste0('params <- ', paste(deparse(list(...)), collapse = '\n')), chunk_script_lines) writeLines(chunk_script_lines, paste0(shared_dir_suite, '/load_process_save_chunk.R')) # Copy Chunk.ecf into shared folder chunk_ecf_script <- file(system.file('chunking/Chunk.ecf', package = 'startR')) chunk_ecf_script_lines <- readLines(chunk_ecf_script) close(chunk_ecf_script) ecf_vars <- paste(paste0('%', as.vector(sapply(chunked_dims, function(x) { c(toupper(x), paste0(toupper(x), '_N')) })), '%'), collapse = ' ') chunk_ecf_script_lines <- gsub('^Rscript load_process_save_chunk.R --args *', paste0('Rscript load_process_save_chunk.R --args ', ecf_vars), chunk_ecf_script_lines) writeLines(chunk_ecf_script_lines, paste0(shared_dir_suite, '/Chunk.ecf')) # Copy merge_chunks.R into tmp folder # merge_script <- file(system.file('chunking/merge_chunks.R', # package = 'startR')) # merge_script_lines <- readLines(merge_script) # close(merge_script) # merge_script_lines <- gsub('^shared_dir <- *', paste0('shared_dir <- ', # paste(deparse(shared_dir_suite), collapse = '\n')), merge_script_lines) # writeLines(merge_script_lines, paste0(shared_dir_suite, '/merge_chunks.R')) # Copy Merge.ecf into tmp folder #TODO: Modify chain of parameters sent to r script when merging #chunks progressively # merge_ecf_script <- file(system.file('chunking/Merge.ecf', # package = 'startR')) # merge_ecf_script_lines <- readLines(merge_ecf_script) # close(merge_ecf_script) # writeLines(merge_ecf_script_lines, paste0(shared_dir_suite, '/Merge.ecf')) # Copy headers file.copy(system.file('chunking/head.h', package = 'startR'), shared_dir_suite) file.copy(system.file('chunking/tail.h', package = 'startR'), shared_dir_suite) file.copy(system.file('chunking/slurm.h', package = 'startR'), shared_dir_suite) } add_line <- function(suite, line, tabs) { c(suite, paste0(paste(rep(' ', tabs), collapse = ''), line)) } suite <- NULL tabs <- 0 suite <- add_line(suite, paste0('suite STARTR_CHUNKING_', suite_id), tabs) tabs <- tabs + 2 suite <- add_line(suite, paste0("edit ECF_HOME '", shared_dir_suite, "'"), tabs) suite <- add_line(suite, paste0("edit ECF_FILES '", shared_dir_suite, "'"), tabs) suite <- add_line(suite, paste0("edit ECF_INCLUDE '", shared_dir_suite, "'"), tabs) suite <- add_line(suite, paste0("edit CORES_PER_JOB ", cluster[['cores_per_job']], ""), tabs) suite <- add_line(suite, paste0("edit JOB_WALLCLOCK '", cluster[['job_wallclock']], "'"), tabs) suite <- add_line(suite, paste0("limit max_jobs ", cluster[['max_jobs']]), tabs) suite <- add_line(suite, paste0("inlimit max_jobs"), tabs) suite <- add_line(suite, paste0("edit HOST '", cluster[['queue_host']], "'"), tabs) suite <- add_line(suite, "edit ECF_JOB_CMD 'ssh %HOST% 'sbatch %ECF_JOB% > %ECF_JOBOUT% 2>&1 &''", tabs) suite <- add_line(suite, "family computation", tabs) tabs <- tabs + 2 if (!silent) { startR:::.message(paste0("Processing chunks... ", "remaining time estimate soon...")) } time_before_first_chunk <- Sys.time() time_after_first_chunk <- NULL # Open nested ecFlow families for (i in length(chunked_dims):1) { suite <- add_line(suite, paste0('family ', chunked_dims[i], '_CHUNK_', 1), tabs) tabs <- tabs + 2 suite <- add_line(suite, paste0('edit ', toupper(chunked_dims[i]), ' ', 1), tabs) suite <- add_line(suite, paste0('edit ', toupper(chunked_dims[i]), '_N ', chunks[[chunked_dims[i]]]), tabs) } # Iterate through chunks chunk_array <- array(1:prod(unlist(chunks)), dim = (unlist(chunks))) arrays_of_results <- vector('list', length(attr(fun, 'output_dims'))) names(arrays_of_results) <- names(attr(fun, 'output_dims')) for (component in 1:length(arrays_of_results)) { arrays_of_results[[component]] <- vector('list', prod((unlist(chunks)))) dim(arrays_of_results[[component]]) <- (unlist(chunks)) } previous_chunk_indices <- rep(1, length(chunks)) found_first_result <- FALSE for (i in 1:length(chunk_array)) { chunk_indices <- which(chunk_array == i, arr.ind = TRUE)[1, ] names(chunk_indices) <- names(dim(chunk_array)) # ADD CHUNK SCRIPT TO SUITE families_to_jump <- which(chunk_indices != previous_chunk_indices) if (length(families_to_jump) > 0) { families_to_jump <- max(families_to_jump) # Close ecFlow families for (j in 1:families_to_jump) { tabs <- tabs - 2 suite <- add_line(suite, paste0('endfamily'), tabs) } # Open ecFlow families for (j in families_to_jump:1) { suite <- add_line(suite, paste0('family ', (chunked_dims)[j], '_CHUNK_', chunk_indices[j]), tabs) tabs <- tabs + 2 suite <- add_line(suite, paste0('edit ', toupper((chunked_dims)[j]), ' ', chunk_indices[j]), tabs) suite <- add_line(suite, paste0('edit ', toupper((chunked_dims)[j]), '_N ', chunks[[(chunked_dims)[j]]]), tabs) } } suite <- add_line(suite, "task Chunk", tabs) if (!on_cluster) { if (!silent) { startR:::.message(paste("Loading chunk", i, "out of", length(chunk_array), "...")) } data <- vector('list', length(cube_headers)) for (input in 1:length(data)) { start_call <- cube_headers[[input]] dims_to_alter <- which(names(attr(start_call, 'Dimensions')) %in% names(chunks)) names_dims_to_alter <- names(attr(start_call, 'Dimensions'))[dims_to_alter] # If any dimension comes from split dimensions split_dims <- attr(start_call, 'SplitDims') for (k in 1:length(split_dims)) { if (any(names(split_dims[[k]]) %in% names_dims_to_alter)) { chunks_split_dims <- rep(1, length(split_dims[[k]])) names(chunks_split_dims) <- names(split_dims[[k]]) chunks_indices_split_dims <- chunks_split_dims split_dims_to_alter <- which(names(split_dims[[k]]) %in% names_dims_to_alter) chunks_split_dims[split_dims_to_alter] <- unlist(chunks[names(split_dims[[k]])[split_dims_to_alter]]) chunks_indices_split_dims[split_dims_to_alter] <- chunk_indices[names(split_dims[[k]])[split_dims_to_alter]] start_call[[names(split_dims)[k]]] <- chunk(chunks_indices_split_dims, chunks_split_dims, eval(start_call[[names(split_dims)[k]]])) dims_to_alter_to_remove <- which(names_dims_to_alter %in% names(split_dims[[k]])) if (length(dims_to_alter_to_remove) > 0) { dims_to_alter <- dims_to_alter[-dims_to_alter_to_remove] names_dims_to_alter <- names_dims_to_alter[-dims_to_alter_to_remove] } } } if (length(dims_to_alter) > 0) { for (call_dim in names(attr(start_call, 'Dimensions'))[dims_to_alter]) { start_call[[call_dim]] <- chunk(chunk_indices[call_dim], chunks[[call_dim]], eval(start_call[[call_dim]])) } } start_call[['silent']] <- !debug data[[input]] <- eval(start_call) } if (!silent) { startR:::.message(paste("Processing...")) } result <- multiApply::Apply(data, target_dims = attr(fun, 'target_dims'), AtomicFun = deparse(substitute(fun)), ..., output_dims = attr(fun, 'output_dims'), parallel = ncores > 1, ncores = ncores) if (!found_first_result) { names(arrays_of_results) <- names(result) found_first_result <- TRUE } for (component in 1:length(result)) { arrays_of_results[[component]][[i]] <- result[[component]] } rm(data) gc() } # Time estimate if (!on_cluster) { if (is.null(time_after_first_chunk)) { time_after_first_chunk <- Sys.time() if (!silent) { estimate <- (time_after_first_chunk - time_before_first_chunk) * (length(chunk_array) - 1) units(estimate) <- 'mins' startR:::.message( paste0("Remaining time estimate (at ", format(time_after_first_chunk), ") ", "(neglecting merge time): ", format(estimate)) ) } } } previous_chunk_indices <- chunk_indices } # Close nested ecFlow families for (i in length(chunked_dims):1) { tabs <- tabs - 2 suite <- add_line(suite, paste0('endfamily'), tabs) } # Close the ecFlow suite tabs <- tabs - 2 suite <- add_line(suite, paste0('endfamily'), tabs) # suite <- add_line(suite, "family merge", tabs) # tabs <- tabs + 2 # suite <- add_line(suite, "trigger computation == complete", tabs) # suite <- add_line(suite, "edit ECF_JOB_CMD 'bash %ECF_JOB% > %ECF_JOBOUT% 2>&1 &'", tabs) # suite <- add_line(suite, "task Merge", tabs) # tabs <- tabs - 2 # suite <- add_line(suite, paste0('endfamily'), tabs) tabs <- tabs - 2 suite <- add_line(suite, "endsuite", tabs) # Run ecFlow suite if needed if (on_cluster) { suite_file <- paste0(shared_dir_suite, '/startR_chunking.def') suite_file_o <- file(suite_file) writeLines(suite, suite_file_o) close(suite_file_o) default_ecflow_host <- list(name = 'localhost', port = '5678') if (is.null(ecflow_host)) { .warning("Parameter 'ecflow_host' has not been specified but execution on ", "cluster has been requested. An ecFlow server instance will ", "be created on localhost:5678.") } else { default_ecflow_host[names(ecflow_host)] <- ecflow_host } ecflow_host <- default_ecflow_host system(paste0("ecflow_start.sh -p ", ecflow_host[['port']])) system(paste0("ecflow_client --load=", suite_file, " --host=", ecflow_host[['name']], " --port=", ecflow_host[['port']])) system(paste0("ecflow_client --begin=STARTR_CHUNKING_", suite_id, " --host=", ecflow_host[['name']], " --port=", ecflow_host[['port']])) startr_exec <- list(cluster = cluster, ecflow_host = ecflow_host, shared_dir = shared_dir, suite_id = suite_id) class(startr_exec) <- 'startR_exec' if (wait) { if (!silent) { while (is.null(time_after_first_chunk)) { if (any(grepl('.*\\.Rds$', list.files(shared_dir_suite)))) { time_after_first_chunk <- Sys.time() estimate <- (time_after_first_chunk - time_before_first_chunk) * ceiling((prod(unlist(chunks)) - cluster[['max_jobs']]) / cluster[['max_jobs']]) units(estimate) <- 'mins' startR:::.message( paste0('Remaining time estimate (neglecting queue and ', 'merge time) (at ', format(time_after_first_chunk), '): ', format(estimate)) ) } } } result <- Collect(startr_exec, wait = TRUE) startR:::.message("Computation ended successfully.") result } else { startr_exec } } else { for (component in 1:length(arrays_of_results)) { arrays_of_results[[component]] <- startR:::.MergeArrayOfArrays(arrays_of_results[[component]]) } startR:::.message("Computation ended successfully.") arrays_of_results } #TODO: check result dimensions match expected dimensions }