# datamanager.py
import csv
import os
import re
import shutil

from earthdiagnostics.datafile import NetCDFFile as NCfile, StorageStatus, LocalStatus
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.utils import Utils
from earthdiagnostics.variable_type import VariableType


class DataManager(object):
    """
    Class to manage the data repositories

    Keeps a registry (``requested_files``, keyed by path) of the file objects requested from or
    declared for the storage, and builds the tree of symbolic links that points to the files.

    :param config: configuration object. It must provide the attributes ``experiment``,
                   ``var_manager``, ``data_convention`` and ``data_dir``
    """

    def __init__(self, config):
        self.config = config
        self.experiment = config.experiment
        # Link folders whose old files have already been moved away by create_link
        self._checked_vars = list()
        self.variable_list = config.var_manager
        UnitConversion.load_conversions()
        # File objects already created, indexed by their path in the storage
        self.requested_files = {}

    def _get_file_from_storage(self, filepath):
        """
        Get the file object for a file stored in the repository, creating it on first request

        :param filepath: path of the file in the storage
        :type filepath: str
        :return: file object registered for this path, with its local status set to pending
        """
        if filepath not in self.requested_files:
            self.requested_files[filepath] = NCfile.from_storage(filepath, self.config.data_convention)
        file_object = self.requested_files[filepath]
        # The attribute was previously misspelled as 'local_satatus', so the pending state ended up
        # in an attribute nobody reads.
        # NOTE(review): confirm that the file object exposes 'local_status'.
        file_object.local_status = LocalStatus.PENDING
        return file_object

    def _declare_generated_file(self, remote_file, domain, final_var, cmor_var, data_convention,
                                region, diagnostic, grid, var_type, original_var):
        """
        Get the file object for a file that will be generated, creating it on first declaration

        The metadata received is stored on the file object and its storage status is set to pending.

        :param remote_file: path that the file will have in the storage
        :type remote_file: str
        :param domain: domain of the variable
        :param final_var: stored as ``final_name`` on the file object
        :param cmor_var: stored as ``cmor_var`` on the file object
        :param data_convention: data convention passed to ``NCfile.to_storage``
        :param region: stored as ``region`` on the file object
        :param diagnostic: diagnostic that generates the file
        :param grid: stored as ``grid`` on the file object
        :param var_type: stored as ``var_type`` on the file object
        :param original_var: stored as ``var`` on the file object
        :return: file object registered for ``remote_file``
        """
        if remote_file not in self.requested_files:
            self.requested_files[remote_file] = NCfile.to_storage(remote_file, data_convention)
        file_object = self.requested_files[remote_file]
        file_object.diagnostic = diagnostic
        file_object.var_type = var_type
        file_object.grid = grid
        file_object.data_manager = self
        file_object.domain = domain
        file_object.var = original_var
        file_object.final_name = final_var
        file_object.cmor_var = cmor_var
        file_object.region = region
        file_object.storage_status = StorageStatus.PENDING
        return file_object

    @staticmethod
    def _get_final_var_name(box, var):
        """
        Append the longitude, latitude and depth strings of the box to the variable name

        :param box: box to take the suffixes from. If it is falsy, the name is returned unchanged
        :param var: variable name
        :type var: str
        :return: variable name with the box suffixes appended
        :rtype: str
        """
        if box:
            var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str()
        return var

    def get_varfolder(self, domain, var, grid=None, frequency=None):
        """
        Get the name of the folder that contains the links for a variable

        :param domain: domain of the variable. Ocean, seaIce and ocnBgchem use the ocean timestep
                       of the experiment; any other domain uses the atmospheric one
        :param var: variable name
        :type var: str
        :param grid: if given, it is appended to the variable name as ``var-grid``
        :type grid: str|None
        :param frequency: frequency of the data, passed through to the suffix computation
        :return: folder name
        :rtype: str
        """
        if grid:
            var = '{0}-{1}'.format(var, grid)

        if domain in [ModelingRealms.ocean, ModelingRealms.seaIce, ModelingRealms.ocnBgchem]:
            return self._apply_fxh(var, self.experiment.ocean_timestep, frequency)
        else:
            # This 'else' was missing, which left the atmospheric branch unreachable and made
            # the method return None for every non-ocean domain
            return self._apply_fxh(var, self.experiment.atmos_timestep, frequency)

    def _apply_fxh(self, folder_name, timestep, frequency=None):
        """
        Add the ``_f<timestep>h`` suffix to a folder name when it applies

        The suffix is skipped when a frequency is given and its ``frequency`` string ends with 'hr',
        and also when the timestep is not greater than zero.

        :param folder_name: base name of the folder
        :type folder_name: str
        :param timestep: timestep to use in the suffix
        :param frequency: frequency of the data
        :return: folder name, with the suffix if it applies
        :rtype: str
        """
        is_base_frequency = frequency is not None and frequency.frequency.endswith('hr')
        if not is_base_frequency and timestep > 0:
            return '{0}_f{1}h'.format(folder_name, timestep)
        return folder_name

    def create_link(self, domain, filepath, frequency, var, grid, move_old, vartype):
        """
        Create a relative symbolic link to a file inside the links tree of the experiment

        The link is created at ``<data_dir>/<expid>/<frequency folder>/<variable folder>/<file name>``.
        For a grid other than 'original', the default variable folder is made a symlink to the
        folder of that grid. A real directory found at the default location is first moved to the
        corresponding '-original' folder.

        :param domain: domain of the variable
        :param filepath: path to the file to link
        :type filepath: str
        :param frequency: frequency of the data. Its ``folder_name(vartype)`` names the frequency folder
        :param var: variable name
        :type var: str
        :param grid: grid of the file. A falsy value is treated as 'original'
        :type grid: str|None
        :param move_old: if True, files in the link folder named ``<var>_<6 to 8 digits>.nc`` are moved
                         to an ``old_<var>_f<timestep>h`` folder. This is done once per link folder
        :type move_old: bool
        :param vartype: variable type, used to choose the frequency folder
        :raises ValueError: if ``filepath`` does not exist
        """
        if self.config.data_convention == 'meteofrance':
            # NOTE(review): the body of this branch was missing. Assuming that no links are
            # created for this convention
            return

        if not grid:
            # Callers may leave the grid unset for the original one. Without this, an empty grid
            # would take the regridded branch below and try to move the default folder onto itself
            grid = 'original'

        freq_str = frequency.folder_name(vartype)
        variable_folder = self.get_varfolder(domain, var)
        vargrid_folder = self.get_varfolder(domain, var, grid)
        base_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str)

        # NOTE(review): this code was wrapped in a try/except whose handler was missing. The wrapper
        # has been dropped so that any error reaches the caller instead of being silently swallowed
        if grid == 'original':
            link_path = os.path.join(base_path, variable_folder)
            if os.path.islink(link_path):
                # The default folder currently points to another grid: use the explicit folder
                link_path = os.path.join(base_path, vargrid_folder)

            Utils.create_folder_tree(link_path)
        else:
            link_path = os.path.join(base_path, vargrid_folder)
            Utils.create_folder_tree(link_path)
            default_path = os.path.join(base_path, variable_folder)
            original_path = os.path.join(base_path,
                                         vargrid_folder.replace('-{0}_f'.format(grid), '-original_f'))

            if os.path.islink(default_path):
                os.remove(default_path)
            elif os.path.isdir(default_path):
                # Keep the files of the original grid reachable through its explicit folder
                shutil.move(default_path, original_path)
            os.symlink(link_path, default_path)

        if move_old and link_path not in self._checked_vars:
            self._checked_vars.append(link_path)
            # NOTE(review): the atmospheric timestep is used here for every domain - confirm
            old_path = os.path.join(base_path,
                                    'old_{0}_f{1}h'.format(var, self.experiment.atmos_timestep))
            # Escape the variable name so it is always matched literally
            regex = re.compile(re.escape(var) + r'_[0-9]{6,8}\.nc')
            for filename in os.listdir(link_path):
                if regex.match(filename):
                    Utils.create_folder_tree(old_path)
                    Utils.move_file(os.path.join(link_path, filename),
                                    os.path.join(old_path, filename))

        link_path = os.path.join(link_path, os.path.basename(filepath))
        if os.path.lexists(link_path):
            os.remove(link_path)
        if not os.path.exists(filepath):
            raise ValueError('Original file {0} does not exists'.format(filepath))
        if not os.path.isdir(os.path.dirname(link_path)):
            Utils.create_folder_tree(os.path.dirname(link_path))
        relative_path = os.path.relpath(filepath, os.path.dirname(link_path))
        os.symlink(relative_path, link_path)

    # Overridable methods (not mandatory)
    def link_file(self, domain, var, cmor_var, startdate, member, chunk=None, grid=None,
                  frequency=None, year=None, date_str=None, move_old=False, vartype=VariableType.MEAN):
        """
        Creates the link of a given file from the CMOR repository.

        This base implementation does nothing: subclasses may override it.

        :param domain: CMOR domain
        :param var: variable name
        :type var: str
        :param cmor_var: variable definition
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param frequency: file's frequency (only needed if it is different from the default)
        :param year: if frequency is yearly, this parameter is used to give the corresponding year
        :type year: int
        :param date_str:
        :param move_old:
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        """
        pass

    def prepare(self):
        """
        Prepares the data to be used by the diagnostic.

        This base implementation does nothing: subclasses may override it.
        """
        pass

    def request_chunk(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None, vartype=None):
        """
        Copies a given file from the CMOR repository to the scratch folder and returns the path to the scratch's copy

        This base implementation always raises: subclasses must override it.

        :param domain: CMOR domain
        :type domain: Domain
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str|None
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: Frequency|None
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        :return: path to the copy created on the scratch folder
        :rtype: str
        :raises NotImplementedError: always
        """
        raise NotImplementedError('Class must override request_chunk method')


