Newer
Older
# coding=utf-8
from bscearth.utils.date import parse_date, add_months
from bscearth.utils.log import Log
from earthdiagnostics.diagnostic import Diagnostic, DiagnosticVariableOption, DiagnosticDomainOption, \
Javier Vegas-Regidor
committed
DiagnosticIntOption, DiagnosticListIntOption, DiagnosticFloatOption
from earthdiagnostics.frequency import Frequencies
from earthdiagnostics.utils import Utils, TempFile
from earthdiagnostics.variable_type import VariableType
import numpy as np
import iris
import iris.coord_categorisation
from iris.time import PartialDateTime
import iris.exceptions
import iris.coords
Javier Vegas-Regidor
committed
import math
class ClimatologicalPercentile(Diagnostic):
"""
Calculates the climatological percentiles for the given leadtime
:param data_manager: data management object
:type data_manager: DataManager
:param variable: variable to average
:type variable: str
:param experiment_config:
:type experiment_config: ExperimentConfig
"""
alias = 'climpercent'
"Diagnostic alias for the configuration file"
Percentiles = np.array([0.1, 0.25, 0.33, 0.5, 0.66, 0.75, 0.9])
def __init__(self, data_manager, domain, variable, start_year, end_year,
Javier Vegas-Regidor
committed
forecast_month, experiment_config):
Diagnostic.__init__(self, data_manager)
self.variable = variable
self.domain = domain
self.experiment_config = experiment_config
Javier Vegas-Regidor
committed
self.start_year = start_year
self.end_year = end_year
self.forecast_month = forecast_month
self.distribution = None
Javier Vegas-Regidor
committed
self.leadtime_files = {}
def __eq__(self, other):
return self.domain == other.domain and self.variable == other.variable and \
self.start_year == other.start_year and self.end_year == other.end_year and \
self.forecast_month == other.forecast_month
def __str__(self):
return 'Climatological percentile Variable: {0.domain}:{0.variable} Period: {0.start_year}-{0.end_year} ' \
'Forecast month: {0.forecast_month}'.format(self)
@classmethod
def generate_jobs(cls, diags, options):
"""
Creates a job for each chunk to compute the diagnostic
:param diags: Diagnostics manager class
:type diags: Diags
:param options: domain, variable, percentil number, maximum depth (level)
:type options: list[str]
:return:
"""
options_available = (DiagnosticDomainOption(),
DiagnosticVariableOption(),
DiagnosticIntOption('start_year'),
DiagnosticIntOption('end_year'),
DiagnosticListIntOption('forecast_month'),
Javier Vegas-Regidor
committed
options = cls.process_options(options, options_available)
job_list = list()
for forecast_month in options['forecast_month']:
job_list.append(ClimatologicalPercentile(diags.data_manager, options['domain'], options['variable'],
forecast_month, diags.config.experiment))
return job_list
def requested_startdates(self):
return ['{0}{1:02}01'.format(year, self.forecast_month) for year in range(self.start_year, self.end_year+1)]
if startdate not in self.leadtime_files:
self.leadtime_files[startdate] = {}
Log.debug('Retrieving startdate {0}', startdate)
self.leadtime_files[startdate] = self.request_chunk(self.domain, '{0}_dis'.format(self.variable), startdate,
None, None, vartype=VariableType.STATISTIC)
var_name = '{0.variable}prct{0.start_year}{0.forecast_month}-{0.end_year}{0.forecast_month:02d}'.format(self)
self.percentiles_file = self.declare_chunk(self.domain, var_name, None, None, None,
frequency=Frequencies.climatology, vartype=VariableType.STATISTIC)
def compute(self):
"""
Runs the diagnostic
"""
iris.FUTURE.netcdf_promote = True
self._get_distribution()
percentile_values = self._calculate_percentiles()
Javier Vegas-Regidor
committed
self._save_results(percentile_values)
def _save_results(self, percentile_values):
temp = TempFile.get()
percentile_coord = iris.coords.DimCoord(ClimatologicalPercentile.Percentiles, long_name='percentile')
for leadtime, data in percentile_values.items():
result = iris.cube.Cube(data, var_name='percent',
units=self.distribution.coord('bin').units)
result.add_dim_coord(percentile_coord, 0)
result.add_dim_coord(self.distribution.coord('latitude'), 1)
result.add_dim_coord(self.distribution.coord('longitude'), 2)
result.add_aux_coord(iris.coords.AuxCoord(np.int8(leadtime), long_name='leadtime'))
results.append(result)
iris.FUTURE.netcdf_no_unlimited = True
iris.save(results.merge_cube(), temp, zlib=True)
Javier Vegas-Regidor
committed
self.percentiles_file.set_local_file(temp, rename_var='percent')
Javier Vegas-Regidor
committed
Log.debug('Calculating percentiles')
percentiles = {}
bins = self.distribution.coord('bin').points
Javier Vegas-Regidor
committed
def calculate(point_distribution):
Javier Vegas-Regidor
committed
cs = np.cumsum(point_distribution)
total = cs[-1]
percentile_values = ClimatologicalPercentile.Percentiles * total
Javier Vegas-Regidor
committed
index = np.searchsorted(cs, percentile_values)
return [bins[i] for i in index]
for leadtime_slice in self.distribution.slices_over('leadtime'):
leadtime = leadtime_slice.coord('leadtime').points[0]
percentiles[leadtime]=np.apply_along_axis(calculate, 0, leadtime_slice.data)
return percentiles
Javier Vegas-Regidor
committed
def _get_distribution(self):
for startdate, startdate_file in self.leadtime_files.iteritems():
Log.info('Getting data for startdate {0}', startdate)
data_cube = iris.load_cube(startdate_file.local_file)
if self.distribution is None:
self.distribution = data_cube
else:
self.distribution += data_cube