diff --git a/earthdiagnostics/cmorizer.py b/earthdiagnostics/cmorizer.py index edb17b206e1ed96384046d04046fc919e3961753..a95984c6671442a1d1073f16052e5efd1b95fdd1 100644 --- a/earthdiagnostics/cmorizer.py +++ b/earthdiagnostics/cmorizer.py @@ -98,24 +98,22 @@ class Cmorizer(object): count = 1 result = True for tarfile in tar_files: - if not self._cmorization_required(self._get_chunk(os.path.basename(tarfile)), (ModelingRealms.ocean, - ModelingRealms.seaIce, - ModelingRealms.ocnBgchem)): + if not self._cmorization_required( + self._get_chunk(os.path.basename(tarfile)), + (ModelingRealms.ocean, ModelingRealms.seaIce,ModelingRealms.ocnBgchem) + ): Log.info('No need to unpack file {0}/{1}'.format(count, len(tar_files))) count += 1 - continue - - Log.info('Unpacking oceanic file {0}/{1}'.format(count, len(tar_files))) - try: - self._unpack_tar_file(tarfile) - self._cmorize_nc_files() - Log.result('Oceanic file {0}/{1} finished'.format(count, len(tar_files))) - except Exception as ex: - Log.error('Could not CMORize oceanic file {0}: {1}', count, ex) - result = False - count += 1 - if count > self.experiment.num_chunks: - return result + else: + Log.info('Unpacking oceanic file {0}/{1}'.format(count, len(tar_files))) + try: + self._unpack_tar_file(tarfile) + self._cmorize_nc_files() + Log.result('Oceanic file {0}/{1} finished'.format(count, len(tar_files))) + except Exception as ex: + Log.error('Could not CMORize oceanic file {0}: {1}', count, ex) + result = False + return result def _filter_files(self, file_list): if not self.cmor.filter_files: @@ -412,6 +410,7 @@ class Cmorizer(object): if variable in Cmorizer.NON_DATA_VARIABLES: continue try: + Log.debug('Checking variable {0}', variable) self.extract_variable(filename, frequency, variable) except Exception as ex: Log.error('Variable {0} can not be cmorized: {1}', variable, ex) diff --git a/earthdiagnostics/data_convention.py b/earthdiagnostics/data_convention.py index 054a64d3854f1f6452a224249f7e84e2cb44f528..77188e70ed62fe8562f89e031d1920270d1e0621 100644 --- a/earthdiagnostics/data_convention.py +++ b/earthdiagnostics/data_convention.py @@ -266,11 +266,17 @@ class DataConvention(object): link_path = os.path.join(link_path, os.path.basename(filepath)) if os.path.lexists(link_path): - os.remove(link_path) + try: + os.remove(link_path) + except OSError: + pass if not os.path.exists(filepath): raise ValueError('Original file {0} does not exists'.format(filepath)) relative_path = os.path.relpath(filepath, os.path.dirname(link_path)) - os.symlink(relative_path, link_path) + try: + os.symlink(relative_path, link_path) + except OSError: + pass except Exception: raise finally: diff --git a/earthdiagnostics/datafile.py b/earthdiagnostics/datafile.py index e2fe921c925746171d292edb1e43aaf46ee43a37..668316f77cf600cdb225aaf41dbeac23ac254427 100644 --- a/earthdiagnostics/datafile.py +++ b/earthdiagnostics/datafile.py @@ -11,6 +11,7 @@ import iris.coords import iris.exceptions import numpy as np from bscearth.utils.log import Log +import netCDF4 from earthdiagnostics.modelingrealm import ModelingRealms from earthdiagnostics.utils import Utils, TempFile @@ -246,7 +247,6 @@ class DataFile(Publisher): self.lon_name = 'lon' self.lat_name = 'lat' - Utils.convert2netcdf4(self.local_file) if rename_var: original_name = rename_var else: @@ -257,6 +257,7 @@ class DataFile(Publisher): self._correct_metadata() self._prepare_region() self.add_diagnostic_history() + Utils.convert2netcdf4(self.local_file) if self.region is not None: self.upload() @@ -403,9 +404,8 @@ class DataFile(Publisher): def _prepare_region(self): if not self.region: return - if not os.path.exists(self.remote_file): - self._add_region_dimension_to_var() - else: + self._add_region_dimension_to_var() + if os.path.exists(self.remote_file): self._update_var_with_region_data() self._correct_metadata() Utils.nco().ncks(input=self.local_file, output=self.local_file, options=['--fix_rec_dmn region']) @@ -439,24 +439,30 @@ class DataFile(Publisher): if self.final_name == cube.var_name: value = cube break - for index_region, region in enumerate(value.coord('region').points): handler = Utils.open_cdf(temp) - region_slice = value.data[index_region, ...] + region_slice = value.data[...] original_regions = handler.variables['region'][...] + str_regions = [] + for x in range(original_regions.shape[0]): + str_regions.append(''.join( + [str(value) for value in original_regions[x, ...] + if value != '-'] + )) + Log.debug(str(str_regions)) var = handler.variables[self.final_name] - if region in original_regions: - indices = list() - region_index = np.where(original_regions == region)[0][0] - for dim in var.dimensions: - if dim == 'region': - indices.append(region_index) - else: - indices.append(slice(None)) - var[indices] = region_slice + if region in str_regions: + region_index = str_regions.index(region) else: - var[original_regions.shape[0], ...] = region_slice - handler.variables[-1] = region + region_index = original_regions.shape[0] + handler.variables['region'][region_index, ...] = netCDF4.stringtoarr(region, 50) + indices = list() + for dim in var.dimensions: + if dim == 'region': + indices.append(region_index) + else: + indices.append(slice(None)) + var[indices] = region_slice handler.close() # handler.close() @@ -468,12 +474,17 @@ class DataFile(Publisher): handler.close() return handler.createDimension('region') - var_region = handler.createVariable('region', str, 'region') - var_region[0] = self.region + handler.createDimension('region_length', 50) + var_region = handler.createVariable('region', 'S1', ('region', 'region_length')) + var_region[0, ...] = netCDF4.stringtoarr(self.region.name, 50) original_var = handler.variables[self.final_name] - new_var = handler.createVariable('new_var', original_var.datatype, - original_var.dimensions + ('region',)) + new_var = handler.createVariable( + 'new_var', + original_var.datatype, + original_var.dimensions + ('region',), + ) new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()}) + new_var.coordinates = new_var.coordinates + ' region' value = original_var[:] new_var[..., 0] = value handler.close() @@ -660,19 +671,18 @@ class NetCDFFile(DataFile): self.local_status = LocalStatus.FAILED def check_is_in_storage(self): - - if os.path.isfile(self.remote_file): - if self.region: - try: - cubes = iris.load(self.remote_file) - self._check_regions(cubes) - except iris.exceptions.TranslationError as ex: - # If the check goes wrong, we must execute everything - os.remove(self.remote_file) - except Exception as ex: - Log.debug('Exception when checking file {0}: {1}', self.remote_file, ex) - else: - self.storage_status = StorageStatus.READY + if os.path.isfile(self.remote_file): + if self.region: + try: + cubes = iris.load(self.remote_file) + self._check_regions(cubes) + except iris.exceptions.TranslationError as ex: + # If the check goes wrong, we must execute everything + os.remove(self.remote_file) + except Exception as ex: + Log.debug('Exception when checking file {0}: {1}', self.remote_file, ex) + else: + self.storage_status = StorageStatus.READY def _check_regions(self, cubes): for cube in cubes: diff --git a/earthdiagnostics/diagnostic.py b/earthdiagnostics/diagnostic.py index 86c07a62eeb33243bb867d16fbe67d48ee69e353..7874afc8e2700403f1c6dab206a1911fc32b8ae1 100644 --- a/earthdiagnostics/diagnostic.py +++ b/earthdiagnostics/diagnostic.py @@ -313,7 +313,7 @@ class Diagnostic(Publisher): self.subjobs.append(subjob) subjob.subscribe(self, self._subjob_status_changed) - def _subjob_status_changed(self, job): + def _subjob_status_changed(self, job, status): self.check_is_ready() def request_chunk(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None, diff --git a/earthdiagnostics/ocean/regionmean.py b/earthdiagnostics/ocean/regionmean.py index 12121f525154fb497ec01369db12e089e1ffbdc1..5ca7c3d32194e9ea462d57c78915c1e8f5b85a9e 100644 --- a/earthdiagnostics/ocean/regionmean.py +++ b/earthdiagnostics/ocean/regionmean.py @@ -44,8 +44,8 @@ class RegionMean(Diagnostic): alias = 'regmean' "Diagnostic alias for the configuration file" - def __init__(self, data_manager, startdate, member, chunk, domain, variable, box, save3d, weights_file, - variance, basin): + def __init__(self, data_manager, startdate, member, chunk, domain, variable, box, save3d, + variance, basin, grid_point): Diagnostic.__init__(self, data_manager) self.startdate = startdate self.member = member @@ -54,9 +54,9 @@ class RegionMean(Diagnostic): self.variable = variable self.box = box self.save3d = save3d - self.weights_file = weights_file self.variance = variance self.basin = basin + self.grid_point = grid_point self.declared = {} @@ -71,7 +71,7 @@ class RegionMean(Diagnostic): def __str__(self): return 'Region mean Startdate: {0.startdate} Member: {0.member} Chunk: {0.chunk} Variable: {0.variable} ' \ - 'Box: {0.box} Save 3D: {0.save3d} Save variance: {0.variance}'.format(self) + 'Box: {0.box} Save 3D: {0.save3d} Save variance: {0.variance} Grid point: {0.grid_point}'.format(self) def __hash__(self): return hash(str(self)) @@ -102,16 +102,12 @@ class RegionMean(Diagnostic): box.min_depth = options['min_depth'] box.max_depth = options['max_depth'] - weights_file = TempFile.get() - weight_diagnostics = ComputeWeights(diags.data_manager, options['grid_point'], options['basin'], box, - weights_file) - job_list = list() for startdate, member, chunk in diags.config.experiment.get_chunk_list(): job = RegionMean(diags.data_manager, startdate, member, chunk, options['domain'], options['variable'], box, - options['save3D'], weights_file, options['variance'], options['basin']) - job.add_subjob(weight_diagnostics) + options['save3D'], options['variance'], options['basin'], + options['grid_point'].lower()) job_list.append(job) return job_list @@ -122,8 +118,7 @@ class RegionMean(Diagnostic): def declare_data_generated(self): """Declare data to be generated by the diagnostic""" - if self.box.min_depth == 0: - # To cdftools, this means all levels + if self.box.min_depth == -1 and self.box.max_depth == -1: box_save = None else: box_save = self.box @@ -144,53 +139,57 @@ class RegionMean(Diagnostic): data = self._load_data() - weights = iris.load_cube(self.weights_file, 'weights').data - i_indexes = iris.load_cube(self.weights_file, 'i_indexes').data - j_indexes = iris.load_cube(self.weights_file, 'j_indexes').data - lev_limits = iris.load_cube(self.weights_file, 'lev_limits').data - - def selected_i(cell): - return cell.point - 1 in i_indexes - - def selected_j(cell): - return cell.point - 1 in j_indexes - - def selected_level(cell): - return lev_limits[0] <= cell.point - 1 <= lev_limits[1] - - data = data.extract(iris.Constraint(i=selected_i, j=selected_j, lev=selected_level)) if has_levels: - self._meand_3d_variable(data, weights) + self._meand_3d_variable(data) else: - self._mean_2d_var(data, weights) + self._mean_2d_var(data) - def _mean_2d_var(self, data, weights): + def _mean_2d_var(self, data): mean = iris.cube.CubeList() var = iris.cube.CubeList() + mask = np.squeeze(Utils.get_mask(self.basin, True)) + if len(mask.shape) == 3: + mask = mask[0, ...] + + e1 = self._try_load_cube(1) + e2 = self._try_load_cube(2) + weights = e1 * e2 * mask for time_slice in data.slices_over('time'): - mean.append(time_slice.collapsed(['latitude', 'longitude'], iris.analysis.MEAN, weights=weights)) - var.append(time_slice.collapsed(['latitude', 'longitude'], iris.analysis.VARIANCE, weights=weights)) - self._send_var('mean', False, mean.merge_cube()) + mean.append(time_slice.collapsed(['latitude', 'longitude'], iris.analysis.MEAN, weights=weights.data)) + if self.variance: + var.append(time_slice.collapsed(['latitude', 'longitude'], iris.analysis.VARIANCE)) + self._send_var('mean', False, mean) if self.variance: - self._send_var('var', False, var.merge_cube()) + self._send_var('var', False, var) - def _meand_3d_variable(self, data, weights): + def _meand_3d_variable(self, data): mean = iris.cube.CubeList() mean3d = iris.cube.CubeList() var = iris.cube.CubeList() var3d = iris.cube.CubeList() + mask = np.squeeze(Utils.get_mask(self.basin, True)) + e1 = self._try_load_cube(1) + e2 = self._try_load_cube(2) + e3 = self._try_load_cube(3) + e3 = self._rename_depth(e3) + weights = e1 * e2 + weights = e3 * weights.data * mask + depth_constraint = iris.Constraint(depth=lambda c: self.box.min_depth <= c <= self.box.max_depth) + weights = weights.extract(depth_constraint).data + data = data.extract(depth_constraint) + for time_slice in data.slices_over('time'): mean.append(time_slice.collapsed(['latitude', 'longitude', 'depth'], - iris.analysis.MEAN, weights=weights)) + iris.analysis.MEAN, weights=weights)) if self.save3d: mean3d.append(time_slice.collapsed(['latitude', 'longitude'], iris.analysis.MEAN, weights=weights)) if self.variance: var.append(time_slice.collapsed(['latitude', 'longitude', 'depth'], - iris.analysis.VARIANCE, weights=weights)) + iris.analysis.VARIANCE)) if self.save3d: var3d.append(time_slice.collapsed(['latitude', 'longitude'], - iris.analysis.VARIANCE, weights=weights)) + iris.analysis.VARIANCE)) self._send_var('mean', True, mean3d) self._send_var('mean', False, mean) if self.variance: @@ -198,43 +197,45 @@ class RegionMean(Diagnostic): self._send_var('var', False, var) - def _load_data(self): - def add_i_j(cube, field, filename): - if cube.var_name != self.variable: - raise iris.exceptions.IgnoreCubeException() - if not cube.coords('i'): - index = field.dimensions.index('i') - i = np.arange(1, field.shape[index] + 1) - i_coord = iris.coords.DimCoord(i, var_name='i') - cube.add_dim_coord(i_coord, index) - if not cube.coords('j'): - index = field.dimensions.index('j') - i = np.arange(1, field.shape[index] + 1) - i_coord = iris.coords.DimCoord(i, var_name='j') - cube.add_dim_coord(i_coord, index) - if not cube.coords('lev'): - index = field.dimensions.index('lev') - i = np.arange(1, field.shape[index] + 1) - lev = iris.coords.AuxCoord(i, var_name='lev') - cube.add_aux_coord(lev, index) + def _try_load_cube(self, number): + try: + cube = iris.load_cube('mesh_hgr.nc', 'e{0}{1}'.format(number, self.grid_point)) + except iris.exceptions.ConstraintMismatchError: + cube = iris.load_cube('mesh_hgr.nc', 'e{0}{1}_0'.format(number, self.grid_point)) + cube = iris.util.squeeze(cube) + dims = len(cube.shape) + try: + cube.coord('i') + except iris.exceptions.CoordinateNotFoundError: + cube.add_dim_coord(iris.coords.DimCoord(np.arange(cube.shape[dims - 1]), var_name='i'), dims - 1) + try: + cube.coord('j') + except iris.exceptions.CoordinateNotFoundError: + cube.add_dim_coord(iris.coords.DimCoord(np.arange(cube.shape[dims - 2]), var_name='j'), dims -2) + return cube + def _load_data(self): coords = [] handler = Utils.open_cdf(self.variable_file.local_file) for variable in handler.variables: - if variable in ('time', 'lev', 'lat', 'lon', 'latitude', 'longitude'): + if variable in ('time', 'lev', 'lat', 'lon', 'latitude', 'longitude', 'leadtime', 'time_centered'): coords.append(variable) + if variable == 'time_centered': + handler.variables[variable].standard_name = '' handler.variables[self.variable].coordinates = ' '.join(coords) handler.close() - data = iris.load_cube(self.variable_file.local_file, - callback=add_i_j) - - if data.coords('model_level_number'): - coord = data.coord('model_level_number') - coord.standard_name = 'depth' - coord.long_name = 'depth' + data = iris.load_cube(self.variable_file.local_file) + return self._rename_depth(data) + def _rename_depth(self, data): + for coord_name in ('model_level_number', 'Vertical T levels', 'lev'): + if data.coords(coord_name): + coord = data.coord(coord_name) + coord.standard_name = 'depth' + coord.long_name = 'depth' + break return data def _fix_file_metadata(self): @@ -270,127 +271,17 @@ class RegionMean(Diagnostic): else: final_name = '{1}{0}'.format(var, self.variable) cube = cube_list.merge_cube() - print(cube) - print(cube.data) cube.var_name = 'result' cube.remove_coord('latitude') cube.remove_coord('longitude') - cube.remove_coord('depth') - cube.remove_coord('lev') + try: + cube.remove_coord('depth') + except iris.exceptions.CoordinateNotFoundError: + pass + try: + cube.remove_coord('lev') + except iris.exceptions.CoordinateNotFoundError: + pass temp = TempFile.get() iris.save(cube, temp) self.declared[final_name].set_local_file(temp, diagnostic=self, rename_var='result', region=self.basin) - - -class ComputeWeights(Diagnostic): - """ - Diagnostic used to compute regional mean and sum weights - - Parameters - ---------- - data_manager: DataManager - grid_point: str - basin: int - weights_file: str - """ - - alias = 'computeregmeanweights' - "Diagnostic alias for the configuration file" - - @classmethod - def generate_jobs(cls, diags, options): - """ - Generate the instances of the diagnostics that will be run by the manager - - This method does not does anything as this diagnostic is not expected to be called by the users - """ - pass - - def __init__(self, data_manager, grid_point, basin, box, weights_file): - Diagnostic.__init__(self, data_manager) - self.weights_file = weights_file - self.basin = basin - self.grid_point = grid_point.lower() - self.box = box - - def __eq__(self, other): - if self._different_type(other): - return False - return self.weights_file == other.weights_file and self.basin == other.basin and \ - self.grid_point == other.grid_point and self.box != other.box - - def __str__(self): - return 'Computing weights for region averaging: Point {0.grid_point} Basin: {0.basin} Box: {0.box}'\ - .format(self) - - def __hash__(self): - return hash(str(self)) - - def compute(self): - """Compute weights""" - iris.FUTURE.netcdf_promote = True - iris.FUTURE.netcdf_no_unlimited = True - - mask = np.squeeze(Utils.get_mask(self.basin, True)) - surface_mask = mask[0, ...] - i_indexes = np.where(np.any(surface_mask != 0, 0))[0] - j_indexes = np.where(np.any(surface_mask != 0, 1))[0] - mask_small = np.take(np.take(mask, i_indexes, 2), j_indexes, 1) - - e1 = self._try_load_cube(1) - e2 = self._try_load_cube(2) - e3 = self._try_load_cube(3) - depth = iris.util.squeeze(iris.load_cube('mesh_hgr.nc', 'gdept_0')) - if self.box.min_depth == -1: - min_level = 0 - else: - distance = abs((depth - self.box.min_depth).data) - min_level = np.argmin(distance) - - if self.box.max_depth == -1: - max_level = depth.shape[0] - else: - distance = abs((depth - self.box.max_depth).data) - max_level = np.argmin(distance) - - def selected_i(cell): - return cell.point - 1 in i_indexes - - def selected_j(cell): - return cell.point - 1 in j_indexes - - def selected_level(cell): - return min_level <= cell.point - 1 <= max_level - - e1_small = e1.extract(iris.Constraint(i=selected_i, j=selected_j)) - e2_small = e2.extract(iris.Constraint(i=selected_i, j=selected_j)) - e3_small = e3.extract(iris.Constraint(i=selected_i, j=selected_j, lev=selected_level)) - mask_small = mask_small[min_level:max_level + 1, ...] - - mask_small = e3_small * mask_small - e_small = e1_small * e2_small - for coord in e_small.coords(): - e_small.remove_coord(coord) - for coord in mask_small.coords(): - mask_small.remove_coord(coord) - weights = mask_small * e_small - weights.var_name = 'weights' - i_indexes = iris.cube.Cube(i_indexes, var_name='i_indexes') - j_indexes = iris.cube.Cube(j_indexes, var_name='j_indexes') - lev_limits = iris.cube.Cube([min_level, max_level], var_name='lev_limits') - iris.save((weights, i_indexes, j_indexes, lev_limits), self.weights_file) - - def _try_load_cube(self, number): - try: - cube = iris.load_cube('mesh_hgr.nc', 'e{0}{1}'.format(number, self.grid_point)) - except iris.exceptions.ConstraintMismatchError: - cube = iris.load_cube('mesh_hgr.nc', 'e{0}{1}_0'.format(number, self.grid_point)) - return iris.util.squeeze(cube) - - def request_data(self): - """Request data required by the diagnostic""" - pass - - def declare_data_generated(self): - """Declare data to be generated by the diagnostic""" - pass diff --git a/earthdiagnostics/utils.py b/earthdiagnostics/utils.py index 2f1de662b3aaf381f280607b36f35733c8e836b8..aeea82ee6263eb0251cf01f5b30692fd8d464c62 100644 --- a/earthdiagnostics/utils.py +++ b/earthdiagnostics/utils.py @@ -704,7 +704,7 @@ class Utils(object): destiny.createDimension(new_name, source.dimensions[dimension].size) if dimension in source.variables: Utils.copy_variable(source, destiny, dimension, - new_names=new_names, rename_dimension=rename_dimension) + new_names=new_names, rename_dimension=rename_dimension, add_dimensions=True) @staticmethod def concat_variables(source, destiny, remove_source=False): diff --git a/earthdiagnostics/variable.py b/earthdiagnostics/variable.py index 30c483c00fd686387214928773341035033884b4..f86710b9426d58a87e915ae3ea0aebaaeb44cf8a 100644 --- a/earthdiagnostics/variable.py +++ b/earthdiagnostics/variable.py @@ -233,8 +233,6 @@ class VariableManager(object): @staticmethod def _get_aliases(line): aliases = line[0].split(':') - if line[1] not in aliases: - aliases.append(line[1]) return aliases def _register_aliases(self, aliases, cmor_var, line): @@ -258,6 +256,8 @@ class VariableManager(object): """Create aliases dictionary for the registered variables""" self._dict_aliases = {} for cmor_var_name in self._dict_variables: + if cmor_var_name == 'tos': + pass cmor_var = self._dict_variables[cmor_var_name] base_alias = VariableAlias(cmor_var_name) if base_alias not in cmor_var.known_aliases: diff --git a/earthdiagnostics/work_manager.py b/earthdiagnostics/work_manager.py index 6b7a5ad53941c046b61e97bccdc406d5dff0fac6..2f9640dcab35a5f9534cbb17600131a03b513c64 100644 --- a/earthdiagnostics/work_manager.py +++ b/earthdiagnostics/work_manager.py @@ -50,7 +50,6 @@ class WorkManager(object): for fulldiag in self.config.get_commands(): Log.info("Adding {0} to diagnostic list", fulldiag) diag_options = fulldiag.split(',') - diag_class = Diagnostic.get_diagnostic(diag_options[0]) if diag_class: try: @@ -58,7 +57,6 @@ class WorkManager(object): self.add_job(job) for subjob in job.subjobs: self.add_job(subjob) - continue except DiagnosticOptionError as ex: Log.error('Can not configure diagnostic {0}: {1}', diag_options[0], ex) self.had_errors = True