# coding=utf-8
from bscearth.utils.log import Log

from earthdiagnostics.diagnostic import Diagnostic, DiagnosticVariableOption, DiagnosticDomainOption, \
    DiagnosticIntOption, DiagnosticListIntOption
from earthdiagnostics.frequency import Frequencies
from earthdiagnostics.utils import TempFile
from earthdiagnostics.variable_type import VariableType

import numpy as np

import iris
import iris.cube
import iris.coords


class ClimatologicalPercentile(Diagnostic):
    """
    Calculates the climatological percentiles for the given leadtime

    :param data_manager: data management object
    :type data_manager: DataManager
    :param domain: variable domain
    :type domain: ModelingRealm
    :param variable: variable to compute the percentiles of
    :type variable: str
    :param start_year: first year of the reference period
    :type start_year: int
    :param end_year: last year of the reference period
    :type end_year: int
    :param forecast_month: forecast month to process
    :type forecast_month: int
    :param experiment_config: experiment configuration
    :type experiment_config: ExperimentConfig
    """

    alias = 'climpercent'
    "Diagnostic alias for the configuration file"

    Percentiles = np.array([0.1, 0.25, 0.33, 0.5, 0.66, 0.75, 0.9])

    def __init__(self, data_manager, domain, variable, start_year, end_year, forecast_month, experiment_config):
        Diagnostic.__init__(self, data_manager)
        self.variable = variable
        self.domain = domain
        self.experiment_config = experiment_config
        self.start_year = start_year
        self.end_year = end_year
        self.forecast_month = forecast_month
        self.cmor_var = data_manager.variable_list.get_variable(variable, silent=True)
        self.leadtime_files = {}
        self.distribution = None
        # Filled in from the loaded distribution cubes (see _get_distribution)
        self.units = None
        self.lat_coord = None
        self.lon_coord = None
        self._bins = None

    def __eq__(self, other):
        return self.domain == other.domain and self.variable == other.variable and \
            self.start_year == other.start_year and self.end_year == other.end_year and \
            self.forecast_month == other.forecast_month

    def __str__(self):
        return 'Climatological percentile Variable: {0.domain}:{0.variable} Period: {0.start_year}-{0.end_year} ' \
               'Forecast month: {0.forecast_month}'.format(self)

    @classmethod
    def generate_jobs(cls, diags, options):
        """
        Creates a job for each requested forecast month to compute the diagnostic

        :param diags: Diagnostics manager class
        :type diags: Diags
        :param options: domain, variable, start year, end year, forecast month
        :type options: list[str]
        :return:
        """
        options_available = (DiagnosticDomainOption(),
                             DiagnosticVariableOption(),
                             DiagnosticIntOption('start_year'),
                             DiagnosticIntOption('end_year'),
                             DiagnosticListIntOption('forecast_month'),
                             )
        options = cls.process_options(options, options_available)

        job_list = list()
        for forecast_month in options['forecast_month']:
            job_list.append(ClimatologicalPercentile(diags.data_manager, options['domain'], options['variable'],
                                                     options['start_year'], options['end_year'],
                                                     forecast_month, diags.config.experiment))
        return job_list

    def requested_startdates(self):
        return ['{0}{1:02}01'.format(year, self.forecast_month)
                for year in range(self.start_year, self.end_year + 1)]

    def request_data(self):
        for startdate in self.requested_startdates():
            Log.debug('Retrieving startdate {0}', startdate)
            self.leadtime_files[startdate] = self.request_chunk(self.domain, '{0}_dis'.format(self.variable),
                                                                startdate, None, None,
                                                                vartype=VariableType.STATISTIC)

    def declare_data_generated(self):
        var_name = '{0.variable}prct{0.start_year}{0.forecast_month:02d}-' \
                   '{0.end_year}{0.forecast_month:02d}'.format(self)
        self.percentiles_file = self.declare_chunk(self.domain, var_name, None, None, None,
                                                   frequency=Frequencies.climatology,
                                                   vartype=VariableType.STATISTIC)
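
    # The diagnostic runs in three steps (see compute below): the per-startdate
    # distribution histograms are loaded and summed over the reference period,
    # the requested percentiles are read off the cumulative distribution of every
    # grid point for each leadtime, and the per-leadtime results are merged into
    # a single cube along a leadtime coordinate before being stored.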
    def compute(self):
        """
        Runs the diagnostic
        """
        iris.FUTURE.netcdf_promote = True
        self._get_distribution()
        percentile_values = self._calculate_percentiles()
        self._save_results(percentile_values)

    def _save_results(self, percentile_values):
        temp = TempFile.get()
        percentile_coord = iris.coords.DimCoord(ClimatologicalPercentile.Percentiles, long_name='percentile')
        results = iris.cube.CubeList()
        for leadtime in percentile_values.keys():
            result = iris.cube.Cube(percentile_values[leadtime], var_name='percent', units=self.units)
            result.add_dim_coord(percentile_coord, 0)
            result.add_dim_coord(self.lat_coord, 1)
            result.add_dim_coord(self.lon_coord, 2)
            result.add_aux_coord(iris.coords.AuxCoord(np.int8(leadtime), long_name='leadtime'))
            results.append(result)
        iris.FUTURE.netcdf_no_unlimited = True
        iris.save(results.merge_cube(), temp, zlib=True)
        self.percentiles_file.set_local_file(temp, rename_var='percent')

    def _calculate_percentiles(self):
        Log.debug('Calculating percentiles')

        def calculate(point_distribution):
            # Read the requested percentiles off the cumulative distribution of one
            # grid point, reporting each as the midpoint of the bin it falls in
            cs = np.cumsum(point_distribution)
            total = cs[-1]
            percentile_values = ClimatologicalPercentile.Percentiles * total
            index = np.searchsorted(cs, percentile_values)
            return [(self._bins[i + 1] + self._bins[i]) / 2 for i in index]

        percentiles = {}
        for leadtime_slice in self.distribution.slices_over('leadtime'):
            leadtime = int(leadtime_slice.coord('leadtime').points[0])
            percentiles[leadtime] = np.apply_along_axis(calculate, 0, leadtime_slice.data)
        return percentiles

    def _get_distribution(self):
        for startdate, startdate_file in self.leadtime_files.items():
            Log.info('Getting data for startdate {0}', startdate)
            data_cube = iris.load_cube(startdate_file.local_file)
            if self.distribution is None:
                self.distribution = data_cube
            else:
                self.distribution += data_cube
        # Metadata needed to build the results cube. The distribution cubes are
        # assumed to have (leadtime, bin, latitude, longitude) dimensions, with
        # the 'bin' coordinate providing the bin values used for the lookup
        self.units = self.distribution.units
        self._bins = self.distribution.coord('bin').points
        self.lat_coord = self.distribution.coord('latitude')
        self.lon_coord = self.distribution.coord('longitude')
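

# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the diagnostic itself: how the percentile
# lookup in ClimatologicalPercentile._calculate_percentiles behaves for a
# single grid point. The histogram counts and bin values below are invented;
# in the real diagnostic they come from the '<variable>_dis' files requested
# in request_data.
# ---------------------------------------------------------------------------
def _example_percentile_lookup():
    counts = np.array([2., 5., 8., 4., 1.])           # histogram counts for one grid point
    bins = np.array([0., 1., 2., 3., 4., 5.])         # bin values (one more than counts)
    cs = np.cumsum(counts)                            # cumulative counts: [2, 7, 15, 19, 20]
    targets = ClimatologicalPercentile.Percentiles * cs[-1]
    indices = np.searchsorted(cs, targets)            # first bin reaching each target count
    # each percentile is reported as the midpoint of the selected bin interval
    return [(bins[i + 1] + bins[i]) / 2 for i in indices]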