# coding: utf-8
import csv
import os
import re
import shutil
import threading

from earthdiagnostics.datafile import NetCDFFile as NCfile, StorageStatus, LocalStatus
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.utils import Utils
from earthdiagnostics.variable_type import VariableType


class DataManager(object):
    """
    Class to manage the data repositories

    :param config: configuration object; ``experiment``, ``var_manager`` and ``data_dir`` are read from it
    :type config: Config
    """

    def __init__(self, config):
        self.config = config
        self.experiment = config.experiment
        # Link folders already scanned for old files by create_link (move_old=True)
        self._checked_vars = list()
        self.variable_list = config.var_manager
        UnitConversion.load_conversions()
        # Serializes the filesystem manipulation done in create_link
        self.lock = threading.Lock()
        # Cache of file objects, keyed by their path on the storage
        self.requested_files = {}

    def _get_file_from_storage(self, filepath):
        """
        Get the file object for a file to be retrieved from the storage, creating it on first request

        :param filepath: path of the file on the storage
        :type filepath: str
        :return: cached file object for the given path
        """
        if filepath not in self.requested_files:
            self.requested_files[filepath] = NCfile.from_storage(filepath)
        file_object = self.requested_files[filepath]
        # NOTE(review): 'local_satatus' looks like a misspelling of 'local_status'. As written, this sets an
        # attribute with that exact name. Kept unchanged because correcting the name may alter how files that
        # were already requested are handled -- confirm against the NCfile implementation before changing it.
        file_object.local_satatus = LocalStatus.PENDING
        return file_object

    def _declare_generated_file(self, remote_file, domain, final_var, cmor_var, data_convention,
                                region, diagnostic, grid, var_type, original_var):
        """
        Get the file object for a file to be sent to the storage and fill in its metadata

        The file object is created on first request and reused afterwards; its attributes are overwritten
        on every call and its storage status is set to pending.

        :param remote_file: path of the file on the storage
        :param domain: value stored in the file object's ``domain``
        :param final_var: value stored in the file object's ``final_name``
        :param cmor_var: value stored in the file object's ``cmor_var``
        :param data_convention: value stored in the file object's ``data_convention``
        :param region: value stored in the file object's ``region``
        :param diagnostic: value stored in the file object's ``diagnostic``
        :param grid: value stored in the file object's ``grid``
        :param var_type: value stored in the file object's ``var_type``
        :param original_var: value stored in the file object's ``var``
        :return: file object for the given path
        """
        if remote_file not in self.requested_files:
            self.requested_files[remote_file] = NCfile.to_storage(remote_file)
        file_object = self.requested_files[remote_file]
        file_object.diagnostic = diagnostic
        file_object.var_type = var_type
        file_object.grid = grid
        file_object.data_manager = self
        file_object.domain = domain
        file_object.var = original_var
        file_object.final_name = final_var
        file_object.cmor_var = cmor_var
        file_object.region = region
        file_object.data_convention = data_convention
        file_object.storage_status = StorageStatus.PENDING
        return file_object

    @staticmethod
    def _get_final_var_name(box, var):
        """
        Append the box's longitude, latitude and depth strings to the variable name

        :param box: box to take the suffixes from; if falsy, the name is returned unchanged
        :param var: variable name
        :type var: str
        :return: variable name with the box suffixes appended
        :rtype: str
        """
        if box:
            var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str()
        return var

    def get_varfolder(self, domain, var, grid=None, frequency=None):
        """
        Get the name of the folder for a variable

        :param domain: variable's domain; ocean, seaIce and ocnBgchem use the experiment's ocean timestep,
                       any other domain uses the atmospheric one
        :param var: variable name
        :type var: str
        :param grid: if given, it is appended to the variable name as '<var>-<grid>'
        :type grid: str|NoneType
        :param frequency: passed to _apply_fxh to decide whether the timestep suffix is added
        :return: folder name
        :rtype: str
        """
        if grid:
            var = '{0}-{1}'.format(var, grid)

        if domain in [ModelingRealms.ocean, ModelingRealms.seaIce, ModelingRealms.ocnBgchem]:
            return self._apply_fxh(var, self.experiment.ocean_timestep, frequency)
        else:
            return self._apply_fxh(var, self.experiment.atmos_timestep, frequency)

    def _apply_fxh(self, folder_name, timestep, frequency=None):
        """
        Add the '_f<timestep>h' suffix to a folder name when it applies

        The suffix is added only if the timestep is greater than zero and the frequency is either missing or
        its ``frequency`` string does not end with 'hr'.

        :param folder_name: base folder name
        :type folder_name: str
        :param timestep: value written in the suffix
        :param frequency: object exposing a ``frequency`` string, or None
        :return: folder name, with the suffix if applicable
        :rtype: str
        """
        is_base_frequency = frequency is not None and frequency.frequency.endswith('hr')
        if not is_base_frequency and timestep > 0:
            return '{0}_f{1}h'.format(folder_name, timestep)
        return folder_name

    def create_link(self, domain, filepath, frequency, var, grid, move_old, vartype):
        """
        Create a relative symbolic link to a file under <data_dir>/<expid>/<frequency folder>/<variable folder>

        For a grid other than the original one, the default variable folder is also turned into a symlink to
        the folder of that grid (an existing directory there is first moved to the '-original' folder name).

        :param domain: variable's domain, used to compute the folder names
        :param filepath: path to the file to link; it must exist
        :type filepath: str
        :param frequency: object providing ``folder_name(vartype)``
        :param var: variable name
        :type var: str
        :param grid: file's grid; a falsy value is treated as 'original'
        :type grid: str|NoneType
        :param move_old: if True, the first time a link folder is used, the files in it whose names start
                         with '<var>_' followed by 6 to 8 digits and '.nc' are moved to an 'old_...' folder
        :type move_old: bool
        :param vartype: variable type, passed to ``frequency.folder_name``
        :raises ValueError: if filepath does not exist
        """
        freq_str = frequency.folder_name(vartype)

        if not grid:
            grid = 'original'

        variable_folder = self.get_varfolder(domain, var)
        vargrid_folder = self.get_varfolder(domain, var, grid)
        base_dir = os.path.join(self.config.data_dir, self.experiment.expid, freq_str)

        # The lock is released on any exit path, exceptions included
        with self.lock:
            if grid == 'original':
                link_path = os.path.join(base_dir, variable_folder)
                if os.path.islink(link_path):
                    # The default folder is a symlink (created by the branch below for another grid),
                    # so the explicit '-original' folder is used instead
                    link_path = os.path.join(base_dir, vargrid_folder)

                Utils.create_folder_tree(link_path)
            else:
                link_path = os.path.join(base_dir, vargrid_folder)
                Utils.create_folder_tree(link_path)
                default_path = os.path.join(base_dir, variable_folder)
                original_path = os.path.join(base_dir,
                                             vargrid_folder.replace('-{0}_f'.format(grid), '-original_f'))

                if os.path.islink(default_path):
                    os.remove(default_path)
                elif os.path.isdir(default_path):
                    shutil.move(default_path, original_path)
                os.symlink(link_path, default_path)

            if move_old and link_path not in self._checked_vars:
                self._checked_vars.append(link_path)
                # NOTE(review): the atmospheric timestep is used here whatever the domain is -- confirm
                old_path = os.path.join(base_dir,
                                        'old_{0}_f{1}h'.format(var, self.experiment.atmos_timestep))
                # Raw string avoids the invalid '\.' escape; the variable name is matched literally
                regex = re.compile(re.escape(var) + r'_[0-9]{6,8}\.nc')
                for filename in os.listdir(link_path):
                    if regex.match(filename):
                        Utils.create_folder_tree(old_path)
                        Utils.move_file(os.path.join(link_path, filename),
                                        os.path.join(old_path, filename))

            link_path = os.path.join(link_path, os.path.basename(filepath))
            # lexists also detects broken symlinks, which must be removed before linking again
            if os.path.lexists(link_path):
                os.remove(link_path)
            if not os.path.exists(filepath):
                raise ValueError('Original file {0} does not exists'.format(filepath))
            link_folder = os.path.dirname(link_path)
            if not os.path.isdir(link_folder):
                Utils.create_folder_tree(link_folder)
            relative_path = os.path.relpath(filepath, link_folder)
            os.symlink(relative_path, link_path)

    # Overridable methods (not mandatory)
    def link_file(self, domain, var, cmor_var, startdate, member, chunk=None, grid=None,
                  frequency=None, year=None, date_str=None, move_old=False, vartype=VariableType.MEAN):
        """
        Creates the link of a given file from the CMOR repository.

        This base implementation does nothing; subclasses may override it.

        :param cmor_var: not used by this base implementation
        :param move_old: not used by this base implementation
        :param date_str: not used by this base implementation
        :param year: if frequency is yearly, this parameter is used to give the corresponding year
        :type year: int
        :param domain: CMOR domain
        :type domain: Domain
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: str
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        :return: None in this base implementation
        """
        pass

    def prepare(self):
        """
        Prepares the data to be used by the diagnostic.

        This base implementation does nothing; subclasses may override it.

        :return: None in this base implementation
        """
        pass

    def request_chunk(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
                      vartype=None):
        """
        Copies a given file from the CMOR repository to the scratch folder and returns the path to the
        scratch's copy

        Subclasses must override this method: the base implementation always raises.

        :param domain: CMOR domain
        :type domain: Domain
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str|NoneType
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: Frequency|NoneType
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        :return: path to the copy created on the scratch folder
        :rtype: str
        :raises NotImplementedError: always, in this base implementation
        """
        raise NotImplementedError('Class must override request_chunk method')


