diff --git a/VERSION b/VERSION
index 944880fa15e85084780c290b929924d3f8b6085f..15a279981720791464e46ab21ae96b3c1c65c3b6 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.2.0
+3.3.0
diff --git a/earthdiagnostics/datafile.py b/earthdiagnostics/datafile.py
index 98052c69909226b338b2e91ef9db54a722612954..f964024057b9a336678036e0c998fdbd4e4233b7 100644
--- a/earthdiagnostics/datafile.py
+++ b/earthdiagnostics/datafile.py
@@ -2,16 +2,14 @@
 """Module for classes to manage storage manipulation"""
 import csv
 import os
-import shutil
 from datetime import datetime
 
-import six
 import iris
+import iris.cube
 import iris.coords
 import iris.exceptions
-import numpy as np
+import iris.experimental.equalise_cubes
 from bscearth.utils.log import Log
-import netCDF4
 
 from earthdiagnostics.modelingrealm import ModelingRealms
 from earthdiagnostics.utils import Utils, TempFile
@@ -92,7 +90,8 @@ class DataFile(Publisher):
         if self.local_status != LocalStatus.READY or self.suscribers or self.upload_required() or \
                 self.storage_status == StorageStatus.UPLOADING:
             return
-        Log.debug('File {0} no longer needed. Deleting from scratch...'.format(self.remote_file))
+        Log.debug('File {0} no longer needed. Deleting from scratch...'.format(
+            self.remote_file))
         os.remove(self.local_file)
         Log.debug('File {0} deleted from scratch'.format(self.remote_file))
         self.local_file = None
@@ -252,17 +251,16 @@ class DataFile(Publisher):
         else:
             original_name = self.var
         if self.final_name != original_name:
-            Utils.rename_variable(self.local_file, original_name, self.final_name)
+            Utils.rename_variable(
+                self.local_file, original_name, self.final_name)
         self._rename_coordinate_variables()
         self._correct_metadata()
         self._prepare_region()
         self.add_diagnostic_history()
         Utils.convert2netcdf4(self.local_file)
-        if self.region is not None:
-            self.upload()
 
     def upload(self):
-        """Send a loal file to the storage"""
+        """Send a local file to the storage"""
         self.storage_status = StorageStatus.UPLOADING
         remote_file = self.remote_file
         try:
@@ -278,7 +276,7 @@ class DataFile(Publisher):
         # self.create_link()
         self.storage_status = StorageStatus.READY
 
-    def set_local_file(self, local_file, diagnostic=None, rename_var='', region=None):
+    def set_local_file(self, local_file, diagnostic=None, rename_var=''):
         """
         Set the local file generated by EarthDiagnostics
 
@@ -289,19 +287,15 @@ class DataFile(Publisher):
         local_file: str
         diagnostic: Diagnostic or None
         rename_var: str
-        region: Basin or None
 
         Returns
         -------
         None
 
         """
+        self.storage_status = StorageStatus.PENDING
         if diagnostic in self._modifiers:
             self._modifiers.remove(diagnostic)
-        if region is not None:
-            self.region = region
-        else:
-            self.region = None
         self.local_file = local_file
         self.prepare_to_upload(rename_var)
         self.local_status = LocalStatus.READY
@@ -313,9 +307,15 @@ class DataFile(Publisher):
     def _correct_metadata(self):
         handler = Utils.open_cdf(self.local_file)
         var_handler = handler.variables[self.final_name]
-        coords = set.intersection({'time', 'lev', self.lat_name, self.lon_name, 'leadtime', 'region', 'time_centered'},
-                                  set(handler.variables.keys()))
-        var_handler.coordinates = Utils.convert_to_ascii_if_possible(' '.join(coords))
+        coords = set.intersection(
+            {
+                'time', 'lev', self.lat_name, self.lon_name,
+                'leadtime', 'region', 'time_centered'
+            },
+            set(handler.variables.keys())
+        )
+        var_handler.coordinates = Utils.convert_to_ascii_if_possible(
+            ' '.join(coords))
         if 'time_centered' in handler.variables:
             if hasattr(handler.variables['time_centered'], 'standard_name'):
                 del handler.variables['time_centered'].standard_name
@@ -400,99 +400,46 @@ class DataFile(Publisher):
             var_handler[:] = var_handler[:] * factor + offset
             if 'valid_min' in var_handler.ncattrs():
-                var_handler.valid_min = float(var_handler.valid_min) * factor + offset
+                var_handler.valid_min = float(
+                    var_handler.valid_min) * factor + offset
 
             if 'valid_max' in var_handler.ncattrs():
-                var_handler.valid_max = float(var_handler.valid_max) * factor + offset
+                var_handler.valid_max = float(
+                    var_handler.valid_max) * factor + offset
 
     def _prepare_region(self):
-        if not self.region:
+        if not self.check_is_in_storage(update_status=False):
             return
-        self._add_region_dimension_to_var()
-        if os.path.exists(self.remote_file):
-            self._update_var_with_region_data()
-        self._correct_metadata()
-        Utils.nco().ncks(input=self.local_file, output=self.local_file, options=['--fix_rec_dmn region'])
-        handler = Utils.open_cdf(self.local_file)
-        regions = handler.variables['region'][...].tolist()
-        if len(regions) > 1:
-            ordered_regions = sorted(regions)
-            new_indexes = [regions.index(region) for region in ordered_regions]
-
-            for var in handler.variables.values():
-                if 'region' not in var.dimensions:
-                    continue
-                index_region = var.dimensions.index('region')
-                var_values = var[...]
-                var_ordered = np.take(var_values, new_indexes, index_region)
-                var[...] = var_ordered
-        handler.close()
-
-    def _update_var_with_region_data(self):
-        temp = TempFile.get()
-        shutil.copyfile(self.remote_file, temp)
-        handler = Utils.open_cdf(temp)
-        var_handler = handler.variables[self.final_name]
-        var_type = var_handler.dtype
-        handler.close()
-        self._fix_values_metadata(var_type, temp)
-
-        Utils.nco().ncks(input=temp, output=temp, options=['--mk_rec_dmn region'])
-        cubes = iris.load(self.local_file)
-        for cube in cubes:
-            if self.final_name == cube.var_name:
-                value = cube
-                break
-        for index_region, region in enumerate(value.coord('region').points):
-            handler = Utils.open_cdf(temp)
-            region_slice = value.data[...]
-            original_regions = handler.variables['region'][...]
-            str_regions = []
-            for x in range(original_regions.shape[0]):
-                str_regions.append(''.join(
-                    [str(value) for value in original_regions[x, ...]
-                     if value != '-']
-                ))
-            Log.debug(str(str_regions))
-            var = handler.variables[self.final_name]
-            if region in str_regions:
-                region_index = str_regions.index(region)
-            else:
-                region_index = original_regions.shape[0]
-                handler.variables['region'][region_index, ...] = netCDF4.stringtoarr(region, 50)
-            indices = list()
-            for dim in var.dimensions:
-                if dim == 'region':
-                    indices.append(region_index)
-                else:
-                    indices.append(slice(None))
-            var[indices] = region_slice
-            handler.close()
-
-        # handler.close()
-        Utils.move_file(temp, self.local_file)
-
-    def _add_region_dimension_to_var(self):
-        handler = Utils.open_cdf(self.local_file)
-        if 'region' in handler.variables:
-            handler.close()
+        cube = iris.load_cube(self.local_file)
+        try:
+            cube.coord('region')
+        except iris.exceptions.CoordinateNotFoundError:
             return
-        handler.createDimension('region')
-        handler.createDimension('region_length', 50)
-        var_region = handler.createVariable('region', 'S1', ('region', 'region_length'))
-        var_region[0, ...] = netCDF4.stringtoarr(self.region.name, 50)
-        original_var = handler.variables[self.final_name]
-        new_var = handler.createVariable(
-            'new_var',
-            original_var.datatype,
-            original_var.dimensions + ('region',),
+        try:
+            old_cube = iris.load_cube(self.remote_file)
+        except Exception:
+            old_cube = iris.load_cube(
+                self.remote_file.replace('/cmorfiles/', '/diags/')
+            )
+        new_data = {}
+        for region_slice in cube.slices_over('region'):
+            Log.debug(region_slice.coord('region').points[0])
+            new_data[region_slice.coord('region').points[0]] = region_slice
+        for region_slice in old_cube.slices_over('region'):
+            region = region_slice.coord('region').points[0]
+            Log.debug(region)
+            if region not in new_data:
+                new_data[region] = region_slice
+        cube_list = iris.cube.CubeList(
+            [new_data[region] for region in sorted(new_data)]
         )
-        new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()})
-        new_var.coordinates = new_var.coordinates + ' region'
-        value = original_var[:]
-        new_var[..., 0] = value
-        handler.close()
-        Utils.nco().ncks(input=self.local_file, output=self.local_file, options=('-x -v {0}'.format(self.final_name),))
-        Utils.rename_variable(self.local_file, 'new_var', self.final_name)
+        if len(cube_list) == 1:
+            return
+        iris.experimental.equalise_cubes.equalise_attributes(cube_list)
+        final_cube = cube_list.merge_cube()
+        temp = TempFile.get()
+        iris.save(final_cube, temp, zlib=True)
+        Utils.move_file(temp, self.local_file)
+        self._correct_metadata()
 
     def _rename_coordinate_variables(self):
         variables = dict()
@@ -518,7 +465,8 @@ class DataFile(Publisher):
     def add_cmorization_history(self):
        """Add the history line corresponding to the cmorization to the local file"""
         from earthdiagnostics.earthdiags import EarthDiags
-        history_line = 'CMORized with Earthdiagnostics version {0}'.format(EarthDiags.version)
+        history_line = 'CMORized with Earthdiagnostics version {0}'.format(
+            EarthDiags.version)
         self._add_history_line(history_line)
 
     def _add_history_line(self, history_line):
@@ -564,7 +512,8 @@ class UnitConversion(object):
             for line in reader:
                 if line[0] == 'original':
                     continue
-                cls.add_conversion(UnitConversion(line[0], line[1], line[2], line[3]))
+                cls.add_conversion(UnitConversion(
+                    line[0], line[1], line[2], line[3]))
 
     @classmethod
     def add_conversion(cls, conversion):
@@ -574,7 +523,8 @@ class UnitConversion(object):
         :param conversion: conversion to add
         :type conversion: UnitConversion
         """
-        cls._dict_conversions[(conversion.source, conversion.destiny)] = conversion
+        cls._dict_conversions[(
+            conversion.source, conversion.destiny)] = conversion
 
     @classmethod
     def get_conversion_factor_offset(cls, input_units, output_units):
@@ -677,36 +627,17 @@ class NetCDFFile(DataFile):
                 Log.error('File {0} not available: {1}', path, ex)
                 self.local_status = LocalStatus.FAILED
                 return
-        Log.error('File {0} not available: {1}', self.remote_file, 'FileNotFound')
+        Log.error('File {0} not available: {1}',
+                  self.remote_file, 'FileNotFound')
         self.local_status = LocalStatus.FAILED
 
-    def check_is_in_storage(self):
+    def check_is_in_storage(self, update_status=True):
         for path in (self.remote_file, self.remote_file.replace('/cmorfiles/', '/diags/')):
             if os.path.isfile(path):
-                if self.region:
-                    try:
-                        cubes = iris.load(path)
-                        self._check_regions(cubes)
-                    except iris.exceptions.TranslationError as ex:
-                        # If the check goes wrong, we must execute everything
-                        os.remove(path)
-                    except Exception as ex:
-                        Log.debug('Exception when checking file {0}: {1}', path, ex)
-                else:
+                if update_status:
                     self.storage_status = StorageStatus.READY
-                return
-
-    def _check_regions(self, cubes):
-        for cube in cubes:
-            try:
-                if isinstance(self.region, six.string_types):
-                    regions = {self.region.name}
-                else:
-                    regions = {basin.name for basin in self.region}
-                if regions.issubset(set(cube.coord('region').points)):
-                    self.storage_status = StorageStatus.READY
-            except iris.exceptions.CoordinateNotFoundError:
-                pass
+                return True
+        return False
 
     def create_link(self):
         """Create a link from the original data in the _ folder"""
@@ -714,7 +645,8 @@ class NetCDFFile(DataFile):
             self.data_convention.create_link(self.domain, self.remote_file, self.frequency, self.final_name,
                                              self.grid, True, self.var_type)
         except (ValueError, Exception) as ex:
-            Log.error('Can not create link to {1}: {0}'.format(ex, self.remote_file))
+            Log.error('Can not create link to {1}: {0}'.format(
+                ex, self.remote_file))
 
     def _get_size(self):
         try:
diff --git a/earthdiagnostics/ocean/psi.py b/earthdiagnostics/ocean/psi.py
index d415d831e8ede402a89c745622abbe090a26c2fb..e13a20bebadbb0080ae50d672fdc4fe82bee8c08 100644
--- a/earthdiagnostics/ocean/psi.py
+++ b/earthdiagnostics/ocean/psi.py
@@ -67,9 +67,6 @@ class Psi(Diagnostic):
     def __hash__(self):
         return hash(str(self))
 
-    def __hash__(self):
-        return hash(str(self))
-
     @classmethod
     def generate_jobs(cls, diags, options):
         """
@@ -81,8 +78,7 @@ class Psi(Diagnostic):
         :type options: list[str]
         :return:
         """
-        options_available = (DiagnosticBasinListOption('basins',
-                                                       [Basins().Global]),)
+        options_available = (DiagnosticBasinListOption('basins', 'global'),)
         options = cls.process_options(options, options_available)
 
         basins = options['basins']
@@ -120,8 +116,8 @@ class Psi(Diagnostic):
         uo_cube = iris.load_cube(self.uo.local_file)
         vo_cube = iris.load_cube(self.vo.local_file)
 
-        uo = np.ma.filled(uo_cube.data, 0.0).astype(np.float32)
-        vo = np.ma.filled(vo_cube.data, 0.0).astype(np.float32)
+        uo = np.ma.filled(uo_cube.data, 0.0).astype(np.float32)
+        vo = np.ma.filled(vo_cube.data, 0.0).astype(np.float32)
 
         mesh = Nemo('mesh_hgr.nc', 'mask_regions.nc')
         e2u = mesh.get_j_length(cell_point='U')
@@ -151,8 +147,8 @@ class Psi(Diagnostic):
         handler_temp.createDimension('region_length', 50)
         var_region = handler_temp.createVariable('region', 'S1',
                                                  ('region', 'region_length'))
-        var = handler_temp.createVariable('vsftbarot', float,
-                                          ('time', 'j', 'i', 'region'))
+        var = handler_temp.createVariable(
+            'vsftbarot', float, ('time', 'j', 'i', 'region'))
         var.units = 'm3/s'
         var.coordinates = ' '.join((lat_name, lon_name))
         var.missing_value = 1e20
@@ -163,7 +159,7 @@ class Psi(Diagnostic):
 
         for i, basin in enumerate(result):
             var_region[i, ...] = netCDF4.stringtoarr(str(basin), 50)
-            result[basin].mask = self.masks[basin]<1
+            result[basin].mask = self.masks[basin] < 1
             var[..., i] = result[basin]
         handler_temp.close()
         self.psi.set_local_file(temp, diagnostic=self)
@@ -177,8 +173,3 @@ class Psi(Diagnostic):
             handler.variables[var_name].coordinates = \
                 ' '.join(set(coordinates) | add_coordinates)
         handler.close()
-
-
-
-
-
diff --git a/earthdiagnostics/ocean/regionmean.py b/earthdiagnostics/ocean/regionmean.py
index 50da4315c38118c9aa87e73ba282ac217c334e98..b05c1e1866514430869dfc65d40ce3fd9a37ac4c 100644
--- a/earthdiagnostics/ocean/regionmean.py
+++ b/earthdiagnostics/ocean/regionmean.py
@@ -185,7 +185,7 @@ class RegionMean(Diagnostic):
             )
             e3 = e3.extract(depth_constraint)
             data = data.extract(depth_constraint)
-            volcello = areacello*e3.data.astype(np.float32)
+            volcello = areacello * e3.data.astype(np.float32)
             mean = regmean.compute_regmean_3D(data.data, masks, volcello)
             self._save_result_2D('mean', mean, data)
             if self.save3d:
diff --git a/earthdiagnostics/ocean/siasiesiv.py b/earthdiagnostics/ocean/siasiesiv.py
index dd13da4eef922e1c3834015c55ce16f747a79efa..0b81ff6c254c2f86af5fa9cefe518e021e85f64c 100644
--- a/earthdiagnostics/ocean/siasiesiv.py
+++ b/earthdiagnostics/ocean/siasiesiv.py
@@ -210,7 +210,7 @@ class Siasiesiv(Diagnostic):
         else:
             var_res.units = 'm^2'
         for i, basin in enumerate(self.masks):
-            if not np.all(res[i, ...]==0):
+            if not np.all(res[i, ...] == 0):
                 var_region[i, ...] = netCDF4.stringtoarr(str(basin), 50)
                 var_res[..., i] = res[i, ...]
         handler_temp.close()
diff --git a/src/mixdiags b/src/mixdiags
deleted file mode 160000
index 199979700e38d3918a82bd2052855d46375e48ab..0000000000000000000000000000000000000000
--- a/src/mixdiags
+++ /dev/null
@@ -1 +0,0 @@
-Subproject commit 199979700e38d3918a82bd2052855d46375e48ab
diff --git a/test/integration/test_cmorizer.py b/test/integration/test_cmorizer.py
index a9eb3130ac8f31caa894688c227a2cce1dfb8b4b..b6b35e4a6ec4c95911f7e09a385cf58f025e2824 100644
--- a/test/integration/test_cmorizer.py
+++ b/test/integration/test_cmorizer.py
@@ -287,7 +287,7 @@ class TestCmorizer(TestCase):
         """Test ocean cmorization fails if a bad filter is added"""
         self._create_ocean_files('expid_1d_19900101_19900131.nc', 'MMO_19900101-19900131.tar')
         self.data_manager.config.cmor.filter_files = ['badfilter']
-        self._test_ocean_cmor(error==True, check_vars={'var1': False, 'var2': False})
+        self._test_ocean_cmor(success=False, error=True, check_vars={'var1': False, 'var2': False})
 
     def test_ocean_cmorization_gzip(self):
         """Test ocean cmorization if tars are also zipped"""
diff --git a/test/unit/ocean/test_heatcontentlayer.py b/test/unit/ocean/test_heatcontentlayer.py
index c46d657597b74be569dd883590a9d1af3d8eaa0b..6000a732539b79510e3888b53a70867e1f76957b 100644
--- a/test/unit/ocean/test_heatcontentlayer.py
+++ b/test/unit/ocean/test_heatcontentlayer.py
@@ -12,7 +12,9 @@ class TestHeatContentLayer(TestCase):
         self.diags = Mock()
         self.diags.model_version = 'model_version'
 
-        self.diags.config.experiment.get_chunk_list.return_value = (('20010101', 0, 0), ('20010101', 0, 1))
+        self.diags.config.experiment.get_chunk_list.return_value = (
+            ('20010101', 0, 0), ('20010101', 0, 1)
+        )
 
         self.weight = Mock()
 
@@ -21,5 +23,10 @@ class TestHeatContentLayer(TestCase):
         self.box.max_depth = 100
 
     def test_str(self):
-        diag = HeatContentLayer(self.data_manager, '20000101', 1, 1, self.box, self.weight, 0, 10, None, Mock(), 0, 0)
-        self.assertEqual(str(diag), 'Heat content layer Startdate: 20000101 Member: 1 Chunk: 1 Box: 0-100m')
+        diag = HeatContentLayer(
+            self.data_manager, '20000101', 1, 1, self.box, self.weight, 0, 10, None, Mock(), 0, 0
+        )
+        self.assertEqual(
+            str(diag),
+            'Heat content layer Startdate: 20000101 Member: 1 Chunk: 1 Box: 0-100m'
+        )
diff --git a/test/unit/ocean/test_psi.py b/test/unit/ocean/test_psi.py
index 1bdf597ceeb5a8be9161d85e96487e85a48badf6..e80ffae41400868ef2953c59cb761d80b2e3dae7 100644
--- a/test/unit/ocean/test_psi.py
+++ b/test/unit/ocean/test_psi.py
@@ -1,7 +1,10 @@
 # coding=utf-8
 from unittest import TestCase
+from mock import Mock, patch
+
+from earthdiagnostics.constants import Basins
 from earthdiagnostics.ocean.psi import Psi
-from mock import Mock
+from earthdiagnostics.utils import Utils
 
 
 class TestPsi(TestCase):
@@ -10,16 +13,26 @@ class TestPsi(TestCase):
         self.data_manager = Mock()
         self.diags = Mock()
         self.diags.config.experiment.get_chunk_list.return_value = (('20010101', 0, 0), ('20010101', 0, 1))
-        self.psi = Psi(self.data_manager, '20000101', 1, 1)
 
+    @staticmethod
+    def fake_get(basin):
+        return None
+
+    @patch.object(Utils, 'get_mask', fake_get)
     def test_generate_jobs(self):
         jobs = Psi.generate_jobs(self.diags, ['diagnostic'])
         self.assertEqual(len(jobs), 2)
-        self.assertEqual(jobs[0], Psi(self.data_manager, '20010101', 0, 0))
-        self.assertEqual(jobs[1], Psi(self.data_manager, '20010101', 0, 1))
+        self.assertEqual(jobs[0], Psi(self.data_manager, '20010101', 0, 0, {Basins().Global: None}))
+        self.assertEqual(jobs[1], Psi(self.data_manager, '20010101', 0, 1, {Basins().Global: None}))
+
+        jobs = Psi.generate_jobs(self.diags, ['diagnostic', 'atl'])
+        self.assertEqual(len(jobs), 2)
+        self.assertEqual(jobs[0], Psi(self.data_manager, '20010101', 0, 0, {Basins().Atlantic: None}))
+        self.assertEqual(jobs[1], Psi(self.data_manager, '20010101', 0, 1, {Basins().Atlantic: None}))
 
         with self.assertRaises(Exception):
-            Psi.generate_jobs(self.diags, ['diagnostic', 'badoption'])
+            Psi.generate_jobs(self.diags, ['diagnostic', 'atl', 'badoption'])
 
     def test_str(self):
-        self.assertEqual(str(self.psi), 'PSI Startdate: 20000101 Member: 1 Chunk: 1')
+        psi = Psi(self.data_manager, '20000101', 1, 1, {Basins().Global: None})
+        self.assertEqual(str(psi), 'PSI Startdate: 20000101 Member: 1 Chunk: 1 Basins: Global')
diff --git a/test/unit/ocean/test_region_mean.py b/test/unit/ocean/test_region_mean.py
index eba2e84be98c22a9983c8708dac3f978518c2973..fb554abf0967fbe3341ca8c3a7677cd37c9f72c0 100644
--- a/test/unit/ocean/test_region_mean.py
+++ b/test/unit/ocean/test_region_mean.py
@@ -33,7 +33,6 @@ class TestRegionMean(TestCase):
     @patch.object(DiagnosticVariableListOption, 'parse', fake_parse)
     @patch.object(TempFile, 'get', fake_get)
     def test_generate_jobs(self):
-
         box = Box()
         box.min_depth = -1
         box.max_depth = -1
@@ -77,7 +76,7 @@ class TestRegionMean(TestCase):
         jobs = RegionMean.generate_jobs(
             self.diags,
             ['diagnostic', 'ocean', ['var'], 'global', 'U',
-             '1', '10', '', '', '', '', 'false']
+             '1', '10', 'false']
         )
         self.assertEqual(len(jobs), 2)
         self.assertEqual(jobs[0], RegionMean(self.data_manager, '20010101', 0, 0, ModelingRealms.ocean, 'var',
@@ -88,7 +87,7 @@ class TestRegionMean(TestCase):
         jobs = RegionMean.generate_jobs(
             self.diags,
             ['diagnostic', 'ocean', ['var'], 'global', 'U',
-             '1', '10', '', '', '', '', 'false', 'True']
+             '1', '10', 'false', 'True']
         )
         self.assertEqual(len(jobs), 2)
         self.assertEqual(jobs[0], RegionMean(self.data_manager, '20010101', 0, 0, ModelingRealms.ocean, 'var',
@@ -99,7 +98,7 @@ class TestRegionMean(TestCase):
         jobs = RegionMean.generate_jobs(
             self.diags,
             ['diagnostic', 'ocean', ['var'], 'global', 'U', '1',
-             '10', '', '', '', '', 'false', 'True', 'grid']
+             '10', 'false', 'True', 'grid']
         )
         self.assertEqual(len(jobs), 2)
         self.assertEqual(jobs[0], RegionMean(self.data_manager, '20010101', 0, 0, ModelingRealms.ocean, 'var',
@@ -107,14 +106,25 @@ class TestRegionMean(TestCase):
         self.assertEqual(jobs[1], RegionMean(self.data_manager, '20010101', 0, 1, ModelingRealms.ocean, 'var',
                                              box, False, Basins().Global, True, 'grid', Frequencies.monthly))
 
+        jobs = RegionMean.generate_jobs(
+            self.diags,
+            ['diagnostic', 'ocean', ['var'], 'global', 'U', '1',
+             '10', 'false', 'True', 'grid', 'day']
+        )
+        self.assertEqual(len(jobs), 2)
+        self.assertEqual(jobs[0], RegionMean(self.data_manager, '20010101', 0, 0, ModelingRealms.ocean, 'var',
+                                             box, False, Basins().Global, True, 'grid', Frequencies.daily))
+        self.assertEqual(jobs[1], RegionMean(self.data_manager, '20010101', 0, 1, ModelingRealms.ocean, 'var',
+                                             box, False, Basins().Global, True, 'grid', Frequencies.daily))
+
         with self.assertRaises(DiagnosticOptionError):
             RegionMean.generate_jobs(self.diags, ['diagnostic'])
 
         with self.assertRaises(DiagnosticOptionError):
             RegionMean.generate_jobs(
                 self.diags,
-                ['diagnostic', 'ocean', ['var'], 'global', 'U', '1', '10', '', '', '', '', 'false',
-                 'True', 'grid', 'extra']
+                ['diagnostic', 'ocean', ['var'], 'global', 'U', '1', '10', 'false',
+                 'True', 'grid', 'day', 'extra']
             )
 
     def test_str(self):
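Note on the central change above: `DataFile._prepare_region` now merges per-region slices of the local and stored files with iris instead of editing the NetCDF file in place with NCO/netCDF4. A minimal standalone sketch of that merge pattern, under stated assumptions: `merge_regions` and its path arguments are illustrative, not part of the codebase; both files are assumed to hold a single cube with a `region` coordinate; and on Iris 3 or later the import would be `from iris.util import equalise_attributes`.

    import iris
    import iris.cube
    from iris.experimental.equalise_cubes import equalise_attributes


    def merge_regions(local_path, remote_path, out_path):
        """Combine per-region slices of two files; the local file wins on conflicts."""
        new_data = {}
        # Key each single-region slice of the freshly computed file by its region name.
        for region_slice in iris.load_cube(local_path).slices_over('region'):
            new_data[region_slice.coord('region').points[0]] = region_slice
        # Keep any stored region that the local file does not redefine.
        for region_slice in iris.load_cube(remote_path).slices_over('region'):
            new_data.setdefault(region_slice.coord('region').points[0], region_slice)
        # Rebuild a single cube with regions in deterministic (sorted) order.
        cube_list = iris.cube.CubeList([new_data[name] for name in sorted(new_data)])
        equalise_attributes(cube_list)  # drop mismatched attributes so merge_cube() succeeds
        iris.save(cube_list.merge_cube(), out_path, zlib=True)

Sorting the region names before merging makes the stored region order deterministic up front, which is what lets the diff delete the old post-hoc `np.take` reordering pass.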