# (removed non-code residue: "Newer" / "Older" pagination text from a web listing)
# coding: utf-8
import csv
import shutil
from datetime import datetime
import numpy as np
import os
from bscearth.utils.log import Log
from earthdiagnostics.utils import Utils, TempFile
from earthdiagnostics.modelingrealm import ModelingRealms
class LocalStatus(object):
    """Enumeration of the possible states of the local copy of a file."""

    PENDING = 0         # download requested but not started
    DOWNLOADING = 1     # transfer from storage in progress
    READY = 2           # available on local disk
    FAILED = 3          # download failed
    NOT_REQUESTED = 4   # nobody has asked for this file
class StorageStatus(object):
    """Enumeration of the possible states of the stored copy of a file."""

    PENDING = 0    # upload requested but not started
    UPLOADING = 1  # transfer to storage in progress
    READY = 2      # present in storage
    FAILED = 3     # upload failed
def __init__(self):
    """Initialize an empty, not-yet-requested data file."""
    super(DataFile, self).__init__()
    # Identification and location of the file.
    self.remote_file = None
    self.local_file = None
    self.domain = None
    self.var = None
    self.cmor_var = None
    self.frequency = None
    self.data_convention = None
    self.grid = None
    self.data_manager = None
    self.var_type = VariableType.MEAN
    # Status tracking (exposed through the local_status/storage_status properties).
    self._local_status = LocalStatus.NOT_REQUESTED
    self._storage_status = StorageStatus.READY
    # Diagnostics that must modify this file before it can be consumed.
    self._modifiers = []
def unsubscribe(self, who):
    """Remove *who* as a subscriber; discard the local copy when nobody is left."""
    super(DataFile, self).unsubscribe(who)
    if not self.subscribers and self.local_status == LocalStatus.READY:
        os.remove(self.local_file)
def upload_required(self):
    """Return True when the local copy is ready and storage still awaits it."""
    local_ready = self.local_status == LocalStatus.READY
    storage_waiting = self.storage_status == StorageStatus.PENDING
    return local_ready and storage_waiting
def download_required(self):
    """Return whether this file should be scheduled for download.

    A download is required only when the file is locally PENDING and either
    it is available in storage or a diagnostic still has to modify it.
    """
    if self.local_status != LocalStatus.PENDING:
        return False
    if self.storage_status == StorageStatus.READY:
        return True
    if self.has_modifiers():
        # Bug fix: was a Python 2 `print` statement; use the project logger
        # like the rest of the file does.
        Log.debug('Scheduling download of {0}: it has pending modifiers', self.remote_file)
        return True
    # Bug fix: previously fell through returning None implicitly.
    return False
def add_modifier(self, diagnostic):
    """Register *diagnostic* as one that must update this file before use."""
    self._modifiers.append(diagnostic)
def has_modifiers(self):
    """Return True if any diagnostic still has to modify this file."""
    return bool(self._modifiers)
def ready_to_run(self, diagnostic):
    """Return True when *diagnostic* may run against this file.

    The file must be locally READY and *diagnostic* must be the first
    pending modifier (or there must be none at all).
    """
    if self.local_status != LocalStatus.READY:
        return False
    if not self._modifiers:
        return True
    return self._modifiers[0] is diagnostic
@property
def local_status(self):
    """Current status of the local copy (a LocalStatus value)."""
    return self._local_status

@local_status.setter
def local_status(self, value):
    """Store the new status and notify subscribers, only on actual change."""
    if self._local_status == value:
        return
    # Bug fix: the new value was never stored before dispatching, so the
    # status could never actually change.
    self._local_status = value
    self.dispatch(self)
@property
def storage_status(self):
    """Current status of the stored copy (a StorageStatus value)."""
    return self._storage_status

@storage_status.setter
def storage_status(self, value):
    """Store the new status and notify subscribers, only on actual change."""
    if self._storage_status == value:
        return
    # Bug fix: the new value was never stored before dispatching, so the
    # status could never actually change.
    self._storage_status = value
    self.dispatch(self)
