Newer
Older
"""Base data manager for Earth diagnostics"""
Javier Vegas-Regidor
committed
import threading
Javier Vegas-Regidor
committed
from earthdiagnostics.datafile import NetCDFFile as NCfile, StorageStatus, LocalStatus
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.variable_type import VariableType
"""
Class to manage the data repositories
Parameters
----------
config: Config
Javier Vegas-Regidor
committed
def __init__(self, config):
    """
    Store the configuration and create the manager's bookkeeping state

    Parameters
    ----------
    config: Config
        Must expose the ``experiment`` and ``var_manager`` attributes
    """
    self.config = config
    self.experiment = config.experiment
    self._checked_vars = []
    self.variable_list = config.var_manager
    # Registry of file objects keyed by path. The request/declare helpers
    # test membership in it, so it must exist before any of them is called.
    self.requested_files = {}
    self.lock = threading.Lock()
def _get_file_from_storage(self, filepath):
    """
    Get the file object registered for a path, creating it on first request

    Parameters
    ----------
    filepath: str
        Key used in ``requested_files`` and passed to ``NCfile.from_storage``

    Returns
    -------
    The file object registered for ``filepath``, with its local status set
    to ``LocalStatus.PENDING``
    """
    if filepath not in self.requested_files:
        self.requested_files[filepath] = NCfile.from_storage(filepath, self.config.data_convention)
    file_object = self.requested_files[filepath]
    # The attribute was previously misspelled ('local_satatus'), so the
    # pending status was written to a stray attribute instead of this one.
    file_object.local_status = LocalStatus.PENDING
    return file_object
def _declare_generated_file(self, remote_file, domain, final_var, cmor_var, data_convention,
                            region, diagnostic, grid, var_type, original_var):
    """
    Register a file that will be generated and attach its description to it

    The file object is created with ``NCfile.to_storage`` the first time
    ``remote_file`` is declared and reused afterwards.

    Parameters
    ----------
    remote_file:
        Key used in ``requested_files`` and passed to ``NCfile.to_storage``
    domain:
        Stored as the file's ``domain``
    final_var:
        Stored as the file's ``final_name``
    cmor_var:
        Stored as the file's ``cmor_var``
    data_convention:
        Passed to ``NCfile.to_storage`` when the file object is created
    region:
        Stored as the file's ``region``
    diagnostic:
        Stored as the file's ``diagnostic``
    grid:
        Stored as the file's ``grid``
    var_type:
        Stored as the file's ``var_type``
    original_var:
        Stored as the file's ``var``

    Returns
    -------
    The registered file object, with its storage status set to
    ``StorageStatus.PENDING``
    """
    if remote_file not in self.requested_files:
        self.requested_files[remote_file] = NCfile.to_storage(remote_file, data_convention)
    generated = self.requested_files[remote_file]

    # Kept as an ordered sequence so attributes are assigned in a fixed order
    description = (
        ('diagnostic', diagnostic),
        ('var_type', var_type),
        ('grid', grid),
        ('data_manager', self),
        ('domain', domain),
        ('var', original_var),
        ('final_name', final_var),
        ('cmor_var', cmor_var),
        ('region', region),
    )
    for attribute, value in description:
        setattr(generated, attribute, value)

    # The status is assigned last, once the description is complete
    generated.storage_status = StorageStatus.PENDING
    return generated
@staticmethod
def _get_final_var_name(box, var):
    """
    Get the variable name with the box description appended, if there is a box

    Parameters
    ----------
    box: Box or None
        When truthy, its longitude, latitude and depth strings are appended
    var: str

    Returns
    -------
    str
    """
    if not box:
        return var
    box_suffix = box.get_lon_str() + box.get_lat_str() + box.get_depth_str()
    return var + box_suffix
def get_varfolder(self, domain, var, grid=None, frequency=None):
    """
    Get variable folder name for <frequency>_<var_type> folder

    Parameters
    ----------
    domain: ModelingRealm
        Ocean, sea ice and ocean biogeochemistry realms use the experiment's
        ocean timestep; any other realm uses the atmospheric one
    var: str
    grid: str or None, optional
        When given, appended to the variable name after a '-'
    frequency: Frequency or None, optional

    Returns
    -------
    str
    """
    folder = '{0}-{1}'.format(var, grid) if grid else var

    ocean_like_realms = (ModelingRealms.ocean, ModelingRealms.seaIce, ModelingRealms.ocnBgchem)
    if domain in ocean_like_realms:
        timestep = self.experiment.ocean_timestep
    else:
        timestep = self.experiment.atmos_timestep
    return DataManager._apply_fxh(folder, timestep, frequency)
@staticmethod
def _apply_fxh(folder_name, timestep, frequency=None):
    """
    Append the '_f<timestep>h' suffix to a folder name when it applies

    Parameters
    ----------
    folder_name: str
    timestep: int
    frequency: Frequency or None, optional
        When its ``frequency`` string ends with 'hr', no suffix is added

    Returns
    -------
    str
    """
    # Frequencies expressed in hours keep the plain folder name
    if frequency is not None and frequency.frequency.endswith('hr'):
        return folder_name
    if timestep > 0:
        return '{0}_f{1}h'.format(folder_name, timestep)
    return folder_name
Javier Vegas-Regidor
committed
def create_link(self, domain, filepath, frequency, var, grid, move_old, vartype):
    """
    Create the link for a file

    Must be implemented by the derived classes. If not, this method will
    have no effect.

    Parameters
    ----------
    domain: ModelingRealm
    filepath: str
    frequency: Frequency
    var: str
    grid: str
    move_old: bool
    vartype: VariableType
    """
def link_file(self, domain, var, cmor_var, startdate, member, chunk=None, grid=None,
              frequency=None, year=None, date_str=None, move_old=False, vartype=VariableType.MEAN):
    """
    Create the link of a given file from the CMOR repository.

    This base implementation does nothing.

    Parameters
    ----------
    domain: ModelingRealm
        CMOR domain
    var: str
        Variable name
    cmor_var:
    startdate: str
        File's startdate
    member: int
        File's member
    chunk: int or None, optional
        File's chunk
    grid: str or None, optional
        File's grid (only needed if it is not the original)
    frequency: str or None, optional
        File's frequency (only needed if it is different from the default)
    year: int or None, optional
        If frequency is yearly, this parameter is used to give the
        corresponding year
    date_str: optional
    move_old: bool, optional
    vartype: VariableType, optional
        Variable type (mean, statistic)

    Returns
    -------
    None
        NOTE(review): earlier documentation described the return value as
        the path to the copy created on the scratch folder; presumably that
        applies to overriding classes -- confirm.
    """
    return None
"""Prepare the data to be used by Earth Diagnostics"""
def request_chunk(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None, vartype=None):
    """
    Request a given file from the CMOR repository to the scratch folder

    Derived classes are expected to return the path to the scratch's copy;
    this base implementation always raises.

    Parameters
    ----------
    domain: ModelingRealm
    var: str
    startdate: str
    member: int
    chunk: int
    grid: str or None, optional
    box: Box or None, optional
    frequency: Frequency or None, optional
    vartype: VariableType or None, optional

    Raises
    ------
    NotImplementedError
        If not implemented by derived classes
    """
    message = 'Class must override request_chunk method'
    raise NotImplementedError(message)
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
def request_year(self, diagnostic, domain, var, startdate, member, year, grid=None, box=None, frequency=None):
    """
    Request a given year for a variable from a CMOR repository

    Parameters
    ----------
    diagnostic: Diagnostic
    domain: ModelingRealm
    var: str
    startdate: str
    member: int
    year: int
    grid: str or None, optional
    box: Box or None, optional
    frequency: Frequency or None, optional

    Returns
    -------
    DataFile

    Raises
    ------
    NotImplementedError
        If not implemented by derived classes
    """
    message = 'Class must override request_year method'
    raise NotImplementedError(message)
def declare_chunk(self, domain, var, startdate, member, chunk, grid=None, region=None, box=None, frequency=None,
                  vartype=VariableType.MEAN, diagnostic=None):
    """
    Declare a variable chunk to be generated by a diagnostic

    Parameters
    ----------
    domain: ModelingRealm
    var: str
    startdate: str
    member: int
    chunk: int
    grid: str or None, optional
    region: Basin or None, optional
    box: Box or None, optional
    frequency: Frequency or None, optional
    vartype: VariableType, optional
    diagnostic: Diagnostic, optional

    Returns
    -------
    DataFile

    Raises
    ------
    NotImplementedError
        If not implemented by derived classes
    """
    message = 'Class must override declare_chunk method'
    raise NotImplementedError(message)
def declare_year(self, domain, var, startdate, member, year, grid=None, box=None,
                 vartype=VariableType.MEAN, diagnostic=None):
    """
    Declare a variable year to be generated by a diagnostic

    Parameters
    ----------
    domain: ModelingRealm
    var: str
    startdate: str
    member: int
    year: int
    grid: str or None, optional
    box: Box or None, optional
    vartype: VariableType, optional
    diagnostic: Diagnostic, optional

    Returns
    -------
    DataFile

    Raises
    ------
    NotImplementedError
        If not implemented by derived classes
    """
    message = 'Class must override declare_year method'
    raise NotImplementedError(message)
def file_exists(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
                vartype=VariableType.MEAN, possible_versions=None):
    """
    Check if a file exists in the storage

    Parameters
    ----------
    domain: ModelingRealm
    var: str
    startdate: str
    member: int
    chunk: int
    grid: str or None, optional
    box: Box or None, optional
    frequency: Frequency or None, optional
    vartype: VariableType, optional
    possible_versions: iterable of str or None, optional

    Returns
    -------
    bool

    Raises
    ------
    NotImplementedError
        If not implemented by derived classes
    """
    message = 'Class must override file_exists method'
    raise NotImplementedError(message)
"""
Class to manage unit conversions
Parameters
----------
source: str
destiny: str
factor: float
offset: float
def __init__(self, source, destiny, factor, offset):
    """
    Store the definition of a conversion between two units

    Parameters
    ----------
    source: str
    destiny: str
    factor: float
        Converted with float(), so numeric strings are accepted
    offset: float
        Converted with float(), so numeric strings are accepted
    """
    self.source, self.destiny = source, destiny
    self.factor = float(factor)
    self.offset = float(offset)
