# NOTE(review): the lines "Newer"/"Older" were repository-viewer navigation
# artifacts, not source code; removed.
import glob
import os
import shutil
import traceback

import six
from bscearth.utils.date import (
    parse_date,
    chunk_end_date,
    previous_day,
    date2str,
    add_months,
)
import iris.coord_categorisation
import iris.analysis
import iris.util
import iris.exceptions

from earthdiagnostics.datafile import NetCDFFile
from earthdiagnostics.frequency import Frequency, Frequencies
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.utils import TempFile, Utils
class Cmorizer(object):
    """
    Class to manage the CMORization of the experiment model output.

    Parameters
    ----------
    data_manager: DataManager
    startdate: str
    member: int
    """

    # NOTE(review): original file lines 47-94 (the remainder of this class
    # preamble / docstring) were lost when this copy was scraped — only the
    # line-number gutter survived. Recover the missing text from version
    # control.
# Coordinate, bounds and auxiliary variable names that must be skipped when
# iterating a file's variables looking for data fields to CMORize
# (see the `variable in Cmorizer.NON_DATA_VARIABLES` check further down).
NON_DATA_VARIABLES = (
    "lon",
    "lat",
    "longitude",
    "latitude",
    "plev",
    "time",
    "time_bnds",
    "leadtime",
    "lev",
    "lev_2",
    "icethi",
    "deptht",
    "depthu",
    "depthw",
    "depthv",
    "time_centered",
    "time_centered_bounds",
    "deptht_bounds",
    "depthu_bounds",
    "depthv_bounds",
    "depthw_bounds",
    "deptht_bnds",
    "depthu_bnds",
    "depthv_bnds",
    "depthw_bnds",
    "time_counter_bounds",
    "ncatice",
    "nav_lat_grid_V",
    "nav_lat_grid_U",
    "nav_lat_grid_T",
    "nav_lon_grid_V",
    "nav_lon_grid_U",
    "nav_lon_grid_T",
    "depth",
    "depth_2",
    "depth_3",
    "depth_4",
    "depth_bnds",
    "depth_2_bnds",
    "depth_3_bnds",
    "depth_4_bnds",
    "mlev",
    "hyai",
    "hybi",
    "hyam",
    "hybm",
)
def __init__(self, data_manager, startdate, member):
self.data_manager = data_manager
self.startdate = startdate
self.member = member
self.config = data_manager.config
self.experiment = self.config.experiment
self.cmor = self.config.cmor
self.convetion = self.config.data_convention
self.member_str = self.experiment.get_member_str(member)
self.original_files_path = os.path.join(
self.config.data_dir,
self.experiment.expid,
"original_files",
self.startdate,
self.member_str,
"outputs",
)
self.cmor_scratch = str(
os.path.join(
self.config.scratch_dir,
"CMOR",
self.startdate,
self.member_str,
)
)
self.lon_name = self.config.data_convention.lon_name
self.lat_name = self.config.data_convention.lat_name
"time_counter": "time",
"time_counter_bnds": "time_bnds",
"time_counter_bounds": "time_bnds",
"tbnds": "bnds",
"nav_lat": self.lat_name,
"nav_lon": self.lon_name,
"x": "i",
"y": "j",
}
def path_icm(self):
"""Path to the ICM file"""
return os.path.join(self.config.scratch_dir, "ICM")
def cmorize_ocean(self):
    """
    Cmorize ocean data from MMO/PPO/diags tarballs.

    NOTE(review): the ``def`` line, the configuration guard and the early
    ``return`` were missing from this copy; reconstructed around the three
    surviving statements — confirm against version control.
    """
    if not self.cmor.ocean:
        Log.info("Skipping ocean cmorization due to configuration")
        return
    Log.info("\nCMORizing ocean\n")
    return self._cmorize_ocean_files("MMO", "PPO", "diags")
