# coding=utf-8
import datetime

from datafile import StorageStatus, LocalStatus
from earthdiagnostics.constants import Basins, Basin
from earthdiagnostics.frequency import Frequency
from earthdiagnostics.variable_type import VariableType
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.variable import VariableManager
from publisher import Publisher


class DiagnosticStatus(object):
    """
    Possible states of a diagnostic, stored in Diagnostic.status
    """

    WAITING = 0
    READY = 1
    RUNNING = 2
    COMPLETED = 3
    FAILED = 4


class Diagnostic(Publisher):
    """
    Base class for the diagnostics. Provides a common interface for them and also
    has a mechanism that allows diagnostic retrieval by name.

    :param data_manager: data manager that will be used to store and retrieve the necessary data
    :type data_manager: DataManager
    """

    alias = None
    """
    Alias to call the diagnostic. Must be overridden at the derived classes
    """
    _diag_list = dict()

    def __init__(self, data_manager):
        super(Diagnostic, self).__init__()
        self._generated_files = []
        self.data_manager = data_manager
        self._status = DiagnosticStatus.WAITING
        self._requests = []
        self.consumed_time = datetime.timedelta()
        self.subjobs = []

    def __repr__(self):
        return str(self)

    @property
    def status(self):
        """
        Current status of the diagnostic, one of the DiagnosticStatus values
        """
        return self._status

    @status.setter
    def status(self, value):
        # Setting the same status again is a no-op: nothing is dispatched
        if self._status == value:
            return
        self._status = value
        if self.status == DiagnosticStatus.RUNNING:
            # Flag every declared output as being computed
            for generated_file in self._generated_files:
                generated_file.local_status = LocalStatus.COMPUTING
        if self.status in (DiagnosticStatus.FAILED, DiagnosticStatus.COMPLETED):
            # Final states: stop listening to the requested files
            self._unsuscribe_requests()
        self.dispatch(self)

    @staticmethod
    def register(cls):
        """
        Register a new diagnostic using the given alias. It must be called using the derived class.

        :param cls: diagnostic class to register
        :type cls: Type[Diagnostic]
        :raises ValueError: if cls does not derive from Diagnostic or has no alias defined
        """
        if not issubclass(cls, Diagnostic):
            raise ValueError('Class {0} must be derived from Diagnostic'.format(cls))
        if cls.alias is None:
            raise ValueError('Diagnostic class {0} must have defined an alias'.format(cls))
        Diagnostic._diag_list[cls.alias] = cls

    # noinspection PyProtectedMember
    @staticmethod
    def get_diagnostic(name):
        """
        Return the class for a diagnostic given its name

        :param name: diagnostic alias
        :type name: str
        :return: the selected Diagnostic class, None if name can not be found
        :rtype: Diagnostic
        """
        return Diagnostic._diag_list.get(name)

    def compute(self):
        """
        Calculates the diagnostic and stores the output

        Must be implemented by derived classes
        """
        raise NotImplementedError("Class must override compute method")

    def request_data(self):
        """
        Request the data that the diagnostic needs

        Must be implemented by derived classes
        """
        raise NotImplementedError("Class must override request_data method")

    def declare_data_generated(self):
        """
        Declare the data that the diagnostic will generate

        Must be implemented by derived classes
        """
        raise NotImplementedError("Class must override declare_data_generated method")

    def declare_chunk(self, domain, var, startdate, member, chunk, grid=None, region=None, box=None, frequency=None,
                      vartype=VariableType.MEAN):
        """
        Declare a chunk file as generated by this diagnostic and keep track of it

        :param domain:
        :type domain: ModelingRealm
        :param var:
        :param startdate:
        :param member:
        :param chunk:
        :param grid:
        :param region: if a Basin is given, its name is passed to the data manager
        :param box:
        :param frequency:
        :type frequency: Frequency
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        :return: datafile object
        :rtype: earthdiagnostics.datafile.DataFile
        """
        if isinstance(region, Basin):
            region = region.name
        generated_chunk = self.data_manager.declare_chunk(domain, var, startdate, member, chunk, grid, region, box,
                                                          diagnostic=self, vartype=vartype, frequency=frequency)
        self._generated_files.append(generated_chunk)
        return generated_chunk

    def declare_year(self, domain, var, startdate, member, year, grid=None, box=None,
                     vartype=VariableType.MEAN):
        """
        Declare a yearly file as generated by this diagnostic and keep track of it

        :param domain:
        :type domain: ModelingRealm
        :param var:
        :param startdate:
        :param member:
        :param grid:
        :param box:
        :param year:
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        :return: datafile object
        :rtype: DataFile
        """
        generated_year = self.data_manager.declare_year(domain, var, startdate, member, year, grid, box,
                                                        diagnostic=self, vartype=vartype)
        self._generated_files.append(generated_year)
        return generated_year

    @classmethod
    def generate_jobs(cls, diags, options):
        """
        Generate the instances of the diagnostics that will be run by the manager

        Must be implemented by derived classes.

        :param diags: diagnostics manager
        :type diags: Diags
        :param options: list of strings containing the options passed to the diagnostic
        :type options: list[str]
        :return:
        """
        raise NotImplementedError("Class must override generate_jobs class method")

    @classmethod
    def process_options(cls, options, options_available):
        """
        Parse the options passed to the diagnostic using the given definitions

        The first element of options is discarded. The remaining values are matched by
        position with options_available; definitions without a matching value are parsed
        from an empty string, so each option applies its default or rejects the omission.

        :param options: list of strings containing the options passed to the diagnostic
        :type options: list[str]
        :param options_available: option definitions, in the order the values are expected
        :type options_available: list[DiagnosticOption]
        :return: parsed values indexed by option name
        :rtype: dict
        :raises DiagnosticOptionError: if more values than definitions are given
        """
        processed = dict()
        options = options[1:]
        if len(options) > len(options_available):
            raise DiagnosticOptionError('You have specified more options than available for diagnostic '
                                        '{0}'.format(cls.alias))
        for position, option_definition in enumerate(options_available):
            if position < len(options):
                option_value = options[position]
            else:
                option_value = ''
            processed[option_definition.name] = option_definition.parse(option_value)
        return processed

    def __str__(self):
        """
        Must be implemented by derived classes

        :return:
        """
        return 'Developer must override base class __str__ method'

    def request_chunk(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
                      to_modify=False, vartype=VariableType.MEAN):
        """
        Request a chunk file to the data manager and subscribe to its updates

        :param domain:
        :param var:
        :param startdate:
        :param member:
        :param chunk:
        :param grid:
        :param box:
        :param frequency:
        :param to_modify: if True, this diagnostic is added to the request as a modifier
        :type to_modify: bool
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        :return: the request returned by the data manager
        """
        request = self.data_manager.request_chunk(domain, var, startdate, member, chunk, grid, box, frequency,
                                                  vartype)
        if to_modify:
            request.add_modifier(self)
        self._requests.append(request)
        request.subscribe(self, self._updated_request)
        return request

    def request_year(self, domain, var, startdate, member, year, grid=None, box=None, frequency=None,
                     to_modify=False):
        """
        Request a yearly file to the data manager and subscribe to its updates

        :param domain:
        :param var:
        :param startdate:
        :param member:
        :param year:
        :param grid:
        :param box:
        :param frequency:
        :param to_modify: if True, this diagnostic is added to the request as a modifier
        :type to_modify: bool
        :return: the request returned by the data manager
        """
        # Unlike request_chunk, the data manager call receives this diagnostic as first argument
        request = self.data_manager.request_year(self, domain, var, startdate, member, year, grid, box, frequency)
        if to_modify:
            request.add_modifier(self)
        self._requests.append(request)
        request.subscribe(self, self._updated_request)
        return request

    def _updated_request(self, request):
        """
        Callback subscribed to every requested file

        Only acts while the diagnostic is WAITING: a failed request fails the diagnostic and
        a ready one triggers a readiness check.

        :param request: request that changed
        """
        if self.status != DiagnosticStatus.WAITING:
            return
        if request.local_status == LocalStatus.FAILED:
            self.message = 'Required file {0} is not available'.format(request.remote_file)
            self.status = DiagnosticStatus.FAILED
            return

        if request.local_status == LocalStatus.READY:
            self.check_is_ready()

    def check_is_ready(self):
        """
        Set the status to READY if every request reports being ready to run for this diagnostic
        """
        # The list is built on purpose so ready_to_run is called for every request
        if all([request.ready_to_run(self) for request in self._requests]):
            self.status = DiagnosticStatus.READY

    def _unsuscribe_requests(self):
        """
        Unsubscribe this diagnostic from all its requests
        """
        for request in self._requests:
            request.unsubscribe(self)

    def all_requests_in_storage(self):
        """
        Check the storage status of the requests

        :return: True if no request has a storage status different from StorageStatus.READY
        :rtype: bool
        """
        return not any(request.storage_status != StorageStatus.READY for request in self._requests)


