From e3b751222758c1cf1704de5bb2f48bf12beebcba Mon Sep 17 00:00:00 2001 From: Nicolau Manubens Date: Fri, 22 Sep 2017 00:01:14 +0200 Subject: [PATCH 1/4] Added explore feature. --- R/Start.R | 13 ++++++++++++- man/Start.Rd | 4 ++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/R/Start.R b/R/Start.R index 2854453..b2d790d 100644 --- a/R/Start.R +++ b/R/Start.R @@ -18,7 +18,8 @@ Start <- function(..., # dim = indices/selectors, apply_indices_after_transform = FALSE, pattern_dims = NULL, metadata_dims = NULL, - selector_checker = SelectorChecker, + selector_checker = SelectorChecker, + explore = FALSE, num_procs = NULL, silent = FALSE, debug = FALSE) { #, config_file = NULL #dictionary_dim_names = , @@ -461,6 +462,11 @@ Start <- function(..., # dim = indices/selectors, } transform_extra_cells <- round(transform_extra_cells) + # Check explore + if (!is.logical(explore)) { + stop("Parameter 'explore' must be TRUE or FALSE.") + } + # Check num_procs if (!is.null(num_procs)) { if (!is.numeric(num_procs)) { @@ -2251,6 +2257,9 @@ print("-> PROCEEDING TO CROP VARIABLES") new_dims <- .MergeArrayDims(dim(array_of_files_to_load), total_inner_dims) final_dims <- pmax(new_dims[[1]], new_dims[[2]])[dim_names] + # The following several lines will only be run if explore = FALSE + if (!explore) { + ########## CREATING THE SHARED MATRIX AND DISPATCHING WORK PIECES ########### # TODO: try performance of storing all in cols instead of rows # Create the shared memory array, and a pointer to it, to be sent @@ -2553,6 +2562,8 @@ print("-> WORK PIECES BUILT") array_of_not_found_files <- NULL } + } # End if (!explore) + # Replace the vars and common vars by the transformed vars and common vars for (i in 1:length(dat)) { if (length(names(transformed_vars[[i]])) > 0) { diff --git a/man/Start.Rd b/man/Start.Rd index d56d23f..b6eecfe 100644 --- a/man/Start.Rd +++ b/man/Start.Rd @@ -38,6 +38,7 @@ Start(..., pattern_dims = NULL, metadata_dims = NULL, selector_checker = SelectorChecker, + explore = FALSE, num_procs = NULL, silent = FALSE, debug = FALSE) @@ -344,6 +345,9 @@ It expects to receive a vector of character strings with the names of the file d } \item{selector_checker}{ Function used internaly by \code{Start()} to translate a set of selectors (values for a dimension associated to a coordinate variable) into a set of numeric indices. It takes by default \code{SelectorChecker} and, in principle, it should not be required to change it for customized file formats. The option to replace it is left open for more versatility. See the code of \code{SelectorChecker} for details on the inputs, functioning and outputs of a selector checker. + } + \item{explore}{ +Logical value telling whether to explore only the dimensions of the resulting array, and the values for the file and inner dimensions. Takes FALSE by default. } \item{num_procs}{ Number of processes to be created for the parallel execution of the retrieval / transformation / arrangement of the multiple involved files in a call to \code{Start()}. Takes by default the number of available cores (as detected by \code{detectCores()} in the package 'future'). -- GitLab From 92b23533179c093eb32c8fc20277d58c98462963 Mon Sep 17 00:00:00 2001 From: Nicolau Manubens Date: Fri, 22 Sep 2017 02:36:22 +0200 Subject: [PATCH 2/4] Added explore functionality. 
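A minimal usage sketch of the feature added here (the path pattern, variable name and start dates are hypothetical, for illustration only): a call with retrieve = FALSE only resolves the dimensions and returns a 'startR_header' object whose StartRCall component stores the same call with retrieve = TRUE, so that the data can be loaded a posteriori:

    # Hypothetical example; adapt the path pattern and selectors to your data.
    header <- Start(dat = '/path/to/$var$_$sdate$.nc',
                    var = 'tas',
                    sdate = c('19900101', '19910101'),
                    retrieve = FALSE)
    header$Dimensions                 # named dimension lengths; no data loaded yet
    data <- eval(header$StartRCall)   # re-runs the same call with retrieve = TRUE

Evaluating the stored call (or dispatching it through do.call, as the man page suggests) is what performs the actual retrieval into memory.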
--- R/Start.R | 49 ++++++++++++++++++++++++++++++++++--------------- man/Start.Rd | 25 +++++++++++++++++++++---- 2 files changed, 55 insertions(+), 19 deletions(-) diff --git a/R/Start.R b/R/Start.R index b2d790d..288524a 100644 --- a/R/Start.R +++ b/R/Start.R @@ -19,7 +19,7 @@ Start <- function(..., # dim = indices/selectors, pattern_dims = NULL, metadata_dims = NULL, selector_checker = SelectorChecker, - explore = FALSE, + retrieve = FALSE, num_procs = NULL, silent = FALSE, debug = FALSE) { #, config_file = NULL #dictionary_dim_names = , @@ -462,9 +462,9 @@ Start <- function(..., # dim = indices/selectors, } transform_extra_cells <- round(transform_extra_cells) - # Check explore - if (!is.logical(explore)) { - stop("Parameter 'explore' must be TRUE or FALSE.") + # Check retrieve + if (!is.logical(retrieve)) { + stop("Parameter 'retrieve' must be TRUE or FALSE.") } # Check num_procs @@ -2257,8 +2257,8 @@ print("-> PROCEEDING TO CROP VARIABLES") new_dims <- .MergeArrayDims(dim(array_of_files_to_load), total_inner_dims) final_dims <- pmax(new_dims[[1]], new_dims[[2]])[dim_names] - # The following several lines will only be run if explore = FALSE - if (!explore) { + # The following several lines will only be run if retrieve = TRUE + if (retrieve) { ########## CREATING THE SHARED MATRIX AND DISPATCHING WORK PIECES ########### # TODO: try performance of storing all in cols instead of rows @@ -2562,7 +2562,7 @@ print("-> WORK PIECES BUILT") array_of_not_found_files <- NULL } - } # End if (!explore) + } # End if (retrieve) # Replace the vars and common vars by the transformed vars and common vars for (i in 1:length(dat)) { @@ -2583,19 +2583,38 @@ print(str(transformed_vars)) print("-> THE PICKED VARS:") print(str(picked_vars)) } - if (!silent) { - .message("Successfully retrieved data.") - } file_selectors <- NULL for (i in 1:length(dat)) { file_selectors[[dat[[i]][['name']]]] <- dat[[i]][['selectors']][which(names(dat[[i]][['selectors']]) %in% found_file_dims[[i]])] } - list(Data = data_array, - Variables = c(list(common = picked_common_vars), picked_vars), - Files = array_of_files_to_load, - NotFoundFiles = array_of_not_found_files, - FileSelectors = file_selectors) + if (retrieve) { + if (!silent) { + .message("Successfully retrieved data.") + } + structure( + list(Data = data_array, + Variables = c(list(common = picked_common_vars), picked_vars), + Files = array_of_files_to_load, + NotFoundFiles = array_of_not_found_files, + FileSelectors = file_selectors), + class = 'startR_cube' + ) + } else { + if (!silent) { + .message("Successfully discovered data dimensions.") + } + startR_call <- match.call() + startR_call[['retrieve']] <- TRUE + structure( + list(Dimensions = final_dims, + Variables = c(list(common = picked_common_vars), picked_vars), + ExpectedFiles = array_of_files_to_load, + FileSelectors = file_selectors, + StartRCall = startR_call), + class = 'startR_header' + ) + } } # This function is the responsible for loading the data of each work diff --git a/man/Start.Rd b/man/Start.Rd index b6eecfe..bda85a8 100644 --- a/man/Start.Rd +++ b/man/Start.Rd @@ -38,7 +38,7 @@ Start(..., pattern_dims = NULL, metadata_dims = NULL, selector_checker = SelectorChecker, - explore = FALSE, + retrieve = FALSE, num_procs = NULL, silent = FALSE, debug = FALSE) @@ -346,8 +346,8 @@ It expects to receive a vector of character strings with the names of the file d \item{selector_checker}{ Function used internaly by \code{Start()} to translate a set of selectors (values for a dimension associated to a 
coordinate variable) into a set of numeric indices. It takes by default \code{SelectorChecker} and, in principle, it should not be required to change it for customized file formats. The option to replace it is left open for more versatility. See the code of \code{SelectorChecker} for details on the inputs, functioning and outputs of a selector checker. } - \item{explore}{ -Logical value telling whether to explore only the dimensions of the resulting array, and the values for the file and inner dimensions. Takes FALSE by default. + \item{retrieve}{ +Logical value telling whether to retrieve the data defined in the \code{Start} call or to explore only its dimension lengths and names, and the values for the file and inner dimensions. Takes FALSE by default. } \item{num_procs}{ Number of processes to be created for the parallel execution of the retrieval / transformation / arrangement of the multiple involved files in a call to \code{Start()}. Takes by default the number of available cores (as detected by \code{detectCores()} in the package 'future'). @@ -356,13 +356,14 @@ Number of processes to be created for the parallel execution of the retrieval / Boolean flag, whether to display progress messages (FALSE; default) or not (TRUE). } \item{debug}{ -Whether to return detailed messages on the progress and operations in a \code{Start()} call (TRUE) or not (FALSE; default). +Whether to return detailed messages on the progress and operations in a \code{Start} call (TRUE) or not (FALSE; default). } } \details{ Check \href{https://earth.bsc.es/gitlab/es/startR}{the startR website} for more information. } \value{ +If \code{retrieve = TRUE}, the involved data is loaded into RAM and an object of the class 'startR_cube' with the following components is returned:\cr \item{Data}{ Multidimensional data array with named dimensions, with the data values requested via \dots and other parameters. This array can potentially contain metadata in the attribute 'variables'. } @@ -378,6 +379,22 @@ Array with the same shape as \code{$Files} but with \code{NULL} in the positions \item{FileSelectors}{ Multidimensional character string array with named dimensions, with the same shape as \code{$Files} and \code{$NotFoundFiles}, which contains the components used to build up the paths to each of the files in the data sources. } +If \code{retrieve = FALSE}, the involved data is not loaded into RAM and an object of the class 'startR_header' with the following components is returned:\cr + \item{Dimensions}{ +Named vector with the dimension lengths and names of the data involved in the \code{Start} call. + } + \item{Variables}{ +Named list of 1 + N components, containing lists of retrieved variables (as requested in \code{return_vars}) common to all the data sources (in the 1st component, \code{$common}), and for each of the N data sources (named after the source name, as specified in \dots, or, if not specified, \code{$dat1}, \code{$dat2}, ..., \code{$datN}). Each of the variables is contained in a multidimensional array with named dimensions, and potentially with the attribute 'variables' with additional auxiliary data. } + \item{ExpectedFiles}{ +Multidimensional character string array with named dimensions. Its dimensions are the file dimensions (as requested in \dots). Each cell in this array contains a path to a file to be retrieved (which may exist or not).
+ } + \item{FileSelectors}{ +Multidimensional character string array with named dimensions, with the same shape as \code{$ExpectedFiles}, which contains the components used to build up the paths to each of the files in the data sources. + } + \item{StartRCall}{ +The \code{Start} call that generated this object, stored with the parameter \code{retrieve} set to \code{TRUE}. Intended for retrieving the associated data a posteriori with a call to \code{do.call}. + } } \author{ History:\cr -- GitLab From 1a38f2021aad33b5bc5a354c3af641b1a93158a8 Mon Sep 17 00:00:00 2001 From: Nicolau Manubens Date: Fri, 22 Sep 2017 03:04:59 +0200 Subject: [PATCH 3/4] Added code for chunk computation, to be fixed. --- R/Compute.R | 495 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 495 insertions(+) create mode 100644 R/Compute.R diff --git a/R/Compute.R b/R/Compute.R new file mode 100644 index 0000000..652e767 --- /dev/null +++ b/R/Compute.R @@ -0,0 +1,495 @@ +Compute <- function(fun, chunks = NULL, + ..., + cluster = NULL, shared_dir = NULL, + silent = FALSE, debug = FALSE){ + MergeArrays <- startR:::.MergeArrays + + params <- list(...) + + # Check fun + if (!is.function(fun)) { + stop("Parameter 'fun' must be a function.") + } + + # Check cluster + if (!is.null(cluster)) { + cluster <- list(queue_host = 'bsceslogin01.bsc.es', + node_memory = NULL) + } + on_cluster <- !is.null(cluster) + + # Check shared_dir + if (on_cluster) { + if (is.null(shared_dir)) { + stop("Parameter 'shared_dir' must be specified when a cluster is specified.") + } else { + dir.create(shared_dir, recursive = TRUE) + if (!dir.exists(shared_dir)) stop("Could not find or create the directory in ", + "parameter 'shared_dir'.") + } + } + + # Check silent + if (!is.logical(silent)) { + stop("Parameter 'silent' must be logical.") + } + + # Check debug + if (!is.logical(debug)) { + stop("Parameter 'debug' must be logical.") + } + if (silent) { + debug <- FALSE + } + + # Check chunks + if (!is.list(chunks)) { + stop("Parameter 'chunks' must be a named list.") + } + if (is.null(names(chunks))) { + stop("Parameter 'chunks' must be a named list.") + } + new_chunks <- list(dataset = 1, member = 1, sdate = 1, + ftime = 1, lat = 1, lon = 1) + if (any(!(names(chunks) %in% names(new_chunks)))) { + stop("All names in parameter 'chunks' must be one of 'dataset', 'member', ", + "'sdate', 'ftime', 'lat' or 'lon'.") + } + if (any(!(((unlist(chunks) %% 1) == 0) | (unlist(chunks) == 'all')))) { + stop("All values in parameter 'chunks' must take a numeric value or 'all'.") + } + if (any(unlist(chunks) < 1)) { + stop("All values in parameter 'chunks' must be > 1.") + } + new_chunks[names(chunks)] <- chunks + chunks <- new_chunks + + # Discover coordinates + s2dverification:::.message(paste("Discovering grid size...")) + params_to_leave <- c('leadtimemax', 'leadtimemin', 'nmember', 'nmemberobs') + params_to_use <- params + if (length(which(names(params) %in% params_to_leave)) > 0) { + params_to_use <- params[-which(names(params) %in% params_to_leave)] + } + data_lonlat <- do.call("Load", + c( + list( + var, exp[1], obs[1], + sdates[1], leadtimemax = 1, + nmember = 1, nmemberobs = 1, + silent = !debug + ), + params_to_use + ) + ) + lons <- data_lonlat$lon + lats <- data_lonlat$lat + s2dverification:::.message(paste("Discovering forecast time steps...")) + params_to_leave <- c('nmember', 'nmemberobs', 'lonmin', 'lonmax', 'latmin', 'latmax') + params_to_use <- params + if (length(which(names(params) %in%
params_to_leave)) > 0) { + params_to_use <- params[-which(names(params) %in% params_to_leave)] + } + data_ftime <- do.call("Load", + c( + list( + var, exp[1], obs[1], sdates[1], + nmember = 1, nmemberobs = 1, + lonmin = lons[1], lonmax = lons[1], + latmin = lats[1], latmax = lats[1], + silent = !debug + ), + params_to_use + ) + ) + s2dverification:::.message(paste("Discovering ensemble members...")) + params_to_leave <- c('leadtimemax', 'leadtimemin', + 'lonmin', 'lonmax', 'latmin', 'latmax') + params_to_use <- params + if (length(which(names(params) %in% params_to_leave)) > 0) { + params_to_use <- params[-which(names(params) %in% params_to_leave)] + } + data_members <- do.call("Load", + c( + list( + var, exp[1], obs[1], + sdates[1], leadtimemax = 1, + lonmin = lons[1], lonmax = lons[1], + latmin = lats[1], latmax = lats[1], + silent = !debug + ), + params_to_use + ) + ) + if (!is.null(exp)) { + nftime <- dim(data_ftime$mod)['ftime'] + nmemb_exp <- dim(data_ftime$mod)['ftime'] + } else { + nftime <- dim(data_ftime$obs)['ftime'] + nmemb_obs <- dim(data_ftime$obs)['ftime'] + } + if (is.null(params[['leadtimemin']])) { + ftimes <- 1:nftime + } else { + ftimes <- (1:nftime) + params[['leadtimemin']] - 1 + } + + # Replace 'all's + if (chunks[['dataset']] == 'all') { + chunks[['dataset']] <- length(exp) + } + if (chunks[['member']] == 'all') { + chunks[['dataset']] <- nmemb_exp + } + if (chunks[['sdate']] == 'all') { + chunks[['sdate']] <- length(sdates) + } + if (chunks[['ftime']] == 'all') { + chunks[['ftime']] <- nftime + } + if (chunks[['lat']] == 'all') { + chunks[['lat']] <- length(lats) + } + if (chunks[['lon']] == 'all') { + chunks[['lon']] <- length(lons) + } + + # Empty array to collect results + result <- list(mod = NULL, obs = NULL) + + # Mount the ecFlow suite + if (on_cluster) { + # Copy load_process_save_chunk.R into shared folder + chunk_script <- file(system.file('chunking/load_process_save_chunk.R', + package = 's2dverification')) + chunk_script_lines <- readLines(chunk_script) + chunk_script_lines <- gsub('^var <- *', paste0('var <- ', paste(deparse(var), collapse = '\n')), + chunk_script_lines) + chunk_script_lines <- gsub('^obs <- *', paste0('obs <- ', paste(deparse(obs), collapse = '\n')), + chunk_script_lines) + chunk_script_lines <- gsub('^shared_dir <- *', paste0('shared_dir <- ', + paste(deparse(shared_dir), collapse = '\n')), chunk_script_lines) + chunk_script_lines <- gsub('^debug <- *', paste0('debug <- ', paste(deparse(debug), collapse = '\n')), + chunk_script_lines) + params_to_leave <- c('leadtimemax', 'leadtimemin', + 'lonmin', 'lonmax', 'latmin', 'latmax') + params_to_use <- params + if (length(which(names(params) %in% params_to_leave)) > 0) { + params_to_use <- params[-which(names(params) %in% params_to_leave)] + } + chunk_script_lines <- gsub('^params <- *', paste0('params <- ', + paste(deparse(params_to_use), collapse = '\n')), chunk_script_lines) + chunk_script_lines <- gsub('^fun <- *', paste0('fun <- ', paste(deparse(fun), collapse = '\n')), + chunk_script_lines) + original_selectors <- vector('list', 6) + names(original_selectors) <- c('dataset', 'member', 'sdate', 'ftime', 'lat', 'lon') + original_selectors[['dataset']] <- exp + original_selectors[['sdate']] <- sdates + original_selectors[['ftime']] <- ftimes + chunk_script_lines <- gsub('^original_selectors <- *', + paste0('original_selectors <- ', + paste(deparse(original_selectors), collapse = '\n')), + chunk_script_lines) + writeLines(chunk_script_lines, paste0(shared_dir, 
'/load_process_save_chunk.R')) + + # Copy Chunk.ecf into shared folder + #TODO: Modify chain of params sent to r script in function of + #array dimensions: %DIM_CHUNK_INDEX% %DIM_MIN% %DIM_MAX% + chunk_ecf_script <- file(system.file('chunking/Chunk.ecf', + package = 's2dverification')) + chunk_ecf_script_lines <- readLines(chunk_ecf_script) + writeLines(chunk_ecf_script_lines, paste0(shared_dir, '/Chunk.ecf')) + + # Copy merge_chunks.R into tmp folder + merge_script <- file(system.file('chunking/merge_chunks.R', + package = 's2dverification')) + merge_script_lines <- readLines(merge_script) + merge_script_lines <- gsub('^shared_dir <- *', paste0('shared_dir <- ', + paste(deparse(shared_dir), collapse = '\n')), merge_script_lines) + writeLines(merge_script_lines, paste0(shared_dir, '/merge_chunk.R')) + + # Copy Merge.ecf into tmp folder + #TODO: Modify chain of parameters sent to r script when merging + #chunks progressively + merge_ecf_script <- file(system.file('chunking/Merge.ecf', + package = 's2dverification')) + merge_ecf_script_lines <- readLines(merge_ecf_script) + writeLines(merge_ecf_script_lines, paste0(shared_dir, '/Merge.ecf')) + } + + add_line <- function(suite, line, tabs) { + c(suite, paste0(paste(rep(' ', tabs), collapse = ''), line)) + } + suite <- NULL + tabs <- 0 + suite <- add_line(suite, 'suite S2DV_CHUNKING', tabs) + tabs <- tabs + 2 + suite <- add_line(suite, paste0("edit ECF_HOME '", shared_dir, "'"), tabs) + suite <- add_line(suite, paste0("edit ECF_FILES '", shared_dir, "'"), tabs) + suite <- add_line(suite, paste0("edit HOST '", cluster[['queue_host']], "'"), tabs) + suite <- add_line(suite, "edit ECF_JOB_CMD 'ssh %HOST% 'sbatch %ECF_JOB% > %ECF_JOBOUT% 2>&1 &''", tabs) + + # Iterate through chunks + chunk_counter <- 0 + if (chunks[['dataset']] > length(exp)) { + warning("Too many 'dataset' chunks requested. Reducing from ", + chunks[['dataset']], " to ", length(exp), ".") + chunks[['dataset']] <- length(exp) + } + datasets_per_chunk <- ceiling(length(exp) / chunks[['dataset']]) + dataset_index <- 1 + result <- list(mod = NULL, obs = NULL) + for (dataset in 1:chunks[['dataset']]) { + dataset_min <- (1:length(exp))[dataset_index] + dataset_max <- (1:length(exp))[min(length(exp), dataset_index + datasets_per_chunk - 1)] + suite <- add_line(suite, paste0('family ', 'dataset_', dataset), tabs) + tabs <- tabs + 2 + suite <- add_line(suite, paste0('edit ', 'DATASET ', dataset), tabs) + suite <- add_line(suite, paste0('edit ', 'DATASET_MIN ', dataset_min), tabs) + suite <- add_line(suite, paste0('edit ', 'DATASET_MAX ', dataset_max), tabs) + + if (chunks[['member']] > nmemb_exp) { + warning("Too many 'member' chunks requested. Reducing from ", + chunks[['member']], " to ", nmemb_exp, ".") + chunks[['member']] <- nmemb_exp + } + members_per_chunk <- ceiling(nmemb_exp / chunks[['member']]) + member_index <- 1 + res_member <- list(mod = NULL, obs = NULL) + for (member in 1:chunks[['member']]) { + member_min <- (1:nmemb_exp)[member_index] + member_max <- (1:nmemb_exp)[min(nmemb_exp, member_index + members_per_chunk - 1)] + suite <- add_line(suite, paste0('family ', 'member_', member), tabs) + tabs <- tabs + 2 + suite <- add_line(suite, paste0('edit ', 'MEMBER ', member), tabs) + suite <- add_line(suite, paste0('edit ', 'MEMBER_MIN ', member_min), tabs) + suite <- add_line(suite, paste0('edit ', 'MEMBER_MAX ', member_max), tabs) + + if (chunks[['sdate']] > length(sdates)) { + warning("Too many 'sdate' chunks requested. 
Reducing from ", + chunks[['sdate']], " to ", length(sdates), ".") + chunks[['sdate']] <- length(sdate) + } + sdates_per_chunk <- ceiling(length(sdates) / chunks[['sdate']]) + sdate_index <- 1 + res_sdate <- list(mod = NULL, obs = NULL) + for (sdate in 1:chunks[['sdate']]) { + sdate_min <- (1:length(sdates))[sdate_index] + sdate_max <- (1:length(sdates))[min(length(sdates), sdate_index + sdates_per_chunk - 1)] + suite <- add_line(suite, paste0('family ', 'sdate_', sdate), tabs) + tabs <- tabs + 2 + suite <- add_line(suite, paste0('edit ', 'SDATE ', sdate), tabs) + suite <- add_line(suite, paste0('edit ', 'SDATE_MIN ', sdate_min), tabs) + suite <- add_line(suite, paste0('edit ', 'SDATE_MAX ', sdate_max), tabs) + + if (chunks[['ftime']] > nftime) { + warning("Too many 'ftime' chunks requested. Reducing from ", + chunks[['ftime']], " to ", nftime, ".") + chunks[['ftime']] <- nftime + } + ftimes_per_chunk <- ceiling(nftime / chunks[['ftime']]) + ftime_index <- 1 + res_ftime <- list(mod = NULL, obs = NULL) + for (ftime in 1:chunks[['ftime']]) { + ftime_min <- (1:nftime)[ftime_index] + ftime_max <- (1:nftime)[min(nftime, ftime_index + ftimes_per_chunk - 1)] + suite <- add_line(suite, paste0('family ', 'ftime_', ftime), tabs) + tabs <- tabs + 2 + suite <- add_line(suite, paste0('edit ', 'FTIME ', ftime), tabs) + suite <- add_line(suite, paste0('edit ', 'FTIME_MIN ', ftime_min), tabs) + suite <- add_line(suite, paste0('edit ', 'FTIME_MAX ', ftime_max), tabs) + + if (chunks[['lat']] > length(lats)) { + warning("Too many 'lat' chunks requested. Reducing from ", + chunks[['lat']], " to ", length(lats), ".") + chunks[['lat']] <- length(lats) + } + lats_per_chunk <- ceiling(length(lats) / chunks[['lat']]) + lat_index <- 1 + res_lat <- list(mod = NULL, obs = NULL) + for (lat in 1:chunks[['lat']]) { + lat_max <- lats[lat_index] + lat_min <- lats[min(length(lats), lat_index + lats_per_chunk - 1)] + suite <- add_line(suite, paste0('family ', 'lat_', lat), tabs) + tabs <- tabs + 2 + suite <- add_line(suite, paste0('edit ', 'LAT ', lat), tabs) + suite <- add_line(suite, paste0('edit ', 'LAT_MIN ', lat_min), tabs) + suite <- add_line(suite, paste0('edit ', 'LAT_MAX ', lat_max), tabs) + + if (chunks[['lon']] > length(lons)) { + warning("Too many 'lon' chunks requested. 
Reducing from ", + chunks[['lon']], " to ", length(lons), ".") + chunks[['lon']] <- length(lons) + } + lons_per_chunk <- ceiling(length(lons) / chunks[['lon']]) + lon_index <- 1 + res_lon <- list(mod = NULL, obs = NULL) + for (lon in 1:chunks[['lon']]) { + lon_min <- lons[lon_index] + lon_max <- lons[min(length(lons), lon_index + lons_per_chunk - 1)] + suite <- add_line(suite, paste0('family ', 'lon_', lon), tabs) + tabs <- tabs + 2 + suite <- add_line(suite, paste0('edit ', 'LON ', lon), tabs) + suite <- add_line(suite, paste0('edit ', 'LON_MIN ', lon_min), tabs) + suite <- add_line(suite, paste0('edit ', 'LON_MAX ', lon_max), tabs) + + # ADD CHUNK SCRIPT TO SUITE + suite <- add_line(suite, "task Chunk", tabs) + tabs <- tabs - 2 + suite <- add_line(suite, paste0('endfamily'), tabs) + + if (!on_cluster) { + if (!silent) { + s2dverification:::.message(paste("Loading chunk", chunk_counter + 1, + "out of", prod(unlist(chunks)), "...")) + } + params_to_leave <- c('leadtimemax', 'leadtimemin', + 'latmin', 'latmax', 'lonmin', 'lonmax') + params_to_use <- params + if (length(which(names(params) %in% params_to_leave)) > 0) { + params_to_use <- params[-which(names(params) %in% params_to_leave)] + } + data <- do.call("Load", + c( + list( + var, exp[dataset_min:dataset_max], obs, + sdates[sdate_min:sdate_max], + leadtimemin = ftimes[ftime_min], + leadtimemax = ftimes[ftime_max], + latmin = lat_min, latmax = lat_max, + lonmin = lon_min, lonmax = lon_max, + silent = !debug + ), + params_to_use + ) + ) + + if (!silent) { + s2dverification:::.message(paste("Processing...")) + } + + res <- fun(data) + + rm(data) + gc() + + if (chunks[['lon']] > 1) { + res_lon$mod <- MergeArrays(res_lon$mod, res$mod, 'lon') + res_lon$obs <- MergeArrays(res_lon$obs, res$obs, 'lon') + } else { + res_lon <- res + } + rm(res) + gc() + } + chunk_counter <- chunk_counter + 1 + lon_index <- lon_index + lons_per_chunk + } + tabs <- tabs - 2 + suite <- add_line(suite, paste0('endfamily'), tabs) + if (!on_cluster) { + if (chunks[['lat']] > 1) { + res_lat$mod <- MergeArrays(res_lat$mod, res_lon$mod, 'lat') + res_lat$obs <- MergeArrays(res_lat$obs, res_lon$obs, 'lat') + } else { + res_lat <- res_lon + } + rm(res_lon) + gc() + } + lat_index <- lat_index + lats_per_chunk + } + tabs <- tabs - 2 + suite <- add_line(suite, paste0('endfamily'), tabs) + if (!on_cluster) { + if (chunks[['ftime']] > 1) { + res_ftime$mod <- MergeArrays(res_ftime$mod, res_lat$mod, 'ftime') + res_ftime$obs <- MergeArrays(res_ftime$obs, res_lat$obs, 'ftime') + } else { + res_ftime <- res_lat + } + rm(res_lat) + gc() + } + ftime_index <- ftime_index + ftimes_per_chunk + } + tabs <- tabs - 2 + suite <- add_line(suite, paste0('endfamily'), tabs) + if (!on_cluster) { + if (chunks[['sdate']] > 1) { + res_sdate$mod <- MergeArrays(res_sdate$mod, res_ftime$mod, 'sdate') + res_sdate$obs <- MergeArrays(res_sdate$obs, res_ftime$obs, 'sdate') + } else { + res_sdate <- res_ftime + } + rm(res_ftime) + gc() + } + sdate_index <- sdate_index + sdates_per_chunk + } + tabs <- tabs - 2 + suite <- add_line(suite, paste0('endfamily'), tabs) + if (!on_cluster) { + if (chunks[['member']] > 1) { + res_member$mod <- MergeArrays(res_member$mod, res_sdate$mod, 'member') + res_member$obs <- MergeArrays(res_member$obs, res_sdate$obs, 'member') + } else { + res_member <- res_sdate + } + rm(res_sdate) + gc() + } + member_index <- member_index + members_per_chunk + } + tabs <- tabs - 2 + suite <- add_line(suite, paste0('endfamily'), tabs) + if (!on_cluster) { + if (chunks[['dataset']] > 1) { + 
result$mod <- MergeArrays(result$mod, res_member$mod, 'dataset') + result$obs <- MergeArrays(result$obs, res_member$obs, 'dataset') + } else { + result <- res_member + } + rm(res_member) + gc() + } + dataset_index <- dataset_index + datasets_per_chunk + } + + # Close the ecFlow suite + suite <- add_line(suite, "family merge", tabs) + tabs <- tabs + 2 + suite <- add_line(suite, "edit ECF_JOB_CMD 'bash %ECF_JOB% > %ECF_JOBOUT% 2>&1 &'", tabs) + suite <- add_line(suite, "task Merge", tabs) + tabs <- tabs - 2 + suite <- add_line(suite, paste0('endfamily'), tabs) + + tabs <- tabs - 2 + suite <- add_line(suite, "endsuite", tabs) + + # Run ecFlow suite if needed + if (on_cluster) { + suite_file <- paste0(shared_dir, '/s2dv_chunking.def') + suite_file_o <- file(suite_file) + writeLines(suite, suite_file_o) + close(suite_file_o) + system("ecflow_start.sh -p 5678") + system(paste0("ecflow_client --load=", suite_file, ' --port=5678')) + system("ecflow_client --begin=S2DV_CHUNKING --port=5678") + done <- FALSE + while (!done) { + Sys.sleep(1) + status <- system("ecflow_client --get_state=S2DV_CHUNKING --port=5678", + intern = TRUE) + if (any(grepl("suite S2DV_CHUNKING #.* state:complete", status))) { + done <- TRUE + } + } + system("ecflow_client --shutdown --port=5678") + system("ecflow_stop.sh -p 5678") + result <- readRDS(paste0(shared_dir, '/result.Rds')) + file.remove(paste0(shared_dir, '/result.Rds')) + } + + result +} -- GitLab From 5f480d6afe8b7e7a25d28b3f1937f61fb65e3b8c Mon Sep 17 00:00:00 2001 From: Nicolau Manubens Date: Fri, 22 Sep 2017 19:49:37 +0200 Subject: [PATCH 4/4] Cleanse. --- R/Compute.R | 495 ---------------------------------------------------- 1 file changed, 495 deletions(-) delete mode 100644 R/Compute.R diff --git a/R/Compute.R b/R/Compute.R deleted file mode 100644 index 652e767..0000000 --- a/R/Compute.R +++ /dev/null @@ -1,495 +0,0 @@ -Compute <- function(fun, chunks = NULL, - ..., - cluster = NULL, shared_dir = NULL, - silent = FALSE, debug = FALSE){ - MergeArrays <- startR:::.MergeArrays - - params <- list(...) 
- - # Check fun - if (!is.function(fun)) { - stop("Parameter 'fun' must be a function.") - } - - # Check cluster - if (!is.null(cluster)) { - cluster <- list(queue_host = 'bsceslogin01.bsc.es', - node_memory = NULL) - } - on_cluster <- !is.null(cluster) - - # Check shared_dir - if (on_cluster) { - if (is.null(shared_dir)) { - stop("Parameter 'shared_dir' must be specified when a cluster is specified.") - } else { - dir.create(shared_dir, recursive = TRUE) - if (!dir.exists(shared_dir)) stop("Could not find or create the directory in ", - "parameter 'shared_dir'.") - } - } - - # Check silent - if (!is.logical(silent)) { - stop("Parameter 'silent' must be logical.") - } - - # Check debug - if (!is.logical(debug)) { - stop("Parameter 'debug' must be logical.") - } - if (silent) { - debug <- FALSE - } - - # Check chunks - if (!is.list(chunks)) { - stop("Parameter 'chunks' must be a named list.") - } - if (is.null(names(chunks))) { - stop("Parameter 'chunks' must be a named list.") - } - new_chunks <- list(dataset = 1, member = 1, sdate = 1, - ftime = 1, lat = 1, lon = 1) - if (any(!(names(chunks) %in% names(new_chunks)))) { - stop("All names in parameter 'chunks' must be one of 'dataset', 'member', ", - "'sdate', 'ftime', 'lat' or 'lon'.") - } - if (any(!(((unlist(chunks) %% 1) == 0) | (unlist(chunks) == 'all')))) { - stop("All values in parameter 'chunks' must take a numeric value or 'all'.") - } - if (any(unlist(chunks) < 1)) { - stop("All values in parameter 'chunks' must be > 1.") - } - new_chunks[names(chunks)] <- chunks - chunks <- new_chunks - - # Discover coordinates - s2dverification:::.message(paste("Discovering grid size...")) - params_to_leave <- c('leadtimemax', 'leadtimemin', 'nmember', 'nmemberobs') - params_to_use <- params - if (length(which(names(params) %in% params_to_leave)) > 0) { - params_to_use <- params[-which(names(params) %in% params_to_leave)] - } - data_lonlat <- do.call("Load", - c( - list( - var, exp[1], obs[1], - sdates[1], leadtimemax = 1, - nmember = 1, nmemberobs = 1, - silent = !debug - ), - params_to_use - ) - ) - lons <- data_lonlat$lon - lats <- data_lonlat$lat - s2dverification:::.message(paste("Discovering forecast time steps...")) - params_to_leave <- c('nmember', 'nmemberobs', 'lonmin', 'lonmax', 'latmin', 'latmax') - params_to_use <- params - if (length(which(names(params) %in% params_to_leave)) > 0) { - params_to_use <- params[-which(names(params) %in% params_to_leave)] - } - data_ftime <- do.call("Load", - c( - list( - var, exp[1], obs[1], sdates[1], - nmember = 1, nmemberobs = 1, - lonmin = lons[1], lonmax = lons[1], - latmin = lats[1], latmax = lats[1], - silent = !debug - ), - params_to_use - ) - ) - s2dverification:::.message(paste("Discovering ensemble members...")) - params_to_leave <- c('leadtimemax', 'leadtimemin', - 'lonmin', 'lonmax', 'latmin', 'latmax') - params_to_use <- params - if (length(which(names(params) %in% params_to_leave)) > 0) { - params_to_use <- params[-which(names(params) %in% params_to_leave)] - } - data_members <- do.call("Load", - c( - list( - var, exp[1], obs[1], - sdates[1], leadtimemax = 1, - lonmin = lons[1], lonmax = lons[1], - latmin = lats[1], latmax = lats[1], - silent = !debug - ), - params_to_use - ) - ) - if (!is.null(exp)) { - nftime <- dim(data_ftime$mod)['ftime'] - nmemb_exp <- dim(data_ftime$mod)['ftime'] - } else { - nftime <- dim(data_ftime$obs)['ftime'] - nmemb_obs <- dim(data_ftime$obs)['ftime'] - } - if (is.null(params[['leadtimemin']])) { - ftimes <- 1:nftime - } else { - ftimes <- (1:nftime) + 
params[['leadtimemin']] - 1 - } - - # Replace 'all's - if (chunks[['dataset']] == 'all') { - chunks[['dataset']] <- length(exp) - } - if (chunks[['member']] == 'all') { - chunks[['dataset']] <- nmemb_exp - } - if (chunks[['sdate']] == 'all') { - chunks[['sdate']] <- length(sdates) - } - if (chunks[['ftime']] == 'all') { - chunks[['ftime']] <- nftime - } - if (chunks[['lat']] == 'all') { - chunks[['lat']] <- length(lats) - } - if (chunks[['lon']] == 'all') { - chunks[['lon']] <- length(lons) - } - - # Empty array to collect results - result <- list(mod = NULL, obs = NULL) - - # Mount the ecFlow suite - if (on_cluster) { - # Copy load_process_save_chunk.R into shared folder - chunk_script <- file(system.file('chunking/load_process_save_chunk.R', - package = 's2dverification')) - chunk_script_lines <- readLines(chunk_script) - chunk_script_lines <- gsub('^var <- *', paste0('var <- ', paste(deparse(var), collapse = '\n')), - chunk_script_lines) - chunk_script_lines <- gsub('^obs <- *', paste0('obs <- ', paste(deparse(obs), collapse = '\n')), - chunk_script_lines) - chunk_script_lines <- gsub('^shared_dir <- *', paste0('shared_dir <- ', - paste(deparse(shared_dir), collapse = '\n')), chunk_script_lines) - chunk_script_lines <- gsub('^debug <- *', paste0('debug <- ', paste(deparse(debug), collapse = '\n')), - chunk_script_lines) - params_to_leave <- c('leadtimemax', 'leadtimemin', - 'lonmin', 'lonmax', 'latmin', 'latmax') - params_to_use <- params - if (length(which(names(params) %in% params_to_leave)) > 0) { - params_to_use <- params[-which(names(params) %in% params_to_leave)] - } - chunk_script_lines <- gsub('^params <- *', paste0('params <- ', - paste(deparse(params_to_use), collapse = '\n')), chunk_script_lines) - chunk_script_lines <- gsub('^fun <- *', paste0('fun <- ', paste(deparse(fun), collapse = '\n')), - chunk_script_lines) - original_selectors <- vector('list', 6) - names(original_selectors) <- c('dataset', 'member', 'sdate', 'ftime', 'lat', 'lon') - original_selectors[['dataset']] <- exp - original_selectors[['sdate']] <- sdates - original_selectors[['ftime']] <- ftimes - chunk_script_lines <- gsub('^original_selectors <- *', - paste0('original_selectors <- ', - paste(deparse(original_selectors), collapse = '\n')), - chunk_script_lines) - writeLines(chunk_script_lines, paste0(shared_dir, '/load_process_save_chunk.R')) - - # Copy Chunk.ecf into shared folder - #TODO: Modify chain of params sent to r script in function of - #array dimensions: %DIM_CHUNK_INDEX% %DIM_MIN% %DIM_MAX% - chunk_ecf_script <- file(system.file('chunking/Chunk.ecf', - package = 's2dverification')) - chunk_ecf_script_lines <- readLines(chunk_ecf_script) - writeLines(chunk_ecf_script_lines, paste0(shared_dir, '/Chunk.ecf')) - - # Copy merge_chunks.R into tmp folder - merge_script <- file(system.file('chunking/merge_chunks.R', - package = 's2dverification')) - merge_script_lines <- readLines(merge_script) - merge_script_lines <- gsub('^shared_dir <- *', paste0('shared_dir <- ', - paste(deparse(shared_dir), collapse = '\n')), merge_script_lines) - writeLines(merge_script_lines, paste0(shared_dir, '/merge_chunk.R')) - - # Copy Merge.ecf into tmp folder - #TODO: Modify chain of parameters sent to r script when merging - #chunks progressively - merge_ecf_script <- file(system.file('chunking/Merge.ecf', - package = 's2dverification')) - merge_ecf_script_lines <- readLines(merge_ecf_script) - writeLines(merge_ecf_script_lines, paste0(shared_dir, '/Merge.ecf')) - } - - add_line <- function(suite, line, tabs) { - c(suite, 
paste0(paste(rep(' ', tabs), collapse = ''), line)) - } - suite <- NULL - tabs <- 0 - suite <- add_line(suite, 'suite S2DV_CHUNKING', tabs) - tabs <- tabs + 2 - suite <- add_line(suite, paste0("edit ECF_HOME '", shared_dir, "'"), tabs) - suite <- add_line(suite, paste0("edit ECF_FILES '", shared_dir, "'"), tabs) - suite <- add_line(suite, paste0("edit HOST '", cluster[['queue_host']], "'"), tabs) - suite <- add_line(suite, "edit ECF_JOB_CMD 'ssh %HOST% 'sbatch %ECF_JOB% > %ECF_JOBOUT% 2>&1 &''", tabs) - - # Iterate through chunks - chunk_counter <- 0 - if (chunks[['dataset']] > length(exp)) { - warning("Too many 'dataset' chunks requested. Reducing from ", - chunks[['dataset']], " to ", length(exp), ".") - chunks[['dataset']] <- length(exp) - } - datasets_per_chunk <- ceiling(length(exp) / chunks[['dataset']]) - dataset_index <- 1 - result <- list(mod = NULL, obs = NULL) - for (dataset in 1:chunks[['dataset']]) { - dataset_min <- (1:length(exp))[dataset_index] - dataset_max <- (1:length(exp))[min(length(exp), dataset_index + datasets_per_chunk - 1)] - suite <- add_line(suite, paste0('family ', 'dataset_', dataset), tabs) - tabs <- tabs + 2 - suite <- add_line(suite, paste0('edit ', 'DATASET ', dataset), tabs) - suite <- add_line(suite, paste0('edit ', 'DATASET_MIN ', dataset_min), tabs) - suite <- add_line(suite, paste0('edit ', 'DATASET_MAX ', dataset_max), tabs) - - if (chunks[['member']] > nmemb_exp) { - warning("Too many 'member' chunks requested. Reducing from ", - chunks[['member']], " to ", nmemb_exp, ".") - chunks[['member']] <- nmemb_exp - } - members_per_chunk <- ceiling(nmemb_exp / chunks[['member']]) - member_index <- 1 - res_member <- list(mod = NULL, obs = NULL) - for (member in 1:chunks[['member']]) { - member_min <- (1:nmemb_exp)[member_index] - member_max <- (1:nmemb_exp)[min(nmemb_exp, member_index + members_per_chunk - 1)] - suite <- add_line(suite, paste0('family ', 'member_', member), tabs) - tabs <- tabs + 2 - suite <- add_line(suite, paste0('edit ', 'MEMBER ', member), tabs) - suite <- add_line(suite, paste0('edit ', 'MEMBER_MIN ', member_min), tabs) - suite <- add_line(suite, paste0('edit ', 'MEMBER_MAX ', member_max), tabs) - - if (chunks[['sdate']] > length(sdates)) { - warning("Too many 'sdate' chunks requested. Reducing from ", - chunks[['sdate']], " to ", length(sdates), ".") - chunks[['sdate']] <- length(sdate) - } - sdates_per_chunk <- ceiling(length(sdates) / chunks[['sdate']]) - sdate_index <- 1 - res_sdate <- list(mod = NULL, obs = NULL) - for (sdate in 1:chunks[['sdate']]) { - sdate_min <- (1:length(sdates))[sdate_index] - sdate_max <- (1:length(sdates))[min(length(sdates), sdate_index + sdates_per_chunk - 1)] - suite <- add_line(suite, paste0('family ', 'sdate_', sdate), tabs) - tabs <- tabs + 2 - suite <- add_line(suite, paste0('edit ', 'SDATE ', sdate), tabs) - suite <- add_line(suite, paste0('edit ', 'SDATE_MIN ', sdate_min), tabs) - suite <- add_line(suite, paste0('edit ', 'SDATE_MAX ', sdate_max), tabs) - - if (chunks[['ftime']] > nftime) { - warning("Too many 'ftime' chunks requested. 
Reducing from ", - chunks[['ftime']], " to ", nftime, ".") - chunks[['ftime']] <- nftime - } - ftimes_per_chunk <- ceiling(nftime / chunks[['ftime']]) - ftime_index <- 1 - res_ftime <- list(mod = NULL, obs = NULL) - for (ftime in 1:chunks[['ftime']]) { - ftime_min <- (1:nftime)[ftime_index] - ftime_max <- (1:nftime)[min(nftime, ftime_index + ftimes_per_chunk - 1)] - suite <- add_line(suite, paste0('family ', 'ftime_', ftime), tabs) - tabs <- tabs + 2 - suite <- add_line(suite, paste0('edit ', 'FTIME ', ftime), tabs) - suite <- add_line(suite, paste0('edit ', 'FTIME_MIN ', ftime_min), tabs) - suite <- add_line(suite, paste0('edit ', 'FTIME_MAX ', ftime_max), tabs) - - if (chunks[['lat']] > length(lats)) { - warning("Too many 'lat' chunks requested. Reducing from ", - chunks[['lat']], " to ", length(lats), ".") - chunks[['lat']] <- length(lats) - } - lats_per_chunk <- ceiling(length(lats) / chunks[['lat']]) - lat_index <- 1 - res_lat <- list(mod = NULL, obs = NULL) - for (lat in 1:chunks[['lat']]) { - lat_max <- lats[lat_index] - lat_min <- lats[min(length(lats), lat_index + lats_per_chunk - 1)] - suite <- add_line(suite, paste0('family ', 'lat_', lat), tabs) - tabs <- tabs + 2 - suite <- add_line(suite, paste0('edit ', 'LAT ', lat), tabs) - suite <- add_line(suite, paste0('edit ', 'LAT_MIN ', lat_min), tabs) - suite <- add_line(suite, paste0('edit ', 'LAT_MAX ', lat_max), tabs) - - if (chunks[['lon']] > length(lons)) { - warning("Too many 'lon' chunks requested. Reducing from ", - chunks[['lon']], " to ", length(lons), ".") - chunks[['lon']] <- length(lons) - } - lons_per_chunk <- ceiling(length(lons) / chunks[['lon']]) - lon_index <- 1 - res_lon <- list(mod = NULL, obs = NULL) - for (lon in 1:chunks[['lon']]) { - lon_min <- lons[lon_index] - lon_max <- lons[min(length(lons), lon_index + lons_per_chunk - 1)] - suite <- add_line(suite, paste0('family ', 'lon_', lon), tabs) - tabs <- tabs + 2 - suite <- add_line(suite, paste0('edit ', 'LON ', lon), tabs) - suite <- add_line(suite, paste0('edit ', 'LON_MIN ', lon_min), tabs) - suite <- add_line(suite, paste0('edit ', 'LON_MAX ', lon_max), tabs) - - # ADD CHUNK SCRIPT TO SUITE - suite <- add_line(suite, "task Chunk", tabs) - tabs <- tabs - 2 - suite <- add_line(suite, paste0('endfamily'), tabs) - - if (!on_cluster) { - if (!silent) { - s2dverification:::.message(paste("Loading chunk", chunk_counter + 1, - "out of", prod(unlist(chunks)), "...")) - } - params_to_leave <- c('leadtimemax', 'leadtimemin', - 'latmin', 'latmax', 'lonmin', 'lonmax') - params_to_use <- params - if (length(which(names(params) %in% params_to_leave)) > 0) { - params_to_use <- params[-which(names(params) %in% params_to_leave)] - } - data <- do.call("Load", - c( - list( - var, exp[dataset_min:dataset_max], obs, - sdates[sdate_min:sdate_max], - leadtimemin = ftimes[ftime_min], - leadtimemax = ftimes[ftime_max], - latmin = lat_min, latmax = lat_max, - lonmin = lon_min, lonmax = lon_max, - silent = !debug - ), - params_to_use - ) - ) - - if (!silent) { - s2dverification:::.message(paste("Processing...")) - } - - res <- fun(data) - - rm(data) - gc() - - if (chunks[['lon']] > 1) { - res_lon$mod <- MergeArrays(res_lon$mod, res$mod, 'lon') - res_lon$obs <- MergeArrays(res_lon$obs, res$obs, 'lon') - } else { - res_lon <- res - } - rm(res) - gc() - } - chunk_counter <- chunk_counter + 1 - lon_index <- lon_index + lons_per_chunk - } - tabs <- tabs - 2 - suite <- add_line(suite, paste0('endfamily'), tabs) - if (!on_cluster) { - if (chunks[['lat']] > 1) { - res_lat$mod <- 
MergeArrays(res_lat$mod, res_lon$mod, 'lat') - res_lat$obs <- MergeArrays(res_lat$obs, res_lon$obs, 'lat') - } else { - res_lat <- res_lon - } - rm(res_lon) - gc() - } - lat_index <- lat_index + lats_per_chunk - } - tabs <- tabs - 2 - suite <- add_line(suite, paste0('endfamily'), tabs) - if (!on_cluster) { - if (chunks[['ftime']] > 1) { - res_ftime$mod <- MergeArrays(res_ftime$mod, res_lat$mod, 'ftime') - res_ftime$obs <- MergeArrays(res_ftime$obs, res_lat$obs, 'ftime') - } else { - res_ftime <- res_lat - } - rm(res_lat) - gc() - } - ftime_index <- ftime_index + ftimes_per_chunk - } - tabs <- tabs - 2 - suite <- add_line(suite, paste0('endfamily'), tabs) - if (!on_cluster) { - if (chunks[['sdate']] > 1) { - res_sdate$mod <- MergeArrays(res_sdate$mod, res_ftime$mod, 'sdate') - res_sdate$obs <- MergeArrays(res_sdate$obs, res_ftime$obs, 'sdate') - } else { - res_sdate <- res_ftime - } - rm(res_ftime) - gc() - } - sdate_index <- sdate_index + sdates_per_chunk - } - tabs <- tabs - 2 - suite <- add_line(suite, paste0('endfamily'), tabs) - if (!on_cluster) { - if (chunks[['member']] > 1) { - res_member$mod <- MergeArrays(res_member$mod, res_sdate$mod, 'member') - res_member$obs <- MergeArrays(res_member$obs, res_sdate$obs, 'member') - } else { - res_member <- res_sdate - } - rm(res_sdate) - gc() - } - member_index <- member_index + members_per_chunk - } - tabs <- tabs - 2 - suite <- add_line(suite, paste0('endfamily'), tabs) - if (!on_cluster) { - if (chunks[['dataset']] > 1) { - result$mod <- MergeArrays(result$mod, res_member$mod, 'dataset') - result$obs <- MergeArrays(result$obs, res_member$obs, 'dataset') - } else { - result <- res_member - } - rm(res_member) - gc() - } - dataset_index <- dataset_index + datasets_per_chunk - } - - # Close the ecFlow suite - suite <- add_line(suite, "family merge", tabs) - tabs <- tabs + 2 - suite <- add_line(suite, "edit ECF_JOB_CMD 'bash %ECF_JOB% > %ECF_JOBOUT% 2>&1 &'", tabs) - suite <- add_line(suite, "task Merge", tabs) - tabs <- tabs - 2 - suite <- add_line(suite, paste0('endfamily'), tabs) - - tabs <- tabs - 2 - suite <- add_line(suite, "endsuite", tabs) - - # Run ecFlow suite if needed - if (on_cluster) { - suite_file <- paste0(shared_dir, '/s2dv_chunking.def') - suite_file_o <- file(suite_file) - writeLines(suite, suite_file_o) - close(suite_file_o) - system("ecflow_start.sh -p 5678") - system(paste0("ecflow_client --load=", suite_file, ' --port=5678')) - system("ecflow_client --begin=S2DV_CHUNKING --port=5678") - done <- FALSE - while (!done) { - Sys.sleep(1) - status <- system("ecflow_client --get_state=S2DV_CHUNKING --port=5678", - intern = TRUE) - if (any(grepl("suite S2DV_CHUNKING #.* state:complete", status))) { - done <- TRUE - } - } - system("ecflow_client --shutdown --port=5678") - system("ecflow_stop.sh -p 5678") - result <- readRDS(paste0(shared_dir, '/result.Rds')) - file.remove(paste0(shared_dir, '/result.Rds')) - } - - result -} -- GitLab
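Every per-dimension chunk boundary computed by the nested loops in the Compute() draft above (DATASET_MIN/DATASET_MAX, SDATE_MIN/SDATE_MAX, and so on) follows the same arithmetic: a dimension of length n split into n_chunks pieces yields ceiling(n / n_chunks) indices per chunk, with the last chunk taking whatever remains. A simplified standalone sketch of that computation (the helper name chunk_bounds is hypothetical, not part of either package):

    # Hypothetical helper mirroring the chunk-boundary arithmetic in Compute():
    # split the indices 1:n into n_chunks contiguous pieces.
    chunk_bounds <- function(n, n_chunks) {
      per_chunk <- ceiling(n / n_chunks)    # indices per chunk; the last may be shorter
      starts <- seq(1, n, by = per_chunk)   # first index of each chunk
      data.frame(min = starts,
                 max = pmin(n, starts + per_chunk - 1))
    }
    chunk_bounds(10, 3)
    #   min max
    # 1   1   4
    # 2   5   8
    # 3   9  10

These min/max pairs are what each ecFlow family receives as suite variables, and what the local (non-cluster) branch passes to Load() as index ranges for each chunk (or, for lat/lon, as the corresponding coordinate values).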