Newer
Older
from bscearth.utils.log import Log
from earthdiagnostics.constants import Basins, Basin
from earthdiagnostics.datafile import StorageStatus, LocalStatus
from earthdiagnostics.frequency import Frequency
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.publisher import Publisher
from earthdiagnostics.variable_type import VariableType
Javier Vegas-Regidor
committed
# Lifecycle states of a diagnostic. The rest of this file reads them as
# DiagnosticStatus.<NAME>.
# NOTE(review): the enclosing class header is not visible in this view - confirm
# these assignments sit inside `class DiagnosticStatus`.
WAITING = 0  # state assigned by the diagnostic constructor
READY = 1  # set by check_is_ready once requests and subjobs allow the run
RUNNING = 2  # entering it flags the generated files as LocalStatus.COMPUTING
COMPLETED = 3  # entering it unsubscribes the diagnostic from its requests
FAILED = 4  # entering it unsubscribes the diagnostic from its requests
Base class for the diagnostics.
Provides a common interface for them and also has a mechanism that allows diagnostic retrieval by name.
:param data_manager: data manager that will be used to store and retrieve the necessary data
:type data_manager: DataManager
Javier Vegas-Regidor
committed
alias = None
""" Alias to call the diagnostic. Must be overridden in the derived classes"""
"""
Diagnostic's constructor
Parameters
----------
data_manager: DataManager
"""
self._status = DiagnosticStatus.WAITING
self._requests = []
self.consumed_time = datetime.timedelta()
def __ne__(self, other):
    """
    Tell whether this diagnostic differs from another object

    The answer is the inverse of the equality operator, which the derived classes are expected to provide

    Parameters
    ----------
    other: Diagnostic or NoneType
        Object to compare against

    Returns
    -------
    bool

    """
    is_same = self == other
    return not is_same
def can_skip_run(self):
    """
    Check if a diagnostic calculation can be skipped

    The run can be skipped only when every file declared as generated reports StorageStatus.READY and
    none of them has modifiers registered

    Returns
    -------
    bool
        True if nothing has to be computed, False otherwise

    """
    for file_generated in self._generated_files:
        if file_generated.storage_status != StorageStatus.READY:
            return False
        if file_generated.has_modifiers():
            Log.warning('Can not skip diagnostics run when data is going to be modified: {0}'.format(self))
            return False
    # Every declared output passed both checks (or none was declared): without this
    # explicit result the method would return None and the run could never be skipped
    return True
""" Diagnostic's string representation. Defaults to str"""
return self._status
@status.setter
def status(self, value):
    """
    Update the diagnostic status and apply the side effects tied to the new state

    Parameters
    ----------
    value: int
        One of the DiagnosticStatus constants

    """
    # Store the new state first: the checks below read it back through the getter
    self._status = value
    if self.status == DiagnosticStatus.RUNNING:
        # While running, every declared output is flagged as being computed
        for generated_file in self._generated_files:
            generated_file.local_status = LocalStatus.COMPUTING
    if self.status in (DiagnosticStatus.FAILED, DiagnosticStatus.COMPLETED):
        # Finished either way: stop listening to the requested files
        self._unsuscribe_requests()
def register(cls):
    """
    Register a new diagnostic using the given alias.

    It must be called using the derived class.

    Parameters
    ----------
    cls: Type[Diagnostic]

    Raises
    ------
    ValueError
        If cls is not derived from Diagnostic or it does not define an alias

    """
    if not issubclass(cls, Diagnostic):
        raise ValueError('Class {0} must be derived from Diagnostic'.format(cls))
    if cls.alias is None:
        raise ValueError('Diagnostic class {0} must have defined an alias'.format(cls))
    # Alias -> class table that get_diagnostic reads
    Diagnostic._diag_list[cls.alias] = cls
# noinspection PyProtectedMember
@staticmethod
def get_diagnostic(name):
    """
    Return the class for a diagnostic given its name

    Parameters
    ----------
    name: str

    Returns
    -------
    Type[Diagnostic] or None
        The class registered under that alias, None if there is no such alias

    """
    # A single lookup: get() yields None for names that were never registered
    return Diagnostic._diag_list.get(name)
