test_autosubmit.R 11.6 KB
Newer Older
aho's avatar
aho committed
   chunks <- list(lat = 2, lon = 3)
   
aho's avatar
aho committed
   cluster <- list(queue_host = "local", #"nord3", #'local', #Q: What is the name? The ones recognized by autosubmit?
                  #temp_dir = '/esarchive/autosubmit/',  #Q: Do we need a dir for autosubmit? NO.
aho's avatar
aho committed
                  r_module = 'R/4.1.2-foss-2019b',
                  cores_per_job = 4,
                  job_wallclock = '01:00:00',
                  max_jobs = 4,
                  expid = "a659",  # different from now
aho's avatar
aho committed
                  hpc_user = "bsc32734")  # different from now
aho's avatar
aho committed

  autosubmit_suite_dir <- "/home/Earth/aho/startR_local_autosubmit/"

aho's avatar
aho committed
# These two functions should be called by ByChunks
aho's avatar
aho committed
source("/esarchive/scratch/aho/git/aho-testtest/startR/autosubmit/write_autosubmit_conf.R")
aho's avatar
aho committed
source("/esarchive/scratch/aho/git/aho-testtest/startR/autosubmit/write_bash.R")
aho's avatar
aho committed

write_autosubmit_conf(chunks, cluster, autosubmit_suite_dir)
aho's avatar
aho committed

write_bash(chunks, cluster, autosubmit_suite_dir)

aho's avatar
aho committed
#========================================================


#========================================================
# CASE 1: return data directly
aho's avatar
aho committed
#========================================================

  library(startR)

  data <- Start(dat = "/esarchive/exp/ecmwf/system5c3s/monthly_mean/$var$_f6h/$var$_$sdate$.nc",
                var = c('tas'),
                sdate = paste0(2017:2018, '0501'),
                ensemble = 'all',
aho's avatar
aho committed
                ftime = indices(1:3),
aho's avatar
aho committed
                lat = values(list(20, 80)), lat_reorder = Sort(),
                lon = values(list(-80, 40)), lon_reorder = CircularSort(-180, 180),
aho's avatar
aho committed
                synonims = list(ftime = c('ftime', 'time'), lat = c('lat', 'latitude'), lon = c('lon', 'longitude')),
                return_vars = list(ftime = 'sdate', lon = NULL, lat = NULL),
aho's avatar
aho committed
                retrieve = FALSE)

 func <- function(x) {
   return(x)
 }

  step <- Step(func, target_dims = c('lat', 'lon'), output_dims = c('lat', 'lon'),
               use_attributes = list("Variables"))
  wf <- AddStep(data, step)

  autosubmit_suite_dir <- "/home/Earth/aho/startR_local_autosubmit/"

#--------- Source stuff for testing -------------
  setwd('/esarchive/scratch/aho/git/aho-testtest/startR/autosubmit')
  source('Compute.R')
  source('ByChunks_AS.R')
aho's avatar
aho committed
  source("Collect_AS.R")
aho's avatar
aho committed
  .message <- startR:::.message
  .warning <- startR:::.warning
#-----------------------------------------------

  res <- Compute(wf, chunks = list(sdate = 2),
                 threads_compute = 4,
                 cluster = list(
aho's avatar
aho committed
                   queue_host = 'nord3', # name in platforms.yml
aho's avatar
aho committed
#                   queue_type = 'slurm',
#                   temp_dir = temp_dir,
                   r_module = 'R/4.1.2-foss-2019b',
aho's avatar
aho committed
                   autosubmit_module = 'autosubmit/4.0.0b-foss-2015a-Python-3.7.3',
aho's avatar
aho committed
                   cores_per_job = 4,
                   job_wallclock = '01:00:00',
                   max_jobs = 4,
#                   bidirectional = FALSE,
aho's avatar
aho committed
#                   polling_period = 10,
aho's avatar
aho committed
#                   extra_queue_params = list('#SBATCH --constraint=medmem', '#SBATCH --exclusive'), 
aho's avatar
aho committed
                   expid = "a68h",
                   hpc_user = "bsc32734"
aho's avatar
aho committed
                 ),
aho's avatar
aho committed
                 workflow_manager = 'autosubmit', # 'ecFlow'
                 autosubmit_suite_dir = autosubmit_suite_dir,
                 autosubmit_server = NULL, #'bscesautosubmit01',
aho's avatar
aho committed
                 wait = F #TRUE
aho's avatar
aho committed
                 )  

