#' Wrapper for Applying Atomic Functions to Arrays. #' #' This wrapper applies a given function, which takes N [multi-dimensional] arrays as inputs (which may have different numbers of dimensions and dimension lengths), and applies it to a list of N [multi-dimensional] arrays with at least as many dimensions as expected by the given function. The user can specify which dimensions of each array (or matrix) the function is to be applied over with the \code{margins} or \code{target_dims} option. A user can apply a function that receives (in addition to other helper parameters) 1 or more arrays as input, each with a different number of dimensions, and returns any number of multidimensional arrays. The target dimensions can be specified by their names. It is recommended to use this wrapper with multidimensional arrays with named dimensions. #' @param data A single object (vector, matrix or array) or a list of objects. They must be in the same order as expected by fun. #' @param target_dims List of vectors containing the dimensions to be input into fun for each of the objects in the data. These vectors can contain either integers specifying the dimension position, or characters corresponding to the dimension names. This parameter is mandatory if margins is not specified. If both margins and target_dims are specified, margins takes priority over target_dims. #' @param fun Function to be applied to the arrays. #' @param ... Additional arguments to be used in the fun. #' @param output_dims Optional list of vectors containing the names of the dimensions to be output from the fun for each of the objects it returns (or a single vector if the function has only one output). #' @param margins List of vectors containing the margins for the input objects to be split by. Or, if there is a single vector of margins specified and a list of objects in data, then the single set of margins is applied over all objects. These vectors can contain either integers specifying the dimension position, or characters corresponding to the dimension names. If both margins and target_dims are specified, margins takes priority over target_dims. #' @param guess_dim_names Whether to automatically guess missing dimension names for dimensions of equal length across different inputs in 'data' with a warning (TRUE; default), or to crash whenever unnamed dimensions of equa length are identified across different inputs (FALSE). #' @param ncores The number of multicore threads to use for parallel computation. #' @param split_factor Factor telling to which degree the input data should be split into smaller pieces to be processed by the available cores. By default (split_factor = 1) the data is split into 4 pieces for each of the cores (as specified in ncores). A split_factor of 2 will result in 8 pieces for each of the cores, and so on. The special value 'greatest' will split the input data into as many pieces as possible. #' @details When using a single object as input, Apply is almost identical to the apply function. For multiple input objects, the output array will have dimensions equal to the dimensions specified in 'margins'. #' @return List of arrays or matrices or vectors resulting from applying fun to data. #' @references Wickham, H (2011), The Split-Apply-Combine Strategy for Data Analysis, Journal of Statistical Software. #' @export #' @examples #' #Change in the rate of exceedance for two arrays, with different #' #dimensions, for some matrix of exceedances. #' data <- list(array(rnorm(1000), c(5, 10, 20)), #' array(rnorm(500), c(5, 10, 10)), #' array(rnorm(50), c(5, 10))) #' test_fun <- function(x, y, z) { #' ((sum(x > z) / (length(x))) / #' (sum(y > z) / (length(y)))) * 100 #' } #' test <- Apply(data, target = list(3, 3, NULL), test_fun) #' @importFrom foreach registerDoSEQ #' @importFrom doParallel registerDoParallel #' @importFrom plyr splat llply Apply <- function(data, target_dims = NULL, fun, ..., output_dims = NULL, margins = NULL, guess_dim_names = TRUE, ncores = NULL, split_factor = 1) { # Check data if (!is.list(data)) { data <- list(data) } if (any(!sapply(data, is.numeric))) { stop("Parameter 'data' must be one or a list of numeric objects.") } is_vector <- rep(FALSE, length(data)) is_unnamed <- rep(FALSE, length(data)) unnamed_dims <- c() guessed_any_dimnames <- FALSE for (i in 1 : length(data)) { if (length(data[[i]]) < 1) { stop("Arrays in 'data' must be of length > 0.") } if (is.null(dim(data[[i]]))) { is_vector[i] <- TRUE is_unnamed[i] <- TRUE dim(data[[i]]) <- length(data[[i]]) } if (!is.null(names(dim(data[[i]])))) { if (any(sapply(names(dim(data[[i]])), nchar) == 0)) { stop("Dimension names of arrays in 'data' must be at least ", "one character long.") } if (length(unique(names(dim(data[[i]])))) != length(names(dim(data[[i]])))) { stop("Arrays in 'data' must not have repeated dimension names.") } if (any(is.na(names(dim(data[[i]]))))) { stop("Arrays in 'data' must not have NA as dimension names.") } } else { is_unnamed[i] <- TRUE new_unnamed_dims <- c() unnamed_dims_copy <- unnamed_dims for (j in 1 : length(dim(data[[i]]))) { len_of_dim_j <- dim(data[[i]])[j] found_match <- which(unnamed_dims_copy == len_of_dim_j) if (!guess_dim_names && (length(found_match) > 0)) { stop("Arrays in 'data' have multiple unnamed dimensions of the ", "same length. Please provide dimension names.") } if (length(found_match) > 0) { found_match <- found_match[1] names(dim(data[[i]]))[j] <- names(unnamed_dims_copy[found_match]) unnamed_dims_copy <- unnamed_dims_copy[-found_match] guessed_any_dimnames <- TRUE } else { new_dim <- len_of_dim_j names(new_dim) <- paste0('_unnamed_dim_', length(unnamed_dims) + length(new_unnamed_dims) + 1, '_') new_unnamed_dims <- c(new_unnamed_dims, new_dim) names(dim(data[[i]]))[j] <- names(new_dim) } } unnamed_dims <- c(unnamed_dims, new_unnamed_dims) } } if (guessed_any_dimnames) { dim_names_string <- "" for (i in 1:length(data)) { dim_names_string <- c(dim_names_string, "\n\tInput ", i, ":", sapply(capture.output(print(dim(data[[i]]))), function(x) paste0('\n\t\t', x))) } warning("Guessed names for some unnamed dimensions of equal length ", "found across different inputs in 'data'. Please check ", "carefully the assumed names below are correct, or provide ", "dimension names for safety, or disable the parameter ", "'guess_dimension_names'.", dim_names_string) } # Check fun if (is.character(fun)) { try({fun <- get(fun)}, silent = TRUE) if (!is.function(fun)) { stop("Could not find the function '", fun, "'.") } } if (!is.function(fun)) { stop("Parameter 'fun' must be a function or a character string ", "with the name of a function.") } if (!is.null(attributes(fun))) { if (is.null(target_dims)) { if ('target_dims' %in% names(attributes(fun))) { target_dims <- attr(fun, 'target_dims') } } if (is.null(output_dims)) { if ('output_dims' %in% names(attributes(fun))) { output_dims <- attr(fun, 'output_dims') } } } # Check target_dims and margins arglist <- as.list(match.call()) if (!any(c('margins', 'target_dims') %in% names(arglist)) && is.null(target_dims)) { stop("One of 'margins' or 'target_dims' must be specified.") } margins_names <- vector('list', length(data)) target_dims_names <- vector('list', length(data)) if ('margins' %in% names(arglist)) { # Check margins and build target_dims accordingly if (!is.list(margins)) { margins <- rep(list(margins), length(data)) } if (any(!sapply(margins, function(x) is.character(x) || is.numeric(x) || is.null(x)))) { stop("Parameter 'margins' must be one or a list of numeric or ", "character vectors.") } if (any(sapply(margins, function(x) is.character(x) && (length(x) == 0)))) { stop("Parameter 'margins' must not contain length-0 character vectors.") } duplicate_dim_specs <- sapply(margins, function(x) { length(unique(x)) != length(x) }) if (any(duplicate_dim_specs)) { stop("Parameter 'margins' must not contain duplicated dimension ", "specifications.") } target_dims <- vector('list', length(data)) for (i in 1 : length(data)) { if (length(margins[[i]]) > 0) { if (is.character(unlist(margins[i]))) { if (is.null(names(dim(data[[i]])))) { stop("Parameter 'margins' contains dimension names, but ", "some of the corresponding objects in 'data' do not have ", "dimension names.") } margins2 <- margins[[i]] margins2_new_num <- c() for (j in 1 : length(margins2)) { matches <- which(names(dim(data[[i]])) == margins2[j]) if (length(matches) < 1) { stop("Could not find dimension '", margins2[j], "' in ", i, "th object provided in 'data'.") } margins2_new_num[j] <- matches[1] } margins_names[[i]] <- margins[[i]] margins[[i]] <- margins2_new_num } if (length(margins[[i]]) == length(dim(data[[i]]))) { target_dims_names[i] <- list(NULL) target_dims[i] <- list(NULL) margins_names[[i]] <- names(dim(data[[i]])) } else { margins_names[[i]] <- names(dim(data[[i]]))[margins[[i]]] target_dims_names[[i]] <- names(dim(data[[i]]))[- margins[[i]]] target_dims[[i]] <- (1 : length(dim(data[[i]])))[- margins[[i]]] } } else { target_dims[[i]] <- 1 : length(dim(data[[i]])) if (!is.null(names(dim(data[[i]])))) { target_dims_names[[i]] <- names(dim(data[[i]])) } } } } else { # Check target_dims and build margins accordingly if (!is.list(target_dims)) { target_dims <- rep(list(target_dims), length(data)) } if (any(!sapply(target_dims, function(x) is.character(x) || is.numeric(x) || is.null(x)))) { stop("Parameter 'target_dims' must be one or a list of numeric or ", "character vectors.") } if (any(sapply(target_dims, function(x) is.character(x) && (length(x) == 0)))) { stop("Parameter 'target_dims' must not contain length-0 character vectors.") } duplicate_dim_specs <- sapply(target_dims, function(x) { length(unique(x)) != length(x) }) if (any(duplicate_dim_specs)) { stop("Parameter 'target_dims' must not contain duplicated dimension ", "specifications.") } margins <- vector('list', length(data)) for (i in 1 : length(data)) { if (length(target_dims[[i]]) > 0) { if (is.character(unlist(target_dims[i]))) { if (is.null(names(dim(data[[i]])))) { stop("Parameter 'target_dims' contains dimension names, but ", "some of the corresponding objects in 'data' do not have ", "dimension names.") } targs2 <- target_dims[[i]] targs2_new_num <- c() for (j in 1 : length(targs2)) { matches <- which(names(dim(data[[i]])) == targs2[j]) if (length(matches) < 1) { stop("Could not find dimension '", targs2[j], "' in ", i, "th object provided in 'data'.") } targs2_new_num[j] <- matches[1] } target_dims_names[[i]] <- target_dims[[i]] target_dims[[i]] <- targs2_new_num } if (length(target_dims[[i]]) == length(dim(data[[i]]))) { margins_names[i] <- list(NULL) margins[i] <- list(NULL) target_dims_names[[i]] <- names(dim(data[[i]])) } else { target_dims_names[[i]] <- names(dim(data[[i]]))[target_dims[[i]]] margins_names[[i]] <- names(dim(data[[i]]))[- target_dims[[i]]] margins[[i]] <- (1 : length(dim(data[[i]])))[- target_dims[[i]]] } } else { margins[[i]] <- 1 : length(dim(data[[i]])) if (!is.null(names(dim(data[[i]])))) { margins_names[[i]] <- names(dim(data[[i]])) } } } } # Reorder dimensions of input data for target dims to be left-most # and in the required order. for (i in 1 : length(data)) { if (length(target_dims[[i]]) > 0) { if (is.unsorted(target_dims[[i]]) || (max(target_dims[[i]]) > length(target_dims[[i]]))) { marg_dims <- (1 : length(dim(data[[i]])))[- target_dims[[i]]] data[[i]] <- .aperm2(data[[i]], c(target_dims[[i]], marg_dims)) target_dims[[i]] <- 1 : length(target_dims[[i]]) target_dims_names[[i]] <- names(dim(data[[i]]))[target_dims[[i]]] if (length(target_dims[[i]]) < length(dim(data[[i]]))) { margins[[i]] <- (length(target_dims[[i]]) + 1) : length(dim(data[[i]])) margins_names[[i]] <- names(dim(data[[i]]))[margins[[i]]] } } } } # Check output_dims if (!is.null(output_dims)) { if (!is.list(output_dims)) { output_dims <- list(output1 = output_dims) } if (any(sapply(output_dims, function(x) !(is.character(x) || is.null(x))))) { stop("Parameter 'output_dims' must be one or a list of vectors of character strings (or NULLs).") } if (is.null(names(output_dims))) { names(output_dims) <- rep('', length(output_dims)) } missing_output_names <- which(sapply(names(output_dims), nchar) == 0) if (length(missing_output_names) > 0) { names(output_dims)[missing_output_names] <- paste0('output', missing_output_names) } } # Check ncores if (is.null(ncores)) { ncores <- 1 } if (!is.numeric(ncores)) { stop("Parameter 'ncores' must be numeric.") } ncores <- round(ncores) # Consistency checks of margins of all input objects # for each data array, add its margins to the list if not present. # those margins present, check that they match # with this we end up with a named list of margin sizes all_found_margins_lengths <- afml <- list() for (i in 1:length(data)) { #if (!is.null(margins_names[[i]])) { if (length(afml) > 0) { matches <- which(margins_names[[i]] %in% names(afml)) if (length(matches) > 0) { margs_to_add <- as.list(dim(data[[i]])[margins[[i]]][- matches]) if (any(dim(data[[i]])[margins[[i]][matches]] != unlist(afml[margins_names[[i]][matches]]))) { stop("Found one or more margin dimensions with the same name and ", "different length in some of the input objects in 'data'.") } } else { margs_to_add <- as.list(dim(data[[i]])[margins[[i]]]) } afml <- c(afml, margs_to_add) } else { afml <- as.list(dim(data[[i]])[margins[[i]]]) } #} } # afml is now a named list with the lenghts of all margins. Each margin # appears once only. If some names are not provided, they are set automatically # to 'unnamed_dim_1', 'unamed_dim_2', ... # Now need to check which margins are common for all the data arrays. # Those will be used by llply. # For the margins that are not common, we will need to iterate manually # across them, and use data arrays repeatedly as needed. margins_afml <- margins for (i in 1:length(data)) { if (length(margins[[i]]) > 0) { margins_afml[[i]] <- sapply(margins_names[[i]], function(x) { sapply(x, function(y) { which(names(afml) == y) } ) } ) } } common_margs <- margins_afml[[1]] if (length(margins_afml) > 1) { for (i in 2:length(margins_afml)) { margs_a <- unlist(afml[common_margs]) margs_b <- unlist(afml[margins_afml[[i]]]) matches <- which(names(margs_a) %in% names(margs_b)) if (length(matches) > 0) { common_margs <- common_margs[matches] } else { common_margs <- NULL } } } if (length(afml) > 0) { non_common_margs <- 1:length(afml) if (length(common_margs) > 0) { non_common_margs <- non_common_margs[- common_margs] } } else { non_common_margs <- NULL } # common_margs is now a numeric vector with the indices of the common # margins (i.e. their position in afml) # non_common_margs is now a numeric vector with the indices of the # non-common margins (i.e. their position in afml) if (length(c(non_common_margs, common_margs)) > 0) { marg_inds_ordered <- sort(c(non_common_margs, common_margs)) margins_array_dims <- mad <- unlist(afml[marg_inds_ordered]) } else { margins_array_dims <- mad <- NULL } # Sharing workload across cores. Each core will run 4 chunks if possible. # the larger the split factor, the smaller the amount of data that # will be processed at once and the finer the granules to be distributed # across cores, but the larger the overhead for granule startup, etc. total_size <- prod(mad) if (split_factor == 'greatest') { chunks_per_core <- ceiling(total_size / ncores) } else { chunks_per_core <- 4 * split_factor } if (!is.null(ncores)) { chunk_size <- round(total_size / (ncores * chunks_per_core)) } #} else { # chunk_size <- 4 #} if (chunk_size < 1) { chunk_size <- 1 } nchunks <- floor(total_size / chunk_size) chunk_sizes <- rep(chunk_size, nchunks) if (total_size %% chunk_size != 0) { chunk_sizes <- c(chunk_sizes, total_size %% chunk_size) } # The following few lines have an impact on memory footprint. # Flattening margin dimensions so that the iteration function can access # them easily. for (i in 1:length(data)) { if (length(margins[[i]]) > 0) { dims <- dim(data[[i]]) margins_inds <- 1:length(margins[[i]]) + length(target_dims[[i]]) dim(data[[i]]) <- c(dims[-margins_inds], '_margins_dim_' = prod(dims[margins_inds])) } else { dim(data[[i]]) <- c(dim(data[[i]]), '_margins_dim_' = 1) } } ###.isolate <- function(data, margins, drop = FALSE) { ### if (length(margins) > 0) { ### margin_length <- lapply(dim(data), function(x) 1 : x) ### margin_length[- margins] <- "" ### } else { ### margin_length <- as.list(rep("", length(dim(data)))) ### } ### margin_length <- expand.grid(margin_length, KEEP.OUT.ATTRS = FALSE, ### stringsAsFactors = FALSE) ### eval(dim(environment()$data)) ### structure(list(env = environment(), index = margin_length, ### drop = drop, subs = as.name("[")), ### class = c("indexed_array")) ###} input_margin_weights <- vector('list', length(data)) ###iteration_input_dims <- vector('list', length(data)) ###flat_data <- vector('list', length(data)) for (i in 1:length(data)) { marg_sizes <- dim(data[[i]])[margins[[i]]] input_margin_weights[[i]] <- sapply(1:length(marg_sizes), function(k) prod(c(1, marg_sizes)[1:k])) ###iteration_input_dims[[i]] <- dim(data[[i]])[target_dims[[i]]] ###flat_data[[i]] <- .isolate(data[[i]], margins[[i]]) } # TODO: need to add progress bar # TODO: IF ONLY ONE INPUT ARRAY, MAKE USE OF apply. splatted_f <- splat(fun) # For a selected use case, these are the timings: # - total: 17 s # - preparation + post: 1 s # - llply (40 iterations): 16 s # - one iteration: 1.5s with profiling of 50 sub-iterations (0.4 without) # - intro: 0 s # - for loop with profiling of 50 sub-iterations (5000 sub-iterations): 1.5 s # - one sub-iteration: 0.0003 s # - intro: 0.000125 s # - splatted_f: 0.000125 s # - outro: 0.00005 iteration <- function(m) { # INTRO n <- 1 first_index <- n + (m - 1) * chunk_size first_marg_indices <- arrayInd(first_index, mad) names(first_marg_indices) <- names(mad) sub_arrays_of_results <- list() found_first_sub_result <- FALSE iteration_indices_to_take <- list() for (i in 1:length(data)) { iteration_indices_to_take[[i]] <- as.list(rep(TRUE, length(dim(data[[i]])))) } add_one_multidim <- function(index, dims) { stop_iterating <- FALSE check_dim <- 1 ndims <- length(index) while (!stop_iterating) { index[check_dim] <- index[check_dim] + 1 if (index[check_dim] > dims[check_dim]) { index[check_dim] <- 1 check_dim <- check_dim + 1 if (check_dim > ndims) { check_dim <- rep(1, ndims) stop_iterating <- TRUE } } else { stop_iterating <- TRUE } } index } # FOR LOOP for (n in 1:chunk_sizes[m]) { # SUB-ITERATION INTRO iteration_input <- list() for (i in 1:length(data)) { input_margin_dim_index <- first_marg_indices[margins_names[[i]]] input_margin_dim_index <- 1 + sum((input_margin_dim_index - 1) * input_margin_weights[[i]]) iteration_indices_to_take[[i]][[length(dim(data[[i]]))]] <- input_margin_dim_index iteration_input[[i]] <- do.call('[', c(list(x = data[[i]]), iteration_indices_to_take[[i]], list(drop = FALSE))) num_dims <- length(dim(iteration_input[[i]])) if (num_dims > 1) { dim(iteration_input[[i]]) <- dim(iteration_input[[i]])[-length(dim(iteration_input[[i]]))] } else { dim(iteration_input[[i]]) <- NULL } ###if (length(iteration_input_dims[[i]]) > 0) { ### iteration_input[[i]] <- array(flat_data[[i]][[input_margin_dim_index]], ### dim = iteration_input_dims[[i]]) ###} else { ### iteration_input[[i]] <- as.vector(flat_data[[i]][[input_margin_dim_index]]) ###} } if (!is.null(mad)) { first_marg_indices <- add_one_multidim(first_marg_indices, mad) } # SPLATTED_F result <- splatted_f(iteration_input, ...) # SUB-ITERATION OUTRO if (!is.list(result)) { result <- list(result) } if (!found_first_sub_result) { sub_arrays_of_results <- vector('list', length(result)) if (!is.null(output_dims)) { if (length(output_dims) != length(sub_arrays_of_results)) { stop("The 'fun' returns ", length(sub_arrays_of_results), " elements, but ", length(output_dims), " elements were expected.") } names(sub_arrays_of_results) <- names(output_dims) } else if (!is.null(names(result))) { names(sub_arrays_of_results) <- names(result) } else { names(sub_arrays_of_results) <- paste0('output', 1:length(result)) } len0_names <- which(nchar(names(sub_arrays_of_results)) == 0) if (length(len0_names) > 0) { names(sub_arrays_of_results)[len0_names] <- paste0('output', len0_names) } } atomic_fun_out_dims <- vector('list', length(result)) for (component in 1:length(result)) { if (is.null(dim(result[[component]]))) { if (length(result[[component]]) == 1) { component_dims <- NULL } else { component_dims <- length(result[[component]]) } } else { component_dims <- dim(result[[component]]) } if (!found_first_sub_result) { sub_arrays_of_results[[component]] <- array(dim = c(component_dims, chunk_sizes[m])) } if (!is.null(component_dims)) { atomic_fun_out_dims[[component]] <- component_dims } sub_arrays_of_results[[component]][(1:prod(component_dims)) + (n - 1) * prod(component_dims)] <- result[[component]] } if (!found_first_sub_result) { found_first_sub_result <- TRUE } if (!is.null(output_dims)) { # Check number of outputs. if (length(output_dims) != length(result)) { stop("Expected fun to return ", length(output_dims), " components, ", "but ", length(result), " found.") } # Check number of output dimensions is correct. for (component in 1:length(result)) { if (length(atomic_fun_out_dims[[component]]) != length(output_dims[[component]])) { stop("Expected ", component, "st returned element by 'fun' ", "to have ", length(output_dims[[component]]), " dimensions, ", "but ", length(atomic_fun_out_dims[[component]]), " found.") } } } } sub_arrays_of_results } # Execute in parallel if needed parallel <- ncores > 1 if (parallel) registerDoParallel(ncores) result <- llply(1:length(chunk_sizes), iteration, .parallel = parallel) if (parallel) registerDoSEQ() # Merge the results arrays_of_results <- NULL found_first_result <- FALSE result_chunk_lengths <- vector('list', length(result[[1]])) fun_out_dims <- vector('list', length(result[[1]])) for (m in 1:length(result)) { if (!found_first_result) { arrays_of_results <- vector('list', length(result[[1]])) if (!is.null(output_dims)) { if (length(output_dims) != length(arrays_of_results)) { stop("The 'fun' returns ", length(arrays_of_results), " elements, but ", length(output_dims), " elements were expected.") } names(arrays_of_results) <- names(output_dims) } else if (!is.null(names(result[[1]]))) { names(arrays_of_results) <- names(result[[1]]) } else { names(arrays_of_results) <- paste0('output', 1:length(result[[1]])) } } for (component in 1:length(result[[m]])) { component_dims <- dim(result[[m]][[component]]) if (!found_first_result) { result_chunk_lengths[[component]] <- prod(component_dims) if (length(component_dims) > 1) { fun_out_dims[[component]] <- component_dims[- length(component_dims)] } if (length(fun_out_dims[[component]]) + length(mad) > 0) { arrays_of_results[[component]] <- array(dim = c(fun_out_dims[[component]], mad)) dimnames_to_remove <- which(grepl('^_unnamed_dim_', names(dim(arrays_of_results[[component]])))) if (length(dimnames_to_remove) > 0) { names(dim(arrays_of_results[[component]]))[dimnames_to_remove] <- rep('', length(dimnames_to_remove)) } if (all(names(dim(arrays_of_results[[component]])) == '')) { names(dim(arrays_of_results[[component]])) <- NULL } } } arrays_of_results[[component]][(1:prod(component_dims)) + (m - 1) * result_chunk_lengths[[component]]] <- result[[m]][[component]] } if (!found_first_result) { found_first_result <- TRUE } #if (!is.null(output_dims)) { # # Check number of output dimensions is correct. # for (component in 1:length(atomic_fun_out_dims)) { # if (!is.null(names(fun_out_dims[[component]]))) { # # check component_dims match names of output_dims[[component]], and reorder if needed # } # } #} } # Assign 'output_dims' as dimension names if possible if (!is.null(output_dims)) { for (component in 1:length(output_dims)) { if (length(output_dims[[component]]) > 0) { names(dim(arrays_of_results[[component]]))[1:length(output_dims[[component]])] <- output_dims[[component]] } } } # Return arrays_of_results }