# coding=utf-8
import glob
from datetime import datetime
import os

from bscearth.utils.log import Log
from bscearth.utils.date import parse_date, chunk_start_date, chunk_end_date, previous_day

from earthdiagnostics.cmorizer import Cmorizer
from earthdiagnostics.datamanager import DataManager
from earthdiagnostics.frequency import Frequencies, Frequency
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.utils import TempFile, Utils
from earthdiagnostics.variable_type import VariableType


class CMORManager(DataManager):
    """
    Data manager class for CMORized experiments
    """

    def __init__(self, config):
        """
        Locate the experiment's data folder and set up the CMOR paths.

        ``config.data_dir`` may hold several ':'-separated candidate folders. For each one, in order, three
        layouts are tried and the first containing a folder named after the experiment id wins:
        ``<folder>/<expid>``, ``<folder>/<model lowercased, '-' removed>/<expid>`` and
        ``<folder>/<data_type>/<model folder>/<expid>``. ``config.data_dir`` is overwritten with the winner.

        :param config: diagnostics configuration
        :raises Exception: if no candidate folder contains the experiment
        """
        super(CMORManager, self).__init__(config)
        self._dic_cmorized = dict()
        data_folders = self.config.data_dir.split(':')
        experiment_folder = self.experiment.model.lower()
        # All EC-Earth flavours share a single 'ecearth' folder
        if experiment_folder.startswith('ec-earth'):
            experiment_folder = 'ecearth'

        self.config.data_dir = None
        for data_folder in data_folders:
            if os.path.isdir(os.path.join(data_folder, self.experiment.expid)):
                self.config.data_dir = data_folder
                break
            test_folder = os.path.join(data_folder, self.experiment.model.lower().replace('-', ''))
            if os.path.isdir(os.path.join(test_folder, self.experiment.expid)):
                self.config.data_dir = test_folder
                break

            test_folder = os.path.join(data_folder, self.config.data_type, experiment_folder)
            if os.path.isdir(os.path.join(test_folder, self.experiment.expid)):
                self.config.data_dir = test_folder
                break

        if not self.config.data_dir:
            raise Exception('Can not find model data')
        self.cmor_path = os.path.join(self.config.data_dir, self.experiment.expid, 'cmorfiles')

    def file_exists(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
                    vartype=VariableType.MEAN):
        """
        Check whether the CMOR file for the given variable and chunk exists in the repository

        The variable name is adjusted with ``box`` in the same way as in get_file / send_file, so the checked
        path is the same one those methods read and write.

        :param domain: CMOR domain
        :type domain: Domain
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str|NoneType
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: Frequency|NoneType
        :param vartype: Variable type (mean, statistic). Currently not used to build the path
        :type vartype: VariableType
        :return: True if the file exists
        :rtype: bool
        """
        cmor_var = self.variable_list.get_variable(var)
        var = self._get_final_var_name(box, var)
        filepath = self.get_file_path(startdate, member, domain, var, cmor_var, chunk, frequency, grid, None, None)

        # noinspection PyBroadException
        try:
            return os.path.isfile(filepath)
        except Exception:
            return False

    def get_file(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
                 vartype=VariableType.MEAN):
        """
        Copies a given file from the CMOR repository to the scratch folder and returns the path to the scratch's copy

        :param domain: CMOR domain
        :type domain: Domain
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str|NoneType
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: Frequency|NoneType
        :param vartype: Variable type (mean, statistic). Currently not used to build the path
        :type vartype: VariableType
        :return: path to the copy created on the scratch folder
        :rtype: str
        """
        cmor_var = self.variable_list.get_variable(var)
        var = self._get_final_var_name(box, var)
        filepath = self.get_file_path(startdate, member, domain, var, cmor_var, chunk, frequency, grid, None, None)

        temp_path = TempFile.get()
        Utils.copy_file(filepath, temp_path)
        return temp_path

    def request_chunk(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None):
        """
        Request the file holding a given chunk from the CMOR repository

        The repository path is computed exactly as in get_file and then handed to ``_get_file_from_storage``
        (defined outside this class).

        :param domain: CMOR domain
        :type domain: Domain
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str|NoneType
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: Frequency|NoneType
        :return: whatever ``_get_file_from_storage`` returns for the file's repository path
        """
        cmor_var = self.variable_list.get_variable(var)
        var = self._get_final_var_name(box, var)
        filepath = self.get_file_path(startdate, member, domain, var, cmor_var, chunk, frequency, grid, None, None)

        return self._get_file_from_storage(filepath)

    def get_file_path(self, startdate, member, domain, var, cmor_var, chunk, frequency,
                      grid=None, year=None, date_str=None):
        """
        Returns the path to a concrete file

        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param domain: file's domain
        :type domain: Domain
        :param var: file's var
        :type var: str
        :param cmor_var: variable definition used to choose the CMOR table; if None, the domain's table is used
        :param chunk: file's chunk
        :type chunk: int
        :param frequency: file's frequency. If not given, the configured default frequency is used
        :type frequency: Frequency
        :param grid: file's grid
        :type grid: str|NoneType
        :param year: file's year
        :type year: int|str|NoneType
        :param date_str: date string to add directly. Overrides year or chunk configurations
        :type date_str: str|NoneType
        :return: path to the file
        :rtype: str
        """
        if not frequency:
            frequency = self.config.frequency

        folder_path = self._get_full_cmor_folder_path(startdate, member, domain, var, frequency, grid)
        file_name = self._get_cmor_file_name(startdate, member, domain, var, cmor_var, frequency,
                                             chunk, year, date_str, grid)

        filepath = os.path.join(folder_path, file_name)
        return filepath

    def _get_cmor_file_name(self, startdate, member, domain, var, cmor_var, frequency, chunk, year, date_str,
                            grid):
        """
        Build the CMOR file name for the configured data convention

        The time bound comes from ``chunk`` first, then ``year`` (only allowed for yearly data), then
        ``date_str``.

        :raises ValueError: if year is given for non-yearly data, or chunk, year and date_str are all missing
        :raises Exception: if the data convention is not 'specs', 'primavera' or 'cmip6'
        """
        if cmor_var is None:
            cmor_table = domain.get_table(frequency, self.config.data_convention)
        else:
            cmor_table = cmor_var.get_table(frequency, self.config.data_convention)

        if chunk is not None:
            time_bound = self._get_chunk_time_bounds(startdate, chunk)
        elif year:
            if frequency != Frequencies.yearly:
                raise ValueError('Year may be provided instead of chunk only if frequency is "yr"')
            time_bound = str(year)
        elif date_str:
            time_bound = date_str
        else:
            raise ValueError('Chunk, year and date_str can not be None at the same time')

        if time_bound:
            time_bound = '_{0}.nc'.format(time_bound)
        else:
            time_bound = '.nc'

        if self.config.data_convention == 'specs':
            file_name = '{0}_{1}_{2}_{3}_S{4}_{5}{6}'.format(var, cmor_table.name, self.experiment.model,
                                                             self.experiment.experiment_name, startdate,
                                                             self._get_member_str(member), time_bound)
        elif self.config.data_convention in ('primavera', 'cmip6'):
            if grid:
                grid = '_{0}'.format(grid)
            else:
                grid = ''
            file_name = '{0}_{1}_{2}_{3}_S{4}-{5}{6}{7}'.format(var, cmor_table.name,
                                                                self.experiment.experiment_name,
                                                                self.experiment.model, startdate,
                                                                self._get_member_str(member), grid, time_bound)
        else:
            raise Exception('Data convention {0} not supported'.format(self.config.data_convention))
        return file_name

    def _get_full_cmor_folder_path(self, startdate, member, domain, var, frequency, grid):
        """Return ``<startdate path>/<frequency>/<domain>/<var>[/<grid>]/<member str>``."""
        folder_path = os.path.join(self._get_startdate_path(startdate), str(frequency), domain.name, var)
        if grid:
            folder_path = os.path.join(folder_path, grid)
        folder_path = os.path.join(folder_path, self._get_member_str(member))
        return folder_path

    def _get_chunk_time_bounds(self, startdate, chunk):
        """
        Return the chunk's time bounds as ``YYYYMM-YYYYMM``

        The end month is that of the last day of the chunk (the day before the next chunk starts).
        """
        start = parse_date(startdate)
        chunk_start = chunk_start_date(start, chunk, self.experiment.chunk_size, 'month', self.experiment.calendar)
        chunk_end = chunk_end_date(chunk_start, self.experiment.chunk_size, 'month', self.experiment.calendar)
        chunk_end = previous_day(chunk_end, self.experiment.calendar)
        time_bound = "{0:04}{1:02}-{2:04}{3:02}".format(chunk_start.year, chunk_start.month, chunk_end.year,
                                                        chunk_end.month)
        return time_bound

    def link_file(self, domain, var, cmor_var, startdate, member, chunk=None, grid=None,
                  frequency=None, year=None, date_str=None, move_old=False, vartype=VariableType.MEAN):
        """
        Creates the link of a given file from the CMOR repository.

        :param move_old: forwarded to ``_create_link``
        :type move_old: bool
        :param date_str: exact date string used in the file name (used when neither chunk nor year is given)
        :type date_str: str|NoneType
        :param year: if frequency is yearly, this parameter is used to give the corresponding year
        :type year: int|NoneType
        :param domain: CMOR domain
        :type domain: Domain
        :param var: variable name
        :type var: str
        :param cmor_var: variable definition used to choose the CMOR table
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: Frequency
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        """

        if not frequency:
            frequency = self.config.frequency
        # Only stringify a real year: str(None) == 'None' is truthy and would wrongly select the "year" naming
        # branch in _get_cmor_file_name when just date_str is given
        filepath = self.get_file_path(startdate, member, domain, var, cmor_var, chunk, frequency,
                                      grid=grid, year=None if year is None else str(year), date_str=date_str)
        self._create_link(domain, filepath, frequency, var, grid, move_old, vartype)

    def send_file(self, filetosend, domain, var, startdate, member, chunk=None, grid=None, region=None,
                  box=None, rename_var=None, frequency=None, year=None, date_str=None, move_old=False,
                  diagnostic=None, cmorized=False, vartype=VariableType.MEAN):
        """
        Copies a given file to the CMOR repository. It also automatically converts to netCDF 4 if needed and can merge
        with already existing ones as needed

        :param move_old: if true, moves files following older conventions that may be found on the links folder
        :type move_old: bool
        :param date_str: exact date_str to use in the cmorized file
        :type: str
        :param year: if frequency is yearly, this parameter is used to give the corresponding year
        :type year: int
        :param rename_var: if exists, the given variable will be renamed to the one given by var
        :type rename_var: str
        :param filetosend: path to the file to send to the CMOR repository
        :type filetosend: str
        :param region: specifies the region represented by the file. If it is defined, the data will be appended to the
                       CMOR repository as a new region in the file or will overwrite if region was already present
        :type region: str
        :param domain: CMOR domain
        :type domain: Domain
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: Frequency
        :param diagnostic: diagnostic used to generate the file
        :type diagnostic: Diagnostic
        :param cmorized: flag to indicate if file was generated in cmorization process
        :type cmorized: bool
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        :raises ValueError: if neither diagnostic nor cmorized is given
        """
        if rename_var:
            original_name = rename_var
        else:
            original_name = var

        cmor_var = self.variable_list.get_variable(var)
        final_name = self._get_final_var_name(box, var)

        if final_name != original_name:
            Utils.rename_variable(filetosend, original_name, final_name)

        if not frequency:
            frequency = self.config.frequency

        filepath = self.get_file_path(startdate, member, domain, final_name, cmor_var, chunk, frequency, grid, year,
                                      date_str)
        # NOTE(review): NetCDFFile is not imported by this module's visible imports; confirm where it is defined
        # and import it, otherwise this line raises NameError.
        netcdf_file = NetCDFFile(filepath, filetosend, domain, final_name, cmor_var, self.config.data_convention,
                                 region)
        netcdf_file.frequency = frequency
        if diagnostic:
            netcdf_file.add_diagnostic_history(diagnostic)
        elif cmorized:
            netcdf_file.add_cmorization_history()
        else:
            raise ValueError('You must provide a diagnostic or set cmorized to true to store data '
                             'using the CMORManager')
        netcdf_file.send()

        self._create_link(domain, filepath, frequency, final_name, grid, move_old, vartype)

    def get_year(self, domain, var, startdate, member, year, grid=None, box=None):
        """
        Ge a file containing all the data for one year for one variable

        :param domain: variable's domain
        :type domain: str
        :param var: variable's name
        :type var: str
        :param startdate: startdate to retrieve
        :type startdate: str
        :param member: member to retrieve
        :type member: int
        :param year: year to retrieve
        :type year: int
        :param grid: variable's grid
        :type grid: str
        :param box: variable's box
        :type box: Box
        :return: path to a temporary file holding only the time steps of the given year
        :rtype: str
        """
        chunk_files = [self.get_file(domain, var, startdate, member, chunk, grid=grid, box=box)
                       for chunk in self.experiment.get_year_chunks(startdate, year)]

        if len(chunk_files) > 1:
            temp = self._merge_chunk_files(chunk_files)
        else:
            temp = chunk_files[0]
        # Always discard the intermediate copy, even if the year selection fails
        try:
            return self._select_data_of_given_year(temp, year)
        finally:
            os.remove(temp)

    @staticmethod
    def _select_data_of_given_year(data_file, year):
        """
        Extract the contiguous run of time steps belonging to ``year`` into a new temporary file

        :param data_file: netCDF file to read
        :type data_file: str
        :param year: year to select
        :type year: int
        :return: path to the new temporary file
        :rtype: str
        :raises ValueError: if no time step of data_file belongs to year
        """
        handler = Utils.openCdf(data_file)
        try:
            times = Utils.get_datetime_from_netcdf(handler)
        finally:
            handler.close()

        # Index of the first step in the year and one past the last step of that run
        first_index = None
        last_index = times.size
        for index in range(times.size):
            if first_index is None:
                if times[index].year == year:
                    first_index = index
            elif times[index].year != year:
                last_index = index
                break

        if first_index is None:
            raise ValueError('Year {0} not found in {1}'.format(year, data_file))

        temp2 = TempFile.get()
        # ncks hyperslab bounds are inclusive, hence last_index - 1
        Utils.nco.ncks(input=data_file, output=temp2, options=['-d time,{0},{1}'.format(first_index,
                                                                                        last_index - 1)])
        return temp2

    @staticmethod
    def _merge_chunk_files(chunk_files):
        """Concatenate chunk_files with ncrcat into a new temporary file and delete the inputs."""
        temp = TempFile.get()
        Utils.nco.ncrcat(input=' '.join(chunk_files), output=temp)
        for chunk_file in chunk_files:
            os.remove(chunk_file)
        return temp

    # noinspection PyPep8Naming
    def prepare(self):
        """
        Prepares the data to be used by the diagnostic.

        For every (startdate, member) pair, already transferred CMOR archives are unpacked. If that step reports
        no data, the member is CMORized from the original model output.

        :return:
        """
        # Check if cmorized and convert if not

        for startdate, member in self.experiment.get_member_list():
            if not self._unpack_cmor_files(startdate, member):
                self._cmorize_member(startdate, member)

    def is_cmorized(self, startdate, member, chunk, domain):
        """
        Cached check of whether any CMOR file exists for the given chunk and domain

        Results are memoised in ``self._dic_cmorized`` keyed by (startdate, member, chunk, domain name).
        """
        identifier = (startdate, member, chunk, domain.name)
        if identifier not in self._dic_cmorized:
            self._dic_cmorized[identifier] = self._is_cmorized(startdate, member, chunk, domain)
        return self._dic_cmorized[identifier]

    def _is_cmorized(self, startdate, member, chunk, domain):
        """Return True as soon as any variable of the domain, in any frequency folder, has a file for the chunk."""
        startdate_path = self._get_startdate_path(startdate)
        if not os.path.isdir(startdate_path):
            return False
        for freq in os.listdir(startdate_path):
            domain_path = os.path.join(startdate_path, freq, domain.name)
            if os.path.isdir(domain_path):
                for var in os.listdir(domain_path):
                    cmor_var = self.variable_list.get_variable(var, True)
                    var_path = self.get_file_path(startdate, member, domain, var, cmor_var, chunk, Frequency(freq))
                    if os.path.isfile(var_path):
                        return True
        return False

    def _cmorize_member(self, startdate, member):
        """CMORize ocean and atmosphere output for one member, logging the elapsed time."""
        start_time = datetime.now()
        member_str = self.experiment.get_member_str(member)
        Log.info('CMORizing startdate {0} member {1}. Starting at {2}', startdate, member_str, start_time)
        cmorizer = Cmorizer(self, startdate, member)
        cmorizer.cmorize_ocean()
        cmorizer.cmorize_atmos()
        Log.result('CMORized startdate {0} member {1}! Elapsed time: {2}\n\n', startdate, member_str,
                   datetime.now() - start_time)

    def _unpack_cmor_files(self, startdate, member):
        """
        Unpack transferred CMOR archives for a member

        :return: True if at least one chunk archive was found; False otherwise or when cmor.force is set
        :rtype: bool
        """
        if self.config.cmor.force:
            return False
        chunk = 1
        cmorized = False

        if not self.config.cmor.force_untar:
            # Skip chunks that already have CMOR files for ocean or atmosphere
            while self.is_cmorized(startdate, member, chunk, ModelingRealms.ocean) or\
                    self.is_cmorized(startdate, member, chunk, ModelingRealms.atmos):
                chunk += 1

        # NOTE(review): if every existing chunk was skipped above, no archive is found for the next chunk and
        # this returns False, so prepare() CMORizes the member again. Confirm whether that is intended.
        while self._unpack_chunk(startdate, member, chunk):
            chunk += 1
            cmorized = True

        return cmorized

    def _unpack_chunk(self, startdate, member, chunk):
        """
        Unzip (.tar.gz) and untar (.tar) the transferred CMOR archives of one chunk

        Archives of chunks whose cmorization is not requested are left untouched but still count as found.

        :return: True if archives were found for the chunk
        :rtype: bool
        """
        filepaths = self._get_transferred_cmor_data_filepaths(startdate, member, chunk, 'tar.gz')
        if len(filepaths) > 0:
            if self.config.cmor.chunk_cmorization_requested(chunk):
                Log.info('Unzipping cmorized data for {0} {1} {2}...', startdate, member, chunk)
                Utils.unzip(filepaths, True)
            else:
                return True

        if not os.path.exists(self.cmor_path):
            os.mkdir(self.cmor_path)

        filepaths = self._get_transferred_cmor_data_filepaths(startdate, member, chunk, 'tar')
        if len(filepaths) > 0:
            if self.config.cmor.chunk_cmorization_requested(chunk):
                Log.info('Unpacking cmorized data for {0} {1} {2}...', startdate, member, chunk)
                Utils.untar(filepaths, self.cmor_path)
                self._correct_paths(startdate, member)
                self.create_links(startdate, member)
            return True
        return False

    def _get_transferred_cmor_data_filepaths(self, startdate, member, chunk, extension):
        """Glob the chunk's CMOR archives with the given extension in the known transfer folders."""
        tar_path = os.path.join(self.config.data_dir, self.experiment.expid, 'original_files', 'cmorfiles')
        tar_original_files = os.path.join(self.config.data_dir, 'original_files', self.experiment.expid,
                                          'cmorfiles')
        file_name = 'CMOR?_{0}_{1}_{2}_{3}-*.{4}'.format(self.experiment.expid, startdate,
                                                         self.experiment.get_member_str(member),
                                                         self.experiment.get_chunk_start_str(startdate, chunk),
                                                         extension)
        filepaths = glob.glob(os.path.join(tar_path, file_name))
        filepaths += glob.glob(os.path.join(tar_path, 'outputs', file_name))
        filepaths += glob.glob(os.path.join(tar_original_files, file_name))
        filepaths += glob.glob(os.path.join(tar_original_files, 'outputs', file_name))
        return filepaths

    def _correct_paths(self, startdate, member):
        """Fix known layout problems of freshly unpacked CMOR files."""
        self._remove_extra_output_folder()
        self._fix_model_as_experiment_error(startdate, member)

    def _fix_model_as_experiment_error(self, startdate, member):
        """
        Move files stored under ``<institute>/<model>/<model>`` to ``<institute>/<model>/<experiment name>``

        File names containing ``_<model>_output_`` are rewritten to ``_<model>_<experiment>_S<startdate>_``.
        Only done when the experiment name differs from the model name.
        """
        if self.experiment.experiment_name != self.experiment.model:
            bad_path = os.path.join(self.cmor_path, self.experiment.institute, self.experiment.model,
                                    self.experiment.model)
            Log.debug('Correcting double model appearance')
            # Bottom-up walk (topdown=False) so each directory is already empty when it is removed
            for (dirpath, dirnames, filenames) in os.walk(bad_path, False):
                for filename in filenames:
                    filepath = os.path.join(dirpath, filename)
                    good = filepath
                    good = good.replace('_{0}_output_'.format(self.experiment.model),
                                        '_{0}_{1}_S{2}_'.format(self.experiment.model,
                                                                self.experiment.experiment_name,
                                                                startdate))

                    good = good.replace('/{0}/{0}'.format(self.experiment.model),
                                        '/{0}/{1}'.format(self.experiment.model,
                                                          self.experiment.experiment_name))

                    Utils.move_file(filepath, good)
                os.rmdir(dirpath)
            Log.debug('Done')

    def _remove_extra_output_folder(self):
        """Move the contents of ``<cmor_path>/output`` up into ``cmor_path`` if that folder exists."""
        bad_path = os.path.join(self.cmor_path, 'output')
        if os.path.exists(bad_path):
            Log.debug('Moving CMOR files out of the output folder')
            Utils.move_tree(bad_path, self.cmor_path)
            Log.debug('Done')

    def create_links(self, startdate, member=None):
        """
        Create links for every CMOR file of a startdate

        :param startdate: startdate whose files are linked
        :type startdate: str
        :param member: member to link. If None, files of all members are linked. Member 0 is a valid member.
        :type member: int|NoneType
        """
        # Compare against None: member 0 is a real member, not "all members"
        if member is not None:
            member_str = self._get_member_str(member)
        else:
            member_str = None
        Log.info('Creating links for CMOR files ({0})', startdate)
        path = self._get_startdate_path(startdate)
        for freq in os.listdir(path):
            frequency = Frequency.parse(freq)
            for domain in os.listdir(os.path.join(path, freq)):
                # NOTE(review): domain is the folder name (str) here, while other callers of _create_link pass
                # Domain objects. Confirm that _create_link handles both.
                for var in os.listdir(os.path.join(path, freq, domain)):
                    var_path = os.path.join(path, freq, domain, var)
                    for member_folder in os.listdir(var_path):
                        if member_str is not None and member_str != member_folder:
                            continue
                        member_path = os.path.join(var_path, member_folder)
                        for name in os.listdir(member_path):
                            filepath = os.path.join(member_path, name)
                            if os.path.isfile(filepath):
                                self._create_link(domain, filepath, frequency, var, "", False,
                                                  vartype=VariableType.MEAN)
                            else:
                                # One extra directory level (e.g. a grid folder): link its files
                                for filename in os.listdir(filepath):
                                    self._create_link(domain, os.path.join(filepath, filename), frequency, var,
                                                      "", False, vartype=VariableType.MEAN)
        Log.debug('Links ready')

    def _get_startdate_path(self, startdate):
        """
        Returns the path to the startdate's CMOR folder

        :param startdate: target startdate
        :type startdate: str
        :return: path to the startdate's CMOR folder
        :rtype: str
        """
        return os.path.join(self.config.data_dir, self.experiment.expid, 'cmorfiles', self.experiment.institute,
                            self.experiment.model, self.experiment.experiment_name, 'S' + startdate)

    def _get_member_str(self, member):
        """Return the CMOR ensemble label ``r<N>i1p1``, where N is 1 for the experiment's first member."""
        return 'r{0}i1p1'.format(member + 1 - self.experiment.member_count_start)