climatologicalpercentile.py 12.1 KB
Newer Older
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
import six
from bscearth.utils.date import parse_date, add_months
from bscearth.utils.log import Log
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
from earthdiagnostics.diagnostic import Diagnostic, DiagnosticVariableOption, DiagnosticDomainOption, \
    DiagnosticIntOption, DiagnosticListIntOption, DiagnosticFloatOption
from earthdiagnostics.frequency import Frequencies
from earthdiagnostics.utils import Utils, TempFile
from earthdiagnostics.variable_type import VariableType
import iris
import iris.coord_categorisation
from iris.time import PartialDateTime
import iris.exceptions
import iris.coords


class ClimatologicalPercentile(Diagnostic):
    """
    Calculates the climatological percentiles for the given leadtime

    :param data_manager: data management object
    :type data_manager: DataManager
    :param variable: variable to average
    :type variable: str
    :param experiment_config:
    :type experiment_config: ExperimentConfig
    """

    alias = 'climpercent'
    "Diagnostic alias for the configuration file"

Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    Percentiles = np.array([0.1, 0.25, 0.33, 0.5, 0.66, 0.75, 0.9])

    def __init__(self, data_manager, domain, variable, num_bins, start_year, end_year, min_value, max_value,
                 forecast_month, experiment_config):
        Diagnostic.__init__(self, data_manager)
        self.variable = variable
        self.domain = domain
        self.experiment_config = experiment_config
        self.lat_coord = None
        self.lon_coord = None
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self.start_year = start_year
        self.end_year = end_year
        self.forecast_month = forecast_month
        self.cmor_var = data_manager.variable_list.get_variable(variable, silent=True)

        if not math.isnan(min_value):
            self.min_value = min_value
            self.check_min_value = False
        elif self.cmor_var and self.cmor_var.valid_min:
            self.min_value = float(self.cmor_var.valid_min)
            self.check_min_value = True

        if not math.isnan(max_value):
            self.max_value = max_value
            self.check_max_value = False
        elif self.cmor_var and self.cmor_var.valid_min:
            self.max_value = float(self.cmor_var.valid_max)
            self.check_max_value = False
        else:
        return self.domain == other.domain and self.variable == other.variable and self.num_bins == other.num_bins
        return 'Climatological percentile Variable: {0}:{1} ' \
               'Bins: {2}'.format(self.domain, self.variable, self.num_bins)

    @classmethod
    def generate_jobs(cls, diags, options):
        """
        Creates a job for each chunk to compute the diagnostic

        :param diags: Diagnostics manager class
        :type diags: Diags
        :param options: domain, variable, percentil number, maximum depth (level)
        :type options: list[str]
        :return:
        """
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        options_available = (DiagnosticDomainOption(),
                             DiagnosticVariableOption(),
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                             DiagnosticIntOption('start_year'),
                             DiagnosticIntOption('end_year'),
                             DiagnosticListIntOption('forecast_month'),
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                             DiagnosticIntOption('bins', 2000),
                             DiagnosticFloatOption('min_value', float('nan')),
                             DiagnosticFloatOption('max_value', float('nan')),
        options = cls.process_options(options, options_available)
        for forecast_month in options['forecast_month']:
            job_list.append(ClimatologicalPercentile(diags.data_manager, options['domain'], options['variable'],
                                                     options['bins'], options['start_year'], options['end_year'],
                                                     forecast_month, diags.config.experiment))
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    def requested_startdates(self):
        return ['{0}{1:02}01'.format(year, self.forecast_month) for year in range(self.start_year, self.end_year+1)]

Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    def request_data(self):
        self.leadtime_files = {}
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        for startdate in self.requested_startdates():
            if startdate not in self.leadtime_files:
                self.leadtime_files[startdate] = {}
            Log.debug('Retrieving startdate {0}', startdate)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            self.leadtime_files[startdate] = self.request_chunk(self.domain, self.variable, startdate,  None, None)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

    def declare_data_generated(self):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        var_name = '{0.variable}prct{0.start_year}{0.forecast_month}-{0.end_year}{0.forecast_month}'.format(self)
        self.percentiles_file = self.declare_chunk(self.domain, var_name, None, None, None,
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                                                   frequency=Frequencies.climatology, vartype=VariableType.STATISTIC)

    def compute(self):
        """
        Runs the diagnostic
        """
        iris.FUTURE.netcdf_promote = True
        for startdate in self.leadtime_files.keys():
            Log.debug('Getting data for startdate {0}', startdate)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            data_cube = self._load_cube(startdate)
            self._get_value_interval(data_cube)
            self._get_realizations_present(data_cube)
            if self.lat_coord is None:
                self.units = data_cube.units
                self.lat_coord = data_cube.coord('latitude')
                self.lon_coord = data_cube.coord('longitude')
        Log.info('Range: [{0}, {1}]', self.min_value, self.max_value)
        distribution = self._get_distribution()
        percentile_values = self._calculate_percentiles(distribution)
        self._save_results(percentile_values)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    def _load_cube(self, startdate):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        handler = Utils.openCdf(self.leadtime_files[startdate].local_file)
        if 'realization' in handler.variables:
            handler.variables[self.variable].coordinates = 'realization'
        handler.close()
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        data_cube = iris.load_cube(self.leadtime_files[startdate].local_file)
        date = parse_date(startdate)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        lead_date = add_months(date, 1, self.data_manager.config.experiment.calendar)
        leadtimes = {1: PartialDateTime(lead_date.year, lead_date.month, lead_date.day)}

        def assign_leadtime(coord, x):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            leadtime_month = 1
            partial_date = leadtimes[leadtime_month]
            while coord.units.num2date(x) >= partial_date:
                leadtime_month += 1
                try:
                    partial_date = leadtimes[leadtime_month]
                except KeyError:
                    new_date = add_months(date, leadtime_month, self.data_manager.config.experiment.calendar)
                    partial_date = PartialDateTime(new_date.year, new_date.month, new_date.day)
                    leadtimes[leadtime_month] = partial_date
            return leadtime_month

        iris.coord_categorisation.add_categorised_coord(data_cube, 'leadtime', 'time', assign_leadtime)
        return data_cube
    def _save_results(self, percentile_values):
        temp = TempFile.get()
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        percentile_coord = iris.coords.DimCoord(ClimatologicalPercentile.Percentiles, long_name='percentile')
        results = iris.cube.CubeList()
        for leadtime in percentile_values.keys():
            result = iris.cube.Cube(percentile_values[leadtime], var_name='percent', units=self.units)
            result.add_dim_coord(percentile_coord, 0)
            result.add_dim_coord(self.lat_coord, 1)
            result.add_dim_coord(self.lon_coord, 2)
            result.add_aux_coord(iris.coords.AuxCoord(np.int8(leadtime), long_name='leadtime'))
            results.append(result)
        iris.FUTURE.netcdf_no_unlimited = True
        iris.save(results.merge_cube(), temp, zlib=True)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self.percentiles_file.set_local_file(temp, rename_var='percent')
    def _calculate_percentiles(self, distribution):
        Log.debug('Calculating percentiles')
        percentiles = {}
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

            cs = np.cumsum(point_distribution)
            total = cs[-1]
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            percentile_values = ClimatologicalPercentile.Percentiles * total
            index = np.searchsorted(cs, percentile_values)
            return [(self._bins[i + 1] + self._bins[i]) / 2 for i in index]
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        for leadtime, dist in six.iteritems(distribution):
            Log.debug('Calculating leadtime {0}', leadtime)
            percentiles[leadtime] = np.apply_along_axis(calculate, 0, dist)
        return percentiles
    def _get_distribution(self):
        distribution = {}
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        for startdate in self.leadtime_files:
            Log.debug('Getting data for startdate {0}', startdate)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            data_cube = self._load_cube(startdate)
            Log.debug('Discretizing file {0}', data_cube)

            for leadtime in set(data_cube.coord('leadtime').points):
                Log.debug('Discretizing leadtime {0}', leadtime)
                leadtime_cube = data_cube.extract(iris.Constraint(leadtime=leadtime))
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                for realization in range(self.realizations):
                    Log.debug('Discretizing realization {0}', realization)
                    try:
                        realization_cube = leadtime_cube.extract(iris.Constraint(realization=realization+1))
                    except iris.exceptions.CoordinateNotFoundError:
                        realization_cube = leadtime_cube
                    if realization_cube is None and realization == 0:
                        realization_cube = leadtime_cube
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                    if leadtime not in distribution:
                        distribution[leadtime] = self._calculate_distribution(realization_cube)
                    else:
                        distribution[leadtime] += self._calculate_distribution(realization_cube)
        return distribution
    def _get_realizations_present(self, data_cube):
        ensemble_dimensions = ('realization', 'ensemble')
        for ensemble_dimension in ensemble_dimensions:
            try:
                realizations = data_cube.coord(ensemble_dimension).shape[0]
                break
            except iris.exceptions.CoordinateNotFoundError:
                pass

        if self.realizations is None:
            self.realizations = realizations
        if realizations != self.realizations:
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            # noinspection PyNoneFunctionAssignment
            self.realizations = min(self.realizations, realizations)
            Log.warning('Different number of realizations in the data used by diagnostic {0}', self)

    def _get_value_interval(self, data_cube):
        if self.check_min_value:
            if self.check_max_value:
                for time_slice in data_cube.slices_over('time'):
                    for value in time_slice.data.flat:
                        if value < self.min_value:
                            self.min_value = value
                        if value > self.max_value:
                            self.max_value = value
                for time_slice in data_cube.slices_over('time'):
                    file_min = np.amin(time_slice.data)
                    if self.min_value is None:
                        self.min_value = file_min
                    self.max_value = min(self.min_value, file_min)
        else:
            if self.check_max_value:
                for time_slice in data_cube.slices_over('time'):
                    file_max = np.amax(time_slice.data)
                    self.max_value = max(self.min_value, file_max)
    def _calculate_distribution(self, data_cube):
        def calculate_histogram(time_series):
            histogram, self._bins = np.histogram(time_series, bins=self.num_bins,
                                                 range=(self.min_value, self.max_value))
        return np.apply_along_axis(calculate_histogram, 0, data_cube.data)