# coding=utf-8
"""Base class for diagnostics and the option classes used to parse their configuration."""
import datetime

from bscearth.utils.log import Log

from earthdiagnostics.constants import Basins, Basin
from earthdiagnostics.datafile import StorageStatus, LocalStatus
from earthdiagnostics.frequency import Frequency
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.publisher import Publisher
from earthdiagnostics.variable_type import VariableType


class DiagnosticStatus(object):
    """Enumeration of diagnostic status"""

    WAITING = 0
    READY = 1
    RUNNING = 2
    COMPLETED = 3
    FAILED = 4


class Diagnostic(Publisher):
    """
    Base class for the diagnostics.

    Provides a common interface for them and also has a mechanism that allows diagnostic retrieval by name.

    :param data_manager: data manager that will be used to store and retrieve the necessary data
    :type data_manager: DataManager
    """

    # Alias to call the diagnostic. Must be overridden in the derived classes
    alias = None

    # Registry of diagnostic classes, keyed by alias (filled by Diagnostic.register)
    _diag_list = dict()

    def __init__(self, data_manager):
        """
        Diagnostic's constructor

        Parameters
        ----------
        data_manager: DataManager
        """
        super(Diagnostic, self).__init__()
        self._generated_files = []
        self.data_manager = data_manager
        self._status = DiagnosticStatus.WAITING
        self._requests = []
        self.consumed_time = datetime.timedelta()
        self.subjobs = []
        self.message = None

    def __ne__(self, other):
        """
        Check if a diagnostic is different than other

        Implementation is just the negation of the equal, that should be implemented by the derived classes

        Parameters
        ----------
        other: Diagnostic or NoneType
            Diagnostic to be compared

        Returns
        -------
        bool
        """
        return not self == other

    def can_skip_run(self):
        """
        Check if a diagnostic calculation can be skipped

        Looks if the data to be generated is already there and is not going to be modified

        Returns
        -------
        bool
        """
        for file_generated in self._generated_files:
            if file_generated.storage_status != StorageStatus.READY:
                return False
            if file_generated.has_modifiers():
                Log.warning('Can not skip diagnostics run when data is going to be modified: {0}'.format(self))
                return False
        return True

    def __repr__(self):
        """Diagnostic's string representation. Defaults to str"""
        return str(self)

    @property
    def status(self):
        """Diagnostic's current status"""
        return self._status

    @status.setter
    def status(self, value):
        # Setting the same status again is a no-op: no side effects and no dispatch
        if self._status == value:
            return
        self._status = value
        if self.status == DiagnosticStatus.RUNNING:
            for generated_file in self._generated_files:
                generated_file.local_status = LocalStatus.COMPUTING
        if self.status in (DiagnosticStatus.FAILED, DiagnosticStatus.COMPLETED):
            # Finished diagnostics no longer need to be notified about their requests
            self._unsuscribe_requests()
        # Notify subscribers (e.g. diagnostics that have this one as a subjob)
        self.dispatch(self)

    @staticmethod
    def register(cls):
        """
        Register a new diagnostic using the given alias.

        It must be called using the derived class.

        Parameters
        ----------
        cls: Type[Diagnostic]

        Raises
        ------
        ValueError
            If cls is not derived from Diagnostic or does not define an alias
        """
        if not issubclass(cls, Diagnostic):
            raise ValueError('Class {0} must be derived from Diagnostic'.format(cls))
        if cls.alias is None:
            raise ValueError('Diagnostic class {0} must have defined an alias'.format(cls))
        Diagnostic._diag_list[cls.alias] = cls

    # noinspection PyProtectedMember
    @staticmethod
    def get_diagnostic(name):
        """
        Return the class for a diagnostic given its name

        Parameters
        ----------
        name: str

        Returns
        -------
        Type[Diagnostic] or None
            None if no diagnostic has been registered with that alias
        """
        return Diagnostic._diag_list.get(name)

    def compute(self):
        """
        Calculate the diagnostic and store the output

        Must be implemented by derived classes
        """
        raise NotImplementedError("Class must override compute method")

    def request_data(self):
        """
        Request the data required by the diagnostic

        Must be implemented by derived classes
        """
        raise NotImplementedError("Class must override request_data method")

    def declare_data_generated(self):
        """
        Declare the data that the diagnostic will generate

        Must be implemented by derived classes
        """
        raise NotImplementedError("Class must override declare_data_generated method")

    def declare_chunk(self, domain, var, startdate, member, chunk, grid=None, region=None, box=None, frequency=None,
                      vartype=VariableType.MEAN):
        """
        Declare a chunk that is going to be generated by the diagnostic

        Parameters
        ----------
        domain: ModelingRealm
        var: str
        startdate: str
        member: int
        chunk: int
        grid: str
        region: Basin or str
            Basin instances are converted to their name before being passed to the data manager
        box: Box
        frequency: Frequency
        vartype: VariableType

        Returns
        -------
        DataFile
        """
        if isinstance(region, Basin):
            region = region.name
        generated_chunk = self.data_manager.declare_chunk(domain, var, startdate, member, chunk, grid, region, box,
                                                          diagnostic=self, vartype=vartype, frequency=frequency)
        # A regional output is registered as a modifier of the declared file
        if region is not None:
            generated_chunk.add_modifier(self)
        self._generated_files.append(generated_chunk)
        return generated_chunk

    def declare_year(self, domain, var, startdate, member, year, grid=None, box=None,
                     vartype=VariableType.MEAN):
        """
        Declare a year that is going to be generated by the diagnostic

        Parameters
        ----------
        domain: ModelingRealm
        var: str
        startdate: str
        member: int
        year: int
        grid: str
        box: Box
        vartype: VariableType

        Returns
        -------
        DataFile
        """
        generated_year = self.data_manager.declare_year(domain, var, startdate, member, year, grid, box,
                                                        diagnostic=self, vartype=vartype)
        self._generated_files.append(generated_year)
        return generated_year

    @classmethod
    def generate_jobs(cls, diags, options):
        """
        Generate the instances of the diagnostics that will be run by the manager

        Must be implemented by derived classes.

        Parameters
        ----------
        diags: Diags
        options: list of str

        Returns
        -------
        list of Diagnostic
        """
        raise NotImplementedError("Class must override generate_jobs class method")

    @classmethod
    def process_options(cls, options, options_available):
        """
        Process the configuration of a diagnostic

        Parameters
        ----------
        options: sequence of str
            Raw options. The first element is skipped (it is not an option value)
        options_available: iterable of DiagnosticOption

        Returns
        -------
        dict of str: object
            Dictionary of option names and parsed values

        Raises
        ------
        DiagnosticOptionError:
            If there are more options than admitted for the diagnostic
        """
        processed = dict()
        options = options[1:]
        if len(options) > len(options_available):
            raise DiagnosticOptionError('You have specified more options than available for diagnostic '
                                        '{0}'.format(cls.alias))
        for x, option_definition in enumerate(options_available):
            # Missing trailing options are passed as '' so the option can apply its default
            if len(options) <= x:
                option_value = ''
            else:
                option_value = options[x]
            processed[option_definition.name] = option_definition.parse(option_value)
        return processed

    def __str__(self):
        """
        String representation of the diagnostic

        Must be implemented by derived classes
        """
        return 'Developer must override base class __str__ method'

    def add_subjob(self, subjob):
        """
        Add a subjob

        Add a diagnostic that must be run before the current one

        Parameters
        ----------
        subjob: Diagnostic
        """
        self.subjobs.append(subjob)
        subjob.subscribe(self, self._subjob_status_changed)

    def _subjob_status_changed(self, job):
        self.check_is_ready()

    def request_chunk(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
                      to_modify=False, vartype=VariableType.MEAN):
        """
        Request one chunk of data required by the diagnostic

        Parameters
        ----------
        domain: ModelingRealm
        var: str
        startdate: str
        member: int
        chunk: int
        grid: str
        box: Box
        frequency: Frequency
        to_modify: bool
            Flag that must be active if the diagnostic is going to generate a modified version of this data. In this
            case this data must not be declared as an output of the diagnostic
        vartype: VariableType

        Returns
        -------
        DataFile

        See Also
        --------
        request_year
        declare_chunk
        declare_year
        """
        request = self.data_manager.request_chunk(domain, var, startdate, member, chunk, grid, box, frequency, vartype)
        if to_modify:
            request.add_modifier(self)
        self._requests.append(request)
        request.subscribe(self, self._updated_request)
        return request

    def request_year(self, domain, var, startdate, member, year, grid=None, box=None, frequency=None, to_modify=False):
        """
        Request one year of data that is required for the diagnostic

        Parameters
        ----------
        domain: ModelingRealm
        var: str
        startdate: str
        member: int
        year: int
        grid: str
        box: Box
        frequency: Frequency
        to_modify: bool
            Flag that must be active if the diagnostic is going to generate a modified version of this data

        Returns
        -------
        DataFile

        See Also
        --------
        request_chunk
        declare_chunk
        declare_year
        """
        # NOTE(review): unlike request_chunk, the diagnostic itself is passed as first argument here;
        # this must match the data manager's request_year signature — confirm.
        request = self.data_manager.request_year(self, domain, var, startdate, member, year, grid, box, frequency)
        if to_modify:
            request.add_modifier(self)
        self._requests.append(request)
        request.subscribe(self, self._updated_request)
        return request

    def _updated_request(self, request):
        # Request updates only matter while the diagnostic is still waiting for its inputs
        if self.status != DiagnosticStatus.WAITING:
            return
        if request.local_status == LocalStatus.FAILED:
            self.message = 'Required file {0} is not available'.format(request.remote_file)
            self.status = DiagnosticStatus.FAILED
            return

        if request.local_status == LocalStatus.READY:
            self.check_is_ready()

    def check_is_ready(self):
        """Check if a diagnostic is ready to run and change its status accordingly"""
        if all(request.ready_to_run(self) for request in self._requests) and \
                all(subjob.status == DiagnosticStatus.COMPLETED for subjob in self.subjobs):
            self.status = DiagnosticStatus.READY

    def _unsuscribe_requests(self):
        for request in self._requests:
            request.unsubscribe(self)

    def all_requests_in_storage(self):
        """
        Check if all the data requested is ready both in storage and in the local scratch

        Returns
        -------
        bool
        """
        return self.pending_requests() == 0

    def pending_requests(self):
        """
        Get the number of data requests pending to be fulfilled

        A request is pending if it is not ready in storage or not ready locally.

        Returns
        -------
        int
        """
        # Count only the requests that satisfy the pending condition (previously every request was counted)
        return len([request for request in self._requests
                    if request.storage_status != StorageStatus.READY or request.local_status != LocalStatus.READY])


