Newer
Older
# coding=utf-8
import os
from bscearth.utils.date import parse_date, add_months, chunk_start_date, chunk_end_date
from earthdiagnostics.datamanager import DataManager, NetCDFFile
from earthdiagnostics.utils import TempFile, Utils
from earthdiagnostics.variable import VariableManager
from earthdiagnostics.variable_type import VariableType
class THREDDSManager(DataManager):
"""
Data manager class for CMORized experiments
"""
def __init__(self, config):
super(THREDDSManager, self).__init__(config)
self.server_url = config.thredds.server_url
data_folders = self.config.data_dir.split(':')
self.config.data_dir = None
for data_folder in data_folders:
if os.path.isdir(os.path.join(data_folder, self.config.data_type, self.experiment.institute.lower(),
Javier Vegas-Regidor
committed
self.experiment.model.lower())):
self.config.data_dir = data_folder
break
if not self.config.data_dir:
raise Exception('Can not find model data')
if self.config.data_type in ('obs', 'recon') and self.experiment.chunk_size != 1:
raise Exception('For obs and recon data chunk_size must be always 1')
def get_leadtimes(self, domain, variable, startdate, member, leadtimes, frequency=None, vartype=VariableType.MEAN):
aggregation_path = self.get_var_url(variable, startdate, frequency, None, vartype)
startdate = parse_date(startdate)
Javier Vegas-Regidor
committed
start_chunk = chunk_start_date(startdate, self.experiment.num_chunks, self.experiment.chunk_size,
'month', self.experiment.calendar)
end_chunk = chunk_end_date(start_chunk, self.experiment.chunk_size, 'month', self.experiment.calendar)
Javier Vegas-Regidor
committed
thredds_subset = THREDDSSubset(aggregation_path, variable, startdate, end_chunk).get_url()
selected_months = ','.join([str(add_months(startdate, i, self.experiment.calendar).month) for i in leadtimes])
Javier Vegas-Regidor
committed
if self.config.data_type == 'exp':
select_months = '-selmonth,{0} {1}'.format(selected_months, thredds_subset)
selected_years = ','.join([str(add_months(startdate, i, self.experiment.calendar).year) for i in leadtimes])
Javier Vegas-Regidor
committed
Utils.cdo.selyear(selected_years, input=select_months, output=temp)
else:
Utils.cdo.selmonth(selected_months, input=thredds_subset, output=temp)
return temp
def file_exists(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
vartype=VariableType.MEAN):
"""
Copies a given file from the CMOR repository to the scratch folder and returns the path to the scratch's copy
:param domain: CMOR domain
:type domain: str
:param var: variable name
:type var: str
:param startdate: file's startdate
:type startdate: str
:param member: file's member
:type member: int
:param chunk: file's chunk
:type chunk: int
:param grid: file's grid (only needed if it is not the original)
:type grid: str
:param box: file's box (only needed to retrieve sections or averages)
:type box: Box
:param frequency: file's frequency (only needed if it is different from the default)
:type frequency: Frequency
:param vartype: Variable type (mean, statistic)
:type vartype: VariableType
:return: path to the copy created on the scratch folder
:rtype: str
"""
aggregation_path = self.get_var_url(var, startdate, frequency, box, vartype)
start_chunk = chunk_start_date(parse_date(startdate), chunk, self.experiment.chunk_size, 'month',
self.experiment.calendar)
end_chunk = chunk_end_date(start_chunk, self.experiment.chunk_size, 'month', self.experiment.calendar)
thredds_subset = THREDDSSubset(aggregation_path, var, start_chunk, end_chunk)
return thredds_subset.check()
Javier Vegas-Regidor
committed
def get_file(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
vartype=VariableType.MEAN):
"""
Copies a given file from the CMOR repository to the scratch folder and returns the path to the scratch's copy
:param domain: CMOR domain
:type domain: str
:param var: variable name
:type var: str
:param startdate: file's startdate
:type startdate: str
:param member: file's member
:type member: int
:param chunk: file's chunk
:type chunk: int
:param grid: file's grid (only needed if it is not the original)
:type grid: str
:param box: file's box (only needed to retrieve sections or averages)
:type box: Box
:param frequency: file's frequency (only needed if it is different from the default)
:type frequency: Frequency
:param vartype: Variable type (mean, statistic)
:type vartype: VariableType
:return: path to the copy created on the scratch folder
:rtype: str
"""
aggregation_path = self.get_var_url(var, startdate, frequency, box, vartype)
start_chunk = chunk_start_date(parse_date(startdate), chunk, self.experiment.chunk_size, 'month',
self.experiment.calendar)
end_chunk = chunk_end_date(start_chunk, self.experiment.chunk_size, 'month', self.experiment.calendar)
thredds_subset = THREDDSSubset(aggregation_path, var, start_chunk, end_chunk)
return thredds_subset.download()
def send_file(self, filetosend, domain, var, startdate, member, chunk=None, grid=None, region=None, box=None,
rename_var=None, frequency=None, year=None, date_str=None, move_old=False,
diagnostic=None, cmorized=False, vartype=VariableType.MEAN):
"""
Copies a given file to the CMOR repository. It also automatically converts to netCDF 4 if needed and can merge
with already existing ones as needed
:param move_old: if true, moves files following older conventions that may be found on the links folder
:type move_old: bool
:param date_str: exact date_str to use in the cmorized file
:type: str
:param year: if frequency is yearly, this parameter is used to give the corresponding year
:type year: int
:param rename_var: if exists, the given variable will be renamed to the one given by var
:type rename_var: str
:param filetosend: path to the file to send to the CMOR repository
:type filetosend: str
:param region: specifies the region represented by the file. If it is defined, the data will be appended to the
CMOR repository as a new region in the file or will overwrite if region was already present
:type region: str
:param domain: CMOR domain
Javier Vegas-Regidor
committed
:type domain: Domain
:param var: variable name
:type var: str
:param startdate: file's startdate
:type startdate: str
:param member: file's member
:type member: int
:param chunk: file's chunk
:type chunk: int
:param grid: file's grid (only needed if it is not the original)
:type grid: str
:param box: file's box (only needed to retrieve sections or averages)
:type box: Box
:param frequency: file's frequency (only needed if it is different from the default)
:type frequency: str
Javier Vegas-Regidor
committed
:param diagnostic: diagnostic used to generate the file
:type diagnostic: Diagnostic
:param cmorized: flag to indicate if file was generated in cmorization process
:type cmorized: bool
:param vartype: Variable type (mean, statistic)
:type vartype: VariableType
Javier Vegas-Regidor
committed
if cmorized:
raise ValueError('cmorized is not supported in THREDDS manager')
original_var = var
cmor_var = VariableManager().get_variable(var)
var = self._get_final_var_name(box, var)
if rename_var and rename_var != var:
Utils.rename_variable(filetosend, rename_var, var)
elif original_var != var:
Utils.rename_variable(filetosend, original_var, var)
if not frequency:
frequency = self.config.frequency
Javier Vegas-Regidor
committed
filepath = self.get_file_path(startdate, domain, var, frequency, vartype, box, grid)
netcdf_file = NetCDFFile(filepath, filetosend, domain, var, cmor_var, self.config.data_convention, region)
if diagnostic:
netcdf_file.add_diagnostic_history(diagnostic)
else:
raise ValueError('You must provide a diagnostic to store data using the THREDDSmanager')
netcdf_file.send()
Javier Vegas-Regidor
committed
def get_file_path(self, startdate, domain, var, frequency, vartype,
Javier Vegas-Regidor
committed
box=None, grid=None):
"""
Returns the path to a concrete file
:param startdate: file's startdate
:type startdate: str
:param domain: file's domain
:type domain: str
:param var: file's var
:type var: str
:param frequency: file's frequency
:type frequency: Frequency
:param box: file's box
:type box: Box
:param grid: file's grid
:type grid: str
:return: path to the file
:rtype: str
:param vartype: Variable type (mean, statistic)
:type vartype: VariableType
"""
if not frequency:
frequency = self.config.frequency
var = self._get_final_var_name(box, var)
Javier Vegas-Regidor
committed
folder_path = self._get_folder_path(frequency, domain, var, grid, vartype)
file_name = self._get_file_name(var, startdate)
filepath = os.path.join(folder_path, file_name)
return filepath
Javier Vegas-Regidor
committed
def _get_folder_path(self, frequency, domain, variable, grid, vartype):
if self.config.data_type == 'exp':
var_folder = self.get_varfolder(domain, variable, grid)
else:
var_folder = variable
folder_path = os.path.join(self.config.data_dir, self.config.data_type,
self.experiment.institute.lower(),
self.experiment.model.lower(),
frequency.folder_name(vartype),
return folder_path
def get_year(self, domain, var, startdate, member, year, grid=None, box=None, vartype=VariableType.MEAN):
"""
Ge a file containing all the data for one year for one variable
:param domain: variable's domain
:type domain: str
:param var: variable's name
:type var: str
:param startdate: startdate to retrieve
:type startdate: str
:param member: member to retrieve
:type member: int
:param year: year to retrieve
:type year: int
:param grid: variable's grid
:type grid: str
:param box: variable's box
:type box: Box
:param vartype: Variable type (mean, statistic)
:type vartype: VariableType
aggregation_path = self.get_var_url(var, startdate, None, box, vartype)
thredds_subset = THREDDSSubset(aggregation_path, var, datetime(year, 1, 1), datetime(year+1, 1, 1))
return thredds_subset.download()
def get_var_url(self, var, startdate, frequency, box, vartype):
"""
Get url for dataset
:param var: variable to retrieve
:type var: str
:param startdate: startdate to retrieve
:type startdate: str
:param frequency: frequency to get:
:type frequency: Frequency | None
:param box: box to get
:type box: Box
:param vartype: type of variable
:type vartype: VariableType
:return:
"""
if not frequency:
frequency = self.config.frequency
var = self._get_final_var_name(box, var)
full_path = os.path.join(self.server_url, 'dodsC', self.config.data_type, self.experiment.institute,
self.experiment.model, frequency.folder_name(vartype))
if self.config.data_type == 'exp':
full_path = os.path.join(full_path, var, self._get_file_name(var, startdate))
full_path = os.path.join(full_path, self._get_file_name(var, None))
return full_path
def _get_file_name(self, var, startdate):
Javier Vegas-Regidor
committed
if self.config.data_type != 'exp':
startdate = startdate[0:6]
return '{0}_{1}.nc'.format(var, startdate)
def link_file(self, domain, var, cmor_var, startdate, member, chunk=None, grid=None,
frequency=None, year=None, date_str=None, move_old=False, vartype=VariableType.MEAN):
"""
Creates the link of a given file from the CMOR repository.
:param move_old:
:param date_str:
:param year: if frequency is yearly, this parameter is used to give the corresponding year
:type year: int
:param domain: CMOR domain
:type domain: str
:param var: variable name
:type var: str
:param startdate: file's startdate
:type startdate: str
:param member: file's member
:type member: int
:param chunk: file's chunk
:type chunk: int
:param grid: file's grid (only needed if it is not the original)
:type grid: str
:param frequency: file's frequency (only needed if it is different from the default)
:type frequency: str
:param vartype: Variable type (mean, statistic)
:type vartype: VariableType
:return: path to the copy created on the scratch folder
:rtype: str
"""
# THREDDSManager does not require links
pass
class THREDDSError(Exception):
pass
Javier Vegas-Regidor
committed
class THREDDSSubset:
def __init__(self, thredds_path, var, start_time, end_time):
self.thredds_path = thredds_path
self.var = var
self.dimension_indexes = {}
self.handler = None
self.start_time = start_time
self.end_time = end_time
def get_url(self):
self.handler = Utils.openCdf(self.thredds_path)
self._read_metadata()
self.handler.close()
self._get_time_indexes()
return self._get_subset_url()
def download(self):
url = self.get_url()
return self._download_url(url)
try:
self.handler = Utils.openCdf(self.get_url())
self.handler.close()
return True
except Exception:
return False
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
def _read_metadata(self):
self.var_dimensions = self.handler.variables[self.var].dimensions
for dimension in self.var_dimensions:
if dimension == 'time':
continue
self.dimension_indexes[dimension] = (0, self.handler.dimensions[dimension].size - 1)
if 'time' in self.var_dimensions:
self.times = Utils.get_datetime_from_netcdf(self.handler)
def _get_time_indexes(self):
if 'time' not in self.var_dimensions:
return
time_start = 0
while time_start < self.times.size and self.times[time_start] < self.start_time:
time_start += 1
if time_start == self.times.size:
raise Exception('Timesteps not available for interval {0}-{1}'.format(self.start_time, self.end_time))
time_end = time_start
if self.times[time_end] >= self.end_time:
raise Exception('Timesteps not available for interval {0}-{1}'.format(self.start_time, self.end_time))
while time_end < self.times.size - 1 and self.times[time_end + 1] < self.end_time:
time_end += 1
self.dimension_indexes['time'] = (time_start, time_end)
@staticmethod
def _download_url(url):
Utils.execute_shell_command(['nccopy', '-s', '-d', '-4', url, temp])
if not Utils.check_netcdf_file(temp):
raise THREDDSError('Can not retrieve {0} from server'.format(url))
return temp
def _get_subset_url(self):
var_slice = self.var
dimensions_slice = ''
for dimension in self.var_dimensions:
slice_index = self._get_slice_index(self.dimension_indexes[dimension])
var_slice += slice_index
Javier Vegas-Regidor
committed
if dimension == 'ensemble':
dimension = 'realization'
dimensions_slice += '{0}{1},'.format(dimension, slice_index)
return '{0}?{1}{2}'.format(self.thredds_path, dimensions_slice, var_slice)
@staticmethod
def _get_slice_index(index_tuple):
return '[{0[0]}:1:{0[1]}]'.format(index_tuple)