# coding=utf-8 from unittest import TestCase import datetime import mock import os from earthdiagnostics.config import CMORConfig, ConfigException, THREDDSConfig, ReportConfig, ExperimentConfig, Config from earthdiagnostics.frequency import Frequencies from earthdiagnostics.modelingrealm import ModelingRealms from earthdiagnostics.data_convention import SPECSConvention, PrimaveraConvention class VariableMock(object): def __init__(self): self.domain = ModelingRealms.ocean self.short_name = 'tos' def __eq__(self, other): return self.domain == other.domain and self.short_name == other.short_name class VariableManagerMock(object): def get_variable(self, alias, silent=False): if alias == 'bad': return None var = VariableMock() var.short_name = alias return var class ParserMock(mock.Mock): def __init__(self, **kwargs): super(mock.Mock, self).__init__(**kwargs) self._values = {} def add_value(self, section, var, value): self._values[self.get_var_string(section, var)] = value def get_var_string(self, section, var): return '{0}:{1}'.format(section, var) def get_value(self, section, var, default): try: return self._values[self.get_var_string(section, var)] except KeyError: return default def get_bool_option(self, section, var, default): return self.get_value(section, var, default) def get_path_option(self, section, var, default=""): return self.get_value(section, var, default) def get_int_option(self, section, var, default=0): return self.get_value(section, var, default) def get_choice_option(self, section, var, choices, default, ignore_case=True): return self.get_value(section, var, default) def get_int_list_option(self, section, var, default=list(), separator=' '): try: return [int(val) for val in self._values[self.get_var_string(section, var)].split(separator)] except KeyError: return default def get_list_option(self, section, var, default=list(), separator=' '): try: return [val for val in self._values[self.get_var_string(section, var)].split(separator)] except KeyError: return default def get_option(self, section, var, default=None): return self.get_value(section, var, default) def has_section(self, section): start = '{0}:'.format(section) return any(x.startswith(start) for x in self._values) def options(self, section): start = '{0}:'.format(section) return [x[len(start):] for x in self._values if x.startswith(start)] class TestCMORConfig(TestCase): def setUp(self): self.mock_parser = ParserMock() self.var_manager = VariableManagerMock() def test_basic_config(self): config = CMORConfig(self.mock_parser, self.var_manager) self.assertEqual(config.ocean, True) self.assertEqual(config.atmosphere, True) self.assertEqual(config.force, False) self.assertEqual(config.force_untar, False) self.assertEqual(config.use_grib, True) self.assertEqual(config.activity, 'CMIP') self.assertEqual(config.associated_experiment, 'to be filled') self.assertEqual(config.associated_model, 'to be filled') self.assertEqual(config.initialization_description, 'to be filled') self.assertEqual(config.initialization_method, '1') self.assertEqual(config.initialization_number, 1) self.assertEqual(config.source, 'to be filled') self.assertEqual(config.version, '') self.assertEqual(config.physics_version, '1') self.assertEqual(config.physics_description, 'to be filled') self.assertEqual(config.filter_files, '') self.assertEqual(config.default_atmos_grid, 'gr') self.assertEqual(config.default_ocean_grid, 'gn') self.assertEqual(config.min_cmorized_vars, 10) self.assertEqual(config.append_startdate, False) def test_cmorize(self): config = CMORConfig(self.mock_parser, self.var_manager) self.assertTrue(config.cmorize(VariableMock())) self.assertTrue(config.cmorize(None)) def test_cmorize_list(self): self.mock_parser.add_value('CMOR', 'VARIABLE_LIST', 'ocean:thetao ocean:tos') config = CMORConfig(self.mock_parser, self.var_manager) self.assertTrue(config.cmorize(VariableMock())) thetao_mock = VariableMock() thetao_mock.domain = ModelingRealms.ocean thetao_mock.short_name = 'thetao' self.assertTrue(config.cmorize(thetao_mock)) def test_bad_list(self): self.mock_parser.add_value('CMOR', 'VARIABLE_LIST', '#ocean:tos') with self.assertRaises(ConfigException): CMORConfig(self.mock_parser, self.var_manager) self.mock_parser.add_value('CMOR', 'VARIABLE_LIST', 'atmos:tos') with self.assertRaises(ConfigException): CMORConfig(self.mock_parser, self.var_manager) self.mock_parser.add_value('CMOR', 'VARIABLE_LIST', 'ocean:bad') with self.assertRaises(ConfigException): CMORConfig(self.mock_parser, self.var_manager) def test_not_cmorize(self): self.mock_parser.add_value('CMOR', 'VARIABLE_LIST', 'ocean:tos') config = CMORConfig(self.mock_parser, self.var_manager) self.assertTrue(config.cmorize(VariableMock())) self.assertFalse(config.cmorize(None)) tas_mock = VariableMock() tas_mock.domain = ModelingRealms.atmos tas_mock.short_name = 'tas' self.assertFalse(config.cmorize(tas_mock)) thetao_mock = VariableMock() thetao_mock.domain = ModelingRealms.ocean thetao_mock.short_name = 'thetao' self.assertFalse(config.cmorize(thetao_mock)) def test_comment(self): self.mock_parser.add_value('CMOR', 'VARIABLE_LIST', 'ocean:tos #ocean:thetao ') config = CMORConfig(self.mock_parser, self.var_manager) self.assertTrue(config.cmorize(VariableMock())) thetao_mock = VariableMock() thetao_mock.domain = ModelingRealms.ocean thetao_mock.short_name = 'thetao' self.assertFalse(config.cmorize(thetao_mock)) self.mock_parser.add_value('CMOR', 'VARIABLE_LIST', '#ocean:tos ocean:thetao ') with self.assertRaises(ConfigException): CMORConfig(self.mock_parser, self.var_manager) def test_cmorization_chunk(self): config = CMORConfig(self.mock_parser, self.var_manager) self.assertTrue(config.chunk_cmorization_requested(1)) def test_cmorize_only_some_chunks(self): self.mock_parser.add_value('CMOR', 'CHUNKS', '3 5') config = CMORConfig(self.mock_parser, self.var_manager) self.assertTrue(config.chunk_cmorization_requested(3)) self.assertTrue(config.chunk_cmorization_requested(5)) self.assertFalse(config.chunk_cmorization_requested(1)) self.assertFalse(config.chunk_cmorization_requested(4)) self.assertFalse(config.chunk_cmorization_requested(6)) def test_any_required(self): config = CMORConfig(self.mock_parser, self.var_manager) self.assertTrue(config.any_required(['tos'])) self.mock_parser.add_value('CMOR', 'VARIABLE_LIST', 'ocean:tos ocean:thetao') config = CMORConfig(self.mock_parser, self.var_manager) self.assertTrue(config.any_required(['tos', 'thetao', 'tas'])) self.assertTrue(config.any_required(['tos', 'tas'])) self.assertTrue(config.any_required(['thetao'])) self.assertFalse(config.any_required(['tas'])) def test_hourly_vars(self): config = CMORConfig(self.mock_parser, self.var_manager) self.assertEqual(config.get_variables(Frequencies.six_hourly), {}) self.mock_parser.add_value('CMOR', 'ATMOS_HOURLY_VARS', '128,129:1,130:1-2,131:1:10,132:0:10:5') config = CMORConfig(self.mock_parser, self.var_manager) self.assertEqual(config.get_variables(Frequencies.six_hourly), {128: None, 129: '1', 130: '1,2', 131: '1,2,3,4,5,6,7,8,9', 132: '0,5'}) self.assertEqual(config.get_levels(Frequencies.six_hourly, 128), None) self.assertEqual(config.get_levels(Frequencies.six_hourly, 129), '1') self.assertEqual(config.get_levels(Frequencies.six_hourly, 130), '1,2') self.assertEqual(config.get_levels(Frequencies.six_hourly, 131), '1,2,3,4,5,6,7,8,9',) self.assertEqual(config.get_levels(Frequencies.six_hourly, 132), '0,5') def test_daily_vars(self): config = CMORConfig(self.mock_parser, self.var_manager) self.assertEqual(config.get_variables(Frequencies.daily), {}) self.mock_parser.add_value('CMOR', 'ATMOS_DAILY_VARS', '128,129:1,130:1-2,131:1:10,132:0:10:5') config = CMORConfig(self.mock_parser, self.var_manager) self.assertEqual(config.get_variables(Frequencies.daily), {128: None, 129: '1', 130: '1,2', 131: '1,2,3,4,5,6,7,8,9', 132: '0,5'}) self.assertEqual(config.get_levels(Frequencies.daily, 128), None) self.assertEqual(config.get_levels(Frequencies.daily, 129), '1') self.assertEqual(config.get_levels(Frequencies.daily, 130), '1,2') self.assertEqual(config.get_levels(Frequencies.daily, 131), '1,2,3,4,5,6,7,8,9',) self.assertEqual(config.get_levels(Frequencies.daily, 132), '0,5') def test_monthly_vars(self): config = CMORConfig(self.mock_parser, self.var_manager) self.assertEqual(config.get_variables(Frequencies.monthly), {}) self.mock_parser.add_value('CMOR', 'ATMOS_MONTHLY_VARS', '128,129:1,130:1-2,131:1:10,132:0:10:5') config = CMORConfig(self.mock_parser, self.var_manager) self.assertEqual(config.get_variables(Frequencies.monthly), {128: None, 129: '1', 130: '1,2', 131: '1,2,3,4,5,6,7,8,9', 132: '0,5'}) self.assertEqual(config.get_levels(Frequencies.monthly, 128), None) self.assertEqual(config.get_levels(Frequencies.monthly, 129), '1') self.assertEqual(config.get_levels(Frequencies.monthly, 130), '1,2') self.assertEqual(config.get_levels(Frequencies.monthly, 131), '1,2,3,4,5,6,7,8,9',) self.assertEqual(config.get_levels(Frequencies.monthly, 132), '0,5') def test_bad_frequency_vars(self): config = CMORConfig(self.mock_parser, self.var_manager) with self.assertRaises(ValueError): config.get_variables(Frequencies.climatology) def test_requested_codes(self): self.mock_parser.add_value('CMOR', 'ATMOS_HOURLY_VARS', '128,129:1,130:1-2,131:1:10,132:0:10:5') self.mock_parser.add_value('CMOR', 'ATMOS_DAILY_VARS', '128,129:1,130:1-2,131:1:10,132:0:10:5') self.mock_parser.add_value('CMOR', 'ATMOS_MONTHLY_VARS', '128,129:1,130:1-2,131:1:10,132:0:10:5') config = CMORConfig(self.mock_parser, self.var_manager) self.assertEqual(config.get_requested_codes(), {128, 129, 130, 131, 132}) class TestTHREDDSConfig(TestCase): def setUp(self): self.mock_parser = ParserMock() def test_basic_config(self): config = THREDDSConfig(self.mock_parser) self.assertEqual(config.server_url, '') def test_url(self): self.mock_parser.add_value('THREDDS', 'SERVER_URL', 'test_url') config = THREDDSConfig(self.mock_parser) self.assertEqual(config.server_url, 'test_url') class TestReportConfig(TestCase): def setUp(self): self.mock_parser = ParserMock() def test_basic_config(self): config = ReportConfig(self.mock_parser) self.assertEqual(config.path, '') self.assertEqual(config.maximum_priority, 10) def test_path(self): self.mock_parser.add_value('REPORT', 'PATH', 'new_path') config = ReportConfig(self.mock_parser) self.assertEqual(config.path, 'new_path') def test_priority(self): self.mock_parser.add_value('REPORT', 'MAXIMUM_PRIORITY', 3) config = ReportConfig(self.mock_parser) self.assertEqual(config.maximum_priority, 3) class TestExperimentConfig(TestCase): def setUp(self): self.mock_parser = ParserMock() def test_basic_config(self): config = ExperimentConfig() config.parse_ini(self.mock_parser) self.assertEqual(config.startdates, []) self.assertEqual(config.members, []) self.assertEqual(config.chunk_size, 0) self.assertEqual(config.num_chunks, 0) self.assertEqual(config.atmos_grid, '') self.assertEqual(config.atmos_timestep, 6) self.assertEqual(config.ocean_timestep, 6) def test_members(self): self.mock_parser.add_value('EXPERIMENT', 'MEMBERS', 'fc0 1') config = ExperimentConfig() config.parse_ini(self.mock_parser) self.assertEqual(config.members, [0, 1]) self.mock_parser.add_value('EXPERIMENT', 'MEMBERS', 'fc00 fc01') config = ExperimentConfig() config.parse_ini(self.mock_parser) self.assertEqual(config.members, [0, 1]) self.mock_parser.add_value('EXPERIMENT', 'MEMBERS', 'fc1-fc3') config = ExperimentConfig() config.parse_ini(self.mock_parser) self.assertEqual(config.members, [1, 2, 3]) self.mock_parser.add_value('EXPERIMENT', 'MEMBERS', '1-3') config = ExperimentConfig() config.parse_ini(self.mock_parser) self.assertEqual(config.members, [1, 2, 3]) def test_startdates(self): self.mock_parser.add_value('EXPERIMENT', 'STARTDATES', '20001101 20011101') config = ExperimentConfig() config.parse_ini(self.mock_parser) self.assertEqual(config.startdates, ['20001101', '20011101']) self.mock_parser.add_value('EXPERIMENT', 'STARTDATES', '200(0|1)1101') config = ExperimentConfig() config.parse_ini(self.mock_parser) self.assertEqual(config.startdates, ['20001101', '20011101']) self.mock_parser.add_value('EXPERIMENT', 'STARTDATES', '200[0-2](02|05|08|11)01') config = ExperimentConfig() config.parse_ini(self.mock_parser) self.assertEqual(config.startdates, [u'20000201', u'20000501', u'20000801', u'20001101', u'20010201', u'20010501', u'20010801', u'20011101', u'20020201', u'20020501', u'20020801', u'20021101']) def test_auto_startdates(self): self.mock_parser.add_value('EXPERIMENT', 'STARTDATES', '{20001101,20011101,1Y}') config = ExperimentConfig() config.parse_ini(self.mock_parser) self.assertEqual(config.startdates, ['20001101', '20011101']) self.mock_parser.add_value('EXPERIMENT', 'STARTDATES', '{20001101,20011101,6M} ') config = ExperimentConfig() config.parse_ini(self.mock_parser) self.assertEqual(config.startdates, ['20001101', '20010501', '20011101']) self.mock_parser.add_value('EXPERIMENT', 'STARTDATES', '{20001101,20001201,1W}') config = ExperimentConfig() config.parse_ini(self.mock_parser) self.assertEqual(config.startdates, ['20001101', '20001108', '20001115', '20001122', '20001129']) self.mock_parser.add_value('EXPERIMENT', 'STARTDATES', '{20001101,20001201,W}') config = ExperimentConfig() config.parse_ini(self.mock_parser) self.assertEqual(config.startdates, ['20001101', '20001108', '20001115', '20001122', '20001129']) self.mock_parser.add_value('EXPERIMENT', 'STARTDATES', '{20001101,20001201,7D}') config = ExperimentConfig() config.parse_ini(self.mock_parser) self.assertEqual(config.startdates, ['20001101', '20001108', '20001115', '20001122', '20001129']) self.mock_parser.add_value('EXPERIMENT', 'STARTDATES', '{20001101,20001201,7F}') with self.assertRaises(ConfigException): config = ExperimentConfig() config.parse_ini(self.mock_parser) def test_get_member_str(self): config = ExperimentConfig() config.parse_ini(self.mock_parser) self.assertEqual(config.get_member_str(1), 'fc1') def test_get_full_years(self): self.mock_parser.add_value('EXPERIMENT', 'CHUNK_SIZE', 3) self.mock_parser.add_value('EXPERIMENT', 'CHUNKS', 15) config = ExperimentConfig() config.parse_ini(self.mock_parser) self.assertEqual(config.get_full_years('20000601'), [2001, 2002, 2003, 2004]) self.assertEqual(config.get_full_years('20000101'), [2000, 2001, 2002, 2003]) def test_get_year_chunks(self): self.mock_parser.add_value('EXPERIMENT', 'CHUNK_SIZE', 3) self.mock_parser.add_value('EXPERIMENT', 'CHUNKS', 13) config = ExperimentConfig() config.parse_ini(self.mock_parser) self.assertEqual(config.get_year_chunks('20000601', 2003), [11, 12, 13]) self.assertEqual(config.get_year_chunks('20000601', 2001), [3, 4, 5, 6, 7]) self.assertEqual(config.get_year_chunks('20000101', 2000), [1, 2, 3, 4]) self.assertEqual(config.get_year_chunks('20000601', 2000), [1, 2, 3]) self.assertEqual(config.get_year_chunks('20000601', 1999), []) def test_get_chunk_list(self): config = ExperimentConfig() config.startdates = ('20010101', ) config.members = (0, 1, 2) config.chunk_list = [0] config.num_chunks = 2 self.assertEqual(config.get_chunk_list(), [('20010101', 0, 0), ('20010101', 1, 0), ('20010101', 2, 0)]) config.chunk_list = [] self.assertEqual(config.get_chunk_list(), [('20010101', 0, 1), ('20010101', 0, 2), ('20010101', 1, 1), ('20010101', 1, 2), ('20010101', 2, 1), ('20010101', 2, 2)]) def test_get_member_list(self): config = ExperimentConfig() config.startdates = ('20010101', ) config.members = (0, 1, 2) self.assertEqual(config.get_member_list(), [('20010101', 0), ('20010101', 1), ('20010101', 2)]) def test_get_chunk_start_str(self): config = ExperimentConfig() self.mock_parser.add_value('EXPERIMENT', 'CHUNK_SIZE', 12) self.mock_parser.add_value('EXPERIMENT', 'CHUNKS', 3) config.parse_ini(self.mock_parser) self.assertEqual(config.get_chunk_start_str('20001101', 3), '20021101') def test_get_chunk_start_str_datetime(self): config = ExperimentConfig() self.mock_parser.add_value('EXPERIMENT', 'CHUNK_SIZE', 12) self.mock_parser.add_value('EXPERIMENT', 'CHUNKS', 3) date = datetime.datetime(year=2000, month=11, day=1) config.parse_ini(self.mock_parser) self.assertEqual(config.get_chunk_start_str(date, 3), '20021101') def test_get_chunk_end_str(self): config = ExperimentConfig() self.mock_parser.add_value('EXPERIMENT', 'CHUNK_SIZE', 12) self.mock_parser.add_value('EXPERIMENT', 'CHUNKS', 3) config.parse_ini(self.mock_parser) self.assertEqual(config.get_chunk_end_str('20001101', 3), '20031101') class TestConfig(TestCase): def setUp(self): self.mock_parser = ParserMock() self.mock_parser.add_value('DIAGNOSTICS', 'FREQUENCY', 'mon') self.mock_parser.add_value('DIAGNOSTICS', 'DIAGS', 'diag1 diag2') self.mock_parser.add_value('DIAGNOSTICS', 'SCRATCH_DIR', 'scratch') self._environ = dict(os.environ) def tearDown(self): os.environ.clear() os.environ.update(self._environ) def _parse(self, config): def mock_new(): return self.mock_parser def mock_new_exp(): mock_exp = mock.Mock() mock_exp.expid = 'expid' return mock_exp with mock.patch('earthdiagnostics.config.ConfigParser', new=mock_new): with mock.patch('earthdiagnostics.config.VariableManager'): with mock.patch('earthdiagnostics.config.ExperimentConfig', new=mock_new_exp): with mock.patch('earthdiagnostics.config.CMORConfig'): with mock.patch('earthdiagnostics.config.THREDDSConfig'): with mock.patch('os.path.isfile'): config.parse('path') def test_diags(self): config = Config() self.mock_parser.add_value('DIAGNOSTICS', 'DIAGS', 'diag1 diag2,opt1,opt2 # Commented diag') self._parse(config) self.assertEqual(config.get_commands(), (['diag1', 'diag2,opt1,opt2'])) def test_file_not_found(self): config = Config() with self.assertRaises(ValueError): config.parse('path') def test_parse(self): config = Config() self._parse(config) self.assertEqual(config.frequency, Frequencies.monthly) self.assertEqual(config.auto_clean, True) self.assertEqual(config.cdftools_path, '') self.assertEqual(config.con_files, '') self.assertEqual(config.data_adaptor, 'CMOR') self.assertEqual(config.get_commands(), (['diag1', 'diag2'])) def test_alias(self): config = Config() self.mock_parser.add_value('ALIAS', 'diag1', 'diag3') self._parse(config) self.assertEqual(config.get_commands(), ['diag3', 'diag2']) def test_auto_clean_ram_disk(self): config = Config() self.mock_parser.add_value('DIAGNOSTICS', 'AUTO_CLEAN', False) self.mock_parser.add_value('DIAGNOSTICS', 'USE_RAMDISK', True) self._parse(config) self.assertEqual(config.auto_clean, True) self.assertEqual(config.use_ramdisk, True) def test_data_convention_primavera(self): config = Config() self.mock_parser.add_value('DIAGNOSTICS', 'DATA_CONVENTION', 'primavera') self._parse(config) self.assertIsInstance(config.data_convention, PrimaveraConvention) self.assertEqual(config.scratch_masks, '/scratch/Earth/ocean_masks/primavera') namelist = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', 'earthdiagnostics/CDFTOOLS_primavera.namlist')) self.assertEqual(os.environ['NAM_CDF_NAMES'], namelist)