class UnitConversion(object):
    """
    Class to manage unit conversions

    :param source: original units
    :type source: str
    :param destiny: destiny units
    :type destiny: str
    :param factor: conversion factor (converted with ``float``)
    :param offset: conversion offset (converted with ``float``)
    """

    # Conversions indexed by (source, destiny); filled in by load_conversions
    _dict_conversions = None

    @classmethod
    def load_conversions(cls):
        """
        Load conversions from the configuration file

        Reads 'conversions.csv' from the folder containing this module, replacing any conversions loaded
        before. Rows whose first column is 'original' (the header) and empty rows are skipped.
        """
        cls._dict_conversions = dict()
        conversions_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'conversions.csv')
        # Text mode: csv.reader requires an iterator of strings on Python 3
        with open(conversions_path, 'r') as csvfile:
            reader = csv.reader(csvfile, dialect='excel')
            for line in reader:
                if not line or line[0] == 'original':
                    continue
                cls.add_conversion(UnitConversion(line[0], line[1], line[2], line[3]))

    @classmethod
    def add_conversion(cls, conversion):
        """
        Adds a conversion to the dictionary

        :param conversion: conversion to add
        :type conversion: UnitConversion
        """
        cls._dict_conversions[(conversion.source, conversion.destiny)] = conversion

    def __init__(self, source, destiny, factor, offset):
        self.source = source
        self.destiny = destiny
        self.factor = float(factor)
        self.offset = float(offset)

    @classmethod
    def get_conversion_factor_offset(cls, input_units, output_units):
        """
        Gets the conversion factor and offset for two units. The conversion has to be done in the
        following way: converted = original * factor + offset

        Each units string is either a bare unit or a scale followed by a unit, separated by whitespace.
        The scale can be a number or a power written as 'base^exponent'.

        :param input_units: original units
        :type input_units: str
        :param output_units: destiny units
        :type output_units: str
        :return: factor and offset, or (None, None) if no conversion is known between the units
        :rtype: [float, float]
        """
        scale_unit, unit = cls._parse_scaled_unit(input_units)
        scale_new_unit, new_unit = cls._parse_scaled_unit(output_units)

        factor, offset = UnitConversion._get_factor(new_unit, unit)
        if factor is None:
            return None, None
        # The input scale multiplies the value before converting; the output scale divides the result
        factor = factor * scale_unit / float(scale_new_unit)
        offset /= float(scale_new_unit)
        return factor, offset

    @staticmethod
    def _parse_scaled_unit(units_str):
        """
        Split a units string into its scale and its unit

        :param units_str: units string: 'unit', 'scale unit' or 'base^exponent unit'
        :type units_str: str
        :return: scale (1 if the string holds just the unit) and unit
        :rtype: tuple
        """
        units = units_str.split()
        if len(units) == 1:
            return 1, units[0]

        if '^' in units[0]:
            values = units[0].split('^')
            scale = pow(int(values[0]), int(values[1]))
        else:
            scale = float(units[0])
        return scale, units[1]

    @classmethod
    def _get_factor(cls, new_unit, unit):
        """
        Get the factor and offset to convert from unit to new_unit, ignoring scales

        :param new_unit: destiny unit
        :type new_unit: str
        :param unit: original unit
        :type unit: str
        :return: factor and offset, or (None, None) if the conversion is not registered in any direction
        """
        # Add only the conversions with a factor greater than 1
        if unit == new_unit:
            return 1, 0
        elif (unit, new_unit) in cls._dict_conversions:
            conversion = cls._dict_conversions[(unit, new_unit)]
            return conversion.factor, conversion.offset
        elif (new_unit, unit) in cls._dict_conversions:
            conversion = cls._dict_conversions[(new_unit, unit)]
            # Inverse of y = x * f + o is x = y / f - o / f, so the offset must be divided by the factor too
            return 1 / conversion.factor, -conversion.offset / conversion.factor
        else:
            return None, None