# coding: utf-8 import csv import shutil import threading from datetime import datetime import numpy as np import os import re from bscearth.utils.log import Log from earthdiagnostics.utils import Utils, TempFile from earthdiagnostics.variable import VariableManager from earthdiagnostics.variable_type import VariableType from earthdiagnostics.modelingrealm import ModelingRealms from earthdiagnostics.datafile import NetCDFFile as NCfile, StorageStatus, LocalStatus class DataManager(object): """ Class to manage the data repositories :param config: :type config: Config """ def __init__(self, config): self.config = config self.experiment = config.experiment self._checked_vars = list() self.variable_list = VariableManager() self.variable_list.load_variables(self.config.data_convention) UnitConversion.load_conversions() self.lock = threading.Lock() self.requested_files = {} def _get_file_from_storage(self, filepath): if filepath not in self.requested_files: self.requested_files[filepath] = NCfile.from_storage(filepath) file_object = self.requested_files[filepath] file_object.local_satatus = LocalStatus.PENDING return self.requested_files[filepath] def _declare_generated_file(self, remote_file, domain, final_var, cmor_var, data_convention, region, diagnostic, grid, var_type, original_var): if remote_file not in self.requested_files: self.requested_files[remote_file] = NCfile.to_storage(remote_file) file_object = self.requested_files[remote_file] file_object.diagnostic = diagnostic file_object.var_type = var_type file_object.grid = grid file_object.data_manager = self file_object.domain = domain file_object.var = original_var file_object.final_name = final_var file_object.cmor_var = cmor_var file_object.region = region file_object.data_convention = data_convention file_object.storage_status = StorageStatus.PENDING return file_object @staticmethod def _get_final_var_name(box, var): if box: var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str() return var def get_varfolder(self, domain, var, grid=None): if grid: var = '{0}-{1}'.format(var, grid) if domain in [ModelingRealms.ocean, ModelingRealms.seaIce]: return '{0}_f{1}h'.format(var, self.experiment.ocean_timestep) else: return '{0}_f{1}h'.format(var, self.experiment.atmos_timestep) def create_link(self, domain, filepath, frequency, var, grid, move_old, vartype): freq_str = frequency.folder_name(vartype) if not grid: grid = 'original' variable_folder = self.get_varfolder(domain, var) vargrid_folder = self.get_varfolder(domain, var, grid) self.lock.acquire() try: if grid == 'original': link_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, variable_folder) if os.path.islink(link_path): link_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, vargrid_folder) Utils.create_folder_tree(link_path) else: link_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, vargrid_folder) Utils.create_folder_tree(link_path) default_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, variable_folder) original_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, vargrid_folder.replace('-{0}_f'.format(grid), '-original_f')) if os.path.islink(default_path): os.remove(default_path) elif os.path.isdir(default_path): shutil.move(default_path, original_path) os.symlink(link_path, default_path) if move_old and link_path not in self._checked_vars: self._checked_vars.append(link_path) old_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, 'old_{0}_f{1}h'.format(var, self.experiment.atmos_timestep)) regex = re.compile(var + '_[0-9]{6,8}\.nc') for filename in os.listdir(link_path): if regex.match(filename): Utils.create_folder_tree(old_path) Utils.move_file(os.path.join(link_path, filename), os.path.join(old_path, filename)) link_path = os.path.join(link_path, os.path.basename(filepath)) if os.path.lexists(link_path): os.remove(link_path) if not os.path.exists(filepath): raise ValueError('Original file {0} does not exists'.format(filepath)) if not os.path.isdir(os.path.dirname(link_path)): Utils.create_folder_tree(os.path.dirname(link_path)) relative_path = os.path.relpath(filepath, os.path.dirname(link_path)) os.symlink(relative_path, link_path) except: raise finally: self.lock.release() # Overridable methods (not mandatory) def link_file(self, domain, var, cmor_var, startdate, member, chunk=None, grid=None, frequency=None, year=None, date_str=None, move_old=False, vartype=VariableType.MEAN): """ Creates the link of a given file from the CMOR repository. :param cmor_var: :param move_old: :param date_str: :param year: if frequency is yearly, this parameter is used to give the corresponding year :type year: int :param domain: CMOR domain :type domain: Domain :param var: variable name :type var: str :param startdate: file's startdate :type startdate: str :param member: file's member :type member: int :param chunk: file's chunk :type chunk: int :param grid: file's grid (only needed if it is not the original) :type grid: str :param frequency: file's frequency (only needed if it is different from the default) :type frequency: str :param vartype: Variable type (mean, statistic) :type vartype: VariableType :return: path to the copy created on the scratch folder :rtype: str """ pass def prepare(self): """ Prepares the data to be used by the diagnostic. :return: """ pass class NetCDFFile(object): """ Class to manage netCDF file and pr """ def __init__(self): self.remote_file = None self.local_file = None self.domain = None self.var = None self.cmor_var = None self.region = None self.frequency = None self.data_convention = None @staticmethod def from_storage(filepath): file_object = NetCDFFile() file_object.remote_file = filepath return file_object @staticmethod def to_storage(domain, var, cmor_var, data_convention, region): new_object = NetCDFFile() new_object.domain = domain new_object.var = var new_object.cmor_var = cmor_var new_object.region = region new_object.frequency = None new_object.data_convention = data_convention return new_object def download(self): try: if not self.local_file: self.local_file = TempFile.get() Utils.copy_file(self.remote_file, self.local_file) Log.info('File {0} ready!', self.remote_file) except Exception as ex: os.remove(self.local_file) Log.error('File {0} not available: {1}', self.remote_file, ex) def send(self): Utils.convert2netcdf4(self.local_file) self._correct_metadata() self._prepare_region() self._rename_coordinate_variables() Utils.move_file(self.local_file, self.remote_file) def _prepare_region(self): if not self.region: return if not os.path.exists(self.remote_file): self._add_region_dimension_to_var() else: self._update_var_with_region_data() self._correct_metadata() Utils.nco.ncks(input=self.local_file, output=self.local_file, options='-O --fix_rec_dmn region') def _update_var_with_region_data(self): temp = TempFile.get() shutil.copyfile(self.remote_file, temp) Utils.nco.ncks(input=temp, output=temp, options='-O --mk_rec_dmn region') handler = Utils.openCdf(temp) handler_send = Utils.openCdf(self.local_file) value = handler_send.variables[self.var][:] var_region = handler.variables['region'] basin_index = np.where(var_region[:] == self.region) if len(basin_index[0]) == 0: var_region[var_region.shape[0]] = self.region basin_index = var_region.shape[0] - 1 else: basin_index = basin_index[0][0] handler.variables[self.var][..., basin_index] = value handler.close() handler_send.close() Utils.move_file(temp, self.local_file) def _add_region_dimension_to_var(self): handler = Utils.openCdf(self.local_file) handler.createDimension('region') var_region = handler.createVariable('region', str, 'region') var_region[0] = self.region original_var = handler.variables[self.var] new_var = handler.createVariable('new_var', original_var.datatype, original_var.dimensions + ('region',)) new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()}) value = original_var[:] new_var[..., 0] = value handler.close() Utils.nco.ncks(input=self.local_file, output=self.local_file, options='-O -x -v {0}'.format(self.var)) Utils.rename_variable(self.local_file, 'new_var', self.var) def _correct_metadata(self): if not self.cmor_var: return handler = Utils.openCdf(self.local_file) var_handler = handler.variables[self.var] self._fix_variable_name(var_handler) handler.modeling_realm = self.cmor_var.domain.name table = self.cmor_var.get_table(self.frequency, self.data_convention) handler.table_id = 'Table {0} ({1})'.format(table.name, table.date) if self.cmor_var.units: self._fix_units(var_handler) handler.sync() self._fix_coordinate_variables_metadata(handler) var_type = var_handler.dtype handler.close() self._fix_values_metadata(var_type) def _fix_variable_name(self, var_handler): var_handler.standard_name = self.cmor_var.standard_name var_handler.long_name = self.cmor_var.long_name var_handler.short_name = self.cmor_var.short_name def _fix_values_metadata(self, var_type): if self.cmor_var.valid_min != '': valid_min = '-a valid_min,{0},o,{1},"{2}" '.format(self.var, var_type.char, self.cmor_var.valid_min) else: valid_min = '' if self.cmor_var.valid_max != '': valid_max = '-a valid_max,{0},o,{1},"{2}" '.format(self.var, var_type.char, self.cmor_var.valid_max) else: valid_max = '' Utils.nco.ncatted(input=self.local_file, output=self.local_file, options='-O -a _FillValue,{0},o,{1},"1.e20" ' '-a missingValue,{0},o,{1},"1.e20" {2}{3}'.format(self.var, var_type.char, valid_min, valid_max)) def _fix_coordinate_variables_metadata(self, handler): if 'lev' in handler.variables: handler.variables['lev'].short_name = 'lev' if self.domain == ModelingRealms.ocean: handler.variables['lev'].standard_name = 'depth' if 'lon' in handler.variables: handler.variables['lon'].short_name = 'lon' handler.variables['lon'].standard_name = 'longitude' if 'lat' in handler.variables: handler.variables['lat'].short_name = 'lat' handler.variables['lat'].standard_name = 'latitude' EQUIVALENT_UNITS = {'-': '1.0', 'fractional': '1.0', 'psu': 'psu'} def _fix_units(self, var_handler): if 'units' not in var_handler.ncattrs(): return if var_handler.units.lower() in NetCDFFile.EQUIVALENT_UNITS: var_handler.units = NetCDFFile.EQUIVALENT_UNITS[var_handler.units.lower()] elif var_handler.units == 'C' or self.cmor_var.units == 'K': var_handler.units = 'deg_C' if self.cmor_var.units != var_handler.units: self._convert_units(var_handler) var_handler.units = self.cmor_var.units def _convert_units(self, var_handler): try: Utils.convert_units(var_handler, self.cmor_var.units) except ValueError as ex: Log.warning('Can not convert {3} from {0} to {1}: {2}', var_handler.units, self.cmor_var.units, ex, self.cmor_var.short_name) factor, offset = UnitConversion.get_conversion_factor_offset(var_handler.units, self.cmor_var.units) var_handler[:] = var_handler[:] * factor + offset if 'valid_min' in var_handler.ncattrs(): var_handler.valid_min = float(var_handler.valid_min) * factor + offset if 'valid_max' in var_handler.ncattrs(): var_handler.valid_max = float(var_handler.valid_max) * factor + offset def _rename_coordinate_variables(self): variables = dict() variables['x'] = 'i' variables['y'] = 'j' variables['nav_lat_grid_V'] = 'lat' variables['nav_lon_grid_V'] = 'lon' variables['nav_lat_grid_U'] = 'lat' variables['nav_lon_grid_U'] = 'lon' variables['nav_lat_grid_T'] = 'lat' variables['nav_lon_grid_T'] = 'lon' Utils.rename_variables(self.local_file, variables, False, True) def add_diagnostic_history(self, diagnostic): from earthdiagnostics.earthdiags import EarthDiags history_line = 'Diagnostic {1} calculated with EarthDiagnostics version {0}'.format(EarthDiags.version, diagnostic) self._add_history_line(history_line) def add_cmorization_history(self): from earthdiagnostics.earthdiags import EarthDiags history_line = 'CMORized with Earthdiagnostics version {0}'.format(EarthDiags.version) self._add_history_line(history_line) def _add_history_line(self, history_line): utc_datetime = 'UTC ' + datetime.utcnow().isoformat() history_line = '{0}: {1};'.format(utc_datetime, history_line) handler = Utils.openCdf(self.local_file) try: history_line = history_line + handler.history except AttributeError: history_line = history_line handler.history = Utils.convert_to_ASCII_if_possible(history_line) handler.close() class UnitConversion(object): """ Class to manage unit conversions """ _dict_conversions = None @classmethod def load_conversions(cls): """ Load conversions from the configuration file """ cls._dict_conversions = dict() with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'conversions.csv'), 'rb') as csvfile: reader = csv.reader(csvfile, dialect='excel') for line in reader: if line[0] == 'original': continue cls.add_conversion(UnitConversion(line[0], line[1], line[2], line[3])) @classmethod def add_conversion(cls, conversion): """ Adds a conversion to the dictionary :param conversion: conversion to add :type conversion: UnitConversion """ cls._dict_conversions[(conversion.source, conversion.destiny)] = conversion def __init__(self, source, destiny, factor, offset): self.source = source self.destiny = destiny self.factor = float(factor) self.offset = float(offset) @classmethod def get_conversion_factor_offset(cls, input_units, output_units): """ Gets the conversion factor and offset for two units . The conversion has to be done in the following way: converted = original * factor + offset :param input_units: original units :type input_units: str :param output_units: destiny units :type output_units: str :return: factor and offset :rtype: [float, float] """ units = input_units.split() if len(units) == 1: scale_unit = 1 unit = units[0] else: if '^' in units[0]: values = units[0].split('^') scale_unit = pow(int(values[0]), int(values[1])) else: scale_unit = float(units[0]) unit = units[1] units = output_units.split() if len(units) == 1: scale_new_unit = 1 new_unit = units[0] else: if '^' in units[0]: values = units[0].split('^') scale_new_unit = pow(int(values[0]), int(values[1])) else: scale_new_unit = float(units[0]) new_unit = units[1] factor, offset = UnitConversion._get_factor(new_unit, unit) if factor is None: return None, None factor = factor * scale_unit / float(scale_new_unit) offset /= float(scale_new_unit) return factor, offset @classmethod def _get_factor(cls, new_unit, unit): # Add only the conversions with a factor greater than 1 if unit == new_unit: return 1, 0 elif (unit, new_unit) in cls._dict_conversions: conversion = cls._dict_conversions[(unit, new_unit)] return conversion.factor, conversion.offset elif (new_unit, unit) in cls._dict_conversions: conversion = cls._dict_conversions[(new_unit, unit)] return 1 / conversion.factor, -conversion.offset else: return None, None