# NOTE(review): the stray "Newer" / "Older" lines here look like diff-viewer or
# merge residue, not Python; commented out so the module can be imported — confirm and delete.
"""Diagnostic to compute regional averages"""
import iris
import iris.analysis
import iris.coords
import iris.cube
import iris.exceptions
import iris.util

import numpy as np

from bscearth.utils.log import Log

from earthdiagnostics.box import Box
from earthdiagnostics.constants import Basins
from earthdiagnostics.diagnostic import Diagnostic, DiagnosticOption, DiagnosticIntOption, DiagnosticDomainOption, \
    DiagnosticBoolOption, DiagnosticBasinOption, DiagnosticVariableOption
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.utils import Utils, TempFile
class RegionMean(Diagnostic):
    """
    Computes the mean value of the field (3D, weighted).

    For 3D fields, a horizontal mean for each level is also given. If a spatial window
    is specified, the mean value is computed only in this window.

    :original author: Javier Vegas-Regidor <javier.vegas@bsc.es>

    :param data_manager: data management object
    :type data_manager: DataManager
    :param startdate: startdate
    :type startdate: str
    :param member: member number
    :type member: int
    :param chunk: chunk's number
    :type chunk: int
    :param variable: variable to average
    :type variable: str
    :param box: box used to restrict the vertical mean
    :type box: Box
    """

    alias = 'regmean'
    "Diagnostic alias for the configuration file"
