# NOTE(review): "Newer"/"Older" here were web-viewer pagination residue from the
# scrape, not part of the source file.
import csv
import os
import shutil
from datetime import datetime

import iris
import numpy as np
from bscearth.utils.log import Log

from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.utils import Utils, TempFile
class LocalStatus(object):
    """Status of a data file in the local (scratch) filesystem."""

    PENDING = 0        # download requested but not started
    DOWNLOADING = 1    # transfer from storage in progress
    READY = 2          # usable local copy available
    FAILED = 3         # download failed
    NOT_REQUESTED = 4  # no local copy has been asked for
class StorageStatus(object):
    """Status of a data file in the remote storage."""

    PENDING = 0    # file must still be uploaded
    UPLOADING = 1  # transfer to storage in progress
    READY = 2      # file available in the storage
    FAILED = 3     # upload failed
def __init__(self):
    """Initialize the data file with empty metadata and default statuses.

    NOTE(review): this is a method of DataFile; the ``class DataFile(...)``
    statement was lost in this excerpt (the super() call and NetCDFFile's
    base class confirm the name).
    """
    super(DataFile, self).__init__()
    # Location of the file in the storage and in the scratch folder.
    self.remote_file = None
    self.local_file = None
    # Variable metadata.
    self.domain = None
    self.var = None
    self.cmor_var = None
    self.frequency = None
    self.data_convention = None
    # Status tracking; setters dispatch events to subscribers.
    self._local_status = LocalStatus.NOT_REQUESTED
    self._storage_status = StorageStatus.READY
    # Diagnostics that still have to modify this file, in order.
    self._modifiers = []
def __str__(self):
return 'Data file for {0}'.format(self.remote_file)
def unsubscribe(self, who):
    """Remove *who* from the subscribers; delegates to the base class."""
    super(DataFile, self).unsubscribe(who)
@property
def size(self):
    """Return the file size in bytes, computing it lazily on first access."""
    cached = self._size
    if cached is not None:
        return cached
    self._get_size()
    return self._size
def _get_size(self):
    """Store the size of the local copy in ``_size``; None when unavailable."""
    try:
        if self.local_status == LocalStatus.READY:
            self._size = os.path.getsize(self.local_file)
    except Exception:
        # Best effort: any failure (file vanished, permission) leaves size unset.
        self._size = None
def _clean_local(self):
if self.local_status != LocalStatus.READY or len(self.suscribers) > 0 or self.upload_required():
return
Log.debug('File {0} no longer needed. Deleting from scratch...'.format(self.remote_file))
os.remove(self.local_file)
Log.debug('File {0} deleted from scratch'.format(self.remote_file))
self.local_file = None
self.local_status = LocalStatus.PENDING
def upload_required(self):
    """Return True when a ready local copy still has to reach the storage."""
    local_ready = self.local_status == LocalStatus.READY
    storage_pending = self.storage_status == StorageStatus.PENDING
    return local_ready and storage_pending
def download_required(self):
    """Return True when the file must be fetched from the storage.

    Only files still pending locally require a download, and only when they
    are available in the storage or a diagnostic is going to modify them.
    """
    if self.local_status != LocalStatus.PENDING:
        return False
    if self.storage_status == StorageStatus.READY:
        return True
    if self.has_modifiers():
        return True
    # The original fell off the end returning None; make the False explicit.
    return False
def add_modifier(self, diagnostic):
    """Register *diagnostic* as a pending modifier of this file."""
    self._modifiers.append(diagnostic)
def has_modifiers(self):
    """Return True when at least one diagnostic still has to modify this file."""
    return len(self._modifiers) != 0
def ready_to_run(self, diagnostic):
    """Return True when *diagnostic* may run using this file.

    The file must be ready locally and *diagnostic* must be the next
    modifier in line (or there must be no pending modifiers at all).
    """
    if self.local_status != LocalStatus.READY:
        return False
    pending = self._modifiers
    if not pending:
        return True
    return pending[0] is diagnostic
