"""Tests for earthdiagnostics.cmorizer module""" from earthdiagnostics.cmorizer import Cmorizer from earthdiagnostics.utils import TempFile, Utils from earthdiagnostics.data_convention import DataConvention from bscearth.utils import log from unittest import TestCase from mock import Mock, create_autospec import os import tempfile import shutil import iris import iris.cube from iris.coords import DimCoord import tarfile import numpy as np import six import calendar class TestCmorizer(TestCase): """Tests for Cmorizer class""" def _get_variable_and_alias(self, variable): mock_alias = Mock() mock_alias.basin = None mock_alias.grid = None mock_variable = self._get_variable(variable) return mock_alias, mock_variable def _get_variable(self, variable, silent=False): mock_variable = Mock() mock_variable.short_name = variable mock_variable.domain = 'domain' return mock_variable def _get_file_path(self, *args, **kwargs): return os.path.join(self.tmp_dir, args[3], '{0[3]}.nc'.format(args)) def _get_file_path_grib(self, *args, **kwargs): return os.path.join(self.tmp_dir, args[3], str(args[6]), '{0[3]}.nc'.format(args)) def setUp(self): """Prepare tests""" self.tmp_dir = tempfile.mkdtemp() self.data_manager = Mock() self.data_manager.is_cmorized.return_value = False self.data_manager.config.data_dir = os.path.join(self.tmp_dir, 'data') self.data_manager.config.scratch_dir = os.path.join(self.tmp_dir, 'scratch') TempFile.scratch_folder = self.data_manager.config.scratch_dir self.data_manager.config.data_convention = create_autospec(DataConvention) self.data_manager.config.data_convention.name = 'data_convention' self.data_manager.config.data_convention.lat_name = 'lat' self.data_manager.config.data_convention.lon_name = 'lon' self.data_manager.config.data_convention.get_file_path = self._get_file_path self.data_manager.config.var_manager.get_variable_and_alias = self._get_variable_and_alias self.data_manager.config.var_manager.get_variable = self._get_variable self.data_manager.variable_list = self.data_manager.config.var_manager self.data_manager.config.experiment.expid = 'expid' self.data_manager.config.experiment.model = 'model' self.data_manager.config.experiment.experiment_name = 'experiment_name' self.data_manager.config.experiment.num_chunks = 1 self.data_manager.config.experiment.chunk_size = 1 self.data_manager.config.experiment.institute = 'institute' self.data_manager.config.experiment.get_member_str.return_value = 'member' self.data_manager.config.experiment.atmos_timestep = 6 self.data_manager.config.cmor.force = False self.data_manager.config.cmor.ocean = True self.data_manager.config.cmor.atmosphere = True self.data_manager.config.cmor.use_grib = True self.data_manager.config.cmor.filter_files = '' self.data_manager.config.cmor.associated_experiment = 'associated_experiment' self.data_manager.config.cmor.initialization_method = 'initialization_method' self.data_manager.config.cmor.initialization_description = 'initialization_description' self.data_manager.config.cmor.physics_version = 'physics_version' self.data_manager.config.cmor.physics_description = 'physics_description' self.data_manager.config.cmor.initialization_description = 'initialization_description' self.data_manager.config.cmor.associated_model = 'initialization_description' self.data_manager.config.cmor.source = 'source' self.data_manager.config.cmor.get_requested_codes.return_value = {228, 142, 143, 201, 202, 129, 169, 180} self.data_manager.config.cmor.get_variables.return_value = {228, 142, 143, 201, 202, 129, 169, 180} 
        self.data_manager.config.cmor.get_levels.return_value = None

        os.makedirs(self.data_manager.config.data_dir)
        os.makedirs(self.data_manager.config.scratch_dir)

    def _create_ocean_files(self, filename, tar_name, gzip=False, backup=False):
        folder_path = os.path.join(self.data_manager.config.data_dir, 'expid', 'original_files', '19900101',
                                   'member', 'outputs')
        file_path, filename = self._create_file(folder_path, filename, gzip)

        if backup:
            filename = os.path.join('backup', filename)

        tar = tarfile.TarFile(os.path.join(folder_path, tar_name), mode='w')
        tar.add(file_path, arcname=filename, recursive=False)
        tar.close()
        os.remove(file_path)

    def _create_mma_files(self, filename, tar_name, gzip=False):
        folder_path = os.path.join(self.data_manager.config.data_dir, 'expid', 'original_files', '19900101',
                                   'member', 'outputs')
        filepath_gg, filename_gg = self._create_file(folder_path, filename.replace('??', 'GG'), gzip)
        filepath_sh, filename_sh = self._create_file(folder_path, filename.replace('??', 'SH'), gzip)

        tar = tarfile.TarFile(os.path.join(folder_path, tar_name), mode='w')
        tar.add(filepath_gg, arcname=filename_gg, recursive=False)
        tar.add(filepath_sh, arcname=filename_sh, recursive=False)
        tar.close()
        os.remove(filepath_gg)
        os.remove(filepath_sh)

    def _create_file(self, folder_path, filename, gzip):
        var1 = self._create_sample_cube('Variable 1', 'var1', threed=False, time_bounds=True)
        var2 = self._create_sample_cube('Variable 2', 'var2', threed=True, time_bounds=True)
        if not os.path.isdir(folder_path):
            os.makedirs(folder_path)
        file_path = os.path.join(folder_path, filename)
        iris.save((var1, var2), file_path, zlib=True)
        if gzip:
            import subprocess
            process = subprocess.Popen(('gzip', file_path), stdout=subprocess.PIPE)
            communicate = process.communicate()
            file_path = "{0}.gz".format(file_path)
            filename = "{0}.gz".format(filename)
            if process.returncode != 0:
                raise Exception('Can not compress: {0}'.format(communicate))
        return file_path, filename

    def _create_sample_cube(self, long_name, var_name, threed, time_bounds):
        coord_data = np.array([1, 2], np.float)
        lat = DimCoord(coord_data, standard_name='latitude', long_name='latitude', var_name='lat',
                       units='degrees_north')
        lon = DimCoord(coord_data, standard_name='longitude', long_name='longitude', var_name='lon',
                       units='degrees_east')
        time = DimCoord(coord_data, standard_name='time', long_name='time', var_name='time',
                        units='days since 1950-01-01')
        if time_bounds:
            time.bounds = np.array([[0.5, 1.5], [1.5, 2.5]], np.float)

        if threed:
            data = np.random.rand(2, 2, 2, 2).astype(np.float)
            depth = DimCoord(coord_data, standard_name='depth', long_name='Depth', var_name='lev', units='m')
        else:
            data = np.random.rand(2, 2, 2).astype(np.float)

        cube = iris.cube.Cube(data, long_name=long_name, var_name=var_name)
        cube.add_dim_coord(time, 0)
        cube.add_dim_coord(lat, 1)
        cube.add_dim_coord(lon, 2)
        if threed:
            cube.add_dim_coord(depth, 3)
        return cube

    def tearDown(self):
        """Clean up after tests"""
        shutil.rmtree(self.tmp_dir)

    def _test_ocean_cmor(self, success=True, error=False, critical=False, warnings=False, message='',
                         check_vars=None):
        self._test_cmorization(success=success, error=error, critical=critical, warnings=warnings,
                               message=message, ocean=True, check_vars=check_vars)

    def _test_atmos_cmor(self, success=True, error=False, critical=False, warnings=False, message='',
                         check_vars=None):
        self._test_cmorization(success=success, error=error, critical=critical, warnings=warnings,
                               message=message, ocean=False, check_vars=check_vars)

    def _test_cmorization(self, success=True, error=False,
                          critical=False, warnings=False, message='', ocean=True, check_vars=None):
        self._check_logs(critical, error, message, ocean, success, warnings)
        if check_vars:
            for variable, status in six.iteritems(check_vars):
                if status:
                    self.assertTrue(os.path.isfile(os.path.join(self.tmp_dir, variable, '{}.nc'.format(variable))))
                else:
                    self.assertFalse(os.path.isfile(os.path.join(self.tmp_dir, variable, '{}.nc'.format(variable))))

    def _check_logs(self, critical, error, message, ocean, success, warnings):
        if six.PY3:
            with self.assertLogs(log.Log.log) as cmd:
                cmorizer = Cmorizer(self.data_manager, '19900101', 0)
                if ocean:
                    cmorizer.cmorize_ocean()
                else:
                    cmorizer.cmorize_atmos()
            if message:
                self.assertTrue([record for record in cmd.records if record.message == message])
            else:
                for level, value in six.iteritems({log.Log.RESULT: success,
                                                   log.Log.ERROR: error,
                                                   log.Log.CRITICAL: critical,
                                                   log.Log.WARNING: warnings}):
                    if value:
                        self.assertTrue([record for record in cmd.records if record.levelno == level])
                    else:
                        self.assertFalse([record for record in cmd.records if record.levelno == level])
        else:
            cmorizer = Cmorizer(self.data_manager, '19900101', 0)
            if ocean:
                cmorizer.cmorize_ocean()
            else:
                cmorizer.cmorize_atmos()

    def test_skip_ocean_cmorization(self):
        """Test ocean cmorization flag disabled option"""
        self.data_manager.config.cmor.ocean = False
        self._test_ocean_cmor(message='Skipping ocean cmorization due to configuration')

    def test_skip_atmos_cmorization(self):
        """Test atmos cmorization flag disabled option"""
        self.data_manager.config.cmor.atmosphere = False
        if six.PY3:
            with self.assertLogs(log.Log.log) as cmd:
                cmorizer = Cmorizer(self.data_manager, '19900101', 0)
                cmorizer.cmorize_atmos()
            self.assertTrue([record for record in cmd.records
                             if record.message == 'Skipping atmosphere cmorization due to configuration'])
        else:
            cmorizer = Cmorizer(self.data_manager, '19900101', 0)
            cmorizer.cmorize_atmos()

    def test_skip_when_cmorized(self):
        """Test cmorization skipped if already done"""
        self._create_ocean_files('expid_1d_19900101_19900131.nc', 'MMO_19900101-19900131.tar')
        self.data_manager.is_cmorized.return_value = True
        self._test_ocean_cmor(message='No need to unpack file 1/1')

    def test_skip_when_not_requested(self):
        """Test cmorization skipped if chunk is not requested"""
        self._create_ocean_files('expid_1d_19900101_19900131.nc', 'MMO_19900101-19900131.tar')
        self.data_manager.config.cmor.chunk_cmorization_requested.return_value = False
        self._test_ocean_cmor(message='No need to unpack file 1/1')

    def test_force(self):
        """Test cmorization force works"""
        self._create_ocean_files('expid_1d_19900101_19900131.nc', 'MMO_19900101-19900131.tar')
        self.data_manager.is_cmorized.return_value = True
        self.data_manager.config.cmor.force = True
        self._test_ocean_cmor()

    def test_ocean_cmorization_no_files(self):
        """Test ocean cmorization reports an error if there is no input data"""
        self._test_ocean_cmor(success=False, error=True)

    def test_ocean_cmorization_not_vars_requested(self):
        """Test ocean cmorization reports success if no vars were requested"""
        self._create_ocean_files('expid_1d_19900101_19900131.nc', 'MMO_19900101-19900131.tar')
        self.data_manager.config.cmor.any_required.return_value = False
        self._test_ocean_cmor(check_vars={'var1': False, 'var2': False})

    def test_ocean_cmorization_no_vars_recognized(self):
        """Test ocean cmorization reports success if no vars were recognized"""
        self._create_ocean_files('expid_1d_19900101_19900131.nc', 'MMO_19900101-19900131.tar')

        def not_recognized(*args):
            return None, None
        self.data_manager.config.var_manager.get_variable_and_alias = not_recognized
        self._test_ocean_cmor(check_vars={'var1': False, 'var2': False})

    def test_ocean_cmorization_var2_not_requested(self):
        """Test ocean cmorization with var2 not requested"""
        self._create_ocean_files('expid_1d_19900101_19900131.nc', 'MMO_19900101-19900131.tar')

        def _reject_var2(cmor_var):
            return cmor_var.short_name != 'var2'

        self.data_manager.config.cmor.cmorize = _reject_var2
        self._test_ocean_cmor(check_vars={'var1': True, 'var2': False})

    def test_ocean_cmorization(self):
        """Test basic ocean cmorization"""
        self._create_ocean_files('expid_1d_19900101_19900131.nc', 'MMO_19900101-19900131.tar')
        self._test_ocean_cmor(check_vars={'var1': True, 'var2': True})

    def test_ocean_cmorization_with_filter(self):
        """Test ocean cmorization filtering files"""
        self._create_ocean_files('expid_1d_19900101_19900131.nc', 'MMO_19900101-19900131.tar')
        self.data_manager.config.cmor.filter_files = 'expid'
        self._test_ocean_cmor(check_vars={'var1': True, 'var2': True})

    def test_ocean_cmorization_with_bad_filter(self):
        """Test ocean cmorization fails if a bad filter is added"""
        self._create_ocean_files('expid_1d_19900101_19900131.nc', 'MMO_19900101-19900131.tar')
        self.data_manager.config.cmor.filter_files = 'badfilter'
        self._test_ocean_cmor(warnings=True, check_vars={'var1': False, 'var2': False})

    def test_ocean_cmorization_gzip(self):
        """Test ocean cmorization if tars are also gzipped"""
        self._create_ocean_files('expid_1d_19900101_19900131.nc', 'MMO_19900101-19900131.tar', gzip=True)
        self._test_ocean_cmor(check_vars={'var1': True, 'var2': True})

    def test_ocean_cmorization_backup(self):
        """Test ocean cmorization when files are in the backup path"""
        self._create_ocean_files('expid_1d_19900101_19900131.nc', 'MMO_19900101-19900131.tar', backup=True)
        self._test_ocean_cmor(check_vars={'var1': True, 'var2': True})

    def test_ocean_cmorization_PPO(self):
        """Test ocean cmorization when files are PPO"""
        self._create_ocean_files('expid_1d_19900101_19900131.nc', 'PPO_expid_1D_xx_19900101_19900131.tar')
        self._test_ocean_cmor(check_vars={'var1': True, 'var2': True})

    def test_ocean_cmorization_diags(self):
        """Test ocean cmorization when files are diags"""
        self._create_ocean_files('expid_1d_19900101_19900131.nc', 'diags_expid_1D_xx_19900101_19900131.tar')
        self._test_ocean_cmor(check_vars={'var1': True, 'var2': True})

    def test_atmos_cmorization(self):
        """Test basic atmos cmorization from nc"""
        self._create_mma_files('MMA_1d_??_19900101_19900131.nc', 'MMA_expid_19901101_fc0_19900101-19900131.tar')
        self._test_atmos_cmor(check_vars={'var1': True, 'var2': True})

    def test_skip_when_not_requested_mma(self):
        """Test atmos cmorization is skipped if chunk is not requested"""
        self._create_mma_files('MMA_1d_??_19900101_19900131.nc', 'MMA_expid_19901101_fc0_19900101-19900131.tar')
        self.data_manager.config.cmor.chunk_cmorization_requested.return_value = False
        self._test_atmos_cmor(message='No need to unpack file 1/1')

    def test_force_mma(self):
        """Test force atmos cmorization"""
        self._create_mma_files('MMA_1d_??_19900101_19900131.nc', 'MMA_expid_19901101_fc0_19900101-19900131.tar')
        self.data_manager.is_cmorized.return_value = True
        self.data_manager.config.cmor.force = True
        self._test_atmos_cmor()

    def test_atmos_cmorization_no_mma_files(self):
        """Test atmos cmorization reports an error if there are no files"""
        self._test_atmos_cmor(success=False, error=True)

    def _create_grib_files(self, filename, month):
        filename = filename.format(month)
        coord_data = np.array([0, 1], np.float)
        folder_path = os.path.join(self.data_manager.config.data_dir, 'expid', 'original_files', '19900101',
                                   'member', 'outputs')
        self._create_file_for_grib(coord_data, folder_path, filename.replace('??', 'GG'),
                                   [142, 143, 129, 169, 180], month)
        self._create_file_for_grib(coord_data, folder_path, filename.replace('??', 'SH'),
                                   [201, 202], month)

    def _create_file_for_grib(self, coord_data, folder_path, filename, codes, month):
        lat = DimCoord(coord_data, standard_name='latitude', long_name='latitude', var_name='lat',
                       units='degrees_north')
        lon = DimCoord(coord_data, standard_name='longitude', long_name='longitude', var_name='lon',
                       units='degrees_east')
        month_days = calendar.monthrange(1990, month)[1]
        month -= 1
        time_data = np.arange(0.25, month_days + 0.25, 0.25, np.float) + month * 31
        time = DimCoord(time_data, standard_name='time', long_name='time', var_name='time',
                        units='days since 1990-01-01 00:00:00')
        variables = []
        for code in codes:
            # Each variable stores its GRIB code as base value plus the time offset at every timestep
            var = iris.cube.Cube(np.ones((month_days * 4, 2, 2), np.float) * code,
                                 long_name='Variable {}'.format(code), var_name='var{}'.format(code))
            for x, data in enumerate(time_data):
                var.data[x, ...] += data
            var.add_dim_coord(time, 0)
            var.add_dim_coord(lat, 1)
            var.add_dim_coord(lon, 2)
            var.attributes['table'] = np.int32(128)
            var.attributes['code'] = np.int32(code)
            variables.append(var)
        if not os.path.isdir(folder_path):
            os.makedirs(folder_path)
        file_path = os.path.join(folder_path, filename)
        iris.save(variables, file_path, zlib=True, local_keys=('table', 'code'))
        Utils.cdo.settaxis('1990-0{}-01,06:00,6hour'.format(month + 1),
                           input=file_path,
                           output=file_path.replace('.nc', '.grb'),
                           options='-f grb2')
        os.remove(file_path)

    def test_grib_cmorization(self):
        """Test atmos cmorization from grib"""
        self.data_manager.config.data_convention.get_file_path = self._get_file_path_grib
        self.data_manager.config.experiment.chunk_size = 2
        self.data_manager.get_file_path = self._get_file_path_grib
        self._create_grib_files('ICM??expid+19900{}.nc', 1)
        self._create_grib_files('ICM??expid+19900{}.nc', 2)
        self._test_atmos_cmor()

        variables = {
            'CP': 143,
            'EWSS': 180,
            'LSP': 142,
            'MN2T': 202,
            'MX2T': 201,
            'SSRD': 169,
            'TP': 228,
            'Z': 129
        }

        for var, code in six.iteritems(variables):
            self.assertTrue(os.path.isdir(os.path.join(self.tmp_dir, var)))
            base_data = np.ones((2, 2), np.float) * code
            if var in ('EWSS', 'TP', 'MN2T', 'MX2T', 'SSRD'):
                if var == 'MX2T':
                    month_offsets = np.array((16, 45.5))
                    daily_offsets = np.arange(1.0, 60.0)
                elif var == 'MN2T':
                    month_offsets = np.array((15.25, 44.75))
                    daily_offsets = np.arange(0.25, 59.25)
                else:
                    month_offsets = np.array((15.625, 45.125))
                    daily_offsets = np.arange(0.625, 59.625)
                hourly_offsets = np.arange(0.25, 59.25, 0.25)
            else:
                month_offsets = np.array((15.5, 44.875))
                daily_offsets = np.arange(0.375, 59.375)
                daily_offsets[0] = 0.5
                hourly_offsets = np.arange(0.25, 59, 0.25)

            factor = 1.0
            if code == 129:
                factor = 9.81
            elif code in (180, 169):
                factor = 6 * 3600.0
            elif code == 228:
                base_data = np.ones((2, 2, 2), np.float) * (142 + 143)
                month_offsets *= 2
                daily_offsets *= 2
                hourly_offsets *= 2
                factor = 6 * 3600.0 / 1000

            base_data /= factor
            month_offsets /= factor
            daily_offsets /= factor
            hourly_offsets /= factor

            monthly = iris.load_cube(os.path.join(self.tmp_dir, var, 'mon', '{}.nc'.format(var)))
            self._test_data(monthly, base_data, month_offsets, var, 'Month')

            daily = iris.load_cube(os.path.join(self.tmp_dir, var, 'day', '{}.nc'.format(var)))
            self._test_data(daily, base_data, daily_offsets, var, 'Day')

            hourly = iris.load_cube(os.path.join(self.tmp_dir, var,
                                                 '6hr', '{}.nc'.format(var)))
            self._test_data(hourly, base_data, hourly_offsets, var, 'Hour')

    def _test_data(self, data, base_data, offsets, var, freq):
        self.assertEqual(data.coord('time').shape, (len(offsets),))
        for x, offset in enumerate(offsets):
            self.assertTrue(np.allclose(data.data[x, ...], base_data + offset),
                            '{} {} data wrong for {}: {}'.format(freq, x, var,
                                                                 data.data[x, ...] - base_data))