Newer
Older
"""CDO-based interpolation"""
import os
from earthdiagnostics.constants import Basins
from earthdiagnostics.diagnostic import Diagnostic, DiagnosticDomainOption, DiagnosticVariableListOption, \
DiagnosticChoiceOption, DiagnosticBoolOption, DiagnosticOption
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.utils import Utils, TempFile
Javier Vegas-Regidor
committed
class InterpolateCDO(Diagnostic):
"""
3-dimensional conservative interpolation to the regular atmospheric grid.
It can also be used for 2D (i,j) variables
:original author: Javier Vegas-Regidor<javier.vegas@bsc.es>
:created: October 2016
:param data_manager: data management object
:type data_manager: DataManager
:param startdate: startdate
:type startdate: str
:param member: member number
:type member: int
:param chunk: chunk's number
:type chunk: int
:param variable: variable's name
:type variable: str
:param domain: variable's domain
:param model_version: model version
:type model_version: str
"""
alias = 'interpcdo'
"Diagnostic alias for the configuration file"
BILINEAR = 'bilinear'
BICUBIC = 'bicubic'
CONSERVATIVE = 'conservative'
CONSERVATIVE2 = 'conservative2'
METHODS = [BILINEAR, BICUBIC, CONSERVATIVE, CONSERVATIVE2]
def __init__(self, data_manager, startdate, member, chunk, domain, variable, target_grid, model_version,
mask_oceans, original_grid, weights):
Diagnostic.__init__(self, data_manager)
self.startdate = startdate
self.member = member
self.chunk = chunk
self.variable = variable
self.domain = domain
self.model_version = model_version
self.required_vars = [variable]
self.generated_vars = [variable]
self.tempTemplate = ''
self.grid = target_grid
Javier Vegas-Regidor
committed
self.original_grid = original_grid
def __eq__(self, other):
return self.startdate == other.startdate and self.member == other.member and self.chunk == other.chunk and \
self.model_version == other.model_version and self.domain == other.domain and \
self.variable == other.variable and self.mask_oceans == other.mask_oceans and self.grid == other.grid and \
self.original_grid == other.original_grid
def __hash__(self):
return hash(str(self))
return 'Interpolate with CDO Startdate: {0.startdate} Member: {0.member} Chunk: {0.chunk} ' \
'Variable: {0.domain}:{0.variable} Target grid: {0.grid} Original grid: {0.original_grid} ' \
'Mask ocean: {0.mask_oceans} Model: {0.model_version}'.format(self)
Create a job for each chunk to compute the diagnostic
:param diags: Diagnostics manager class
:type diags: Diags
:param options: target_grid, variable, domain=ocean
:type options: list[str]
:return:
"""
options_available = (DiagnosticDomainOption(default_value=ModelingRealms.ocean),
DiagnosticVariableListOption(diags.data_manager.config.var_manager, 'variables'),
DiagnosticOption('target_grid', diags.config.experiment.atmos_grid.lower()),
DiagnosticChoiceOption('method', InterpolateCDO.METHODS, InterpolateCDO.BILINEAR),
Javier Vegas-Regidor
committed
DiagnosticBoolOption('mask_oceans', True),
DiagnosticOption('original_grid', ''),
DiagnosticBoolOption('weights_from_mask', True)
)
options = cls.process_options(options, options_available)
target_grid = cls._translate_ifs_grids_to_cdo_names(options['target_grid'])
if not target_grid:
raise Exception('Target grid not provided')
weights = TempFile.get()
method = options['method'].lower()
cls.compute_weights(method, target_grid, temp, weights)
os.remove(temp)
weights_job = None
else:
startdate, member, chunk = diags.config.experiment.get_chunk_list()[0]
weights_job = ComputeWeights(diags.data_manager, startdate, member, chunk, options['domain'],
options['variables'][0], target_grid, options['original_grid'], weights,
for var in options['variables']:
for startdate, member, chunk in diags.config.experiment.get_chunk_list():
job = InterpolateCDO(diags.data_manager, startdate, member, chunk, options['domain'], var, target_grid,
diags.config.experiment.model_version, options['mask_oceans'],
options['original_grid'], weights)
if weights_job is not None:
job.add_subjob(weights_job)
job_list.append(job)
return job_list
@classmethod
def compute_weights(cls, method, target_grid, sample_file, weights):
"""
Compute weights for interpolation from sample file
Parameters
----------
method: int
target_grid: str
Grid to intepolate to. Can be anything understand by CDO
sample_file: str
Path to a file containing original mesh information
weights:
Path to the file to store the weights
if method == InterpolateCDO.BILINEAR:
Utils.cdo.genbil(target_grid, input=sample_file, output=weights)
elif method == InterpolateCDO.BICUBIC:
Utils.cdo.genbic(target_grid, input=sample_file, output=weights)
elif method == InterpolateCDO.CONSERVATIVE:
Utils.cdo.genycon(target_grid, input=sample_file, output=weights)
elif method == InterpolateCDO.CONSERVATIVE2:
Utils.cdo.gencon2(target_grid, input=sample_file, output=weights)
def get_sample_grid_file(cls):
"""
Get a sample grid file
Create a sample grid file from the definition in the masks file
Returns
-------
str
"""
lat_name, lon_name = cls._get_lat_lon_alias(handler)
lon_bnds_name = '{0}_bnds'.format(lon_name)
lat_bnds_name = '{0}_bnds'.format(lat_name)
Utils.nco.ncks(input='mask.nc', output=temp,
options=('-O -v tmask,{0},{1},gphif,glamf'.format(lat_name, lon_name),))
lon = handler.variables[lon_name]
lon.units = "degrees_east"
lon.long_name = "Longitude"
lon.nav_model = "Default grid"
lon.standard_name = "longitude"
lon.short_name = lon_name
lon.bounds = lon_bnds_name
lat = handler.variables[lat_name]
lat.units = "degrees_north"
lat.long_name = "Latitude"
lat.nav_model = "Default grid"
lat.standard_name = "latitude"
lat.short_name = lat_name
lat.bounds = lat_bnds_name
handler.createDimension('bounds', 4)
lon_bnds = handler.createVariable(lon_bnds_name, lon.datatype, ('j', 'i', 'bounds'))
corner_lat = handler.variables['glamf'][0, ...]
lon_bnds[:, :, 0] = corner_lat
lon_bnds[:, :, 1] = np.roll(corner_lat, 1, 0)
lon_bnds[:, :, 2] = np.roll(corner_lat, -1, 1)
lon_bnds[:, :, 3] = np.roll(lon_bnds[:, :, 1], -1, 1)
lat_bnds = handler.createVariable(lat_bnds_name, lat.datatype, ('j', 'i', 'bounds'))
corner_lat = handler.variables['gphif'][0, ...]
lat_bnds[:, :, 0] = corner_lat
lat_bnds[:, :, 1] = np.roll(corner_lat, 1, 0)
lat_bnds[:, :, 2] = np.roll(corner_lat, 1, 1)
lat_bnds[:, :, 3] = np.roll(lat_bnds[:, :, 1], 1, 1)
lat_bnds[0, :, 1] = lat_bnds[1, 0, 1] - 1
lat_bnds[0, :, 3] = lat_bnds[1, 0, 3] - 1
tmask = handler.variables['tmask']
tmask.coordinates = 'time lev {0} {1}'.format(lat_name, lon_name)
Utils.nco.ncks(input=temp, output=temp, options=('-O -x -v gphif,glamf',))
@classmethod
def _get_lat_lon_alias(cls, handler):
lat_name = None
for lat_alias in ['lat', 'latitude']:
if lat_alias in handler.variables:
lat_name = lat_alias
break
lon_name = None
for lon_alias in ['lon', 'longitude']:
if lon_alias in handler.variables:
lon_name = lon_alias
break
return lat_name, lon_name
@classmethod
def _translate_ifs_grids_to_cdo_names(cls, target_grid):
if target_grid.upper().startswith('T159L'):
target_grid = 't106grid'
if target_grid.upper().startswith('T255L'):
target_grid = 't170grid'
if target_grid.upper().startswith('T511L'):
target_grid = 't340grid'
return target_grid
def request_data(self):
self.original = self.request_chunk(self.domain, self.variable, self.startdate, self.member, self.chunk,
grid=self.original_grid)
def declare_data_generated(self):
self.regridded = self.declare_chunk(self.domain, self.variable, self.startdate, self.member, self.chunk,
grid=self.grid)
variable_file = TempFile.get()
Utils.copy_file(self.original.local_file, variable_file)
Utils.rename_variables(variable_file, {'jpib': 'i', 'jpjb': 'j', 'x': 'i', 'y': 'j',
'time_counter': 'time', 't': 'time',
'SSTK_ens0': 'tos', 'SSTK_ens1': 'tos', 'SSTK_ens2': 'tos',
'nav_lat': 'lat', 'nav_lon': 'lon'},
must_exist=False, rename_dimension=True)
lat_name, lon_name = self._get_lat_lon_alias(handler)
var = handler.variables[self.variable]
units = var.units
coordinates = list()
for dim in var.dimensions:
if dim == 'i':
coordinates.append(lon_name)
coordinates.append(lat_name)
else:
coordinates.append(dim)
var.coordinates = ' '.join(coordinates)
mask = Utils.get_mask(Basins().Global).astype(float)
mask[mask == 0] = np.nan
var[:] = mask * var[:]
temp = TempFile.get()
Utils.cdo.remap(','.join((self.grid.split('_')[0], self.weights)), input=variable_file, output=temp)
handler.variables[self.variable].units = units
handler.close()
if lat_name != 'lat':
Utils.rename_variables(temp, {'lat': lat_name, 'lon': lon_name}, True, True)
self.regridded.set_local_file(temp)
"""
Diagnostic used to compute interpolation weights
Parameters
----------
data_manager: DataManager
startdate: str
member: int
chunk: int
domain: ModelingRealm
variable: str
target_grid: str
original_grid: str
weights_file: str
method: str
"""
alias = 'computeinterpcdoweights'
"Diagnostic alias for the configuration file"
@classmethod
def generate_jobs(cls, diags, options):
"""
Generate the instances of the diagnostics that will be run by the manager
This method does not does anything as this diagnostic is not expected to be called by the users
"""
pass
def __init__(self, data_manager, startdate, member, chunk, domain, variable, target_grid,
original_grid, weights_file, method):
Diagnostic.__init__(self, data_manager)
self.startdate = startdate
self.member = member
self.chunk = chunk
self.variable = variable
self.domain = domain
self.grid = target_grid
self.original_grid = original_grid
self.weights_file = weights_file
self.method = method
def __str__(self):
return 'Computing weights for CDO interpolation: Method {0.method} Target grid: {0.grid}'.format(self)
def compute(self):
InterpolateCDO.compute_weights(self.method, self.grid, self.sample_data.local_file, self.weights_file)
def request_data(self):
self.sample_data = self.request_chunk(self.domain, self.variable, self.startdate, self.member, self.chunk,
grid=self.original_grid)
def declare_data_generated(self):