# CMOR variable definitions: table loading, variable metadata and name aliases
from bscearth.utils.log import Log
from earthdiagnostics.constants import Basins
from earthdiagnostics.frequency import Frequency
from earthdiagnostics.modelingrealm import ModelingRealms
class VariableJsonException(Exception):
    """Error raised when a variable can not be built from its JSON table entry"""
class SingletonType(type):
    """
    Metaclass that makes every class using it a singleton

    The first call to the class builds the instance; later calls return that same object and
    ignore their arguments.
    """

    def __call__(cls, *args, **kwargs):
        """
        Returns the unique instance of the class, creating it on the first call

        :param args: positional arguments for the constructor, only used on the first call
        :param kwargs: keyword arguments for the constructor, only used on the first call
        :return: the singleton instance of cls
        """
        # Look only at the class' own namespace: a plain attribute lookup would also find the
        # instance stored on a parent class and hand it out as if it were an instance of cls
        if '_SingletonType__instance' not in cls.__dict__:
            cls.__instance = super(SingletonType, cls).__call__(*args, **kwargs)
        return cls.__instance
# Python 2 metaclass hook: inside a class body this makes SingletonType build the class, so
# calling the class goes through SingletonType.__call__. Python 3 ignores this attribute.
# NOTE(review): the header of the enclosing class is not visible in this excerpt - confirm.
__metaclass__ = SingletonType
def __init__(self):
    """
    Sets the folders holding the tables and alias files and creates the empty lookup dictionaries

    Both folders are located next to this module. The alias dictionary is created here too, so
    that a lookup made before any table is loaded reports the variable as unknown instead of
    failing with an AttributeError.
    """
    module_folder = os.path.dirname(os.path.realpath(__file__))
    self._cmor_tables_folder = os.path.join(module_folder, 'cmor_tables')
    self._aliases_folder = os.path.join(module_folder, 'variable_alias')
    self._dict_variables = {}
    self._dict_aliases = {}
def get_variable(self, original_name, silent=False):
    """
    Looks for the cmor variable registered under a given name

    The name is lower-cased before the lookup in the alias dictionary.

    :param original_name: name of the variable to look for
    :type original_name: str
    :param silent: if True, no warning is logged when the name is not found
    :type silent: bool
    :return: CMOR variable, or None if the name is not known
    :rtype: Variable
    """
    try:
        alias_entry = self._dict_aliases[original_name.lower()]
        return alias_entry[1]
    except KeyError:
        if silent:
            return None
        Log.warning('Variable {0} is not defined in the CMOR table. Please add it'.format(original_name))
        return None
def get_all_variables(self):
    """
    Collects every variable currently stored in the variables dictionary

    :return: the stored CMOR variables, without repetitions
    :rtype: set[Variable]
    """
    unique_variables = set()
    unique_variables.update(self._dict_variables.values())
    return unique_variables
def get_variable_and_alias(self, original_name, silent=False):
    """
    Returns the alias and the cmor variable registered under a given name

    The name is lower-cased before the lookup in the alias dictionary.

    :param original_name: original variable's name
    :type original_name: str
    :param silent: if True, omits log warning when variable is not found
    :type silent: bool
    :return: the alias and the CMOR variable it belongs to, or (None, None) if the name is not known
    :rtype: tuple[VariableAlias, Variable]
    """
    try:
        return self._dict_aliases[original_name.lower()]
    except KeyError:
        if not silent:
            Log.warning('Variable {0} is not defined in the CMOR table. Please add it'.format(original_name))
        # Known names yield an (alias, variable) pair, so a miss is reported as a pair as well
        # and callers can unpack the result in both cases
        return None, None
def load_variables(self, table_name):
    """
    Loads the CMOR tables of a data convention and creates the variables and aliases dictionaries

    :param table_name: name of the data convention whose tables must be loaded
    :type table_name: str
    """
    self._load_variable_list(table_name)
    self._load_missing_defaults()
    # Aliases are resolved against the loaded variables (defaults included), so they come last.
    # Without these two steps the alias dictionary used by the variable lookups is never built.
    self._load_known_aliases(table_name)
    self._construct_aliases_dict()
