interpolatecdo.py 13.6 KB
Newer Older
"""CDO-based interpolation"""
import numpy as np
from bscearth.utils.log import Log

from earthdiagnostics.constants import Basins
from earthdiagnostics.diagnostic import Diagnostic, DiagnosticDomainOption, DiagnosticVariableListOption, \
    DiagnosticChoiceOption, DiagnosticBoolOption, DiagnosticOption
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.utils import Utils, TempFile

class InterpolateCDO(Diagnostic):
    """
    3-dimensional conservative interpolation to the regular atmospheric grid.
    It can also be used for 2D (i,j) variables

    :original author: Javier Vegas-Regidor<javier.vegas@bsc.es>

    :created: October 2016

    :param data_manager: data management object
    :type data_manager: DataManager
    :param startdate: startdate
    :type startdate: str
    :param member: member number
    :type member: int
    :param chunk: chunk's number
    :type chunk: int
    :param variable: variable's name
    :type variable: str
    :param domain: variable's domain
    :type domain: ModelingRealm
    :param model_version: model version
    :type model_version: str
    """

    "Diagnostic alias for the configuration file"

    BILINEAR = 'bilinear'
    BICUBIC = 'bicubic'
    CONSERVATIVE = 'conservative'
    CONSERVATIVE2 = 'conservative2'

    METHODS = [BILINEAR, BICUBIC, CONSERVATIVE, CONSERVATIVE2]

    def __init__(self, data_manager, startdate, member, chunk, domain, variable, target_grid, model_version,
                 mask_oceans, original_grid, weights):
        Diagnostic.__init__(self, data_manager)
        self.startdate = startdate
        self.member = member
        self.chunk = chunk
        self.variable = variable
        self.domain = domain
        self.model_version = model_version
        self.required_vars = [variable]
        self.generated_vars = [variable]
        self.tempTemplate = ''
        self.grid = target_grid
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self.mask_oceans = mask_oceans
        self.weights = weights

    def __eq__(self, other):
        if self._different_type(other):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            return False

        return self.startdate == other.startdate and self.member == other.member and self.chunk == other.chunk and \
            self.model_version == other.model_version and self.domain == other.domain and \
            self.variable == other.variable and self.mask_oceans == other.mask_oceans and self.grid == other.grid and \
            self.original_grid == other.original_grid
    def __hash__(self):
        return hash(str(self))

    def __str__(self):
        return 'Interpolate with CDO Startdate: {0.startdate} Member: {0.member} Chunk: {0.chunk} ' \
               'Variable: {0.domain}:{0.variable} Target grid: {0.grid} Original grid: {0.original_grid} ' \
               'Mask ocean: {0.mask_oceans} Model: {0.model_version}'.format(self)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    def generate_jobs(cls, diags, options):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        Create a job for each chunk to compute the diagnostic
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

        :param diags: Diagnostics manager class
        :type diags: Diags
        :param options: target_grid, variable, domain=ocean
        :type options: list[str]
        :return:
        """
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        options_available = (DiagnosticDomainOption(default_value=ModelingRealms.ocean),
                             DiagnosticVariableListOption(diags.data_manager.config.var_manager, 'variables'),
                             DiagnosticOption('target_grid', diags.config.experiment.atmos_grid.lower()),
                             DiagnosticChoiceOption('method', InterpolateCDO.METHODS, InterpolateCDO.BILINEAR),
                             DiagnosticOption('original_grid', ''),
                             DiagnosticBoolOption('weights_from_mask', True)
                             )
        options = cls.process_options(options, options_available)
        target_grid = cls._translate_ifs_grids_to_cdo_names(options['target_grid'])
        if not target_grid:
            raise Exception('Target grid not provided')
        job_list = list()
        weights = TempFile.get()
        method = options['method'].lower()
        if options['weights_from_mask']:
            temp = cls.get_sample_grid_file()
            cls.compute_weights(method, target_grid, temp, weights)
            os.remove(temp)
            weights_job = None
        else:
            startdate, member, chunk = diags.config.experiment.get_chunk_list()[0]
            weights_job = ComputeWeights(diags.data_manager, startdate, member, chunk, options['domain'],
                                         options['variables'][0], target_grid, options['original_grid'], weights,
                                         options['method'])

        for var in options['variables']:
            for startdate, member, chunk in diags.config.experiment.get_chunk_list():
                job = InterpolateCDO(diags.data_manager, startdate, member, chunk, options['domain'], var, target_grid,
                                     diags.config.experiment.model_version, options['mask_oceans'],
                                     options['original_grid'], weights)
                if weights_job is not None:
                    job.add_subjob(weights_job)
                job_list.append(job)
    def compute_weights(cls, method, target_grid, sample_file, weights):
        """
        Compute weights for interpolation from sample file

        Parameters
        ----------
        method: int
            Interpolation method
        target_grid: str
            Grid to intepolate to. Can be anything understand by CDO
        sample_file: str
            Path to a file containing original mesh information
        weights:
            Path to the file to store the weights

        if method == InterpolateCDO.BILINEAR:
            Utils.cdo().genbil(target_grid, input=sample_file, output=weights)
        elif method == InterpolateCDO.BICUBIC:
            Utils.cdo().genbic(target_grid, input=sample_file, output=weights)
        elif method == InterpolateCDO.CONSERVATIVE:
            Utils.cdo().genycon(target_grid, input=sample_file, output=weights)
        elif method == InterpolateCDO.CONSERVATIVE2:
            Utils.cdo().gencon2(target_grid, input=sample_file, output=weights)
    def get_sample_grid_file(cls):
        """
        Get a sample grid file

        Create a sample grid file from the definition in the masks file

        Returns
        -------
        str
        """
        temp = TempFile.get()
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        handler = Utils.open_cdf('mask.nc')
        lat_name, lon_name = cls._get_lat_lon_alias(handler)
        lon_bnds_name = '{0}_bnds'.format(lon_name)
        lat_bnds_name = '{0}_bnds'.format(lat_name)

        Utils.nco().ncks(
            input='mask.nc',
            output=temp,
            options=('-O -v tmask,{0},{1},gphif,glamf'.format(lat_name, lon_name),)
        )
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        handler = Utils.open_cdf(temp)
        lon = handler.variables[lon_name]
        lon.units = "degrees_east"
        lon.long_name = "Longitude"
        lon.nav_model = "Default grid"
        lon.standard_name = "longitude"
        lon.short_name = lon_name
        lon.bounds = lon_bnds_name
        lat = handler.variables[lat_name]
        lat.units = "degrees_north"
        lat.long_name = "Latitude"
        lat.nav_model = "Default grid"
        lat.standard_name = "latitude"
        lat.short_name = lat_name
        lat.bounds = lat_bnds_name

        handler.createDimension('bounds', 4)

        lon_bnds = handler.createVariable(lon_bnds_name, lon.datatype, ('j', 'i', 'bounds'))
        corner_lat = handler.variables['glamf'][0, ...]
        lon_bnds[:, :, 0] = corner_lat
        lon_bnds[:, :, 1] = np.roll(corner_lat, 1, 0)
        lon_bnds[:, :, 2] = np.roll(corner_lat, -1, 1)
        lon_bnds[:, :, 3] = np.roll(lon_bnds[:, :, 1], -1, 1)

        lat_bnds = handler.createVariable(lat_bnds_name, lat.datatype, ('j', 'i', 'bounds'))
        corner_lat = handler.variables['gphif'][0, ...]
        lat_bnds[:, :, 0] = corner_lat
        lat_bnds[:, :, 1] = np.roll(corner_lat, 1, 0)
        lat_bnds[:, :, 2] = np.roll(corner_lat, 1, 1)
        lat_bnds[:, :, 3] = np.roll(lat_bnds[:, :, 1], 1, 1)
        lat_bnds[0, :, 1] = lat_bnds[1, 0, 1] - 1
        lat_bnds[0, :, 3] = lat_bnds[1, 0, 3] - 1

        tmask = handler.variables['tmask']
        tmask.coordinates = 'time lev {0} {1}'.format(lat_name, lon_name)
        Utils.nco().ncks(input=temp, output=temp, options=('-O -x -v gphif,glamf',))
    @classmethod
    def _get_lat_lon_alias(cls, handler):
        lat_name = None
        for lat_alias in ['lat', 'latitude']:
            if lat_alias in handler.variables:
                lat_name = lat_alias
                break
        lon_name = None
        for lon_alias in ['lon', 'longitude']:
            if lon_alias in handler.variables:
                lon_name = lon_alias
                break
        return lat_name, lon_name

    def _translate_ifs_grids_to_cdo_names(cls, target_grid):
        if target_grid.upper().startswith('T159L'):
            target_grid = 't106grid'
        if target_grid.upper().startswith('T255L'):
            target_grid = 't170grid'
        if target_grid.upper().startswith('T511L'):
            target_grid = 't340grid'
    def request_data(self):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        """Request data required by the diagnostic"""
        self.original = self.request_chunk(self.domain, self.variable, self.startdate, self.member, self.chunk,
                                           grid=self.original_grid)

    def declare_data_generated(self):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        """Declare data to be generated by the diagnostic"""
        self.regridded = self.declare_chunk(self.domain, self.variable, self.startdate, self.member, self.chunk,
                                            grid=self.grid)

    def compute(self):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        """Run the diagnostic"""
        variable_file = TempFile.get()
        Utils.copy_file(self.original.local_file, variable_file)
        Utils.rename_variables(
            variable_file,
            {
                'jpib': 'i', 'jpjb': 'j', 'x': 'i', 'y': 'j',
                'dim1': 'j', 'dim2': 'i',
                'time_counter': 'time', 't': 'time',
                'SSTK_ens0': 'tos', 'SSTK_ens1': 'tos', 'SSTK_ens2': 'tos',
                'nav_lat': 'lat', 'nav_lon': 'lon',
                'time_centered': None, 'time_centered_bnds': None
            },
            must_exist=False
        )
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        handler = Utils.open_cdf(variable_file)
        lat_name, lon_name = self._get_lat_lon_alias(handler)
        var = handler.variables[self.variable]
        try:
            units = var.units
        except AttributeError:
            units = None
        coordinates = list()
        for dim in var.dimensions:
            if dim == 'i':
                coordinates.append(lon_name)
            elif dim == 'j':
                coordinates.append(lat_name)
            else:
                coordinates.append(dim)
        var.coordinates = ' '.join(coordinates)
        if self.mask_oceans:
            mask = Utils.get_mask(Basins().Global).astype(float)
            mask[mask == 0] = np.nan
            var[:] = mask * var[:]
        handler.close()

        temp = TempFile.get()
        Utils.cdo().remap(','.join((self.grid.split('_')[0], self.weights)), input=variable_file, output=temp)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        handler = Utils.open_cdf(temp)
        if units:
            handler.variables[self.variable].units = units
        if lat_name != 'lat':
            Utils.rename_variables(temp, {'lat': lat_name, 'lon': lon_name}, True)
        self.regridded.set_local_file(temp)

