# coding: utf-8
"""Module for classes to manage storage manipulation"""
import csv
import os
from datetime import datetime

import iris
import iris.cube
import iris.coords
import iris.exceptions
import iris.experimental.equalise_cubes

from bscearth.utils.log import Log

from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.utils import Utils, TempFile
from earthdiagnostics.publisher import Publisher
from earthdiagnostics.variable import VariableType


class LocalStatus(object):
    """Local file status enumeration"""

    PENDING = 0
    DOWNLOADING = 1
    READY = 2
    FAILED = 3
    NOT_REQUESTED = 4
    COMPUTING = 5


class StorageStatus(object):
    """Remote file status enumeration"""

    PENDING = 0
    UPLOADING = 1
    READY = 2
    FAILED = 3
    NO_STORE = 4


class DataFile(Publisher):
    """
    Represent a data file

    Must be derived for each concrete data file format
    """

    def __init__(self):
        super(DataFile, self).__init__()
        # Path of the file in the remote storage
        self.remote_file = None
        # Path of the file in the local scratch (None until downloaded/generated)
        self.local_file = None
        self.domain = None
        self.var = None
        self.cmor_var = None
        self.region = None
        self.frequency = None
        self.data_convention = None
        # Diagnostic that generated this file, if any
        self.diagnostic = None
        self.grid = None
        self.data_manager = None
        # Variable name the file should end up with after renaming
        self.final_name = None
        self.var_type = VariableType.MEAN
        self._local_status = LocalStatus.NOT_REQUESTED
        self._storage_status = StorageStatus.READY
        self.job_added = False
        # Diagnostics registered as modifiers of this data (see add_modifier)
        self._modifiers = []
        # Cached file size; invalidated whenever either status changes
        self._size = None

    def __str__(self):
        return 'Data file for {0}'.format(self.remote_file)

    @property
    def size(self):
        """File size, computed lazily and cached until a status change"""
        if self._size is None:
            self._get_size()
        return self._size

    def _get_size(self):
        # Only the local copy is measured here; NetCDFFile extends this to
        # also look at the remote copy.
        try:
            if self.local_status == LocalStatus.READY:
                self._size = os.path.getsize(self.local_file)
        except OSError:
            # File vanished or is unreadable: leave size unknown
            self._size = None

    def clean_local(self):
        """Check if a local file is still needed and remove it if not"""
        # Keep the file while it is not ready, still has suscribers, must
        # still be uploaded, or is being uploaded right now.
        if self.local_status != LocalStatus.READY or self.suscribers or self.upload_required() or \
                self.storage_status == StorageStatus.UPLOADING:
            return
        Log.debug('File {0} no longer needed. Deleting from scratch...'.format(
            self.remote_file))
        os.remove(self.local_file)
        Log.debug('File {0} deleted from scratch'.format(self.remote_file))
        self.local_file = None
        self.local_status = LocalStatus.PENDING

    def upload_required(self):
        """
        Get if an upload is needed for this file

        Returns
        -------
        bool
        """
        return self.local_status == LocalStatus.READY and self.storage_status == StorageStatus.PENDING

    def download_required(self):
        """
        Get if a download is required for this file

        Returns
        -------
        bool
        """
        if not self.local_status == LocalStatus.PENDING:
            return False

        if self.storage_status == StorageStatus.READY:
            return True

        # Even if not in storage yet, modifiers will need the data locally
        if self.has_modifiers():
            return True
        return False

    def add_modifier(self, diagnostic):
        """
        Register a diagnostic as a modifier of this data

        A modifier diagnostic is a diagnostic that read this data and
        changes it in any way. The diagnostic must be a modifier even if
        it only affects the metadata

        Parameters
        ----------
        diagnostic: Diagnostic
        """
        self._modifiers.append(diagnostic)

    def has_modifiers(self):
        """
        Check if it has registered modifiers

        Returns
        -------
        bool
        """
        return bool(self._modifiers)

    def ready_to_run(self, diagnostic):
        """
        Check if the data is ready to run for a given diagnostics

        To be ready to run, the datafile should be in the local storage
        and no modifiers can be pending.

        Parameters
        ----------
        diagnostic: Diagnostic

        Returns
        -------
        bool
        """
        if not self.local_status == LocalStatus.READY:
            return False
        if not self._modifiers:
            return True
        # Modifiers run in registration order: only the first pending one may go
        return self._modifiers[0] is diagnostic

    @property
    def local_status(self):
        """Get local storage status"""
        return self._local_status

    @local_status.setter
    def local_status(self, value):
        if self._local_status == value:
            return
        self._local_status = value
        # Size may have changed with the status; notify suscribers
        self._size = None
        self.dispatch(self)

    @property
    def storage_status(self):
        """Get remote storage status"""
        return self._storage_status

    @storage_status.setter
    def storage_status(self, value):
        if self._storage_status == value:
            return
        self._storage_status = value
        # Size may have changed with the status; notify suscribers
        self._size = None
        self.dispatch(self)

    @classmethod
    def from_storage(cls, filepath, data_convention):
        """Create a new datafile to be downloaded from the storage"""
        file_object = cls()
        file_object.remote_file = filepath
        file_object.local_status = LocalStatus.PENDING
        file_object.data_convention = data_convention
        return file_object

    @classmethod
    def to_storage(cls, remote_file, data_convention):
        """Create a new datafile object for a file that is going to be generated and stored"""
        new_object = cls()
        new_object.remote_file = remote_file
        new_object.storage_status = StorageStatus.PENDING
        new_object.data_convention = data_convention
        return new_object

    def download(self):
        """
        Get data from remote storage to the local one

        Must be overriden by the derived classes

        Raises
        ------
        NotImplementedError
            If the derived classes do not override this
        """
        raise NotImplementedError('Class must implement the download method')

    def prepare_to_upload(self, rename_var):
        """
        Prepare a local file to be uploaded

        This includes renaming the variable if necessary, updating the
        metadata and adding the history and managing the possibility of
        multiple regions

        Parameters
        ----------
        rename_var: str
            Current name of the variable in the file; if empty, ``self.var``
            is assumed
        """
        if rename_var:
            original_name = rename_var
        else:
            original_name = self.var
        if self.final_name != original_name:
            Utils.rename_variable(
                self.local_file, original_name, self.final_name)
        self._rename_coordinate_variables()
        self._correct_metadata()
        self._prepare_region()
        self.add_diagnostic_history()
        Utils.convert2netcdf4(self.local_file)

    def upload(self):
        """Send a local file to the storage"""
        self.storage_status = StorageStatus.UPLOADING
        remote_file = self.remote_file
        try:
            # Diagnostic output goes to the /diags/ tree, not /cmorfiles/
            if '/cmorfiles/' in remote_file:
                remote_file = remote_file.replace('/cmorfiles/', '/diags/')
            Utils.copy_file(self.local_file, remote_file, save_hash=True)
        # NOTE(review): OSError is redundant here — it is a subclass of
        # Exception, so this clause catches everything anyway
        except (OSError, Exception) as ex:
            Log.error('File {0} can not be uploaded: {1}', remote_file, ex)
            self.storage_status = StorageStatus.FAILED
            return

        Log.info('File {0} uploaded!', self.remote_file)
        # Link creation deliberately disabled here; NetCDFFile.create_link
        # is still available to callers
        # self.create_link()
        self.storage_status = StorageStatus.READY

    def set_local_file(self, local_file, diagnostic=None, rename_var=''):
        """
        Set the local file generated by EarthDiagnostics

        This also prepares it for the upload

        Parameters
        ----------
        local_file: str
        diagnostic: Diagnostic or None
        rename_var: str

        Returns
        -------
        None
        """
        self.storage_status = StorageStatus.PENDING
        # The diagnostic that produced the data is no longer a pending modifier
        if diagnostic in self._modifiers:
            self._modifiers.remove(diagnostic)
        self.local_file = local_file
        self.prepare_to_upload(rename_var)
        self.local_status = LocalStatus.READY

    def create_link(self):
        """Create a link from the original data in the _ folder"""
        pass

    def _correct_metadata(self):
        # Fix variable-level attributes (coordinates, names, units, table id)
        # directly on the netCDF file
        handler = Utils.open_cdf(self.local_file)
        var_handler = handler.variables[self.final_name]
        # Only declare as coordinates those that actually exist in the file
        coords = set.intersection(
            {
                'time', 'lev', self.data_convention.lat_name,
                self.data_convention.lon_name,
                'leadtime', 'region', 'time_centered'
            },
            set(handler.variables.keys())
        )
        var_handler.coordinates = Utils.convert_to_ascii_if_possible(
            ' '.join(coords))
        if 'time_centered' in handler.variables:
            # time_centered is an auxiliary NEMO time variable: drop its
            # standard_name so tools do not mistake it for the record time
            if hasattr(handler.variables['time_centered'], 'standard_name'):
                del handler.variables['time_centered'].standard_name
        if not self.cmor_var:
            # Without CMOR metadata there is nothing else to fix
            handler.close()
            return

        self._fix_variable_name(var_handler)
        handler.modeling_realm = self.cmor_var.domain.name
        table = self.cmor_var.get_table(self.frequency,
                                        self.data_convention)
        handler.table_id = 'Table {0} ({1})'.format(table.name, table.date)
        if self.cmor_var.units:
            self._fix_units(var_handler)
        handler.sync()
        self._fix_coordinate_variables_metadata(handler)
        var_type = var_handler.dtype
        handler.close()
        # Fill/valid range attributes are rewritten with ncatted after closing
        self._fix_values_metadata(var_type)

    def _fix_variable_name(self, var_handler):
        # Align standard and long names with the CMOR definition
        var_handler.standard_name = self.cmor_var.standard_name
        var_handler.long_name = self.cmor_var.long_name

    def _fix_values_metadata(self, var_type, file_path=None):
        # Rewrite _FillValue/missingValue (and valid_min/valid_max when the
        # CMOR variable defines them) using NCO's ncatted
        if file_path is None:
            file_path = self.local_file
        valid_min = ''
        valid_max = ''
        if self.cmor_var is not None:
            if self.cmor_var.valid_min:
                valid_min = '-a valid_min,{0},o,{1},"{2}" '.format(self.final_name, var_type.char,
                                                                   self.cmor_var.valid_min)
            if self.cmor_var.valid_max:
                valid_max = '-a valid_max,{0},o,{1},"{2}" '.format(self.final_name, var_type.char,
                                                                   self.cmor_var.valid_max)
        Utils.nco().ncatted(
            input=file_path, output=file_path,
            options=(
                '-O -a _FillValue,{0},o,{1},"1.e20" '
                '-a missingValue,{0},o,{1},"1.e20" {2}{3}'.format(
                    self.final_name, var_type.char, valid_min, valid_max),
            )
        )

    def _fix_coordinate_variables_metadata(self, handler):
        # Normalize short/standard names of the coordinate variables
        lat_name = self.data_convention.lat_name
        lon_name = self.data_convention.lon_name
        if 'lev' in handler.variables:
            handler.variables['lev'].short_name = 'lev'
            if self.domain == ModelingRealms.ocean:
                handler.variables['lev'].standard_name = 'depth'
        if lon_name in handler.variables:
            handler.variables[lon_name].short_name = lon_name
            handler.variables[lon_name].standard_name = 'longitude'
        if lat_name in handler.variables:
            handler.variables[lat_name].short_name = lat_name
            handler.variables[lat_name].standard_name = 'latitude'

    def _fix_units(self, var_handler):
        if 'units' not in var_handler.ncattrs():
            return
        # Map common non-standard unit spellings before converting
        if var_handler.units == '-':
            var_handler.units = '1.0'
        if var_handler.units == 'PSU':
            var_handler.units = 'psu'
        if var_handler.units == 'C' and self.cmor_var.units == 'K':
            var_handler.units = 'deg_C'
        if self.cmor_var.units != var_handler.units:
            self._convert_units(var_handler)
        var_handler.units = self.cmor_var.units

    def _convert_units(self, var_handler):
        try:
            Utils.convert_units(var_handler, self.cmor_var.units)
        except ValueError as ex:
            # Fall back to the project's own conversion table when the
            # generic converter does not know the units
            Log.warning('Can not convert {3} from {0} to {1}: {2}',
                        var_handler.units, self.cmor_var.units, ex,
                        self.cmor_var.short_name)
            factor, offset = UnitConversion.get_conversion_factor_offset(var_handler.units,
                                                                         self.cmor_var.units)

            var_handler[:] = var_handler[:] * factor + offset
            # valid_min/valid_max must be scaled with the data to stay meaningful
            if 'valid_min' in var_handler.ncattrs():
                var_handler.valid_min = float(
                    var_handler.valid_min) * factor + offset
            if 'valid_max' in var_handler.ncattrs():
                var_handler.valid_max = float(
                    var_handler.valid_max) * factor + offset

    def _prepare_region(self):
        # When a file with a 'region' coordinate already exists in storage,
        # merge the stored regions with the newly computed ones so no region
        # data is lost on upload.
        if not self.check_is_in_storage(update_status=False):
            return
        cube = iris.load_cube(self.local_file)
        try:
            cube.coord('region')
        except iris.exceptions.CoordinateNotFoundError:
            # No region axis: nothing to merge
            return
        try:
            old_cube = iris.load_cube(self.remote_file)
        except Exception:
            try:
                old_cube = iris.load_cube(
                    self.remote_file.replace('/cmorfiles/', '/diags/')
                )
            except Exception:
                # Bad data, overwrite
                return
        new_data = {}
        # New regions take precedence over the stored ones
        for region_slice in cube.slices_over('region'):
            Log.debug(region_slice.coord('region').points[0])
            new_data[region_slice.coord('region').points[0]] = region_slice
        for region_slice in old_cube.slices_over('region'):
            region = region_slice.coord('region').points[0]
            Log.debug(region)
            if region not in new_data:
                new_data[region] = region_slice
        cube_list = iris.cube.CubeList(
            [new_data[region] for region in sorted(new_data)]
        )
        if len(cube_list) == 1:
            return
        iris.experimental.equalise_cubes.equalise_attributes(cube_list)
        final_cube = cube_list.merge_cube()
        temp = TempFile.get()
        iris.save(final_cube, temp, zlib=True)
        if '-' in final_cube.var_name:
            # iris sanitizes '-' to '_' on save; restore the dashed name
            # (presumably — TODO confirm against the iris netCDF saver)
            Utils.rename_variable(
                temp, final_cube.var_name.replace('-', '_'),
                final_cube.var_name, must_exist=False
            )
        Utils.move_file(temp, self.local_file)
        self._correct_metadata()

    def _rename_coordinate_variables(self):
        # Map NEMO grid coordinate names (per T/U/V grid) to the convention's
        # lat/lon names; missing variables are silently skipped (False flag)
        variables = dict()
        variables['x'] = 'i'
        variables['y'] = 'j'
        variables['nav_lat_grid_V'] = self.data_convention.lat_name
        variables['nav_lon_grid_V'] = self.data_convention.lon_name
        variables['nav_lat_grid_U'] = self.data_convention.lat_name
        variables['nav_lon_grid_U'] = self.data_convention.lon_name
        variables['nav_lat_grid_T'] = self.data_convention.lat_name
        variables['nav_lon_grid_T'] = self.data_convention.lon_name
        Utils.rename_variables(self.local_file, variables, False)

    def add_diagnostic_history(self):
        """Add the history line corresponding to the diagnostic to the local file"""
        if not self.diagnostic:
            return
        # Imported here to avoid a circular import with earthdiags
        from earthdiagnostics.earthdiags import EarthDiags
        history_line = 'Diagnostic {1} calculated with EarthDiagnostics version {0}'.format(EarthDiags.version,
                                                                                            self.diagnostic)
        self._add_history_line(history_line)

    def add_cmorization_history(self):
        """Add the history line corresponding to the cmorization to the local file"""
        # Imported here to avoid a circular import with earthdiags
        from earthdiagnostics.earthdiags import EarthDiags
        history_line = 'CMORized with Earthdiagnostics version {0}'.format(
            EarthDiags.version)
        self._add_history_line(history_line)

    def _add_history_line(self, history_line):
        # Prepend the new entry to the file's global history attribute
        utc_datetime = 'UTC ' + datetime.utcnow().isoformat()
        history_line = '{0}: {1};'.format(utc_datetime, history_line)

        handler = Utils.open_cdf(self.local_file)
        try:
            history_line = history_line + handler.history
        except AttributeError:
            # File had no history attribute yet; keep the new line as-is
            # (NOTE(review): this assignment is a no-op kept for clarity)
            history_line = history_line
        handler.history = Utils.convert_to_ascii_if_possible(history_line)
        handler.close()


