# coding=utf-8
from bscearth.utils.date import add_months, parse_date
from bscearth.utils.log import Log
from earthdiagnostics.diagnostic import Diagnostic, DiagnosticVariableOption, DiagnosticDomainOption, \
DiagnosticListIntOption, DiagnosticIntOption
from earthdiagnostics.frequency import Frequencies
from earthdiagnostics.utils import Utils, TempFile
from earthdiagnostics.variable_type import VariableType
import numpy as np
import iris
import iris.coord_categorisation
from iris.time import PartialDateTime
import iris.exceptions
import iris.coords
class ClimatologicalPercentile(Diagnostic):
    """
    Calculates the climatological percentiles for the given variable

    Histograms are accumulated per leadtime over all startdates, members and
    realizations, and the requested percentiles are then read off the
    cumulative distribution.

    :param data_manager: data management object
    :type data_manager: DataManager
    :param domain: variable domain
    :type domain: ModelingRealm
    :param variable: variable to compute the percentiles of
    :type variable: str
    :param num_bins: number of bins to use for the histograms
    :type num_bins: int
    :param experiment_config:
    :type experiment_config: ExperimentConfig
    """

    alias = 'climpercent'
    "Diagnostic alias for the configuration file"

    def __init__(self, data_manager, domain, variable, num_bins, experiment_config):
        Diagnostic.__init__(self, data_manager)
        self.variable = variable
        self.domain = domain
        self.experiment_config = experiment_config
        self.realizations = None
        self.lat_coord = None
        self.lon_coord = None
        self.num_bins = num_bins
        self._bins = None
        self.units = None
        self.percentiles = np.array([0.1, 0.25, 0.33, 0.5, 0.66, 0.75, 0.9])

        self.cmor_var = data_manager.variable_list.get_variable(variable, silent=True)
        # If CMOR metadata declares valid bounds, use them directly; otherwise
        # the histogram range must be discovered from the data itself.
        if self.cmor_var and self.cmor_var.valid_max and self.cmor_var.valid_min:
            self.max_value = float(self.cmor_var.valid_max)
            self.min_value = float(self.cmor_var.valid_min)
            self.check_limit_values = False
        else:
            self.min_value = None
            self.max_value = None
            self.check_limit_values = True

    def __eq__(self, other):
        # NOTE: compares the parameters that define the computation; the
        # original compared a non-existent self.leadtime attribute.
        return self.domain == other.domain and self.variable == other.variable and \
            self.num_bins == other.num_bins

    def __str__(self):
        return 'Climatological percentile Variable: {0}:{1} ' \
               'Bins: {2}'.format(self.domain, self.variable, self.num_bins)

    @classmethod
    def generate_jobs(cls, diags, options):
        """
        Creates a job to compute the diagnostic

        :param diags: Diagnostics manager class
        :type diags: Diags
        :param options: domain, variable, number of bins
        :type options: list[str]
        :return: list containing the generated job
        :rtype: list[ClimatologicalPercentile]
        """
        options_available = (DiagnosticDomainOption(),
                             DiagnosticVariableOption(),
                             DiagnosticIntOption('bins', 2000))
        options = cls.process_options(options, options_available)
        job_list = list()
        job_list.append(ClimatologicalPercentile(diags.data_manager, options['domain'], options['variable'],
                                                 options['bins'], diags.config.experiment))
        return job_list

    def request_data(self):
        """Requests the first chunk for every startdate and member"""
        self.leadtime_files = {}
        for startdate, member in self.experiment_config.get_member_list():
            if startdate not in self.leadtime_files:
                self.leadtime_files[startdate] = {}
            Log.debug('Retrieving startdate {0}', startdate)
            self.leadtime_files[startdate][member] = self.request_chunk(self.domain, self.variable, startdate,
                                                                        member, 1)

    def declare_data_generated(self):
        """Declares the climatology file that this diagnostic produces"""
        var_name = self.variable + 'prct'
        self.percentiles_file = self.declare_chunk(self.domain, var_name, None, None, None,
                                                   frequency=Frequencies.climatology,
                                                   vartype=VariableType.STATISTIC)

    def compute(self):
        """
        Runs the diagnostic
        """
        iris.FUTURE.netcdf_promote = True
        # First pass: discover value range, realizations and grid coordinates
        for startdate in self.leadtime_files.keys():
            Log.debug('Getting data for startdate {0}', startdate)
            for member in self.leadtime_files[startdate].keys():
                data_cube = self._load_cube(startdate, member)
                self._get_value_interval(data_cube)
                self._get_realizations_present(data_cube)
                if self.lat_coord is None:
                    self.units = data_cube.units
                    self.lat_coord = data_cube.coord('latitude')
                    self.lon_coord = data_cube.coord('longitude')
        # Second pass: accumulate histograms and extract the percentiles
        distribution = self._get_distribution()
        percentile_values = self._calculate_percentiles(distribution)
        self._save_results(percentile_values)

    def _load_cube(self, startdate, member):
        # Loads one chunk and tags each timestep with its leadtime (months
        # elapsed since the startdate) as a categorised coordinate
        date = parse_date(startdate)
        reference = PartialDateTime(year=date.year, month=date.month)
        handler = Utils.openCdf(self.leadtime_files[startdate][member].local_file)
        if 'realization' in handler.variables:
            # Make iris aware of the realization dimension
            handler.variables[self.variable].coordinates = 'realization'
        handler.close()
        data_cube = iris.load_cube(self.leadtime_files[startdate][member].local_file)

        def assign_leadtime(coord, x):
            date = coord.units.num2date(x)
            return PartialDateTime(date.year, date.month) - reference
        iris.coord_categorisation.add_categorised_coord(data_cube, 'leadtime', 'time', assign_leadtime)
        return data_cube

    def _save_results(self, percentile_values):
        # percentile_values is a dict {leadtime: ndarray(percentile, lat, lon)};
        # one cube per leadtime is built and they are merged on save
        temp = TempFile.get()
        percentile_coord = iris.coords.DimCoord(self.percentiles, long_name='percentile')
        results = iris.cube.CubeList()
        for leadtime, values in percentile_values.items():
            result = iris.cube.Cube(values, var_name='percent', units=self.units)
            result.add_dim_coord(percentile_coord, 0)
            result.add_dim_coord(self.lat_coord, 1)
            result.add_dim_coord(self.lon_coord, 2)
            result.add_aux_coord(iris.coords.AuxCoord(np.int8(leadtime), long_name='leadtime'))
            results.append(result)
        iris.FUTURE.netcdf_no_unlimited = True
        iris.save(results.merge_cube(), temp, zlib=True)
        self.percentiles_file.set_local_file(temp, rename_var='percent')

    def _calculate_percentiles(self, distribution):
        # For each grid point, invert the cumulative histogram at the
        # requested percentiles and return the center of the matching bin
        Log.debug('Calculating percentiles')

        def calculate(point_distribution):
            cs = np.cumsum(point_distribution)
            total = cs[-1]
            percentile_values = self.percentiles * total
            index = np.searchsorted(cs, percentile_values)
            return [(self._bins[i + 1] + self._bins[i]) / 2 for i in index]

        return dict((leadtime, np.apply_along_axis(calculate, 0, dist))
                    for leadtime, dist in distribution.items())

    def _get_distribution(self):
        # Accumulates one histogram per leadtime over every startdate,
        # member and realization
        distribution = {}
        for startdate in self.leadtime_files.keys():
            Log.debug('Getting data for startdate {0}', startdate)
            for member in self.leadtime_files[startdate].keys():
                data_cube = self._load_cube(startdate, member)
                Log.debug('Discretizing file {0}', data_cube)
                for leadtime in set(data_cube.coord('leadtime').points):
                    leadtime_cube = data_cube.extract(iris.Constraint(leadtime=leadtime))
                    for realization in range(self.realizations):
                        Log.debug('Discretizing realization {0}', realization)
                        try:
                            realization_cube = leadtime_cube.extract(iris.Constraint(realization=realization + 1))
                        except iris.exceptions.CoordinateNotFoundError:
                            # No realization coordinate: single-member data
                            realization_cube = leadtime_cube
                        if leadtime not in distribution:
                            distribution[leadtime] = self._calculate_distribution(realization_cube)
                        else:
                            distribution[leadtime] += self._calculate_distribution(realization_cube)
        return distribution

    def _get_realizations_present(self, data_cube):
        # Keeps the minimum realization count seen over all files, so every
        # file can contribute the same ensemble members
        realizations = 1
        ensemble_dimensions = ('realization', 'ensemble')
        for ensemble_dimension in ensemble_dimensions:
            try:
                realizations = data_cube.coord(ensemble_dimension).shape[0]
                break
            except iris.exceptions.CoordinateNotFoundError:
                pass
        if self.realizations is None:
            self.realizations = realizations
        if realizations != self.realizations:
            self.realizations = min(self.realizations, realizations)
            Log.warning('Different number of realizations in the data used by diagnostic {0}', self)

    def _get_value_interval(self, data_cube):
        # Widens the histogram range with this file's extremes; only needed
        # when the CMOR table did not provide valid_min/valid_max
        if not self.check_limit_values:
            return
        file_max = np.amax(data_cube.data)
        file_min = np.amin(data_cube.data)
        if self.max_value is None:
            self.max_value = file_max
        else:
            self.max_value = max(self.max_value, file_max)
        if self.min_value is None:
            self.min_value = file_min
        else:
            self.min_value = min(self.min_value, file_min)

    def _calculate_distribution(self, data_cube):
        # Histograms the time axis at every grid point; all calls share the
        # same (min_value, max_value) range so the bins are identical
        def calculate_histogram(time_series):
            histogram, self._bins = np.histogram(time_series, bins=self.num_bins,
                                                 range=(self.min_value, self.max_value))
            return histogram

        return np.apply_along_axis(calculate_histogram, 0, data_cube.data)