# coding=utf-8
"""Diagnostic to compute regional averages"""
import iris.util
import iris.analysis
import iris.exceptions
import numpy as np

from earthdiagnostics.box import Box
from earthdiagnostics.constants import Basins
from earthdiagnostics.diagnostic import Diagnostic, DiagnosticOption, DiagnosticIntOption, DiagnosticDomainOption, \
    DiagnosticBoolOption, DiagnosticBasinOption, DiagnosticVariableOption
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.utils import Utils, TempFile


class RegionMean(Diagnostic):
    """
    Computes the mean value of the field (3D, weighted).

    For 3D fields, a horizontal mean for each level is also given. If a spatial window
    is specified, the mean value is computed only in this window.

    :original author: Javier Vegas-Regidor <javier.vegas@bsc.es>

    :created: March 2017

    :param data_manager: data management object
    :type data_manager: DataManager
    :param startdate: startdate
    :type startdate: str
    :param member: member number
    :type member: int
    :param chunk: chunk's number
    :type chunk: int
    :param domain: variable's modeling realm
    :type domain: ModelingRealm
    :param variable: variable to average
    :type variable: str
    :param box: box used to restrict the vertical mean
    :type box: Box
    :param save3d: if True, also save a horizontal mean for each level
    :type save3d: bool
    :param weights_file: path to the file containing the precomputed averaging weights
    :type weights_file: str
    :param variance: if True, also compute the weighted variance
    :type variance: bool
    :param basin: basin to compute the mean on
    :type basin: Basin
    """

    alias = 'regmean'
    "Diagnostic alias for the configuration file"

    def __init__(self, data_manager, startdate, member, chunk, domain, variable, box, save3d, weights_file,
                 variance, basin):
        Diagnostic.__init__(self, data_manager)
        self.startdate = startdate
        self.member = member
        self.chunk = chunk
        self.domain = domain
        self.variable = variable
        self.box = box
        self.save3d = save3d
        self.weights_file = weights_file
        self.variance = variance
        self.basin = basin
        self.declared = {}
        self.lat_name = 'lat'
        self.lon_name = 'lon'

    def __eq__(self, other):
        if type(self) is not type(other):
            return False
        return self.startdate == other.startdate and self.member == other.member and self.chunk == other.chunk and \
            self.box == other.box and self.variable == other.variable

    def __str__(self):
        return 'Region mean Startdate: {0.startdate} Member: {0.member} Chunk: {0.chunk} Variable: {0.variable} ' \
               'Box: {0.box} Save 3D: {0.save3d} Save variance: {0.variance}'.format(self)

    def __hash__(self):
        return hash(str(self))

    @classmethod
    def generate_jobs(cls, diags, options):
        """
        Create a job for each chunk to compute the diagnostic

        :param diags: Diagnostics manager class
        :type diags: Diags
        :param options: domain, variable, grid point, basin, minimum depth (level), maximum depth (level),
                        save 3D, variance, grid
        :type options: list[str]
        :return: list of jobs to run
        """
        options_available = (DiagnosticDomainOption(),
                             DiagnosticVariableOption(diags.data_manager.config.var_manager),
                             DiagnosticOption('grid_point', 'T'),
                             DiagnosticBasinOption('basin', Basins().Global),
                             DiagnosticIntOption('min_depth', -1),
                             DiagnosticIntOption('max_depth', -1),
                             DiagnosticBoolOption('save3D', True),
                             DiagnosticBoolOption('variance', False),
                             DiagnosticOption('grid', ''))
        options = cls.process_options(options, options_available)

        box = Box()
        box.min_depth = options['min_depth']
        box.max_depth = options['max_depth']
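        # The weights depend only on the grid point, basin and depth box, so a single
        # ComputeWeights subjob is shared by all the chunk jobs generated below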
        weights_file = TempFile.get()
        weight_diagnostics = ComputeWeights(diags.data_manager, options['grid_point'], options['basin'], box,
                                            weights_file)
        job_list = list()
        for startdate, member, chunk in diags.config.experiment.get_chunk_list():
            job = RegionMean(diags.data_manager, startdate, member, chunk,
                             options['domain'], options['variable'], box,
                             options['save3D'], weights_file, options['variance'], options['basin'])
            job.add_subjob(weight_diagnostics)
            job_list.append(job)

        return job_list

    def request_data(self):
        """Request data required by the diagnostic"""
        self.variable_file = self.request_chunk(self.domain, self.variable, self.startdate, self.member, self.chunk)

    def declare_data_generated(self):
        """Declare data to be generated by the diagnostic"""
        if self.box.min_depth == 0:
            # To cdftools, this means all levels
            box_save = None
        else:
            box_save = self.box

        self._declare_var('mean', False, box_save)
        self._declare_var('mean', True, box_save)

        if self.variance:
            self._declare_var('var', False, box_save)
            self._declare_var('var', True, box_save)

    def compute(self):
        """Run the diagnostic"""
        iris.FUTURE.netcdf_promote = True
        iris.FUTURE.netcdf_no_unlimited = True

        has_levels = self._fix_file_metadata()

        data = self._load_data()

        weights = iris.load_cube(self.weights_file, 'weights').data
        i_indexes = iris.load_cube(self.weights_file, 'i_indexes').data
        j_indexes = iris.load_cube(self.weights_file, 'j_indexes').data
        lev_limits = iris.load_cube(self.weights_file, 'lev_limits').data
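        # The weights file written by ComputeWeights stores 0-based indexes, while the i/j/lev
        # coordinates built in _load_data are 1-based, hence the 'cell.point - 1' below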

        def selected_i(cell):
            return cell.point - 1 in i_indexes

        def selected_j(cell):
            return cell.point - 1 in j_indexes

        def selected_level(cell):
            return lev_limits[0] <= cell.point - 1 <= lev_limits[1]

        if has_levels:
            data = data.extract(iris.Constraint(i=selected_i, j=selected_j, lev=selected_level))
            self._mean_3d_variable(data, weights)
        else:
            # 2D variables have no 'lev' coordinate, so constrain only the horizontal window
            data = data.extract(iris.Constraint(i=selected_i, j=selected_j))
            self._mean_2d_var(data, weights)

    def _mean_2d_var(self, data, weights):
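        # Collapse each time step over latitude and longitude using the precomputed weights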
        mean = iris.cube.CubeList()
        var = iris.cube.CubeList()
        for time_slice in data.slices_over('time'):
            mean.append(time_slice.collapsed(['latitude', 'longitude'], iris.analysis.MEAN, weights=weights))
            if self.variance:
                var.append(time_slice.collapsed(['latitude', 'longitude'], iris.analysis.VARIANCE, weights=weights))
        self._send_var('mean', False, mean)
        if self.variance:
            self._send_var('var', False, var)

    def _mean_3d_variable(self, data, weights):
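        # 'mean' holds the value averaged over latitude, longitude and depth; 'mean3d' keeps a
        # horizontal mean per level (only filled when save3d is enabled)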
        mean = iris.cube.CubeList()
        mean3d = iris.cube.CubeList()
        var = iris.cube.CubeList()
        var3d = iris.cube.CubeList()
        for time_slice in data.slices_over('time'):
            mean.append(time_slice.collapsed(['latitude', 'longitude', 'depth'],
                                             iris.analysis.MEAN, weights=weights))
            if self.save3d:
                mean3d.append(time_slice.collapsed(['latitude', 'longitude'],
                                                   iris.analysis.MEAN, weights=weights))
            if self.variance:
                var.append(time_slice.collapsed(['latitude', 'longitude', 'depth'],
                                                iris.analysis.VARIANCE, weights=weights))
                if self.save3d:
                    var3d.append(time_slice.collapsed(['latitude', 'longitude'],
                                                      iris.analysis.VARIANCE, weights=weights))
        self._send_var('mean', True, mean3d)
        self._send_var('mean', False, mean)
        if self.variance:
            self._send_var('var', True, var3d)
            self._send_var('var', False, var)

    def _load_data(self):
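        # 'add_i_j' is an iris load callback: it discards every cube except the requested
        # variable and attaches 1-based i, j and lev index coordinates so that compute()
        # can select the regional window with constraints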
        def add_i_j(cube, field, filename):
            if cube.var_name != self.variable:
                raise iris.exceptions.IgnoreCubeException()
            if not cube.coords('i'):
                index = field.dimensions.index('i')
                i = np.arange(1, field.shape[index] + 1)
                i_coord = iris.coords.DimCoord(i, var_name='i')
                cube.add_dim_coord(i_coord, index)
            if not cube.coords('j'):
                index = field.dimensions.index('j')
                i = np.arange(1, field.shape[index] + 1)
                i_coord = iris.coords.DimCoord(i, var_name='j')
                cube.add_dim_coord(i_coord, index)
            if 'lev' in field.dimensions and not cube.coords('lev'):
                index = field.dimensions.index('lev')
                i = np.arange(1, field.shape[index] + 1)
                lev = iris.coords.AuxCoord(i, var_name='lev')
                cube.add_aux_coord(lev, index)

        # Make sure the variable's 'coordinates' attribute lists the coordinate variables
        # present in the file so that iris loads them as auxiliary coordinates
        coords = []
        handler = Utils.open_cdf(self.variable_file.local_file)
        for variable in handler.variables:
            if variable in ('time', 'lev', 'lat', 'lon', 'latitude', 'longitude'):
                coords.append(variable)

        handler.variables[self.variable].coordinates = ' '.join(coords)
        handler.close()

        data = iris.load_cube(self.variable_file.local_file,
                              callback=add_i_j)

        if data.coords('model_level_number'):
            coord = data.coord('model_level_number')
            coord.standard_name = 'depth'
            coord.long_name = 'depth'

        return data

    def _fix_file_metadata(self):
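        # Rewrite the variable's 'coordinates' attribute and check whether the file has a
        # 'lev' variable, which decides between the 2D and 3D averaging code paths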
        handler = Utils.open_cdf(self.variable_file.local_file)
        var = handler.variables[self.variable]
        coordinates = ''
        has_levels = False
        for dimension in handler.variables.keys():
            if dimension in ['time', 'lev', 'lat', 'latitude', 'lon', 'longitude', 'i', 'j']:
                coordinates += ' {0}'.format(dimension)
            if dimension == 'lev':
                has_levels = True
        var.coordinates = coordinates
        handler.close()
        return has_levels

    def _declare_var(self, var, threed, box_save):
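        # Register the output chunk with the data manager; 3D outputs get a '3d' infix
        # (e.g. 'thetao3dmean') and are skipped entirely when save3d is disabled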
        if threed:
            if not self.save3d:
                return False
            final_name = '{1}3d{0}'.format(var, self.variable)
        else:
            final_name = '{1}{0}'.format(var, self.variable)

        self.declared[final_name] = self.declare_chunk(ModelingRealms.ocean, final_name, self.startdate, self.member,
                                                       self.chunk, box=box_save, region=self.basin)

    def _send_var(self, var, threed, cube_list):
        if threed:
            if not self.save3d:
                return False
            final_name = '{1}3d{0}'.format(var, self.variable)
        else:
            final_name = '{1}{0}'.format(var, self.variable)
        cube = cube_list.merge_cube()
        cube.var_name = 'result'
        for coord_name in ('latitude', 'longitude', 'depth', 'lev'):
            if cube.coords(coord_name):
                cube.remove_coord(coord_name)
        temp = TempFile.get()
        iris.save(cube, temp)
        self.declared[final_name].set_local_file(temp, diagnostic=self, rename_var='result', region=self.basin)


class ComputeWeights(Diagnostic):
    """
    Diagnostic used to compute regional mean and sum weights

    Parameters
    ----------
    data_manager: DataManager
    grid_point: str
    basin: Basin
    box: Box
    weights_file: str
    """

    alias = 'computeregmeanweights'
    "Diagnostic alias for the configuration file"

    @classmethod
    def generate_jobs(cls, diags, options):
        """
        Generate the instances of the diagnostics that will be run by the manager

        This method does not do anything, as this diagnostic is not expected to be requested directly by users
        """

    def __init__(self, data_manager, grid_point, basin, box, weights_file):
        Diagnostic.__init__(self, data_manager)
        self.weights_file = weights_file
        self.basin = basin
        self.grid_point = grid_point.lower()
        self.box = box

    def __eq__(self, other):
        if type(self) is not type(other):
            return False
        return self.weights_file == other.weights_file and self.basin == other.basin and \
            self.grid_point == other.grid_point and self.box == other.box

    def __str__(self):
        return 'Computing weights for region averaging: Point {0.grid_point} Basin: {0.basin} Box: {0.box}'\
            .format(self)

    def __hash__(self):
        return hash(str(self))

    def compute(self):
        """Compute weights"""
        iris.FUTURE.netcdf_promote = True
        iris.FUTURE.netcdf_no_unlimited = True

        mask = np.squeeze(Utils.get_mask(self.basin, True))
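        # Find the i columns and j rows where the basin surface mask has at least one ocean
        # point, so later computations are restricted to the smallest bounding window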
        surface_mask = mask[0, ...]
        i_indexes = np.where(np.any(surface_mask != 0, 0))[0]
        j_indexes = np.where(np.any(surface_mask != 0, 1))[0]
        mask_small = np.take(np.take(mask, i_indexes, 2), j_indexes, 1)
        e1 = self._try_load_cube(1)
        e2 = self._try_load_cube(2)
        e3 = self._try_load_cube(3)
        depth = iris.util.squeeze(iris.load_cube('mesh_hgr.nc', 'gdept_0'))
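        # Translate the requested minimum and maximum depths (in metres) into the nearest
        # model levels; a value of -1 means no limit in that direction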
        if self.box.min_depth == -1:
            min_level = 0
        else:
            distance = abs((depth - self.box.min_depth).data)
            min_level = np.argmin(distance)

        if self.box.max_depth == -1:
            max_level = depth.shape[0]
        else:
            distance = abs((depth - self.box.max_depth).data)
            max_level = np.argmin(distance)

        def selected_i(cell):
            return cell.point - 1 in i_indexes

        def selected_j(cell):
            return cell.point - 1 in j_indexes

        def selected_level(cell):
            return min_level <= cell.point - 1 <= max_level

        e1_small = e1.extract(iris.Constraint(i=selected_i, j=selected_j))
        e2_small = e2.extract(iris.Constraint(i=selected_i, j=selected_j))
        e3_small = e3.extract(iris.Constraint(i=selected_i, j=selected_j, lev=selected_level))
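        # e1 * e2 * e3 is the volume of each grid cell; multiplying by the basin mask keeps
        # only the cells inside the selected region and depth range, giving the final weights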
        mask_small = mask_small[min_level:max_level + 1, ...]
        mask_small = e3_small * mask_small
        e_small = e1_small * e2_small
        for coord in e_small.coords():
            e_small.remove_coord(coord)
        for coord in mask_small.coords():
            mask_small.remove_coord(coord)
        weights = mask_small * e_small
        weights.var_name = 'weights'
        i_indexes = iris.cube.Cube(i_indexes, var_name='i_indexes')
        j_indexes = iris.cube.Cube(j_indexes, var_name='j_indexes')
        lev_limits = iris.cube.Cube([min_level, max_level], var_name='lev_limits')
        iris.save((weights, i_indexes, j_indexes, lev_limits), self.weights_file)
    def _try_load_cube(self, number):
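        # The scale factors in mesh_hgr.nc may be stored either as e.g. 'e1t' or with a
        # trailing '_0' (e.g. 'e1t_0') depending on how the mesh file was produced, so try both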
        try:
            cube = iris.load_cube('mesh_hgr.nc', 'e{0}{1}'.format(number, self.grid_point))
        except iris.exceptions.ConstraintMismatchError:
            cube = iris.load_cube('mesh_hgr.nc', 'e{0}{1}_0'.format(number, self.grid_point))
        return iris.util.squeeze(cube)

    def request_data(self):
        """Request data required by the diagnostic"""

    def declare_data_generated(self):
        """Declare data to be generated by the diagnostic"""
        pass