# NOTE: "21.5 KB" / "Newer Older" were repository web-viewer residue captured with this file, not source code.
import csv
import os
import re
import shutil
from datetime import datetime

import numpy as np
from bscearth.utils.log import Log
from cfunits import Units

from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.utils import Utils, TempFile
from earthdiagnostics.variable import Variable, VariableManager
from earthdiagnostics.variable_type import VariableType

class DataManager(object):
    """
    Class to manage the data repositories

    Base class: file_exists, get_file, send_file and get_year raise NotImplementedError and have to be provided by
    subclasses, while link_file and prepare are optional hooks that do nothing here.

    :param config: configuration object. Its ``experiment`` attribute is stored, and ``data_dir`` is read when
        links are created
    """

    def __init__(self, config):
        self.config = config
        self.experiment = config.experiment
        # Link folders already inspected for files following old naming conventions (see _create_link)
        self._checked_vars = list()
        self.variable_list = VariableManager()

    def file_exists(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
                    vartype=VariableType.MEAN):
        """
        Checks if a given file exists

        :param domain: CMOR domain
        :type domain: Domain
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: Frequency
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        :return: presumably True if the file exists (the extracted text repeated get_file's return description)
        :raises NotImplementedError: always, subclasses must override it
        """
        raise NotImplementedError()

    def get_file(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
                 vartype=VariableType.MEAN):
        """
        Copies a given file from the CMOR repository to the scratch folder and returns the path to the scratch's copy

        :param domain: CMOR domain
        :type domain: Domain
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: Frequency
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        :return: path to the copy created on the scratch folder
        :rtype: str
        :raises NotImplementedError: always, subclasses must override it
        """
        raise NotImplementedError()

    def send_file(self, filetosend, domain, var, startdate, member, chunk=None, grid=None, region=None,
                  box=None, rename_var=None, frequency=None, year=None, date_str=None, move_old=False,
                  diagnostic=None, cmorized=False, vartype=VariableType.MEAN):
        """
        Copies a given file to the CMOR repository. It also automatically converts to netCDF 4 if needed and can merge
        with already existing ones as needed

        :param move_old: if true, moves files following older conventions that may be found on the links folder
        :type move_old: bool
        :param date_str: exact date_str to use in the cmorized file
        :type date_str: str
        :param year: if frequency is yearly, this parameter is used to give the corresponding year
        :type year: int
        :param rename_var: if exists, the given variable will be renamed to the one given by var
        :type rename_var: str
        :param filetosend: path to the file to send to the CMOR repository
        :type filetosend: str
        :param region: specifies the region represented by the file. If it is defined, the data will be appended to the
            CMOR repository as a new region in the file or will overwrite if region was already present
        :type region: str
        :param domain: CMOR domain
        :type domain: Domain
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: Frequency
        :param diagnostic: diagnostic used to generate the file
        :type diagnostic: Diagnostic
        :param cmorized: flag to indicate if file was generated in cmorization process
        :type cmorized: bool
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        :raises NotImplementedError: always, subclasses must override it
        """
        raise NotImplementedError()

    def get_year(self, domain, var, startdate, member, year, grid=None, box=None):
        """
        Get a file containing all the data for one year for one variable

        :param domain: variable's domain
        :param var: variable's name
        :type var: str
        :param startdate: startdate to retrieve
        :type startdate: str
        :param member: member to retrieve
        :type member: int
        :param year: year to retrieve
        :type year: int
        :param grid: variable's grid
        :type grid: str
        :param box: variable's box
        :type box: Box
        :raises NotImplementedError: always, subclasses must override it
        """
        raise NotImplementedError()

    @staticmethod
    def _get_final_var_name(box, var):
        """
        Appends the box description to the variable name

        :param box: box to describe; if falsy the name is returned untouched
        :type box: Box
        :param var: variable name
        :type var: str
        :return: variable name followed by the box longitude, latitude and depth strings
        :rtype: str
        """
        if box:
            var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str()
        return var

    def get_varfolder(self, domain, var, grid=None):
        """
        Builds the name of the folder that holds the links for a variable: <var>[-<grid>]_f<timestep>h

        :param domain: variable's domain. Ocean and sea ice use the experiment's ocean timestep, any other domain
            uses the atmospheric one
        :param var: variable name
        :type var: str
        :param grid: grid name, appended to the variable name if given
        :type grid: str
        :return: folder name
        :rtype: str
        """
        if grid:
            var = '{0}-{1}'.format(var, grid)

        if domain in [ModelingRealms.ocean, ModelingRealms.seaIce]:
            return '{0}_f{1}h'.format(var, self.experiment.ocean_timestep)
        else:
            return '{0}_f{1}h'.format(var, self.experiment.atmos_timestep)

    def _create_link(self, domain, filepath, frequency, var, grid, move_old, vartype):
        """
        Creates, inside the experiment's links folder, a relative symbolic link pointing to filepath

        Files on the original grid are linked from the plain variable folder. For any other grid the link goes to a
        grid specific folder and the plain variable folder becomes a symbolic link to it.

        :param domain: variable's domain
        :param filepath: path to the file to link
        :type filepath: str
        :param frequency: file's frequency, used to get the name of the frequency folder
        :type frequency: Frequency
        :param var: variable name
        :type var: str
        :param grid: file's grid
        :type grid: str
        :param move_old: if true, files named <var>_<6 to 8 digits>.nc found in the link folder are moved to an
            'old_' folder
        :type move_old: bool
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        :raises ValueError: if filepath does not exist
        """
        # NOTE(review): in the extracted source this body was indented one extra level, so an enclosing block
        # (for instance a try or a lock) was lost. It is restored here without one; confirm against the repository.
        freq_str = frequency.folder_name(vartype)

        # NOTE(review): reconstructed. Without this default a missing grid would take the regridded branch and
        # link the variable folder to itself.
        if not grid:
            grid = 'original'

        base_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str)
        # NOTE(review): variable_folder was used but never assigned in the extracted source; the folder name
        # without grid is the only value consistent with how it is used.
        variable_folder = self.get_varfolder(domain, var)
        vargrid_folder = self.get_varfolder(domain, var, grid)
        default_path = os.path.join(base_path, variable_folder)
        grid_path = os.path.join(base_path, vargrid_folder)

        if grid == 'original':
            link_path = default_path
            if os.path.islink(link_path):
                # The plain folder already points to another grid, so use the explicit '-original' folder
                link_path = grid_path
        else:
            link_path = grid_path
            original_path = os.path.join(base_path, vargrid_folder.replace('-{0}_f'.format(grid), '-original_f'))

            if os.path.islink(default_path):
                # A stale link has to be removed or os.symlink below would fail
                os.remove(default_path)
            elif os.path.isdir(default_path):
                # Keep the data already linked for the original grid under its explicit name
                shutil.move(default_path, original_path)
            os.symlink(link_path, default_path)

        if not os.path.isdir(link_path):
            os.makedirs(link_path)

        if move_old and link_path not in self._checked_vars:
            # NOTE(review): nothing in the extracted source registered the folder as checked, so the membership
            # test above could never be False. Registering it here is a reconstruction.
            self._checked_vars.append(link_path)
            old_path = os.path.join(base_path, 'old_{0}_f{1}h'.format(var, self.experiment.atmos_timestep))
            regex = re.compile(var + r'_[0-9]{6,8}\.nc')
            for filename in os.listdir(link_path):
                if regex.match(filename):
                    Utils.move_file(os.path.join(link_path, filename),
                                    os.path.join(old_path, filename))

        link_path = os.path.join(link_path, os.path.basename(filepath))
        if os.path.lexists(link_path):
            # lexists also detects broken links, which have to be replaced as well
            os.remove(link_path)
        if not os.path.exists(filepath):
            raise ValueError('Original file {0} does not exists'.format(filepath))
        if not os.path.isdir(os.path.dirname(link_path)):
            os.makedirs(os.path.dirname(link_path))
        relative_path = os.path.relpath(filepath, os.path.dirname(link_path))
        os.symlink(relative_path, link_path)

    # Overridable methods (not mandatory)
    def link_file(self, domain, var, cmor_var, startdate, member, chunk=None, grid=None,
                  frequency=None, year=None, date_str=None, move_old=False, vartype=VariableType.MEAN):
        """
        Creates the link of a given file from the CMOR repository.

        This base implementation does nothing and returns None.

        :param move_old:
        :param date_str:
        :param year: if frequency is yearly, this parameter is used to give the corresponding year
        :type year: int
        :param domain: CMOR domain
        :type domain: Domain
        :param var: variable name
        :type var: str
        :param cmor_var:
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: str
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        """
        pass

    def prepare(self):
        """
        Prepares the data to be used by the diagnostic.

        This base implementation does nothing.
        """
        pass


