# coding=utf-8
import os
from time import strptime

import iris
import netCDF4
import numpy as np
from bscearth.utils.date import parse_date, add_months, chunk_start_date, chunk_end_date
from bscearth.utils.log import Log
from iris.coords import DimCoord
from cf_units import Unit
from datafile import DataFile, StorageStatus, LocalStatus
from earthdiagnostics.datamanager import DataManager
from earthdiagnostics.utils import TempFile, Utils
from datetime import datetime
from earthdiagnostics.variable_type import VariableType

class THREDDSManager(DataManager):
    """
    Data manager that builds THREDDS dataset URLs and local file paths for an experiment

    :param config: configuration object; it must expose ``thredds.server_url``
    """

    # NOTE(review): the ``def`` line was missing from the reviewed text; the signature is restored
    # from the ``super().__init__(config)`` call and the use of ``config`` below -- confirm.
    def __init__(self, config):
        super(THREDDSManager, self).__init__(config)
        self.server_url = config.thredds.server_url
        # data_dir may list several candidate folders separated by ':'; keep the first one that
        # contains data for this experiment
        data_folders = self.config.data_dir.split(':')
        self.config.data_dir = None
        for data_folder in data_folders:
            # NOTE(review): the tail of this condition was missing from the reviewed text; the model
            # folder is restored to match the layout used by _get_folder_path -- confirm.
            if os.path.isdir(os.path.join(data_folder, self.config.data_type, self.experiment.institute.lower(),
                                          self.experiment.model.lower())):
                self.config.data_dir = data_folder
                break

        if not self.config.data_dir:
            raise Exception('Can not find model data')
        if self.config.data_type in ('obs', 'recon') and self.experiment.chunk_size != 1:
            raise Exception('For obs and recon data chunk_size must be always 1')

    # noinspection PyUnusedLocal
    def get_leadtimes(self, domain, variable, startdate, member, leadtimes, frequency=None, vartype=VariableType.MEAN):
        """
        Select with CDO the months matching the requested leadtimes and write them to a temporary file

        :param domain: not used by this implementation
        :param variable: variable name
        :type variable: str
        :param startdate: startdate to retrieve
        :type startdate: str
        :param member: not used by this implementation
        :param leadtimes: month offsets, counted from the startdate, to select
        :type leadtimes: iterable of int
        :param frequency: frequency to retrieve; forwarded to get_var_url
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        :return: path of the temporary file that received the CDO output
        """
        aggregation_path = self.get_var_url(variable, startdate, frequency, None, vartype)
        startdate = parse_date(startdate)
        start_chunk = chunk_start_date(startdate, self.experiment.num_chunks, self.experiment.chunk_size,
                                       'month', self.experiment.calendar)
        end_chunk = chunk_end_date(start_chunk, self.experiment.chunk_size, 'month', self.experiment.calendar)
        thredds_subset = THREDDSSubset(aggregation_path, "", variable, startdate, end_chunk)

        # Resolve every leadtime once; months and years are both derived from these dates
        leadtime_dates = [add_months(startdate, i, self.experiment.calendar) for i in leadtimes]
        selected_months = ','.join(str(date.month) for date in leadtime_dates)
        temp = TempFile.get()
        if self.config.data_type == 'exp':
            # Chain the month selection into the year selection so both filters apply
            select_months = '-selmonth,{0} {1}'.format(selected_months, thredds_subset)
            selected_years = ','.join(str(date.year) for date in leadtime_dates)
            Utils.cdo.selyear(selected_years, input=select_months, output=temp)
        else:
            Utils.cdo.selmonth(selected_months, input=thredds_subset, output=temp)
        # The result was previously written to the temporary file but never handed back
        return temp
    def file_exists(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
                    vartype=VariableType.MEAN):
        """
        Build the THREDDS subset that describes one chunk of a variable

        NOTE(review): the visible code has no return statement, so the method returns None.
        The tail of the method appears to be missing from the reviewed text -- confirm against
        the repository.

        :param domain: CMOR domain (not used by the visible code)
        :type domain: str
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member (not used by the visible code)
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (not used by the visible code)
        :type grid: str
        :param box: file's box, forwarded to get_var_url
        :type box: Box
        :param frequency: file's frequency, forwarded to get_var_url
        :type frequency: Frequency
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        """
        dataset_url = self.get_var_url(var, startdate, frequency, box, vartype)

        first_date = parse_date(startdate)
        chunk_begin = chunk_start_date(first_date, chunk, self.experiment.chunk_size, 'month',
                                       self.experiment.calendar)
        chunk_finish = chunk_end_date(chunk_begin, self.experiment.chunk_size, 'month',
                                      self.experiment.calendar)
        # The subset is created but not used any further in the visible code
        subset = THREDDSSubset(dataset_url, "", var, chunk_begin, chunk_finish)
    # NOTE(review): the end of the signature was missing from the reviewed text. ``box`` and ``grid``
    # are restored from the docstring and the body; callers in this class pass ``box=`` by keyword
    # and omit ``grid`` -- confirm order and defaults.
    def get_file_path(self, startdate, domain, var, frequency, vartype,
                      box=None, grid=None):
        """
        Returns the path to a concrete file

        :param startdate: file's startdate
        :type startdate: str
        :param domain: file's domain
        :type domain: str
        :param var: file's var
        :type var: str
        :param frequency: file's frequency; the configured frequency is used when it is empty
        :type frequency: Frequency
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        :param box: file's box
        :type box: Box
        :param grid: file's grid
        :type grid: str
        :return: path to the file
        :rtype: str
        """
        if not frequency:
            frequency = self.config.frequency
        var = self._get_final_var_name(box, var)

        folder_path = self._get_folder_path(frequency, domain, var, grid, vartype)
        file_name = self._get_file_name(var, startdate)

        return os.path.join(folder_path, file_name)

    def _get_folder_path(self, frequency, domain, variable, grid, vartype):
        """
        Build the folder that holds the files of a variable

        :param frequency: object providing ``folder_name(vartype)``
        :type frequency: Frequency
        :param domain: variable's domain; only used when data_type is 'exp'
        :param variable: variable name
        :type variable: str
        :param grid: variable's grid; only used when data_type is 'exp'
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        :return: path to the folder
        :rtype: str
        """
        if self.config.data_type == 'exp':
            var_folder = self.get_varfolder(domain, variable, grid)
        else:
            var_folder = variable

        # NOTE(review): the end of this call and the return were missing from the reviewed text;
        # var_folder is appended because it is computed above and used nowhere else -- confirm.
        folder_path = os.path.join(self.config.data_dir, self.config.data_type,
                                   self.experiment.institute.lower(),
                                   self.experiment.model.lower(),
                                   frequency.folder_name(vartype),
                                   var_folder)
        return folder_path
    # noinspection PyUnusedLocal
    def get_year(self, domain, var, startdate, member, year, grid=None, box=None, vartype=VariableType.MEAN):
        """
        Get a file containing all the data for one year for one variable

        :param domain: variable's domain
        :type domain: str
        :param var: variable's name
        :type var: str
        :param startdate: startdate to retrieve
        :type startdate: str
        :param member: member to retrieve
        :type member: int
        :param year: year to retrieve
        :type year: int
        :param grid: variable's grid
        :type grid: str
        :param box: variable's box
        :type box: Box
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        :return: the value returned by THREDDSSubset.download for the requested year
        """
        aggregation_path = self.get_var_url(var, startdate, None, box, vartype)
        # The requested window runs from January 1st of ``year`` to January 1st of the next year
        thredds_subset = THREDDSSubset(aggregation_path, "", var, datetime(year, 1, 1), datetime(year+1, 1, 1))
        # NOTE(review): THREDDSSubset.download has no return statement in the reviewed text, so this
        # would evaluate to None -- confirm what callers expect to receive.
        return thredds_subset.download()

    def get_var_url(self, var, startdate, frequency, box, vartype):
        """
        Get url for dataset

        :param var: variable to retrieve
        :type var: str
        :param startdate: startdate to retrieve
        :type startdate: str
        :param frequency: frequency to get; the configured frequency is used when it is empty
        :type frequency: Frequency | None
        :param box: box to get
        :type box: Box
        :param vartype: type of variable
        :type vartype: VariableType
        :return: url of the dataset, rooted at ``<server_url>/dodsC``
        :rtype: str
        """
        if not frequency:
            frequency = self.config.frequency
        var = self._get_final_var_name(box, var)
        full_path = os.path.join(self.server_url, 'dodsC', self.config.data_type, self.experiment.institute,
                                 self.experiment.model, frequency.folder_name(vartype))
        # NOTE(review): the branch condition and the return were missing from the reviewed text.
        # The 'exp' test is reconstructed from the way the rest of the class separates experiment
        # data from obs/recon data -- confirm against the repository.
        if self.config.data_type == 'exp':
            # One file per startdate, stored in a folder named after the variable
            full_path = os.path.join(full_path, var, self._get_file_name(var, startdate))
        else:
            # A single file per variable, with no startdate in its name
            full_path = os.path.join(full_path, self._get_file_name(var, None))
        return full_path
    def _get_file_name(self, var, startdate):
        """
        Build the netCDF file name for a variable

        :param var: variable name
        :type var: str
        :param startdate: startdate to put in the name; when empty or None the name has no date
        :type startdate: str | None
        :return: ``<var>_<startdate>.nc``, or ``<var>.nc`` when no startdate is given. For data
                 types other than 'exp' only the first six characters of the startdate are kept
        :rtype: str
        """
        # NOTE(review): the outer ``if startdate:`` was missing from the reviewed text; it is restored
        # from the dangling ``else`` branch, which builds the undated name -- confirm.
        if not startdate:
            return '{0}.nc'.format(var)
        if self.config.data_type != 'exp':
            startdate = startdate[0:6]
        return '{0}_{1}.nc'.format(var, startdate)
    def link_file(self, domain, var, cmor_var, startdate, member, chunk=None, grid=None,
                  frequency=None, year=None, date_str=None, move_old=False, vartype=VariableType.MEAN):
        """
        Do nothing: THREDDSManager does not require links

        Every argument is accepted and ignored.

        :param domain: CMOR domain
        :type domain: str
        :param var: variable name
        :type var: str
        :param cmor_var: variable instance describing the selected variable
        :type cmor_var: Variable
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: str
        :param year: if frequency is yearly, this parameter is used to give the corresponding year
        :type year: int
        :param date_str:
        :param move_old:
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        :return: None
        """
        return None
    def request_chunk(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
                      vartype=VariableType.MEAN):
        """
        Register a pending request for one chunk of a variable

        A THREDDSSubset covering the chunk is created, marked as LocalStatus.PENDING and stored in
        ``self.requested_files`` under the file path of the chunk.

        :param domain: CMOR domain
        :type domain: Domain
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member (not used by this implementation)
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (not used by this implementation)
        :type grid: str|NoneType
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: Frequency|NoneType
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        :return: the request that has been registered
        :rtype: THREDDSSubset
        """
        aggregation_path = self.get_var_url(var, startdate, frequency, box, vartype)
        file_path = self.get_file_path(startdate, domain, var, frequency, vartype, box=box)

        start_chunk = chunk_start_date(parse_date(startdate), chunk, self.experiment.chunk_size, 'month',
                                       self.experiment.calendar)
        end_chunk = chunk_end_date(start_chunk, self.experiment.chunk_size, 'month', self.experiment.calendar)

        thredds_subset = THREDDSSubset(aggregation_path, file_path, var, start_chunk, end_chunk)
        thredds_subset.local_status = LocalStatus.PENDING
        self.requested_files[file_path] = thredds_subset
        # NOTE(review): the reviewed text ended without a return although a result is documented;
        # the registered request is handed back so callers do not have to look it up -- confirm.
        return thredds_subset
    # noinspection PyUnusedLocal
    def declare_chunk(self, domain, var, startdate, member, chunk, grid=None, region=None, box=None, frequency=None,
                      vartype=VariableType.MEAN, diagnostic=None):
        """
        Declare one chunk of a variable whose storage is pending

        The request already stored in ``self.requested_files`` for the file path is reused; if there
        is none, a new THREDDSSubset is created and stored. Its final name, diagnostic and storage
        status (StorageStatus.PENDING) are then set.

        :param domain: CMOR domain
        :type domain: Domain
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member (not used by this implementation)
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (not used by this implementation)
        :type grid: str|NoneType
        :param region: not used by this implementation
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: Frequency|NoneType
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        :param diagnostic: value stored in the ``diagnostic`` attribute of the request
        :return: the request that has been declared
        :rtype: THREDDSSubset
        """
        aggregation_path = self.get_var_url(var, startdate, frequency, box, vartype)
        file_path = self.get_file_path(startdate, domain, var, frequency, vartype, box=box)

        start_chunk = chunk_start_date(parse_date(startdate), chunk, self.experiment.chunk_size, 'month',
                                       self.experiment.calendar)
        end_chunk = chunk_end_date(start_chunk, self.experiment.chunk_size, 'month', self.experiment.calendar)
        final_name = self._get_final_var_name(box, var)
        if file_path in self.requested_files:
            thredds_subset = self.requested_files[file_path]
        else:
            thredds_subset = THREDDSSubset(aggregation_path, file_path, var, start_chunk, end_chunk)
            self.requested_files[file_path] = thredds_subset
        thredds_subset.final_name = final_name
        thredds_subset.diagnostic = diagnostic
        thredds_subset.storage_status = StorageStatus.PENDING
        # NOTE(review): the reviewed text ended without a return although a result is documented;
        # the declared request is handed back so callers do not have to look it up -- confirm.
        return thredds_subset

