Commit e145aafe authored by Javier Vegas-Regidor

Merge branch 'master' into 'production'

Master

See merge request !47
parents 7785accf ad717ce8
@@ -18,7 +18,6 @@ class CDFTools(object):
def __init__(self, path=''):
self.path = path
self.data_convention = ''
# noinspection PyShadowingBuiltins
def run(self, command, input_file, output_file=None, options=None, log_level=Log.INFO, input_option=None):
Subproject commit 5236593cf11fd3175dd1e48d201cd507bd2e37ac
Subproject commit f3e44bf8211642e519597f73a0510e13ec3d4725
@@ -6,11 +6,18 @@ import shutil
import uuid
import traceback
import pygrib
from datetime import datetime
from datetime import datetime, timedelta
import six
from bscearth.utils.date import parse_date, chunk_end_date, previous_day, date2str, add_months
from bscearth.utils.log import Log
from cdo import CDOException
import iris
import iris.coord_categorisation
import iris.analysis
import iris.util
import iris.exceptions
import cf_units
from earthdiagnostics.datafile import NetCDFFile
from earthdiagnostics.frequency import Frequency, Frequencies
@@ -277,9 +284,7 @@ class Cmorizer(object):
if not os.path.exists(next_gribfile):
os.remove(gribfile)
self._ungrib_vars(gribfile, current_date.month, Frequency('{0}hr'.format(self.atmos_timestep)))
self._ungrib_vars(gribfile, current_date.month, Frequencies.daily)
self._ungrib_vars(gribfile, current_date.month, Frequencies.monthly)
self._ungrib_vars(gribfile, current_date.month)
for splited_file in glob.glob('{0}_*.128.nc'.format(gribfile)):
os.remove(splited_file)
@@ -301,18 +306,18 @@ class Cmorizer(object):
codes_str = ','.join([str(code) for code in codes])
try:
if grid == 'SH':
Utils.cdo.splitparam(input='-sp2gpl -selmon,{2} -selcode,{0} {1} '.format(codes_str, full_file, month),
Utils.cdo.splitparam(input='-sp2gpl -selcode,{0} {1} '.format(codes_str, full_file),
output=gribfile + '_',
options='-f nc4')
options='-f nc4 -t ecmwf')
else:
Utils.cdo.splitparam(input='-selmon,{2} -selcode,{0} {1}'.format(codes_str, full_file, month),
Utils.cdo.splitparam(input='-selcode,{0} {1}'.format(codes_str, full_file),
output=gribfile + '_',
options='-R -f nc4')
options='-R -f nc4 -t ecmwf')
# total precipitation (remove negative values)
if 228 in codes:
Utils.cdo.setcode(228,
input='-setmisstoc,0 -setvrange,0,Inf '
input='-chname,LSP,TP -setmisstoc,0 -setvrange,0,Inf '
'-add {0}_142.128.nc {0}_143.128.nc'.format(gribfile),
output='{0}_228.128.nc'.format(gribfile),
options='-f nc4')
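A note on the option change: `-t ecmwf` makes CDO resolve GRIB code numbers against the ECMWF parameter table, so the sum of codes 142 and 143 inherits the name of its first operand (LSP) and the new `-chname,LSP,TP` relabels it as total precipitation. A minimal sketch of the same pipeline with the python-cdo bindings, using a hypothetical GRIB file name:

    from cdo import Cdo  # the same bindings that back Utils.cdo

    cdo = Cdo()
    grib = 'ICMSH_expid_19900101.grb'  # hypothetical input file
    # Split into one netCDF file per GRIB code; -sp2gpl converts spectral
    # fields to gridpoint, -t ecmwf maps code numbers to ECMWF names.
    cdo.splitparam(input='-sp2gpl -selcode,142,143 {0} '.format(grib),
                   output=grib + '_', options='-f nc4 -t ecmwf')
    # Total precipitation (228) = large-scale (142) + convective (143),
    # clamping negatives to zero and renaming the summed field LSP -> TP.
    cdo.setcode(228,
                input='-chname,LSP,TP -setmisstoc,0 -setvrange,0,Inf '
                      '-add {0}_142.128.nc {0}_143.128.nc'.format(grib),
                output='{0}_228.128.nc'.format(grib),
                options='-f nc4')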
@@ -442,10 +447,37 @@ class Cmorizer(object):
return
temp = TempFile.get()
Utils.nco.ncks(input=file_path, output=temp, options=('-v {0}'.format(variable),))
self._rename_level_variables(temp, var_cmor)
handler = Utils.open_cdf(file_path)
coords = [self.lon_name, self.lat_name, 'time']
if 'leadtime' in handler.variables.keys():
coords.append('leadtime')
self._add_coordinate_variables(file_path, temp)
if var_cmor.domain == ModelingRealms.ocean:
lev_dimensions = {'deptht': 'lev', 'depthu': 'lev', 'depthw': 'lev', 'depthv': 'lev',
'depth': 'lev'}
elif var_cmor.domain in [ModelingRealms.landIce, ModelingRealms.land]:
lev_dimensions = {'depth': 'sdepth', 'depth_2': 'sdepth', 'depth_3': 'sdepth',
'depth_4': 'sdepth'}
elif var_cmor.domain == ModelingRealms.atmos:
lev_dimensions = {'depth': 'plev'}
else:
lev_dimensions = {}
for lev_dim in lev_dimensions.keys():
if lev_dim in handler.variables[variable].dimensions:
coords.append(lev_dim)
handler.variables[variable].coordinates = ' '.join(set(coords))
handler.close()
cube = iris.load_cube(file_path, iris.Constraint(cube_func=lambda c: c.var_name == variable))
for lev_original, lev_target in six.iteritems(lev_dimensions):
try:
cube.coord(lev_original).var_name = lev_target
except iris.exceptions.CoordinateNotFoundError:
pass
iris.save(cube, temp, zlib=True)
if alias.basin is None:
region = None
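For reference, `iris.load_cube` matches on standard_name by default; selecting a cube by its netCDF variable name needs a `cube_func` constraint like the one above. A self-contained sketch (file and variable names are hypothetical):

    import iris

    var_name = 'sosstsst'  # hypothetical NEMO variable name
    constraint = iris.Constraint(cube_func=lambda c: c.var_name == var_name)
    cube = iris.load_cube('example.nc', constraint)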
@@ -512,139 +544,115 @@ class Cmorizer(object):
raise Exception('File {0} start date is not a valid chunk start date'.format(file_path))
return chunk
def _add_coordinate_variables(self, file_path, temp):
handler_cmor = Utils.open_cdf(temp)
handler = Utils.open_cdf(file_path, 'r')
Utils.copy_variable(handler, handler_cmor, self.lon_name, False)
Utils.copy_variable(handler, handler_cmor, self.lat_name, False)
if 'time' in handler_cmor.dimensions.keys():
Utils.copy_variable(handler, handler_cmor, 'leadtime', False)
handler_cmor.close()
handler.close()
@staticmethod
def _rename_level_variables(temp, var_cmor):
if var_cmor.domain == ModelingRealms.ocean:
Utils.rename_variables(temp, {'deptht': 'lev', 'depthu': 'lev', 'depthw': 'lev', 'depthv': 'lev',
'depth': 'lev'}, False)
if var_cmor.domain in [ModelingRealms.landIce, ModelingRealms.land]:
Utils.rename_variables(temp, {'depth': 'sdepth', 'depth_2': 'sdepth', 'depth_3': 'sdepth',
'depth_4': 'sdepth'}, False)
if var_cmor.domain == ModelingRealms.atmos:
Utils.rename_variables(temp, {'depth': 'plev'}, False)
def _merge_grib_files(self, current_month, prev_gribfile, gribfile):
Log.info('Merging data from different files...')
rules_path = os.path.join(self.config.scratch_dir, 'rules_files')
fd = open(rules_path, 'w')
fd.write('if (dataDate >= {0.year}{0.month:02}01) {{ write ; }}\n'.format(current_month))
fd.close()
# get first timestep for each month from previous file (if possible)
if os.path.exists(self.path_icm):
os.remove(self.path_icm)
Utils.execute_shell_command('grib_filter -o {2} {3} '
'{0} {1}'.format(prev_gribfile, gribfile, self.path_icm, rules_path))
os.remove(rules_path)
Utils.remove_file(prev_gribfile)
def _ungrib_vars(self, gribfile, month, frequency):
cdo_reftime = parse_date(self.startdate).strftime('%Y-%m-%d,00:00')
Log.info('Preparing {0} variables'.format(frequency))
var_codes = self.config.cmor.get_variables(frequency)
for var_code in var_codes:
if not os.path.exists('{0}_{1}.128.nc'.format(gribfile, var_code)):
temp = TempFile.get(suffix='.grb')
Utils.cdo.selmon(current_month.month, input=prev_gribfile, output=temp)
Utils.cdo.mergetime(input=[temp, gribfile],
output=self.path_icm)
os.remove(prev_gribfile)
os.remove(temp)
def _ungrib_vars(self, gribfile, month):
for var_code in self.cmor.get_requested_codes():
file_path = '{0}_{1}.128.nc'.format(gribfile, var_code)
if not os.path.exists(file_path):
continue
new_units = None
cdo_operator = ''
cdo_operator = self._get_time_average(cdo_operator, frequency, var_code)
cdo_operator = self._fix_time_shift(cdo_operator, var_code)
cdo_operator, new_units = self._change_units(cdo_operator, new_units, var_code)
levels = self.config.cmor.get_levels(frequency, var_code)
if levels:
cdo_operator = "{0} -sellevel,{1}".format(cdo_operator, levels)
Utils.cdo.setreftime(cdo_reftime,
input='{2} {0}_{1}.128.nc '.format(gribfile, var_code, cdo_operator),
output='{0}_{1}_{2}.nc'.format(gribfile, var_code, frequency),
options='-t ecmwf')
h_var_file = '{0}_{1}_{2}.nc'.format(gribfile, var_code, frequency)
handler = Utils.open_cdf(h_var_file)
if new_units:
for var in handler.variables.values():
if 'code' in var.ncattrs() and var.code == var_code:
var.units = new_units
break
var_name = None
for key in handler.variables.keys():
if key + '_2' in handler.variables and key not in handler.dimensions:
var_name = key
handler.close()
cube = iris.load_cube(file_path)
cube = self._fix_time_coord(cube, var_code)
cube = cube.extract(iris.Constraint(month_number=month))
cube = self._change_units(cube, var_code)
for frequency in (Frequencies.monthly, Frequencies.daily, Frequency('{0}hr'.format(self.atmos_timestep))):
if var_code not in self.cmor.get_variables(frequency):
continue
time_cube = self._get_time_average(cube, frequency, var_code)
levels = self.config.cmor.get_levels(frequency, var_code)
if levels:
time_cube = time_cube.extract(iris.Constraint(level=levels))  # Cube.extract needs a Constraint; 'level' is the coordinate name assumed here
if cube.var_name.endswith('_2'):
time_cube.var_name = cube.var_name[:-2]
if var_name is not None:
Utils.nco.ncks(input='{0}_{1}_1m.nc'.format(gribfile, var_code),
output='{0}_{1}_1m.nc'.format(gribfile, var_code),
options=('-O -v {0}'.format(var_name)))
out_file = '{0}_{1}_{2}.nc'.format(gribfile, var_code, frequency)
time_cube.remove_coord('month_number')
time_cube.remove_coord('day_of_month')
iris.save(time_cube, out_file, zlib=True)
def _fix_time_coord(self, cube, var_code):
time = cube.coord('time')
target_units = 'days since 1950-01-01 00:00:00'
time.convert_units(cf_units.Unit(target_units, calendar=time.units.calendar))
time.units = target_units
def _fix_time_shift(self, cdo_operator, var_code):
if var_code in (144, 146, 147, 169, 175, 176, 177, 179, 180, 181, 182, 201, 202, 205, 212, 228):
cdo_operator = '{0} -shifttime,-{1}hours'.format(cdo_operator, self.experiment.atmos_timestep)
return cdo_operator
time.points = time.points - (self.experiment.atmos_timestep / 24.0)
iris.coord_categorisation.add_day_of_month(cube, 'time')
iris.coord_categorisation.add_month_number(cube, 'time')
return cube
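The removed `-shifttime,-{N}hours` CDO operator becomes plain arithmetic on the time coordinate: once the units are days, an N-hour shift is N/24. A sketch with hypothetical values:

    import numpy as np
    import iris.coords
    from cf_units import Unit

    time = iris.coords.DimCoord(np.array([0.125, 0.25, 0.375]),  # 3-hourly points, in days
                                standard_name='time',
                                units=Unit('days since 1950-01-01 00:00:00', calendar='standard'))
    atmos_timestep = 3  # hours, hypothetical
    time.points = time.points - atmos_timestep / 24.0  # same effect as -shifttime,-3hours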
@staticmethod
def _get_time_average(cdo_operator, frequency, var_code):
def _get_time_average(cube, frequency, var_code):
if frequency == Frequencies.monthly:
if var_code == 201:
cdo_operator = "-monmean -daymax {0}".format(cdo_operator)
cube = cube.aggregated_by(['month_number', 'day_of_month'], iris.analysis.MAX)
elif var_code == 202:
cdo_operator = "-monmean -daymin {0}".format(cdo_operator)
else:
cdo_operator = "-monmean {0} ".format(cdo_operator)
cube = cube.aggregated_by(['month_number', 'day_of_month'], iris.analysis.MIN)
cube = cube.aggregated_by(['month_number'], iris.analysis.MEAN)
elif frequency == Frequencies.daily:
if var_code == 201:
cdo_operator = "-daymax {0} ".format(cdo_operator)
cube = cube.aggregated_by(['month_number', 'day_of_month'], iris.analysis.MAX)
elif var_code == 202:
cdo_operator = "-daymin {0} ".format(cdo_operator)
cube = cube.aggregated_by(['month_number', 'day_of_month'], iris.analysis.MIN)
else:
cdo_operator = "-daymean {0} ".format(cdo_operator)
return cdo_operator
cube = cube.aggregated_by(['month_number', 'day_of_month'], iris.analysis.MEAN)
return cube
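The old `-daymean`/`-monmean` chain maps onto iris categorisation plus `aggregated_by`: add auxiliary category coordinates, then aggregate over them. A runnable sketch on synthetic daily data (all names are illustrative):

    import numpy as np
    import iris.analysis
    import iris.coord_categorisation
    import iris.coords
    import iris.cube
    from cf_units import Unit

    time = iris.coords.DimCoord(np.arange(60.0), standard_name='time',
                                units=Unit('days since 1990-01-01', calendar='standard'))
    cube = iris.cube.Cube(np.random.rand(60), var_name='tas',
                          dim_coords_and_dims=[(time, 0)])
    iris.coord_categorisation.add_month_number(cube, 'time')
    iris.coord_categorisation.add_day_of_month(cube, 'time')
    daily = cube.aggregated_by(['month_number', 'day_of_month'], iris.analysis.MEAN)
    monthly = cube.aggregated_by(['month_number'], iris.analysis.MEAN)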
def _change_units(self, cdo_operator, new_units, var_code):
def _change_units(self, cube, var_code):
var_name = cube.var_name
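# Note: cube arithmetic (e.g. cube / 9.81) returns a new cube with its name
# metadata cleared, which is why var_name is captured here and restored
# at the end of the method.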
if var_code == 129:
# geopotential
new_units = "m"
cdo_operator = "-divc,9.81 {0}".format(cdo_operator)
cube = cube / 9.81
cube.units = 'm'
elif var_code in (146, 147, 169, 175, 176, 177, 179, 212):
# radiation
new_units = "W m-2"
cdo_operator = "-divc,{0} {1}".format(self.experiment.atmos_timestep * 3600, cdo_operator)
cube = cube / (self.experiment.atmos_timestep * 3600)
cube.units = 'W m-2'
elif var_code in (180, 181):
# momentum flux
new_units = "N m-2"
cdo_operator = "-divc,{0} {1}".format(self.experiment.atmos_timestep * 3600, cdo_operator)
cube = cube / (self.experiment.atmos_timestep * 3600)
cube.units = 'N m-2'
elif var_code in (144, 182, 205, 228):
# precipitation/evaporation/runoff
new_units = "kg m-2 s-1"
cdo_operator = "-mulc,1000 -divc,{0} {1}".format(self.experiment.atmos_timestep * 3600, cdo_operator)
return cdo_operator, new_units
cube = cube * 1000 / (self.experiment.atmos_timestep * 3600)
cube.units = 'kg m-2 s-1'
cube.var_name = var_name
return cube
def _merge_and_cmorize_atmos(self, chunk_start, chunk_end, grid, frequency):
merged_file = 'MMA_{0}_{1}_{2}_{3}.nc'.format(frequency, date2str(chunk_start), date2str(chunk_end), grid)
files = glob.glob(os.path.join(self.config.scratch_dir,
'{0}_*_{1}.nc'.format(self._get_grib_filename(grid, chunk_start), frequency)))
def _load_cube(cube, field, filename):
if 'history' in cube.attributes:
del cube.attributes['history']
for first_file in files:
shutil.move(first_file, merged_file)
current_month = add_months(chunk_start, 1, self.experiment.calendar)
var_files = []
current_month = chunk_start
while current_month < chunk_end:
month_file = first_file.replace('+{0}.grb'.format(date2str(chunk_start)[:-2]),
'+{0}.grb'.format(date2str(current_month)[:-2]))
Utils.concat_variables(month_file, merged_file, True)
var_files.append(first_file.replace('+{0}.grb'.format(date2str(chunk_start)[:-2]),
'+{0}.grb'.format(date2str(current_month)[:-2])))
current_month = add_months(current_month, 1, self.experiment.calendar)
var_cubes = iris.load(var_files, callback=_load_cube)
iris.util.unify_time_units(var_cubes)
var_cube = var_cubes.concatenate_cube()
iris.save(var_cube, merged_file, zlib=True)
for var_file in var_files:
os.remove(var_file)
self._cmorize_nc_file(merged_file)
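The merge now goes through iris instead of variable concatenation: load every monthly file, strip the per-file history attribute (iris refuses to concatenate cubes whose attributes differ), unify the time epochs and concatenate along time. The pattern in isolation, with hypothetical file names:

    import iris
    import iris.util

    def _strip_history(cube, field, filename):
        # attributes must match across files for concatenate_cube to succeed
        cube.attributes.pop('history', None)

    files = ['MMA_199001.nc', 'MMA_199002.nc', 'MMA_199003.nc']  # hypothetical
    cubes = iris.load(files, callback=_strip_history)
    iris.util.unify_time_units(cubes)  # align 'days since ...' reference dates
    merged = cubes.concatenate_cube()  # raises if metadata still differs
    iris.save(merged, 'MMA_merged.nc', zlib=True)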
def _update_time_variables(self, handler):
@@ -685,7 +693,9 @@ class Cmorizer(object):
handler.Conventions = 'CF-1.6'
handler.creation_date = datetime.now().strftime('%Y-%m-%d(T%H:%M:%SZ)')
handler.experiment_id = experiment.experiment_name
handler.forecast_reference_time = parse_date(self.startdate).strftime('%Y-%m-%d(T%H:%M:%SZ)')
startdate = parse_date(self.startdate)
handler.forecast_reference_time = '{0.year}-{0.month:02}-{0.day:02}' \
'(T{0.hour:02}:{0.minute:02}:{0.second:02}Z)'.format(startdate)
handler.frequency = frequency.frequency
handler.institute_id = experiment.institute
handler.institution = experiment.institute
@@ -12,6 +12,8 @@ from earthdiagnostics import cdftools
from earthdiagnostics.frequency import Frequency, Frequencies
from earthdiagnostics.modelingrealm import ModelingRealm
from earthdiagnostics.variable import VariableManager
from earthdiagnostics.data_convention import SPECSConvention, CMIP6Convention, PrimaveraConvention, \
MeteoFranceConvention, PrefaceConvention
class ConfigException(Exception):
@@ -137,22 +139,32 @@ class Config(object):
self.mask_regions = parser.get_path_option('DIAGNOSTICS', 'MASK_REGIONS', '')
self.mask_regions_3d = parser.get_path_option('DIAGNOSTICS', 'MASK_REGIONS_3D', '')
self.data_convention = parser.get_choice_option('DIAGNOSTICS', 'DATA_CONVENTION',
('specs', 'primavera', 'cmip6', 'preface', 'meteofrance'),
'specs',
ignore_case=True)
if self.data_convention in ('primavera', 'cmip6'):
self.scratch_masks = os.path.join(self.scratch_masks, self.data_convention)
cdftools.data_convention = self.data_convention
namelist_file = os.path.join(os.path.dirname(__file__), 'CDFTOOLS_{0}.namlist'.format(self.data_convention))
data_convention = parser.get_choice_option('DIAGNOSTICS', 'DATA_CONVENTION',
('specs', 'primavera', 'cmip6', 'preface', 'meteofrance'),
'specs',
ignore_case=True)
if data_convention == 'specs':
self.data_convention = SPECSConvention(data_convention, self)
elif data_convention == 'primavera':
self.data_convention = PrimaveraConvention(data_convention, self)
elif data_convention == 'cmip6':
self.data_convention = CMIP6Convention(data_convention, self)
elif data_convention == 'preface':
self.data_convention = PrefaceConvention(data_convention, self)
elif data_convention == 'meteofrance':
self.data_convention = MeteoFranceConvention(data_convention, self)
self.scratch_masks = self.data_convention.get_scratch_masks(self.scratch_masks)
namelist_file = os.path.join(os.path.dirname(__file__),
'CDFTOOLS_{0}.namlist'.format(self.data_convention.name))
Log.debug(namelist_file)
if os.path.isfile(namelist_file):
Log.debug('Setting namelist {0}', namelist_file)
os.environ['NAM_CDF_NAMES'] = namelist_file
self.var_manager = VariableManager()
self.var_manager.load_variables(self.data_convention)
self.var_manager.load_variables(data_convention)
self._diags = parser.get_option('DIAGNOSTICS', 'DIAGS')
self.skip_diags_done = parser.get_bool_option('DIAGNOSTICS', 'SKIP_DIAGS_DONE', True)
self.frequency = Frequency(parser.get_option('DIAGNOSTICS', 'FREQUENCY'))
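The new if/elif ladder selects the convention class by name; a table-driven variant would tighten it further — a sketch, assuming all five classes share the (name, config) constructor used above:

    CONVENTIONS = {
        'specs': SPECSConvention,
        'primavera': PrimaveraConvention,
        'cmip6': CMIP6Convention,
        'preface': PrefaceConvention,
        'meteofrance': MeteoFranceConvention,
    }
    self.data_convention = CONVENTIONS[data_convention](data_convention, self)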
@@ -5,8 +5,10 @@ import os
import shutil
from datetime import datetime
import six
import iris
import iris.coords
import iris.exceptions
import numpy as np
from bscearth.utils.log import Log
@@ -160,6 +162,7 @@ class DataFile(Publisher):
----------
diagnostic: Diagnostic
Returns
-------
bool
@@ -308,7 +311,7 @@ class DataFile(Publisher):
var_handler = handler.variables[self.final_name]
coords = set.intersection({'time', 'lev', self.lat_name, self.lon_name, 'leadtime', 'region', 'time_centered'},
set(handler.variables.keys()))
var_handler.coordinates = ' '.join(coords)
var_handler.coordinates = Utils.convert_to_ascii_if_possible(' '.join(coords))
if 'time_centered' in handler.variables:
if hasattr(handler.variables['time_centered'], 'standard_name'):
del handler.variables['time_centered'].standard_name
@@ -438,7 +441,14 @@ class DataFile(Publisher):
original_regions = handler.variables['region'][...]
var = handler.variables[self.final_name]
if region in original_regions:
var[np.where(original_regions == region)[0][0], ...] = region_slice
indices = list()
region_index = np.where(original_regions == region)[0][0]
for dim in var.dimensions:
if dim == 'region':
indices.append(region_index)
else:
indices.append(slice(None))
var[indices] = region_slice
else:
var[original_regions.shape[0], ...] = region_slice
handler.variables['region'][-1] = region
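The indexing fix above places the region index on whatever axis the region dimension actually occupies instead of assuming it comes first. The same idea in plain numpy (shapes and dimension order are hypothetical):

    import numpy as np

    data = np.zeros((12, 3, 10))  # (time, region, point) - hypothetical layout
    region_slice = np.ones((12, 10))
    region_index = 1
    indices = [region_index if dim == 'region' else slice(None)
               for dim in ('time', 'region', 'point')]
    data[tuple(indices)] = region_slice  # numpy wants a tuple; netCDF4 variables also accept a list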
@@ -608,6 +618,9 @@ class UnitConversion(object):
else:
return None, None
def check_is_in_storage(self):
return
class NetCDFFile(DataFile):
"""Implementation of DataFile for netCDF files"""
@@ -641,6 +654,30 @@ class NetCDFFile(DataFile):
Log.error('File {0} not available: {1}', self.remote_file, ex)
self.local_status = LocalStatus.FAILED
def check_is_in_storage(self):
if os.path.isfile(self.remote_file):
if self.region:
try:
cubes = iris.load(self.remote_file)
for cube in cubes:
try:
if isinstance(self.region, six.string_types):
regions = {self.region}  # a plain string has no .name attribute
else:
regions = {basin.name for basin in self.region}
if regions.issubset(set(cube.coord('region').points)):
self.storage_status = StorageStatus.READY
except iris.exceptions.CoordinateNotFoundError:
pass
except iris.exceptions.TranslationError as ex:
# If the check goes wrong, we must execute everything
os.remove(self.remote_file)
except Exception as ex:
pass
else:
self.storage_status = StorageStatus.READY
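The new check marks a file READY only when every requested basin already appears in the stored cube's region coordinate. The subset test in isolation (names hypothetical):

    requested = {'Global', 'Atlantic'}
    stored = set(cube.coord('region').points)  # e.g. {'Global', 'Pacific'}
    ready = requested.issubset(stored)  # False here, so the file is recomputed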
def create_link(self):
"""Create a link from the original data in the <frequency>_<var_type> folder"""
try:
# coding: utf-8
"""Base data manager for Earth diagnostics"""
import threading
from earthdiagnostics.datafile import NetCDFFile as NCfile, StorageStatus, LocalStatus, UnitConversion
from earthdiagnostics.modelingrealm import ModelingRealms
@@ -19,10 +18,8 @@ class DataManager(object):
def __init__(self, config):
self.config = config
self.experiment = config.experiment
self._checked_vars = list()
self.variable_list = config.var_manager
UnitConversion.load_conversions()
self.lock = threading.Lock()
self.requested_files = {}
def _get_file_from_storage(self, filepath):
@@ -55,42 +52,6 @@ class DataManager(object):
var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str()
return var
def get_varfolder(self, domain, var, grid=None, frequency=None):
"""Get variable folder name for <frequency>_<var_type> folder"""
if grid:
var = '{0}-{1}'.format(var, grid)
if domain in [ModelingRealms.ocean, ModelingRealms.seaIce, ModelingRealms.ocnBgchem]:
return DataManager._apply_fxh(var, self.experiment.ocean_timestep, frequency)
else:
return DataManager._apply_fxh(var, self.experiment.atmos_timestep, frequency)
@staticmethod
def _apply_fxh(folder_name, timestep, frequency=None):
is_base_frequency = frequency is not None and frequency.frequency.endswith('hr')
if not is_base_frequency and timestep > 0:
return '{0}_f{1}h'.format(folder_name, timestep)
return folder_name
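For reference, the removed _apply_fxh helper appended an _f{timestep}h suffix to folder names except for data already at its sub-daily base frequency; its behaviour with hypothetical arguments:

    DataManager._apply_fxh('tas', 6)                    # -> 'tas_f6h'
    DataManager._apply_fxh('tas', 6, Frequency('6hr'))  # -> 'tas' (base frequency)
    DataManager._apply_fxh('tas', 0)                    # -> 'tas' (no timestep suffix)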
def create_link(self, domain, filepath, frequency, var, grid, move_old, vartype):
"""
Create file link
Must be implemented by the derived classes. If not, this method will have no effect
Parameters
----------
domain: ModelingRealm
filepath: str
frequency: Frequency
var: str
grid: str
move_old: bool
vartype: VariableType
"""
pass
def link_file(self, domain, var, cmor_var, startdate, member, chunk=None, grid=None,
frequency=None, year=None, date_str=None, move_old=False, vartype=VariableType.MEAN):
"""
@@ -89,6 +89,7 @@ class Diagnostic(Publisher):
if not self._generated_files:
return False
for file_generated in self._generated_files:
file_generated.check_is_in_storage()
if file_generated.storage_status != StorageStatus.READY:
return False
if file_generated.has_modifiers():
@@ -109,13 +110,14 @@ class Diagnostic(Publisher):
def status(self, value):
if self._status == value:
return
old_status = self.status
self._status = value
if self.status == DiagnosticStatus.RUNNING:
for generated_file in self._generated_files:
generated_file.local_status = LocalStatus.COMPUTING
if self.status in (DiagnosticStatus.FAILED, DiagnosticStatus.COMPLETED):
self._unsuscribe_requests()
self.dispatch(self)
self.dispatch(self, old_status)
@staticmethod
def register(diagnostic_class):
@@ -205,8 +207,8 @@ class Diagnostic(Publisher):
region = region.name
generated_chunk = self.data_manager.declare_chunk(domain, var, startdate, member, chunk, grid, region, box,
diagnostic=self, vartype=vartype, frequency=frequency)
if region is not None: