# coding=utf-8
"""CMOR variable definitions and the manager that loads them from the bundled tables."""
import csv
import glob
import json
import os

import openpyxl
from bscearth.utils.log import Log

from earthdiagnostics.constants import Basins
from earthdiagnostics.frequency import Frequency
from earthdiagnostics.modelingrealm import ModelingRealms
from singleton import SingletonType


class VariableJsonException(Exception):
    """Raised when a variable entry of a JSON CMOR table can not be parsed."""

    pass


class VariableManager(object):
    """
    Loads the CMOR variable definitions and resolves variable names and aliases to them.

    Definitions are read from the 'cmor_tables' folder and aliases from the 'variable_alias' folder, both
    located next to this module.
    """

    __metaclass__ = SingletonType

    def __init__(self):
        module_folder = os.path.dirname(os.path.realpath(__file__))
        self._cmor_tables_folder = os.path.join(module_folder, 'cmor_tables')
        self._aliases_folder = os.path.join(module_folder, 'variable_alias')
        self._dict_variables = {}
        # Created here so that lookups done before load_variables() hit the KeyError branch of the getters
        # instead of failing with an AttributeError
        self._dict_aliases = {}

    def get_variable(self, original_name, silent=False):
        """
        Returns the cmor variable instance given a variable name

        :param original_name: original variable's name
        :type original_name: str
        :param silent: if True, omits log warning when variable is not found
        :type silent: bool
        :return: CMOR variable, or None if the name is not known
        :rtype: Variable
        """
        try:
            return self._dict_aliases[original_name.lower()][1]
        except KeyError:
            if not silent:
                Log.warning('Variable {0} is not defined in the CMOR table. Please add it'.format(original_name))
            return None

    def get_all_variables(self):
        """
        Returns all variables

        :return: CMOR variable list, sorted by short name
        :rtype: list[Variable]
        """
        all_vars = set(self._dict_variables.values())
        return sorted(all_vars, key=lambda var: var.short_name)

    def get_variable_and_alias(self, original_name, silent=False):
        """
        Returns the alias and the cmor variable instance given a variable name

        :param original_name: original variable's name
        :type original_name: str
        :param silent: if True, omits log warning when variable is not found
        :type silent: bool
        :return: alias and CMOR variable, or (None, None) if the name is not known
        :rtype: (VariableAlias, Variable)
        """
        try:
            return self._dict_aliases[original_name.lower()]
        except KeyError:
            if not silent:
                Log.warning('Variable {0} is not defined in the CMOR table. Please add it'.format(original_name))
            return None, None

    def load_variables(self, table_name):
        """
        Loads the CMOR tables for the given convention and rebuilds the variables and aliases dictionaries

        :param table_name: name of the data convention to load
        :type table_name: str
        """
        self._dict_variables = dict()
        self._load_variable_list(table_name)
        self._load_missing_defaults()
        self._load_known_aliases(table_name)
        self._construct_aliases_dict()

    def _load_variable_list(self, table_name):
        """
        Loads the variable definitions from the first source available: xlsx file, JSON tables folder or csv file

        :param table_name: name of the data convention to load
        :type table_name: str
        :raises Exception: if none of the sources exists for the convention
        """
        xlsx_path = self._get_xlsx_path(table_name)
        if xlsx_path:
            self._load_xlsx(xlsx_path)
            return

        json_folder = self._get_json_folder(table_name)
        if os.path.isdir(json_folder):
            self._load_json(json_folder)
            return

        csv_path = self._get_csv_path(table_name)
        if os.path.isfile(csv_path):
            self._load_file(table_name)
            return

        raise Exception('Data convention {0} unknown'.format(table_name))

    def _get_csv_path(self, table_name):
        """Returns the path of the csv table for the given convention name"""
        csv_table_path = os.path.join(self._cmor_tables_folder, '{0}.csv'.format(table_name))
        return csv_table_path

    def _get_json_folder(self, table_name):
        """Returns the path of the folder holding the JSON tables for the given convention name"""
        json_folder = os.path.join(self._cmor_tables_folder, '{0}/Tables'.format(table_name))
        return json_folder

    def _load_file(self, csv_table_path, default=False):
        """
        Loads the variables defined in a csv table, skipping those already defined

        :param csv_table_path: name of the table (despite the parameter name, it is the table name: the actual
                               path is computed with _get_csv_path)
        :type csv_table_path: str
        :param default: value to store in the 'default' attribute of the variables loaded
        :type default: bool
        """
        # NOTE(review): binary mode is what the csv module requires on Python 2; on Python 3 the reader would
        # need a text-mode file - confirm the interpreter targeted before changing it
        with open(self._get_csv_path(csv_table_path), 'rb') as csvfile:
            reader = csv.reader(csvfile, dialect='excel')
            for line in reader:
                # Header row
                if line[0] == 'Variable':
                    continue

                var = Variable()
                var.parse_csv(line)
                if not var.short_name or var.short_name.lower() in self._dict_variables:
                    continue
                var.default = default
                self._dict_variables[var.short_name.lower()] = var

    def _load_json(self, json_folder):
        """
        Loads the variables from all the JSON tables in a folder

        Files that are not valid JSON or that have no 'variable_entry' section are ignored.

        :param json_folder: folder containing the JSON tables
        :type json_folder: str
        """
        for file_name in os.listdir(json_folder):
            if file_name in ('CMIP6_grids.json', 'CMIP6_formula_terms.json'):
                continue

            # Context manager guarantees that the file handle is released for every table read
            with open(os.path.join(json_folder, file_name)) as json_file:
                json_data = json_file.read()

            try:
                data = json.loads(json_data)
            except ValueError:
                continue

            if 'variable_entry' in data:
                Log.debug('Parsing file {0}'.format(file_name))
                header = data['Header']
                # The first six characters of table_id are dropped (presumably a 'Table ' prefix - verify
                # against the table files)
                table = CMORTable(header['table_id'][6:], Frequency(header['frequency']), header['table_date'])
                self._load_json_variables(data['variable_entry'], table)

    def _load_json_variables(self, json_data, table):
        """
        Adds the variables of a JSON table to the variables dictionary

        Variables already defined only get the table added to their tables list.

        :param json_data: content of the 'variable_entry' section of the table
        :type json_data: dict
        :param table: table the variables belong to
        :type table: CMORTable
        """
        for short_name in json_data.keys():
            short_name = str.strip(str(short_name))
            if short_name.lower() in self._dict_variables:
                self._dict_variables[short_name.lower()].tables.append(table)
                continue

            variable = Variable()
            try:
                variable.parse_json(json_data[short_name], short_name)
                variable.tables.append(table)
                self._dict_variables[variable.short_name.lower()] = variable
            except VariableJsonException:
                Log.error('Could not read variable {0}'.format(short_name))

    def _load_known_aliases(self, table_name):
        """Loads the default aliases and then the ones specific to the given convention"""
        self._load_alias_csv('default')
        self._load_alias_csv(table_name)

    def _load_alias_csv(self, filename):
        """
        Loads the aliases defined in a csv file, if it exists, and attaches them to their variables

        Each row holds the aliases separated by ':' (column 0), the variable name (column 1), the basin
        (column 2) and the grid (column 3).

        :param filename: name of the aliases file, without extension
        :type filename: str
        """
        file_path = self._get_aliases_csv_path(filename)
        if not os.path.isfile(file_path):
            return

        # NOTE(review): binary mode is what the csv module requires on Python 2 - see _load_file
        with open(file_path, 'rb') as csvfile:
            reader = csv.reader(csvfile, dialect='excel')
            for line in reader:
                # Header row
                if line[0] == 'Aliases':
                    continue

                aliases = line[0].split(':')
                if line[1] not in aliases:
                    aliases.append(line[1])

                cmor_vars = []
                for alias in aliases:
                    alias = str.strip(alias)
                    if alias.lower() in self._dict_variables:
                        cmor_vars.append(self._dict_variables[alias.lower()])

                if len(cmor_vars) == 0:
                    Log.error('Aliases {0} could not be mapped to any variable'.format(aliases))
                    continue
                elif len(cmor_vars) > 1:
                    non_default = [var for var in cmor_vars if not var.default]
                    if len(non_default) == 1:
                        # Only one variable comes from the convention tables: it wins and the variables
                        # flagged as default are dropped
                        for default in [var for var in cmor_vars if var not in non_default]:
                            del self._dict_variables[default.short_name.lower()]
                        cmor_vars = non_default
                    else:
                        Log.error('Aliases {0} can be be mapped to multiple variables '
                                  '[{1}]'.format(aliases, ', '.join(map(str, cmor_vars))))
                        continue

                cmor_var = cmor_vars[0]
                # NOTE(review): aliases are used here as read from the file (not stripped nor lowercased),
                # while lookups are done with lowercase names - confirm that the files are always lowercase
                for alias in aliases:
                    if alias != cmor_var.short_name and alias in self._dict_variables:
                        Log.error('Alias {0} for variable {1} is already a different '
                                  'variable!'.format(alias, cmor_var.short_name))
                        continue
                    alias_object = VariableAlias(alias)
                    if line[2]:
                        alias_object.basin = Basins().parse(line[2])
                    if line[3]:
                        alias_object.grid = line[3]
                    cmor_var.known_aliases.append(alias_object)

    def _get_aliases_csv_path(self, filename):
        """Returns the path of the aliases csv file with the given name"""
        csv_table_path = os.path.join(self._aliases_folder, '{0}.csv'.format(filename))
        return csv_table_path

    def _construct_aliases_dict(self):
        """Rebuilds the dictionary that maps every alias name to its (alias, variable) pair"""
        self._dict_aliases = {}
        for cmor_var_name in self._dict_variables:
            cmor_var = self._dict_variables[cmor_var_name]
            # NOTE(review): known_aliases holds VariableAlias objects, so this test against a str is always
            # true and a plain alias for the variable's own name is always appended (and, being the last one,
            # it is the one registered for that name). Kept as is to preserve the lookup results.
            if cmor_var_name not in cmor_var.known_aliases:
                cmor_var.known_aliases.append(VariableAlias(cmor_var_name))
            for alias in cmor_var.known_aliases:
                self._dict_aliases[alias.alias] = (alias, cmor_var)

    def _get_xlsx_path(self, table_name):
        """
        Returns the path of the xlsx table for the given convention

        Looks first for '<table_name>.xlsx' in the tables folder and then for a single xlsx file inside
        '<table_name>/etc'.

        :param table_name: name of the data convention
        :type table_name: str
        :return: path to the xlsx file, or None if it can not be determined
        :rtype: str | None
        """
        xlsx_table_path = os.path.join(self._cmor_tables_folder, '{0}.xlsx'.format(table_name))
        if os.path.isfile(xlsx_table_path):
            return xlsx_table_path

        xlsx_table_path = os.path.join(self._cmor_tables_folder, table_name, 'etc', '*.xlsx')
        xlsx_table_path = glob.glob(xlsx_table_path)
        if len(xlsx_table_path) == 1:
            return xlsx_table_path[0]
        return None

    def _load_xlsx(self, xlsx_table_path):
        """
        Loads the variables from an xlsx file with one sheet per table

        The first sheet is used as an index to get the frequency of each table. Only sheets whose A1 cell is
        'Priority' are parsed. Errors while reading a sheet are logged and do not stop the load of the others.

        :param xlsx_table_path: path to the xlsx file
        :type xlsx_table_path: str
        """
        excel = openpyxl.load_workbook(xlsx_table_path, True)

        table_data = {}
        data_sheet = excel.worksheets[0]
        for row in data_sheet.rows:
            if row[1].value in excel.sheetnames:
                table_data[row[1].value] = (Frequency(row[2].value), 'Date missing')

        for sheet_name in excel.sheetnames:
            try:
                sheet = excel.get_sheet_by_name(sheet_name)
                if sheet['A1'].value != 'Priority':
                    continue

                table_frequency, table_date = table_data[sheet.title]
                table = CMORTable(sheet.title, table_frequency, table_date)
                for row in sheet.rows:
                    # Skip the header and the rows without variable name
                    if row[0].value == 'Priority' or not row[5].value:
                        continue

                    cmor_name = row[11].value
                    if not cmor_name:
                        cmor_name = row[5].value

                    priority = int(row[0].value)
                    bsc_commitment = row[30].value
                    # Variables explicitly marked as not committed are pushed three priority levels down
                    if bsc_commitment is not None and bsc_commitment.strip().lower() == 'false':
                        priority = priority + 3

                    if cmor_name.lower() in self._dict_variables:
                        # Already defined in another table: keep the lowest priority value seen
                        var = self._dict_variables[cmor_name.lower()]
                        if var.priority > priority:
                            var.priority = priority
                        var.tables.append(table)
                        continue

                    var = Variable()
                    var.priority = priority
                    var.short_name = cmor_name
                    var.standard_name = row[6].value
                    var.long_name = row[1].value
                    var.domain = self._process_modelling_realm(var, row[12].value)
                    var.units = row[2].value
                    var.tables.append(table)
                    self._dict_variables[var.short_name.lower()] = var
            except Exception as ex:
                # Best effort: a broken sheet must not prevent loading the remaining tables
                Log.error('Table {0} can not be loaded: {1}', sheet_name, ex)
                import traceback
                traceback.print_exc()

    def _process_modelling_realm(self, var, value):
        """
        Parses the modelling realm cell of an xlsx table

        :param var: variable the realm belongs to
        :type var: Variable
        :param value: space separated list of modelling realms. None is treated as an empty string
        :type value: str | None
        :return: value returned by var.get_modelling_realm for the realms listed
        """
        if value is None:
            value = ''
        modelling_realm = value.split(' ')
        return var.get_modelling_realm(modelling_realm)

    def _load_missing_defaults(self):
        """Loads from the 'default' csv table the variables not defined yet, flagging them as default"""
        self._load_file('default', True)