@property
def local_status(self):
    """Current status of the file in the local (scratch) filesystem."""
    return self._local_status

@local_status.setter
def local_status(self, value):
    # Ignore no-op assignments so subscribers are not dispatched spuriously.
    if self._local_status == value:
        return
    # Store the new value before notifying; the scraped code dispatched
    # without ever assigning, so the status could never actually change.
    self._local_status = value
    self.dispatch(self)
@property
def storage_status(self):
    """Current status of the file in the remote storage."""
    return self._storage_status

@storage_status.setter
def storage_status(self, value):
    # Ignore no-op assignments so subscribers are not dispatched spuriously.
    if self._storage_status == value:
        return
    # Store the new value before notifying; the scraped code dispatched
    # without ever assigning, so the status could never actually change.
    self._storage_status = value
    self.dispatch(self)
@classmethod
def from_storage(cls, filepath):
    """Create a data-file object for a file already present in the storage."""
    instance = cls()
    instance.remote_file = filepath
    instance.local_status = LocalStatus.PENDING
    return instance
@classmethod
def to_storage(cls, remote_file):
    """Create a data-file object for a file that will be generated and uploaded.

    NOTE(review): the method name and first lines were lost in the scrape;
    reconstructed from the surviving assignments and the from_storage
    counterpart above — confirm against upstream.
    """
    new_object = cls()
    new_object.remote_file = remote_file
    new_object.storage_status = StorageStatus.PENDING
    return new_object

def download(self):
    """Download the file to the scratch folder; subclasses must implement it."""
    raise NotImplementedError('Class must implement the download method')
def prepare_to_upload(self, rename_var):
    """Get the local file ready for upload: convert, rename and fix metadata.

    :param rename_var: name the variable currently has inside the file;
                       when falsy, self.var is assumed
    """
    # Normalise format first so later NCO/netCDF4 operations behave.
    Utils.convert2netcdf4(self.local_file)
    if rename_var:
        original_name = rename_var
    else:
        original_name = self.var
    if self.final_name != original_name:
        Utils.rename_variable(self.local_file, original_name, self.final_name)
    self._correct_metadata()
    self._prepare_region()
    # NOTE(review): region files are uploaded straight away once prepared —
    # the scrape is ambiguous here; confirm against upstream.
    if self.region is not None:
        self.upload()
def upload(self):
    """Copy the local file to the storage and create its convention link.

    Sets storage_status to READY on success and FAILED on error; link
    creation failures are only logged, not fatal.
    """
    self.storage_status = StorageStatus.UPLOADING
    try:
        # NOTE(review): the try line was lost in the scrape (replaced by VCS
        # banner residue); reconstructed from the except that follows.
        Utils.copy_file(self.local_file, self.remote_file, save_hash=True)
    except Exception as ex:
        Log.error('File {0} can not be uploaded: {1}', self.remote_file, ex)
        self.storage_status = StorageStatus.FAILED
        return
    # Mark the storage copy usable; without this the status would remain
    # UPLOADING forever (the assignment was missing from the scrape).
    self.storage_status = StorageStatus.READY
    Log.info('File {0} uploaded!', self.remote_file)
    try:
        self.create_link()
    except Exception as ex:
        Log.warning('Link for file {0} can not be created: {1}', self.remote_file, ex)
def set_local_file(self, local_file, diagnostic=None, rename_var=''):
    """Register *local_file* as the scratch copy of this data file.

    :param local_file: path of the freshly produced file
    :param diagnostic: diagnostic that generated it; removed from the
                       pending modifiers when present
    :param rename_var: name the variable currently has inside the file
    """
    if diagnostic in self._modifiers:
        self._modifiers.remove(diagnostic)
    # Store the new path before preparing the upload; the scraped code never
    # assigned the parameter, leaving the old local_file in use.
    self.local_file = local_file
    self.prepare_to_upload(rename_var)
    self.local_status = LocalStatus.READY
