# coding=utf-8
"""Classes to manage Earth Diagnostics configuration"""
import os

import six
from bscearth.utils.config_parser import ConfigParser
from bscearth.utils.date import parse_date, chunk_start_date, chunk_end_date, date2str, add_years, add_months, add_days
from bscearth.utils.log import Log
import bscearth.utils.path

from earthdiagnostics.frequency import Frequency, Frequencies
from earthdiagnostics.modelingrealm import ModelingRealm
from earthdiagnostics.variable import VariableManager
from earthdiagnostics.data_convention import SPECSConvention, CMIP6Convention, PrimaveraConvention, \
    MeteoFranceConvention, PrefaceConvention


class ConfigException(Exception):
    """Exception raised when there is a problem with the configuration"""

    pass


class Config(object):
    """Class to read and manage the configuration"""

    def __init__(self):
        # Read diags config
        self.data_adaptor = None
        "Data adaptor to use (CMOR, THREDDS or OBSRECON)"
        self.scratch_dir = None
        "Scratch folder path"
        self.use_ramdisk = None
        "If True, the scratch dir is created as a ram disk"
        self.auto_clean = None
        "If True, the scratch dir is removed after finishing"
        self.scratch_masks = None
        "Common scratch folder for masks"
        self.data_dir = None
        "Root data folder path"
        self.data_type = None
        "Data type (experiment, observation or reconstruction)"
        self.con_files = None
        "Mask and meshes folder path"
        self.mesh_mask = None
        "Custom mesh mask file to use"
        self.new_mask_glo = None
        "Custom new mask glo file to use"
        self.mask_regions = None
        "Custom mask regions file to use"
        self.mask_regions_3d = None
        "Custom mask regions 3D file to use"
        self.data_convention = None
        "Data convention to use"
        self.var_manager = None
        self.skip_diags_done = None
        "Flag to control if already done diags must be recalculated"
        self.frequency = None
        "Default data frequency to be used by the diagnostics"
        self.cdftools_path = None
        "Path to CDFTOOLS executables"
        self.max_cores = None
        "Maximum number of cores to use"
        self.parallel_downloads = None
        "Maximum number of simultaneous downloads"
        self.parallel_uploads = None
        "Maximum number of simultaneous uploads"
        self.restore_meshes = None
        "If True, forces the tool to copy all the mesh and mask files for the model, regardless of existence"

        # Read experiment config
        self.experiment = ExperimentConfig()
        """
        Configuration related to the experiment

        Returns
        -------
        ExperimentConfig
        """
        self.cmor = None
        """
        CMOR related configuration

        Returns
        -------
        CMORConfig
        """
        self.thredds = None
        """
        THREDDS server configuration

        Returns
        -------
        THREDDSConfig
        """
        self.report = None
        """
        Reporting configuration

        Returns
        -------
        ReportConfig
        """

    def parse(self, path):
        """
        Read configuration from INI file

        Parameters
        ----------
        path: str

        Raises
        ------
        ValueError
            If the configuration file can not be found
        """
        config_file_path = bscearth.utils.path.expand_path(path)
        if not os.path.isfile(config_file_path):
            Log.critical('Configuration file {0} can not be found', config_file_path)
            raise ValueError('Configuration file {0} can not be found'.format(config_file_path))

        parser = ConfigParser()
        # Keep option names exactly as written in the file (no lower-casing)
        parser.optionxform = str
        parser.read(config_file_path)

        # Read diags config
        self.data_adaptor = parser.get_choice_option('DIAGNOSTICS', 'DATA_ADAPTOR', ('CMOR', 'THREDDS', 'OBSRECON'),
                                                     'CMOR')
        self.scratch_dir = parser.get_path_option('DIAGNOSTICS', 'SCRATCH_DIR')
        self.use_ramdisk = parser.get_bool_option('DIAGNOSTICS', 'USE_RAMDISK', False)
        self.auto_clean = parser.get_bool_option('DIAGNOSTICS', 'AUTO_CLEAN', True)
        if not self.auto_clean and self.use_ramdisk:
            Log.warning('RAM disk scratch dir is always automatically cleaned.')
            self.auto_clean = True

        self.scratch_masks = parser.get_path_option('DIAGNOSTICS', 'SCRATCH_MASKS', '/scratch/Earth/ocean_masks')
        self.data_dir = parser.get_path_option('DIAGNOSTICS', 'DATA_DIR')
        self.data_type = parser.get_choice_option('DIAGNOSTICS', 'DATA_TYPE', ('exp', 'obs', 'recon'), 'exp')
        self.con_files = parser.get_path_option('DIAGNOSTICS', 'CON_FILES')
        self.mesh_mask = parser.get_path_option('DIAGNOSTICS', 'MESH_MASK', '')
        self.new_mask_glo = parser.get_path_option('DIAGNOSTICS', 'NEW_MASK_GLO', '')
        self.mask_regions = parser.get_path_option('DIAGNOSTICS', 'MASK_REGIONS', '')
        self.mask_regions_3d = parser.get_path_option('DIAGNOSTICS', 'MASK_REGIONS_3D', '')

        data_convention = parser.get_choice_option('DIAGNOSTICS', 'DATA_CONVENTION',
                                                   ('specs', 'primavera', 'cmip6', 'preface', 'meteofrance'),
                                                   'specs', ignore_case=True)

        if data_convention == 'specs':
            self.data_convention = SPECSConvention(data_convention, self)
        elif data_convention == 'primavera':
            self.data_convention = PrimaveraConvention(data_convention, self)
        elif data_convention == 'cmip6':
            self.data_convention = CMIP6Convention(data_convention, self)
        elif data_convention == 'preface':
            self.data_convention = PrefaceConvention(data_convention, self)
        elif data_convention == 'meteofrance':
            self.data_convention = MeteoFranceConvention(data_convention, self)

        self.scratch_masks = self.data_convention.get_scratch_masks(self.scratch_masks)

        # A convention-specific namelist shipped next to this module, when present,
        # is exported through the NAM_CDF_NAMES environment variable
        namelist_file = os.path.join(os.path.dirname(__file__),
                                     'CDFTOOLS_{0}.namlist'.format(self.data_convention.name))
        Log.debug(namelist_file)
        if os.path.isfile(namelist_file):
            Log.debug('Setting namelist {0}', namelist_file)
            os.environ['NAM_CDF_NAMES'] = namelist_file

        self.var_manager = VariableManager()
        self.var_manager.load_variables(data_convention)
        self._diags = parser.get_option('DIAGNOSTICS', 'DIAGS')
        self.skip_diags_done = parser.get_bool_option('DIAGNOSTICS', 'SKIP_DIAGS_DONE', True)
        self.frequency = Frequency(parser.get_option('DIAGNOSTICS', 'FREQUENCY'))

        self.cdftools_path = parser.get_path_option('DIAGNOSTICS', 'CDFTOOLS_PATH', '')
        self.max_cores = parser.get_int_option('DIAGNOSTICS', 'MAX_CORES', 0)
        self.parallel_downloads = parser.get_int_option('DIAGNOSTICS', 'PARALLEL_DOWNLOADS', 1)
        self.parallel_uploads = parser.get_int_option('DIAGNOSTICS', 'PARALLEL_UPLOADS', 1)
        self.restore_meshes = parser.get_bool_option('DIAGNOSTICS', 'RESTORE_MESHES', False)

        # Read experiment config
        self.experiment = ExperimentConfig()
        self.experiment.parse_ini(parser)

        # Read aliases
        self._aliases = dict()
        if parser.has_section('ALIAS'):
            for option in parser.options('ALIAS'):
                self._aliases[option.lower()] = parser.get_list_option('ALIAS', option)

        Log.debug('Preparing command list')
        commands = self._diags.split()
        self._real_commands = list()
        for command in commands:
            command = command.strip()
            # Everything from the first token starting with '#' onwards is ignored
            if command.startswith('#'):
                break
            if command.lower() in self._aliases:
                added_commands = self._aliases[command.lower()]
                Log.info('Changing alias {0} for {1}', command, ' '.join(added_commands))
                self._real_commands.extend(added_commands)
            else:
                self._real_commands.append(command)
        Log.debug('Command list ready ')

        self.scratch_dir = os.path.join(self.scratch_dir, 'diags', self.experiment.expid)

        self.cmor = CMORConfig(parser, self.var_manager)
        self.thredds = THREDDSConfig(parser)
        self.report = ReportConfig(parser)

    def get_commands(self):
        """
        Return the list of commands after replacing the alias

        The list is only available after calling parse

        Returns
        -------
        iterable of str
        """
        return self._real_commands


