Newer
Older
import csv
import json
import os

from autosubmit.config.log import Log

from earthdiagnostics.constants import Basins
from earthdiagnostics.frequency import Frequency
from earthdiagnostics.modelingrealm import ModelingRealm
class VariableJsonException(Exception):
    """Raised when a variable entry of a JSON CMOR table cannot be parsed."""
class SingletonType(type):
    """
    Metaclass that turns every class using it into a singleton.

    The first call to the class builds the instance; later calls return that
    same object and ignore their arguments.
    """

    def __call__(cls, *args, **kwargs):
        """
        Return the unique instance of ``cls``, creating it on first use.

        :param args: positional arguments forwarded to the constructor on the first call only
        :param kwargs: keyword arguments forwarded to the constructor on the first call only
        :return: the single instance of ``cls``
        """
        # Look only in the class's own namespace: a plain attribute lookup
        # would find the instance cached on a parent class and hand a parent
        # object back when a subclass is instantiated.
        if '_SingletonType__instance' not in vars(cls):
            cls.__instance = super(SingletonType, cls).__call__(*args, **kwargs)
        return cls.__instance
__metaclass__ = SingletonType
def __init__(self):
    """
    Locate the data folders shipped next to this module and create empty registries.
    """
    package_folder = os.path.dirname(os.path.realpath(__file__))
    self._cmor_tables_folder = os.path.join(package_folder, 'cmor_tables')
    self._aliases_folder = os.path.join(package_folder, 'variable_alias')
    # Variables keyed by lower-cased short name
    self._dict_variables = {}
    # (alias, variable) tuples keyed by alias name. Created here so that
    # lookups made before any table is loaded fail with the KeyError the
    # getters handle, instead of an AttributeError.
    self._dict_aliases = {}
def get_variable(self, original_name, silent=False):
    """
    Returns the cmor variable instance given a variable name

    :param original_name: original variable's name
    :type original_name: str
    :param silent: if True, omits log warning when variable is not found
    :type silent: bool
    :return: CMOR variable, or None if the name is not a known alias
    :rtype: Variable | None
    """
    try:
        # Entries are (alias, variable) tuples; the lookup is case-insensitive
        return self._dict_aliases[original_name.lower()][1]
    except KeyError:
        if not silent:
            Log.warning('Variable {0} is not defined in the CMOR table. Please add it'.format(original_name))
        return None
def get_variable_and_alias(self, original_name, silent=False):
    """
    Returns the alias and the cmor variable instance given a variable name

    :param original_name: original variable's name
    :type original_name: str
    :param silent: if True, omits log warning when variable is not found
    :type silent: bool
    :return: tuple (alias, CMOR variable); (None, None) if the name is not known
    :rtype: tuple
    """
    try:
        return self._dict_aliases[original_name.lower()]
    except KeyError:
        if not silent:
            Log.warning('Variable {0} is not defined in the CMOR table. Please add it'.format(original_name))
        # Same shape as the successful result so callers can always unpack it
        return None, None
def load_variables(self, table_name):
    """
    Loads the CMOR csv and creates the variables dictionary

    :param table_name: name of the data convention whose tables are loaded
    :type table_name: str
    """
    # Order matters: the alias file is matched against the variables loaded
    # by the first two steps, and the lookup dictionary is built last.
    self._load_variable_list(table_name)
    self._load_missing_defaults()
    self._load_known_aliases()
    self._construct_aliases_dict()
def _load_variable_list(self, table_name):
    """
    Load the variable definitions for a data convention.

    Sources are tried in order: a folder of JSON tables, an xlsx workbook and
    finally a csv file. The first one found is loaded.

    :param table_name: name of the data convention
    :type table_name: str
    :raises Exception: if none of the three sources exists for table_name
    """
    json_folder = self._get_json_folder(table_name)
    if os.path.isdir(json_folder):
        self._load_json(json_folder)
        return

    xlsx_path = self._get_xlsx_path(table_name)
    if os.path.isfile(xlsx_path):
        self._load_xlsx(table_name)
        return

    csv_path = self._get_csv_path(table_name)
    if os.path.isfile(csv_path):
        # _load_file resolves the csv path from the table name by itself
        self._load_file(table_name)
        return

    raise Exception('Data convention {0} unknown'.format(table_name))
def _get_csv_path(self, table_name):
    """
    Build the path of the csv table for a data convention.

    :param table_name: name of the data convention
    :type table_name: str
    :return: path to <cmor tables folder>/<table_name>.csv
    :rtype: str
    """
    return os.path.join(self._cmor_tables_folder, '{0}.csv'.format(table_name))