class DiagnosticOption(object):
    """Class to manage string options for the diagnostic"""

    def __init__(self, name, default_value=None):
        """
        Option constructor

        Parameters
        ----------
        name: str
        default_value: object, optional
            Value used when the option is empty. None means the option is mandatory
        """
        self.name = name
        self.default_value = default_value

    def parse(self, option_value):
        """
        Get the final value for the option

        If option_value is empty, return default_value

        Parameters
        ----------
        option_value: str

        Returns
        -------
        str

        Raises
        ------
        DiagnosticOptionError:
            If the option is empty and default_value is None
        """
        option_value = self._check_default(option_value)
        return option_value

    def _check_default(self, option_value):
        # Replace an empty value by the default, failing if the option has no default
        if option_value == '':
            if self.default_value is None:
                raise DiagnosticOptionError('Option {0} is not optional'.format(self.name))
            else:
                return self.default_value
        return option_value


class DiagnosticFloatOption(DiagnosticOption):
    """Class for parsing float options"""

    def parse(self, option_value):
        """Return the option value (or its default) converted to float"""
        return float(self._check_default(option_value))


class DiagnosticIntOption(DiagnosticOption):
    """Class for parsing integer options, optionally checking them against inclusive limits"""

    def __init__(self, name, default_value=None, min_limit=None, max_limit=None):
        super(DiagnosticIntOption, self).__init__(name, default_value)
        self.min_limit = min_limit
        self.max_limit = max_limit

    def parse(self, option_value):
        """
        Return the option value (or its default) converted to int

        Raises
        ------
        DiagnosticOptionError:
            If the value is outside [min_limit, max_limit]
        """
        value = int(self._check_default(option_value))
        if self.min_limit is not None and value < self.min_limit:
            raise DiagnosticOptionError('Value {0} is lower than minimum ({1})'.format(value, self.min_limit))
        if self.max_limit is not None and value > self.max_limit:
            raise DiagnosticOptionError('Value {0} is higher than maximum ({1})'.format(value, self.max_limit))
        return value


