# data_convention.py
import os
import shutil
import re
import threading
from bscearth.utils.date import parse_date, chunk_start_date, chunk_end_date, previous_day, add_hours
from bscearth.utils.log import Log

from earthdiagnostics.frequency import Frequency, Frequencies
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.variable import VariableType
from earthdiagnostics.utils import Utils


class DataConvention(object):
    """
    Base class to manage filename conventions

    Parameters
    ----------
    name: str
    config: Config

    """

    def __init__(self, name, config):
        self.config = config
        self.name = name
        # Default coordinate names and date separator; subclasses may override them
        self.lat_name = 'lat'
        self.lon_name = 'lon'
        self.time_separator = '-'
        # Serializes the filesystem manipulation done in create_link
        self.lock = threading.Lock()
        # Link folders already inspected for old files when move_old is requested
        self._checked_vars = list()

    def get_file_path(self, startdate, member, domain, var, cmor_var, chunk, frequency,
                      grid=None, year=None, date_str=None):
        """
        Return the path to a concrete file

        Parameters
        ----------
        startdate: str
        member: int
        domain: ModelingRealm
        var: str
        cmor_var: Variable
        chunk: int or None
        frequency: Frequency or str
            If None, the frequency from the configuration is used
        grid: str or None
        year: int or None
        date_str: str or None

        Returns
        -------
        str

        Raises
        ------
        ValueError
            Raised by the concrete convention when the time information (chunk, year or date_str)
            is missing or not supported

        """
        if frequency is None:
            frequency = self.config.frequency
        frequency = Frequency.parse(frequency)
        folder_path = self.get_cmor_folder_path(startdate, member, domain, var, frequency, grid, cmor_var)
        file_name = self.get_file_name(startdate, member, domain, var, cmor_var, frequency,
                                       chunk, year, date_str, grid)
        return os.path.join(folder_path, file_name)

    def get_file_name(self, startdate, member, domain, var, cmor_var, frequency, chunk, year, date_str, grid, ):
        """
        Get filename for a given configuration

        Parameters
        ----------
        startdate: str
        member: int
        domain: ModelingRealm
        var: str
        cmor_var: Variable
        frequency: Frequency
        chunk: int or None
        year: int or None
        date_str: str or None
        grid: str or None

        Returns
        -------
        str

        Raises
        ------
        NotImplementedError:
            If not implemented by derived classes

        """
        raise NotImplementedError

    def get_cmor_folder_path(self, startdate, member, domain, var, frequency, grid, cmor_var):
        """
        Get the folder path following current CMOR convention

        Parameters
        ----------
        startdate: str
        member: int
        domain: ModelingRealm
        var: str
        frequency: Frequency
        grid: str
        cmor_var: Variable

        Returns
        -------
        str

        Raises
        ------
        NotImplementedError:
            If not implemented by derived classes

        """
        raise NotImplementedError

    def get_startdate_path(self, startdate):
        """
        Return the path to the startdate's CMOR folder

        Parameters
        ----------
        startdate: str

        Returns
        -------
        str

        """
        return os.path.join(self.config.data_dir, self.config.experiment.expid, 'cmorfiles', self.config.cmor.activity,
                            self.config.experiment.institute, self.config.experiment.model,
                            self.experiment_name(startdate))

    def experiment_name(self, startdate):
        """
        Get experiment name, appending startdate if needed

        Parameters
        ----------
        startdate: str

        Returns
        -------
        str

        """
        if self.config.cmor.append_startdate:
            return '{}S{}'.format(self.config.experiment.experiment_name, startdate)
        else:
            return self.config.experiment.experiment_name

    def get_member_str(self, member):
        """
        Translate member number to member string

        Parameters
        ----------
        member: int

        Returns
        -------
        str

        Raises
        ------
        NotImplementedError:
            If not implemented by derived classes

        """
        raise NotImplementedError

    def create_links(self, startdate, member=None):
        """
        Create links for a given startdate or member

        Parameters
        ----------
        startdate: str
        member: int or None
            If None, no member filter is passed to the linking routine

        """
        if member is not None:
            member_str = self.get_member_str(member)
        else:
            member_str = None
        Log.info('Creating links for CMOR files ({0})', startdate)
        path = self.get_startdate_path(startdate)
        self._link_startdate(path, member_str)
        Log.debug('Links ready')

    def _link_startdate(self, path, member_str):
        """
        Create the links for all the files found under a startdate folder

        Parameters
        ----------
        path: str
        member_str: str or None

        Raises
        ------
        NotImplementedError:
            If not implemented by derived classes

        """
        raise NotImplementedError

    def create_link(self, domain, filepath, frequency, var, grid, move_old, vartype):
        """
        Create file link

        The link is created under data_dir/expid/<frequency folder>/<variable folder> and points to
        filepath through a relative path.

        Parameters
        ----------
        domain: ModelingRealm
        filepath: str
        frequency: Frequency
        var: str
        grid: str
            An empty value is treated as the 'original' grid
        move_old: bool
            If True, the first time a link folder is used its files matching '<var>_<6 to 8 digits>.nc'
            are moved to a sibling 'old_<folder>' folder
        vartype: VariableType

        Raises
        ------
        ValueError
            If filepath does not exist

        """
        freq_str = frequency.folder_name(vartype)

        if not grid:
            grid = 'original'

        variable_folder = domain.get_varfolder(var, self.config.experiment.ocean_timestep,
                                               self.config.experiment.atmos_timestep)
        vargrid_folder = domain.get_varfolder(var, self.config.experiment.ocean_timestep,
                                              self.config.experiment.atmos_timestep, grid=grid)

        # The context manager releases the lock on every exit path, including exceptions
        with self.lock:
            expid = self.config.experiment.expid
            if grid == 'original':
                link_path = os.path.join(self.config.data_dir, expid, freq_str, variable_folder)
                Utils.create_folder_tree(link_path)
            else:
                link_path = os.path.join(self.config.data_dir, expid, freq_str, vargrid_folder)
                Utils.create_folder_tree(link_path)
                default_path = os.path.join(self.config.data_dir, expid, freq_str, variable_folder)
                original_path = os.path.join(self.config.data_dir, expid, freq_str,
                                             vargrid_folder.replace('-{0}_f'.format(grid), '-original_f'))

                # Make the default variable folder a symlink to the gridded folder: an existing
                # link is replaced, an existing real folder is kept under its '-original' name
                if os.path.islink(default_path):
                    os.remove(default_path)
                elif os.path.isdir(default_path):
                    shutil.move(default_path, original_path)
                os.symlink(link_path, default_path)

            if move_old and link_path not in self._checked_vars:
                # Each link folder is only cleaned once per convention instance
                self._checked_vars.append(link_path)
                old_path = os.path.join(self.config.data_dir, expid, freq_str,
                                        'old_{0}'.format(os.path.basename(link_path)))
                regex = re.compile(var + '_[0-9]{6,8}[.]nc')
                for filename in os.listdir(link_path):
                    if regex.match(filename):
                        Utils.create_folder_tree(old_path)
                        Utils.move_file(os.path.join(link_path, filename),
                                        os.path.join(old_path, filename))

            link_path = os.path.join(link_path, os.path.basename(filepath))
            # lexists also detects broken symlinks, which must be removed before relinking
            if os.path.lexists(link_path):
                os.remove(link_path)
            if not os.path.exists(filepath):
                raise ValueError('Original file {0} does not exists'.format(filepath))
            relative_path = os.path.relpath(filepath, os.path.dirname(link_path))
            os.symlink(relative_path, link_path)

    def _get_time_component(self, chunk, date_str, frequency, startdate, year):
        """
        Get the time part of a filename

        The first usable value among chunk, year and date_str (in that order) is used.

        Parameters
        ----------
        chunk: int or None
        date_str: str or None
        frequency: Frequency
        startdate: str
        year: int or None

        Returns
        -------
        str

        Raises
        ------
        ValueError
            If year is used with a frequency other than yearly or if no time info is provided

        """
        if chunk is not None:
            time_bound = self._get_chunk_time_bounds(startdate, chunk, frequency)
        elif year:
            if frequency != Frequencies.yearly:
                raise ValueError('Year may be provided instead of chunk only if frequency is "yr"')
            time_bound = str(year)
        elif date_str:
            time_bound = date_str
        else:
            raise ValueError('Time info not provided')
        return time_bound

    def _get_chunk_time_bounds(self, startdate, chunk, frequency):
        """
        Get the time bounds of a chunk formatted as YYYYMM<separator>YYYYMM

        Parameters
        ----------
        startdate: str
        chunk: int
        frequency: Frequency
            Not used by this base implementation

        Returns
        -------
        str

        """
        start = parse_date(startdate)
        chunk_start = chunk_start_date(start, chunk, self.config.experiment.chunk_size, 'month',
                                       self.config.experiment.calendar)
        chunk_end = chunk_end_date(chunk_start, self.config.experiment.chunk_size, 'month',
                                   self.config.experiment.calendar)
        # Step back one day before formatting
        # NOTE(review): presumably chunk_end_date returns the first day after the chunk - confirm
        chunk_end = previous_day(chunk_end, self.config.experiment.calendar)
        time_bound = "{0:04}{1:02}{4}{2:04}{3:02}".format(chunk_start.year, chunk_start.month, chunk_end.year,
                                                          chunk_end.month, self.time_separator)
        return time_bound

    def _check_var_presence(self, folder, current_count, startdate, member, domain, chunk, freq):
        """
        Count the variables in a folder that have a file for the given chunk

        Counting stops as soon as the configured minimum of cmorized variables is reached.

        Parameters
        ----------
        folder: str
            Folder whose entries are used as variable names
        current_count: int
            Count accumulated by previous calls
        startdate: str
        member: int
        domain: ModelingRealm
        chunk: int
        freq: Frequency or str

        Returns
        -------
        int
            Updated count

        """
        for var in os.listdir(folder):
            cmor_var = self.config.var_manager.get_variable(var, True)
            var_path = self.get_file_path(startdate, member, domain, var, cmor_var, chunk, frequency=freq)
            if os.path.isfile(var_path):
                current_count += 1
                if current_count >= self.config.cmor.min_cmorized_vars:
                    break
        return current_count

    def is_cmorized(self, startdate, member, chunk, domain):
        """
        Check if a chunk of a domain has already been cmorized

        Parameters
        ----------
        startdate: str
        member: int
        chunk: int
        domain: ModelingRealm

        Returns
        -------
        bool

        Raises
        ------
        NotImplementedError:
            If not implemented by derived classes

        """
        raise NotImplementedError


