# coding: utf-8
import csv
import os
import shutil
from datetime import datetime

import numpy as np
from bscearth.utils.log import Log

from earthdiagnostics.utils import Utils, TempFile
from publisher import Publisher
from earthdiagnostics.modelingrealm import ModelingRealms
from variable_type import VariableType


class LocalStatus(object):
    """Possible states of the local copy of a data file."""

    PENDING = 0
    DOWNLOADING = 1
    READY = 2
    FAILED = 3
    NOT_REQUESTED = 4
    COMPUTING = 5


class StorageStatus(object):
    """Possible states of the copy of a data file kept in the storage."""

    PENDING = 0
    UPLOADING = 1
    READY = 2
    FAILED = 3
    NO_STORE = 4


class DataFile(Publisher):
    """
    File that can be retrieved from and sent to a remote storage

    Every change of the local or storage status is dispatched to the
    subscribers through the Publisher base class.
    """

    def __init__(self):
        super(DataFile, self).__init__()
        self.remote_file = None
        self.local_file = None
        self.domain = None
        self.var = None
        self.cmor_var = None
        self.region = None
        self.frequency = None
        self.data_convention = None
        self.diagnostic = None
        self.grid = None
        self.data_manager = None
        self.final_name = None
        self.var_type = VariableType.MEAN
        self._local_status = LocalStatus.NOT_REQUESTED
        self._storage_status = StorageStatus.READY
        self.job_added = False
        # Diagnostics registered with add_modifier, in registration order
        self._modifiers = []

    def __str__(self):
        return 'Data file for {0}'.format(self.remote_file)

    def unsubscribe(self, who):
        """
        Remove a subscriber and delete the local file when nobody else needs it

        :param who: subscriber to remove
        """
        super(DataFile, self).unsubscribe(who)
        if self.local_status == LocalStatus.READY and len(self.subscribers) == 0:
            os.remove(self.local_file)

    def upload_required(self):
        """
        Check if the local file is ready and still pending to be stored

        :rtype: bool
        """
        return self.local_status == LocalStatus.READY and self.storage_status == StorageStatus.PENDING

    def download_required(self):
        """
        Check if the file has to be downloaded

        A download is only needed while the local status is PENDING and either
        the file is ready in the storage or some diagnostic is going to
        modify it.

        :rtype: bool
        """
        if self.local_status != LocalStatus.PENDING:
            return False
        if self.storage_status == StorageStatus.READY:
            return True
        # Always answer with a real boolean instead of falling through to None
        return self.has_modifiers()

    def add_modifier(self, diagnostic):
        """
        Register a diagnostic that is going to modify this file

        :param diagnostic: diagnostic to register
        """
        self._modifiers.append(diagnostic)

    def has_modifiers(self):
        """
        Check if there is any registered modifier left

        :rtype: bool
        """
        return len(self._modifiers) > 0

    def ready_to_run(self, diagnostic):
        """
        Check if the given diagnostic can already use this file

        The local file must be ready and the diagnostic must be either
        unaffected by modifiers (there are none) or the first registered one.

        :param diagnostic: diagnostic that wants to use the file
        :rtype: bool
        """
        if self.local_status != LocalStatus.READY:
            return False
        if len(self._modifiers) == 0:
            return True
        return self._modifiers[0] is diagnostic

    @property
    def local_status(self):
        """Status of the local copy (a LocalStatus value)"""
        return self._local_status

    @local_status.setter
    def local_status(self, value):
        # Only real changes are dispatched to the subscribers
        if self._local_status == value:
            return
        self._local_status = value
        self.dispatch(self)

    @property
    def storage_status(self):
        """Status of the stored copy (a StorageStatus value)"""
        return self._storage_status

    @storage_status.setter
    def storage_status(self, value):
        # Only real changes are dispatched to the subscribers
        if self._storage_status == value:
            return
        self._storage_status = value
        self.dispatch(self)

    @classmethod
    def from_storage(cls, filepath):
        """
        Create a file object for a file that has to be retrieved from the storage

        :param filepath: remote path of the file
        :return: new file object with local status PENDING
        """
        file_object = cls()
        file_object.remote_file = filepath
        file_object.local_status = LocalStatus.PENDING
        return file_object

    @classmethod
    def to_storage(cls, remote_file):
        """
        Create a file object for a file that has to be sent to the storage

        :param remote_file: remote path where the file will be stored
        :return: new file object with storage status PENDING
        """
        new_object = cls()
        new_object.remote_file = remote_file
        new_object.storage_status = StorageStatus.PENDING
        return new_object

    def download(self):
        """
        Retrieve the file from the storage. Must be implemented by the subclasses

        :raises NotImplementedError: always, in the base class
        """
        raise NotImplementedError()

    def prepare_to_upload(self, rename_var):
        """
        Prepare the local file to be uploaded

        Converts the file to netCDF4, renames the variable to its final name
        if needed, fixes the metadata, handles the region dimension, renames
        the coordinate variables and adds the diagnostic history line.

        :param rename_var: current name of the variable in the file. If empty,
            self.var is used
        :type rename_var: str
        """
        Utils.convert2netcdf4(self.local_file)
        original_name = rename_var if rename_var else self.var
        if self.final_name != original_name:
            Utils.rename_variable(self.local_file, original_name, self.final_name)
        self._correct_metadata()
        self._prepare_region()
        self._rename_coordinate_variables()
        self.add_diagnostic_history()

    def upload(self):
        """
        Copy the local file to the storage and create its link

        A failure while copying marks the storage status as FAILED. A failure
        while creating the link is only logged as a warning.
        """
        self.storage_status = StorageStatus.UPLOADING
        try:
            Utils.copy_file(self.local_file, self.remote_file)
        except Exception as ex:
            Log.error('File {0} can not be uploaded: {1}', self.remote_file, ex)
            self.storage_status = StorageStatus.FAILED
            return

        Log.info('File {0} uploaded!', self.remote_file)
        try:
            self.create_link()
        except Exception as ex:
            # Best effort: the file itself is already stored
            Log.warning('Link for file {0} can not be created: {1}', self.remote_file, ex)
        self.storage_status = StorageStatus.READY

    def set_local_file(self, local_file, diagnostic=None, rename_var=''):
        """
        Set the local file and mark it as ready

        :param local_file: path to the new local file
        :param diagnostic: diagnostic that produced the file. If it was
            registered as a modifier, it is removed from the list
        :param rename_var: current name of the variable in the file, if it
            differs from self.var
        :type rename_var: str
        """
        if diagnostic in self._modifiers:
            self._modifiers.remove(diagnostic)
        self.local_file = local_file
        self.prepare_to_upload(rename_var)
        self.local_status = LocalStatus.READY

    def create_link(self):
        """Create the link for the stored file. Does nothing in the base class"""
        pass

    def _correct_metadata(self):
        """Fix names, realm, table, units and value attributes using the CMOR variable definition"""
        if not self.cmor_var:
            return
        handler = Utils.openCdf(self.local_file)
        try:
            var_handler = handler.variables[self.final_name]
            self._fix_variable_name(var_handler)
            handler.modeling_realm = self.cmor_var.domain.name
            table = self.cmor_var.get_table(self.frequency, self.data_convention)
            handler.table_id = 'Table {0} ({1})'.format(table.name, table.date)
            if self.cmor_var.units:
                self._fix_units(var_handler)
            handler.sync()
            self._fix_coordinate_variables_metadata(handler)
            var_type = var_handler.dtype
        finally:
            # Do not leave the file open if any of the fixes fails
            handler.close()
        # ncatted rewrites the file, so it must run once the handler is closed
        self._fix_values_metadata(var_type)

    def _fix_variable_name(self, var_handler):
        """Set the standard and long names defined by the CMOR variable"""
        var_handler.standard_name = self.cmor_var.standard_name
        var_handler.long_name = self.cmor_var.long_name
        # var_handler.short_name = self.cmor_var.short_name

    def _fix_values_metadata(self, var_type):
        """
        Set the fill value, missing value and valid range attributes with ncatted

        :param var_type: type of the variable. Its 'char' attribute is used as
            the attribute type passed to ncatted
        """
        if self.cmor_var.valid_min != '':
            valid_min = '-a valid_min,{0},o,{1},"{2}" '.format(self.final_name, var_type.char,
                                                              self.cmor_var.valid_min)
        else:
            valid_min = ''

        if self.cmor_var.valid_max != '':
            valid_max = '-a valid_max,{0},o,{1},"{2}" '.format(self.final_name, var_type.char,
                                                              self.cmor_var.valid_max)
        else:
            valid_max = ''

        Utils.nco.ncatted(input=self.local_file, output=self.local_file,
                          options='-O -a _FillValue,{0},o,{1},"1.e20" '
                                  '-a missingValue,{0},o,{1},"1.e20" {2}{3}'.format(self.final_name, var_type.char,
                                                                                    valid_min, valid_max))

    def _fix_coordinate_variables_metadata(self, handler):
        """
        Set short and standard names for the lev, lon and lat variables, if present

        :param handler: open handler of the file to fix
        """
        if 'lev' in handler.variables:
            handler.variables['lev'].short_name = 'lev'
            if self.domain == ModelingRealms.ocean:
                handler.variables['lev'].standard_name = 'depth'
        if 'lon' in handler.variables:
            handler.variables['lon'].short_name = 'lon'
            handler.variables['lon'].standard_name = 'longitude'
        if 'lat' in handler.variables:
            handler.variables['lat'].short_name = 'lat'
            handler.variables['lat'].standard_name = 'latitude'

    def _fix_units(self, var_handler):
        """
        Normalize the units attribute and convert the data to the CMOR units if they differ

        :param var_handler: handler of the variable to fix
        """
        if 'units' not in var_handler.ncattrs():
            return
        # Normalize some known spellings before comparing with the CMOR units
        if var_handler.units == '-':
            var_handler.units = '1.0'
        if var_handler.units == 'PSU':
            var_handler.units = 'psu'
        if var_handler.units == 'C' and self.cmor_var.units == 'K':
            var_handler.units = 'deg_C'
        if self.cmor_var.units != var_handler.units:
            self._convert_units(var_handler)
            var_handler.units = self.cmor_var.units

    def _convert_units(self, var_handler):
        """
        Convert the variable to the CMOR units

        Utils.convert_units is tried first. If it raises ValueError, the
        conversions known by UnitConversion are used instead, scaling also the
        valid_min and valid_max attributes when they are present.

        :param var_handler: handler of the variable to convert
        """
        try:
            Utils.convert_units(var_handler, self.cmor_var.units)
        except ValueError as ex:
            Log.warning('Can not convert {3} from {0} to {1}: {2}', var_handler.units, self.cmor_var.units, ex,
                        self.cmor_var.short_name)
            factor, offset = UnitConversion.get_conversion_factor_offset(var_handler.units, self.cmor_var.units)
            # NOTE(review): factor and offset are None when the conversion is unknown, so the line below
            # raises TypeError. That is kept on purpose: returning silently would let the caller relabel
            # unconverted data with the CMOR units.
            var_handler[:] = var_handler[:] * factor + offset
            if 'valid_min' in var_handler.ncattrs():
                var_handler.valid_min = float(var_handler.valid_min) * factor + offset
            if 'valid_max' in var_handler.ncattrs():
                var_handler.valid_max = float(var_handler.valid_max) * factor + offset

    def _prepare_region(self):
        """Add the region dimension to the variable or merge it in the already stored file"""
        if not self.region:
            return
        if not os.path.exists(self.remote_file):
            self._add_region_dimension_to_var()
        else:
            self._update_var_with_region_data()
        self._correct_metadata()
        Utils.nco.ncks(input=self.local_file, output=self.local_file, options='-O --fix_rec_dmn region')

    def _update_var_with_region_data(self):
        """Insert the local data in a copy of the stored file, at the index of self.region"""
        temp = TempFile.get()
        shutil.copyfile(self.remote_file, temp)
        Utils.nco.ncks(input=temp, output=temp, options='-O --mk_rec_dmn region')
        handler = Utils.openCdf(temp)
        handler_send = Utils.openCdf(self.local_file)
        value = handler_send.variables[self.final_name][:]
        var_region = handler.variables['region']
        basin_index = np.where(var_region[:] == self.region)
        if len(basin_index[0]) == 0:
            # Region not stored yet: append it at the end of the region variable
            var_region[var_region.shape[0]] = self.region
            basin_index = var_region.shape[0] - 1
        else:
            basin_index = basin_index[0][0]

        handler.variables[self.final_name][..., basin_index] = value
        handler.close()
        handler_send.close()
        Utils.move_file(temp, self.local_file)

    def _add_region_dimension_to_var(self):
        """Replace the variable with a copy that has an extra trailing region dimension"""
        handler = Utils.openCdf(self.local_file)
        handler.createDimension('region')
        var_region = handler.createVariable('region', str, 'region')
        var_region[0] = self.region
        original_var = handler.variables[self.final_name]
        new_var = handler.createVariable('new_var', original_var.datatype,
                                         original_var.dimensions + ('region',))
        new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()})
        value = original_var[:]
        new_var[..., 0] = value
        handler.close()

        # Drop the original variable and give its name to the new one
        Utils.nco.ncks(input=self.local_file, output=self.local_file,
                       options='-O -x -v {0}'.format(self.final_name))
        Utils.rename_variable(self.local_file, 'new_var', self.final_name)

    def _rename_coordinate_variables(self):
        """Rename the x, y and nav_lat/nav_lon grid variables to i, j, lat and lon"""
        variables = {
            'x': 'i',
            'y': 'j',
            'nav_lat_grid_V': 'lat',
            'nav_lon_grid_V': 'lon',
            'nav_lat_grid_U': 'lat',
            'nav_lon_grid_U': 'lon',
            'nav_lat_grid_T': 'lat',
            'nav_lon_grid_T': 'lon',
        }
        Utils.rename_variables(self.local_file, variables, False, True)

    def add_diagnostic_history(self):
        """Add to the history the diagnostic that calculated the file, if there is one"""
        if not self.diagnostic:
            return
        # Imported here instead of at module level, as in the original code
        from earthdiagnostics.earthdiags import EarthDiags
        history_line = 'Diagnostic {1} calculated with EarthDiagnostics version {0}'.format(EarthDiags.version,
                                                                                            self.diagnostic)
        self._add_history_line(history_line)

    def add_cmorization_history(self):
        """Add to the history the version used to cmorize the file"""
        # Imported here instead of at module level, as in the original code
        from earthdiagnostics.earthdiags import EarthDiags
        history_line = 'CMORized with Earthdiagnostics version {0}'.format(EarthDiags.version)
        self._add_history_line(history_line)

    def _add_history_line(self, history_line):
        """
        Prepend a line, prefixed with the current UTC time, to the global history attribute

        :param history_line: text to add
        :type history_line: str
        """
        utc_datetime = 'UTC ' + datetime.utcnow().isoformat()
        history_line = '{0}: {1};'.format(utc_datetime, history_line)

        handler = Utils.openCdf(self.local_file)
        try:
            try:
                history_line = history_line + handler.history
            except AttributeError:
                # No previous history: the new line becomes the whole attribute
                pass
            handler.history = Utils.convert_to_ASCII_if_possible(history_line)
        finally:
            # Do not leave the file open if the attribute can not be written
            handler.close()


