Newer
Older
from datafile import StorageStatus, LocalStatus
from earthdiagnostics.constants import Basins, Basin
from earthdiagnostics.frequency import Frequency
from earthdiagnostics.variable_type import VariableType
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.variable import VariableManager
Javier Vegas-Regidor
committed
class DiagnosticStatus(object):
    """Integer codes used to describe the state of a diagnostic."""

    # Consecutive integer codes, starting at zero
    WAITING, READY, RUNNING, COMPLETED, FAILED = range(5)
Javier Vegas-Regidor
committed
Base class for the diagnostics. Provides a common interface for them and also
has a mechanism that allows diagnostic retrieval by name.
:param data_manager: data manager that will be used to store and retrieve the necessary data
:type data_manager: DataManager
Javier Vegas-Regidor
committed
alias = None
"""
Alias used to call the diagnostic. Must be overridden in the derived classes
"""
self._status = DiagnosticStatus.WAITING
self._requests = []
self.consumed_time = datetime.timedelta()
@property
def status(self):
    """
    Current status of the diagnostic

    :return: one of the DiagnosticStatus codes
    :rtype: int
    """
    return self._status

@status.setter
def status(self, value):
    """
    Store the new status and apply the side effects attached to it

    When the new status is RUNNING, every file in ``_generated_files`` gets its local status set to
    COMPUTING. When it is FAILED or COMPLETED, the diagnostic unsubscribes from all its requests.

    :param value: new status
    :type value: int
    """
    # The value has to be stored first: the checks below act on the NEW status
    self._status = value
    if value == DiagnosticStatus.RUNNING:
        for generated_file in self._generated_files:
            generated_file.local_status = LocalStatus.COMPUTING
    if value in (DiagnosticStatus.FAILED, DiagnosticStatus.COMPLETED):
        self._unsuscribe_requests()
@staticmethod
def register(cls):
    """
    Register a new diagnostic using its alias. It must be called passing the derived class.

    The class is stored in ``Diagnostic._diag_list`` under ``cls.alias``.

    :param cls: diagnostic class to register
    :type cls: Diagnostic
    :raises ValueError: if cls does not derive from Diagnostic or does not define an alias
    """
    # Declared static because the class to register arrives as an explicit argument,
    # not through method binding
    if not issubclass(cls, Diagnostic):
        raise ValueError('Class {0} must be derived from Diagnostic'.format(cls))
    if cls.alias is None:
        raise ValueError('Diagnostic class {0} must have defined an alias'.format(cls))
    Diagnostic._diag_list[cls.alias] = cls
# noinspection PyProtectedMember
@staticmethod
def get_diagnostic(name):
    """
    Return the class for a diagnostic given its name

    :param name: diagnostic alias
    :type name: str
    :return: the selected Diagnostic class, None if name can not be found
    :rtype: Diagnostic
    """
    # Single dictionary lookup; dict.get already returns None for unknown aliases
    return Diagnostic._diag_list.get(name)
def send_file(self, filetosend, domain, var, startdate, member, chunk=None, grid=None, region=None,
              box=None, rename_var=None, frequency=None, year=None, date_str=None, move_old=False,
              vartype=VariableType.MEAN):
    """
    Pass a file to the data manager, identifying this diagnostic as its origin

    All the arguments are forwarded unchanged to ``data_manager.send_file``, with the only exception of
    ``region``: a Basin instance is replaced by its ``fullname``.

    :param filetosend: forwarded to the data manager
    :param domain: forwarded to the data manager
    :param var: forwarded to the data manager
    :param startdate: forwarded to the data manager
    :param member: forwarded to the data manager
    :param chunk: forwarded to the data manager
    :param grid: forwarded to the data manager
    :param region: region name or Basin instance
    :param box: forwarded to the data manager
    :param rename_var: forwarded to the data manager
    :param frequency: forwarded to the data manager
    :type frequency: Frequency
    :param year: forwarded to the data manager
    :param date_str: forwarded to the data manager
    :param move_old: forwarded to the data manager
    :param vartype: Variable type (mean, statistic)
    :type vartype: VariableType
    """
    region_name = region.fullname if isinstance(region, Basin) else region
    self.data_manager.send_file(filetosend, domain, var, startdate, member, chunk, grid, region_name,
                                box, rename_var, frequency, year, date_str, move_old, diagnostic=self,
                                vartype=vartype)
