Newer
Older
Javier Vegas-Regidor
committed
# coding=utf-8
import os
from bscearth.utils.config_parser import ConfigParser
from bscearth.utils.date import parse_date, chunk_start_date, chunk_end_date, date2str
from bscearth.utils.log import Log
Javier Vegas-Regidor
committed
from earthdiagnostics.frequency import Frequency, Frequencies
from earthdiagnostics.variable import VariableManager
Javier Vegas-Regidor
committed
class ConfigException(Exception):
pass
class Config(object):
"""
Class to read and manage the configuration
:param path: path to the conf file
:type path: str
"""
Javier Vegas-Regidor
committed
def __init__(self, path):
Javier Vegas-Regidor
committed
parser.optionxform = str
parser.read(path)
# Read diags config
self.data_adaptor = parser.get_choice_option('DIAGNOSTICS', 'DATA_ADAPTOR', ('CMOR', 'THREDDS', 'OBSRECON'),
'CMOR')
self.scratch_dir = parser.get_path_option('DIAGNOSTICS', 'SCRATCH_DIR')
"Scratch folder path"
self.use_ramdisk = parser.get_bool_option('DIAGNOSTICS', 'USE_RAMDISK', False)
"If True, the scratch dir is created as a ram disk"
self.auto_clean = parser.get_bool_option('DIAGNOSTICS', 'AUTO_CLEAN', True)
"If True, the scratch dir is removed after finishing"
if not self.auto_clean and self.use_ramdisk:
Log.warning('RAM disk scratch dir is always automatically cleaned.')
self.auto_clean = True
self.scratch_masks = parser.get_path_option('DIAGNOSTICS', 'SCRATCH_MASKS', '/scratch/Earth/ocean_masks')
"Common scratch folder for masks"
self.data_dir = parser.get_path_option('DIAGNOSTICS', 'DATA_DIR')
"Root data folder path"
self.data_type = parser.get_choice_option('DIAGNOSTICS', 'DATA_TYPE', ('exp', 'obs', 'recon'), 'exp')
"Data type (experiment, observation or reconstruction)"
self.con_files = parser.get_path_option('DIAGNOSTICS', 'CON_FILES')
"Mask and meshes folder path"
self.mesh_mask = parser.get_path_option('DIAGNOSTICS', 'MESH_MASK', '')
"Custom mesh mask file to use"
self.new_mask_glo = parser.get_path_option('DIAGNOSTICS', 'NEW_MASK_GLO', '')
"Custom new mask glo file to use"
self.mask_regions = parser.get_path_option('DIAGNOSTICS', 'MASK_REGIONS', '')
"Custom mask regions file to use"
self.mask_regions_3d = parser.get_path_option('DIAGNOSTICS', 'MASK_REGIONS_3D', '')
"Custom mask regions 3D file to use"
self.data_convention = parser.get_choice_option('DIAGNOSTICS', 'DATA_CONVENTION',
('specs', 'primavera', 'cmip6', 'preface'), 'specs',
ignore_case=True)
self.var_manager = VariableManager()
self.var_manager.load_variables(self.data_convention)
self._diags = parser.get_option('DIAGNOSTICS', 'DIAGS')
self.skip_diags_done = parser.get_bool_option('DIAGNOSTICS', 'SKIP_DIAGS_DONE', True)
self.frequency = Frequency(parser.get_option('DIAGNOSTICS', 'FREQUENCY'))
"Default data frequency to be used by the diagnostics"
self.cdftools_path = parser.get_path_option('DIAGNOSTICS', 'CDFTOOLS_PATH', '')
"Path to CDFTOOLS executables"
self.max_cores = parser.get_int_option('DIAGNOSTICS', 'MAX_CORES', 0)
"Maximum number of cores to use"
self.parallel_downloads = parser.get_int_option('DIAGNOSTICS', 'PARALLEL_DOWNLOADS', 1)
"Maximum number of simultaneous downloads"
self.parallel_uploads = parser.get_int_option('DIAGNOSTICS', 'PARALLEL_UPLOADS', 1)
"Maximum number of simultaneous uploads"
Javier Vegas-Regidor
committed
self.restore_meshes = parser.get_bool_option('DIAGNOSTICS', 'RESTORE_MESHES', False)
"If True, forces the tool to copy all the mesh and mask files for the model, regardless of existence"
Javier Vegas-Regidor
committed
# Read experiment config
self.experiment = ExperimentConfig(parser)
"""
Configuration related to the experiment
:rtype: ExperimentConfig
"""
Javier Vegas-Regidor
committed
# Read aliases
self._aliases = dict()
if parser.has_section('ALIAS'):
for option in parser.options('ALIAS'):
self._aliases[option.lower()] = parser.get_list_option('ALIAS', option)
Javier Vegas-Regidor
committed
Log.debug('Preparing command list')
commands = self._diags.split()
Javier Vegas-Regidor
committed
self._real_commands = list()
for command in commands:
if command.lower() in self._aliases:
added_commands = self._aliases[command.lower()]
Javier Vegas-Regidor
committed
Log.info('Changing alias {0} for {1}', command, ' '.join(added_commands))
for add_command in added_commands:
self._real_commands.append(add_command)
else:
self._real_commands.append(command)
Log.debug('Command list ready ')
self.scratch_dir = os.path.join(self.scratch_dir, 'diags', self.experiment.expid)
self.cmor = CMORConfig(parser, self.var_manager)
self.thredds = THREDDSConfig(parser)
Javier Vegas-Regidor
committed
def get_commands(self):
"""
Returns the list of commands after replacing the alias
:return: full list of commands
:rtype: list(str)
"""
Javier Vegas-Regidor
committed
return self._real_commands
class CMORConfig(object):
def __init__(self, parser, var_manager):
Javier Vegas-Regidor
committed
self.force = parser.get_bool_option('CMOR', 'FORCE', False)
self.force_untar = parser.get_bool_option('CMOR', 'FORCE_UNTAR', False)
self.filter_files = parser.get_option('CMOR', 'FILTER_FILES', '')
Javier Vegas-Regidor
committed
self.ocean = parser.get_bool_option('CMOR', 'OCEAN_FILES', True)
self.atmosphere = parser.get_bool_option('CMOR', 'ATMOSPHERE_FILES', True)
Javier Vegas-Regidor
committed
self.use_grib = parser.get_bool_option('CMOR', 'USE_GRIB', True)
self._chunks = parser.get_int_list_option('CMOR', 'CHUNKS')
Javier Vegas-Regidor
committed
self.associated_experiment = parser.get_option('CMOR', 'ASSOCIATED_EXPERIMENT', 'to be filled')
self.associated_model = parser.get_option('CMOR', 'ASSOCIATED_MODEL', 'to be filled')
self.initialization_description = parser.get_option('CMOR', 'INITIALIZATION_DESCRIPTION', 'to be filled')
self.initialization_method = parser.get_option('CMOR', 'INITIALIZATION_METHOD', '1')
self.initialization_number = parser.get_int_option('CMOR', 'INITIALIZATION_NUMBER', 1)
Javier Vegas-Regidor
committed
self.physics_description = parser.get_option('CMOR', 'PHYSICS_DESCRIPTION', 'to be filled')
self.physics_version = parser.get_option('CMOR', 'PHYSICS_VERSION', '1')
self.source = parser.get_option('CMOR', 'SOURCE', 'to be filled')
self.version = parser.get_option('CMOR', 'VERSION', '')
self.default_ocean_grid = parser.get_option('CMOR', 'DEFAULT_OCEAN_GRID', 'gn')
self.default_atmos_grid = parser.get_option('CMOR', 'DEFAULT_ATMOS_GRID', 'gr')
self.activity = parser.get_option('CMOR', 'ACTIVITY', 'CMIP')
self.min_cmorized_vars = parser.get_int_option('CMOR', 'MIN_CMORIZED_VARS', 10)
Javier Vegas-Regidor
committed
vars_string = parser.get_option('CMOR', 'VARIABLE_LIST', '')
self.var_manager = var_manager
if vars_string:
self._variable_list = list()
for domain_var in vars_string.split(' '):
if domain_var.startswith('#'):
break
splitted = domain_var.split(':')
cmor_var = self.var_manager.get_variable(splitted[1], silent=True)
Log.warning('Variable {0} not recognized. It will not be cmorized', domain_var)
continue
if ModelingRealm(splitted[0]) != cmor_var.domain:
Log.warning('Domain {0} for variable {1} is not correct: is {2}', splitted[0], cmor_var.short_name,
cmor_var.domain)
self._variable_list.append('{0.domain}:{0.short_name}'.format(cmor_var))
if len(self._variable_list) == 0:
raise ConfigException('Variable list value is specified, but no variables were found')
else:
self._variable_list = None
self._var_hourly = CMORConfig._parse_variables(parser.get_option('CMOR', 'ATMOS_HOURLY_VARS', ''))
self._var_daily = CMORConfig._parse_variables(parser.get_option('CMOR', 'ATMOS_DAILY_VARS', ''))
self._var_monthly = CMORConfig._parse_variables(parser.get_option('CMOR', 'ATMOS_MONTHLY_VARS', ''))
Javier Vegas-Regidor
committed
def cmorize(self, var_cmor):
"""
Checks if var_cmor is on variable list
:param var_cmor: CMOR variable object
:rtype var_cmor: Variablle
:return:
"""
if self._variable_list is None:
return True
if not var_cmor:
return False
return '{0}:{1}'.format(var_cmor.domain, var_cmor.short_name) in self._variable_list
def any_required(self, variables):
if self._variable_list is None:
return True
for var in variables:
if self.cmorize(self.var_manager.get_variable(var, silent=True)):
return True
return False
def chunk_cmorization_requested(self, chunk):
if len(self._chunks) == 0:
return True
return chunk in self._chunks
@staticmethod
def _parse_variables(raw_string):
Javier Vegas-Regidor
committed
variables = dict()
if raw_string:
splitted = raw_string.split(',')
for var_section in splitted:
splitted_var = var_section.split(':')
if len(splitted_var) == 1:
levels = None
else:
levels = ','.join(map(str, CMORConfig._parse_levels(splitted_var[1:])))
variables[int(splitted_var[0])] = levels
return variables
@staticmethod
def _parse_levels(levels_splitted):
if len(levels_splitted) == 1:
return map(int, levels_splitted[0].split('-'))
start = int(levels_splitted[0])
end = int(levels_splitted[1])
if len(levels_splitted) == 3:
step = int(levels_splitted[2])
else:
step = 1
return range(start, end, step)
def get_variables(self, frequency):
if frequency in (Frequencies.three_hourly, Frequencies.six_hourly):
return self._var_hourly
elif frequency == Frequencies.daily:
return self._var_daily
elif frequency == Frequencies.monthly:
return self._var_monthly
raise ValueError('Frequency not recognized: {0}'.format(frequency))
def get_requested_codes(self):
return set(list(self._var_hourly.keys()) + list(self._var_daily.keys()) + list(self._var_monthly.keys()))
def get_levels(self, frequency, variable):
return self.get_variables(frequency)[variable]
Javier Vegas-Regidor
committed
class THREDDSConfig(object):
def __init__(self, parser):
self.server_url = parser.get_option('THREDDS', 'SERVER_URL', '')
Javier Vegas-Regidor
committed
class ExperimentConfig(object):
"""
Encapsulates all chunk related tasks
:param parser: parser for the config file
:type parser: Parser
"""
def __init__(self, parser):
self.institute = parser.get_option('EXPERIMENT', 'INSTITUTE')
self.expid = parser.get_option('EXPERIMENT', 'EXPID')
self.experiment_name = parser.get_option('EXPERIMENT', 'NAME', self.expid)
Javier Vegas-Regidor
committed
self.members = parser.get_list_option('EXPERIMENT', 'MEMBERS')
self.member_digits = parser.get_int_option('EXPERIMENT', 'MEMBER_DIGITS', 1)
Javier Vegas-Regidor
committed
self.member_prefix = parser.get_option('EXPERIMENT', 'MEMBER_PREFIX', 'fc')
self.member_count_start = parser.get_int_option('EXPERIMENT', 'MEMBER_COUNT_START', 0)
members = []
for mem in self.members:
if '-' in mem:
start, end = mem.split('-')
if start.startswith(self.member_prefix):
start = start[len(self.member_prefix):]
if end.startswith(self.member_prefix):
end = end[len(self.member_prefix):]
members.append(member)
else:
if mem.startswith(self.member_prefix):
mem = mem[len(self.member_prefix):]
members.append(int(mem))
self.members = members
startdates = parser.get_list_option('EXPERIMENT', 'STARTDATES')
import exrex
self.startdates = []
for startdate_pattern in startdates:
for startdate in exrex.generate(startdate_pattern):
self.startdates.append(startdate)
self.chunk_size = parser.get_int_option('EXPERIMENT', 'CHUNK_SIZE')
self.num_chunks = parser.get_int_option('EXPERIMENT', 'CHUNKS')
self.chunk_list = parser.get_int_list_option('EXPERIMENT', 'CHUNK_LIST', [])
self.calendar = parser.get_option('EXPERIMENT', 'CALENDAR', 'standard')
Javier Vegas-Regidor
committed
self.model = parser.get_option('EXPERIMENT', 'MODEL')
self.model_version = parser.get_option('EXPERIMENT', 'MODEL_VERSION', '')
self.atmos_grid = parser.get_option('EXPERIMENT', 'ATMOS_GRID', '')
self.atmos_timestep = parser.get_int_option('EXPERIMENT', 'ATMOS_TIMESTEP', 6)
self.ocean_timestep = parser.get_int_option('EXPERIMENT', 'OCEAN_TIMESTEP', 6)
Javier Vegas-Regidor
committed
def get_chunk_list(self):
"""
Return a list with all the chunks
:return: List containing tuples of startdate, member and chunk
:rtype: tuple[str, int, int]
"""
chunk_list = list()
for startdate in self.startdates:
for member in self.members:
if len(self.chunk_list) == 0:
for chunk in range(1, self.num_chunks + 1):
chunk_list.append((startdate, member, chunk))
else:
for chunk in self.chunk_list:
chunk_list.append((startdate, member, chunk))
Javier Vegas-Regidor
committed
return chunk_list
def get_member_list(self):
"""
Return a list with all the members
:return: List containing tuples of startdate and member
:rtype: tuple[str, int, int]
"""
member_list = list()
for startdate in self.startdates:
for member in self.members:
Javier Vegas-Regidor
committed
return member_list
def get_year_chunks(self, startdate, year):
"""
Get the list of chunks containing timesteps from the given year
:param startdate: startdate to use
:type startdate: str
:param year: reference year
:type year: int
:return: list of chunks containing data from the given year
:rtype: list[int]
"""
date = parse_date(startdate)
chunks = list()
for chunk in range(1, self.num_chunks + 1):
chunk_start = self.get_chunk_start(date, chunk)
Javier Vegas-Regidor
committed
if chunk_start.year > year:
break
elif chunk_start.year == year or chunk_end_date(chunk_start, self.chunk_size, 'month',
self.calendar).year == year:
chunks.append(chunk)
return chunks
def get_chunk_start(self, startdate, chunk):
startdate = parse_date(startdate)
return chunk_start_date(startdate, chunk, self.chunk_size, 'month', self.calendar)
def get_chunk_start_str(self, startdate, chunk):
return date2str(self.get_chunk_start(startdate, chunk))
def get_chunk_end(self, startdate, chunk):
return chunk_end_date(self.get_chunk_start(startdate, chunk), self.chunk_size, 'month', self.calendar)
def get_chunk_end_str(self, startdate, chunk):
return date2str(self.get_chunk_end(startdate, chunk))
Javier Vegas-Regidor
committed
def get_full_years(self, startdate):
"""
Returns the list of full years that are in the given startdate
:param startdate: startdate to use
:type startdate: str
:return: list of full years
:rtype: list[int]
"""
chunks_per_year = 12 / self.chunk_size
date = parse_date(startdate)
first_january = 0
first_year = date.year
if date.month != 1:
month = date.month
first_year += 1
while month + self.chunk_size < 12:
month += self.chunk_size
first_january += 1
years = list()
for chunk in range(first_january, chunks_per_year, self.num_chunks):
Javier Vegas-Regidor
committed
years.append(first_year)
first_year += 1
return years
def get_member_str(self, member):
"""
Returns the member name for a given member number.
:param member: member's number
:type member: int
:return: member's name
:rtype: str
"""
Javier Vegas-Regidor
committed
return '{0}{1}'.format(self.member_prefix, str(member).zfill(self.member_digits))
Javier Vegas-Regidor
committed
class ReportConfig(object):
def __init__(self, parser):
self.maximum_priority = parser.get_int_option('REPORT', 'MAXIMUM_PRIORITY', 10)
self.path = parser.get_path_option('REPORT', 'PATH', '')