Newer
Older
from bscearth.utils.log import Log
from earthdiagnostics.constants import Basins, Basin
from earthdiagnostics.datafile import StorageStatus, LocalStatus
from earthdiagnostics.frequency import Frequency
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.publisher import Publisher
from earthdiagnostics.variable_type import VariableType
Javier Vegas-Regidor
committed
# Lifecycle states, referenced elsewhere in this file as DiagnosticStatus.<NAME>
# NOTE(review): the enclosing class statement is not visible in this excerpt - confirm these belong to DiagnosticStatus
WAITING = 0  # initial state; only a waiting diagnostic reacts to updates of its requests
READY = 1  # set by check_is_ready once every request is ready to run and every subjob is completed
RUNNING = 2  # the status setter marks the generated files as COMPUTING in this state
COMPLETED = 3  # final state: the diagnostic unsubscribes from its requests
FAILED = 4  # final state: set when a required file is not available; requests are unsubscribed
Base class for the diagnostics.
Provides a common interface for them and also has a mechanism that allows diagnostic retrieval by name.
:param data_manager: data manager that will be used to store and retrieve the necessary data
:type data_manager: DataManager
Javier Vegas-Regidor
committed
alias = None
""" Alias to call the diagnostic. Must be overridden at the derived clases"""
Parameters
----------
data_manager: DataManager
"""
self._status = DiagnosticStatus.WAITING
self._requests = []
self.consumed_time = datetime.timedelta()
def __ne__(self, other):
    """
    Tell whether this diagnostic differs from another one

    The answer is simply the opposite of the equality comparison, which is delegated to `==`

    Parameters
    ----------
    other: Diagnostic or NoneType
        Object to compare against

    Returns
    -------
    bool
        True when `self == other` does not hold
    """
    are_equal = self == other
    return not are_equal
def can_skip_run(self):
    """
    Check if a diagnostic calculation can be skipped

    The run can be skipped only when every file declared as generated is already in storage
    and none of them is going to be modified.

    Returns
    -------
    bool
        True if all the generated files are ready and have no modifiers (also when there are no
        generated files at all), False otherwise
    """
    for file_generated in self._generated_files:
        if file_generated.storage_status != StorageStatus.READY:
            return False
        if file_generated.has_modifiers():
            Log.warning('Can not skip diagnostics run when data is going to be modified: {0}'.format(self))
            return False
    # Every file passed both checks. Without this line the method fell through and returned None,
    # so callers testing the result could never skip a run.
    return True
"""Full string representation. Defaults to str"""
return self._status
@status.setter
def status(self, value):
# Setter for the diagnostic status.
# NOTE(review): in the lines visible here `value` is never stored (the getter returns self._status),
# so the checks below run against the previous status. Lines appear to be missing - confirm against
# the complete file before relying on this behaviour.
if self.status == DiagnosticStatus.RUNNING:
# While running, every file this diagnostic generates is flagged as being computed
for generated_file in self._generated_files:
generated_file.local_status = LocalStatus.COMPUTING
# Once the diagnostic has failed or completed it stops listening to its requests
if self.status in (DiagnosticStatus.FAILED, DiagnosticStatus.COMPLETED):
self._unsuscribe_requests()
def register(cls):
    """
    Register a new diagnostic using the given alias.

    It must be called using the derived class. The class is stored in `Diagnostic._diag_list`
    under the key `cls.alias`.

    Parameters
    ----------
    cls: Type[Diagnostic]

    Raises
    ------
    ValueError
        If cls is not derived from Diagnostic or its alias is None
    """
    is_diagnostic = issubclass(cls, Diagnostic)
    if not is_diagnostic:
        raise ValueError('Class {0} must be derived from Diagnostic'.format(cls))

    alias = cls.alias
    if alias is None:
        raise ValueError('Diagnostic class {0} must have defined an alias'.format(cls))

    Diagnostic._diag_list[alias] = cls
# noinspection PyProtectedMember
@staticmethod
def get_diagnostic(name):
    """
    Return the class for a diagnostic given its name

    Parameters
    ----------
    name: str
        Alias used as key in `Diagnostic._diag_list`

    Returns
    -------
    Type[Diagnostic] or None
        The class stored under that alias, or None if the alias is not present
    """
    # One dictionary lookup instead of a membership test on keys() followed by indexing
    return Diagnostic._diag_list.get(name)
Javier Vegas-Regidor
committed
def compute(self):
    """
    Calculate the diagnostic and store its output

    Abstract hook: the base implementation only raises, derived classes must provide the real one

    Raises
    ------
    NotImplementedError
        Always, in the base class
    """
    message = "Class must override compute method"
    raise NotImplementedError(message)
def request_data(self):
    """
    Request the data required by the diagnostic

    Abstract hook: the base implementation only raises, derived classes must provide the real one

    Raises
    ------
    NotImplementedError
        Always, in the base class
    """
    message = "Class must override request_data method"
    raise NotImplementedError(message)
def declare_data_generated(self):
    """
    Declare the data that the diagnostic is going to generate

    Abstract hook: the base implementation only raises, derived classes must provide the real one

    Raises
    ------
    NotImplementedError
        Always, in the base class
    """
    message = "Class must override declare_data_generated method"
    raise NotImplementedError(message)
def declare_chunk(self, domain, var, startdate, member, chunk, grid=None, region=None, box=None, frequency=None,
                  vartype=VariableType.MEAN):
    """
    Declare a chunk that is going to be generated by the diagnostic

    The declaration is forwarded to the data manager and the resulting file is recorded among the
    files generated by this diagnostic. When a region is given, the diagnostic is also added to the
    file as a modifier.

    Parameters
    ----------
    domain: ModelingRealm
    var: str
    startdate: str
    member: int
    chunk: int
    grid: str
    region: Basin
    box: Box
    frequency: Frequency
    vartype: VariableType

    Returns
    -------
    DataFile
    """
    declared_file = self.data_manager.declare_chunk(
        domain, var, startdate, member, chunk, grid, region, box,
        diagnostic=self, vartype=vartype, frequency=frequency
    )
    has_region = region is not None
    if has_region:
        declared_file.add_modifier(self)
    self._generated_files.append(declared_file)
    return declared_file
def declare_year(self, domain, var, startdate, member, year, grid=None, box=None,
                 vartype=VariableType.MEAN):
    """
    Declare a year that is going to be generated by the diagnostic

    The declaration is forwarded to the data manager and the resulting file is recorded among the
    files generated by this diagnostic.

    Parameters
    ----------
    domain: ModelingRealm
    var: str
    startdate: str
    member: int
    year: int
    grid: str
    box: Box
    vartype: VariableType

    Returns
    -------
    DataFile
    """
    declared_file = self.data_manager.declare_year(
        domain, var, startdate, member, year, grid, box,
        diagnostic=self, vartype=vartype
    )
    self._generated_files.append(declared_file)
    return declared_file
def generate_jobs(cls, diags, options):
    """
    Generate the instances of the diagnostics that will be run by the manager

    Abstract hook: the base implementation only raises, derived classes must provide the real one

    Parameters
    ----------
    diags: Diags
    options: list of str

    Returns
    -------
    list of Diagnostic

    Raises
    ------
    NotImplementedError
        Always, in the base class
    """
    message = "Class must override generate_jobs class method"
    raise NotImplementedError(message)
Javier Vegas-Regidor
committed
@classmethod
def process_options(cls, options, options_available):
    """
    Process the configuration of a diagnostic

    The first element of `options` is discarded; the remaining values are matched by position with
    `options_available`. Definitions without a matching value are parsed with an empty string.

    Parameters
    ----------
    options: iterable of str
    options_available: iterable of DiagnosticOption

    Returns
    -------
    dict of str: str
        Dictionary of names and values for the options

    Raises
    ------
    DiagnosticOptionError:
        If there are more options than admitted for the diagnostic
    """
    # presumably the skipped first element is the diagnostic alias itself - verify against callers
    given_values = options[1:]
    if len(given_values) > len(options_available):
        raise DiagnosticOptionError('You have specified more options than available for diagnostic '
                                    '{0}'.format(cls.alias))

    processed = {}
    for position, definition in enumerate(options_available):
        raw_value = given_values[position] if position < len(given_values) else ''
        processed[definition.name] = definition.parse(raw_value)
    return processed
"""
return 'Developer must override base class __str__ method'
Add a subjob
Add a diagnostic that must be run before the current one
Parameters
----------
subjob: Diagnostic
"""
self.subjobs.append(subjob)
subjob.subscribe(self, self._subjob_status_changed)
def _subjob_status_changed(self, job):
    """
    Callback invoked when a subjob changes: re-evaluate whether this diagnostic is ready

    Parameters
    ----------
    job: Diagnostic
        Subjob that triggered the notification (not inspected here)
    """
    self.check_is_ready()
def request_chunk(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
"""
Request one chunk of data required by the diagnostic
Parameters
----------
domain: ModelingRealm
var: str
startdate: str
member: int
chunk: int
grid: str
box: Box
frequency: Frequency
to_modify: bool
Flag that must be active if the diagnostic is going to generate a modified version of this data. In this
case this data must not be declared as an output of the diagnostic
vartype: VariableType
Returns
-------
DataFile
See Also
--------
request_year
declare_chunk
declare_year
"""
request = self.data_manager.request_chunk(domain, var, startdate, member, chunk, grid, box, frequency, vartype)
if to_modify:
request.add_modifier(self)
self._requests.append(request)
request.subscribe(self, self._updated_request)
return request
def request_year(self, domain, var, startdate, member, year, grid=None, box=None, frequency=None, to_modify=False):
    """
    Request one year of data that is required for the diagnostic

    The request is obtained from the data manager, recorded among this diagnostic's requests and
    subscribed to, so `_updated_request` is called with its updates.

    Parameters
    ----------
    domain: ModelingRealm
    var: str
    startdate: str
    member: int
    year: int
    grid: str
    box: Box
    frequency: Frequency
    to_modify: bool
        If set, the diagnostic is added to the requested file as a modifier

    Returns
    -------
    DataFile

    See Also
    --------
    request_chunk
    declare_chunk
    declare_year
    """
    manager = self.data_manager
    requested_file = manager.request_year(self, domain, var, startdate, member, year, grid, box, frequency)
    if to_modify:
        requested_file.add_modifier(self)
    self._requests.append(requested_file)
    requested_file.subscribe(self, self._updated_request)
    return requested_file
def _updated_request(self, request):
    """
    Callback invoked when one of the requested files changes

    Updates are ignored unless the diagnostic is still waiting. A failed file makes the diagnostic
    fail with an explanatory message; a ready file triggers a readiness check.

    Parameters
    ----------
    request: DataFile
    """
    if self.status != DiagnosticStatus.WAITING:
        return

    local_status = request.local_status
    if local_status == LocalStatus.FAILED:
        self.message = 'Required file {0} is not available'.format(request.remote_file)
        self.status = DiagnosticStatus.FAILED
    elif local_status == LocalStatus.READY:
        self.check_is_ready()
def check_is_ready(self):
    """
    Check if a diagnostic is ready to run and change its status accordingly

    The status becomes READY only when every request reports it is ready to run for this diagnostic
    and every subjob is COMPLETED. Otherwise the status is left untouched.
    """
    requests_ready = all([request.ready_to_run(self) for request in self._requests])
    if not requests_ready:
        return
    # Subjobs are only inspected once all the requests are ready
    subjobs_completed = all([job.status == DiagnosticStatus.COMPLETED for job in self.subjobs])
    if subjobs_completed:
        self.status = DiagnosticStatus.READY
def _unsuscribe_requests(self):
    """Stop listening to every request registered by this diagnostic"""
    for requested_file in self._requests:
        requested_file.unsubscribe(self)
def all_requests_in_storage(self):
    """
    Check if all the data requested is in the local scratch

    Returns
    -------
    bool
        True when `pending_requests` reports zero
    """
    pending = self.pending_requests()
    return pending == 0
def pending_requests(self):
    """
    Get the number of data request pending to be fulfilled

    A request is pending while its storage status or its local status is not READY.

    Returns
    -------
    int
    """
    # The previous implementation took len() of a list of booleans, which is always the total
    # number of requests. Count only the ones for which the condition actually holds.
    return sum(
        1 for request in self._requests
        if request.storage_status != StorageStatus.READY or request.local_status != LocalStatus.READY
    )
class DiagnosticOption(object):
    """Class to manage string options for the diagnostic"""

    def __init__(self, name, default_value=None):
        """
        Parameters
        ----------
        name: str
        default_value: object, optional
            If None, the option is required and an exception will be thrown at parse time if the value is empty
        """
        self.name = name
        self.default_value = default_value

    def parse(self, option_value):
        """
        Get the final value for the option

        If option_value is empty, return default_value

        Parameters
        ----------
        option_value: str

        Returns
        -------
        str

        Raises
        ------
        DiagnosticOptionError:
            If the option is empty and default_value is None
        """
        return self._check_default(option_value)

    def _check_default(self, option_value):
        """
        Replace an empty value with the default one

        This helper is what `parse` (here and in the derived option classes) calls; its body used to
        sit unreachable after the return of `parse`.

        Raises
        ------
        DiagnosticOptionError:
            If the value is empty and there is no default
        """
        if option_value == '':
            if self.default_value is None:
                raise DiagnosticOptionError('Option {0} is not optional'.format(self.name))
            return self.default_value
        return option_value
class DiagnosticFloatOption(DiagnosticOption):
    """Class for parsing float options"""

    def parse(self, option_value):
        """
        Parse option to float

        Parameters
        ----------
        option_value: str

        Returns
        -------
        float
        """
        # NOTE(review): the method had no body in the visible source; this mirrors the other numeric
        # option (default resolution followed by conversion) - confirm against the complete file
        return float(self._check_default(option_value))
class DiagnosticIntOption(DiagnosticOption):
    """
    Class for parsing integer options

    Parameters
    ----------
    name: str
    default_value: int, optional
    min_limit: int, optional
        If set, any value below this will not be accepted
    max_limit: int, optional
        If set, any value over this will not be accepted
    """

    def __init__(self, name, default_value=None, min_limit=None, max_limit=None):
        super(DiagnosticIntOption, self).__init__(name, default_value)
        self.min_limit = min_limit
        self.max_limit = max_limit

    def parse(self, option_value):
        """
        Parse option to int and check it against the configured limits

        Parameters
        ----------
        option_value: str

        Returns
        -------
        int

        Raises
        ------
        DiagnosticOptionError
            If the value is outside the limits, or is empty and there is no default
        """
        # `value` was used below without ever being assigned: resolve the default and convert first
        value = int(self._check_default(option_value))
        if self.min_limit is not None and value < self.min_limit:
            raise DiagnosticOptionError('Value {0} is lower than minimum ({1})'.format(value, self.min_limit))
        if self.max_limit is not None and value > self.max_limit:
            raise DiagnosticOptionError('Value {0} is higher than maximum ({1})'.format(value, self.max_limit))
        return value
class DiagnosticListIntOption(DiagnosticOption):
    """
    Class for parsing integer list options

    Parameters
    ----------
    name: str
    default_value: list, optional
    min_limit: int, optional
        If set, any value below this will not be accepted
    max_limit: int, optional
        If set, any value over this will not be accepted
    """

    def __init__(self, name, default_value=None, min_limit=None, max_limit=None):
        super(DiagnosticListIntOption, self).__init__(name, default_value)
        self.min_limit = min_limit
        # parse() reads self.max_limit, so it has to be stored as well
        self.max_limit = max_limit

    def parse(self, option_value):
        """
        Parse a '-' separated string into a list of integers within the configured limits

        Parameters
        ----------
        option_value: str

        Returns
        -------
        list of int
            A tuple or list (typically the default value) is returned untouched

        Raises
        ------
        DiagnosticOptionError
            If any value is outside the limits, or the value is empty and there is no default
        """
        option_value = self._check_default(option_value)
        if isinstance(option_value, (tuple, list)):
            return option_value
        values = [int(i) for i in option_value.split('-')]
        for value in values:
            if self.min_limit is not None and value < self.min_limit:
                raise DiagnosticOptionError('Value {0} is lower than minimum ({1})'.format(value, self.min_limit))
            if self.max_limit is not None and value > self.max_limit:
                raise DiagnosticOptionError('Value {0} is higher than maximum ({1})'.format(value, self.max_limit))
        return values
class DiagnosticListFrequenciesOption(DiagnosticOption):
    """
    Class for parsing an option which is a list of frequencies

    Parameters
    ----------
    name: str
    default_value: list, optional
    """

    def __init__(self, name, default_value=None):
        super(DiagnosticListFrequenciesOption, self).__init__(name, default_value)

    def parse(self, option_value):
        """
        Parse a '-' separated string into a list of frequencies

        Parameters
        ----------
        option_value: str

        Returns
        -------
        list of Frequency
            A tuple or list (typically the default value) is returned untouched

        Raises
        ------
        DiagnosticOptionError
            If the value is empty and there is no default
        """
        # Resolve the default first: otherwise default_value was never used and an empty
        # value was split and converted as if it were a frequency
        option_value = self._check_default(option_value)
        if isinstance(option_value, (tuple, list)):
            return option_value
        return [Frequency(i) for i in option_value.split('-')]
class DiagnosticVariableOption(DiagnosticOption):
    """
    Class to parse variable options

    Parameters
    ----------
    var_manager: VariableManager
    name: str, optional
    default_value: str, optional
    """

    def __init__(self, var_manager, name='variable', default_value=None):
        super(DiagnosticVariableOption, self).__init__(name, default_value)
        self.var_manager = var_manager

    def parse(self, option_value):
        """
        Translate a variable name to the short name known by the variable manager

        Parameters
        ----------
        option_value: str

        Returns
        -------
        str
            The short name of the variable found by the manager, or the given value unchanged when
            the manager returns None for it

        Raises
        ------
        DiagnosticOptionError
            If the value is empty and there is no default
        """
        # The lookup used to sit directly in __init__, where option_value does not exist
        option_value = self._check_default(option_value)
        real_name = self.var_manager.get_variable(option_value, False)
        if real_name is None:
            return option_value
        return real_name.short_name
class DiagnosticVariableListOption(DiagnosticOption):
"""
Class to parse variable list options
Parameters
----------
var_manager: VariableManager
name: str
default_value: str, optional
"""
def __init__(self, var_manager, name, default_value=None):
super(DiagnosticVariableListOption, self).__init__(name, default_value)
self.var_manager = var_manager
def parse(self, option_value):
# NOTE(review): `value` and `var_names` are used below but never defined in the visible lines.
# Presumably a list initialisation, a loop over the split option_value and a final return are
# missing - the separator cannot be determined from here; confirm against the complete file.
# Each name is looked up in the variable manager
real_name = self.var_manager.get_variable(value, False)
if real_name is None:
# The manager returned None: keep the name exactly as given
var_names.append(value)
else:
# Otherwise store the short name of the variable that was found
var_names.append(real_name.short_name)
class DiagnosticDomainOption(DiagnosticOption):
    """
    Option whose value is a modeling realm

    Parameters
    ----------
    name: str, optional
    default_value: str, optional
    """

    def __init__(self, name='domain', default_value=None):
        super(DiagnosticDomainOption, self).__init__(name, default_value)

    def parse(self, option_value):
        """Resolve the default value and hand the result to `ModelingRealms.parse`"""
        resolved = self._check_default(option_value)
        return ModelingRealms.parse(resolved)
class DiagnosticFrequencyOption(DiagnosticOption):
    """
    Option whose value is a frequency

    Parameters
    ----------
    name: str, optional
    default_value: Frequency, optional
    """

    def __init__(self, name='frequency', default_value=None):
        super(DiagnosticFrequencyOption, self).__init__(name, default_value)

    def parse(self, option_value):
        """Resolve the default value and hand the result to `Frequency.parse`"""
        resolved = self._check_default(option_value)
        return Frequency.parse(resolved)
Javier Vegas-Regidor
committed
class DiagnosticBasinOption(DiagnosticOption):
    """Class to parse basin options"""

    def parse(self, option_value):
        """
        Parse a basin name

        Parameters
        ----------
        option_value: str

        Returns
        -------
        Basin

        Raises
        ------
        DiagnosticOptionError
            If `Basins().parse` returns None for the value, or the value is empty and there is no default
        """
        # `value` was used without being assigned; resolve it like the sibling option classes do
        value = self._check_default(option_value)
        basin = Basins().parse(value)
        if basin is None:
            raise DiagnosticOptionError('Basin {0} not recognized'.format(value))
        return basin
Javier Vegas-Regidor
committed
class DiagnosticComplexStrOption(DiagnosticOption):
    """
    Option for strings that need characters which can not be written directly

    The sequence '&;' is turned into ',' and '&.' into ' '
    """

    def parse(self, option_value):
        """Resolve the default value and expand the escape sequences, commas first"""
        text = self._check_default(option_value)
        text = text.replace('&;', ',')
        return text.replace('&.', ' ')
class DiagnosticBoolOption(DiagnosticOption):
    """
    Class to parse boolean options
    """

    def parse(self, option_value):
        """
        Parse a boolean option

        Parameters
        ----------
        option_value: str

        Returns
        -------
        bool
            A bool (typically the default value) is returned as is; a string is True only if it is
            'true', 't' or 'yes', ignoring case

        Raises
        ------
        DiagnosticOptionError
            If the value is empty and there is no default
        """
        # Apply the default first: otherwise an empty value silently became False and neither the
        # default nor the "required option" check was ever used
        option_value = self._check_default(option_value)
        if isinstance(option_value, bool):
            return option_value
        return option_value.lower() in ('true', 't', 'yes')
class DiagnosticChoiceOption(DiagnosticOption):
    """
    Class to parse choice option

    Parameters
    ----------
    name: str
    choices: list of str
        Valid options for the option
    default_value: str, optional
        If not None, it should be a valid choice
    ignore_case: bool, optional
        If false, value must match case of the valid choice
    """

    def __init__(self, name, choices, default_value=None, ignore_case=True):
        super(DiagnosticChoiceOption, self).__init__(name, default_value)
        self.choices = choices
        self.ignore_case = ignore_case
        # To check if it is valid
        if default_value is not None:
            self.parse(default_value)

    def parse(self, option_value):
        """
        Parse the option and check that it is one of the valid choices

        Parameters
        ----------
        option_value: str

        Returns
        -------
        str
            The matching choice, spelled as it appears in `choices`

        Raises
        ------
        DiagnosticOptionError
            If the value is not a valid choice, or is empty and there is no default
        """
        option_value = self._check_default(option_value)
        if self.ignore_case:
            lowered = option_value.lower()
            for choice in self.choices:
                # The comparison was missing: the first choice was returned whatever the value was
                if lowered == choice.lower():
                    return choice
        elif option_value in self.choices:
            return option_value

        raise DiagnosticOptionError('Value {1} in option {0} is not a valid choice. '
                                    'Options are {2}'.format(self.name, option_value, self.choices))
class DiagnosticOptionError(Exception):
    """
    Error raised when a diagnostic receives bad options

    It adds nothing to the standard Exception; it exists so option problems can be caught by type
    """