"""Classes to manage variable definitions and aliases"""
import csv
import glob
import json
import os

from bscearth.utils.log import Log

from earthdiagnostics.constants import Basins
from earthdiagnostics.frequency import Frequency
from earthdiagnostics.modelingrealm import ModelingRealms
class VariableJsonException(Exception):
    """Error raised when a problem is found while reading a variable from json"""

    pass
"""
Class for translating variable aliases into standard names and providing the correct description for them
"""
def __init__(self):
    """
    Locate the data folders that sit next to this module and start with empty lookup tables

    The CMOR tables are looked for in the ``cmor_tables`` folder and the alias definitions in the
    ``variable_alias`` folder, both relative to the directory that contains this file.
    """
    module_folder = os.path.dirname(os.path.realpath(__file__))
    self._cmor_tables_folder = os.path.join(module_folder, 'cmor_tables')
    self._aliases_folder = os.path.join(module_folder, 'variable_alias')
    # Variables indexed by their lower-cased short name
    self._dict_variables = {}
    # (alias, variable) pairs indexed by alias name. Created here so that lookups made before any table
    # is loaded report "not defined" instead of failing with AttributeError
    self._dict_aliases = {}
    # CMOR tables indexed by table id. Created here because the json loader stores into it
    self.tables = {}
def get_variable(self, original_name, silent=False):
    """
    Return the cmor variable instance given a variable name

    The lookup is done on the lower-cased name.

    :param original_name: original variable's name
    :type original_name: str
    :param silent: if True, omits log warning when variable is not found
    :type silent: bool
    :return: CMOR variable, or None if the name is not known
    :rtype: Variable | None
    """
    try:
        # Entries are (alias, variable) pairs: only the variable is of interest here
        return self._dict_aliases[original_name.lower()][1]
    except KeyError:
        if not silent:
            Log.warning('Variable {0} is not defined in the CMOR table. Please add it'.format(original_name))
        return None
# NOTE(review): the ``def`` line of this method is missing from the reviewed text. The name
# ``get_all_variables`` and the argument-less signature are a reconstruction - confirm against the callers.
def get_all_variables(self):
    """
    Return all the variables currently loaded, without duplicates

    :return: CMOR variable list, sorted by short name
    :rtype: list[Variable]
    """
    # The same variable object may be stored more than once, so duplicates are removed before sorting
    all_vars = set(self._dict_variables.values())
    return sorted(all_vars, key=lambda var: var.short_name)
def get_variable_and_alias(self, original_name, silent=False):
    """
    Return the alias and the cmor variable instance given a variable name

    The lookup is done on the lower-cased name.

    :param original_name: original variable's name
    :type original_name: str
    :param silent: if True, omits log warning when variable is not found
    :type silent: bool
    :return: the pair stored for the name (alias first, CMOR variable second), or (None, None) if the name
             is not known
    :rtype: tuple
    """
    try:
        return self._dict_aliases[original_name.lower()]
    except KeyError:
        if not silent:
            Log.warning('Variable {0} is not defined in the CMOR table. Please add it'.format(original_name))
        # Keep the shape of the successful result so callers can always unpack two values
        return None, None
def load_variables(self, table_name):
    """
    Load the variable definitions for a data convention and build the alias lookup

    The steps are run in order: the variable list of the requested convention, the default variables that
    the convention did not define, the known aliases and, finally, the dictionary that maps every alias
    to its variable. Without the last two steps no variable can be found by name.

    Parameters
    ----------
    table_name: str

    """
    self._load_variable_list(table_name)
    self._load_missing_defaults()
    self._load_known_aliases(table_name)
    self._construct_aliases_dict()
def _load_variable_list(self, table_name):
    """
    Load the variable list of a data convention from the first source that is available

    The sources are tried in this order: an xlsx workbook, a folder of json tables and a csv table.

    Parameters
    ----------
    table_name: str

    Raises
    ------
    Exception
        If none of the three sources exists for the requested convention

    """
    xlsx_path = self._get_xlsx_path(table_name)
    if xlsx_path:
        self._load_xlsx(xlsx_path)
        return

    json_folder = self._get_json_folder(table_name)
    if os.path.isdir(json_folder):
        self._load_json(json_folder)
        return

    csv_path = self._get_csv_path(table_name)
    if os.path.isfile(csv_path):
        # _load_file resolves the csv path by itself, so it receives the table name and not the path
        self._load_file(table_name)
        return

    raise Exception('Data convention {0} unknown'.format(table_name))
