# coding: utf-8
"""
Data file abstractions for EarthDiagnostics.

Tracks the status of the local and storage copies of a file, fixes its CMOR
metadata before upload and converts units when needed.
"""
import csv
import os
import shutil
from datetime import datetime

import numpy as np
from bscearth.utils.log import Log

from earthdiagnostics.utils import Utils, TempFile
from publisher import Publisher
from earthdiagnostics.modelingrealm import ModelingRealms


class LocalStatus(object):
    """Status codes for the local (working) copy of a data file."""

    PENDING = 0
    DOWNLOADING = 1
    READY = 2
    FAILED = 3
    TO_COMPUTE = 4


class StorageStatus(object):
    """Status codes for the copy of a data file kept in storage."""

    PENDING = 0
    UPLOADING = 1
    READY = 2
    FAILED = 3


class DataFile(Publisher):
    """
    Base class for a data file that lives in storage and may have a local working copy.

    Every change of :attr:`local_status` is dispatched to subscribers through
    :class:`Publisher`. Subclasses must implement :meth:`download`.
    """

    def __init__(self):
        super(DataFile, self).__init__()
        self.remote_file = None
        self.local_file = None
        self.domain = None
        self.var = None
        self.cmor_var = None
        self.region = None
        self.frequency = None
        self.data_convention = None
        self._local_status = LocalStatus.PENDING
        self._storage_status = StorageStatus.READY

    @property
    def local_status(self):
        """Status of the local copy (one of the :class:`LocalStatus` values)."""
        return self._local_status

    @local_status.setter
    def local_status(self, value):
        self._local_status = value
        self.dispatch(self, value)

    @property
    def storage_status(self):
        """Status of the storage copy (one of the :class:`StorageStatus` values)."""
        return self._storage_status

    @storage_status.setter
    def storage_status(self, value):
        # Stored in the same attribute initialised in __init__; not dispatched to subscribers.
        self._storage_status = value

    @classmethod
    def from_storage(cls, filepath):
        """
        Create a data file that already exists in storage and still has to be retrieved.

        :param filepath: path of the file in storage
        :type filepath: str
        :return: new data file with local status PENDING
        :rtype: DataFile
        """
        file_object = cls()
        file_object.remote_file = filepath
        file_object.local_status = LocalStatus.PENDING
        return file_object

    @classmethod
    def to_storage(cls, domain, var, cmor_var, data_convention, region):
        """
        Create a data file that will be produced locally and then uploaded to storage.

        :param domain: modelling realm of the variable
        :param var: name of the variable inside the file
        :type var: str
        :param cmor_var: CMOR variable description used to fix metadata (may be None)
        :param data_convention: data convention used to select the CMOR table
        :param region: region name, or None when the data is not region-based
        :return: new data file with storage status PENDING
        :rtype: DataFile
        """
        new_object = cls()
        new_object.domain = domain
        new_object.var = var
        new_object.cmor_var = cmor_var
        new_object.region = region
        new_object.frequency = None
        new_object.data_convention = data_convention
        # Nothing is in storage yet: the file still has to be generated and uploaded.
        new_object.storage_status = StorageStatus.PENDING
        return new_object

    def download(self):
        """Retrieve the remote file into :attr:`local_file`. Must be implemented by subclasses."""
        raise NotImplementedError()

    def upload(self):
        """
        Finalise the local file and move it to :attr:`remote_file`.

        The file goes through ``Utils.convert2netcdf4``, CMOR metadata correction,
        region handling and coordinate renaming before being moved. If any step
        raises, the storage status is set to FAILED and the exception propagates.
        """
        self.storage_status = StorageStatus.UPLOADING
        try:
            Utils.convert2netcdf4(self.local_file)
            self._correct_metadata()
            self._prepare_region()
            self._rename_coordinate_variables()
            Utils.move_file(self.local_file, self.remote_file)
        except Exception:
            self.storage_status = StorageStatus.FAILED
            raise
        self.storage_status = StorageStatus.READY

    def _correct_metadata(self):
        """Apply the CMOR variable's names, realm, table id, units and value attributes to the local file."""
        if not self.cmor_var:
            return
        handler = Utils.openCdf(self.local_file)
        try:
            var_handler = handler.variables[self.var]
            self._fix_variable_name(var_handler)
            handler.modeling_realm = self.cmor_var.domain.name
            table = self.cmor_var.get_table(self.frequency, self.data_convention)
            handler.table_id = 'Table {0} ({1})'.format(table.name, table.date)
            if self.cmor_var.units:
                self._fix_units(var_handler)
            handler.sync()
            self._fix_coordinate_variables_metadata(handler)
            var_type = var_handler.dtype
        finally:
            handler.close()
        # ncatted edits the file on disk, so it runs only after the handler is closed.
        self._fix_values_metadata(var_type)

    def _fix_variable_name(self, var_handler):
        """Copy standard, long and short names from the CMOR variable."""
        var_handler.standard_name = self.cmor_var.standard_name
        var_handler.long_name = self.cmor_var.long_name
        var_handler.short_name = self.cmor_var.short_name

    def _fix_values_metadata(self, var_type):
        """
        Set fill/missing values and, when the CMOR variable defines them, valid_min/valid_max.

        :param var_type: numpy dtype of the variable; its type character is passed to ncatted
        """
        # Skip empty or undefined limits instead of writing a literal "None" attribute.
        if self.cmor_var.valid_min not in ('', None):
            valid_min = '-a valid_min,{0},o,{1},"{2}" '.format(self.var, var_type.char, self.cmor_var.valid_min)
        else:
            valid_min = ''
        if self.cmor_var.valid_max not in ('', None):
            valid_max = '-a valid_max,{0},o,{1},"{2}" '.format(self.var, var_type.char, self.cmor_var.valid_max)
        else:
            valid_max = ''
        # NOTE(review): CF conventions call this attribute "missing_value"; confirm "missingValue" is intended.
        Utils.nco.ncatted(input=self.local_file, output=self.local_file,
                          options='-O -a _FillValue,{0},o,{1},"1.e20" '
                                  '-a missingValue,{0},o,{1},"1.e20" {2}{3}'.format(self.var, var_type.char,
                                                                                    valid_min, valid_max))

    def _fix_coordinate_variables_metadata(self, handler):
        """Set short/standard names on the lev, lon and lat coordinate variables if present."""
        if 'lev' in handler.variables:
            handler.variables['lev'].short_name = 'lev'
            if self.domain == ModelingRealms.ocean:
                handler.variables['lev'].standard_name = 'depth'
        if 'lon' in handler.variables:
            handler.variables['lon'].short_name = 'lon'
            handler.variables['lon'].standard_name = 'longitude'
        if 'lat' in handler.variables:
            handler.variables['lat'].short_name = 'lat'
            handler.variables['lat'].standard_name = 'latitude'

    def _fix_units(self, var_handler):
        """Normalise known unit spellings and convert the data to the CMOR units if they differ."""
        if 'units' not in var_handler.ncattrs():
            return
        if var_handler.units == '-':
            var_handler.units = '1.0'
        if var_handler.units == 'PSU':
            var_handler.units = 'psu'
        if var_handler.units == 'C' and self.cmor_var.units == 'K':
            var_handler.units = 'deg_C'
        if self.cmor_var.units != var_handler.units:
            self._convert_units(var_handler)
            var_handler.units = self.cmor_var.units

    def _convert_units(self, var_handler):
        """
        Convert the variable data (and valid_min/valid_max) to the CMOR units.

        ``Utils.convert_units`` is tried first; if it raises ValueError the
        conversion table in :class:`UnitConversion` is used instead.

        :raises ValueError: if neither method knows how to convert the units
        """
        try:
            Utils.convert_units(var_handler, self.cmor_var.units)
        except ValueError as ex:
            Log.warning('Can not convert {3} from {0} to {1}: {2}', var_handler.units, self.cmor_var.units, ex,
                        self.cmor_var.short_name)
            factor, offset = UnitConversion.get_conversion_factor_offset(var_handler.units,
                                                                         self.cmor_var.units)
            if factor is None:
                # No fallback conversion either: propagate the original error rather than
                # failing later with an opaque TypeError on "data * None".
                raise
            var_handler[:] = var_handler[:] * factor + offset
            if 'valid_min' in var_handler.ncattrs():
                var_handler.valid_min = float(var_handler.valid_min) * factor + offset
            if 'valid_max' in var_handler.ncattrs():
                var_handler.valid_max = float(var_handler.valid_max) * factor + offset

    def _prepare_region(self):
        """Add a region dimension, or merge this region into the existing remote file."""
        if not self.region:
            return
        if not os.path.exists(self.remote_file):
            self._add_region_dimension_to_var()
        else:
            self._update_var_with_region_data()
        self._correct_metadata()
        Utils.nco.ncks(input=self.local_file, output=self.local_file, options='-O --fix_rec_dmn region')

    def _update_var_with_region_data(self):
        """
        Write the local data into a copy of the remote file at this file's region index.

        The region is appended when the remote file does not contain it yet. The
        merged copy then replaces :attr:`local_file`.
        """
        temp = TempFile.get()
        shutil.copyfile(self.remote_file, temp)
        Utils.nco.ncks(input=temp, output=temp, options='-O --mk_rec_dmn region')

        handler_send = Utils.openCdf(self.local_file)
        try:
            value = handler_send.variables[self.var][:]
        finally:
            handler_send.close()

        handler = Utils.openCdf(temp)
        try:
            var_region = handler.variables['region']
            basin_index = np.where(var_region[:] == self.region)
            if len(basin_index[0]) == 0:
                # Region not present yet: append it at the end of the record dimension.
                basin_index = var_region.shape[0]
                var_region[basin_index] = self.region
            else:
                basin_index = basin_index[0][0]
            handler.variables[self.var][..., basin_index] = value
        finally:
            handler.close()
        Utils.move_file(temp, self.local_file)

    def _add_region_dimension_to_var(self):
        """Rebuild the variable with an extra trailing 'region' dimension holding this region only."""
        handler = Utils.openCdf(self.local_file)
        try:
            handler.createDimension('region')
            var_region = handler.createVariable('region', str, 'region')
            var_region[0] = self.region
            original_var = handler.variables[self.var]
            new_var = handler.createVariable('new_var', original_var.datatype,
                                             original_var.dimensions + ('region',))
            new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()})
            new_var[..., 0] = original_var[:]
        finally:
            handler.close()
        # Drop the original variable and give the rebuilt one its name.
        Utils.nco.ncks(input=self.local_file, output=self.local_file, options='-O -x -v {0}'.format(self.var))
        Utils.rename_variable(self.local_file, 'new_var', self.var)

    def _rename_coordinate_variables(self):
        """Rename grid-specific coordinate names (x, y, nav_lat/lon_grid_*) to i, j, lat and lon."""
        variables = {
            'x': 'i',
            'y': 'j',
            'nav_lat_grid_V': 'lat',
            'nav_lon_grid_V': 'lon',
            'nav_lat_grid_U': 'lat',
            'nav_lon_grid_U': 'lon',
            'nav_lat_grid_T': 'lat',
            'nav_lon_grid_T': 'lon',
        }
        Utils.rename_variables(self.local_file, variables, False, True)

    def add_diagnostic_history(self, diagnostic):
        """Prepend a history entry recording the diagnostic that produced the file."""
        from earthdiagnostics.earthdiags import EarthDiags
        history_line = 'Diagnostic {1} calculated with EarthDiagnostics version {0}'.format(EarthDiags.version,
                                                                                            diagnostic)
        self._add_history_line(history_line)

    def add_cmorization_history(self):
        """Prepend a history entry recording that the file was CMORized."""
        from earthdiagnostics.earthdiags import EarthDiags
        history_line = 'CMORized with Earthdiagnostics version {0}'.format(EarthDiags.version)
        self._add_history_line(history_line)

    def _add_history_line(self, history_line):
        """Prepend a UTC-timestamped line to the global 'history' attribute of the local file."""
        utc_datetime = 'UTC ' + datetime.utcnow().isoformat()
        history_line = '{0}: {1};'.format(utc_datetime, history_line)
        handler = Utils.openCdf(self.local_file)
        try:
            try:
                history_line = history_line + handler.history
            except AttributeError:
                pass  # the file has no previous history
            handler.history = Utils.convert_to_ASCII_if_possible(history_line)
        finally:
            handler.close()


