from earthdiagnostics.cmorizer import Cmorizer
from earthdiagnostics.utils import TempFile
from bscearth.utils import log

from unittest import TestCase
from mock import Mock
import os
import subprocess
import tempfile
import shutil
import iris
import iris.cube
from iris.coords import DimCoord, AuxCoord
import tarfile
import gzip as gz
import numpy as np
import six


class TestCmorizer(TestCase):
    """Exercise ``Cmorizer`` against a mocked data manager rooted in a temporary folder."""

    def _get_variable_and_alias(self, variable):
        """Stand-in for the variable manager lookup.

        Returns an ``(alias, variable)`` pair of mocks; the variable mock
        carries the requested short name and a fixed ``'domain'`` domain.
        """
        mock_alias = Mock()
        mock_alias.basin = None
        mock_alias.grid = None
        mock_variable = Mock()
        mock_variable.short_name = variable
        mock_variable.domain = 'domain'
        return mock_alias, mock_variable

    def _get_file_path(self, *args, **kwargs):
        """Stand-in for ``data_manager.get_file_path``.

        Only the fourth positional argument is used: the returned path is
        ``<tmp_dir>/<args[3]>/<args[3]>.nc``.
        """
        return os.path.join(self.tmp_dir, args[3], '{0[3]}.nc'.format(args))

    def setUp(self):
        """Create the temporary folder tree and configure the mocked data manager."""
        self.tmp_dir = tempfile.mkdtemp()

        self.data_manager = Mock()
        self.data_manager.get_file_path = self._get_file_path

        # Attribute access on a Mock always returns the same child mock, so
        # these aliases configure the very objects Cmorizer will read.
        config = self.data_manager.config
        config.data_dir = os.path.join(self.tmp_dir, 'data')
        config.scratch_dir = os.path.join(self.tmp_dir, 'scratch')
        TempFile.scratch_folder = config.scratch_dir
        config.data_convention = 'data_convention'
        config.var_manager.get_variable_and_alias = self._get_variable_and_alias

        experiment = config.experiment
        experiment.expid = 'expid'
        experiment.model = 'model'
        experiment.experiment_name = 'experiment_name'
        experiment.num_chunks = 1
        experiment.chunk_size = 1
        experiment.institute = 'institute'
        experiment.get_member_str.return_value = 'member'

        cmor = config.cmor
        cmor.ocean = True
        cmor.atmosphere = True
        cmor.filter_files = ''
        cmor.associated_experiment = 'associated_experiment'
        cmor.initialization_method = 'initialization_method'
        cmor.initialization_description = 'initialization_description'
        cmor.physics_version = 'physics_version'
        cmor.physics_description = 'physics_description'
        # NOTE(review): this value looks like a copy-paste of the attribute
        # above; kept unchanged because no test depends on it - confirm.
        cmor.associated_model = 'initialization_description'
        cmor.source = 'source'

        os.makedirs(config.data_dir)
        os.makedirs(config.scratch_dir)

    def create_ocean_files(self, filename, tar_name, gzip=False):
        """Write a small NetCDF file with two variables and pack it into a tar.

        The file is created under
        ``<data_dir>/expid/original_files/19900101/member/outputs`` and is
        removed once it has been added to the archive.

        :param filename: name of the NetCDF file to create
        :param tar_name: name of the tar archive that will hold the file
        :param gzip: if True, compress the NetCDF file with the ``gzip``
            command before adding it to the archive
        :raises Exception: if the ``gzip`` command exits with a non-zero code
        """
        iris.FUTURE.netcdf_no_unlimited = True

        # np.float64 instead of the ``np.float`` alias, which no longer exists
        # in recent NumPy releases; the resulting dtype is the same.
        coord_data = np.array([0, 1], np.float64)
        lat = DimCoord(coord_data, standard_name='latitude', long_name='latitude',
                       var_name='lat', units='degrees_north')
        lon = DimCoord(coord_data, standard_name='longitude', long_name='longitude',
                       var_name='lon', units='degrees_east')
        time = DimCoord(coord_data, standard_name='time', long_name='time',
                        var_name='time', units='days since 1950-01-01')
        depth = DimCoord(coord_data, standard_name='depth', long_name='Depth',
                         var_name='lev', units='m')

        var1 = iris.cube.Cube(np.random.rand(2, 2, 2).astype(np.float64),
                              long_name='Variable 1', var_name='var1')
        var1.add_dim_coord(time, 0)
        var1.add_dim_coord(lat, 1)
        var1.add_dim_coord(lon, 2)

        var2 = iris.cube.Cube(np.random.rand(2, 2, 2, 2).astype(np.float64),
                              long_name='Variable 2', var_name='var2')
        var2.add_dim_coord(time, 0)
        var2.add_dim_coord(lat, 1)
        var2.add_dim_coord(lon, 2)
        var2.add_dim_coord(depth, 3)

        folder_path = os.path.join(self.data_manager.config.data_dir, 'expid', 'original_files',
                                   '19900101', 'member', 'outputs')
        os.makedirs(folder_path)
        file_path = os.path.join(folder_path, filename)
        iris.save((var1, var2), file_path, zlib=True)

        if gzip:
            # Capture stderr as well, so a failure message carries gzip's own
            # diagnostics rather than an empty stdout.
            process = subprocess.Popen(('gzip', file_path),
                                       stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            output = process.communicate()
            if process.returncode != 0:
                raise Exception('Can not compress: {0}'.format(output))
            file_path = "{0}.gz".format(file_path)
            filename = "{0}.gz".format(filename)

        # The context manager closes the archive even if adding the file fails.
        with tarfile.TarFile(os.path.join(folder_path, tar_name), mode='w') as tar:
            tar.add(file_path, arcname=filename, recursive=False)
        os.remove(file_path)

    def tearDown(self):
        """Remove the temporary folder tree created in ``setUp``."""
        shutil.rmtree(self.tmp_dir)

    def _run_ocean_cmor(self, success):
        """Run the ocean cmorization and, on Python 3, check the logged levels.

        :param success: if True, a RESULT record is required and ERROR records
            are forbidden; if False, the opposite. CRITICAL and WARNING records
            are forbidden in both cases.

        On Python 2 ``assertLogs`` is not available, so the cmorization is
        only required to finish without raising.
        """
        if not six.PY3:
            cmorizer = Cmorizer(self.data_manager, '19900101', 0)
            cmorizer.cmorize_ocean()
            return

        with self.assertLogs(log.Log.log) as cmd:
            cmorizer = Cmorizer(self.data_manager, '19900101', 0)
            cmorizer.cmorize_ocean()

        levels = {record.levelno for record in cmd.records}
        if success:
            required, forbidden = log.Log.RESULT, log.Log.ERROR
        else:
            required, forbidden = log.Log.ERROR, log.Log.RESULT
        self.assertIn(required, levels)
        self.assertNotIn(forbidden, levels)
        self.assertNotIn(log.Log.CRITICAL, levels)
        self.assertNotIn(log.Log.WARNING, levels)

    def _assert_ocean_files_created(self):
        """Check that an output file exists for both test variables."""
        for var_name in ('var1', 'var2'):
            output_path = os.path.join(self.tmp_dir, var_name, '{0}.nc'.format(var_name))
            self.assertTrue(os.path.isfile(output_path))

    def test_skip_ocean_cmorization(self):
        """``cmorize_ocean`` must not raise when ocean cmorization is disabled."""
        self.data_manager.config.cmor.ocean = False
        cmorizer = Cmorizer(self.data_manager, '19900101', 0)
        cmorizer.cmorize_ocean()

    def test_skip_atmos_cmorization(self):
        """``cmorize_atmos`` must not raise when atmosphere cmorization is disabled."""
        self.data_manager.config.cmor.atmosphere = False
        cmorizer = Cmorizer(self.data_manager, '19900101', 0)
        cmorizer.cmorize_atmos()

    def test_ocean_cmorization_no_files(self):
        """Without input files, an ERROR must be logged and no RESULT."""
        self._run_ocean_cmor(success=False)

    def _test_ocean_cmor(self):
        """Run the ocean cmorization expecting a RESULT record and no errors or warnings."""
        self._run_ocean_cmor(success=True)

    def test_ocean_cmorization(self):
        """Cmorize a plain NetCDF file packed in an ``MMO`` tar."""
        self.create_ocean_files('expid_1d_19900101_19900131.nc', 'MMO_19900101-19900131.tar')
        self._test_ocean_cmor()
        self._assert_ocean_files_created()

    def test_ocean_cmorization_gzip(self):
        """Cmorize a gzipped NetCDF file packed in an ``MMO`` tar."""
        self.create_ocean_files('expid_1d_19900101_19900131.nc', 'MMO_19900101-19900131.tar',
                                gzip=True)
        self._test_ocean_cmor()
        self._assert_ocean_files_created()

    def test_ocean_cmorization_PPO(self):
        """Cmorize a NetCDF file packed in a ``PPO`` tar."""
        self.create_ocean_files('expid_1d_19900101_19900131.nc',
                                'PPO_expid_1D_xx_19900101_19900131.tar')
        self._test_ocean_cmor()
        self._assert_ocean_files_created()

    def test_ocean_cmorization_diags(self):
        """Cmorize a NetCDF file packed in a ``diags`` tar."""
        self.create_ocean_files('expid_1d_19900101_19900131.nc',
                                'diags_expid_1D_xx_19900101_19900131.tar')
        self._test_ocean_cmor()
        self._assert_ocean_files_created()