# config.py
from bscearth.utils.log import Log
from bscearth.utils.date import parse_date, chunk_start_date, chunk_end_date, date2str
from bscearth.utils.config_parser import ConfigParser
from earthdiagnostics.frequency import Frequency, Frequencies
from earthdiagnostics.variable import VariableManager
class Config(object):
    """
    Class to read and manage the configuration

    :param path: path to the conf file
    :type path: str
    """
    # NOTE(review): the method header that should enclose the statements below is not
    # present in this extract (they use `self` and, per the class docstring, a `path`
    # argument), and nothing visible loads `path` into the parser. Restore the missing
    # lines from version control before relying on this class.
        parser = ConfigParser()
        # Data adaptor, restricted to 'CMOR' or 'THREDDS'; the trailing 'CMOR' is presumably
        # the default value -- TODO confirm against ConfigParser.get_choice_option
        self.data_adaptor = parser.get_choice_option('DIAGNOSTICS', 'DATA_ADAPTOR', ('CMOR', 'THREDDS'), 'CMOR')
        "Scratch folder path"
        self.scratch_dir = parser.get_path_option('DIAGNOSTICS', 'SCRATCH_DIR')
# NOTE(review): the next two lines are web-page residue from the extraction, not Python; remove them.
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self.scratch_masks = parser.get_path_option('DIAGNOSTICS', 'SCRATCH_MASKS', '/scratch/Earth/ocean_masks')
        "Common scratch folder for masks"
        self.data_dir = parser.get_path_option('DIAGNOSTICS', 'DATA_DIR')
        "Root data folder path"
        self.data_type = parser.get_choice_option('DIAGNOSTICS', 'DATA_TYPE', ('exp', 'obs', 'recon'), 'exp')
        "Data type (experiment, observation or reconstruction)"
        self.con_files = parser.get_path_option('DIAGNOSTICS', 'CON_FILES')
        "Mask and meshes folder path"
        # Data convention, one of 'specs', 'primavera' or 'cmip6'; matched with ignore_case=True
        self.data_convention = parser.get_choice_option('DIAGNOSTICS', 'DATA_CONVENTION',
                                                        ('specs', 'primavera', 'cmip6'), 'specs', ignore_case=True)
        # Raw DIAGS option; it is split on whitespace into individual commands further down
        self._diags = parser.get_option('DIAGNOSTICS', 'DIAGS')
        self.frequency = Frequency(parser.get_option('DIAGNOSTICS', 'FREQUENCY'))
        "Default data frequency to be used by the diagnostics"
        self.cdftools_path = parser.get_path_option('DIAGNOSTICS', 'CDFTOOLS_PATH', '')
        "Path to CDFTOOLS executables"
        self.max_cores = parser.get_int_option('DIAGNOSTICS', 'MAX_CORES', 0)
        "Maximum number of cores to use"
        self.restore_meshes = parser.get_bool_option('DIAGNOSTICS', 'RESTORE_MESHES', False)
        "If True, forces the tool to copy all the mesh and mask files for the model, regardless of existence"

        # Read experiment config
        self.experiment = ExperimentConfig(parser)
        """
        Configuration related to the experiment

        :rtype: ExperimentConfig
        """
        # Read aliases
        # Maps each lower-cased option name of the ALIAS section to the list read for it
        self._aliases = dict()
        if parser.has_section('ALIAS'):
            for option in parser.options('ALIAS'):
                self._aliases[option.lower()] = parser.get_list_option('ALIAS', option)
        commands = self._diags.split()
        # NOTE(review): the loop header that binds `command` (presumably iterating over
        # `commands`) and the initialisation of `self._real_commands` are missing from
        # this extract; the indented statements below belong to that loop body.
            if command.lower() in self._aliases:
                # Alias found (case-insensitive match): replace it with its expansion
                added_commands = self._aliases[command.lower()]
                Log.info('Changing alias {0} for {1}', command, ' '.join(added_commands))
                for add_command in added_commands:
                    self._real_commands.append(add_command)
            else:
                # Not an alias: keep the command as written
                self._real_commands.append(command)
        Log.debug('Command list ready ')

        # The scratch folder becomes <SCRATCH_DIR>/diags/<expid>
        # NOTE(review): `os` is not imported in the visible import block -- confirm it is imported
        self.scratch_dir = os.path.join(self.scratch_dir, 'diags', self.experiment.expid)

        self.cmor = CMORConfig(parser)
        self.thredds = THREDDSConfig(parser)
        # NOTE(review): the header of the method that owns the docstring and `return`
        # below is missing from this extract.
        """
        Returns the list of commands after replacing the alias
        :return: full list of commands
        :rtype: list(str)
        """
        return self._real_commands
        