@classmethod
def from_storage(cls, filepath):
    """Build a data file that already exists in storage at *filepath*.

    The returned object is marked as pending download.
    """
    new_file = cls()
    new_file.remote_file = filepath
    new_file.local_status = LocalStatus.PENDING
    return new_file
@classmethod
def to_storage(cls, remote_file):
    """Build a data file that must be uploaded to *remote_file* in storage.

    NOTE(review): the `def` line, object creation and return statement were
    lost in the corrupted source; reconstructed by symmetry with
    from_storage — verify the method name against upstream callers.
    """
    new_object = cls()
    new_object.remote_file = remote_file
    new_object.storage_status = StorageStatus.PENDING
    return new_object
def download(self):
    """Fetch the file from its storage backend.

    Abstract: concrete subclasses (e.g. NetCDFFile, THREDDSData) override this.
    """
    raise NotImplementedError()
def prepare_to_upload(self):
    """Clean the local file up before it is uploaded to storage.

    Converts to netCDF4, fixes metadata, merges region data and renames
    coordinate variables.
    NOTE(review): the method header was lost in the corrupted source; the
    name is reconstructed — verify against upstream before relying on it.
    """
    Utils.convert2netcdf4(self.local_file)
    self._correct_metadata()
    self._prepare_region()
    self._rename_coordinate_variables()
def upload(self):
    """Copy the local file to storage, tracking the storage status.

    On success the storage status ends as READY; on failure as FAILED.
    """
    self.storage_status = StorageStatus.UPLOADING
    try:
        Utils.copy_file(self.local_file, self.remote_file)
    except Exception as ex:
        # Bug fix: a failed copy previously left the status stuck at
        # UPLOADING and raised out of the worker.
        self.storage_status = StorageStatus.FAILED
        Log.error('File {0} can not be uploaded: {1}', self.remote_file, ex)
        return
    Log.info('File {0} uploaded!', self.remote_file)
    # Bug fix: the status was never advanced past UPLOADING on success.
    self.storage_status = StorageStatus.READY
def set_local_file(self, local_file, diagnostic=None):
    """Register a freshly produced local file and mark it READY.

    :param local_file: path of the produced file on local disk
    :param diagnostic: diagnostic that produced it; removed from the pending
                       modifiers list if present
    """
    if diagnostic in self._modifiers:
        self._modifiers.remove(diagnostic)
    # Bug fix: the path passed in was never stored, so self.local_file
    # stayed stale despite the method's name.
    self.local_file = local_file
    self.local_status = LocalStatus.READY
def _correct_metadata(self):
    """Rewrite the file's metadata to match the associated CMOR variable.

    Does nothing when no CMOR variable is linked to this file.
    NOTE(review): bare numeric residue lines (source corruption) were removed
    from this span; verify nothing else was lost against upstream.
    """
    if not self.cmor_var:
        return
    handler = Utils.openCdf(self.local_file)
    var_handler = handler.variables[self.var]
    self._fix_variable_name(var_handler)
    handler.modeling_realm = self.cmor_var.domain.name
    cmor_table = self.cmor_var.get_table(self.frequency, self.data_convention)
    handler.table_id = 'Table {0} ({1})'.format(cmor_table.name, cmor_table.date)
    if self.cmor_var.units:
        self._fix_units(var_handler)
    handler.sync()
    self._fix_coordinate_variables_metadata(handler)
    var_type = var_handler.dtype
    handler.close()
    self._fix_values_metadata(var_type)
def _fix_variable_name(self, var_handler):
var_handler.standard_name = self.cmor_var.standard_name
var_handler.long_name = self.cmor_var.long_name
var_handler.short_name = self.cmor_var.short_name
def _fix_values_metadata(self, var_type):
    """Set _FillValue/missingValue (plus valid_min/valid_max when the CMOR
    variable defines them) on the local file via ncatted.

    NOTE(review): the ncatted call was split by corrupted residue in the
    source; reconstructed — verify against upstream.
    """
    def _bound_option(name, value):
        # One '-a' clause per defined bound; empty string when undefined.
        if value == '':
            return ''
        return '-a {0},{1},o,{2},"{3}" '.format(name, self.var, var_type.char, value)

    valid_min = _bound_option('valid_min', self.cmor_var.valid_min)
    valid_max = _bound_option('valid_max', self.cmor_var.valid_max)
    Utils.nco.ncatted(input=self.local_file, output=self.local_file,
                      options='-O -a _FillValue,{0},o,{1},"1.e20" '
                              '-a missingValue,{0},o,{1},"1.e20" {2}{3}'.format(self.var, var_type.char,
                                                                                valid_min, valid_max))
def _fix_coordinate_variables_metadata(self, handler):
    """Give the coordinate variables (lev/lon/lat) CMOR short/standard names."""
    coords = handler.variables
    if 'lev' in coords:
        coords['lev'].short_name = 'lev'
        # Ocean levels are depths; other realms keep their standard_name.
        if self.domain == ModelingRealms.ocean:
            coords['lev'].standard_name = 'depth'
    if 'lon' in coords:
        coords['lon'].short_name = 'lon'
        coords['lon'].standard_name = 'longitude'
    if 'lat' in coords:
        coords['lat'].short_name = 'lat'
        coords['lat'].standard_name = 'latitude'
def _fix_units(self, var_handler):
if 'units' not in var_handler.ncattrs():
return
if var_handler.units == '-':
var_handler.units = '1.0'
if var_handler.units == 'PSU':
var_handler.units = 'psu'
if var_handler.units == 'C' and self.cmor_var.units == 'K':
var_handler.units = 'deg_C'
if self.cmor_var.units != var_handler.units:
self._convert_units(var_handler)
var_handler.units = self.cmor_var.units
def _convert_units(self, var_handler):
    """Convert variable values to CMOR units, with a manual linear fallback."""
    try:
        Utils.convert_units(var_handler, self.cmor_var.units)
    except ValueError as ex:
        Log.warning('Can not convert {3} from {0} to {1}: {2}', var_handler.units, self.cmor_var.units, ex,
                    self.cmor_var.short_name)
        # Fallback: apply a linear conversion from the project's table.
        factor, offset = UnitConversion.get_conversion_factor_offset(var_handler.units,
                                                                     self.cmor_var.units)
        var_handler[:] = var_handler[:] * factor + offset
        for attribute in ('valid_min', 'valid_max'):
            if attribute in var_handler.ncattrs():
                setattr(var_handler, attribute,
                        float(getattr(var_handler, attribute)) * factor + offset)
def _prepare_region(self):
    """Merge this file's data into the region dimension before upload.

    No-op when the file has no region. When the remote file already exists
    its region data is updated; otherwise a region dimension is added.
    # assumes self.region is set by callers outside __init__ — TODO confirm
    """
    if not self.region:
        return
    if os.path.exists(self.remote_file):
        self._update_var_with_region_data()
    else:
        self._add_region_dimension_to_var()
    self._correct_metadata()
    # Freeze the region record dimension before storing.
    Utils.nco.ncks(input=self.local_file, output=self.local_file, options='-O --fix_rec_dmn region')
def _update_var_with_region_data(self):
    """Write this file's values into the matching region slot of the stored copy.

    Works on a temporary copy of the remote file: makes 'region' the record
    dimension, writes the local values into the slot matching self.region
    (appending a new region entry when missing) and moves the merged result
    over the local file.
    """
    temp = TempFile.get()
    shutil.copyfile(self.remote_file, temp)
    # Make 'region' the record (unlimited) dimension so new regions can be appended.
    Utils.nco.ncks(input=temp, output=temp, options='-O --mk_rec_dmn region')
    handler = Utils.openCdf(temp)
    handler_send = Utils.openCdf(self.local_file)
    value = handler_send.variables[self.var][:]
    var_region = handler.variables['region']
    basin_index = np.where(var_region[:] == self.region)
    if len(basin_index[0]) == 0:
        # Region not present yet: assigning one slot past the end grows the
        # record dimension (presumed netCDF4 behavior — TODO confirm).
        var_region[var_region.shape[0]] = self.region
        # shape has grown by one; the new entry is the last index.
        basin_index = var_region.shape[0] - 1
    else:
        basin_index = basin_index[0][0]
    # Region is the trailing dimension of the stored variable.
    handler.variables[self.var][..., basin_index] = value
    handler.close()
    handler_send.close()
    Utils.move_file(temp, self.local_file)
