# coding: utf-8
"""Data file management: local/remote status tracking, upload/download and CMOR metadata fixes."""
import csv
import os
import shutil
from datetime import datetime

import iris
import numpy as np
from bscearth.utils.log import Log

from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.utils import Utils, TempFile
from publisher import Publisher
from variable_type import VariableType


class LocalStatus(object):
    """Local file status enumeration"""

    PENDING = 0
    DOWNLOADING = 1
    READY = 2
    FAILED = 3
    NOT_REQUESTED = 4
    COMPUTING = 5


class StorageStatus(object):
    """Remote file status enumeration"""

    PENDING = 0
    UPLOADING = 1
    READY = 2
    FAILED = 3
    NO_STORE = 4


class DataFile(Publisher):
    """
    Represent a data file

    Must be derived for each concrete data file format
    """

    def __init__(self):
        super(DataFile, self).__init__()
        self.remote_file = None
        self.local_file = None
        self.domain = None
        self.var = None
        self.cmor_var = None
        self.region = None
        self.frequency = None
        self.data_convention = None
        self.diagnostic = None
        self.grid = None
        self.data_manager = None
        self.final_name = None
        self.var_type = VariableType.MEAN
        self._local_status = LocalStatus.NOT_REQUESTED
        self._storage_status = StorageStatus.READY
        self.job_added = False
        self._modifiers = []
        self._size = None
        self.lon_name = None
        self.lat_name = None

    def __str__(self):
        return 'Data file for {0}'.format(self.remote_file)

    @property
    def size(self):
        """File size"""
        if self._size is None:
            self._get_size()
        return self._size

    def _get_size(self):
        # Cache the local file size; leave it as None if the file is missing.
        try:
            if self.local_status == LocalStatus.READY:
                self._size = os.path.getsize(self.local_file)
        except OSError:
            self._size = None

    def clean_local(self):
        """Remove the scratch copy when nobody needs it anymore."""
        # NOTE(review): `self.suscribers` is presumably inherited from Publisher,
        # while only_suscriber() uses `self._subscribers` — confirm both names exist.
        if self.local_status != LocalStatus.READY or len(self.suscribers) > 0 or self.upload_required() or \
                self.storage_status == StorageStatus.UPLOADING:
            return
        Log.debug('File {0} no longer needed. Deleting from scratch...'.format(self.remote_file))
        os.remove(self.local_file)
        Log.debug('File {0} deleted from scratch'.format(self.remote_file))
        self.local_file = None
        self.local_status = LocalStatus.PENDING

    def only_suscriber(self, who):
        """Return True if `who` is the one and only subscriber of this file."""
        if len(self._subscribers) != 1:
            # Fixed: was an implicit `return None`; False keeps truthiness identical.
            return False
        return who in self._subscribers

    def upload_required(self):
        """Return True if the file is ready locally but still pending in storage."""
        return self.local_status == LocalStatus.READY and self.storage_status == StorageStatus.PENDING

    def download_required(self):
        """Return True if the file must be fetched from storage."""
        if not self.local_status == LocalStatus.PENDING:
            return False
        if self.storage_status == StorageStatus.READY:
            return True
        if self.has_modifiers():
            return True
        # Fixed: was an implicit `return None`; False keeps truthiness identical.
        return False

    def add_modifier(self, diagnostic):
        """Register a diagnostic that will modify this file before use."""
        self._modifiers.append(diagnostic)

    def has_modifiers(self):
        """Return a truthy value when pending modifier diagnostics exist."""
        return self._modifiers

    def ready_to_run(self, diagnostic):
        """Check if `diagnostic` can run: file is local and it is the next modifier (if any)."""
        if not self.local_status == LocalStatus.READY:
            return False
        if len(self._modifiers) == 0:
            return True
        return self._modifiers[0] is diagnostic

    @property
    def local_status(self):
        """Local status of the file (see LocalStatus)."""
        return self._local_status

    @local_status.setter
    def local_status(self, value):
        if self._local_status == value:
            return
        self._local_status = value
        # Size may have changed with the status; recompute lazily.
        self._size = None
        self.dispatch(self)

    @property
    def storage_status(self):
        """Storage status of the file (see StorageStatus)."""
        return self._storage_status

    @storage_status.setter
    def storage_status(self, value):
        if self._storage_status == value:
            return
        self._storage_status = value
        self._size = None
        self.dispatch(self)

    @classmethod
    def from_storage(cls, filepath, data_convention):
        """Create a file object referencing data already present in storage."""
        file_object = cls()
        file_object.remote_file = filepath
        file_object.local_status = LocalStatus.PENDING
        file_object.data_convention = data_convention
        return file_object

    @classmethod
    def to_storage(cls, remote_file, data_convention):
        """Create a file object for data that will be computed and uploaded."""
        new_object = cls()
        new_object.remote_file = remote_file
        new_object.storage_status = StorageStatus.PENDING
        new_object.data_convention = data_convention
        return new_object

    def download(self):
        """Get data from storage. Must be implemented by subclasses."""
        raise NotImplementedError('Class must implement the download method')

    def prepare_to_upload(self, rename_var):
        """Prepare a local file for upload: rename vars, fix metadata and regions."""
        # NOTE(review): both branches assign identical names; the non-primavera
        # branch was probably meant to use 'lat'/'lon' — confirm intent before changing.
        if self.data_convention in ('primavera', 'cmip6'):
            self.lon_name = 'longitude'
            self.lat_name = 'latitude'
        else:
            self.lon_name = 'longitude'
            self.lat_name = 'latitude'
        Utils.convert2netcdf4(self.local_file)
        if rename_var:
            original_name = rename_var
        else:
            original_name = self.var
        if self.final_name != original_name:
            Utils.rename_variable(self.local_file, original_name, self.final_name)
        self._rename_coordinate_variables()
        self._correct_metadata()
        self._prepare_region()
        self.add_diagnostic_history()
        if self.region is not None:
            self.upload()

    def upload(self):
        """Copy the local file to storage and create the convention link."""
        self.storage_status = StorageStatus.UPLOADING
        try:
            Utils.copy_file(self.local_file, self.remote_file, save_hash=True)
        except Exception as ex:
            # `Exception` already covers OSError (original caught a redundant tuple).
            Log.error('File {0} can not be uploaded: {1}', self.remote_file, ex)
            self.storage_status = StorageStatus.FAILED
            return
        Log.info('File {0} uploaded!', self.remote_file)
        self.create_link()
        self.storage_status = StorageStatus.READY

    def set_local_file(self, local_file, diagnostic=None, rename_var='', region=None):
        """Attach a freshly computed local file and mark it ready."""
        if diagnostic in self._modifiers:
            self._modifiers.remove(diagnostic)
        if region is not None:
            self.region = region.name
        else:
            self.region = None
        self.local_file = local_file
        self.prepare_to_upload(rename_var)
        self.local_status = LocalStatus.READY

    def create_link(self):
        """Create a link to the file. No-op by default; subclasses may override."""
        pass

    def _correct_metadata(self):
        # Set coordinates attribute and CMOR metadata on the main variable.
        handler = Utils.open_cdf(self.local_file)
        var_handler = handler.variables[self.final_name]
        coords = set.intersection({'time', 'lev', self.lat_name, self.lon_name},
                                  set(handler.variables.keys()))
        var_handler.coordinates = ' '.join(coords)
        if not self.cmor_var:
            handler.close()
            return
        self._fix_variable_name(var_handler)
        handler.modeling_realm = self.cmor_var.domain.name
        table = self.cmor_var.get_table(self.frequency, self.data_convention)
        handler.table_id = 'Table {0} ({1})'.format(table.name, table.date)
        if self.cmor_var.units:
            self._fix_units(var_handler)
        handler.sync()
        self._fix_coordinate_variables_metadata(handler)
        var_type = var_handler.dtype
        handler.close()
        self._fix_values_metadata(var_type)

    def _fix_variable_name(self, var_handler):
        # Apply the CMOR standard and long names to the variable.
        var_handler.standard_name = self.cmor_var.standard_name
        var_handler.long_name = self.cmor_var.long_name

    def _fix_values_metadata(self, var_type, file_path=None):
        # Set _FillValue/missingValue (and valid range if known) via ncatted.
        if file_path is None:
            file_path = self.local_file
        valid_min = ''
        valid_max = ''
        if self.cmor_var is not None:
            if self.cmor_var.valid_min:
                valid_min = '-a valid_min,{0},o,{1},"{2}" '.format(self.final_name, var_type.char,
                                                                   self.cmor_var.valid_min)
            if self.cmor_var.valid_max:
                valid_max = '-a valid_max,{0},o,{1},"{2}" '.format(self.final_name, var_type.char,
                                                                   self.cmor_var.valid_max)
        Utils.nco.ncatted(input=file_path, output=file_path,
                          options=('-O -a _FillValue,{0},o,{1},"1.e20" '
                                   '-a missingValue,{0},o,{1},"1.e20" {2}{3}'.format(self.final_name, var_type.char,
                                                                                     valid_min, valid_max),))

    def _fix_coordinate_variables_metadata(self, handler):
        # Normalize short/standard names for the lev, lon and lat coordinates.
        if 'lev' in handler.variables:
            handler.variables['lev'].short_name = 'lev'
            if self.domain == ModelingRealms.ocean:
                handler.variables['lev'].standard_name = 'depth'
        if self.lon_name in handler.variables:
            handler.variables[self.lon_name].short_name = self.lon_name
            handler.variables[self.lon_name].standard_name = 'longitude'
        if self.lat_name in handler.variables:
            handler.variables[self.lat_name].short_name = self.lat_name
            handler.variables[self.lat_name].standard_name = 'latitude'

    def _fix_units(self, var_handler):
        # Normalize legacy unit spellings, then convert data if units still differ.
        if 'units' not in var_handler.ncattrs():
            return
        if var_handler.units == '-':
            var_handler.units = '1.0'
        if var_handler.units == 'PSU':
            var_handler.units = 'psu'
        if var_handler.units == 'C' and self.cmor_var.units == 'K':
            var_handler.units = 'deg_C'
        if self.cmor_var.units != var_handler.units:
            self._convert_units(var_handler)
        var_handler.units = self.cmor_var.units

    def _convert_units(self, var_handler):
        # Try the generic converter first; fall back to the CSV conversion table.
        try:
            Utils.convert_units(var_handler, self.cmor_var.units)
        except ValueError as ex:
            Log.warning('Can not convert {3} from {0} to {1}: {2}',
                        var_handler.units, self.cmor_var.units, ex, self.cmor_var.short_name)
            factor, offset = UnitConversion.get_conversion_factor_offset(var_handler.units,
                                                                         self.cmor_var.units)
            var_handler[:] = var_handler[:] * factor + offset
            if 'valid_min' in var_handler.ncattrs():
                var_handler.valid_min = float(var_handler.valid_min) * factor + offset
            if 'valid_max' in var_handler.ncattrs():
                var_handler.valid_max = float(var_handler.valid_max) * factor + offset

    def _prepare_region(self):
        # Merge/sort the region dimension so regional data is stored consistently.
        if not self.region:
            return
        if not os.path.exists(self.remote_file):
            self._add_region_dimension_to_var()
        else:
            self._update_var_with_region_data()
        self._correct_metadata()
        Utils.nco.ncks(input=self.local_file, output=self.local_file, options=['--fix_rec_dmn region'])
        handler = Utils.open_cdf(self.local_file)
        regions = handler.variables['region'][...].tolist()
        if len(regions) > 1:
            ordered_regions = sorted(regions)
            # Fixed: leftover debugging print() calls replaced with Log.debug.
            Log.debug('Regions: {0} sorted: {1}', regions, ordered_regions)
            new_indexes = [regions.index(region) for region in ordered_regions]
            Log.debug('Region reorder indexes: {0}', new_indexes)
            for var in handler.variables.values():
                if 'region' not in var.dimensions:
                    continue
                index_region = var.dimensions.index('region')
                var_values = var[...]
                var_ordered = np.take(var_values, new_indexes, index_region)
                var[...] = var_ordered
        handler.close()

    def _update_var_with_region_data(self):
        # Insert this file's regional slice into the copy of the stored file.
        temp = TempFile.get()
        shutil.copyfile(self.remote_file, temp)
        handler = Utils.open_cdf(temp)
        var_handler = handler.variables[self.final_name]
        var_type = var_handler.dtype
        handler.close()
        self._fix_values_metadata(var_type, temp)
        Utils.nco.ncks(input=temp, output=temp, options=['--mk_rec_dmn region'])
        handler = Utils.open_cdf(temp)
        var_handler = handler.variables[self.final_name]
        if hasattr(var_handler, 'valid_min'):
            del var_handler.valid_min
        if hasattr(var_handler, 'valid_max'):
            del var_handler.valid_max
        handler.sync()
        cubes = iris.load(self.local_file)
        # NOTE(review): `value` stays unbound (NameError below) if no cube matches
        # final_name — presumably guaranteed by the caller; confirm.
        for cube in cubes:
            if self.final_name == cube.var_name:
                value = cube.data
                break
        if isinstance(value, np.ma.MaskedArray):
            value = np.ma.getdata(value)
        var_region = handler.variables['region']
        basin_index = np.where(var_region[:] == self.region)
        if len(basin_index[0]) == 0:
            # Region not present yet: grow the unlimited dimension by one.
            var_region[var_region.shape[0]] = self.region
            basin_index = var_region.shape[0] - 1
        else:
            basin_index = basin_index[0][0]
        handler.variables[self.final_name][..., basin_index] = np.multiply(np.ones(value.shape), value)
        handler.close()
        Utils.move_file(temp, self.local_file)

    def _add_region_dimension_to_var(self):
        # Append a 'region' dimension to the variable and store this region at index 0.
        handler = Utils.open_cdf(self.local_file)
        handler.createDimension('region')
        var_region = handler.createVariable('region', str, 'region')
        var_region[0] = self.region
        original_var = handler.variables[self.final_name]
        new_var = handler.createVariable('new_var', original_var.datatype,
                                         original_var.dimensions + ('region',))
        new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()})
        value = original_var[:]
        new_var[..., 0] = value
        handler.close()
        # Drop the old variable, then rename the region-aware copy in its place.
        Utils.nco.ncks(input=self.local_file, output=self.local_file,
                       options=('-x -v {0}'.format(self.final_name),))
        Utils.rename_variable(self.local_file, 'new_var', self.final_name)

    def _rename_coordinate_variables(self):
        # Map NEMO-style grid coordinate names to the convention's lat/lon names.
        variables = dict()
        variables['x'] = 'i'
        variables['y'] = 'j'
        variables['nav_lat_grid_V'] = self.lat_name
        variables['nav_lon_grid_V'] = self.lon_name
        variables['nav_lat_grid_U'] = self.lat_name
        variables['nav_lon_grid_U'] = self.lon_name
        variables['nav_lat_grid_T'] = self.lat_name
        variables['nav_lon_grid_T'] = self.lon_name
        Utils.rename_variables(self.local_file, variables, False, True)

    def add_diagnostic_history(self):
        """Record the diagnostic that produced this file in the netCDF history."""
        if not self.diagnostic:
            return
        from earthdiagnostics.earthdiags import EarthDiags
        history_line = 'Diagnostic {1} calculated with EarthDiagnostics version {0}'.format(EarthDiags.version,
                                                                                            self.diagnostic)
        self._add_history_line(history_line)

    def add_cmorization_history(self):
        """Record the CMORization step in the netCDF history."""
        from earthdiagnostics.earthdiags import EarthDiags
        history_line = 'CMORized with Earthdiagnostics version {0}'.format(EarthDiags.version)
        self._add_history_line(history_line)

    def _add_history_line(self, history_line):
        # Prepend a timestamped entry to the file's history attribute.
        utc_datetime = 'UTC ' + datetime.utcnow().isoformat()
        history_line = '{0}: {1};'.format(utc_datetime, history_line)
        handler = Utils.open_cdf(self.local_file)
        try:
            history_line = history_line + handler.history
        except AttributeError:
            # No previous history attribute: start a fresh one.
            history_line = history_line
        handler.history = Utils.convert_to_ascii_if_possible(history_line)
        handler.close()