class Cmor2Convention(DataConvention):
    """Base class for CMOR2-based conventions"""

    def get_scratch_masks(self, scratch_masks):
        """
        Get the scratch masks folder for this convention

        Parameters
        ----------
        scratch_masks: str

        Returns
        -------
        str
            The received path, unchanged

        """
        return scratch_masks

    def get_file_name(self, startdate, member, domain, var, cmor_var, frequency, chunk, year, date_str, grid, ):
        """
        Get filename for a given configuration

        The name has the form <var>_<table>_<model>_<experiment>_S<startdate>_<member>_<time>.nc

        Parameters
        ----------
        startdate: str
        member: int
        domain: ModelingRealm
        var: str
        cmor_var: Variable or None
            If None, the table is taken from the domain
        frequency: Frequency
        chunk: int or None
        year: int or None
        date_str: str or None
        grid: str or None
            Not used to build the name in this convention

        Returns
        -------
        str

        """
        if cmor_var is None:
            cmor_table = domain.get_table(frequency, self.config.data_convention)
        else:
            cmor_table = cmor_var.get_table(frequency, self.config.data_convention)

        time_bound = self._get_time_component(chunk, date_str, frequency, startdate, year)
        time_bound = '_{0}.nc'.format(time_bound)

        file_name = '{0}_{1}_{2}_{3}_S{4}_{5}{6}'.format(var, cmor_table.name, self.config.experiment.model,
                                                         self.experiment_name(startdate), startdate,
                                                         self.get_member_str(member), time_bound)
        return file_name

    def get_cmor_folder_path(self, startdate, member, domain, var, frequency, grid, cmor_var):
        """
        Get the folder path following current CMOR convention

        The path is <startdate path>/<frequency>/<domain>/<var>[/<grid>]/<member>[/<version>]

        Parameters
        ----------
        startdate: str
        member: int
        domain: ModelingRealm
        var: str
        frequency: Frequency
        grid: str or None
        cmor_var: Variable
            Not used in this convention

        Returns
        -------
        str

        """
        folder_path = os.path.join(self.get_startdate_path(startdate), str(frequency), domain.name, var)
        if grid:
            folder_path = os.path.join(folder_path, grid)
        folder_path = os.path.join(folder_path, self.get_member_str(member))
        if self.config.cmor.version:
            folder_path = os.path.join(folder_path, self.config.cmor.version)
        return folder_path

    def get_member_str(self, member):
        """
        Translate member number to member string

        Parameters
        ----------
        member: int

        Returns
        -------
        str
            String of the form r<realization>i<initialization>p1

        """
        template = 'r{0}i{1}p1'
        return template.format(member + 1 - self.config.experiment.member_count_start,
                               self.config.cmor.initialization_number)

    def _link_startdate(self, path, member_str):
        """
        Create links for the files stored as <path>/<freq>/<domain>/<var>/<member>/...

        Parameters
        ----------
        path: str
        member_str: str or None
            If given, only this member's files are linked

        """
        for freq in os.listdir(path):
            Log.debug('Creating links for frequency {0}', freq)
            frequency = Frequency.parse(freq)
            for domain in os.listdir(os.path.join(path, freq)):
                Log.debug('Creating links for domain {0}', domain)
                # NOTE(review): domain is a folder name (str) here, while create_link calls
                # domain.get_varfolder - confirm whether it must be converted to a ModelingRealm
                for var in os.listdir(os.path.join(path, freq, domain)):
                    for member in os.listdir(os.path.join(path, freq, domain, var)):
                        if member_str is not None and member_str != member:
                            continue
                        for name in os.listdir(os.path.join(path, freq, domain, var, member)):
                            filepath = os.path.join(path, freq, domain, var, member, name)
                            if os.path.isfile(filepath):
                                self.create_link(domain, filepath, frequency, var, "", False,
                                                 vartype=VariableType.MEAN)
                            else:
                                # One extra folder level below the member folder
                                for filename in os.listdir(filepath):
                                    self.create_link(domain, os.path.join(filepath, filename), frequency, var,
                                                     "", False, vartype=VariableType.MEAN)

    def is_cmorized(self, startdate, member, chunk, domain):
        """
        Check if a chunk of a domain has already been cmorized

        The chunk is considered cmorized when at least config.cmor.min_cmorized_vars variables of the
        domain, counted across all the frequency folders, have a file for it.

        Parameters
        ----------
        startdate: str
        member: int
        chunk: int
        domain: ModelingRealm

        Returns
        -------
        bool

        """
        startdate_path = self.get_startdate_path(startdate)
        if not os.path.isdir(startdate_path):
            return False
        count = 0
        for freq in os.listdir(startdate_path):
            domain_path = os.path.join(startdate_path, freq, domain.name)
            if os.path.isdir(domain_path):
                count = self._check_var_presence(domain_path, count, startdate, member, domain, chunk, freq)
                if count >= self.config.cmor.min_cmorized_vars:
                    return True
        return False

