Newer
Older
"""Classes to manage variable definitions and aliases"""
import csv
import glob
import json
import os
from concurrent.futures import ThreadPoolExecutor

import openpyxl
from bscearth.utils.log import Log

from earthdiagnostics.constants import Basins
from earthdiagnostics.frequency import Frequency
from earthdiagnostics.modelingrealm import ModelingRealms
class VariableJsonException(Exception):
    """Error raised when a problem is found while reading a variable from json"""
"""Class for translating variable aliases into standard names and providing the correct description for them"""
def __init__(self):
    """Locate the table and alias folders next to this module and reset the state"""
    module_folder = os.path.dirname(os.path.realpath(__file__))
    self._cmor_tables_folder = os.path.join(module_folder, 'cmor_tables')
    self._aliases_folder = os.path.join(module_folder, 'variable_alias')
    self.clean()
def clean(self):
    """Clean all information contained in the variable manager"""
    self._dict_variables = {}
    # The alias lookup and the table registry are read and written by the
    # loaders, so they must exist even before anything has been loaded
    self._dict_aliases = {}
    self.tables = {}
def get_variable(self, original_name, silent=False):
    """
    Return the cmor variable instance given a variable name

    The lookup is case-insensitive: the name is lowercased before searching
    the aliases dictionary.

    :param original_name: original variable's name
    :type original_name: str
    :param silent: if True, omits log warning when variable is not found
    :type silent: bool
    :return: CMOR variable, or None if the name is not a known alias
    :rtype: Variable or None
    """
    try:
        # Aliases dictionary values are (alias, variable) pairs
        return self._dict_aliases[original_name.lower()][1]
    except KeyError:
        if not silent:
            Log.warning('Variable {0} is not defined in the CMOR table. Please add it'.format(original_name))
        return None
def get_all_variables(self):
    """
    Return all the registered variables, sorted by short name

    NOTE(review): the ``def`` line of this method was missing from the file;
    the name ``get_all_variables`` is reconstructed - confirm against callers.

    :return: CMOR variable list
    :rtype: list[Variable]
    """
    # set() removes duplicates in case one variable is stored under several keys
    all_vars = set(self._dict_variables.values())
    return sorted(all_vars, key=lambda var: var.short_name)
def get_variable_and_alias(self, original_name, silent=False):
    """
    Return the alias and the cmor variable instance given a variable name

    The lookup is case-insensitive: the name is lowercased before searching
    the aliases dictionary.

    :param original_name: original variable's name
    :type original_name: str
    :param silent: if True, omits log warning when variable is not found
    :type silent: bool
    :return: alias and CMOR variable, or (None, None) if the name is unknown
    :rtype: tuple
    """
    try:
        return self._dict_aliases[original_name.lower()]
    except KeyError:
        if not silent:
            Log.warning('Variable {0} is not defined in the CMOR table. Please add it'.format(original_name))
        # Keep the two-element shape so callers can always unpack the result
        return None, None
def load_variables(self, table_name):
    """
    Load the CMOR csv and creates the variables dictionary

    The variables of the requested table are loaded first; afterwards the
    default definitions are loaded.

    Parameters
    ----------
    table_name: str

    """
    self._load_variable_list(table_name)
    self._load_missing_defaults()
def _load_variable_list(self, table_name):
    """
    Load the variable definitions of a data convention

    The sources are tried in order: an xlsx workbook, a folder of json
    tables and finally a csv table. The first one found is loaded.

    Parameters
    ----------
    table_name: str

    Raises
    ------
    Exception
        If no definition is found for the given convention
    """
    xlsx_path = self._get_xlsx_path(table_name)
    if xlsx_path:
        self._load_xlsx(xlsx_path)
        return

    json_folder = self._get_json_folder(table_name)
    if os.path.isdir(json_folder):
        self._load_json(json_folder)
        return

    csv_path = self._get_csv_path(table_name)
    if os.path.isfile(csv_path):
        # _load_file resolves the csv path from the table name by itself
        self._load_file(table_name)
        return

    raise Exception('Data convention {0} unknown'.format(table_name))
