# NOTE(review): repository-viewer residue ("Newer"/"Older"/author/"committed")
# removed from the top of this file; it was not part of the source.
import glob
import os
import shutil
import traceback
import uuid
from datetime import datetime, timedelta

import cf_units
import iris
import iris.analysis
import iris.coord_categorisation
import iris.util
import pygrib
from bscearth.utils.date import parse_date, chunk_end_date, previous_day, date2str, add_months
from bscearth.utils.log import Log

from earthdiagnostics.datafile import NetCDFFile
from earthdiagnostics.frequency import Frequency, Frequencies
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.utils import TempFile, Utils
class Cmorizer(object):
    """
    Class to manage the CMORization of the original model output.

    Parameters
    ----------
    data_manager: DataManager
    startdate: str
    member: int
    """

    # Variables present in the raw model files that hold coordinates or
    # metadata rather than data; they are skipped when extracting variables.
    NON_DATA_VARIABLES = ('lon', 'lat', 'longitude', 'latitude', 'plev', 'time', 'time_bnds', 'leadtime', 'lev',
                          'lev_2', 'icethi',
                          'deptht', 'depthu', 'depthw', 'depthv', 'time_centered', 'time_centered_bounds',
                          'deptht_bounds', 'depthu_bounds', 'depthv_bounds', 'depthw_bounds',
                          'deptht_bnds', 'depthu_bnds', 'depthv_bnds', 'depthw_bnds',
                          'time_counter_bounds', 'ncatice', 'nav_lat_grid_V', 'nav_lat_grid_U',
                          'nav_lat_grid_T', 'nav_lon_grid_V', 'nav_lon_grid_U', 'nav_lon_grid_T',
                          'depth', 'depth_2', 'depth_3', 'depth_4',
                          'depth_bnds', 'depth_2_bnds', 'depth_3_bnds', 'depth_4_bnds')
def __init__(self, data_manager, startdate, member):
    """Store the run configuration and derive the paths used while cmorizing."""
    self.data_manager = data_manager
    self.startdate = startdate
    self.member = member
    self.config = data_manager.config
    self.experiment = self.config.experiment
    self.cmor = self.config.cmor
    self.member_str = self.experiment.get_member_str(member)
    self.original_files_path = os.path.join(
        self.config.data_dir, self.experiment.expid, 'original_files',
        self.startdate, self.member_str, 'outputs')
    self.cmor_scratch = str(os.path.join(
        self.config.scratch_dir, 'CMOR', self.startdate, self.member_str))
    # PRIMAVERA/CMIP6 conventions use the long coordinate names
    if self.config.data_convention in ('primavera', 'cmip6'):
        self.lon_name, self.lat_name = 'longitude', 'latitude'
    else:
        self.lon_name, self.lat_name = 'lon', 'lat'
    # Mapping of raw NEMO/IFS coordinate names to their CMOR equivalents
    self.alt_coord_names = {
        'time_counter': 'time',
        'time_counter_bnds': 'time_bnds',
        'time_counter_bounds': 'time_bnds',
        'tbnds': 'bnds',
        'nav_lat': self.lat_name,
        'nav_lon': self.lon_name,
        'x': 'i',
        'y': 'j',
    }
@property
def path_icm(self):
    """Path to the temporary ICM file used when merging GRIB months.

    Restored the @property decorator: this is read as an attribute
    (e.g. ``output=self.path_icm`` in _merge_grib_files), not called.
    """
    return os.path.join(self.config.scratch_dir, 'ICM')
def cmorize_ocean(self):
    """Cmorize ocean data from the MMO/PPO/diags tar files.

    NOTE(review): the def line and the configuration guard were missing
    from this copy; reconstructed by symmetry with cmorize_atmos.
    """
    if not self.cmor.ocean:
        Log.info('Skipping ocean cmorization due to configuration')
        return
    Log.info('\nCMORizing ocean\n')
    self._cmorize_ocean_files('MMO', 'PPO', 'diags')
# Unpack and cmorize the ocean tarballs. Each prefix in *args* (e.g.
# 'MMO', 'PPO', 'diags') is tried in order; the first one matching any
# tar file in the original files folder wins.
def _cmorize_ocean_files(self, *args):
tar_files = ()
for prefix in args:
tar_folder = os.path.join(self.original_files_path, '{0}*'.format(prefix))
tar_files = glob.glob(tar_folder)
tar_files.sort()
if len(tar_files) > 0:
# first prefix with matches wins
break
if not len(tar_files):
Log.error('No {1} files found in {0}'.format(self.original_files_path, args))
# NOTE(review): an early return after the error above appears to be
# missing from this copy — verify against upstream.
count = 1
for tarfile in tar_files:
# Skip tars whose chunk is already cmorized (unless cmorization is forced)
if not self._cmorization_required(self._get_chunk(os.path.basename(tarfile)), (ModelingRealms.ocean,
                                                                                ModelingRealms.seaIce,
                                                                                ModelingRealms.ocnBgchem)):
