# coding: utf-8 import csv import shutil import threading from datetime import datetime import numpy as np import os import re from bscearth.utils.log import Log from earthdiagnostics.utils import Utils, TempFile from earthdiagnostics.variable import VariableManager from earthdiagnostics.variable_type import VariableType from earthdiagnostics.modelingrealm import ModelingRealms from earthdiagnostics.datafile import NetCDFFile as NCfile, StorageStatus, LocalStatus class DataManager(object): """ Class to manage the data repositories :param config: :type config: Config """ def __init__(self, config): self.config = config self.experiment = config.experiment self._checked_vars = list() self.variable_list = VariableManager() self.variable_list.load_variables(self.config.data_convention) UnitConversion.load_conversions() self.lock = threading.Lock() self.requested_files = {} def file_exists(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None, vartype=VariableType.MEAN): """ Checks if a given file exists :param domain: CMOR domain :type domain: Domain :param var: variable name :type var: str :param startdate: file's startdate :type startdate: str :param member: file's member :type member: int :param chunk: file's chunk :type chunk: int :param grid: file's grid (only needed if it is not the original) :type grid: str :param box: file's box (only needed to retrieve sections or averages) :type box: Box :param frequency: file's frequency (only needed if it is different from the default) :type frequency: Frequency :param vartype: Variable type (mean, statistic) :type vartype: VariableType :return: path to the copy created on the scratch folder :rtype: str """ raise NotImplementedError() def _get_file_from_storage(self, filepath): if filepath not in self.requested_files: self.requested_files[filepath] = NCfile.from_storage(filepath) file_object = self.requested_files[filepath] file_object.local_satatus = LocalStatus.PENDING return self.requested_files[filepath] def _declare_generated_file(self, remote_file, domain, var, cmor_var, data_convention, region, diagnostic, var_type): if remote_file not in self.requested_files: self.requested_files[remote_file] = NCfile.to_storage(remote_file, domain, var, cmor_var, data_convention, region) file_object = self.requested_files[remote_file] file_object.diagnostic = diagnostic file_object.var_type = var_type file_object.storage_status = StorageStatus.PENDING return file_object def get_file(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None, vartype=VariableType.MEAN): """ Copies a given file from the CMOR repository to the scratch folder and returns the path to the scratch's copy :param domain: CMOR domain :type domain: Domain :param var: variable name :type var: str :param startdate: file's startdate :type startdate: str :param member: file's member :type member: int :param chunk: file's chunk :type chunk: int :param grid: file's grid (only needed if it is not the original) :type grid: str :param box: file's box (only needed to retrieve sections or averages) :type box: Box :param frequency: file's frequency (only needed if it is different from the default) :type frequency: Frequency :param vartype: Variable type (mean, statistic) :type vartype: VariableType :return: path to the copy created on the scratch folder :rtype: str """ raise NotImplementedError() def send_file(self, filetosend, domain, var, startdate, member, chunk=None, grid=None, region=None, box=None, rename_var=None, frequency=None, year=None, date_str=None, move_old=False, diagnostic=None, cmorized=False, vartype=VariableType.MEAN): """ Copies a given file to the CMOR repository. It also automatically converts to netCDF 4 if needed and can merge with already existing ones as needed :param move_old: if true, moves files following older conventions that may be found on the links folder :type move_old: bool :param date_str: exact date_str to use in the cmorized file :type: str :param year: if frequency is yearly, this parameter is used to give the corresponding year :type year: int :param rename_var: if exists, the given variable will be renamed to the one given by var :type rename_var: str :param filetosend: path to the file to send to the CMOR repository :type filetosend: str :param region: specifies the region represented by the file. If it is defined, the data will be appended to the CMOR repository as a new region in the file or will overwrite if region was already present :type region: str :param domain: CMOR domain :type domain: Domain :param var: variable name :type var: str :param startdate: file's startdate :type startdate: str :param member: file's member :type member: int :param chunk: file's chunk :type chunk: int :param grid: file's grid (only needed if it is not the original) :type grid: str :param box: file's box (only needed to retrieve sections or averages) :type box: Box :param frequency: file's frequency (only needed if it is different from the default) :type frequency: Frequency :param diagnostic: diagnostic used to generate the file :type diagnostic: Diagnostic :param cmorized: flag to indicate if file was generated in cmorization process :type cmorized: bool :param vartype: Variable type (mean, statistic) :type vartype: VariableType """ raise NotImplementedError() def get_year(self, domain, var, startdate, member, year, grid=None, box=None): """ Ge a file containing all the data for one year for one variable :param domain: variable's domain :type domain: Domain :param var: variable's name :type var: str :param startdate: startdate to retrieve :type startdate: str :param member: member to retrieve :type member: int :param year: year to retrieve :type year: int :param grid: variable's grid :type grid: str :param box: variable's box :type box: Box :return: """ raise NotImplementedError() @staticmethod def _get_final_var_name(box, var): if box: var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str() return var def get_varfolder(self, domain, var, grid=None): if grid: var = '{0}-{1}'.format(var, grid) if domain in [ModelingRealms.ocean, ModelingRealms.seaIce]: return '{0}_f{1}h'.format(var, self.experiment.ocean_timestep) else: return '{0}_f{1}h'.format(var, self.experiment.atmos_timestep) def _create_link(self, domain, filepath, frequency, var, grid, move_old, vartype): freq_str = frequency.folder_name(vartype) if not grid: grid = 'original' variable_folder = self.get_varfolder(domain, var) vargrid_folder = self.get_varfolder(domain, var, grid) self.lock.acquire() try: if grid == 'original': link_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, variable_folder) if os.path.islink(link_path): link_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, vargrid_folder) Utils.create_folder_tree(link_path) else: link_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, vargrid_folder) Utils.create_folder_tree(link_path) default_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, variable_folder) original_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, vargrid_folder.replace('-{0}_f'.format(grid), '-original_f')) if os.path.islink(default_path): os.remove(default_path) elif os.path.isdir(default_path): shutil.move(default_path, original_path) os.symlink(link_path, default_path) if move_old and link_path not in self._checked_vars: self._checked_vars.append(link_path) old_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, 'old_{0}_f{1}h'.format(var, self.experiment.atmos_timestep)) regex = re.compile(var + '_[0-9]{6,8}\.nc') for filename in os.listdir(link_path): if regex.match(filename): Utils.create_folder_tree(old_path) Utils.move_file(os.path.join(link_path, filename), os.path.join(old_path, filename)) link_path = os.path.join(link_path, os.path.basename(filepath)) if os.path.lexists(link_path): os.remove(link_path) if not os.path.exists(filepath): raise ValueError('Original file {0} does not exists'.format(filepath)) if not os.path.isdir(os.path.dirname(link_path)): Utils.create_folder_tree(os.path.dirname(link_path)) os.symlink(filepath, link_path) except: raise finally: self.lock.release() # Overridable methods (not mandatory) def link_file(self, domain, var, cmor_var, startdate, member, chunk=None, grid=None, frequency=None, year=None, date_str=None, move_old=False, vartype=VariableType.MEAN): """ Creates the link of a given file from the CMOR repository. :param cmor_var: :param move_old: :param date_str: :param year: if frequency is yearly, this parameter is used to give the corresponding year :type year: int :param domain: CMOR domain :type domain: Domain :param var: variable name :type var: str :param startdate: file's startdate :type startdate: str :param member: file's member :type member: int :param chunk: file's chunk :type chunk: int :param grid: file's grid (only needed if it is not the original) :type grid: str :param frequency: file's frequency (only needed if it is different from the default) :type frequency: str :param vartype: Variable type (mean, statistic) :type vartype: VariableType :return: path to the copy created on the scratch folder :rtype: str """ pass def prepare(self): """ Prepares the data to be used by the diagnostic. :return: """ pass class NetCDFFile(object): """ Class to manage netCDF file and pr """ def __init__(self): self.remote_file = None self.local_file = None self.domain = None self.var = None self.cmor_var = None self.region = None self.frequency = None self.data_convention = None @staticmethod def from_storage(filepath): file_object = NetCDFFile() file_object.remote_file = filepath return file_object @staticmethod def to_storage(domain, var, cmor_var, data_convention, region): new_object = NetCDFFile() new_object.domain = domain new_object.var = var new_object.cmor_var = cmor_var new_object.region = region new_object.frequency = None new_object.data_convention = data_convention return new_object def download(self): try: if not self.local_file: self.local_file = TempFile.get() Utils.copy_file(self.remote_file, self.local_file) Log.info('File {0} ready!', self.remote_file) except Exception as ex: os.remove(self.local_file) Log.error('File {0} not available: {1}', self.remote_file, ex) def send(self): Utils.convert2netcdf4(self.local_file) self._correct_metadata() self._prepare_region() self._rename_coordinate_variables() Utils.move_file(self.local_file, self.remote_file) def _prepare_region(self): if not self.region: return if not os.path.exists(self.remote_file): self._add_region_dimension_to_var() else: self._update_var_with_region_data() self._correct_metadata() Utils.nco.ncks(input=self.local_file, output=self.local_file, options='-O --fix_rec_dmn region') def _update_var_with_region_data(self): temp = TempFile.get() shutil.copyfile(self.remote_file, temp) Utils.nco.ncks(input=temp, output=temp, options='-O --mk_rec_dmn region') handler = Utils.openCdf(temp) handler_send = Utils.openCdf(self.local_file) value = handler_send.variables[self.var][:] var_region = handler.variables['region'] basin_index = np.where(var_region[:] == self.region) if len(basin_index[0]) == 0: var_region[var_region.shape[0]] = self.region basin_index = var_region.shape[0] - 1 else: basin_index = basin_index[0][0] handler.variables[self.var][..., basin_index] = value handler.close() handler_send.close() Utils.move_file(temp, self.local_file) def _add_region_dimension_to_var(self): handler = Utils.openCdf(self.local_file) handler.createDimension('region') var_region = handler.createVariable('region', str, 'region') var_region[0] = self.region original_var = handler.variables[self.var] new_var = handler.createVariable('new_var', original_var.datatype, original_var.dimensions + ('region',)) new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()}) value = original_var[:] new_var[..., 0] = value handler.close() Utils.nco.ncks(input=self.local_file, output=self.local_file, options='-O -x -v {0}'.format(self.var)) Utils.rename_variable(self.local_file, 'new_var', self.var) def _correct_metadata(self): if not self.cmor_var: return handler = Utils.openCdf(self.local_file) var_handler = handler.variables[self.var] self._fix_variable_name(var_handler) handler.modeling_realm = self.cmor_var.domain.name table = self.cmor_var.get_table(self.frequency, self.data_convention) handler.table_id = 'Table {0} ({1})'.format(table.name, table.date) if self.cmor_var.units: self._fix_units(var_handler) handler.sync() self._fix_coordinate_variables_metadata(handler) var_type = var_handler.dtype handler.close() self._fix_values_metadata(var_type) def _fix_variable_name(self, var_handler): var_handler.standard_name = self.cmor_var.standard_name var_handler.long_name = self.cmor_var.long_name var_handler.short_name = self.cmor_var.short_name def _fix_values_metadata(self, var_type): if self.cmor_var.valid_min != '': valid_min = '-a valid_min,{0},o,{1},"{2}" '.format(self.var, var_type.char, self.cmor_var.valid_min) else: valid_min = '' if self.cmor_var.valid_max != '': valid_max = '-a valid_max,{0},o,{1},"{2}" '.format(self.var, var_type.char, self.cmor_var.valid_max) else: valid_max = '' Utils.nco.ncatted(input=self.local_file, output=self.local_file, options='-O -a _FillValue,{0},o,{1},"1.e20" ' '-a missingValue,{0},o,{1},"1.e20" {2}{3}'.format(self.var, var_type.char, valid_min, valid_max)) def _fix_coordinate_variables_metadata(self, handler): if 'lev' in handler.variables: handler.variables['lev'].short_name = 'lev' if self.domain == ModelingRealms.ocean: handler.variables['lev'].standard_name = 'depth' if 'lon' in handler.variables: handler.variables['lon'].short_name = 'lon' handler.variables['lon'].standard_name = 'longitude' if 'lat' in handler.variables: handler.variables['lat'].short_name = 'lat' handler.variables['lat'].standard_name = 'latitude' def _fix_units(self, var_handler): if 'units' not in var_handler.ncattrs(): return if var_handler.units == '-': var_handler.units = '1.0' if var_handler.units == 'PSU': var_handler.units = 'psu' if var_handler.units == 'C' and self.cmor_var.units == 'K': var_handler.units = 'deg_C' if self.cmor_var.units != var_handler.units: self._convert_units(var_handler) var_handler.units = self.cmor_var.units def _convert_units(self, var_handler): try: Utils.convert_units(var_handler, self.cmor_var.units) except ValueError as ex: Log.warning('Can not convert {3} from {0} to {1}: {2}', var_handler.units, self.cmor_var.units, ex, self.cmor_var.short_name) factor, offset = UnitConversion.get_conversion_factor_offset(var_handler.units, self.cmor_var.units) var_handler[:] = var_handler[:] * factor + offset if 'valid_min' in var_handler.ncattrs(): var_handler.valid_min = float(var_handler.valid_min) * factor + offset if 'valid_max' in var_handler.ncattrs(): var_handler.valid_max = float(var_handler.valid_max) * factor + offset def _rename_coordinate_variables(self): variables = dict() variables['x'] = 'i' variables['y'] = 'j' variables['nav_lat_grid_V'] = 'lat' variables['nav_lon_grid_V'] = 'lon' variables['nav_lat_grid_U'] = 'lat' variables['nav_lon_grid_U'] = 'lon' variables['nav_lat_grid_T'] = 'lat' variables['nav_lon_grid_T'] = 'lon' Utils.rename_variables(self.local_file, variables, False, True) def add_diagnostic_history(self, diagnostic): from earthdiagnostics.earthdiags import EarthDiags history_line = 'Diagnostic {1} calculated with EarthDiagnostics version {0}'.format(EarthDiags.version, diagnostic) self._add_history_line(history_line) def add_cmorization_history(self): from earthdiagnostics.earthdiags import EarthDiags history_line = 'CMORized with Earthdiagnostics version {0}'.format(EarthDiags.version) self._add_history_line(history_line) def _add_history_line(self, history_line): utc_datetime = 'UTC ' + datetime.utcnow().isoformat() history_line = '{0}: {1};'.format(utc_datetime, history_line) handler = Utils.openCdf(self.local_file) try: history_line = history_line + handler.history except AttributeError: history_line = history_line handler.history = Utils.convert_to_ASCII_if_possible(history_line) handler.close() class UnitConversion(object): """ Class to manage unit conversions """ _dict_conversions = None @classmethod def load_conversions(cls): """ Load conversions from the configuration file """ cls._dict_conversions = dict() with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'conversions.csv'), 'rb') as csvfile: reader = csv.reader(csvfile, dialect='excel') for line in reader: if line[0] == 'original': continue cls.add_conversion(UnitConversion(line[0], line[1], line[2], line[3])) @classmethod def add_conversion(cls, conversion): """ Adds a conversion to the dictionary :param conversion: conversion to add :type conversion: UnitConversion """ cls._dict_conversions[(conversion.source, conversion.destiny)] = conversion def __init__(self, source, destiny, factor, offset): self.source = source self.destiny = destiny self.factor = float(factor) self.offset = float(offset) @classmethod def get_conversion_factor_offset(cls, input_units, output_units): """ Gets the conversion factor and offset for two units . The conversion has to be done in the following way: converted = original * factor + offset :param input_units: original units :type input_units: str :param output_units: destiny units :type output_units: str :return: factor and offset :rtype: [float, float] """ units = input_units.split() if len(units) == 1: scale_unit = 1 unit = units[0] else: if '^' in units[0]: values = units[0].split('^') scale_unit = pow(int(values[0]), int(values[1])) else: scale_unit = float(units[0]) unit = units[1] units = output_units.split() if len(units) == 1: scale_new_unit = 1 new_unit = units[0] else: if '^' in units[0]: values = units[0].split('^') scale_new_unit = pow(int(values[0]), int(values[1])) else: scale_new_unit = float(units[0]) new_unit = units[1] factor, offset = UnitConversion._get_factor(new_unit, unit) if factor is None: return None, None factor = factor * scale_unit / float(scale_new_unit) offset /= float(scale_new_unit) return factor, offset @classmethod def _get_factor(cls, new_unit, unit): # Add only the conversions with a factor greater than 1 if unit == new_unit: return 1, 0 elif (unit, new_unit) in cls._dict_conversions: conversion = cls._dict_conversions[(unit, new_unit)] return conversion.factor, conversion.offset elif (new_unit, unit) in cls._dict_conversions: conversion = cls._dict_conversions[(new_unit, unit)] return 1 / conversion.factor, -conversion.offset else: return None, None