Newer
Older
Javier Vegas-Regidor
committed
# coding=utf-8
"""Classes to manage Earth Diagnostics configuration"""
Javier Vegas-Regidor
committed
import os
from bscearth.utils.config_parser import ConfigParser
from bscearth.utils.date import (
parse_date,
chunk_start_date,
chunk_end_date,
date2str,
add_years,
add_months,
add_days,
)
Javier Vegas-Regidor
committed
from earthdiagnostics.frequency import Frequency, Frequencies
from earthdiagnostics.modelingrealm import ModelingRealm
from earthdiagnostics.variable import VariableManager
from earthdiagnostics.data_convention import (
SPECSConvention,
CMIP6Convention,
PrimaveraConvention,
MeteoFranceConvention,
PrefaceConvention,
)
Javier Vegas-Regidor
committed
"""Exception raised when there is a problem with the configuration"""
class Config(object):
"""Class to read and manage the configuration"""
def __init__(self):
# Read diags config
self.data_adaptor = None
"Scratch folder path"
self.scratch_dir = None
"Scratch folder path"
self.use_ramdisk = None
"If True, the scratch dir is created as a ram disk"
self.auto_clean = None
"If True, the scratch dir is removed after finishing"
self.scratch_masks = None
"Common scratch folder for masks"
self.data_dir = None
"Root data folder path"
self.data_type = None
"Data type (experiment, observation or reconstruction)"
self.con_files = None
"Mask and meshes folder path"
self.mesh_mask = None
"Custom mesh mask file to use"
self.new_mask_glo = None
"Custom new mask glo file to use"
self.basins = None
"Custom basins file to use"
self.data_convention = None
"Data convention to use"
self.var_manager = None
self.skip_diags_done = None
"Flag to control if already done diags must be recalculated"
self.frequency = None
"Default data frequency to be used by the diagnostics"
self.cdftools_path = None
"Path to CDFTOOLS executables"
self.max_cores = None
"Maximum number of cores to use"
self.parallel_downloads = None
"Maximum number of simultaneous downloads"
self.parallel_uploads = None
"Maximum number of simultaneous uploads"
self.restore_meshes = None
"If True, force the copy of all the mesh and mask files for the model"
# Read experiment config
self.experiment = ExperimentConfig()
"""
Configuration related to the experiment
Returns
-------
ExperimentConfig
"""
self.cmor = None
"""
CMOR related configuration
Returns
-------
CMORConfig
"""
self.thredds = None
"""
THREDDS server configuration
Returns
-------
THREDDSConfig
"""
self.report = None
"""
Reporting configuration
Returns
-------
ReportConfig
"""
"""
Read configuration from INI file
Parameters
----------
path: str
"""
config_file_path = bscearth.utils.path.expand_path(path)
if not os.path.isfile(config_file_path):
Log.critical(
"Configuration file {0} can not be found", config_file_path
)
raise ValueError(
"Configuration file {0} can not be found".format(
config_file_path
)
)
Javier Vegas-Regidor
committed
parser.optionxform = str
Javier Vegas-Regidor
committed
# Read diags config
self.data_adaptor = parser.get_choice_option(
"DIAGNOSTICS",
"DATA_ADAPTOR",
("CMOR", "THREDDS", "OBSRECON"),
"CMOR",
)
self.scratch_dir = parser.get_path_option("DIAGNOSTICS", "SCRATCH_DIR")
self.use_ramdisk = parser.get_bool_option(
"DIAGNOSTICS", "USE_RAMDISK", False
)
self.auto_clean = parser.get_bool_option(
"DIAGNOSTICS", "AUTO_CLEAN", True
)
if not self.auto_clean and self.use_ramdisk:
Log.warning(
"RAM disk scratch dir is always automatically cleaned."
)
self.scratch_masks = parser.get_path_option(
"DIAGNOSTICS", "SCRATCH_MASKS", "/scratch/Earth/ocean_masks"
)
self.data_dir = parser.get_path_option("DIAGNOSTICS", "DATA_DIR")
self.data_type = parser.get_choice_option(
"DIAGNOSTICS", "DATA_TYPE", ("exp", "obs", "recon"), "exp"
)
self.con_files = parser.get_path_option("DIAGNOSTICS", "CON_FILES")
self.mesh_mask = parser.get_path_option("DIAGNOSTICS", "MESH_MASK", "")
self.new_mask_glo = parser.get_path_option(
"DIAGNOSTICS", "NEW_MASK_GLO", ""
)
self.mask_regions = parser.get_path_option(
"DIAGNOSTICS", "MASK_REGIONS", ""
)
self.mask_regions_3d = parser.get_path_option(
"DIAGNOSTICS", "MASK_REGIONS_3D", ""
)
self.var_manager = VariableManager()
self.var_manager.load_variables(self.data_convention.name)
self._diags = parser.get_option("DIAGNOSTICS", "DIAGS")
self.skip_diags_done = parser.get_bool_option(
"DIAGNOSTICS", "SKIP_DIAGS_DONE", True
)
self.frequency = Frequency(
parser.get_option("DIAGNOSTICS", "FREQUENCY")
)
self.cdftools_path = parser.get_path_option(
"DIAGNOSTICS", "CDFTOOLS_PATH", ""
)
self.max_cores = parser.get_int_option("DIAGNOSTICS", "MAX_CORES", 0)
self.parallel_downloads = parser.get_int_option(
"DIAGNOSTICS", "PARALLEL_DOWNLOADS", 1
)
self.parallel_uploads = parser.get_int_option(
"DIAGNOSTICS", "PARALLEL_UPLOADS", 1
)
self.restore_meshes = parser.get_bool_option(
"DIAGNOSTICS", "RESTORE_MESHES", False
)
Javier Vegas-Regidor
committed
# Read experiment config
Javier Vegas-Regidor
committed
# Read aliases
self._aliases = dict()
if parser.has_section("ALIAS"):
for option in parser.options("ALIAS"):
self._aliases[option.lower()] = parser.get_list_option(
"ALIAS", option
)
Log.debug("Preparing command list")
commands = self._diags.split()
Javier Vegas-Regidor
committed
self._real_commands = list()
self.scratch_dir = os.path.join(
self.scratch_dir, "diags", self.experiment.expid
)
self.cmor = CMORConfig(parser, self.var_manager)
self.thredds = THREDDSConfig(parser)
self.report = ReportConfig(parser)
def _apply_aliases(self, commands):
Javier Vegas-Regidor
committed
for command in commands:
command = command.strip()
if command.lower() in self._aliases:
added_commands = self._aliases[command.lower()]
Log.info(
"Changing alias {0} for {1}",
command,
" ".join(added_commands),
)
Javier Vegas-Regidor
committed
for add_command in added_commands:
self._real_commands.append(add_command)
else:
self._real_commands.append(command)
def _parse_dataconvention(self, parser):
data_convention = parser.get_choice_option(
"DIAGNOSTICS",
"DATA_CONVENTION",
("specs", "primavera", "cmip6", "preface", "meteofrance"),
"specs",
ignore_case=True,
)
if data_convention == "specs":
self.data_convention = SPECSConvention(data_convention, self)
self.data_convention = PrimaveraConvention(data_convention, self)
self.data_convention = CMIP6Convention(data_convention, self)
self.data_convention = PrefaceConvention(data_convention, self)
self.data_convention = MeteoFranceConvention(data_convention, self)
self.scratch_masks = self.data_convention.get_scratch_masks(
self.scratch_masks
)
namelist_file = os.path.join(
os.path.dirname(__file__),
"CDFTOOLS_{0}.namlist".format(self.data_convention.name),
)
Log.debug("Setting namelist {0}", namelist_file)
os.environ["NAM_CDF_NAMES"] = namelist_file
Javier Vegas-Regidor
committed
def get_commands(self):
Return the list of commands after replacing the alias
Returns
-------
iterable of str
Javier Vegas-Regidor
committed
return self._real_commands
Javier Vegas-Regidor
committed
class CMORConfig(object):
"""
Configuration for the cmorization processes
Parameters
----------
parser: ConfigParser
var_manager: VariableManager
"""
def __init__(self, parser, var_manager):
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
self.force = parser.get_bool_option("CMOR", "FORCE", False)
self.force_untar = parser.get_bool_option("CMOR", "FORCE_UNTAR", False)
self.skip_prepare = parser.get_bool_option(
"CMOR", "SKIP_PREPARE", False
)
self.filter_files = parser.get_list_option("CMOR", "FILTER_FILES", [])
self.ocean = parser.get_bool_option("CMOR", "OCEAN_FILES", True)
self.atmosphere = parser.get_bool_option(
"CMOR", "ATMOSPHERE_FILES", True
)
self.use_grib = parser.get_bool_option("CMOR", "USE_GRIB", True)
self._chunks = parser.get_int_list_option("CMOR", "CHUNKS")
self.associated_experiment = parser.get_option(
"CMOR", "ASSOCIATED_EXPERIMENT", "to be filled"
)
self.associated_model = parser.get_option(
"CMOR", "ASSOCIATED_MODEL", "to be filled"
)
self.initialization_description = parser.get_option(
"CMOR", "INITIALIZATION_DESCRIPTION", "to be filled"
)
self.initialization_method = parser.get_option(
"CMOR", "INITIALIZATION_METHOD", "1"
)
self.initialization_number = parser.get_int_option(
"CMOR", "INITIALIZATION_NUMBER", 1
)
self.physics_description = parser.get_option(
"CMOR", "PHYSICS_DESCRIPTION", "to be filled"
)
self.physics_version = parser.get_option(
"CMOR", "PHYSICS_VERSION", "1"
)
self.source = parser.get_option("CMOR", "SOURCE", "to be filled")
self.version = parser.get_option("CMOR", "VERSION", "")
self.default_ocean_grid = parser.get_option(
"CMOR", "DEFAULT_OCEAN_GRID", "gn"
)
self.default_atmos_grid = parser.get_option(
"CMOR", "DEFAULT_ATMOS_GRID", "gr"
)
self.activity = parser.get_option("CMOR", "ACTIVITY", "CMIP")
self.min_cmorized_vars = parser.get_int_option(
"CMOR", "MIN_CMORIZED_VARS", 10
)
self.append_startdate = parser.get_bool_option(
"CMOR", "APPEND_STARTDATE", False
)
self.append_startdate_year_only = parser.get_bool_option(
"CMOR", "APPEND_STARTDATE_YEAR_ONLY", False
)
vars_string = parser.get_option("CMOR", "VARIABLE_LIST", "")
self.var_manager = var_manager
if vars_string:
self._variable_list = list()
splitted = domain_var.split(":")
cmor_var = self.var_manager.get_variable(
splitted[1], silent=True
)
Log.warning(
"Variable {0} not recognized. It will not be cmorized",
domain_var,
)
continue
if ModelingRealm(splitted[0]) != cmor_var.domain:
Log.warning(
"Domain {0} for variable {1} is not correct: is {2}",
splitted[0],
splitted[1],
cmor_var.domain,
)
self._variable_list.append(
"{0.domain}:{0.short_name}".format(cmor_var)
)
raise ConfigException(
"Variable list value is specified, but no variables were "
"found"
)
else:
self._variable_list = None
self._var_hourly = CMORConfig._parse_variables(
parser.get_option("CMOR", "ATMOS_HOURLY_VARS", "")
)
self._var_daily = CMORConfig._parse_variables(
parser.get_option("CMOR", "ATMOS_DAILY_VARS", "")
)
self._var_monthly = CMORConfig._parse_variables(
parser.get_option("CMOR", "ATMOS_MONTHLY_VARS", "")
)
Javier Vegas-Regidor
committed
def cmorize(self, var_cmor):
"""
Check if var_cmor is on variable list
Parameters
----------
var_cmor: Variable
"""
if self._variable_list is None:
return True
if not var_cmor:
return False
return (
"{0}:{1}".format(var_cmor.domain, var_cmor.short_name)
in self._variable_list
)
def any_required(self, variables):
"""
Check if any of the given variables is needed for cmorization
Parameters
----------
variables: iterable of str
Returns
-------
bool
"""
if self._variable_list is None:
return True
for var in variables:
if self.cmorize(self.var_manager.get_variable(var, silent=True)):
return True
return False
def chunk_cmorization_requested(self, chunk):
"""
Check if the cmorization of a given chunk is required
Parameters
----------
chunk: int
Returns
-------
bool
"""
if len(self._chunks) == 0:
return True
return chunk in self._chunks
@staticmethod
def _parse_variables(raw_string):
Javier Vegas-Regidor
committed
variables = dict()
if raw_string:
Javier Vegas-Regidor
committed
for var_section in splitted:
Javier Vegas-Regidor
committed
if len(splitted_var) == 1:
levels = None
else:
levels = ",".join(
map(str, CMORConfig._parse_levels(splitted_var[1:]))
)
Javier Vegas-Regidor
committed
variables[int(splitted_var[0])] = levels
return variables
@staticmethod
def _parse_levels(levels_splitted):
if len(levels_splitted) == 1:
Javier Vegas-Regidor
committed
start = int(levels_splitted[0])
end = int(levels_splitted[1])
if len(levels_splitted) == 3:
step = int(levels_splitted[2])
else:
step = 1
return range(start, end, step)
def get_variables(self, frequency):
"""
Get the variables to get from the grib file for a given frequency
Parameters
----------
frequency: Frequency
Returns
-------
str
Raises
------
ValueError
If the frequency passed is not supported
"""
if frequency in (Frequencies.three_hourly, Frequencies.six_hourly):
return self._var_hourly
elif frequency == Frequencies.daily:
return self._var_daily
elif frequency == Frequencies.monthly:
return self._var_monthly
raise ValueError("Frequency not recognized: {0}".format(frequency))
"""
Get all the codes to be extracted from the grib files
Returns
-------
set of int
"""
return set(
list(self._var_hourly.keys())
+ list(self._var_daily.keys())
+ list(self._var_monthly.keys())
)
def get_levels(self, frequency, variable):
"""
Get the levels to extract for a given variable
Parameters
----------
frequency: Frequency
variable: str
Returns
-------
iterable of int
"""
return self.get_variables(frequency)[variable]
Javier Vegas-Regidor
committed
class THREDDSConfig(object):
"""
Configuration related to the THREDDS server
Parameters
----------
parser: ConfigParser
"""
def __init__(self, parser):
self.server_url = parser.get_option("THREDDS", "SERVER_URL", "")
Javier Vegas-Regidor
committed
class ExperimentConfig(object):
"""Configuration related to the experiment"""
Javier Vegas-Regidor
committed
def __init__(self):
self.chunk_list = None
Javier Vegas-Regidor
committed
def parse_ini(self, parser):
"""
Parse experiment section from INI-like file
Javier Vegas-Regidor
committed
Parameters
----------
parser: ConfigParser
Javier Vegas-Regidor
committed
self.institute = parser.get_option("EXPERIMENT", "INSTITUTE")
self.expid = parser.get_option("EXPERIMENT", "EXPID")
self.experiment_name = parser.get_option(
"EXPERIMENT", "NAME", self.expid
)
self.members = parser.get_list_option("EXPERIMENT", "MEMBERS")
self.member_digits = parser.get_int_option(
"EXPERIMENT", "MEMBER_DIGITS", 1
)
self.member_prefix = parser.get_option(
"EXPERIMENT", "MEMBER_PREFIX", "fc"
)
self.member_count_start = parser.get_int_option(
"EXPERIMENT", "MEMBER_COUNT_START", 0
)
self.calendar = parser.get_option("EXPERIMENT", "CALENDAR", "standard")
self.chunk_size = parser.get_int_option("EXPERIMENT", "CHUNK_SIZE")
self.num_chunks = parser.get_int_option("EXPERIMENT", "CHUNKS")
self.chunk_list = parser.get_int_list_option(
"EXPERIMENT", "CHUNK_LIST", []
)
self.model = parser.get_option("EXPERIMENT", "MODEL")
self.model_version = parser.get_option(
"EXPERIMENT", "MODEL_VERSION", ""
)
self.atmos_grid = parser.get_option("EXPERIMENT", "ATMOS_GRID", "")
self.atmos_timestep = parser.get_int_option(
"EXPERIMENT", "ATMOS_TIMESTEP", 6
)
self.ocean_timestep = parser.get_int_option(
"EXPERIMENT", "OCEAN_TIMESTEP", 6
)
startdates = parser.get_list_option("EXPERIMENT", "STARTDATES")
self.startdates = []
for startdate_pattern in startdates:
startdate_pattern = startdate_pattern.strip()
if not startdate_pattern:
continue
if startdate_pattern[0] == "{" and startdate_pattern[-1] == "}":
self._read_startdates(startdate_pattern[1:-1])
else:
self.startdates.append(startdate_pattern)
members = []
for mem in self.members:
if start.startswith(self.member_prefix):
start = start[len(self.member_prefix):]
if end.startswith(self.member_prefix):
end = end[len(self.member_prefix):]
members.append(member)
else:
if mem.startswith(self.member_prefix):
mem = mem[len(self.member_prefix):]
members.append(int(mem))
self.members = members
def _read_startdates(self, pattern):
start = parse_date(pattern[0].strip())
end = parse_date(pattern[1].strip())
interval = pattern[2].strip()
if len(interval) == 1:
factor = 1
else:
factor = int(interval[0:-1])
interval = interval[-1].upper()
while start <= end:
self.startdates.append(date2str(start))
start = add_years(start, factor)
start = add_months(start, factor, cal=self.calendar)
start = add_days(start, factor * 7, cal=self.calendar)
start = add_days(start, factor, cal=self.calendar)
else:
raise ConfigException(
"Interval {0} not supported in STARTDATES definition: {1}",
interval,
pattern,
)
Javier Vegas-Regidor
committed
def get_chunk_list(self):
"""
Return a list with all the chunks
Javier Vegas-Regidor
committed
:return: List containing tuples of startdate, member and chunk
:rtype: tuple[str, int, int]
"""
chunk_list = list()
for startdate in self.startdates:
for member in self.members:
if len(self.chunk_list) == 0:
for chunk in range(1, self.num_chunks + 1):
chunk_list.append((startdate, member, chunk))
else:
for chunk in self.chunk_list:
chunk_list.append((startdate, member, chunk))
Javier Vegas-Regidor
committed
return chunk_list
def get_member_list(self):
"""
Return a list with all the members
Javier Vegas-Regidor
committed
:return: List containing tuples of startdate and member
:rtype: tuple[str, int, int]
"""
member_list = list()
for startdate in self.startdates:
for member in self.members:
Javier Vegas-Regidor
committed
return member_list
def get_year_chunks(self, startdate, year):
"""
Get the list of chunks containing timesteps from the given year
Javier Vegas-Regidor
committed
:param startdate: startdate to use
:type startdate: str
:param year: reference year
:type year: int
:return: list of chunks containing data from the given year
:rtype: list[int]
"""
date = parse_date(startdate)
chunks = list()
for chunk in range(1, self.num_chunks + 1):
chunk_start = self.get_chunk_start(date, chunk)
Javier Vegas-Regidor
committed
if chunk_start.year > year:
break
elif (
chunk_start.year == year
or chunk_end_date(
chunk_start, self.chunk_size, "month", self.calendar
).year
== year
):
Javier Vegas-Regidor
committed
chunks.append(chunk)
return chunks
def get_chunk_start(self, startdate, chunk):
"""
Get chunk's first day
Parameters
----------
startdate: str or datetime.datetime
chunk: int
Returns
-------
datetime.datetime
See Also
--------
get_chunk_start_str
"""
startdate = parse_date(startdate)
return chunk_start_date(
startdate, chunk, self.chunk_size, "month", self.calendar
)
def get_chunk_start_str(self, startdate, chunk):
"""
Get chunk's first day string representation
Parameters
----------
startdate: str or datetime.datetime
chunk: int
Returns
-------
str
See Also
--------
get_chunk_start
"""
return date2str(self.get_chunk_start(startdate, chunk))
def get_chunk_end(self, startdate, chunk):
"""
Get chunk's last day
Parameters
----------
startdate: str or datetime.datetime
chunk: int
Returns
-------
datetime.datetime
See Also
--------
get_chunk_end_str
"""
return chunk_end_date(
self.get_chunk_start(startdate, chunk),
self.chunk_size,
"month",
self.calendar,
)
def get_chunk_end_str(self, startdate, chunk):
"""
Get chunk's last day as a string
Parameters
----------
startdate: str or datetime.datetime
chunk: int
Returns
-------
datetime.datetime
See Also
--------
get_chunk_end
"""
return date2str(self.get_chunk_end(startdate, chunk))
Javier Vegas-Regidor
committed
def get_full_years(self, startdate):
"""
Return the list of full years that are in the given startdate
Javier Vegas-Regidor
committed
:param startdate: startdate to use
:type startdate: str
:return: list of full years
:rtype: list[int]
Javier Vegas-Regidor
committed
"""
chunks_per_year = 12 // self.chunk_size
Javier Vegas-Regidor
committed
date = parse_date(startdate)
first_january = 0
first_year = date.year
if date.month != 1:
month = date.month
first_year += 1
while month + self.chunk_size < 12:
month += self.chunk_size
first_january += 1
years = list()
for _ in range(first_january, self.num_chunks, chunks_per_year,):
Javier Vegas-Regidor
committed
years.append(first_year)
first_year += 1
return years
def get_member_str(self, member):
"""
Return the member name for a given member number.
Javier Vegas-Regidor
committed
:param member: member's number
:type member: int
:return: member's name
:rtype: str
Javier Vegas-Regidor
committed
"""
return "{0}{1}".format(
self.member_prefix, str(member).zfill(self.member_digits)
)
Javier Vegas-Regidor
committed
"""
Configuration for the reporting feature
Parameters
----------
parser: ConfigParser
"""
self.maximum_priority = parser.get_int_option(
"REPORT", "MAXIMUM_PRIORITY", 10
)
self.path = parser.get_path_option("REPORT", "PATH", "")