Javier Vegas-Regidor
committed
def compute(self):
    """
    Calculate the diagnostic and store the output

    Must be implemented by derived classes: the base version always raises.

    :raises NotImplementedError: always
    """
    message = "Class must override compute method"
    raise NotImplementedError(message)
def request_data(self):
    """
    Request the data required by the diagnostic (purpose taken from the method name)

    Must be implemented by derived classes: the base version always raises.

    :raises NotImplementedError: always
    """
    message = "Class must override request_data method"
    raise NotImplementedError(message)
def declare_data_generated(self):
    """
    Declare the data that the diagnostic will generate (purpose taken from the method name)

    Must be implemented by derived classes: the base version always raises.

    :raises NotImplementedError: always
    """
    message = "Class must override declare_data_generated method"
    raise NotImplementedError(message)
def declare_chunk(self, domain, var, startdate, member, chunk, grid=None, region=None, box=None, frequency=None,
                  vartype=VariableType.MEAN):
    """
    Declare a chunk file as an output of this diagnostic

    The declaration is delegated to ``data_manager.declare_chunk`` and the object it returns is recorded
    in the list of files generated by the diagnostic. A Basin instance given as ``region`` is replaced
    by its ``fullname`` before the call.

    :param domain:
    :type domain: ModelingRealm
    :param var:
    :param startdate:
    :param member:
    :param chunk:
    :param grid:
    :param region: region name or Basin instance
    :param box:
    :param frequency:
    :type frequency: Frequency
    :param vartype: Variable type (mean, statistic)
    :type vartype: VariableType
    :return: datafile object
    :rtype: DataFile
    """
    region_name = region.fullname if isinstance(region, Basin) else region
    declared = self.data_manager.declare_chunk(domain, var, startdate, member, chunk, grid, region_name, box,
                                               diagnostic=self, vartype=vartype, frequency=frequency)
    self._generated_files.append(declared)
    return declared
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
def declare_year(self, domain, var, startdate, member, year, grid=None, box=None,
                 vartype=VariableType.MEAN):
    """
    Declare a yearly file as an output of this diagnostic

    The declaration is delegated to ``data_manager.declare_year`` and the object it returns is recorded
    in the list of files generated by the diagnostic.

    :param domain:
    :type domain: ModelingRealm
    :param var:
    :param startdate:
    :param member:
    :param year:
    :param grid:
    :param box:
    :param vartype: Variable type (mean, statistic)
    :type vartype: VariableType
    :return: datafile object
    :rtype: DataFile
    """
    declared = self.data_manager.declare_year(domain, var, startdate, member, year, grid, box,
                                              diagnostic=self, vartype=vartype)
    self._generated_files.append(declared)
    return declared
@classmethod
def generate_jobs(cls, diags, options):
    """
    Generate the instances of the diagnostics that will be run by the manager

    Must be implemented by derived classes: the base version always raises.

    :param diags: diagnostics manager
    :type diags: Diags
    :param options: list of strings containing the options passed to the diagnostic
    :return:
    :raises NotImplementedError: always
    """
    # Declared as a class method, as its own error message states, so that calling it on the
    # class binds ``cls`` instead of shifting the arguments
    raise NotImplementedError("Class must override generate_jobs class method")
Javier Vegas-Regidor
committed
@classmethod
def process_options(cls, options, options_available):
    """
    Parse the options given to a diagnostic

    The first element of ``options`` is discarded. The remaining values are matched by position with the
    definitions in ``options_available``; definitions without a matching value receive an empty string,
    so each definition's ``parse`` decides what to do with the missing value.

    :param options: list of strings containing the options passed to the diagnostic
    :param options_available: option definitions, in the order the values are expected
    :return: parsed values indexed by option name
    :rtype: dict
    :raises DiagnosticOptionError: if more values than definitions are given
    """
    given = options[1:]
    if len(given) > len(options_available):
        raise DiagnosticOptionError('You have specified more options than available for diagnostic '
                                    '{0}'.format(cls.alias))
    parsed = dict()
    for position, definition in enumerate(options_available):
        raw_value = given[position] if position < len(given) else ''
        parsed[definition.name] = definition.parse(raw_value)
    return parsed
