ByChunks <- function(fun, cube_headers, ..., chunks = 'auto', cluster = NULL, shared_dir = NULL, silent = FALSE, debug = FALSE){ MergeArrays <- startR:::.MergeArrays # Check input headers if ('startR_header' %in% class(cube_headers)) { cube_headers <- list(cube_headers) } if (!all(sapply(lapply(cube_headers, class), function(x) 'startR_header' %in% x))) { stop("All objects passed in 'cube_headers' must be of class 'startR_header', " "as returned by Start().") } # Check fun if (!is.function(fun)) { stop("Parameter 'fun' must be a function.") } # Check cluster if (!is.null(cluster)) { if (!is.list(cluster)) { stop("Parameter 'cluster' must be a named list.") } if (is.null(names(cluster))) { stop("Parameter 'cluster' must be a named list.") } if (any(!(names(cluster) %in% c('queue_host', 'node_memory', 'cores_per_node', 'max_jobs')))) { stop("Found invalid component names in parameter 'cluster'.") } default_cluster <- list(queue_host = 'bsceslogin01.bsc.es', node_memory = NULL, cores_per_node = 1, max_jobs = 6) default_cluster[names(cluster)] <- cluster cluster <- default_cluster } on_cluster <- !is.null(cluster) if (on_cluster) { if (Sys.which('ecflow_client') == '') { stop("ecFlow must be installed in order to run the computation on clusters.") } } # Check shared_dir if (on_cluster) { if (is.null(shared_dir)) { stop("Parameter 'shared_dir' must be specified when a cluster is specified.") } else { dir.create(shared_dir, recursive = TRUE) if (!dir.exists(shared_dir)) stop("Could not find or create the directory in ", "parameter 'shared_dir'.") } } # Check silent if (!is.logical(silent)) { stop("Parameter 'silent' must be logical.") } # Check debug if (!is.logical(debug)) { stop("Parameter 'debug' must be logical.") } if (silent) { debug <- FALSE } # Work out chunked dimensions and target dimensions all_dims <- lapply(cube_headers, attr, 'Dimensions') all_dims_merged <- NULL for (i in all_dims) { if (is.null(all_dims_merged)) { all_dims_merged <- startR:::.MergeArrayDims(all_dims_merged, all_dims[i])[[1]] } } all_dimnames <- names(all_dims_merged) target_dims_indices <- which(all_dimnames %in% unlist(attr(fun, 'target_dims'))) target_dims <- NULL if (length(target_dims_indices) > 0) { target_dims <- all_dimnames[target_dims_indices] } chunked_dims <- all_dimnames if (length(target_dims_indices) > 0) { chunked_dims <- chunked_dims[-target_dims_indices] } if (length(chunked_dims) < 1) { stop("Not possible to process input by chunks. All input dimensions are ", "target dimensions.") } if (length(cube_headers) != length(attr(fun, 'target_dims'))) { stop("Number of inputs in parameter 'cube_headers' must be equal to the ", "number of inputs expected by the function 'fun'.") } # Check all input headers have matching dimensions cube_index <- 1 for (cube_header in cube_headers) { if (!all(attr(cube_header, 'Dimensions') == all_dims[names(attr(cube_header, 'Dimensions'))])) { stop("All provided 'cube_headers' must have matching dimension lengths ", "with each other.") } if (!all(attr(fun, 'target_dims')[[cube_index]] %in% attr(cube_header, 'Dimensions'))) { stop("All provided 'cube_headers' must contain at least the target dimensions ", "expected by 'fun'.") } cube_index <- cube_index + 1 # work out expected result dimensions } # Check chunks default_chunks <- as.list(rep(1, length(chunked_dims))) names(default_chunks) <- chunked_dims if (chunks == 'auto') { chunks <- default_chunks } if (!is.list(chunks)) { stop("Parameter 'chunks' must be a named list or 'auto'.") } if (is.null(names(chunks))) { stop("Parameter 'chunks' must be a named list or 'auto'.") } if (any(!(names(chunks) %in% names(chunked_dims)))) { stop("All names in parameter 'chunks' must be one of the non-target dimensions ", "present in the cubes in 'cube_headers'. The target dimensions are ", paste(paste0("'", target_dims, "'"), collapse = ', '), ". The non-target ", "dimensions (margins) are ", paste(paste0("'", chunked_dims, "'")), ".") } if (any(!(((unlist(chunks) %% 1) == 0) | (unlist(chunks) == 'all')))) { stop("All values in parameter 'chunks' must take a numeric value or 'all'.") } if (any(unlist(chunks) < 1)) { stop("All values in parameter 'chunks' must be >= 1.") } for (chunk_spec in 1:length(chunks)) { if (chunks[[chunk_spec]] > unlist(all_dims)[names(chunks)[chunk_spec]]) { stop("Too many chunks requested for the dimension ", names(chunks)[chunk_spec], ". Maximum allowed is ", unlist(all_dims)[names(chunks)[chunk_spec]]) } } default_chunks[names(chunks)] <- chunks chunks <- default_chunks # Check fun if (!('startR_step' %in% class(fun))) { stop("Parameter 'fun' must be of the class 'startR_step', as returned ", "by the function Step.") } # Replace 'all's chunks_all <- which(unlist(chunks) == 'all') if (length(chunks_all) > 0) { chunks[chunks_all] <- all_dims[names(chunks)[chunks_all]] } # Mount the ecFlow suite if (on_cluster) { # Copy load_process_save_chunk.R into shared folder chunk_script <- file(system.file('chunking/load_process_save_chunk.R', package = 'startR')) chunk_script_lines <- readLines(chunk_script) chunk_script_lines <- gsub('^shared_dir <- *', paste0('shared_dir <- ', paste(deparse(shared_dir), collapse = '\n')), chunk_script_lines) chunk_script_lines <- gsub('^debug <- *', paste0('debug <- ', paste(deparse(debug), collapse = '\n')), chunk_script_lines) chunk_script_lines <- gsub('^start_calls <- *', paste0('start_calls <- ', paste(deparse(cube_headers), collapse = '\n')), chunk_script_lines) chunk_script_lines <- gsub('^param_dimnames <- *', paste0('param_dimnames <- ', paste(deparse(chunked_dims), collapse = '\n')), chunk_script_lines) chunk_script_lines <- gsub('^cores_per_node <- *', paste0('cores_per_node <- ', cluster[['cores_per_node']]), chunk_script_lines) chunk_script_lines <- gsub('^fun <- *', paste0('fun <- ', paste(deparse(fun), collapse = '\n')), chunk_script_lines) chunk_script_lines <- gsub('^params <- *', paste0('params <- ', paste(deparse(list(...)), collapse = '\n')), chunk_script_lines) writeLines(chunk_script_lines, paste0(shared_dir, '/load_process_save_chunk.R')) # Copy Chunk.ecf into shared folder chunk_ecf_script <- file(system.file('chunking/Chunk.ecf', package = 'startR')) chunk_ecf_script_lines <- readLines(chunk_ecf_script) ecf_vars <- paste(paste0('%', as.vector(sapply(chunked_dims, function(x) { c(toupper(x), paste0(toupper(x), '_N')) })), '%'), collapse = ' ') chunk_ecf_script_lines <- gsub('^Rscript load_process_save_chunk.R --args *', paste0('Rscript load_process_save_chunk.R --args ', ecf_vars), chunk_ecf_script_lines) writeLines(chunk_ecf_script_lines, paste0(shared_dir, '/Chunk.ecf')) # Copy merge_chunks.R into tmp folder merge_script <- file(system.file('chunking/merge_chunks.R', package = 'startR')) merge_script_lines <- readLines(merge_script) merge_script_lines <- gsub('^shared_dir <- *', paste0('shared_dir <- ', paste(deparse(shared_dir), collapse = '\n')), merge_script_lines) writeLines(merge_script_lines, paste0(shared_dir, '/merge_chunks.R')) # Copy Merge.ecf into tmp folder #TODO: Modify chain of parameters sent to r script when merging #chunks progressively merge_ecf_script <- file(system.file('chunking/Merge.ecf', package = 'startR')) merge_ecf_script_lines <- readLines(merge_ecf_script) writeLines(merge_ecf_script_lines, paste0(shared_dir, '/Merge.ecf')) # Copy headers file.copy(system.file('chunking/head.h', package = 'startR'), shared_dir) file.copy(system.file('chunking/tail.h', package = 'startR'), shared_dir) file.copy(system.file('chunking/slurm.h', package = 'startR'), shared_dir) } add_line <- function(suite, line, tabs) { c(suite, paste0(paste(rep(' ', tabs), collapse = ''), line)) } suite <- NULL tabs <- 0 suite <- add_line(suite, 'suite STARTR_CHUNKING', tabs) tabs <- tabs + 2 suite <- add_line(suite, paste0("edit ECF_HOME '", shared_dir, "'"), tabs) suite <- add_line(suite, paste0("edit ECF_FILES '", shared_dir, "'"), tabs) suite <- add_line(suite, paste0("edit ECF_INCLUDE '", shared_dir, "'"), tabs) suite <- add_line(suite, paste0("limit max_jobs ", cluster[['max_jobs']]), tabs) suite <- add_line(suite, paste0("inlimit max_jobs"), tabs) suite <- add_line(suite, paste0("edit HOST '", cluster[['queue_host']], "'"), tabs) suite <- add_line(suite, "edit ECF_JOB_CMD 'ssh %HOST% 'sbatch %ECF_JOB% > %ECF_JOBOUT% 2>&1 &''", tabs) suite <- add_line(suite, "family computation", tabs) tabs <- tabs + 2 if (!silent) { startR:::.message(paste0("Processing chunks... ", "remaining time estimate soon...")) } time_before_first_chunk <- Sys.time() time_after_first_chunk <- NULL # Empty list to collect results result <- vector('list', length(attr(fun, 'output_dims'))) # Open nested ecFlow families for (i in 1:length(chunked_dims)) { suite <- add_line(suite, paste0('family ', names(chunked_dims)[i], '_', 1), tabs) tabs <- tabs + 2 suite <- add_line(suite, paste0('edit ', toupper(names(chunked_dims)[i]), ' ', 1), tabs) suite <- add_line(suite, paste0('edit ', toupper(names(chunked_dims)[i]), '_N ', chunks[[names(chunked_dims)[i]]]), tabs) } # Iterate through chunks chunk_array <- array(1:prod(unlist(chunks)), dim = rev(unlist(chunks))) previous_chunk_indices <- NULL for (i in 1:length(chunk_array)) { chunk_indices <- which(chunk_array == i, arr.ind = TRUE) # ADD CHUNK SCRIPT TO SUITE suite <- add_line(suite, "task Chunk", tabs) if (!on_cluster) { if (!silent) { startR:::.message(paste("Loading chunk", i, "out of", length(chunk_array), "...")) } data <- vector('list', length(cube_headers)) for (input in 1:length(data)) { start_call <- cube_headers[[input]] for (call_dim in names(attr(cube_headers[[input]], 'Dimensions'))) { start_call[[call_dim]] <- chunk(chunk_indices[[call_dim]], chunks[[call_dim]], start_call[[call_dim]]) } data[[input]] <- eval(start_call) } if (!silent) { startR:::.message(paste("Processing...")) } res <- multiApply::Apply(data, inverse_margins = attr(fun, 'target_dims'), AtomicFun = fun, ..., parallel = cluster[['cores_per_node']] > 1, ncores = cluster[['cores_per_node']]) rm(data) gc() } # Close ecFlow families + merge results if (!is.null(previous_chunk_indices)) { families_to_jump <- max(which(chunk_indices != previous_chunk_indices)) for (j in 1:families_to_jump) { tabs <- tabs - 2 suite <- add_line(suite, paste0('endfamily'), tabs) if (!on_cluster) { # Merge results for (component in 1:length(res)) { if (chunks[[names(rev(chunked_dims))[j]]] > 1) { result[[i]] <- MergeArrays(result[[i]], res[[i]], names(rev(chunked_dims))[j]) } else { result[[i]] <- res } rm(res) gc() } # Time estimate if (is.null(time_after_first_chunk)) { time_after_first_chunk <- Sys.time() if (!silent) { estimate <- (time_after_first_chunk - time_before_first_chunk) * (length(chunk_array) - 1) units(estimate) <- 'mins' startR:::.message( paste0("Remaining time estimate (at ", format(time_after_first_chunk), "): ", format(estimate)) ) } } } } for (j in families_to_jump:1) { suite <- add_line(suite, paste0('family ', names(rev(chunked_dims))[j], '_', chunk_indices[j]), tabs) tabs <- tabs + 2 suite <- add_line(suite, paste0('edit ', toupper(names(rev(chunked_dims))[j]), ' ', chunk_indices[j]), tabs) suite <- add_line(suite, paste0('edit ', toupper(names(rev(chunked_dims))[j]), '_N ', chunks[[names(rev(chunked_dims))[j]]]), tabs) } } } # Close nested ecFlow families for (i in length(chunked_dims):1) { tabs <- tabs - 2 suite <- add_line(suite, paste0('endfamily'), tabs) } # Close the ecFlow suite suite <- add_line(suite, "family merge", tabs) tabs <- tabs + 2 suite <- add_line(suite, "trigger computation == complete", tabs) suite <- add_line(suite, "edit ECF_JOB_CMD 'bash %ECF_JOB% > %ECF_JOBOUT% 2>&1 &'", tabs) suite <- add_line(suite, "task Merge", tabs) tabs <- tabs - 2 suite <- add_line(suite, paste0('endfamily'), tabs) tabs <- tabs - 2 suite <- add_line(suite, "endsuite", tabs) # Run ecFlow suite if needed if (on_cluster) { suite_file <- paste0(shared_dir, '/startR_chunking.def') suite_file_o <- file(suite_file) writeLines(suite, suite_file_o) close(suite_file_o) system("ecflow_start.sh -p 5678") system(paste0("ecflow_client --load=", suite_file, ' --port=5678')) system("ecflow_client --begin=STARTR_CHUNKING --port=5678") if (!silent) { while (is.null(time_after_first_chunk)) { if (any(grepl('.*\\.Rds$', list.files(shared_dir)))) { time_after_first_chunk <- Sys.time() estimate <- (time_after_first_chunk - time_before_first_chunk) * ceiling((prod(unlist(chunks)) - cluster[['max_jobs']]) / cluster[['max_jobs']]) units(estimate) <- 'mins' startR:::.message( paste0('Remaining time estimate (neglecting queue and ', 'merge time) (at ', format(time_after_first_chunk), '): ', format(estimate)) ) } } } done <- FALSE while (!done) { Sys.sleep(1) status <- system("ecflow_client --get_state=STARTR_CHUNKING --port=5678", intern = TRUE) if (any(grepl("suite STARTR_CHUNKING #.* state:complete", status))) { done <- TRUE } } system("ecflow_client --delete=force yes /STARTR_CHUNKING --port=5678") #system("ecflow_client --shutdown --port=5678") #system("ecflow_stop.sh -p 5678") result <- readRDS(paste0(shared_dir, '/result.Rds')) file.remove(paste0(shared_dir, '/result.Rds')) } check result dimensions match expected_ startR:::.message("Computation ended successfully.") result }