def _get_csv_path(self, table_name):
    """
    Build the path of the csv table for a data convention inside the CMOR tables folder

    Parameters
    ----------
    table_name: str

    Returns
    -------
    str

    """
    file_name = '{0}.csv'.format(table_name)
    return os.path.join(self._cmor_tables_folder, file_name)
def _get_json_folder(self, table_name):
    """
    Build the path of the folder that holds the json tables for a data convention

    Parameters
    ----------
    table_name: str

    Returns
    -------
    str

    """
    relative_folder = '{0}/Tables'.format(table_name)
    return os.path.join(self._cmor_tables_folder, relative_folder)
def _load_file(self, csv_table_path, default=False):
    """
    Add to the variables dictionary the variables defined in a csv table

    Variables without a short name, or whose short name is already in the dictionary, are skipped: the
    definitions loaded first always take precedence.

    Parameters
    ----------
    csv_table_path: str
        Despite its name, this is the table name: the actual path is resolved with _get_csv_path
    default: bool, optional
        Value stored in the ``default`` attribute of every variable added by this call

    """
    with open(self._get_csv_path(csv_table_path), 'r') as csvfile:
        reader = csv.reader(csvfile, dialect='excel')
        for line in reader:
            # Blank rows are read as empty lists and the header row starts with 'Variable'
            if not line or line[0] == 'Variable':
                continue

            var = Variable()
            var.parse_csv(line)
            if not var.short_name or var.short_name.lower() in self._dict_variables:
                continue
            var.default = default
            self._dict_variables[var.short_name.lower()] = var
def _load_json(self, json_folder):
    """
    Load every CMOR table found in a folder of json files

    Files that can not be parsed as json, or that do not contain a 'variable_entry' section, are ignored.
    Each table that is read is stored in ``self.tables`` under its table id and its variables are added
    to the variables dictionary.

    Parameters
    ----------
    json_folder: str

    """
    for file_name in os.listdir(json_folder):
        if file_name in ('CMIP6_grids.json', 'CMIP6_formula_terms.json'):
            continue

        # Read through a context manager so the file is closed even if reading fails
        with open(os.path.join(json_folder, file_name)) as json_file:
            json_data = json_file.read()
        try:
            data = json.loads(json_data)
        except ValueError:
            # Not a json document: skipped on purpose, the folder may hold other kinds of files
            continue

        if 'variable_entry' not in data:
            continue

        Log.debug('Parsing file {0}'.format(file_name))
        header = data['Header']
        # The first six characters of the table id are dropped
        # NOTE(review): presumably a 'Table ' prefix - confirm against the json tables
        table_id = header['table_id'][6:]
        table = CMORTable(table_id, Frequency(header['frequency']), header['table_date'])
        self.tables[table_id] = table
        self._load_json_variables(data['variable_entry'], table)
def _load_json_variables(self, json_data, table):
    """
    Add the variables of a json table to the variables dictionary

    A variable that is already known only gets the table added to its list. A new variable is parsed,
    linked to the table and stored under its lower-cased short name. Entries that can not be parsed are
    reported and skipped.

    Parameters
    ----------
    json_data: dict
        Content of the 'variable_entry' section of the table, indexed by variable name
    table: CMORTable

    """
    for entry_name, definition in json_data.items():
        short_name = str(entry_name).strip()

        known = self._dict_variables.get(short_name.lower())
        if known is not None:
            # add_table keeps the (table, priority) layout used by the rest of the tables list
            known.add_table(table)
            continue

        variable = Variable()
        try:
            variable.parse_json(definition, short_name)
        except VariableJsonException:
            Log.error('Could not read variable {0}'.format(short_name))
            continue
        # The table where the variable is first found has to be recorded too
        variable.add_table(table)
        self._dict_variables[variable.short_name.lower()] = variable
def _load_known_aliases(self, table_name):
    """
    Load the alias definitions: first the default ones and then those of the given data convention

    Parameters
    ----------
    table_name: str

    """
    for alias_file in ('default', table_name):
        self._load_alias_csv(alias_file)