def _load_variable_list(self, table_name):
    """
    Loads the variables of a data convention from the first table source that is available

    Sources are tried in this order: xlsx file, folder with json tables, csv file.

    :param table_name: name of the data convention
    :type table_name: str
    :raises Exception: if no table source exists for the data convention
    """
    xlsx_path = self._get_xlsx_path(table_name)
    if xlsx_path:
        self._load_xlsx(xlsx_path)
        return

    json_folder = self._get_json_folder(table_name)
    if os.path.isdir(json_folder):
        self._load_json(json_folder)
        return

    csv_path = self._get_csv_path(table_name)
    if os.path.isfile(csv_path):
        # _load_file builds the csv path itself, so it receives the table name
        self._load_file(table_name)
        return

    raise Exception('Data convention {0} unknown'.format(table_name))
def _get_csv_path(self, table_name):
    """
    Builds the path of the csv table of a data convention inside the cmor tables folder

    :param table_name: name of the data convention
    :type table_name: str
    :return: path to the csv file
    :rtype: str
    """
    file_name = '{0}.csv'.format(table_name)
    return os.path.join(self._cmor_tables_folder, file_name)
def _get_json_folder(self, table_name):
    """
    Builds the path of the folder with the json tables of a data convention

    :param table_name: name of the data convention
    :type table_name: str
    :return: path to the 'Tables' folder of the data convention
    :rtype: str
    """
    relative_folder = '{0}/Tables'.format(table_name)
    return os.path.join(self._cmor_tables_folder, relative_folder)
def _load_file(self, csv_table_path, default=False):
    """
    Loads the variables defined in a csv table, keeping the ones that are already loaded

    Rows without a short name and rows whose short name is already in the variables dictionary
    are skipped, so this method never replaces a variable loaded before.

    :param csv_table_path: name of the table; the csv path is built from it with _get_csv_path
    :type csv_table_path: str
    :param default: value stored in the 'default' attribute of the variables that are added
    :type default: bool
    """
    # NOTE(review): binary mode is what the Python 2 csv module expects; under Python 3 the
    # file would have to be opened in text mode with newline='' - confirm the target version
    with open(self._get_csv_path(csv_table_path), 'rb') as csvfile:
        reader = csv.reader(csvfile, dialect='excel')
        for line in reader:
            # Blank rows are parsed as empty lists; the row starting with 'Variable' is skipped too
            if not line or line[0] == 'Variable':
                continue

            var = Variable()
            var.parse_csv(line)
            if not var.short_name:
                continue
            key = var.short_name.lower()
            if key in self._dict_variables:
                continue
            var.default = default
            self._dict_variables[key] = var
def _load_json(self, json_folder):
    """
    Loads the variables defined in every json table found in a folder

    'CMIP6_grids.json' and 'CMIP6_formula_terms.json' are ignored, as well as any file
    without a 'variable_entry' section.

    :param json_folder: folder containing the json tables
    :type json_folder: str
    """
    for file_name in os.listdir(json_folder):
        if file_name in ('CMIP6_grids.json', 'CMIP6_formula_terms.json'):
            continue

        # The context manager closes the file as soon as it has been parsed
        with open(os.path.join(json_folder, file_name)) as json_file:
            data = json.load(json_file)
        if 'variable_entry' not in data:
            continue

        Log.debug('Parsing file {0}'.format(file_name))
        header = data['Header']
        # The first six characters of the table id are dropped (presumably a 'Table ' prefix)
        table = CMORTable(header['table_id'][6:],
                          Frequency(header['frequency']),
                          header['table_date'])
        self._load_json_variables(data['variable_entry'], table)