class UnitConversion(object):
    """
    Class to manage unit conversions

    Each instance describes one linear conversion between two units:
    converted = original * factor + offset

    The class also keeps a table with the known conversions, indexed by (source, destiny).

    :param source: name of the original units
    :type source: str
    :param destiny: name of the units after the conversion
    :type destiny: str
    :param factor: multiplicative factor (any value accepted by float)
    :param offset: additive offset (any value accepted by float)
    """

    # Table of known conversions, indexed by (source, destiny). None until it is first filled
    _dict_conversions = None

    def __init__(self, source, destiny, factor, offset):
        self.source = source
        self.destiny = destiny
        self.factor = float(factor)
        self.offset = float(offset)

    @classmethod
    def load_conversions(cls):
        """
        Load conversions from the configuration file

        Reads 'conversions.csv' from the folder of this module, replacing any conversion loaded
        before. Each row must contain source, destiny, factor and offset. Blank rows and the
        header row (first column equal to 'original') are skipped.
        """
        cls._dict_conversions = dict()
        with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'conversions.csv'), 'r') as csvfile:
            reader = csv.reader(csvfile, dialect='excel')
            for line in reader:
                # csv.reader yields an empty list for blank lines
                if not line or line[0] == 'original':
                    continue
                cls.add_conversion(UnitConversion(line[0], line[1], line[2], line[3]))

    @classmethod
    def add_conversion(cls, conversion):
        """
        Adds a conversion to the dictionary

        :param conversion: conversion to add
        :type conversion: UnitConversion
        """
        if cls._dict_conversions is None:
            cls._dict_conversions = dict()
        cls._dict_conversions[(conversion.source, conversion.destiny)] = conversion

    @classmethod
    def get_conversion_factor_offset(cls, input_units, output_units):
        """
        Gets the conversion factor and offset for two units . The conversion has to be done in the following way:
        converted = original * factor + offset

        Both units may start with a scale, separated from the unit name by whitespace. The scale
        is either a number ('1000 m') or a power written as base^exponent ('10^3 m').

        :param input_units: original units
        :type input_units: str
        :param output_units: destiny units
        :type output_units: str
        :return: factor and offset, or (None, None) if the conversion is not known
        :rtype: [float, float]
        """
        scale_unit, unit = cls._parse_units(input_units)
        scale_new_unit, new_unit = cls._parse_units(output_units)

        factor, offset = cls._get_factor(new_unit, unit)
        if factor is None:
            return None, None

        # The input scale multiplies the original value. The result is then expressed in
        # multiples of the output scale
        factor = factor * scale_unit / float(scale_new_unit)
        offset /= float(scale_new_unit)

        return factor, offset

    @staticmethod
    def _parse_units(units):
        """
        Split a units string into its scale and its unit name

        :param units: units string, with an optional leading scale
        :type units: str
        :return: scale (1 if the string has a single token) and unit name
        """
        tokens = units.split()
        if len(tokens) == 1:
            return 1, tokens[0]

        if '^' in tokens[0]:
            base, exponent = tokens[0].split('^')
            scale = pow(int(base), int(exponent))
        else:
            scale = float(tokens[0])
        return scale, tokens[1]

    @classmethod
    def _get_factor(cls, new_unit, unit):
        """
        Look up the factor and offset to convert from unit to new_unit

        :param new_unit: destiny units
        :type new_unit: str
        :param unit: original units
        :type unit: str
        :return: factor and offset, or (None, None) if the conversion is not known
        """
        # Add  only the conversions with a factor greater than 1
        # (the opposite direction is obtained here by inverting the stored conversion)
        if unit == new_unit:
            return 1, 0
        elif (unit, new_unit) in cls._dict_conversions:
            conversion = cls._dict_conversions[(unit, new_unit)]
            return conversion.factor, conversion.offset
        elif (new_unit, unit) in cls._dict_conversions:
            conversion = cls._dict_conversions[(new_unit, unit)]
            # Inverting y = x * factor + offset gives x = y / factor - offset / factor,
            # so the offset has to be divided by the factor too
            return 1 / conversion.factor, -conversion.offset / conversion.factor
        else:
            return None, None