From 0a841767893e09d859b3b00b6a55b0c59b9a1c01 Mon Sep 17 00:00:00 2001 From: sloosvel Date: Fri, 25 Mar 2022 09:18:01 +0100 Subject: [PATCH] Revert "Remove references to region and remove cmorizer" This reverts commit f0dc9eadeecfe4a2f76a2c8434742e854ca82ac9. --- earthdiagnostics/cmorizer.py | 1129 ++++++++++++++++++++++++++ earthdiagnostics/cmormanager.py | 5 + earthdiagnostics/datafile.py | 69 +- earthdiagnostics/datamanager.py | 4 + earthdiagnostics/diagnostic.py | 7 + earthdiagnostics/obsreconmanager.py | 3 + earthdiagnostics/ocean/regionmean.py | 1 + earthdiagnostics/threddsmanager.py | 2 + test/integration/__init__.py | 0 test/integration/test_cmorizer.py | 727 +++++++++++++++++ test/unit/test_cmormanager.py | 24 + 11 files changed, 1970 insertions(+), 1 deletion(-) create mode 100644 earthdiagnostics/cmorizer.py create mode 100644 test/integration/__init__.py create mode 100644 test/integration/test_cmorizer.py diff --git a/earthdiagnostics/cmorizer.py b/earthdiagnostics/cmorizer.py new file mode 100644 index 00000000..6cb10a3d --- /dev/null +++ b/earthdiagnostics/cmorizer.py @@ -0,0 +1,1129 @@ +# coding=utf-8 +"""Cmorization classes""" +import glob +import os +import shutil +import uuid +import traceback +import time +from datetime import datetime + +import six +from bscearth.utils.date import ( + parse_date, + chunk_end_date, + previous_day, + date2str, + add_months, +) +from bscearth.utils.log import Log +from cdo import CDOException +import iris +from iris.coords import DimCoord +import iris.coord_categorisation +import iris.analysis +import iris.util +import iris.exceptions +import cf_units + +from earthdiagnostics.datafile import NetCDFFile +from earthdiagnostics.frequency import Frequency, Frequencies +from earthdiagnostics.modelingrealm import ModelingRealms +from earthdiagnostics.utils import TempFile, Utils + + +class Cmorizer(object): + """ + Class to manage CMORization + + Parameters + ---------- + data_manager: DataManager + startdate: str + member: int + + """ + + NON_DATA_VARIABLES = ( + "lon", + "lat", + "longitude", + "latitude", + "plev", + "time", + "time_bnds", + "leadtime", + "lev", + "lev_2", + "icethi", + "deptht", + "depthu", + "depthw", + "depthv", + "time_centered", + "time_centered_bounds", + "deptht_bounds", + "depthu_bounds", + "depthv_bounds", + "depthw_bounds", + "deptht_bnds", + "depthu_bnds", + "depthv_bnds", + "depthw_bnds", + "time_counter_bounds", + "ncatice", + "nav_lat_grid_V", + "nav_lat_grid_U", + "nav_lat_grid_T", + "nav_lon_grid_V", + "nav_lon_grid_U", + "nav_lon_grid_T", + "depth", + "depth_2", + "depth_3", + "depth_4", + "depth_bnds", + "depth_2_bnds", + "depth_3_bnds", + "depth_4_bnds", + "mlev", + "hyai", + "hybi", + "hyam", + "hybm", + ) + + def __init__(self, data_manager, startdate, member): + self.data_manager = data_manager + + self.startdate = startdate + self.member = member + self.config = data_manager.config + self.experiment = self.config.experiment + self.cmor = self.config.cmor + self.convetion = self.config.data_convention + self.member_str = self.experiment.get_member_str(member) + self.original_files_path = os.path.join( + self.config.data_dir, + self.experiment.expid, + "original_files", + self.startdate, + self.member_str, + "outputs", + ) + self.atmos_timestep = None + self.cmor_scratch = str( + os.path.join( + self.config.scratch_dir, + "CMOR", + self.startdate, + self.member_str, + ) + ) + + self.lon_name = self.config.data_convention.lon_name + self.lat_name = self.config.data_convention.lat_name + + self.alt_coord_names = { + "time_counter": "time", + "time_counter_bnds": "time_bnds", + "time_counter_bounds": "time_bnds", + "tbnds": "bnds", + "nav_lat": self.lat_name, + "nav_lon": self.lon_name, + "x": "i", + "y": "j", + } + + @property + def path_icm(self): + """Path to the ICM file""" + return os.path.join(self.config.scratch_dir, "ICM") + + def cmorize_ocean(self): + """Cmorize ocean files from MMO files""" + if not self.cmor.ocean: + Log.info("Skipping ocean cmorization due to configuration") + return True + Log.info("\nCMORizing ocean\n") + return self._cmorize_ocean_files("MMO", "PPO", "diags") + + def _cmorize_ocean_files(self, *args): + tar_files = () + for prefix in args: + tar_folder = os.path.join( + self.original_files_path, "{0}*".format(prefix) + ) + tar_files = glob.glob(tar_folder) + tar_files.sort() + if len(tar_files) > 0: + break + if not len(tar_files): + Log.error( + "No {1} files found in {0}".format( + self.original_files_path, args + ) + ) + return False + + count = 1 + result = True + for tarfile in tar_files: + if not self._cmorization_required( + self._get_chunk(os.path.basename(tarfile)), + ( + ModelingRealms.ocean, + ModelingRealms.seaIce, + ModelingRealms.ocnBgchem, + ), + ): + Log.info( + "No need to unpack file {0}/{1}".format( + count, len(tar_files) + ) + ) + count += 1 + else: + Log.info( + "Unpacking oceanic file {0}/{1}".format( + count, len(tar_files) + ) + ) + try: + self._unpack_tar_file(tarfile) + self._cmorize_nc_files() + Log.result( + "Oceanic file {0}/{1} finished".format( + count, len(tar_files) + ) + ) + count += 1 + except Exception as ex: + Log.error( + "Could not CMORize oceanic file {0}: {1}", count, ex + ) + result = False + count += 1 + return result + + def _filter_files(self, file_list): + if not self.cmor.filter_files: + return file_list + filtered = list() + for file_path in file_list: + filename = os.path.basename(file_path) + if any(f in filename for f in self.cmor.filter_files): + filtered.append(file_path) + else: + self._remove(file_path) + if not filtered: + raise CMORException( + f"Filters {self.cmor.filter_files} do not match any file", + ) + return filtered + + def _remove(self, file_path): + os.remove(file_path) + + def _cmorize_nc_files(self): + nc_files = glob.glob(os.path.join(self.cmor_scratch, "*.nc")) + for filename in self._filter_files(nc_files): + self._cmorize_nc_file(filename) + self._clean_cmor_scratch() + + def _correct_fluxes(self): + fluxes_vars = [ + self.data_manager.variable_list.get_variable( + cmor_var, True + ).short_name + for cmor_var in ( + "prc", + "prs", + "prsn", + "rss", + "rls", + "rsscs", + "rsds", + "rlds", + "hfss", + "hfls", + ) + ] + change_sign_vars = [ + self.data_manager.variable_list.get_variable( + cmor_var, True + ).short_name + for cmor_var in ("hfss", "hfls") + ] + total_seconds = self.experiment.atmos_timestep * 3600 + for filename in glob.glob(os.path.join(self.cmor_scratch, "*.nc")): + handler = Utils.open_cdf(filename) + for varname in handler.variables.keys(): + cmor_var = self.data_manager.variable_list.get_variable( + varname, True + ) + + if cmor_var is None or cmor_var.short_name not in fluxes_vars: + continue + + if cmor_var.short_name in change_sign_vars: + sign = -1 + else: + sign = 1 + + var_handler = handler.variables[varname] + var_handler[:] = sign * var_handler[:] / total_seconds + var_handler.units = "{0} {1}".format(var_handler.units, "s-1") + handler.close() + + def _unpack_tar_file(self, tarfile): + self._clean_cmor_scratch() + os.makedirs(self.cmor_scratch) + Utils.untar( + (tarfile,), self.cmor_scratch, self.config.cmor.filter_files + ) + if os.path.isdir(os.path.join(self.cmor_scratch, "backup")): + for filepath in glob.glob( + os.path.join(self.cmor_scratch, "backup", "*") + ): + Log.debug("Moving file {0}", filepath) + shutil.move(filepath, filepath.replace("/backup/", "/")) + zip_files = glob.glob(os.path.join(self.cmor_scratch, "*.gz")) + if zip_files: + for zip_file in self._filter_files(zip_files): + try: + Utils.unzip(zip_file) + except Utils.UnzipException as ex: + Log.error( + "File {0} could not be unzipped: {1}", tarfile, ex + ) + + def _clean_cmor_scratch(self): + if os.path.exists(self.cmor_scratch): + time.sleep(2) + shutil.rmtree(self.cmor_scratch) + + def _merge_mma_files(self, tarfile): + temp = TempFile.get() + for grid in ["SH", "GG"]: + files = glob.glob( + os.path.join(self.cmor_scratch, "MMA_*_{}_*.nc".format(grid)) + ) + if not files: + continue + merged = TempFile.get() + if grid == "SH": + for filename in files: + Utils.cdo().sp2gpl( + options="-O", input=filename, output=temp + ) + shutil.move(temp, filename) + cmorize_vars = set() + var_manager = self.config.var_manager + for filename in files: + handler = Utils.open_cdf(filename) + for variable in handler.variables.keys(): + + _, var_cmor = var_manager.get_variable_and_alias( + variable, silent=True, + ) + if self.cmor.cmorize(var_cmor): + cmorize_vars.add(variable) + handler.close() + if not cmorize_vars: + continue + var_str = ",".join([str(var) for var in cmorize_vars]) + Utils.cdo().mergetime( + input=[f"-selvar,{var_str} {filepath}" for filepath in files], + output=merged + ) + for filename in files: + self._remove(filename) + tar_startdate = ( + os.path.basename(tarfile[0:-4]).split("_")[4].split("-") + ) + filename = "MMA{0}_1m_{1[0]}_{1[1]}.nc".format(grid, tar_startdate) + shutil.move(merged, os.path.join(self.cmor_scratch, filename)) + + def cmorize_atmos(self): + """Cmorize atmospheric data, from grib or MMA files""" + if not self.cmor.atmosphere: + Log.info("Skipping atmosphere cmorization due to configuration") + return True + + Log.info("\nCMORizing atmosphere\n") + if self.cmor.use_grib and self._gribfiles_available(): + return self._cmorize_grib_files() + else: + return self._cmorize_mma_files() + + def _cmorize_mma_files(self): + tar_files = glob.glob( + os.path.join(self.original_files_path, "MMA*.tar") + ) + tar_files.sort() + count = 1 + if len(tar_files) == 0: + Log.error( + "MMA files not found in {0}".format(self.original_files_path) + ) + return False + + result = True + for tarfile in tar_files: + if not self._cmorization_required( + self._get_chunk(os.path.basename(tarfile)), + (ModelingRealms.atmos,), + ): + Log.info( + "No need to unpack file {0}/{1}".format( + count, len(tar_files) + ) + ) + count += 1 + continue + Log.info( + "Unpacking atmospheric file {0}/{1}".format( + count, len(tar_files) + ) + ) + try: + self._unpack_tar_file(tarfile) + self._merge_mma_files(tarfile) + self._correct_fluxes() + self._cmorize_nc_files() + Log.result( + "Atmospheric file {0}/{1} finished".format( + count, len(tar_files) + ) + ) + except Exception as ex: + Log.error( + "Could not cmorize atmospheric file {0}: {1}\n {2}", + count, + ex, + traceback.format_exc(), + ) + result = False + + count += 1 + return result + + def _cmorize_grib_files(self): + chunk = 1 + chunk_start = parse_date(self.startdate) + result = True + while os.path.exists( + self._get_original_grib_path(chunk_start, "GG") + ) or os.path.exists(self._get_original_grib_path(chunk_start, "SH")): + + if self._cmorization_required(chunk, (ModelingRealms.atmos,)): + chunk_end = chunk_end_date( + chunk_start, + self.experiment.chunk_size, + "month", + self.experiment.calendar, + ) + chunk_end = previous_day(chunk_end, self.experiment.calendar) + Log.info( + "CMORizing chunk {0}-{1}", + date2str(chunk_start), + date2str(chunk_end), + ) + try: + for grid in ("SH", "GG"): + Log.info("Processing {0} variables", grid) + + first_grib = self._get_original_grib_path( + chunk_start, grid + ) + if not os.path.exists(first_grib): + continue + var_list = Utils.cdo().showvar(input=first_grib)[0] + codes = { + int(var.replace("var", "")) + for var in var_list.split() + } + if not codes.intersection( + self.config.cmor.get_requested_codes() + ): + Log.info( + "No requested variables found in {0}. " + "Skipping...", + grid, + ) + continue + self._cmorize_grib_file(chunk_end, chunk_start, grid) + except Exception as ex: + Log.error( + "Can not cmorize GRIB file for chunk {0}-{1}: {2}", + date2str(chunk_start), + date2str(chunk_end), + ex, + ) + result = False + chunk_start = chunk_end_date( + chunk_start, + self.experiment.chunk_size, + "month", + self.experiment.calendar, + ) + chunk += 1 + return result + + def _cmorize_grib_file(self, chunk_end, chunk_start, grid): + for month in range(0, self.experiment.chunk_size): + current_date = add_months( + chunk_start, month, self.experiment.calendar + ) + original_gribfile = self._get_original_grib_path( + current_date, grid + ) + Log.info("Processing month {1}", grid, date2str(current_date)) + gribfile = self._get_scratch_grib_path(current_date, grid) + if not os.path.isfile(gribfile): + Log.info("Copying file...", grid, date2str(current_date)) + Utils.copy_file(original_gribfile, gribfile) + + self._obtain_atmos_timestep(gribfile) + full_file = self._get_monthly_grib(current_date, gribfile, grid) + if not self._unpack_grib( + full_file, gribfile, grid, current_date.month + ): + self._remove(gribfile) + return + + next_gribfile = self._get_original_grib_path( + add_months(current_date, 1, self.experiment.calendar), grid + ) + if not os.path.exists(next_gribfile): + self._remove(gribfile) + + self._ungrib_vars(gribfile, current_date.month) + + for splited_file in glob.glob("{0}_*.128.nc".format(gribfile)): + self._remove(splited_file) + + Log.result( + "Month {0}, {1} variables finished", + date2str(current_date), + grid, + ) + + self._merge_and_cmorize_atmos( + chunk_start, chunk_end, grid, Frequencies.monthly + ) + self._merge_and_cmorize_atmos( + chunk_start, chunk_end, grid, Frequencies.daily + ) + self._merge_and_cmorize_atmos( + chunk_start, chunk_end, grid, "{0}hr".format(self.atmos_timestep) + ) + + def _unpack_grib(self, full_file, gribfile, grid, month): + Log.info("Unpacking... ") + # remap on regular Gauss grid + + codes = self.cmor.get_requested_codes() + codes_str = ",".join([str(code) for code in codes]) + try: + if grid == "SH": + Utils.cdo().splitparam( + input="-sp2gpl -selcode,{0} {1} ".format( + codes_str, full_file + ), + output=gribfile + "_", + options="-f nc4 -t ecmwf", + ) + + else: + Utils.cdo().splitparam( + input="-selcode,{0} {1}".format(codes_str, full_file), + output=gribfile + "_", + options="-R -f nc4 -t ecmwf", + ) + return True + except CDOException: + Log.info("No requested codes found in {0} file".format(grid)) + return False + finally: + Utils.remove_file(self.path_icm) + + def _get_monthly_grib(self, current_date, gribfile, grid): + prev_gribfile = self._get_scratch_grib_path( + add_months(current_date, -1, self.experiment.calendar), grid + ) + if os.path.exists(prev_gribfile): + self._merge_grib_files(current_date, prev_gribfile, gribfile) + + full_file = self.path_icm + else: + full_file = gribfile + return full_file + + def _get_scratch_grib_path(self, current_date, grid): + return os.path.join( + self.config.scratch_dir, + self._get_grib_filename(grid, current_date), + ) + + def _obtain_atmos_timestep(self, gribfile): + if self.atmos_timestep is None: + self.atmos_timestep = self._get_atmos_timestep(gribfile) + + def _get_original_grib_path(self, current_date, grid): + return os.path.join( + self.original_files_path, + self._get_grib_filename(grid, current_date), + ) + + def _get_grib_filename(self, grid, month): + return "ICM{0}{1}+{2}.grb".format( + grid, self.experiment.expid, date2str(month)[:-2] + ) + + def _get_atmos_timestep(self, gribfile): + Log.info("Getting timestep...") + import cfgrib + + grib = cfgrib.open_file(gribfile) + dates = set() + valid_time = grib.variables["valid_time"] + for t in valid_time.data: + dates.add( + cf_units.num2date( + t, + valid_time.attributes["units"], + valid_time.attributes["calendar"], + ) + ) + dates = list(dates) + dates.sort() + atmos_timestep = dates[1] - dates[0] + atmos_timestep = int(atmos_timestep.total_seconds() / 3600) + self.experiment.atmos_timestep = atmos_timestep + return atmos_timestep + + def _cmorize_nc_file(self, filename): + Log.info("Processing file {0}", filename) + + if not self._contains_requested_variables(filename): + self._remove(filename) + return + + # Utils.convert2netcdf4(filename) + frequency = self._get_nc_file_frequency(filename) + Utils.rename_variables(filename, self.alt_coord_names, False) + handler = Utils.open_cdf(filename) + Cmorizer._remove_valid_limits(handler) + self._add_common_attributes(handler, frequency) + self._update_time_variables(handler) + + variables = handler.variables.keys() + handler.close() + + Log.info("Splitting file {0}", filename) + for variable in variables: + if variable in Cmorizer.NON_DATA_VARIABLES: + continue + try: + Log.debug("Checking variable {0}", variable) + self.extract_variable(filename, frequency, variable) + except Exception as ex: + Log.error( + "Variable {0} can not be cmorized: {1}", variable, ex + ) + Log.result("File {0} cmorized!", filename) + self._remove(filename) + + @staticmethod + def _remove_valid_limits(handler): + for variable in handler.variables.keys(): + var = handler.variables[variable] + if "valid_min" in var.ncattrs(): + del var.valid_min + if "valid_max" in var.ncattrs(): + del var.valid_max + handler.sync() + + def _get_nc_file_frequency(self, filename): + file_parts = os.path.basename(filename).split("_") + if self.experiment.expid in [file_parts[1], file_parts[2]]: + frequency = Frequency("m") + elif self.experiment.expid == file_parts[0]: + try: + parse_date(file_parts[1]) + frequency = Frequency("m") + except ValueError: + frequency = Frequency(file_parts[1]) + else: + frequency = Frequency(file_parts[1]) + return frequency + + def _contains_requested_variables(self, filename): + variables = Utils.get_file_variables(filename) + return self.cmor.any_required(variables) + + def extract_variable(self, file_path, frequency, variable): + """ + Extract a variable from a file and creates the CMOR file + + Parameters + ---------- + file_path:str + frequency: Frequency + variable: str + + Raises + ------ + CMORException + If the filename does not match any of the recognized patterns + + """ + alias, var_cmor = self.config.var_manager.get_variable_and_alias( + variable + ) + if var_cmor is None: + return + + if not self.cmor.cmorize(var_cmor): + return + + temp = TempFile.get() + lev_dimensions = self._set_coordinates_attribute( + file_path, var_cmor, variable + ) + self._rename_level_coords(file_path, lev_dimensions, temp, variable) + + if alias.basin is None: + region = None + else: + region = alias.basin.name + + date_str = self._get_date_str(file_path) + if date_str is None: + Log.error( + f"Variable {var_cmor.short_name,} can not be cmorized. " + "Original filename does not match a recognized pattern", + ) + raise CMORException( + f"Variable {var_cmor.domain}:{var_cmor.short_name} can not " + "be cmorized. Original filename does not match a recognized " + "pattern" + ) + + netcdf_file = NetCDFFile() + netcdf_file.data_manager = self.data_manager + netcdf_file.local_file = temp + netcdf_file.remote_file = self.config.data_convention.get_file_path( + self.startdate, + self.member, + var_cmor.domain, + var_cmor.short_name, + var_cmor, + None, + frequency, + grid=alias.grid, + year=None, + date_str=date_str, + ) + + netcdf_file.data_convention = self.config.data_convention + netcdf_file.region = region + netcdf_file.frequency = frequency + netcdf_file.domain = var_cmor.domain + netcdf_file.var = var_cmor.short_name + netcdf_file.final_name = var_cmor.short_name + + netcdf_file.prepare_to_upload(rename_var=variable) + netcdf_file.add_cmorization_history() + netcdf_file.upload() + + if region: + region_str = " (Region {})".format(region) + else: + region_str = "" + Log.info( + "Variable {0.domain}:{0.short_name} processed{1}", + var_cmor, + region_str, + ) + + def _rename_level_coords(self, file_path, lev_dimensions, temp, variable): + cube = iris.load_cube( + file_path, + iris.Constraint(cube_func=lambda c: c.var_name == variable), + ) + for lev_original, lev_target in six.iteritems(lev_dimensions): + try: + cube.coord(var_name=lev_original).var_name = lev_target + except iris.exceptions.CoordinateNotFoundError: + pass + if cube.coord("latitude").ndim > 1: + try: + cube.coord("j"), + except iris.exceptions.CoordinateNotFoundError: + cube.add_dim_coord( + DimCoord( + range(cube.shape[-2]), + var_name="j", + long_name="Cell index along second dimension", + units=1.0, + ), + len(cube.shape) - 2, + ) + try: + cube.coord("i"), + except iris.exceptions.CoordinateNotFoundError: + cube.add_dim_coord( + DimCoord( + range(cube.shape[-1]), + var_name="i", + long_name="Cell index along first dimension", + units=1.0, + ), + len(cube.shape) - 1, + ) + + iris.save(cube, temp, zlib=True) + + def _set_coordinates_attribute(self, file_path, var_cmor, variable): + handler = Utils.open_cdf(file_path) + coords = [self.lon_name, self.lat_name, "time"] + if "leadtime" in handler.variables.keys(): + coords.append("leadtime") + lev_dimensions = self._get_lev_dimensions(var_cmor) + for lev_dim in lev_dimensions.keys(): + if lev_dim in handler.variables[variable].dimensions: + coords.append(lev_dim) + handler.variables[variable].coordinates = " ".join(set(coords)) + handler.close() + return lev_dimensions + + def _get_lev_dimensions(self, var_cmor): + if var_cmor.domain == ModelingRealms.ocean: + lev_dimensions = { + "deptht": "lev", + "depthu": "lev", + "depthw": "lev", + "depthv": "lev", + "depth": "lev", + } + elif var_cmor.domain in [ModelingRealms.landIce, ModelingRealms.land]: + lev_dimensions = { + "depth": "sdepth", + "depth_2": "sdepth", + "depth_3": "sdepth", + "depth_4": "sdepth", + } + elif var_cmor.domain == ModelingRealms.atmos: + lev_dimensions = {"depth": "plev"} + else: + lev_dimensions = {} + return lev_dimensions + + def _get_date_str(self, file_path): + file_parts = os.path.basename(file_path).split("_") + valid_starts = (self.experiment.expid, "MMA", "MMASH", "MMAGG", "MMO") + if file_parts[0] in valid_starts or file_parts[0].startswith("ORCA"): + # Model output + if file_parts[-1].endswith(".tar"): + file_parts = file_parts[-1][0:-4].split("-") + return "{0}-{1}".format(file_parts[0][0:6], file_parts[1][0:6]) + else: + return "{0}-{1}".format(file_parts[2][0:6], file_parts[3][0:6]) + elif file_parts[1] == self.experiment.expid: + # Files generated by the old version of the diagnostics + return "{0}-{1}".format(file_parts[4][0:6], file_parts[5][0:6]) + else: + return None + + def _get_chunk(self, file_path): + chunk_start = parse_date(self._get_date_str(file_path).split("-")[0]) + current_date = parse_date(self.startdate) + chunk = 1 + while current_date < chunk_start: + current_date = chunk_end_date( + current_date, + self.experiment.chunk_size, + "month", + self.experiment.calendar, + ) + chunk += 1 + + if current_date != chunk_start: + raise Exception( + "File {0} start date is not a valid chunk start date".format( + file_path + ) + ) + return chunk + + def _merge_grib_files(self, current_month, prev_gribfile, gribfile): + Log.info("Merging data from different files...") + temp = TempFile.get(suffix=".grb") + Utils.cdo().selmon( + current_month.month, input=prev_gribfile, output=temp + ) + Utils.cdo().mergetime(input=[temp, gribfile], output=self.path_icm) + self._remove(prev_gribfile) + self._remove(temp) + + def _ungrib_vars(self, gribfile, month): + for var_code in self.cmor.get_requested_codes(): + file_path = "{0}_{1}.128.nc".format(gribfile, var_code) + if not os.path.exists(file_path): + continue + cube = iris.load_cube(file_path) + + cube = self._fix_time_coord(cube, var_code) + cube = cube.extract(iris.Constraint(month_number=month)) + cube = self._change_units(cube, var_code) + + for frequency in ( + Frequencies.monthly, + Frequencies.daily, + Frequency("{0}hr".format(self.atmos_timestep)), + ): + if var_code not in self.cmor.get_variables(frequency): + continue + time_cube = self._get_time_average(cube, frequency, var_code) + + levels = self.config.cmor.get_levels(frequency, var_code) + if levels: + time_cube = time_cube.extract(level=levels) + + if cube.var_name.endswith("_2"): + time_cube.var_name = cube.var_name[:-2] + + out_file = "{0}_{1}_{2}.nc".format( + gribfile, var_code, frequency + ) + time_cube.remove_coord("month_number") + time_cube.remove_coord("day_of_month") + iris.save(time_cube, out_file, zlib=True) + + def _fix_time_coord(self, cube, var_code): + time = cube.coord("time") + target_units = "days since 1950-01-01 00:00:00" + time.convert_units( + cf_units.Unit(target_units, calendar=time.units.calendar) + ) + time.units = target_units + + if var_code in ( + 144, + 146, + 147, + 169, + 175, + 176, + 177, + 179, + 180, + 181, + 182, + 201, + 202, + 205, + 212, + 228, + ): + time.points = time.points - (self.experiment.atmos_timestep / 24.0) + iris.coord_categorisation.add_day_of_month(cube, "time") + iris.coord_categorisation.add_month_number(cube, "time") + return cube + + @staticmethod + def _get_time_average(cube, frequency, var_code): + if frequency == Frequencies.monthly: + if var_code == 201: + cube = cube.aggregated_by( + ["month_number", "day_of_month"], iris.analysis.MAX + ) + elif var_code == 202: + cube = cube.aggregated_by( + ["month_number", "day_of_month"], iris.analysis.MIN + ) + cube = cube.aggregated_by(["month_number"], iris.analysis.MEAN) + elif frequency == Frequencies.daily: + if var_code == 201: + cube = cube.aggregated_by( + ["month_number", "day_of_month"], iris.analysis.MAX + ) + elif var_code == 202: + cube = cube.aggregated_by( + ["month_number", "day_of_month"], iris.analysis.MIN + ) + else: + cube = cube.aggregated_by( + ["month_number", "day_of_month"], iris.analysis.MEAN + ) + return cube + + def _change_units(self, cube, var_code): + var_name = cube.var_name + if var_code == 129: + # geopotential + cube = cube / 9.81 + cube.units = "m" + elif var_code in (146, 147, 169, 175, 176, 177, 179, 212): + # radiation + cube = cube / (self.experiment.atmos_timestep * 3600) + cube.units = "W m-2" + elif var_code in (180, 181): + # momentum flux + cube = cube / (self.experiment.atmos_timestep * 3600) + cube.units = "N m-2" + elif var_code in (144, 182, 205, 228): + # precipitation/evaporation/runoff + cube = cube * 1000 / (self.experiment.atmos_timestep * 3600) + cube.units = "kg m-2 s-1" + cube.var_name = var_name + return cube + + def _merge_and_cmorize_atmos( + self, chunk_start, chunk_end, grid, frequency + ): + merged_file = "MMA_{0}_{1}_{2}_{3}.nc".format( + frequency, date2str(chunk_start), date2str(chunk_end), grid + ) + files = glob.glob( + os.path.join( + self.config.scratch_dir, + "{0}_*_{1}.nc".format( + self._get_grib_filename(grid, chunk_start), frequency + ), + ) + ) + + def _load_cube(cube, field, filename): + if "history" in cube.attributes: + del cube.attributes["history"] + + for first_file in files: + var_files = [] + current_month = chunk_start + while current_month < chunk_end: + var_files.append( + first_file.replace( + "+{0}.grb".format(date2str(chunk_start)[:-2]), + "+{0}.grb".format(date2str(current_month)[:-2]), + ) + ) + current_month = add_months( + current_month, 1, self.experiment.calendar + ) + var_cubes = iris.load(var_files, callback=_load_cube) + iris.util.unify_time_units(var_cubes) + var_cube = var_cubes.concatenate_cube() + iris.save(var_cube, merged_file, zlib=True) + for var_file in var_files: + self._remove(var_file) + self._cmorize_nc_file(merged_file) + + def _update_time_variables(self, handler): + time_var = handler.variables["time"] + if hasattr(time_var, "calendar"): + calendar = time_var.calendar + else: + calendar = "standard" + if "time_bnds" in handler.variables: + time_var.bounds = "time_bnds" + handler.variables["time_bnds"].units = time_var.units + Utils.convert_units( + handler.variables["time_bnds"], + "days since 1850-01-01 00:00:00", + calendar, + calendar, + ) + Utils.convert_units(time_var, "days since 1850-1-1 00:00:00", calendar) + self._set_leadtime_var(handler) + + def _set_leadtime_var(self, handler): + if "leadtime" in handler.variables: + var = handler.variables["leadtime"] + else: + var = handler.createVariable("leadtime", float, "time") + var.units = "days" + var.long_name = "Time elapsed since the start of the forecast" + var.standard_name = "forecast_period" + leadtime = Utils.get_datetime_from_netcdf(handler) + startdate = parse_date(self.startdate) + leadtime = [ + datetime( + time.year, + time.month, + time.day, + time.hour, + time.minute, + time.second, + ) + - startdate + for time in leadtime + ] + for lt, lead in enumerate(leadtime): + var[lt] = lead.days + + def _add_common_attributes(self, handler, frequency): + cmor = self.config.cmor + experiment = self.config.experiment + handler.associated_experiment = cmor.associated_experiment + handler.batch = "{0}{1}".format( + experiment.institute, + datetime.now().strftime("%Y-%m-%d(T%H:%M:%SZ)"), + ) + handler.contact = ( + "Pierre-Antoine Bretonniere, pierre-antoine.bretonniere@bsc.es , " + "Javier Vegas-Regidor, javier.vegas@bsc.es " + ) + handler.Conventions = "CF-1.6" + handler.creation_date = datetime.now().strftime("%Y-%m-%d(T%H:%M:%SZ)") + handler.experiment_id = experiment.experiment_name + startdate = parse_date(self.startdate) + handler.forecast_reference_time = ( + "{0.year}-{0.month:02}-{0.day:02}" + "(T{0.hour:02}:{0.minute:02}:{0.second:02}Z)".format(startdate) + ) + handler.frequency = frequency.frequency + handler.institute_id = experiment.institute + handler.institution = experiment.institute + handler.initialization_method = cmor.initialization_method + handler.initialization_description = cmor.initialization_description + handler.physics_version = cmor.physics_version + handler.physics_description = cmor.physics_description + handler.model_id = experiment.model + handler.associated_model = cmor.associated_model + handler.project_id = self.config.data_convention.name.upper() + handler.realization = str(self.member + 1) + handler.source = cmor.source + handler.startdate = "S{0}".format(self.startdate) + handler.tracking_id = str(uuid.uuid1()) + handler.title = "{0} model output prepared for {2} {1}".format( + experiment.model, + experiment.experiment_name, + self.config.data_convention.name.upper(), + ) + + def _gribfiles_available(self): + grb_path = os.path.join(self.original_files_path, "*.grb") + gribfiles = glob.glob(grb_path) + return len(gribfiles) > 0 + + def _cmorization_required(self, chunk, domains): + if not self.config.cmor.chunk_cmorization_requested(chunk): + return False + if self.config.cmor.force: + return True + for domain in domains: + if self.data_manager.is_cmorized( + self.startdate, self.member, chunk, domain + ): + return False + return True + + +class CMORException(Exception): + """Error during cmorization""" + + pass diff --git a/earthdiagnostics/cmormanager.py b/earthdiagnostics/cmormanager.py index b83fc53b..e9a0fc90 100644 --- a/earthdiagnostics/cmormanager.py +++ b/earthdiagnostics/cmormanager.py @@ -9,6 +9,7 @@ from bscearth.utils.log import Log from earthdiagnostics.datafile import StorageStatus from earthdiagnostics.diagnostic import Diagnostic +from earthdiagnostics.cmorizer import Cmorizer from earthdiagnostics.datamanager import DataManager from earthdiagnostics.frequency import Frequencies from earthdiagnostics.modelingrealm import ModelingRealms @@ -231,6 +232,7 @@ class CMORManager(DataManager): member, chunk, grid=None, + region=None, box=None, frequency=None, vartype=VariableType.MEAN, @@ -247,6 +249,7 @@ class CMORManager(DataManager): member: int chunk: int grid: str or None + region: Basin or None box: Box or None frequency: Frequency or None vartype: VariableType @@ -281,6 +284,7 @@ class CMORManager(DataManager): final_name, cmor_var, self.config.data_convention, + region, diagnostic, grid, vartype, @@ -344,6 +348,7 @@ class CMORManager(DataManager): final_name, cmor_var, self.config.data_convention, + None, diagnostic, grid, vartype, diff --git a/earthdiagnostics/datafile.py b/earthdiagnostics/datafile.py index 7e9e4ffa..c6e47284 100644 --- a/earthdiagnostics/datafile.py +++ b/earthdiagnostics/datafile.py @@ -53,6 +53,7 @@ class DataFile(Publisher): self.domain = None self.var = None self.cmor_var = None + self.region = None self.frequency = None self.data_convention = None self.diagnostic = None @@ -258,7 +259,8 @@ class DataFile(Publisher): Prepare a local file to be uploaded This includes renaming the variable if necessary, - updating the metadata and adding the history + updating the metadata and adding the history and + managing the possibility of multiple regions """ if rename_var: original_name = rename_var @@ -270,6 +272,7 @@ class DataFile(Publisher): ) self._rename_coordinate_variables() self._correct_metadata() + self._prepare_region() self.add_basin_history() self.add_diagnostic_history() Utils.convert2netcdf4(self.local_file) @@ -434,6 +437,70 @@ class DataFile(Publisher): var_handler.valid_max = ( float(var_handler.valid_max) * factor + offset ) + + def _prepare_region(self): + if not self.check_is_in_storage(update_status=False): + return + cube = iris.load_cube(self.local_file) + try: + cube.coord("region") + except iris.exceptions.CoordinateNotFoundError: + return + old_cube = None + for path in (self.remote_diags_file, self.remote_file, + self.remote_cmor_file): + try: + old_cube = iris.load_cube(path) + except Exception: + pass + else: + break + if old_cube is None: + return + + new_data = {} + for region_slice in cube.slices_over("region"): + Log.debug(region_slice.coord("region").points[0]) + new_data[region_slice.coord("region").points[0]] = region_slice + for region_slice in old_cube.slices_over("region"): + region = region_slice.coord("region").points[0] + Log.debug(region) + if region not in new_data: + new_data[region] = region_slice + cube_list = iris.cube.CubeList( + [new_data[region] for region in sorted(new_data)] + ) + if len(cube_list) == 1: + return + iris.util.equalise_attributes(cube_list) + iris.util.unify_time_units(cube_list) + final_cube = cube_list.merge_cube() + temp = TempFile.get() + iris.save(final_cube, temp, zlib=True) + handler = Utils.open_cdf(temp) + renames = {} + for dim in handler.dimensions: + if dim.startswith('dim'): + renames[dim] = 'region' + if dim.startswith('string'): + renames[dim] = 'region_length' + if '-' in final_cube.var_name: + renames[final_cube.var_name.replace('-', '_')] = \ + final_cube.var_name + + Utils.rename_variables( + temp, renames, must_exist=False, rename_dimension=True) + Utils.move_file(temp, self.local_file) + handler2 = Utils.open_cdf(self.local_file) + region_var = handler2.variables['region'] + for i, cube in enumerate(cube_list): + encode = 'utf-8' + name = region_var[i, ...].tobytes().strip().decode(encode) + length = handler2.dimensions['region_length'].size + region_var[i, ...] = netCDF4.stringtoarr(name, length) + handler2.close() + self._correct_metadata() + def _rename_coordinate_variables(self): variables = dict() variables['x'] = 'i' diff --git a/earthdiagnostics/datamanager.py b/earthdiagnostics/datamanager.py index 3774a697..c938154e 100644 --- a/earthdiagnostics/datamanager.py +++ b/earthdiagnostics/datamanager.py @@ -42,6 +42,7 @@ class DataManager(object): final_var, cmor_var, data_convention, + region, diagnostic, grid, var_type, @@ -60,6 +61,7 @@ class DataManager(object): file_object.var = original_var file_object.final_name = final_var file_object.cmor_var = cmor_var + file_object.region = region file_object.storage_status = StorageStatus.PENDING return file_object @@ -205,6 +207,7 @@ class DataManager(object): member, chunk, grid=None, + region=None, box=None, frequency=None, vartype=VariableType.MEAN, @@ -221,6 +224,7 @@ class DataManager(object): member: int chunk: int grid: str or None, optional + region: Basin or None, optional box: Box or None, optional frequency: Frequency or None, optional vartype: VariableType, optional diff --git a/earthdiagnostics/diagnostic.py b/earthdiagnostics/diagnostic.py index 7f3a4270..2d461cfd 100644 --- a/earthdiagnostics/diagnostic.py +++ b/earthdiagnostics/diagnostic.py @@ -211,6 +211,7 @@ class Diagnostic(Publisher): member, chunk, grid=None, + region=None, box=None, frequency=None, vartype=VariableType.MEAN, @@ -226,6 +227,7 @@ class Diagnostic(Publisher): member: int or None chunk: int or None grid: str or None + region: Basin or None box: Box or None frequency: Frequency or None vartype: VariableType @@ -235,6 +237,8 @@ class Diagnostic(Publisher): DataFile """ + if isinstance(region, Basin): + region = region.name generated_chunk = self.data_manager.declare_chunk( domain, var, @@ -242,11 +246,14 @@ class Diagnostic(Publisher): member, chunk, grid, + region, box, diagnostic=self, vartype=vartype, frequency=frequency, ) + # if region is not None: + # generated_chunk.add_modifier(self) self._generated_files.append(generated_chunk) return generated_chunk diff --git a/earthdiagnostics/obsreconmanager.py b/earthdiagnostics/obsreconmanager.py index 5f45a1dd..ad74f5a9 100644 --- a/earthdiagnostics/obsreconmanager.py +++ b/earthdiagnostics/obsreconmanager.py @@ -162,6 +162,7 @@ class ObsReconManager(DataManager): member, chunk, grid=None, + region=None, box=None, frequency=None, vartype=VariableType.MEAN, @@ -178,6 +179,7 @@ class ObsReconManager(DataManager): member: int chunk: int grid: str or None, optional + region: Basin or None, optional box: Box or None, optional frequency: Frequency or None, optional vartype: VariableType, optional @@ -204,6 +206,7 @@ class ObsReconManager(DataManager): final_name, cmor_var, self.config.data_convention, + region, diagnostic, grid, vartype, diff --git a/earthdiagnostics/ocean/regionmean.py b/earthdiagnostics/ocean/regionmean.py index 2a8c9cfa..e2d2db10 100644 --- a/earthdiagnostics/ocean/regionmean.py +++ b/earthdiagnostics/ocean/regionmean.py @@ -308,6 +308,7 @@ class RegionMean(Diagnostic): self.member, self.chunk, box=box_save, + region=self.basins, frequency=self.frequency, ) diff --git a/earthdiagnostics/threddsmanager.py b/earthdiagnostics/threddsmanager.py index a7ba36f1..aeec37cb 100644 --- a/earthdiagnostics/threddsmanager.py +++ b/earthdiagnostics/threddsmanager.py @@ -327,6 +327,7 @@ class THREDDSManager(DataManager): member, chunk, grid=None, + region=None, box=None, frequency=None, vartype=VariableType.MEAN, @@ -337,6 +338,7 @@ class THREDDSManager(DataManager): and returns the path to the scratch's copy :param diagnostic: + :param region: :param domain: CMOR domain :type domain: Domain :param var: variable name diff --git a/test/integration/__init__.py b/test/integration/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/test/integration/test_cmorizer.py b/test/integration/test_cmorizer.py new file mode 100644 index 00000000..29fab22b --- /dev/null +++ b/test/integration/test_cmorizer.py @@ -0,0 +1,727 @@ +"""Tests for earthdiagnostics.cmorizer module""" +from earthdiagnostics.cmorizer import Cmorizer +from earthdiagnostics.utils import TempFile, Utils +from earthdiagnostics.data_convention import DataConvention +from bscearth.utils import log + +from unittest import TestCase +from mock import Mock, create_autospec +import os +import tempfile +import shutil +import iris +import iris.cube +from iris.coords import DimCoord +import tarfile +import numpy as np +import six +import calendar + + +class TestCmorizer(TestCase): + """Tests for Cmorizer class""" + + def _get_variable_and_alias(self, variable, silent=False): + mock_alias = Mock() + mock_alias.basin = None + mock_alias.grid = None + + mock_variable = self._get_variable(variable) + + return mock_alias, mock_variable + + def _get_variable(self, variable, silent=False): + mock_variable = Mock() + mock_variable.short_name = variable + mock_variable.domain = "domain" + + return mock_variable + + def _get_file_path(self, *args, **kwargs): + return os.path.join(self.tmp_dir, args[3], "{0[3]}.nc".format(args)) + + def _get_file_path_grib(self, *args, **kwargs): + return os.path.join( + self.tmp_dir, args[3], str(args[6]), "{0[3]}.nc".format(args) + ) + + def setUp(self): + """Prepare tests""" + self.tmp_dir = tempfile.mkdtemp() + + self.data_manager = Mock() + + self.data_manager.is_cmorized.return_value = False + self.data_manager.config.data_dir = os.path.join(self.tmp_dir, "data") + self.data_manager.config.scratch_dir = os.path.join( + self.tmp_dir, "scratch" + ) + TempFile.scratch_folder = self.data_manager.config.scratch_dir + self.data_manager.config.data_convention = create_autospec( + DataConvention + ) + self.data_manager.config.data_convention.name = "data_convention" + self.data_manager.config.data_convention.lat_name = "lat" + self.data_manager.config.data_convention.lon_name = "lon" + self.data_manager.config.data_convention.get_file_path = ( + self._get_file_path + ) + + self.data_manager.config.var_manager.get_variable_and_alias = ( + self._get_variable_and_alias + ) + self.data_manager.config.var_manager.get_variable = self._get_variable + self.data_manager.variable_list = self.data_manager.config.var_manager + + self.data_manager.config.experiment.expid = "expid" + self.data_manager.config.experiment.model = "model" + self.data_manager.config.experiment.experiment_name = "experiment_name" + self.data_manager.config.experiment.num_chunks = 1 + self.data_manager.config.experiment.chunk_size = 1 + self.data_manager.config.experiment.institute = "institute" + self.data_manager.config.experiment.get_member_str.return_value = ( + "member" + ) + self.data_manager.config.experiment.atmos_timestep = 6 + + self.data_manager.config.cmor.force = False + self.data_manager.config.cmor.ocean = True + self.data_manager.config.cmor.atmosphere = True + self.data_manager.config.cmor.use_grib = True + self.data_manager.config.cmor.filter_files = [] + self.data_manager.config.cmor.associated_experiment = ( + "associated_experiment" + ) + self.data_manager.config.cmor.initialization_method = ( + "initialization_method" + ) + self.data_manager.config.cmor.initialization_description = ( + "initialization_description" + ) + self.data_manager.config.cmor.physics_version = "physics_version" + self.data_manager.config.cmor.physics_description = ( + "physics_description" + ) + self.data_manager.config.cmor.initialization_description = ( + "initialization_description" + ) + self.data_manager.config.cmor.associated_model = ( + "initialization_description" + ) + self.data_manager.config.cmor.source = "source" + self.data_manager.config.cmor.get_requested_codes.return_value = { + 228, + 142, + 143, + 201, + 202, + 129, + 169, + 180, + } + self.data_manager.config.cmor.get_variables.return_value = { + 228, + 142, + 143, + 201, + 202, + 129, + 169, + 180, + } + self.data_manager.config.cmor.get_levels.return_value = None + + os.makedirs(self.data_manager.config.data_dir) + os.makedirs(self.data_manager.config.scratch_dir) + + def _create_ocean_files( + self, filename, tar_name, gzip=False, backup=False + ): + folder_path = os.path.join( + self.data_manager.config.data_dir, + "expid", + "original_files", + "19900101", + "member", + "outputs", + ) + file_path, filename = self._create_file(folder_path, filename, gzip) + + if backup: + filename = os.path.join("backup", filename) + + tar = tarfile.TarFile(os.path.join(folder_path, tar_name), mode="w") + tar.add(file_path, arcname=filename, recursive=False) + tar.close() + os.remove(file_path) + + def _create_mma_files(self, filename, tar_name, gzip=False): + folder_path = os.path.join( + self.data_manager.config.data_dir, + "expid", + "original_files", + "19900101", + "member", + "outputs", + ) + filepath_gg, filename_gg = self._create_file( + folder_path, filename.replace("??", "GG"), gzip + ) + filepath_sh, filename_sh = self._create_file( + folder_path, filename.replace("??", "SH"), gzip + ) + + tar = tarfile.TarFile(os.path.join(folder_path, tar_name), mode="w") + tar.add(filepath_gg, arcname=filename_gg, recursive=False) + tar.add(filepath_sh, arcname=filename_sh, recursive=False) + tar.close() + os.remove(filepath_gg) + os.remove(filepath_sh) + + def _create_file(self, folder_path, filename, gzip): + var1 = self._create_sample_cube( + "Variable 1", "var1", threed=False, time_bounds=True + ) + var2 = self._create_sample_cube( + "Variable 2", "var2", threed=True, time_bounds=True + ) + if not os.path.isdir(folder_path): + os.makedirs(folder_path) + file_path = os.path.join(folder_path, filename) + iris.save((var1, var2), file_path, zlib=True) + if gzip: + import subprocess + + process = subprocess.Popen( + ("gzip", file_path), stdout=subprocess.PIPE + ) + comunicate = process.communicate() + file_path = "{0}.gz".format(file_path) + filename = "{0}.gz".format(filename) + if process.returncode != 0: + raise Exception("Can not compress: {0}".format(comunicate)) + return file_path, filename + + def _create_sample_cube(self, long_name, var_name, threed, time_bounds): + coord_data = np.array([1, 2], np.float) + lat = DimCoord( + coord_data, + standard_name="latitude", + long_name="latitude", + var_name="lat", + units="degrees_north", + ) + lon = DimCoord( + coord_data, + standard_name="longitude", + long_name="longitude", + var_name="lon", + units="degrees_east", + ) + time = DimCoord( + coord_data, + standard_name="time", + long_name="time", + var_name="time", + units="days since 1950-01-01", + ) + if time_bounds: + time.bounds = np.array([[0.5, 1.5], [1.5, 2.5]], np.float) + + if threed: + data = np.random.rand(2, 2, 2, 2).astype(np.float) + depth = DimCoord( + coord_data, + standard_name="depth", + long_name="Depth", + var_name="lev", + units="m", + ) + else: + data = np.random.rand(2, 2, 2).astype(np.float) + + cube = iris.cube.Cube(data, long_name=long_name, var_name=var_name) + cube.add_dim_coord(time, 0) + cube.add_dim_coord(lat, 1) + cube.add_dim_coord(lon, 2) + if threed: + cube.add_dim_coord(depth, 3) + return cube + + def tearDown(self): + """Clean up after tests""" + shutil.rmtree(self.tmp_dir) + + def _test_ocean_cmor( + self, + success=True, + error=False, + critical=False, + warnings=False, + message="", + check_vars=None, + ): + self._test_cmorization( + success=success, + error=error, + critical=critical, + warnings=warnings, + message=message, + ocean=True, + check_vars=check_vars, + ) + + def _test_atmos_cmor( + self, + success=True, + error=False, + critical=False, + warnings=False, + message="", + check_vars=None, + ): + self._test_cmorization( + success=success, + error=error, + critical=critical, + warnings=warnings, + message=message, + ocean=False, + check_vars=check_vars, + ) + + def _test_cmorization( + self, + success=True, + error=False, + critical=False, + warnings=False, + message="", + ocean=True, + check_vars=None, + ): + self._check_logs(critical, error, message, ocean, success, warnings) + if check_vars: + for variable, status in six.iteritems(check_vars): + if status: + self.assertTrue( + os.path.isfile( + os.path.join( + self.tmp_dir, + variable, + "{}.nc".format(variable), + ) + ) + ) + else: + self.assertFalse( + os.path.isfile( + os.path.join( + self.tmp_dir, + variable, + "{}.nc".format(variable), + ) + ) + ) + + def _check_logs(self, critical, error, message, ocean, success, warnings): + if six.PY3: + with self.assertLogs(log.Log.log) as cmd: + cmorizer = Cmorizer(self.data_manager, "19900101", 0) + if ocean: + cmorizer.cmorize_ocean() + else: + cmorizer.cmorize_atmos() + if message: + self.assertTrue( + [ + record + for record in cmd.records + if record.message == message + ] + ) + else: + for level, value in six.iteritems( + { + log.Log.RESULT: success, + log.Log.ERROR: error, + log.Log.CRITICAL: critical, + log.Log.WARNING: warnings, + } + ): + try: + if value: + self.assertTrue( + [ + record + for record in cmd.records + if record.levelno == level + ] + ) + else: + self.assertFalse( + [ + record + for record in cmd.records + if record.levelno == level + ] + ) + except AssertionError: + print(cmd.records) + raise + else: + cmorizer = Cmorizer(self.data_manager, "19900101", 0) + if ocean: + cmorizer.cmorize_ocean() + else: + cmorizer.cmorize_atmos() + + def test_skip_ocean_cmorization(self): + """Test ocean cmorization flag disabled option""" + self.data_manager.config.cmor.ocean = False + self._test_ocean_cmor( + message="Skipping ocean cmorization due to configuration" + ) + + def test_skip_atmos_cmorization(self): + """Test atmos cmorization flag disabled option""" + self.data_manager.config.cmor.atmosphere = False + if six.PY3: + with self.assertLogs(log.Log.log) as cmd: + cmorizer = Cmorizer(self.data_manager, "19900101", 0) + cmorizer.cmorize_atmos() + self.assertTrue( + [ + record + for record in cmd.records + if record.message + == "Skipping atmosphere cmorization due to configuration" + ] + ) + else: + cmorizer = Cmorizer(self.data_manager, "19900101", 0) + cmorizer.cmorize_ocean() + + def test_skip_when_cmorized(self): + """Test cmorization skipped if already done""" + self._create_ocean_files( + "expid_1d_19900101_19900131.nc", "MMO_19900101-19900131.tar" + ) + self.data_manager.is_cmorized.return_value = True + self._test_ocean_cmor(message="No need to unpack file 1/1") + + def test_skip_when_not_requested(self): + """Test cmorization skipped if chunk is not requested""" + self._create_ocean_files( + "expid_1d_19900101_19900131.nc", "MMO_19900101-19900131.tar" + ) + chunk = self.data_manager.config.cmor.chunk_cmorization_requested + chunk.return_value = False + self._test_ocean_cmor(message="No need to unpack file 1/1") + + def test_force(self): + """Test cmorization force works""" + self._create_ocean_files( + "expid_1d_19900101_19900131.nc", "MMO_19900101-19900131.tar" + ) + self.data_manager.is_cmorized.return_value = True + self.data_manager.config.cmor.force = True + self._test_ocean_cmor() + + def test_ocean_cmorization_no_files(self): + """Test ocean cmorization report error if no input data""" + self._test_ocean_cmor(success=False, error=True) + + def test_ocean_cmorization_not_vars_requested(self): + """Test ocean cmorization report success if no vars qhere requested""" + self._create_ocean_files( + "expid_1d_19900101_19900131.nc", "MMO_19900101-19900131.tar" + ) + self.data_manager.config.cmor.any_required.return_value = False + self._test_ocean_cmor(check_vars={"var1": False, "var2": False}) + + def test_ocean_cmorization_no_vars_recognized(self): + """Test ocean cmorization report success if no vars where recognized""" + self._create_ocean_files( + "expid_1d_19900101_19900131.nc", "MMO_19900101-19900131.tar" + ) + + def not_recognized(*args): + return None, None + + self.data_manager.config.var_manager.get_variable_and_alias = ( + not_recognized + ) + self._test_ocean_cmor(check_vars={"var1": False, "var2": False}) + + def test_ocean_cmorization_var2_not_requested(self): + """Test ocean cmorization with var2 not recognized""" + self._create_ocean_files( + "expid_1d_19900101_19900131.nc", "MMO_19900101-19900131.tar" + ) + + def _reject_var2(cmor_var): + return cmor_var.short_name != "var2" + + self.data_manager.config.cmor.cmorize = _reject_var2 + self._test_ocean_cmor(check_vars={"var1": True, "var2": False}) + + def test_ocean_cmorization(self): + """Test basic ocean cmorization""" + self._create_ocean_files( + "expid_1d_19900101_19900131.nc", "MMO_19900101-19900131.tar" + ) + self._test_ocean_cmor(check_vars={"var1": True, "var2": True}) + + def test_ocean_cmorization_with_filter(self): + """Test ocean cmorization filtering files""" + self._create_ocean_files( + "expid_1d_19900101_19900131.nc", "MMO_19900101-19900131.tar" + ) + self.data_manager.config.cmor.filter_files = ["expid"] + self._test_ocean_cmor(check_vars={"var1": True, "var2": True}) + + def test_ocean_cmorization_with_bad_filter(self): + """Test ocean cmorization fails if a bad filter is added""" + self._create_ocean_files( + "expid_1d_19900101_19900131.nc", "MMO_19900101-19900131.tar" + ) + self.data_manager.config.cmor.filter_files = ["badfilter"] + self._test_ocean_cmor( + success=False, + error=True, + check_vars={"var1": False, "var2": False}, + ) + + def test_ocean_cmorization_gzip(self): + """Test ocean cmorization if tars are also zipped""" + self._create_ocean_files( + "expid_1d_19900101_19900131.nc", + "MMO_19900101-19900131.tar", + gzip=True, + ) + self._test_ocean_cmor(check_vars={"var1": True, "var2": True}) + + def test_ocean_cmorization_backup(self): + """Test ocean cmorization when files are in backup path""" + self._create_ocean_files( + "expid_1d_19900101_19900131.nc", + "MMO_19900101-19900131.tar", + backup=True, + ) + self._test_ocean_cmor(check_vars={"var1": True, "var2": True}) + + def test_ocean_cmorization_ppo(self): + """Test ocean cmorization when files are PPO""" + self._create_ocean_files( + "expid_1d_19900101_19900131.nc", + "PPO_expid_1D_xx_19900101_19900131.tar", + ) + self._test_ocean_cmor(check_vars={"var1": True, "var2": True}) + + def test_ocean_cmorization_diags(self): + """Test ocean cmorization when files are diags""" + self._create_ocean_files( + "expid_1d_19900101_19900131.nc", + "diags_expid_1D_xx_19900101_19900131.tar", + ) + self._test_ocean_cmor(check_vars={"var1": True, "var2": True}) + + def test_atmos_cmorization(self): + """Test basic atmos cmorization from nc""" + self._create_mma_files( + "MMA_1d_??_19900101_19900131.nc", + "MMA_expid_19901101_fc0_19900101-19900131.tar", + ) + self._test_atmos_cmor(check_vars={"var1": True, "var2": True}) + + def test_skip_when_not_requested_mma(self): + """Test atmos cmorization is skipped if chunk is not requested""" + self._create_mma_files( + "MMA_1d_??_19900101_19900131.nc", + "MMA_expid_19901101_fc0_19900101-19900131.tar", + ) + chunk = self.data_manager.config.cmor.chunk_cmorization_requested + chunk.return_value = (False) + self._test_atmos_cmor(message="No need to unpack file 1/1") + + def test_force_mma(self): + """Test force atmos cmorization""" + self._create_mma_files( + "MMA_1d_??_19900101_19900131.nc", + "MMA_expid_19901101_fc0_19900101-19900131.tar", + ) + self.data_manager.is_cmorized.return_value = True + self.data_manager.config.cmor.force = True + self._test_atmos_cmor() + + def test_atmos_cmorization_no_mma_files(self): + """Test atmos cmorization report error if there are no files""" + self._test_atmos_cmor(success=False, error=True) + + def _create_grib_files(self, filename, month): + filename = filename.format(month) + coord_data = np.array([0, 1], np.float) + folder_path = os.path.join( + self.data_manager.config.data_dir, + "expid", + "original_files", + "19900101", + "member", + "outputs", + ) + self._create_file_for_grib( + coord_data, + folder_path, + filename.replace("??", "GG"), + [142, 143, 129, 169, 180, 228], + month, + ) + self._create_file_for_grib( + coord_data, + folder_path, + filename.replace("??", "SH"), + [201, 202], + month, + ) + + def _create_file_for_grib( + self, coord_data, folder_path, filename, codes, month + ): + lat = DimCoord( + coord_data, + standard_name="latitude", + long_name="latitude", + var_name="lat", + units="degrees_north", + ) + lon = DimCoord( + coord_data, + standard_name="longitude", + long_name="longitude", + var_name="lon", + units="degrees_east", + ) + month_days = calendar.monthrange(1990, month)[1] + month -= 1 + time_data = ( + np.arange(0.25, month_days + 0.25, 0.25, np.float) + month * 31 + ) + time = DimCoord( + time_data, + standard_name="time", + long_name="time", + var_name="time", + units="days since 1990-01-01 00:00:00", + ) + variables = [] + for code in codes: + var = iris.cube.Cube( + np.ones((month_days * 4, 2, 2), np.float) * code, + long_name="Variable {}".format(code), + var_name="var{}".format(code), + ) + for x, data in enumerate(time_data): + var.data[x, ...] += data + var.add_dim_coord(time, 0) + var.add_dim_coord(lat, 1) + var.add_dim_coord(lon, 2) + var.attributes["table"] = np.int32(128) + var.attributes["code"] = np.int32(code) + variables.append(var) + + if not os.path.isdir(folder_path): + os.makedirs(folder_path) + file_path = os.path.join(folder_path, filename) + iris.save( + variables, file_path, zlib=True, local_keys=("table", "code") + ) + Utils.cdo().settaxis( + "1990-0{}-01,06:00,6hour".format(month + 1), + input=file_path, + output=file_path.replace(".nc", ".grb"), + options="-f grb2", + ) + os.remove(file_path) + + def test_grib_cmorization(self): + """Test atmos cmorization from grib""" + self.data_manager.config.data_convention.get_file_path = ( + self._get_file_path_grib + ) + self.data_manager.config.experiment.chunk_size = 2 + self.data_manager.get_file_path = self._get_file_path_grib + self._create_grib_files("ICM??expid+19900{}.nc", 1) + self._create_grib_files("ICM??expid+19900{}.nc", 2) + self._test_atmos_cmor() + variables = { + "CP": 143, + "EWSS": 180, + "LSP": 142, + "MN2T": 202, + "MX2T": 201, + "SSRD": 169, + "TP": 228, + "Z": 129, + } + for var, code in six.iteritems(variables): + self.assertTrue(os.path.isdir(os.path.join(self.tmp_dir, var))) + base_data = np.ones((2, 2), np.float) * code + + if var in ("EWSS", "TP", "MN2T", "MX2T", "SSRD", "TP"): + if var == "MX2T": + month_offsets = np.array((16, 45.5)) + daily_offsets = np.arange(1.0, 60.0) + elif var == "MN2T": + month_offsets = np.array((15.25, 44.75)) + daily_offsets = np.arange(0.25, 59.25) + else: + month_offsets = np.array((15.625, 45.125)) + daily_offsets = np.arange(0.625, 59.625) + + hourly_offsets = np.arange(0.25, 59.25, 0.25) + else: + month_offsets = np.array((15.5, 44.875)) + daily_offsets = np.arange(0.375, 59.375) + daily_offsets[0] = 0.5 + hourly_offsets = np.arange(0.25, 59, 0.25) + + if code == 129: + factor = 9.81 + elif code in (180, 169): + factor = 6 * 3600.0 + elif code in (228, ): + factor = 6 * 3.6 + else: + factor = 1.0 + + base_data /= factor + month_offsets /= factor + daily_offsets /= factor + hourly_offsets /= factor + + monthly = iris.load_cube( + os.path.join(self.tmp_dir, var, "mon", "{}.nc".format(var)) + ) + self._test_data(monthly, base_data, month_offsets, var, "Month") + + daily = iris.load_cube( + os.path.join(self.tmp_dir, var, "day", "{}.nc".format(var)) + ) + self._test_data(daily, base_data, daily_offsets, var, "Day") + + hourly = iris.load_cube( + os.path.join(self.tmp_dir, var, "6hr", "{}.nc".format(var)) + ) + self._test_data(hourly, base_data, hourly_offsets, var, "Hour") + + def _test_data(self, data, base_data, offsets, var, freq): + self.assertEqual(data.coord("time").shape, (len(offsets),)) + for x, offset in enumerate(offsets): + self.assertTrue( + np.allclose(data.data[x, ...], base_data + offset), + "{} {} data wrong for {}: {} {} {}".format( + freq, x, var, data.data[x, ...] - base_data, + data.data[x, ...], base_data + ), + ) diff --git a/test/unit/test_cmormanager.py b/test/unit/test_cmormanager.py index 1086c7e5..d9b2811f 100644 --- a/test/unit/test_cmormanager.py +++ b/test/unit/test_cmormanager.py @@ -155,6 +155,30 @@ class TestCMORManager(TestCase): ) ) + @mock.patch("earthdiagnostics.cmormanager.Cmorizer", autospec=True) + def test_prepare_cmorize(self, mock_cmor): + mock_instance = mock_cmor.return_value + self.convention.is_cmorized.return_value = False + cmor_manager = CMORManager(self.config) + self.config.experiment.get_member_list.return_value = ( + ("20000101", 2), + ) + cmor_manager.prepare() + mock_instance.cmorize_ocean.assert_called_once() + mock_instance.cmorize_atmos.assert_called_once() + + @mock.patch("earthdiagnostics.cmormanager.Cmorizer", autospec=True) + def test_prepare_cmorize_force(self, mock_cmor): + mock_instance = mock_cmor.return_value + self.config.cmor.force = True + cmor_manager = CMORManager(self.config) + self.config.experiment.get_member_list.return_value = ( + ("20000101", 2), + ) + cmor_manager.prepare() + mock_instance.cmorize_ocean.assert_called_once() + mock_instance.cmorize_atmos.assert_called_once() + @mock.patch("earthdiagnostics.cmormanager.CMORManager.is_cmorized") @mock.patch("earthdiagnostics.utils.Utils.unzip") @mock.patch("earthdiagnostics.utils.Utils.untar") -- GitLab