# coding=utf-8
"""Classes to manage Earth Diagnostics configuration."""
import datetime
import os

from bscearth.utils.config_parser import ConfigParser
from bscearth.utils.date import parse_date, chunk_start_date, chunk_end_date, date2str, add_years, add_months, add_days
from bscearth.utils.log import Log

from earthdiagnostics import cdftools
from earthdiagnostics.frequency import Frequency, Frequencies
from earthdiagnostics.modelingrealm import ModelingRealm
from earthdiagnostics.variable import VariableManager
class ConfigException(Exception):
    """Exception raised when there is a problem with the configuration"""

    pass
class Config(object):
    """
    Class to read and manage the configuration

    The constructor only creates the attributes; :meth:`parse` fills them
    from a configuration file.
    """

    def __init__(self):
        # Read diags config
        self.data_adaptor = None
        "Data adaptor to use (CMOR, THREDDS or OBSRECON)"
        self.scratch_dir = None
        "Scratch folder path"
        self.use_ramdisk = None
        "If True, the scratch dir is created as a ram disk"
        self.auto_clean = None
        "If True, the scratch dir is removed after finishing"
        self.scratch_masks = None
        "Common scratch folder for masks"
        self.data_dir = None
        "Root data folder path"
        self.data_type = None
        "Data type (experiment, observation or reconstruction)"
        self.con_files = None
        "Mask and meshes folder path"
        self.mesh_mask = None
        "Custom mesh mask file to use"
        self.new_mask_glo = None
        "Custom new mask glo file to use"
        self.mask_regions = None
        "Custom mask regions file to use"
        self.mask_regions_3d = None
        "Custom mask regions 3D file to use"
        self.data_convention = None
        "Data convention to use"
        self.var_manager = None
        self.skip_diags_done = None
        "Flag to control if already done diags must be recalculated"
        self.frequency = None
        "Default data frequency to be used by the diagnostics"
        self.cdftools_path = None
        "Path to CDFTOOLS executables"
        self.max_cores = None
        "Maximum number of cores to use"
        self.parallel_downloads = None
        "Maximum number of simultaneous downloads"
        self.parallel_uploads = None
        "Maximum number of simultaneous uploads"
        self.restore_meshes = None
        "If True, forces the tool to copy all the mesh and mask files for the model, regardless of existence"

        # Read experiment config
        self.experiment = ExperimentConfig()
        """
        Configuration related to the experiment

        Returns
        -------
        ExperimentConfig
        """
        self.cmor = None
        """
        CMOR related configuration

        Returns
        -------
        CMORConfig
        """
        self.thredds = None
        """
        THREDDS server configuration

        Returns
        -------
        THREDDSConfig
        """
        self.report = None
        """
        Reporting configuration

        Returns
        -------
        ReportConfig
        """

    def parse(self, path):
        """
        Read the configuration from a file

        Parameters
        ----------
        path: str
            Path to the configuration file
        """
        # NOTE(review): the ``def`` line and the parser creation were missing from the
        # reviewed text; both are reconstructed from the names the body uses
        # (``path``, ``parser``) and the imported ConfigParser. Confirm the method name.
        parser = ConfigParser()
        # Replace the option-name transform so option names are used exactly as written
        parser.optionxform = str
        parser.read(path)

        # Read diags config
        self.data_adaptor = parser.get_choice_option('DIAGNOSTICS', 'DATA_ADAPTOR', ('CMOR', 'THREDDS', 'OBSRECON'),
                                                     'CMOR')
        self.scratch_dir = parser.get_path_option('DIAGNOSTICS', 'SCRATCH_DIR')
        self.use_ramdisk = parser.get_bool_option('DIAGNOSTICS', 'USE_RAMDISK', False)
        self.auto_clean = parser.get_bool_option('DIAGNOSTICS', 'AUTO_CLEAN', True)
        if not self.auto_clean and self.use_ramdisk:
            Log.warning('RAM disk scratch dir is always automatically cleaned.')
            self.auto_clean = True

        self.scratch_masks = parser.get_path_option('DIAGNOSTICS', 'SCRATCH_MASKS', '/scratch/Earth/ocean_masks')
        self.data_dir = parser.get_path_option('DIAGNOSTICS', 'DATA_DIR')
        self.data_type = parser.get_choice_option('DIAGNOSTICS', 'DATA_TYPE', ('exp', 'obs', 'recon'), 'exp')
        self.con_files = parser.get_path_option('DIAGNOSTICS', 'CON_FILES')
        self.mesh_mask = parser.get_path_option('DIAGNOSTICS', 'MESH_MASK', '')
        self.new_mask_glo = parser.get_path_option('DIAGNOSTICS', 'NEW_MASK_GLO', '')
        self.mask_regions = parser.get_path_option('DIAGNOSTICS', 'MASK_REGIONS', '')
        self.mask_regions_3d = parser.get_path_option('DIAGNOSTICS', 'MASK_REGIONS_3D', '')
        # NOTE(review): this call was not closed in the reviewed text (it ended with a
        # trailing comma after the default); confirm whether a further argument was lost.
        self.data_convention = parser.get_choice_option('DIAGNOSTICS', 'DATA_CONVENTION',
                                                        ('specs', 'primavera', 'cmip6', 'preface', 'meteofrance'),
                                                        'specs')

        if self.data_convention in ('primavera', 'cmip6'):
            # These conventions get their own masks folder
            self.scratch_masks = os.path.join(self.scratch_masks, self.data_convention)

        # NOTE(review): indentation was lost in the reviewed text, so it is not certain
        # whether the CDFTOOLS setup below belongs inside the ``if`` above; confirm.
        cdftools.data_convention = self.data_convention
        namelist_file = os.path.join(os.path.dirname(__file__), 'CDFTOOLS_{0}.namlist'.format(self.data_convention))
        Log.debug(namelist_file)
        if os.path.isfile(namelist_file):
            Log.debug('Setting namelist {0}', namelist_file)
            os.environ['NAM_CDF_NAMES'] = namelist_file

        self.var_manager = VariableManager()
        self.var_manager.load_variables(self.data_convention)
        self._diags = parser.get_option('DIAGNOSTICS', 'DIAGS')
        self.skip_diags_done = parser.get_bool_option('DIAGNOSTICS', 'SKIP_DIAGS_DONE', True)
        self.frequency = Frequency(parser.get_option('DIAGNOSTICS', 'FREQUENCY'))

        self.cdftools_path = parser.get_path_option('DIAGNOSTICS', 'CDFTOOLS_PATH', '')
        self.max_cores = parser.get_int_option('DIAGNOSTICS', 'MAX_CORES', 0)
        self.parallel_downloads = parser.get_int_option('DIAGNOSTICS', 'PARALLEL_DOWNLOADS', 1)
        self.parallel_uploads = parser.get_int_option('DIAGNOSTICS', 'PARALLEL_UPLOADS', 1)
        self.restore_meshes = parser.get_bool_option('DIAGNOSTICS', 'RESTORE_MESHES', False)

        # Read experiment config
        # NOTE(review): this call was missing from the reviewed text; it is required
        # because ``self.experiment.expid`` is read below.
        self.experiment.parse_ini(parser)

        # Read aliases
        self._aliases = dict()
        if parser.has_section('ALIAS'):
            for option in parser.options('ALIAS'):
                # Aliases are stored in lower case so the lookup is case insensitive
                self._aliases[option.lower()] = parser.get_list_option('ALIAS', option)

        Log.debug('Preparing command list')
        self._real_commands = self._expand_commands()
        Log.debug('Command list ready ')

        self.scratch_dir = os.path.join(self.scratch_dir, 'diags', self.experiment.expid)

        self.cmor = CMORConfig(parser, self.var_manager)
        self.thredds = THREDDSConfig(parser)
        # NOTE(review): this assignment was missing from the reviewed text; added by
        # analogy with ``cmor`` and ``thredds`` so ``report`` is not left as None.
        self.report = ReportConfig(parser)

    def _expand_commands(self):
        """
        Build the command list from the DIAGS option, replacing the aliases

        Everything after the first command starting with '#' is ignored.

        Returns
        -------
        list of str
        """
        real_commands = list()
        for command in self._diags.split():
            command = command.strip()
            if command.startswith('#'):
                # The rest of the value is a comment
                break
            added_commands = self._aliases.get(command.lower())
            if added_commands is None:
                real_commands.append(command)
                continue
            Log.info('Changing alias {0} for {1}', command, ' '.join(added_commands))
            real_commands.extend(added_commands)
        return real_commands

    def get_commands(self):
        """
        Return the list of commands after replacing the alias

        Returns
        -------
        iterable of str
        """
        return self._real_commands