def _correct_metadata(self):
    """Fix variable and global attributes so the file follows the convention.

    NOTE(review): the def line was missing from the scrape; the name is taken
    from the calls in prepare_to_upload and _prepare_region.
    """
    handler = Utils.openCdf(self.local_file)
    var_handler = handler.variables[self.final_name]
    # Only list coordinates that actually exist in the file.
    coords = set.intersection({'time', 'lev', 'lat', 'lon'}, set(handler.variables.keys()))
    var_handler.coordinates = ' '.join(coords)
    if not self.cmor_var:
        # Without CMOR metadata there is nothing more to fix.
        handler.close()
        return
    self._fix_variable_name(var_handler)
    handler.modeling_realm = self.cmor_var.domain.name
    table = self.cmor_var.get_table(self.frequency, self.data_convention)
    handler.table_id = 'Table {0} ({1})'.format(table.name, table.date)
    if self.cmor_var.units:
        self._fix_units(var_handler)
    handler.sync()
    self._fix_coordinate_variables_metadata(handler)
    var_type = var_handler.dtype
    handler.close()
    self._fix_values_metadata(var_type)
def _fix_variable_name(self, var_handler):
var_handler.standard_name = self.cmor_var.standard_name
var_handler.long_name = self.cmor_var.long_name
def _fix_values_metadata(self, var_type, file_path=None):
    """Set fill/missing values and the CMOR valid range on the variable.

    :param var_type: numpy dtype of the variable (its char is used by ncatted)
    :param file_path: file to fix; defaults to the local file
    """
    if file_path is None:
        file_path = self.local_file
    valid_min = ''
    valid_max = ''
    if self.cmor_var is not None:
        if self.cmor_var.valid_min != '':
            valid_min = '-a valid_min,{0},o,{1},"{2}" '.format(self.final_name, var_type.char,
                                                               self.cmor_var.valid_min)
        if self.cmor_var.valid_max != '':
            valid_max = '-a valid_max,{0},o,{1},"{2}" '.format(self.final_name, var_type.char,
                                                               self.cmor_var.valid_max)
    # NOTE(review): the ncatted call was truncated in the scrape (viewer line
    # numbers replaced the tail); the closing arguments are reconstructed so
    # the options string applies the valid_min/valid_max built above.
    Utils.nco.ncatted(input=file_path, output=file_path,
                      options=('-O -a _FillValue,{0},o,{1},"1.e20" '
                               '-a missingValue,{0},o,{1},"1.e20" {2}{3}'.format(self.final_name, var_type.char,
                                                                                valid_min, valid_max),))
def _fix_coordinate_variables_metadata(self, handler):
    """Give the lev/lon/lat coordinate variables their conventional names."""
    variables = handler.variables
    if 'lev' in variables:
        variables['lev'].short_name = 'lev'
        # Only the ocean realm uses depth levels.
        if self.domain == ModelingRealms.ocean:
            variables['lev'].standard_name = 'depth'
    if 'lon' in variables:
        variables['lon'].short_name = 'lon'
        variables['lon'].standard_name = 'longitude'
    if 'lat' in variables:
        variables['lat'].short_name = 'lat'
        variables['lat'].standard_name = 'latitude'
