# test_autosubmit.R
#
# Manual test script for the Autosubmit workflow manager support in startR.
# This first section calls the two Autosubmit writer functions directly with
# a hand-made chunk and cluster configuration.

chunks <- list(lat = 2, lon = 3)

cluster <- list(queue_host = "local", #"nord3", #'local', #Q: What is the name? The ones recognized by autosubmit?
                #temp_dir = '/esarchive/autosubmit/',  #Q: Do we need a dir for autosubmit? NO.
                r_module = 'R/4.1.2-foss-2019b',
                cores_per_job = 4,
                job_wallclock = '01:00:00',
                max_jobs = 4,
                expid = "a659",  # different from now
                hpc_user = "bsc32734")  # different from now

autosubmit_suite_dir <- "/home/Earth/aho/startR_local_autosubmit/"

# These two functions should be called by ByChunks
source("/esarchive/scratch/aho/git/aho-testtest/startR/autosubmit/write_autosubmit_conf.R")
source("/esarchive/scratch/aho/git/aho-testtest/startR/autosubmit/write_bash.R")

write_autosubmit_conf(chunks, cluster, autosubmit_suite_dir)

write_bash(chunks, cluster, autosubmit_suite_dir)

#========================================================


#========================================================
# CASE 1: return data directly
#========================================================

# The installed package is not attached here; a local checkout is loaded
# through the helper script instead.
#  library(startR)
source("~/aho-testtest/helper_script/load_local_package.R")
load_local_package("/esarchive/scratch/aho/git/startR/")

# Data request, declared with retrieve = FALSE and handed to AddStep() below.
data <- Start(dat = "/esarchive/exp/ecmwf/system5c3s/monthly_mean/$var$_f6h/$var$_$sdate$.nc",
              var = c('tas'),
              sdate = paste0(2017:2018, '0501'),
              ensemble = 'all',
              ftime = indices(1:3),
              lat = values(list(20, 80)), lat_reorder = Sort(),
              lon = values(list(-80, 40)), lon_reorder = CircularSort(-180, 180),
              synonims = list(ftime = c('ftime', 'time'),
                              lat = c('lat', 'latitude'),
                              lon = c('lon', 'longitude')),
              return_vars = list(ftime = 'sdate', lon = NULL, lat = NULL),
              retrieve = FALSE)

# Identity function: each (lat, lon) slice is returned unchanged.
func <- function(x) {
  x
}

step <- Step(func, target_dims = c('lat', 'lon'), output_dims = c('lat', 'lon'),
             use_attributes = list("Variables"))
wf <- AddStep(data, step)

autosubmit_suite_dir <- "/home/Earth/aho/startR_local_autosubmit/"  # can be NULL

# Pick the R module that matches the target host. An unknown host stops the
# script here instead of leaving r_module_ver undefined (or stale from an
# earlier run in the same session).
run_on <- 'local'
r_module_ver <- switch(run_on,
  local = "R/4.1.2-foss-2015a-bare",
  nord3 = "R/4.1.2-foss-2019b",
  stop("Unknown queue host: ", run_on, call. = FALSE)
)

res <- Compute(wf, chunks = list(sdate = 2),
               threads_load = 2, threads_compute = 4,
               cluster = list(
                 queue_host = run_on, # name in platforms.yml
                 r_module = r_module_ver,
                 autosubmit_module = 'autosubmit/4.0.0b-foss-2015a-Python-3.7.3',
                 cores_per_job = 4,
                 job_wallclock = '01:00:00',
                 max_jobs = 4,
#                 polling_period = 10,
#                 extra_queue_params = list('#SBATCH --constraint=medmem', '#SBATCH --exclusive'),
                 expid = "a68h", #can be NULL
                 hpc_user = "bsc32734"
               ),
               workflow_manager = 'autosubmit', # 'ecFlow' or NULL
               autosubmit_suite_dir = autosubmit_suite_dir,
               autosubmit_server = NULL, #'bscesautosubmit01',
               wait = TRUE #FALSE
               )