class CMORConfig(object):
    """
    Configuration for the cmorization processes

    Parameters
    ----------
    parser: ConfigParser
    var_manager: VariableManager

    Raises
    ------
    ConfigException
        If VARIABLE_LIST is specified but none of its entries is accepted
    """

    def __init__(self, parser, var_manager):
        self.force = parser.get_bool_option('CMOR', 'FORCE', False)
        self.force_untar = parser.get_bool_option('CMOR', 'FORCE_UNTAR', False)
        self.skip_prepare = parser.get_bool_option('CMOR', 'SKIP_PREPARE', False)
        self.filter_files = parser.get_option('CMOR', 'FILTER_FILES', '')
        self.ocean = parser.get_bool_option('CMOR', 'OCEAN_FILES', True)
        self.atmosphere = parser.get_bool_option('CMOR', 'ATMOSPHERE_FILES', True)
        self.use_grib = parser.get_bool_option('CMOR', 'USE_GRIB', True)
        self._chunks = parser.get_int_list_option('CMOR', 'CHUNKS')
        self.associated_experiment = parser.get_option('CMOR', 'ASSOCIATED_EXPERIMENT', 'to be filled')
        self.associated_model = parser.get_option('CMOR', 'ASSOCIATED_MODEL', 'to be filled')
        self.initialization_description = parser.get_option('CMOR', 'INITIALIZATION_DESCRIPTION', 'to be filled')
        self.initialization_method = parser.get_option('CMOR', 'INITIALIZATION_METHOD', '1')
        self.initialization_number = parser.get_int_option('CMOR', 'INITIALIZATION_NUMBER', 1)
        self.physics_description = parser.get_option('CMOR', 'PHYSICS_DESCRIPTION', 'to be filled')
        self.physics_version = parser.get_option('CMOR', 'PHYSICS_VERSION', '1')
        self.source = parser.get_option('CMOR', 'SOURCE', 'to be filled')
        self.version = parser.get_option('CMOR', 'VERSION', '')
        self.default_ocean_grid = parser.get_option('CMOR', 'DEFAULT_OCEAN_GRID', 'gn')
        self.default_atmos_grid = parser.get_option('CMOR', 'DEFAULT_ATMOS_GRID', 'gr')
        self.activity = parser.get_option('CMOR', 'ACTIVITY', 'CMIP')
        self.min_cmorized_vars = parser.get_int_option('CMOR', 'MIN_CMORIZED_VARS', 10)
        self.append_startdate = parser.get_bool_option('CMOR', 'APPEND_STARTDATE', False)

        vars_string = parser.get_option('CMOR', 'VARIABLE_LIST', '')
        self.var_manager = var_manager
        if vars_string:
            self._variable_list = list()
            # Entries are space separated and have the form domain:variable
            for domain_var in vars_string.split(' '):
                domain_var = domain_var.strip()
                # Everything from the first entry starting with '#' onwards is ignored
                if domain_var.startswith('#'):
                    break
                if not domain_var:
                    continue
                splitted = domain_var.split(':')
                cmor_var = self.var_manager.get_variable(splitted[1], silent=True)
                if not cmor_var:
                    Log.warning('Variable {0} not recognized. It will not be cmorized', domain_var)
                    continue
                if ModelingRealm(splitted[0]) != cmor_var.domain:
                    Log.warning('Domain {0} for variable {1} is not correct: is {2}', splitted[0],
                                cmor_var.short_name, cmor_var.domain)
                    continue
                self._variable_list.append('{0.domain}:{0.short_name}'.format(cmor_var))
            if not self._variable_list:
                raise ConfigException('Variable list value is specified, but no variables were found')
        else:
            # None means "no filter": every variable is cmorized
            self._variable_list = None

        self._var_hourly = CMORConfig._parse_variables(parser.get_option('CMOR', 'ATMOS_HOURLY_VARS', ''))
        self._var_daily = CMORConfig._parse_variables(parser.get_option('CMOR', 'ATMOS_DAILY_VARS', ''))
        self._var_monthly = CMORConfig._parse_variables(parser.get_option('CMOR', 'ATMOS_MONTHLY_VARS', ''))

    def cmorize(self, var_cmor):
        """
        Check if var_cmor is on variable list

        Parameters
        ----------
        var_cmor: Variable

        Returns
        -------
        bool
            True if no variable list was configured or if the variable is in it
        """
        if self._variable_list is None:
            return True
        if not var_cmor:
            return False
        return '{0}:{1}'.format(var_cmor.domain, var_cmor.short_name) in self._variable_list

    def any_required(self, variables):
        """
        Check if any of the given variables is needed for cmorization

        Parameters
        ----------
        variables: iterable of str

        Returns
        -------
        bool
        """
        if self._variable_list is None:
            return True
        for var in variables:
            if self.cmorize(self.var_manager.get_variable(var, silent=True)):
                return True
        return False

    def chunk_cmorization_requested(self, chunk):
        """
        Check if the cmorization of a given chunk is required

        If no chunks were configured, all of them are considered requested

        Parameters
        ----------
        chunk: int

        Returns
        -------
        bool
        """
        if not self._chunks:
            return True
        return chunk in self._chunks

    @staticmethod
    def _parse_variables(raw_string):
        """
        Parse a comma separated list of 'code[:levels]' sections

        Parameters
        ----------
        raw_string: str

        Returns
        -------
        dict of int: str or None
            Levels (as a comma separated string) for each code, None if the section does not define levels
        """
        variables = dict()
        if raw_string:
            for var_section in raw_string.split(','):
                splitted_var = var_section.split(':')
                if len(splitted_var) == 1:
                    levels = None
                else:
                    levels = ','.join(map(str, CMORConfig._parse_levels(splitted_var[1:])))
                variables[int(splitted_var[0])] = levels
        return variables

    @staticmethod
    def _parse_levels(levels_splitted):
        """
        Parse the levels part of a variable section

        Parameters
        ----------
        levels_splitted: list of str
            A single element is read as a '-' separated list of levels. Otherwise the elements are
            start, end and an optional step that are passed to range (so end itself is not included)

        Returns
        -------
        iterable of int
        """
        if len(levels_splitted) == 1:
            return map(int, levels_splitted[0].split('-'))
        start = int(levels_splitted[0])
        end = int(levels_splitted[1])
        if len(levels_splitted) == 3:
            step = int(levels_splitted[2])
        else:
            step = 1
        return range(start, end, step)

    def get_variables(self, frequency):
        """
        Get the variables to get from the grib file for a given frequency

        Parameters
        ----------
        frequency: Frequency

        Returns
        -------
        dict of int: str or None
            Requested levels for each code, as produced when parsing the ATMOS_*_VARS options

        Raises
        ------
        ValueError
            If the frequency passed is not supported
        """
        if frequency in (Frequencies.three_hourly, Frequencies.six_hourly):
            return self._var_hourly
        elif frequency == Frequencies.daily:
            return self._var_daily
        elif frequency == Frequencies.monthly:
            return self._var_monthly
        raise ValueError('Frequency not recognized: {0}'.format(frequency))

    def get_requested_codes(self):
        """
        Get all the codes to be extracted from the grib files

        Returns
        -------
        set of int
        """
        return set(list(self._var_hourly.keys()) + list(self._var_daily.keys()) + list(self._var_monthly.keys()))

    def get_levels(self, frequency, variable):
        """
        Get the levels to extract for a given variable

        Parameters
        ----------
        frequency: Frequency
        variable: int
            Code of the variable, as used in the ATMOS_*_VARS options

        Returns
        -------
        str or None
            Comma separated levels, None if no levels were configured for the variable

        Raises
        ------
        ValueError
            If the frequency passed is not supported
        KeyError
            If the variable was not requested for the given frequency
        """
        return self.get_variables(frequency)[variable]


