Newer
Older
"""This module contains the Diagnostic base class and all the classes for parsing the options passed to them"""
from bscearth.utils.log import Log
from earthdiagnostics.constants import Basins, Basin
from earthdiagnostics.datafile import StorageStatus, LocalStatus
from earthdiagnostics.frequency import Frequency
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.publisher import Publisher
from earthdiagnostics.variable import VariableType
Javier Vegas-Regidor
committed
# Status codes for a diagnostic; the rest of this file reads them as DiagnosticStatus.<NAME>.
# NOTE(review): the enclosing class header is not visible in this view -- confirm that these
# assignments belong to ``class DiagnosticStatus``.
WAITING = 0  # initial value given to a diagnostic's ``_status``
READY = 1  # assigned by ``check_is_ready`` once requests and subjobs allow the run
RUNNING = 2  # while in this status the generated files are flagged as COMPUTING
COMPLETED = 3  # finished status; requests get unsubscribed
FAILED = 4  # finished status; also used when a required file is not available
Base class for the diagnostics.
Provides a common interface for them and also has a mechanism that allows diagnostic retrieval by name.
:param data_manager: data manager that will be used to store and retrieve the necessary data
:type data_manager: DataManager
Javier Vegas-Regidor
committed
alias = None
""" Alias to call the diagnostic. Must be overridden at the derived clases"""
Parameters
----------
data_manager: DataManager
self._status = DiagnosticStatus.WAITING
self._requests = []
self.consumed_time = datetime.timedelta()
def __ne__(self, other):
    """
    Check if a diagnostic is different than other

    Implementation is just the negation of the equal, that should be implemented by the derived classes

    Parameters
    ----------
    other: Diagnostic
        Diagnostic to be compared

    Returns
    -------
    bool

    """
    return not self == other
def __hash__(self):
    """
    Hash the diagnostic through its string representation

    Returns
    -------
    int

    """
    return hash(str(self))
def can_skip_run(self):
    """
    Check if a diagnostic calculation can be skipped

    Looks if the data to be generated is already there and is not going to be modified

    Returns
    -------
    bool
        False when nothing has been declared, when any declared file is not
        ready in the storage or when any declared file has modifiers

    """
    if not self._generated_files:
        return False
    for file_generated in self._generated_files:
        if file_generated.storage_status != StorageStatus.READY:
            return False
        if file_generated.has_modifiers():
            Log.warning('Can not skip diagnostics run when data is going to be modified: {0}'.format(self))
            return False
    return True
"""Full string representation. Defaults to str"""
return self._status
# Setter of the ``status`` property.
# NOTE(review): in the lines visible here ``value`` is never stored (no assignment to
# ``self._status``); that statement seems to be missing from this view -- confirm against the full file.
@status.setter
def status(self, value):
# While the diagnostic is running, flag every declared output file as being computed
if self.status == DiagnosticStatus.RUNNING:
for generated_file in self._generated_files:
generated_file.local_status = LocalStatus.COMPUTING
# A finished diagnostic (failed or completed) unsubscribes from its requests
if self.status in (DiagnosticStatus.FAILED, DiagnosticStatus.COMPLETED):
self._unsuscribe_requests()
Register a new diagnostic using the given alias.
It must be called using the derived class.
Parameters
----------
if not issubclass(diagnostic_class, Diagnostic):
raise ValueError('Class {0} must be derived from Diagnostic'.format(diagnostic_class))
if diagnostic_class.alias is None:
raise ValueError('Diagnostic class {0} must have defined an alias'.format(diagnostic_class))
Diagnostic._diag_list[diagnostic_class.alias] = diagnostic_class
# noinspection PyProtectedMember
@staticmethod
def get_diagnostic(name):
    """
    Return the class for a diagnostic given its name

    Parameters
    ----------
    name: str

    Returns
    -------
    Type[Diagnostic] or None
        None when no diagnostic is registered under that name

    """
    # Single lookup instead of a membership test followed by indexing
    return Diagnostic._diag_list.get(name)
Javier Vegas-Regidor
committed
def compute(self):
    """
    Calculate the diagnostic and stores the output

    Must be implemented by derived classes

    Raises
    ------
    NotImplementedError
        Always, in this base implementation

    """
    raise NotImplementedError("Class must override compute method")
