Newer
Older
Javier Vegas-Regidor
committed
# coding=utf-8
import os
from bscearth.utils.log import Log
from bscearth.utils.date import parse_date, chunk_start_date, chunk_end_date, date2str
from bscearth.utils.config_parser import ConfigParser
Javier Vegas-Regidor
committed
from earthdiagnostics.frequency import Frequency, Frequencies
from earthdiagnostics.variable import VariableManager
Javier Vegas-Regidor
committed
class Config(object):
"""
Class to read and manage the configuration
:param path: path to the conf file
:type path: str
"""
Javier Vegas-Regidor
committed
def __init__(self, path):
Javier Vegas-Regidor
committed
parser.optionxform = str
parser.read(path)
# Read diags config
self.data_adaptor = parser.get_choice_option('DIAGNOSTICS', 'DATA_ADAPTOR', ('CMOR', 'THREDDS'), 'CMOR')
self.scratch_dir = parser.get_path_option('DIAGNOSTICS', 'SCRATCH_DIR')
"Scratch folder path"
self.use_ramdisk = parser.get_bool_option('DIAGNOSTICS', 'USE_RAMDISK', False)
"If True, the scratch dir is created as a ram disk"
self.auto_clean = parser.get_bool_option('DIAGNOSTICS', 'AUTO_CLEAN', True)
"If True, the scratch dir is removed after finishing"
if not self.auto_clean and self.use_ramdisk:
Log.warning('RAM disk scratch dir is always automatically cleaned.')
self.auto_clean = True
self.scratch_masks = parser.get_path_option('DIAGNOSTICS', 'SCRATCH_MASKS', '/scratch/Earth/ocean_masks')
"Common scratch folder for masks"
self.data_dir = parser.get_path_option('DIAGNOSTICS', 'DATA_DIR')
"Root data folder path"
self.data_type = parser.get_choice_option('DIAGNOSTICS', 'DATA_TYPE', ('exp', 'obs', 'recon'), 'exp')
"Data type (experiment, observation or reconstruction)"
self.con_files = parser.get_path_option('DIAGNOSTICS', 'CON_FILES')
"Mask and meshes folder path"
self.mesh_mask = parser.get_path_option('DIAGNOSTICS', 'MESH_MASK', '')
"Custom mesh mask file to use"
self.new_mask_glo = parser.get_path_option('DIAGNOSTICS', 'NEW_MASK_GLO', '')
"Custom new mask glo file to use"
self.mask_regions = parser.get_path_option('DIAGNOSTICS', 'MASK_REGIONS', '')
"Custom mask regions file to use"
self.mask_regions_3d = parser.get_path_option('DIAGNOSTICS', 'MASK_REGIONS_3D', '')
"Custom mask regions 3D file to use"
self.data_convention = parser.get_choice_option('DIAGNOSTICS', 'DATA_CONVENTION',
('specs', 'primavera', 'cmip6'), 'specs', ignore_case=True)
VariableManager().load_variables(self.data_convention)
self._diags = parser.get_option('DIAGNOSTICS', 'DIAGS')
self.frequency = Frequency(parser.get_option('DIAGNOSTICS', 'FREQUENCY'))
"Default data frequency to be used by the diagnostics"
self.cdftools_path = parser.get_path_option('DIAGNOSTICS', 'CDFTOOLS_PATH', '')
"Path to CDFTOOLS executables"
self.max_cores = parser.get_int_option('DIAGNOSTICS', 'MAX_CORES', 0)
"Maximum number of cores to use"
Javier Vegas-Regidor
committed
self.restore_meshes = parser.get_bool_option('DIAGNOSTICS', 'RESTORE_MESHES', False)
"If True, forces the tool to copy all the mesh and mask files for the model, regardless of existence"
Javier Vegas-Regidor
committed
# Read experiment config
self.experiment = ExperimentConfig(parser)
"""
Configuration related to the experiment
:rtype: ExperimentConfig
"""
Javier Vegas-Regidor
committed
# Read aliases
self._aliases = dict()
if parser.has_section('ALIAS'):
for option in parser.options('ALIAS'):
self._aliases[option.lower()] = parser.get_list_option('ALIAS', option)
Javier Vegas-Regidor
committed
Log.debug('Preparing command list')
commands = self._diags.split()
Javier Vegas-Regidor
committed
self._real_commands = list()
for command in commands:
if command.lower() in self._aliases:
added_commands = self._aliases[command.lower()]
Javier Vegas-Regidor
committed
Log.info('Changing alias {0} for {1}', command, ' '.join(added_commands))
for add_command in added_commands:
self._real_commands.append(add_command)
else:
self._real_commands.append(command)
Log.debug('Command list ready ')
self.scratch_dir = os.path.join(self.scratch_dir, 'diags', self.experiment.expid)
self.cmor = CMORConfig(parser)
self.thredds = THREDDSConfig(parser)
Javier Vegas-Regidor
committed
def get_commands(self):
"""
Returns the list of commands after replacing the alias
:return: full list of commands
:rtype: list(str)
"""
Javier Vegas-Regidor
committed
return self._real_commands
class CMORConfig(object):
def __init__(self, parser):
self.force = parser.get_bool_option('CMOR', 'FORCE', False)
self.force_untar = parser.get_bool_option('CMOR', 'FORCE_UNTAR', False)
self.filter_files = parser.get_option('CMOR', 'FILTER_FILES', '')
Javier Vegas-Regidor
committed
self.ocean = parser.get_bool_option('CMOR', 'OCEAN_FILES', True)
self.atmosphere = parser.get_bool_option('CMOR', 'ATMOSPHERE_FILES', True)
Javier Vegas-Regidor
committed
self.use_grib = parser.get_bool_option('CMOR', 'USE_GRIB', True)
self._chunks = parser.get_int_list_option('CMOR', 'CHUNKS')
Javier Vegas-Regidor
committed
self.associated_experiment = parser.get_option('CMOR', 'ASSOCIATED_EXPERIMENT', 'to be filled')
self.associated_model = parser.get_option('CMOR', 'ASSOCIATED_MODEL', 'to be filled')
self.initialization_description = parser.get_option('CMOR', 'INITIALIZATION_DESCRIPTION', 'to be filled')
self.initialization_method = parser.get_option('CMOR', 'INITIALIZATION_METHOD', '1')
self.physics_description = parser.get_option('CMOR', 'PHYSICS_DESCRIPTION', 'to be filled')
self.physics_version = parser.get_option('CMOR', 'PHYSICS_VERSION', '1')
self.source = parser.get_option('CMOR', 'SOURCE', 'to be filled')
vars_string = parser.get_option('CMOR', 'VARIABLE_LIST', '')
if vars_string:
self._variable_list = list()
for domain_var in vars_string.split(' '):
if domain_var.startswith('#'):
break
splitted = domain_var.split(':')
cmor_var = var_manager.get_variable(splitted[1], silent=True)
if not cmor_var:
continue
if ModelingRealm(splitted[0]) != cmor_var.domain:
Log.warning('Domain {0} for variable {1} is not correct: is {2}', splitted[0], cmor_var.short_name,
cmor_var.domain)
return
self._variable_list.append('{0.domain}:{0.short_name}'.format(cmor_var))
else:
self._variable_list = None
self._var_hourly = CMORConfig._parse_variables(parser.get_option('CMOR', 'ATMOS_HOURLY_VARS', ''))
self._var_daily = CMORConfig._parse_variables(parser.get_option('CMOR', 'ATMOS_DAILY_VARS', ''))
self._var_monthly = CMORConfig._parse_variables(parser.get_option('CMOR', 'ATMOS_MONTHLY_VARS', ''))
Javier Vegas-Regidor
committed
def cmorize(self, var_cmor):
"""
Checks if var_cmor is on variable list
:param var_cmor: CMOR variable object
:rtype var_cmor: Variablle
:return:
"""
if self._variable_list is None:
return True
if not var_cmor:
return False
return '{0}:{1}'.format(var_cmor.domain, var_cmor.short_name) in self._variable_list
def any_required(self, variables):
if self._variable_list is None:
return True
for var in variables:
if self.cmorize(VariableManager().get_variable(var, silent=True)):
return True
return False
def chunk_cmorization_requested(self, chunk):
if len(self._chunks) == 0:
return True
return chunk in self._chunks
@staticmethod
def _parse_variables(raw_string):
Javier Vegas-Regidor
committed
variables = dict()
if raw_string:
splitted = raw_string.split(',')
for var_section in splitted:
splitted_var = var_section.split(':')
if len(splitted_var) == 1:
levels = None
else:
levels = ','.join(map(str, CMORConfig._parse_levels(splitted_var[1:])))
variables[int(splitted_var[0])] = levels
return variables
@staticmethod
def _parse_levels(levels_splitted):
if len(levels_splitted) == 1:
return map(int, levels_splitted[0].split('-'))
start = int(levels_splitted[0])
end = int(levels_splitted[1])
if len(levels_splitted) == 3:
step = int(levels_splitted[2])
else:
step = 1
return range(start, end, step)
def get_variables(self, frequency):
if frequency in (Frequencies.three_hourly, Frequencies.six_hourly):
return self._var_hourly
elif frequency == Frequencies.daily:
return self._var_daily
elif frequency == Frequencies.monthly:
return self._var_monthly
raise Exception('Frequency not recognized: {0}'.format(frequency))
def get_levels(self, frequency, variable):
return self.get_variables(frequency)[variable]
Javier Vegas-Regidor
committed
class THREDDSConfig(object):
def __init__(self, parser):
self.server_url = parser.get_option('THREDDS', 'SERVER_URL', '')
Javier Vegas-Regidor
committed
class ExperimentConfig(object):
"""
Encapsulates all chunk related tasks
:param parser: parser for the config file
:type parser: Parser
"""
def __init__(self, parser):
self.institute = parser.get_option('EXPERIMENT', 'INSTITUTE')
self.expid = parser.get_option('EXPERIMENT', 'EXPID')
self.experiment_name = parser.get_option('EXPERIMENT', 'NAME', self.expid)
Javier Vegas-Regidor
committed
self.members = parser.get_list_option('EXPERIMENT', 'MEMBERS')
self.member_digits = parser.get_int_option('EXPERIMENT', 'MEMBER_DIGITS', 1)
Javier Vegas-Regidor
committed
self.member_prefix = parser.get_option('EXPERIMENT', 'MEMBER_PREFIX', 'fc')
self.member_count_start = parser.get_int_option('EXPERIMENT', 'MEMBER_COUNT_START', 0)
members = []
for mem in self.members:
if '-' in mem:
start, end = mem.split('-')
if start.startswith(self.member_prefix):
start = start[len(self.member_prefix):]
if end.startswith(self.member_prefix):
end = end[len(self.member_prefix):]
for member in range(int(start), int(end) +1):
members.append(member)
else:
if mem.startswith(self.member_prefix):
mem = mem[len(self.member_prefix):]
members.append(int(mem))
self.members = members
self.startdates = parser.get_option('EXPERIMENT', 'STARTDATES').split()
self.chunk_size = parser.get_int_option('EXPERIMENT', 'CHUNK_SIZE')
self.num_chunks = parser.get_int_option('EXPERIMENT', 'CHUNKS')
self.calendar = parser.get_option('EXPERIMENT', 'CALENDAR', 'standard')
Javier Vegas-Regidor
committed
self.model = parser.get_option('EXPERIMENT', 'MODEL')
self.model_version = parser.get_option('EXPERIMENT', 'MODEL_VERSION', '')
self.atmos_grid = parser.get_option('EXPERIMENT', 'ATMOS_GRID', '')
self.atmos_timestep = parser.get_int_option('EXPERIMENT', 'ATMOS_TIMESTEP', 6)
self.ocean_timestep = parser.get_int_option('EXPERIMENT', 'OCEAN_TIMESTEP', 6)
Javier Vegas-Regidor
committed
def get_chunk_list(self):
"""
Return a list with all the chunks
:return: List containing tuples of startdate, member and chunk
:rtype: tuple[str, int, int]
"""
chunk_list = list()
for startdate in self.startdates:
for member in self.members:
for chunk in range(1, self.num_chunks + 1):
chunk_list.append((startdate, member, chunk))
return chunk_list
def get_member_list(self):
"""
Return a list with all the members
:return: List containing tuples of startdate and member
:rtype: tuple[str, int, int]
"""
member_list = list()
for startdate in self.startdates:
for member in self.members:
Javier Vegas-Regidor
committed
return member_list
def get_year_chunks(self, startdate, year):
"""
Get the list of chunks containing timesteps from the given year
:param startdate: startdate to use
:type startdate: str
:param year: reference year
:type year: int
:return: list of chunks containing data from the given year
:rtype: list[int]
"""
date = parse_date(startdate)
chunks = list()
for chunk in range(1, self.num_chunks + 1):
chunk_start = self.get_chunk_start(date, chunk)
Javier Vegas-Regidor
committed
if chunk_start.year > year:
break
elif chunk_start.year == year or chunk_end_date(chunk_start, self.chunk_size, 'month',
self.calendar).year == year:
chunks.append(chunk)
return chunks
def get_chunk_start(self, startdate, chunk):
if isinstance(startdate, basestring):
startdate = parse_date(startdate)
return chunk_start_date(startdate, chunk, self.chunk_size, 'month', self.calendar)
def get_chunk_start_str(self, startdate, chunk):
return date2str(self.get_chunk_start(startdate, chunk))
def get_chunk_end(self, startdate, chunk):
return chunk_end_date(self.get_chunk_start(startdate, chunk), self.chunk_size, 'month', self.calendar)
def get_chunk_end_str(self, startdate, chunk):
return date2str(self.get_chunk_end(startdate, chunk))
Javier Vegas-Regidor
committed
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
def get_full_years(self, startdate):
"""
Returns the list of full years that are in the given startdate
:param startdate: startdate to use
:type startdate: str
:return: list of full years
:rtype: list[int]
"""
chunks_per_year = 12 / self.chunk_size
date = parse_date(startdate)
first_january = 0
first_year = date.year
if date.month != 1:
month = date.month
first_year += 1
while month + self.chunk_size < 12:
month += self.chunk_size
first_january += 1
years = list()
for chunk in range(first_january, self.num_chunks - chunks_per_year, chunks_per_year):
years.append(first_year)
first_year += 1
return years
def get_member_str(self, member):
"""
Returns the member name for a given member number.
:param member: member's number
:type member: int
:return: member's name
:rtype: str
"""
Javier Vegas-Regidor
committed
return '{0}{1}'.format(self.member_prefix, str(member).zfill(self.member_digits))