def _get_csv_path(self, table_name):
    """Build the path of the csv file for a table inside the CMOR tables folder"""
    file_name = '{0}.csv'.format(table_name)
    return os.path.join(self._cmor_tables_folder, file_name)
def _get_json_folder(self, table_name):
    """Build the path of the 'Tables' folder of a convention inside the CMOR tables folder"""
    relative_folder = '{0}/Tables'.format(table_name)
    return os.path.join(self._cmor_tables_folder, relative_folder)
def _load_file(self, csv_table_path, default=False):
    """
    Load variable definitions from a csv table

    Rows without a short name, or whose short name is already registered,
    are skipped so that this file never overrides an existing definition.

    Parameters
    ----------
    csv_table_path: str
        Table name; the file path is resolved with _get_csv_path
    default: bool, optional
        Value stored in the ``default`` attribute of each registered variable
    """
    with open(self._get_csv_path(csv_table_path), 'r') as csvfile:
        reader = csv.reader(csvfile, dialect='excel')
        for line in reader:
            # csv.reader yields an empty list for blank lines; skip those
            # and the header row
            if not line or line[0] == 'Variable':
                continue

            var = Variable()
            var.parse_csv(line)
            if not var.short_name or var.short_name.lower() in self._dict_variables:
                continue
            var.default = default
            self.register_variable(var)
def register_variable(self, var):
    """
    Store a variable in the variables dictionary under its lowercased short name

    Parameters
    ----------
    var: Variable

    """
    key = var.short_name.lower()
    self._dict_variables[key] = var
def _load_json(self, json_folder):
    """
    Load every json table found in a folder, one task per file

    The grids and formula terms files are skipped.

    Parameters
    ----------
    json_folder: str
    """
    skipped_files = ('CMIP6_grids.json', 'CMIP6_formula_terms.json')
    # Leaving the with block waits for all the submitted tasks to finish
    with ThreadPoolExecutor() as executor:
        for file_name in os.listdir(json_folder):
            if file_name in skipped_files:
                continue
            executor.submit(self._load_json_file, os.path.join(json_folder, file_name))
def _load_json_file(self, json_path):
    """
    Load a CMOR table and its variables from a json file

    Files that are not valid json, that have no 'variable_entry' section or
    whose 'variable_entry' section is empty are ignored.

    Parameters
    ----------
    json_path: str
    """
    with open(json_path) as json_file:
        json_data = json_file.read()
    try:
        data = json.loads(json_data)
    except ValueError:
        # Not a json file: nothing to load from it
        return
    if 'variable_entry' not in data:
        return
    entries = data['variable_entry']
    if not entries:
        # The table frequency is read from its first variable, so a table
        # without variables can not be built
        return

    Log.debug('Parsing file {0}'.format(json_path))
    # The first six characters of the id are dropped
    # NOTE(review): presumably a fixed 'Table ' prefix - confirm
    table_id = data['Header']['table_id'][6:]
    # dict views can not be indexed in Python 3, so take the first value
    # through an iterator
    first_entry = next(iter(entries.values()))
    table = CMORTable(table_id,
                      Frequency(first_entry['frequency']),
                      data['Header']['table_date'])
    self.tables[table_id] = table
    self._load_json_variables(entries, table)
def _load_json_variables(self, json_data, table):
    """
    Register the variables defined in a json table

    Variables that are already registered just get the table appended; new
    ones are parsed, linked to the table and registered. Entries that can
    not be parsed are reported and skipped.

    Parameters
    ----------
    json_data: dict
        'variable_entry' section of the table, keyed by variable name
    table: CMORTable
    """
    for entry_name in json_data:
        short_name = str(entry_name).strip()
        key = short_name.lower()
        if key in self._dict_variables:
            self._dict_variables[key].tables.append(table)
            continue

        variable = Variable()
        try:
            # Index with the original key: the stripped name may not be a
            # key of json_data
            variable.parse_json(json_data[entry_name], short_name)
        except VariableJsonException:
            Log.error('Could not read variable {0}'.format(short_name))
            continue
        variable.add_table(table)
        self.register_variable(variable)
