Newer
Older
# coding=utf-8
import datetime
import os
from unittest import TestCase
from unittest import mock

from earthdiagnostics.config import (
    CMORConfig,
    ConfigException,
    THREDDSConfig,
    ReportConfig,
    ExperimentConfig,
    Config,
)
from earthdiagnostics.frequency import Frequencies
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.data_convention import (
    SPECSConvention,
    PrimaveraConvention,
    PrefaceConvention,
    MeteoFranceConvention,
    CMIP6Convention,
)
class VariableMock(object):
    """Minimal variable stand-in carrying only a domain and a short name."""

    def __init__(self):
        self.domain = ModelingRealms.ocean
        # Default name: the tests in this file expect a fresh mock to be
        # accepted when VARIABLE_LIST is "ocean:tos". Tests that need
        # another variable overwrite this attribute.
        self.short_name = "tos"

    def __eq__(self, other):
        """Compare by domain and short name.

        Returns ``NotImplemented`` for objects that do not carry both
        attributes (``None`` included), so ``==`` evaluates to ``False``
        instead of raising ``AttributeError``.
        """
        if not (hasattr(other, "domain") and hasattr(other, "short_name")):
            return NotImplemented
        return (
            self.domain == other.domain and self.short_name == other.short_name
        )
class VariableManagerMock(object):
    """Variable manager double that resolves every alias except ``"bad"``."""

    def get_variable(self, alias, silent=False):
        """Return a VariableMock named after ``alias``.

        ``silent`` is accepted for call compatibility and ignored.
        Returns ``None`` for the alias ``"bad"``.
        """
        # NOTE(review): the reviewed source had a bare ``return None`` here,
        # which made the lines below unreachable. The "bad" guard is
        # reconstructed from test_bad_list, which expects "ocean:bad" to be
        # rejected -- confirm against the upstream file.
        if alias == "bad":
            return None
        var = VariableMock()
        var.short_name = alias
        return var
class ParserMock(mock.Mock):
    """In-memory stand-in for the configuration parser.

    Values live in a private dict keyed by section and option. The typed
    ``get_*_option`` accessors return the stored value as is (no type
    coercion); only the two list accessors transform it, by splitting the
    stored string.
    """

    def __init__(self, **kwargs):
        # Let Mock set up its own internal state before adding ours.
        super(ParserMock, self).__init__(**kwargs)
        self._values = {}

    @staticmethod
    def _section_prefix(section):
        """Return the key prefix shared by every option of ``section``."""
        return "{0}:".format(section)

    def add_value(self, section, var, value):
        """Store ``value`` for option ``var`` of ``section``."""
        self._values[self.get_var_string(section, var)] = value

    def get_var_string(self, section, var):
        """Return the internal dictionary key for ``var`` in ``section``."""
        # The key must start with the section prefix: has_section() and
        # options() both rely on that to find a section's entries.
        return "{0}{1}".format(self._section_prefix(section), var)

    def get_value(self, section, var, default):
        """Return the stored value, or ``default`` when none was added."""
        try:
            return self._values[self.get_var_string(section, var)]
        except KeyError:
            return default

    def get_bool_option(self, section, var, default):
        """Return the stored value or ``default`` (no bool conversion)."""
        return self.get_value(section, var, default)

    def get_path_option(self, section, var, default=""):
        """Return the stored value or ``default`` (no path handling)."""
        return self.get_value(section, var, default)

    def get_int_option(self, section, var, default=0):
        """Return the stored value or ``default`` (no int conversion)."""
        return self.get_value(section, var, default)

    def get_choice_option(
        self, section, var, choices, default, ignore_case=True
    ):
        """Return the stored value or ``default``; choices are not checked."""
        return self.get_value(section, var, default)

    def get_int_list_option(self, section, var, default=None, separator=" "):
        """Split the stored string on ``separator`` and convert to ints.

        Returns ``default`` when the option is missing; a ``None`` default
        yields a new empty list on every call.
        """
        try:
            raw = self._values[self.get_var_string(section, var)]
        except KeyError:
            return [] if default is None else default
        return [int(val) for val in raw.split(separator)]

    def get_list_option(self, section, var, default=None, separator=" "):
        """Split the stored string on ``separator``.

        Returns ``default`` when the option is missing; a ``None`` default
        yields a new empty list on every call.
        """
        try:
            raw = self._values[self.get_var_string(section, var)]
        except KeyError:
            return [] if default is None else default
        return raw.split(separator)

    def get_option(self, section, var, default=None):
        """Return the stored value or ``default``."""
        return self.get_value(section, var, default)

    def has_section(self, section):
        """Return True when at least one option was added for ``section``."""
        # NOTE(review): this method's header was missing in the reviewed
        # source (only its return statement survived); the name follows the
        # configparser API -- confirm against the upstream file.
        start = self._section_prefix(section)
        return any(x.startswith(start) for x in self._values)

    def options(self, section):
        """Return all options for a given section"""
        start = self._section_prefix(section)
        return [x[len(start):] for x in self._values if x.startswith(start)]
