"""Diagnostic to compute regional averages"""
import iris
import iris.analysis
import iris.coords
import iris.cube
import iris.exceptions
import iris.util

import numpy as np

from earthdiagnostics.box import Box
from earthdiagnostics.constants import Basins
from earthdiagnostics.diagnostic import Diagnostic, DiagnosticOption, DiagnosticIntOption, DiagnosticDomainOption, \
    DiagnosticBoolOption, DiagnosticBasinOption, DiagnosticVariableOption
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.utils import Utils, TempFile
class RegionMean(Diagnostic):
    """
    Computes the mean value of the field (3D, weighted).

    For 3D fields, a horizontal mean for each level is also given. If a spatial window
    is specified, the mean value is computed only in this window.

    :original author: Javier Vegas-Regidor <javier.vegas@bsc.es>

    :param data_manager: data management object
    :type data_manager: DataManager
    :param startdate: startdate
    :type startdate: str
    :param member: member number
    :type member: int
    :param chunk: chunk's number
    :type chunk: int
    :param variable: variable to average
    :type variable: str
    :param box: box used to restrict the vertical mean
    :type box: Box
    """

    alias = 'regmean'
    "Diagnostic alias for the configuration file"

    def __init__(self, data_manager, startdate, member, chunk, domain, variable, box, save3d, weights_file,
                 variance, basin):
        Diagnostic.__init__(self, data_manager)
        self.startdate = startdate
        self.member = member
        self.chunk = chunk
        self.domain = domain
        self.variable = variable
        self.box = box
        # NOTE(review): save3d, variance and declared were missing from the pasted
        # __init__ although __str__, _declare_var, _send_var and the compute paths
        # all read them -- restored here.
        self.save3d = save3d
        self.variance = variance
        self.weights_file = weights_file
        self.basin = basin
        self.declared = {}
        self.lat_name = 'lat'
        self.lon_name = 'lon'

    def __eq__(self, other):
        """Compare jobs on their identity fields: dates, member, chunk, box and variable."""
        if type(self) is not type(other):
            return False
        return self.startdate == other.startdate and self.member == other.member and self.chunk == other.chunk and \
            self.box == other.box and self.variable == other.variable

    def __str__(self):
        return 'Region mean Startdate: {0.startdate} Member: {0.member} Chunk: {0.chunk} Variable: {0.variable} ' \
               'Box: {0.box} Save 3D: {0.save3d} Save variance: {0.variance}'.format(self)

    def __hash__(self):
        return hash(str(self))

    @classmethod
    def generate_jobs(cls, diags, options):
        """
        Create a job for each chunk to compute the diagnostic

        :param diags: Diagnostics manager class
        :type diags: Diags
        :param options: variable, minimum depth (level), maximum depth (level)
        :type options: list[str]
        :return: list of jobs to run
        :rtype: list[RegionMean]
        """
        # NOTE(review): the head of this tuple was lost in the paste. The domain,
        # save3D and grid_point entries are reconstructed from their uses below
        # (options['domain'], options['save3D'], options['grid_point']) -- confirm
        # the defaults against the project's configuration documentation.
        options_available = (DiagnosticDomainOption(),
                             DiagnosticVariableOption(diags.data_manager.config.var_manager),
                             DiagnosticBasinOption('basin', Basins().Global),
                             DiagnosticIntOption('min_depth', -1),
                             DiagnosticIntOption('max_depth', -1),
                             DiagnosticBoolOption('save3D', True),
                             DiagnosticBoolOption('variance', False),
                             DiagnosticOption('grid_point', 'T'),
                             DiagnosticOption('grid', ''))
        options = cls.process_options(options, options_available)

        box = Box()
        box.min_depth = options['min_depth']
        box.max_depth = options['max_depth']

        weights_file = TempFile.get()
        weight_diagnostics = ComputeWeights(diags.data_manager, options['grid_point'], options['basin'], box,
                                            weights_file)

        job_list = list()
        for startdate, member, chunk in diags.config.experiment.get_chunk_list():
            job = RegionMean(diags.data_manager, startdate, member, chunk,
                             options['domain'], options['variable'], box,
                             options['save3D'], weights_file, options['variance'], options['basin'])
            job.add_subjob(weight_diagnostics)
            job_list.append(job)
        return job_list

    def request_data(self):
        """Request data required by the diagnostic"""
        self.variable_file = self.request_chunk(self.domain, self.variable, self.startdate, self.member, self.chunk)

    def declare_data_generated(self):
        """Declare data to be generated by the diagnostic"""
        if self.box.min_depth == 0:
            # To cdftools, this means all levels
            box_save = None
        else:
            box_save = self.box

        self._declare_var('mean', False, box_save)
        self._declare_var('mean', True, box_save)

        self._declare_var('var', False, box_save)
        self._declare_var('var', True, box_save)

    def compute(self):
        """Load the chunk, restrict it to the selected region / levels and average it."""
        iris.FUTURE.netcdf_promote = True
        iris.FUTURE.netcdf_no_unlimited = True

        has_levels = self._fix_file_metadata()
        data = self._load_data()

        weights = iris.load_cube(self.weights_file, 'weights').data
        i_indexes = iris.load_cube(self.weights_file, 'i_indexes').data
        j_indexes = iris.load_cube(self.weights_file, 'j_indexes').data
        lev_limits = iris.load_cube(self.weights_file, 'lev_limits').data

        # Indexes stored in the weights file are 0-based while the i/j coords
        # built by _load_data start at 1, hence the -1 shift.
        def selected_i(cell):
            return cell.point - 1 in i_indexes

        def selected_j(cell):
            return cell.point - 1 in j_indexes

        def selected_level(cell):
            return lev_limits[0] <= cell.point <= lev_limits[1]

        data = data.extract(iris.Constraint(i=selected_i, j=selected_j, lev=selected_level))
        if has_levels:
            self._meand_3d_variable(data, weights)
        else:
            self._mean_2d_var(data, weights)

    def _mean_2d_var(self, data, weights):
        """Compute the regional statistics for a variable without levels."""
        mean = iris.cube.CubeList()
        var = iris.cube.CubeList()
        for time_slice in data.slices_over('time'):
            mean.append(time_slice.collapsed(['latitude', 'longitude'], iris.analysis.MEAN, weights=weights))
            # Only pay for the variance when it was requested
            if self.variance:
                var.append(time_slice.collapsed(['latitude', 'longitude'],
                                                iris.analysis.VARIANCE, weights=weights))
        # Pass the CubeList itself: _send_var is responsible for merging, exactly
        # as in the 3D branch (the pasted version merged here AND in _send_var).
        self._send_var('mean', False, mean)
        if self.variance:
            self._send_var('var', False, var)

    def _meand_3d_variable(self, data, weights):
        """Compute the regional statistics for a variable with levels."""
        mean = iris.cube.CubeList()
        mean3d = iris.cube.CubeList()
        var = iris.cube.CubeList()
        var3d = iris.cube.CubeList()
        for time_slice in data.slices_over('time'):
            mean.append(time_slice.collapsed(['latitude', 'longitude', 'depth'],
                                             iris.analysis.MEAN, weights=weights))
            if self.save3d:
                # Collapsing only the horizontal coords keeps one value per level
                mean3d.append(time_slice.collapsed(['latitude', 'longitude'],
                                                   iris.analysis.MEAN, weights=weights))
            if self.variance:
                var.append(time_slice.collapsed(['latitude', 'longitude', 'depth'],
                                                iris.analysis.VARIANCE, weights=weights))
                if self.save3d:
                    var3d.append(time_slice.collapsed(['latitude', 'longitude'],
                                                      iris.analysis.VARIANCE, weights=weights))
        self._send_var('mean', True, mean3d)
        self._send_var('mean', False, mean)
        if self.variance:
            self._send_var('var', True, var3d)
            self._send_var('var', False, var)

    def _load_data(self):
        """Load the variable's cube, adding 1-based i/j/lev index coordinates if missing."""
        def add_i_j(cube, field, filename):
            # Iris load callback: only decorate the cube of the target variable
            if cube.var_name != self.variable:
                return
            if not cube.coords('i'):
                index = field.dimensions.index('i')
                i = np.arange(1, field.shape[index] + 1)
                i_coord = iris.coords.DimCoord(i, var_name='i')
                cube.add_dim_coord(i_coord, index)
            if not cube.coords('j'):
                index = field.dimensions.index('j')
                i = np.arange(1, field.shape[index] + 1)
                i_coord = iris.coords.DimCoord(i, var_name='j')
                cube.add_dim_coord(i_coord, index)
            # Guard on the dimension's presence: 2D variables have no 'lev' and
            # field.dimensions.index('lev') would raise ValueError for them.
            if not cube.coords('lev') and 'lev' in field.dimensions:
                index = field.dimensions.index('lev')
                i = np.arange(1, field.shape[index] + 1)
                lev = iris.coords.AuxCoord(i, var_name='lev')
                cube.add_aux_coord(lev, index)

        data = iris.load_cube(self.variable_file.local_file,
                              iris.AttributeConstraint(short_name=self.variable),
                              callback=add_i_j)
        return data

    def _fix_file_metadata(self):
        """Patch the variable's coordinates attribute in place; return True if it has levels."""
        handler = Utils.open_cdf(self.variable_file.local_file)
        var = handler.variables[self.variable]
        coordinates = ''
        has_levels = False
        for dimension in handler.variables.keys():
            if dimension in ['time', 'lev', 'lat', 'latitude', 'lon', 'longitude', 'i', 'j']:
                coordinates += ' {0}'.format(dimension)
                if dimension == 'lev':
                    has_levels = True
        var.coordinates = coordinates
        handler.close()
        return has_levels

    def _declare_var(self, var, threed, box_save):
        """Declare one output chunk; 3D outputs are skipped unless save3d is set."""
        if threed:
            if not self.save3d:
                return False
            # Restored if/else: the paste had both assignments back to back,
            # which made the 3D name unreachable.
            final_name = '{1}3d{0}'.format(var, self.variable)
        else:
            final_name = '{1}{0}'.format(var, self.variable)
        self.declared[final_name] = self.declare_chunk(ModelingRealms.ocean, final_name, self.startdate, self.member,
                                                       self.chunk, box=box_save, region=self.basin)

    def _send_var(self, var, threed, cube_list):
        """Merge the per-timestep cubes and store the result as the declared chunk."""
        if threed:
            if not self.save3d:
                return False
            final_name = '{1}3d{0}'.format(var, self.variable)
        else:
            final_name = '{1}{0}'.format(var, self.variable)
        cube = cube_list.merge_cube()
        cube.remove_coord('latitude')
        cube.remove_coord('longitude')
        try:
            cube.remove_coord('depth')
        except iris.exceptions.CoordinateNotFoundError:
            # 2D variables carry no depth coordinate
            pass
        temp = TempFile.get()
        iris.save(cube, temp)
        self.declared[final_name].set_local_file(temp, diagnostic=self, rename_var='result', region=self.basin)
