datafile.py 20.1 KB
Newer Older
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
# coding: utf-8
import csv
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
import shutil
from datetime import datetime

Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
import numpy as np
from bscearth.utils.log import Log

from earthdiagnostics.modelingrealm import ModelingRealms
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
from earthdiagnostics.utils import Utils, TempFile
from publisher import Publisher
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
from variable_type import VariableType
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed


class LocalStatus(object):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    """Local file status enumeration"""
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    PENDING = 0
    DOWNLOADING = 1
    READY = 2
    FAILED = 3
    COMPUTING = 5
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed


class StorageStatus(object):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    """Remote file status enumeration"""
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    PENDING = 0
    UPLOADING = 1
    READY = 2
    FAILED = 3
class DataFile(Publisher):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    """
    Represent a data file

    Must be derived for each concrete data file format
    """
    def __init__(self):
        super(DataFile, self).__init__()
        self.remote_file = None
        self.local_file = None
        self.domain = None
        self.var = None
        self.cmor_var = None
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self.region = None
        self.frequency = None
        self.data_convention = None
        self.diagnostic = None
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self.grid = None
        self.data_manager = None
        self.final_name = None
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self.var_type = VariableType.MEAN
        self._local_status = LocalStatus.NOT_REQUESTED
        self._storage_status = StorageStatus.READY
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self.job_added = False
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self.lon_name = None
        self.lat_name = None
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    def __str__(self):
        return 'Data file for {0}'.format(self.remote_file)

Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        """File size"""
        if self._size is None:
            self._get_size()
        return self._size

    def _get_size(self):
        try:
            if self.local_status == LocalStatus.READY:
                self._size = os.path.getsize(self.local_file)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        except OSError:
            self._size = None
    def clean_local(self):
        if self.local_status != LocalStatus.READY or len(self.suscribers) > 0 or self.upload_required() or \
           self.storage_status == StorageStatus.UPLOADING:
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            return
        Log.debug('File {0} no longer needed. Deleting from scratch...'.format(self.remote_file))
        os.remove(self.local_file)
        Log.debug('File {0} deleted from scratch'.format(self.remote_file))
        self.local_file = None
        self.local_status = LocalStatus.PENDING
    def only_suscriber(self, who):
        if len(self._subscribers) != 1:
            return
        return who in self._subscribers

    def upload_required(self):
        return self.local_status == LocalStatus.READY and self.storage_status == StorageStatus.PENDING

    def download_required(self):
        if not self.local_status == LocalStatus.PENDING:
            return False

        if self.storage_status == StorageStatus.READY:
            return True

        if self.has_modifiers():
            return True
    def add_modifier(self, diagnostic):
        self._modifiers.append(diagnostic)

    def has_modifiers(self):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        return self._modifiers
    def ready_to_run(self, diagnostic):
        if not self.local_status == LocalStatus.READY:
            return False
        if len(self._modifiers) == 0:
            return True
        return self._modifiers[0] is diagnostic

    @property
    def local_status(self):
        return self._local_status

    @local_status.setter
    def local_status(self, value):
        if self._local_status == value:
            return
        self._local_status = value
    @property
    def storage_status(self):
        return self._storage_status

    @storage_status.setter
    def storage_status(self, value):
        if self._storage_status == value:
            return
        self._storage_status = value
    @classmethod
    def from_storage(cls, filepath, data_convention):
        file_object = cls()
        file_object.remote_file = filepath
        file_object.local_status = LocalStatus.PENDING
        file_object.data_convention = data_convention
        return file_object

    @classmethod
    def to_storage(cls, remote_file, data_convention):
        new_object = cls()
        new_object.remote_file = remote_file
        new_object.storage_status = StorageStatus.PENDING
        new_object.data_convention = data_convention
        return new_object
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

    def download(self):
        raise NotImplementedError('Class must implement the download method')
    def prepare_to_upload(self, rename_var):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        if self.data_convention in ('primavera', 'cmip6'):
            self.lon_name = 'longitude'
            self.lat_name = 'latitude'
        else:
            self.lon_name = 'longitude'
            self.lat_name = 'latitude'

        Utils.convert2netcdf4(self.local_file)
        if rename_var:
            original_name = rename_var
        else:
            original_name = self.var
        if self.final_name != original_name:
            Utils.rename_variable(self.local_file, original_name, self.final_name)
        self._rename_coordinate_variables()
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self._correct_metadata()
        self._prepare_region()
        self.add_diagnostic_history()
        if self.region is not None:
            self.upload()

    def upload(self):
        self.storage_status = StorageStatus.UPLOADING
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        try:
            Utils.copy_file(self.local_file, self.remote_file, save_hash=True)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        except (OSError, Exception) as ex:
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            Log.error('File {0} can not be uploaded: {1}', self.remote_file, ex)
            self.storage_status = StorageStatus.FAILED
            return

        Log.info('File {0} uploaded!', self.remote_file)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self.create_link()
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self.storage_status = StorageStatus.READY

Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    def set_local_file(self, local_file, diagnostic=None, rename_var='', region=None):
        if diagnostic in self._modifiers:
            self._modifiers.remove(diagnostic)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        if region is not None:
            self.region = region.name
        else:
            self.region = None
        self.local_file = local_file
        self.prepare_to_upload(rename_var)
        self.local_status = LocalStatus.READY

    def create_link(self):
        pass

Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    def _correct_metadata(self):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        handler = Utils.open_cdf(self.local_file)
        var_handler = handler.variables[self.final_name]
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        coords = set.intersection({'time', 'lev', self.lat_name, self.lon_name}, set(handler.variables.keys()))
        var_handler.coordinates = ' '.join(coords)
        if not self.cmor_var:
            handler.close()
            return

Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self._fix_variable_name(var_handler)
        handler.modeling_realm = self.cmor_var.domain.name
        table = self.cmor_var.get_table(self.frequency, self.data_convention)
        handler.table_id = 'Table {0} ({1})'.format(table.name, table.date)
        if self.cmor_var.units:
            self._fix_units(var_handler)
        handler.sync()
        self._fix_coordinate_variables_metadata(handler)
        var_type = var_handler.dtype
        handler.close()
        self._fix_values_metadata(var_type)

    def _fix_variable_name(self, var_handler):
        var_handler.standard_name = self.cmor_var.standard_name
        var_handler.long_name = self.cmor_var.long_name

    def _fix_values_metadata(self, var_type, file_path=None):
        if file_path is None:
            file_path = self.local_file
        valid_min = ''
        valid_max = ''

        if self.cmor_var is not None:
            if self.cmor_var.valid_min:
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                valid_min = '-a valid_min,{0},o,{1},"{2}" '.format(self.final_name, var_type.char,
                                                                   self.cmor_var.valid_min)
            if self.cmor_var.valid_max:
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                valid_max = '-a valid_max,{0},o,{1},"{2}" '.format(self.final_name, var_type.char,
                                                                   self.cmor_var.valid_max)

        Utils.nco.ncatted(input=file_path, output=file_path,
                          options=('-O -a _FillValue,{0},o,{1},"1.e20" '
                                   '-a missingValue,{0},o,{1},"1.e20" {2}{3}'.format(self.final_name, var_type.char,
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                                                                                     valid_min, valid_max),))
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

    def _fix_coordinate_variables_metadata(self, handler):
        if 'lev' in handler.variables:
            handler.variables['lev'].short_name = 'lev'
            if self.domain == ModelingRealms.ocean:
                handler.variables['lev'].standard_name = 'depth'
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        if self.lon_name in handler.variables:
            handler.variables[self.lon_name].short_name = self.lon_name
            handler.variables[self.lon_name].standard_name = 'longitude'
        if self.lat_name in handler.variables:
            handler.variables[self.lat_name].short_name = self.lat_name
            handler.variables[self.lat_name].standard_name = 'latitude'
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

    def _fix_units(self, var_handler):
        if 'units' not in var_handler.ncattrs():
            return
        if var_handler.units == '-':
            var_handler.units = '1.0'
        if var_handler.units == 'PSU':
            var_handler.units = 'psu'
        if var_handler.units == 'C' and self.cmor_var.units == 'K':
            var_handler.units = 'deg_C'
        if self.cmor_var.units != var_handler.units:
            self._convert_units(var_handler)
        var_handler.units = self.cmor_var.units

    def _convert_units(self, var_handler):
        try:
            Utils.convert_units(var_handler, self.cmor_var.units)
        except ValueError as ex:
            Log.warning('Can not convert {3} from {0} to {1}: {2}', var_handler.units, self.cmor_var.units, ex,
                        self.cmor_var.short_name)
            factor, offset = UnitConversion.get_conversion_factor_offset(var_handler.units,
                                                                         self.cmor_var.units)

            var_handler[:] = var_handler[:] * factor + offset
            if 'valid_min' in var_handler.ncattrs():
                var_handler.valid_min = float(var_handler.valid_min) * factor + offset
            if 'valid_max' in var_handler.ncattrs():
                var_handler.valid_max = float(var_handler.valid_max) * factor + offset

    def _prepare_region(self):
        if not self.region:
            return
        if not os.path.exists(self.remote_file):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            self._add_region_dimension_to_var()
        else:
            self._update_var_with_region_data()
            self._correct_metadata()
        Utils.nco.ncks(input=self.local_file, output=self.local_file, options=['--fix_rec_dmn region'])
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        handler = Utils.open_cdf(self.local_file)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        regions = handler.variables['region'][...].tolist()
        if len(regions) > 1:
            ordered_regions = sorted(regions)
            print(regions)
            print(ordered_regions)
            new_indexes = [regions.index(region) for region in ordered_regions]
            print(new_indexes)

            for var in handler.variables.values():
                if 'region' not in var.dimensions:
                    continue
                index_region = var.dimensions.index('region')
                var_values = var[...]
                var_ordered = np.take(var_values, new_indexes, index_region)
                var[...] = var_ordered
        handler.close()
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

    def _update_var_with_region_data(self):
        temp = TempFile.get()
        shutil.copyfile(self.remote_file, temp)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        handler = Utils.open_cdf(temp)
        var_handler = handler.variables[self.final_name]
        var_type = var_handler.dtype
        handler.close()
        self._fix_values_metadata(var_type, temp)

        Utils.nco.ncks(input=temp, output=temp, options=['--mk_rec_dmn region'])
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        handler = Utils.open_cdf(temp)
        var_handler = handler.variables[self.final_name]
        if hasattr(var_handler, 'valid_min'):
            del var_handler.valid_min
        if hasattr(var_handler, 'valid_max'):
            del var_handler.valid_max
        handler.sync()
        cubes = iris.load(self.local_file)
        for cube in cubes:
            if self.final_name == cube.var_name:
                value = cube.data
                break
        if isinstance(value, np.ma.MaskedArray):
            value = np.ma.getdata(value)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        var_region = handler.variables['region']
        basin_index = np.where(var_region[:] == self.region)
        if len(basin_index[0]) == 0:
            var_region[var_region.shape[0]] = self.region
            basin_index = var_region.shape[0] - 1

        else:
            basin_index = basin_index[0][0]
        handler.variables[self.final_name][..., basin_index] = np.multiply(np.ones(value.shape), value)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        handler.close()
        Utils.move_file(temp, self.local_file)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

    def _add_region_dimension_to_var(self):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        handler = Utils.open_cdf(self.local_file)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        handler.createDimension('region')
        var_region = handler.createVariable('region', str, 'region')
        var_region[0] = self.region
        original_var = handler.variables[self.final_name]
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        new_var = handler.createVariable('new_var', original_var.datatype,
                                         original_var.dimensions + ('region',))
        new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()})
        value = original_var[:]
        new_var[..., 0] = value
        handler.close()
        Utils.nco.ncks(input=self.local_file, output=self.local_file, options=('-x -v {0}'.format(self.final_name),))
        Utils.rename_variable(self.local_file, 'new_var', self.final_name)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

    def _rename_coordinate_variables(self):
        variables = dict()
        variables['x'] = 'i'
        variables['y'] = 'j'
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        variables['nav_lat_grid_V'] = self.lat_name
        variables['nav_lon_grid_V'] = self.lon_name
        variables['nav_lat_grid_U'] = self.lat_name
        variables['nav_lon_grid_U'] = self.lon_name
        variables['nav_lat_grid_T'] = self.lat_name
        variables['nav_lon_grid_T'] = self.lon_name
        Utils.rename_variables(self.local_file, variables, False, True)
    def add_diagnostic_history(self):
        if not self.diagnostic:
            return
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        from earthdiagnostics.earthdiags import EarthDiags
        history_line = 'Diagnostic {1} calculated with EarthDiagnostics version {0}'.format(EarthDiags.version,
                                                                                            self.diagnostic)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self._add_history_line(history_line)

    def add_cmorization_history(self):
        from earthdiagnostics.earthdiags import EarthDiags
        history_line = 'CMORized with Earthdiagnostics version {0}'.format(EarthDiags.version)
        self._add_history_line(history_line)

    def _add_history_line(self, history_line):
        utc_datetime = 'UTC ' + datetime.utcnow().isoformat()
        history_line = '{0}: {1};'.format(utc_datetime, history_line)

Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        handler = Utils.open_cdf(self.local_file)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        try:
            history_line = history_line + handler.history
        except AttributeError:
            history_line = history_line
        handler.history = Utils.convert_to_ascii_if_possible(history_line)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        handler.close()


