# coding: utf-8
import csv
import shutil
from datetime import datetime
import numpy as np
import os
from bscearth.utils.log import Log
from earthdiagnostics.utils import Utils, TempFile
from earthdiagnostics.modelingrealm import ModelingRealms
class LocalStatus(object):
    """Status values for the local (working-directory) copy of a file."""

    PENDING = 0      # not yet fetched or produced
    DOWNLOADING = 1  # transfer from storage in progress (see NetCDFFile.download)
    READY = 2        # available on local disk
    FAILED = 3       # retrieval failed
    TO_COMPUTE = 4   # presumably produced by a diagnostic instead of downloaded — not used in this chunk
class StorageStatus(object):
    """Status values for the copy of a file in permanent storage."""

    PENDING = 0    # not yet uploaded
    UPLOADING = 1  # upload in progress (see DataFile.upload)
    READY = 2      # stored
    FAILED = 3     # upload failed
def __init__(self):
    """
    Create an empty file descriptor.

    All descriptive metadata starts unset; the from_storage / to_storage
    factory methods fill it in.
    """
    super(DataFile, self).__init__()
    self.remote_file = None
    self.local_file = None
    self.domain = None
    self.var = None
    self.cmor_var = None
    # Bug fix: region is read by _prepare_region but was only assigned in
    # to_storage, so other instances raised AttributeError when consulted.
    self.region = None
    self.frequency = None
    self.data_convention = None
    self._local_status = LocalStatus.PENDING
    self._storage_status = StorageStatus.READY
def unsubscribe(self, who):
    """Detach *who* and delete the local copy once nobody needs it."""
    super(DataFile, self).unsubscribe(who)
    nobody_waiting = len(self.subscribers) == 0
    if self.local_status == LocalStatus.READY and nobody_waiting:
        os.remove(self.local_file)
@property
def local_status(self):
    # Current LocalStatus value of the local copy.
    return self._local_status

@local_status.setter
def local_status(self, value):
    # Store the new status first, then notify subscribers so observers
    # see a consistent state when dispatch fires.
    self._local_status = value
    self.dispatch(self, value)
@classmethod
def from_storage(cls, filepath):
    """Build a descriptor for a file that already exists in storage."""
    instance = cls()
    instance.remote_file = filepath
    instance.local_status = LocalStatus.PENDING
    return instance
@classmethod
def to_storage(cls, domain, var, cmor_var, data_convention, region):
    """
    Build a descriptor for a file that will be generated and uploaded.

    :return: the new file object
    """
    new_object = cls()
    new_object.domain = domain
    new_object.var = var
    new_object.cmor_var = cmor_var
    new_object.region = region
    new_object.frequency = None
    new_object.data_convention = data_convention
    new_object.storage_status = StorageStatus.PENDING
    # Bug fix: the factory must hand the object back to the caller,
    # mirroring from_storage; previously it implicitly returned None.
    return new_object
def download(self):
    """Retrieve the file from storage. Subclasses must override this."""
    raise NotImplementedError()
def upload(self):
    # Push the local file to storage: normalise the format and metadata,
    # then move it to its final location. Order matters: region handling
    # re-runs the metadata correction before the move.
    self.storage_status = StorageStatus.UPLOADING
    Utils.convert2netcdf4(self.local_file)
    self._correct_metadata()
    self._prepare_region()
    self._rename_coordinate_variables()
    Utils.move_file(self.local_file, self.remote_file)
    self.storage_status = StorageStatus.READY
def _correct_metadata(self):
    """
    Fix variable and global attributes so the file matches its CMOR
    definition. Does nothing when the variable has no CMOR definition.

    (Stray line-number artifact lines that had been injected into this
    method were removed; the remaining statements form the full body.)
    """
    if not self.cmor_var:
        return
    handler = Utils.openCdf(self.local_file)
    var_handler = handler.variables[self.var]
    self._fix_variable_name(var_handler)
    handler.modeling_realm = self.cmor_var.domain.name
    table = self.cmor_var.get_table(self.frequency, self.data_convention)
    handler.table_id = 'Table {0} ({1})'.format(table.name, table.date)
    if self.cmor_var.units:
        self._fix_units(var_handler)
    handler.sync()
    self._fix_coordinate_variables_metadata(handler)
    # dtype must be read before the handler is closed
    var_type = var_handler.dtype
    handler.close()
    self._fix_values_metadata(var_type)
