config.py 16.8 KB
Newer Older
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
import six
from bscearth.utils.config_parser import ConfigParser
from bscearth.utils.date import parse_date, chunk_start_date, chunk_end_date, date2str
from bscearth.utils.log import Log
from earthdiagnostics.frequency import Frequency, Frequencies
from earthdiagnostics.variable import VariableManager
from modelingrealm import ModelingRealm
class ConfigException(Exception):
    """Exception raised when a problem is found in the configuration values"""


class Config(object):
    """
    Class to read and manage the configuration

    :param path: path to the conf file
    :type path: str
    """
        parser = ConfigParser()
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self.data_adaptor = parser.get_choice_option('DIAGNOSTICS', 'DATA_ADAPTOR', ('CMOR', 'THREDDS', 'OBSRECON'),
                                                     'CMOR')
        "Scratch folder path"
        self.scratch_dir = parser.get_path_option('DIAGNOSTICS', 'SCRATCH_DIR')
        self.use_ramdisk = parser.get_bool_option('DIAGNOSTICS', 'USE_RAMDISK', False)
        "If True, the scratch dir is created as a ram disk"
        self.auto_clean = parser.get_bool_option('DIAGNOSTICS', 'AUTO_CLEAN', True)
        "If True, the scratch dir is removed after finishing"
        if not self.auto_clean and self.use_ramdisk:
            Log.warning('RAM disk scratch dir is always automatically cleaned.')
            self.auto_clean = True

Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self.scratch_masks = parser.get_path_option('DIAGNOSTICS', 'SCRATCH_MASKS', '/scratch/Earth/ocean_masks')
        "Common scratch folder for masks"
        self.data_dir = parser.get_path_option('DIAGNOSTICS', 'DATA_DIR')
        "Root data folder path"
        self.data_type = parser.get_choice_option('DIAGNOSTICS', 'DATA_TYPE', ('exp', 'obs', 'recon'), 'exp')
        "Data type (experiment, observation or reconstruction)"
        self.con_files = parser.get_path_option('DIAGNOSTICS', 'CON_FILES')
        "Mask and meshes folder path"
        self.mesh_mask = parser.get_path_option('DIAGNOSTICS', 'MESH_MASK', '')
        "Custom mesh mask file to use"
        self.new_mask_glo = parser.get_path_option('DIAGNOSTICS', 'NEW_MASK_GLO', '')
        "Custom new mask glo file to use"
        self.mask_regions = parser.get_path_option('DIAGNOSTICS', 'MASK_REGIONS', '')
        "Custom mask regions file to use"
        self.mask_regions_3d = parser.get_path_option('DIAGNOSTICS', 'MASK_REGIONS_3D', '')
        "Custom mask regions 3D file to use"

        self.data_convention = parser.get_choice_option('DIAGNOSTICS', 'DATA_CONVENTION',
                                                        ('specs', 'primavera', 'cmip6', 'preface'), 'specs',
                                                        ignore_case=True)
        self.var_manager = VariableManager()
        self.var_manager.load_variables(self.data_convention)
        self._diags = parser.get_option('DIAGNOSTICS', 'DIAGS')
        self.frequency = Frequency(parser.get_option('DIAGNOSTICS', 'FREQUENCY'))
        "Default data frequency to be used by the diagnostics"
        self.cdftools_path = parser.get_path_option('DIAGNOSTICS', 'CDFTOOLS_PATH', '')
        "Path to CDFTOOLS executables"
        self.max_cores = parser.get_int_option('DIAGNOSTICS', 'MAX_CORES', 0)
        "Maximum number of cores to use"
        self.parallel_downloads = parser.get_int_option('DIAGNOSTICS', 'PARALLEL_DOWNLOADS', 1)
        "Maximum number of simultaneous downloads"
        self.parallel_uploads = parser.get_int_option('DIAGNOSTICS', 'PARALLEL_UPLOADS', 1)
        "Maximum number of simultaneous uploads"
        self.restore_meshes = parser.get_bool_option('DIAGNOSTICS', 'RESTORE_MESHES', False)
        "If True, forces the tool to copy all the mesh and mask files for the model, regardless of existence"

        # Read experiment config
        self.experiment = ExperimentConfig(parser)
        """
        Configuration related to the experiment

        :rtype: ExperimentConfig
        """
        # Read aliases
        self._aliases = dict()
        if parser.has_section('ALIAS'):
            for option in parser.options('ALIAS'):
                self._aliases[option.lower()] = parser.get_list_option('ALIAS', option)
        commands = self._diags.split()
            if command.lower() in self._aliases:
                added_commands = self._aliases[command.lower()]
                Log.info('Changing alias {0} for {1}', command, ' '.join(added_commands))
                for add_command in added_commands:
                    self._real_commands.append(add_command)
            else:
                self._real_commands.append(command)
        Log.debug('Command list ready ')

        self.scratch_dir = os.path.join(self.scratch_dir, 'diags', self.experiment.expid)

        self.cmor = CMORConfig(parser, self.var_manager)
        self.thredds = THREDDSConfig(parser)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self.report = ReportConfig(parser)
        """
        Returns the list of commands after replacing the alias
        :return: full list of commands
        :rtype: list(str)
        """
    def __init__(self, parser, var_manager):
        """
        Read the options of the CMOR section

        :param parser: parser for the config file
        :param var_manager: variable manager used to resolve the names given in VARIABLE_LIST
        :raises ConfigException: if VARIABLE_LIST has a value but none of its entries is a usable variable
        """
        # Stored on the instance: the VARIABLE_LIST parsing below and any_required() both read it
        self.var_manager = var_manager

        self.force = parser.get_bool_option('CMOR', 'FORCE', False)
        self.force_untar = parser.get_bool_option('CMOR', 'FORCE_UNTAR', False)
        self.filter_files = parser.get_option('CMOR', 'FILTER_FILES', '')
        self.ocean = parser.get_bool_option('CMOR', 'OCEAN_FILES', True)
        self.atmosphere = parser.get_bool_option('CMOR', 'ATMOSPHERE_FILES', True)
        self.use_grib = parser.get_bool_option('CMOR', 'USE_GRIB', True)
        self._chunks = parser.get_int_list_option('CMOR', 'CHUNKS')
        self.associated_experiment = parser.get_option('CMOR', 'ASSOCIATED_EXPERIMENT', 'to be filled')
        self.associated_model = parser.get_option('CMOR', 'ASSOCIATED_MODEL', 'to be filled')
        self.initialization_description = parser.get_option('CMOR', 'INITIALIZATION_DESCRIPTION', 'to be filled')
        self.initialization_method = parser.get_option('CMOR', 'INITIALIZATION_METHOD', '1')
        self.initialization_number = parser.get_int_option('CMOR', 'INITIALIZATION_NUMBER', 1)
        self.physics_description = parser.get_option('CMOR', 'PHYSICS_DESCRIPTION', 'to be filled')
        self.physics_version = parser.get_option('CMOR', 'PHYSICS_VERSION', '1')
        self.source = parser.get_option('CMOR', 'SOURCE', 'to be filled')
        self.version = parser.get_option('CMOR', 'VERSION', '')
        self.default_ocean_grid = parser.get_option('CMOR', 'DEFAULT_OCEAN_GRID', 'gn')
        self.default_atmos_grid = parser.get_option('CMOR', 'DEFAULT_ATMOS_GRID', 'gr')
        self.activity = parser.get_option('CMOR', 'ACTIVITY', 'CMIP')

        # VARIABLE_LIST holds blank-separated "domain:variable" entries. None means "no filter".
        vars_string = parser.get_option('CMOR', 'VARIABLE_LIST', '')
        if vars_string:
            self._variable_list = list()
            # split() without arguments also copes with repeated blanks, tabs and line breaks,
            # which split(' ') turned into empty or merged entries
            for domain_var in vars_string.split():
                if domain_var.startswith('#'):
                    # Everything after a '#' entry is treated as a comment
                    break
                splitted = domain_var.split(':')
                # An entry lacking the "domain:" part cannot be looked up: handle it as an unknown variable
                if len(splitted) > 1:
                    cmor_var = self.var_manager.get_variable(splitted[1], silent=True)
                else:
                    cmor_var = None
                if not cmor_var:
                    Log.warning('Variable {0} not recognized. It will not be cmorized', domain_var)
                    continue
                if ModelingRealm(splitted[0]) != cmor_var.domain:
                    Log.warning('Domain {0} for variable {1} is not correct: is {2}', splitted[0], cmor_var.short_name,
                                cmor_var.domain)
                # The stored entry always uses the domain reported by the variable manager
                self._variable_list.append('{0.domain}:{0.short_name}'.format(cmor_var))
            if len(self._variable_list) == 0:
                raise ConfigException('Variable list value is specified, but no variables were found')
        else:
            self._variable_list = None

        # Per-frequency atmospheric variables, each parsed into a {code: levels} dictionary
        self._var_hourly = CMORConfig._parse_variables(parser.get_option('CMOR', 'ATMOS_HOURLY_VARS', ''))
        self._var_daily = CMORConfig._parse_variables(parser.get_option('CMOR', 'ATMOS_DAILY_VARS', ''))
        self._var_monthly = CMORConfig._parse_variables(parser.get_option('CMOR', 'ATMOS_MONTHLY_VARS', ''))
    def cmorize(self, var_cmor):
        """
        Tell whether the given variable has to be cmorized

        When no variable list was configured every variable is accepted; otherwise the
        variable must appear in the list as "domain:short_name".

        :param var_cmor: CMOR variable object
        :type var_cmor: Variable
        :return: True if the variable is accepted, False otherwise
        :rtype: bool
        """
        accepted = self._variable_list
        if accepted is not None:
            entry = '{0}:{1}'.format(var_cmor.domain, var_cmor.short_name)
            return entry in accepted
        return True
    def any_required(self, variables):
        """
        Check if at least one of the given variables has to be cmorized

        :param variables: names of the variables to check
        :type variables: iterable of str
        :return: True if no variable list is configured or if any of the variables is accepted by cmorize
        :rtype: bool
        """
        if self._variable_list is None:
            return True
        for var in variables:
            cmor_var = self.var_manager.get_variable(var, silent=True)
            # The silent lookup gives a falsy result for unknown names; such a variable can not be
            # on the list, and handing it to cmorize would fail on its attribute access
            if cmor_var and self.cmorize(cmor_var):
                return True
        return False
    def chunk_cmorization_requested(self, chunk):
        """
        Tell whether the cmorization of the given chunk was requested

        :param chunk: chunk number
        :type chunk: int
        :return: True if the configured chunk list is empty or contains the chunk
        :rtype: bool
        """
        requested = self._chunks
        # An empty list means that no restriction was configured
        return len(requested) == 0 or chunk in requested

    @staticmethod
    def _parse_variables(raw_string):
        variables = dict()
        if raw_string:
            splitted = raw_string.split(',')
            for var_section in splitted:
                splitted_var = var_section.split(':')
                if len(splitted_var) == 1:
                    levels = None
                else:
                    levels = ','.join(map(str, CMORConfig._parse_levels(splitted_var[1:])))
                variables[int(splitted_var[0])] = levels
        return variables

    @staticmethod
    def _parse_levels(levels_splitted):
        if len(levels_splitted) == 1:
            return map(int, levels_splitted[0].split('-'))
        start = int(levels_splitted[0])
        end = int(levels_splitted[1])
        if len(levels_splitted) == 3:
            step = int(levels_splitted[2])
        else:
            step = 1
        return range(start, end, step)

    def get_variables(self, frequency):
        """
        Get the variables configured for the given frequency

        :param frequency: frequency to look for; three hourly and six hourly share the hourly variables
        :return: dictionary built by _parse_variables for that frequency
        :rtype: dict
        :raises ValueError: if the frequency is not one of the supported ones
        """
        if frequency in (Frequencies.three_hourly, Frequencies.six_hourly):
            return self._var_hourly
        elif frequency == Frequencies.daily:
            return self._var_daily
        elif frequency == Frequencies.monthly:
            return self._var_monthly
        raise ValueError('Frequency not recognized: {0}'.format(frequency))

    def get_levels(self, frequency, variable):
        """
        Get the levels configured for a variable at the given frequency

        :param frequency: frequency passed to get_variables
        :param variable: key of the variable in the dictionary returned by get_variables
        :return: value stored for the variable
        """
        frequency_variables = self.get_variables(frequency)
        return frequency_variables[variable]

