Newer
Older
# coding=utf-8
from bscearth.utils.log import Log
from earthdiagnostics.diagnostic import Diagnostic, DiagnosticOption, DiagnosticDomainOption, DiagnosticListIntOption, \
DiagnosticIntOption
from earthdiagnostics.frequency import Frequencies
from earthdiagnostics.utils import Utils, TempFile
from earthdiagnostics.variable_type import VariableType
import numpy as np
class ClimatologicalPercentile(Diagnostic):
"""
Calculates the climatological percentiles for the given leadtimes
:param data_manager: data management object
:type data_manager: DataManager
:param variable: variable to average
:type variable: str
:param experiment_config:
:type experiment_config: ExperimentConfig
"""
alias = 'climpercent'
"Diagnostic alias for the configuration file"
def __init__(self, data_manager, domain, variable, leadtimes, num_bins, experiment_config):
Diagnostic.__init__(self, data_manager)
self.variable = variable
self.domain = domain
self.leadtimes = leadtimes
self.experiment_config = experiment_config
Javier Vegas-Regidor
committed
self.realizations = None
self.lat_len = None
self.lon_len = None
self.num_bins = num_bins
self._bins = None
Javier Vegas-Regidor
committed
self.percentiles = np.array([0.1, 0.25, 0.33, 0.5, 0.66, 0.75, 0.9])
self.cmor_var = data_manager.variable_list.get_variable(variable, silent=True)
Javier Vegas-Regidor
committed
if self.cmor_var and self.cmor_var.valid_max and self.cmor_var.valid_min:
self.max_value = float(self.cmor_var.valid_max)
self.min_value = float(self.cmor_var.valid_min)
self.check_limit_values = False
else:
self.min_value = None
self.max_value = None
self.check_limit_values = True
def __eq__(self, other):
return self.domain == other.domain and self.variable == other.variable and self.leadtimes == other.leadtimes
def __str__(self):
return 'Climatological percentile Variable: {0}:{1} Leadtimes: {2} ' \
'Bins: {3}'.format(self.domain, self.variable, self.leadtimes, self.num_bins)
@classmethod
def generate_jobs(cls, diags, options):
"""
Creates a job for each chunk to compute the diagnostic
:param diags: Diagnostics manager class
:type diags: Diags
:param options: domain, variable, percentil number, maximum depth (level)
:type options: list[str]
:return:
"""
options_available = (DiagnosticDomainOption('domain'),
DiagnosticOption('variable'),
DiagnosticListIntOption('leadtimes'),
DiagnosticIntOption('bins', 2000))
Javier Vegas-Regidor
committed
options = cls.process_options(options, options_available)
job_list = list()
job_list.append(ClimatologicalPercentile(diags.data_manager, options['domain'], options['variable'],
options['leadtimes'], options['bins'],
diags.config.experiment))
return job_list
def compute(self):
"""
Runs the diagnostic
"""
Javier Vegas-Regidor
committed
member_files = self._get_data()
Javier Vegas-Regidor
committed
distribution = self._get_distribution(member_files)
Javier Vegas-Regidor
committed
percentile_values = self._calculate_percentiles(distribution)
Javier Vegas-Regidor
committed
self._save_results(percentile_values)
Javier Vegas-Regidor
committed
def _save_results(self, percentile_values):
temp = TempFile.get()
handler = Utils.openCdf(temp, 'w')
handler.createDimension('percentile', len(self.percentiles))
percentile_var = handler.createVariable('percentile', float, ('percentile',))
percentile_var[:] = self.percentiles
Javier Vegas-Regidor
committed
handler.createDimension('lat', self.lat.size)
lat_var = handler.createVariable('lat', float, ('lat',))
lat_var[:] = self.lat
Javier Vegas-Regidor
committed
handler.createDimension('lon', self.lon.size)
lon_var = handler.createVariable('lon', float, ('lon',))
lon_var[:] = self.lon
Javier Vegas-Regidor
committed
percentile_var = handler.createVariable('percent', float, ('percentile', 'lat', 'lon'))
percentile_var[...] = percentile_values
Javier Vegas-Regidor
committed
handler.close()
self.send_file(temp, self.domain, self.variable + '_percentiles', None, None, frequency=Frequencies.climatology,
rename_var='percent', vartype=VariableType.STATISTIC)
Javier Vegas-Regidor
committed
def _calculate_percentiles(self, distribution):
Log.debug('Calculating percentiles')
Javier Vegas-Regidor
committed
def calculate(point_distribution):
Javier Vegas-Regidor
committed
cs = np.cumsum(point_distribution)
total = cs[-1]
percentile_values = self.percentiles * total
index = np.searchsorted(cs, percentile_values)
return [(self._bins[i + 1] + self._bins[i]) / 2 for i in index]
Javier Vegas-Regidor
committed
distribution = np.apply_along_axis(calculate, 0, distribution)
Javier Vegas-Regidor
committed
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
return distribution
def _get_distribution(self, member_files):
distribution = None
for memberfile in member_files:
Log.debug('Discretizing file {0}', memberfile)
handler = Utils.openCdf(memberfile)
for realization in range(self.realizations):
if distribution is None:
distribution = self._calculate_distribution(handler, realization)
else:
distribution += self._calculate_distribution(handler, realization)
handler.close()
return distribution
def _get_data(self):
member_files = list()
for startdate, member in self.experiment_config.get_member_list():
Log.debug('Retrieving startdate {0}', startdate)
memberfile = self.data_manager.get_leadtimes(self.domain, self.variable, startdate, member, self.leadtimes)
Log.debug('Getting data for startdate {0}', startdate)
handler = Utils.openCdf(memberfile)
self._get_value_interval(handler)
self._get_realizations_present(handler)
self._get_var_size(handler)
handler.close()
member_files.append(memberfile)
return member_files
def _get_realizations_present(self, handler):
realizations = 1
if 'realization' in handler.dimensions:
realizations = handler.dimensions['realization'].size
if 'ensemble' in handler.dimensions:
realizations = handler.dimensions['ensemble'].size
if self.realizations is None:
self.realizations = realizations
if realizations != self.realizations:
self.realizations = min(self.realizations, realizations)
Log.warning('Different number of realizations in the data used by diagnostic {0}', self)
def _get_value_interval(self, handler):
Javier Vegas-Regidor
committed
if not self.check_limit_values:
return
values = handler.variables[self.variable][:]
file_max = np.amax(values)
file_min = np.amin(values)
self.max_value = max(self.min_value, file_max)
if self.min_value is None:
self.min_value = file_min
else:
self.min_value = min(self.min_value, file_min)
def _calculate_distribution(self, handler, realization):
Log.debug('Discretizing realization {0}', realization)
def calculate_histogram(time_series):
histogram, self._bins = np.histogram(time_series, bins=self.num_bins,
range=(self.min_value, self.max_value))
Javier Vegas-Regidor
committed
return histogram
var = handler.variables[self.variable]
Javier Vegas-Regidor
committed
if 'realization' in var.dimensions or 'ensemble' in var.dimensions:
return np.apply_along_axis(calculate_histogram, 0, var[:, realization, ...])
else:
return np.apply_along_axis(calculate_histogram, 0, var[:])