class UnitConversion(object):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    """Class to manage unit conversions"""

Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    _dict_conversions = None

    @classmethod
    def load_conversions(cls):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        """Load conversions from the configuration file"""
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        cls._dict_conversions = dict()
        with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'conversions.csv'), 'rb') as csvfile:
            reader = csv.reader(csvfile, dialect='excel')
            for line in reader:
                if line[0] == 'original':
                    continue
                cls.add_conversion(UnitConversion(line[0], line[1], line[2], line[3]))

    @classmethod
    def add_conversion(cls, conversion):
        """
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        Add a conversion to the dictionary

        Parameters
        ----------
        conversion: UnitConversion
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

        """
        cls._dict_conversions[(conversion.source, conversion.destiny)] = conversion

    def __init__(self, source, destiny, factor, offset):
        self.source = source
        self.destiny = destiny
        self.factor = float(factor)
        self.offset = float(offset)

    @classmethod
    def get_conversion_factor_offset(cls, input_units, output_units):
        """
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        Get the conversion factor and offset for two units.

        The conversion has to be done in the following way:
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        converted = original * factor + offset

Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        Parameters
        ----------
        input_units: str
        output_units = str

        Returns
        -------
        float
            Factor
        float
            Offset

Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        """
        units = input_units.split()
        if len(units) == 1:
            scale_unit = 1
            unit = units[0]
        else:
            if '^' in units[0]:
                values = units[0].split('^')
                scale_unit = pow(int(values[0]), int(values[1]))
            else:
                scale_unit = float(units[0])
            unit = units[1]

        units = output_units.split()
        if len(units) == 1:
            scale_new_unit = 1
            new_unit = units[0]
        else:
            if '^' in units[0]:
                values = units[0].split('^')
                scale_new_unit = pow(int(values[0]), int(values[1]))
            else:
                scale_new_unit = float(units[0])
            new_unit = units[1]

        factor, offset = UnitConversion._get_factor(new_unit, unit)
        if factor is None:
            return None, None
        factor = factor * scale_unit / float(scale_new_unit)
        offset /= float(scale_new_unit)

        return factor, offset

    @classmethod
    def _get_factor(cls, new_unit, unit):
        # Add  only the conversions with a factor greater than 1
        if unit == new_unit:
            return 1, 0
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        if (unit, new_unit) in cls._dict_conversions:
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            conversion = cls._dict_conversions[(unit, new_unit)]
            return conversion.factor, conversion.offset
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        if (new_unit, unit) in cls._dict_conversions:
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            conversion = cls._dict_conversions[(new_unit, unit)]
            return 1 / conversion.factor, -conversion.offset
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        return None, None
class NetCDFFile(DataFile):

    def download(self):
        try:
            self.local_status = LocalStatus.DOWNLOADING
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            Log.debug('Downloading file {0}...', self.remote_file)
            if not self.local_file:
                self.local_file = TempFile.get()
            Utils.get_file_hash(self.remote_file, use_stored=True, save=True)
            Utils.copy_file(self.remote_file, self.local_file)
            if self.data_convention == 'meteofrance':
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                Log.debug('Converting variable names from meteofrance convention')
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                alt_coord_names = {'time_counter': 'time', 'time_counter_bounds': 'time_bnds',
                                   'tbnds': 'bnds', 'nav_lat': 'lat', 'nav_lon': 'lon', 'x': 'i',
                                   'y': 'j'}
                Utils.rename_variables(self.local_file, alt_coord_names, must_exist=False, rename_dimension=True)
            Log.info('File {0} ready!', self.remote_file)
            self.local_status = LocalStatus.READY

        except Exception as ex:
            if os.path.isfile(self.local_file):
                os.remove(self.local_file)
            Log.error('File {0} not available: {1}', self.remote_file, ex)
            self.local_status = LocalStatus.FAILED
    def create_link(self):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        try:
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            self.data_manager.create_link(self.domain, self.remote_file, self.frequency, self.final_name,
                                          self.grid, True, self.var_type)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        except (ValueError, Exception) as ex:
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            Log.error('Can not create link to {1}: {0}'.format(ex, self.remote_file))
    def _get_size(self):
        try:
            if self.local_status == LocalStatus.READY:
                self._size = os.path.getsize(self.local_file)
            if self.storage_status == StorageStatus.READY:
                self._size = os.path.getsize(self.remote_file)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        except OSError:
            self._size = None