Log.info('No need to unpack file {0}/{1}'.format(count, len(tar_files)))
# NOTE(review): a count increment before this continue appears to be
# missing — compare with _cmorize_mma_files.
continue
Log.info('Unpacking oceanic file {0}/{1}'.format(count, len(tar_files)))
# (repository-viewer residue below — not code)
Javier Vegas-Regidor
committed
try:
self._unpack_tar_file(tarfile)
self._cmorize_nc_files()
Log.result('Oceanic file {0}/{1} finished'.format(count, len(tar_files)))
except Exception as ex:
Log.error('Could not CMORize oceanic file {0}: {1}', count, ex)
# NOTE(review): the count increment and the body of this condition
# (presumably a break/return once past the last chunk) are missing
# from this copy — verify against upstream.
if count > self.experiment.num_chunks:
def _filter_files(self, file_list):
    """
    Apply the user-configured filename filters to *file_list*.

    Files whose basename contains any of the space-separated filter tokens
    are kept; the rest are removed from disk so later glob scans of the
    scratch folder do not pick them up. With no filter configured the list
    is returned unchanged.

    NOTE(review): the ``filtered`` initialisation and the ``else:`` before
    ``os.remove`` were missing from this copy and have been restored.
    """
    if not self.cmor.filter_files:
        return file_list
    filtered = []
    filters = self.cmor.filter_files.split(' ')
    for file_path in file_list:
        filename = os.path.basename(file_path)
        if any(f in filename for f in filters):
            filtered.append(file_path)
        else:
            os.remove(file_path)
    if len(filtered) == 0:
        Log.warning('Filters {0} do not match any of the files', filters)
    return filtered
def _cmorize_nc_files(self):
    """Cmorize every NetCDF file unpacked in the scratch folder.

    NOTE(review): the def line and the loop-body call were missing from
    this copy; name and usage grounded by the calls in the ocean and
    atmosphere cmorization paths.
    """
    nc_files = glob.glob(os.path.join(self.cmor_scratch, '*.nc'))
    for filename in self._filter_files(nc_files):
        self._cmorize_nc_file(filename)
# Convert accumulated flux variables to mean rates (dividing by the length
# of one accumulation interval) and flip the sign of the heat fluxes so
# they follow the CMOR sign convention.
def _correct_fluxes(self):
fluxes_vars = [self.data_manager.variable_list.get_variable(cmor_var, True).short_name
               for cmor_var in ('prc', "prsn", "rss", "rls", "rsscs", "rsds", "rlds", "hfss", 'hfls')]
change_sign_vars = [self.data_manager.variable_list.get_variable(cmor_var, True).short_name
                    for cmor_var in ("hfss", 'hfls')]
# Seconds per accumulation interval (atmos_timestep is in hours)
total_seconds = (self.experiment.atmos_timestep * 3600)
for filename in glob.glob(os.path.join(self.cmor_scratch, '*.nc')):
# NOTE(review): a 'handler = Utils.open_cdf(filename)' line (and the
# matching handler.close()) appears to be missing from this copy.
for varname in handler.variables.keys():
cmor_var = self.data_manager.variable_list.get_variable(varname, True)
if cmor_var is None or cmor_var.short_name not in fluxes_vars:
# NOTE(review): a 'continue' appears to be missing under the
# condition above — verify against upstream.
if cmor_var.short_name in change_sign_vars:
sign = -1
else:
sign = 1
var_handler = handler.variables[varname]
var_handler[:] = sign * var_handler[:] / total_seconds
var_handler.units = '{0} {1}'.format(var_handler.units, 's-1')
def _unpack_tar_file(self, tarfile):
    """
    Extract a tarball into a clean CMOR scratch folder.

    Recreates the scratch directory, untars the file, moves any members
    out of a 'backup' subfolder, and unzips the '*.gz' members that pass
    the configured filename filters.

    NOTE(review): the def line was missing and the scratch-cleanup lines
    were displaced after the body in this copy; order restored so the
    folder is wiped before makedirs.
    """
    # Start from a clean scratch folder for every tarball
    if os.path.exists(self.cmor_scratch):
        shutil.rmtree(self.cmor_scratch)
    os.makedirs(self.cmor_scratch)
    Utils.untar((tarfile,), self.cmor_scratch)
    # Some tars keep their payload inside a 'backup' subfolder
    if os.path.isdir(os.path.join(self.cmor_scratch, 'backup')):
        for filepath in glob.glob(os.path.join(self.cmor_scratch, 'backup', '*')):
            Log.debug('Moving file {0}', filepath)
            shutil.move(filepath, filepath.replace('/backup/', '/'))
    zip_files = glob.glob(os.path.join(self.cmor_scratch, '*.gz'))
    if zip_files:
        for zip_file in self._filter_files(zip_files):
            try:
                Utils.unzip(zip_file)
            except Utils.UnzipException as ex:
                Log.error('File {0} could not be unzipped: {1}', tarfile, ex)