class SPECSConvention(Cmor2Convention):
    """Class managing SPECS file conventions"""

    def get_startdate_path(self, startdate):
        """
        Return the path to the startdate's CMOR folder

        The path is <data_dir>/<expid>/cmorfiles/<institute>/<model>/<experiment name>/S<startdate>

        Parameters
        ----------
        startdate: str

        Returns
        -------
        str

        """
        return os.path.join(self.config.data_dir, self.config.experiment.expid, 'cmorfiles',
                            self.config.experiment.institute,
                            self.config.experiment.model, self.experiment_name(startdate), 'S' + startdate)


class PrefaceConvention(Cmor2Convention):
    """
    Class to manage Preface convention

    Parameters
    ----------
    name: str
    config: Config

    """

    def __init__(self, name, config):
        super(PrefaceConvention, self).__init__(name, config)
        # This convention separates the dates with an underscore instead of the default '-'
        self.time_separator = '_'

    def get_startdate_path(self, startdate):
        """
        Return the path to the startdate's CMOR folder

        The path is <data_dir>/<expid>/cmorfiles/<institute>/<experiment name>/S<startdate>

        Parameters
        ----------
        startdate: str

        Returns
        -------
        str

        """
        return os.path.join(self.config.data_dir, self.config.experiment.expid, 'cmorfiles',
                            self.config.experiment.institute,
                            self.experiment_name(startdate), 'S' + startdate)