def _get_json_folder(self, table_name):
    """
    Build the path of the folder holding the JSON tables of a data convention.

    :param table_name: name of the data convention
    :type table_name: str
    :return: path to <cmor tables folder>/<table_name>/Tables
    :rtype: str
    """
    # Let os.path.join insert the separator instead of hard-coding '/'
    return os.path.join(self._cmor_tables_folder, table_name, 'Tables')
def _load_file(self, csv_table_path, default=False):
    """
    Load the variables defined in a csv table into the variables dictionary.

    Rows without a short name, and rows whose short name is already
    registered, are skipped so earlier definitions are never overwritten.

    :param csv_table_path: name of the table; despite the parameter name it is
        resolved to a file through _get_csv_path
    :type csv_table_path: str
    :param default: value stored in the ``default`` attribute of every variable loaded
    :type default: bool
    """
    # NOTE(review): binary mode is what the Python 2 csv module expects; under
    # Python 3 this would need text mode - confirm the target interpreter.
    with open(self._get_csv_path(csv_table_path), 'rb') as csvfile:
        reader = csv.reader(csvfile, dialect='excel')
        for line in reader:
            # Skip blank rows and the header row
            if not line or line[0] == 'Variable':
                continue

            var = Variable()
            var.parse_csv(line)
            if not var.short_name or var.short_name.lower() in self._dict_variables:
                continue
            var.default = default
            self._dict_variables[var.short_name.lower()] = var
def _load_json(self, json_folder):
    """
    Load every JSON table found in a folder.

    Files without a 'variable_entry' section are ignored, as are the grids
    and formula terms files, which are skipped by name.

    :param json_folder: folder containing the JSON tables
    :type json_folder: str
    """
    skipped_files = ('CMIP6_grids.json', 'CMIP6_formula_terms.json')
    for file_name in os.listdir(json_folder):
        if file_name in skipped_files:
            continue

        # The context manager closes the file as soon as it has been parsed
        with open(os.path.join(json_folder, file_name)) as json_file:
            data = json.load(json_file)

        if 'variable_entry' not in data:
            continue

        Log.debug('Parsing file {0}'.format(file_name))
        header = data['Header']
        # The first six characters of table_id are dropped; presumably a
        # fixed prefix such as 'Table ' - confirm against the table files.
        table = CMORTable(header['table_id'][6:],
                          Frequency(header['frequency']),
                          header['table_date'])
        self._load_json_variables(data['variable_entry'], table)
def _load_json_variables(self, json_data, table):
    """
    Register the variables of one JSON table.

    A variable that is already registered only gets the table appended to its
    list of tables; otherwise a new variable is parsed and stored.

    :param json_data: 'variable_entry' section of the table, keyed by variable name
    :type json_data: dict
    :param table: table the entries belong to
    """
    for short_name, json_var in json_data.items():
        key = short_name.lower()
        if key in self._dict_variables:
            self._dict_variables[key].tables.append(table)
            continue

        variable = Variable()
        try:
            variable.parse_json(json_var, short_name)
            variable.tables.append(table)
            # Stored under the parsed short name, which comes from the
            # entry's 'out_name' and may differ from the entry key.
            self._dict_variables[variable.short_name.lower()] = variable
        except VariableJsonException:
            Log.error('Could not read variable {0}'.format(short_name))
def _load_known_aliases(self):
    """
    Read the aliases csv and attach each alias to the variable it refers to.

    Row layout, as read below: column 0 is a colon-separated list of aliases,
    column 1 is a name that is added to that list when missing, column 2 is
    an optional basin and column 3 an optional grid.
    """
    # NOTE(review): binary mode is what the Python 2 csv module expects; under
    # Python 3 this would need text mode - confirm the target interpreter.
    with open(self._get_aliases_csv_path(), 'rb') as csvfile:
        reader = csv.reader(csvfile, dialect='excel')
        for line in reader:
            # Skip blank rows and the header row
            if not line or line[0] == 'Aliases':
                continue

            aliases = line[0].split(':')
            if line[1] not in aliases:
                aliases.append(line[1])

            # Every registered variable that any of the aliases names
            cmor_vars = []
            for alias in aliases:
                if alias.lower() in self._dict_variables:
                    cmor_vars.append(self._dict_variables[alias.lower()])

            if not cmor_vars:
                Log.error('Aliases {0} could not be mapped to any variable'.format(aliases))
                continue
            elif len(cmor_vars) > 1:
                non_default = [var for var in cmor_vars if not var.default]
                if len(non_default) == 1:
                    # Exactly one candidate comes from the convention itself:
                    # keep it and drop the default definitions it shadows.
                    for default in [var for var in cmor_vars if var not in non_default]:
                        del self._dict_variables[default.short_name.lower()]
                    cmor_vars = non_default
                else:
                    # NOTE(review): after reporting the ambiguity the row is
                    # still attached to the first candidate - confirm that
                    # this fall-through is intended.
                    Log.error('Aliases {0} can be be mapped to multiple variables '
                              '[{1}]'.format(aliases, ', '.join(map(str, cmor_vars))))
            cmor_var = cmor_vars[0]

            for alias in aliases:
                if alias != cmor_var.short_name and alias in self._dict_variables:
                    Log.error('Alias {0} for variable {1} is already a different '
                              'variable!'.format(alias, cmor_var.short_name))
                    continue
                alias_object = VariableAlias(alias)
                if line[2]:
                    alias_object.basin = Basins.parse(line[2])
                if line[3]:
                    alias_object.grid = line[3]
                cmor_var.known_aliases.append(alias_object)
