From 7bb9c79c7a80cfc6f3d2f8d17c2c98a26441661e Mon Sep 17 00:00:00 2001 From: Nicolau Manubens Date: Tue, 5 Feb 2019 19:36:29 +0100 Subject: [PATCH 1/2] Enhancement in polling. --- R/Collect.R | 3 ++- R/Compute.R | 6 +++--- R/Utils.R | 22 ++++++++++++++++++++-- inst/chunking/load_process_save_chunk.R | 6 +++++- 4 files changed, 30 insertions(+), 7 deletions(-) diff --git a/R/Collect.R b/R/Collect.R index 44cfda5..2325d55 100644 --- a/R/Collect.R +++ b/R/Collect.R @@ -43,7 +43,8 @@ Collect <- function(startr_exec, wait = TRUE, remove = TRUE) { } done <- FALSE attempt <- 1 - sum_received_chunks <- 0 + sum_received_chunks <- sum(grepl('output.*\\.Rds', + list.files(ecflow_suite_dir_suite))) if (cluster[['bidirectional']]) { t_transfer_back <- NA } else { diff --git a/R/Compute.R b/R/Compute.R index 29d6be0..69c1bce 100644 --- a/R/Compute.R +++ b/R/Compute.R @@ -4,13 +4,13 @@ Compute <- function(workflow, chunks = 'auto', ecflow_server = NULL, silent = FALSE, debug = FALSE, wait = TRUE) { # Check workflow - if (!any(c('startR_cube', 'startR_workflow') %in% class(workflow))) { - stop("Parameter 'workflow' must be an object of class 'startR_cube' as ", + if (!any(c('startR_header', 'startR_workflow') %in% class(workflow))) { + stop("Parameter 'workflow' must be an object of class 'startR_header' as ", "returned by Start or of class 'startR_workflow' as returned by ", "AddStep.") } - if ('startR_cube' %in% class(workflow)) { + if ('startR_header' %in% class(workflow)) { #machine_free_ram <- 1000000000 #max_ram_ratio <- 0.5 #data_size <- prod(c(attr(workflow, 'Dimensions'), 8)) diff --git a/R/Utils.R b/R/Utils.R index 0d1fdc6..699468a 100644 --- a/R/Utils.R +++ b/R/Utils.R @@ -742,8 +742,26 @@ chunk <- function(chunk, n_chunks, selectors) { found_chunk <- which(found_chunks_str == paste(chunk_indices_on_file, collapse = '_'))[1] if (length(found_chunk) > 0) { - array_of_chunks[[i]] <- readRDS(paste0(shared_dir, '/', - chunk_files_original[found_chunk])) + num_tries <- 5 + found <- FALSE + try_num <- 1 + while ((try_num <= num_tries) && !found) { + array_of_chunks[[i]] <- try({ + readRDS(paste0(shared_dir, '/', + chunk_files_original[found_chunk])) + }) + if (('try-error' %in% class(array_of_chunks[[i]]))) { + message("Waiting for an incomplete file transfer...") + Sys.sleep(5) + } else { + found <- TRUE + } + try_num <- try_num + 1 + } + if (!found) { + stop("Could not open one of the chunks. Might be a large chunk ", + "in transfer. Merge aborted, files have been preserved.") + } } } diff --git a/inst/chunking/load_process_save_chunk.R b/inst/chunking/load_process_save_chunk.R index 8a5843c..a8a31a8 100644 --- a/inst/chunking/load_process_save_chunk.R +++ b/inst/chunking/load_process_save_chunk.R @@ -104,7 +104,11 @@ for (component in names(res)) { for (i in 1:total_specified_dims) { filename <- paste0(filename, param_dimnames[i], '_', chunk_indices[i], '__') } - saveRDS(res[[component]], file = paste0(out_dir, '/', filename, '.Rds')) + # Saving in a temporary file, then renaming. This way, the polling mechanism + # won't transfer back results before the save is completed. + saveRDS(res[[component]], file = paste0(out_dir, '/', filename, '.Rds.tmp')) + file.rename(paste0(out_dir, '/', filename, '.Rds.tmp'), + paste0(out_dir, '/', filename, '.Rds')) } rm(res) gc() -- GitLab From 041ef18db7687fae50a459047f00355ecc07f68d Mon Sep 17 00:00:00 2001 From: Nicolau Manubens Date: Tue, 5 Feb 2019 19:40:03 +0100 Subject: [PATCH 2/2] Properly naming cubes. --- R/AddStep.R | 8 ++++---- R/ByChunks.R | 6 +++--- R/Compute.R | 8 ++++---- R/Start.R | 4 ++-- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/R/AddStep.R b/R/AddStep.R index fece572..c34e1b0 100644 --- a/R/AddStep.R +++ b/R/AddStep.R @@ -5,20 +5,20 @@ AddStep <- function(inputs, step_fun, ...) { } # Check inputs - if (any(c('startR_header', 'startR_workflow') %in% class(inputs))) { + if (any(c('startR_cube', 'startR_workflow') %in% class(inputs))) { inputs <- list(inputs) names(inputs) <- 'input1' } if (is.list(inputs)) { if (any(!sapply(inputs, - function(x) any(c('startR_header', + function(x) any(c('startR_cube', 'startR_workflow') %in% class(x))))) { stop("Parameter 'inputs' must be one or a list of objects of the class ", - "'startR_header' or 'startR_workflow'.") + "'startR_cube' or 'startR_workflow'.") } } else { stop("Parameter 'inputs' must be one or a list of objects of the class ", - "'startR_header' or 'startR_workflow'.") + "'startR_cube' or 'startR_workflow'.") } # Consistency checks diff --git a/R/ByChunks.R b/R/ByChunks.R index 933fc71..f9b4027 100644 --- a/R/ByChunks.R +++ b/R/ByChunks.R @@ -27,12 +27,12 @@ ByChunks <- function(step_fun, cube_headers, ..., chunks = 'auto', MergeArrays <- startR:::.MergeArrays # Check input headers - if ('startR_header' %in% class(cube_headers)) { + if ('startR_cube' %in% class(cube_headers)) { cube_headers <- list(cube_headers) } if (!all(sapply(lapply(cube_headers, class), - function(x) 'startR_header' %in% x))) { - stop("All objects passed in 'cube_headers' must be of class 'startR_header', ", + function(x) 'startR_cube' %in% x))) { + stop("All objects passed in 'cube_headers' must be of class 'startR_cube', ", "as returned by Start().") } diff --git a/R/Compute.R b/R/Compute.R index 69c1bce..12c2dce 100644 --- a/R/Compute.R +++ b/R/Compute.R @@ -4,13 +4,13 @@ Compute <- function(workflow, chunks = 'auto', ecflow_server = NULL, silent = FALSE, debug = FALSE, wait = TRUE) { # Check workflow - if (!any(c('startR_header', 'startR_workflow') %in% class(workflow))) { - stop("Parameter 'workflow' must be an object of class 'startR_header' as ", + if (!any(c('startR_cube', 'startR_workflow') %in% class(workflow))) { + stop("Parameter 'workflow' must be an object of class 'startR_cube' as ", "returned by Start or of class 'startR_workflow' as returned by ", "AddStep.") } - if ('startR_header' %in% class(workflow)) { + if ('startR_cube' %in% class(workflow)) { #machine_free_ram <- 1000000000 #max_ram_ratio <- 0.5 #data_size <- prod(c(attr(workflow, 'Dimensions'), 8)) @@ -55,7 +55,7 @@ Compute <- function(workflow, chunks = 'auto', attr(workflow$fun, 'UseLibraries'), attr(workflow$fun, 'UseAttributes')) - if (!all(sapply(workflow$inputs, class) == 'startR_header')) { + if (!all(sapply(workflow$inputs, class) == 'startR_cube')) { stop("Workflows with only one step supported by now.") } # Run ByChunks with the combined operation diff --git a/R/Start.R b/R/Start.R index 168fae7..bac9c66 100644 --- a/R/Start.R +++ b/R/Start.R @@ -2868,7 +2868,7 @@ print(str(picked_vars)) FileSelectors = file_selectors, PatternDim = found_pattern_dim) ) - attr(data_array, 'class') <- c('startR_cube', attr(data_array, 'class')) + attr(data_array, 'class') <- c('startR_array', attr(data_array, 'class')) data_array } else { if (!silent) { @@ -2898,7 +2898,7 @@ print(str(picked_vars)) NULL }) ) - attr(start_call, 'class') <- c('startR_header', attr(start_call, 'class')) + attr(start_call, 'class') <- c('startR_cube', attr(start_call, 'class')) start_call } } -- GitLab