def _fix_variable_name(self, var_handler):
    """Copy the naming attributes from the CMOR definition onto the variable."""
    for attribute in ('standard_name', 'long_name', 'short_name'):
        setattr(var_handler, attribute, getattr(self.cmor_var, attribute))
def _fix_values_metadata(self, var_type):
    """
    Set _FillValue/missingValue and, when the CMOR definition provides
    them, valid_min/valid_max attributes using ncatted.

    :param var_type: numpy dtype of the variable; its type char is
                     passed to ncatted so attributes get the right type

    (Stray line-number artifact lines injected into this method were
    removed; the remaining statements form the full body.)
    """
    if self.cmor_var.valid_min != '':
        valid_min = '-a valid_min,{0},o,{1},"{2}" '.format(self.var, var_type.char, self.cmor_var.valid_min)
    else:
        valid_min = ''
    if self.cmor_var.valid_max != '':
        valid_max = '-a valid_max,{0},o,{1},"{2}" '.format(self.var, var_type.char, self.cmor_var.valid_max)
    else:
        valid_max = ''
    Utils.nco.ncatted(input=self.local_file, output=self.local_file,
                      options='-O -a _FillValue,{0},o,{1},"1.e20" '
                              '-a missingValue,{0},o,{1},"1.e20" {2}{3}'.format(self.var, var_type.char,
                                                                               valid_min, valid_max))
def _fix_coordinate_variables_metadata(self, handler):
    """Give the coordinate variables their conventional names."""
    coords = handler.variables
    if 'lev' in coords:
        coords['lev'].short_name = 'lev'
        if self.domain == ModelingRealms.ocean:
            coords['lev'].standard_name = 'depth'
    for name, standard in (('lon', 'longitude'), ('lat', 'latitude')):
        if name in coords:
            coords[name].short_name = name
            coords[name].standard_name = standard
def _fix_units(self, var_handler):
    # Normalise the units attribute and convert the data when it still
    # does not match the CMOR definition. No-op for unitless variables.
    if 'units' not in var_handler.ncattrs():
        return
    if var_handler.units == '-':
        # '-' is used as a placeholder for dimensionless variables
        var_handler.units = '1.0'
    if var_handler.units == 'PSU':
        var_handler.units = 'psu'
    if var_handler.units == 'C' and self.cmor_var.units == 'K':
        # NOTE(review): relabels Celsius so the deg_C -> K numeric
        # conversion below is triggered — confirm this is intended.
        var_handler.units = 'deg_C'
    if self.cmor_var.units != var_handler.units:
        self._convert_units(var_handler)
    var_handler.units = self.cmor_var.units
def _convert_units(self, var_handler):
    # Convert the variable's data to the CMOR units: try the generic
    # converter first, fall back to the factor/offset lookup table.
    try:
        Utils.convert_units(var_handler, self.cmor_var.units)
    except ValueError as ex:
        Log.warning('Can not convert {3} from {0} to {1}: {2}', var_handler.units, self.cmor_var.units, ex,
                    self.cmor_var.short_name)
        factor, offset = UnitConversion.get_conversion_factor_offset(var_handler.units,
                                                                     self.cmor_var.units)
        # NOTE(review): factor/offset may be (None, None) when no
        # conversion is registered, which would raise here — confirm
        # every unit pair reaching this path is in conversions.csv.
        var_handler[:] = var_handler[:] * factor + offset
        # Keep the validity bounds consistent with the rescaled data.
        if 'valid_min' in var_handler.ncattrs():
            var_handler.valid_min = float(var_handler.valid_min) * factor + offset
        if 'valid_max' in var_handler.ncattrs():
            var_handler.valid_max = float(var_handler.valid_max) * factor + offset
def _prepare_region(self):
    """Merge region-tagged data into the stored file, if a region is set."""
    if not self.region:
        return
    if os.path.exists(self.remote_file):
        # A stored file already exists: merge our region into it.
        self._update_var_with_region_data()
    else:
        # First region to be written: add the region axis locally.
        self._add_region_dimension_to_var()
    self._correct_metadata()
    Utils.nco.ncks(input=self.local_file, output=self.local_file, options='-O --fix_rec_dmn region')