class UnitConversion(object):
    """
    Class to manage unit conversions

    Parameters
    ----------
    source: str
    destiny: str
    factor: float
    offset: float
    """

    # Maps (source, destiny) unit pairs to UnitConversion instances;
    # populated by load_conversions()
    _dict_conversions = None

    def __init__(self, source, destiny, factor, offset):
        self.source = source
        self.destiny = destiny
        self.factor = float(factor)
        self.offset = float(offset)

    @classmethod
    def load_conversions(cls):
        """Load conversions from the configuration file"""
        cls._dict_conversions = dict()
        # conversions.csv lives next to this module; assumed columns are
        # original, converted, factor, offset (header row starts with
        # 'original' and is skipped) — TODO confirm against the file
        with open(os.path.join(os.path.dirname(os.path.realpath(__file__)),
                               'conversions.csv'), 'r') as csvfile:
            reader = csv.reader(csvfile, dialect='excel')
            for line in reader:
                if line[0] == 'original':
                    continue
                cls.add_conversion(UnitConversion(
                    line[0], line[1], line[2], line[3]))

    @classmethod
    def add_conversion(cls, conversion):
        """
        Add a conversion to the dictionary

        :param conversion: conversion to add
        :type conversion: UnitConversion
        """
        cls._dict_conversions[(
            conversion.source, conversion.destiny)] = conversion

    @classmethod
    def get_conversion_factor_offset(cls, input_units, output_units):
        """
        Get the conversion factor and offset for two units.

        The conversion has to be done in the following way:
        converted = original * factor + offset

        :param input_units: original units
        :type input_units: str
        :param output_units: destiny units
        :type output_units: str
        :return: factor and offset
        :rtype: [float, float]
        """
        # Units may carry a numeric scale prefix, e.g. '10^3 m' or '0.1 Pa'
        units = input_units.split()
        if len(units) == 1:
            scale_unit = 1
            unit = units[0]
        else:
            if '^' in units[0]:
                values = units[0].split('^')
                scale_unit = pow(int(values[0]), int(values[1]))
            else:
                scale_unit = float(units[0])
            unit = units[1]

        units = output_units.split()
        if len(units) == 1:
            scale_new_unit = 1
            new_unit = units[0]
        else:
            if '^' in units[0]:
                values = units[0].split('^')
                scale_new_unit = pow(int(values[0]), int(values[1]))
            else:
                scale_new_unit = float(units[0])
            new_unit = units[1]

        factor, offset = UnitConversion._get_factor(new_unit, unit)
        if factor is None:
            return None, None
        # Combine the base-unit factor with both numeric scale prefixes
        factor = factor * scale_unit / float(scale_new_unit)
        offset /= float(scale_new_unit)

        return factor, offset

    @classmethod
    def _get_factor(cls, new_unit, unit):
        # Add only the conversions with a factor greater than 1
        if unit == new_unit:
            return 1, 0
        elif (unit, new_unit) in cls._dict_conversions:
            conversion = cls._dict_conversions[(unit, new_unit)]
            return conversion.factor, conversion.offset
        elif (new_unit, unit) in cls._dict_conversions:
            # Reverse lookup: invert the stored conversion
            conversion = cls._dict_conversions[(new_unit, unit)]
            return 1 / conversion.factor, -conversion.offset
        else:
            return None, None
