# cmormanager.py
import glob
from datetime import datetime

import os
from autosubmit.config.log import Log
from autosubmit.date.chunk_date_lib import parse_date, chunk_start_date, chunk_end_date, previous_day

from earthdiagnostics.cmorizer import Cmorizer
from earthdiagnostics.datamanager import DataManager
from earthdiagnostics.utils import TempFile, Utils


class CMORManager(DataManager):
    """
    Data manager class for CMORized experiments
    """
    def get_file_path(self, startdate, member, domain, var, chunk, frequency,
                      box=None, grid=None, year=None, date_str=None):
        """
        Returns the path to a concrete file

        The time bounds used in the file name are taken from the first of chunk, year and date_str that is
        provided, in that order of precedence.

        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param domain: file's domain
        :type domain: str
        :param var: file's var
        :type var: str
        :param chunk: file's chunk
        :type chunk: int
        :param frequency: file's frequency. If empty, the frequency from the configuration is used
        :type frequency: str
        :param box: file's box
        :type box: Box
        :param grid: file's grid
        :type grid: str
        :param year: file's year. Only used if chunk is None and only valid for the 'yr' frequency
        :type year: int
        :param date_str: date string to add directly. Only used if neither chunk nor year are provided
        :type date_str: str
        :return: path to the file
        :rtype: str
        :raises ValueError: if year is used with a frequency different from 'yr' or if chunk, year and date_str
                            are all missing
        """
        if not frequency:
            frequency = self.config.frequency
        var = self._get_final_var_name(box, var)
        domain_abbreviation = self.get_domain_abbreviation(domain, frequency)
        start = parse_date(startdate)
        # Members are 0-based internally but 1-based in the CMOR realization label (r<N>i1p1)
        member_plus = str(member + 1)

        member_path = os.path.join(self._get_startdate_path(startdate), frequency, domain)
        if chunk is not None:
            chunk_start = chunk_start_date(start, chunk, self.experiment.chunk_size, 'month', 'standard')
            chunk_end = chunk_end_date(chunk_start, self.experiment.chunk_size, 'month', 'standard')
            # Step back one day so the file name shows the last month that belongs to the chunk
            chunk_end = previous_day(chunk_end, 'standard')

            time_bound = "{0:04}{1:02}-{2:04}{3:02}".format(chunk_start.year, chunk_start.month, chunk_end.year,
                                                            chunk_end.month)

        elif year:
            # Compare by value: an identity check against a string literal depends on interning and may
            # reject a perfectly valid 'yr' frequency
            if frequency != 'yr':
                raise ValueError('Year may be provided instead of chunk only if frequency is "yr"')
            time_bound = str(year)
        elif date_str:
            time_bound = date_str
        else:
            raise ValueError('Chunk, year and date_str can not be None at the same time')
        if grid:
            var_path = os.path.join(member_path, var, grid, 'r{0}i1p1'.format(member_plus))
        else:
            var_path = os.path.join(member_path, var, 'r{0}i1p1'.format(member_plus))
        filepath = os.path.join(var_path, '{0}_{1}_{2}_{3}_S{4}_r{5}i1p1_'
                                          '{6}.nc'.format(var, domain_abbreviation, self.experiment.model,
                                                          self.experiment.experiment_name,
                                                          startdate, member_plus, time_bound))
        return filepath

    def link_file(self, domain, var, startdate, member, chunk=None, grid=None, box=None,
                  frequency=None, year=None, date_str=None, move_old=False):
        """
        Creates the link of a given file from the CMOR repository.

        This method does not return anything: the link is created by _create_link.

        :param domain: CMOR domain
        :type domain: str
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: str
        :param year: if frequency is yearly, this parameter is used to give the corresponding year
        :type year: int
        :param date_str: date string forwarded to get_file_path
        :type date_str: str
        :param move_old: forwarded unchanged to _create_link
        """
        final_var = self._get_final_var_name(box, var)

        if not frequency:
            frequency = self.config.frequency
        domain = DataManager.correct_domain(domain)
        # get_file_path declares box before grid, so the optional values must be passed by keyword:
        # passing them positionally shifts grid into box, year into grid and date_str into year.
        # The raw variable name is passed together with the box so the box suffix is applied only once.
        filepath = self.get_file_path(startdate, member, domain, var, chunk, frequency,
                                      box=box, grid=grid, year=year, date_str=date_str)
        self._create_link(domain, filepath, frequency, final_var, grid, move_old)

    def get_year(self, domain, var, startdate, member, year, grid=None, box=None):
        """
        Get a file containing all the data for one year for one variable

        The files for all the chunks that cover the year are retrieved and, if there is more than one,
        concatenated. Then the time steps going from the first one in January to the last one in December of
        the requested year are extracted to a new temporary file. The intermediate files are removed.

        :param domain: variable's domain
        :type domain: str
        :param var: variable's name
        :type var: str
        :param startdate: startdate to retrieve
        :type startdate: str
        :param member: member to retrieve
        :type member: int
        :param year: year to retrieve
        :type year: int
        :param grid: variable's grid
        :type grid: str
        :param box: variable's box
        :type box: Box
        :return: path to the temporary file containing the year's data
        :rtype: str
        """
        chunk_files = [self.get_file(domain, var, startdate, member, chunk, grid=grid, box=box)
                       for chunk in self.experiment.get_year_chunks(startdate, year)]

        if len(chunk_files) > 1:
            temp = TempFile.get()
            Utils.nco.ncrcat(input=' '.join(chunk_files), output=temp)
            for chunk_file in chunk_files:
                os.remove(chunk_file)
        else:
            temp = chunk_files[0]
        temp2 = TempFile.get()
        handler = Utils.openCdf(temp)
        try:
            time = Utils.get_datetime_from_netcdf(handler)
        finally:
            # Close the file even if the time axis can not be read
            handler.close()
        start = None
        end = None
        for index, date in enumerate(time):
            if date.year != year:
                continue
            if date.month == 1:
                # Keep the first January time step: with sub-monthly data there are several of them
                if start is None:
                    start = index
            elif date.month == 12:
                # Keep the last December time step
                end = index

        Utils.nco.ncks(input=temp, output=temp2, options='-O -d time,{0},{1}'.format(start, end))
        os.remove(temp)
        return temp2

    def _is_cmorized(self, startdate, member):
        startdate_path = self._get_startdate_path(startdate)
        if not os.path.exists(startdate_path):
            return False
        for freq in os.listdir(startdate_path):
            freq_path = os.path.join(startdate_path, freq)
            for domain in os.listdir(freq_path):
                domain_path = os.path.join(freq_path, domain)
                for var in os.listdir(domain_path):
                    member_path = os.path.join(domain_path, var, 'r{0}i1p1'.format(member + 1))
                    if os.path.exists(member_path):
                        return True
        return False

    # noinspection PyPep8Naming
    def prepare_CMOR_files(self):
        """
        Prepares the data to be used by the diagnostic.

        For every startdate and member of the experiment:

        * If CMOR data is already available and cmorization is not forced, nothing is done.
        * Otherwise, if cmorization is not forced, packed CMOR files are searched. Compressed files (.tar.gz)
          are unzipped and, if any .tar file is found, it is unpacked to the CMOR folder, the paths are corrected
          and the links are created.
        * If no packed data was found or cmorization is forced, the member is cmorized.

        :return:
        """
        def find_packed_files(startdate, member_str, extension):
            # Packed files can be in two base folders, either directly or inside an 'outputs' subfolder
            tar_path = os.path.join(self.config.data_dir, self.experiment.expid, 'original_files', 'cmorfiles')
            tar_original_files = os.path.join(self.config.data_dir, 'original_files', self.experiment.expid,
                                              'cmorfiles')
            # The member is part of the pattern so files from other members are not picked up
            file_name = 'CMOR?_{0}_{1}_{2}_*.{3}'.format(self.experiment.expid, startdate, member_str, extension)
            folders = (tar_path, os.path.join(tar_path, 'outputs'),
                       tar_original_files, os.path.join(tar_original_files, 'outputs'))
            filepaths = []
            for folder in folders:
                filepaths += glob.glob(os.path.join(folder, file_name))
            return filepaths

        for startdate, member in self.experiment.get_member_list():

            if self._is_cmorized(startdate, member) and not self.config.cmor.force:
                continue
            member_str = self.experiment.get_member_str(member)
            if not self.config.cmor.force:
                filepaths = find_packed_files(startdate, member_str, 'tar.gz')
                if filepaths:
                    Log.info('Unzipping cmorized data...')
                    Utils.unzip(filepaths, True)

                if not os.path.exists(self.cmor_path):
                    os.mkdir(self.cmor_path)

                # Searched after unzipping so the search can also see the results of that step
                filepaths = find_packed_files(startdate, member_str, 'tar')
                if filepaths:
                    Log.info('Unpacking cmorized data...')
                    Utils.untar(filepaths, self.cmor_path)
                    self._correct_paths(startdate)
                    self._create_links(startdate)
                    continue

            start_time = datetime.now()
            Log.info('CMORizing startdate {0} member {1}. Starting at {2}', startdate, member_str, start_time)

            cmorizer = Cmorizer(self, startdate, member)
            cmorizer.cmorize_ocean()
            cmorizer.cmorize_atmos()
            Log.result('CMORized startdate {0} member {1}! Elapsed time: {2}\n\n', startdate, member_str,
                       datetime.now() - start_time)

    def _correct_paths(self, startdate):
        """
        Fixes the folder layout and the file names of the CMOR files

        Two corrections are applied:

        * If the institute folder is inside an 'output' folder, it is moved two levels up and the 'output'
          folder is removed.
        * If the experiment name differs from the model name, the files found under
          <institute>/<model>/<model> are moved to <institute>/<model>/<experiment_name>, replacing the
          '_<model>_output_' part of their names with '_<model>_<experiment_name>_S<startdate>_'.

        :param startdate: startdate to use in the corrected file names
        :type startdate: str
        """
        experiment = self.experiment
        output_institute_path = os.path.join(self.cmor_path, 'output', experiment.institute)
        if os.path.exists(output_institute_path):
            Log.debug('Moving CMOR files out of the output folder')
            destination = os.path.join(output_institute_path, '..', '..')
            Utils.execute_shell_command(['mv', output_institute_path, destination])
            os.rmdir(os.path.join(self.cmor_path, 'output'))
            Log.debug('Done')

        if experiment.experiment_name == experiment.model:
            return

        duplicated_model_path = os.path.join(self.cmor_path, experiment.institute, experiment.model,
                                             experiment.model)
        Log.debug('Correcting double model appearance')

        wrong_name_part = '_{0}_output_'.format(experiment.model)
        right_name_part = '_{0}_{1}_S{2}_'.format(experiment.model, experiment.experiment_name, startdate)
        wrong_folder_part = '/{0}/{0}'.format(experiment.model)
        right_folder_part = '/{0}/{1}'.format(experiment.model, experiment.experiment_name)

        # Bottom-up walk: every folder is visited after its subfolders, so it is empty when removed
        for folder, _, filenames in os.walk(duplicated_model_path, topdown=False):
            for filename in filenames:
                source = os.path.join(folder, filename)
                target = source.replace(wrong_name_part, right_name_part)
                target = target.replace(wrong_folder_part, right_folder_part)
                Utils.move_file(source, target)
            os.rmdir(folder)
        Log.debug('Done')

    def _create_links(self, startdate):
        """
        Creates the links for all the CMOR files of a startdate

        The startdate folder is scanned following the frequency/domain/variable/member layout. Files found in
        the member folder are linked directly; for subfolders, every entry inside them is linked.

        :param startdate: startdate to create the links for
        :type startdate: str
        """
        Log.info('Creating links for CMOR files ({0})', startdate)
        path = self._get_startdate_path(startdate)
        for freq in os.listdir(path):
            freq_path = os.path.join(path, freq)
            for domain in os.listdir(freq_path):
                domain_path = os.path.join(freq_path, domain)
                for var in os.listdir(domain_path):
                    var_path = os.path.join(domain_path, var)
                    for member in os.listdir(var_path):
                        member_path = os.path.join(var_path, member)
                        for name in os.listdir(member_path):
                            filepath = os.path.join(member_path, name)
                            if os.path.isfile(filepath):
                                self._create_link(domain, filepath, freq, var, "", False)
                            else:
                                # NOTE(review): the grid is always passed as an empty string, even for files
                                # found one level deeper - confirm that this is the intended behaviour
                                for filename in os.listdir(filepath):
                                    self._create_link(domain, os.path.join(filepath, filename), freq, var, "", False)
        Log.info('Links for CMOR files created')

    def _get_startdate_path(self, startdate):
        """
        Returns the path to the startdate's CMOR folder
        :param startdate: target startdate
        :type startdate: str
        :return: path to the startdate's CMOR folder
        :rtype: str
        """
        return os.path.join(self.config.data_dir, self.experiment.expid, 'cmorfiles', self.experiment.institute,
                            self.experiment.model, self.experiment.experiment_name, 'S' + startdate)