Newer
Older
Javier Vegas-Regidor
committed
import shutil
import threading
from datetime import datetime
import netCDF4
import numpy as np
import os
from autosubmit.date.chunk_date_lib import parse_date, chunk_start_date, chunk_end_date, previous_day, add_months, \
date2str
from earthdiagnostics.constants import Basins
"""
Class to manage the data repositories
Javier Vegas-Regidor
committed
:param config:
:type config: Config
Javier Vegas-Regidor
committed
def __init__(self, config):
    """
    Create the manager of the data repositories.

    :param config: diagnostic configuration; ``config.experiment`` is cached as ``self.experiment``
    :type config: Config
    """
    self.config = config
    self.experiment = config.experiment
    # Timestep (hours) of the atmospheric output. It is only known after _get_atmos_timestep has
    # inspected a GRIB file, but _create_link and _ungrib_vars read it, so give it a defined value
    # instead of failing with AttributeError.
    self.atmos_timestep = None
    Variable.load_variables()
# noinspection PyPep8Naming
def prepare_CMOR_files(self):
    """
    Prepares the data to be used by the diagnostic.

    For every startdate-member pair that is not cmorized yet (or always, if ``config.cmor.force`` is set):

    * if ``config.cmor.ocean`` is set, the MMO and diags tar files are unpacked and cmorized;
    * if ``config.cmor.atmosphere`` is set, the GRIB files are cmorized or, when there are none,
      the MMA tar files are unpacked and cmorized.

    Afterwards, any ``.gz`` file found in the cmorfiles folder is unpacked.

    :return:
    """
    errors = list()

    for startdate, member in self.experiment.get_member_list():
        member_str = self.experiment.get_member_str(member)
        if self.config.cmor.force or not self._is_cmorized(startdate, member):
            Log.info('CMORizing startdate {0} member {1}', startdate, member_str)

            if self.config.cmor.ocean:
                errors += self._unpack_ocean_files('MMO', startdate, member)
                errors += self._unpack_ocean_files('diags', startdate, member)

            if not self.config.cmor.atmosphere:
                continue

            outputs_dir = os.path.join(self.config.data_dir, self.experiment.expid, 'original_files', startdate,
                                       member_str, 'outputs')
            gribfiles = glob.glob(os.path.join(outputs_dir, '*.grb'))
            if len(gribfiles) == 0:
                # No GRIB files: atmospheric data must come from the MMA tar files
                tar_files = sorted(glob.glob(os.path.join(outputs_dir, 'MMA*')))
                for count, tar_file in enumerate(tar_files, 1):
                    Log.info('Unpacking atmospheric file {0}/{1}'.format(count, len(tar_files)))
                    errors += self._unpack_tar(tar_file, startdate, member)
                    Log.result('Atmospheric file {0}/{1} finished'.format(count, len(tar_files)))
            else:
                self._cmorize_grib(startdate, member)

            Log.result('CMORized startdate {0} member {1}!\n\n', startdate, member_str)

    for error in errors:
        Log.error('File {0} could not be unzipped.', error)

    for startdate, member in self.experiment.get_member_list():
        member_path = os.path.join(self.config.data_dir, self.experiment.expid, 'cmorfiles')
        Log.info('Preparing CMOR files for startdate {0} and member {1}'.format(startdate, member))
        filepaths = glob.glob(os.path.join(member_path, '*.gz'))
        if len(filepaths) == 0:
            continue
        self._unpack_cmorfiles(filepaths, member_path)
def _unpack_ocean_files(self, prefix, startdate, member):
    """
    Unpacks and cmorizes, in name order, the oceanic tar files of a startdate-member whose name starts with prefix.

    :param prefix: file name prefix ('MMO', 'diags', ...)
    :type prefix: str
    :param startdate: startdate
    :type startdate: str
    :param member: member
    :type member: int
    :return: files that could not be unzipped
    :rtype: list
    """
    tar_pattern = os.path.join(self.config.data_dir, self.experiment.expid, 'original_files', startdate,
                               self.experiment.get_member_str(member), 'outputs', '{0}*'.format(prefix))
    tar_files = sorted(glob.glob(tar_pattern))

    errors = list()
    # enumerate keeps the progress counter in step with the file being processed
    for count, tar_file in enumerate(tar_files, 1):
        Log.info('Unpacking oceanic file {0}/{1}'.format(count, len(tar_files)))
        errors += self._unpack_tar(tar_file, startdate, member)
        Log.result('Oceanic file {0}/{1} finished'.format(count, len(tar_files)))
    return errors
def _get_grib_filename(self, grid, month):
    """
    Name of the GRIB file holding one month of atmospheric output.

    :param grid: GRIB grid code ('GG' or 'SH' in this file)
    :type grid: str
    :param month: date inside the wanted month
    :return: 'ICM<grid><expid>+<date>.grb', where <date> is date2str(month) without its last two characters
             (presumably the day, leaving YYYYMM — confirm against date2str)
    :rtype: str
    """
    month_tag = date2str(month)[:-2]
    return 'ICM{0}{1}+{2}.grb'.format(grid, self.experiment.expid, month_tag)
def _cmorize_grib(self, startdate, member):
    """
    Cmorizes the atmospheric GRIB output of a startdate-member pair, chunk by chunk.

    While a GG or SH GRIB file exists for the chunk start, each month of the chunk is:

    * copied to the scratch folder,
    * merged with the previous month's scratch file, if it is still there,
    * split by parameter into netCDF files,
    * converted to the atmospheric-timestep, daily and monthly frequencies by _ungrib_vars.

    The monthly pieces are then merged and cmorized by _merge_and_cmorize_atmos.

    :param startdate: startdate
    :type startdate: str
    :param member: member
    :type member: int
    """
    chunk_start = parse_date(startdate)
    # Atmospheric timestep in hours, computed from the first GRIB file processed
    atmos_timestep = None

    member_str = self.experiment.get_member_str(member)
    data_folder = os.path.join(self.config.data_dir, self.experiment.expid, 'original_files', startdate,
                               member_str, 'outputs')
    cdo_reftime = parse_date(startdate).strftime('%Y-%m-%d,00:00')

    while os.path.exists(os.path.join(data_folder, self._get_grib_filename('GG', chunk_start))) or \
            os.path.exists(os.path.join(data_folder, self._get_grib_filename('SH', chunk_start))):

        chunk_end = chunk_end_date(chunk_start, self.experiment.chunk_size, 'month', 'standard')
        chunk_end = previous_day(chunk_end, 'standard')
        Log.info('CMORizing chunk {0}-{1}', date2str(chunk_start), date2str(chunk_end))

        for grid in ('SH', 'GG'):
            Log.info('Processing {0} variables', grid)
            if not os.path.exists(os.path.join(data_folder, self._get_grib_filename(grid, chunk_start))):
                continue

            for month in range(0, self.experiment.chunk_size):
                current_month = add_months(chunk_start, month, 'standard')
                original_gribfile = os.path.join(data_folder, self._get_grib_filename(grid, current_month))
                Log.info('Processing month {0}', date2str(current_month))

                gribfile = os.path.join(self.config.scratch_dir, os.path.basename(original_gribfile))
                if not os.path.isfile(gribfile):
                    Log.info('Copying file...')
                    shutil.copy(original_gribfile, gribfile)

                if atmos_timestep is None:
                    # _get_atmos_timestep stores its result in self.atmos_timestep
                    self._get_atmos_timestep(gribfile)
                    atmos_timestep = self.atmos_timestep

                prev_gribfile = os.path.join(self.config.scratch_dir,
                                             self._get_grib_filename(grid,
                                                                     add_months(current_month, -1, 'standard')))
                if os.path.exists(prev_gribfile):
                    # writes the merged data to 'ICM' in the working directory
                    self._merge_grib_files(current_month, prev_gribfile, gribfile)
                    full_file = 'ICM'
                else:
                    full_file = gribfile

                Log.info('Unpacking... ')
                # remap on regular Gauss grid
                if grid == 'SH':
                    Utils.cdo.splitparam(input='-sp2gpl {0}'.format(full_file), output=gribfile + '_',
                                         options='-f nc4')
                else:
                    Utils.cdo.splitparam(input=full_file, output=gribfile + '_', options='-R -f nc4')
                    # total precipitation (remove negative values)
                    # NOTE(review): placed in the GG branch assuming codes 142/143 only exist in GG files — confirm
                    Utils.cdo.setcode(228, input='-setmisstoc,0 -setvrange,0,Inf -add '
                                                 '{0}_{{142,143}}.128.nc'.format(gribfile),
                                      output='{0}_228.128.nc'.format(gribfile))
                Utils.remove_file('ICM')

                # Keep the scratch copy while next month still needs it for merging
                next_gribfile = os.path.join(data_folder,
                                             self._get_grib_filename(grid,
                                                                     add_months(current_month, 1, 'standard')))
                if not os.path.exists(next_gribfile):
                    os.remove(gribfile)

                self._ungrib_vars(cdo_reftime, gribfile, current_month.month, '{0}hr'.format(atmos_timestep))
                self._ungrib_vars(cdo_reftime, gribfile, current_month.month, '1d')
                self._ungrib_vars(cdo_reftime, gribfile, current_month.month, '1m')

                for splited_file in glob.glob('{0}_*.128.nc'.format(gribfile)):
                    os.remove(splited_file)

                Log.result('Month {0}, {1} variables finished', date2str(current_month), grid)

            self._merge_and_cmorize_atmos(startdate, member, chunk_start, chunk_end, grid, '1m')
            self._merge_and_cmorize_atmos(startdate, member, chunk_start, chunk_end, grid, '1d')
            self._merge_and_cmorize_atmos(startdate, member, chunk_start, chunk_end, grid,
                                          '{0}hr'.format(atmos_timestep))

        chunk_start = chunk_end_date(chunk_start, self.experiment.chunk_size, 'month', 'standard')
@staticmethod
def _merge_grib_files(current_month, prev_gribfile, gribfile):
    """
    Writes to 'ICM' (working directory) the GRIB messages of prev_gribfile and gribfile dated on or after
    the first day of current_month, then removes prev_gribfile.

    This picks up the first timestep of the month, which is stored in the previous month's file.

    Declared static because it uses no instance state and is called as self._merge_grib_files(a, b, c).

    :param current_month: date inside the month being processed
    :param prev_gribfile: path of the previous month's GRIB file (deleted afterwards)
    :type prev_gribfile: str
    :param gribfile: path of the current month's GRIB file
    :type gribfile: str
    """
    Log.info('Merging data from different files...')
    rules_file = 'rules_files'
    with open(rules_file, 'w') as fd:
        fd.write('if (dataDate >= {0.year}{0.month:02}01) {{ write ; }}\n'.format(current_month))
    # get first timestep for each month from previous file (if possible)
    if os.path.exists('ICM'):
        os.remove('ICM')
    try:
        # Full paths so the command does not depend on the working directory being the scratch folder
        Utils.execute_shell_command(['grib_filter', '-o', 'ICM', rules_file, prev_gribfile, gribfile])
    finally:
        os.remove(rules_file)

    Utils.remove_file(prev_gribfile)
def _get_atmos_timestep(self, gribfile):
    """
    Computes the atmospheric output timestep from a GRIB file.

    The timestep is the difference, in whole hours, between the analDate of the first message and the
    first later message with a different analDate. It is stored in ``self.atmos_timestep`` and also returned.

    :param gribfile: path of the GRIB file
    :type gribfile: str
    :return: timestep in hours
    :rtype: int
    :raises ValueError: if every message in the file has the same analDate
    """
    Log.info('Getting timestep...')
    grib_handler = pygrib.open(gribfile)
    try:
        first_date = grib_handler.message(1).analDate
        message = grib_handler.readline()
        # readline returns None at the end of the file
        while message is not None and message.analDate == first_date:
            message = grib_handler.readline()
        if message is None:
            raise ValueError('Can not get the timestep of {0}: all messages share the same date'.format(gribfile))
        next_date = message.analDate
    finally:
        grib_handler.close()

    atmos_timestep = int((next_date - first_date).total_seconds() / 3600)
    self.atmos_timestep = atmos_timestep
    return atmos_timestep
def _ungrib_vars(self, cdo_reftime, gribfile, month, frequency):
    """
    Builds the ``frequency`` version of every configured variable of one month of GRIB output.

    For each code given by ``config.cmor.get_variables(frequency)`` whose split file ``<gribfile>_<code>.128.nc``
    exists, a cdo pipeline is run that:

    * selects ``month`` and aggregates in time for monthly/daily frequencies;
    * shifts some codes back one atmospheric timestep;
    * converts the units of geopotential, radiation, momentum flux and precipitation-like codes;
    * selects the configured levels.

    The result is ``<gribfile>_<code>_<frequency>.nc``. If it contains both ``<name>`` and ``<name>_2``,
    only ``<name>`` is kept.

    :param cdo_reftime: reference time for cdo setreftime ('YYYY-MM-DD,hh:mm')
    :type cdo_reftime: str
    :param gribfile: path of the scratch GRIB file, used as prefix of the split files
    :type gribfile: str
    :param month: month number to select
    :type month: int
    :param frequency: frequency to build ('1m', '1d', '<n>hr', ...)
    :type frequency: str
    """
    Log.info('Preparing {0} variables'.format(frequency))
    var_codes = self.config.cmor.get_variables(frequency)

    for var_code in var_codes:
        split_file = '{0}_{1}.128.nc'.format(gribfile, var_code)
        if not os.path.exists(split_file):
            continue
        new_units = None
        cdo_operator = '-selmon,{0}'.format(month)
        if frequency in ('month', 'monthly', 'mon', '1m'):
            if var_code == 201:
                cdo_operator = "-monmean -daymax {0}".format(cdo_operator)
            elif var_code == 202:
                # 202 is aggregated with daymin, as in the daily branch below
                cdo_operator = "-monmean -daymin {0}".format(cdo_operator)
            else:
                cdo_operator = "-monmean {0} ".format(cdo_operator)
        elif frequency in ('day', 'daily', '1d'):
            if var_code == 201:
                cdo_operator = "-daymax {0} ".format(cdo_operator)
            elif var_code == 202:
                cdo_operator = "-daymin {0} ".format(cdo_operator)
            else:
                cdo_operator = "-daymean {0} ".format(cdo_operator)

        if var_code in (144, 146, 147, 169, 175, 176, 177, 179, 180, 181, 182, 201, 202, 205, 212, 228):
            # NOTE(review): these codes are shifted back one timestep, presumably because they are
            # accumulated over the preceding timestep — confirm
            cdo_operator = '{0} -shifttime,-{1}hours'.format(cdo_operator, self.atmos_timestep)

        if var_code == 129:
            # geopotential
            new_units = "m"
            cdo_operator = "-divc,9.81 {0}".format(cdo_operator)
        elif var_code in (146, 147, 169, 175, 176, 177, 179, 212):
            # radiation
            new_units = "W m-2"
            cdo_operator = "-divc,{0} {1}".format(self.atmos_timestep * 3600, cdo_operator)
        elif var_code in (180, 181):
            # momentum flux
            new_units = "N m-2"
            cdo_operator = "-divc,{0} {1}".format(self.atmos_timestep * 3600, cdo_operator)
        elif var_code in (144, 182, 205, 228):
            # precipitation/evaporation/runoff
            new_units = "kg m-2 s-1"
            # keep the month selection, aggregation and time shift built above in the pipeline
            cdo_operator = "-mulc,1000 -divc,{0} {1}".format(self.atmos_timestep * 3600, cdo_operator)

        levels = self.config.cmor.get_levels(frequency, var_code)
        if levels:
            cdo_operator = "{0} -sellevel,{1}".format(cdo_operator, levels)

        h_var_file = '{0}_{1}_{2}.nc'.format(gribfile, var_code, frequency)
        Utils.execute_shell_command('cdo -t ecmwf setreftime,{0} '
                                    '{1} {2} '
                                    '{3}'.format(cdo_reftime, cdo_operator, split_file, h_var_file))

        handler = Utils.openCdf(h_var_file)
        if new_units:
            for var in handler.variables.values():
                if 'code' in var.ncattrs() and var.code == var_code:
                    var.units = new_units
                    break

        # Look for a variable duplicated as <name> and <name>_2
        var_name = None
        for key in handler.variables.keys():
            if key + '_2' in handler.variables and key not in handler.dimensions:
                var_name = key
        handler.close()

        if var_name is not None:
            # Trim the file just produced for this frequency (not always the monthly one)
            Utils.nco.ncks(input=h_var_file, output=h_var_file, options='-O -v {0}'.format(var_name))
def _merge_and_cmorize_atmos(self, startdate, member, chunk_start, chunk_end, grid, frequency):
    """
    Joins, for each variable, the monthly files of a chunk and cmorizes the result.

    Every scratch file of the chunk's first month matching '<grib name>_*_<frequency>.nc' is moved to
    'MMA_<frequency>_<start>_<end>_<grid>.nc' (working directory). The files of the following months
    before chunk_end are then concatenated to it, and it is passed to _cmorize_nc_file.

    :param startdate: startdate
    :type startdate: str
    :param member: member
    :type member: int
    :param chunk_start: first day of the chunk
    :param chunk_end: last day of the chunk
    :param grid: GRIB grid code
    :type grid: str
    :param frequency: frequency tag used in the file names
    :type frequency: str
    """
    merged_file = 'MMA_{0}_{1}_{2}_{3}.nc'.format(frequency, date2str(chunk_start), date2str(chunk_end), grid)
    first_month_tag = '+{0}.grb'.format(date2str(chunk_start)[:-2])
    first_month_pattern = os.path.join(self.config.scratch_dir,
                                       '{0}_*_{1}.nc'.format(self._get_grib_filename(grid, chunk_start),
                                                             frequency))
    for first_file in glob.glob(first_month_pattern):
        shutil.move(first_file, merged_file)
        month = add_months(chunk_start, 1, 'standard')
        while month < chunk_end:
            month_tag = '+{0}.grb'.format(date2str(month)[:-2])
            Utils.concat_variables(first_file.replace(first_month_tag, month_tag), merged_file, True)
            month = add_months(month, 1, 'standard')
        # _cmorize_nc_file deletes merged_file, so the name can be reused by the next variable
        self._cmorize_nc_file(merged_file, member, startdate)
def _unpack_cmorfiles(self, filepaths, member_path):
    """
    Unpacks already cmorized data and fixes its folder and file names.

    The steps are:

    * the given ``.gz`` files are unzipped in parallel;
    * every ``.tar`` file in ``member_path`` is extracted there, also in parallel;
    * if the experiment name differs from the model name, files under ``<institute>/<model>/<model>`` are
      moved to ``<institute>/<model>/<experiment>`` (their names are fixed too);
    * files lacking the startdate in their name get it added.

    :param filepaths: ``.gz`` files to unzip
    :type filepaths: list
    :param member_path: folder where the files live and where tar files are extracted
    :type member_path: str
    """
    numthreads = Utils.available_cpu_count()

    def run_in_parallel(target, files, *extra_args):
        # Round-robin split of ``files`` between ``numthreads`` threads; waits for all of them
        threads = []
        for index in range(numthreads):
            thread = threading.Thread(target=target, args=(files[index::numthreads],) + extra_args)
            threads.append(thread)
            thread.start()
        for thread in threads:
            thread.join()

    run_in_parallel(DataManager._unzip, filepaths)
    # sorted() returns the list; list.sort() returns None
    tar_files = sorted(glob.glob(os.path.join(member_path, '*.tar')))
    run_in_parallel(DataManager._untar, tar_files, member_path)

    model = self.experiment.model
    experiment_name = self.experiment.experiment_name
    if experiment_name != model:
        bad_path = os.path.join(member_path, self.experiment.institute, model, model)
        # bottom-up walk so each folder is already empty when it is removed
        for (dirpath, _, filenames) in os.walk(bad_path, False):
            for filename in filenames:
                filepath = os.path.join(dirpath, filename)
                good = filepath.replace('_{0}_output_'.format(model),
                                        '_{0}_{1}_'.format(model, experiment_name))
                good = good.replace('/{0}/{0}'.format(model),
                                    '/{0}/{1}'.format(model, experiment_name))
                Utils.move_file(filepath, good)
            os.rmdir(dirpath)

    # NOTE(review): the original indentation was lost; this renaming is assumed to apply to every experiment
    good_dir = os.path.join(member_path, self.experiment.institute, model, experiment_name)
    if not os.path.isdir(good_dir):
        return
    for sdate in os.listdir(good_dir):
        for (dirpath, _, filenames) in os.walk(os.path.join(good_dir, sdate), False):
            for filename in filenames:
                filepath = os.path.join(dirpath, filename)
                good = filepath.replace('_{0}_{1}_r'.format(model, experiment_name),
                                        '_{0}_{1}_{2}_r'.format(model, experiment_name, sdate))
                if good != filepath:
                    Log.info('Moving {0} to {1}'.format(filename, good))
                    Utils.move_file(filepath, good)
def _unpack_tar(self, tarfile, startdate, member):
    """
    Extracts a tar file of model output into a clean '<scratch_dir>/CMOR' folder, unzips its contents and
    cmorizes every resulting netCDF file.

    For MMA (atmospheric) tar files:

    * the SH files are converted with cdo sp2gpl;
    * the SH and GG files are merged in time and joined in a single 'MMA_1m_<start>_<end>.nc' file.

    :param tarfile: path of the tar file
    :type tarfile: str
    :param startdate: startdate
    :type startdate: str
    :param member: member
    :type member: int
    :return: files that could not be unzipped
    :rtype: list
    """
    scratch_dir = os.path.join(self.config.scratch_dir, 'CMOR')
    if os.path.exists(scratch_dir):
        shutil.rmtree(scratch_dir)
    os.makedirs(scratch_dir)

    # Without extracting first there would be nothing to unzip or cmorize in the fresh folder
    self._untar((tarfile,), scratch_dir)
    errors = self._unzip(glob.glob(os.path.join(scratch_dir, '*.gz')))

    tar_name = os.path.basename(tarfile)
    if tar_name.startswith('MMA'):
        temp = TempFile.get()
        for filename in glob.glob(os.path.join(scratch_dir, 'MMA_*_SH_*.nc')):
            Utils.cdo.sp2gpl(options='-O', input=filename, output=temp)
            shutil.move(temp, filename)
        sh_files = glob.glob(os.path.join(scratch_dir, 'MMA_*_SH_*.nc'))
        Utils.cdo.mergetime(input=sh_files, output=os.path.join(scratch_dir, 'sh.nc'))
        gg_files = glob.glob(os.path.join(scratch_dir, 'MMA_*_GG_*.nc'))
        Utils.cdo.mergetime(input=gg_files, output=os.path.join(scratch_dir, 'gg.nc'))
        for filename in sh_files + gg_files:
            os.remove(filename)
        Utils.nco.ncks(input=os.path.join(scratch_dir, 'sh.nc'),
                       output=os.path.join(scratch_dir, 'gg.nc'), options='-A')
        os.remove(os.path.join(scratch_dir, 'sh.nc'))

        # Parse the base name (minus '.tar') so underscores in the folder path do not shift the fields;
        # the fifth '_'-separated field is '<start>-<end>'
        tar_startdate = tar_name[0:-4].split('_')[4].split('-')
        new_name = 'MMA_1m_{0[0]}_{0[1]}.nc'.format(tar_startdate)
        shutil.move(os.path.join(scratch_dir, 'gg.nc'), os.path.join(scratch_dir, new_name))

    for filename in glob.glob(os.path.join(scratch_dir, '*.nc')):
        self._cmorize_nc_file(filename, member, startdate)

    return errors
def _cmorize_nc_file(self, filename, member, startdate):
    """
    Cmorizes one netCDF file of model output and deletes it afterwards.

    The file is:

    * converted to compressed netCDF4;
    * given CMOR coordinate names;
    * given the common global attributes and normalised time variables.

    Every non-coordinate variable is then sent to the CMOR repository through extract_variable.

    :param filename: path of the netCDF file
    :type filename: str
    :param member: member
    :type member: int
    :param startdate: startdate
    :type startdate: str
    """
    # Coordinates and auxiliary variables that must not be extracted as CMOR variables
    not_extracted = frozenset(('lon', 'lat', 'time', 'time_bnds', 'leadtime', 'lev', 'icethi',
                               'deptht', 'depthu', 'depthw', 'depthv', 'time_centered', 'time_centered_bounds',
                               'deptht_bounds', 'depthu_bounds', 'depthv_bounds', 'depthw_bounds',
                               'time_counter_bounds', 'ncatice',
                               'nav_lat_grid_V', 'nav_lat_grid_U', 'nav_lat_grid_T',
                               'nav_lon_grid_V', 'nav_lon_grid_U', 'nav_lon_grid_T',
                               'depth', 'depth_2', 'depth_3', 'depth_4',
                               'mlev', 'hyai', 'hybi', 'hyam', 'hybm'))

    Log.info('Processing file {0}', filename)
    temp = TempFile.get()
    Utils.execute_shell_command(["nccopy", "-4", "-d4", "-s", filename, temp])
    shutil.move(temp, filename)

    file_parts = os.path.basename(filename).split('_')
    if self.experiment.expid in [file_parts[1], file_parts[2]]:
        frequency = 'm'
    else:
        # e.g. '1m' -> 'm', '1d' -> 'd', '6hr' -> 'h'
        frequency = file_parts[1][1].lower()

    variables = dict()
    variables['time_counter'] = 'time'
    variables['time_counter_bnds'] = 'time_bnds'
    variables['tbnds'] = 'bnds'
    variables['nav_lat'] = 'lat'
    variables['nav_lon'] = 'lon'
    variables['x'] = 'i'
    variables['y'] = 'j'
    Utils.rename_variables(filename, variables, False, True)

    handler = Utils.openCdf(filename)
    self._add_common_attributes(frequency, handler, member, startdate)
    self._update_time_variables(handler, startdate)
    handler.sync()

    temp = TempFile.get()
    Log.info('Splitting file {0}', filename)
    for variable in handler.variables.keys():
        if variable in not_extracted:
            continue
        self.extract_variable(filename, handler, frequency, member, startdate, temp, variable)
    Log.result('File {0} cmorized!', filename)
    handler.close()
    os.remove(filename)
def extract_variable(self, file_path, handler, frequency, member, startdate, temp, variable):
    """
    Extracts a variable from a file and creates the CMOR file

    Variables unknown to Variable.get_variable are ignored. Files whose name matches no known pattern are
    logged as an error and skipped.

    :param file_path: path to the file
    :type file_path: str
    :param handler: netCDF4 handler for the file
    :type handler: netCDF4.Dataset
    :param frequency: variable's frequency ('d', 'm' or 'h')
    :type frequency: str
    :param member: member
    :type member: int
    :param startdate: startdate
    :type startdate: str
    :param temp: temporal file to use
    :type temp: str
    :param variable: variable's name
    :type variable: str
    :raises Exception: if the frequency is not supported
    """
    file_parts = os.path.basename(file_path).split('_')
    var_cmor = Variable.get_variable(variable)
    if var_cmor is None:
        return

    cmor_frequencies = {'d': 'day', 'm': 'mon', 'h': '6hr'}
    if frequency not in cmor_frequencies:
        raise Exception('Frequency {0} not supported'.format(frequency))
    frequency = cmor_frequencies[frequency]

    Utils.nco.ncks(input=file_path, output=temp, options='-v {0}'.format(variable))
    if var_cmor.domain == 'ocean':
        Utils.rename_variables(temp, {'deptht': 'lev', 'depthu': 'lev', 'depthw': 'lev', 'depthv': 'lev',
                                      'depth': 'lev'}, False, True)
    elif var_cmor.domain in ('land', 'landIce'):
        Utils.rename_variables(temp, {'depth': 'sdepth', 'depth_2': 'sdepth', 'depth_3': 'sdepth',
                                      'depth_4': 'sdepth'}, False, True)
    elif var_cmor.domain == 'atmos':
        Utils.rename_variables(temp, {'depth': 'plev'}, False, True)

    handler_cmor = Utils.openCdf(temp)
    Utils.copy_variable(handler, handler_cmor, 'lon', False)
    Utils.copy_variable(handler, handler_cmor, 'lat', False)
    if 'time' in handler_cmor.dimensions.keys():
        Utils.copy_variable(handler, handler_cmor, 'leadtime', False)
    handler_cmor.close()

    if var_cmor.basin is None:
        region = None
    else:
        region = var_cmor.basin.fullname

    if file_parts[0] == self.experiment.expid or file_parts[0].startswith('ORCA') or \
            file_parts[0] in ('MMA', 'MMO'):
        # Model output
        date_str = '{0}-{1}'.format(file_parts[2][0:6], file_parts[3][0:6])
    elif file_parts[1] == self.experiment.expid:
        # Files generated by the old version of the diagnostics
        date_str = '{0}-{1}'.format(file_parts[4][0:6], file_parts[5][0:6])
    else:
        Log.error('Variable {0} can not be cmorized. Original filename does not match a recognized pattern',
                  var_cmor.short_name)
        # date_str is unknown: do not try to send the file
        return

    self.send_file(temp, var_cmor.domain, var_cmor.short_name, startdate, member,
                   frequency=frequency, rename_var=variable,
                   date_str=date_str,
                   region=region)
@staticmethod
def _update_time_variables(handler, startdate):
    """
    Normalises the time-related variables of an open netCDF file.

    The following changes are made:

    * 'time' (and 'time_bnds', if present) are rewritten as days since 1850-01-01 in the standard calendar;
    * CF attributes are set on 'time';
    * the 'axis_nbounds' dimension and 'time_counter_bounds' variable are renamed to 'bnds' and 'time_bnds';
    * a 'leadtime' variable holding whole days elapsed since ``startdate`` is created (or refreshed).

    :param handler: open netCDF4 dataset
    :param startdate: startdate, in a format understood by parse_date
    :type startdate: str
    """
    reference_units = 'days since 1850-01-01'
    time_var = handler.variables['time']

    dates = Utils.get_datetime_from_netcdf(handler)
    if type(dates[0]) is not datetime:
        for index in range(dates.shape[0]):
            # noinspection PyProtectedMember
            dates[index] = dates[index]._to_real_datetime()
    time_var[:] = netCDF4.date2num(dates, reference_units, 'standard')

    if 'axis_nbounds' in handler.dimensions:
        handler.renameDimension('axis_nbounds', 'bnds')
    if 'time_counter_bounds' in handler.variables:
        handler.renameVariable('time_counter_bounds', 'time_bnds')
    handler.sync()

    if 'time_bnds' in handler.variables:
        bounds_var = handler.variables['time_bnds']
        time_var.bounds = "time_bnds"
        bounds = Utils.get_datetime_from_netcdf(handler, 'time_bnds')
        if type(bounds[0, 0]) is not datetime:
            row_count, column_count = bounds.shape[0], bounds.shape[1]
            for row in range(row_count):
                for column in range(column_count):
                    # noinspection PyProtectedMember
                    bounds[row, column] = bounds[row, column]._to_real_datetime()
        bounds_var[:] = netCDF4.date2num(bounds, reference_units, 'standard')

    time_var.units = reference_units
    time_var.time_origin = "1850-01-01"
    time_var.calendar = 'standard'
    time_var.long_name = "Verification time of the forecast"
    time_var.standard_name = "time"
    time_var.axis = "T"

    if 'leadtime' in handler.variables:
        leadtime_var = handler.variables['leadtime']
    else:
        leadtime_var = handler.createVariable('leadtime', float, 'time')
    leadtime_var.units = "days"
    leadtime_var.long_name = "Time elapsed since the start of the forecast"
    leadtime_var.standard_name = "forecast_period"

    elapsed = (Utils.get_datetime_from_netcdf(handler) - parse_date(startdate))
    for index in range(elapsed.shape[0]):
        leadtime_var[index] = elapsed[index].days
def _add_common_attributes(self, frequency, handler, member, startdate):
    """
    Sets the global attributes shared by every cmorized file.

    :param frequency: not used by this method
    :param handler: open netCDF dataset whose global attributes are set
    :param member: not used by this method
    :param startdate: startdate, in a format understood by parse_date
    :type startdate: str
    """
    cmor = self.config.cmor
    experiment = self.config.experiment
    # The format ends in 'Z' (UTC), so take the current time in UTC, not local time;
    # compute it once so 'batch' and 'creation_date' agree
    timestamp = datetime.utcnow().strftime('%Y-%m-%d(T%H:%M:%SZ)')

    handler.associated_experiment = cmor.associated_experiment
    handler.batch = '{0}{1}'.format(experiment.institute, timestamp)
    handler.contact = 'Pierre-Antoine Bretonnière, pierre-antoine.bretonniere@bsc.es , ' \
                      'Javier Vegas-Regidor, javier.vegas@bsc.es '
    handler.Conventions = 'CF-1.6'
    handler.creation_date = timestamp
    handler.experiment_id = experiment.experiment_name
    handler.forecast_reference_time = parse_date(startdate).strftime('%Y-%m-%d(T%H:%M:%SZ)')
    handler.institute_id = experiment.institute
    handler.institution = experiment.institute
    handler.initialization_method = cmor.initialization_method
    handler.initialization_description = cmor.initialization_description
    handler.physics_version = cmor.physics_version
    handler.physics_description = cmor.physics_description
    handler.model_id = experiment.model
    handler.associated_model = cmor.associated_model
    handler.source = cmor.source
    handler.startdate = 'S{0}'.format(startdate)
    handler.tracking_id = str(uuid.uuid1())
    handler.title = "{0} model output prepared for SPECS {1}".format(experiment.model, experiment.experiment_name)
@staticmethod
def _unzip(files):
    """
    Decompresses each file with gunzip, logging and collecting the ones that fail.

    :param files: paths of the ``.gz`` files
    :type files: list
    :return: paths that could not be unzipped
    :rtype: list
    """
    errors = list()
    for filepath in files:
        Log.debug('Unzipping {0}', filepath)
        try:
            # argument list (as used for nccopy) so paths with spaces are passed intact
            Utils.execute_shell_command(['gunzip', filepath])
        except Exception as ex:
            Log.error('Can not unzip {0}: {1}', filepath, ex)
            errors.append(filepath)
    return errors
@staticmethod
def _untar(files, member_path):
    """
    Extracts each tar file into member_path.

    :param files: paths of the tar files
    :type files: list
    :param member_path: destination folder
    :type member_path: str
    """
    for filepath in files:
        Log.debug('Unpacking {0}', filepath)
        # argument list (as used for nccopy) so paths with spaces are passed intact
        Utils.execute_shell_command(['tar', '-xvf', filepath, '-C', member_path])
def get_file(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None):
    """
    Copies one chunk of a variable from the CMOR repository to the scratch folder.

    The file is read from ``<startdate path>/<frequency>/<domain>/<var>[/<grid>]/r<member+1>i1p1/``. Its name is
    ``<var>_<table>_<model>_<experiment>_S<startdate>_r<member+1>i1p1_<YYYYMM>-<YYYYMM>.nc``, spanning the first
    and last month of the chunk.

    :param domain: CMOR domain
    :type domain: str
    :param var: variable name
    :type var: str
    :param startdate: file's startdate
    :type startdate: str
    :param member: file's member
    :type member: int
    :param chunk: file's chunk
    :type chunk: int
    :param grid: file's grid (only needed if it is not the original)
    :type grid: str
    :param box: file's box (only needed to retrieve sections or averages)
    :type box: Box
    :param frequency: file's frequency (only needed if it is different from the default)
    :type frequency: str
    :return: path to the copy created on the scratch folder
    :rtype: str
    """
    frequency = frequency or self.config.frequency
    domain = DataManager.correct_domain(domain)
    table = self.domain_abbreviation(domain, frequency)
    chunk_size = self.experiment.chunk_size

    first_day = chunk_start_date(parse_date(startdate), chunk, chunk_size, 'month', 'standard')
    last_day = previous_day(chunk_end_date(first_day, chunk_size, 'month', 'standard'), 'standard')

    if box:
        var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str()

    member_tag = 'r{0}i1p1'.format(member + 1)
    path_parts = [self.get_startdate_path(startdate), frequency, domain, var]
    if grid:
        path_parts.append(grid)
    path_parts.append(member_tag)
    path_parts.append('{0}_{1}_{2}_{3}_S{4}_{5}_{6:04}{7:02}-{8:04}{9:02}.nc'.format(
        var, table, self.experiment.model, self.experiment.experiment_name, startdate, member_tag,
        first_day.year, first_day.month, last_day.year, last_day.month))

    scratch_copy = TempFile.get()
    shutil.copyfile(os.path.join(*path_parts), scratch_copy)
    return scratch_copy
def get_startdate_path(self, startdate):
    """
    Builds the CMOR folder of a startdate.

    The path is <data_dir>/<expid>/cmorfiles/<institute>/<model>/<experiment_name>/S<startdate>.

    :param startdate: target startdate
    :type startdate: str
    :return: path to the startdate's CMOR folder
    :rtype: str
    """
    exp = self.experiment
    components = (self.config.data_dir, exp.expid, 'cmorfiles', exp.institute, exp.model,
                  exp.experiment_name, 'S' + startdate)
    return os.path.join(*components)
def send_file(self, filetosend, domain, var, startdate, member, chunk=None, grid=None, region=None, box=None,
              rename_var=None, frequency=None, year=None, date_str=None):
    """
    Copies a given file to the CMOR repository. It also automatically converts to netCDF 4 if needed and can merge
    with already existing ones as needed

    The period in the destination name comes from ``chunk``, else ``year``, else ``date_str``.

    :param filetosend: path to the file to send to the CMOR repository
    :type filetosend: str
    :param domain: CMOR domain
    :type domain: str
    :param var: variable name
    :type var: str
    :param startdate: file's startdate
    :type startdate: str
    :param member: file's member
    :type member: int
    :param chunk: file's chunk
    :type chunk: int
    :param grid: file's grid (only needed if it is not the original)
    :type grid: str
    :param region: specifies the region represented by the file. If it is defined, the data will be appended to the
        CMOR repository as a new region in the file or will overwrite if region was already present
    :type region: str
    :param box: file's box (only needed to retrieve sections or averages)
    :type box: Box
    :param rename_var: if exists, the given variable will be renamed to the one given by var
    :type rename_var: str
    :param frequency: file's frequency (only needed if it is different from the default)
    :type frequency: str
    :param year: if frequency is yearly, this parameter is used to give the corresponding year
    :type year: int
    :param date_str: period used in the file name when neither chunk nor year are given
    :type date_str: str
    :raises ValueError: if year is given with a frequency other than 'yr', or chunk, year and date_str are all None
    """
    original_var = var
    cmor_var = Variable.get_variable(var)
    if box:
        var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str()
    if rename_var:
        Utils.rename_variable(filetosend, rename_var, var)
    elif original_var != var:
        # the name gained the box suffix: rename the variable inside the file as well
        Utils.rename_variable(filetosend, original_var, var)

    if not frequency:
        frequency = self.config.frequency
    domain = DataManager.correct_domain(domain)
    table_name = self.domain_abbreviation(domain, frequency)

    start = parse_date(startdate)
    member_plus = str(member + 1)
    member_path = os.path.join(self.get_startdate_path(startdate), frequency, domain)

    if chunk is not None:
        chunk_start = chunk_start_date(start, chunk, self.experiment.chunk_size, 'month', 'standard')
        chunk_end = chunk_end_date(chunk_start, self.experiment.chunk_size, 'month', 'standard')
        chunk_end = previous_day(chunk_end, 'standard')
        time_bound = "{0:04}{1:02}-{2:04}{3:02}".format(chunk_start.year, chunk_start.month, chunk_end.year,
                                                         chunk_end.month)
    elif year is not None:
        if frequency != 'yr':
            raise ValueError('Year may be provided instead of chunk only if frequency is "yr"')
        time_bound = str(year)
    elif date_str is not None:
        time_bound = date_str
    else:
        raise ValueError('Chunk and year can not be None at the same time')

    if grid:
        var_path = os.path.join(member_path, var, grid, 'r{0}i1p1'.format(member_plus))
    else:
        var_path = os.path.join(member_path, var, 'r{0}i1p1'.format(member_plus))
    filepath = os.path.join(var_path, '{0}_{1}_{2}_{3}_S{4}_r{5}i1p1_'
                                      '{6}.nc'.format(var, table_name, self.experiment.model,
                                                      self.experiment.experiment_name,
                                                      startdate, member_plus, time_bound))

    if region:
        Utils.convert2netcdf4(filetosend)
        if not os.path.exists(filepath):
            # First region for this file: add a 'region' dimension and move the data into it
            handler = Utils.openCdf(filetosend)
            handler.createDimension('region')
            var_region = handler.createVariable('region', str, 'region')
            var_region[0] = region
            source_var = handler.variables[var]
            new_var = handler.createVariable('new_var', source_var.datatype,
                                             source_var.dimensions + ('region',))
            new_var.setncatts({k: source_var.getncattr(k) for k in source_var.ncattrs()})
            new_var[..., 0] = source_var[:]
            handler.close()
            Utils.nco.ncks(input=filetosend, output=filetosend, options='-O -x -v {0}'.format(var))
            Utils.rename_variable(filetosend, 'new_var', var)
        else:
            # The file already exists: add (or overwrite) this region in a copy of it
            temp = TempFile.get()
            shutil.copyfile(filepath, temp)
            Utils.nco.ncks(input=temp, output=temp, options='-O --mk_rec_dmn region')
            handler = Utils.openCdf(temp)
            handler_send = Utils.openCdf(filetosend)
            value = handler_send.variables[var][:]
            var_region = handler.variables['region']
            basin_index = np.where(var_region[:] == region)
            if len(basin_index[0]) == 0:
                var_region[var_region.shape[0]] = region
                basin_index = var_region.shape[0] - 1
            else:
                basin_index = basin_index[0][0]
            handler.variables[var][..., basin_index] = value
            handler.close()
            handler_send.close()
            Utils.move_file(temp, filetosend)
        Utils.nco.ncks(input=filetosend, output=filetosend, options='-O --fix_rec_dmn region')

    temp = TempFile.get()
    Utils.execute_shell_command(["nccopy", "-4", "-d4", "-s", filetosend, temp])
    shutil.move(temp, filetosend)

    if cmor_var:
        handler = Utils.openCdf(filetosend)
        var_handler = handler.variables[var]
        var_handler.standard_name = cmor_var.standard_name
        var_handler.long_name = cmor_var.long_name
        var_handler.short_name = cmor_var.short_name
        var_type = var_handler.dtype
        handler.modeling_realm = cmor_var.domain
        handler.table_id = 'Table {0} (December 2013)'.format(self.domain_abbreviation(cmor_var.domain, frequency))

        if cmor_var.units and 'units' in var_handler.ncattrs():
            if var_handler.units == 'PSU':
                var_handler.units = 'psu'
            if var_handler.units == 'C' and cmor_var.units == 'K':
                var_handler.units = 'deg_C'
            if cmor_var.units != var_handler.units:
                try:
                    new_unit = Units(cmor_var.units)
                    old_unit = Units(var_handler.units)
                    var_handler[:] = Units.conform(var_handler[:], old_unit, new_unit, inplace=True)
                    if 'valid_min' in var_handler.ncattrs():
                        var_handler.valid_min = Units.conform(float(var_handler.valid_min), old_unit, new_unit,
                                                              inplace=True)
                    if 'valid_max' in var_handler.ncattrs():
                        var_handler.valid_max = Units.conform(float(var_handler.valid_max), old_unit, new_unit,
                                                              inplace=True)
                except ValueError:
                    factor, offset = UnitConversion.get_conversion_factor_offset(var_handler.units,
                                                                                 cmor_var.units)
                    var_handler[:] = var_handler[:] * factor + offset
                    if 'valid_min' in var_handler.ncattrs():
                        var_handler.valid_min = float(var_handler.valid_min) * factor + offset
                    if 'valid_max' in var_handler.ncattrs():
                        var_handler.valid_max = float(var_handler.valid_max) * factor + offset
                # the data is now in the CMOR units: keep the attribute in sync with it
                var_handler.units = cmor_var.units

        handler.sync()
        if 'lev' in handler.variables:
            handler.variables['lev'].short_name = 'lev'
            if domain == 'ocean':
                handler.variables['lev'].standard_name = 'depth'
        if 'lon' in handler.variables:
            handler.variables['lon'].short_name = 'lon'
            handler.variables['lon'].standard_name = 'longitude'
        if 'lat' in handler.variables:
            handler.variables['lat'].short_name = 'lat'
            handler.variables['lat'].standard_name = 'latitude'
        handler.close()

        # No spaces inside the -a specification, matching the _FillValue one below
        if cmor_var.valid_min != '':
            valid_min = '-a valid_min,{0},o,{1},"{2}" '.format(var, var_type.char, cmor_var.valid_min)
        else:
            valid_min = ''
        if cmor_var.valid_max != '':
            valid_max = '-a valid_max,{0},o,{1},"{2}" '.format(var, var_type.char, cmor_var.valid_max)
        else:
            valid_max = ''
        Utils.nco.ncatted(input=filetosend, output=filetosend,
                          options='-O -a _FillValue,{0},o,{1},"1.e20" '
                                  '-a missingValue,{0},o,{1},"1.e20" {2}{3}'.format(var, var_type.char,
                                                                                     valid_min, valid_max))

    variables = dict()
    variables['x'] = 'i'
    variables['y'] = 'j'
    variables['nav_lat_grid_V'] = 'lat'
    variables['nav_lon_grid_V'] = 'lon'
    variables['nav_lat_grid_U'] = 'lat'
    variables['nav_lon_grid_U'] = 'lon'
    variables['nav_lat_grid_T'] = 'lat'
    variables['nav_lon_grid_T'] = 'lon'
    Utils.rename_variables(filetosend, variables, False, True)

    Utils.move_file(filetosend, filepath)
    self._create_link(domain, filepath, frequency, var, grid)
@staticmethod
def correct_domain(domain):
    """
    Normalizes the capitalization of a domain name.

    The name is lowercased; the two camel-cased domains, 'seaIce' and 'landIce', are then restored to their
    canonical spelling.

    :param domain: domain name, in any capitalization
    :type domain: str
    :return: domain name with canonical capitalization
    :rtype: str
    """
    lowered = domain.lower()
    camel_cased = {'seaice': 'seaIce', 'landice': 'landIce'}
    return camel_cased.get(lowered, lowered)
def _create_link(self, domain, filepath, frequency, var, grid):
if frequency in ('d', 'daily', 'day'):
freq_str = 'daily_mean'
else:
freq_str = 'monthly_mean'
if grid:
var = '{0}-{1}'.format(var, grid)
Javier Vegas-Regidor
committed
variable_folder = '{0}_f{1}h'.format(var, self.experiment.ocean_timestep)
variable_folder = '{0}_f{1}h'.format(var, self.atmos_timestep)
Javier Vegas-Regidor
committed
link_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, variable_folder)
Javier Vegas-Regidor
committed
if not os.path.exists(link_path):
# This can be a race condition
# noinspection PyBroadException
try:
os.makedirs(link_path)
except Exception:
pass
link_path = os.path.join(link_path, os.path.basename(filepath))
if os.path.lexists(link_path):
os.remove(link_path)
Javier Vegas-Regidor
committed
os.symlink(filepath, link_path)
@staticmethod
def domain_abbreviation(domain, frequency):
    """
    Builds the CMOR table name for a domain-frequency pair.

    Monthly data maps to '<uppercased domain initial>mon', except sea ice ('OImon') and land ice ('LImon').
    Every domain at '6hr' shares the '6hrPlev' table, and any other frequency falls back to 'day'.

    :param domain: variable's domain
    :type domain: str
    :param frequency: variable's frequency
    :type frequency: str
    :return: variable's table name
    :rtype: str
    """
    if frequency == '6hr':
        return '6hrPlev'
    if frequency != 'mon':
        return 'day'
    monthly_tables = {'seaIce': 'OImon', 'landIce': 'LImon'}
    if domain in monthly_tables:
        return monthly_tables[domain]
    return domain[0].upper() + 'mon'
def get_year(self, domain, var, startdate, member, year, grid=None, box=None):
    """
    Gets all the data corresponding to a given year from the CMOR repository to the scratch folder as one file and
    returns the path to the scratch's copy.

    The chunks returned by experiment.get_year_chunks are retrieved (and concatenated along time with ncrcat when
    there is more than one). The result is then cut to the time steps whose date falls inside ``year``.

    :param domain: CMOR domain
    :type domain: str
    :param var: variable name
    :type var: str
    :param startdate: file's startdate
    :type startdate: str
    :param member: file's member
    :type member: int
    :param year: year to retrieve
    :type year: int
    :param grid: file's grid (only needed if it is not the original)
    :type grid: str
    :param box: file's box (only needed to retrieve sections or averages)
    :type box: Box
    :return: path to the copy created on the scratch folder
    :rtype: str
    :raises ValueError: if none of the retrieved time steps belongs to ``year``
    """
    chunk_files = [self.get_file(domain, var, startdate, member, chunk, grid=grid, box=box)
                   for chunk in self.experiment.get_year_chunks(startdate, year)]
    if len(chunk_files) > 1:
        temp = TempFile.get()
        Utils.nco.ncrcat(input=' '.join(chunk_files), output=temp)
        for chunk_file in chunk_files:
            os.remove(chunk_file)
    else:
        temp = chunk_files[0]

    handler = Utils.openCdf(temp)
    try:
        times = Utils.get_datetime_from_netcdf(handler)
    finally:
        handler.close()

    # Use the first and last time steps that belong to the year. Taking "the last January step" as start (as this
    # code used to) skips most of January for sub-monthly data such as daily means. A year that does not start in
    # January or end in December also left the bounds as None.
    year_steps = [index for index, date in enumerate(times) if date.year == year]
    if not year_steps:
        os.remove(temp)
        raise ValueError('No data for year {0} of variable {1}'.format(year, var))

    temp2 = TempFile.get()
    # Integer bounds make ncks treat the hyperslab as time indexes (inclusive)
    Utils.nco.ncks(input=temp, output=temp2, options='-O -d time,{0},{1}'.format(year_steps[0], year_steps[-1]))
    os.remove(temp)
    return temp2
def _is_cmorized(self, startdate, member):
if not os.path.exists(self.get_startdate_path(startdate)):
return False
startdate_path = self.get_startdate_path(startdate)
for freq in os.listdir(startdate_path):
freq_path = os.path.join(startdate_path, freq)
for domain in os.listdir(freq_path):
domain_path = os.path.join(freq_path, domain)
for var in os.listdir(domain_path):
Javier Vegas-Regidor
committed
member_path = os.path.join(domain_path, var, 'r{0}i1p1'.format(member + 1))
if os.path.exists(member_path):
return True
return False
"""
Class to characterize a CMOR variable. It also contains the static method to make the match between thje original
name and the standard name. Requires cmor_table.csv to work.
"""
self.short_name = line[1].strip()
self.standard_name = line[2].strip()
self.long_name = line[3].strip()
self.domain = line[4].strip()
Javier Vegas-Regidor
committed
self.valid_min = line[7].strip()
self.valid_max = line[8].strip()
@classmethod
def get_variable(cls, original_name):
# NOTE(review): the opening triple quote of this method's docstring is missing from this copy of the file; the
# lines up to the closing triple quote below are the docstring text.
Javier Vegas-Regidor
committed
Returns the cmor variable instance given a variable name
:param original_name: original variable's name
:type original_name: str
:return: CMOR variable
:rtype: Variable
"""
# The name is lowercased before the lookup. NOTE(review): load_variables stores keys exactly as written in
# cmor_table.csv (no lowercasing), so only lowercase table entries can match here - confirm the table is lowercase.
return cls._dict_variables[original_name.lower()]
Javier Vegas-Regidor
committed
# NOTE(review): this warning follows the return statement, so it presumably belongs to an exception handler
# (e.g. `except KeyError:`) that is missing from this copy. That handler would run when the name is not in the
# CMOR table - confirm against the upstream file.
Log.warning('Variable {0} is not defined in the CMOR table. Please add it'.format(original_name))
Javier Vegas-Regidor
committed
@classmethod
def load_variables(cls):
"""
Loads the cmor_table.csv and creates the variables dictionary

The table is read from the directory of this module, and the dictionary is rebuilt from scratch on each call.
Each remaining row becomes a Variable. It is registered under every ':'-separated name of its first column and
under its short name. Rows with an empty short name are skipped.
"""
Variable._dict_variables = dict()
# Binary mode is what the Python 2 csv module expects; under Python 3 csv.reader needs a text-mode file instead.
with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'cmor_table.csv'), 'rb') as csvfile:
reader = csv.reader(csvfile, dialect='excel')
for line in reader:
# NOTE(review): the condition guarding the `continue` below is missing from this copy (presumably it skips the
# header row) - confirm against the upstream file.
Javier Vegas-Regidor
committed
continue
var = Variable(line)
if not var.short_name:
continue
# Keys are stored verbatim (not lowercased).
for old_name in line[0].split(':'):
Variable._dict_variables[old_name] = var
Variable._dict_variables[var.short_name] = var
"""
Class to manage unit conversions
"""
_dict_conversions = None
@classmethod
def load_conversions(cls):
    """
    Load conversions from the configuration file

    Reads conversions.csv from the directory of this module and rebuilds the conversions dictionary. Each row
    holds: source unit, destiny unit, factor, offset. The header row (first column 'original') and empty rows are
    skipped.
    """
    # The dictionary must exist before add_conversion stores into it. It is None at class level, so without this
    # the first add_conversion call raises TypeError. Recreating it also makes a reload start from a clean state,
    # as Variable.load_variables does.
    cls._dict_conversions = dict()
    # Binary mode is what the Python 2 csv module expects.
    with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'conversions.csv'), 'rb') as csvfile:
        reader = csv.reader(csvfile, dialect='excel')
        for line in reader:
            if not line or line[0] == 'original':
                continue
            cls.add_conversion(UnitConversion(line[0], line[1], line[2], line[3]))