def _fix_units(self, var_handler):
if 'units' not in var_handler.ncattrs():
return
if var_handler.units == '-':
var_handler.units = '1.0'
if var_handler.units == 'PSU':
var_handler.units = 'psu'
if var_handler.units == 'C' and self.cmor_var.units == 'K':
var_handler.units = 'deg_C'
if self.cmor_var.units != var_handler.units:
self._convert_units(var_handler)
var_handler.units = self.cmor_var.units
def _convert_units(self, var_handler):
    """Convert the variable data to the CMOR units.

    Delegates to Utils.convert_units; when that raises ValueError, falls back
    to the linear factor/offset table managed by UnitConversion.
    """
    try:
        Utils.convert_units(var_handler, self.cmor_var.units)
    except ValueError as ex:
        Log.warning('Can not convert {3} from {0} to {1}: {2}', var_handler.units, self.cmor_var.units, ex,
                    self.cmor_var.short_name)
        # Fallback: converted = original * factor + offset.
        # NOTE(review): indentation was lost in the scrape; the fallback is
        # placed inside the except so it only runs when the conversion failed.
        factor, offset = UnitConversion.get_conversion_factor_offset(var_handler.units,
                                                                     self.cmor_var.units)
        var_handler[:] = var_handler[:] * factor + offset
        # Keep the valid range consistent with the rescaled data.
        if 'valid_min' in var_handler.ncattrs():
            var_handler.valid_min = float(var_handler.valid_min) * factor + offset
        if 'valid_max' in var_handler.ncattrs():
            var_handler.valid_max = float(var_handler.valid_max) * factor + offset
def _prepare_region(self):
    """Embed the data in a 'region' dimension when this file targets a basin."""
    if not self.region:
        return
    if os.path.exists(self.remote_file):
        # A multi-region file already exists in storage: merge into it.
        self._update_var_with_region_data()
    else:
        self._add_region_dimension_to_var()
    self._correct_metadata()
    Utils.nco.ncks(input=self.local_file, output=self.local_file, options=['--fix_rec_dmn region'])
def _update_var_with_region_data(self):
    """Merge the local single-region data into the stored multi-region file.

    Copies the remote file to a temporary one, turns 'region' into its record
    dimension, writes the local values into the slice matching self.region
    (appending a new region entry when absent) and replaces the local file.
    """
    temp = TempFile.get()
    shutil.copyfile(self.remote_file, temp)
    handler = Utils.openCdf(temp)
    var_handler = handler.variables[self.final_name]
    var_type = var_handler.dtype
    handler.close()
    self._fix_values_metadata(var_type, temp)
    Utils.nco.ncks(input=temp, output=temp, options=['--mk_rec_dmn region'])
    # Reopen after ncks rewrote the file. NOTE(review): this reopen was lost
    # in the scrape — the old handler was already closed above.
    handler = Utils.openCdf(temp)
    var_handler = handler.variables[self.final_name]
    # The stored valid range no longer applies once regions are mixed in.
    if hasattr(var_handler, 'valid_min'):
        del var_handler.valid_min
    if hasattr(var_handler, 'valid_max'):
        del var_handler.valid_max
    handler.sync()
    cubes = iris.load(self.local_file)
    for cube in cubes:
        if self.final_name == cube.var_name:
            value = cube.data
            break
    if isinstance(value, np.ma.MaskedArray):
        value = np.ma.getdata(value)
    var_region = handler.variables['region']
    basin_index = np.where(var_region[:] == self.region)
    if len(basin_index[0]) == 0:
        # Region not present yet: append it at the end of the record dimension.
        var_region[var_region.shape[0]] = self.region
        basin_index = var_region.shape[0] - 1
    else:
        basin_index = basin_index[0][0]
    handler.variables[self.final_name][..., basin_index] = np.multiply(np.ones(value.shape), value)
    # Close before moving so all writes are flushed (close was lost in scrape).
    handler.close()
    Utils.move_file(temp, self.local_file)