class DiagnosticOption(object):
    """
    Base definition of a diagnostic option. Values are returned without conversion.

    :param name: option name, used as key by Diagnostic.process_options
    :type name: str
    :param default_value: value to use when the option is not given. If None, the option is mandatory
    """

    def __init__(self, name, default_value=None):
        self.name = name
        self.default_value = default_value

    def parse(self, option_value):
        """
        Return the option value, replaced by the default if it is empty

        :param option_value: value given by the user, '' if omitted
        :type option_value: str
        :raises DiagnosticOptionError: if the value is empty and there is no default
        """
        option_value = self.check_default(option_value)
        return option_value

    def check_default(self, option_value):
        """
        Replace an empty value by the default

        :param option_value: value given by the user, '' if omitted
        :type option_value: str
        :return: option_value if it is not empty, the default value otherwise
        :raises DiagnosticOptionError: if the value is empty and there is no default
        """
        if option_value == '':
            if self.default_value is None:
                raise DiagnosticOptionError('Option {0} is not optional'.format(self.name))
            else:
                return self.default_value
        return option_value


class DiagnosticFloatOption(DiagnosticOption):
    """
    Option whose value is converted to float
    """

    def parse(self, option_value):
        return float(self.check_default(option_value))


class DiagnosticIntOption(DiagnosticOption):
    """
    Option whose value is converted to int and checked against optional limits

    :param name:
    :type name: str
    :param default_value:
    :type default_value: int|NoneType
    :param min_limit: lowest accepted value, None to disable the check
    :type min_limit: int|NoneType
    :param max_limit: highest accepted value, None to disable the check
    :type max_limit: int|NoneType
    """

    def __init__(self, name, default_value=None, min_limit=None, max_limit=None):
        super(DiagnosticIntOption, self).__init__(name, default_value)
        self.min_limit = min_limit
        self.max_limit = max_limit

    def parse(self, option_value):
        """
        :raises DiagnosticOptionError: if the value is out of the configured limits
        """
        value = int(self.check_default(option_value))
        if self.min_limit is not None and value < self.min_limit:
            raise DiagnosticOptionError('Value {0} is lower than minimum ({1})'.format(value, self.min_limit))
        if self.max_limit is not None and value > self.max_limit:
            raise DiagnosticOptionError('Value {0} is higher than maximum ({1})'.format(value, self.max_limit))
        return value


