From 8c5ba084ba66c72f936b02c264e86d88b261f742 Mon Sep 17 00:00:00 2001 From: vagudets Date: Fri, 1 Aug 2025 12:38:41 +0200 Subject: [PATCH 1/9] Run Compute() with Autosubmit on MN5 and CTE-AMD (WIP) --- R/ByChunks_autosubmit.R | 67 +++++++++++++------ R/Collect.R | 42 +++++++++--- R/Utils.R | 6 +- inst/chunking/Autosubmit/startR_autosubmit.sh | 2 +- 4 files changed, 85 insertions(+), 32 deletions(-) diff --git a/R/ByChunks_autosubmit.R b/R/ByChunks_autosubmit.R index bf63878d..795cd8ea 100644 --- a/R/ByChunks_autosubmit.R +++ b/R/ByChunks_autosubmit.R @@ -29,7 +29,7 @@ #' as autosubmit machine. The default value is NULL, and a temporary folder #' under the current working folder will be created. #'@param autosubmit_server A character vector indicating the login node of the -#' autosubmit machine. It can be "bscesautosubmit01" or "bscesautosubmit02". +#' autosubmit machine. It can be "bscesautosubmit03" or "bscesautosubmit04". #' If NULL, Autosubmit will be run locally on the current machine. #' The default value is NULL. #'@param silent A logical value deciding whether to print the computation @@ -165,8 +165,9 @@ ByChunks_autosubmit <- function(step_fun, cube_headers, ..., chunks = 'auto', ## autosubmit_server if (!is.null(autosubmit_server)) { - if (!autosubmit_server %in% c('bscesautosubmit01', 'bscesautosubmit02')) { - stop("Parameter 'autosubmit_server' must be one existing Autosubmit machine login node, 'bscesautosubmit01' or 'bscesautosubmit02'.") + if (!autosubmit_server %in% c('bscesautosubmit03', 'bscesautosubmit04')) { + stop("Parameter 'autosubmit_server' must be one existing Autosubmit ", + "machine login node, 'bscesautosubmit03' or 'bscesautosubmit04'.") } } @@ -192,7 +193,7 @@ ByChunks_autosubmit <- function(step_fun, cube_headers, ..., chunks = 'auto', default_cluster <- list(queue_host = NULL, # queue_type = 'slurm', data_dir = NULL, -# temp_dir = NULL, + temp_dir = NULL, lib_dir = NULL, init_commands = list(''), r_module = 'R', @@ -236,7 +237,7 @@ ByChunks_autosubmit <- function(step_fun, cube_headers, ..., chunks = 'auto', cluster <- default_cluster ### queue_host - support_hpcs <- c('local', 'nord3', 'nord4') # names in platforms.yml + support_hpcs <- c('local', 'nord3', 'nord4', 'amd', 'mn5') # names in platforms.yml if (is.null(cluster$queue_host) || !cluster$queue_host %in% support_hpcs) { stop("Cluster component 'queue_host' must be one of the following: ", paste(support_hpcs, collapse = ','), '.') @@ -353,7 +354,7 @@ ByChunks_autosubmit <- function(step_fun, cube_headers, ..., chunks = 'auto', if (is.null(cluster[['special_setup']])) { cluster[['special_setup']] <- 'none' } - if (!(cluster[['special_setup']] %in% c('none', 'marenostrum4', 'nord4'))) { + if (!(cluster[['special_setup']] %in% c('none', 'gpfs', 'nord4'))) { stop("The value provided for the component 'special_setup' of the parameter ", "'cluster' is not recognized.") } @@ -387,12 +388,13 @@ ByChunks_autosubmit <- function(step_fun, cube_headers, ..., chunks = 'auto', if (!is.null(cluster$hpc_user) && (!is.character(cluster$hpc_user) | length(cluster$hpc_user) != 1)) { stop("Cluster component 'hpc_user' must be a character string.") } + ## TODO: Adjust ### run_dir - if (!is.null(cluster$run_dir)) { - if (!dir.exists(cluster$run_dir)) { - stop("Cluster component 'run_dir' ", cluster$run_dir," is not found.") - } - } + ## if (!is.null(cluster$run_dir)) { + ## if (!dir.exists(cluster$run_dir)) { + ## stop("Cluster component 'run_dir' ", cluster$run_dir," is not found.") + ## } + ## } #============================================== @@ -404,9 +406,15 @@ ByChunks_autosubmit <- function(step_fun, cube_headers, ..., chunks = 'auto', stop("Could not find or create the directory in parameter 'autosubmit_suite_dir'.") } - remote_autosubmit_suite_dir <- file.path("/esarchive/autosubmit/", suite_id, 'proj') + ## TODO: Order + is_autosubmit_suite_dir_shared <- TRUE + if (!is.null(cluster[['run_dir']])) { + remote_autosubmit_suite_dir <- cluster[['run_dir']] + is_autosubmit_suite_dir_shared <- FALSE + } else { + remote_autosubmit_suite_dir <- file.path("/esarchive/autosubmit/", suite_id, 'proj') + } remote_autosubmit_suite_dir_suite <- paste0(remote_autosubmit_suite_dir, '/STARTR_CHUNKING_', suite_id, '/') - # Work out chunked dimensions and target dimensions all_dims <- lapply(cube_headers, attr, 'Dimensions') @@ -502,8 +510,10 @@ ByChunks_autosubmit <- function(step_fun, cube_headers, ..., chunks = 'auto', } # Copy load_process_save_chunk_autosubmit.R into local folder - chunk_script <- file(system.file('chunking/Autosubmit/load_process_save_chunk_autosubmit.R', - package = 'startR')) + ## TODO: Change for testing + chunk_script <- file("/esarchive/scratch/vagudets/repos/startR/inst/chunking/Autosubmit/load_process_save_chunk_autosubmit.R") + ## chunk_script <- file(system.file('chunking/Autosubmit/load_process_save_chunk_autosubmit.R', + ## package = 'startR')) chunk_script_lines <- readLines(chunk_script) close(chunk_script) chunk_script_lines <- gsub('^lib_dir <- *', paste0('lib_dir <- ', @@ -516,7 +526,7 @@ ByChunks_autosubmit <- function(step_fun, cube_headers, ..., chunks = 'auto', chunk_script_lines) deparsed_calls <- paste0('start_calls <- list(') extra_path <- '' - if (cluster[['special_setup']] == 'marenostrum4') { + if (cluster[['special_setup']] == 'gpfs') { extra_path <- '/gpfs/archive/bsc32/' } for (cube_header in 1:length(cube_headers)) { @@ -565,6 +575,20 @@ ByChunks_autosubmit <- function(step_fun, cube_headers, ..., chunks = 'auto', # Modify conf files from template and rewrite to /esarchive/autosubmit/expid/conf/ write_autosubmit_confs(chunks, cluster, autosubmit_suite_dir) + + ## TODO: Copy files HERE? + command <- paste0("ssh ", cluster[['hpc_user']], "@transfer1.bsc.es 'mkdir -p ", + remote_autosubmit_suite_dir, + ' ; cp -r ', '/gpfs/archive/bsc32/', autosubmit_suite_dir, '/. ', remote_autosubmit_suite_dir, + " ; sleep 10 '") + if (!is_autosubmit_suite_dir_shared) { + system(command) + # system(paste0("ssh ", cluster[['hpc_user']], "@transfer1.bsc.es 'mkdir -p ", + # remote_autosubmit_suite_dir, + # # rsync -Rrav + # ' ; cp -r ', '\'', '/gpfs/archive/bsc32/', autosubmit_suite_dir, '/. \' "', remote_autosubmit_suite_dir, '/"', + # " ; sleep 10 '")) + } # Iterate through chunks chunk_array <- array(1:prod(unlist(chunks)), dim = (unlist(chunks))) @@ -596,7 +620,7 @@ ByChunks_autosubmit <- function(step_fun, cube_headers, ..., chunks = 'auto', for (cube_header in 1:length(cube_headers)) { expected_files <- attr(cube_headers[[cube_header]], 'ExpectedFiles') #files_to_check <- c(files_to_check, expected_files) - #if (cluster[['special_setup']] == 'marenostrum4') { + #if (cluster[['special_setup']] == 'gpfs') { # expected_files <- paste0('/gpfs/archive/bsc32/', expected_files) #} files_to_send <- c(files_to_send, expected_files) @@ -653,7 +677,6 @@ ByChunks_autosubmit <- function(step_fun, cube_headers, ..., chunks = 'auto', if ((is.null(autosubmit_server)) || (gsub('[[:digit:]]', "", Sys.getenv('HOSTNAME')) == 'bscesautosubmit')) { # If autosubmit_server is NULL or we are already on bscesautosubmit0x - #NOTE: If we ssh to AS VM and run everything there, we don't need to ssh here system(sys_commands) } else { @@ -668,6 +691,7 @@ ByChunks_autosubmit <- function(step_fun, cube_headers, ..., chunks = 'auto', if (substr(failed_file_size, 1, 1) != 0) { # Remove bigmemory objects (e.g., a68h_1_1 and a68h_1_1.desc) if they exist # If run_dir is specified, the files are under run_dir; if not, files are under proj/STARTR_CHUNKING_xxxx/ + ## TODO: If run_dir is not in esarchive this will not work if (!is.null(cluster[['run_dir']])) { file.remove( file.path(cluster[['run_dir']], @@ -692,8 +716,11 @@ ByChunks_autosubmit <- function(step_fun, cube_headers, ..., chunks = 'auto', class(startr_exec) <- 'startR_exec' if (wait) { - result <- Collect(startr_exec, wait = TRUE, remove = T) - .message("Computation ended successfully.") + result <- Collect(startr_exec, + wait = TRUE, + remove = TRUE, + on_remote = !is_autosubmit_suite_dir_shared) + .message("Computation ended successfully.") return(result) } else { diff --git a/R/Collect.R b/R/Collect.R index 5ae8b150..6dd58165 100644 --- a/R/Collect.R +++ b/R/Collect.R @@ -417,22 +417,46 @@ Collect_autosubmit <- function(startr_exec, wait = TRUE, remove = TRUE, on_remot remote_autosubmit_suite_dir <- file.path("/esarchive/autosubmit/", suite_id, 'proj') remote_autosubmit_suite_dir_suite <- paste0(remote_autosubmit_suite_dir, '/STARTR_CHUNKING_', suite_id, '/') run_dir <- startr_exec$cluster[['run_dir']] + hpc_user <- startr_exec$cluster[['hpc_user']] done <- FALSE while (!done) { # If wait, try until it is done - sum_received_chunks <- sum(grepl('.*\\.Rds$', list.files(remote_autosubmit_suite_dir_suite))) - if (sum_received_chunks / num_outputs == prod(unlist(chunks))) { - done <- TRUE + if (!on_remote) { + sum_received_chunks <- sum(grepl('.*\\.Rds$', list.files(remote_autosubmit_suite_dir_suite))) + if (sum_received_chunks / num_outputs == prod(unlist(chunks))) { + done <- TRUE - } else if (!wait) { - stop("Computation in progress...") + } else if (!wait) { + stop("Computation in progress...") + } else { + message("Computation in progress, ", sum_received_chunks, " of ", prod(unlist(chunks)), " chunks are done...\n", + "Check status on Autosubmit GUI: https://earth.bsc.es/autosubmitapp/experiment/", suite_id) + Sys.sleep(startr_exec$cluster[['polling_period']]) + } } else { - message("Computation in progress, ", sum_received_chunks, " of ", prod(unlist(chunks)), " chunks are done...\n", - "Check status on Autosubmit GUI: https://earth.bsc.es/autosubmitapp/experiment/", suite_id) - Sys.sleep(startr_exec$cluster[['polling_period']]) + # Execution on remote server + ## TODO: Not use transfer machine + files_in_remote_dir <- system(paste0("ssh ", hpc_user, "@transfer1.bsc.es 'ls ", + run_dir, "'"), + intern = TRUE) + sum_received_chunks <- sum(grepl('.*\\.Rds$', files_in_remote_dir)) + if (sum_received_chunks / num_outputs == prod(unlist(chunks))) { + done <- TRUE + # Transfer files back + message("Computation finished on cluster, retrieving files...") + files_to_retrieve <- paste0(run_dir, files_in_remote_dir[grepl('.*\\.Rds$', files_in_remote_dir)]) + ## TODO: Not use transfer machine + system(paste0("ssh ", hpc_user, "@transfer1.bsc.es 'mv ", + run_dir, "/* /gpfs/archive/bsc32/", remote_autosubmit_suite_dir_suite, "'")) + } else if (!wait) { + stop("Computation in progress...") + } else { + message("Computation in progress, ", sum_received_chunks, " of ", prod(unlist(chunks)), " chunks are done...\n", + "Check status on Autosubmit GUI: https://earth.bsc.es/autosubmitapp/experiment/", suite_id) + Sys.sleep(startr_exec$cluster[['polling_period']]) + } } - } # while !done result <- .MergeChunks(remote_autosubmit_suite_dir, suite_id, remove = remove) diff --git a/R/Utils.R b/R/Utils.R index 28840e19..84a6be49 100644 --- a/R/Utils.R +++ b/R/Utils.R @@ -885,8 +885,10 @@ write_autosubmit_bash <- function(chunks, cluster, autosubmit_suite_dir) { chunk_args[2, ] <- paste0('%JOBS.CHUNK_', n_chunk, '.', chunk_names, '_N%') chunk_args <- paste0('(', paste(c(chunk_args), collapse = ' '), ')') - bash_script_template <- file(system.file('chunking/Autosubmit/startR_autosubmit.sh', - package = 'startR')) + ## TODO: Change back + bash_script_template <- file("/esarchive/scratch/vagudets/repos/startR/inst/chunking/Autosubmit/startR_autosubmit.sh") + ## bash_script_template <- file(system.file('chunking/Autosubmit/startR_autosubmit.sh', + ## package = 'startR')) bash_script_lines <- readLines(bash_script_template) close(bash_script_template) diff --git a/inst/chunking/Autosubmit/startR_autosubmit.sh b/inst/chunking/Autosubmit/startR_autosubmit.sh index be8ce1d5..14931eed 100644 --- a/inst/chunking/Autosubmit/startR_autosubmit.sh +++ b/inst/chunking/Autosubmit/startR_autosubmit.sh @@ -22,5 +22,5 @@ include_module_load cd_run_dir #e.g., Rscript load_process_save_chunk_autosubmit.R --args $task_path 1 1 1 1 2 2 1 1 1 2 1 2 -Rscript ${proj_dir}/load_process_save_chunk_autosubmit.R --args ${task_path} ${chunk_args[@]} +Rscript load_process_save_chunk_autosubmit.R --args ${task_path} ${chunk_args[@]} -- GitLab From 453c901c0f47acd7225d5aed032ad923f2db47aa Mon Sep 17 00:00:00 2001 From: vagudets Date: Thu, 7 Aug 2025 15:27:52 +0200 Subject: [PATCH 2/9] Clean code --- R/ByChunks_autosubmit.R | 46 +++++++++++++++++++++++++---------------- R/Collect.R | 11 ++++++---- R/Utils.R | 1 + 3 files changed, 36 insertions(+), 22 deletions(-) diff --git a/R/ByChunks_autosubmit.R b/R/ByChunks_autosubmit.R index 795cd8ea..4f6767f1 100644 --- a/R/ByChunks_autosubmit.R +++ b/R/ByChunks_autosubmit.R @@ -238,10 +238,21 @@ ByChunks_autosubmit <- function(step_fun, cube_headers, ..., chunks = 'auto', ### queue_host support_hpcs <- c('local', 'nord3', 'nord4', 'amd', 'mn5') # names in platforms.yml + hostnames <- list(local = "", + nord3 = "nord4.bsc.es", + nord4 = "n4login1.bsc.es", + amd = "amdlogin1.bsc.es", + mn5 = "glogin1.bsc.es") if (is.null(cluster$queue_host) || !cluster$queue_host %in% support_hpcs) { stop("Cluster component 'queue_host' must be one of the following: ", paste(support_hpcs, collapse = ','), '.') } + ### hostname + if (is.null(cluster$hostname) { + hostname <- hostnames[[cluster$queue_host]] + } else { + warning("Taking user-defined hostname for HPC platform.") + } ### data_dir is_data_dir_shared <- FALSE @@ -388,13 +399,17 @@ ByChunks_autosubmit <- function(step_fun, cube_headers, ..., chunks = 'auto', if (!is.null(cluster$hpc_user) && (!is.character(cluster$hpc_user) | length(cluster$hpc_user) != 1)) { stop("Cluster component 'hpc_user' must be a character string.") } - ## TODO: Adjust ### run_dir - ## if (!is.null(cluster$run_dir)) { - ## if (!dir.exists(cluster$run_dir)) { - ## stop("Cluster component 'run_dir' ", cluster$run_dir," is not found.") - ## } - ## } + is_autosubmit_suite_dir_shared <- TRUE + if (!is.null(cluster$run_dir)) { + if (!dir.exists(cluster$run_dir)) { + warning("Cluster component 'run_dir' ", cluster$run_dir" not found.", + " It will be created in the remote cluster.") + is_autosubmit_suite_dir_shared <- FALSE + } + } + + #============================================== @@ -406,11 +421,8 @@ ByChunks_autosubmit <- function(step_fun, cube_headers, ..., chunks = 'auto', stop("Could not find or create the directory in parameter 'autosubmit_suite_dir'.") } - ## TODO: Order - is_autosubmit_suite_dir_shared <- TRUE if (!is.null(cluster[['run_dir']])) { remote_autosubmit_suite_dir <- cluster[['run_dir']] - is_autosubmit_suite_dir_shared <- FALSE } else { remote_autosubmit_suite_dir <- file.path("/esarchive/autosubmit/", suite_id, 'proj') } @@ -526,7 +538,7 @@ ByChunks_autosubmit <- function(step_fun, cube_headers, ..., chunks = 'auto', chunk_script_lines) deparsed_calls <- paste0('start_calls <- list(') extra_path <- '' - if (cluster[['special_setup']] == 'gpfs') { + if (cluster[['special_setup']] == 'marenostrum4') { extra_path <- '/gpfs/archive/bsc32/' } for (cube_header in 1:length(cube_headers)) { @@ -576,17 +588,15 @@ ByChunks_autosubmit <- function(step_fun, cube_headers, ..., chunks = 'auto', # Modify conf files from template and rewrite to /esarchive/autosubmit/expid/conf/ write_autosubmit_confs(chunks, cluster, autosubmit_suite_dir) - ## TODO: Copy files HERE? - command <- paste0("ssh ", cluster[['hpc_user']], "@transfer1.bsc.es 'mkdir -p ", - remote_autosubmit_suite_dir, - ' ; cp -r ', '/gpfs/archive/bsc32/', autosubmit_suite_dir, '/. ', remote_autosubmit_suite_dir, - " ; sleep 10 '") + ## TODO: Avoid using transfer machine if (!is_autosubmit_suite_dir_shared) { - system(command) + system(paste0("ssh ", cluster[['hpc_user']], "@", cluster[['hostname']], + " 'mkdir -p remote_autosubmit_suite_dir'; rsync -r", + autosubmit_suite_dir, "/.", " " cluster[['hostname']], ":", remote_autosubmit_suite_dir, + " ; sleep 10")) # system(paste0("ssh ", cluster[['hpc_user']], "@transfer1.bsc.es 'mkdir -p ", # remote_autosubmit_suite_dir, - # # rsync -Rrav - # ' ; cp -r ', '\'', '/gpfs/archive/bsc32/', autosubmit_suite_dir, '/. \' "', remote_autosubmit_suite_dir, '/"', + # ' ; cp -r ', '/gpfs/archive/bsc32/', autosubmit_suite_dir, '/. ', remote_autosubmit_suite_dir, # " ; sleep 10 '")) } diff --git a/R/Collect.R b/R/Collect.R index 6dd58165..61216c96 100644 --- a/R/Collect.R +++ b/R/Collect.R @@ -418,6 +418,7 @@ Collect_autosubmit <- function(startr_exec, wait = TRUE, remove = TRUE, on_remot remote_autosubmit_suite_dir_suite <- paste0(remote_autosubmit_suite_dir, '/STARTR_CHUNKING_', suite_id, '/') run_dir <- startr_exec$cluster[['run_dir']] hpc_user <- startr_exec$cluster[['hpc_user']] + hostname <- startr_exec$cluster[['hostname']] done <- FALSE @@ -436,8 +437,7 @@ Collect_autosubmit <- function(startr_exec, wait = TRUE, remove = TRUE, on_remot } } else { # Execution on remote server - ## TODO: Not use transfer machine - files_in_remote_dir <- system(paste0("ssh ", hpc_user, "@transfer1.bsc.es 'ls ", + files_in_remote_dir <- system(paste0("ssh ", hpc_user, "@", hostname, " 'ls ", run_dir, "'"), intern = TRUE) sum_received_chunks <- sum(grepl('.*\\.Rds$', files_in_remote_dir)) @@ -447,8 +447,11 @@ Collect_autosubmit <- function(startr_exec, wait = TRUE, remove = TRUE, on_remot message("Computation finished on cluster, retrieving files...") files_to_retrieve <- paste0(run_dir, files_in_remote_dir[grepl('.*\\.Rds$', files_in_remote_dir)]) ## TODO: Not use transfer machine - system(paste0("ssh ", hpc_user, "@transfer1.bsc.es 'mv ", - run_dir, "/* /gpfs/archive/bsc32/", remote_autosubmit_suite_dir_suite, "'")) + ## Alternative without rsync: 'for file in bar*; do scp "$file" user@server:/destination && rm "$file" ; done' + system(paste0("rsync -avz --remove-source-files -e ssh ", hpc_user, "@", hostname, ":" run_dir, "/* ", + remote_autosubmit_suite_dir_suite)) + ## system(paste0("ssh ", hpc_user, "@transfer1.bsc.es 'mv ", + ## run_dir, "/* /gpfs/archive/bsc32/", remote_autosubmit_suite_dir_suite, "'")) } else if (!wait) { stop("Computation in progress...") } else { diff --git a/R/Utils.R b/R/Utils.R index 84a6be49..5640e855 100644 --- a/R/Utils.R +++ b/R/Utils.R @@ -1030,6 +1030,7 @@ write_autosubmit_confs <- function(chunks, cluster, autosubmit_suite_dir) { if (tolower(cluster$queue_host) != "local") { conf$Platforms[[cluster$queue_host]]$USER <- cluster$hpc_user conf$Platforms[[cluster$queue_host]]$PROCESSORS_PER_NODE <- as.integer(cluster$cores_per_job) + conf$Platforms[[cluster$queue_host]]$HOSTNAME <- cluster$hostname if (!is.null(cluster$extra_queue_params)) { tmp <- unlist(cluster$extra_queue_params) for (ii in 1:length(tmp)) { -- GitLab From 24fae7b2d815419fdd57d944a794cd6708299407 Mon Sep 17 00:00:00 2001 From: vagudets Date: Thu, 7 Aug 2025 16:20:56 +0200 Subject: [PATCH 3/9] Fix typos and 'run_dir' definition --- R/ByChunks_autosubmit.R | 13 +++++++------ R/Collect.R | 2 +- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/R/ByChunks_autosubmit.R b/R/ByChunks_autosubmit.R index 4f6767f1..617febc7 100644 --- a/R/ByChunks_autosubmit.R +++ b/R/ByChunks_autosubmit.R @@ -248,8 +248,8 @@ ByChunks_autosubmit <- function(step_fun, cube_headers, ..., chunks = 'auto', paste(support_hpcs, collapse = ','), '.') } ### hostname - if (is.null(cluster$hostname) { - hostname <- hostnames[[cluster$queue_host]] + if (is.null(cluster$hostname)) { + cluster$hostname <- hostnames[[cluster$queue_host]] } else { warning("Taking user-defined hostname for HPC platform.") } @@ -403,7 +403,7 @@ ByChunks_autosubmit <- function(step_fun, cube_headers, ..., chunks = 'auto', is_autosubmit_suite_dir_shared <- TRUE if (!is.null(cluster$run_dir)) { if (!dir.exists(cluster$run_dir)) { - warning("Cluster component 'run_dir' ", cluster$run_dir" not found.", + warning("Cluster component 'run_dir' ", cluster$run_dir, " not found.", " It will be created in the remote cluster.") is_autosubmit_suite_dir_shared <- FALSE } @@ -427,6 +427,7 @@ ByChunks_autosubmit <- function(step_fun, cube_headers, ..., chunks = 'auto', remote_autosubmit_suite_dir <- file.path("/esarchive/autosubmit/", suite_id, 'proj') } remote_autosubmit_suite_dir_suite <- paste0(remote_autosubmit_suite_dir, '/STARTR_CHUNKING_', suite_id, '/') + cluster[['run_dir']] <- remote_autosubmit_suite_dir_suite # Work out chunked dimensions and target dimensions all_dims <- lapply(cube_headers, attr, 'Dimensions') @@ -591,9 +592,9 @@ ByChunks_autosubmit <- function(step_fun, cube_headers, ..., chunks = 'auto', ## TODO: Avoid using transfer machine if (!is_autosubmit_suite_dir_shared) { system(paste0("ssh ", cluster[['hpc_user']], "@", cluster[['hostname']], - " 'mkdir -p remote_autosubmit_suite_dir'; rsync -r", - autosubmit_suite_dir, "/.", " " cluster[['hostname']], ":", remote_autosubmit_suite_dir, - " ; sleep 10")) + " 'mkdir -p remote_autosubmit_suite_dir'; rsync -r ", + autosubmit_suite_dir, "/. ", cluster[['hpc_user']], "@", cluster[['hostname']], + ":", remote_autosubmit_suite_dir, " ; sleep 10")) # system(paste0("ssh ", cluster[['hpc_user']], "@transfer1.bsc.es 'mkdir -p ", # remote_autosubmit_suite_dir, # ' ; cp -r ', '/gpfs/archive/bsc32/', autosubmit_suite_dir, '/. ', remote_autosubmit_suite_dir, diff --git a/R/Collect.R b/R/Collect.R index 61216c96..be9f6d38 100644 --- a/R/Collect.R +++ b/R/Collect.R @@ -448,7 +448,7 @@ Collect_autosubmit <- function(startr_exec, wait = TRUE, remove = TRUE, on_remot files_to_retrieve <- paste0(run_dir, files_in_remote_dir[grepl('.*\\.Rds$', files_in_remote_dir)]) ## TODO: Not use transfer machine ## Alternative without rsync: 'for file in bar*; do scp "$file" user@server:/destination && rm "$file" ; done' - system(paste0("rsync -avz --remove-source-files -e ssh ", hpc_user, "@", hostname, ":" run_dir, "/* ", + system(paste0("rsync -avz --remove-source-files -e ssh ", hpc_user, "@", hostname, ":", run_dir, "/* ", remote_autosubmit_suite_dir_suite)) ## system(paste0("ssh ", hpc_user, "@transfer1.bsc.es 'mv ", ## run_dir, "/* /gpfs/archive/bsc32/", remote_autosubmit_suite_dir_suite, "'")) -- GitLab From 72b17815e75d3c205e187be6cc3adf0555ba0dd5 Mon Sep 17 00:00:00 2001 From: vagudets Date: Thu, 7 Aug 2025 16:27:19 +0200 Subject: [PATCH 4/9] Add example --- inst/doc/usecase/ex2_15_margin_run_on_gpfs.R | 70 ++++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 inst/doc/usecase/ex2_15_margin_run_on_gpfs.R diff --git a/inst/doc/usecase/ex2_15_margin_run_on_gpfs.R b/inst/doc/usecase/ex2_15_margin_run_on_gpfs.R new file mode 100644 index 00000000..51bf1f31 --- /dev/null +++ b/inst/doc/usecase/ex2_15_margin_run_on_gpfs.R @@ -0,0 +1,70 @@ +# Author: Victòria Agudetse Roures, Sara Moreno, Núria Pérez Zanón +# Date: 7th August 2025 +# ------------------------------------------------------------------ + +# ----------------------------------------------------------------- +# Function working on time dimension e.g.: Season +# ------------------------------------------------------------------ +library(startR) +path <- "/esarchive/scratch/vagudets/repos/startR/R/" +ff <- lapply(list.files(path), function(x) paste0(path, x)) +invisible(lapply(ff, source)) + +repos <- '/esarchive/exp/ecmwf/system51c3s/monthly_mean/$var$_f6h/$var$_$sdate$.nc' +data <- Start(dat = repos, + var = 'tas', + sdate = c('20170101', '20180101'), + ensemble = indices(1:20), + time = 'all', + lat = 'all', + lon = indices(1:40), + return_vars = list(lat = 'dat', lon = 'dat', time = 'sdate'), + retrieve = FALSE) + +fun_spring <- function(x) { + y <- s2dv::Season(x, time_dim = 'time', monini = 1, moninf = 3, monsup = 5) + return(y) +} + +step1 <- Step(fun = fun_spring, + target_dims = c('var', 'time'), + output_dims = c('var', 'time')) + + +file_dims <- dim(attr(data, "ExpectedFiles")) +new_files <- sapply(attr(data, "ExpectedFiles"), + function(x) { + gsub("/esarchive/", "/gpfs/projects/bsc32/esarchive_cache/", x) + }) +dim(new_files) <- file_dims +attr(data, "ExpectedFiles") <- new_files +data[2] <- gsub("/esarchive/", "/gpfs/projects/bsc32/esarchive_cache/", data[2]) + +wf1 <- AddStep(data, step1) + +#-----------modify according to your personal info--------- + queue_host <- 'cte-amd' + temp_dir <- '/gpfs/scratch/bsc32/bsc032762/startR_hpc/' + ecflow_suite_dir <- '/gpfs/home/bsc/bsc032762/startR_local/' #your own local directory +#------------------------------------------------------------ + res <- Compute(workflow = wf1, + chunks = list(ensemble = 2, + sdate = 2), + threads_load = 2, + threads_compute = 4, + cluster = list( + queue_host = 'amd', + expid = 'a9p5', + hpc_user = "bsc032762", + autosubmit_module = 'autosubmit/4.1.14-foss-2023b-Python-3.11.5', + cores_per_job = 4, + job_wallclock = '01:00', + max_jobs = 100, + run_dir = '/gpfs/scratch/bsc32/bsc032762/startR_hpc/' + ), + workflow_manager = 'autosubmit', + autosubmit_suite_dir = "/esarchive/scratch/vagudets/startR_local/", + autosubmit_server = 'bscesautosubmit03', + wait = TRUE + ) + -- GitLab From 225a46d0563496906dc2044ac0ce15373d5b2def Mon Sep 17 00:00:00 2001 From: vagudets Date: Thu, 7 Aug 2025 16:27:52 +0200 Subject: [PATCH 5/9] Run unit tests --- .Rbuildignore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.Rbuildignore b/.Rbuildignore index d79d3e82..698f5e2c 100644 --- a/.Rbuildignore +++ b/.Rbuildignore @@ -9,7 +9,7 @@ ^inst/doc$ ^\.gitlab-ci\.yml$ ## unit tests should be ignored when building the package for CRAN -^tests$ +#^tests$ ^inst/PlotProfiling\.R$ ^.gitlab$ # Suggested by http://r-pkgs.had.co.nz/package.html -- GitLab From ea6066123ba3beb7dc862ee043601b89fbf8b3e4 Mon Sep 17 00:00:00 2001 From: vagudets Date: Fri, 8 Aug 2025 12:16:39 +0200 Subject: [PATCH 6/9] Use installed autosubmit config files; add/remove TODOs --- R/ByChunks_autosubmit.R | 23 +++++++++-------------- R/Collect.R | 4 ---- 2 files changed, 9 insertions(+), 18 deletions(-) diff --git a/R/ByChunks_autosubmit.R b/R/ByChunks_autosubmit.R index 617febc7..0dd2bcd4 100644 --- a/R/ByChunks_autosubmit.R +++ b/R/ByChunks_autosubmit.R @@ -523,10 +523,8 @@ ByChunks_autosubmit <- function(step_fun, cube_headers, ..., chunks = 'auto', } # Copy load_process_save_chunk_autosubmit.R into local folder - ## TODO: Change for testing - chunk_script <- file("/esarchive/scratch/vagudets/repos/startR/inst/chunking/Autosubmit/load_process_save_chunk_autosubmit.R") - ## chunk_script <- file(system.file('chunking/Autosubmit/load_process_save_chunk_autosubmit.R', - ## package = 'startR')) + chunk_script <- file(system.file('chunking/Autosubmit/load_process_save_chunk_autosubmit.R', + package = 'startR')) chunk_script_lines <- readLines(chunk_script) close(chunk_script) chunk_script_lines <- gsub('^lib_dir <- *', paste0('lib_dir <- ', @@ -589,16 +587,11 @@ ByChunks_autosubmit <- function(step_fun, cube_headers, ..., chunks = 'auto', # Modify conf files from template and rewrite to /esarchive/autosubmit/expid/conf/ write_autosubmit_confs(chunks, cluster, autosubmit_suite_dir) - ## TODO: Avoid using transfer machine if (!is_autosubmit_suite_dir_shared) { system(paste0("ssh ", cluster[['hpc_user']], "@", cluster[['hostname']], " 'mkdir -p remote_autosubmit_suite_dir'; rsync -r ", autosubmit_suite_dir, "/. ", cluster[['hpc_user']], "@", cluster[['hostname']], ":", remote_autosubmit_suite_dir, " ; sleep 10")) - # system(paste0("ssh ", cluster[['hpc_user']], "@transfer1.bsc.es 'mkdir -p ", - # remote_autosubmit_suite_dir, - # ' ; cp -r ', '/gpfs/archive/bsc32/', autosubmit_suite_dir, '/. ', remote_autosubmit_suite_dir, - # " ; sleep 10 '")) } # Iterate through chunks @@ -702,12 +695,14 @@ ByChunks_autosubmit <- function(step_fun, cube_headers, ..., chunks = 'auto', if (substr(failed_file_size, 1, 1) != 0) { # Remove bigmemory objects (e.g., a68h_1_1 and a68h_1_1.desc) if they exist # If run_dir is specified, the files are under run_dir; if not, files are under proj/STARTR_CHUNKING_xxxx/ - ## TODO: If run_dir is not in esarchive this will not work if (!is.null(cluster[['run_dir']])) { - file.remove( - file.path(cluster[['run_dir']], - list.files(cluster[['run_dir']])[grepl(paste0("^", suite_id, "_.*"), list.files(cluster[['run_dir']]))]) - ) + if (is_autosubmit_suite_dir_shared) { + file.remove( + file.path(cluster[['run_dir']], + list.files(cluster[['run_dir']])[grepl(paste0("^", suite_id, "_.*"), list.files(cluster[['run_dir']]))]) + ) + } + ## TODO: Remove files if run_dir is not in esarchive } else { file.remove( file.path(remote_autosubmit_suite_dir_suite, diff --git a/R/Collect.R b/R/Collect.R index be9f6d38..49b38917 100644 --- a/R/Collect.R +++ b/R/Collect.R @@ -446,12 +446,8 @@ Collect_autosubmit <- function(startr_exec, wait = TRUE, remove = TRUE, on_remot # Transfer files back message("Computation finished on cluster, retrieving files...") files_to_retrieve <- paste0(run_dir, files_in_remote_dir[grepl('.*\\.Rds$', files_in_remote_dir)]) - ## TODO: Not use transfer machine - ## Alternative without rsync: 'for file in bar*; do scp "$file" user@server:/destination && rm "$file" ; done' system(paste0("rsync -avz --remove-source-files -e ssh ", hpc_user, "@", hostname, ":", run_dir, "/* ", remote_autosubmit_suite_dir_suite)) - ## system(paste0("ssh ", hpc_user, "@transfer1.bsc.es 'mv ", - ## run_dir, "/* /gpfs/archive/bsc32/", remote_autosubmit_suite_dir_suite, "'")) } else if (!wait) { stop("Computation in progress...") } else { -- GitLab From 5e0aa4a8cbf16016de57d7522502f867055c4ff6 Mon Sep 17 00:00:00 2001 From: vagudets Date: Mon, 25 Aug 2025 15:42:13 +0200 Subject: [PATCH 7/9] Replace PCICt dependency with ClimProjDiags --- DESCRIPTION | 3 +-- NAMESPACE | 2 +- R/NcDataReader.R | 11 ++++++----- tests/testthat/test-Start-calendar.R | 5 ++++- 4 files changed, 12 insertions(+), 9 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index 35bb1b9f..e3809c00 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,6 +1,6 @@ Package: startR Title: Automatically Retrieve Multidimensional Distributed Data Sets -Version: 2.4.0 +Version: 2.4.1 Authors@R: c( person("Nicolau", "Manubens", , "nicolau.manubens@bsc.es", role = c("aut")), person("An-Chi", "Ho", , "an.ho@bsc.es", role = c("aut"), comment = c(ORCID = "0000-0002-4182-5258")), @@ -32,7 +32,6 @@ Imports: easyNCDF, s2dv, ClimProjDiags, - PCICt, methods Suggests: stats, diff --git a/NAMESPACE b/NAMESPACE index c6bca724..c3232eda 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -16,7 +16,7 @@ export(Start) export(Step) export(indices) export(values) -import(PCICt) +import(ClimProjDiags) import(abind) import(bigmemory) import(easyNCDF) diff --git a/R/NcDataReader.R b/R/NcDataReader.R index 47817c65..66d58fd7 100644 --- a/R/NcDataReader.R +++ b/R/NcDataReader.R @@ -43,8 +43,9 @@ #' first_round_indices, synonims) #'@seealso \code{\link{NcOpener}} \code{\link{NcDimReader}} #' \code{\link{NcCloser}} \code{\link{NcVarReader}} -#'@import easyNCDF PCICt +#'@import easyNCDF ClimProjDiags #'@export + NcDataReader <- function(file_path = NULL, file_object = NULL, file_selectors = NULL, inner_indices = NULL, synonims) { @@ -303,8 +304,8 @@ NcDataReader <- function(file_path = NULL, file_object = NULL, result_vec[result_i] <- tmp } # Transfer the strings to time class - new_array <- PCICt::as.PCICt(result_vec, cal = 'gregorian') - new_array <- suppressWarnings(PCICt::as.POSIXct.PCICt(new_array, tz = "UTC")) + new_array <- ClimProjDiags::as.PCICt(result_vec, cal = 'gregorian') + new_array <- suppressWarnings(ClimProjDiags::as.POSIXct.PCICt(new_array, tz = "UTC")) # if (calendar == 'gregorian') { # # Find how many years + months @@ -373,8 +374,8 @@ NcDataReader <- function(file_path = NULL, file_object = NULL, } if (!(units %in% c('month', 'months') & calendar == 'gregorian')) { - new_array <- PCICt::as.PCICt(result, cal = calendar, origin = parts[2])[] - new_array <- suppressWarnings(PCICt::as.POSIXct.PCICt(new_array, tz = "UTC")) + new_array <- ClimProjDiags::as.PCICt(result, cal = calendar, origin = parts[2])[] + new_array <- suppressWarnings(ClimProjDiags::as.POSIXct.PCICt(new_array, tz = "UTC")) } #new_array <- seq(as.POSIXct(parts[2]), # length = max(result, na.rm = TRUE) + 1, diff --git a/tests/testthat/test-Start-calendar.R b/tests/testthat/test-Start-calendar.R index 43651ab4..0ba03c84 100644 --- a/tests/testthat/test-Start-calendar.R +++ b/tests/testthat/test-Start-calendar.R @@ -243,8 +243,11 @@ expect_equal( dim(attr(data_obs, 'Variables')$common$time), c(date = 4, time = 1) ) + +## NOTE: as.character() behavior changes in R/4.?.?, midnight dates become YYYYmmdd without +## the hour information. This is why the unit test fails in the hub. expect_equal( -as.character(attr(data_obs, 'Variables')$common$time), +format(attr(data_obs, 'Variables')$common$time), c("1960-11-16 00:00:00", "1960-12-16 12:00:00", "1961-01-16 12:00:00", "1961-02-15 00:00:00") ) -- GitLab From fc28d09a0982b67b7924eb04a3e52d202ac7ad2d Mon Sep 17 00:00:00 2001 From: vagudets Date: Tue, 26 Aug 2025 15:16:39 +0200 Subject: [PATCH 8/9] Changes from master! --- R/ByChunks_autosubmit.R | 2 +- R/Utils.R | 7 +++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/R/ByChunks_autosubmit.R b/R/ByChunks_autosubmit.R index 0dd2bcd4..796c0112 100644 --- a/R/ByChunks_autosubmit.R +++ b/R/ByChunks_autosubmit.R @@ -365,7 +365,7 @@ ByChunks_autosubmit <- function(step_fun, cube_headers, ..., chunks = 'auto', if (is.null(cluster[['special_setup']])) { cluster[['special_setup']] <- 'none' } - if (!(cluster[['special_setup']] %in% c('none', 'gpfs', 'nord4'))) { + if (!(cluster[['special_setup']] %in% c('none', 'marenostrum4', 'nord4'))) { stop("The value provided for the component 'special_setup' of the parameter ", "'cluster' is not recognized.") } diff --git a/R/Utils.R b/R/Utils.R index 5640e855..56a5e54e 100644 --- a/R/Utils.R +++ b/R/Utils.R @@ -885,10 +885,9 @@ write_autosubmit_bash <- function(chunks, cluster, autosubmit_suite_dir) { chunk_args[2, ] <- paste0('%JOBS.CHUNK_', n_chunk, '.', chunk_names, '_N%') chunk_args <- paste0('(', paste(c(chunk_args), collapse = ' '), ')') - ## TODO: Change back - bash_script_template <- file("/esarchive/scratch/vagudets/repos/startR/inst/chunking/Autosubmit/startR_autosubmit.sh") - ## bash_script_template <- file(system.file('chunking/Autosubmit/startR_autosubmit.sh', - ## package = 'startR')) + + bash_script_template <- file(system.file('chunking/Autosubmit/startR_autosubmit.sh', + package = 'startR')) bash_script_lines <- readLines(bash_script_template) close(bash_script_template) -- GitLab From 976084053c1543320f4887566b1b76a9701e8950 Mon Sep 17 00:00:00 2001 From: vagudets Date: Tue, 26 Aug 2025 15:17:38 +0200 Subject: [PATCH 9/9] Remove file --- inst/doc/usecase/ex2_15_margin_run_on_gpfs.R | 70 -------------------- 1 file changed, 70 deletions(-) delete mode 100644 inst/doc/usecase/ex2_15_margin_run_on_gpfs.R diff --git a/inst/doc/usecase/ex2_15_margin_run_on_gpfs.R b/inst/doc/usecase/ex2_15_margin_run_on_gpfs.R deleted file mode 100644 index 51bf1f31..00000000 --- a/inst/doc/usecase/ex2_15_margin_run_on_gpfs.R +++ /dev/null @@ -1,70 +0,0 @@ -# Author: Victòria Agudetse Roures, Sara Moreno, Núria Pérez Zanón -# Date: 7th August 2025 -# ------------------------------------------------------------------ - -# ----------------------------------------------------------------- -# Function working on time dimension e.g.: Season -# ------------------------------------------------------------------ -library(startR) -path <- "/esarchive/scratch/vagudets/repos/startR/R/" -ff <- lapply(list.files(path), function(x) paste0(path, x)) -invisible(lapply(ff, source)) - -repos <- '/esarchive/exp/ecmwf/system51c3s/monthly_mean/$var$_f6h/$var$_$sdate$.nc' -data <- Start(dat = repos, - var = 'tas', - sdate = c('20170101', '20180101'), - ensemble = indices(1:20), - time = 'all', - lat = 'all', - lon = indices(1:40), - return_vars = list(lat = 'dat', lon = 'dat', time = 'sdate'), - retrieve = FALSE) - -fun_spring <- function(x) { - y <- s2dv::Season(x, time_dim = 'time', monini = 1, moninf = 3, monsup = 5) - return(y) -} - -step1 <- Step(fun = fun_spring, - target_dims = c('var', 'time'), - output_dims = c('var', 'time')) - - -file_dims <- dim(attr(data, "ExpectedFiles")) -new_files <- sapply(attr(data, "ExpectedFiles"), - function(x) { - gsub("/esarchive/", "/gpfs/projects/bsc32/esarchive_cache/", x) - }) -dim(new_files) <- file_dims -attr(data, "ExpectedFiles") <- new_files -data[2] <- gsub("/esarchive/", "/gpfs/projects/bsc32/esarchive_cache/", data[2]) - -wf1 <- AddStep(data, step1) - -#-----------modify according to your personal info--------- - queue_host <- 'cte-amd' - temp_dir <- '/gpfs/scratch/bsc32/bsc032762/startR_hpc/' - ecflow_suite_dir <- '/gpfs/home/bsc/bsc032762/startR_local/' #your own local directory -#------------------------------------------------------------ - res <- Compute(workflow = wf1, - chunks = list(ensemble = 2, - sdate = 2), - threads_load = 2, - threads_compute = 4, - cluster = list( - queue_host = 'amd', - expid = 'a9p5', - hpc_user = "bsc032762", - autosubmit_module = 'autosubmit/4.1.14-foss-2023b-Python-3.11.5', - cores_per_job = 4, - job_wallclock = '01:00', - max_jobs = 100, - run_dir = '/gpfs/scratch/bsc32/bsc032762/startR_hpc/' - ), - workflow_manager = 'autosubmit', - autosubmit_suite_dir = "/esarchive/scratch/vagudets/startR_local/", - autosubmit_server = 'bscesautosubmit03', - wait = TRUE - ) - -- GitLab