import glob
from datetime import datetime
import os

from autosubmit.config.log import Log
from autosubmit.date.chunk_date_lib import parse_date, chunk_start_date, chunk_end_date, previous_day

from earthdiagnostics.cmorizer import Cmorizer
from earthdiagnostics.datamanager import DataManager
from earthdiagnostics.utils import TempFile, Utils


class CMORManager(DataManager):
    """
    Data manager class for CMORized experiments
    """

    def get_file_path(self, startdate, member, domain, var, chunk, frequency,
                      box=None, grid=None, year=None, date_str=None):
        """
        Returns the path to a concrete file

        The time section of the file name is taken from the first of chunk, year and date_str
        that is provided, in that order of precedence.

        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param domain: file's domain
        :type domain: str
        :param var: file's var
        :type var: str
        :param chunk: file's chunk
        :type chunk: int
        :param frequency: file's frequency. If empty, the configured default frequency is used
        :type frequency: str
        :param box: file's box
        :type box: Box
        :param grid: file's grid
        :type grid: str
        :param year: file's year. Only used if chunk is None and only valid for frequency 'yr'
        :type year: int
        :param date_str: date string to add directly. Only used if neither chunk nor year are given
        :type date_str: str
        :return: path to the file
        :rtype: str
        :raises ValueError: if year is given with a frequency other than 'yr', or if chunk, year
                            and date_str are all missing
        """
        if not frequency:
            frequency = self.config.frequency
        var = self._get_final_var_name(box, var)
        domain_abbreviation = self.get_domain_abbreviation(domain, frequency)
        start = parse_date(startdate)
        member_plus = str(member + 1)

        member_path = os.path.join(self._get_startdate_path(startdate), frequency, domain)

        if chunk is not None:
            chunk_start = chunk_start_date(start, chunk, self.experiment.chunk_size, 'month', 'standard')
            chunk_end = chunk_end_date(chunk_start, self.experiment.chunk_size, 'month', 'standard')
            # Step back one day so the file name carries the last month covered by the chunk
            chunk_end = previous_day(chunk_end, 'standard')
            time_bound = "{0:04}{1:02}-{2:04}{3:02}".format(chunk_start.year, chunk_start.month,
                                                            chunk_end.year, chunk_end.month)
        elif year:
            # Value comparison: identity ("is not") against a string literal is not reliable
            if frequency != 'yr':
                raise ValueError('Year may be provided instead of chunk only if frequency is "yr"')
            time_bound = str(year)
        elif date_str:
            time_bound = date_str
        else:
            raise ValueError('Chunk, year and date_str can not be None at the same time')

        realization = 'r{0}i1p1'.format(member_plus)
        if grid:
            var_path = os.path.join(member_path, var, grid, realization)
        else:
            var_path = os.path.join(member_path, var, realization)

        file_name = '{0}_{1}_{2}_{3}_S{4}_r{5}i1p1_{6}.nc'.format(var, domain_abbreviation,
                                                                   self.experiment.model,
                                                                   self.experiment.experiment_name,
                                                                   startdate, member_plus, time_bound)
        return os.path.join(var_path, file_name)

    def link_file(self, domain, var, startdate, member, chunk=None, grid=None, box=None,
                  frequency=None, year=None, date_str=None, move_old=False):
        """
        Creates the link of a given file from the CMOR repository.

        :param domain: CMOR domain
        :type domain: str
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: str
        :param year: if frequency is yearly, this parameter is used to give the corresponding year
        :type year: int
        :param date_str: date string forwarded to get_file_path
        :type date_str: str
        :param move_old: flag forwarded to _create_link
        :type move_old: bool
        :return: None
        """
        var = self._get_final_var_name(box, var)

        if not frequency:
            frequency = self.config.frequency
        domain = DataManager.correct_domain(domain)
        # Keyword arguments are required here: get_file_path takes box before grid, so passing
        # grid, year and date_str positionally would shift every one of them by one parameter.
        # box is not forwarded because var has already been converted to its final name above.
        filepath = self.get_file_path(startdate, member, domain, var, chunk, frequency,
                                      grid=grid, year=year, date_str=date_str)
        self._create_link(domain, filepath, frequency, var, grid, move_old)

    def get_year(self, domain, var, startdate, member, year, grid=None, box=None):
        """
        Get a file containing all the data for one year for one variable

        :param domain: variable's domain
        :type domain: str
        :param var: variable's name
        :type var: str
        :param startdate: startdate to retrieve
        :type startdate: str
        :param member: member to retrieve
        :type member: int
        :param year: year to retrieve
        :type year: int
        :param grid: variable's grid
        :type grid: str
        :param box: variable's box
        :type box: Box
        :return: path to a temporary file with the time steps selected for the requested year
        :rtype: str
        """
        chunk_files = [self.get_file(domain, var, startdate, member, chunk, grid=grid, box=box)
                       for chunk in self.experiment.get_year_chunks(startdate, year)]

        if len(chunk_files) > 1:
            # Several chunks cover the year: concatenate them and drop the individual copies
            temp = TempFile.get()
            Utils.nco.ncrcat(input=' '.join(chunk_files), output=temp)
            for chunk_file in chunk_files:
                os.remove(chunk_file)
        else:
            temp = chunk_files[0]

        temp2 = TempFile.get()
        handler = Utils.openCdf(temp)
        try:
            time = Utils.get_datetime_from_netcdf(handler)
        finally:
            handler.close()

        start = None
        end = None
        for index, date in enumerate(time):
            if date.year != year:
                continue
            if date.month == 1:
                # Keep the first January step; overwriting it on every January step would
                # discard the beginning of the year for sub-monthly data
                if start is None:
                    start = index
            elif date.month == 12:
                # Keep the last December step
                end = index

        Utils.nco.ncks(input=temp, output=temp2, options='-O -d time,{0},{1}'.format(start, end))
        os.remove(temp)
        return temp2

    def _is_cmorized(self, startdate, member):
        """
        Checks if there is at least one variable folder with data for the given member

        :param startdate: startdate to check
        :type startdate: str
        :param member: member to check
        :type member: int
        :return: True if any frequency/domain/variable folder of the startdate contains the
                 member's realization folder, False otherwise
        :rtype: bool
        """
        startdate_path = self._get_startdate_path(startdate)
        if not os.path.exists(startdate_path):
            return False

        realization = 'r{0}i1p1'.format(member + 1)
        for freq in os.listdir(startdate_path):
            freq_path = os.path.join(startdate_path, freq)
            for domain in os.listdir(freq_path):
                domain_path = os.path.join(freq_path, domain)
                for var in os.listdir(domain_path):
                    if os.path.exists(os.path.join(domain_path, var, realization)):
                        return True
        return False

    def _find_cmor_packages(self, file_name):
        """
        Looks for packed CMOR files in the folders where they may have been stored

        :param file_name: glob pattern for the file names to look for
        :type file_name: str
        :return: paths of all the files matching the pattern
        :rtype: list[str]
        """
        tar_path = os.path.join(self.config.data_dir, self.experiment.expid, 'original_files', 'cmorfiles')
        tar_original_files = os.path.join(self.config.data_dir, 'original_files', self.experiment.expid,
                                          'cmorfiles')
        filepaths = []
        for folder in (tar_path, os.path.join(tar_path, 'outputs'),
                       tar_original_files, os.path.join(tar_original_files, 'outputs')):
            filepaths += glob.glob(os.path.join(folder, file_name))
        return filepaths

    # noinspection PyPep8Naming
    def prepare_CMOR_files(self):
        """
        Prepares the data to be used by the diagnostic.

        For every startdate and member of the experiment:

        - if it is already cmorized and cmorization is not forced, nothing is done
        - if cmorization is not forced and packed CMOR files are found, they are unpacked,
          their paths corrected and the links created
        - otherwise, the ocean and atmosphere cmorization is launched

        :return: None
        """
        # Check if cmorized and convert if not
        for startdate, member in self.experiment.get_member_list():
            if self._is_cmorized(startdate, member) and not self.config.cmor.force:
                continue

            member_str = self.experiment.get_member_str(member)
            if not self.config.cmor.force:
                # NOTE(review): member_str is passed to format() but the patterns below have no
                # {2} placeholder, so packages of every member of the startdate are matched.
                # Confirm against the package naming convention before changing the pattern.
                file_name = 'CMOR?_{0}_{1}_*.tar.gz'.format(self.experiment.expid, startdate, member_str)
                filepaths = self._find_cmor_packages(file_name)
                if len(filepaths) > 0:
                    Log.info('Unzipping cmorized data...')
                    Utils.unzip(filepaths, True)

                if not os.path.exists(self.cmor_path):
                    os.mkdir(self.cmor_path)

                file_name = 'CMOR?_{0}_{1}_*.tar'.format(self.experiment.expid, startdate, member_str)
                filepaths = self._find_cmor_packages(file_name)
                if len(filepaths) > 0:
                    Log.info('Unpacking cmorized data...')
                    Utils.untar(filepaths, self.cmor_path)
                    self._correct_paths(startdate)
                    self._create_links(startdate)
                    continue

            start_time = datetime.now()
            # {2} is the start time; the message previously repeated {0} (the startdate) there
            Log.info('CMORizing startdate {0} member {1}. Starting at {2}', startdate, member_str, start_time)
            cmorizer = Cmorizer(self, startdate, member)
            cmorizer.cmorize_ocean()
            cmorizer.cmorize_atmos()
            # The elapsed time was passed as an argument but never shown in the message
            Log.result('CMORized startdate {0} member {1}! Elapsed time: {2}\n\n', startdate, member_str,
                       datetime.now() - start_time)

    def _correct_paths(self, startdate):
        """
        Fixes the folder layout and file names of freshly unpacked CMOR files

        Moves the institute folder out of an 'output' folder if there is one and, when the
        experiment name differs from the model name, renames the files and folders found
        under institute/model/model so they use the experiment name.

        :param startdate: startdate to insert in the corrected file names
        :type startdate: str
        """
        bad_path = os.path.join(self.cmor_path, 'output', self.experiment.institute)
        if os.path.exists(bad_path):
            Log.debug('Moving CMOR files out of the output folder')
            Utils.execute_shell_command(['mv', bad_path, os.path.join(bad_path, '..', '..')])
            os.rmdir(os.path.join(self.cmor_path, 'output'))
            Log.debug('Done')

        if self.experiment.experiment_name != self.experiment.model:
            bad_path = os.path.join(self.cmor_path, self.experiment.institute, self.experiment.model,
                                    self.experiment.model)
            Log.debug('Correcting double model appearance')
            old_name_part = '_{0}_output_'.format(self.experiment.model)
            new_name_part = '_{0}_{1}_S{2}_'.format(self.experiment.model, self.experiment.experiment_name,
                                                    startdate)
            old_folder_part = '/{0}/{0}'.format(self.experiment.model)
            new_folder_part = '/{0}/{1}'.format(self.experiment.model, self.experiment.experiment_name)

            # Bottom-up walk: every folder is emptied before it is removed
            for (dirpath, dirnames, filenames) in os.walk(bad_path, False):
                for filename in filenames:
                    filepath = os.path.join(dirpath, filename)
                    good = filepath.replace(old_name_part, new_name_part)
                    good = good.replace(old_folder_part, new_folder_part)
                    Utils.move_file(filepath, good)
                os.rmdir(dirpath)
            Log.debug('Done')

    def _create_links(self, startdate):
        """
        Creates the links for all the CMOR files of a startdate

        Walks the frequency/domain/variable/member folders of the startdate. Entries in a
        member folder that are not files are treated as folders and their content is linked.

        :param startdate: startdate to link
        :type startdate: str
        """
        Log.info('Creating links for CMOR files ()')
        path = self._get_startdate_path(startdate)
        for freq in os.listdir(path):
            freq_path = os.path.join(path, freq)
            for domain in os.listdir(freq_path):
                domain_path = os.path.join(freq_path, domain)
                for var in os.listdir(domain_path):
                    var_path = os.path.join(domain_path, var)
                    for member in os.listdir(var_path):
                        member_path = os.path.join(var_path, member)
                        for name in os.listdir(member_path):
                            filepath = os.path.join(member_path, name)
                            if os.path.isfile(filepath):
                                self._create_link(domain, filepath, freq, var, "", False)
                            else:
                                for filename in os.listdir(filepath):
                                    self._create_link(domain, os.path.join(filepath, filename), freq, var,
                                                      "", False)
        Log.info('Links for CMOR files created')

    def _get_startdate_path(self, startdate):
        """
        Returns the path to the startdate's CMOR folder

        :param startdate: target startdate
        :type startdate: str
        :return: path to the startdate's CMOR folder
        :rtype: str
        """
        return os.path.join(self.config.data_dir, self.experiment.expid, 'cmorfiles',
                            self.experiment.institute, self.experiment.model,
                            self.experiment.experiment_name, 'S' + startdate)