def _load_json_variables(self, json_data, table):
    """
    Adds the variables of a json table to the variables dictionary

    A variable that is already loaded only gets the table appended to its table list.

    :param json_data: content of the 'variable_entry' section, mapping names to variable definitions
    :type json_data: dict
    :param table: table the variables belong to
    :type table: CMORTable
    """
    # Iterate over the items so the definition is taken with the original key: looking it up
    # again with the stripped name fails when the key carries surrounding whitespace
    for raw_name, definition in json_data.items():
        short_name = str(raw_name).strip()
        key = short_name.lower()
        if key in self._dict_variables:
            self._dict_variables[key].tables.append(table)
            continue

        variable = Variable()
        try:
            variable.parse_json(definition, short_name)
        except VariableJsonException:
            Log.error('Could not read variable {0}'.format(short_name))
            continue
        variable.tables.append(table)
        self._dict_variables[variable.short_name.lower()] = variable
def _load_known_aliases(self, table_name):
    """
    Loads the alias files: first the 'default' one, then the one named after the data convention

    :param table_name: name of the data convention
    :type table_name: str
    """
    for alias_file in ('default', table_name):
        self._load_alias_csv(alias_file)
def _load_alias_csv(self, filename):
    """
    Reads an alias csv file and attaches the aliases it defines to the loaded variables

    Nothing is done if the file does not exist.

    NOTE(review): this excerpt had lost its indentation; the nesting below was rebuilt from the
    meaning of the statements - confirm it against the repository.

    :param filename: name, without extension, of the csv file inside the aliases folder
    :type filename: str
    """
    file_path = self._get_aliases_csv_path(filename)
    if not os.path.isfile(file_path):
        return
    with open(file_path, 'rb') as csvfile:
        reader = csv.reader(csvfile, dialect='excel')
        for line in reader:
            # Rows starting with 'Aliases' are skipped (presumably the header row)
            if line[0] == 'Aliases':
                continue
            # Column 0 holds the aliases separated by ':'; column 1 is added to the list if missing
            aliases = line[0].split(':')
            if line[1] not in aliases:
                aliases.append(line[1])
            # Every loaded variable whose key matches a lower-cased alias is a candidate
            cmor_vars = []
            for alias in aliases:
                if alias.lower() in self._dict_variables:
                    cmor_vars.append(self._dict_variables[alias.lower()])
            if len(cmor_vars) == 0:
                Log.error('Aliases {0} could not be mapped to any variable'.format(aliases))
                continue
            elif len(cmor_vars) > 1:
                # Several candidates: if exactly one is not a default variable it wins and the
                # other candidates are removed from the variables dictionary
                non_default = [var for var in cmor_vars if not var.default]
                if len(non_default) == 1:
                    for default in [var for var in cmor_vars if var not in non_default]:
                        del self._dict_variables[default.short_name.lower()]
                    cmor_vars = non_default
                else:
                    Log.error('Aliases {0} can be be mapped to multiple variables '
                              '[{1}]'.format(aliases, ', '.join(map(str, cmor_vars))))
                    # NOTE(review): no `continue` is visible here, so the first candidate is
                    # still used after logging the error - confirm this is intended
            cmor_var = cmor_vars[0]
            for alias in aliases:
                # NOTE(review): this membership test uses the alias as written, while the
                # candidate search above lower-cases it - confirm
                if alias != cmor_var.short_name and alias in self._dict_variables:
                    Log.error('Alias {0} for variable {1} is already a different '
                              'variable!'.format(alias, cmor_var.short_name))
                    continue
                alias_object = VariableAlias(alias)
                # Columns 2 and 3 optionally restrict the alias to a basin and a grid
                if line[2]:
                    alias_object.basin = Basins.parse(line[2])
                if line[3]:
                    alias_object.grid = line[3]
                cmor_var.known_aliases.append(alias_object)
def _get_aliases_csv_path(self, filename):
    """
    Builds the path of an alias csv file inside the aliases folder

    :param filename: name of the file, without extension
    :type filename: str
    :return: path to the csv file
    :rtype: str
    """
    file_name = '{0}.csv'.format(filename)
    return os.path.join(self._aliases_folder, file_name)