Request the data required by the diagnostic
Must be implemented by derived classes
"""
raise NotImplementedError("Class must override request_data method")
Declare the data to be generated by the diagnostic
Must be implemented by derived classes
"""
raise NotImplementedError("Class must override declare_data_generated method")
def declare_chunk(self, domain, var, startdate, member, chunk, grid=None, region=None, box=None, frequency=None,
                  vartype=VariableType.MEAN):
    """
    Declare a chunk that is going to be generated by the diagnostic

    Parameters
    ----------
    domain: ModelingRealm
    var: str
    startdate: str
    member: int or None
    chunk: int or None
    grid: str or None
    region: Basin or None
    box: Box or None
    frequency: Frequency or None
    vartype: VariableType

    Returns
    -------
    DataFile

    """
    generated_chunk = self.data_manager.declare_chunk(domain, var, startdate, member, chunk, grid, region, box,
                                                      diagnostic=self, vartype=vartype, frequency=frequency)
    # When a region is given, the diagnostic registers itself as a modifier of the declared file
    if region is not None:
        generated_chunk.add_modifier(self)
    self._generated_files.append(generated_chunk)
    return generated_chunk
def declare_year(self, domain, var, startdate, member, year, grid=None, box=None,
                 vartype=VariableType.MEAN):
    """
    Declare a year that is going to be generated by the diagnostic

    Parameters
    ----------
    domain: ModelingRealm
    var: str
    startdate: str
    member: int
    year: int
    grid: str or None
    box: Box or None
    vartype: VariableType

    Returns
    -------
    DataFile

    """
    generated_year = self.data_manager.declare_year(domain, var, startdate, member, year, grid, box,
                                                    diagnostic=self, vartype=vartype)
    self._generated_files.append(generated_year)
    return generated_year
@classmethod
def generate_jobs(cls, diags, options):
    """
    Generate the instances of the diagnostics that will be run by the manager

    Must be implemented by derived classes.

    Parameters
    ----------
    diags: Diags
    options: list of str

    Returns
    -------
    list of Diagnostic

    Raises
    ------
    NotImplementedError
        Always, in this base implementation

    """
    raise NotImplementedError("Class must override generate_jobs class method")
Javier Vegas-Regidor
committed
@classmethod
def process_options(cls, options, options_available):
    """
    Process the configuration of a diagnostic

    Parameters
    ----------
    options: iterable of str
    options_available: iterable of DiagnosticOption

    Returns
    -------
    dict of str: str
        Dictionary of names and values for the options

    Raises
    ------
    DiagnosticOptionError:
        If there are more options than admitted for the diagnostic

    """
    # The first element is dropped before matching values to definitions
    # (presumably it is the diagnostic alias -- confirm against the callers)
    options = options[1:]
    if len(options) > len(options_available):
        raise DiagnosticOptionError('You have specified more options than available for diagnostic '
                                    '{0}'.format(cls.alias))
    processed = dict()
    for position, option_definition in enumerate(options_available):
        # Missing trailing options are parsed as empty strings so each definition can apply its default
        option_value = options[position] if position < len(options) else ''
        processed[option_definition.name] = option_definition.parse(option_value)
    return processed
Representation of the diagnostic as a string
"""
return 'Developer must override base class __str__ method'
Add a subjob
Add a diagnostic that must be run before the current one
Parameters
----------
subjob: Diagnostic
"""
self.subjobs.append(subjob)
subjob.subscribe(self, self._subjob_status_changed)
def _subjob_status_changed(self, job):
    """Re-evaluate readiness; this is the callback passed when subscribing to a subjob"""
    self.check_is_ready()
# NOTE(review): the second line of the signature was missing from this view. ``to_modify`` and
# ``vartype`` are both used in the body; their defaults were reconstructed from ``request_year``
# (to_modify=False) and ``declare_chunk`` (vartype=VariableType.MEAN) -- confirm against history.
def request_chunk(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
                  to_modify=False, vartype=VariableType.MEAN):
    """
    Request one chunk of data required by the diagnostic

    Parameters
    ----------
    domain: ModelingRealm
    var: str
    startdate: str or None
    member: int or None
    chunk: int or None
    grid: str or None
    box: Box or None
    frequency: Frequency or str or None
    to_modify: bool
        Flag that must be active if the diagnostic is going to generate a modified version of this data. In this
        case this data must not be declared as an output of the diagnostic
    vartype: VariableType

    Returns
    -------
    DataFile

    See Also
    --------
    request_year
    declare_chunk
    declare_year

    """
    request = self.data_manager.request_chunk(domain, var, startdate, member, chunk, grid, box, frequency, vartype)
    if to_modify:
        request.add_modifier(self)
    self._requests.append(request)
    # Subscribe so that _updated_request is called back with this request
    request.subscribe(self, self._updated_request)
    return request