class THREDDSConfig(object):
    """
    Configuration related to the THREDDS server

    Parameters
    ----------
    parser: ConfigParser
    """

    def __init__(self, parser):
        self.server_url = parser.get_option('THREDDS', 'SERVER_URL', '')


class ExperimentConfig(object):
    """Configuration related to the experiment"""

    def __init__(self):
        self.chunk_list = None

    def parse_ini(self, parser):
        """
        Parse experiment section from INI-like file

        Parameters
        ----------
        parser: ConfigParser
        """
        self.institute = parser.get_option('EXPERIMENT', 'INSTITUTE')
        self.expid = parser.get_option('EXPERIMENT', 'EXPID')
        self.experiment_name = parser.get_option('EXPERIMENT', 'NAME', self.expid)

        self.members = parser.get_list_option('EXPERIMENT', 'MEMBERS')
        self.member_digits = parser.get_int_option('EXPERIMENT', 'MEMBER_DIGITS', 1)
        self.member_prefix = parser.get_option('EXPERIMENT', 'MEMBER_PREFIX', 'fc')
        self.member_count_start = parser.get_int_option('EXPERIMENT', 'MEMBER_COUNT_START', 0)
        self._parse_members()

        # The calendar must be known before the startdates are expanded
        self.calendar = parser.get_option('EXPERIMENT', 'CALENDAR', 'standard')
        self._parse_startdates(parser)

        self.chunk_size = parser.get_int_option('EXPERIMENT', 'CHUNK_SIZE')
        self.num_chunks = parser.get_int_option('EXPERIMENT', 'CHUNKS')
        self.chunk_list = parser.get_int_list_option('EXPERIMENT', 'CHUNK_LIST', [])

        self.model = parser.get_option('EXPERIMENT', 'MODEL')
        self.model_version = parser.get_option('EXPERIMENT', 'MODEL_VERSION', '')
        self.atmos_grid = parser.get_option('EXPERIMENT', 'ATMOS_GRID', '')
        self.atmos_timestep = parser.get_int_option('EXPERIMENT', 'ATMOS_TIMESTEP', 6)
        self.ocean_timestep = parser.get_int_option('EXPERIMENT', 'OCEAN_TIMESTEP', 6)

    def _parse_startdates(self, parser):
        """
        Read the STARTDATES option and expand it into self.startdates

        Each entry is either '{start,end,interval}' (see _read_startdates) or a pattern
        that is expanded with exrex.generate

        Parameters
        ----------
        parser: ConfigParser
        """
        startdates = parser.get_list_option('EXPERIMENT', 'STARTDATES')

        import exrex
        self.startdates = []
        for startdate_pattern in startdates:
            startdate_pattern = startdate_pattern.strip()
            if not startdate_pattern:
                continue
            if startdate_pattern[0] == '{' and startdate_pattern[-1] == '}':
                self._read_startdates(startdate_pattern[1:-1])
            else:
                for startdate in exrex.generate(startdate_pattern):
                    self.startdates.append(startdate.strip())

    def _parse_members(self):
        """
        Convert self.members from strings to member numbers

        Each entry can be a single member or a 'start-end' range (both ends included).
        The member prefix is removed when present.
        """
        members = []
        for mem in self.members:
            if '-' in mem:
                start, end = mem.split('-')
                if start.startswith(self.member_prefix):
                    start = start[len(self.member_prefix):]
                if end.startswith(self.member_prefix):
                    end = end[len(self.member_prefix):]
                members.extend(range(int(start), int(end) + 1))
            else:
                if mem.startswith(self.member_prefix):
                    mem = mem[len(self.member_prefix):]
                members.append(int(mem))
        self.members = members

    def _read_startdates(self, pattern):
        """
        Append to self.startdates all the dates defined by a 'start,end,interval' pattern

        Parameters
        ----------
        pattern: str
            Comma separated start date, end date and interval. The interval is an optional
            integer factor followed by one letter: Y (years), M (months), W (weeks) or D (days)

        Raises
        ------
        ConfigException
            If the interval factor is not positive or the interval letter is not supported
        """
        fields = pattern.split(',')
        start = parse_date(fields[0].strip())
        end = parse_date(fields[1].strip())
        interval = fields[2].strip()

        if len(interval) == 1:
            factor = 1
        else:
            factor = int(interval[0:-1])
        # Interval letter is case insensitive, with or without a factor
        interval = interval[-1].upper()

        # A zero or negative factor would never move start past end
        if factor < 1:
            raise ConfigException('Interval factor {0} must be positive in STARTDATES definition: {1}'.format(factor,
                                                                                                              pattern))

        while start <= end:
            self.startdates.append(date2str(start))
            if interval == 'Y':
                start = add_years(start, factor)
            elif interval == 'M':
                start = add_months(start, factor, cal=self.calendar)
            elif interval == 'W':
                start = add_days(start, factor * 7, cal=self.calendar)
            elif interval == 'D':
                start = add_days(start, factor, cal=self.calendar)
            else:
                raise ConfigException('Interval {0} not supported in STARTDATES definition: {1}'.format(interval,
                                                                                                        pattern))

    def get_chunk_list(self):
        """
        Return a list with all the chunks

        If no explicit chunk list is configured, chunks from 1 to num_chunks are used

        :return: List containing tuples of startdate, member and chunk
        :rtype: list[tuple[str, int, int]]
        """
        chunk_list = list()
        for startdate in self.startdates:
            for member in self.members:
                # chunk_list is None until the configuration is parsed: treat it as empty
                chunks = self.chunk_list if self.chunk_list else range(1, self.num_chunks + 1)
                for chunk in chunks:
                    chunk_list.append((startdate, member, chunk))
        return chunk_list

    def get_member_list(self):
        """
        Return a list with all the members

        :return: List containing tuples of startdate and member
        :rtype: list[tuple[str, int]]
        """
        member_list = list()
        for startdate in self.startdates:
            for member in self.members:
                member_list.append((startdate, member))
        return member_list

    def get_year_chunks(self, startdate, year):
        """
        Get the list of chunks containing timesteps from the given year

        :param startdate: startdate to use
        :type startdate: str
        :param year: reference year
        :type year: int
        :return: list of chunks containing data from the given year
        :rtype: list[int]
        """
        date = parse_date(startdate)
        chunks = list()
        for chunk in range(1, self.num_chunks + 1):
            chunk_start = self.get_chunk_start(date, chunk)
            # Chunks are visited in order, so nothing after this one can touch the year
            if chunk_start.year > year:
                break
            elif chunk_start.year == year or chunk_end_date(chunk_start, self.chunk_size, 'month',
                                                            self.calendar).year == year:
                chunks.append(chunk)
        return chunks

    def get_chunk_start(self, startdate, chunk):
        """
        Get chunk's first day

        Parameters
        ----------
        startdate: str or datetime.datetime
        chunk: int

        Returns
        -------
        datetime.datetime

        See Also
        --------
        get_chunk_start_str
        """
        # noinspection PyTypeChecker
        if isinstance(startdate, six.string_types):
            startdate = parse_date(startdate)
        return chunk_start_date(startdate, chunk, self.chunk_size, 'month', self.calendar)

    def get_chunk_start_str(self, startdate, chunk):
        """
        Get chunk's first day string representation

        Parameters
        ----------
        startdate: str or datetime.datetime
        chunk: int

        Returns
        -------
        str

        See Also
        --------
        get_chunk_start
        """
        return date2str(self.get_chunk_start(startdate, chunk))

    def get_chunk_end(self, startdate, chunk):
        """
        Get chunk's last day

        Parameters
        ----------
        startdate: str or datetime.datetime
        chunk: int

        Returns
        -------
        datetime.datetime

        See Also
        --------
        get_chunk_end_str
        """
        return chunk_end_date(self.get_chunk_start(startdate, chunk), self.chunk_size, 'month', self.calendar)

    def get_chunk_end_str(self, startdate, chunk):
        """
        Get chunk's last day as a string

        Parameters
        ----------
        startdate: str or datetime.datetime
        chunk: int

        Returns
        -------
        str

        See Also
        --------
        get_chunk_end
        """
        return date2str(self.get_chunk_end(startdate, chunk))

    def get_full_years(self, startdate):
        """
        Return the list of full years that are in the given startdate

        :param startdate: startdate to use
        :type startdate: str
        :return: list of full years
        :rtype: list[int]
        """
        chunks_per_year = 12 // self.chunk_size
        date = parse_date(startdate)
        first_january = 0
        first_year = date.year
        if date.month != 1:
            # Startdate is not in January: the first year is partial, so it is skipped and
            # first_january counts the chunk_size steps taken while month + chunk_size < 12
            month = date.month
            first_year += 1
            while month + self.chunk_size < 12:
                month += self.chunk_size
                first_january += 1

        years = list()
        for _ in range(first_january, self.num_chunks, chunks_per_year):
            years.append(first_year)
            first_year += 1
        return years

    def get_member_str(self, member):
        """
        Return the member name for a given member number.

        :param member: member's number
        :type member: int
        :return: member's name
        :rtype: str
        """
        return '{0}{1}'.format(self.member_prefix, str(member).zfill(self.member_digits))


class ReportConfig(object):
    """
    Configuration for the reporting feature

    Parameters
    ----------
    parser: ConfigParser
    """

    def __init__(self, parser):
        self.maximum_priority = parser.get_int_option('REPORT', 'MAXIMUM_PRIORITY', 10)
        self.path = parser.get_path_option('REPORT', 'PATH', '')