# coding=utf-8 """CDO-based interpolation""" import os from cdo import CDOException import numpy as np from earthdiagnostics.constants import Basins from earthdiagnostics.diagnostic import ( Diagnostic, DiagnosticDomainOption, DiagnosticVariableListOption, DiagnosticChoiceOption, DiagnosticBoolOption, DiagnosticOption, DiagnosticFrequencyOption, ) from earthdiagnostics.modelingrealm import ModelingRealms from earthdiagnostics.utils import Utils, TempFile class InterpolateCDO(Diagnostic): """ 3-dimensional conservative interpolation to the regular atmospheric grid. It can also be used for 2D (i,j) variables :original author: Javier Vegas-Regidor :created: October 2016 :param data_manager: data management object :type data_manager: DataManager :param startdate: startdate :type startdate: str :param member: member number :type member: int :param chunk: chunk's number :type chunk: int :param variable: variable's name :type variable: str :param domain: variable's domain :type domain: ModelingRealm :param model_version: model version :type model_version: str """ alias = "interpcdo" "Diagnostic alias for the configuration file" BILINEAR = "bilinear" BICUBIC = "bicubic" CONSERVATIVE = "conservative" CONSERVATIVE2 = "conservative2" METHODS = [BILINEAR, BICUBIC, CONSERVATIVE, CONSERVATIVE2] def __init__( self, data_manager, startdate, member, chunk, domain, variable, target_grid, model_version, mask_oceans, original_grid, weights, frequency, ): Diagnostic.__init__(self, data_manager) self.startdate = startdate self.member = member self.chunk = chunk self.variable = variable self.domain = domain self.model_version = model_version self.required_vars = [variable] self.generated_vars = [variable] self.tempTemplate = "" self.grid = target_grid self.mask_oceans = mask_oceans self.original_grid = original_grid self.weights = weights self.frequency = frequency def __eq__(self, other): if self._different_type(other): return False return ( self.startdate == other.startdate and self.member == other.member and self.chunk == other.chunk and self.model_version == other.model_version and self.domain == other.domain and self.variable == other.variable and self.mask_oceans == other.mask_oceans and self.grid == other.grid and self.original_grid == other.original_grid and self.frequency == other.frequency ) def __hash__(self): return hash(str(self)) def __str__(self): return ( "Interpolate with CDO Startdate: {0.startdate} Member: {0.member} " "Chunk: {0.chunk} Variable: {0.domain}:{0.variable} " "Frequency: {0.frequency} " "Target grid: {0.grid} Original grid: {0.original_grid} " "Mask ocean: {0.mask_oceans} Model: {0.model_version}".format(self) ) @classmethod def generate_jobs(cls, diags, options): """ Create a job for each chunk to compute the diagnostic :param diags: Diagnostics manager class :type diags: Diags :param options: target_grid, variable, domain=ocean :type options: list[str] :return: """ # Initialize CDO on the main thread Utils.cdo() options_available = ( DiagnosticDomainOption(default_value=ModelingRealms.ocean), DiagnosticVariableListOption( diags.data_manager.config.var_manager, "variables" ), DiagnosticOption( "target_grid", diags.config.experiment.atmos_grid.lower() ), DiagnosticChoiceOption( "method", InterpolateCDO.METHODS, InterpolateCDO.BILINEAR ), DiagnosticBoolOption("mask_oceans", True), DiagnosticOption("original_grid", ""), DiagnosticBoolOption("weights_from_mask", True), DiagnosticFrequencyOption( default_value=diags.config.frequency), ) options = cls.process_options(options, options_available) target_grid = cls._translate_ifs_grids_to_cdo_names( options["target_grid"] ) if not target_grid: raise Exception("Target grid not provided") job_list = list() weights = TempFile.get() method = options["method"].lower() if options["weights_from_mask"]: temp = cls.get_sample_grid_file() cls.compute_weights(method, target_grid, temp, weights) os.remove(temp) weights_job = None else: ( startdate, member, chunk, ) = diags.config.experiment.get_chunk_list()[0] weights_job = ComputeWeights( diags.data_manager, startdate, member, chunk, options["domain"], options["variables"][0], target_grid, options["original_grid"], weights, options["method"], options["frequency"], ) for var in options["variables"]: for ( startdate, member, chunk, ) in diags.config.experiment.get_chunk_list(): job = InterpolateCDO( diags.data_manager, startdate, member, chunk, options["domain"], var, target_grid, diags.config.experiment.model_version, options["mask_oceans"], options["original_grid"], weights, options["frequency"] ) if weights_job is not None: job.add_subjob(weights_job) job_list.append(job) return job_list @classmethod def compute_weights(cls, method, target_grid, sample_file, weights): """ Compute weights for interpolation from sample file Parameters ---------- method: int Interpolation method target_grid: str Grid to intepolate to. Can be anything understand by CDO sample_file: str Path to a file containing original mesh information weights: Path to the file to store the weights """ if method == InterpolateCDO.BILINEAR: Utils.cdo().genbil(target_grid, input=sample_file, output=weights) elif method == InterpolateCDO.BICUBIC: Utils.cdo().genbic(target_grid, input=sample_file, output=weights) elif method == InterpolateCDO.CONSERVATIVE: Utils.cdo().genycon(target_grid, input=sample_file, output=weights) elif method == InterpolateCDO.CONSERVATIVE2: Utils.cdo().gencon2(target_grid, input=sample_file, output=weights) @classmethod def get_sample_grid_file(cls): """ Get a sample grid file Create a sample grid file from the definition in the masks file Returns ------- str """ temp = TempFile.get() handler = Utils.open_cdf("mask.nc") lat_name, lon_name = cls._get_lat_lon_alias(handler) lon_bnds_name = "{0}_bnds".format(lon_name) lat_bnds_name = "{0}_bnds".format(lat_name) Utils.nco().ncks( input="mask.nc", output=temp, options=( "-O -v tmask,{0},{1},gphif,glamf".format(lat_name, lon_name), ), ) handler = Utils.open_cdf(temp) lon = handler.variables[lon_name] lon.units = "degrees_east" lon.long_name = "Longitude" lon.nav_model = "Default grid" lon.standard_name = "longitude" lon.short_name = lon_name lon.bounds = lon_bnds_name lat = handler.variables[lat_name] lat.units = "degrees_north" lat.long_name = "Latitude" lat.nav_model = "Default grid" lat.standard_name = "latitude" lat.short_name = lat_name lat.bounds = lat_bnds_name handler.createDimension("bounds", 4) lon_bnds = handler.createVariable( lon_bnds_name, lon.datatype, ("j", "i", "bounds") ) corner_lat = handler.variables["glamf"][0, ...] lon_bnds[:, :, 0] = corner_lat lon_bnds[:, :, 1] = np.roll(corner_lat, 1, 0) lon_bnds[:, :, 2] = np.roll(corner_lat, -1, 1) lon_bnds[:, :, 3] = np.roll(lon_bnds[:, :, 1], -1, 1) lat_bnds = handler.createVariable( lat_bnds_name, lat.datatype, ("j", "i", "bounds") ) corner_lat = handler.variables["gphif"][0, ...] lat_bnds[:, :, 0] = corner_lat lat_bnds[:, :, 1] = np.roll(corner_lat, 1, 0) lat_bnds[:, :, 2] = np.roll(corner_lat, 1, 1) lat_bnds[:, :, 3] = np.roll(lat_bnds[:, :, 1], 1, 1) lat_bnds[0, :, 1] = lat_bnds[1, 0, 1] - 1 lat_bnds[0, :, 3] = lat_bnds[1, 0, 3] - 1 tmask = handler.variables["tmask"] tmask.coordinates = "time lev {0} {1}".format(lat_name, lon_name) handler.close() Utils.nco().ncks( input=temp, output=temp, options=("-O -x -v gphif,glamf",) ) return temp @classmethod def _get_lat_lon_alias(cls, handler): lat_name = None for lat_alias in ["lat", "latitude"]: if lat_alias in handler.variables: lat_name = lat_alias break lon_name = None for lon_alias in ["lon", "longitude"]: if lon_alias in handler.variables: lon_name = lon_alias break return lat_name, lon_name @classmethod def _translate_ifs_grids_to_cdo_names(cls, target_grid): if target_grid.upper().startswith("T159L"): target_grid = "t106grid" if target_grid.upper().startswith("T255L"): target_grid = "t170grid" if target_grid.upper().startswith("T511L"): target_grid = "t340grid" return target_grid def request_data(self): """Request data required by the diagnostic""" self.original = self.request_chunk( self.domain, self.variable, self.startdate, self.member, self.chunk, grid=self.original_grid, frequency=self.frequency, ) def declare_data_generated(self): """Declare data to be generated by the diagnostic""" self.regridded = self.declare_chunk( self.domain, self.variable, self.startdate, self.member, self.chunk, grid=self.grid, frequency=self.frequency, ) def compute(self): """Run the diagnostic""" variable_file = TempFile.get() Utils.copy_file(self.original.local_file, variable_file) Utils.rename_variables( variable_file, { "jpib": "i", "jpjb": "j", "x": "i", "y": "j", "dim1": "j", "dim2": "i", "time_counter": "time", "t": "time", "SSTK_ens0": "tos", "SSTK_ens1": "tos", "SSTK_ens2": "tos", "nav_lat": "lat", "nav_lon": "lon", "time_centered": None, "time_centered_bnds": None, }, must_exist=False, ) handler = Utils.open_cdf(variable_file) lat_name, lon_name = self._get_lat_lon_alias(handler) var = handler.variables[self.variable] try: units = var.units except AttributeError: units = None coordinates = list() for dim in var.dimensions: if dim == "i": coordinates.append(lon_name) elif dim == "j": coordinates.append(lat_name) else: coordinates.append(dim) var.coordinates = " ".join(coordinates) if self.mask_oceans: mask = Utils.get_mask(Basins().Global).astype(float) mask[mask == 0] = np.nan var[:] = mask * var[:] handler.close() temp = TempFile.get() try: Utils.cdo().remap( ",".join((self.grid.split("_")[0], self.weights)), input=variable_file, output=temp, ) except CDOException as ex: msg = ( '(cdf_put_vara_double): NetCDF: Numeric ' 'conversion not representable' ) if msg not in ex.msg: raise Utils.cdo().remap( ",".join((self.grid.split("_")[0], self.weights)), input=variable_file, output=temp, options='-b F64' ) handler = Utils.open_cdf(temp) if units: handler.variables[self.variable].units = units handler.close() if lat_name != "lat": Utils.rename_variables( temp, {"lat": lat_name, "lon": lon_name}, True ) self.regridded.set_local_file(temp) class ComputeWeights(Diagnostic): """ Diagnostic used to compute interpolation weights Parameters ---------- data_manager: DataManager startdate: str member: int chunk: int domain: ModelingRealm variable: str target_grid: str original_grid: str weights_file: str method: str """ alias = "computeinterpcdoweights" "Diagnostic alias for the configuration file" @classmethod def generate_jobs(cls, diags, options): """ Generate the instances of the diagnostics that will be run This method does not does anything as this diagnostic is not expected to be called by the users """ pass def __init__( self, data_manager, startdate, member, chunk, domain, variable, target_grid, original_grid, weights_file, method, frequency ): Diagnostic.__init__(self, data_manager) self.startdate = startdate self.member = member self.chunk = chunk self.variable = variable self.domain = domain self.grid = target_grid self.original_grid = original_grid self.weights_file = weights_file self.method = method self.frequency = frequency def __str__(self): return ( "Computing weights for CDO interpolation: Method {0.method} " "Target grid: {0.grid}".format(self) ) def compute(self): """Compute weights""" InterpolateCDO.compute_weights( self.method, self.grid, self.sample_data.local_file, self.weights_file, ) def request_data(self): """Request data required by the diagnostic""" self.sample_data = self.request_chunk( self.domain, self.variable, self.startdate, self.member, self.chunk, grid=self.original_grid, frequency=self.frequency, ) def declare_data_generated(self): """Declare data to be generated by the diagnostic""" pass