def request_year(self, domain, var, startdate, member, year, grid=None, box=None, frequency=None, to_modify=False):
    """
    Request one year of data that is required for the diagnostic

    Parameters
    ----------
    domain: ModelingRealm
    var: str
    startdate: str
    member: int
    year: int
    grid: str
    box: Box
    frequency: Frequency
    to_modify: bool
        If True, the diagnostic is registered as a modifier of the requested data

    Returns
    -------
    DataFile

    See Also
    --------
    request_chunk
    declare_chunk
    declare_year

    """
    # The diagnostic itself is handed to the data manager as first argument
    request = self.data_manager.request_year(self, domain, var, startdate, member, year, grid, box, frequency)
    if to_modify:
        request.add_modifier(self)
    self._requests.append(request)
    # Subscribe so that _updated_request is called back with this request
    request.subscribe(self, self._updated_request)
    return request
def _updated_request(self, request):
    """
    React to a change in one of the requested files

    Only acts while the diagnostic is WAITING: a failed request fails the diagnostic,
    a ready one triggers a readiness check.

    Parameters
    ----------
    request: DataFile

    """
    if self.status != DiagnosticStatus.WAITING:
        return
    if request.local_status == LocalStatus.FAILED:
        self.message = 'Required file {0} is not available'.format(request.remote_file)
        self.status = DiagnosticStatus.FAILED
        return
    if request.local_status == LocalStatus.READY:
        self.check_is_ready()
def check_is_ready(self):
    """Check if a diagnostic is ready to run and change its status accordingly"""
    # Lists (not generators) are kept on purpose: every request is asked, as in the original
    requests_ready = all([request.ready_to_run(self) for request in self._requests])
    if requests_ready and all([subjob.status == DiagnosticStatus.COMPLETED for subjob in self.subjobs]):
        self.status = DiagnosticStatus.READY
def _unsuscribe_requests(self):
    """Unsubscribe the diagnostic from every request it has made"""
    for request in self._requests:
        request.unsubscribe(self)
def all_requests_in_storage(self):
    """
    Check if all the data requested is in the local scratch

    Returns
    -------
    bool
        True when pending_requests() reports zero

    """
    return self.pending_requests() == 0
def pending_requests(self):
    """
    Get the number of data request pending to be fulfilled

    A request is pending while its storage status or its local status is not READY.

    Returns
    -------
    int

    """
    # Count only the requests that match the condition. The previous form built a list of
    # booleans and took its length, which is the total number of requests.
    return sum(1 for request in self._requests
               if request.storage_status != StorageStatus.READY or request.local_status != LocalStatus.READY)
def _different_type(self, other):
    """Return True when other is not exactly of the same class as this object"""
    return type(self) is not type(other)
class DiagnosticOption(object):
    """Class to manage string options for the diagnostic"""

    def __init__(self, name, default_value=None):
        """
        Option constructor

        Parameters
        ----------
        name: str
        default_value: object, optional
            If None, the option is required and an exception will be thrown at parse time if the value is empty

        """
        self.name = name
        self.default_value = default_value

    def parse(self, option_value):
        """
        Get the final value for the option

        If option_value is empty, return default_value

        Parameters
        ----------
        option_value: str

        Returns
        -------
        str

        Raises
        ------
        DiagnosticOptionError:
            If the option is empty and default_value is None

        """
        return self._check_default(option_value)

    def _check_default(self, option_value):
        """
        Replace an empty option value by the default

        Parameters
        ----------
        option_value: str

        Returns
        -------
        object
            option_value itself when it is not empty, default_value otherwise

        Raises
        ------
        DiagnosticOptionError:
            If the option is empty and default_value is None

        """
        if option_value != '':
            return option_value
        if self.default_value is None:
            raise DiagnosticOptionError('Option {0} is not optional'.format(self.name))
        return self.default_value
class DiagnosticFloatOption(DiagnosticOption):
    """Class for parsing float options"""

    def parse(self, option_value):
        """
        Parse option value

        Parameters
        ----------
        option_value:str

        Returns
        -------
        float

        """
        # The visible body had no return statement; the conversion follows the documented return type
        return float(self._check_default(option_value))
