# coding=utf-8
import math

import iris
import iris.coord_categorisation
import iris.coords
import iris.exceptions
import numpy as np
from bscearth.utils.date import add_months, parse_date
from bscearth.utils.log import Log
from iris.time import PartialDateTime

from earthdiagnostics.diagnostic import Diagnostic, DiagnosticVariableOption, DiagnosticDomainOption, \
    DiagnosticIntOption, DiagnosticListIntOption, DiagnosticFloatOption
from earthdiagnostics.frequency import Frequencies
from earthdiagnostics.utils import Utils, TempFile
from earthdiagnostics.variable_type import VariableType
class ClimatologicalPercentile(Diagnostic):
    """
    Calculates the climatological percentiles for the given leadtime

    :param data_manager: data management object
    :type data_manager: DataManager
    :param variable: variable to average
    :type variable: str
    :param experiment_config:
    :type experiment_config: ExperimentConfig
    """

    alias = 'climpercent'
    "Diagnostic alias for the configuration file"

    # Percentiles computed for every leadtime / grid point distribution
    Percentiles = np.array([0.1, 0.25, 0.33, 0.5, 0.66, 0.75, 0.9])

    def __init__(self, data_manager, domain, variable, num_bins, start_year, end_year, min_value, max_value,
                 forecast_month, experiment_config):
        Diagnostic.__init__(self, data_manager)
        self.variable = variable
        self.domain = domain
        self.experiment_config = experiment_config
        self.realizations = None
        self.lat_coord = None
        self.lon_coord = None
        self.num_bins = num_bins
        self._bins = None
        self.start_year = start_year
        self.end_year = end_year
        self.forecast_month = forecast_month
        self.cmor_var = data_manager.variable_list.get_variable(variable, silent=True)

        # Histogram range: user-supplied value wins; otherwise fall back to the
        # CMOR variable's valid range; as a last resort scan the data itself
        # (check_min_value / check_max_value flag that scan for later).
        if not math.isnan(min_value):
            self.min_value = min_value
            self.check_min_value = False
        elif self.cmor_var and self.cmor_var.valid_min:
            self.min_value = float(self.cmor_var.valid_min)
            self.check_min_value = False
        else:
            self.min_value = None
            self.check_min_value = True

        if not math.isnan(max_value):
            self.max_value = max_value
            self.check_max_value = False
        elif self.cmor_var and self.cmor_var.valid_max:
            # BUG FIX: original guarded this branch with valid_min while reading valid_max
            self.max_value = float(self.cmor_var.valid_max)
            self.check_max_value = False
        else:
            self.max_value = None
            self.check_max_value = True

    def __eq__(self, other):
        return self.domain == other.domain and self.variable == other.variable and self.num_bins == other.num_bins and \
            self.min_value == other.min_value and self.max_value == other.max_value and \
            self.start_year == other.start_year and self.end_year == other.end_year and \
            self.forecast_month == other.forecast_month

    def __str__(self):
        return 'Climatological percentile Variable: {0.domain}:{0.variable} Period: {0.start_year}-{0.end_year} ' \
               'Forecast month: {0.forecast_month} Bins: {0.num_bins}'.format(self)

    @classmethod
    def generate_jobs(cls, diags, options):
        """
        Creates a job for each forecast month to compute the diagnostic

        :param diags: Diagnostics manager class
        :type diags: Diags
        :param options: domain, variable, start year, end year, forecast months,
                        number of bins and optional min/max values
        :type options: list[str]
        :return: list of jobs to run
        :rtype: list[ClimatologicalPercentile]
        """
        options_available = (DiagnosticDomainOption(),
                             DiagnosticVariableOption(),
                             DiagnosticIntOption('start_year'),
                             DiagnosticIntOption('end_year'),
                             DiagnosticListIntOption('forecast_month'),
                             # RESTORED: 'bins' option was missing although options['bins'] is used below
                             DiagnosticIntOption('bins', 2000),
                             DiagnosticFloatOption('min_value', float('nan')),
                             DiagnosticFloatOption('max_value', float('nan')))
        options = cls.process_options(options, options_available)
        job_list = list()
        for forecast_month in options['forecast_month']:
            job_list.append(ClimatologicalPercentile(diags.data_manager, options['domain'], options['variable'],
                                                     options['bins'], options['start_year'], options['end_year'],
                                                     options['min_value'], options['max_value'],
                                                     forecast_month, diags.config.experiment))
        return job_list

    def requested_startdates(self):
        """Return the startdates (YYYYMMDD strings) for every year of the climatology period."""
        return ['{0}{1:02}01'.format(year, self.forecast_month) for year in range(self.start_year, self.end_year + 1)]

    def request_data(self):
        """Request the data file for each startdate of the climatology period."""
        # RESTORED: method header and startdate loop were lost in the corrupted source
        self.leadtime_files = {}
        for startdate in self.requested_startdates():
            if startdate not in self.leadtime_files:
                self.leadtime_files[startdate] = {}
            Log.debug('Retrieving startdate {0}', startdate)
            self.leadtime_files[startdate] = self.request_chunk(self.domain, self.variable, startdate, None, None)

    def declare_data_generated(self):
        """Declare the climatological percentiles file that this diagnostic produces."""
        var_name = '{0.variable}prct{0.start_year}{0.forecast_month}-{0.end_year}{0.forecast_month}'.format(self)
        self.percentiles_file = self.declare_chunk(self.domain, var_name, None, None, None,
                                                   frequency=Frequencies.climatology, vartype=VariableType.STATISTIC)

    def compute(self):
        """
        Runs the diagnostic
        """
        iris.FUTURE.netcdf_promote = True
        # First pass: inspect every startdate to fix the value interval,
        # the number of realizations and the horizontal coordinates
        for startdate in self.leadtime_files.keys():
            Log.debug('Getting data for startdate {0}', startdate)
            data_cube = self._load_cube(startdate)
            self._get_value_interval(data_cube)
            self._get_realizations_present(data_cube)
            if self.lat_coord is None:
                self.units = data_cube.units
                self.lat_coord = data_cube.coord('latitude')
                self.lon_coord = data_cube.coord('longitude')
        Log.info('Range: [{0}, {1}]', self.min_value, self.max_value)
        # Second pass: build histograms and derive the percentiles from them
        distribution = self._get_distribution()
        percentile_values = self._calculate_percentiles(distribution)
        self._save_results(percentile_values)

    def _load_cube(self, startdate):
        """Load the cube for one startdate and tag each timestep with its leadtime month."""
        # RESTORED: method header and `date = parse_date(startdate)` were lost in the corrupted source
        handler = Utils.openCdf(self.leadtime_files[startdate].local_file)
        if 'realization' in handler.variables:
            # Make iris aware of the realization coordinate before loading
            handler.variables[self.variable].coordinates = 'realization'
        handler.close()
        data_cube = iris.load_cube(self.leadtime_files[startdate].local_file)

        date = parse_date(startdate)
        lead_date = add_months(date, 1, self.data_manager.config.experiment.calendar)
        leadtimes = {1: PartialDateTime(lead_date.year, lead_date.month, lead_date.day)}

        def assign_leadtime(coord, x):
            # Find the first leadtime month whose boundary is after this timestep,
            # extending the leadtimes cache lazily as needed
            leadtime_month = 1
            partial_date = leadtimes[leadtime_month]
            while coord.units.num2date(x) >= partial_date:
                leadtime_month += 1
                try:
                    partial_date = leadtimes[leadtime_month]
                except KeyError:
                    new_date = add_months(date, leadtime_month, self.data_manager.config.experiment.calendar)
                    partial_date = PartialDateTime(new_date.year, new_date.month, new_date.day)
                    leadtimes[leadtime_month] = partial_date
            return leadtime_month
        iris.coord_categorisation.add_categorised_coord(data_cube, 'leadtime', 'time', assign_leadtime)
        return data_cube

    def _save_results(self, percentile_values):
        """Merge the per-leadtime percentile cubes and store them as the diagnostic result."""
        temp = TempFile.get()
        percentile_coord = iris.coords.DimCoord(ClimatologicalPercentile.Percentiles, long_name='percentile')
        results = iris.cube.CubeList()
        for leadtime in percentile_values.keys():
            result = iris.cube.Cube(percentile_values[leadtime], var_name='percent', units=self.units)
            result.add_dim_coord(percentile_coord, 0)
            result.add_dim_coord(self.lat_coord, 1)
            result.add_dim_coord(self.lon_coord, 2)
            result.add_aux_coord(iris.coords.AuxCoord(np.int8(leadtime), long_name='leadtime'))
            results.append(result)
        iris.FUTURE.netcdf_no_unlimited = True
        iris.save(results.merge_cube(), temp, zlib=True)
        self.percentiles_file.set_local_file(temp, rename_var='percent')

    def _calculate_percentiles(self, distribution):
        """Convert the per-leadtime histograms into the requested percentile values."""
        Log.debug('Calculating percentiles')

        def calculate(point_distribution):
            # Cumulative histogram -> index of the bin containing each percentile
            cs = np.cumsum(point_distribution)
            total = cs[-1]
            percentile_values = ClimatologicalPercentile.Percentiles * total
            index = np.searchsorted(cs, percentile_values)
            # Report the centre of the selected bin as the percentile value
            return [(self._bins[i + 1] + self._bins[i]) / 2 for i in index]

        # RESTORED: accumulator and leadtime loop were lost in the corrupted source
        percentiles = {}
        for leadtime, dist in distribution.items():
            Log.debug('Calculating leadtime {0}', leadtime)
            percentiles[leadtime] = np.apply_along_axis(calculate, 0, dist)
        return percentiles

    def _get_distribution(self):
        """Accumulate per-leadtime histograms over all startdates and realizations."""
        # RESTORED: startdate loop and final return were lost in the corrupted source
        distribution = {}
        for startdate in self.leadtime_files:
            Log.info('Getting data for startdate {0}', startdate)
            data_cube = self._load_cube(startdate)
            Log.debug('Discretizing file {0}', data_cube)
            for leadtime in set(data_cube.coord('leadtime').points):
                Log.debug('Discretizing leadtime {0}', leadtime)
                leadtime_cube = data_cube.extract(iris.Constraint(leadtime=leadtime))
                for realization in range(self.realizations):
                    Log.debug('Discretizing realization {0}', realization)
                    try:
                        realization_cube = leadtime_cube.extract(iris.Constraint(realization=realization + 1))
                    except iris.exceptions.CoordinateNotFoundError:
                        # Data without an ensemble dimension: use the full cube
                        realization_cube = leadtime_cube
                    if realization_cube is None and realization == 0:
                        realization_cube = leadtime_cube
                    if leadtime not in distribution:
                        distribution[leadtime] = self._calculate_distribution(realization_cube)
                    else:
                        distribution[leadtime] += self._calculate_distribution(realization_cube)
        return distribution

    def _get_realizations_present(self, data_cube):
        """Track the smallest ensemble size seen across all the input files."""
        realizations = 1
        ensemble_dimensions = ('realization', 'ensemble')
        for ensemble_dimension in ensemble_dimensions:
            try:
                realizations = data_cube.coord(ensemble_dimension).shape[0]
                break
            except iris.exceptions.CoordinateNotFoundError:
                pass
        if self.realizations is None:
            self.realizations = realizations
        if realizations != self.realizations:
            # Keep the common subset so every file contributes the same members
            self.realizations = min(self.realizations, realizations)
            Log.warning('Different number of realizations in the data used by diagnostic {0}', self)

    def _get_value_interval(self, data_cube):
        """Widen min/max histogram limits from the data when no explicit range was given."""
        if self.check_min_value or self.check_max_value:
            for time_slice in data_cube.slices_over('time'):
                if self.check_min_value:
                    file_min = np.amin(time_slice.data)
                    if self.min_value is None:
                        self.min_value = file_min
                    self.min_value = min(self.min_value, file_min)
                if self.check_max_value:
                    file_max = np.amax(time_slice.data)
                    # BUG FIX: original compared against self.min_value and
                    # never initialised self.max_value from None
                    if self.max_value is None:
                        self.max_value = file_max
                    self.max_value = max(self.max_value, file_max)

    def _calculate_distribution(self, data_cube):
        """Histogram every grid point's time series into self.num_bins bins over [min, max]."""
        def calculate_histogram(time_series):
            # np.histogram also returns the bin edges; keep them for
            # _calculate_percentiles to convert indexes back to values
            histogram, self._bins = np.histogram(time_series, bins=self.num_bins,
                                                 range=(self.min_value, self.max_value))
            return histogram

        return np.apply_along_axis(calculate_histogram, 0, data_cube.data)