# coding=utf-8
"""Count, per month, the days over and below the climatological percentiles."""
from bscearth.utils.date import parse_date, add_months

import iris
import iris.analysis
import iris.coord_categorisation
import iris.coords
import iris.exceptions
from iris.time import PartialDateTime
import numpy as np

from earthdiagnostics.diagnostic import *
from earthdiagnostics.frequency import Frequencies
from earthdiagnostics.statistics.climatologicalpercentile import ClimatologicalPercentile
from earthdiagnostics.utils import Utils, TempFile


class DaysOverPercentile(Diagnostic):
    """
    Calculates the number of days per month over and below the
    climatological percentiles computed by ClimatologicalPercentile.

    :param data_manager: data management object
    :type data_manager: DataManager
    :param domain: variable's domain
    :type domain: ModelingRealm
    :param variable: variable to compute the diagnostic on
    :type variable: str
    :param start_year: first year of the reference climatology
    :type start_year: int
    :param end_year: last year of the reference climatology
    :type end_year: int
    :param year_to_compute: year whose days over/below are counted
    :type year_to_compute: int
    :param forecast_month: forecast start month (1-12)
    :type forecast_month: int
    """

    alias = 'daysover'
    "Diagnostic alias for the configuration file"

    def __init__(self, data_manager, domain, variable, start_year, end_year, year_to_compute, forecast_month):
        Diagnostic.__init__(self, data_manager)
        self.variable = variable
        self.domain = domain
        self.start_year = start_year
        self.end_year = end_year
        self.year_to_compute = year_to_compute
        self.forecast_month = forecast_month
        # BUGFIX: the startdate must be built from the year being computed, not
        # from the first year of the climatology. Using start_year made every
        # job generated by the generate_jobs year loop identical (same
        # startdate, equal under __eq__), so only one year was ever computed.
        self.startdate = '{0}{1:02}01'.format(self.year_to_compute, self.forecast_month)
        # Request/declare bookkeeping, filled in by request_data and
        # declare_data_generated respectively.
        self.percentiles_file = None
        self.variable_file = None
        self.days_over_file = None
        self.days_below_file = None

    def __eq__(self, other):
        return self.startdate == other.startdate and self.domain == other.domain and \
            self.variable == other.variable

    def __str__(self):
        return 'Days over percentile Startdate: {0} ' \
               'Variable: {1}:{2}'.format(self.startdate, self.domain, self.variable)

    @classmethod
    def generate_jobs(cls, diags, options):
        """
        Creates a job for each year and forecast month to compute the diagnostic

        :param diags: Diagnostics manager class
        :type diags: Diags
        :param options: domain, variable, start year, end year, forecast months
        :type options: list[str]
        :return: list of jobs to run
        :rtype: list[DaysOverPercentile]
        """
        options_available = (DiagnosticDomainOption(),
                             DiagnosticOption('variable'),
                             DiagnosticIntOption('start_year'),
                             DiagnosticIntOption('end_year'),
                             DiagnosticListIntOption('forecast_month'),)
        options = cls.process_options(options, options_available)

        job_list = list()
        # One job per (year, forecast month) pair over the reference period.
        for year in range(options['start_year'], options['end_year'] + 1):
            for forecast_month in options['forecast_month']:
                job_list.append(DaysOverPercentile(diags.data_manager, options['domain'], options['variable'],
                                                   options['start_year'], options['end_year'],
                                                   year, forecast_month))
        return job_list

    def request_data(self):
        """Request the climatological percentiles and the variable data chunk."""
        var_name = '{0.variable}prct{0.start_year}{0.forecast_month}-{0.end_year}{0.forecast_month}'.format(self)
        self.percentiles_file = self.request_chunk(self.domain, var_name, None, None, None,
                                                   frequency=Frequencies.climatology)
        self.variable_file = self.request_chunk(self.domain, self.variable, self.startdate, None, None)

    def declare_data_generated(self):
        """Declare one days-over and one days-below file per percentile."""
        var_over = self.variable + '_daysover_q{0}'
        var_below = self.variable + '_daysbelow_q{0}'
        self.days_over_file = {}
        self.days_below_file = {}
        for perc in ClimatologicalPercentile.Percentiles:
            self.days_over_file[perc] = self.declare_chunk(self.domain, var_over.format(int(perc * 100)),
                                                           self.startdate, None, None,
                                                           frequency=Frequencies.monthly,
                                                           vartype=VariableType.STATISTIC)
            self.days_below_file[perc] = self.declare_chunk(self.domain, var_below.format(int(perc * 100)),
                                                            self.startdate, None, None,
                                                            frequency=Frequencies.monthly,
                                                            vartype=VariableType.STATISTIC)

    @staticmethod
    def _build_result_cube(days, var_name, realization_coord, lat_coord, lon_coord, percentile, time_coord):
        """Wrap a days-count array in a cube with the proper coordinates.

        If a realization coordinate exists the data is (realization, lat, lon),
        otherwise (lat, lon).
        """
        result = iris.cube.Cube(days.astype(np.float32), var_name=var_name, units=1.0)
        if realization_coord is not None:
            result.add_aux_coord(realization_coord, 0)
            result.add_dim_coord(lat_coord, 1)
            result.add_dim_coord(lon_coord, 2)
        else:
            result.add_dim_coord(lat_coord, 0)
            result.add_dim_coord(lon_coord, 1)
        result.add_aux_coord(iris.coords.AuxCoord(percentile, long_name='percentile'))
        result.add_aux_coord(time_coord)
        return result

    def _save_results(self, results, perc, var_name, destination):
        """Merge the per-leadtime cubes for one percentile and store the file."""
        temp = TempFile.get()
        iris.save(results[perc].merge_cube(), temp, zlib=True)
        # iris names anonymous dimensions dim1/dim2; rename to 'ensemble' so the
        # stored files follow the project convention.
        Utils.rename_variables(temp, {'dim2': 'ensemble', 'dim1': 'ensemble'},
                               must_exist=False, rename_dimension=True)
        destination[perc].set_local_file(temp, rename_var=var_name)

    def compute(self):
        """
        Runs the diagnostic
        """
        iris.FUTURE.netcdf_promote = True
        percentiles = iris.load_cube(self.percentiles_file.local_file)

        # Make sure the realization coordinate is declared on the variable so
        # iris picks it up as an auxiliary coordinate.
        handler = Utils.openCdf(self.variable_file.local_file)
        if 'realization' in handler.variables:
            handler.variables[self.variable].coordinates = 'realization'
        handler.close()
        var = iris.load_cube(self.variable_file.local_file)

        date = parse_date(self.startdate)
        lead_date = add_months(date, 1, self.data_manager.config.experiment.calendar)
        leadtimes = {1: PartialDateTime(lead_date.year, lead_date.month, lead_date.day)}

        def assign_leadtime(coord, x):
            # Map each time point to a 1-based leadtime month, extending the
            # leadtimes dict lazily as later months are encountered.
            try:
                leadtime_month = 1
                partial_date = leadtimes[leadtime_month]
                while coord.units.num2date(x) >= partial_date:
                    leadtime_month += 1
                    try:
                        partial_date = leadtimes[leadtime_month]
                    except KeyError:
                        new_date = add_months(date, leadtime_month, self.data_manager.config.experiment.calendar)
                        partial_date = PartialDateTime(new_date.year, new_date.month, new_date.day)
                        leadtimes[leadtime_month] = partial_date
                return leadtime_month
            except Exception:
                # NOTE(review): deliberately best-effort — points that cannot be
                # categorised get leadtime None instead of aborting the run.
                pass

        iris.coord_categorisation.add_categorised_coord(var, 'leadtime', 'time', assign_leadtime)
        iris.coord_categorisation.add_year(var, 'time')
        iris.coord_categorisation.add_day_of_year(var, 'time')
        try:
            realization_coord = var.coord('realization')
        except iris.exceptions.CoordinateNotFoundError:
            realization_coord = None
        lat_coord = var.coord('latitude')
        lon_coord = var.coord('longitude')

        results_over = {perc: iris.cube.CubeList() for perc in ClimatologicalPercentile.Percentiles}
        results_below = {perc: iris.cube.CubeList() for perc in ClimatologicalPercentile.Percentiles}

        for leadtime in leadtimes.keys():
            leadtime_slice = var.extract(iris.Constraint(leadtime=leadtime))
            if len(percentiles.coords('leadtime')) > 0:
                percentiles_leadtime = percentiles.extract(iris.Constraint(leadtime=leadtime))
            else:
                percentiles_leadtime = percentiles

            # Collapse the time coordinate of this leadtime to its midpoint,
            # keeping the full interval as bounds.
            time_coord = iris.coords.AuxCoord.from_coord(leadtime_slice.coord('time'))
            first_time = time_coord.points[0]
            last_time = time_coord.points[-1]
            timesteps = leadtime_slice.coord('time').shape[0]
            days = time_coord.units.num2date(last_time) - time_coord.units.num2date(first_time)
            # A partial trailing day still counts as a day.
            if days.seconds > 0:
                days = days.days + 1
            else:
                days = days.days
            timesteps_per_day = timesteps / days
            time_coord = time_coord.copy(first_time + (last_time - first_time) / 2, (first_time, last_time))

            for percentile_slice in percentiles_leadtime.slices_over('percentile'):
                percentile = percentile_slice.coord('percentile').points[0]

                days_over = np.sum(leadtime_slice.data > percentile_slice.data, 0) / float(timesteps_per_day)
                results_over[percentile].append(
                    self._build_result_cube(days_over, 'daysover', realization_coord,
                                            lat_coord, lon_coord, percentile, time_coord))

                days_below = np.sum(leadtime_slice.data < percentile_slice.data, 0) / float(timesteps_per_day)
                results_below[percentile].append(
                    self._build_result_cube(days_below, 'daysbelow', realization_coord,
                                            lat_coord, lon_coord, percentile, time_coord))

        iris.FUTURE.netcdf_no_unlimited = True
        for perc in ClimatologicalPercentile.Percentiles:
            self._save_results(results_over, perc, 'daysover', self.days_over_file)
            self._save_results(results_below, perc, 'daysbelow', self.days_below_file)