def _get_aliases_csv_path(self):
    """
    Build the path of the aliases csv file.

    :return: path to <aliases folder>/default.csv
    :rtype: str
    """
    return os.path.join(self._aliases_folder, 'default.csv')
def _construct_aliases_dict(self):
    """
    Rebuild the alias lookup dictionary from the registered variables.

    Each entry maps an alias name to an (alias, variable) tuple. Every
    variable is guaranteed a plain alias (no basin, no grid) under its own
    dictionary name, and that plain alias is the one stored for that name.
    """
    self._dict_aliases = {}
    for cmor_var_name, cmor_var in self._dict_variables.items():
        # known_aliases holds alias objects, so the name has to be compared
        # with their attributes; testing the bare string against the list
        # never matches and would add a duplicate alias on every call.
        base_alias = None
        for alias in cmor_var.known_aliases:
            if alias.alias == cmor_var_name and alias.basin is None and alias.grid is None:
                base_alias = alias
                break
        if base_alias is None:
            base_alias = VariableAlias(cmor_var_name)
            cmor_var.known_aliases.append(base_alias)

        for alias in cmor_var.known_aliases:
            self._dict_aliases[alias.alias] = (alias, cmor_var)
        # The plain alias takes precedence for the variable's own name
        self._dict_aliases[cmor_var_name] = (base_alias, cmor_var)
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
def _get_xlsx_path(self, table_name):
    """
    Build the path of the xlsx workbook for a data convention.

    :param table_name: name of the data convention
    :type table_name: str
    :return: path to <cmor tables folder>/<table_name>.xlsx
    :rtype: str
    """
    return os.path.join(self._cmor_tables_folder, '{0}.xlsx'.format(table_name))
def _load_xlsx(self, table_name):
    """
    Load the variables defined in the xlsx workbook of a data convention.

    Only sheets whose A1 cell is 'Priority' are read. Columns used, by
    zero-based index: 1 long name, 2 units, 5 short name, 6 standard name,
    12 modeling realm.

    :param table_name: name of the data convention
    :type table_name: str
    """
    excel = openpyxl.load_workbook(self._get_xlsx_path(table_name), read_only=True)

    for sheet_name in excel.sheetnames:
        sheet = excel[sheet_name]
        if sheet['A1'].value != 'Priority':
            continue

        for row in sheet.rows:
            # Skip the header row and rows without a short name
            if row[0].value == 'Priority' or not row[5].value:
                continue

            short_name = row[5].value
            key = short_name.lower()
            # NOTE(review): sheet titles (plain strings) are stored in
            # ``tables`` here, whereas the JSON loader stores table objects -
            # confirm which one consumers expect.
            if key in self._dict_variables:
                self._dict_variables[key].tables.append(sheet.title)
                continue

            var = Variable()
            var.short_name = short_name
            var.standard_name = row[6].value
            var.long_name = row[1].value
            self._process_modelling_realm(var, row[12].value)
            var.units = row[2].value
            var.tables.append(sheet.title)
            self._dict_variables[key] = var
def _process_modelling_realm(self, var, value):
    """
    Set the domain of a variable from a whitespace-separated list of realms.

    Only the first realm is used; a warning is logged when several are given
    or when none is.

    :param var: variable whose ``domain`` attribute is set
    :type var: Variable
    :param value: realm names separated by whitespace; may be None or empty
    :type value: str | None
    """
    # split() without arguments ignores leading, trailing and repeated
    # whitespace, so stray spaces are not mistaken for empty realm names
    modelling_realm = (value or '').split()

    if len(modelling_realm) > 1:
        Log.warning('Multiple modeling realms assigned to variable {0}: {1}. '
                    'We wil use first ({1[0]}) as modelling realm'.format(var.short_name, modelling_realm))
    if modelling_realm:
        var.domain = ModelingRealm(modelling_realm[0])
    else:
        Log.warning('Variable {0} has no modeling realm defined'.format(var.short_name))
def _load_missing_defaults(self):
    """
    Load the 'default' csv table, flagging the variables it adds as defaults.
    """
    self._load_file('default', True)