def _cmorize_ocean_files(self, *args):
    """Unpack and cmorize the ocean tarballs matching the given prefixes.

    NOTE(review): several structural lines are missing from this copy: the
    loop header over ``tar_files`` (which defines ``count`` and
    ``tarfile``), the ``):`` closing the ``_cmorization_required`` check
    with its ``continue``, and the ``except Exception as ex:`` clause of
    the ``try``. The surviving statements are kept verbatim below.
    """
    tar_files = ()
    # Try each prefix in turn; keep the first that matches any tarball.
    for prefix in args:
        tar_folder = os.path.join(
            self.original_files_path, "{0}*".format(prefix)
        )
        tar_files = glob.glob(tar_folder)
        tar_files.sort()
        if len(tar_files) > 0:
            break
    if not len(tar_files):
        Log.error(
            "No {1} files found in {0}".format(
                self.original_files_path, args
            )
        )
    # --- enclosing loop header over tar_files missing from here on ---
    if not self._cmorization_required(
        self._get_chunk(os.path.basename(tarfile)),
        (
            ModelingRealms.ocean,
            ModelingRealms.seaIce,
            ModelingRealms.ocnBgchem,
        ),
    # NOTE(review): closing "):" and the skip branch structure are lost.
        Log.info(
            "No need to unpack file {0}/{1}".format(
                count, len(tar_files)
            )
        )
        Log.info(
            "Unpacking oceanic file {0}/{1}".format(
                count, len(tar_files)
            )
        )
        try:
            self._unpack_tar_file(tarfile)
            self._cmorize_nc_files()
            Log.result(
                "Oceanic file {0}/{1} finished".format(
                    count, len(tar_files)
                )
            )
        # NOTE(review): "except Exception as ex:" line missing here.
            Log.error(
                "Could not CMORize oceanic file {0}: {1}", count, ex
            )
def _filter_files(self, file_list):
Javier Vegas-Regidor
committed
if not self.cmor.filter_files:
return file_list
for file_path in file_list:
filename = os.path.basename(file_path)
if any(f in filename for f in self.cmor.filter_files):
filtered.append(file_path)
if not filtered:
raise CMORException(
f"Filters {self.cmor.filter_files} do not match any file",
)
def _remove(self, file_path):
    """Delete *file_path* from the filesystem."""
    os.remove(file_path)
# NOTE(review): the enclosing ``def _cmorize_nc_files(self):`` line and the
# loop body (presumably ``self._cmorize_nc_file(filename)``) are missing
# from this copy; only these two statements survived.
nc_files = glob.glob(os.path.join(self.cmor_scratch, "*.nc"))
for filename in self._filter_files(nc_files):
def _correct_fluxes(self):
    """
    Convert accumulated flux variables to per-second means.

    Heat-flux variables also get their sign flipped so they follow the
    convention used by the rest of the pipeline.

    NOTE(review): the ``Utils.open_cdf`` call creating ``handler``, the
    ``continue`` after the flux-variable check and the ``handler.close()``
    were missing from this copy; restored.
    """
    fluxes_vars = [
        self.data_manager.variable_list.get_variable(
            cmor_var, True
        ).short_name
        for cmor_var in (
            "prc",
            "prs",
            "prsn",
            "rss",
            "rls",
            "rsscs",
            "rsds",
            "rlds",
            "hfss",
            "hfls",
        )
    ]
    change_sign_vars = [
        self.data_manager.variable_list.get_variable(
            cmor_var, True
        ).short_name
        for cmor_var in ("hfss", "hfls")
    ]
    # Accumulation period of the atmospheric output, in seconds.
    total_seconds = self.experiment.atmos_timestep * 3600
    for filename in glob.glob(os.path.join(self.cmor_scratch, "*.nc")):
        handler = Utils.open_cdf(filename)
        for varname in handler.variables.keys():
            cmor_var = self.data_manager.variable_list.get_variable(
                varname, True
            )
            if cmor_var is None or cmor_var.short_name not in fluxes_vars:
                continue
            if cmor_var.short_name in change_sign_vars:
                sign = -1
            else:
                sign = 1
            var_handler = handler.variables[varname]
            var_handler[:] = sign * var_handler[:] / total_seconds
            var_handler.units = "{0} {1}".format(var_handler.units, "s-1")
        handler.close()
def _unpack_tar_file(self, tarfile):
    """
    Extract *tarfile* into the CMOR scratch folder and unzip its contents.

    NOTE(review): the ``def`` line and the scratch-folder preparation were
    missing from this copy; reconstructed around the surviving statements —
    confirm against version control.
    """
    self._clean_cmor_scratch()
    os.makedirs(self.cmor_scratch)
    Utils.untar(
        (tarfile,), self.cmor_scratch, self.config.cmor.filter_files
    )
    # Some tarballs keep their data inside a "backup" folder: flatten it.
    if os.path.isdir(os.path.join(self.cmor_scratch, "backup")):
        for filepath in glob.glob(
            os.path.join(self.cmor_scratch, "backup", "*")
        ):
            Log.debug("Moving file {0}", filepath)
            shutil.move(filepath, filepath.replace("/backup/", "/"))
    zip_files = glob.glob(os.path.join(self.cmor_scratch, "*.gz"))
    if zip_files:
        for zip_file in self._filter_files(zip_files):
            try:
                Utils.unzip(zip_file)
            except Utils.UnzipException as ex:
                Log.error(
                    "File {0} could not be unzipped: {1}", tarfile, ex
                )
Javier Vegas-Regidor
committed
if os.path.exists(self.cmor_scratch):
Javier Vegas-Regidor
committed
shutil.rmtree(self.cmor_scratch)
def _merge_mma_files(self, tarfile):
    """Merge the per-month MMA files of each grid into one file per grid.

    NOTE(review): missing from this copy: a probable ``if grid == "SH":``
    guard around the spectral-to-Gaussian transform (as written it runs for
    both grids — confirm), and the body of the ``for filename in files:``
    loop near the end (likely per-file cleanup).
    """
    temp = TempFile.get()
    for grid in ["SH", "GG"]:
        files = glob.glob(
            os.path.join(self.cmor_scratch, "MMA_*_{}_*.nc".format(grid))
        )
        if not files:
            continue
        merged = TempFile.get()
        # Transform spectral fields to a regular grid in place.
        for filename in files:
            Utils.cdo().sp2gpl(
                options="-O", input=filename, output=temp
            )
            shutil.move(temp, filename)
        # Collect the variables that the CMOR configuration requests.
        cmorize_vars = set()
        var_manager = self.config.var_manager
        for filename in files:
            handler = Utils.open_cdf(filename)
            for variable in handler.variables.keys():
                _, var_cmor = var_manager.get_variable_and_alias(
                    variable, silent=True,
                )
                if self.cmor.cmorize(var_cmor):
                    cmorize_vars.add(variable)
            handler.close()
        if not cmorize_vars:
            continue
        var_str = ",".join([str(var) for var in cmorize_vars])
        Utils.cdo().mergetime(
            input=[f"-selvar,{var_str} {filepath}" for filepath in files],
            output=merged
        )
        # NOTE(review): loop body missing below.
        for filename in files:
        tar_startdate = (
            os.path.basename(tarfile[0:-4]).split("_")[4].split("-")
        )
        filename = "MMA{0}_1m_{1[0]}_{1[1]}.nc".format(grid, tar_startdate)
        shutil.move(merged, os.path.join(self.cmor_scratch, filename))
def cmorize_atmos(self):
    """Cmorize atmospheric data, from grib or MMA files

    NOTE(review): missing from this copy: the ``return`` after the skip
    message, the call inside the GRIB branch (two VCS artifact lines sat
    there), and the loop header over ``tar_files`` that defines ``count``
    and ``tarfile``.
    """
    if not self.cmor.atmosphere:
        Log.info("Skipping atmosphere cmorization due to configuration")
    if self.cmor.use_grib and self._gribfiles_available():
        # (branch body lost in this copy)
    else:
        tar_files = glob.glob(
            os.path.join(self.original_files_path, "MMA*.tar")
        )
        if len(tar_files) == 0:
            Log.error(
                "MMA files not found in {0}".format(self.original_files_path)
            )
        # --- loop header over tar_files missing below ---
        if not self._cmorization_required(
            self._get_chunk(os.path.basename(tarfile)),
            (ModelingRealms.atmos,),
        ):
            Log.info(
                "No need to unpack file {0}/{1}".format(
                    count, len(tar_files)
                )
            )
            count += 1
            continue
        Log.info(
            "Unpacking atmospheric file {0}/{1}".format(
                count, len(tar_files)
            )
        )
        try:
            self._unpack_tar_file(tarfile)
            self._merge_mma_files(tarfile)
            self._correct_fluxes()
            self._cmorize_nc_files()
            Log.result(
                "Atmospheric file {0}/{1} finished".format(
                    count, len(tar_files)
                )
            )
        except Exception as ex:
            Log.error(
                "Could not cmorize atmospheric file {0}: {1}\n {2}",
                count,
                ex,
                traceback.format_exc(),
            )
# NOTE(review): the enclosing ``def`` line for this GRIB-processing routine
# is missing from this copy (presumably ``def _cmorize_grib_files(self):``,
# the method invoked from ``cmorize_atmos``).
chunk = 1
chunk_start = parse_date(self.startdate)
# Walk the chunks for as long as a GG or SH GRIB file exists for the
# chunk's start date.
while os.path.exists(
    self._get_original_grib_path(chunk_start, "GG")
) or os.path.exists(self._get_original_grib_path(chunk_start, "SH")):
    if self._cmorization_required(chunk, (ModelingRealms.atmos,)):
        chunk_end = chunk_end_date(
            chunk_start,
            self.experiment.chunk_size,
            "month",
            self.experiment.calendar,
        )
        # chunk_end_date returns the first day of the next chunk; step
        # back one day to get the inclusive chunk end.
        chunk_end = previous_day(chunk_end, self.experiment.calendar)
        Log.info(
            "CMORizing chunk {0}-{1}",
            date2str(chunk_start),
            date2str(chunk_end),
        )
        try:
            for grid in ("SH", "GG"):
                Log.info("Processing {0} variables", grid)
                first_grib = self._get_original_grib_path(
                    chunk_start, grid
                )
                if not os.path.exists(first_grib):
                    continue
                # Skip the grid entirely if it holds none of the
                # requested GRIB codes.
                var_list = Utils.cdo().showvar(input=first_grib)[0]
                codes = {
                    int(var.replace("var", ""))
                    for var in var_list.split()
                }
                if not codes.intersection(
                    self.config.cmor.get_requested_codes()
                ):
                    Log.info(
                        "No requested variables found in {0}. "
                        "Skipping...",
                        grid,
                    )
                    continue
                self._cmorize_grib_file(chunk_end, chunk_start, grid)
        except Exception as ex:
            Log.error(
                "Can not cmorize GRIB file for chunk {0}-{1}: {2}",
                date2str(chunk_start),
                date2str(chunk_end),
                ex,
            )
    chunk_start = chunk_end_date(
        chunk_start,
        self.experiment.chunk_size,
        "month",
        self.experiment.calendar,
    )
    chunk += 1
def _cmorize_grib_file(self, chunk_end, chunk_start, grid):
    """Cmorize one chunk of GRIB output for *grid*, month by month.

    NOTE(review): missing from this copy: the handling after
    ``_unpack_grib`` returns False (the ``next_gribfile`` branch is
    truncated), the body of the loop over the split files, and any cleanup
    between steps.
    """
    for month in range(0, self.experiment.chunk_size):
        current_date = add_months(
            chunk_start, month, self.experiment.calendar
        )
        original_gribfile = self._get_original_grib_path(
            current_date, grid
        )
        Log.info("Processing month {1}", grid, date2str(current_date))
        gribfile = self._get_scratch_grib_path(current_date, grid)
        if not os.path.isfile(gribfile):
            Log.info("Copying file...", grid, date2str(current_date))
            Utils.copy_file(original_gribfile, gribfile)
        full_file = self._get_monthly_grib(current_date, gribfile, grid)
        if not self._unpack_grib(
            full_file, gribfile, grid, current_date.month
        ):
            # NOTE(review): branch truncated in this copy.
            next_gribfile = self._get_original_grib_path(
                add_months(current_date, 1, self.experiment.calendar), grid
            )
        self._ungrib_vars(gribfile, current_date.month)
        # NOTE(review): loop body missing below (likely file removal).
        for splited_file in glob.glob("{0}_*.128.nc".format(gribfile)):
        Log.result(
            "Month {0}, {1} variables finished",
            date2str(current_date),
            grid,
        )
    self._merge_and_cmorize_atmos(
        chunk_start, chunk_end, grid, Frequencies.monthly
    )
    self._merge_and_cmorize_atmos(
        chunk_start, chunk_end, grid, Frequencies.daily
    )
    self._merge_and_cmorize_atmos(
        chunk_start, chunk_end, grid, "{0}hr".format(self.atmos_timestep)
    )
def _unpack_grib(self, full_file, gribfile, grid, month):
    """Split the requested GRIB codes out of *full_file* into NetCDF files.

    NOTE(review): this copy lost the cdo invocations themselves: the
    keyword-argument groups below (``input=``/``output=``/``options=``)
    are orphaned remnants of two cdo calls (a spectral path using
    ``-sp2gpl`` and a regular path), along with their surrounding
    control flow and the boolean return value implied by the caller
    (``if not self._unpack_grib(...)``).
    """
    # remap on regular Gauss grid
    codes = self.cmor.get_requested_codes()
    codes_str = ",".join([str(code) for code in codes])
    input="-sp2gpl -selcode,{0} {1} ".format(
        codes_str, full_file
    ),
    output=gribfile + "_",
    options="-f nc4 -t ecmwf",
    input="-selcode,{0} {1}".format(codes_str, full_file),
    output=gribfile + "_",
    options="-R -f nc4 -t ecmwf",
    Log.info("No requested codes found in {0} file".format(grid))
def _get_monthly_grib(self, current_date, gribfile, grid):
    """
    Return the GRIB file to use for the current month.

    If the previous month's GRIB file is still on scratch, both files are
    merged first and the merged ICM file is returned; otherwise the
    month's own file is returned.

    NOTE(review): the assignment of ``full_file`` in the merge branch was
    missing from this copy; restored as ``self.path_icm`` because that is
    where ``_merge_grib_files`` writes its merged output.
    """
    prev_gribfile = self._get_scratch_grib_path(
        add_months(current_date, -1, self.experiment.calendar), grid
    )
    if os.path.exists(prev_gribfile):
        self._merge_grib_files(current_date, prev_gribfile, gribfile)
        full_file = self.path_icm
    else:
        full_file = gribfile
    return full_file
def _get_scratch_grib_path(self, current_date, grid):
return os.path.join(
self.config.scratch_dir,
self._get_grib_filename(grid, current_date),
)
def _obtain_atmos_timestep(self, gribfile):
if self.atmos_timestep is None:
self.atmos_timestep = self._get_atmos_timestep(gribfile)
def _get_original_grib_path(self, current_date, grid):
return os.path.join(
self.original_files_path,
self._get_grib_filename(grid, current_date),
)
def _get_grib_filename(self, grid, month):
    """Build the ICM GRIB filename: ``ICM<grid><expid>+<YYYYMM>.grb``."""
    # date2str yields YYYYMMDD; drop the day part.
    year_month = date2str(month)[:-2]
    return f"ICM{grid}{self.experiment.expid}+{year_month}.grb"
def _get_atmos_timestep(self, gribfile):
    """
    Compute the atmospheric timestep in hours from *gribfile*.

    Reads the GRIB valid times, deduplicates and sorts them, and takes the
    gap between the first two as the timestep. The result is also stored
    on ``self.experiment.atmos_timestep``.
    """
    grib = cfgrib.open_file(gribfile)
    valid_time = grib.variables["valid_time"]
    stamps = sorted(
        {
            cf_units.num2date(
                value,
                valid_time.attributes["units"],
                valid_time.attributes["calendar"],
            )
            for value in valid_time.data
        }
    )
    step = stamps[1] - stamps[0]
    atmos_timestep = int(step.total_seconds() / 3600)
    self.experiment.atmos_timestep = atmos_timestep
    return atmos_timestep
def _cmorize_nc_file(self, filename):
    """Cmorize every requested data variable found in one NetCDF file.

    NOTE(review): missing from this copy: the call that opens ``handler``
    (presumably ``Utils.open_cdf``), the ``for variable in variables:``
    loop header, and the ``try``/``except`` wrapper whose remnants are the
    orphaned ``Log.error`` below.
    """
    if not self._contains_requested_variables(filename):
        return
    frequency = self._get_nc_file_frequency(filename)
    Utils.rename_variables(filename, self.alt_coord_names, False)
    self._add_common_attributes(handler, frequency)
    self._update_time_variables(handler)
    variables = handler.variables.keys()
    handler.close()
    # --- loop header and try missing below ---
    if variable in Cmorizer.NON_DATA_VARIABLES:
        continue
    self.extract_variable(filename, frequency, variable)
    Log.error(
        "Variable {0} can not be cmorized: {1}", variable, ex
    )
    Log.result("File {0} cmorized!", filename)
# NOTE(review): the enclosing ``def`` line is missing from this copy.
# This fragment strips the valid_min/valid_max attributes from every
# variable of an open NetCDF handler; the guards around the ``del``
# statements (the attributes may be absent) were also lost, as were two
# VCS viewer artifact lines that sat between them.
for variable in handler.variables.keys():
    var = handler.variables[variable]
    del var.valid_min
    del var.valid_max
def _get_nc_file_frequency(self, filename):
    """
    Deduce the output Frequency from a NetCDF filename.

    NOTE(review): the ``def`` line (name grounded by the call in
    ``_cmorize_nc_file``) and parts of the branch bodies were missing from
    this copy; reconstructed so that every branch assigns ``frequency``
    before the final return — confirm against version control.
    """
    file_parts = os.path.basename(filename).split("_")
    if self.experiment.expid in [file_parts[1], file_parts[2]]:
        # Old-style filename: frequency not encoded, monthly assumed.
        frequency = Frequency("m")
    elif self.experiment.expid == file_parts[0]:
        try:
            # If the second token parses as a date, frequency is implicit.
            parse_date(file_parts[1])
            frequency = Frequency("m")
        except ValueError:
            frequency = Frequency(file_parts[1])
    else:
        frequency = Frequency(file_parts[1])
    return frequency
def _contains_requested_variables(self, filename):
    """Return whether *filename* holds at least one variable requested by the CMOR config."""
    file_variables = Utils.get_file_variables(filename)
    return self.cmor.any_required(file_variables)
def extract_variable(self, file_path, frequency, variable):
    """
    Extract a variable from a file and creates the CMOR file

    Parameters
    ----------
    file_path:str
    frequency: Frequency
    variable: str

    Raises
    ------
    CMORException
        If the filename does not match any of the recognized patterns

    NOTE(review): this copy is missing the branch structure around the two
    error reports below and the code that defines ``date_str`` and
    ``region`` before they are used. Also note the stray comma inside the
    f-string (``{var_cmor.short_name,}``) which renders the name as a
    tuple — likely a latent typo.
    """
    alias, var_cmor = self.config.var_manager.get_variable_and_alias(
        variable
    )
    if var_cmor is None:
        return
    if not self.cmor.cmorize(var_cmor):
        return
    temp = TempFile.get()
    lev_dimensions = self._set_coordinates_attribute(
        file_path, var_cmor, variable
    )
    self._rename_level_coords(file_path, lev_dimensions, temp, variable)
    # --- surrounding branch structure missing below ---
    Log.error(
        f"Variable {var_cmor.short_name,} can not be cmorized. "
        "Original filename does not match a recognized pattern",
    )
    raise CMORException(
        f"Variable {var_cmor.domain}:{var_cmor.short_name} can not "
        "be cmorized. Original filename does not match a recognized "
        "pattern"
    )
    netcdf_file = NetCDFFile()
    netcdf_file.data_manager = self.data_manager
    netcdf_file.local_file = temp
    netcdf_file.remote_file = self.config.data_convention.get_file_path(
        self.startdate,
        self.member,
        var_cmor.domain,
        var_cmor.short_name,
        var_cmor,
        None,
        frequency,
        grid=alias.grid,
        year=None,
        date_str=date_str,
    )
    netcdf_file.data_convention = self.config.data_convention
    netcdf_file.region = region
    netcdf_file.frequency = frequency
    netcdf_file.domain = var_cmor.domain
    netcdf_file.var = var_cmor.short_name
    netcdf_file.final_name = var_cmor.short_name
    netcdf_file.prepare_to_upload(rename_var=variable)
    netcdf_file.add_cmorization_history()
    netcdf_file.upload()
    region_str = ""
    Log.info(
        "Variable {0.domain}:{0.short_name} processed{1}",
        var_cmor,
        region_str,
    )
def _rename_level_coords(self, file_path, lev_dimensions, temp, variable):
    """Rename native level coordinates to their CMOR names.

    NOTE(review): after the first loop this copy lost the ``try`` lines and
    closing parentheses of two blocks that add missing i/j index DimCoords,
    plus whatever final save step wrote the cube to *temp*.
    """
    cube = iris.load_cube(
        file_path,
        iris.Constraint(cube_func=lambda c: c.var_name == variable),
    )
    for lev_original, lev_target in six.iteritems(lev_dimensions):
        try:
            cube.coord(var_name=lev_original).var_name = lev_target
        except iris.exceptions.CoordinateNotFoundError:
            pass
    # --- the matching ``try`` blocks for the two excepts below are lost ---
    except iris.exceptions.CoordinateNotFoundError:
        cube.add_dim_coord(
            DimCoord(
                range(cube.shape[-2]),
                var_name="j",
                long_name="Cell index along second dimension",
    except iris.exceptions.CoordinateNotFoundError:
        cube.add_dim_coord(
            DimCoord(
                range(cube.shape[-1]),
                var_name="i",
                long_name="Cell index along first dimension",
def _set_coordinates_attribute(self, file_path, var_cmor, variable):
    """
    Set the CF ``coordinates`` attribute on *variable* in *file_path*.

    Returns the mapping of native level-dimension names to CMOR names so
    the caller can rename them afterwards.

    NOTE(review): the line opening ``handler`` was missing from this copy;
    restored as ``Utils.open_cdf`` (the opener used elsewhere in this
    class) — confirm against version control.
    """
    handler = Utils.open_cdf(file_path)
    coords = [self.lon_name, self.lat_name, "time"]
    if "leadtime" in handler.variables.keys():
        coords.append("leadtime")
    lev_dimensions = self._get_lev_dimensions(var_cmor)
    for lev_dim in lev_dimensions.keys():
        if lev_dim in handler.variables[variable].dimensions:
            coords.append(lev_dim)
    handler.variables[variable].coordinates = " ".join(set(coords))
    handler.close()
    return lev_dimensions
def _get_lev_dimensions(self, var_cmor):
    """
    Map native level-coordinate names to CMOR names for the variable's realm.

    NOTE(review): the body of the atmos branch was missing from this copy;
    restored as ``{"lev": "plev"}`` — confirm against version control.
    """
    if var_cmor.domain == ModelingRealms.ocean:
        lev_dimensions = {
            "deptht": "lev",
            "depthu": "lev",
            "depthw": "lev",
            "depthv": "lev",
            "depth": "lev",
        }
    elif var_cmor.domain in [ModelingRealms.landIce, ModelingRealms.land]:
        lev_dimensions = {
            "depth": "sdepth",
            "depth_2": "sdepth",
            "depth_3": "sdepth",
            "depth_4": "sdepth",
        }
    elif var_cmor.domain == ModelingRealms.atmos:
        lev_dimensions = {"lev": "plev"}
    else:
        lev_dimensions = {}
    return lev_dimensions
file_parts = os.path.basename(file_path).split("_")
valid_starts = (self.experiment.expid, "MMA", "MMASH", "MMAGG", "MMO")
if file_parts[0] in valid_starts or file_parts[0].startswith("ORCA"):
if file_parts[-1].endswith(".tar"):
file_parts = file_parts[-1][0:-4].split("-")
return "{0}-{1}".format(file_parts[0][0:6], file_parts[1][0:6])
Javier Vegas-Regidor
committed
else:
return "{0}-{1}".format(file_parts[2][0:6], file_parts[3][0:6])
elif file_parts[1] == self.experiment.expid:
# Files generated by the old version of the diagnostics
return "{0}-{1}".format(file_parts[4][0:6], file_parts[5][0:6])
def _get_chunk(self, file_path):
    """
    Compute the experiment chunk number corresponding to *file_path*.

    NOTE(review): the ``def`` line was missing from this copy; name and
    signature are taken from the callers in ``_cmorize_ocean_files`` and
    ``cmorize_atmos``.

    Raises
    ------
    Exception
        If the file's start date is not a valid chunk start date.
    """
    chunk_start = parse_date(self._get_date_str(file_path).split("-")[0])
    current_date = parse_date(self.startdate)
    chunk = 1
    # Step chunk by chunk from the experiment start until reaching the
    # file's start date.
    while current_date < chunk_start:
        current_date = chunk_end_date(
            current_date,
            self.experiment.chunk_size,
            "month",
            self.experiment.calendar,
        )
        chunk += 1
    if current_date != chunk_start:
        raise Exception(
            "File {0} start date is not a valid chunk start date".format(
                file_path
            )
        )
    return chunk
def _merge_grib_files(self, current_month, prev_gribfile, gribfile):
    """
    Merge the current month's data from the previous GRIB file into one file.

    Selects the current month from *prev_gribfile*, merges it with
    *gribfile* into ``self.path_icm`` and removes the intermediates.

    NOTE(review): the closing parenthesis of the ``selmon`` call was
    missing from this copy; restored.
    """
    Log.info("Merging data from different files...")
    temp = TempFile.get(suffix=".grb")
    Utils.cdo().selmon(
        current_month.month, input=prev_gribfile, output=temp
    )
    Utils.cdo().mergetime(input=[temp, gribfile], output=self.path_icm)
    self._remove(prev_gribfile)
    self._remove(temp)
def _ungrib_vars(self, gribfile, month):
    """Convert each requested GRIB code into per-frequency NetCDF files.

    NOTE(review): missing from this copy: the statement after the
    existence check (as written the ``if`` has no body — presumably a
    ``continue``), possibly a units-conversion call, and any guards around
    the coordinate removals.
    """
    for var_code in self.cmor.get_requested_codes():
        file_path = "{0}_{1}.128.nc".format(gribfile, var_code)
        # --- branch body missing below ---
        if not os.path.exists(file_path):
        cube = iris.load_cube(file_path)
        cube = self._fix_time_coord(cube, var_code)
        cube = cube.extract(iris.Constraint(month_number=month))
        for frequency in (
            Frequencies.monthly,
            Frequencies.daily,
            Frequency("{0}hr".format(self.atmos_timestep)),
        ):
            if var_code not in self.cmor.get_variables(frequency):
                continue
            time_cube = self._get_time_average(cube, frequency, var_code)
            levels = self.config.cmor.get_levels(frequency, var_code)
            if levels:
                time_cube = time_cube.extract(level=levels)
            # Drop the "varNNN" suffix-style name back to the bare name.
            time_cube.var_name = cube.var_name[:-2]
            out_file = "{0}_{1}_{2}.nc".format(
                gribfile, var_code, frequency
            )
            time_cube.remove_coord("month_number")
            time_cube.remove_coord("day_of_month")
            iris.save(time_cube, out_file, zlib=True)
def _fix_time_coord(self, cube, var_code):
    """
    Normalize the cube's time coordinate and add categorisation coords.

    Converts time to days since 1950-01-01 and adds ``day_of_month`` and
    ``month_number`` auxiliary coordinates.

    NOTE(review): the trailing ``return cube`` was missing from this copy;
    restored because ``_ungrib_vars`` assigns this method's result back to
    ``cube``.
    """
    time = cube.coord("time")
    target_units = "days since 1950-01-01 00:00:00"
    time.convert_units(
        cf_units.Unit(target_units, calendar=time.units.calendar)
    )
    time.units = target_units
    # For these codes the timestamp is shifted back by one atmospheric
    # timestep (expressed as a fraction of a day) — presumably because
    # they are accumulated fields stamped at the interval end; confirm.
    if var_code in (
        144,
        146,
        147,
        169,
        175,
        176,
        177,
        179,
        180,
        181,
        182,
        201,
        202,
        205,
        212,
        228,
    ):
        time.points = time.points - (self.experiment.atmos_timestep / 24.0)
    iris.coord_categorisation.add_day_of_month(cube, "time")
    iris.coord_categorisation.add_month_number(cube, "time")
    return cube
@staticmethod
def _get_time_average(cube, frequency, var_code):
    """
    Aggregate *cube* in time according to *frequency*.

    Code 201 uses the daily MAX and code 202 the daily MIN before/instead
    of the plain mean (maximum/minimum temperature style variables).

    NOTE(review): this copy had no ``@staticmethod`` decorator (the
    signature has no ``self`` yet callers pass three arguments), and the
    elif/else skeleton plus the final ``return cube`` were lost; the
    structure below is a reconstruction — confirm against version control.
    """
    if frequency == Frequencies.monthly:
        if var_code == 201:
            cube = cube.aggregated_by(
                ["month_number", "day_of_month"], iris.analysis.MAX
            )
        elif var_code == 202:
            cube = cube.aggregated_by(
                ["month_number", "day_of_month"], iris.analysis.MIN
            )
        cube = cube.aggregated_by(["month_number"], iris.analysis.MEAN)
    elif frequency == Frequencies.daily:
        if var_code == 201:
            cube = cube.aggregated_by(
                ["month_number", "day_of_month"], iris.analysis.MAX
            )
        elif var_code == 202:
            cube = cube.aggregated_by(
                ["month_number", "day_of_month"], iris.analysis.MIN
            )
        else:
            cube = cube.aggregated_by(
                ["month_number", "day_of_month"], iris.analysis.MEAN
            )
    return cube
def _change_units(self, cube, var_code):
    """Convert accumulated GRIB fields to per-timestep-second values.

    The var_name is saved and restored because the arithmetic on the cube
    discards it.

    NOTE(review): the first branch of this chain (the ``if`` preceding the
    initial ``elif``) is missing from this copy; recover it from version
    control.
    """
    var_name = cube.var_name
    elif var_code in (146, 147, 169, 175, 176, 177, 179, 212):
        # radiation
        cube = cube / (self.experiment.atmos_timestep * 3600)
    elif var_code in (180, 181):
        # momentum flux
        cube = cube / (self.experiment.atmos_timestep * 3600)
    elif var_code in (144, 182, 205, 228):
        # precipitation/evaporation/runoff
        cube = cube * 1000 / (self.experiment.atmos_timestep * 3600)
    cube.var_name = var_name
    return cube
def _merge_and_cmorize_atmos(
    self, chunk_start, chunk_end, grid, frequency
):
    """Merge the per-month files for *grid*/*frequency* and cmorize them.

    NOTE(review): the body is truncated in this copy — it ends right after
    collecting ``files``; the actual merge into ``merged_file`` and the
    cmorization step are lost.
    """
    merged_file = "MMA_{0}_{1}_{2}_{3}.nc".format(
        frequency, date2str(chunk_start), date2str(chunk_end), grid
    )
    files = glob.glob(
        os.path.join(
            self.config.scratch_dir,
            "{0}_*_{1}.nc".format(
                self._get_grib_filename(grid, chunk_start), frequency
            ),
        )
    )
def _load_cube(cube, field, filename):