class TestCMORConfig(TestCase):
def setUp(self):
    """Give each test its own variable manager and an empty parser."""
    self.var_manager = VariableManagerMock()
    self.mock_parser = ParserMock()
def test_basic_config(self):
    """Check the attribute values of a CMORConfig built without options."""
    expected_defaults = (
        ("ocean", True),
        ("atmosphere", True),
        ("force", False),
        ("force_untar", False),
        ("use_grib", True),
        ("activity", "CMIP"),
        ("associated_experiment", "to be filled"),
        ("associated_model", "to be filled"),
        ("initialization_description", "to be filled"),
        ("initialization_method", "1"),
        ("initialization_number", 1),
        ("source", "to be filled"),
        ("version", ""),
        ("physics_version", "1"),
        ("physics_description", "to be filled"),
        ("filter_files", []),
        ("default_atmos_grid", "gr"),
        ("default_ocean_grid", "gn"),
        ("min_cmorized_vars", 10),
        ("append_startdate", False),
    )
    config = CMORConfig(self.mock_parser, self.var_manager)
    # Same checks, same order, driven by the table above.
    for attribute, expected in expected_defaults:
        self.assertEqual(getattr(config, attribute), expected)
def test_cmorize(self):
    """Check cmorize is true for a mock and for None without a list."""
    config = CMORConfig(self.mock_parser, self.var_manager)
    for candidate in (VariableMock(), None):
        self.assertTrue(config.cmorize(candidate))
def test_cmorize_list(self):
    """Test cmorize return true for variables in the given list"""
    self.mock_parser.add_value(
        "CMOR", "VARIABLE_LIST", "ocean:thetao ocean:tos"
    )
    config = CMORConfig(self.mock_parser, self.var_manager)

    # Name both mocks explicitly so each one stands for one list entry;
    # VariableMock.__eq__ compares short_name, so an unnamed mock cannot
    # represent a specific variable.
    tos_mock = VariableMock()
    tos_mock.short_name = "tos"
    self.assertTrue(config.cmorize(tos_mock))

    thetao_mock = VariableMock()
    thetao_mock.domain = ModelingRealms.ocean
    thetao_mock.short_name = "thetao"
    self.assertTrue(config.cmorize(thetao_mock))
def test_bad_list(self):
    """Check CMORConfig raises ConfigException for bad VARIABLE_LIST values."""
    for rejected_list in ("#ocean:tos", "atmos:tos", "ocean:bad"):
        self.mock_parser.add_value("CMOR", "VARIABLE_LIST", rejected_list)
        with self.assertRaises(ConfigException):
            CMORConfig(self.mock_parser, self.var_manager)
def test_not_cmorize(self):
    """Test cmorize return false for variables not in the given list"""
    self.mock_parser.add_value("CMOR", "VARIABLE_LIST", "ocean:tos")
    config = CMORConfig(self.mock_parser, self.var_manager)

    tos_mock = VariableMock()
    tos_mock.short_name = "tos"
    self.assertTrue(config.cmorize(tos_mock))
    self.assertFalse(config.cmorize(None))

    tas_mock = VariableMock()
    tas_mock.domain = ModelingRealms.atmos
    tas_mock.short_name = "tas"
    self.assertFalse(config.cmorize(tas_mock))

    # Without its own short name this ocean mock would be
    # indistinguishable from the listed variable.
    thetao_mock = VariableMock()
    thetao_mock.domain = ModelingRealms.ocean
    thetao_mock.short_name = "thetao"
    self.assertFalse(config.cmorize(thetao_mock))


