Newer
Older
from datafile import StorageStatus, LocalStatus
from earthdiagnostics.constants import Basins, Basin
from earthdiagnostics.frequency import Frequency
from earthdiagnostics.variable_type import VariableType
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.variable import VariableManager
Javier Vegas-Regidor
committed
class DiagnosticStatus(object):
    """
    Lifecycle states of a diagnostic, stored as plain integers.
    """

    # WAITING = 0, READY = 1, RUNNING = 2, COMPLETED = 3, FAILED = 4
    WAITING, READY, RUNNING, COMPLETED, FAILED = range(5)
Javier Vegas-Regidor
committed
Base class for the diagnostics. Provides a common interface for them and also
has a mechanism that allows diagnostic retrieval by name.
:param data_manager: data manager that will be used to store and retrieve the necessary data
:type data_manager: DataManager
Javier Vegas-Regidor
committed
alias = None
"""
Alias used to call the diagnostic. Must be overridden in derived classes
"""
self._status = DiagnosticStatus.WAITING
self._requests = []
self.consumed_time = datetime.timedelta()
@property
def status(self):
    """
    Current status of the diagnostic (one of the DiagnosticStatus constants).
    """
    current_status = self._status
    return current_status
@status.setter
def status(self, value):
    """
    Change the diagnostic status and run the side effects tied to the new state.

    Setting the status it already has is a no-op, so the side effects below only run on an
    actual transition (e.g. requests are not unsubscribed twice).

    :param value: new status, one of the DiagnosticStatus constants
    """
    if self._status == value:
        return
    # Store first: the getter and the checks below must see the new state, not the old one.
    self._status = value
    if value == DiagnosticStatus.RUNNING:
        # The files declared through declare_chunk are now being produced.
        for generated_file in self._generated_files:
            generated_file.local_status = LocalStatus.COMPUTING
    if value in (DiagnosticStatus.FAILED, DiagnosticStatus.COMPLETED):
        # Finished (successfully or not): stop listening to the input requests.
        self._unsuscribe_requests()
def register(cls):
    """
    Register a diagnostic class under its alias so it can later be retrieved by name.

    It must be called with the derived class to register. Registering another class with an
    alias that is already in use replaces the previous entry.

    :param cls: diagnostic class to register
    :type cls: Diagnostic
    :raises ValueError: if cls does not derive from Diagnostic or does not define an alias
    """
    if not issubclass(cls, Diagnostic):
        raise ValueError('Class {0} must be derived from Diagnostic'.format(cls))
    alias = cls.alias
    if alias is None:
        raise ValueError('Diagnostic class {0} must have defined an alias'.format(cls))
    Diagnostic._diag_list[alias] = cls
# noinspection PyProtectedMember
@staticmethod
def get_diagnostic(name):
    """
    Look up a registered diagnostic class by its alias.

    :param name: diagnostic alias
    :type name: str
    :return: the registered Diagnostic class, or None if no diagnostic uses that alias
    :rtype: Diagnostic
    """
    return Diagnostic._diag_list.get(name)
def send_file(self, filetosend, domain, var, startdate, member, chunk=None, grid=None, region=None,
              box=None, rename_var=None, frequency=None, year=None, date_str=None, move_old=False,
              vartype=VariableType.MEAN):
    """
    Hand a file produced by this diagnostic over to the data manager.

    A Basin given as region is replaced by its full name. Every other argument is forwarded
    unchanged, in the same order, to ``self.data_manager.send_file`` together with
    ``diagnostic=self``; see the DataManager for the meaning of each one.

    :param filetosend: file to send
    :param domain: modeling realm of the variable
    :param var: variable name
    :param startdate: start date
    :param member: member
    :param chunk: chunk, optional
    :param grid: grid, optional
    :param region: region; a Basin is converted to its full name
    :param box: box, optional
    :param rename_var: forwarded unchanged
    :param frequency: frequency of the data
    :type frequency: Frequency
    :param year: forwarded unchanged
    :param date_str: forwarded unchanged
    :param move_old: forwarded unchanged
    :param vartype: Variable type (mean, statistic)
    :type vartype: VariableType
    """
    region_name = region.fullname if isinstance(region, Basin) else region
    self.data_manager.send_file(filetosend, domain, var, startdate, member, chunk, grid, region_name,
                                box, rename_var, frequency, year, date_str, move_old,
                                diagnostic=self, vartype=vartype)