class ComputeWeights(Diagnostic):
    """
    Diagnostic used to compute regional mean and sum weights

    Parameters
    ----------
    data_manager: DataManager
    grid_point: str
    basin: Basin
    box: Box
    weights_file: str
    """

    alias = 'computeregmeanweights'
    "Diagnostic alias for the configuration file"

    @classmethod
    def generate_jobs(cls, diags, options):
        """
        Generate the instances of the diagnostics that will be run by the manager

        This method does not do anything as this diagnostic is not expected to be called by the users
        """
        pass

    def __init__(self, data_manager, grid_point, basin, box, weights_file):
        Diagnostic.__init__(self, data_manager)
        self.weights_file = weights_file
        self.basin = basin
        self.grid_point = grid_point.lower()
        # NOTE(review): self.box was missing from the pasted __init__ although
        # compute() and __str__ both read it -- restored.
        self.box = box

    def __eq__(self, other):
        if type(self) is not type(other):
            return False
        # Fixed: the pasted version compared boxes with != , which made two jobs
        # with the SAME box compare unequal and jobs with different boxes equal.
        return self.weights_file == other.weights_file and self.basin == other.basin and \
            self.grid_point == other.grid_point and self.box == other.box

    def __str__(self):
        return 'Computing weights for region averaging: Point {0.grid_point} Basin: {0.basin} Box: {0.box}'\
            .format(self)

    def __hash__(self):
        return hash(str(self))

    def compute(self):
        """Compute weights"""
        iris.FUTURE.netcdf_promote = True
        iris.FUTURE.netcdf_no_unlimited = True

        mask = np.squeeze(Utils.get_mask(self.basin))
        # Keep only the columns / rows that contain at least one unmasked point
        i_indexes = np.where(np.any(mask != 0, 0))[0]
        j_indexes = np.where(np.any(mask != 0, 1))[0]
        mask_small = np.take(np.take(mask, i_indexes, 1), j_indexes, 0)

        e1 = self._try_load_cube(1)
        e2 = self._try_load_cube(2)
        e3 = self._try_load_cube(3)
        depth = iris.util.squeeze(iris.load_cube('mesh_hgr.nc', 'gdept_0'))

        if self.box.min_depth == -1:
            # -1 means no depth restriction
            min_level = 0
        else:
            # Pick the model level closest to the requested depth
            distance = abs((depth - self.box.min_depth).data)
            min_level = np.argmin(distance)

        if self.box.max_depth == -1:
            max_level = depth.shape[0]
        else:
            distance = abs((depth - self.box.max_depth).data)
            max_level = np.argmin(distance)

        def selected_i(cell):
            return cell.point - 1 in i_indexes

        def selected_j(cell):
            return cell.point - 1 in j_indexes

        def selected_level(cell):
            return min_level <= cell.point <= max_level

        e1_small = e1.extract(iris.Constraint(i=selected_i, j=selected_j))
        e2_small = e2.extract(iris.Constraint(i=selected_i, j=selected_j))
        e3_small = e3.extract(iris.Constraint(i=selected_i, j=selected_j, lev=selected_level))
        mask_small = e1_small * e2_small * mask_small

        # Strip all coordinates so the broadcasted multiplication below does not
        # fail on mismatched metadata
        for coord in e3_small.coords():
            e3_small.remove_coord(coord)
        for coord in mask_small.coords():
            mask_small.remove_coord(coord)
        weights = e3_small * mask_small
        weights.var_name = 'weights'

        i_indexes = iris.cube.Cube(i_indexes, var_name='i_indexes')
        j_indexes = iris.cube.Cube(j_indexes, var_name='j_indexes')
        lev_limits = iris.cube.Cube([min_level, max_level], var_name='lev_limits')
        iris.save((weights, i_indexes, j_indexes, lev_limits), self.weights_file)

    def _try_load_cube(self, number):
        """Load scale factor e<number> for this grid point, falling back to the '_0' variant."""
        try:
            cube = iris.load_cube('mesh_hgr.nc', 'e{0}{1}'.format(number, self.grid_point))
        except iris.exceptions.ConstraintMismatchError:
            cube = iris.load_cube('mesh_hgr.nc', 'e{0}{1}_0'.format(number, self.grid_point))
        return iris.util.squeeze(cube)

    def request_data(self):
        """Request data required by the diagnostic"""
        pass

    def declare_data_generated(self):
        """Declare data to be generated by the diagnostic"""
        pass