def test_cmorize_comment(self):
    """Test cmorize return false for variables after the comment mark"""
    # NOTE(review): in the reviewed source this scenario was fused into
    # test_not_cmorize behind a stray docstring; the method name here is
    # reconstructed -- confirm against the upstream file.
    self.mock_parser.add_value(
        "CMOR", "VARIABLE_LIST", "ocean:tos #ocean:thetao "
    )
    config = CMORConfig(self.mock_parser, self.var_manager)

    tos_mock = VariableMock()
    tos_mock.short_name = "tos"
    self.assertTrue(config.cmorize(tos_mock))

    thetao_mock = VariableMock()
    thetao_mock.domain = ModelingRealms.ocean
    thetao_mock.short_name = "thetao"
    self.assertFalse(config.cmorize(thetao_mock))

    self.mock_parser.add_value(
        "CMOR", "VARIABLE_LIST", "#ocean:tos ocean:thetao "
    )
    with self.assertRaises(ConfigException):
        CMORConfig(self.mock_parser, self.var_manager)
def test_cmorization_chunk(self):
    """Check chunk 1 counts as requested when CHUNKS is not configured."""
    config = CMORConfig(self.mock_parser, self.var_manager)
    requested = config.chunk_cmorization_requested(1)
    self.assertTrue(requested)
def test_cmorize_only_some_chunks(self):
    """Test chunk cmor requested returns true only if it is in the list"""
    self.mock_parser.add_value("CMOR", "CHUNKS", "3 5")
    config = CMORConfig(self.mock_parser, self.var_manager)
    for chunk in (3, 5):
        self.assertTrue(config.chunk_cmorization_requested(chunk))
    for chunk in (1, 4, 6):
        self.assertFalse(config.chunk_cmorization_requested(chunk))


# NOTE(review): in the reviewed source the scenarios below had no method
# headers and ran as a tail of test_cmorize_only_some_chunks. They are
# split into separate tests here; the method names are reconstructed --
# confirm against the upstream file.


def test_any_required(self):
    """Test any_required is true when a listed variable is requested"""
    self.mock_parser.add_value(
        "CMOR", "VARIABLE_LIST", "ocean:tos ocean:thetao"
    )
    config = CMORConfig(self.mock_parser, self.var_manager)
    self.assertTrue(config.any_required(["tos", "thetao", "tas"]))
    self.assertTrue(config.any_required(["tos", "tas"]))
    self.assertTrue(config.any_required(["thetao"]))


def _check_atmos_vars(self, option, frequency):
    """Run the shared variable/level checks for one ATMOS_*_VARS option.

    ``option`` is the CMOR option name to fill and ``frequency`` the
    Frequencies member used to query the resulting configuration.
    """
    # Nothing configured yet: no variables for this frequency.
    config = CMORConfig(self.mock_parser, self.var_manager)
    self.assertEqual(config.get_variables(frequency), {})

    self.mock_parser.add_value(
        "CMOR", option, "128,129:1,130:1-2,131:1:10,132:0:10:5"
    )
    expected = {
        128: None,
        129: "1",
        130: "1,2",
        131: "1,2,3,4,5,6,7,8,9",
        132: "0,5",
    }
    config = CMORConfig(self.mock_parser, self.var_manager)
    self.assertEqual(config.get_variables(frequency), expected)
    for code in sorted(expected):
        self.assertEqual(config.get_levels(frequency, code), expected[code])


def test_hourly_vars(self):
    """Test ATMOS_HOURLY_VARS is reported for the six hourly frequency"""
    self._check_atmos_vars("ATMOS_HOURLY_VARS", Frequencies.six_hourly)


def test_daily_vars(self):
    """Test ATMOS_DAILY_VARS is reported for the daily frequency"""
    self._check_atmos_vars("ATMOS_DAILY_VARS", Frequencies.daily)