def __str__(self):
    """
    Placeholder text representation of the diagnostic

    Must be implemented by derived classes

    :return: fixed message reminding the developer to override this method
    :rtype: str
    """
    return 'Developer must override base class __str__ method'
def request_chunk(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None, to_modify=False):
    """
    Request a chunk file from the data manager and subscribe to its updates

    The request is stored in the diagnostic's request list and ``_updated_request`` is subscribed to it.

    :param domain:
    :param var:
    :param startdate:
    :param member:
    :param chunk:
    :param grid:
    :param box:
    :param frequency:
    :param to_modify: if True, the diagnostic is added to the request as a modifier
    :type to_modify: bool
    :return: the request returned by the data manager
    """
    chunk_request = self.data_manager.request_chunk(domain, var, startdate, member, chunk, grid, box, frequency)
    if to_modify:
        chunk_request.add_modifier(self)
    self._requests.append(chunk_request)
    chunk_request.subscribe(self, self._updated_request)
    return chunk_request
def request_year(self, domain, var, startdate, member, year, grid=None, box=None, frequency=None, to_modify=False):
    """
    Request a yearly file from the data manager and subscribe to its updates

    The request is stored in the diagnostic's request list and ``_updated_request`` is subscribed to it.

    :param domain:
    :param var:
    :param startdate:
    :param member:
    :param year:
    :param grid:
    :param box:
    :param frequency:
    :param to_modify: if True, the diagnostic is added to the request as a modifier
    :type to_modify: bool
    :return: the request returned by the data manager
    """
    # NOTE(review): unlike request_chunk, the diagnostic itself goes as first positional argument;
    # confirm that this matches the signature of the data manager's request_year
    year_request = self.data_manager.request_year(self, domain, var, startdate, member, year, grid, box, frequency)
    if to_modify:
        year_request.add_modifier(self)
    self._requests.append(year_request)
    year_request.subscribe(self, self._updated_request)
    return year_request
def _updated_request(self, request):
    """
    React to a change in one of the requested files

    Nothing is done unless the diagnostic is still WAITING. A failed request marks the diagnostic as
    FAILED and stores an explanatory message; a ready request triggers a readiness check.

    :param request: request whose state has changed
    """
    if self.status != DiagnosticStatus.WAITING:
        return

    if request.local_status == LocalStatus.FAILED:
        self.message = 'Required file {0} is not available'.format(request.remote_file)
        self.status = DiagnosticStatus.FAILED
    elif request.local_status == LocalStatus.READY:
        self.check_is_ready()
def check_is_ready(self):
    """
    Mark the diagnostic as READY if all its requests are ready to run

    Every request is asked through its ``ready_to_run`` method; the answers are collected first, so
    all the requests are queried even if one of them is not ready.
    """
    readiness = [request.ready_to_run(self) for request in self._requests]
    if all(readiness):
        self.status = DiagnosticStatus.READY
def _unsuscribe_requests(self):
    """Unsubscribe the diagnostic from every request stored in its request list"""
    for request in self._requests:
        request.unsubscribe(self)
def all_requests_in_storage(self):
    """
    Check the storage status of the requests

    :return: False as soon as a request has a storage status different from StorageStatus.READY,
             True otherwise (also when there are no requests)
    :rtype: bool
    """
    for request in self._requests:
        if request.storage_status != StorageStatus.READY:
            return False
    return True
class DiagnosticOption(object):
    """
    Basic option for a diagnostic: the value is returned as given

    :param name: option name
    :param default_value: value used when the option is left empty. None makes the option mandatory
    """

    def __init__(self, name, default_value=None):
        self.name = name
        self.default_value = default_value

    def parse(self, option_value):
        """
        Get the final value for the option

        :param option_value: value given by the user, empty string if none
        :return: given value, or the default one if the given value is empty
        :raises DiagnosticOptionError: if the value is empty and the option has no default
        """
        return self.check_default(option_value)

    def check_default(self, option_value):
        """
        Replace an empty value by the default one

        :param option_value: value given by the user, empty string if none
        :return: given value, or the default one if the given value is empty
        :raises DiagnosticOptionError: if the value is empty and the option has no default
        """
        if option_value != '':
            return option_value
        if self.default_value is not None:
            return self.default_value
        raise DiagnosticOptionError('Option {0} is not optional'.format(self.name))