def _construct_aliases_dict(self):
    """
    Rebuilds the dictionary that maps every alias name to its (alias, variable) pair

    Each variable is also registered under its own key of the variables dictionary, unless one
    of its known aliases already carries that exact name.
    """
    self._dict_aliases = {}
    for cmor_var_name, cmor_var in self._dict_variables.items():
        # Compare against the alias names: known_aliases holds alias objects, so testing the
        # plain string for membership never matched and the own-name alias was always added again
        has_own_name = any(known.alias == cmor_var_name for known in cmor_var.known_aliases)
        if not has_own_name:
            cmor_var.known_aliases.append(VariableAlias(cmor_var_name))

        for alias in cmor_var.known_aliases:
            self._dict_aliases[alias.alias] = (alias, cmor_var)
def _get_xlsx_path(self, table_name):
    """
    Looks for the xlsx table of a data convention

    The file '<table_name>.xlsx' in the cmor tables folder is preferred. Otherwise the folder
    '<table_name>/etc' is searched and its xlsx file is used only if there is exactly one.

    :param table_name: name of the data convention
    :type table_name: str
    :return: path to the xlsx file, or None if it can not be determined
    :rtype: str | None
    """
    direct_path = os.path.join(self._cmor_tables_folder, '{0}.xlsx'.format(table_name))
    if os.path.isfile(direct_path):
        return direct_path

    pattern = os.path.join(self._cmor_tables_folder, table_name, 'etc', '*.xlsx')
    candidates = glob.glob(pattern)
    if len(candidates) != 1:
        return None
    return candidates[0]
def _load_xlsx(self, xlsx_table_path):
    """
    Loads the variables defined in an xlsx workbook

    The first worksheet is read as an index: column 1 is matched against the sheet names and
    column 2 gives the frequency of that table. Only sheets whose cell A1 is 'Priority' are read
    as variable tables. Columns used from each row: 0 priority, 1 long name, 2 units,
    5 short name, 6 standard name, 12 modelling realm.

    A failure while reading one sheet is logged and the remaining sheets are still processed.

    :param xlsx_table_path: path to the xlsx file
    :type xlsx_table_path: str
    """
    excel = openpyxl.load_workbook(xlsx_table_path, read_only=True)

    sheet_names = excel.sheetnames
    table_data = {}
    data_sheet = excel.worksheets[0]
    for row in data_sheet.rows:
        if row[1].value in sheet_names:
            table_data[row[1].value] = (Frequency(row[2].value), 'Date missing')

    for sheet_name in sheet_names:
        try:
            sheet = excel.get_sheet_by_name(sheet_name)
            if sheet['A1'].value != 'Priority':
                # Not a variable table. The skip was missing, which left this check without effect
                continue

            table_frequency, table_date = table_data[sheet.title]
            table = CMORTable(sheet.title, table_frequency, table_date)
            for row in sheet.rows:
                # Skip the header row and the rows without a short name
                if row[0].value == 'Priority' or not row[5].value:
                    continue

                key = row[5].value.lower()
                if key in self._dict_variables:
                    self._dict_variables[key].tables.append(table)
                    continue

                var = Variable()
                var.priority = row[0].value
                var.short_name = row[5].value
                var.standard_name = row[6].value
                var.long_name = row[1].value
                var.domain = self._process_modelling_realm(var, row[12].value)
                var.units = row[2].value
                var.tables.append(table)
                self._dict_variables[var.short_name.lower()] = var
        except Exception as ex:
            # Deliberately broad: a broken sheet must not prevent loading the others
            Log.error('Table {0} can not be loaded: {1}', sheet_name, ex)
@staticmethod
def _process_modelling_realm(var, value):
    """
    Converts the modelling realm cell of a table into the modelling realm of a variable

    :param var: variable the realm belongs to; its get_modelling_realm method does the selection
    :type var: Variable
    :param value: space separated modelling realm names, or None if the cell is empty
    :type value: str | None
    :return: whatever var.get_modelling_realm returns for the list of realm names
    """
    if value is None:
        value = ''
    modelling_realm = value.split(' ')
    # The selection was computed but never handed back, so callers always stored None
    return var.get_modelling_realm(modelling_realm)