def _load_known_aliases(self, table_name):
    """Load the 'default' aliases file and then the one named after the table"""
    for alias_file in ('default', table_name):
        self._load_alias_csv(alias_file)
def _load_alias_csv(self, filename):
    """
    Load the aliases defined in a csv file and attach them to their variables

    Each row must map to exactly one registered variable. When a row maps to
    several variables and exactly one of them is not a default, the default
    ones are removed from the variables dictionary and that one is used.
    Rows that map to no variable, or that remain ambiguous, are reported and
    skipped.

    Parameters
    ----------
    filename: str
        Name of the aliases file, without extension
    """
    file_path = self._get_aliases_csv_path(filename)
    if not os.path.isfile(file_path):
        # NOTE(review): assumes a convention may have no aliases file of
        # its own - confirm this should not be an error
        return

    with open(file_path, 'r') as csvfile:
        reader = csv.reader(csvfile, dialect='excel')
        for line in reader:
            # Skip blank lines and the header row
            if not line or line[0] == 'Aliases':
                continue

            aliases = self._get_aliases(line)
            cmor_vars = [self._dict_variables[alias.lower()]
                         for alias in aliases
                         if alias.lower() in self._dict_variables]

            if len(cmor_vars) == 0:
                Log.error('Aliases {0} could not be mapped to any variable'.format(aliases))
                continue
            elif len(cmor_vars) > 1:
                non_default = [var for var in cmor_vars if not var.default]
                if len(non_default) != 1:
                    Log.error('Aliases {0} can be be mapped to multiple variables '
                              '[{1}]'.format(aliases, ', '.join(map(str, cmor_vars))))
                    # Do not guess: an ambiguous row is skipped like an
                    # unmapped one
                    continue
                for default in [var for var in cmor_vars if var not in non_default]:
                    del self._dict_variables[default.short_name.lower()]
                cmor_vars = non_default

            self._register_aliases(aliases, cmor_vars[0], line)
@staticmethod
def _get_aliases(line):
    """Return the ':'-separated aliases of the first column plus the second column if it is not among them"""
    names = line[0].split(':')
    main_name = line[1]
    if main_name not in names:
        names.append(main_name)
    return names
def _register_aliases(self, aliases, cmor_var, line):
    """
    Append an alias object to the variable for every alias of a csv row

    Aliases that are themselves the key of a different variable are reported
    and skipped. The third and fourth columns of the row, when not empty,
    set the basin and the grid of each alias.
    """
    for alias in aliases:
        if alias != cmor_var.short_name and alias in self._dict_variables:
            Log.error('Alias {0} for variable {1} is already a different '
                      'variable!'.format(alias, cmor_var.short_name))
            continue

        new_alias = VariableAlias(alias)
        basin_text = line[2]
        if basin_text:
            new_alias.basin = Basins().parse(basin_text)
        grid_text = line[3]
        if grid_text:
            new_alias.grid = grid_text
        cmor_var.known_aliases.append(new_alias)
def _get_aliases_csv_path(self, filename):
    """Build the path of an aliases csv file inside the aliases folder"""
    csv_table_path = os.path.join(self._aliases_folder, '{0}.csv'.format(filename))
    return csv_table_path
