# coding=utf-8
from earthdiagnostics.diagnostic import Diagnostic, DiagnosticOption, DiagnosticFloatOption, DiagnosticDomainOption, \
    DiagnosticVariableOption
from earthdiagnostics.utils import Utils
from earthdiagnostics.modelingrealm import ModelingRealm
import numpy as np
import math


class Scale(Diagnostic):
    """
    Scales a variable by the given value, also adding an offset

    Can be useful to correct units or other known errors
    (think of a tas file declaring K as units but with the data stored as Celsius)

    The data is rewritten as ``original * value + offset``. If a lower and/or upper limit is given and any of the
    original values falls outside it, the data is left untouched.

    :original author: Javier Vegas-Regidor

    :created: July 2016

    :param data_manager: data management object
    :type data_manager: DataManager
    :param startdate: startdate
    :type startdate: str
    :param member: member number
    :type member: int
    :param chunk: chunk's number
    :type chunk: int
    :param value: factor the data is multiplied by
    :type value: float
    :param offset: amount added to the data after multiplying
    :type offset: float
    :param domain: variable's domain
    :type domain: ModelingRealm
    :param variable: variable's name
    :type variable: str
    :param grid: grid of the file to scale; passed as is to the data manager
    :type grid: str
    :param min_limit: if not NaN, scaling is skipped when any original value is lower than this
    :type min_limit: float
    :param max_limit: if not NaN, scaling is skipped when any original value is greater than this
    :type max_limit: float
    """

    alias = 'scale'
    "Diagnostic alias for the configuration file"

    def __init__(self, data_manager, startdate, member, chunk, value, offset, domain, variable, grid,
                 min_limit, max_limit):
        Diagnostic.__init__(self, data_manager)
        self.startdate = startdate
        self.member = member
        self.chunk = chunk
        self.variable = variable
        self.domain = domain
        self.grid = grid
        self.value = value
        self.offset = offset
        self.min_limit = min_limit
        self.max_limit = max_limit

        # Filled by compute() with the data read from the file, before any scaling
        self.original_values = None

    def __str__(self):
        return 'Scale output Startdate: {0} Member: {1} Chunk: {2} ' \
               'Scale value: {5} Offset: {6} Variable: {3}:{4}'.format(self.startdate, self.member, self.chunk,
                                                                       self.domain, self.variable, self.value,
                                                                       self.offset)

    def __eq__(self, other):
        """
        Two Scale jobs are equal when they apply the same transformation to the same file

        :param other: object to compare with
        :return: True if both jobs are equivalent, NotImplemented if other is not a Scale
        """
        # Comparing against an unrelated object must not blow up with AttributeError
        if not isinstance(other, Scale):
            return NotImplemented

        # min_limit / max_limit are deliberately left out: their default is NaN, which never compares equal to itself
        return self.startdate == other.startdate and self.member == other.member and self.chunk == other.chunk and \
            self.domain == other.domain and self.variable == other.variable and self.grid == other.grid and \
            self.value == other.value and self.offset == other.offset

    @classmethod
    def generate_jobs(cls, diags, options):
        """
        Creates a job for each chunk to compute the diagnostic

        :param diags: Diagnostics manager class
        :type diags: Diags
        :param options: variable, domain, value, offset, grid='', min_limit=NaN, max_limit=NaN
        :type options: list[str]
        :return: one Scale job for every startdate, member and chunk of the experiment
        :rtype: list[Scale]
        """
        options_available = (DiagnosticVariableOption('variable'),
                             DiagnosticDomainOption('domain'),
                             DiagnosticFloatOption('value'),
                             DiagnosticFloatOption('offset'),
                             DiagnosticOption('grid', ''),
                             DiagnosticFloatOption('min_limit', float('nan')),
                             DiagnosticFloatOption('max_limit', float('nan')))
        options = cls.process_options(options, options_available)

        return [Scale(diags.data_manager, startdate, member, chunk,
                      options['value'], options['offset'], options['domain'], options['variable'],
                      options['grid'], options['min_limit'], options['max_limit'])
                for startdate, member, chunk in diags.config.experiment.get_chunk_list()]

    def compute(self):
        """
        Runs the diagnostic

        Reads the variable, rewrites it in place as ``original * value + offset`` when the limits check passes and
        then sends the file. When the check fails the file is sent with its data unchanged.
        """
        variable_file = self.data_manager.get_file(self.domain, self.variable, self.startdate, self.member,
                                                   self.chunk, grid=self.grid)

        handler = Utils.openCdf(variable_file)
        try:
            var_handler = handler.variables[self.variable]
            self.original_values = var_handler[:]
            if self._check_limits():
                var_handler[:] = self.original_values * self.value + self.offset
        finally:
            # Release the file even if reading, checking or writing fails
            handler.close()

        self.send_file(variable_file, self.domain, self.variable, self.startdate, self.member, self.chunk,
                       grid=self.grid)

    def _check_limits(self):
        """
        Checks that the data read by compute() lies within the configured limits

        A limit set to NaN is ignored.

        :return: False if any original value is below min_limit or above max_limit, True otherwise
        :rtype: bool
        """
        if not math.isnan(self.min_limit) and (self.original_values < self.min_limit).any():
            return False
        if not math.isnan(self.max_limit) and (self.original_values > self.max_limit).any():
            return False
        return True