Newer
Older
#'Create the workflow with the previously defined operation and data.
#'
#'The step that combines the previously declared data and operation to
#'create the complete workflow. It is the final step before data processing.
#'
#'@param inputs One or a list of objects of the class 'startR_cube' returned by
#' Start() or of the class 'startR_workflow', indicating the data to be
#' processed.
#'@param step_fun A startR step function as returned by Step(), or a list of
#' such step functions.
#'@param \dots Additional parameters for the inputs of the function defined in
#' 'step_fun' by Step().
#'
#'@return A list of the class 'startR_workflow' containing all the objects
#' needed for the data operation.
#'@examples
#' data_path <- system.file('extdata', package = 'startR')
#' path_obs <- file.path(data_path, 'obs/monthly_mean/$var$/$var$_$sdate$.nc')
#' sdates <- c('200011', '200012')
#' data <- Start(dat = list(list(path = path_obs)),
#' var = 'tos',
#' sdate = sdates,
#' time = 'all',
#' latitude = 'all',
#' longitude = 'all',
#' return_vars = list(latitude = 'dat',
#' longitude = 'dat',
#' time = 'sdate'),
#' retrieve = FALSE)
#' pi_short <- 3.14
#' fun <- function(x, pi_val) {
#' lat = attributes(x)$Variables$dat1$latitude
#' weight = sqrt(cos(lat * pi_val / 180))
#' corrected = Apply(list(x), target_dims = "latitude",
#' fun = function(x) {x * weight})
#' }
#'
#'
#' step <- Step(fun = fun,
#' target_dims = 'latitude',
#' output_dims = 'latitude',
#' use_libraries = c('multiApply'),
#' use_attributes = list(data = "Variables"))
#' wf <- AddStep(data, step, pi_val = pi_short)
#'
#'@export
AddStep <- function(inputs, step_fun, ...) {
# Check step_fun
#-----------------NEW------------------
if (is.list(step_fun)) { # multiple steps
if (any(sapply(step_fun, class) != 'startR_step_fun')) {
stop("Parameter 'step_fun' must be a startR step function or a list of it ",
"as returned by Step.")
}
if (is.null(names(step_fun))) {
step_name <- paste0('step', 1:length(step_fun))
} else {
step_name <- names(step_fun)
}
multi_steps <- TRUE
} else { # one step only
if (!('startR_step_fun' %in% class(step_fun))) {
stop("Parameter 'step_fun' must be a startR step function or a list of it ",
"as returned by Step.")
}
step_name <- 'step1'
multi_steps <- FALSE
# Check inputs
if (any(c('startR_cube', 'startR_workflow') %in% class(inputs))) {
inputs <- list(inputs)
names(inputs) <- 'input1'
}
else if (is.list(inputs)) {
if (any(!sapply(inputs,
function(x) any(c('startR_cube',
'startR_workflow') %in% class(x))))) {
stop("Parameter 'inputs' must be one or a list of objects of the class ",
"'startR_cube' or 'startR_workflow'.")
}
} else {
stop("Parameter 'inputs' must be one or a list of objects of the class ",
"'startR_cube' or 'startR_workflow'.")
}
# Consistency checks
#-----------------NEW---------------
if (multi_steps) {
if (any(!sapply(lapply(step_fun, attr, "UseAttributes"), is.null))) {
tmps <- step_fun[[which(!sapply(lapply(step_fun, attr, "UseAttributes"), is.null))]]
for (tmp in tmps) {
if (!all(names(inputs) == names(attr(tmp, "UseAttributes")))) {
names(inputs) <- names(attr(tmp, "UseAttributes"))
.warning(paste("The name of inputs is not assigned or differs from",
"name of use_attributes list in Step(). Force inputs",
"name to be consistent with use_attributes list"))
}
}
}
} else {
if (!is.null(attr(step_fun, "UseAttributes"))) {
if (!all(names(inputs) == names(attr(step_fun, "UseAttributes")))) {
names(inputs) <- names(attr(step_fun, "UseAttributes"))
.warning(paste("The name of inputs is not assigned or differs from",
"name of use_attributes list in Step(). Force inputs",
"name to be consistent with use_attributes list"))
}
#-----------NEW_END------------------
#-----------------NEW---------------
if (multi_steps) {
if (any(sapply(sapply(step_fun, attr, 'TargetDims'), length) != length(inputs))) {
stop("The number of provided 'inputs' (", length(inputs), ") does not ",
"match the number of expected inputs by the provided 'step_fun' (",
length(attr(step_fun, 'TargetDims')), ").")
}
} else {
if (length(inputs) != length(attr(step_fun, 'TargetDims'))) {
stop("The number of provided 'inputs' (", length(inputs), ") does not ",
"match the number of expected inputs by the provided 'step_fun' (",
length(attr(step_fun, 'TargetDims')), ").")
}
# Work out the total target dims of the step
previous_target_dims <- NULL
all_input_dims <- NULL
for (input in 1:length(inputs)) {
dims_to_compare <- names(attr(inputs[[input]], 'Dimensions'))
#----------------NEW---------------
if (multi_steps) {
for (tmp in step_fun) {
if (!all(attr(tmp, 'TargetDims')[[input]] %in% dims_to_compare)) {
stop("The target dimensions required by 'step_fun' for the input ", input,
" are not present in the corresponding provided object in 'inputs'.")
}
}
} else {
if (!all(attr(step_fun, 'TargetDims')[[input]] %in% dims_to_compare)) {
stop("The target dimensions required by 'step_fun' for the input ", input,
" are not present in the corresponding provided object in 'inputs'.")
}
if ('startR_workflow' %in% class(inputs[[input]])) {
if (is.null(previous_target_dims)) {
previous_target_dims <- attr(inputs[[input]], 'TargetDims')
} else {
dims1 <- rep(1, length(previous_target_dims))
names(dims1) <- previous_target_dims
dims2 <- rep(1, length(attr(inputs[[input]], 'TargetDims')))
names(dims2) <- attr(inputs[[input]], 'TargetDims')
previous_target_dims <- names(.MergeArrayDims(dims1, dims2)[[1]])
}
}
new_input_dims <- attr(inputs[[input]], 'Dimensions')
if (any(is.na(new_input_dims))) {
new_input_dims[which(is.na(new_input_dims))] <- rep(1, length(which(is.na(new_input_dims))))
}
if (is.null(all_input_dims)) {
all_input_dims <- new_input_dims
} else {
all_input_dims <- .MergeArrayDims(all_input_dims, new_input_dims)[[1]]
#---------------NEW----------------
# if multiple steps, need for loop over each step
if (multi_steps) {
step_funs <- step_fun
} else {
step_funs <- list(step_fun)
}
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
step_fun_count <- 1
for (step_fun in step_funs) {
new_target_dims <- unique(unlist(attr(step_fun, 'TargetDims')))
dims1 <- rep(1, length(previous_target_dims))
names(dims1) <- previous_target_dims
dims2 <- rep(1, length(new_target_dims))
names(dims2) <- new_target_dims
target_dims <- names(.MergeArrayDims(dims1, dims2)[[1]])
#NOTE: Stop saving each output one by one. Since the info is the same,
# just save once, and the list is $step1 - $inputs, $fun, $params.
# for (output in 1:length(attr(step_fun, 'OutputDims'))) {
workflow <- list(inputs = inputs,
fun = step_fun,
params = list(...))
# if (!is.null(attr(step_fun, 'OutputDims')[[output]])) {
# dimensions <- rep(NA, length(attr(step_fun, 'OutputDims')[[output]]))
# names(dimensions) <- attr(step_fun, 'OutputDims')[[output]]
# } else {
# dimensions <- NULL
# }
# in_dims_to_remove <- which(names(all_input_dims) %in% new_target_dims)
# if (length(in_dims_to_remove) > 0) {
# dimensions <- c(dimensions, all_input_dims[-in_dims_to_remove])
# } else {
# dimensions <- c(dimensions, all_input_dims)
# }
#NOTE: This Dimensions attribute is not used afterward.
# attr(workflow, 'Dimensions') <- dimensions
attr(workflow, 'AllTargetDims') <- target_dims
class(workflow) <- 'startR_workflow'
# result[[step_name[step_fun_count]]][[names(attr(step_fun, 'OutputDims'))[output]]] <- workflow
result[[step_name[step_fun_count]]] <- workflow
# }
step_fun_count <- step_fun_count + 1
if (!multi_steps) {
result <- result[[1]] # remove $step1
# if (length(result) == 1) {
# result <- result[[1]] # remove $output1
# }
return(result)
#-----------NEW_END------------------