def create_aliases_dict(self):
    """
    Create aliases dictionary for the registered variables

    Every variable gets an alias made from its own key in the variables
    dictionary, added to its known aliases if not already there. The
    resulting dictionary maps each alias name to an (alias, variable) pair.

    NOTE(review): the ``def`` line of this method was missing from the file;
    the name ``create_aliases_dict`` is reconstructed - confirm against callers.
    """
    self._dict_aliases = {}
    for cmor_var_name in self._dict_variables:
        cmor_var = self._dict_variables[cmor_var_name]
        base_alias = VariableAlias(cmor_var_name)
        if base_alias not in cmor_var.known_aliases:
            cmor_var.known_aliases.append(base_alias)
        for alias in cmor_var.known_aliases:
            self._dict_aliases[alias.alias] = (alias, cmor_var)
def _get_xlsx_path(self, table_name):
    """
    Find the xlsx workbook that defines a table

    A '<table_name>.xlsx' file in the CMOR tables folder is preferred. If it
    does not exist, '<table_name>/etc' is searched and its workbook is used
    only when there is exactly one.

    Returns
    -------
    str or None
    """
    direct_path = os.path.join(self._cmor_tables_folder, '{0}.xlsx'.format(table_name))
    if os.path.isfile(direct_path):
        return direct_path

    pattern = os.path.join(self._cmor_tables_folder, table_name, 'etc', '*.xlsx')
    candidates = glob.glob(pattern)
    if len(candidates) != 1:
        return None
    return candidates[0]
def _load_xlsx(self, xlsx_table_path):
    """
    Load the tables and variables defined in an xlsx workbook

    The first worksheet is read as an index: each row whose second column
    names a sheet of the workbook provides the frequency of that table in
    its third column. Every sheet listed there is then loaded as a table.

    Parameters
    ----------
    xlsx_table_path: str
    """
    excel = openpyxl.load_workbook(xlsx_table_path, read_only=True)

    table_data = {}
    data_sheet = excel.worksheets[0]
    for row in data_sheet.rows:
        if row[1].value in excel.sheetnames:
            table_data[row[1].value] = (Frequency(row[2].value), 'Date missing')

    for sheet_name in excel.sheetnames:
        # Sheets without an index entry have no frequency, so they can not
        # be loaded as tables
        if sheet_name not in table_data:
            continue
        self._load_xlsx_table(excel[sheet_name], table_data)
def _load_xlsx_table(self, sheet, table_data):
    """
    Create the table for a worksheet and parse its variable rows

    Header rows (first cell equal to 'Priority') and rows with an empty
    sixth cell are skipped. Any error is logged together with its traceback
    and does not propagate to the caller.
    """
    try:
        frequency, date = table_data[sheet.title]
        cmor_table = CMORTable(sheet.title, frequency, date)
        self.tables[sheet.title] = cmor_table
        for row in sheet.rows:
            is_header = row[0].value == 'Priority'
            if is_header or not row[5].value:
                continue
            self._parse_xlsx_var_row(row, cmor_table)
    except Exception as ex:
        Log.error('Table {0} can not be loaded: {1}', sheet.title, ex)
        import traceback
        traceback.print_exc()
def _parse_xlsx_var_row(self, row, table):
    """
    Link the variable described by a worksheet row to its table

    The variable name is taken from the twelfth cell, falling back to the
    sixth one. A variable that is already registered is reused; otherwise a
    new one is created from the row and registered. The priority read from
    the first cell is increased by 3 when the thirty-first cell is 'false'.

    Parameters
    ----------
    row: sequence of cells
        Each cell exposes its content as ``value``
    table: CMORTable
    """
    cmor_name = row[11].value
    if not cmor_name:
        cmor_name = row[5].value

    priority = int(row[0].value)
    bsc_commitment = row[30].value
    if bsc_commitment is not None and bsc_commitment.strip().lower() == 'false':
        priority = priority + 3

    key = cmor_name.lower()
    if key in self._dict_variables:
        var = self._dict_variables[key]
    else:
        var = Variable()
        var.short_name = cmor_name
        var.standard_name = row[6].value
        var.long_name = row[1].value
        var.domain = self._process_modelling_realm(var, row[12].value)
        var.units = row[2].value
        # Store the new variable so later rows and lookups can find it
        self._dict_variables[key] = var
    var.add_table(table, priority)