def _merge_mma_files(self, tarfile):
    """Merge the unpacked MMA files of each grid (SH/GG) into one monthly file."""
    scratch = TempFile.get()
    for grid_label in ('SH', 'GG'):
        pattern = os.path.join(self.cmor_scratch, 'MMA_*_{}_*.nc'.format(grid_label))
        grid_files = glob.glob(pattern)
        if not grid_files:
            continue
        merged_path = TempFile.get()
        if grid_label == 'SH':
            # Spectral files: convert each one to a regular Gaussian grid in place
            for grid_file in grid_files:
                Utils.cdo.sp2gpl(options='-O', input=grid_file, output=scratch)
                shutil.move(scratch, grid_file)
        Utils.cdo.mergetime(input=grid_files, output=merged_path)
        for grid_file in grid_files:
            os.remove(grid_file)
        # Name the merged file after the date range encoded in the tar name
        tar_startdate = os.path.basename(tarfile[0:-4]).split('_')[4].split('-')
        merged_name = 'MMA{0}_1m_{1[0]}_{1[1]}.nc'.format(grid_label, tar_startdate)
        shutil.move(merged_path, os.path.join(self.cmor_scratch, merged_name))
def cmorize_atmos(self):
    """Cmorize atmospheric data, from grib or MMA files.

    NOTE(review): the early return after the skip message and the
    _cmorize_grib_files() call were missing from this copy; restored.
    """
    if not self.cmor.atmosphere:
        Log.info('Skipping atmosphere cmorization due to configuration')
        return
    Log.info('\nCMORizing atmosphere\n')
    # Prefer the high-frequency GRIB output when available and enabled
    if self.cmor.use_grib and self._gribfiles_available():
        self._cmorize_grib_files()
    else:
        self._cmorize_mma_files()
def _cmorize_mma_files(self):
    """
    Unpack and cmorize the atmospheric MMA tar files.

    NOTE(review): the 'for tarfile in tar_files:' loop header and the
    return after the not-found error were missing from this copy; restored.
    """
    tar_files = glob.glob(os.path.join(self.original_files_path, 'MMA*'))
    tar_files.sort()
    count = 1
    if len(tar_files) == 0:
        Log.error('MMA files not found in {0}'.format(self.original_files_path))
        return
    for tarfile in tar_files:
        # Skip tars whose chunk is already cmorized (unless forced)
        if not self._cmorization_required(self._get_chunk(os.path.basename(tarfile)), (ModelingRealms.atmos,)):
            Log.info('No need to unpack file {0}/{1}'.format(count, len(tar_files)))
            count += 1
            continue
        Log.info('Unpacking atmospheric file {0}/{1}'.format(count, len(tar_files)))
        try:
            self._unpack_tar_file(tarfile)
            self._merge_mma_files(tarfile)
            self._correct_fluxes()
            self._cmorize_nc_files()
            Log.result('Atmospheric file {0}/{1} finished'.format(count, len(tar_files)))
        except Exception as ex:
            Log.error('Could not cmorize atmospheric file {0}: {1}\n {2}', count, ex, traceback.format_exc())
        count += 1
# Walk the experiment chunks while original GRIB files exist for the chunk
# start month, and cmorize each chunk that still requires it.
def _cmorize_grib_files(self):
chunk = 1
chunk_start = parse_date(self.startdate)
while os.path.exists(self._get_original_grib_path(chunk_start, 'GG')) or \
        os.path.exists(self._get_original_grib_path(chunk_start, 'SH')):
if self._cmorization_required(chunk, (ModelingRealms.atmos,)):
chunk_end = chunk_end_date(chunk_start, self.experiment.chunk_size, 'month', self.experiment.calendar)
# chunk_end_date returns the first day after the chunk; step back one day
chunk_end = previous_day(chunk_end, self.experiment.calendar)
Log.info('CMORizing chunk {0}-{1}', date2str(chunk_start), date2str(chunk_end))
# (repository-viewer residue below — not code)
Javier Vegas-Regidor
committed
try:
for grid in ('SH', 'GG'):
Log.info('Processing {0} variables', grid)
# (repository-viewer residue below — not code)
Javier Vegas-Regidor
committed
first_grib = self._get_original_grib_path(chunk_start, grid)
if not os.path.exists(first_grib):
continue
# Ask CDO which parameter codes the file contains ('var123' strings)
var_list = Utils.cdo.showvar(input=first_grib)[0]
codes = {int(var.replace('var', '')) for var in var_list.split()}
if not codes.intersection(self.config.cmor.get_requested_codes()):
Log.info('No requested variables found in {0}. Skipping...', grid)
# (repository-viewer residue below — not code)
Javier Vegas-Regidor
committed
continue
self._cmorize_grib_file(chunk_end, chunk_start, grid)
# (repository-viewer residue below — not code)
Javier Vegas-Regidor
committed
except Exception as ex:
Log.error('Can not cmorize GRIB file for chunk {0}-{1}: {2}',
          date2str(chunk_start), date2str(chunk_end), ex)
