Newer
Older
Javier Vegas-Regidor
committed
import shutil
import threading
from bscearth.utils.log import Log
from earthdiagnostics.variable import VariableManager
from earthdiagnostics.variable_type import VariableType
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.datafile import NetCDFFile as NCfile, StorageStatus, LocalStatus
"""
Class to manage the data repositories
Javier Vegas-Regidor
committed
:param config:
:type config: Config
Javier Vegas-Regidor
committed
def __init__(self, config):
self.config = config
self.experiment = config.experiment
self._checked_vars = list()
self.variable_list.load_variables(self.config.data_convention)
Javier Vegas-Regidor
committed
self.lock = threading.Lock()
def _get_file_from_storage(self, filepath):
if filepath not in self.requested_files:
self.requested_files[filepath] = NCfile.from_storage(filepath)
file_object = self.requested_files[filepath]
file_object.local_satatus = LocalStatus.PENDING
def _declare_generated_file(self, remote_file, domain, final_var, cmor_var, data_convention,
region, diagnostic, grid, var_type, original_var):
if remote_file not in self.requested_files:
self.requested_files[remote_file] = NCfile.to_storage(remote_file)
file_object = self.requested_files[remote_file]
file_object.diagnostic = diagnostic
file_object.var_type = var_type
file_object.grid = grid
file_object.data_manager = self
file_object.domain = domain
file_object.var = original_var
file_object.final_name = final_var
file_object.cmor_var = cmor_var
file_object.region = region
file_object.data_convention = data_convention
file_object.storage_status = StorageStatus.PENDING
return file_object
@staticmethod
def _get_final_var_name(box, var):
if box:
var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str()
return var
def get_varfolder(self, domain, var, grid=None):
if grid:
var = '{0}-{1}'.format(var, grid)
if domain in [ModelingRealms.ocean, ModelingRealms.seaIce]:
Javier Vegas-Regidor
committed
return '{0}_f{1}h'.format(var, self.experiment.ocean_timestep)
Javier Vegas-Regidor
committed
return '{0}_f{1}h'.format(var, self.experiment.atmos_timestep)
def create_link(self, domain, filepath, frequency, var, grid, move_old, vartype):
freq_str = frequency.folder_name(vartype)
Javier Vegas-Regidor
committed
if not grid:
grid = 'original'
Javier Vegas-Regidor
committed
variable_folder = self.get_varfolder(domain, var)
vargrid_folder = self.get_varfolder(domain, var, grid)
Javier Vegas-Regidor
committed
self.lock.acquire()
try:
if grid == 'original':
link_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, variable_folder)
if os.path.islink(link_path):
link_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, vargrid_folder)
Utils.create_folder_tree(link_path)
else:
Javier Vegas-Regidor
committed
link_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, vargrid_folder)
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
Utils.create_folder_tree(link_path)
default_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, variable_folder)
original_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str,
vargrid_folder.replace('-{0}_f'.format(grid), '-original_f'))
if os.path.islink(default_path):
os.remove(default_path)
elif os.path.isdir(default_path):
shutil.move(default_path, original_path)
os.symlink(link_path, default_path)
if move_old and link_path not in self._checked_vars:
self._checked_vars.append(link_path)
old_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str,
'old_{0}_f{1}h'.format(var, self.experiment.atmos_timestep))
regex = re.compile(var + '_[0-9]{6,8}\.nc')
for filename in os.listdir(link_path):
if regex.match(filename):
Utils.create_folder_tree(old_path)
Utils.move_file(os.path.join(link_path, filename),
os.path.join(old_path, filename))
link_path = os.path.join(link_path, os.path.basename(filepath))
if os.path.lexists(link_path):
os.remove(link_path)
if not os.path.exists(filepath):
raise ValueError('Original file {0} does not exists'.format(filepath))
if not os.path.isdir(os.path.dirname(link_path)):
Utils.create_folder_tree(os.path.dirname(link_path))
relative_path = os.path.relpath(filepath, os.path.dirname(link_path))
os.symlink(relative_path, link_path)
except:
raise
finally:
Javier Vegas-Regidor
committed
self.lock.release()
# Overridable methods (not mandatory)
def link_file(self, domain, var, cmor_var, startdate, member, chunk=None, grid=None,
frequency=None, year=None, date_str=None, move_old=False, vartype=VariableType.MEAN):
"""
Creates the link of a given file from the CMOR repository.
:param move_old:
:param date_str:
:param year: if frequency is yearly, this parameter is used to give the corresponding year
:type year: int
:param domain: CMOR domain
Javier Vegas-Regidor
committed
:type domain: Domain
:param var: variable name
:type var: str
:param startdate: file's startdate
:type startdate: str
:param member: file's member
:type member: int
:param chunk: file's chunk
:type chunk: int
:param grid: file's grid (only needed if it is not the original)
:type grid: str
:param frequency: file's frequency (only needed if it is different from the default)
:type frequency: str
:param vartype: Variable type (mean, statistic)
:type vartype: VariableType
:return: path to the copy created on the scratch folder
:rtype: str
"""
pass
def prepare(self):
"""
Prepares the data to be used by the diagnostic.
:return:
"""
pass
class NetCDFFile(object):
"""
Class to manage netCDF file and pr
"""
def __init__(self):
self.remote_file = None
self.local_file = None
self.domain = None
self.var = None
self.cmor_var = None
self.region = None
self.data_convention = None
@staticmethod
def from_storage(filepath):
file_object = NetCDFFile()
file_object.remote_file = filepath
return file_object
@staticmethod
def to_storage(domain, var, cmor_var, data_convention, region):
new_object = NetCDFFile()
new_object.domain = domain
new_object.var = var
new_object.cmor_var = cmor_var
new_object.region = region
new_object.frequency = None
new_object.data_convention = data_convention
return new_object
def download(self):
try:
if not self.local_file:
self.local_file = TempFile.get()
Utils.copy_file(self.remote_file, self.local_file)
Log.info('File {0} ready!', self.remote_file)
except Exception as ex:
os.remove(self.local_file)
Log.error('File {0} not available: {1}', self.remote_file, ex)
def send(self):
Utils.convert2netcdf4(self.local_file)
Javier Vegas-Regidor
committed
self._correct_metadata()
self._prepare_region()
self._rename_coordinate_variables()
Utils.move_file(self.local_file, self.remote_file)
def _prepare_region(self):
Javier Vegas-Regidor
committed
if not self.region:
return
if not os.path.exists(self.remote_file):
self._add_region_dimension_to_var()
else:
self._update_var_with_region_data()
Javier Vegas-Regidor
committed
self._correct_metadata()
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
Utils.nco.ncks(input=self.local_file, output=self.local_file, options='-O --fix_rec_dmn region')
def _update_var_with_region_data(self):
temp = TempFile.get()
shutil.copyfile(self.remote_file, temp)
Utils.nco.ncks(input=temp, output=temp, options='-O --mk_rec_dmn region')
handler = Utils.openCdf(temp)
handler_send = Utils.openCdf(self.local_file)
value = handler_send.variables[self.var][:]
var_region = handler.variables['region']
basin_index = np.where(var_region[:] == self.region)
if len(basin_index[0]) == 0:
var_region[var_region.shape[0]] = self.region
basin_index = var_region.shape[0] - 1
else:
basin_index = basin_index[0][0]
handler.variables[self.var][..., basin_index] = value
handler.close()
handler_send.close()
Utils.move_file(temp, self.local_file)
def _add_region_dimension_to_var(self):
handler = Utils.openCdf(self.local_file)
handler.createDimension('region')
var_region = handler.createVariable('region', str, 'region')
var_region[0] = self.region
original_var = handler.variables[self.var]
new_var = handler.createVariable('new_var', original_var.datatype,
original_var.dimensions + ('region',))
new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()})
value = original_var[:]
new_var[..., 0] = value
handler.close()
Utils.nco.ncks(input=self.local_file, output=self.local_file, options='-O -x -v {0}'.format(self.var))
Utils.rename_variable(self.local_file, 'new_var', self.var)
def _correct_metadata(self):
Javier Vegas-Regidor
committed
if not self.cmor_var:
return
handler = Utils.openCdf(self.local_file)
var_handler = handler.variables[self.var]
self._fix_variable_name(var_handler)
Javier Vegas-Regidor
committed
handler.modeling_realm = self.cmor_var.domain.name
table = self.cmor_var.get_table(self.frequency, self.data_convention)
handler.table_id = 'Table {0} ({1})'.format(table.name, table.date)
if self.cmor_var.units:
self._fix_units(var_handler)
handler.sync()
self._fix_coordinate_variables_metadata(handler)
var_type = var_handler.dtype
handler.close()
self._fix_values_metadata(var_type)
def _fix_variable_name(self, var_handler):
var_handler.standard_name = self.cmor_var.standard_name
var_handler.long_name = self.cmor_var.long_name
var_handler.short_name = self.cmor_var.short_name
def _fix_values_metadata(self, var_type):
if self.cmor_var.valid_min != '':
valid_min = '-a valid_min,{0},o,{1},"{2}" '.format(self.var, var_type.char, self.cmor_var.valid_min)
else:
valid_min = ''
if self.cmor_var.valid_max != '':
valid_max = '-a valid_max,{0},o,{1},"{2}" '.format(self.var, var_type.char, self.cmor_var.valid_max)
else:
valid_max = ''
Utils.nco.ncatted(input=self.local_file, output=self.local_file,
options='-O -a _FillValue,{0},o,{1},"1.e20" '
'-a missingValue,{0},o,{1},"1.e20" {2}{3}'.format(self.var, var_type.char,
valid_min, valid_max))
def _fix_coordinate_variables_metadata(self, handler):
if 'lev' in handler.variables:
handler.variables['lev'].short_name = 'lev'
if self.domain == ModelingRealms.ocean:
handler.variables['lev'].standard_name = 'depth'
if 'lon' in handler.variables:
handler.variables['lon'].short_name = 'lon'
handler.variables['lon'].standard_name = 'longitude'
if 'lat' in handler.variables:
handler.variables['lat'].short_name = 'lat'
handler.variables['lat'].standard_name = 'latitude'
EQUIVALENT_UNITS = {'-': '1.0', 'fractional': '1.0', 'psu': 'psu'}
def _fix_units(self, var_handler):
if 'units' not in var_handler.ncattrs():
return
if var_handler.units.lower() in NetCDFFile.EQUIVALENT_UNITS:
var_handler.units = NetCDFFile.EQUIVALENT_UNITS[var_handler.units.lower()]
elif var_handler.units == 'C' or self.cmor_var.units == 'K':
var_handler.units = 'deg_C'
if self.cmor_var.units != var_handler.units:
self._convert_units(var_handler)
var_handler.units = self.cmor_var.units
def _convert_units(self, var_handler):
try:
Utils.convert_units(var_handler, self.cmor_var.units)
except ValueError as ex:
Log.warning('Can not convert {3} from {0} to {1}: {2}', var_handler.units, self.cmor_var.units, ex,
self.cmor_var.short_name)
factor, offset = UnitConversion.get_conversion_factor_offset(var_handler.units,
self.cmor_var.units)
Javier Vegas-Regidor
committed
var_handler[:] = var_handler[:] * factor + offset
if 'valid_min' in var_handler.ncattrs():
var_handler.valid_min = float(var_handler.valid_min) * factor + offset
if 'valid_max' in var_handler.ncattrs():
var_handler.valid_max = float(var_handler.valid_max) * factor + offset
def _rename_coordinate_variables(self):
variables = dict()
variables['x'] = 'i'
variables['y'] = 'j'
variables['nav_lat_grid_V'] = 'lat'
variables['nav_lon_grid_V'] = 'lon'
variables['nav_lat_grid_U'] = 'lat'
variables['nav_lon_grid_U'] = 'lon'
variables['nav_lat_grid_T'] = 'lat'
variables['nav_lon_grid_T'] = 'lon'
Utils.rename_variables(self.local_file, variables, False, True)
def add_diagnostic_history(self, diagnostic):
from earthdiagnostics.earthdiags import EarthDiags
history_line = 'Diagnostic {1} calculated with EarthDiagnostics version {0}'.format(EarthDiags.version,
Javier Vegas-Regidor
committed
diagnostic)
self._add_history_line(history_line)
def add_cmorization_history(self):
from earthdiagnostics.earthdiags import EarthDiags
history_line = 'CMORized with Earthdiagnostics version {0}'.format(EarthDiags.version)
self._add_history_line(history_line)
def _add_history_line(self, history_line):
utc_datetime = 'UTC ' + datetime.utcnow().isoformat()
history_line = '{0}: {1};'.format(utc_datetime, history_line)
handler = Utils.openCdf(self.local_file)
try:
history_line = history_line + handler.history
Javier Vegas-Regidor
committed
history_line = history_line
Javier Vegas-Regidor
committed
handler.history = Utils.convert_to_ASCII_if_possible(history_line)
Javier Vegas-Regidor
committed
"""
Class to manage unit conversions
"""
_dict_conversions = None
@classmethod
def load_conversions(cls):
"""
Load conversions from the configuration file
"""
with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'conversions.csv'), 'rb') as csvfile:
reader = csv.reader(csvfile, dialect='excel')
for line in reader:
if line[0] == 'original':
continue
cls.add_conversion(UnitConversion(line[0], line[1], line[2], line[3]))
@classmethod
def add_conversion(cls, conversion):
"""
Adds a conversion to the dictionary
:param conversion: conversion to add
:type conversion: UnitConversion
"""
cls._dict_conversions[(conversion.source, conversion.destiny)] = conversion
def __init__(self, source, destiny, factor, offset):
self.source = source
self.destiny = destiny
self.factor = float(factor)
self.offset = float(offset)
@classmethod
def get_conversion_factor_offset(cls, input_units, output_units):
"""
Gets the conversion factor and offset for two units . The conversion has to be done in the following way:
converted = original * factor + offset
:param input_units: original units
:type input_units: str
:param output_units: destiny units
:type output_units: str
:return: factor and offset
:rtype: [float, float]
"""
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
units = input_units.split()
if len(units) == 1:
scale_unit = 1
unit = units[0]
else:
if '^' in units[0]:
values = units[0].split('^')
scale_unit = pow(int(values[0]), int(values[1]))
else:
scale_unit = float(units[0])
unit = units[1]
units = output_units.split()
if len(units) == 1:
scale_new_unit = 1
new_unit = units[0]
else:
if '^' in units[0]:
values = units[0].split('^')
scale_new_unit = pow(int(values[0]), int(values[1]))
else:
scale_new_unit = float(units[0])
new_unit = units[1]
factor, offset = UnitConversion._get_factor(new_unit, unit)
if factor is None:
factor = factor * scale_unit / float(scale_new_unit)
offset /= float(scale_new_unit)
return factor, offset
@classmethod
def _get_factor(cls, new_unit, unit):
# Add only the conversions with a factor greater than 1
if unit == new_unit:
return 1, 0
elif (unit, new_unit) in cls._dict_conversions:
conversion = cls._dict_conversions[(unit, new_unit)]
return conversion.factor, conversion.offset
elif (new_unit, unit) in cls._dict_conversions:
conversion = cls._dict_conversions[(new_unit, unit)]
Javier Vegas-Regidor
committed
return 1 / conversion.factor, -conversion.offset