# coding=utf-8
"""Diagnostic to compute regional averages"""
import iris
import iris.util
import iris.coords
import iris.analysis
import iris.exceptions

from bscearth.utils.log import Log

import numpy as np
import netCDF4

from earthdiagnostics.box import Box
from earthdiagnostics.diagnostic import (
    Diagnostic,
    DiagnosticOption,
    DiagnosticIntOption,
    DiagnosticDomainOption,
    DiagnosticBoolOption,
    DiagnosticBasinListOption,
    DiagnosticVariableListOption,
    DiagnosticFrequencyOption,
)
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.utils import Utils, TempFile

import diagonals.regmean as regmean
from diagonals.mesh_helpers.nemo import Nemo


class RegionMean(Diagnostic):
    """
    Compute the weighted mean value of a field for each basin.

    For 3D fields, a horizontal mean for each level is also given.
    If a vertical window (box) is specified, the mean value is computed
    only inside this window.

    :original author: Javier Vegas-Regidor

    :created: March 2017

    :param data_manager: data management object
    :type data_manager: DataManager
    :param startdate: startdate
    :type startdate: str
    :param member: member number
    :type member: int
    :param chunk: chunk's number
    :type chunk: int
    :param variable: variable to average
    :type variable: str
    :param box: box used to restrict the vertical mean
    :type box: Box
    """

    alias = "regmean"
    "Diagnostic alias for the configuration file"

    def __init__(
        self,
        data_manager,
        startdate,
        member,
        chunk,
        domain,
        variable,
        box,
        save3d,
        basins,
        grid_point,
        frequency,
    ):
        Diagnostic.__init__(self, data_manager)
        self.startdate = startdate
        self.member = member
        self.chunk = chunk
        self.domain = domain
        self.variable = variable
        self.box = box
        self.save3d = save3d
        self.basins = basins
        self.grid_point = grid_point
        self.frequency = frequency
        self.declared = {}
        self.lat_name = "lat"
        self.lon_name = "lon"
        # Longest basin name, used to size the netCDF region dimension
        self._length = max(len(basin.name) for basin in self.basins)
        Log.debug(f"Max basin length: {self._length}")

    def __eq__(self, other):
        if self._different_type(other):
            return False
        return (
            self.startdate == other.startdate
            and self.member == other.member
            and self.chunk == other.chunk
            and self.box == other.box
            and self.variable == other.variable
        )

    def __str__(self):
        return (
            "Region mean Startdate: {0.startdate} Member: {0.member} "
            "Chunk: {0.chunk} Variable: {0.variable} "
            "Box: {0.box} Save 3D: {0.save3d} "
            "Grid point: {0.grid_point}".format(self)
        )

    def __hash__(self):
        return hash(str(self))

    @classmethod
    def generate_jobs(cls, diags, options):
        """
        Create a job for each chunk to compute the diagnostic

        :param diags: Diagnostics manager class
        :type diags: Diags
        :param options: domain, variables, basins, grid point,
            minimum depth (level), maximum depth (level), save 3D,
            grid, frequency
        :type options: list[str]
        :return:
        """
        options_available = (
            DiagnosticDomainOption(),
            DiagnosticVariableListOption(
                diags.data_manager.config.var_manager, "variable"
            ),
            DiagnosticBasinListOption("basins", "global"),
            DiagnosticOption("grid_point", "T"),
            DiagnosticIntOption("min_depth", -1),
            DiagnosticIntOption("max_depth", -1),
            DiagnosticBoolOption("save3D", True),
            DiagnosticOption("grid", ""),
            DiagnosticFrequencyOption("frequency", diags.config.frequency),
        )
        options = cls.process_options(options, options_available)

        box = Box()
        box.min_depth = options["min_depth"]
        box.max_depth = options["max_depth"]

        basins = options["basins"]
        if not basins:
            Log.error("Basins not recognized")
            return ()

        job_list = list()
        for var in options["variable"]:
            for (
                startdate,
                member,
                chunk,
            ) in diags.config.experiment.get_chunk_list():
                job = RegionMean(
                    diags.data_manager,
                    startdate,
                    member,
                    chunk,
                    options["domain"],
                    var,
                    box,
                    options["save3D"],
                    options["basins"],
                    options["grid_point"].lower(),
                    options["frequency"],
                )
                job_list.append(job)

        return job_list

    def request_data(self):
        """Request data required by the diagnostic"""
        self.variable_file = self.request_chunk(
            self.domain, self.variable, self.startdate, self.member, self.chunk
        )

    def declare_data_generated(self):
        """Declare data to be generated by the diagnostic"""
        if self.box.min_depth == -1 and self.box.max_depth == -1:
            box_save = None
        else:
            box_save = self.box

        self._declare_var("mean", False, box_save)
        self._declare_var("mean", True, box_save)

    def compute(self):
        """Run the diagnostic"""
        has_levels = self._fix_file_metadata()
        data = self._load_data()

        masks = {}
        self.basins.sort()
        for basin in self.basins:
            masks[basin] = Utils.get_mask(basin)

        mesh = Nemo("mesh_hgr.nc", "mask_regions.nc")
        if has_levels:
            self._mean_3d_variable(data, mesh, masks)
        else:
            self._mean_2d_variable(data, mesh, masks)

    def _mean_2d_variable(self, data, mesh, masks):
        areacello = mesh.get_areacello(cell_point=self.grid_point)
        mean = regmean.compute_regmean_2d(data.core_data(), masks, areacello)
        self._save_result_2d("mean", mean, data)

    def _mean_3d_variable(self, data, mesh, masks):
        areacello = mesh.get_areacello(cell_point=self.grid_point)
        # Vertical scale factor (cell thickness) for this grid point
        e3 = self._try_load_cube(3)
        e3 = self._rename_depth(e3)
        e3.coord("depth").bounds = data.coord("depth").bounds

        # Restrict the vertical range to the requested box; open ends
        # default to the surface (0 m) and to 6000 m respectively
        if self.box.min_depth != -1 and self.box.max_depth != -1:
            depth_constraint = iris.Constraint(
                depth=lambda c: self.box.min_depth <= c <= self.box.max_depth
            )
            e3 = e3.extract(depth_constraint)
            data = data.extract(depth_constraint)
        if self.box.min_depth == -1 and self.box.max_depth != -1:
            self.box.min_depth = 0
            depth_constraint = iris.Constraint(
                depth=lambda c: self.box.min_depth <= c <= self.box.max_depth
            )
            e3 = e3.extract(depth_constraint)
            data = data.extract(depth_constraint)
        if self.box.min_depth != -1 and self.box.max_depth == -1:
            self.box.max_depth = 6000
            depth_constraint = iris.Constraint(
                depth=lambda c: self.box.min_depth <= c <= self.box.max_depth
            )
            e3 = e3.extract(depth_constraint)
            data = data.extract(depth_constraint)

        # Cell volume = horizontal cell area * vertical scale factor
        volcello = areacello * e3.data.astype(np.float32)
        mean = regmean.compute_regmean_3d(data.core_data(), masks, volcello)
        self._save_result_2d("mean", mean, data)
        if self.save3d:
            mean3d = regmean.compute_regmean_levels(
                data.core_data(), masks, volcello
            )
            self._save_result_3d("mean", mean3d, data)

    def _try_load_cube(self, number):
        # Load the scale-factor cube (e.g. e3t), falling back to the
        # "_0" variant and adding missing index coordinates
        try:
            cube = iris.load_cube("mesh_hgr.nc", f"e{number}{self.grid_point}")
        except iris.exceptions.ConstraintMismatchError:
            cube = iris.load_cube(
                "mesh_hgr.nc", f"e{number}{self.grid_point}_0"
            )
        cube = iris.util.squeeze(cube)
        dims = len(cube.shape)
        try:
            cube.coord("i")
        except iris.exceptions.CoordinateNotFoundError:
            cube.add_dim_coord(
                iris.coords.DimCoord(
                    np.arange(cube.shape[dims - 1]), var_name="i"
                ),
                dims - 1,
            )
        try:
            cube.coord("j")
        except iris.exceptions.CoordinateNotFoundError:
            cube.add_dim_coord(
                iris.coords.DimCoord(
                    np.arange(cube.shape[dims - 2]), var_name="j"
                ),
                dims - 2,
            )
        return cube

    def _load_data(self):
        data = iris.load_cube(self.variable_file.local_file)
        return self._rename_depth(data)

    def _rename_depth(self, data):
        for coord_name in ("model_level_number", "Vertical T levels", "lev"):
            if data.coords(coord_name):
                coord = data.coord(coord_name)
                coord.standard_name = "depth"
                coord.long_name = "depth"
                break
        return data

    def _fix_file_metadata(self):
        # The variable is treated as 3D if the file defines a "lev" variable
        handler = Utils.open_cdf(self.variable_file.local_file)
        has_levels = False
        for dimension in handler.variables.keys():
            if dimension == "lev":
                has_levels = True
        handler.close()
        return has_levels

    def _declare_var(self, var, threed, box_save):
        if threed:
            if not self.save3d:
                return False
            final_name = "{1}3d{0}".format(var, self.variable)
        else:
            final_name = "{1}{0}".format(var, self.variable)

        self.declared[final_name] = self.declare_chunk(
            ModelingRealms.ocean,
            final_name,
            self.startdate,
            self.member,
            self.chunk,
            box=box_save,
            region=self.basins,
            frequency=self.frequency,
        )

    def _save_result_2d(self, var, result, data):
        final_name = "{1}{0}".format(var, self.variable)
        temp = TempFile.get()
        handler_source = Utils.open_cdf(self.variable_file.local_file)
        handler_temp = Utils.open_cdf(temp, "w")
        Utils.copy_variable(handler_source, handler_temp, "time", True, True)
        handler_temp.createDimension("region", len(result))
        handler_temp.createDimension("region_length", self._length)
        var_region = handler_temp.createVariable(
            "region", "S1", ("region", "region_length")
        )
        var = handler_temp.createVariable(
            "{1}{0}".format(var, self.variable),
            float,
            ("region", "time"),
        )
        var.units = "{0}".format(data.units)
        for i, basin in enumerate(result):
            var_region[i, ...] = netCDF4.stringtoarr(
                str(basin), self._length
            )
            var[i, ...] = result[basin]
        handler_temp.close()
        self.declared[final_name].set_local_file(temp, diagnostic=self)

    def _save_result_3d(self, var, result, data):
        final_name = "{1}3d{0}".format(var, self.variable)
        temp = TempFile.get()
        handler_source = Utils.open_cdf(self.variable_file.local_file)
        handler_temp = Utils.open_cdf(temp, "w")
        Utils.copy_variable(handler_source, handler_temp, "time", True, True)
        handler_temp.createDimension("region", len(result))
        handler_temp.createDimension("region_length", self._length)
        handler_temp.createDimension("lev", data.shape[1])
        var_level = handler_temp.createVariable("lev", float, "lev")
        var_level[...] = data.coord("depth").points
        var_level.units = "m"
        var_level.axis = "Z"
        var_level.positive = "down"
        var_level.long_name = "ocean depth coordinate"
        var_level.standard_name = "depth"
        var_region = handler_temp.createVariable(
            "region", "S1", ("region", "region_length")
        )
        var = handler_temp.createVariable(
            "{1}3d{0}".format(var, self.variable),
            float,
            ("time", "lev", "region"),
        )
        var.units = "{0}".format(data.units)
        for i, basin in enumerate(result):
            var_region[i, ...] = netCDF4.stringtoarr(
                str(basin), self._length
            )
            var[..., i] = result[basin]
        handler_temp.close()
        self.declared[final_name].set_local_file(temp, diagnostic=self)
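

# ---------------------------------------------------------------------------
# Illustrative sketch (not called by the diagnostic): the 2D regional means
# delegated above to diagonals' regmean.compute_regmean_2d are assumed to be
# plain area-weighted averages per basin. The helper below documents that
# convention with numpy only; the argument names (field, basin_masks,
# areacello) are hypothetical and chosen to mirror the names used in
# RegionMean._mean_2d_variable.
# ---------------------------------------------------------------------------
def _regmean_2d_sketch(field, basin_masks, areacello):
    """Return {basin: area-weighted mean} for a 2D (j, i) field."""
    means = {}
    for basin, mask in basin_masks.items():
        # Weight each grid cell by its horizontal area, masked to the basin
        weights = areacello * mask
        means[basin] = np.sum(field * weights) / np.sum(weights)
    return means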