Javier Vegas-Regidor
committed
def compute(self):
    """
    Run the diagnostic calculation and store its output

    Abstract hook: this base version only reports that the method is missing

    Raises
    ------
    NotImplementedError
        Always, unless a derived class overrides the method

    """
    message = "Class must override compute method"
    raise NotImplementedError(message)
def request_data(self):
    """
    Hook where a derived class is expected to request the data it needs

    This base version only reports that the method is missing

    Raises
    ------
    NotImplementedError
        Always, unless a derived class overrides the method

    """
    message = "Class must override request_data method"
    raise NotImplementedError(message)
def declare_data_generated(self):
    """
    Hook where a derived class is expected to declare the data it is going to generate

    This base version only reports that the method is missing

    Raises
    ------
    NotImplementedError
        Always, unless a derived class overrides the method

    """
    message = "Class must override declare_data_generated method"
    raise NotImplementedError(message)
def declare_chunk(self, domain, var, startdate, member, chunk, grid=None, region=None, box=None, frequency=None,
                  vartype=VariableType.MEAN):
    """
    Declare a chunk that is going to be generated by the diagnostic

    The declaration is delegated to the data manager and the resulting file is recorded among the
    files generated by this diagnostic

    Parameters
    ----------
    domain: ModelingRealm
    var: str
    startdate: str
    member: int
    chunk: int
    grid: str
    region: Basin
        When given, the diagnostic is also registered as a modifier of the declared file
    box: Box
    frequency: Frequency
    vartype: VariableType

    Returns
    -------
    DataFile

    """
    generated_chunk = self.data_manager.declare_chunk(domain, var, startdate, member, chunk, grid, region, box,
                                                      diagnostic=self, vartype=vartype, frequency=frequency)
    if region is not None:
        generated_chunk.add_modifier(self)
    self._generated_files.append(generated_chunk)
    return generated_chunk
def declare_year(self, domain, var, startdate, member, year, grid=None, box=None,
                 vartype=VariableType.MEAN):
    """
    Declare a yearly file that the diagnostic is going to generate

    The data manager creates the declaration; the resulting file is then added to the list of files
    generated by this diagnostic

    Parameters
    ----------
    domain: ModelingRealm
    var: str
    startdate: str
    member: int
    year: int
    grid: str
    box: Box
    vartype: VariableType

    Returns
    -------
    DataFile

    """
    declared = self.data_manager.declare_year(
        domain, var, startdate, member, year, grid, box,
        diagnostic=self, vartype=vartype,
    )
    self._generated_files.append(declared)
    return declared
def generate_jobs(cls, diags, options):
    """
    Build the diagnostic instances that the manager will run

    Abstract hook: this base version only reports that the method is missing

    Parameters
    ----------
    diags: Diags
    options: list of str

    Returns
    -------
    list of Diagnostic

    Raises
    ------
    NotImplementedError
        Always, unless a derived class overrides the method

    """
    message = "Class must override generate_jobs class method"
    raise NotImplementedError(message)
Javier Vegas-Regidor
committed
@classmethod
def process_options(cls, options, options_available):
    """
    Process the configuration of a diagnostic

    The first element of options is discarded (presumably the diagnostic name - confirm against the
    callers) and the remaining values are matched by position with options_available. Definitions
    left without a value are parsed from an empty string.

    Parameters
    ----------
    options: sequence of str
    options_available: sequence of DiagnosticOption

    Returns
    -------
    dict of str: object
        Dictionary of names and parsed values for the options

    Raises
    ------
    DiagnosticOptionError:
        If there are more options than admitted for the diagnostic

    """
    values = options[1:]
    if len(values) > len(options_available):
        raise DiagnosticOptionError('You have specified more options than available for diagnostic '
                                    '{0}'.format(cls.alias))
    processed = {}
    for position, option_definition in enumerate(options_available):
        # Missing trailing values are handed over as '' for the option to resolve
        option_value = values[position] if position < len(values) else ''
        processed[option_definition.name] = option_definition.parse(option_value)
    return processed
