# NOTE(review): removed "Newer"/"Older" — version-control viewer artifacts,
# not part of the source.
import csv
import os
import shutil
from datetime import datetime

import numpy as np

from bscearth.utils.log import Log

from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.utils import Utils, TempFile
PENDING = 0
DOWNLOADING = 1
READY = 2
FAILED = 3
NOT_REQUESTED = 4
PENDING = 0
UPLOADING = 1
READY = 2
FAILED = 3
"""
Represent a data file
Must be derived for each concrete data file format
"""
def __init__(self):
super(DataFile, self).__init__()
self.remote_file = None
self.local_file = None
self.domain = None
self.var = None
self.cmor_var = None
self.frequency = None
self.data_convention = None
self._local_status = LocalStatus.NOT_REQUESTED
self._storage_status = StorageStatus.READY
self._modifiers = []
def __str__(self):
return 'Data file for {0}'.format(self.remote_file)
@property
def size(self):
if self._size is None:
self._get_size()
return self._size
def _get_size(self):
try:
if self.local_status == LocalStatus.READY:
self._size = os.path.getsize(self.local_file)
def clean_local(self):
if self.local_status != LocalStatus.READY or len(self.suscribers) > 0 or self.upload_required() or \
self.storage_status == StorageStatus.UPLOADING:
return
Log.debug('File {0} no longer needed. Deleting from scratch...'.format(self.remote_file))
os.remove(self.local_file)
Log.debug('File {0} deleted from scratch'.format(self.remote_file))
self.local_file = None
self.local_status = LocalStatus.PENDING
def only_suscriber(self, who):
if len(self._subscribers) != 1:
return
return who in self._subscribers
def upload_required(self):
return self.local_status == LocalStatus.READY and self.storage_status == StorageStatus.PENDING
def download_required(self):
if not self.local_status == LocalStatus.PENDING:
return False
if self.storage_status == StorageStatus.READY:
return True
if self.has_modifiers():
return True
def add_modifier(self, diagnostic):
self._modifiers.append(diagnostic)
def ready_to_run(self, diagnostic):
if not self.local_status == LocalStatus.READY:
return False
if len(self._modifiers) == 0:
return True
return self._modifiers[0] is diagnostic
@property
def local_status(self):
return self._local_status
@local_status.setter
def local_status(self, value):
if self._local_status == value:
return
self.dispatch(self)
@property
def storage_status(self):
return self._storage_status
@storage_status.setter
def storage_status(self, value):
if self._storage_status == value:
return
self.dispatch(self)
def from_storage(cls, filepath, data_convention):
file_object = cls()
file_object.remote_file = filepath
file_object.local_status = LocalStatus.PENDING
file_object.data_convention = data_convention
return file_object
@classmethod
def to_storage(cls, remote_file, data_convention):
new_object.remote_file = remote_file
new_object.storage_status = StorageStatus.PENDING
new_object.data_convention = data_convention
raise NotImplementedError('Class must implement the download method')
def prepare_to_upload(self, rename_var):
if self.data_convention in ('primavera', 'cmip6'):
self.lon_name = 'longitude'
self.lat_name = 'latitude'
else:
self.lon_name = 'longitude'
self.lat_name = 'latitude'
Utils.convert2netcdf4(self.local_file)
if rename_var:
original_name = rename_var
else:
original_name = self.var
if self.final_name != original_name:
Utils.rename_variable(self.local_file, original_name, self.final_name)
self._correct_metadata()
self._prepare_region()
if self.region is not None:
self.upload()
def upload(self):
self.storage_status = StorageStatus.UPLOADING
Javier Vegas-Regidor
committed
Utils.copy_file(self.local_file, self.remote_file, save_hash=True)
Log.error('File {0} can not be uploaded: {1}', self.remote_file, ex)
self.storage_status = StorageStatus.FAILED
return
Log.info('File {0} uploaded!', self.remote_file)
def set_local_file(self, local_file, diagnostic=None, rename_var='', region=None):
if diagnostic in self._modifiers:
self._modifiers.remove(diagnostic)
if region is not None:
self.region = region.name
else:
self.region = None
self.prepare_to_upload(rename_var)
self.local_status = LocalStatus.READY
var_handler = handler.variables[self.final_name]
coords = set.intersection({'time', 'lev', self.lat_name, self.lon_name}, set(handler.variables.keys()))
var_handler.coordinates = ' '.join(coords)
if not self.cmor_var:
handler.close()
return
self._fix_variable_name(var_handler)
handler.modeling_realm = self.cmor_var.domain.name
table = self.cmor_var.get_table(self.frequency, self.data_convention)
handler.table_id = 'Table {0} ({1})'.format(table.name, table.date)
if self.cmor_var.units:
self._fix_units(var_handler)
handler.sync()
self._fix_coordinate_variables_metadata(handler)
var_type = var_handler.dtype
handler.close()
self._fix_values_metadata(var_type)
def _fix_variable_name(self, var_handler):
var_handler.standard_name = self.cmor_var.standard_name
var_handler.long_name = self.cmor_var.long_name
def _fix_values_metadata(self, var_type, file_path=None):
if file_path is None:
file_path = self.local_file
valid_min = ''
valid_max = ''
if self.cmor_var is not None:
if self.cmor_var.valid_min:
valid_min = '-a valid_min,{0},o,{1},"{2}" '.format(self.final_name, var_type.char,
self.cmor_var.valid_min)
if self.cmor_var.valid_max:
valid_max = '-a valid_max,{0},o,{1},"{2}" '.format(self.final_name, var_type.char,
self.cmor_var.valid_max)
Utils.nco.ncatted(input=file_path, output=file_path,
options=('-O -a _FillValue,{0},o,{1},"1.e20" '
'-a missingValue,{0},o,{1},"1.e20" {2}{3}'.format(self.final_name, var_type.char,
def _fix_coordinate_variables_metadata(self, handler):
if 'lev' in handler.variables:
handler.variables['lev'].short_name = 'lev'
if self.domain == ModelingRealms.ocean:
handler.variables['lev'].standard_name = 'depth'
if self.lon_name in handler.variables:
handler.variables[self.lon_name].short_name = self.lon_name
handler.variables[self.lon_name].standard_name = 'longitude'
if self.lat_name in handler.variables:
handler.variables[self.lat_name].short_name = self.lat_name
handler.variables[self.lat_name].standard_name = 'latitude'
# NOTE(review): removed a run of stray line-number artifacts (272-303) left
# over from a code-viewer copy/paste; the methods they accompanied are not
# present in this copy.
def _fix_units(self, var_handler):
if 'units' not in var_handler.ncattrs():
return
if var_handler.units == '-':
var_handler.units = '1.0'
if var_handler.units == 'PSU':
var_handler.units = 'psu'
if var_handler.units == 'C' and self.cmor_var.units == 'K':
var_handler.units = 'deg_C'
if self.cmor_var.units != var_handler.units:
self._convert_units(var_handler)
var_handler.units = self.cmor_var.units
def _convert_units(self, var_handler):
try:
Utils.convert_units(var_handler, self.cmor_var.units)
except ValueError as ex:
Log.warning('Can not convert {3} from {0} to {1}: {2}', var_handler.units, self.cmor_var.units, ex,
self.cmor_var.short_name)
factor, offset = UnitConversion.get_conversion_factor_offset(var_handler.units,
self.cmor_var.units)
var_handler[:] = var_handler[:] * factor + offset
if 'valid_min' in var_handler.ncattrs():
var_handler.valid_min = float(var_handler.valid_min) * factor + offset
if 'valid_max' in var_handler.ncattrs():
var_handler.valid_max = float(var_handler.valid_max) * factor + offset
def _prepare_region(self):
if not self.region:
return
if not os.path.exists(self.remote_file):
self._add_region_dimension_to_var()
else:
self._update_var_with_region_data()
self._correct_metadata()
Utils.nco.ncks(input=self.local_file, output=self.local_file, options=['--fix_rec_dmn region'])
regions = handler.variables['region'][...].tolist()
if len(regions) > 1:
ordered_regions = sorted(regions)
print(regions)
print(ordered_regions)
new_indexes = [regions.index(region) for region in ordered_regions]
print(new_indexes)
for var in handler.variables.values():
if 'region' not in var.dimensions:
continue
index_region = var.dimensions.index('region')
var_values = var[...]
var_ordered = np.take(var_values, new_indexes, index_region)
var[...] = var_ordered
handler.close()
def _update_var_with_region_data(self):
temp = TempFile.get()
shutil.copyfile(self.remote_file, temp)
var_handler = handler.variables[self.final_name]
var_type = var_handler.dtype
handler.close()
self._fix_values_metadata(var_type, temp)
Utils.nco.ncks(input=temp, output=temp, options=['--mk_rec_dmn region'])
var_handler = handler.variables[self.final_name]
if hasattr(var_handler, 'valid_min'):
del var_handler.valid_min
if hasattr(var_handler, 'valid_max'):
del var_handler.valid_max
handler.sync()
cubes = iris.load(self.local_file)
for cube in cubes:
if self.final_name == cube.var_name:
value = cube.data
break
if isinstance(value, np.ma.MaskedArray):
value = np.ma.getdata(value)
var_region = handler.variables['region']
basin_index = np.where(var_region[:] == self.region)
if len(basin_index[0]) == 0:
var_region[var_region.shape[0]] = self.region
basin_index = var_region.shape[0] - 1
else:
basin_index = basin_index[0][0]
handler.variables[self.final_name][..., basin_index] = np.multiply(np.ones(value.shape), value)
Utils.move_file(temp, self.local_file)
handler.createDimension('region')
var_region = handler.createVariable('region', str, 'region')
var_region[0] = self.region
original_var = handler.variables[self.final_name]
new_var = handler.createVariable('new_var', original_var.datatype,
original_var.dimensions + ('region',))
new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()})
value = original_var[:]
new_var[..., 0] = value
handler.close()
Utils.nco.ncks(input=self.local_file, output=self.local_file, options=('-x -v {0}'.format(self.final_name),))
Utils.rename_variable(self.local_file, 'new_var', self.final_name)
def _rename_coordinate_variables(self):
variables = dict()
variables['x'] = 'i'
variables['y'] = 'j'
variables['nav_lat_grid_V'] = self.lat_name
variables['nav_lon_grid_V'] = self.lon_name
variables['nav_lat_grid_U'] = self.lat_name
variables['nav_lon_grid_U'] = self.lon_name
variables['nav_lat_grid_T'] = self.lat_name
variables['nav_lon_grid_T'] = self.lon_name
Utils.rename_variables(self.local_file, variables, False, True)
def add_diagnostic_history(self):
if not self.diagnostic:
return
from earthdiagnostics.earthdiags import EarthDiags
history_line = 'Diagnostic {1} calculated with EarthDiagnostics version {0}'.format(EarthDiags.version,
self._add_history_line(history_line)
def add_cmorization_history(self):
from earthdiagnostics.earthdiags import EarthDiags
history_line = 'CMORized with Earthdiagnostics version {0}'.format(EarthDiags.version)
self._add_history_line(history_line)
def _add_history_line(self, history_line):
utc_datetime = 'UTC ' + datetime.utcnow().isoformat()
history_line = '{0}: {1};'.format(utc_datetime, history_line)
try:
history_line = history_line + handler.history
except AttributeError:
history_line = history_line
handler.history = Utils.convert_to_ascii_if_possible(history_line)
handler.close()
class UnitConversion(object):
_dict_conversions = None
@classmethod
def load_conversions(cls):
cls._dict_conversions = dict()
with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'conversions.csv'), 'rb') as csvfile:
reader = csv.reader(csvfile, dialect='excel')
for line in reader:
if line[0] == 'original':
continue
cls.add_conversion(UnitConversion(line[0], line[1], line[2], line[3]))
@classmethod
def add_conversion(cls, conversion):
"""
Add a conversion to the dictionary
Parameters
----------
conversion: UnitConversion
"""
cls._dict_conversions[(conversion.source, conversion.destiny)] = conversion
def __init__(self, source, destiny, factor, offset):
self.source = source
self.destiny = destiny
self.factor = float(factor)
self.offset = float(offset)
@classmethod
def get_conversion_factor_offset(cls, input_units, output_units):
"""
Get the conversion factor and offset for two units.
The conversion has to be done in the following way:
Parameters
----------
input_units: str
output_units = str
Returns
-------
float
Factor
float
Offset
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
"""
units = input_units.split()
if len(units) == 1:
scale_unit = 1
unit = units[0]
else:
if '^' in units[0]:
values = units[0].split('^')
scale_unit = pow(int(values[0]), int(values[1]))
else:
scale_unit = float(units[0])
unit = units[1]
units = output_units.split()
if len(units) == 1:
scale_new_unit = 1
new_unit = units[0]
else:
if '^' in units[0]:
values = units[0].split('^')
scale_new_unit = pow(int(values[0]), int(values[1]))
else:
scale_new_unit = float(units[0])
new_unit = units[1]
factor, offset = UnitConversion._get_factor(new_unit, unit)
if factor is None:
return None, None
factor = factor * scale_unit / float(scale_new_unit)
offset /= float(scale_new_unit)
return factor, offset
@classmethod
def _get_factor(cls, new_unit, unit):
# Add only the conversions with a factor greater than 1
if unit == new_unit:
return 1, 0
conversion = cls._dict_conversions[(unit, new_unit)]
return conversion.factor, conversion.offset
conversion = cls._dict_conversions[(new_unit, unit)]
return 1 / conversion.factor, -conversion.offset
class NetCDFFile(DataFile):
def download(self):
try:
self.local_status = LocalStatus.DOWNLOADING
Log.debug('Downloading file {0}...', self.remote_file)
if not self.local_file:
self.local_file = TempFile.get()
Javier Vegas-Regidor
committed
Utils.get_file_hash(self.remote_file, use_stored=True, save=True)
Utils.copy_file(self.remote_file, self.local_file)
if self.data_convention == 'meteofrance':
Log.debug('Converting variable names from meteofrance convention')
alt_coord_names = {'time_counter': 'time', 'time_counter_bounds': 'time_bnds',
'tbnds': 'bnds', 'nav_lat': 'lat', 'nav_lon': 'lon', 'x': 'i',
'y': 'j'}
Utils.rename_variables(self.local_file, alt_coord_names, must_exist=False, rename_dimension=True)
Log.info('File {0} ready!', self.remote_file)
self.local_status = LocalStatus.READY
except Exception as ex:
if os.path.isfile(self.local_file):
os.remove(self.local_file)
Log.error('File {0} not available: {1}', self.remote_file, ex)
self.local_status = LocalStatus.FAILED
self.data_manager.create_link(self.domain, self.remote_file, self.frequency, self.final_name,
Log.error('Can not create link to {1}: {0}'.format(ex, self.remote_file))
try:
if self.local_status == LocalStatus.READY:
self._size = os.path.getsize(self.local_file)
if self.storage_status == StorageStatus.READY:
self._size = os.path.getsize(self.remote_file)