# coding=utf-8 import os from bscearth.utils.log import Log from bscearth.utils.date import parse_date, chunk_start_date, chunk_end_date, date2str from bscearth.utils.config_parser import ConfigParser from earthdiagnostics.frequency import Frequency, Frequencies from earthdiagnostics.variable import VariableManager from modelingrealm import ModelingRealm class Config(object): """ Class to read and manage the configuration :param path: path to the conf file :type path: str """ def __init__(self, path): parser = ConfigParser() parser.optionxform = str parser.read(path) # Read diags config self.data_adaptor = parser.get_choice_option('DIAGNOSTICS', 'DATA_ADAPTOR', ('CMOR', 'THREDDS'), 'CMOR') "Scratch folder path" self.scratch_dir = parser.get_path_option('DIAGNOSTICS', 'SCRATCH_DIR') "Scratch folder path" self.scratch_masks = parser.get_path_option('DIAGNOSTICS', 'SCRATCH_MASKS', '/scratch/Earth/ocean_masks') "Common scratch folder for masks" self.data_dir = parser.get_path_option('DIAGNOSTICS', 'DATA_DIR') "Root data folder path" self.data_type = parser.get_choice_option('DIAGNOSTICS', 'DATA_TYPE', ('exp', 'obs', 'recon'), 'exp') "Data type (experiment, observation or reconstruction)" self.con_files = parser.get_path_option('DIAGNOSTICS', 'CON_FILES') "Mask and meshes folder path" self.data_convention = parser.get_choice_option('DIAGNOSTICS', 'DATA_CONVENTION', ('specs', 'primavera', 'cmip6'), 'specs', ignore_case=True) VariableManager().load_variables(self.data_convention) self._diags = parser.get_option('DIAGNOSTICS', 'DIAGS') self.frequency = Frequency(parser.get_option('DIAGNOSTICS', 'FREQUENCY')) "Default data frequency to be used by the diagnostics" self.cdftools_path = parser.get_path_option('DIAGNOSTICS', 'CDFTOOLS_PATH', '') "Path to CDFTOOLS executables" self.max_cores = parser.get_int_option('DIAGNOSTICS', 'MAX_CORES', 0) "Maximum number of cores to use" self.restore_meshes = parser.get_bool_option('DIAGNOSTICS', 'RESTORE_MESHES', False) "If True, forces the tool to copy all the mesh and mask files for the model, regardless of existence" # Read experiment config self.experiment = ExperimentConfig(parser) """ Configuration related to the experiment :rtype: ExperimentConfig """ # Read aliases self._aliases = dict() if parser.has_section('ALIAS'): for option in parser.options('ALIAS'): self._aliases[option.lower()] = parser.get_list_option('ALIAS', option) Log.debug('Preparing command list') commands = self._diags.split() self._real_commands = list() for command in commands: if command.lower() in self._aliases: added_commands = self._aliases[command.lower()] Log.info('Changing alias {0} for {1}', command, ' '.join(added_commands)) for add_command in added_commands: self._real_commands.append(add_command) else: self._real_commands.append(command) Log.debug('Command list ready ') self.scratch_dir = os.path.join(self.scratch_dir, 'diags', self.experiment.expid) self.cmor = CMORConfig(parser) self.thredds = THREDDSConfig(parser) def get_commands(self): """ Returns the list of commands after replacing the alias :return: full list of commands :rtype: list(str) """ return self._real_commands class CMORConfig(object): def __init__(self, parser): self.force = parser.get_bool_option('CMOR', 'FORCE', False) self.force_untar = parser.get_bool_option('CMOR', 'FORCE_UNTAR', False) self.filter_files = parser.get_option('CMOR', 'FILTER_FILES', '') self.ocean = parser.get_bool_option('CMOR', 'OCEAN_FILES', True) self.atmosphere = parser.get_bool_option('CMOR', 'ATMOSPHERE_FILES', True) self.use_grib = parser.get_bool_option('CMOR', 'USE_GRIB', True) self._chunks = parser.get_int_list_option('CMOR', 'CHUNKS') self.associated_experiment = parser.get_option('CMOR', 'ASSOCIATED_EXPERIMENT', 'to be filled') self.associated_model = parser.get_option('CMOR', 'ASSOCIATED_MODEL', 'to be filled') self.initialization_description = parser.get_option('CMOR', 'INITIALIZATION_DESCRIPTION', 'to be filled') self.initialization_method = parser.get_option('CMOR', 'INITIALIZATION_METHOD', '1') self.physics_description = parser.get_option('CMOR', 'PHYSICS_DESCRIPTION', 'to be filled') self.physics_version = parser.get_option('CMOR', 'PHYSICS_VERSION', '1') self.source = parser.get_option('CMOR', 'SOURCE', 'to be filled') vars_string = parser.get_option('CMOR', 'VARIABLE_LIST', '') var_manager = VariableManager() if vars_string: self._variable_list = list() for domain_var in vars_string.split(' '): if domain_var.startswith('#'): break splitted = domain_var.split(':') cmor_var = var_manager.get_variable(splitted[1], silent=True) if not cmor_var: continue if ModelingRealm(splitted[0]) != cmor_var.domain: Log.warning('Domain {0} for variable {1} is not correct: is {2}', splitted[0], cmor_var.short_name, cmor_var.domain) return self._variable_list.append('{0.domain}:{0.short_name}'.format(cmor_var)) else: self._variable_list = None self._var_hourly = CMORConfig._parse_variables(parser.get_option('CMOR', 'ATMOS_HOURLY_VARS', '')) self._var_daily = CMORConfig._parse_variables(parser.get_option('CMOR', 'ATMOS_DAILY_VARS', '')) self._var_monthly = CMORConfig._parse_variables(parser.get_option('CMOR', 'ATMOS_MONTHLY_VARS', '')) def cmorize(self, var_cmor): """ Checks if var_cmor is on variable list :param var_cmor: CMOR variable object :rtype var_cmor: Variablle :return: """ if self._variable_list is None: return True if not var_cmor: return False return '{0}:{1}'.format(var_cmor.domain, var_cmor.short_name) in self._variable_list def any_required(self, variables): if self._variable_list is None: return True for var in variables: if self.cmorize(VariableManager().get_variable(var, silent=True)): return True return False def chunk_cmorization_requested(self, chunk): if len(self._chunks) == 0: return True return chunk in self._chunks @staticmethod def _parse_variables(raw_string): variables = dict() if raw_string: splitted = raw_string.split(',') for var_section in splitted: splitted_var = var_section.split(':') if len(splitted_var) == 1: levels = None else: levels = ','.join(map(str, CMORConfig._parse_levels(splitted_var[1:]))) variables[int(splitted_var[0])] = levels return variables @staticmethod def _parse_levels(levels_splitted): if len(levels_splitted) == 1: return map(int, levels_splitted[0].split('-')) start = int(levels_splitted[0]) end = int(levels_splitted[1]) if len(levels_splitted) == 3: step = int(levels_splitted[2]) else: step = 1 return range(start, end, step) def get_variables(self, frequency): if frequency in (Frequencies.three_hourly, Frequencies.six_hourly): return self._var_hourly elif frequency == Frequencies.daily: return self._var_daily elif frequency == Frequencies.monthly: return self._var_monthly raise Exception('Frequency not recognized: {0}'.format(frequency)) def get_levels(self, frequency, variable): return self.get_variables(frequency)[variable] class THREDDSConfig(object): def __init__(self, parser): self.server_url = parser.get_option('THREDDS', 'SERVER_URL', '') class ExperimentConfig(object): """ Encapsulates all chunk related tasks :param parser: parser for the config file :type parser: Parser """ def __init__(self, parser): self.institute = parser.get_option('EXPERIMENT', 'INSTITUTE') self.expid = parser.get_option('EXPERIMENT', 'EXPID') self.experiment_name = parser.get_option('EXPERIMENT', 'NAME', self.expid) self.members = parser.get_list_option('EXPERIMENT', 'MEMBERS') self.member_digits = parser.get_int_option('EXPERIMENT', 'MEMBER_DIGITS', 1) self.member_prefix = parser.get_option('EXPERIMENT', 'MEMBER_PREFIX', 'fc') self.member_count_start = parser.get_int_option('EXPERIMENT', 'MEMBER_COUNT_START', 0) self.members = [int(mem) if mem.startswith(self.member_prefix) else int(mem) for mem in self.members] self.startdates = parser.get_option('EXPERIMENT', 'STARTDATES').split() self.chunk_size = parser.get_int_option('EXPERIMENT', 'CHUNK_SIZE') self.num_chunks = parser.get_int_option('EXPERIMENT', 'CHUNKS') self.calendar = parser.get_option('EXPERIMENT', 'CALENDAR', 'standard') self.model = parser.get_option('EXPERIMENT', 'MODEL') self.model_version = parser.get_option('EXPERIMENT', 'MODEL_VERSION', '') self.atmos_grid = parser.get_option('EXPERIMENT', 'ATMOS_GRID', '') self.atmos_timestep = parser.get_int_option('EXPERIMENT', 'ATMOS_TIMESTEP', 6) self.ocean_timestep = parser.get_int_option('EXPERIMENT', 'OCEAN_TIMESTEP', 6) def get_chunk_list(self): """ Return a list with all the chunks :return: List containing tuples of startdate, member and chunk :rtype: tuple[str, int, int] """ chunk_list = list() for startdate in self.startdates: for member in self.members: for chunk in range(1, self.num_chunks + 1): chunk_list.append((startdate, member, chunk)) return chunk_list def get_member_list(self): """ Return a list with all the members :return: List containing tuples of startdate and member :rtype: tuple[str, int, int] """ member_list = list() for startdate in self.startdates: for member in self.members: member_list.append((startdate, member)) return member_list def get_year_chunks(self, startdate, year): """ Get the list of chunks containing timesteps from the given year :param startdate: startdate to use :type startdate: str :param year: reference year :type year: int :return: list of chunks containing data from the given year :rtype: list[int] """ date = parse_date(startdate) chunks = list() for chunk in range(1, self.num_chunks + 1): chunk_start = self.get_chunk_start(date, chunk) if chunk_start.year > year: break elif chunk_start.year == year or chunk_end_date(chunk_start, self.chunk_size, 'month', self.calendar).year == year: chunks.append(chunk) return chunks def get_chunk_start(self, startdate, chunk): if isinstance(startdate, basestring): startdate = parse_date(startdate) return chunk_start_date(startdate, chunk, self.chunk_size, 'month', self.calendar) def get_chunk_start_str(self, startdate, chunk): return date2str(self.get_chunk_start(startdate, chunk)) def get_chunk_end(self, startdate, chunk): return chunk_end_date(self.get_chunk_start(startdate, chunk), self.chunk_size, 'month', self.calendar) def get_chunk_end_str(self, startdate, chunk): return date2str(self.get_chunk_end(startdate, chunk)) def get_full_years(self, startdate): """ Returns the list of full years that are in the given startdate :param startdate: startdate to use :type startdate: str :return: list of full years :rtype: list[int] """ chunks_per_year = 12 / self.chunk_size date = parse_date(startdate) first_january = 0 first_year = date.year if date.month != 1: month = date.month first_year += 1 while month + self.chunk_size < 12: month += self.chunk_size first_january += 1 years = list() for chunk in range(first_january, self.num_chunks - chunks_per_year, chunks_per_year): years.append(first_year) first_year += 1 return years def get_member_str(self, member): """ Returns the member name for a given member number. :param member: member's number :type member: int :return: member's name :rtype: str """ return '{0}{1}'.format(self.member_prefix, str(member).zfill(self.member_digits))