Javier Vegas-Regidor
committed
def compute(self):
    """
    Calculate the diagnostic and store its output.

    Abstract in the base class: derived classes must override it.

    :raises NotImplementedError: always, in this base implementation
    """
    message = "Class must override compute method"
    raise NotImplementedError(message)
def request_data(self):
    """
    Request the input data this diagnostic needs.

    Hook for derived classes; the base implementation does nothing. Requests are presumably
    made through ``request_chunk``, which also keeps track of them for the readiness checks.
    """
def declare_data_generated(self):
    """
    Declare the output data this diagnostic will generate.

    Hook for derived classes; the base implementation does nothing. Outputs are presumably
    declared through ``declare_chunk``, which records them in ``self._generated_files``.
    """
def declare_chunk(self, domain, var, startdate, member, chunk, grid=None, region=None, box=None, frequency=None,
                  vartype=VariableType.MEAN):
    """
    Declare a chunk file that this diagnostic will generate.

    The declaration is delegated to the data manager. The returned object is also kept in
    ``self._generated_files`` so its local status can be updated when the diagnostic starts
    running.

    :param domain: modeling realm of the variable
    :type domain: ModelingRealm
    :param var: variable name
    :param startdate: start date
    :param member: member
    :param chunk: chunk
    :param grid: grid, optional
    :param region: region, optional; a Basin is converted to its full name
    :param box: box, optional
    :param frequency: frequency of the data, optional
    :type frequency: Frequency
    :param vartype: Variable type (mean, statistic)
    :type vartype: VariableType
    :return: the declared chunk, as returned by the data manager
    """
    if isinstance(region, Basin):
        region = region.fullname
    generated_chunk = self.data_manager.declare_chunk(domain, var, startdate, member, chunk, grid, region, box,
                                                      diagnostic=self, vartype=vartype, frequency=frequency)
    self._generated_files.append(generated_chunk)
    return generated_chunk
