diff --git a/DESCRIPTION b/DESCRIPTION
index a75b2cfedb8c97faf8bc273c812c97f75afb21f6..0a32038e3ad44493fd991f1a5051f741c8bf2f19 100644
--- a/DESCRIPTION
+++ b/DESCRIPTION
@@ -1,6 +1,6 @@
 Package: startR
 Title: Automatically Retrieve Multidimensional Distributed Data Sets
-Version: 2.0.1
+Version: 2.0.3
 Authors@R: c(
     person("BSC-CNS", role = c("aut", "cph")),
     person("Nicolau", "Manubens", , "nicolau.manubens@bsc.es", role = c("aut")),
@@ -9,15 +9,15 @@ Authors@R: c(
     person("Javier", "Vegas", , "javier.vegas@bsc.es", role = c("ctb")),
     person("Pierre-Antoine", "Bretonniere", , "pierre-antoine.bretonniere@bsc.es", role = c("ctb")),
     person("Roberto", "Serrano", , "rsnotivoli@gmal.com", role = c("ctb")))
-Description: Tool to automatically fetch, transform and arrange subsets of multi-
-    dimensional data sets (collections of files) stored in local and/or remote
-    file systems or servers, using multicore capabilities where possible. The tool
-    provides an interface to perceive a collection of data sets as a single large
-    multidimensional data array, and enables the user to request for automatic
+Description: Tool to automatically fetch, transform and arrange subsets of
+    multidimensional data sets (collections of files) stored in local and/or
+    remote file systems or servers, using multicore capabilities where possible.
+    The tool provides an interface to perceive a collection of data sets as a single
+    large multidimensional data array, and enables the user to request automatic
     retrieval, processing and arrangement of subsets of the large array. Wrapper
     functions to add support for custom file formats can be plugged in/out, making
-    the tool suitable for any research field where large multidimensional data
-    sets are involved.
+    the tool suitable for any research field where large multidimensional data sets
+    are involved.
 Depends:
     R (>= 3.2.0)
 Imports:
diff --git a/NEWS.md b/NEWS.md
index c348ddaa134d2f9e18c08b6eb97f37ef29fa4ce2..295a972491a5f741f822b6e7c49f705a8e5e7f3d 100644
--- a/NEWS.md
+++ b/NEWS.md
@@ -1,3 +1,5 @@
+# startR v2.0.3 (Release date: 2020-09-10)
+- Automatic cleaning of /dev/shm on Compute()
 # startR v2.0.1 (Release date: 2020-08-25)
 - Bugfix for the function .chunk(). Its name was chunk() before v2.0.0, and
 there are two parts were not renamed to .chunk() in v2.0.0.
diff --git a/R/ByChunks.R b/R/ByChunks.R
index 5f0bba5783974ee912825b25c2449e26893d5f72..dd101120d1b394ba8cae6eca7f8bf1cb43688d4f8f 100644
--- a/R/ByChunks.R
+++ b/R/ByChunks.R
@@ -551,6 +551,8 @@ ByChunks <- function(step_fun, cube_headers, ..., chunks = 'auto',
               ecflow_suite_dir_suite)
     file.copy(system.file('chunking/tail.h', package = 'startR'),
               ecflow_suite_dir_suite)
+    #file.copy(system.file('chunking/clean_devshm.sh', package = 'startR'),
+    #          ecflow_suite_dir_suite)
   }
 
   add_line <- function(suite, line, tabs) {
diff --git a/R/Start.R b/R/Start.R
index 8243fda0d5826a1da7ba09e65b0524dab2ecac4e..fdcffc43b51ced965fa09f2b707432c9da299711 100644
--- a/R/Start.R
+++ b/R/Start.R
@@ -686,6 +686,9 @@
 #'  multiple involved files in a call to Start(). If set to NULL,
 #'  takes the number of available cores (as detected by detectCores() in
 #'  the package 'future'). The default value is 1 (no parallel execution).
+#'@param ObjectBigmemory A character string to be included as part of the
+#'  bigmemory object name. This parameter is intended for internal use by the
+#'  chunking capabilities of startR.
 #'@param silent A logical value of whether to display progress messages (FALSE)
 #'  or not (TRUE). The default value is FALSE.
 #'@param debug A logical value of whether to return detailed messages on the
@@ -807,6 +810,7 @@ Start <- function(..., # dim = indices/selectors,
                   path_glob_permissive = FALSE,
                   retrieve = FALSE,
                   num_procs = 1,
+                  ObjectBigmemory = NULL,
                   silent = FALSE,
                   debug = FALSE) { #, config_file = NULL
                   #dictionary_dim_names = ,
@@ -3715,8 +3719,24 @@ Start <- function(..., # dim = indices/selectors,
     # TODO: try performance of storing all in cols instead of rows
     # Create the shared memory array, and a pointer to it, to be sent
     # to the work pieces.
-    data_array <- bigmemory::big.matrix(nrow = prod(final_dims), ncol = 1)
+    if (is.null(ObjectBigmemory)) {
+      data_array <- bigmemory::big.matrix(nrow = prod(final_dims), ncol = 1)
+    } else {
+      data_array <- bigmemory::big.matrix(nrow = prod(final_dims), ncol = 1,
+                                          backingfile = ObjectBigmemory)
+    }
     shared_matrix_pointer <- bigmemory::describe(data_array)
+    if (is.null(ObjectBigmemory)) {
+      name_bigmemory_obj <- attr(shared_matrix_pointer, 'description')$sharedName
+    } else {
+      name_bigmemory_obj <- attr(shared_matrix_pointer, 'description')$filename
+    }
+
+    #warning(paste("SharedName:", attr(shared_matrix_pointer, 'description')$sharedName))
+    #warning(paste("Filename:", attr(shared_matrix_pointer, 'description')$filename))
+    #if (!is.null(ObjectBigmemory)) {
+    #  attr(shared_matrix_pointer, 'description')$sharedName <- ObjectBigmemory
+    #}
     if (is.null(num_procs)) {
       num_procs <- future::availableCores()
     }
@@ -4238,7 +4258,8 @@ Start <- function(..., # dim = indices/selectors,
                                  Files = array_of_files_to_load,
                                  NotFoundFiles = array_of_not_found_files,
                                  FileSelectors = file_selectors,
-                                 PatternDim = found_pattern_dim)
+                                 PatternDim = found_pattern_dim,
+                                 ObjectBigmemory = name_bigmemory_obj) #attr(shared_matrix_pointer, 'description')$sharedName)
              )
     attr(data_array, 'class') <- c('startR_array', attr(data_array, 'class'))
     data_array
@@ -4281,6 +4302,7 @@ Start <- function(..., # dim = indices/selectors,
                           file_data_reader, synonims,
                           transform, transform_params,
                           silent = FALSE, debug = FALSE) {
+  #warning(attr(shared_matrix_pointer, 'description')$sharedName)
   # suppressPackageStartupMessages({library(bigmemory)})
 ### TODO: Specify dependencies as parameter
   # suppressPackageStartupMessages({library(ncdf4)})
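A minimal usage sketch of the new argument follows (illustrative only: the file pattern, dimension selectors and object name are hypothetical; what the patch itself provides is that the name actually used for the bigmemory object is returned in the 'ObjectBigmemory' attribute of the Start() output):

    library(startR)

    # Hypothetical file pattern and selectors, for illustration only.
    data <- Start(dat = '/hypothetical/path/$var$_$sdate$.nc',
                  var = 'tas',
                  sdate = '19930101',
                  time = 'all',
                  lat = 'all',
                  lon = 'all',
                  ObjectBigmemory = 'startR_chunk_example',  # assumed name
                  retrieve = TRUE)

    # Name of the bigmemory object backing this retrieval, as reported by Start():
    attr(data, 'ObjectBigmemory')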
diff --git a/inst/chunking/Chunk.ecf b/inst/chunking/Chunk.ecf
index 96b7645e54fadeca3fadcb1ef56c90869313e834..60bd051a657d28ef957876c28c8ab2a45686f579 100644
--- a/inst/chunking/Chunk.ecf
+++ b/inst/chunking/Chunk.ecf
@@ -15,4 +15,7 @@
 task_path=%REMOTE_ECF_HOME%/%ECF_NAME%
 Rscript load_process_save_chunk.R --args $task_path insert_indices
 #include_transfer_back_and_rm
+#clean temporary files in /dev/shm
+#bash %REMOTE_ECF_HOME%/clean_devshm.sh $task_path
+
 %include "./tail.h"
diff --git a/inst/chunking/clean_devshm.sh b/inst/chunking/clean_devshm.sh
new file mode 100644
index 0000000000000000000000000000000000000000..a2f317ba5a4171da4c37936832fcbe092438d4e8
--- /dev/null
+++ b/inst/chunking/clean_devshm.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+# Build the path of the .filename.txt record written for this task
+path=$1
+name=.filename.txt
+remote=$path$name
+echo "$remote"
+while IFS= read -r line
+do
+  echo "$line"
+  parti='/dev/shm/'
+  filename=$parti$line
+  # Check whether the file exists
+  echo "$filename"
+  if [[ -f "$filename" ]]
+  then
+    # Remove the file
+    rm "$filename"
+  else
+    echo "File does not exist"
+  fi
+done < "$remote"
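For reference, a rough R equivalent of what clean_devshm.sh does (a sketch under assumptions: the helper name clean_devshm is made up, and the record file <task_path>.filename.txt is taken to hold one bigmemory object name per line, as the still commented-out write.table call in load_process_save_chunk.R below would produce):

    # Illustrative R rendering of clean_devshm.sh; name and record layout assumed.
    clean_devshm <- function(task_path) {
      record <- paste0(task_path, '.filename.txt')
      for (name in readLines(record)) {
        backing_file <- file.path('/dev/shm', name)
        if (file.exists(backing_file)) {
          file.remove(backing_file)
        } else {
          message('File does not exist: ', backing_file)
        }
      }
    }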
diff --git a/inst/chunking/load_process_save_chunk.R b/inst/chunking/load_process_save_chunk.R
index e68b8699ad8309d938431d5d82f5fc6f30fdbb74..49a94a95ff2cfb0e3e9234d6fa94fd874f8200b8 100644
--- a/inst/chunking/load_process_save_chunk.R
+++ b/inst/chunking/load_process_save_chunk.R
@@ -70,7 +70,33 @@ for (input in 1:length(data)) {
   if (!('num_procs' %in% names(start_call))) {
     start_call[['num_procs']] <- threads_load
   }
-  data[[input]] <- eval(start_call)
+  # Create a name for the temporary bigmemory file using the chunk numbers:
+  nameMemoryObject <- gsub("[^0-9.-]", "_", gsub(out_dir, "", task_path))
+  nameMemoryObject <- substr(nameMemoryObject, 2, nchar(nameMemoryObject))
+  removeRS <- function(str) paste(rle(strsplit(str, "")[[1]])$values, collapse = "")
+  nameMemoryObject <- removeRS(nameMemoryObject)
+  start_call[['ObjectBigmemory']] <- nameMemoryObject
+  data[[input]] <- tryCatch(eval(start_call),
+    # Handler when an error occurs:
+    error = function(e) {
+      message("The data could not be loaded.")
+      message("See the original error message:")
+      message(e)
+      message("\n Current files in /dev/shm:")
+      noreturn <- lapply(list.files("/dev/shm"), function (x) {
+        info <- file.info(paste0("/dev/shm/", x))
+        message(paste("file:", rownames(info),
+                      "size:", info$size,
+                      "uname:", info$uname))})
+      message(getwd())
+      file.remove(nameMemoryObject)
+      file.remove(paste0(nameMemoryObject, ".desc"))
+      message(paste("File", nameMemoryObject, "and its .desc have been removed."))
+    })
+  warning(attributes(data[[input]])$ObjectBigmemory)
+  #write.table(attributes(data[[input]])$ObjectBigmemory,
+  #            file = paste0(task_path, '.filename.txt'),
+  #            col.names = FALSE, row.names = FALSE, quote = FALSE)
 }
 t_end_load <- Sys.time()
 t_load <- as.numeric(difftime(t_end_load, t_begin_load, units = 'secs'))
diff --git a/man/Start.Rd b/man/Start.Rd
index c41c9619f4489bb9f6655b4e3203cfb4616c532d..76510ad20e6cc88d38226dba126e0692fec814de 100644
--- a/man/Start.Rd
+++ b/man/Start.Rd
@@ -13,7 +13,7 @@ Start(..., return_vars = NULL, synonims = NULL, file_opener = NcOpener,
   selector_checker = SelectorChecker, merge_across_dims = FALSE,
   merge_across_dims_narm = FALSE, split_multiselected_dims = FALSE,
   path_glob_permissive = FALSE, retrieve = FALSE, num_procs = 1,
-  silent = FALSE, debug = FALSE)
+  ObjectBigmemory = NULL, silent = FALSE, debug = FALSE)
 }
 \arguments{
 \item{return_vars}{A named list where the names are the names of the
@@ -378,6 +378,10 @@
 multiple involved files in a call to Start(). If set to NULL,
 takes the number of available cores (as detected by detectCores() in
 the package 'future'). The default value is 1 (no parallel execution).}
 
+\item{ObjectBigmemory}{A character string to be included as part of the
+bigmemory object name. This parameter is intended for internal use by the
+chunking capabilities of startR.}
+
 \item{silent}{A logical value of whether to display progress messages (FALSE)
 or not (TRUE). The default value is FALSE.}
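To make the naming scheme in load_process_save_chunk.R above concrete: every character of the task path that is not a digit, a dot or a hyphen is replaced by '_', the leading character is dropped, and runs of repeated characters are collapsed, so the resulting name essentially keeps the chunk numbers. A worked example with a hypothetical out_dir and task_path:

    # Hypothetical paths, for illustration of the name derivation only.
    out_dir   <- '/hypothetical/out_dir'
    task_path <- paste0(out_dir, '/computation/sdate_CHUNK_1/time_CHUNK_2/Chunk_2')

    nameMemoryObject <- gsub("[^0-9.-]", "_", gsub(out_dir, "", task_path))
    nameMemoryObject <- substr(nameMemoryObject, 2, nchar(nameMemoryObject))
    # Collapse runs of repeated characters ('___' -> '_', but also '22' -> '2'):
    removeRS <- function(str) paste(rle(strsplit(str, "")[[1]])$values, collapse = "")
    removeRS(nameMemoryObject)
    # [1] "_1_2_2"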