aho's avatar
aho committed


#========================================================
# CASE 2: real calculation
#========================================================

  library(startR)

  data <- Start(dat = "/esarchive/exp/ecmwf/system5c3s/monthly_mean/$var$_f6h/$var$_$sdate$.nc",
                var = c('tas'),
                sdate = paste0(2017:2018, '0501'),
                ensemble = 'all',
                time = indices(1:3),
                lat = values(list(20, 80)), lat_reorder = Sort(),
                lon = values(list(-80, 40)), lon_reorder = CircularSort(-180, 180),
                synonims = list(lat = c('lat', 'latitude'), lon = c('lon', 'longitude')),
                return_vars = list(time = 'sdate', lon = NULL, lat = NULL),
                retrieve = FALSE)

 func <- function(x) {
   res <- ClimProjDiags::WeightedMean(x, lon = attr(x, 'Variables')$common$lon, lat = attr(x, 'Variables')$common$lat)
   return(res)
 }


  step <- Step(func, target_dims = c('lat', 'lon'), output_dims = NULL,
               use_attributes = list("Variables"), use_libraries = "ClimProjDiags")
  wf <- AddStep(data, step)

  autosubmit_suite_dir <- "/home/Earth/aho/startR_local_autosubmit/"

#--------- Source stuff for testing -------------
  setwd('/esarchive/scratch/aho/git/aho-testtest/startR/autosubmit')
  source('Compute.R')
  source('ByChunks_AS.R')
  source("Collect_AS.R")
  .message <- startR:::.message
  .warning <- startR:::.warning
#-----------------------------------------------
  run_on <- 'local'
  if (run_on == 'local') {
    r_module_ver <- "R/4.1.2-foss-2015a-bare"
  } else if (run_on == 'nord3') {
    r_module_ver <- "R/4.1.2-foss-2019b"
  }
  res <- Compute(wf, chunks = list(sdate = 2, ensemble = 2),
                 threads_compute = 4,
                 cluster = list(
                   queue_host = run_on, #'nord3', # name in platforms.yml
                   r_module = r_module_ver,
                   autosubmit_module = 'autosubmit/4.0.0b-foss-2015a-Python-3.7.3',
                   cores_per_job = 4,
                   job_wallclock = '01:00:00',
                   max_jobs = 4,
                   polling_period = 10,
                   extra_queue_params = list('#SBATCH --constraint=medmem', '#SBATCH --exclusive'), 
                   expid = "a68e",
                   hpc_user = "bsc32734"
                 ),
                 workflow_manager = 'autosubmit', # 'ecFlow'
                 autosubmit_suite_dir = autosubmit_suite_dir,
                 autosubmit_server = NULL, #'bscesautosubmit01',
                 wait = TRUE
                 )


#========================================================
# CASE 3: 2 dats
#========================================================

  library(startR)
  path1 <- "/esarchive/exp/ecmwf/system5c3s/monthly_mean/$var$_f6h/$var$_$sdate$.nc"
  path2 <- "/esarchive/exp/ecmwf/system5_m1/monthly_mean/$var$_f6h/$var$_$sdate$.nc"

  data <- Start(dat = list(list(name = 'system5c3s', path = path1), 
                           list(name = 'system5_m1', path = path2)),
                var = c('tas'),
                sdate = paste0(2017:2018, '0501'),
                ensemble = 'all',
                time = indices(1:3),
                lat = values(list(20, 80)), lat_reorder = Sort(),
                lon = values(list(-80, 40)), lon_reorder = CircularSort(-180, 180),
                transform = CDORemapper,
                transform_extra_cells = 2,
                transform_params = list(grid = 'r360x181',
                                        method = 'conservative'),
                transform_vars = c('lat', 'lon'),
                synonims = list(lat = c('lat', 'latitude'), lon = c('lon', 'longitude')),
                return_vars = list(time = 'sdate', lon = 'dat', lat = 'dat'),
                retrieve = FALSE)

 func <- function(x) {
   res <- ClimProjDiags::WeightedMean(x, lon = attr(x, 'Variables')$system5c3s$lon, lat = attr(x, 'Variables')$system5c3s$lat)
   return(res)
 }

  step <- Step(func, target_dims = c('dat', 'lat', 'lon'), output_dims = 'dat',
               use_attributes = list("Variables"), use_libraries = "ClimProjDiags")

  step <- Step(func, target_dims = c('lat', 'lon'), output_dims = NULL,
               use_attributes = list("Variables"), use_libraries = "ClimProjDiags")
  wf <- AddStep(data, step)

  autosubmit_suite_dir <- "/home/Earth/aho/startR_local_autosubmit/"