@staticmethod
def _process_modelling_realm(var, value):
    """
    Get the modelling realm of a variable from a space-separated text

    Parameters
    ----------
    var: Variable
        Variable whose get_modelling_realm method resolves the realm
    value: str or None
        Realm names separated by spaces; None is treated as an empty text

    Returns
    -------
    Whatever var.get_modelling_realm returns for the split names
    """
    if value is None:
        value = ''
    modelling_realm = value.split(' ')
    # The result is assigned to the variable domain by the caller, so it
    # has to be returned
    return var.get_modelling_realm(modelling_realm)
def _load_missing_defaults(self):
    """Load the 'default' table, flagging the variables it registers as defaults"""
    self._load_file('default', default=True)
class Variable(object):
    """
    Class to characterize a CMOR variable.

    It also contains the static method to make the match between the original
    name and the standard name. Requires data_convention to be available in cmor_tables to work.
    """

    def __str__(self):
        return '{0} ({1})'.format(self.standard_name, self.short_name)

    def __repr__(self):
        return '{0} ({1})'.format(self.standard_name, self.short_name)

    def __init__(self):
        self.short_name = None
        self.standard_name = None
        self.long_name = None
        self.domain = None
        self.basin = None
        self.units = None
        self.valid_min = None
        self.valid_max = None
        self.grid = None
        self.priority = None
        self.default = False
        # Per-instance containers: aliases attached to the variable and the
        # tables it belongs to. Tables are stored bare (not paired with the
        # priority) because they are also appended directly to this list.
        self.known_aliases = []
        self.tables = []
        # Priority given for each table when it was added through add_table
        self.table_priorities = {}

    def add_table(self, table, priority=None):
        """
        Add table to variable

        Parameters
        ----------
        table: CMORTable
        priority: int or None, optional

        """
        self.tables.append(table)
        self.table_priorities[table] = priority

    def parse_json(self, json_var, variable):
        """
        Parse variable json

        Parameters
        ----------
        json_var: dict of str: str
        variable: str
            Name of the entry, only used in the error message

        Raises
        ------
        VariableJsonException
            If the entry has no 'out_name'

        """
        if 'out_name' not in json_var:
            raise VariableJsonException('Variable {0} has no out name defined'.format(variable))
        self.short_name = json_var['out_name'].strip()

        self.standard_name = json_var['standard_name'].strip()
        self.long_name = json_var['long_name'].strip()

        domain = json_var['modeling_realm'].split(' ')
        self.domain = self.get_modelling_realm(domain)

        self.valid_min = json_var['valid_min'].strip()
        self.valid_max = json_var['valid_max'].strip()
        self.units = json_var['units'].strip()

        if 'priority' in json_var:
            self.priority = int(json_var['priority'].strip())
        elif 'primavera_priority' in json_var:
            self.priority = int(json_var['primavera_priority'].strip())
        else:
            self.priority = 1

    def get_modelling_realm(self, domains):
        """
        Get var modelling realm

        When several realms are given the most specific one is used if it
        can be determined; otherwise the first one is used.

        Parameters
        ----------
        domains: iterable of str

        Returns
        -------
        ModelingRealm or None

        """
        if len(domains) > 1:
            Log.warning('Multiple modeling realms assigned to variable {0}: {1}. ', self, domains)
            parsed = [ModelingRealms.parse(domain) for domain in domains]

            selected = self._select_most_specific(parsed)
            if selected:
                Log.warning('We will use {0} as it is the most specific', selected)
                return selected

            Log.warning('We will use {0} as it is the first on the list and there is no one that is more specific',
                        parsed[0])
            return parsed[0]
        elif len(domains) == 0:
            Log.warning('Variable {0} has no modeling realm defined'.format(self.short_name))
            return None
        else:
            return ModelingRealms.parse(domains[0])

    def parse_csv(self, var_line):
        """
        Fill the object information from a csv line

        Parameters
        ----------
        var_line: list of str

        """
        self.short_name = var_line[1].strip()
        self.standard_name = var_line[2].strip()
        self.long_name = var_line[3].strip()
        self.domain = ModelingRealms.parse(var_line[4].strip())
        self.basin = Basins().parse(var_line[5])
        self.units = var_line[6].strip()
        self.valid_min = var_line[7].strip()
        self.valid_max = var_line[8].strip()
        self.grid = var_line[9].strip()

    def get_table(self, frequency, data_convention):
        """
        Get a table object given the frequency and data_convention

        If the variable does not contain the table information, it uses the domain to make a guess

        Parameters
        ----------
        frequency: Frequency
        data_convention: str

        Returns
        -------
        CMORTable

        Raises
        ------
        ValueError
            If a table can not be deduced from the given parameters

        """
        for table in self.tables:
            if table.frequency == frequency:
                return table
        if self.domain:
            table_name = self.domain.get_table_name(frequency, data_convention)
            return CMORTable(table_name, frequency, 'December 2013')
        raise ValueError('Can not get table for {0} and frequency {1}'.format(self, frequency))

    @staticmethod
    def _select_most_specific(parsed):
        """Return the more specific realm of a known pair of realms, or None for any other combination"""
        parsed = set(parsed)
        if {ModelingRealms.land, ModelingRealms.landIce} == parsed:
            return ModelingRealms.landIce

        if {ModelingRealms.seaIce, ModelingRealms.ocean} == parsed:
            return ModelingRealms.seaIce

        if {ModelingRealms.atmos, ModelingRealms.atmosChem} == parsed:
            return ModelingRealms.atmosChem

        if {ModelingRealms.ocean, ModelingRealms.ocnBgchem} == parsed:
            return ModelingRealms.ocnBgchem

        return None