class UnitConversion(object):
    """
    Class to manage unit conversions

    Conversions follow ``converted = original * factor + offset``.
    """

    _dict_conversions = None

    @classmethod
    def load_conversions(cls):
        """
        Load conversions from the configuration file (conversions.csv next to this module)
        """
        cls._dict_conversions = dict()
        # NOTE: 'rb' is what the Python 2 csv module expects.
        with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'conversions.csv'), 'rb') as csvfile:
            reader = csv.reader(csvfile, dialect='excel')
            for line in reader:
                # Skip blank lines and the header row.
                if not line or line[0] == 'original':
                    continue
                cls.add_conversion(UnitConversion(line[0], line[1], line[2], line[3]))

    @classmethod
    def add_conversion(cls, conversion):
        """
        Adds a conversion to the dictionary

        :param conversion: conversion to add
        :type conversion: UnitConversion
        """
        cls._dict_conversions[(conversion.source, conversion.destiny)] = conversion

    def __init__(self, source, destiny, factor, offset):
        self.source = source
        self.destiny = destiny
        self.factor = float(factor)
        self.offset = float(offset)

    @staticmethod
    def _parse_scale(token):
        """Return the numeric value of a scale token ('1e3' or '10^3'), or None if it is not numeric."""
        try:
            if '^' in token:
                base, exponent = token.split('^')
                return pow(int(base), int(exponent))
            return float(token)
        except ValueError:
            return None

    @classmethod
    def _split_scale(cls, units):
        """
        Split a units string into a numeric scale and the remaining unit.

        ``'10^3 m'`` gives ``(1000, 'm')``. When the first token is not numeric,
        the whole string is the unit, so ``'kg m-2 s-1'`` gives ``(1, 'kg m-2 s-1')``.
        """
        tokens = units.split()
        if len(tokens) > 1:
            scale = cls._parse_scale(tokens[0])
            if scale is not None:
                return scale, ' '.join(tokens[1:])
        return 1, ' '.join(tokens)

    @classmethod
    def get_conversion_factor_offset(cls, input_units, output_units):
        """
        Gets the conversion factor and offset for two units.

        The conversion has to be done in the following way:
        converted = original * factor + offset

        :param input_units: original units
        :type input_units: str
        :param output_units: destiny units
        :type output_units: str
        :return: factor and offset, or (None, None) if no conversion is known
        :rtype: [float, float]
        """
        scale_unit, unit = cls._split_scale(input_units)
        scale_new_unit, new_unit = cls._split_scale(output_units)

        factor, offset = cls._get_factor(new_unit, unit)
        if factor is None:
            return None, None
        factor = factor * scale_unit / float(scale_new_unit)
        offset /= float(scale_new_unit)
        return factor, offset

    @classmethod
    def _get_factor(cls, new_unit, unit):
        """Return (factor, offset) converting ``unit`` into ``new_unit``, or (None, None) if unknown."""
        # Each pair only needs one entry in the table (preferably the one with a
        # factor greater than 1); the opposite direction is derived below.
        if unit == new_unit:
            return 1, 0
        if cls._dict_conversions is None:
            cls.load_conversions()
        if (unit, new_unit) in cls._dict_conversions:
            conversion = cls._dict_conversions[(unit, new_unit)]
            return conversion.factor, conversion.offset
        if (new_unit, unit) in cls._dict_conversions:
            conversion = cls._dict_conversions[(new_unit, unit)]
            # Inverse of new = old * f + o is old = new * (1 / f) - o / f.
            return 1 / conversion.factor, -conversion.offset / conversion.factor
        return None, None


