# NOTE(review): stray "Newer"/"Older" lines here were version-control viewer
# artifacts, not source code; replaced with this comment.
"""Diagnostic to compute regional averages"""
import iris
import iris.analysis
import iris.coords
import iris.cube
import iris.exceptions
import iris.util

import numpy as np

from earthdiagnostics.box import Box
from earthdiagnostics.constants import Basins
from earthdiagnostics.diagnostic import Diagnostic, DiagnosticOption, DiagnosticIntOption, DiagnosticDomainOption, \
    DiagnosticBoolOption, DiagnosticBasinOption, DiagnosticVariableOption
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.utils import Utils, TempFile
class RegionMean(Diagnostic):
"""
Computes the mean value of the field (3D, weighted).
For 3D fields, a horizontal mean for each level is also given. If a spatial window
is specified, the mean value is computed only in this window.
:original author: Javier Vegas-Regidor <javier.vegas@bsc.es>
:param data_manager: data management object
:type data_manager: DataManager
:param startdate: startdate
:type startdate: str
:param member: member number
:type member: int
:param chunk: chunk's number
:type chunk: int
:param variable: variable to average
:type variable: str
:param box: box used to restrict the vertical mean
:type box: Box
"""
alias = 'regmean'
"Diagnostic alias for the configuration file"
def __init__(self, data_manager, startdate, member, chunk, domain, variable, box, save3d,
variance, basin, grid_point):
Diagnostic.__init__(self, data_manager)
self.startdate = startdate
self.member = member
self.chunk = chunk
self.domain = domain
self.variable = variable
self.box = box
self.lat_name = 'lat'
self.lon_name = 'lon'
return self.startdate == other.startdate and self.member == other.member and self.chunk == other.chunk and \
self.box == other.box and self.variable == other.variable
def __str__(self):
return 'Region mean Startdate: {0.startdate} Member: {0.member} Chunk: {0.chunk} Variable: {0.variable} ' \
'Box: {0.box} Save 3D: {0.save3d} Save variance: {0.variance} Grid point: {0.grid_point}'.format(self)
def __hash__(self):
return hash(str(self))
@classmethod
def generate_jobs(cls, diags, options):
"""
Create a job for each chunk to compute the diagnostic
:param diags: Diagnostics manager class
:type diags: Diags
:param options: variable, minimum depth (level), maximum depth (level)
:type options: list[str]
:return:
"""
DiagnosticVariableOption(diags.data_manager.config.var_manager),
DiagnosticBasinOption('basin', Basins().Global),
DiagnosticIntOption('min_depth', -1),
DiagnosticIntOption('max_depth', -1),
DiagnosticBoolOption('variance', False),
DiagnosticOption('grid', ''))
options = cls.process_options(options, options_available)
box = Box()
box.min_depth = options['min_depth']
box.max_depth = options['max_depth']
job_list = list()
for startdate, member, chunk in diags.config.experiment.get_chunk_list():
job = RegionMean(diags.data_manager, startdate, member, chunk,
options['domain'], options['variable'], box,
options['save3D'], options['variance'], options['basin'],
options['grid_point'].lower())
job_list.append(job)
self.variable_file = self.request_chunk(self.domain, self.variable, self.startdate, self.member, self.chunk)
if self.box.min_depth == -1 and self.box.max_depth == -1:
box_save = None
else:
box_save = self.box
self._declare_var('mean', False, box_save)
self._declare_var('mean', True, box_save)
self._declare_var('var', False, box_save)
self._declare_var('var', True, box_save)
iris.FUTURE.netcdf_promote = True
iris.FUTURE.netcdf_no_unlimited = True
has_levels = self._fix_file_metadata()
data = self._load_data()
if has_levels:
mean = iris.cube.CubeList()
var = iris.cube.CubeList()
mask = np.squeeze(Utils.get_mask(self.basin, True))
if len(mask.shape) == 3:
mask = mask[0, ...]
e1 = self._try_load_cube(1)
e2 = self._try_load_cube(2)
weights = e1 * e2 * mask
for time_slice in data.slices_over('time'):
mean.append(time_slice.collapsed(['latitude', 'longitude'], iris.analysis.MEAN, weights=weights.data))
if self.variance:
var.append(time_slice.collapsed(['latitude', 'longitude'], iris.analysis.VARIANCE))
self._send_var('mean', False, mean)
mean = iris.cube.CubeList()
mean3d = iris.cube.CubeList()
var = iris.cube.CubeList()
var3d = iris.cube.CubeList()
mask = np.squeeze(Utils.get_mask(self.basin, True))
e1 = self._try_load_cube(1)
e2 = self._try_load_cube(2)
e3 = self._try_load_cube(3)
weights = e1 * e2 * e3 * mask
weights = weights.extract(depth=lambda c: self.box.min_depth <= c <= self.box.max_depth).data
data = data.extract(depth=lambda c: self.box.min_depth <= c <= self.box.max_depth)
for time_slice in data.slices_over('time'):
mean.append(time_slice.collapsed(['latitude', 'longitude', 'depth'],
if self.save3d:
mean3d.append(time_slice.collapsed(['latitude', 'longitude'],
iris.analysis.MEAN, weights=weights))
if self.variance:
var.append(time_slice.collapsed(['latitude', 'longitude', 'depth'],
if self.save3d:
var3d.append(time_slice.collapsed(['latitude', 'longitude'],
self._send_var('mean', True, mean3d)
self._send_var('mean', False, mean)
if self.variance:
self._send_var('var', True, var3d)
self._send_var('var', False, var)
def _try_load_cube(self, number):
try:
cube = iris.load_cube('mesh_hgr.nc', 'e{0}{1}'.format(number, self.grid_point))
except iris.exceptions.ConstraintMismatchError:
cube = iris.load_cube('mesh_hgr.nc', 'e{0}{1}_0'.format(number, self.grid_point))
cube = iris.util.squeeze(cube)
dims = len(cube.shape)
try:
cube.coord('i')
except iris.exceptions.CoordinateNotFoundError:
cube.add_dim_coord(iris.coords.DimCoord(np.arange(cube.shape[dims - 1]), var_name='i'), dims - 1)
try:
cube.coord('j')
except iris.exceptions.CoordinateNotFoundError:
cube.add_dim_coord(iris.coords.DimCoord(np.arange(cube.shape[dims - 2]), var_name='j'), dims -2)
return cube
coords = []
handler = Utils.open_cdf(self.variable_file.local_file)
for variable in handler.variables:
if variable in ('time', 'lev', 'lat', 'lon', 'latitude', 'longitude', 'leadtime', 'time_centered'):
coords.append(variable)
if variable == 'time_centered':
handler.variables[variable].standard_name = ''
handler.variables[self.variable].coordinates = ' '.join(coords)
handler.close()
data = iris.load_cube(self.variable_file.local_file)
if data.coords('model_level_number'):
coord = data.coord('model_level_number')
coord.standard_name = 'depth'
coord.long_name = 'depth'
elif data.coords('Vertical T levels'):
coord = data.coord('Vertical T levels')
coord.standard_name = 'depth'
coord.long_name = 'depth'
def _fix_file_metadata(self):
handler = Utils.open_cdf(self.variable_file.local_file)
var = handler.variables[self.variable]
coordinates = ''
has_levels = False
for dimension in handler.variables.keys():
if dimension in ['time', 'lev', 'lat', 'latitude', 'lon', 'longitude', 'i', 'j']:
coordinates += ' {0}'.format(dimension)
if dimension == 'lev':
has_levels = True
var.coordinates = coordinates
handler.close()
return has_levels
def _declare_var(self, var, threed, box_save):
if threed:
if not self.save3d:
return False
final_name = '{1}3d{0}'.format(var, self.variable)
final_name = '{1}{0}'.format(var, self.variable)
self.declared[final_name] = self.declare_chunk(ModelingRealms.ocean, final_name, self.startdate, self.member,
self.chunk, box=box_save, region=self.basin)
def _send_var(self, var, threed, cube_list):
if not self.save3d and threed:
return False
final_name = '{1}3d{0}'.format(var, self.variable)
final_name = '{1}{0}'.format(var, self.variable)
cube = cube_list.merge_cube()
cube.remove_coord('latitude')
cube.remove_coord('longitude')
try:
cube.remove_coord('depth')
except iris.exceptions.CoordinateNotFoundError:
pass
try:
cube.remove_coord('lev')
except iris.exceptions.CoordinateNotFoundError:
pass
temp = TempFile.get()
iris.save(cube, temp)
self.declared[final_name].set_local_file(temp, diagnostic=self, rename_var='result', region=self.basin)