def _add_region_dimension_to_var(self):
    """Add a 'region' dimension to the variable and store this file's region
    as its first (and only) entry.

    NOTE(review): the `def` line was missing in the corrupted source; the
    header is reconstructed from the call in _prepare_region — verify
    against upstream.
    """
    handler = Utils.openCdf(self.local_file)
    handler.createDimension('region')
    var_region = handler.createVariable('region', str, 'region')
    var_region[0] = self.region
    original_var = handler.variables[self.var]
    # Region-aware copy of the variable, with all attributes preserved.
    new_var = handler.createVariable('new_var', original_var.datatype,
                                     original_var.dimensions + ('region',))
    new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()})
    value = original_var[:]
    new_var[..., 0] = value
    handler.close()
    # Drop the original variable and rename the region-aware copy into its place.
    Utils.nco.ncks(input=self.local_file, output=self.local_file, options='-O -x -v {0}'.format(self.var))
    Utils.rename_variable(self.local_file, 'new_var', self.var)
def _rename_coordinate_variables(self):
    """Rename NEMO-style grid coordinate variables to their CMOR names."""
    renames = {
        'x': 'i',
        'y': 'j',
        'nav_lat_grid_V': 'lat',
        'nav_lon_grid_V': 'lon',
        'nav_lat_grid_U': 'lat',
        'nav_lon_grid_U': 'lon',
        'nav_lat_grid_T': 'lat',
        'nav_lon_grid_T': 'lon',
    }
    Utils.rename_variables(self.local_file, renames, False, True)
def add_diagnostic_history(self):
    """Append a history line recording the diagnostic that produced this file."""
    if not self.diagnostic:
        return
    from earthdiagnostics.earthdiags import EarthDiags
    # Bug fix: the format() call was truncated in the source — it was missing
    # the diagnostic argument and the closing parenthesis.
    history_line = 'Diagnostic {1} calculated with EarthDiagnostics version {0}'.format(EarthDiags.version,
                                                                                       self.diagnostic)
    self._add_history_line(history_line)
def add_cmorization_history(self):
    """Record in the file's history that it was produced by CMORization."""
    from earthdiagnostics.earthdiags import EarthDiags
    message = 'CMORized with Earthdiagnostics version {0}'.format(EarthDiags.version)
    self._add_history_line(message)
def _add_history_line(self, history_line):
    """Prepend a timestamped entry to the file's global history attribute.

    NOTE(review): bare numeric residue lines (source corruption) were removed
    from this span; verify nothing else was lost against upstream.
    """
    utc_datetime = 'UTC ' + datetime.utcnow().isoformat()
    history_line = '{0}: {1};'.format(utc_datetime, history_line)
    handler = Utils.openCdf(self.local_file)
    # Prepend to any existing history; start fresh when the attribute is
    # absent (replaces the previous no-op try/except AttributeError dance).
    history_line = history_line + getattr(handler, 'history', '')
    handler.history = Utils.convert_to_ASCII_if_possible(history_line)
    handler.close()
