# coding: utf-8
import csv
import glob
import shutil
import threading
from datetime import datetime

import numpy as np
import os
import re
from autosubmit.config.log import Log
from autosubmit.date.chunk_date_lib import parse_date, chunk_start_date, chunk_end_date, previous_day
from cfunits import Units

from earthdiagnostics.cmorizer import Cmorizer
from earthdiagnostics.utils import Utils, TempFile
from earthdiagnostics.variable import Variable


class DataManager(object):
    """
    Class to manage the data repositories

    :param config: diagnostics configuration; it must provide at least the experiment, data_dir and frequency
                   attributes used by this class
    :type config: Config
    """
    def __init__(self, config):
        self.config = config
        self.experiment = config.experiment
        # Link folders already scanned for old-convention files (see _create_link)
        self._checked_vars = list()
        Variable.load_variables()
        UnitConversion.load_conversions()
        # Protects _checked_vars
        self.lock = threading.Lock()
        self.cmor_path = os.path.join(self.config.data_dir, self.experiment.expid, 'cmorfiles')

    def get_file(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None):
        """
        Copies a given file from the CMOR repository to the scratch folder and returns the path to the scratch's copy

        :param domain: CMOR domain
        :type domain: str
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: str
        :return: path to the copy created on the scratch folder
        :rtype: str
        """
        filepath = self.get_file_path(startdate, member, domain, var, chunk, frequency, box, grid, None, None)
        temp_path = TempFile.get()
        shutil.copyfile(filepath, temp_path)
        return temp_path

    def send_file(self, filetosend, domain, var, startdate, member, chunk=None, grid=None, region=None, box=None,
                  rename_var=None, frequency=None, year=None, date_str=None, move_old=False):
        """
        Copies a given file to the CMOR repository. It also automatically converts to netCDF 4 if needed and can merge
        with already existing ones as needed

        :param move_old: if true, moves files following older conventions that may be found on the links folder
        :type move_old: bool
        :param date_str: exact date_str to use in the cmorized file
        :type: str
        :param year: if frequency is yearly, this parameter is used to give the corresponding year
        :type year: int
        :param rename_var: if exists, the given variable will be renamed to the one given by var
        :type rename_var: str
        :param filetosend: path to the file to send to the CMOR repository
        :type filetosend: str
        :param region: specifies the region represented by the file. If it is defined, the data will be appended to the
                       CMOR repository as a new region in the file or will overwrite if region was already present
        :type region: str
        :param domain: CMOR domain
        :type domain: str
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: str
        """
        original_var = var
        cmor_var = Variable.get_variable(var)
        var = self._get_final_var_name(box, var)

        if rename_var and rename_var != var:
            Utils.rename_variable(filetosend, rename_var, var)
        elif original_var != var:
            Utils.rename_variable(filetosend, original_var, var)

        if not frequency:
            frequency = self.config.frequency

        domain = DataManager.correct_domain(domain)

        # get_file_path appends the box suffix itself, so it must receive the name without it. Variables missing
        # from the CMOR table keep their original name instead of failing on a missing short_name.
        path_var = cmor_var.short_name if cmor_var else original_var
        filepath = self.get_file_path(startdate, member, domain, path_var, chunk, frequency, box, grid, year,
                                      date_str)
        if region:
            self._prepare_region(filepath, filetosend, region, var)

        # Rewrite the file as compressed and shuffled netCDF4
        temp = TempFile.get()
        Utils.execute_shell_command(["nccopy", "-4", "-d4", "-s", filetosend, temp])
        shutil.move(temp, filetosend)

        if cmor_var:
            self._add_cmor_metadata(filetosend, cmor_var, var, domain, frequency)

        variables = dict()
        variables['x'] = 'i'
        variables['y'] = 'j'
        variables['nav_lat_grid_V'] = 'lat'
        variables['nav_lon_grid_V'] = 'lon'
        variables['nav_lat_grid_U'] = 'lat'
        variables['nav_lon_grid_U'] = 'lon'
        variables['nav_lat_grid_T'] = 'lat'
        variables['nav_lon_grid_T'] = 'lon'
        Utils.rename_variables(filetosend, variables, False, True)

        Utils.move_file(filetosend, filepath)
        self._create_link(domain, filepath, frequency, var, grid, move_old)

    def _add_cmor_metadata(self, filetosend, cmor_var, var, domain, frequency):
        """
        Writes the CMOR attributes of the variable and of the coordinates into the file

        :param filetosend: path to the file to modify
        :type filetosend: str
        :param cmor_var: CMOR definition of the variable
        :type cmor_var: Variable
        :param var: name of the variable inside the file
        :type var: str
        :param domain: CMOR domain
        :type domain: str
        :param frequency: file's frequency
        :type frequency: str
        """
        handler = Utils.openCdf(filetosend)
        var_handler = handler.variables[var]
        var_handler.standard_name = cmor_var.standard_name
        var_handler.long_name = cmor_var.long_name
        var_handler.short_name = cmor_var.short_name
        var_type = var_handler.dtype
        handler.modeling_realm = cmor_var.domain
        handler.table_id = 'Table {0} (December 2013)'.format(self.get_domain_abbreviation(cmor_var.domain,
                                                                                           frequency))
        if cmor_var.units:
            self._fix_units(cmor_var, var_handler)
        handler.sync()
        if 'lev' in handler.variables:
            handler.variables['lev'].short_name = 'lev'
            if domain == 'ocean':
                handler.variables['lev'].standard_name = 'depth'
        if 'lon' in handler.variables:
            handler.variables['lon'].short_name = 'lon'
            handler.variables['lon'].standard_name = 'longitude'
        if 'lat' in handler.variables:
            handler.variables['lat'].short_name = 'lat'
            handler.variables['lat'].standard_name = 'latitude'
        handler.close()

        if cmor_var.valid_min != '':
            valid_min = '-a valid_min, {0}, o, {1}, "{2}" '.format(var, var_type.char, cmor_var.valid_min)
        else:
            valid_min = ''

        if cmor_var.valid_max != '':
            valid_max = '-a valid_max, {0}, o, {1}, "{2}" '.format(var, var_type.char, cmor_var.valid_max)
        else:
            valid_max = ''

        Utils.nco.ncatted(input=filetosend, output=filetosend,
                          options='-O -a _FillValue,{0},o,{1},"1.e20" '
                                  '-a missingValue,{0},o,{1},"1.e20" {2}{3}'.format(var, var_type.char,
                                                                                    valid_min, valid_max))

    @staticmethod
    def _fix_units(cmor_var, var_handler):
        """
        Converts the data and the valid range of a variable to the units requested by its CMOR definition

        Nothing is done if the variable has no units attribute.

        :param cmor_var: CMOR definition of the variable
        :type cmor_var: Variable
        :param var_handler: netCDF variable to convert in place
        """
        if 'units' not in var_handler.ncattrs():
            return

        # Normalize spellings that would otherwise not be recognized
        if var_handler.units == 'PSU':
            var_handler.units = 'psu'
        if var_handler.units == 'C' and cmor_var.units == 'K':
            var_handler.units = 'deg_C'

        if cmor_var.units == var_handler.units:
            return

        try:
            new_unit = Units(cmor_var.units)
            old_unit = Units(var_handler.units)
            var_handler[:] = Units.conform(var_handler[:], old_unit, new_unit, inplace=True)
            if 'valid_min' in var_handler.ncattrs():
                var_handler.valid_min = Units.conform(float(var_handler.valid_min), old_unit, new_unit,
                                                      inplace=True)
            if 'valid_max' in var_handler.ncattrs():
                var_handler.valid_max = Units.conform(float(var_handler.valid_max), old_unit, new_unit,
                                                      inplace=True)
        except ValueError:
            # cfunits could not do it: fall back to the conversions table
            # NOTE(review): get_conversion_factor_offset returns (None, None) for unknown conversions, which makes
            # the arithmetic below fail with a TypeError. Confirm the desired behaviour for that case.
            factor, offset = UnitConversion.get_conversion_factor_offset(var_handler.units, cmor_var.units)
            var_handler[:] = var_handler[:] * factor + offset
            if 'valid_min' in var_handler.ncattrs():
                var_handler.valid_min = float(var_handler.valid_min) * factor + offset
            if 'valid_max' in var_handler.ncattrs():
                var_handler.valid_max = float(var_handler.valid_max) * factor + offset

        var_handler.units = cmor_var.units

    @staticmethod
    def _prepare_region(filepath, filetosend, region, var):
        """
        Adds a region dimension to the variable to send, merging it with the file on the repository if it exists

        :param filepath: path of the file on the CMOR repository
        :type filepath: str
        :param filetosend: path to the file to send; it is modified in place
        :type filetosend: str
        :param region: name of the region represented by the file to send
        :type region: str
        :param var: name of the variable inside the file
        :type var: str
        """
        Utils.convert2netcdf4(filetosend)
        if not os.path.exists(filepath):
            # First region for this file: rebuild the variable with an extra trailing region dimension
            handler = Utils.openCdf(filetosend)
            handler.createDimension('region')
            var_region = handler.createVariable('region', str, 'region')
            var_region[0] = region

            original_var = handler.variables[var]
            new_var = handler.createVariable('new_var', original_var.datatype,
                                             original_var.dimensions + ('region',))
            new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()})
            value = original_var[:]
            new_var[..., 0] = value
            handler.close()

            Utils.nco.ncks(input=filetosend, output=filetosend, options='-O -x -v {0}'.format(var))
            Utils.rename_variable(filetosend, 'new_var', var)
        else:
            # Work on a copy of the stored file and write the new data in the slot of the region
            temp = TempFile.get()
            shutil.copyfile(filepath, temp)
            Utils.nco.ncks(input=temp, output=temp, options='-O --mk_rec_dmn region')
            handler = Utils.openCdf(temp)
            handler_send = Utils.openCdf(filetosend)
            value = handler_send.variables[var][:]
            var_region = handler.variables['region']
            basin_index = np.where(var_region[:] == region)
            if len(basin_index[0]) == 0:
                # Region not present yet: append it at the end
                var_region[var_region.shape[0]] = region
                basin_index = var_region.shape[0] - 1
            else:
                basin_index = basin_index[0][0]

            handler.variables[var][..., basin_index] = value
            handler.close()
            handler_send.close()
            Utils.move_file(temp, filetosend)
        Utils.nco.ncks(input=filetosend, output=filetosend, options='-O --fix_rec_dmn region')

    @staticmethod
    def _get_final_var_name(box, var):
        """
        Returns the variable name with the box description appended, if a box is given

        :param box: box used to compute the variable, if any
        :type box: Box | None
        :param var: variable name
        :type var: str
        :return: final variable name
        :rtype: str
        """
        if box:
            var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str()
        return var

    @staticmethod
    def correct_domain(domain):
        """
        Corrects domain capitalization

        :param domain: domain name
        :type domain: str
        :return: domain with correct capitalization
        :rtype: str
        """
        domain = domain.lower()
        if domain == 'seaice':
            return 'seaIce'
        elif domain == 'landice':
            return 'landIce'
        return domain

    @staticmethod
    def _ensure_folder(path):
        """
        Creates a folder if it does not exist, tolerating that somebody else creates it at the same time

        :param path: folder to create
        :type path: str
        :raises OSError: if the folder can not be created and it does not exist afterwards
        """
        if os.path.exists(path):
            return
        try:
            os.makedirs(path)
        except OSError:
            # Losing the creation race against another thread or process is fine; anything else is a real error
            if not os.path.isdir(path):
                raise

    def _mark_as_checked(self, link_path):
        """
        Registers a link folder as already scanned for old files

        The lock is requested without blocking, and it is always released before returning.

        :param link_path: link folder
        :type link_path: str
        :return: True if this call registered the folder, False if it was already registered or the lock was busy
        :rtype: bool
        """
        if not self.lock.acquire(False):
            return False
        try:
            if link_path in self._checked_vars:
                return False
            self._checked_vars.append(link_path)
            return True
        finally:
            self.lock.release()

    def _create_link(self, domain, filepath, frequency, var, grid, move_old):
        """
        Creates, on the links folder of the experiment, a symbolic link to a file of the CMOR repository

        :param domain: CMOR domain
        :type domain: str
        :param filepath: path to the file on the CMOR repository
        :type filepath: str
        :param frequency: file's frequency
        :type frequency: str
        :param var: variable name
        :type var: str
        :param grid: file's grid. If empty, the original grid is assumed
        :type grid: str
        :param move_old: if true, moves files following older conventions that may be found on the links folder
        :type move_old: bool
        :raises ValueError: if the file to link does not exist
        """
        if frequency in ('d', 'daily', 'day'):
            freq_str = 'daily_mean'
        elif frequency.endswith('hr'):
            freq_str = frequency[:-2] + 'hourly'
        else:
            freq_str = 'monthly_mean'

        if not grid:
            grid = 'original'

        var_grid = '{0}-{1}'.format(var, grid)

        if domain in ['ocean', 'seaIce']:
            variable_folder = '{0}_f{1}h'.format(var, self.experiment.ocean_timestep)
            vargrid_folder = '{0}_f{1}h'.format(var_grid, self.experiment.ocean_timestep)
        else:
            variable_folder = '{0}_f{1}h'.format(var, self.experiment.atmos_timestep)
            vargrid_folder = '{0}_f{1}h'.format(var_grid, self.experiment.atmos_timestep)

        base_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str)

        if grid == 'original':
            link_path = os.path.join(base_path, variable_folder)
            if os.path.islink(link_path):
                # The default folder points to another grid: use the explicit folder for the original one
                link_path = os.path.join(base_path, vargrid_folder)
            self._ensure_folder(link_path)
        else:
            link_path = os.path.join(base_path, vargrid_folder)
            self._ensure_folder(link_path)
            default_path = os.path.join(base_path, variable_folder)
            original_path = os.path.join(base_path,
                                         vargrid_folder.replace('-{0}_f'.format(grid), '-original_f'))

            # Make the default folder point to this grid, keeping any real folder under the original grid's name
            if os.path.islink(default_path):
                os.remove(default_path)
            elif os.path.isdir(default_path):
                shutil.move(default_path, original_path)
            os.symlink(link_path, default_path)

        if move_old and self._mark_as_checked(link_path):
            old_path = os.path.join(base_path, 'old_{0}_f{1}h'.format(var, self.experiment.atmos_timestep))
            regex = re.compile(var + r'_[0-9]{6,8}\.nc')
            for filename in os.listdir(link_path):
                if regex.match(filename):
                    self._ensure_folder(old_path)
                    Utils.move_file(os.path.join(link_path, filename),
                                    os.path.join(old_path, filename))

        link_path = os.path.join(link_path, os.path.basename(filepath))
        if os.path.lexists(link_path):
            os.remove(link_path)
        if not os.path.exists(filepath):
            raise ValueError('Original file {0} does not exists'.format(filepath))
        os.symlink(filepath, link_path)

    @staticmethod
    def get_domain_abbreviation(domain, frequency):
        """
        Returns the table name for a domain-frequency pair

        :param domain: variable's domain
        :type domain: str
        :param frequency: variable's frequency
        :type frequency: str
        :return: variable's table name
        :rtype: str
        """
        if frequency == 'mon':
            if domain == 'seaIce':
                domain_abreviattion = 'OImon'
            elif domain == 'landIce':
                domain_abreviattion = 'LImon'
            else:
                domain_abreviattion = domain[0].upper() + 'mon'
        elif frequency == '6hr':
            domain_abreviattion = '6hrPlev'
        else:
            domain_abreviattion = 'day'
        return domain_abreviattion

    # Abstract methods
    def get_file_path(self, startdate, member, domain, var, chunk, frequency,
                      box=None, grid=None, year=None, date_str=None):
        """
        Returns the path to a concrete file on the repository. Must be implemented by the derived classes

        :param date_str: exact date_str to use in the cmorized file
        :type: str
        :param year: if frequency is yearly, this parameter is used to give the corresponding year
        :type year: int | None
        :param domain: CMOR domain
        :type domain: str
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: str
        :return: file absolute path
        :rtype: str
        """
        raise NotImplementedError()

    def get_year(self, domain, var, startdate, member, year, grid=None, box=None):
        """
        Gets the data of a given year as one file. Must be implemented by the derived classes

        :param domain: CMOR domain
        :type domain: str
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param year: year to retrieve
        :type year: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        """
        raise NotImplementedError()

    # Overridable methods (not mandatory)
    def link_file(self, domain, var, startdate, member, chunk=None, grid=None, box=None,
                  frequency=None, year=None, date_str=None, move_old=False):
        """
        Creates the link of a given file from the repository. The default implementation does nothing
        """
        pass