def test_monthly_vars(self):
    """Test ATMOS_MONTHLY_VARS is reported for the monthly frequency"""
    self._check_atmos_vars("ATMOS_MONTHLY_VARS", Frequencies.monthly)
def test_bad_frequency_vars(self):
    """Check get_variables raises ValueError for Frequencies.climatology."""
    config = CMORConfig(self.mock_parser, self.var_manager)
    self.assertRaises(
        ValueError, config.get_variables, Frequencies.climatology
    )
def test_requested_codes(self):
    """Check get_requested_codes with all three ATMOS_*_VARS options set."""
    codes_spec = "128,129:1,130:1-2,131:1:10,132:0:10:5"
    atmos_options = (
        "ATMOS_HOURLY_VARS",
        "ATMOS_DAILY_VARS",
        "ATMOS_MONTHLY_VARS",
    )
    for option in atmos_options:
        self.mock_parser.add_value("CMOR", option, codes_spec)

    config = CMORConfig(self.mock_parser, self.var_manager)
    requested = config.get_requested_codes()
    self.assertEqual(requested, {128, 129, 130, 131, 132})
class TestTHREDDSConfig(TestCase):
    """Tests for THREDDSConfig built from a mocked parser."""

    def setUp(self):
        # The parser must be created per test inside setUp: ``self`` does
        # not exist in the class body, where this assignment used to sit.
        self.mock_parser = ParserMock()

    def test_basic_config(self):
        """Build THREDDSConfig without and with SERVER_URL configured."""
        # NOTE(review): no assertions on the resulting objects are visible
        # in the reviewed source, so this only checks that construction
        # does not raise -- add value checks once the expected attributes
        # are confirmed.
        THREDDSConfig(self.mock_parser)
        self.mock_parser.add_value("THREDDS", "SERVER_URL", "test_url")
        THREDDSConfig(self.mock_parser)
class TestReportConfig(TestCase):
    """Tests for ReportConfig built from a mocked parser."""

    def setUp(self):
        # The parser must be created per test inside setUp: ``self`` does
        # not exist in the class body, where this assignment used to sit.
        self.mock_parser = ParserMock()

    def test_basic_config(self):
        """Check maximum_priority by default and after configuring it."""
        config = ReportConfig(self.mock_parser)
        self.assertEqual(config.maximum_priority, 10)

        # NOTE(review): no assertion follows the PATH change in the
        # reviewed source; only construction is exercised for it.
        self.mock_parser.add_value("REPORT", "PATH", "new_path")
        ReportConfig(self.mock_parser)

        self.mock_parser.add_value("REPORT", "MAXIMUM_PRIORITY", 3)
        config = ReportConfig(self.mock_parser)
        self.assertEqual(config.maximum_priority, 3)
class TestExperimentConfig(TestCase):
def setUp(self):
# Every test starts from a new ParserMock, so only the options a test adds
# itself are stored in the parser it hands to ExperimentConfig.parse_ini.
self.mock_parser = ParserMock()
def test_basic_config(self):
    """Check the attribute values of an ExperimentConfig without options."""
    config = ExperimentConfig()
    config.parse_ini(self.mock_parser)
    self.assertEqual(config.startdates, [])
    self.assertEqual(config.members, [])
    self.assertEqual(config.chunk_size, 0)
    self.assertEqual(config.num_chunks, 0)
    self.assertEqual(config.atmos_timestep, 6)
    self.assertEqual(config.ocean_timestep, 6)


def test_members(self):
    """
    Test all ways of specifying the members

    Including:
    - Adding the prefix string or not
    - Providing them as a space separated list
    - Providing them as a range with start and end separated by -
    (both extremes included)
    """
    # NOTE(review): this method's header and the closing quotes of its
    # docstring were missing in the reviewed source; the name is
    # reconstructed. The first two specs have no visible expected value
    # there, so for them only "parses without raising" is checked --
    # TODO add the expected member lists.
    cases = (
        ("fc0 1", None),
        ("fc00 fc01", None),
        ("fc1-fc3", [1, 2, 3]),
        ("1-3", [1, 2, 3]),
    )
    for spec, expected in cases:
        self.mock_parser.add_value("EXPERIMENT", "MEMBERS", spec)
        config = ExperimentConfig()
        config.parse_ini(self.mock_parser)
        if expected is not None:
            self.assertEqual(config.members, expected)
