pax_global_header 0000666 0000000 0000000 00000000064 13424417765 0014527 g ustar 00root root 0000000 0000000 52 comment=a51e0764f8205aaeb065fae098618c23fb122092
startR-develop-explore-enh/ 0000775 0000000 0000000 00000000000 13424417765 0016232 5 ustar 00root root 0000000 0000000 startR-develop-explore-enh/.Rbuildignore 0000664 0000000 0000000 00000000043 13424417765 0020655 0 ustar 00root root 0000000 0000000 .git
.gitignore
.tar.gz
.pdf
./.nc
startR-develop-explore-enh/.gitignore 0000664 0000000 0000000 00000000252 13424417765 0020221 0 ustar 00root root 0000000 0000000 .*.sw*
./*.nc
*.tar.gz
*.Rcheck
.Rd2pdf*
*~
*.Rhistory
\#*\#
build_output.txt
check_output.txt
checkout_output.txt
merge_output.txt
master_pull.txt
*.eps
*.ps
Rplots.pdf
startR-develop-explore-enh/DESCRIPTION 0000664 0000000 0000000 00000002622 13424417765 0017742 0 ustar 00root root 0000000 0000000 Package: startR
Title: Automatically Retrieve Multidimensional Distributed Data Sets
Version: 0.1.0
Authors@R: c(
person("BSC-CNS", role = c("aut", "cph")),
person("Nicolau", "Manubens", , "nicolau.manubens@bsc.es", role = c("aut", "cre")),
person("Javier", "Vegas", , "javier.vegas@bsc.es", role = c("ctb")),
person("Pierre-Antoine", "Bretonniere", , "pierre-antoine.bretonniere@bsc.es", role = c("ctb")),
person("Roberto", "Serrano", , "rsnotivoli@gmal.com", role = c("ctb")))
Description: Tool to automatically fetch, transform and arrange subsets of multidimensional data sets (collections of files) stored in local and/or remote file systems or servers, using multicore capabilities where possible. The tool provides an interface to perceive a collection of data sets as a single large multidimensional data array, and enables the user to request automatic retrieval, processing and arrangement of subsets of the large array. Wrapper functions to add support for custom file formats can be plugged in/out, making the tool suitable for any research field where large multidimensional data sets are involved.
Depends:
R (>= 3.2.0)
Imports:
abind,
bigmemory,
future,
multiApply (>= 2.1.1),
parallel
Suggests:
easyNCDF,
s2dverification
License: LGPL-3
URL: https://earth.bsc.es/gitlab/es/startR/
BugReports: https://earth.bsc.es/gitlab/es/startR/issues
LazyData: true
startR-develop-explore-enh/NAMESPACE 0000664 0000000 0000000 00000000122 13424417765 0017444 0 ustar 00root root 0000000 0000000 exportPattern("^[^\\.]")
import(stats, utils, abind, bigmemory, future, parallel)
startR-develop-explore-enh/R/ 0000775 0000000 0000000 00000000000 13424417765 0016433 5 ustar 00root root 0000000 0000000 startR-develop-explore-enh/R/AddStep.R 0000664 0000000 0000000 00000007244 13424417765 0020111 0 ustar 00root root 0000000 0000000 AddStep <- function(inputs, step_fun, ...) {
# Check step_fun
if (!('startR_step_fun' %in% class(step_fun))) {
stop("Parameter 'step_fun' must be a startR step function as returned by Step.")
}
# Check inputs
if (any(c('startR_header', 'startR_workflow') %in% class(inputs))) {
inputs <- list(inputs)
names(inputs) <- 'input1'
}
if (is.list(inputs)) {
if (any(!sapply(inputs,
function(x) any(c('startR_header',
'startR_workflow') %in% class(x))))) {
stop("Parameter 'inputs' must be one or a list of objects of the class ",
"'startR_header' or 'startR_workflow'.")
}
} else {
stop("Parameter 'inputs' must be one or a list of objects of the class ",
"'startR_header' or 'startR_workflow'.")
}
# Consistency checks
if (length(inputs) != length(attr(step_fun, 'TargetDims'))) {
stop("The number of provided 'inputs' (", length(inputs), ") does not ",
"match the number of expected inputs by the provided 'step_fun' (",
length(attr(step_fun, 'TargetDims')), ").")
}
# Work out the total target dims of the step
previous_target_dims <- NULL
all_input_dims <- NULL
for (input in 1:length(inputs)) {
dims_to_compare <- names(attr(inputs[[input]], 'Dimensions'))
if (!all(attr(step_fun, 'TargetDims')[[input]] %in% dims_to_compare)) {
stop("The target dimensions required by 'step_fun' for the input ", input,
" are not present in the corresponding provided object in 'inputs'.")
}
if ('startR_workflow' %in% class(inputs[[input]])) {
if (is.null(previous_target_dims)) {
previous_target_dims <- attr(inputs[[input]], 'TargetDims')
} else {
dims1 <- rep(1, length(previous_target_dims))
names(dims1) <- previous_target_dims
dims2 <- rep(1, length(attr(inputs[[input]], 'TargetDims')))
names(dims2) <- attr(inputs[[input]], 'TargetDims')
previous_target_dims <- names(startR:::.MergeArrayDims(dims1, dims2)[[1]])
}
}
new_input_dims <- attr(inputs[[input]], 'Dimensions')
if (any(is.na(new_input_dims))) {
new_input_dims[which(is.na(new_input_dims))] <- rep(1, length(which(is.na(new_input_dims))))
}
if (is.null(all_input_dims)) {
all_input_dims <- new_input_dims
} else {
all_input_dims <- startR:::.MergeArrayDims(all_input_dims, new_input_dims)[[1]]
}
}
new_target_dims <- unique(unlist(attr(step_fun, 'TargetDims')))
result <- list()
dims1 <- rep(1, length(previous_target_dims))
names(dims1) <- previous_target_dims
dims2 <- rep(1, length(new_target_dims))
names(dims2) <- new_target_dims
target_dims <- names(startR:::.MergeArrayDims(dims1, dims2)[[1]])
for (output in 1:length(attr(step_fun, 'OutputDims'))) {
workflow <- list(inputs = inputs,
fun = step_fun,
params = list(...))
if (!is.null(attr(step_fun, 'OutputDims')[[output]])) {
dimensions <- rep(NA, length(attr(step_fun, 'OutputDims')[[output]]))
names(dimensions) <- attr(step_fun, 'OutputDims')[[output]]
} else {
dimensions <- NULL
}
in_dims_to_remove <- which(names(all_input_dims) %in% new_target_dims)
if (length(in_dims_to_remove) > 0) {
dimensions <- c(dimensions, all_input_dims[-in_dims_to_remove])
} else {
dimensions <- c(dimensions, all_input_dims)
}
attr(workflow, 'Dimensions') <- dimensions
attr(workflow, 'AllTargetDims') <- target_dims
class(workflow) <- 'startR_workflow'
result[[names(attr(step_fun, 'OutputDims'))[output]]] <- workflow
}
if (length(result) == 1) {
result[[1]]
} else {
result
}
}
startR-develop-explore-enh/R/ByChunks.R 0000664 0000000 0000000 00000117472 13424417765 0020320 0 ustar 00root root 0000000 0000000 ByChunks <- function(step_fun, cube_headers, ..., chunks = 'auto',
threads_load = 2, threads_compute = 1,
cluster = NULL,
ecflow_suite_dir = NULL,
ecflow_server = NULL,
silent = FALSE, debug = FALSE,
wait = TRUE) {
# Build object to store profiling timings
t_begin_total <- Sys.time()
t_begin_bychunks_setup <- t_begin_total
timings <- list(nchunks = NULL,
concurrent_chunks = NULL,
cores_per_job = NULL,
threads_load = NULL,
threads_compute = NULL,
bychunks_setup = NULL,
transfer = NULL,
queue = NULL,
job_setup = NULL,
load = NULL,
compute = NULL,
transfer_back = NULL,
merge = NULL,
total = NULL)
MergeArrays <- startR:::.MergeArrays
# Check input headers
if ('startR_header' %in% class(cube_headers)) {
cube_headers <- list(cube_headers)
}
if (!all(sapply(lapply(cube_headers, class),
function(x) 'startR_header' %in% x))) {
stop("All objects passed in 'cube_headers' must be of class 'startR_header', ",
"as returned by Start().")
}
# Check step_fun
if (!is.function(step_fun)) {
stop("Parameter 'step_fun' must be a function.")
}
# Check cores
if (!is.numeric(threads_load)) {
stop("Parameter 'threads_load' must be a numeric value.")
}
threads_load <- round(threads_load)
if (!is.numeric(threads_compute)) {
stop("Parameter 'threads_compute' must be a numeric value.")
}
threads_compute <- round(threads_compute)
timings[['threads_load']] <- threads_load
timings[['threads_compute']] <- threads_compute
on_cluster <- !is.null(cluster)
# Check ecflow_suite_dir
suite_id <- sample(10 ^ 10, 1)
ecflow_suite_dir_suite <- ''
if (on_cluster) {
if (is.null(ecflow_suite_dir)) {
stop("Parameter 'ecflow_suite_dir' must be specified when dispatching on a cluster.")
}
if (!is.character(ecflow_suite_dir)) {
stop("Parameter 'ecflow_suite_dir' must be a character string.")
}
ecflow_suite_dir_suite <- paste0(ecflow_suite_dir, '/STARTR_CHUNKING_', suite_id, '/')
dir.create(ecflow_suite_dir_suite, recursive = TRUE)
if (!dir.exists(ecflow_suite_dir_suite)) {
stop("Could not find or create the directory in ",
"parameter 'ecflow_suite_dir'.")
}
}
# Check cluster
default_cluster <- list(queue_host = NULL,
queue_type = 'slurm',
data_dir = NULL,
temp_dir = NULL,
lib_dir = NULL,
init_commands = list(''),
r_module = 'R',
ecflow_module = 'ecFlow',
node_memory = NULL,
cores_per_job = NULL,
job_wallclock = '01:00:00',
max_jobs = 6,
extra_queue_params = list(''),
bidirectional = TRUE,
polling_period = 10,
special_setup = 'none')
if (on_cluster) {
if (!is.list(cluster)) {
stop("Parameter 'cluster' must be a named list.")
}
if (is.null(names(cluster))) {
stop("Parameter 'cluster' must be a named list.")
}
if (any(!(names(cluster) %in% c('queue_host', 'queue_type', 'data_dir',
'temp_dir', 'lib_dir', 'init_commands',
'r_module', 'ecflow_module', 'node_memory',
'cores_per_job', 'job_wallclock', 'max_jobs',
'extra_queue_params', 'bidirectional',
'polling_period', 'special_setup')))) {
stop("Found invalid component names in parameter 'cluster'.")
}
default_cluster[names(cluster)] <- cluster
}
localhost_name <- NULL
cluster <- default_cluster
remote_ecflow_suite_dir <- ecflow_suite_dir
is_data_dir_shared <- FALSE
is_ecflow_suite_dir_shared <- FALSE
if (on_cluster) {
#localhost_name <- Sys.info()[['nodename']]
localhost_name <- system('hostname -f', intern = TRUE)
if (Sys.which('ecflow_client') == '') {
stop("ecFlow must be installed in order to run the computation on clusters.")
}
if (is.null(cluster[['queue_host']])) {
queue_host <- localhost_name
} else if ((cluster[['queue_host']] %in% c('localhost', '127.0.0.1', localhost_name)) ||
grepl(paste0('^', localhost_name), cluster[['queue_host']])) {
queue_host <- localhost_name
}
if (!(cluster[['queue_type']] %in% c('slurm', 'pbs', 'lsf', 'host'))) {
stop("The only supported 'queue_type's are 'slurm', 'pbs', 'lsf' and 'host'.")
}
if (is.null(cluster[['data_dir']])) {
is_data_dir_shared <- TRUE
} else {
if (!is.character(cluster[['data_dir']])) {
stop("The component 'data_dir' of the parameter 'cluster' must be a character string.")
}
remote_data_dir <- cluster[['data_dir']]
}
if (is.null(cluster[['temp_dir']])) {
is_ecflow_suite_dir_shared <- TRUE
} else {
if (!is.character(cluster[['temp_dir']])) {
stop("The component 'temp_dir' of the parameter 'cluster' must be a character string.")
}
remote_ecflow_suite_dir <- cluster[['temp_dir']]
}
if (!is.null(cluster[['lib_dir']])) {
if (!is.character(cluster[['lib_dir']])) {
stop("The component 'lib_dir', of the parameter 'cluster' must be NULL or ",
"a character string.")
}
}
if (!is.logical(cluster[['bidirectional']])) {
stop("The component 'bidirectional' of the parameter 'cluster' must be a logical value.")
}
if (cluster[['bidirectional']]) {
cluster[['init_commands']] <- c(cluster[['init_commands']],
list(paste('module load', cluster[['ecflow_module']])))
}
if (!is.list(cluster[['init_commands']]) ||
!all(sapply(cluster[['init_commands']], is.character))) {
stop("The component 'init_commands' of the parameter 'cluster' must be a list of ",
"character strings.")
}
if (!is.character(cluster[['r_module']])) {
stop("The component 'r_module' of the parameter 'cluster' must be a character string.")
}
if ((nchar(cluster[['r_module']]) < 1) || (grepl(' ', cluster[['r_module']]))) {
stop("The component 'r_module' of the parameter 'cluster' must have at least one character ",
"and contain no blank spaces.")
}
if (!is.character(cluster[['ecflow_module']])) {
stop("The component 'ecflow_module' of the parameter 'cluster' must be a character string.")
}
if ((nchar(cluster[['ecflow_module']]) < 1) ||
(grepl(' ', cluster[['ecflow_module']]))) {
stop("The component 'ecflow_module' of the parameter 'cluster' must have at least ",
"one character, and contain no blank spaces.")
}
if (is.null(cluster[['cores_per_job']])) {
cluster[['cores_per_job']] <- threads_compute
}
if (!is.numeric(cluster[['cores_per_job']])) {
stop("The component 'cores_per_job' of the parameter 'cluster' must be numeric.")
}
cluster[['cores_per_job']] <- round(cluster[['cores_per_job']])
if (cluster[['cores_per_job']] > threads_compute) {
startR:::.message("WARNING: 'threads_compute' should be >= cluster[['cores_per_job']].")
}
if (!is.list(cluster[['extra_queue_params']]) ||
!all(sapply(cluster[['extra_queue_params']], is.character))) {
stop("The component 'extra_queue_params' of the parameter 'cluster' must be a list of ",
"character strings.")
}
if (!(cluster[['special_setup']] %in% c('none', 'marenostrum4'))) {
stop("The value provided for the component 'special_setup' of the parameter ",
"'cluster' is not recognized.")
}
}
# Check ecflow_suite_dir
remote_ecflow_suite_dir_suite <- ''
if (on_cluster) {
remote_ecflow_suite_dir_suite <- paste0(remote_ecflow_suite_dir, '/STARTR_CHUNKING_', suite_id, '/')
}
# Check ecflow_server
if (!is.null(ecflow_server) && !(is.character(ecflow_server))) {
stop("Parameter 'ecflow_server' must be a character string if specified.")
}
# Check silent
if (!is.logical(silent)) {
stop("Parameter 'silent' must be logical.")
}
# Check debug
if (!is.logical(debug)) {
stop("Parameter 'debug' must be logical.")
}
if (silent) {
debug <- FALSE
}
# Check wait
if (!is.logical(wait)) {
stop("Parameter 'wait' must be logical.")
}
# Work out chunked dimensions and target dimensions
all_dims <- lapply(cube_headers, attr, 'Dimensions')
all_dims_merged <- NULL
for (i in all_dims) {
if (is.null(all_dims_merged)) {
all_dims_merged <- i
} else {
all_dims_merged <- startR:::.MergeArrayDims(all_dims_merged, i)[[1]]
}
}
all_dimnames <- names(all_dims_merged)
target_dims_indices <- which(all_dimnames %in% unlist(attr(step_fun, 'TargetDims')))
target_dims <- NULL
if (length(target_dims_indices) > 0) {
target_dims <- all_dimnames[target_dims_indices]
}
chunked_dims <- all_dimnames
if (length(target_dims_indices) > 0) {
chunked_dims <- chunked_dims[-target_dims_indices]
}
if (length(chunked_dims) < 1) {
stop("Not possible to process input by chunks. All input dimensions are ",
"target dimensions.")
}
if (length(cube_headers) != length(attr(step_fun, 'TargetDims'))) {
stop("Number of inputs in parameter 'cube_headers' must be equal to the ",
"number of inputs expected by the function 'step_fun'.")
}
# Check all input headers have matching dimensions
cube_index <- 1
for (cube_header in cube_headers) {
if (!all(attr(cube_header, 'Dimensions') == all_dims_merged[names(attr(cube_header, 'Dimensions'))])) {
stop("All provided 'cube_headers' must have matching dimension lengths ",
"with each other.")
}
if (!all(attr(step_fun, 'TargetDims')[[cube_index]] %in% names(attr(cube_header, 'Dimensions')))) {
stop("All provided 'cube_headers' must contain at least the target dimensions ",
"expected by 'step_fun'.")
}
cube_index <- cube_index + 1
# work out expected result dimensions
}
# Check chunks
default_chunks <- as.list(rep(1, length(chunked_dims)))
names(default_chunks) <- chunked_dims
if (length(chunks) == 1 && chunks == 'auto') {
chunks <- default_chunks
}
if (!is.list(chunks)) {
stop("Parameter 'chunks' must be a named list or 'auto'.")
}
if (is.null(names(chunks))) {
stop("Parameter 'chunks' must be a named list or 'auto'.")
}
if (any(!(names(chunks) %in% chunked_dims))) {
stop("All names in parameter 'chunks' must be one of the non-target dimensions ",
"present in the cubes in 'cube_headers'. The target dimensions are ",
paste(paste0("'", target_dims, "'"), collapse = ', '), ". The non-target ",
"dimensions (margins) are ", paste(paste0("'", chunked_dims, "'"), collapse = ', '), ".")
}
if (any(!(((unlist(chunks) %% 1) == 0) | (unlist(chunks) == 'all')))) {
stop("All values in parameter 'chunks' must take a numeric value or 'all'.")
}
if (any(unlist(chunks) < 1)) {
stop("All values in parameter 'chunks' must be >= 1.")
}
for (chunk_spec in 1:length(chunks)) {
if (chunks[[chunk_spec]] > all_dims_merged[names(chunks)[chunk_spec]]) {
stop("Too many chunks requested for the dimension ", names(chunks)[chunk_spec],
". Maximum allowed is ", all_dims_merged[names(chunks)[chunk_spec]])
}
}
default_chunks[names(chunks)] <- chunks
chunks <- default_chunks
timings[['nchunks']] <- prod(unlist(chunks))
# Check step_fun
if (!('startR_step_fun' %in% class(step_fun))) {
stop("Parameter 'step_fun' must be of the class 'startR_step_fun', as returned ",
"by the function Step.")
}
# Replace 'all's
chunks_all <- which(unlist(chunks) == 'all')
if (length(chunks_all) > 0) {
chunks[chunks_all] <- all_dims[names(chunks)[chunks_all]]
}
# Mount the ecFlow suite
if (on_cluster) {
.message(paste0("ATTENTION: Dispatching chunks on a remote cluster",
". Make sure passwordless ",
"access is properly set in both directions."))
# Copy load_process_save_chunk.R into shared folder
chunk_script <- file(system.file('chunking/load_process_save_chunk.R',
package = 'startR'))
chunk_script_lines <- readLines(chunk_script)
close(chunk_script)
chunk_script_lines <- gsub('^lib_dir <- *', paste0('lib_dir <- ',
paste(deparse(cluster[['lib_dir']]), collapse = '\n')),
chunk_script_lines)
chunk_script_lines <- gsub('^out_dir <- *', paste0('out_dir <- ',
paste(deparse(remote_ecflow_suite_dir_suite), collapse = '\n')), chunk_script_lines)
chunk_script_lines <- gsub('^debug <- *', paste0('debug <- ', paste(deparse(debug), collapse = '\n')),
chunk_script_lines)
deparsed_calls <- paste0('start_calls <- list(')
extra_path <- ''
if (cluster[['special_setup']] == 'marenostrum4') {
extra_path <- '/gpfs/archive/bsc32/'
}
for (cube_header in 1:length(cube_headers)) {
pattern_dim <- attr(cube_headers[[cube_header]], 'PatternDim')
bk_pattern_dim <- cube_headers[[cube_header]][[pattern_dim]]
bk_expected_files <- attr(cube_headers[[cube_header]], 'ExpectedFiles')
if (!is_data_dir_shared) {
cube_headers[[cube_header]][[pattern_dim]] <- paste0(remote_data_dir, '/',
extra_path, '/', cube_headers[[cube_header]][[pattern_dim]])
for (file_n in 1:length(bk_expected_files)) {
attr(cube_headers[[cube_header]], 'ExpectedFiles')[file_n] <- paste0(remote_data_dir, '/',
extra_path, '/', attr(cube_headers[[cube_header]], 'ExpectedFiles')[file_n])
}
}
deparsed_calls <- paste0(deparsed_calls, '\nquote(',
paste(deparse(cube_headers[[cube_header]]), collapse = '\n'),
')')
cube_headers[[cube_header]][[pattern_dim]] <- bk_pattern_dim
attr(cube_headers[[cube_header]], 'ExpectedFiles') <- bk_expected_files
if (cube_header < length(cube_headers)) {
deparsed_calls <- paste0(deparsed_calls, ', ')
}
}
deparsed_calls <- paste0(deparsed_calls, '\n)')
chunk_script_lines <- gsub('^start_calls <- *', deparsed_calls, chunk_script_lines)
chunk_script_lines <- gsub('^start_calls_attrs <- *', paste0('start_calls_attrs <- ', paste(deparse(lapply(cube_headers, attributes)), collapse = '\n')),
chunk_script_lines)
chunk_script_lines <- gsub('^param_dimnames <- *', paste0('param_dimnames <- ', paste(deparse(chunked_dims), collapse = '\n')),
chunk_script_lines)
chunk_script_lines <- gsub('^threads_load <- *', paste0('threads_load <- ', threads_load),
chunk_script_lines)
chunk_script_lines <- gsub('^threads_compute <- *', paste0('threads_compute <- ', threads_compute),
chunk_script_lines)
chunk_script_lines <- gsub('^fun <- *', paste0('fun <- ', paste(deparse(step_fun), collapse = '\n')),
chunk_script_lines)
chunk_script_lines <- gsub('^params <- *', paste0('params <- ', paste(deparse(list(...)), collapse = '\n')),
chunk_script_lines)
writeLines(chunk_script_lines, paste0(ecflow_suite_dir_suite, '/load_process_save_chunk.R'))
# Copy Chunk.ecf into shared folder
chunk_ecf_script <- file(system.file('chunking/Chunk.ecf',
package = 'startR'))
chunk_ecf_script_lines <- readLines(chunk_ecf_script)
close(chunk_ecf_script)
if (cluster[['queue_type']] == 'host') {
chunk_ecf_script_lines <- gsub('^include_queue_header',
'',
chunk_ecf_script_lines)
} else {
chunk_ecf_script_lines <- gsub('^include_queue_header',
paste0('%include "./', cluster[['queue_type']], '.h"'),
chunk_ecf_script_lines)
}
chunk_ecf_script_lines <- gsub('^include_init_commands',
paste0(paste0(cluster[['init_commands']], collapse = '\n'), '\n'),
chunk_ecf_script_lines)
chunk_ecf_script_lines <- gsub('^include_module_load',
paste0('module load ', cluster[['r_module']]),
chunk_ecf_script_lines)
ecf_vars <- paste0('%', as.vector(sapply(chunked_dims,
function(x) {
c(toupper(x), paste0(toupper(x), '_N'))
})), '%')
# if (!is_ecflow_suite_dir_shared && (cluster[['queue_host']] != localhost_name)) {
# #transfer_back_line <- paste0('rsync -rav %REMOTE_ECF_HOME% ', localhost_name,
# # ':%ECF_HOME%\nrm -f %ECF_HOME%/',
# # paste0('*', paste(ecf_vars[((1:(length(ecf_vars) / 2)) * 2) - 1], collapse = '*'), '*.Rds'))
result_file_id <- paste0('*',
paste(paste0('_', ecf_vars[((1:(length(ecf_vars) / 2)) * 2) - 1], '__'),
collapse = '*'), '*')
# transfer_back_line <- paste0('rsync -rav %REMOTE_ECF_HOME%/%SUITE%/ ',
# localhost_name,
# ':%ECF_HOME%/%SUITE%/\nscp %REMOTE_ECF_HOME%/',
# result_file_id, ' ', localhost_name,
# ':%ECF_HOME%\nrm -f %REMOTE_ECF_HOME%/',
# result_file_id)
# } else {
# transfer_back_line <- ''
# }
chunk_ecf_script_lines <- gsub('^Rscript load_process_save_chunk.R --args \\$task_path insert_indices',
paste0('Rscript load_process_save_chunk.R --args $task_path ', paste(ecf_vars, collapse = ' ')),
chunk_ecf_script_lines)
#chunk_ecf_script_lines <- gsub('^include_transfer_back_and_rm', transfer_back_line, chunk_ecf_script_lines)
writeLines(chunk_ecf_script_lines, paste0(ecflow_suite_dir_suite, '/Chunk.ecf'))
# Copy merge_chunks.R into tmp folder
# merge_script <- file(system.file('chunking/merge_chunks.R',
# package = 'startR'))
# merge_script_lines <- readLines(merge_script)
# close(merge_script)
# merge_script_lines <- gsub('^shared_dir <- *', paste0('shared_dir <- ',
# paste(deparse(shared_dir_suite), collapse = '\n')), merge_script_lines)
# writeLines(merge_script_lines, paste0(shared_dir_suite, '/merge_chunks.R'))
# Copy Merge.ecf into tmp folder
#TODO: Modify chain of parameters sent to r script when merging
#chunks progressively
# merge_ecf_script <- file(system.file('chunking/Merge.ecf',
# package = 'startR'))
# merge_ecf_script_lines <- readLines(merge_ecf_script)
# close(merge_ecf_script)
# writeLines(merge_ecf_script_lines, paste0(shared_dir_suite, '/Merge.ecf'))
# Copy queue header into shared folder
#file.copy(system.file(paste0('chunking/', cluster[['queue_type']], '.h'), package = 'startR'),
# ecflow_suite_dir_suite)
chunk_queue_header <- file(system.file(paste0('chunking/', cluster[['queue_type']], '.h'), package = 'startR'))
chunk_queue_header_lines <- readLines(chunk_queue_header)
close(chunk_queue_header)
chunk_queue_header_lines <- gsub('^include_extra_queue_params',
paste0(paste0(cluster[['extra_queue_params']], collapse = '\n'), '\n'),
chunk_queue_header_lines)
writeLines(chunk_queue_header_lines, paste0(ecflow_suite_dir_suite, '/', cluster[['queue_type']], '.h'))
# Copy headers
file.copy(system.file('chunking/head.h', package = 'startR'),
ecflow_suite_dir_suite)
file.copy(system.file('chunking/tail.h', package = 'startR'),
ecflow_suite_dir_suite)
}
add_line <- function(suite, line, tabs) {
c(suite, paste0(paste(rep(' ', tabs), collapse = ''), line))
}
suite <- NULL
tabs <- 0
suite <- add_line(suite, paste0('suite STARTR_CHUNKING_', suite_id), tabs)
tabs <- tabs + 2
submit_command <- ''
if (cluster[['queue_type']] == 'slurm') {
submit_command <- 'sbatch'
} else if (cluster[['queue_type']] == 'pbs') {
submit_command <- 'qsub'
} else if (cluster[['queue_type']] == 'lsf') {
submit_command <- 'bsub <'
} else if (cluster[['queue_type']] == 'host') {
submit_command <- 'bash'
}
if (on_cluster) {
suite <- add_line(suite, paste0("edit BIDIRECTIONAL '", cluster[['bidirectional']], "'"), tabs)
suite <- add_line(suite, paste0("edit QUEUE_HOST '", cluster[['queue_host']], "'"), tabs)
suite <- add_line(suite, paste0("edit ECF_HOST '", localhost_name, "'"), tabs)
suite <- add_line(suite, paste0("edit EC_HOST_FULL '", localhost_name, "'"), tabs)
suite <- add_line(suite, paste0("edit RESULT_FILE_ID '", result_file_id, "'"), tabs)
#} else {
# suite <- add_line(suite, paste0("edit ECF_JOB_CMD '", submit_command, " %ECF_JOB% > %ECF_JOBOUT% 2>&1 &'"), tabs)
}
suite <- add_line(suite, paste0("edit ECF_HOME '", ecflow_suite_dir_suite, "'"), tabs)
suite <- add_line(suite, paste0("edit REMOTE_ECF_HOME '", remote_ecflow_suite_dir_suite, "'"), tabs)
suite <- add_line(suite, paste0("edit CORES_PER_JOB ", cluster[['cores_per_job']], ""), tabs)
suite <- add_line(suite, paste0("edit JOB_WALLCLOCK '", cluster[['job_wallclock']], "'"), tabs)
suite <- add_line(suite, paste0("limit max_jobs ", cluster[['max_jobs']]), tabs)
suite <- add_line(suite, paste0("inlimit max_jobs"), tabs)
suite <- add_line(suite, "family computation", tabs)
tabs <- tabs + 2
if (on_cluster) {
# source $HOME/.profile ;
sync_command <- ''
if (!is_ecflow_suite_dir_shared) {
sync_command <- paste0("rsync -rav ",
"%ECF_HOME%/ ",
"%QUEUE_HOST%:%REMOTE_ECF_HOME%/ ; ")
}
suite <- add_line(suite, paste0("edit ECF_JOB_CMD '",
#"mkdir -p %REMOTE_ECF_HOME%/%SUITE%/ ; ",
sync_command,
"ssh %QUEUE_HOST% \"",
"date --rfc-3339=seconds > %REMOTE_ECF_HOME%/%ECF_NAME%.submit_time ; ",
submit_command,
" %REMOTE_ECF_HOME%/%ECF_NAME%.job%ECF_TRYNO% > ",
"%REMOTE_ECF_HOME%/%ECF_NAME%.%ECF_TRYNO% 2>&1 &\" ",
"2>&1'"), tabs)
if (is_ecflow_suite_dir_shared) {
suite <- add_line(suite, paste0("edit REPORT_BACK 'FALSE'"), tabs)
} else {
suite <- add_line(suite, paste0("edit REPORT_BACK 'TRUE'"), tabs)
}
}
# Open nested ecFlow families
for (i in length(chunked_dims):1) {
suite <- add_line(suite, paste0('family ', chunked_dims[i], '_CHUNK_', 1), tabs)
tabs <- tabs + 2
suite <- add_line(suite, paste0('edit ', toupper(chunked_dims[i]), ' ', 1), tabs)
suite <- add_line(suite, paste0('edit ', toupper(chunked_dims[i]), '_N ', chunks[[chunked_dims[i]]]), tabs)
}
# Iterate through chunks
chunk_array <- array(1:prod(unlist(chunks)), dim = (unlist(chunks)))
arrays_of_results <- vector('list', length(attr(step_fun, 'OutputDims')))
names(arrays_of_results) <- names(attr(step_fun, 'OutputDims'))
for (component in 1:length(arrays_of_results)) {
arrays_of_results[[component]] <- vector('list', prod((unlist(chunks))))
dim(arrays_of_results[[component]]) <- (unlist(chunks))
}
if (!on_cluster) {
t_end_bychunks_setup <- Sys.time()
timings[['bychunks_setup']] <- as.numeric(difftime(t_end_bychunks_setup,
t_begin_bychunks_setup, units = 'secs'))
timings[['transfer']] <- 0
timings[['queue']] <- 0
timings[['job_setup']] <- 0
timings[['transfer_back']] <- 0
if (!silent) {
startR:::.message(paste0("Processing chunks... ",
"remaining time estimate soon..."))
}
time_before_first_chunk <- Sys.time()
time_after_first_chunk <- NULL
}
previous_chunk_indices <- rep(1, length(chunks))
found_first_result <- FALSE
for (i in 1:length(chunk_array)) {
chunk_indices <- which(chunk_array == i, arr.ind = TRUE)[1, ]
names(chunk_indices) <- names(dim(chunk_array))
# ADD CHUNK SCRIPT TO SUITE
families_to_jump <- which(chunk_indices != previous_chunk_indices)
if (length(families_to_jump) > 0) {
families_to_jump <- max(families_to_jump)
# Close ecFlow families
for (j in 1:families_to_jump) {
tabs <- tabs - 2
suite <- add_line(suite, paste0('endfamily'), tabs)
}
# Open ecFlow families
for (j in families_to_jump:1) {
suite <- add_line(suite, paste0('family ', (chunked_dims)[j], '_CHUNK_', chunk_indices[j]), tabs)
tabs <- tabs + 2
suite <- add_line(suite, paste0('edit ', toupper((chunked_dims)[j]), ' ', chunk_indices[j]), tabs)
suite <- add_line(suite, paste0('edit ', toupper((chunked_dims)[j]), '_N ', chunks[[(chunked_dims)[j]]]), tabs)
}
}
suite <- add_line(suite, "task Chunk", tabs)
if (!on_cluster) {
if (!silent) {
startR:::.message(paste("Loading chunk", i,
"out of", length(chunk_array), "..."))
}
data <- vector('list', length(cube_headers))
t_begin_load <- Sys.time()
for (input in 1:length(data)) {
start_call <- cube_headers[[input]]
dims_to_alter <- which(names(attr(start_call, 'Dimensions')) %in% names(chunks))
names_dims_to_alter <- names(attr(start_call, 'Dimensions'))[dims_to_alter]
# If any dimension comes from split dimensions
split_dims <- attr(start_call, 'SplitDims')
for (k in 1:length(split_dims)) {
if (any(names(split_dims[[k]]) %in% names_dims_to_alter)) {
chunks_split_dims <- rep(1, length(split_dims[[k]]))
names(chunks_split_dims) <- names(split_dims[[k]])
chunks_indices_split_dims <- chunks_split_dims
split_dims_to_alter <- which(names(split_dims[[k]]) %in% names_dims_to_alter)
chunks_split_dims[split_dims_to_alter] <- unlist(chunks[names(split_dims[[k]])[split_dims_to_alter]])
chunks_indices_split_dims[split_dims_to_alter] <- chunk_indices[names(split_dims[[k]])[split_dims_to_alter]]
start_call[[names(split_dims)[k]]] <- chunk(chunks_indices_split_dims, chunks_split_dims,
eval(start_call[[names(split_dims)[k]]]))
dims_to_alter_to_remove <- which(names_dims_to_alter %in% names(split_dims[[k]]))
if (length(dims_to_alter_to_remove) > 0) {
dims_to_alter <- dims_to_alter[-dims_to_alter_to_remove]
names_dims_to_alter <- names_dims_to_alter[-dims_to_alter_to_remove]
}
}
}
if (length(dims_to_alter) > 0) {
for (call_dim in names(attr(start_call, 'Dimensions'))[dims_to_alter]) {
start_call[[call_dim]] <- chunk(chunk_indices[call_dim], chunks[[call_dim]],
eval(start_call[[call_dim]]))
}
}
start_call[['silent']] <- !debug
if (!('num_procs' %in% names(start_call))) {
start_call[['num_procs']] <- threads_load
}
data[[input]] <- eval(start_call)
}
t_end_load <- Sys.time()
timings[['load']] <- c(timings[['load']],
as.numeric(difftime(t_end_load, t_begin_load, units = 'secs')))
if (!silent) {
startR:::.message(paste("Processing..."))
}
t_begin_compute <- Sys.time()
result <- multiApply::Apply(data,
target_dims = attr(step_fun, 'TargetDims'),
fun = step_fun, ...,
output_dims = attr(step_fun, 'OutputDims'),
ncores = threads_compute)
if (!found_first_result) {
names(arrays_of_results) <- names(result)
found_first_result <- TRUE
}
for (component in 1:length(result)) {
arrays_of_results[[component]][[i]] <- result[[component]]
}
rm(data)
gc()
t_end_compute <- Sys.time()
timings[['compute']] <- c(timings[['compute']],
as.numeric(difftime(t_end_compute,
t_begin_compute, units = 'secs')))
}
# Time estimate
if (!on_cluster) {
if (is.null(time_after_first_chunk)) {
time_after_first_chunk <- Sys.time()
if (!silent) {
estimate <- (time_after_first_chunk -
time_before_first_chunk) *
(length(chunk_array) - 1)
units(estimate) <- 'mins'
startR:::.message(
paste0("Remaining time estimate (at ", format(time_after_first_chunk), ") ",
"(neglecting merge time): ", format(estimate))
)
}
}
}
previous_chunk_indices <- chunk_indices
}
# Close nested ecFlow families
for (i in length(chunked_dims):1) {
tabs <- tabs - 2
suite <- add_line(suite, paste0('endfamily'), tabs)
}
# Close the ecFlow suite
tabs <- tabs - 2
suite <- add_line(suite, paste0('endfamily'), tabs)
# suite <- add_line(suite, "family merge", tabs)
# tabs <- tabs + 2
# suite <- add_line(suite, "trigger computation == complete", tabs)
# suite <- add_line(suite, "edit ECF_JOB_CMD 'bash %ECF_JOB% > %ECF_JOBOUT% 2>&1 &'", tabs)
# suite <- add_line(suite, "task Merge", tabs)
# tabs <- tabs - 2
# suite <- add_line(suite, paste0('endfamily'), tabs)
tabs <- tabs - 2
suite <- add_line(suite, "endsuite", tabs)
# Run ecFlow suite if needed
if (on_cluster) {
timings[['cores_per_job']] <- cluster[['cores_per_job']]
timings[['concurrent_chunks']] <- cluster[['max_jobs']]
suite_file <- paste0(ecflow_suite_dir_suite, '/startR_chunking.def')
suite_file_o <- file(suite_file)
writeLines(suite, suite_file_o)
close(suite_file_o)
default_ecflow_server <- list(host = localhost_name, port = '5678')
if (is.null(ecflow_server)) {
.warning("Parameter 'ecflow_server' has not been specified but execution on ",
"cluster has been requested. An ecFlow server instance will ",
"be created on localhost:5678.")
} else {
if ('host' %in% names(ecflow_server)) {
stop("A host has been specified for the 'ecflow_server', but this option is not available yet.")
}
default_ecflow_server[names(ecflow_server)] <- ecflow_server
}
ecflow_server <- default_ecflow_server
system(paste0("ecflow_start.sh -p ", ecflow_server[['port']]))
system(paste0("ecflow_client --load=", suite_file, " --host=",
ecflow_server[['host']], " --port=", ecflow_server[['port']]))
if (!is_ecflow_suite_dir_shared) {
system(paste0('ssh ', cluster[['queue_host']], ' "mkdir -p ',
remote_ecflow_suite_dir_suite, '"'))
system(paste0('rsync -ra ', ecflow_suite_dir_suite,
' ', cluster[['queue_host']], ':',
remote_ecflow_suite_dir_suite))
}
t_end_bychunks_setup <- Sys.time()
timings[['bychunks_setup']] <- as.numeric(difftime(t_end_bychunks_setup,
t_begin_bychunks_setup, units = 'secs'))
if (!is_data_dir_shared) {
t_begin_transfer <- Sys.time()
startR:::.message("Sending involved files to the cluster file system...")
files_to_send <- NULL
#files_to_check <- NULL
for (cube_header in 1:length(cube_headers)) {
expected_files <- attr(cube_headers[[cube_header]], 'ExpectedFiles')
#files_to_check <- c(files_to_check, expected_files)
#if (cluster[['special_setup']] == 'marenostrum4') {
# expected_files <- paste0('/gpfs/archive/bsc32/', expected_files)
#}
files_to_send <- c(files_to_send, expected_files)
}
#which_files_exist <- sapply(files_to_check, file.exists)
which_files_exist <- sapply(files_to_send, file.exists)
files_to_send <- files_to_send[which_files_exist]
if (cluster[['special_setup']] == 'marenostrum4') {
file_spec <- paste(paste0("/gpfs/archive/bsc32/",
files_to_send), collapse = ' ')
system(paste0("ssh ", cluster[['queue_host']], " 'mkdir -p ", remote_data_dir,
' ; module load transfer ; cd ', remote_ecflow_suite_dir_suite,
' ; dtrsync -Rrav ', '\'', file_spec, '\' "', remote_data_dir, '/"',
" ; sleep 1 ; ",
"while [[ ! $(ls dtrsync_*.out 2>/dev/null | wc -l) -ge 1 ]] ; ",
"do sleep 2 ; done",
" ; sleep 1 ; ",
'while [[ ! $(grep "total size is" dtrsync_*.out | ',
"wc -l) -ge 1 ]] ; ",
"do sleep 5 ; done", "'"))
} else {
file_spec <- paste(files_to_send, collapse = ' :')
system(paste0("ssh ", cluster[['queue_host']], ' "mkdir -p ',
remote_data_dir, '"'))
system(paste0("rsync -Rrav '", file_spec, "' '",
cluster[['queue_host']], ":", remote_data_dir, "/'"))
}
startR:::.message("Files sent successfully.")
t_end_transfer <- Sys.time()
timings[['transfer']] <- as.numeric(difftime(t_end_transfer, t_begin_transfer, units = 'secs'))
} else {
timings[['transfer']] <- 0
}
if (!silent) {
startR:::.message(paste0("Processing chunks... "))
}
time_begin_first_chunk <- Sys.time()
# time_after_first_chunk <- NULL
system(paste0("ecflow_client --begin=STARTR_CHUNKING_", suite_id,
" --host=", ecflow_server[['host']], " --port=",
ecflow_server[['port']]))
timings[['total']] <- t_begin_total
startr_exec <- list(cluster = cluster, ecflow_server = ecflow_server,
suite_id = suite_id, chunks = chunks,
num_outputs = length(arrays_of_results),
ecflow_suite_dir = ecflow_suite_dir,
timings = timings)
class(startr_exec) <- 'startR_exec'
if (wait) {
if (!silent) {
startR:::.message(paste0("Remaining time estimate soon... "))
# while (is.null(time_after_first_chunk)) {
# if (any(grepl('.*\\.Rds$', list.files(ecflow_suite_dir_suite)))) {
# time_after_first_chunk <- Sys.time()
# estimate <- (time_after_first_chunk -
# time_before_first_chunk) *
# ceiling((prod(unlist(chunks)) - cluster[['max_jobs']]) /
# cluster[['max_jobs']])
# units(estimate) <- 'mins'
# startR:::.message(
# paste0('Remaining time estimate (neglecting queue and ',
# 'merge time) (at ', format(time_after_first_chunk),
# '): ', format(estimate), ' (',
# format(time_after_first_chunk -
# time_before_first_chunk), ' per chunk)')
# )
# } else if (!cluster[['bidirectional']]) {
# rsync_output <- tryCatch({
# system(paste0("rsync -ra --ignore-missing-args ",
# cluster[['queue_host']], ":",
# remote_ecflow_suite_dir_suite, "/*.Rds ",
# ecflow_suite_dir_suite, "/"), intern = TRUE)
# }, error = function(e) {
# message("Warning: rsync from remote server to collect results failed. ",
# "Retrying soon.")
# failed <- TRUE
# })
# Sys.sleep(cluster[['polling_period']])
# }
# }
startr_exec[['t_begin_first_chunk']] <- time_begin_first_chunk
}
result <- Collect(startr_exec, wait = TRUE)
startR:::.message("Computation ended successfully.")
result
} else {
startr_exec
}
} else {
timings[['cores_per_job']] <- NA
timings[['concurrent_chunks']] <- 1
t_begin_merge <- Sys.time()
for (component in 1:length(arrays_of_results)) {
arrays_of_results[[component]] <- startR:::.MergeArrayOfArrays(arrays_of_results[[component]])
}
t_end_merge <- Sys.time()
timings[['merge']] <- as.numeric(difftime(t_end_merge, t_begin_merge, units = 'secs'))
t_end_total <- t_end_merge
timings[['total']] <- as.numeric(difftime(t_end_total, t_begin_total, units = 'secs'))
message(paste0("* Computation ended successfully."))
message(paste0("* Number of chunks: ",
timings[['nchunks']]))
message(paste0("* Max. number of concurrent chunks (jobs): ",
timings[['concurrent_chunks']]))
message(paste0("* Requested cores per job: ",
timings[['cores_per_job']]))
message(paste0("* Load threads per chunk: ",
timings[['threads_load']]))
message(paste0("* Compute threads per chunk: ",
timings[['threads_compute']]))
message(paste0("* Total time (s): ",
timings[['total']]))
message(paste0("* Chunking setup: ",
timings[['bychunks_setup']]))
message(paste0("* Data upload to cluster: ",
timings[['transfer']]))
message(paste0("* All chunks: ",
timings[['total']] -
timings[['bychunks_setup']] -
timings[['transfer']] -
timings[['transfer_back']] -
timings[['merge']]))
message(paste0("* Transfer results from cluster: ",
timings[['transfer_back']]))
message(paste0("* Merge: ",
timings[['merge']]))
message(paste0("* Each chunk: "))
message(paste0("* queue: "))
message(paste0("* mean: ",
mean(timings[['queue']])))
message(paste0("* min: ",
min(timings[['queue']])))
message(paste0("* max: ",
max(timings[['queue']])))
message(paste0("* job setup: "))
message(paste0("* mean: ",
mean(timings[['job_setup']])))
message(paste0("* min: ",
min(timings[['job_setup']])))
message(paste0("* max: ",
max(timings[['job_setup']])))
message(paste0("* load: "))
message(paste0("* mean: ",
mean(timings[['load']])))
message(paste0("* min: ",
min(timings[['load']])))
message(paste0("* max: ",
max(timings[['load']])))
message(paste0("* compute: "))
message(paste0("* mean: ",
mean(timings[['compute']])))
message(paste0("* min: ",
min(timings[['compute']])))
message(paste0("* max: ",
max(timings[['compute']])))
attr(arrays_of_results, 'startR_compute_profiling') <- timings
arrays_of_results
}
#TODO: check result dimensions match expected dimensions
}
CDORemapper <- function(data_array, variables, file_selectors = NULL, ...) {
  # Transform function pluggable into Start(): regrids 'data_array' with CDO
  # via s2dverification::CDORemap(), using the longitude/latitude variables
  # requested in 'return_vars' and 'transform_vars'.
  #
  # data_array: numeric array chunk to be remapped.
  # variables: named list of coordinate variables; must contain one variable
  #   with a known longitude name and one with a known latitude name.
  # file_selectors: named character vector of single file-dimension selectors;
  #   used to subset multi-dimensional lon/lat variables down to one file.
  # ...: extra parameters forwarded to CDORemap(); 'grid' and 'method' are
  #   mandatory (provided via 'transform_params').
  # Returns a list with the remapped 'data_array' and the new lon/lat
  # coordinate 'variables'.
  file_dims <- names(file_selectors)
  known_lon_names <- s2dverification:::.KnownLonNames()
  known_lat_names <- s2dverification:::.KnownLatNames()
  if (!any(known_lon_names %in% names(variables)) ||
      !any(known_lat_names %in% names(variables))) {
    stop("The longitude and latitude variables must be requested in ",
         "'return_vars' and specified in 'transform_vars' for the ",
         "CDORemapper to work.")
  }
  lon_name <- names(variables)[which(names(variables) %in% known_lon_names)[1]]
  lons <- variables[[lon_name]]
  if (!is.null(dim(lons))) {
    # If the longitudes vary along file dimensions, keep only the slice for
    # the first file (index 1 on each file dimension).
    dims_to_subset <- which(names(dim(lons)) %in% file_dims)
    if (length(dims_to_subset) > 0) {
      lons_to_use <- as.list(rep(TRUE, length(dim(lons))))
      names(lons_to_use) <- names(dim(lons))
      lons_to_use[dims_to_subset] <- as.list(rep(1, length(dims_to_subset)))
      attr_bk <- attributes(lons)
      lons <- do.call('[', c(list(x = lons), lons_to_use, list(drop = TRUE)))
      attributes(lons) <- attr_bk
    }
  }
  lat_name <- names(variables)[which(names(variables) %in% known_lat_names)[1]]
  lats <- variables[[lat_name]]
  if (!is.null(dim(lats))) {
    dims_to_subset <- which(names(dim(lats)) %in% file_dims)
    if (length(dims_to_subset) > 0) {
      lats_to_use <- as.list(rep(TRUE, length(dim(lats))))
      names(lats_to_use) <- names(dim(lats))
      lats_to_use[dims_to_subset] <- as.list(rep(1, length(dims_to_subset)))
      # Bug fix: back up the attributes of 'lats' (the original code copied
      # attributes(lons) here), so the latitude metadata is restored onto the
      # latitude variable after subsetting.
      attr_bk <- attributes(lats)
      lats <- do.call('[', c(list(x = lats), lats_to_use, list(drop = TRUE)))
      attributes(lats) <- attr_bk
    }
  }
  extra_params <- list(...)
  if (!all(c('grid', 'method') %in% names(extra_params))) {
    stop("Parameters 'grid' and 'method' must be specified for the ",
         "CDORemapper, via the 'transform_params' argument.")
  }
  result <- s2dverification::CDORemap(data_array, lons, lats, ...)
  return_variables <- list(result$lons, result$lats)
  names(return_variables) <- c(lon_name, lat_name)
  list(data_array = result$data_array, variables = return_variables)
}
Collect <- function(startr_exec, wait = TRUE, remove = TRUE) {
  # Collect the results of a Compute() execution that was dispatched to a
  # cluster through ecFlow.
  #
  # startr_exec: object of class 'startR_exec', as returned by
  #   Compute(..., wait = FALSE).
  # wait: if TRUE, block (polling) until all chunks are complete; if FALSE,
  #   throw an error ("Computation in progress...") when results are not
  #   ready yet.
  # remove: if TRUE, delete the received chunk files and the ecFlow suite
  #   after merging the results.
  # Returns the merged result, with a 'startR_compute_profiling' attribute
  # carrying the collected timings.
  if (!('startR_exec' %in% class(startr_exec))) {
    # Bug fix in message: 'startR_exec' objects are returned by
    # Compute(..., wait = FALSE), not by Collect().
    stop("Parameter 'startr_exec' must be an object of the class ",
         "'startR_exec', as returned by Compute(..., wait = FALSE).")
  }
  if (Sys.which('ecflow_client') == '') {
    stop("ecFlow must be installed in order to collect results from a ",
         "Compute() execution.")
  }
  cluster <- startr_exec[['cluster']]
  ecflow_server <- startr_exec[['ecflow_server']]
  suite_id <- startr_exec[['suite_id']]
  chunks <- startr_exec[['chunks']]
  num_outputs <- startr_exec[['num_outputs']]
  ecflow_suite_dir <- startr_exec[['ecflow_suite_dir']]
  timings <- startr_exec[['timings']]
  ecflow_suite_dir_suite <- paste0(ecflow_suite_dir, '/STARTR_CHUNKING_',
                                   suite_id, '/')
  if (!is.null(cluster[['temp_dir']])) {
    remote_ecflow_suite_dir_suite <- paste0(cluster[['temp_dir']],
                                            '/STARTR_CHUNKING_',
                                            suite_id, '/')
  }
  # Map a received result file name (chunk indices encoded between '__'
  # separators) back to the corresponding ecFlow task path, by globbing the
  # local suite directory tree.
  find_task_name <- function(received_file) {
    file_name <- received_file
    parts <- strsplit(file_name, '__')[[1]]
    parts <- parts[c(2:(length(parts) - 1))]
    chunk_indices <- rev(sapply(parts, function(x) {
      as.numeric(strsplit(x, '_')[[1]][2])
    }))
    task_pattern <- paste(paste0('*_', chunk_indices, '/'),
                          collapse = '')
    task_glob <- paste0(ecflow_suite_dir_suite, '/*/*/',
                        task_pattern)
    task_path <- Sys.glob(task_glob)
    if (length(task_path) != 1) {
      stop("Unexpected error while receiving results.")
    }
    task_name <- strsplit(task_path, 'computation')[[1]][2]
    task_name <- paste0('/STARTR_CHUNKING_', suite_id,
                        '/computation', task_name)
    task_name
  }
  done <- FALSE
  attempt <- 1
  sum_received_chunks <- 0
  if (cluster[['bidirectional']]) {
    t_transfer_back <- NA
  } else {
    t_transfer_back <- 0
  }
  time_before_first_chunk <- startr_exec[['t_begin_first_chunk']]
  first_chunk_received <- FALSE
  # rsync filter file: only fetch result/status files from the remote suite
  # directory; everything else is excluded.
  rsync_petition_file_lines <- c('+ *.Rds', '+ *.timings', '+ *.crashed',
                                 '+ *.running', '- *')
  rsync_petition_file <- tempfile()
  writeLines(rsync_petition_file_lines, rsync_petition_file)
  Sys.sleep(2)
  while (!done) {
    failed <- FALSE
    if (cluster[['bidirectional']]) {
      # The ecFlow server can be queried directly for the suite state.
      status <- system(paste0("ecflow_client --get_state=STARTR_CHUNKING_",
                              suite_id, " --host=",
                              ecflow_server[['host']], " --port=", ecflow_server[['port']]),
                       intern = TRUE)
      if (any(grepl(paste0("suite STARTR_CHUNKING_", suite_id, " #.* state:complete"), status))) {
        done <- TRUE
      } else if (!wait) {
        stop("Computation in progress...")
      }
      if (!first_chunk_received) {
        if (any(grepl('state:complete', status))) {
          if (!is.null(time_before_first_chunk)) {
            time_after_first_chunk <- Sys.time()
            estimate <- (time_after_first_chunk -
                         time_before_first_chunk) *
                        ceiling((prod(unlist(chunks)) - cluster[['max_jobs']]) /
                                cluster[['max_jobs']])
            units(estimate) <- 'mins'
            startR:::.message(
              paste0('Remaining time estimate (neglecting queue and ',
                     'merge time) (at ', format(time_after_first_chunk),
                     '): ', format(estimate), ' (',
                     format(time_after_first_chunk -
                            time_before_first_chunk), ' per chunk)')
            )
          }
          first_chunk_received <- TRUE
        }
      }
      Sys.sleep(min(sqrt(attempt), 5))
    } else {
      # Non-bidirectional cluster: poll by rsync-ing status files back and
      # mirroring their state into the local ecFlow server.
      t_begin_transfer_back <- Sys.time()
      rsync_output <- tryCatch({
        system(paste0("rsync -rav --include-from=", rsync_petition_file, " '",
                      cluster[['queue_host']], ":", remote_ecflow_suite_dir_suite, "' ",
                      ecflow_suite_dir_suite, "/"), intern = TRUE)
      }, error = function(e) {
        message("Warning: rsync from remote server to collect results failed. ",
                "Retrying soon.")
        # Bug fix: use superassignment so the 'failed' flag of the enclosing
        # loop iteration is set. A plain '<-' here only created a variable
        # local to this handler, so failures were silently processed as
        # success below.
        failed <<- TRUE
        character(0)
      })
      t_end_transfer_back <- Sys.time()
      t_transfer_back <- t_transfer_back + as.numeric(difftime(t_end_transfer_back,
                                                               t_begin_transfer_back, units = 'secs'))
      if (!failed) {
        # Chunks reported as running -> force the matching task to 'active'.
        received_running <- grepl('running$', rsync_output)
        for (received_chunk_index in which(received_running)) {
          file_name <- rsync_output[received_chunk_index]
          task_name <- find_task_name(file_name)
          system(paste0('ecflow_client --force=active recursive ',
                        task_name,
                        " --host=", ecflow_server[['host']],
                        " --port=", ecflow_server[['port']]))
        }
        # Chunks reported as crashed -> force the matching task to 'aborted'.
        received_crashed <- grepl('crashed$', rsync_output)
        for (received_chunk_index in which(received_crashed)) {
          file_name <- rsync_output[received_chunk_index]
          task_name <- find_task_name(file_name)
          system(paste0('ecflow_client --force=aborted recursive ',
                        task_name,
                        " --host=", ecflow_server[['host']],
                        " --port=", ecflow_server[['port']]))
        }
        # Result files (.Rds) -> force 'complete' and count them.
        received_chunks <- grepl('Rds$', rsync_output)
        for (received_chunk_index in which(received_chunks)) {
          file_name <- rsync_output[received_chunk_index]
          task_name <- find_task_name(file_name)
          system(paste0('ecflow_client --force=complete recursive ',
                        task_name,
                        " --host=", ecflow_server[['host']],
                        " --port=", ecflow_server[['port']]))
          sum_received_chunks <- sum_received_chunks + 1
          if (!first_chunk_received) {
            if (!is.null(time_before_first_chunk)) {
              time_after_first_chunk <- Sys.time()
              estimate <- (time_after_first_chunk -
                           time_before_first_chunk) *
                          ceiling((prod(unlist(chunks)) - cluster[['max_jobs']]) /
                                  cluster[['max_jobs']])
              units(estimate) <- 'mins'
              startR:::.message(
                paste0('Remaining time estimate (neglecting queue and ',
                       'merge time) (at ', format(time_after_first_chunk),
                       '): ', format(estimate), ' (',
                       format(time_after_first_chunk -
                              time_before_first_chunk), ' per chunk)')
              )
            }
            first_chunk_received <- TRUE
          }
        }
        # Each chunk produces 'num_outputs' result files; done when all
        # chunks in the chunk grid have been accounted for.
        if (sum_received_chunks / num_outputs == prod(unlist(chunks))) {
          done <- TRUE
        } else if (!wait) {
          stop("Computation in progress...")
        }
      }
      Sys.sleep(cluster[['polling_period']])
    }
    attempt <- attempt + 1
  }
  file.remove(rsync_petition_file)
  timings[['transfer_back']] <- t_transfer_back
  if (!is.null(cluster[['temp_dir']])) {
    system(paste0('ssh ', cluster[['queue_host']], ' "rm -rf ',
                  remote_ecflow_suite_dir_suite, '"'))
  }
  if (remove) {
    .warning("ATTENTION: The source chunks will be removed from the ",
             "system. Store the result after Collect() ends if needed.")
  }
  t_begin_merge <- Sys.time()
  result <- startR:::.MergeChunks(ecflow_suite_dir, suite_id, remove)
  t_end_merge <- Sys.time()
  timings[['merge']] <- as.numeric(difftime(t_end_merge, t_begin_merge, units = 'secs'))
  # Fold the per-chunk timing files into the profiling structure.
  received_files <- list.files(ecflow_suite_dir_suite, full.names = TRUE)
  received_timings_files <- received_files[grepl('timings$', received_files)]
  for (timings_file in received_timings_files) {
    times <- readRDS(timings_file)
    timings[['queue']] <- c(timings[['queue']], times['queue'])
    timings[['job_setup']] <- c(timings[['job_setup']], times['job_setup'])
    timings[['load']] <- c(timings[['load']], times['load'])
    timings[['compute']] <- c(timings[['compute']], times['compute'])
  }
  if (remove) {
    system(paste0("ecflow_client --delete=force yes /STARTR_CHUNKING_",
                  suite_id, " --host=", ecflow_server[['host']],
                  " --port=", ecflow_server[['port']]))
    unlink(paste0(ecflow_suite_dir_suite),
           recursive = TRUE)
  }
  if (attempt > 2) {
    t_end_total <- Sys.time()
    timings[['total']] <- as.numeric(difftime(t_end_total, timings[['total']], units = 'secs'))
  } else {
    # When attempt <= 2, it means all results were ready possibly from
    # long ago, so it is not straightforward to work out total time.
    timings[['total']] <- NA
  }
  message(paste0("* Computation ended successfully."))
  message(paste0("*   Number of chunks: ",
                 timings[['nchunks']]))
  message(paste0("*   Max. number of concurrent chunks (jobs): ",
                 timings[['concurrent_chunks']]))
  message(paste0("*   Requested cores per job: ",
                 timings[['cores_per_job']]))
  message(paste0("*   Load threads per chunk: ",
                 timings[['threads_load']]))
  message(paste0("*   Compute threads per chunk: ",
                 timings[['threads_compute']]))
  message(paste0("*   Total time (s): ",
                 timings[['total']]))
  message(paste0("*     Chunking setup: ",
                 timings[['bychunks_setup']]))
  message(paste0("*     Data upload to cluster: ",
                 timings[['transfer']]))
  message(paste0("*     All chunks: ",
                 timings[['total']] -
                 timings[['bychunks_setup']] -
                 timings[['transfer']] -
                 timings[['transfer_back']] -
                 timings[['merge']]))
  message(paste0("*     Transfer results from cluster: ",
                 timings[['transfer_back']]))
  message(paste0("*     Merge: ",
                 timings[['merge']]))
  message(paste0("*     Each chunk: "))
  message(paste0("*       queue: "))
  message(paste0("*         mean: ",
                 mean(timings[['queue']])))
  message(paste0("*         min: ",
                 min(timings[['queue']])))
  message(paste0("*         max: ",
                 max(timings[['queue']])))
  message(paste0("*       job setup: "))
  message(paste0("*         mean: ",
                 mean(timings[['job_setup']])))
  message(paste0("*         min: ",
                 min(timings[['job_setup']])))
  message(paste0("*         max: ",
                 max(timings[['job_setup']])))
  message(paste0("*       load: "))
  message(paste0("*         mean: ",
                 mean(timings[['load']])))
  message(paste0("*         min: ",
                 min(timings[['load']])))
  message(paste0("*         max: ",
                 max(timings[['load']])))
  message(paste0("*       compute: "))
  message(paste0("*         mean: ",
                 mean(timings[['compute']])))
  message(paste0("*         min: ",
                 min(timings[['compute']])))
  message(paste0("*         max: ",
                 max(timings[['compute']])))
  attr(result, 'startR_compute_profiling') <- timings
  result
}
startR-develop-explore-enh/R/Compute.R 0000664 0000000 0000000 00000006461 13424417765 0020201 0 ustar 00root root 0000000 0000000 Compute <- function(workflow, chunks = 'auto',
                    threads_load = 1, threads_compute = 1,
                    cluster = NULL, ecflow_suite_dir = NULL,
                    ecflow_server = NULL, silent = FALSE, debug = FALSE,
                    wait = TRUE) {
  # Execute a startR workflow. A 'startR_cube' (from Start()) is evaluated
  # directly to retrieve the data; a 'startR_workflow' (from AddStep()) is
  # turned into a single combined operation and executed chunk by chunk via
  # ByChunks(), possibly on a cluster through ecFlow.
  #
  # workflow: 'startR_cube' or 'startR_workflow' object.
  # chunks: named list of number of chunks per dimension, or 'auto'.
  # threads_load / threads_compute: threads used for loading / computing.
  # cluster, ecflow_suite_dir, ecflow_server: cluster execution settings,
  #   forwarded to ByChunks().
  # silent, debug, wait: verbosity and blocking flags, forwarded to ByChunks().
  # Check workflow
  if (!any(c('startR_cube', 'startR_workflow') %in% class(workflow))) {
    stop("Parameter 'workflow' must be an object of class 'startR_cube' as ",
         "returned by Start or of class 'startR_workflow' as returned by ",
         "AddStep.")
  }
  if ('startR_cube' %in% class(workflow)) {
    #machine_free_ram <- 1000000000
    #max_ram_ratio <- 0.5
    #data_size <- prod(c(attr(workflow, 'Dimensions'), 8))
    #if (data_size > (machine_free_ram * max_ram_ratio)) {
    #  stop("It is not possible to fit the requested data (", data_size,
    #       " bytes) into the maximum allowed free ram (", max_ram_ratio,
    #       " x ", machine_free_ram, ").")
    #}
    # A bare data request: evaluating the 'startR_cube' call triggers the
    # actual retrieval.
    eval(workflow)
  } else {
    # TODO:
    #explore tree of operations and identify set of operations that reduce dimensionality as much as possible
    #  while being able to fit in (cluster and to exploit number of available nodes) | (machine)
    #combine set of operations into a single function
    #Goal: to build manually a function following this pattern:
    #operation <- function(input1, input2) {
    #  fun1 <- workflow$fun
    #  fun1(input1, input2, names(workflow$params)[1] = workflow$params[[1]])
    #}
    # Build, as source text, a wrapper function that calls the workflow's
    # function on the inputs with the workflow's fixed parameters spliced in
    # as named arguments, then parse/eval it into a real closure.
    op_text <- "function("
    op_text <- paste0(op_text,
                      paste(paste0('input', 1:length(workflow$inputs)),
                            collapse = ', '))
    op_text <- paste0(op_text, ") {")
    op_text <- paste0(op_text, "\n  fun1 <- ", paste(deparse(workflow$fun), collapse = '\n'))
    op_text <- paste0(op_text, "\n  res <- fun1(",
                      paste(paste0('input', 1:length(workflow$inputs)),
                            collapse = ", "))
    if (length(workflow$params) > 0) {
      for (j in 1:length(workflow$params)) {
        op_text <- paste0(op_text, ", ")
        op_text <- paste0(op_text, names(workflow$params)[j], " = ",
                          paste(deparse(workflow$params[[j]]), collapse = '\n'))
      }
    }
    op_text <- paste0(op_text, ")")
    op_text <- paste0(op_text, "\n}")
    operation <- eval(parse(text = op_text))
    # Re-wrap the combined closure as a Step, carrying over the metadata of
    # the original workflow function.
    operation <- Step(operation,
                      attr(workflow$fun, 'TargetDims'),
                      attr(workflow$fun, 'OutputDims'),
                      attr(workflow$fun, 'UseLibraries'),
                      attr(workflow$fun, 'UseAttributes'))
    # Multi-step workflows are not supported yet: all inputs must be raw
    # Start() headers, not results of other steps.
    if (!all(sapply(workflow$inputs, class) == 'startR_header')) {
      stop("Workflows with only one step supported by now.")
    }
    # Run ByChunks with the combined operation
    res <- ByChunks(step_fun = operation,
                    cube_headers = workflow$inputs,
                    chunks = chunks,
                    threads_load = threads_load,
                    threads_compute = threads_compute,
                    cluster = cluster,
                    ecflow_suite_dir = ecflow_suite_dir,
                    ecflow_server = ecflow_server,
                    silent = silent, debug = debug, wait = wait)
    # TODO: carry out remaining steps locally, using multiApply
    # Return results
    res
  }
}
startR-develop-explore-enh/R/NcCloser.R 0000664 0000000 0000000 00000000107 13424417765 0020264 0 ustar 00root root 0000000 0000000 NcCloser <- function(file_object) {
  # Close a NetCDF file handle previously opened with NcOpener().
  easyNCDF::NcClose(file_object)
}
startR-develop-explore-enh/R/NcDataReader.R 0000664 0000000 0000000 00000015164 13424417765 0021042 0 ustar 00root root 0000000 0000000 # Parameter 'file_selectors' expects a named character vector of single
# file dimension selectors.
# Parameter 'inner_indices' expects a named list of numeric vectors.
NcDataReader <- function(file_path = NULL, file_object = NULL,
                         file_selectors = NULL, inner_indices = NULL,
                         synonims) {
  # Read a subset of a NetCDF file as an array, mapping requested dimension
  # and variable names through the 'synonims' table in both directions.
  #
  # file_path / file_object: one of the two must be given; when only a path
  #   is given the file is opened here and closed before returning.
  # file_selectors: named character vector of single file-dimension selectors.
  # inner_indices: named list of numeric vectors; NULL means the whole
  #   variable is requested.
  # synonims: named list mapping canonical names to their per-file synonyms.
  # Returns the requested array (with dimension names translated back to the
  # canonical ones), or NULL if the file could not be opened. Time variables
  # with CF-style units are converted to POSIXct / period strings.
  close <- FALSE
  if (!is.null(file_object)) {
    file_to_read <- file_object
    file_path <- file_object$filename
  } else if (!is.null(file_path)) {
    file_to_read <- NcOpener(file_path)
    close <- TRUE
  } else {
    stop("Either 'file_path' or 'file_object' must be provided.")
  }
  if (is.null(file_to_read)) {
    return(NULL)
  }
  var_requested <- is.null(inner_indices)
  drop_var_dim <- FALSE
  # If 'var' is a file dimension but not an inner dimension, inject it as an
  # inner selector so easyNCDF knows which variable to read, and drop the
  # artificial dimension afterwards.
  if (any(c('var', 'variable') %in% names(file_selectors))) {
    if (!any(c('var', 'variable') %in% names(inner_indices))) {
      inner_indices <- c(inner_indices,
                         list(var = file_selectors[[which(names(file_selectors) %in%
                                                    c('var', 'variable'))[1]]]))
      drop_var_dim <- TRUE
    }
  }
  vars_in_file <- easyNCDF::NcReadVarNames(file_to_read)
  if (any(names(inner_indices) %in% c('var', 'variable'))) {
    position_of_var <- which(names(inner_indices) %in% c('var', 'variable'))[1]
  } else if (length(vars_in_file) == 1) {
    # Single-variable file: select it implicitly.
    inner_indices <- c(inner_indices,
                       list(var = vars_in_file))
    drop_var_dim <- TRUE
    position_of_var <- length(inner_indices)
  } else {
    stop("A 'var'/'variable' file dimension or inner dimension must be ",
         "requested for NcDataReader() to read NetCDF files with more than ",
         "one variable.")
  }
  # Translate each requested variable name into the name actually present in
  # this file, using 'synonims'; fail if zero or several synonyms match.
  inner_indices[[position_of_var]] <- sapply(inner_indices[[position_of_var]],
    function(x) {
      if (x %in% names(synonims)) {
        x_in_file <- which(synonims[[x]] %in% vars_in_file)
        if (length(x_in_file) < 1) {
          stop("Could not find variable '", x, "' (or its synonims if ",
               "specified) in the file ", file_path)
        }
        if (length(x_in_file) > 1) {
          stop("Found more than one matches for the synonims of the ",
               "variable '", x, "' in the file ", file_path)
        }
        synonims[[x]][x_in_file]
      } else {
        if (is.character(x) && !(x %in% c('all', 'first', 'last'))) {
          if (!(x %in% vars_in_file)) {
            stop("Could not find variable '", x, "' (or its synonims if ",
                 "specified) in the file ", file_path)
          }
        }
        x
      }
    })
  #inner_indices[[position_of_var]] <- SelectorChecker(inner_indices[[position_of_var]], vars_in_file)
  dims_in_file <- NcDimReader(NULL, file_to_read, NULL,
                              inner_indices[position_of_var], synonims)
  # Translate the requested dimension names into the file's own names.
  names(inner_indices) <- sapply(names(inner_indices),
    function(x) {
      if (x %in% names(synonims)) {
        x_in_file <- which(synonims[[x]] %in% names(dims_in_file))
        if (length(x_in_file) < 1) {
          stop("Could not find dimension '", x, "' (or its synonims if ",
               "specified) in the file ", file_path)
        }
        if (length(x_in_file) > 1) {
          stop("Found more than one matches for the synonims of the ",
               "dimension '", x, "' in the file ", file_path)
        }
        synonims[[x]][x_in_file]
      } else {
        if (!(x %in% names(dims_in_file))) {
          stop("Could not find dimension '", x, "' (or its synonims if ",
               "specified) in the file ", file_path)
        }
        x
      }
    })
  if (drop_var_dim) {
    dims_in_file <- dims_in_file[-which(names(dims_in_file) %in% c('var', 'variable'))]
  }
  # Length-1 dimensions the caller did not ask for are ignored.
  singleton_unspecified_dims <- which((dims_in_file == 1) &
                                      !(names(dims_in_file) %in% names(inner_indices)))
  if (length(singleton_unspecified_dims) > 0) {
    dims_in_file <- dims_in_file[-singleton_unspecified_dims]
  }
  if (var_requested) {
    # Whole-variable request: indices for every dimension are not required.
    result <- easyNCDF::NcToArray(file_to_read, inner_indices, drop_var_dim = drop_var_dim,
                                  expect_all_indices = FALSE, allow_out_of_range = TRUE)
  } else {
    if (any(!(names(dims_in_file) %in% names(inner_indices)))) {
      expected_dim_names <- names(inner_indices)
      if (drop_var_dim) {
        expected_dim_names <- expected_dim_names[-position_of_var]
      }
      stop("Unexpected extra dimensions (of length > 1) in the file.\nExpected: ",
           paste(expected_dim_names, collapse = ', '), "\n",
           "Found: ", paste(names(dims_in_file), collapse = ', '), "\n",
           file_path)
    }
    result <- easyNCDF::NcToArray(file_to_read, inner_indices, drop_var_dim = drop_var_dim,
                                  expect_all_indices = TRUE, allow_out_of_range = TRUE)
  }
  # Translate dimension names of the result back to the canonical names.
  if (!is.null(dim(result))) {
    names(dim(result)) <- sapply(names(dim(result)),
      function(x) {
        which_entry <- which(sapply(synonims, function(y) x %in% y))
        if (length(which_entry) > 0) {
          names(synonims)[which_entry]
        } else {
          x
        }
      })
  }
  if (!is.null(result)) {
    # Same translation for the names of the attached 'variables' metadata.
    names(attr(result, 'variables')) <- sapply(names(attr(result, 'variables')),
      function(x) {
        which_entry <- which(sapply(synonims, function(y) x %in% y))
        if (length(which_entry) > 0) {
          names(synonims)[which_entry]
        } else {
          x
        }
      })
    if (length(names(attr(result, 'variables'))) == 1) {
      # Single variable read: interpret CF-style time units.
      # NOTE(review): if the variable has no 'units' attribute, 'units' is
      # NULL here and the %in% test below yields logical(0) — confirm this
      # path cannot be reached for unitless variables.
      var_name <- names(attr(result, 'variables'))
      units <- attr(result, 'variables')[[var_name]][['units']]
      if (units %in% c('seconds', 'minutes', 'hours', 'days', 'weeks', 'months', 'years')) {
        # Plain period units: turn values into "<value> <unit>" strings,
        # normalizing to the unit names difftime understands.
        if (units == 'seconds') {
          units <- 'secs'
        } else if (units == 'minutes') {
          units <- 'mins'
        }
        result[] <- paste(result[], units)
      } else if (grepl(' since ', units)) {
        # "<unit> since <origin>" units: convert offsets to POSIXct dates.
        parts <- strsplit(units, ' since ')[[1]]
        units <- parts[1]
        if (units %in% c('second', 'seconds')) {
          units <- 'secs'
        } else if (units %in% c('minute', 'minutes')) {
          units <- 'mins'
        } else if (units == 'day') {
          units <- 'days'
        }
        new_array <- rep(as.POSIXct(parts[2]), length(result)) +
                     as.difftime(result[], units = units)
        #new_array <- seq(as.POSIXct(parts[2]),
        #                 length = max(result, na.rm = TRUE) + 1,
        #                 by = units)[result[] + 1]
        dim(new_array) <- dim(result)
        attr(new_array, 'variables') <- attr(result, 'variables')
        result <- new_array
      }
    }
  }
  if (close) {
    NcCloser(file_to_read)
  }
  result
}
startR-develop-explore-enh/R/NcDimReader.R 0000664 0000000 0000000 00000005770 13424417765 0020704 0 ustar 00root root 0000000 0000000 # Parameter 'file_selectors' expects a named character vector of single
# file dimension selectors.
# Parameter 'inner_indices' expects a named list of numeric or
# character string vectors.
NcDimReader <- function(file_path = NULL, file_object = NULL,
                        file_selectors = NULL, inner_indices = NULL,
                        synonims) {
  # Return the named dimension lengths of the requested variable(s) in a
  # NetCDF file, translating variable names through 'synonims'.
  #
  # file_path / file_object: one of the two must be given; when only a path
  #   is given the file is opened here and closed before returning.
  # file_selectors: named character vector of single file-dimension selectors.
  # inner_indices: named list of numeric or character string vectors.
  # synonims: named list mapping canonical names to per-file synonyms.
  # Returns a named integer vector of dimension lengths.
  close <- FALSE
  if (!is.null(file_object)) {
    file_to_read <- file_object
    file_path <- file_object$filename
  } else if (!is.null(file_path)) {
    file_to_read <- NcOpener(file_path)
    close <- TRUE
  } else {
    stop("Either 'file_path' or 'file_object' must be provided.")
  }
  vars_in_file <- easyNCDF::NcReadVarNames(file_to_read)
  # Work out which variables are requested and under which tag ('var' or
  # 'variable'), looking first at inner dimensions, then file dimensions,
  # then falling back to the single variable of a one-variable file.
  if (any(c('var', 'variable') %in% names(inner_indices))) {
    vars_to_read <- inner_indices[[which(names(inner_indices) %in% c('var', 'variable'))[1]]]
    var_tag <- names(inner_indices)[[which(names(inner_indices) %in% c('var', 'variable'))[1]]]
  } else if (any(c('var', 'variable') %in% names(file_selectors))) {
    vars_to_read <- file_selectors[[which(names(file_selectors) %in% c('var', 'variable'))[1]]]
    var_tag <- names(file_selectors)[[which(names(file_selectors) %in% c('var', 'variable'))[1]]]
  } else if (length(vars_in_file) == 1) {
    vars_to_read <- vars_in_file
    file_selectors <- c(file_selectors, list(var = vars_in_file))
    var_tag <- 'var'
  } else {
    stop("NcDimReader expected to find a requested 'var' or 'variable' in 'file_selectors'.")
  }
  if ((length(vars_to_read) == 1) && (vars_to_read[1] == 'var_names')) {
    # Special request: only the number of variables in the file.
    result <- setNames(length(vars_in_file), var_tag)
  } else {
    # Translate each requested variable name via 'synonims'; fail if zero or
    # several synonyms match.
    vars_to_read <- sapply(vars_to_read,
      function(x) {
        if (x %in% names(synonims)) {
          x_in_file <- which(synonims[[x]] %in% vars_in_file)
          if (length(x_in_file) < 1) {
            stop("Could not find variable '", x, "' (or its synonims if ",
                 "specified) in the file ", file_path)
          }
          if (length(x_in_file) > 1) {
            stop("Found more than one matches for the synonims of the ",
                 "variable '", x, "' in the file ", file_path)
          }
          synonims[[x]][x_in_file]
        } else {
          if (is.character(x) && !(x %in% c('all', 'last', 'first'))) {
            if (!(x %in% vars_in_file)) {
              stop("Could not find variable '", x, "' (or its synonims if ",
                   "specified) in the file ", file_path)
            }
          }
          x
        }
      })
    vars_to_read <- SelectorChecker(vars_to_read, vars_in_file,
                                    return_indices = FALSE)
    read_dims <- easyNCDF::NcReadDims(file_to_read, vars_to_read)
    if (any(c('var', 'variable') %in% names(inner_indices))) {
      # Report the 'var' dimension under the caller's tag, sized as the
      # total number of variables in the file.
      names(read_dims)[which(names(read_dims) == 'var')] <- var_tag
      read_dims[var_tag] <- length(vars_in_file)
    } else {
      # 'var' was a file dimension: hide it from the result.
      read_dims <- read_dims[-which(names(read_dims) == 'var')]
    }
    result <- read_dims
  }
  if (close) {
    NcCloser(file_to_read)
  }
  result
}
startR-develop-explore-enh/R/NcOpener.R 0000664 0000000 0000000 00000000102 13424417765 0020260 0 ustar 00root root 0000000 0000000 NcOpener <- function(file_path) {
  # Open a NetCDF file and return its handle (closed later with NcCloser()).
  easyNCDF::NcOpen(file_path)
}
startR-develop-explore-enh/R/NcVarReader.R 0000664 0000000 0000000 00000001604 13424417765 0020713 0 ustar 00root root 0000000 0000000 NcVarReader <- function(file_path = NULL, file_object = NULL,
                        file_selectors = NULL, var_name = NULL,
                        synonims) {
  # Read a single (coordinate) variable from a NetCDF file, or, when
  # var_name == 'var_names', list the names of all variables in the file
  # (translated to their canonical names via 'synonims').
  #
  # file_path / file_object: one of the two must be given.
  # file_selectors: named character vector of single file-dimension selectors
  #   (forwarded to NcDataReader()).
  # var_name: name of the variable to read, or the special value 'var_names'.
  # synonims: named list mapping canonical names to per-file synonyms.
  if (!is.null(file_object)) {
    file_to_read <- file_object
  } else if (!is.null(file_path)) {
    file_to_read <- file_path
  } else {
    stop("Either 'file_path' or 'file_object' must be provided.")
  }
  if (var_name %in% c('var_names')) {
    vars_in_file <- easyNCDF::NcReadVarNames(file_to_read)
    # Replace each file variable name by its canonical name when it appears
    # among the synonyms.
    vars_in_file <- sapply(vars_in_file,
      function(x) {
        which_entry <- which(sapply(synonims, function(y) x %in% y))
        if (length(which_entry) > 0) {
          names(synonims)[which_entry]
        } else {
          x
        }
      })
    dim(vars_in_file) <- c(var_names = length(vars_in_file))
    vars_in_file
  } else {
    # Delegate the actual read to NcDataReader with a 'var' file selector.
    NcDataReader(file_path, file_object, list(var = var_name), NULL, synonims)
  }
}
SelectorChecker <- function(selectors, var = NULL, return_indices = TRUE,
                            tolerance = NULL) {
  # Translate dimension selectors into indices along 'var' (when
  # return_indices = TRUE) or into the selected values of 'var' (when
  # return_indices = FALSE).
  #
  # selectors: a 2-element list (range of values/indices), a numeric vector
  #   of values or indices, a date vector, a character vector of values, or
  #   one of the keywords 'all'/'first'/'last'.
  # var: the possible values along the dimension; may be NULL when
  #   'selectors' are plain indices.
  # return_indices: whether to return indices (TRUE) or values (FALSE).
  # tolerance: maximum allowed mismatch when matching numeric (numeric
  #   tolerance) or date (difftime tolerance) selectors against 'var'.
  if (length(selectors) == 0) {
    stop("No selectors provided in 'selectors'.")
  }
  if (return_indices) {
    if (is.list(selectors)) {
      # A list of two selectors describes a range; each end is resolved to
      # an index independently.
      if (length(selectors) != 2) {
        stop("'selectors' provided in a wrong format.")
      }
      # Detect whether the range is increasing or decreasing (only possible
      # for orderable selector types).
      crescent_selectors <- TRUE
      if (all(sapply(selectors,
                     function(x) {
                       any(c('numeric', "POSIXct", "POSIXlt", "POSIXt", "Date") %in% class(x))
                     }))) {
        if (selectors[[2]] < selectors[[1]]) {
          crescent_selectors <- FALSE
        }
      }
      for (i in 1:length(selectors)) {
        if (is.null(var)) {
          # Without 'var', only numeric index selectors make sense.
          if (!is.numeric(selectors[[i]])) {
            stop("No selector values provided in 'var'.")
          } else {
            selectors[[i]] <- round(selectors[[i]])
          }
        } else if (is.na(selectors[[i]])) {
          # NA means "open end": the first/last index depending on which end
          # of the range it is and on the range direction.
          if (i == 1) {
            if (crescent_selectors) {
              selectors[[i]] <- 1
            } else {
              selectors[[i]] <- length(var)
            }
          } else {
            if (crescent_selectors) {
              selectors[[i]] <- length(var)
            } else {
              selectors[[i]] <- 1
            }
          }
        } else if (is.character(selectors[[i]])) {
          # Exact string match against character 'var'.
          if (is.character(var)) {
            candidate <- which(var == selectors[[i]])
            if (length(candidate) > 0) {
              selectors[[i]] <- candidate[1]
            } else {
              stop("Selector value not found in 'var'.")
            }
          } else {
            stop("Character selectors provided but possible values in 'var' are not character.")
          }
        } else if (is.numeric(selectors[[i]])) {
          # Numeric range end: widen by 'tolerance' and take the first/last
          # index of 'var' inside the (half-open) range.
          if (is.numeric(var)) {
            val <- selectors[[i]]
            tol <- 0
            if (!is.null(tolerance)) {
              if (!any(class(tolerance) %in% "numeric")) {
                stop("Expected a numeric *_tolerance.")
              }
              tol <- tolerance
            }
            if (i == 1) {
              if (crescent_selectors) {
                val <- val - tol
                selectors[[i]] <- which(var >= val)[1]
              } else {
                val <- val + tol
                selectors[[i]] <- rev(which(var <= val))[1]
              }
            } else {
              if (crescent_selectors) {
                val <- val + tol
                selectors[[i]] <- rev(which(var <= val))[1]
              } else {
                val <- val - tol
                selectors[[i]] <- which(var >= val)[1]
              }
            }
          } else {
            stop("Numeric selectors provided but possible values in 'var' are not numeric.")
          }
        } else if (any(c("POSIXct", "POSIXlt", "POSIXt", "Date") %in% class(selectors[[i]]))) {
          # Date range end: same logic as numeric, with a difftime tolerance.
          if (any(c("POSIXct", "POSIXlt", "POSIXt", "Date") %in% class(var))) {
            val <- selectors[[i]]
            tol <- 0
            if (!is.null(tolerance)) {
              if (!any(class(tolerance) %in% "difftime")) {
                stop("Expected a difftime *_tolerance.")
              }
              tol <- tolerance
            }
            if (i == 1) {
              if (crescent_selectors) {
                val <- val - tol
                selectors[[i]] <- which(var >= val)[1]
              } else {
                val <- val + tol
                selectors[[i]] <- rev(which(var <= val))[1]
              }
            } else {
              if (crescent_selectors) {
                val <- val + tol
                selectors[[i]] <- rev(which(var <= val))[1]
              } else {
                val <- val - tol
                selectors[[i]] <- which(var >= val)[1]
              }
            }
          } else {
            stop("Datetime selectors provided but possible values in 'var' are not datetime.")
          }
        }
      }
      # The checker is returning a list of two indices.
      ##selectors[[1]]:selectors[[2]]
      selectors
    } else if (is.numeric(selectors)) {
      if (is.null(var)) {
        ## TODO: Crash if negative indices?
        round(selectors)
      } else {
        # Numeric values: map each to the index of the nearest value in
        # 'var', warning when there is no exact match.
        if (is.numeric(var)) {
          if (!all(selectors %in% var)) {
            .warning(paste0("Numeric selectors have been ",
                            "provided for a dimension defined along a ",
                            "numeric variable, but no exact match ",
                            "found for all the selectors. Taking the index of the ",
                            "nearest values."))
          }
          if (!is.null(tolerance)) {
            if (!any(class(tolerance) %in% 'numeric')) {
              stop("Expected a numeric *_tolerance.")
            }
          }
          sapply(selectors, function(x) {
            dif <- abs(var - x)
            res <- which.min(dif)[1]
            if (!is.null(tolerance)) {
              if (dif[res] > tolerance) {
                stop("Could not find a value in 'var' close ",
                     "enough to one of the 'selectors', ",
                     "according to 'tolerance'.")
              }
            }
            res
          })
        } else {
          stop("Numeric selectors provided but possible values in 'var' are not numeric.")
        }
      }
    } else if (any(c('POSIXct', 'POSIXlt', 'POSIXt', 'Date') %in% class(selectors))) {
      # Date values: index of the nearest date in 'var'; NA when farther
      # than 'tolerance'.
      if (is.null(var)) {
        stop("Numeric selectors have been provided for a dimension ",
             "defined along a date variable, but no possible values ",
             "provided in 'var'.")
      }
      if (!all(selectors %in% var)) {
        .warning(paste0("Date selectors have been ",
                        "provided for a dimension defined along a ",
                        "date variable, but no exact match ",
                        "found for all the selectors. Taking the index of the ",
                        "nearest values."))
      }
      if (!is.null(tolerance)) {
        if (!any(class(tolerance) %in% 'difftime')) {
          stop("Expected a difftime *_tolerance.")
        }
      }
      sapply(selectors, function(x) {
        dif <- abs(var - x)
        res <- which.min(dif)[1]
        if (!is.null(tolerance)) {
          if (dif[res] > tolerance) {
            res <- NA
            #stop("Could not find a value in 'var' close ",
            #     "enough to one of the 'selectors', ",
            #     "according to 'tolerance'.")
          }
        }
        res
      })
    } else {
      if (is.null(var)) {
        stop("No selector values provided in 'var'.")
      } else {
        # Keywords and exact value matching.
        if ((length(selectors) == 1) &&
            (selectors %in% c('all', 'first', 'last'))) {
          if (selectors == 'all') {
            1:length(var)
          } else if (selectors == 'first') {
            1
          } else {
            length(var)
          }
        } else {
          if (!identical(class(var), class(selectors))) {
            stop("Class of provided selectors does not match class of 'var'.")
          }
          candidates <- which(as.vector(var) == as.vector(selectors))
          if (length(candidates) == 0) {
            stop("Selectors do not match values in 'var'.")
          } else if (length(candidates) != length(selectors)) {
            stop("Some selectors do not match values in 'var'.")
          }
          candidates
        }
      }
    }
  } else {
    # return_indices = FALSE: return the selected values of 'var'.
    if (!is.null(var)) {
      if (is.list(selectors)) {
        # Bug fix: the original tested 'length(selectors != 2)', which is the
        # length of a logical vector and is truthy for any non-empty list,
        # so valid 2-element range selectors always triggered the error.
        if (length(selectors) != 2) {
          stop("'selectors' provided in a wrong format.")
        } else {
          var[selectors[[1]]:selectors[[2]]]
        }
      } else if (is.numeric(selectors)) {
        if (length(selectors) > 0) {
          var[selectors]
        } else {
          stop("No selectors provided.")
        }
      } else {
        if ((length(selectors) == 1) &&
            (selectors %in% c('all', 'first', 'last'))) {
          if (selectors == 'all') {
            var
          } else if (selectors == 'first') {
            head(var, 1)
          } else {
            tail(var, 1)
          }
        } else {
          selectors
        }
      }
    } else {
      selectors
    }
  }
}
# Generate a sorting function to be used for dimension reordering in Start().
# Any extra arguments (e.g. decreasing = TRUE) are deparsed and baked into the
# text of the generated function, so the returned function is self-contained.
# The generated function returns a list with the sorted values ('x') and the
# reordering indices ('ix'), both keeping the original dimensions of the input.
Sort <- function(...) {
  sort_args <- list(...)
  # Template of the function to generate; PARAMS is a placeholder that is
  # replaced below by the deparsed extra arguments.
  fun_text <- "function(x) {
    dim_bk <- dim(x)
    x <- do.call(sort, c(list(x, index.return = TRUE),
                         PARAMS))
    dim(x$x) <- dim_bk
    dim(x$ix) <- dim_bk
    x
  }"
  fun_text <- gsub("PARAMS", deparse(sort_args), fun_text)
  sorter <- eval(parse(text = fun_text))
  # Flag read by Start() to distinguish plain sorts from circular sorts.
  attr(sorter, 'circular') <- FALSE
  sorter
}
# Generate a circular sorting function to be used for dimension reordering in
# Start(). Input values are first mapped into the circular range
# [start, end) via modulo arithmetic, then sorted; any extra arguments are
# forwarded to sort(). The range limits and extra arguments are deparsed and
# baked into the text of the generated function, so it is self-contained.
# The generated function returns a list with the remapped-and-sorted values
# ('x') and the reordering indices ('ix'), keeping the input dimensions.
CircularSort <- function(start, end, ...) {
  sort_args <- list(...)
  # Template of the function to generate; START, END and PARAMS are
  # placeholders replaced below.
  fun_text <- "function (x) {
    start <- START
    end <- END
    dim_bk <- dim(x)
    x <- do.call(sort, c(list((x - start) %% (end - start) + start,
                              index.return = TRUE),
                         PARAMS))
    dim(x$x) <- dim_bk
    dim(x$ix) <- dim_bk
    x
  }"
  fun_text <- gsub("START", deparse(start), fun_text)
  fun_text <- gsub("END", deparse(end), fun_text)
  fun_text <- gsub("PARAMS", deparse(sort_args), fun_text)
  sorter <- eval(parse(text = fun_text))
  # Flag read by Start() to distinguish circular sorts from plain sorts.
  attr(sorter, 'circular') <- TRUE
  sorter
}
startR-develop-explore-enh/R/Start.R 0000664 0000000 0000000 00000454600 13424417765 0017664 0 ustar 00root root 0000000 0000000 Start <- function(..., # dim = indices/selectors,
# dim_var = 'var',
# dim_reorder = Sort/CircularSort,
# dim_tolerance = number,
# dim_depends = 'file_dim',
# dim_across = 'file_dim',
return_vars = NULL,
synonims = NULL,
file_opener = NcOpener,
file_var_reader = NcVarReader,
file_dim_reader = NcDimReader,
file_data_reader = NcDataReader,
file_closer = NcCloser,
transform = NULL,
transform_params = NULL,
transform_vars = NULL,
transform_extra_cells = 0,
apply_indices_after_transform = FALSE,
pattern_dims = NULL,
metadata_dims = NULL,
selector_checker = SelectorChecker,
merge_across_dims = FALSE,
split_multiselected_dims = FALSE,
path_glob_permissive = FALSE,
retrieve = FALSE,
num_procs = 1,
silent = FALSE, debug = FALSE) {
#, config_file = NULL
#dictionary_dim_names = ,
#dictionary_var_names =
dim_params <- list(...)
# Take *_var parameters apart
var_params_ind <- grep('_var$', names(dim_params))
var_params <- dim_params[var_params_ind]
# Check all *_var are NULL or vectors of character strings, and
# that they all have a matching dimension param.
i <- 1
for (var_param in var_params) {
if (!is.character(var_param)) {
stop("All '*_var' parameters must be character strings.")
} else if (!any(grepl(paste0('^', strsplit(names(var_params)[i],
'_var$')[[1]][1], '$'),
names(dim_params)))) {
stop(paste0("All '*_var' parameters must be associated to a dimension parameter. Found parameter '",
names(var_params)[i], "' but no parameter '",
strsplit(names(var_params)[i], '_var$')[[1]][1], "'."))
}
i <- i + 1
}
# Make the keys of 'var_params' to be the name of
# the corresponding dimension.
if (length(var_params) < 1) {
var_params <- NULL
} else {
names(var_params) <- gsub('_var$', '', names(var_params))
}
# Take *_reorder parameters apart
dim_reorder_params_ind <- grep('_reorder$', names(dim_params))
dim_reorder_params <- dim_params[dim_reorder_params_ind]
# Make the keys of 'dim_reorder_params' to be the name of
# the corresponding dimension.
if (length(dim_reorder_params) < 1) {
dim_reorder_params <- NULL
} else {
names(dim_reorder_params) <- gsub('_reorder$', '', names(dim_reorder_params))
}
# Take *_tolerance parameters apart
tolerance_params_ind <- grep('_tolerance$', names(dim_params))
tolerance_params <- dim_params[tolerance_params_ind]
# Take *_depends parameters apart
depends_params_ind <- grep('_depends$', names(dim_params))
depends_params <- dim_params[depends_params_ind]
# Check all *_depends are NULL or vectors of character strings, and
# that they all have a matching dimension param.
i <- 1
for (depends_param in depends_params) {
if (!is.character(depends_param) || (length(depends_param) > 1)) {
stop("All '*_depends' parameters must be single character strings.")
} else if (!any(grepl(paste0('^', strsplit(names(depends_params)[i],
'_depends$')[[1]][1], '$'),
names(dim_params)))) {
stop(paste0("All '*_depends' parameters must be associated to a dimension parameter. Found parameter '",
names(depends_params)[i], "' but no parameter '",
strsplit(names(depends_params)[i], '_depends$')[[1]][1], "'."))
}
i <- i + 1
}
# Make the keys of 'depends_params' to be the name of
# the corresponding dimension.
if (length(depends_params) < 1) {
depends_params <- NULL
} else {
names(depends_params) <- gsub('_depends$', '', names(depends_params))
}
# Change name to depending_file_dims
depending_file_dims <- depends_params
# Take *_across parameters apart
across_params_ind <- grep('_across$', names(dim_params))
across_params <- dim_params[across_params_ind]
# Check all *_across are NULL or vectors of character strings, and
# that they all have a matching dimension param.
i <- 1
for (across_param in across_params) {
if (!is.character(across_param) || (length(across_param) > 1)) {
stop("All '*_across' parameters must be single character strings.")
} else if (!any(grepl(paste0('^', strsplit(names(across_params)[i],
'_across$')[[1]][1], '$'),
names(dim_params)))) {
stop(paste0("All '*_across' parameters must be associated to a dimension parameter. Found parameter '",
names(across_params)[i], "' but no parameter '",
strsplit(names(across_params)[i], '_across$')[[1]][1], "'."))
}
i <- i + 1
}
# Make the keys of 'across_params' to be the name of
# the corresponding dimension.
if (length(across_params) < 1) {
across_params <- NULL
} else {
names(across_params) <- gsub('_across$', '', names(across_params))
}
# Change name to inner_dims_across_files
inner_dims_across_files <- across_params
# Check merge_across_dims
if (!is.logical(merge_across_dims)) {
stop("Parameter 'merge_across_dims' must be TRUE or FALSE.")
}
# Leave alone the dimension parameters in the variable dim_params
if (length(c(var_params_ind, dim_reorder_params_ind, tolerance_params_ind,
depends_params_ind, across_params_ind)) > 0) {
dim_params <- dim_params[-c(var_params_ind, dim_reorder_params_ind,
tolerance_params_ind, depends_params_ind,
across_params_ind)]
# Reallocating pairs of across file and inner dimensions if they have
# to be merged. They are put one next to the other to ease merge later.
if (merge_across_dims) {
for (inner_dim_across in names(inner_dims_across_files)) {
inner_dim_pos <- which(names(dim_params) == inner_dim_across)
file_dim_pos <- which(names(dim_params) == inner_dims_across_files[[inner_dim_across]])
new_pos <- inner_dim_pos
if (file_dim_pos < inner_dim_pos) {
new_pos <- new_pos - 1
}
dim_params_to_move <- dim_params[c(inner_dim_pos, file_dim_pos)]
dim_params <- dim_params[-c(inner_dim_pos, file_dim_pos)]
new_dim_params <- list()
if (new_pos > 1) {
new_dim_params <- c(new_dim_params, dim_params[1:(new_pos - 1)])
}
new_dim_params <- c(new_dim_params, dim_params_to_move)
if (length(dim_params) >= new_pos) {
new_dim_params <- c(new_dim_params, dim_params[new_pos:length(dim_params)])
}
dim_params <- new_dim_params
}
}
}
dim_names <- names(dim_params)
if (is.null(dim_names)) {
stop("At least one pattern dim must be specified.")
}
# Look for chunked dims
chunks <- vector('list', length(dim_names))
names(chunks) <- dim_names
for (dim_name in dim_names) {
if (!is.null(attr(dim_params[[dim_name]], 'chunk'))) {
chunks[[dim_name]] <- attr(dim_params[[dim_name]], 'chunk')
attributes(dim_params[[dim_name]]) <- attributes(dim_params[[dim_name]])[-which(names(attributes(dim_params[[dim_name]])) == 'chunk')]
} else {
chunks[[dim_name]] <- c(chunk = 1, n_chunks = 1)
}
}
# This is a helper function to compute the chunk indices to take once the total
# number of indices for a dimension has been discovered.
# Compute the indices along a dimension that belong to a given chunk, when a
# dimension of total length 'n_indices' is split into 'n_chunks' chunks as
# evenly as possible (the first chunks absorb any remainder, one extra index
# each).
#
# n_indices: total number of indices along the dimension.
# chunk: which chunk (1-based) to take the indices of.
# n_chunks: total number of chunks the dimension is divided into.
# dim_name: name of the dimension; used to name the dimension of the
#   returned array and in the error message.
#
# Returns a one-dimensional array of consecutive indices whose single
# dimension is named 'dim_name'. Stops if more chunks than indices are
# requested.
chunk_indices <- function(n_indices, chunk, n_chunks, dim_name) {
  if (n_chunks > n_indices) {
    stop("Requested to divide dimension '", dim_name, "' of length ",
         n_indices, " in ", n_chunks, " chunks, which is not possible.")
  }
  # Start from equal floor-sized chunks, then hand one extra index to the
  # first few chunks until all remaining indices are distributed.
  chunk_sizes <- rep(floor(n_indices / n_chunks), n_chunks)
  chunks_to_extend <- n_indices - chunk_sizes[1] * n_chunks
  if (chunks_to_extend > 0) {
    chunk_sizes[1:chunks_to_extend] <- chunk_sizes[1:chunks_to_extend] + 1
  }
  # Offset of the requested chunk = total size of all preceding chunks.
  offset <- 0
  if (chunk > 1) {
    offset <- sum(chunk_sizes[1:(chunk - 1)])
  }
  # seq_len is safe here: the guard above ensures every chunk size is >= 1.
  indices <- seq_len(chunk_sizes[chunk]) + offset
  array(indices, dim = setNames(length(indices), dim_name))
}
# Check pattern_dims
if (is.null(pattern_dims)) {
.warning(paste0("Parameter 'pattern_dims' not specified. Taking the first dimension, '",
dim_names[1], "' as 'pattern_dims'."))
pattern_dims <- dim_names[1]
} else if (is.character(pattern_dims) && (length(pattern_dims) > 0)) {
pattern_dims <- unique(pattern_dims)
} else {
stop("Parameter 'pattern_dims' must be a vector of character strings.")
}
if (any(names(var_params) %in% pattern_dims)) {
stop("'*_var' parameters specified for pattern dimensions. Remove or fix them.")
}
# Find the pattern dimension with the pattern specifications
found_pattern_dim <- NULL
for (pattern_dim in pattern_dims) {
# Check all specifications in pattern_dim are valid
dat <- datasets <- dim_params[[pattern_dim]]
if (is.null(dat) || !(is.character(dat) && all(nchar(dat) > 0)) && !is.list(dat)) {
stop(paste0("Parameter '", pattern_dim,
"' must be a list of lists with pattern specifications or a vector of character strings."))
}
if (!is.null(dim_reorder_params[[pattern_dim]])) {
.warning(paste0("A reorder for the selectors of '", pattern_dim,
"' has been specified, but it is a pattern dimension and the reorder will be ignored."))
}
if (is.list(dat) || any(sapply(dat, is.list))) {
if (is.null(found_pattern_dim)) {
found_pattern_dim <- pattern_dim
} else {
stop("Found more than one pattern dim with pattern specifications (list of lists). One and only one pattern dim must contain pattern specifications.")
}
}
}
if (is.null(found_pattern_dim)) {
.warning(paste0("Could not find any pattern dim with explicit data set descriptions (in the form of list of lists). Taking the first pattern dim, '", pattern_dims[1], "', as dimension with pattern specifications."))
found_pattern_dim <- pattern_dims[1]
}
# Check all *_reorder are NULL or functions, and that they all have
# a matching dimension param.
i <- 1
for (dim_reorder_param in dim_reorder_params) {
if (!is.function(dim_reorder_param)) {
stop("All '*_reorder' parameters must be functions.")
} else if (!any(grepl(paste0('^', strsplit(names(dim_reorder_params)[i],
'_reorder$')[[1]][1], '$'),
names(dim_params)))) {
stop(paste0("All '*_reorder' parameters must be associated to a dimension parameter. Found parameter '",
names(dim_reorder_params)[i], "' but no parameter '",
strsplit(names(dim_reorder_params)[i], '_reorder$')[[1]][1], "'."))
#} else if (!any(grepl(paste0('^', strsplit(names(dim_reorder_params)[i],
# '_reorder$')[[1]][1], '$'),
# names(var_params)))) {
# stop(paste0("All '*_reorder' parameters must be associated to a dimension parameter associated to a ",
# "variable. Found parameter '", names(dim_reorder_params)[i], "' and dimension parameter '",
# strsplit(names(dim_reorder_params)[i], '_reorder$')[[1]][1], "' but did not find variable ",
# "parameter '", strsplit(names(dim_reorder_params)[i], '_reorder$')[[1]][1], "_var'."))
}
i <- i + 1
}
# Check all *_tolerance are NULL or vectors of character strings, and
# that they all have a matching dimension param.
i <- 1
for (tolerance_param in tolerance_params) {
if (!any(grepl(paste0('^', strsplit(names(tolerance_params)[i],
'_tolerance$')[[1]][1], '$'),
names(dim_params)))) {
stop(paste0("All '*_tolerance' parameters must be associated to a dimension parameter. Found parameter '",
names(tolerance_params)[i], "' but no parameter '",
strsplit(names(tolerance_params)[i], '_tolerance$')[[1]][1], "'."))
#} else if (!any(grepl(paste0('^', strsplit(names(tolerance_params)[i],
# '_tolerance$')[[1]][1], '$'),
# names(var_params)))) {
# stop(paste0("All '*_tolerance' parameters must be associated to a dimension parameter associated to a ",
# "variable. Found parameter '", names(tolerance_params)[i], "' and dimension parameter '",
# strsplit(names(tolerance_params)[i], '_tolerance$')[[1]][1], "' but did not find variable ",
# "parameter '", strsplit(names(tolerance_params)[i], '_tolerance$')[[1]][1], "_var'."))
}
i <- i + 1
}
# Make the keys of 'tolerance_params' to be the name of
# the corresponding dimension.
if (length(tolerance_params) < 1) {
tolerance_params <- NULL
} else {
names(tolerance_params) <- gsub('_tolerance$', '', names(tolerance_params))
}
# Check metadata_dims
if (!is.null(metadata_dims)) {
if (is.na(metadata_dims)) {
metadata_dims <- NULL
} else if (!is.character(metadata_dims) || (length(metadata_dims) < 1)) {
stop("Parameter 'metadata' dims must be a vector of at least one character string.")
}
} else {
metadata_dims <- pattern_dims
}
# Once the pattern dimension with dataset specifications is found,
# the variable 'dat' is mounted with the information of each
# dataset.
# Take only the datasets for the requested chunk
dats_to_take <- chunk_indices(length(dim_params[[found_pattern_dim]]),
chunks[[found_pattern_dim]]['chunk'],
chunks[[found_pattern_dim]]['n_chunks'],
found_pattern_dim)
dim_params[[found_pattern_dim]] <- dim_params[[found_pattern_dim]][dats_to_take]
dat <- datasets <- dim_params[[found_pattern_dim]]
dat_info_names <- c('name', 'path')#, 'nc_var_name', 'suffix', 'var_min', 'var_max', 'dimnames')
dat_to_fetch <- c()
dat_names <- c()
if (!is.list(dat)) {
dat <- as.list(dat)
} else {
if (!any(sapply(dat, is.list))) {
dat <- list(dat)
}
}
for (i in 1:length(dat)) {
if (is.character(dat[[i]]) && length(dat[[i]]) == 1 && nchar(dat[[i]]) > 0) {
if (grepl('^(\\./|\\.\\./|/.*/|~/)', dat[[i]])) {
dat[[i]] <- list(path = dat[[i]])
} else {
dat[[i]] <- list(name = dat[[i]])
}
} else if (!is.list(dat[[i]])) {
stop(paste0("Parameter '", pattern_dim,
"' is incorrect. It must be a list of lists or character strings."))
}
#if (!(all(names(dat[[i]]) %in% dat_info_names))) {
# stop("Error: parameter 'dat' is incorrect. There are unrecognized components in the information of some of the datasets. Check 'dat' in ?Load for details.")
#}
if (!('name' %in% names(dat[[i]]))) {
dat[[i]][['name']] <- paste0('dat', i)
if (!('path' %in% names(dat[[i]]))) {
stop(paste0("Parameter '", found_pattern_dim,
"' is incorrect. A 'path' should be provided for each dataset if no 'name' is provided."))
}
} else if (!('path' %in% names(dat[[i]]))) {
dat_to_fetch <- c(dat_to_fetch, i)
}
#if ('path' %in% names(dat[[i]])) {
# if (!('nc_var_name' %in% names(dat[[i]]))) {
# dat[[i]][['nc_var_name']] <- '$var_name$'
# }
# if (!('suffix' %in% names(dat[[i]]))) {
# dat[[i]][['suffix']] <- ''
# }
# if (!('var_min' %in% names(dat[[i]]))) {
# dat[[i]][['var_min']] <- ''
# }
# if (!('var_max' %in% names(dat[[i]]))) {
# dat[[i]][['var_max']] <- ''
# }
#}
dat_names <- c(dat_names, dat[[i]][['name']])
}
if ((length(dat_to_fetch) > 0) && (length(dat_to_fetch) < length(dat))) {
.warning("'path' has been provided for some datasets. Any information in the configuration file related to these will be ignored.")
}
if (length(dat_to_fetch) > 0) {
stop("Specified only the name for some data sets, but not the path ",
"pattern. This option has not been yet implemented.")
}
# Reorder inner_dims_across_files (to make the keys be the file dimensions,
# and the values to be the inner dimensions that go across it).
if (!is.null(inner_dims_across_files)) {
# Reorder: example, convert list(ftime = 'chunk', ensemble = 'member', xx = 'chunk')
# to list(chunk = c('ftime', 'xx'), member = 'ensemble')
new_idaf <- list()
for (i in names(inner_dims_across_files)) {
if (!(inner_dims_across_files[[i]] %in% names(new_idaf))) {
new_idaf[[inner_dims_across_files[[i]]]] <- i
} else {
new_idaf[[inner_dims_across_files[[i]]]] <- c(new_idaf[[inner_dims_across_files[[i]]]], i)
}
}
inner_dims_across_files <- new_idaf
}
# Check return_vars
if (is.null(return_vars)) {
return_vars <- list()
# if (length(var_params) > 0) {
# return_vars <- as.list(paste0(names(var_params), '_var'))
# } else {
# return_vars <- list()
# }
}
if (!is.list(return_vars)) {
stop("Parameter 'return_vars' must be a list or NULL.")
}
if (length(return_vars) > 0 && is.null(names(return_vars))) {
# names(return_vars) <- rep('', length(return_vars))
stop("Parameter 'return_vars' must be a named list.")
}
i <- 1
while (i <= length(return_vars)) {
# if (names(return_vars)[i] == '') {
# if (!(is.character(return_vars[[i]]) && (length(return_vars[[i]]) == 1))) {
# stop("The ", i, "th specification in 'return_vars' is malformed.")
# }
# if (!grepl('_var$', return_vars[[i]])) {
# stop("The ", i, "th specification in 'return_vars' is malformed.")
# }
# dim_name <- strsplit(return_vars[[i]], '_var$')[[1]][1]
# if (!(dim_name %in% names(var_params))) {
# stop("'", dim_name, "_var' requested in 'return_vars' but ",
# "no '", dim_name, "_var' specified in the .Load call.")
# }
# names(return_vars)[i] <- var_params[[dim_name]]
# return_vars[[i]] <- found_pattern_dim
# } else
if (length(return_vars[[i]]) > 0) {
if (!is.character(return_vars[[i]])) {
stop("The ", i, "th specification in 'return_vars' is malformed. It ",
"must be a vector of character strings of valid file dimension ",
"names.")
}
}
i <- i + 1
}
# Check synonims
if (!is.null(synonims)) {
error <- FALSE
if (!is.list(synonims)) {
error <- TRUE
}
for (synonim_entry in names(synonims)) {
if (!(synonim_entry %in% names(dim_params)) &&
!(synonim_entry %in% names(return_vars))) {
error <- TRUE
}
if (!is.character(synonims[[synonim_entry]]) ||
length(synonims[[synonim_entry]]) < 1) {
error <- TRUE
}
}
if (error) {
stop("Parameter 'synonims' must be a named list, where the names are ",
"a name of a requested dimension or variable and the values are ",
"vectors of character strings with at least one alternative name ",
" for each dimension or variable in 'synonims'.")
}
}
if (length(unique(names(synonims))) < length(names(synonims))) {
stop("There must not be repeated entries in 'synonims'.")
}
if (length(unique(unlist(synonims))) < length(unlist(synonims))) {
stop("There must not be repeated values in 'synonims'.")
}
# Make that all dims and vars have an entry in synonims, even if only dim_name = dim_name
dim_entries_to_add <- which(!(names(dim_params) %in% names(synonims)))
if (length(dim_entries_to_add) > 0) {
synonims[names(dim_params)[dim_entries_to_add]] <- as.list(names(dim_params)[dim_entries_to_add])
}
var_entries_to_add <- which(!(names(var_params) %in% names(synonims)))
if (length(var_entries_to_add) > 0) {
synonims[names(var_params)[var_entries_to_add]] <- as.list(names(var_params)[var_entries_to_add])
}
# Check selector_checker
if (is.null(selector_checker) || !is.function(selector_checker)) {
stop("Parameter 'selector_checker' must be a function.")
}
# Check file_opener
if (is.null(file_opener) || !is.function(file_opener)) {
stop("Parameter 'file_opener' must be a function.")
}
# Check file_var_reader
if (!is.null(file_var_reader) && !is.function(file_var_reader)) {
stop("Parameter 'file_var_reader' must be a function.")
}
# Check file_dim_reader
if (!is.null(file_dim_reader) && !is.function(file_dim_reader)) {
stop("Parameter 'file_dim_reader' must be a function.")
}
# Check file_data_reader
if (is.null(file_data_reader) || !is.function(file_data_reader)) {
stop("Parameter 'file_data_reader' must be a function.")
}
# Check file_closer
if (is.null(file_closer) || !is.function(file_closer)) {
stop("Parameter 'file_closer' must be a function.")
}
# Check transform
if (!is.null(transform)) {
if (!is.function(transform)) {
stop("Parameter 'transform' must be a function.")
}
}
# Check transform_params
if (!is.null(transform_params)) {
if (!is.list(transform_params)) {
stop("Parameter 'transform_params' must be a list.")
}
if (is.null(names(transform_params))) {
stop("Parameter 'transform_params' must be a named list.")
}
}
# Check transform_vars
if (!is.null(transform_vars)) {
if (!is.character(transform_vars)) {
stop("Parameter 'transform_vars' must be a vector of character strings.")
}
}
if (any(!(transform_vars %in% names(return_vars)))) {
stop("All the variables specified in 'transform_vars' must also be specified in 'return_vars'.")
}
# Check apply_indices_after_transform
if (!is.logical(apply_indices_after_transform)) {
stop("Parameter 'apply_indices_after_transform' must be either TRUE or FALSE.")
}
aiat <- apply_indices_after_transform
# Check transform_extra_cells
if (!is.numeric(transform_extra_cells)) {
stop("Parameter 'transform_extra_cells' must be numeric.")
}
transform_extra_cells <- round(transform_extra_cells)
# Check split_multiselected_dims
if (!is.logical(split_multiselected_dims)) {
stop("Parameter 'split_multiselected_dims' must be TRUE or FALSE.")
}
# Check path_glob_permissive
if (!is.numeric(path_glob_permissive) && !is.logical(path_glob_permissive)) {
stop("Parameter 'path_glob_permissive' must be TRUE, FALSE or an integer.")
}
if (length(path_glob_permissive) != 1) {
stop("Parameter 'path_glob_permissive' must be of length 1.")
}
# Check retrieve
if (!is.logical(retrieve)) {
stop("Parameter 'retrieve' must be TRUE or FALSE.")
}
# Check num_procs
if (!is.null(num_procs)) {
if (!is.numeric(num_procs)) {
stop("Parameter 'num_procs' must be numeric.")
} else {
num_procs <- round(num_procs)
}
}
# Check silent
if (!is.logical(silent)) {
stop("Parameter 'silent' must be logical.")
}
dim_params[[found_pattern_dim]] <- dat_names
if (!silent) {
.message(paste0("Exploring files... This will take a variable amount ",
"of time depending on the issued request and the ",
"performance of the file server..."))
}
if (!is.character(debug)) {
dims_to_check <- c('time')
} else {
dims_to_check <- debug
debug <- TRUE
}
############################## READING FILE DIMS ############################
# Check that no unrecognized variables are present in the path patterns
# and also that no file dimensions are requested to THREDDs catalogs.
# And in the mean time, build all the work pieces and look for the
# first available file of each dataset.
array_of_files_to_load <- NULL
array_of_not_found_files <- NULL
indices_of_first_files_with_data <- vector('list', length(dat))
selectors_of_first_files_with_data <- vector('list', length(dat))
dataset_has_files <- rep(FALSE, length(dat))
found_file_dims <- vector('list', length(dat))
expected_inner_dims <- vector('list', length(dat))
#print("A")
for (i in 1:length(dat)) {
#print("B")
dat_selectors <- dim_params
dat_selectors[[found_pattern_dim]] <- dat_selectors[[found_pattern_dim]][i]
dim_vars <- paste0('$', dim_names, '$')
file_dims <- which(sapply(dim_vars, grepl, dat[[i]][['path']], fixed = TRUE))
if (length(file_dims) > 0) {
file_dims <- dim_names[file_dims]
}
file_dims <- unique(c(pattern_dims, file_dims))
found_file_dims[[i]] <- file_dims
expected_inner_dims[[i]] <- dim_names[which(!(dim_names %in% file_dims))]
# (Check the depending_file_dims).
if (any(c(names(depending_file_dims), unlist(depending_file_dims)) %in%
expected_inner_dims[[i]])) {
stop(paste0("The dimension dependancies specified in ",
"'depending_file_dims' can only be between file ",
"dimensions, but some inner dimensions found in ",
"dependancies for '", dat[[i]][['name']], "', which ",
"has the following file dimensions: ",
paste(paste0("'", file_dims, "'"), collapse = ', '), "."))
} else {
a <- names(depending_file_dims) %in% file_dims
b <- unlist(depending_file_dims) %in% file_dims
ab <- a & b
if (any(!ab)) {
.warning(paste0("Detected some dependancies in 'depending_file_dims' with ",
"non-existing dimension names. These will be disregarded."))
depending_file_dims <- depending_file_dims[-which(!ab)]
}
if (any(names(depending_file_dims) == unlist(depending_file_dims))) {
depending_file_dims <- depending_file_dims[-which(names(depending_file_dims) == unlist(depending_file_dims))]
}
}
# (Check the inner_dims_across_files).
if (any(!(names(inner_dims_across_files) %in% file_dims)) ||
any(!(unlist(inner_dims_across_files) %in% expected_inner_dims[[i]]))) {
stop(paste0("All relationships specified in ",
"'_across' parameters must be between a inner ",
"dimension and a file dimension. Found wrong ",
"specification for '", dat[[i]][['name']], "', which ",
"has the following file dimensions: ",
paste(paste0("'", file_dims, "'"), collapse = ', '),
", and the following inner dimensions: ",
paste(paste0("'", expected_inner_dims[[i]], "'"),
collapse = ', '), "."))
}
# (Check the return_vars).
j <- 1
while (j <= length(return_vars)) {
if (any(!(return_vars[[j]] %in% file_dims))) {
if (any(return_vars[[j]] %in% expected_inner_dims[[i]])) {
stop("Found variables in 'return_vars' requested ",
"for some inner dimensions (for dataset '",
dat[[i]][['name']], "'), but variables can only be ",
"requested for file dimensions.")
} else {
stop("Found variables in 'return_vars' requested ",
"for non-existing dimensions.")
}
}
j <- j + 1
}
# (Check the metadata_dims).
if (!is.null(metadata_dims)) {
if (any(!(metadata_dims %in% file_dims))) {
stop("All dimensions in 'metadata_dims' must be file dimensions.")
}
}
## Look for _var params that should be requested automatically.
for (dim_name in dim_names) {
if (!(dim_name %in% pattern_dims)) {
if (is.null(attr(dat_selectors[[dim_name]], 'values')) ||
is.null(attr(dat_selectors[[dim_name]], 'indices'))) {
flag <- ((dat_selectors[[dim_name]] %in% c('all', 'first', 'last')) ||
(is.numeric(unlist(dat_selectors[[dim_name]]))))
attr(dat_selectors[[dim_name]], 'values') <- !flag
attr(dat_selectors[[dim_name]], 'indices') <- flag
}
## The following code 'rewrites' var_params for all datasets. If providing different
## path pattern repositories with different file/inner dimensions, var_params might
## have to be handled for each dataset separately.
if ((attr(dat_selectors[[dim_name]], 'values') || (dim_name %in% c('var', 'variable'))) &&
!(dim_name %in% names(var_params)) && !(dim_name %in% file_dims)) {
if (dim_name %in% c('var', 'variable')) {
var_params <- c(var_params, setNames(list('var_names'), dim_name))
.warning(paste0("Found specified values for dimension '", dim_name, "' but no '",
dim_name, "_var' requested. ", '"', dim_name, "_var = '",
'var_names', "'", '"', " has been automatically added to ",
"the Start call."))
} else {
var_params <- c(var_params, setNames(list(dim_name), dim_name))
.warning(paste0("Found specified values for dimension '", dim_name, "' but no '",
dim_name, "_var' requested. ", '"', dim_name, "_var = '",
dim_name, "'", '"', " has been automatically added to ",
"the Start call."))
}
}
}
}
## (Check the *_var parameters).
if (any(!(unlist(var_params) %in% names(return_vars)))) {
vars_to_add <- which(!(unlist(var_params) %in% names(return_vars)))
new_return_vars <- vector('list', length(vars_to_add))
names(new_return_vars) <- unlist(var_params)[vars_to_add]
return_vars <- c(return_vars, new_return_vars)
.warning(paste0("All '*_var' params must associate a dimension to one of the ",
"requested variables in 'return_vars'. The following variables",
" have been added to 'return_vars': ",
paste(paste0("'", unlist(var_params), "'"), collapse = ', ')))
}
replace_values <- vector('list', length = length(file_dims))
names(replace_values) <- file_dims
# Take the first selector for all possible file dimensions
for (file_dim in file_dims) {
if (file_dim %in% names(var_params)) {
.warning(paste0("The '", file_dim, "_var' param will be ignored since '",
file_dim, "' is a file dimension (for the dataset with pattern ",
dat[[i]][['path']], ")."))
}
if (!is.list(dat_selectors[[file_dim]]) ||
(is.list(dat_selectors[[file_dim]]) &&
length(dat_selectors[[file_dim]]) == 2 &&
is.null(names(dat_selectors[[file_dim]])))) {
dat_selectors[[file_dim]] <- list(dat_selectors[[file_dim]])
}
first_class <- class(dat_selectors[[file_dim]][[1]])
first_length <- length(dat_selectors[[file_dim]][[1]])
for (j in 1:length(dat_selectors[[file_dim]])) {
sv <- selector_vector <- dat_selectors[[file_dim]][[j]]
if (!identical(first_class, class(sv)) ||
!identical(first_length, length(sv))) {
stop("All provided selectors for depending dimensions must ",
"be vectors of the same length and of the same class.")
}
if (is.character(sv) && !((length(sv) == 1) && (sv[1] %in% c('all', 'first', 'last')))) {
dat_selectors[[file_dim]][[j]] <- selector_checker(selectors = sv,
return_indices = FALSE)
# Take chunk if needed
dat_selectors[[file_dim]][[j]] <- dat_selectors[[file_dim]][[j]][chunk_indices(length(dat_selectors[[file_dim]][[j]]),
chunks[[file_dim]]['chunk'],
chunks[[file_dim]]['n_chunks'],
file_dim)]
} else if (!(is.numeric(sv) ||
(is.character(sv) && (length(sv) == 1) && (sv %in% c('all', 'first', 'last'))) ||
(is.list(sv) && (length(sv) == 2) && (all(sapply(sv, is.character)) ||
all(sapply(sv, is.numeric)))))) {
stop("All explicitly provided selectors for file dimensions must be character strings.")
}
}
sv <- dat_selectors[[file_dim]][[1]]
if (is.character(sv) && !((length(sv) == 1) && (sv[1] %in% c('all', 'first', 'last')))) {
replace_values[[file_dim]] <- dat_selectors[[file_dim]][[1]][1]
}
}
#print("C")
# Now we know which dimensions whose selectors are provided non-explicitly.
undefined_file_dims <- file_dims[which(sapply(replace_values, is.null))]
defined_file_dims <- file_dims[which(!(file_dims %in% undefined_file_dims))]
# Quickly check if the depending dimensions are provided properly.
for (file_dim in file_dims) {
if (file_dim %in% names(depending_file_dims)) {
## TODO: Detect multi-dependancies and forbid.
if (all(c(file_dim, depending_file_dims[[file_dim]]) %in% defined_file_dims)) {
if (length(dat_selectors[[file_dim]]) != length(dat_selectors[[depending_file_dims[[file_dim]]]][[1]])) {
stop(paste0("If providing selectors for the depending ",
"dimension '", file_dim, "', a ",
"vector of selectors must be provided for ",
"each selector of the dimension it depends on, '",
depending_file_dims[[file_dim]], "'."))
} else if (!all(names(dat_selectors[[file_dim]]) == dat_selectors[[depending_file_dims[[file_dim]]]][[1]])) {
stop(paste0("If providing selectors for the depending ",
"dimension '", file_dim, "', the name of the ",
"provided vectors of selectors must match ",
"exactly the selectors of the dimension it ",
"depends on, '", depending_file_dims[[file_dim]], "'."))
}
}
}
}
# Find the possible values for the selectors that are provided as
# indices. If the requested file is on server, impossible operation.
if (length(grep("^http", dat[[i]][['path']])) > 0) {
if (length(undefined_file_dims) > 0) {
stop(paste0("All selectors for the file dimensions must be ",
"character strings if requesting data to a remote ",
"server. Found invalid selectors for the file dimensions ",
paste(paste0("'", undefined_file_dims, "'"), collapse = ', '), "."))
}
dataset_has_files[i] <- TRUE
} else {
dat[[i]][['path']] <- path.expand(dat[[i]][['path']])
# Iterate over the known dimensions to find the first existing file.
# The path to the first existing file will be used to find the
# values for the non explicitly defined selectors.
first_file <- NULL
first_file_selectors <- NULL
if (length(undefined_file_dims) > 0) {
replace_values[undefined_file_dims] <- '*'
}
## TODO: What if length of defined_file_dims is 0? code might crash (in practice it worked for an example case)
files_to_check <- sapply(dat_selectors[defined_file_dims], function(x) length(x[[1]]))
sub_array_of_files_to_check <- array(1:prod(files_to_check), dim = files_to_check)
j <- 1
#print("D")
while (j <= prod(files_to_check) && is.null(first_file)) {
selector_indices <- which(sub_array_of_files_to_check == j, arr.ind = TRUE)[1, ]
selectors <- sapply(1:length(defined_file_dims),
function (x) {
vector_to_pick <- 1
if (defined_file_dims[x] %in% names(depending_file_dims)) {
vector_to_pick <- selector_indices[which(defined_file_dims == depending_file_dims[[defined_file_dims[x]]])]
}
dat_selectors[defined_file_dims][[x]][[vector_to_pick]][selector_indices[x]]
})
replace_values[defined_file_dims] <- selectors
file_path <- .ReplaceVariablesInString(dat[[i]][['path']], replace_values)
file_path <- Sys.glob(file_path)
if (length(file_path) > 0) {
first_file <- file_path[1]
first_file_selectors <- selectors
}
j <- j + 1
}
#print("E")
# Start looking for values for the non-explicitly defined selectors.
if (is.null(first_file)) {
.warning(paste0("No found files for the datset '", dat[[i]][['name']],
"'. Provide existing selectors for the file dimensions ",
" or check and correct its path pattern: ", dat[[i]][['path']]))
} else {
dataset_has_files[i] <- TRUE
## TODO: Improve message here if no variable found:
if (length(undefined_file_dims) > 0) {
# Looking for the first values, parsed from first_file.
first_values <- vector('list', length = length(undefined_file_dims))
names(first_values) <- undefined_file_dims
found_values <- 0
stop <- FALSE
try_dim <- 1
last_success <- 1
while ((found_values < length(undefined_file_dims)) && !stop) {
u_file_dim <- undefined_file_dims[try_dim]
if (is.null(first_values[[u_file_dim]])) {
path_with_globs_and_tag <- .ReplaceVariablesInString(dat[[i]][['path']],
replace_values[-which(file_dims == u_file_dim)],
allow_undefined_key_vars = TRUE)
found_value <- .FindTagValue(path_with_globs_and_tag,
first_file, u_file_dim)
if (!is.null(found_value)) {
found_values <- found_values + 1
last_success <- try_dim
first_values[[u_file_dim]] <- found_value
replace_values[[u_file_dim]] <- found_value
}
}
try_dim <- (try_dim %% length(undefined_file_dims)) + 1
if (try_dim == last_success) {
stop <- TRUE
}
}
if (found_values < length(undefined_file_dims)) {
stop(paste0("Path pattern of dataset '", dat[[i]][['name']],
"' is too complex. Could not automatically ",
"detect values for all non-explicitly defined ",
"indices. Check its pattern: ", dat[[i]][['path']]))
}
## TODO: Replace ReplaceGlobExpressions by looped call to FindTagValue? As done above
## Maybe it can solve more cases actually. I got warnings in ReplGlobExp with a typical
## cmor case, requesting all members and chunks for fixed var and sdate. Not fixing
## sdate raised 'too complex' error.
# Replace shell globs in path pattern and keep the file_dims as tags
dat[[i]][['path']] <- .ReplaceGlobExpressions(dat[[i]][['path']], first_file, replace_values,
file_dims, dat[[i]][['name']], path_glob_permissive)
# Now time to look for the available values for the non
# explicitly defined selectors for the file dimensions.
#print("H")
# Check first the ones that do not depend on others.
ufd <- c(undefined_file_dims[which(!(undefined_file_dims %in% names(depending_file_dims)))],
undefined_file_dims[which(undefined_file_dims %in% names(depending_file_dims))])
for (u_file_dim in ufd) {
replace_values[undefined_file_dims] <- first_values
replace_values[[u_file_dim]] <- '*'
depended_dim <- NULL
depended_dim_values <- NA
selectors <- dat_selectors[[u_file_dim]][[1]]
if (u_file_dim %in% names(depending_file_dims)) {
depended_dim <- depending_file_dims[[u_file_dim]]
depended_dim_values <- dat_selectors[[depended_dim]][[1]]
dat_selectors[[u_file_dim]] <- vector('list', length = length(depended_dim_values))
names(dat_selectors[[u_file_dim]]) <- depended_dim_values
} else {
dat_selectors[[u_file_dim]] <- list()
}
if (u_file_dim %in% unlist(depending_file_dims)) {
depending_dims <- names(depending_file_dims)[which(sapply(depending_file_dims, function(x) u_file_dim %in% x))]
replace_values[depending_dims] <- rep('*', length(depending_dims))
}
for (j in 1:length(depended_dim_values)) {
parsed_values <- c()
if (!is.null(depended_dim)) {
replace_values[[depended_dim]] <- depended_dim_values[j]
}
path_with_globs <- .ReplaceVariablesInString(dat[[i]][['path']], replace_values)
found_files <- Sys.glob(path_with_globs)
## TODO: Enhance this error message, or change by warning.
## Raises if a wrong sdate is specified, for example.
if (length(found_files) == 0) {
.warning(paste0("Could not find files for any '", u_file_dim,
"' for '", depended_dim, "' = '",
depended_dim_values[j], "'."))
dat_selectors[[u_file_dim]][[j]] <- NA
} else {
for (found_file in found_files) {
path_with_globs_and_tag <- .ReplaceVariablesInString(dat[[i]][['path']],
replace_values[-which(file_dims == u_file_dim)],
allow_undefined_key_vars = TRUE)
parsed_values <- c(parsed_values,
.FindTagValue(path_with_globs_and_tag, found_file,
u_file_dim))
}
dat_selectors[[u_file_dim]][[j]] <- selector_checker(selectors = selectors,
var = unique(parsed_values),
return_indices = FALSE)
# Take chunk if needed
dat_selectors[[u_file_dim]][[j]] <- dat_selectors[[u_file_dim]][[j]][chunk_indices(length(dat_selectors[[u_file_dim]][[j]]),
chunks[[u_file_dim]]['chunk'],
chunks[[u_file_dim]]['n_chunks'],
u_file_dim)]
}
}
}
#print("I")
} else {
dat[[i]][['path']] <- .ReplaceGlobExpressions(dat[[i]][['path']], first_file, replace_values,
defined_file_dims, dat[[i]][['name']], path_glob_permissive)
}
}
}
# Now fetch for the first available file
if (dataset_has_files[i]) {
known_dims <- file_dims
} else {
known_dims <- defined_file_dims
}
replace_values <- vector('list', length = length(known_dims))
names(replace_values) <- known_dims
files_to_load <- sapply(dat_selectors[known_dims], function(x) length(x[[1]]))
files_to_load[found_pattern_dim] <- 1
sub_array_of_files_to_load <- array(1:prod(files_to_load),
dim = files_to_load)
names(dim(sub_array_of_files_to_load)) <- known_dims
sub_array_of_not_found_files <- array(!dataset_has_files[i],
dim = files_to_load)
names(dim(sub_array_of_not_found_files)) <- known_dims
j <- 1
while (j <= prod(files_to_load)) {
selector_indices <- which(sub_array_of_files_to_load == j, arr.ind = TRUE)[1, ]
names(selector_indices) <- known_dims
selectors <- sapply(1:length(known_dims),
function (x) {
vector_to_pick <- 1
if (known_dims[x] %in% names(depending_file_dims)) {
vector_to_pick <- selector_indices[which(known_dims == depending_file_dims[[known_dims[x]]])]
}
dat_selectors[known_dims][[x]][[vector_to_pick]][selector_indices[x]]
})
names(selectors) <- known_dims
replace_values[known_dims] <- selectors
if (!dataset_has_files[i]) {
if (any(is.na(selectors))) {
replace_values <- replace_values[-which(names(replace_values) %in% names(selectors[which(is.na(selectors))]))]
}
file_path <- .ReplaceVariablesInString(dat[[i]][['path']], replace_values, TRUE)
sub_array_of_files_to_load[j] <- file_path
#sub_array_of_not_found_files[j] <- TRUE???
} else {
if (any(is.na(selectors))) {
replace_values <- replace_values[-which(names(replace_values) %in% names(selectors[which(is.na(selectors))]))]
file_path <- .ReplaceVariablesInString(dat[[i]][['path']], replace_values, TRUE)
sub_array_of_files_to_load[j] <- file_path
sub_array_of_not_found_files[j] <- TRUE
} else {
file_path <- .ReplaceVariablesInString(dat[[i]][['path']], replace_values)
if (!(length(grep("^http", file_path)) > 0)) {
if (grepl(file_path, '*', fixed = TRUE)) {
file_path_full <- Sys.glob(file_path)[1]
if (nchar(file_path_full) > 0) {
file_path <- file_path_full
}
}
}
sub_array_of_files_to_load[j] <- file_path
if (is.null(indices_of_first_files_with_data[[i]])) {
if (!(length(grep("^http", file_path)) > 0)) {
if (!file.exists(file_path)) {
file_path <- NULL
}
}
if (!is.null(file_path)) {
test_file <- NULL
## TODO: suppress error messages
test_file <- file_opener(file_path)
if (!is.null(test_file)) {
selector_indices[which(known_dims == found_pattern_dim)] <- i
indices_of_first_files_with_data[[i]] <- selector_indices
selectors_of_first_files_with_data[[i]] <- selectors
file_closer(test_file)
}
}
}
}
}
j <- j + 1
}
# Extend array as needed progressively
if (is.null(array_of_files_to_load)) {
array_of_files_to_load <- sub_array_of_files_to_load
array_of_not_found_files <- sub_array_of_not_found_files
} else {
array_of_files_to_load <- .MergeArrays(array_of_files_to_load, sub_array_of_files_to_load,
along = found_pattern_dim)
## TODO: file_dims, and variables like that.. are still ok now? I don't think so
array_of_not_found_files <- .MergeArrays(array_of_not_found_files, sub_array_of_not_found_files,
along = found_pattern_dim)
}
dat[[i]][['selectors']] <- dat_selectors
}
# Abort early if not a single existing data file was found for any dataset.
if (all(sapply(indices_of_first_files_with_data, is.null))) {
  stop("No data files found for any of the specified datasets.")
}

########################### READING INNER DIMS. #############################
#print("J")
## TODO: To be run in parallel (local multi-core)
# Now time to work out the inner file dimensions.
# First pick the requested variables.

# Collect the union of all file dimensions any requested return variable
# depends on.
dims_to_iterate <- NULL
for (return_var in names(return_vars)) {
  dims_to_iterate <- unique(c(dims_to_iterate, return_vars[[return_var]]))
}
# The pattern (dataset) dimension is handled separately; drop it here.
if (found_pattern_dim %in% dims_to_iterate) {
  dims_to_iterate <- dims_to_iterate[-which(dims_to_iterate == found_pattern_dim)]
}
# Split 'return_vars' into variables common to all datasets (those whose
# dependencies do not include the pattern dimension) and dataset-specific
# ones (which remain in 'return_vars').
common_return_vars <- NULL
common_first_found_file <- NULL
common_return_vars_pos <- NULL
if (length(return_vars) > 0) {
  common_return_vars_pos <- which(sapply(return_vars, function(x) !(found_pattern_dim %in% x)))
}
if (length(common_return_vars_pos) > 0) {
  common_return_vars <- return_vars[common_return_vars_pos]
  return_vars <- return_vars[-common_return_vars_pos]
  # For each common variable with no associated file dimensions, track
  # whether it has already been read from a first found file.
  common_first_found_file <- rep(FALSE, length(which(sapply(common_return_vars, length) == 0)))
  names(common_first_found_file) <- names(common_return_vars[which(sapply(common_return_vars, length) == 0)])
}
# Strip the pattern dimension from the dependencies of the remaining
# (dataset-specific) return variables.
return_vars <- lapply(return_vars,
                      function(x) {
                        if (found_pattern_dim %in% x) {
                          x[-which(x == found_pattern_dim)]
                        } else {
                          x
                        }
                      })
# Preallocate the containers for the variable values to be picked from the
# files, plus their reordered versions and the indices to undo the reorder.
if (length(common_return_vars) > 0) {
  picked_common_vars <- vector('list', length = length(common_return_vars))
  names(picked_common_vars) <- names(common_return_vars)
} else {
  picked_common_vars <- NULL
}
picked_common_vars_ordered <- picked_common_vars
picked_common_vars_unorder_indices <- picked_common_vars
picked_vars <- vector('list', length = length(dat))
names(picked_vars) <- dat_names
picked_vars_ordered <- picked_vars
picked_vars_unorder_indices <- picked_vars
for (i in 1:length(dat)) {
if (dataset_has_files[i]) {
# Put all selectors in a list of a single list/vector of selectors.
# The dimensions that go across files will later be extended to have
# lists of lists/vectors of selectors.
for (inner_dim in expected_inner_dims[[i]]) {
if (!is.list(dat[[i]][['selectors']][[inner_dim]]) ||
(is.list(dat[[i]][['selectors']][[inner_dim]]) &&
length(dat[[i]][['selectors']][[inner_dim]]) == 2 &&
is.null(names(dat[[i]][['selectors']][[inner_dim]])))) {
dat[[i]][['selectors']][[inner_dim]] <- list(dat[[i]][['selectors']][[inner_dim]])
}
}
if (length(return_vars) > 0) {
picked_vars[[i]] <- vector('list', length = length(return_vars))
names(picked_vars[[i]]) <- names(return_vars)
picked_vars_ordered[[i]] <- picked_vars[[i]]
picked_vars_unorder_indices[[i]] <- picked_vars[[i]]
}
indices_of_first_file <- as.list(indices_of_first_files_with_data[[i]])
array_file_dims <- sapply(dat[[i]][['selectors']][found_file_dims[[i]]], function(x) length(x[[1]]))
names(array_file_dims) <- found_file_dims[[i]]
if (length(dims_to_iterate) > 0) {
indices_of_first_file[dims_to_iterate] <- lapply(array_file_dims[dims_to_iterate], function(x) 1:x)
}
array_of_var_files <- do.call('[', c(list(x = array_of_files_to_load), indices_of_first_file, list(drop = FALSE)))
array_of_var_indices <- array(1:length(array_of_var_files), dim = dim(array_of_var_files))
array_of_not_found_var_files <- do.call('[', c(list(x = array_of_not_found_files), indices_of_first_file, list(drop = FALSE)))
previous_indices <- rep(-1, length(indices_of_first_file))
names(previous_indices) <- names(indices_of_first_file)
first_found_file <- NULL
if (length(return_vars) > 0) {
first_found_file <- rep(FALSE, length(which(sapply(return_vars, length) == 0)))
names(first_found_file) <- names(return_vars[which(sapply(return_vars, length) == 0)])
}
for (j in 1:length(array_of_var_files)) {
current_indices <- which(array_of_var_indices == j, arr.ind = TRUE)[1, ]
names(current_indices) <- names(indices_of_first_file)
if (!is.na(array_of_var_files[j]) && !array_of_not_found_var_files[j]) {
changed_dims <- which(current_indices != previous_indices)
vars_to_read <- NULL
if (length(return_vars) > 0) {
vars_to_read <- names(return_vars)[sapply(return_vars, function(x) any(names(changed_dims) %in% x))]
}
if (!is.null(first_found_file)) {
if (any(!first_found_file)) {
vars_to_read <- c(vars_to_read, names(first_found_file[which(!first_found_file)]))
}
}
if ((i == 1) && (length(common_return_vars) > 0)) {
vars_to_read <- c(vars_to_read, names(common_return_vars)[sapply(common_return_vars, function(x) any(names(changed_dims) %in% x))])
}
if (!is.null(common_first_found_file)) {
if (any(!common_first_found_file)) {
vars_to_read <- c(vars_to_read, names(common_first_found_file[which(!common_first_found_file)]))
}
}
file_object <- file_opener(array_of_var_files[j])
if (!is.null(file_object)) {
for (var_to_read in vars_to_read) {
if (var_to_read %in% unlist(var_params)) {
associated_dim_name <- names(var_params)[which(unlist(var_params) == var_to_read)]
}
var_name_to_reader <- var_to_read
names(var_name_to_reader) <- 'var'
var_dims <- file_dim_reader(NULL, file_object, var_name_to_reader, NULL,
synonims)
# file_dim_reader returns dimension names as found in the file.
# Need to translate according to 'synonims':
names(var_dims) <- sapply(names(var_dims),
function(x) {
which_entry <- which(sapply(synonims, function(y) x %in% y))
if (length(which_entry) > 0) {
names(synonims)[which_entry]
} else {
x
}
})
if (!is.null(var_dims)) {
var_file_dims <- NULL
if (var_to_read %in% names(common_return_vars)) {
var_to_check <- common_return_vars[[var_to_read]]
} else {
var_to_check <- return_vars[[var_to_read]]
}
if (any(names(dim(array_of_files_to_load)) %in% var_to_check)) {
var_file_dims <- dim(array_of_files_to_load)[which(names(dim(array_of_files_to_load)) %in%
var_to_check)]
}
if (((var_to_read %in% names(common_return_vars)) &&
is.null(picked_common_vars[[var_to_read]])) ||
((var_to_read %in% names(return_vars)) &&
is.null(picked_vars[[i]][[var_to_read]]))) {
if (any(names(var_file_dims) %in% names(var_dims))) {
stop("Found a requested var in 'return_var' requested for a ",
"file dimension which also appears in the dimensions of ",
"the variable inside the file.\n", array_of_var_files[j])
}
special_types <- list('POSIXct' = as.POSIXct, 'POSIXlt' = as.POSIXlt,
'Date' = as.Date)
first_sample <- file_var_reader(NULL, file_object, NULL,
var_to_read, synonims)
if (any(class(first_sample) %in% names(special_types))) {
array_size <- prod(c(var_file_dims, var_dims))
new_array <- rep(special_types[[class(first_sample)[1]]](NA), array_size)
dim(new_array) <- c(var_file_dims, var_dims)
} else {
new_array <- array(dim = c(var_file_dims, var_dims))
}
attr(new_array, 'variables') <- attr(first_sample, 'variables')
if (var_to_read %in% names(common_return_vars)) {
picked_common_vars[[var_to_read]] <- new_array
pick_ordered <- FALSE
if (var_to_read %in% unlist(var_params)) {
if (associated_dim_name %in% names(dim_reorder_param) && !aiat) {
picked_common_vars_ordered[[var_to_read]] <- new_array
pick_ordered <- TRUE
}
}
if (!pick_ordered) {
picked_common_vars_ordered[[var_to_read]] <- NULL
}
} else {
picked_vars[[i]][[var_to_read]] <- new_array
pick_ordered <- FALSE
if (var_to_read %in% unlist(var_params)) {
if (associated_dim_name %in% names(dim_reorder_params) && !aiat) {
picked_vars_ordered[[i]][[var_to_read]] <- new_array
pick_ordered <- TRUE
}
}
if (!pick_ordered) {
picked_vars_ordered[[i]][[var_to_read]] <- NULL
}
}
} else {
if (var_to_read %in% names(common_return_vars)) {
array_var_dims <- dim(picked_common_vars[[var_to_read]])
} else {
array_var_dims <- dim(picked_vars[[i]][[var_to_read]])
}
full_array_var_dims <- array_var_dims
if (any(names(array_var_dims) %in% names(var_file_dims))) {
array_var_dims <- array_var_dims[-which(names(array_var_dims) %in% names(var_file_dims))]
}
if (names(array_var_dims) != names(var_dims)) {
stop("Error while reading the variable '", var_to_read, "' from ",
"the file. Dimensions do not match.\nExpected ",
paste(paste0("'", names(array_var_dims), "'"),
collapse = ', '), " but found ",
paste(paste0("'", names(var_dims), "'"),
collapse = ', '), ".\n", array_of_var_files[j])
}
if (any(var_dims > array_var_dims)) {
longer_dims <- which(var_dims > array_var_dims)
if (length(longer_dims) == 1) {
longer_dims_in_full_array <- longer_dims
if (any(names(full_array_var_dims) %in% names(var_file_dims))) {
candidates <- (1:length(full_array_var_dims))[-which(names(full_array_var_dims) %in% names(var_file_dims))]
longer_dims_in_full_array <- candidates[longer_dims]
}
padding_dims <- full_array_var_dims
padding_dims[longer_dims_in_full_array] <- var_dims[longer_dims] -
array_var_dims[longer_dims]
special_types <- list('POSIXct' = as.POSIXct, 'POSIXlt' = as.POSIXlt,
'Date' = as.Date)
if (var_to_read %in% names(common_return_vars)) {
var_class <- class(picked_common_vars[[var_to_read]])
} else {
var_class <- class(picked_vars[[i]][[var_to_read]])
}
if (any(var_class %in% names(special_types))) {
padding_size <- prod(padding_dims)
padding <- rep(special_types[[var_class[1]]](NA), padding_size)
dim(padding) <- padding_dims
} else {
padding <- array(dim = padding_dims)
}
if (var_to_read %in% names(common_return_vars)) {
picked_common_vars[[var_to_read]] <- .abind2(
picked_common_vars[[var_to_read]],
padding,
names(full_array_var_dims)[longer_dims_in_full_array]
)
} else {
picked_vars[[i]][[var_to_read]] <- .abind2(
picked_vars[[i]][[var_to_read]],
padding,
names(full_array_var_dims)[longer_dims_in_full_array]
)
}
} else {
stop("Error while reading the variable '", var_to_read, "' from ",
"the file. Found size (", paste(var_dims, collapse = ' x '),
") is greater than expected maximum size (",
array_var_dims, ").")
}
}
}
var_store_indices <- c(as.list(current_indices[names(var_file_dims)]), lapply(var_dims, function(x) 1:x))
var_values <- file_var_reader(NULL, file_object, NULL, var_to_read, synonims)
if (var_to_read %in% unlist(var_params)) {
if ((associated_dim_name %in% names(dim_reorder_params)) && !aiat) {
## Is this check really needed?
if (length(dim(var_values)) > 1) {
stop("Requested a '", associated_dim_name, "_reorder' for a dimension ",
"whose coordinate variable that has more than 1 dimension. This is ",
"not supported.")
}
ordered_var_values <- dim_reorder_params[[associated_dim_name]](var_values)
attr(ordered_var_values$x, 'variables') <- attr(var_values, 'variables')
if (!all(c('x', 'ix') %in% names(ordered_var_values))) {
stop("All the dimension reorder functions must return a list with the components 'x' and 'ix'.")
}
# Save the indices to reorder back the ordered variable values.
# This will be used to define the first round indices.
unorder <- sort(ordered_var_values$ix, index.return = TRUE)$ix
if (var_to_read %in% names(common_return_vars)) {
picked_common_vars_ordered[[var_to_read]] <- do.call('[<-',
c(list(x = picked_common_vars_ordered[[var_to_read]]),
var_store_indices,
list(value = ordered_var_values$x)))
picked_common_vars_unorder_indices[[var_to_read]] <- do.call('[<-',
c(list(x = picked_common_vars_unorder_indices[[var_to_read]]),
var_store_indices,
list(value = unorder)))
} else {
picked_vars_ordered[[i]][[var_to_read]] <- do.call('[<-',
c(list(x = picked_vars_ordered[[i]][[var_to_read]]),
var_store_indices,
list(value = ordered_var_values$x)))
picked_vars_unorder_indices[[i]][[var_to_read]] <- do.call('[<-',
c(list(x = picked_vars_unorder_indices[[i]][[var_to_read]]),
var_store_indices,
list(value = unorder)))
}
}
}
if (var_to_read %in% names(common_return_vars)) {
picked_common_vars[[var_to_read]] <- do.call('[<-',
c(list(x = picked_common_vars[[var_to_read]]),
var_store_indices,
list(value = var_values)))
} else {
picked_vars[[i]][[var_to_read]] <- do.call('[<-',
c(list(x = picked_vars[[i]][[var_to_read]]),
var_store_indices,
list(value = var_values)))
}
if (var_to_read %in% names(first_found_file)) {
first_found_file[var_to_read] <- TRUE
}
if (var_to_read %in% names(common_first_found_file)) {
common_first_found_file[var_to_read] <- TRUE
}
} else {
stop("Could not find variable '", var_to_read,
"' in the file ", array_of_var_files[j])
}
}
file_closer(file_object)
}
}
previous_indices <- current_indices
}
}
}
# Once we have the variable values, we can work out the indices
# for the implicitly defined selectors.
#
# Transforms a vector of indices v expressed in a world of
# length N from 1 to N, into a world of length M, from
# 1 to M. Repeated adjacent indices are collapsed.
# Map indices 'v' from a coordinate space of length 'n' onto one of length
# 'm', scaling linearly between the first and last positions. Runs of
# adjacent repeated mapped indices are collapsed to a single occurrence.
# NOTE(review): the rounding may yield 0 for some inputs (known TODO in the
# original); behavior is preserved here unchanged.
transform_indices <- function(v, n, m) {
  # Drop repeated adjacent values, e.g. 1 1 2 2 2 3 3 1 1 1 -> 1 2 3 1.
  collapse_adjacent <- function(w) {
    if (length(w) < 2) {
      return(w)
    }
    w[c(TRUE, diff(w) != 0)]
  }
  scaled <- round(((v - 1) / (n - 1)) * (m - 1))
  collapse_adjacent(scaled) + 1
}
# 'beta' is a short alias for 'transform_extra_cells' (presumably the number
# of extra border cells to take around the transform region -- TODO confirm
# against the Start() parameter documentation).
beta <- transform_extra_cells
# Dimensions whose selections will need cropping after the transformation.
dims_to_crop <- vector('list')
# Containers for the transformed variables (per dataset and common to all
# datasets), their reordered versions and the indices to undo the reorder.
transformed_vars <- vector('list', length = length(dat))
names(transformed_vars) <- dat_names
transformed_vars_ordered <- transformed_vars
transformed_vars_unorder_indices <- transformed_vars
transformed_common_vars <- NULL
transformed_common_vars_ordered <- NULL
transformed_common_vars_unorder_indices <- NULL
for (i in 1:length(dat)) {
if (dataset_has_files[i]) {
indices <- indices_of_first_files_with_data[[i]]
if (!is.null(indices)) {
file_path <- do.call("[", c(list(array_of_files_to_load), as.list(indices_of_first_files_with_data[[i]])))
# The following 5 lines should go several lines below, but were moved
# here for better performance.
# If any of the dimensions comes without defining variable, then we read
# the data dimensions.
data_dims <- NULL
if (length(unlist(var_params[expected_inner_dims[[i]]])) < length(expected_inner_dims[[i]])) {
file_to_open <- file_path
data_dims <- file_dim_reader(file_to_open, NULL, selectors_of_first_files_with_data[[i]],
lapply(dat[[i]][['selectors']][expected_inner_dims[[i]]], '[[', 1),
synonims)
# file_dim_reader returns dimension names as found in the file.
# Need to translate according to 'synonims':
names(data_dims) <- sapply(names(data_dims),
function(x) {
which_entry <- which(sapply(synonims, function(y) x %in% y))
if (length(which_entry) > 0) {
names(synonims)[which_entry]
} else {
x
}
})
}
# Transform the variables if needed and keep them apart.
if (!is.null(transform) && (length(transform_vars) > 0)) {
if (!all(transform_vars %in% c(names(picked_vars[[i]]), names(picked_common_vars)))) {
stop("Could not find all the required variables in 'transform_vars' ",
"for the dataset '", dat[[i]][['name']], "'.")
}
vars_to_transform <- NULL
picked_vars_to_transform <- which(names(picked_vars[[i]]) %in% transform_vars)
if (length(picked_vars_to_transform) > 0) {
picked_vars_to_transform <- names(picked_vars[[i]])[picked_vars_to_transform]
new_vars_to_transform <- picked_vars[[i]][picked_vars_to_transform]
which_are_ordered <- which(!sapply(picked_vars_ordered[[i]][picked_vars_to_transform], is.null))
if (length(which_are_ordered) > 0) {
new_vars_to_transform[which_are_ordered] <- picked_vars_ordered[[i]][which_are_ordered]
}
vars_to_transform <- c(vars_to_transform, new_vars_to_transform)
}
picked_common_vars_to_transform <- which(names(picked_common_vars) %in% transform_vars)
if (length(picked_common_vars_to_transform) > 0) {
picked_common_vars_to_transform <- names(picked_common_vars)[picked_common_vars_to_transform]
new_vars_to_transform <- picked_common_vars[[i]][picked_common_vars_to_transform]
which_are_ordered <- which(!sapply(picked_common_vars_ordered[[i]][picked_common_vars_to_transform], is.null))
if (length(which_are_ordered) > 0) {
new_vars_to_transform[which_are_ordered] <- picked_common_vars_ordered[[i]][which_are_ordered]
}
vars_to_transform <- c(vars_to_transform, new_vars_to_transform)
}
# Transform the variables
transformed_data <- do.call(transform, c(list(data_array = NULL,
variables = vars_to_transform,
file_selectors = selectors_of_first_files_with_data[[i]]),
transform_params))
# Discard the common transformed variables if already transformed before
if (!is.null(transformed_common_vars)) {
common_ones <- which(names(picked_common_vars) %in% names(transformed_data$variables))
if (length(common_ones) > 0) {
transformed_data$variables <- transformed_data$variables[-common_ones]
}
}
transformed_vars[[i]] <- list()
transformed_vars_ordered[[i]] <- list()
transformed_vars_unorder_indices[[i]] <- list()
# Order the transformed variables if needed
# 'var_to_read' should be 'transformed_var', but is kept to reuse the same code as above.
for (var_to_read in names(transformed_data$variables)) {
if (var_to_read %in% unlist(var_params)) {
associated_dim_name <- names(var_params)[which(unlist(var_params) == var_to_read)]
if ((associated_dim_name %in% names(dim_reorder_params)) && aiat) {
## Is this check really needed?
if (length(dim(transformed_data$variables[[associated_dim_name]])) > 1) {
stop("Requested a '", associated_dim_name, "_reorder' for a dimension ",
"whose coordinate variable that has more than 1 dimension (after ",
"transform). This is not supported.")
}
ordered_var_values <- dim_reorder_params[[associated_dim_name]](transformed_data$variables[[associated_dim_name]])
attr(ordered_var_values, 'variables') <- attr(transformed_data$variables[[associated_dim_name]], 'variables')
if (!all(c('x', 'ix') %in% names(ordered_var_values))) {
stop("All the dimension reorder functions must return a list with the components 'x' and 'ix'.")
}
# Save the indices to reorder back the ordered variable values.
# This will be used to define the first round indices.
unorder <- sort(ordered_var_values$ix, index.return = TRUE)$ix
if (var_to_read %in% names(picked_common_vars)) {
transformed_common_vars_ordered[[var_to_read]] <- ordered_var_values$x
transformed_common_vars_unorder_indices[[var_to_read]] <- unorder
} else {
transformed_vars_ordered[[i]][[var_to_read]] <- ordered_var_values$x
transformed_vars_unorder_indices[[i]][[var_to_read]] <- unorder
}
}
}
}
transformed_picked_vars <- which(names(picked_vars[[i]]) %in% names(transformed_data$variables))
if (length(transformed_picked_vars) > 0) {
transformed_picked_vars <- names(picked_vars[[i]])[transformed_picked_vars]
transformed_vars[[i]][transformed_picked_vars] <- transformed_data$variables[transformed_picked_vars]
}
if (is.null(transformed_common_vars)) {
transformed_picked_common_vars <- which(names(picked_common_vars) %in% names(transformed_data$variables))
if (length(transformed_picked_common_vars) > 0) {
transformed_picked_common_vars <- names(picked_common_vars)[transformed_picked_common_vars]
transformed_common_vars <- transformed_data$variables[transformed_picked_common_vars]
}
}
}
# Once the variables are transformed, we compute the indices to be
# taken for each inner dimension.
# In all cases, indices will have to be computed to know which data
# values to take from the original data for each dimension (if a
# variable is specified for that dimension, it will be used to
# convert the provided selectors into indices). These indices are
# referred to as 'first round of indices'.
# The taken data will then be transformed if needed, together with
# the dimension variable if specified, and, in that case, indices
# will have to be computed again to know which values to take from the
# transformed data. These are the 'second round of indices'. In the
# case there is no transformation, the second round of indices will
# be all the available indices, i.e. from 1 to the number of taken
# values with the first round of indices.
for (inner_dim in expected_inner_dims[[i]]) {
if (debug) {
print("-> DEFINING INDICES FOR INNER DIMENSION:")
print(inner_dim)
}
file_dim <- NULL
if (inner_dim %in% unlist(inner_dims_across_files)) {
file_dim <- names(inner_dims_across_files)[which(sapply(inner_dims_across_files, function(x) inner_dim %in% x))[1]]
chunk_amount <- length(dat[[i]][['selectors']][[file_dim]][[1]])
names(chunk_amount) <- file_dim
} else {
chunk_amount <- 1
}
# In the special case that the selectors for a dimension are 'all', 'first', ...
# and chunking (dividing in more than 1 chunk) is requested, the selectors are
# replaced for equivalent indices.
if ((dat[[i]][['selectors']][[inner_dim]][[1]] %in% c('all', 'first', 'last')) &&
(chunks[[inner_dim]]['n_chunks'] != 1)) {
selectors <- dat[[i]][['selectors']][[inner_dim]][[1]]
if (selectors == 'all') {
selectors <- indices(1:(data_dims[[inner_dim]] * chunk_amount))
} else if (selectors == 'first') {
selectors <- indices(1)
} else {
selectors <- indices(data_dims[[inner_dim]] * chunk_amount)
}
dat[[i]][['selectors']][[inner_dim]][[1]] <- selectors
}
# The selectors for the inner dimension are taken.
selector_array <- dat[[i]][['selectors']][[inner_dim]][[1]]
if (debug) {
if (inner_dim %in% dims_to_check) {
print(paste0("-> DEBUG MESSAGES FOR THE DATASET", i, " AND INNER DIMENSION '", inner_dim, "':"))
print("-> STRUCTURE OF SELECTOR ARRAY:")
print(str(selector_array))
print("-> PICKED VARS:")
print(picked_vars)
print("-> TRANSFORMED VARS:")
print(transformed_vars)
}
}
if (is.null(dim(selector_array))) {
dim(selector_array) <- length(selector_array)
}
if (is.null(names(dim(selector_array)))) {
if (length(dim(selector_array)) == 1) {
names(dim(selector_array)) <- inner_dim
} else {
stop("Provided selector arrays must be provided with dimension ",
"names. Found an array of selectors without dimension names ",
"for the dimension '", inner_dim, "'.")
}
}
selectors_are_indices <- FALSE
if (!is.null(attr(selector_array, 'indices'))) {
if (!is.logical(attr(selector_array, 'indices'))) {
stop("The atribute 'indices' for the selectors for the dimension '",
inner_dim, "' must be TRUE or FALSE.")
}
selectors_are_indices <- attr(selector_array, 'indices')
}
taken_chunks <- rep(FALSE, chunk_amount)
selector_file_dims <- 1
if (any(found_file_dims[[i]] %in% names(dim(selector_array)))) {
selector_file_dims <- dim(selector_array)[which(names(dim(selector_array)) %in% found_file_dims[[i]])]
}
selector_inner_dims <- dim(selector_array)[which(!(names(dim(selector_array)) %in% found_file_dims[[i]]))]
var_with_selectors <- NULL
var_with_selectors_name <- var_params[[inner_dim]]
var_ordered <- NULL
var_unorder_indices <- NULL
with_transform <- FALSE
# If the selectors come with an associated variable
if (!is.null(var_with_selectors_name)) {
if ((var_with_selectors_name %in% transform_vars) && (!is.null(transform))) {
with_transform <- TRUE
if (!is.null(file_dim)) {
stop("Requested a transformation over the dimension '",
inner_dim, "', wich goes across files. This feature ",
"is not supported. Either do the request without the ",
"transformation or request it over dimensions that do ",
"not go across files.")
}
}
if (debug) {
if (inner_dim %in% dims_to_check) {
print("-> NAME OF THE VARIABLE WITH THE SELECTOR VALUES FOR THE CURRENT INNER DIMENSION:")
print(var_with_selectors_name)
print("-> NAMES OF THE VARIABLES TO BE TRANSFORMED:")
print(transform_vars)
print("-> STRUCTURE OF THE TRANSFORMATION FUNCTION:")
print(str(transform))
}
}
if (var_with_selectors_name %in% names(picked_vars[[i]])) {
var_with_selectors <- picked_vars[[i]][[var_with_selectors_name]]
var_ordered <- picked_vars_ordered[[i]][[var_with_selectors_name]]
var_unorder_indices <- picked_vars_unorder_indices[[i]][[var_with_selectors_name]]
} else if (var_with_selectors_name %in% names(picked_common_vars)) {
var_with_selectors <- picked_common_vars[[var_with_selectors_name]]
var_ordered <- picked_common_vars_ordered[[var_with_selectors_name]]
var_unorder_indices <- picked_common_vars_unorder_indices[[var_with_selectors_name]]
}
n <- prod(dim(var_with_selectors))
if (is.null(var_unorder_indices)) {
var_unorder_indices <- 1:n
}
if (with_transform) {
if (var_with_selectors_name %in% names(transformed_vars[[i]])) {
m <- prod(dim(transformed_vars[[i]][[var_with_selectors_name]]))
if (aiat) {
var_with_selectors <- transformed_vars[[i]][[var_with_selectors_name]]
var_ordered <- transformed_vars_ordered[[i]][[var_with_selectors_name]]
var_unorder_indices <- transformed_vars_unorder_indices[[i]][[var_with_selectors_name]]
}
} else if (var_with_selectors_name %in% names(transformed_common_vars)) {
m <- prod(dim(transformed_common_vars[[var_with_selectors_name]]))
if (aiat) {
var_with_selectors <- transformed_common_vars[[var_with_selectors_name]]
var_ordered <- transformed_common_vars_ordered[[var_with_selectors_name]]
var_unorder_indices <- transformed_common_vars_unorder_indices[[var_with_selectors_name]]
}
}
# Fallback: if the reordering step produced no unordering map, the
# transformed selector variable is taken in natural order (identity map
# over its m values).
if (is.null(var_unorder_indices)) {
var_unorder_indices <- 1:m
}
}
# Debug trace: sizes of the selector variable before (n) and, when a
# transform is requested, after (m) transformation, plus the reordering
# bookkeeping.
if (debug) {
if (inner_dim %in% dims_to_check) {
print("-> SIZE OF ORIGINAL VARIABLE:")
print(n)
print("-> SIZE OF TRANSFORMED VARIABLE:")
if (with_transform) print(m)
print("-> STRUCTURE OF ORDERED VAR:")
print(str(var_ordered))
print("-> UNORDER INDICES:")
print(var_unorder_indices)
}
}
# Split the dimensions of the selector variable into file dimensions
# (var_file_dims) and inner dimensions (var_dims). var_file_dims stays 1
# when the variable has no file dimensions.
var_dims <- dim(var_with_selectors)
var_file_dims <- 1
if (any(names(var_dims) %in% found_file_dims[[i]])) {
# Transformations are only supported when the selector variable is
# defined purely along inner dimensions.
if (with_transform) {
stop("Requested transformation for inner dimension '",
inner_dim, "' but provided selectors for such dimension ",
"over one or more file dimensions. This is not ",
"supported. Either request no transformation for the ",
"dimension '", inner_dim, "' or specify the ",
"selectors for this dimension without the file dimensions.")
}
var_file_dims <- var_dims[which(names(var_dims) %in% found_file_dims[[i]])]
var_dims <- var_dims[-which(names(var_dims) %in% found_file_dims[[i]])]
}
## # Keep the selectors if they correspond to a variable that will be transformed.
## if (with_transform) {
## if (var_with_selectors_name %in% names(picked_vars[[i]])) {
## transformed_var_with_selectors <- transformed_vars[[i]][[var_with_selectors_name]]
## } else if (var_with_selectors_name %in% names(picked_common_vars)) {
## transformed_var_with_selectors <- transformed_common_vars[[var_with_selectors_name]]
## }
## transformed_var_dims <- dim(transformed_var_with_selectors)
## transformed_var_file_dims <- 1
## if (any(names(transformed_var_dims) %in% found_file_dims[[i]])) {
## transformed_var_file_dims <- transformed_var_dims[which(names(transformed_var_dims) %in% found_file_dims[[i]])]
## transformed_var_dims <- tranasformed_var_dims[-which(names(transformed_var_dims) %in% found_file_dims[[i]])]
## }
##if (inner_dim %in% dims_to_check) {
##print("111m")
##print(str(transformed_var_dims))
##}
##
## m <- prod(transformed_var_dims)
## }
# Work out var file dims and inner dims.
# A selector variable for a dimension that extends across files must be
# one-dimensional; otherwise its values cannot be lined up along that
# single inner dimension.
if (inner_dim %in% unlist(inner_dims_across_files)) {
#TODO: if (chunk_amount != number of chunks in selector_file_dims), crash
if (length(var_dims) > 1) {
stop("Specified a '", inner_dim, "_var' for the dimension '",
inner_dim, "', which goes across files (across '", file_dim,
"'). The specified variable, '", var_with_selectors_name, "', has more ",
"than one dimension and can not be used as selector variable. ",
"Select another variable or fix it in the files.")
}
}
## TODO HERE::
#- indices_of_first_files_with_data may change, because array is now extended
# Sanity check: the inner dimension must actually appear among the
# selector variable's dimensions as read from the file.
var_full_dims <- dim(var_with_selectors)
if (!(inner_dim %in% names(var_full_dims))) {
stop("Could not find the dimension '", inner_dim, "' in ",
"the file. Either change the dimension name in ",
"your request, adjust the parameter ",
"'dim_names_in_files' or fix the dimension name in ",
"the file.\n", file_path)
}
# No selector variable: selectors are indices (or 'all'/'first'/'last'
# with a file_dim_reader available), so only data_dims is needed.
} else if (((is.numeric(selector_array) || is.list(selector_array)) && selectors_are_indices) ||
(is.character(selector_array) && (length(selector_array) == 1) &&
(selector_array %in% c('all', 'first', 'last')) &&
!is.null(file_dim_reader))) {
#### TODO HERE::
###- indices_of_first_files_with_data may change, because array is now extended
# Lines moved above for better performance.
##data_dims <- file_dim_reader(file_path, NULL, selectors_of_first_files_with_data[[i]],
## lapply(dat[[i]][['selectors']][expected_inner_dims[[i]]], '[[', 1))
if (!(inner_dim %in% names(data_dims))) {
stop("Could not find the dimension '", inner_dim, "' in ",
"the file. Either change the dimension name in ",
"your request, adjust the parameter ",
"'dim_names_in_files' or fix the dimension name in ",
"the file.\n", file_path)
}
} else {
# Neither indices nor a way to map values to indices was provided.
stop(paste0("Can not translate the provided selectors for '", inner_dim,
"' to numeric indices. Provide numeric indices and a ",
"'file_dim_reader' function, or a '", inner_dim,
"_var' in order to calculate the indices."))
}
# At this point, if no selector variable was provided, the variable
# data_dims has been populated. If a selector variable was provided,
# the variables var_dims, var_file_dims and var_full_dims have been
# populated instead.
# fri: indices to read from the files (first round). sri: indices to
# take from the transformed data (second round).
fri <- first_round_indices <- NULL
sri <- second_round_indices <- NULL
# This variable will keep the indices needed to crop the transformed
# variable (the one that has been transformed without being subset
# with the first round indices).
# NOTE(review): 'tranaformed_variable_indices' is a misspelling of
# 'transformed_variable_indices'. Only the 'tvi' alias is used in the
# visible code, so the typo is harmless, but the name should be fixed
# once it is confirmed unused elsewhere in this function.
tvi <- tranaformed_variable_indices <- NULL
# Reordered counterparts of fri/sri, kept for cropping the reordered
# variables later on.
ordered_fri <- NULL
ordered_sri <- NULL
# Fast path for the keyword selectors 'all', 'first' and 'last' when the
# dimension is not being chunked (n_chunks == 1).
if ((length(selector_array) == 1) && is.character(selector_array) &&
(selector_array %in% c('all', 'first', 'last')) &&
(chunks[[inner_dim]]['n_chunks'] == 1)) {
# Without a selector variable the indices are laid out along the file
# dimension only (one list entry per chunk/file).
if (is.null(var_with_selectors_name)) {
fri <- vector('list', length = chunk_amount)
dim(fri) <- c(chunk_amount)
sri <- vector('list', length = chunk_amount)
dim(sri) <- c(chunk_amount)
if (selector_array == 'all') {
# Every chunk contributes its full extent of the inner dimension.
fri[] <- replicate(chunk_amount, list(1:(data_dims[inner_dim])))
taken_chunks <- rep(TRUE, chunk_amount)
#sri <- NULL
} else if (selector_array == 'first') {
# Only index 1 of the first chunk.
fri[[1]] <- 1
taken_chunks[1] <- TRUE
#sri <- NULL
} else if (selector_array == 'last') {
# Only the last index of the last chunk.
fri[[chunk_amount]] <- data_dims[inner_dim]
taken_chunks[length(taken_chunks)] <- TRUE
#sri <- NULL
}
} else {
# With a selector variable, fri/sri are shaped by the variable's own
# file dimensions, which must include file_dim when one applies.
if ((!is.null(file_dim)) && !(file_dim %in% names(var_file_dims))) {
stop("The variable '", var_with_selectors_name, "' must also be ",
"requested for the file dimension '", file_dim, "' in ",
"this configuration.")
}
fri <- vector('list', length = prod(var_file_dims))
dim(fri) <- var_file_dims
ordered_fri <- fri
sri <- vector('list', length = prod(var_file_dims))
dim(sri) <- var_file_dims
ordered_sri <- sri
if (selector_array == 'all') {
# TODO: Populate ordered_fri
# Map the full ordered range back to file order via the unorder map.
ordered_fri[] <- replicate(prod(var_file_dims), list(1:n))
fri[] <- replicate(prod(var_file_dims), list(var_unorder_indices[1:n]))
taken_chunks <- rep(TRUE, chunk_amount)
if (!with_transform) {
#fri[] <- replicate(prod(var_file_dims), list(1:n))
#taken_chunks <- rep(TRUE, chunk_amount)
#sri <- NULL
} else {
# With a transform, all m transformed points are wanted.
ordered_sri[] <- replicate(prod(var_file_dims), list(1:m))
sri[] <- replicate(prod(var_file_dims), list(1:m))
## var_file_dims instead??
#if (!aiat) {
#fri[] <- replicate(prod(var_file_dims), list(1:n))
#taken_chunks <- rep(TRUE, chunk_amount)
#sri[] <- replicate(prod(transformed_var_file_dims), list(1:m))
#} else {
#fri[] <- replicate(prod(var_file_dims), list(1:n))
#taken_chunks <- rep(TRUE, chunk_amount)
#sri[] <- replicate(prod(transformed_var_file_dims), list(1:m))
#}
tvi <- 1:m
}
} else if (selector_array == 'first') {
taken_chunks[1] <- TRUE
if (!with_transform) {
ordered_fri[[1]] <- 1
fri[[1]] <- var_unorder_indices[1]
#taken_chunks[1] <- TRUE
#sri <- NULL
} else {
if (!aiat) {
# Selectors apply before transform: read 1 point, then take the
# ceiling(m / n) transformed points it maps to.
ordered_fri[[1]] <- 1
fri[[1]] <- var_unorder_indices[1]
# TODO: TO BE IMPROVED
#taken_chunks[1] <- TRUE
ordered_sri[[1]] <- 1:ceiling(m / n)
sri[[1]] <- 1:ceiling(m / n)
tvi <- 1:ceiling(m / n)
} else {
# Selectors apply after transform: read the ceiling(m / n) source
# points that feed the first transformed point.
ordered_fri[[1]] <- 1:ceiling(m / n)
fri[[1]] <- var_unorder_indices[1:ceiling(m / n)]
#taken_chunks[1] <- TRUE
ordered_sri[[1]] <- 1
sri[[1]] <- 1
tvi <- 1
}
}
} else if (selector_array == 'last') {
# Mirror of the 'first' case, anchored at the end of the dimension.
taken_chunks[length(taken_chunks)] <- TRUE
if (!with_transform) {
ordered_fri[[prod(var_file_dims)]] <- n
fri[[prod(var_file_dims)]] <- var_unorder_indices[n]
#taken_chunks[length(taken_chunks)] <- TRUE
#sri <- NULL
} else {
if (!aiat) {
ordered_fri[[prod(var_file_dims)]] <- prod(var_dims)
fri[[prod(var_file_dims)]] <- var_unorder_indices[prod(var_dims)]
#taken_chunks[length(taken_chunks)] <- TRUE
ordered_sri[[prod(var_file_dims)]] <- 1:ceiling(m / n)
sri[[prod(var_file_dims)]] <- 1:ceiling(m / n)
# TODO: TO BE IMPROVED. THE TVI MAY BE WRONG IF THERE'S BEEN A REORDERING.
tvi <- 1:ceiling(m / n)
} else {
ordered_fri[[prod(var_file_dims)]] <- (n - ceiling(m / n) + 1):n
fri[[prod(var_file_dims)]] <- var_unorder_indices[(n - ceiling(m / n) + 1):n]
#taken_chunks[length(taken_chunks)] <- TRUE
ordered_sri[[prod(var_file_dims)]] <- 1
sri[[prod(var_file_dims)]] <- 1
tvi <- 1
}
}
}
}
# If the selectors are not 'all', 'first', 'last', ...
} else {
# General path: explicit selector values or indices were provided.
if (!is.null(var_with_selectors_name)) {
# The selectors must cover the same file dimensions as the selector
# variable, with two tolerated exceptions (pattern dim and a depended
# 'across' file dim present in the selector array).
unmatching_file_dims <- which(!(names(var_file_dims) %in% names(selector_file_dims)))
if ((length(unmatching_file_dims) > 0)) {
raise_error <- FALSE
if (is.null(file_dim)) {
raise_error <- TRUE
} else {
if (!((length(unmatching_file_dims) == 1) &&
(names(var_file_dims)[unmatching_file_dims] == file_dim) &&
(inner_dim %in% names(selector_inner_dims)))) {
raise_error <- TRUE
}
}
if (raise_error) {
stop("Provided selectors for the dimension '", inner_dim, "' must have as many ",
"file dimensions as the variable the dimension is defined along, '",
var_with_selectors_name, "', with the exceptions of the file pattern dimension ('",
found_pattern_dim, "') and any depended file dimension (if specified as ",
"depended dimension in parameter 'inner_dims_across_files' and the ",
"depending file dimension is present in the provided selector array).")
}
}
# The selector's file dimensions must have the same sizes as the matching
# dimensions of the selector variable.
# Fix: error message typo "must mach" -> "must match".
if (any(names(selector_file_dims) %in% names(dim(var_with_selectors)))) {
if (any(dim(var_with_selectors)[names(selector_file_dims)] != selector_file_dims)) {
stop("Size of selector file dimensions must match size of requested ",
"variable dimensions.")
}
}
}
## TODO: If var dimensions are not in the same order as selector dimensions, reorder
# Build fri_dims: the dimensions along which the per-file first-round
# index lists will be stored. They combine the selector's file dims with
# file_dim (sized chunk_amount), kept in the order of found_file_dims.
if (is.null(names(selector_file_dims))) {
if (is.null(file_dim)) {
fri_dims <- 1
} else {
fri_dims <- chunk_amount
names(fri_dims) <- file_dim
}
} else {
fri_dim_names <- names(selector_file_dims)
if (!is.null(file_dim)) {
fri_dim_names <- c(fri_dim_names, file_dim)
}
fri_dim_names <- found_file_dims[[i]][which(found_file_dims[[i]] %in% fri_dim_names)]
fri_dims <- rep(NA, length(fri_dim_names))
names(fri_dims) <- fri_dim_names
fri_dims[names(selector_file_dims)] <- selector_file_dims
if (!is.null(file_dim)) {
fri_dims[file_dim] <- chunk_amount
}
}
# Allocate the list-arrays that will hold one index vector per file.
fri <- vector('list', length = prod(fri_dims))
dim(fri) <- fri_dims
sri <- vector('list', length = prod(fri_dims))
dim(sri) <- fri_dims
# Iterate over every combination of the selector's file dimensions; j
# enumerates the cells of selector_file_dim_array.
selector_file_dim_array <- array(1:prod(selector_file_dims), dim = selector_file_dims)
selector_store_position <- fri_dims
for (j in 1:prod(dim(selector_file_dim_array))) {
# Multi-dimensional position of the j-th cell, used both to subset the
# selectors and to store the resulting indices at the right place.
selector_indices_to_take <- which(selector_file_dim_array == j, arr.ind = TRUE)[1, ]
names(selector_indices_to_take) <- names(selector_file_dims)
selector_store_position[names(selector_indices_to_take)] <- selector_indices_to_take
sub_array_of_selectors <- Subset(selector_array, names(selector_indices_to_take),
as.list(selector_indices_to_take), drop = 'selected')
if (debug) {
if (inner_dim %in% dims_to_check) {
print("-> ITERATING OVER FILE DIMENSIONS OF THE SELECTORS.")
print("-> STRUCTURE OF A SUB ARRAY:")
print(str(sub_array_of_selectors))
print("-> STRUCTURE OF THE VARIABLE WITH SELECTORS:")
print(str(var_with_selectors))
print(dim(var_with_selectors))
}
}
# When selectors are values (not indices), take the matching slice of
# the selector variable to translate values into indices.
if (selectors_are_indices) {
sub_array_of_values <- NULL
#} else if (!is.null(var_ordered)) {
# sub_array_of_values <- var_ordered
} else {
if (length(var_file_dims) > 0) {
var_indices_to_take <- selector_indices_to_take[which(names(selector_indices_to_take) %in% names(var_file_dims))]
sub_array_of_values <- Subset(var_with_selectors, names(var_indices_to_take),
as.list(var_indices_to_take), drop = 'selected')
} else {
sub_array_of_values <- var_with_selectors
}
}
if (debug) {
if (inner_dim %in% dims_to_check) {
print("-> STRUCTURE OF THE SUB ARRAY FROM THE VARIABLE CORRESPONDING TO THE SUB ARRAY OF SELECTORS")
print(str(sub_array_of_values))
print(dim(sub_array_of_values))
print("-> NAME OF THE FILE DIMENSION THE CURRENT INNER DIMENSION EXTENDS ALONG:")
print(file_dim)
}
}
# Case 1: the inner dimension does not go across files, or it does but
# the selector array already provides the file dimension.
if ((!is.null(file_dim) && (file_dim %in% names(selector_file_dims))) || is.null(file_dim)) {
if (length(sub_array_of_selectors) > 0) {
if (debug) {
if (inner_dim %in% dims_to_check) {
print("-> THE INNER DIMENSION DOES NOT GO ACROSS ANY FILE DIMENSION OR IT DOES BUT IS IN THE PROVIDED SELECTOR ARRAY.")
}
}
# Bounds check for index selectors against the appropriate extent
# (m if taken after transform, n before, or the raw file extent).
if (selectors_are_indices) {
if (!is.null(var_with_selectors_name)) {
max_allowed <- ifelse(aiat, m, n)
} else {
max_allowed <- data_dims[inner_dim]
}
if (any(na.omit(unlist(sub_array_of_selectors)) > max_allowed) ||
any(na.omit(unlist(sub_array_of_selectors)) < 1)) {
stop("Provided indices out of range for dimension '", inner_dim, "' ",
"for dataset '", dat[[i]][['name']], "' (accepted range: 1 to ",
max_allowed, ").")
}
}
# The selector_checker will return either a vector of indices or a list
# with the first and last desired indices.
goes_across_prime_meridian <- FALSE
if (!is.null(var_ordered) && !selectors_are_indices) {
# Reorder the selector bounds consistently with the dimension's
# reorder function; a circular dimension whose reordered bounds
# come out inverted is flagged as crossing the prime meridian.
if (!is.null(dim_reorder_params[[inner_dim]])) {
if (is.list(sub_array_of_selectors)) {
sub_array_reordered <- dim_reorder_params[[inner_dim]](unlist(sub_array_of_selectors))
sub_array_unorder <- sort(sub_array_reordered$ix, index.return = TRUE)$ix
sub_array_of_selectors <- as.list(sub_array_reordered$x[sub_array_unorder])
is_circular_dim <- attr(dim_reorder_params[[inner_dim]], 'circular')
if (!is.null(is_circular_dim)) {
if (is_circular_dim) {
goes_across_prime_meridian <- abs(sub_array_of_selectors[[1]]) > abs(sub_array_of_selectors[[2]])
## TODO: if (bounds[1] > bounds[2]) goes_across_prime_meridian <- !goes_across_prime_meridian
}
}
} else {
sub_array_of_selectors <- dim_reorder_params[[inner_dim]](sub_array_of_selectors)$x
}
}
sub_array_of_indices <- selector_checker(sub_array_of_selectors, var_ordered,
tolerance = if (aiat) {
NULL
} else {
tolerance_params[[inner_dim]]
})
} else {
sub_array_of_indices <- selector_checker(sub_array_of_selectors, sub_array_of_values,
tolerance = if (aiat) {
NULL
} else {
tolerance_params[[inner_dim]]
})
}
# Keep only the portion of the indices assigned to this chunk.
sub_array_of_indices <- sub_array_of_indices[chunk_indices(length(sub_array_of_indices),
chunks[[inner_dim]]['chunk'],
chunks[[inner_dim]]['n_chunks'],
inner_dim)]
# The sub_array_of_indices now contains numeric indices of the values to be taken.
if (debug) {
if (inner_dim %in% dims_to_check) {
print("-> TRANSFORMATION REQUESTED?")
print(with_transform)
print("-> BETA:")
print(beta)
}
}
if (with_transform) {
# If there is a transformation and selector values are provided, these
# selectors will be processed in the same way either if aiat = TRUE or
# aiat = FALSE.
## TODO: If sub_array_of_selectors was integer and aiat then... do what's commented 50 lines below.
## otherwise, do what's coded.
if (debug) {
if (inner_dim %in% dims_to_check) {
print("-> SELECTORS REQUESTED BEFORE TRANSFORM.")
}
}
# First-round indices: the requested range widened by 'beta' extra
# cells on each side so the transform has context at the borders.
if (goes_across_prime_meridian) {
# Crossing the prime meridian: conservatively read the whole extent.
sub_array_of_fri <- 1:n
#gap_width <- sub_array_of_indices[[1]] - sub_array_of_indices[[2]] - 1
#sub_array_of_fri <- c((1:(sub_array_of_indices[[2]] + min(gap_width, beta))),
# (sub_array_of_indices[[1]] - min(gap_width, beta)):n)
} else {
if (is.list(sub_array_of_indices)) {
sub_array_of_indices <- sub_array_of_indices[[1]]:sub_array_of_indices[[2]]
}
first_index <- min(unlist(sub_array_of_indices))
last_index <- max(unlist(sub_array_of_indices))
start_padding <- min(beta, first_index - 1)
end_padding <- min(beta, n - last_index)
sub_array_of_fri <- (first_index - start_padding):(last_index + end_padding)
}
# Transform only the subset selected by the first-round indices.
subset_vars_to_transform <- vars_to_transform
if (!is.null(var_ordered)) {
subset_vars_to_transform[[var_with_selectors_name]] <- Subset(var_ordered, inner_dim, sub_array_of_fri)
} else {
subset_vars_to_transform[[var_with_selectors_name]] <- Subset(sub_array_of_values, inner_dim, sub_array_of_fri)
}
transformed_subset_var <- do.call(transform, c(list(data_array = NULL,
variables = subset_vars_to_transform,
file_selectors = selectors_of_first_files_with_data[[i]]),
transform_params))$variables[[var_with_selectors_name]]
# Sorting the transformed variable and working out the indices again after transform.
if (!is.null(dim_reorder_params[[inner_dim]])) {
transformed_subset_var_reorder <- dim_reorder_params[[inner_dim]](transformed_subset_var)
transformed_subset_var <- transformed_subset_var_reorder$x
transformed_subset_var_unorder <- sort(transformed_subset_var_reorder$ix, index.return = TRUE)$ix
} else {
transformed_subset_var_unorder <- 1:length(transformed_subset_var)
}
# Second-round indices: locate the selectors in the transformed
# (and possibly reordered) variable.
sub_array_of_sri <- selector_checker(sub_array_of_selectors, transformed_subset_var,
tolerance = if (aiat) {
tolerance_params[[inner_dim]]
} else {
NULL
})
if (goes_across_prime_meridian) {
sub_array_of_sri <- c(1:sub_array_of_sri[[2]], sub_array_of_sri[[1]]:length(transformed_subset_var))
#sub_array_of_sri <- c(sub_array_of_sri[[1]]:length(transformed_subset_var), 1:sub_array_of_sri[[2]])
} else if (is.list(sub_array_of_sri)) {
sub_array_of_sri <- sub_array_of_sri[[1]]:sub_array_of_sri[[2]]
}
ordered_sri <- sub_array_of_sri
sub_array_of_sri <- transformed_subset_var_unorder[sub_array_of_sri]
# In this case, the tvi are not defined and the 'transformed_subset_var'
# will be taken instead of the var transformed before in the code.
if (debug) {
if (inner_dim %in% dims_to_check) {
print("-> FIRST INDEX:")
print(first_index)
print("-> LAST INDEX:")
print(last_index)
print("-> STRUCTURE OF FIRST ROUND INDICES:")
print(str(sub_array_of_fri))
print("-> STRUCTURE OF SECOND ROUND INDICES:")
print(str(sub_array_of_sri))
print("-> STRUCTURE OF TRANSFORMED VARIABLE INDICES:")
print(str(tvi))
}
}
### # If the selectors are expressed after transformation
### } else {
###if (debug) {
###if (inner_dim %in% dims_to_check) {
###print("-> SELECTORS REQUESTED AFTER TRANSFORM.")
###}
###}
### if (goes_across_prime_meridian) {
### sub_array_of_indices <- c(sub_array_of_indices[[1]]:m,
### 1:sub_array_of_indices[[2]])
### }
### first_index <- min(unlist(sub_array_of_indices))
### last_index <- max(unlist(sub_array_of_indices))
### first_index_before_transform <- max(transform_indices(first_index, m, n) - beta, 1)
### last_index_before_transform <- min(transform_indices(last_index, m, n) + beta, n)
### sub_array_of_fri <- first_index_before_transform:last_index_before_transform
### n_of_extra_cells <- round(beta / n * m)
### if (is.list(sub_array_of_indices) && (length(sub_array_of_indices) > 1)) {
### sub_array_of_sri <- 1:(last_index - first_index + 1)
### if (is.null(tvi)) {
### tvi <- sub_array_of_sri + first_index - 1
### }
### } else {
### sub_array_of_sri <- sub_array_of_indices - first_index + 1
### if (is.null(tvi)) {
### tvi <- sub_array_of_indices
### }
### }
### sub_array_of_sri <- sub_array_of_sri + n_of_extra_cells
# Store the second-round indices at this file position.
sri <- do.call('[[<-', c(list(x = sri), as.list(selector_store_position),
list(value = sub_array_of_sri)))
} else {
# No transform: the first-round indices are the final ones.
if (goes_across_prime_meridian) {
#sub_array_of_fri <- 1:n
sub_array_of_fri <- c(1:sub_array_of_indices[[2]], sub_array_of_indices[[1]]:n)
} else if (is.list(sub_array_of_indices)) {
sub_array_of_fri <- sub_array_of_indices[[1]]:sub_array_of_indices[[2]]
} else {
sub_array_of_fri <- sub_array_of_indices
}
}
# Map ordered indices back to the order the values have in the files.
if (!is.null(var_unorder_indices)) {
if (is.null(ordered_fri)) {
ordered_fri <- sub_array_of_fri
}
sub_array_of_fri <- var_unorder_indices[sub_array_of_fri]
}
# Store the first-round indices and mark the touched chunk(s).
fri <- do.call('[[<-', c(list(x = fri), as.list(selector_store_position),
list(value = sub_array_of_fri)))
if (!is.null(file_dim)) {
taken_chunks[selector_store_position[[file_dim]]] <- TRUE
} else {
taken_chunks <- TRUE
}
}
# Case 2: the inner dimension goes across a file dimension and the
# selector array does not carry that file dimension itself, so the
# indices must be split among the chunks/files here.
} else {
if (debug) {
if (inner_dim %in% dims_to_check) {
print("-> THE INNER DIMENSION GOES ACROSS A FILE DIMENSION.")
}
}
if (inner_dim %in% names(dim(sub_array_of_selectors))) {
if (is.null(var_with_selectors_name)) {
# Indices are global across all chunks: range is 1 .. extent * chunks.
if (any(na.omit(unlist(sub_array_of_selectors)) < 1) ||
any(na.omit(unlist(sub_array_of_selectors)) > data_dims[inner_dim] * chunk_amount)) {
stop("Provided indices out of range for dimension '", inner_dim, "' ",
"for dataset '", dat[[i]][['name']], "' (accepted range: 1 to ",
data_dims[inner_dim] * chunk_amount, ").")
}
} else {
# Move the inner dimension to the first position of the values
# array so it lines up with the selectors below.
if (inner_dim %in% names(dim(sub_array_of_values))) {
inner_dim_pos_in_sub_array <- which(names(dim(sub_array_of_values)) == inner_dim)
if (inner_dim_pos_in_sub_array != 1) {
new_sub_array_order <- (1:length(dim(sub_array_of_values)))[-inner_dim_pos_in_sub_array]
new_sub_array_order <- c(inner_dim_pos_in_sub_array, new_sub_array_order)
sub_array_of_values <- .aperm2(sub_array_of_values, new_sub_array_order)
}
}
}
# Same reordering for the selectors array.
inner_dim_pos_in_sub_array <- which(names(dim(sub_array_of_selectors)) == inner_dim)
if (inner_dim_pos_in_sub_array != 1) {
new_sub_array_order <- (1:length(dim(sub_array_of_selectors)))[-inner_dim_pos_in_sub_array]
new_sub_array_order <- c(inner_dim_pos_in_sub_array, new_sub_array_order)
sub_array_of_selectors <- .aperm2(sub_array_of_selectors, new_sub_array_order)
}
sub_array_of_indices <- selector_checker(sub_array_of_selectors, sub_array_of_values,
tolerance = tolerance_params[[inner_dim]])
# It is needed to expand the indices here, otherwise for
# values(list(date1, date2)) only 2 values are picked.
if (is.list(sub_array_of_indices)) {
sub_array_of_indices <- sub_array_of_indices[[1]]:sub_array_of_indices[[2]]
}
# Keep only this chunk's portion of the requested indices.
sub_array_of_indices <- sub_array_of_indices[chunk_indices(length(sub_array_of_indices),
chunks[[inner_dim]]['chunk'],
chunks[[inner_dim]]['n_chunks'],
inner_dim)]
sub_array_is_list <- FALSE
if (is.list(sub_array_of_indices)) {
sub_array_is_list <- TRUE
sub_array_of_indices <- unlist(sub_array_of_indices)
}
# Translate global indices into (chunk number, within-chunk index)
# pairs using the per-file extent of the inner dimension.
if (is.null(var_with_selectors_name)) {
indices_chunk <- floor((sub_array_of_indices - 1) / data_dims[inner_dim]) + 1
transformed_indices <- ((sub_array_of_indices - 1) %% data_dims[inner_dim]) + 1
} else {
indices_chunk <- floor((sub_array_of_indices - 1) / var_full_dims[inner_dim]) + 1
transformed_indices <- ((sub_array_of_indices - 1) %% var_full_dims[inner_dim]) + 1
}
if (sub_array_is_list) {
sub_array_of_indices <- as.list(sub_array_of_indices)
}
if (debug) {
if (inner_dim %in% dims_to_check) {
print("-> GOING TO ITERATE ALONG CHUNKS.")
}
}
# Distribute the within-chunk indices to each chunk's fri/sri slot.
for (chunk in 1:chunk_amount) {
if (!is.null(names(selector_store_position))) {
selector_store_position[file_dim] <- chunk
} else {
selector_store_position <- chunk
}
chunk_selectors <- transformed_indices[which(indices_chunk == chunk)]
sub_array_of_indices <- chunk_selectors
if (with_transform) {
# If the provided selectors are expressed in the world
# before transformation
if (!aiat) {
first_index <- min(unlist(sub_array_of_indices))
last_index <- max(unlist(sub_array_of_indices))
sub_array_of_fri <- max(c(first_index - beta, 1)):min(c(last_index + beta, n))
sub_array_of_sri <- transform_indices(unlist(sub_array_of_indices) - first_index + 1, n, m)
if (is.list(sub_array_of_indices)) {
if (length(sub_array_of_sri) > 1) {
sub_array_of_sri <- sub_array_of_sri[[1]]:sub_array_of_sri[[2]]
}
}
##TODO: TRANSFORM SUBSET VARIABLE AS ABOVE, TO COMPUTE SRI
# If the selectors are expressed after transformation
} else {
first_index <- min(unlist(sub_array_of_indices))
last_index <- max(unlist(sub_array_of_indices))
first_index_before_transform <- max(transform_indices(first_index, m, n) - beta, 1)
last_index_before_transform <- min(transform_indices(last_index, m, n) + beta, n)
sub_array_of_fri <- first_index_before_transform:last_index_before_transform
if (is.list(sub_array_of_indices) && (length(sub_array_of_indices) > 1)) {
sub_array_of_sri <- 1:(last_index - first_index + 1) +
round(beta / n * m)
} else {
sub_array_of_sri <- sub_array_of_indices - first_index + 1 +
round(beta / n * m)
}
##TODO: FILL IN TVI
}
sri <- do.call('[[<-', c(list(x = sri), as.list(selector_store_position),
list(value = sub_array_of_sri)))
if (length(sub_array_of_sri) > 0) {
taken_chunks[chunk] <- TRUE
}
} else {
sub_array_of_fri <- sub_array_of_indices
if (length(sub_array_of_fri) > 0) {
taken_chunks[chunk] <- TRUE
}
}
# Map ordered indices back to the file's native order.
if (!is.null(var_unorder_indices)) {
ordered_fri <- sub_array_of_fri
sub_array_of_fri <- var_unorder_indices[sub_array_of_fri]
}
fri <- do.call('[[<-', c(list(x = fri), as.list(selector_store_position),
list(value = sub_array_of_fri)))
}
if (debug) {
if (inner_dim %in% dims_to_check) {
print("-> FINISHED ITERATING ALONG CHUNKS")
}
}
} else {
stop("Provided array of indices for dimension '", inner_dim, "', ",
"which goes across the file dimension '", file_dim, "', but ",
"the provided array does not have the dimension '", inner_dim,
"', which is mandatory.")
}
}
}
}
if (debug) {
if (inner_dim %in% dims_to_check) {
print("-> PROCEEDING TO CROP VARIABLES")
}
}
#if ((length(selector_array) == 1) && (selector_array %in% c('all', 'first', 'last'))) {
#if (!is.null(var_with_selectors_name) || (is.null(var_with_selectors_name) && is.character(selector_array) &&
# (length(selector_array) == 1) && (selector_array %in% c('all', 'first', 'last')))) {
# Fail if the selectors matched nothing at all; otherwise record which
# file chunks can be dropped (cropping happens later via dims_to_crop).
empty_chunks <- which(!taken_chunks)
if (length(empty_chunks) >= length(taken_chunks)) {
stop("Selectors do not match any of the possible values for the dimension '", inner_dim, "'.")
}
if (length(empty_chunks) > 0) {
# # Get the first group of chunks to remove, and remove them.
# # E.g., from c(1, 2, 4, 5, 6, 8, 9) remove only 1 and 2
# dist <- abs(rev(empty_chunks) - c(rev(empty_chunks)[1] - 1, head(rev(empty_chunks), length(rev(empty_chunks)) - 1)))
# if (all(dist == 1)) {
# start_chunks_to_remove <- NULL
# } else {
# first_chunk_to_remove <- tail(which(dist > 1), 1)
# start_chunks_to_remove <- rev(rev(empty_chunks)[first_chunk_to_remove:length(empty_chunks)])
# }
# # Get the last group of chunks to remove, and remove them.
# # E.g., from c(1, 2, 4, 5, 6, 8, 9) remove only 8 and 9
# dist <- abs(empty_chunks - c(empty_chunks[1] - 1, head(empty_chunks, length(empty_chunks) - 1)))
# if (all(dist == 1)) {
# first_chunk_to_remove <- 1
# } else {
# first_chunk_to_remove <- tail(which(dist > 1), 1)
# }
# end_chunks_to_remove <- empty_chunks[first_chunk_to_remove:length(empty_chunks)]
# chunks_to_keep <- which(!((1:length(taken_chunks)) %in% c(start_chunks_to_remove, end_chunks_to_remove)))
chunks_to_keep <- which(taken_chunks)
dims_to_crop[[file_dim]] <- c(dims_to_crop[[file_dim]], list(chunks_to_keep))
# found_indices <- Subset(found_indices, file_dim, chunks_to_keep)
# # Crop dataset variables file dims.
# for (picked_var in names(picked_vars[[i]])) {
# if (file_dim %in% names(dim(picked_vars[[i]][[picked_var]]))) {
# picked_vars[[i]][[picked_var]] <- Subset(picked_vars[[i]][[picked_var]], file_dim, chunks_to_keep)
# }
# }
}
#}
# Persist the computed first/second-round indices for this dimension.
dat[[i]][['selectors']][[inner_dim]] <- list(fri = fri, sri = sri)
# Crop dataset variables inner dims.
# Crop common variables inner dims.
# Depending on what was computed above, up to three views of each
# variable need cropping: the picked (original), the transformed and
# the reordered one.
types_of_var_to_crop <- 'picked'
if (with_transform) {
types_of_var_to_crop <- c(types_of_var_to_crop, 'transformed')
}
if (!is.null(dim_reorder_params[[inner_dim]])) {
types_of_var_to_crop <- c(types_of_var_to_crop, 'reordered')
}
# Crop each view of the variables along the inner dimension with the
# index set that corresponds to that view (tvi/sri for transformed,
# ordered_fri for reordered, fri for picked).
for (type_of_var_to_crop in types_of_var_to_crop) {
if (type_of_var_to_crop == 'transformed') {
if (is.null(tvi)) {
if (!is.null(dim_reorder_params[[inner_dim]])) {
crop_indices <- unique(unlist(ordered_sri))
} else {
crop_indices <- unique(unlist(sri))
}
} else {
crop_indices <- unique(unlist(tvi))
}
vars_to_crop <- transformed_vars[[i]]
common_vars_to_crop <- transformed_common_vars
} else if (type_of_var_to_crop == 'reordered') {
crop_indices <- unique(unlist(ordered_fri))
vars_to_crop <- picked_vars_ordered[[i]]
common_vars_to_crop <- picked_common_vars_ordered
} else {
crop_indices <- unique(unlist(fri))
vars_to_crop <- picked_vars[[i]]
common_vars_to_crop <- picked_common_vars
}
for (var_to_crop in names(vars_to_crop)) {
if (inner_dim %in% names(dim(vars_to_crop[[var_to_crop]]))) {
if (!is.null(crop_indices)) {
if (type_of_var_to_crop == 'transformed') {
# When selectors were taken before transform (!aiat), the freshly
# transformed subset replaces the variable transformed earlier.
if (!aiat) {
vars_to_crop[[var_to_crop]] <- Subset(transformed_subset_var, inner_dim, crop_indices)
} else {
vars_to_crop[[var_to_crop]] <- Subset(vars_to_crop[[var_to_crop]], inner_dim, crop_indices)
}
} else {
vars_to_crop[[var_to_crop]] <- Subset(vars_to_crop[[var_to_crop]], inner_dim, crop_indices)
}
}
}
}
# Common variables are shared by all datasets; crop them only once,
# on the last dataset iteration.
if (i == length(dat)) {
for (common_var_to_crop in names(common_vars_to_crop)) {
if (inner_dim %in% names(dim(common_vars_to_crop[[common_var_to_crop]]))) {
common_vars_to_crop[[common_var_to_crop]] <- Subset(common_vars_to_crop[[common_var_to_crop]], inner_dim, crop_indices)
}
}
}
# Write the cropped variables back to their owning containers.
if (type_of_var_to_crop == 'transformed') {
if (!is.null(vars_to_crop)) {
transformed_vars[[i]] <- vars_to_crop
}
if (i == length(dat)) {
transformed_common_vars <- common_vars_to_crop
}
} else if (type_of_var_to_crop == 'reordered') {
if (!is.null(vars_to_crop)) {
picked_vars_ordered[[i]] <- vars_to_crop
}
if (i == length(dat)) {
picked_common_vars_ordered <- common_vars_to_crop
}
} else {
if (!is.null(vars_to_crop)) {
picked_vars[[i]] <- vars_to_crop
}
if (i == length(dat)) {
picked_common_vars <- common_vars_to_crop
}
}
}
#}
}
# After the selectors have been picked (using the original variables),
# the variables are transformed. At that point, the original selectors
# for the transformed variables are also kept in the variable original_selectors.
#print("L")
}
}
}
# if (!is.null(transformed_common_vars)) {
# picked_common_vars[names(transformed_common_vars)] <- transformed_common_vars
# }
# Remove the trailing chunks, if any.
# For every file dimension flagged for cropping, keep only the union of
# chunks any selector needed, and crop all structures that carry that
# dimension (file arrays, stored selectors, picked/transformed vars).
for (file_dim in names(dims_to_crop)) {
# indices_to_keep <- min(sapply(dims_to_crop[[file_dim]], min)):max(sapply(dims_to_crop[[file_dim]], max))
## TODO: Merge indices in dims_to_crop with some advanced mechanism?
indices_to_keep <- unique(unlist(dims_to_crop[[file_dim]]))
array_of_files_to_load <- Subset(array_of_files_to_load, file_dim, indices_to_keep)
array_of_not_found_files <- Subset(array_of_not_found_files, file_dim, indices_to_keep)
for (i in 1:length(dat)) {
# Crop selectors
for (selector_dim in names(dat[[i]][['selectors']])) {
if (selector_dim == file_dim) {
for (j in 1:length(dat[[i]][['selectors']][[selector_dim]][['fri']])) {
dat[[i]][['selectors']][[selector_dim]][['fri']][[j]] <- dat[[i]][['selectors']][[selector_dim]][['fri']][[j]][indices_to_keep]
}
for (j in 1:length(dat[[i]][['selectors']][[selector_dim]][['sri']])) {
dat[[i]][['selectors']][[selector_dim]][['sri']][[j]] <- dat[[i]][['selectors']][[selector_dim]][['sri']][[j]][indices_to_keep]
}
}
if (file_dim %in% names(dim(dat[[i]][['selectors']][[selector_dim]][['fri']]))) {
dat[[i]][['selectors']][[selector_dim]][['fri']] <- Subset(dat[[i]][['selectors']][[selector_dim]][['fri']], file_dim, indices_to_keep)
dat[[i]][['selectors']][[selector_dim]][['sri']] <- Subset(dat[[i]][['selectors']][[selector_dim]][['sri']], file_dim, indices_to_keep)
}
}
# Crop dataset variables file dims.
for (picked_var in names(picked_vars[[i]])) {
if (file_dim %in% names(dim(picked_vars[[i]][[picked_var]]))) {
picked_vars[[i]][[picked_var]] <- Subset(picked_vars[[i]][[picked_var]], file_dim, indices_to_keep)
}
}
for (transformed_var in names(transformed_vars[[i]])) {
if (file_dim %in% names(dim(transformed_vars[[i]][[transformed_var]]))) {
transformed_vars[[i]][[transformed_var]] <- Subset(transformed_vars[[i]][[transformed_var]], file_dim, indices_to_keep)
}
}
}
# Crop common variables file dims.
for (picked_common_var in names(picked_common_vars)) {
if (file_dim %in% names(dim(picked_common_vars[[picked_common_var]]))) {
picked_common_vars[[picked_common_var]] <- Subset(picked_common_vars[[picked_common_var]], file_dim, indices_to_keep)
}
}
for (transformed_common_var in names(transformed_common_vars)) {
if (file_dim %in% names(dim(transformed_common_vars[[transformed_common_var]]))) {
transformed_common_vars[[transformed_common_var]] <- Subset(transformed_common_vars[[transformed_common_var]], file_dim, indices_to_keep)
}
}
}
# Calculate the size of the final array.
# Each inner dimension's length is taken from the second-round indices
# when a transform produced them, otherwise from the transformed selector
# variable's length, otherwise from the first-round indices; datasets are
# merged with .MergeArrayDims keeping the maximum per dimension.
total_inner_dims <- NULL
for (i in 1:length(dat)) {
if (dataset_has_files[i]) {
inner_dims <- expected_inner_dims[[i]]
inner_dims <- sapply(inner_dims,
function(x) {
if (!all(sapply(dat[[i]][['selectors']][[x]][['sri']], is.null))) {
max(sapply(dat[[i]][['selectors']][[x]][['sri']], length))
} else {
if (length(var_params[[x]]) > 0) {
if (var_params[[x]] %in% names(transformed_vars[[i]])) {
length(transformed_vars[[i]][[var_params[[x]]]])
} else if (var_params[[x]] %in% names(transformed_common_vars)) {
length(transformed_common_vars[[var_params[[x]]]])
} else {
max(sapply(dat[[i]][['selectors']][[x]][['fri']], length))
}
} else {
max(sapply(dat[[i]][['selectors']][[x]][['fri']], length))
}
}
})
names(inner_dims) <- expected_inner_dims[[i]]
if (is.null(total_inner_dims)) {
total_inner_dims <- inner_dims
} else {
new_dims <- .MergeArrayDims(total_inner_dims, inner_dims)
total_inner_dims <- pmax(new_dims[[1]], new_dims[[2]])
}
}
}
# Combine file dims and inner dims into the final collocation shape.
new_dims <- .MergeArrayDims(dim(array_of_files_to_load), total_inner_dims)
final_dims <- pmax(new_dims[[1]], new_dims[[2]])[dim_names]
# final_dims_fake is the vector of final dimensions after having merged the
# 'across' file dimensions with the respective 'across' inner dimensions, and
# after having broken into multiple dimensions those dimensions for which
# multidimensional selectors have been provided.
# final_dims will be used for collocation of data, whereas final_dims_fake
# will be used for shaping the final array to be returned to the user.
final_dims_fake <- final_dims
if (merge_across_dims) {
if (!is.null(inner_dims_across_files)) {
for (file_dim_across in names(inner_dims_across_files)) {
# Collapse the adjacent (inner dim, file dim) pair into one
# dimension whose length is the product of both.
inner_dim_pos <- which(names(final_dims_fake) == inner_dims_across_files[[file_dim_across]])
new_dims <- c()
if (inner_dim_pos > 1) {
new_dims <- c(new_dims, final_dims_fake[1:(inner_dim_pos - 1)])
}
new_dims <- c(new_dims, setNames(prod(final_dims_fake[c(inner_dim_pos, inner_dim_pos + 1)]),
inner_dims_across_files[[file_dim_across]]))
if (inner_dim_pos + 1 < length(final_dims_fake)) {
new_dims <- c(new_dims, final_dims_fake[(inner_dim_pos + 2):length(final_dims_fake)])
}
final_dims_fake <- new_dims
}
}
}
all_split_dims <- NULL
if (split_multiselected_dims) {
# Replace each dimension whose selector was a multidimensional array
# with that selector's own dimensions (named '<dim>1', '<dim>2', ...
# when the selector dims are unnamed).
for (dim_param in 1:length(dim_params)) {
if (!is.null(dim(dim_params[[dim_param]]))) {
if (length(dim(dim_params[[dim_param]])) > 1) {
split_dims <- dim(dim_params[[dim_param]])
all_split_dims <- c(all_split_dims, setNames(list(split_dims),
names(dim_params)[dim_param]))
if (is.null(names(split_dims))) {
names(split_dims) <- paste0(names(dim_params)[dim_param],
1:length(split_dims))
}
old_dim_pos <- which(names(final_dims_fake) == names(dim_params)[dim_param])
new_dims <- c()
if (old_dim_pos > 1) {
new_dims <- c(new_dims, final_dims_fake[1:(old_dim_pos - 1)])
}
new_dims <- c(new_dims, split_dims)
if (old_dim_pos < length(final_dims_fake)) {
new_dims <- c(new_dims, final_dims_fake[(old_dim_pos + 1):length(final_dims_fake)])
}
final_dims_fake <- new_dims
}
}
}
}
if (!silent) {
.message("Detected dimension sizes:")
longest_dim_len <- max(sapply(names(final_dims_fake), nchar))
longest_size_len <- max(sapply(paste0(final_dims_fake, ''), nchar))
sapply(names(final_dims_fake),
function(x) {
message(paste0("* ", paste(rep(' ', longest_dim_len - nchar(x)), collapse = ''),
x, ": ", paste(rep(' ', longest_size_len - nchar(paste0(final_dims_fake[x], ''))), collapse = ''),
final_dims_fake[x]))
})
bytes <- prod(c(final_dims_fake, 8))
dim_sizes <- paste(final_dims_fake, collapse = ' x ')
if (retrieve) {
.message(paste("Total size of requested data:"))
} else {
.message(paste("Total size of involved data:"))
}
.message(paste(dim_sizes, " x 8 bytes =",
format(structure(bytes, class = "object_size"), units = "auto")),
indent = 2)
}
# The following several lines will only be run if retrieve = TRUE
if (retrieve) {
########## CREATING THE SHARED MATRIX AND DISPATCHING WORK PIECES ###########
# TODO: try performance of storing all in cols instead of rows
# Create the shared memory array, and a pointer to it, to be sent
# to the work pieces.
data_array <- big.matrix(nrow = prod(final_dims), ncol = 1)
shared_matrix_pointer <- describe(data_array)
if (is.null(num_procs)) {
num_procs <- availableCores()
}
# Creating a shared tmp folder to store metadata from each chunk
array_of_metadata_flags <- array(FALSE, dim = dim(array_of_files_to_load))
if (!is.null(metadata_dims)) {
metadata_indices_to_load <- as.list(rep(1, length(dim(array_of_files_to_load))))
names(metadata_indices_to_load) <- names(dim(array_of_files_to_load))
metadata_indices_to_load[metadata_dims] <- as.list(rep(TRUE, length(metadata_dims)))
array_of_metadata_flags <- do.call('[<-', c(list(array_of_metadata_flags), metadata_indices_to_load,
list(value = rep(TRUE, prod(dim(array_of_files_to_load)[metadata_dims])))))
}
metadata_file_counter <- 0
metadata_folder <- tempfile('metadata')
dir.create(metadata_folder)
# Build the work pieces, each with:
# - file path
# - total size (dims) of store array
# - start position in store array
# - file selectors (to provide extra info. useful e.g. to select variable)
# - indices to take from file
work_pieces <- list()
for (i in 1:length(dat)) {
if (dataset_has_files[i]) {
selectors <- dat[[i]][['selectors']]
file_dims <- found_file_dims[[i]]
inner_dims <- expected_inner_dims[[i]]
sub_array_dims <- final_dims[file_dims]
sub_array_dims[found_pattern_dim] <- 1
sub_array_of_files_to_load <- array(1:prod(sub_array_dims),
dim = sub_array_dims)
names(dim(sub_array_of_files_to_load)) <- names(sub_array_dims)
# Detect which of the dimensions of the dataset go across files.
file_dim_across_files <- lapply(inner_dims,
function(x) {
dim_across <- sapply(inner_dims_across_files, function(y) x %in% y)
if (any(dim_across)) {
names(inner_dims_across_files)[which(dim_across)[1]]
} else {
NULL
}
})
names(file_dim_across_files) <- inner_dims
j <- 1
while (j <= prod(sub_array_dims)) {
# Work out file path.
file_to_load_sub_indices <- which(sub_array_of_files_to_load == j, arr.ind = TRUE)[1, ]
names(file_to_load_sub_indices) <- names(sub_array_dims)
file_to_load_sub_indices[found_pattern_dim] <- i
big_dims <- rep(1, length(dim(array_of_files_to_load)))
names(big_dims) <- names(dim(array_of_files_to_load))
file_to_load_indices <- .MergeArrayDims(file_to_load_sub_indices, big_dims)[[1]]
file_to_load <- do.call('[[', c(list(array_of_files_to_load),
as.list(file_to_load_indices)))
not_found_file <- do.call('[[', c(list(array_of_not_found_files),
as.list(file_to_load_indices)))
load_file_metadata <- do.call('[', c(list(array_of_metadata_flags),
as.list(file_to_load_indices)))
if (load_file_metadata) {
metadata_file_counter <- metadata_file_counter + 1
}
if (!is.na(file_to_load) && !not_found_file) {
# Work out indices to take
first_round_indices <- lapply(inner_dims,
function (x) {
if (is.null(file_dim_across_files[[x]])) {
selectors[[x]][['fri']][[1]]
} else {
which_chunk <- file_to_load_sub_indices[file_dim_across_files[[x]]]
selectors[[x]][['fri']][[which_chunk]]
}
})
names(first_round_indices) <- inner_dims
second_round_indices <- lapply(inner_dims,
function (x) {
if (is.null(file_dim_across_files[[x]])) {
selectors[[x]][['sri']][[1]]
} else {
which_chunk <- file_to_load_sub_indices[file_dim_across_files[[x]]]
selectors[[x]][['sri']][[which_chunk]]
}
})
if (debug) {
print("-> BUILDING A WORK PIECE")
#print(str(selectors))
}
names(second_round_indices) <- inner_dims
if (!any(sapply(first_round_indices, length) == 0)) {
work_piece <- list()
work_piece[['first_round_indices']] <- first_round_indices
work_piece[['second_round_indices']] <- second_round_indices
work_piece[['file_indices_in_array_of_files']] <- file_to_load_indices
work_piece[['file_path']] <- file_to_load
work_piece[['store_dims']] <- final_dims
# Work out store position
store_position <- final_dims
store_position[names(file_to_load_indices)] <- file_to_load_indices
store_position[inner_dims] <- rep(1, length(inner_dims))
work_piece[['store_position']] <- store_position
# Work out file selectors
file_selectors <- sapply(file_dims,
function (x) {
vector_to_pick <- 1
if (x %in% names(depending_file_dims)) {
vector_to_pick <- file_to_load_indices[depending_file_dims[[x]]]
}
selectors[file_dims][[x]][[vector_to_pick]][file_to_load_indices[x]]
})
names(file_selectors) <- file_dims
work_piece[['file_selectors']] <- file_selectors
# Send variables for transformation
if (!is.null(transform) && (length(transform_vars) > 0)) {
vars_to_transform <- NULL
picked_vars_to_transform <- which(names(picked_vars[[i]]) %in% transform_vars)
if (length(picked_vars_to_transform) > 0) {
picked_vars_to_transform <- names(picked_vars[[i]])[picked_vars_to_transform]
vars_to_transform <- c(vars_to_transform, picked_vars[[i]][picked_vars_to_transform])
if (any(picked_vars_to_transform %in% names(picked_vars_ordered[[i]]))) {
picked_vars_ordered_to_transform <- picked_vars_to_transform[which(picked_vars_to_transform %in% names(picked_vars_ordered[[i]]))]
vars_to_transform[picked_vars_ordered_to_transform] <- picked_vars_ordered[[i]][picked_vars_ordered_to_transform]
}
}
picked_common_vars_to_transform <- which(names(picked_common_vars) %in% transform_vars)
if (length(picked_common_vars_to_transform) > 0) {
picked_common_vars_to_transform <- names(picked_common_vars)[picked_common_vars_to_transform]
vars_to_transform <- c(vars_to_transform, picked_common_vars[picked_common_vars_to_transform])
if (any(picked_common_vars_to_transform %in% names(picked_common_vars_ordered))) {
picked_common_vars_ordered_to_transform <- picked_common_vars_to_transform[which(picked_common_vars_to_transform %in% names(picked_common_vars_ordered))]
vars_to_transform[picked_common_vars_ordered_to_transform] <- picked_common_vars_ordered[picked_common_vars_ordered_to_transform]
}
}
work_piece[['vars_to_transform']] <- vars_to_transform
}
# Send flag to load metadata
if (load_file_metadata) {
work_piece[['save_metadata_in']] <- paste0(metadata_folder, '/', metadata_file_counter)
}
work_pieces <- c(work_pieces, list(work_piece))
}
}
j <- j + 1
}
}
}
#print("N")
if (debug) {
print("-> WORK PIECES BUILT")
}
# Calculate the progress %s that will be displayed and assign them to
# the appropriate work pieces.
if (length(work_pieces) / num_procs >= 2 && !silent) {
if (length(work_pieces) / num_procs < 10) {
amount <- 100 / ceiling(length(work_pieces) / num_procs)
reps <- ceiling(length(work_pieces) / num_procs)
} else {
amount <- 10
reps <- 10
}
progress_steps <- rep(amount, reps)
if (length(work_pieces) < (reps + 1)) {
selected_pieces <- length(work_pieces)
progress_steps <- c(sum(head(progress_steps, reps)),
tail(progress_steps, reps))
} else {
selected_pieces <- round(seq(1, length(work_pieces),
length.out = reps + 1))[-1]
}
progress_steps <- paste0(' + ', round(progress_steps, 2), '%')
progress_message <- 'Progress: 0%'
} else {
progress_message <- ''
selected_pieces <- NULL
}
piece_counter <- 1
step_counter <- 1
work_pieces <- lapply(work_pieces,
function (x) {
if (piece_counter %in% selected_pieces) {
wp <- c(x, list(progress_amount = progress_steps[step_counter]))
step_counter <<- step_counter + 1
} else {
wp <- x
}
piece_counter <<- piece_counter + 1
wp
})
if (!silent) {
.message("If the size of the requested data is close to or above the free shared RAM memory, R may crash.")
.message("If the size of the requested data is close to or above the half of the free RAM memory, R may crash.")
.message(paste0("Will now proceed to read and process ", length(work_pieces), " data files:"))
if (length(work_pieces) < 30) {
lapply(work_pieces, function (x) .message(x[['file_path']], indent = 2))
} else {
.message("The list of files is long. You can check it after Start() finishes in the output '$Files'.", indent = 2, exdent = 5)
}
}
# Build the cluster of processes that will do the work and dispatch work pieces.
# The function .LoadDataFile is applied to each work piece. This function will
# open the data file, regrid if needed, subset, apply the mask,
# compute and apply the weights if needed,
# disable extreme values and store in the shared memory matrix.
#print("O")
if (!silent) {
.message("Loading... This may take several minutes...")
if (progress_message != '') {
.message(progress_message, appendLF = FALSE)
}
}
if (num_procs == 1) {
found_files <- lapply(work_pieces, .LoadDataFile,
shared_matrix_pointer = shared_matrix_pointer,
file_data_reader = file_data_reader,
synonims = synonims,
transform = transform,
transform_params = transform_params,
silent = silent, debug = debug)
} else {
cluster <- makeCluster(num_procs, outfile = "")
# Send the heavy work to the workers
work_errors <- try({
found_files <- clusterApplyLB(cluster, work_pieces, .LoadDataFile,
shared_matrix_pointer = shared_matrix_pointer,
file_data_reader = file_data_reader,
synonims = synonims,
transform = transform,
transform_params = transform_params,
silent = silent, debug = debug)
})
stopCluster(cluster)
}
if (!silent) {
if (progress_message != '') {
.message("\n", tag = '')
}
}
#print("P")
data_array <- array(bigmemory::as.matrix(data_array), dim = final_dims_fake)
gc()
# Load metadata and remove the metadata folder
if (!is.null(metadata_dims)) {
loaded_metadata_files <- list.files(metadata_folder)
loaded_metadata <- lapply(paste0(metadata_folder, '/', loaded_metadata_files), readRDS)
unlink(metadata_folder, recursive = TRUE)
return_metadata <- vector('list', length = prod(dim(array_of_metadata_flags)[metadata_dims]))
return_metadata[as.numeric(loaded_metadata_files)] <- loaded_metadata
dim(return_metadata) <- dim(array_of_metadata_flags[metadata_dims])
attr(data_array, 'Variables') <- return_metadata
# TODO: Try to infer data type from loaded_metadata
# as.integer(data_array)
}
failed_pieces <- work_pieces[which(unlist(found_files))]
for (failed_piece in failed_pieces) {
array_of_not_found_files <- do.call('[<-',
c(list(array_of_not_found_files),
as.list(failed_piece[['file_indices_in_array_of_files']]),
list(value = TRUE)))
}
if (any(array_of_not_found_files)) {
for (i in 1:prod(dim(array_of_files_to_load))) {
if (is.na(array_of_not_found_files[i])) {
array_of_files_to_load[i] <- NA
} else {
if (array_of_not_found_files[i]) {
array_of_not_found_files[i] <- array_of_files_to_load[i]
array_of_files_to_load[i] <- NA
} else {
array_of_not_found_files[i] <- NA
}
}
}
} else {
array_of_not_found_files <- NULL
}
} # End if (retrieve)
# Replace the vars and common vars by the transformed vars and common vars
for (i in 1:length(dat)) {
if (length(names(transformed_vars[[i]])) > 0) {
picked_vars[[i]][names(transformed_vars[[i]])] <- transformed_vars[[i]]
} else if (length(names(picked_vars_ordered[[i]])) > 0) {
picked_vars[[i]][names(picked_vars_ordered[[i]])] <- picked_vars_ordered[[i]]
}
}
if (length(names(transformed_common_vars)) > 0) {
picked_common_vars[names(transformed_common_vars)] <- transformed_common_vars
} else if (length(names(picked_common_vars_ordered)) > 0) {
picked_common_vars[names(picked_common_vars_ordered)] <- picked_common_vars_ordered
}
if (debug) {
print("-> THE TRANSFORMED VARS:")
print(str(transformed_vars))
print("-> THE PICKED VARS:")
print(str(picked_vars))
}
file_selectors <- NULL
for (i in 1:length(dat)) {
file_selectors[[dat[[i]][['name']]]] <- dat[[i]][['selectors']][which(names(dat[[i]][['selectors']]) %in% found_file_dims[[i]])]
}
if (retrieve) {
if (!silent) {
.message("Successfully retrieved data.")
}
var_backup <- attr(data_array, 'Variables')[[1]]
attr(data_array, 'Variables') <- NULL
attributes(data_array) <- c(attributes(data_array),
list(Variables = c(list(common = c(picked_common_vars, var_backup)),
picked_vars),
Files = array_of_files_to_load,
NotFoundFiles = array_of_not_found_files,
FileSelectors = file_selectors,
PatternDim = found_pattern_dim)
)
attr(data_array, 'class') <- c('startR_cube', attr(data_array, 'class'))
data_array
} else {
if (!silent) {
.message("Successfully discovered data dimensions.")
}
start_call <- match.call()
for (i in 2:length(start_call)) {
if (class(start_call[[i]]) %in% c('name', 'call')) {
start_call[[i]] <- eval.parent(start_call[[i]])
}
}
start_call[['retrieve']] <- TRUE
attributes(start_call) <- c(attributes(start_call),
list(Dimensions = final_dims_fake,
Variables = c(list(common = picked_common_vars), picked_vars),
ExpectedFiles = array_of_files_to_load,
FileSelectors = file_selectors,
PatternDim = found_pattern_dim,
MergedDims = if (merge_across_dims) {
inner_dims_across_files
} else {
NULL
},
SplitDims = if (split_multiselected_dims) {
all_split_dims
} else {
NULL
})
)
attr(start_call, 'class') <- c('startR_header', attr(start_call, 'class'))
start_call
}
}
# This function is responsible for loading the data of each work
# piece.
.LoadDataFile <- function(work_piece, shared_matrix_pointer,
file_data_reader, synonims,
transform, transform_params,
silent = FALSE, debug = FALSE) {
# Load the data of a single work piece (one data file) into the shared
# memory matrix referenced by 'shared_matrix_pointer'.
#
# work_piece: list describing the piece: 'file_path', 'store_dims',
#   'store_position', 'first_round_indices', 'second_round_indices',
#   'file_selectors', and optionally 'vars_to_transform',
#   'save_metadata_in' and 'progress_amount'.
# shared_matrix_pointer: big.matrix descriptor to attach to in this
#   (possibly worker) process.
# file_data_reader: function(path, NULL, file_selectors, inner_indices,
#   synonims) that reads the requested hyperslab from the file; returns
#   NULL if the file could not be read.
# synonims: dimension/variable name synonyms, forwarded to the reader.
# transform, transform_params: optional transform (e.g. interpolation)
#   applied to the read array, plus its extra parameters.
# Returns TRUE if the file could not be read (the caller uses this to
# flag not-found files), FALSE otherwise.
# suppressPackageStartupMessages({library(bigmemory)})
### TODO: Specify dependencies as parameter
# suppressPackageStartupMessages({library(ncdf4)})
#print("1")
store_indices <- as.list(work_piece[['store_position']])
first_round_indices <- work_piece[['first_round_indices']]
second_round_indices <- work_piece[['second_round_indices']]
#print("2")
file_to_open <- work_piece[['file_path']]
# Read the sub-array from the file, subsetting at read time with the
# first round of indices.
sub_array <- file_data_reader(file_to_open, NULL,
work_piece[['file_selectors']],
first_round_indices, synonims)
if (debug) {
if (all(unlist(store_indices[1:6]) == 1)) {
print("-> LOADING A WORK PIECE")
print("-> STRUCTURE OF READ UNTRANSFORMED DATA:")
print(str(sub_array))
print("-> STRUCTURE OF VARIABLES TO TRANSFORM:")
print(str(work_piece[['vars_to_transform']]))
print("-> COMMON ARRAY DIMENSIONS:")
print(str(work_piece[['store_dims']]))
}
}
if (!is.null(sub_array)) {
# Apply data transformation once we have the data arrays.
if (!is.null(transform)) {
if (debug) {
if (all(unlist(store_indices[1:6]) == 1)) {
print("-> PROCEEDING TO TRANSFORM ARRAY")
print("-> DIMENSIONS OF ARRAY RIGHT BEFORE TRANSFORMING:")
print(dim(sub_array))
}
}
sub_array <- do.call(transform, c(list(data_array = sub_array,
variables = work_piece[['vars_to_transform']],
file_selectors = work_piece[['file_selectors']]),
transform_params))
if (debug) {
if (all(unlist(store_indices[1:6]) == 1)) {
print("-> STRUCTURE OF ARRAY AND VARIABLES RIGHT AFTER TRANSFORMING:")
print(str(sub_array))
print("-> DIMENSIONS OF ARRAY RIGHT AFTER TRANSFORMING:")
print(dim(sub_array$data_array))
}
}
sub_array <- sub_array$data_array
# Subset with second round of indices. These are defined on the
# transformed grid, hence only applied inside this branch.
dims_to_crop <- which(!sapply(second_round_indices, is.null))
if (length(dims_to_crop) > 0) {
dimnames_to_crop <- names(second_round_indices)[dims_to_crop]
sub_array <- Subset(sub_array, dimnames_to_crop,
second_round_indices[dimnames_to_crop])
}
if (debug) {
if (all(unlist(store_indices[1:6]) == 1)) {
print("-> STRUCTURE OF ARRAY AND VARIABLES RIGHT AFTER SUBSETTING WITH 2nd ROUND INDICES:")
print(str(sub_array))
}
}
}
metadata <- attr(sub_array, 'variables')
names_bk <- names(store_indices)
# Work out, for each inner dimension, which positions of the final store
# array this sub-array covers.
store_indices <- lapply(names(store_indices),
function (x) {
if (!(x %in% names(first_round_indices))) {
store_indices[[x]]
} else if (is.null(second_round_indices[[x]])) {
1:dim(sub_array)[x]
} else {
if (is.numeric(second_round_indices[[x]])) {
## TODO: Review carefully this line. Inner indices are all
## aligned to the left-most positions. If dataset A has longitudes
## 1, 2, 3, 4 but dataset B has only longitudes 3 and 4, then
## they will be stored as follows:
## 1, 2, 3, 4
## 3, 4, NA, NA
##x - min(x) + 1
1:length(second_round_indices[[x]])
} else {
1:length(second_round_indices[[x]])
}
}
})
names(store_indices) <- names_bk
if (debug) {
if (all(unlist(store_indices) == 1)) {
print("-> STRUCTURE OF FIRST ROUND INDICES FOR THIS WORK PIECE:")
print(str(first_round_indices))
print("-> STRUCTURE OF SECOND ROUND INDICES FOR THIS WORK PIECE:")
print(str(second_round_indices))
print("-> STRUCTURE OF STORE INDICES FOR THIS WORK PIECE:")
print(str(store_indices))
}
}
store_indices <- lapply(store_indices, as.integer)
store_dims <- work_piece[['store_dims']]
# split the storage work of the loaded subset in parts
largest_dim_name <- names(dim(sub_array))[which.max(dim(sub_array))]
max_parts <- length(store_indices[[largest_dim_name]])
# Indexing a data file of N MB with expand.grid takes 30*N MB
# The peak ram of Start is, minimum, 2 * total data to load from all files
# due to inefficiencies in other regions of the code
# The more parts we split the indexing done below in, the lower
# the memory footprint of the indexing and the faster it is.
# But more than 10 indexing iterations (parts) for each MB processed
# makes the iteration slower (tested empirically on BSC workstations).
subset_size_in_mb <- prod(dim(sub_array)) * 8 / 1024 / 1024
best_n_parts <- ceiling(subset_size_in_mb * 10)
# We want to set n_parts to a greater value than the one that would
# result in a memory footprint (of the subset indexing code below) equal
# to 2 * total data to load from all files.
# s = subset size in MB
# p = number of parts to break it in
# T = total size of data to load
# then, s / p * 30 = 2 * T
# then, p = s * 15 / T
min_n_parts <- ceiling(prod(dim(sub_array)) * 15 / prod(store_dims))
# Make sure we pick n_parts much greater than the minimum calculated
n_parts <- min_n_parts * 10
if (n_parts > best_n_parts) {
n_parts <- best_n_parts
}
# Boundary checks
if (n_parts < 1) {
n_parts <- 1
}
if (n_parts > max_parts) {
n_parts <- max_parts
}
if (n_parts > 1) {
# Split the positions along the largest dimension into n roughly
# equal consecutive groups.
make_parts <- function(length, n) {
clusters <- cut(1:length, n, labels = FALSE)
lapply(1:n, function(y) which(clusters == y))
}
part_indices <- make_parts(max_parts, n_parts)
parts <- lapply(part_indices,
function(x) {
store_indices[[largest_dim_name]][x]
})
} else {
part_indices <- list(1:max_parts)
parts <- store_indices[largest_dim_name]
}
# do the storage work
# weights[i] is the product of the sizes of the dimensions preceding i,
# i.e. the stride of dimension i in the column-major linearization of
# the store array.
weights <- sapply(1:length(store_dims),
function(i) prod(c(1, store_dims)[1:i]))
part_indices_in_sub_array <- as.list(rep(TRUE, length(dim(sub_array))))
names(part_indices_in_sub_array) <- names(dim(sub_array))
# Attach to the shared memory matrix from this process.
data_array <- bigmemory::attach.big.matrix(shared_matrix_pointer)
for (i in 1:n_parts) {
store_indices[[largest_dim_name]] <- parts[[i]]
# Converting array indices to vector indices
matrix_indices <- do.call("expand.grid", store_indices)
# Given a matrix where each row is a set of array indices of an element
# the vector indices are computed
matrix_indices <- 1 + colSums(t(matrix_indices - 1) * weights)
part_indices_in_sub_array[[largest_dim_name]] <- part_indices[[i]]
data_array[matrix_indices] <- as.vector(do.call('[',
c(list(x = sub_array),
part_indices_in_sub_array)))
}
rm(data_array)
gc()
if (!is.null(work_piece[['save_metadata_in']])) {
# Persist the file metadata for the main process to collect afterwards.
saveRDS(metadata, file = work_piece[['save_metadata_in']])
}
}
if (!is.null(work_piece[['progress_amount']]) && !silent) {
# Emit the progress tag assigned to this work piece, if any.
message(work_piece[['progress_amount']], appendLF = FALSE)
}
# TRUE means the file could not be read; flagged as not found upstream.
is.null(sub_array)
}
Step <- function(fun, target_dims, output_dims,
                 use_libraries = NULL, use_attributes = NULL) {
  # Build a startR step function: annotates 'fun' with the dimensions it
  # consumes ('target_dims') and produces ('output_dims'), plus optional
  # libraries to load and attributes to forward when the step is run.
  #
  # fun: the function to be applied to chunks of data.
  # target_dims: character vector (single input) or named list of
  #   character vectors (one per data array input of 'fun').
  # output_dims: character vector, NULL, or a list thereof, with the
  #   dimensions of each output of 'fun' (NULL = dimensionless output).
  # use_libraries: optional character vector of package names.
  # use_attributes: optional list of character vectors (or of lists of
  #   character vectors) naming attributes to make available to 'fun'.
  # Returns 'fun' with class 'startR_step_fun' and the metadata attached
  # as attributes 'TargetDims', 'OutputDims', 'UseLibraries',
  # 'UseAttributes'.

  # Check fun
  if (!is.function(fun)) {
    stop("Parameter 'fun' must be a function.")
  }
  # Check target_dims: normalize a single character vector to a
  # one-entry list, then validate every entry.
  if (is.character(target_dims)) {
    target_dims <- list(target_dims)
    names(target_dims) <- 'input1'
  }
  if (is.list(target_dims)) {
    lapply(target_dims,
           function(x) {
             if (!(is.character(x) && (length(x) > 0))) {
               stop("Parameter 'target_dims' must be one or a list of vectors ",
                    "of target dimension names for each data array input in ",
                    "the function 'fun'.")
             }
           })
    if (is.null(names(target_dims))) {
      names(target_dims) <- paste0('input', seq_along(target_dims))
    }
  } else {
    # Fix: values that are neither character nor list used to be
    # silently accepted without validation.
    stop("Parameter 'target_dims' must be one or a list of vectors ",
         "of target dimension names for each data array input in ",
         "the function 'fun'.")
  }
  # Check output_dims: NULL stands for an output with no dimensions.
  if (is.character(output_dims) || is.null(output_dims)) {
    output_dims <- list(output_dims)
    names(output_dims) <- 'output1'
  }
  if (is.list(output_dims)) {
    lapply(output_dims,
           function(x) {
             if (!(is.character(x) || is.null(x))) {
               # Fix: error text previously said "target dimension names"
               # (copy/paste from the target_dims check).
               stop("Parameter 'output_dims' must be one or a list of vectors ",
                    "of output dimension names for each data array output of ",
                    "the function 'fun'.")
             }
           })
    if (is.null(names(output_dims))) {
      names(output_dims) <- paste0('output', seq_along(output_dims))
    }
  } else {
    stop("Parameter 'output_dims' must be one or a list of vectors ",
         "of output dimension names for each data array output of ",
         "the function 'fun'.")
  }
  # Check use_libraries
  if (!is.null(use_libraries)) {
    if (!is.character(use_libraries)) {
      stop("Parameter 'use_libraries' must be a vector of character ",
           "strings.")
    }
  }
  # Check use_attributes: a list whose entries are character vectors or
  # lists of character vectors.
  if (!is.null(use_attributes)) {
    raise_error <- FALSE
    if (!is.list(use_attributes)) {
      raise_error <- TRUE
    }
    if (!all(sapply(use_attributes,
                    function(x) {
                      is.character(x) ||
                        (is.list(x) && all(sapply(x, is.character)))
                    }))) {
      raise_error <- TRUE
    }
    if (raise_error) {
      stop("Parameter 'use_attributes' must be a list of vectors of ",
           "character strings or of lists of vectors of character ",
           "strings.")
    }
  }
  attr(fun, 'TargetDims') <- target_dims
  attr(fun, 'OutputDims') <- output_dims
  attr(fun, 'UseLibraries') <- use_libraries
  attr(fun, 'UseAttributes') <- use_attributes
  # TODO: Add provenance info
  class(fun) <- 'startR_step_fun'
  fun
}
Subset <- function(x, along, indices, drop = FALSE) {
  # Take a subset of array 'x' along the dimensions in 'along' (given by
  # name or position), using 'indices' (one vector of indices per entry
  # in 'along'). 'drop' controls removal of length-1 dimensions:
  #   TRUE / 'all':     drop all length-1 dimensions (base R behaviour)
  #   'selected':       drop only subsetted dimensions where exactly one
  #                     index was requested
  #   'non-selected':   drop only length-1 dimensions not in 'along'
  #   FALSE / 'none':   keep all dimensions
  # Dimension names (names(dim(x)) or the 'dimensions' attribute) are
  # preserved and adjusted to match the kept dimensions.
  # Check x
  if (!is.array(x)) {
    stop("Input array 'x' must be a numeric array.")
  }
  # Take the input array dimension names, either from the 'dimensions'
  # attribute or from names(dim(x)).
  dim_names <- attr(x, 'dimensions')
  if (!is.character(dim_names)) {
    dim_names <- names(dim(x))
  }
  if (!is.character(dim_names)) {
    if (any(sapply(along, is.character))) {
      stop("The input array 'x' doesn't have labels for the dimensions but the parameter 'along' contains dimension names.")
    }
  }
  # Check along
  if (any(sapply(along, function(x) !is.numeric(x) && !is.character(x)))) {
    stop("All provided dimension indices in 'along' must be integers or character strings.")
  }
  if (any(sapply(along, is.character))) {
    req_dimnames <- along[which(sapply(along, is.character))]
    if (length(unique(req_dimnames)) < length(req_dimnames)) {
      stop("The parameter 'along' must not contain repeated dimension names.")
    }
    # Translate dimension names in 'along' into dimension positions.
    along[which(sapply(along, is.character))] <- match(req_dimnames, dim_names)
    if (any(is.na(along))) {
      # Fix: the message previously referenced 'indices', but the names
      # being matched come from 'along'.
      stop("Could not match all dimension names in 'along' with dimension names in input array 'x'.")
    }
    along <- as.numeric(along)
  }
  # Check indices
  if (!is.list(indices)) {
    indices <- list(indices)
  }
  # Check parameter drop: translate the character modes into a logical
  # 'drop' for `[` plus an explicit list of dimensions to drop afterwards.
  dims_to_drop <- c()
  if (is.character(drop)) {
    if (drop == 'all') {
      drop <- TRUE
    } else if (any(drop %in% c('selected', 'non-selected', 'none'))) {
      if (drop == 'selected') {
        dims_to_drop <- along[which(sapply(indices, length) == 1)]
      } else if (drop == 'non-selected') {
        dims_to_drop <- dim(x) == 1
        dims_to_drop[along] <- FALSE
        dims_to_drop <- which(dims_to_drop)
      }
      drop <- FALSE
    } else {
      stop("Parameter 'drop' must be one of TRUE, FALSE, 'all', 'selected', 'non-selected', 'none'.")
    }
  } else if (!is.logical(drop)) {
    stop("Parameter 'drop' must be one of TRUE, FALSE, 'all', 'selected', 'non-selected', 'none'.")
  }
  # Take the subset: build the call x[TRUE, ..., indices, ..., TRUE].
  nd <- length(dim(x))
  index <- as.list(rep(TRUE, nd))
  index[along] <- indices
  subset <- eval(as.call(c(as.name("["), as.name("x"), index, drop = drop)))
  # If dropped all dimensions, need to drop dimnames too
  if (is.character(dim_names) && drop == TRUE) {
    dim_names_to_remove <- unique(c(along[which(sapply(indices, length) == 1)],
                                    which(dim(x) == 1)))
    if (length(dim_names_to_remove) > 0) {
      dim_names <- dim_names[-dim_names_to_remove]
    }
  }
  # Amend the final dimensions and put dimnames and attributes
  metadata <- attributes(x)
  metadata[['dim']] <- dim(subset)
  if (length(dims_to_drop) > 0) {
    metadata[['dim']] <- metadata[['dim']][-dims_to_drop]
    if (is.character(dim_names)) {
      names(metadata[['dim']]) <- dim_names[-dims_to_drop]
      if ('dimensions' %in% names(attributes(x))) {
        metadata[['dimensions']] <- dim_names[-dims_to_drop]
      }
    }
    if (length(metadata[['dim']]) == 0) {
      # All dimensions dropped: result is a plain vector.
      metadata['dim'] <- list(NULL)
      metadata['dimensions'] <- list(NULL)
    }
  } else if (is.character(dim_names)) {
    names(metadata[['dim']]) <- dim_names
    if ('dimensions' %in% names(attributes(x))) {
      metadata[['dimensions']] <- dim_names
    }
  }
  attributes(subset) <- metadata
  subset
}
indices <- function(x) {
  # Tag 'x' so Start() interprets it as dimension positions (indices)
  # rather than coordinate values; defaults to a single chunk.
  structure(x,
            indices = TRUE,
            values = FALSE,
            chunk = c(chunk = 1, n_chunks = 1))
}
values <- function(x) {
  # Tag 'x' so Start() interprets it as coordinate values rather than
  # dimension positions; defaults to a single chunk.
  structure(x,
            indices = FALSE,
            values = TRUE,
            chunk = c(chunk = 1, n_chunks = 1))
}
chunk <- function(chunk, n_chunks, selectors) {
# Tag 'selectors' so that only chunk number 'chunk' out of 'n_chunks'
# pieces of it is used. For scalar chunk specifications this only
# annotates the object (the actual split is applied later in Start());
# for multidimensional selector value arrays the split is applied here
# directly, with one (chunk, n_chunks) pair per dimension.
if (any(chunk > n_chunks)) {
stop("Requested chunk index out of bounds.")
}
if (length(chunk) == 1 && length(n_chunks) == 1) {
if (!is.null(attr(selectors, 'chunk'))) {
# Re-chunking an already chunked selector: compose both chunkings
# into a single equivalent (chunk, n_chunks) pair.
attr(selectors, 'chunk') <- c((attr(selectors, 'chunk')['chunk'] - 1) * n_chunks +
chunk,
attr(selectors, 'chunk')['n_chunks'] * n_chunks)
} else {
attr(selectors, 'chunk') <- c(chunk = unname(chunk), n_chunks = unname(n_chunks))
}
} else {
# Chunking arrays of multidimensional selectors.
# This should be done in Start.R but implies modifications.
if (length(chunk) != length(n_chunks)) {
stop("Wrong chunk specification.")
}
if (!attr(selectors, 'values')) {
stop("Multidimensional chunking only available when selector ",
"values provided.")
}
if (is.null(dim(selectors))) {
stop("Multidimensional chunking only available when multidimensional ",
"selector values provided.")
}
if (length(dim(selectors)) != length(chunk)) {
stop("As many chunk indices and chunk lengths as dimensions in the ",
"multidimensional selector array must be specified.")
}
# Back up the tagging attributes and restore them after subsetting.
old_indices <- attr(selectors, 'indices')
old_values <- attr(selectors, 'values')
selectors <- Subset(selectors, 1:length(chunk),
lapply(1:length(chunk),
function(x) {
# Chunk sizes along each dimension differ by at most one; when
# the dimension is not evenly divisible, the first chunks
# absorb the remainder.
n_indices <- dim(selectors)[x]
chunk_sizes <- rep(floor(n_indices / n_chunks[x]), n_chunks[x])
chunks_to_extend <- n_indices - chunk_sizes[1] * n_chunks[x]
if (chunks_to_extend > 0) {
chunk_sizes[1:chunks_to_extend] <- chunk_sizes[1:chunks_to_extend] + 1
}
chunk_size <- chunk_sizes[chunk[x]]
offset <- 0
if (chunk[x] > 1) {
offset <- sum(chunk_sizes[1:(chunk[x] - 1)])
}
1:chunk_sizes[chunk[x]] + offset
}))
attr(selectors, 'indices') <- old_indices
attr(selectors, 'values') <- old_values
}
selectors
}
.ReplaceVariablesInString <- function(string, replace_values, allow_undefined_key_vars = FALSE) {
  # Substitute every $TAG$ occurrence in 'string' with its value from
  # 'replace_values' (recursively, since replacement values may contain
  # tags themselves). Unknown tags raise an error unless
  # 'allow_undefined_key_vars' is TRUE, in which case they are kept
  # verbatim as $TAG$.
  fragments <- strsplit(string, "\\$")[[1]]
  if (length(fragments) <= 1) {
    # No '$' delimiters: nothing to substitute.
    return(string)
  }
  result <- ""
  inside_tag <- FALSE
  for (fragment in fragments) {
    if (!inside_tag) {
      # Literal text between tags.
      result <- paste0(result, fragment)
    } else if (fragment %in% names(replace_values)) {
      result <- paste0(result,
                       .ReplaceVariablesInString(replace_values[[fragment]],
                                                 replace_values,
                                                 allow_undefined_key_vars))
    } else if (allow_undefined_key_vars) {
      result <- paste0(result, "$", fragment, "$")
    } else {
      stop(paste('Error: The variable $', fragment, '$ was not defined in the configuration file.', sep = ''))
    }
    inside_tag <- !inside_tag
  }
  result
}
.ReplaceGlobExpressions <- function(path_with_globs, actual_path,
replace_values, tags_to_keep,
dataset_name, permissive) {
# The goal of this function is to replace the shell globbing expressions in
# a path pattern (that may contain shell globbing expressions and Load()
# tags) by the corresponding part of the real existing path.
# What is done actually is to replace all the values of the tags in the
# actual path by the corresponding $TAG$
#
# It takes mainly two inputs. The path with expressions and tags, e.g.:
# /data/experiments/*/$EXP_NAME$/$VAR_NAME$/$VAR_NAME$_*$START_DATE$*.nc
# and a complete known path to one of the matching files, e.g.:
# /data/experiments/ecearth/i00k/tos/tos_fc0-1_19901101_199011-199110.nc
# and it returns the path pattern but without shell globbing expressions:
# /data/experiments/ecearth/$EXP_NAME$/$VAR_NAME$/$VAR_NAME$_fc0-1_$START_DATE$_199011-199110.nc
#
# To do that, it needs also as inputs the list of replace values (the
# association of each tag to their value).
#
# All the tags not present in the parameter tags_to_keep will be replaced.
#
# Not all cases can be resolved with the implemented algorithm. In an
# unsolvable case a warning is given and one possible guess is returned.
#
# In some cases it is interesting to replace only the expressions in the
# path to the file, but not the ones in the file name itself. To keep the
# expressions in the file name, the parameter permissive can be set to
# TRUE. To replace all the expressions it can be set to FALSE.
# Tests
#a <- "/esarchive/exp/ecearth/a13c/3hourly/$var$_*/$var$_*-LR_historical_r1i1p1f1_gr_$chunk$.nc"
#b <- "/esarchive/exp/ecearth/a13c/3hourly/psl_f6h/psl_E3hrPt_EC-Earth3-LR_historical_r1i1p1f1_gr_195001010000-195001312100.nc"
#c <- list(dat = 'dat1', var = 'psl', chunk = '195001010000-195001312100')
#d <- c('dat', 'var', 'chunk')
#e <- 'dat1'
#f <- FALSE #TRUE/0/1/2/3
#r <- .ReplaceGlobExpressions(a, b, c, d, e, f)
clean <- function(x) {
if (nchar(x) > 0) {
x <- gsub('\\\\', '', x)
x <- gsub('\\^', '', x)
x <- gsub('\\$', '', x)
x <- unname(sapply(strsplit(x, '[',fixed = TRUE)[[1]], function(y) gsub('.*]', '.', y)))
do.call(paste0, as.list(x))
} else {
x
}
}
strReverse <- function(x) sapply(lapply(strsplit(x, NULL), rev), paste, collapse = "")
if (permissive == 0) {
permissive <- FALSE
} else {
if (permissive == TRUE) {
permissive_levels <- 1
} else {
permissive_levels <- round(permissive[1])
permissive <- TRUE
}
}
if (permissive) {
actual_path_chunks <- strsplit(actual_path, '/')[[1]]
if (permissive_levels >= length(actual_path_chunks)) {
stop("Error: Provided levels out of scope in parameter 'permissive'.")
}
permissive_levels <- 1:permissive_levels
permissive_levels <- length(actual_path_chunks) - (rev(permissive_levels) - 1)
actual_path <- paste(actual_path_chunks[-permissive_levels], collapse = '/')
file_name <- paste(actual_path_chunks[permissive_levels], collapse = '/')
if (length(actual_path_chunks) > 1) {
file_name <- paste0('/', file_name)
}
path_with_globs_chunks <- strsplit(path_with_globs, '/')[[1]]
path_with_globs <- paste(path_with_globs_chunks[-permissive_levels],
collapse = '/')
path_with_globs_no_tags <- .ReplaceVariablesInString(path_with_globs, replace_values)
file_name_with_globs <- paste(path_with_globs_chunks[permissive_levels], collapse = '/')
if (length(path_with_globs_chunks) > 1) {
file_name_with_globs <- paste0('/', file_name_with_globs)
}
right_known <- head(strsplit(file_name_with_globs, '*', fixed = TRUE)[[1]], 1)
right_known_no_tags <- .ReplaceVariablesInString(right_known, replace_values)
path_with_globs_no_tags_rx <- utils::glob2rx(paste0(path_with_globs_no_tags, right_known_no_tags))
match <- regexpr(gsub('$', '', path_with_globs_no_tags_rx, fixed = TRUE), paste0(actual_path, file_name))
if (match != 1) {
stop("Incorrect parameters to replace glob expressions. The path with expressions does not match the actual path.")
}
#if (attr(match, 'match.length') - nchar(right_known_no_tags) < nchar(actual_path)) {
# path_with_globs_no_tags <- paste0(path_with_globs_no_tags, right_known_no_tags, '*')
# file_name_with_globs <- sub(right_known, '/*', file_name_with_globs)
#}
}
path_with_globs_rx <- utils::glob2rx(path_with_globs)
values_to_replace <- c()
tags_to_replace_starts <- c()
tags_to_replace_ends <- c()
give_warning <- FALSE
for (tag in tags_to_keep) {
matches <- gregexpr(paste0('$', tag, '$'), path_with_globs_rx, fixed = TRUE)[[1]]
lengths <- attr(matches, 'match.length')
if (!(length(matches) == 1 && matches[1] == -1)) {
for (i in 1:length(matches)) {
left <- NULL
if (matches[i] > 1) {
left <- .ReplaceVariablesInString(substr(path_with_globs_rx, 1, matches[i] - 1), replace_values)
left_known <- strReverse(head(strsplit(strReverse(left), strReverse('.*'), fixed = TRUE)[[1]], 1))
}
right <- NULL
if ((matches[i] + lengths[i] - 1) < nchar(path_with_globs_rx)) {
right <- .ReplaceVariablesInString(substr(path_with_globs_rx, matches[i] + lengths[i], nchar(path_with_globs_rx)), replace_values)
right_known <- head(strsplit(right, '.*', fixed = TRUE)[[1]], 1)
}
final_match <- NULL
match_limits <- NULL
if (!is.null(left)) {
left_match <- regexpr(paste0(left, replace_values[[tag]], right_known), actual_path)
match_len <- attr(left_match, 'match.length')
left_match_limits <- c(left_match + match_len - 1 - nchar(clean(right_known)) - nchar(replace_values[[tag]]) + 1,
left_match + match_len - 1 - nchar(clean(right_known)))
if (!(left_match < 1)) {
match_limits <- left_match_limits
}
}
right_match <- NULL
if (!is.null(right)) {
right_match <- regexpr(paste0(left_known, replace_values[[tag]], right), actual_path)
match_len <- attr(right_match, 'match.length')
right_match_limits <- c(right_match + nchar(clean(left_known)),
right_match + nchar(clean(left_known)) + nchar(replace_values[[tag]]) - 1)
if (is.null(match_limits) && !(right_match < 1)) {
match_limits <- right_match_limits
}
}
if (!is.null(right_match) && !is.null(left_match)) {
if (!identical(right_match_limits, left_match_limits)) {
give_warning <- TRUE
}
}
if (is.null(match_limits)) {
stop("Too complex path pattern specified for ", dataset_name,
". Specify a simpler path pattern for this dataset.")
}
values_to_replace <- c(values_to_replace, tag)
tags_to_replace_starts <- c(tags_to_replace_starts, match_limits[1])
tags_to_replace_ends <- c(tags_to_replace_ends, match_limits[2])
}
}
}
actual_path_with_tags <- actual_path
if (length(tags_to_replace_starts) > 0) {
reorder <- sort(tags_to_replace_starts, index.return = TRUE)
tags_to_replace_starts <- reorder$x
values_to_replace <- values_to_replace[reorder$ix]
tags_to_replace_ends <- tags_to_replace_ends[reorder$ix]
while (length(values_to_replace) > 0) {
actual_path_with_tags <- paste0(substr(actual_path_with_tags, 1, head(tags_to_replace_starts, 1) - 1),
'$', head(values_to_replace, 1), '$',
substr(actual_path_with_tags, head(tags_to_replace_ends, 1) + 1, nchar(actual_path_with_tags)))
extra_chars <- nchar(head(values_to_replace, 1)) + 2 - (head(tags_to_replace_ends, 1) - head(tags_to_replace_starts, 1) + 1)
values_to_replace <- values_to_replace[-1]
tags_to_replace_starts <- tags_to_replace_starts[-1]
tags_to_replace_ends <- tags_to_replace_ends[-1]
tags_to_replace_starts <- tags_to_replace_starts + extra_chars
tags_to_replace_ends <- tags_to_replace_ends + extra_chars
}
}
if (give_warning) {
.warning(paste0("Too complex path pattern specified for ", dataset_name,
". Double check carefully the '$Files' fetched for this dataset or specify a simpler path pattern."))
}
if (permissive) {
paste0(actual_path_with_tags, file_name_with_globs)
} else {
actual_path_with_tags
}
}
# Given a path pattern containing globs ('*', '?') plus one tag (e.g.
# '$var$'), and an actual path that matches the pattern, extract the
# substring of 'actual_path' corresponding to the tag.
#
# path_with_globs_and_tag: character; pattern with globs and the tag.
# actual_path: character; concrete path matching the pattern.
# tag: character; tag name WITHOUT the surrounding '$' symbols.
# select_one: if TRUE, only the most length-balanced split of the pattern
#   around a tag occurrence is tried; otherwise all splits are tried and
#   the shortest found value is returned.
# Stops with an error if no value can be found.
.FindTagValue <- function(path_with_globs_and_tag, actual_path, tag, select_one = FALSE) {
if (!all(sapply(c(path_with_globs_and_tag, actual_path, tag), is.character))) {
stop("All 'path_with_globs_and_tag', 'actual_path' and 'tag' must be character strings.")
}
if (grepl('$', tag, fixed = TRUE)) {
stop("The provided 'tag' must not contain '$' symbols.")
}
full_tag <- paste0('$', tag, '$')
if (!grepl(full_tag, path_with_globs_and_tag, fixed = TRUE)) {
stop("The provided 'path_with_globs_and_tag' must contain the tag in 'tag' surrounded by '$' symbols.")
}
# Split the pattern at every occurrence of the tag; the tag value lies
# between consecutive parts.
parts <- strsplit(path_with_globs_and_tag, full_tag, fixed = TRUE)[[1]]
if (length(parts) == 1) {
# The tag is at the very end of the pattern; add an empty right part.
parts <- c(parts, '')
}
# Anchor the first part to the beginning and the last to the end.
parts[1] <- paste0('^', parts[1])
parts[length(parts)] <- paste0(parts[length(parts)], '$')
if (select_one) {
# Group the parts in 2 groups, in a way that both groups have a number
# of characters as similar as possible.
part_lengths <- sapply(parts, nchar)
group_len_diffs <- sapply(1:(length(parts) - 1),
function(x) {
sum(part_lengths[(x + 1):length(parts)]) - sum(part_lengths[1:x])
}
)
left_parts_to_try <- which.min(abs(group_len_diffs))[1]
} else {
left_parts_to_try <- 1:(length(parts) - 1)
}
found_values <- list()
# We set clp (chosen left part) to the index of each possible left part
# and see what value we find for that chosen part.
for (clp in left_parts_to_try) {
found_value <- try({
# Build a regex for everything left of the chosen tag occurrence:
# globs and remaining tags become lazy '.*?' wildcards.
left_expr <- paste(parts[1:clp], collapse = full_tag)
left_expr <- gsub('?', '.', left_expr, fixed = TRUE)
# The .*? will force lazy evaluation (find the shortest match from the
# beginning of the actual_path).
left_expr <- gsub('*', '.*?', left_expr, fixed = TRUE)
left_expr <- gsub(full_tag, '.*?', left_expr, fixed = TRUE)
left_match <- regexec(left_expr, actual_path)[[1]]
if (left_match < 0) {
stop("Unexpected error in .FindTagValue.")
}
# Same for everything right of the tag occurrence.
right_expr <- paste(parts[(clp + 1):(length(parts))], collapse = full_tag)
right_expr <- gsub('?', '.', right_expr, fixed = TRUE)
# For lazy evaluation to work, pattern and string have to be reversed.
right_expr <- gsub('*', '.*?', right_expr, fixed = TRUE)
right_expr <- gsub(full_tag, '.*?', right_expr, fixed = TRUE)
# The trailing '$' anchor becomes a leading '^' once reversed.
right_expr <- gsub('$', '^', right_expr, fixed = TRUE)
rev_str <- function(s) {
paste(rev(strsplit(s, NULL)[[1]]), collapse = '')
}
right_expr <- rev_str(right_expr)
# Reversing turned each '.*?' into '?*.'; restore the lazy wildcard.
right_expr <- gsub('?*.', '.*?', right_expr, fixed = TRUE)
right_match <- regexec(right_expr, rev_str(actual_path))[[1]]
if (right_match < 0) {
stop("Unexpected error in .FindTagValue.")
}
# Map the match position on the reversed string back to a position
# on the original 'actual_path'.
right_match[] <- nchar(actual_path) -
(right_match[] + attr(right_match, 'match.length') - 1) + 1
if ((left_match + attr(left_match, 'match.length')) >
(right_match - 1)) {
# Left and right contexts overlap: no room for a tag value here.
NULL
} else {
substr(actual_path, left_match + attr(left_match, 'match.length'),
right_match - 1)
}
})
if (!('try-error' %in% class(found_value))) {
found_values <- c(found_values, list(found_value))
}
}
# We take the shortest found value.
# NOTE(review): if a found value is NULL, nchar() yields integer(0) and
# sapply may return a list here — presumably never hit in practice; confirm.
if (length(found_values) > 0) {
found_values_lens <- sapply(found_values, nchar)
selected_found_value <- which.min(found_values_lens)[1]
found_values[[selected_found_value]]
} else {
stop("Unexpected error in .FindTagValue. No matches found.")
}
}
.message <- function(...) {
  # Wrapper around base message() with custom formatting defaults:
  # a "* " tag, strwrap() with indent 0 / exdent 3, and "\n*" between
  # wrapped lines. The named arguments below may be passed through '...'
  # to override those defaults; everything else is treated as message text.
  args <- list(...)
  # Formatting/message options and their default values.
  opts <- list(appendLF = TRUE, domain = NULL, indent = 0, exdent = 3,
               collapse = "\n*", tag = "* ")
  # Pull each recognized option out of 'args', keeping its default when
  # it was not supplied.
  for (opt in names(opts)) {
    if (!is.null(args[[opt]])) {
      opts[[opt]] <- args[[opt]]
    }
    args[[opt]] <- NULL
  }
  wrapped <- strwrap(args, indent = opts[["indent"]],
                     exdent = opts[["exdent"]])
  message(paste0(opts[["tag"]],
                 paste(wrapped, collapse = opts[["collapse"]])),
          appendLF = opts[["appendLF"]], domain = opts[["domain"]])
}
.warning <- function(...) {
  # Wrapper around base warning() with custom formatting defaults:
  # a "! Warning: " tag, no call information, strwrap() with indent 0 /
  # exdent 3, and "\n!" between wrapped lines. The named arguments below
  # may be passed through '...' to override those defaults; everything
  # else is treated as warning text.
  args <- list(...)
  # Warning/formatting options and their default values.
  opts <- list(call. = FALSE, immediate. = FALSE, noBreaks. = FALSE,
               domain = NULL, indent = 0, exdent = 3,
               collapse = "\n!", tag = "! Warning: ")
  # Pull each recognized option out of 'args', keeping its default when
  # it was not supplied.
  for (opt in names(opts)) {
    if (!is.null(args[[opt]])) {
      opts[[opt]] <- args[[opt]]
    }
    args[[opt]] <- NULL
  }
  wrapped <- strwrap(args, indent = opts[["indent"]],
                     exdent = opts[["exdent"]])
  warning(paste0(opts[["tag"]],
                 paste(wrapped, collapse = opts[["collapse"]])),
          call. = opts[["call."]], immediate. = opts[["immediate."]],
          noBreaks. = opts[["noBreaks."]], domain = opts[["domain"]])
}
# Function to permute arrays of non-atomic elements (e.g. POSIXct)
.aperm2 <- function(x, new_order) {
  # Permute the dimensions of 'x' like aperm(), but preserving any extra
  # attributes (e.g. the POSIXct class) that aperm() would drop.
  original_dims <- dim(x)
  saved_attrs <- attributes(x)
  # 'dim' is restored explicitly below, so keep it out of the backup.
  if ('dim' %in% names(saved_attrs)) {
    saved_attrs[['dim']] <- NULL
  }
  if (is.numeric(x)) {
    x <- aperm(x, new_order)
  } else {
    # aperm() cannot be applied directly to non-numeric content here:
    # permute an array of element indices instead and use it to reorder x.
    index_map <- array(1:length(x), dim = dim(x))
    index_map <- aperm(index_map, new_order)
    x <- x[as.vector(index_map)]
  }
  dim(x) <- original_dims[new_order]
  # Re-attach the backed-up attributes on top of the new 'dim'.
  attributes(x) <- c(attributes(x), saved_attrs)
  x
}
# Function to bind arrays of non-atomic elements (e.g. POSIXct)
# 'x' and 'y' must have dimension names
# parameter 'along' must be a dimension name
.abind2 <- function(x, y, along) {
  # Strategy: move the 'along' dimension of each input to the last
  # position (via .aperm2, which preserves non-atomic attributes),
  # concatenate the two flattened arrays, then move the bound dimension
  # back to its original position.
  along_pos_x <- which(names(dim(x)) == along)
  n_dims_x <- length(dim(x))
  if (along_pos_x != n_dims_x) {
    x <- .aperm2(x, c(seq_len(n_dims_x)[-along_pos_x], along_pos_x))
  }
  along_pos_y <- which(names(dim(y)) == along)
  n_dims_y <- length(dim(y))
  if (along_pos_y != n_dims_y) {
    y <- .aperm2(y, c(seq_len(n_dims_y)[-along_pos_y], along_pos_y))
  }
  bound <- c(x, y)
  # The new shape is x's shape with the last (bound) dimension extended.
  bound_dims <- dim(x)
  bound_dims[length(bound_dims)] <- dim(x)[length(dim(x))] + dim(y)[length(dim(y))]
  dim(bound) <- bound_dims
  if (along_pos_x != n_dims_x) {
    # Build the permutation that returns the last (bound) dimension to
    # position 'along_pos_x'.
    restore_order <- NULL
    if (along_pos_x > 1) {
      restore_order <- seq_len(along_pos_x - 1)
    }
    restore_order <- c(restore_order, length(dim(bound)))
    restore_order <- c(restore_order,
                       seq(along_pos_x, length(dim(bound)) - 1))
    bound <- .aperm2(bound, restore_order)
  }
  bound
}
# This function is a helper for the function .MergeArrays.
# It expects as inputs two named numeric vectors, and it extends them
# with dimensions of length 1 until an ordered common dimension
# format is reached.
.MergeArrayDims <- function(dims1, dims2) {
  aligned1 <- c()
  aligned2 <- c()
  # Consume dims1 from the front. Whenever its first name also appears in
  # dims2, first flush the dims2 entries preceding that name (they enter
  # aligned1 as length-1 dimensions), so both outputs share one order.
  while (length(dims1) > 0) {
    current_name <- names(dims1)[1]
    if (current_name %in% names(dims2)) {
      pos <- which(names(dims2) == current_name)
      padding <- rep(1, pos - 1)
      if (length(padding) > 0) {
        names(padding) <- names(dims2[1:(pos - 1)])
      }
      aligned1 <- c(aligned1, padding, dims1[1])
      aligned2 <- c(aligned2, dims2[1:pos])
      dims1 <- dims1[-1]
      dims2 <- dims2[-c(1:pos)]
    } else {
      # Dimension only present in dims1: appears as length 1 in aligned2.
      aligned1 <- c(aligned1, dims1[1])
      aligned2 <- c(aligned2, setNames(1, current_name))
      dims1 <- dims1[-1]
    }
  }
  # Dimensions left over in dims2 appear as length 1 in aligned1.
  if (length(dims2) > 0) {
    padding <- rep(1, length(dims2))
    names(padding) <- names(dims2)
    aligned1 <- c(aligned1, padding)
    aligned2 <- c(aligned2, dims2)
  }
  list(aligned1, aligned2)
}
# This function takes two named arrays and merges them, filling with
# NA where needed.
# dim(array1)
# 'b' 'c' 'e' 'f'
# 1 3 7 9
# dim(array2)
# 'a' 'b' 'd' 'f' 'g'
# 2 3 5 9 11
# dim(.MergeArrays(array1, array2, 'b'))
# 'a' 'b' 'c' 'e' 'd' 'f' 'g'
# 2 4 3 7 5 9 11
.MergeArrays <- function(array1, array2, along) {
  # A NULL input simply yields the other array.
  if (is.null(array1)) {
    return(array2)
  }
  if (is.null(array2)) {
    return(array1)
  }
  same_shape <- identical(names(dim(array1)), names(dim(array2))) &&
    identical(dim(array1)[-which(names(dim(array1)) == along)],
              dim(array2)[-which(names(dim(array2)) == along)])
  if (!same_shape) {
    # Extend both arrays to a common ordered dimension format, then pad
    # the shorter one with NA along every dimension other than 'along'.
    common_dims <- .MergeArrayDims(dim(array1), dim(array2))
    dim(array1) <- common_dims[[1]]
    dim(array2) <- common_dims[[2]]
    for (d in 1:length(dim(array1))) {
      if (names(dim(array1))[d] != along) {
        len1 <- dim(array1)[d]
        len2 <- dim(array2)[d]
        if (len1 != len2) {
          if (which.max(c(len1, len2)) == 1) {
            # array1 is larger along this dimension: pad array2 with NA.
            pad_dims <- dim(array2)
            pad_dims[d] <- len1 - len2
            array2 <- abind(array2, array(dim = pad_dims), along = d)
            names(dim(array2)) <- names(pad_dims)
          } else {
            # array2 is larger along this dimension: pad array1 with NA.
            pad_dims <- dim(array1)
            pad_dims[d] <- len2 - len1
            array1 <- abind(array1, array(dim = pad_dims), along = d)
            names(dim(array1)) <- names(pad_dims)
          }
        }
      }
    }
  }
  if (!(along %in% names(dim(array2)))) {
    stop("The dimension specified in 'along' is not present in the ",
         "provided arrays.")
  }
  # Bind the (now shape-compatible) arrays along the requested dimension.
  array1 <- abind(array1, array2, along = which(names(dim(array1)) == along))
  names(dim(array1)) <- names(dim(array2))
  array1
}
# Takes as input a list of arrays. The list must have named dimensions.
# Collapses the array of chunks one dimension at a time: for each
# dimension, all chunks along it are merged (via .MergeArrays) into a
# single array, leaving an array of chunks with one dimension fewer.
# After the last iteration a single merged array remains and is returned.
.MergeArrayOfArrays <- function(array_of_arrays) {
MergeArrays <- .MergeArrays
array_dims <- (dim(array_of_arrays))
dim_names <- names(array_dims)
# Merge the chunks.
for (dim_index in 1:length(dim_names)) {
dim_sub_array_of_chunks <- dim_sub_array_of_chunk_indices <- NULL
if (dim_index < length(dim_names)) {
# Shape of the chunk array that remains after collapsing 'dim_index';
# a parallel index array lets us map a flat position back to indices.
dim_sub_array_of_chunks <- array_dims[(dim_index + 1):length(dim_names)]
names(dim_sub_array_of_chunks) <- dim_names[(dim_index + 1):length(dim_names)]
dim_sub_array_of_chunk_indices <- dim_sub_array_of_chunks
sub_array_of_chunk_indices <- array(1:prod(dim_sub_array_of_chunk_indices),
dim_sub_array_of_chunk_indices)
} else {
# Last dimension: everything collapses into a single chunk.
sub_array_of_chunk_indices <- NULL
}
# prod(NULL) is 1, so on the last dimension this is a 1-element list.
sub_array_of_chunks <- vector('list', prod(dim_sub_array_of_chunks))
dim(sub_array_of_chunks) <- dim_sub_array_of_chunks
for (i in 1:prod(dim_sub_array_of_chunks)) {
if (!is.null(sub_array_of_chunk_indices)) {
# Multi-dimensional position of slot 'i' in the remaining chunk array.
chunk_sub_indices <- which(sub_array_of_chunk_indices == i, arr.ind = TRUE)[1, ]
} else {
chunk_sub_indices <- NULL
}
# Merge all chunks along 'dim_index' that share 'chunk_sub_indices'.
for (j in 1:(array_dims[dim_index])) {
new_chunk <- do.call('[[', c(list(x = array_of_arrays),
as.list(c(j, chunk_sub_indices))))
if (is.null(new_chunk)) {
stop("Chunks missing.")
}
if (is.null(sub_array_of_chunks[[i]])) {
sub_array_of_chunks[[i]] <- new_chunk
} else {
sub_array_of_chunks[[i]] <- MergeArrays(sub_array_of_chunks[[i]],
new_chunk,
dim_names[dim_index])
}
}
}
# The collapsed result becomes the input for the next dimension.
array_of_arrays <- sub_array_of_chunks
rm(sub_array_of_chunks)
gc()
}
array_of_arrays[[1]]
}
# Collect the per-chunk .Rds result files of a distributed Compute() run
# from '<shared_dir>/STARTR_CHUNKING_<suite_id>', arrange them by the chunk
# indices encoded in their file names, and merge them into one array per
# output component. File names look like
# '<component>__<dim>_<index>__<dim>_<index>...' (inferred from the '__'
# and '_' splits below — confirm against the chunk-writing side).
# If 'remove' is TRUE, the chunk files are deleted afterwards.
# Returns a named list with one merged array per component.
.MergeChunks <- function(shared_dir, suite_id, remove) {
MergeArrays <- startR:::.MergeArrays
# NOTE(review): 'args' is always NULL here, so the 'length(args) > 0'
# branch below is dead code (it ends in stop("Feature not implemented."));
# apparently kept as a placeholder for selective merging.
args <- NULL
shared_dir <- paste0(shared_dir, '/STARTR_CHUNKING_', suite_id)
all_chunk_files_original <- list.files(paste0(shared_dir, '/'), '.*\\.Rds$')
all_chunk_files <- gsub('\\.Rds$', '', all_chunk_files_original)
# First '__'-separated token is the component name; the rest encode
# one '<dim>_<index>' pair per chunked dimension.
chunk_filename_parts_all_components <- strsplit(all_chunk_files, '__')
all_components <- sapply(chunk_filename_parts_all_components, '[[', 1)
components <- unique(all_components)
result <- vector('list', length(components))
names(result) <- components
for (component in components) {
chunk_files_original <- all_chunk_files_original[which(all_components == component)]
chunk_filename_parts <- chunk_filename_parts_all_components[which(all_components == component)]
chunk_filename_parts <- lapply(chunk_filename_parts, '[', -1)
if (length(unique(sapply(chunk_filename_parts, length))) != 1) {
stop("Detected chunks with more dimensions than others.")
}
dim_names <- sapply(chunk_filename_parts[[1]],
# TODO: strsplit by the last '_' match, not the first.
function(x) strsplit(x, '_')[[1]][1])
# TODO check all files have exactly the same dimnames
# Matrix of chunk indices: one row per dimension, one column per file.
found_chunk_indices <- sapply(chunk_filename_parts,
function(x) as.numeric(sapply(strsplit(x, '_'), '[[', 2)))
found_chunk_indices <- array(found_chunk_indices,
dim = c(length(dim_names),
length(found_chunk_indices) / length(dim_names))
)
# One '<i>_<j>_...' key per file, used to look files up by indices.
found_chunks_str <- apply(found_chunk_indices, 2, paste, collapse = '_')
if (length(args) > 0) {
if ((length(args) %% 2) != 0) {
stop("Wrong number of parameters.")
}
expected_dim_names <- args[(1:(length(args) / 2) - 1) * 2 + 1]
if (any(!is.character(expected_dim_names))) {
stop("Expected dimension names in parameters at odd positions.")
}
dim_indices <- args[(1:(length(args) / 2) - 1) * 2 + 2]
if (!any(dim_indices == 'all')) {
stop("Expected one dimension index to be 'all'.")
}
dim_to_merge <- which(dim_indices == 'all')
if (length(dim_indices) > 1) {
if (!all(is.numeric(dim_indices[-dim_to_merge]))) {
stop("Expected all dimension index but one to be numeric.")
}
}
# Check expected dim names match dim names
## TODO
# Merge indices that vary along dim_to_merge whereas other fixed by dim_indices
# REMOVE FILES
## TODO
stop("Feature not implemented.")
} else {
# Sorted unique indices found on disk for each chunked dimension.
chunks_indices <- 1:length(dim_names)
chunks_indices <- lapply(chunks_indices, function(x) sort(unique(found_chunk_indices[x, ])))
names(chunks_indices) <- dim_names
# Load all found chunks into the array 'array_of_chunks'.
array_dims <- sapply(chunks_indices, length)
names(array_dims) <- dim_names
array_of_chunks <- vector('list', prod(array_dims))
dim(array_of_chunks) <- array_dims
array_of_chunks_indices <- array(1:prod(array_dims), array_dims)
for (i in 1:prod(array_dims)) {
# Position of slot 'i' in the chunk array ...
chunk_indices <- which(array_of_chunks_indices == i, arr.ind = TRUE)[1, ]
j <- 1
# ... translated to the chunk indices used in the file names.
chunk_indices_on_file <- sapply(chunk_indices,
function(x) {
r <- chunks_indices[[j]][x]
j <<- j + 1
r
})
found_chunk <- which(found_chunks_str == paste(chunk_indices_on_file,
collapse = '_'))[1]
if (length(found_chunk) > 0) {
array_of_chunks[[i]] <- readRDS(paste0(shared_dir, '/',
chunk_files_original[found_chunk]))
}
}
# Collapse the array of loaded chunks into a single merged array.
result[[component]] <- startR:::.MergeArrayOfArrays(array_of_chunks)
rm(array_of_chunks)
gc()
}
}
if (remove) {
sapply(all_chunk_files_original,
function(x) {
file.remove(paste0(shared_dir, '/', x))
})
}
result
}
startR-develop-explore-enh/README.md 0000664 0000000 0000000 00000014643 13424417765 0017521 0 ustar 00root root 0000000 0000000 ## startR - Retrieval and processing of multidimensional datasets
startR is an R package developed at the Barcelona Supercomputing Center which implements the MapReduce paradigm (a.k.a. domain decomposition) on HPCs in a way transparent to the user and specially oriented to complex multidimensional datasets.
Following the startR framework, the user can represent in a one-page startR script all the information that defines a use case, including:
- the involved (multidimensional) data sources and the distribution of the data files
- the workflow of operations to be applied, over which data sources and dimensions
- the HPC platform properties and the configuration of the execution
When run, the script triggers the execution of the defined workflow. Furthermore, the EC-Flow workflow manager is transparently used to dispatch tasks onto the HPC, and the user can employ its graphical interface for monitoring and control purposes.
An extensive part of this package is devoted to the automatic retrieval (from disk or store to RAM) and arrangement of multi-dimensional distributed data sets. This functionality is encapsulated in a single function called `Start()`, which is explained in detail in the [**Start()**](inst/doc/start.md) documentation page and in `?Start`.
startR is an open source project that is open to external collaboration and funding, and will continuously evolve to support as many data set formats as possible while maximizing its efficiency.
### Installation
See the [**Deployment**](inst/doc/deployment.md) documentation page for details on the set up steps. The most relevant system dependencies are listed next:
- netCDF-4
- R with the startR, bigmemory and easyNCDF R packages
- For distributed computation:
- UNIX-based HPC (cluster of multi-processor nodes)
- a job scheduler (Slurm, PBS or LSF)
- EC-Flow >= 4.9.0
In order to install and load the latest published version of the package on CRAN, you can run the following lines in your R session:
```r
install.packages('startR')
library(startR)
```
Also, you can install the latest stable version from the GitLab repository as follows:
```r
devtools::install_git('https://earth.bsc.es/gitlab/es/startR')
```
### How it works
An overview example of how to process a large data set is shown in the following. You can see real use cases in the [**Practical guide for processing large data sets with startR**](inst/doc/practical_guide.md), and you can find more information on the use of the `Start()` function in the [**Start()**](inst/doc/start.md) documentation page, as well as in the documentation of the functions in the package.
The purpose of the example in this section is simply to illustrate how the user is expected to use startR once the framework is deployed on the workstation and HPC. It shows how a simple addition and averaging operation is performed on BSC's CTE-Power HPC, over a multi-dimensional climate data set, which lives in the BSC-ES storage infrastructure. As mentioned in the introduction, the user will need to declare the involved data sources, the workflow of operations to carry out, and the computing environment and parameters.
#### 1. Declaration of data sources
```r
library(startR)
# A path pattern is built
repos <- '/esarchive/exp/ecmwf/system5_m1/6hourly/$var$/$var$_$sdate$.nc'
# A Start() call is built with the indices to operate
data <- Start(dat = repos,
var = 'tas',
sdate = '20180101',
ensemble = 'all',
time = 'all',
latitude = indices(1:40),
longitude = indices(1:40),
retrieve = FALSE)
```
#### 2. Declaration of the workflow
```r
# The function to be applied is defined.
# It only operates on the essential 'target' dimensions.
fun <- function(x) {
# Expected inputs:
# x: array with dimensions ('ensemble', 'time')
apply(x + 1, 2, mean)
# Outputs:
# single array with dimensions ('time')
}
# A startR Step is defined, specifying its expected input and
# output dimensions.
step <- Step(fun,
target_dims = c('ensemble', 'time'),
output_dims = c('time'))
# The workflow of operations is cast before execution.
wf <- AddStep(data, step)
```
#### 3. Declaration of the HPC platform and execution
```r
res <- Compute(wf,
chunks = list(latitude = 2,
longitude = 2),
threads_load = 1,
threads_compute = 2,
cluster = list(queue_host = 'p9login1.bsc.es',
queue_type = 'slurm',
temp_dir = '/gpfs/scratch/bsc32/bsc32473/startR_tests/',
lib_dir = '/gpfs/projects/bsc32/share/R_libs/3.5/',
r_module = 'R/3.5.0-foss-2018b',
cores_per_job = 2,
job_wallclock = '00:10:00',
max_jobs = 4,
extra_queue_params = list('#SBATCH --qos=bsc_es'),
bidirectional = FALSE,
polling_period = 10
),
ecflow_suite_dir = '/home/Earth/nmanuben/test_startR/',
wait = TRUE)
```
#### 4. Monitoring the execution
During the execution of the workflow, which is orchestrated by EC-Flow and a job scheduler (either Slurm, LSF or PBS), the status can be monitored using the EC-Flow graphical user interface. Pending tasks are coloured in blue, ongoing in green, and finished in yellow.
#### 5. Profiling of the execution
Additionally, profiling measurements of the execution are provided together with the output data. Such measurements can be visualized with the `PlotProfiling()` function made available in the source code of the startR package.
This function has not been included as part of the official set of functions of the package because it requires a number of extensive plotting libraries which take time to load and, since the startR package is loaded in each of the worker jobs on the HPC or cluster, this could imply a substantial amount of time spent in repeatedly loading unused visualization libraries during the computing stage.
```r
source('https://earth.bsc.es/gitlab/es/startR/raw/master/inst/PlotProfiling.R')
PlotProfiling(attr(res, 'startR_compute_profiling'))
```
You can click on the image to expand it.
startR-develop-explore-enh/inst/ 0000775 0000000 0000000 00000000000 13424417765 0017207 5 ustar 00root root 0000000 0000000 startR-develop-explore-enh/inst/PlotProfiling.R 0000664 0000000 0000000 00000024536 13424417765 0022134 0 ustar 00root root 0000000 0000000 PlotProfiling <- function(configs, n_test = 1, file_name = NULL,
config_names = NULL, items = NULL,
total_timings = TRUE,
ideal_timings = FALSE,
crop = NULL, subtitle = NULL) {
# Visualize the 'startR_compute_profiling' timings attached to a
# Compute() result, as bar/box plots arranged with gridExtra.
#
# configs: one 'startR_compute_profiling' object or a list of them
#   (one per configuration to compare).
# n_test: which execution sample to take, counting from the most recent.
# file_name: if provided, the plot is saved there with ggsave();
#   otherwise it is drawn on the active device.
# config_names: labels for the configurations (default 'config_<i>').
# items: timing items to show (defaults to the full set below).
# total_timings: whether to include the 'total' timing panel.
# ideal_timings: whether to also plot derived "ideal" timing estimates.
# crop: if provided, timing values are capped at this number of seconds.
# subtitle: optional subtitle for the main panel.
check_package <- function(x) {
if (!(x %in% installed.packages())) {
stop("This function requires ", x, " to be installed.")
}
}
sapply(c('reshape2', 'ggplot2', 'gridExtra'), check_package)
library(reshape2)
library(ggplot2)
library(gridExtra)
# Extract the legend grob of a ggplot, to draw it as a separate panel.
gglegend <- function(x) {
tmp <- ggplot_gtable(ggplot_build(x))
leg <- which(sapply(tmp$grobs, function(y) y$name) == "guide-box")
tmp$grobs[[leg]]
}
# Check configs
# Normalize 'configs' to a list of configurations, each being a list of
# execution samples indexed by date.
if (is.list(configs)) {
if (!is.list(configs[[1]])) {
configs <- list(setNames(list(configs), Sys.time()))
} else {
if (!is.list(configs[[1]][[1]])) {
configs <- list(configs)
}
}
} else {
stop("Expected one or a list of 'startR_compute_profiling' objects in configs.")
}
# Check config names
if (is.null(config_names)) {
config_names <- paste0('config_', 1:length(configs))
}
# Check items
if (is.null(items)) {
items <- c('bychunks_setup', 'transfer', 'all_chunks', 'queue', 'job_setup',
'load', 'compute', 'transfer_back', 'merge')
}
items <- c('nchunks', 'concurrent_chunks', 'cores_per_job', 'threads_load',
'threads_compute', items, 'total', 'all_chunks')
# Build one long-format data frame ('all_timings') with the selected
# timing items of every configuration.
all_timings <- NULL
config_total_times <- NULL
config_long_names <- NULL
config_index <- 1
for (timings in configs) {
#config_name <- config_name[length(config_name)]
#config_name <- strsplit(config_name, '.timings')[[1]]
#config_name <- config_name[1]
#config_name <- strsplit(config, 'tests/')[[1]]
#config_name <- config_name[length(config_name)]
#timings <- readRDS(config)
dates <- names(timings)
# Pick the n_test-th most recent sample (clamped to the first one).
if (n_test > length(timings)) {
selected_sample <- 1
} else {
selected_sample <- length(timings) + 1 - n_test
}
timings <- timings[[selected_sample]]
# crop_value <- 400
# timings[['total']] <- sapply(timings[['total']], function(x) min(crop_value, x))
# timings[['queue']] <- sapply(timings[['queue']], function(x) min(crop_value, x))
config_name <- config_names[config_index]
# Multi-line legend label summarizing the execution configuration.
config_long_name <- paste0('\n', config_name,
'\nDate: ', dates[selected_sample],
'\nN. chunks: ', timings[['nchunks']],
'\nMax. jobs: ', timings[['concurrent_chunks']],
'\nAsk cores: ', timings[['cores_per_job']],
'\nLoad thr: ', timings[['threads_load']],
'\nComp. thr: ', timings[['threads_compute']],
'\n')
config_long_names <- c(config_long_names, config_long_name)
config_total_times <- c(config_total_times, timings[['total']])
timings <- as.data.frame(timings)
# Time spent processing all chunks = total minus setup/merge overheads.
# NOTE(review): 'merge' is subtracted twice here — presumably one of the
# two should be another item (e.g. 'transfer'); confirm before relying
# on the 'all_chunks' panel.
t_all_chunks <- timings[['total']] - timings[['bychunks_setup']] - timings[['merge']] -
timings[['merge']]
if (!is.na(timings[['transfer_back']])) {
t_all_chunks <- t_all_chunks - timings[['transfer_back']]
} else {
#EEP
}
timings$all_chunks <- t_all_chunks
timings <- timings[items[which(items %in% names(timings))]]
if (ideal_timings) {
# Derived estimates: total minus mean queue time scaled by chunk
# concurrency, and an "ideal" reconstruction from per-stage means.
timings[['T - [q] * N / M']] <- timings[['total']] -
mean(timings[['queue']]) * timings[['nchunks']] / timings[['concurrent_chunks']]
timings[['b_s + ([j_s] + [l] + [c]) * N / M + m']] <- timings[['bychunks_setup']] +
(mean(timings[['job_setup']]) + mean(timings[['load']]) +
mean(timings[['compute']])) * timings[['nchunks']] /
timings[['concurrent_chunks']] + timings[['merge']]
}
timings$config <- config_long_name
#timings$confign <- config_index
timings <- melt(timings, id.vars = c('config'))
if (is.null(all_timings)) {
all_timings <- timings
} else {
all_timings <- rbind(all_timings, timings)
}
config_index <- config_index + 1
}
if (!is.null(crop)) {
all_timings$value <- sapply(all_timings$value, function(x) min(crop, x))
}
a <- as.factor(all_timings$config)
all_timings$config <- a
#new_levels <- levels(a)[order(nchar(levels(a)), levels(a))]
new_levels <- config_long_names
all_timings$config <- factor(all_timings$config, levels = new_levels)
# One color per configuration, from the Set1 brewer palette.
cols <- colorRampPalette(RColorBrewer::brewer.pal(9, "Set1"))
myPal <- cols(length(configs))
items_total <- c('total')
if (ideal_timings) {
items_total <- c(items_total, 'T - [q] * N / M')
}
timings_total <- subset(all_timings, variable %in% items_total)
if (is.null(subtitle)) {
n_lines_subtitle <- 0
} else {
n_lines_subtitle <- length(strsplit(subtitle, "\n")[[1]])
}
# Panel: mean total time per configuration.
plot_total <- ggplot(timings_total, aes(x = config,
y = value, fill = config, label = round(value))) +
geom_bar(stat = 'summary', fun.y = 'mean') + facet_wrap(~variable, nrow = 1) +
#geom_text(angle = 90, nudge_y = -10) +
labs(y = 'time (s)',
title = ' ',
subtitle = paste0(rep("\n", n_lines_subtitle), collapse = '')) +
guides(fill = guide_legend(title = 'configurations')) +
theme(axis.title.x = element_blank(),
axis.text.x = element_blank(),
axis.ticks.x = element_blank()) +
scale_fill_manual(values = myPal)
if (ideal_timings) {
# Panel: reconstructed "ideal" total time per configuration.
items_ideal <- c('b_s + ([j_s] + [l] + [c]) * N / M + m')
timings_ideal <- subset(all_timings, variable %in% items_ideal)
plot_ideal <- ggplot(timings_ideal, aes(x = config, y = value, fill = config)) +
geom_bar(stat = 'summary', fun.y = 'mean') +
facet_wrap(~variable, nrow = 1) +
labs(y = 'time (s)',
title = ' ') +
guides(fill = guide_legend(title = 'configurations')) +
theme(axis.title.x = element_blank(),
axis.text.x = element_blank(),
axis.ticks.x = element_blank()) +
scale_fill_manual(values = myPal)
}
# Panel: overall (whole-execution) timing items.
items_general <- items[which(items %in% c('bychunks_setup', 'transfer', 'all_chunks', 'merge'))]
timings_general <- subset(all_timings, variable %in% items_general)
plot_general <- ggplot(timings_general, aes(x = config, y = value, fill = config)) +
geom_bar(stat = 'summary', fun.y = 'mean') + facet_wrap(~variable, nrow = 1) +
labs(y = 'time (s)',
title = 'startR::Compute profiling',
subtitle = subtitle) +
guides(fill = guide_legend(title = 'configurations')) +
theme(axis.title.x = element_blank(),
axis.text.x = element_blank(),
axis.ticks.x = element_blank()) +
scale_fill_manual(values = myPal)
# Panel: per-chunk timing items, as boxplots across all chunks.
items_chunk <- items[which(items %in% c('queue', 'job_setup', 'load', 'compute', 'transfer_back'))]
timings_chunk <- subset(all_timings, variable %in% items_chunk)
plot_chunk <- ggplot(timings_chunk, aes(x = config, y = value, fill = config)) +
geom_boxplot() + facet_wrap(~variable, nrow = 1) +
labs(y = 'time (s)',
title = 'summary of performance of all chunks') +
# subtitle = subtitle) +
guides(fill = guide_legend(title = 'configurations', ncol = ceiling(length(configs) / 10))) +
theme(axis.title.x = element_blank(),
axis.text.x = element_blank(),
axis.ticks.x = element_blank()) +
scale_fill_manual(values = myPal)
# Choose output size as a function of how large the legend will be.
legend_cols <- ceiling(length(configs) / 10)
legend_rows <- ceiling(length(configs) / legend_cols)
if (legend_rows > 8) {
height <- 30
} else if (legend_rows > 6) {
height <- 25
} else if (legend_rows > 4) {
height <- 20
} else {
height <- 15
}
width <- 25 + 5 * legend_cols
# Assemble the panels (and a single shared legend) into a grid layout,
# depending on which optional panels are requested.
if (!total_timings) {
if (!ideal_timings) {
plot <- list(plot_general + guides(fill = FALSE),
plot_chunk + guides(fill = FALSE),
gglegend(plot_chunk),
#top = 'startR::Compute() profiling',
widths = c(3, legend_cols),
layout_matrix = rbind(c(1, 3),
c(2, 3)))
} else {
extra <- legend_cols - 1
width <- width + (legend_cols + 1) * 6
plot <- list(plot_general + guides(fill = FALSE),
plot_ideal + guides(fill = FALSE),
plot_chunk + guides(fill = FALSE),
gglegend(plot_chunk),
#top = 'startR::Compute() profiling',
widths = c(0.7 + extra / 4, 0.3 + extra / 4,
3 + extra / 2, legend_cols),
layout_matrix = rbind(c(1, 1, 1, 4),
c(2, 3, 3, 4)))
}
} else {
if (!ideal_timings) {
plot <- list(plot_total + guides(fill = FALSE),
plot_general + guides(fill = FALSE),
plot_chunk + guides(fill = FALSE),
gglegend(plot_chunk),
#top = 'startR::Compute() profiling',
widths = c(1, 3, legend_cols),
layout_matrix = rbind(c(1, 2, 4),
c(3, 3, 4)))
} else {
extra <- legend_cols - 1
width <- width + (legend_cols + 1) * 6
plot <- list(plot_total + guides(fill = FALSE),
plot_general + guides(fill = FALSE),
plot_ideal + guides(fill = FALSE),
plot_chunk + guides(fill = FALSE),
gglegend(plot_chunk),
#top = 'startR::Compute() profiling',
widths = c(0.7 + extra / 4, 0.3 + extra / 4,
3 + extra / 2, legend_cols),
layout_matrix = rbind(c(1, 1, 2, 5),
c(3, 4, 4, 5)))
}
}
# Save to file if requested; otherwise draw on the current device.
if (!is.null(file_name)) {
plot <- do.call('arrangeGrob', plot)
ggsave(file_name, plot, units = 'cm', width = width, height = height)
} else {
do.call('grid.arrange', plot)
}
}
startR-develop-explore-enh/inst/chunking/ 0000775 0000000 0000000 00000000000 13424417765 0021015 5 ustar 00root root 0000000 0000000 startR-develop-explore-enh/inst/chunking/Chunk.ecf 0000664 0000000 0000000 00000000542 13424417765 0022545 0 ustar 00root root 0000000 0000000 include_queue_header
# ecFlow task template for one startR chunk job. Lines of the form
# include_* are placeholder tokens replaced by startR when the suite is
# generated; tokens wrapped in percent signs are substituted by ecFlow.
#module purge
# Record when the job starts on the compute node; load_process_save_chunk.R
# reads this file to compute the queueing time (setup_time - submit_time).
date --rfc-3339=seconds > %REMOTE_ECF_HOME%/%ECF_NAME%.setup_time
include_init_commands
%include "./head.h"
include_module_load
# Echo commands (with variable expansion) as they run, for debugging.
set -vx
cd %REMOTE_ECF_HOME%
task_path=%REMOTE_ECF_HOME%/%ECF_NAME%
# Run the chunk: load this chunk's data subset, apply the operation, save
# the results. insert_indices is replaced at generation time with the
# interleaved chunk index / chunk count pairs for each chunked dimension.
Rscript load_process_save_chunk.R --args $task_path insert_indices
#include_transfer_back_and_rm
%include "./tail.h"
startR-develop-explore-enh/inst/chunking/head.h 0000664 0000000 0000000 00000004556 13424417765 0022101 0 ustar 00root root 0000000 0000000 #!/bin/bash
# Common job prologue included by Chunk.ecf. Defines the optional
# report-back hook, shell safety flags, the ecFlow client environment
# (bidirectional mode) or progress marker files (fire-and-forget mode),
# and the error/signal traps. Tokens wrapped in percent signs
# (REPORT_BACK, BIDIRECTIONAL, ECF_PORT, ...) are substituted by ecFlow
# before the job runs.
if [[ %REPORT_BACK% == "TRUE" ]] ; then
if [[ %BIDIRECTIONAL% == "TRUE" ]] ; then
# Helper that pushes job status files and any result files back to the
# host running the ecFlow server.
_ecf_report_back_() {
rsync -rav %REMOTE_ECF_HOME%/%SUITE%/%FAMILY%/ %EC_HOST_FULL%:%ECF_HOME%/%SUITE%/%FAMILY%/
if [[ $(ls %REMOTE_ECF_HOME%/%RESULT_FILE_ID% 2>/dev/null | wc -l) -ge 1 ]] ; then
scp %REMOTE_ECF_HOME%/%RESULT_FILE_ID% %EC_HOST_FULL%:%ECF_HOME%/
rm -f %REMOTE_ECF_HOME%/%RESULT_FILE_ID%
fi
}
fi
fi
set -e # stop the shell on first error
set -u # fail when using an undefined variable
#module load ecFlow/%ECF_VERSION%-foss-2015a
set -x # echo script lines as they are executed
if [[ %BIDIRECTIONAL% == "TRUE" ]] ; then
# Defines the variables that are needed for any communication with ECF
export ECF_PORT=%ECF_PORT% # The server port number
export ECF_HOST=%EC_HOST_FULL% # The host name where the server is running
export ECF_NAME=%ECF_NAME% # The name of this current task
export ECF_PASS=%ECF_PASS% # A unique password
export ECF_TRYNO=%ECF_TRYNO% # Current try number of the task
export ECF_RID=$$ # record the process id. Also used for zombie detection
# Define the path where to find ecflow_client
# make sure client and server use the *same* version.
# Important when there are multiple versions of ecFlow
export PATH=/usr/local/bin:$PATH
# Tell ecFlow we have started
ecflow_client --init=$$
else
# No connection back to the server: signal progress through marker files
# derived from the result file pattern (.running suffix; .crashed below).
running_file=$(echo "%RESULT_FILE_ID%" | sed -e 's/*_/x_/g' | sed -e 's/*/.running/g')
touch %REMOTE_ECF_HOME%/$running_file
fi
# Define an error handler
ERROR() {
if [[ %BIDIRECTIONAL% == "FALSE" ]] ; then
# Leave a .crashed marker so the submitting side can detect the failure.
err_file=$(echo "%RESULT_FILE_ID%" | sed -e 's/*_/x_/g' | sed -e 's/*/.crashed/g')
touch %REMOTE_ECF_HOME%/$err_file
fi
# Only call the report-back hook if it was actually defined above.
if [ "$(type -t _ecf_report_back_)" = function ] ; then
_ecf_report_back_
fi
set +e # Clear -e flag, so we don't fail
wait # wait for background process to stop
if [[ %BIDIRECTIONAL% == "TRUE" ]] ; then
ecflow_client --abort=trap # Notify ecFlow that something went wrong
fi
trap 0 # Remove the trap
exit 0 # End the script
}
# Trap any calls to exit and errors caught by the -e flag
trap ERROR 0
# Trap any signal that may cause the script to fail
trap '{ echo "Killed by a signal"; ERROR ; }' 1 2 3 4 5 6 7 8 10 12 13 15
startR-develop-explore-enh/inst/chunking/load_process_save_chunk.R 0000664 0000000 0000000 00000010435 13424417765 0026026 0 ustar 00root root 0000000 0000000 lib_dir <-
# Worker script run on the compute node for one chunk of a startR::Compute()
# workflow. The bare assignments below (out_dir, debug, start_calls, ...,
# threads_compute), like lib_dir above, are placeholders: startR appends the
# actual values when it writes this script for a given suite, so the file is
# not runnable as shipped.
if (!is.null(lib_dir)) {
if (!dir.exists(lib_dir)) {
stop("The specified 'lib_dir' does not exist.")
}
# Prepend the user-provided library path so the right package versions load.
.libPaths(new = lib_dir)
}
library(startR)
out_dir <-
debug <-
start_calls <-
start_calls_attrs <-
param_dimnames <-
fun <-
params <-
threads_load <-
threads_compute <-
# First value after --args is the task path; the remaining values are
# interleaved pairs (chunk index, number of chunks) for each dimension
# named in param_dimnames.
task_path <- commandArgs(TRUE)[2]
args <- as.integer(commandArgs(TRUE)[-c(1, 2)])
total_specified_dims <- length(args) / 2
chunk_indices <- args[((1:total_specified_dims) - 1) * 2 + 1]
names(chunk_indices) <- param_dimnames
chunks <- as.list(args[((1:total_specified_dims) - 1) * 2 + 2])
names(chunks) <- param_dimnames
# Timing bookkeeping: .submit_time is written at submission time and
# .setup_time by the job prologue; their difference is the queueing time.
t_begin_queue <- as.POSIXct(readLines(paste0(task_path, '.submit_time'))[1])
t_begin_job_setup <- as.POSIXct(readLines(paste0(task_path, '.setup_time'))[1])
t_queue <- as.numeric(difftime(t_begin_job_setup, t_begin_queue, units = 'secs'))
t_end_job_setup <- Sys.time()
t_job_setup <- as.numeric(difftime(t_end_job_setup, t_begin_job_setup, units = 'secs'))
t_begin_load <- Sys.time()
# Rebuild each Start() call restricted to this chunk's slice of every
# chunked dimension, then evaluate it to actually retrieve the data.
data <- vector('list', length(start_calls))
for (input in 1:length(data)) {
start_call <- start_calls[[input]]
call_dims <- names(start_calls_attrs[[input]][['Dimensions']])
dims_to_alter <- which(call_dims %in% param_dimnames)
names_dims_to_alter <- call_dims[dims_to_alter]
# If any dimension comes from split dimensions
split_dims <- start_calls_attrs[[input]][['SplitDims']]
# NOTE(review): 1:length(split_dims) iterates over c(1, 0) when split_dims
# is an empty list and would then fail on split_dims[[1]]; presumably
# SplitDims is always non-empty here -- confirm, or use seq_along().
for (k in 1:length(split_dims)) {
if (any(names(split_dims[[k]]) %in% names_dims_to_alter)) {
# Default to a single chunk (index 1) for each sub-dimension of this
# split dimension, then overwrite the ones that are actually chunked.
chunks_split_dims <- rep(1, length(split_dims[[k]]))
names(chunks_split_dims) <- names(split_dims[[k]])
chunks_indices_split_dims <- chunks_split_dims
split_dims_to_alter <- which(names(split_dims[[k]]) %in% names_dims_to_alter)
chunks_split_dims[split_dims_to_alter] <- unlist(chunks[names(split_dims[[k]])[split_dims_to_alter]])
chunks_indices_split_dims[split_dims_to_alter] <- chunk_indices[names(split_dims[[k]])[split_dims_to_alter]]
# Replace the selector of the original (pre-split) dimension by the
# subset corresponding to this chunk.
start_call[[names(split_dims)[k]]] <- chunk(chunks_indices_split_dims, chunks_split_dims,
eval(start_call[[names(split_dims)[k]]]))
# Sub-dimensions handled via the split dimension must not be chunked
# again in the plain-dimension loop below.
dims_to_alter_to_remove <- which(names_dims_to_alter %in% names(split_dims[[k]]))
if (length(dims_to_alter_to_remove) > 0) {
dims_to_alter <- dims_to_alter[-dims_to_alter_to_remove]
names_dims_to_alter <- names_dims_to_alter[-dims_to_alter_to_remove]
}
}
}
# Chunk the remaining (non-split) dimensions directly.
if (length(dims_to_alter) > 0) {
for (call_dim in names_dims_to_alter) {
start_call[[call_dim]] <- chunk(chunk_indices[call_dim], chunks[[call_dim]],
eval(start_call[[call_dim]]))
}
}
# Respect an explicit num_procs in the user's Start() call; otherwise use
# the number of load threads configured for this job.
if (!('num_procs' %in% names(start_call))) {
start_call[['num_procs']] <- threads_load
}
data[[input]] <- eval(start_call)
}
t_end_load <- Sys.time()
t_load <- as.numeric(difftime(t_end_load, t_begin_load, units = 'secs'))
t_begin_compute <- Sys.time()
# Load any extra libraries the operation declared it needs.
if (!is.null(attr(fun, 'UseLibraries'))) {
for (i in seq_along(attr(fun, 'UseLibraries'))) {
require(attr(fun, 'UseLibraries')[i], character.only = TRUE)
}
}
# Expose to the operation only the indices of dimensions that are actually
# chunked (more than one chunk).
chunk_indices_apply <- setNames(as.integer(chunk_indices), names(chunk_indices))
chunk_indices_apply <- chunk_indices_apply[names(chunks)[which(chunks > 1)]]
Apply <- multiApply::Apply
res <- do.call("Apply",
c(
list(data,
target_dims = attr(fun, 'TargetDims'),
fun = fun,
output_dims = attr(fun, 'OutputDims'),
use_attributes = attr(fun, 'UseAttributes'),
extra_info = list(chunk_indices = chunk_indices_apply),
ncores = threads_compute),
params
)
)
# Free the loaded inputs before serializing the (possibly large) results.
rm(data)
gc()
# One .Rds file per output component, with the chunk indices encoded in the
# file name so the merging step can locate each piece.
for (component in names(res)) {
filename <- paste0(component, '__')
for (i in 1:total_specified_dims) {
filename <- paste0(filename, param_dimnames[i], '_', chunk_indices[i], '__')
}
saveRDS(res[[component]], file = paste0(out_dir, '/', filename, '.Rds'))
}
rm(res)
gc()
t_end_compute <- Sys.time()
t_compute <- as.numeric(difftime(t_end_compute, t_begin_compute, units = 'secs'))
timings <- c(queue = t_queue, job_setup = t_job_setup,
load = t_load, compute = t_compute)
# NOTE(review): 'filename' here is the value left over from the LAST
# component in the loop above, so the timings file is named after that
# component only -- presumably intentional (one timings file per chunk),
# but fragile if the iteration order of names(res) ever changes.
saveRDS(timings, file = paste0(out_dir, '/', filename, '.timings'))
startR-develop-explore-enh/inst/chunking/lsf.h 0000664 0000000 0000000 00000000335 13424417765 0021753 0 ustar 00root root 0000000 0000000 #!/bin/bash
# LSF batch directives for one chunk job: core count, wallclock limit, job
# name, and stderr/stdout paths. Tokens wrapped in percent signs are filled
# in by ecFlow at job generation time; include_extra_queue_params is
# replaced by startR with any user-supplied extra directives.
#BSUB -n %CORES_PER_JOB%
#BSUB -W %JOB_WALLCLOCK%
#BSUB -J %ECF_NAME%
#BSUB -eo %REMOTE_ECF_HOME%/%ECF_NAME%.%ECF_TRYNO%.err
#BSUB -oo %REMOTE_ECF_HOME%/%ECF_NAME%.%ECF_TRYNO%.out
include_extra_queue_params
startR-develop-explore-enh/inst/chunking/pbs.h 0000664 0000000 0000000 00000000347 13424417765 0021756 0 ustar 00root root 0000000 0000000 #!/bin/bash
# PBS batch directives for one chunk job. Tokens wrapped in percent signs
# are filled in by ecFlow at job generation time; include_extra_queue_params
# is replaced by startR with any user-supplied extra directives.
# NOTE(review): the double-hashed SBATCH lines below are inert leftovers
# from the SLURM template, so no core count or job name is requested under
# PBS -- confirm whether equivalent #PBS -l / -N directives are needed.
##SBATCH -n %CORES_PER_JOB%
#PBS -l walltime=%JOB_WALLCLOCK%
##SBATCH -J %ECF_NAME%
#PBS -e %REMOTE_ECF_HOME%/%ECF_NAME%.%ECF_TRYNO%.err
#PBS -o %REMOTE_ECF_HOME%/%ECF_NAME%.%ECF_TRYNO%.out
include_extra_queue_params
startR-develop-explore-enh/inst/chunking/slurm.h 0000664 0000000 0000000 00000000345 13424417765 0022332 0 ustar 00root root 0000000 0000000 #!/bin/bash
# SLURM batch directives for one chunk job: task count, time limit, job
# name, and stderr/stdout paths. Tokens wrapped in percent signs are filled
# in by ecFlow at job generation time; include_extra_queue_params is
# replaced by startR with any user-supplied extra directives.
#SBATCH -n %CORES_PER_JOB%
#SBATCH -t %JOB_WALLCLOCK%
#SBATCH -J %ECF_NAME%
#SBATCH -e %REMOTE_ECF_HOME%/%ECF_NAME%.%ECF_TRYNO%.err
#SBATCH -o %REMOTE_ECF_HOME%/%ECF_NAME%.%ECF_TRYNO%.out
include_extra_queue_params
startR-develop-explore-enh/inst/chunking/tail.h 0000664 0000000 0000000 00000000517 13424417765 0022122 0 ustar 00root root 0000000 0000000 if [ "$(type -t _ecf_report_back_)" = function ] ; then
_ecf_report_back_
fi
# Common job epilogue (tail.h): after the optional report-back call above,
# wait for any background work, notify ecFlow of normal completion when a
# server connection is available, then clear the exit trap installed by
# head.h so this clean exit does not trigger the ERROR handler.
wait # wait for background process to stop
if [[ %BIDIRECTIONAL% == "TRUE" ]] ; then
ecflow_client --complete # Notify ecFlow of a normal end
fi
trap 0 # Remove all traps
exit 0 # End the shell
startR-develop-explore-enh/inst/doc/ 0000775 0000000 0000000 00000000000 13424417765 0017754 5 ustar 00root root 0000000 0000000 startR-develop-explore-enh/inst/doc/compute_profiling.png 0000664 0000000 0000000 00000404101 13424417765 0024207 0 ustar 00root root 0000000 0000000 PNG
IHDR
pHYs .# .#x?v IDATxy\T"
" )I.KiFRbim料B~H)[0ln 0?`x=qϙs}˝ü=@P $Ú D&