datafile.py 12 KB
Newer Older
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
# coding: utf-8
import csv
import shutil
from datetime import datetime

import numpy as np
import os
from bscearth.utils.log import Log

from earthdiagnostics.utils import Utils, TempFile
from earthdiagnostics.variable import VariableManager
from earthdiagnostics.modelingrealm import ModelingRealms


class LocalStatus(object):
    """
    Integer codes used for the ``local_status`` attribute of a data file
    """
    # Consecutive values starting at 0, in declaration order
    PENDING, DOWNLOADING, READY, FAILED, TO_COMPUTE = range(5)


class StorageStatus(object):
    """
    Integer codes used for the ``storage_status`` attribute of a data file
    """
    # Consecutive values starting at 0, in declaration order
    PENDING, UPLOADING, READY, FAILED = range(4)


class DataFile(object):

    def __init__(self, domain, var, frequency, data_convention):
        """
        Creates a data file description with no paths assigned yet

        :param domain: modeling realm of the variable (compared against ModelingRealms members by other methods)
        :param var: name of the variable inside the netCDF file; also used to look up its CMOR definition
        :type var: str
        :param frequency: frequency, forwarded to the CMOR table lookup
        :param data_convention: data convention, forwarded to the CMOR table lookup
        """
        self.local_path = None
        self.upload_path = None
        self.download_path = None
        self.local_status = LocalStatus.PENDING
        # Defined here so the attribute can be read safely before upload() has ever been called
        self.storage_status = StorageStatus.PENDING
        self.message = None
        self.domain = domain
        self.var = var
        self.region = None
        self.frequency = frequency
        self.data_convention = data_convention

        # NOTE(review): the meaning of the second positional argument (True) is defined by VariableManager
        self.cmor_var = VariableManager().get_variable(var, True)

    def download(self):
        raise NotImplementedError()

    def upload(self):
        """
        Prepares the local file and moves it to the upload path

        The file is converted to netCDF4, its metadata is corrected, the region dimension is prepared and
        the coordinate variables are renamed before it is moved. ``storage_status`` is UPLOADING while this
        runs, READY on success and FAILED if any step raises.

        :raises Exception: any error raised by one of the steps is propagated unchanged
        """
        self.storage_status = StorageStatus.UPLOADING
        try:
            Utils.convert2netcdf4(self.local_path)
            self._correct_metadata()
            self._prepare_region()
            self._rename_coordinate_variables()
            Utils.move_file(self.local_path, self.upload_path)
        except Exception:
            # Do not leave the file flagged as UPLOADING forever; the caller still receives the original error
            self.storage_status = StorageStatus.FAILED
            raise
        self.storage_status = StorageStatus.READY

    def _correct_metadata(self):
        """
        Rewrites the metadata of the local file according to the CMOR definition of the variable

        Does nothing when no CMOR definition is available for the variable. The file handler is always
        closed, even if one of the steps fails.
        """
        if not self.cmor_var:
            return
        handler = Utils.openCdf(self.local_path)
        try:
            var_handler = handler.variables[self.var]
            self._fix_variable_name(var_handler)
            handler.modeling_realm = self.cmor_var.domain.name
            table = self.cmor_var.get_table(self.frequency, self.data_convention)
            handler.table_id = 'Table {0} ({1})'.format(table.name, table.date)
            if self.cmor_var.units:
                self._fix_units(var_handler)
            handler.sync()
            self._fix_coordinate_variables_metadata(handler)
            # Read the type while the file is still open; it is needed after closing
            var_type = var_handler.dtype
        finally:
            handler.close()
        # Runs on the closed file because it edits the attributes through ncatted
        self._fix_values_metadata(var_type)

    def _fix_variable_name(self, var_handler):
        var_handler.standard_name = self.cmor_var.standard_name
        var_handler.long_name = self.cmor_var.long_name
        var_handler.short_name = self.cmor_var.short_name

    def _fix_values_metadata(self, var_type):
        """
        Overwrites the fill value, missing value and valid range attributes of the variable using ncatted

        valid_min and valid_max are only written when the CMOR definition provides a non-empty value.

        :param var_type: type of the variable; its ``char`` code is passed to ncatted as the attribute type
        """
        type_code = var_type.char

        min_option = ''
        if self.cmor_var.valid_min != '':
            min_option = '-a valid_min,{0},o,{1},"{2}" '.format(self.var, type_code, self.cmor_var.valid_min)

        max_option = ''
        if self.cmor_var.valid_max != '':
            max_option = '-a valid_max,{0},o,{1},"{2}" '.format(self.var, type_code, self.cmor_var.valid_max)

        ncatted_options = ('-O -a _FillValue,{0},o,{1},"1.e20" '
                           '-a missingValue,{0},o,{1},"1.e20" {2}{3}').format(self.var, type_code,
                                                                              min_option, max_option)
        Utils.nco.ncatted(input=self.local_path, output=self.local_path, options=ncatted_options)

    def _fix_coordinate_variables_metadata(self, handler):
        if 'lev' in handler.variables:
            handler.variables['lev'].short_name = 'lev'
            if self.domain == ModelingRealms.ocean:
                handler.variables['lev'].standard_name = 'depth'
        if 'lon' in handler.variables:
            handler.variables['lon'].short_name = 'lon'
            handler.variables['lon'].standard_name = 'longitude'
        if 'lat' in handler.variables:
            handler.variables['lat'].short_name = 'lat'
            handler.variables['lat'].standard_name = 'latitude'

    def _fix_units(self, var_handler):
        if 'units' not in var_handler.ncattrs():
            return
        if var_handler.units == '-':
            var_handler.units = '1.0'
        if var_handler.units == 'PSU':
            var_handler.units = 'psu'
        if var_handler.units == 'C' and self.cmor_var.units == 'K':
            var_handler.units = 'deg_C'
        if self.cmor_var.units != var_handler.units:
            self._convert_units(var_handler)
        var_handler.units = self.cmor_var.units

    def _convert_units(self, var_handler):
        """
        Converts the values of the variable to the units of its CMOR definition

        Utils.convert_units is tried first. If it raises ValueError, a warning is logged and the conversion
        table managed by UnitConversion is used instead, rescaling valid_min and valid_max as well when
        they are present.

        :param var_handler: variable to convert
        :raises ValueError: if the conversion table has no conversion between the two units either
        """
        try:
            Utils.convert_units(var_handler, self.cmor_var.units)
        except ValueError as ex:
            Log.warning('Can not convert {3} from {0} to {1}: {2}', var_handler.units, self.cmor_var.units, ex,
                        self.cmor_var.short_name)
            factor, offset = UnitConversion.get_conversion_factor_offset(var_handler.units,
                                                                         self.cmor_var.units)
            if factor is None:
                # (None, None) means the table has no such conversion: fail with a clear message instead of
                # the TypeError that multiplying the data by None would produce
                raise ValueError('No conversion available for {0} from {1} to {2}'.format(
                    self.cmor_var.short_name, var_handler.units, self.cmor_var.units))

            var_handler[:] = var_handler[:] * factor + offset
            if 'valid_min' in var_handler.ncattrs():
                var_handler.valid_min = float(var_handler.valid_min) * factor + offset
            if 'valid_max' in var_handler.ncattrs():
                var_handler.valid_max = float(var_handler.valid_max) * factor + offset

    def _prepare_region(self):
        if not self.region:
            return
        if not os.path.exists(self.upload_path):
            self._add_region_dimension_to_var()
        else:
            self._update_var_with_region_data()
            self._correct_metadata()
        Utils.nco.ncks(input=self.local_path, output=self.local_path, options='-O --fix_rec_dmn region')

    def _update_var_with_region_data(self):
        """
        Merges the local data into a copy of the file at the upload path, in the slot of this file's region

        The existing file is copied to a temporary file whose region dimension is turned into a record
        dimension (ncks --mk_rec_dmn). If the region is not present yet, it is appended at the end.
        The merged temporary file then replaces the local file. Both handlers are always closed, even if
        the merge fails.
        """
        temp = TempFile.get()
        shutil.copyfile(self.upload_path, temp)
        Utils.nco.ncks(input=temp, output=temp, options='-O --mk_rec_dmn region')
        handler = Utils.openCdf(temp)
        handler_send = None
        try:
            handler_send = Utils.openCdf(self.local_path)
            value = handler_send.variables[self.var][:]
            var_region = handler.variables['region']
            basin_index = np.where(var_region[:] == self.region)
            if len(basin_index[0]) == 0:
                # Region not stored yet: append it and use the new last position
                var_region[var_region.shape[0]] = self.region
                basin_index = var_region.shape[0] - 1
            else:
                basin_index = basin_index[0][0]
            # Region is the last dimension of the variable
            handler.variables[self.var][..., basin_index] = value
        finally:
            handler.close()
            if handler_send is not None:
                handler_send.close()
        Utils.move_file(temp, self.local_path)

    def _add_region_dimension_to_var(self):
        """
        Adds a trailing 'region' dimension to the variable of the local file

        A 'region' dimension and a string variable holding this file's region are created. The data and
        attributes of the original variable are copied to a temporary variable ('new_var') that has the
        extra dimension, the original variable is removed with ncks and the temporary one is renamed to
        take its place. The handler is always closed, even if a step fails.
        """
        handler = Utils.openCdf(self.local_path)
        try:
            # NOTE(review): no size is given; presumably this creates an unlimited dimension - confirm
            handler.createDimension('region')
            var_region = handler.createVariable('region', str, 'region')
            var_region[0] = self.region
            original_var = handler.variables[self.var]
            new_var = handler.createVariable('new_var', original_var.datatype,
                                             original_var.dimensions + ('region',))
            new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()})
            value = original_var[:]
            new_var[..., 0] = value
        finally:
            handler.close()
        # -x -v excludes the original variable from the output
        Utils.nco.ncks(input=self.local_path, output=self.local_path, options='-O -x -v {0}'.format(self.var))
        Utils.rename_variable(self.local_path, 'new_var', self.var)

    def _rename_coordinate_variables(self):
        """
        Renames the x/y and nav_lat/nav_lon (grids U, V and T) variables of the local file to i/j and lat/lon
        """
        # Current name -> new name
        variables = {
            'x': 'i',
            'y': 'j',
            'nav_lat_grid_V': 'lat',
            'nav_lon_grid_V': 'lon',
            'nav_lat_grid_U': 'lat',
            'nav_lon_grid_U': 'lon',
            'nav_lat_grid_T': 'lat',
            'nav_lon_grid_T': 'lon',
        }
        # NOTE(review): the meaning of the two trailing flags is defined by Utils.rename_variables
        Utils.rename_variables(self.local_path, variables, False, True)

    def add_diagnostic_history(self, diagnostic):
        """
        Adds a history line stating which diagnostic computed the file and with which EarthDiagnostics version

        :param diagnostic: diagnostic that produced the file; formatted into the history text
        """
        # Imported here rather than at module level, as in the original code
        from earthdiagnostics.earthdiags import EarthDiags
        template = 'Diagnostic {1} calculated with EarthDiagnostics version {0}'
        self._add_history_line(template.format(EarthDiags.version, diagnostic))

    def add_cmorization_history(self):
        """
        Adds a history line stating that the file was CMORized and with which EarthDiagnostics version
        """
        # Imported here rather than at module level, as in the original code
        from earthdiagnostics.earthdiags import EarthDiags
        template = 'CMORized with Earthdiagnostics version {0}'
        self._add_history_line(template.format(EarthDiags.version))

    def _add_history_line(self, history_line):
        """
        Prepends a timestamped line to the global history attribute of the local file

        The line is written as 'UTC <ISO timestamp>: <history_line>;' followed by the previous history,
        if the file has one. The handler is always closed, even if writing the attribute fails.

        :param history_line: text to add
        :type history_line: str
        """
        utc_datetime = 'UTC ' + datetime.utcnow().isoformat()
        history_line = '{0}: {1};'.format(utc_datetime, history_line)

        handler = Utils.openCdf(self.local_path)
        try:
            # A file without a history attribute contributes nothing
            history_line += getattr(handler, 'history', '')
            handler.history = Utils.convert_to_ASCII_if_possible(history_line)
        finally:
            handler.close()


