Newer
Older
# coding=utf-8
import os
from autosubmit.date.chunk_date_lib import parse_date, add_months, chunk_start_date, chunk_end_date, date2str
from earthdiagnostics.datamanager import DataManager, NetCDFFile
from earthdiagnostics.utils import TempFile, Utils
Javier Vegas-Regidor
committed
from earthdiagnostics.variable import Variable, VarType
class THREDDSManager(DataManager):
"""
Data manager class for CMORized experiments
"""
def __init__(self, config):
super(THREDDSManager, self).__init__(config)
self.server_url = config.thredds.server_url
data_folders = self.config.data_dir.split(':')
self.config.data_dir = None
for data_folder in data_folders:
if os.path.isdir(os.path.join(data_folder, self.config.data_type, self.experiment.institute.lower(),
Javier Vegas-Regidor
committed
self.experiment.model.lower())):
self.config.data_dir = data_folder
break
if not self.config.data_dir:
raise Exception('Can not find model data')
if self.config.data_type in ('obs', 'recon') and self.experiment.chunk_size !=1 :
raise Exception('For obs and recon data chunk_size must be always 1')
Javier Vegas-Regidor
committed
def get_leadtimes(self, domain, variable, startdate, member, leadtimes, frequency=None, vartype=VarType.MEAN):
aggregation_path = self.get_var_url(variable, startdate, frequency, None, vartype)
startdate = parse_date(startdate)
Javier Vegas-Regidor
committed
start_chunk = chunk_start_date(startdate, self.experiment.num_chunks, self.experiment.chunk_size,
'month', 'standard')
end_chunk = chunk_end_date(start_chunk, self.experiment.chunk_size, 'month', 'standard')
Javier Vegas-Regidor
committed
thredds_subset = THREDDSSubset(aggregation_path, variable, startdate, end_chunk).get_url()
selected_months = ','.join([str(add_months(startdate, i, 'standard').month) for i in leadtimes])
Javier Vegas-Regidor
committed
if self.config.data_type == 'exp':
select_months = '-selmonth,{0} {1}'.format(selected_months, thredds_subset)
selected_years = ','.join([str(add_months(startdate, i, 'standard').year) for i in leadtimes])
Utils.cdo.selyear(selected_years, input=select_months, output=temp)
else:
Utils.cdo.selmonth(selected_months, input=thredds_subset, output=temp)
return temp
Javier Vegas-Regidor
committed
def get_file(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
vartype=VarType.MEAN):
"""
Copies a given file from the CMOR repository to the scratch folder and returns the path to the scratch's copy
:param domain: CMOR domain
:type domain: str
:param var: variable name
:type var: str
:param startdate: file's startdate
:type startdate: str
:param member: file's member
:type member: int
:param chunk: file's chunk
:type chunk: int
:param grid: file's grid (only needed if it is not the original)
:type grid: str
:param box: file's box (only needed to retrieve sections or averages)
:type box: Box
:param frequency: file's frequency (only needed if it is different from the default)
:type frequency: str
:return: path to the copy created on the scratch folder
:rtype: str
"""
aggregation_path = self.get_var_url(var, startdate, frequency, box, vartype)
start_chunk = chunk_start_date(parse_date(startdate), chunk, self.experiment.chunk_size, 'month', 'standard')
end_chunk = chunk_end_date(start_chunk, self.experiment.chunk_size, 'month', 'standard')
thredds_subset = THREDDSSubset(aggregation_path, var, start_chunk, end_chunk)
return thredds_subset.download()
def send_file(self, filetosend, domain, var, startdate, member, chunk=None, grid=None, region=None, box=None,
rename_var=None, frequency=None, year=None, date_str=None, move_old=False,
Javier Vegas-Regidor
committed
diagnostic=None, cmorized=False, vartype=VarType.MEAN):
"""
Copies a given file to the CMOR repository. It also automatically converts to netCDF 4 if needed and can merge
with already existing ones as needed
:param move_old: if true, moves files following older conventions that may be found on the links folder
:type move_old: bool
:param date_str: exact date_str to use in the cmorized file
:type: str
:param year: if frequency is yearly, this parameter is used to give the corresponding year
:type year: int
:param rename_var: if exists, the given variable will be renamed to the one given by var
:type rename_var: str
:param filetosend: path to the file to send to the CMOR repository
:type filetosend: str
:param region: specifies the region represented by the file. If it is defined, the data will be appended to the
CMOR repository as a new region in the file or will overwrite if region was already present
:type region: str
:param domain: CMOR domain
Javier Vegas-Regidor
committed
:type domain: Domain
:param var: variable name
:type var: str
:param startdate: file's startdate
:type startdate: str
:param member: file's member
:type member: int
:param chunk: file's chunk
:type chunk: int
:param grid: file's grid (only needed if it is not the original)
:type grid: str
:param box: file's box (only needed to retrieve sections or averages)
:type box: Box
:param frequency: file's frequency (only needed if it is different from the default)
:type frequency: str
Javier Vegas-Regidor
committed
:param diagnostic: diagnostic used to generate the file
:type diagnostic: Diagnostic
:param cmorized: flag to indicate if file was generated in cmorization process
:type cmorized: bool
Javier Vegas-Regidor
committed
if cmorized:
raise ValueError('cmorized is not supported in THREDDS manager')
original_var = var
cmor_var = Variable.get_variable(var)
var = self._get_final_var_name(box, var)
if rename_var and rename_var != var:
Utils.rename_variable(filetosend, rename_var, var)
elif original_var != var:
Utils.rename_variable(filetosend, original_var, var)
if not frequency:
frequency = self.config.frequency
Javier Vegas-Regidor
committed
filepath = self.get_file_path(startdate, domain, var, frequency, vartype, box, grid)
netcdf_file = NetCDFFile(filepath, filetosend, domain, var, cmor_var)
if diagnostic:
netcdf_file.add_diagnostic_history(diagnostic)
else:
raise ValueError('You must provide a diagnostic to store data using the THREDDSmanager')
netcdf_file.send()
Javier Vegas-Regidor
committed
def get_file_path(self, startdate, domain, var, frequency, vartype,
Javier Vegas-Regidor
committed
box=None, grid=None):
"""
Returns the path to a concrete file
:param startdate: file's startdate
:type startdate: str
:param domain: file's domain
:type domain: str
:param var: file's var
:type var: str
:param frequency: file's frequency
:type frequency: str
:param box: file's box
:type box: Box
:param grid: file's grid
:type grid: str
:return: path to the file
:rtype: str
"""
if not frequency:
frequency = self.config.frequency
var = self._get_final_var_name(box, var)
Javier Vegas-Regidor
committed
folder_path = self._get_folder_path(frequency, domain, var, grid, vartype)
file_name = self._get_file_name(startdate, var)
filepath = os.path.join(folder_path, file_name)
return filepath
Javier Vegas-Regidor
committed
def _get_folder_path(self, frequency, domain, variable, grid, vartype):
if self.config.data_type == 'exp':
var_folder = self.get_varfolder(domain, variable, grid)
else:
var_folder = variable
folder_path = os.path.join(self.config.data_dir, self.config.data_type,
self.experiment.institute.lower(),
self.experiment.model.lower(),
Javier Vegas-Regidor
committed
self.frequency_folder_name(frequency, vartype),
return folder_path
def get_year(self, domain, var, startdate, member, year, grid=None, box=None, vartype=VarType.MEAN):
"""
Ge a file containing all the data for one year for one variable
:param domain: variable's domain
:type domain: str
:param var: variable's name
:type var: str
:param startdate: startdate to retrieve
:type startdate: str
:param member: member to retrieve
:type member: int
:param year: year to retrieve
:type year: int
:param grid: variable's grid
:type grid: str
:param box: variable's box
:type box: Box
:return:
"""
aggregation_path = self.get_var_url(var, startdate, None, box, vartype)
thredds_subset = THREDDSSubset(aggregation_path, var, datetime(year, 1, 1), datetime(year+1, 1, 1))
return thredds_subset.download()
def get_var_url(self, var, startdate, frequency, box, vartype):
if not frequency:
frequency = self.config.frequency
var = self._get_final_var_name(box, var)
full_path = os.path.join(self.server_url, 'dodsC', self.config.data_type, self.experiment.institute,
self.experiment.model, self.frequency_folder_name(frequency, vartype))
if self.config.data_type == 'exp':
full_path = os.path.join(full_path, var, self._get_file_name(startdate, var))
full_path = os.path.join(full_path, self._get_file_name(None, var))
return full_path
def _get_file_name(self, startdate, var):
Javier Vegas-Regidor
committed
return '{0}_{1}.nc'.format(var, startdate[0:6])
def link_file(self, domain, var, startdate, member, chunk=None, grid=None, box=None,
frequency=None, year=None, date_str=None, move_old=False, vartype=VarType.MEAN):
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
"""
Creates the link of a given file from the CMOR repository.
:param move_old:
:param date_str:
:param year: if frequency is yearly, this parameter is used to give the corresponding year
:type year: int
:param domain: CMOR domain
:type domain: str
:param var: variable name
:type var: str
:param startdate: file's startdate
:type startdate: str
:param member: file's member
:type member: int
:param chunk: file's chunk
:type chunk: int
:param grid: file's grid (only needed if it is not the original)
:type grid: str
:param box: file's box (only needed to retrieve sections or averages)
:type box: Box
:param frequency: file's frequency (only needed if it is different from the default)
:type frequency: str
:return: path to the copy created on the scratch folder
:rtype: str
"""
# THREDDSManager does not require links
pass
class THREDDSError(Exception):
pass
Javier Vegas-Regidor
committed
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
class THREDDSSubset:
def __init__(self, thredds_path, var, start_time, end_time):
self.thredds_path = thredds_path
self.var = var
self.dimension_indexes = {}
self.handler = None
self.start_time = start_time
self.end_time = end_time
def get_url(self):
self.handler = Utils.openCdf(self.thredds_path)
self._read_metadata()
self.handler.close()
self._get_time_indexes()
return self._get_subset_url()
def download(self):
url = self.get_url()
return self._download_url(url)
def _read_metadata(self):
self.var_dimensions = self.handler.variables[self.var].dimensions
for dimension in self.var_dimensions:
if dimension == 'time':
continue
self.dimension_indexes[dimension] = (0, self.handler.dimensions[dimension].size - 1)
if 'time' in self.var_dimensions:
self.times = Utils.get_datetime_from_netcdf(self.handler)
def _get_time_indexes(self):
if 'time' not in self.var_dimensions:
return
time_start = 0
while time_start < self.times.size and self.times[time_start] < self.start_time:
time_start += 1
if time_start == self.times.size:
raise Exception('Timesteps not available for interval {0}-{1}'.format(self.start_time, self.end_time))
time_end = time_start
if self.times[time_end] >= self.end_time:
raise Exception('Timesteps not available for interval {0}-{1}'.format(self.start_time, self.end_time))
while time_end < self.times.size - 1 and self.times[time_end + 1] < self.end_time:
time_end += 1
self.dimension_indexes['time'] = (time_start, time_end)
def _download_url(self, url):
temp = TempFile.get()
Utils.execute_shell_command(['nccopy', url, temp])
if not Utils.check_netcdf_file(temp):
raise THREDDSError('Can not retrieve {0} from server'.format(url))
return temp
def _get_subset_url(self):
var_slice = self.var
dimensions_slice = ''
for dimension in self.var_dimensions:
slice_index = self._get_slice_index(self.dimension_indexes[dimension])
var_slice += slice_index
dimensions_slice += '{0}{1},'.format(dimension, slice_index)
return '{0}?{1}{2}'.format(self.thredds_path, dimensions_slice, var_slice)
def _get_slice_index(self, index_tuple):
return '[{0[0]}:1:{0[1]}]'.format(index_tuple)