Must be implemented by derived classes
"""
return 'Developer must override base class __str__ method'
Add a subjob
Add a diagnostic that must be run before the current one
Parameters
----------
subjob: Diagnostic
"""
self.subjobs.append(subjob)
subjob.subscribe(self, self._subjob_status_changed)
def _subjob_status_changed(self, job):
    """
    Re-evaluate whether this diagnostic can run after a notification

    The notifying job is received but not inspected

    Parameters
    ----------
    job: Diagnostic

    """
    self.check_is_ready()
def request_chunk(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
                  to_modify=False, vartype=VariableType.MEAN):
    """
    Request one chunk of data required by the diagnostic

    The request is recorded in the diagnostic and the diagnostic subscribes to it so that
    _updated_request is called with the request's notifications

    Parameters
    ----------
    domain: ModelingRealm
    var: str
    startdate: str
    member: int
    chunk: int
    grid: str
    box: Box
    frequency: Frequency
    to_modify: bool
        Flag that must be active if the diagnostic is going to generate a modified version of this data. In this
        case this data must not be declared as an output of the diagnostic
    vartype: VariableType

    Returns
    -------
    DataFile

    See Also
    --------
    request_year
    declare_chunk
    declare_year

    """
    # NOTE(review): the tail of the signature was not visible in this view; the two last
    # parameters and their defaults follow the docstring, the body and the sibling
    # signatures (request_year, declare_chunk) - confirm against the repository
    request = self.data_manager.request_chunk(domain, var, startdate, member, chunk, grid, box, frequency, vartype)
    if to_modify:
        request.add_modifier(self)
    self._requests.append(request)
    request.subscribe(self, self._updated_request)
    return request
def request_year(self, domain, var, startdate, member, year, grid=None, box=None, frequency=None, to_modify=False):
    """
    Request one year of data that is required for the diagnostic

    The request is recorded in the diagnostic and the diagnostic subscribes to it so that
    _updated_request is called with the request's notifications

    Parameters
    ----------
    domain: ModelingRealm
    var: str
    startdate: str
    member: int
    year: int
    grid: str
    box: Box
    frequency: Frequency
    to_modify: bool
        When active, the diagnostic is registered as a modifier of the requested file

    Returns
    -------
    DataFile

    See Also
    --------
    request_chunk
    declare_chunk
    declare_year

    """
    # NOTE(review): unlike request_chunk, the diagnostic itself goes first in the call to
    # the data manager - confirm against DataManager.request_year
    request = self.data_manager.request_year(self, domain, var, startdate, member, year, grid, box, frequency)
    if to_modify:
        request.add_modifier(self)
    self._requests.append(request)
    request.subscribe(self, self._updated_request)
    return request
def _updated_request(self, request):
    """
    React to a notification coming from one of the requested files

    Nothing is done unless the diagnostic is still waiting. A failed request makes the diagnostic
    fail; a ready one triggers a new readiness check.

    Parameters
    ----------
    request: DataFile

    """
    if self.status == DiagnosticStatus.WAITING:
        if request.local_status == LocalStatus.FAILED:
            self.message = 'Required file {0} is not available'.format(request.remote_file)
            self.status = DiagnosticStatus.FAILED
        elif request.local_status == LocalStatus.READY:
            self.check_is_ready()
def check_is_ready(self):
    """
    Mark the diagnostic as ready when nothing blocks it anymore

    The status becomes READY only if every request reports it is ready to run for this diagnostic
    and every subjob is completed

    """
    requests_ready = all([request.ready_to_run(self) for request in self._requests])
    if not requests_ready:
        return
    subjobs_done = all([subjob.status == DiagnosticStatus.COMPLETED for subjob in self.subjobs])
    if subjobs_done:
        self.status = DiagnosticStatus.READY