class DiagnosticIntOption(DiagnosticOption):
    """
    Class for parsing integer options

    Parameters
    ----------
    name: str
    default_value: int, optional
    min_limit: int, optional
        If set, any value below this will not be accepted
    max_limit: int, optional
        If set, any value over this will not be accepted

    """

    def __init__(self, name, default_value=None, min_limit=None, max_limit=None):
        super(DiagnosticIntOption, self).__init__(name, default_value)
        self.min_limit = min_limit
        self.max_limit = max_limit

    def parse(self, option_value):
        """
        Parse option value

        Parameters
        ----------
        option_value:str

        Returns
        -------
        int

        Raises
        ------
        DiagnosticOptionError
            If parsed values is outside limits

        """
        # ``value`` was used without being defined; it is the integer form of the defaulted option
        value = int(self._check_default(option_value))
        self._check_limits(value)
        return value

    def _check_limits(self, value):
        """Raise DiagnosticOptionError if value lies outside the configured limits"""
        if self.min_limit is not None and value < self.min_limit:
            raise DiagnosticOptionError('Value {0} is lower than minimum ({1})'.format(value, self.min_limit))
        if self.max_limit is not None and value > self.max_limit:
            raise DiagnosticOptionError('Value {0} is higher than maximum ({1})'.format(value, self.max_limit))
class DiagnosticListIntOption(DiagnosticIntOption):
    """
    Class for parsing integer list options

    Parameters
    ----------
    name: str
    default_value: list, optional
    min_limit: int, optional
        If set, any value below this will not be accepted
    max_limit: int, optional
        If set, any value over this will not be accepted

    """

    def __init__(self, name, default_value=None, min_limit=None, max_limit=None):
        # Forward both limits to the parent: max_limit used to be dropped, leaving it as None
        super(DiagnosticListIntOption, self).__init__(name, default_value, min_limit, max_limit)

    def parse(self, option_value):
        """
        Parse option value

        Values are separated by '-' in the option string

        Parameters
        ----------
        option_value:str

        Returns
        -------
        list(int)

        Raises
        ------
        DiagnosticOptionError
            If parsed values is outside limits

        """
        # Apply the default first: it is what makes the list/tuple shortcut below reachable
        option_value = self._check_default(option_value)
        if isinstance(option_value, (tuple, list)):
            return option_value
        values = [int(i) for i in option_value.split('-')]
        for value in values:
            self._check_limits(value)
        return values
class DiagnosticListFrequenciesOption(DiagnosticOption):
    """
    Class for parsing an option which is a list of frequencies

    Parameters
    ----------
    name: str
    default_value: list, optional

    """

    def __init__(self, name, default_value=None):
        super(DiagnosticListFrequenciesOption, self).__init__(name, default_value)

    def parse(self, option_value):
        """
        Parse option value

        Frequencies are separated by '-' in the option string

        Parameters
        ----------
        option_value: str

        Returns
        -------
        List of Frequency

        """
        # Apply the default first: it is what makes the list/tuple shortcut below reachable
        option_value = self._check_default(option_value)
        if isinstance(option_value, (tuple, list)):
            return option_value
        return [Frequency(i) for i in option_value.split('-')]
class DiagnosticVariableOption(DiagnosticOption):
    """
    Class to parse variable options

    Parameters
    ----------
    var_manager: VariableManager
    name: str, optional
    default_value: str, optional

    """

    def __init__(self, var_manager, name='variable', default_value=None):
        super(DiagnosticVariableOption, self).__init__(name, default_value)
        self.var_manager = var_manager

    def parse(self, option_value):
        """
        Parse option value

        Parameters
        ----------
        option_value: str

        Returns
        -------
        str
            Short name of the variable known to the manager, or the given name if it is not known

        """
        option_value = self._check_default(option_value)
        real_name = self.var_manager.get_variable(option_value, False)
        if real_name is None:
            return option_value
        return real_name.short_name
class DiagnosticVariableListOption(DiagnosticOption):
    """
    Class to parse variable list options

    Parameters
    ----------
    var_manager: VariableManager
    name: str
    default_value: str, optional

    """

    def __init__(self, var_manager, name, default_value=None):
        super(DiagnosticVariableListOption, self).__init__(name, default_value)
        self.var_manager = var_manager

    def parse(self, option_value):
        """
        Parse option value

        Parameters
        ----------
        option_value: str

        Returns
        -------
        list of str
            For each variable, the short name known to the manager or the given name if it is not known

        """
        option_value = self._check_default(option_value)
        var_names = []
        # NOTE(review): the loop header was missing from this view; the ':' separator was chosen to
        # match DiagnosticBasinListOption -- confirm against the file history
        for value in option_value.split(':'):
            real_name = self.var_manager.get_variable(value, False)
            if real_name is None:
                var_names.append(value)
            else:
                var_names.append(real_name.short_name)
        return var_names
