From f0dc9eadeecfe4a2f76a2c8434742e854ca82ac9 Mon Sep 17 00:00:00 2001 From: Javier Vegas-Regidor Date: Fri, 9 Jul 2021 09:25:28 +0200 Subject: [PATCH] Remove references to region and remove cmorizer --- earthdiagnostics/cmorizer.py | 1129 -------------------------- earthdiagnostics/cmormanager.py | 5 - earthdiagnostics/datafile.py | 69 +- earthdiagnostics/datamanager.py | 4 - earthdiagnostics/diagnostic.py | 7 - earthdiagnostics/obsreconmanager.py | 3 - earthdiagnostics/ocean/regionmean.py | 1 - earthdiagnostics/threddsmanager.py | 2 - test/integration/__init__.py | 0 test/integration/test_cmorizer.py | 727 ----------------- test/unit/test_cmormanager.py | 24 - 11 files changed, 1 insertion(+), 1970 deletions(-) delete mode 100644 earthdiagnostics/cmorizer.py delete mode 100644 test/integration/__init__.py delete mode 100644 test/integration/test_cmorizer.py diff --git a/earthdiagnostics/cmorizer.py b/earthdiagnostics/cmorizer.py deleted file mode 100644 index 6cb10a3d..00000000 --- a/earthdiagnostics/cmorizer.py +++ /dev/null @@ -1,1129 +0,0 @@ -# coding=utf-8 -"""Cmorization classes""" -import glob -import os -import shutil -import uuid -import traceback -import time -from datetime import datetime - -import six -from bscearth.utils.date import ( - parse_date, - chunk_end_date, - previous_day, - date2str, - add_months, -) -from bscearth.utils.log import Log -from cdo import CDOException -import iris -from iris.coords import DimCoord -import iris.coord_categorisation -import iris.analysis -import iris.util -import iris.exceptions -import cf_units - -from earthdiagnostics.datafile import NetCDFFile -from earthdiagnostics.frequency import Frequency, Frequencies -from earthdiagnostics.modelingrealm import ModelingRealms -from earthdiagnostics.utils import TempFile, Utils - - -class Cmorizer(object): - """ - Class to manage CMORization - - Parameters - ---------- - data_manager: DataManager - startdate: str - member: int - - """ - - NON_DATA_VARIABLES = ( - "lon", - "lat", - "longitude", - "latitude", - "plev", - "time", - "time_bnds", - "leadtime", - "lev", - "lev_2", - "icethi", - "deptht", - "depthu", - "depthw", - "depthv", - "time_centered", - "time_centered_bounds", - "deptht_bounds", - "depthu_bounds", - "depthv_bounds", - "depthw_bounds", - "deptht_bnds", - "depthu_bnds", - "depthv_bnds", - "depthw_bnds", - "time_counter_bounds", - "ncatice", - "nav_lat_grid_V", - "nav_lat_grid_U", - "nav_lat_grid_T", - "nav_lon_grid_V", - "nav_lon_grid_U", - "nav_lon_grid_T", - "depth", - "depth_2", - "depth_3", - "depth_4", - "depth_bnds", - "depth_2_bnds", - "depth_3_bnds", - "depth_4_bnds", - "mlev", - "hyai", - "hybi", - "hyam", - "hybm", - ) - - def __init__(self, data_manager, startdate, member): - self.data_manager = data_manager - - self.startdate = startdate - self.member = member - self.config = data_manager.config - self.experiment = self.config.experiment - self.cmor = self.config.cmor - self.convetion = self.config.data_convention - self.member_str = self.experiment.get_member_str(member) - self.original_files_path = os.path.join( - self.config.data_dir, - self.experiment.expid, - "original_files", - self.startdate, - self.member_str, - "outputs", - ) - self.atmos_timestep = None - self.cmor_scratch = str( - os.path.join( - self.config.scratch_dir, - "CMOR", - self.startdate, - self.member_str, - ) - ) - - self.lon_name = self.config.data_convention.lon_name - self.lat_name = self.config.data_convention.lat_name - - self.alt_coord_names = { - "time_counter": "time", - "time_counter_bnds": "time_bnds", - "time_counter_bounds": "time_bnds", - "tbnds": "bnds", - "nav_lat": self.lat_name, - "nav_lon": self.lon_name, - "x": "i", - "y": "j", - } - - @property - def path_icm(self): - """Path to the ICM file""" - return os.path.join(self.config.scratch_dir, "ICM") - - def cmorize_ocean(self): - """Cmorize ocean files from MMO files""" - if not self.cmor.ocean: - Log.info("Skipping ocean cmorization due to configuration") - return True - Log.info("\nCMORizing ocean\n") - return self._cmorize_ocean_files("MMO", "PPO", "diags") - - def _cmorize_ocean_files(self, *args): - tar_files = () - for prefix in args: - tar_folder = os.path.join( - self.original_files_path, "{0}*".format(prefix) - ) - tar_files = glob.glob(tar_folder) - tar_files.sort() - if len(tar_files) > 0: - break - if not len(tar_files): - Log.error( - "No {1} files found in {0}".format( - self.original_files_path, args - ) - ) - return False - - count = 1 - result = True - for tarfile in tar_files: - if not self._cmorization_required( - self._get_chunk(os.path.basename(tarfile)), - ( - ModelingRealms.ocean, - ModelingRealms.seaIce, - ModelingRealms.ocnBgchem, - ), - ): - Log.info( - "No need to unpack file {0}/{1}".format( - count, len(tar_files) - ) - ) - count += 1 - else: - Log.info( - "Unpacking oceanic file {0}/{1}".format( - count, len(tar_files) - ) - ) - try: - self._unpack_tar_file(tarfile) - self._cmorize_nc_files() - Log.result( - "Oceanic file {0}/{1} finished".format( - count, len(tar_files) - ) - ) - count += 1 - except Exception as ex: - Log.error( - "Could not CMORize oceanic file {0}: {1}", count, ex - ) - result = False - count += 1 - return result - - def _filter_files(self, file_list): - if not self.cmor.filter_files: - return file_list - filtered = list() - for file_path in file_list: - filename = os.path.basename(file_path) - if any(f in filename for f in self.cmor.filter_files): - filtered.append(file_path) - else: - self._remove(file_path) - if not filtered: - raise CMORException( - f"Filters {self.cmor.filter_files} do not match any file", - ) - return filtered - - def _remove(self, file_path): - os.remove(file_path) - - def _cmorize_nc_files(self): - nc_files = glob.glob(os.path.join(self.cmor_scratch, "*.nc")) - for filename in self._filter_files(nc_files): - self._cmorize_nc_file(filename) - self._clean_cmor_scratch() - - def _correct_fluxes(self): - fluxes_vars = [ - self.data_manager.variable_list.get_variable( - cmor_var, True - ).short_name - for cmor_var in ( - "prc", - "prs", - "prsn", - "rss", - "rls", - "rsscs", - "rsds", - "rlds", - "hfss", - "hfls", - ) - ] - change_sign_vars = [ - self.data_manager.variable_list.get_variable( - cmor_var, True - ).short_name - for cmor_var in ("hfss", "hfls") - ] - total_seconds = self.experiment.atmos_timestep * 3600 - for filename in glob.glob(os.path.join(self.cmor_scratch, "*.nc")): - handler = Utils.open_cdf(filename) - for varname in handler.variables.keys(): - cmor_var = self.data_manager.variable_list.get_variable( - varname, True - ) - - if cmor_var is None or cmor_var.short_name not in fluxes_vars: - continue - - if cmor_var.short_name in change_sign_vars: - sign = -1 - else: - sign = 1 - - var_handler = handler.variables[varname] - var_handler[:] = sign * var_handler[:] / total_seconds - var_handler.units = "{0} {1}".format(var_handler.units, "s-1") - handler.close() - - def _unpack_tar_file(self, tarfile): - self._clean_cmor_scratch() - os.makedirs(self.cmor_scratch) - Utils.untar( - (tarfile,), self.cmor_scratch, self.config.cmor.filter_files - ) - if os.path.isdir(os.path.join(self.cmor_scratch, "backup")): - for filepath in glob.glob( - os.path.join(self.cmor_scratch, "backup", "*") - ): - Log.debug("Moving file {0}", filepath) - shutil.move(filepath, filepath.replace("/backup/", "/")) - zip_files = glob.glob(os.path.join(self.cmor_scratch, "*.gz")) - if zip_files: - for zip_file in self._filter_files(zip_files): - try: - Utils.unzip(zip_file) - except Utils.UnzipException as ex: - Log.error( - "File {0} could not be unzipped: {1}", tarfile, ex - ) - - def _clean_cmor_scratch(self): - if os.path.exists(self.cmor_scratch): - time.sleep(2) - shutil.rmtree(self.cmor_scratch) - - def _merge_mma_files(self, tarfile): - temp = TempFile.get() - for grid in ["SH", "GG"]: - files = glob.glob( - os.path.join(self.cmor_scratch, "MMA_*_{}_*.nc".format(grid)) - ) - if not files: - continue - merged = TempFile.get() - if grid == "SH": - for filename in files: - Utils.cdo().sp2gpl( - options="-O", input=filename, output=temp - ) - shutil.move(temp, filename) - cmorize_vars = set() - var_manager = self.config.var_manager - for filename in files: - handler = Utils.open_cdf(filename) - for variable in handler.variables.keys(): - - _, var_cmor = var_manager.get_variable_and_alias( - variable, silent=True, - ) - if self.cmor.cmorize(var_cmor): - cmorize_vars.add(variable) - handler.close() - if not cmorize_vars: - continue - var_str = ",".join([str(var) for var in cmorize_vars]) - Utils.cdo().mergetime( - input=[f"-selvar,{var_str} {filepath}" for filepath in files], - output=merged - ) - for filename in files: - self._remove(filename) - tar_startdate = ( - os.path.basename(tarfile[0:-4]).split("_")[4].split("-") - ) - filename = "MMA{0}_1m_{1[0]}_{1[1]}.nc".format(grid, tar_startdate) - shutil.move(merged, os.path.join(self.cmor_scratch, filename)) - - def cmorize_atmos(self): - """Cmorize atmospheric data, from grib or MMA files""" - if not self.cmor.atmosphere: - Log.info("Skipping atmosphere cmorization due to configuration") - return True - - Log.info("\nCMORizing atmosphere\n") - if self.cmor.use_grib and self._gribfiles_available(): - return self._cmorize_grib_files() - else: - return self._cmorize_mma_files() - - def _cmorize_mma_files(self): - tar_files = glob.glob( - os.path.join(self.original_files_path, "MMA*.tar") - ) - tar_files.sort() - count = 1 - if len(tar_files) == 0: - Log.error( - "MMA files not found in {0}".format(self.original_files_path) - ) - return False - - result = True - for tarfile in tar_files: - if not self._cmorization_required( - self._get_chunk(os.path.basename(tarfile)), - (ModelingRealms.atmos,), - ): - Log.info( - "No need to unpack file {0}/{1}".format( - count, len(tar_files) - ) - ) - count += 1 - continue - Log.info( - "Unpacking atmospheric file {0}/{1}".format( - count, len(tar_files) - ) - ) - try: - self._unpack_tar_file(tarfile) - self._merge_mma_files(tarfile) - self._correct_fluxes() - self._cmorize_nc_files() - Log.result( - "Atmospheric file {0}/{1} finished".format( - count, len(tar_files) - ) - ) - except Exception as ex: - Log.error( - "Could not cmorize atmospheric file {0}: {1}\n {2}", - count, - ex, - traceback.format_exc(), - ) - result = False - - count += 1 - return result - - def _cmorize_grib_files(self): - chunk = 1 - chunk_start = parse_date(self.startdate) - result = True - while os.path.exists( - self._get_original_grib_path(chunk_start, "GG") - ) or os.path.exists(self._get_original_grib_path(chunk_start, "SH")): - - if self._cmorization_required(chunk, (ModelingRealms.atmos,)): - chunk_end = chunk_end_date( - chunk_start, - self.experiment.chunk_size, - "month", - self.experiment.calendar, - ) - chunk_end = previous_day(chunk_end, self.experiment.calendar) - Log.info( - "CMORizing chunk {0}-{1}", - date2str(chunk_start), - date2str(chunk_end), - ) - try: - for grid in ("SH", "GG"): - Log.info("Processing {0} variables", grid) - - first_grib = self._get_original_grib_path( - chunk_start, grid - ) - if not os.path.exists(first_grib): - continue - var_list = Utils.cdo().showvar(input=first_grib)[0] - codes = { - int(var.replace("var", "")) - for var in var_list.split() - } - if not codes.intersection( - self.config.cmor.get_requested_codes() - ): - Log.info( - "No requested variables found in {0}. " - "Skipping...", - grid, - ) - continue - self._cmorize_grib_file(chunk_end, chunk_start, grid) - except Exception as ex: - Log.error( - "Can not cmorize GRIB file for chunk {0}-{1}: {2}", - date2str(chunk_start), - date2str(chunk_end), - ex, - ) - result = False - chunk_start = chunk_end_date( - chunk_start, - self.experiment.chunk_size, - "month", - self.experiment.calendar, - ) - chunk += 1 - return result - - def _cmorize_grib_file(self, chunk_end, chunk_start, grid): - for month in range(0, self.experiment.chunk_size): - current_date = add_months( - chunk_start, month, self.experiment.calendar - ) - original_gribfile = self._get_original_grib_path( - current_date, grid - ) - Log.info("Processing month {1}", grid, date2str(current_date)) - gribfile = self._get_scratch_grib_path(current_date, grid) - if not os.path.isfile(gribfile): - Log.info("Copying file...", grid, date2str(current_date)) - Utils.copy_file(original_gribfile, gribfile) - - self._obtain_atmos_timestep(gribfile) - full_file = self._get_monthly_grib(current_date, gribfile, grid) - if not self._unpack_grib( - full_file, gribfile, grid, current_date.month - ): - self._remove(gribfile) - return - - next_gribfile = self._get_original_grib_path( - add_months(current_date, 1, self.experiment.calendar), grid - ) - if not os.path.exists(next_gribfile): - self._remove(gribfile) - - self._ungrib_vars(gribfile, current_date.month) - - for splited_file in glob.glob("{0}_*.128.nc".format(gribfile)): - self._remove(splited_file) - - Log.result( - "Month {0}, {1} variables finished", - date2str(current_date), - grid, - ) - - self._merge_and_cmorize_atmos( - chunk_start, chunk_end, grid, Frequencies.monthly - ) - self._merge_and_cmorize_atmos( - chunk_start, chunk_end, grid, Frequencies.daily - ) - self._merge_and_cmorize_atmos( - chunk_start, chunk_end, grid, "{0}hr".format(self.atmos_timestep) - ) - - def _unpack_grib(self, full_file, gribfile, grid, month): - Log.info("Unpacking... ") - # remap on regular Gauss grid - - codes = self.cmor.get_requested_codes() - codes_str = ",".join([str(code) for code in codes]) - try: - if grid == "SH": - Utils.cdo().splitparam( - input="-sp2gpl -selcode,{0} {1} ".format( - codes_str, full_file - ), - output=gribfile + "_", - options="-f nc4 -t ecmwf", - ) - - else: - Utils.cdo().splitparam( - input="-selcode,{0} {1}".format(codes_str, full_file), - output=gribfile + "_", - options="-R -f nc4 -t ecmwf", - ) - return True - except CDOException: - Log.info("No requested codes found in {0} file".format(grid)) - return False - finally: - Utils.remove_file(self.path_icm) - - def _get_monthly_grib(self, current_date, gribfile, grid): - prev_gribfile = self._get_scratch_grib_path( - add_months(current_date, -1, self.experiment.calendar), grid - ) - if os.path.exists(prev_gribfile): - self._merge_grib_files(current_date, prev_gribfile, gribfile) - - full_file = self.path_icm - else: - full_file = gribfile - return full_file - - def _get_scratch_grib_path(self, current_date, grid): - return os.path.join( - self.config.scratch_dir, - self._get_grib_filename(grid, current_date), - ) - - def _obtain_atmos_timestep(self, gribfile): - if self.atmos_timestep is None: - self.atmos_timestep = self._get_atmos_timestep(gribfile) - - def _get_original_grib_path(self, current_date, grid): - return os.path.join( - self.original_files_path, - self._get_grib_filename(grid, current_date), - ) - - def _get_grib_filename(self, grid, month): - return "ICM{0}{1}+{2}.grb".format( - grid, self.experiment.expid, date2str(month)[:-2] - ) - - def _get_atmos_timestep(self, gribfile): - Log.info("Getting timestep...") - import cfgrib - - grib = cfgrib.open_file(gribfile) - dates = set() - valid_time = grib.variables["valid_time"] - for t in valid_time.data: - dates.add( - cf_units.num2date( - t, - valid_time.attributes["units"], - valid_time.attributes["calendar"], - ) - ) - dates = list(dates) - dates.sort() - atmos_timestep = dates[1] - dates[0] - atmos_timestep = int(atmos_timestep.total_seconds() / 3600) - self.experiment.atmos_timestep = atmos_timestep - return atmos_timestep - - def _cmorize_nc_file(self, filename): - Log.info("Processing file {0}", filename) - - if not self._contains_requested_variables(filename): - self._remove(filename) - return - - # Utils.convert2netcdf4(filename) - frequency = self._get_nc_file_frequency(filename) - Utils.rename_variables(filename, self.alt_coord_names, False) - handler = Utils.open_cdf(filename) - Cmorizer._remove_valid_limits(handler) - self._add_common_attributes(handler, frequency) - self._update_time_variables(handler) - - variables = handler.variables.keys() - handler.close() - - Log.info("Splitting file {0}", filename) - for variable in variables: - if variable in Cmorizer.NON_DATA_VARIABLES: - continue - try: - Log.debug("Checking variable {0}", variable) - self.extract_variable(filename, frequency, variable) - except Exception as ex: - Log.error( - "Variable {0} can not be cmorized: {1}", variable, ex - ) - Log.result("File {0} cmorized!", filename) - self._remove(filename) - - @staticmethod - def _remove_valid_limits(handler): - for variable in handler.variables.keys(): - var = handler.variables[variable] - if "valid_min" in var.ncattrs(): - del var.valid_min - if "valid_max" in var.ncattrs(): - del var.valid_max - handler.sync() - - def _get_nc_file_frequency(self, filename): - file_parts = os.path.basename(filename).split("_") - if self.experiment.expid in [file_parts[1], file_parts[2]]: - frequency = Frequency("m") - elif self.experiment.expid == file_parts[0]: - try: - parse_date(file_parts[1]) - frequency = Frequency("m") - except ValueError: - frequency = Frequency(file_parts[1]) - else: - frequency = Frequency(file_parts[1]) - return frequency - - def _contains_requested_variables(self, filename): - variables = Utils.get_file_variables(filename) - return self.cmor.any_required(variables) - - def extract_variable(self, file_path, frequency, variable): - """ - Extract a variable from a file and creates the CMOR file - - Parameters - ---------- - file_path:str - frequency: Frequency - variable: str - - Raises - ------ - CMORException - If the filename does not match any of the recognized patterns - - """ - alias, var_cmor = self.config.var_manager.get_variable_and_alias( - variable - ) - if var_cmor is None: - return - - if not self.cmor.cmorize(var_cmor): - return - - temp = TempFile.get() - lev_dimensions = self._set_coordinates_attribute( - file_path, var_cmor, variable - ) - self._rename_level_coords(file_path, lev_dimensions, temp, variable) - - if alias.basin is None: - region = None - else: - region = alias.basin.name - - date_str = self._get_date_str(file_path) - if date_str is None: - Log.error( - f"Variable {var_cmor.short_name,} can not be cmorized. " - "Original filename does not match a recognized pattern", - ) - raise CMORException( - f"Variable {var_cmor.domain}:{var_cmor.short_name} can not " - "be cmorized. Original filename does not match a recognized " - "pattern" - ) - - netcdf_file = NetCDFFile() - netcdf_file.data_manager = self.data_manager - netcdf_file.local_file = temp - netcdf_file.remote_file = self.config.data_convention.get_file_path( - self.startdate, - self.member, - var_cmor.domain, - var_cmor.short_name, - var_cmor, - None, - frequency, - grid=alias.grid, - year=None, - date_str=date_str, - ) - - netcdf_file.data_convention = self.config.data_convention - netcdf_file.region = region - netcdf_file.frequency = frequency - netcdf_file.domain = var_cmor.domain - netcdf_file.var = var_cmor.short_name - netcdf_file.final_name = var_cmor.short_name - - netcdf_file.prepare_to_upload(rename_var=variable) - netcdf_file.add_cmorization_history() - netcdf_file.upload() - - if region: - region_str = " (Region {})".format(region) - else: - region_str = "" - Log.info( - "Variable {0.domain}:{0.short_name} processed{1}", - var_cmor, - region_str, - ) - - def _rename_level_coords(self, file_path, lev_dimensions, temp, variable): - cube = iris.load_cube( - file_path, - iris.Constraint(cube_func=lambda c: c.var_name == variable), - ) - for lev_original, lev_target in six.iteritems(lev_dimensions): - try: - cube.coord(var_name=lev_original).var_name = lev_target - except iris.exceptions.CoordinateNotFoundError: - pass - if cube.coord("latitude").ndim > 1: - try: - cube.coord("j"), - except iris.exceptions.CoordinateNotFoundError: - cube.add_dim_coord( - DimCoord( - range(cube.shape[-2]), - var_name="j", - long_name="Cell index along second dimension", - units=1.0, - ), - len(cube.shape) - 2, - ) - try: - cube.coord("i"), - except iris.exceptions.CoordinateNotFoundError: - cube.add_dim_coord( - DimCoord( - range(cube.shape[-1]), - var_name="i", - long_name="Cell index along first dimension", - units=1.0, - ), - len(cube.shape) - 1, - ) - - iris.save(cube, temp, zlib=True) - - def _set_coordinates_attribute(self, file_path, var_cmor, variable): - handler = Utils.open_cdf(file_path) - coords = [self.lon_name, self.lat_name, "time"] - if "leadtime" in handler.variables.keys(): - coords.append("leadtime") - lev_dimensions = self._get_lev_dimensions(var_cmor) - for lev_dim in lev_dimensions.keys(): - if lev_dim in handler.variables[variable].dimensions: - coords.append(lev_dim) - handler.variables[variable].coordinates = " ".join(set(coords)) - handler.close() - return lev_dimensions - - def _get_lev_dimensions(self, var_cmor): - if var_cmor.domain == ModelingRealms.ocean: - lev_dimensions = { - "deptht": "lev", - "depthu": "lev", - "depthw": "lev", - "depthv": "lev", - "depth": "lev", - } - elif var_cmor.domain in [ModelingRealms.landIce, ModelingRealms.land]: - lev_dimensions = { - "depth": "sdepth", - "depth_2": "sdepth", - "depth_3": "sdepth", - "depth_4": "sdepth", - } - elif var_cmor.domain == ModelingRealms.atmos: - lev_dimensions = {"depth": "plev"} - else: - lev_dimensions = {} - return lev_dimensions - - def _get_date_str(self, file_path): - file_parts = os.path.basename(file_path).split("_") - valid_starts = (self.experiment.expid, "MMA", "MMASH", "MMAGG", "MMO") - if file_parts[0] in valid_starts or file_parts[0].startswith("ORCA"): - # Model output - if file_parts[-1].endswith(".tar"): - file_parts = file_parts[-1][0:-4].split("-") - return "{0}-{1}".format(file_parts[0][0:6], file_parts[1][0:6]) - else: - return "{0}-{1}".format(file_parts[2][0:6], file_parts[3][0:6]) - elif file_parts[1] == self.experiment.expid: - # Files generated by the old version of the diagnostics - return "{0}-{1}".format(file_parts[4][0:6], file_parts[5][0:6]) - else: - return None - - def _get_chunk(self, file_path): - chunk_start = parse_date(self._get_date_str(file_path).split("-")[0]) - current_date = parse_date(self.startdate) - chunk = 1 - while current_date < chunk_start: - current_date = chunk_end_date( - current_date, - self.experiment.chunk_size, - "month", - self.experiment.calendar, - ) - chunk += 1 - - if current_date != chunk_start: - raise Exception( - "File {0} start date is not a valid chunk start date".format( - file_path - ) - ) - return chunk - - def _merge_grib_files(self, current_month, prev_gribfile, gribfile): - Log.info("Merging data from different files...") - temp = TempFile.get(suffix=".grb") - Utils.cdo().selmon( - current_month.month, input=prev_gribfile, output=temp - ) - Utils.cdo().mergetime(input=[temp, gribfile], output=self.path_icm) - self._remove(prev_gribfile) - self._remove(temp) - - def _ungrib_vars(self, gribfile, month): - for var_code in self.cmor.get_requested_codes(): - file_path = "{0}_{1}.128.nc".format(gribfile, var_code) - if not os.path.exists(file_path): - continue - cube = iris.load_cube(file_path) - - cube = self._fix_time_coord(cube, var_code) - cube = cube.extract(iris.Constraint(month_number=month)) - cube = self._change_units(cube, var_code) - - for frequency in ( - Frequencies.monthly, - Frequencies.daily, - Frequency("{0}hr".format(self.atmos_timestep)), - ): - if var_code not in self.cmor.get_variables(frequency): - continue - time_cube = self._get_time_average(cube, frequency, var_code) - - levels = self.config.cmor.get_levels(frequency, var_code) - if levels: - time_cube = time_cube.extract(level=levels) - - if cube.var_name.endswith("_2"): - time_cube.var_name = cube.var_name[:-2] - - out_file = "{0}_{1}_{2}.nc".format( - gribfile, var_code, frequency - ) - time_cube.remove_coord("month_number") - time_cube.remove_coord("day_of_month") - iris.save(time_cube, out_file, zlib=True) - - def _fix_time_coord(self, cube, var_code): - time = cube.coord("time") - target_units = "days since 1950-01-01 00:00:00" - time.convert_units( - cf_units.Unit(target_units, calendar=time.units.calendar) - ) - time.units = target_units - - if var_code in ( - 144, - 146, - 147, - 169, - 175, - 176, - 177, - 179, - 180, - 181, - 182, - 201, - 202, - 205, - 212, - 228, - ): - time.points = time.points - (self.experiment.atmos_timestep / 24.0) - iris.coord_categorisation.add_day_of_month(cube, "time") - iris.coord_categorisation.add_month_number(cube, "time") - return cube - - @staticmethod - def _get_time_average(cube, frequency, var_code): - if frequency == Frequencies.monthly: - if var_code == 201: - cube = cube.aggregated_by( - ["month_number", "day_of_month"], iris.analysis.MAX - ) - elif var_code == 202: - cube = cube.aggregated_by( - ["month_number", "day_of_month"], iris.analysis.MIN - ) - cube = cube.aggregated_by(["month_number"], iris.analysis.MEAN) - elif frequency == Frequencies.daily: - if var_code == 201: - cube = cube.aggregated_by( - ["month_number", "day_of_month"], iris.analysis.MAX - ) - elif var_code == 202: - cube = cube.aggregated_by( - ["month_number", "day_of_month"], iris.analysis.MIN - ) - else: - cube = cube.aggregated_by( - ["month_number", "day_of_month"], iris.analysis.MEAN - ) - return cube - - def _change_units(self, cube, var_code): - var_name = cube.var_name - if var_code == 129: - # geopotential - cube = cube / 9.81 - cube.units = "m" - elif var_code in (146, 147, 169, 175, 176, 177, 179, 212): - # radiation - cube = cube / (self.experiment.atmos_timestep * 3600) - cube.units = "W m-2" - elif var_code in (180, 181): - # momentum flux - cube = cube / (self.experiment.atmos_timestep * 3600) - cube.units = "N m-2" - elif var_code in (144, 182, 205, 228): - # precipitation/evaporation/runoff - cube = cube * 1000 / (self.experiment.atmos_timestep * 3600) - cube.units = "kg m-2 s-1" - cube.var_name = var_name - return cube - - def _merge_and_cmorize_atmos( - self, chunk_start, chunk_end, grid, frequency - ): - merged_file = "MMA_{0}_{1}_{2}_{3}.nc".format( - frequency, date2str(chunk_start), date2str(chunk_end), grid - ) - files = glob.glob( - os.path.join( - self.config.scratch_dir, - "{0}_*_{1}.nc".format( - self._get_grib_filename(grid, chunk_start), frequency - ), - ) - ) - - def _load_cube(cube, field, filename): - if "history" in cube.attributes: - del cube.attributes["history"] - - for first_file in files: - var_files = [] - current_month = chunk_start - while current_month < chunk_end: - var_files.append( - first_file.replace( - "+{0}.grb".format(date2str(chunk_start)[:-2]), - "+{0}.grb".format(date2str(current_month)[:-2]), - ) - ) - current_month = add_months( - current_month, 1, self.experiment.calendar - ) - var_cubes = iris.load(var_files, callback=_load_cube) - iris.util.unify_time_units(var_cubes) - var_cube = var_cubes.concatenate_cube() - iris.save(var_cube, merged_file, zlib=True) - for var_file in var_files: - self._remove(var_file) - self._cmorize_nc_file(merged_file) - - def _update_time_variables(self, handler): - time_var = handler.variables["time"] - if hasattr(time_var, "calendar"): - calendar = time_var.calendar - else: - calendar = "standard" - if "time_bnds" in handler.variables: - time_var.bounds = "time_bnds" - handler.variables["time_bnds"].units = time_var.units - Utils.convert_units( - handler.variables["time_bnds"], - "days since 1850-01-01 00:00:00", - calendar, - calendar, - ) - Utils.convert_units(time_var, "days since 1850-1-1 00:00:00", calendar) - self._set_leadtime_var(handler) - - def _set_leadtime_var(self, handler): - if "leadtime" in handler.variables: - var = handler.variables["leadtime"] - else: - var = handler.createVariable("leadtime", float, "time") - var.units = "days" - var.long_name = "Time elapsed since the start of the forecast" - var.standard_name = "forecast_period" - leadtime = Utils.get_datetime_from_netcdf(handler) - startdate = parse_date(self.startdate) - leadtime = [ - datetime( - time.year, - time.month, - time.day, - time.hour, - time.minute, - time.second, - ) - - startdate - for time in leadtime - ] - for lt, lead in enumerate(leadtime): - var[lt] = lead.days - - def _add_common_attributes(self, handler, frequency): - cmor = self.config.cmor - experiment = self.config.experiment - handler.associated_experiment = cmor.associated_experiment - handler.batch = "{0}{1}".format( - experiment.institute, - datetime.now().strftime("%Y-%m-%d(T%H:%M:%SZ)"), - ) - handler.contact = ( - "Pierre-Antoine Bretonniere, pierre-antoine.bretonniere@bsc.es , " - "Javier Vegas-Regidor, javier.vegas@bsc.es " - ) - handler.Conventions = "CF-1.6" - handler.creation_date = datetime.now().strftime("%Y-%m-%d(T%H:%M:%SZ)") - handler.experiment_id = experiment.experiment_name - startdate = parse_date(self.startdate) - handler.forecast_reference_time = ( - "{0.year}-{0.month:02}-{0.day:02}" - "(T{0.hour:02}:{0.minute:02}:{0.second:02}Z)".format(startdate) - ) - handler.frequency = frequency.frequency - handler.institute_id = experiment.institute - handler.institution = experiment.institute - handler.initialization_method = cmor.initialization_method - handler.initialization_description = cmor.initialization_description - handler.physics_version = cmor.physics_version - handler.physics_description = cmor.physics_description - handler.model_id = experiment.model - handler.associated_model = cmor.associated_model - handler.project_id = self.config.data_convention.name.upper() - handler.realization = str(self.member + 1) - handler.source = cmor.source - handler.startdate = "S{0}".format(self.startdate) - handler.tracking_id = str(uuid.uuid1()) - handler.title = "{0} model output prepared for {2} {1}".format( - experiment.model, - experiment.experiment_name, - self.config.data_convention.name.upper(), - ) - - def _gribfiles_available(self): - grb_path = os.path.join(self.original_files_path, "*.grb") - gribfiles = glob.glob(grb_path) - return len(gribfiles) > 0 - - def _cmorization_required(self, chunk, domains): - if not self.config.cmor.chunk_cmorization_requested(chunk): - return False - if self.config.cmor.force: - return True - for domain in domains: - if self.data_manager.is_cmorized( - self.startdate, self.member, chunk, domain - ): - return False - return True - - -class CMORException(Exception): - """Error during cmorization""" - - pass diff --git a/earthdiagnostics/cmormanager.py b/earthdiagnostics/cmormanager.py index e9a0fc90..b83fc53b 100644 --- a/earthdiagnostics/cmormanager.py +++ b/earthdiagnostics/cmormanager.py @@ -9,7 +9,6 @@ from bscearth.utils.log import Log from earthdiagnostics.datafile import StorageStatus from earthdiagnostics.diagnostic import Diagnostic -from earthdiagnostics.cmorizer import Cmorizer from earthdiagnostics.datamanager import DataManager from earthdiagnostics.frequency import Frequencies from earthdiagnostics.modelingrealm import ModelingRealms @@ -232,7 +231,6 @@ class CMORManager(DataManager): member, chunk, grid=None, - region=None, box=None, frequency=None, vartype=VariableType.MEAN, @@ -249,7 +247,6 @@ class CMORManager(DataManager): member: int chunk: int grid: str or None - region: Basin or None box: Box or None frequency: Frequency or None vartype: VariableType @@ -284,7 +281,6 @@ class CMORManager(DataManager): final_name, cmor_var, self.config.data_convention, - region, diagnostic, grid, vartype, @@ -348,7 +344,6 @@ class CMORManager(DataManager): final_name, cmor_var, self.config.data_convention, - None, diagnostic, grid, vartype, diff --git a/earthdiagnostics/datafile.py b/earthdiagnostics/datafile.py index c6e47284..7e9e4ffa 100644 --- a/earthdiagnostics/datafile.py +++ b/earthdiagnostics/datafile.py @@ -53,7 +53,6 @@ class DataFile(Publisher): self.domain = None self.var = None self.cmor_var = None - self.region = None self.frequency = None self.data_convention = None self.diagnostic = None @@ -259,8 +258,7 @@ class DataFile(Publisher): Prepare a local file to be uploaded This includes renaming the variable if necessary, - updating the metadata and adding the history and - managing the possibility of multiple regions + updating the metadata and adding the history """ if rename_var: original_name = rename_var @@ -272,7 +270,6 @@ class DataFile(Publisher): ) self._rename_coordinate_variables() self._correct_metadata() - self._prepare_region() self.add_basin_history() self.add_diagnostic_history() Utils.convert2netcdf4(self.local_file) @@ -437,70 +434,6 @@ class DataFile(Publisher): var_handler.valid_max = ( float(var_handler.valid_max) * factor + offset ) - - def _prepare_region(self): - if not self.check_is_in_storage(update_status=False): - return - cube = iris.load_cube(self.local_file) - try: - cube.coord("region") - except iris.exceptions.CoordinateNotFoundError: - return - old_cube = None - for path in (self.remote_diags_file, self.remote_file, - self.remote_cmor_file): - try: - old_cube = iris.load_cube(path) - except Exception: - pass - else: - break - if old_cube is None: - return - - new_data = {} - for region_slice in cube.slices_over("region"): - Log.debug(region_slice.coord("region").points[0]) - new_data[region_slice.coord("region").points[0]] = region_slice - for region_slice in old_cube.slices_over("region"): - region = region_slice.coord("region").points[0] - Log.debug(region) - if region not in new_data: - new_data[region] = region_slice - cube_list = iris.cube.CubeList( - [new_data[region] for region in sorted(new_data)] - ) - if len(cube_list) == 1: - return - iris.util.equalise_attributes(cube_list) - iris.util.unify_time_units(cube_list) - final_cube = cube_list.merge_cube() - temp = TempFile.get() - iris.save(final_cube, temp, zlib=True) - handler = Utils.open_cdf(temp) - renames = {} - for dim in handler.dimensions: - if dim.startswith('dim'): - renames[dim] = 'region' - if dim.startswith('string'): - renames[dim] = 'region_length' - if '-' in final_cube.var_name: - renames[final_cube.var_name.replace('-', '_')] = \ - final_cube.var_name - - Utils.rename_variables( - temp, renames, must_exist=False, rename_dimension=True) - Utils.move_file(temp, self.local_file) - handler2 = Utils.open_cdf(self.local_file) - region_var = handler2.variables['region'] - for i, cube in enumerate(cube_list): - encode = 'utf-8' - name = region_var[i, ...].tobytes().strip().decode(encode) - length = handler2.dimensions['region_length'].size - region_var[i, ...] = netCDF4.stringtoarr(name, length) - handler2.close() - self._correct_metadata() - def _rename_coordinate_variables(self): variables = dict() variables['x'] = 'i' diff --git a/earthdiagnostics/datamanager.py b/earthdiagnostics/datamanager.py index c938154e..3774a697 100644 --- a/earthdiagnostics/datamanager.py +++ b/earthdiagnostics/datamanager.py @@ -42,7 +42,6 @@ class DataManager(object): final_var, cmor_var, data_convention, - region, diagnostic, grid, var_type, @@ -61,7 +60,6 @@ class DataManager(object): file_object.var = original_var file_object.final_name = final_var file_object.cmor_var = cmor_var - file_object.region = region file_object.storage_status = StorageStatus.PENDING return file_object @@ -207,7 +205,6 @@ class DataManager(object): member, chunk, grid=None, - region=None, box=None, frequency=None, vartype=VariableType.MEAN, @@ -224,7 +221,6 @@ class DataManager(object): member: int chunk: int grid: str or None, optional - region: Basin or None, optional box: Box or None, optional frequency: Frequency or None, optional vartype: VariableType, optional diff --git a/earthdiagnostics/diagnostic.py b/earthdiagnostics/diagnostic.py index 2d461cfd..7f3a4270 100644 --- a/earthdiagnostics/diagnostic.py +++ b/earthdiagnostics/diagnostic.py @@ -211,7 +211,6 @@ class Diagnostic(Publisher): member, chunk, grid=None, - region=None, box=None, frequency=None, vartype=VariableType.MEAN, @@ -227,7 +226,6 @@ class Diagnostic(Publisher): member: int or None chunk: int or None grid: str or None - region: Basin or None box: Box or None frequency: Frequency or None vartype: VariableType @@ -237,8 +235,6 @@ class Diagnostic(Publisher): DataFile """ - if isinstance(region, Basin): - region = region.name generated_chunk = self.data_manager.declare_chunk( domain, var, @@ -246,14 +242,11 @@ class Diagnostic(Publisher): member, chunk, grid, - region, box, diagnostic=self, vartype=vartype, frequency=frequency, ) - # if region is not None: - # generated_chunk.add_modifier(self) self._generated_files.append(generated_chunk) return generated_chunk diff --git a/earthdiagnostics/obsreconmanager.py b/earthdiagnostics/obsreconmanager.py index ad74f5a9..5f45a1dd 100644 --- a/earthdiagnostics/obsreconmanager.py +++ b/earthdiagnostics/obsreconmanager.py @@ -162,7 +162,6 @@ class ObsReconManager(DataManager): member, chunk, grid=None, - region=None, box=None, frequency=None, vartype=VariableType.MEAN, @@ -179,7 +178,6 @@ class ObsReconManager(DataManager): member: int chunk: int grid: str or None, optional - region: Basin or None, optional box: Box or None, optional frequency: Frequency or None, optional vartype: VariableType, optional @@ -206,7 +204,6 @@ class ObsReconManager(DataManager): final_name, cmor_var, self.config.data_convention, - region, diagnostic, grid, vartype, diff --git a/earthdiagnostics/ocean/regionmean.py b/earthdiagnostics/ocean/regionmean.py index e2d2db10..2a8c9cfa 100644 --- a/earthdiagnostics/ocean/regionmean.py +++ b/earthdiagnostics/ocean/regionmean.py @@ -308,7 +308,6 @@ class RegionMean(Diagnostic): self.member, self.chunk, box=box_save, - region=self.basins, frequency=self.frequency, ) diff --git a/earthdiagnostics/threddsmanager.py b/earthdiagnostics/threddsmanager.py index aeec37cb..a7ba36f1 100644 --- a/earthdiagnostics/threddsmanager.py +++ b/earthdiagnostics/threddsmanager.py @@ -327,7 +327,6 @@ class THREDDSManager(DataManager): member, chunk, grid=None, - region=None, box=None, frequency=None, vartype=VariableType.MEAN, @@ -338,7 +337,6 @@ class THREDDSManager(DataManager): and returns the path to the scratch's copy :param diagnostic: - :param region: :param domain: CMOR domain :type domain: Domain :param var: variable name diff --git a/test/integration/__init__.py b/test/integration/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/test/integration/test_cmorizer.py b/test/integration/test_cmorizer.py deleted file mode 100644 index 29fab22b..00000000 --- a/test/integration/test_cmorizer.py +++ /dev/null @@ -1,727 +0,0 @@ -"""Tests for earthdiagnostics.cmorizer module""" -from earthdiagnostics.cmorizer import Cmorizer -from earthdiagnostics.utils import TempFile, Utils -from earthdiagnostics.data_convention import DataConvention -from bscearth.utils import log - -from unittest import TestCase -from mock import Mock, create_autospec -import os -import tempfile -import shutil -import iris -import iris.cube -from iris.coords import DimCoord -import tarfile -import numpy as np -import six -import calendar - - -class TestCmorizer(TestCase): - """Tests for Cmorizer class""" - - def _get_variable_and_alias(self, variable, silent=False): - mock_alias = Mock() - mock_alias.basin = None - mock_alias.grid = None - - mock_variable = self._get_variable(variable) - - return mock_alias, mock_variable - - def _get_variable(self, variable, silent=False): - mock_variable = Mock() - mock_variable.short_name = variable - mock_variable.domain = "domain" - - return mock_variable - - def _get_file_path(self, *args, **kwargs): - return os.path.join(self.tmp_dir, args[3], "{0[3]}.nc".format(args)) - - def _get_file_path_grib(self, *args, **kwargs): - return os.path.join( - self.tmp_dir, args[3], str(args[6]), "{0[3]}.nc".format(args) - ) - - def setUp(self): - """Prepare tests""" - self.tmp_dir = tempfile.mkdtemp() - - self.data_manager = Mock() - - self.data_manager.is_cmorized.return_value = False - self.data_manager.config.data_dir = os.path.join(self.tmp_dir, "data") - self.data_manager.config.scratch_dir = os.path.join( - self.tmp_dir, "scratch" - ) - TempFile.scratch_folder = self.data_manager.config.scratch_dir - self.data_manager.config.data_convention = create_autospec( - DataConvention - ) - self.data_manager.config.data_convention.name = "data_convention" - self.data_manager.config.data_convention.lat_name = "lat" - self.data_manager.config.data_convention.lon_name = "lon" - self.data_manager.config.data_convention.get_file_path = ( - self._get_file_path - ) - - self.data_manager.config.var_manager.get_variable_and_alias = ( - self._get_variable_and_alias - ) - self.data_manager.config.var_manager.get_variable = self._get_variable - self.data_manager.variable_list = self.data_manager.config.var_manager - - self.data_manager.config.experiment.expid = "expid" - self.data_manager.config.experiment.model = "model" - self.data_manager.config.experiment.experiment_name = "experiment_name" - self.data_manager.config.experiment.num_chunks = 1 - self.data_manager.config.experiment.chunk_size = 1 - self.data_manager.config.experiment.institute = "institute" - self.data_manager.config.experiment.get_member_str.return_value = ( - "member" - ) - self.data_manager.config.experiment.atmos_timestep = 6 - - self.data_manager.config.cmor.force = False - self.data_manager.config.cmor.ocean = True - self.data_manager.config.cmor.atmosphere = True - self.data_manager.config.cmor.use_grib = True - self.data_manager.config.cmor.filter_files = [] - self.data_manager.config.cmor.associated_experiment = ( - "associated_experiment" - ) - self.data_manager.config.cmor.initialization_method = ( - "initialization_method" - ) - self.data_manager.config.cmor.initialization_description = ( - "initialization_description" - ) - self.data_manager.config.cmor.physics_version = "physics_version" - self.data_manager.config.cmor.physics_description = ( - "physics_description" - ) - self.data_manager.config.cmor.initialization_description = ( - "initialization_description" - ) - self.data_manager.config.cmor.associated_model = ( - "initialization_description" - ) - self.data_manager.config.cmor.source = "source" - self.data_manager.config.cmor.get_requested_codes.return_value = { - 228, - 142, - 143, - 201, - 202, - 129, - 169, - 180, - } - self.data_manager.config.cmor.get_variables.return_value = { - 228, - 142, - 143, - 201, - 202, - 129, - 169, - 180, - } - self.data_manager.config.cmor.get_levels.return_value = None - - os.makedirs(self.data_manager.config.data_dir) - os.makedirs(self.data_manager.config.scratch_dir) - - def _create_ocean_files( - self, filename, tar_name, gzip=False, backup=False - ): - folder_path = os.path.join( - self.data_manager.config.data_dir, - "expid", - "original_files", - "19900101", - "member", - "outputs", - ) - file_path, filename = self._create_file(folder_path, filename, gzip) - - if backup: - filename = os.path.join("backup", filename) - - tar = tarfile.TarFile(os.path.join(folder_path, tar_name), mode="w") - tar.add(file_path, arcname=filename, recursive=False) - tar.close() - os.remove(file_path) - - def _create_mma_files(self, filename, tar_name, gzip=False): - folder_path = os.path.join( - self.data_manager.config.data_dir, - "expid", - "original_files", - "19900101", - "member", - "outputs", - ) - filepath_gg, filename_gg = self._create_file( - folder_path, filename.replace("??", "GG"), gzip - ) - filepath_sh, filename_sh = self._create_file( - folder_path, filename.replace("??", "SH"), gzip - ) - - tar = tarfile.TarFile(os.path.join(folder_path, tar_name), mode="w") - tar.add(filepath_gg, arcname=filename_gg, recursive=False) - tar.add(filepath_sh, arcname=filename_sh, recursive=False) - tar.close() - os.remove(filepath_gg) - os.remove(filepath_sh) - - def _create_file(self, folder_path, filename, gzip): - var1 = self._create_sample_cube( - "Variable 1", "var1", threed=False, time_bounds=True - ) - var2 = self._create_sample_cube( - "Variable 2", "var2", threed=True, time_bounds=True - ) - if not os.path.isdir(folder_path): - os.makedirs(folder_path) - file_path = os.path.join(folder_path, filename) - iris.save((var1, var2), file_path, zlib=True) - if gzip: - import subprocess - - process = subprocess.Popen( - ("gzip", file_path), stdout=subprocess.PIPE - ) - comunicate = process.communicate() - file_path = "{0}.gz".format(file_path) - filename = "{0}.gz".format(filename) - if process.returncode != 0: - raise Exception("Can not compress: {0}".format(comunicate)) - return file_path, filename - - def _create_sample_cube(self, long_name, var_name, threed, time_bounds): - coord_data = np.array([1, 2], np.float) - lat = DimCoord( - coord_data, - standard_name="latitude", - long_name="latitude", - var_name="lat", - units="degrees_north", - ) - lon = DimCoord( - coord_data, - standard_name="longitude", - long_name="longitude", - var_name="lon", - units="degrees_east", - ) - time = DimCoord( - coord_data, - standard_name="time", - long_name="time", - var_name="time", - units="days since 1950-01-01", - ) - if time_bounds: - time.bounds = np.array([[0.5, 1.5], [1.5, 2.5]], np.float) - - if threed: - data = np.random.rand(2, 2, 2, 2).astype(np.float) - depth = DimCoord( - coord_data, - standard_name="depth", - long_name="Depth", - var_name="lev", - units="m", - ) - else: - data = np.random.rand(2, 2, 2).astype(np.float) - - cube = iris.cube.Cube(data, long_name=long_name, var_name=var_name) - cube.add_dim_coord(time, 0) - cube.add_dim_coord(lat, 1) - cube.add_dim_coord(lon, 2) - if threed: - cube.add_dim_coord(depth, 3) - return cube - - def tearDown(self): - """Clean up after tests""" - shutil.rmtree(self.tmp_dir) - - def _test_ocean_cmor( - self, - success=True, - error=False, - critical=False, - warnings=False, - message="", - check_vars=None, - ): - self._test_cmorization( - success=success, - error=error, - critical=critical, - warnings=warnings, - message=message, - ocean=True, - check_vars=check_vars, - ) - - def _test_atmos_cmor( - self, - success=True, - error=False, - critical=False, - warnings=False, - message="", - check_vars=None, - ): - self._test_cmorization( - success=success, - error=error, - critical=critical, - warnings=warnings, - message=message, - ocean=False, - check_vars=check_vars, - ) - - def _test_cmorization( - self, - success=True, - error=False, - critical=False, - warnings=False, - message="", - ocean=True, - check_vars=None, - ): - self._check_logs(critical, error, message, ocean, success, warnings) - if check_vars: - for variable, status in six.iteritems(check_vars): - if status: - self.assertTrue( - os.path.isfile( - os.path.join( - self.tmp_dir, - variable, - "{}.nc".format(variable), - ) - ) - ) - else: - self.assertFalse( - os.path.isfile( - os.path.join( - self.tmp_dir, - variable, - "{}.nc".format(variable), - ) - ) - ) - - def _check_logs(self, critical, error, message, ocean, success, warnings): - if six.PY3: - with self.assertLogs(log.Log.log) as cmd: - cmorizer = Cmorizer(self.data_manager, "19900101", 0) - if ocean: - cmorizer.cmorize_ocean() - else: - cmorizer.cmorize_atmos() - if message: - self.assertTrue( - [ - record - for record in cmd.records - if record.message == message - ] - ) - else: - for level, value in six.iteritems( - { - log.Log.RESULT: success, - log.Log.ERROR: error, - log.Log.CRITICAL: critical, - log.Log.WARNING: warnings, - } - ): - try: - if value: - self.assertTrue( - [ - record - for record in cmd.records - if record.levelno == level - ] - ) - else: - self.assertFalse( - [ - record - for record in cmd.records - if record.levelno == level - ] - ) - except AssertionError: - print(cmd.records) - raise - else: - cmorizer = Cmorizer(self.data_manager, "19900101", 0) - if ocean: - cmorizer.cmorize_ocean() - else: - cmorizer.cmorize_atmos() - - def test_skip_ocean_cmorization(self): - """Test ocean cmorization flag disabled option""" - self.data_manager.config.cmor.ocean = False - self._test_ocean_cmor( - message="Skipping ocean cmorization due to configuration" - ) - - def test_skip_atmos_cmorization(self): - """Test atmos cmorization flag disabled option""" - self.data_manager.config.cmor.atmosphere = False - if six.PY3: - with self.assertLogs(log.Log.log) as cmd: - cmorizer = Cmorizer(self.data_manager, "19900101", 0) - cmorizer.cmorize_atmos() - self.assertTrue( - [ - record - for record in cmd.records - if record.message - == "Skipping atmosphere cmorization due to configuration" - ] - ) - else: - cmorizer = Cmorizer(self.data_manager, "19900101", 0) - cmorizer.cmorize_ocean() - - def test_skip_when_cmorized(self): - """Test cmorization skipped if already done""" - self._create_ocean_files( - "expid_1d_19900101_19900131.nc", "MMO_19900101-19900131.tar" - ) - self.data_manager.is_cmorized.return_value = True - self._test_ocean_cmor(message="No need to unpack file 1/1") - - def test_skip_when_not_requested(self): - """Test cmorization skipped if chunk is not requested""" - self._create_ocean_files( - "expid_1d_19900101_19900131.nc", "MMO_19900101-19900131.tar" - ) - chunk = self.data_manager.config.cmor.chunk_cmorization_requested - chunk.return_value = False - self._test_ocean_cmor(message="No need to unpack file 1/1") - - def test_force(self): - """Test cmorization force works""" - self._create_ocean_files( - "expid_1d_19900101_19900131.nc", "MMO_19900101-19900131.tar" - ) - self.data_manager.is_cmorized.return_value = True - self.data_manager.config.cmor.force = True - self._test_ocean_cmor() - - def test_ocean_cmorization_no_files(self): - """Test ocean cmorization report error if no input data""" - self._test_ocean_cmor(success=False, error=True) - - def test_ocean_cmorization_not_vars_requested(self): - """Test ocean cmorization report success if no vars qhere requested""" - self._create_ocean_files( - "expid_1d_19900101_19900131.nc", "MMO_19900101-19900131.tar" - ) - self.data_manager.config.cmor.any_required.return_value = False - self._test_ocean_cmor(check_vars={"var1": False, "var2": False}) - - def test_ocean_cmorization_no_vars_recognized(self): - """Test ocean cmorization report success if no vars where recognized""" - self._create_ocean_files( - "expid_1d_19900101_19900131.nc", "MMO_19900101-19900131.tar" - ) - - def not_recognized(*args): - return None, None - - self.data_manager.config.var_manager.get_variable_and_alias = ( - not_recognized - ) - self._test_ocean_cmor(check_vars={"var1": False, "var2": False}) - - def test_ocean_cmorization_var2_not_requested(self): - """Test ocean cmorization with var2 not recognized""" - self._create_ocean_files( - "expid_1d_19900101_19900131.nc", "MMO_19900101-19900131.tar" - ) - - def _reject_var2(cmor_var): - return cmor_var.short_name != "var2" - - self.data_manager.config.cmor.cmorize = _reject_var2 - self._test_ocean_cmor(check_vars={"var1": True, "var2": False}) - - def test_ocean_cmorization(self): - """Test basic ocean cmorization""" - self._create_ocean_files( - "expid_1d_19900101_19900131.nc", "MMO_19900101-19900131.tar" - ) - self._test_ocean_cmor(check_vars={"var1": True, "var2": True}) - - def test_ocean_cmorization_with_filter(self): - """Test ocean cmorization filtering files""" - self._create_ocean_files( - "expid_1d_19900101_19900131.nc", "MMO_19900101-19900131.tar" - ) - self.data_manager.config.cmor.filter_files = ["expid"] - self._test_ocean_cmor(check_vars={"var1": True, "var2": True}) - - def test_ocean_cmorization_with_bad_filter(self): - """Test ocean cmorization fails if a bad filter is added""" - self._create_ocean_files( - "expid_1d_19900101_19900131.nc", "MMO_19900101-19900131.tar" - ) - self.data_manager.config.cmor.filter_files = ["badfilter"] - self._test_ocean_cmor( - success=False, - error=True, - check_vars={"var1": False, "var2": False}, - ) - - def test_ocean_cmorization_gzip(self): - """Test ocean cmorization if tars are also zipped""" - self._create_ocean_files( - "expid_1d_19900101_19900131.nc", - "MMO_19900101-19900131.tar", - gzip=True, - ) - self._test_ocean_cmor(check_vars={"var1": True, "var2": True}) - - def test_ocean_cmorization_backup(self): - """Test ocean cmorization when files are in backup path""" - self._create_ocean_files( - "expid_1d_19900101_19900131.nc", - "MMO_19900101-19900131.tar", - backup=True, - ) - self._test_ocean_cmor(check_vars={"var1": True, "var2": True}) - - def test_ocean_cmorization_ppo(self): - """Test ocean cmorization when files are PPO""" - self._create_ocean_files( - "expid_1d_19900101_19900131.nc", - "PPO_expid_1D_xx_19900101_19900131.tar", - ) - self._test_ocean_cmor(check_vars={"var1": True, "var2": True}) - - def test_ocean_cmorization_diags(self): - """Test ocean cmorization when files are diags""" - self._create_ocean_files( - "expid_1d_19900101_19900131.nc", - "diags_expid_1D_xx_19900101_19900131.tar", - ) - self._test_ocean_cmor(check_vars={"var1": True, "var2": True}) - - def test_atmos_cmorization(self): - """Test basic atmos cmorization from nc""" - self._create_mma_files( - "MMA_1d_??_19900101_19900131.nc", - "MMA_expid_19901101_fc0_19900101-19900131.tar", - ) - self._test_atmos_cmor(check_vars={"var1": True, "var2": True}) - - def test_skip_when_not_requested_mma(self): - """Test atmos cmorization is skipped if chunk is not requested""" - self._create_mma_files( - "MMA_1d_??_19900101_19900131.nc", - "MMA_expid_19901101_fc0_19900101-19900131.tar", - ) - chunk = self.data_manager.config.cmor.chunk_cmorization_requested - chunk.return_value = (False) - self._test_atmos_cmor(message="No need to unpack file 1/1") - - def test_force_mma(self): - """Test force atmos cmorization""" - self._create_mma_files( - "MMA_1d_??_19900101_19900131.nc", - "MMA_expid_19901101_fc0_19900101-19900131.tar", - ) - self.data_manager.is_cmorized.return_value = True - self.data_manager.config.cmor.force = True - self._test_atmos_cmor() - - def test_atmos_cmorization_no_mma_files(self): - """Test atmos cmorization report error if there are no files""" - self._test_atmos_cmor(success=False, error=True) - - def _create_grib_files(self, filename, month): - filename = filename.format(month) - coord_data = np.array([0, 1], np.float) - folder_path = os.path.join( - self.data_manager.config.data_dir, - "expid", - "original_files", - "19900101", - "member", - "outputs", - ) - self._create_file_for_grib( - coord_data, - folder_path, - filename.replace("??", "GG"), - [142, 143, 129, 169, 180, 228], - month, - ) - self._create_file_for_grib( - coord_data, - folder_path, - filename.replace("??", "SH"), - [201, 202], - month, - ) - - def _create_file_for_grib( - self, coord_data, folder_path, filename, codes, month - ): - lat = DimCoord( - coord_data, - standard_name="latitude", - long_name="latitude", - var_name="lat", - units="degrees_north", - ) - lon = DimCoord( - coord_data, - standard_name="longitude", - long_name="longitude", - var_name="lon", - units="degrees_east", - ) - month_days = calendar.monthrange(1990, month)[1] - month -= 1 - time_data = ( - np.arange(0.25, month_days + 0.25, 0.25, np.float) + month * 31 - ) - time = DimCoord( - time_data, - standard_name="time", - long_name="time", - var_name="time", - units="days since 1990-01-01 00:00:00", - ) - variables = [] - for code in codes: - var = iris.cube.Cube( - np.ones((month_days * 4, 2, 2), np.float) * code, - long_name="Variable {}".format(code), - var_name="var{}".format(code), - ) - for x, data in enumerate(time_data): - var.data[x, ...] += data - var.add_dim_coord(time, 0) - var.add_dim_coord(lat, 1) - var.add_dim_coord(lon, 2) - var.attributes["table"] = np.int32(128) - var.attributes["code"] = np.int32(code) - variables.append(var) - - if not os.path.isdir(folder_path): - os.makedirs(folder_path) - file_path = os.path.join(folder_path, filename) - iris.save( - variables, file_path, zlib=True, local_keys=("table", "code") - ) - Utils.cdo().settaxis( - "1990-0{}-01,06:00,6hour".format(month + 1), - input=file_path, - output=file_path.replace(".nc", ".grb"), - options="-f grb2", - ) - os.remove(file_path) - - def test_grib_cmorization(self): - """Test atmos cmorization from grib""" - self.data_manager.config.data_convention.get_file_path = ( - self._get_file_path_grib - ) - self.data_manager.config.experiment.chunk_size = 2 - self.data_manager.get_file_path = self._get_file_path_grib - self._create_grib_files("ICM??expid+19900{}.nc", 1) - self._create_grib_files("ICM??expid+19900{}.nc", 2) - self._test_atmos_cmor() - variables = { - "CP": 143, - "EWSS": 180, - "LSP": 142, - "MN2T": 202, - "MX2T": 201, - "SSRD": 169, - "TP": 228, - "Z": 129, - } - for var, code in six.iteritems(variables): - self.assertTrue(os.path.isdir(os.path.join(self.tmp_dir, var))) - base_data = np.ones((2, 2), np.float) * code - - if var in ("EWSS", "TP", "MN2T", "MX2T", "SSRD", "TP"): - if var == "MX2T": - month_offsets = np.array((16, 45.5)) - daily_offsets = np.arange(1.0, 60.0) - elif var == "MN2T": - month_offsets = np.array((15.25, 44.75)) - daily_offsets = np.arange(0.25, 59.25) - else: - month_offsets = np.array((15.625, 45.125)) - daily_offsets = np.arange(0.625, 59.625) - - hourly_offsets = np.arange(0.25, 59.25, 0.25) - else: - month_offsets = np.array((15.5, 44.875)) - daily_offsets = np.arange(0.375, 59.375) - daily_offsets[0] = 0.5 - hourly_offsets = np.arange(0.25, 59, 0.25) - - if code == 129: - factor = 9.81 - elif code in (180, 169): - factor = 6 * 3600.0 - elif code in (228, ): - factor = 6 * 3.6 - else: - factor = 1.0 - - base_data /= factor - month_offsets /= factor - daily_offsets /= factor - hourly_offsets /= factor - - monthly = iris.load_cube( - os.path.join(self.tmp_dir, var, "mon", "{}.nc".format(var)) - ) - self._test_data(monthly, base_data, month_offsets, var, "Month") - - daily = iris.load_cube( - os.path.join(self.tmp_dir, var, "day", "{}.nc".format(var)) - ) - self._test_data(daily, base_data, daily_offsets, var, "Day") - - hourly = iris.load_cube( - os.path.join(self.tmp_dir, var, "6hr", "{}.nc".format(var)) - ) - self._test_data(hourly, base_data, hourly_offsets, var, "Hour") - - def _test_data(self, data, base_data, offsets, var, freq): - self.assertEqual(data.coord("time").shape, (len(offsets),)) - for x, offset in enumerate(offsets): - self.assertTrue( - np.allclose(data.data[x, ...], base_data + offset), - "{} {} data wrong for {}: {} {} {}".format( - freq, x, var, data.data[x, ...] - base_data, - data.data[x, ...], base_data - ), - ) diff --git a/test/unit/test_cmormanager.py b/test/unit/test_cmormanager.py index d9b2811f..1086c7e5 100644 --- a/test/unit/test_cmormanager.py +++ b/test/unit/test_cmormanager.py @@ -155,30 +155,6 @@ class TestCMORManager(TestCase): ) ) - @mock.patch("earthdiagnostics.cmormanager.Cmorizer", autospec=True) - def test_prepare_cmorize(self, mock_cmor): - mock_instance = mock_cmor.return_value - self.convention.is_cmorized.return_value = False - cmor_manager = CMORManager(self.config) - self.config.experiment.get_member_list.return_value = ( - ("20000101", 2), - ) - cmor_manager.prepare() - mock_instance.cmorize_ocean.assert_called_once() - mock_instance.cmorize_atmos.assert_called_once() - - @mock.patch("earthdiagnostics.cmormanager.Cmorizer", autospec=True) - def test_prepare_cmorize_force(self, mock_cmor): - mock_instance = mock_cmor.return_value - self.config.cmor.force = True - cmor_manager = CMORManager(self.config) - self.config.experiment.get_member_list.return_value = ( - ("20000101", 2), - ) - cmor_manager.prepare() - mock_instance.cmorize_ocean.assert_called_once() - mock_instance.cmorize_atmos.assert_called_once() - @mock.patch("earthdiagnostics.cmormanager.CMORManager.is_cmorized") @mock.patch("earthdiagnostics.utils.Utils.unzip") @mock.patch("earthdiagnostics.utils.Utils.untar") -- GitLab