# coding: utf-8
import csv
import os
import re
import shutil
import threading
from datetime import datetime

import numpy as np
from cfunits import Units

from earthdiagnostics.utils import Utils, TempFile
from earthdiagnostics.variable import Variable


class DataManager(object):
    """
    Class to manage the data repositories

    :param config:
    :type config: Config
    """

    def __init__(self, config):
        self.config = config
        self.experiment = config.experiment
        self._checked_vars = list()
        Variable.load_variables()
        UnitConversion.load_conversions()
        self.lock = threading.Lock()
        self.cmor_path = os.path.join(self.config.data_dir, self.experiment.expid, 'cmorfiles')

    def get_file(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None):
        """
        Copies a given file from the CMOR repository to the scratch folder and returns the path to the scratch's copy

        :param domain: CMOR domain
        :type domain: str
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: str
        :return: path to the copy created on the scratch folder
        :rtype: str
        """
        raise NotImplementedError()
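
    # Illustrative usage sketch (not from the original code): a diagnostic working with a concrete
    # DataManager subclass would typically fetch a local copy, operate on it and send the result
    # back. The names `data_manager`, `result` and the variable/startdate values are hypothetical.
    #
    #     path = data_manager.get_file('ocean', 'tos', '19900101', 0, 1)
    #     # ... compute a new file `result` from the local copy at `path` ...
    #     data_manager.send_file(result, 'ocean', 'tosmean', '19900101', 0, chunk=1)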

    def send_file(self, filetosend, domain, var, startdate, member, chunk=None, grid=None, region=None, box=None,
                  rename_var=None, frequency=None, year=None, date_str=None, move_old=False, diagnostic=None,
                  cmorized=False):
        """
        Copies a given file to the CMOR repository. It also automatically converts it to netCDF 4 if needed and
        can merge it with already existing files if needed

        :param move_old: if true, moves files following older conventions that may be found on the links folder
        :type move_old: bool
        :param date_str: exact date_str to use in the cmorized file
        :type date_str: str
        :param year: if frequency is yearly, this parameter is used to give the corresponding year
        :type year: int
        :param rename_var: if exists, the given variable will be renamed to the one given by var
        :type rename_var: str
        :param filetosend: path to the file to send to the CMOR repository
        :type filetosend: str
        :param region: specifies the region represented by the file. If it is defined, the data will be appended to
                       the CMOR repository as a new region in the file, or will overwrite it if the region was
                       already present
        :type region: str
        :param domain: CMOR domain
        :type domain: str
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: str
        :return: path to the copy created on the scratch folder
        :rtype: str
        """
        raise NotImplementedError()

    def get_year(self, domain, var, startdate, member, year, grid=None, box=None):
        """
        Gets a file containing all the data for one year for one variable

        :param domain: variable's domain
        :type domain: str
        :param var: variable's name
        :type var: str
        :param startdate: startdate to retrieve
        :type startdate: str
        :param member: member to retrieve
        :type member: int
        :param year: year to retrieve
        :type year: int
        :param grid: variable's grid
        :type grid: str
        :param box: variable's box
        :type box: Box
        :return:
        """
        raise NotImplementedError()

    @staticmethod
    def _get_final_var_name(box, var):
        if box:
            var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str()
        return var

    @staticmethod
    def correct_domain(domain):
        """
        Corrects domain capitalization

        :param domain: domain name
        :type domain: str
        :return: domain with correct capitalization
        :rtype: str
        """
        domain = domain.lower()
        if domain == 'seaice':
            return 'seaIce'
        elif domain == 'landice':
            return 'landIce'
        return domain

    def get_varfolder(self, domain, var, grid=None):
        if grid:
            var = '{0}-{1}'.format(var, grid)
        if domain in ['ocean', 'seaIce']:
            return '{0}_f{1}h'.format(var, self.experiment.ocean_timestep)
        else:
            return '{0}_f{1}h'.format(var, self.experiment.atmos_timestep)
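
    # Naming sketch (not from the original code): assuming an ocean timestep of 6 hours and an
    # atmospheric timestep of 3 hours, get_varfolder would produce, for example:
    #
    #     get_varfolder('ocean', 'tos')                -> 'tos_f6h'
    #     get_varfolder('ocean', 'tos', grid='ORCA1')  -> 'tos-ORCA1_f6h'
    #     get_varfolder('atmos', 'tas')                -> 'tas_f3h'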

    def _create_link(self, domain, filepath, frequency, var, grid, move_old):
        freq_str = self.frequency_folder_name(frequency)
        if not grid:
            grid = 'original'
        variable_folder = self.get_varfolder(domain, var)
        vargrid_folder = self.get_varfolder(domain, var, grid)
        if grid == 'original':
            link_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, variable_folder)
            if os.path.islink(link_path):
                link_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, vargrid_folder)
            Utils.create_folder_tree(link_path)
        else:
            link_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, vargrid_folder)
            Utils.create_folder_tree(link_path)
            default_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, variable_folder)
            original_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str,
                                         vargrid_folder.replace('-{0}_f'.format(grid), '-original_f'))
            # Keep the plain variable folder as a symlink to the grid-specific folder, moving any
            # pre-existing folder aside first
            if os.path.islink(default_path):
                os.remove(default_path)
            elif os.path.isdir(default_path):
                shutil.move(default_path, original_path)
            os.symlink(link_path, default_path)
        if move_old:
            # The lock protects the one-time scan per link folder and is always released once acquired
            if self.lock.acquire(False):
                if link_path not in self._checked_vars:
                    self._checked_vars.append(link_path)
                    old_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str,
                                            'old_{0}_f{1}h'.format(var, self.experiment.atmos_timestep))
                    regex = re.compile(var + r'_[0-9]{6,8}\.nc')
                    for filename in os.listdir(link_path):
                        if regex.match(filename):
                            Utils.create_folder_tree(old_path)
                            Utils.move_file(os.path.join(link_path, filename), os.path.join(old_path, filename))
                self.lock.release()
        link_path = os.path.join(link_path, os.path.basename(filepath))
        if os.path.lexists(link_path):
            os.remove(link_path)
        if not os.path.exists(filepath):
            raise ValueError('Original file {0} does not exist'.format(filepath))
        os.symlink(filepath, link_path)

    @staticmethod
    def frequency_folder_name(frequency):
        if frequency in ('d', 'daily', 'day'):
            freq_str = 'daily_mean'
        elif frequency.endswith('hr'):
            freq_str = frequency[:-2] + 'hourly'
        else:
            freq_str = 'monthly_mean'
        return freq_str
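
    # Folder-name sketch (not from the original code): frequency_folder_name maps, for example,
    #     'd' / 'daily' / 'day'   -> 'daily_mean'
    #     '3hr', '6hr', ...       -> '3hourly', '6hourly', ...
    #     anything else ('mon')   -> 'monthly_mean'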

    # Overridable methods (not mandatory)
    def link_file(self, domain, var, startdate, member, chunk=None, grid=None, box=None, frequency=None,
                  year=None, date_str=None, move_old=False):
        """
        Creates the link for a given file from the CMOR repository.

        :param move_old:
        :param date_str:
        :param year: if frequency is yearly, this parameter is used to give the corresponding year
        :type year: int
        :param domain: CMOR domain
        :type domain: str
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: str
        :return: path to the copy created on the scratch folder
        :rtype: str
        """
        pass

    def prepare(self):
        """
        Prepares the data to be used by the diagnostic.

        :return:
        """
        pass


class NetCDFFile(object):
    """
    Class to manage a netCDF file and prepare it to be sent to the CMOR repository

    :param remote_file:
    :type remote_file: str
    :param local_file:
    :type local_file: str
    :param domain:
    :type domain: str
    :param var:
    :type var: str
    :param cmor_var:
    :type cmor_var: Variable
    """

    def __init__(self, remote_file, local_file, domain, var, cmor_var):
        self.remote_file = remote_file
        self.local_file = local_file
        self.domain = domain
        self.var = var
        self.cmor_var = cmor_var
        self.region = None
        self.frequency = None

    def send(self):
        Utils.convert2netcdf4(self.local_file)
        if self.region:
            self._prepare_region()
        if self.cmor_var:
            self._correct_metadata()
        self._rename_coordinate_variables()
        Utils.move_file(self.local_file, self.remote_file)

    def _prepare_region(self):
        if not os.path.exists(self.remote_file):
            self._add_region_dimension_to_var()
        else:
            self._update_var_with_region_data()
        Utils.nco.ncks(input=self.local_file, output=self.local_file, options='-O --fix_rec_dmn region')

    def _update_var_with_region_data(self):
        temp = TempFile.get()
        shutil.copyfile(self.remote_file, temp)
        Utils.nco.ncks(input=temp, output=temp, options='-O --mk_rec_dmn region')
        handler = Utils.openCdf(temp)
        handler_send = Utils.openCdf(self.local_file)
        value = handler_send.variables[self.var][:]
        var_region = handler.variables['region']
        basin_index = np.where(var_region[:] == self.region)
        if len(basin_index[0]) == 0:
            # Region not present yet: append it at the end of the region dimension
            var_region[var_region.shape[0]] = self.region
            basin_index = var_region.shape[0] - 1
        else:
            # Region already present: overwrite its data
            basin_index = basin_index[0][0]
        handler.variables[self.var][..., basin_index] = value
        handler.close()
        handler_send.close()
        Utils.move_file(temp, self.local_file)

    def _add_region_dimension_to_var(self):
        handler = Utils.openCdf(self.local_file)
        handler.createDimension('region')
        var_region = handler.createVariable('region', str, 'region')
        var_region[0] = self.region
        original_var = handler.variables[self.var]
        new_var = handler.createVariable('new_var', original_var.datatype, original_var.dimensions + ('region',))
        new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()})
        value = original_var[:]
        new_var[..., 0] = value
        handler.close()
        Utils.nco.ncks(input=self.local_file, output=self.local_file, options='-O -x -v {0}'.format(self.var))
        Utils.rename_variable(self.local_file, 'new_var', self.var)

    def _correct_metadata(self):
        handler = Utils.openCdf(self.local_file)
        var_handler = handler.variables[self.var]
        self._fix_variable_name(var_handler)
        handler.modeling_realm = self.cmor_var.domain
        handler.table_id = 'Table {0} (December 2013)'.format(Variable.get_table_name(self.cmor_var.domain,
                                                                                      self.frequency))
        if self.cmor_var.units:
            self._fix_units(var_handler)
        handler.sync()
        self._fix_coordinate_variables_metadata(handler)
        var_type = var_handler.dtype
        handler.close()
        self._fix_values_metadata(var_type)

    def _fix_variable_name(self, var_handler):
        var_handler.standard_name = self.cmor_var.standard_name
        var_handler.long_name = self.cmor_var.long_name
        var_handler.short_name = self.cmor_var.short_name

    def _fix_values_metadata(self, var_type):
        if self.cmor_var.valid_min != '':
            valid_min = '-a valid_min,{0},o,{1},"{2}" '.format(self.var, var_type.char, self.cmor_var.valid_min)
        else:
            valid_min = ''
        if self.cmor_var.valid_max != '':
            valid_max = '-a valid_max,{0},o,{1},"{2}" '.format(self.var, var_type.char, self.cmor_var.valid_max)
        else:
            valid_max = ''
        Utils.nco.ncatted(input=self.local_file, output=self.local_file,
                          options='-O -a _FillValue,{0},o,{1},"1.e20" '
                                  '-a missingValue,{0},o,{1},"1.e20" {2}{3}'.format(self.var, var_type.char,
                                                                                    valid_min, valid_max))

    def _fix_coordinate_variables_metadata(self, handler):
        if 'lev' in handler.variables:
            handler.variables['lev'].short_name = 'lev'
            if self.domain == 'ocean':
                handler.variables['lev'].standard_name = 'depth'
        if 'lon' in handler.variables:
            handler.variables['lon'].short_name = 'lon'
            handler.variables['lon'].standard_name = 'longitude'
        if 'lat' in handler.variables:
            handler.variables['lat'].short_name = 'lat'
            handler.variables['lat'].standard_name = 'latitude'

    def _fix_units(self, var_handler):
        if 'units' not in var_handler.ncattrs():
            return
        if var_handler.units == 'PSU':
            var_handler.units = 'psu'
        if var_handler.units == 'C' and self.cmor_var.units == 'K':
            var_handler.units = 'deg_C'
        if self.cmor_var.units != var_handler.units:
            self._convert_units(var_handler)
        var_handler.units = self.cmor_var.units

    def _convert_units(self, var_handler):
        try:
            self._convert_using_cfunits(var_handler)
        except ValueError:
            factor, offset = UnitConversion.get_conversion_factor_offset(var_handler.units, self.cmor_var.units)
            var_handler[:] = var_handler[:] * factor + offset
            if 'valid_min' in var_handler.ncattrs():
                var_handler.valid_min = float(var_handler.valid_min) * factor + offset
            if 'valid_max' in var_handler.ncattrs():
                var_handler.valid_max = float(var_handler.valid_max) * factor + offset

    def _convert_using_cfunits(self, var_handler):
        new_unit = Units(self.cmor_var.units)
        old_unit = Units(var_handler.units)
        var_handler[:] = Units.conform(var_handler[:], old_unit, new_unit, inplace=True)
        if 'valid_min' in var_handler.ncattrs():
            var_handler.valid_min = Units.conform(float(var_handler.valid_min), old_unit, new_unit, inplace=True)
        if 'valid_max' in var_handler.ncattrs():
            var_handler.valid_max = Units.conform(float(var_handler.valid_max), old_unit, new_unit, inplace=True)
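
    # Conversion sketch (comment only, not from the original code): _convert_units first lets
    # cfunits attempt the conversion; if Units raises a ValueError it falls back to the linear
    # conversions declared in conversions.csv, applied as
    #     converted = original * factor + offset
    # with factor and offset taken from UnitConversion.get_conversion_factor_offset.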

    def _rename_coordinate_variables(self):
        variables = dict()
        variables['x'] = 'i'
        variables['y'] = 'j'
        variables['nav_lat_grid_V'] = 'lat'
        variables['nav_lon_grid_V'] = 'lon'
        variables['nav_lat_grid_U'] = 'lat'
        variables['nav_lon_grid_U'] = 'lon'
        variables['nav_lat_grid_T'] = 'lat'
        variables['nav_lon_grid_T'] = 'lon'
        Utils.rename_variables(self.local_file, variables, False, True)

    def add_diagnostic_history(self, diagnostic):
        from earthdiagnostics.earthdiags import EarthDiags
        history_line = 'Diagnostic {1} calculated with EarthDiagnostics version {0}'.format(EarthDiags.version,
                                                                                            diagnostic)
        self._add_history_line(history_line)

    def add_cmorization_history(self):
        from earthdiagnostics.earthdiags import EarthDiags
        history_line = 'CMORized with Earthdiagnostics version {0}'.format(EarthDiags.version)
        self._add_history_line(history_line)

    def _add_history_line(self, history_line):
        utc_datetime = 'UTC ' + datetime.utcnow().isoformat()
        history_line = '{0}: {1};'.format(utc_datetime, history_line)
        handler = Utils.openCdf(self.local_file)
        try:
            handler.history += history_line
        except AttributeError:
            handler.history = history_line
        handler.close()


class UnitConversion(object):
    """
    Class to manage unit conversions
    """

    _dict_conversions = None

    @classmethod
    def load_conversions(cls):
        """
        Load conversions from the configuration file
        """
        cls._dict_conversions = dict()
        with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'conversions.csv'), 'rb') as csvfile:
            reader = csv.reader(csvfile, dialect='excel')
            for line in reader:
                if line[0] == 'original':
                    continue
                cls.add_conversion(UnitConversion(line[0], line[1], line[2], line[3]))

    @classmethod
    def add_conversion(cls, conversion):
        """
        Adds a conversion to the dictionary

        :param conversion: conversion to add
        :type conversion: UnitConversion
        """
        cls._dict_conversions[(conversion.source, conversion.destiny)] = conversion

    def __init__(self, source, destiny, factor, offset):
        self.source = source
        self.destiny = destiny
        self.factor = float(factor)
        self.offset = float(offset)

    @classmethod
    def get_conversion_factor_offset(cls, input_units, output_units):
        """
        Gets the conversion factor and offset between two units. The conversion has to be done in the
        following way: converted = original * factor + offset

        :param input_units: original units
        :type input_units: str
        :param output_units: destination units
        :type output_units: str
        :return: factor and offset
        :rtype: [float, float]
        """
        # Units may come with a scale prefix, e.g. '10^3 m' or '0.001 kg'
        units = input_units.split()
        if len(units) == 1:
            scale_unit = 1
            unit = units[0]
        else:
            if '^' in units[0]:
                values = units[0].split('^')
                scale_unit = pow(int(values[0]), int(values[1]))
            else:
                scale_unit = float(units[0])
            unit = units[1]

        units = output_units.split()
        if len(units) == 1:
            scale_new_unit = 1
            new_unit = units[0]
        else:
            if '^' in units[0]:
                values = units[0].split('^')
                scale_new_unit = pow(int(values[0]), int(values[1]))
            else:
                scale_new_unit = float(units[0])
            new_unit = units[1]

        factor, offset = UnitConversion._get_factor(new_unit, unit)
        if factor is None:
            return None, None
        factor = factor * scale_unit / float(scale_new_unit)
        offset /= float(scale_new_unit)

        return factor, offset

    @classmethod
    def _get_factor(cls, new_unit, unit):
        # Add only the conversions with a factor greater than 1
        if unit == new_unit:
            return 1, 0
        elif (unit, new_unit) in cls._dict_conversions:
            conversion = cls._dict_conversions[(unit, new_unit)]
            return conversion.factor, conversion.offset
        elif (new_unit, unit) in cls._dict_conversions:
            conversion = cls._dict_conversions[(new_unit, unit)]
            return 1 / conversion.factor, -conversion.offset
        else:
            return None, None
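

if __name__ == '__main__':
    # Illustrative sketch, not part of the original module: get_conversion_factor_offset returns a
    # (factor, offset) pair meant to be applied as converted = original * factor + offset. The call
    # below only exercises the scale-prefix parsing ('10^3 m' -> 'm'), so it works without loading
    # conversions.csv.
    factor, offset = UnitConversion.get_conversion_factor_offset('10^3 m', 'm')
    print('factor={0}, offset={1}'.format(factor, offset))  # factor=1000.0, offset=0.0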