class ComputeWeights(Diagnostic):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    """
    Diagnostic used to compute interpolation weights
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    Parameters
    ----------
    data_manager: DataManager
    startdate: str
    member: int
    chunk: int
    domain: ModelingRealm
    variable: str
    target_grid: str
    original_grid: str
    weights_file: str
    method: str

    """
    alias = 'computeinterpcdoweights'
    "Diagnostic alias for the configuration file"
    @classmethod
    def generate_jobs(cls, diags, options):
        """
        Generate the instances of the diagnostics that will be run by the manager

        This method does not does anything as this diagnostic is not expected to be called by the users
        """
        pass

    def __init__(self, data_manager, startdate, member, chunk, domain, variable, target_grid,
                 original_grid, weights_file, method):
        Diagnostic.__init__(self, data_manager)
        self.startdate = startdate
        self.member = member
        self.chunk = chunk
        self.variable = variable
        self.domain = domain
        self.grid = target_grid
        self.original_grid = original_grid
        self.weights_file = weights_file
        self.method = method

    def __str__(self):
        return 'Computing weights for CDO interpolation: Method {0.method} Target grid: {0.grid}'.format(self)

    def compute(self):
        """Compute weights"""
        InterpolateCDO.compute_weights(self.method, self.grid, self.sample_data.local_file, self.weights_file)

    def request_data(self):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        """Request data required by the diagnostic"""
        self.sample_data = self.request_chunk(self.domain, self.variable, self.startdate, self.member, self.chunk,
                                              grid=self.original_grid)

    def declare_data_generated(self):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        """Declare data to be generated by the diagnostic"""