class DiagnosticFloatOption(DiagnosticOption):
    """Option whose value is converted to float"""

    def parse(self, option_value):
        """
        Get the option value as a float

        :param option_value: value given by the user, empty string if none
        :return: given (or default) value converted with ``float``
        :rtype: float
        """
        resolved = self.check_default(option_value)
        return float(resolved)
class DiagnosticIntOption(DiagnosticOption):
    """
    Option whose value is converted to int and, optionally, checked against limits

    :param name: option name
    :param default_value: value used when the option is left empty
    :param min_limit: lowest value accepted, None for no lower limit
    :type min_limit: int|NoneType
    :param max_limit: highest value accepted, None for no upper limit
    :type max_limit: int|NoneType
    """

    def __init__(self, name, default_value=None, min_limit=None, max_limit=None):
        super(DiagnosticIntOption, self).__init__(name, default_value)
        self.min_limit = min_limit
        self.max_limit = max_limit

    def parse(self, option_value):
        """
        Get the option value as an int, validating it against the limits

        :param option_value: value given by the user, empty string if none
        :return: given (or default) value converted with ``int``
        :rtype: int
        :raises DiagnosticOptionError: if the value is outside the configured limits
        """
        value = int(self.check_default(option_value))
        below_minimum = self.min_limit is not None and value < self.min_limit
        if below_minimum:
            raise DiagnosticOptionError('Value {0} is lower than minimum ({1})'.format(value, self.min_limit))
        above_maximum = self.max_limit is not None and value > self.max_limit
        if above_maximum:
            raise DiagnosticOptionError('Value {0} is higher than maximum ({1})'.format(value, self.max_limit))
        return value
class DiagnosticListIntOption(DiagnosticOption):
    """
    Option containing a list of integers, given as a string of values separated by '-'

    :param name: option name
    :param default_value: value used when the option is left empty
    :param min_limit: lowest value accepted, None for no lower limit
    :param max_limit: highest value accepted, None for no upper limit
    :type name:
    :type default_value:
    :type min_limit: int|NoneType
    :type max_limit: int|NoneType
    """

    def __init__(self, name, default_value=None, min_limit=None, max_limit=None):
        super(DiagnosticListIntOption, self).__init__(name, default_value)
        self.min_limit = min_limit
        self.max_limit = max_limit

    def parse(self, option_value):
        """
        Get the option value as a list of ints, validating each one against the limits

        :param option_value: value given by the user, empty string if none
        :return: list of ints; a value that already is a tuple or a list is returned as is
        :raises DiagnosticOptionError: if any value is outside the configured limits
        """
        resolved = self.check_default(option_value)
        if isinstance(resolved, (tuple, list)):
            # Sequences (e.g. a default value) are returned untouched, without checking the limits
            return resolved

        numbers = [int(item) for item in resolved.split('-')]
        for number in numbers:
            if self.min_limit is not None and number < self.min_limit:
                raise DiagnosticOptionError('Value {0} is lower than minimum ({1})'.format(number, self.min_limit))
            if self.max_limit is not None and number > self.max_limit:
                raise DiagnosticOptionError('Value {0} is higher than maximum ({1})'.format(number, self.max_limit))
        return numbers
class DiagnosticListFrequenciesOption(DiagnosticOption):
    """Option containing a list of frequencies, given as a string of values separated by '-'"""

    def __init__(self, name, default_value=None):
        super(DiagnosticListFrequenciesOption, self).__init__(name, default_value)

    def parse(self, option_value):
        """
        Get the option value as a list of Frequency objects

        :param option_value: value given by the user, empty string if none
        :return: list with one Frequency per item; a value that already is a tuple or a list is
                 returned as is
        """
        resolved = self.check_default(option_value)
        if isinstance(resolved, (tuple, list)):
            return resolved
        return [Frequency(item) for item in resolved.split('-')]
class DiagnosticVariableOption(DiagnosticOption):
    """Option containing a variable name"""

    def parse(self, option_value):
        """
        Get the variable name, translated by the VariableManager when it knows the variable

        :param option_value: value given by the user, empty string if none
        :return: ``short_name`` of the variable found by the VariableManager, or the given name
                 if no variable is found
        """
        requested = self.check_default(option_value)
        variable = VariableManager().get_variable(requested, False)
        return requested if variable is None else variable.short_name