class CMORManager(DataManager):
    """
    Data manager for experiments stored following the CMOR folder and file naming conventions
    """
    def get_file_path(self, startdate, member, domain, var, chunk, frequency,
                      box=None, grid=None, year=None, date_str=None):
        """
        Returns the path to a concrete file on the CMOR repository

        The time part of the file name is taken from chunk, year or date_str, in that order of preference.

        :param date_str: exact date_str to use in the cmorized file
        :type: str
        :param year: if frequency is yearly, this parameter is used to give the corresponding year
        :type year: int | None
        :param domain: CMOR domain
        :type domain: str
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: str
        :return: file absolute path
        :rtype: str
        :raises ValueError: if chunk, year and date_str are all missing, or if year is given for a frequency
                            other than 'yr'
        """
        if not frequency:
            frequency = self.config.frequency
        var = self._get_final_var_name(box, var)
        domain_abreviattion = self.get_domain_abbreviation(domain, frequency)
        start = parse_date(startdate)
        # Members are zero-based here, but the realization number of the CMOR names starts at one
        member_plus = str(member + 1)
        member_path = os.path.join(self._get_startdate_path(startdate), frequency, domain)

        if chunk is not None:
            chunk_start = chunk_start_date(start, chunk, self.experiment.chunk_size, 'month', 'standard')
            chunk_end = chunk_end_date(chunk_start, self.experiment.chunk_size, 'month', 'standard')
            chunk_end = previous_day(chunk_end, 'standard')

            time_bound = "{0:04}{1:02}-{2:04}{3:02}".format(chunk_start.year, chunk_start.month,
                                                            chunk_end.year, chunk_end.month)
        elif year:
            # Strings must be compared by value: identity of equal strings is an implementation detail
            if frequency != 'yr':
                raise ValueError('Year may be provided instead of chunk only if frequency is "yr"')
            time_bound = str(year)
        elif date_str:
            time_bound = date_str
        else:
            raise ValueError('Chunk, year and date_str can not be None at the same time')

        if grid:
            var_path = os.path.join(member_path, var, grid, 'r{0}i1p1'.format(member_plus))
        else:
            var_path = os.path.join(member_path, var, 'r{0}i1p1'.format(member_plus))

        filepath = os.path.join(var_path, '{0}_{1}_{2}_{3}_S{4}_r{5}i1p1_'
                                          '{6}.nc'.format(var, domain_abreviattion, self.experiment.model,
                                                          self.experiment.experiment_name, startdate,
                                                          member_plus, time_bound))
        return filepath

    def link_file(self, domain, var, startdate, member, chunk=None, grid=None, box=None,
                  frequency=None, year=None, date_str=None, move_old=False):
        """
        Creates the link of a given file from the CMOR repository.

        :param move_old: if true, moves files following older conventions that may be found on the links folder
        :type move_old: bool
        :param date_str: exact date_str to use in the cmorized file
        :type date_str: str
        :param year: if frequency is yearly, this parameter is used to give the corresponding year
        :type year: int
        :param domain: CMOR domain
        :type domain: str
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: str
        """
        var = self._get_final_var_name(box, var)

        if not frequency:
            frequency = self.config.frequency
        domain = DataManager.correct_domain(domain)
        # Keywords keep grid, year and date_str in their own parameters. The box is not forwarded because var
        # already carries its suffix.
        filepath = self.get_file_path(startdate, member, domain, var, chunk, frequency,
                                      grid=grid, year=year, date_str=date_str)
        self._create_link(domain, filepath, frequency, var, grid, move_old)

    def get_year(self, domain, var, startdate, member, year, grid=None, box=None):
        """
        Gets all the data corresponding to a given year from the CMOR repository to the scratch folder as one file
        and returns the path to the scratch's copy.

        :param year: year to retrieve
        :type year: int
        :param domain: CMOR domain
        :type domain: str
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :return: path to the copy created on the scratch folder
        :rtype: str
        """
        chunk_files = list()
        for chunk in self.experiment.get_year_chunks(startdate, year):
            chunk_files.append(self.get_file(domain, var, startdate, member, chunk, grid=grid, box=box))

        if len(chunk_files) > 1:
            temp = TempFile.get()
            Utils.nco.ncrcat(input=' '.join(chunk_files), output=temp)
            for chunk_file in chunk_files:
                os.remove(chunk_file)
        else:
            temp = chunk_files[0]

        temp2 = TempFile.get()
        handler = Utils.openCdf(temp)
        time = Utils.get_datetime_from_netcdf(handler)
        handler.close()

        # Keep the time steps from the last one in January to the last one in December of the requested year
        start = None
        end = None
        for index, date in enumerate(time):
            if date.year == year:
                if date.month == 1:
                    start = index
                elif date.month == 12:
                    end = index

        Utils.nco.ncks(input=temp, output=temp2, options='-O -d time,{0},{1}'.format(start, end))
        os.remove(temp)
        return temp2

    def _is_cmorized(self, startdate, member):
        """
        Checks if there is any CMOR folder for the given startdate and member

        :param startdate: startdate to check
        :type startdate: str
        :param member: member to check
        :type member: int
        :return: True if a folder for the member exists for at least one variable, False otherwise
        :rtype: bool
        """
        startdate_path = self._get_startdate_path(startdate)
        if not os.path.exists(startdate_path):
            return False
        for freq in os.listdir(startdate_path):
            freq_path = os.path.join(startdate_path, freq)
            for domain in os.listdir(freq_path):
                domain_path = os.path.join(freq_path, domain)
                for var in os.listdir(domain_path):
                    member_path = os.path.join(domain_path, var, 'r{0}i1p1'.format(member + 1))
                    if os.path.exists(member_path):
                        return True
        return False

    # noinspection PyPep8Naming
    def prepare_CMOR_files(self):
        """
        Prepares the data to be used by the diagnostic.

        For every startdate and member that is not cmorized yet, or for all of them if cmorization is forced, it
        unpacks the cmorized data if it is available as gzipped or plain tar files. Otherwise, it cmorizes the
        original files.
        """
        # Check if cmorized and convert if not
        for startdate, member in self.experiment.get_member_list():
            if self._is_cmorized(startdate, member) and not self.config.cmor.force:
                continue
            member_str = self.experiment.get_member_str(member)
            if not self.config.cmor.force:
                tar_path = os.path.join(self.config.data_dir, self.experiment.expid, 'original_files', 'cmorfiles')
                tar_original_files = os.path.join(self.config.data_dir, 'original_files', self.experiment.expid,
                                                  'cmorfiles')
                file_name = 'CMOR?_{0}_{1}_*.tar.gz'.format(self.experiment.expid, startdate, member_str)
                filepaths = glob.glob(os.path.join(tar_path, file_name))
                filepaths += glob.glob(os.path.join(tar_path, 'outputs', file_name))
                filepaths += glob.glob(os.path.join(tar_original_files, file_name))
                filepaths += glob.glob(os.path.join(tar_original_files, 'outputs', file_name))
                if len(filepaths) > 0:
                    Log.info('Unzipping cmorized data...')
                    Utils.unzip(filepaths, True)

                if not os.path.exists(self.cmor_path):
                    os.mkdir(self.cmor_path)

                file_name = 'CMOR?_{0}_{1}_*.tar'.format(self.experiment.expid, startdate, member_str)
                filepaths = glob.glob(os.path.join(tar_path, file_name))
                filepaths += glob.glob(os.path.join(tar_path, 'outputs', file_name))
                filepaths += glob.glob(os.path.join(tar_original_files, file_name))
                filepaths += glob.glob(os.path.join(tar_original_files, 'outputs', file_name))
                if len(filepaths) > 0:
                    Log.info('Unpacking cmorized data...')
                    Utils.untar(filepaths, self.cmor_path)
                    self._correct_paths(startdate)
                    self._create_links(startdate)
                    continue

            start_time = datetime.now()
            Log.info('CMORizing startdate {0} member {1}. Starting at {2}', startdate, member_str, start_time)
            cmorizer = Cmorizer(self, startdate, member)
            cmorizer.cmorize_ocean()
            cmorizer.cmorize_atmos()
            Log.result('CMORized startdate {0} member {1}! Elapsed time: {2}\n\n', startdate, member_str,
                       datetime.now() - start_time)

    def _correct_paths(self, startdate):
        """
        Fixes the folder structure and the file names of unpacked CMOR files that follow older conventions

        :param startdate: startdate to use on the corrected file names
        :type startdate: str
        """
        bad_path = os.path.join(self.cmor_path, 'output', self.experiment.institute)
        if os.path.exists(bad_path):
            Log.debug('Moving CMOR files out of the output folder')
            Utils.execute_shell_command(['mv', bad_path, os.path.join(bad_path, '..', '..')])
            os.rmdir(os.path.join(self.cmor_path, 'output'))
            Log.debug('Done')

        if self.experiment.experiment_name != self.experiment.model:
            bad_path = os.path.join(self.cmor_path, self.experiment.institute, self.experiment.model,
                                    self.experiment.model)
            Log.debug('Correcting double model appearance')
            # Walk bottom-up, so every folder is already empty when it is removed
            for (dirpath, dirnames, filenames) in os.walk(bad_path, False):
                for filename in filenames:
                    filepath = os.path.join(dirpath, filename)
                    good = filepath.replace('_{0}_output_'.format(self.experiment.model),
                                            '_{0}_{1}_S{2}_'.format(self.experiment.model,
                                                                    self.experiment.experiment_name,
                                                                    startdate))

                    good = good.replace('/{0}/{0}'.format(self.experiment.model),
                                        '/{0}/{1}'.format(self.experiment.model,
                                                          self.experiment.experiment_name))

                    Utils.move_file(filepath, good)
                os.rmdir(dirpath)
            Log.debug('Done')

    def _create_links(self, startdate):
        """
        Creates the links for all the files found on the CMOR folder of a startdate

        :param startdate: startdate to link
        :type startdate: str
        """
        Log.info('Creating links for CMOR files ()')
        path = self._get_startdate_path(startdate)
        for freq in os.listdir(path):
            for domain in os.listdir(os.path.join(path, freq)):
                for var in os.listdir(os.path.join(path, freq, domain)):
                    for member in os.listdir(os.path.join(path, freq, domain, var)):
                        for name in os.listdir(os.path.join(path, freq, domain, var, member)):
                            filepath = os.path.join(path, freq, domain, var, member, name)
                            if os.path.isfile(filepath):
                                self._create_link(domain, filepath, freq, var, "", False)
                            else:
                                # One more folder level: link every file inside it
                                for filename in os.listdir(filepath):
                                    self._create_link(domain, os.path.join(filepath, filename), freq, var, "",
                                                      False)
        Log.info('Links for CMOR files created')

    def _get_startdate_path(self, startdate):
        """
        Returns the path to the startdate's CMOR folder

        :param startdate: target startdate
        :type startdate: str
        :return: path to the startdate's CMOR folder
        :rtype: str
        """
        return os.path.join(self.config.data_dir, self.experiment.expid, 'cmorfiles', self.experiment.institute,
                            self.experiment.model, self.experiment.experiment_name, 'S' + startdate)