def _update_var_with_region_data(self):
    # Merge this file's data into the copy already in storage, writing it
    # into the slice of the 'region' axis that matches self.region.
    temp = TempFile.get()
    shutil.copyfile(self.remote_file, temp)
    # 'region' must be a record (unlimited) dimension so it can grow.
    Utils.nco.ncks(input=temp, output=temp, options='-O --mk_rec_dmn region')
    handler = Utils.openCdf(temp)
    handler_send = Utils.openCdf(self.local_file)
    value = handler_send.variables[self.var][:]
    var_region = handler.variables['region']
    basin_index = np.where(var_region[:] == self.region)
    if len(basin_index[0]) == 0:
        # Region not present yet: writing one past the end grows the
        # record dimension (netCDF4 behaviour) and appends the name.
        var_region[var_region.shape[0]] = self.region
        basin_index = var_region.shape[0] - 1
    else:
        basin_index = basin_index[0][0]
    # Region is assumed to be the last axis of the variable — TODO confirm.
    handler.variables[self.var][..., basin_index] = value
    handler.close()
    handler_send.close()
    Utils.move_file(temp, self.local_file)
def _add_region_dimension_to_var(self):
    """
    Create an unlimited 'region' dimension and rebuild the variable with
    it as the last axis (used for the first region written).

    (The ``def`` line of this method was lost in the garbled source; it
    is restored here — the method is invoked from _prepare_region.)
    """
    handler = Utils.openCdf(self.local_file)
    handler.createDimension('region')
    var_region = handler.createVariable('region', str, 'region')
    var_region[0] = self.region
    original_var = handler.variables[self.var]
    # Copy the variable with an extra trailing 'region' axis and all of
    # its attributes, then put the data in slot 0.
    new_var = handler.createVariable('new_var', original_var.datatype,
                                     original_var.dimensions + ('region',))
    new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()})
    value = original_var[:]
    new_var[..., 0] = value
    handler.close()
    # Drop the old variable and give the new one its name.
    Utils.nco.ncks(input=self.local_file, output=self.local_file, options='-O -x -v {0}'.format(self.var))
    Utils.rename_variable(self.local_file, 'new_var', self.var)
def _rename_coordinate_variables(self):
    """Rename NEMO-style grid coordinates to their conventional names."""
    renames = {
        'x': 'i',
        'y': 'j',
        'nav_lat_grid_V': 'lat',
        'nav_lon_grid_V': 'lon',
        'nav_lat_grid_U': 'lat',
        'nav_lon_grid_U': 'lon',
        'nav_lat_grid_T': 'lat',
        'nav_lon_grid_T': 'lon',
    }
    Utils.rename_variables(self.local_file, renames, False, True)
def add_diagnostic_history(self, diagnostic):
    """Record in the file history that *diagnostic* computed it."""
    # Imported locally to avoid a circular import with earthdiags.
    from earthdiagnostics.earthdiags import EarthDiags
    line = 'Diagnostic {1} calculated with EarthDiagnostics version {0}'.format(EarthDiags.version,
                                                                               diagnostic)
    self._add_history_line(line)
def add_cmorization_history(self):
    """Record in the file history that it was CMORized."""
    # Imported locally to avoid a circular import with earthdiags.
    from earthdiagnostics.earthdiags import EarthDiags
    line = 'CMORized with Earthdiagnostics version {0}'.format(EarthDiags.version)
    self._add_history_line(line)
def _add_history_line(self, history_line):
    """
    Prepend a UTC-timestamped entry to the file's global history attribute.

    (Stray line-number artifact lines injected into this method were
    removed; the remaining statements form the full body.)
    """
    utc_datetime = 'UTC ' + datetime.utcnow().isoformat()
    history_line = '{0}: {1};'.format(utc_datetime, history_line)
    handler = Utils.openCdf(self.local_file)
    try:
        # New entry goes first, existing history is appended after it.
        history_line = history_line + handler.history
    except AttributeError:
        pass  # the file had no history attribute yet
    handler.history = Utils.convert_to_ASCII_if_possible(history_line)
    handler.close()