class NetCDFFile(object):
    """
    Class to manage a netCDF file and prepare it before it is moved to its final location

    :param remote_file: final path of the file; send moves the local file there
    :type remote_file: str
    :param local_file: path to the local file that is modified and sent
    :type local_file: str
    :param domain:
    :param var: name of the variable stored in the file
    :type var: str
    :param cmor_var: definition used to fix the variable's metadata
    :type cmor_var: Variable
    :param data_convention: passed to cmor_var.get_table
    :param region: region represented by the file, if any
    """

    EQUIVALENT_UNITS = {'-': '1.0', 'fractional': '1.0', 'psu': 'psu'}

    def __init__(self, remote_file, local_file, domain, var, cmor_var, data_convention, region):
        self.remote_file = remote_file
        self.local_file = local_file
        self.domain = domain
        self.var = var
        self.cmor_var = cmor_var
        self.region = region
        self.frequency = None
        self.data_convention = data_convention

    def send(self):
        """
        Prepares the local file and moves it to the remote path
        """
        # NOTE(review): only the final move survived the extraction. The three calls below are reconstructed
        # because these private helpers have no other caller in this module; the guards are assumptions.
        if self.region:
            self._prepare_region()
        if self.cmor_var:
            self._correct_metadata()
        self._rename_coordinate_variables()

        Utils.move_file(self.local_file, self.remote_file)

    def _prepare_region(self):
        """
        Stores the data in the local file under the 'region' dimension, merging with the remote file if it exists
        """
        # NOTE(review): both branch bodies were lost in the extraction. They are restored from the only two region
        # helpers of the class: the update one is the only one that reads the remote file.
        if not os.path.exists(self.remote_file):
            self._add_region_dimension_to_var()
        else:
            self._update_var_with_region_data()
        Utils.nco.ncks(input=self.local_file, output=self.local_file, options=('--fix_rec_dmn region',))

    def _update_var_with_region_data(self):
        """
        Writes the local data into a copy of the remote file, under this file's region, and makes that copy the new
        local file. The region is appended to the copy if it was not present yet.
        """
        temp = TempFile.get()
        shutil.copyfile(self.remote_file, temp)
        Utils.nco.ncks(input=temp, output=temp, options=('--mk_rec_dmn region',))
        handler = Utils.openCdf(temp)
        try:
            handler_send = Utils.openCdf(self.local_file)
            try:
                value = handler_send.variables[self.var][:]
            finally:
                handler_send.close()
            var_region = handler.variables['region']
            matches = np.where(var_region[:] == self.region)[0]
            if len(matches) == 0:
                # Writing one position past the end appends the region; its index is then the last one
                var_region[var_region.shape[0]] = self.region
                basin_index = var_region.shape[0] - 1
            else:
                basin_index = matches[0]
            handler.variables[self.var][..., basin_index] = value
        finally:
            # The handlers have to be closed so that the changes reach the disk before the file is moved
            handler.close()
        Utils.move_file(temp, self.local_file)

    def _add_region_dimension_to_var(self):
        """
        Replaces the variable in the local file by a copy with an extra trailing 'region' dimension whose only
        entry is this file's region
        """
        handler = Utils.openCdf(self.local_file)
        try:
            # NOTE(review): no line creating the dimension survived the extraction; createVariable needs it
            if 'region' not in handler.dimensions:
                handler.createDimension('region')
            var_region = handler.createVariable('region', str, 'region')
            var_region[0] = self.region
            original_var = handler.variables[self.var]
            new_var = handler.createVariable('new_var', original_var.datatype,
                                             original_var.dimensions + ('region',))
            new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()})
            value = original_var[:]
            new_var[..., 0] = value
        finally:
            handler.close()
        # Drop the old variable and give its name to the new one
        Utils.nco.ncks(input=self.local_file, output=self.local_file, options=('-x -v {0}'.format(self.var),))
        Utils.rename_variable(self.local_file, 'new_var', self.var)

    def _correct_metadata(self):
        """
        Fixes the metadata of the local file using the information from cmor_var
        """
        handler = Utils.openCdf(self.local_file)
        try:
            var_handler = handler.variables[self.var]
            # NOTE(review): reconstructed call, this helper has no other caller
            self._fix_variable_name(var_handler)
            table = self.cmor_var.get_table(self.frequency, self.data_convention)
            # NOTE(review): the format arguments were lost in the extraction. The table's name and date are
            # assumed; confirm against the type returned by get_table.
            handler.table_id = 'Table {0} ({1})'.format(table.name, table.date)
            if self.cmor_var.units:
                self._fix_units(var_handler)
            # NOTE(review): reconstructed call, this helper has no other caller
            self._fix_coordinate_variables_metadata(handler)
            var_type = var_handler.dtype
        finally:
            handler.close()
        # ncatted works on the file itself, so it runs once the handler is closed
        self._fix_values_metadata(var_type)

    def _fix_variable_name(self, var_handler):
        """
        Copies the standard, long and short names from cmor_var to the variable

        :param var_handler: variable to modify
        """
        var_handler.standard_name = self.cmor_var.standard_name
        var_handler.long_name = self.cmor_var.long_name
        var_handler.short_name = self.cmor_var.short_name

    def _fix_values_metadata(self, var_type):
        """
        Sets the fill and missing values to 1.e20 and, when cmor_var defines them, the valid range

        :param var_type: type of the variable; its ``char`` attribute is used as the attribute type for ncatted
        """
        options = ['-a _FillValue,{0},o,{1},"1.e20"'.format(self.var, var_type.char),
                   '-a missingValue,{0},o,{1},"1.e20"'.format(self.var, var_type.char)]
        if self.cmor_var.valid_min:
            options.append('-a valid_min,{0},o,{1},"{2}" '.format(self.var, var_type.char, self.cmor_var.valid_min))
        if self.cmor_var.valid_max:
            options.append('-a valid_max,{0},o,{1},"{2}" '.format(self.var, var_type.char, self.cmor_var.valid_max))
        Utils.nco.ncatted(input=self.local_file, output=self.local_file, options=options)

    def _fix_coordinate_variables_metadata(self, handler):
        """
        Sets short and standard names for the lev, lon and lat variables that are present in the file

        :param handler: open handler of the file to modify
        """
        if 'lev' in handler.variables:
            handler.variables['lev'].short_name = 'lev'
            if self.domain == ModelingRealms.ocean:
                handler.variables['lev'].standard_name = 'depth'
        if 'lon' in handler.variables:
            handler.variables['lon'].short_name = 'lon'
            handler.variables['lon'].standard_name = 'longitude'
        if 'lat' in handler.variables:
            handler.variables['lat'].short_name = 'lat'
            handler.variables['lat'].standard_name = 'latitude'

    def _fix_units(self, var_handler):
        """
        Makes the variable use the units defined by cmor_var, converting the data if they differ

        Variables without units attribute are left untouched.

        :param var_handler: variable to modify
        """
        if 'units' not in var_handler.ncattrs():
            return
        if var_handler.units.lower() in NetCDFFile.EQUIVALENT_UNITS:
            var_handler.units = NetCDFFile.EQUIVALENT_UNITS[var_handler.units.lower()]
        elif var_handler.units == 'C' and self.cmor_var.units == 'K':
            # Both conditions are required: with 'or', data already stored in K was relabelled as deg_C and
            # then shifted by the conversion below
            var_handler.units = 'deg_C'
        if self.cmor_var.units != var_handler.units:
            self._convert_units(var_handler)
        var_handler.units = self.cmor_var.units

    def _convert_units(self, var_handler):
        """
        Converts the variable's data, valid_min and valid_max to the units defined by cmor_var

        Utils.convert_units is tried first; if it raises ValueError the factor and offset from UnitConversion are
        applied instead.

        :param var_handler: variable to convert
        """
        try:
            Utils.convert_units(var_handler, self.cmor_var.units)
        except ValueError as ex:
            # NOTE(review): the last argument of the warning and the second one of the conversion lookup were
            # lost in the extraction; the variable's short name and the target units are the evident values.
            Log.warning('Can not convert {3} from {0} to {1}: {2}', var_handler.units, self.cmor_var.units, ex,
                        self.cmor_var.short_name)
            factor, offset = UnitConversion.get_conversion_factor_offset(var_handler.units,
                                                                         self.cmor_var.units)

            var_handler[:] = var_handler[:] * factor + offset
            if 'valid_min' in var_handler.ncattrs():
                var_handler.valid_min = float(var_handler.valid_min) * factor + offset
            if 'valid_max' in var_handler.ncattrs():
                var_handler.valid_max = float(var_handler.valid_max) * factor + offset

    def _rename_coordinate_variables(self):
        """
        Renames x and y to i and j, and the nav_lat / nav_lon variables of the T, U and V grids to lat and lon
        """
        variables = {
            'x': 'i',
            'y': 'j',
            'nav_lat_grid_V': 'lat',
            'nav_lon_grid_V': 'lon',
            'nav_lat_grid_U': 'lat',
            'nav_lon_grid_U': 'lon',
            'nav_lat_grid_T': 'lat',
            'nav_lon_grid_T': 'lon',
        }
        Utils.rename_variables(self.local_file, variables, False, True)

    def add_diagnostic_history(self, diagnostic):
        """
        Adds to the file's history a line stating which diagnostic calculated it

        :param diagnostic: diagnostic that generated the file
        """
        from earthdiagnostics.earthdiags import EarthDiags
        history_line = 'Diagnostic {1} calculated with EarthDiagnostics version {0}'.format(EarthDiags.version,
                                                                                            diagnostic)
        self._add_history_line(history_line)

    def add_cmorization_history(self):
        """
        Adds to the file's history a line stating that it was generated by the cmorization
        """
        from earthdiagnostics.earthdiags import EarthDiags
        history_line = 'CMORized with Earthdiagnostics version {0}'.format(EarthDiags.version)
        self._add_history_line(history_line)

    def _add_history_line(self, history_line):
        """
        Prepends a line, stamped with the current UTC time, to the history attribute of the local file

        :param history_line: text to add
        :type history_line: str
        """
        utc_datetime = 'UTC ' + datetime.utcnow().isoformat()
        history_line = '{0}: {1};'.format(utc_datetime, history_line)

        handler = Utils.openCdf(self.local_file)
        try:
            try:
                history_line = history_line + handler.history
            except AttributeError:
                # No previous history: the new line becomes the whole attribute
                pass
            handler.history = Utils.convert_to_ASCII_if_possible(history_line)
        finally:
            handler.close()


