Newer
Older
import os
from earthdiagnostics.constants import Basins
from earthdiagnostics.diagnostic import Diagnostic, DiagnosticDomainOption, DiagnosticVariableListOption, \
DiagnosticChoiceOption, DiagnosticBoolOption, DiagnosticOption
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.utils import Utils, TempFile
Javier Vegas-Regidor
committed
class InterpolateCDO(Diagnostic):
"""
3-dimensional conservative interpolation to the regular atmospheric grid.
It can also be used for 2D (i,j) variables
:original author: Javier Vegas-Regidor<javier.vegas@bsc.es>
:created: October 2016
:param data_manager: data management object
:type data_manager: DataManager
:param startdate: startdate
:type startdate: str
:param member: member number
:type member: int
:param chunk: chunk's number
:type chunk: int
:param variable: variable's name
:type variable: str
:param domain: variable's domain
:param model_version: model version
:type model_version: str
"""
alias = 'interpcdo'
"Diagnostic alias for the configuration file"
BILINEAR = 'bilinear'
BICUBIC = 'bicubic'
CONSERVATIVE = 'conservative'
CONSERVATIVE2 = 'conservative2'
METHODS = [BILINEAR, BICUBIC, CONSERVATIVE, CONSERVATIVE2]
def __init__(self, data_manager, startdate, member, chunk, domain, variable, target_grid, model_version,
mask_oceans, original_grid, weights):
Diagnostic.__init__(self, data_manager)
self.startdate = startdate
self.member = member
self.chunk = chunk
self.variable = variable
self.domain = domain
self.model_version = model_version
self.required_vars = [variable]
self.generated_vars = [variable]
self.tempTemplate = ''
self.grid = target_grid
Javier Vegas-Regidor
committed
self.original_grid = original_grid
def __eq__(self, other):
return self.startdate == other.startdate and self.member == other.member and self.chunk == other.chunk and \
self.model_version == other.model_version and self.domain == other.domain and \
self.variable == other.variable and self.mask_oceans == other.mask_oceans and self.grid == other.grid and \
self.original_grid == other.original_grid
def __hash__(self):
return hash(str(self))
return 'Interpolate with CDO Startdate: {0.startdate} Member: {0.member} Chunk: {0.chunk} ' \
'Variable: {0.domain}:{0.variable} Target grid: {0.grid} Original grid: {0.original_grid} ' \
'Mask ocean: {0.mask_oceans} Model: {0.model_version}'.format(self)
Creates a job for each chunk to compute the diagnostic
:param diags: Diagnostics manager class
:type diags: Diags
:param options: target_grid, variable, domain=ocean
:type options: list[str]
:return:
"""
options_available = (DiagnosticDomainOption(default_value=ModelingRealms.ocean),
DiagnosticVariableListOption(diags.data_manager.config.var_manager, 'variables'),
DiagnosticOption('target_grid', diags.config.experiment.atmos_grid.lower()),
DiagnosticChoiceOption('method', InterpolateCDO.METHODS, InterpolateCDO.BILINEAR),
Javier Vegas-Regidor
committed
DiagnosticBoolOption('mask_oceans', True),
DiagnosticOption('original_grid', ''),
DiagnosticBoolOption('weights_from_mask', True)
)
options = cls.process_options(options, options_available)
target_grid = cls._translate_ifs_grids_to_cdo_names(options['target_grid'])
if not target_grid:
raise Exception('Target grid not provided')
weights = TempFile.get()
method = options['method'].lower()
if options['weights_from_mask']:
temp = cls.get_sample_grid_file()
cls.compute_weights(method, target_grid, temp, weights)
os.remove(temp)
weights_job = None
else:
startdate, member, chunk = diags.config.experiment.get_chunk_list()[0]
weights_job = ComputeWeights(diags.data_manager, startdate, member, chunk, options['domain'],
options['variables'][0], target_grid, options['original_grid'], weights,
for var in options['variables']:
for startdate, member, chunk in diags.config.experiment.get_chunk_list():
job = InterpolateCDO(diags.data_manager, startdate, member, chunk, options['domain'], var, target_grid,
diags.config.experiment.model_version, options['mask_oceans'],
options['original_grid'], weights)
if weights_job is not None:
job.add_subjob(weights_job)
job_list.append(job)
return job_list
@classmethod
def compute_weights(cls, method, target_grid, sample_file, weights):
if method == InterpolateCDO.BILINEAR:
Utils.cdo.genbil(target_grid, input=sample_file, output=weights)
elif method == InterpolateCDO.BICUBIC:
Utils.cdo.genbic(target_grid, input=sample_file, output=weights)
elif method == InterpolateCDO.CONSERVATIVE:
Utils.cdo.genycon(target_grid, input=sample_file, output=weights)
elif method == InterpolateCDO.CONSERVATIVE2:
Utils.cdo.gencon2(target_grid, input=sample_file, output=weights)
@classmethod
def get_sample_grid_file(cls):
temp = TempFile.get()
lat_name, lon_name = cls._get_lat_lon_alias(handler)
lon_bnds_name = '{0}_bnds'.format(lon_name)
lat_bnds_name = '{0}_bnds'.format(lat_name)
Utils.nco.ncks(input='mask.nc', output=temp,
options=('-O -v tmask,{0},{1},gphif,glamf'.format(lat_name, lon_name),))
lon = handler.variables[lon_name]
lon.units = "degrees_east"
lon.long_name = "Longitude"
lon.nav_model = "Default grid"
lon.standard_name = "longitude"
lon.short_name = lon_name
lon.bounds = lon_bnds_name
lat = handler.variables[lat_name]
lat.units = "degrees_north"
lat.long_name = "Latitude"
lat.nav_model = "Default grid"
lat.standard_name = "latitude"
lat.short_name = lat_name
lat.bounds = lat_bnds_name
handler.createDimension('bounds', 4)
lon_bnds = handler.createVariable(lon_bnds_name, lon.datatype, ('j', 'i', 'bounds'))
corner_lat = handler.variables['glamf'][0, ...]
lon_bnds[:, :, 0] = corner_lat
lon_bnds[:, :, 1] = np.roll(corner_lat, 1, 0)
lon_bnds[:, :, 2] = np.roll(corner_lat, -1, 1)
lon_bnds[:, :, 3] = np.roll(lon_bnds[:, :, 1], -1, 1)
lat_bnds = handler.createVariable(lat_bnds_name, lat.datatype, ('j', 'i', 'bounds'))
corner_lat = handler.variables['gphif'][0, ...]
lat_bnds[:, :, 0] = corner_lat
lat_bnds[:, :, 1] = np.roll(corner_lat, 1, 0)
lat_bnds[:, :, 2] = np.roll(corner_lat, 1, 1)
lat_bnds[:, :, 3] = np.roll(lat_bnds[:, :, 1], 1, 1)
lat_bnds[0, :, 1] = lat_bnds[1, 0, 1] - 1
lat_bnds[0, :, 3] = lat_bnds[1, 0, 3] - 1
tmask = handler.variables['tmask']
tmask.coordinates = 'time lev {0} {1}'.format(lat_name, lon_name)
Utils.nco.ncks(input=temp, output=temp, options=('-O -x -v gphif,glamf',))
@classmethod
def _get_lat_lon_alias(cls, handler):
lat_name = None
for lat_alias in ['lat', 'latitude']:
if lat_alias in handler.variables:
lat_name = lat_alias
break
lon_name = None
for lon_alias in ['lon', 'longitude']:
if lon_alias in handler.variables:
lon_name = lon_alias
break
return lat_name, lon_name
@classmethod
def _translate_ifs_grids_to_cdo_names(cls, target_grid):
if target_grid.upper().startswith('T159L'):
target_grid = 't106grid'
if target_grid.upper().startswith('T255L'):
target_grid = 't170grid'
if target_grid.upper().startswith('T511L'):
target_grid = 't340grid'
return target_grid
def request_data(self):
self.original = self.request_chunk(self.domain, self.variable, self.startdate, self.member, self.chunk,
grid=self.original_grid)
def declare_data_generated(self):
self.regridded = self.declare_chunk(self.domain, self.variable, self.startdate, self.member, self.chunk,
grid=self.grid)
variable_file = TempFile.get()
Utils.copy_file(self.original.local_file, variable_file)
Utils.rename_variables(variable_file, {'jpib': 'i', 'jpjb': 'j', 'x': 'i', 'y': 'j',
'time_counter': 'time', 't': 'time',
'SSTK_ens0': 'tos', 'SSTK_ens1': 'tos', 'SSTK_ens2': 'tos',
'nav_lat': 'lat', 'nav_lon': 'lon'},
must_exist=False, rename_dimension=True)
lat_name, lon_name = self._get_lat_lon_alias(handler)
var = handler.variables[self.variable]
units = var.units
coordinates = list()
for dim in var.dimensions:
if dim == 'i':
coordinates.append(lon_name)
coordinates.append(lat_name)
else:
coordinates.append(dim)
var.coordinates = ' '.join(coordinates)
mask = Utils.get_mask(Basins().Global).astype(float)
mask[mask == 0] = np.nan
var[:] = mask * var[:]
temp = TempFile.get()
Utils.cdo.remap(','.join((self.grid.split('_')[0], self.weights)), input=variable_file, output=temp)
handler.variables[self.variable].units = units
handler.close()
if lat_name != 'lat':
Utils.rename_variables(temp, {'lat': lat_name, 'lon': lon_name}, True, True)
self.regridded.set_local_file(temp)
"""
Diagnostic used to compute interpolation weights
Parameters
----------
data_manager: DataManager
startdate: str
member: int
chunk: int
domain: ModelingRealm
variable: str
target_grid: str
original_grid: str
weights_file: str
method: str
"""
alias = 'computeinterpcdoweights'
"Diagnostic alias for the configuration file"
@classmethod
def generate_jobs(cls, diags, options):
pass
def __init__(self, data_manager, startdate, member, chunk, domain, variable, target_grid,
original_grid, weights_file, method):
Diagnostic.__init__(self, data_manager)
self.startdate = startdate
self.member = member
self.chunk = chunk
self.variable = variable
self.domain = domain
self.grid = target_grid
self.original_grid = original_grid
self.weights_file = weights_file
self.method = method
def __str__(self):
return 'Computing weights for CDO interpolation: Method {0.method} Target grid: {0.grid}'.format(self)
def compute(self):
InterpolateCDO.compute_weights(self.method, self.grid, self.sample_data.local_file, self.weights_file)
def request_data(self):
self.sample_data = self.request_chunk(self.domain, self.variable, self.startdate, self.member, self.chunk,
grid=self.original_grid)
def declare_data_generated(self):