class DiagnosticListIntOption(DiagnosticOption):
    """
    Option containing a list of integers separated by '-'

    :param name:
    :type name: str
    :param default_value:
    :type default_value: int|NoneType
    :param min_limit:
    :type min_limit: int|NoneType
    :param max_limit:
    :type max_limit: int|NoneType
    """

    def __init__(self, name, default_value=None, min_limit=None, max_limit=None):
        super(DiagnosticListIntOption, self).__init__(name, default_value)
        self.min_limit = min_limit
        """ Lower limit """
        self.max_limit = max_limit
        """ Upper limit """

    def parse(self, option_value):
        """
        :raises DiagnosticOptionError: if any value is out of the configured limits
        """
        option_value = self.check_default(option_value)
        # Tuples and lists are returned as they are, without checking the limits
        if isinstance(option_value, (tuple, list)):
            return option_value
        values = [int(i) for i in option_value.split('-')]
        for value in values:
            # noinspection PyTypeChecker
            if self.min_limit is not None and value < self.min_limit:
                raise DiagnosticOptionError('Value {0} is lower than minimum ({1})'.format(value, self.min_limit))
            # noinspection PyTypeChecker
            if self.max_limit is not None and value > self.max_limit:
                raise DiagnosticOptionError('Value {0} is higher than maximum ({1})'.format(value, self.max_limit))

        return values


class DiagnosticListFrequenciesOption(DiagnosticOption):
    """
    Option containing a list of frequencies separated by '-'
    """

    def __init__(self, name, default_value=None):
        super(DiagnosticListFrequenciesOption, self).__init__(name, default_value)

    def parse(self, option_value):
        option_value = self.check_default(option_value)
        # Tuples and lists are returned as they are
        if isinstance(option_value, (tuple, list)):
            return option_value
        values = [Frequency(i) for i in option_value.split('-')]
        return values


