Source code for nes.methods.vertical_interpolation

#!/usr/bin/env python

import sys
import nes
from scipy.interpolate import interp1d
import numpy as np
from copy import copy


[docs] def add_4d_vertical_info(self, info_to_add): """ To add the vertical information from other source. Parameters ---------- self : nes.Nes Source Nes object. info_to_add : nes.Nes, str Nes object with the vertical information as variable or str with the path to the NetCDF file that contains the vertical data. """ vertical_var = list(self.concatenate(info_to_add)) self.vertical_var_name = vertical_var[0] return None
[docs] def interpolate_vertical(self, new_levels, new_src_vertical=None, kind='linear', extrapolate=None, info=None, overwrite=False): """ Vertical interpolation. Parameters ---------- self : Nes Source Nes object. new_levels : List List of new vertical levels. new_src_vertical : nes.Nes, str Nes object with the vertical information as variable or str with the path to the NetCDF file that contains the vertical data. kind : str Vertical methods type. extrapolate : None, tuple, str Extrapolate method (for non linear operations). info: None, bool Indicates if you want to print extra information. overwrite: bool Indicates if you want to compute the vertical interpolation in the same object or not. """ if len(self.lev) == 1: raise RuntimeError("1D data cannot be vertically interpolated.") if not overwrite: self = self.copy(copy_vars=True) if info is None: info = self.info if new_src_vertical is not None: self.add_4d_vertical_info(new_src_vertical) if new_levels[0] > new_levels[-1]: ascendant = False else: ascendant = True nz_new = len(new_levels) if self.vertical_var_name is None: # To use current level data current_level = True # Checking old order src_levels = self.lev['data'] if src_levels[0] > src_levels[-1]: if not ascendant: flip = False else: flip = True src_levels = np.flip(src_levels) else: if ascendant: flip = False else: flip = True src_levels = np.flip(src_levels) else: current_level = False src_levels = self.variables[self.vertical_var_name]['data'] if self.vertical_var_name == 'layer_thickness': src_levels = np.flip(np.cumsum(np.flip(src_levels, axis=1), axis=1)) else: # src_levels = np.flip(src_levels, axis=1) pass # Checking old order if np.nanmean(src_levels[:, 0, :, :]) > np.nanmean(src_levels[:, -1, :, :]): if not ascendant: flip = False else: flip = True src_levels = np.flip(src_levels, axis=1) else: if ascendant: flip = False else: flip = True src_levels = np.flip(src_levels, axis=1) # Loop over variables for var_name in self.variables.keys(): if self.variables[var_name]['data'] is None: # Load data if it is not loaded yet self.load(var_name) if var_name != self.vertical_var_name: if flip: self.variables[var_name]['data'] = np.flip(self.variables[var_name]['data'], axis=1) if info and self.master: print("\t{var} vertical methods".format(var=var_name)) sys.stdout.flush() nt, nz, ny, nx = self.variables[var_name]['data'].shape dst_data = np.empty((nt, nz_new, ny, nx), dtype=self.variables[var_name]['data'].dtype) for t in range(nt): # if info and self.rank == self.size - 1: if self.info and self.master: print('\t\t{3} time step {0} ({1}/{2}))'.format(self.time[t], t + 1, nt, var_name)) sys.stdout.flush() for j in range(ny): for i in range(nx): curr_level_values = src_levels[t, :, j, i] try: # Check if all values are identical or masked if ((isinstance(curr_level_values, np.ndarray) and (curr_level_values == curr_level_values[0]).all()) or (isinstance(curr_level_values, np.ma.core.MaskedArray) and curr_level_values.mask.all())): kind = 'slinear' else: kind = kind # 'cubic' if extrapolate is None: if ascendant: fill_value = (np.float64(self.variables[var_name]['data'][t, 0, j, i]), np.float64(self.variables[var_name]['data'][t, -1, j, i])) else: fill_value = (np.float64(self.variables[var_name]['data'][t, -1, j, i]), np.float64(self.variables[var_name]['data'][t, 0, j, i])) else: fill_value = extrapolate # We force the methods with float64 to avoid negative values # We don't know why the negatives appears with float34 if current_level: # 1D vertical component src_levels_aux = src_levels else: # 4D vertical component src_levels_aux = src_levels[t, :, j, i] if kind == 'linear' and ascendant: dst_data[t, :, j, i] = np.array( np.interp(new_levels, np.array(src_levels_aux, dtype=np.float64), np.array(self.variables[var_name]['data'][t, :, j, i], dtype=np.float64)), dtype=self.variables[var_name]['data'].dtype) else: dst_data[t, :, j, i] = np.array( interp1d(np.array(src_levels_aux, dtype=np.float64), np.array(self.variables[var_name]['data'][t, :, j, i], dtype=np.float64), kind=kind, bounds_error=False, fill_value=fill_value)(new_levels), dtype=self.variables[var_name]['data'].dtype) except Exception as e: print("time lat lon", t, j, i) print("***********************") print("LEVELS", np.array(src_levels[t, :, j, i], dtype=np.float64)) print("DATA", np.array(self.variables[var_name]['data'][t, :, j, i], dtype=np.float64)) print("METHOD", kind) print("FILL_VALUE", fill_value) print("+++++++++++++++++++++++") raise Exception(str(e)) # if level_array is not None: # dst_data[t, :, j, i] = np.array(f(level_array), dtype=np.float32) self.variables[var_name]['data'] = copy(dst_data) # print(self.variables[var_name]['data']) # Update level information new_lev_info = {'data': np.array(new_levels)} if 'positive' in self._lev.keys(): # Vertical level direction if flip: self.reverse_level_direction() new_lev_info['positive'] = self._lev['positive'] for var_attr, attr_info in self.variables[self.vertical_var_name].items(): if var_attr not in ['data', 'dimensions', 'crs', 'grid_mapping']: new_lev_info[var_attr] = copy(attr_info) self.set_levels(new_lev_info) self.free_vars(self.vertical_var_name) self.vertical_var_name = None # Remove original file information self.__ini_path = None self.dataset = None self.netcdf = None return self