def _load_alias_csv(self, filename):
    """
    Load the aliases defined in a csv file and attach them to the variables they refer to

    Each row is read as: the aliases separated by ':', the name of the variable, an optional basin and an
    optional grid. Nothing is done if the file does not exist.

    Parameters
    ----------
    filename: str
        Name of the alias file, without folder or extension

    """
    file_path = self._get_aliases_csv_path(filename)
    if not os.path.isfile(file_path):
        return

    with open(file_path, 'r') as csvfile:
        reader = csv.reader(csvfile, dialect='excel')
        for line in reader:
            # Blank rows are read as empty lists and the header row starts with 'Aliases'
            if not line or line[0] == 'Aliases':
                continue

            aliases = line[0].split(':')
            if line[1] not in aliases:
                aliases.append(line[1])

            # Variables that any of the names of this row refers to
            cmor_vars = []
            for alias in aliases:
                if alias.lower() in self._dict_variables:
                    cmor_vars.append(self._dict_variables[alias.lower()])

            if len(cmor_vars) == 0:
                Log.error('Aliases {0} could not be mapped to any variable'.format(aliases))
                continue
            elif len(cmor_vars) > 1:
                non_default = [var for var in cmor_vars if not var.default]
                if len(non_default) == 1:
                    # A single definition from the data convention wins over the default ones, which
                    # are removed from the variables dictionary
                    for default in [var for var in cmor_vars if var not in non_default]:
                        del self._dict_variables[default.short_name.lower()]
                    cmor_vars = non_default
                else:
                    # NOTE(review): the ambiguity is reported but the first candidate is still used
                    # below - confirm that this is intended
                    Log.error('Aliases {0} can be be mapped to multiple variables '
                              '[{1}]'.format(aliases, ', '.join(map(str, cmor_vars))))
            cmor_var = cmor_vars[0]

            for alias in aliases:
                if alias != cmor_var.short_name and alias in self._dict_variables:
                    Log.error('Alias {0} for variable {1} is already a different '
                              'variable!'.format(alias, cmor_var.short_name))
                    continue
                alias_object = VariableAlias(alias)
                if line[2]:
                    alias_object.basin = Basins().parse(line[2])
                if line[3]:
                    alias_object.grid = line[3]
                cmor_var.known_aliases.append(alias_object)
def _get_aliases_csv_path(self, filename):
    """
    Build the path of an alias csv file inside the aliases folder

    Parameters
    ----------
    filename: str
        Name of the alias file, without folder or extension

    Returns
    -------
    str

    """
    file_name = '{0}.csv'.format(filename)
    return os.path.join(self._aliases_folder, file_name)
def _construct_aliases_dict(self):
    """
    Rebuild the dictionary that maps every alias name to its (alias, variable) pair

    Every variable is also reachable through the key it has in the variables dictionary, using an alias
    that has neither basin nor grid. That alias is created only if the variable does not have one yet, so
    calling this method again does not make the aliases lists grow.
    """
    self._dict_aliases = {}
    for cmor_var_name, cmor_var in self._dict_variables.items():
        # Names have to be compared with the alias attribute: the list holds alias objects, not strings
        plain_alias = None
        for alias in cmor_var.known_aliases:
            if alias.alias == cmor_var_name and alias.basin is None and alias.grid is None:
                plain_alias = alias
                break
        if plain_alias is None:
            plain_alias = VariableAlias(cmor_var_name)
            cmor_var.known_aliases.append(plain_alias)

        for alias in cmor_var.known_aliases:
            self._dict_aliases[alias.alias] = (alias, cmor_var)
        # The alias without basin or grid always wins for the variable's own name
        self._dict_aliases[cmor_var_name] = (plain_alias, cmor_var)
def _get_xlsx_path(self, table_name):
    """
    Find the xlsx workbook that defines a data convention

    A workbook named after the convention in the CMOR tables folder is preferred. Otherwise, the 'etc'
    folder of the convention is searched and its workbook is used only if there is exactly one.

    Parameters
    ----------
    table_name: str

    Returns
    -------
    str or None
        Path to the workbook, None if it can not be determined

    """
    direct_path = os.path.join(self._cmor_tables_folder, '{0}.xlsx'.format(table_name))
    if os.path.isfile(direct_path):
        return direct_path

    pattern = os.path.join(self._cmor_tables_folder, table_name, 'etc', '*.xlsx')
    candidates = glob.glob(pattern)
    if len(candidates) != 1:
        return None
    return candidates[0]
def _load_xlsx(self, xlsx_table_path):
    """
    Load the tables and variables defined in an xlsx workbook

    The first worksheet is used as an index: for every row whose second cell is the name of a worksheet,
    the third cell is taken as the frequency of that table. Every worksheet is then read as a table, with
    one variable per row. A worksheet that can not be read is reported and skipped, so that the remaining
    tables are still loaded.

    NOTE(review): the meaning of each column is only known from the indices used below - confirm against
    the layout of the workbook.

    Parameters
    ----------
    xlsx_table_path: str

    """
    import traceback

    excel = openpyxl.load_workbook(xlsx_table_path, read_only=True)

    table_data = {}
    data_sheet = excel.worksheets[0]
    for row in data_sheet.rows:
        if row[1].value in excel.sheetnames:
            # The workbook does not provide a date for the tables
            table_data[row[1].value] = (Frequency(row[2].value), 'Date missing')

    for sheet_name in excel.sheetnames:
        try:
            sheet = excel.get_sheet_by_name(sheet_name)
            table_frequency, table_date = table_data[sheet.title]
            table = CMORTable(sheet.title, table_frequency, table_date)
            for row in sheet.rows:
                # Skip the header row and the rows that do not define a variable
                if row[0].value == 'Priority' or not row[5].value:
                    continue

                cmor_name = row[11].value
                if not cmor_name:
                    cmor_name = row[5].value

                priority = int(row[0].value)
                bsc_commitment = row[30].value
                # The cell may hold a boolean instead of text, so it is converted before being cleaned
                if bsc_commitment is not None and str(bsc_commitment).strip().lower() == 'false':
                    priority = priority + 3

                if cmor_name.lower() in self._dict_variables:
                    var = self._dict_variables[cmor_name.lower()]
                else:
                    var = Variable()
                    var.short_name = cmor_name
                    var.standard_name = row[6].value
                    var.long_name = row[1].value
                    var.domain = self._process_modelling_realm(var, row[12].value)
                    var.units = row[2].value
                    self._dict_variables[var.short_name.lower()] = var
                var.add_table(table, priority)
        except Exception as ex:
            # Best effort on purpose: a broken worksheet must not prevent loading the other ones
            Log.error('Table {0} can not be loaded: {1}', sheet_name, ex)
            traceback.print_exc()
@staticmethod
def _process_modelling_realm(var, value):
if value is None:
value = ''
modelling_realm = value.split(' ')
def _load_missing_defaults(self):
    """Load the 'default' table, flagging as default every variable that it adds"""
    default_table = 'default'
    self._load_file(default_table, True)