class UnitConversion(object):
    """
    Class to manage unit conversions

    :param source: original units
    :type source: str
    :param destiny: converted units
    :type destiny: str
    :param factor: conversion factor
    :type factor: float | str
    :param offset: conversion offset
    :type offset: float | str
    """
    _dict_conversions = None

    @classmethod
    def load_conversions(cls):
        """
        Load conversions from the configuration file
        """
        cls._dict_conversions = dict()
        with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'conversions.csv'), 'rb') as csvfile:
            reader = csv.reader(csvfile, dialect='excel')
            for line in reader:
                # Skip blank rows and the header
                if not line or line[0] == 'original':
                    continue
                cls.add_conversion(UnitConversion(line[0], line[1], line[2], line[3]))

    @classmethod
    def add_conversion(cls, conversion):
        """
        Adds a conversion to the dictionary

        :param conversion: conversion to add
        :type conversion: UnitConversion
        """
        cls._dict_conversions[(conversion.source, conversion.destiny)] = conversion

    def __init__(self, source, destiny, factor, offset):
        self.source = source
        self.destiny = destiny
        self.factor = float(factor)
        self.offset = float(offset)

    @staticmethod
    def _split_scale(units_str):
        """
        Splits a units string in its scale and its unit name

        A string with one token has scale 1. Otherwise the first token is the scale, written either as a number or
        as base^exponent with integer base and exponent, and the second token is the unit name.

        :param units_str: units string
        :type units_str: str
        :return: scale and unit name
        :rtype: (int | float, str)
        """
        units = units_str.split()
        if len(units) == 1:
            return 1, units[0]
        if '^' in units[0]:
            values = units[0].split('^')
            scale = pow(int(values[0]), int(values[1]))
        else:
            scale = float(units[0])
        return scale, units[1]

    @classmethod
    def get_conversion_factor_offset(cls, input_units, output_units):
        """
        Gets the conversion factor and offset for two units . The conversion has to be done in the following way:
        converted = original * factor + offset

        :param input_units: original units
        :type input_units: str
        :param output_units: destiny units
        :type output_units: str
        :return: factor and offset, or None and None if the conversion is not known
        :rtype: [float, float]
        """
        scale_unit, unit = cls._split_scale(input_units)
        scale_new_unit, new_unit = cls._split_scale(output_units)

        factor, offset = UnitConversion._get_factor(new_unit, unit)
        if factor is None:
            return None, None
        factor = factor * scale_unit / float(scale_new_unit)
        offset /= float(scale_new_unit)

        return factor, offset

    @classmethod
    def _get_factor(cls, new_unit, unit):
        """
        Gets the factor and offset to convert between two unit names, ignoring any scale

        :param new_unit: destiny unit
        :type new_unit: str
        :param unit: original unit
        :type unit: str
        :return: factor and offset, or None and None if the conversion is not known
        :rtype: [float, float]
        """
        # Add only the conversions with a factor greater than 1
        if unit == new_unit:
            return 1, 0
        elif (unit, new_unit) in cls._dict_conversions:
            conversion = cls._dict_conversions[(unit, new_unit)]
            return conversion.factor, conversion.offset
        elif (new_unit, unit) in cls._dict_conversions:
            # Only the opposite conversion is stored. Inverting y = x * f + o gives x = y / f - o / f
            conversion = cls._dict_conversions[(new_unit, unit)]
            return 1 / conversion.factor, -conversion.offset / conversion.factor
        else:
            return None, None