class UnitConversion(object):
    """
    Class to manage unit conversions
    """
    _dict_conversions = None

    @classmethod
    def load_conversions(cls):
        """
        Load conversions from the configuration file
        """
        cls._dict_conversions = dict()
        with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'conversions.csv'), 'rb') as csvfile:
            reader = csv.reader(csvfile, dialect='excel')
            for line in reader:
                if line[0] == 'original':
                    continue
                cls.add_conversion(UnitConversion(line[0], line[1], line[2], line[3]))

    @classmethod
    def add_conversion(cls, conversion):
        """
        Adds a conversion to the dictionary

        :param conversion: conversion to add
        :type conversion: UnitConversion
        """
        cls._dict_conversions[(conversion.source, conversion.destiny)] = conversion

    def __init__(self, source, destiny, factor, offset):
        self.source = source
        self.destiny = destiny
        self.factor = float(factor)
        self.offset = float(offset)

    @classmethod
    def get_conversion_factor_offset(cls, input_units, output_units):
        """
        Gets the conversion factor and offset for two units . The conversion has to be done in the following way:
        converted = original * factor + offset

        :param input_units: original units
        :type input_units: str
        :param output_units: destiny units
        :type output_units: str
        :return: factor and offset
        :rtype: [float, float]
        """
        units = input_units.split()
        if len(units) == 1:
            scale_unit = 1
            unit = units[0]
        else:
            if '^' in units[0]:
                values = units[0].split('^')
                scale_unit = pow(int(values[0]), int(values[1]))
            else:
                scale_unit = float(units[0])
            unit = units[1]

        units = output_units.split()
        if len(units) == 1:
            scale_new_unit = 1
            new_unit = units[0]
        else:
            if '^' in units[0]:
                values = units[0].split('^')
                scale_new_unit = pow(int(values[0]), int(values[1]))
            else:
                scale_new_unit = float(units[0])
            new_unit = units[1]

        factor, offset = UnitConversion._get_factor(new_unit, unit)
        if factor is None:
            return None, None
        factor = factor * scale_unit / float(scale_new_unit)
        offset /= float(scale_new_unit)

        return factor, offset

    @classmethod
    def _get_factor(cls, new_unit, unit):
        # Add  only the conversions with a factor greater than 1
        if unit == new_unit:
            return 1, 0
        elif (unit, new_unit) in cls._dict_conversions:
            conversion = cls._dict_conversions[(unit, new_unit)]
            return conversion.factor, conversion.offset
        elif (new_unit, unit) in cls._dict_conversions:
            conversion = cls._dict_conversions[(new_unit, unit)]
            return 1 / conversion.factor, -conversion.offset
        else:
            return None, None


class THREDDSData(DataFile):

    def download(self):
        """
        Copies the file at the download path to a local temporary file using nccopy

        ``local_status`` is DOWNLOADING while the copy runs, READY if the result passes
        Utils.check_netcdf_file and FAILED otherwise. On failure, ``message`` explains what could not be
        retrieved.
        """
        self.local_status = LocalStatus.DOWNLOADING
        self.local_path = TempFile.get()
        Utils.execute_shell_command(['nccopy', '-s', '-d', '-4', self.download_path, self.local_path])
        if not Utils.check_netcdf_file(self.local_path):
            self.message = 'Can not retrieve {0} from server'.format(self.download_path)
            # This file defines LocalStatus and local_status; DataStatus and status do not exist in it
            self.local_status = LocalStatus.FAILED
            return
        self.local_status = LocalStatus.READY