# cmormanager.py
"""Classes to manage cmorized datasets"""
import glob
import os
from datetime import datetime

from bscearth.utils.log import Log

from earthdiagnostics.datafile import StorageStatus
from earthdiagnostics.diagnostic import Diagnostic
from earthdiagnostics.datamanager import DataManager
from earthdiagnostics.frequency import Frequencies, Frequency
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.utils import TempFile, Utils
from earthdiagnostics.variable import VariableType


class CMORManager(DataManager):
    """
    Data manager class for CMORized experiments

    Parameters
    ----------
    config: earthdiagnostics.config.Config

    """

    def __init__(self, config):
        super(CMORManager, self).__init__(config)
        # find_model_data replaces config.data_dir with the folder actually holding the experiment,
        # so it must run before cmor_path is derived from it
        self.find_model_data()
        self.cmor_path = os.path.join(self.config.data_dir, self.experiment.expid)
        self.convention = self.config.data_convention

    def find_model_data(self):
        """
        Seek the configured data folders for the experiment data

        ``config.data_dir`` is read as a ':'-separated list of folders. For each folder, in order, it looks at:
        -<folder>/<expid>
        -<folder>/<model>/<expid>, with <model> lowercased and every '-' removed
        -<folder>/<data_type>/<model>/<expid>, with <model> lowercased ('ec-earth*' is mapped to 'ecearth')

        The first candidate containing an <expid> folder wins and is stored back in ``config.data_dir``.

        Raises
        ------
        Exception
            If none of the candidates contains the experiment folder
        """
        data_folders = self.config.data_dir.split(':')
        experiment_folder = self.experiment.model.lower()
        if experiment_folder.startswith('ec-earth'):
            experiment_folder = 'ecearth'
        self.config.data_dir = None
        for data_folder in data_folders:
            if os.path.isdir(os.path.join(data_folder, self.experiment.expid)):
                self.config.data_dir = data_folder
                break
            test_folder = os.path.join(data_folder, self.experiment.model.lower().replace('-', ''))
            if os.path.isdir(os.path.join(test_folder, self.experiment.expid)):
                self.config.data_dir = test_folder
                # Stop at the first match so earlier candidates keep priority over later ones
                break
            test_folder = os.path.join(data_folder, self.config.data_type, experiment_folder)
            if os.path.isdir(os.path.join(test_folder, self.experiment.expid)):
                self.config.data_dir = test_folder
                break
        if not self.config.data_dir:
            raise Exception('Can not find model data')

    # noinspection PyUnusedLocal
    def file_exists(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
                    vartype=VariableType.MEAN, possible_versions=None):
        """
        Check if a file exists in the storage

        Parameters
        ----------
        domain: ModelingRealm
        var: str
        startdate: str
        member: int
        chunk: int
        grid: str or None
        box: Box or None
            Currently ignored
        frequency: Frequency or None
            If None, the configured default frequency is used
        vartype: VariableType
            Currently ignored
        possible_versions: iterable of str or None
            If given, the configured CMOR version in the path is replaced by each of these in turn

        Returns
        -------
        bool

        """
        if frequency is None:
            frequency = self.config.frequency
        cmor_var = self.variable_list.get_variable(var)
        filepath = self.convention.get_file_path(startdate, member, domain, var, cmor_var, chunk, frequency, grid,
                                                 None, None)
        if possible_versions is None:
            return os.path.isfile(filepath)
        # Alternative versions are located by swapping the configured version string inside the path
        return any(os.path.isfile(filepath.replace(self.config.cmor.version, version))
                   for version in possible_versions)

    def request_chunk(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None, vartype=None):
        """
        Request a given file from the CMOR repository to the scratch folder and returns the path to the scratch's copy

        Parameters
        ----------
        domain: ModelingRealm
        var: str
        startdate: str
        member: int
        chunk: int
        grid: str or None
        box: Box or None
        frequency: Frequency or None
            If None, the configured default frequency is used
        vartype: VariableType or None
            Currently ignored

        Returns
        -------
        DataFile

        """
        if frequency is None:
            frequency = self.config.frequency
        cmor_var = self.variable_list.get_variable(var)
        var = self._get_final_var_name(box, var)
        filepath = self.convention.get_file_path(startdate, member, domain, var, cmor_var, chunk, frequency, grid,
                                                 None, None)
        return self._get_file_from_storage(filepath)

    def request_year(self, diagnostic, domain, var, startdate, member, year, grid=None, box=None, frequency=None):
        """
        Request a given year for a variable from a CMOR repository

        A MergeYear job is built to assemble the yearly file from the chunks. It is appended to
        ``diagnostic.subjobs`` only if no other request has already scheduled it for the same year file.

        Parameters
        ----------
        diagnostic: Diagnostic
        domain: ModelingRealm
        var: str
        startdate: str
        member: int
        year: int
        grid: str or None
        box: Box or None
        frequency: Frequency or None

        Returns
        -------
        DataFile
            The yearly file declared by the MergeYear job

        """
        job = MergeYear(self, domain, var, startdate, member, year, grid, box, frequency)
        job.request_data()
        job.declare_data_generated()
        # Several diagnostics may ask for the same year: only the first one schedules the merge
        if not job.year_file.job_added:
            diagnostic.subjobs.append(job)
            job.year_file.job_added = True
        return job.year_file

    def declare_chunk(self, domain, var, startdate, member, chunk, grid=None, region=None, box=None, frequency=None,
                      vartype=VariableType.MEAN, diagnostic=None):
        """
        Declare a variable chunk to be generated by a diagnostic

        Parameters
        ----------
        domain: ModelingRealm
        var: str
        startdate: str
        member: int
        chunk: int
        grid: str or None
        region: Basin or None
        box: Box or None
        frequency: Frequency or None
            If None, the configured default frequency is used
        vartype: VariableType
        diagnostic: Diagnostic

        Returns
        -------
        DataFile

        """
        if frequency is None:
            frequency = self.config.frequency
        original_name = var
        cmor_var = self.variable_list.get_variable(var)
        if cmor_var:
            # Use the CMOR short name when the variable is known to the variable list
            var = cmor_var.short_name
        final_name = self._get_final_var_name(box, var)

        filepath = self.convention.get_file_path(startdate, member, domain, final_name, cmor_var, chunk,
                                                 frequency, grid)
        netcdf_file = self._declare_generated_file(filepath, domain, final_name, cmor_var, self.config.data_convention,
                                                   region, diagnostic, grid, vartype, original_name)
        netcdf_file.frequency = frequency
        return netcdf_file

    def declare_year(self, domain, var, startdate, member, year, grid=None, box=None,
                     vartype=VariableType.MEAN, diagnostic=None):
        """
        Declare a variable year to be generated by a diagnostic

        Parameters
        ----------
        domain: ModelingRealm
        var: str
        startdate: str
        member: int
        year: int
        grid: str or None
        box: Box or None
        vartype: VariableType
        diagnostic: Diagnostic

        Returns
        -------
        DataFile
            Declared with yearly frequency

        """
        original_name = var
        cmor_var = self.variable_list.get_variable(var)
        if cmor_var:
            var = cmor_var.short_name
        final_name = self._get_final_var_name(box, var)

        filepath = self.convention.get_file_path(startdate, member, domain, final_name, cmor_var, None,
                                                 Frequencies.yearly, grid, year=year)
        netcdf_file = self._declare_generated_file(filepath, domain, final_name, cmor_var, self.config.data_convention,
                                                   None, diagnostic, grid, vartype, original_name)
        netcdf_file.frequency = Frequencies.yearly
        return netcdf_file

    def link_file(self, domain, var, cmor_var, startdate, member, chunk=None, grid=None,
                  frequency=None, year=None, date_str=None, move_old=False, vartype=VariableType.MEAN):
        """
        Create the link of a given file from the CMOR repository.

        Parameters
        ----------
        domain: ModelingRealm
        var: str
        cmor_var:
        startdate: str
        member: int
        chunk: int or None, optional
        grid: str or None, optional
        frequency: Frequency or None, optional
            If None, the configured default frequency is used
        year: int or None, optional
        date_str: str or None, optional
        move_old: bool, optional
        vartype: VariableType, optional

        """
        if frequency is None:
            frequency = self.config.frequency
        filepath = self.convention.get_file_path(startdate, member, domain, var, cmor_var, chunk, frequency,
                                                 grid=grid, year=year, date_str=date_str)
        self.convention.create_link(domain, filepath, frequency, var, grid, move_old, vartype)

    def prepare(self):
        """
        Prepare the data to be used by the diagnostic.

        If CMOR data is not created, an automatic cmorization procedure
        is launched

        If CMOR data is available but packed, the procedure will unpack it.

        Nothing is prepared when the data convention is 'meteofrance'.
        """
        # Check if cmorized and convert if not
        if self.config.data_convention.name == 'meteofrance':
            # NOTE(review): the body of this branch was lost in SOURCE; an early return is assumed — confirm
            return
        for startdate, member in self.experiment.get_member_list():
            self._prepare_member(startdate, member)

    def _prepare_member(self, startdate, member):
        """
        Make the CMOR data of one startdate/member available, unpacking or cmorizing as needed.

        Unless ``config.cmor.force`` is set, each requested chunk is checked: a chunk already cmorized in
        atmos, ocean or seaIce is accepted as is (unless ``config.cmor.force_untar``), otherwise its
        transferred archives are unpacked. If no chunk turned out to be available, the member is cmorized.
        """
        Log.info('Checking data for startdate {0} member {1}', startdate, member)
        if not self.config.cmor.force:
            cmorized = False
            for chunk in range(1, self.experiment.num_chunks + 1):
                if not self.config.cmor.chunk_cmorization_requested(chunk):
                    Log.debug('Skipping chunk {0}', chunk)
                    continue
                if not self.config.cmor.force_untar:
                    Log.debug('Checking chunk {0}...', chunk)
                    if any(self.is_cmorized(startdate, member, chunk, domain)
                           for domain in (ModelingRealms.atmos, ModelingRealms.ocean, ModelingRealms.seaIce)):
                        Log.debug('Chunk {0} ready', chunk)
                        # Data already cmorized counts as available: do not cmorize the member again
                        cmorized = True
                        continue
                if self._unpack_chunk(startdate, member, chunk):
                    cmorized = True
            if cmorized:
                Log.info('Startdate {0} member {1} ready', startdate, member)
                return
        self._cmorize_member(startdate, member)

    def is_cmorized(self, startdate, member, chunk, domain):
        """
        Check if a chunk domain is cmorized

        A cache is maintained so only the first check is costly

        Parameters
        ----------
        startdate: str
        member: int
        chunk: int
        domain: ModelingRealm

        Returns
        -------
        bool

        """
        chunk_cache = self._dic_cmorized.setdefault((startdate, member, chunk), {})
        if domain not in chunk_cache:
            chunk_cache[domain] = self.convention.is_cmorized(startdate, member, chunk, domain)
        return chunk_cache[domain]

    def _cmorize_member(self, startdate, member):
        """Run the Cmorizer on the ocean and atmosphere data of one startdate/member, logging elapsed time."""
        # NOTE(review): SOURCE uses Cmorizer without a visible import; earthdiagnostics.cmorizer is assumed.
        # Imported locally to avoid a possible circular import at module load — confirm
        from earthdiagnostics.cmorizer import Cmorizer
        start_time = datetime.now()
        member_str = self.experiment.get_member_str(member)
        Log.info('CMORizing startdate {0} member {1}. Starting at {2}', startdate, member_str, start_time)
        cmorizer = Cmorizer(self, startdate, member)
        cmorizer.cmorize_ocean()
        cmorizer.cmorize_atmos()
        Log.result('CMORized startdate {0} member {1}! Elapsed time: {2}\n\n', startdate, member_str,
                   datetime.now() - start_time)

    def _unpack_chunk(self, startdate, member, chunk):
        """
        Unpack the transferred CMOR archives of a chunk into <cmor_path>/cmorfiles.

        Returns
        -------
        bool
            True if at least one '.tar' archive was unpacked
        """
        filepaths = self._get_transferred_cmor_data_filepaths(startdate, member, chunk, 'tar.gz')
        if filepaths:
            Log.info('Unzipping cmorized data for {0} {1} {2}...', startdate, member, chunk)
            # presumably leaves '.tar' files behind that the lookup below picks up — confirm
            Utils.unzip(filepaths, True)

        filepaths = self._get_transferred_cmor_data_filepaths(startdate, member, chunk, 'tar')
        if not filepaths:
            return False
        Log.info('Unpacking cmorized data for {0} {1} {2}...', startdate, member, chunk)
        Utils.untar(filepaths, os.path.join(self.cmor_path, 'cmorfiles'))
        self._correct_paths(startdate)
        self.convention.create_links(startdate, member)
        return True

    def _get_transferred_cmor_data_filepaths(self, startdate, member, chunk, extension):
        """
        List the transferred CMOR archives of a chunk.

        Matches '<prefix>_<expid>_<startdate>_<member>_<chunk start>-*.<extension>' with prefix 'CMOR?' or
        'CMOR' inside <data_dir>/<expid>/original_files/cmorfiles and
        <data_dir>/original_files/<expid>/cmorfiles, plus their 'outputs' subfolders.

        Returns
        -------
        list of str
        """
        search_folders = []
        for base_folder in (os.path.join(self.config.data_dir, self.experiment.expid, 'original_files', 'cmorfiles'),
                            os.path.join(self.config.data_dir, 'original_files', self.experiment.expid,
                                         'cmorfiles')):
            search_folders.append(base_folder)
            search_folders.append(os.path.join(base_folder, 'outputs'))

        member_str = self.experiment.get_member_str(member)
        chunk_start = self.experiment.get_chunk_start_str(startdate, chunk)
        filepaths = []
        for cmor_prefix in ('CMOR?', 'CMOR'):
            file_name = '{5}_{0}_{1}_{2}_{3}-*.{4}'.format(self.experiment.expid, startdate, member_str,
                                                           chunk_start, extension, cmor_prefix)
            for folder in search_folders:
                filepaths += glob.glob(os.path.join(folder, file_name))
        return filepaths

    def _correct_paths(self, startdate):
        """Fix known layout problems of freshly unpacked CMOR data."""
        self._remove_extra_output_folder()
        self._fix_model_as_experiment_error(startdate)

    def _fix_model_as_experiment_error(self, startdate):
        """
        Fix files unpacked under <institute>/<model>/<model> instead of <institute>/<model>/<experiment>.

        Files whose name does not already contain '_S<startdate>_' are moved to the corrected folder, with
        '_<model>_output_' in their path replaced by '_<model>_<experiment>_S<startdate>_'. Folders left
        empty are removed. Nothing is done when the experiment name equals the model name.
        """
        experiment_name = self.convention.experiment_name(startdate)
        model = self.experiment.model
        if experiment_name == model:
            return
        bad_path = os.path.join(self.cmor_path, self.experiment.institute, model, model)
        Log.debug('Correcting double model appearance')
        # Bottom-up walk: every folder has been processed before its parent is considered for removal
        for (dirpath, _, filenames) in os.walk(bad_path, False):
            for filename in filenames:
                if '_S{0}_'.format(startdate) in filename:
                    continue
                filepath = os.path.join(dirpath, filename)
                good = filepath.replace('_{0}_output_'.format(model),
                                        '_{0}_{1}_S{2}_'.format(model, experiment_name, startdate))
                good = good.replace('/{0}/{0}'.format(model), '/{0}/{1}'.format(model, experiment_name))
                # NOTE(review): the original move statement was lost in SOURCE; a plain rename is assumed — confirm
                good_folder = os.path.dirname(good)
                if not os.path.isdir(good_folder):
                    os.makedirs(good_folder)
                os.rename(filepath, good)
            # Skipped files may remain; removing a non-empty folder would raise
            if not os.listdir(dirpath):
                os.rmdir(dirpath)

    def _remove_extra_output_folder(self):
        """Move the contents of <cmor_path>/output up into <cmor_path>, if that folder exists."""
        bad_path = os.path.join(self.cmor_path, 'output')
        if os.path.exists(bad_path):
            Log.debug('Moving CMOR files out of the output folder')
            Utils.move_tree(bad_path, self.cmor_path)

