# coding=utf-8
from bscearth.utils.date import parse_date, add_months
from earthdiagnostics.diagnostic import *
from earthdiagnostics.frequency import Frequencies
import iris
import iris.coord_categorisation
from iris.time import PartialDateTime
import iris.analysis
import iris.coords
import numpy as np
from earthdiagnostics.utils import Utils, TempFile


class DaysOverPercentile(Diagnostic):
    """
    Calculates the number of days over (and below) the climatological
    percentiles for each leadtime month

    :param data_manager: data management object
    :type data_manager: DataManager
    :param startdate: startdate
    :type startdate: str
    :param member: member number
    :type member: int
    :param chunk: chunk number
    :type chunk: int
    :param domain: variable's domain
    :type domain: ModelingRealm
    :param variable: variable to compute the diagnostic on
    :type variable: str
    :param start_year: first year of the climatology period
    :type start_year: int
    :param end_year: last year of the climatology period
    :type end_year: int
    :param forecast_month: forecast month to compute
    :type forecast_month: int
    """

    alias = 'daysover'
    "Diagnostic alias for the configuration file"

    def __init__(self, data_manager, startdate, member, chunk, domain, variable,
                 start_year, end_year, forecast_month):
        Diagnostic.__init__(self, data_manager)
        self.startdate = startdate
        self.member = member
        self.chunk = chunk
        self.variable = variable
        self.domain = domain
        self.start_year = start_year
        self.end_year = end_year
        self.forecast_month = forecast_month

    def __eq__(self, other):
        return (self.startdate == other.startdate and self.member == other.member and
                self.chunk == other.chunk and self.domain == other.domain and
                self.variable == other.variable)

    def __str__(self):
        return 'Days over percentile Startdate: {0} Member: {1} Chunk: {2} ' \
               'Variable: {3}:{4}'.format(self.startdate, self.member, self.chunk, self.domain, self.variable)
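
    # Illustrative usage note: in the experiment configuration file this
    # diagnostic is requested through its alias, e.g. an entry such as
    #     daysover,atmos,tas,1984,2005,11
    # (domain, variable, start_year, end_year, forecast_month), matching the
    # options parsed in generate_jobs() below. The values shown here are
    # hypothetical, not taken from any real experiment.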

    @classmethod
    def generate_jobs(cls, diags, options):
        """
        Creates a job for each chunk to compute the diagnostic

        :param diags: Diagnostics manager class
        :type diags: Diags
        :param options: domain, variable, start year, end year, forecast month
        :type options: list[str]
        :return:
        """
        options_available = (DiagnosticDomainOption(),
                             DiagnosticOption('variable'),
                             DiagnosticIntOption('start_year'),
                             DiagnosticIntOption('end_year'),
                             DiagnosticIntOption('forecast_month'))
        options = cls.process_options(options, options_available)
        job_list = list()
        for startdate, member, chunk in diags.config.experiment.get_chunk_list():
            job_list.append(DaysOverPercentile(diags.data_manager, startdate, member, chunk,
                                               options['domain'], options['variable'],
                                               options['start_year'], options['end_year'],
                                               options['forecast_month']))
        return job_list

    def request_data(self):
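        # The percentiles climatology follows a fixed naming pattern: e.g.
        # variable 'tas' with start_year 1984, end_year 2005 and
        # forecast_month 11 requests 'tasprct198411-200511' (values here are
        # illustrative only).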
        var_name = '{0.variable}prct{0.start_year}{0.forecast_month}-{0.end_year}{0.forecast_month}'.format(self)
        self.percentiles_file = self.request_chunk(self.domain, var_name, None, None, None,
                                                   frequency=Frequencies.climatology)
        self.variable_file = self.request_chunk(self.domain, self.variable, self.startdate, self.member, self.chunk)

    def declare_data_generated(self):
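        # Declares the two monthly statistic outputs, '<variable>_daysover'
        # and '<variable>_daysbelow' (e.g. 'tas_daysover' for variable tas).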
        var_over = self.variable + '_daysover'
        var_below = self.variable + '_daysbelow'
        self.days_over_file = self.declare_chunk(self.domain, var_over, None, None, None,
                                                 frequency=Frequencies.monthly, vartype=VariableType.STATISTIC)
        self.days_below_file = self.declare_chunk(self.domain, var_below, None, None, None,
                                                  frequency=Frequencies.monthly, vartype=VariableType.STATISTIC)

    def compute(self):
        """
        Runs the diagnostic
        """
        percentiles = iris.load_cube(self.percentiles_file.local_file)
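
        # If the chunk has an ensemble axis, expose 'realization' as a CF
        # coordinate of the variable so iris loads it as an auxiliary
        # coordinate of the cube.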
        handler = Utils.openCdf(self.variable_file.local_file)
        if 'realization' in handler.variables:
            handler.variables[self.variable].coordinates = 'realization'
        handler.close()
        var = iris.load_cube(self.variable_file.local_file)
        date = parse_date(self.startdate)
        lead_date = add_months(date, 1, self.data_manager.config.experiment.calendar)
        leadtimes = {1: PartialDateTime(lead_date.year, lead_date.month, lead_date.day)}
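
        # Categorisation callback: maps each time value to its 1-based
        # leadtime month, lazily extending the leadtimes lookup with the
        # start of each subsequent month.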
        def assign_leadtime(coord, x):
            leadtime_month = 1
            partial_date = leadtimes[leadtime_month]
            while coord.units.num2date(x) >= partial_date:
                leadtime_month += 1
                try:
                    partial_date = leadtimes[leadtime_month]
                except KeyError:
                    new_date = add_months(date, leadtime_month, self.data_manager.config.experiment.calendar)
                    partial_date = PartialDateTime(new_date.year, new_date.month, new_date.day)
                    leadtimes[leadtime_month] = partial_date
            return leadtime_month

        iris.coord_categorisation.add_categorised_coord(var, 'leadtime', 'time', assign_leadtime)
        iris.coord_categorisation.add_year(var, 'time')
        iris.coord_categorisation.add_day_of_year(var, 'time')

        realization_coord = var.coord('realization')
        lat_coord = var.coord('latitude')
        lon_coord = var.coord('longitude')
        results_over = iris.cube.CubeList()
        results_below = iris.cube.CubeList()

        for leadtime in leadtimes.keys():
            leadtime_slice = var.extract(iris.Constraint(leadtime=leadtime))
            timesteps = leadtime_slice.coord('time').shape[0]
            percentiles_leadtime = percentiles.extract(iris.Constraint(leadtime=leadtime))
            for percentile_slice in percentiles_leadtime.slices_over('percentile'):
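                # For each percentile threshold, compute the fraction of the
                # leadtime month's timesteps over (and below) it at every
                # grid point: e.g. 6 of 30 daily values over gives 0.2.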
                percentile = percentile_slice.coord('percentile').points[0]
                days_over = np.sum(leadtime_slice.data > percentile_slice.data, 0) / float(timesteps)
                result = iris.cube.Cube(days_over.astype(np.float32), var_name='daysover', units=1.0)
                result.add_dim_coord(lat_coord, 1)
                result.add_dim_coord(lon_coord, 2)
                result.add_aux_coord(realization_coord, 0)
                result.add_aux_coord(iris.coords.AuxCoord(percentile, long_name='percentile'))
                result.add_aux_coord(iris.coords.AuxCoord(np.int8(leadtime), long_name='leadtime'))
                results_over.append(result)

                days_below = np.sum(leadtime_slice.data < percentile_slice.data, 0) / float(timesteps)
                result = iris.cube.Cube(days_below.astype(np.float32), var_name='daysbelow', units=1.0)
                result.add_dim_coord(lat_coord, 1)
                result.add_dim_coord(lon_coord, 2)
                result.add_aux_coord(realization_coord, 0)
                result.add_aux_coord(iris.coords.AuxCoord(percentile, long_name='percentile'))
                result.add_aux_coord(iris.coords.AuxCoord(np.int8(leadtime), long_name='leadtime'))
                results_below.append(result)
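
        # Merge each CubeList into a single cube and save it; if iris stored
        # the realization axis under a generic 'dim2' name, it is renamed to
        # 'ensemble' (must_exist=False covers runs without that dimension).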
        iris.FUTURE.netcdf_no_unlimited = True

        temp = TempFile.get()
        iris.save(results_over.merge_cube(), temp, zlib=True)
        Utils.rename_variable(temp, 'dim2', 'ensemble', must_exist=False, rename_dimension=True)
        self.days_over_file.set_local_file(temp, rename_var='daysover')

        temp = TempFile.get()
        iris.save(results_below.merge_cube(), temp, zlib=True)
        Utils.rename_variable(temp, 'dim2', 'ensemble', must_exist=False, rename_dimension=True)
        self.days_below_file.set_local_file(temp, rename_var='daysbelow')