# coding=utf-8
import os
from autosubmit.config.log import Log
from autosubmit.date.chunk_date_lib import parse_date, add_months, chunk_start_date, chunk_end_date, date2str
from earthdiagnostics.datamanager import DataManager, NetCDFFile
from earthdiagnostics.utils import TempFile, Utils
from earthdiagnostics.variable import Variable, VarType
class THREDDSManager(DataManager):
    """
    Data manager that retrieves data from a THREDDS server

    Data is read through the server's 'dodsC' endpoint (see get_var_url), while the files sent by the diagnostics
    are stored under the local data folder selected in the constructor.

    :param config: diagnostics configuration. Must provide thredds.server_url and a data_dir made of one or more
                   ':'-separated folders
    """

    def __init__(self, config):
        super(THREDDSManager, self).__init__(config)
        self.server_url = config.thredds.server_url

        # data_dir may list several candidate folders: keep the first one that contains data for this
        # data type, institute and model
        data_folders = self.config.data_dir.split(':')
        self.config.data_dir = None
        for data_folder in data_folders:
            model_path = os.path.join(data_folder, self.config.data_type, self.experiment.institute.lower(),
                                      self.experiment.model.lower())
            if os.path.isdir(model_path):
                self.config.data_dir = data_folder
                break

        if not self.config.data_dir:
            raise Exception('Can not find model data')

        if self.config.data_type in ('obs', 'recon') and self.experiment.chunk_size != 1:
            raise Exception('For obs and recon data chunk_size must be always 1')

    def get_leadtimes(self, domain, variable, startdate, member, leadtimes, frequency=None, vartype=VarType.MEAN):
        """
        Retrieves from the server the data of a variable for the requested leadtimes

        :param domain: CMOR domain (not used by this manager)
        :type domain: str
        :param variable: variable name
        :type variable: str
        :param startdate: startdate
        :type startdate: str
        :param member: member (not used by this manager)
        :type member: int
        :param leadtimes: leadtimes to retrieve, given as month offsets from the startdate
        :type leadtimes: iterable of int
        :param frequency: data frequency. If not given, the configured default is used
        :type frequency: str
        :param vartype: type of the variable
        :type vartype: VarType
        :return: temporary file that receives the selected data
        """
        if not frequency:
            frequency = self.config.frequency
        aggregation_path = self.get_var_url(variable, startdate, frequency, None, vartype)
        temp = TempFile.get()
        startdate = parse_date(startdate)

        # NOTE(review): months and years are selected independently, so leadtimes spanning more than one year
        # may also select the same months from the other years - confirm this is intended
        selected_months = ','.join([str(add_months(startdate, i, 'standard').month) for i in leadtimes])
        select_months = '-selmonth,{0} {1}'.format(selected_months, aggregation_path)
        selected_years = ','.join([str(add_months(startdate, i, 'standard').year) for i in leadtimes])
        Utils.cdo.selyear(selected_years, input=select_months, output=temp)
        return temp

    def get_file(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
                 vartype=VarType.MEAN):
        """
        Copies the timesteps of a given chunk from the server to a temporary file and returns the path to that copy

        :param domain: CMOR domain
        :type domain: str
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: str
        :param vartype: type of the variable
        :type vartype: VarType
        :return: path to the retrieved copy
        :rtype: str
        :raises Exception: if the server has no timesteps inside the requested chunk
        :raises THREDDSError: if the retrieved file is not a valid netCDF file
        """
        if not frequency:
            frequency = self.config.frequency
        aggregation_path = self.get_var_url(var, startdate, frequency, box, vartype)

        start_chunk = chunk_start_date(parse_date(startdate), chunk, self.experiment.chunk_size, 'month', 'standard')
        end_chunk = chunk_end_date(start_chunk, self.experiment.chunk_size, 'month', 'standard')

        handler = Utils.openCdf(aggregation_path)
        try:
            times = Utils.get_datetime_from_netcdf(handler)
            # Last valid index of each horizontal dimension, as the subset request uses inclusive ranges
            lat_size = handler.dimensions['latitude'].size - 1
            lon_size = handler.dimensions['longitude'].size - 1
        finally:
            # Release the remote handler even if the metadata can not be read
            handler.close()

        # First timestep that is not before the chunk start
        time_start = 0
        while time_start < times.size and times[time_start] < start_chunk:
            time_start += 1
        if time_start == times.size:
            raise Exception('Timesteps not available for chunk {0}'.format(chunk))

        # Last timestep that is still before the chunk end
        time_end = time_start
        if times[time_end] >= end_chunk:
            raise Exception('Timesteps not available for chunk {0}'.format(chunk))
        while time_end < times.size - 1 and times[time_end + 1] < end_chunk:
            time_end += 1

        slice_path = '{0}?time[{1}:1:{2}],latitude[0:1:{3}],longitude[0:1:{4}],' \
                     '{5}[{1}:1:{2}][0:1:{3}][0:1:{4}]'.format(aggregation_path, time_start, time_end,
                                                              lat_size, lon_size, var)
        Log.debug(slice_path)

        temp = TempFile.get()
        Utils.execute_shell_command(['nccopy', slice_path, temp])
        if not Utils.check_netcdf_file(temp):
            raise THREDDSError('Can not retrieve {0} from server'.format(aggregation_path))
        return temp

    def send_file(self, filetosend, domain, var, startdate, member, chunk=None, grid=None, region=None, box=None,
                  rename_var=None, frequency=None, year=None, date_str=None, move_old=False,
                  diagnostic=None, cmorized=False, vartype=VarType.MEAN):
        """
        Copies a given file to the CMOR repository. It also automatically converts to netCDF 4 if needed and can merge
        with already existing ones as needed

        :param move_old: if true, moves files following older conventions that may be found on the links folder
        :type move_old: bool
        :param date_str: exact date_str to use in the cmorized file
        :type: str
        :param year: if frequency is yearly, this parameter is used to give the corresponding year
        :type year: int
        :param rename_var: if exists, the given variable will be renamed to the one given by var
        :type rename_var: str
        :param filetosend: path to the file to send to the CMOR repository
        :type filetosend: str
        :param region: specifies the region represented by the file. If it is defined, the data will be appended to the
                       CMOR repository as a new region in the file or will overwrite if region was already present
        :type region: str
        :param domain: CMOR domain
        :type domain: Domain
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: str
        :param diagnostic: diagnostic used to generate the file
        :type diagnostic: Diagnostic
        :param cmorized: flag to indicate if file was generated in cmorization process
        :type cmorized: bool
        :param vartype: type of the variable
        :type vartype: VarType
        :raises ValueError: if cmorized is set or no diagnostic is provided
        """
        if cmorized:
            raise ValueError('cmorized is not supported in THREDDS manager')
        # Reject invalid requests before the file to send is modified in any way
        if not diagnostic:
            raise ValueError('You must provide a diagnostic to store data using the THREDDSmanager')

        original_var = var
        cmor_var = Variable.get_variable(var)
        var = self._get_final_var_name(box, var)

        if rename_var and rename_var != var:
            Utils.rename_variable(filetosend, rename_var, var)
        elif original_var != var:
            Utils.rename_variable(filetosend, original_var, var)

        if not frequency:
            frequency = self.config.frequency

        start_chunk = chunk_start_date(parse_date(startdate), chunk, self.experiment.chunk_size, 'month',
                                       'standard')
        # Only the first six characters of the formatted chunk start date are used in the file name
        filepath = self.get_file_path(date2str(start_chunk)[0:6], domain, var, frequency, vartype, box, grid)

        netcdf_file = NetCDFFile(filepath, filetosend, domain, var, cmor_var)
        netcdf_file.add_diagnostic_history(diagnostic)
        netcdf_file.send()

    def get_file_path(self, startdate, domain, var, frequency, vartype,
                      box=None, grid=None):
        """
        Returns the path to a concrete file

        :param startdate: file's startdate
        :type startdate: str
        :param domain: file's domain
        :type domain: str
        :param var: file's var
        :type var: str
        :param frequency: file's frequency
        :type frequency: str
        :param vartype: type of the variable
        :type vartype: VarType
        :param box: file's box
        :type box: Box
        :param grid: file's grid
        :type grid: str
        :return: path to the file
        :rtype: str
        """
        if not frequency:
            frequency = self.config.frequency
        var = self._get_final_var_name(box, var)

        folder_path = self._get_folder_path(frequency, domain, var, grid, vartype)
        file_name = self._get_file_name(startdate, var)

        filepath = os.path.join(folder_path, file_name)
        return filepath

    def _get_folder_path(self, frequency, domain, variable, grid, vartype):
        """
        Returns the local folder that holds the files of a variable

        :param frequency: data frequency
        :type frequency: str
        :param domain: variable's domain
        :type domain: str
        :param variable: variable name
        :type variable: str
        :param grid: variable's grid
        :type grid: str
        :param vartype: type of the variable
        :type vartype: VarType
        :return: path to the folder
        :rtype: str
        """
        if self.config.data_type == 'exp':
            var_folder = self.get_varfolder(domain, variable, grid)
        else:
            var_folder = variable

        folder_path = os.path.join(self.config.data_dir, self.config.data_type,
                                   self.experiment.institute.lower(),
                                   self.experiment.model.lower(),
                                   self.frequency_folder_name(frequency, vartype),
                                   var_folder)
        return folder_path

    def get_year(self, domain, var, startdate, member, year, grid=None, box=None):
        """
        Get a file containing all the data for one year for one variable

        :param domain: variable's domain
        :type domain: str
        :param var: variable's name
        :type var: str
        :param startdate: startdate to retrieve
        :type startdate: str
        :param member: member to retrieve
        :type member: int
        :param year: year to retrieve
        :type year: int
        :param grid: variable's grid
        :type grid: str
        :param box: variable's box
        :type box: Box
        :return:
        """
        # NOTE(review): no implementation is visible for this method, so it currently returns None.
        # Confirm whether yearly retrieval must be supported by this manager

    def get_var_url(self, var, startdate, frequency, box, vartype):
        """
        Returns the URL of a variable on the server's 'dodsC' endpoint

        For 'exp' data the URL points to the file of the given startdate inside the variable's folder. For any other
        data type it points to a single file per variable.

        :param var: variable name
        :type var: str
        :param startdate: startdate (only used for 'exp' data)
        :type startdate: str
        :param frequency: data frequency
        :type frequency: str
        :param box: variable's box
        :type box: Box
        :param vartype: type of the variable
        :type vartype: VarType
        :return: URL of the variable
        :rtype: str
        """
        var = self._get_final_var_name(box, var)
        full_path = os.path.join(self.server_url, 'dodsC', self.config.data_type, self.experiment.institute,
                                 self.experiment.model, self.frequency_folder_name(frequency, vartype))
        if self.config.data_type == 'exp':
            full_path = os.path.join(full_path, var, self._get_file_name(startdate, var))
        else:
            full_path = os.path.join(full_path, self._get_file_name(None, var))
        return full_path

    def _get_file_name(self, startdate, var):
        """
        Returns the name of the file of a variable

        :param startdate: startdate to include in the name. If empty or None, the name only contains the variable
        :type startdate: str | None
        :param var: variable name
        :type var: str
        :return: file name
        :rtype: str
        """
        if startdate:
            return '{0}_{1}.nc'.format(var, startdate)
        else:
            return '{0}.nc'.format(var)

    def link_file(self, domain, var, startdate, member, chunk=None, grid=None, box=None,
                  frequency=None, year=None, date_str=None, move_old=False, vartype=VarType.MEAN):
        """
        Creates the link of a given file from the CMOR repository.

        This manager does not use links, so the method does nothing and returns None.

        :param move_old:
        :param date_str:
        :param year: if frequency is yearly, this parameter is used to give the corresponding year
        :type year: int
        :param domain: CMOR domain
        :type domain: str
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: str
        :param vartype: type of the variable
        :type vartype: VarType
        """
        # THREDDSManager does not require links
        pass
class THREDDSError(Exception):
    """
    Exception raised when data can not be retrieved from the THREDDS server
    """
    pass