# coding=utf-8
import os

import six
from bscearth.utils.config_parser import ConfigParser
from bscearth.utils.date import parse_date, chunk_start_date, chunk_end_date, date2str
from bscearth.utils.log import Log

from earthdiagnostics.frequency import Frequency, Frequencies
from earthdiagnostics.variable import VariableManager
# NOTE(review): implicit relative import; it only resolves on Python 3 if 'modelingrealm' is
# importable as a top-level module. Confirm the package layout before changing it.
from modelingrealm import ModelingRealm


class ConfigException(Exception):
    """Raised when the configuration file contains an invalid combination of values"""
    pass


class Config(object):
    """
    Class to read and manage the configuration

    :param path: path to the conf file
    :type path: str
    """

    def __init__(self, path):
        parser = ConfigParser()
        # Keep option names case sensitive
        parser.optionxform = str
        parser.read(path)

        # Read diags config
        self.data_adaptor = parser.get_choice_option('DIAGNOSTICS', 'DATA_ADAPTOR',
                                                     ('CMOR', 'THREDDS', 'OBSRECON'), 'CMOR')
        "Data adaptor to use (CMOR, THREDDS or OBSRECON)"
        self.scratch_dir = parser.get_path_option('DIAGNOSTICS', 'SCRATCH_DIR')
        "Scratch folder path"
        self.use_ramdisk = parser.get_bool_option('DIAGNOSTICS', 'USE_RAMDISK', False)
        "If True, the scratch dir is created as a ram disk"
        self.auto_clean = parser.get_bool_option('DIAGNOSTICS', 'AUTO_CLEAN', True)
        "If True, the scratch dir is removed after finishing"
        if not self.auto_clean and self.use_ramdisk:
            Log.warning('RAM disk scratch dir is always automatically cleaned.')
            self.auto_clean = True

        self.scratch_masks = parser.get_path_option('DIAGNOSTICS', 'SCRATCH_MASKS', '/scratch/Earth/ocean_masks')
        "Common scratch folder for masks"

        self.data_dir = parser.get_path_option('DIAGNOSTICS', 'DATA_DIR')
        "Root data folder path"
        self.data_type = parser.get_choice_option('DIAGNOSTICS', 'DATA_TYPE', ('exp', 'obs', 'recon'), 'exp')
        "Data type (experiment, observation or reconstruction)"
        self.con_files = parser.get_path_option('DIAGNOSTICS', 'CON_FILES')
        "Mask and meshes folder path"
        self.mesh_mask = parser.get_path_option('DIAGNOSTICS', 'MESH_MASK', '')
        "Custom mesh mask file to use"
        self.new_mask_glo = parser.get_path_option('DIAGNOSTICS', 'NEW_MASK_GLO', '')
        "Custom new mask glo file to use"
        self.mask_regions = parser.get_path_option('DIAGNOSTICS', 'MASK_REGIONS', '')
        "Custom mask regions file to use"
        self.mask_regions_3d = parser.get_path_option('DIAGNOSTICS', 'MASK_REGIONS_3D', '')
        "Custom mask regions 3D file to use"

        # Data convention name; it selects the variable tables loaded by the variable manager
        self.data_convention = parser.get_choice_option('DIAGNOSTICS', 'DATA_CONVENTION',
                                                        ('specs', 'primavera', 'cmip6', 'preface'), 'specs',
                                                        ignore_case=True)
        self.var_manager = VariableManager()
        self.var_manager.load_variables(self.data_convention)
        self._diags = parser.get_option('DIAGNOSTICS', 'DIAGS')
        self.frequency = Frequency(parser.get_option('DIAGNOSTICS', 'FREQUENCY'))
        "Default data frequency to be used by the diagnostics"

        self.cdftools_path = parser.get_path_option('DIAGNOSTICS', 'CDFTOOLS_PATH', '')
        "Path to CDFTOOLS executables"
        self.max_cores = parser.get_int_option('DIAGNOSTICS', 'MAX_CORES', 0)
        "Maximum number of cores to use"
        self.parallel_downloads = parser.get_int_option('DIAGNOSTICS', 'PARALLEL_DOWNLOADS', 1)
        "Maximum number of simultaneous downloads"
        self.parallel_uploads = parser.get_int_option('DIAGNOSTICS', 'PARALLEL_UPLOADS', 1)
        "Maximum number of simultaneous uploads"
        self.restore_meshes = parser.get_bool_option('DIAGNOSTICS', 'RESTORE_MESHES', False)
        "If True, forces the tool to copy all the mesh and mask files for the model, regardless of existence"

        # Read experiment config
        self.experiment = ExperimentConfig(parser)
        """
        Configuration related to the experiment

        :rtype: ExperimentConfig
        """

        # Read aliases: alias names are stored in lower case so the lookup is case insensitive
        self._aliases = dict()
        if parser.has_section('ALIAS'):
            for option in parser.options('ALIAS'):
                self._aliases[option.lower()] = parser.get_list_option('ALIAS', option)

        Log.debug('Preparing command list')
        self._real_commands = list()
        for command in self._diags.split():
            alias = command.lower()
            if alias in self._aliases:
                added_commands = self._aliases[alias]
                Log.info('Changing alias {0} for {1}', command, ' '.join(added_commands))
                self._real_commands.extend(added_commands)
            else:
                self._real_commands.append(command)
        Log.debug('Command list ready ')

        # The scratch dir is made specific to this experiment
        self.scratch_dir = os.path.join(self.scratch_dir, 'diags', self.experiment.expid)

        self.cmor = CMORConfig(parser, self.var_manager)
        self.thredds = THREDDSConfig(parser)
        self.report = ReportConfig(parser)

    def get_commands(self):
        """
        Returns the list of commands after replacing the alias

        :return: full list of commands
        :rtype: list(str)
        """
        return self._real_commands