@classmethod
def add_conversion(cls, conversion):
    """
    Registers a conversion, replacing any previous one for the same unit pair.

    :param conversion: conversion to register; it is stored under its (source, destiny) pair
    :type conversion: UnitConversion
    """
    key = (conversion.source, conversion.destiny)
    cls._dict_conversions[key] = conversion
def __init__(self, source, destiny, factor, offset):
self.source = source
self.destiny = destiny
self.factor = float(factor)
self.offset = float(offset)
@classmethod
def get_conversion_factor_offset(cls, input_units, output_units):
    """
    Gets the conversion factor and offset for two units. The conversion has to be done in the following way:
    converted = original * factor + offset

    Each unit string may start with a scale token, either a number ('1e3 kg') or an integer power ('10^-3 kg').
    The scale is folded into the returned factor and offset.

    :param input_units: original units
    :type input_units: str
    :param output_units: destiny units
    :type output_units: str
    :return: factor and offset, or (None, None) when no conversion between the base units is known
    :rtype: [float, float]
    """

    def split_scale(units):
        # Returns (scale, base unit) for strings like 'kg', '1e3 kg' or '10^-3 kg'; an empty string has no unit.
        tokens = units.split()
        if not tokens:
            return 1, ''
        if len(tokens) == 1:
            return 1, tokens[0]
        if '^' in tokens[0]:
            values = tokens[0].split('^')
            return pow(int(values[0]), int(values[1])), tokens[1]
        return float(tokens[0]), tokens[1]

    scale_unit, unit = split_scale(input_units)
    scale_new_unit, new_unit = split_scale(output_units)

    factor, offset = cls._get_factor(new_unit, unit)
    if factor is None:
        # No known conversion between the base units. This used to go on and multiply None, raising TypeError.
        return None, None
    # A value x in "scale_unit unit" is x * scale_unit in the base unit. Converting and dividing by the output
    # scale gives x * factor * scale_unit / scale_new_unit + offset / scale_new_unit.
    factor = factor * scale_unit / float(scale_new_unit)
    offset /= float(scale_new_unit)
    return factor, offset
@classmethod
def _get_factor(cls, new_unit, unit):
# Add only the conversions with a factor greater than 1
if unit == new_unit:
return 1, 0
elif (unit, new_unit) in cls._dict_conversions:
conversion = cls._dict_conversions[(unit, new_unit)]
return conversion.factor, conversion.offset
elif (new_unit, unit) in cls._dict_conversions:
conversion = cls._dict_conversions[(new_unit, unit)]
Javier Vegas-Regidor
committed
return 1 / conversion.factor, -conversion.offset