diff --git a/DESCRIPTION b/DESCRIPTION index e31050c2a7acb2c13b8a1fa1f3f38a300e4f3e29..3e7559afef68464e1ac142551409b295e1070e2f 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -9,11 +9,10 @@ Description: The base apply function and its variants, as well as the related fu Depends: R (>= 3.2.0) Imports: - abind, - plyr, + abind, doParallel, - future, - foreach + foreach, + plyr License: LGPL-3 URL: https://earth.bsc.es/gitlab/ces/multiApply BugReports: https://earth.bsc.es/gitlab/ces/multiApply/issues diff --git a/NAMESPACE b/NAMESPACE index 3b439b398b51309d2c9fb5a3c6527c78b6f83c76..213333ab7465a0afec0002450ac272964a3c10e5 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -1,7 +1,7 @@ # Generated by roxygen2: do not edit by hand -importFrom(plyr, llply, splat) importFrom(abind, abind) -importFrom(future, availableCores) -importFrom(doParallel, registerDoParallel) importFrom(foreach, registerDoSEQ) +importFrom(doParallel, registerDoParallel) +importFrom(plyr, splat) +importFrom(plyr, llply) export(Apply) diff --git a/R/Apply.R b/R/Apply.R index c9d35ee1ce252d2484fcc5f8818d01d069a92489..b49b11d46d0a1af59d8a6384389d87e6a567f7f2 100644 --- a/R/Apply.R +++ b/R/Apply.R @@ -1,15 +1,15 @@ #' Wrapper for Applying Atomic Functions to Arrays. #' -#' The Apply function is an extension of the mapply function, which instead of taking lists of unidimensional objects as input, takes lists of multidimensional objects as input, which may have different numbers of dimensions and dimension lengths. The user can specify which dimensions of each array (or matrix) the function is to be applied over with the margins option. +#' This wrapper applies a given function, which takes N [multi-dimensional] arrays as inputs (which may have different numbers of dimensions and dimension lengths), and applies it to a list of N [multi-dimensional] arrays with at least as many dimensions as expected by the given function. The user can specify which dimensions of each array (or matrix) the function is to be applied over with the \code{margins} or \code{target_dims} option. A user can apply a function that receives (in addition to other helper parameters) 1 or more arrays as input, each with a different number of dimensions, and returns any number of multidimensional arrays. The target dimensions can be specified by their names. It is recommended to use this wrapper with multidimensional arrays with named dimensions. #' @param data A single object (vector, matrix or array) or a list of objects. They must be in the same order as expected by AtomicFun. -#' @param margins List of vectors containing the margins for the input objects to be split by. Or, if there is a single vector of margins specified and a list of objects in data, then the single set of margins is applied over all objects. These vectors can contain either integers specifying the dimension position, or characters corresponding to the dimension names. If both margins and target_dims are specified, margins takes priority over target_dims. +#' @param target_dims List of vectors containing the dimensions to be input into AtomicFun for each of the objects in the data. These vectors can contain either integers specifying the dimension position, or characters corresponding to the dimension names. This parameter is mandatory if margins is not specified. If both margins and target_dims are specified, margins takes priority over target_dims. #' @param AtomicFun Function to be applied to the arrays. #' @param ... Additional arguments to be used in the AtomicFun. -#' @param target_dims List of vectors containing the dimensions to be input into AtomicFun for each of the objects in the data. These vectors can contain either integers specifying the dimension position, or characters corresponding to the dimension names. If both margins and target_dims are specified, margins takes priority over target_dims. -#' @param parallel Logical, should the function be applied in parallel. -#' @param ncores The number of cores to use for parallel computation. +#' @param output_dims Optional list of vectors containing the names of the dimensions to be output from the AtomicFun for each of the objects it returns (or a single vector if the function has only one output). +#' @param margins List of vectors containing the margins for the input objects to be split by. Or, if there is a single vector of margins specified and a list of objects in data, then the single set of margins is applied over all objects. These vectors can contain either integers specifying the dimension position, or characters corresponding to the dimension names. If both margins and target_dims are specified, margins takes priority over target_dims. +#' @param ncores The number of multicore threads to use for parallel computation. #' @details When using a single object as input, Apply is almost identical to the apply function. For multiple input objects, the output array will have dimensions equal to the dimensions specified in 'margins'. -#' @return Array or matrix or vector resulting from AtomicFun. +#' @return List of arrays or matrices or vectors resulting from applying AtomicFun to data. #' @references Wickham, H (2011), The Split-Apply-Combine Strategy for Data Analysis, Journal of Statistical Software. #' @export #' @examples @@ -20,104 +20,567 @@ #' test_fun <- function(x, y, z) {((sum(x > z) / (length(x))) / #' (sum(y > z) / (length(y)))) * 100} #' margins = list(c(1, 2), c(1, 2), c(1,2)) -#' test <- Apply(data, margins, AtomicFun = "test_fun") -Apply <- function(data, margins = NULL, AtomicFun, ..., target_dims = NULL, parallel = FALSE, ncores = NULL) { +#' test <- Apply(data, margins = margins, AtomicFun = "test_fun") +Apply <- function(data, target_dims = NULL, AtomicFun, ..., output_dims = NULL, + margins = NULL, ncores = NULL) { + # Check data if (!is.list(data)) { data <- list(data) } + if (any(!sapply(data, is.numeric))) { + stop("Parameter 'data' must be one or a list of numeric objects.") + } + is_vector <- rep(FALSE, length(data)) + is_unnamed <- rep(FALSE, length(data)) + for (i in 1 : length(data)) { + if (is.null(dim(data[[i]]))) { + is_vector[i] <- TRUE + is_unnamed[i] <- TRUE + dim(data[[i]]) <- length(data[[i]]) + } + if (!is.null(names(dim(data[[i]])))) { + if (any(sapply(names(dim(data[[i]])), nchar) == 0)) { + stop("Dimension names of arrays in 'data' must be at least ", + "one character long.") + } + } else { + is_unnamed[i] <- TRUE + } + } + + # Check AtomicFun + if (is.character(AtomicFun)) { + try({AtomicFun <- get(AtomicFun)}, silent = TRUE) + if (!is.function(AtomicFun)) { + stop("Could not find the function '", AtomicFun, "'.") + } + } + if (!is.function(AtomicFun)) { + stop("Parameter 'AtomicFun' must be a function or a character string ", + "with the name of a function.") + } + if ('startR_step' %in% class(AtomicFun)) { + if (is.null(target_dims)) { + target_dims <- attr(AtomicFun, 'target_dims') + } + if (is.null(output_dims)) { + output_dims <- attr(AtomicFun, 'output_dims') + } + } + + # Check target_dims and margins + if (is.null(margins) && is.null(target_dims)) { + stop("One of 'margins' or 'target_dims' must be specified.") + } if (!is.null(margins)) { target_dims <- NULL } - if (!is.null(target_dims)) { + margins_names <- vector('list', length(data)) + target_dims_names <- vector('list', length(data)) + if (!is.null(margins)) { + # Check margins and build target_dims accordingly + if (!is.list(margins)) { + margins <- rep(list(margins), length(data)) + } + if (any(!sapply(margins, + function(x) is.character(x) || is.numeric(x) || is.null(x)))) { + stop("Parameter 'margins' must be one or a list of numeric or ", + "character vectors.") + } + duplicate_dim_specs <- sapply(margins, + function(x) { + length(unique(x)) != length(x) + }) + if (any(duplicate_dim_specs)) { + stop("Parameter 'margins' must not contain duplicated dimension ", + "specifications.") + } + target_dims <- vector('list', length(data)) + for (i in 1 : length(data)) { + if (length(margins[[i]]) > 0) { + if (is.character(unlist(margins[i]))) { + if (is.null(names(dim(data[[i]])))) { + stop("Parameter 'margins' contains dimension names, but ", + "some of the corresponding objects in 'data' do not have ", + "dimension names.") + } + margins2 <- margins[[i]] + margins2_new_num <- c() + for (j in 1 : length(margins2)) { + matches <- which(names(dim(data[[i]])) == margins2[j]) + if (length(matches) < 1) { + stop("Could not find dimension '", margins2[j], "' in ", i, + "th object provided in 'data'.") + } + margins2_new_num[j] <- matches[1] + } + margins_names[[i]] <- margins[[i]] + margins[[i]] <- margins2_new_num + } + if (!is.null(names(dim(data[[i]])))) { + target_dims_names[[i]] <- names(dim(data[[i]]))[- margins[[i]]] + } + target_dims[[i]] <- (1 : length(dim(data[[i]])))[- margins[[i]]] + } else { + target_dims[[i]] <- 1 : length(dim(data[[i]])) + if (!is.null(names(dim(data[[i]])))) { + target_dims_names[[i]] <- names(dim(data[[i]])) + } + } + } + } else { + # Check target_dims and build margins accordingly if (!is.list(target_dims)) { target_dims <- rep(list(target_dims), length(data)) } - if (is.character(unlist(target_dims[1]))) { - margins2 <- target_dims - for (i in 1 : length(data)) { - margins_new <- c() - for (j in 1 : length(margins2[[i]])) { - margins_new[j] <- which(names(dim(data[[i]])) == margins2[[i]][[j]]) + if (any(!sapply(target_dims, + function(x) is.character(x) || is.numeric(x)))) { + stop("Parameter 'target_dims' must be one or a list of numeric or ", + "character vectors.") + } + if (any(sapply(target_dims, length) == 0)) { + stop("Parameter 'target_dims' must not contain length-0 vectors.") + } + duplicate_dim_specs <- sapply(target_dims, + function(x) { + length(unique(x)) != length(x) + }) + if (any(duplicate_dim_specs)) { + stop("Parameter 'target_dims' must not contain duplicated dimension ", + "specifications.") + } + margins <- vector('list', length(data)) + for (i in 1 : length(data)) { + if (is.character(unlist(target_dims[i]))) { + if (is.null(names(dim(data[[i]])))) { + stop("Parameter 'target_dims' contains dimension names, but ", + "some of the corresponding objects in 'data' do not have ", + "dimension names.") + } + targs2 <- target_dims[[i]] + targs2_new_num <- c() + for (j in 1 : length(targs2)) { + matches <- which(names(dim(data[[i]])) == targs2[j]) + if (length(matches) < 1) { + stop("Could not find dimension '", targs2[j], "' in ", i, + "th object provided in 'data'.") + } + targs2_new_num[j] <- matches[1] } - target_dims[[i]] <- c(margins_new) + target_dims_names[[i]] <- target_dims[[i]] + target_dims[[i]] <- targs2_new_num } + if (!is.null(names(dim(data[[i]])))) { + margins_names[[i]] <- names(dim(data[[i]]))[- target_dims[[i]]] + } + margins[[i]] <- (1 : length(dim(data[[i]])))[- target_dims[[i]]] } - for (i in 1 : length(data)) { - if (is.unsorted(target_dims[[i]])) { - targ_dims <- sort(target_dims[[i]]) - marg_dims <- c(1 : length(dim(data[[i]])))[- target_dims[[i]]] - data[[i]] <- aperm(data[[i]], c(targ_dims, marg_dims)) - target_dims[[i]] <- 1 : length(targ_dims) + } + # Reorder dimensions of input data for target dims to be left-most + # and in the required order. + for (i in 1 : length(data)) { + if (length(target_dims[[i]]) > 0) { + if (is.unsorted(target_dims[[i]]) || + (max(target_dims[[i]]) > length(target_dims[[i]]))) { + marg_dims <- (1 : length(dim(data[[i]])))[- target_dims[[i]]] + data[[i]] <- .aperm2(data[[i]], c(target_dims[[i]], marg_dims)) + target_dims[[i]] <- 1 : length(target_dims[[i]]) + margins[[i]] <- (length(target_dims[[i]]) + 1) : length(dim(data[[i]])) } - margins[[i]] <- c(1 :length(dim(data[[i]])))[-c(target_dims[[i]])] } } - if (!is.null(margins)) { - if (!is.list(margins)) { - margins <- rep(list(margins), length(data)) + + # Check output_dims + if (!is.null(output_dims)) { + if (!is.list(output_dims)) { + output_dims <- list(output1 = output_dims) + } + if (any(sapply(output_dims, function(x) !(is.character(x) || is.null(x))))) { + stop("Parameter 'output_dims' must be one or a list of vectors of character strings (or NULLs).") + } + if (is.null(names(output_dims))) { + names(output_dims) <- rep('', length(output_dims)) + } + missing_output_names <- which(sapply(names(output_dims), nchar) == 0) + if (length(missing_output_names) > 0) { + names(output_dims)[missing_output_names] <- paste0('output', missing_output_names) } } - if (is.character(unlist(margins[1])) && !is.null(margins)) { - margins2 <- margins - for (i in 1 : length(data)) { - margins_new <- c() - for (j in 1 : length(margins2[[i]])) { - margins_new[j] <- which(names(dim(data[[i]])) == margins2[[i]][[j]]) + + # Check ncores + if (is.null(ncores)) { + ncores <- 1 + } + if (!is.numeric(ncores)) { + stop("Parameter 'ncores' must be numeric.") + } + ncores <- round(ncores) + + # Consistency checks of margins of all input objects + # for each data array, add its margins to the list if not present. + # if there are unnamed margins in the list, check their size matches the margins being added + # and simply assing them a name + # those margins present, check that they match + # if unnamed margins, check consistency with found margins + # if more mrgins than found, add numbers to the list, without names + # with this we end up with a named list of margin sizes + # for data arrays with unnamed margins, we can assume their margins names are those of the first entries in the resulting list + all_found_margins_lengths <- afml <- list() + for (i in 1:length(data)) { + if (!is.null(margins_names[[i]])) { + if (length(afml) > 0) { + matches <- which(margins_names[[i]] %in% names(afml)) + if (length(matches) > 0) { + margs_to_add <- as.list(dim(data[[i]])[margins[[i]]][- matches]) + if (any(dim(data[[i]])[margins[[i]][matches]] != unlist(afml[margins_names[[i]][matches]]))) { + stop("Found one or more margin dimensions with the same name and ", + "different length in some of the input objects in 'data'.") + } + } else { + margs_to_add <- as.list(dim(data[[i]])[margins[[i]]]) + } + unnamed_margins <- which(sapply(names(afml), nchar) == 0) + if (length(unnamed_margins) > 0) { + stop_with_error <- FALSE + if (length(unnamed_margins) <= length(margs_to_add)) { + if (any(unlist(afml[unnamed_margins]) != unlist(margs_to_add[1:length(unnamed_margins)]))) { + stop_with_error <- TRUE + } + names(afml)[unnamed_margins] <- names(margs_to_add)[1:length(unnamed_margins)] + margs_to_add <- margs_to_add[- (1:length(margs_to_add))] + } else { + if (any(unlist(afml[unnamed_margins[1:length(margs_to_add)]]) != unlist(margs_to_add))) { + stop_with_error <- TRUE + } + names(afml)[unnamed_margins[1:length(margs_to_add)]] <- names(margs_to_add) + margs_to_add <- list() + } + if (stop_with_error) { + stop("Found unnamed margins (for some objects in parameter ", + "'data') that have been associated by their position to ", + "named margins in other objects in 'data' and do not have ", + "matching length. It could also be that the unnamed ", + "margins don not follow the same order as the named ", + "margins. In that case, either put the corresponding names ", + "to the dimensions of the objects in 'data', or put them ", + "in a consistent order.") + } + } + afml <- c(afml, margs_to_add) + } else { + afml <- as.list(dim(data[[i]])[margins[[i]]]) + } + } else { + margs_to_add <- as.list(dim(data[[i]])[margins[[i]]]) + names(margs_to_add) <- rep('', length(margs_to_add)) + if (length(afml) > 0) { + stop_with_error <- FALSE + if (length(afml) >= length(margs_to_add)) { + if (any(unlist(margs_to_add) != unlist(afml[1:length(margs_to_add)]))) { + stop_with_error <- TRUE + } + } else { + if (any(unlist(margs_to_add)[1:length(afml)] != unlist(afml))) { + stop_with_error <- TRUE + } + margs_to_add <- margs_to_add[- (1:length(afml))] + afml <- c(afml, margs_to_add) + } + if (stop_with_error) { + stop("Found unnamed margins (for some objects in parameter ", + "'data') that have been associated by their position to ", + "named margins in other objects in 'data' and do not have ", + "matching length. It could also be that the unnamed ", + "margins don not follow the same order as in other ", + "objects. In that case, either put the corresponding names ", + "to the dimensions of the objects in 'data', or put them ", + "in a consistent order.") + } + } else { + afml <- margs_to_add } - margins[[i]] <- c(margins_new) } } - if (!is.logical(parallel)) { - stop("parallel must be logical") + missing_margin_names <- which(names(afml) == '') + if (length(missing_margin_names) > 0) { + names(afml)[missing_margin_names] <- paste0('_unnamed_margin_', + 1:length(missing_margin_names), '_') } - names <- names(dim(data[[1]]))[margins[[1]]] - input <- list() - splatted_f <- splat(get(AtomicFun)) - if (!is.null(margins)) { - .isolate <- function(data, margin_length, drop = TRUE) { - eval(dim(environment()$data)) - structure(list(env = environment(), index = margin_length, subs = as.name("[")), - class = c("indexed_array")) + # afml is now a named list with the lenghts of all margins. Each margin + # appears once only. If some names are not provided, they are set automatically + # to 'unnamed_dim_1', 'unamed_dim_2', ... + + # Now need to check which margins are common for all the data arrays. + # Those will be used by llply. + # For the margins that are not common, we will need to iterate manually + # across them, and use data arrays repeatedly as needed. + margins_afml <- margins + for (i in 1:length(data)) { + if (!is.null(margins_names[[i]])) { + + margins_afml[[i]] <- sapply(margins_names[[i]], + function(x) { + sapply(x, + function(y) { + which(names(afml) == y) + } + ) + } + ) + } else if (length(margins_afml[[i]]) > 0) { + margins_afml[[i]] <- margins_afml[[i]] - min(margins_afml[[i]]) + 1 + # The missing margin and dim names are filled in. + margins_names[[i]] <- names(afml)[margins_afml[[i]]] + names(dim(data[[i]]))[margins[[i]]] <- margins_names[[i]] } - for (i in 1 : length(data)) { - margin_length <- lapply(dim(data[[i]]), function(x) 1 : x) - margin_length[-margins[[i]]] <- "" - margin_length <- expand.grid(margin_length, KEEP.OUT.ATTRS = FALSE, - stringsAsFactors = FALSE) - input[[i]] <- .isolate(data[[i]], margin_length) - } - dims <- dim(data[[1]])[margins[[1]]] - i_max <- length(input[[1]])[1] / dims[[1]] - k <- length(input[[1]]) / i_max - if (parallel == TRUE) { - if (is.null(ncores)) { - ncores <- availableCores() - 1 + } + common_margs <- margins_afml[[1]] + if (length(margins_afml) > 1) { + for (i in 2:length(margins_afml)) { + margs_a <- unlist(afml[common_margs]) + margs_b <- unlist(afml[margins_afml[[i]]]) + matches <- which(names(margs_a) %in% names(margs_b)) + if (length(matches) > 0) { + common_margs <- common_margs[matches] } else { - ncores <- min(availableCores() - 1, ncores) + common_margs <- NULL } - registerDoParallel(ncores) } - WrapperFun <- llply(1 : i_max, function(i) - sapply((k * i - (k - 1)) : (k * i), function(x) splatted_f(lapply(input, `[[`, x),...), simplify = FALSE), - .parallel = parallel) - if (parallel == TRUE) { - registerDoSEQ() + } + non_common_margs <- 1:length(afml) + if (length(common_margs) > 0) { + non_common_margs <- non_common_margs[- common_margs] + } + # common_margs is now a numeric vector with the indices of the common + # margins (i.e. their position in afml) + # non_common_margs is now a numeric vector with the indices of the + # non-common margins (i.e. their position in afml) + + .isolate <- function(data, margin_length, drop = FALSE) { + eval(dim(environment()$data)) + structure(list(env = environment(), index = margin_length, + drop = drop, subs = as.name("[")), + class = c("indexed_array")) + } + .consolidate <- function(subsets, dimnames, out_dims) { + lapply(setNames(1:length(subsets), names(subsets)), + function(x) { + if (length(out_dims[[x]]) > 0) { + dims <- dim(subsets[[x]]) + if (!is_unnamed[x]) { + names(dims) <- dimnames[[x]] + } + dims <- dims[out_dims[[x]]] + array(subsets[[x]], dim = dims) + } else { + as.vector(subsets[[x]]) + } + }) + } + + data_indexed <- vector('list', length(data)) + data_indexed_indices <- vector('list', length(data)) + for (i in 1 : length(data)) { + margs_i <- which(names(dim(data[[i]])) %in% names(afml[c(non_common_margs, common_margs)])) + false_margs_i <- which(margs_i %in% target_dims[[i]]) + margs_i <- setdiff(margs_i, false_margs_i) + if (length(margs_i) > 0) { + margin_length <- lapply(dim(data[[i]]), function(x) 1 : x) + margin_length[- margs_i] <- "" + } else { + margin_length <- as.list(rep("", length(dim(data[[i]])))) } - if (is.null(dim(WrapperFun[[1]][[1]]))) { - WrapperFun <- array(as.numeric(unlist(WrapperFun)), dim=c(c(length((WrapperFun[[1]])[[1]])), - dim(data[[1]])[margins[[1]]])) + margin_length <- expand.grid(margin_length, KEEP.OUT.ATTRS = FALSE, + stringsAsFactors = FALSE) + data_indexed[[i]] <- .isolate(data[[i]], margin_length) + if (length(margs_i) > 0) { + data_indexed_indices[[i]] <- array(1:prod(dim(data[[i]])[margs_i]), + dim = dim(data[[i]])[margs_i]) } else { - WrapperFun <- array(as.numeric(unlist(WrapperFun)), dim=c(c(dim(WrapperFun[[1]][[1]])), - dim(data[[1]])[margins[[1]]])) + data_indexed_indices[[i]] <- array(1, dim = 1) } + } + + splatted_f <- splat(AtomicFun) + + # Iterate along all non-common margins + if (length(c(non_common_margs, common_margs)) > 0) { + marg_inds_ordered <- sort(c(non_common_margs, common_margs)) + margins_array <- ma <- array(1:prod(unlist(afml[marg_inds_ordered])), + dim = unlist(afml[marg_inds_ordered])) + } else { + ma <- array(1) + } + arrays_of_results <- NULL + found_first_result <- FALSE + + total_size <- prod(dim(ma)) + if (!is.null(ncores)) { + chunk_size <- round(total_size / (ncores * 4)) } else { - WrapperFun <- splatted_f(data, ...) + chunk_size <- 4 } - if (!is.null(dim(WrapperFun))) { - names(dim(WrapperFun))[(length(dim(WrapperFun)) - length(names) + 1) : length(dim(WrapperFun))] <- c(names) + if (chunk_size < 1) { + chunk_size <- 1 + } + nchunks <- floor(total_size / chunk_size) + chunk_sizes <- rep(chunk_size, nchunks) + if (total_size %% chunk_size != 0) { + chunk_sizes <- c(chunk_sizes, total_size %% chunk_size) + } + +# need to add progress bar + iteration <- function(m) { + sub_arrays_of_results <- list() + found_first_sub_result <- FALSE + for (n in 1:chunk_sizes[m]) { + # j is the index of the data piece to load in data_indexed + j <- n + (m - 1) * chunk_size + marg_indices <- arrayInd(j, dim(ma)) + names(marg_indices) <- names(dim(ma)) + input <- list() + # Each iteration of n, the variable input is populated with sub-arrays for + # each object in data (if possible). For each set of 'input's, the + # splatted_f is applied in parallel if possible. + for (i in 1:length(data_indexed)) { + inds_to_take <- which(names(marg_indices) %in% names(dim(data_indexed_indices[[i]]))) + if (length(inds_to_take) > 0) { + marg_inds_to_take <- marg_indices[inds_to_take][names(dim(data_indexed_indices[[i]]))] + input[[i]] <- data_indexed[[i]][[do.call("[", + c(list(x = data_indexed_indices[[i]]), marg_inds_to_take, + list(drop = TRUE)))]] + } else { + input[[i]] <- data_indexed[[i]][[1]] + } + } + result <- splatted_f(.consolidate(input, lapply(lapply(data, dim), names), + target_dims), ...) + if (!is.list(result)) { + result <- list(result) + } + if (!found_first_sub_result) { + sub_arrays_of_results <- vector('list', length(result)) + if (!is.null(output_dims)) { + if (length(output_dims) != length(sub_arrays_of_results)) { + stop("The 'AtomicFun' returns ", length(sub_arrays_of_results), + " elements, but ", length(output_dims), + " elements were expected.") + } + names(sub_arrays_of_results) <- names(output_dims) + } else if (!is.null(names(result))) { + names(sub_arrays_of_results) <- names(result) + } else { + names(sub_arrays_of_results) <- paste0('output', 1:length(result)) + } + } + atomic_fun_out_dims <- vector('list', length(result)) + for (component in 1:length(result)) { + if (is.null(dim(result[[component]]))) { + if (length(result[[component]]) == 1) { + component_dims <- NULL + } else { + component_dims <- length(result[[component]]) + } + } else { + component_dims <- dim(result[[component]]) + } + if (!found_first_sub_result) { + sub_arrays_of_results[[component]] <- array(dim = c(component_dims, chunk_sizes[m])) + } + if (!is.null(component_dims)) { + atomic_fun_out_dims[[component]] <- component_dims + } + sub_arrays_of_results[[component]][(1:prod(component_dims)) + + (n - 1) * prod(component_dims)] <- result[[component]] + } + if (!found_first_sub_result) { + found_first_sub_result <- TRUE + } + if (!is.null(output_dims)) { + # Check number of outputs. + if (length(output_dims) != length(result)) { + stop("Expected AtomicFun to return ", length(output_dims), " components, ", + "but ", length(result), " found.") + } + # Check number of output dimensions is correct. + for (component in 1:length(result)) { + if (length(atomic_fun_out_dims[[component]]) != length(output_dims[[component]])) { + stop("Expected ", component, "st returned element by 'AtomicFun' ", + "to have ", length(output_dims[[component]]), " dimensions, ", + "but ", length(atomic_fun_out_dims[[component]]), " found.") + } + } + } + } + sub_arrays_of_results } - out <- WrapperFun -} + # Execute in parallel if needed + parallel <- ncores > 1 + if (parallel) registerDoParallel(ncores) + result <- llply(1:length(chunk_sizes), iteration, .parallel = parallel) + if (parallel) registerDoSEQ() + # Merge the results + chunk_length <- NULL + fun_out_dims <- vector('list', length(result[[1]])) + for (m in 1:length(result)) { + if (!found_first_result) { + arrays_of_results <- vector('list', length(result[[1]])) + if (!is.null(output_dims)) { + if (length(output_dims) != length(arrays_of_results)) { + stop("The 'AtomicFun' returns ", length(arrays_of_results), " elements, but ", + length(output_dims), " elements were expected.") + } + names(arrays_of_results) <- names(output_dims) + } else if (!is.null(names(result[[1]]))) { + names(arrays_of_results) <- names(result[[1]]) + } else { + names(arrays_of_results) <- paste0('output', 1:length(result[[1]])) + } + } + for (component in 1:length(result[[m]])) { + component_dims <- dim(result[[m]][[component]]) + if (!found_first_result) { + if (length(component_dims) > 1) { + fun_out_dims[[component]] <- component_dims[- length(component_dims)] + } + arrays_of_results[[component]] <- array(dim = c(fun_out_dims[[component]], + dim(ma))) + dimnames_to_remove <- which(grepl('^_unnamed_margin_', + names(dim(arrays_of_results[[component]])))) + if (length(dimnames_to_remove) > 0) { + names(dim(arrays_of_results[[component]]))[dimnames_to_remove] <- rep('', length(dimnames_to_remove)) + } + if (all(names(dim(arrays_of_results[[component]])) == '')) { + names(dim(arrays_of_results[[component]])) <- NULL + } + chunk_length <- prod(component_dims) + } + arrays_of_results[[component]][(1:prod(component_dims)) + + (m - 1) * chunk_length] <- result[[m]][[component]] + } + if (!found_first_result) { + found_first_result <- TRUE + } + #if (!is.null(output_dims)) { + # # Check number of output dimensions is correct. + # for (component in 1:length(atomic_fun_out_dims)) { + # if (!is.null(names(fun_out_dims[[component]]))) { + # # check component_dims match names of output_dims[[component]], and reorder if needed + # } + # } + #} + } + # Assign 'output_dims' as dimension names if possible + if (!is.null(output_dims)) { + for (component in 1:length(output_dims)) { + if (length(output_dims[[component]]) > 0) { + names(dim(arrays_of_results[[component]]))[1:length(output_dims[[component]])] <- output_dims[[component]] + } + } + } + # Return + arrays_of_results +} diff --git a/R/zzz.R b/R/zzz.R new file mode 100644 index 0000000000000000000000000000000000000000..3e04077786b652e7a1db6acb4d375fc853115fbd --- /dev/null +++ b/R/zzz.R @@ -0,0 +1,143 @@ +# Function to permute arrays of non-atomic elements (e.g. POSIXct) +.aperm2 <- function(x, new_order) { + y <- array(1:length(x), dim = dim(x)) + y <- aperm(y, new_order) + old_dims <- dim(x) + x <- x[as.vector(y)] + dim(x) <- old_dims[new_order] + x +} + +# This function is a helper for the function .MergeArrays. +# It expects as inputs two named numeric vectors, and it extends them +# with dimensions of length 1 until an ordered common dimension +# format is reached. +.MergeArrayDims <- function(dims1, dims2) { + new_dims1 <- c() + new_dims2 <- c() + while (length(dims1) > 0) { + if (names(dims1)[1] %in% names(dims2)) { + pos <- which(names(dims2) == names(dims1)[1]) + dims_to_add <- rep(1, pos - 1) + if (length(dims_to_add) > 0) { + names(dims_to_add) <- names(dims2[1:(pos - 1)]) + } + new_dims1 <- c(new_dims1, dims_to_add, dims1[1]) + new_dims2 <- c(new_dims2, dims2[1:pos]) + dims1 <- dims1[-1] + dims2 <- dims2[-c(1:pos)] + } else { + new_dims1 <- c(new_dims1, dims1[1]) + new_dims2 <- c(new_dims2, 1) + names(new_dims2)[length(new_dims2)] <- names(dims1)[1] + dims1 <- dims1[-1] + } + } + if (length(dims2) > 0) { + dims_to_add <- rep(1, length(dims2)) + names(dims_to_add) <- names(dims2) + new_dims1 <- c(new_dims1, dims_to_add) + new_dims2 <- c(new_dims2, dims2) + } + list(new_dims1, new_dims2) +} + +# This function takes two named arrays and merges them, filling with +# NA where needed. +# dim(array1) +# 'b' 'c' 'e' 'f' +# 1 3 7 9 +# dim(array2) +# 'a' 'b' 'd' 'f' 'g' +# 2 3 5 9 11 +# dim(.MergeArrays(array1, array2, 'b')) +# 'a' 'b' 'c' 'e' 'd' 'f' 'g' +# 2 4 3 7 5 9 11 +.MergeArrays <- function(array1, array2, along) { + if (!(is.null(array1) || is.null(array2))) { + if (!(identical(names(dim(array1)), names(dim(array2))) && + identical(dim(array1)[-which(names(dim(array1)) == along)], + dim(array2)[-which(names(dim(array2)) == along)]))) { + new_dims <- .MergeArrayDims(dim(array1), dim(array2)) + dim(array1) <- new_dims[[1]] + dim(array2) <- new_dims[[2]] + for (j in 1:length(dim(array1))) { + if (names(dim(array1))[j] != along) { + if (dim(array1)[j] != dim(array2)[j]) { + if (which.max(c(dim(array1)[j], dim(array2)[j])) == 1) { + na_array_dims <- dim(array2) + na_array_dims[j] <- dim(array1)[j] - dim(array2)[j] + na_array <- array(dim = na_array_dims) + array2 <- abind(array2, na_array, along = j) + names(dim(array2)) <- names(na_array_dims) + } else { + na_array_dims <- dim(array1) + na_array_dims[j] <- dim(array2)[j] - dim(array1)[j] + na_array <- array(dim = na_array_dims) + array1 <- abind(array1, na_array, along = j) + names(dim(array1)) <- names(na_array_dims) + } + } + } + } + } + if (!(along %in% names(dim(array2)))) { + stop("The dimension specified in 'along' is not present in the ", + "provided arrays.") + } + array1 <- abind(array1, array2, along = which(names(dim(array1)) == along)) + names(dim(array1)) <- names(dim(array2)) + } else if (is.null(array1)) { + array1 <- array2 + } + array1 +} + +# Takes as input a list of arrays. The list must have named dimensions. +.MergeArrayOfArrays <- function(array_of_arrays) { + MergeArrays <- .MergeArrays + array_dims <- (dim(array_of_arrays)) + dim_names <- names(array_dims) + + # Merge the chunks. + for (dim_index in 1:length(dim_names)) { + dim_sub_array_of_chunks <- dim_sub_array_of_chunk_indices <- NULL + if (dim_index < length(dim_names)) { + dim_sub_array_of_chunks <- array_dims[(dim_index + 1):length(dim_names)] + names(dim_sub_array_of_chunks) <- dim_names[(dim_index + 1):length(dim_names)] + dim_sub_array_of_chunk_indices <- dim_sub_array_of_chunks + sub_array_of_chunk_indices <- array(1:prod(dim_sub_array_of_chunk_indices), + dim_sub_array_of_chunk_indices) + } else { + sub_array_of_chunk_indices <- NULL + } + sub_array_of_chunks <- vector('list', prod(dim_sub_array_of_chunks)) + dim(sub_array_of_chunks) <- dim_sub_array_of_chunks + for (i in 1:prod(dim_sub_array_of_chunks)) { + if (!is.null(sub_array_of_chunk_indices)) { + chunk_sub_indices <- which(sub_array_of_chunk_indices == i, arr.ind = TRUE)[1, ] + } else { + chunk_sub_indices <- NULL + } + for (j in 1:(array_dims[dim_index])) { + new_chunk <- do.call('[[', c(list(x = array_of_arrays), + as.list(c(j, chunk_sub_indices)))) + if (is.null(new_chunk)) { + stop("Chunks missing.") + } + if (is.null(sub_array_of_chunks[[i]])) { + sub_array_of_chunks[[i]] <- new_chunk + } else { + sub_array_of_chunks[[i]] <- MergeArrays(sub_array_of_chunks[[i]], + new_chunk, + dim_names[dim_index]) + } + } + } + array_of_arrays <- sub_array_of_chunks + rm(sub_array_of_chunks) + gc() + } + + array_of_arrays[[1]] +} diff --git a/man/Apply.Rd b/man/Apply.Rd index 7cf39f2954fdb3dbdbc934d2b74d75df4ff6e7fe..2f69b6c3345ea6ed004379bc886368ce9fdcc546 100644 --- a/man/Apply.Rd +++ b/man/Apply.Rd @@ -4,48 +4,42 @@ \alias{Apply} \title{Wrapper for Applying Atomic Functions to Arrays.} \usage{ -Apply(data, margins = NULL, AtomicFun, ..., target_dims = NULL, parallel = FALSE, - ncores = NULL) +Apply(data, target_dims = NULL, AtomicFun, ..., output_dims = NULL, + margins = NULL, ncores = NULL) } \arguments{ \item{data}{A single object (vector, matrix or array) or a list of objects. They must be in the same order as expected by AtomicFun.} -\item{margins}{List of vectors containing the margins for the input objects to be split by. Or, if there is a single vector of margins specified and a list of objects in data, then the single set of margins is applied over all objects. These vectors can contain either integers specifying the dimension position, or characters corresponding to the dimension names. If both margins and target_dims are specified, margins takes priority over target_dims.} +\item{target_dims}{List of vectors containing the dimensions to be input into AtomicFun for each of the objects in the data. These vectors can contain either integers specifying the dimension position, or characters corresponding to the dimension names. This parameter is mandatory if margins is not specified. If both margins and target_dims are specified, margins takes priority over target_dims.} \item{AtomicFun}{Function to be applied to the arrays.} \item{...}{Additional arguments to be used in the AtomicFun.} -\item{target_dims}{List of vectors containing the dimensions to be input into AtomicFun for each of the objects in the data. These vectors can contain either integers specifying the dimension position, or characters corresponding to the dimension names. If both margins and target_dims are specified, margins takes priority over target_dims.} +\item{output_dims}{Optional list of vectors containing the names of the dimensions to be output from the AtomicFun for each of the objects it returns (or a single vector if the function has only one output).} -\item{parallel}{Logical, should the function be applied in parallel.} +\item{margins}{List of vectors containing the margins for the input objects to be split by. Or, if there is a single vector of margins specified and a list of objects in data, then the single set of margins is applied over all objects. These vectors can contain either integers specifying the dimension position, or characters corresponding to the dimension names. If both margins and target_dims are specified, margins takes priority over target_dims.} -\item{ncores}{The number of cores to use for parallel computation.} +\item{ncores}{The number of multicore threads to use for parallel computation.} } \value{ -Array or matrix or vector resulting from AtomicFun. +List of arrays or matrices or vectors resulting from applying AtomicFun to data. } \description{ -Takes lists of multidimensional objects as input, which may have different numbers of dimensions and dimension lengths. The user can specify which dimensions of each array (or matrix) the function is to be applied over with the margins option. +This wrapper applies a given function, which takes N [multi-dimensional] arrays as inputs (which may have different numbers of dimensions and dimension lengths), and applies it to a list of N [multi-dimensional] arrays with at least as many dimensions as expected by the given function. The user can specify which dimensions of each array (or matrix) the function is to be applied over with the \code{margins} or \code{target_dims} option. A user can apply a function that receives (in addition to other helper parameters) 1 or more arrays as input, each with a different number of dimensions, and returns any number of multidimensional arrays. The target dimensions can be specified by their names. It is recommended to use this wrapper with multidimensional arrays with named dimensions. } \details{ -A user can apply a function that receives 1 or more objects as input, each with a different number of dimensions, and returns as a result a single array with any number of dimensions. +When using a single object as input, Apply is almost identical to the apply function. For multiple input objects, the output array will have dimensions equal to the dimensions specified in 'margins'. } \examples{ #Change in the rate of exceedance for two arrays, with different #dimensions, for some matrix of exceedances. -array_1 <- array(rnorm(2000), c(10,10,20)) # array with 20 timesteps -array_2 <- array(rnorm(1000), c(10, 10, 15)) # array with 15 timesteps -thresholds <- matrix(rnorm(100), 10, 10) # matrix of thresholds (no timesteps) - -# Function for calculating the change in the frequency of exceedances over the -#thresholds for array_1 relative to array_2 (percentage change). - -test_fun <- function(x, y, z) {(((sum(x > z) / (length(x))) / - (sum(y > z) / (length(y)))) * 100) - 100} -data = list(array_1, array_2, thresholds) +data = list(array(rnorm(2000), c(10,10,20)), array(rnorm(1000), c(10,10,10)), + array(rnorm(100), c(10, 10))) +test_fun <- function(x, y, z) {((sum(x > z) / (length(x))) / + (sum(y > z) / (length(y)))) * 100} margins = list(c(1, 2), c(1, 2), c(1,2)) -test <- Apply(data = data, margins = margins, AtomicFun = "test_fun") +test <- Apply(data, margins = margins, AtomicFun = "test_fun") } \references{ Wickham, H (2011), The Split-Apply-Combine Strategy for Data Analysis, Journal of Statistical Software.