"""Load conversions from the configuration file"""
# NOTE(review): the def line of this body is not visible here; the use of `cls` below suggests a classmethod -- confirm
# conversions.csv is looked up in the directory that contains this module
with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'conversions.csv'), 'r') as csvfile:
reader = csv.reader(csvfile, dialect='excel')
for line in reader:
# Rows whose first column is 'original' are skipped (presumably the header row)
if line[0] == 'original':
continue
# The first four columns are used as source, destiny, factor and offset
cls.add_conversion(UnitConversion(line[0], line[1], line[2], line[3]))
@classmethod
def add_conversion(cls, conversion):
    """
    Add a conversion to the dictionary

    An entry already stored for the same (source, destiny) pair is replaced.

    Parameters
    ----------
    conversion: UnitConversion
        Conversion to add
    """
    units_pair = (conversion.source, conversion.destiny)
    cls._dict_conversions[units_pair] = conversion
@classmethod
def get_conversion_factor_offset(cls, input_units, output_units):
    """
    Get the conversion factor and offset for two units.

    The conversion has to be done in the following way:
        converted = original * factor + offset

    Each units string is either a bare unit name ('m') or a scale followed
    by the unit name. The scale is a number ('1000 m') or an integer power
    written with '^' ('10^3 m').

    Parameters
    ----------
    input_units: str
        Original units
    output_units: str
        Destiny units

    Returns
    -------
    tuple
        (factor, offset) as floats, or (None, None) when the lookup finds no
        conversion between the two unit names
    """
    def split_scale(units_text):
        """Split a units string into its (scale, unit name) parts"""
        tokens = units_text.split()
        if len(tokens) == 1:
            return 1, tokens[0]
        if '^' in tokens[0]:
            base, exponent = tokens[0].split('^')[:2]
            scale = pow(int(base), int(exponent))
        else:
            scale = float(tokens[0])
        return scale, tokens[1]

    scale_unit, unit = split_scale(input_units)
    scale_new_unit, new_unit = split_scale(output_units)

    found = cls._get_factor(new_unit, unit)
    factor, offset = found if found is not None else (None, None)
    if factor is None:
        # No known conversion: report it instead of doing arithmetic on None
        return None, None

    factor = factor * scale_unit / float(scale_new_unit)
    offset /= float(scale_new_unit)
    return factor, offset
@classmethod
def _get_factor(cls, new_unit, unit):
# Look up the (factor, offset) pair that converts `unit` into `new_unit`
# Add only the conversions with a factor greater than 1
if unit == new_unit:
# Same unit on both sides: identity conversion
return 1, 0
elif (unit, new_unit) in cls._dict_conversions:
# A conversion is registered for exactly this direction
conversion = cls._dict_conversions[(unit, new_unit)]
return conversion.factor, conversion.offset
elif (new_unit, unit) in cls._dict_conversions:
# Only the opposite direction is registered: derive the inverse from it
conversion = cls._dict_conversions[(new_unit, unit)]
Javier Vegas-Regidor
committed
# NOTE(review): with converted = original * factor + offset, the exact inverse offset is
# -offset / factor; -offset only matches when factor == 1 or offset == 0 -- confirm
return 1 / conversion.factor, -conversion.offset