class Variable(object):
    """
    Class to characterize a CMOR variable

    It stores the descriptive attributes of the variable, the CMOR tables where it is defined and the
    aliases that it is known by.
    """

    def __str__(self):
        return '{0} ({1})'.format(self.standard_name, self.short_name)

    def __repr__(self):
        return '{0} ({1})'.format(self.standard_name, self.short_name)

    def __init__(self):
        self.short_name = None
        self.standard_name = None
        self.long_name = None
        self.domain = None
        self.basin = None
        self.units = None
        self.valid_min = None
        self.valid_max = None
        self.grid = None
        self.priority = None
        self.default = False
        # Alias objects attached while the alias files are loaded
        self.known_aliases = []
        # (table, priority) pairs, see add_table
        self.tables = []

    def add_table(self, table, priority=None):
        """
        Link the variable to a CMOR table

        Parameters
        ----------
        table: CMORTable
        priority: int or None, optional

        """
        self.tables.append((table, priority))

    def parse_json(self, json_var, key):
        """
        Fill the object information from the json definition of the variable

        Parameters
        ----------
        json_var: dict
            Definition of the variable, as found in the json table
        key: str
            Name of the entry, only used to report errors

        Raises
        ------
        VariableJsonException
            If the definition does not contain the 'out_name' field

        """
        if 'out_name' in json_var:
            self.short_name = json_var['out_name'].strip()
        else:
            raise VariableJsonException('Variable {0} has no out name defined'.format(key))
        self.standard_name = json_var['standard_name'].strip()
        self.long_name = json_var['long_name'].strip()

        domain = json_var['modeling_realm'].split(' ')
        self.domain = self.get_modelling_realm(domain)

        self.valid_min = json_var['valid_min'].strip()
        self.valid_max = json_var['valid_max'].strip()
        self.units = json_var['units'].strip()
        if 'priority' in json_var:
            self.priority = int(json_var['priority'].strip())
        elif 'primavera_priority' in json_var:
            self.priority = int(json_var['primavera_priority'].strip())
        else:
            self.priority = 1

    def get_modelling_realm(self, domains):
        """
        Choose the modelling realm of the variable from a list of realm names

        If several realms are given, the most specific one is used when the combination is a known one and
        the first of the list otherwise. The choice is always reported with a warning.

        Parameters
        ----------
        domains: list of str

        Returns
        -------
        The parsed modelling realm, or None if the list is empty

        """
        if len(domains) > 1:
            Log.warning('Multiple modeling realms assigned to variable {0}: {1}. ', self, domains)
            parsed = [ModelingRealms.parse(domain) for domain in domains]

            selected = self._select_most_specific(parsed)
            if selected:
                Log.warning('We will use {0} as it is the most specific', selected)
                return selected

            Log.warning('We will use {0} as it is the first on the list and there is no one that is more specific',
                        parsed[0])
            return parsed[0]
        elif len(domains) == 0:
            Log.warning('Variable {0} has no modeling realm defined'.format(self.short_name))
            return None
        else:
            return ModelingRealms.parse(domains[0])

    def parse_csv(self, var_line):
        """
        Fill the object information from a csv line

        Parameters
        ----------
        var_line: list of str
            Fields of the line. The first one is not used

        """
        self.short_name = var_line[1].strip()
        self.standard_name = var_line[2].strip()
        self.long_name = var_line[3].strip()
        self.domain = ModelingRealms.parse(var_line[4].strip())
        self.basin = Basins().parse(var_line[5])
        self.units = var_line[6].strip()
        self.valid_min = var_line[7].strip()
        self.valid_max = var_line[8].strip()
        self.grid = var_line[9].strip()

    def get_table(self, frequency, data_convention):
        """
        Get a table object given the frequency and data_convention

        If the variable does not contain the table information, it uses the domain to make a guess

        Parameters
        ----------
        frequency: Frequency
        data_convention: str

        Returns
        -------
        CMORTable

        Raises
        ------
        ValueError
            If a table can not be deduced from the given parameters

        """
        for entry in self.tables:
            # Entries are (table, priority) pairs. Bare tables are accepted too, as some loaders may
            # have appended them directly to the list
            table = entry[0] if isinstance(entry, tuple) else entry
            if table.frequency == frequency:
                return table
        if self.domain:
            table_name = self.domain.get_table_name(frequency, data_convention)
            return CMORTable(table_name, frequency, 'December 2013')
        raise ValueError('Can not get table for {0} and frequency {1}'.format(self, frequency))

    @staticmethod
    def _select_most_specific(parsed):
        """
        Choose the most specific realm of a pair of related realms

        Parameters
        ----------
        parsed: iterable of modelling realms

        Returns
        -------
        The most specific realm if the realms given are exactly one of the known pairs, None otherwise

        """
        parsed = set(parsed)
        if {ModelingRealms.land, ModelingRealms.landIce} == parsed:
            return ModelingRealms.landIce

        if {ModelingRealms.seaIce, ModelingRealms.ocean} == parsed:
            return ModelingRealms.seaIce

        if {ModelingRealms.atmos, ModelingRealms.atmosChem} == parsed:
            return ModelingRealms.atmosChem

        if {ModelingRealms.ocean, ModelingRealms.ocnBgchem} == parsed:
            return ModelingRealms.ocnBgchem

        return None
class VariableAlias(object):
    """
    Alternative name of a CMOR variable, optionally tagged with a basin and a grid

    Parameters
    ----------
    alias: str

    """

    def __init__(self, alias):
        self.alias = alias
        self.basin = None
        self.grid = None

    def __str__(self):
        pieces = [self.alias]
        if self.basin:
            pieces.append(' Basin: {0}'.format(self.basin))
        if self.grid:
            pieces.append(' Grid: {0}'.format(self.grid))
        return ''.join(pieces)
class CMORTable(object):
    """
    Class to represent a CMOR table

    Tables are ordered by name.

    Parameters
    ----------
    name: str
    frequency: Frequency
    date: str

    """

    def __init__(self, name, frequency, date):
        # name and date are read by __str__, __repr__ and __lt__, so they have to be stored
        self.name = name
        self.frequency = Frequency.parse(frequency)
        self.date = date

    def __str__(self):
        return self.name

    def __repr__(self):
        return '{0.name} ({0.frequency}, {0.date})'.format(self)

    def __lt__(self, other):
        return self.name < other.name