def __init__(self, data_manager, startdate, member, chunk, domain, variable, box, save3d, weights_file,
variance, basin):
Diagnostic.__init__(self, data_manager)
self.startdate = startdate
self.member = member
self.chunk = chunk
self.domain = domain
self.variable = variable
self.box = box
self.weights_file = weights_file
self.basin = basin
self.lat_name = 'lat'
self.lon_name = 'lon'
if type(self) is not type(other):
return False
return self.startdate == other.startdate and self.member == other.member and self.chunk == other.chunk and \
self.box == other.box and self.variable == other.variable
def __str__(self):
return 'Region mean Startdate: {0.startdate} Member: {0.member} Chunk: {0.chunk} Variable: {0.variable} ' \
'Box: {0.box} Save 3D: {0.save3d} Save variance: {0.variance}'.format(self)
def __hash__(self):
return hash(str(self))
@classmethod
def generate_jobs(cls, diags, options):
"""
Create a job for each chunk to compute the diagnostic
:param diags: Diagnostics manager class
:type diags: Diags
:param options: variable, minimum depth (level), maximum depth (level)
:type options: list[str]
:return:
"""
DiagnosticVariableOption(diags.data_manager.config.var_manager),
DiagnosticBasinOption('basin', Basins().Global),
DiagnosticIntOption('min_depth', -1),
DiagnosticIntOption('max_depth', -1),
DiagnosticBoolOption('variance', False),
DiagnosticOption('grid', ''))
options = cls.process_options(options, options_available)
box = Box()
box.min_depth = options['min_depth']
box.max_depth = options['max_depth']
weights_file = TempFile.get()
weight_diagnostics = ComputeWeights(diags.data_manager, options['grid_point'], options['basin'], box,
weights_file)
job_list = list()
for startdate, member, chunk in diags.config.experiment.get_chunk_list():
job = RegionMean(diags.data_manager, startdate, member, chunk,
options['domain'], options['variable'], box,
options['save3D'], weights_file, options['variance'], options['basin'])
job.add_subjob(weight_diagnostics)
job_list.append(job)
self.variable_file = self.request_chunk(self.domain, self.variable, self.startdate, self.member, self.chunk)
if self.box.min_depth == 0:
# To cdftools, this means all levels
box_save = None
else:
box_save = self.box
self._declare_var('mean', False, box_save)
self._declare_var('mean', True, box_save)
self._declare_var('var', False, box_save)
self._declare_var('var', True, box_save)
iris.FUTURE.netcdf_promote = True
iris.FUTURE.netcdf_no_unlimited = True
handler = Utils.open_cdf(self.variable_file.local_file)
var = handler.variables[self.variable]
coordinates = ''
has_levels = False
for dimension in handler.variables.keys():
if dimension in ['time', 'lev', 'lat', 'latitude', 'lon', 'longitude', 'i', 'j']:
coordinates += ' {0}'.format(dimension)
if dimension == 'lev':
has_levels = True
var.coordinates = coordinates
handler.close()
def add_i_j(cube, field, filename):
if cube.var_name != self.variable:
return
if not cube.coords('i'):
index = field.dimensions.index('i')
i = np.arange(1, field.shape[index]+1)
i_coord = iris.coords.DimCoord(i, var_name='i')
cube.add_dim_coord(i_coord, index)
if not cube.coords('j'):
index = field.dimensions.index('j')
i = np.arange(1, field.shape[index] + 1)
i_coord = iris.coords.DimCoord(i, var_name='j')
cube.add_dim_coord(i_coord, index)
if not cube.coords('lev'):
index = field.dimensions.index('lev')
i = np.arange(1, field.shape[index] + 1)
lev = iris.coords.AuxCoord(i, var_name='lev')
cube.add_aux_coord(lev, index)
data = iris.load_cube(self.variable_file.local_file,
iris.AttributeConstraint(short_name=self.variable),
callback=add_i_j)
weights = iris.load_cube(self.weights_file, 'weights').data
i_indexes = iris.load_cube(self.weights_file, 'i_indexes').data
j_indexes = iris.load_cube(self.weights_file, 'j_indexes').data
lev_limits = iris.load_cube(self.weights_file, 'lev_limits').data
def selected_i(cell):
return cell.point - 1 in i_indexes
def selected_j(cell):
return cell.point - 1 in j_indexes
def selected_level(cell):
return lev_limits[0] <= cell.point <= lev_limits[1]
data = data.extract(iris.Constraint(i=selected_i, j=selected_j, lev=selected_level))
if has_levels:
mean = iris.cube.CubeList()
mean3d = iris.cube.CubeList()
var = iris.cube.CubeList()
var3d = iris.cube.CubeList()
for time_slice in data.slices_over('time'):
mean.append(time_slice.collapsed(['latitude', 'longitude', 'depth'],
iris.analysis.MEAN, weights=weights))
if self.save3d:
mean3d.append(time_slice.collapsed(['latitude', 'longitude'],
iris.analysis.MEAN, weights=weights))
if self.variance:
var.append(time_slice.collapsed(['latitude', 'longitude', 'depth'],
iris.analysis.VARIANCE, weights=weights))
if self.save3d:
var3d.append(time_slice.collapsed(['latitude', 'longitude'],
iris.analysis.VARIANCE, weights=weights))
self._send_var('mean', True, mean3d)
self._send_var('mean', False, mean)
self._send_var('var', True, var3d)
self._send_var('var', False, var)
else:
mean = iris.cube.CubeList()
var = iris.cube.CubeList()
for time_slice in data.slices_over('time'):
mean.append(time_slice.collapsed(['latitude', 'longitude'], iris.analysis.MEAN, weights=weights))
var.append(time_slice.collapsed(['latitude', 'longitude'], iris.analysis.VARIANCE, weights=weights))
self._send_var('mean', False, mean.merge_cube())
if self.variance:
self._send_var('var', False, var.merge_cube())
def _declare_var(self, var, threed, box_save):
if threed:
if not self.save3d:
return False
final_name = '{1}3d{0}iris'.format(var, self.variable)
final_name = '{1}{0}iris'.format(var, self.variable)
self.declared[final_name] = self.declare_chunk(ModelingRealms.ocean, final_name, self.startdate, self.member,
self.chunk, box=box_save, region=self.basin)
def _send_var(self, var, threed, cube_list):
if not self.save3d and threed:
return False
final_name = '{1}3d{0}iris'.format(var, self.variable)
else:
final_name = '{1}{0}iris'.format(var, self.variable)
cube = cube_list.merge_cube()
cube.remove_coord('latitude')
cube.remove_coord('longitude')
cube.remove_coord('depth')
temp = TempFile.get()
iris.save(cube, temp)
self.declared[final_name].set_local_file(temp, diagnostic=self, rename_var='result', region=self.basin)
class ComputeWeights(Diagnostic):
    """
    Diagnostic used to compute regional mean and sum weights.

    The weights (cell area/volume masked by the basin) are stored in a
    temporary netCDF file that the RegionMean jobs read back.

    Parameters
    ----------
    data_manager: DataManager
    weights_file: str
    """

    alias = 'computeinterpcdoweights'
    "Diagnostic alias for the configuration file"
    @classmethod
    def generate_jobs(cls, diags, options):
        """Do nothing: weight jobs are instantiated directly by RegionMean.generate_jobs."""
        pass
