# coding=utf-8
import os
import posixpath
from bscearth.utils.date import parse_date, add_months, chunk_start_date, chunk_end_date
from earthdiagnostics.datamanager import DataManager, NetCDFFile
from earthdiagnostics.utils import TempFile, Utils
from datetime import datetime

from earthdiagnostics.variable import VariableManager
from earthdiagnostics.variable_type import VariableType


class THREDDSManager(DataManager):
    """
    Data manager that retrieves data from a THREDDS server.

    Dataset URLs are built under the server's 'dodsC' path, while the local
    data folder is only used to locate the institute/model directory.
    """

    def __init__(self, config):
        """
        Store the server URL and pick the first configured data folder that holds the model data

        :param config: diagnostics configuration; ``config.thredds.server_url`` and the
                       colon-separated ``config.data_dir`` are read here
        :raises Exception: if no data folder contains the institute/model directory, or if
                           the data type is 'obs' or 'recon' and the chunk size is not 1
        """
        super(THREDDSManager, self).__init__(config)
        self.server_url = config.thredds.server_url

        # data_dir may hold several candidate folders separated by ':'; keep the first valid one
        data_folders = self.config.data_dir.split(':')
        self.config.data_dir = None
        for data_folder in data_folders:
            model_folder = os.path.join(data_folder, self.config.data_type,
                                        self.experiment.institute.lower(),
                                        self.experiment.model.lower())
            if os.path.isdir(model_folder):
                self.config.data_dir = data_folder
                break

        if not self.config.data_dir:
            raise Exception('Can not find model data')

        if self.config.data_type in ('obs', 'recon') and self.experiment.chunk_size != 1:
            raise Exception('For obs and recon data chunk_size must be always 1')

    # noinspection PyUnusedLocal
    def get_leadtimes(self, domain, variable, startdate, member, leadtimes, frequency=None,
                      vartype=VariableType.MEAN):
        """
        Retrieve the data of a variable for the given leadtimes

        The months (and, for 'exp' data, also the years) that correspond to each leadtime
        are selected with CDO from the THREDDS subset of the variable.

        :param domain: variable's domain (unused)
        :type domain: str
        :param variable: variable's name
        :type variable: str
        :param startdate: startdate to retrieve
        :type startdate: str
        :param member: member to retrieve (unused)
        :type member: int
        :param leadtimes: leadtimes to select, as month offsets added to the startdate
        :param frequency: frequency to get (the configured one is used if not given)
        :type frequency: Frequency | None
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        :return: temporary file obtained from TempFile.get() that CDO writes the selection to
        """
        aggregation_path = self.get_var_url(variable, startdate, frequency, None, vartype)
        startdate = parse_date(startdate)
        start_chunk = chunk_start_date(startdate, self.experiment.num_chunks, self.experiment.chunk_size,
                                       'month', self.experiment.calendar)
        end_chunk = chunk_end_date(start_chunk, self.experiment.chunk_size, 'month', self.experiment.calendar)

        thredds_subset = THREDDSSubset(aggregation_path, variable, startdate, end_chunk).get_url()
        selected_months = ','.join([str(add_months(startdate, i, self.experiment.calendar).month)
                                    for i in leadtimes])
        temp = TempFile.get()
        if self.config.data_type == 'exp':
            # Chain the month selection as the input of the year selection
            select_months = '-selmonth,{0} {1}'.format(selected_months, thredds_subset)
            selected_years = ','.join([str(add_months(startdate, i, self.experiment.calendar).year)
                                       for i in leadtimes])
            Utils.cdo.selyear(selected_years, input=select_months, output=temp)
        else:
            Utils.cdo.selmonth(selected_months, input=thredds_subset, output=temp)
        return temp

    def file_exists(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
                    vartype=VariableType.MEAN):
        """
        Check whether the data for a given chunk can be opened from the THREDDS server

        :param domain: CMOR domain
        :type domain: str
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: Frequency
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        :return: True if the subset for the chunk could be opened, False otherwise
        :rtype: bool
        """
        aggregation_path = self.get_var_url(var, startdate, frequency, box, vartype)

        start_chunk = chunk_start_date(parse_date(startdate), chunk, self.experiment.chunk_size, 'month',
                                       self.experiment.calendar)
        end_chunk = chunk_end_date(start_chunk, self.experiment.chunk_size, 'month', self.experiment.calendar)

        thredds_subset = THREDDSSubset(aggregation_path, var, start_chunk, end_chunk)
        return thredds_subset.check()

    def get_file_path(self, startdate, domain, var, frequency, vartype, box=None, grid=None):
        """
        Returns the path to a concrete file

        :param startdate: file's startdate
        :type startdate: str
        :param domain: file's domain
        :type domain: str
        :param var: file's var
        :type var: str
        :param frequency: file's frequency (the configured one is used if not given)
        :type frequency: Frequency
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        :param box: file's box
        :type box: Box
        :param grid: file's grid
        :type grid: str
        :return: path to the file
        :rtype: str
        """
        if not frequency:
            frequency = self.config.frequency
        var = self._get_final_var_name(box, var)

        folder_path = self._get_folder_path(frequency, domain, var, grid, vartype)
        file_name = self._get_file_name(var, startdate)
        return os.path.join(folder_path, file_name)

    def _get_folder_path(self, frequency, domain, variable, grid, vartype):
        """
        Build the local folder path for a variable

        For 'exp' data the variable folder comes from get_varfolder; otherwise the
        variable name itself is used as the folder name.
        """
        if self.config.data_type == 'exp':
            var_folder = self.get_varfolder(domain, variable, grid)
        else:
            var_folder = variable

        return os.path.join(self.config.data_dir, self.config.data_type,
                            self.experiment.institute.lower(),
                            self.experiment.model.lower(),
                            frequency.folder_name(vartype),
                            var_folder)

    def get_year(self, domain, var, startdate, member, year, grid=None, box=None, vartype=VariableType.MEAN):
        """
        Get a file containing all the data for one year for one variable

        :param domain: variable's domain
        :type domain: str
        :param var: variable's name
        :type var: str
        :param startdate: startdate to retrieve
        :type startdate: str
        :param member: member to retrieve
        :type member: int
        :param year: year to retrieve
        :type year: int
        :param grid: variable's grid
        :type grid: str
        :param box: variable's box
        :type box: Box
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        :return: temporary file with the downloaded subset
        :raises THREDDSError: if the downloaded file is not a valid netCDF file
        """
        aggregation_path = self.get_var_url(var, startdate, None, box, vartype)
        # The end time is exclusive, so the subset covers [Jan 1st of year, Jan 1st of year + 1)
        thredds_subset = THREDDSSubset(aggregation_path, var, datetime(year, 1, 1), datetime(year + 1, 1, 1))
        return thredds_subset.download()

    def get_var_url(self, var, startdate, frequency, box, vartype):
        """
        Get url for dataset

        :param var: variable to retrieve
        :type var: str
        :param startdate: startdate to retrieve
        :type startdate: str
        :param frequency: frequency to get (the configured one is used if not given)
        :type frequency: Frequency | None
        :param box: box to get
        :type box: Box
        :param vartype: type of variable
        :type vartype: VariableType
        :return: url of the dataset on the THREDDS server
        :rtype: str
        """
        if not frequency:
            frequency = self.config.frequency
        var = self._get_final_var_name(box, var)

        # URLs always use '/', so join with posixpath instead of the OS-dependent os.path
        full_path = posixpath.join(self.server_url, 'dodsC', self.config.data_type, self.experiment.institute,
                                   self.experiment.model, frequency.folder_name(vartype))
        if self.config.data_type == 'exp':
            full_path = posixpath.join(full_path, var, self._get_file_name(var, startdate))
        else:
            full_path = posixpath.join(full_path, self._get_file_name(var, None))
        return full_path

    def _get_file_name(self, var, startdate):
        """
        Build the file name for a variable

        :param var: variable name
        :type var: str
        :param startdate: startdate to append to the name; if empty only the variable is used
        :type startdate: str | None
        :return: '<var>_<startdate>.nc' or '<var>.nc'
        :rtype: str
        """
        if startdate:
            if self.config.data_type != 'exp':
                # Non-'exp' data only keeps the first six characters of the startdate
                startdate = startdate[0:6]
            return '{0}_{1}.nc'.format(var, startdate)
        else:
            return '{0}.nc'.format(var)

    def link_file(self, domain, var, cmor_var, startdate, member, chunk=None, grid=None,
                  frequency=None, year=None, date_str=None, move_old=False, vartype=VariableType.MEAN):
        """
        Creates the link of a given file from the CMOR repository.

        This implementation does nothing: THREDDSManager does not require links.

        :param move_old:
        :param date_str:
        :param year: if frequency is yearly, this parameter is used to give the corresponding year
        :type year: int
        :param domain: CMOR domain
        :type domain: str
        :param var: variable name
        :type var: str
        :param cmor_var: (unused)
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: str
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        :return: None
        """
        # THREDDSManager does not require links
        pass