class CMORConfig(object):
    """
    Configuration for the cmorization processes

    Parameters
    ----------
    parser: ConfigParser
    var_manager: VariableManager
    """

    def __init__(self, parser, var_manager):
        self.force = parser.get_bool_option('CMOR', 'FORCE', False)
        self.force_untar = parser.get_bool_option('CMOR', 'FORCE_UNTAR', False)
        self.filter_files = parser.get_option('CMOR', 'FILTER_FILES', '')
        self.ocean = parser.get_bool_option('CMOR', 'OCEAN_FILES', True)
        self.atmosphere = parser.get_bool_option('CMOR', 'ATMOSPHERE_FILES', True)
        self.use_grib = parser.get_bool_option('CMOR', 'USE_GRIB', True)
        self._chunks = parser.get_int_list_option('CMOR', 'CHUNKS')
        self.associated_experiment = parser.get_option('CMOR', 'ASSOCIATED_EXPERIMENT', 'to be filled')
        self.associated_model = parser.get_option('CMOR', 'ASSOCIATED_MODEL', 'to be filled')
        self.initialization_description = parser.get_option('CMOR', 'INITIALIZATION_DESCRIPTION', 'to be filled')
        self.initialization_method = parser.get_option('CMOR', 'INITIALIZATION_METHOD', '1')
        self.initialization_number = parser.get_int_option('CMOR', 'INITIALIZATION_NUMBER', 1)
        self.physics_description = parser.get_option('CMOR', 'PHYSICS_DESCRIPTION', 'to be filled')
        self.physics_version = parser.get_option('CMOR', 'PHYSICS_VERSION', '1')
        self.source = parser.get_option('CMOR', 'SOURCE', 'to be filled')
        self.version = parser.get_option('CMOR', 'VERSION', '')
        self.default_ocean_grid = parser.get_option('CMOR', 'DEFAULT_OCEAN_GRID', 'gn')
        self.default_atmos_grid = parser.get_option('CMOR', 'DEFAULT_ATMOS_GRID', 'gr')
        self.activity = parser.get_option('CMOR', 'ACTIVITY', 'CMIP')
        self.min_cmorized_vars = parser.get_int_option('CMOR', 'MIN_CMORIZED_VARS', 10)

        vars_string = parser.get_option('CMOR', 'VARIABLE_LIST', '')
        self.var_manager = var_manager
        # None means "no filter": every variable is cmorized
        self._variable_list = self._parse_variable_list(vars_string)

        self._var_hourly = CMORConfig._parse_variables(parser.get_option('CMOR', 'ATMOS_HOURLY_VARS', ''))
        self._var_daily = CMORConfig._parse_variables(parser.get_option('CMOR', 'ATMOS_DAILY_VARS', ''))
        self._var_monthly = CMORConfig._parse_variables(parser.get_option('CMOR', 'ATMOS_MONTHLY_VARS', ''))

    def _parse_variable_list(self, vars_string):
        """
        Parse the VARIABLE_LIST option: whitespace separated 'domain:variable' items

        Parsing stops at the first item starting with '#'. Items whose variable is
        not known to the variable manager are skipped with a warning.

        Parameters
        ----------
        vars_string: str

        Returns
        -------
        list of str or None
            'domain:short_name' entries, or None if the option is empty

        Raises
        ------
        ConfigException
            If the option has a value but no variable was recognized
        """
        if not vars_string:
            return None

        variable_list = list()
        # split() instead of split(' ') so repeated blanks do not produce empty items
        for domain_var in vars_string.split():
            if domain_var.startswith('#'):
                break
            splitted = domain_var.split(':')
            cmor_var = None
            if len(splitted) > 1:
                cmor_var = self.var_manager.get_variable(splitted[1], silent=True)
            # NOTE(review): this guard was missing from the reviewed text; without it
            # every variable would be reported as not recognized and skipped.
            if not cmor_var:
                Log.warning('Variable {0} not recognized. It will not be cmorized', domain_var)
                continue
            if ModelingRealm(splitted[0]) != cmor_var.domain:
                Log.warning('Domain {0} for variable {1} is not correct: is {2}', splitted[0], cmor_var.short_name,
                            cmor_var.domain)
            # The domain stored is always the one defined for the variable
            variable_list.append('{0.domain}:{0.short_name}'.format(cmor_var))

        if not variable_list:
            raise ConfigException('Variable list value is specified, but no variables were found')
        return variable_list

    def cmorize(self, var_cmor):
        """
        Check if var_cmor is on variable list

        Parameters
        ----------
        var_cmor: Variable

        Returns
        -------
        bool
            Always True if no variable list was configured
        """
        if self._variable_list is None:
            return True
        if not var_cmor:
            return False
        return '{0}:{1}'.format(var_cmor.domain, var_cmor.short_name) in self._variable_list

    def any_required(self, variables):
        """
        Check if any of the given variables is needed for cmorization

        Parameters
        ----------
        variables: iterable of str

        Returns
        -------
        bool
        """
        if self._variable_list is None:
            return True
        return any(self.cmorize(self.var_manager.get_variable(var, silent=True)) for var in variables)

    def chunk_cmorization_requested(self, chunk):
        """
        Check if the cmorization of a given chunk is required

        Parameters
        ----------
        chunk: int

        Returns
        -------
        bool
            Always True if no chunks were configured
        """
        if len(self._chunks) == 0:
            return True
        return chunk in self._chunks

    @staticmethod
    def _parse_variables(raw_string):
        """
        Parse a comma separated list of 'code[:levels]' items

        Parameters
        ----------
        raw_string: str

        Returns
        -------
        dict
            Maps each integer code to its levels as a comma separated string,
            or to None if the item has no levels part
        """
        variables = dict()
        if raw_string:
            for var_section in raw_string.split(','):
                if not var_section.strip():
                    # Tolerate empty items such as the one left by a trailing comma
                    continue
                splitted_var = var_section.split(':')
                if len(splitted_var) == 1:
                    levels = None
                else:
                    levels = ','.join(map(str, CMORConfig._parse_levels(splitted_var[1:])))
                variables[int(splitted_var[0])] = levels
        return variables

    @staticmethod
    def _parse_levels(levels_splitted):
        """
        Parse the levels part of a variable definition

        Parameters
        ----------
        levels_splitted: list of str
            One element: '-' separated list of levels.
            Two or three elements: start, end (not included) and optional step.

        Returns
        -------
        iterable of int
        """
        if len(levels_splitted) == 1:
            return [int(level) for level in levels_splitted[0].split('-')]
        start = int(levels_splitted[0])
        end = int(levels_splitted[1])
        if len(levels_splitted) == 3:
            step = int(levels_splitted[2])
        else:
            step = 1
        return range(start, end, step)

    def get_variables(self, frequency):
        """
        Get the variables to get from the grib file for a given frequency

        Parameters
        ----------
        frequency: Frequency

        Returns
        -------
        dict
            Codes and levels as returned by _parse_variables

        Raises
        ------
        ValueError
            If the frequency passed is not supported
        """
        if frequency in (Frequencies.three_hourly, Frequencies.six_hourly):
            return self._var_hourly
        elif frequency == Frequencies.daily:
            return self._var_daily
        elif frequency == Frequencies.monthly:
            return self._var_monthly
        raise ValueError('Frequency not recognized: {0}'.format(frequency))

    # NOTE(review): the ``def`` line of this method was missing from the reviewed
    # text; the method name must be confirmed against its callers.
    def get_requested_codes(self):
        """
        Get all the codes to be extracted from the grib files

        Returns
        -------
        set of int
        """
        return set(list(self._var_hourly.keys()) + list(self._var_daily.keys()) + list(self._var_monthly.keys()))

    def get_levels(self, frequency, variable):
        """
        Get the levels to extract for a given variable

        Parameters
        ----------
        frequency: Frequency
        variable: int
            Code of the variable, as used for the keys of get_variables

        Returns
        -------
        str or None
            Comma separated levels, None if no levels were configured

        Raises
        ------
        KeyError
            If the variable is not configured for the frequency
        """
        return self.get_variables(frequency)[variable]
