Newer
Older
Javier Vegas-Regidor
committed
import shutil
import threading
import re
from cfunits import Units
from earthdiagnostics.variable import Variable
"""
Class to manage the data repositories
Javier Vegas-Regidor
committed
:param config:
:type config: Config
Javier Vegas-Regidor
committed
def __init__(self, config):
self.config = config
self.experiment = config.experiment
self._checked_vars = list()
Javier Vegas-Regidor
committed
Variable.load_variables()
Javier Vegas-Regidor
committed
self.lock = threading.Lock()
Javier Vegas-Regidor
committed
self.cmor_path = os.path.join(self.config.data_dir, self.experiment.expid, 'cmorfiles')
def get_file(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None):
"""
Copies a given file from the CMOR repository to the scratch folder and returns the path to the scratch's copy
:param domain: CMOR domain
:type domain: str
:param var: variable name
:type var: str
:param startdate: file's startdate
:type startdate: str
:param member: file's member
:type member: int
:param chunk: file's chunk
:type chunk: int
:param grid: file's grid (only needed if it is not the original)
:type grid: str
:param box: file's box (only needed to retrieve sections or averages)
:type box: Box
:param frequency: file's frequency (only needed if it is different from the default)
:type frequency: str
:return: path to the copy created on the scratch folder
:rtype: str
"""
Javier Vegas-Regidor
committed
filepath = self.get_file_path(startdate, member, domain, var, chunk, frequency, box, grid, None, None)
Javier Vegas-Regidor
committed
temp_path = TempFile.get()
shutil.copyfile(filepath, temp_path)
return temp_path
def send_file(self, filetosend, domain, var, startdate, member, chunk=None, grid=None, region=None, box=None,
Javier Vegas-Regidor
committed
rename_var=None, frequency=None, year=None, date_str=None, move_old=False):
"""
Copies a given file to the CMOR repository. It also automatically converts to netCDF 4 if needed and can merge
with already existing ones as needed
Javier Vegas-Regidor
committed
:param move_old: if true, moves files following older conventions that may be found on the links folder
:type move_old: bool
:param date_str: exact date_str to use in the cmorized file
:type: str
:param year: if frequency is yearly, this parameter is used to give the corresponding year
:type year: int
:param rename_var: if exists, the given variable will be renamed to the one given by var
:type rename_var: str
:param filetosend: path to the file to send to the CMOR repository
:type filetosend: str
:param region: specifies the region represented by the file. If it is defined, the data will be appended to the
CMOR repository as a new region in the file or will overwrite if region was already present
:type region: str
:param domain: CMOR domain
:type domain: str
:param var: variable name
:type var: str
:param startdate: file's startdate
:type startdate: str
:param member: file's member
:type member: int
:param chunk: file's chunk
:type chunk: int
:param grid: file's grid (only needed if it is not the original)
:type grid: str
:param box: file's box (only needed to retrieve sections or averages)
:type box: Box
:param frequency: file's frequency (only needed if it is different from the default)
:type frequency: str
:return: path to the copy created on the scratch folder
:rtype: str
"""
Javier Vegas-Regidor
committed
cmor_var = Variable.get_variable(var)
Javier Vegas-Regidor
committed
if rename_var and rename_var != var:
Utils.rename_variable(filetosend, rename_var, var)
elif original_var != var:
Utils.rename_variable(filetosend, original_var, var)
if not frequency:
Javier Vegas-Regidor
committed
frequency = self.config.frequency
Javier Vegas-Regidor
committed
domain = DataManager.correct_domain(domain)
filepath = self.get_file_path(startdate, member, domain, var, chunk, frequency, box,
grid, year, date_str)
netcdf_file = NetCDFFile(filepath, filetosend, domain, var, cmor_var)
netcdf_file.send()
Javier Vegas-Regidor
committed
Javier Vegas-Regidor
committed
self._create_link(domain, filepath, frequency, var, grid, move_old)
@staticmethod
def _get_final_var_name(box, var):
if box:
var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str()
return var
Javier Vegas-Regidor
committed
@staticmethod
def correct_domain(domain):
"""
Corrects domain capitalization
:param domain: domain name
:type domain: str
:return: domain with correct capitalization
:rtype: str
"""
Javier Vegas-Regidor
committed
domain = domain.lower()
if domain == 'seaice':
return 'seaIce'
elif domain == 'landice':
return 'landIce'
return domain
Javier Vegas-Regidor
committed
def get_varfolder(self, domain, var):
if domain in ['ocean', 'seaIce']:
return '{0}_f{1}h'.format(var, self.experiment.ocean_timestep)
Javier Vegas-Regidor
committed
return '{0}_f{1}h'.format(var, self.experiment.atmos_timestep)
def _create_link(self, domain, filepath, frequency, var, grid, move_old):
freq_str = self.frequency_folder_name(frequency)
Javier Vegas-Regidor
committed
if not grid:
grid = 'original'
var_grid = '{0}-{1}'.format(var, grid)
Javier Vegas-Regidor
committed
variable_folder = self.get_varfolder(domain, var)
vargrid_folder = self.get_varfolder(domain, var_grid)
Javier Vegas-Regidor
committed
if grid == 'original':
link_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, variable_folder)
if os.path.islink(link_path):
link_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, vargrid_folder)
Javier Vegas-Regidor
committed
Utils.create_folder_tree(link_path)
Javier Vegas-Regidor
committed
else:
link_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, vargrid_folder)
Javier Vegas-Regidor
committed
Utils.create_folder_tree(link_path)
Javier Vegas-Regidor
committed
default_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, variable_folder)
original_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str,
vargrid_folder.replace('-{0}_f'.format(grid), '-original_f'))
if os.path.islink(default_path):
Javier Vegas-Regidor
committed
os.remove(default_path)
elif os.path.isdir(default_path):
shutil.move(default_path, original_path)
Javier Vegas-Regidor
committed
os.symlink(link_path, default_path)
if move_old:
Javier Vegas-Regidor
committed
if self.lock.acquire(False):
if link_path not in self._checked_vars:
self._checked_vars.append(link_path)
Javier Vegas-Regidor
committed
self.lock.release()
old_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str,
'old_{0}_f{1}h'.format(var, self.experiment.atmos_timestep))
regex = re.compile(var + '_[0-9]{6,8}\.nc')
for filename in os.listdir(link_path):
if regex.match(filename):
Javier Vegas-Regidor
committed
Utils.create_folder_tree(old_path)
Javier Vegas-Regidor
committed
Utils.move_file(os.path.join(link_path, filename),
os.path.join(old_path, filename))
Javier Vegas-Regidor
committed
link_path = os.path.join(link_path, os.path.basename(filepath))
if os.path.lexists(link_path):
os.remove(link_path)
if not os.path.exists(filepath):
raise ValueError('Original file {0} does not exists'.format(filepath))
Javier Vegas-Regidor
committed
os.symlink(filepath, link_path)
Javier Vegas-Regidor
committed
def frequency_folder_name(self, frequency):
if frequency in ('d', 'daily', 'day'):
freq_str = 'daily_mean'
elif frequency.endswith('hr'):
freq_str = frequency[:-2] + 'hourly'
else:
freq_str = 'monthly_mean'
return freq_str
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
# Abstract methods
def get_file_path(self, startdate, member, domain, var, chunk, frequency,
box=None, grid=None, year=None, date_str=None):
"""
:param date_str: exact date_str to use in the cmorized file
:type: str
:param year: if frequency is yearly, this parameter is used to give the corresponding year
:type year: int | None
:param domain: CMOR domain
:type domain: str
:param var: variable name
:type var: str
:param startdate: file's startdate
:type startdate: str
:param member: file's member
:type member: int
:param chunk: file's chunk
:type chunk: int
:param grid: file's grid (only needed if it is not the original)
:type grid: str
:param box: file's box (only needed to retrieve sections or averages)
:type box: Box
:param frequency: file's frequency (only needed if it is different from the default)
:type frequency: str
:return: file absolute path
:rtype: str
"""
raise NotImplementedError()
def get_year(self, domain, var, startdate, member, year, grid=None, box=None):
"""
Gets all the data corresponding to a given year from the CMOR repository to the scratch folder as one file and
returns the path to the scratch's copy.
:param year: year to retrieve
:type year: int
:param domain: CMOR domain
:type domain: str
:param var: variable name
:type var: str
:param startdate: file's startdate
:type startdate: str
:param member: file's member
:type member: int
:param grid: file's grid (only needed if it is not the original)
:type grid: str
:param box: file's box (only needed to retrieve sections or averages)
:type box: Box
:return: path to the copy created on the scratch folder
:rtype: str
"""
raise NotImplementedError()
# Overridable methods (not mandatory)
def link_file(self, domain, var, startdate, member, chunk=None, grid=None, box=None,
frequency=None, year=None, date_str=None, move_old=False):
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
"""
Creates the link of a given file from the CMOR repository.
:param move_old:
:param date_str:
:param year: if frequency is yearly, this parameter is used to give the corresponding year
:type year: int
:param domain: CMOR domain
:type domain: str
:param var: variable name
:type var: str
:param startdate: file's startdate
:type startdate: str
:param member: file's member
:type member: int
:param chunk: file's chunk
:type chunk: int
:param grid: file's grid (only needed if it is not the original)
:type grid: str
:param box: file's box (only needed to retrieve sections or averages)
:type box: Box
:param frequency: file's frequency (only needed if it is different from the default)
:type frequency: str
:return: path to the copy created on the scratch folder
:rtype: str
"""
pass
class NetCDFFile(object):
"""
Class to manage netCDF file and pr
:param remote_file:
:type remote_file: str
:param local_file:
:type local_file: str
:param domain:
:type domain: str
:param var:
:type var: str
:param cmor_var:
:type cmor_var: Variable
"""
def __init__(self, remote_file, local_file, domain, var, cmor_var):
self.remote_file = remote_file
Javier Vegas-Regidor
committed
self.local_file = local_file
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
self.domain = domain
self.var = var
self.cmor_var = cmor_var
self.region = None
self.frequency = None
def send(self):
Utils.convert2netcdf4(self.local_file)
if self.region:
self._prepare_region()
if self.cmor_var:
self._correct_metadata()
self._rename_coordinate_variables()
Utils.move_file(self.local_file, self.remote_file)
def _prepare_region(self):
if not os.path.exists(self.remote_file):
self._add_region_dimension_to_var()
else:
self._update_var_with_region_data()
Utils.nco.ncks(input=self.local_file, output=self.local_file, options='-O --fix_rec_dmn region')
def _update_var_with_region_data(self):
temp = TempFile.get()
shutil.copyfile(self.remote_file, temp)
Utils.nco.ncks(input=temp, output=temp, options='-O --mk_rec_dmn region')
handler = Utils.openCdf(temp)
handler_send = Utils.openCdf(self.local_file)
value = handler_send.variables[self.var][:]
var_region = handler.variables['region']
basin_index = np.where(var_region[:] == self.region)
if len(basin_index[0]) == 0:
var_region[var_region.shape[0]] = self.region
basin_index = var_region.shape[0] - 1
else:
basin_index = basin_index[0][0]
handler.variables[self.var][..., basin_index] = value
handler.close()
handler_send.close()
Utils.move_file(temp, self.local_file)
def _add_region_dimension_to_var(self):
handler = Utils.openCdf(self.local_file)
handler.createDimension('region')
var_region = handler.createVariable('region', str, 'region')
var_region[0] = self.region
original_var = handler.variables[self.var]
new_var = handler.createVariable('new_var', original_var.datatype,
original_var.dimensions + ('region',))
new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()})
value = original_var[:]
new_var[..., 0] = value
handler.close()
Utils.nco.ncks(input=self.local_file, output=self.local_file, options='-O -x -v {0}'.format(self.var))
Utils.rename_variable(self.local_file, 'new_var', self.var)
def _correct_metadata(self):
handler = Utils.openCdf(self.local_file)
var_handler = handler.variables[self.var]
self._fix_variable_name(var_handler)
handler.modeling_realm = self.cmor_var.domain
handler.table_id = 'Table {0} (December 2013)'.format(Variable.get_table_name(self.cmor_var.domain,
self.frequency))
if self.cmor_var.units:
self._fix_units(var_handler)
handler.sync()
self._fix_coordinate_variables_metadata(handler)
var_type = var_handler.dtype
handler.close()
self._fix_values_metadata(var_type)
def _fix_variable_name(self, var_handler):
var_handler.standard_name = self.cmor_var.standard_name
var_handler.long_name = self.cmor_var.long_name
var_handler.short_name = self.cmor_var.short_name
def _fix_values_metadata(self, var_type):
if self.cmor_var.valid_min != '':
valid_min = '-a valid_min, {0}, o, {1}, "{2}" '.format(self.var, var_type.char, self.cmor_var.valid_min)
else:
valid_min = ''
if self.cmor_var.valid_max != '':
valid_max = '-a valid_max, {0}, o, {1}, "{2}" '.format(self.var, var_type.char, self.cmor_var.valid_max)
else:
valid_max = ''
Utils.nco.ncatted(input=self.local_file, output=self.local_file,
options='-O -a _FillValue,{0},o,{1},"1.e20" '
'-a missingValue,{0},o,{1},"1.e20" {2}{3}'.format(self.var, var_type.char,
valid_min, valid_max))
def _fix_coordinate_variables_metadata(self, handler):
if 'lev' in handler.variables:
handler.variables['lev'].short_name = 'lev'
if self.domain == 'ocean':
handler.variables['lev'].standard_name = 'depth'
if 'lon' in handler.variables:
handler.variables['lon'].short_name = 'lon'
handler.variables['lon'].standard_name = 'longitude'
if 'lat' in handler.variables:
handler.variables['lat'].short_name = 'lat'
handler.variables['lat'].standard_name = 'latitude'
def _fix_units(self, var_handler):
if 'units' not in var_handler.ncattrs():
return
if var_handler.units == 'PSU':
var_handler.units = 'psu'
if var_handler.units == 'C' and self.cmor_var.units == 'K':
var_handler.units = 'deg_C'
if self.cmor_var.units != var_handler.units:
self._convert_units(var_handler)
var_handler.units = self.cmor_var.units
def _convert_units(self, var_handler):
try:
self._convert_using_cfunits(var_handler)
except ValueError:
factor, offset = UnitConversion.get_conversion_factor_offset(var_handler.units,
self.cmor_var.units)
var_handler[:] = var_handler[:] * factor + offset
if 'valid_min' in var_handler.ncattrs():
var_handler.valid_min = float(var_handler.valid_min) * factor + offset
if 'valid_max' in var_handler.ncattrs():
var_handler.valid_max = float(var_handler.valid_max) * factor + offset
def _convert_using_cfunits(self, var_handler):
new_unit = Units(self.cmor_var.units)
old_unit = Units(var_handler.units)
var_handler[:] = Units.conform(var_handler[:], old_unit, new_unit, inplace=True)
if 'valid_min' in var_handler.ncattrs():
var_handler.valid_min = Units.conform(float(var_handler.valid_min), old_unit, new_unit,
inplace=True)
if 'valid_max' in var_handler.ncattrs():
var_handler.valid_max = Units.conform(float(var_handler.valid_max), old_unit, new_unit,
inplace=True)
def _rename_coordinate_variables(self):
variables = dict()
variables['x'] = 'i'
variables['y'] = 'j'
variables['nav_lat_grid_V'] = 'lat'
variables['nav_lon_grid_V'] = 'lon'
variables['nav_lat_grid_U'] = 'lat'
variables['nav_lon_grid_U'] = 'lon'
variables['nav_lat_grid_T'] = 'lat'
variables['nav_lon_grid_T'] = 'lon'
Utils.rename_variables(self.local_file, variables, False, True)
"""
Class to manage unit conversions
"""
_dict_conversions = None
@classmethod
def load_conversions(cls):
"""
Load conversions from the configuration file
"""
with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'conversions.csv'), 'rb') as csvfile:
reader = csv.reader(csvfile, dialect='excel')
for line in reader:
if line[0] == 'original':
continue
cls.add_conversion(UnitConversion(line[0], line[1], line[2], line[3]))
@classmethod
def add_conversion(cls, conversion):
"""
Adds a conversion to the dictionary
:param conversion: conversion to add
:type conversion: UnitConversion
"""
cls._dict_conversions[(conversion.source, conversion.destiny)] = conversion
def __init__(self, source, destiny, factor, offset):
self.source = source
self.destiny = destiny
self.factor = float(factor)
self.offset = float(offset)
@classmethod
def get_conversion_factor_offset(cls, input_units, output_units):
"""
Gets the conversion factor and offset for two units . The conversion has to be done in the following way:
converted = original * factor + offset
:param input_units: original units
:type input_units: str
:param output_units: destiny units
:type output_units: str
:return: factor and offset
:rtype: [float, float]
"""
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
units = input_units.split()
if len(units) == 1:
scale_unit = 1
unit = units[0]
else:
if '^' in units[0]:
values = units[0].split('^')
scale_unit = pow(int(values[0]), int(values[1]))
else:
scale_unit = float(units[0])
unit = units[1]
units = output_units.split()
if len(units) == 1:
scale_new_unit = 1
new_unit = units[0]
else:
if '^' in units[0]:
values = units[0].split('^')
scale_new_unit = pow(int(values[0]), int(values[1]))
else:
scale_new_unit = float(units[0])
new_unit = units[1]
factor, offset = UnitConversion._get_factor(new_unit, unit)
if factor is None:
factor = factor * scale_unit / float(scale_new_unit)
offset /= float(scale_new_unit)
return factor, offset
@classmethod
def _get_factor(cls, new_unit, unit):
# Add only the conversions with a factor greater than 1
if unit == new_unit:
return 1, 0
elif (unit, new_unit) in cls._dict_conversions:
conversion = cls._dict_conversions[(unit, new_unit)]
return conversion.factor, conversion.offset
elif (new_unit, unit) in cls._dict_conversions:
conversion = cls._dict_conversions[(new_unit, unit)]
Javier Vegas-Regidor
committed
return 1 / conversion.factor, -conversion.offset