def _unsuscribe_requests(self):
    """Unsubscribe this diagnostic from every request it holds"""
    for pending in self._requests:
        pending.unsubscribe(self)
def all_requests_in_storage(self):
    """
    Tell whether no data request is pending anymore

    Returns
    -------
    bool
        True when pending_requests reports zero

    """
    pending = self.pending_requests()
    return pending == 0
def pending_requests(self):
    """
    Get the number of data requests pending to be fulfilled

    A request is pending while its storage status or its local status is not READY

    Returns
    -------
    int

    """
    # Count only the requests matching the condition; building a list of the boolean
    # results and measuring it would count every request, pending or not
    return sum(
        1 for request in self._requests
        if request.storage_status != StorageStatus.READY or request.local_status != LocalStatus.READY
    )
class DiagnosticOption(object):
    """Class to manage string options for the diagnostic"""

    def __init__(self, name, default_value=None):
        """
        Option constructor

        Parameters
        ----------
        name: str
        default_value: object, optional
            Value used when the option is left empty. None makes the option mandatory

        """
        self.name = name
        self.default_value = default_value

    def parse(self, option_value):
        """
        Get the final value for the option

        If option_value is empty, return default_value

        Parameters
        ----------
        option_value: str

        Returns
        -------
        str

        Raises
        ------
        DiagnosticOptionError:
            If the option is empty and default_value is None

        """
        option_value = self._check_default(option_value)
        return option_value

    def _check_default(self, option_value):
        """
        Replace an empty value with the default one

        Parameters
        ----------
        option_value: str

        Returns
        -------
        object
            option_value itself unless it is empty, default_value otherwise

        Raises
        ------
        DiagnosticOptionError:
            If the option is empty and default_value is None

        """
        if option_value != '':
            return option_value
        if self.default_value is None:
            raise DiagnosticOptionError('Option {0} is not optional'.format(self.name))
        return self.default_value
# Option variant for floating point values, going by the class name.
class DiagnosticFloatOption(DiagnosticOption):
def parse(self, option_value):
# NOTE(review): no body is visible for this method in this view, so what it does
# cannot be documented. Presumably it converts the defaulted value with float(),
# e.g. float(self._check_default(option_value)) - confirm against the repository.
class DiagnosticIntOption(DiagnosticOption):
    """Option holding an integer, optionally restricted to a range"""

    def __init__(self, name, default_value=None, min_limit=None, max_limit=None):
        """
        Option constructor

        Parameters
        ----------
        name: str
        default_value: int, optional
        min_limit: int, optional
            Lowest value accepted. None disables the check
        max_limit: int, optional
            Highest value accepted. None disables the check

        """
        super(DiagnosticIntOption, self).__init__(name, default_value)
        self.min_limit = min_limit
        self.max_limit = max_limit

    def parse(self, option_value):
        """
        Convert the option to an integer and check it against the limits

        Parameters
        ----------
        option_value: str

        Returns
        -------
        int

        Raises
        ------
        DiagnosticOptionError:
            If the value is outside the limits, or it is empty and there is no default

        """
        # The checks below need the defaulted value as a number
        value = int(self._check_default(option_value))
        if self.min_limit is not None and value < self.min_limit:
            raise DiagnosticOptionError('Value {0} is lower than minimum ({1})'.format(value, self.min_limit))
        if self.max_limit is not None and value > self.max_limit:
            raise DiagnosticOptionError('Value {0} is higher than maximum ({1})'.format(value, self.max_limit))
        return value
class DiagnosticListIntOption(DiagnosticOption):
    """
    Option holding a list of integers written as values separated by '-'

    Parameters
    ----------
    name: str
    default_value: list of int or NoneType
    min_limit: int or NoneType
        Lowest value accepted for any element. None disables the check
    max_limit: int or NoneType
        Highest value accepted for any element. None disables the check

    """

    def __init__(self, name, default_value=None, min_limit=None, max_limit=None):
        super(DiagnosticListIntOption, self).__init__(name, default_value)
        self.min_limit = min_limit
        # parse reads this attribute, so it has to be stored alongside min_limit
        self.max_limit = max_limit

    def parse(self, option_value):
        """
        Convert the option to a list of integers and check every element against the limits

        Parameters
        ----------
        option_value: str

        Returns
        -------
        list of int
            A tuple or list coming from the default value is returned untouched and unchecked

        Raises
        ------
        DiagnosticOptionError:
            If any value is outside the limits, or the option is empty and there is no default

        """
        option_value = self._check_default(option_value)
        if isinstance(option_value, (tuple, list)):
            return option_value
        values = [int(i) for i in option_value.split('-')]
        for value in values:
            if self.min_limit is not None and value < self.min_limit:
                raise DiagnosticOptionError('Value {0} is lower than minimum ({1})'.format(value, self.min_limit))
            if self.max_limit is not None and value > self.max_limit:
                raise DiagnosticOptionError('Value {0} is higher than maximum ({1})'.format(value, self.max_limit))
        return values
class DiagnosticListFrequenciesOption(DiagnosticOption):
    """Option holding a list of frequencies written as values separated by '-'"""

    def __init__(self, name, default_value=None):
        super(DiagnosticListFrequenciesOption, self).__init__(name, default_value)

    def parse(self, option_value):
        """
        Convert the option to a list of Frequency objects

        Parameters
        ----------
        option_value: str

        Returns
        -------
        list of Frequency
            A tuple or list coming from the default value is returned untouched

        Raises
        ------
        DiagnosticOptionError:
            If the option is empty and there is no default

        """
        # Apply the default first: otherwise default_value would never be used and an
        # empty option would be turned into a frequency built from ''
        option_value = self._check_default(option_value)
        if isinstance(option_value, (tuple, list)):
            return option_value
        return [Frequency(i) for i in option_value.split('-')]
class DiagnosticVariableOption(DiagnosticOption):
    """Option holding a variable name that is translated through a variable manager"""

    def __init__(self, var_manager, name='variable', default_value=None):
        """
        Option constructor

        Parameters
        ----------
        var_manager: VariableManager
            Object used by parse to look the variable up
        name: str, optional
        default_value: str, optional

        """
        super(DiagnosticVariableOption, self).__init__(name, default_value)
        self.var_manager = var_manager

    def parse(self, option_value):
        """
        Translate the option to the short name the variable manager knows it by

        Parameters
        ----------
        option_value: str

        Returns
        -------
        str
            The short name of the variable found by the manager, or the value itself when the
            manager does not find it

        Raises
        ------
        DiagnosticOptionError:
            If the option is empty and there is no default

        """
        # Same default handling as the sibling options of this module
        option_value = self._check_default(option_value)
        real_name = self.var_manager.get_variable(option_value, False)
        if real_name is None:
            return option_value
        return real_name.short_name
# Option holding several variable names; each one is looked up in the variable
# manager and replaced by its short name when the manager knows it.
class DiagnosticVariableListOption(DiagnosticOption):
def __init__(self, var_manager, name, default_value=None):
super(DiagnosticVariableListOption, self).__init__(name, default_value)
# Kept to look the variables up in parse
self.var_manager = var_manager
def parse(self, option_value):
# NOTE(review): `value` and `var_names` are never bound in the visible text and
# nothing is returned: the accumulator initialisation, the loop that splits
# option_value and the final return appear to be missing from this view. The
# separator used for the split cannot be told from here - confirm against the
# repository before relying on this method.
real_name = self.var_manager.get_variable(value, False)
if real_name is None:
# Unknown to the manager: keep the name as it was written
var_names.append(value)
else:
var_names.append(real_name.short_name)
class DiagnosticDomainOption(DiagnosticOption):
    """Option whose value is turned into a modeling realm"""

    def __init__(self, name='domain', default_value=None):
        super(DiagnosticDomainOption, self).__init__(name, default_value)

    def parse(self, option_value):
        """
        Convert the option with ModelingRealms.parse after applying the default

        Parameters
        ----------
        option_value: str

        """
        realm_text = self._check_default(option_value)
        return ModelingRealms.parse(realm_text)