class CMORConfig(object):
    """
    Options read from the CMOR section of the configuration file

    :param parser: parser for the config file
    :type parser: Parser
    """

    def __init__(self, parser):
        self.force = parser.get_bool_option('CMOR', 'FORCE', False)
        self.force_untar = parser.get_bool_option('CMOR', 'FORCE_UNTAR', False)
        self.filter_files = parser.get_option('CMOR', 'FILTER_FILES', '')
        self.ocean = parser.get_bool_option('CMOR', 'OCEAN_FILES', True)
        self.atmosphere = parser.get_bool_option('CMOR', 'ATMOSPHERE_FILES', True)
        self.use_grib = parser.get_bool_option('CMOR', 'USE_GRIB', True)
        self._chunks = parser.get_int_list_option('CMOR', 'CHUNKS')
        self.associated_experiment = parser.get_option('CMOR', 'ASSOCIATED_EXPERIMENT', 'to be filled')
        self.associated_model = parser.get_option('CMOR', 'ASSOCIATED_MODEL', 'to be filled')
        self.initialization_description = parser.get_option('CMOR', 'INITIALIZATION_DESCRIPTION', 'to be filled')
        self.initialization_method = parser.get_option('CMOR', 'INITIALIZATION_METHOD', '1')
        self.physics_description = parser.get_option('CMOR', 'PHYSICS_DESCRIPTION', 'to be filled')
        self.physics_version = parser.get_option('CMOR', 'PHYSICS_VERSION', '1')
        self.source = parser.get_option('CMOR', 'SOURCE', 'to be filled')

        # Optional filter of 'domain:variable' entries; None means "no filter"
        vars_string = parser.get_option('CMOR', 'VARIABLE_LIST', '')
        if vars_string:
            # split() without arguments tolerates repeated spaces, tabs and newlines,
            # so no empty entries end up in the list
            self._variable_list = [domain_var.lower() for domain_var in vars_string.split()]
        else:
            self._variable_list = None

        self._var_hourly = CMORConfig._parse_variables(parser.get_option('CMOR', 'ATMOS_HOURLY_VARS', ''))
        self._var_daily = CMORConfig._parse_variables(parser.get_option('CMOR', 'ATMOS_DAILY_VARS', ''))
        self._var_monthly = CMORConfig._parse_variables(parser.get_option('CMOR', 'ATMOS_MONTHLY_VARS', ''))

    def cmorize(self, var_cmor):
        """
        Checks if var_cmor is on variable list

        The comparison is made on the lower-cased string 'domain:short_name'.

        :param var_cmor: CMOR variable object
        :type var_cmor: Variable
        :return: True if no variable list was configured or if the variable is on it,
                 False otherwise (including when var_cmor is None)
        :rtype: bool
        """
        if self._variable_list is None:
            return True
        if var_cmor is None:
            # any_required looks variables up with silent=True, which presumably yields
            # None for unknown names; an unknown variable can not be on the list
            return False
        return '{0}:{1}'.format(var_cmor.domain, var_cmor.short_name).lower() in self._variable_list

    def any_required(self, variables):
        """
        Checks if at least one of the given variables has to be cmorized

        :param variables: names of the variables to check
        :type variables: iterable of str
        :return: True if no variable list was configured or any variable is on it
        :rtype: bool
        """
        if self._variable_list is None:
            return True
        for var in variables:
            if self.cmorize(VariableManager().get_variable(var, silent=True)):
                return True
        return False

    def chunk_cmorization_requested(self, chunk):
        """
        Checks if the cmorization of the given chunk was requested

        :param chunk: chunk number
        :type chunk: int
        :return: True if no chunk list was configured or the chunk is on it
        :rtype: bool
        """
        if not self._chunks:
            return True
        return chunk in self._chunks

    @staticmethod
    def _parse_variables(raw_string):
        """
        Parses a comma separated list of 'code[:levels]' sections

        :param raw_string: raw option value
        :type raw_string: str
        :return: dictionary mapping each integer code to its levels as a comma
                 separated string, or to None if the section had no levels
        :rtype: dict
        """
        variables = dict()
        if raw_string:
            for var_section in raw_string.split(','):
                splitted_var = var_section.split(':')
                if len(splitted_var) == 1:
                    levels = None
                else:
                    levels = ','.join(map(str, CMORConfig._parse_levels(splitted_var[1:])))
                variables[int(splitted_var[0])] = levels
        return variables

    @staticmethod
    def _parse_levels(levels_splitted):
        """
        Expands a levels specification into a list of integers

        A single element is read as an explicit '-' separated list of levels. Two or
        three elements are read as start, end and optional step and expanded with
        range(), so the end value itself is not included.

        :param levels_splitted: colon separated pieces of the levels specification
        :type levels_splitted: list[str]
        :return: levels
        :rtype: list[int]
        """
        if len(levels_splitted) == 1:
            # Explicit list so the result is the same type on Python 2 and 3
            return [int(level) for level in levels_splitted[0].split('-')]
        start = int(levels_splitted[0])
        end = int(levels_splitted[1])
        if len(levels_splitted) == 3:
            step = int(levels_splitted[2])
        else:
            step = 1
        return list(range(start, end, step))

    def get_variables(self, frequency):
        """
        Returns the variables configured for the given frequency

        :param frequency: requested frequency
        :type frequency: Frequency
        :return: dictionary of variable codes and levels, as built by _parse_variables
        :rtype: dict
        :raises Exception: if the frequency is not three hourly, six hourly, daily or monthly
        """
        if frequency in (Frequencies.three_hourly, Frequencies.six_hourly):
            return self._var_hourly
        elif frequency == Frequencies.daily:
            return self._var_daily
        elif frequency == Frequencies.monthly:
            return self._var_monthly
        raise Exception('Frequency not recognized: {0}'.format(frequency))

    def get_levels(self, frequency, variable):
        """
        Returns the levels configured for a variable at the given frequency

        :param frequency: requested frequency
        :type frequency: Frequency
        :param variable: variable code, as used for the keys built by _parse_variables
        :type variable: int
        :return: comma separated levels or None
        :rtype: str | None
        """
        return self.get_variables(frequency)[variable]

