config.py 21.9 KB
Newer Older
"""Classes to manage Earth Diagnostics configuration"""
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
import six
from bscearth.utils.config_parser import ConfigParser
from bscearth.utils.date import parse_date, chunk_start_date, chunk_end_date, date2str, add_years, add_months, add_days
from bscearth.utils.log import Log
from earthdiagnostics import cdftools
from earthdiagnostics.frequency import Frequency, Frequencies
from earthdiagnostics.modelingrealm import ModelingRealm
from earthdiagnostics.variable import VariableManager
class ConfigException(Exception):
    """Exception raised when there is a problem with the configuration"""
class Config(object):
    """
    Class to read and manage the configuration

    Parameters
    ----------
    path: str
        Path to the configuration file
        parser = ConfigParser()
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self.data_adaptor = parser.get_choice_option('DIAGNOSTICS', 'DATA_ADAPTOR', ('CMOR', 'THREDDS', 'OBSRECON'),
                                                     'CMOR')
        "Scratch folder path"
        self.scratch_dir = parser.get_path_option('DIAGNOSTICS', 'SCRATCH_DIR')
        self.use_ramdisk = parser.get_bool_option('DIAGNOSTICS', 'USE_RAMDISK', False)
        "If True, the scratch dir is created as a ram disk"
        self.auto_clean = parser.get_bool_option('DIAGNOSTICS', 'AUTO_CLEAN', True)
        "If True, the scratch dir is removed after finishing"
        if not self.auto_clean and self.use_ramdisk:
            Log.warning('RAM disk scratch dir is always automatically cleaned.')
            self.auto_clean = True

Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self.scratch_masks = parser.get_path_option('DIAGNOSTICS', 'SCRATCH_MASKS', '/scratch/Earth/ocean_masks')
        "Common scratch folder for masks"
        self.data_dir = parser.get_path_option('DIAGNOSTICS', 'DATA_DIR')
        "Root data folder path"
        self.data_type = parser.get_choice_option('DIAGNOSTICS', 'DATA_TYPE', ('exp', 'obs', 'recon'), 'exp')
        "Data type (experiment, observation or reconstruction)"
        self.con_files = parser.get_path_option('DIAGNOSTICS', 'CON_FILES')
        "Mask and meshes folder path"
        self.mesh_mask = parser.get_path_option('DIAGNOSTICS', 'MESH_MASK', '')
        "Custom mesh mask file to use"
        self.new_mask_glo = parser.get_path_option('DIAGNOSTICS', 'NEW_MASK_GLO', '')
        "Custom new mask glo file to use"
        self.mask_regions = parser.get_path_option('DIAGNOSTICS', 'MASK_REGIONS', '')
        "Custom mask regions file to use"
        self.mask_regions_3d = parser.get_path_option('DIAGNOSTICS', 'MASK_REGIONS_3D', '')
        "Custom mask regions 3D file to use"

        self.data_convention = parser.get_choice_option('DIAGNOSTICS', 'DATA_CONVENTION',
                                                        ('specs', 'primavera', 'cmip6', 'preface', 'meteofrance'),
                                                        'specs',
                                                        ignore_case=True)

        if self.data_convention in ('primavera', 'cmip6'):
            self.scratch_masks = os.path.join(self.scratch_masks, self.data_convention)
        cdftools.data_convention = self.data_convention
        namelist_file = os.path.join(os.path.dirname(__file__), 'CDFTOOLS_{0}.namlist'.format(self.data_convention))
        Log.debug(namelist_file)
        if os.path.isfile(namelist_file):
            Log.debug('Setting namelist {0}', namelist_file)
            os.environ['NAM_CDF_NAMES'] = namelist_file

        self.var_manager = VariableManager()
        self.var_manager.load_variables(self.data_convention)
        self._diags = parser.get_option('DIAGNOSTICS', 'DIAGS')
        self.skip_diags_done = parser.get_bool_option('DIAGNOSTICS', 'SKIP_DIAGS_DONE', True)
        self.frequency = Frequency(parser.get_option('DIAGNOSTICS', 'FREQUENCY'))
        "Default data frequency to be used by the diagnostics"
        self.cdftools_path = parser.get_path_option('DIAGNOSTICS', 'CDFTOOLS_PATH', '')
        "Path to CDFTOOLS executables"
        self.max_cores = parser.get_int_option('DIAGNOSTICS', 'MAX_CORES', 0)
        "Maximum number of cores to use"
        self.parallel_downloads = parser.get_int_option('DIAGNOSTICS', 'PARALLEL_DOWNLOADS', 1)
        "Maximum number of simultaneous downloads"
        self.parallel_uploads = parser.get_int_option('DIAGNOSTICS', 'PARALLEL_UPLOADS', 1)
        "Maximum number of simultaneous uploads"
        self.restore_meshes = parser.get_bool_option('DIAGNOSTICS', 'RESTORE_MESHES', False)
        "If True, forces the tool to copy all the mesh and mask files for the model, regardless of existence"
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self.experiment = ExperimentConfig()
        self.experiment.parse_ini(parser)
        """
        Configuration related to the experiment

        :rtype: ExperimentConfig
        """
        # Read aliases
        self._aliases = dict()
        if parser.has_section('ALIAS'):
            for option in parser.options('ALIAS'):
                self._aliases[option.lower()] = parser.get_list_option('ALIAS', option)
        commands = self._diags.split()
            command = command.strip()
            if command.startswith('#'):
                break
            if command.lower() in self._aliases:
                added_commands = self._aliases[command.lower()]
                Log.info('Changing alias {0} for {1}', command, ' '.join(added_commands))
                for add_command in added_commands:
                    self._real_commands.append(add_command)
            else:
                self._real_commands.append(command)
        Log.debug('Command list ready ')

        self.scratch_dir = os.path.join(self.scratch_dir, 'diags', self.experiment.expid)

        self.cmor = CMORConfig(parser, self.var_manager)
        self.thredds = THREDDSConfig(parser)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self.report = ReportConfig(parser)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        Return the list of commands after replacing the alias

        Returns
        -------
        iterable of str
    """
    Configuration for the cmorization processes

    Parameters
    ----------
    parser: ConfigParser
    var_manager: VariableManager
    """
    def __init__(self, parser, var_manager):
        self.force = parser.get_bool_option('CMOR', 'FORCE', False)
        self.force_untar = parser.get_bool_option('CMOR', 'FORCE_UNTAR', False)
        self.filter_files = parser.get_option('CMOR', 'FILTER_FILES', '')
        self.ocean = parser.get_bool_option('CMOR', 'OCEAN_FILES', True)
        self.atmosphere = parser.get_bool_option('CMOR', 'ATMOSPHERE_FILES', True)
        self.use_grib = parser.get_bool_option('CMOR', 'USE_GRIB', True)
        self._chunks = parser.get_int_list_option('CMOR', 'CHUNKS')
        self.associated_experiment = parser.get_option('CMOR', 'ASSOCIATED_EXPERIMENT', 'to be filled')
        self.associated_model = parser.get_option('CMOR', 'ASSOCIATED_MODEL', 'to be filled')
        self.initialization_description = parser.get_option('CMOR', 'INITIALIZATION_DESCRIPTION', 'to be filled')
        self.initialization_method = parser.get_option('CMOR', 'INITIALIZATION_METHOD', '1')
        self.initialization_number = parser.get_int_option('CMOR', 'INITIALIZATION_NUMBER', 1)
        self.physics_description = parser.get_option('CMOR', 'PHYSICS_DESCRIPTION', 'to be filled')
        self.physics_version = parser.get_option('CMOR', 'PHYSICS_VERSION', '1')
        self.source = parser.get_option('CMOR', 'SOURCE', 'to be filled')
        self.version = parser.get_option('CMOR', 'VERSION', '')
        self.default_ocean_grid = parser.get_option('CMOR', 'DEFAULT_OCEAN_GRID', 'gn')
        self.default_atmos_grid = parser.get_option('CMOR', 'DEFAULT_ATMOS_GRID', 'gr')
        self.activity = parser.get_option('CMOR', 'ACTIVITY', 'CMIP')
        self.min_cmorized_vars = parser.get_int_option('CMOR', 'MIN_CMORIZED_VARS', 10)
        vars_string = parser.get_option('CMOR', 'VARIABLE_LIST', '')
        if vars_string:
            self._variable_list = list()
            for domain_var in vars_string.split(' '):
                if domain_var.startswith('#'):
                    break
                splitted = domain_var.split(':')
                cmor_var = self.var_manager.get_variable(splitted[1], silent=True)
                if not cmor_var:
                    Log.warning('Variable {0} not recognized. It will not be cmorized', domain_var)
                    continue
                if ModelingRealm(splitted[0]) != cmor_var.domain:
                    Log.warning('Domain {0} for variable {1} is not correct: is {2}', splitted[0], cmor_var.short_name,
                                cmor_var.domain)
                self._variable_list.append('{0.domain}:{0.short_name}'.format(cmor_var))
            if len(self._variable_list) == 0:
                raise ConfigException('Variable list value is specified, but no variables were found')
        else:
            self._variable_list = None

        self._var_hourly = CMORConfig._parse_variables(parser.get_option('CMOR', 'ATMOS_HOURLY_VARS', ''))
        self._var_daily = CMORConfig._parse_variables(parser.get_option('CMOR', 'ATMOS_DAILY_VARS', ''))
        self._var_monthly = CMORConfig._parse_variables(parser.get_option('CMOR', 'ATMOS_MONTHLY_VARS', ''))
    def cmorize(self, var_cmor):
        """
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        Check if var_cmor is on variable list

        Parameters
        ----------
        var_cmor: Variable

        """
        if self._variable_list is None:
            return True
        return '{0}:{1}'.format(var_cmor.domain, var_cmor.short_name) in self._variable_list
    def any_required(self, variables):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        """
        Check if any of the given variables is needed for cmorization

        Parameters
        ----------
        variables: iterable of str

        Returns
        -------
        bool

        """
        if self._variable_list is None:
            return True
        for var in variables:
            if self.cmorize(self.var_manager.get_variable(var, silent=True)):
    def chunk_cmorization_requested(self, chunk):
        """
        Check if the cmorization of a given chunk is required

        Parameters
        ----------
        chunk: int

        Returns
        -------
        bool

        """
        if len(self._chunks) == 0:
            return True
        return chunk in self._chunks

    @staticmethod
    def _parse_variables(raw_string):
        variables = dict()
        if raw_string:
            splitted = raw_string.split(',')
            for var_section in splitted:
                splitted_var = var_section.split(':')
                if len(splitted_var) == 1:
                    levels = None
                else:
                    levels = ','.join(map(str, CMORConfig._parse_levels(splitted_var[1:])))
                variables[int(splitted_var[0])] = levels
        return variables

    @staticmethod
    def _parse_levels(levels_splitted):
        if len(levels_splitted) == 1:
            return map(int, levels_splitted[0].split('-'))
        start = int(levels_splitted[0])
        end = int(levels_splitted[1])
        if len(levels_splitted) == 3:
            step = int(levels_splitted[2])
        else:
            step = 1
        return range(start, end, step)

    def get_variables(self, frequency):
        """
        Get the variables to get from the grib file for a given frequency

        Parameters
        ----------
        frequency: Frequency

        Returns
        -------
        str

        Raises
        ------
        ValueError
            If the frequency passed is not supported

        """
        if frequency in (Frequencies.three_hourly, Frequencies.six_hourly):
            return self._var_hourly
        elif frequency == Frequencies.daily:
        elif frequency == Frequencies.monthly:
            return self._var_monthly
        raise ValueError('Frequency not recognized: {0}'.format(frequency))
    def get_requested_codes(self):
        """
        Get all the codes to be extracted from the grib files

        Returns
        -------
        set of int

        """
        return set(list(self._var_hourly.keys()) + list(self._var_daily.keys()) + list(self._var_monthly.keys()))

    def get_levels(self, frequency, variable):
        """
        Get the levels to extract for a given variable

        Parameters
        ----------
        frequency: Frequency
        variable: str

        Returns
        -------
        iterable of int

        """
        return self.get_variables(frequency)[variable]

    """
    Configuration related to the THREDDS server

    Parameters
    ----------
    parser: ConfigParser

    """

    def __init__(self, parser):
        self.server_url = parser.get_option('THREDDS', 'SERVER_URL', '')


    Configuration related to the experiment
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    """
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    def parse_ini(self, parser):
        """
        Parse experiment section from INI-like file
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        Parameters
        ----------
        parser: ConfigParser
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        """
        self.institute = parser.get_option('EXPERIMENT', 'INSTITUTE')
        self.expid = parser.get_option('EXPERIMENT', 'EXPID')
        self.experiment_name = parser.get_option('EXPERIMENT', 'NAME', self.expid)
        self.members = parser.get_list_option('EXPERIMENT', 'MEMBERS')
        self.member_digits = parser.get_int_option('EXPERIMENT', 'MEMBER_DIGITS', 1)
        self.member_prefix = parser.get_option('EXPERIMENT', 'MEMBER_PREFIX', 'fc')
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self.member_count_start = parser.get_int_option('EXPERIMENT', 'MEMBER_COUNT_START', 0)

        members = []
        for mem in self.members:
            if '-' in mem:
                start, end = mem.split('-')
                if start.startswith(self.member_prefix):
                    start = start[len(self.member_prefix):]
                if end.startswith(self.member_prefix):
                    end = end[len(self.member_prefix):]
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                for member in range(int(start), int(end) + 1):
                    members.append(member)
            else:
                if mem.startswith(self.member_prefix):
                    mem = mem[len(self.member_prefix):]
                members.append(int(mem))
        self.members = members
        self.calendar = parser.get_option('EXPERIMENT', 'CALENDAR', 'standard')
        startdates = parser.get_list_option('EXPERIMENT', 'STARTDATES')

        import exrex
        self.startdates = []
        for startdate_pattern in startdates:
            startdate_pattern = startdate_pattern.strip()
            if not startdate_pattern:
                continue
            if startdate_pattern[0] == '{' and startdate_pattern[-1] == '}':
                self._read_startdates(startdate_pattern[1:-1])
            else:
                for startdate in exrex.generate(startdate_pattern):
                    startdate = startdate.strip()
                    if startdate:
                        self.startdates.append(startdate)
        self.chunk_size = parser.get_int_option('EXPERIMENT', 'CHUNK_SIZE')
        self.num_chunks = parser.get_int_option('EXPERIMENT', 'CHUNKS')
        self.chunk_list = parser.get_int_list_option('EXPERIMENT', 'CHUNK_LIST', [])
        self.model = parser.get_option('EXPERIMENT', 'MODEL')
        self.model_version = parser.get_option('EXPERIMENT', 'MODEL_VERSION', '')
        self.atmos_grid = parser.get_option('EXPERIMENT', 'ATMOS_GRID', '')
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self.atmos_timestep = parser.get_int_option('EXPERIMENT', 'ATMOS_TIMESTEP', 6)
        self.ocean_timestep = parser.get_int_option('EXPERIMENT', 'OCEAN_TIMESTEP', 6)
    def _read_startdates(self, pattern):
        pattern = pattern.split(',')
        start = parse_date(pattern[0].strip())
        end = parse_date(pattern[1].strip())
        interval = pattern[2].strip()
        if len(interval) == 1:
            factor = 1
        else:
            factor = int(interval[0:-1])
        interval = interval[-1].upper()
        while start <= end:
            self.startdates.append(date2str(start))
            if interval == 'Y':
                start = add_years(start, factor)
            elif interval == 'M':
                start = add_months(start, factor, cal=self.calendar)
            elif interval == 'W':
                start = add_days(start, factor * 7, cal=self.calendar)
            elif interval == 'D':
                start = add_days(start, factor, cal=self.calendar)
            else:
                raise ConfigException('Interval {0} not supported in STARTDATES definition: {1}', interval, pattern)

    def get_chunk_list(self):
        """
        Return a list with all the chunks
        :return: List containing tuples of startdate, member and chunk
        :rtype: tuple[str, int, int]
        """
        chunk_list = list()
        for startdate in self.startdates:
            for member in self.members:
                if len(self.chunk_list) == 0:
                    for chunk in range(1, self.num_chunks + 1):
                        chunk_list.append((startdate, member, chunk))
                else:
                    for chunk in self.chunk_list:
                        chunk_list.append((startdate, member, chunk))
        return chunk_list

    def get_member_list(self):
        """
        Return a list with all the members
        :return: List containing tuples of startdate and member
        :rtype: tuple[str, int, int]
        """
        member_list = list()
        for startdate in self.startdates:
            for member in self.members:
                member_list.append((startdate, member))
        return member_list

    def get_year_chunks(self, startdate, year):
        """
        Get the list of chunks containing timesteps from the given year
        :param startdate: startdate to use
        :type startdate: str
        :param year: reference year
        :type year: int
        :return: list of chunks containing data from the given year
        :rtype: list[int]
        """
        date = parse_date(startdate)
        chunks = list()
        for chunk in range(1, self.num_chunks + 1):
            chunk_start = self.get_chunk_start(date, chunk)
            if chunk_start.year > year:
                break
            elif chunk_start.year == year or chunk_end_date(chunk_start, self.chunk_size, 'month',
                                                            self.calendar).year == year:
                chunks.append(chunk)

        return chunks

    def get_chunk_start(self, startdate, chunk):
        """
        Get chunk's first day

        Parameters
        ----------
        startdate: str or datetime.datetime
        chunk: int

        Returns
        -------
        datetime.datetime

        See Also
        --------
        get_chunk_start_str

        """
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        # noinspection PyTypeChecker
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        if isinstance(startdate, six.string_types):
            startdate = parse_date(startdate)
        return chunk_start_date(startdate, chunk, self.chunk_size, 'month', self.calendar)

    def get_chunk_start_str(self, startdate, chunk):
        """
        Get chunk's first day string representation

        Parameters
        ----------
        startdate: str or datetime.datetime
        chunk: int

        Returns
        -------
        str

        See Also
        --------
        get_chunk_start

        """
        return date2str(self.get_chunk_start(startdate, chunk))

    def get_chunk_end(self, startdate, chunk):
        """
        Get chunk's last day

        Parameters
        ----------
        startdate: str or datetime.datetime
        chunk: int

        Returns
        -------
        datetime.datetime

        See Also
        --------
        get_chunk_end_str

        """
        return chunk_end_date(self.get_chunk_start(startdate, chunk), self.chunk_size, 'month', self.calendar)

    def get_chunk_end_str(self, startdate, chunk):
        """
        Get chunk's last day as a string

        Parameters
        ----------
        startdate: str or datetime.datetime
        chunk: int

        Returns
        -------
        datetime.datetime

        See Also
        --------
        get_chunk_end

        """
        return date2str(self.get_chunk_end(startdate, chunk))

Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        Return the list of full years that are in the given startdate

        :param startdate: startdate to use
        :type startdate: str
        :return: list of full years
        :rtype: list[int]
        """
        chunks_per_year = 12 / self.chunk_size
        date = parse_date(startdate)
        first_january = 0
        first_year = date.year
        if date.month != 1:
            month = date.month
            first_year += 1
            while month + self.chunk_size < 12:
                month += self.chunk_size
                first_january += 1

        years = list()
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        for _ in range(first_january, chunks_per_year, self.num_chunks):
            years.append(first_year)
            first_year += 1
        return years

    def get_member_str(self, member):
        """
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        Return the member name for a given member number.

        :param member: member's number
        :type member: int
        :return: member's name
        :rtype: str
        return '{0}{1}'.format(self.member_prefix, str(member).zfill(self.member_digits))
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

class ReportConfig(object):
    """
    Configuration for the reporting feature

    Parameters
    ----------
    parser: ConfigParser

    """
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    def __init__(self, parser):
        self.maximum_priority = parser.get_int_option('REPORT', 'MAXIMUM_PRIORITY', 10)
        self.path = parser.get_path_option('REPORT', 'PATH', '')