Newer
Older
Javier Vegas-Regidor
committed
import shutil
import threading
from bscearth.utils.log import Log
from earthdiagnostics.variable import Variable, VariableManager
from earthdiagnostics.variable_type import VariableType
from earthdiagnostics.modelingrealm import ModelingRealms
"""
Class to manage the data repositories
Javier Vegas-Regidor
committed
:param config:
:type config: Config
Javier Vegas-Regidor
committed
def __init__(self, config):
self.config = config
self.experiment = config.experiment
self._checked_vars = list()
Javier Vegas-Regidor
committed
self.lock = threading.Lock()
def file_exists(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
vartype=VariableType.MEAN):
"""
Checks if a given file exists
:param domain: CMOR domain
:type domain: Domain
:param var: variable name
:type var: str
:param startdate: file's startdate
:type startdate: str
:param member: file's member
:type member: int
:param chunk: file's chunk
:type chunk: int
:param grid: file's grid (only needed if it is not the original)
:type grid: str
:param box: file's box (only needed to retrieve sections or averages)
:type box: Box
:param frequency: file's frequency (only needed if it is different from the default)
:type frequency: Frequency
:param vartype: Variable type (mean, statistic)
:type vartype: VariableType
:return: path to the copy created on the scratch folder
:rtype: str
"""
raise NotImplementedError()
Javier Vegas-Regidor
committed
def get_file(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
vartype=VariableType.MEAN):
"""
Copies a given file from the CMOR repository to the scratch folder and returns the path to the scratch's copy
:param domain: CMOR domain
Javier Vegas-Regidor
committed
:type domain: Domain
:param var: variable name
:type var: str
:param startdate: file's startdate
:type startdate: str
:param member: file's member
:type member: int
:param chunk: file's chunk
:type chunk: int
:param grid: file's grid (only needed if it is not the original)
:type grid: str
:param box: file's box (only needed to retrieve sections or averages)
:type box: Box
:param frequency: file's frequency (only needed if it is different from the default)
:param vartype: Variable type (mean, statistic)
:type vartype: VariableType
:return: path to the copy created on the scratch folder
:rtype: str
"""
raise NotImplementedError()
Javier Vegas-Regidor
committed
def send_file(self, filetosend, domain, var, startdate, member, chunk=None, grid=None, region=None,
box=None, rename_var=None, frequency=None, year=None, date_str=None, move_old=False,
diagnostic=None, cmorized=False, vartype=VariableType.MEAN):
"""
Copies a given file to the CMOR repository. It also automatically converts to netCDF 4 if needed and can merge
with already existing ones as needed
Javier Vegas-Regidor
committed
:param move_old: if true, moves files following older conventions that may be found on the links folder
:type move_old: bool
:param date_str: exact date_str to use in the cmorized file
:type: str
:param year: if frequency is yearly, this parameter is used to give the corresponding year
:type year: int
:param rename_var: if exists, the given variable will be renamed to the one given by var
:type rename_var: str
:param filetosend: path to the file to send to the CMOR repository
:type filetosend: str
:param region: specifies the region represented by the file. If it is defined, the data will be appended to the
CMOR repository as a new region in the file or will overwrite if region was already present
:type region: str
:param domain: CMOR domain
Javier Vegas-Regidor
committed
:type domain: Domain
:param var: variable name
:type var: str
:param startdate: file's startdate
:type startdate: str
:param member: file's member
:type member: int
:param chunk: file's chunk
:type chunk: int
:param grid: file's grid (only needed if it is not the original)
:type grid: str
:param box: file's box (only needed to retrieve sections or averages)
:type box: Box
:param frequency: file's frequency (only needed if it is different from the default)
:type frequency: Frequency
Javier Vegas-Regidor
committed
:param diagnostic: diagnostic used to generate the file
:type diagnostic: Diagnostic
:param cmorized: flag to indicate if file was generated in cmorization process
:type cmorized: bool
:param vartype: Variable type (mean, statistic)
:type vartype: VariableType
raise NotImplementedError()
def get_year(self, domain, var, startdate, member, year, grid=None, box=None):
"""
Ge a file containing all the data for one year for one variable
:param domain: variable's domain
Javier Vegas-Regidor
committed
:type domain: Domain
:param var: variable's name
:type var: str
:param startdate: startdate to retrieve
:type startdate: str
:param member: member to retrieve
:type member: int
:param year: year to retrieve
:type year: int
:param grid: variable's grid
:type grid: str
:param box: variable's box
:type box: Box
:return:
"""
raise NotImplementedError()
@staticmethod
def _get_final_var_name(box, var):
if box:
var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str()
return var
def get_varfolder(self, domain, var, grid=None):
if grid:
var = '{0}-{1}'.format(var, grid)
if domain in [ModelingRealms.ocean, ModelingRealms.seaIce]:
Javier Vegas-Regidor
committed
return '{0}_f{1}h'.format(var, self.experiment.ocean_timestep)
Javier Vegas-Regidor
committed
return '{0}_f{1}h'.format(var, self.experiment.atmos_timestep)
Javier Vegas-Regidor
committed
def _create_link(self, domain, filepath, frequency, var, grid, move_old, vartype):
freq_str = frequency.folder_name(vartype)
Javier Vegas-Regidor
committed
if not grid:
grid = 'original'
Javier Vegas-Regidor
committed
variable_folder = self.get_varfolder(domain, var)
vargrid_folder = self.get_varfolder(domain, var, grid)
Javier Vegas-Regidor
committed
self.lock.acquire()
try:
if grid == 'original':
link_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, variable_folder)
if os.path.islink(link_path):
link_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, vargrid_folder)
Utils.create_folder_tree(link_path)
else:
Javier Vegas-Regidor
committed
link_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, vargrid_folder)
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
Utils.create_folder_tree(link_path)
default_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, variable_folder)
original_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str,
vargrid_folder.replace('-{0}_f'.format(grid), '-original_f'))
if os.path.islink(default_path):
os.remove(default_path)
elif os.path.isdir(default_path):
shutil.move(default_path, original_path)
os.symlink(link_path, default_path)
if move_old and link_path not in self._checked_vars:
self._checked_vars.append(link_path)
old_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str,
'old_{0}_f{1}h'.format(var, self.experiment.atmos_timestep))
regex = re.compile(var + '_[0-9]{6,8}\.nc')
for filename in os.listdir(link_path):
if regex.match(filename):
Utils.create_folder_tree(old_path)
Utils.move_file(os.path.join(link_path, filename),
os.path.join(old_path, filename))
link_path = os.path.join(link_path, os.path.basename(filepath))
if os.path.lexists(link_path):
os.remove(link_path)
if not os.path.exists(filepath):
raise ValueError('Original file {0} does not exists'.format(filepath))
if not os.path.isdir(os.path.dirname(link_path)):
Utils.create_folder_tree(os.path.dirname(link_path))
relative_path = os.path.relpath(filepath, os.path.dirname(link_path))
os.symlink(relative_path, link_path)
except:
raise
finally:
Javier Vegas-Regidor
committed
self.lock.release()
# Overridable methods (not mandatory)
def link_file(self, domain, var, cmor_var, startdate, member, chunk=None, grid=None,
frequency=None, year=None, date_str=None, move_old=False, vartype=VariableType.MEAN):
"""
Creates the link of a given file from the CMOR repository.
:param move_old:
:param date_str:
:param year: if frequency is yearly, this parameter is used to give the corresponding year
:type year: int
:param domain: CMOR domain
Javier Vegas-Regidor
committed
:type domain: Domain
:param var: variable name
:type var: str
:param startdate: file's startdate
:type startdate: str
:param member: file's member
:type member: int
:param chunk: file's chunk
:type chunk: int
:param grid: file's grid (only needed if it is not the original)
:type grid: str
:param frequency: file's frequency (only needed if it is different from the default)
:type frequency: str
:param vartype: Variable type (mean, statistic)
:type vartype: VariableType
:return: path to the copy created on the scratch folder
:rtype: str
"""
pass
def prepare(self):
"""
Prepares the data to be used by the diagnostic.
:return:
"""
pass
class NetCDFFile(object):
"""
Class to manage netCDF file and pr
:param remote_file:
:type remote_file: str
:param local_file:
:type local_file: str
:param domain:
Javier Vegas-Regidor
committed
:type domain: Domain
:param var:
:type var: str
:param cmor_var:
:type cmor_var: Variable
"""
def __init__(self, remote_file, local_file, domain, var, cmor_var, data_convention, region):
Javier Vegas-Regidor
committed
self.local_file = local_file
self.domain = domain
self.var = var
self.cmor_var = cmor_var
self.data_convention = data_convention
def send(self):
Utils.convert2netcdf4(self.local_file)
Javier Vegas-Regidor
committed
self._correct_metadata()
self._prepare_region()
self._rename_coordinate_variables()
Utils.move_file(self.local_file, self.remote_file)
Utils.give_group_write_permissions(self.remote_file)
Javier Vegas-Regidor
committed
if not self.region:
return
if not os.path.exists(self.remote_file):
self._add_region_dimension_to_var()
else:
self._update_var_with_region_data()
Javier Vegas-Regidor
committed
self._correct_metadata()
Utils.nco.ncks(input=self.local_file, output=self.local_file, options=('--fix_rec_dmn region',))
def _update_var_with_region_data(self):
temp = TempFile.get()
shutil.copyfile(self.remote_file, temp)
Utils.nco.ncks(input=temp, output=temp, options=('--mk_rec_dmn region',))
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
handler = Utils.openCdf(temp)
handler_send = Utils.openCdf(self.local_file)
value = handler_send.variables[self.var][:]
var_region = handler.variables['region']
basin_index = np.where(var_region[:] == self.region)
if len(basin_index[0]) == 0:
var_region[var_region.shape[0]] = self.region
basin_index = var_region.shape[0] - 1
else:
basin_index = basin_index[0][0]
handler.variables[self.var][..., basin_index] = value
handler.close()
handler_send.close()
Utils.move_file(temp, self.local_file)
def _add_region_dimension_to_var(self):
handler = Utils.openCdf(self.local_file)
handler.createDimension('region')
var_region = handler.createVariable('region', str, 'region')
var_region[0] = self.region
original_var = handler.variables[self.var]
new_var = handler.createVariable('new_var', original_var.datatype,
original_var.dimensions + ('region',))
new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()})
value = original_var[:]
new_var[..., 0] = value
handler.close()
Utils.nco.ncks(input=self.local_file, output=self.local_file, options=('-x -v {0}'.format(self.var),))
Utils.rename_variable(self.local_file, 'new_var', self.var)
def _correct_metadata(self):
Javier Vegas-Regidor
committed
if not self.cmor_var:
return
handler = Utils.openCdf(self.local_file)
var_handler = handler.variables[self.var]
self._fix_variable_name(var_handler)
Javier Vegas-Regidor
committed
handler.modeling_realm = self.cmor_var.domain.name
table = self.cmor_var.get_table(self.frequency, self.data_convention)
handler.table_id = 'Table {0} ({1})'.format(table.name, table.date)
if self.cmor_var.units:
self._fix_units(var_handler)
handler.sync()
self._fix_coordinate_variables_metadata(handler)
var_type = var_handler.dtype
handler.close()
self._fix_values_metadata(var_type)
def _fix_variable_name(self, var_handler):
var_handler.standard_name = self.cmor_var.standard_name
var_handler.long_name = self.cmor_var.long_name
var_handler.short_name = self.cmor_var.short_name
def _fix_values_metadata(self, var_type):
options = ['-a _FillValue,{0},o,{1},"1.e20"'.format(self.var, var_type.char),
'-a missingValue,{0},o,{1},"1.e20"'.format(self.var, var_type.char)]
options.append('-a valid_min,{0},o,{1},"{2}" '.format(self.var, var_type.char, self.cmor_var.valid_min))
options.append('-a valid_max,{0},o,{1},"{2}" '.format(self.var, var_type.char, self.cmor_var.valid_max))
Utils.nco.ncatted(input=self.local_file, output=self.local_file, options=options)
def _fix_coordinate_variables_metadata(self, handler):
if 'lev' in handler.variables:
handler.variables['lev'].short_name = 'lev'
if self.domain == ModelingRealms.ocean:
handler.variables['lev'].standard_name = 'depth'
if 'lon' in handler.variables:
handler.variables['lon'].short_name = 'lon'
handler.variables['lon'].standard_name = 'longitude'
if 'lat' in handler.variables:
handler.variables['lat'].short_name = 'lat'
handler.variables['lat'].standard_name = 'latitude'
EQUIVALENT_UNITS = {'-': '1.0', 'fractional': '1.0', 'psu': 'psu'}
def _fix_units(self, var_handler):
if 'units' not in var_handler.ncattrs():
return
if var_handler.units.lower() in NetCDFFile.EQUIVALENT_UNITS:
var_handler.units = NetCDFFile.EQUIVALENT_UNITS[var_handler.units.lower()]
elif var_handler.units == 'C' or self.cmor_var.units == 'K':
var_handler.units = 'deg_C'
if self.cmor_var.units != var_handler.units:
self._convert_units(var_handler)
var_handler.units = self.cmor_var.units
def _convert_units(self, var_handler):
try:
Utils.convert_units(var_handler, self.cmor_var.units)
except ValueError as ex:
Log.warning('Can not convert {3} from {0} to {1}: {2}', var_handler.units, self.cmor_var.units, ex,
self.cmor_var.short_name)
factor, offset = UnitConversion.get_conversion_factor_offset(var_handler.units,
self.cmor_var.units)
Javier Vegas-Regidor
committed
var_handler[:] = var_handler[:] * factor + offset
if 'valid_min' in var_handler.ncattrs():
var_handler.valid_min = float(var_handler.valid_min) * factor + offset
if 'valid_max' in var_handler.ncattrs():
var_handler.valid_max = float(var_handler.valid_max) * factor + offset
def _rename_coordinate_variables(self):
variables = dict()
variables['x'] = 'i'
variables['y'] = 'j'
variables['nav_lat_grid_V'] = 'lat'
variables['nav_lon_grid_V'] = 'lon'
variables['nav_lat_grid_U'] = 'lat'
variables['nav_lon_grid_U'] = 'lon'
variables['nav_lat_grid_T'] = 'lat'
variables['nav_lon_grid_T'] = 'lon'
Utils.rename_variables(self.local_file, variables, False, True)
def add_diagnostic_history(self, diagnostic):
from earthdiagnostics.earthdiags import EarthDiags
history_line = 'Diagnostic {1} calculated with EarthDiagnostics version {0}'.format(EarthDiags.version,
Javier Vegas-Regidor
committed
diagnostic)
self._add_history_line(history_line)
def add_cmorization_history(self):
from earthdiagnostics.earthdiags import EarthDiags
history_line = 'CMORized with Earthdiagnostics version {0}'.format(EarthDiags.version)
self._add_history_line(history_line)
def _add_history_line(self, history_line):
utc_datetime = 'UTC ' + datetime.utcnow().isoformat()
history_line = '{0}: {1};'.format(utc_datetime, history_line)
handler = Utils.openCdf(self.local_file)
try:
history_line = history_line + handler.history
Javier Vegas-Regidor
committed
history_line = history_line
Javier Vegas-Regidor
committed
handler.history = Utils.convert_to_ASCII_if_possible(history_line)
Javier Vegas-Regidor
committed
"""
Class to manage unit conversions
"""
_dict_conversions = None
@classmethod
def load_conversions(cls):
"""
Load conversions from the configuration file
"""
with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'conversions.csv'), 'rb') as csvfile:
reader = csv.reader(csvfile, dialect='excel')
for line in reader:
if line[0] == 'original':
continue
cls.add_conversion(UnitConversion(line[0], line[1], line[2], line[3]))
@classmethod
def add_conversion(cls, conversion):
"""
Adds a conversion to the dictionary
:param conversion: conversion to add
:type conversion: UnitConversion
"""
cls._dict_conversions[(conversion.source, conversion.destiny)] = conversion
def __init__(self, source, destiny, factor, offset):
self.source = source
self.destiny = destiny
self.factor = float(factor)
self.offset = float(offset)
@classmethod
def get_conversion_factor_offset(cls, input_units, output_units):
"""
Gets the conversion factor and offset for two units . The conversion has to be done in the following way:
converted = original * factor + offset
:param input_units: original units
:type input_units: str
:param output_units: destiny units
:type output_units: str
:return: factor and offset
:rtype: [float, float]
"""
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
units = input_units.split()
if len(units) == 1:
scale_unit = 1
unit = units[0]
else:
if '^' in units[0]:
values = units[0].split('^')
scale_unit = pow(int(values[0]), int(values[1]))
else:
scale_unit = float(units[0])
unit = units[1]
units = output_units.split()
if len(units) == 1:
scale_new_unit = 1
new_unit = units[0]
else:
if '^' in units[0]:
values = units[0].split('^')
scale_new_unit = pow(int(values[0]), int(values[1]))
else:
scale_new_unit = float(units[0])
new_unit = units[1]
factor, offset = UnitConversion._get_factor(new_unit, unit)
if factor is None:
factor = factor * scale_unit / float(scale_new_unit)
offset /= float(scale_new_unit)
return factor, offset
@classmethod
def _get_factor(cls, new_unit, unit):
# Add only the conversions with a factor greater than 1
if unit == new_unit:
return 1, 0
elif (unit, new_unit) in cls._dict_conversions:
conversion = cls._dict_conversions[(unit, new_unit)]
return conversion.factor, conversion.offset
elif (new_unit, unit) in cls._dict_conversions:
conversion = cls._dict_conversions[(new_unit, unit)]
Javier Vegas-Regidor
committed
return 1 / conversion.factor, -conversion.offset