def _load_missing_defaults(self):
    """
    Loads the 'default' table, flagging the variables it adds as defaults
    """
    default_table = 'default'
    self._load_file(default_table, True)
class Variable(object):
"""
Class to characterize a CMOR variable. It also contains the static method to make the match between the original
name and the standard name. Requires the data convention to be available in cmor_tables to work.
"""
def __str__(self):
    """
    :return: the standard name followed by the short name between parentheses
    :rtype: str
    """
    names = (self.standard_name, self.short_name)
    return '{0} ({1})'.format(*names)
def __repr__(self):
    """
    :return: same text as the string conversion: standard name and short name between parentheses
    :rtype: str
    """
    standard, short = self.standard_name, self.short_name
    return '{0} ({1})'.format(standard, short)
def __init__(self):
    """
    Creates an empty variable; the parse methods or the table loaders fill the attributes
    """
    self.short_name = None
    self.standard_name = None
    self.long_name = None
    self.domain = None
    self.basin = None
    self.units = None
    self.valid_min = None
    self.valid_max = None
    self.grid = None
    self.priority = None
    self.default = False
    # Both lists are appended to on freshly created variables by the table and alias loaders,
    # so they have to exist from the start
    self.tables = []
    self.known_aliases = []
def parse_json(self, json_var, key):
    """
    Fills the variable attributes from its definition in a json table

    The priority is read from 'priority' or, if that is missing, from 'primavera_priority';
    it is 1 when neither is present.

    :param json_var: definition of the variable
    :type json_var: dict
    :param key: name under which the variable is defined in the table, used in the error message
    :type key: str
    :raises VariableJsonException: if the definition has no 'out_name'
    """
    if 'out_name' in json_var:
        self.short_name = json_var['out_name'].strip()
    else:
        # The message had no placeholder, so the name of the offending variable was lost
        raise VariableJsonException('Variable {0} has no out name defined'.format(key))
    self.standard_name = json_var['standard_name'].strip()
    self.long_name = json_var['long_name'].strip()

    domain = json_var['modeling_realm'].split(' ')
    self.domain = self.get_modelling_realm(domain)

    self.valid_min = json_var['valid_min'].strip()
    self.valid_max = json_var['valid_max'].strip()
    self.units = json_var['units'].strip()
    if 'priority' in json_var:
        self.priority = int(json_var['priority'].strip())
    elif 'primavera_priority' in json_var:
        self.priority = int(json_var['primavera_priority'].strip())
    else:
        self.priority = 1
def get_modelling_realm(self, domains):
    """
    Chooses the modelling realm of the variable from a list of realm names

    With a single name, that realm is parsed and returned (None, with a warning, if the name is
    empty). With several names, the most specific realm is preferred and the first one is used
    when no realm is more specific than the others.

    :param domains: modelling realm names
    :type domains: list[str]
    :return: the selected modelling realm, or None if no realm is defined
    """
    if len(domains) <= 1:
        only_domain = domains[0]
        if only_domain:
            return ModelingRealms.parse(only_domain)
        Log.warning('Variable {0} has no modeling realm defined'.format(self.short_name))
        return None

    Log.warning('Multiple modeling realms assigned to variable {0}: {1}. ', self, domains)
    parsed = [ModelingRealms.parse(domain) for domain in domains]

    selected = self._select_most_specific(parsed)
    if not selected:
        Log.warning('We will use {0} as it is the first on the list and there is no one that is more specific',
                    parsed[0])
        return parsed[0]

    Log.warning('We will use {0} as it is the most specific', selected)
    return selected