class Variable(object):
    """
    Class to characterize a CMOR variable. It also contains the methods to parse the variable from the csv and
    JSON tables and to select its table and modelling realm.
    """

    def __str__(self):
        return '{0} ({1})'.format(self.standard_name, self.short_name)

    def __repr__(self):
        return '{0} ({1})'.format(self.standard_name, self.short_name)

    def __init__(self):
        self.short_name = None
        self.standard_name = None
        self.long_name = None
        self.units = None
        self.valid_min = None
        self.valid_max = None
        self.grid = None
        self.priority = None
        self.default = False
        self.domain = None
        # Only filled by parse_csv; declared here so the attribute exists on every instance
        self.basin = None
        self.known_aliases = []
        self.tables = []

    def parse_json(self, json_var, key):
        """
        Fills the variable with the data of an entry of a JSON table

        :param json_var: entry of the variable in the table
        :type json_var: dict
        :param key: key of the entry in the table. Only used for the error message
        :type key: str
        :raises VariableJsonException: if the entry has no 'out_name'
        """
        if 'out_name' in json_var:
            self.short_name = json_var['out_name'].strip()
        else:
            raise VariableJsonException('Variable {0} has no out name defined'.format(key))
        self.standard_name = json_var['standard_name'].strip()
        self.long_name = json_var['long_name'].strip()

        domain = json_var['modeling_realm'].split(' ')
        self.domain = self.get_modelling_realm(domain)

        self.valid_min = json_var['valid_min'].strip()
        self.valid_max = json_var['valid_max'].strip()
        self.units = json_var['units'].strip()
        if 'priority' in json_var:
            self.priority = int(json_var['priority'].strip())
        elif 'primavera_priority' in json_var:
            self.priority = int(json_var['primavera_priority'].strip())
        else:
            self.priority = 1

    def get_modelling_realm(self, domains):
        """
        Selects the modelling realm of the variable from a list of realm names

        If there are several, the most specific one is used when the combination is known, and the first one
        otherwise.

        :param domains: realm names. Must contain at least one element
        :type domains: list[str]
        :return: parsed modelling realm, or None if the only name given is empty
        """
        if len(domains) > 1:
            Log.warning('Multiple modeling realms assigned to variable {0}: {1}. ', self, domains)
            parsed = []
            for domain in domains:
                parsed.append(ModelingRealms.parse(domain))

            selected = self._select_most_specific(parsed)
            if selected:
                Log.warning('We will use {0} as it is the most specific', selected)
                return selected

            Log.warning('We will use {0} as it is the first on the list and there is no one that is more specific',
                        parsed[0])
            return parsed[0]

        if not domains[0]:
            Log.warning('Variable {0} has no modeling realm defined'.format(self.short_name))
            return None
        else:
            return ModelingRealms.parse(domains[0])

    def parse_csv(self, var_line):
        """
        Fills the variable with the data of a row of a csv table

        :param var_line: row of the table. Columns 1 to 10 are read: short name, standard name, long name,
                         modelling realm, basin, units, valid min, valid max, grid and ':' separated tables
        :type var_line: list[str]
        """
        self.short_name = var_line[1].strip()
        self.standard_name = var_line[2].strip()
        self.long_name = var_line[3].strip()
        self.domain = ModelingRealms.parse(var_line[4].strip())
        self.basin = Basins().parse(var_line[5])
        self.units = var_line[6].strip()
        self.valid_min = var_line[7].strip()
        self.valid_max = var_line[8].strip()
        self.grid = var_line[9].strip()
        # NOTE(review): tables are stored here as plain names, while get_table reads the 'frequency' attribute
        # of the elements of self.tables - confirm that this column is empty in the csv tables
        for table in var_line[10].strip().split(':'):
            if table:
                self.tables.append(table)

    def get_table(self, frequency, data_convention):
        """
        Returns the table to use for the variable at the given frequency

        The first known table with that frequency is used. If there is none, a table is created from the name
        given by the modelling realm or, if the variable has no realm, the first known table is returned.

        :param frequency: frequency requested
        :param data_convention: data convention, passed to the realm to get the table name
        :return: table for the variable
        :rtype: CMORTable
        """
        for table in self.tables:
            if table.frequency == frequency:
                return table
        if self.domain:
            table_name = self.domain.get_table_name(frequency, data_convention)
            return CMORTable(table_name, frequency, 'December 2013')
        return self.tables[0]

    def _select_most_specific(self, parsed):
        """
        Returns the most specific realm for the known pairs of realms

        :param parsed: parsed modelling realms
        :type parsed: list
        :return: most specific realm, or None if the combination is not one of the known pairs
        """
        parsed = set(parsed)
        if {ModelingRealms.land, ModelingRealms.landIce} == parsed:
            return ModelingRealms.landIce

        if {ModelingRealms.seaIce, ModelingRealms.ocean} == parsed:
            return ModelingRealms.seaIce

        if {ModelingRealms.atmos, ModelingRealms.atmosChem} == parsed:
            return ModelingRealms.atmosChem

        if {ModelingRealms.ocean, ModelingRealms.ocnBgchem} == parsed:
            return ModelingRealms.ocnBgchem

        return None


class VariableAlias(object):
    """
    Class to characterize an alias of a CMOR variable: the alias name plus, optionally, the basin and the grid
    associated to it.

    :param alias: alias name
    :type alias: str
    """

    def __init__(self, alias):
        self.alias = alias
        self.basin = None
        self.grid = None

    def __str__(self):
        string = self.alias
        if self.basin:
            string += ' Basin: {0}'.format(self.basin)
        if self.grid:
            string += ' Grid: {0}'.format(self.grid)
        return string


class CMORTable(object):
    """
    Class to characterize a CMOR table

    :param name: table name
    :type name: str
    :param frequency: table frequency. It is passed through Frequency.parse before being stored
    :param date: table date
    :type date: str
    """

    def __init__(self, name, frequency, date):
        self.name = name
        self.frequency = Frequency.parse(frequency)
        self.date = date

    def __str__(self):
        return self.name

    def __repr__(self):
        return '{0.name} ({0.frequency}, {0.date})'.format(self)

    def __lt__(self, other):
        return self.name < other.name