Newer
Older
# coding=utf-8
from autosubmit.config.log import Log
from earthdiagnostics.diagnostic import Diagnostic
from earthdiagnostics.utils import Utils, TempFile
Javier Vegas-Regidor
committed
from earthdiagnostics.variable import Domain, Variable, VarType
import numpy as np
class ClimatologicalPercentile(Diagnostic):
"""
Calculates the climatological percentiles for the given leadtimes
:param data_manager: data management object
:type data_manager: DataManager
:param variable: variable to average
:type variable: str
:param experiment_config:
:type experiment_config: ExperimentConfig
"""
alias = 'climpercent'
"Diagnostic alias for the configuration file"
def __init__(self, data_manager, domain, variable, leadtimes, experiment_config):
Diagnostic.__init__(self, data_manager)
self.variable = variable
self.domain = domain
self.leadtimes = leadtimes
self.experiment_config = experiment_config
Javier Vegas-Regidor
committed
self.realizations = None
self.lat_len = None
self.lon_len = None
Javier Vegas-Regidor
committed
self.num_bins = 2000
self._bins = None
Javier Vegas-Regidor
committed
self.percentiles = np.array([0.1, 0.25, 0.33, 0.5, 0.66, 0.75, 0.9])
self.cmor_var = Variable.get_variable(variable, silent=True)
if self.cmor_var and self.cmor_var.valid_max and self.cmor_var.valid_min:
self.max_value = float(self.cmor_var.valid_max)
self.min_value = float(self.cmor_var.valid_min)
self.check_limit_values = False
else:
self.min_value = None
self.max_value = None
self.check_limit_values = True
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def __eq__(self, other):
return self.domain == other.domain and self.variable == other.variable and self.leadtimes == other.leadtimes
def __str__(self):
return 'Climatological percentile Variable: {0}:{1} Leadtimes: {2}'.format(self.domain, self.variable,
self.leadtimes)
@classmethod
def generate_jobs(cls, diags, options):
"""
Creates a job for each chunk to compute the diagnostic
:param diags: Diagnostics manager class
:type diags: Diags
:param options: domain, variable, percentil number, maximum depth (level)
:type options: list[str]
:return:
"""
num_options = len(options) - 1
if num_options < 3:
raise Exception('You must specify the variable (and its domain) and the leadtimes you want to compute '
'the percentiles on')
if num_options > 3:
raise Exception('You must specify three parameters for the climatological percentiles')
domain = Domain(options[1])
variable = options[2]
leadtimes = [int(i) for i in options[3].split('-')]
job_list = list()
job_list.append(ClimatologicalPercentile(diags.data_manager, domain, variable, leadtimes,
diags.config.experiment))
return job_list
def compute(self):
"""
Runs the diagnostic
"""
Javier Vegas-Regidor
committed
member_files = self._get_data()
Javier Vegas-Regidor
committed
distribution = self._get_distribution(member_files)
Javier Vegas-Regidor
committed
percentile_values = self._calculate_percentiles(distribution)
Javier Vegas-Regidor
committed
self._save_results(percentile_values)
Javier Vegas-Regidor
committed
def _save_results(self, percentile_values):
temp = TempFile.get()
handler = Utils.openCdf(temp, 'w')
handler.createDimension('percentile', len(self.percentiles))
percentile_var = handler.createVariable('percentile', float, ('percentile',))
percentile_var[:] = self.percentiles
handler.createDimension('lat', self.lat_len)
lat_var = handler.createVariable('lat', float, ('lat',))
lat_var[:] = self.lat
handler.createDimension('lon', self.lon_len)
lon_var = handler.createVariable('lon', float, ('lon',))
lon_var[:] = self.lon
p75_var = handler.createVariable('percent', float, ('percentile', 'lat', 'lon'))
Javier Vegas-Regidor
committed
p75_var[...] = percentile_values
handler.close()
Javier Vegas-Regidor
committed
self.send_file(temp, self.domain, self.variable + 'percent', None, None, frequency='clim', rename_var='percent',
vartype=VarType.STATISTIC)
Javier Vegas-Regidor
committed
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
def _calculate_percentiles(self, distribution):
Log.debug('Calculating percentiles')
def calculate_percentiles(point_distribution):
cs = np.cumsum(point_distribution)
total = cs[-1]
percentile_values = self.percentiles * total
index = np.searchsorted(cs, percentile_values)
return [(self._bins[i + 1] + self._bins[i]) / 2 for i in index]
distribution = np.apply_along_axis(calculate_percentiles, 0, distribution)
return distribution
def _get_distribution(self, member_files):
distribution = None
for memberfile in member_files:
Log.debug('Discretizing file {0}', memberfile)
handler = Utils.openCdf(memberfile)
for realization in range(self.realizations):
if distribution is None:
distribution = self._calculate_distribution(handler, realization)
else:
distribution += self._calculate_distribution(handler, realization)
handler.close()
return distribution
def _get_data(self):
member_files = list()
for startdate, member in self.experiment_config.get_member_list():
Log.debug('Retrieving startdate {0}', startdate)
memberfile = self.data_manager.get_leadtimes(self.domain, self.variable, startdate, member, self.leadtimes)
Log.debug('Getting data for startdate {0}', startdate)
handler = Utils.openCdf(memberfile)
self._get_value_interval(handler)
self._get_realizations_present(handler)
self._get_var_size(handler)
handler.close()
member_files.append(memberfile)
return member_files
def _get_realizations_present(self, handler):
realizations = 1
if 'realization' in handler.dimensions:
realizations = handler.dimensions['realization'].size
if 'ensemble' in handler.dimensions:
realizations = handler.dimensions['ensemble'].size
if self.realizations is None:
self.realizations = realizations
if realizations != self.realizations:
self.realizations = min(self.realizations, realizations)
Log.warning('Different number of realizations in the data used by diagnostic {0}', self)
def _get_value_interval(self, handler):
Javier Vegas-Regidor
committed
if not self.check_limit_values:
return
values = handler.variables[self.variable][:]
file_max = np.amax(values)
file_min = np.amin(values)
self.max_value = max(self.min_value, file_max)
if self.min_value is None:
self.min_value = file_min
else:
self.min_value = min(self.min_value, file_min)
def _calculate_distribution(self, handler, realization):
Log.debug('Discretizing realization {0}', realization)
def calculate_histogram(time_series):
Javier Vegas-Regidor
committed
histogram, self._bins = np.histogram(time_series, bins=self.num_bins, range=(self.min_value, self.max_value))
return histogram
var = handler.variables[self.variable]
return np.apply_along_axis(calculate_histogram, 0, var[:, realization, ...])
def _get_var_size(self, handler):
if self.lat_len is not None:
return
self.lat = handler.variables['latitude'][:]
self.lon = handler.variables['longitude'][:]
self.lat = handler.dimensions['latitude'].size
self.lon = handler.dimensions['longitude'].size