class UnitConversion(object):
    """
    Class to manage unit conversions

    :param source: original units
    :type source: str
    :param destiny: destiny units
    :type destiny: str
    :param factor: factor to multiply the original values
    :param offset: offset to add after applying the factor
    """

    _dict_conversions = None

    def __init__(self, source, destiny, factor, offset):
        self.source = source
        self.destiny = destiny
        self.factor = float(factor)
        self.offset = float(offset)

    @classmethod
    def load_conversions(cls):
        """
        Load conversions from the configuration file
        """
        cls._dict_conversions = dict()
        # Binary mode is what the Python 2 csv module expects
        with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'conversions.csv'), 'rb') as csvfile:
            reader = csv.reader(csvfile, dialect='excel')
            for line in reader:
                # Skip the header row
                if line[0] == 'original':
                    continue
                cls.add_conversion(UnitConversion(line[0], line[1], line[2], line[3]))

    @classmethod
    def add_conversion(cls, conversion):
        """
        Adds a conversion to the dictionary

        :param conversion: conversion to add
        :type conversion: UnitConversion
        """
        cls._dict_conversions[(conversion.source, conversion.destiny)] = conversion

    @classmethod
    def get_conversion_factor_offset(cls, input_units, output_units):
        """
        Gets the conversion factor and offset for two units . The conversion has to be done in the following way:
        converted = original * factor + offset

        :param input_units: original units
        :type input_units: str
        :param output_units: destiny units
        :type output_units: str
        :return: factor and offset. Both are None if the conversion is not known
        :rtype: [float, float]
        """
        scale_unit, unit = cls._split_scale(input_units)
        scale_new_unit, new_unit = cls._split_scale(output_units)

        factor, offset = cls._get_factor(new_unit, unit)
        if factor is None:
            return None, None
        factor = factor * scale_unit / float(scale_new_unit)
        offset /= float(scale_new_unit)
        return factor, offset

    @staticmethod
    def _split_scale(units):
        """
        Split a units string in its scale and its unit name

        A single token is a unit with scale 1. Otherwise the first token is
        the scale, either a number or a 'base^exponent' power of integers,
        and the second one is the unit name.

        :param units: units string
        :type units: str
        :return: scale and unit name
        """
        tokens = units.split()
        if len(tokens) == 1:
            return 1, tokens[0]
        if '^' in tokens[0]:
            values = tokens[0].split('^')
            scale = pow(int(values[0]), int(values[1]))
        else:
            scale = float(tokens[0])
        return scale, tokens[1]

    @classmethod
    def _get_factor(cls, new_unit, unit):
        """
        Get the factor and offset to convert from unit to new_unit

        :param new_unit: destiny units
        :type new_unit: str
        :param unit: original units
        :type unit: str
        :return: factor and offset. Both are None if the conversion is not known
        """
        # Add only the conversions with a factor greater than 1
        if unit == new_unit:
            return 1, 0
        elif (unit, new_unit) in cls._dict_conversions:
            conversion = cls._dict_conversions[(unit, new_unit)]
            return conversion.factor, conversion.offset
        elif (new_unit, unit) in cls._dict_conversions:
            conversion = cls._dict_conversions[(new_unit, unit)]
            # Inverse of y = x * factor + offset is x = y / factor - offset / factor
            return 1 / conversion.factor, -conversion.offset / conversion.factor
        else:
            return None, None


