Newer
Older
"""CDO-based interpolation"""
import os
from earthdiagnostics.constants import Basins
from earthdiagnostics.diagnostic import (
Diagnostic,
DiagnosticDomainOption,
DiagnosticVariableListOption,
DiagnosticChoiceOption,
DiagnosticBoolOption,
DiagnosticOption,
DiagnosticFrequencyOption,
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.utils import Utils, TempFile
Javier Vegas-Regidor
committed
class InterpolateCDO(Diagnostic):
"""
3-dimensional conservative interpolation to the regular atmospheric grid.
It can also be used for 2D (i,j) variables
:original author: Javier Vegas-Regidor<javier.vegas@bsc.es>
:created: October 2016
:param data_manager: data management object
:type data_manager: DataManager
:param startdate: startdate
:type startdate: str
:param member: member number
:type member: int
:param chunk: chunk's number
:type chunk: int
:param variable: variable's name
:type variable: str
:param domain: variable's domain
:param model_version: model version
:type model_version: str
"""
"Diagnostic alias for the configuration file"
BILINEAR = "bilinear"
BICUBIC = "bicubic"
CONSERVATIVE = "conservative"
CONSERVATIVE2 = "conservative2"
METHODS = [BILINEAR, BICUBIC, CONSERVATIVE, CONSERVATIVE2]
def __init__(
self,
data_manager,
startdate,
member,
chunk,
domain,
variable,
target_grid,
model_version,
mask_oceans,
original_grid,
weights,
Diagnostic.__init__(self, data_manager)
self.startdate = startdate
self.member = member
self.chunk = chunk
self.variable = variable
self.domain = domain
self.model_version = model_version
self.required_vars = [variable]
self.generated_vars = [variable]
Javier Vegas-Regidor
committed
self.original_grid = original_grid
self.frequency = frequency
def __eq__(self, other):
if self._different_type(other):
return (
self.startdate == other.startdate
and self.member == other.member
and self.chunk == other.chunk
and self.model_version == other.model_version
and self.domain == other.domain
and self.variable == other.variable
and self.mask_oceans == other.mask_oceans
and self.grid == other.grid
and self.original_grid == other.original_grid
and self.frequency == other.frequency
def __hash__(self):
return hash(str(self))
return (
"Interpolate with CDO Startdate: {0.startdate} Member: {0.member} "
"Chunk: {0.chunk} Variable: {0.domain}:{0.variable} "
"Frequency: {0.frequency} "
"Target grid: {0.grid} Original grid: {0.original_grid} "
"Mask ocean: {0.mask_oceans} Model: {0.model_version}".format(self)
)
Create a job for each chunk to compute the diagnostic
:param diags: Diagnostics manager class
:type diags: Diags
:param options: target_grid, variable, domain=ocean
:type options: list[str]
:return:
"""
options_available = (
DiagnosticDomainOption(default_value=ModelingRealms.ocean),
DiagnosticVariableListOption(
diags.data_manager.config.var_manager, "variables"
),
DiagnosticOption(
"target_grid", diags.config.experiment.atmos_grid.lower()
),
DiagnosticChoiceOption(
"method", InterpolateCDO.METHODS, InterpolateCDO.BILINEAR
),
DiagnosticBoolOption("mask_oceans", True),
DiagnosticOption("original_grid", ""),
DiagnosticBoolOption("weights_from_mask", True),
DiagnosticFrequencyOption(
default_value=diags.config.frequency),
options = cls.process_options(options, options_available)
target_grid = cls._translate_ifs_grids_to_cdo_names(
options["target_grid"]
)
method = options["method"].lower()
if options["weights_from_mask"]:
cls.compute_weights(method, target_grid, temp, weights)
os.remove(temp)
weights_job = None
else:
(
startdate,
member,
chunk,
) = diags.config.experiment.get_chunk_list()[0]
weights_job = ComputeWeights(
diags.data_manager,
startdate,
member,
chunk,
options["domain"],
options["variables"][0],
target_grid,
options["original_grid"],
weights,
options["method"],
options["frequency"],
)
for var in options["variables"]:
for (
startdate,
member,
chunk,
) in diags.config.experiment.get_chunk_list():
job = InterpolateCDO(
diags.data_manager,
startdate,
member,
chunk,
options["domain"],
var,
target_grid,
diags.config.experiment.model_version,
options["mask_oceans"],
options["original_grid"],
weights,
options["frequency"]
if weights_job is not None:
job.add_subjob(weights_job)
job_list.append(job)
return job_list
@classmethod
def compute_weights(cls, method, target_grid, sample_file, weights):
"""
Compute weights for interpolation from sample file
Parameters
----------
method: int
target_grid: str
Grid to intepolate to. Can be anything understand by CDO
sample_file: str
Path to a file containing original mesh information
weights:
Path to the file to store the weights
if method == InterpolateCDO.BILINEAR:
Utils.cdo().genbil(target_grid, input=sample_file, output=weights)
elif method == InterpolateCDO.BICUBIC:
Utils.cdo().genbic(target_grid, input=sample_file, output=weights)
elif method == InterpolateCDO.CONSERVATIVE:
Utils.cdo().genycon(target_grid, input=sample_file, output=weights)
elif method == InterpolateCDO.CONSERVATIVE2:
Utils.cdo().gencon2(target_grid, input=sample_file, output=weights)
def get_sample_grid_file(cls):
"""
Get a sample grid file
Create a sample grid file from the definition in the masks file
Returns
-------
str
"""
lat_name, lon_name = cls._get_lat_lon_alias(handler)
lon_bnds_name = "{0}_bnds".format(lon_name)
lat_bnds_name = "{0}_bnds".format(lat_name)
options=(
"-O -v tmask,{0},{1},gphif,glamf".format(lat_name, lon_name),
),
lon = handler.variables[lon_name]
lon.units = "degrees_east"
lon.long_name = "Longitude"
lon.nav_model = "Default grid"
lon.standard_name = "longitude"
lon.short_name = lon_name
lon.bounds = lon_bnds_name
lat = handler.variables[lat_name]
lat.units = "degrees_north"
lat.long_name = "Latitude"
lat.nav_model = "Default grid"
lat.standard_name = "latitude"
lat.short_name = lat_name
lat.bounds = lat_bnds_name
lon_bnds = handler.createVariable(
lon_bnds_name, lon.datatype, ("j", "i", "bounds")
)
corner_lat = handler.variables["glamf"][0, ...]
lon_bnds[:, :, 0] = corner_lat
lon_bnds[:, :, 1] = np.roll(corner_lat, 1, 0)
lon_bnds[:, :, 2] = np.roll(corner_lat, -1, 1)
lon_bnds[:, :, 3] = np.roll(lon_bnds[:, :, 1], -1, 1)
lat_bnds = handler.createVariable(
lat_bnds_name, lat.datatype, ("j", "i", "bounds")
)
corner_lat = handler.variables["gphif"][0, ...]
lat_bnds[:, :, 0] = corner_lat
lat_bnds[:, :, 1] = np.roll(corner_lat, 1, 0)
lat_bnds[:, :, 2] = np.roll(corner_lat, 1, 1)
lat_bnds[:, :, 3] = np.roll(lat_bnds[:, :, 1], 1, 1)
lat_bnds[0, :, 1] = lat_bnds[1, 0, 1] - 1
lat_bnds[0, :, 3] = lat_bnds[1, 0, 3] - 1
tmask = handler.variables["tmask"]
tmask.coordinates = "time lev {0} {1}".format(lat_name, lon_name)
Utils.nco().ncks(
input=temp, output=temp, options=("-O -x -v gphif,glamf",)
)
@classmethod
def _get_lat_lon_alias(cls, handler):
lat_name = None
if lat_alias in handler.variables:
lat_name = lat_alias
break
lon_name = None
if lon_alias in handler.variables:
lon_name = lon_alias
break
return lat_name, lon_name
@classmethod
def _translate_ifs_grids_to_cdo_names(cls, target_grid):
if target_grid.upper().startswith("T159L"):
target_grid = "t106grid"
if target_grid.upper().startswith("T255L"):
target_grid = "t170grid"
if target_grid.upper().startswith("T511L"):
target_grid = "t340grid"
return target_grid
def request_data(self):
self.original = self.request_chunk(
self.domain,
self.variable,
self.startdate,
self.member,
self.chunk,
grid=self.original_grid,
frequency=self.frequency,
def declare_data_generated(self):
self.regridded = self.declare_chunk(
self.domain,
self.variable,
self.startdate,
self.member,
self.chunk,
grid=self.grid,
frequency=self.frequency,
variable_file = TempFile.get()
Utils.copy_file(self.original.local_file, variable_file)
Utils.rename_variables(
variable_file,
{
"jpib": "i",
"jpjb": "j",
"x": "i",
"y": "j",
"dim1": "j",
"dim2": "i",
"time_counter": "time",
"t": "time",
"SSTK_ens0": "tos",
"SSTK_ens1": "tos",
"SSTK_ens2": "tos",
"nav_lat": "lat",
"nav_lon": "lon",
"time_centered": None,
"time_centered_bnds": None,
lat_name, lon_name = self._get_lat_lon_alias(handler)
var = handler.variables[self.variable]
try:
units = var.units
except AttributeError:
units = None
coordinates = list()
for dim in var.dimensions:
coordinates.append(lon_name)
coordinates.append(lat_name)
else:
coordinates.append(dim)
mask = Utils.get_mask(Basins().Global).astype(float)
mask[mask == 0] = np.nan
var[:] = mask * var[:]
temp = TempFile.get()
Utils.cdo().remap(
",".join((self.grid.split("_")[0], self.weights)),
input=variable_file,
output=temp,
)
if units:
handler.variables[self.variable].units = units
handler.close()
if lat_name != "lat":
Utils.rename_variables(
temp, {"lat": lat_name, "lon": lon_name}, True
)
self.regridded.set_local_file(temp)
"""
Diagnostic used to compute interpolation weights
Parameters
----------
data_manager: DataManager
startdate: str
member: int
chunk: int
domain: ModelingRealm
variable: str
target_grid: str
original_grid: str
weights_file: str
method: str
"""
"Diagnostic alias for the configuration file"
@classmethod
def generate_jobs(cls, diags, options):
Generate the instances of the diagnostics that will be run
This method does not do anything as this diagnostic is
not expected to be called by the users
def __init__(
self,
data_manager,
startdate,
member,
chunk,
domain,
variable,
target_grid,
original_grid,
weights_file,
method,
Diagnostic.__init__(self, data_manager)
self.startdate = startdate
self.member = member
self.chunk = chunk
self.variable = variable
self.domain = domain
self.grid = target_grid
self.original_grid = original_grid
self.weights_file = weights_file
self.method = method
self.frequency = frequency
return (
"Computing weights for CDO interpolation: Method {0.method} "
"Target grid: {0.grid}".format(self)
)
InterpolateCDO.compute_weights(
self.method,
self.grid,
self.sample_data.local_file,
self.weights_file,
)
self.sample_data = self.request_chunk(
self.domain,
self.variable,
self.startdate,
self.member,
self.chunk,
grid=self.original_grid,
frequency=self.frequency,