import os

import iris
import iris.coord_categorisation
import iris.coords
import iris.cube
import iris.exceptions
import numpy as np
from bscearth.utils.date import parse_date, add_months
from bscearth.utils.log import Log
from iris.time import PartialDateTime

from earthdiagnostics.diagnostic import *
from earthdiagnostics.frequency import Frequencies
from earthdiagnostics.statistics.climatologicalpercentile import ClimatologicalPercentile
from earthdiagnostics.utils import Utils, TempFile
class DaysOverPercentile(Diagnostic):
"""
Calculates the montlhy percentiles
:param data_manager: data management object
:type data_manager: DataManager
:param variable: variable to average
:type variable: str
"""
alias = 'daysover'
"Diagnostic alias for the configuration file"
def __init__(self, data_manager, domain, variable, start_year, end_year, startdate, forecast_month):
Diagnostic.__init__(self, data_manager)
self.variable = variable
self.domain = domain
self.start_year = start_year
self.end_year = end_year
self.forecast_month = forecast_month
def __eq__(self, other):
return self.startdate == other.startdate and self.domain == other.domain and \
self.variable == other.variable and self.start_year == other.start_year and \
self.end_year == other.end_year
return 'Days over percentile Startdate: {0.startdate} Variable: {0.domain}:{0.variable} ' \
'Climatology: {0.start_year}-{0.end_year}'.format(self)
@classmethod
def generate_jobs(cls, diags, options):
"""
Creates a job for each chunk to compute the diagnostic
:param diags: Diagnostics manager class
:type diags: Diags
:param options: domain, variable, percentil number, maximum depth (level)
:type options: list[str]
:return:
"""
options_available = (DiagnosticDomainOption(),
DiagnosticOption('variable'),
DiagnosticIntOption('start_year'),
DiagnosticIntOption('end_year'),
DiagnosticListIntOption('forecast_month'),)
options = cls.process_options(options, options_available)
job_list = list()
for startdate in diags.config.experiment.startdates:
for forecast_month in options['forecast_month']:
job_list.append(DaysOverPercentile(diags.data_manager, options['domain'], options['variable'],
options['start_year'], options['end_year'],
return job_list
def request_data(self):
var_name = '{0.variable}prct{0.start_year}{0.forecast_month}-{0.end_year}{0.forecast_month}'.format(self)
self.percentiles_file = self.request_chunk(self.domain, var_name, None, None, None,
frequency=Frequencies.climatology)
self.variable_file = self.request_chunk(self.domain, self.variable, self.startdate, None, None)
def declare_data_generated(self):
var_over = self.variable + '_daysover_q{0}_{1.start_year}-{1.end_year}'
var_below = self.variable + '_daysbelow_q{0}_{1.start_year}-{1.end_year}'
self.days_over_file = {}
self.days_below_file = {}
for perc in ClimatologicalPercentile.Percentiles:
self.days_over_file[perc] = self.declare_chunk(self.domain, var_over.format(int(perc * 100), self),
None, frequency=Frequencies.monthly,
vartype=VariableType.STATISTIC)
self.days_below_file[perc] = self.declare_chunk(self.domain, var_below.format(int(perc * 100), self),
None, frequency=Frequencies.monthly,
vartype=VariableType.STATISTIC)
def compute(self):
"""
Runs the diagnostic
"""
percentiles = iris.load_cube(self.percentiles_file.local_file)
handler = Utils.openCdf(self.variable_file.local_file)
if 'realization' in handler.variables:
handler.variables[self.variable].coordinates = 'realization'
handler.close()
var = iris.load_cube(self.variable_file.local_file)
lead_date = add_months(date, 1, self.data_manager.config.experiment.calendar)
leadtimes = {1: PartialDateTime(lead_date.year, lead_date.month, lead_date.day)}
partial_date = leadtimes[leadtime_month]
while coord.units.num2date(x) >= partial_date:
leadtime_month += 1
try:
partial_date = leadtimes[leadtime_month]
except KeyError:
new_date = add_months(date, leadtime_month, self.data_manager.config.experiment.calendar)
partial_date = PartialDateTime(new_date.year, new_date.month, new_date.day)
leadtimes[leadtime_month] = partial_date
return leadtime_month
iris.coord_categorisation.add_categorised_coord(var, 'leadtime', 'time', assign_leadtime)
iris.coord_categorisation.add_year(var, 'time')
iris.coord_categorisation.add_day_of_year(var, 'time')
try:
realization_coord = var.coord('realization')
except iris.exceptions.CoordinateNotFoundError:
realization_coord = None
self.lat_coord = var.coord('latitude')
self.lon_coord = var.coord('longitude')
results_over = {perc: iris.cube.CubeList() for perc in ClimatologicalPercentile.Percentiles}
results_below = {perc: iris.cube.CubeList() for perc in ClimatologicalPercentile.Percentiles}
var_daysover = 'days_over'
var_days_below = 'days_below'
long_name_days_over = 'Proportion of days over a given percentile for {0.start_year}-{0.end_year} ' \
'climatology'.format(self)
long_name_days_below = 'Proportion of days below a given percentile for {0.start_year}-{0.end_year} ' \
'climatology'.format(self)
Log.debug('Computing startdate {0} leadtime {1}', self.startdate, leadtime)
leadtime_slice = var.extract(iris.Constraint(leadtime=leadtime))
percentiles_leadtime = percentiles.extract(iris.Constraint(leadtime=leadtime))
else:
percentiles_leadtime = percentiles
time_coord = iris.coords.AuxCoord.from_coord(leadtime_slice.coord('time'))
first_time = time_coord.points[0]
last_time = time_coord.points[-1]
timesteps = leadtime_slice.coord('time').shape[0]
time_coord = time_coord.copy(first_time + (last_time - first_time) / 2, (first_time, last_time))
for percentile_slice in percentiles_leadtime.slices_over('percentile'):
percentile = percentile_slice.coord('percentile').points[0]
days_over = np.sum(leadtime_slice.data > percentile_slice.data, 0) / float(timesteps)
result = self.create_results_cube(days_over, percentile, realization_coord,
time_coord, var_daysover, long_name_days_over)
days_below = np.sum(leadtime_slice.data < percentile_slice.data, 0) / float(timesteps)
result = self.create_results_cube(days_below, percentile, realization_coord,
time_coord, var_days_below, long_name_days_below)
Log.debug('Saving percentiles startdate {0}', self.startdate)
for perc in ClimatologicalPercentile.Percentiles:
iris.FUTURE.netcdf_no_unlimited = True
self.days_over_file[perc].set_local_file(self.save_to_file(perc, results_over, var_daysover),
rename_var=var_daysover)
self.days_below_file[perc].set_local_file(self.save_to_file(perc, results_below, var_days_below),
rename_var=var_days_below)
del self.days_over_file
del self.days_below_file
del self.lat_coord
del self.lon_coord
def save_to_file(self, perc, results_over, var_daysover):
temp = TempFile.get()
iris.save(results_over[perc].merge_cube(), temp, zlib=True, unlimited_dimensions=['time'])
Utils.rename_variables(temp, {'dim2': 'ensemble', 'dim1': 'ensemble'},
must_exist=False, rename_dimension=True)
handler = Utils.openCdf(temp)
if 'time' not in handler.dimensions:
new_file = TempFile.get()
new_handler = Utils.openCdf(new_file, 'w')
new_handler.createDimension('time', 1)
for dimension in handler.dimensions:
Utils.copy_dimension(handler, new_handler, dimension)
for variable in handler.variables.keys():
if variable in (var_daysover, 'time', 'time_bnds'):
continue
Utils.copy_variable(handler, new_handler, variable)
old_var = handler.variables[var_daysover]
new_var = new_handler.createVariable(var_daysover, old_var.dtype, ('time',) + old_var.dimensions,
zlib=True, fill_value=1.0e20)
Utils.copy_attributes(new_var, old_var)
new_var[0, :] = old_var[:]
old_var = handler.variables['time']
new_var = new_handler.createVariable('time', old_var.dtype, ('time',))
Utils.copy_attributes(new_var, old_var)
new_var[0] = old_var[0]
old_var = handler.variables['time_bnds']
new_var = new_handler.createVariable('time_bnds', old_var.dtype, ('time',) + old_var.dimensions)
Utils.copy_attributes(new_var, old_var)
new_var[0, :] = old_var[:]
new_handler.close()
os.remove(temp)
temp = new_file
handler.close()
return temp
def create_results_cube(self, days_over, percentile, realization_coord, time_coord,
var_name, long_name):
result = iris.cube.Cube(days_over.astype(np.float32), var_name=var_name, long_name=long_name, units=1.0)
if realization_coord is not None:
result.add_aux_coord(realization_coord, 0)
result.add_dim_coord(self.lat_coord, 1)
result.add_dim_coord(self.lon_coord, 2)
result.add_dim_coord(self.lat_coord, 0)
result.add_dim_coord(self.lon_coord, 1)
result.add_aux_coord(iris.coords.AuxCoord(percentile, long_name='percentile'))
result.add_aux_coord(time_coord)
return result