# NOTE(review): this stub looks misplaced — it takes `self` but sits outside
# any class in the collapsed source; presumably it was meant to be the
# DataFile base implementation of check_is_in_storage. Confirm its original
# indentation in version control before relying on it.
def check_is_in_storage(self):
    return


class NetCDFFile(DataFile):
    """Implementation of DataFile for netCDF files"""

    def download(self):
        """Get data from remote storage to the local one"""
        # Prefer the /diags/ copy over the /cmorfiles/ one; first path that
        # exists wins
        for path in (self.remote_file.replace('/cmorfiles/', '/diags/'),
                     self.remote_file):
            if os.path.isfile(path):
                try:
                    self.local_status = LocalStatus.DOWNLOADING
                    Log.debug('Downloading file {0}...', path)
                    if not self.local_file:
                        self.local_file = TempFile.get()
                    # Utils.get_file_hash(self.remote_file, use_stored=True, save=True)
                    try:
                        Utils.copy_file(path, self.local_file, retrials=1)
                    except Utils.CopyException:
                        # First attempt failed: retry the copy once more
                        # Utils.get_file_hash(self.remote_file, use_stored=False, save=True)
                        Utils.copy_file(path, self.local_file, retrials=2)

                    if self.data_convention == 'meteofrance':
                        # Meteofrance files use NEMO-style coordinate names;
                        # rename them to the common convention (missing
                        # variables are skipped)
                        Log.debug('Converting variable names from meteofrance convention')
                        alt_coord_names = {
                            'time_counter': 'time', 'time_counter_bounds': 'time_bnds',
                            'tbnds': 'bnds', 'nav_lat': 'lat', 'nav_lon': 'lon',
                            'x': 'i', 'y': 'j'}
                        Utils.rename_variables(self.local_file, alt_coord_names,
                                               must_exist=False)
                    Log.info('File {0} ready!', path)
                    self.local_status = LocalStatus.READY
                    return

                except Exception as ex:
                    # Remove any partial download before reporting failure
                    if os.path.isfile(self.local_file):
                        os.remove(self.local_file)
                    Log.error('File {0} not available: {1}', path, ex)
                    self.local_status = LocalStatus.FAILED
                    return
        # Neither candidate path exists in storage
        Log.error('File {0} not available: {1}',
                  self.remote_file, 'FileNotFound')
        self.local_status = LocalStatus.FAILED

    def check_is_in_storage(self, update_status=True):
        """
        Check if the file exists in the remote storage

        Looks at both the /cmorfiles/ and /diags/ locations; optionally
        marks the storage status as READY when found.

        Parameters
        ----------
        update_status: bool

        Returns
        -------
        bool
        """
        for path in (self.remote_file,
                     self.remote_file.replace('/cmorfiles/', '/diags/')):
            if os.path.isfile(path):
                if update_status:
                    self.storage_status = StorageStatus.READY
                return True
        return False

    def create_link(self):
        """Create a link from the original data in the _ folder"""
        try:
            self.data_convention.create_link(self.domain, self.remote_file,
                                             self.frequency, self.final_name,
                                             self.grid, True, self.var_type)
        # NOTE(review): ValueError is redundant here — it is a subclass of
        # Exception, so this clause catches everything anyway
        except (ValueError, Exception) as ex:
            Log.error('Can not create link to {1}: {0}'.format(
                ex,
                self.remote_file))

    def _get_size(self):
        # Extends the base implementation: when the remote copy is READY its
        # size takes precedence over the local one
        try:
            if self.local_status == LocalStatus.READY:
                self._size = os.path.getsize(self.local_file)
            if self.storage_status == StorageStatus.READY:
                self._size = os.path.getsize(self.remote_file)
        except OSError:
            # File vanished or is unreadable: leave size unknown
            self._size = None