class VariableAlias(object):
    """
    Class to characterize an alias of a CMOR variable

    Besides its name, an alias can carry a basin and a grid. Two aliases are
    equal when name, grid and basin are all equal.

    Parameters
    ----------
    alias: str
    basin: optional
    grid: optional

    """

    def __init__(self, alias, basin=None, grid=None):
        self.alias = alias
        self.basin = basin
        self.grid = grid

    def __str__(self):
        string = self.alias
        if self.basin:
            string += ' Basin: {0}'.format(self.basin)
        if self.grid:
            string += ' Grid: {0}'.format(self.grid)
        return string

    def __eq__(self, other):
        # Anything that is not an alias (None included) is never equal;
        # NotImplemented lets Python fall back to its default comparison
        if not isinstance(other, VariableAlias):
            return NotImplemented
        return self.alias == other.alias and self.grid == other.grid and self.basin == other.basin

    def __ne__(self, other):
        return not self == other
class CMORTable(object):
    """
    Class to represent a CMOR table

    Tables are ordered by name.

    Parameters
    ----------
    name: str
    frequency: Frequency
    date: str

    """

    def __init__(self, name, frequency, date):
        self.name = name
        self.frequency = frequency
        self.date = date

    def __str__(self):
        return self.name

    def __repr__(self):
        return '{0.name} ({0.frequency}, {0.date})'.format(self)

    def __lt__(self, other):
        return self.name < other.name
class VariableType(object):
    """Enumeration of variable types"""

    MEAN = 1
    STATISTIC = 2

    @staticmethod
    def to_str(vartype):
        """Return the folder-convention name of a variable type, raising ValueError for any other value"""
        if vartype == VariableType.MEAN:
            return 'mean'
        if vartype == VariableType.STATISTIC:
            return 'statistics'
        raise ValueError('Variable type {0} not supported'.format(vartype))