class THREDDSConfig(object):
    """
    Options read from the THREDDS section of the configuration file

    :param parser: parser for the config file
    :type parser: Parser
    """

    def __init__(self, parser):
        section, option, fallback = 'THREDDS', 'SERVER_URL', ''
        # Server URL; falls back to an empty string
        url = parser.get_option(section, option, fallback)
        self.server_url = url


class ExperimentConfig(object):
    """
    Encapsulates all chunk related tasks

    :param parser: parser for the config file
    :type parser: Parser
    """

    def __init__(self, parser):
        self.institute = parser.get_option('EXPERIMENT', 'INSTITUTE')
        self.expid = parser.get_option('EXPERIMENT', 'EXPID')
        # The experiment name falls back to the experiment id
        self.experiment_name = parser.get_option('EXPERIMENT', 'NAME', self.expid)
        raw_members = parser.get_list_option('EXPERIMENT', 'MEMBERS')
        self.member_digits = parser.get_int_option('EXPERIMENT', 'MEMBER_DIGITS', 1)
        self.member_prefix = parser.get_option('EXPERIMENT', 'MEMBER_PREFIX', 'fc')
        self.member_count_start = parser.get_int_option('EXPERIMENT', 'MEMBER_COUNT_START', 0)
        # Members are stored as integers; they may be given with or without the prefix
        self.members = [self._parse_member(mem) for mem in raw_members]
        self.startdates = parser.get_option('EXPERIMENT', 'STARTDATES').split()
        self.chunk_size = parser.get_int_option('EXPERIMENT', 'CHUNK_SIZE')
        self.num_chunks = parser.get_int_option('EXPERIMENT', 'CHUNKS')
        self.calendar = parser.get_option('EXPERIMENT', 'CALENDAR', 'standard')
        self.model = parser.get_option('EXPERIMENT', 'MODEL')
        self.model_version = parser.get_option('EXPERIMENT', 'MODEL_VERSION', '')
        self.atmos_grid = parser.get_option('EXPERIMENT', 'ATMOS_GRID', '')
        self.atmos_timestep = parser.get_int_option('EXPERIMENT', 'ATMOS_TIMESTEP', 6)
        self.ocean_timestep = parser.get_int_option('EXPERIMENT', 'OCEAN_TIMESTEP', 6)

    def _parse_member(self, member):
        """
        Converts a member given as text to its number, dropping the member prefix if present

        :param member: member as written in the configuration, e.g. 'fc0' or '0'
        :type member: str
        :return: member's number
        :rtype: int
        """
        if self.member_prefix and member.startswith(self.member_prefix):
            # int() can not digest the prefix, so it must be removed first
            member = member[len(self.member_prefix):]
        return int(member)

    @staticmethod
    def _as_date(startdate):
        """
        Returns startdate as a date object, parsing it only if it is a string

        :param startdate: startdate as a string or as an already parsed date
        :return: parsed date
        """
        try:
            string_types = basestring
        except NameError:
            # Python 3 has no basestring
            string_types = str
        if isinstance(startdate, string_types):
            return parse_date(startdate)
        return startdate

    def get_chunk_list(self):
        """
        Return a list with all the chunks
        :return: List containing tuples of startdate, member and chunk
        :rtype: list[tuple[str, int, int]]
        """
        return [(startdate, member, chunk)
                for startdate in self.startdates
                for member in self.members
                for chunk in range(1, self.num_chunks + 1)]

    def get_member_list(self):
        """
        Return a list with all the members
        :return: List containing tuples of startdate and member
        :rtype: list[tuple[str, int]]
        """
        return [(startdate, member)
                for startdate in self.startdates
                for member in self.members]

    def get_year_chunks(self, startdate, year):
        """
        Get the list of chunks containing timesteps from the given year
        :param startdate: startdate to use
        :type startdate: str
        :param year: reference year
        :type year: int
        :return: list of chunks containing data from the given year
        :rtype: list[int]
        """
        date = self._as_date(startdate)
        chunks = list()
        for chunk in range(1, self.num_chunks + 1):
            chunk_start = self.get_chunk_start(date, chunk)
            if chunk_start.year > year:
                # Chunks are visited in order, so no later chunk can match
                break
            elif chunk_start.year == year or chunk_end_date(chunk_start, self.chunk_size, 'month',
                                                            self.calendar).year == year:
                chunks.append(chunk)

        return chunks

    def get_chunk_start(self, startdate, chunk):
        """
        Returns the start date of the given chunk
        :param startdate: startdate to use, as a string or as a parsed date
        :param chunk: chunk number
        :type chunk: int
        :return: value returned by chunk_start_date for this chunk
        """
        startdate = self._as_date(startdate)
        return chunk_start_date(startdate, chunk, self.chunk_size, 'month', self.calendar)

    def get_chunk_start_str(self, startdate, chunk):
        """
        Returns the start date of the given chunk, converted with date2str
        :param startdate: startdate to use, as a string or as a parsed date
        :param chunk: chunk number
        :type chunk: int
        """
        return date2str(self.get_chunk_start(startdate, chunk))

    def get_chunk_end(self, startdate, chunk):
        """
        Returns the end date of the given chunk
        :param startdate: startdate to use, as a string or as a parsed date
        :param chunk: chunk number
        :type chunk: int
        :return: value returned by chunk_end_date for this chunk
        """
        return chunk_end_date(self.get_chunk_start(startdate, chunk), self.chunk_size, 'month', self.calendar)

    def get_chunk_end_str(self, startdate, chunk):
        """
        Returns the end date of the given chunk, converted with date2str
        :param startdate: startdate to use, as a string or as a parsed date
        :param chunk: chunk number
        :type chunk: int
        """
        return date2str(self.get_chunk_end(startdate, chunk))

    def get_full_years(self, startdate):
        """
        Returns the list of full years that are in the given startdate

        A year is considered full when the num_chunks * chunk_size simulated months cover
        it from January to December. Only the month of the startdate is taken into
        account: a startdate in January makes its own year the first candidate, any other
        month makes the following year the first candidate.

        :param startdate: startdate to use, as a string or as a parsed date
        :type startdate: str
        :return: list of full years
        :rtype: list[int]
        """
        date = self._as_date(startdate)
        if date.month == 1:
            first_year = date.year
            months_before_january = 0
        else:
            first_year = date.year + 1
            months_before_january = 13 - date.month

        simulated_months = self.num_chunks * self.chunk_size
        # Floor division keeps this an integer on both Python 2 and 3
        full_years = max((simulated_months - months_before_january) // 12, 0)
        return [first_year + offset for offset in range(full_years)]

    def get_member_str(self, member):
        """
        Returns the member name for a given member number.
        :param member: member's number
        :type member: int
        :return: member's name
        :rtype: str
        """
        return '{0}{1}'.format(self.member_prefix, str(member).zfill(self.member_digits))