class UnitConversion(object):
    """
    Class to manage unit conversions.

    Conversions are stored keyed by (source, destiny) and applied as:
    converted = original * factor + offset
    """

    _dict_conversions = None

    def __init__(self, source, destiny, factor, offset):
        self.source = source
        self.destiny = destiny
        self.factor = float(factor)
        self.offset = float(offset)

    @classmethod
    def load_conversions(cls):
        """
        Load conversions from the configuration file
        """
        cls._dict_conversions = dict()
        csv_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'conversions.csv')
        # Bug fix: was opened 'rb', which breaks the csv module on Python 3;
        # text mode works on both interpreters.
        with open(csv_path, 'r') as csvfile:
            reader = csv.reader(csvfile, dialect='excel')
            for line in reader:
                if line[0] == 'original':
                    continue  # skip the header row
                cls.add_conversion(UnitConversion(line[0], line[1], line[2], line[3]))

    @classmethod
    def add_conversion(cls, conversion):
        """
        Adds a conversion to the dictionary

        :param conversion: conversion to add
        :type conversion: UnitConversion
        """
        cls._dict_conversions[(conversion.source, conversion.destiny)] = conversion

    @classmethod
    def get_conversion_factor_offset(cls, input_units, output_units):
        """
        Gets the conversion factor and offset for two units. The conversion has
        to be done in the following way: converted = original * factor + offset

        :param input_units: original units
        :type input_units: str
        :param output_units: destiny units
        :type output_units: str
        :return: factor and offset (None, None when no conversion is known)
        :rtype: [float, float]
        """
        scale_unit, unit = cls._parse_units(input_units)
        scale_new_unit, new_unit = cls._parse_units(output_units)
        factor, offset = cls._get_factor(new_unit, unit)
        if factor is None:
            return None, None
        factor = factor * scale_unit / float(scale_new_unit)
        offset /= float(scale_new_unit)
        return factor, offset

    @staticmethod
    def _parse_units(unit_string):
        """Split a 'SCALE unit' string into (scale, unit).

        The scale may be a plain number or a power like '10^3'; a bare unit
        has scale 1.
        """
        parts = unit_string.split()
        if len(parts) == 1:
            return 1, parts[0]
        if '^' in parts[0]:
            base, exponent = parts[0].split('^')
            return pow(int(base), int(exponent)), parts[1]
        return float(parts[0]), parts[1]

    @classmethod
    def _get_factor(cls, new_unit, unit):
        """Return (factor, offset) converting *unit* into *new_unit*.

        Only one direction is stored in the table; the reverse is derived.
        """
        if unit == new_unit:
            return 1, 0
        elif (unit, new_unit) in cls._dict_conversions:
            conversion = cls._dict_conversions[(unit, new_unit)]
            return conversion.factor, conversion.offset
        elif (new_unit, unit) in cls._dict_conversions:
            conversion = cls._dict_conversions[(new_unit, unit)]
            # Bug fix: inverting y = x * f + o gives x = y / f - o / f; the
            # offset must be scaled by the factor too (was returned as -o).
            return 1 / conversion.factor, -conversion.offset / conversion.factor
        else:
            return None, None
class THREDDSData(DataFile):
    """Data file retrieved through a THREDDS server."""

    def download(self):
        """Download the file with nccopy and validate the result."""
        # Bug fix: was stored as self.local_path, an attribute nothing else
        # in the class reads; the rest of DataFile uses local_file.
        self.local_file = TempFile.get()
        Utils.execute_shell_command(['nccopy', '-s', '-d', '-4', self.remote_file, self.local_file])
        if not Utils.check_netcdf_file(self.local_file):
            self.message = 'Can not retrieve {0} from server'.format(self.remote_file)
            # Bug fix: failure was written to storage_status (and with a
            # LocalStatus value); it is the local copy that failed.
            self.local_status = LocalStatus.FAILED
            return
        # Bug fix: was assigned to self.status, which no other code reads.
        self.local_status = LocalStatus.READY
class NetCDFFile(DataFile):
    """Data file stored on a POSIX-accessible filesystem."""

    def download(self):
        """Copy the file from storage to a local temporary file."""
        try:
            self.local_status = LocalStatus.DOWNLOADING
            if not self.local_file:
                self.local_file = TempFile.get()
            Utils.copy_file(self.remote_file, self.local_file)
            Log.info('File {0} ready!', self.remote_file)
            self.local_status = LocalStatus.READY
        except Exception as ex:
            # Remove any partial copy so a half-downloaded file is never used.
            if os.path.isfile(self.local_file):
                os.remove(self.local_file)
            Log.error('File {0} not available: {1}', self.remote_file, ex)
            self.local_status = LocalStatus.FAILED

    def create_link(self):
        """Ask the data manager to create the convenience link for this file.

        NOTE(review): the `def` line was missing in the corrupted source; the
        header is reconstructed — verify the name against upstream callers.
        """
        try:
            self.data_manager._create_link(self.domain, self.remote_file, self.frequency, self.var,
                                           self.grid, False, self.var_type)
        except Exception as ex:
            # Best effort: a failed link must not abort the pipeline.
            Log.error('Can not create link to {1}: {0}'.format(ex, self.remote_file))