class THREDDSData(DataFile):
    """Data file whose remote copy is fetched with ``nccopy``."""

    def download(self):
        """Copy the remote file into a temporary local file with nccopy and check it is valid NetCDF."""
        self.local_status = LocalStatus.DOWNLOADING
        self.local_file = TempFile.get()
        # Kept for callers that may still read the old attribute name.
        self.local_path = self.local_file
        # -s: shuffle, -d 4: deflate level 4 ('-d' takes the level as its own argument), -4: netCDF-4 output.
        Utils.execute_shell_command(['nccopy', '-s', '-d', '4', '-4', self.remote_file, self.local_file])
        if not Utils.check_netcdf_file(self.local_file):
            self.message = 'Can not retrieve {0} from server'.format(self.remote_file)
            self.local_status = LocalStatus.FAILED
            return
        self.local_status = LocalStatus.READY


class NetCDFFile(DataFile):
    """Data file whose remote copy is a file reachable through ``Utils.copy_file``."""

    def download(self):
        """Copy the remote file to :attr:`local_file` (a temp file if unset), updating local_status."""
        try:
            self.local_status = LocalStatus.DOWNLOADING
            if not self.local_file:
                self.local_file = TempFile.get()
            Utils.copy_file(self.remote_file, self.local_file)
            Log.info('File {0} ready!', self.remote_file)
            self.local_status = LocalStatus.READY
        except Exception as ex:
            # Remove any partial copy; it may not exist if the failure happened before copying,
            # and a failing os.remove must not hide the original error.
            if self.local_file and os.path.exists(self.local_file):
                os.remove(self.local_file)
            Log.error('File {0} not available: {1}', self.remote_file, ex)
            self.local_status = LocalStatus.FAILED