"""Module to manage the data conventions supported by EarthDiagnostics""" import os import shutil import re import threading from bscearth.utils.date import ( parse_date, chunk_start_date, chunk_end_date, previous_day, add_hours, ) from bscearth.utils.log import Log from earthdiagnostics.frequency import Frequency, Frequencies from earthdiagnostics.modelingrealm import ModelingRealms from earthdiagnostics.variable import VariableType from earthdiagnostics.utils import Utils class DataConvention(object): """Base class to manage filename conventions""" def __init__(self, name, config): self.config = config self.name = name self.lat_name = "lat" self.lon_name = "lon" self.time_separator = "-" self.lock = threading.Lock() self._checked_vars = list() def get_scratch_masks(self, scratch_masks): """ Get the final scratch_masks path Parameters ---------- scratch_masks: str Returns ------- str """ return scratch_masks def get_file_path( self, startdate, member, domain, var, cmor_var, chunk, frequency, grid=None, year=None, date_str=None, ): """ Return the path to a concrete file Parameters ---------- startdate: str member: int domain: ModelingRealm var: str cmor_var: Variable chunk: int or None frequency: Frequency or str grid: str or None year: int or None date_str: str or None Returns ------- str Raises ------ ValueError If you provide two or more parameters from chunk, year or date_str or none at all """ if frequency is None: frequency = self.config.frequency frequency = Frequency.parse(frequency) folder_path = self.get_cmor_folder_path( startdate, member, domain, var, frequency, grid, cmor_var ) file_name = self.get_file_name( startdate, member, domain, var, cmor_var, frequency, chunk, year, date_str, grid, ) filepath = os.path.join(folder_path, file_name) return filepath def get_file_name( self, startdate, member, domain, var, cmor_var, frequency, chunk, year, date_str, grid, ): """ Get filename for a given configuration Parameters ---------- startdate: str member: int domain: ModelingRealm var: str cmor_var: Variable frequency: Frequency chunk: int or None year: int or None date_str: str or None grid: str or None Returns ------- str Raises ------ NotImplementedError: If not implemented by derived classes """ raise NotImplementedError def get_cmor_folder_path( self, startdate, member, domain, var, frequency, grid, cmor_var ): """ Get the folder path following current CMOR convention Parameters ---------- startdate: str member: int domain: ModelingRealm var: str frequency: Frequency grid: str cmor_var: Variable Returns ------- str Raises ------ NotImplementedError: If not implemented by derived classes """ raise NotImplementedError def get_startdate_path(self, startdate): """ Return the path to the startdate's CMOR folder Parameters ---------- startdate: str Returns ------- str """ return os.path.join( self.config.data_dir, self.config.experiment.expid, "original_files", "cmorfiles", self.config.cmor.activity, self.config.experiment.institute, self.config.experiment.model, self.experiment_name(startdate), ) def experiment_name(self, startdate): """ Get experiment name, appending startdate if needed Parameters ---------- startdate: str Returns ------- str """ if self.config.cmor.append_startdate: return "{}S{}".format( self.config.experiment.experiment_name, startdate ) else: return self.config.experiment.experiment_name def get_member_str(self, member): """ Transalate member number to member string Parameters ---------- member: int Returns ------- str Raises ------ NotImplementedError: If not implemented by derived classes """ raise NotImplementedError def create_links(self, startdate, member=None): """ Create links for a given startdate or member Parameters ---------- startdate: str member: int or None """ if member is not None: member_str = self.get_member_str(member) else: member_str = None Log.info("Creating links for CMOR files ({0})", startdate) path = self.get_startdate_path(startdate) self._link_startdate(path, member_str) Log.debug("Links ready") def _link_startdate(self, path, member_str): raise NotImplementedError def create_link( self, domain, filepath, frequency, var, grid, move_old, vartype ): """ Create file link Parameters ---------- domain: ModelingRealm filepath: str frequency: Frequency var: str grid: str move_old: bool vartype: VariableType """ freq_str = frequency.folder_name(vartype) if not grid: grid = "original" variable_folder = domain.get_varfolder( var, self.config.experiment.ocean_timestep, self.config.experiment.atmos_timestep, ) vargrid_folder = domain.get_varfolder( var, self.config.experiment.ocean_timestep, self.config.experiment.atmos_timestep, grid=grid, ) self.lock.acquire() try: expid = self.config.experiment.expid if grid == "original": link_path = os.path.join( self.config.data_dir, expid, freq_str, variable_folder ) Utils.create_folder_tree(link_path) else: link_path = os.path.join( self.config.data_dir, expid, freq_str, vargrid_folder ) Utils.create_folder_tree(link_path) default_path = os.path.join( self.config.data_dir, expid, freq_str, variable_folder ) original_path = os.path.join( self.config.data_dir, expid, freq_str, vargrid_folder.replace( "-{0}_f".format(grid), "-original_f" ), ) if os.path.islink(default_path): os.remove(default_path) elif os.path.isdir(default_path): shutil.move(default_path, original_path) os.symlink(link_path, default_path) if move_old and link_path not in self._checked_vars: self._checked_vars.append(link_path) old_path = os.path.join( self.config.data_dir, expid, freq_str, "old_{0}".format(os.path.basename(link_path)), ) regex = re.compile(var + "_[0-9]{6,8}[.]nc") for filename in os.listdir(link_path): if regex.match(filename): Utils.create_folder_tree(old_path) Utils.move_file( os.path.join(link_path, filename), os.path.join(old_path, filename), ) link_path = os.path.join(link_path, os.path.basename(filepath)) if os.path.lexists(link_path): try: os.remove(link_path) except OSError: pass if not os.path.exists(filepath): raise ValueError( "Original file {0} does not exists".format(filepath) ) relative_path = os.path.relpath( filepath, os.path.dirname(link_path) ) try: os.symlink(relative_path, link_path) except OSError: pass except Exception: raise finally: self.lock.release() def _get_time_component(self, chunk, date_str, frequency, startdate, year): if len([x for x in (chunk, date_str, year) if x is not None]) > 1: raise ValueError( "Only one of the parameters chunk, year or date_str may be " "provided" ) if chunk is not None: time_bound = self._get_chunk_time_bounds( startdate, chunk, frequency ) elif year: if frequency != Frequencies.yearly: raise ValueError( 'Year may be provided instead of chunk only if ' 'frequency is "yr"' ) time_bound = str(year) elif date_str: time_bound = date_str else: raise ValueError("Time info not provided") return time_bound def _get_chunk_time_bounds(self, startdate, chunk, frequency): start = parse_date(startdate) chunk_start = chunk_start_date( start, chunk, self.config.experiment.chunk_size, "month", self.config.experiment.calendar, ) chunk_end = chunk_end_date( chunk_start, self.config.experiment.chunk_size, "month", self.config.experiment.calendar, ) chunk_end = previous_day(chunk_end, self.config.experiment.calendar) time_bound = "{0:04}{1:02}{4}{2:04}{3:02}".format( chunk_start.year, chunk_start.month, chunk_end.year, chunk_end.month, self.time_separator, ) return time_bound def _check_var_presence( self, folder, current_count, startdate, member, domain, chunk, freq ): for var in os.listdir(folder): cmor_var = self.config.var_manager.get_variable(var, True) var_path = self.get_file_path( startdate, member, domain, var, cmor_var, chunk, frequency=freq ) if os.path.isfile(var_path): current_count += 1 if current_count >= self.config.cmor.min_cmorized_vars: break return current_count def is_cmorized(self, startdate, member, chunk, domain): """ Check if a given chunk is cmorized for a given domain Parameters ---------- startdate: str member: str chunk: int domain: ModelingRealm Returns ------- bool Raises ------ NotImplementedError: If not implemented by the derived classes """ raise NotImplementedError class Cmor2Convention(DataConvention): """Base class for CMOR2-based conventions""" def get_file_name( self, startdate, member, domain, var, cmor_var, frequency, chunk, year, date_str, grid, ): """ Get filename for a given configuration Parameters ---------- startdate: str member: int domain: ModelingRealm var: str cmor_var: Variable frequency: Frequency chunk: int or None year: int or None date_str: str or None grid: str or None Returns ------- str """ if cmor_var is None: cmor_table = domain.get_table( frequency, self.config.data_convention ) else: cmor_table = cmor_var.get_table( frequency, self.config.data_convention ) time_bound = self._get_time_component( chunk, date_str, frequency, startdate, year ) time_bound = "_{0}.nc".format(time_bound) file_name = "{0}_{1}_{2}_{3}_S{4}_{5}{6}".format( var, cmor_table.name, self.config.experiment.model, self.experiment_name(startdate), startdate, self.get_member_str(member), time_bound, ) return file_name def get_cmor_folder_path( self, startdate, member, domain, var, frequency, grid, cmor_var ): folder_path = os.path.join( self.get_startdate_path(startdate), str(frequency), domain.name, var, ) if grid and grid != "original": folder_path = os.path.join(folder_path, grid) folder_path = os.path.join(folder_path, self.get_member_str(member)) if self.config.cmor.version: folder_path = os.path.join(folder_path, self.config.cmor.version) return folder_path def get_member_str(self, member): """ Transalate member number to member string Parameters ---------- member: int Returns ------- str Raises ------ NotImplementedError: If not implemented by derived classes """ template = "r{0}i{1}p1" return template.format( member + 1 - self.config.experiment.member_count_start, self.config.cmor.initialization_number, ) def _link_startdate(self, path, member_str): for freq in os.listdir(path): Log.debug("Creating links for frequency {0}", freq) frequency = Frequency.parse(freq) for domain in os.listdir(os.path.join(path, freq)): Log.debug("Creating links for domain {0}", domain) for var in os.listdir(os.path.join(path, freq, domain)): for member in os.listdir( os.path.join(path, freq, domain, var) ): if member_str is not None and member_str != member: continue for name in os.listdir( os.path.join(path, freq, domain, var, member) ): filepath = os.path.join( path, freq, domain, var, member, name ) if os.path.isfile(filepath): self.create_link( ModelingRealms.parse(domain), filepath, frequency, var, "", False, vartype=VariableType.MEAN, ) else: for filename in os.listdir(filepath): self.create_link( ModelingRealms.parse(domain), os.path.join(filepath, filename), frequency, var, "", False, vartype=VariableType.MEAN, ) def is_cmorized(self, startdate, member, chunk, domain): """ Check if a given chunk is cmorized for a given domain Parameters ---------- startdate: str member: str chunk: int domain: ModelingRealm Returns ------- bool """ startdate_path = self.get_startdate_path(startdate) if not os.path.isdir(startdate_path): return False count = 0 for freq in os.listdir(startdate_path): domain_path = os.path.join(startdate_path, freq, domain.name) if os.path.isdir(domain_path): count = self._check_var_presence( domain_path, count, startdate, member, domain, chunk, freq ) if count >= self.config.cmor.min_cmorized_vars: return True return False class SPECSConvention(Cmor2Convention): """Base class for CMOR2-based conventions""" def get_startdate_path(self, startdate): """ Return the path to the startdate's CMOR folder Parameters ---------- startdate: str Returns ------- str """ return os.path.join( self.config.data_dir, self.config.experiment.expid, "original_files", "cmorfiles", self.config.experiment.institute, self.config.experiment.model, self.experiment_name(startdate), "S" + startdate, ) class PrefaceConvention(Cmor2Convention): """ Class to manage Preface convention Parameters ---------- name: str config: Config """ def __init__(self, name, config): super(PrefaceConvention, self).__init__(name, config) self.time_separator = "_" def get_startdate_path(self, startdate): """ Return the path to the startdate's CMOR folder Parameters ---------- startdate: str Returns ------- str """ return os.path.join( self.config.data_dir, self.config.experiment.expid, "original_files", "cmorfiles", self.config.experiment.institute, self.experiment_name(startdate), "S" + startdate, ) class Cmor3Convention(DataConvention): """ Base class for CMOR3-based conventions Parameters ---------- name: str config: Config """ def __init__(self, name, config): super(Cmor3Convention, self).__init__(name, config) self.lat_name = "latitude" self.lon_name = "longitude" def get_scratch_masks(self, scratch_masks): """ Get the final scratch_masks path Adds a folder matching the convention name to the configured path Parameters ---------- scratch_masks: str Returns ------- str """ return os.path.join(scratch_masks, self.name) def get_file_name( self, startdate, member, domain, var, cmor_var, frequency, chunk, year, date_str, grid, ): """ Get filename for a given configuration Parameters ---------- startdate: str member: int domain: ModelingRealm var: str cmor_var: Variable frequency: Frequency chunk: int or None year: int or None date_str: str or None grid: str or None Returns ------- str """ if cmor_var is None: cmor_table = domain.get_table( frequency, self.config.data_convention ) else: cmor_table = cmor_var.get_table( frequency, self.config.data_convention ) time_bound = self._get_time_component( chunk, date_str, frequency, startdate, year ) time_bound = "_{0}.nc".format(time_bound) if not grid: if domain in [ ModelingRealms.ocnBgchem, ModelingRealms.seaIce, ModelingRealms.ocean, ]: grid = self.config.cmor.default_ocean_grid else: grid = self.config.cmor.default_atmos_grid if self.config.cmor.append_startdate: if self.config.cmor.append_startdate_year_only: startdate = startdate[0:4] subexp_id = "s{}-".format(startdate) else: subexp_id = "" file_name = ( f"{var}_{cmor_table.name}_{self.config.experiment.model}" f"_{self.experiment_name(startdate)}_{subexp_id}" f"{self.get_member_str(member)}_{grid}{time_bound}" ) return file_name def _get_chunk_time_bounds(self, startdate, chunk, frequency): start = parse_date(startdate) chunk_start = chunk_start_date( start, chunk, self.config.experiment.chunk_size, "month", self.config.experiment.calendar, ) chunk_end = chunk_end_date( chunk_start, self.config.experiment.chunk_size, "month", self.config.experiment.calendar, ) if frequency == Frequencies.monthly: chunk_end = previous_day( chunk_end, self.config.experiment.calendar ) time_bound = "{0:04}{1:02}{4}{2:04}{3:02}".format( chunk_start.year, chunk_start.month, chunk_end.year, chunk_end.month, self.time_separator, ) elif frequency == Frequencies.daily: chunk_end = previous_day( chunk_end, self.config.experiment.calendar ) time_bound = ( "{0.year:04}{0.month:02}{0.day:02}{2}" "{1.year:04}{1.month:02}{1.day:02}".format( chunk_start, chunk_end, self.time_separator ) ) elif frequency.frequency.endswith("hr"): chunk_end = add_hours( chunk_end, -int(frequency.frequency[:-2]), self.config.experiment.calendar, ) time_bound = ( "{0.year:04}{0.month:02}{0.day:02}{0.hour:02}{0.minute:02}{2}" "{1.year:04}{1.month:02}{1.day:02}{1.hour:02}" "{1.minute:02}".format( chunk_start, chunk_end, self.time_separator ) ) return time_bound def get_cmor_folder_path( self, startdate, member, domain, var, frequency, grid, cmor_var ): if not self.config.cmor.version: raise ValueError( "CMOR version is mandatory for PRIMAVERA and CMIP6" ) if not grid: if domain in [ ModelingRealms.ocnBgchem, ModelingRealms.seaIce, ModelingRealms.ocean, ]: grid = self.config.cmor.default_ocean_grid else: grid = self.config.cmor.default_atmos_grid if cmor_var is None: table_name = domain.get_table( frequency, self.config.data_convention ).name else: table_name = cmor_var.get_table( frequency, self.config.data_convention ).name folder_path = os.path.join( self.get_startdate_path(startdate), self.get_member_str(member), table_name, var, grid, self.config.cmor.version, ) return folder_path def _link_startdate(self, path, member_str): for member in os.listdir(path): for table in os.listdir(os.path.join(path, member)): frequency = self.config.var_manager.tables[table].frequency Log.debug("Creating links for table {0}", table) for var in os.listdir(os.path.join(path, member, table)): cmor_var = self.config.var_manager.get_variable( var, silent=True ) domain = None if cmor_var is not None: domain = cmor_var.domain if domain is None: domain = self.config.var_manager.tables[table].domain for grid in os.listdir( os.path.join(path, member, table, var) ): if member_str is not None and member_str != member: continue for name in os.listdir( os.path.join(path, member, table, var, grid) ): filepath = os.path.join( path, member, table, var, grid, name ) if os.path.isfile(filepath): self.create_link( domain, filepath, frequency, var, "", False, vartype=VariableType.MEAN, ) else: for filename in os.listdir(filepath): cmorfile = os.path.join(filepath, filename) self.create_link( domain, cmorfile, frequency, var, "", False, vartype=VariableType.MEAN, ) def experiment_name(self, startdate): """ Get experiment name. CMOR3 ignores append startdates as thery are added to the member str Parameters ---------- startdate: str Returns ------- str """ return self.config.experiment.experiment_name def get_member_str(self, member): """ Transalate member number to member string Parameters ---------- member: int Returns ------- str """ template = "r{0}i{1}p1f1" return template.format( member + 1 - self.config.experiment.member_count_start, self.config.cmor.initialization_number, ) def is_cmorized(self, startdate, member, chunk, domain): """ Check if a given chunk is cmorized for a given domain Parameters ---------- startdate: str member: str chunk: int domain: ModelingRealm Returns ------- bool """ startdate_path = self.get_startdate_path(startdate) if not os.path.isdir(startdate_path): return False count = 0 member_path = os.path.join(startdate_path, self.get_member_str(member)) if not os.path.isdir(member_path): return False freq = Frequencies.monthly table = domain.get_table(freq, self.config.data_convention) table_dir = os.path.join(member_path, table.name) if not os.path.isdir(table_dir): return False count = self._check_var_presence( table_dir, count, startdate, member, domain, chunk, freq ) if count >= self.config.cmor.min_cmorized_vars: return True return False class CMIP6Convention(Cmor3Convention): """Class managing CMIP6 file conventions""" pass class PrimaveraConvention(Cmor3Convention): """Class managing Primavera file conventions""" pass class MeteoFranceConvention(DataConvention): """Class managing MeteoFrance file conventions""" def get_file_name( self, startdate, member, domain, var, cmor_var, frequency, chunk, year, date_str, grid, ): """ Get filename for a given configuration Parameters ---------- startdate: str member: int domain: ModelingRealm var: str cmor_var: Variable frequency: Frequency chunk: int or None year: int or None date_str: str or None grid: str or None Returns ------- str """ if year is not None: raise ValueError("Year not supported with MeteoFrance convention") if date_str is not None: raise ValueError( "Date_str not supported with MeteoFrance convention" ) if chunk is None: raise ValueError( "Chunk must be provided in MeteoFrance convention" ) time_bound = self._get_chunk_time_bounds(startdate, chunk, frequency) file_name = "{0}_{1}_{2}_{3}.nc".format( var, frequency, time_bound, self.get_member_str(member) ) return file_name def get_cmor_folder_path( self, startdate, member, domain, var, frequency, grid, cmor_var ): folder_path = os.path.join( self.config.data_dir, self.experiment_name(startdate), "H{0}".format(chr(64 + int(startdate[4:6]))), startdate[0:4], ) return folder_path def get_member_str(self, member): """ Transalate member number to member string Parameters ---------- member: int Returns ------- str """ return "{0:02d}".format(member) def _get_chunk_time_bounds(self, startdate, chunk, frequency): start = parse_date(startdate) chunk_start = chunk_start_date( start, chunk, self.config.experiment.chunk_size, "month", self.config.experiment.calendar, ) time_bound = "{0:04}{1:02}".format(chunk_start.year, chunk_start.month) return time_bound def create_link( self, domain, filepath, frequency, var, grid, move_old, vartype ): """ Create file link In this convention, it does nothing Parameters ---------- domain: ModelingRealm filepath: str frequency: Frequency var: str grid: str move_old: bool vartype: VariableType """ pass def create_links(self, startdate, member=None): """ Create links for a given startdate or member In this convention, it does nothing Parameters ---------- startdate: str member: int or None """ pass def is_cmorized(self, startdate, member, chunk, domain): """ Check if a given chunk is cmorized for a given domain Parameters ---------- startdate: str member: str chunk: int domain: ModelingRealm Returns ------- bool Raises ------ NotImplementedError: If not implemented by the derived classes """ return True