class DiagnosticVariableListOption(DiagnosticOption):
    """Option containing a list of variable names, given as a string of values separated by '-'"""

    def parse(self, option_value):
        """
        Get the list of variable names, translated by the VariableManager when it knows them

        :param option_value: value given by the user, empty string if none
        :return: list with, for each item, the ``short_name`` of the variable found by the
                 VariableManager or the given name if no variable is found
        :rtype: list
        """
        option_value = self.check_default(option_value)
        var_names = []
        for value in option_value.split('-'):
            real_name = VariableManager().get_variable(value, False)
            if real_name is None:
                var_names.append(value)
            else:
                var_names.append(real_name.short_name)
        # The list has to be handed back explicitly: without this return, parse always yields None
        return var_names
class DiagnosticDomainOption(DiagnosticOption):
    """Option whose value is parsed with ModelingRealms.parse"""

    def parse(self, option_value):
        """
        Get the option value parsed as a modeling realm

        :param option_value: value given by the user, empty string if none
        :return: result of ``ModelingRealms.parse`` for the given (or default) value
        """
        resolved = self.check_default(option_value)
        return ModelingRealms.parse(resolved)
class DiagnosticFrequencyOption(DiagnosticOption):
    """Option whose value is parsed with Frequency.parse"""

    def parse(self, option_value):
        """
        Get the option value parsed as a frequency

        :param option_value: value given by the user, empty string if none
        :return: result of ``Frequency.parse`` for the given (or default) value
        """
        resolved = self.check_default(option_value)
        return Frequency.parse(resolved)
Javier Vegas-Regidor
committed
class DiagnosticBasinOption(DiagnosticOption):
    """Option whose value is parsed with Basins.parse"""

    def parse(self, option_value):
        """
        Get the option value parsed as a basin

        :param option_value: value given by the user, empty string if none
        :return: result of ``Basins.parse`` for the given (or default) value
        """
        resolved = self.check_default(option_value)
        return Basins.parse(resolved)
class DiagnosticComplexStrOption(DiagnosticOption):
    """Option containing a string in which commas and spaces are written as escape sequences"""

    def parse(self, option_value):
        """
        Get the option value with its escape sequences expanded

        :param option_value: value given by the user, empty string if none
        :return: given (or default) value with '&;' replaced by ',' and then '&.' replaced by ' '
        :rtype: str
        """
        text = self.check_default(option_value)
        text = text.replace('&;', ',')
        return text.replace('&.', ' ')
class DiagnosticBoolOption(DiagnosticOption):
    """Option containing a boolean value"""

    def parse(self, option_value):
        """
        Get the option value as a bool

        :param option_value: value given by the user, empty string if none
        :return: the value itself if it already is a bool; otherwise True only if the text, ignoring
                 case, is 'true', 't' or 'yes'
        :rtype: bool
        """
        resolved = self.check_default(option_value)
        if isinstance(resolved, bool):
            return resolved
        return resolved.lower() in ('true', 't', 'yes')
class DiagnosticChoiceOption(DiagnosticOption):
    """
    Option whose value must be one of a set of choices

    :param name: option name
    :param choices: accepted values
    :param default_value: value used when the option is left empty
    :param ignore_case: if True, values are compared in lowercase
    :type ignore_case: bool
    """

    def __init__(self, name, choices, default_value=None, ignore_case=True):
        super(DiagnosticChoiceOption, self).__init__(name, default_value)
        self.choices = choices
        self.ignore_case = ignore_case

    def parse(self, value):
        """
        Get the choice selected by the given value

        :param value: value given by the user, empty string if none
        :return: the matching choice as written in ``choices`` when ignoring case, the value itself
                 otherwise
        :raises DiagnosticOptionError: if the value does not match any choice
        """
        candidate = self.check_default(value)
        if self.ignore_case:
            candidate = candidate.lower()
            for choice in self.choices:
                if choice.lower() == candidate:
                    return choice
        elif candidate in self.choices:
            return candidate

        raise DiagnosticOptionError('Value {1} in option {0} is not a valid choice. '
                                    'Options are {2}'.format(self.name, candidate, self.choices))
class DiagnosticOptionError(Exception):
    """Error raised when the options given to a diagnostic are not valid"""