# cmormanager.py
import glob
import os
import shutil
from datetime import datetime

from autosubmit.config.log import Log
from autosubmit.date.chunk_date_lib import parse_date, chunk_start_date, chunk_end_date, previous_day

from earthdiagnostics.cmorizer import Cmorizer
from earthdiagnostics.datamanager import DataManager, NetCDFFile
from earthdiagnostics.frequency import Frequencies, Frequency
from earthdiagnostics.modelingrealm import ModelingRealm
from earthdiagnostics.utils import TempFile, Utils
from earthdiagnostics.variable import Variable
from earthdiagnostics.variable_type import VariableType


class CMORManager(DataManager):
    """
    Data manager class for CMORized experiments
    """
    def __init__(self, config):
        """
        Locates the experiment's data folder and prepares the CMOR paths.

        ``config.data_dir`` may contain several candidate folders separated by ``:``. For each candidate, the
        experiment folder is looked for directly under it first, and then under
        ``<candidate>/<data_type>/<model folder>``. The model folder is the lower-cased model name, with any
        ``ec-earth*`` model mapped to ``ecearth``. The first matching folder is stored back into
        ``config.data_dir``.

        :param config: diagnostics configuration
        :raises Exception: if none of the candidate folders contains the experiment
        """
        super(CMORManager, self).__init__(config)
        candidates = self.config.data_dir.split(':')

        model_folder = self.experiment.model.lower()
        if model_folder.startswith('ec-earth'):
            model_folder = 'ecearth'

        expid = self.experiment.expid
        self.config.data_dir = None
        for base_folder in candidates:
            # First try the experiment folder directly below the candidate
            if os.path.isdir(os.path.join(base_folder, expid)):
                self.config.data_dir = base_folder
                break
            # Then try the <data_type>/<model> layout
            nested_folder = os.path.join(base_folder, self.config.data_type, model_folder)
            if os.path.isdir(os.path.join(nested_folder, expid)):
                self.config.data_dir = nested_folder
                break

        if not self.config.data_dir:
            raise Exception('Can not find model data')
        self.cmor_path = os.path.join(self.config.data_dir, expid, 'cmorfiles')

    def get_file(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
                 vartype=VariableType.MEAN):
        """
        Copies a file from the CMOR repository to a new scratch file and returns the scratch path.

        :param domain: CMOR domain
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str|NoneType
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: Frequency|NoneType
        :param vartype: variable type (mean, statistic). Accepted for interface compatibility; not used here
        :type vartype: VariableType
        :return: path to the copy created on the scratch folder
        :rtype: str
        """
        source_path = self.get_file_path(startdate, member, domain, var, chunk, frequency,
                                         box=box, grid=grid, year=None, date_str=None)
        scratch_copy = TempFile.get()
        Utils.copy_file(source_path, scratch_copy)
        return scratch_copy

    def get_file_path(self, startdate, member, domain, var, chunk, frequency,
                      box=None, grid=None, year=None, date_str=None):
        """
        Returns the path to a concrete file in the CMOR repository

        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member (0-based; the folder and file name use ``r<member + 1>i1p1``)
        :type member: int
        :param domain: file's domain
        :type domain: ModelingRealm
        :param var: variable name
        :type var: str
        :param chunk: file's chunk. When given, it defines the file's time bounds
        :type chunk: int|NoneType
        :param frequency: file's frequency. If not given, the configured default frequency is used
        :type frequency: Frequency|NoneType
        :param box: file's box (used to build the final variable name)
        :type box: Box|NoneType
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str|NoneType
        :param year: file's year. Only valid for yearly files; used when chunk is not given
        :type year: int|str|NoneType
        :param date_str: date string to use directly as the file's time bounds when chunk (and year) are not given
        :type date_str: str|NoneType
        :return: full path to the file
        :rtype: str
        """
        if not frequency:
            frequency = self.config.frequency
        var = self._get_final_var_name(box, var)

        folder_path = self._get_full_cmor_folder_path(startdate, member, domain, var, frequency, grid)
        file_name = self._get_cmor_file_name(startdate, member, domain, var, frequency, chunk, year, date_str, grid)
        return os.path.join(folder_path, file_name)

    def _get_cmor_file_name(self, startdate, member, domain, var, frequency, chunk, year, date_str, grid):
        """
        Builds the CMOR file name of a variable following the configured data convention.

        The time bound part of the name comes from, in order of precedence: the chunk, the year (yearly files
        only) or date_str.

        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member (0-based)
        :type member: int
        :param domain: file's domain; used to get the CMOR table when the variable is unknown
        :param var: final variable name
        :type var: str
        :param frequency: file's frequency
        :type frequency: Frequency
        :param chunk: file's chunk
        :type chunk: int|NoneType
        :param year: file's year (yearly files only)
        :type year: int|str|NoneType
        :param date_str: explicit date string for the time bounds
        :type date_str: str|NoneType
        :param grid: file's grid (used by the primavera/cmip6 conventions)
        :type grid: str|NoneType
        :return: file name
        :rtype: str
        :raises ValueError: if year is given for a non-yearly frequency, or no time reference is given
        :raises Exception: if the data convention is not supported
        """
        cmor_var = self.variable_list.get_variable(var)
        if cmor_var is None:
            cmor_table = domain.get_table(frequency, self.config.data_convention)
        else:
            cmor_table = cmor_var.get_table(frequency, self.config.data_convention)

        if chunk is not None:
            time_bound = self._get_chunk_time_bounds(startdate, chunk)
        elif year:
            if frequency != Frequencies.yearly:
                raise ValueError('Year may be provided instead of chunk only if frequency is "yr"')
            time_bound = str(year)
        elif date_str:
            time_bound = date_str
        else:
            raise ValueError('Chunk, year and date_str can not be None at the same time')

        if time_bound:
            time_bound = '_{0}.nc'.format(time_bound)
        else:
            time_bound = '.nc'

        if self.config.data_convention == 'specs':

            file_name = '{0}_{1}_{2}_{3}_S{4}_r{5}i1p1{6}'.format(var,
                                                                  cmor_table.name,
                                                                  self.experiment.model,
                                                                  self.experiment.experiment_name,
                                                                  startdate,
                                                                  member + 1,
                                                                  time_bound)
        elif self.config.data_convention in ('primavera', 'cmip6'):
            # NOTE(review): if grid is None, the literal 'None' ends up in the file name; confirm callers
            # always provide a grid for these conventions
            file_name = '{0}_{1}_{2}_{3}_S{4}-r{5}i1p1_{6}{7}'.format(var,
                                                                      cmor_table.name,
                                                                      self.experiment.experiment_name,
                                                                      self.experiment.model,
                                                                      startdate,
                                                                      member + 1,
                                                                      grid,
                                                                      time_bound)
        else:
            raise Exception('Data convention {0} not supported'.format(self.config.data_convention))
        return file_name

    def _get_full_cmor_folder_path(self, startdate, member, domain, var, frequency, grid):
        """
        Returns the folder holding a variable's files for one member.

        The layout is ``<startdate path>/<frequency>/<domain>/<var>[/<grid>]/r<member + 1>i1p1``. The grid level
        is only added when a grid is given.

        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member (0-based)
        :type member: int
        :param domain: file's domain
        :param var: final variable name
        :type var: str
        :param frequency: file's frequency
        :type frequency: Frequency
        :param grid: file's grid, or None/empty for the original grid
        :type grid: str|NoneType
        :rtype: str
        """
        folder_path = os.path.join(self._get_startdate_path(startdate), str(frequency), domain.name, var)
        if grid:
            folder_path = os.path.join(folder_path, grid)
        folder_path = os.path.join(folder_path, 'r{0}i1p1'.format(member + 1))
        return folder_path

    def _get_chunk_time_bounds(self, startdate, chunk):
        """
        Returns the ``YYYYMM-YYYYMM`` time bounds string of a chunk.

        Chunk limits are computed in months with the standard calendar. The end month is taken from the day
        before the chunk's end date.

        :param startdate: startdate, as a string parseable by ``parse_date``
        :type startdate: str
        :param chunk: chunk number
        :type chunk: int
        :rtype: str
        """
        chunk_size = self.experiment.chunk_size
        first_day = chunk_start_date(parse_date(startdate), chunk, chunk_size, 'month', 'standard')
        last_day = previous_day(chunk_end_date(first_day, chunk_size, 'month', 'standard'), 'standard')
        return "{0:04}{1:02}-{2:04}{3:02}".format(first_day.year, first_day.month,
                                                  last_day.year, last_day.month)

    def link_file(self, domain, var, startdate, member, chunk=None, grid=None, box=None,
                  frequency=None, year=None, date_str=None, move_old=False, vartype=VariableType.MEAN):
        """
        Creates the link of a given file from the CMOR repository.

        :param domain: CMOR domain
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int|NoneType
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str|NoneType
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box|NoneType
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: Frequency|NoneType
        :param year: if frequency is yearly, this parameter is used to give the corresponding year
        :type year: int|NoneType
        :param date_str: exact date string to use in the file name when chunk and year are not given
        :type date_str: str|NoneType
        :param move_old: if true, moves files following older conventions that may be found on the links folder
        :type move_old: bool
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        """
        var = self._get_final_var_name(box, var)

        if not frequency:
            frequency = self.config.frequency

        # Pass year unchanged: str(None) is 'None', a truthy string that would be mistaken for a real year
        filepath = self.get_file_path(startdate, member, domain, var, chunk, frequency, grid=grid, year=year,
                                      date_str=date_str)
        self._create_link(domain, filepath, frequency, var, grid, move_old, vartype)

    def send_file(self, filetosend, domain, var, startdate, member, chunk=None, grid=None, region=None,
                  box=None, rename_var=None, frequency=None, year=None, date_str=None, move_old=False,
                  diagnostic=None, cmorized=False, vartype=VariableType.MEAN):
        """
        Copies a given file to the CMOR repository. It also automatically converts to netCDF 4 if needed and can merge
        with already existing ones as needed

        :param move_old: if true, moves files following older conventions that may be found on the links folder
        :type move_old: bool
        :param date_str: exact date_str to use in the cmorized file
        :type date_str: str
        :param year: if frequency is yearly, this parameter is used to give the corresponding year
        :type year: int
        :param rename_var: if exists, the given variable will be renamed to the one given by var
        :type rename_var: str
        :param filetosend: path to the file to send to the CMOR repository
        :type filetosend: str
        :param region: specifies the region represented by the file. If it is defined, the data will be appended to the
            CMOR repository as a new region in the file or will overwrite if region was already present
        :type region: str
        :param domain: CMOR domain
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: Frequency
        :param diagnostic: diagnostic used to generate the file
        :type diagnostic: Diagnostic
        :param cmorized: flag to indicate if file was generated in cmorization process
        :type cmorized: bool
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        :raises ValueError: if neither diagnostic nor cmorized is provided
        """
        original_var = var
        # CMOR metadata is looked up with the name given by the caller, before any box suffix is applied
        cmor_var = self.variable_list.get_variable(original_var)
        var = self._get_final_var_name(box, var)

        if rename_var and rename_var != var:
            Utils.rename_variable(filetosend, rename_var, var)
        elif original_var != var:
            Utils.rename_variable(filetosend, original_var, var)

        if not frequency:
            frequency = self.config.frequency

        filepath = self.get_file_path(startdate, member, domain, var, chunk, frequency, None,
                                      grid, year, date_str)
        netcdf_file = NetCDFFile(filepath, filetosend, domain, var, cmor_var, self.config.data_convention, region)
        netcdf_file.frequency = frequency
        if diagnostic:
            netcdf_file.add_diagnostic_history(diagnostic)
        elif cmorized:
            netcdf_file.add_cmorization_history()
        else:
            raise ValueError('You must provide a diagnostic or set cmorized to true to store data '
                             'using the CMORManager')
        # NOTE(review): without this call the NetCDFFile object is built and discarded, so nothing is copied to
        # the repository. Confirm NetCDFFile.send() is the storing method in earthdiagnostics.datamanager.
        netcdf_file.send()

        self._create_link(domain, filepath, frequency, var, grid, move_old, vartype)

    def get_year(self, domain, var, startdate, member, year, grid=None, box=None):
        """
        Get a file containing all the data for one year for one variable

        All chunks overlapping the year are copied to scratch, merged if there is more than one, and the
        requested year is extracted into a new scratch file. The intermediate files are removed.

        :param domain: variable's domain
        :type domain: str
        :param var: variable's name
        :type var: str
        :param startdate: startdate to retrieve
        :type startdate: str
        :param member: member to retrieve
        :type member: int
        :param year: year to retrieve
        :type year: int
        :param grid: variable's grid
        :type grid: str
        :param box: variable's box
        :type box: Box
        :return: path to a scratch file holding only the requested year
        :rtype: str
        :raises ValueError: if no chunk of the startdate covers the requested year
        """
        chunk_files = [self.get_file(domain, var, startdate, member, chunk, grid=grid, box=box)
                       for chunk in self.experiment.get_year_chunks(startdate, year)]
        if not chunk_files:
            raise ValueError('No chunks found for year {0} in startdate {1}'.format(year, startdate))

        if len(chunk_files) > 1:
            # _merge_chunk_files deletes the individual chunk copies once merged
            merged_file = self._merge_chunk_files(chunk_files)
        else:
            merged_file = chunk_files[0]
        year_file = self._select_data_of_given_year(merged_file, year)
        os.remove(merged_file)
        return year_file

    @staticmethod
    def _select_data_of_given_year(data_file, year):
        """
        Extracts one year of data from a netCDF file into a new scratch file with CDO ``selyear``.

        :param data_file: path to the source file (left untouched)
        :type data_file: str
        :param year: year to extract
        :type year: int|str
        :return: path to the new scratch file
        :rtype: str
        """
        temp2 = TempFile.get()
        Utils.cdo.selyear(str(year), input=data_file, output=temp2)
        return temp2

    @staticmethod
    def _merge_chunk_files(chunk_files):
        """
        Concatenates chunk files into a new scratch file with NCO ``ncrcat`` and deletes the inputs.

        :param chunk_files: paths of the files to merge, in order
        :type chunk_files: list[str]
        :return: path to the merged scratch file
        :rtype: str
        """
        merged_file = TempFile.get()
        Utils.nco.ncrcat(input=' '.join(chunk_files), output=merged_file)
        for consumed in chunk_files:
            os.remove(consumed)
        return merged_file

    def prepare(self):
        """
        Prepares the data to be used by the diagnostic.

        For every startdate and member, transferred CMOR packages are unpacked when available. If no CMORized
        data can be found or unpacked for a member (or CMORization is forced), that member is CMORized from the
        original data.
        """
        for startdate, member in self.experiment.get_member_list():
            if not self._unpack_cmor_files(startdate, member):
                self._cmorize_member(startdate, member)

    def _is_cmorized(self, startdate, member, chunk):
        """
        Checks whether CMORized data for a chunk is already present.

        Scans the frequency and domain folders of the startdate. Returns True as soon as the file of any
        variable found there exists for the given member and chunk.

        :param startdate: startdate to check
        :type startdate: str
        :param member: member to check
        :type member: int
        :param chunk: chunk to check
        :type chunk: int
        :return: True if at least one CMORized file for the chunk exists
        :rtype: bool
        """
        startdate_path = self._get_startdate_path(startdate)
        if not os.path.exists(startdate_path):
            return False
        for freq in os.listdir(startdate_path):
            freq_path = os.path.join(startdate_path, freq)
            # Stray files at these levels are not part of the CMOR tree; os.listdir would fail on them
            if not os.path.isdir(freq_path):
                continue
            for domain in os.listdir(freq_path):
                domain_path = os.path.join(freq_path, domain)
                if not os.path.isdir(domain_path):
                    continue
                for var in os.listdir(domain_path):
                    var_path = self.get_file_path(startdate, member, ModelingRealm(domain), var, chunk,
                                                  Frequency(freq))
                    if os.path.exists(var_path):
                        return True
        return False

    def _cmorize_member(self, startdate, member):
        """
        CMORizes the ocean and atmosphere data of one startdate and member, logging the elapsed time.

        :param startdate: startdate to CMORize
        :type startdate: str
        :param member: member to CMORize
        :type member: int
        """
        start_time = datetime.now()
        member_str = self.experiment.get_member_str(member)
        # The start time is the third argument, so its placeholder is {2} (not {0}, which is the startdate)
        Log.info('CMORizing startdate {0} member {1}. Starting at {2}', startdate, member_str, start_time)
        cmorizer = Cmorizer(self, startdate, member)
        cmorizer.cmorize_ocean()
        cmorizer.cmorize_atmos()
        Log.result('CMORized startdate {0} member {1}! Elapsed time: {2}\n\n', startdate, member_str,
                   datetime.now() - start_time)

    def _unpack_cmor_files(self, startdate, member):
        """
        Makes the CMOR files of a member available, unpacking transferred packages where needed.

        Returns False straight away when CMORization is forced. Otherwise, unless untarring is forced, chunks
        already CMORized are skipped first. Then the following chunks are unpacked one after another until a
        chunk without packages is found.

        :param startdate: startdate to process
        :type startdate: str
        :param member: member to process
        :type member: int
        :return: True if at least one chunk was already CMORized or was unpacked
        :rtype: bool
        """
        if self.config.cmor.force:
            return False

        chunk = 1
        found_any = False

        if not self.config.cmor.force_untar:
            while self._is_cmorized(startdate, member, chunk):
                found_any = True
                chunk += 1

        while self._unpack_chunk(startdate, member, chunk):
            found_any = True
            chunk += 1

        return found_any

    def _unpack_chunk(self, startdate, member, chunk):
        """
        Unzips and untars the transferred CMOR packages of one chunk, if any.

        ``tar.gz`` packages are unzipped first (presumably leaving ``tar`` files behind — confirm against
        ``Utils.unzip``). Then any ``tar`` packages are extracted into the CMOR folder, and the resulting paths
        are corrected and linked.

        :param startdate: startdate to process
        :type startdate: str
        :param member: member to process
        :type member: int
        :param chunk: chunk to process
        :type chunk: int
        :return: True if tar packages were found and extracted for this chunk
        :rtype: bool
        """
        zipped = self._get_transferred_cmor_data_filepaths(startdate, member, chunk, 'tar.gz')
        if zipped:
            Log.info('Unzipping cmorized data for {0} {1} {2}...', startdate, member, chunk)
            Utils.unzip(zipped, True)

        if not os.path.exists(self.cmor_path):
            os.mkdir(self.cmor_path)

        tarballs = self._get_transferred_cmor_data_filepaths(startdate, member, chunk, 'tar')
        if not tarballs:
            return False

        Log.info('Unpacking cmorized data for {0} {1} {2}...', startdate, member, chunk)
        Utils.untar(tarballs, self.cmor_path)
        self._correct_paths(startdate)
        self._create_links(startdate)
        return True

    def _get_transferred_cmor_data_filepaths(self, startdate, member, chunk, extension):
        """
        Finds the transferred CMOR packages of a chunk with the given extension.

        The search covers ``<data_dir>/<expid>/original_files/cmorfiles`` and
        ``<data_dir>/original_files/<expid>/cmorfiles``, plus the ``outputs`` subfolder of each. Matching files
        are named ``CMOR?_<expid>_<startdate>_<member>_<chunk start>-*.<extension>``.

        :param startdate: startdate to look for
        :type startdate: str
        :param member: member to look for
        :type member: int
        :param chunk: chunk to look for
        :type chunk: int
        :param extension: package extension, without the leading dot
        :type extension: str
        :return: matching paths, grouped in the order of the folders listed above
        :rtype: list[str]
        """
        expid = self.experiment.expid
        data_dir = self.config.data_dir
        roots = (os.path.join(data_dir, expid, 'original_files', 'cmorfiles'),
                 os.path.join(data_dir, 'original_files', expid, 'cmorfiles'))
        pattern = 'CMOR?_{0}_{1}_{2}_{3}-*.{4}'.format(expid, startdate,
                                                       self.experiment.get_member_str(member),
                                                       self.experiment.get_chunk_start_str(startdate, chunk),
                                                       extension)

        search_folders = []
        for root in roots:
            search_folders.append(root)
            search_folders.append(os.path.join(root, 'outputs'))

        return [match
                for folder in search_folders
                for match in glob.glob(os.path.join(folder, pattern))]
        self._remove_extra_output_folder()
        self._fix_model_as_experiment_error(startdate)
    def _fix_model_as_experiment_error(self, startdate):
        """
        Fixes CMOR files stored with the model name where the experiment name should be.

        Only acts when the experiment name differs from the model name. Every file under
        ``<cmor_path>/<institute>/<model>/<model>`` is moved to a path where ``/<model>/<model>`` becomes
        ``/<model>/<experiment_name>`` and ``_<model>_output_`` becomes ``_<model>_<experiment_name>_S<startdate>_``.
        Each visited folder is then removed.

        :param startdate: startdate used to build the corrected file names
        :type startdate: str
        """
        if self.experiment.experiment_name != self.experiment.model:
            bad_path = os.path.join(self.cmor_path, self.experiment.institute, self.experiment.model,
                                    self.experiment.model)
            Log.debug('Correcting double model appearance')
            # topdown=False: subfolders are visited before their parent, so each folder is already empty
            # (its files moved, its subfolders removed) when os.rmdir is called on it below
            for (dirpath, dirnames, filenames) in os.walk(bad_path, False):

                for filename in filenames:
                    filepath = os.path.join(dirpath, filename)
                    # Insert experiment name and startdate in place of the "output" token of the file name
                    good = filepath.replace('_{0}_output_'.format(self.experiment.model),
                                            '_{0}_{1}_S{2}_'.format(self.experiment.model,
                                                                    self.experiment.experiment_name,
                                                                    startdate))

                    # Replace the duplicated model folder with the experiment folder
                    # NOTE(review): str.replace acts on the whole path, so a "/<model>/<model>" earlier in
                    # cmor_path would also be rewritten — confirm this cannot happen
                    good = good.replace('/{0}/{0}'.format(self.experiment.model),
                                        '/{0}/{1}'.format(self.experiment.model,
                                                          self.experiment.experiment_name))

                    Utils.move_file(filepath, good)
                # NOTE(review): os.rmdir raises if the folder is not empty, e.g. if a move left a file in place
                os.rmdir(dirpath)
            Log.debug('Done')

    def _remove_extra_output_folder(self):
        """
        Moves the contents of ``<cmor_path>/output`` up into ``cmor_path`` and deletes the ``output`` folder.

        Does nothing if there is no ``output`` folder.
        """
        output_folder = os.path.join(self.cmor_path, 'output')
        if not os.path.exists(output_folder):
            return
        Log.debug('Moving CMOR files out of the output folder')
        CMORManager.copytree(output_folder, self.cmor_path)
        shutil.rmtree(output_folder)

    @staticmethod
    def copytree(source, destiny):
        """
        Recursively copies the contents of ``source`` into ``destiny``, merging with what is already there.

        ``destiny`` may already exist: missing folders are created (copying ``source``'s metadata), and files
        are copied with :func:`shutil.copy2`, which overwrites files of the same name.

        :param source: folder to copy from
        :type source: str
        :param destiny: folder to copy into
        :type destiny: str
        """
        if not os.path.exists(destiny):
            os.makedirs(destiny)
            shutil.copystat(source, destiny)
        for item in os.listdir(source):
            item_source = os.path.join(source, item)
            item_destiny = os.path.join(destiny, item)
            if os.path.isdir(item_source):
                CMORManager.copytree(item_source, item_destiny)
            else:
                shutil.copy2(item_source, item_destiny)

        Log.info('Creating links for CMOR files ({0})', startdate)
        path = self._get_startdate_path(startdate)
        for freq in os.listdir(path):
            frequency = Frequency.parse(freq)
            for domain in os.listdir(os.path.join(path, freq)):
                for var in os.listdir(os.path.join(path, freq, domain)):
                    for member in os.listdir(os.path.join(path, freq, domain, var)):
                        for name in os.listdir(os.path.join(path, freq, domain, var, member)):
                            filepath = os.path.join(path, freq, domain, var, member, name)
                            if os.path.isfile(filepath):
                                self._create_link(domain, filepath, frequency, var, "", False,
                                                  vartype=VariableType.MEAN)
                                    self._create_link(domain, os.path.join(filepath, filename), freq, var, "", False,
                                                      vartype=VariableType.MEAN)
        Log.debug('Links ready')

    def _get_startdate_path(self, startdate):
        """
        Returns the path to the startdate's CMOR folder

        The path is ``<data_dir>/<expid>/cmorfiles/<institute>/<model>/<experiment_name>/S<startdate>``.

        :param startdate: target startdate
        :type startdate: str
        :return: path to the startdate's CMOR folder
        :rtype: str
        """
        return os.path.join(self.config.data_dir, self.experiment.expid, 'cmorfiles', self.experiment.institute,
                            self.experiment.model, self.experiment.experiment_name, 'S' + startdate)