# Manual test script for running startR workflows with
# Compute(..., workflow_manager = 'autosubmit').
#
# The script is organised in independent sections (setup + CASE 1..4 + a
# minimal Autosubmit reproducer). All paths, module names, experiment ids and
# user names are hard-coded for the original author's environment and must be
# adapted before running elsewhere.

#========================================================
# Setup: write the Autosubmit configuration and bash files directly
#========================================================
chunks <- list(lat = 2, lon = 3)
cluster <- list(
  queue_host = "local", # "nord3", 'local'. Q: What is the name? The ones recognized by autosubmit?
  # temp_dir = '/esarchive/autosubmit/', # Q: Do we need a dir for autosubmit? NO.
  r_module = 'R/4.1.2-foss-2019b',
  cores_per_job = 4,
  job_wallclock = '01:00:00',
  max_jobs = 4,
  expid = "a659", # different from now
  hpc_user = "bsc32734" # different from now
)
autosubmit_suite_dir <- "/home/Earth/aho/startR_local_autosubmit/"

# These two functions should be called by ByChunks
source("/esarchive/scratch/aho/git/aho-testtest/startR/autosubmit/write_autosubmit_conf.R")
source("/esarchive/scratch/aho/git/aho-testtest/startR/autosubmit/write_bash.R")
write_autosubmit_conf(chunks, cluster, autosubmit_suite_dir)
write_bash(chunks, cluster, autosubmit_suite_dir)

#========================================================

#========================================================
# CASE 1: return data directly
#========================================================
# library(startR)
# Load a local development copy of the package instead of the installed one.
source("~/aho-testtest/helper_script/load_local_package.R")
load_local_package("/esarchive/scratch/aho/git/startR/")

data <- Start(
  dat = "/esarchive/exp/ecmwf/system5c3s/monthly_mean/$var$_f6h/$var$_$sdate$.nc",
  var = c('tas'),
  sdate = paste0(2017:2018, '0501'),
  ensemble = 'all',
  ftime = indices(1:3),
  lat = values(list(20, 80)),
  lat_reorder = Sort(),
  lon = values(list(-80, 40)),
  lon_reorder = CircularSort(-180, 180),
  synonims = list(ftime = c('ftime', 'time'),
                  lat = c('lat', 'latitude'),
                  lon = c('lon', 'longitude')),
  return_vars = list(ftime = 'sdate', lon = NULL, lat = NULL),
  retrieve = FALSE
)

# Identity function: the workflow returns the loaded chunk unchanged.
func <- function(x) {
  return(x)
}
step <- Step(func,
             target_dims = c('lat', 'lon'),
             output_dims = c('lat', 'lon'),
             use_attributes = list("Variables"))
wf <- AddStep(data, step)

autosubmit_suite_dir <- "/home/Earth/aho/startR_local_autosubmit/" # can be NULL

# Pick the R module matching the target platform. Fail loudly on an unknown
# platform instead of leaving r_module_ver undefined (or stale from an
# earlier section of this script).
run_on <- 'local'
if (run_on == 'local') {
  r_module_ver <- "R/4.1.2-foss-2015a-bare"
} else if (run_on == 'nord3') {
  r_module_ver <- "R/4.1.2-foss-2019b"
} else {
  stop("Unsupported run_on: '", run_on, "'. Use 'local' or 'nord3'.",
       call. = FALSE)
}

res <- Compute(
  wf,
  chunks = list(sdate = 2),
  threads_load = 2,
  threads_compute = 4,
  cluster = list(
    queue_host = run_on, # name in platforms.yml
    r_module = r_module_ver,
    autosubmit_module = 'autosubmit/4.0.0b-foss-2015a-Python-3.7.3',
    # cores_per_job = 4,
    job_wallclock = '01:00:00',
    max_jobs = 4,
    # polling_period = 10,
    # extra_queue_params = list('#SBATCH --constraint=medmem', '#SBATCH --exclusive'),
    expid = "a68h", # can be NULL
    hpc_user = "bsc32734"
  ),
  workflow_manager = 'autosubmit', # 'ecFlow' or NULL
  autosubmit_suite_dir = autosubmit_suite_dir,
  autosubmit_server = NULL, # 'bscesautosubmit01',
  wait = TRUE # FALSE
)

