Newer
Older
"""Diagnostic to compute regional averages"""
import iris.coords
import iris.exceptions
from earthdiagnostics.constants import Basins
from earthdiagnostics.diagnostic import Diagnostic, DiagnosticOption, \
DiagnosticIntOption, DiagnosticDomainOption, \
DiagnosticBoolOption, DiagnosticBasinListOption, \
DiagnosticVariableListOption, DiagnosticFrequencyOption
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.utils import Utils, TempFile
import diagonals.regmean as regmean
from diagonals.mesh_helpers.nemo import Nemo
Computes the mean value of the field (3D, weighted).
For 3D fields, a horizontal mean for each level is also given.
If a spatial window is specified, the mean value is computed
only in this window.
:original author: Javier Vegas-Regidor <javier.vegas@bsc.es>
:param data_manager: data management object
:type data_manager: DataManager
:param startdate: startdate
:type startdate: str
:param member: member number
:type member: int
:param chunk: chunk's number
:type chunk: int
:param variable: variable to average
:type variable: str
:param box: box used to restrict the vertical mean
:type box: Box
"""
alias = 'regmean'
"Diagnostic alias for the configuration file"
def __init__(self, data_manager, startdate, member, chunk, domain,
variable, box, save3d, variance, basins, grid_point,
frequency):
Diagnostic.__init__(self, data_manager)
self.startdate = startdate
self.member = member
self.chunk = chunk
self.domain = domain
self.variable = variable
self.box = box
self.lat_name = 'lat'
self.lon_name = 'lon'
return self.startdate == other.startdate and \
self.member == other.member and self.chunk == other.chunk and \
self.box == other.box and self.variable == other.variable
def __str__(self):
return ('Region mean Startdate: {0.startdate} Member: {0.member} '
'Chunk: {0.chunk} Variable: {0.variable} '
'Box: {0.box} Save 3D: {0.save3d} Save variance: {0.variance} '
'Grid point: {0.grid_point}'.format(self))
def __hash__(self):
return hash(str(self))
@classmethod
def generate_jobs(cls, diags, options):
"""
Create a job for each chunk to compute the diagnostic
:param diags: Diagnostics manager class
:type diags: Diags
:param options: variable, minimum depth (level), maximum depth (level)
:type options: list[str]
:return:
"""
options_available = (
DiagnosticDomainOption(),
DiagnosticVariableListOption(
diags.data_manager.config.var_manager, 'variable'),
DiagnosticBasinListOption('basins', 'global'),
DiagnosticOption('grid_point', 'T'),
DiagnosticIntOption('min_depth', -1),
DiagnosticIntOption('max_depth', -1),
DiagnosticBoolOption('save3D', True),
DiagnosticBoolOption('variance', False),
DiagnosticOption('grid', ''),
DiagnosticFrequencyOption('frequency', diags.config.frequency),
)
options = cls.process_options(options, options_available)
box = Box()
box.min_depth = options['min_depth']
box.max_depth = options['max_depth']
basins = options['basins']
if not basins:
Log.error('Basins not recognized')
return()
for var in options['variable']:
for startdate, member, chunk in diags.config.experiment.get_chunk_list():
job = RegionMean(
diags.data_manager, startdate, member, chunk,
options['domain'], var, box,
options['save3D'], options['variance'],
options['basins'],
options['grid_point'].lower(),
options['frequency'],
)
job_list.append(job)
self.variable_file = self.request_chunk(self.domain, self.variable,
self.startdate, self.member,
self.chunk)
if self.box.min_depth == -1 and self.box.max_depth == -1:
box_save = None
else:
box_save = self.box
self._declare_var('mean', False, box_save)
self._declare_var('mean', True, box_save)
has_levels = self._fix_file_metadata()
data = self._load_data()
masks = {}
self.basins.sort()
for basin in self.basins:
masks[basin] = Utils.get_mask(basin)
mesh = Nemo('mesh_hgr.nc', 'mask_regions.nc')
self._mean_2d_var(data, mesh, masks)
def _mean_2d_var(self, data, mesh, masks):
areacello = mesh.get_areacello(cell_point=self.grid_point)
mean = regmean.compute_regmean_2D(data.data, masks, areacello)
self._save_result_2D('mean', mean, data)
def _meand_3d_variable(self, data, mesh, masks):
areacello = mesh.get_areacello(cell_point=self.grid_point)
e3.coord('depth').bounds = data.coord('depth').bounds
if self.box.min_depth is not -1 and self.box.max_depth is not -1:
depth_constraint = iris.Constraint(
depth=lambda c: self.box.min_depth <= c <= self.box.max_depth
)
e3 = e3.extract(depth_constraint)
data = data.extract(depth_constraint)
volcello = areacello * e3.data.astype(np.float32)
mean = regmean.compute_regmean_3D(data.data, masks, volcello)
self._save_result_2D('mean', mean, data)
if self.save3d:
mean3d = regmean.compute_regmean_levels(data.data, masks, volcello)
cube = iris.load_cube('mesh_hgr.nc', f'e{number}{self.grid_point}')
cube = iris.load_cube(
'mesh_hgr.nc', f'e{number}{self.grid_point}_0')
cube = iris.util.squeeze(cube)
dims = len(cube.shape)
try:
cube.coord('i')
except iris.exceptions.CoordinateNotFoundError:
cube.add_dim_coord(iris.coords.DimCoord(np.arange(cube.shape[dims - 1]),
var_name='i'), dims - 1)
try:
cube.coord('j')
except iris.exceptions.CoordinateNotFoundError:
cube.add_dim_coord(iris.coords.DimCoord(np.arange(cube.shape[dims - 2]),
var_name='j'), dims - 2)
coords = []
handler = Utils.open_cdf(self.variable_file.local_file)
for variable in handler.variables:
if variable in ('time', 'lev', 'lat', 'lon', 'latitude',
'longitude', 'leadtime', 'time_centered'):
coords.append(variable)
if variable == 'time_centered':
handler.variables[variable].standard_name = ''
handler.variables[self.variable].coordinates = ' '.join(coords)
handler.close()
data = iris.load_cube(self.variable_file.local_file)
return self._rename_depth(data)
def _rename_depth(self, data):
for coord_name in ('model_level_number', 'Vertical T levels', 'lev'):
if data.coords(coord_name):
coord = data.coord(coord_name)
coord.standard_name = 'depth'
coord.long_name = 'depth'
break
def _fix_file_metadata(self):
handler = Utils.open_cdf(self.variable_file.local_file)
var = handler.variables[self.variable]
coordinates = ''
has_levels = False
for dimension in handler.variables.keys():
if dimension in ['time', 'lev', 'lat', 'latitude',
'lon', 'longitude', 'i', 'j']:
coordinates += ' {0}'.format(dimension)
if dimension == 'lev':
has_levels = True
var.coordinates = coordinates
handler.close()
return has_levels
def _declare_var(self, var, threed, box_save):
if threed:
if not self.save3d:
return False
final_name = '{1}3d{0}'.format(var, self.variable)
final_name = '{1}{0}'.format(var, self.variable)
self.declared[final_name] = self.declare_chunk(ModelingRealms.ocean,
final_name,
self.startdate,
self.member,
self.chunk,
box=box_save,
region=self.basins)
def _save_result_2D(self, var, result, data):
final_name = '{1}{0}'.format(var, self.variable)
temp = TempFile.get()
handler_source = Utils.open_cdf(self.variable_file.local_file)
handler_temp = Utils.open_cdf(temp, 'w')
Utils.copy_variable(handler_source, handler_temp, 'time', True, True)
handler_temp.createDimension('region', len(result))
handler_temp.createDimension('region_length', 50)
var_region = handler_temp.createVariable('region', 'S1',
('region', 'region_length'))
var = handler_temp.createVariable('{1}{0}'.format(var, self.variable),
float, ('time', 'region',),)
var.units = '{0}'.format(data.units)
for i, basin in enumerate(result):
var_region[i, ...] = netCDF4.stringtoarr(str(basin), 50)
var[..., i] = result[basin]
handler_temp.close()
self.declared[final_name].set_local_file(temp, diagnostic=self)
def _save_result_3D(self, var, result, data):
temp = TempFile.get()
handler_source = Utils.open_cdf(self.variable_file.local_file)
handler_temp = Utils.open_cdf(temp, 'w')
Utils.copy_variable(handler_source, handler_temp, 'time', True, True)
handler_temp.createDimension('region', len(result))
handler_temp.createDimension('region_length', 50)
handler_temp.createDimension('lev', data.shape[1])
var_level = handler_temp.createVariable('lev', float, 'lev')
var_level[...] = data.coord('depth').points
var_level.units = 'm'
var_level.axis = 'Z'
var_level.positive = 'down'
var_level.long_name = 'ocean depth coordinate'
var_level.standard_name = 'depth'
var_region = handler_temp.createVariable('region', 'S1',
('region', 'region_length'))
var = handler_temp.createVariable('{1}3d{0}'.format(var, self.variable),
var.units = '{0}'.format(data.units)
for i, basin in enumerate(result):
var_region[i, ...] = netCDF4.stringtoarr(str(basin), 50)
var[..., i] = result[basin]
handler_temp.close()
self.declared[final_name].set_local_file(temp, diagnostic=self)