from bscearth.utils.date import add_months, parse_date
from bscearth.utils.log import Log
from earthdiagnostics.diagnostic import Diagnostic, DiagnosticVariableOption, DiagnosticDomainOption, \
    DiagnosticListIntOption, DiagnosticIntOption
from earthdiagnostics.frequency import Frequencies
from earthdiagnostics.utils import Utils, TempFile
from earthdiagnostics.variable_type import VariableType
import iris
import iris.coord_categorisation
from iris.time import PartialDateTime
import iris.exceptions
import iris.coords
import numpy as np


class ClimatologicalPercentile(Diagnostic):
    """
    Calculates the climatological percentiles of the given variable

    :param data_manager: data management object
    :type data_manager: DataManager
    :param domain: variable domain
    :type domain: ModelingRealm
    :param variable: variable to compute the percentiles of
    :type variable: str
    :param num_bins: number of bins for the discretized distribution
    :type num_bins: int
    :param experiment_config: experiment configuration
    :type experiment_config: ExperimentConfig
    """

    alias = 'climpercent'
    "Diagnostic alias for the configuration file"

    def __init__(self, data_manager, domain, variable, num_bins, experiment_config):
        Diagnostic.__init__(self, data_manager)
        self.variable = variable
        self.domain = domain
        self.experiment_config = experiment_config
        self.num_bins = num_bins
        self.realizations = None
        self.lat_coord = None
        self.lon_coord = None
        self.units = None
        self.leadtime_files = {}
        self._bins = None
        self.percentiles = np.array([0.1, 0.25, 0.33, 0.5, 0.66, 0.75, 0.9])
        self.cmor_var = data_manager.variable_list.get_variable(variable, silent=True)
        if self.cmor_var and self.cmor_var.valid_max and self.cmor_var.valid_min:
            self.max_value = float(self.cmor_var.valid_max)
            self.min_value = float(self.cmor_var.valid_min)
            self.check_limit_values = False
        else:
            self.min_value = None
            self.max_value = None
            self.check_limit_values = True

    def __eq__(self, other):
        return self.domain == other.domain and self.variable == other.variable and self.num_bins == other.num_bins

    def __str__(self):
        return 'Climatological percentile Variable: {0}:{1} Bins: {2}'.format(self.domain, self.variable,
                                                                              self.num_bins)

    @classmethod
    def generate_jobs(cls, diags, options):
        """
        Creates the job needed to compute the diagnostic

        :param diags: Diagnostics manager class
        :type diags: Diags
        :param options: domain, variable and number of bins
        :type options: list[str]
        :return: list of jobs to run
        """
        options_available = (DiagnosticDomainOption(),
                             DiagnosticVariableOption(),
                             DiagnosticIntOption('bins', 2000))
        options = cls.process_options(options, options_available)
        job_list = list()
        job_list.append(ClimatologicalPercentile(diags.data_manager, options['domain'], options['variable'],
                                                 options['bins'], diags.config.experiment))
        return job_list

    def request_data(self):
        self.leadtime_files = {}
        for startdate, member in self.experiment_config.get_member_list():
            if startdate not in self.leadtime_files:
                self.leadtime_files[startdate] = {}
            Log.debug('Retrieving startdate {0}', startdate)
            self.leadtime_files[startdate][member] = self.request_chunk(self.domain, self.variable, startdate,
                                                                        member, 1)

    def declare_data_generated(self):
        var_name = self.variable + 'prct'
        self.percentiles_file = self.declare_chunk(self.domain, var_name, None, None, None,
                                                   frequency=Frequencies.climatology, vartype=VariableType.STATISTIC)

    def compute(self):
        """
        Runs the diagnostic
        """
        iris.FUTURE.netcdf_promote = True
        for startdate in self.leadtime_files.keys():
            Log.debug('Getting data for startdate {0}', startdate)
            for member in self.leadtime_files[startdate].keys():
                data_cube = self._load_cube(startdate, member)
                self._get_value_interval(data_cube)
                self._get_realizations_present(data_cube)
                if self.lat_coord is None:
                    self.units = data_cube.units
                    self.lat_coord = data_cube.coord('latitude')
                    self.lon_coord = data_cube.coord('longitude')

        distribution = self._get_distribution()
        percentile_values = self._calculate_percentiles(distribution)
        self._save_results(percentile_values)

    def _load_cube(self, startdate, member):
        date = parse_date(startdate)
        reference = PartialDateTime(year=date.year, month=date.month)
        handler = Utils.openCdf(self.leadtime_files[startdate][member].local_file)
        if 'realization' in handler.variables:
            handler.variables[self.variable].coordinates = 'realization'
        handler.close()
        data_cube = iris.load_cube(self.leadtime_files[startdate][member].local_file)

        def assign_leadtime(coord, x):
            # Leadtime expressed as whole months elapsed since the startdate
            time_point = coord.units.num2date(x)
            return (time_point.year - reference.year) * 12 + time_point.month - reference.month

        iris.coord_categorisation.add_categorised_coord(data_cube, 'leadtime', 'time', assign_leadtime)
        return data_cube

    def _save_results(self, percentile_values):
        temp = TempFile.get()
        results = iris.cube.Cube(percentile_values, var_name='percent', units=self.units)
        percentile_coord = iris.coords.DimCoord(self.percentiles, long_name='percentile')
        results.add_dim_coord(percentile_coord, 0)
        results.add_dim_coord(self.lat_coord, 1)
        results.add_dim_coord(self.lon_coord, 2)
        iris.FUTURE.netcdf_no_unlimited = True
        iris.save(results, temp, zlib=True)
        self.percentiles_file.set_local_file(temp, rename_var='percent')

    def _calculate_percentiles(self, distribution):
        Log.debug('Calculating percentiles')

        def calculate(point_distribution):
            # Invert the cumulative histogram to locate the bin holding each percentile
            cs = np.cumsum(point_distribution)
            total = cs[-1]
            percentile_values = self.percentiles * total
            index = np.searchsorted(cs, percentile_values)
            return [(self._bins[i + 1] + self._bins[i]) / 2 for i in index]

        return np.apply_along_axis(calculate, 0, distribution)
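    # Worked example for _calculate_percentiles above (illustrative numbers, not from
    # the original code): if a grid point's histogram over 4 bins is [2, 5, 2, 1], the
    # cumulative sum is [2, 7, 9, 10] and the total is 10. The 0.5 percentile target is
    # 5.0, np.searchsorted([2, 7, 9, 10], 5.0) returns 1, so the median is reported as
    # the midpoint of the second bin, (self._bins[1] + self._bins[2]) / 2.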
    def _get_distribution(self):
        distribution = None
        for startdate in self.leadtime_files.keys():
            Log.debug('Getting data for startdate {0}', startdate)
            for member in self.leadtime_files[startdate].keys():
                data_cube = self._load_cube(startdate, member)
                Log.debug('Discretizing file {0}', data_cube)
                for leadtime in data_cube.coord('leadtime').points:
                    leadtime_cube = data_cube.extract(iris.Constraint(leadtime=leadtime))
                    for realization in range(self.realizations):
                        Log.debug('Discretizing realization {0}', realization)
                        try:
                            realization_cube = leadtime_cube.extract(iris.Constraint(realization=realization + 1))
                        except iris.exceptions.CoordinateNotFoundError:
                            realization_cube = leadtime_cube
                        if realization_cube is None and realization == 0:
                            realization_cube = leadtime_cube
                        if distribution is None:
                            distribution = self._calculate_distribution(realization_cube)
                        else:
                            distribution += self._calculate_distribution(realization_cube)
        return distribution

    def _get_realizations_present(self, data_cube):
        realizations = 1
        ensemble_dimensions = ('realization', 'ensemble')
        for ensemble_dimension in ensemble_dimensions:
            try:
                realizations = data_cube.coord(ensemble_dimension).shape[0]
                break
            except iris.exceptions.CoordinateNotFoundError:
                pass

        if self.realizations is None:
            self.realizations = realizations
        elif realizations != self.realizations:
            # Data sets disagree on the ensemble size: keep only the common realizations
            self.realizations = min(self.realizations, realizations)
            Log.warning('Different number of realizations in the data used by diagnostic {0}', self)

    def _get_value_interval(self, data_cube):
        if not self.check_limit_values:
            # Valid range already taken from the CMOR variable definition
            return
        file_max = np.amax(data_cube.data)
        file_min = np.amin(data_cube.data)
        if self.max_value is None:
            self.max_value = file_max
        else:
            self.max_value = max(self.max_value, file_max)
        if self.min_value is None:
            self.min_value = file_min
        else:
            self.min_value = min(self.min_value, file_min)

    def _calculate_distribution(self, data_cube):
        def calculate_histogram(time_series):
            histogram, self._bins = np.histogram(time_series, bins=self.num_bins,
                                                 range=(self.min_value, self.max_value))
            return histogram

        return np.apply_along_axis(calculate_histogram, 0, data_cube.data)
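    # Shape summary (descriptive note, derived from the code above): _calculate_distribution
    # turns a (time, lat, lon) cube into a (num_bins, lat, lon) histogram by applying
    # np.histogram along the time axis; _get_distribution accumulates those histograms over
    # startdates, members, leadtimes and realizations; _calculate_percentiles then collapses
    # the bin axis into one value per requested percentile, yielding the
    # (percentile, lat, lon) array that _save_results writes to disk.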