class THREDDSConfig(object):
    """
    Configuration related to the THREDDS server

    Parameters
    ----------
    parser: ConfigParser
    """

    def __init__(self, parser):
        # Empty string if the option is not defined
        self.server_url = parser.get_option('THREDDS', 'SERVER_URL', '')
class ExperimentConfig(object):
    """Configuration related to the experiment"""

    def __init__(self):
        self.chunk_list = None

    def parse_ini(self, parser):
        """
        Parse experiment section from INI-like file

        Parameters
        ----------
        parser: ConfigParser
        """
        self.institute = parser.get_option('EXPERIMENT', 'INSTITUTE')
        self.expid = parser.get_option('EXPERIMENT', 'EXPID')
        self.experiment_name = parser.get_option('EXPERIMENT', 'NAME', self.expid)

        self.members = parser.get_list_option('EXPERIMENT', 'MEMBERS')
        self.member_digits = parser.get_int_option('EXPERIMENT', 'MEMBER_DIGITS', 1)
        self.member_prefix = parser.get_option('EXPERIMENT', 'MEMBER_PREFIX', 'fc')
        self.member_count_start = parser.get_int_option('EXPERIMENT', 'MEMBER_COUNT_START', 0)
        # NOTE(review): no call to _parse_members was present in the reviewed text,
        # but the other methods handle members as integers; confirm this call.
        self._parse_members()

        self.calendar = parser.get_option('EXPERIMENT', 'CALENDAR', 'standard')
        self._parse_startdates(parser)

        self.chunk_size = parser.get_int_option('EXPERIMENT', 'CHUNK_SIZE')
        self.num_chunks = parser.get_int_option('EXPERIMENT', 'CHUNKS')
        self.chunk_list = parser.get_int_list_option('EXPERIMENT', 'CHUNK_LIST', [])

        self.model = parser.get_option('EXPERIMENT', 'MODEL')
        self.model_version = parser.get_option('EXPERIMENT', 'MODEL_VERSION', '')
        self.atmos_grid = parser.get_option('EXPERIMENT', 'ATMOS_GRID', '')
        self.atmos_timestep = parser.get_int_option('EXPERIMENT', 'ATMOS_TIMESTEP', 6)
        self.ocean_timestep = parser.get_int_option('EXPERIMENT', 'OCEAN_TIMESTEP', 6)

    def _parse_startdates(self, parser):
        """
        Read the STARTDATES option and expand it into self.startdates

        Items enclosed in braces are '{start,end,interval}' definitions; any other
        item is expanded with exrex.generate.
        """
        startdates = parser.get_list_option('EXPERIMENT', 'STARTDATES')
        import exrex
        self.startdates = []
        for startdate_pattern in startdates:
            startdate_pattern = startdate_pattern.strip()
            if not startdate_pattern:
                continue
            if startdate_pattern[0] == '{' and startdate_pattern[-1] == '}':
                self._read_startdates(startdate_pattern[1:-1])
            else:
                for startdate in exrex.generate(startdate_pattern):
                    startdate = startdate.strip()
                    if startdate:
                        self.startdates.append(startdate)

    def _parse_members(self):
        """
        Convert self.members from strings to integers

        Each item is a member number or a 'start-end' range (end included),
        optionally preceded by the member prefix.
        """
        members = []
        for mem in self.members:
            if '-' in mem:
                start, end = mem.split('-')
                members.extend(range(self._member_number(start), self._member_number(end) + 1))
            else:
                members.append(self._member_number(mem))
        self.members = members

    def _member_number(self, member):
        """Return the member number for a string, removing the member prefix if present"""
        if member.startswith(self.member_prefix):
            member = member[len(self.member_prefix):]
        return int(member)

    def _read_startdates(self, pattern):
        """
        Add to self.startdates the dates defined by a 'start,end,interval' pattern

        The interval is an optional integer factor followed by a unit:
        Y (years), M (months), W (weeks) or D (days), in any case.

        Raises
        ------
        ConfigException
            If the pattern is incomplete or the interval unit is not supported
        """
        fields = [field.strip() for field in pattern.split(',')]
        if len(fields) < 3 or not fields[2]:
            raise ConfigException('Invalid STARTDATES definition: {0}'.format(pattern))
        start = parse_date(fields[0])
        end = parse_date(fields[1])
        interval = fields[2]

        # A unit with no number means a factor of one
        factor = 1 if len(interval) == 1 else int(interval[0:-1])
        interval = interval[-1].upper()

        while start <= end:
            self.startdates.append(date2str(start))
            if interval == 'Y':
                start = add_years(start, factor)
            elif interval == 'M':
                start = add_months(start, factor, cal=self.calendar)
            elif interval == 'W':
                start = add_days(start, factor * 7, cal=self.calendar)
            elif interval == 'D':
                start = add_days(start, factor, cal=self.calendar)
            else:
                # The message is formatted here: the exception does not do it
                raise ConfigException('Interval {0} not supported in STARTDATES definition: {1}'.format(interval,
                                                                                                        pattern))

    def get_chunk_list(self):
        """
        Return a list with all the chunks

        If no chunk list is defined, all the chunks from 1 to num_chunks are used.

        Returns
        -------
        list of tuple of (str, int, int)
            Startdate, member and chunk
        """
        if self.chunk_list:
            chunks = self.chunk_list
        else:
            chunks = range(1, self.num_chunks + 1)

        chunk_list = list()
        for startdate in self.startdates:
            for member in self.members:
                for chunk in chunks:
                    chunk_list.append((startdate, member, chunk))
        return chunk_list

    def get_member_list(self):
        """
        Return a list with all the members

        Returns
        -------
        list of tuple of (str, int)
            Startdate and member
        """
        member_list = list()
        for startdate in self.startdates:
            for member in self.members:
                # NOTE(review): this line was missing from the reviewed text, which
                # left the loop without a body; restored from the documented return.
                member_list.append((startdate, member))
        return member_list

    def get_year_chunks(self, startdate, year):
        """
        Get the list of chunks containing timesteps from the given year

        Parameters
        ----------
        startdate: str
        year: int

        Returns
        -------
        list of int
        """
        date = parse_date(startdate)
        chunks = list()
        for chunk in range(1, self.num_chunks + 1):
            chunk_start = self.get_chunk_start(date, chunk)
            if chunk_start.year > year:
                # Later chunks start even later
                break
            elif chunk_start.year == year or chunk_end_date(chunk_start, self.chunk_size, 'month',
                                                            self.calendar).year == year:
                chunks.append(chunk)
        return chunks

    def get_chunk_start(self, startdate, chunk):
        """
        Get chunk's first day

        Parameters
        ----------
        startdate: str or datetime.datetime
        chunk: int

        Returns
        -------
        datetime.datetime

        See Also
        --------
        get_chunk_start_str
        """
        # get_year_chunks passes an already parsed date: only parse other values
        if not isinstance(startdate, datetime.datetime):
            startdate = parse_date(startdate)
        return chunk_start_date(startdate, chunk, self.chunk_size, 'month', self.calendar)

    def get_chunk_start_str(self, startdate, chunk):
        """
        Get chunk's first day string representation

        Parameters
        ----------
        startdate: str or datetime.datetime
        chunk: int

        Returns
        -------
        str

        See Also
        --------
        get_chunk_start
        """
        return date2str(self.get_chunk_start(startdate, chunk))

    def get_chunk_end(self, startdate, chunk):
        """
        Get chunk's last day

        Parameters
        ----------
        startdate: str or datetime.datetime
        chunk: int

        Returns
        -------
        datetime.datetime

        See Also
        --------
        get_chunk_end_str
        """
        return chunk_end_date(self.get_chunk_start(startdate, chunk), self.chunk_size, 'month', self.calendar)

    def get_chunk_end_str(self, startdate, chunk):
        """
        Get chunk's last day as a string

        Parameters
        ----------
        startdate: str or datetime.datetime
        chunk: int

        Returns
        -------
        str

        See Also
        --------
        get_chunk_end
        """
        return date2str(self.get_chunk_end(startdate, chunk))

    def get_full_years(self, startdate):
        """
        Return the list of full years that are in the given startdate

        Parameters
        ----------
        startdate: str

        Returns
        -------
        list of int
        """
        chunks_per_year = 12 // self.chunk_size
        date = parse_date(startdate)
        first_january = 0
        first_year = date.year
        if date.month != 1:
            # The first year is not complete: count the chunks to skip
            month = date.month
            first_year += 1
            while month + self.chunk_size < 12:
                month += self.chunk_size
                first_january += 1

        years = list()
        for _ in range(first_january, self.num_chunks, chunks_per_year):
            years.append(first_year)
            first_year += 1
        return years

    def get_member_str(self, member):
        """
        Return the member name for a given member number.

        Parameters
        ----------
        member: int

        Returns
        -------
        str
            Member prefix followed by the number, zero padded to member_digits
        """
        return '{0}{1}'.format(self.member_prefix, str(member).zfill(self.member_digits))
class ReportConfig(object):
    """
    Configuration for the reporting feature

    Parameters
    ----------
    parser: ConfigParser
    """

    def __init__(self, parser):
        self.maximum_priority = parser.get_int_option('REPORT', 'MAXIMUM_PRIORITY', 10)
        self.path = parser.get_path_option('REPORT', 'PATH', '')