class DiagnosticFrequencyOption(DiagnosticOption):
    """Option whose value is turned into a frequency"""

    def __init__(self, name='frequency', default_value=None):
        super(DiagnosticFrequencyOption, self).__init__(name, default_value)

    def parse(self, option_value):
        """
        Convert the option with Frequency.parse after applying the default

        Parameters
        ----------
        option_value: str

        """
        frequency_text = self._check_default(option_value)
        return Frequency.parse(frequency_text)
Javier Vegas-Regidor
committed
class DiagnosticBasinOption(DiagnosticOption):
    """Option whose value is turned into a basin"""

    def parse(self, option_value):
        """
        Convert the option to a basin

        Parameters
        ----------
        option_value: str

        Returns
        -------
        Basin

        Raises
        ------
        DiagnosticOptionError:
            If the basin is not recognized, or the option is empty and there is no default

        """
        # The text to look up is the option after applying the default
        value = self._check_default(option_value)
        basin = Basins().parse(value)
        if basin is None:
            raise DiagnosticOptionError('Basin {0} not recognized'.format(value))
        return basin
Javier Vegas-Regidor
committed
class DiagnosticComplexStrOption(DiagnosticOption):
    """String option where commas and spaces are written as escape sequences"""

    def parse(self, option_value):
        """
        Apply the default and expand the escape sequences: '&;' becomes ',' and '&.' becomes ' '

        Parameters
        ----------
        option_value: str

        Returns
        -------
        str

        """
        text = self._check_default(option_value)
        text = text.replace('&;', ',')
        return text.replace('&.', ' ')
class DiagnosticBoolOption(DiagnosticOption):
    """Option holding a boolean flag"""

    def parse(self, option_value):
        """
        Convert the option to a boolean

        Parameters
        ----------
        option_value: str

        Returns
        -------
        bool
            True for 'true', 't' or 'yes' in any letter case, False for any other text. A boolean
            coming from the default value is returned as it is

        Raises
        ------
        DiagnosticOptionError:
            If the option is empty and there is no default

        """
        # Apply the default first: otherwise an empty option would always read as False,
        # whatever default the option was created with
        option_value = self._check_default(option_value)
        if isinstance(option_value, bool):
            return option_value
        return option_value.lower() in ('true', 't', 'yes')
class DiagnosticChoiceOption(DiagnosticOption):
    """Option restricted to a fixed set of choices"""

    def __init__(self, name, choices, default_value=None, ignore_case=True):
        """
        Option constructor

        Parameters
        ----------
        name: str
        choices: iterable of str
        default_value: str, optional
        ignore_case: bool, optional
            If True, values are matched against the choices regardless of letter case

        Raises
        ------
        DiagnosticOptionError:
            If default_value is given and it is not a valid choice

        """
        super(DiagnosticChoiceOption, self).__init__(name, default_value)
        self.choices = choices
        self.ignore_case = ignore_case
        # To check if it is valid
        if default_value is not None:
            self.parse(default_value)

    def parse(self, option_value):
        """
        Check the option against the choices

        Parameters
        ----------
        option_value: str

        Returns
        -------
        str
            The matching choice as it was declared

        Raises
        ------
        DiagnosticOptionError:
            If the value is not a valid choice, or it is empty and there is no default

        """
        option_value = self._check_default(option_value)
        if self.ignore_case:
            lowered = option_value.lower()
            for choice in self.choices:
                # Return the declared spelling only for the choice that actually matches
                if lowered == choice.lower():
                    return choice
        else:
            if option_value in self.choices:
                return option_value
        raise DiagnosticOptionError('Value {1} in option {0} is not a valid choice. '
                                    'Options are {2}'.format(self.name, option_value, self.choices))
class DiagnosticOptionError(Exception):
    """Error raised by the diagnostic options when a value can not be accepted"""