class THREDDSError(Exception):
    """Exception subclass for THREDDS problems; nothing in the visible code raises it"""
class THREDDSSubset(DataFile):
    def __init__(self, thredds_path, file_path, var, start_time, end_time):
        """
        Describe a time window of one variable of a THREDDS dataset

        :param thredds_path: location of the dataset
        :param file_path: stored as ``remote_file``
        :param var: variable name. Everything from the first '_f' onwards is split off and kept in
                    ``hourly``; the part before it is kept in ``var``
        :type var: str
        :param start_time: start of the time window
        :param end_time: end of the time window
        """
        super(THREDDSSubset, self).__init__()
        self.thredds_path = thredds_path
        self.remote_file = file_path
        suffix_start = var.find('_f')
        if suffix_start < 0:
            # No '_f' marker: the whole name is the variable
            self.var = var
            self.hourly = ''
        else:
            self.var = var[:suffix_start]
            self.hourly = var[suffix_start:]
        self.dimension_indexes = {}
        self.handler = None
        self.start_time = start_time
        self.end_time = end_time

    def __str__(self):
        """Return the dataset location followed by the start and end of the time window"""
        template = 'THREDDS {0.thredds_path} ({0.start_time}-{0.end_time})'
        return template.format(self)

    def download(self):
        """
        Load the requested time window with iris and save it to a local netCDF file

        The data goes to ``local_file``; a temporary file is requested when none is set. On success
        ``local_status`` becomes LocalStatus.READY. Any exception is logged and ``local_status``
        becomes LocalStatus.FAILED instead of propagating.
        """
        def in_requested_window(cell):
            # Both ends of the window are inclusive
            return self.start_time <= cell.point <= self.end_time

        try:
            iris.FUTURE.netcdf_promote = True
            iris.FUTURE.netcdf_no_unlimited = True
            with iris.FUTURE.context(cell_datetime_objects=True):
                window = iris.Constraint(time=in_requested_window)
                cube = iris.load_cube(self.thredds_path, constraint=window, callback=self._correct_cube)

            if not self.local_file:
                self.local_file = TempFile.get()
            iris.save(cube, self.local_file, zlib=True)
            if not Utils.check_netcdf_file(self.local_file):
                raise Exception('netcdf check for downloaded file failed')
            Log.info('Request {0} ready!', self)
            self.local_status = LocalStatus.READY
        except Exception as ex:
            Log.error('Can not retrieve {0} from server: {1}'.format(self, ex))
            self.local_status = LocalStatus.FAILED
    # noinspection PyUnusedLocal,PyMethodMayBeStatic
    def _correct_cube(self, cube, field, filename):
        """
        Callback for iris.load_cube: replace a time axis expressed in months by one in days

        Cubes without a time coordinate, or whose time units do not start with 'month', are left
        untouched. Otherwise every time point is turned into a date and the time coordinate is
        replaced by one in 'days since 1850-01-01' with the same calendar. The cube is modified
        in place.

        :param cube: cube being loaded
        :param field: not used
        :param filename: not used
        """
        if not cube.coords('time'):
            # NOTE(review): the body of this guard was missing from the reviewed text; returning
            # early is the only reading under which the following lines make sense -- confirm.
            return
        time = cube.coord('time')
        if not time.units.origin.startswith('month'):
            return

        ref = strptime(time.units.origin[time.units.origin.index(' since ') + 7:], '%Y-%m-%d %H:%M:%S')

        def month_number_to_date(x):
            # x counts months starting at 1 for January of the reference year. The year must advance
            # only after December, so both year and month are derived from the zero-based index;
            # dividing x itself by 12 moved every December one year ahead. Floor division also
            # keeps the year an int on Python 3.
            zero_based = int(x) - 1
            return datetime(year=ref.tm_year + zero_based // 12,
                            month=zero_based % 12 + 1,
                            day=ref.tm_mday)

        # otypes is given explicitly so an empty time axis does not make np.vectorize fail
        helper = np.vectorize(month_number_to_date, otypes=[object])
        times = np.round(time.points + ref.tm_mon)
        dates = helper(times)
        dates = netCDF4.date2num(dates, units='days since 1850-01-01', calendar=time.units.calendar)
        new_time = DimCoord(dates, standard_name=time.standard_name, long_name=time.long_name,
                            var_name=time.var_name, attributes=time.attributes,
                            units=Unit('days since 1850-01-01', time.units.calendar))
        [dimension] = cube.coord_dims(time)
        cube.remove_coord(time)
        cube.add_dim_coord(new_time, dimension)