Newer
Older
Javier Vegas-Regidor
committed
import glob
import os
import shutil
import threading
import uuid
from datetime import datetime

import netCDF4
import numpy as np

from autosubmit.date.chunk_date_lib import parse_date, chunk_start_date, chunk_end_date, previous_day, add_months, \
    date2str

from earthdiagnostics.constants import Basins
"""
Class to manage the data repositories
:param exp_manager: experiment manager used to iterate over members and compute chunk dates
:type exp_manager: ExperimentManager
:param institution: CMOR's institution name
:type institution: str
:param model: CMOR's model name
:type model: str
:param expid: experiment identifier
:type expid: str
:param datafolder: Folder containing the data
:type datafolder: str
:param frequency: default frequency
:type frequency: str
:param experiment_name: CMOR's experiment name
:type experiment_name: str
:param scratch_dir: path to scratch folder
:type scratch_dir: str
:param nfrp: sample frequency
:type nfrp: int
:param calendar: experiment's calendar
:type calendar: str
Javier Vegas-Regidor
committed
def __init__(self, exp_manager, institution, model, expid, datafolder, frequency, experiment_name,
             scratch_dir, nfrp, calendar='standard'):
    """
    Create a data manager for the given experiment.

    Stores the experiment configuration and loads the CMOR variable
    definitions. The CMOR provenance attributes (initialization method,
    physics version, source, ...) are set to the 'to be filled'
    placeholder and are expected to be overridden before cmorization.
    """
    # CMOR provenance metadata, to be configured by the user afterwards
    placeholder = 'to be filled'
    self.initialization_method = placeholder
    self.initialization_description = placeholder
    self.physics_version = placeholder
    self.physics_description = placeholder
    self.associated_model = placeholder
    self.source = placeholder
    self.associated_experiment = placeholder

    # Experiment configuration
    self.institution = institution
    self.model = model
    self.expid = expid
    self.data_dir = datafolder
    self.frequency = frequency
    self.experiment_name = experiment_name
    self.calendar = calendar
    self.scratch_dir = scratch_dir
    self.nfrp = nfrp
    self.exp_manager = exp_manager

    # Startdate / member name are added to produced files by default
    self.add_startdate = True
    self.add_name = True

    # Make sure the CMOR variable table is available
    Variable.load_variables()
# noinspection PyPep8Naming
Javier Vegas-Regidor
committed
# noinspection PyPep8Naming
def prepare_CMOR_files(self, force_rebuild, ocean, atmosphere):
    """
    Prepares the data to be used by the diagnostics.

    If CMOR data is not created, it shows a warning and closes. In the future, an automatic
    cmorization procedure will be launched.
    If CMOR data is available but packed, the procedure will unpack it.

    :param force_rebuild: if True, forces the creation of the CMOR files
    :type force_rebuild: bool
    :param ocean: activates ocean files cmorization
    :type ocean: bool
    :param atmosphere: activates atmosphere files cmorization
    :type atmosphere: bool
    """
    # First pass: cmorize any member that has no CMOR files yet (or all of
    # them when force_rebuild is set).
    for startdate, member in self.exp_manager.get_member_list():
        member_str = self.exp_manager.get_member_str(member)
        cmor_path = os.path.join(self.data_dir, self.expid, 'cmorfiles', startdate, member_str)
        # BUG FIX: removed the unused local 'created' present in the original.
        if not force_rebuild and os.path.exists(cmor_path):
            continue
        Log.info('CMORizing startdate {0} member {1}', startdate, member_str)
        if ocean:
            self._unpack_ocean_files('MMO', startdate, member)
            self._unpack_ocean_files('diags', startdate, member)
        if not atmosphere:
            continue
        grb_path = os.path.join(self.data_dir, self.expid, 'original_files', startdate,
                                member_str, 'outputs', '*.grb')
        gribfiles = glob.glob(grb_path)
        if len(gribfiles) == 0:
            # No raw grib output: fall back to the packed MMA tarballs
            tar_files = glob.glob(
                os.path.join(self.data_dir, self.expid, 'original_files', startdate, member_str, 'outputs',
                             'MMA*'))
            tar_files.sort()
            # BUG FIX: the original incremented the counter before logging the
            # result, so the finished message reported the wrong file number.
            for count, tarfile in enumerate(tar_files, start=1):
                Log.info('Unpacking atmospheric file {0}/{1}'.format(count, len(tar_files)))
                self._unpack_tar(tarfile, startdate, member)
                Log.result('Atmospheric file {0}/{1} finished'.format(count, len(tar_files)))
        else:
            self._cmorize_grib(startdate, member, gribfiles)
        Log.result('CMORized startdate {0} member {1}!\n\n', startdate, member_str)

    # Second pass: unpack any gzipped CMOR files left in the members' outputs
    for startdate, member in self.exp_manager.get_member_list():
        member_path = os.path.join(self.data_dir, self.expid, 'cmorfiles', startdate,
                                   self.exp_manager.get_member_str(member), 'outputs')
        Log.info('Preparing CMOR files for startdate {0} and member {1}'.format(startdate, member))
        filepaths = glob.glob(os.path.join(member_path, '*.gz'))
        if len(filepaths) == 0:
            continue
        self._unpack_cmorfiles(filepaths, member_path)
def _unpack_ocean_files(self, prefix, startdate, member):
    """
    Unpack every '<prefix>*' tarball found in the member's original outputs folder.

    :param prefix: tarball prefix to look for (e.g. 'MMO' or 'diags')
    :type prefix: str
    :param startdate: experiment startdate
    :type startdate: str
    :param member: member number
    :type member: int
    """
    pattern = os.path.join(self.data_dir, self.expid, 'original_files', startdate,
                           self.exp_manager.get_member_str(member), 'outputs', '{0}*'.format(prefix))
    tar_files = sorted(glob.glob(pattern))
    # BUG FIX: the original incremented the counter before logging the result,
    # so e.g. the last of 1 file was reported as "2/1 finished".
    for count, tarfile in enumerate(tar_files, start=1):
        Log.info('Unpacking oceanic file {0}/{1}'.format(count, len(tar_files)))
        self._unpack_tar(tarfile, startdate, member)
        Log.result('Oceanic file {0}/{1} finished'.format(count, len(tar_files)))
def _cmorize_grib(self, startdate, member, gribfiles):
gribfiles.sort()
copied_gribfiles = list()
for gribfile in gribfiles:
shutil.copy(gribfile, os.path.join(self.scratch_dir, os.path.basename(gribfile)))
copied_gribfiles.append(os.path.join(self.scratch_dir, os.path.basename(gribfile)))
count = 1
for gribfile in copied_gribfiles:
Log.info('Unpacking atmospheric grib file {0}/{1}'.format(count, len(copied_gribfiles)))
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
cdo = Utils.cdo
start = parse_date(gribfile[-10:-4])
month = '{0:02}'.format(start.month)
times = cdo.showtimestamp(input=gribfile)
times = times[0].split()[0:2]
time_diff = datetime.strptime(times[1], '%Y-%m-%dT%H:%M:%S') - datetime.strptime(times[0],
'%Y-%m-%dT%H:%M:%S')
nfrp = (time_diff.seconds // 3600)
param_6hr = (151, 167, 168, 164, 165, 166, 129,)
param_day = (167, 165, 166, 151, 164, 168, 169, 177, 179, 228, 201, 202, 130)
param_mon = (167, 201, 202, 165, 166, 151, 144, 228, 205, 182, 164, 146, 147, 176, 169,
177, 175, 212, 141, 180, 181, 179, 168, 243, 129, 130, 131, 132, 133)
grid = os.path.basename(gribfile)[3:5]
if os.path.exists('ICM{0}{1}+{2.year}{2.month:02}.grb'.format(grid, self.expid,
add_months(start, -1,
'standard'))):
fd = open('rules_files', 'w')
fd.write('if (dataDate >= {0.year}{0.month:02}01) {{ write ; }}\n'.format(start))
fd.close()
# get first timestep for each month from previous file (if possible)
if os.path.exists('ICM'):
os.remove('ICM')
Utils.execute_shell_command('grib_filter -o ICM rules_files '
'ICM{0}{1}+{2.year}{2.month:02}.grb '
'{3}'.format(grid, self.expid,
add_months(start, -1, 'standard'),
gribfile))
os.remove('rules_files')
else:
shutil.copy(gribfile, 'ICM')
# remap on regular Gauss grid
if grid == 'SH':
Utils.cdo.splitparam(input='-sp2gpl ICM', output=gribfile + '_')
else:
Utils.cdo.splitparam(input='ICM', output=gribfile + '_', options='-R')
# total precipitation (remove negative values)
Utils.cdo.setcode(228, input='-setmisstoc,0 -setvrange,0,Inf -add '
'{0}_{{142,143}}.128.grb'.format(gribfile),
output='{0}_228.128.grb'.format(gribfile))
os.remove('ICM')
cdo_reftime = parse_date(startdate).strftime('%Y-%m-%d,00:00')
# daily variables
for param in param_day:
if not os.path.exists('{0}_{1}.128.grb'.format(gribfile, param)):
continue
new_units = None
if param in (169, 177, 179):
# radiation
new_units = "W m-2"
cdo_operator = "-divc,{0} -daymean -selmon,{2} " \
"-shifttime,-{1}hours".format(nfrp * 3600, nfrp, month)
elif param == 228:
# precipitation
new_units = "kg m-2 -s"
cdo_operator = "-mulc,1000 -divc,{0} -daymean -selmon,{2} " \
"-shifttime,-{1}hours".format(nfrp * 3600, nfrp, month)
elif param == 201:
# maximum
cdo_operator = "-daymax -selmon,{1} -shifttime,-{0}hours".format(nfrp, month)
elif param == 202:
# minmimum
cdo_operator = "-daymin -selmon,{1} -shifttime,-{0}hours".format(nfrp, month)
elif param == 130:
# 850 hPa
cdo_operator = "-daymean -sellevel,85000 -selmon,{0}".format(month)
else:
# default, plain daily mean
cdo_operator = "-daymean -selmon,{0}".format(month)
Utils.execute_shell_command('cdo -f nc -t ecmwf setreftime,{0} '
'{1} {2}_{3}.128.grb '
'{2}_{3}_day.nc'.format(cdo_reftime, cdo_operator,
gribfile, param))
if new_units:
handler = Utils.openCdf('{0}_{1}_day.nc'.format(gribfile, param))
for var in handler.variables.values():
if 'code' in var.ncattrs() and var.code == param:
var.units = new_units
break
handler.close()
# concat all vars in one file for day
Utils.nco.ncks(input='{0}_{1}_day.nc'.format(gribfile, param),
output='{0}_day.nc'.format(gribfile), options='-A')
os.remove('{0}_{1}_day.nc'.format(gribfile, param))
# monthly variables
for param in param_mon:
if not os.path.exists('{0}_{1}.128.grb'.format(gribfile, param)):
continue
new_units = None
if param in (146, 147, 176, 169, 177, 175, 179, 212):
# radiation/heat
new_units = "W m-2"
cdo_operator = "-divc,{0} -monmean -selmon,{2} " \
"-shifttime,-{1}hours".format(nfrp * 3600, nfrp, month)
elif param in (180, 181):
# momentum flux
new_units = "N m-2"
cdo_operator = "-divc,{0} -monmean -selmon,{2} " \
"-shifttime,-{1}hours".format(nfrp * 3600, nfrp, month)
elif param in (144, 228, 205, 182):
# precipitation/evaporation/runoff
new_units = "kg m-2 s-1"
cdo_operator = "-mulc,1000 -divc,{0} -monmean -selmon,{2} " \
"-shifttime,-{1}hours".format(nfrp * 3600, nfrp, month)
elif param == 201:
# mean daily maximum
cdo_operator = "-monmean -daymax -selmon,{1} " \
"-shifttime,-{0}hours".format(nfrp, month)
elif param == 202:
# mean daily minmimum
cdo_operator = "-monmean -daymin -selmon,{1} " \
"-shifttime,-{0}hours".format(nfrp, month)
elif param in (130, 131, 132, 133):
# upper-air
cdo_operator = "-monmean -sellevel,5000,20000,50000,85000 " \
"-selmon,{0}".format(month)
elif param == 129:
# upper-air geopotential
new_units = "m"
cdo_operator = "-divc,9.81 -timmean -sellevel,5000,20000,50000,85000 " \
"-selmon,{0}".format(month)
else:
# default, plain monthly mean
cdo_operator = "-monmean -selmon,{0}".format(month)
Utils.execute_shell_command('cdo -f nc -t ecmwf setreftime,{0} '
'{1} {2}_{3}.128.grb '
'{2}_{3}_mon.nc'.format(cdo_reftime, cdo_operator,
gribfile, param))
handler = Utils.openCdf('{0}_{1}_mon.nc'.format(gribfile, param))
if new_units:
for var in handler.variables.values():
if 'code' in var.ncattrs() and var.code == param:
var.units = new_units
break
var_name = None
for key in handler.variables.keys():
if key + '_2' in handler.variables and key not in handler.dimensions:
var_name = key
handler.close()
if var_name is not None:
Utils.nco.ncks(input='{0}_{1}_mon.nc'.format(gribfile, param),
output='{0}_{1}_mon.nc'.format(gribfile, param),
options='-O -v {0}'.format(var_name))
# concat all vars in one file for mon
Utils.nco.ncks(input='{0}_{1}_mon.nc'.format(gribfile, param),
output='{0}_mon.nc'.format(gribfile), options='-A')
os.remove('{0}_{1}_mon.nc'.format(gribfile, param))
# 6-hourly variables
for param in param_6hr:
if not os.path.exists('{0}_{1}.128.grb'.format(gribfile, param)):
continue
new_units = None
if param == 129:
# geopotential
new_units = "m"
cdo_operator = "-divc,9.81 -sellevel,50000 -selmon,{0}".format(month)
else:
# default, plain monthly mean
cdo_operator = "-selmon,{0}".format(month)
Utils.execute_shell_command('cdo -f nc -t ecmwf setreftime,{0} '
'{1} {2}_{3}.128.grb '
'{2}_{3}_6hr.nc'.format(cdo_reftime, cdo_operator,
gribfile, param))
if new_units:
handler = Utils.openCdf('{0}_{1}_6hr.nc'.format(gribfile, param))
for var in handler.variables.values():
if 'code' in var.ncattrs() and var.code == param:
var.units = new_units
break
handler.close()
# concat all vars in one file for 6hr
Utils.nco.ncks(input='{0}_{1}_6hr.nc'.format(gribfile, param),
output='{0}_6hr.nc'.format(gribfile), options='-A')
os.remove('{0}_{1}_6hr.nc'.format(gribfile, param))
for splited_file in glob.glob('{0}_???.128.grb'.format(gribfile)):
os.remove(splited_file)
count += 1
Log.result('Atmospheric grib file {0}/{1} finished'.format(count, len(copied_gribfiles)))
chunk_start = parse_date(startdate)
while os.path.exists(os.path.join(self.scratch_dir,
'ICMGG{0}+{1}.grb'.format(self.expid,
date2str(chunk_start)[:-2]))):
Javier Vegas-Regidor
committed
chunk_end = chunk_end_date(chunk_start, self.exp_manager.chunk_size, 'month', 'standard')
chunk_end = previous_day(chunk_end, 'standard')
chunk_files_gg_mon = list()
chunk_files_gg_day = list()
chunk_files_gg_6h = list()
chunk_files_sh_mon = list()
chunk_files_sh_day = list()
chunk_files_sh_6h = list()
Javier Vegas-Regidor
committed
for month in range(0, self.exp_manager.chunk_size):
chunk_file = 'ICMGG{0}+{1}.grb'.format(self.expid,
date2str(add_months(chunk_start, month,
'standard'))[:-2])
os.remove(chunk_file)
os.remove('ICMSH' + chunk_file[5:])
chunk_files_gg_mon.append(chunk_file + '_mon.nc')
chunk_files_gg_day.append(chunk_file + '_day.nc')
chunk_files_gg_6h.append(chunk_file + '_6hr.nc')
chunk_files_sh_mon.append('ICMSH' + chunk_file[5:] + '_mon.nc')
chunk_files_sh_day.append('ICMSH' + chunk_file[5:] + '_day.nc')
chunk_files_sh_6h.append('ICMSH' + chunk_file[5:] + '_6hr.nc')
self._merge_and_cmorize_atmos(startdate, member, chunk_start, chunk_end, chunk_files_sh_mon,
'SH', '1m')
self._merge_and_cmorize_atmos(startdate, member, chunk_start, chunk_end, chunk_files_sh_day,
'SH', '1d')
self._merge_and_cmorize_atmos(startdate, member, chunk_start, chunk_end, chunk_files_sh_6h,
'SH', '6hr')
self._merge_and_cmorize_atmos(startdate, member, chunk_start, chunk_end, chunk_files_gg_mon,
'GG', '1m')
self._merge_and_cmorize_atmos(startdate, member, chunk_start, chunk_end, chunk_files_gg_day,
'GG', '1d')
self._merge_and_cmorize_atmos(startdate, member, chunk_start, chunk_end, chunk_files_gg_6h,
'GG', '6hr')
Javier Vegas-Regidor
committed
chunk_start = chunk_end_date(chunk_start, self.exp_manager.chunk_size, 'month', 'standard')
def _merge_and_cmorize_atmos(self, startdate, member, chunk_start, chunk_end, chunk_files, grid, frequency):
    """Merge the per-month chunk files into one MMA netCDF file and cmorize it.

    The merged inputs are deleted once the merge succeeds.
    """
    merged_file = 'MMA_{0}_{1}_{2}_{3}.nc'.format(frequency, date2str(chunk_start), date2str(chunk_end), grid)
    # Resolve the chunk file names against the scratch folder (in place, as
    # the original did)
    chunk_files[:] = [os.path.join(self.scratch_dir, name) for name in chunk_files]
    Utils.cdo.mergetime(input=' '.join(chunk_files), output=merged_file, options='-O')
    for path in chunk_files:
        os.remove(path)
    self._cmorize_nc_file(merged_file, member, startdate)
def _unpack_cmorfiles(self, filepaths, member_path):
    """
    Unzip and untar the packed CMOR files in parallel, then normalize the
    resulting folder layout and file names.

    :param filepaths: gzipped files to unpack
    :type filepaths: list[str]
    :param member_path: member's outputs folder
    :type member_path: str
    """
    # Fan the gunzip work out over the available CPUs
    threads = list()
    numthreads = Utils.available_cpu_count()
    for numthread in range(0, numthreads):
        t = threading.Thread(target=DataManager._unzip,
                             args=([filepaths[numthread::numthreads]]))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

    # BUG FIX: the original did 'filepaths = glob.glob(...).sort()', which
    # assigns None because list.sort() sorts in place and returns None.
    filepaths = glob.glob(os.path.join(member_path, '*.tar'))
    filepaths.sort()

    # NOTE(review): some lines were lost in this block's source; the thread
    # list reset below is reconstructed — confirm against upstream history.
    threads = list()
    for numthread in range(0, numthreads):
        t = threading.Thread(target=DataManager._untar,
                             args=(filepaths[numthread::numthreads], member_path))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

    if self.experiment_name != self.model:
        # Files were unpacked under <institution>/<model>/<model>; move them
        # to <institution>/<model>/<experiment_name> and fix their names.
        bad_path = os.path.join(member_path, 'output', self.institution, self.model, self.model)
        for (dirpath, dirnames, filenames) in os.walk(bad_path, False):
            for filename in filenames:
                filepath = os.path.join(dirpath, filename)
                good = filepath.replace('_{0}_output_'.format(self.model),
                                        '_{0}_{1}_'.format(self.model, self.experiment_name))
                good = good.replace('/{0}/{0}'.format(self.model),
                                    '/{0}/{1}'.format(self.model, self.experiment_name))
                Utils.move_file(filepath, good)
            os.rmdir(dirpath)

    # Insert the startdate into file names that lack it
    good_dir = os.path.join(member_path, 'output', self.institution, self.model, self.experiment_name)
    for sdate in os.listdir(good_dir):
        for (dirpath, dirnames, filenames) in os.walk(os.path.join(good_dir, sdate), False):
            for filename in filenames:
                filepath = os.path.join(dirpath, filename)
                good = filepath.replace('_{0}_{1}_r'.format(self.model, self.experiment_name),
                                        '_{0}_{1}_{2}_r'.format(self.model, self.experiment_name, sdate))
                if good != filepath:
                    Log.info('Moving {0} to {1}'.format(filename, good))
                    Utils.move_file(filepath, good)
def _unpack_tar(self, tarfile, startdate, member):
scratch_dir = os.path.join(self.scratch_dir, 'CMOR')
if os.path.exists(scratch_dir):
shutil.rmtree(scratch_dir)
os.makedirs(scratch_dir)
self._untar((tarfile,), scratch_dir)
self._unzip(glob.glob(os.path.join(scratch_dir, '*.gz')))
if os.path.basename(tarfile).startswith('MMA'):
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
for filename in glob.glob(os.path.join(scratch_dir, 'MMA_*_SH_*.nc')):
Utils.cdo.sp2gpl(options='-O', input=filename, output=temp)
shutil.move(temp, filename)
sh_files = glob.glob(os.path.join(scratch_dir, 'MMA_*_SH_*.nc'))
Utils.cdo.mergetime(input=sh_files, output=os.path.join(scratch_dir, 'sh.nc'))
gg_files = glob.glob(os.path.join(scratch_dir, 'MMA_*_GG_*.nc'))
Utils.cdo.mergetime(input=gg_files, output=os.path.join(scratch_dir, 'gg.nc'))
for filename in sh_files + gg_files:
os.remove(filename)
Utils.nco.ncks(input=os.path.join(scratch_dir, 'sh.nc'),
output=os.path.join(scratch_dir, 'gg.nc'), options='-A')
os.remove(os.path.join(scratch_dir, 'sh.nc'))
tar_startdate = tarfile[0:-4].split('_')[5].split('-')
new_name = 'MMA_1m_{0[0]}_{0[1]}.nc'.format(tar_startdate)
shutil.move(os.path.join(scratch_dir, 'gg.nc'), os.path.join(scratch_dir, new_name))
for filename in glob.glob(os.path.join(scratch_dir, '*.nc')):
self._cmorize_nc_file(filename, member, startdate)
def _cmorize_nc_file(self, filename, member, startdate):
    """
    Cmorize a single merged netCDF file: normalize coordinate names, add the
    common CMOR attributes, fix the time axis and extract each data variable
    into its own CMOR file. The input file is deleted afterwards.

    :param filename: path to the netCDF file to process
    :type filename: str
    :param member: member number
    :type member: int
    :param startdate: experiment startdate
    :type startdate: str
    """
    Log.info('Processing file {0}', filename)
    file_parts = os.path.basename(filename).split('_')
    if self.expid in [file_parts[1], file_parts[2]]:
        # Ocean/ice files carry the expid instead of a frequency tag
        frequency = 'm'
    else:
        # e.g. 'MMA_1m_...' -> 'm'
        frequency = file_parts[1][1].lower()
    variables = dict()
    variables['time_counter'] = 'time'
    variables['time_counter_bnds'] = 'time_bnds'
    # variables['time_counter_bounds'] = 'time_bnds'
    variables['tbnds'] = 'bnds'
    # variables['axis_nbounds'] = 'bnds'
    variables['nav_lat'] = 'lat'
    variables['nav_lon'] = 'lon'
    variables['x'] = 'i'
    variables['y'] = 'j'
    Utils.rename_variables(filename, variables, False, True)
    handler = Utils.openCdf(filename)
    self._add_common_attributes(frequency, handler, member, startdate)
    self._update_time_variables(handler, startdate)
    handler.sync()
    temp = TempFile.get()
    Log.info('Splitting file {0}', filename)
    for variable in handler.variables.keys():
        # BUG FIX: the original extracted ONLY the names in this tuple,
        # i.e. the coordinate/auxiliary variables, and skipped every data
        # variable. The filter is meant to be a skip list: continue past
        # coordinates and extract everything else.
        if variable in ('lon', 'lat', 'time', 'time_bnds', 'leadtime', 'lev', 'icethi',
                        'deptht', 'depthu', 'depthw', 'depthv', 'time_centered', 'time_centered_bounds',
                        'deptht_bounds', 'depthu_bounds', 'depthv_bounds', 'depthw_bounds',
                        'time_counter_bounds', 'ncatice',
                        'nav_lat_grid_V', 'nav_lat_grid_U', 'nav_lat_grid_T',
                        'nav_lon_grid_V', 'nav_lon_grid_U', 'nav_lon_grid_T',
                        'depth', 'depth_2', 'depth_3', 'depth_4',
                        'mlev', 'hyai', 'hybi', 'hyam', 'hybm'):
            continue
        self.extract_variable(filename, handler, frequency, member, startdate, temp, variable)
    Log.result('File {0} cmorized!', filename)
    handler.close()
    os.remove(filename)
def extract_variable(self, file_path, handler, frequency, member, startdate, temp, variable):
    """
    Extracts a variable from a file and creates the CMOR file

    :param file_path: path to the file
    :type file_path: str
    :param handler: netCDF4 handler for the file
    :type handler: netCDF4.Dataset
    :param frequency: variable's frequency
    :type frequency: str
    :param member: member
    :type member: int
    :param startdate: startdate
    :type startdate: str
    :param temp: temporal file to use
    :type temp: str
    :param variable: variable's name
    :type variable: str
    """
    file_parts = os.path.basename(file_path).split('_')
    # Variables without a CMOR definition are silently skipped
    var_cmor = Variable.get_variable(variable)
    if var_cmor is None:
        return
    # Extract just this variable into the temporary file
    Utils.nco.ncks(input=file_path, output=temp, options='-v {0}'.format(variable))
    # Normalize the vertical coordinate name according to the CMOR domain
    if var_cmor.domain == 'ocean':
        Utils.rename_variables(temp, {'deptht': 'lev', 'depthu': 'lev', 'depthw': 'lev', 'depthv': 'lev',
                                      'depth': 'lev'}, False, True)
    elif var_cmor.domain in ('land', 'landIce'):
        Utils.rename_variables(temp, {'depth': 'sdepth', 'depth_2': 'sdepth', 'depth_3': 'sdepth',
                                      'depth_4': 'sdepth'}, False, True)
    elif var_cmor.domain == 'atmos':
        Utils.rename_variables(temp, {'depth': 'plev'}, False, True)
    # Copy the horizontal coordinates (and leadtime, for time-dependent data)
    # from the source file and stamp the CMOR table metadata
    handler_cmor = Utils.openCdf(temp)
    Utils.copy_variable(handler, handler_cmor, 'lon', False)
    Utils.copy_variable(handler, handler_cmor, 'lat', False)
    if 'time' in handler_cmor.dimensions.keys():
        Utils.copy_variable(handler, handler_cmor, 'leadtime', False)
    handler_cmor.modeling_realm = var_cmor.domain
    handler_cmor.table_id = 'SPECS_' + self.domain_abbreviation(var_cmor.domain, frequency)
    handler_cmor.close()
    # Map the single-letter frequency tag to the CMOR frequency name
    if frequency == 'd':
        frequency = 'day'
    elif frequency == 'm':
        frequency = 'mon'
    elif frequency == 'h':
        frequency = '6hr'
    else:
        raise Exception('Frequency {0} not supported'.format(frequency))
    if var_cmor.basin is None:
        region = None
    else:
        region = var_cmor.basin.fullname
    # The date range is taken from the source file name (parts 2 and 3)
    self.send_file(temp, var_cmor.domain, var_cmor.short_name, startdate, member,
                   frequency=frequency, rename_var=variable,
                   date_str='{0}-{1}'.format(file_parts[2][0:6], file_parts[3][0:6]),
                   region=region)
@staticmethod
def _update_time_variables(handler, startdate):
    """Rewrite the time axis (and bounds) on a 'days since 1850-01-01'
    standard-calendar basis and add/refresh the leadtime variable.

    The statement order is significant: values are converted while the old
    units attribute is still in place, and the units are updated afterwards.
    """
    time_var = handler.variables['time']
    dates = netCDF4.num2date(time_var[:], time_var.units, time_var.calendar)
    if type(dates[0]) is not datetime:
        # num2date may hand back cftime-like objects; coerce them
        for idx in range(0, dates.shape[0]):
            # noinspection PyProtectedMember
            dates[idx] = dates[idx]._to_real_datetime()
    time_var[:] = netCDF4.date2num(dates, 'days since 1850-01-01', 'standard')

    if 'axis_nbounds' in handler.dimensions:
        handler.renameDimension('axis_nbounds', 'bnds')
    if 'time_counter_bounds' in handler.variables:
        handler.renameVariable('time_counter_bounds', 'time_bnds')
        handler.sync()

    if 'time_bnds' in handler.variables:
        bounds_var = handler.variables['time_bnds']
        time_var.bounds = "time_bnds"
        bounds = netCDF4.num2date(bounds_var[:], time_var.units, time_var.calendar)
        if type(bounds[0, 0]) is not datetime:
            for row in range(0, bounds.shape[0]):
                for col in range(0, bounds.shape[1]):
                    # noinspection PyProtectedMember
                    bounds[row, col] = bounds[row, col]._to_real_datetime()
        bounds_var[:] = netCDF4.date2num(bounds, 'days since 1850-01-01', 'standard')

    # Only now switch the metadata over to the reference calendar
    time_var.units = 'days since 1850-01-01'
    time_var.time_origin = "1850-01-01"
    time_var.calendar = 'standard'
    time_var.long_name = "Verification time of the forecast"
    time_var.standard_name = "time"
    time_var.axis = "T"

    if 'leadtime' in handler.variables:
        var = handler.variables['leadtime']
    else:
        var = handler.createVariable('leadtime', float, 'time')
    var.units = "days"
    var.long_name = "Time elapsed since the start of the forecast"
    var.standard_name = "forecast_period"
    leadtime = (netCDF4.num2date(time_var[:], time_var.units, time_var.calendar) - parse_date(startdate))
    for pos in range(0, leadtime.shape[0]):
        var[pos] = leadtime[pos].days
def _add_common_attributes(self, frequency, handler, member, startdate):
handler.associated_experiment = self.associated_experiment
handler.batch = '{0}{1}'.format(self.institution, datetime.now().strftime('%Y-%m-%dT%H:%M:%SZ'))
handler.contact = 'Pierre-Antoine Bretonnire, pierre-antoine.bretonniere@bsc.es , ' \
'Javier Vegas-Regidor, javier.vegas@bsc.es '
handler.conventions = 'CF-1.6'
handler.creation_date = datetime.now().strftime('%Y-%m-%dT%H:%M:%SZ')
handler.experiment_id = self.experiment_name
handler.forecast_reference_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%SZ')
if frequency == 'd':
handler.frequency = 'daily'
elif frequency == 'm':
handler.frequency = 'monthly'
handler.institute_id = self.institution
handler.institution = self.institution
handler.initialization_method = self.initialization_method
handler.initialization_description = self.initialization_description
handler.physics_version = self.physics_version
handler.physics_description = self.physics_description
handler.associated_model = self.associated_model
handler.realization = member + 1
handler.source = self.source
handler.startdate = 'S{0}'.format(startdate)
handler.tracking_id = str(uuid.uuid1())
handler.title = "{0} model output prepared for SPECS {1}".format(self.model, self.experiment_name)
Javier Vegas-Regidor
committed
@staticmethod
def _unzip(files):
    """Decompress each of the given gzipped files in place with gunzip."""
    for path in files:
        Log.debug('Unzipping {0}', path)
        Utils.execute_shell_command('gunzip {0}'.format(path))
@staticmethod
def _untar(files, member_path):
    """Extract each of the given tarballs into the member folder."""
    for path in files:
        Log.debug('Unpacking {0}', path)
        Utils.execute_shell_command('tar -xvf {0} -C {1}'.format(path, member_path))
def get_file(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None):
    """
    Copies a given file from the CMOR repository to the scratch folder and returns the path to
    the scratch's copy.

    :param domain: CMOR domain
    :type domain: str
    :param var: variable name
    :type var: str
    :param startdate: file's startdate
    :type startdate: str
    :param member: file's member
    :type member: int
    :param chunk: file's chunk
    :type chunk: int
    :param grid: file's grid (only needed if it is not the original)
    :type grid: str
    :param box: file's box (only needed to retrieve sections or averages)
    :type box: Box
    :param frequency: file's frequency (only needed if it is different from the default)
    :type frequency: str
    :return: path to the copy created on the scratch folder
    :rtype: str
    """
    if not frequency:
        frequency = self.frequency
    domain_abbreviation = self.domain_abbreviation(domain, frequency)
    start = parse_date(startdate)
    member_plus = str(member + 1)

    member_path = os.path.join(self.data_dir, self.expid, 'cmorfiles', startdate,
                               self.exp_manager.get_member_str(member),
                               'outputs', 'output', self.institution, self.model, self.experiment_name,
                               'S' + startdate, frequency, domain)

    # Compute the chunk's date range (inclusive end)
    chunk_start = chunk_start_date(start, chunk, self.exp_manager.chunk_size, 'month', 'standard')
    chunk_end = chunk_end_date(chunk_start, self.exp_manager.chunk_size, 'month', 'standard')
    chunk_end = previous_day(chunk_end, 'standard')

    # Box suffixes become part of the variable name
    if box:
        var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str()

    realization = 'r{0}i1p1'.format(member_plus)
    if grid:
        var_path = os.path.join(member_path, var, grid, realization)
    else:
        var_path = os.path.join(member_path, var, realization)

    first_month = "{0:04}{1:02}".format(chunk_start.year, chunk_start.month)
    last_month = "{0:04}{1:02}".format(chunk_end.year, chunk_end.month)
    filepath = os.path.join(var_path, '{0}_{1}_{3}_{4}_S{5}_r{6}i1p1_'
                                      '{7}-{8}.nc'.format(var, domain_abbreviation, frequency, self.model,
                                                          self.experiment_name, startdate, member_plus,
                                                          first_month, last_month))
    temp_path = TempFile.get()
    shutil.copyfile(filepath, temp_path)
    return temp_path
def send_file(self, filetosend, domain, var, startdate, member, chunk=None, grid=None, region=None, box=None,
              rename_var=None, frequency=None, year=None, date_str=None):
    """
    Copies a given file to the CMOR repository. It also automatically converts to netCDF 4 if
    needed and can merge with already existing ones as needed.

    :param filetosend: path to the file to send to the CMOR repository
    :type filetosend: str
    :param domain: CMOR domain
    :type domain: str
    :param var: variable name
    :type var: str
    :param startdate: file's startdate
    :type startdate: str
    :param member: file's member
    :type member: int
    :param chunk: file's chunk
    :type chunk: int
    :param grid: file's grid (only needed if it is not the original)
    :type grid: str
    :param region: specifies the region represented by the file. If it is defined, the data will
        be appended to the CMOR repository as a new region in the file or will overwrite if
        region was already present
    :type region: str
    :param box: file's box (only needed to retrieve sections or averages)
    :type box: Box
    :param rename_var: if exists, the given variable will be renamed to the one given by var
    :type rename_var: str
    :param frequency: file's frequency (only needed if it is different from the default)
    :type frequency: str
    :param year: if frequency is yearly, this parameter is used to give the corresponding year
    :type year: int
    :param date_str: explicit date range string to use instead of chunk or year
    :type date_str: str
    """
    if box:
        var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str()
    if rename_var:
        Utils.rename_variable(filetosend, rename_var, var)
    if not frequency:
        frequency = self.frequency
    # Fixed misspelled local 'domain_abreviattion'
    domain_abbreviation = self.domain_abbreviation(domain, frequency)
    start = parse_date(startdate)
    member_plus = str(member + 1)

    # NOTE(review): the tail of this join was lost in the source; it has been
    # reconstructed to mirror get_file's layout — confirm against upstream.
    member_path = os.path.join(self.data_dir, self.expid, 'cmorfiles', startdate,
                               self.exp_manager.get_member_str(member),
                               'outputs', 'output', self.institution, self.model, self.experiment_name,
                               'S' + startdate, frequency, domain)

    # Resolve the time span part of the file name
    if chunk is not None:
        chunk_start = chunk_start_date(start, chunk, self.exp_manager.chunk_size, 'month', 'standard')
        chunk_end = chunk_end_date(chunk_start, self.exp_manager.chunk_size, 'month', 'standard')
        chunk_end = previous_day(chunk_end, 'standard')
        time_bound = "{0:04}{1:02}-{2:04}{3:02}".format(chunk_start.year, chunk_start.month, chunk_end.year,
                                                        chunk_end.month)
    elif year is not None:
        # BUG FIX: the original used "frequency is not 'yr'" — identity
        # comparison against a string literal is implementation-dependent;
        # use inequality instead.
        if frequency != 'yr':
            raise ValueError('Year may be provided instead of chunk only if frequency is "yr"')
        time_bound = str(year)
    elif date_str is not None:
        time_bound = date_str
    else:
        raise ValueError('Chunk and year can not be None at the same time')

    if grid:
        var_path = os.path.join(member_path, var, grid, 'r{0}i1p1'.format(member_plus))
    else:
        var_path = os.path.join(member_path, var, 'r{0}i1p1'.format(member_plus))
    filepath = os.path.join(var_path, '{0}_{1}_{2}_{3}_S{4}_r{5}i1p1_'
                                      '{6}.nc'.format(var, domain_abbreviation, self.model,
                                                      self.experiment_name, startdate, member_plus, time_bound))

    if region:
        Utils.convert2netcdf4(filetosend)
        if not os.path.exists(filepath):
            # First region for this file: add a 'region' dimension and move
            # the variable's data into slot 0 of the new last axis.
            handler = Utils.openCdf(filetosend)
            handler.createDimension('region')
            var_region = handler.createVariable('region', str, 'region')
            var_region[0] = region
            original_var = handler.variables[var]
            new_var = handler.createVariable('new_var', original_var.datatype,
                                             original_var.dimensions + ('region',))
            new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()})
            value = original_var[:]
            new_var[..., 0] = value
            handler.close()
            Utils.nco.ncks(input=filetosend, output=filetosend, options='-O -x -v {0}'.format(var))
            Utils.rename_variable(filetosend, 'new_var', var)
        else:
            # Merge into the existing file: overwrite the region's slice if
            # present, otherwise append a new region entry.
            temp = TempFile.get()
            shutil.copyfile(filepath, temp)
            Utils.nco.ncks(input=temp, output=temp, options='-O --mk_rec_dmn region')
            handler = Utils.openCdf(temp)
            handler_send = Utils.openCdf(filetosend)
            value = handler_send.variables[var][:]
            var_region = handler.variables['region']
            basin_index = np.where(var_region[:] == region)
            if len(basin_index[0]) == 0:
                var_region[var_region.shape[0]] = region
                basin_index = var_region.shape[0] - 1
            else:
                basin_index = basin_index[0][0]
            handler.variables[var][..., basin_index] = value
            handler.close()
            handler_send.close()
            Utils.move_file(temp, filetosend)
            Utils.nco.ncks(input=filetosend, output=filetosend, options='-O --fix_rec_dmn region')

    # Stamp the CMOR variable metadata, if the variable is known
    cmor_var = Variable.get_variable(var)
    if cmor_var:
        handler = Utils.openCdf(filetosend)
        var_handler = handler.variables[var]
        var_handler.standard_name = cmor_var.standard_name
        var_handler.long_name = cmor_var.long_name
        var_handler.short_name = cmor_var.short_name
        handler.sync()
        handler.close()

    # Compress to netCDF4 before storing
    temp = TempFile.get()
    Utils.execute_shell_command(["nccopy", "-4", "-d4", "-s", filetosend, temp])
    shutil.move(temp, filetosend)

    Utils.move_file(filetosend, filepath)
    self._create_link(domain, filepath, frequency, var)
def _create_link(self, domain, filepath, frequency, var):
    """
    Creates (or refreshes) a symlink to filepath inside the per-variable
    folder tree <data_dir>/<expid>/<freq>_mean/<var>_f<hours>h.

    :param domain: variable's CMOR domain
    :type domain: str
    :param filepath: path of the file the link must point to
    :type filepath: str
    :param frequency: variable's frequency
    :type frequency: str
    :param var: variable's name
    :type var: str
    """
    freq_str = 'daily_mean' if frequency in ('d', 'daily', 'day') else 'monthly_mean'
    # ocean and seaIce folders always carry the fixed 'f6h' suffix;
    # other domains use the experiment's sample frequency (nfrp)
    if domain in ['ocean', 'seaIce']:
        hours = 6
    else:
        hours = self.nfrp
    folder = os.path.join(self.data_dir, self.expid, freq_str, '{0}_f{1}h'.format(var, hours))
    if not os.path.exists(folder):
        # This can be a race condition between parallel jobs, so a failure
        # here is ignored: the folder may have just been created by another one
        # noinspection PyBroadException
        try:
            os.makedirs(folder)
        except Exception:
            pass
    link = os.path.join(folder, os.path.basename(filepath))
    # lexists also detects dangling symlinks, which must be replaced too
    if os.path.lexists(link):
        os.remove(link)
    os.symlink(filepath, link)
@staticmethod
def domain_abbreviation(domain, frequency):
"""
Returns the table name for a domain-frequency pair
:param domain: variable's domain
:type domain: str
:param frequency: variable's frequency
:type frequency: str
:return: variable's table name
:rtype: str
"""
if frequency == 'mon':
if domain == 'seaIce':
domain_abreviattion = 'OImon'
elif domain == 'landIce':
domain_abreviattion = 'LImon'
else:
domain_abreviattion = domain[0].upper() + 'mon'
elif frequency == '6hr':
domain_abreviattion = '6hrPlev'
else:
domain_abreviattion = 'day'
return domain_abreviattion
def get_year(self, domain, var, startdate, member, year, grid=None, box=None):
    """
    Gets all the data corresponding to a given year from the CMOR repository to the scratch folder as one file
    and returns the path to the scratch's copy.

    :param domain: CMOR domain
    :type domain: str
    :param var: variable name
    :type var: str
    :param startdate: file's startdate
    :type startdate: str
    :param member: file's member
    :type member: int
    :param year: year to retrieve
    :type year: int
    :param grid: file's grid (only needed if it is not the original)
    :type grid: str
    :param box: file's box (only needed to retrieve sections or averages)
    :type box: Box
    :return: path to the copy created on the scratch folder
    :rtype: str
    :raises ValueError: if no chunks cover the requested year or the data
                        does not contain both January and December timesteps
    """
    chunk_files = list()
    for chunk in self.exp_manager.get_year_chunks(startdate, year):
        chunk_files.append(self.get_file(domain, var, startdate, member, chunk, grid=grid, box=box))
    if not chunk_files:
        raise ValueError('No chunks found for year {0} of startdate {1}'.format(year, startdate))

    if len(chunk_files) > 1:
        # Concatenate all the chunks into one temporary file and remove the partial copies
        temp = TempFile.get()
        Utils.nco.ncrcat(input=' '.join(chunk_files), output=temp)
        for chunk_file in chunk_files:
            os.remove(chunk_file)
    else:
        temp = chunk_files[0]

    temp2 = TempFile.get()
    handler = Utils.openCdf(temp)
    time = Utils.get_datetime_from_netcdf(handler)
    handler.close()
    # Locate the FIRST January timestep and the LAST December timestep of the
    # requested year. start must be assigned only once: with daily (or higher
    # frequency) data every January record would otherwise overwrite it,
    # leaving start at the last January timestep and silently dropping most
    # of the month.
    start = None
    end = None
    for index, date in enumerate(time):
        if date.year != year:
            continue
        if date.month == 1 and start is None:
            start = index
        elif date.month == 12:
            end = index
    if start is None or end is None:
        # Fail explicitly instead of formatting None into the ncks option,
        # which would only produce a cryptic NCO error later
        raise ValueError('Can not get year {0}: January or December timesteps are missing'.format(year))
    Utils.nco.ncks(input=temp, output=temp2, options='-O -d time,{0},{1}'.format(start, end))
    os.remove(temp)
    return temp2
"""
Class to characterize a CMOR variable. It also contains the static method to make the match between thje original
name and the standard name. Requires cmor_table.csv to work.
"""
def __init__(self, line):
    """
    Builds a Variable from one parsed row of cmor_table.csv.

    :param line: row of the table; columns 1-5 are short name, standard
                 name, long name, domain and basin respectively
    :type line: list[str]
    """
    # Column 0 holds the original (model) name and is used as the lookup
    # key elsewhere, so it is not stored on the instance
    for offset, attribute in enumerate(('short_name', 'standard_name', 'long_name', 'domain'), start=1):
        setattr(self, attribute, line[offset])
    self.basin = Basins.parse(line[5])
Javier Vegas-Regidor
committed
self._dict_variables = None
@classmethod
def get_variable(cls, original_name):
    """
    Returns the cmor variable instance given a variable name

    :param original_name: original variable's name
    :type original_name: str
    :return: CMOR variable, or None (after logging an error) if it is not defined
    :rtype: Variable
    """
    key = original_name.lower()
    try:
        return cls._dict_variables[key]
    except KeyError:
        Log.error('Variable {0} is not defined'.format(original_name))
        return None
Javier Vegas-Regidor
committed
@classmethod
def load_variables(cls):
"""
Loads the cmor_table.csv and creates the variables dictionary
:return:
"""
Variable._dict_variables = dict()
with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'cmor_table.csv'), 'rb') as csvfile: