# coding=utf-8
from earthdiagnostics.constants import Basins
from earthdiagnostics.diagnostic import Diagnostic, DiagnosticOption, DiagnosticDomainOption, DiagnosticBoolOption
from earthdiagnostics.utils import Utils, TempFile
import numpy as np

from earthdiagnostics.modelingrealm import ModelingRealm, ModelingRealms


class InterpolateCDO(Diagnostic):
    """
    Bilinear interpolation of a variable to a target grid using CDO's ``remapbil`` operator.

    Optionally, the cells where the global basin mask is 0 are set to NaN before interpolating.

    :original author: Javier Vegas-Regidor

    :created: October 2016

    :param data_manager: data management object
    :type data_manager: DataManager
    :param startdate: startdate
    :type startdate: str
    :param member: member number
    :type member: int
    :param chunk: chunk's number
    :type chunk: int
    :param domain: variable's domain
    :type domain: ModelingRealm
    :param variable: variable's name
    :type variable: str
    :param target_grid: grid description passed to CDO as the remapping target
    :type target_grid: str
    :param model_version: model version
    :type model_version: str
    :param mask_oceans: if True, apply the global basin mask to the variable before interpolating
    :type mask_oceans: bool
    """

    alias = 'interpcdo'
    "Diagnostic alias for the configuration file"

    def __init__(self, data_manager, startdate, member, chunk, domain, variable, target_grid, model_version,
                 mask_oceans):
        Diagnostic.__init__(self, data_manager)
        self.startdate = startdate
        self.member = member
        self.chunk = chunk
        self.variable = variable
        self.domain = domain
        self.model_version = model_version
        # The diagnostic reads and writes the same variable
        self.required_vars = [variable]
        self.generated_vars = [variable]
        self.tempTemplate = ''
        self.grid = target_grid
        self.mask_oceans = mask_oceans

    def __eq__(self, other):
        """
        Two jobs are equal when they are both InterpolateCDO and share every configuration value

        :param other: object to compare with
        :return: True if both objects describe the same job, False otherwise
        :rtype: bool
        """
        # Objects of another type are never the same job, even if they expose attributes with the same names
        if not isinstance(other, InterpolateCDO):
            return False
        # mask_oceans changes the result of compute, so it must take part in the comparison
        return self.startdate == other.startdate and self.member == other.member and self.chunk == other.chunk and \
            self.model_version == other.model_version and self.domain == other.domain and \
            self.variable == other.variable and self.grid == other.grid and \
            self.mask_oceans == other.mask_oceans

    def __str__(self):
        return 'Interpolate with CDO Startdate: {0} Member: {1} Chunk: {2} ' \
               'Variable: {3}:{4} Target grid: {5} ' \
               'Model: {6}' .format(self.startdate, self.member, self.chunk, self.domain, self.variable, self.grid,
                                    self.model_version)

    @classmethod
    def generate_jobs(cls, diags, options):
        """
        Creates a job for each chunk to compute the diagnostic

        :param diags: Diagnostics manager class
        :type diags: Diags
        :param options: variable, target_grid=<experiment atmos_grid in lowercase>, domain=ocean, mask_oceans=True
        :type options: list[str]
        :return: one InterpolateCDO job per (startdate, member, chunk) of the experiment
        :rtype: list[InterpolateCDO]
        """
        experiment = diags.config.experiment
        options_available = (DiagnosticOption('variable'),
                             DiagnosticOption('target_grid', experiment.atmos_grid.lower()),
                             DiagnosticDomainOption('domain', ModelingRealms.ocean),
                             DiagnosticBoolOption('mask_oceans', True))
        options = cls.process_options(options, options_available)
        # IFS resolution names are replaced by the grid names that are handed to CDO
        target_grid = cls._translate_ifs_grids_to_cdo_names(options['target_grid'])
        return [InterpolateCDO(diags.data_manager, startdate, member, chunk,
                               options['domain'], options['variable'], target_grid,
                               experiment.model_version, options['mask_oceans'])
                for startdate, member, chunk in experiment.get_chunk_list()]

    @classmethod
    def _translate_ifs_grids_to_cdo_names(cls, target_grid):
        """
        Replaces a known IFS grid name by the grid name used for CDO

        The comparison is case-insensitive and only looks at the beginning of the name. Names that do not match
        any known prefix are returned untouched.

        :param target_grid: grid name as given in the configuration
        :type target_grid: str
        :return: grid name to pass to CDO
        :rtype: str
        """
        translations = (('T159L', 't106grid'),
                        ('T255L', 't170grid'),
                        ('T511L', 't340grid'))
        upper_name = target_grid.upper()
        for ifs_prefix, cdo_name in translations:
            if upper_name.startswith(ifs_prefix):
                return cdo_name
        return target_grid

    def compute(self):
        """
        Runs the diagnostic

        Retrieves the variable file, optionally masks it, remaps it with CDO's remapbil to the target grid and
        sends the result back to the data manager tagged with that grid.
        """
        variable_file = self.data_manager.get_file(self.domain, self.variable, self.startdate, self.member,
                                                   self.chunk)
        if self.mask_oceans:
            # NOTE(review): the masking is written in place on the retrieved file; presumably get_file returns a
            # working copy -- confirm
            handler = Utils.openCdf(variable_file)
            try:
                var = handler.variables[self.variable]
                mask = Utils.get_mask(Basins.Global).astype(float)
                # Cells outside the mask become NaN instead of 0, so they do not enter as valid zeros
                mask[mask == 0] = np.nan
                var[:] = mask * var[:]
            finally:
                # Release the file even if the masking fails
                handler.close()

        temp = TempFile.get()
        Utils.cdo.remapbil(self.grid, input=variable_file, output=temp)
        Utils.setminmax(temp, self.variable)
        self.send_file(temp, self.domain, self.variable, self.startdate, self.member, self.chunk, grid=self.grid)