... | ... | @@ -402,6 +402,139 @@ res <- Compute(step, list(system4, erai), |
|
|
</p>
|
|
|
</details>
|
|
|
|
|
|
## Two data inputs example with multiple parameters
|
|
|
<details><summary>CLICK ME</summary>
|
|
|
</p>
|
|
|
<br/>
|
|
|
|
|
|
```r
|
|
|
library(lubridate)
|
|
|
library(startR)
|
|
|
|
|
|
#===================
|
|
|
# Prepare sdates and paths
|
|
|
#===================
|
|
|
dataset <- "/esarchive/recon/ecmwf/era5/daily_mean/$var$_f1h/$var$_$sdate$.nc"
|
|
|
firsty <- "1981"
|
|
|
lasty <- "1987"
|
|
|
months <- 10:12
|
|
|
firstd <- paste0(firsty,"1001")
|
|
|
sdates <- ymd(firstd) + months(0:2) + rep(years(0:(1987-1981)), each = 3)
|
|
|
|
|
|
#===================
|
|
|
# Get daily winds
|
|
|
#===================
|
|
|
wind <- Start(dataset = dataset,
|
|
|
var = "sfcWind",
|
|
|
sdate = format(sdates, "%Y%m"),
|
|
|
time = 'all',
|
|
|
longitude = indices(1:30),
|
|
|
latitude = indices(1:20),
|
|
|
return_vars = list(time = NULL, latitude = NULL, longitude = NULL),
|
|
|
num_procs = 4, retrieve = FALSE,
|
|
|
synonims = list(longitude = c('lon', 'longitude'),
|
|
|
latitude = c('lat', 'latitude')))
|
|
|
|
|
|
# Synthetic MJO for 'season = OND':
|
|
|
MJO <- data.frame(vdate = 1:(30 * 7 + 31 * 14),
|
|
|
phase = c(rep(1:8, 80), 1:4),
|
|
|
amplitude = 10 * rnorm(31 * 14 + 30 * 7))
|
|
|
|
|
|
stratify_atomic <- function(field, MJO, season = c("JFM", "OND"),
|
|
|
lag = 0, ampl = 2, relative = TRUE, signif = 0.05) {
|
|
|
# Arrange wind in form (days) to match MJO
|
|
|
nmonths <- dim(field)[3]
|
|
|
field <- aperm(field, c(1, 2, 4, 3))
|
|
|
dim(field) <- c(31 * nmonths)
|
|
|
if(season == "JFM") {
|
|
|
daysok <- rep(c(rep(TRUE, 31), rep(TRUE, 28),
|
|
|
rep(FALSE, 3), rep(TRUE, 31)), nmonths / 3)
|
|
|
} else if (season == "OND") {
|
|
|
daysok <- rep(c(rep(TRUE, 31), rep(TRUE, 30),
|
|
|
rep(FALSE, 1), rep(TRUE, 31)), nmonths / 3)
|
|
|
}
|
|
|
field <- field[daysok]
|
|
|
dim(field) <- c(days = length(field))
|
|
|
|
|
|
if(dim(field)[1] != dim(MJO)[1]) {
|
|
|
stop("MJO indices and wind data have different number of days")
|
|
|
}
|
|
|
|
|
|
idx <- function(MJO, phase, ampl, lag){
|
|
|
if(lag == 0) {
|
|
|
return(MJO$phase == phase & MJO$amplitude > ampl)
|
|
|
}
|
|
|
if(lag > 0) {
|
|
|
return(dplyr::lag(MJO$phase == phase & MJO$amplitude > ampl,
|
|
|
lag, default = FALSE))
|
|
|
}
|
|
|
if(lag < 0) {
|
|
|
return(dplyr::lead(MJO$phase == phase & MJO$amplitude > ampl,
|
|
|
- 1 * lag, default = FALSE))
|
|
|
}
|
|
|
}
|
|
|
strat <- plyr::laply(1:8, function(i) {
|
|
|
idx2 <- idx(MJO, i, ampl, lag)
|
|
|
if (relative) {
|
|
|
return(mean(field[idx2]) / mean(field) - 1)
|
|
|
} else {
|
|
|
return(mean(field[idx2]) - mean(field))
|
|
|
}})
|
|
|
strat.t.test <- plyr::laply(1:8, function(i) {
|
|
|
idx2 <- idx(MJO, i, ampl, lag)
|
|
|
return(t.test(field[idx2], field)$p.value)})
|
|
|
return(list(strat = strat, t_test = strat.t.test))
|
|
|
}
|
|
|
|
|
|
step <- Step(stratify_atomic,
|
|
|
target_dims = list(field = c('dataset', 'var', 'sdate', 'time')),
|
|
|
output_dims = list(strat = c('phase'), t_test = c('phase')))
|
|
|
workflow <- AddStep(wind, step, MJO = MJO, season = "OND", lag = "0", amp = 0)
|
|
|
|
|
|
res <- Compute(workflow$strat,
|
|
|
chunks = list(latitude = 2, longitude = 2),
|
|
|
threads_load = 2,
|
|
|
threads_compute = 4)
|
|
|
# -----------------------------------------------------------
|
|
|
#names(res)
|
|
|
#[1] "strat" "t_test"
|
|
|
#> dim(res$strat)
|
|
|
# phase longitude latitude
|
|
|
# 8 30 20
|
|
|
#> summary(res$strat)
|
|
|
# Min. 1st Qu. Median Mean 3rd Qu. Max.
|
|
|
#-0.133300 -0.032530 -0.001822 -0.005715 0.031700 0.094220
|
|
|
#> res$strat[1:5, 1:2, 1]
|
|
|
# [,1] [,2]
|
|
|
#[1,] -0.04661354 -0.04661539
|
|
|
#[2,] -0.01058483 -0.01053589
|
|
|
#[3,] 0.06123498 0.06125660
|
|
|
#[4,] 0.06371607 0.06374801
|
|
|
#[5,] 0.07085255 0.07078778
|
|
|
|
|
|
# -------------------------------------------------------------
|
|
|
|
|
|
# ---- To be tried for comparison of results:
|
|
|
res <- Compute(workflow$strat,
|
|
|
chunks = list(latitude = 2, longitude = 2),
|
|
|
threads_load = 2,
|
|
|
threads_compute = 2,
|
|
|
cluster = list(queue_host = 'cte-power', #user-specific
|
|
|
queue_type = 'slurm',
|
|
|
temp_dir = '/gpfs/scratch/bsc32/bsc32339/', #user-specific
|
|
|
r_module = 'R/3.5.0-foss-2018b',
|
|
|
cores_per_job = 2,
|
|
|
job_wallclock = '10:00:00',
|
|
|
max_jobs = 4,
|
|
|
bidirectional = FALSE,
|
|
|
polling_period = 10)
|
|
|
ecflow_suite_dir = '/esarchive/scratch/nperez/ecFlow', #user-specific
|
|
|
wait = FALSE)
|
|
|
```
|
|
|
</p>
|
|
|
</details>
|
|
|
|
|
|
|
|
|
## Computation of weekly means
|
|
|
<details><summary>CLICK ME</summary>
|
|
|
</p>
|
... | ... | |