class UnitConversion(object):
    """
    Class to manage unit conversions

    Each instance describes one conversion, to be applied as converted = original * factor + offset. The known
    conversions are kept in a class level dictionary indexed by (source, destiny).

    :param source: original units
    :type source: str
    :param destiny: converted units
    :type destiny: str
    :param factor: conversion factor; anything accepted by float
    :param offset: conversion offset; anything accepted by float
    """

    _dict_conversions = None

    @classmethod
    def load_conversions(cls):
        """
        Load conversions from the configuration file

        The file is conversions.csv, placed in the same folder as this module. Any conversion registered before
        is discarded.
        """
        import sys

        cls._dict_conversions = dict()
        path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'conversions.csv')
        # The csv module needs a binary file on Python 2 and a text one, opened with newline='', on Python 3
        if sys.version_info[0] >= 3:
            csvfile = open(path, 'r', newline='')
        else:
            csvfile = open(path, 'rb')
        with csvfile:
            reader = csv.reader(csvfile, dialect='excel')
            for line in reader:
                # Skip blank lines and the header, whose first column is the word 'original'
                if not line or line[0] == 'original':
                    continue
                cls.add_conversion(UnitConversion(line[0], line[1], line[2], line[3]))

    @classmethod
    def add_conversion(cls, conversion):
        """
        Adds a conversion to the dictionary

        :param conversion: conversion to add
        :type conversion: UnitConversion
        """
        if cls._dict_conversions is None:
            # Allow registering conversions even if the configuration file was never loaded
            cls._dict_conversions = dict()
        cls._dict_conversions[(conversion.source, conversion.destiny)] = conversion

    def __init__(self, source, destiny, factor, offset):
        self.source = source
        self.destiny = destiny
        self.factor = float(factor)
        self.offset = float(offset)

    @classmethod
    def get_conversion_factor_offset(cls, input_units, output_units):
        """
        Gets the conversion factor and offset for two units . The conversion has to be done in the following way:
        converted = original * factor + offset

        Units can be preceded by a scale, separated by whitespace and written as a number or as base^exponent:
        '1000 m' or '10^3 m'

        :param input_units: original units
        :type input_units: str
        :param output_units: destiny units
        :type output_units: str
        :return: factor and offset, or None and None if the conversion is not known
        :rtype: [float, float]
        """
        scale_unit, unit = cls._split_scale(input_units)
        scale_new_unit, new_unit = cls._split_scale(output_units)

        factor, offset = cls._get_factor(new_unit, unit)
        if factor is None:
            return None, None
        factor = factor * scale_unit / float(scale_new_unit)
        offset /= float(scale_new_unit)

        return factor, offset

    @staticmethod
    def _split_scale(units):
        """
        Splits a units string into its scale and the unit itself

        :param units: units, optionally preceded by a scale
        :type units: str
        :return: scale (1 if there is none) and unit
        """
        parts = units.split()
        if len(parts) == 1:
            return 1, parts[0]
        if '^' in parts[0]:
            base, exponent = parts[0].split('^')
            scale = pow(int(base), int(exponent))
        else:
            scale = float(parts[0])
        return scale, parts[1]

    @classmethod
    def _get_factor(cls, new_unit, unit):
        """
        Gets factor and offset to convert from unit to new_unit, using a registered conversion in either direction

        :param new_unit: destiny unit, without scale
        :type new_unit: str
        :param unit: original unit, without scale
        :type unit: str
        :return: factor and offset, or None and None if the conversion is not known
        """
        # Add  only the conversions with a factor greater than 1
        if unit == new_unit:
            return 1, 0
        if cls._dict_conversions is None:
            cls.load_conversions()
        if (unit, new_unit) in cls._dict_conversions:
            conversion = cls._dict_conversions[(unit, new_unit)]
            return conversion.factor, conversion.offset
        elif (new_unit, unit) in cls._dict_conversions:
            conversion = cls._dict_conversions[(new_unit, unit)]
            # Inverting y = x * f + o gives x = y / f - o / f, so the offset has to be scaled as well
            return 1 / conversion.factor, -conversion.offset / conversion.factor
        else:
            return None, None