# coding=utf-8
import math

import iris
import iris.coord_categorisation
import iris.coords
import iris.cube
import iris.exceptions
import numpy as np
import six
from bscearth.utils.date import parse_date, add_months
from bscearth.utils.log import Log
from iris.time import PartialDateTime

from earthdiagnostics.diagnostic import Diagnostic, DiagnosticVariableOption, DiagnosticDomainOption, \
    DiagnosticIntOption, DiagnosticListIntOption, DiagnosticFloatOption
from earthdiagnostics.frequency import Frequencies
from earthdiagnostics.utils import Utils, TempFile
from earthdiagnostics.variable_type import VariableType


class ClimatologicalPercentile(Diagnostic):
    """
    Calculates the climatological percentiles for the given leadtime

    :param data_manager: data management object
    :type data_manager: DataManager
    :param variable: variable to average
    :type variable: str
    :param experiment_config: experiment configuration
    :type experiment_config: ExperimentConfig
    """

    alias = 'climpercent'
    "Diagnostic alias for the configuration file"

    Percentiles = np.array([0.1, 0.25, 0.33, 0.5, 0.66, 0.75, 0.9])

    def __init__(self, data_manager, domain, variable, num_bins, start_year, end_year,
                 min_value, max_value, forecast_month, experiment_config):
        Diagnostic.__init__(self, data_manager)
        self.variable = variable
        self.domain = domain
        self.experiment_config = experiment_config
        self.realizations = None
        self.lat_coord = None
        self.lon_coord = None
        self.num_bins = num_bins
        self._bins = None
        self.start_year = start_year
        self.end_year = end_year
        self.forecast_month = forecast_month
        self.cmor_var = data_manager.variable_list.get_variable(variable, silent=True)

        # Resolve the histogram's lower bound: an explicit option wins, then the
        # CMOR variable's valid_min; otherwise it must be scanned from the data.
        if not math.isnan(min_value):
            self.min_value = min_value
            self.check_min_value = False
        elif self.cmor_var and self.cmor_var.valid_min:
            self.min_value = float(self.cmor_var.valid_min)
            self.check_min_value = False
        else:
            self.min_value = None
            self.check_min_value = True

        # Same precedence for the upper bound, using valid_max.
        if not math.isnan(max_value):
            self.max_value = max_value
            self.check_max_value = False
        elif self.cmor_var and self.cmor_var.valid_max:
            self.max_value = float(self.cmor_var.valid_max)
            self.check_max_value = False
        else:
            self.max_value = None
            self.check_max_value = True

    def __eq__(self, other):
        return self.domain == other.domain and self.variable == other.variable and self.num_bins == other.num_bins

    def __str__(self):
        return 'Climatological percentile Variable: {0}:{1} ' \
               'Bins: {2}'.format(self.domain, self.variable, self.num_bins)

    @classmethod
    def generate_jobs(cls, diags, options):
        """
        Creates a job for each requested forecast month to compute the diagnostic

        :param diags: Diagnostics manager class
        :type diags: Diags
        :param options: domain, variable, start year, end year, forecast months and, optionally,
                        number of bins and value range
        :type options: list[str]
        :return: list of jobs to run
        :rtype: list[ClimatologicalPercentile]
        """
        options_available = (DiagnosticDomainOption(),
                             DiagnosticVariableOption(),
                             DiagnosticIntOption('start_year'),
                             DiagnosticIntOption('end_year'),
                             DiagnosticListIntOption('forecast_month'),
                             DiagnosticIntOption('bins', 2000),
                             DiagnosticFloatOption('min_value', float('nan')),
                             DiagnosticFloatOption('max_value', float('nan')))
        options = cls.process_options(options, options_available)

        job_list = list()
        for forecast_month in options['forecast_month']:
            job_list.append(ClimatologicalPercentile(diags.data_manager, options['domain'], options['variable'],
                                                     options['bins'], options['start_year'], options['end_year'],
                                                     options['min_value'], options['max_value'], forecast_month,
                                                     diags.config.experiment))
        return job_list

    def requested_startdates(self):
        return ['{0}{1:02}01'.format(year, self.forecast_month)
                for year in range(self.start_year, self.end_year + 1)]
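    # Illustrative example (the values are hypothetical, not from the source):
    # with start_year=2000, end_year=2002 and forecast_month=2,
    # requested_startdates() returns ['20000201', '20010201', '20020201'],
    # i.e. one startdate per year of the climatology, all initialised in the
    # same calendar month.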
    def request_data(self):
        self.leadtime_files = {}
        for startdate in self.requested_startdates():
            Log.debug('Retrieving startdate {0}', startdate)
            self.leadtime_files[startdate] = self.request_chunk(self.domain, self.variable, startdate, None, None)

    def declare_data_generated(self):
        var_name = '{0.variable}prct{0.start_year}{0.forecast_month}-{0.end_year}{0.forecast_month}'.format(self)
        self.percentiles_file = self.declare_chunk(self.domain, var_name, None, None, None,
                                                   frequency=Frequencies.climatology,
                                                   vartype=VariableType.STATISTIC)

    def compute(self):
        """
        Runs the diagnostic
        """
        iris.FUTURE.netcdf_promote = True
        for startdate in self.leadtime_files.keys():
            Log.debug('Getting data for startdate {0}', startdate)
            data_cube = self._load_cube(startdate)
            self._get_value_interval(data_cube)
            self._get_realizations_present(data_cube)
            if self.lat_coord is None:
                self.units = data_cube.units
                self.lat_coord = data_cube.coord('latitude')
                self.lon_coord = data_cube.coord('longitude')
        Log.info('Range: [{0}, {1}]', self.min_value, self.max_value)
        distribution = self._get_distribution()
        percentile_values = self._calculate_percentiles(distribution)
        self._save_results(percentile_values)

    def _load_cube(self, startdate):
        # Mark the realization variable as a coordinate so iris attaches it
        # to the cube as an auxiliary coordinate.
        handler = Utils.openCdf(self.leadtime_files[startdate].local_file)
        if 'realization' in handler.variables:
            handler.variables[self.variable].coordinates = 'realization'
        handler.close()
        data_cube = iris.load_cube(self.leadtime_files[startdate].local_file)

        date = parse_date(startdate)
        lead_date = add_months(date, 1, self.data_manager.config.experiment.calendar)
        leadtimes = {1: PartialDateTime(lead_date.year, lead_date.month, lead_date.day)}

        def assign_leadtime(coord, x):
            # Map each time point to its leadtime month, extending the
            # leadtimes dictionary lazily as later months appear.
            leadtime_month = 1
            partial_date = leadtimes[leadtime_month]
            while coord.units.num2date(x) >= partial_date:
                leadtime_month += 1
                try:
                    partial_date = leadtimes[leadtime_month]
                except KeyError:
                    new_date = add_months(date, leadtime_month, self.data_manager.config.experiment.calendar)
                    partial_date = PartialDateTime(new_date.year, new_date.month, new_date.day)
                    leadtimes[leadtime_month] = partial_date
            return leadtime_month

        iris.coord_categorisation.add_categorised_coord(data_cube, 'leadtime', 'time', assign_leadtime)
        return data_cube

    def _save_results(self, percentile_values):
        temp = TempFile.get()
        percentile_coord = iris.coords.DimCoord(ClimatologicalPercentile.Percentiles, long_name='percentile')
        results = iris.cube.CubeList()
        for leadtime in percentile_values.keys():
            result = iris.cube.Cube(percentile_values[leadtime], var_name='percent', units=self.units)
            result.add_dim_coord(percentile_coord, 0)
            result.add_dim_coord(self.lat_coord, 1)
            result.add_dim_coord(self.lon_coord, 2)
            result.add_aux_coord(iris.coords.AuxCoord(np.int8(leadtime), long_name='leadtime'))
            results.append(result)
        iris.FUTURE.netcdf_no_unlimited = True
        iris.save(results.merge_cube(), temp, zlib=True)
        self.percentiles_file.set_local_file(temp, rename_var='percent')

    def _calculate_percentiles(self, distribution):
        Log.debug('Calculating percentiles')
        percentiles = {}

        def calculate(point_distribution):
            # Recover percentile values from the accumulated histogram: find
            # the first bin whose cumulative count reaches each requested
            # fraction of the total and report that bin's midpoint.
            cs = np.cumsum(point_distribution)
            total = cs[-1]
            percentile_values = ClimatologicalPercentile.Percentiles * total
            index = np.searchsorted(cs, percentile_values)
            return [(self._bins[i + 1] + self._bins[i]) / 2 for i in index]

        for leadtime, dist in six.iteritems(distribution):
            Log.debug('Calculating leadtime {0}', leadtime)
            percentiles[leadtime] = np.apply_along_axis(calculate, 0, dist)
        return percentiles
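    # A minimal sketch of the percentile recovery above, with made-up inputs
    # (`sample`, `targets` and the bin count are illustrative, not part of the
    # diagnostic). The cumulative sum of the histogram counts gives how many
    # values fall below each bin edge; searchsorted finds the first bin whose
    # cumulative count reaches the requested fraction of the total, and the
    # bin midpoint stands in for the exact percentile value:
    #
    #     sample = np.random.rand(10000)
    #     counts, bins = np.histogram(sample, bins=100, range=(0.0, 1.0))
    #     cumulative = np.cumsum(counts)
    #     targets = np.array([0.1, 0.5, 0.9]) * cumulative[-1]
    #     indices = np.searchsorted(cumulative, targets)
    #     estimates = [(bins[i] + bins[i + 1]) / 2 for i in indices]
    #     # estimates approximate [0.1, 0.5, 0.9], accurate to the bin width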
    def _get_distribution(self):
        distribution = {}
        for startdate in self.leadtime_files:
            Log.debug('Getting data for startdate {0}', startdate)
            data_cube = self._load_cube(startdate)
            Log.debug('Discretizing file {0}', data_cube)
            for leadtime in set(data_cube.coord('leadtime').points):
                Log.debug('Discretizing leadtime {0}', leadtime)
                leadtime_cube = data_cube.extract(iris.Constraint(leadtime=leadtime))
                for realization in range(self.realizations):
                    Log.debug('Discretizing realization {0}', realization)
                    try:
                        realization_cube = leadtime_cube.extract(iris.Constraint(realization=realization + 1))
                    except iris.exceptions.CoordinateNotFoundError:
                        realization_cube = leadtime_cube
                    if realization_cube is None and realization == 0:
                        realization_cube = leadtime_cube
                    # Histograms are additive, so members and startdates can be
                    # accumulated bin by bin into a single distribution.
                    if leadtime not in distribution:
                        distribution[leadtime] = self._calculate_distribution(realization_cube)
                    else:
                        distribution[leadtime] += self._calculate_distribution(realization_cube)
        return distribution

    def _get_realizations_present(self, data_cube):
        realizations = 1
        ensemble_dimensions = ('realization', 'ensemble')
        for ensemble_dimension in ensemble_dimensions:
            try:
                realizations = data_cube.coord(ensemble_dimension).shape[0]
                break
            except iris.exceptions.CoordinateNotFoundError:
                pass
        if self.realizations is None:
            self.realizations = realizations
        if realizations != self.realizations:
            # Keep the common subset of members so every startdate contributes
            # the same number of realizations.
            self.realizations = min(self.realizations, realizations)
            Log.warning('Different number of realizations in the data used by diagnostic {0}', self)

    def _get_value_interval(self, data_cube):
        if self.check_min_value:
            if self.check_max_value:
                # Neither bound is known: scan the data once and track both extremes.
                for time_slice in data_cube.slices_over('time'):
                    file_min = np.amin(time_slice.data)
                    file_max = np.amax(time_slice.data)
                    if self.min_value is None or file_min < self.min_value:
                        self.min_value = file_min
                    if self.max_value is None or file_max > self.max_value:
                        self.max_value = file_max
            else:
                # Only the minimum is unknown.
                for time_slice in data_cube.slices_over('time'):
                    file_min = np.amin(time_slice.data)
                    if self.min_value is None or file_min < self.min_value:
                        self.min_value = file_min
        elif self.check_max_value:
            # Only the maximum is unknown.
            for time_slice in data_cube.slices_over('time'):
                file_max = np.amax(time_slice.data)
                if self.max_value is None or file_max > self.max_value:
                    self.max_value = file_max

    def _calculate_distribution(self, data_cube):
        def calculate_histogram(time_series):
            histogram, self._bins = np.histogram(time_series, bins=self.num_bins,
                                                 range=(self.min_value, self.max_value))
            return histogram

        return np.apply_along_axis(calculate_histogram, 0, data_cube.data)
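# Illustrative configuration sketch (the option values are hypothetical): the
# diagnostic is requested through its alias followed by the options parsed in
# generate_jobs(), in order: domain, variable, start_year, end_year,
# forecast_month list and, optionally, bins, min_value and max_value. An entry
# such as
#
#     climpercent,atmos,sfcWind,2000,2005,2
#
# would create one job computing, from the February startdates of 2000-2005,
# the climatological percentiles of sfcWind over a 2000-bin histogram whose
# range comes from the variable's CMOR valid_min/valid_max or, failing that,
# from a scan of the data itself.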