Commit 7de0c4b8 authored by Javier Vegas-Regidor

Added discretize

parent 1a26ef5b
@@ -16,7 +16,7 @@ CON_FILES = /esnas/autosubmit/con_files/
# Diagnostics to run, space separated. You must provide for each one the name and the parameters (comma separated) or
# an alias defined in the ALIAS section (see more below). If you are using the diagnostics just to CMORize, leave it
# empty
-DIAGS = climpercent,atmos,sfcWind,2000,2000,1 daysover,atmos,sfcWind,2000,2000,1
+DIAGS = discretize,atmos,sfcWind,500,0,40 climpercent,atmos,sfcWind,1984,1984,1
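# discretize options: domain,variable[,bins[,min_value[,max_value]]]; climpercent options: domain,variable,start_year,end_year,forecast_month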
# DIAGS = OHC
# Frequency of the data you want to use by default. Some diagnostics do not use this value: e.g. monmean always stores
# its results at monthly frequency (obviously) and has a parameter to specify the input's frequency.
@@ -87,7 +87,7 @@ OCEAN_TIMESTEP = 6
# CHUNK_SIZE is the size of each data file, given in months
# CHUNKS is the number of chunks. You can specify fewer chunks than are present in the experiment
EXPID = testing_erainterim
-STARTDATES = 20000101
+STARTDATES = 19840101
MEMBERS = 0
MEMBER_DIGITS = 1
CHUNK_SIZE = 1
......
@@ -201,8 +201,8 @@ class Diagnostic(Publisher):
return 'Developer must override base class __str__ method'
def request_chunk(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
-to_modify=False):
-request = self.data_manager.request_chunk(domain, var, startdate, member, chunk, grid, box, frequency)
+to_modify=False, vartype=VariableType.MEAN):
+request = self.data_manager.request_chunk(domain, var, startdate, member, chunk, grid, box, frequency, vartype)
if to_modify:
request.add_modifier(self)
self._requests.append(request)
......
@@ -38,6 +38,8 @@ class Frequency(object):
freq_str = 'clim'
elif self in (Frequencies.three_hourly, Frequencies.six_hourly, Frequencies.hourly):
freq_str = self.frequency[:-2] + 'hourly'
+if vartype != VariableType.MEAN:
+freq_str = '{0}_{1}'.format(freq_str, VariableType.to_str(vartype))
else:
freq_str = 'monthly_{0}'.format(VariableType.to_str(vartype))
return freq_str
......
# coding=utf-8
import os
from bscearth.utils.log import Log
from earthdiagnostics.datamanager import DataManager
from earthdiagnostics.variable_type import VariableType
@@ -221,6 +223,7 @@ class ObsReconManager(DataManager):
"""
var = self._get_final_var_name(box, var)
filepath = self.get_file_path(startdate, domain, var, frequency, vartype, grid, box)
+Log.debug('{0} requested', filepath)
return self._get_file_from_storage(filepath)
def declare_chunk(self, domain, var, startdate, member, chunk, grid=None, region=None, box=None, frequency=None,
@@ -263,5 +266,6 @@ class ObsReconManager(DataManager):
netcdf_file = self._declare_generated_file(filepath, domain, final_name, cmor_var, self.config.data_convention,
region, diagnostic, grid, vartype, original_name)
netcdf_file.frequency = frequency
+Log.debug('{0} will be generated', filepath)
return netcdf_file
@@ -2,3 +2,4 @@
from monthlypercentile import MonthlyPercentile
from climatologicalpercentile import ClimatologicalPercentile
from daysoverpercentile import DaysOverPercentile
+from discretize import Discretize
@@ -34,52 +34,27 @@ class ClimatologicalPercentile(Diagnostic):
Percentiles = np.array([0.1, 0.25, 0.33, 0.5, 0.66, 0.75, 0.9])
-def __init__(self, data_manager, domain, variable, num_bins, start_year, end_year, min_value, max_value,
+def __init__(self, data_manager, domain, variable, start_year, end_year,
forecast_month, experiment_config):
Diagnostic.__init__(self, data_manager)
self.variable = variable
self.domain = domain
self.experiment_config = experiment_config
-self.realizations = None
-self.lat_coord = None
-self.lon_coord = None
-self.num_bins = num_bins
-self._bins = None
self.start_year = start_year
self.end_year = end_year
self.forecast_month = forecast_month
-self.cmor_var = data_manager.variable_list.get_variable(variable, silent=True)
-if not math.isnan(min_value):
-self.min_value = min_value
-self.check_min_value = False
-elif self.cmor_var and self.cmor_var.valid_min:
-self.min_value = float(self.cmor_var.valid_min)
-self.check_min_value = False
-else:
-self.min_value = None
-self.check_min_value = True
-if not math.isnan(max_value):
-self.max_value = max_value
-self.check_max_value = False
-elif self.cmor_var and self.cmor_var.valid_min:
-self.max_value = float(self.cmor_var.valid_max)
-self.check_max_value = False
-else:
-self.max_value = None
-self.check_max_value = True
def __eq__(self, other):
-return self.domain == other.domain and self.variable == other.variable and self.num_bins == other.num_bins and \
-self.min_value == other.min_value and self.max_value == other.max_value and \
+return self.domain == other.domain and self.variable == other.variable and \
self.start_year == other.start_year and self.end_year == other.end_year and \
self.forecast_month == other.forecast_month
def __str__(self):
return 'Climatological percentile Variable: {0.domain}:{0.variable} Period: {0.start_year}-{0.end_year} ' \
-'Forecast month: {0.forecast_month} Bins: {0.num_bins}'.format(self)
+'Forecast month: {0.forecast_month}'.format(self)
@classmethod
def generate_jobs(cls, diags, options):
@@ -97,17 +72,13 @@ class ClimatologicalPercentile(Diagnostic):
DiagnosticIntOption('start_year'),
DiagnosticIntOption('end_year'),
DiagnosticListIntOption('forecast_month'),
-DiagnosticIntOption('bins', 2000),
-DiagnosticFloatOption('min_value', float('nan')),
-DiagnosticFloatOption('max_value', float('nan')),
)
options = cls.process_options(options, options_available)
job_list = list()
for forecast_month in options['forecast_month']:
job_list.append(ClimatologicalPercentile(diags.data_manager, options['domain'], options['variable'],
-options['bins'], options['start_year'], options['end_year'],
-options['min_value'], options['max_value'],
+options['start_year'], options['end_year'],
forecast_month, diags.config.experiment))
return job_list
@@ -120,10 +91,11 @@ class ClimatologicalPercentile(Diagnostic):
if startdate not in self.leadtime_files:
self.leadtime_files[startdate] = {}
Log.debug('Retrieving startdate {0}', startdate)
-self.leadtime_files[startdate] = self.request_chunk(self.domain, self.variable, startdate, None, None)
+self.leadtime_files[startdate] = self.request_chunk(self.domain, '{0}_dis'.format(self.variable), startdate,
+None, None, vartype=VariableType.STATISTIC)
def declare_data_generated(self):
-var_name = '{0.variable}prct{0.start_year}{0.forecast_month}-{0.end_year}{0.forecast_month}'.format(self)
+var_name = '{0.variable}prct{0.start_year}{0.forecast_month}-{0.end_year}{0.forecast_month:02d}'.format(self)
self.percentiles_file = self.declare_chunk(self.domain, var_name, None, None, None,
frequency=Frequencies.climatology, vartype=VariableType.STATISTIC)
@@ -132,48 +104,9 @@ class ClimatologicalPercentile(Diagnostic):
Runs the diagnostic
"""
iris.FUTURE.netcdf_promote = True
-for startdate in self.leadtime_files.keys():
-Log.debug('Getting data for startdate {0}', startdate)
-data_cube = self._load_cube(startdate)
-self._get_value_interval(data_cube)
-self._get_realizations_present(data_cube)
-if self.lat_coord is None:
-self.units = data_cube.units
-self.lat_coord = data_cube.coord('latitude')
-self.lon_coord = data_cube.coord('longitude')
-Log.info('Range: [{0}, {1}]', self.min_value, self.max_value)
-distribution = self._get_distribution()
-percentile_values = self._calculate_percentiles(distribution)
+self._get_distribution()
+percentile_values = self._calculate_percentiles()
self._save_results(percentile_values)
-def _load_cube(self, startdate):
-handler = Utils.openCdf(self.leadtime_files[startdate].local_file)
-if 'realization' in handler.variables:
-handler.variables[self.variable].coordinates = 'realization'
-handler.close()
-data_cube = iris.load_cube(self.leadtime_files[startdate].local_file)
-date = parse_date(startdate)
-lead_date = add_months(date, 1, self.data_manager.config.experiment.calendar)
-leadtimes = {1: PartialDateTime(lead_date.year, lead_date.month, lead_date.day)}
-def assign_leadtime(coord, x):
-leadtime_month = 1
-partial_date = leadtimes[leadtime_month]
-while coord.units.num2date(x) >= partial_date:
-leadtime_month += 1
-try:
-partial_date = leadtimes[leadtime_month]
-except KeyError:
-new_date = add_months(date, leadtime_month, self.data_manager.config.experiment.calendar)
-partial_date = PartialDateTime(new_date.year, new_date.month, new_date.day)
-leadtimes[leadtime_month] = partial_date
-return leadtime_month
-iris.coord_categorisation.add_categorised_coord(data_cube, 'leadtime', 'time', assign_leadtime)
-return data_cube
def _save_results(self, percentile_values):
temp = TempFile.get()
percentile_coord = iris.coords.DimCoord(ClimatologicalPercentile.Percentiles, long_name='percentile')
@@ -190,9 +123,9 @@ class ClimatologicalPercentile(Diagnostic):
self.percentiles_file.set_local_file(temp, rename_var='percent')
-def _calculate_percentiles(self, distribution):
+def _calculate_percentiles(self):
Log.debug('Calculating percentiles')
-percentiles = {}
+percentiles = iris.cube.CubeList()
def calculate(point_distribution):
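# Invert the cumulative histogram: for each target percentile, find the first
# bin whose cumulative count reaches it and return that bin's midpoint.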
cs = np.cumsum(point_distribution)
@@ -200,79 +133,16 @@ class ClimatologicalPercentile(Diagnostic):
percentile_values = ClimatologicalPercentile.Percentiles * total
index = np.searchsorted(cs, percentile_values)
return [(self._bins[i + 1] + self._bins[i]) / 2 for i in index]
-for leadtime, dist in six.iteritems(distribution):
-Log.debug('Calculating leadtime {0}', leadtime)
-percentiles[leadtime] = np.apply_along_axis(calculate, 0, dist)
-return percentiles
+for leadtime_slice in self.distribution.slices_over('leadtime'):
+percentiles.append(np.apply_along_axis(calculate, 0, leadtime_slice.data))
+return percentiles.merge_cube()
def _get_distribution(self):
-distribution = {}
for startdate in self.leadtime_files:
Log.info('Getting data for startdate {0}', startdate)
-data_cube = self._load_cube(startdate)
-Log.debug('Discretizing file {0}', data_cube)
-for leadtime in set(data_cube.coord('leadtime').points):
-Log.debug('Discretizing leadtime {0}', leadtime)
-leadtime_cube = data_cube.extract(iris.Constraint(leadtime=leadtime))
-for realization in range(self.realizations):
-Log.debug('Discretizing realization {0}', realization)
-try:
-realization_cube = leadtime_cube.extract(iris.Constraint(realization=realization+1))
-except iris.exceptions.CoordinateNotFoundError:
-realization_cube = leadtime_cube
-if realization_cube is None and realization == 0:
-realization_cube = leadtime_cube
-if leadtime not in distribution:
-distribution[leadtime] = self._calculate_distribution(realization_cube)
-else:
-distribution[leadtime] += self._calculate_distribution(realization_cube)
-return distribution
-def _get_realizations_present(self, data_cube):
-realizations = 1
-ensemble_dimensions = ('realization', 'ensemble')
-for ensemble_dimension in ensemble_dimensions:
-try:
-realizations = data_cube.coord(ensemble_dimension).shape[0]
-break
-except iris.exceptions.CoordinateNotFoundError:
-pass
-if self.realizations is None:
-self.realizations = realizations
-if realizations != self.realizations:
-# noinspection PyNoneFunctionAssignment
-self.realizations = min(self.realizations, realizations)
-Log.warning('Different number of realizations in the data used by diagnostic {0}', self)
-def _get_value_interval(self, data_cube):
-if self.check_min_value or self.check_max_value:
-for time_slice in data_cube.slices_over('time'):
-if self.check_min_value:
-file_min = np.amin(time_slice.data)
-if self.min_value is None:
-self.min_value = file_min
-self.min_value = min(self.min_value, file_min)
-if self.check_max_value:
-file_max = np.amax(time_slice.data)
-self.max_value = max(self.min_value, file_max)
-def _calculate_distribution(self, data_cube):
-def calculate_histogram(time_series):
-histogram, self._bins = np.histogram(time_series, bins=self.num_bins,
-range=(self.min_value, self.max_value))
-return histogram
-return np.apply_along_axis(calculate_histogram, 0, data_cube.data)
+data_cube = iris.load_cube(startdate.local_file)
+if self.distribution is None:
+self.distribution = data_cube
+else:
+self.distribution += data_cube
......
discretize.py (new file)
# coding=utf-8
import six
from bscearth.utils.date import parse_date, add_months
from bscearth.utils.log import Log
from earthdiagnostics.diagnostic import Diagnostic, DiagnosticVariableOption, DiagnosticDomainOption, \
DiagnosticIntOption, DiagnosticListIntOption, DiagnosticFloatOption
from earthdiagnostics.frequency import Frequencies
from earthdiagnostics.utils import Utils, TempFile
from earthdiagnostics.variable_type import VariableType
import numpy as np
import iris
from iris.cube import Cube
import iris.coord_categorisation
from iris.time import PartialDateTime
import iris.exceptions
import iris.coords
import math
class Discretize(Diagnostic):
"""
Discretizes a variable
:param data_manager: data management object
:type data_manager: DataManager
:param variable: variable to discretize
:type variable: str
"""
alias = 'discretize'
"Diagnostic alias for the configuration file"
Percentiles = np.array([0.1, 0.25, 0.33, 0.5, 0.66, 0.75, 0.9])
def __init__(self, data_manager, startdate, domain, variable, num_bins, min_value, max_value):
Diagnostic.__init__(self, data_manager)
self.startdate = startdate
self.variable = variable
self.domain = domain
self.realizations = None
self.num_bins = num_bins
self._bins = None
self.cmor_var = data_manager.variable_list.get_variable(variable, silent=True)
if not math.isnan(min_value):
self.min_value = min_value
self.check_min_value = False
elif self.cmor_var and self.cmor_var.valid_min:
self.min_value = float(self.cmor_var.valid_min)
self.check_min_value = False
else:
self.min_value = None
self.check_min_value = True
if not math.isnan(max_value):
self.max_value = max_value
self.check_max_value = False
elif self.cmor_var and self.cmor_var.valid_max:
self.max_value = float(self.cmor_var.valid_max)
self.check_max_value = False
else:
self.max_value = None
self.check_max_value = True
@property
def bins(self):
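# Until the first histogram is computed this returns the requested number of
# bins; afterwards it returns the bin edges, so every histogram in the chunk
# is computed over exactly the same intervals.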
if self._bins is None:
return self.num_bins
return self._bins
@bins.setter
def bins(self, value):
self._bins = value
def __eq__(self, other):
return self.domain == other.domain and self.variable == other.variable and self.num_bins == other.num_bins and \
self.min_value == other.min_value and self.max_value == other.max_value and \
self.startdate == other.startdate
def __str__(self):
return 'Discretizing variable: {0.domain}:{0.variable} Startdate: {0.startdate} ' \
'Bins: {0.num_bins} Range: [{0.min_value}, {0.max_value}]'.format(self)
@classmethod
def generate_jobs(cls, diags, options):
"""
Creates a job for each chunk to compute the diagnostic
:param diags: Diagnostics manager class
:type diags: Diags
:param options: domain, variable, number of bins, minimum value, maximum value
:type options: list[str]
:return:
"""
options_available = (DiagnosticDomainOption(),
DiagnosticVariableOption(),
DiagnosticIntOption('bins', 2000),
DiagnosticFloatOption('min_value', float('nan')),
DiagnosticFloatOption('max_value', float('nan')),
)
options = cls.process_options(options, options_available)
job_list = list()
for startdate in diags.config.experiment.startdates:
job_list.append(Discretize(diags.data_manager, startdate, options['domain'], options['variable'],
options['bins'], options['min_value'], options['max_value']))
return job_list
def request_data(self):
self.original_data = self.request_chunk(self.domain, self.variable, self.startdate, None, None)
def declare_data_generated(self):
var_name = '{0.variable}_dis'.format(self)
self.discretized_data = self.declare_chunk(self.domain, var_name, self.startdate, None, None,
vartype=VariableType.STATISTIC)
def compute(self):
"""
Runs the diagnostic
"""
iris.FUTURE.netcdf_promote = True
self._load_cube()
self._get_value_interval()
Log.info('Range: [{0}, {1}]', self.min_value, self.max_value)
self._get_distribution()
self._save_results()
def _load_cube(self):
handler = Utils.openCdf(self.original_data.local_file)
if 'realization' in handler.variables:
handler.variables[self.variable].coordinates = 'realization'
handler.close()
data_cube = iris.load_cube(self.original_data.local_file)
date = parse_date(self.startdate)
lead_date = add_months(date, 1, self.data_manager.config.experiment.calendar)
leadtimes = {1: PartialDateTime(lead_date.year, lead_date.month, lead_date.day)}
def assign_leadtime(coord, x):
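# Walk forward month by month from the start date until the time point falls
# before the next month boundary; the month counter is the leadtime.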
leadtime_month = 1
partial_date = leadtimes[leadtime_month]
while coord.units.num2date(x) >= partial_date:
leadtime_month += 1
try:
partial_date = leadtimes[leadtime_month]
except KeyError:
new_date = add_months(date, leadtime_month, self.data_manager.config.experiment.calendar)
partial_date = PartialDateTime(new_date.year, new_date.month, new_date.day)
leadtimes[leadtime_month] = partial_date
return leadtime_month
iris.coord_categorisation.add_categorised_coord(data_cube, 'leadtime', 'time', assign_leadtime)
self.data_cube = data_cube
def _save_results(self):
Log.debug('Saving results...')
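# Rebuild the bin centres and bounds from the histogram edges stored in self.bins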
bins = np.zeros(self.num_bins)
bins_bounds = np.zeros((self.num_bins, 2))
for x in range(self.num_bins):
bins[x] = (self.bins[x+1] - self.bins[x]) / 2 + self.bins[x]
bins_bounds[x, 0] = self.bins[x]
bins_bounds[x, 1] = self.bins[x+1]
bins_coord = iris.coords.DimCoord(bins, var_name='bin', units=self.data_cube.units, bounds=bins_bounds)
cubes = iris.cube.CubeList()
for leadtime, distribution in six.iteritems(self.distribution):
leadtime_cube = Cube(distribution.astype(np.uint32), var_name=self.data_cube.var_name,
standard_name=self.data_cube.standard_name, units='1')
leadtime_cube.add_dim_coord(bins_coord, 0)
leadtime_cube.add_dim_coord(self.data_cube.coord('latitude'), 1)
leadtime_cube.add_dim_coord(self.data_cube.coord('longitude'), 2)
leadtime_cube.add_aux_coord(iris.coords.AuxCoord(np.array((leadtime,), np.int8), var_name='leadtime', units='months'))
cubes.append(leadtime_cube)
temp = TempFile.get()
iris.FUTURE.netcdf_no_unlimited = True
iris.save(cubes.merge_cube(), temp, zlib=True)
self.discretized_data.set_local_file(temp, rename_var=self.data_cube.var_name)
def _get_distribution(self):
self.distribution = {}
Log.debug('Discretizing...')
for leadtime in set(self.data_cube.coord('leadtime').points):
Log.debug('Discretizing leadtime {0}', leadtime)
leadtime_cube = self.data_cube.extract(iris.Constraint(leadtime=leadtime))
for realization in self.data_cube.coord('realization').points:
Log.debug('Discretizing realization {0}', realization)
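# If there is no realization coordinate to extract, fall back to the full
# leadtime cube so single-realization data is still discretized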
try:
realization_cube = leadtime_cube.extract(iris.Constraint(realization=realization))
except iris.exceptions.CoordinateNotFoundError:
realization_cube = leadtime_cube
if realization_cube is None and realization == 0:
realization_cube = leadtime_cube
if leadtime not in self.distribution:
self.distribution[leadtime] = self._calculate_distribution(realization_cube)
else:
self.distribution[leadtime] += self._calculate_distribution(realization_cube)
def _get_value_interval(self):
if self.check_min_value or self.check_max_value:
Log.debug('Calculating max and min values...')
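# Scan the chunk slice by slice to find the global extremes that will define
# the histogram range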
for time_slice in self.data_cube.slices_over('time'):
if self.check_min_value:
file_min = np.amin(time_slice.data)
if self.min_value is None:
self.min_value = file_min
self.min_value = min(self.min_value, file_min)
if self.check_max_value:
file_max = np.amax(time_slice.data)
self.max_value = max(self.max_value, file_max)
def _calculate_distribution(self, data_cube):
def calculate_histogram(time_series):
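# Histogram the time series of a single grid point; np.histogram also returns
# the bin edges, which are stored so every later call reuses identical bins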
histogram, self.bins = np.histogram(time_series, bins=self.bins,
range=(self.min_value, self.max_value))
return histogram
return np.apply_along_axis(calculate_histogram, 0, data_cube.data)
@@ -182,6 +182,7 @@ class WorkManager(object):
Diagnostic.register(MonthlyPercentile)
Diagnostic.register(ClimatologicalPercentile)
Diagnostic.register(DaysOverPercentile)
+Diagnostic.register(Discretize)
@staticmethod
def _register_general_diagnostics():
......
@@ -25,32 +25,18 @@ class TestClimatologicalPercentile(TestCase):
jobs = ClimatologicalPercentile.generate_jobs(self.diags, ['climpercent', 'ocean', 'var', '2000', '2001', '11'])
self.assertEqual(len(jobs), 1)
-self.assertEqual(jobs[0], ClimatologicalPercentile(self.data_manager, ModelingRealms.ocean, 'var',
-2000, 2000, 2001, float('nan'), float('nan'), 11,
-self.diags.config.experiment))
-jobs = ClimatologicalPercentile.generate_jobs(self.diags, ['climpercent', 'ocean', 'var', '2000', '2001', '11',
-'', '0', '40'])
-self.assertEqual(len(jobs), 1)
-self.assertEqual(jobs[0], ClimatologicalPercentile(self.data_manager, ModelingRealms.ocean, 'var',
-2000, 2000, 2001, 0.0, 40.0, 11,
-self.diags.config.experiment))