def test_startdates(self):
    """Check a space separated STARTDATES value is exposed as a list."""
    expected = ["20001101", "20011101"]
    self.mock_parser.add_value(
        "EXPERIMENT", "STARTDATES", " ".join(expected)
    )
    config = ExperimentConfig()
    config.parse_ini(self.mock_parser)
    self.assertEqual(config.startdates, expected)
def test_auto_startdates(self):
    """
    Test parsing startdates using the automatic generation

    Reference syntax: {START,STOP,INTERVAL} where start and stop are
    dates and the interval is given with the code NI where N is the number
    of intervals (default 1) and I is the base interval (Y,M,W,D)
    """
    weekly = ["20001101", "20001108", "20001115", "20001122", "20001129"]
    cases = (
        ("{2000,2001,1Y}", ["20000101", "20010101"]),
        ("{20001101,20011101,1Y}", ["20001101", "20011101"]),
        # Trailing blank after the brace is part of this case.
        ("{20001101,20011101,6M} ", ["20001101", "20010501", "20011101"]),
        ("{20001101,20001201,1W}", weekly),
        ("{20001101,20001201,W}", weekly),
        ("{20001101,20001201,7D}", weekly),
    )
    for spec, expected in cases:
        self.mock_parser.add_value("EXPERIMENT", "STARTDATES", spec)
        config = ExperimentConfig()
        config.parse_ini(self.mock_parser)
        self.assertEqual(config.startdates, expected)

    # "F" is not one of the base intervals listed in the docstring.
    self.mock_parser.add_value(
        "EXPERIMENT", "STARTDATES", "{20001101,20001201,7F}"
    )
    config = ExperimentConfig()
    with self.assertRaises(ConfigException):
        config.parse_ini(self.mock_parser)


def test_get_member_str(self):
    """Test get_member_str returns "fc1" for member 1"""
    # NOTE(review): this check had no method header in the reviewed source
    # and followed the failing "7F" parse above, where parse_ini would
    # raise again; it is split out so it runs on a clean parser. The name
    # is reconstructed -- confirm against the upstream file.
    config = ExperimentConfig()
    config.parse_ini(self.mock_parser)
    self.assertEqual(config.get_member_str(1), "fc1")
def test_get_full_years(self):
    """Check get_full_years for a June and a January start date."""
    for option, value in (("CHUNK_SIZE", 3), ("CHUNKS", 15)):
        self.mock_parser.add_value("EXPERIMENT", option, value)
    config = ExperimentConfig()
    config.parse_ini(self.mock_parser)

    expectations = (
        ("20000601", [2001, 2002, 2003, 2004]),
        ("20000101", [2000, 2001, 2002, 2003]),
    )
    for startdate, full_years in expectations:
        self.assertEqual(config.get_full_years(startdate), full_years)
def test_get_year_chunks(self):
    """Check get_year_chunks with CHUNK_SIZE 3 and CHUNKS 13."""
    for option, value in (("CHUNK_SIZE", 3), ("CHUNKS", 13)):
        self.mock_parser.add_value("EXPERIMENT", option, value)
    config = ExperimentConfig()
    config.parse_ini(self.mock_parser)

    expectations = (
        ("20000601", 2003, [11, 12, 13]),
        ("20000601", 2001, [3, 4, 5, 6, 7]),
        ("20000101", 2000, [1, 2, 3, 4]),
        ("20000601", 2000, [1, 2, 3]),
        ("20000601", 1999, []),
    )
    for startdate, year, chunks in expectations:
        self.assertEqual(config.get_year_chunks(startdate, year), chunks)
