# Newer
# Older
#'Create the workflow with the previously defined operation and data.
#'
#'The step that combines the previously declared data and operation together to
#'create the complete workflow. It is the final step before data processing.
#'
#'@param inputs One or a list of objects of the class 'startR_cube' returned by
#'  Start(), or of the class 'startR_workflow', indicating the data to be
#'  processed.
#'@param step_fun A startR step function as returned by Step().
#'@param \dots Additional parameters for the inputs of function defined in
#'  'step_fun' by Step().
#'
#'@return A list of the class 'startR_workflow' containing all the objects
#'  needed for the data operation. If 'step_fun' declares more than one
#'  output, a named list with one 'startR_workflow' per output is returned.
#'@examples
#'  data_path <- system.file('extdata', package = 'startR')
#'  path_obs <- file.path(data_path, 'obs/monthly_mean/$var$/$var$_$sdate$.nc')
#'  sdates <- c('200011', '200012')
#'  data <- Start(dat = list(list(path = path_obs)),
#'                var = 'tos',
#'                sdate = sdates,
#'                time = 'all',
#'                latitude = 'all',
#'                longitude = 'all',
#'                return_vars = list(latitude = 'dat',
#'                                   longitude = 'dat',
#'                                   time = 'sdate'),
#'                retrieve = FALSE)
#'  pi_short <- 3.14
#'  fun <- function(x, pi_val) {
#'    lat = attributes(x)$Variables$dat1$latitude
#'    weight = sqrt(cos(lat * pi_val / 180))
#'    corrected = Apply(list(x), target_dims = "latitude",
#'                      fun = function(x) {x * weight})
#'  }
#'
#'
#'  step <- Step(fun = fun,
#'               target_dims = 'latitude',
#'               output_dims = 'latitude',
#'               use_libraries = c('multiApply'),
#'               use_attributes = list(data = "Variables"))
#'  wf <- AddStep(data, step, pi_val = pi_short)
#'
AddStep <- function(inputs, step_fun, ...) {
  # Check step_fun
  # NOTE(review): the guard condition was missing in this copy of the file
  # (the stop() was unconditional). 'startR_step_fun' is assumed to be the
  # class assigned by Step() -- confirm against Step().
  if (!inherits(step_fun, 'startR_step_fun')) {
    stop("Parameter 'step_fun' must be a startR step function as returned by Step.")
  }

  # Check inputs: accept a single cube/workflow or a list of them.
  is_valid_input <- function(x) {
    inherits(x, 'startR_cube') || inherits(x, 'startR_workflow')
  }
  if (is_valid_input(inputs)) {
    # A single object is wrapped so the rest of the code always sees a list.
    inputs <- list(inputs)
    names(inputs) <- 'input1'
  } else if (is.list(inputs)) {
    if (!all(vapply(inputs, is_valid_input, logical(1)))) {
      stop("Parameter 'inputs' must be one or a list of objects of the class ",
           "'startR_cube' or 'startR_workflow'.")
    }
  } else {
    stop("Parameter 'inputs' must be one or a list of objects of the class ",
         "'startR_cube' or 'startR_workflow'.")
  }

  # Consistency checks
  # Input names must follow the names of the use_attributes list of Step().
  # identical() is used so that unnamed inputs (names(inputs) is NULL) are
  # also detected; an elementwise '==' against NULL is vacuously "all TRUE".
  use_attr_names <- names(attr(step_fun, "UseAttributes"))
  if (!is.null(use_attr_names) && !identical(names(inputs), use_attr_names)) {
    names(inputs) <- use_attr_names
    .warning(paste("The name of inputs is not assigned or differs from",
                   "name of use_attributes list in Step(). Force inputs",
                   "name to be consistent with use_attributes list"))
  }

  step_target_dims <- attr(step_fun, 'TargetDims')
  if (length(inputs) != length(step_target_dims)) {
    stop("The number of provided 'inputs' (", length(inputs), ") does not ",
         "match the number of expected inputs by the provided 'step_fun' (",
         length(step_target_dims), ").")
  }

  # Work out the total target dims of the step
  previous_target_dims <- NULL
  all_input_dims <- NULL
  for (input in seq_along(inputs)) {
    dims_to_compare <- names(attr(inputs[[input]], 'Dimensions'))
    if (!all(step_target_dims[[input]] %in% dims_to_compare)) {
      stop("The target dimensions required by 'step_fun' for the input ", input,
           " are not present in the corresponding provided object in 'inputs'.")
    }
    # Only inputs that are themselves workflows carry target dims from
    # earlier steps; accumulate them across inputs.
    # NOTE(review): this guard was missing in this copy of the file (only its
    # closing brace survived); restored as a 'startR_workflow' check -- confirm.
    # NOTE(review): workflows built below store 'AllTargetDims', while this
    # reads 'TargetDims'; verify which attribute is intended.
    if (inherits(inputs[[input]], 'startR_workflow')) {
      if (is.null(previous_target_dims)) {
        previous_target_dims <- attr(inputs[[input]], 'TargetDims')
      } else {
        dims1 <- rep(1, length(previous_target_dims))
        names(dims1) <- previous_target_dims
        dims2 <- rep(1, length(attr(inputs[[input]], 'TargetDims')))
        names(dims2) <- attr(inputs[[input]], 'TargetDims')
        previous_target_dims <- names(.MergeArrayDims(dims1, dims2)[[1]])
      }
    }
    # Dimensions of unknown length (NA) are treated as length 1 for merging.
    new_input_dims <- attr(inputs[[input]], 'Dimensions')
    if (any(is.na(new_input_dims))) {
      new_input_dims[is.na(new_input_dims)] <- 1
    }
    if (is.null(all_input_dims)) {
      all_input_dims <- new_input_dims
    } else {
      all_input_dims <- .MergeArrayDims(all_input_dims, new_input_dims)[[1]]
    }
  }

  new_target_dims <- unique(unlist(step_target_dims))
  result <- list()
  # Merge the target dims inherited from previous steps with those of this
  # step. Only the dimension names matter, hence the dummy lengths of 1.
  # NOTE(review): the assignment of 'target_dims' was missing in this copy of
  # the file; restored following the merge pattern used in the loop above.
  dims1 <- rep(1, length(previous_target_dims))
  names(dims1) <- previous_target_dims
  dims2 <- rep(1, length(new_target_dims))
  names(dims2) <- new_target_dims
  target_dims <- names(.MergeArrayDims(dims1, dims2)[[1]])

  # Input dimensions consumed by the step (its target dims) do not appear in
  # the output; this selection does not depend on the output, so do it once.
  in_dims_to_remove <- which(names(all_input_dims) %in% new_target_dims)
  if (length(in_dims_to_remove) > 0) {
    remaining_input_dims <- all_input_dims[-in_dims_to_remove]
  } else {
    remaining_input_dims <- all_input_dims
  }

  # Build one workflow object per output declared by the step function.
  output_dims <- attr(step_fun, 'OutputDims')
  for (output in seq_along(output_dims)) {
    workflow <- list(inputs = inputs,
                     fun = step_fun,
                     params = list(...))
    if (!is.null(output_dims[[output]])) {
      # Lengths of the output dims are not known at this point.
      dimensions <- rep(NA, length(output_dims[[output]]))
      names(dimensions) <- output_dims[[output]]
    } else {
      dimensions <- NULL
    }
    dimensions <- c(dimensions, remaining_input_dims)
    attr(workflow, 'Dimensions') <- dimensions
    attr(workflow, 'AllTargetDims') <- target_dims
    class(workflow) <- 'startR_workflow'
    result[[names(output_dims)[output]]] <- workflow
  }

  # A single output is returned bare rather than wrapped in a list.
  if (length(result) == 1) {
    result[[1]]
  } else {
    result
  }
}