# coding=utf-8
import glob
from datetime import datetime

import os
from bscearth.utils.log import Log
from bscearth.utils.date import parse_date, chunk_start_date, chunk_end_date, previous_day

from datafile import StorageStatus
from diagnostic import Diagnostic
from earthdiagnostics.cmorizer import Cmorizer
from earthdiagnostics.datamanager import DataManager
from earthdiagnostics.frequency import Frequencies, Frequency
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.utils import TempFile, Utils
from earthdiagnostics.variable_type import VariableType


class CMORManager(DataManager):
    """
    Data manager class for CMORized experiments
    """

    def __init__(self, config):
        """
        Locates the experiment's data folder and sets up the CMOR files path.

        ``config.data_dir`` is read as a ':'-separated list of candidate folders. For each candidate, three
        layouts are tried in order and the first one containing a folder named after the experiment id wins:
        ``<folder>``, ``<folder>/<model without dashes>`` and ``<folder>/<data_type>/<model folder>``.
        ``config.data_dir`` is overwritten with the selected folder.

        :param config: diagnostics configuration
        :raises Exception: if none of the candidate folders contains the experiment
        """
        super(CMORManager, self).__init__(config)
        # Cache of is_cmorized results keyed by (startdate, member, chunk)
        self._dic_cmorized = dict()
        data_folders = self.config.data_dir.split(':')
        experiment_folder = self.experiment.model.lower()
        if experiment_folder.startswith('ec-earth'):
            experiment_folder = 'ecearth'

        self.config.data_dir = None
        for data_folder in data_folders:
            if os.path.isdir(os.path.join(data_folder, self.experiment.expid)):
                self.config.data_dir = data_folder
                break
            test_folder = os.path.join(data_folder, self.experiment.model.lower().replace('-', ''))
            if os.path.isdir(os.path.join(test_folder, self.experiment.expid)):
                self.config.data_dir = test_folder
                break

            test_folder = os.path.join(data_folder, self.config.data_type, experiment_folder)
            if os.path.isdir(os.path.join(test_folder, self.experiment.expid)):
                self.config.data_dir = test_folder
                break

        if not self.config.data_dir:
            raise Exception('Can not find model data')
        self.cmor_path = os.path.join(self.config.data_dir, self.experiment.expid, 'cmorfiles')

    def file_exists(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
                    vartype=VariableType.MEAN, possible_versions=None):
        """
        Checks if a chunk file exists in the CMOR repository

        :param domain: CMOR domain
        :type domain: Domain
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str|NoneType
        :param box: unused by this implementation
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: Frequency|NoneType
        :param vartype: unused by this implementation
        :param possible_versions: if given, the configured CMOR version in the path is replaced by each of
                                  these versions in turn and True is returned if any of those files exists
        :type possible_versions: list[str]|NoneType
        :return: True if the file (or one of its versions) exists, False otherwise
        :rtype: bool
        """
        cmor_var = self.variable_list.get_variable(var)
        filepath = self.get_file_path(startdate, member, domain, var, cmor_var, chunk, frequency, grid, None, None)

        if possible_versions is None:
            # noinspection PyBroadException
            try:
                return os.path.isfile(filepath)
            except Exception:
                return False
        else:
            for version in possible_versions:
                # noinspection PyBroadException
                try:
                    if os.path.isfile(filepath.replace(self.config.cmor.version, version)):
                        return True
                except Exception:
                    pass
            return False

    def request_chunk(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None, vartype=None):
        """
        Requests a given chunk file from the CMOR repository

        :param domain: CMOR domain
        :type domain: Domain
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str|NoneType
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: Frequency|NoneType
        :param vartype: unused by this implementation
        :return: whatever ``_get_file_from_storage`` returns for the file's path
        """
        cmor_var = self.variable_list.get_variable(var)
        var = self._get_final_var_name(box, var)
        filepath = self.get_file_path(startdate, member, domain, var, cmor_var, chunk, frequency, grid, None, None)

        return self._get_file_from_storage(filepath)

    def request_year(self, diagnostic, domain, var, startdate, member, year, grid=None, box=None, frequency=None):
        """
        Requests a yearly file built by merging the chunk files covering that year

        A MergeYear job is created; it requests the chunk files and declares the yearly file. The job is
        appended to ``diagnostic.subjobs`` only the first time this yearly file is requested.

        :param diagnostic: diagnostic that needs the yearly file
        :type diagnostic: Diagnostic
        :param domain: CMOR domain
        :type domain: Domain
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param year: year to extract
        :type year: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str|NoneType
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: Frequency|NoneType
        :return: the yearly file declared by the MergeYear job
        """
        job = MergeYear(self, domain, var, startdate, member, year, grid, box, frequency)
        job.request_data()
        job.declare_data_generated()
        # The same yearly file may be requested by several diagnostics: only one merge job must run
        if not job.year_file.job_added:
            diagnostic.subjobs.append(job)
            job.year_file.job_added = True
        return job.year_file

    def declare_chunk(self, domain, var, startdate, member, chunk, grid=None, region=None, box=None, frequency=None,
                      vartype=VariableType.MEAN, diagnostic=None):
        """
        Declares a chunk file that will be generated by a diagnostic

        :param domain: CMOR domain
        :type domain: Domain
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str|NoneType
        :param region: region of the generated data, forwarded to the generated file declaration
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (defaults to the configured frequency)
        :type frequency: Frequency|NoneType
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        :param diagnostic: diagnostic that generates the file
        :return: the declared file, with its frequency set
        """
        if not frequency:
            frequency = self.config.frequency
        original_name = var
        cmor_var = self.variable_list.get_variable(var)
        if cmor_var:
            var = cmor_var.short_name
        final_name = self._get_final_var_name(box, var)

        filepath = self.get_file_path(startdate, member, domain, final_name, cmor_var, chunk, frequency, grid)
        netcdf_file = self._declare_generated_file(filepath, domain, final_name, cmor_var,
                                                   self.config.data_convention, region, diagnostic, grid, vartype,
                                                   original_name)
        netcdf_file.frequency = frequency
        return netcdf_file

    def declare_year(self, domain, var, startdate, member, year, grid=None, box=None, vartype=VariableType.MEAN,
                     diagnostic=None):
        """
        Declares a yearly file that will be generated by a diagnostic

        :param domain: CMOR domain
        :type domain: Domain
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param year: file's year
        :type year: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str|NoneType
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        :param diagnostic: diagnostic that generates the file
        :return: the declared file, with yearly frequency
        """
        original_name = var
        cmor_var = self.variable_list.get_variable(var)
        if cmor_var:
            var = cmor_var.short_name
        final_name = self._get_final_var_name(box, var)

        filepath = self.get_file_path(startdate, member, domain, final_name, cmor_var, None, Frequencies.yearly, grid,
                                      year=year)
        netcdf_file = self._declare_generated_file(filepath, domain, final_name, cmor_var,
                                                   self.config.data_convention, None, diagnostic, grid, vartype,
                                                   original_name)
        netcdf_file.frequency = Frequencies.yearly
        return netcdf_file

    def get_file_path(self, startdate, member, domain, var, cmor_var, chunk, frequency, grid=None, year=None,
                      date_str=None):
        """
        Returns the path to a concrete file

        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param domain: file's domain
        :type domain: Domain
        :param var: file's var
        :type var: str
        :param cmor_var: variable instance describing the selected variable (may be None)
        :type cmor_var: Variable|NoneType
        :param chunk: file's chunk
        :type chunk: int|NoneType
        :param frequency: file's frequency (defaults to the configured frequency)
        :type frequency: Frequency|NoneType
        :param grid: file's grid
        :type grid: str|NoneType
        :param year: file's year. Only allowed, instead of chunk, for yearly frequency
        :type year: int|str|NoneType
        :param date_str: date string to add directly. Used only if chunk and year are not given
        :type date_str: str|NoneType
        :return: path to the file
        :rtype: str
        :raises ValueError: if chunk, year and date_str are all empty, or year is given for non-yearly data
        """
        if not frequency:
            frequency = self.config.frequency

        folder_path = self._get_full_cmor_folder_path(startdate, member, domain, var, frequency, grid, cmor_var)
        file_name = self._get_cmor_file_name(startdate, member, domain, var, cmor_var, frequency,
                                             chunk, year, date_str, grid)

        return os.path.join(folder_path, file_name)

    def _get_cmor_table(self, domain, cmor_var, frequency):
        # Table of the variable if known, otherwise the domain's table for this frequency
        if cmor_var is None:
            return domain.get_table(frequency, self.config.data_convention)
        return cmor_var.get_table(frequency, self.config.data_convention)

    def _get_default_grid(self, domain):
        # Default grid for conventions that always include the grid in names and paths
        if domain in [ModelingRealms.ocnBgchem, ModelingRealms.seaIce, ModelingRealms.ocean]:
            return self.config.cmor.default_ocean_grid
        return self.config.cmor.default_atmos_grid

    def _get_cmor_file_name(self, startdate, member, domain, var, cmor_var, frequency, chunk, year, date_str, grid):
        """
        Builds the CMOR file name for the configured data convention

        The time bound is taken, by priority, from chunk, then year, then date_str.

        :raises ValueError: if year is given for non-yearly data or no time reference is given
        :raises Exception: if the data convention is not supported
        """
        cmor_table = self._get_cmor_table(domain, cmor_var, frequency)

        if chunk is not None:
            time_bound = self._get_chunk_time_bounds(startdate, chunk)
        elif year:
            if frequency != Frequencies.yearly:
                raise ValueError('Year may be provided instead of chunk only if frequency is "yr"')
            time_bound = str(year)
        elif date_str:
            time_bound = date_str
        else:
            raise ValueError('Chunk, year and date_str can not be None at the same time')

        if time_bound:
            time_bound = '_{0}.nc'.format(time_bound)
        else:
            time_bound = '.nc'

        if self.config.data_convention in ('specs', 'preface'):
            file_name = '{0}_{1}_{2}_{3}_S{4}_{5}{6}'.format(var, cmor_table.name, self.experiment.model,
                                                            self.experiment.experiment_name, startdate,
                                                            self._get_member_str(member), time_bound)
        elif self.config.data_convention in ('primavera', 'cmip6'):
            if not grid:
                grid = self._get_default_grid(domain)
            file_name = '{0}_{1}_{2}_{3}_{4}_{5}{6}'.format(var, cmor_table.name, self.experiment.experiment_name,
                                                           self.experiment.model, self._get_member_str(member),
                                                           grid, time_bound)
        else:
            raise Exception('Data convention {0} not supported'.format(self.config.data_convention))
        return file_name

    def _get_full_cmor_folder_path(self, startdate, member, domain, var, frequency, grid, cmor_var):
        """
        Builds the folder holding a CMOR file for the configured data convention

        specs/preface: <startdate>/<frequency>/<domain>/<var>[/<grid>]/<member>[/<version>]
        otherwise: <startdate>/<member>/<table>/<var>/<grid>/<version>
        """
        if self.config.data_convention in ('specs', 'preface'):
            folder_path = os.path.join(self._get_startdate_path(startdate), str(frequency), domain.name, var)
            if grid:
                folder_path = os.path.join(folder_path, grid)
            folder_path = os.path.join(folder_path, self._get_member_str(member))
            if self.config.cmor.version:
                folder_path = os.path.join(folder_path, self.config.cmor.version)
        else:
            if not grid:
                grid = self._get_default_grid(domain)
            # Same table resolution as the file name, so a missing Variable does not crash here
            cmor_table = self._get_cmor_table(domain, cmor_var, frequency)
            folder_path = os.path.join(self._get_startdate_path(startdate), self._get_member_str(member),
                                       cmor_table.name, var, grid, self.config.cmor.version)
        return folder_path

    def _get_chunk_time_bounds(self, startdate, chunk):
        """
        Returns the 'YYYYMM<sep>YYYYMM' time bound of a chunk ('_' separator for preface, '-' otherwise)
        """
        start = parse_date(startdate)
        chunk_start = chunk_start_date(start, chunk, self.experiment.chunk_size, 'month', self.experiment.calendar)
        chunk_end = chunk_end_date(chunk_start, self.experiment.chunk_size, 'month', self.experiment.calendar)
        chunk_end = previous_day(chunk_end, self.experiment.calendar)
        if self.config.data_convention == 'preface':
            separator = '_'
        else:
            separator = '-'
        return "{0:04}{1:02}{4}{2:04}{3:02}".format(chunk_start.year, chunk_start.month, chunk_end.year,
                                                    chunk_end.month, separator)

    def link_file(self, domain, var, cmor_var, startdate, member, chunk=None, grid=None,
                  frequency=None, year=None, date_str=None, move_old=False, vartype=VariableType.MEAN):
        """
        Creates the link of a given file from the CMOR repository.

        :param domain: CMOR domain
        :type domain: Domain
        :param var: variable name
        :type var: str
        :param cmor_var: variable instance describing the selected variable
        :type cmor_var: Variable
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: Frequency
        :param year: if frequency is yearly, this parameter is used to give the corresponding year
        :type year: int
        :param date_str: date string used in the file name when neither chunk nor year are given
        :type date_str: str
        :param move_old: forwarded to ``create_link``
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        """
        if not frequency:
            frequency = self.config.frequency
        # str(None) is the truthy 'None': it would hijack the year branch when only date_str is given
        if year is not None:
            year = str(year)
        filepath = self.get_file_path(startdate, member, domain, var, cmor_var, chunk, frequency,
                                      grid=grid, year=year, date_str=date_str)
        self.create_link(domain, filepath, frequency, var, grid, move_old, vartype)

    # noinspection PyPep8Naming
    def prepare(self):
        """
        Prepares the data to be used by the diagnostic.

        For every startdate and member, packed CMOR files are unpacked if available; if the member
        is not considered ready after that, it is cmorized.
        """
        # Check if cmorized and convert if not
        for startdate, member in self.experiment.get_member_list():
            if not self._unpack_cmor_files(startdate, member):
                self._cmorize_member(startdate, member)

    def is_cmorized(self, startdate, member, chunk):
        """
        Checks (with caching) if a given chunk has CMOR files

        :return: True if at least one CMOR file for the chunk was found
        :rtype: bool
        """
        identifier = (startdate, member, chunk)
        if identifier not in self._dic_cmorized:
            self._dic_cmorized[identifier] = self._is_cmorized(startdate, member, chunk)
        return bool(self._dic_cmorized[identifier])

    def _is_cmorized(self, startdate, member, chunk):
        # Returns True as soon as one CMOR file for this chunk is found on disk
        startdate_path = self._get_startdate_path(startdate)
        if not os.path.isdir(startdate_path):
            return False
        if self.config.data_convention == 'specs':
            # NOTE(review): the original list had ocnBgchem twice; seaIce was probably intended - confirm
            for freq in os.listdir(startdate_path):
                for domain in (ModelingRealms.ocean, ModelingRealms.ocnBgchem, ModelingRealms.atmos):
                    domain_path = os.path.join(startdate_path, freq, domain.name)
                    if os.path.isdir(domain_path):
                        for var in os.listdir(domain_path):
                            cmor_var = self.variable_list.get_variable(var, True)
                            var_path = self.get_file_path(startdate, member, domain, var, cmor_var, chunk,
                                                          Frequency(freq))
                            if os.path.isfile(var_path):
                                return True
        else:
            member_path = os.path.join(startdate_path, self._get_member_str(member))
            if not os.path.isdir(member_path):
                return False
            for table, domain, freq in (('Amon', ModelingRealms.atmos, Frequencies.monthly),
                                        ('Omon', ModelingRealms.ocean, Frequencies.monthly),
                                        ('SImon', ModelingRealms.seaIce, Frequencies.monthly)):
                table_dir = os.path.join(member_path, table)
                if not os.path.isdir(table_dir):
                    continue
                for var in os.listdir(table_dir):
                    cmor_var = self.variable_list.get_variable(var, True)
                    var_path = self.get_file_path(startdate, member, domain, var, cmor_var, chunk, frequency=freq)
                    if os.path.isfile(var_path):
                        return True
        return False

    def _cmorize_member(self, startdate, member):
        # Runs the ocean and atmosphere cmorization for one startdate/member and logs the elapsed time
        start_time = datetime.now()
        member_str = self.experiment.get_member_str(member)
        Log.info('CMORizing startdate {0} member {1}. Starting at {2}', startdate, member_str, start_time)
        cmorizer = Cmorizer(self, startdate, member)
        cmorizer.cmorize_ocean()
        cmorizer.cmorize_atmos()
        Log.result('CMORized startdate {0} member {1}! Elapsed time: {2}\n\n', startdate, member_str,
                   datetime.now() - start_time)

    def _unpack_cmor_files(self, startdate, member):
        """
        Unpacks the transferred CMOR files of a member, chunk by chunk

        :return: True if the member is considered ready (no cmorization needed), False otherwise
        :rtype: bool
        """
        if self.config.cmor.force:
            return False
        chunk = 1
        cmorized = False

        if not self.config.cmor.force_untar:
            # Skip the chunks already available on disk
            while self.is_cmorized(startdate, member, chunk):
                chunk += 1

        while self._unpack_chunk(startdate, member, chunk):
            chunk += 1
            cmorized = True

        # NOTE(review): 'chunk' is the first chunk not found; '<=' also accepts a missing last chunk - confirm
        if self.experiment.num_chunks <= chunk:
            cmorized = True
        if cmorized:
            Log.info('Startdate {0} member {1} ready', startdate, member)
        return cmorized

    def _unpack_chunk(self, startdate, member, chunk):
        """
        Unzips and untars the transferred CMOR files of a chunk, if any

        Files are only processed if the cmorization of the chunk was requested in the configuration.

        :return: True if transferred files for the chunk were found, False otherwise
        :rtype: bool
        """
        filepaths = self._get_transferred_cmor_data_filepaths(startdate, member, chunk, 'tar.gz')
        if len(filepaths) > 0:
            if self.config.cmor.chunk_cmorization_requested(chunk):
                Log.info('Unzipping cmorized data for {0} {1} {2}...', startdate, member, chunk)
                Utils.unzip(filepaths, True)
            else:
                return True

        if not os.path.exists(self.cmor_path):
            os.mkdir(self.cmor_path)

        filepaths = self._get_transferred_cmor_data_filepaths(startdate, member, chunk, 'tar')
        if len(filepaths) > 0:
            if self.config.cmor.chunk_cmorization_requested(chunk):
                Log.info('Unpacking cmorized data for {0} {1} {2}...', startdate, member, chunk)
                Utils.untar(filepaths, self.cmor_path)
                self._correct_paths(startdate)
                self.create_links(startdate, member)
            return True
        return False

    def _get_transferred_cmor_data_filepaths(self, startdate, member, chunk, extension):
        # Globs the transferred CMOR packages of a chunk in the two known 'original_files' layouts,
        # looking both directly in them and in their 'outputs' subfolder
        tar_path = os.path.join(self.config.data_dir, self.experiment.expid, 'original_files', 'cmorfiles')
        tar_original_files = os.path.join(self.config.data_dir, 'original_files', self.experiment.expid,
                                          'cmorfiles')
        file_name = 'CMOR?_{0}_{1}_{2}_{3}-*.{4}'.format(self.experiment.expid, startdate,
                                                         self.experiment.get_member_str(member),
                                                         self.experiment.get_chunk_start_str(startdate, chunk),
                                                         extension)
        filepaths = []
        for folder in (tar_path, tar_original_files):
            filepaths += glob.glob(os.path.join(folder, file_name))
            filepaths += glob.glob(os.path.join(folder, 'outputs', file_name))
        return filepaths

    def _correct_paths(self, startdate):
        # Fixes known layout problems of unpacked CMOR files
        self._remove_extra_output_folder()
        self._fix_model_as_experiment_error(startdate)

    def _fix_model_as_experiment_error(self, startdate):
        # Moves files stored under <institute>/<model>/<model> (model used as experiment name)
        # to <institute>/<model>/<experiment> and fixes their names accordingly
        if self.experiment.experiment_name != self.experiment.model:
            bad_path = os.path.join(self.cmor_path, self.experiment.institute, self.experiment.model,
                                    self.experiment.model)
            Log.debug('Correcting double model appearance')
            # Bottom-up walk, so subfolders are handled before their parents
            for (dirpath, dirnames, filenames) in os.walk(bad_path, False):
                for filename in filenames:
                    if '_S{0}_'.format(startdate) in filename:
                        continue
                    filepath = os.path.join(dirpath, filename)
                    good = filepath
                    good = good.replace('_{0}_output_'.format(self.experiment.model),
                                        '_{0}_{1}_S{2}_'.format(self.experiment.model,
                                                                self.experiment.experiment_name, startdate))
                    good = good.replace('/{0}/{0}'.format(self.experiment.model),
                                        '/{0}/{1}'.format(self.experiment.model, self.experiment.experiment_name))
                    Utils.move_file(filepath, good)
                # Skipped (already correct) files may remain: only remove folders that are now empty
                if not os.listdir(dirpath):
                    os.rmdir(dirpath)
            Log.debug('Done')

    def _remove_extra_output_folder(self):
        # Moves the content of <cmor_path>/output, if present, up to <cmor_path>
        bad_path = os.path.join(self.cmor_path, 'output')
        if os.path.exists(bad_path):
            Log.debug('Moving CMOR files out of the output folder')
            Utils.move_tree(bad_path, self.cmor_path)
            Log.debug('Done')

    def create_links(self, startdate, member=None):
        """
        Creates links for the CMOR files of a startdate

        Walks <startdate>/<frequency>/<domain>/<var>/<member> and links every file found there or one
        level below.

        :param startdate: startdate to process
        :type startdate: str
        :param member: member to process. If None, all members are processed
        :type member: int|NoneType
        """
        if member is not None:
            member_str = self._get_member_str(member)
        else:
            member_str = None
        Log.info('Creating links for CMOR files ({0})', startdate)
        path = self._get_startdate_path(startdate)
        for freq in os.listdir(path):
            frequency = Frequency.parse(freq)
            for domain in os.listdir(os.path.join(path, freq)):
                for var in os.listdir(os.path.join(path, freq, domain)):
                    for member_folder in os.listdir(os.path.join(path, freq, domain, var)):
                        # Filter only when a member was requested; None means every member
                        if member_str is not None and member_str != member_folder:
                            continue
                        member_path = os.path.join(path, freq, domain, var, member_folder)
                        for name in os.listdir(member_path):
                            filepath = os.path.join(member_path, name)
                            if os.path.isfile(filepath):
                                self.create_link(domain, filepath, frequency, var, "", False,
                                                 vartype=VariableType.MEAN)
                            else:
                                for filename in os.listdir(filepath):
                                    self.create_link(domain, os.path.join(filepath, filename), frequency, var, "",
                                                     False, vartype=VariableType.MEAN)
        Log.debug('Links ready')

    def _get_startdate_path(self, startdate):
        """
        Returns the path to the startdate's CMOR folder

        For cmip6/primavera-like conventions the returned folder is the experiment one, as those
        conventions do not have a startdate level.

        :param startdate: target startdate
        :type startdate: str
        :return: path to the startdate's CMOR folder
        :rtype: str
        """
        if self.config.data_convention == 'specs':
            return os.path.join(self.config.data_dir, self.experiment.expid, 'cmorfiles', self.experiment.institute,
                                self.experiment.model, self.experiment.experiment_name, 'S' + startdate)
        elif self.config.data_convention == 'preface':
            return os.path.join(self.config.data_dir, self.experiment.expid, 'cmorfiles', self.experiment.institute,
                                self.experiment.experiment_name, 'S' + startdate)
        else:
            return os.path.join(self.config.data_dir, self.experiment.expid, 'cmorfiles', self.config.cmor.activity,
                                self.experiment.institute, self.experiment.model, self.experiment.experiment_name)

    def _get_member_str(self, member):
        """
        Returns the member identifier ('r<n>i<init>p1' or 'r<n>i<init>p1f1', depending on the convention)

        :raises Exception: if the data convention is not supported
        """
        if self.config.data_convention in ('specs', 'preface'):
            template = 'r{0}i{1}p1'
        elif self.config.data_convention in ('primavera', 'cmip6'):
            template = 'r{0}i{1}p1f1'
        else:
            raise Exception('Data convention {0} not supported'.format(self.config.data_convention))

        return template.format(member + 1 - self.experiment.member_count_start,
                               self.config.cmor.initialization_number)


