Newer
Older
"""Diagnostic to compute regional averages"""
import iris.coords
import iris.exceptions
from earthdiagnostics.diagnostic import (
Diagnostic,
DiagnosticOption,
DiagnosticIntOption,
DiagnosticDomainOption,
DiagnosticBoolOption,
DiagnosticBasinListOption,
DiagnosticVariableListOption,
DiagnosticFrequencyOption,
)
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.utils import Utils, TempFile
import diagonals.regmean as regmean
from diagonals.mesh_helpers.nemo import Nemo
Computes the mean value of the field (3D, weighted).
For 3D fields, a horizontal mean for each level is also given.
If a spatial window is specified, the mean value is computed
only in this window.
:original author: Javier Vegas-Regidor <javier.vegas@bsc.es>
:param data_manager: data management object
:type data_manager: DataManager
:param startdate: startdate
:type startdate: str
:param member: member number
:type member: int
:param chunk: chunk's number
:type chunk: int
:param variable: variable to average
:type variable: str
:param box: box used to restrict the vertical mean
:type box: Box
"""
"Diagnostic alias for the configuration file"
def __init__(
self,
data_manager,
startdate,
member,
chunk,
domain,
variable,
box,
save3d,
basins,
grid_point,
frequency,
):
Diagnostic.__init__(self, data_manager)
self.startdate = startdate
self.member = member
self.chunk = chunk
self.domain = domain
self.variable = variable
self.box = box
return (
self.startdate == other.startdate
and self.member == other.member
and self.chunk == other.chunk
and self.box == other.box
and self.variable == other.variable
)
return (
"Region mean Startdate: {0.startdate} Member: {0.member} "
"Chunk: {0.chunk} Variable: {0.variable} "
def __hash__(self):
return hash(str(self))
@classmethod
def generate_jobs(cls, diags, options):
"""
Create a job for each chunk to compute the diagnostic
:param diags: Diagnostics manager class
:type diags: Diags
:param options: variable, minimum depth (level), maximum depth (level)
:type options: list[str]
:return:
"""
options_available = (
DiagnosticDomainOption(),
DiagnosticVariableListOption(
diags.data_manager.config.var_manager, "variable"
),
DiagnosticBasinListOption("basins", "global"),
DiagnosticOption("grid_point", "T"),
DiagnosticIntOption("min_depth", -1),
DiagnosticIntOption("max_depth", -1),
DiagnosticBoolOption("save3D", True),
DiagnosticOption("grid", ""),
DiagnosticFrequencyOption("frequency", diags.config.frequency),
options = cls.process_options(options, options_available)
box = Box()
box.min_depth = options["min_depth"]
box.max_depth = options["max_depth"]
for var in options["variable"]:
for (
startdate,
member,
chunk,
) in diags.config.experiment.get_chunk_list():
diags.data_manager,
startdate,
member,
chunk,
options["domain"],
var,
box,
options["save3D"],
options["basins"],
options["grid_point"].lower(),
options["frequency"],
self.variable_file = self.request_chunk(
self.domain, self.variable, self.startdate, self.member, self.chunk
)
if self.box.min_depth == -1 and self.box.max_depth == -1:
box_save = None
else:
box_save = self.box
self._declare_var("mean", False, box_save)
self._declare_var("mean", True, box_save)
has_levels = self._fix_file_metadata()
data = self._load_data()
masks = {}
self.basins.sort()
for basin in self.basins:
masks[basin] = Utils.get_mask(basin)
self._mean_2d_var(data, mesh, masks)
def _mean_2d_var(self, data, mesh, masks):
areacello = mesh.get_areacello(cell_point=self.grid_point)
mean = regmean.compute_regmean_2d(data.data, masks, areacello)
areacello = mesh.get_areacello(cell_point=self.grid_point)
e3.coord("depth").bounds = data.coord("depth").bounds
if self.box.min_depth is not -1 and self.box.max_depth is not -1:
depth_constraint = iris.Constraint(
depth=lambda c: self.box.min_depth <= c <= self.box.max_depth
)
e3 = e3.extract(depth_constraint)
data = data.extract(depth_constraint)
volcello = areacello * e3.data.astype(np.float32)
mean = regmean.compute_regmean_3d(data.data, masks, volcello)
if self.save3d:
mean3d = regmean.compute_regmean_levels(data.data, masks, volcello)
cube = iris.load_cube("mesh_hgr.nc", f"e{number}{self.grid_point}")
"mesh_hgr.nc", f"e{number}{self.grid_point}_0"
)
cube = iris.util.squeeze(cube)
dims = len(cube.shape)
try:
cube.add_dim_coord(
iris.coords.DimCoord(
np.arange(cube.shape[dims - 1]), var_name="i"
),
dims - 1,
)
cube.add_dim_coord(
iris.coords.DimCoord(
np.arange(cube.shape[dims - 2]), var_name="j"
),
dims - 2,
)
# handler = Utils.open_cdf(self.variable_file.local_file)
# coords = set(handler.variables[self.variable].coordinates.split(' '))
# for variable in handler.variables:
# if variable in (
# "time",
# "lev",
# "lat",
# "lon",
# "latitude",
# "longitude",
# "leadtime",
# "time_centered",
# ):
# coords.add(variable)
# if variable == "time_centered":
# handler.variables[variable].standard_name = ""
# handler.variables[self.variable].coordinates = " ".join(coords)
# handler.close()
data = iris.load_cube(self.variable_file.local_file)
return self._rename_depth(data)
def _rename_depth(self, data):
for coord_name in ("model_level_number", "Vertical T levels", "lev"):
if data.coords(coord_name):
coord = data.coord(coord_name)
coord.standard_name = "depth"
coord.long_name = "depth"
def _fix_file_metadata(self):
handler = Utils.open_cdf(self.variable_file.local_file)
has_levels = False
for dimension in handler.variables.keys():
has_levels = True
handler.close()
return has_levels
def _declare_var(self, var, threed, box_save):
if threed:
if not self.save3d:
return False
final_name = "{1}3d{0}".format(var, self.variable)
final_name = "{1}{0}".format(var, self.variable)
self.declared[final_name] = self.declare_chunk(
ModelingRealms.ocean,
final_name,
self.startdate,
self.member,
self.chunk,
box=box_save,
region=self.basins,
)
def _save_result_2d(self, var, result, data):
final_name = "{1}{0}".format(var, self.variable)
temp = TempFile.get()
handler_source = Utils.open_cdf(self.variable_file.local_file)
handler_temp = Utils.open_cdf(temp, "w")
Utils.copy_variable(handler_source, handler_temp, "time", True, True)
handler_temp.createDimension("region", len(result))
handler_temp.createDimension("region_length", 50)
var_region = handler_temp.createVariable(
"region", "S1", ("region", "region_length")
)
var = handler_temp.createVariable(
"{1}{0}".format(var, self.variable), float, ("time", "region",),
)
var.units = "{0}".format(data.units)
for i, basin in enumerate(result):
var_region[i, ...] = netCDF4.stringtoarr(str(basin), 50)
var[..., i] = result[basin]
handler_temp.close()
self.declared[final_name].set_local_file(temp, diagnostic=self)
def _save_result_3d(self, var, result, data):
final_name = "{1}3d{0}".format(var, self.variable)
temp = TempFile.get()
handler_source = Utils.open_cdf(self.variable_file.local_file)
handler_temp = Utils.open_cdf(temp, "w")
Utils.copy_variable(handler_source, handler_temp, "time", True, True)
handler_temp.createDimension("region", len(result))
handler_temp.createDimension("region_length", 50)
handler_temp.createDimension("lev", data.shape[1])
var_level = handler_temp.createVariable("lev", float, "lev")
var_level[...] = data.coord("depth").points
var_level.units = "m"
var_level.axis = "Z"
var_level.positive = "down"
var_level.long_name = "ocean depth coordinate"
var_level.standard_name = "depth"
var_region = handler_temp.createVariable(
"region", "S1", ("region", "region_length")
)
var = handler_temp.createVariable(
"{1}3d{0}".format(var, self.variable),
float,
("time", "lev", "region",),
)
var.units = "{0}".format(data.units)