chunk_start = chunk_end_date(chunk_start, self.experiment.chunk_size, 'month', self.experiment.calendar)
chunk += 1
# Process one chunk of one grid: copy each month's GRIB to scratch, split
# it into per-variable NetCDF files at each frequency, then merge the
# months and cmorize the result.
def _cmorize_grib_file(self, chunk_end, chunk_start, grid):
for month in range(0, self.experiment.chunk_size):
current_date = add_months(chunk_start, month, self.experiment.calendar)
original_gribfile = self._get_original_grib_path(current_date, grid)
Log.info('Processing month {1}', grid, date2str(current_date))
gribfile = self._get_scratch_grib_path(current_date, grid)
if not os.path.isfile(gribfile):
Log.info('Copying file...', grid, date2str(current_date))
Utils.copy_file(original_gribfile, gribfile)
# NOTE(review): a call obtaining self.atmos_timestep (see
# _obtain_atmos_timestep) appears to be missing from this copy —
# self.atmos_timestep is read below.
full_file = self._get_monthly_grib(current_date, gribfile, grid)
if not self._unpack_grib(full_file, gribfile, grid, current_date.month):
# NOTE(review): the handling when unpacking fails (likely a
# cleanup + return) is missing in this copy; the next_gribfile
# lines below may belong to a different branch — verify upstream.
next_gribfile = self._get_original_grib_path(add_months(current_date, 1, self.experiment.calendar), grid)
if not os.path.exists(next_gribfile):
os.remove(gribfile)
# Split the unpacked data into per-variable files at each frequency
self._ungrib_vars(gribfile, current_date.month, Frequency('{0}hr'.format(self.atmos_timestep)))
self._ungrib_vars(gribfile, current_date.month, Frequencies.daily)
self._ungrib_vars(gribfile, current_date.month, Frequencies.monthly)
for splited_file in glob.glob('{0}_*.128.nc'.format(gribfile)):
os.remove(splited_file)
Log.result('Month {0}, {1} variables finished', date2str(current_date), grid)
# Merge the per-month files and cmorize each frequency
self._merge_and_cmorize_atmos(chunk_start, chunk_end, grid, Frequencies.monthly)
self._merge_and_cmorize_atmos(chunk_start, chunk_end, grid, Frequencies.daily)
self._merge_and_cmorize_atmos(chunk_start, chunk_end, grid,
                              '{0}hr'.format(self.atmos_timestep))
# Split the requested parameter codes out of *full_file* into per-code
# NetCDF files ('<gribfile>_<code>.128.nc'), remapping spectral (SH) data
# to a regular Gaussian grid. Returns True on success, False when CDO
# finds none of the requested codes.
def _unpack_grib(self, full_file, gribfile, grid, month):
Log.info('Unpacking... ')
# remap on regular Gauss grid
codes = self.cmor.get_requested_codes()
if 228 in codes:
# NOTE(review): the body of this condition is missing in this copy —
# presumably it adds codes 142 and 143, which the 228 computation
# below sums — verify against upstream.
codes_str = ','.join([str(code) for code in codes])
try:
if grid == 'SH':
Utils.cdo.splitparam(input='-sp2gpl -selcode,{0} {1} '.format(codes_str, full_file),
                     options='-f nc4 -t ecmwf')
# NOTE(review): an 'else:' appears to be missing before the call below.
Utils.cdo.splitparam(input='-selcode,{0} {1}'.format(codes_str, full_file),
                     options='-R -f nc4 -t ecmwf')
# total precipitation (remove negative values)
if 228 in codes:
Utils.cdo.setcode(228,
                  input='-setmisstoc,0 -setvrange,0,Inf '
                        '-add {0}_142.128.nc {0}_143.128.nc'.format(gribfile),
                  output='{0}_228.128.nc'.format(gribfile),
                  options='-f nc4 -t ecmwf')
return True
except CDOException:
Log.info('No requested codes found in {0} file'.format(grid))
return False
# NOTE(review): the finally body is missing in this copy — likely the
# removal of the temporary ICM merge file — verify against upstream.
finally:
def _get_monthly_grib(self, current_date, gribfile, grid):
    """
    Return the path of the GRIB file holding the complete current month.

    If the previous month's file is still in scratch, its spill-over
    records are merged with this month's file into the ICM scratch file
    (see _merge_grib_files, which writes to self.path_icm) and that path
    is returned; otherwise the month's own file is used directly.

    NOTE(review): the ``full_file = self.path_icm`` assignment was missing
    from this copy (NameError when the branch is taken); restored.
    """
    prev_gribfile = self._get_scratch_grib_path(add_months(current_date, -1, self.experiment.calendar), grid)
    if os.path.exists(prev_gribfile):
        self._merge_grib_files(current_date, prev_gribfile, gribfile)
        full_file = self.path_icm
    else:
        full_file = gribfile
    return full_file
def _get_scratch_grib_path(self, current_date, grid):
    """Return the scratch-folder path of the GRIB file for the given month and grid."""
    grib_name = self._get_grib_filename(grid, current_date)
    return os.path.join(self.config.scratch_dir, grib_name)
def _obtain_atmos_timestep(self, gribfile):
    """Detect and cache the atmospheric timestep, reading *gribfile* only once."""
    if self.atmos_timestep is not None:
        return
    self.atmos_timestep = self._get_atmos_timestep(gribfile)
def _get_original_grib_path(self, current_date, grid):
    """Return the path of the original GRIB file for the given month and grid."""
    grib_name = self._get_grib_filename(grid, current_date)
    return os.path.join(self.original_files_path, grib_name)
