# cmormanager.py
"""Classes to manage cmorized datasets"""
import glob
import os
import re
import shutil
from datetime import datetime

from bscearth.utils.date import parse_date, chunk_start_date, chunk_end_date, previous_day
from bscearth.utils.log import Log

from datafile import StorageStatus
from diagnostic import Diagnostic
from earthdiagnostics.datamanager import DataManager
from earthdiagnostics.frequency import Frequencies, Frequency
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.utils import TempFile, Utils
from earthdiagnostics.variable_type import VariableType


class CMORManager(DataManager):
    """
    Data manager class for CMORized experiments

    Parameters
    ----------
    config: earthdiagnostics.config.Config

    """

    def __init__(self, config):
        super(CMORManager, self).__init__(config)
        # Cache used by is_cmorized: maps (startdate, member, chunk) to a {domain: bool} dict
        self._dic_cmorized = dict()
        self.find_model_data()
        # find_model_data rewrites config.data_dir, so cmor_path must be built after it
        self.cmor_path = os.path.join(self.config.data_dir, self.experiment.expid)
    def find_model_data(self):
        """
        Seek the configured data folders for the experiment data

        config.data_dir may contain several folders separated by ':'. For each folder, in order, it looks at:

        - <folder>/<expid>
        - <folder>/<model>/<expid>, with model lowercased and any '-' character removed
        - <folder>/<data_type>/<model>/<expid>, with model lowercased ('ec-earth*' models map to 'ecearth')

        The first match wins: config.data_dir is replaced by the folder that contains <expid>.

        Raises
        ------
        Exception
            If no candidate folder contains the experiment

        """
        data_folders = self.config.data_dir.split(':')
        experiment_folder = self.experiment.model.lower()
        if experiment_folder.startswith('ec-earth'):
            experiment_folder = 'ecearth'

        self.config.data_dir = None
        for data_folder in data_folders:
            if os.path.isdir(os.path.join(data_folder, self.experiment.expid)):
                self.config.data_dir = data_folder
                break
            test_folder = os.path.join(data_folder, self.experiment.model.lower().replace('-', ''))
            if os.path.isdir(os.path.join(test_folder, self.experiment.expid)):
                self.config.data_dir = test_folder
                # Stop at the first match so later folders can not override it
                break
            test_folder = os.path.join(data_folder, self.config.data_type, experiment_folder)
            if os.path.isdir(os.path.join(test_folder, self.experiment.expid)):
                self.config.data_dir = test_folder
                break
        if not self.config.data_dir:
            raise Exception('Can not find model data')
    # noinspection PyUnusedLocal
    def file_exists(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
                    vartype=VariableType.MEAN, possible_versions=None):
        """
        Check if a file exists in the storage

        Parameters
        ----------
        domain: ModelingRealm
        var: str
        startdate: str
        member: int
        chunk: int
        grid: str or None
        box: Box or None
        frequency: Frequency or None
        vartype: VariableType
        possible_versions: iterable of str or None
            If given, each version replaces the configured CMOR version in the expected path, and the file is
            considered present if any of the resulting paths is a file

        Returns
        -------
        bool

        """
        cmor_var = self.variable_list.get_variable(var)
        filepath = self.get_file_path(startdate, member, domain, var, cmor_var, chunk, frequency, grid, None, None)
        if possible_versions is None:
            return os.path.isfile(filepath)
        current_version = self.config.cmor.version
        # Always return a bool (previously None was returned when no version matched)
        return any(os.path.isfile(filepath.replace(current_version, version)) for version in possible_versions)
    def request_chunk(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None, vartype=None):
        """
        Request a given file from the CMOR repository to the scratch folder and returns the path to the scratch's copy

        Parameters
        ----------
        domain: ModelingRealm
        var: str
        startdate: str
        member: int
        chunk: int
        grid: str or None
        box: Box or None
        frequency: Frequency or None
        vartype: VariableType or None

        Returns
        -------
        DataFile

        """
        # The CMOR variable is looked up with the requested name, before any box suffix is applied
        cmor_var = self.variable_list.get_variable(var)
        var = self._get_final_var_name(box, var)
        filepath = self.get_file_path(startdate, member, domain, var, cmor_var, chunk, frequency, grid, None, None)

        return self._get_file_from_storage(filepath)
    def request_year(self, diagnostic, domain, var, startdate, member, year, grid=None, box=None, frequency=None):
        """
        Request a given year for a variable from a CMOR repository

        A MergeYear job is created to build the yearly file; it is added to the diagnostic's subjobs only the
        first time the yearly file is requested.

        Parameters
        ----------
        diagnostic: Diagnostic
        domain: ModelingRealm
        var: str
        startdate: str
        member: int
        year: int
        grid: str or None
        box: Box or None
        frequency: Frequency or None

        Returns
        -------
        DataFile

        """
        job = MergeYear(self, domain, var, startdate, member, year, grid, box, frequency)
        job.request_data()
        job.declare_data_generated()
        if not job.year_file.job_added:
            diagnostic.subjobs.append(job)
            job.year_file.job_added = True
        return job.year_file
    def declare_chunk(self, domain, var, startdate, member, chunk, grid=None, region=None, box=None, frequency=None,
                      vartype=VariableType.MEAN, diagnostic=None):
        """
        Declare a variable chunk to be generated by a diagnostic

        Parameters
        ----------
        domain: ModelingRealm
        var: str
        startdate: str
        member: int
        chunk: int
        grid: str or None
        region: Basin or None
        box: Box or None
        frequency: Frequency or None
            Defaults to the configured frequency
        vartype: VariableType
        diagnostic: Diagnostic

        Returns
        -------
        DataFile

        """
        if frequency is None:
            frequency = self.config.frequency
        original_name = var
        cmor_var = self.variable_list.get_variable(var)
        if cmor_var:
            # Use the CMOR short name when the variable is known
            var = cmor_var.short_name
        final_name = self._get_final_var_name(box, var)

        filepath = self.get_file_path(startdate, member, domain, final_name, cmor_var, chunk, frequency, grid)
        netcdf_file = self._declare_generated_file(filepath, domain, final_name, cmor_var, self.config.data_convention,
                                                   region, diagnostic, grid, vartype, original_name)
        netcdf_file.frequency = frequency
        return netcdf_file
    def declare_year(self, domain, var, startdate, member, year, grid=None, box=None,
                     vartype=VariableType.MEAN, diagnostic=None):
        """
        Declare a variable year to be generated by a diagnostic

        The declared file always has yearly frequency.

        Parameters
        ----------
        domain: ModelingRealm
        var: str
        startdate: str
        member: int
        year: int
        grid: str or None
        box: Box or None
        vartype: VariableType
        diagnostic: Diagnostic

        Returns
        -------
        DataFile

        """
        original_name = var
        cmor_var = self.variable_list.get_variable(var)
        if cmor_var:
            var = cmor_var.short_name
        final_name = self._get_final_var_name(box, var)

        filepath = self.get_file_path(startdate, member, domain, final_name, cmor_var, None, Frequencies.yearly, grid,
                                      year=year)
        netcdf_file = self._declare_generated_file(filepath, domain, final_name, cmor_var, self.config.data_convention,
                                                   None, diagnostic, grid, vartype, original_name)
        netcdf_file.frequency = Frequencies.yearly
        return netcdf_file
    def get_file_path(self, startdate, member, domain, var, cmor_var, chunk, frequency,
                      grid=None, year=None, date_str=None):
        """
        Return the path to a concrete file

        Parameters
        ----------
        startdate: str
        member: int
        domain: ModelingRealm
        var: str
        cmor_var: Variable
        chunk: int or None
        frequency: Frequency
            If None, the configured frequency is used
        grid: str or None
        year: int or None
        date_str: str or None

        Returns
        -------
        str

        Raises
        ------
        ValueError
            If you provide two or more parameters from chunk, year or date_str or none at all

        """
        options = sum(x is not None for x in (chunk, year, date_str))
        if options == 0:
            raise ValueError('You must provide chunk, year or date_str')
        elif options > 1:
            raise ValueError('You must provide only one parameter in chunk, year or date_str')
        if frequency is None:
            frequency = self.config.frequency
        folder_path = self._get_full_cmor_folder_path(startdate, member, domain, var, frequency, grid, cmor_var)
        file_name = self._get_cmor_file_name(startdate, member, domain, var, cmor_var, frequency,
                                             chunk, year, date_str, grid)
        return os.path.join(folder_path, file_name)
    def _get_cmor_file_name(self, startdate, member, domain, var, cmor_var, frequency, chunk, year, date_str, grid):
        """
        Build the CMOR file name for the configured data convention

        Exactly one of chunk, year or date_str is expected to be given (see get_file_path).

        Returns
        -------
        str

        Raises
        ------
        ValueError
            If the data convention is not supported

        """
        if cmor_var is None:
            cmor_table = domain.get_table(frequency, self.config.data_convention)
        else:
            cmor_table = cmor_var.get_table(frequency, self.config.data_convention)

        time_bound = self._get_time_component(chunk, date_str, frequency, startdate, year)

        if time_bound:
            time_bound = '_{0}.nc'.format(time_bound)
        else:
            time_bound = '.nc'

        if self.config.data_convention in ('specs', 'preface'):
            file_name = '{0}_{1}_{2}_{3}_S{4}_{5}{6}'.format(var, cmor_table.name, self.experiment.model,
                                                             self.experiment.experiment_name, startdate,
                                                             self._get_member_str(member), time_bound)
        elif self.config.data_convention in ('primavera', 'cmip6'):
            if not grid:
                if domain in [ModelingRealms.ocnBgchem, ModelingRealms.seaIce, ModelingRealms.ocean]:
                    grid = self.config.cmor.default_ocean_grid
                else:
                    grid = self.config.cmor.default_atmos_grid
            file_name = '{0}_{1}_{2}_{3}_{4}_{5}{6}'.format(var, cmor_table.name, self.experiment.model,
                                                            self.experiment.experiment_name,
                                                            self._get_member_str(member),
                                                            grid, time_bound)
        elif self.config.data_convention in ('meteofrance',):
            # Meteofrance names only use the chunk start (YYYYMM)
            time_bound = self._get_chunk_time_bounds(startdate, chunk)
            file_name = '{0}_{1}_{2}_{3}.nc'.format(var, frequency, time_bound, self._get_member_str(member))
        else:
            # ValueError (a subclass of Exception) matches _get_full_cmor_folder_path
            raise ValueError('Data convention {0} not supported'.format(self.config.data_convention))
        return file_name
    def _get_time_component(self, chunk, date_str, frequency, startdate, year):
        """
        Return the time part of a CMOR file name

        Priority is chunk, then year, then date_str.

        Raises
        ------
        ValueError
            If a year is given with a non-yearly frequency, or if no time information is given

        """
        if chunk is not None:
            return self._get_chunk_time_bounds(startdate, chunk)
        if year:
            if frequency != Frequencies.yearly:
                raise ValueError('Year may be provided instead of chunk only if frequency is "yr"')
            return str(year)
        if date_str:
            return date_str
        raise ValueError('Chunk, year and date_str can not be None at the same time')
    def _get_full_cmor_folder_path(self, startdate, member, domain, var, frequency, grid, cmor_var):
        """
        Build the folder that holds a CMOR file, following the configured data convention

        Parameters
        ----------
        startdate: str
        member: int
        domain: ModelingRealm
        var: str
        frequency: Frequency
        grid: str or None
        cmor_var: Variable or None

        Returns
        -------
        str

        Raises
        ------
        ValueError
            If the convention is primavera/cmip6 and no CMOR version is configured, or if the convention is unknown

        """
        convention = self.config.data_convention

        if convention in ('specs', 'preface'):
            parts = [self._get_startdate_path(startdate), str(frequency), domain.name, var]
            if grid:
                parts.append(grid)
            parts.append(self._get_member_str(member))
            version = self.config.cmor.version
            if version:
                parts.append(version)
            return os.path.join(*parts)

        if convention in ('primavera', 'cmip6'):
            version = self.config.cmor.version
            if not version:
                raise ValueError('CMOR version is mandatory for PRIMAVERA and CMIP6')
            if not grid:
                ocean_like = domain in [ModelingRealms.ocnBgchem, ModelingRealms.seaIce, ModelingRealms.ocean]
                grid = self.config.cmor.default_ocean_grid if ocean_like else self.config.cmor.default_atmos_grid
            table_owner = domain if cmor_var is None else cmor_var
            table_name = table_owner.get_table(frequency, convention).name
            return os.path.join(self._get_startdate_path(startdate), self._get_member_str(member),
                                table_name, var, grid, version)

        if convention == 'meteofrance':
            # Month 01..12 becomes letter A..L
            month_letter = chr(64 + int(startdate[4:6]))
            return os.path.join(self.config.data_dir, self.experiment.experiment_name,
                                'H{0}'.format(month_letter), startdate[0:4])

        raise ValueError('Data convention {0} not supported'.format(convention))
    def _get_chunk_time_bounds(self, startdate, chunk):
        """
        Return the time bounds of a chunk as used in CMOR file names

        Parameters
        ----------
        startdate: str
        chunk: int

        Returns
        -------
        str
            'YYYYMM' (chunk start) for meteofrance, otherwise 'YYYYMM<sep>YYYYMM' from chunk start to the month of
            the chunk's last day, with '_' as separator for preface and '-' otherwise

        """
        start = parse_date(startdate)
        chunk_start = chunk_start_date(start, chunk, self.experiment.chunk_size, 'month', self.experiment.calendar)
        if self.config.data_convention == 'meteofrance':
            return "{0:04}{1:02}".format(chunk_start.year, chunk_start.month)

        chunk_end = chunk_end_date(chunk_start, self.experiment.chunk_size, 'month', self.experiment.calendar)
        chunk_end = previous_day(chunk_end, self.experiment.calendar)
        separator = '_' if self.config.data_convention == 'preface' else '-'
        return "{0:04}{1:02}{4}{2:04}{3:02}".format(chunk_start.year, chunk_start.month, chunk_end.year,
                                                    chunk_end.month, separator)
    def link_file(self, domain, var, cmor_var, startdate, member, chunk=None, grid=None,
                  frequency=None, year=None, date_str=None, move_old=False, vartype=VariableType.MEAN):
        """
        Create the link of a given file from the CMOR repository.

        Parameters
        ----------
        domain: ModelingRealm
        var: str
        cmor_var: Variable or None
        startdate: str
        member: int
        chunk: int or None, optional
        grid: str or None, optional
        frequency: Frequency or None, optional
            Defaults to the configured frequency
        year: int or None, optional
        date_str: str or None, optional
        move_old: bool, optional
        vartype: VariableType, optional

        """
        if frequency is None:
            frequency = self.config.frequency
        filepath = self.get_file_path(startdate, member, domain, var, cmor_var, chunk, frequency,
                                      grid=grid, year=year, date_str=date_str)
        self.create_link(domain, filepath, frequency, var, grid, move_old, vartype)
    def create_link(self, domain, filepath, frequency, var, grid, move_old, vartype):
        """
        Create a relative symbolic link to a CMOR file inside the experiment folder

        The link lives in <data_dir>/<expid>/<frequency folder>/<variable folder>. For a grid other than 'original',
        the grid-specific folder is used and the default variable folder is replaced by a symlink to it. A real
        default folder is first moved to its '-original_f' name.

        Parameters
        ----------
        domain: ModelingRealm
        filepath: str
            Path of the CMOR file to link
        frequency: Frequency
        var: str
        grid: str
            Grid name; empty or None means 'original'
        move_old: bool
            If True, files named '<var>_<6 to 8 digits>.nc' already in the link folder are moved to an
            'old_<var>_f<atmos_timestep>h' folder. This is done once per link folder.
        vartype: VariableType

        Raises
        ------
        ValueError
            If filepath does not exist

        """
        if self.config.data_convention == 'meteofrance':
            return
        freq_str = frequency.folder_name(vartype)

        if not grid:
            grid = 'original'

        variable_folder = self.get_varfolder(domain, var)
        vargrid_folder = self.get_varfolder(domain, var, grid)
        freq_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str)

        self.lock.acquire()
        try:
            if grid == 'original':
                link_path = os.path.join(freq_path, variable_folder)
                # The default folder may already be a link to another grid: use the explicit original-grid folder
                if os.path.islink(link_path):
                    link_path = os.path.join(freq_path, vargrid_folder)
                Utils.create_folder_tree(link_path)
            else:
                link_path = os.path.join(freq_path, vargrid_folder)
                Utils.create_folder_tree(link_path)
                default_path = os.path.join(freq_path, variable_folder)
                original_path = os.path.join(freq_path,
                                             vargrid_folder.replace('-{0}_f'.format(grid), '-original_f'))

                if os.path.islink(default_path):
                    os.remove(default_path)
                elif os.path.isdir(default_path):
                    shutil.move(default_path, original_path)
                os.symlink(link_path, default_path)

            if move_old and link_path not in self._checked_vars:
                self._checked_vars.append(link_path)
                old_path = os.path.join(freq_path, 'old_{0}_f{1}h'.format(var, self.experiment.atmos_timestep))
                # Raw string avoids the invalid '\.' escape; var is escaped so it is matched literally
                regex = re.compile(re.escape(var) + r'_[0-9]{6,8}\.nc')
                for filename in os.listdir(link_path):
                    if regex.match(filename):
                        Utils.create_folder_tree(old_path)
                        Utils.move_file(os.path.join(link_path, filename),
                                        os.path.join(old_path, filename))

            link_path = os.path.join(link_path, os.path.basename(filepath))
            if os.path.lexists(link_path):
                os.remove(link_path)
            if not os.path.exists(filepath):
                raise ValueError('Original file {0} does not exists'.format(filepath))
            if not os.path.isdir(os.path.dirname(link_path)):
                Utils.create_folder_tree(os.path.dirname(link_path))
            relative_path = os.path.relpath(filepath, os.path.dirname(link_path))
            os.symlink(relative_path, link_path)
        finally:
            self.lock.release()
    def prepare(self):
        """
        Prepare the data to be used by the diagnostics.

        Nothing is done for the 'meteofrance' convention. Otherwise, for each startdate and member, packed CMOR data
        is unpacked if available. Members for which no CMOR data could be found or unpacked are CMORized.

        """
        # Check if cmorized and convert if not
        if self.config.data_convention == 'meteofrance':
            return

        for startdate, member in self.experiment.get_member_list():
            # NOTE(review): guard reconstructed from the original indentation and the otherwise unused
            # _unpack_cmor_files helper; confirm against the repository history
            if not self._unpack_cmor_files(startdate, member):
                self._cmorize_member(startdate, member)
    def is_cmorized(self, startdate, member, chunk, domain):
        """
        Check if a chunk domain is cmorized

        A cache is maintained so only the first check is costly

        Parameters
        ----------
        startdate: str
        member: int
        chunk: int
        domain: ModelingRealm

        Returns
        -------
        bool

        """
        domain_cache = self._dic_cmorized.setdefault((startdate, member, chunk), {})
        if domain not in domain_cache:
            domain_cache[domain] = self._is_cmorized(startdate, member, chunk, domain)
        return domain_cache[domain]
    def _is_cmorized(self, startdate, member, chunk, domain):
        """
        Check the file system for CMOR files of a chunk and domain

        Returns
        -------
        bool
            True when at least config.cmor.min_cmorized_vars files are found, False otherwise (including when the
            startdate folder does not exist)

        """
        startdate_path = self._get_startdate_path(startdate)
        if not os.path.isdir(startdate_path):
            return False
        count = 0
        # specs and preface share the <startdate>/<freq>/<domain>/<var> layout (see _get_full_cmor_folder_path)
        if self.config.data_convention in ('specs', 'preface'):
            for freq in os.listdir(startdate_path):
                domain_path = os.path.join(startdate_path, freq, domain.name)
                if os.path.isdir(domain_path):
                    count = self._check_var_presence(domain_path, count, startdate, member, domain, chunk, freq)
                    if count >= self.config.cmor.min_cmorized_vars:
                        return True
        else:
            member_path = os.path.join(startdate_path, self._get_member_str(member))
            if not os.path.isdir(member_path):
                return False
            freq = Frequencies.monthly
            table = domain.get_table(freq, self.config.data_convention)
            table_dir = os.path.join(member_path, table.name)
            if not os.path.isdir(table_dir):
                return False
            count = self._check_var_presence(table_dir, count, startdate, member, domain, chunk, freq)
            if count >= self.config.cmor.min_cmorized_vars:
                return True
        return False
    def _check_var_presence(self, folder, current_count, startdate, member, domain, chunk, freq):
        """
        Count the variables in a folder that have a CMOR file for the given chunk

        Counting starts from current_count and stops as soon as config.cmor.min_cmorized_vars is reached.

        Parameters
        ----------
        folder: str
            Folder whose entries are variable names
        current_count: int
        startdate: str
        member: int
        domain: ModelingRealm
        chunk: int
        freq: Frequency or str

        Returns
        -------
        int

        """
        found = current_count
        for var_name in os.listdir(folder):
            cmor_var = self.variable_list.get_variable(var_name, True)
            expected = self.get_file_path(startdate, member, domain, var_name, cmor_var, chunk, frequency=freq)
            if not os.path.isfile(expected):
                continue
            found += 1
            if found >= self.config.cmor.min_cmorized_vars:
                break
        return found
    def _cmorize_member(self, startdate, member):
        """
        CMORize the ocean and atmosphere data of a startdate and member, logging the elapsed time

        Parameters
        ----------
        startdate: str
        member: int

        """
        # Local import: Cmorizer is not imported at module level in this file
        from earthdiagnostics.cmorizer import Cmorizer

        start_time = datetime.now()
        member_str = self.experiment.get_member_str(member)
        Log.info('CMORizing startdate {0} member {1}. Starting at {2}', startdate, member_str, start_time)
        cmorizer = Cmorizer(self, startdate, member)
        cmorizer.cmorize_ocean()
        cmorizer.cmorize_atmos()
        Log.result('CMORized startdate {0} member {1}! Elapsed time: {2}\n\n', startdate, member_str,
                   datetime.now() - start_time)
    def _unpack_cmor_files(self, startdate, member):
        """
        Make the CMOR data of a member available, unpacking transferred archives when needed

        Returns
        -------
        bool
            False if cmorization is forced or no chunk is available; True if at least one chunk is already
            cmorized or could be unpacked

        """
        if self.config.cmor.force:
            return False
        # Must be initialised: it stays False when no chunk is cmorized or unpacked
        cmorized = False
        for chunk in range(1, self.experiment.num_chunks + 1):
            if not self.config.cmor.force_untar:
                if self.is_cmorized(startdate, member, chunk, ModelingRealms.atmos) or \
                        self.is_cmorized(startdate, member, chunk, ModelingRealms.ocean):
                    cmorized = True
                    continue

            if self._unpack_chunk(startdate, member, chunk):
                cmorized = True
        if cmorized:
            Log.info('Startdate {0} member {1} ready', startdate, member)
        return cmorized
    def _unpack_chunk(self, startdate, member, chunk):
        """
        Unpack the transferred CMOR archives of a chunk into cmor_path

        '.tar.gz' archives are unzipped first. Then the '.tar' archives found are extracted, the extracted paths are
        corrected and the links for the startdate and member are created.

        Returns
        -------
        bool
            True if cmorization of this chunk is not requested or if tar archives were unpacked, False otherwise

        """
        if not self.config.cmor.chunk_cmorization_requested(chunk):
            return True

        filepaths = self._get_transferred_cmor_data_filepaths(startdate, member, chunk, 'tar.gz')
        if filepaths:
            Log.info('Unzipping cmorized data for {0} {1} {2}...', startdate, member, chunk)
            Utils.unzip(filepaths, True)

        if not os.path.exists(self.cmor_path):
            os.mkdir(self.cmor_path)

        filepaths = self._get_transferred_cmor_data_filepaths(startdate, member, chunk, 'tar')
        if not filepaths:
            return False
        Log.info('Unpacking cmorized data for {0} {1} {2}...', startdate, member, chunk)
        Utils.untar(filepaths, self.cmor_path)
        self._correct_paths(startdate)
        self.create_links(startdate, member)
        return True
    def _get_transferred_cmor_data_filepaths(self, startdate, member, chunk, extension):
        """
        Find the transferred CMOR archives of a chunk

        Archives named '<CMOR or CMOR?>_<expid>_<startdate>_<member>_<chunk start>-*.<extension>' are searched for
        in <data_dir>/<expid>/original_files/cmorfiles and <data_dir>/original_files/<expid>/cmorfiles, and in an
        'outputs' subfolder of each.

        Returns
        -------
        list of str

        """
        tar_path = os.path.join(self.config.data_dir, self.experiment.expid, 'original_files', 'cmorfiles')
        tar_original_files = os.path.join(self.config.data_dir, 'original_files', self.experiment.expid,
                                          'cmorfiles')
        filepaths = []
        for cmor_prefix in ('CMOR?', 'CMOR'):
            file_name = '{5}_{0}_{1}_{2}_{3}-*.{4}'.format(self.experiment.expid, startdate,
                                                           self.experiment.get_member_str(member),
                                                           self.experiment.get_chunk_start_str(startdate, chunk),
                                                           extension, cmor_prefix)
            for folder in (tar_path, os.path.join(tar_path, 'outputs'),
                           tar_original_files, os.path.join(tar_original_files, 'outputs')):
                filepaths += glob.glob(os.path.join(folder, file_name))
        return filepaths
    def _correct_paths(self, startdate):
        """
        Fix known layout problems of freshly unpacked CMOR data

        Parameters
        ----------
        startdate: str

        """
        self._remove_extra_output_folder()
        self._fix_model_as_experiment_error(startdate)
    def _fix_model_as_experiment_error(self, startdate):
        """
        Move CMOR files stored under <institute>/<model>/<model> to <institute>/<model>/<experiment_name>

        Each moved file is renamed, replacing '_<model>_output_' with '_<model>_<experiment_name>_S<startdate>_'.
        Files whose name already contains '_S<startdate>_' are left untouched. Folders left empty are removed.

        Parameters
        ----------
        startdate: str

        """
        if self.experiment.experiment_name == self.experiment.model:
            return
        bad_path = os.path.join(self.cmor_path, self.experiment.institute, self.experiment.model,
                                self.experiment.model)
        Log.debug('Correcting double model appearance')
        # Bottom-up walk (topdown=False) so subfolders are handled before their parents
        for (dirpath, _, filenames) in os.walk(bad_path, False):
            for filename in filenames:
                if '_S{0}_'.format(startdate) in filename:
                    continue
                filepath = os.path.join(dirpath, filename)
                good = filepath.replace('_{0}_output_'.format(self.experiment.model),
                                        '_{0}_{1}_S{2}_'.format(self.experiment.model,
                                                                self.experiment.experiment_name,
                                                                startdate))

                good = good.replace('/{0}/{0}'.format(self.experiment.model),
                                    '/{0}/{1}'.format(self.experiment.model,
                                                      self.experiment.experiment_name))

                Utils.move_file(filepath, good)
            # Skipped files may remain, so only remove folders that are really empty
            if not os.listdir(dirpath):
                os.rmdir(dirpath)
    def _remove_extra_output_folder(self):
        """Move the CMOR files out of a spurious 'output' folder inside cmor_path, if there is one"""
        output_folder = os.path.join(self.cmor_path, 'output')
        if not os.path.exists(output_folder):
            return
        Log.debug('Moving CMOR files out of the output folder')
        Utils.move_tree(output_folder, self.cmor_path)
    def create_links(self, startdate, member=None):
        """
        Create links for a given startdate or member

        Parameters
        ----------
        startdate: str
        member: int or None
            If None, links are created for every member of the startdate

        Raises
        ------
        ValueError
            If the data convention is not supported

        """
        member_str = self._get_member_str(member) if member is not None else None
        Log.info('Creating links for CMOR files ({0})', startdate)
        path = self._get_startdate_path(startdate)
        if self.config.data_convention.upper() in ('SPECS', 'APPLICATE'):
            self._create_links_cmip5(member_str, path)
        elif self.config.data_convention.upper() in ('CMIP6', 'PRIMAVERA'):
            self._create_links_cmip6(member_str, path)
        else:
            raise ValueError('Dataset convention {0} not supported for massive '
                             'link creation'.format(self.config.data_convention))
        Log.debug('Links ready')
    def _create_links_cmip5(self, member_str, path):
        """
        Create links for a CMIP5-like tree: <path>/<freq>/<domain>/<var>/<member>/<file or folder of files>

        Parameters
        ----------
        member_str: str or None
            Only this member is linked; None links all members
        path: str
            Startdate folder

        """
        for freq in os.listdir(path):
            frequency = Frequency.parse(freq)
            for domain in os.listdir(os.path.join(path, freq)):
                for var in os.listdir(os.path.join(path, freq, domain)):
                    for member in os.listdir(os.path.join(path, freq, domain, var)):
                        if member_str is not None and member != member_str:
                            continue
                        for name in os.listdir(os.path.join(path, freq, domain, var, member)):
                            filepath = os.path.join(path, freq, domain, var, member, name)
                            if os.path.isfile(filepath):
                                self.create_link(domain, filepath, frequency, var, "", False,
                                                 vartype=VariableType.MEAN)
                            else:
                                # Entries that are not files are treated as folders holding the CMOR files
                                for filename in os.listdir(filepath):
                                    self.create_link(domain, os.path.join(filepath, filename), frequency, var, "",
                                                     False, vartype=VariableType.MEAN)
    def _create_links_cmip6(self, member_str, path):
        """
        Create links for a CMIP6-like tree: <path>/<member>/<table>/<var>/<grid>/<file or folder of files>

        Parameters
        ----------
        member_str: str or None
            Only this member is linked; None links all members
        path: str
            Startdate folder

        """
        for member in os.listdir(path):
            # Filter at member level (previously checked once per grid, and None skipped every member)
            if member_str is not None and member != member_str:
                continue
            for table in os.listdir(os.path.join(path, member)):
                frequency = self.variable_list.tables[table].frequency
                domain = None
                for var in os.listdir(os.path.join(path, member, table)):
                    for grid in os.listdir(os.path.join(path, member, table, var)):
                        for name in os.listdir(os.path.join(path, member, table, var, grid)):
                            filepath = os.path.join(path, member, table, var, grid, name)
                            if os.path.isfile(filepath):
                                self.create_link(domain, filepath, frequency, var, "", False,
                                                 vartype=VariableType.MEAN)
                            else:
                                for filename in os.listdir(filepath):
                                    cmorfile = os.path.join(filepath, filename)
                                    self.create_link(domain, cmorfile, frequency, var, "",
                                                     False, vartype=VariableType.MEAN)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        Return the path to the startdate's CMOR folder

        Parameters
        ----------
        startdate: str

        Returns
        -------
        str
        if self.config.data_convention == 'specs':
            return os.path.join(self.config.data_dir, self.experiment.expid, 'cmorfiles', self.experiment.institute,
                                self.experiment.model, self.experiment.experiment_name, 'S' + startdate)
        elif self.config.data_convention == 'preface':
            return os.path.join(self.config.data_dir, self.experiment.expid, 'cmorfiles', self.experiment.institute,
                                self.experiment.experiment_name, 'S' + startdate)
        else:
            return os.path.join(self.config.data_dir, self.experiment.expid, 'cmorfiles', self.config.cmor.activity,
                                self.experiment.institute, self.experiment.model, self.experiment.experiment_name)
    def _get_member_str(self, member):
        """
        Return the member label used in CMOR paths and file names

        Parameters
        ----------
        member: int

        Returns
        -------
        str

        Raises
        ------
        ValueError
            If the data convention is not supported

        """
        if self.config.data_convention in ('specs', 'preface'):
            template = 'r{0}i{1}p1'
        elif self.config.data_convention in ('primavera', 'cmip6'):
            template = 'r{0}i{1}p1f1'
        elif self.config.data_convention == 'meteofrance':
            return '{0:02d}'.format(member)
        else:
            raise ValueError('Data convention {0} not supported'.format(self.config.data_convention))
        # Realization numbers start at 1 regardless of the experiment's first member index
        return template.format(member + 1 - self.experiment.member_count_start, self.config.cmor.initialization_number)

