# coding=utf-8
from bscearth.utils.date import parse_date, add_months

from earthdiagnostics.diagnostic import *
from earthdiagnostics.frequency import Frequencies
from earthdiagnostics.utils import Utils, TempFile

import iris
import iris.analysis
import iris.coord_categorisation
import iris.coords
from iris.time import PartialDateTime

import numpy as np


class DaysOverPercentile(Diagnostic):
    """
    Calculates the fraction of days over (and below) the reference
    climatological percentiles for each forecast month

    :param data_manager: data management object
    :type data_manager: DataManager
    :param startdate: startdate
    :type startdate: str
    :param member: member number
    :type member: int
    :param chunk: chunk's number
    :type chunk: int
    :param domain: variable's domain
    :type domain: ModelingRealm
    :param variable: variable to analyze
    :type variable: str
    :param start_year: first year of the reference period
    :type start_year: int
    :param end_year: last year of the reference period
    :type end_year: int
    :param forecast_month: forecast month to compute
    :type forecast_month: int
    """

    alias = 'daysover'
    "Diagnostic alias for the configuration file"

    def __init__(self, data_manager, startdate, member, chunk, domain, variable,
                 start_year, end_year, forecast_month):
        Diagnostic.__init__(self, data_manager)
        self.startdate = startdate
        self.member = member
        self.chunk = chunk
        self.variable = variable
        self.domain = domain
        self.start_year = start_year
        self.end_year = end_year
        self.forecast_month = forecast_month

    def __eq__(self, other):
        return self.startdate == other.startdate and self.member == other.member and self.chunk == other.chunk and \
            self.domain == other.domain and self.variable == other.variable

    def __str__(self):
        return 'Days over percentile Startdate: {0} Member: {1} Chunk: {2} ' \
               'Variable: {3}:{4}'.format(self.startdate, self.member, self.chunk, self.domain, self.variable)

    @classmethod
    def generate_jobs(cls, diags, options):
        """
        Creates a job for each chunk to compute the diagnostic

        :param diags: Diagnostics manager class
        :type diags: Diags
        :param options: domain, variable, start year, end year, forecast month
        :type options: list[str]
        :return: list of jobs to run, one per chunk
        """
        options_available = (DiagnosticDomainOption(),
                             DiagnosticOption('variable'),
                             DiagnosticIntOption('start_year'),
                             DiagnosticIntOption('end_year'),
                             DiagnosticIntOption('forecast_month'))
        options = cls.process_options(options, options_available)

        job_list = list()
        for startdate, member, chunk in diags.config.experiment.get_chunk_list():
            job_list.append(DaysOverPercentile(diags.data_manager, startdate, member, chunk,
                                               options['domain'], options['variable'],
                                               options['start_year'], options['end_year'],
                                               options['forecast_month']))
        return job_list

    def request_data(self):
        var_name = '{0.variable}prct{0.start_year}{0.forecast_month}-{0.end_year}{0.forecast_month}'.format(self)
        self.percentiles_file = self.request_chunk(self.domain, var_name, None, None, None,
                                                   frequency=Frequencies.climatology)
        self.variable_file = self.request_chunk(self.domain, self.variable, self.startdate, self.member, self.chunk)

    def declare_data_generated(self):
        var_over = self.variable + '_daysover'
        var_below = self.variable + '_daysbelow'
        self.days_over_file = self.declare_chunk(self.domain, var_over, None, None, None,
                                                 frequency=Frequencies.monthly, vartype=VariableType.STATISTIC)
        self.days_below_file = self.declare_chunk(self.domain, var_below, None, None, None,
                                                  frequency=Frequencies.monthly, vartype=VariableType.STATISTIC)

    def compute(self):
        """
        Runs the diagnostic
        """
        iris.FUTURE.netcdf_promote = True
        percentiles = iris.load_cube(self.percentiles_file.local_file)

        # If the file stores a realization variable, flag it as a coordinate
        # so iris attaches the ensemble information to the cube
        handler = Utils.openCdf(self.variable_file.local_file)
        if 'realization' in handler.variables:
            handler.variables[self.variable].coordinates = 'realization'
        handler.close()

        var = iris.load_cube(self.variable_file.local_file)
        date = parse_date(self.startdate)
        lead_date = add_months(date, 1, self.data_manager.config.experiment.calendar)
        leadtimes = {1: PartialDateTime(lead_date.year, lead_date.month, lead_date.day)}

        def assign_leadtime(coord, x):
            # Map each timestep to its leadtime month, extending the
            # leadtimes dictionary on demand
            try:
                leadtime_month = 1
                partial_date = leadtimes[leadtime_month]
                while coord.units.num2date(x) >= partial_date:
                    leadtime_month += 1
                    try:
                        partial_date = leadtimes[leadtime_month]
                    except KeyError:
                        new_date = add_months(date, leadtime_month, self.data_manager.config.experiment.calendar)
                        partial_date = PartialDateTime(new_date.year, new_date.month, new_date.day)
                        leadtimes[leadtime_month] = partial_date
                return leadtime_month
            except Exception:
                # Timesteps that can not be dated are left without a leadtime
                pass

        iris.coord_categorisation.add_categorised_coord(var, 'leadtime', 'time', assign_leadtime)
        iris.coord_categorisation.add_year(var, 'time')
        iris.coord_categorisation.add_day_of_year(var, 'time')

        realization_coord = var.coord('realization')
        lat_coord = var.coord('latitude')
        lon_coord = var.coord('longitude')
        results_over = iris.cube.CubeList()
        results_below = iris.cube.CubeList()

        # For each leadtime month and percentile, compute the fraction of
        # timesteps over and below the corresponding threshold field
        for leadtime in leadtimes.keys():
            leadtime_slice = var.extract(iris.Constraint(leadtime=leadtime))
            timesteps = leadtime_slice.coord('time').shape[0]
            percentiles_leadtime = percentiles.extract(iris.Constraint(leadtime=leadtime))
            for percentile_slice in percentiles_leadtime.slices_over('percentile'):
                percentile = percentile_slice.coord('percentile').points[0]

                days_over = np.sum(leadtime_slice.data > percentile_slice.data, 0) / float(timesteps)
                result = iris.cube.Cube(days_over.astype(np.float32), var_name='daysover', units=1.0)
                result.add_dim_coord(lat_coord, 1)
                result.add_dim_coord(lon_coord, 2)
                result.add_aux_coord(realization_coord, 0)
                result.add_aux_coord(iris.coords.AuxCoord(percentile, long_name='percentile'))
                result.add_aux_coord(iris.coords.AuxCoord(np.int8(leadtime), long_name='leadtime'))
                results_over.append(result)

                days_below = np.sum(leadtime_slice.data < percentile_slice.data, 0) / float(timesteps)
                result = iris.cube.Cube(days_below.astype(np.float32), var_name='daysbelow', units=1.0)
                result.add_dim_coord(lat_coord, 1)
                result.add_dim_coord(lon_coord, 2)
                result.add_aux_coord(realization_coord, 0)
                result.add_aux_coord(iris.coords.AuxCoord(percentile, long_name='percentile'))
                result.add_aux_coord(iris.coords.AuxCoord(np.int8(leadtime), long_name='leadtime'))
                results_below.append(result)

        # Save each result, renaming the anonymous ensemble dimension that
        # iris creates ('dim2') to 'ensemble'
        iris.FUTURE.netcdf_no_unlimited = True
        temp = TempFile.get()
        iris.save(results_over.merge_cube(), temp, zlib=True)
        Utils.rename_variable(temp, 'dim2', 'ensemble', must_exist=False, rename_dimension=True)
        self.days_over_file.set_local_file(temp, rename_var='daysover')

        temp = TempFile.get()
        iris.save(results_below.merge_cube(), temp, zlib=True)
        Utils.rename_variable(temp, 'dim2', 'ensemble', must_exist=False, rename_dimension=True)
        self.days_below_file.set_local_file(temp, rename_var='daysbelow')
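
# The block below is an illustrative, self-contained sketch of the core
# statistic computed by this diagnostic: the fraction of timesteps whose
# value is over (or below) a per-gridpoint percentile threshold. It only
# assumes numpy; the names 'toy_data' and 'threshold' are hypothetical and
# are not part of the diagnostic itself.
if __name__ == '__main__':
    rng = np.random.RandomState(0)
    toy_data = rng.rand(30, 4, 5)  # 30 daily fields on a 4x5 grid
    # Per-gridpoint 90th percentile over the time axis
    threshold = np.percentile(toy_data, 90, axis=0)
    timesteps = toy_data.shape[0]
    days_over = np.sum(toy_data > threshold, 0) / float(timesteps)
    days_below = np.sum(toy_data < threshold, 0) / float(timesteps)
    print(days_over)   # close to 0.10 at every point, by construction
    print(days_below)  # close to 0.90 at every point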