class Cmor3Convention(DataConvention):
    """
    Base class for CMOR3-based conventions

    Parameters
    ----------
    name: str
    config: Config

    """

    def __init__(self, name, config):
        super(Cmor3Convention, self).__init__(name, config)
        # CMOR3-based conventions use the long coordinate names
        self.lat_name = 'latitude'
        self.lon_name = 'longitude'

    def get_scratch_masks(self, scratch_masks):
        """
        Get the scratch masks folder for this convention

        Parameters
        ----------
        scratch_masks: str

        Returns
        -------
        str
            The received path with the convention name appended

        """
        return os.path.join(scratch_masks, self.name)

    def _get_table(self, domain, cmor_var, frequency):
        """Return the CMOR table from cmor_var or, if it is None, from the domain"""
        if cmor_var is None:
            return domain.get_table(frequency, self.config.data_convention)
        return cmor_var.get_table(frequency, self.config.data_convention)

    def _resolve_grid(self, domain, grid):
        """Return grid if given, otherwise the configured default ocean or atmos grid for the domain"""
        if grid:
            return grid
        if domain in [ModelingRealms.ocnBgchem, ModelingRealms.seaIce, ModelingRealms.ocean]:
            return self.config.cmor.default_ocean_grid
        return self.config.cmor.default_atmos_grid

    def get_file_name(self, startdate, member, domain, var, cmor_var, frequency, chunk, year, date_str, grid, ):
        """
        Get filename for a given configuration

        The name has the form <var>_<table>_<model>_<experiment>_<member>_<grid>_<time>.nc

        Parameters
        ----------
        startdate: str
        member: int
        domain: ModelingRealm
        var: str
        cmor_var: Variable or None
            If None, the table is taken from the domain
        frequency: Frequency
        chunk: int or None
        year: int or None
        date_str: str or None
        grid: str or None
            If empty, the default grid configured for the domain is used

        Returns
        -------
        str

        """
        cmor_table = self._get_table(domain, cmor_var, frequency)

        time_bound = self._get_time_component(chunk, date_str, frequency, startdate, year)
        time_bound = '_{0}.nc'.format(time_bound)

        grid = self._resolve_grid(domain, grid)
        file_name = '{0}_{1}_{2}_{3}_{4}_{5}{6}'.format(var, cmor_table.name, self.config.experiment.model,
                                                        self.experiment_name(startdate),
                                                        self.get_member_str(member),
                                                        grid, time_bound)
        return file_name

    def _get_chunk_time_bounds(self, startdate, chunk, frequency):
        """
        Get the time bounds of a chunk with a precision that depends on the frequency

        Monthly bounds are formatted as YYYYMM, daily ones as YYYYMMDD and hourly ones as YYYYMMDDhhmm.

        Parameters
        ----------
        startdate: str
        chunk: int
        frequency: Frequency

        Returns
        -------
        str

        Raises
        ------
        ValueError
            If the frequency is not monthly, daily or hourly

        """
        start = parse_date(startdate)
        chunk_start = chunk_start_date(start, chunk, self.config.experiment.chunk_size, 'month',
                                       self.config.experiment.calendar)
        chunk_end = chunk_end_date(chunk_start, self.config.experiment.chunk_size, 'month',
                                   self.config.experiment.calendar)
        if frequency == Frequencies.monthly:
            chunk_end = previous_day(chunk_end, self.config.experiment.calendar)
            time_bound = "{0:04}{1:02}{4}{2:04}{3:02}".format(chunk_start.year, chunk_start.month, chunk_end.year,
                                                              chunk_end.month, self.time_separator)
        elif frequency == Frequencies.daily:
            chunk_end = previous_day(chunk_end, self.config.experiment.calendar)
            time_bound = "{0.year:04}{0.month:02}{0.day:02}{2}" \
                         "{1.year:04}{1.month:02}{1.day:02}".format(chunk_start, chunk_end, self.time_separator)
        elif frequency.frequency.endswith('hr'):
            # Step back one period: the number of hours is the numeric prefix of the frequency string
            chunk_end = add_hours(chunk_end, -int(frequency.frequency[:-2]), self.config.experiment.calendar)
            time_bound = "{0.year:04}{0.month:02}{0.day:02}{0.hour:02}{0.minute:02}{2}" \
                         "{1.year:04}{1.month:02}{1.day:02}{1.hour:02}{1.minute:02}".format(chunk_start,
                                                                                            chunk_end,
                                                                                            self.time_separator)
        else:
            # Fail with a clear message instead of an UnboundLocalError on the return below
            raise ValueError('Frequency {0} not supported to compute chunk time bounds'.format(frequency))
        return time_bound

    def get_cmor_folder_path(self, startdate, member, domain, var, frequency, grid, cmor_var):
        """
        Get the folder path following current CMOR convention

        The path is <startdate path>/<member>/<table>/<var>/<grid>/<version>

        Parameters
        ----------
        startdate: str
        member: int
        domain: ModelingRealm
        var: str
        frequency: Frequency
        grid: str or None
            If empty, the default grid configured for the domain is used
        cmor_var: Variable or None
            If None, the table is taken from the domain

        Returns
        -------
        str

        Raises
        ------
        ValueError
            If the CMOR version is not configured

        """
        if not self.config.cmor.version:
            raise ValueError('CMOR version is mandatory for PRIMAVERA and CMIP6')
        grid = self._resolve_grid(domain, grid)
        table_name = self._get_table(domain, cmor_var, frequency).name
        folder_path = os.path.join(self.get_startdate_path(startdate), self.get_member_str(member),
                                   table_name, var,
                                   grid, self.config.cmor.version)
        return folder_path

    def _link_startdate(self, path, member_str):
        """
        Create links for the files stored as <path>/<member>/<table>/<var>/<grid>/...

        Parameters
        ----------
        path: str
        member_str: str or None
            If given, only this member's files are linked

        """
        for member in os.listdir(path):
            # Filter members up front so the folders of other members are never scanned
            if member_str is not None and member_str != member:
                continue
            for table in os.listdir(os.path.join(path, member)):
                frequency = self.config.var_manager.tables[table].frequency
                Log.debug('Creating links for table {0}', table)
                for var in os.listdir(os.path.join(path, member, table)):
                    domain = self.config.var_manager.get_variable(var, silent=True).domain
                    for grid in os.listdir(os.path.join(path, member, table, var)):
                        for name in os.listdir(os.path.join(path, member, table, var, grid)):
                            filepath = os.path.join(path, member, table, var, grid, name)
                            if os.path.isfile(filepath):
                                self.create_link(domain, filepath, frequency, var, "", False,
                                                 vartype=VariableType.MEAN)
                            else:
                                # One extra folder level below the grid folder
                                for filename in os.listdir(filepath):
                                    cmorfile = os.path.join(filepath, filename)
                                    self.create_link(domain, cmorfile, frequency, var, "",
                                                     False, vartype=VariableType.MEAN)

    def get_member_str(self, member):
        """
        Translate member number to member string

        Parameters
        ----------
        member: int

        Returns
        -------
        str
            String of the form r<realization>i<initialization>p1f1

        """
        template = 'r{0}i{1}p1f1'
        return template.format(member + 1 - self.config.experiment.member_count_start,
                               self.config.cmor.initialization_number)

    def is_cmorized(self, startdate, member, chunk, domain):
        """
        Check if a chunk of a domain has already been cmorized

        Only the monthly table of the domain is inspected: the chunk is considered cmorized when at
        least config.cmor.min_cmorized_vars of its variables have a file for it.

        Parameters
        ----------
        startdate: str
        member: int
        chunk: int
        domain: ModelingRealm

        Returns
        -------
        bool

        """
        startdate_path = self.get_startdate_path(startdate)
        if not os.path.isdir(startdate_path):
            return False
        count = 0
        member_path = os.path.join(startdate_path, self.get_member_str(member))
        if not os.path.isdir(member_path):
            return False
        freq = Frequencies.monthly
        table = domain.get_table(freq, self.config.data_convention)
        table_dir = os.path.join(member_path, table.name)
        if not os.path.isdir(table_dir):
            return False
        count = self._check_var_presence(table_dir, count, startdate, member, domain, chunk, freq)
        if count >= self.config.cmor.min_cmorized_vars:
            return True
        return False