def test_get_chunk_list(self):
"""Test get complete chunk list"""
# NOTE(review): `config` is never assigned in this method as it stands;
# sibling tests create it with ExperimentConfig(). The start date used in
# the expected tuples ("20010101") is not set here either, so setup lines
# appear to be missing -- restore them before relying on this test.
config.members = (0, 1, 2)
config.chunk_list = [0]
config.num_chunks = 2
self.assertEqual(
config.get_chunk_list(),
[("20010101", 0, 0), ("20010101", 1, 0), ("20010101", 2, 0)],
)
# NOTE(review): same call as above but a different expected value, and
# nothing visible changes `config` in between -- a statement seems to be
# missing at this point.
self.assertEqual(
config.get_chunk_list(),
[
("20010101", 0, 1),
("20010101", 0, 2),
("20010101", 1, 1),
("20010101", 1, 2),
("20010101", 2, 1),
("20010101", 2, 2),
],
)
def test_get_member_list(self):
# Expects get_member_list() to return (startdate, member) tuples.
# NOTE(review): `config` is never assigned in this method as it stands, and
# neither the start date nor the members the expected tuples rely on are
# set -- setup lines appear to be missing.
self.assertEqual(
config.get_member_list(),
[("20010101", 0), ("20010101", 1), ("20010101", 2)],
)
def test_get_chunk_start_str(self):
    """Test get_chunk_start_str"""
    self.mock_parser.add_value("EXPERIMENT", "CHUNK_SIZE", 12)
    self.mock_parser.add_value("EXPERIMENT", "CHUNKS", 3)
    # The config object was never created here; build it the same way the
    # sibling tests do before parsing.
    config = ExperimentConfig()
    config.parse_ini(self.mock_parser)
    self.assertEqual(config.get_chunk_start_str("20001101", 3), "20021101")
def test_get_chunk_start_str_datetime(self):
    """Test get_chunk_start_str when receiving a date object"""
    self.mock_parser.add_value("EXPERIMENT", "CHUNK_SIZE", 12)
    self.mock_parser.add_value("EXPERIMENT", "CHUNKS", 3)
    date = datetime.datetime(year=2000, month=11, day=1)
    # The config object was never created here; build it the same way the
    # sibling tests do before parsing.
    config = ExperimentConfig()
    config.parse_ini(self.mock_parser)
    self.assertEqual(config.get_chunk_start_str(date, 3), "20021101")
def test_get_chunk_end_str(self):
    """Test get_chunk_end_str"""
    self.mock_parser.add_value("EXPERIMENT", "CHUNK_SIZE", 12)
    self.mock_parser.add_value("EXPERIMENT", "CHUNKS", 3)
    # The config object was never created here; build it the same way the
    # sibling tests do before parsing.
    config = ExperimentConfig()
    config.parse_ini(self.mock_parser)
    self.assertEqual(config.get_chunk_end_str("20001101", 3), "20031101")
class TestConfig(TestCase):
self.mock_parser = ParserMock()
self.mock_parser.add_value("DIAGNOSTICS", "FREQUENCY", "mon")
self.mock_parser.add_value("DIAGNOSTICS", "DIAGS", "diag1 diag2")
self.mock_parser.add_value("DIAGNOSTICS", "SCRATCH_DIR", "scratch")
os.environ.clear()
os.environ.update(self._environ)
def mock_new():
return self.mock_parser
def mock_new_exp():
mock_exp = mock.Mock()
with mock.patch("earthdiagnostics.config.ConfigParser", new=mock_new):
with mock.patch("earthdiagnostics.config.VariableManager"):
with mock.patch(
"earthdiagnostics.config.ExperimentConfig",
new=mock_new_exp,
):
with mock.patch("earthdiagnostics.config.CMORConfig"):
with mock.patch(
"earthdiagnostics.config.THREDDSConfig"
):
with mock.patch("os.path.isfile"):
config.parse("path")
self.mock_parser.add_value(
"DIAGNOSTICS", "DIAGS", "diag1 diag2,opt1,opt2 # Commented diag"
)
self.assertEqual(config.get_commands(), (["diag1", "diag2,opt1,opt2"]))
"""Test value error is raised if config file is not found"""
config = Config()
with self.assertRaises(ValueError):
self.assertEqual(config.frequency, Frequencies.monthly)
self.assertEqual(config.auto_clean, True)
self.assertEqual(config.cdftools_path, "")
self.assertEqual(config.con_files, "")
self.assertEqual(config.data_adaptor, "CMOR")
self.assertEqual(config.get_commands(), (["diag1", "diag2"]))
self.assertIsInstance(config.data_convention, SPECSConvention)
self.assertEqual(config.scratch_masks, "/scratch/Earth/ocean_masks")
namelist = os.path.abspath(
os.path.join(
os.path.dirname(__file__),
"..",
"..",
"earthdiagnostics/CDFTOOLS_specs.namlist",
)
)
self.assertEqual(os.environ["NAM_CDF_NAMES"], namelist)
self.mock_parser.add_value("ALIAS", "diag1", "diag3")
self.assertEqual(config.get_commands(), ["diag3", "diag2"])
def test_empty_alias(self):
    """Check get_commands is ["diag2"] when diag1 is aliased to ""."""
    config = Config()
    self.mock_parser.add_value("ALIAS", "diag1", "")
    self._parse(config)
    commands = config.get_commands()
    self.assertEqual(commands, ["diag2"])