class DiagnosticDomainOption(DiagnosticOption):
    """
    Class to parse domain options

    Parameters
    ----------
    name: str, optional
    default_value: str, optional

    """

    def __init__(self, name='domain', default_value=None):
        super(DiagnosticDomainOption, self).__init__(name, default_value)

    def parse(self, option_value):
        """
        Parse option value

        Parameters
        ----------
        option_value: str

        Returns
        -------
        ModelingRealm

        """
        domain_name = self._check_default(option_value)
        return ModelingRealms.parse(domain_name)
class DiagnosticFrequencyOption(DiagnosticOption):
    """
    Class to parse frequency options

    Parameters
    ----------
    name: str, optional
    default_value: Frequency, optional

    """

    def __init__(self, name='frequency', default_value=None):
        super(DiagnosticFrequencyOption, self).__init__(name, default_value)

    def parse(self, option_value):
        """
        Parse option value

        Parameters
        ----------
        option_value: str

        Returns
        -------
        Frequency

        """
        frequency_text = self._check_default(option_value)
        return Frequency.parse(frequency_text)
Javier Vegas-Regidor
committed
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
class DiagnosticBasinListOption(DiagnosticOption):
    """Class to parse list of basins options"""

    def parse(self, option_value):
        """
        Parse option value

        Basin names are separated by ':' in the option string

        Parameters
        ----------
        option_value: str

        Returns
        -------
        list of Basin

        Raises
        ------
        DiagnosticOptionError
            If any of the names is not recognized as a basin

        """
        option_value = self._check_default(option_value)
        basins = []
        for value in option_value.split(':'):
            basin = Basins().parse(value)
            if basin is None:
                raise DiagnosticOptionError('Basin {0} not recognized'.format(value))
            basins.append(basin)
        return basins
Javier Vegas-Regidor
committed
class DiagnosticBasinOption(DiagnosticOption):
    """Class to parse basin options"""

    def parse(self, option_value):
        """
        Parse option value

        Parameters
        ----------
        option_value: str

        Returns
        -------
        Basin

        Raises
        ------
        DiagnosticOptionError
            If the name is not recognized as a basin

        """
        # ``value`` was used without being defined; it is the defaulted option value
        value = self._check_default(option_value)
        basin = Basins().parse(value)
        if basin is None:
            raise DiagnosticOptionError('Basin {0} not recognized'.format(value))
        return basin
Javier Vegas-Regidor
committed
Javier Vegas-Regidor
committed
class DiagnosticComplexStrOption(DiagnosticOption):
    """
    Class to parse complex string options

    It replaces '&;' with ',' and '&.' with ' '

    """

    def parse(self, option_value):
        """
        Parse option value

        Parameters
        ----------
        option_value:str

        Returns
        -------
        str

        """
        raw_text = self._check_default(option_value)
        return raw_text.replace('&;', ',').replace('&.', ' ')
class DiagnosticBoolOption(DiagnosticOption):
    """Class to parse boolean options"""

    def parse(self, option_value):
        """
        Parse option value

        Parameters
        ----------
        option_value:str

        Returns
        -------
        Bool
            True for 'true', 't' or 'yes' in any letter case; a bool default is returned as is

        """
        # Apply the default first: it is what makes the bool shortcut below reachable and
        # makes an empty required option an error instead of a silent False
        option_value = self._check_default(option_value)
        if isinstance(option_value, bool):
            return option_value
        return option_value.lower() in ('true', 't', 'yes')
class DiagnosticChoiceOption(DiagnosticOption):
    """
    Class to parse choice option

    Parameters
    ----------
    name: str
    choices: list of str
        Valid options for the option
    default_value: str, optional
        If not None, it should be a valid choice
    ignore_case: bool, optional
        If false, value must match case of the valid choice

    """

    def __init__(self, name, choices, default_value=None, ignore_case=True):
        super(DiagnosticChoiceOption, self).__init__(name, default_value)
        self.choices = choices
        self.ignore_case = ignore_case
        # To check if it is valid
        if default_value is not None:
            self.parse(default_value)

    def parse(self, option_value):
        """
        Parse option value

        Parameters
        ----------
        option_value:str

        Returns
        -------
        str
            The matching entry of choices, with the case used in choices

        Raises
        ------
        DiagnosticOptionError
            If the value does not match any of the choices

        """
        option_value = self._check_default(option_value)
        if self.ignore_case:
            lowered = option_value.lower()
            for choice in self.choices:
                # The visible loop returned the first choice without comparing anything
                if lowered == choice.lower():
                    return choice
        elif option_value in self.choices:
            return option_value
        raise DiagnosticOptionError('Value {1} in option {0} is not a valid choice. '
                                    'Options are {2}'.format(self.name, option_value, self.choices))
class DiagnosticOptionError(Exception):
    """Exception class for errors related to bad options for the diagnostics"""