class THREDDSError(Exception):
    """Error raised when a file can not be retrieved from the THREDDS server"""
    pass


class THREDDSSubset(object):
    """
    Subset of a variable from a THREDDS dataset, restricted to a time interval

    :param thredds_path: url of the dataset
    :type thredds_path: str
    :param var: variable to retrieve
    :type var: str
    :param start_time: first time to include
    :type start_time: datetime
    :param end_time: end of the interval (exclusive)
    :type end_time: datetime
    """

    def __init__(self, thredds_path, var, start_time, end_time):
        self.thredds_path = thredds_path
        self.var = var
        # Maps dimension name -> (first index, last index), both inclusive
        self.dimension_indexes = {}
        self.handler = None
        self.start_time = start_time
        self.end_time = end_time
        # Both are filled by _read_metadata
        self.var_dimensions = None
        self.times = None

    def get_url(self):
        """
        Read the dataset metadata and build the url for the requested subset

        :return: url of the subset
        :rtype: str
        :raises Exception: if there are no timesteps inside the requested interval
        """
        self.handler = Utils.openCdf(self.thredds_path)
        try:
            self._read_metadata()
        finally:
            # Always release the remote handler, even if the metadata can not be read
            self.handler.close()

        self._get_time_indexes()
        return self._get_subset_url()

    def download(self):
        """
        Download the subset to a temporary file

        :return: temporary file containing the subset
        :raises THREDDSError: if the downloaded file is not a valid netCDF file
        """
        url = self.get_url()
        return self._download_url(url)

    def check(self):
        """
        Check if the subset can be opened

        :return: True if the subset url could be built and opened, False on any error
        :rtype: bool
        """
        # Deliberately best-effort: any failure just means the data is not available
        # noinspection PyBroadException
        try:
            self.handler = Utils.openCdf(self.get_url())
            self.handler.close()
            return True
        except Exception:
            return False

    def _read_metadata(self):
        """Read the variable's dimensions, their full index ranges and, if present, the time values"""
        self.var_dimensions = self.handler.variables[self.var].dimensions
        for dimension in self.var_dimensions:
            if dimension == 'time':
                # The time range is computed afterwards by _get_time_indexes
                continue
            self.dimension_indexes[dimension] = (0, self.handler.dimensions[dimension].size - 1)

        if 'time' in self.var_dimensions:
            self.times = Utils.get_datetime_from_netcdf(self.handler)

    def _get_time_indexes(self):
        """
        Compute the inclusive index range of the timesteps in [start_time, end_time)

        :raises Exception: if no timestep falls inside the interval
        """
        if 'time' not in self.var_dimensions:
            return

        # First timestep that is not before start_time
        time_start = 0
        while time_start < self.times.size and self.times[time_start] < self.start_time:
            time_start += 1
        if time_start == self.times.size:
            raise Exception('Timesteps not available for interval {0}-{1}'.format(self.start_time, self.end_time))

        time_end = time_start
        if self.times[time_end] >= self.end_time:
            raise Exception('Timesteps not available for interval {0}-{1}'.format(self.start_time, self.end_time))

        # Last timestep that is still before end_time
        while time_end < self.times.size - 1 and self.times[time_end + 1] < self.end_time:
            time_end += 1
        self.dimension_indexes['time'] = (time_start, time_end)

    @staticmethod
    def _download_url(url):
        """
        Copy the given url to a temporary file with nccopy

        :param url: url to download
        :type url: str
        :return: temporary file containing the data
        :raises THREDDSError: if the resulting file does not pass the netCDF check
        """
        temp = TempFile.get()
        Utils.execute_shell_command(['nccopy', '-s', '-d', '-4', url, temp])
        if not Utils.check_netcdf_file(temp):
            raise THREDDSError('Can not retrieve {0} from server'.format(url))
        return temp

    def _get_subset_url(self):
        """
        Build the url with the slice of every dimension followed by the sliced variable

        :return: '<thredds_path>?<dim>[a:1:b],...,<var>[a:1:b]...'
        :rtype: str
        """
        var_slice = self.var
        dimensions_slice = ''

        for dimension in self.var_dimensions:
            slice_index = self._get_slice_index(self.dimension_indexes[dimension])
            var_slice += slice_index
            if dimension == 'ensemble':
                # The 'ensemble' dimension is requested under the name 'realization'
                dimension = 'realization'
            dimensions_slice += '{0}{1},'.format(dimension, slice_index)

        return '{0}?{1}{2}'.format(self.thredds_path, dimensions_slice, var_slice)

    @staticmethod
    def _get_slice_index(index_tuple):
        """
        Format an inclusive (first, last) index pair as '[first:1:last]'

        :param index_tuple: (first, last) indexes
        :type index_tuple: tuple
        :rtype: str
        """
        return '[{0[0]}:1:{0[1]}]'.format(index_tuple)