diagnostic.py 14.5 KB
Newer Older
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
# coding=utf-8
import datetime
from datafile import StorageStatus, LocalStatus
from earthdiagnostics.constants import Basins, Basin
from earthdiagnostics.frequency import Frequency
from earthdiagnostics.variable_type import VariableType
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.variable import VariableManager
from publisher import Publisher
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
class DiagnosticStatus(object):
    WAITING = 0
    READY = 1
    RUNNING = 2
    COMPLETED = 3
    FAILED = 4

class Diagnostic(Publisher):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    """
    Base class for the diagnostics. Provides a common interface for them and also
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    has a mechanism that allows diagnostic retrieval by name.
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

    :param data_manager: data manager that will be used to store and retrieve the necessary data
    :type data_manager: DataManager
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    """
    alias = None
    """
    Alias to call the diagnostic. Must be overridden at the derived clases
    """
    _diag_list = dict()
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    def __init__(self, data_manager):
        super(Diagnostic, self).__init__()
        self._generated_files = []
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self.data_manager = data_manager
        self._status = DiagnosticStatus.WAITING
        self._requests = []
        self.consumed_time = datetime.timedelta()
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self.subjobs = []
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    def __repr__(self):
        return str(self)

    @property
    def status(self):
        return self._status

    @status.setter
    def status(self, value):
        if self._status == value:
            return
        self._status = value
        if self.status == DiagnosticStatus.RUNNING:
            for generated_file in self._generated_files:
                generated_file.local_status = LocalStatus.COMPUTING
        if self.status in (DiagnosticStatus.FAILED, DiagnosticStatus.COMPLETED):
            self._unsuscribe_requests()
        self.dispatch(self)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        """
        Register a new diagnostic using the given alias. It must be call using the derived class.
        :param cls: diagnostic class to register
        :type cls: Type[Diagnostic]
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        """
        if not issubclass(cls, Diagnostic):
            raise ValueError('Class {0} must be derived from Diagnostic'.format(cls))
        if cls.alias is None:
            raise ValueError('Diagnostic class {0} must have defined an alias'.format(cls))
        Diagnostic._diag_list[cls.alias] = cls
    # noinspection PyProtectedMember
    @staticmethod
    def get_diagnostic(name):
        """
        Return the class for a diagnostic given its name

        :param name: diagnostic alias
        :type name: str
        :return: the selected Diagnostic class, None if name can not be found
        :rtype: Diagnostic
        """
        if name in Diagnostic._diag_list.keys():
            return Diagnostic._diag_list[name]
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        """
        Calculates the diagnostic and stores the output

        Must be implemented by derived classes
        """
        raise NotImplementedError("Class must override compute method")
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    def request_data(self):
        """
        Calculates the diagnostic and stores the output

        Must be implemented by derived classes
        """
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        raise NotImplementedError("Class must override request_data method")
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

    def declare_data_generated(self):
        """
        Calculates the diagnostic and stores the output

        Must be implemented by derived classes
        """
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        raise NotImplementedError("Class must override declare_data_generated method")
    def declare_chunk(self, domain, var, startdate, member, chunk, grid=None, region=None, box=None, frequency=None,
                      vartype=VariableType.MEAN):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        """

        :param domain:
        :type domain: ModelingRealm
        :param var:
        :param startdate:
        :param member:
        :param chunk:
        :param grid:
        :param region:
        :param box:
        :param frequency:
        :type frequency: Frequency
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        :return: datafile object
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        :rtype: earthdiagnostics.datafile.DataFile
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        """
        if isinstance(region, Basin):
            region = region.name
        generated_chunk = self.data_manager.declare_chunk(domain, var, startdate, member, chunk, grid, region, box,
                                                          diagnostic=self, vartype=vartype, frequency=frequency)
        self._generated_files.append(generated_chunk)
        return generated_chunk
    def declare_year(self, domain, var, startdate, member, year, grid=None, box=None,
                     vartype=VariableType.MEAN):
        """

        :param domain:
        :type domain: ModelingRealm
        :param var:
        :param startdate:
        :param member:
        :param grid:
        :param box:
        :param year:
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        :return: datafile object
        :rtype: DataFile
        """
        generated_year = self.data_manager.declare_year(domain, var, startdate, member, year, grid, box,
                                                        diagnostic=self, vartype=vartype)
        self._generated_files.append(generated_year)
        return generated_year

    def generate_jobs(cls, diags, options):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        """
        Generate the instances of the diagnostics that will be run by the manager

        Must be implemented by derived classes.

        :param diags: diagnostics manager
        :type diags: Diags
        :param options: list of strings containing the options passed to the diagnostic
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        :type options: list[str]
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        :return:
        """
        raise NotImplementedError("Class must override generate_jobs class method")

    @classmethod
    def process_options(cls, options, options_available):
        processed = dict()
        options = options[1:]
        if len(options) > len(options_available):
            raise DiagnosticOptionError('You have specified more options than available for diagnostic '
                                        '{0}'.format(cls.alias))
        for x in range(len(options_available)):
            option_definition = options_available[x]
            if len(options) <= x:
                option_value = ''
            else:
                option_value = options[x]
            processed[option_definition.name] = option_definition.parse(option_value)
        return processed

Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    def __str__(self):
        """
        Must be implemented by derived classes
        :return:
        """
        return 'Developer must override base class __str__ method'
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    def request_chunk(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                      to_modify=False, vartype=VariableType.MEAN):
        request = self.data_manager.request_chunk(domain, var, startdate, member, chunk, grid, box, frequency, vartype)
        if to_modify:
            request.add_modifier(self)
        self._requests.append(request)
        request.subscribe(self, self._updated_request)
        return request

    def request_year(self, domain, var, startdate, member, year, grid=None, box=None, frequency=None, to_modify=False):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        request = self.data_manager.request_year(self, domain, var, startdate, member, year, grid, box, frequency)
        if to_modify:
            request.add_modifier(self)
        self._requests.append(request)
        request.subscribe(self, self._updated_request)
        return request

    def _updated_request(self, request):
        if self.status != DiagnosticStatus.WAITING:
            return
        from datafile import LocalStatus
        if request.local_status == LocalStatus.FAILED:
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            self.message = 'Required file {0} is not available'.format(request.remote_file)
            self.status = DiagnosticStatus.FAILED
            return

        if request.local_status == LocalStatus.READY:
            self.check_is_ready()

    def check_is_ready(self):
        if all([request.ready_to_run(self) for request in self._requests]):
            self.status = DiagnosticStatus.READY
    def _unsuscribe_requests(self):
        for request in self._requests:
            request.unsubscribe(self)
    def all_requests_in_storage(self):
        return not any(request.storage_status != StorageStatus.READY for request in self._requests)