class DiagnosticListIntOption(DiagnosticOption):
    """
    Class for parsing '-'-separated lists of integers

    :param name:
    :type name: str
    :param default_value:
    :type default_value: int|NoneType
    :param min_limit:
    :type min_limit: int|NoneType
    :param max_limit:
    :type max_limit: int|NoneType
    """

    def __init__(self, name, default_value=None, min_limit=None, max_limit=None):
        super(DiagnosticListIntOption, self).__init__(name, default_value)
        # Lower limit
        self.min_limit = min_limit
        # Upper limit
        self.max_limit = max_limit

    def parse(self, option_value):
        """
        Parse a '-'-separated list of integers

        Tuple or list values (e.g. a default) are returned unchanged.

        Raises
        ------
        DiagnosticOptionError:
            If any value is outside [min_limit, max_limit]
        """
        option_value = self._check_default(option_value)
        if isinstance(option_value, tuple) or isinstance(option_value, list):
            return option_value
        values = [int(i) for i in option_value.split('-')]
        for value in values:
            # noinspection PyTypeChecker
            if self.min_limit is not None and value < self.min_limit:
                raise DiagnosticOptionError('Value {0} is lower than minimum ({1})'.format(value, self.min_limit))
            # noinspection PyTypeChecker
            if self.max_limit is not None and value > self.max_limit:
                raise DiagnosticOptionError('Value {0} is higher than maximum ({1})'.format(value, self.max_limit))

        return values