#========================================================
# CASE 2: real calculation
#========================================================

library(startR)

# Data request, declared with retrieve = FALSE and handed to AddStep() below.
data <- Start(dat = "/esarchive/exp/ecmwf/system5c3s/monthly_mean/$var$_f6h/$var$_$sdate$.nc",
              var = c('tas'),
              sdate = paste0(2017:2018, '0501'),
              ensemble = 'all',
              time = indices(1:3),
              lat = values(list(20, 80)), lat_reorder = Sort(),
              lon = values(list(-80, 40)), lon_reorder = CircularSort(-180, 180),
              synonims = list(lat = c('lat', 'latitude'), lon = c('lon', 'longitude')),
              return_vars = list(time = 'sdate', lon = NULL, lat = NULL),
              retrieve = FALSE)

# Weighted mean over each (lat, lon) slice; the coordinates are read from the
# 'Variables' attribute attached to the chunk.
func <- function(x) {
  coords <- attr(x, 'Variables')$common
  ClimProjDiags::WeightedMean(x, lon = coords$lon, lat = coords$lat)
}

step <- Step(func, target_dims = c('lat', 'lon'), output_dims = NULL,
             use_attributes = list("Variables"), use_libraries = "ClimProjDiags")
wf <- AddStep(data, step)

autosubmit_suite_dir <- "/home/Earth/aho/startR_local_autosubmit/"

#--------- Source stuff for testing -------------
# NOTE(review): setwd() changes the working directory for the rest of the
# session. It is kept because the sourced files may rely on it - confirm
# before replacing it with file.path()-based source() calls.
# Presumably these files define development versions of Compute() and its
# helpers that mask the installed ones - TODO confirm.
setwd('/esarchive/scratch/aho/git/aho-testtest/startR/autosubmit')
source('Compute.R')
source('ByChunks_AS.R')
source("Collect_AS.R")
# Expose the package-internal messaging helpers to the sourced code.
.message <- startR:::.message
.warning <- startR:::.warning
#-----------------------------------------------

# Pick the R module that matches the target host. An unknown host stops the
# script here instead of leaving r_module_ver undefined (or stale from an
# earlier run in the same session).
run_on <- 'local'
r_module_ver <- switch(run_on,
  local = "R/4.1.2-foss-2015a-bare",
  nord3 = "R/4.1.2-foss-2019b",
  stop("Unknown queue host: ", run_on, call. = FALSE)
)

res <- Compute(wf, chunks = list(sdate = 2, ensemble = 2),
               threads_compute = 4,
               cluster = list(
                 queue_host = run_on, #'nord3', # name in platforms.yml
                 r_module = r_module_ver,
                 autosubmit_module = 'autosubmit/4.0.0b-foss-2015a-Python-3.7.3',
                 cores_per_job = 4,
                 job_wallclock = '01:00:00',
                 max_jobs = 4,
                 polling_period = 10,
                 extra_queue_params = list('#SBATCH --constraint=medmem', '#SBATCH --exclusive'),
                 expid = "a68h",
                 hpc_user = "bsc32734"
               ),
               workflow_manager = 'autosubmit', # 'ecFlow'
               autosubmit_suite_dir = autosubmit_suite_dir,
               autosubmit_server = NULL, #'bscesautosubmit01',
               wait = TRUE
               )


#========================================================
# CASE 3: 2 dats
#========================================================

library(startR)
path1 <- "/esarchive/exp/ecmwf/system5c3s/monthly_mean/$var$_f6h/$var$_$sdate$.nc"
path2 <- "/esarchive/exp/ecmwf/system5_m1/monthly_mean/$var$_f6h/$var$_$sdate$.nc"

