Newer
Older
#'Create the workflow with the previous defined operation and data.
#'
#'The step that combines the previous declared data and operation together to
#'create the complete workflow. It is the final step before data processing.
#'
#'@param inputs One or a list of objects of the class 'startR_cube' returned by
#' Start(), indicating the data to be processed.
#'@param step_fun A startR step function as returned by Step().
#'@param \dots Additional parameters for the inputs of function defined in
#' 'step_fun' by Step().
#'
#'@return A list of the class 'startR_workflow' containing all the objects
#' needed for the data operation.
#'@examples
#' data_path <- system.file('extdata', package = 'startR')
#' path_obs <- file.path(data_path, 'obs/monthly_mean/$var$/$var$_$sdate$.nc')
#' sdates <- c('200011', '200012')
#' data <- Start(dat = path_obs,
#' var = 'tos',
#' sdate = sdates,
#' time = 'all',
#' latitude = 'all',
#' longitude = 'all',
#' return_vars = list(latitude = 'dat',
#' longitude = 'dat',
#' time = 'sdate'),
#' retrieve = FALSE)
#' pi_short <- 3.14
#' fun <- function(x, pi_val) {
#' lat = attributes(x)$Variables$dat1$latitude
#' weight = sqrt(cos(lat * pi_val / 180))
#' corrected = Apply(list(x), target_dims = "latitude",
#' fun = function(x) {x * weight})
#' }
#'
#'
#' step <- Step(fun = fun,
#' target_dims = 'latitude',
#' output_dims = 'latitude',
#' use_libraries = c('multiApply'),
#' use_attributes = list(data = "Variables"))
#' wf <- AddStep(data, step, pi_val = pi_short)
#'
#'@export
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
AddStep <- function(inputs, step_fun, ...) {
# Check step_fun
if (!('startR_step_fun' %in% class(step_fun))) {
stop("Parameter 'step_fun' must be a startR step function as returned by Step.")
}
# Check inputs
if (any(c('startR_cube', 'startR_workflow') %in% class(inputs))) {
inputs <- list(inputs)
names(inputs) <- 'input1'
}
else if (is.list(inputs)) {
if (any(!sapply(inputs,
function(x) any(c('startR_cube',
'startR_workflow') %in% class(x))))) {
stop("Parameter 'inputs' must be one or a list of objects of the class ",
"'startR_cube' or 'startR_workflow'.")
}
} else {
stop("Parameter 'inputs' must be one or a list of objects of the class ",
"'startR_cube' or 'startR_workflow'.")
}
# Consistency checks
if (!is.null(attr(step_fun, "UseAttributes"))) {
if (!all(names(inputs) == names(attr(step_fun, "UseAttributes")))) {
names(inputs) <- names(attr(step_fun, "UseAttributes"))
.warning(paste("The name of inputs is not assigned or differs from",
"name of use_attributes list in Step(). Force inputs",
"name to be consistent with use_attributes list"))
}
}
if (length(inputs) != length(attr(step_fun, 'TargetDims'))) {
stop("The number of provided 'inputs' (", length(inputs), ") does not ",
"match the number of expected inputs by the provided 'step_fun' (",
length(attr(step_fun, 'TargetDims')), ").")
}
# Work out the total target dims of the step
previous_target_dims <- NULL
all_input_dims <- NULL
for (input in 1:length(inputs)) {
dims_to_compare <- names(attr(inputs[[input]], 'Dimensions'))
if (!all(attr(step_fun, 'TargetDims')[[input]] %in% dims_to_compare)) {
stop("The target dimensions required by 'step_fun' for the input ", input,
" are not present in the corresponding provided object in 'inputs'.")
}
if ('startR_workflow' %in% class(inputs[[input]])) {
if (is.null(previous_target_dims)) {
previous_target_dims <- attr(inputs[[input]], 'TargetDims')
} else {
dims1 <- rep(1, length(previous_target_dims))
names(dims1) <- previous_target_dims
dims2 <- rep(1, length(attr(inputs[[input]], 'TargetDims')))
names(dims2) <- attr(inputs[[input]], 'TargetDims')
previous_target_dims <- names(.MergeArrayDims(dims1, dims2)[[1]])
}
}
new_input_dims <- attr(inputs[[input]], 'Dimensions')
if (any(is.na(new_input_dims))) {
new_input_dims[which(is.na(new_input_dims))] <- rep(1, length(which(is.na(new_input_dims))))
}
if (is.null(all_input_dims)) {
all_input_dims <- new_input_dims
} else {
all_input_dims <- .MergeArrayDims(all_input_dims, new_input_dims)[[1]]
}
}
new_target_dims <- unique(unlist(attr(step_fun, 'TargetDims')))
result <- list()
dims1 <- rep(1, length(previous_target_dims))
names(dims1) <- previous_target_dims
dims2 <- rep(1, length(new_target_dims))
names(dims2) <- new_target_dims
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
for (output in 1:length(attr(step_fun, 'OutputDims'))) {
workflow <- list(inputs = inputs,
fun = step_fun,
params = list(...))
if (!is.null(attr(step_fun, 'OutputDims')[[output]])) {
dimensions <- rep(NA, length(attr(step_fun, 'OutputDims')[[output]]))
names(dimensions) <- attr(step_fun, 'OutputDims')[[output]]
} else {
dimensions <- NULL
}
in_dims_to_remove <- which(names(all_input_dims) %in% new_target_dims)
if (length(in_dims_to_remove) > 0) {
dimensions <- c(dimensions, all_input_dims[-in_dims_to_remove])
} else {
dimensions <- c(dimensions, all_input_dims)
}
attr(workflow, 'Dimensions') <- dimensions
attr(workflow, 'AllTargetDims') <- target_dims
class(workflow) <- 'startR_workflow'
result[[names(attr(step_fun, 'OutputDims'))[output]]] <- workflow
}
if (length(result) == 1) {
result[[1]]
} else {
result
}
}