class Variable(object):
"""
Class to characterize a CMOR variable. It also contains the static method to make the match between the original
name and the standard name. Requires data _convention to be available in cmor_tables to work.
"""
def __str__(self):
    """
    Return the variable as '<standard name> (<short name>)'.

    :rtype: str
    """
    return '{0} ({1})'.format(self.standard_name, self.short_name)
def __init__(self):
    """
    Create an empty variable; the parse_* methods or the loaders fill it in.
    """
    self.short_name = None
    self.standard_name = None
    self.long_name = None
    # Only assigned when a modeling realm is defined, so it needs a default
    self.domain = None
    self.basin = None
    self.units = None
    self.valid_min = None
    self.valid_max = None
    self.grid = None
    # True when the definition comes from the defaults table
    self.default = False
    # Tables the variable belongs to; the loaders append to this list
    self.tables = []
    # Aliases attached to the variable; appended to when aliases are loaded
    self.known_aliases = []
def parse_json(self, json_var, key):
    """
    Fill the variable from one entry of a JSON CMOR table.

    :param json_var: the entry's fields
    :type json_var: dict
    :param key: name of the entry, used only in the error message
    :type key: str
    :raises VariableJsonException: if the entry has no 'out_name'
    """
    if 'out_name' not in json_var:
        raise VariableJsonException('Variable {0} has no out name defined'.format(key))
    self.short_name = json_var['out_name']

    self.standard_name = json_var['standard_name']
    self.long_name = json_var['long_name']

    # split() without arguments ignores leading, trailing and repeated
    # whitespace, so stray spaces are not mistaken for empty realm names
    domain = json_var['modeling_realm'].split()
    if len(domain) > 1:
        Log.warning('Multiple modeling realms assigned to variable {0}: {1}. '
                    'We wil use first ({1[0]}) as domain'.format(self.short_name, domain))
    if domain:
        self.domain = ModelingRealm(domain[0])
    else:
        Log.warning('Variable {0} has no modeling realm defined'.format(self.short_name))

    self.valid_min = json_var['valid_min']
    self.valid_max = json_var['valid_max']
    self.units = json_var['units']
def parse_csv(self, var_line):
# Fill the variable from one row of a csv table. var_line is indexed by
# position: 1 short name, 2 standard name, 3 long name, 4 modeling realm,
# 5 basin, 6 units, 7 valid min, 8 valid max, 9 grid, 10 colon-separated tables.
# Index 0 is not read here.
self.short_name = var_line[1].strip()
self.standard_name = var_line[2].strip()
self.long_name = var_line[3].strip()
self.domain = ModelingRealm(var_line[4].strip())
self.basin = Basins.parse(var_line[5])
self.units = var_line[6].strip()
# valid_min and valid_max are kept as the stripped text of the cell
self.valid_min = var_line[7].strip()
self.valid_max = var_line[8].strip()
self.grid = var_line[9].strip()
# NOTE(review): the body of this loop is not visible in this view of the
# file; presumably each table name is recorded on the variable - confirm
# against the complete source before relying on it.
for table in var_line[10].strip().split(':'):
def get_table(self, frequency, data_convention):
    """
    Return the table of this variable for a given frequency.

    The first registered table with that frequency is returned. If there is
    none, a table is built from the name the variable's domain gives for the
    frequency and data convention.

    :param frequency: frequency the table must have
    :param data_convention: data convention passed to the domain to name the fallback table
    :return: matching table, or a newly built fallback table
    """
    for table in self.tables:
        # The xlsx loader stores plain sheet titles in ``tables``; those have
        # no frequency, so they are skipped instead of raising AttributeError.
        if getattr(table, 'frequency', None) == frequency:
            return table

    table_name = self.domain.get_table_name(frequency, data_convention)
    # NOTE(review): the fallback table date is hard-coded - confirm it is
    # right for every data convention.
    return CMORTable(table_name, frequency, 'December 2013')
Javier Vegas-Regidor
committed
class VariableAlias(object):
    """
    Alternative name for a CMOR variable, optionally tied to a basin and a grid.

    :param alias: alias name
    :type alias: str
    """

    def __init__(self, alias):
        self.alias = alias
        # Both stay None unless the alias is restricted to a basin or a grid
        self.basin = None
        self.grid = None

    def __str__(self):
        """
        Return the alias name followed by its basin and grid, when they are set.

        :rtype: str
        """
        string = self.alias
        if self.basin:
            string += ' Basin: {0}'.format(self.basin)
        if self.grid:
            string += ' Grid: {0}'.format(self.grid)
        return string
def __init__(self, name, frequency, date):
    """
    Store the description of a table.

    :param name: table name
    :param frequency: frequency of the data in the table
    :param date: date of the table definition
    """
    self.name = name
    self.frequency = frequency
    # Keep the date instead of silently discarding the argument
    self.date = date
def __str__(self):
    """
    Return the table name.
    """
    return self.name