class UnitConversion(object):
    """
    Class to manage unit conversions.

    A conversion is applied as ``converted = original * factor + offset``.
    """

    # Maps (source, destiny) unit names to UnitConversion instances.
    _dict_conversions = None

    def __init__(self, source, destiny, factor, offset):
        """
        :param source: original units
        :param destiny: converted units
        :param factor: multiplicative factor
        :param offset: additive offset
        """
        self.source = source
        self.destiny = destiny
        self.factor = float(factor)
        self.offset = float(offset)

    @classmethod
    def load_conversions(cls):
        """
        Load conversions from the configuration file
        """
        cls._dict_conversions = dict()
        # NOTE(review): 'rb' is the Python 2 csv convention; under Python 3
        # the csv module needs a text-mode file — confirm target interpreter.
        with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'conversions.csv'), 'rb') as csvfile:
            reader = csv.reader(csvfile, dialect='excel')
            for line in reader:
                if line[0] == 'original':
                    # Skip the header row
                    continue
                cls.add_conversion(UnitConversion(line[0], line[1], line[2], line[3]))

    @classmethod
    def add_conversion(cls, conversion):
        """
        Adds a conversion to the dictionary

        :param conversion: conversion to add
        :type conversion: UnitConversion
        """
        cls._dict_conversions[(conversion.source, conversion.destiny)] = conversion

    @staticmethod
    def _parse_unit(unit_spec):
        """
        Split a unit specification such as '10^3 m', '0.001 m' or 'km'
        into a (scale, unit) pair. Extracted to remove the duplicated
        parsing that get_conversion_factor_offset performed for both
        of its arguments.
        """
        parts = unit_spec.split()
        if len(parts) == 1:
            return 1, parts[0]
        if '^' in parts[0]:
            base, exponent = parts[0].split('^')
            return pow(int(base), int(exponent)), parts[1]
        return float(parts[0]), parts[1]

    @classmethod
    def get_conversion_factor_offset(cls, input_units, output_units):
        """
        Gets the conversion factor and offset for two units. The conversion has to be done in the following way:
        converted = original * factor + offset

        :param input_units: original units
        :type input_units: str
        :param output_units: destiny units
        :type output_units: str
        :return: factor and offset, or (None, None) when no conversion is known
        :rtype: [float, float]
        """
        scale_unit, unit = cls._parse_unit(input_units)
        scale_new_unit, new_unit = cls._parse_unit(output_units)
        factor, offset = cls._get_factor(new_unit, unit)
        if factor is None:
            return None, None
        factor = factor * scale_unit / float(scale_new_unit)
        offset /= float(scale_new_unit)
        return factor, offset

    @classmethod
    def _get_factor(cls, new_unit, unit):
        """
        Look up the factor/offset between two base units, inverting the
        reverse conversion when only that one is registered.
        """
        if unit == new_unit:
            return 1, 0
        if (unit, new_unit) in cls._dict_conversions:
            conversion = cls._dict_conversions[(unit, new_unit)]
            return conversion.factor, conversion.offset
        if (new_unit, unit) in cls._dict_conversions:
            conversion = cls._dict_conversions[(new_unit, unit)]
            return 1 / conversion.factor, -conversion.offset
        return None, None
class THREDDSData(DataFile):
    """Data file retrieved through a THREDDS server."""

    def download(self):
        """
        Fetch the remote file with nccopy and update the local status.

        Bug fixes: the original wrote to ``self.local_path`` although the
        rest of the hierarchy uses ``self.local_file``; assigned a
        LocalStatus value to ``storage_status``; and set an otherwise
        unused ``self.status`` to READY even when the download failed.
        """
        self.local_file = TempFile.get()
        Utils.execute_shell_command(['nccopy', '-s', '-d', '-4', self.remote_file, self.local_file])
        if not Utils.check_netcdf_file(self.local_file):
            self.message = 'Can not retrieve {0} from server'.format(self.remote_file)
            self.local_status = LocalStatus.FAILED
            return
        self.local_status = LocalStatus.READY
class NetCDFFile(DataFile):
    """Data file stored on a directly accessible filesystem."""

    def download(self):
        """
        Copy the remote file to a local temporary path and track status.
        """
        try:
            self.local_status = LocalStatus.DOWNLOADING
            if not self.local_file:
                self.local_file = TempFile.get()
            Utils.copy_file(self.remote_file, self.local_file)
            Log.info('File {0} ready!', self.remote_file)
            self.local_status = LocalStatus.READY
        except Exception as ex:
            # Remove any partial copy; guard the remove so a failure before
            # the file exists does not raise and mask the original error.
            if self.local_file and os.path.isfile(self.local_file):
                os.remove(self.local_file)
            Log.error('File {0} not available: {1}', self.remote_file, ex)
            self.local_status = LocalStatus.FAILED