"""Test that USE_RAMDISK forces AUTO_CLEAN to true"""
self.mock_parser.add_value("DIAGNOSTICS", "AUTO_CLEAN", False)
self.mock_parser.add_value("DIAGNOSTICS", "USE_RAMDISK", True)
self._parse(config)
self.assertEqual(config.auto_clean, True)
self.assertEqual(config.use_ramdisk, True)
def test_data_convention_primavera(self):
    """Test parsing data convention PRIMAVERA"""
    # The config object was never created or parsed here; do both the way
    # test_empty_alias does before inspecting the result.
    config = Config()
    self.mock_parser.add_value(
        "DIAGNOSTICS", "DATA_CONVENTION", "primavera"
    )
    self._parse(config)
    self.assertIsInstance(config.data_convention, PrimaveraConvention)
    self.assertEqual(
        config.scratch_masks, "/scratch/Earth/ocean_masks/primavera"
    )
    # Expected namelist: two directories above this test file, inside the
    # earthdiagnostics package directory.
    namelist = os.path.abspath(
        os.path.join(
            os.path.dirname(__file__),
            "..",
            "..",
            "earthdiagnostics/CDFTOOLS_primavera.namlist",
        )
    )
    self.assertEqual(os.environ["NAM_CDF_NAMES"], namelist)
self.mock_parser.add_value("DIAGNOSTICS", "DATA_CONVENTION", "cmip6")
self._parse(config)
self.assertIsInstance(config.data_convention, CMIP6Convention)
self.assertEqual(
config.scratch_masks, "/scratch/Earth/ocean_masks/cmip6"
)
namelist = os.path.abspath(
os.path.join(
os.path.dirname(__file__),
"..",
"..",
"earthdiagnostics/CDFTOOLS_cmip6.namlist",
)
)
self.assertEqual(os.environ["NAM_CDF_NAMES"], namelist)
"""Test parsing data convention MeteoFrance"""
self.mock_parser.add_value(
"DIAGNOSTICS", "DATA_CONVENTION", "meteofrance"
)
self._parse(config)
self.assertIsInstance(config.data_convention, MeteoFranceConvention)
self.assertEqual(config.scratch_masks, "/scratch/Earth/ocean_masks")
namelist = os.path.abspath(
os.path.join(
os.path.dirname(__file__),
"..",
"..",
"earthdiagnostics/CDFTOOLS_meteofrance.namlist",
)
)
self.assertEqual(os.environ["NAM_CDF_NAMES"], namelist)
"""Test parsing data convention Preface"""
self.mock_parser.add_value("DIAGNOSTICS", "DATA_CONVENTION", "preface")
self._parse(config)
self.assertIsInstance(config.data_convention, PrefaceConvention)
self.assertEqual(config.scratch_masks, "/scratch/Earth/ocean_masks")
namelist = os.path.abspath(
os.path.join(
os.path.dirname(__file__),
"..",
"..",
"earthdiagnostics/CDFTOOLS_preface.namlist",
)
)
self.assertEqual(os.environ["NAM_CDF_NAMES"], namelist)