# Two named datasets in one request; both go through the same CDORemapper
# transform onto the 'r360x181' grid.
data <- Start(dat = list(list(name = 'system5c3s', path = path1),
                         list(name = 'system5_m1', path = path2)),
              var = c('tas'),
              sdate = paste0(2017:2018, '0501'),
              ensemble = 'all',
              time = indices(1:3),
              lat = values(list(20, 80)), lat_reorder = Sort(),
              lon = values(list(-80, 40)), lon_reorder = CircularSort(-180, 180),
              transform = CDORemapper,
              transform_extra_cells = 2,
              transform_params = list(grid = 'r360x181',
                                      method = 'conservative'),
              transform_vars = c('lat', 'lon'),
              synonims = list(lat = c('lat', 'latitude'), lon = c('lon', 'longitude')),
              return_vars = list(time = 'sdate', lon = 'dat', lat = 'dat'),
              retrieve = FALSE)

# Weighted mean over each slice; the coordinates stored under the
# 'system5c3s' dataset in the 'Variables' attribute are used.
func <- function(x) {
  coords <- attr(x, 'Variables')$system5c3s
  ClimProjDiags::WeightedMean(x, lon = coords$lon, lat = coords$lat)
}

# Alternative step definition that also targets the 'dat' dimension. It was
# assigned and then immediately overwritten by the definition below, so it is
# kept only as a comment:
#   step <- Step(func, target_dims = c('dat', 'lat', 'lon'), output_dims = 'dat',
#                use_attributes = list("Variables"), use_libraries = "ClimProjDiags")

step <- Step(func, target_dims = c('lat', 'lon'), output_dims = NULL,
             use_attributes = list("Variables"), use_libraries = "ClimProjDiags")
wf <- AddStep(data, step)

autosubmit_suite_dir <- "/home/Earth/aho/startR_local_autosubmit/"

#--------- Source stuff for testing -------------
# NOTE(review): setwd() changes the working directory for the rest of the
# session. It is kept because the sourced files may rely on it - confirm
# before replacing it with file.path()-based source() calls.
setwd('/esarchive/scratch/aho/git/aho-testtest/startR/autosubmit')
source('Compute.R')
source('ByChunks_AS.R')
source("Collect_AS.R")
# Expose the package-internal messaging helpers to the sourced code.
.message <- startR:::.message
.warning <- startR:::.warning
#-----------------------------------------------

# Pick the R and CDO modules that match the target host. An unknown host
# stops the script here instead of leaving the module variables undefined.
run_on <- 'local'
if (run_on == 'local') {
  r_module_ver <- "R/4.1.2-foss-2015a-bare"
  cdo_module_ver <- "CDO/1.9.8-foss-2015a"
} else if (run_on == 'nord3') {
  r_module_ver <- "R/4.1.2-foss-2019b"
  cdo_module_ver <- "CDO/1.9.8-foss-2019b"
} else {
  stop("Unknown queue host: ", run_on, call. = FALSE)
}

res <- Compute(wf, chunks = list(sdate = 2),
               threads_compute = 4, threads_load = 2,
               cluster = list(
                 queue_host = run_on, #'nord3', # name in platforms.yml
                 r_module = r_module_ver,
                 autosubmit_module = 'autosubmit/4.0.0b-foss-2015a-Python-3.7.3',
                 CDO_module = cdo_module_ver,
                 cores_per_job = 4,
                 job_wallclock = '01:00:00',
                 max_jobs = 4,
                 polling_period = 10,
                 extra_queue_params = list('#SBATCH --constraint=medmem', '#SBATCH --exclusive'),
#                 hpc_user = "bsc32734",
                 expid = "a68e"
               ),
               workflow_manager = 'autosubmit', # 'ecFlow'
               autosubmit_suite_dir = autosubmit_suite_dir,
               autosubmit_server = NULL, #'bscesautosubmit01',
               wait = TRUE
               )


#========================================================
# CASE 4: 2 outputs, named outputs
#========================================================

library(startR)

# Data request, declared with retrieve = FALSE and handed to AddStep() below.
data <- Start(dat = "/esarchive/exp/ecmwf/system5c3s/monthly_mean/$var$_f6h/$var$_$sdate$.nc",
              var = c('tas'),
              sdate = paste0(2017:2018, '0501'),
              ensemble = 'all',
              time = indices(1:3),
              lat = values(list(20, 80)), lat_reorder = Sort(),
              lon = values(list(-80, 40)), lon_reorder = CircularSort(-180, 180),
              synonims = list(lat = c('lat', 'latitude'), lon = c('lon', 'longitude')),
              return_vars = list(time = 'sdate', lon = NULL, lat = NULL),
              retrieve = FALSE)