class DiagnosticOption(object):

    def __init__(self, name, default_value=None):
        self.name = name
        self.default_value = default_value

    def parse(self, option_value):
        option_value = self.check_default(option_value)
        return option_value

    def check_default(self, option_value):
        if option_value == '':
            if self.default_value is None:
                raise DiagnosticOptionError('Option {0} is not optional'.format(self.name))
            else:
                return self.default_value
        return option_value


class DiagnosticFloatOption(DiagnosticOption):
    def parse(self, option_value):
        return float(self.check_default(option_value))


class DiagnosticIntOption(DiagnosticOption):

    def __init__(self, name, default_value=None, min_limit=None, max_limit=None):
        super(DiagnosticIntOption, self).__init__(name, default_value)
        self.min_limit = min_limit
        self.max_limit = max_limit

    def parse(self, option_value):
        value = int(self.check_default(option_value))
        if self.min_limit is not None and value < self.min_limit:
            raise DiagnosticOptionError('Value {0} is lower than minimum ({1})'.format(value, self.min_limit))
        if self.max_limit is not None and value > self.max_limit:
            raise DiagnosticOptionError('Value {0} is higher than maximum ({1})'.format(value, self.max_limit))
        return value


class DiagnosticListIntOption(DiagnosticOption):
    """
    :param name:
    :type name: str
    :param default_value:
    :type default_value: int|NoneType
    :param min_limit:
    :type min_limit: int|NoneType
    :param max_limit:
    :type max_limit: int|NoneType
    """
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    def __init__(self, name, default_value=None, min_limit=None, max_limit=None):
        super(DiagnosticListIntOption, self).__init__(name, default_value)
        self.min_limit = min_limit
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        """ Lower limit """
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self.max_limit = max_limit
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        """ Upper limit """
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

    def parse(self, option_value):
        option_value = self.check_default(option_value)
        if isinstance(option_value, tuple) or isinstance(option_value, list):
            return option_value
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        values = [int(i) for i in option_value.split('-')]
        for value in values:
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            # noinspection PyTypeChecker
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            if self.min_limit is not None and value < self.min_limit:
                raise DiagnosticOptionError('Value {0} is lower than minimum ({1})'.format(value, self.min_limit))
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            # noinspection PyTypeChecker
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            if self.max_limit is not None and value > self.max_limit:
                raise DiagnosticOptionError('Value {0} is higher than maximum ({1})'.format(value, self.max_limit))

        return values