@classmethod
def generate_jobs(cls, diags, options):
    """
    Generate the instances of the diagnostic that will be run by the manager.

    Must be overridden by derived classes. It is a class method (like ``process_options``),
    so it can be called on the diagnostic class itself before any instance exists.

    :param diags: diagnostics manager
    :type diags: Diags
    :param options: list of strings containing the options passed to the diagnostic
    :type options: list[str]
    :return: the diagnostic instances to run (presumably a list of Diagnostic objects)
    :raises NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError("Class must override generate_jobs class method")
Javier Vegas-Regidor
committed
@classmethod
def process_options(cls, options, options_available):
    """
    Parse the options given to a diagnostic against its option definitions.

    :param options: raw options; the first element is skipped (presumably the diagnostic alias)
    :type options: list
    :param options_available: option definitions, matched to the raw values by position
    :type options_available: list[DiagnosticOption]
    :return: parsed values keyed by option name; definitions without a given value parse ''
    :rtype: dict
    :raises DiagnosticOptionError: if more values are given than there are definitions
    """
    given = options[1:]
    if len(given) > len(options_available):
        raise DiagnosticOptionError('You have specified more options than available for diagnostic '
                                    '{0}'.format(cls.alias))
    parsed = {}
    for index, definition in enumerate(options_available):
        raw_value = given[index] if index < len(given) else ''
        parsed[definition.name] = definition.parse(raw_value)
    return parsed
def __str__(self):
    """
    Human-readable description of the diagnostic.

    Derived classes are expected to override it; the base class only returns a reminder.
    """
    placeholder = 'Developer must override base class __str__ method'
    return placeholder
def request_chunk(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None, to_modify=False):
    """
    Request an input chunk from the data manager and start tracking it.

    The request is stored in ``self._requests`` and subscribed with ``self._updated_request``
    as callback, presumably so the diagnostic is notified whenever the request changes.

    :param domain: modeling realm of the variable
    :param var: variable name
    :param startdate: start date
    :param member: member
    :param chunk: chunk
    :param grid: grid, optional
    :param box: box, optional
    :param frequency: frequency of the data, optional
    :param to_modify: if True, this diagnostic is registered as a modifier of the request
        (presumably because it changes the requested file)
    :return: the request object returned by the data manager
    """
    chunk_request = self.data_manager.request_chunk(domain, var, startdate, member, chunk, grid, box, frequency)
    if to_modify:
        chunk_request.add_modifier(self)
    self._requests.append(chunk_request)
    chunk_request.subscribe(self, self._updated_request)
    return chunk_request
def _updated_request(self, request):
    """
    Callback subscribed on every request made through ``request_chunk``.

    It only reacts while the diagnostic is WAITING: a FAILED request makes the diagnostic fail
    (the reason is stored in ``self.reason``), and a READY request triggers a readiness check.

    :param request: the request that was updated
    """
    if self.status == DiagnosticStatus.WAITING:
        if request.local_status == LocalStatus.FAILED:
            self.reason = 'Required file {0} is not available'.format(request.remote_file)
            self.status = DiagnosticStatus.FAILED
        elif request.local_status == LocalStatus.READY:
            self.check_is_ready()
def check_is_ready(self):
    """
    Set the status to READY when every tracked request is ready to run for this diagnostic.

    All requests are queried: the answers are collected into a list before ``all`` is applied,
    so ``ready_to_run`` is called on every request even after a False answer.
    """
    answers = [pending.ready_to_run(self) for pending in self._requests]
    if all(answers):
        self.status = DiagnosticStatus.READY
def _unsuscribe_requests(self):
    """
    Unsubscribe this diagnostic from every request it is tracking.

    The misspelled name is kept because the status setter calls it under this name.
    """
    for tracked in self._requests:
        tracked.unsubscribe(self)
def all_requests_in_storage(self):
    """
    Tell whether every tracked request has storage status READY.

    :return: False as soon as one request is not READY in storage; True otherwise
        (including when there are no requests)
    :rtype: bool
    """
    for request in self._requests:
        if request.storage_status != StorageStatus.READY:
            return False
    return True
class DiagnosticOption(object):
    """
    Base definition of a positional diagnostic option: a name and an optional default value.

    Subclasses override ``parse`` to convert the raw value into the type they need.
    """

    def __init__(self, name, default_value=None):
        self.name = name
        # None means there is no default, i.e. the option is mandatory.
        self.default_value = default_value

    def parse(self, option_value):
        """
        Return the raw value, or the default when the value is empty.
        """
        return self.check_default(option_value)

    def check_default(self, option_value):
        """
        Replace an empty value ('') with the default.

        :raises DiagnosticOptionError: if the value is empty and the option has no default
        """
        if option_value == '':
            if self.default_value is None:
                raise DiagnosticOptionError('Option {0} is not optional'.format(self.name))
            option_value = self.default_value
        return option_value
class DiagnosticFloatOption(DiagnosticOption):
    """
    Option parsed as a float (after default substitution).
    """

    def parse(self, option_value):
        raw_value = self.check_default(option_value)
        return float(raw_value)
class DiagnosticIntOption(DiagnosticOption):
    """
    Option parsed as an int, optionally bounded by inclusive limits.

    :param min_limit: smallest accepted value, or None for no lower bound
    :param max_limit: largest accepted value, or None for no upper bound
    """

    def __init__(self, name, default_value=None, min_limit=None, max_limit=None):
        super(DiagnosticIntOption, self).__init__(name, default_value)
        self.min_limit, self.max_limit = min_limit, max_limit

    def parse(self, option_value):
        """
        :raises DiagnosticOptionError: if the value is outside the configured limits
        """
        number = int(self.check_default(option_value))
        lower, upper = self.min_limit, self.max_limit
        if lower is not None and number < lower:
            raise DiagnosticOptionError('Value {0} is lower than minimum ({1})'.format(number, lower))
        if upper is not None and number > upper:
            raise DiagnosticOptionError('Value {0} is higher than maximum ({1})'.format(number, upper))
        return number
class DiagnosticListIntOption(DiagnosticOption):
    """
    Option holding a list of ints separated by '-', each optionally bounded by inclusive limits.

    :param min_limit: smallest accepted value, or None for no lower bound
    :param max_limit: largest accepted value, or None for no upper bound
    """

    def __init__(self, name, default_value=None, min_limit=None, max_limit=None):
        super(DiagnosticListIntOption, self).__init__(name, default_value)
        self.min_limit, self.max_limit = min_limit, max_limit

    def parse(self, option_value):
        """
        A tuple or list (e.g. a sequence default) is returned as is, without range checks.

        :raises DiagnosticOptionError: if any parsed value is outside the configured limits
        """
        option_value = self.check_default(option_value)
        if isinstance(option_value, (tuple, list)):
            return option_value
        numbers = list(map(int, option_value.split('-')))
        for number in numbers:
            if self.min_limit is not None and number < self.min_limit:
                raise DiagnosticOptionError('Value {0} is lower than minimum ({1})'.format(number, self.min_limit))
            if self.max_limit is not None and number > self.max_limit:
                raise DiagnosticOptionError('Value {0} is higher than maximum ({1})'.format(number, self.max_limit))
        return numbers
class DiagnosticListFrequenciesOption(DiagnosticOption):
    """
    Option holding a list of frequencies separated by '-'.

    The constructor is inherited unchanged: (name, default_value=None).
    """

    def parse(self, option_value):
        """
        A tuple or list (e.g. a sequence default) is returned as is; otherwise each
        '-'-separated token is converted with ``Frequency``.
        """
        option_value = self.check_default(option_value)
        if isinstance(option_value, (tuple, list)):
            return option_value
        return [Frequency(token) for token in option_value.split('-')]
class DiagnosticVariableOption(DiagnosticOption):
    """
    Option naming a variable.

    Names known to the VariableManager are normalized to that variable's short name; unknown
    names are returned unchanged.
    """

    def parse(self, option_value):
        option_value = self.check_default(option_value)
        variable = VariableManager().get_variable(option_value, False)
        return option_value if variable is None else variable.short_name
class DiagnosticDomainOption(DiagnosticOption):
    """
    Option parsed into a modeling realm with ``ModelingRealms.parse``.
    """

    def parse(self, option_value):
        realm_name = self.check_default(option_value)
        return ModelingRealms.parse(realm_name)
class DiagnosticFrequencyOption(DiagnosticOption):
    """
    Option parsed into a frequency with ``Frequency.parse``.
    """

    def parse(self, option_value):
        frequency_name = self.check_default(option_value)
        return Frequency.parse(frequency_name)
Javier Vegas-Regidor
committed
class DiagnosticBasinOption(DiagnosticOption):
    """
    Option parsed into a basin with ``Basins.parse``.
    """

    def parse(self, option_value):
        basin_name = self.check_default(option_value)
        return Basins.parse(basin_name)
class DiagnosticComplexStrOption(DiagnosticOption):
    """
    String option supporting escape sequences: '&;' becomes ',' and '&.' becomes ' '.

    Presumably needed because commas and spaces cannot appear literally in an option value.
    """

    def parse(self, option_value):
        text = self.check_default(option_value)
        for escaped, plain in (('&;', ','), ('&.', ' ')):
            text = text.replace(escaped, plain)
        return text
class DiagnosticBoolOption(DiagnosticOption):
    """
    Boolean option.

    A bool (e.g. a default) is returned as is. A string is True only if it is, ignoring case,
    'true', 't' or 'yes'; any other string gives False.
    """

    def parse(self, option_value):
        option_value = self.check_default(option_value)
        if not isinstance(option_value, bool):
            option_value = option_value.lower() in ('true', 't', 'yes')
        return option_value
class DiagnosticChoiceOption(DiagnosticOption):
    """
    Option whose value must be one of a fixed set of choices.

    :param choices: accepted values
    :param ignore_case: if True, the value is matched case-insensitively and the choice is
        returned with its canonical spelling from ``choices``
    """

    def __init__(self, name, choices, default_value=None, ignore_case=True):
        super(DiagnosticChoiceOption, self).__init__(name, default_value)
        self.choices = choices
        self.ignore_case = ignore_case

    def parse(self, value):
        """
        :raises DiagnosticOptionError: if the value does not match any choice
        """
        value = self.check_default(value)
        if self.ignore_case:
            # Compare on a lowered copy so the error below still shows what the user typed.
            lowered = value.lower()
            for choice in self.choices:
                if lowered == choice.lower():
                    return choice
        elif value in self.choices:
            return value
        raise DiagnosticOptionError('Value {1} in option {0} is not a valid choice. '
                                    'Options are {2}'.format(self.name, value, self.choices))
class DiagnosticOptionError(Exception):
    """
    Raised when the options given to a diagnostic are invalid: too many values, a missing
    mandatory value, a value out of range or a value that is not an accepted choice.
    """