class UnitConversion(object):
    """Class to manage unit conversions"""

    _dict_conversions = None

    @classmethod
    def load_conversions(cls):
        """Load conversions from the configuration file"""
        cls._dict_conversions = dict()
        # NOTE(review): 'rb' is the Python 2 csv idiom; on Python 3 this should be
        # text mode ('r', newline='') — confirm the target interpreter.
        with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'conversions.csv'), 'rb') as csvfile:
            reader = csv.reader(csvfile, dialect='excel')
            for line in reader:
                if line[0] == 'original':
                    # Skip the header row.
                    continue
                cls.add_conversion(UnitConversion(line[0], line[1], line[2], line[3]))

    @classmethod
    def add_conversion(cls, conversion):
        """
        Add a conversion to the dictionary

        Parameters
        ----------
        conversion: UnitConversion
        """
        cls._dict_conversions[(conversion.source, conversion.destiny)] = conversion

    def __init__(self, source, destiny, factor, offset):
        self.source = source
        self.destiny = destiny
        self.factor = float(factor)
        self.offset = float(offset)

    @classmethod
    def get_conversion_factor_offset(cls, input_units, output_units):
        """
        Get the conversion factor and offset for two units.

        The conversion has to be done in the following way:
        converted = original * factor + offset

        Parameters
        ----------
        input_units: str
        output_units: str

        Returns
        -------
        float
            Factor
        float
            Offset
        """
        units = input_units.split()
        if len(units) == 1:
            scale_unit = 1
            unit = units[0]
        else:
            if '^' in units[0]:
                values = units[0].split('^')
                scale_unit = pow(int(values[0]), int(values[1]))
            else:
                scale_unit = float(units[0])
            unit = units[1]

        units = output_units.split()
        if len(units) == 1:
            scale_new_unit = 1
            new_unit = units[0]
        else:
            if '^' in units[0]:
                values = units[0].split('^')
                scale_new_unit = pow(int(values[0]), int(values[1]))
            else:
                scale_new_unit = float(units[0])
            new_unit = units[1]

        factor, offset = UnitConversion._get_factor(new_unit, unit)
        if factor is None:
            return None, None
        factor = factor * scale_unit / float(scale_new_unit)
        offset /= float(scale_new_unit)
        return factor, offset

    @classmethod
    def _get_factor(cls, new_unit, unit):
        # Add only the conversions with a factor greater than 1
        if unit == new_unit:
            return 1, 0
        if (unit, new_unit) in cls._dict_conversions:
            conversion = cls._dict_conversions[(unit, new_unit)]
            return conversion.factor, conversion.offset
        if (new_unit, unit) in cls._dict_conversions:
            # Reverse conversion: invert the factor and negate the offset.
            conversion = cls._dict_conversions[(new_unit, unit)]
            return 1 / conversion.factor, -conversion.offset
        return None, None


