# coding: utf-8
"""Module for classes to manage storage manipulation"""
import csv
import os
import shutil
from datetime import datetime

import six

import iris
import iris.coords
import iris.exceptions
import numpy as np
from bscearth.utils.log import Log

from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.utils import Utils, TempFile
from earthdiagnostics.publisher import Publisher
from earthdiagnostics.variable import VariableType


class LocalStatus(object):
    """Local file status enumeration"""

    PENDING = 0
    DOWNLOADING = 1
    READY = 2
    FAILED = 3
    NOT_REQUESTED = 4
    COMPUTING = 5


class StorageStatus(object):
    """Remote file status enumeration"""

    PENDING = 0
    UPLOADING = 1
    READY = 2
    FAILED = 3
    NO_STORE = 4


class DataFile(Publisher):
    """
    Represent a data file

    Must be derived for each concrete data file format
    """

    def __init__(self):
        super(DataFile, self).__init__()
        self.remote_file = None
        self.local_file = None
        self.domain = None
        self.var = None
        self.cmor_var = None
        self.region = None
        self.frequency = None
        self.data_convention = None
        self.diagnostic = None
        self.grid = None
        self.data_manager = None
        self.final_name = None
        self.var_type = VariableType.MEAN
        self._local_status = LocalStatus.NOT_REQUESTED
        self._storage_status = StorageStatus.READY
        self.job_added = False
        self._modifiers = []
        self._size = None
        self.lon_name = None
        self.lat_name = None

    def __str__(self):
        return 'Data file for {0}'.format(self.remote_file)

    @property
    def size(self):
        """File size"""
        if self._size is None:
            self._get_size()
        return self._size

    def _get_size(self):
        try:
            if self.local_status == LocalStatus.READY:
                self._size = os.path.getsize(self.local_file)
        except OSError:
            self._size = None

    def clean_local(self):
        """Check if a local file is still needed and remove it if not"""
        if self.local_status != LocalStatus.READY or self.suscribers or self.upload_required() or \
                self.storage_status == StorageStatus.UPLOADING:
            return
        Log.debug('File {0} no longer needed. Deleting from scratch...'.format(self.remote_file))
        os.remove(self.local_file)
        Log.debug('File {0} deleted from scratch'.format(self.remote_file))
        self.local_file = None
        self.local_status = LocalStatus.PENDING

    def upload_required(self):
        """
        Get if an upload is needed for this file

        Returns
        -------
        bool
        """
        return self.local_status == LocalStatus.READY and self.storage_status == StorageStatus.PENDING

    def download_required(self):
        """
        Get if a download is required for this file

        Returns
        -------
        bool
        """
        if not self.local_status == LocalStatus.PENDING:
            return False
        if self.storage_status == StorageStatus.READY:
            return True
        if self.has_modifiers():
            return True
        return False

    def add_modifier(self, diagnostic):
        """
        Register a diagnostic as a modifier of this data

        A modifier diagnostic is a diagnostic that reads this data and changes it in any way.
        The diagnostic must be registered as a modifier even if it only affects the metadata.

        Parameters
        ----------
        diagnostic: Diagnostic
        """
        self._modifiers.append(diagnostic)

    def has_modifiers(self):
        """
        Check if it has registered modifiers

        Returns
        -------
        bool
        """
        return bool(self._modifiers)

    def ready_to_run(self, diagnostic):
        """
        Check if the data is ready to run for a given diagnostic

        To be ready to run, the datafile should be in the local storage
        and no modifiers can be pending.
        Parameters
        ----------
        diagnostic: Diagnostic

        Returns
        -------
        bool
        """
        if not self.local_status == LocalStatus.READY:
            return False
        if not self._modifiers:
            return True
        return self._modifiers[0] is diagnostic

    @property
    def local_status(self):
        """Get local storage status"""
        return self._local_status

    @local_status.setter
    def local_status(self, value):
        if self._local_status == value:
            return
        self._local_status = value
        self._size = None
        self.dispatch(self)

    @property
    def storage_status(self):
        """Get remote storage status"""
        return self._storage_status

    @storage_status.setter
    def storage_status(self, value):
        if self._storage_status == value:
            return
        self._storage_status = value
        self._size = None
        self.dispatch(self)

    @classmethod
    def from_storage(cls, filepath, data_convention):
        """Create a new datafile to be downloaded from the storage"""
        file_object = cls()
        file_object.remote_file = filepath
        file_object.local_status = LocalStatus.PENDING
        file_object.data_convention = data_convention
        return file_object

    @classmethod
    def to_storage(cls, remote_file, data_convention):
        """Create a new datafile object for a file that is going to be generated and stored"""
        new_object = cls()
        new_object.remote_file = remote_file
        new_object.storage_status = StorageStatus.PENDING
        new_object.data_convention = data_convention
        return new_object

    def download(self):
        """
        Get data from remote storage to the local one

        Must be overridden by the derived classes

        Raises
        ------
        NotImplementedError
            If the derived classes do not override this
        """
        raise NotImplementedError('Class must implement the download method')

    def check_is_in_storage(self):
        """
        Check if the file is already present in the remote storage

        Does nothing by default; overridden by the derived classes.
        """
        return

    def prepare_to_upload(self, rename_var):
        """
        Prepare a local file to be uploaded

        This includes renaming the variable if necessary, updating the metadata,
        adding the history line and managing the possibility of multiple regions.
        """
        if self.data_convention in ('primavera', 'cmip6'):
            self.lon_name = 'longitude'
            self.lat_name = 'latitude'
        else:
            self.lon_name = 'lon'
            self.lat_name = 'lat'
        Utils.convert2netcdf4(self.local_file)
        if rename_var:
            original_name = rename_var
        else:
            original_name = self.var
        if self.final_name != original_name:
            Utils.rename_variable(self.local_file, original_name, self.final_name)
        self._rename_coordinate_variables()
        self._correct_metadata()
        self._prepare_region()
        self.add_diagnostic_history()
        if self.region is not None:
            self.upload()

    def upload(self):
        """Send a local file to the storage"""
        self.storage_status = StorageStatus.UPLOADING
        try:
            Utils.copy_file(self.local_file, self.remote_file, save_hash=True)
        except Exception as ex:
            Log.error('File {0} can not be uploaded: {1}', self.remote_file, ex)
            self.storage_status = StorageStatus.FAILED
            return

        Log.info('File {0} uploaded!', self.remote_file)
        self.create_link()
        self.storage_status = StorageStatus.READY

    def set_local_file(self, local_file, diagnostic=None, rename_var='', region=None):
        """
        Set the local file generated by EarthDiagnostics

        This also prepares it for the upload

        Parameters
        ----------
        local_file: str
        diagnostic: Diagnostic or None
        rename_var: str
        region: Basin or None

        Returns
        -------
        None
        """
        if diagnostic in self._modifiers:
            self._modifiers.remove(diagnostic)
        self.region = region
        self.local_file = local_file
        self.prepare_to_upload(rename_var)
        self.local_status = LocalStatus.READY

    def create_link(self):
        """Create a link from the original data in the _ folder"""
        pass
    def _correct_metadata(self):
        handler = Utils.open_cdf(self.local_file)
        var_handler = handler.variables[self.final_name]
        coords = set.intersection({'time', 'lev', self.lat_name, self.lon_name, 'leadtime', 'region',
                                   'time_centered'}, set(handler.variables.keys()))
        var_handler.coordinates = Utils.convert_to_ascii_if_possible(' '.join(coords))
        if 'time_centered' in handler.variables:
            if hasattr(handler.variables['time_centered'], 'standard_name'):
                del handler.variables['time_centered'].standard_name
        if not self.cmor_var:
            handler.close()
            return

        self._fix_variable_name(var_handler)
        handler.modeling_realm = self.cmor_var.domain.name
        table = self.cmor_var.get_table(self.frequency, self.data_convention)
        handler.table_id = 'Table {0} ({1})'.format(table.name, table.date)
        if self.cmor_var.units:
            self._fix_units(var_handler)
        handler.sync()
        self._fix_coordinate_variables_metadata(handler)
        var_type = var_handler.dtype
        handler.close()
        self._fix_values_metadata(var_type)

    def _fix_variable_name(self, var_handler):
        var_handler.standard_name = self.cmor_var.standard_name
        var_handler.long_name = self.cmor_var.long_name

    def _fix_values_metadata(self, var_type, file_path=None):
        if file_path is None:
            file_path = self.local_file
        valid_min = ''
        valid_max = ''
        if self.cmor_var is not None:
            if self.cmor_var.valid_min:
                valid_min = '-a valid_min,{0},o,{1},"{2}" '.format(self.final_name, var_type.char,
                                                                   self.cmor_var.valid_min)
            if self.cmor_var.valid_max:
                valid_max = '-a valid_max,{0},o,{1},"{2}" '.format(self.final_name, var_type.char,
                                                                   self.cmor_var.valid_max)

        Utils.nco.ncatted(input=file_path, output=file_path,
                          options=('-O -a _FillValue,{0},o,{1},"1.e20" '
                                   '-a missingValue,{0},o,{1},"1.e20" {2}{3}'.format(self.final_name, var_type.char,
                                                                                     valid_min, valid_max),))

    def _fix_coordinate_variables_metadata(self, handler):
        if 'lev' in handler.variables:
            handler.variables['lev'].short_name = 'lev'
            if self.domain == ModelingRealms.ocean:
                handler.variables['lev'].standard_name = 'depth'
        if self.lon_name in handler.variables:
            handler.variables[self.lon_name].short_name = self.lon_name
            handler.variables[self.lon_name].standard_name = 'longitude'
        if self.lat_name in handler.variables:
            handler.variables[self.lat_name].short_name = self.lat_name
            handler.variables[self.lat_name].standard_name = 'latitude'

    def _fix_units(self, var_handler):
        if 'units' not in var_handler.ncattrs():
            return
        if var_handler.units == '-':
            var_handler.units = '1.0'
        if var_handler.units == 'PSU':
            var_handler.units = 'psu'
        if var_handler.units == 'C' and self.cmor_var.units == 'K':
            var_handler.units = 'deg_C'
        if self.cmor_var.units != var_handler.units:
            self._convert_units(var_handler)
        var_handler.units = self.cmor_var.units

    def _convert_units(self, var_handler):
        try:
            Utils.convert_units(var_handler, self.cmor_var.units)
        except ValueError as ex:
            Log.warning('Can not convert {3} from {0} to {1}: {2}', var_handler.units, self.cmor_var.units, ex,
                        self.cmor_var.short_name)
            factor, offset = UnitConversion.get_conversion_factor_offset(var_handler.units, self.cmor_var.units)
            var_handler[:] = var_handler[:] * factor + offset
            if 'valid_min' in var_handler.ncattrs():
                var_handler.valid_min = float(var_handler.valid_min) * factor + offset
            if 'valid_max' in var_handler.ncattrs():
                var_handler.valid_max = float(var_handler.valid_max) * factor + offset
    def _prepare_region(self):
        if not self.region:
            return
        if not os.path.exists(self.remote_file):
            self._add_region_dimension_to_var()
        else:
            self._update_var_with_region_data()
        self._correct_metadata()
        Utils.nco.ncks(input=self.local_file, output=self.local_file, options=['--fix_rec_dmn region'])
        handler = Utils.open_cdf(self.local_file)
        regions = handler.variables['region'][...].tolist()
        if len(regions) > 1:
            # Sort the region dimension alphabetically and reorder every
            # variable that uses it accordingly
            ordered_regions = sorted(regions)
            new_indexes = [regions.index(region) for region in ordered_regions]
            for var in handler.variables.values():
                if 'region' not in var.dimensions:
                    continue
                index_region = var.dimensions.index('region')
                var_values = var[...]
                var_ordered = np.take(var_values, new_indexes, index_region)
                var[...] = var_ordered
        handler.close()

    def _update_var_with_region_data(self):
        temp = TempFile.get()
        shutil.copyfile(self.remote_file, temp)
        handler = Utils.open_cdf(temp)
        var_handler = handler.variables[self.final_name]
        var_type = var_handler.dtype
        handler.close()
        self._fix_values_metadata(var_type, temp)

        Utils.nco.ncks(input=temp, output=temp, options=['--mk_rec_dmn region'])
        cubes = iris.load(self.local_file)
        for cube in cubes:
            if self.final_name == cube.var_name:
                value = cube
                break
        for index_region, region in enumerate(value.coord('region').points):
            handler = Utils.open_cdf(temp)
            region_slice = value.data[index_region, ...]
            original_regions = handler.variables['region'][...]
            var = handler.variables[self.final_name]
            if region in original_regions:
                # Overwrite the slice corresponding to this region
                region_index = np.where(original_regions == region)[0][0]
                indices = list()
                for dim in var.dimensions:
                    if dim == 'region':
                        indices.append(region_index)
                    else:
                        indices.append(slice(None))
                var[indices] = region_slice
            else:
                # Append the new region at the end of the record dimension
                var[original_regions.shape[0], ...] = region_slice
                handler.variables['region'][original_regions.shape[0]] = region
            handler.close()
        Utils.move_file(temp, self.local_file)

    def _add_region_dimension_to_var(self):
        handler = Utils.open_cdf(self.local_file)
        if 'region' in handler.variables:
            handler.close()
            return
        handler.createDimension('region')
        var_region = handler.createVariable('region', str, 'region')
        var_region[0] = self.region
        original_var = handler.variables[self.final_name]
        new_var = handler.createVariable('new_var', original_var.datatype,
                                         original_var.dimensions + ('region',))
        new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()})
        value = original_var[:]
        new_var[..., 0] = value
        handler.close()
        Utils.nco.ncks(input=self.local_file, output=self.local_file,
                       options=('-x -v {0}'.format(self.final_name),))
        Utils.rename_variable(self.local_file, 'new_var', self.final_name)

    def _rename_coordinate_variables(self):
        variables = dict()
        variables['x'] = 'i'
        variables['y'] = 'j'
        variables['nav_lat_grid_V'] = self.lat_name
        variables['nav_lon_grid_V'] = self.lon_name
        variables['nav_lat_grid_U'] = self.lat_name
        variables['nav_lon_grid_U'] = self.lon_name
        variables['nav_lat_grid_T'] = self.lat_name
        variables['nav_lon_grid_T'] = self.lon_name
        Utils.rename_variables(self.local_file, variables, False)

    def add_diagnostic_history(self):
        """Add the history line corresponding to the diagnostic to the local file"""
        if not self.diagnostic:
            return
        from earthdiagnostics.earthdiags import EarthDiags
        history_line = 'Diagnostic {1} calculated with EarthDiagnostics version {0}'.format(EarthDiags.version,
                                                                                            self.diagnostic)
        self._add_history_line(history_line)

    def add_cmorization_history(self):
        """Add the history line corresponding to the cmorization to the local file"""
        from earthdiagnostics.earthdiags import EarthDiags
        history_line = 'CMORized with Earthdiagnostics version {0}'.format(EarthDiags.version)
        self._add_history_line(history_line)

    def _add_history_line(self, history_line):
        utc_datetime = 'UTC ' + datetime.utcnow().isoformat()
        history_line = '{0}: {1};'.format(utc_datetime, history_line)

        handler = Utils.open_cdf(self.local_file)
        try:
            history_line = history_line + handler.history
        except AttributeError:
            pass
        handler.history = Utils.convert_to_ascii_if_possible(history_line)
        handler.close()
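
# The status setters above publish every change through Publisher.dispatch, so
# other components can react when a download or upload finishes. The helper
# below is an illustrative sketch only and is not called by the production
# workflow: it assumes the Publisher base class exposes
# ``subscribe(who, callback)``, which is suggested by the ``dispatch`` and
# ``suscribers`` usage in this module but is not defined here.
def _example_track_status(datafile):
    """Attach a callback that prints every status change of ``datafile``."""
    def _on_change(changed_file):
        print('{0}: local={1} storage={2}'.format(changed_file,
                                                  changed_file.local_status,
                                                  changed_file.storage_status))
    # Hypothetical subscriber key: any hashable object identifying the listener
    datafile.subscribe('status-logger', _on_change)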
class UnitConversion(object):
    """
    Class to manage unit conversions

    Parameters
    ----------
    source: str
    destiny: str
    factor: float
    offset: float
    """

    _dict_conversions = None

    def __init__(self, source, destiny, factor, offset):
        self.source = source
        self.destiny = destiny
        self.factor = float(factor)
        self.offset = float(offset)

    @classmethod
    def load_conversions(cls):
        """Load conversions from the configuration file"""
        cls._dict_conversions = dict()
        with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'conversions.csv'), 'r') as csvfile:
            reader = csv.reader(csvfile, dialect='excel')
            for line in reader:
                if line[0] == 'original':
                    continue
                cls.add_conversion(UnitConversion(line[0], line[1], line[2], line[3]))

    @classmethod
    def add_conversion(cls, conversion):
        """
        Add a conversion to the dictionary

        :param conversion: conversion to add
        :type conversion: UnitConversion
        """
        cls._dict_conversions[(conversion.source, conversion.destiny)] = conversion

    @classmethod
    def get_conversion_factor_offset(cls, input_units, output_units):
        """
        Get the conversion factor and offset for two units

        The conversion has to be done in the following way:
        converted = original * factor + offset

        :param input_units: original units
        :type input_units: str
        :param output_units: destiny units
        :type output_units: str
        :return: factor and offset
        :rtype: [float, float]
        """
        units = input_units.split()
        if len(units) == 1:
            scale_unit = 1
            unit = units[0]
        else:
            if '^' in units[0]:
                values = units[0].split('^')
                scale_unit = pow(int(values[0]), int(values[1]))
            else:
                scale_unit = float(units[0])
            unit = units[1]

        units = output_units.split()
        if len(units) == 1:
            scale_new_unit = 1
            new_unit = units[0]
        else:
            if '^' in units[0]:
                values = units[0].split('^')
                scale_new_unit = pow(int(values[0]), int(values[1]))
            else:
                scale_new_unit = float(units[0])
            new_unit = units[1]

        factor, offset = UnitConversion._get_factor(new_unit, unit)
        if factor is None:
            return None, None
        factor = factor * scale_unit / float(scale_new_unit)
        offset /= float(scale_new_unit)
        return factor, offset

    @classmethod
    def _get_factor(cls, new_unit, unit):
        # Add only the conversions with a factor greater than 1
        if unit == new_unit:
            return 1, 0
        elif (unit, new_unit) in cls._dict_conversions:
            conversion = cls._dict_conversions[(unit, new_unit)]
            return conversion.factor, conversion.offset
        elif (new_unit, unit) in cls._dict_conversions:
            # Invert y = x * factor + offset  =>  x = y / factor - offset / factor
            conversion = cls._dict_conversions[(new_unit, unit)]
            return 1 / conversion.factor, -conversion.offset / conversion.factor
        else:
            return None, None
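
# Illustrative sketch of the conversion contract documented above
# (``converted = original * factor + offset``). It registers a conversion by
# hand instead of reading conversions.csv, so the values here are examples,
# not entries from the real table; it is not called by the production workflow.
def _example_unit_conversion():
    """Demonstrate get_conversion_factor_offset with a hand-made table."""
    UnitConversion._dict_conversions = dict()
    UnitConversion.add_conversion(UnitConversion('degC', 'K', 1, 273.15))
    print(UnitConversion.get_conversion_factor_offset('degC', 'K'))    # (1.0, 273.15)
    print(UnitConversion.get_conversion_factor_offset('K', 'degC'))    # (1.0, -273.15)
    print(UnitConversion.get_conversion_factor_offset('10^3 m', 'm'))  # (1000.0, 0.0)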
class NetCDFFile(DataFile):
    """Implementation of DataFile for netCDF files"""

    def download(self):
        """Get data from remote storage to the local one"""
        try:
            self.local_status = LocalStatus.DOWNLOADING
            Log.debug('Downloading file {0}...', self.remote_file)
            if not self.local_file:
                self.local_file = TempFile.get()
            Utils.get_file_hash(self.remote_file, use_stored=True, save=True)
            try:
                Utils.copy_file(self.remote_file, self.local_file, retrials=1)
            except Utils.CopyException:
                # The stored hash may be stale: recompute it and retry
                Utils.get_file_hash(self.remote_file, use_stored=False, save=True)
                Utils.copy_file(self.remote_file, self.local_file, retrials=2)

            if self.data_convention == 'meteofrance':
                Log.debug('Converting variable names from meteofrance convention')
                alt_coord_names = {'time_counter': 'time', 'time_counter_bounds': 'time_bnds',
                                   'tbnds': 'bnds', 'nav_lat': 'lat', 'nav_lon': 'lon',
                                   'x': 'i', 'y': 'j'}
                Utils.rename_variables(self.local_file, alt_coord_names, must_exist=False)
            Log.info('File {0} ready!', self.remote_file)
            self.local_status = LocalStatus.READY

        except Exception as ex:
            if self.local_file and os.path.isfile(self.local_file):
                os.remove(self.local_file)
            Log.error('File {0} not available: {1}', self.remote_file, ex)
            self.local_status = LocalStatus.FAILED

    def check_is_in_storage(self):
        """Check if the file, with all its regions, is already in the remote storage"""
        if os.path.isfile(self.remote_file):
            if self.region:
                try:
                    cubes = iris.load(self.remote_file)
                    for cube in cubes:
                        try:
                            if isinstance(self.region, six.string_types):
                                regions = {self.region}
                            else:
                                regions = {basin.name for basin in self.region}
                            if regions.issubset(set(cube.coord('region').points)):
                                self.storage_status = StorageStatus.READY
                        except iris.exceptions.CoordinateNotFoundError:
                            pass
                except iris.exceptions.TranslationError:
                    # If the check goes wrong, we must execute everything
                    os.remove(self.remote_file)
                except Exception as ex:
                    Log.debug('Exception when checking file {0}: {1}', self.remote_file, ex)
            else:
                self.storage_status = StorageStatus.READY

    def create_link(self):
        """Create a link from the original data in the _ folder"""
        try:
            self.data_manager.create_link(self.domain, self.remote_file, self.frequency, self.final_name,
                                          self.grid, True, self.var_type)
        except Exception as ex:
            Log.error('Can not create link to {1}: {0}'.format(ex, self.remote_file))

    def _get_size(self):
        try:
            if self.local_status == LocalStatus.READY:
                self._size = os.path.getsize(self.local_file)
            if self.storage_status == StorageStatus.READY:
                self._size = os.path.getsize(self.remote_file)
        except OSError:
            self._size = None
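
if __name__ == '__main__':
    # Hedged demo entry point: runs the illustrative sketches above without
    # touching real storage. The path and the 'specs' convention are
    # placeholders, not values used by the production workflow.
    _example_unit_conversion()
    demo_file = NetCDFFile.from_storage('/path/to/remote/file.nc', 'specs')
    _example_track_status(demo_file)
    demo_file.local_status = LocalStatus.DOWNLOADING  # dispatches to the logger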