Newer
Older
Javier Vegas-Regidor
committed
import shutil
import threading
import re
from cfunits import Units
from earthdiagnostics.variable import Variable
"""
Class to manage the data repositories
Javier Vegas-Regidor
committed
:param config:
:type config: Config
Javier Vegas-Regidor
committed
def __init__(self, config):
self.config = config
self.experiment = config.experiment
self._checked_vars = list()
Javier Vegas-Regidor
committed
Variable.load_variables()
Javier Vegas-Regidor
committed
self.lock = threading.Lock()
Javier Vegas-Regidor
committed
self.cmor_path = os.path.join(self.config.data_dir, self.experiment.expid, 'cmorfiles')
def get_file(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None):
"""
Copies a given file from the CMOR repository to the scratch folder and returns the path to the scratch's copy
:param domain: CMOR domain
:type domain: str
:param var: variable name
:type var: str
:param startdate: file's startdate
:type startdate: str
:param member: file's member
:type member: int
:param chunk: file's chunk
:type chunk: int
:param grid: file's grid (only needed if it is not the original)
:type grid: str
:param box: file's box (only needed to retrieve sections or averages)
:type box: Box
:param frequency: file's frequency (only needed if it is different from the default)
:type frequency: str
:return: path to the copy created on the scratch folder
:rtype: str
"""
Javier Vegas-Regidor
committed
filepath = self.get_file_path(startdate, member, domain, var, chunk, frequency, box, grid, None, None)
Javier Vegas-Regidor
committed
temp_path = TempFile.get()
shutil.copyfile(filepath, temp_path)
return temp_path
def send_file(self, filetosend, domain, var, startdate, member, chunk=None, grid=None, region=None, box=None,
Javier Vegas-Regidor
committed
rename_var=None, frequency=None, year=None, date_str=None, move_old=False):
"""
Copies a given file to the CMOR repository. It also automatically converts to netCDF 4 if needed and can merge
with already existing ones as needed
Javier Vegas-Regidor
committed
:param move_old: if true, moves files following older conventions that may be found on the links folder
:type move_old: bool
:param date_str: exact date_str to use in the cmorized file
:type: str
:param year: if frequency is yearly, this parameter is used to give the corresponding year
:type year: int
:param rename_var: if exists, the given variable will be renamed to the one given by var
:type rename_var: str
:param filetosend: path to the file to send to the CMOR repository
:type filetosend: str
:param region: specifies the region represented by the file. If it is defined, the data will be appended to the
CMOR repository as a new region in the file or will overwrite if region was already present
:type region: str
:param domain: CMOR domain
:type domain: str
:param var: variable name
:type var: str
:param startdate: file's startdate
:type startdate: str
:param member: file's member
:type member: int
:param chunk: file's chunk
:type chunk: int
:param grid: file's grid (only needed if it is not the original)
:type grid: str
:param box: file's box (only needed to retrieve sections or averages)
:type box: Box
:param frequency: file's frequency (only needed if it is different from the default)
:type frequency: str
:return: path to the copy created on the scratch folder
:rtype: str
"""
Javier Vegas-Regidor
committed
cmor_var = Variable.get_variable(var)
Javier Vegas-Regidor
committed
if rename_var and rename_var != var:
Utils.rename_variable(filetosend, rename_var, var)
elif original_var != var:
Utils.rename_variable(filetosend, original_var, var)
if not frequency:
Javier Vegas-Regidor
committed
frequency = self.config.frequency
Javier Vegas-Regidor
committed
domain = DataManager.correct_domain(domain)
filepath = self.get_file_path(startdate, member, domain, var, chunk, frequency, box,
grid, year, date_str)
Javier Vegas-Regidor
committed
Utils.convert2netcdf4(filetosend)
if region:
self._prepare_region(filepath, filetosend, region, var)
if cmor_var:
Javier Vegas-Regidor
committed
self._correct_metadata(cmor_var, domain, filetosend, frequency, var)
self._rename_coordinate_variables(filetosend)
Javier Vegas-Regidor
committed
Javier Vegas-Regidor
committed
Utils.move_file(filetosend, filepath)
self._create_link(domain, filepath, frequency, var, grid, move_old)
Javier Vegas-Regidor
committed
def _rename_coordinate_variables(self, filetosend):
variables = dict()
variables['x'] = 'i'
variables['y'] = 'j'
variables['nav_lat_grid_V'] = 'lat'
variables['nav_lon_grid_V'] = 'lon'
variables['nav_lat_grid_U'] = 'lat'
variables['nav_lon_grid_U'] = 'lon'
variables['nav_lat_grid_T'] = 'lat'
variables['nav_lon_grid_T'] = 'lon'
Utils.rename_variables(filetosend, variables, False, True)
Javier Vegas-Regidor
committed
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
def _correct_metadata(self, cmor_var, domain, filetosend, frequency, var):
handler = Utils.openCdf(filetosend)
var_handler = handler.variables[var]
var_handler.standard_name = cmor_var.standard_name
var_handler.long_name = cmor_var.long_name
var_handler.short_name = cmor_var.short_name
var_type = var_handler.dtype
handler.modeling_realm = cmor_var.domain
handler.table_id = 'Table {0} (December 2013)'.format(self.get_domain_abbreviation(cmor_var.domain,
frequency))
if cmor_var.units:
self._fix_units(cmor_var, var_handler)
handler.sync()
self._fix_coordinate_variables_metadata(domain, handler)
handler.close()
self._fix_values_metadata(cmor_var, filetosend, var, var_type)
def _fix_values_metadata(self, cmor_var, filetosend, var, var_type):
if cmor_var.valid_min != '':
valid_min = '-a valid_min, {0}, o, {1}, "{2}" '.format(var, var_type.char, cmor_var.valid_min)
else:
valid_min = ''
if cmor_var.valid_max != '':
valid_max = '-a valid_max, {0}, o, {1}, "{2}" '.format(var, var_type.char, cmor_var.valid_max)
else:
valid_max = ''
Utils.nco.ncatted(input=filetosend, output=filetosend,
options='-O -a _FillValue,{0},o,{1},"1.e20" '
'-a missingValue,{0},o,{1},"1.e20" {2}{3}'.format(var, var_type.char,
valid_min, valid_max))
def _fix_coordinate_variables_metadata(self, domain, handler):
if 'lev' in handler.variables:
handler.variables['lev'].short_name = 'lev'
if domain == 'ocean':
handler.variables['lev'].standard_name = 'depth'
if 'lon' in handler.variables:
handler.variables['lon'].short_name = 'lon'
handler.variables['lon'].standard_name = 'longitude'
if 'lat' in handler.variables:
handler.variables['lat'].short_name = 'lat'
handler.variables['lat'].standard_name = 'latitude'
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
@staticmethod
def _fix_units(cmor_var, var_handler):
if 'units' in var_handler.ncattrs():
if var_handler.units == 'PSU':
var_handler.units = 'psu'
if var_handler.units == 'C' and cmor_var.units == 'K':
var_handler.units = 'deg_C'
if cmor_var.units != var_handler.units:
try:
new_unit = Units(cmor_var.units)
old_unit = Units(var_handler.units)
var_handler[:] = Units.conform(var_handler[:], old_unit, new_unit, inplace=True)
if 'valid_min' in var_handler.ncattrs():
var_handler.valid_min = Units.conform(float(var_handler.valid_min), old_unit, new_unit,
inplace=True)
if 'valid_max' in var_handler.ncattrs():
var_handler.valid_max = Units.conform(float(var_handler.valid_max), old_unit, new_unit,
inplace=True)
except ValueError:
factor, offset = UnitConversion.get_conversion_factor_offset(var_handler.units,
cmor_var.units)
var_handler[:] = var_handler[:] * factor + offset
if 'valid_min' in var_handler.ncattrs():
var_handler.valid_min = float(var_handler.valid_min) * factor + offset
if 'valid_max' in var_handler.ncattrs():
var_handler.valid_max = float(var_handler.valid_max) * factor + offset
var_handler.units = cmor_var.units
@staticmethod
def _prepare_region(filepath, filetosend, region, var):
if not os.path.exists(filepath):
handler = Utils.openCdf(filetosend)
handler.createDimension('region')
var_region = handler.createVariable('region', str, 'region')
var_region[0] = region
original_var = handler.variables[var]
new_var = handler.createVariable('new_var', original_var.datatype,
original_var.dimensions + ('region',))
new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()})
value = original_var[:]
new_var[..., 0] = value
handler.close()
Utils.nco.ncks(input=filetosend, output=filetosend, options='-O -x -v {0}'.format(var))
Utils.rename_variable(filetosend, 'new_var', var)
else:
temp = TempFile.get()
shutil.copyfile(filepath, temp)
Utils.nco.ncks(input=temp, output=temp, options='-O --mk_rec_dmn region')
handler = Utils.openCdf(temp)
handler_send = Utils.openCdf(filetosend)
value = handler_send.variables[var][:]
var_region = handler.variables['region']
basin_index = np.where(var_region[:] == region)
if len(basin_index[0]) == 0:
var_region[var_region.shape[0]] = region
basin_index = var_region.shape[0] - 1
else:
basin_index = basin_index[0][0]
handler.variables[var][..., basin_index] = value
handler.close()
handler_send.close()
Utils.move_file(temp, filetosend)
Utils.nco.ncks(input=filetosend, output=filetosend, options='-O --fix_rec_dmn region')
@staticmethod
def _get_final_var_name(box, var):
if box:
var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str()
return var
Javier Vegas-Regidor
committed
@staticmethod
def correct_domain(domain):
"""
Corrects domain capitalization
:param domain: domain name
:type domain: str
:return: domain with correct capitalization
:rtype: str
"""
Javier Vegas-Regidor
committed
domain = domain.lower()
if domain == 'seaice':
return 'seaIce'
elif domain == 'landice':
return 'landIce'
return domain
Javier Vegas-Regidor
committed
def get_varfolder(self, domain, var):
if domain in ['ocean', 'seaIce']:
return '{0}_f{1}h'.format(var, self.experiment.ocean_timestep)
Javier Vegas-Regidor
committed
return '{0}_f{1}h'.format(var, self.experiment.atmos_timestep)
def _create_link(self, domain, filepath, frequency, var, grid, move_old):
freq_str = self.frequency_folder_name(frequency)
Javier Vegas-Regidor
committed
if not grid:
grid = 'original'
var_grid = '{0}-{1}'.format(var, grid)
Javier Vegas-Regidor
committed
variable_folder = self.get_varfolder(domain, var)
vargrid_folder = self.get_varfolder(domain, var_grid)
Javier Vegas-Regidor
committed
if grid == 'original':
link_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, variable_folder)
if os.path.islink(link_path):
link_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, vargrid_folder)
Javier Vegas-Regidor
committed
Utils.create_folder_tree(link_path)
Javier Vegas-Regidor
committed
else:
link_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, vargrid_folder)
Javier Vegas-Regidor
committed
Utils.create_folder_tree(link_path)
Javier Vegas-Regidor
committed
default_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, variable_folder)
original_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str,
vargrid_folder.replace('-{0}_f'.format(grid), '-original_f'))
if os.path.islink(default_path):
Javier Vegas-Regidor
committed
os.remove(default_path)
elif os.path.isdir(default_path):
shutil.move(default_path, original_path)
Javier Vegas-Regidor
committed
os.symlink(link_path, default_path)
if move_old:
Javier Vegas-Regidor
committed
if self.lock.acquire(False):
if link_path not in self._checked_vars:
self._checked_vars.append(link_path)
Javier Vegas-Regidor
committed
self.lock.release()
old_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str,
'old_{0}_f{1}h'.format(var, self.experiment.atmos_timestep))
regex = re.compile(var + '_[0-9]{6,8}\.nc')
for filename in os.listdir(link_path):
if regex.match(filename):
Javier Vegas-Regidor
committed
Utils.create_folder_tree(old_path)
Javier Vegas-Regidor
committed
Utils.move_file(os.path.join(link_path, filename),
os.path.join(old_path, filename))
Javier Vegas-Regidor
committed
link_path = os.path.join(link_path, os.path.basename(filepath))
if os.path.lexists(link_path):
os.remove(link_path)
if not os.path.exists(filepath):
raise ValueError('Original file {0} does not exists'.format(filepath))
Javier Vegas-Regidor
committed
os.symlink(filepath, link_path)
Javier Vegas-Regidor
committed
def frequency_folder_name(self, frequency):
if frequency in ('d', 'daily', 'day'):
freq_str = 'daily_mean'
elif frequency.endswith('hr'):
freq_str = frequency[:-2] + 'hourly'
else:
freq_str = 'monthly_mean'
return freq_str
def get_domain_abbreviation(domain, frequency):
"""
Returns the table name for a domain-frequency pair
:param domain: variable's domain
:type domain: str
:param frequency: variable's frequency
:type frequency: str
:return: variable's table name
:rtype: str
"""
if frequency == 'mon':
if domain == 'seaIce':
domain_abreviattion = 'OImon'
elif domain == 'landIce':
domain_abreviattion = 'LImon'
else:
domain_abreviattion = domain[0].upper() + 'mon'
elif frequency == '6hr':
domain_abreviattion = '6hrPlev'
else:
domain_abreviattion = 'day'
return domain_abreviattion
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
# Abstract methods
def get_file_path(self, startdate, member, domain, var, chunk, frequency,
box=None, grid=None, year=None, date_str=None):
"""
:param date_str: exact date_str to use in the cmorized file
:type: str
:param year: if frequency is yearly, this parameter is used to give the corresponding year
:type year: int | None
:param domain: CMOR domain
:type domain: str
:param var: variable name
:type var: str
:param startdate: file's startdate
:type startdate: str
:param member: file's member
:type member: int
:param chunk: file's chunk
:type chunk: int
:param grid: file's grid (only needed if it is not the original)
:type grid: str
:param box: file's box (only needed to retrieve sections or averages)
:type box: Box
:param frequency: file's frequency (only needed if it is different from the default)
:type frequency: str
:return: file absolute path
:rtype: str
"""
raise NotImplementedError()
def get_year(self, domain, var, startdate, member, year, grid=None, box=None):
"""
Gets all the data corresponding to a given year from the CMOR repository to the scratch folder as one file and
returns the path to the scratch's copy.
:param year: year to retrieve
:type year: int
:param domain: CMOR domain
:type domain: str
:param var: variable name
:type var: str
:param startdate: file's startdate
:type startdate: str
:param member: file's member
:type member: int
:param grid: file's grid (only needed if it is not the original)
:type grid: str
:param box: file's box (only needed to retrieve sections or averages)
:type box: Box
:return: path to the copy created on the scratch folder
:rtype: str
"""
raise NotImplementedError()
# Overridable methods (not mandatory)
def link_file(self, domain, var, startdate, member, chunk=None, grid=None, box=None,
frequency=None, year=None, date_str=None, move_old=False):
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
"""
Creates the link of a given file from the CMOR repository.
:param move_old:
:param date_str:
:param year: if frequency is yearly, this parameter is used to give the corresponding year
:type year: int
:param domain: CMOR domain
:type domain: str
:param var: variable name
:type var: str
:param startdate: file's startdate
:type startdate: str
:param member: file's member
:type member: int
:param chunk: file's chunk
:type chunk: int
:param grid: file's grid (only needed if it is not the original)
:type grid: str
:param box: file's box (only needed to retrieve sections or averages)
:type box: Box
:param frequency: file's frequency (only needed if it is different from the default)
:type frequency: str
:return: path to the copy created on the scratch folder
:rtype: str
"""
pass
"""
Class to manage unit conversions
"""
_dict_conversions = None
@classmethod
def load_conversions(cls):
"""
Load conversions from the configuration file
"""
with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'conversions.csv'), 'rb') as csvfile:
reader = csv.reader(csvfile, dialect='excel')
for line in reader:
if line[0] == 'original':
continue
cls.add_conversion(UnitConversion(line[0], line[1], line[2], line[3]))
@classmethod
def add_conversion(cls, conversion):
"""
Adds a conversion to the dictionary
:param conversion: conversion to add
:type conversion: UnitConversion
"""
cls._dict_conversions[(conversion.source, conversion.destiny)] = conversion
def __init__(self, source, destiny, factor, offset):
self.source = source
self.destiny = destiny
self.factor = float(factor)
self.offset = float(offset)
@classmethod
def get_conversion_factor_offset(cls, input_units, output_units):
"""
Gets the conversion factor and offset for two units . The conversion has to be done in the following way:
converted = original * factor + offset
:param input_units: original units
:type input_units: str
:param output_units: destiny units
:type output_units: str
:return: factor and offset
:rtype: [float, float]
"""
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
units = input_units.split()
if len(units) == 1:
scale_unit = 1
unit = units[0]
else:
if '^' in units[0]:
values = units[0].split('^')
scale_unit = pow(int(values[0]), int(values[1]))
else:
scale_unit = float(units[0])
unit = units[1]
units = output_units.split()
if len(units) == 1:
scale_new_unit = 1
new_unit = units[0]
else:
if '^' in units[0]:
values = units[0].split('^')
scale_new_unit = pow(int(values[0]), int(values[1]))
else:
scale_new_unit = float(units[0])
new_unit = units[1]
factor, offset = UnitConversion._get_factor(new_unit, unit)
if factor is None:
factor = factor * scale_unit / float(scale_new_unit)
offset /= float(scale_new_unit)
return factor, offset
@classmethod
def _get_factor(cls, new_unit, unit):
# Add only the conversions with a factor greater than 1
if unit == new_unit:
return 1, 0
elif (unit, new_unit) in cls._dict_conversions:
conversion = cls._dict_conversions[(unit, new_unit)]
return conversion.factor, conversion.offset
elif (new_unit, unit) in cls._dict_conversions:
conversion = cls._dict_conversions[(new_unit, unit)]
Javier Vegas-Regidor
committed
return 1 / conversion.factor, -conversion.offset