threddsmanager.py 12.7 KB
Newer Older
from autosubmit.date.chunk_date_lib import parse_date, add_months

from earthdiagnostics.datamanager import DataManager, NetCDFFile
from earthdiagnostics.utils import TempFile, Utils
from earthdiagnostics.variable import Variable


class THREDDSManager(DataManager):
    """
    Data manager class for CMORized experiments
    """
        super(THREDDSManager, self).__init__(config)
        self.server_url = config.thredds.server_url
        data_folders = self.config.data_dir.split(':')
        self.config.data_dir = None
        for data_folder in data_folders:
            if os.path.isdir(os.path.join(data_folder, self.experiment.institute.lower(),
                                          self.experiment.model.lower())):
                self.config.data_dir = data_folder
                break

        if not self.config.data_dir:
            raise Exception('Can not find model data')
    def get_leadtimes(self, domain, variable, startdate, member, leadtimes, frequency=None):
        if not frequency:
            frequency = self.config.frequency
        aggregation_path = self.get_var_url(variable, startdate, frequency, None, False)
        temp = TempFile.get()
        startdate = parse_date(startdate)
        selected_months = ','.join([str(add_months(startdate, i, 'standard').month) for i in leadtimes])
        select_months = '-selmonth,{0} {1}'.format(selected_months, aggregation_path)
        selected_years = ','.join([str(add_months(startdate, i, 'standard').year) for i in leadtimes])
        Utils.cdo.selyear(selected_years, input=select_months, output=temp)
        return temp

    def get_file(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None):
        """
        Copies a given file from the CMOR repository to the scratch folder and returns the path to the scratch's copy

        :param domain: CMOR domain
        :type domain: str
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: str
        :return: path to the copy created on the scratch folder
        :rtype: str
        """
        if not frequency:
            frequency = self.config.frequency
        aggregation_path = self.get_var_url(var, startdate, frequency, box, True)
        temp = TempFile.get()
        urllib.urlretrieve(aggregation_path, temp)
        if not Utils.check_netcdf_file(temp):
            raise THREDDSError('Can not retrieve {0} from server'.format(aggregation_path))
        return temp

    def send_file(self, filetosend, domain, var, startdate, member, chunk=None, grid=None, region=None, box=None,
                  rename_var=None, frequency=None, year=None, date_str=None, move_old=False,
                  diagnostic=None, cmorized=False):
        """
        Copies a given file to the CMOR repository. It also automatically converts to netCDF 4 if needed and can merge
        with already existing ones as needed

        :param move_old: if true, moves files following older conventions that may be found on the links folder
        :type move_old: bool
        :param date_str: exact date_str to use in the cmorized file
        :type: str
        :param year: if frequency is yearly, this parameter is used to give the corresponding year
        :type year: int
        :param rename_var: if exists, the given variable will be renamed to the one given by var
        :type rename_var: str
        :param filetosend: path to the file to send to the CMOR repository
        :type filetosend: str
        :param region: specifies the region represented by the file. If it is defined, the data will be appended to the
            CMOR repository as a new region in the file or will overwrite if region was already present
        :type region: str
        :param domain: CMOR domain
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: str
        :param diagnostic: diagnostic used to generate the file
        :type diagnostic: Diagnostic
        :param cmorized: flag to indicate if file was generated in cmorization process
        :type cmorized: bool

        if cmorized:
            raise ValueError('cmorized is not supported in THREDDS manager')
        original_var = var
        cmor_var = Variable.get_variable(var)
        var = self._get_final_var_name(box, var)

        if rename_var and rename_var != var:
            Utils.rename_variable(filetosend, rename_var, var)
        elif original_var != var:
            Utils.rename_variable(filetosend, original_var, var)

        if not frequency:
            frequency = self.config.frequency

        filepath = self.get_file_path(startdate, domain, var, frequency, box,
                                      grid)
        netcdf_file = NetCDFFile(filepath, filetosend, domain, var, cmor_var)
        if diagnostic:
            netcdf_file.add_diagnostic_history(diagnostic)
        else:
            raise ValueError('You must provide a diagnostic to store data using the THREDDSmanager')
        netcdf_file.send()

    def get_file_path(self, startdate, domain, var, frequency,
                      box=None, grid=None):
        """
        Returns the path to a concrete file
        :param startdate: file's startdate
        :type startdate: str
        :param domain: file's domain
        :type domain: str
        :param var: file's var
        :type var: str
        :param frequency: file's frequency
        :type frequency: str
        :param box: file's box
        :type box: Box
        :param grid: file's grid
        :type grid: str
        :return: path to the file
        :rtype: str
        """
        if not frequency:
            frequency = self.config.frequency
        var = self._get_final_var_name(box, var)

        folder_path = self._get_folder_path(frequency, domain, var, grid)
        if startdate:
            file_name = '{0}_{1}.nc'.format(var, startdate)
        else:
            file_name = '{0}.nc'.format(var)

        filepath = os.path.join(folder_path, file_name)
        return filepath

    def _get_folder_path(self, frequency, domain, variable, grid):
        folder_path = os.path.join(self.config.data_dir,
                                   self.experiment.institute.lower(),
                                   self.experiment.model.lower(),
                                   self.frequency_folder_name(frequency),
                                   self.get_varfolder(domain, variable, grid))
        return folder_path

    def get_year(self, domain, var, startdate, member, year, grid=None, box=None):
        """
        Ge a file containing all the data for one year for one variable
        :param domain: variable's domain
        :type domain: str
        :param var: variable's name
        :type var: str
        :param startdate: startdate to retrieve
        :type startdate: str
        :param member: member to retrieve
        :type member: int
        :param year: year to retrieve
        :type year: int
        :param grid: variable's grid
        :type grid: str
        :param box: variable's box
        :type box: Box
        :return:
        """

    def get_var_url(self, var, startdate, frequency, box, fileserver):
        var = self._get_final_var_name(box, var)
        if fileserver:
            protocol = 'fileServer'
        else:
            protocol = 'dodsC'
        return os.path.join(self.server_url, protocol, 'exp', self.experiment.institute,
                            self.experiment.model, self.frequency_folder_name(frequency),
                            var, '{0}_{1}.nc'.format(var, startdate))

    def link_file(self, domain, var, startdate, member, chunk=None, grid=None, box=None,
                  frequency=None, year=None, date_str=None, move_old=False):
        """
        Creates the link of a given file from the CMOR repository.

        :param move_old:
        :param date_str:
        :param year: if frequency is yearly, this parameter is used to give the corresponding year
        :type year: int
        :param domain: CMOR domain
        :type domain: str
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: str
        :return: path to the copy created on the scratch folder
        :rtype: str
        """
        # THREDDSManager does not require links
        pass


class THREDDSError(Exception):
    pass

def main():
    import shutil
    from datetime import datetime, timedelta

    thredds_time = list()
    copy_time = list()

    # thredds_time = lis
    # copy_time = [2, 3, 3]

    total_attempts = 100

    start_tests = datetime.now()
    attempts = range(total_attempts)
    for x in attempts:
        start = datetime.now()
        temp_thredds = '/scratch/Earth/jvegas/temp_thredds.nc'
        urllib.urlretrieve('http://earth.bsc.es/thredds/fileServer/exp/ecmwf/system4_m1/6hourly/tas/tas_19810101.nc',
                           temp_thredds)
        ellapesd = datetime.now() - start
        thredds_time.append(ellapesd.total_seconds())
        print ('THREDDS attempt {0}: {1}'.format(x+1, ellapesd))

        start = datetime.now()
        temp_copy = '/scratch/Earth/jvegas/temp_copy.nc'
        shutil.copyfile('/esnas/exp/ecmwf/system4_m1/6hourly/tas/tas_19810101.nc', temp_copy)
        ellapesd = datetime.now() - start
        copy_time.append(ellapesd.total_seconds())
        print ('Copy attempt {0}: {1}'.format(x + 1, ellapesd))

    end_test = datetime.now()
    import matplotlib as mpl
    import numpy as np
    mpl.use('QT5Agg')
    import matplotlib.pyplot as plt

    copy_time = np.array(copy_time)
    copy_average = np.average(copy_time)
    copy_std = np.std(copy_time)
    thredds_time = np.array(thredds_time)
    thredds_average = np.average(thredds_time)
    thredds_std = np.std(thredds_time)
    attempts = np.array(attempts)

    fontsize = 20
    fig, ax1 = plt.subplots(figsize=(16, 9))

    fig.suptitle('Comparison between THREDDS and direct copy performance on Moore '
                 '({0:%b %d %Y} {0:%H:%M} - {1:%H:%M}) \nFile used: system4_m1 tas_19810101.nc'.format(start_tests, end_test),
                 fontsize=fontsize)

    ax1.plot(attempts, copy_time, 'bo', markersize=8)
    plt.axhspan(copy_average-copy_std, copy_average + copy_std, facecolor='b', alpha=0.3)
    plt.axhline(y=copy_average, color='b')
    ax1.plot(attempts, thredds_time, 'ro', markersize=8)
    plt.axhspan(thredds_average - thredds_std, thredds_average + thredds_std, facecolor='r', alpha=0.3)
    plt.axhline(y=thredds_average, color='r')

    def time_ticks(x, pos):
        d = timedelta(seconds=x)
        return str(d)
    formatter = mpl.ticker.FuncFormatter(time_ticks)
    ax1.yaxis.set_major_formatter(formatter)
    ax1.set_ylabel('Time', fontsize=fontsize)
    ax1.set_xlabel('Attempt', fontsize=fontsize)
    ax1.set_xlim([-0.5, total_attempts - 0.5])
    ax1.set_ylim([0, 150])
    plt.tick_params(axis='x', which='both', bottom='off', top='off', labelbottom='off')
    plt.show()
    fig.savefig('cp{0:%Y-%m-%d-%H:%M}.png'.format(start_tests))

    with open('cp{0:%Y-%m-%d-%H:%M}.csv'.format(start_tests), 'wb') as csvfile:
        spamwriter = csv.writer(csvfile)
        spamwriter.writerow(['Attempt', 'THREDDS', 'Copy'])
        for x in range(total_attempts):
            spamwriter.writerow([x, thredds_time[x], copy_time[x]])

if __name__ == "__main__":
    main()