def _add_region_dimension_to_var(self):
    """Add a size-one 'region' dimension to the variable in the local file.

    NOTE(review): the def line was missing from the scrape; the name is taken
    from the call in _prepare_region.
    """
    handler = Utils.openCdf(self.local_file)
    handler.createDimension('region')
    var_region = handler.createVariable('region', str, 'region')
    var_region[0] = self.region
    original_var = handler.variables[self.final_name]
    # Recreate the variable with an extra trailing 'region' dimension,
    # copying all its attributes and data.
    new_var = handler.createVariable('new_var', original_var.datatype,
                                     original_var.dimensions + ('region',))
    new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()})
    value = original_var[:]
    new_var[..., 0] = value
    handler.close()
    # Drop the old variable and give the new one its name.
    Utils.nco.ncks(input=self.local_file, output=self.local_file, options='-O -x -v {0}'.format(self.final_name))
    Utils.rename_variable(self.local_file, 'new_var', self.final_name)
def _rename_coordinate_variables(self):
    """Rename NEMO grid coordinate variables to their conventional names."""
    variables = {
        'x': 'i',
        'y': 'j',
        'nav_lat_grid_V': 'lat',
        'nav_lon_grid_V': 'lon',
        'nav_lat_grid_U': 'lat',
        'nav_lon_grid_U': 'lon',
        'nav_lat_grid_T': 'lat',
        'nav_lon_grid_T': 'lon',
    }
    Utils.rename_variables(self.local_file, variables, False, True)
def add_diagnostic_history(self):
    """Append a record of the generating diagnostic to the history attribute."""
    if not self.diagnostic:
        return
    # Imported here to avoid a circular import at module load time.
    from earthdiagnostics.earthdiags import EarthDiags
    # NOTE(review): the second format argument was truncated in the scrape;
    # {1} can only be the diagnostic itself — confirm against upstream.
    history_line = 'Diagnostic {1} calculated with EarthDiagnostics version {0}'.format(EarthDiags.version,
                                                                                       self.diagnostic)
    self._add_history_line(history_line)
def add_cmorization_history(self):
    """Append a CMORization record to the file's history attribute."""
    # Imported here to avoid a circular import at module load time.
    from earthdiagnostics.earthdiags import EarthDiags
    line = 'CMORized with Earthdiagnostics version {0}'.format(EarthDiags.version)
    self._add_history_line(line)
def _add_history_line(self, history_line):
    """Prepend a UTC-timestamped *history_line* to the file's history attribute.

    NOTE(review): viewer line-number residue was removed between the openCdf
    call and the try block; nothing else appeared to be lost there.
    """
    utc_datetime = 'UTC ' + datetime.utcnow().isoformat()
    history_line = '{0}: {1};'.format(utc_datetime, history_line)
    handler = Utils.openCdf(self.local_file)
    try:
        # Keep whatever history the file already has after the new line.
        history_line = history_line + handler.history
    except AttributeError:
        # The file had no history attribute yet.
        history_line = history_line
    handler.history = Utils.convert_to_ASCII_if_possible(history_line)
    handler.close()