class DiagnosticVariableOption(DiagnosticOption):
    """
    Option containing a variable name or alias
    """

    def __init__(self, name='variable', default_value=None):
        super(DiagnosticVariableOption, self).__init__(name, default_value)

    def parse(self, option_value):
        """
        :return: short name of the variable found by the VariableManager, or the value itself if none is found
        """
        option_value = self.check_default(option_value)
        real_name = VariableManager().get_variable(option_value, False)
        if real_name is None:
            return option_value
        return real_name.short_name


class DiagnosticVariableListOption(DiagnosticOption):
    """
    Option containing a list of variable names or aliases separated by '-'
    """

    def parse(self, option_value):
        """
        :return: one name per variable: the short name found by the VariableManager, or the
                 given value if none is found
        :rtype: list[str]
        """
        option_value = self.check_default(option_value)
        # A tuple or list is taken as already split, so sequence defaults work as in
        # the other list options instead of failing on split
        if isinstance(option_value, (tuple, list)):
            candidates = option_value
        else:
            candidates = option_value.split('-')
        var_names = []
        for value in candidates:
            real_name = VariableManager().get_variable(value, False)
            if real_name is None:
                var_names.append(value)
            else:
                var_names.append(real_name.short_name)
        return var_names


class DiagnosticDomainOption(DiagnosticOption):
    """
    Option whose value is parsed with ModelingRealms.parse
    """

    def __init__(self, name='domain', default_value=None):
        super(DiagnosticDomainOption, self).__init__(name, default_value)

    def parse(self, option_value):
        return ModelingRealms.parse(self.check_default(option_value))


class DiagnosticFrequencyOption(DiagnosticOption):
    """
    Option whose value is parsed with Frequency.parse
    """

    def __init__(self, name='frequency', default_value=None):
        super(DiagnosticFrequencyOption, self).__init__(name, default_value)

    def parse(self, option_value):
        return Frequency.parse(self.check_default(option_value))


class DiagnosticBasinOption(DiagnosticOption):
    """
    Option whose value is parsed with Basins().parse
    """

    def parse(self, option_value):
        return Basins().parse(self.check_default(option_value))


class DiagnosticComplexStrOption(DiagnosticOption):
    """
    String option with escape sequences: '&;' is replaced by ',' and '&.' by ' '
    """

    def parse(self, option_value):
        return self.check_default(option_value).replace('&;', ',').replace('&.', ' ')


class DiagnosticBoolOption(DiagnosticOption):
    """
    Boolean option. 'true', 't' and 'yes' (in any case) are True, any other string is False
    """

    def parse(self, option_value):
        option_value = self.check_default(option_value)
        # A boolean default needs no conversion
        if isinstance(option_value, bool):
            return option_value
        else:
            return option_value.lower() in ('true', 't', 'yes')


class DiagnosticChoiceOption(DiagnosticOption):
    """
    Option whose value must be one of a given set of choices

    :param name:
    :type name: str
    :param choices: valid values
    :param default_value: must be a valid choice if given
    :param ignore_case: if True, values are compared in lowercase
    :type ignore_case: bool
    :raises DiagnosticOptionError: if the default value is not a valid choice
    """

    def __init__(self, name, choices, default_value=None, ignore_case=True):
        super(DiagnosticChoiceOption, self).__init__(name, default_value)
        self.choices = choices
        self.ignore_case = ignore_case
        # To check if it is valid
        if default_value is not None:
            self.parse(default_value)

    def parse(self, value):
        """
        :return: the matching choice. When ignoring case, it is returned as written in choices
        :raises DiagnosticOptionError: if the value does not match any choice
        """
        value = self.check_default(value)
        if self.ignore_case:
            value = value.lower()
            for choice in self.choices:
                if value == choice.lower():
                    return choice
        else:
            if value in self.choices:
                return value
        raise DiagnosticOptionError('Value {1} in option {0} is not a valid choice. '
                                    'Options are {2}'.format(self.name, value, self.choices))


class DiagnosticOptionError(Exception):
    """
    Error raised when a diagnostic option is missing or has an invalid value
    """
    pass