class THREDDSConfig(object):
    """
    Options read from the THREDDS section of the configuration

    :param parser: parser for the config file
    """

    def __init__(self, parser):
        # Empty string when the option is not present
        url = parser.get_option('THREDDS', 'SERVER_URL', '')
        self.server_url = url


class ExperimentConfig(object):
    """
    Encapsulates all chunk related tasks

    Reads the EXPERIMENT section of the configuration and provides helpers to enumerate
    startdates, members and chunks and to compute the dates of the chunks.

    :param parser: parser for the config file
    :type parser: Parser
    """

    def __init__(self, parser):
        self.institute = parser.get_option('EXPERIMENT', 'INSTITUTE')
        self.expid = parser.get_option('EXPERIMENT', 'EXPID')
        self.experiment_name = parser.get_option('EXPERIMENT', 'NAME', self.expid)
        self.members = parser.get_list_option('EXPERIMENT', 'MEMBERS')
        self.member_digits = parser.get_int_option('EXPERIMENT', 'MEMBER_DIGITS', 1)
        self.member_prefix = parser.get_option('EXPERIMENT', 'MEMBER_PREFIX', 'fc')
        self.member_count_start = parser.get_int_option('EXPERIMENT', 'MEMBER_COUNT_START', 0)

        # Each MEMBERS entry is either a single member or an inclusive "start-end" range;
        # in both cases the member prefix is optional. Members are stored as integers.
        members = []
        for mem in self.members:
            if '-' in mem:
                start, end = mem.split('-')
                members.extend(range(self._member_number(start), self._member_number(end) + 1))
            else:
                members.append(self._member_number(mem))
        self.members = members

        startdates = parser.get_list_option('EXPERIMENT', 'STARTDATES')

        # Imported here so that the module can be loaded without exrex installed
        import exrex
        # Every STARTDATES entry is handed to exrex.generate and all the generated values are kept
        self.startdates = []
        for startdate_pattern in startdates:
            self.startdates.extend(exrex.generate(startdate_pattern))

        self.chunk_size = parser.get_int_option('EXPERIMENT', 'CHUNK_SIZE')
        self.num_chunks = parser.get_int_option('EXPERIMENT', 'CHUNKS')
        self.chunk_list = parser.get_int_list_option('EXPERIMENT', 'CHUNK_LIST', [])
        self.calendar = parser.get_option('EXPERIMENT', 'CALENDAR', 'standard')
        self.model = parser.get_option('EXPERIMENT', 'MODEL')
        self.model_version = parser.get_option('EXPERIMENT', 'MODEL_VERSION', '')
        self.atmos_grid = parser.get_option('EXPERIMENT', 'ATMOS_GRID', '')
        self.atmos_timestep = parser.get_int_option('EXPERIMENT', 'ATMOS_TIMESTEP', 6)
        self.ocean_timestep = parser.get_int_option('EXPERIMENT', 'OCEAN_TIMESTEP', 6)

    def _member_number(self, member):
        """
        Convert a member name to its number, dropping the member prefix if present

        :param member: member as written in the configuration
        :type member: str
        :return: member's number
        :rtype: int
        """
        if member.startswith(self.member_prefix):
            member = member[len(self.member_prefix):]
        return int(member)

    def get_chunk_list(self):
        """
        Return a list with all the chunks

        If a chunk list was configured only those chunks are used, otherwise all the
        chunks from 1 to the number of chunks.

        :return: List containing tuples of startdate, member and chunk
        :rtype: list[tuple[str, int, int]]
        """
        if self.chunk_list:
            chunks = self.chunk_list
        else:
            chunks = range(1, self.num_chunks + 1)
        return [(startdate, member, chunk)
                for startdate in self.startdates
                for member in self.members
                for chunk in chunks]

    def get_member_list(self):
        """
        Return a list with all the members

        :return: List containing tuples of startdate and member
        :rtype: list[tuple[str, int]]
        """
        return [(startdate, member) for startdate in self.startdates for member in self.members]

    def get_year_chunks(self, startdate, year):
        """
        Get the list of chunks containing timesteps from the given year

        :param startdate: startdate to use
        :type startdate: str
        :param year: reference year
        :type year: int
        :return: list of chunks containing data from the given year
        :rtype: list[int]
        """
        date = parse_date(startdate)
        chunks = list()
        for chunk in range(1, self.num_chunks + 1):
            chunk_start = self.get_chunk_start(date, chunk)
            if chunk_start.year > year:
                # Chunks are consecutive in time, so no later chunk can reach back into the year
                break
            elif chunk_start.year == year or chunk_end_date(chunk_start, self.chunk_size, 'month',
                                                            self.calendar).year == year:
                chunks.append(chunk)

        return chunks

    def get_chunk_start(self, startdate, chunk):
        """
        Get the start date of a chunk

        :param startdate: startdate to use, either as a string or as an already parsed date
        :param chunk: chunk number
        :type chunk: int
        :return: value returned by chunk_start_date for chunks of chunk_size months
        """
        # noinspection PyTypeChecker
        if isinstance(startdate, six.string_types):
            startdate = parse_date(startdate)
        return chunk_start_date(startdate, chunk, self.chunk_size, 'month', self.calendar)

    def get_chunk_start_str(self, startdate, chunk):
        """
        Get the start date of a chunk formatted with date2str

        :param startdate: startdate to use
        :param chunk: chunk number
        :type chunk: int
        """
        return date2str(self.get_chunk_start(startdate, chunk))

    def get_chunk_end(self, startdate, chunk):
        """
        Get the end date of a chunk

        :param startdate: startdate to use
        :param chunk: chunk number
        :type chunk: int
        :return: value returned by chunk_end_date for the start of the chunk
        """
        return chunk_end_date(self.get_chunk_start(startdate, chunk), self.chunk_size, 'month', self.calendar)

    def get_chunk_end_str(self, startdate, chunk):
        """
        Get the end date of a chunk formatted with date2str

        :param startdate: startdate to use
        :param chunk: chunk number
        :type chunk: int
        """
        return date2str(self.get_chunk_end(startdate, chunk))

    def get_full_years(self, startdate):
        """
        Returns the list of full years that are in the given startdate

        :param startdate: startdate to use
        :type startdate: str
        :return: list of full years
        :rtype: list[int]
        """
        # Floor division: range() below needs an integer, and true division yields a float on Python 3
        chunks_per_year = 12 // self.chunk_size
        date = parse_date(startdate)
        first_january = 0
        first_year = date.year
        if date.month != 1:
            # The start year is incomplete, so the first candidate is the following one
            month = date.month
            first_year += 1
            while month + self.chunk_size < 12:
                month += self.chunk_size
                first_january += 1

        years = list()
        # NOTE(review): stop and step are kept exactly as they were, but they look swapped
        # (stop=chunks_per_year, step=num_chunks) - confirm the intended bounds
        for _ in range(first_january, chunks_per_year, self.num_chunks):
            years.append(first_year)
            first_year += 1
        return years

    def get_member_str(self, member):
        """
        Returns the member name for a given member number.

        :param member: member's number
        :type member: int
        :return: member's name
        :rtype: str
        """
        return '{0}{1}'.format(self.member_prefix, str(member).zfill(self.member_digits))
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

class ReportConfig(object):
    """
    Options read from the REPORT section of the configuration

    :param parser: parser for the config file
    """

    def __init__(self, parser):
        # Defaults to 10 when the option is not present
        priority = parser.get_int_option('REPORT', 'MAXIMUM_PRIORITY', 10)
        self.maximum_priority = priority
        # Defaults to an empty string when the option is not present
        report_path = parser.get_path_option('REPORT', 'PATH', '')
        self.path = report_path