# coding=utf-8 """Diagnostic to compute regional averages""" import iris import iris.util import iris.coords import iris.analysis import iris.exceptions import numpy as np from earthdiagnostics.box import Box from earthdiagnostics.constants import Basins from earthdiagnostics.diagnostic import Diagnostic, DiagnosticOption, DiagnosticIntOption, DiagnosticDomainOption, \ DiagnosticBoolOption, DiagnosticBasinOption, DiagnosticVariableOption from earthdiagnostics.modelingrealm import ModelingRealms from earthdiagnostics.utils import Utils, TempFile class RegionMean(Diagnostic): """ Computes the mean value of the field (3D, weighted). For 3D fields, a horizontal mean for each level is also given. If a spatial window is specified, the mean value is computed only in this window. :original author: Javier Vegas-Regidor :created: March 2017 :param data_manager: data management object :type data_manager: DataManager :param startdate: startdate :type startdate: str :param member: member number :type member: int :param chunk: chunk's number :type chunk: int :param variable: variable to average :type variable: str :param box: box used to restrict the vertical mean :type box: Box """ alias = 'regmean' "Diagnostic alias for the configuration file" def __init__(self, data_manager, startdate, member, chunk, domain, variable, box, save3d, weights_file, variance, basin): Diagnostic.__init__(self, data_manager) self.startdate = startdate self.member = member self.chunk = chunk self.domain = domain self.variable = variable self.box = box self.save3d = save3d self.weights_file = weights_file self.variance = variance self.basin = basin self.declared = {} self.lat_name = 'lat' self.lon_name = 'lon' def __eq__(self, other): if self._different_type(other): return False return self.startdate == other.startdate and self.member == other.member and self.chunk == other.chunk and \ self.box == other.box and self.variable == other.variable def __str__(self): return 'Region mean Startdate: {0.startdate} Member: {0.member} Chunk: {0.chunk} Variable: {0.variable} ' \ 'Box: {0.box} Save 3D: {0.save3d} Save variance: {0.variance}'.format(self) def __hash__(self): return hash(str(self)) @classmethod def generate_jobs(cls, diags, options): """ Create a job for each chunk to compute the diagnostic :param diags: Diagnostics manager class :type diags: Diags :param options: variable, minimum depth (level), maximum depth (level) :type options: list[str] :return: """ options_available = (DiagnosticDomainOption(), DiagnosticVariableOption(diags.data_manager.config.var_manager), DiagnosticOption('grid_point', 'T'), DiagnosticBasinOption('basin', Basins().Global), DiagnosticIntOption('min_depth', -1), DiagnosticIntOption('max_depth', -1), DiagnosticBoolOption('save3D', True), DiagnosticBoolOption('variance', False), DiagnosticOption('grid', '')) options = cls.process_options(options, options_available) box = Box() box.min_depth = options['min_depth'] box.max_depth = options['max_depth'] weights_file = TempFile.get() weight_diagnostics = ComputeWeights(diags.data_manager, options['grid_point'], options['basin'], box, weights_file) job_list = list() for startdate, member, chunk in diags.config.experiment.get_chunk_list(): job = RegionMean(diags.data_manager, startdate, member, chunk, options['domain'], options['variable'], box, options['save3D'], weights_file, options['variance'], options['basin']) job.add_subjob(weight_diagnostics) job_list.append(job) return job_list def request_data(self): """Request data required by the diagnostic""" self.variable_file = self.request_chunk(self.domain, self.variable, self.startdate, self.member, self.chunk) def declare_data_generated(self): """Declare data to be generated by the diagnostic""" if self.box.min_depth == 0: # To cdftools, this means all levels box_save = None else: box_save = self.box self._declare_var('mean', False, box_save) self._declare_var('mean', True, box_save) if self.variance: self._declare_var('var', False, box_save) self._declare_var('var', True, box_save) def compute(self): """Run the diagnostic""" iris.FUTURE.netcdf_promote = True iris.FUTURE.netcdf_no_unlimited = True has_levels = self._fix_file_metadata() data = self._load_data() weights = iris.load_cube(self.weights_file, 'weights').data i_indexes = iris.load_cube(self.weights_file, 'i_indexes').data j_indexes = iris.load_cube(self.weights_file, 'j_indexes').data lev_limits = iris.load_cube(self.weights_file, 'lev_limits').data def selected_i(cell): return cell.point - 1 in i_indexes def selected_j(cell): return cell.point - 1 in j_indexes def selected_level(cell): return lev_limits[0] <= cell.point - 1 <= lev_limits[1] data = data.extract(iris.Constraint(i=selected_i, j=selected_j, lev=selected_level)) if has_levels: self._meand_3d_variable(data, weights) else: self._mean_2d_var(data, weights) def _mean_2d_var(self, data, weights): mean = iris.cube.CubeList() var = iris.cube.CubeList() for time_slice in data.slices_over('time'): mean.append(time_slice.collapsed(['latitude', 'longitude'], iris.analysis.MEAN, weights=weights)) var.append(time_slice.collapsed(['latitude', 'longitude'], iris.analysis.VARIANCE, weights=weights)) self._send_var('mean', False, mean.merge_cube()) if self.variance: self._send_var('var', False, var.merge_cube()) def _meand_3d_variable(self, data, weights): mean = iris.cube.CubeList() mean3d = iris.cube.CubeList() var = iris.cube.CubeList() var3d = iris.cube.CubeList() for time_slice in data.slices_over('time'): mean.append(time_slice.collapsed(['latitude', 'longitude', 'depth'], iris.analysis.MEAN, weights=weights)) if self.save3d: mean3d.append(time_slice.collapsed(['latitude', 'longitude'], iris.analysis.MEAN, weights=weights)) if self.variance: var.append(time_slice.collapsed(['latitude', 'longitude', 'depth'], iris.analysis.VARIANCE, weights=weights)) if self.save3d: var3d.append(time_slice.collapsed(['latitude', 'longitude'], iris.analysis.VARIANCE, weights=weights)) self._send_var('mean', True, mean3d) self._send_var('mean', False, mean) if self.variance: self._send_var('var', True, var3d) self._send_var('var', False, var) def _load_data(self): def add_i_j(cube, field, filename): if cube.var_name != self.variable: raise iris.exceptions.IgnoreCubeException() if not cube.coords('i'): index = field.dimensions.index('i') i = np.arange(1, field.shape[index] + 1) i_coord = iris.coords.DimCoord(i, var_name='i') cube.add_dim_coord(i_coord, index) if not cube.coords('j'): index = field.dimensions.index('j') i = np.arange(1, field.shape[index] + 1) i_coord = iris.coords.DimCoord(i, var_name='j') cube.add_dim_coord(i_coord, index) if not cube.coords('lev'): index = field.dimensions.index('lev') i = np.arange(1, field.shape[index] + 1) lev = iris.coords.AuxCoord(i, var_name='lev') cube.add_aux_coord(lev, index) coords = [] handler = Utils.open_cdf(self.variable_file.local_file) for variable in handler.variables: if variable in ('time', 'lev', 'lat', 'lon', 'latitude', 'longitude'): coords.append(variable) handler.variables[self.variable].coordinates = ' '.join(coords) handler.close() data = iris.load_cube(self.variable_file.local_file, callback=add_i_j) if data.coords('model_level_number'): coord = data.coord('model_level_number') coord.standard_name = 'depth' coord.long_name = 'depth' return data def _fix_file_metadata(self): handler = Utils.open_cdf(self.variable_file.local_file) var = handler.variables[self.variable] coordinates = '' has_levels = False for dimension in handler.variables.keys(): if dimension in ['time', 'lev', 'lat', 'latitude', 'lon', 'longitude', 'i', 'j']: coordinates += ' {0}'.format(dimension) if dimension == 'lev': has_levels = True var.coordinates = coordinates handler.close() return has_levels def _declare_var(self, var, threed, box_save): if threed: if not self.save3d: return False final_name = '{1}3d{0}'.format(var, self.variable) else: final_name = '{1}{0}'.format(var, self.variable) self.declared[final_name] = self.declare_chunk(ModelingRealms.ocean, final_name, self.startdate, self.member, self.chunk, box=box_save, region=self.basin) def _send_var(self, var, threed, cube_list): if threed: if not self.save3d and threed: return False final_name = '{1}3d{0}'.format(var, self.variable) else: final_name = '{1}{0}'.format(var, self.variable) cube = cube_list.merge_cube() print(cube) print(cube.data) cube.var_name = 'result' cube.remove_coord('latitude') cube.remove_coord('longitude') cube.remove_coord('depth') cube.remove_coord('lev') temp = TempFile.get() iris.save(cube, temp) self.declared[final_name].set_local_file(temp, diagnostic=self, rename_var='result', region=self.basin) class ComputeWeights(Diagnostic): """ Diagnostic used to compute regional mean and sum weights Parameters ---------- data_manager: DataManager grid_point: str basin: int weights_file: str """ alias = 'computeregmeanweights' "Diagnostic alias for the configuration file" @classmethod def generate_jobs(cls, diags, options): """ Generate the instances of the diagnostics that will be run by the manager This method does not does anything as this diagnostic is not expected to be called by the users """ pass def __init__(self, data_manager, grid_point, basin, box, weights_file): Diagnostic.__init__(self, data_manager) self.weights_file = weights_file self.basin = basin self.grid_point = grid_point.lower() self.box = box def __eq__(self, other): if self._different_type(other): return False return self.weights_file == other.weights_file and self.basin == other.basin and \ self.grid_point == other.grid_point and self.box != other.box def __str__(self): return 'Computing weights for region averaging: Point {0.grid_point} Basin: {0.basin} Box: {0.box}'\ .format(self) def __hash__(self): return hash(str(self)) def compute(self): """Compute weights""" iris.FUTURE.netcdf_promote = True iris.FUTURE.netcdf_no_unlimited = True mask = np.squeeze(Utils.get_mask(self.basin, True)) surface_mask = mask[0, ...] i_indexes = np.where(np.any(surface_mask != 0, 0))[0] j_indexes = np.where(np.any(surface_mask != 0, 1))[0] mask_small = np.take(np.take(mask, i_indexes, 2), j_indexes, 1) e1 = self._try_load_cube(1) e2 = self._try_load_cube(2) e3 = self._try_load_cube(3) depth = iris.util.squeeze(iris.load_cube('mesh_hgr.nc', 'gdept_0')) if self.box.min_depth == -1: min_level = 0 else: distance = abs((depth - self.box.min_depth).data) min_level = np.argmin(distance) if self.box.max_depth == -1: max_level = depth.shape[0] else: distance = abs((depth - self.box.max_depth).data) max_level = np.argmin(distance) def selected_i(cell): return cell.point - 1 in i_indexes def selected_j(cell): return cell.point - 1 in j_indexes def selected_level(cell): return min_level <= cell.point - 1 <= max_level e1_small = e1.extract(iris.Constraint(i=selected_i, j=selected_j)) e2_small = e2.extract(iris.Constraint(i=selected_i, j=selected_j)) e3_small = e3.extract(iris.Constraint(i=selected_i, j=selected_j, lev=selected_level)) mask_small = mask_small[min_level:max_level + 1, ...] mask_small = e3_small * mask_small e_small = e1_small * e2_small for coord in e_small.coords(): e_small.remove_coord(coord) for coord in mask_small.coords(): mask_small.remove_coord(coord) weights = mask_small * e_small weights.var_name = 'weights' i_indexes = iris.cube.Cube(i_indexes, var_name='i_indexes') j_indexes = iris.cube.Cube(j_indexes, var_name='j_indexes') lev_limits = iris.cube.Cube([min_level, max_level], var_name='lev_limits') iris.save((weights, i_indexes, j_indexes, lev_limits), self.weights_file) def _try_load_cube(self, number): try: cube = iris.load_cube('mesh_hgr.nc', 'e{0}{1}'.format(number, self.grid_point)) except iris.exceptions.ConstraintMismatchError: cube = iris.load_cube('mesh_hgr.nc', 'e{0}{1}_0'.format(number, self.grid_point)) return iris.util.squeeze(cube) def request_data(self): """Request data required by the diagnostic""" pass def declare_data_generated(self): """Declare data to be generated by the diagnostic""" pass