class CMIP6Convention(Cmor3Convention):
    """Class managing CMIP6 file conventions"""
class PrimaveraConvention(Cmor3Convention):
    """Class managing Primavera file conventions"""
class MeteoFranceConvention(DataConvention):
    """Class managing MeteoFrance file conventions"""

    def get_file_name(self, startdate, member, domain, var, cmor_var, frequency, chunk, year, date_str, grid,):
        """
        Get filename for a given configuration

        The name has the form <var>_<frequency>_<chunk start as YYYYMM>_<member>.nc

        Parameters
        ----------
        startdate: str
        member: int
        domain: ModelingRealm
        var: str
        cmor_var: Variable
        frequency: Frequency
        chunk: int
        year: None
        date_str: None
        grid: str or None

        Returns
        -------
        str

        Raises
        ------
        ValueError
            If year or date_str are provided or if chunk is missing

        """
        if year is not None:
            raise ValueError('Year not supported with MeteoFrance convention')
        if date_str is not None:
            raise ValueError('Date_str not supported with MeteoFrance convention')
        if chunk is None:
            raise ValueError('Chunk must be provided in MeteoFrance convention')
        time_bound = self._get_chunk_time_bounds(startdate, chunk, frequency)
        file_name = '{0}_{1}_{2}_{3}.nc'.format(var, frequency, time_bound, self.get_member_str(member))
        return file_name

    def get_cmor_folder_path(self, startdate, member, domain, var, frequency, grid, cmor_var):
        """
        Get the folder path for a startdate

        The path is <data_dir>/<experiment name>/H<letter>/<year>, where the year is taken from the
        first four characters of the startdate and the letter from its month (01 -> A, 02 -> B, ...).

        Parameters
        ----------
        startdate: str
        member: int
        domain: ModelingRealm
        var: str
        frequency: Frequency
        grid: str
        cmor_var: Variable

        Returns
        -------
        str

        """
        folder_path = os.path.join(self.config.data_dir, self.experiment_name(startdate),
                                   'H{0}'.format(chr(64 + int(startdate[4:6]))),
                                   startdate[0:4])
        return folder_path

    def get_member_str(self, member):
        """
        Translate member number to member string

        Parameters
        ----------
        member: int

        Returns
        -------
        str
            Member number zero-padded to two digits

        """
        return '{0:02d}'.format(member)

    def _get_chunk_time_bounds(self, startdate, chunk, frequency):
        """
        Get the start of a chunk formatted as YYYYMM

        Parameters
        ----------
        startdate: str
        chunk: int
        frequency: Frequency
            Not used in this convention

        Returns
        -------
        str

        """
        start = parse_date(startdate)
        chunk_start = chunk_start_date(start, chunk, self.config.experiment.chunk_size, 'month',
                                       self.config.experiment.calendar)
        time_bound = "{0:04}{1:02}".format(chunk_start.year, chunk_start.month)
        return time_bound

    def create_link(self, domain, filepath, frequency, var, grid, move_old, vartype):
        """Do nothing: this convention does not create links"""
        pass

    def create_links(self, startdate, member=None):
        """Do nothing: this convention does not create links"""
        pass

    def is_cmorized(self, startdate, member, chunk, domain):
        """Return always True, no check is done on the files for this convention"""
        return True