Newer
Older
from datafile import StorageStatus, LocalStatus
from earthdiagnostics.constants import Basins, Basin
from earthdiagnostics.frequency import Frequency
from earthdiagnostics.variable_type import VariableType
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.variable import VariableManager
Javier Vegas-Regidor
committed
class DiagnosticStatus(object):
WAITING = 0
READY = 1
RUNNING = 2
COMPLETED = 3
FAILED = 4
Javier Vegas-Regidor
committed
Base class for the diagnostics. Provides a common interface for them and also
has a mechanism that allows diagnostic retrieval by name.
:param data_manager: data manager that will be used to store and retrieve the necessary data
:type data_manager: DataManager
Javier Vegas-Regidor
committed
alias = None
"""
Alias to call the diagnostic. Must be overridden at the derived clases
"""
self._status = DiagnosticStatus.WAITING
self._requests = []
self.consumed_time = datetime.timedelta()
@property
def status(self):
return self._status
@status.setter
def status(self, value):
if self.status == DiagnosticStatus.RUNNING:
for generated_file in self._generated_files:
generated_file.local_status = LocalStatus.COMPUTING
if self.status in (DiagnosticStatus.FAILED, DiagnosticStatus.COMPLETED):
self._unsuscribe_requests()
def register(cls):
"""
Register a new diagnostic using the given alias. It must be call using the derived class.
:param cls: diagnostic class to register
:type cls: Type[Diagnostic]
if not issubclass(cls, Diagnostic):
raise ValueError('Class {0} must be derived from Diagnostic'.format(cls))
if cls.alias is None:
raise ValueError('Diagnostic class {0} must have defined an alias'.format(cls))
Diagnostic._diag_list[cls.alias] = cls
# noinspection PyProtectedMember
@staticmethod
def get_diagnostic(name):
"""
Return the class for a diagnostic given its name
:param name: diagnostic alias
:type name: str
:return: the selected Diagnostic class, None if name can not be found
:rtype: Diagnostic
"""
if name in Diagnostic._diag_list.keys():
return Diagnostic._diag_list[name]
Javier Vegas-Regidor
committed
def compute(self):
"""
Calculates the diagnostic and stores the output
Must be implemented by derived classes
"""
raise NotImplementedError("Class must override compute method")
def request_data(self):
"""
Calculates the diagnostic and stores the output
Must be implemented by derived classes
"""
raise NotImplementedError("Class must override request_data method")
def declare_data_generated(self):
"""
Calculates the diagnostic and stores the output
Must be implemented by derived classes
"""
raise NotImplementedError("Class must override declare_data_generated method")
def declare_chunk(self, domain, var, startdate, member, chunk, grid=None, region=None, box=None, frequency=None,
vartype=VariableType.MEAN):
"""
:param domain:
:type domain: ModelingRealm
:param var:
:param startdate:
:param member:
:param chunk:
:param grid:
:param region:
:param box:
:param frequency:
:type frequency: Frequency
:param vartype: Variable type (mean, statistic)
:type vartype: VariableType
generated_chunk = self.data_manager.declare_chunk(domain, var, startdate, member, chunk, grid, region, box,
diagnostic=self, vartype=vartype, frequency=frequency)
self._generated_files.append(generated_chunk)
return generated_chunk
def declare_year(self, domain, var, startdate, member, year, grid=None, box=None,
vartype=VariableType.MEAN):
"""
:param domain:
:type domain: ModelingRealm
:param var:
:param startdate:
:param member:
:param grid:
:param box:
:param year:
:param vartype: Variable type (mean, statistic)
:type vartype: VariableType
:return: datafile object
:rtype: DataFile
"""
generated_year = self.data_manager.declare_year(domain, var, startdate, member, year, grid, box,
diagnostic=self, vartype=vartype)
self._generated_files.append(generated_year)
return generated_year
def generate_jobs(cls, diags, options):
"""
Generate the instances of the diagnostics that will be run by the manager
Must be implemented by derived classes.
:param diags: diagnostics manager
:type diags: Diags
:param options: list of strings containing the options passed to the diagnostic
:return:
"""
raise NotImplementedError("Class must override generate_jobs class method")
Javier Vegas-Regidor
committed
@classmethod
def process_options(cls, options, options_available):
processed = dict()
options = options[1:]
Javier Vegas-Regidor
committed
if len(options) > len(options_available):
raise DiagnosticOptionError('You have specified more options than available for diagnostic '
'{0}'.format(cls.alias))
for x in range(len(options_available)):
option_definition = options_available[x]
if len(options) <= x:
option_value = ''
else:
option_value = options[x]
processed[option_definition.name] = option_definition.parse(option_value)
return processed
def __str__(self):
"""
Must be implemented by derived classes
:return:
"""
return 'Developer must override base class __str__ method'
def request_chunk(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
to_modify=False):
request = self.data_manager.request_chunk(domain, var, startdate, member, chunk, grid, box, frequency)
if to_modify:
request.add_modifier(self)
self._requests.append(request)
request.subscribe(self, self._updated_request)
return request
def request_year(self, domain, var, startdate, member, year, grid=None, box=None, frequency=None, to_modify=False):
request = self.data_manager.request_year(self, domain, var, startdate, member, year, grid, box, frequency)
if to_modify:
request.add_modifier(self)
self._requests.append(request)
request.subscribe(self, self._updated_request)
return request
def _updated_request(self, request):
if self.status != DiagnosticStatus.WAITING:
return
if request.local_status == LocalStatus.FAILED:
self.message = 'Required file {0} is not available'.format(request.remote_file)
self.status = DiagnosticStatus.FAILED
return
if request.local_status == LocalStatus.READY:
self.check_is_ready()
def check_is_ready(self):
if all([request.ready_to_run(self) for request in self._requests]):
self.status = DiagnosticStatus.READY
def _unsuscribe_requests(self):
for request in self._requests:
request.unsubscribe(self)
def all_requests_in_storage(self):
return not any(request.storage_status != StorageStatus.READY for request in self._requests)
class DiagnosticOption(object):
def __init__(self, name, default_value=None):
self.name = name
self.default_value = default_value
def parse(self, option_value):
option_value = self.check_default(option_value)
return option_value
def check_default(self, option_value):
if option_value == '':
if self.default_value is None:
raise DiagnosticOptionError('Option {0} is not optional'.format(self.name))
else:
return self.default_value
return option_value
class DiagnosticFloatOption(DiagnosticOption):
def parse(self, option_value):
return float(self.check_default(option_value))
class DiagnosticIntOption(DiagnosticOption):
def __init__(self, name, default_value=None, min_limit=None, max_limit=None):
super(DiagnosticIntOption, self).__init__(name, default_value)
self.min_limit = min_limit
self.max_limit = max_limit
def parse(self, option_value):
value = int(self.check_default(option_value))
if self.min_limit is not None and value < self.min_limit:
raise DiagnosticOptionError('Value {0} is lower than minimum ({1})'.format(value, self.min_limit))
if self.max_limit is not None and value > self.max_limit:
raise DiagnosticOptionError('Value {0} is higher than maximum ({1})'.format(value, self.max_limit))
return value
class DiagnosticListIntOption(DiagnosticOption):
"""
:param name:
:type name: str
:param default_value:
:type default_value: int|NoneType
:param min_limit:
:type min_limit: int|NoneType
:param max_limit:
:type max_limit: int|NoneType
"""
def __init__(self, name, default_value=None, min_limit=None, max_limit=None):
super(DiagnosticListIntOption, self).__init__(name, default_value)
self.min_limit = min_limit
def parse(self, option_value):
option_value = self.check_default(option_value)
if isinstance(option_value, tuple) or isinstance(option_value, list):
return option_value
values = [int(i) for i in option_value.split('-')]
for value in values:
if self.min_limit is not None and value < self.min_limit:
raise DiagnosticOptionError('Value {0} is lower than minimum ({1})'.format(value, self.min_limit))
if self.max_limit is not None and value > self.max_limit:
raise DiagnosticOptionError('Value {0} is higher than maximum ({1})'.format(value, self.max_limit))
return values
class DiagnosticListFrequenciesOption(DiagnosticOption):
def __init__(self, name, default_value=None):
super(DiagnosticListFrequenciesOption, self).__init__(name, default_value)
def parse(self, option_value):
option_value = self.check_default(option_value)
if isinstance(option_value, tuple) or isinstance(option_value, list):
return option_value
values = [Frequency(i) for i in option_value.split('-')]
return values
class DiagnosticVariableOption(DiagnosticOption):
def __init__(self, name='variable', default_value=None):
super(DiagnosticVariableOption, self).__init__(name, default_value)
def parse(self, option_value):
option_value = self.check_default(option_value)
real_name = VariableManager().get_variable(option_value, False)
if real_name is None:
return option_value
return real_name.short_name
class DiagnosticVariableListOption(DiagnosticOption):
def parse(self, option_value):
option_value = self.check_default(option_value)
var_names = []
for value in option_value.split('-'):
real_name = VariableManager().get_variable(value, False)
if real_name is None:
var_names.append(value)
else:
var_names.append(real_name.short_name)
class DiagnosticDomainOption(DiagnosticOption):
def __init__(self, name='domain', default_value=None):
super(DiagnosticDomainOption, self).__init__(name, default_value)
def parse(self, option_value):
return ModelingRealms.parse(self.check_default(option_value))
class DiagnosticFrequencyOption(DiagnosticOption):
def __init__(self, name='frequency', default_value=None):
super(DiagnosticFrequencyOption, self).__init__(name, default_value)
def parse(self, option_value):
return Frequency.parse(self.check_default(option_value))
Javier Vegas-Regidor
committed
class DiagnosticBasinOption(DiagnosticOption):
def parse(self, option_value):
return Basins().parse(self.check_default(option_value))
Javier Vegas-Regidor
committed
class DiagnosticComplexStrOption(DiagnosticOption):
def parse(self, option_value):
return self.check_default(option_value).replace('&;', ',').replace('&.', ' ')
class DiagnosticBoolOption(DiagnosticOption):
def parse(self, option_value):
option_value = self.check_default(option_value)
if isinstance(option_value, bool):
return option_value
else:
return option_value.lower() in ('true', 't', 'yes')
class DiagnosticChoiceOption(DiagnosticOption):
def __init__(self, name, choices, default_value=None, ignore_case=True):
super(DiagnosticChoiceOption, self).__init__(name, default_value)
self.choices = choices
self.ignore_case = ignore_case
def parse(self, value):
value = self.check_default(value)
if self.ignore_case:
value = value.lower()
for choice in self.choices:
if value == choice.lower():
return choice
else:
if value in self.choices:
return value
raise DiagnosticOptionError('Value {1} in option {0} is not a valid choice. '
'Options are {2}'.format(self.name, value, self.choices))
class DiagnosticOptionError(Exception):
pass