# Returns two named outputs per (lat, lon) slice: its mean and its minimum,
# both ignoring NAs.
func <- function(x) {
  list(mean = mean(x, na.rm = TRUE), min = min(x, na.rm = TRUE))
}

step <- Step(func, target_dims = c('lat', 'lon'), output_dims = list(mean = NULL, min = NULL))
# Only the first element of what AddStep() returns is passed to Compute().
wf <- AddStep(data, step)[[1]]

autosubmit_suite_dir <- "/home/Earth/aho/startR_local_autosubmit/"

#--------- Source stuff for testing -------------
# NOTE(review): setwd() changes the working directory for the rest of the
# session. It is kept because the sourced files may rely on it - confirm
# before replacing it with file.path()-based source() calls.
setwd('/esarchive/scratch/aho/git/aho-testtest/startR/autosubmit')
source('Compute.R')
source('ByChunks_AS.R')
source("Collect_AS.R")
# Expose the package-internal messaging helpers to the sourced code.
.message <- startR:::.message
.warning <- startR:::.warning
#-----------------------------------------------

# Pick the R module that matches the target host. An unknown host stops the
# script here instead of leaving r_module_ver undefined. No CDO module is
# selected because this case requests no transform and CDO_module stays
# commented out below.
run_on <- 'local'
r_module_ver <- switch(run_on,
  local = "R/4.1.2-foss-2015a-bare",
  nord3 = "R/4.1.2-foss-2019b",
  stop("Unknown queue host: ", run_on, call. = FALSE)
)

res <- Compute(wf, chunks = list(sdate = 2),
               threads_compute = 4,
               cluster = list(
                 queue_host = run_on, #'nord3', # name in platforms.yml
                 r_module = r_module_ver,
                 autosubmit_module = 'autosubmit/4.0.0b-foss-2015a-Python-3.7.3',
#                 CDO_module = cdo_module_ver,  # define cdo_module_ver first if re-enabled
                 cores_per_job = 4,
                 job_wallclock = '01:00:00',
                 max_jobs = 4,
                 polling_period = 10,
                 extra_queue_params = list('#SBATCH --constraint=medmem', '#SBATCH --exclusive'),
#                 hpc_user = "bsc32734",
                 expid = "a68e"
               ),
               workflow_manager = 'autosubmit', # 'ecFlow'
               autosubmit_suite_dir = autosubmit_suite_dir,
               autosubmit_server = NULL, #'bscesautosubmit01',
               wait = TRUE
               )


# Minimal example, on AS VM
#
# Loads the Autosubmit module, then creates, refreshes and runs the
# experiment through one shell command line.
#
# system() reports failure only through its return value (the exit status of
# the shell); it never signals an R error by itself. The status is therefore
# checked explicitly so that a failed run stops the function instead of
# falling through to the final print().
#
# Returns the printed message on success; signals an error when the command
# line exits with a non-zero status.
test_AS_error <- function() {
  as_module <- "autosubmit/4.0.0b-foss-2015a-Python-3.7.3"
  exp_id <- "a659"  # minimum script
  sys_commands <- paste0("module load ", as_module, "; ",
                         "autosubmit create ", exp_id, " -np; ",
                         "autosubmit refresh ", exp_id, "; ",
                         "autosubmit run ", exp_id)
  # The commands are joined with ';', so the status seen here is the one of
  # the last command, i.e. `autosubmit run`.
  status <- system(sys_commands)
  if (!identical(as.integer(status), 0L)) {
    stop("Autosubmit command failed with exit status ", status, ": ",
         sys_commands, call. = FALSE)
  }
  # Only reached when the whole command line succeeded.
  print("You shouldn't see this message!!")
}