def _get_grib_filename(self, grid, month):
    """Build the ICM GRIB filename for *grid* and the year-month of *month*."""
    year_month = date2str(month)[:-2]
    return 'ICM{0}{1}+{2}.grb'.format(grid, self.experiment.expid, year_month)
def _get_atmos_timestep(self, gribfile):
    """
    Compute the atmospheric timestep (in hours) from *gribfile*.

    Reads the set of validity dates in the file and takes the difference
    between the two earliest ones. The value is also stored on the
    experiment configuration.

    Fix: close the pygrib handler in a finally block so it is not leaked
    if iteration over the messages raises.
    """
    Log.info('Getting timestep...')
    grib_handler = pygrib.open(gribfile)
    try:
        dates = set()
        for mes in grib_handler:
            dates.add(mes.validDate)
    finally:
        grib_handler.close()
    dates = sorted(dates)
    atmos_timestep = dates[1] - dates[0]
    atmos_timestep = int(atmos_timestep.total_seconds() / 3600)
    self.experiment.atmos_timestep = atmos_timestep
    return atmos_timestep
# Cmorize one NetCDF file: normalise coordinate names, add global
# attributes, fix the time axis and extract each data variable to its own
# CMOR file. The input file is removed afterwards.
def _cmorize_nc_file(self, filename):
Log.info('Processing file {0}', filename)
if not self._contains_requested_variables(filename):
os.remove(filename)
return
frequency = self._get_nc_file_frequency(filename)
Utils.rename_variables(filename, self.alt_coord_names, False)
# NOTE(review): a 'handler = Utils.open_cdf(filename)' line appears to be
# missing before the two calls below.
self._add_common_attributes(handler, frequency)
self._update_time_variables(handler)
variables = handler.variables.keys()
handler.close()
# NOTE(review): a 'for variable in variables:' header and a 'try:' appear
# to be missing before the lines below (an 'except' follows).
if variable in Cmorizer.NON_DATA_VARIABLES:
continue
self.extract_variable(filename, frequency, variable)
except Exception as ex:
Log.error('Variable {0} can not be cmorized: {1}', variable, ex)
Log.result('File {0} cmorized!', filename)
os.remove(filename)
Javier Vegas-Regidor
committed
for variable in handler.variables.keys():
var = handler.variables[variable]
if 'valid_min' in var.ncattrs():
del var.valid_min
if 'valid_max' in var.ncattrs():
del var.valid_max
Javier Vegas-Regidor
committed
def _get_nc_file_frequency(self, filename):
    """
    Deduce the output frequency from the cmorized filename layout.

    MMA-style names ('MMA_<freq>_..._<expid>_...') are monthly; names
    starting with the expid are monthly when the second token parses as a
    date, otherwise the second token is the frequency itself.

    NOTE(review): the def line and the else branch structure were missing
    from this copy (the duplicated Frequency(file_parts[1]) lines were the
    except and else branches); reconstructed — verify against upstream.
    """
    file_parts = os.path.basename(filename).split('_')
    if self.experiment.expid in [file_parts[1], file_parts[2]]:
        frequency = Frequency('m')
    elif self.experiment.expid == file_parts[0]:
        try:
            parse_date(file_parts[1])
            frequency = Frequency('m')
        except ValueError:
            frequency = Frequency(file_parts[1])
    else:
        frequency = Frequency(file_parts[1])
    return frequency
def _contains_requested_variables(self, filename):
    """Return True when *filename* holds at least one variable requested for cmorization."""
    file_variables = Utils.get_file_variables(filename)
    return self.cmor.any_required(file_variables)
def extract_variable(self, file_path, frequency, variable):
"""
Extract a variable from a file and creates the CMOR file

Parameters
----------
file_path:str
frequency: Frequency
variable: str

Raises
------
CMORException
If the filename does not match any of the recognized patterns
"""
alias, var_cmor = self.config.var_manager.get_variable_and_alias(variable)
# Unknown or not-requested variables are silently skipped
if var_cmor is None:
return
if not self.cmor.cmorize(var_cmor):
return
# Extract just this variable into a temporary file
temp = TempFile.get()
Utils.nco.ncks(input=file_path, output=temp, options=('-v {0}'.format(variable),))
self._rename_level_variables(temp, var_cmor)
self._add_coordinate_variables(file_path, temp)
# NOTE(review): the filename-pattern matching block that defines
# 'date_str' and 'region' (used below) is missing from this copy; the
# error/raise below is its fall-through branch — verify against upstream.
Log.error('Variable {0} can not be cmorized. Original filename does not match a recognized pattern',
          var_cmor.short_name)
raise CMORException('Variable {0}:{1} can not be cmorized. Original filename does not match a recognized '
                    'pattern'.format(var_cmor.domain, var_cmor.short_name))
# Build the CMOR file descriptor and upload it to its final location
netcdf_file = NetCDFFile()
netcdf_file.data_manager = self.data_manager
netcdf_file.local_file = temp
netcdf_file.remote_file = self.data_manager.get_file_path(self.startdate, self.member,
                                                          var_cmor.domain, var_cmor.short_name, var_cmor,
                                                          None, frequency,
                                                          grid=alias.grid, year=None, date_str=date_str)