class THREDDSData(DataFile):
    """Data file retrieved with nccopy from a THREDDS server"""

    def download(self):
        """
        Retrieve the remote file to a temporary file with nccopy

        The local status is set to READY on success and to FAILED, with the
        reason stored in self.message, if the result is not a valid netCDF file.
        """
        self.local_file = TempFile.get()
        # Alias with the attribute name used before; kept in case external code still reads it
        self.local_path = self.local_file
        Utils.execute_shell_command(['nccopy', '-s', '-d', '-4', self.remote_file, self.local_file])
        if not Utils.check_netcdf_file(self.local_file):
            self.message = 'Can not retrieve {0} from server'.format(self.remote_file)
            # A failed download affects the local copy, not the stored one
            self.local_status = LocalStatus.FAILED
            return

        self.local_status = LocalStatus.READY


class NetCDFFile(DataFile):
    """Data file stored as a netCDF file that is retrieved with a plain copy"""

    def download(self):
        """
        Copy the remote file to the local file, using a temporary file if none was set

        Any error is logged and marks the local status as FAILED, removing
        whatever was left of the local file.
        """
        try:
            self.local_status = LocalStatus.DOWNLOADING
            if not self.local_file:
                self.local_file = TempFile.get()
            Utils.copy_file(self.remote_file, self.local_file)
            Log.info('File {0} ready!', self.remote_file)
            self.local_status = LocalStatus.READY

        except Exception as ex:
            # local_file is still unset if the temporary file could not be obtained
            if self.local_file and os.path.isfile(self.local_file):
                os.remove(self.local_file)
            Log.error('File {0} not available: {1}', self.remote_file, ex)
            self.local_status = LocalStatus.FAILED

    def create_link(self):
        """Ask the data manager to create the link for the stored file. Errors are only logged"""
        try:
            self.data_manager._create_link(self.domain, self.remote_file, self.frequency, self.final_name,
                                           self.grid, False, self.var_type)
        except Exception as ex:
            Log.error('Can not create link to {1}: {0}'.format(ex, self.remote_file))