class NetCDFFile(DataFile):
    """Implementation of DataFile for netCDF files."""

    def download(self):
        """Copy the file from storage to a local temporary file."""
        try:
            self.local_status = LocalStatus.DOWNLOADING
            Log.debug('Downloading file {0}...', self.remote_file)
            if not self.local_file:
                self.local_file = TempFile.get()
            Utils.get_file_hash(self.remote_file, use_stored=True, save=True)
            Utils.copy_file(self.remote_file, self.local_file)
            if self.data_convention == 'meteofrance':
                Log.debug('Converting variable names from meteofrance convention')
                alt_coord_names = {'time_counter': 'time', 'time_counter_bounds': 'time_bnds',
                                   'tbnds': 'bnds', 'nav_lat': 'lat', 'nav_lon': 'lon',
                                   'x': 'i', 'y': 'j'}
                Utils.rename_variables(self.local_file, alt_coord_names, must_exist=False,
                                       rename_dimension=True)
            Log.info('File {0} ready!', self.remote_file)
            self.local_status = LocalStatus.READY
        except Exception as ex:
            # Clean up the partial download before flagging the failure.
            if os.path.isfile(self.local_file):
                os.remove(self.local_file)
            Log.error('File {0} not available: {1}', self.remote_file, ex)
            self.local_status = LocalStatus.FAILED

    def create_link(self):
        """Ask the data manager to create the convention link; log on failure."""
        try:
            self.data_manager.create_link(self.domain, self.remote_file, self.frequency,
                                          self.final_name, self.grid, True, self.var_type)
        except Exception as ex:
            # `Exception` already covers ValueError (original caught a redundant tuple).
            Log.error('Can not create link to {1}: {0}'.format(ex, self.remote_file))

    def _get_size(self):
        # Prefer the remote size when the file is ready in storage.
        try:
            if self.local_status == LocalStatus.READY:
                self._size = os.path.getsize(self.local_file)
            if self.storage_status == StorageStatus.READY:
                self._size = os.path.getsize(self.remote_file)
        except OSError:
            self._size = None