# coding=utf-8 from autosubmit.config.log import Log from earthdiagnostics.diagnostic import Diagnostic, DiagnosticOption, DiagnosticDomainOption, DiagnosticListIntOption, \ DiagnosticIntOption from earthdiagnostics.utils import Utils, TempFile from earthdiagnostics.variable import Variable, VarType import numpy as np class ClimatologicalPercentile(Diagnostic): """ Calculates the climatological percentiles for the given leadtimes :param data_manager: data management object :type data_manager: DataManager :param variable: variable to average :type variable: str :param experiment_config: :type experiment_config: ExperimentConfig """ alias = 'climpercent' "Diagnostic alias for the configuration file" def __init__(self, data_manager, domain, variable, leadtimes, num_bins, experiment_config): Diagnostic.__init__(self, data_manager) self.variable = variable self.domain = domain self.leadtimes = leadtimes self.experiment_config = experiment_config self.realizations = None self.lat_len = None self.lon_len = None self.num_bins = num_bins self._bins = None self.percentiles = np.array([0.1, 0.25, 0.33, 0.5, 0.66, 0.75, 0.9]) self.cmor_var = Variable.get_variable(variable, silent=True) if self.cmor_var and self.cmor_var.valid_max and self.cmor_var.valid_min: self.max_value = float(self.cmor_var.valid_max) self.min_value = float(self.cmor_var.valid_min) self.check_limit_values = False else: self.min_value = None self.max_value = None self.check_limit_values = True def __eq__(self, other): return self.domain == other.domain and self.variable == other.variable and self.leadtimes == other.leadtimes def __str__(self): return 'Climatological percentile Variable: {0}:{1} Leadtimes: {2}'.format(self.domain, self.variable, self.leadtimes) @classmethod def generate_jobs(cls, diags, options): """ Creates a job for each chunk to compute the diagnostic :param diags: Diagnostics manager class :type diags: Diags :param options: domain, variable, percentil number, maximum depth (level) :type options: list[str] :return: """ options_available = (DiagnosticDomainOption('domain'), DiagnosticOption('variable'), DiagnosticListIntOption('leadtimes'), DiagnosticIntOption('bins', 2000)) options = cls.process_options(options, options_available) job_list = list() job_list.append(ClimatologicalPercentile(diags.data_manager, options['domain'], options['variable'], options['leadtimes'], options['bins'], diags.config.experiment)) return job_list def compute(self): """ Runs the diagnostic """ member_files = self._get_data() distribution = self._get_distribution(member_files) percentile_values = self._calculate_percentiles(distribution) self._save_results(percentile_values) def _save_results(self, percentile_values): temp = TempFile.get() handler = Utils.openCdf(temp, 'w') handler.createDimension('percentile', len(self.percentiles)) percentile_var = handler.createVariable('percentile', float, ('percentile',)) percentile_var[:] = self.percentiles handler.createDimension('lat', self.lat.size) lat_var = handler.createVariable('lat', float, ('lat',)) lat_var[:] = self.lat handler.createDimension('lon', self.lon.size) lon_var = handler.createVariable('lon', float, ('lon',)) lon_var[:] = self.lon percentile_var = handler.createVariable('percent', float, ('percentile', 'lat', 'lon')) percentile_var[...] = percentile_values handler.close() self.send_file(temp, self.domain, self.variable + '_percentiles', None, None, frequency='clim', rename_var='percent', vartype=VarType.STATISTIC) def _calculate_percentiles(self, distribution): Log.debug('Calculating percentiles') def calculate(point_distribution): cs = np.cumsum(point_distribution) total = cs[-1] percentile_values = self.percentiles * total index = np.searchsorted(cs, percentile_values) return [(self._bins[i + 1] + self._bins[i]) / 2 for i in index] distribution = np.apply_along_axis(calculate, 0, distribution) return distribution def _get_distribution(self, member_files): distribution = None for memberfile in member_files: Log.debug('Discretizing file {0}', memberfile) handler = Utils.openCdf(memberfile) for realization in range(self.realizations): if distribution is None: distribution = self._calculate_distribution(handler, realization) else: distribution += self._calculate_distribution(handler, realization) handler.close() return distribution def _get_data(self): member_files = list() for startdate, member in self.experiment_config.get_member_list(): Log.debug('Retrieving startdate {0}', startdate) memberfile = self.data_manager.get_leadtimes(self.domain, self.variable, startdate, member, self.leadtimes) Log.debug('Getting data for startdate {0}', startdate) handler = Utils.openCdf(memberfile) self._get_value_interval(handler) self._get_realizations_present(handler) self._get_var_size(handler) handler.close() member_files.append(memberfile) return member_files def _get_realizations_present(self, handler): realizations = 1 if 'realization' in handler.dimensions: realizations = handler.dimensions['realization'].size if 'ensemble' in handler.dimensions: realizations = handler.dimensions['ensemble'].size if self.realizations is None: self.realizations = realizations if realizations != self.realizations: self.realizations = min(self.realizations, realizations) Log.warning('Different number of realizations in the data used by diagnostic {0}', self) def _get_value_interval(self, handler): if not self.check_limit_values: return values = handler.variables[self.variable][:] file_max = np.amax(values) file_min = np.amin(values) self.max_value = max(self.min_value, file_max) if self.min_value is None: self.min_value = file_min else: self.min_value = min(self.min_value, file_min) def _calculate_distribution(self, handler, realization): Log.debug('Discretizing realization {0}', realization) def calculate_histogram(time_series): histogram, self._bins = np.histogram(time_series, bins=self.num_bins, range=(self.min_value, self.max_value)) return histogram var = handler.variables[self.variable] if 'realization' in var.dimensions or 'ensemble' in var.dimensions: return np.apply_along_axis(calculate_histogram, 0, var[:, realization, ...]) else: return np.apply_along_axis(calculate_histogram, 0, var[:]) def _get_var_size(self, handler): if self.lat_len is not None: return self.lat = handler.variables['latitude'][:] self.lon = handler.variables['longitude'][:]