from earthdiagnostics.cmorizer import Cmorizer
from earthdiagnostics.utils import TempFile
from bscearth.utils import log

from unittest import TestCase
from mock import Mock
import os
import subprocess
import tempfile
import shutil
import iris
import iris.cube
from iris.coords import DimCoord, AuxCoord
import tarfile
import gzip as gz
import numpy as np
import six


class TestCmorizer(TestCase):
    """Tests for Cmorizer driven by a mocked data manager and a temporary directory tree."""

    def _get_variable_and_alias(self, variable):
        """Stand-in for ``var_manager.get_variable_and_alias``.

        Returns an ``(alias, variable)`` pair of mocks; the variable mock
        carries the requested short name and a fixed domain.
        """
        mock_alias = Mock()
        mock_alias.basin = None
        mock_alias.grid = None
        mock_variable = Mock()
        mock_variable.short_name = variable
        mock_variable.domain = 'domain'
        return mock_alias, mock_variable

    def _get_file_path(self, *args, **kwargs):
        """Stand-in for ``data_manager.get_file_path``.

        Builds ``<tmp_dir>/<args[3]>/<args[3]>.nc``; keyword arguments are
        ignored.
        """
        # NOTE(review): args[3] is presumably the variable name -- the tests
        # below look for 'var1/var1.nc' and 'var2/var2.nc'; confirm against
        # the real get_file_path signature.
        return os.path.join(self.tmp_dir, args[3], '{0[3]}.nc'.format(args))

    def setUp(self):
        """Create the temporary folders and configure the mocked data manager."""
        self.tmp_dir = tempfile.mkdtemp()

        self.data_manager = Mock()
        self.data_manager.get_file_path = self._get_file_path
        self.data_manager.is_cmorized.return_value = False

        config = self.data_manager.config
        config.data_dir = os.path.join(self.tmp_dir, 'data')
        config.scratch_dir = os.path.join(self.tmp_dir, 'scratch')
        TempFile.scratch_folder = config.scratch_dir
        config.data_convention = 'data_convention'
        config.var_manager.get_variable_and_alias = self._get_variable_and_alias

        config.experiment.expid = 'expid'
        config.experiment.model = 'model'
        config.experiment.experiment_name = 'experiment_name'
        config.experiment.num_chunks = 1
        config.experiment.chunk_size = 1
        config.experiment.institute = 'institute'
        config.experiment.get_member_str.return_value = 'member'

        config.cmor.force = False
        config.cmor.ocean = True
        config.cmor.atmosphere = True
        config.cmor.filter_files = ''
        config.cmor.associated_experiment = 'associated_experiment'
        config.cmor.initialization_method = 'initialization_method'
        config.cmor.initialization_description = 'initialization_description'
        config.cmor.physics_version = 'physics_version'
        config.cmor.physics_description = 'physics_description'
        # NOTE(review): this value looks copy-pasted from
        # initialization_description; kept as-is because no test reads it.
        config.cmor.associated_model = 'initialization_description'
        config.cmor.source = 'source'

        os.makedirs(config.data_dir)
        os.makedirs(config.scratch_dir)

    def create_ocean_files(self, filename, tar_name, gzip=False, backup=False):
        """Write a small NetCDF file and pack it into a tar in the original-files folder.

        The NetCDF file holds two random cubes: 'var1' (time, lat, lon) and
        'var2' (time, lat, lon, depth). After archiving, the loose file is
        removed so only the tar remains.

        :param filename: name of the NetCDF file to create
        :param tar_name: name of the tar archive that will contain it
        :param gzip: if True, compress the file with the ``gzip`` command
            before archiving it
        :param backup: if True, store the file under a 'backup' folder
            inside the archive
        :raises Exception: if the ``gzip`` command exits with a non-zero code
        """
        # NOTE(review): this FUTURE flag is only meaningful on older Iris
        # releases -- confirm it is still accepted by the pinned version.
        iris.FUTURE.netcdf_no_unlimited = True

        # Builtin float instead of np.float: the alias was removed from NumPy
        # and always meant exactly the builtin type.
        coord_data = np.array([0, 1], float)
        lat = DimCoord(coord_data, standard_name='latitude', long_name='latitude', var_name='lat',
                       units='degrees_north')
        lon = DimCoord(coord_data, standard_name='longitude', long_name='longitude', var_name='lon',
                       units='degrees_east')
        time = DimCoord(coord_data, standard_name='time', long_name='time', var_name='time',
                        units='days since 1950-01-01')
        depth = DimCoord(coord_data, standard_name='depth', long_name='Depth', var_name='lev', units='m')

        var1 = iris.cube.Cube(np.random.rand(2, 2, 2).astype(float), long_name='Variable 1', var_name='var1')
        var1.add_dim_coord(time, 0)
        var1.add_dim_coord(lat, 1)
        var1.add_dim_coord(lon, 2)

        var2 = iris.cube.Cube(np.random.rand(2, 2, 2, 2).astype(float), long_name='Variable 2', var_name='var2')
        var2.add_dim_coord(time, 0)
        var2.add_dim_coord(lat, 1)
        var2.add_dim_coord(lon, 2)
        var2.add_dim_coord(depth, 3)

        folder_path = os.path.join(self.data_manager.config.data_dir, 'expid', 'original_files', '19900101',
                                   'member', 'outputs')
        os.makedirs(folder_path)
        file_path = os.path.join(folder_path, filename)
        iris.save((var1, var2), file_path, zlib=True)

        if gzip:
            # stderr is captured as well so a failure message carries the
            # reason reported by the command.
            process = subprocess.Popen(('gzip', file_path), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            output = process.communicate()
            if process.returncode != 0:
                raise Exception('Can not compress: {0}'.format(output))
            file_path = "{0}.gz".format(file_path)
            filename = "{0}.gz".format(filename)

        if backup:
            filename = os.path.join('backup', filename)

        # The context manager closes the archive even if adding the member fails.
        with tarfile.TarFile(os.path.join(folder_path, tar_name), mode='w') as tar:
            tar.add(file_path, arcname=filename, recursive=False)
        os.remove(file_path)

    def tearDown(self):
        """Remove the temporary directory tree created in setUp."""
        shutil.rmtree(self.tmp_dir)

    def _run_cmorizer(self, method='cmorize_ocean'):
        """Build a Cmorizer for start date '19900101', member 0, and call one of its methods.

        :param method: name of the Cmorizer method to invoke
        :return: the captured log records on Python 3; None on Python 2,
            where ``assertLogs`` is not available
        """
        def run():
            cmorizer = Cmorizer(self.data_manager, '19900101', 0)
            getattr(cmorizer, method)()

        if not six.PY3:
            run()
            return None
        # The Cmorizer is created inside the capture so any record emitted by
        # its constructor is collected too.
        with self.assertLogs(log.Log.log) as cmd:
            run()
        return cmd.records

    def _assert_message_logged(self, records, message):
        """Check that at least one captured record carries exactly `message`.

        Does nothing when `records` is None (no log capture on Python 2).
        """
        if records is None:
            return
        self.assertTrue([record for record in records if record.getMessage() == message],
                        'Expected log message not found: {0}'.format(message))

    def _assert_log_levels(self, records, result=True, error=False, critical=False, warning=False):
        """Check which of the RESULT/ERROR/CRITICAL/WARNING levels appear in the records.

        Each flag states whether at least one record of that level must be
        present (True) or absent (False). Does nothing when `records` is None
        (no log capture on Python 2).
        """
        if records is None:
            return
        logged_levels = {record.levelno for record in records}
        expectations = (
            ('RESULT', log.Log.RESULT, result),
            ('ERROR', log.Log.ERROR, error),
            ('CRITICAL', log.Log.CRITICAL, critical),
            ('WARNING', log.Log.WARNING, warning),
        )
        for level_name, level, expected in expectations:
            self.assertEqual(expected, level in logged_levels,
                             'Unexpected presence/absence of {0} records'.format(level_name))

    def _assert_output_files(self, var1, var2):
        """Check whether the var1 and var2 output files exist under the temporary directory."""
        for var_name, expected in (('var1', var1), ('var2', var2)):
            path = os.path.join(self.tmp_dir, var_name, '{0}.nc'.format(var_name))
            self.assertEqual(expected, os.path.isfile(path),
                             'Unexpected existence state for {0}'.format(path))

    def test_skip_ocean_cmorization(self):
        """The skip message is logged when ocean cmorization is disabled in the config."""
        self.data_manager.config.cmor.ocean = False
        records = self._run_cmorizer('cmorize_ocean')
        self._assert_message_logged(records, 'Skipping ocean cmorization due to configuration')

    def test_skip_atmos_cmorization(self):
        """The skip message is logged when atmosphere cmorization is disabled in the config."""
        self.data_manager.config.cmor.atmosphere = False
        records = self._run_cmorizer('cmorize_atmos')
        self._assert_message_logged(records, 'Skipping atmosphere cmorization due to configuration')

    def test_skip_when_cmorized(self):
        """The archive is reported as not needing unpacking when data is already cmorized."""
        self.create_ocean_files('expid_1d_19900101_19900131.nc', 'MMO_19900101-19900131.tar')
        self.data_manager.is_cmorized.return_value = True
        records = self._run_cmorizer()
        self._assert_message_logged(records, 'No need to unpack file 1/1')

    def test_skip_when_not_requested(self):
        """The archive is reported as not needing unpacking when the chunk is not requested."""
        self.create_ocean_files('expid_1d_19900101_19900131.nc', 'MMO_19900101-19900131.tar')
        self.data_manager.config.cmor.chunk_cmorization_requested.return_value = False
        records = self._run_cmorizer()
        self._assert_message_logged(records, 'No need to unpack file 1/1')

    def test_force(self):
        """With the force option set, cmorization runs even if data is already cmorized."""
        self.create_ocean_files('expid_1d_19900101_19900131.nc', 'MMO_19900101-19900131.tar')
        self.data_manager.is_cmorized.return_value = True
        self.data_manager.config.cmor.force = True
        self._test_ocean_cmor()

    def test_ocean_cmorization_no_files(self):
        """An error (and no result) is logged when there are no original files."""
        records = self._run_cmorizer()
        self._assert_log_levels(records, result=False, error=True, critical=False, warning=False)

    def test_ocean_cmorization_not_vars_requested(self):
        """No output files are written when the config reports that no variable is required."""
        self.create_ocean_files('expid_1d_19900101_19900131.nc', 'MMO_19900101-19900131.tar')
        self.data_manager.config.cmor.any_required.return_value = False
        records = self._run_cmorizer()
        self._assert_log_levels(records, result=True, error=False, critical=False, warning=False)
        self._assert_output_files(var1=False, var2=False)

    def test_ocean_cmorization_no_vars_recognized(self):
        """No output files are written when the variable manager recognizes no variable."""
        self.create_ocean_files('expid_1d_19900101_19900131.nc', 'MMO_19900101-19900131.tar')

        def not_recognized(*args):
            return None, None

        self.data_manager.config.var_manager.get_variable_and_alias = not_recognized
        records = self._run_cmorizer()
        self._assert_log_levels(records, result=True, error=False, critical=False, warning=False)
        self._assert_output_files(var1=False, var2=False)

    def test_ocean_cmorization_var2_not_requested(self):
        """Only var1 is written when the config rejects var2."""
        self.create_ocean_files('expid_1d_19900101_19900131.nc', 'MMO_19900101-19900131.tar')

        def _reject_var2(cmor_var):
            return cmor_var.short_name != 'var2'

        self.data_manager.config.cmor.cmorize = _reject_var2
        records = self._run_cmorizer()
        self._assert_log_levels(records, result=True, error=False, critical=False, warning=False)
        self._assert_output_files(var1=True, var2=False)

    def _test_ocean_cmor(self):
        """Run ocean cmorization and require a result record with no error, critical or warning."""
        records = self._run_cmorizer()
        self._assert_log_levels(records, result=True, error=False, critical=False, warning=False)

    def test_ocean_cmorization(self):
        """Both variables are written from a plain MMO archive."""
        self.create_ocean_files('expid_1d_19900101_19900131.nc', 'MMO_19900101-19900131.tar')
        self._test_ocean_cmor()
        self._assert_output_files(var1=True, var2=True)

    def test_ocean_cmorization_with_filter(self):
        """Both variables are written when the file filter matches the file name."""
        self.create_ocean_files('expid_1d_19900101_19900131.nc', 'MMO_19900101-19900131.tar')
        self.data_manager.config.cmor.filter_files = 'expid'
        self._test_ocean_cmor()
        self._assert_output_files(var1=True, var2=True)

    def test_ocean_cmorization_with_bad_filter(self):
        """A warning is logged and nothing is written when the file filter matches no file."""
        self.create_ocean_files('expid_1d_19900101_19900131.nc', 'MMO_19900101-19900131.tar')
        self.data_manager.config.cmor.filter_files = 'badfilter'
        records = self._run_cmorizer()
        self._assert_log_levels(records, result=True, error=False, critical=False, warning=True)
        self._assert_output_files(var1=False, var2=False)

    def test_ocean_cmorization_gzip(self):
        """Both variables are written when the archived file is gzip-compressed."""
        self.create_ocean_files('expid_1d_19900101_19900131.nc', 'MMO_19900101-19900131.tar', gzip=True)
        self._test_ocean_cmor()
        self._assert_output_files(var1=True, var2=True)

    def test_ocean_cmorization_backup(self):
        """Both variables are written when the file sits in a 'backup' folder inside the archive."""
        self.create_ocean_files('expid_1d_19900101_19900131.nc', 'MMO_19900101-19900131.tar', backup=True)
        self._test_ocean_cmor()
        self._assert_output_files(var1=True, var2=True)

    def test_ocean_cmorization_PPO(self):
        """Both variables are written from a PPO-named archive."""
        self.create_ocean_files('expid_1d_19900101_19900131.nc', 'PPO_expid_1D_xx_19900101_19900131.tar')
        self._test_ocean_cmor()
        self._assert_output_files(var1=True, var2=True)

    def test_ocean_cmorization_diags(self):
        """Both variables are written from a diags-named archive."""
        self.create_ocean_files('expid_1d_19900101_19900131.nc', 'diags_expid_1D_xx_19900101_19900131.tar')
        self._test_ocean_cmor()
        self._assert_output_files(var1=True, var2=True)