threddsmanager.py 13.7 KB
Newer Older
# coding=utf-8
import os
from autosubmit.date.chunk_date_lib import parse_date, add_months, chunk_start_date, chunk_end_date, date2str
from earthdiagnostics.datamanager import DataManager, NetCDFFile
from earthdiagnostics.utils import TempFile, Utils
from datetime import datetime
from earthdiagnostics.variable import Variable, VarType

class THREDDSManager(DataManager):
    """
    Data manager class for CMORized experiments
    """
        super(THREDDSManager, self).__init__(config)
        self.server_url = config.thredds.server_url
        data_folders = self.config.data_dir.split(':')
        self.config.data_dir = None
        for data_folder in data_folders:
            if os.path.isdir(os.path.join(data_folder, self.config.data_type,  self.experiment.institute.lower(),
                self.config.data_dir = data_folder
                break

        if not self.config.data_dir:
            raise Exception('Can not find model data')
        if self.config.data_type in ('obs', 'recon') and self.experiment.chunk_size !=1 :
            raise Exception('For obs and recon data chunk_size must be always 1')

    def get_leadtimes(self, domain, variable, startdate, member, leadtimes, frequency=None, vartype=VarType.MEAN):
        aggregation_path = self.get_var_url(variable, startdate, frequency, None, vartype)
        startdate = parse_date(startdate)
        start_chunk = chunk_start_date(startdate, self.experiment.num_chunks, self.experiment.chunk_size,
                                       'month', 'standard')
        end_chunk = chunk_end_date(start_chunk, self.experiment.chunk_size, 'month', 'standard')

        thredds_subset = THREDDSSubset(aggregation_path, variable, startdate, end_chunk).get_url()
        selected_months = ','.join([str(add_months(startdate, i, 'standard').month) for i in leadtimes])
        temp = TempFile.get()
        if self.config.data_type == 'exp':
            select_months = '-selmonth,{0} {1}'.format(selected_months, thredds_subset)
            selected_years = ','.join([str(add_months(startdate, i, 'standard').year) for i in leadtimes])
            Utils.cdo.selyear(selected_years, input=select_months, output=temp)
        else:
            Utils.cdo.selmonth(selected_months, input=thredds_subset, output=temp)
    def get_file(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
                 vartype=VarType.MEAN):
        """
        Copies a given file from the CMOR repository to the scratch folder and returns the path to the scratch's copy

        :param domain: CMOR domain
        :type domain: str
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: str
        :return: path to the copy created on the scratch folder
        :rtype: str
        """
        aggregation_path = self.get_var_url(var, startdate, frequency, box, vartype)

        start_chunk = chunk_start_date(parse_date(startdate), chunk, self.experiment.chunk_size, 'month', 'standard')
        end_chunk = chunk_end_date(start_chunk, self.experiment.chunk_size, 'month', 'standard')

        thredds_subset = THREDDSSubset(aggregation_path, var, start_chunk, end_chunk)
        return thredds_subset.download()

    def send_file(self, filetosend, domain, var, startdate, member, chunk=None, grid=None, region=None, box=None,
                  rename_var=None, frequency=None, year=None, date_str=None, move_old=False,
                  diagnostic=None, cmorized=False, vartype=VarType.MEAN):
        """
        Copies a given file to the CMOR repository. It also automatically converts to netCDF 4 if needed and can merge
        with already existing ones as needed

        :param move_old: if true, moves files following older conventions that may be found on the links folder
        :type move_old: bool
        :param date_str: exact date_str to use in the cmorized file
        :type: str
        :param year: if frequency is yearly, this parameter is used to give the corresponding year
        :type year: int
        :param rename_var: if exists, the given variable will be renamed to the one given by var
        :type rename_var: str
        :param filetosend: path to the file to send to the CMOR repository
        :type filetosend: str
        :param region: specifies the region represented by the file. If it is defined, the data will be appended to the
            CMOR repository as a new region in the file or will overwrite if region was already present
        :type region: str
        :param domain: CMOR domain
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: str
        :param diagnostic: diagnostic used to generate the file
        :type diagnostic: Diagnostic
        :param cmorized: flag to indicate if file was generated in cmorization process
        :type cmorized: bool

        if cmorized:
            raise ValueError('cmorized is not supported in THREDDS manager')
        original_var = var
        cmor_var = Variable.get_variable(var)
        var = self._get_final_var_name(box, var)

        if rename_var and rename_var != var:
            Utils.rename_variable(filetosend, rename_var, var)
        elif original_var != var:
            Utils.rename_variable(filetosend, original_var, var)

        if not frequency:
            frequency = self.config.frequency

        filepath = self.get_file_path(startdate, domain, var, frequency, vartype, box, grid)
        netcdf_file = NetCDFFile(filepath, filetosend, domain, var, cmor_var)
        if diagnostic:
            netcdf_file.add_diagnostic_history(diagnostic)
        else:
            raise ValueError('You must provide a diagnostic to store data using the THREDDSmanager')
        netcdf_file.send()

    def get_file_path(self, startdate, domain, var, frequency, vartype,
        """
        Returns the path to a concrete file
        :param startdate: file's startdate
        :type startdate: str
        :param domain: file's domain
        :type domain: str
        :param var: file's var
        :type var: str
        :param frequency: file's frequency
        :type frequency: str
        :param box: file's box
        :type box: Box
        :param grid: file's grid
        :type grid: str
        :return: path to the file
        :rtype: str
        """
        if not frequency:
            frequency = self.config.frequency
        var = self._get_final_var_name(box, var)

        folder_path = self._get_folder_path(frequency, domain, var, grid, vartype)
        file_name = self._get_file_name(startdate, var)

        filepath = os.path.join(folder_path, file_name)
        return filepath

    def _get_folder_path(self, frequency, domain, variable, grid, vartype):

        if self.config.data_type == 'exp':
            var_folder = self.get_varfolder(domain, variable, grid)
        else:
            var_folder = variable

        folder_path = os.path.join(self.config.data_dir, self.config.data_type,
                                   self.experiment.institute.lower(),
                                   self.experiment.model.lower(),
    def get_year(self, domain, var, startdate, member, year, grid=None, box=None, vartype=VarType.MEAN):
        """
        Ge a file containing all the data for one year for one variable
        :param domain: variable's domain
        :type domain: str
        :param var: variable's name
        :type var: str
        :param startdate: startdate to retrieve
        :type startdate: str
        :param member: member to retrieve
        :type member: int
        :param year: year to retrieve
        :type year: int
        :param grid: variable's grid
        :type grid: str
        :param box: variable's box
        :type box: Box
        :return:
        """
        aggregation_path = self.get_var_url(var, startdate, None, box, vartype)
        thredds_subset = THREDDSSubset(aggregation_path, var, datetime(year, 1, 1), datetime(year+1, 1, 1))
        return thredds_subset.download()

    def get_var_url(self, var, startdate, frequency, box, vartype):
        if not frequency:
            frequency = self.config.frequency
        var = self._get_final_var_name(box, var)
        full_path = os.path.join(self.server_url, 'dodsC', self.config.data_type, self.experiment.institute,
                                 self.experiment.model, self.frequency_folder_name(frequency, vartype))
        if self.config.data_type == 'exp':
            full_path = os.path.join(full_path, var, self._get_file_name(startdate, var))
            full_path = os.path.join(full_path, self._get_file_name(None, var))

    def _get_file_name(self, startdate, var):
            return '{0}_{1}.nc'.format(var, startdate[0:6])
        else:
            return '{0}.nc'.format(var)

    def link_file(self, domain, var, startdate, member, chunk=None, grid=None, box=None,
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                  frequency=None, year=None, date_str=None, move_old=False, vartype=VarType.MEAN):
        """
        Creates the link of a given file from the CMOR repository.

        :param move_old:
        :param date_str:
        :param year: if frequency is yearly, this parameter is used to give the corresponding year
        :type year: int
        :param domain: CMOR domain
        :type domain: str
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: str
        :return: path to the copy created on the scratch folder
        :rtype: str
        """
        # THREDDSManager does not require links
        pass


class THREDDSError(Exception):
    pass

class THREDDSSubset:
    def __init__(self, thredds_path, var, start_time, end_time):
        self.thredds_path = thredds_path
        self.var = var
        self.dimension_indexes = {}
        self.handler = None
        self.start_time = start_time
        self.end_time = end_time

    def get_url(self):
        self.handler = Utils.openCdf(self.thredds_path)
        self._read_metadata()
        self.handler.close()

        self._get_time_indexes()

        return self._get_subset_url()

    def download(self):
        url = self.get_url()
        return self._download_url(url)

    def _read_metadata(self):
        self.var_dimensions = self.handler.variables[self.var].dimensions
        for dimension in self.var_dimensions:
            if dimension == 'time':
                continue
            self.dimension_indexes[dimension] = (0, self.handler.dimensions[dimension].size - 1)

        if 'time' in self.var_dimensions:
            self.times = Utils.get_datetime_from_netcdf(self.handler)

    def _get_time_indexes(self):
        if 'time' not in self.var_dimensions:
            return

        time_start = 0
        while time_start < self.times.size and self.times[time_start] < self.start_time:
            time_start += 1
        if time_start == self.times.size:
            raise Exception('Timesteps not available for interval {0}-{1}'.format(self.start_time, self.end_time))
        time_end = time_start
        if self.times[time_end] >= self.end_time:
            raise Exception('Timesteps not available for interval {0}-{1}'.format(self.start_time, self.end_time))
        while time_end < self.times.size - 1 and self.times[time_end + 1] < self.end_time:
            time_end += 1
        self.dimension_indexes['time'] = (time_start, time_end)

    def _download_url(self, url):
        temp = TempFile.get()
        Utils.execute_shell_command(['nccopy', url, temp])
        if not Utils.check_netcdf_file(temp):
            raise THREDDSError('Can not retrieve {0} from server'.format(url))
        return temp

    def _get_subset_url(self):
        var_slice = self.var
        dimensions_slice = ''

        for dimension in self.var_dimensions:
            slice_index = self._get_slice_index(self.dimension_indexes[dimension])
            var_slice += slice_index
            dimensions_slice += '{0}{1},'.format(dimension, slice_index)

        return '{0}?{1}{2}'.format(self.thredds_path, dimensions_slice, var_slice)

    def _get_slice_index(self, index_tuple):
        return '[{0[0]}:1:{0[1]}]'.format(index_tuple)