Commit 437488c9 authored by aho
Browse files

Add multiple steps option

parent 0921db30
Pipeline #9830 failed with stage
in 36 seconds
......@@ -44,10 +44,27 @@
#'@export
AddStep <- function(inputs, step_fun, ...) {
# Check step_fun
if (!('startR_step_fun' %in% class(step_fun))) {
stop("Parameter 'step_fun' must be a startR step function as returned by Step.")
#-----------------NEW------------------
if (is.list(step_fun)) { # multiple steps
if (any(sapply(step_fun, class) != 'startR_step_fun')) {
stop("Parameter 'step_fun' must be a startR step function or a list of it ",
"as returned by Step.")
}
if (is.null(names(step_fun))) {
step_name <- paste0('step', 1:length(step_fun))
} else {
step_name <- names(step_fun)
}
multi_steps <- TRUE
} else { # one step only
if (!('startR_step_fun' %in% class(step_fun))) {
stop("Parameter 'step_fun' must be a startR step function or a list of it ",
"as returned by Step.")
}
step_name <- 'step1'
multi_steps <- FALSE
}
#--------------NEW_END-----------------
# Check inputs
if (any(c('startR_cube', 'startR_workflow') %in% class(inputs))) {
inputs <- list(inputs)
......@@ -66,30 +83,67 @@ AddStep <- function(inputs, step_fun, ...) {
}
# Consistency checks
if (!is.null(attr(step_fun, "UseAttributes"))) {
if (!all(names(inputs) == names(attr(step_fun, "UseAttributes")))) {
names(inputs) <- names(attr(step_fun, "UseAttributes"))
.warning(paste("The name of inputs is not assigned or differs from",
"name of use_attributes list in Step(). Force inputs",
"name to be consistent with use_attributes list"))
#-----------------NEW---------------
if (multi_steps) {
if (any(!sapply(lapply(step_fun, attr, "UseAttributes"), is.null))) {
tmps <- step_fun[[which(!sapply(lapply(step_fun, attr, "UseAttributes"), is.null))]]
for (tmp in tmps) {
if (!all(names(inputs) == names(attr(tmp, "UseAttributes")))) {
names(inputs) <- names(attr(tmp, "UseAttributes"))
.warning(paste("The name of inputs is not assigned or differs from",
"name of use_attributes list in Step(). Force inputs",
"name to be consistent with use_attributes list"))
}
}
}
} else {
if (!is.null(attr(step_fun, "UseAttributes"))) {
if (!all(names(inputs) == names(attr(step_fun, "UseAttributes")))) {
names(inputs) <- names(attr(step_fun, "UseAttributes"))
.warning(paste("The name of inputs is not assigned or differs from",
"name of use_attributes list in Step(). Force inputs",
"name to be consistent with use_attributes list"))
}
}
}
if (length(inputs) != length(attr(step_fun, 'TargetDims'))) {
stop("The number of provided 'inputs' (", length(inputs), ") does not ",
"match the number of expected inputs by the provided 'step_fun' (",
length(attr(step_fun, 'TargetDims')), ").")
#-----------NEW_END------------------
#-----------------NEW---------------
if (multi_steps) {
if (any(sapply(sapply(step_fun, attr, 'TargetDims'), length) != length(inputs))) {
stop("The number of provided 'inputs' (", length(inputs), ") does not ",
"match the number of expected inputs by the provided 'step_fun' (",
length(attr(step_fun, 'TargetDims')), ").")
}
} else {
if (length(inputs) != length(attr(step_fun, 'TargetDims'))) {
stop("The number of provided 'inputs' (", length(inputs), ") does not ",
"match the number of expected inputs by the provided 'step_fun' (",
length(attr(step_fun, 'TargetDims')), ").")
}
}
#-----------NEW_END------------------
# Work out the total target dims of the step
previous_target_dims <- NULL
all_input_dims <- NULL
for (input in 1:length(inputs)) {
dims_to_compare <- names(attr(inputs[[input]], 'Dimensions'))
if (!all(attr(step_fun, 'TargetDims')[[input]] %in% dims_to_compare)) {
stop("The target dimensions required by 'step_fun' for the input ", input,
" are not present in the corresponding provided object in 'inputs'.")
#----------------NEW---------------
if (multi_steps) {
for (tmp in step_fun) {
if (!all(attr(tmp, 'TargetDims')[[input]] %in% dims_to_compare)) {
stop("The target dimensions required by 'step_fun' for the input ", input,
" are not present in the corresponding provided object in 'inputs'.")
}
}
} else {
if (!all(attr(step_fun, 'TargetDims')[[input]] %in% dims_to_compare)) {
stop("The target dimensions required by 'step_fun' for the input ", input,
" are not present in the corresponding provided object in 'inputs'.")
}
}
#-----------NEW_END------------------
if ('startR_workflow' %in% class(inputs[[input]])) {
if (is.null(previous_target_dims)) {
previous_target_dims <- attr(inputs[[input]], 'TargetDims')
......@@ -111,39 +165,61 @@ AddStep <- function(inputs, step_fun, ...) {
all_input_dims <- .MergeArrayDims(all_input_dims, new_input_dims)[[1]]
}
}
new_target_dims <- unique(unlist(attr(step_fun, 'TargetDims')))
#---------------NEW----------------
# If there are multiple steps, loop over each of them
if (multi_steps) {
step_funs <- step_fun
} else {
step_funs <- list(step_fun)
}
result <- list()
dims1 <- rep(1, length(previous_target_dims))
names(dims1) <- previous_target_dims
dims2 <- rep(1, length(new_target_dims))
names(dims2) <- new_target_dims
target_dims <- names(.MergeArrayDims(dims1, dims2)[[1]])
for (output in 1:length(attr(step_fun, 'OutputDims'))) {
workflow <- list(inputs = inputs,
fun = step_fun,
params = list(...))
if (!is.null(attr(step_fun, 'OutputDims')[[output]])) {
dimensions <- rep(NA, length(attr(step_fun, 'OutputDims')[[output]]))
names(dimensions) <- attr(step_fun, 'OutputDims')[[output]]
} else {
dimensions <- NULL
}
in_dims_to_remove <- which(names(all_input_dims) %in% new_target_dims)
if (length(in_dims_to_remove) > 0) {
dimensions <- c(dimensions, all_input_dims[-in_dims_to_remove])
} else {
dimensions <- c(dimensions, all_input_dims)
}
attr(workflow, 'Dimensions') <- dimensions
step_fun_count <- 1
for (step_fun in step_funs) {
new_target_dims <- unique(unlist(attr(step_fun, 'TargetDims')))
dims1 <- rep(1, length(previous_target_dims))
names(dims1) <- previous_target_dims
dims2 <- rep(1, length(new_target_dims))
names(dims2) <- new_target_dims
target_dims <- names(.MergeArrayDims(dims1, dims2)[[1]])
# NOTE: The workflow is no longer saved once per output. Since the info is the
# same for every output, it is saved only once per step, so the resulting
# list is $step1 -> $inputs, $fun, $params.
# for (output in 1:length(attr(step_fun, 'OutputDims'))) {
workflow <- list(inputs = inputs,
fun = step_fun,
params = list(...))
# if (!is.null(attr(step_fun, 'OutputDims')[[output]])) {
# dimensions <- rep(NA, length(attr(step_fun, 'OutputDims')[[output]]))
# names(dimensions) <- attr(step_fun, 'OutputDims')[[output]]
# } else {
# dimensions <- NULL
# }
# in_dims_to_remove <- which(names(all_input_dims) %in% new_target_dims)
# if (length(in_dims_to_remove) > 0) {
# dimensions <- c(dimensions, all_input_dims[-in_dims_to_remove])
# } else {
# dimensions <- c(dimensions, all_input_dims)
# }
#NOTE: This Dimensions attribute is not used afterward.
# attr(workflow, 'Dimensions') <- dimensions
attr(workflow, 'AllTargetDims') <- target_dims
class(workflow) <- 'startR_workflow'
result[[names(attr(step_fun, 'OutputDims'))[output]]] <- workflow
# result[[step_name[step_fun_count]]][[names(attr(step_fun, 'OutputDims'))[output]]] <- workflow
result[[step_name[step_fun_count]]] <- workflow
# }
step_fun_count <- step_fun_count + 1
}
if (length(result) == 1) {
result[[1]]
} else {
result
if (!multi_steps) {
result <- result[[1]] # remove $step1
# if (length(result) == 1) {
# result <- result[[1]] # remove $output1
# }
}
return(result)
#-----------NEW_END------------------
}
......@@ -89,12 +89,23 @@ Compute <- function(workflow, chunks = 'auto',
ecflow_server = NULL, silent = FALSE, debug = FALSE,
wait = TRUE) {
# Check workflow
if (!any(c('startR_cube', 'startR_workflow') %in% class(workflow))) {
stop("Parameter 'workflow' must be an object of class 'startR_cube' as ",
"returned by Start or of class 'startR_workflow' as returned by ",
"AddStep.")
#-------------NEW--------------
if (class(workflow) == 'list') { # multiple steps
if (!all(sapply(workflow, class) %in% c('startR_cube', 'startR_workflow'))) {
stop("Parameter 'workflow' must be an object of class 'startR_cube' as ",
"returned by Start or of class 'startR_workflow' or a list of it ",
"as returned by AddStep.")
}
} else {
if (!any(c('startR_cube', 'startR_workflow') %in% class(workflow))) {
stop("Parameter 'workflow' must be an object of class 'startR_cube' as ",
"returned by Start or of class 'startR_workflow' as returned by ",
"AddStep.")
}
}
#-----------NEW_END--------------
if ('startR_cube' %in% class(workflow)) {
#machine_free_ram <- 1000000000
#max_ram_ratio <- 0.5
......
......@@ -56,11 +56,16 @@
#'@export
Step <- function(fun, target_dims, output_dims,
use_libraries = NULL, use_attributes = NULL) {
# Check fun
if (!is.function(fun)) {
stop("Parameter 'fun' must be a function.")
}
#----------------NEW-------------------
# Save function name as attribute first. It will be used in AddStep().
attr(fun, 'FunName') <- deparse(substitute(fun))
#---------------NEW_END------------------
# Check target_dims
if (is.character(target_dims)) {
target_dims <- list(target_dims)
......@@ -131,7 +136,7 @@ Step <- function(fun, target_dims, output_dims,
attr(fun, 'OutputDims') <- output_dims
attr(fun, 'UseLibraries') <- use_libraries
attr(fun, 'UseAttributes') <- use_attributes
# TODO: Add provenance info
class(fun) <- 'startR_step_fun'
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment