"""Tests for earthdiagnostics.cmorizer module"""
from earthdiagnostics.cmorizer import Cmorizer
from earthdiagnostics.utils import TempFile, Utils
from earthdiagnostics.data_convention import DataConvention
from bscearth.utils import log

from unittest import TestCase
from mock import Mock, create_autospec
import os
import tempfile
import shutil
import iris
import iris.cube
from iris.coords import DimCoord
import tarfile
import numpy as np
import six


class TestCmorizer(TestCase):
    """Tests for Cmorizer class"""

    def _get_variable_and_alias(self, variable):
        mock_alias = Mock()
        mock_alias.basin = None
        mock_alias.grid = None

        mock_variable = self._get_variable(variable)

        return mock_alias, mock_variable

    def _get_variable(self, variable, silent=False):
        mock_variable = Mock()
        mock_variable.short_name = variable
        mock_variable.domain = 'domain'

        return mock_variable

    def _get_file_path(self, *args, **kwargs):
        return os.path.join(self.tmp_dir,  args[3], '{0[3]}.nc'.format(args))
    def _get_file_path_grib(self, *args, **kwargs):
        return os.path.join(self.tmp_dir,  args[3], str(args[6]), '{0[3]}.nc'.format(args))

    def setUp(self):
        """Prepare tests

        Creates a temporary directory holding a ``data`` and a ``scratch``
        folder and a mocked data manager whose configuration points to them.
        """
        self.tmp_dir = tempfile.mkdtemp()

        self.data_manager = Mock()
        self.data_manager.is_cmorized.return_value = False

        config = self.data_manager.config
        config.data_dir = os.path.join(self.tmp_dir, 'data')
        config.scratch_dir = os.path.join(self.tmp_dir, 'scratch')
        TempFile.scratch_folder = config.scratch_dir

        config.data_convention = create_autospec(DataConvention)
        config.data_convention.name = 'data_convention'
        config.data_convention.lat_name = 'lat'
        config.data_convention.lon_name = 'lon'
        config.data_convention.get_file_path = self._get_file_path

        config.var_manager.get_variable_and_alias = self._get_variable_and_alias
        config.var_manager.get_variable = self._get_variable
        self.data_manager.variable_list = config.var_manager

        experiment = config.experiment
        experiment.expid = 'expid'
        experiment.model = 'model'
        experiment.experiment_name = 'experiment_name'
        experiment.num_chunks = 1
        experiment.chunk_size = 1
        experiment.institute = 'institute'
        experiment.get_member_str.return_value = 'member'
        experiment.atmos_timestep = 6

        cmor = config.cmor
        cmor.force = False
        cmor.ocean = True
        cmor.atmosphere = True
        cmor.use_grib = True
        cmor.filter_files = ''
        cmor.associated_experiment = 'associated_experiment'
        cmor.initialization_method = 'initialization_method'
        cmor.initialization_description = 'initialization_description'
        cmor.physics_version = 'physics_version'
        cmor.physics_description = 'physics_description'
        # NOTE(review): value kept as found; it looks like a copy-paste of the
        # initialization_description value - confirm whether 'associated_model' was meant
        cmor.associated_model = 'initialization_description'
        cmor.source = 'source'
        cmor.get_requested_codes.return_value = {228, 142, 143, 201, 202, 129, 169, 180}
        cmor.get_variables.return_value = {228, 142, 143, 201, 202, 129, 169, 180}
        cmor.get_levels.return_value = None

        os.makedirs(config.data_dir)
        os.makedirs(config.scratch_dir)

    def _create_ocean_files(self, filename, tar_name, gzip=False, backup=False):
        folder_path = os.path.join(self.data_manager.config.data_dir, 'expid', 'original_files', '19900101', 'member',
                                   'outputs')
        file_path, filename = self._create_file(folder_path, filename, gzip)
        if backup:
            filename = os.path.join('backup', filename)

        tar = tarfile.TarFile(os.path.join(folder_path, tar_name), mode='w')
        tar.add(file_path, arcname=filename, recursive=False)
        tar.close()
        os.remove(file_path)

    def _create_mma_files(self, filename, tar_name, gzip=False):
        folder_path = os.path.join(self.data_manager.config.data_dir, 'expid', 'original_files', '19900101', 'member',
                                   'outputs')
        filepath_gg, filename_gg = self._create_file(folder_path, filename.replace('??', 'GG'), gzip)
        filepath_sh, filename_sh = self._create_file(folder_path, filename.replace('??', 'SH'), gzip)

        tar = tarfile.TarFile(os.path.join(folder_path, tar_name), mode='w')
        tar.add(filepath_gg, arcname=filename_gg, recursive=False)
        tar.add(filepath_sh, arcname=filename_sh, recursive=False)
        tar.close()
        os.remove(filepath_gg)
        os.remove(filepath_sh)

    def _create_file(self, folder_path, filename, gzip):
        """Save a netCDF file with two sample variables, optionally compressing it with gzip

        :param folder_path: folder to write into; created if it does not exist
        :param filename: name of the file to create
        :param gzip: if True, the file is compressed with the ``gzip`` command
        :return: tuple (file path, file name); both carry a '.gz' suffix when compressed
        :raises Exception: if the ``gzip`` command exits with a non-zero status
        """
        var1 = self._create_sample_cube('Variable 1', 'var1', threed=False, time_bounds=True)
        var2 = self._create_sample_cube('Variable 2', 'var2', threed=True, time_bounds=True)
        if not os.path.isdir(folder_path):
            os.makedirs(folder_path)
        file_path = os.path.join(folder_path, filename)
        iris.save((var1, var2), file_path, zlib=True)
        if gzip:
            import subprocess
            # stderr is captured too so that a failure message carries gzip's diagnostic
            process = subprocess.Popen(('gzip', file_path), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            output = process.communicate()
            if process.returncode != 0:
                raise Exception('Can not compress: {0}'.format(output))
            file_path = "{0}.gz".format(file_path)
            filename = "{0}.gz".format(filename)
        return file_path, filename
    def _create_sample_cube(self, long_name, var_name, threed, time_bounds):
        """Create a small iris cube filled with random data

        :param long_name: long name of the cube
        :param var_name: variable name of the cube
        :param threed: if True, a depth dimension is appended after time, lat and lon
        :param time_bounds: if True, bounds are set on the time coordinate
        :return: cube with two points along each of its dimensions
        """
        # The builtin float replaces the np.float alias, which was removed from NumPy
        coord_data = np.array([1, 2], float)
        lat = DimCoord(coord_data, standard_name='latitude', long_name='latitude', var_name='lat',
                       units='degrees_north')
        lon = DimCoord(coord_data, standard_name='longitude', long_name='longitude', var_name='lon',
                       units='degrees_east')
        time = DimCoord(coord_data, standard_name='time', long_name='time', var_name='time',
                        units='days since 1950-01-01')
        if time_bounds:
            time.bounds = np.array([[0.5, 1.5], [1.5, 2.5]], float)

        if threed:
            data = np.random.rand(2, 2, 2, 2).astype(float)
        else:
            data = np.random.rand(2, 2, 2).astype(float)

        cube = iris.cube.Cube(data, long_name=long_name, var_name=var_name)
        cube.add_dim_coord(time, 0)
        cube.add_dim_coord(lat, 1)
        cube.add_dim_coord(lon, 2)
        if threed:
            depth = DimCoord(coord_data, standard_name='depth', long_name='Depth', var_name='lev', units='m')
            cube.add_dim_coord(depth, 3)
        return cube

    def tearDown(self):
        """Clean up after tests"""
        shutil.rmtree(self.tmp_dir)

    def _test_ocean_cmor(self, success=True, error=False, critical=False, warnings=False, message='', check_vars=None):
        self._test_cmorization(success=success, error=error, critical=critical, warnings=warnings, message=message,
                               ocean=True, check_vars=check_vars)
    def _test_atmos_cmor(self, success=True, error=False, critical=False, warnings=False, message='', check_vars=None):
        self._test_cmorization(success=success, error=error, critical=critical, warnings=warnings, message=message,
                               ocean=False, check_vars=check_vars)
    def _test_cmorization(self, success=True, error=False, critical=False, warnings=False, message='', ocean=True,
                          check_vars=None):
        self._check_logs(critical, error, message, ocean, success, warnings)
        if check_vars:
            for variable, status in six.iteritems(check_vars):
                if status:
                    self.assertTrue(os.path.isfile(os.path.join(self.tmp_dir, variable, '{}.nc'.format(variable))))
                else:
                    self.assertFalse(os.path.isfile(os.path.join(self.tmp_dir, variable, '{}.nc'.format(variable))))

    def _check_logs(self, critical, error, message, ocean, success, warnings):
        """Run the cmorizer and check what it logged

        The log is only inspected under Python 3; otherwise the cmorization is
        just run.

        :param critical: whether records at ``log.Log.CRITICAL`` level are expected
        :param error: whether records at ``log.Log.ERROR`` level are expected
        :param message: if not empty, only check that some record carries exactly
            this message; the level expectations are then ignored
        :param ocean: run the ocean cmorization if True, the atmospheric one otherwise
        :param success: whether records at ``log.Log.RESULT`` level are expected
        :param warnings: whether records at ``log.Log.WARNING`` level are expected
        """
        def run_cmorization():
            cmorizer = Cmorizer(self.data_manager, '19900101', 0)
            if ocean:
                cmorizer.cmorize_ocean()
            else:
                cmorizer.cmorize_atmos()

        if not six.PY3:
            # assertLogs is only used under Python 3
            run_cmorization()
            return

        with self.assertLogs(log.Log.log) as cmd:
            run_cmorization()

        if message:
            self.assertTrue([record for record in cmd.records if record.message == message],
                            'No log record with message: {0}'.format(message))
            return

        expected_levels = {log.Log.RESULT: success, log.Log.ERROR: error,
                           log.Log.CRITICAL: critical, log.Log.WARNING: warnings}
        for level, value in six.iteritems(expected_levels):
            matching = [record for record in cmd.records if record.levelno == level]
            if value:
                self.assertTrue(matching, 'Expected log records with level {0}'.format(level))
            else:
                self.assertFalse(matching, 'Unexpected log records with level {0}'.format(level))

    def test_skip_ocean_cmorization(self):
        """Test ocean cmorization flag disabled option"""
        self.data_manager.config.cmor.ocean = False
        self._test_ocean_cmor(message='Skipping ocean cmorization due to configuration')

    def test_skip_atmos_cmorization(self):
        """Test atmos cmorization flag disabled option"""
        self.data_manager.config.cmor.atmosphere = False
        if six.PY3:
            with self.assertLogs(log.Log.log) as cmd:
                cmorizer = Cmorizer(self.data_manager, '19900101', 0)
                cmorizer.cmorize_atmos()
            self.assertTrue([record for record in cmd.records if
                             record.message == 'Skipping atmosphere cmorization due to configuration'])
        else:
            cmorizer = Cmorizer(self.data_manager, '19900101', 0)
            cmorizer.cmorize_ocean()

    def test_skip_when_cmorized(self):
        """Test cmorization skipped if already done"""
        self._create_ocean_files('expid_1d_19900101_19900131.nc', 'MMO_19900101-19900131.tar')
        self.data_manager.is_cmorized.return_value = True
        self._test_ocean_cmor(message='No need to unpack file 1/1')
    def test_skip_when_not_requested(self):
        """Test cmorization skipped if chunk is not requested"""
        self._create_ocean_files('expid_1d_19900101_19900131.nc', 'MMO_19900101-19900131.tar')
        self.data_manager.config.cmor.chunk_cmorization_requested.return_value = False
        self._test_ocean_cmor(message='No need to unpack file 1/1')
    def test_force(self):
        """Test cmorization force works"""
        self._create_ocean_files('expid_1d_19900101_19900131.nc', 'MMO_19900101-19900131.tar')
        self.data_manager.is_cmorized.return_value = True
        self.data_manager.config.cmor.force = True
        self._test_ocean_cmor()
    def test_ocean_cmorization_no_files(self):
        """Test ocean cmorization report error if no input data"""
        self._test_ocean_cmor(success=False, error=True)
    def test_ocean_cmorization_not_vars_requested(self):
        """Test ocean cmorization report success if no vars qhere requested"""
        self._create_ocean_files('expid_1d_19900101_19900131.nc', 'MMO_19900101-19900131.tar')
        self.data_manager.config.cmor.any_required.return_value = False
        self._test_ocean_cmor(check_vars={'var1': False, 'var2': False})

    def test_ocean_cmorization_no_vars_recognized(self):
        """Test ocean cmorization report success if no vars where recognized"""
        self._create_ocean_files('expid_1d_19900101_19900131.nc', 'MMO_19900101-19900131.tar')

        def not_recognized(*args):
            return None, None
        self.data_manager.config.var_manager.get_variable_and_alias = not_recognized
        self._test_ocean_cmor(check_vars={'var1': False, 'var2': False})

    def test_ocean_cmorization_var2_not_requested(self):
        """Test ocean cmorization with var2 not recognized"""
        self._create_ocean_files('expid_1d_19900101_19900131.nc', 'MMO_19900101-19900131.tar')

        def _reject_var2(cmor_var):
            return cmor_var.short_name != 'var2'

        self.data_manager.config.cmor.cmorize = _reject_var2
        self._test_ocean_cmor(check_vars={'var1': True, 'var2': False})
    def test_ocean_cmorization(self):
        """Test basic ocean cmorization"""
        self._create_ocean_files('expid_1d_19900101_19900131.nc', 'MMO_19900101-19900131.tar')
        self._test_ocean_cmor(check_vars={'var1': True, 'var2': True})
    def test_ocean_cmorization_with_filter(self):
        """Test ocean cmorization filtering files"""
        self._create_ocean_files('expid_1d_19900101_19900131.nc', 'MMO_19900101-19900131.tar')
        self.data_manager.config.cmor.filter_files = 'expid'
        self._test_ocean_cmor(check_vars={'var1': True, 'var2': True})

    def test_ocean_cmorization_with_bad_filter(self):
        """Test ocean cmorization fails if a bad filter is added"""
        self._create_ocean_files('expid_1d_19900101_19900131.nc', 'MMO_19900101-19900131.tar')
        self.data_manager.config.cmor.filter_files = 'badfilter'
        self._test_ocean_cmor(warnings=True, check_vars={'var1': False, 'var2': False})
    def test_ocean_cmorization_gzip(self):
        """Test ocean cmorization if tars are also zipped"""
        self._create_ocean_files('expid_1d_19900101_19900131.nc', 'MMO_19900101-19900131.tar', gzip=True)
        self._test_ocean_cmor(check_vars={'var1': True, 'var2': True})
    def test_ocean_cmorization_backup(self):
        """Test ocean cmorization when files are in backup path"""
        self._create_ocean_files('expid_1d_19900101_19900131.nc', 'MMO_19900101-19900131.tar', backup=True)
        self._test_ocean_cmor(check_vars={'var1': True, 'var2': True})
    def test_ocean_cmorization_PPO(self):
        """Test ocean cmorization when files are PPO"""
        self._create_ocean_files('expid_1d_19900101_19900131.nc', 'PPO_expid_1D_xx_19900101_19900131.tar')
        self._test_ocean_cmor(check_vars={'var1': True, 'var2': True})

    def test_ocean_cmorization_diags(self):
        """Test ocean cmorization when files are diags"""
        self._create_ocean_files('expid_1d_19900101_19900131.nc', 'diags_expid_1D_xx_19900101_19900131.tar')
        self._test_ocean_cmor(check_vars={'var1': True, 'var2': True})

    def test_atmos_cmorization(self):
        """Test basic atmos cmorization from nc"""
        self._create_mma_files('MMA_1d_??_19900101_19900131.nc', 'MMA_expid_19901101_fc0_19900101-19900131.tar')
        self._test_atmos_cmor(check_vars={'var1': True, 'var2': True})
    def test_skip_when_not_requested_mma(self):
        """Test atmos cmorization is skipped if chunk is not requested"""
        self._create_mma_files('MMA_1d_??_19900101_19900131.nc', 'MMA_expid_19901101_fc0_19900101-19900131.tar')
        self.data_manager.config.cmor.chunk_cmorization_requested.return_value = False
        self._test_atmos_cmor(message='No need to unpack file 1/1')

    def test_force_mma(self):
        """Test force atmos cmorization"""
        self._create_mma_files('MMA_1d_??_19900101_19900131.nc', 'MMA_expid_19901101_fc0_19900101-19900131.tar')
        self.data_manager.is_cmorized.return_value = True
        self.data_manager.config.cmor.force = True
        self._test_atmos_cmor()

    def test_atmos_cmorization_no_mma_files(self):
        """Test atmos cmorization report error if there are no files"""
        self._test_atmos_cmor(success=False, error=True)
    def _create_grib_files(self, filename, month):
        filename = filename.format(month)
        coord_data = np.array([0, 1], np.float)
        folder_path = os.path.join(self.data_manager.config.data_dir, 'expid', 'original_files', '19900101', 'member',
                                   'outputs')
        self._create_file_for_grib(coord_data, folder_path, filename.replace('??', 'GG'), [142, 143, 129, 169, 180],
                                   month)
        self._create_file_for_grib(coord_data, folder_path, filename.replace('??', 'SH'), [201, 202],
                                   month)
    def _create_file_for_grib(self, coord_data, folder_path, filename, codes, month):
        """Create a GRIB file holding one six-hourly variable per requested code

        A netCDF file is saved with iris, converted with ``cdo settaxis`` to a
        '.grb' file next to it and then removed.

        :param coord_data: values used for both the latitude and the longitude coordinate
        :param folder_path: folder to write into; created if it does not exist
        :param filename: name of the intermediate netCDF file
        :param codes: codes to generate; each value is the code plus the time of the step in days
        :param month: month of 1990 to generate, counted from 1
        """
        # Imported here because the module-level imports do not provide it
        import calendar

        lat = DimCoord(coord_data, standard_name='latitude', long_name='latitude', var_name='lat',
                       units='degrees_north')
        lon = DimCoord(coord_data, standard_name='longitude', long_name='longitude', var_name='lon',
                       units='degrees_east')
        month_days = calendar.monthrange(1990, month)[1]
        # Days of 1990 elapsed before this month; equals the former ``(month - 1) * 31``
        # for January and February and stays correct for later months
        days_before = sum(calendar.monthrange(1990, previous)[1] for previous in range(1, month))
        time_data = np.arange(0.25, month_days + 0.25, 0.25, float) + days_before
        time = DimCoord(time_data, standard_name='time', long_name='time', var_name='time',
                        units='days since 1990-01-01 00:00:00')
        variables = []
        for code in codes:
            # Constant field equal to the code, shifted at every step by the time value
            values = np.ones((month_days * 4, 2, 2), float) * code + time_data[:, np.newaxis, np.newaxis]
            var = iris.cube.Cube(values,
                                 long_name='Variable {}'.format(code),
                                 var_name='var{}'.format(code))
            var.add_dim_coord(time, 0)
            var.add_dim_coord(lat, 1)
            var.add_dim_coord(lon, 2)
            var.attributes['table'] = np.int32(128)
            var.attributes['code'] = np.int32(code)
            variables.append(var)
        if not os.path.isdir(folder_path):
            os.makedirs(folder_path)
        file_path = os.path.join(folder_path, filename)
        iris.save(variables, file_path, zlib=True, local_keys=('table', 'code'))
        # Zero-padded month so that months 10 to 12 also give a valid date
        Utils.cdo.settaxis('1990-{:02d}-01,06:00,6hour'.format(month),
                           input=file_path,
                           output=file_path.replace('.nc', '.grb'),
                           options='-f grb2')
        os.remove(file_path)

    def test_grib_cmorization(self):
        """Test atmos cmorization from grib

        GRIB files are generated for January and February 1990 and cmorized as
        one chunk of size 2. For every expected variable the monthly, daily and
        six-hourly outputs are compared with the generated values: the GRIB code
        plus a time-dependent offset, divided by a per-code factor.
        """
        self.data_manager.config.data_convention.get_file_path = self._get_file_path_grib
        self.data_manager.config.experiment.chunk_size = 2
        self.data_manager.get_file_path = self._get_file_path_grib
        self._create_grib_files('ICM??expid+19900{}.nc', 1)
        self._create_grib_files('ICM??expid+19900{}.nc', 2)
        self._test_atmos_cmor()
        variables = {
            'CP': 143,
            'EWSS': 180,
            'LSP': 142,
            'MN2T': 202,
            'MX2T': 201,
            'SSRD': 169,
            'TP': 228,
            'Z': 129
        }
        for var, code in six.iteritems(variables):
            self.assertTrue(os.path.isdir(os.path.join(self.tmp_dir, var)),
                            'Output folder missing for {}'.format(var))
            base_data = np.ones((2, 2), float) * code

            if var in ('EWSS', 'TP', 'MN2T', 'MX2T', 'SSRD'):
                if var == 'MX2T':
                    month_offsets = np.array((16, 45.5))
                    daily_offsets = np.arange(1.0, 60.0)
                elif var == 'MN2T':
                    month_offsets = np.array((15.25, 44.75))
                    daily_offsets = np.arange(0.25, 59.25)
                else:
                    month_offsets = np.array((15.625, 45.125))
                    daily_offsets = np.arange(0.625, 59.625)

                hourly_offsets = np.arange(0.25, 59.25, 0.25)
            else:
                month_offsets = np.array((15.5, 44.875))
                daily_offsets = np.arange(0.375, 59.375)
                daily_offsets[0] = 0.5
                hourly_offsets = np.arange(0.25, 59, 0.25)

            if code == 129:
                factor = 9.81
            elif code in (180, 169):
                factor = 6 * 3600.0
            elif code == 228:
                # 228 is expected as the sum of codes 142 and 143, so base and offsets double
                base_data = np.ones((2, 2), float) * (142 + 143)
                month_offsets *= 2
                daily_offsets *= 2
                hourly_offsets *= 2
                factor = 6 * 3600.0 / 1000
            else:
                # No conversion for the remaining codes; without this branch the
                # factor was undefined or left over from the previous variable
                factor = 1.0
            base_data /= factor
            month_offsets /= factor
            daily_offsets /= factor
            hourly_offsets /= factor

            monthly = iris.load_cube(os.path.join(self.tmp_dir, var, 'mon', '{}.nc'.format(var)))
            self._test_data(monthly, base_data, month_offsets, var, 'Month')

            daily = iris.load_cube(os.path.join(self.tmp_dir, var, 'day', '{}.nc'.format(var)))
            self._test_data(daily, base_data, daily_offsets, var, 'Day')

            hourly = iris.load_cube(os.path.join(self.tmp_dir, var, '6hr', '{}.nc'.format(var)))
            self._test_data(hourly, base_data, hourly_offsets, var, 'Hour')

    def _test_data(self, data, base_data, offsets, var, freq):
        self.assertEqual(data.coord('time').shape, (len(offsets),))
        for x, offset in enumerate(offsets):
            self.assertTrue(np.allclose(data.data[x, ...], base_data + offset),
                            '{} {} data wrong for {}: {}'.format(freq, x, var, data.data[x, ...] - base_data))