#========================================================
# CASE 2: real calculation
#========================================================
library(startR)

data <- Start(
  dat = "/esarchive/exp/ecmwf/system5c3s/monthly_mean/$var$_f6h/$var$_$sdate$.nc",
  var = c('tas'),
  sdate = paste0(2017:2018, '0501'),
  ensemble = 'all',
  time = indices(1:3),
  lat = values(list(20, 80)),
  lat_reorder = Sort(),
  lon = values(list(-80, 40)),
  lon_reorder = CircularSort(-180, 180),
  synonims = list(lat = c('lat', 'latitude'),
                  lon = c('lon', 'longitude')),
  return_vars = list(time = 'sdate', lon = NULL, lat = NULL),
  retrieve = FALSE
)

# Weighted mean over the target dims, using the lon/lat stored in the
# 'Variables' attribute under $common.
func <- function(x) {
  res <- ClimProjDiags::WeightedMean(x,
                                     lon = attr(x, 'Variables')$common$lon,
                                     lat = attr(x, 'Variables')$common$lat)
  return(res)
}
step <- Step(func,
             target_dims = c('lat', 'lon'),
             output_dims = NULL,
             use_attributes = list("Variables"),
             use_libraries = "ClimProjDiags")
wf <- AddStep(data, step)

autosubmit_suite_dir <- "/home/Earth/aho/startR_local_autosubmit/"

#--------- Source stuff for testing -------------
# NOTE: setwd() changes the working directory for the rest of the session;
# the relative source() calls below depend on it.
setwd('/esarchive/scratch/aho/git/aho-testtest/startR/autosubmit')
source('Compute.R')
source('ByChunks_AS.R')
source("Collect_AS.R")
.message <- startR:::.message
.warning <- startR:::.warning
#-----------------------------------------------

run_on <- 'local'
if (run_on == 'local') {
  r_module_ver <- "R/4.1.2-foss-2015a-bare"
} else if (run_on == 'nord3') {
  r_module_ver <- "R/4.1.2-foss-2019b"
} else {
  stop("Unsupported run_on: '", run_on, "'. Use 'local' or 'nord3'.",
       call. = FALSE)
}

res <- Compute(
  wf,
  chunks = list(sdate = 2, ensemble = 2),
  threads_compute = 4,
  cluster = list(
    queue_host = run_on, # 'nord3', # name in platforms.yml
    r_module = r_module_ver,
    autosubmit_module = 'autosubmit/4.0.0b-foss-2015a-Python-3.7.3',
    cores_per_job = 4,
    job_wallclock = '01:00:00',
    max_jobs = 4,
    polling_period = 10,
    extra_queue_params = list('#SBATCH --constraint=medmem', '#SBATCH --exclusive'),
    expid = "a68h",
    hpc_user = "bsc32734"
  ),
  workflow_manager = 'autosubmit', # 'ecFlow'
  autosubmit_suite_dir = autosubmit_suite_dir,
  autosubmit_server = NULL, # 'bscesautosubmit01',
  wait = TRUE
)

#========================================================
# CASE 3: 2 dats
#========================================================
library(startR)