class MergeYear(Diagnostic):
    """
    Diagnostic to get all the data for a given year and merge it in a file

    Parameters
    ----------
    data_manager: DataManager
    domain: ModelingRealm
    var: str
    startdate: str
    member: int
    year: int
    grid: str or None, optional
    box: Box or None, optional
    frequency: Frequency or None, optional
    """

    @classmethod
    def generate_jobs(cls, diags, options):
        """
        Method to generate the required diagnostics from a section of the configuration file

        Required by the interface, does nothing as this diagnostic is not meant to be configured in the usual way
        """
        pass

    def __init__(self, data_manager, domain, var, startdate, member, year, grid=None, box=None, frequency=None):
        super(MergeYear, self).__init__(data_manager)
        # DataFile objects returned by request_chunk, one per chunk overlapping the year
        self.chunk_files = []
        # Output DataFile; assigned in declare_data_generated
        self.year_file = None
        self.experiment = self.data_manager.experiment
        self.domain = domain
        self.var = var
        self.startdate = startdate
        self.member = member
        self.year = year
        self.grid = grid
        self.box = box
        self.frequency = frequency

    def __str__(self):
        return 'Merge year data Variable: {0.domain}:{0.var} Startdate: {0.startdate} Member: {0.member} ' \
               'Year: {0.year} Grid: {0.grid} Box: {0.box} Frequency: {0.frequency}'.format(self)

    def __eq__(self, other):
        # Objects of any other type are never equal (and lack the attributes compared below)
        if not isinstance(other, MergeYear):
            return False
        return self.domain == other.domain and self.var == other.var and self.startdate == other.startdate and \
            self.member == other.member and self.year == other.year and self.grid == other.grid and \
            self.box == other.box and self.frequency == other.frequency

    def __hash__(self):
        # str(self) contains every field compared in __eq__, so equal objects hash equally
        return hash(str(self))

    def request_data(self):
        """Request all the data required by the diagnostic"""
        for chunk in self.experiment.get_year_chunks(self.startdate, self.year):
            self.chunk_files.append(self.request_chunk(self.domain, self.var, self.startdate, self.member, chunk,
                                                       grid=self.grid, box=self.box, frequency=self.frequency))

    def declare_data_generated(self):
        """Declare all the data generated by the diagnostic"""
        # NOTE(review): frequency is passed to request_chunk but not to declare_year -- confirm this is intended
        self.year_file = self.declare_year(self.domain, self.var, self.startdate, self.member, self.year,
                                           grid=self.grid, box=self.box)
        # The merged yearly file is marked NO_STORE
        self.year_file.storage_status = StorageStatus.NO_STORE

    def compute(self):
        """Create the yearly file for the data"""
        merged = self._merge_chunk_files()
        year_data = self._select_data_of_given_year(merged)
        self.year_file.set_local_file(year_data)

    def _select_data_of_given_year(self, data_file):
        """
        Copy the time steps of ``data_file`` that fall in ``self.year`` to a new temporary file

        The year is taken as one contiguous slice: from its first time step up to (excluding) the next time
        step of a different year. This assumes the time axis is sorted.

        Parameters
        ----------
        data_file: str
            Path to a netCDF file with a time dimension

        Returns
        -------
        str
            Path to the temporary file holding the selected time steps

        Raises
        ------
        ValueError
            If no time step of data_file belongs to self.year
        """
        handler = Utils.open_cdf(data_file)
        try:
            times = Utils.get_datetime_from_netcdf(handler)
        finally:
            handler.close()

        first_index = None
        last_index = None
        for index, date in enumerate(times):
            if date.year == self.year:
                if first_index is None:
                    first_index = index
            elif first_index is not None:
                # First step after the year's slice: exclusive end
                last_index = index
                break

        if first_index is None:
            raise ValueError('No time steps for year {0} found in {1}'.format(self.year, data_file))
        if last_index is None:
            last_index = times.size

        temp = TempFile.get()
        # ncks hyperslab indices are inclusive, hence last_index - 1
        Utils.nco.ncks(input=data_file, output=temp, options=['-d time,{0},{1}'.format(first_index, last_index - 1)])
        return temp

    def _merge_chunk_files(self):
        """
        Concatenate the local files of all requested chunks into a single temporary file

        The chunk files themselves are left in place in both the single-chunk and multi-chunk cases. They are
        owned by the DataFile objects returned by request_chunk.

        Returns
        -------
        str
            Path to the temporary file with the merged data

        Raises
        ------
        ValueError
            If no chunk was requested for the year
        """
        chunk_paths = [chunk_file.local_file for chunk_file in self.chunk_files]
        if not chunk_paths:
            raise ValueError('No chunk data available for year {0}'.format(self.year))

        temp = TempFile.get()
        if len(chunk_paths) == 1:
            Utils.copy_file(chunk_paths[0], temp)
            return temp

        Utils.nco.ncrcat(input=' '.join(chunk_paths), output=temp)
        return temp