def parse_csv(self, var_line):
    """
    Fills the variable attributes from one row of a csv table

    Columns 1 to 9 are read, in this order, as short name, standard name, long name, modelling
    realm, basin, units, valid minimum, valid maximum and grid. Column 10 is split on ':'.

    :param var_line: fields of the csv row
    :type var_line: list[str]
    """
    self.short_name = var_line[1].strip()
    self.standard_name = var_line[2].strip()
    self.long_name = var_line[3].strip()
    # NOTE(review): the next two lines are not Python; they look like commit metadata picked up
    # when this file was extracted - confirm and remove them
    Javier Vegas-Regidor
    committed
    self.domain = ModelingRealms.parse(var_line[4].strip())
    self.basin = Basins.parse(var_line[5])
    self.units = var_line[6].strip()
    self.valid_min = var_line[7].strip()
    self.valid_max = var_line[8].strip()
    self.grid = var_line[9].strip()
    for table in var_line[10].strip().split(':'):
        # NOTE(review): the body of this loop is missing from this excerpt, so what is done
        # with each entry of column 10 can not be told from here - recover it from the repository
def get_table(self, frequency, data_convention):
    """
    Selects the table to use for the variable at a given frequency

    The first table of the variable with that frequency is preferred. If there is none and the
    variable has a domain, a table named by the domain is created. Otherwise the first table of
    the variable is returned.

    :param frequency: frequency of the requested table
    :param data_convention: data convention, passed to the domain to get the table name
    :return: the selected table
    :rtype: CMORTable
    """
    matching = [candidate for candidate in self.tables if candidate.frequency == frequency][:1]
    if matching:
        return matching[0]

    if not self.domain:
        return self.tables[0]

    table_name = self.domain.get_table_name(frequency, data_convention)
    return CMORTable(table_name, frequency, 'December 2013')
@staticmethod
def _select_most_specific(parsed):
    """
    Picks the most specific modelling realm when the realms are one of the known pairs

    :param parsed: modelling realms assigned to a variable
    :return: the most specific realm of the pair, or None if the realms are not a known pair
    """
    realms = set(parsed)
    # Each entry: (pair of realms, realm of the pair considered the most specific)
    known_pairs = (
        ({ModelingRealms.land, ModelingRealms.landIce}, ModelingRealms.landIce),
        ({ModelingRealms.seaIce, ModelingRealms.ocean}, ModelingRealms.seaIce),
        ({ModelingRealms.atmos, ModelingRealms.atmosChem}, ModelingRealms.atmosChem),
        ({ModelingRealms.ocean, ModelingRealms.ocnBgchem}, ModelingRealms.ocnBgchem),
    )
    for pair, most_specific in known_pairs:
        if pair == realms:
            return most_specific
    return None
class VariableAlias(object):
    """
    Alternative name for a CMOR variable, optionally restricted to a basin and a grid
    """

    def __init__(self, alias):
        """
        :param alias: alias name
        :type alias: str
        """
        self.alias = alias
        self.basin = None
        self.grid = None

    def __str__(self):
        """
        :return: the alias name, followed by the basin and the grid when they are set
        :rtype: str
        """
        pieces = [self.alias]
        if self.basin:
            pieces.append(' Basin: {0}'.format(self.basin))
        if self.grid:
            pieces.append(' Grid: {0}'.format(self.grid))
        return ''.join(pieces)
def __init__(self, name, frequency, date):
    """
    Stores the description of a CMOR table

    :param name: name of the table
    :type name: str
    :param frequency: frequency of the table, in any form accepted by Frequency.parse
    :param date: date of the table
    :type date: str
    """
    # name and date are read by the string conversions and the ordering, so they must be stored
    self.name = name
    self.frequency = Frequency.parse(frequency)
    self.date = date
def __str__(self):
    """
    :return: the name of the table
    """
    table_name = self.name
    return table_name
def __repr__(self):
    """
    :return: the table name followed by its frequency and date between parentheses
    :rtype: str
    """
    # The closing parenthesis was missing from the format string
    return '{0.name} ({0.frequency}, {0.date})'.format(self)
def __lt__(self, other):
    """
    Orders tables by name

    :param other: table to compare with
    :return: True if the name of this table sorts before the name of the other one
    """
    own_name, other_name = self.name, other.name
    return own_name < other_name