class DiagnosticListFrequenciesOption(DiagnosticOption):

    def __init__(self, name, default_value=None):
        super(DiagnosticListFrequenciesOption, self).__init__(name, default_value)

    def parse(self, option_value):
        option_value = self.check_default(option_value)
        if isinstance(option_value, tuple) or isinstance(option_value, list):
            return option_value
        values = [Frequency(i) for i in option_value.split('-')]
        return values


class DiagnosticVariableOption(DiagnosticOption):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    def __init__(self, name='variable', default_value=None):
        super(DiagnosticVariableOption, self).__init__(name, default_value)

    def parse(self, option_value):
        option_value = self.check_default(option_value)
        real_name = VariableManager().get_variable(option_value, False)
        if real_name is None:
            return option_value
        return real_name.short_name
class DiagnosticVariableListOption(DiagnosticOption):
    def parse(self, option_value):
        option_value = self.check_default(option_value)
        var_names = []
        for value in option_value.split('-'):
            real_name = VariableManager().get_variable(value, False)
            if real_name is None:
                var_names.append(value)
            else:
                var_names.append(real_name.short_name)
class DiagnosticDomainOption(DiagnosticOption):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    def __init__(self, name='domain', default_value=None):
        super(DiagnosticDomainOption, self).__init__(name, default_value)

    def parse(self, option_value):
        return ModelingRealms.parse(self.check_default(option_value))
class DiagnosticFrequencyOption(DiagnosticOption):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    def __init__(self, name='frequency', default_value=None):
        super(DiagnosticFrequencyOption, self).__init__(name, default_value)

    def parse(self, option_value):
        return Frequency.parse(self.check_default(option_value))


class DiagnosticBasinOption(DiagnosticOption):
    def parse(self, option_value):
        return Basins().parse(self.check_default(option_value))
class DiagnosticComplexStrOption(DiagnosticOption):
    def parse(self, option_value):
        return self.check_default(option_value).replace('&;', ',').replace('&.', ' ')


class DiagnosticBoolOption(DiagnosticOption):
    def parse(self, option_value):
        option_value = self.check_default(option_value)
        if isinstance(option_value, bool):
            return option_value
        else:
            return option_value.lower() in ('true', 't', 'yes')


class DiagnosticChoiceOption(DiagnosticOption):
    def __init__(self, name, choices, default_value=None, ignore_case=True):
        super(DiagnosticChoiceOption, self).__init__(name, default_value)
        self.choices = choices
        self.ignore_case = ignore_case

Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        # To check if it is valid
        if default_value is not None:
            self.parse(default_value)

    def parse(self, value):
        value = self.check_default(value)
        if self.ignore_case:
            value = value.lower()
            for choice in self.choices:
                if value == choice.lower():
                    return choice
        else:
            if value in self.choices:
                return value
        raise DiagnosticOptionError('Value {1} in option {0} is not a valid choice. '
                                    'Options are {2}'.format(self.name, value, self.choices))


class DiagnosticOptionError(Exception):
    pass