class CMORConfig(object):
    """
    Options read from the CMOR section of the configuration file

    :param parser: parser for the config file
    :type parser: ConfigParser
    :param var_manager: variable manager used to look up the variables in VARIABLE_LIST
    :type var_manager: VariableManager
    """

    def __init__(self, parser, var_manager):
        self.force = parser.get_bool_option('CMOR', 'FORCE', False)
        self.force_untar = parser.get_bool_option('CMOR', 'FORCE_UNTAR', False)
        self.filter_files = parser.get_option('CMOR', 'FILTER_FILES', '')
        self.ocean = parser.get_bool_option('CMOR', 'OCEAN_FILES', True)
        self.atmosphere = parser.get_bool_option('CMOR', 'ATMOSPHERE_FILES', True)
        self.use_grib = parser.get_bool_option('CMOR', 'USE_GRIB', True)
        self._chunks = parser.get_int_list_option('CMOR', 'CHUNKS')
        self.associated_experiment = parser.get_option('CMOR', 'ASSOCIATED_EXPERIMENT', 'to be filled')
        self.associated_model = parser.get_option('CMOR', 'ASSOCIATED_MODEL', 'to be filled')
        self.initialization_description = parser.get_option('CMOR', 'INITIALIZATION_DESCRIPTION', 'to be filled')
        self.initialization_method = parser.get_option('CMOR', 'INITIALIZATION_METHOD', '1')
        self.initialization_number = parser.get_int_option('CMOR', 'INITIALIZATION_NUMBER', 1)
        self.physics_description = parser.get_option('CMOR', 'PHYSICS_DESCRIPTION', 'to be filled')
        self.physics_version = parser.get_option('CMOR', 'PHYSICS_VERSION', '1')
        self.source = parser.get_option('CMOR', 'SOURCE', 'to be filled')
        self.version = parser.get_option('CMOR', 'VERSION', '')
        self.default_ocean_grid = parser.get_option('CMOR', 'DEFAULT_OCEAN_GRID', 'gn')
        self.default_atmos_grid = parser.get_option('CMOR', 'DEFAULT_ATMOS_GRID', 'gr')
        self.activity = parser.get_option('CMOR', 'ACTIVITY', 'CMIP')

        vars_string = parser.get_option('CMOR', 'VARIABLE_LIST', '')
        self.var_manager = var_manager
        if vars_string:
            self._variable_list = list()
            # split() without arguments collapses repeated whitespace, so extra spaces or line breaks
            # between entries do not produce empty tokens
            for domain_var in vars_string.split():
                # Everything after a '#' token is treated as a comment
                if domain_var.startswith('#'):
                    break
                splitted = domain_var.split(':')
                if len(splitted) < 2:
                    # Entries must look like domain:variable; skip malformed ones instead of crashing
                    Log.warning('Variable {0} is not in the format domain:variable. It will not be cmorized',
                                domain_var)
                    continue
                cmor_var = self.var_manager.get_variable(splitted[1], silent=True)
                if not cmor_var:
                    Log.warning('Variable {0} not recognized. It will not be cmorized', domain_var)
                    continue
                if ModelingRealm(splitted[0]) != cmor_var.domain:
                    Log.warning('Domain {0} for variable {1} is not correct: is {2}', splitted[0],
                                cmor_var.short_name, cmor_var.domain)
                    continue
                self._variable_list.append('{0.domain}:{0.short_name}'.format(cmor_var))
            if len(self._variable_list) == 0:
                raise ConfigException('Variable list value is specified, but no variables were found')
        else:
            # No list given: every variable is cmorized
            self._variable_list = None

        self._var_hourly = CMORConfig._parse_variables(parser.get_option('CMOR', 'ATMOS_HOURLY_VARS', ''))
        self._var_daily = CMORConfig._parse_variables(parser.get_option('CMOR', 'ATMOS_DAILY_VARS', ''))
        self._var_monthly = CMORConfig._parse_variables(parser.get_option('CMOR', 'ATMOS_MONTHLY_VARS', ''))

    def cmorize(self, var_cmor):
        """
        Checks if var_cmor is on variable list

        :param var_cmor: CMOR variable object
        :type var_cmor: Variable
        :return: True if no variable list was configured or if the variable is on it, False otherwise
        :rtype: bool
        """
        if self._variable_list is None:
            return True
        if not var_cmor:
            return False
        return '{0}:{1}'.format(var_cmor.domain, var_cmor.short_name) in self._variable_list

    def any_required(self, variables):
        """
        Checks if at least one of the given variables has to be cmorized

        :param variables: names of the variables to check
        :type variables: iterable(str)
        :return: True if no variable list was configured or if any of the variables is on it
        :rtype: bool
        """
        if self._variable_list is None:
            return True
        return any(self.cmorize(self.var_manager.get_variable(var, silent=True)) for var in variables)

    def chunk_cmorization_requested(self, chunk):
        """
        Checks if the given chunk has to be cmorized

        :param chunk: chunk number
        :type chunk: int
        :return: True if the CHUNKS option is empty or contains the chunk
        :rtype: bool
        """
        if len(self._chunks) == 0:
            return True
        return chunk in self._chunks

    @staticmethod
    def _parse_variables(raw_string):
        """
        Parses a comma separated list of 'code[:levels]' entries

        :param raw_string: raw option value
        :type raw_string: str
        :return: dictionary mapping each integer code to its levels as a comma separated string, or to None
                 if the entry has no levels
        :rtype: dict
        """
        variables = dict()
        if raw_string:
            for var_section in raw_string.split(','):
                splitted_var = var_section.split(':')
                if len(splitted_var) == 1:
                    levels = None
                else:
                    levels = ','.join(map(str, CMORConfig._parse_levels(splitted_var[1:])))
                variables[int(splitted_var[0])] = levels
        return variables

    @staticmethod
    def _parse_levels(levels_splitted):
        """
        Parses the levels part of a variable entry

        :param levels_splitted: a single element with the levels separated by '-', or the start, end and
                                optional step of a range (the end is excluded, as in range())
        :type levels_splitted: list(str)
        :return: levels
        :rtype: list(int)
        """
        if len(levels_splitted) == 1:
            # list() so the result is a real list on Python 3 too, where map() is lazy
            return list(map(int, levels_splitted[0].split('-')))
        start = int(levels_splitted[0])
        end = int(levels_splitted[1])
        if len(levels_splitted) == 3:
            step = int(levels_splitted[2])
        else:
            step = 1
        return list(range(start, end, step))

    def get_variables(self, frequency):
        """
        Returns the variables configured for the given frequency

        :param frequency: three hourly, six hourly, daily or monthly frequency
        :type frequency: Frequency
        :return: dictionary mapping variable codes to levels
        :rtype: dict
        :raises ValueError: if the frequency is not one of the supported ones
        """
        if frequency in (Frequencies.three_hourly, Frequencies.six_hourly):
            return self._var_hourly
        elif frequency == Frequencies.daily:
            return self._var_daily
        elif frequency == Frequencies.monthly:
            return self._var_monthly
        raise ValueError('Frequency not recognized: {0}'.format(frequency))

    def get_levels(self, frequency, variable):
        """
        Returns the levels configured for a variable at the given frequency

        :param frequency: frequency, as accepted by get_variables
        :type frequency: Frequency
        :param variable: variable code
        :type variable: int
        :return: comma separated levels, or None if the variable has no levels configured
        :rtype: str | None
        :raises KeyError: if the variable is not configured for that frequency
        """
        return self.get_variables(frequency)[variable]


