diff --git a/.Rbuildignore b/.Rbuildignore index 98316ccfd13a0286857a3ce70b87bc08517f8cb6..698f5e2cc314e9d59694eef5f2c6f7fc309cd23b 100644 --- a/.Rbuildignore +++ b/.Rbuildignore @@ -16,4 +16,5 @@ ^.*\.Rproj$ # Automatically added by RStudio, ^\.Rproj\.user$ # used for temporary files. ^cran-comments\.md$ # Comments for CRAN submission +^CONTRIBUTING\.md$ #^NEWS\.md$ # A news file written in Markdown diff --git a/R/ByChunks_autosubmit.R b/R/ByChunks_autosubmit.R index bf63878d861ab1c77f9f1f16073be9052d83cb34..796c01121d6d3b7f4e1dcb57b443af36397e847b 100644 --- a/R/ByChunks_autosubmit.R +++ b/R/ByChunks_autosubmit.R @@ -29,7 +29,7 @@ #' as autosubmit machine. The default value is NULL, and a temporary folder #' under the current working folder will be created. #'@param autosubmit_server A character vector indicating the login node of the -#' autosubmit machine. It can be "bscesautosubmit01" or "bscesautosubmit02". +#' autosubmit machine. It can be "bscesautosubmit03" or "bscesautosubmit04". #' If NULL, Autosubmit will be run locally on the current machine. #' The default value is NULL. #'@param silent A logical value deciding whether to print the computation @@ -165,8 +165,9 @@ ByChunks_autosubmit <- function(step_fun, cube_headers, ..., chunks = 'auto', ## autosubmit_server if (!is.null(autosubmit_server)) { - if (!autosubmit_server %in% c('bscesautosubmit01', 'bscesautosubmit02')) { - stop("Parameter 'autosubmit_server' must be one existing Autosubmit machine login node, 'bscesautosubmit01' or 'bscesautosubmit02'.") + if (!autosubmit_server %in% c('bscesautosubmit03', 'bscesautosubmit04')) { + stop("Parameter 'autosubmit_server' must be one existing Autosubmit ", + "machine login node, 'bscesautosubmit03' or 'bscesautosubmit04'.") } } @@ -192,7 +193,7 @@ ByChunks_autosubmit <- function(step_fun, cube_headers, ..., chunks = 'auto', default_cluster <- list(queue_host = NULL, # queue_type = 'slurm', data_dir = NULL, -# temp_dir = NULL, + temp_dir = NULL, lib_dir = NULL, init_commands = list(''), r_module = 'R', @@ -236,11 +237,22 @@ ByChunks_autosubmit <- function(step_fun, cube_headers, ..., chunks = 'auto', cluster <- default_cluster ### queue_host - support_hpcs <- c('local', 'nord3', 'nord4') # names in platforms.yml + support_hpcs <- c('local', 'nord3', 'nord4', 'amd', 'mn5') # names in platforms.yml + hostnames <- list(local = "", + nord3 = "nord4.bsc.es", + nord4 = "n4login1.bsc.es", + amd = "amdlogin1.bsc.es", + mn5 = "glogin1.bsc.es") if (is.null(cluster$queue_host) || !cluster$queue_host %in% support_hpcs) { stop("Cluster component 'queue_host' must be one of the following: ", paste(support_hpcs, collapse = ','), '.') } + ### hostname + if (is.null(cluster$hostname)) { + cluster$hostname <- hostnames[[cluster$queue_host]] + } else { + warning("Taking user-defined hostname for HPC platform.") + } ### data_dir is_data_dir_shared <- FALSE @@ -388,12 +400,17 @@ ByChunks_autosubmit <- function(step_fun, cube_headers, ..., chunks = 'auto', stop("Cluster component 'hpc_user' must be a character string.") } ### run_dir + is_autosubmit_suite_dir_shared <- TRUE if (!is.null(cluster$run_dir)) { if (!dir.exists(cluster$run_dir)) { - stop("Cluster component 'run_dir' ", cluster$run_dir," is not found.") + warning("Cluster component 'run_dir' ", cluster$run_dir, " not found.", + " It will be created in the remote cluster.") + is_autosubmit_suite_dir_shared <- FALSE } } + + #============================================== autosubmit_suite_dir_suite <- paste0(autosubmit_suite_dir, '/STARTR_CHUNKING_', suite_id, '/') @@ -404,9 +421,13 @@ ByChunks_autosubmit <- function(step_fun, cube_headers, ..., chunks = 'auto', stop("Could not find or create the directory in parameter 'autosubmit_suite_dir'.") } - remote_autosubmit_suite_dir <- file.path("/esarchive/autosubmit/", suite_id, 'proj') + if (!is.null(cluster[['run_dir']])) { + remote_autosubmit_suite_dir <- cluster[['run_dir']] + } else { + remote_autosubmit_suite_dir <- file.path("/esarchive/autosubmit/", suite_id, 'proj') + } remote_autosubmit_suite_dir_suite <- paste0(remote_autosubmit_suite_dir, '/STARTR_CHUNKING_', suite_id, '/') - + cluster[['run_dir']] <- remote_autosubmit_suite_dir_suite # Work out chunked dimensions and target dimensions all_dims <- lapply(cube_headers, attr, 'Dimensions') @@ -565,6 +586,13 @@ ByChunks_autosubmit <- function(step_fun, cube_headers, ..., chunks = 'auto', # Modify conf files from template and rewrite to /esarchive/autosubmit/expid/conf/ write_autosubmit_confs(chunks, cluster, autosubmit_suite_dir) + + if (!is_autosubmit_suite_dir_shared) { + system(paste0("ssh ", cluster[['hpc_user']], "@", cluster[['hostname']], + " 'mkdir -p remote_autosubmit_suite_dir'; rsync -r ", + autosubmit_suite_dir, "/. ", cluster[['hpc_user']], "@", cluster[['hostname']], + ":", remote_autosubmit_suite_dir, " ; sleep 10")) + } # Iterate through chunks chunk_array <- array(1:prod(unlist(chunks)), dim = (unlist(chunks))) @@ -596,7 +624,7 @@ ByChunks_autosubmit <- function(step_fun, cube_headers, ..., chunks = 'auto', for (cube_header in 1:length(cube_headers)) { expected_files <- attr(cube_headers[[cube_header]], 'ExpectedFiles') #files_to_check <- c(files_to_check, expected_files) - #if (cluster[['special_setup']] == 'marenostrum4') { + #if (cluster[['special_setup']] == 'gpfs') { # expected_files <- paste0('/gpfs/archive/bsc32/', expected_files) #} files_to_send <- c(files_to_send, expected_files) @@ -653,7 +681,6 @@ ByChunks_autosubmit <- function(step_fun, cube_headers, ..., chunks = 'auto', if ((is.null(autosubmit_server)) || (gsub('[[:digit:]]', "", Sys.getenv('HOSTNAME')) == 'bscesautosubmit')) { # If autosubmit_server is NULL or we are already on bscesautosubmit0x - #NOTE: If we ssh to AS VM and run everything there, we don't need to ssh here system(sys_commands) } else { @@ -669,10 +696,13 @@ ByChunks_autosubmit <- function(step_fun, cube_headers, ..., chunks = 'auto', # Remove bigmemory objects (e.g., a68h_1_1 and a68h_1_1.desc) if they exist # If run_dir is specified, the files are under run_dir; if not, files are under proj/STARTR_CHUNKING_xxxx/ if (!is.null(cluster[['run_dir']])) { - file.remove( - file.path(cluster[['run_dir']], - list.files(cluster[['run_dir']])[grepl(paste0("^", suite_id, "_.*"), list.files(cluster[['run_dir']]))]) - ) + if (is_autosubmit_suite_dir_shared) { + file.remove( + file.path(cluster[['run_dir']], + list.files(cluster[['run_dir']])[grepl(paste0("^", suite_id, "_.*"), list.files(cluster[['run_dir']]))]) + ) + } + ## TODO: Remove files if run_dir is not in esarchive } else { file.remove( file.path(remote_autosubmit_suite_dir_suite, @@ -692,8 +722,11 @@ ByChunks_autosubmit <- function(step_fun, cube_headers, ..., chunks = 'auto', class(startr_exec) <- 'startR_exec' if (wait) { - result <- Collect(startr_exec, wait = TRUE, remove = T) - .message("Computation ended successfully.") + result <- Collect(startr_exec, + wait = TRUE, + remove = TRUE, + on_remote = !is_autosubmit_suite_dir_shared) + .message("Computation ended successfully.") return(result) } else { diff --git a/R/Collect.R b/R/Collect.R index 5ae8b150626c815f713af279f4dbf244502d787a..49b38917522dd6157e12a3f8659405477bcf30bd 100644 --- a/R/Collect.R +++ b/R/Collect.R @@ -417,22 +417,45 @@ Collect_autosubmit <- function(startr_exec, wait = TRUE, remove = TRUE, on_remot remote_autosubmit_suite_dir <- file.path("/esarchive/autosubmit/", suite_id, 'proj') remote_autosubmit_suite_dir_suite <- paste0(remote_autosubmit_suite_dir, '/STARTR_CHUNKING_', suite_id, '/') run_dir <- startr_exec$cluster[['run_dir']] + hpc_user <- startr_exec$cluster[['hpc_user']] + hostname <- startr_exec$cluster[['hostname']] done <- FALSE while (!done) { # If wait, try until it is done - sum_received_chunks <- sum(grepl('.*\\.Rds$', list.files(remote_autosubmit_suite_dir_suite))) - if (sum_received_chunks / num_outputs == prod(unlist(chunks))) { - done <- TRUE + if (!on_remote) { + sum_received_chunks <- sum(grepl('.*\\.Rds$', list.files(remote_autosubmit_suite_dir_suite))) + if (sum_received_chunks / num_outputs == prod(unlist(chunks))) { + done <- TRUE - } else if (!wait) { - stop("Computation in progress...") + } else if (!wait) { + stop("Computation in progress...") + } else { + message("Computation in progress, ", sum_received_chunks, " of ", prod(unlist(chunks)), " chunks are done...\n", + "Check status on Autosubmit GUI: https://earth.bsc.es/autosubmitapp/experiment/", suite_id) + Sys.sleep(startr_exec$cluster[['polling_period']]) + } } else { - message("Computation in progress, ", sum_received_chunks, " of ", prod(unlist(chunks)), " chunks are done...\n", - "Check status on Autosubmit GUI: https://earth.bsc.es/autosubmitapp/experiment/", suite_id) - Sys.sleep(startr_exec$cluster[['polling_period']]) + # Execution on remote server + files_in_remote_dir <- system(paste0("ssh ", hpc_user, "@", hostname, " 'ls ", + run_dir, "'"), + intern = TRUE) + sum_received_chunks <- sum(grepl('.*\\.Rds$', files_in_remote_dir)) + if (sum_received_chunks / num_outputs == prod(unlist(chunks))) { + done <- TRUE + # Transfer files back + message("Computation finished on cluster, retrieving files...") + files_to_retrieve <- paste0(run_dir, files_in_remote_dir[grepl('.*\\.Rds$', files_in_remote_dir)]) + system(paste0("rsync -avz --remove-source-files -e ssh ", hpc_user, "@", hostname, ":", run_dir, "/* ", + remote_autosubmit_suite_dir_suite)) + } else if (!wait) { + stop("Computation in progress...") + } else { + message("Computation in progress, ", sum_received_chunks, " of ", prod(unlist(chunks)), " chunks are done...\n", + "Check status on Autosubmit GUI: https://earth.bsc.es/autosubmitapp/experiment/", suite_id) + Sys.sleep(startr_exec$cluster[['polling_period']]) + } } - } # while !done result <- .MergeChunks(remote_autosubmit_suite_dir, suite_id, remove = remove) diff --git a/R/Utils.R b/R/Utils.R index 28840e1968002b8218c8767fcb9e2f6630b1b346..af1cd079a16fd4839d953e3b1f35cc880265f6a0 100644 --- a/R/Utils.R +++ b/R/Utils.R @@ -1028,6 +1028,7 @@ write_autosubmit_confs <- function(chunks, cluster, autosubmit_suite_dir) { if (tolower(cluster$queue_host) != "local") { conf$Platforms[[cluster$queue_host]]$USER <- cluster$hpc_user conf$Platforms[[cluster$queue_host]]$PROCESSORS_PER_NODE <- as.integer(cluster$cores_per_job) + conf$Platforms[[cluster$queue_host]]$HOSTNAME <- cluster$hostname if (!is.null(cluster$extra_queue_params)) { tmp <- unlist(cluster$extra_queue_params) for (ii in 1:length(tmp)) { diff --git a/inst/chunking/Autosubmit/startR_autosubmit.sh b/inst/chunking/Autosubmit/startR_autosubmit.sh index be8ce1d5ef754b737b2b8d8bde4950111f18b8a7..14931eed1136ad783573a8bf9edc59db53fbe060 100644 --- a/inst/chunking/Autosubmit/startR_autosubmit.sh +++ b/inst/chunking/Autosubmit/startR_autosubmit.sh @@ -22,5 +22,5 @@ include_module_load cd_run_dir #e.g., Rscript load_process_save_chunk_autosubmit.R --args $task_path 1 1 1 1 2 2 1 1 1 2 1 2 -Rscript ${proj_dir}/load_process_save_chunk_autosubmit.R --args ${task_path} ${chunk_args[@]} +Rscript load_process_save_chunk_autosubmit.R --args ${task_path} ${chunk_args[@]} diff --git a/inst/doc/usecase/ex2_15_run_on_gpfs.R b/inst/doc/usecase/ex2_15_run_on_gpfs.R new file mode 100644 index 0000000000000000000000000000000000000000..c89a703d0ed6effd5543b10a2b5e3866f7b9f2d5 --- /dev/null +++ b/inst/doc/usecase/ex2_15_run_on_gpfs.R @@ -0,0 +1,83 @@ +# Author: Victòria Agudetse Roures, Sara Moreno, Núria Pérez Zanón +# Date: August 2025 +# ------------------------------------------------------------------ + +# ----------------------------------------------------------------- +# Running Compute() on a machine with a different filesystem. +# +# In this example, we use Compute() to apply a function to some data +# using an HPC cluster that does not share a filesystem with our local +# machine. The data files are available on our local machine *and* on the +# cluster, under different folders ('local_path' and 'cluster_path'). +# We show how to change the paths of the files so that startR can read the +# metadata from our local machine and then load the correct files on the +# remote cluster. +# ------------------------------------------------------------------ + +library(startR) + +# Define the local path and the path on the cluster +local_path <- "/esarchive/" +cluster_path <- "/gpfs/projects/bsc32/esarchive_cache/" + +# Step 1: Load the data in esarchive with retrieve = FALSE +repos <- '/esarchive/exp/ecmwf/system51c3s/monthly_mean/$var$_f6h/$var$_$sdate$.nc' +data <- Start(dat = repos, + var = 'tas', + sdate = c('20170101', '20180101'), + ensemble = indices(1:20), + time = 'all', + lat = 'all', + lon = indices(1:40), + return_vars = list(lat = 'dat', lon = 'dat', time = 'sdate'), + retrieve = FALSE) + +# Step 2: Replace the esarchive path pattern with the gpfs path pattern +file_dims <- dim(attr(data, "ExpectedFiles")) +new_files <- sapply(attr(data, "ExpectedFiles"), + function(x) { + gsub(pattern = local_path, replacement = cluster_path, x) + }) +dim(new_files) <- file_dims +attr(data, "ExpectedFiles") <- new_files +data[2] <- gsub(pattern = local_path, replacement = cluster_path, x = data[[2]]) + +# Step 3: Define function and step and run Compute() +fun_spring <- function(x) { + stop() + y <- s2dv::Season(x, time_dim = 'time', monini = 1, moninf = 3, monsup = 5) + return(y) +} + +step1 <- Step(fun = fun_spring, + target_dims = c('var', 'time'), + output_dims = c('var', 'time')) + +wf1 <- AddStep(data, step1) + +#-----------modify according to your personal info--------- + queue_host <- 'amd' + run_dir <- '/gpfs/scratch/bsc32/bsc032762/startR_hpc/' # temporary directory to use on cluster + autosubmit_suite_dir <- '/esarchive/scratch/vagudets/startR_local/' # your own local directory + autosubmit_expid <- 'a9p5' # your autosubmit expid (see documentation) +#------------------------------------------------------------ + res <- Compute(workflow = wf1, + chunks = list(ensemble = 2, sdate = 2), + threads_load = 2, + threads_compute = 4, + cluster = list( + queue_host = queue_host, + expid = autosubmit_expid, + hpc_user = "bsc032762", + autosubmit_module = 'autosubmit/4.1.14-foss-2023b-Python-3.11.5', + cores_per_job = 4, + job_wallclock = '01:00', + max_jobs = 100, + run_dir = run_dir + ), + workflow_manager = 'autosubmit', + autosubmit_suite_dir = autosubmit_suite_dir, + autosubmit_server = 'bscesautosubmit03', + wait = TRUE + ) +