netcdf_file.data_convention = self.config.data_convention
netcdf_file.region = region
netcdf_file.frequency = frequency
netcdf_file.domain = var_cmor.domain
netcdf_file.var = var_cmor.short_name
netcdf_file.final_name = var_cmor.short_name
netcdf_file.prepare_to_upload(rename_var=variable)
netcdf_file.add_cmorization_history()
netcdf_file.upload()
if region:
region_str = ' (Region {})'.format(region)
else:
region_str = ''
Log.info('Variable {0.domain}:{0.short_name} processed{1}', var_cmor, region_str)
def _get_date_str(self, file_path):
    """
    Extract the 'YYYYMM-YYYYMM' date range encoded in an original filename.

    Returns None when the filename does not match any known layout.

    NOTE(review): the def line and the file_parts initialisation were
    missing from this copy; reconstructed from the usage in _get_chunk —
    verify against upstream.
    """
    file_parts = os.path.basename(file_path).split('_')
    valid_starts = (self.experiment.expid, 'MMA', 'MMASH', 'MMAGG', 'MMO')
    if file_parts[0] in valid_starts or file_parts[0].startswith('ORCA'):
        if file_parts[-1].endswith('.tar'):
            file_parts = file_parts[-1][0:-4].split('-')
            return '{0}-{1}'.format(file_parts[0][0:6], file_parts[1][0:6])
        else:
            return '{0}-{1}'.format(file_parts[2][0:6], file_parts[3][0:6])
    elif file_parts[1] == self.experiment.expid:
        # Files generated by the old version of the diagnostics
        return '{0}-{1}'.format(file_parts[4][0:6], file_parts[5][0:6])
    else:
        return None
def _get_chunk(self, file_path):
    """Return the chunk number whose start date matches the one encoded in *file_path*."""
    chunk_start = parse_date(self._get_date_str(file_path).split('-')[0])
    date_cursor = parse_date(self.startdate)
    chunk = 1
    # Advance chunk by chunk from the experiment startdate until we reach
    # (or pass) the file's start date
    while date_cursor < chunk_start:
        date_cursor = chunk_end_date(date_cursor, self.experiment.chunk_size, 'month', self.experiment.calendar)
        chunk += 1
    if date_cursor != chunk_start:
        raise Exception('File {0} start date is not a valid chunk start date'.format(file_path))
    return chunk
# Copy lon/lat (and, for time-dependent files, leadtime) from the original
# file into the freshly extracted CMOR file.
def _add_coordinate_variables(self, file_path, temp):
# NOTE(review): the lines opening 'handler' (the original file) and
# 'handler_cmor' (the temp file) — and the close of 'handler' — are
# missing from this copy; verify against upstream.
Utils.copy_variable(handler, handler_cmor, self.lon_name, False)
Utils.copy_variable(handler, handler_cmor, self.lat_name, False)
if 'time' in handler_cmor.dimensions.keys():
Utils.copy_variable(handler, handler_cmor, 'leadtime', False)
handler_cmor.close()
# (repository-viewer residue below — not code)
Javier Vegas-Regidor
committed
@staticmethod
def _rename_level_variables(temp, var_cmor):
    """Rename the depth coordinate of *temp* to the name expected for the variable's realm."""
    if var_cmor.domain == ModelingRealms.ocean:
        ocean_depths = {'deptht': 'lev', 'depthu': 'lev', 'depthw': 'lev', 'depthv': 'lev',
                        'depth': 'lev'}
        Utils.rename_variables(temp, ocean_depths, False)
    if var_cmor.domain in [ModelingRealms.landIce, ModelingRealms.land]:
        soil_depths = {'depth': 'sdepth', 'depth_2': 'sdepth', 'depth_3': 'sdepth',
                       'depth_4': 'sdepth'}
        Utils.rename_variables(temp, soil_depths, False)
    if var_cmor.domain == ModelingRealms.atmos:
        Utils.rename_variables(temp, {'depth': 'plev'}, False)
def _merge_grib_files(self, current_month, prev_gribfile, gribfile):
    """Merge this month's records from the previous GRIB file into the ICM scratch file."""
    Log.info('Merging data from different files...')
    fraction = TempFile.get(suffix='.grb')
    # Keep only the records belonging to the current month from the previous file
    Utils.cdo.selmon(current_month.month, input=prev_gribfile, output=fraction)
    Utils.cdo.mergetime(input=[fraction, gribfile],
                        output=self.path_icm)
    os.remove(prev_gribfile)
    os.remove(fraction)
