Newer
Older
Javier Vegas-Regidor
committed
# coding=utf-8
"""Classes to manage Earth Diagnostics configuration"""
Javier Vegas-Regidor
committed
import os

import bscearth.utils.path
from bscearth.utils.config_parser import ConfigParser
from bscearth.utils.date import parse_date, chunk_start_date, chunk_end_date, date2str, add_years, add_months, add_days
from bscearth.utils.log import Log

from earthdiagnostics.frequency import Frequency, Frequencies
from earthdiagnostics.modelingrealm import ModelingRealm
from earthdiagnostics.variable import VariableManager
from earthdiagnostics.data_convention import SPECSConvention, CMIP6Convention, PrimaveraConvention, \
    MeteoFranceConvention, PrefaceConvention
class ConfigException(Exception):
    """Exception raised when there is a problem with the configuration"""
class Config(object):
    """Class to read and manage the configuration"""

    def __init__(self):
        """Create a configuration with every option unset; use parse to load a file"""
        # Read diags config
        self.data_adaptor = None
        "Data adaptor to use (CMOR, THREDDS or OBSRECON)"
        self.scratch_dir = None
        "Scratch folder path"
        self.use_ramdisk = None
        "If True, the scratch dir is created as a ram disk"
        self.auto_clean = None
        "If True, the scratch dir is removed after finishing"
        self.scratch_masks = None
        "Common scratch folder for masks"
        self.data_dir = None
        "Root data folder path"
        self.data_type = None
        "Data type (experiment, observation or reconstruction)"
        self.con_files = None
        "Mask and meshes folder path"
        self.mesh_mask = None
        "Custom mesh mask file to use"
        self.new_mask_glo = None
        "Custom new mask glo file to use"
        self.mask_regions = None
        "Custom mask regions file to use"
        self.mask_regions_3d = None
        "Custom mask regions 3D file to use"
        self.data_convention = None
        "Data convention to use"
        self.var_manager = None
        "Variable manager loaded for the selected data convention"
        self.skip_diags_done = None
        "Flag to control if already done diags must be recalculated"
        self.frequency = None
        "Default data frequency to be used by the diagnostics"
        self.cdftools_path = None
        "Path to CDFTOOLS executables"
        self.max_cores = None
        "Maximum number of cores to use"
        self.parallel_downloads = None
        "Maximum number of simultaneous downloads"
        self.parallel_uploads = None
        "Maximum number of simultaneous uploads"
        self.restore_meshes = None
        "If True, forces the tool to copy all the mesh and mask files for the model, regardless of existence"

        # Read experiment config
        self.experiment = ExperimentConfig()
        """
        Configuration related to the experiment

        Returns
        -------
        ExperimentConfig
        """
        self.cmor = None
        """
        CMOR related configuration

        Returns
        -------
        CMORConfig
        """
        self.thredds = None
        """
        THREDDS server configuration

        Returns
        -------
        THREDDSConfig
        """
        self.report = None
        """
        Reporting configuration

        Returns
        -------
        ReportConfig
        """

        # Private state filled while parsing; created here so the object is
        # always fully formed
        self._diags = None
        self._aliases = dict()
        self._real_commands = list()

    def parse(self, path):
        """
        Read configuration from INI file

        Parameters
        ----------
        path: str
            Location of the configuration file

        Raises
        ------
        ValueError
            If the configuration file can not be found
        """
        config_file_path = bscearth.utils.path.expand_path(path)
        if not os.path.isfile(config_file_path):
            Log.critical('Configuration file {0} can not be found', config_file_path)
            raise ValueError('Configuration file {0} can not be found'.format(config_file_path))

        parser = ConfigParser()
        # NOTE(review): presumably keeps option names as written instead of
        # lower-casing them, as in the standard configparser - confirm
        parser.optionxform = str
        parser.read(config_file_path)

        # Read diags config
        self._parse_diagnostics(parser)

        # Read experiment config
        self.experiment.parse_ini(parser)

        # Read aliases
        self._parse_aliases(parser)
        self._prepare_commands()

        # Every experiment gets its own scratch subfolder
        self.scratch_dir = os.path.join(self.scratch_dir, 'diags', self.experiment.expid)

        self.cmor = CMORConfig(parser, self.var_manager)
        self.thredds = THREDDSConfig(parser)
        self.report = ReportConfig(parser)

    def _parse_diagnostics(self, parser):
        """Read the options of the DIAGNOSTICS section"""
        self.data_adaptor = parser.get_choice_option('DIAGNOSTICS', 'DATA_ADAPTOR', ('CMOR', 'THREDDS', 'OBSRECON'),
                                                     'CMOR')
        self.scratch_dir = parser.get_path_option('DIAGNOSTICS', 'SCRATCH_DIR')
        self.use_ramdisk = parser.get_bool_option('DIAGNOSTICS', 'USE_RAMDISK', False)
        self.auto_clean = parser.get_bool_option('DIAGNOSTICS', 'AUTO_CLEAN', True)
        if not self.auto_clean and self.use_ramdisk:
            Log.warning('RAM disk scratch dir is always automatically cleaned.')
            self.auto_clean = True

        self.scratch_masks = parser.get_path_option('DIAGNOSTICS', 'SCRATCH_MASKS', '/scratch/Earth/ocean_masks')
        self.data_dir = parser.get_path_option('DIAGNOSTICS', 'DATA_DIR')
        self.data_type = parser.get_choice_option('DIAGNOSTICS', 'DATA_TYPE', ('exp', 'obs', 'recon'), 'exp')
        self.con_files = parser.get_path_option('DIAGNOSTICS', 'CON_FILES')
        self.mesh_mask = parser.get_path_option('DIAGNOSTICS', 'MESH_MASK', '')
        self.new_mask_glo = parser.get_path_option('DIAGNOSTICS', 'NEW_MASK_GLO', '')
        self.mask_regions = parser.get_path_option('DIAGNOSTICS', 'MASK_REGIONS', '')
        self.mask_regions_3d = parser.get_path_option('DIAGNOSTICS', 'MASK_REGIONS_3D', '')

        self._select_data_convention(parser)

        self._diags = parser.get_option('DIAGNOSTICS', 'DIAGS')
        self.skip_diags_done = parser.get_bool_option('DIAGNOSTICS', 'SKIP_DIAGS_DONE', True)
        self.frequency = Frequency(parser.get_option('DIAGNOSTICS', 'FREQUENCY'))
        self.cdftools_path = parser.get_path_option('DIAGNOSTICS', 'CDFTOOLS_PATH', '')
        self.max_cores = parser.get_int_option('DIAGNOSTICS', 'MAX_CORES', 0)
        self.parallel_downloads = parser.get_int_option('DIAGNOSTICS', 'PARALLEL_DOWNLOADS', 1)
        self.parallel_uploads = parser.get_int_option('DIAGNOSTICS', 'PARALLEL_UPLOADS', 1)
        self.restore_meshes = parser.get_bool_option('DIAGNOSTICS', 'RESTORE_MESHES', False)

    def _select_data_convention(self, parser):
        """Create the data convention, publish its CDFTOOLS namelist and load its variables"""
        conventions = {
            'specs': SPECSConvention,
            'primavera': PrimaveraConvention,
            'cmip6': CMIP6Convention,
            'preface': PrefaceConvention,
            'meteofrance': MeteoFranceConvention,
        }
        data_convention = parser.get_choice_option('DIAGNOSTICS', 'DATA_CONVENTION',
                                                   ('specs', 'primavera', 'cmip6', 'preface', 'meteofrance'),
                                                   'specs',
                                                   ignore_case=True)
        self.data_convention = conventions[data_convention](data_convention, self)
        self.scratch_masks = self.data_convention.get_scratch_masks(self.scratch_masks)

        # The namelist is looked up next to this module and is optional
        namelist_file = os.path.join(os.path.dirname(__file__),
                                     'CDFTOOLS_{0}.namlist'.format(self.data_convention.name))
        Log.debug(namelist_file)
        if os.path.isfile(namelist_file):
            Log.debug('Setting namelist {0}', namelist_file)
            os.environ['NAM_CDF_NAMES'] = namelist_file

        self.var_manager = VariableManager()
        self.var_manager.load_variables(data_convention)

    def _parse_aliases(self, parser):
        """Read the optional ALIAS section; aliases are stored lower-cased"""
        self._aliases = dict()
        if parser.has_section('ALIAS'):
            for option in parser.options('ALIAS'):
                self._aliases[option.lower()] = parser.get_list_option('ALIAS', option)

    def _prepare_commands(self):
        """Expand the aliases used in the DIAGS option into the final command list"""
        Log.debug('Preparing command list')
        self._real_commands = list()
        for command in self._diags.split():
            # A token starting with '#' comments out the rest of the option
            if command.startswith('#'):
                break
            alias_commands = self._aliases.get(command.lower())
            if alias_commands is None:
                self._real_commands.append(command)
                continue
            Log.info('Changing alias {0} for {1}', command, ' '.join(alias_commands))
            self._real_commands.extend(alias_commands)
        Log.debug('Command list ready ')

    def get_commands(self):
        """
        Return the list of commands after replacing the alias

        Returns
        -------
        iterable of str
        """
        return self._real_commands