class MergeYear(Diagnostic):
    """
    Auxiliary job that builds a yearly file from the chunk files covering that year
    """

    @classmethod
    def generate_jobs(cls, diags, options):
        pass

    def __init__(self, data_manager, domain, var, startdate, member, year, grid=None, box=None, frequency=None):
        super(MergeYear, self).__init__(data_manager)
        self.chunk_files = []
        self.experiment = self.data_manager.experiment
        self.domain = domain
        self.var = var
        self.startdate = startdate
        self.member = member
        self.year = year
        self.grid = grid
        self.box = box
        self.frequency = frequency

    def request_data(self):
        # Requests every chunk that contains data of the target year
        for chunk in self.experiment.get_year_chunks(self.startdate, self.year):
            self.chunk_files.append(self.request_chunk(self.domain, self.var, self.startdate, self.member, chunk,
                                                       grid=self.grid, box=self.box, frequency=self.frequency))

    def declare_data_generated(self):
        # The yearly file is only used locally: it is never stored in the repository
        self.year_file = self.declare_year(self.domain, self.var, self.startdate, self.member, self.year,
                                           grid=self.grid, box=self.box)
        self.year_file.storage_status = StorageStatus.NO_STORE

    def compute(self):
        """
        Merges the chunk files and keeps only the time steps of the target year
        """
        temp = self._merge_chunk_files()
        temp2 = self._select_data_of_given_year(temp)
        self.year_file.set_local_file(temp2)

    def _select_data_of_given_year(self, data_file):
        """
        Extracts the contiguous run of time steps belonging to self.year into a new temporary file

        :raises ValueError: if no time step of self.year is found
        """
        temp2 = TempFile.get()
        handler = Utils.openCdf(data_file)
        try:
            times = Utils.get_datetime_from_netcdf(handler)
            first_index = None
            last_index = None
            for index in range(times.size):
                if first_index is None:
                    if times[index].year == self.year:
                        first_index = index
                elif times[index].year != self.year:
                    last_index = index
                    break
        finally:
            handler.close()

        if first_index is None:
            raise ValueError('Year {0} not found in {1}'.format(self.year, data_file))
        if last_index is None:
            last_index = times.size
        # ncks -d limits are inclusive indices
        Utils.nco.ncks(input=data_file, output=temp2, options=['-d time,{0},{1}'.format(first_index,
                                                                                       last_index - 1)])
        return temp2

    def _merge_chunk_files(self):
        """
        Concatenates the local copies of the requested chunk files into a new temporary file

        The requested files are left untouched, as they may be shared with other jobs.
        """
        temp = TempFile.get()
        if len(self.chunk_files) == 1:
            Utils.copy_file(self.chunk_files[0].local_file, temp)
            return temp

        local_files = [chunk_file.local_file for chunk_file in self.chunk_files]
        Utils.nco.ncrcat(input=' '.join(local_files), output=temp)
        return temp

    def __str__(self):
        return 'Create year CMOR file Startdate: {0.startdate} Member: {0.member} Year: {0.year} ' \
               'Variable: {0.domain}:{0.var} Grid: {0.grid} Box: {0.box}'.format(self)

    def __eq__(self, other):
        # Other job types lack these attributes: they are never equal to a MergeYear
        if not isinstance(other, MergeYear):
            return False
        return (self.startdate == other.startdate and self.member == other.member and self.year == other.year and
                self.domain == other.domain and self.var == other.var and self.grid == other.grid and
                self.box == other.box)