def _ungrib_vars(self, gribfile, month, frequency):
    """
    Post-process the per-code NetCDF files split from *gribfile* for one frequency.

    For each requested code: fix the time axis, keep only the given month,
    convert units, aggregate in time, optionally select levels, and save
    to '<gribfile>_<code>_<frequency>.nc'.

    NOTE(review): the 'continue' under the existence check was missing
    from this copy — without it the cube of a nonexistent file would be
    loaded; restored.
    """
    Log.info('Preparing {0} variables'.format(frequency))
    var_codes = self.config.cmor.get_variables(frequency)
    for var_code in var_codes:
        file_path = '{0}_{1}.128.nc'.format(gribfile, var_code)
        if not os.path.exists(file_path):
            continue
        cube = iris.load_cube(file_path)
        cube = self._fix_time_coord(cube, var_code)
        cube = cube.extract(iris.Constraint(month_number=month))
        cube = self._change_units(cube, var_code)
        cube = self._get_time_average(cube, frequency, var_code)
        levels = self.config.cmor.get_levels(frequency, var_code)
        if levels:
            cube = cube.extract(level=levels)
        # Drop the '_2' suffix iris adds to disambiguate duplicate names
        if cube.var_name.endswith('_2'):
            cube.var_name = cube.var_name[:-2]
        out_file = '{0}_{1}_{2}.nc'.format(gribfile, var_code, frequency)
        cube.remove_coord('month_number')
        cube.remove_coord('day_of_month')
        iris.save(cube, out_file, zlib=True)
def _fix_time_coord(self, cube, var_code):
    """
    Convert the cube's time axis to days since 1950-01-01 and add
    day-of-month / month-number categorisation coordinates.

    The listed codes (the same ones treated as accumulated fluxes in
    _change_units) have their time points shifted back by one atmospheric
    timestep.

    Fix: removed a pointless '.format(parse_date(self.startdate))' on a
    literal with no placeholders — the reference date is fixed.
    """
    time = cube.coord('time')
    target_units = 'days since 1950-01-01 00:00:00'
    time.convert_units(cf_units.Unit(target_units, calendar=time.units.calendar))
    time.units = target_units
    if var_code in (144, 146, 147, 169, 175, 176, 177, 179, 180, 181, 182, 201, 202, 205, 212, 228):
        time.points = time.points - (self.experiment.atmos_timestep / 24.0)
    iris.coord_categorisation.add_day_of_month(cube, 'time')
    iris.coord_categorisation.add_month_number(cube, 'time')
    return cube
@staticmethod
def _get_time_average(cube, frequency, var_code):
    """
    Aggregate *cube* in time according to *frequency*.

    Codes 201/202 are reduced with daily MAX/MIN first (presumably daily
    maximum/minimum temperature — confirm); everything else uses MEAN.

    NOTE(review): restored the missing @staticmethod decorator (the
    signature has no self but the method is called as
    self._get_time_average(cube, ...)) and the elif/else structure lost
    in this copy — verify against upstream.
    """
    if frequency == Frequencies.monthly:
        if var_code == 201:
            cube = cube.aggregated_by(['month_number', 'day_of_month'], iris.analysis.MAX)
        elif var_code == 202:
            cube = cube.aggregated_by(['month_number', 'day_of_month'], iris.analysis.MIN)
        cube = cube.aggregated_by(['month_number'], iris.analysis.MEAN)
    elif frequency == Frequencies.daily:
        if var_code == 201:
            cube = cube.aggregated_by(['month_number', 'day_of_month'], iris.analysis.MAX)
        elif var_code == 202:
            cube = cube.aggregated_by(['month_number', 'day_of_month'], iris.analysis.MIN)
        else:
            cube = cube.aggregated_by(['month_number', 'day_of_month'], iris.analysis.MEAN)
    return cube
def _change_units(self, cube, var_code):
    """
    Convert GRIB-decoded units to the ones expected after cmorization.

    Accumulated fields are divided by the accumulation interval
    (atmos_timestep hours) to obtain mean fluxes. The original var_name
    is restored at the end because arithmetic on the cube clears it.

    NOTE(review): the first branch header was missing in this copy; the
    '/ 9.81' + units='m' body suggests geopotential-to-height (code 129)
    — verify against upstream.
    """
    var_name = cube.var_name
    if var_code == 129:
        # geopotential (m2 s-2) to height (m)
        cube = cube / 9.81
        cube.units = 'm'
    elif var_code in (146, 147, 169, 175, 176, 177, 179, 212):
        # radiation: accumulated J m-2 -> W m-2
        cube = cube / (self.experiment.atmos_timestep * 3600)
        cube.units = 'W m-2'
    elif var_code in (180, 181):
        # momentum flux
        cube = cube / (self.experiment.atmos_timestep * 3600)
        cube.units = 'N m-2'
    elif var_code in (144, 182, 205, 228):
        # precipitation/evaporation/runoff: accumulated m -> kg m-2 s-1
        cube = cube * 1000 / (self.experiment.atmos_timestep * 3600)
        cube.units = 'kg m-2 s-1'
    cube.var_name = var_name
    return cube
# Merge each variable's per-month ungribbed files into one chunk-long file
# and cmorize it.
def _merge_and_cmorize_atmos(self, chunk_start, chunk_end, grid, frequency):
merged_file = 'MMA_{0}_{1}_{2}_{3}.nc'.format(frequency, date2str(chunk_start), date2str(chunk_end), grid)
files = glob.glob(os.path.join(self.config.scratch_dir,
                               '{0}_*_{1}.nc'.format(self._get_grib_filename(grid, chunk_start), frequency)))

