# coding: utf-8 """Module for classes to manage storage manipulation""" import csv import os from datetime import datetime import iris import iris.cube import iris.coords import iris.exceptions import iris.experimental.equalise_cubes from bscearth.utils.log import Log from earthdiagnostics.modelingrealm import ModelingRealms from earthdiagnostics.utils import Utils, TempFile from earthdiagnostics.publisher import Publisher from earthdiagnostics.variable import VariableType import netCDF4 class LocalStatus(object): """Local file status enumeration""" PENDING = 0 DOWNLOADING = 1 READY = 2 FAILED = 3 NOT_REQUESTED = 4 COMPUTING = 5 class StorageStatus(object): """Remote file status enumeration""" PENDING = 0 UPLOADING = 1 READY = 2 FAILED = 3 NO_STORE = 4 class DataFile(Publisher): """ Represent a data file Must be derived for each concrete data file format """ def __init__(self): super(DataFile, self).__init__() self.remote_file = None self.local_file = None self.domain = None self.var = None self.cmor_var = None self.region = None self.frequency = None self.data_convention = None self.diagnostic = None self.grid = None self.data_manager = None self.final_name = None self.var_type = VariableType.MEAN self._local_status = LocalStatus.NOT_REQUESTED self._storage_status = StorageStatus.READY self.job_added = False self._modifiers = [] self._size = None def __str__(self): return "Data file for {0}".format(self.remote_file) @property def size(self): """File size""" if self._size is None: self._get_size() return self._size def _get_size(self): try: if self.local_status == LocalStatus.READY: self._size = os.path.getsize(self.local_file) except OSError: self._size = None def clean_local(self): """Check if a local file is still needed and remove it if not""" if ( self.local_status != LocalStatus.READY or self.suscribers or self.upload_required() or self.storage_status == StorageStatus.UPLOADING ): return Log.debug( "File {0} no longer needed. Deleting from scratch...".format( self.remote_file ) ) os.remove(self.local_file) Log.debug("File {0} deleted from scratch".format(self.remote_file)) self.local_file = None self.local_status = LocalStatus.PENDING def upload_required(self): """ Get if an upload is needed for this file Returns ------- bool """ return ( self.local_status == LocalStatus.READY and self.storage_status == StorageStatus.PENDING ) def download_required(self): """ Get if a download is required for this file Returns ------- bool """ if not self.local_status == LocalStatus.PENDING: return False if self.storage_status == StorageStatus.READY: return True if self.has_modifiers(): return True return False def add_modifier(self, diagnostic): """ Register a diagnostic as a modifier of this data A modifier diagnostic is a diagnostic that read this data and changes it in any way. The diagnostic must be a modifier even if it only affects the metadata Parameters ---------- diagnostic: Diagnostic """ self._modifiers.append(diagnostic) def has_modifiers(self): """ Check if it has registered modifiers Returns ------- bool """ return bool(self._modifiers) def ready_to_run(self, diagnostic): """ Check if the data is ready to run for a given diagnostics To be ready to run, the datafile should be in the local storage and no modifiers can be pending. Parameters ---------- diagnostic: Diagnostic Returns ------- bool """ if not self.local_status == LocalStatus.READY: return False if not self._modifiers: return True return self._modifiers[0] is diagnostic @property def remote_diags_file(self): remote_diags_file = self.remote_file.replace("/cmorfiles/", "/diags/") remote_diags_file = remote_diags_file.replace("/original_files/", "/") return remote_diags_file @property def local_status(self): """Get local storage status""" return self._local_status @local_status.setter def local_status(self, value): if self._local_status == value: return self._local_status = value self._size = None self.dispatch(self) @property def storage_status(self): """Get remote storage status""" return self._storage_status @storage_status.setter def storage_status(self, value): if self._storage_status == value: return self._storage_status = value self._size = None self.dispatch(self) @classmethod def from_storage(cls, filepath, data_convention): """Create a new datafile to be downloaded from the storage""" file_object = cls() file_object.remote_file = filepath file_object.local_status = LocalStatus.PENDING file_object.data_convention = data_convention return file_object @classmethod def to_storage(cls, remote_file, data_convention): """Creates new datafile object for a file to be generated and stored""" new_object = cls() new_object.remote_file = remote_file new_object.storage_status = StorageStatus.PENDING new_object.data_convention = data_convention return new_object def download(self): """ Get data from remote storage to the local one Must be overriden by the derived classes Raises ------ NotImplementedError If the derived classes do not override this """ raise NotImplementedError("Class must implement the download method") def prepare_to_upload(self, rename_var): """ Prepare a local file to be uploaded This includes renaming the variable if necessary, updating the metadata and adding the history and managing the possibility of multiple regions """ if rename_var: original_name = rename_var else: original_name = self.var if self.final_name != original_name: Utils.rename_variable( self.local_file, original_name, self.final_name ) self._rename_coordinate_variables() self._correct_metadata() self._prepare_region() self.add_basin_history() self.add_diagnostic_history() Utils.convert2netcdf4(self.local_file) def upload(self): """Send a local file to the storage""" self.storage_status = StorageStatus.UPLOADING remote_file = self.remote_file try: Utils.copy_file( self.local_file, self.remote_diags_file, save_hash=True) except (OSError, Exception) as ex: Log.error("File {0} can not be uploaded: {1}", remote_file, ex) self.storage_status = StorageStatus.FAILED return Log.info("File {0} uploaded!", self.remote_file) # self.create_link() self.storage_status = StorageStatus.READY def set_local_file(self, local_file, diagnostic=None, rename_var=""): """ Set the local file generated by EarthDiagnostics This also prepares it for the upload Parameters ---------- local_file: str diagnostic: Diagnostic or None rename_var: str Returns ------- None """ self.storage_status = StorageStatus.PENDING if diagnostic in self._modifiers: self._modifiers.remove(diagnostic) self.local_file = local_file self.prepare_to_upload(rename_var) self.local_status = LocalStatus.READY def create_link(self): """Create a link from the original in _ folder""" pass def _correct_metadata(self): handler = Utils.open_cdf(self.local_file) var_handler = handler.variables[self.final_name] coords = set.intersection( { 'time', 'lev', self.data_convention.lat_name, self.data_convention.lon_name, 'leadtime', 'region', 'time_centered' }, set(handler.variables.keys()), ) var_handler.coordinates = Utils.convert_to_ascii_if_possible( " ".join(coords) ) if "time_centered" in handler.variables: if hasattr(handler.variables["time_centered"], "standard_name"): del handler.variables["time_centered"].standard_name if not self.cmor_var: handler.close() return self._fix_variable_name(var_handler) handler.modeling_realm = self.cmor_var.domain.name table = self.cmor_var.get_table(self.frequency, self.data_convention) handler.table_id = "Table {0} ({1})".format(table.name, table.date) if self.cmor_var.units: self._fix_units(var_handler) handler.sync() self._fix_coordinate_variables_metadata(handler) var_type = var_handler.dtype handler.close() self._fix_values_metadata(var_type) def _fix_variable_name(self, var_handler): var_handler.standard_name = self.cmor_var.standard_name var_handler.long_name = self.cmor_var.long_name def _fix_values_metadata(self, var_type, file_path=None): if file_path is None: file_path = self.local_file valid_min = "" valid_max = "" if self.cmor_var is not None: if self.cmor_var.valid_min: valid_min = '-a valid_min,{0},o,{1},"{2}" '.format( self.final_name, var_type.char, self.cmor_var.valid_min ) if self.cmor_var.valid_max: valid_max = '-a valid_max,{0},o,{1},"{2}" '.format( self.final_name, var_type.char, self.cmor_var.valid_max ) Utils.nco().ncatted( input=file_path, output=file_path, options=( '-O -a _FillValue,{0},o,{1},"1.e20" ' '-a missingValue,{0},o,{1},"1.e20" {2}{3}'.format( self.final_name, var_type.char, valid_min, valid_max ), ), ) def _fix_coordinate_variables_metadata(self, handler): lat_name = self.data_convention.lat_name lon_name = self.data_convention.lon_name if 'lev' in handler.variables: handler.variables['lev'].short_name = 'lev' if self.domain == ModelingRealms.ocean: handler.variables['lev'].standard_name = 'depth' if lon_name in handler.variables: handler.variables[lon_name].short_name = lon_name handler.variables[lon_name].standard_name = 'longitude' if lat_name in handler.variables: handler.variables[lat_name].short_name = lat_name handler.variables[lat_name].standard_name = 'latitude' def _fix_units(self, var_handler): if "units" not in var_handler.ncattrs(): return if var_handler.units == "-": var_handler.units = "1.0" if var_handler.units == "PSU": var_handler.units = "psu" if var_handler.units == "C" and self.cmor_var.units == "K": var_handler.units = "deg_C" if self.cmor_var.units != var_handler.units: self._convert_units(var_handler) var_handler.units = self.cmor_var.units def _convert_units(self, var_handler): try: Utils.convert_units(var_handler, self.cmor_var.units) except ValueError as ex: Log.warning( "Can not convert {3} from {0} to {1}: {2}", var_handler.units, self.cmor_var.units, ex, self.cmor_var.short_name, ) factor, offset = UnitConversion.get_conversion_factor_offset( var_handler.units, self.cmor_var.units ) var_handler[:] = var_handler[:] * factor + offset if "valid_min" in var_handler.ncattrs(): var_handler.valid_min = ( float(var_handler.valid_min) * factor + offset ) if "valid_max" in var_handler.ncattrs(): var_handler.valid_max = ( float(var_handler.valid_max) * factor + offset ) def _prepare_region(self): if not self.check_is_in_storage(update_status=False): return cube = iris.load_cube(self.local_file) try: cube.coord("region") except iris.exceptions.CoordinateNotFoundError: return try: old_cube = iris.load_cube(self.remote_diags_file) except Exception: try: old_cube = iris.load_cube(self.remote_file) except Exception: # Bad data, overwrite return new_data = {} for region_slice in cube.slices_over("region"): Log.debug(region_slice.coord("region").points[0]) new_data[region_slice.coord("region").points[0]] = region_slice for region_slice in old_cube.slices_over("region"): region = region_slice.coord("region").points[0] Log.debug(region) if region not in new_data: new_data[region] = region_slice cube_list = iris.cube.CubeList( [new_data[region] for region in sorted(new_data)] ) if len(cube_list) == 1: return iris.experimental.equalise_cubes.equalise_attributes(cube_list) final_cube = cube_list.merge_cube() temp = TempFile.get() iris.save(final_cube, temp, zlib=True) handler = Utils.open_cdf(temp) renames = {} for dim in handler.dimensions: if dim.startswith('dim'): renames[dim] = 'region' if dim.startswith('string'): renames[dim] = 'region_length' if '-' in final_cube.var_name: renames[final_cube.var_name.replace('-', '_')] = \ final_cube.var_name Utils.rename_variables( temp, renames, must_exist=False, rename_dimension=True) Utils.move_file(temp, self.local_file) handler2 = Utils.open_cdf(self.local_file) region_var = handler2.variables['region'] for i, cube in enumerate(cube_list): encode = 'utf-8' name = region_var[i, ...].tobytes().strip().decode(encode) length = handler2.dimensions['region_length'].size region_var[i, ...] = netCDF4.stringtoarr(name, length) handler2.close() self._correct_metadata() def _rename_coordinate_variables(self): variables = dict() variables['x'] = 'i' variables['y'] = 'j' variables['nav_lat_grid_V'] = self.data_convention.lat_name variables['nav_lon_grid_V'] = self.data_convention.lon_name variables['nav_lat_grid_U'] = self.data_convention.lat_name variables['nav_lon_grid_U'] = self.data_convention.lon_name variables['nav_lat_grid_T'] = self.data_convention.lat_name variables['nav_lon_grid_T'] = self.data_convention.lon_name Utils.rename_variables(self.local_file, variables, False) def add_diagnostic_history(self): """Add diagnostic history line to local file""" if not self.diagnostic: return from earthdiagnostics.earthdiags import EarthDiags history_line = ( f"Diagnostic {self.diagnostic} calculated with EarthDiagnostics " f"version {EarthDiags.version}" ) self._add_history_line(history_line) def add_cmorization_history(self): """Add cmorization history line to local file""" from earthdiagnostics.earthdiags import EarthDiags history_line = "CMORized with Earthdiagnostics version {0}".format( EarthDiags.version ) self._add_history_line(history_line) def add_basin_history(self): """Add basin history line to local file""" if not os.path.isfile('basins.nc'): return basins = iris.load_cube('basins.nc') history_line = ( "Using Basins masks file " f"version {basins.attributes['version']} with " f"grid {basins.attributes['grid']}. " "Original file can be found in /esarchive/autosubmit/conf_files." ) self._add_history_line(history_line) def _add_history_line(self, history_line): utc_datetime = "UTC " + datetime.utcnow().isoformat() history_line = "{0}: {1};".format(utc_datetime, history_line) handler = Utils.open_cdf(self.local_file) try: history_line = history_line + handler.history except AttributeError: history_line = history_line handler.history = Utils.convert_to_ascii_if_possible(history_line) handler.close() class UnitConversion(object): """ Class to manage unit conversions Parameters ---------- source: str destiny: str factor: float offset: float """ _dict_conversions = None def __init__(self, source, destiny, factor, offset): self.source = source self.destiny = destiny self.factor = float(factor) self.offset = float(offset) @classmethod def load_conversions(cls): """Load conversions from the configuration file""" cls._dict_conversions = dict() with open( os.path.join( os.path.dirname(os.path.realpath(__file__)), "conversions.csv" ), "r", ) as csvfile: reader = csv.reader(csvfile, dialect="excel") for line in reader: if line[0] == "original": continue cls.add_conversion( UnitConversion(line[0], line[1], line[2], line[3]) ) @classmethod def add_conversion(cls, conversion): """ Add a conversion to the dictionary :param conversion: conversion to add :type conversion: UnitConversion """ cls._dict_conversions[ (conversion.source, conversion.destiny) ] = conversion @classmethod def get_conversion_factor_offset(cls, input_units, output_units): """ Get the conversion factor and offset for two units. The conversion has to be done in the following way: converted = original * factor + offset :param input_units: original units :type input_units: str :param output_units: destiny units :type output_units: str :return: factor and offset :rtype: [float, float] """ units = input_units.split() if len(units) == 1: scale_unit = 1 unit = units[0] else: if "^" in units[0]: values = units[0].split("^") scale_unit = pow(int(values[0]), int(values[1])) else: scale_unit = float(units[0]) unit = units[1] units = output_units.split() if len(units) == 1: scale_new_unit = 1 new_unit = units[0] else: if "^" in units[0]: values = units[0].split("^") scale_new_unit = pow(int(values[0]), int(values[1])) else: scale_new_unit = float(units[0]) new_unit = units[1] factor, offset = UnitConversion._get_factor(new_unit, unit) if factor is None: return None, None factor = factor * scale_unit / float(scale_new_unit) offset /= float(scale_new_unit) return factor, offset @classmethod def _get_factor(cls, new_unit, unit): # Add only the conversions with a factor greater than 1 if unit == new_unit: return 1, 0 elif (unit, new_unit) in cls._dict_conversions: conversion = cls._dict_conversions[(unit, new_unit)] return conversion.factor, conversion.offset elif (new_unit, unit) in cls._dict_conversions: conversion = cls._dict_conversions[(new_unit, unit)] return 1 / conversion.factor, -conversion.offset else: return None, None def check_is_in_storage(self): return class NetCDFFile(DataFile): """Implementation of DataFile for netCDF files""" def download(self): """Get data from remote storage to the local one""" for path in ( self.remote_diags_file, self.remote_file, ): if os.path.isfile(path): try: self.local_status = LocalStatus.DOWNLOADING Log.debug("Downloading file {0}...", path) if not self.local_file: self.local_file = TempFile.get() try: Utils.copy_file(path, self.local_file, retrials=1) except Utils.CopyException: Utils.copy_file(path, self.local_file, retrials=2) if self.data_convention == "meteofrance": Log.debug( "Converting variable names from " "meteofrance convention" ) alt_coord_names = { "time_counter": "time", "time_counter_bounds": "time_bnds", "tbnds": "bnds", "nav_lat": "lat", "nav_lon": "lon", "x": "i", "y": "j", } Utils.rename_variables( self.local_file, alt_coord_names, must_exist=False ) Log.info("File {0} ready!", path) self.local_status = LocalStatus.READY return except Exception as ex: if os.path.isfile(self.local_file): os.remove(self.local_file) Log.error("File {0} not available: {1}", path, ex) self.local_status = LocalStatus.FAILED return Log.error( "File {0} not available: {1}", self.remote_file, "FileNotFound" ) self.local_status = LocalStatus.FAILED def check_is_in_storage(self, update_status=True): for path in ( self.remote_file, self.remote_diags_file ): if os.path.isfile(path): if update_status: self.storage_status = StorageStatus.READY return True return False def create_link(self): """Create a link from original data to _ folder""" try: self.data_convention.create_link( self.domain, self.remote_file, self.frequency, self.final_name, self.grid, True, self.var_type, ) except (ValueError, Exception) as ex: Log.error( "Can not create link to {1}: {0}".format(ex, self.remote_file) ) def _get_size(self): try: if self.local_status == LocalStatus.READY: self._size = os.path.getsize(self.local_file) if self.storage_status == StorageStatus.READY: self._size = os.path.getsize(self.remote_file) except OSError: self._size = None