# coding=utf-8 """Diagnostic to calculate a region total""" import iris import iris.util import iris.coords import iris.analysis import iris.exceptions from bscearth.utils.log import Log import numpy as np import netCDF4 from earthdiagnostics.box import Box from earthdiagnostics.diagnostic import ( Diagnostic, DiagnosticOption, DiagnosticIntOption, DiagnosticDomainOption, DiagnosticBoolOption, DiagnosticBasinListOption, DiagnosticVariableListOption, DiagnosticFrequencyOption ) from earthdiagnostics.modelingrealm import ModelingRealms from earthdiagnostics.utils import Utils, TempFile import diagonals.regsum as regsum from diagonals.mesh_helpers.nemo import Nemo class RegionSum(Diagnostic): """ Computes the sum of the field (3D, weighted). For 3D fields, a horizontal mean for each level is also given. If a spatial window is specified, the mean value is computed only in this window. :original author: Javier Vegas-Regidor :created: March 2017 :param data_manager: data management object :type data_manager: DataManager :param startdate: startdate :type startdate: str :param member: member number :type member: int :param chunk: chunk's number :type chunk: int :param variable: variable to average :type variable: str :param box: box used to restrict the vertical mean :type box: Box """ alias = "regsum" "Diagnostic alias for the configuration file" def __init__( self, data_manager, startdate, member, chunk, domain, variable, grid_point, box, save3d, basins, grid, frequency, ): Diagnostic.__init__(self, data_manager) self.startdate = startdate self.member = member self.chunk = chunk self.domain = domain self.variable = variable self.grid_point = grid_point self.box = box self.save3d = save3d self.basins = basins self.grid = grid self.frequency = frequency self.declared = {} self.lat_name = "lat" self.lon_name = "lon" self._hash = None def __eq__(self, other): if self._different_type(other): return False return ( self.startdate == other.startdate and self.member == other.member and self.chunk == other.chunk and self.box == other.box and self.variable == other.variable and self.grid_point == other.grid_point and self.grid == other.grid and self.basins == other.basins ) def __str__(self): return ( "Region sum Startdate: {0.startdate} Member: {0.member} " "Chunk: {0.chunk} Variable: {0.variable} " "Grid point: {0.grid_point} Box: {0.box} Save 3D: {0.save3d} " "Original grid: {0.grid} Basin: {0.basins}".format(self) ) def __hash__(self): return hash(str(self)) @classmethod def generate_jobs(cls, diags, options): """ Create a job for each chunk to compute the diagnostic :param diags: Diagnostics manager class :type diags: Diags :param options: variable, minimum depth (level), maximum depth (level) :type options: list[str] :return: """ options_available = ( DiagnosticDomainOption(), DiagnosticVariableListOption( diags.data_manager.config.var_manager, "variable" ), DiagnosticBasinListOption("basins", "global"), DiagnosticOption("grid_point", "T"), DiagnosticIntOption("min_depth", -1), DiagnosticIntOption("max_depth", -1), DiagnosticBoolOption("save3D", True), DiagnosticOption("grid", ""), DiagnosticFrequencyOption("frequency", diags.config.frequency), ) options = cls.process_options(options, options_available) box = Box() box.min_depth = options["min_depth"] box.max_depth = options["max_depth"] basins = options["basins"] if not basins: Log.error("Basins not recognized") return () basins.sort() job_list = list() for var in options["variable"]: for ( startdate, member, chunk, ) in diags.config.experiment.get_chunk_list(): job_list.append( RegionSum( diags.data_manager, startdate, member, chunk, options["domain"], var, options["grid_point"].lower(), box, options["save3D"], options["basins"], options["grid"], options["frequency"], ) ) return job_list def request_data(self): """Request data required by the diagnostic""" self.variable_file = self.request_chunk( self.domain, self.variable, self.startdate, self.member, self.chunk, grid=self.grid, frequency=self.frequency ) def declare_data_generated(self): """Declare data to be generated by the diagnostic""" if self.box.min_depth == -1 and self.box.max_depth == -1: box_save = None else: box_save = self.box self._declare_var("sum", False, box_save) self._declare_var("sum", True, box_save) def compute(self): """Run the diagnostic""" has_levels = self._fix_file_metadata() data = self._load_data() masks = {} for basin in self.basins: masks[basin] = Utils.get_mask(basin) mesh = Nemo("mesh_hgr.nc", "mask_regions.nc") if has_levels: self._sum_3d_variable(data, mesh, masks) else: self._sum_2d_var(data, mesh, masks) def _sum_2d_var(self, data, mesh, masks): areacello = mesh.get_areacello(cell_point=self.grid_point) varsum = regsum.compute_regsum_2d(data.data, masks, areacello) self._save_result_2d("sum", varsum, data) def _sum_3d_variable(self, data, mesh, masks): areacello = mesh.get_areacello(cell_point=self.grid_point) tmask = iris.load_cube( "mesh_hgr.nc", "{0}mask".format(self.grid_point) ) e3 = self._try_load_cube(3) e3 = self._rename_depth(e3) e3.coord("depth").bounds = data.coord("depth").bounds if self.box.min_depth != -1 and self.box.max_depth != -1: depth_constraint = iris.Constraint( depth=lambda c: self.box.min_depth <= c <= self.box.max_depth ) e3 = e3.extract(depth_constraint) data = data.extract(depth_constraint) tmask = tmask.extract(depth_constraint) volcello = areacello * e3.data.astype(np.float32) varsum = regsum.compute_regsum_3d( data.data, masks, volcello, tmask.data ) self._save_result_2d("sum", varsum, data) if self.save3d: varsum3d = regsum.compute_regsum_levels( data.data, masks, volcello, tmask.data ) self._save_result_3d("sum", varsum3d, data) def _try_load_cube(self, number): try: cube = iris.load_cube( "mesh_hgr.nc", "e{0}{1}".format(number, self.grid_point) ) except iris.exceptions.ConstraintMismatchError: cube = iris.load_cube( "mesh_hgr.nc", "e{0}{1}_0".format(number, self.grid_point) ) cube = iris.util.squeeze(cube) dims = len(cube.shape) try: cube.coord("i") except iris.exceptions.CoordinateNotFoundError: cube.add_dim_coord( iris.coords.DimCoord( np.arange(cube.shape[dims - 1]), var_name="i" ), dims - 1, ) try: cube.coord("j") except iris.exceptions.CoordinateNotFoundError: cube.add_dim_coord( iris.coords.DimCoord( np.arange(cube.shape[dims - 2]), var_name="j" ), dims - 2, ) return cube def _load_data(self): coords = [] handler = Utils.open_cdf(self.variable_file.local_file) for variable in handler.variables: if variable in ( "time", "lev", "lat", "lon", "latitude", "longitude", "leadtime", "time_centered", ): coords.append(variable) if variable == "time_centered": handler.variables[variable].standard_name = "" handler.variables[self.variable].coordinates = " ".join(coords) handler.close() data = iris.load_cube(self.variable_file.local_file) return self._rename_depth(data) def _rename_depth(self, data): for coord_name in ("model_level_number", "Vertical T levels", "lev"): if data.coords(coord_name): coord = data.coord(coord_name) coord.standard_name = "depth" coord.long_name = "depth" break return data def _fix_file_metadata(self): handler = Utils.open_cdf(self.variable_file.local_file) var = handler.variables[self.variable] coordinates = "" has_levels = False for dimension in handler.variables.keys(): if dimension in [ "time", "lev", "lat", "latitude", "lon", "longitude", "i", "j", ]: coordinates += " {0}".format(dimension) if dimension == "lev": has_levels = True var.coordinates = coordinates handler.close() return has_levels def _send_var(self, threed, mean_file): var = "sum" if threed: if not self.save3d: return original_name = "{0}_{1}".format(var, self.variable) final_name = "{1}3d{0}".format(var, self.variable) levels = ",lev" else: original_name = "{0}_3D{1}".format(var, self.variable) final_name = "{1}{0}".format(var, self.variable) levels = "" temp2 = TempFile.get() Utils.nco().ncks( input=mean_file, output=temp2, options=( "-v {0},{2.lat_name},{2.lon_name}{1}".format( original_name, levels, self ), ), ) handler = Utils.open_cdf(temp2) var_handler = handler.variables[original_name] if hasattr(var_handler, "valid_min"): del var_handler.valid_min if hasattr(var_handler, "valid_max"): del var_handler.valid_max handler.close() self.declared[final_name].set_local_file( temp2, diagnostic=self, rename_var=original_name, region=self.basins, ) def _declare_var(self, var, threed, box_save): if threed: if not self.save3d: return final_name = "{1}3d{0}".format(var, self.variable) else: final_name = "{1}{0}".format(var, self.variable) self.declared[final_name] = self.declare_chunk( ModelingRealms.ocean, final_name, self.startdate, self.member, self.chunk, box=box_save, region=self.basins, grid=self.grid, frequency=self.frequency, ) def _save_result_2d(self, var, result, data): final_name = "{1}{0}".format(var, self.variable) temp = TempFile.get() handler_source = Utils.open_cdf(self.variable_file.local_file) handler_temp = Utils.open_cdf(temp, "w") Utils.copy_variable(handler_source, handler_temp, "time", True, True) handler_temp.createDimension("region", len(result)) handler_temp.createDimension("region_length", 50) var_region = handler_temp.createVariable( "region", "S1", ("region", "region_length") ) var = handler_temp.createVariable( "{1}{0}".format(var, self.variable), float, ("time", "region",), ) var.units = "{0}".format(data.units) for i, basin in enumerate(result): var_region[i, ...] = netCDF4.stringtoarr(str(basin), 50) var[..., i] = result[basin] handler_temp.close() self.declared[final_name].set_local_file(temp, diagnostic=self) def _save_result_3d(self, var, result, data): final_name = "{1}3d{0}".format(var, self.variable) temp = TempFile.get() handler_source = Utils.open_cdf(self.variable_file.local_file) handler_temp = Utils.open_cdf(temp, "w") Utils.copy_variable(handler_source, handler_temp, "time", True, True) handler_temp.createDimension("region", len(result)) handler_temp.createDimension("region_length", 50) handler_temp.createDimension("lev", data.shape[1]) var_level = handler_temp.createVariable("lev", float, "lev") var_level[...] = data.coord("depth").points var_level.units = "m" var_level.axis = "Z" var_level.positive = "down" var_level.long_name = "ocean depth coordinate" var_level.standard_name = "depth" var_region = handler_temp.createVariable( "region", "S1", ("region", "region_length") ) var = handler_temp.createVariable( "{1}3d{0}".format(var, self.variable), float, ("time", "lev", "region",), ) var.units = "{0}".format(data.units) for i, basin in enumerate(result): var_region[i, ...] = netCDF4.stringtoarr(str(basin), 50) var[..., i] = result[basin] handler_temp.close() self.declared[final_name].set_local_file(temp, diagnostic=self)