class UnitConversion(object):
    """
    Manage linear unit conversions: converted = original * factor + offset.
    """

    _dict_conversions = None

    def __init__(self, source, destiny, factor, offset):
        """
        :param source: original units
        :type source: str
        :param destiny: destination units
        :type destiny: str
        :param factor: multiplicative factor
        :param offset: additive offset
        """
        self.source = source
        self.destiny = destiny
        self.factor = float(factor)
        self.offset = float(offset)

    @classmethod
    def load_conversions(cls):
        """
        Load conversions from the configuration file
        """
        cls._dict_conversions = dict()
        csv_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'conversions.csv')
        # Text mode: the csv module requires str rows on Python 3 ('rb' in the
        # original was a Python 2 idiom and breaks under Python 3).
        with open(csv_path) as csvfile:
            reader = csv.reader(csvfile, dialect='excel')
            for line in reader:
                if line[0] == 'original':
                    continue  # skip the header row
                cls.add_conversion(UnitConversion(line[0], line[1], line[2], line[3]))

    @classmethod
    def add_conversion(cls, conversion):
        """
        Adds a conversion to the dictionary

        :param conversion: conversion to add
        :type conversion: UnitConversion
        """
        cls._dict_conversions[(conversion.source, conversion.destiny)] = conversion

    @classmethod
    def get_conversion_factor_offset(cls, input_units, output_units):
        """
        Gets the conversion factor and offset for two units. The conversion has
        to be done in the following way: converted = original * factor + offset

        :param input_units: original units
        :type input_units: str
        :param output_units: destiny units
        :type output_units: str
        :return: factor and offset ((None, None) when no conversion is known)
        :rtype: [float, float]
        """
        scale_unit, unit = cls._parse_units(input_units)
        scale_new_unit, new_unit = cls._parse_units(output_units)
        factor, offset = UnitConversion._get_factor(new_unit, unit)
        if factor is None:
            return None, None
        # Fold the numeric prefixes of both unit strings into the result.
        factor = factor * scale_unit / float(scale_new_unit)
        offset /= float(scale_new_unit)
        return factor, offset

    @staticmethod
    def _parse_units(units_str):
        """Split strings like '10^3 m' or '1000 m' into (scale, unit)."""
        parts = units_str.split()
        if len(parts) == 1:
            return 1, parts[0]
        if '^' in parts[0]:
            base, exponent = parts[0].split('^')
            return pow(int(base), int(exponent)), parts[1]
        return float(parts[0]), parts[1]

    @classmethod
    def _get_factor(cls, new_unit, unit):
        """Return (factor, offset) converting *unit* -> *new_unit*, or (None, None)."""
        if unit == new_unit:
            return 1, 0
        if (unit, new_unit) in cls._dict_conversions:
            conversion = cls._dict_conversions[(unit, new_unit)]
            return conversion.factor, conversion.offset
        if (new_unit, unit) in cls._dict_conversions:
            conversion = cls._dict_conversions[(new_unit, unit)]
            # Inverse of y = x * f + o is x = y / f - o / f. The original
            # returned -o for the offset, which is only correct when f == 1.
            return 1 / conversion.factor, -conversion.offset / conversion.factor
        return None, None
class NetCDFFile(DataFile):
    """Data file stored as NetCDF in the local storage."""

    def download(self):
        """Copy the file from the storage to a local temporary file."""
        try:
            self.local_status = LocalStatus.DOWNLOADING
            Log.debug('Downloading file {0}...', self.remote_file)
            if not self.local_file:
                self.local_file = TempFile.get()
            # NOTE(review): VCS banner residue replaced a line here; the hash
            # refresh below survives and is kept as-is.
            Utils.get_file_hash(self.remote_file, use_stored=True, save=True)
            Utils.copy_file(self.remote_file, self.local_file)
            Log.info('File {0} ready!', self.remote_file)
            self.local_status = LocalStatus.READY
        except Exception as ex:
            # Remove a partial download so a retry starts clean.
            if os.path.isfile(self.local_file):
                os.remove(self.local_file)
            Log.error('File {0} not available: {1}', self.remote_file, ex)
            self.local_status = LocalStatus.FAILED

    def create_link(self):
        """Ask the data manager to create the convention link for this file.

        NOTE(review): the def line and the arguments after final_name were
        truncated in the scrape; grid/move_old/vartype reconstructed — confirm
        against upstream before relying on them.
        """
        try:
            self.data_manager.create_link(self.domain, self.remote_file, self.frequency, self.final_name,
                                          self.grid, True, self.var_type)
        except Exception as ex:
            Log.error('Can not create link to {1}: {0}'.format(ex, self.remote_file))

    def _get_size(self):
        """Compute the size from the local copy when ready, else from storage.

        NOTE(review): the def line was missing from the scrape; reconstructed
        from the DataFile._get_size counterpart.
        """
        try:
            if self.local_status == LocalStatus.READY:
                self._size = os.path.getsize(self.local_file)
            if self.storage_status == StorageStatus.READY:
                self._size = os.path.getsize(self.remote_file)
        except Exception:
            self._size = None