diff --git a/R/AddStep.R b/R/AddStep.R index fece57225aec829518468ceb4911c4e90d5299f5..c34e1b0c8b03dc545c7cffe5e63363a33de1fac4 100644 --- a/R/AddStep.R +++ b/R/AddStep.R @@ -5,20 +5,20 @@ AddStep <- function(inputs, step_fun, ...) { } # Check inputs - if (any(c('startR_header', 'startR_workflow') %in% class(inputs))) { + if (any(c('startR_cube', 'startR_workflow') %in% class(inputs))) { inputs <- list(inputs) names(inputs) <- 'input1' } if (is.list(inputs)) { if (any(!sapply(inputs, - function(x) any(c('startR_header', + function(x) any(c('startR_cube', 'startR_workflow') %in% class(x))))) { stop("Parameter 'inputs' must be one or a list of objects of the class ", - "'startR_header' or 'startR_workflow'.") + "'startR_cube' or 'startR_workflow'.") } } else { stop("Parameter 'inputs' must be one or a list of objects of the class ", - "'startR_header' or 'startR_workflow'.") + "'startR_cube' or 'startR_workflow'.") } # Consistency checks diff --git a/R/ByChunks.R b/R/ByChunks.R index 933fc71f9147585df7283f92d6bc99625abaf5e5..f9b4027c510ec1abf7fa7d8cef3323f59f689c3f 100644 --- a/R/ByChunks.R +++ b/R/ByChunks.R @@ -27,12 +27,12 @@ ByChunks <- function(step_fun, cube_headers, ..., chunks = 'auto', MergeArrays <- startR:::.MergeArrays # Check input headers - if ('startR_header' %in% class(cube_headers)) { + if ('startR_cube' %in% class(cube_headers)) { cube_headers <- list(cube_headers) } if (!all(sapply(lapply(cube_headers, class), - function(x) 'startR_header' %in% x))) { - stop("All objects passed in 'cube_headers' must be of class 'startR_header', ", + function(x) 'startR_cube' %in% x))) { + stop("All objects passed in 'cube_headers' must be of class 'startR_cube', ", "as returned by Start().") } diff --git a/R/Collect.R b/R/Collect.R index 44cfda5d4fed0d5fc4e16629d871b659bc58befa..2325d55fcd8834392d647a76c658c04630e17751 100644 --- a/R/Collect.R +++ b/R/Collect.R @@ -43,7 +43,8 @@ Collect <- function(startr_exec, wait = TRUE, remove = TRUE) { } done <- FALSE attempt <- 1 - sum_received_chunks <- 0 + sum_received_chunks <- sum(grepl('output.*\\.Rds', + list.files(ecflow_suite_dir_suite))) if (cluster[['bidirectional']]) { t_transfer_back <- NA } else { diff --git a/R/Compute.R b/R/Compute.R index 29d6be023f661af92e6da63fabb709818f05b9be..12c2dcea6cfcfa923f009d37746f2c263b94e5fd 100644 --- a/R/Compute.R +++ b/R/Compute.R @@ -55,7 +55,7 @@ Compute <- function(workflow, chunks = 'auto', attr(workflow$fun, 'UseLibraries'), attr(workflow$fun, 'UseAttributes')) - if (!all(sapply(workflow$inputs, class) == 'startR_header')) { + if (!all(sapply(workflow$inputs, class) == 'startR_cube')) { stop("Workflows with only one step supported by now.") } # Run ByChunks with the combined operation diff --git a/R/Start.R b/R/Start.R index 168fae710c96085dfd34e22610febae131ea90a9..bac9c6616a8cac0b0c0b1037f5f76f8ddb387515 100644 --- a/R/Start.R +++ b/R/Start.R @@ -2868,7 +2868,7 @@ print(str(picked_vars)) FileSelectors = file_selectors, PatternDim = found_pattern_dim) ) - attr(data_array, 'class') <- c('startR_cube', attr(data_array, 'class')) + attr(data_array, 'class') <- c('startR_array', attr(data_array, 'class')) data_array } else { if (!silent) { @@ -2898,7 +2898,7 @@ print(str(picked_vars)) NULL }) ) - attr(start_call, 'class') <- c('startR_header', attr(start_call, 'class')) + attr(start_call, 'class') <- c('startR_cube', attr(start_call, 'class')) start_call } } diff --git a/R/Utils.R b/R/Utils.R index 0d1fdc65f7309b2801c27077541fa639c2bd72bc..699468a059247a6a9f2b0c11e5d75ccaddcedf7a 100644 --- a/R/Utils.R +++ b/R/Utils.R @@ -742,8 +742,26 @@ chunk <- function(chunk, n_chunks, selectors) { found_chunk <- which(found_chunks_str == paste(chunk_indices_on_file, collapse = '_'))[1] if (length(found_chunk) > 0) { - array_of_chunks[[i]] <- readRDS(paste0(shared_dir, '/', - chunk_files_original[found_chunk])) + num_tries <- 5 + found <- FALSE + try_num <- 1 + while ((try_num <= num_tries) && !found) { + array_of_chunks[[i]] <- try({ + readRDS(paste0(shared_dir, '/', + chunk_files_original[found_chunk])) + }) + if (('try-error' %in% class(array_of_chunks[[i]]))) { + message("Waiting for an incomplete file transfer...") + Sys.sleep(5) + } else { + found <- TRUE + } + try_num <- try_num + 1 + } + if (!found) { + stop("Could not open one of the chunks. Might be a large chunk ", + "in transfer. Merge aborted, files have been preserved.") + } } } diff --git a/inst/chunking/load_process_save_chunk.R b/inst/chunking/load_process_save_chunk.R index 8a5843c31819ea3e3ba5e017b4faeedc20012e16..a8a31a888654452737fae18d38ee69a1a59e379d 100644 --- a/inst/chunking/load_process_save_chunk.R +++ b/inst/chunking/load_process_save_chunk.R @@ -104,7 +104,11 @@ for (component in names(res)) { for (i in 1:total_specified_dims) { filename <- paste0(filename, param_dimnames[i], '_', chunk_indices[i], '__') } - saveRDS(res[[component]], file = paste0(out_dir, '/', filename, '.Rds')) + # Saving in a temporary file, then renaming. This way, the polling mechanism + # won't transfer back results before the save is completed. + saveRDS(res[[component]], file = paste0(out_dir, '/', filename, '.Rds.tmp')) + file.rename(paste0(out_dir, '/', filename, '.Rds.tmp'), + paste0(out_dir, '/', filename, '.Rds')) } rm(res) gc()