def __init__(self, data_manager, grid_point, basin, box, weights_file):
Diagnostic.__init__(self, data_manager)
self.weights_file = weights_file
self.basin = basin
self.grid_point = grid_point.lower()
def __eq__(self, other):
if type(self) is not type(other):
return False
return self.weights_file == other.weights_file and self.basin == other.basin and \
self.grid_point == other.grid_point and self.box != other.box
return 'Computing weights for region averaging: Point {0.grid_point} Basin: {0.basin} Box: {0.box}'\
.format(self)
def __hash__(self):
return hash(str(self))
    def compute(self):
        """Compute weights"""
        iris.FUTURE.netcdf_promote = True
        iris.FUTURE.netcdf_no_unlimited = True

        # Basin mask: nonzero cells belong to the region of interest.
        mask = np.squeeze(Utils.get_mask(self.basin))
        # Bounding box of the basin: columns (i) / rows (j) that contain at
        # least one unmasked cell.
        i_indexes = np.where(np.any(mask != 0, 0))[0]
        j_indexes = np.where(np.any(mask != 0, 1))[0]
        mask_small = np.take(np.take(mask, i_indexes, 1), j_indexes, 0)

        # Scale factors e1/e2 (horizontal) and e3 (vertical) for this grid point.
        e1 = self.try_load_cube(1)
        e2 = self.try_load_cube(2)
        e3 = self.try_load_cube(3)
        depth = iris.util.squeeze(iris.load_cube('mesh_hgr.nc', 'gdept_0'))

        # Translate the requested depth range into model level indices;
        # -1 means unbounded at that end.
        if self.box.min_depth == -1:
            min_level = 0
        else:
            distance = abs((depth - self.box.min_depth).data)
            min_level = np.argmin(distance)

        if self.box.max_depth == -1:
            max_level = depth.shape[0]
        else:
            distance = abs((depth - self.box.max_depth).data)
            max_level = np.argmin(distance)

        def selected_i(cell):
            # NOTE(review): assumes the mesh i/j coords are 1-based while the
            # indexes above are 0-based — confirm against mesh_hgr.nc.
            return cell.point - 1 in i_indexes

        def selected_j(cell):
            return cell.point - 1 in j_indexes

        def selected_level(cell):
            return min_level <= cell.point <= max_level

        # Restrict the scale factors to the basin bounding box (and levels for e3).
        e1_small = e1.extract(iris.Constraint(i=selected_i, j=selected_j))
        e2_small = e2.extract(iris.Constraint(i=selected_i, j=selected_j))
        e3_small = e3.extract(iris.Constraint(i=selected_i, j=selected_j, lev=selected_level))

        # Horizontal area weighted by the basin mask; coords are stripped so
        # the cube arithmetic below broadcasts without coordinate mismatches.
        mask_small = e1_small * e2_small * mask_small
        for coord in e3_small.coords():
            e3_small.remove_coord(coord)
        for coord in mask_small.coords():
            mask_small.remove_coord(coord)
        # Final weights: cell volume (area * thickness) inside the basin.
        weights = e3_small * mask_small
        weights.var_name = 'weights'
        i_indexes = iris.cube.Cube(i_indexes, var_name='i_indexes')
        j_indexes = iris.cube.Cube(j_indexes, var_name='j_indexes')
        lev_limits = iris.cube.Cube([min_level, max_level], var_name='lev_limits')
        iris.save((weights, i_indexes, j_indexes, lev_limits), self.weights_file)
def try_load_cube(self, number):
try:
cube = iris.load_cube('mesh_hgr.nc', 'e{0}{1}'.format(number, self.grid_point))
except iris.exceptions.ConstraintMismatchError:
cube = iris.load_cube('mesh_hgr.nc', 'e{0}{1}_0'.format(number, self.grid_point))
return iris.util.squeeze(cube)
    def request_data(self):
        """Request data required by the diagnostic"""
        # Nothing to request: compute() reads mesh_hgr.nc and the basin mask
        # directly from disk rather than through the data manager.
    def declare_data_generated(self):
        """Declare data to be generated by the diagnostic"""
        # Nothing to declare: the weights go to a temporary file consumed by
        # the RegionMean jobs, not to a managed output chunk.
        pass