Javier Vegas-Regidor
committed
class CMORConfig(object):
    """
    Configuration for the cmorization processes

    Parameters
    ----------
    parser: ConfigParser
    var_manager: VariableManager
    """

    def __init__(self, parser, var_manager):
        self.force = parser.get_bool_option('CMOR', 'FORCE', False)
        self.force_untar = parser.get_bool_option('CMOR', 'FORCE_UNTAR', False)
        self.skip_prepare = parser.get_bool_option('CMOR', 'SKIP_PREPARE', False)
        self.filter_files = parser.get_option('CMOR', 'FILTER_FILES', '')

        self.ocean = parser.get_bool_option('CMOR', 'OCEAN_FILES', True)
        self.atmosphere = parser.get_bool_option('CMOR', 'ATMOSPHERE_FILES', True)

        self.use_grib = parser.get_bool_option('CMOR', 'USE_GRIB', True)
        self._chunks = parser.get_int_list_option('CMOR', 'CHUNKS')

        self.associated_experiment = parser.get_option('CMOR', 'ASSOCIATED_EXPERIMENT', 'to be filled')
        self.associated_model = parser.get_option('CMOR', 'ASSOCIATED_MODEL', 'to be filled')
        self.initialization_description = parser.get_option('CMOR', 'INITIALIZATION_DESCRIPTION', 'to be filled')
        self.initialization_method = parser.get_option('CMOR', 'INITIALIZATION_METHOD', '1')
        self.initialization_number = parser.get_int_option('CMOR', 'INITIALIZATION_NUMBER', 1)

        self.physics_description = parser.get_option('CMOR', 'PHYSICS_DESCRIPTION', 'to be filled')
        self.physics_version = parser.get_option('CMOR', 'PHYSICS_VERSION', '1')
        self.source = parser.get_option('CMOR', 'SOURCE', 'to be filled')
        self.version = parser.get_option('CMOR', 'VERSION', '')
        self.default_ocean_grid = parser.get_option('CMOR', 'DEFAULT_OCEAN_GRID', 'gn')
        self.default_atmos_grid = parser.get_option('CMOR', 'DEFAULT_ATMOS_GRID', 'gr')
        self.activity = parser.get_option('CMOR', 'ACTIVITY', 'CMIP')
        self.min_cmorized_vars = parser.get_int_option('CMOR', 'MIN_CMORIZED_VARS', 10)
        self.append_startdate = parser.get_bool_option('CMOR', 'APPEND_STARTDATE', False)

        vars_string = parser.get_option('CMOR', 'VARIABLE_LIST', '')
        self.var_manager = var_manager
        self._variable_list = self._read_variable_list(vars_string)

        self._var_hourly = CMORConfig._parse_variables(parser.get_option('CMOR', 'ATMOS_HOURLY_VARS', ''))
        self._var_daily = CMORConfig._parse_variables(parser.get_option('CMOR', 'ATMOS_DAILY_VARS', ''))
        self._var_monthly = CMORConfig._parse_variables(parser.get_option('CMOR', 'ATMOS_MONTHLY_VARS', ''))

    def _read_variable_list(self, vars_string):
        """
        Build the list of variables requested in the VARIABLE_LIST option

        Parameters
        ----------
        vars_string: str
            Blank separated 'domain:variable' entries

        Returns
        -------
        list of str or None
            'domain:short_name' entries, or None if the option is empty (no filtering)

        Raises
        ------
        ConfigException
            If the option has a value but none of its entries is a known variable
        """
        if not vars_string:
            return None

        variable_list = list()
        # split() without arguments so repeated blanks do not produce empty entries
        for domain_var in vars_string.split():
            # A token starting with '#' comments out the rest of the option
            if domain_var.startswith('#'):
                break
            splitted = domain_var.split(':')
            cmor_var = None
            if len(splitted) > 1:
                cmor_var = self.var_manager.get_variable(splitted[1], silent=True)
            if not cmor_var:
                Log.warning('Variable {0} not recognized. It will not be cmorized', domain_var)
                continue
            if ModelingRealm(splitted[0]) != cmor_var.domain:
                # NOTE(review): after the warning the variable is still registered
                # under the domain reported by the variable manager - confirm intent
                Log.warning('Domain {0} for variable {1} is not correct: is {2}', splitted[0], cmor_var.short_name,
                            cmor_var.domain)
            variable_list.append('{0.domain}:{0.short_name}'.format(cmor_var))

        if len(variable_list) == 0:
            raise ConfigException('Variable list value is specified, but no variables were found')
        return variable_list

    def cmorize(self, var_cmor):
        """
        Check if var_cmor is on variable list

        Parameters
        ----------
        var_cmor: Variable

        Returns
        -------
        bool
            True if no variable list was configured or the variable is in it
        """
        if self._variable_list is None:
            return True
        if not var_cmor:
            return False
        return '{0}:{1}'.format(var_cmor.domain, var_cmor.short_name) in self._variable_list

    def any_required(self, variables):
        """
        Check if any of the given variables is needed for cmorization

        Parameters
        ----------
        variables: iterable of str

        Returns
        -------
        bool
        """
        if self._variable_list is None:
            return True
        return any(self.cmorize(self.var_manager.get_variable(var, silent=True)) for var in variables)

    def chunk_cmorization_requested(self, chunk):
        """
        Check if the cmorization of a given chunk is required

        Parameters
        ----------
        chunk: int

        Returns
        -------
        bool
            True if no chunks were configured or the chunk is one of them
        """
        if len(self._chunks) == 0:
            return True
        return chunk in self._chunks

    @staticmethod
    def _parse_variables(raw_string):
        """
        Parse a comma separated list of 'code[:levels]' entries

        Parameters
        ----------
        raw_string: str

        Returns
        -------
        dict of int: str or None
            Comma separated levels for each code, None if the entry has no levels
        """
        variables = dict()
        if raw_string:
            for var_section in raw_string.split(','):
                # Tolerate trailing or doubled commas
                if not var_section.strip():
                    continue
                splitted_var = var_section.split(':')
                if len(splitted_var) == 1:
                    levels = None
                else:
                    levels = ','.join(map(str, CMORConfig._parse_levels(splitted_var[1:])))
                variables[int(splitted_var[0])] = levels
        return variables

    @staticmethod
    def _parse_levels(levels_splitted):
        """
        Parse the levels part of a variable entry

        Parameters
        ----------
        levels_splitted: list of str
            Either one '-' separated list of levels, or start, end and an optional step

        Returns
        -------
        iterable of int
            For the start/end form the end is excluded, as in range
        """
        if len(levels_splitted) == 1:
            return map(int, levels_splitted[0].split('-'))
        start = int(levels_splitted[0])
        end = int(levels_splitted[1])
        if len(levels_splitted) == 3:
            step = int(levels_splitted[2])
        else:
            step = 1
        return range(start, end, step)

    def get_variables(self, frequency):
        """
        Get the variables to get from the grib file for a given frequency

        Parameters
        ----------
        frequency: Frequency

        Returns
        -------
        dict of int: str or None
            Levels to extract for each variable code

        Raises
        ------
        ValueError
            If the frequency passed is not supported
        """
        if frequency in (Frequencies.three_hourly, Frequencies.six_hourly):
            return self._var_hourly
        elif frequency == Frequencies.daily:
            return self._var_daily
        elif frequency == Frequencies.monthly:
            return self._var_monthly
        raise ValueError('Frequency not recognized: {0}'.format(frequency))

    def get_requested_codes(self):
        """
        Get all the codes to be extracted from the grib files

        Returns
        -------
        set of int
        """
        return set(self._var_hourly) | set(self._var_daily) | set(self._var_monthly)

    def get_levels(self, frequency, variable):
        """
        Get the levels to extract for a given variable

        Parameters
        ----------
        frequency: Frequency
        variable: int
            Variable code, as used in the ATMOS_*_VARS options

        Returns
        -------
        str or None
            Comma separated levels, None if no levels were configured for the variable

        Raises
        ------
        KeyError
            If the variable is not configured for the frequency
        """
        return self.get_variables(frequency)[variable]
Javier Vegas-Regidor
committed
class THREDDSConfig(object):
    """
    Settings of the THREDDS server, read from the THREDDS section

    Parameters
    ----------
    parser: ConfigParser
    """

    def __init__(self, parser):
        # The URL is optional and defaults to an empty string
        section, option, default = 'THREDDS', 'SERVER_URL', ''
        self.server_url = parser.get_option(section, option, default)
Javier Vegas-Regidor
committed
class ExperimentConfig(object):
    """Configuration related to the experiment"""

    def __init__(self):
        self.chunk_list = None

    def parse_ini(self, parser):
        """
        Parse experiment section from INI-like file

        Parameters
        ----------
        parser: ConfigParser
        """
        self.institute = parser.get_option('EXPERIMENT', 'INSTITUTE')
        self.expid = parser.get_option('EXPERIMENT', 'EXPID')
        self.experiment_name = parser.get_option('EXPERIMENT', 'NAME', self.expid)

        self.members = parser.get_list_option('EXPERIMENT', 'MEMBERS')
        self.member_digits = parser.get_int_option('EXPERIMENT', 'MEMBER_DIGITS', 1)
        self.member_prefix = parser.get_option('EXPERIMENT', 'MEMBER_PREFIX', 'fc')
        self.member_count_start = parser.get_int_option('EXPERIMENT', 'MEMBER_COUNT_START', 0)
        # Needs the member prefix, so it must run after it has been read
        self._parse_members()

        # The calendar is used when expanding startdate ranges
        self.calendar = parser.get_option('EXPERIMENT', 'CALENDAR', 'standard')
        self._parse_startdates(parser)

        self.chunk_size = parser.get_int_option('EXPERIMENT', 'CHUNK_SIZE')
        self.num_chunks = parser.get_int_option('EXPERIMENT', 'CHUNKS')
        self.chunk_list = parser.get_int_list_option('EXPERIMENT', 'CHUNK_LIST', [])

        self.model = parser.get_option('EXPERIMENT', 'MODEL')
        self.model_version = parser.get_option('EXPERIMENT', 'MODEL_VERSION', '')
        self.atmos_grid = parser.get_option('EXPERIMENT', 'ATMOS_GRID', '')
        self.atmos_timestep = parser.get_int_option('EXPERIMENT', 'ATMOS_TIMESTEP', 6)
        self.ocean_timestep = parser.get_int_option('EXPERIMENT', 'OCEAN_TIMESTEP', 6)

    def _parse_startdates(self, parser):
        """
        Expand the STARTDATES option into the list of startdates

        Entries enclosed in braces are expanded by _read_startdates; any other
        entry is treated as a pattern and expanded with exrex.generate
        """
        startdates = parser.get_list_option('EXPERIMENT', 'STARTDATES')
        import exrex
        self.startdates = []
        for startdate_pattern in startdates:
            startdate_pattern = startdate_pattern.strip()
            if not startdate_pattern:
                continue
            if startdate_pattern[0] == '{' and startdate_pattern[-1] == '}':
                self._read_startdates(startdate_pattern[1:-1])
            else:
                for startdate in exrex.generate(startdate_pattern):
                    self.startdates.append(startdate)

    def _parse_members(self):
        """
        Convert the raw MEMBERS entries into a list of member numbers

        Each entry is either a single member or a 'start-end' range; the
        member prefix is removed when present
        """
        members = []
        for mem in self.members:
            if '-' in mem:
                start, end = mem.split('-')
                if start.startswith(self.member_prefix):
                    start = start[len(self.member_prefix):]
                if end.startswith(self.member_prefix):
                    end = end[len(self.member_prefix):]
                # NOTE(review): the range is taken as inclusive on both ends - confirm
                for member in range(int(start), int(end) + 1):
                    members.append(member)
            else:
                if mem.startswith(self.member_prefix):
                    mem = mem[len(self.member_prefix):]
                members.append(int(mem))
        self.members = members

    def _read_startdates(self, pattern):
        """
        Expand a 'start,end,interval' definition and append the dates to the startdates

        Parameters
        ----------
        pattern: str
            Definition without the enclosing braces. The interval is an optional
            integer factor followed by a unit: Y, M, W or D (case insensitive)

        Raises
        ------
        ConfigException
            If the definition is incomplete, the factor is not positive or the
            unit is not supported
        """
        fields = [field.strip() for field in pattern.split(',')]
        if len(fields) < 3 or not fields[2]:
            raise ConfigException('STARTDATES definition must be start,end,interval: {0}'.format(pattern))

        start = parse_date(fields[0])
        end = parse_date(fields[1])
        interval = fields[2]

        if len(interval) == 1:
            factor = 1
        else:
            factor = int(interval[0:-1])
        if factor < 1:
            # A non-positive step would never reach the end date
            raise ConfigException('Interval factor must be positive in STARTDATES definition: {0}'.format(pattern))
        interval = interval[-1].upper()

        while start <= end:
            self.startdates.append(date2str(start))
            if interval == 'Y':
                start = add_years(start, factor)
            elif interval == 'M':
                start = add_months(start, factor, cal=self.calendar)
            elif interval == 'W':
                start = add_days(start, factor * 7, cal=self.calendar)
            elif interval == 'D':
                start = add_days(start, factor, cal=self.calendar)
            else:
                raise ConfigException('Interval {0} not supported in STARTDATES definition: {1}'.format(interval,
                                                                                                        pattern))

    def get_chunk_list(self):
        """
        Return a list with all the chunks

        If a chunk list was configured only those chunks are used, otherwise
        all the chunks from 1 to the number of chunks

        :return: List containing tuples of startdate, member and chunk
        :rtype: list[tuple[str, int, int]]
        """
        chunk_list = list()
        for startdate in self.startdates:
            for member in self.members:
                if not self.chunk_list:
                    for chunk in range(1, self.num_chunks + 1):
                        chunk_list.append((startdate, member, chunk))
                else:
                    for chunk in self.chunk_list:
                        chunk_list.append((startdate, member, chunk))
        return chunk_list

    def get_member_list(self):
        """
        Return a list with all the members

        :return: List containing tuples of startdate and member
        :rtype: list[tuple[str, int]]
        """
        member_list = list()
        for startdate in self.startdates:
            for member in self.members:
                member_list.append((startdate, member))
        return member_list

    def get_year_chunks(self, startdate, year):
        """
        Get the list of chunks containing timesteps from the given year

        :param startdate: startdate to use
        :type startdate: str
        :param year: reference year
        :type year: int
        :return: list of chunks containing data from the given year
        :rtype: list[int]
        """
        date = parse_date(startdate)
        chunks = list()
        for chunk in range(1, self.num_chunks + 1):
            chunk_start = self.get_chunk_start(date, chunk)
            # Chunks are consecutive, so nothing after this one can match
            if chunk_start.year > year:
                break
            elif chunk_start.year == year or chunk_end_date(chunk_start, self.chunk_size, 'month',
                                                            self.calendar).year == year:
                chunks.append(chunk)
        return chunks

    def get_chunk_start(self, startdate, chunk):
        """
        Get chunk's first day

        Parameters
        ----------
        startdate: str or datetime.datetime
        chunk: int

        Returns
        -------
        datetime.datetime

        See Also
        --------
        get_chunk_start_str
        """
        startdate = parse_date(startdate)
        return chunk_start_date(startdate, chunk, self.chunk_size, 'month', self.calendar)

    def get_chunk_start_str(self, startdate, chunk):
        """
        Get chunk's first day string representation

        Parameters
        ----------
        startdate: str or datetime.datetime
        chunk: int

        Returns
        -------
        str

        See Also
        --------
        get_chunk_start
        """
        return date2str(self.get_chunk_start(startdate, chunk))

    def get_chunk_end(self, startdate, chunk):
        """
        Get chunk's end date, as computed by chunk_end_date from the chunk's first day

        Parameters
        ----------
        startdate: str or datetime.datetime
        chunk: int

        Returns
        -------
        datetime.datetime

        See Also
        --------
        get_chunk_end_str
        """
        return chunk_end_date(self.get_chunk_start(startdate, chunk), self.chunk_size, 'month', self.calendar)

    def get_chunk_end_str(self, startdate, chunk):
        """
        Get chunk's end date as a string

        Parameters
        ----------
        startdate: str or datetime.datetime
        chunk: int

        Returns
        -------
        str

        See Also
        --------
        get_chunk_end
        """
        return date2str(self.get_chunk_end(startdate, chunk))

    def get_full_years(self, startdate):
        """
        Return the list of full years that are in the given startdate

        :param startdate: startdate to use
        :type startdate: str
        :return: list of full years
        :rtype: list[int]
        """
        chunks_per_year = 12 // self.chunk_size
        date = parse_date(startdate)
        first_january = 0
        first_year = date.year
        if date.month != 1:
            # The startdate's own year is incomplete: start counting on the next
            # one and skip the chunks that come before it
            month = date.month
            first_year += 1
            while month + self.chunk_size < 12:
                month += self.chunk_size
                first_january += 1

        years = list()
        # One year is added for every chunks_per_year chunks from first_january on
        for _ in range(first_january, self.num_chunks, chunks_per_year):
            years.append(first_year)
            first_year += 1
        return years

    def get_member_str(self, member):
        """
        Return the member name for a given member number.

        The name is the member prefix followed by the number, zero padded to
        the configured number of digits

        :param member: member's number
        :type member: int
        :return: member's name
        :rtype: str
        """
        return '{0}{1}'.format(self.member_prefix, str(member).zfill(self.member_digits))
class ReportConfig(object):
    """
    Configuration for the reporting feature

    Parameters
    ----------
    parser: ConfigParser
    """

    def __init__(self, parser):
        self.maximum_priority = parser.get_int_option('REPORT', 'MAXIMUM_PRIORITY', 10)
        self.path = parser.get_path_option('REPORT', 'PATH', '')