# coding=utf-8
# cmormanager.py
import glob
import os
import shutil
from datetime import datetime

from autosubmit.config.log import Log
from autosubmit.date.chunk_date_lib import parse_date, chunk_start_date, chunk_end_date, previous_day

from earthdiagnostics.cmorizer import Cmorizer
from earthdiagnostics.datamanager import DataManager, NetCDFFile
from earthdiagnostics.frequency import Frequencies, Frequency
from earthdiagnostics.modelingrealm import ModelingRealm, ModelingRealms
from earthdiagnostics.utils import TempFile, Utils
from earthdiagnostics.variable import Variable
from earthdiagnostics.variable_type import VariableType


class CMORManager(DataManager):
    """
    Data manager class for CMORized experiments
    """
    def __init__(self, config):
        """
        Locates the folder holding the experiment's data and stores the CMOR repository path

        ``config.data_dir`` may contain several candidate folders separated by ':'. For each candidate, the
        experiment id is searched first directly inside it and then inside ``<candidate>/<data_type>/<model>``.
        The first folder that contains the experiment replaces ``config.data_dir``.

        :param config: diagnostics configuration; must provide data_dir and data_type
        :raises Exception: if the experiment is not found in any of the candidate folders
        """
        super(CMORManager, self).__init__(config)
        # Cache read and filled by is_cmorized: (startdate, member, chunk, domain name) -> bool
        self._dic_cmorized = dict()

        data_folders = self.config.data_dir.split(':')
        experiment_folder = self.experiment.model.lower()
        # Every EC-Earth flavour shares the same folder name
        if experiment_folder.startswith('ec-earth'):
            experiment_folder = 'ecearth'

        self.config.data_dir = None
        for data_folder in data_folders:
            if os.path.isdir(os.path.join(data_folder, self.experiment.expid)):
                self.config.data_dir = data_folder
                break

            data_folder = os.path.join(data_folder, self.config.data_type, experiment_folder)
            if os.path.isdir(os.path.join(data_folder, self.experiment.expid)):
                self.config.data_dir = data_folder
                break

        if not self.config.data_dir:
            raise Exception('Can not find model data')
        self.cmor_path = os.path.join(self.config.data_dir, self.experiment.expid, 'cmorfiles')

    def file_exists(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
                    vartype=VariableType.MEAN):
        """
        Checks whether the path computed for the requested file is an existing regular file

        :param domain: CMOR domain
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid
        :type grid: str|NoneType
        :param box: file's box
        :type box: Box
        :param frequency: file's frequency; the configured default is used if it is not given
        :type frequency: Frequency|NoneType
        :param vartype: Variable type (not used by this implementation)
        :type vartype: VariableType
        :return: True if the file exists, False if it does not or if the check itself fails
        :rtype: bool
        """
        target = self.get_file_path(startdate, member, domain, var, chunk, frequency, box, grid, None, None)

        try:
            found = os.path.isfile(target)
        except Exception:
            # Any failure while checking is reported as "file not present"
            found = False
        return found

    def get_file(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
                 vartype=VariableType.MEAN):
        """
        Copies a file from the CMOR repository to a freshly requested temporary path

        :param domain: CMOR domain
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str|NoneType
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: Frequency|NoneType
        :param vartype: Variable type (not used by this implementation)
        :type vartype: VariableType
        :return: path to the temporary copy
        :rtype: str
        """
        repository_path = self.get_file_path(startdate, member, domain, var, chunk, frequency, box, grid,
                                             None, None)

        scratch_copy = TempFile.get()
        Utils.copy_file(repository_path, scratch_copy)
        return scratch_copy

    def get_file_path(self, startdate, member, domain, var, chunk, frequency,
                      box=None, grid=None, year=None, date_str=None):
        """
        Builds the full path of a file in the CMOR repository

        The path is the CMOR folder for the variable joined with the CMOR file name.

        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param domain: file's domain
        :param var: variable name; it is converted to its final name using the box
        :type var: str
        :param chunk: file's chunk
        :type chunk: int
        :param frequency: file's frequency; the configured default is used if it is not given
        :type frequency: Frequency
        :param box: file's box
        :type box: Box
        :param grid: file's grid
        :type grid: str|NoneType
        :param year: year forwarded to the file name builder
        :type year: int|str|NoneType
        :param date_str: date string forwarded to the file name builder
        :type date_str: str|NoneType
        :return: path to the file
        :rtype: str
        """
        effective_frequency = frequency if frequency else self.config.frequency
        final_var = self._get_final_var_name(box, var)

        return os.path.join(
            self._get_full_cmor_folder_path(startdate, member, domain, final_var, effective_frequency, grid),
            self._get_cmor_file_name(startdate, member, domain, final_var, effective_frequency, chunk, year,
                                     date_str, grid))

    def _get_cmor_file_name(self, startdate, member, domain, var, frequency, chunk, year, date_str, grid):
        """
        Builds the name of a CMOR file following the configured data convention

        The time part of the name is taken from the first of chunk, year or date_str that is provided.

        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member (zero based, the name uses member + 1)
        :type member: int
        :param domain: file's domain, used to get the CMOR table if the variable is not in the variable list
        :param var: variable name
        :type var: str
        :param frequency: file's frequency
        :type frequency: Frequency
        :param chunk: file's chunk
        :type chunk: int|NoneType
        :param year: year to use as time bound, only valid for yearly frequency
        :type year: int|str|NoneType
        :param date_str: date string to use directly as time bound
        :type date_str: str|NoneType
        :param grid: file's grid, only used by the primavera and cmip6 conventions
        :type grid: str|NoneType
        :return: file name
        :rtype: str
        :raises ValueError: if year is given for a non yearly frequency or if chunk, year and date_str are all
            missing
        :raises Exception: if the data convention is not supported
        """
        cmor_var = self.variable_list.get_variable(var)
        if cmor_var is None:
            cmor_table = domain.get_table(frequency, self.config.data_convention)
        else:
            cmor_table = cmor_var.get_table(frequency, self.config.data_convention)

        if chunk is not None:
            time_bound = self._get_chunk_time_bounds(startdate, chunk)
        elif year:
            if frequency != Frequencies.yearly:
                raise ValueError('Year may be provided instead of chunk only if frequency is "yr"')
            time_bound = str(year)
        elif date_str:
            time_bound = date_str
        else:
            raise ValueError('Chunk, year and date_str can not be None at the same time')

        if time_bound:
            time_bound = '_{0}.nc'.format(time_bound)
        else:
            time_bound = '.nc'

        if self.config.data_convention == 'specs':
            # time_bound already carries its leading underscore and the extension
            file_name = '{0}_{1}_{2}_{3}_S{4}_r{5}i1p1{6}'.format(var,
                                                                  cmor_table.name,
                                                                  self.experiment.model,
                                                                  self.experiment.experiment_name,
                                                                  startdate,
                                                                  member + 1,
                                                                  time_bound)
        elif self.config.data_convention in ('primavera', 'cmip6'):
            file_name = '{0}_{1}_{2}_{3}_S{4}-r{5}i1p1_{6}{7}'.format(var,
                                                                      cmor_table.name,
                                                                      self.experiment.experiment_name,
                                                                      self.experiment.model,
                                                                      startdate,
                                                                      member + 1,
                                                                      grid,
                                                                      time_bound)
        else:
            raise Exception('Data convention {0} not supported'.format(self.config.data_convention))
        return file_name

    def _get_full_cmor_folder_path(self, startdate, member, domain, var, frequency, grid):
        """
        Builds the path of the folder that holds the files of a variable for a given member

        The layout is <startdate path>/<frequency>/<domain name>/<var>[/<grid>]/r<member + 1>i1p1

        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member (zero based)
        :type member: int
        :param domain: file's domain
        :param var: variable name
        :type var: str
        :param frequency: file's frequency
        :type frequency: Frequency
        :param grid: file's grid; no grid level is added to the path if it is empty or None
        :type grid: str|NoneType
        :return: folder path
        :rtype: str
        """
        folder_path = os.path.join(self._get_startdate_path(startdate), str(frequency), domain.name, var)
        if grid:
            folder_path = os.path.join(folder_path, grid)
        folder_path = os.path.join(folder_path, 'r{0}i1p1'.format(member + 1))
        return folder_path

    def _get_chunk_time_bounds(self, startdate, chunk):
        """
        Builds the time bounds string of a chunk, formatted as YYYYMM-YYYYMM

        :param startdate: chunk's startdate
        :type startdate: str
        :param chunk: chunk number
        :type chunk: int
        :return: first and last month of the chunk
        :rtype: str
        """
        chunk_size = self.experiment.chunk_size
        first_day = chunk_start_date(parse_date(startdate), chunk, chunk_size, 'month', 'standard')
        # presumably chunk_end_date gives the first day after the chunk, hence the step back -- TODO confirm
        last_day = previous_day(chunk_end_date(first_day, chunk_size, 'month', 'standard'), 'standard')
        return "{0:04}{1:02}-{2:04}{3:02}".format(first_day.year, first_day.month,
                                                  last_day.year, last_day.month)

    def link_file(self, domain, var, startdate, member, chunk=None, grid=None, box=None,
                  frequency=None, year=None, date_str=None, move_old=False, vartype=VariableType.MEAN):
        """
        Creates the link of a given file from the CMOR repository.

        :param move_old: forwarded to the link creation
        :type move_old: bool
        :param date_str: exact date string used in the file name
        :type date_str: str
        :param year: if frequency is yearly, this parameter is used to give the corresponding year
        :type year: int
        :param domain: CMOR domain
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: Frequency
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        """
        var = self._get_final_var_name(box, var)

        if not frequency:
            frequency = self.config.frequency
        # The year is passed untouched: str(None) would give the truthy string 'None' and be taken as a real year
        filepath = self.get_file_path(startdate, member, domain, var, chunk, frequency, grid=grid, year=year,
                                      date_str=date_str)
        self._create_link(domain, filepath, frequency, var, grid, move_old, vartype)

    def send_file(self, filetosend, domain, var, startdate, member, chunk=None, grid=None, region=None,
                  box=None, rename_var=None, frequency=None, year=None, date_str=None, move_old=False,
                  diagnostic=None, cmorized=False, vartype=VariableType.MEAN):
        """
        Copies a given file to the CMOR repository. It also automatically converts to netCDF 4 if needed and can merge
        with already existing ones as needed

        :param move_old: if true, moves files following older conventions that may be found on the links folder
        :type move_old: bool
        :param date_str: exact date_str to use in the cmorized file
        :type: str
        :param year: if frequency is yearly, this parameter is used to give the corresponding year
        :type year: int
        :param rename_var: if exists, the given variable will be renamed to the one given by var
        :type rename_var: str
        :param filetosend: path to the file to send to the CMOR repository
        :type filetosend: str
        :param region: specifies the region represented by the file. If it is defined, the data will be appended to the
            CMOR repository as a new region in the file or will overwrite if region was already present
        :type region: str
        :param domain: CMOR domain
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: Frequency
        :param diagnostic: diagnostic used to generate the file
        :type diagnostic: Diagnostic
        :param cmorized: flag to indicate if file was generated in cmorization process
        :type cmorized: bool
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        :raises ValueError: if no diagnostic is given and cmorized is false
        """
        # Keep the requested name: the CMOR definition is looked up with it, before the box changes the name
        original_var = var
        cmor_var = self.variable_list.get_variable(original_var)
        var = self._get_final_var_name(box, var)

        if rename_var and rename_var != var:
            Utils.rename_variable(filetosend, rename_var, var)
        elif original_var != var:
            Utils.rename_variable(filetosend, original_var, var)

        if not frequency:
            frequency = self.config.frequency

        # box is passed as None because var already holds the final name
        filepath = self.get_file_path(startdate, member, domain, var, chunk, frequency, None,
                                      grid, year, date_str)
        netcdf_file = NetCDFFile(filepath, filetosend, domain, var, cmor_var, self.config.data_convention, region)
        netcdf_file.frequency = frequency
        if diagnostic:
            netcdf_file.add_diagnostic_history(diagnostic)
        elif cmorized:
            netcdf_file.add_cmorization_history()
        else:
            raise ValueError('You must provide a diagnostic or set cmorized to true to store data '
                             'using the CMORManager')
        # NOTE(review): no explicit transfer call on netcdf_file is visible here; confirm whether NetCDFFile
        # performs the copy by itself or a call such as netcdf_file.send() is missing at this point
        self._create_link(domain, filepath, frequency, var, grid, move_old, vartype)

    def get_year(self, domain, var, startdate, member, year, grid=None, box=None):
        """
        Get a file containing all the data for one year for one variable

        The chunks covering the year are copied to temporary files, merged if there is more than one and then
        filtered to keep only the requested year. Intermediate files are removed.

        :param domain: variable's domain
        :param var: variable's name
        :type var: str
        :param startdate: startdate to retrieve
        :type startdate: str
        :param member: member to retrieve
        :type member: int
        :param year: year to retrieve
        :type year: int
        :param grid: variable's grid
        :type grid: str
        :param box: variable's box
        :type box: Box
        :return: path to the temporary file holding the data of the year
        :rtype: str
        :raises ValueError: if the experiment has no chunks for the requested year
        """

        chunk_files = list()
        for chunk in self.experiment.get_year_chunks(startdate, year):
            chunk_files.append(self.get_file(domain, var, startdate, member, chunk, grid=grid, box=box))

        if not chunk_files:
            raise ValueError('No chunks found for startdate {0} and year {1}'.format(startdate, year))

        if len(chunk_files) > 1:
            temp = self._merge_chunk_files(chunk_files)
        else:
            # A single chunk needs no merge: its temporary copy is used directly
            temp = chunk_files[0]
        temp2 = self._select_data_of_given_year(temp, year)
        os.remove(temp)
        return temp2

    @staticmethod
    def _select_data_of_given_year(data_file, year):
        """
        Extracts the data of a given year to a new temporary file using cdo's selyear

        :param data_file: path to the file to filter
        :type data_file: str
        :param year: year to keep
        :type year: int|str
        :return: path to the temporary file with the selected data
        :rtype: str
        """
        temp2 = TempFile.get()
        Utils.cdo.selyear(str(year), input=data_file, output=temp2)
        return temp2

    @staticmethod
    def _merge_chunk_files(chunk_files):
        """
        Concatenates the given files with ncrcat into a new temporary file and deletes the inputs

        :param chunk_files: paths of the files to merge, in the order they must be concatenated
        :type chunk_files: list[str]
        :return: path to the merged file
        :rtype: str
        """
        merged_path = TempFile.get()
        input_list = ' '.join(chunk_files)
        Utils.nco.ncrcat(input=input_list, output=merged_path)
        # The inputs are removed once merged
        for merged_input in chunk_files:
            os.remove(merged_input)
        return merged_path
    def prepare(self):
        """
        Prepares the data to be used by the diagnostic.

        For every startdate and member of the experiment, it first tries to unpack already cmorized data. If
        nothing was unpacked, the cmorization of that member is launched.

        :return:
        """
        # Check if cmorized and convert if not

        for startdate, member in self.experiment.get_member_list():
            if not self._unpack_cmor_files(startdate, member):
                self._cmorize_member(startdate, member)

    def is_cmorized(self, startdate, member, chunk, domain):
        """
        Tells if a chunk is cmorized for a domain, caching the answer for later calls

        :param startdate: startdate to check
        :type startdate: str
        :param member: member to check
        :type member: int
        :param chunk: chunk to check
        :type chunk: int
        :param domain: domain to check
        :return: result of the check, taken from the cache if it was already computed
        """
        cache_key = (startdate, member, chunk, domain.name)
        try:
            return self._dic_cmorized[cache_key]
        except KeyError:
            pass

        cmorized = self._is_cmorized(startdate, member, chunk, domain)
        self._dic_cmorized[cache_key] = cmorized
        return cmorized

    def _is_cmorized(self, startdate, member, chunk, domain):
        """
        Looks in the CMOR repository for any file of the given domain, startdate, member and chunk

        Every frequency folder of the startdate is scanned. The chunk counts as cmorized as soon as the file
        of one variable of the domain is found.

        :param startdate: startdate to check
        :type startdate: str
        :param member: member to check
        :type member: int
        :param chunk: chunk to check
        :type chunk: int
        :param domain: domain to check
        :return: True if at least one file exists, False otherwise
        :rtype: bool
        """
        startdate_path = self._get_startdate_path(startdate)
        # No startdate folder means nothing has been cmorized yet
        if not os.path.isdir(startdate_path):
            return False
        for freq in os.listdir(startdate_path):
            domain_path = os.path.join(startdate_path, freq,
                                       domain.name)
            if os.path.isdir(domain_path):
                for var in os.listdir(domain_path):
                    var_path = self.get_file_path(startdate, member, domain, var, chunk, Frequency(freq))
                    if os.path.isfile(var_path):
                        return True
        return False

    def _cmorize_member(self, startdate, member):
        """
        Runs the ocean and atmosphere cmorization for a member, logging the start time and the time spent

        :param startdate: startdate to cmorize
        :type startdate: str
        :param member: member to cmorize
        :type member: int
        """
        start_time = datetime.now()
        member_str = self.experiment.get_member_str(member)
        # The start time is the third argument, so its placeholder must be {2}
        Log.info('CMORizing startdate {0} member {1}. Starting at {2}', startdate, member_str, start_time)
        cmorizer = Cmorizer(self, startdate, member)
        cmorizer.cmorize_ocean()
        cmorizer.cmorize_atmos()
        Log.result('CMORized startdate {0} member {1}! Ellpased time: {2}\n\n', startdate, member_str,
                   datetime.now() - start_time)

    def _unpack_cmor_files(self, startdate, member):
        """
        Unpacks the packed CMOR files of a member, chunk by chunk, until a chunk can not be unpacked

        Nothing is done if the cmor force option is set. Unless force_untar is set, chunks already cmorized for
        the ocean or the atmosphere are skipped before starting to unpack.

        :param startdate: startdate to unpack
        :type startdate: str
        :param member: member to unpack
        :type member: int
        :return: True if at least one chunk was unpacked, False otherwise
        :rtype: bool
        """
        if self.config.cmor.force:
            return False

        def already_cmorized(chunk_number):
            return self.is_cmorized(startdate, member, chunk_number, ModelingRealms.ocean) or \
                self.is_cmorized(startdate, member, chunk_number, ModelingRealms.atmos)

        next_chunk = 1
        if not self.config.cmor.force_untar:
            while already_cmorized(next_chunk):
                next_chunk += 1

        first_to_unpack = next_chunk
        while self._unpack_chunk(startdate, member, next_chunk):
            next_chunk += 1

        # The counter only advances in the second loop when a chunk was unpacked
        return next_chunk > first_to_unpack


    def _unpack_chunk(self, startdate, member, chunk):
        """
        Unzips and untars the transferred CMOR files of a chunk into the CMOR repository

        After untarring, the folder structure is corrected and the links are created.

        :param startdate: chunk's startdate
        :type startdate: str
        :param member: chunk's member
        :type member: int
        :param chunk: chunk number
        :type chunk: int
        :return: True if any tar file was unpacked, False otherwise
        :rtype: bool
        """

        filepaths = self._get_transferred_cmor_data_filepaths(startdate, member, chunk, 'tar.gz')
        if filepaths:
            Log.info('Unzipping cmorized data for {0} {1} {2}...', startdate, member, chunk)
            Utils.unzip(filepaths, True)

        if not os.path.exists(self.cmor_path):
            os.mkdir(self.cmor_path)

        # Searched again after unzipping: presumably the unzip step leaves plain tar files -- TODO confirm
        filepaths = self._get_transferred_cmor_data_filepaths(startdate, member, chunk, 'tar')
        if filepaths:
            Log.info('Unpacking cmorized data for {0} {1} {2}...', startdate, member, chunk)
            Utils.untar(filepaths, self.cmor_path)
            self._correct_paths(startdate)
            self._create_links(startdate)
            return True
        return False

    def _get_transferred_cmor_data_filepaths(self, startdate, member, chunk, extension):
        """
        Finds the packed CMOR files transferred for a chunk

        Files are searched in <data_dir>/<expid>/original_files/cmorfiles and
        <data_dir>/original_files/<expid>/cmorfiles, and in the 'outputs' subfolder of each one.

        :param startdate: chunk's startdate
        :type startdate: str
        :param member: chunk's member
        :type member: int
        :param chunk: chunk number
        :type chunk: int
        :param extension: extension of the files to look for, without the leading dot
        :type extension: str
        :return: paths of the files found
        :rtype: list[str]
        """
        tar_path = os.path.join(self.config.data_dir, self.experiment.expid, 'original_files', 'cmorfiles')
        tar_original_files = os.path.join(self.config.data_dir, 'original_files', self.experiment.expid,
                                          'cmorfiles')
        file_name = 'CMOR?_{0}_{1}_{2}_{3}-*.{4}'.format(self.experiment.expid, startdate,
                                                         self.experiment.get_member_str(member),
                                                         self.experiment.get_chunk_start_str(startdate, chunk),
                                                         extension)
        filepaths = glob.glob(os.path.join(tar_path, file_name))
        filepaths += glob.glob(os.path.join(tar_path, 'outputs', file_name))
        filepaths += glob.glob(os.path.join(tar_original_files, file_name))
        filepaths += glob.glob(os.path.join(tar_original_files, 'outputs', file_name))
        return filepaths

    def _correct_paths(self, startdate):
        """
        Fixes known problems in the folder structure of freshly unpacked CMOR files

        :param startdate: startdate of the unpacked files
        :type startdate: str
        """
        self._remove_extra_output_folder()
        self._fix_model_as_experiment_error(startdate)

    def _fix_model_as_experiment_error(self, startdate):
        """
        Moves files stored under <institute>/<model>/<model> to <institute>/<model>/<experiment name>

        The '_<model>_output_' part of the path is also replaced by '_<model>_<experiment name>_S<startdate>_'.
        Nothing is done if the experiment name and the model are the same.

        :param startdate: startdate to use in the corrected names
        :type startdate: str
        """
        model = self.experiment.model
        experiment_name = self.experiment.experiment_name
        if experiment_name == model:
            return

        bad_path = os.path.join(self.cmor_path, self.experiment.institute, model, model)
        Log.debug('Correcting double model appearance')

        wrong_name_part = '_{0}_output_'.format(model)
        right_name_part = '_{0}_{1}_S{2}_'.format(model, experiment_name, startdate)
        wrong_folder_part = '/{0}/{0}'.format(model)
        right_folder_part = '/{0}/{1}'.format(model, experiment_name)

        # Walked bottom-up so every folder is already empty when it is removed
        for dirpath, _, filenames in os.walk(bad_path, topdown=False):
            for filename in filenames:
                misplaced = os.path.join(dirpath, filename)
                corrected = misplaced.replace(wrong_name_part, right_name_part)
                corrected = corrected.replace(wrong_folder_part, right_folder_part)
                Utils.move_file(misplaced, corrected)
            os.rmdir(dirpath)
        Log.debug('Done')

    def _remove_extra_output_folder(self):
        """
        Moves the contents of an 'output' folder found inside the CMOR path up to the CMOR path itself
        """
        extra_folder = os.path.join(self.cmor_path, 'output')
        if not os.path.exists(extra_folder):
            return

        Log.debug('Moving CMOR files out of the output folder')
        # Copy everything one level up and then drop the extra folder
        CMORManager.copytree(extra_folder, self.cmor_path)
        shutil.rmtree(extra_folder)
    @staticmethod
    def copytree(source, destiny):
        """
        Recursively copies the contents of a folder into another one that may already exist

        :param source: folder to copy
        :type source: str
        :param destiny: folder to copy to; it is created, with the stats of source, if it does not exist
        :type destiny: str
        """
        if not os.path.exists(destiny):
            os.makedirs(destiny)
            shutil.copystat(source, destiny)
        lst = os.listdir(source)
        for item in lst:
            item_source = os.path.join(source, item)
            item_destiny = os.path.join(destiny, item)
            if os.path.isdir(item_source):
                CMORManager.copytree(item_source, item_destiny)
            else:
                shutil.copy2(item_source, item_destiny)

    def _create_links(self, startdate):
        """
        Creates the links for all the CMOR files found for a startdate

        The startdate folder is walked as <frequency>/<domain>/<var>/<member>/<name>. If <name> is a folder
        instead of a file, the files inside it are linked.

        :param startdate: startdate to link
        :type startdate: str
        """
        Log.info('Creating links for CMOR files ({0})', startdate)
        path = self._get_startdate_path(startdate)
        for freq in os.listdir(path):
            frequency = Frequency.parse(freq)
            # NOTE(review): domain is the folder name here, a plain string -- confirm _create_link accepts it
            for domain in os.listdir(os.path.join(path, freq)):
                for var in os.listdir(os.path.join(path, freq, domain)):
                    for member in os.listdir(os.path.join(path, freq, domain, var)):
                        for name in os.listdir(os.path.join(path, freq, domain, var, member)):
                            filepath = os.path.join(path, freq, domain, var, member, name)
                            if os.path.isfile(filepath):
                                self._create_link(domain, filepath, frequency, var, "", False,
                                                  vartype=VariableType.MEAN)
                            else:
                                for filename in os.listdir(filepath):
                                    self._create_link(domain, os.path.join(filepath, filename), frequency,
                                                      var, "", False, vartype=VariableType.MEAN)
        Log.debug('Links ready')

    def _get_startdate_path(self, startdate):
        """
        Builds the path to the CMOR folder of a startdate

        The layout is <data_dir>/<expid>/cmorfiles/<institute>/<model>/<experiment name>/S<startdate>

        :param startdate: target startdate
        :type startdate: str
        :return: path to the startdate's CMOR folder
        :rtype: str
        """
        path_parts = (self.config.data_dir,
                      self.experiment.expid,
                      'cmorfiles',
                      self.experiment.institute,
                      self.experiment.model,
                      self.experiment.experiment_name,
                      'S' + startdate)
        return os.path.join(*path_parts)