# Callback for iris.load: drop 'history' so the cubes can be concatenated
def _load_cube(cube, field, filename):
if 'history' in cube.attributes:
del cube.attributes['history']
# NOTE(review): a 'for first_file in files:' loop header appears to be
# missing here — 'first_file' below is otherwise undefined; verify
# against upstream.
var_files = []
current_month = chunk_start
while current_month < chunk_end:
# Derive each month's filename by swapping the YYYYMM in the GRIB stem
var_files.append(first_file.replace('+{0}.grb'.format(date2str(chunk_start)[:-2]),
                                    '+{0}.grb'.format(date2str(current_month)[:-2])))
current_month = add_months(current_month, 1, self.experiment.calendar)
var_cubes = iris.load(var_files, callback=_load_cube)
iris.util.unify_time_units(var_cubes)
var_cube = var_cubes.concatenate_cube()
iris.save(var_cube, merged_file, zlib=True)
for var_file in var_files:
os.remove(var_file)
self._cmorize_nc_file(merged_file)
def _update_time_variables(self, handler):
    """
    Normalise the time axis of an open NetCDF *handler*.

    Converts time (and its bounds, when present) to days since 1850-01-01
    and fills a 'leadtime' variable with whole days elapsed since the
    experiment startdate.

    Fix: removed repository-viewer residue lines that had been pasted
    into the body; code otherwise unchanged.
    """
    time_var = handler.variables['time']
    if hasattr(time_var, 'calendar'):
        calendar = time_var.calendar
    else:
        calendar = 'standard'
    if "time_bnds" in handler.variables:
        time_var.bounds = "time_bnds"
        handler.variables['time_bnds'].units = time_var.units
        Utils.convert_units(handler.variables['time_bnds'], 'days since 1850-01-01 00:00:00', calendar, calendar)
    Utils.convert_units(time_var, 'days since 1850-1-1 00:00:00', calendar)
    if 'leadtime' in handler.variables:
        var = handler.variables['leadtime']
    else:
        var = handler.createVariable('leadtime', float, 'time')
    var.units = "days"
    var.long_name = "Time elapsed since the start of the forecast"
    var.standard_name = "forecast_period"
    leadtime = Utils.get_datetime_from_netcdf(handler)
    startdate = parse_date(self.startdate)
    # Rebuild plain datetimes so the subtraction works for any calendar type
    leadtime = [datetime(time.year, time.month, time.day, time.hour, time.minute, time.second) - startdate
                for time in leadtime]
    for lt, lead in enumerate(leadtime):
        var[lt] = lead.days
def _add_common_attributes(self, handler, frequency):
    """
    Write the standard global attributes onto an open NetCDF *handler*.

    Fix: removed repository-viewer residue lines pasted into the body;
    attribute assignments otherwise unchanged.
    """
    cmor = self.config.cmor
    experiment = self.config.experiment
    handler.associated_experiment = cmor.associated_experiment
    handler.batch = '{0}{1}'.format(experiment.institute, datetime.now().strftime('%Y-%m-%d(T%H:%M:%SZ)'))
    handler.contact = 'Pierre-Antoine Bretonniere, pierre-antoine.bretonniere@bsc.es , ' \
                      'Javier Vegas-Regidor, javier.vegas@bsc.es '
    handler.Conventions = 'CF-1.6'
    handler.creation_date = datetime.now().strftime('%Y-%m-%d(T%H:%M:%SZ)')
    handler.experiment_id = experiment.experiment_name
    startdate = parse_date(self.startdate)
    handler.forecast_reference_time = '{0.year}-{0.month:02}-{0.day:02}' \
                                      '(T{0.hour:02}:{0.minute:02}:{0.second:02}Z)'.format(startdate)
    handler.frequency = frequency.frequency
    handler.institute_id = experiment.institute
    handler.institution = experiment.institute
    handler.initialization_method = cmor.initialization_method
    handler.initialization_description = cmor.initialization_description
    handler.physics_version = cmor.physics_version
    handler.physics_description = cmor.physics_description
    handler.model_id = experiment.model
    handler.associated_model = cmor.associated_model
    handler.project_id = self.config.data_convention.upper()
    # CMOR realization numbers are 1-based
    handler.realization = str(self.member + 1)
    handler.source = cmor.source
    handler.startdate = 'S{0}'.format(self.startdate)
    handler.tracking_id = str(uuid.uuid1())
    handler.title = "{0} model output prepared for {2} {1}".format(experiment.model, experiment.experiment_name,
                                                                   self.config.data_convention.upper())
def _gribfiles_available(self):
    """Return True when any '*.grb' original file exists for this startdate/member.

    NOTE(review): the def line was missing from this copy; name grounded
    by the call in cmorize_atmos.
    """
    grb_path = os.path.join(self.original_files_path, '*.grb')
    gribfiles = glob.glob(grb_path)
    return len(gribfiles) > 0
def _cmorization_required(self, chunk, domains):
    """Return True when *chunk* still needs cmorization for any of *domains*."""
    if not self.config.cmor.chunk_cmorization_requested(chunk):
        return False
    if self.config.cmor.force:
        return True
    # Already cmorized in any requested domain -> nothing to do
    return not any(
        self.data_manager.is_cmorized(self.startdate, self.member, chunk, domain)
        for domain in domains
    )
"""Exception to be launched when an error is encountered during cmorization"""