import os import shutil import re import threading from bscearth.utils.date import parse_date, chunk_start_date, chunk_end_date, previous_day, add_hours from bscearth.utils.log import Log from earthdiagnostics.frequency import Frequency, Frequencies from earthdiagnostics.modelingrealm import ModelingRealms from earthdiagnostics.variable import VariableType from earthdiagnostics.utils import Utils class DataConvention(object): """Base class to manage filename conventions""" def __init__(self, name, config): self.config = config self.name = name self.lat_name = 'lat' self.lon_name = 'lon' self.time_separator = '-' self.lock = threading.Lock() self._checked_vars = list() def get_file_path(self, startdate, member, domain, var, cmor_var, chunk, frequency, grid=None, year=None, date_str=None): """ Return the path to a concrete file Parameters ---------- startdate: str member: int domain: ModelingRealm var: str cmor_var: Variable chunk: int or None frequency: Frequency or str grid: str or None year: int or None date_str: str or None Returns ------- str Raises ------ ValueError If you provide two or more parameters from chunk, year or date_str or none at all """ if frequency is None: frequency = self.config.frequency frequency = Frequency.parse(frequency) folder_path = self.get_cmor_folder_path(startdate, member, domain, var, frequency, grid, cmor_var) file_name = self.get_file_name(startdate, member, domain, var, cmor_var, frequency, chunk, year, date_str, grid) filepath = os.path.join(folder_path, file_name) return filepath def get_file_name(self, startdate, member, domain, var, cmor_var, frequency, chunk, year, date_str, grid, ): """ Get filename for a given configuration Parameters ---------- startdate: str member: int domain: ModelingRealm var: str cmor_var: Variable frequency: Frequency chunk: int or None year: int or None date_str: str or None grid: str or None Returns ------- str Raises ------ NotImplementedError: If not implemented by derived classes """ raise NotImplementedError def get_cmor_folder_path(self, startdate, member, domain, var, frequency, grid, cmor_var): """ Get the folder path following current CMOR convention Parameters ---------- startdate: str member: int domain: ModelingRealm var: str frequency: Frequency grid: str cmor_var: Variable Returns ------- str Raises ------ NotImplementedError: If not implemented by derived classes """ raise NotImplementedError def get_startdate_path(self, startdate): """ Return the path to the startdate's CMOR folder Parameters ---------- startdate: str Returns ------- str """ return os.path.join(self.config.data_dir, self.config.experiment.expid, 'cmorfiles', self.config.cmor.activity, self.config.experiment.institute, self.config.experiment.model, self.experiment_name(startdate)) def experiment_name(self, startdate): """ Get experiment name, appending startdate if needed Parameters ---------- startdate: str Returns ------- str """ if self.config.cmor.append_startdate: return '{}S{}'.format(self.config.experiment.experiment_name, startdate) else: return self.config.experiment.experiment_name def get_member_str(self, member): """ Transalate member number to member string Parameters ---------- member: int Returns ------- str Raises ------ NotImplementedError: If not implemented by derived classes """ raise NotImplementedError def create_links(self, startdate, member=None): """ Create links for a given startdate or member Parameters ---------- startdate: str member: int or None """ if member is not None: member_str = self.get_member_str(member) else: member_str = None Log.info('Creating links for CMOR files ({0})', startdate) path = self.get_startdate_path(startdate) self._link_startdate(path, member_str) Log.debug('Links ready') def _link_startdate(self, path, member_str): raise NotImplementedError def create_link(self, domain, filepath, frequency, var, grid, move_old, vartype): """ Create file link Parameters ---------- domain: ModelingRealm filepath: str frequency: Frequency var: str grid: str move_old: bool vartype: VariableType """ freq_str = frequency.folder_name(vartype) if not grid: grid = 'original' variable_folder = domain.get_varfolder(var, self.config.experiment.ocean_timestep, self.config.experiment.atmos_timestep) vargrid_folder = domain.get_varfolder(var, self.config.experiment.ocean_timestep, self.config.experiment.atmos_timestep, grid=grid) self.lock.acquire() try: expid = self.config.experiment.expid if grid == 'original': link_path = os.path.join(self.config.data_dir, expid, freq_str, variable_folder) Utils.create_folder_tree(link_path) else: link_path = os.path.join(self.config.data_dir, expid, freq_str, vargrid_folder) Utils.create_folder_tree(link_path) default_path = os.path.join(self.config.data_dir, expid, freq_str, variable_folder) original_path = os.path.join(self.config.data_dir, expid, freq_str, vargrid_folder.replace('-{0}_f'.format(grid), '-original_f')) if os.path.islink(default_path): os.remove(default_path) elif os.path.isdir(default_path): shutil.move(default_path, original_path) os.symlink(link_path, default_path) if move_old and link_path not in self._checked_vars: self._checked_vars.append(link_path) old_path = os.path.join(self.config.data_dir, expid, freq_str, 'old_{0}'.format(os.path.basename(link_path))) regex = re.compile(var + '_[0-9]{6,8}[.]nc') for filename in os.listdir(link_path): if regex.match(filename): Utils.create_folder_tree(old_path) Utils.move_file(os.path.join(link_path, filename), os.path.join(old_path, filename)) link_path = os.path.join(link_path, os.path.basename(filepath)) if os.path.lexists(link_path): os.remove(link_path) if not os.path.exists(filepath): raise ValueError('Original file {0} does not exists'.format(filepath)) relative_path = os.path.relpath(filepath, os.path.dirname(link_path)) os.symlink(relative_path, link_path) except Exception: raise finally: self.lock.release() def _get_time_component(self, chunk, date_str, frequency, startdate, year): if chunk is not None: time_bound = self._get_chunk_time_bounds(startdate, chunk, frequency) elif year: if frequency != Frequencies.yearly: raise ValueError('Year may be provided instead of chunk only if frequency is "yr"') time_bound = str(year) elif date_str: time_bound = date_str else: raise ValueError('Time info not provided') return time_bound def _get_chunk_time_bounds(self, startdate, chunk, frequency): start = parse_date(startdate) chunk_start = chunk_start_date(start, chunk, self.config.experiment.chunk_size, 'month', self.config.experiment.calendar) chunk_end = chunk_end_date(chunk_start, self.config.experiment.chunk_size, 'month', self.config.experiment.calendar) chunk_end = previous_day(chunk_end, self.config.experiment.calendar) time_bound = "{0:04}{1:02}{4}{2:04}{3:02}".format(chunk_start.year, chunk_start.month, chunk_end.year, chunk_end.month, self.time_separator) return time_bound def _check_var_presence(self, folder, current_count, startdate, member, domain, chunk, freq): for var in os.listdir(folder): cmor_var = self.config.var_manager.get_variable(var, True) var_path = self.get_file_path(startdate, member, domain, var, cmor_var, chunk, frequency=freq) if os.path.isfile(var_path): current_count += 1 if current_count >= self.config.cmor.min_cmorized_vars: break return current_count def is_cmorized(self, startdate, member, chunk, domain): raise NotImplementedError class Cmor2Convention(DataConvention): """Base class for CMOR2-based conventions""" def get_scratch_masks(self, scratch_masks): return scratch_masks def get_file_name(self, startdate, member, domain, var, cmor_var, frequency, chunk, year, date_str, grid, ): if cmor_var is None: cmor_table = domain.get_table(frequency, self.config.data_convention) else: cmor_table = cmor_var.get_table(frequency, self.config.data_convention) time_bound = self._get_time_component(chunk, date_str, frequency, startdate, year) time_bound = '_{0}.nc'.format(time_bound) file_name = '{0}_{1}_{2}_{3}_S{4}_{5}{6}'.format(var, cmor_table.name, self.config.experiment.model, self.experiment_name(startdate), startdate, self.get_member_str(member), time_bound) return file_name def get_cmor_folder_path(self, startdate, member, domain, var, frequency, grid, cmor_var): folder_path = os.path.join(self.get_startdate_path(startdate), str(frequency), domain.name, var) if grid: folder_path = os.path.join(folder_path, grid) folder_path = os.path.join(folder_path, self.get_member_str(member)) if self.config.cmor.version: folder_path = os.path.join(folder_path, self.config.cmor.version) return folder_path def get_member_str(self, member): template = 'r{0}i{1}p1' return template.format(member + 1 - self.config.experiment.member_count_start, self.config.cmor.initialization_number) def _link_startdate(self, path, member_str): for freq in os.listdir(path): Log.debug('Creating links for frequency {0}', freq) frequency = Frequency.parse(freq) for domain in os.listdir(os.path.join(path, freq)): Log.debug('Creating links for domain {0}', domain) for var in os.listdir(os.path.join(path, freq, domain)): for member in os.listdir(os.path.join(path, freq, domain, var)): if member_str is not None and member_str != member: continue for name in os.listdir(os.path.join(path, freq, domain, var, member)): filepath = os.path.join(path, freq, domain, var, member, name) if os.path.isfile(filepath): self.create_link(domain, filepath, frequency, var, "", False, vartype=VariableType.MEAN) else: for filename in os.listdir(filepath): self.create_link(domain, os.path.join(filepath, filename), frequency, var, "", False, vartype=VariableType.MEAN) def is_cmorized(self, startdate, member, chunk, domain): startdate_path = self.get_startdate_path(startdate) if not os.path.isdir(startdate_path): return False count = 0 for freq in os.listdir(startdate_path): domain_path = os.path.join(startdate_path, freq, domain.name) if os.path.isdir(domain_path): count = self._check_var_presence(domain_path, count, startdate, member, domain, chunk, freq) if count >= self.config.cmor.min_cmorized_vars: return True return False class SPECSConvention(Cmor2Convention): """Base class for CMOR2-based conventions""" def get_startdate_path(self, startdate): return os.path.join(self.config.data_dir, self.config.experiment.expid, 'cmorfiles', self.config.experiment.institute, self.config.experiment.model, self.experiment_name(startdate), 'S' + startdate) class PrefaceConvention(Cmor2Convention): """ Class to manage Preface convention Parameters ---------- name: str config: Config """ def __init__(self, name, config): super(PrefaceConvention, self).__init__(name, config) self.time_separator = '_' def get_startdate_path(self, startdate): return os.path.join(self.config.data_dir, self.config.experiment.expid, 'cmorfiles', self.config.experiment.institute, self.experiment_name(startdate), 'S' + startdate) class Cmor3Convention(DataConvention): """ Base class for CMOR3-based conventions Parameters ---------- name: str config: Config """ def __init__(self, name, config): super(Cmor3Convention, self).__init__(name, config) self.lat_name = 'latitude' self.lon_name = 'longitude' def get_scratch_masks(self, scratch_masks): return os.path.join(scratch_masks, self.name) def get_file_name(self, startdate, member, domain, var, cmor_var, frequency, chunk, year, date_str, grid, ): if cmor_var is None: cmor_table = domain.get_table(frequency, self.config.data_convention) else: cmor_table = cmor_var.get_table(frequency, self.config.data_convention) time_bound = self._get_time_component(chunk, date_str, frequency, startdate, year) time_bound = '_{0}.nc'.format(time_bound) if not grid: if domain in [ModelingRealms.ocnBgchem, ModelingRealms.seaIce, ModelingRealms.ocean]: grid = self.config.cmor.default_ocean_grid else: grid = self.config.cmor.default_atmos_grid file_name = '{0}_{1}_{2}_{3}_{4}_{5}{6}'.format(var, cmor_table.name, self.config.experiment.model, self.experiment_name(startdate), self.get_member_str(member), grid, time_bound) return file_name def _get_chunk_time_bounds(self, startdate, chunk, frequency): start = parse_date(startdate) chunk_start = chunk_start_date(start, chunk, self.config.experiment.chunk_size, 'month', self.config.experiment.calendar) chunk_end = chunk_end_date(chunk_start, self.config.experiment.chunk_size, 'month', self.config.experiment.calendar) if frequency == Frequencies.monthly: chunk_end = previous_day(chunk_end, self.config.experiment.calendar) time_bound = "{0:04}{1:02}{4}{2:04}{3:02}".format(chunk_start.year, chunk_start.month, chunk_end.year, chunk_end.month, self.time_separator) elif frequency == Frequencies.daily: chunk_end = previous_day(chunk_end, self.config.experiment.calendar) time_bound = "{0.year:04}{0.month:02}{0.day:02}{2}" \ "{1.year:04}{1.month:02}{1.day:02}".format(chunk_start, chunk_end, self.time_separator) elif frequency.frequency.endswith('hr'): chunk_end = add_hours(chunk_end, -int(frequency.frequency[:-2]), self.config.experiment.calendar) time_bound = "{0.year:04}{0.month:02}{0.day:02}{0.hour:02}{0.minute:02}{2}" \ "{1.year:04}{1.month:02}{1.day:02}{1.hour:02}{1.minute:02}".format(chunk_start, chunk_end, self.time_separator) return time_bound def get_cmor_folder_path(self, startdate, member, domain, var, frequency, grid, cmor_var): if not self.config.cmor.version: raise ValueError('CMOR version is mandatory for PRIMAVERA and CMIP6') if not grid: if domain in [ModelingRealms.ocnBgchem, ModelingRealms.seaIce, ModelingRealms.ocean]: grid = self.config.cmor.default_ocean_grid else: grid = self.config.cmor.default_atmos_grid if cmor_var is None: table_name = domain.get_table(frequency, self.config.data_convention).name else: table_name = cmor_var.get_table(frequency, self.config.data_convention).name folder_path = os.path.join(self.get_startdate_path(startdate), self.get_member_str(member), table_name, var, grid, self.config.cmor.version) return folder_path def _link_startdate(self, path, member_str): for member in os.listdir(path): for table in os.listdir(os.path.join(path, member)): frequency = self.config.var_manager.tables[table].frequency Log.debug('Creating links for table {0}', table) for var in os.listdir(os.path.join(path, member, table)): domain = self.config.var_manager.get_variable(var, silent=True).domain for grid in os.listdir(os.path.join(path, member, table, var)): if member_str is not None and member_str != member: continue for name in os.listdir(os.path.join(path, member, table, var, grid)): filepath = os.path.join(path, member, table, var, grid, name) if os.path.isfile(filepath): self.create_link(domain, filepath, frequency, var, "", False, vartype=VariableType.MEAN) else: for filename in os.listdir(filepath): cmorfile = os.path.join(filepath, filename) self.create_link(domain, cmorfile, frequency, var, "", False, vartype=VariableType.MEAN) def get_member_str(self, member): template = 'r{0}i{1}p1f1' return template.format(member + 1 - self.config.experiment.member_count_start, self.config.cmor.initialization_number) def is_cmorized(self, startdate, member, chunk, domain): startdate_path = self.get_startdate_path(startdate) if not os.path.isdir(startdate_path): return False count = 0 member_path = os.path.join(startdate_path, self.get_member_str(member)) if not os.path.isdir(member_path): return False freq = Frequencies.monthly table = domain.get_table(freq, self.config.data_convention) table_dir = os.path.join(member_path, table.name) if not os.path.isdir(table_dir): return False count = self._check_var_presence(table_dir, count, startdate, member, domain, chunk, freq) if count >= self.config.cmor.min_cmorized_vars: return True return False class CMIP6Convention(Cmor3Convention): """Class managing CMIP6 file conventions""" pass class PrimaveraConvention(Cmor3Convention): """Class managing Primavera file conventions""" pass class MeteoFranceConvention(DataConvention): """Class managing MeteoFrance file conventions""" def get_file_name(self, startdate, member, domain, var, cmor_var, frequency, chunk, year, date_str, grid,): if year is not None: raise ValueError('Year not supported with MeteoFrance convention') if date_str is not None: raise ValueError('Date_str not supported with MeteoFrance convention') if chunk is None: raise ValueError('Chunk must be provided in MeteoFrance convention') time_bound = self._get_chunk_time_bounds(startdate, chunk, frequency) file_name = '{0}_{1}_{2}_{3}.nc'.format(var, frequency, time_bound, self.get_member_str(member)) return file_name def get_cmor_folder_path(self, startdate, member, domain, var, frequency, grid, cmor_var): folder_path = os.path.join(self.config.data_dir, self.experiment_name(startdate), 'H{0}'.format(chr(64 + int(startdate[4:6]))), startdate[0:4]) return folder_path def get_member_str(self, member): return '{0:02d}'.format(member) def _get_chunk_time_bounds(self, startdate, chunk, frequency): start = parse_date(startdate) chunk_start = chunk_start_date(start, chunk, self.config.experiment.chunk_size, 'month', self.config.experiment.calendar) time_bound = "{0:04}{1:02}".format(chunk_start.year, chunk_start.month) return time_bound def create_link(self, domain, filepath, frequency, var, grid, move_old, vartype): pass def create_links(self, startdate, member=None): pass def is_cmorized(self, startdate, member, chunk, domain): return True