#--------- Source stuff for testing -------------
  setwd('/esarchive/scratch/aho/git/aho-testtest/startR/autosubmit')
  source('Compute.R')
  source('ByChunks_AS.R')
  source("Collect_AS.R")
  .message <- startR:::.message
  .warning <- startR:::.warning
#-----------------------------------------------
  run_on <- 'local'
  if (run_on == 'local') {
    r_module_ver <- "R/4.1.2-foss-2015a-bare"
    cdo_module_ver <- "CDO/1.9.8-foss-2015a"
  } else if (run_on == 'nord3') {
    r_module_ver <- "R/4.1.2-foss-2019b"
  }
  res <- Compute(wf, chunks = list(sdate = 2),
                 threads_compute = 4,
                 cluster = list(
                   queue_host = run_on, #'nord3', # name in platforms.yml
                   r_module = r_module_ver,
                   autosubmit_module = 'autosubmit/4.0.0b-foss-2015a-Python-3.7.3',
                   CDO_module = cdo_module_ver,
                   cores_per_job = 4,
                   job_wallclock = '01:00:00',
                   max_jobs = 4,
                   polling_period = 10,
                   extra_queue_params = list('#SBATCH --constraint=medmem', '#SBATCH --exclusive'),
#                   hpc_user = "bsc32734",
                   expid = "a68e"
                 ),
                 workflow_manager = 'autosubmit', # 'ecFlow'
                 autosubmit_suite_dir = autosubmit_suite_dir,
                 autosubmit_server = NULL, #'bscesautosubmit01',
                 wait = TRUE
                 )


#========================================================
# CASE 4: 2 outputs, named outputs
#========================================================

  library(startR)

  data <- Start(dat = "/esarchive/exp/ecmwf/system5c3s/monthly_mean/$var$_f6h/$var$_$sdate$.nc",
                var = c('tas'),
                sdate = paste0(2017:2018, '0501'),
                ensemble = 'all',
                time = indices(1:3),
                lat = values(list(20, 80)), lat_reorder = Sort(),
                lon = values(list(-80, 40)), lon_reorder = CircularSort(-180, 180),
                synonims = list(lat = c('lat', 'latitude'), lon = c('lon', 'longitude')),
                return_vars = list(time = 'sdate', lon = NULL, lat = NULL),
                retrieve = FALSE)

 func <- function(x) {
   return(list(mean = mean(x, na.rm = T), min = min(x, na.rm = T)))
 }


  step <- Step(func, target_dims = c('lat', 'lon'), output_dims = list(mean = NULL, min = NULL))
  wf <- AddStep(data, step)[[1]]

  autosubmit_suite_dir <- "/home/Earth/aho/startR_local_autosubmit/"

#--------- Source stuff for testing -------------
  setwd('/esarchive/scratch/aho/git/aho-testtest/startR/autosubmit')
  source('Compute.R')
  source('ByChunks_AS.R')
  source("Collect_AS.R")
  .message <- startR:::.message
  .warning <- startR:::.warning
#-----------------------------------------------
  run_on <- 'local'
  if (run_on == 'local') {
    r_module_ver <- "R/4.1.2-foss-2015a-bare"
    cdo_module_ver <- "CDO/1.9.8-foss-2015a"
  } else if (run_on == 'nord3') {
    r_module_ver <- "R/4.1.2-foss-2019b"
  }
  res <- Compute(wf, chunks = list(sdate = 2),
                 threads_compute = 4,
                 cluster = list(
                   queue_host = run_on, #'nord3', # name in platforms.yml
                   r_module = r_module_ver,
                   autosubmit_module = 'autosubmit/4.0.0b-foss-2015a-Python-3.7.3',
#                   CDO_module = cdo_module_ver,
                   cores_per_job = 4,
                   job_wallclock = '01:00:00',
                   max_jobs = 4,
                   polling_period = 10,
                   extra_queue_params = list('#SBATCH --constraint=medmem', '#SBATCH --exclusive'),
#                   hpc_user = "bsc32734",
                   expid = "a68e"
                 ),
                 workflow_manager = 'autosubmit', # 'ecFlow'
                 autosubmit_suite_dir = autosubmit_suite_dir,
                 autosubmit_server = NULL, #'bscesautosubmit01',
                 wait = TRUE
                 )