# coding=utf-8
import csv
import glob
import json
import openpyxl
import os

from bscearth.utils.log import Log

from earthdiagnostics.constants import Basins
from earthdiagnostics.frequency import Frequency
from earthdiagnostics.modelingrealm import ModelingRealms


class VariableJsonException(Exception):
    """Raised by Variable.parse_json when a JSON variable entry can not be used."""

    pass


class SingletonType(type):
    """
    Metaclass that creates a class' instance only once.

    The first call builds the instance and stores it on the class; later calls return the stored object and
    ignore their arguments.
    """

    def __call__(cls, *args):
        try:
            return cls.__instance
        except AttributeError:
            cls.__instance = super(SingletonType, cls).__call__(*args)
            return cls.__instance


class VariableManager(object):
    """
    Registry of the CMOR variables and of the aliases that map to them.

    Variables are read from the tables stored in the 'cmor_tables' folder and aliases from the csv files stored
    in the 'variable_alias' folder, both located next to this module.
    """

    # NOTE(review): the __metaclass__ attribute is Python 2 syntax; Python 3 ignores it, so there the class
    # would not be a singleton.
    __metaclass__ = SingletonType

    def __init__(self):
        module_folder = os.path.dirname(os.path.realpath(__file__))
        self._cmor_tables_folder = os.path.join(module_folder, 'cmor_tables')
        self._aliases_folder = os.path.join(module_folder, 'variable_alias')
        self._dict_variables = {}
        # Created here so lookups made before load_variables() report an unknown variable instead of
        # failing with AttributeError
        self._dict_aliases = {}

    def get_variable(self, original_name, silent=False):
        """
        Returns the cmor variable instance given a variable name

        :param original_name: original variable's name
        :type original_name: str
        :param silent: if True, omits log warning when variable is not found
        :type silent: bool
        :return: CMOR variable, or None if the name is not known
        :rtype: Variable
        """
        try:
            return self._dict_aliases[original_name.lower()][1]
        except KeyError:
            if not silent:
                Log.warning('Variable {0} is not defined in the CMOR table. Please add it'.format(original_name))
            return None

    def get_all_variables(self):
        """
        Returns all variables

        :return: CMOR variable list
        :rtype: set[Variable]
        """
        return set(self._dict_variables.values())

    def get_variable_and_alias(self, original_name, silent=False):
        """
        Returns the alias and the cmor variable instance given a variable name

        :param original_name: original variable's name
        :type original_name: str
        :param silent: if True, omits log warning when variable is not found
        :type silent: bool
        :return: (alias, CMOR variable) pair, or (None, None) if the name is not known
        :rtype: (VariableAlias, Variable)
        """
        try:
            return self._dict_aliases[original_name.lower()]
        except KeyError:
            if not silent:
                Log.warning('Variable {0} is not defined in the CMOR table. Please add it'.format(original_name))
            return None, None

    def load_variables(self, table_name):
        """
        Loads the CMOR tables and creates the variables and aliases dictionaries

        The convention's own table is loaded first, then the 'default' csv fills in the variables that are
        still missing, and finally the aliases are read and indexed.

        :param table_name: name of the data convention to load
        :type table_name: str
        """
        self._dict_variables = dict()
        self._load_variable_list(table_name)
        self._load_missing_defaults()
        self._load_known_aliases(table_name)
        self._construct_aliases_dict()

    def _load_variable_list(self, table_name):
        """
        Loads the variables of a convention from the first source found: xlsx, json folder or csv

        :param table_name: name of the data convention
        :type table_name: str
        :raises Exception: if none of the three sources exists for the convention
        """
        xlsx_path = self._get_xlsx_path(table_name)
        if xlsx_path:
            self._load_xlsx(xlsx_path)
            return

        json_folder = self._get_json_folder(table_name)
        if os.path.isdir(json_folder):
            self._load_json(json_folder)
            return

        csv_path = self._get_csv_path(table_name)
        if os.path.isfile(csv_path):
            self._load_file(table_name)
            return

        raise Exception('Data convention {0} unknown'.format(table_name))

    def _get_csv_path(self, table_name):
        """Returns the path of the '<table_name>.csv' file inside the cmor tables folder"""
        csv_table_path = os.path.join(self._cmor_tables_folder, '{0}.csv'.format(table_name))
        return csv_table_path

    def _get_json_folder(self, table_name):
        """Returns the path of the '<table_name>/Tables' folder inside the cmor tables folder"""
        json_folder = os.path.join(self._cmor_tables_folder, '{0}/Tables'.format(table_name))
        return json_folder

    def _load_file(self, csv_table_path, default=False):
        """
        Loads the variables defined in a csv table

        Variables without short name, or whose short name is already registered, are skipped.

        :param csv_table_path: name of the table; despite the parameter's name, the real path is built from
            it with _get_csv_path
        :type csv_table_path: str
        :param default: value stored in the 'default' attribute of every variable loaded
        :type default: bool
        """
        # Binary mode is what the Python 2 csv module asks for
        with open(self._get_csv_path(csv_table_path), 'rb') as csvfile:
            reader = csv.reader(csvfile, dialect='excel')
            for line in reader:
                # Header row
                if line[0] == 'Variable':
                    continue

                var = Variable()
                var.parse_csv(line)
                if not var.short_name or var.short_name.lower() in self._dict_variables:
                    continue
                var.default = default
                self._dict_variables[var.short_name.lower()] = var

    def _load_json(self, json_folder):
        """
        Loads the variables defined in the json tables of a folder

        Files that can not be decoded as json, or that have no 'variable_entry' section, are ignored.

        :param json_folder: folder containing the json tables
        :type json_folder: str
        """
        for file_name in os.listdir(json_folder):
            if file_name in ('CMIP6_grids.json', 'CMIP6_formula_terms.json'):
                continue

            # Context manager so the handle is released after each table instead of leaking
            with open(os.path.join(json_folder, file_name)) as json_file:
                json_data = json_file.read()
            try:
                data = json.loads(json_data)
            except ValueError:
                continue

            if 'variable_entry' in data:
                Log.debug('Parsing file {0}'.format(file_name))
                header = data['Header']
                # The first six characters of table_id are dropped (presumably a 'Table ' prefix - confirm)
                table = CMORTable(header['table_id'][6:], Frequency(header['frequency']), header['table_date'])
                self._load_json_variables(data['variable_entry'], table)

    def _load_json_variables(self, json_data, table):
        """
        Registers the variables of a json table

        A variable that is already registered only gets the table added to its list.

        :param json_data: content of the 'variable_entry' section, indexed by short name
        :type json_data: dict
        :param table: table the variables belong to
        :type table: CMORTable
        """
        for short_name in json_data.keys():
            short_name = str.strip(str(short_name))
            if short_name.lower() in self._dict_variables:
                self._dict_variables[short_name.lower()].tables.append(table)
                continue

            variable = Variable()
            try:
                variable.parse_json(json_data[short_name], short_name)
                variable.tables.append(table)
                self._dict_variables[variable.short_name.lower()] = variable
            except VariableJsonException:
                Log.error('Could not read variable {0}'.format(short_name))

    def _load_known_aliases(self, table_name):
        """Loads the aliases from the 'default' csv first and from the convention's own csv afterwards"""
        self._load_alias_csv('default')
        self._load_alias_csv(table_name)

    def _load_alias_csv(self, filename):
        """
        Loads an aliases csv and attaches every alias to the variable it refers to

        Column 0 holds the aliases separated by ':', column 1 a name that is also treated as an alias, column 2
        an optional basin and column 3 an optional grid. Nothing is done if the file does not exist.

        :param filename: name of the csv (without extension) inside the aliases folder
        :type filename: str
        """
        file_path = self._get_aliases_csv_path(filename)
        if not os.path.isfile(file_path):
            return

        with open(file_path, 'rb') as csvfile:
            reader = csv.reader(csvfile, dialect='excel')
            for line in reader:
                # Header row
                if line[0] == 'Aliases':
                    continue

                aliases = line[0].split(':')
                if line[1] not in aliases:
                    aliases.append(line[1])

                # Variables already registered under any of the names of this row
                cmor_vars = []
                for alias in aliases:
                    alias = str.strip(alias)
                    if alias.lower() in self._dict_variables:
                        cmor_vars.append(self._dict_variables[alias.lower()])

                if len(cmor_vars) == 0:
                    Log.error('Aliases {0} could not be mapped to any variable'.format(aliases))
                    continue
                elif len(cmor_vars) > 1:
                    non_default = [var for var in cmor_vars if not var.default]
                    if len(non_default) == 1:
                        # The only variable that does not come from the defaults wins; the defaults it
                        # clashes with are removed from the registry
                        for default in [var for var in cmor_vars if var not in non_default]:
                            del self._dict_variables[default.short_name.lower()]
                        cmor_vars = non_default
                    else:
                        Log.error('Aliases {0} can be be mapped to multiple variables '
                                  '[{1}]'.format(aliases, ', '.join(map(str, cmor_vars))))
                        continue

                cmor_var = cmor_vars[0]
                for alias in aliases:
                    # NOTE(review): unlike the lookup above, the alias is neither stripped nor lowercased
                    # here, while the registry keys are lowercased - confirm this is intended
                    if alias != cmor_var.short_name and alias in self._dict_variables:
                        Log.error('Alias {0} for variable {1} is already a different '
                                  'variable!'.format(alias, cmor_var.short_name))
                        continue

                    alias_object = VariableAlias(alias)
                    if line[2]:
                        alias_object.basin = Basins.parse(line[2])
                    if line[3]:
                        alias_object.grid = line[3]
                    cmor_var.known_aliases.append(alias_object)

    def _get_aliases_csv_path(self, filename):
        """Returns the path of the '<filename>.csv' file inside the aliases folder"""
        csv_table_path = os.path.join(self._aliases_folder, '{0}.csv'.format(filename))
        return csv_table_path

    def _construct_aliases_dict(self):
        """
        Builds the dictionary that maps every alias name to its (alias, variable) pair

        If several aliases share a name, the last one processed is the one kept.
        """
        self._dict_aliases = {}
        for cmor_var_name in self._dict_variables:
            cmor_var = self._dict_variables[cmor_var_name]
            # NOTE(review): known_aliases holds VariableAlias objects and that class defines no __eq__, so
            # this test against a string is always true and a plain alias is always appended. Kept as is
            # because it decides which alias ends up in the dictionary for the variable's own name.
            if cmor_var_name not in cmor_var.known_aliases:
                cmor_var.known_aliases.append(VariableAlias(cmor_var_name))
            for alias in cmor_var.known_aliases:
                self._dict_aliases[alias.alias] = (alias, cmor_var)

    def _get_xlsx_path(self, table_name):
        """
        Looks for the xlsx file that defines a convention

        It is either '<table_name>.xlsx' in the cmor tables folder or the only xlsx file inside
        '<table_name>/etc'.

        :param table_name: name of the data convention
        :type table_name: str
        :return: path to the xlsx file, or None if there is not exactly one candidate
        :rtype: str | None
        """
        xlsx_table_path = os.path.join(self._cmor_tables_folder, '{0}.xlsx'.format(table_name))
        if os.path.isfile(xlsx_table_path):
            return xlsx_table_path

        xlsx_pattern = os.path.join(self._cmor_tables_folder, table_name, 'etc', '*.xlsx')
        candidates = glob.glob(xlsx_pattern)
        if len(candidates) == 1:
            return candidates[0]
        return None

    def _load_xlsx(self, xlsx_table_path):
        """
        Loads the variables defined in an xlsx workbook

        The first worksheet is used as an index giving the frequency of each table; every worksheet whose
        A1 cell is 'Priority' is then read as a table. A sheet that fails to load is logged and skipped.

        :param xlsx_table_path: path to the workbook
        :type xlsx_table_path: str
        """
        # NOTE(review): the positional True is presumably openpyxl's read_only flag - confirm
        excel = openpyxl.load_workbook(xlsx_table_path, True)

        table_data = {}
        data_sheet = excel.worksheets[0]
        for row in data_sheet.rows:
            if row[1].value in excel.sheetnames:
                table_data[row[1].value] = (Frequency(row[2].value), 'Date missing')

        for sheet_name in excel.sheetnames:
            try:
                sheet = excel.get_sheet_by_name(sheet_name)
                if sheet['A1'].value != 'Priority':
                    continue

                table_frequency, table_date = table_data[sheet.title]
                table = CMORTable(sheet.title, table_frequency, table_date)
                for row in sheet.rows:
                    # Skip the header row and the rows without a short name
                    if row[0].value == 'Priority' or not row[5].value:
                        continue

                    if row[5].value.lower() in self._dict_variables:
                        self._dict_variables[row[5].value.lower()].tables.append(table)
                        continue

                    var = Variable()
                    var.priority = row[0].value
                    var.short_name = row[5].value
                    var.standard_name = row[6].value
                    var.long_name = row[1].value
                    var.domain = self._process_modelling_realm(var, row[12].value)
                    var.units = row[2].value
                    var.tables.append(table)
                    self._dict_variables[var.short_name.lower()] = var
            except Exception as ex:
                # Best effort: a broken sheet must not prevent the remaining ones from loading
                Log.error('Table {0} can not be loaded: {1}', sheet_name, ex)

    def _process_modelling_realm(self, var, value):
        """
        Parses the content of a modelling realm cell

        :param var: variable the realm belongs to
        :type var: Variable
        :param value: space separated realm names; None is treated as an empty string
        :type value: str | None
        :return: whatever var.get_modelling_realm selects
        """
        if value is None:
            value = ''
        modelling_realm = value.split(' ')
        return var.get_modelling_realm(modelling_realm)

    def _load_missing_defaults(self):
        """Loads the 'default' csv table, flagging as default the variables it adds"""
        self._load_file('default', True)


class Variable(object):
    """
    Class to characterize a CMOR variable.

    It stores the variable's metadata, the tables it belongs to and the aliases known for it.
    """

    def __str__(self):
        return '{0} ({1})'.format(self.standard_name, self.short_name)

    def __repr__(self):
        return '{0} ({1})'.format(self.standard_name, self.short_name)

    def __init__(self):
        self.short_name = None
        self.standard_name = None
        self.long_name = None
        self.units = None
        self.valid_min = None
        self.valid_max = None
        self.grid = None
        self.priority = None
        self.default = False
        self.domain = None
        # Only parse_csv assigns a basin; defined here so the attribute exists for every variable
        self.basin = None
        self.known_aliases = []
        self.tables = []

    def parse_json(self, json_var, key):
        """
        Fills the variable from an entry of a json table

        :param json_var: variable definition
        :type json_var: dict
        :param key: name the entry has in the table; only used in the error message
        :type key: str
        :raises VariableJsonException: if the entry has no 'out_name'
        """
        if 'out_name' in json_var:
            self.short_name = json_var['out_name'].strip()
        else:
            raise VariableJsonException('Variable {0} has no out name defined'.format(key))

        self.standard_name = json_var['standard_name'].strip()
        self.long_name = json_var['long_name'].strip()

        domain = json_var['modeling_realm'].split(' ')
        self.domain = self.get_modelling_realm(domain)

        self.valid_min = json_var['valid_min'].strip()
        self.valid_max = json_var['valid_max'].strip()
        self.units = json_var['units'].strip()

        # 'priority' takes precedence over 'primavera_priority'; 1 is used when none is given
        if 'priority' in json_var:
            self.priority = int(json_var['priority'].strip())
        elif 'primavera_priority' in json_var:
            self.priority = int(json_var['primavera_priority'].strip())
        else:
            self.priority = 1

    def get_modelling_realm(self, domains):
        """
        Selects the modelling realm of the variable

        With several candidates, the most specific one according to _select_most_specific is used and, if
        there is none, the first on the list.

        :param domains: realm names; must contain at least one element
        :type domains: list[str]
        :return: parsed realm, or None if the only name given is empty
        """
        if len(domains) > 1:
            Log.warning('Multiple modeling realms assigned to variable {0}: {1}. ', self, domains)
            parsed = [ModelingRealms.parse(domain) for domain in domains]

            selected = self._select_most_specific(parsed)
            if selected:
                Log.warning('We will use {0} as it is the most specific', selected)
                return selected

            Log.warning('We will use {0} as it is the first on the list and there is no one that is '
                        'more specific', parsed[0])
            return parsed[0]

        if not domains[0]:
            Log.warning('Variable {0} has no modeling realm defined'.format(self.short_name))
            return None
        else:
            return ModelingRealms.parse(domains[0])

    def parse_csv(self, var_line):
        """
        Fills the variable from a row of a csv table

        Columns 1 to 9 hold short name, standard name, long name, realm, basin, units, valid minimum, valid
        maximum and grid; column 10 holds the table names separated by ':'.

        :param var_line: row of the csv
        :type var_line: list[str]
        """
        self.short_name = var_line[1].strip()
        self.standard_name = var_line[2].strip()
        self.long_name = var_line[3].strip()
        self.domain = ModelingRealms.parse(var_line[4].strip())
        self.basin = Basins.parse(var_line[5])
        self.units = var_line[6].strip()
        self.valid_min = var_line[7].strip()
        self.valid_max = var_line[8].strip()
        self.grid = var_line[9].strip()
        # NOTE(review): the names are stored as plain strings, while get_table reads a 'frequency'
        # attribute from the elements of self.tables - confirm this is intended
        for table in var_line[10].strip().split(':'):
            if table:
                self.tables.append(table)

    def get_table(self, frequency, data_convention):
        """
        Returns the table to use for the variable at a given frequency

        The first known table with that frequency is preferred. Otherwise, a table named after the
        variable's realm is created or, if there is no realm, the first known table is returned.

        :param frequency: requested frequency
        :param data_convention: data convention, passed to the realm to obtain the table name
        :return: table for the variable
        :rtype: CMORTable
        :raises IndexError: if the variable has neither tables nor realm
        """
        for table in self.tables:
            if table.frequency == frequency:
                return table

        if self.domain:
            table_name = self.domain.get_table_name(frequency, data_convention)
            return CMORTable(table_name, frequency, 'December 2013')
        return self.tables[0]

    def _select_most_specific(self, parsed):
        """
        Selects the most specific realm when the candidates are exactly one of the known pairs

        :param parsed: candidate realms
        :type parsed: list
        :return: the most specific realm of the pair, or None if the candidates are not a known pair
        """
        parsed = set(parsed)
        if {ModelingRealms.land, ModelingRealms.landIce} == parsed:
            return ModelingRealms.landIce
        if {ModelingRealms.seaIce, ModelingRealms.ocean} == parsed:
            return ModelingRealms.seaIce
        if {ModelingRealms.atmos, ModelingRealms.atmosChem} == parsed:
            return ModelingRealms.atmosChem
        if {ModelingRealms.ocean, ModelingRealms.ocnBgchem} == parsed:
            return ModelingRealms.ocnBgchem
        return None


class VariableAlias(object):
    """
    Class to characterize an alias of a CMOR variable.

    Besides the alias name, it can carry the basin and the grid that the alias implies.

    :param alias: alias name
    :type alias: str
    """

    def __init__(self, alias):
        self.alias = alias
        self.basin = None
        self.grid = None

    def __str__(self):
        string = self.alias
        if self.basin:
            string += ' Basin: {0}'.format(self.basin)
        if self.grid:
            string += ' Grid: {0}'.format(self.grid)
        return string


class CMORTable(object):
    """
    Class to characterize a CMOR table.

    :param name: table name
    :type name: str
    :param frequency: table frequency; stored after going through Frequency.parse
    :param date: table date
    :type date: str
    """

    def __init__(self, name, frequency, date):
        self.name = name
        self.frequency = Frequency.parse(frequency)
        self.date = date

    def __str__(self):
        return self.name

    def __repr__(self):
        return '{0.name} ({0.frequency}, {0.date})'.format(self)

    def __lt__(self, other):
        # Tables are ordered by name
        return self.name < other.name