path1 <- "/esarchive/exp/ecmwf/system5c3s/monthly_mean/$var$_f6h/$var$_$sdate$.nc"
path2 <- "/esarchive/exp/ecmwf/system5_m1/monthly_mean/$var$_f6h/$var$_$sdate$.nc"
data <- Start(
  dat = list(list(name = 'system5c3s', path = path1),
             list(name = 'system5_m1', path = path2)),
  var = c('tas'),
  sdate = paste0(2017:2018, '0501'),
  ensemble = 'all',
  time = indices(1:3),
  lat = values(list(20, 80)),
  lat_reorder = Sort(),
  lon = values(list(-80, 40)),
  lon_reorder = CircularSort(-180, 180),
  transform = CDORemapper,
  transform_extra_cells = 2,
  transform_params = list(grid = 'r360x181', method = 'conservative'),
  transform_vars = c('lat', 'lon'),
  synonims = list(lat = c('lat', 'latitude'),
                  lon = c('lon', 'longitude')),
  return_vars = list(time = 'sdate', lon = 'dat', lat = 'dat'),
  retrieve = FALSE
)

# Weighted mean using the lon/lat stored under the 'system5c3s' entry of the
# 'Variables' attribute.
func <- function(x) {
  res <- ClimProjDiags::WeightedMean(x,
                                     lon = attr(x, 'Variables')$system5c3s$lon,
                                     lat = attr(x, 'Variables')$system5c3s$lat)
  return(res)
}
# NOTE(review): this first Step definition is immediately overwritten by the
# assignment that follows it; only the second one reaches AddStep().
step <- Step(func,
             target_dims = c('dat', 'lat', 'lon'),
             output_dims = 'dat',
             use_attributes = list("Variables"),
             use_libraries = "ClimProjDiags")
step <- Step(func,
             target_dims = c('lat', 'lon'),
             output_dims = NULL,
             use_attributes = list("Variables"),
             use_libraries = "ClimProjDiags")
wf <- AddStep(data, step)

autosubmit_suite_dir <- "/home/Earth/aho/startR_local_autosubmit/"

#--------- Source stuff for testing -------------
# NOTE: setwd() changes the working directory for the rest of the session;
# the relative source() calls below depend on it.
setwd('/esarchive/scratch/aho/git/aho-testtest/startR/autosubmit')
source('Compute.R')
source('ByChunks_AS.R')
source("Collect_AS.R")
.message <- startR:::.message
.warning <- startR:::.warning
#-----------------------------------------------

# This case also needs a CDO module because Start() uses CDORemapper.
run_on <- 'local'
if (run_on == 'local') {
  r_module_ver <- "R/4.1.2-foss-2015a-bare"
  cdo_module_ver <- "CDO/1.9.8-foss-2015a"
} else if (run_on == 'nord3') {
  r_module_ver <- "R/4.1.2-foss-2019b"
  cdo_module_ver <- "CDO/1.9.8-foss-2019b"
} else {
  stop("Unsupported run_on: '", run_on, "'. Use 'local' or 'nord3'.",
       call. = FALSE)
}

res <- Compute(
  wf,
  chunks = list(sdate = 2),
  threads_compute = 4,
  threads_load = 2,
  cluster = list(
    queue_host = run_on, # 'nord3', # name in platforms.yml
    r_module = r_module_ver,
    autosubmit_module = 'autosubmit/4.0.0b-foss-2015a-Python-3.7.3',
    CDO_module = cdo_module_ver,
    cores_per_job = 4,
    job_wallclock = '01:00:00',
    max_jobs = 4,
    polling_period = 10,
    extra_queue_params = list('#SBATCH --constraint=medmem', '#SBATCH --exclusive'),
    # hpc_user = "bsc32734",
    expid = "a68e"
  ),
  workflow_manager = 'autosubmit', # 'ecFlow'
  autosubmit_suite_dir = autosubmit_suite_dir,
  autosubmit_server = NULL, # 'bscesautosubmit01',
  wait = TRUE
)

#========================================================
# CASE 4: 2 outputs, named outputs
#========================================================
library(startR)

data <- Start(
  dat = "/esarchive/exp/ecmwf/system5c3s/monthly_mean/$var$_f6h/$var$_$sdate$.nc",
  var = c('tas'),
  sdate = paste0(2017:2018, '0501'),
  ensemble = 'all',
  time = indices(1:3),
  lat = values(list(20, 80)),
  lat_reorder = Sort(),
  lon = values(list(-80, 40)),
  lon_reorder = CircularSort(-180, 180),
  synonims = list(lat = c('lat', 'latitude'),
                  lon = c('lon', 'longitude')),
  return_vars = list(time = 'sdate', lon = NULL, lat = NULL),
  retrieve = FALSE
)

