Newer
Older
Javier Vegas-Regidor
committed
import shutil
import threading
from earthdiagnostics.variable import VariableManager
from earthdiagnostics.variable_type import VariableType
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.datafile import NetCDFFile as NCfile, StorageStatus, LocalStatus
"""
Class to manage the data repositories
Javier Vegas-Regidor
committed
:param config:
:type config: Config
Javier Vegas-Regidor
committed
def __init__(self, config):
self.config = config
self.experiment = config.experiment
self._checked_vars = list()
Javier Vegas-Regidor
committed
self.lock = threading.Lock()
def _get_file_from_storage(self, filepath):
if filepath not in self.requested_files:
self.requested_files[filepath] = NCfile.from_storage(filepath)
file_object = self.requested_files[filepath]
file_object.local_satatus = LocalStatus.PENDING
def _declare_generated_file(self, remote_file, domain, final_var, cmor_var, data_convention,
region, diagnostic, grid, var_type, original_var):
if remote_file not in self.requested_files:
self.requested_files[remote_file] = NCfile.to_storage(remote_file)
file_object = self.requested_files[remote_file]
file_object.diagnostic = diagnostic
file_object.var_type = var_type
file_object.grid = grid
file_object.data_manager = self
file_object.domain = domain
file_object.var = original_var
file_object.final_name = final_var
file_object.cmor_var = cmor_var
file_object.region = region
file_object.data_convention = data_convention
file_object.storage_status = StorageStatus.PENDING
return file_object
@staticmethod
def _get_final_var_name(box, var):
if box:
var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str()
return var
def get_varfolder(self, domain, var, grid=None):
if grid:
var = '{0}-{1}'.format(var, grid)
if domain in [ModelingRealms.ocean, ModelingRealms.seaIce]:
Javier Vegas-Regidor
committed
return '{0}_f{1}h'.format(var, self.experiment.ocean_timestep)
Javier Vegas-Regidor
committed
return '{0}_f{1}h'.format(var, self.experiment.atmos_timestep)
def create_link(self, domain, filepath, frequency, var, grid, move_old, vartype):
freq_str = frequency.folder_name(vartype)
Javier Vegas-Regidor
committed
if not grid:
grid = 'original'
Javier Vegas-Regidor
committed
variable_folder = self.get_varfolder(domain, var)
vargrid_folder = self.get_varfolder(domain, var, grid)
Javier Vegas-Regidor
committed
self.lock.acquire()
try:
if grid == 'original':
link_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, variable_folder)
if os.path.islink(link_path):
link_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, vargrid_folder)
Utils.create_folder_tree(link_path)
else:
Javier Vegas-Regidor
committed
link_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, vargrid_folder)
Utils.create_folder_tree(link_path)
default_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, variable_folder)
original_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str,
vargrid_folder.replace('-{0}_f'.format(grid), '-original_f'))
if os.path.islink(default_path):
os.remove(default_path)
elif os.path.isdir(default_path):
shutil.move(default_path, original_path)
os.symlink(link_path, default_path)
if move_old and link_path not in self._checked_vars:
self._checked_vars.append(link_path)
old_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str,
'old_{0}_f{1}h'.format(var, self.experiment.atmos_timestep))
regex = re.compile(var + '_[0-9]{6,8}\.nc')
for filename in os.listdir(link_path):
if regex.match(filename):
Utils.create_folder_tree(old_path)
Utils.move_file(os.path.join(link_path, filename),
os.path.join(old_path, filename))
link_path = os.path.join(link_path, os.path.basename(filepath))
if os.path.lexists(link_path):
os.remove(link_path)
if not os.path.exists(filepath):
raise ValueError('Original file {0} does not exists'.format(filepath))
if not os.path.isdir(os.path.dirname(link_path)):
Utils.create_folder_tree(os.path.dirname(link_path))
relative_path = os.path.relpath(filepath, os.path.dirname(link_path))
os.symlink(relative_path, link_path)
Javier Vegas-Regidor
committed
self.lock.release()
# Overridable methods (not mandatory)
def link_file(self, domain, var, cmor_var, startdate, member, chunk=None, grid=None,
frequency=None, year=None, date_str=None, move_old=False, vartype=VariableType.MEAN):
"""
Creates the link of a given file from the CMOR repository.
:param move_old:
:param date_str:
:param year: if frequency is yearly, this parameter is used to give the corresponding year
:type year: int
:param domain: CMOR domain
Javier Vegas-Regidor
committed
:type domain: Domain
:param var: variable name
:type var: str
:param startdate: file's startdate
:type startdate: str
:param member: file's member
:type member: int
:param chunk: file's chunk
:type chunk: int
:param grid: file's grid (only needed if it is not the original)
:type grid: str
:param frequency: file's frequency (only needed if it is different from the default)
:type frequency: str
:param vartype: Variable type (mean, statistic)
:type vartype: VariableType
:return: path to the copy created on the scratch folder
:rtype: str
"""
pass
def prepare(self):
"""
Prepares the data to be used by the diagnostic.
:return:
"""
pass
def request_chunk(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None, vartype=None):
"""
Copies a given file from the CMOR repository to the scratch folder and returns the path to the scratch's copy
:param domain: CMOR domain
:type domain: Domain
:param var: variable name
:type var: str
:param startdate: file's startdate
:type startdate: str
:param member: file's member
:type member: int
:param chunk: file's chunk
:type chunk: int
:param grid: file's grid (only needed if it is not the original)
:type grid: str|NoneType
:param box: file's box (only needed to retrieve sections or averages)
:type box: Box
:param frequency: file's frequency (only needed if it is different from the default)
:type frequency: Frequency|NoneType
:return: path to the copy created on the scratch folder
:param vartype: Variable type (mean, statistic)
:type vartype: VariableType
:rtype: str
"""
raise NotImplementedError('Class must override request_chunk method')
"""
Class to manage unit conversions
"""
_dict_conversions = None
@classmethod
def load_conversions(cls):
"""
Load conversions from the configuration file
"""
with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'conversions.csv'), 'rb') as csvfile:
reader = csv.reader(csvfile, dialect='excel')
for line in reader:
if line[0] == 'original':
continue
cls.add_conversion(UnitConversion(line[0], line[1], line[2], line[3]))
@classmethod
def add_conversion(cls, conversion):
"""
Adds a conversion to the dictionary
:param conversion: conversion to add
:type conversion: UnitConversion
"""
cls._dict_conversions[(conversion.source, conversion.destiny)] = conversion
def __init__(self, source, destiny, factor, offset):
self.source = source
self.destiny = destiny
self.factor = float(factor)
self.offset = float(offset)
@classmethod
def get_conversion_factor_offset(cls, input_units, output_units):
"""
Gets the conversion factor and offset for two units . The conversion has to be done in the following way:
converted = original * factor + offset
:param input_units: original units
:type input_units: str
:param output_units: destiny units
:type output_units: str
:return: factor and offset
:rtype: [float, float]
"""
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
units = input_units.split()
if len(units) == 1:
scale_unit = 1
unit = units[0]
else:
if '^' in units[0]:
values = units[0].split('^')
scale_unit = pow(int(values[0]), int(values[1]))
else:
scale_unit = float(units[0])
unit = units[1]
units = output_units.split()
if len(units) == 1:
scale_new_unit = 1
new_unit = units[0]
else:
if '^' in units[0]:
values = units[0].split('^')
scale_new_unit = pow(int(values[0]), int(values[1]))
else:
scale_new_unit = float(units[0])
new_unit = units[1]
factor, offset = UnitConversion._get_factor(new_unit, unit)
if factor is None:
factor = factor * scale_unit / float(scale_new_unit)
offset /= float(scale_new_unit)
return factor, offset
@classmethod
def _get_factor(cls, new_unit, unit):
# Add only the conversions with a factor greater than 1
if unit == new_unit:
return 1, 0
elif (unit, new_unit) in cls._dict_conversions:
conversion = cls._dict_conversions[(unit, new_unit)]
return conversion.factor, conversion.offset
elif (new_unit, unit) in cls._dict_conversions:
conversion = cls._dict_conversions[(new_unit, unit)]
Javier Vegas-Regidor
committed
return 1 / conversion.factor, -conversion.offset