class DiagnosticListFrequenciesOption(DiagnosticOption):
    """Class for parsing '-'-separated lists of frequencies"""

    def __init__(self, name, default_value=None):
        super(DiagnosticListFrequenciesOption, self).__init__(name, default_value)

    def parse(self, option_value):
        """Parse a '-'-separated list of frequencies; tuple or list values are returned unchanged"""
        option_value = self._check_default(option_value)
        if isinstance(option_value, tuple) or isinstance(option_value, list):
            return option_value
        values = [Frequency(i) for i in option_value.split('-')]
        return values


class DiagnosticVariableOption(DiagnosticOption):
    """Class for parsing a variable name, translated through the variable manager when known"""

    def __init__(self, var_manager, name='variable', default_value=None):
        super(DiagnosticVariableOption, self).__init__(name, default_value)
        self.var_manager = var_manager

    def parse(self, option_value):
        """Return the short name of the variable, or the raw value if the variable manager does not know it"""
        option_value = self._check_default(option_value)
        real_name = self.var_manager.get_variable(option_value, False)
        if real_name is None:
            return option_value
        return real_name.short_name


class DiagnosticVariableListOption(DiagnosticOption):
    """Class for parsing ':'-separated lists of variable names"""

    def __init__(self, var_manager, name, default_value=None):
        super(DiagnosticVariableListOption, self).__init__(name, default_value)
        self.var_manager = var_manager

    def parse(self, option_value):
        """Return the list of short names; unknown variables are kept as given"""
        option_value = self._check_default(option_value)
        var_names = []
        for value in option_value.split(':'):
            real_name = self.var_manager.get_variable(value, False)
            if real_name is None:
                var_names.append(value)
            else:
                var_names.append(real_name.short_name)
        return var_names


class DiagnosticDomainOption(DiagnosticOption):
    """Class for parsing modeling realm (domain) options"""

    def __init__(self, name='domain', default_value=None):
        super(DiagnosticDomainOption, self).__init__(name, default_value)

    def parse(self, option_value):
        """Return the modeling realm parsed by ModelingRealms.parse"""
        return ModelingRealms.parse(self._check_default(option_value))


class DiagnosticFrequencyOption(DiagnosticOption):
    """Class for parsing frequency options"""

    def __init__(self, name='frequency', default_value=None):
        super(DiagnosticFrequencyOption, self).__init__(name, default_value)

    def parse(self, option_value):
        """Return the frequency parsed by Frequency.parse"""
        return Frequency.parse(self._check_default(option_value))


class DiagnosticBasinOption(DiagnosticOption):
    """Class for parsing basin options"""

    def parse(self, option_value):
        """
        Return the basin parsed by Basins().parse

        Raises
        ------
        DiagnosticOptionError:
            If the basin is not recognized
        """
        value = self._check_default(option_value)
        basin = Basins().parse(value)
        if basin is None:
            raise DiagnosticOptionError('Basin {0} not recognized'.format(value))
        return basin


class DiagnosticComplexStrOption(DiagnosticOption):
    """Class for parsing strings that encode ',' as '&;' and ' ' as '&.'"""

    def parse(self, option_value):
        """Return the value with '&;' replaced by ',' and '&.' replaced by ' '"""
        return self._check_default(option_value).replace('&;', ',').replace('&.', ' ')


class DiagnosticBoolOption(DiagnosticOption):
    """Class for parsing boolean options"""

    def parse(self, option_value):
        """Return bool values unchanged; strings are True if they are 'true', 't' or 'yes' (case-insensitive)"""
        option_value = self._check_default(option_value)
        if isinstance(option_value, bool):
            return option_value
        else:
            return option_value.lower() in ('true', 't', 'yes')


class DiagnosticChoiceOption(DiagnosticOption):
    """Class for parsing options restricted to a set of choices"""

    def __init__(self, name, choices, default_value=None, ignore_case=True):
        super(DiagnosticChoiceOption, self).__init__(name, default_value)
        self.choices = choices
        self.ignore_case = ignore_case

        # To check if it is valid
        if default_value is not None:
            self.parse(default_value)

    def parse(self, option_value):
        """
        Return the matching choice, as spelled in choices

        Raises
        ------
        DiagnosticOptionError:
            If the value does not match any choice
        """
        option_value = self._check_default(option_value)
        if self.ignore_case:
            option_value = option_value.lower()
            for choice in self.choices:
                if option_value == choice.lower():
                    return choice
        else:
            if option_value in self.choices:
                return option_value
        raise DiagnosticOptionError('Value {1} in option {0} is not a valid choice. '
                                    'Options are {2}'.format(self.name, option_value, self.choices))


class DiagnosticOptionError(Exception):
    """Error raised when a diagnostic option is missing or invalid"""

    pass