climatologicalpercentile.py 7.88 KB
Newer Older
# coding=utf-8
from autosubmit.config.log import Log

from earthdiagnostics.diagnostic import Diagnostic, DiagnosticOption, DiagnosticDomainOption, DiagnosticListIntOption, \
    DiagnosticIntOption
from earthdiagnostics.frequency import Frequencies
from earthdiagnostics.utils import Utils, TempFile
from earthdiagnostics.variable_type import VariableType
import numpy as np


class ClimatologicalPercentile(Diagnostic):
    """
    Calculates the climatological percentiles for the given leadtimes

    :param data_manager: data management object
    :type data_manager: DataManager
    :param variable: variable to average
    :type variable: str
    :param experiment_config:
    :type experiment_config: ExperimentConfig
    """

    alias = 'climpercent'
    "Diagnostic alias for the configuration file"

    def __init__(self, data_manager, domain, variable, leadtimes, num_bins, experiment_config):
        Diagnostic.__init__(self, data_manager)
        self.variable = variable
        self.domain = domain
        self.leadtimes = leadtimes
        self.experiment_config = experiment_config
        self.realizations = None
        self.lat_len = None
        self.lon_len = None
        self.percentiles = np.array([0.1, 0.25, 0.33, 0.5, 0.66, 0.75, 0.9])
        self.cmor_var = data_manager.variable_list.get_variable(variable, silent=True)
        if self.cmor_var and self.cmor_var.valid_max and self.cmor_var.valid_min:
            self.max_value = float(self.cmor_var.valid_max)
            self.min_value = float(self.cmor_var.valid_min)
            self.check_limit_values = False
        else:
            self.min_value = None
            self.max_value = None
            self.check_limit_values = True

    def __eq__(self, other):
        return self.domain == other.domain and self.variable == other.variable and self.leadtimes == other.leadtimes

    def __str__(self):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        return 'Climatological percentile Variable: {0}:{1} Leadtimes: {2} ' \
               'Bins: {3}'.format(self.domain, self.variable, self.leadtimes, self.num_bins)

    @classmethod
    def generate_jobs(cls, diags, options):
        """
        Creates a job for each chunk to compute the diagnostic

        :param diags: Diagnostics manager class
        :type diags: Diags
        :param options: domain, variable, percentil number, maximum depth (level)
        :type options: list[str]
        :return:
        """
        options_available = (DiagnosticDomainOption('domain'),
                             DiagnosticOption('variable'),
                             DiagnosticListIntOption('leadtimes'),
                             DiagnosticIntOption('bins', 2000))
        options = cls.process_options(options, options_available)
        job_list.append(ClimatologicalPercentile(diags.data_manager, options['domain'], options['variable'],
                                                 options['leadtimes'], options['bins'],
                                                 diags.config.experiment))
        return job_list

    def compute(self):
        """
        Runs the diagnostic
        """
        distribution = self._get_distribution(member_files)
        percentile_values = self._calculate_percentiles(distribution)
    def _save_results(self, percentile_values):
        temp = TempFile.get()
        handler = Utils.openCdf(temp, 'w')

        handler.createDimension('percentile', len(self.percentiles))
        percentile_var = handler.createVariable('percentile', float, ('percentile',))
        percentile_var[:] = self.percentiles

        handler.createDimension('lat', self.lat.size)
        lat_var = handler.createVariable('lat', float, ('lat',))
        lat_var[:] = self.lat

        handler.createDimension('lon', self.lon.size)
        lon_var = handler.createVariable('lon', float, ('lon',))
        lon_var[:] = self.lon

        percentile_var = handler.createVariable('percent', float, ('percentile', 'lat', 'lon'))
        percentile_var[...] = percentile_values
        self.send_file(temp, self.domain, self.variable + '_percentiles', None, None, frequency=Frequencies.climatology,
                       rename_var='percent', vartype=VariableType.STATISTIC)
    def _calculate_percentiles(self, distribution):
        Log.debug('Calculating percentiles')

            cs = np.cumsum(point_distribution)
            total = cs[-1]
            percentile_values = self.percentiles * total
            index = np.searchsorted(cs, percentile_values)
            return [(self._bins[i + 1] + self._bins[i]) / 2 for i in index]

        distribution = np.apply_along_axis(calculate, 0, distribution)
        return distribution

    def _get_distribution(self, member_files):
        distribution = None
        for memberfile in member_files:
            Log.debug('Discretizing file {0}', memberfile)
            handler = Utils.openCdf(memberfile)
            for realization in range(self.realizations):
                if distribution is None:
                    distribution = self._calculate_distribution(handler, realization)
                else:
                    distribution += self._calculate_distribution(handler, realization)
            handler.close()
        return distribution

    def _get_data(self):
        member_files = list()
        for startdate, member in self.experiment_config.get_member_list():
            Log.debug('Retrieving startdate {0}', startdate)
            memberfile = self.data_manager.get_leadtimes(self.domain, self.variable, startdate, member, self.leadtimes)

            Log.debug('Getting data for startdate {0}', startdate)
            handler = Utils.openCdf(memberfile)
            self._get_value_interval(handler)
            self._get_realizations_present(handler)
            self._get_var_size(handler)
            handler.close()

            member_files.append(memberfile)
        return member_files

    def _get_realizations_present(self, handler):
        realizations = 1
        if 'realization' in handler.dimensions:
            realizations = handler.dimensions['realization'].size
        if 'ensemble' in handler.dimensions:
            realizations = handler.dimensions['ensemble'].size
        if self.realizations is None:
            self.realizations = realizations
        if realizations != self.realizations:
            self.realizations = min(self.realizations, realizations)
            Log.warning('Different number of realizations in the data used by diagnostic {0}', self)

    def _get_value_interval(self, handler):
        values = handler.variables[self.variable][:]
        file_max = np.amax(values)
        file_min = np.amin(values)
        self.max_value = max(self.min_value, file_max)
        if self.min_value is None:
            self.min_value = file_min
        else:
            self.min_value = min(self.min_value, file_min)

    def _calculate_distribution(self, handler, realization):
        Log.debug('Discretizing realization {0}', realization)

        def calculate_histogram(time_series):
            histogram, self._bins = np.histogram(time_series, bins=self.num_bins,
                                                 range=(self.min_value, self.max_value))

        var = handler.variables[self.variable]
        if 'realization' in var.dimensions or 'ensemble' in var.dimensions:
            return np.apply_along_axis(calculate_histogram, 0, var[:, realization, ...])
        else:
            return np.apply_along_axis(calculate_histogram, 0, var[:])

    def _get_var_size(self, handler):
        if self.lat_len is not None:
            return
        self.lat = handler.variables['latitude'][:]
        self.lon = handler.variables['longitude'][:]