class MergeYear(Diagnostic):
    """
    Diagnostic to get all the data for a given year and merge it in a file

    Parameters
    ----------
    data_manager: DataManager
    domain: ModelingRealm
    var: str
    startdate: str
    member: int
    year: int
    grid: str or None, optional
    box: Box or None, optional
    frequency: Frequency or None, optional
    """

    @classmethod
    def generate_jobs(cls, diags, options):
        """
        Method to generate the required diagnostics from a section of the configuration file

        Required by the interface, does nothing as this diagnostic is not meant to be configured in the usual way
        """
        pass

    def __init__(self, data_manager, domain, var, startdate, member, year, grid=None, box=None, frequency=None):
        super(MergeYear, self).__init__(data_manager)
        self.chunk_files = []
        # Filled by declare_data_generated
        self.year_file = None
        self.experiment = self.data_manager.experiment
        self.domain = domain
        self.var = var
        self.startdate = startdate
        self.member = member
        self.year = year
        self.grid = grid
        self.box = box
        self.frequency = frequency

    def __str__(self):
        return 'Merge year data Variable: {0.domain}:{0.var} Startdate: {0.startdate} Member: {0.member} ' \
               'Year: {0.year} Grid: {0.grid} Box: {0.box} Frequency: {0.frequency}'.format(self)

    def __eq__(self, other):
        # Comparing with any other kind of object must not raise AttributeError
        if not isinstance(other, MergeYear):
            return NotImplemented
        return self.domain == other.domain and self.var == other.var and self.startdate == other.startdate and \
            self.member == other.member and self.year == other.year and self.grid == other.grid and \
            self.box == other.box and self.frequency == other.frequency

    def __hash__(self):
        # __str__ includes every field compared in __eq__, so equal jobs hash equally
        return hash(str(self))

    def request_data(self):
        """Request all the data required by the diagnostic"""
        for chunk in self.experiment.get_year_chunks(self.startdate, self.year):
            self.chunk_files.append(self.request_chunk(self.domain, self.var, self.startdate, self.member, chunk,
                                                       grid=self.grid, box=self.box, frequency=self.frequency))

    def declare_data_generated(self):
        """Declare all the data generated by the diagnostic"""
        self.year_file = self.declare_year(self.domain, self.var, self.startdate, self.member, self.year,
                                           grid=self.grid, box=self.box)
        # The merged year is an intermediate product and is not sent to storage
        self.year_file.storage_status = StorageStatus.NO_STORE

    def compute(self):
        """Create the yearly file for the data"""
        merged = self._merge_chunk_files()
        year_data = self._select_data_of_given_year(merged)
        self.year_file.set_local_file(year_data)

    def _select_data_of_given_year(self, data_file):
        """
        Extract the time steps of ``self.year`` from a NetCDF file into a new temporary file.

        The selection runs from the first time step in ``self.year`` up to, but excluding, the next time
        step outside it, so the year is expected to be contiguous along the time axis.

        Raises
        ------
        ValueError
            If no time step belongs to ``self.year``
        """
        handler = Utils.open_cdf(data_file)
        try:
            times = Utils.get_datetime_from_netcdf(handler)
            first_index = None
            last_index = times.size
            for index in range(times.size):
                in_year = times[index].year == self.year
                if first_index is None:
                    if in_year:
                        first_index = index
                elif not in_year:
                    last_index = index
                    break
        finally:
            # NOTE(review): assumes Utils.open_cdf returns a netCDF4-style object with close() — confirm
            handler.close()
        if first_index is None:
            # Otherwise ncks would receive the invalid range 'None,...'
            raise ValueError('No data for year {0} in {1}'.format(self.year, data_file))
        temp2 = TempFile.get()
        Utils.nco.ncks(input=data_file, output=temp2,
                       options=['-d time,{0},{1}'.format(first_index, last_index - 1)])
        return temp2

    def _merge_chunk_files(self):
        """
        Concatenate the local copies of the requested chunk files into a new temporary file.

        Raises
        ------
        ValueError
            If no chunk file was requested
        """
        chunk_paths = [chunk_file.local_file for chunk_file in self.chunk_files]
        if not chunk_paths:
            raise ValueError('No chunk files to merge for year {0}'.format(self.year))
        temp = TempFile.get()
        if len(chunk_paths) == 1:
            Utils.copy_file(chunk_paths[0], temp)
            return temp

        Utils.nco.ncrcat(input=' '.join(chunk_paths), output=temp)
        # NOTE(review): the chunk files belong to DataFile objects that other diagnostics may share,
        # so they are no longer deleted here — confirm whether explicit cleanup is needed elsewhere
        return temp