class THREDDSConfig(object):
    """
    Options read from the THREDDS section of the configuration file

    :param parser: parser for the config file
    :type parser: ConfigParser
    """

    def __init__(self, parser):
        self.server_url = parser.get_option('THREDDS', 'SERVER_URL', '')


class ExperimentConfig(object):
    """
    Encapsulates all chunk related tasks

    :param parser: parser for the config file
    :type parser: Parser
    """

    def __init__(self, parser):
        self.institute = parser.get_option('EXPERIMENT', 'INSTITUTE')
        self.expid = parser.get_option('EXPERIMENT', 'EXPID')
        self.experiment_name = parser.get_option('EXPERIMENT', 'NAME', self.expid)

        self.member_digits = parser.get_int_option('EXPERIMENT', 'MEMBER_DIGITS', 1)
        self.member_prefix = parser.get_option('EXPERIMENT', 'MEMBER_PREFIX', 'fc')
        self.member_count_start = parser.get_int_option('EXPERIMENT', 'MEMBER_COUNT_START', 0)
        # Members are stored as integers; entries may be single members or 'start-end' ranges
        self.members = self._parse_members(parser.get_list_option('EXPERIMENT', 'MEMBERS'))

        startdates = parser.get_list_option('EXPERIMENT', 'STARTDATES')

        import exrex
        # Each configured startdate is a regular expression that is expanded to all its matches
        self.startdates = []
        for startdate_pattern in startdates:
            self.startdates.extend(exrex.generate(startdate_pattern))

        self.chunk_size = parser.get_int_option('EXPERIMENT', 'CHUNK_SIZE')
        self.num_chunks = parser.get_int_option('EXPERIMENT', 'CHUNKS')
        self.chunk_list = parser.get_int_list_option('EXPERIMENT', 'CHUNK_LIST', [])
        self.calendar = parser.get_option('EXPERIMENT', 'CALENDAR', 'standard')
        self.model = parser.get_option('EXPERIMENT', 'MODEL')
        self.model_version = parser.get_option('EXPERIMENT', 'MODEL_VERSION', '')
        self.atmos_grid = parser.get_option('EXPERIMENT', 'ATMOS_GRID', '')
        self.atmos_timestep = parser.get_int_option('EXPERIMENT', 'ATMOS_TIMESTEP', 6)
        self.ocean_timestep = parser.get_int_option('EXPERIMENT', 'OCEAN_TIMESTEP', 6)

    def _strip_member_prefix(self, member):
        """Removes the member prefix from the beginning of a member name, if present"""
        if member.startswith(self.member_prefix):
            return member[len(self.member_prefix):]
        return member

    def _parse_members(self, raw_members):
        """Converts the configured member names and 'start-end' ranges (end included) to a list of integers"""
        members = []
        for mem in raw_members:
            if '-' in mem:
                start, end = mem.split('-')
                start = self._strip_member_prefix(start)
                end = self._strip_member_prefix(end)
                members.extend(range(int(start), int(end) + 1))
            else:
                members.append(int(self._strip_member_prefix(mem)))
        return members

    def get_chunk_list(self):
        """
        Return a list with all the chunks

        :return: List containing tuples of startdate, member and chunk
        :rtype: list[tuple[str, int, int]]
        """
        # An explicit CHUNK_LIST takes precedence over the 1..num_chunks default
        if len(self.chunk_list) == 0:
            chunks = range(1, self.num_chunks + 1)
        else:
            chunks = self.chunk_list

        chunk_list = list()
        for startdate in self.startdates:
            for member in self.members:
                for chunk in chunks:
                    chunk_list.append((startdate, member, chunk))
        return chunk_list

    def get_member_list(self):
        """
        Return a list with all the members

        :return: List containing tuples of startdate and member
        :rtype: list[tuple[str, int]]
        """
        member_list = list()
        for startdate in self.startdates:
            for member in self.members:
                member_list.append((startdate, member))
        return member_list

    def get_year_chunks(self, startdate, year):
        """
        Get the list of chunks containing timesteps from the given year

        :param startdate: startdate to use
        :type startdate: str
        :param year: reference year
        :type year: int
        :return: list of chunks containing data from the given year
        :rtype: list[int]
        """
        date = parse_date(startdate)
        chunks = list()
        for chunk in range(1, self.num_chunks + 1):
            chunk_start = self.get_chunk_start(date, chunk)
            # Chunks are visited in order, so once one starts after the year no later one can match
            if chunk_start.year > year:
                break
            elif chunk_start.year == year or chunk_end_date(chunk_start, self.chunk_size, 'month',
                                                            self.calendar).year == year:
                chunks.append(chunk)

        return chunks

    def get_chunk_start(self, startdate, chunk):
        """
        Returns the start date of a chunk

        :param startdate: startdate to use, as a string or as an already parsed date
        :param chunk: chunk number
        :type chunk: int
        :return: value returned by chunk_start_date for this chunk, using a chunk size in months
        """
        # noinspection PyTypeChecker
        if isinstance(startdate, six.string_types):
            startdate = parse_date(startdate)
        return chunk_start_date(startdate, chunk, self.chunk_size, 'month', self.calendar)

    def get_chunk_start_str(self, startdate, chunk):
        """
        Returns the start date of a chunk formatted with date2str

        :param startdate: startdate to use, as a string or as an already parsed date
        :param chunk: chunk number
        :type chunk: int
        :rtype: str
        """
        return date2str(self.get_chunk_start(startdate, chunk))

    def get_chunk_end(self, startdate, chunk):
        """
        Returns the end date of a chunk

        :param startdate: startdate to use, as a string or as an already parsed date
        :param chunk: chunk number
        :type chunk: int
        :return: value returned by chunk_end_date for this chunk, using a chunk size in months
        """
        return chunk_end_date(self.get_chunk_start(startdate, chunk), self.chunk_size, 'month', self.calendar)

    def get_chunk_end_str(self, startdate, chunk):
        """
        Returns the end date of a chunk formatted with date2str

        :param startdate: startdate to use, as a string or as an already parsed date
        :param chunk: chunk number
        :type chunk: int
        :rtype: str
        """
        return date2str(self.get_chunk_end(startdate, chunk))

    def get_full_years(self, startdate):
        """
        Returns the list of full years that are in the given startdate

        :param startdate: startdate to use
        :type startdate: str
        :return: list of full years
        :rtype: list[int]
        """
        # Floor division keeps this an int on Python 3 as well (range() rejects floats)
        chunks_per_year = 12 // self.chunk_size
        date = parse_date(startdate)
        first_january = 0
        first_year = date.year
        if date.month != 1:
            # The startdate year is incomplete: count the chunks until the following year
            month = date.month
            first_year += 1
            while month + self.chunk_size < 12:
                month += self.chunk_size
                first_january += 1

        years = list()
        # NOTE(review): stop and step look swapped here; range(first_january, self.num_chunks,
        # chunks_per_year) would yield one entry per simulated year. Left as in the original
        # until the intended handling of incomplete years is confirmed.
        for _ in range(first_january, chunks_per_year, self.num_chunks):
            years.append(first_year)
            first_year += 1
        return years

    def get_member_str(self, member):
        """
        Returns the member name for a given member number.

        :param member: member's number
        :type member: int
        :return: member's name
        :rtype: str
        """
        return '{0}{1}'.format(self.member_prefix, str(member).zfill(self.member_digits))


class ReportConfig(object):
    """
    Options read from the REPORT section of the configuration file

    :param parser: parser for the config file
    :type parser: ConfigParser
    """

    def __init__(self, parser):
        self.maximum_priority = parser.get_int_option('REPORT', 'MAXIMUM_PRIORITY', 10)
        self.path = parser.get_path_option('REPORT', 'PATH', '')