# Returns two named outputs (mean and min), ignoring NAs.
# TRUE is spelled out because T is an ordinary, reassignable variable.
func <- function(x) {
  return(list(mean = mean(x, na.rm = TRUE), min = min(x, na.rm = TRUE)))
}
step <- Step(func,
             target_dims = c('lat', 'lon'),
             output_dims = list(mean = NULL, min = NULL))
# Only the first element of what AddStep() returns is kept as the workflow.
wf <- AddStep(data, step)[[1]]

autosubmit_suite_dir <- "/home/Earth/aho/startR_local_autosubmit/"

#--------- Source stuff for testing -------------
# NOTE: setwd() changes the working directory for the rest of the session;
# the relative source() calls below depend on it.
setwd('/esarchive/scratch/aho/git/aho-testtest/startR/autosubmit')
source('Compute.R')
source('ByChunks_AS.R')
source("Collect_AS.R")
.message <- startR:::.message
.warning <- startR:::.warning
#-----------------------------------------------

run_on <- 'local'
if (run_on == 'local') {
  r_module_ver <- "R/4.1.2-foss-2015a-bare"
  # NOTE(review): cdo_module_ver is not used in this case (CDO_module is
  # commented out in the Compute() call below).
  cdo_module_ver <- "CDO/1.9.8-foss-2015a"
} else if (run_on == 'nord3') {
  r_module_ver <- "R/4.1.2-foss-2019b"
} else {
  stop("Unsupported run_on: '", run_on, "'. Use 'local' or 'nord3'.",
       call. = FALSE)
}

res <- Compute(
  wf,
  chunks = list(sdate = 2),
  threads_compute = 4,
  cluster = list(
    queue_host = run_on, # 'nord3', # name in platforms.yml
    r_module = r_module_ver,
    autosubmit_module = 'autosubmit/4.0.0b-foss-2015a-Python-3.7.3',
    # CDO_module = cdo_module_ver,
    cores_per_job = 4,
    job_wallclock = '01:00:00',
    max_jobs = 4,
    polling_period = 10,
    extra_queue_params = list('#SBATCH --constraint=medmem', '#SBATCH --exclusive'),
    # hpc_user = "bsc32734",
    expid = "a68e"
  ),
  workflow_manager = 'autosubmit', # 'ecFlow'
  autosubmit_suite_dir = autosubmit_suite_dir,
  autosubmit_server = NULL, # 'bscesautosubmit01',
  wait = TRUE
)

# Minimal example, on AS VM
#
# Runs "autosubmit create / refresh / run" for a fixed experiment id in a
# single shell invocation and stops with an R error when that invocation
# reports a non-zero exit status.
#
# system() does not signal an R error when the command fails; it only returns
# the exit status. The status therefore has to be checked explicitly,
# otherwise execution always continues to the final print().
# Because the commands are chained with ';', the status is the one of the
# last command ("autosubmit run").
test_AS_error <- function() {
  as_module <- "autosubmit/4.0.0b-foss-2015a-Python-3.7.3"
  exp_id <- "a659"
  # minimum script
  sys_commands <- paste0("module load ", as_module, "; ",
                         "autosubmit create ", exp_id, " -np; ",
                         "autosubmit refresh ", exp_id, "; ",
                         "autosubmit run ", exp_id)
  exit_status <- system(sys_commands)
  # It should stop here when the autosubmit run failed.
  if (exit_status != 0L) {
    stop("Autosubmit commands for experiment '", exp_id,
         "' failed with exit status ", exit_status, ".",
         call. = FALSE)
  }
  # Reached only when the shell command chain exited with status 0.
  print("You shouldn't see this message!!")
}