# NOTE(review): removed stray "Newer"/"Older" diff-viewer artifact lines so the
# file parses and the coding declaration below stays within the first two lines
# coding=utf-8
from bscearth.utils.date import parse_date, add_months
from earthdiagnostics.statistics.climatologicalpercentile import ClimatologicalPercentile
from earthdiagnostics.diagnostic import *
from earthdiagnostics.frequency import Frequencies
import iris
import iris.coord_categorisation
from iris.time import PartialDateTime
import iris.analysis
import iris.coords
import numpy as np
from earthdiagnostics.utils import Utils, TempFile
class DaysOverPercentile(Diagnostic):
    """
    Calculates the proportion of days over (and below) each climatological percentile

    :param data_manager: data management object
    :type data_manager: DataManager
    :param domain: variable's domain
    :type domain: ModelingRealm
    :param variable: variable to compute the diagnostic for
    :type variable: str
    :param start_year: first year of the reference climatology
    :type start_year: int
    :param end_year: last year of the reference climatology
    :type end_year: int
    :param year_to_compute: year to compute the diagnostic for
    :type year_to_compute: int
    :param forecast_month: forecast starting month
    :type forecast_month: int
    """

    alias = 'daysover'
    "Diagnostic alias for the configuration file"

    def __init__(self, data_manager, domain, variable, start_year, end_year, year_to_compute, forecast_month):
        Diagnostic.__init__(self, data_manager)
        self.variable = variable
        self.domain = domain
        self.start_year = start_year
        self.end_year = end_year
        self.year_to_compute = year_to_compute
        # BUG FIX: forecast_month was never stored, so building startdate below
        # raised AttributeError on self.forecast_month
        self.forecast_month = forecast_month
        self.startdate = '{0}{1:02}01'.format(self.year_to_compute, self.forecast_month)

    def __eq__(self, other):
        return self.startdate == other.startdate and self.domain == other.domain and self.variable == other.variable

    def __str__(self):
        # BUG FIX: this return statement was unreachable dead code inside __eq__;
        # it is the diagnostic's string representation and belongs in __str__
        return 'Days over percentile Startdate: {0} ' \
               'Variable: {1}:{2} Climatology: {3}-{4}'.format(self.startdate, self.domain, self.variable,
                                                               self.start_year, self.end_year)

    @classmethod
    def generate_jobs(cls, diags, options):
        """
        Creates a job for each year and forecast month to compute the diagnostic

        :param diags: Diagnostics manager class
        :type diags: Diags
        :param options: domain, variable, start year, end year, forecast months
        :type options: list[str]
        :return: list of jobs to run
        :rtype: list[DaysOverPercentile]
        """
        options_available = (DiagnosticDomainOption(),
                             DiagnosticOption('variable'),
                             DiagnosticIntOption('start_year'),
                             DiagnosticIntOption('end_year'),
                             DiagnosticListIntOption('forecast_month'),)
        options = cls.process_options(options, options_available)
        job_list = list()
        # One job per (year, forecast_month) pair inside the climatology window
        year = options['start_year']
        while year <= options['end_year']:
            for forecast_month in options['forecast_month']:
                job_list.append(DaysOverPercentile(diags.data_manager, options['domain'], options['variable'],
                                                   options['start_year'], options['end_year'],
                                                   year, forecast_month))
            year += 1
        return job_list

    def request_data(self):
        """Request the climatological percentiles and the variable data for this startdate"""
        var_name = '{0.variable}prct{0.start_year}{0.forecast_month}-{0.end_year}{0.forecast_month}'.format(self)
        self.percentiles_file = self.request_chunk(self.domain, var_name, None, None, None,
                                                   frequency=Frequencies.climatology)
        self.variable_file = self.request_chunk(self.domain, self.variable, self.startdate, None, None)

    def declare_data_generated(self):
        """Declare one days-over and one days-below monthly file per percentile"""
        var_over = self.variable + '_daysover_q{0}'
        var_below = self.variable + '_daysbelow_q{0}'
        self.days_over_file = {}
        self.days_below_file = {}
        for perc in ClimatologicalPercentile.Percentiles:
            self.days_over_file[perc] = self.declare_chunk(self.domain, var_over.format(int(perc * 100)), self.startdate, None,
                                                           None, frequency=Frequencies.monthly,
                                                           vartype=VariableType.STATISTIC)
            self.days_below_file[perc] = self.declare_chunk(self.domain, var_below.format(int(perc * 100)), self.startdate, None,
                                                            None, frequency=Frequencies.monthly,
                                                            vartype=VariableType.STATISTIC)

    def compute(self):
        """
        Runs the diagnostic
        """
        percentiles = iris.load_cube(self.percentiles_file.local_file)

        # Attach the realization coordinate to the variable so iris can load it
        handler = Utils.openCdf(self.variable_file.local_file)
        if 'realization' in handler.variables:
            handler.variables[self.variable].coordinates = 'realization'
        handler.close()
        var = iris.load_cube(self.variable_file.local_file)
        # BUG FIX: `date` was referenced without being defined; parse_date was
        # imported at the top of the file but never used
        date = parse_date(self.startdate)
        lead_date = add_months(date, 1, self.data_manager.config.experiment.calendar)
        leadtimes = {1: PartialDateTime(lead_date.year, lead_date.month, lead_date.day)}

        def assign_leadtime(coord, x):
            # Map a timestep to its forecast lead time (in months), lazily
            # extending the leadtimes dict as later months are encountered.
            # BUG FIX: leadtime_month was never initialised and the body was
            # wrapped in a `try:` with no matching except (a syntax error)
            leadtime_month = 1
            partial_date = leadtimes[leadtime_month]
            while coord.units.num2date(x) >= partial_date:
                leadtime_month += 1
                try:
                    partial_date = leadtimes[leadtime_month]
                except KeyError:
                    new_date = add_months(date, leadtime_month, self.data_manager.config.experiment.calendar)
                    partial_date = PartialDateTime(new_date.year, new_date.month, new_date.day)
                    leadtimes[leadtime_month] = partial_date
            return leadtime_month

        iris.coord_categorisation.add_categorised_coord(var, 'leadtime', 'time', assign_leadtime)
        iris.coord_categorisation.add_year(var, 'time')
        iris.coord_categorisation.add_day_of_year(var, 'time')
        try:
            realization_coord = var.coord('realization')
        except iris.exceptions.CoordinateNotFoundError:
            realization_coord = None
        lat_coord = var.coord('latitude')
        lon_coord = var.coord('longitude')
        results_over = {perc: iris.cube.CubeList() for perc in ClimatologicalPercentile.Percentiles}
        results_below = {perc: iris.cube.CubeList() for perc in ClimatologicalPercentile.Percentiles}

        var_daysover = 'daysover'
        var_days_below = 'daysbelow'
        long_name_days_over = 'Proportion of days over a given percentile for {0.start_year}-{0.end_year} ' \
                              'climatology'.format(self)
        long_name_days_below = 'Proportion of days below a given percentile for {0.start_year}-{0.end_year} ' \
                               'climatology'.format(self)

        for leadtime in leadtimes.keys():
            leadtime_slice = var.extract(iris.Constraint(leadtime=leadtime))
            if len(percentiles.coords('leadtime')) > 0:
                percentiles_leadtime = percentiles.extract(iris.Constraint(leadtime=leadtime))
            else:
                percentiles_leadtime = percentiles
            # Collapse the time coordinate to a single cell spanning the leadtime
            time_coord = iris.coords.AuxCoord.from_coord(leadtime_slice.coord('time'))
            first_time = time_coord.points[0]
            last_time = time_coord.points[-1]
            timesteps = leadtime_slice.coord('time').shape[0]
            time_coord = time_coord.copy(first_time + (last_time - first_time) / 2, (first_time, last_time))
            for percentile_slice in percentiles_leadtime.slices_over('percentile'):
                percentile = percentile_slice.coord('percentile').points[0]

                days_over = np.sum(leadtime_slice.data > percentile_slice.data, 0) / float(timesteps)
                result = self.create_results_cube(days_over, lat_coord, lon_coord, percentile, realization_coord,
                                                  time_coord, var_daysover, long_name_days_over)
                # BUG FIX: the days-over cube was created but never stored, so
                # results_over[perc].merge_cube() below failed on an empty list
                results_over[percentile].append(result)

                days_below = np.sum(leadtime_slice.data < percentile_slice.data, 0) / float(timesteps)
                result = self.create_results_cube(days_below, lat_coord, lon_coord, percentile, realization_coord,
                                                  time_coord, var_days_below, long_name_days_below)
                results_below[percentile].append(result)

        # Loop-invariant flag hoisted out of the save loop
        iris.FUTURE.netcdf_no_unlimited = True
        for perc in ClimatologicalPercentile.Percentiles:
            temp = TempFile.get()
            iris.save(results_over[perc].merge_cube(), temp, zlib=True, unlimited_dimensions=['time'])
            Utils.rename_variables(temp, {'dim2': 'ensemble', 'dim1': 'ensemble'},
                                   must_exist=False, rename_dimension=True)
            self.days_over_file[perc].set_local_file(temp, rename_var='daysover')

            temp = TempFile.get()
            iris.save(results_below[perc].merge_cube(), temp, zlib=True, unlimited_dimensions=['time'])
            Utils.rename_variables(temp, {'dim2': 'ensemble', 'dim1': 'ensemble'},
                                   must_exist=False, rename_dimension=True)
            self.days_below_file[perc].set_local_file(temp, rename_var='daysbelow')

    def create_results_cube(self, days_over, lat_coord, lon_coord, percentile, realization_coord, time_coord,
                            var_name, long_name):
        """
        Build a result cube for one percentile and leadtime

        :param days_over: proportion of days over/below the percentile (2D or 3D array)
        :param lat_coord: latitude coordinate
        :param lon_coord: longitude coordinate
        :param percentile: percentile value, stored as an auxiliary coordinate
        :param realization_coord: realization coordinate, or None if the data has no ensemble dimension
        :param time_coord: scalar time coordinate spanning the leadtime
        :param var_name: netCDF variable name for the cube
        :param long_name: descriptive long name for the cube
        :return: the assembled cube
        :rtype: iris.cube.Cube
        """
        result = iris.cube.Cube(days_over.astype(np.float32), var_name=var_name, long_name=long_name, units=1.0)
        if realization_coord is not None:
            # With an ensemble dimension the data is (realization, lat, lon)
            result.add_aux_coord(realization_coord, 0)
            result.add_dim_coord(lat_coord, 1)
            result.add_dim_coord(lon_coord, 2)
        else:
            result.add_dim_coord(lat_coord, 0)
            result.add_dim_coord(lon_coord, 1)
        result.add_aux_coord(iris.coords.AuxCoord(percentile, long_name='percentile'))
        result.add_aux_coord(time_coord)
        return result