Newer
Older
# coding=utf-8
import os
from datetime import datetime
from time import strptime

import iris
import netCDF4
import numpy as np
from iris.coords import DimCoord
from bscearth.utils.date import parse_date, add_months, chunk_start_date, chunk_end_date
from bscearth.utils.log import Log

from datafile import DataFile, StorageStatus, LocalStatus
from earthdiagnostics.datamanager import DataManager
from earthdiagnostics.utils import TempFile, Utils
from earthdiagnostics.variable_type import VariableType
class THREDDSManager(DataManager):
    """
    Data manager class for CMORized experiments accessed through a THREDDS server

    Remote datasets are addressed through the server's OPeNDAP endpoint (``<server_url>/dodsC/...``). Each requested
    chunk is represented by a THREDDSSubset: a time slice of one of those remote datasets.
    """

    def __init__(self, config):
        """
        :param config: configuration object; reads ``thredds.server_url``, ``data_dir`` and ``data_type``
        :raises Exception: if none of the candidate data folders holds data for the experiment's institute and model,
                           or if obs/recon data is configured with a chunk size other than 1
        """
        super(THREDDSManager, self).__init__(config)
        self.server_url = config.thredds.server_url
        # data_dir may list several candidate folders separated by ':'; keep the first one that contains
        # <data_type>/<institute>/<model>
        data_folders = self.config.data_dir.split(':')
        self.config.data_dir = None
        for data_folder in data_folders:
            if os.path.isdir(os.path.join(data_folder, self.config.data_type, self.experiment.institute.lower(),
                                          self.experiment.model.lower())):
                self.config.data_dir = data_folder
                break

        if not self.config.data_dir:
            raise Exception('Can not find model data')

        if self.config.data_type in ('obs', 'recon') and self.experiment.chunk_size != 1:
            raise Exception('For obs and recon data chunk_size must be always 1')

    def get_leadtimes(self, domain, variable, startdate, member, leadtimes, frequency=None, vartype=VariableType.MEAN):
        """
        Get a temporary file containing only the requested lead times of a variable

        :param domain: variable's domain (not used by this manager)
        :param variable: variable's name
        :type variable: str
        :param startdate: startdate to retrieve
        :type startdate: str
        :param member: member to retrieve (not used by this manager)
        :type member: int
        :param leadtimes: lead times to select, given as month offsets from the startdate
        :type leadtimes: list[int]
        :param frequency: frequency to retrieve (defaults to the configured one)
        :type frequency: Frequency | None
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        :return: path to the temporary file holding the selected data
        :rtype: str
        """
        aggregation_path = self.get_var_url(variable, startdate, frequency, None, vartype)
        startdate = parse_date(startdate)
        # presumably the start of the last chunk, so the subset spans the whole experiment - TODO confirm
        start_chunk = chunk_start_date(startdate, self.experiment.num_chunks, self.experiment.chunk_size,
                                       'month', self.experiment.calendar)
        end_chunk = chunk_end_date(start_chunk, self.experiment.chunk_size, 'month', self.experiment.calendar)

        thredds_subset = THREDDSSubset(aggregation_path, "", variable, startdate, end_chunk)
        selected_months = ','.join([str(add_months(startdate, i, self.experiment.calendar).month) for i in leadtimes])
        # cdo writes its result into this temporary file, which is what the method returns
        temp = TempFile.get()
        # NOTE(review): the subset object itself is handed to cdo (formatted with str() in the exp branch), and
        # THREDDSSubset.__str__ returns a descriptive label, not a path or url - confirm what cdo should receive
        if self.config.data_type == 'exp':
            select_months = '-selmonth,{0} {1}'.format(selected_months, thredds_subset)
            selected_years = ','.join([str(add_months(startdate, i, self.experiment.calendar).year) for i in leadtimes])
            Utils.cdo.selyear(selected_years, input=select_months, output=temp)
        else:
            Utils.cdo.selmonth(selected_months, input=thredds_subset, output=temp)
        return temp

    # noinspection PyUnusedLocal
    def file_exists(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
                    vartype=VariableType.MEAN):
        """
        Build the THREDDS subset that corresponds to one chunk of a variable

        The subset is only constructed here; this method does not download it.

        :param domain: CMOR domain (not used by this manager)
        :type domain: str
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member (not used by this manager)
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (not used by this manager)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: Frequency
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        :return: subset describing the chunk on the server
        :rtype: THREDDSSubset
        """
        # NOTE(review): the returned object is presumably always truthy, so callers that test the result as a
        # boolean will always see "exists" - confirm whether a real existence check on the server was intended
        aggregation_path = self.get_var_url(var, startdate, frequency, box, vartype)
        start_chunk = chunk_start_date(parse_date(startdate), chunk, self.experiment.chunk_size, 'month',
                                       self.experiment.calendar)
        end_chunk = chunk_end_date(start_chunk, self.experiment.chunk_size, 'month', self.experiment.calendar)
        thredds_subset = THREDDSSubset(aggregation_path, "", var, start_chunk, end_chunk)
        return thredds_subset

    def get_file_path(self, startdate, domain, var, frequency, vartype,
                      box=None, grid=None):
        """
        Returns the local path to a concrete file

        :param startdate: file's startdate
        :type startdate: str
        :param domain: file's domain
        :type domain: str
        :param var: file's var
        :type var: str
        :param frequency: file's frequency (defaults to the configured one)
        :type frequency: Frequency
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        :param box: file's box
        :type box: Box
        :param grid: file's grid
        :type grid: str
        :return: path to the file
        :rtype: str
        """
        if not frequency:
            frequency = self.config.frequency
        var = self._get_final_var_name(box, var)
        folder_path = self._get_folder_path(frequency, domain, var, grid, vartype)
        file_name = self._get_file_name(var, startdate)
        filepath = os.path.join(folder_path, file_name)
        return filepath

    def _get_folder_path(self, frequency, domain, variable, grid, vartype):
        """
        Build the local folder <data_dir>/<data_type>/<institute>/<model>/<frequency folder>/<variable folder>

        For 'exp' data the variable folder comes from get_varfolder(domain, variable, grid); for any other data type
        it is just the variable name.
        """
        if self.config.data_type == 'exp':
            var_folder = self.get_varfolder(domain, variable, grid)
        else:
            var_folder = variable
        # var_folder was computed but never appended (and the call was left unclosed)
        folder_path = os.path.join(self.config.data_dir, self.config.data_type,
                                   self.experiment.institute.lower(),
                                   self.experiment.model.lower(),
                                   frequency.folder_name(vartype),
                                   var_folder)
        return folder_path

    def get_year(self, domain, var, startdate, member, year, grid=None, box=None, vartype=VariableType.MEAN):
        """
        Get the THREDDS subset containing all the data for one year for one variable

        :param domain: variable's domain (not used by this manager)
        :type domain: str
        :param var: variable's name
        :type var: str
        :param startdate: startdate to retrieve
        :type startdate: str
        :param member: member to retrieve (not used by this manager)
        :type member: int
        :param year: year to retrieve
        :type year: int
        :param grid: variable's grid (not used by this manager)
        :type grid: str
        :param box: variable's box
        :type box: Box
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        :return: subset spanning from January 1st of ``year`` to January 1st of ``year + 1``
        :rtype: THREDDSSubset
        """
        aggregation_path = self.get_var_url(var, startdate, None, box, vartype)
        thredds_subset = THREDDSSubset(aggregation_path, "", var, datetime(year, 1, 1), datetime(year + 1, 1, 1))
        # The subset used to be built and then discarded; return it so callers can download it
        return thredds_subset

    def get_var_url(self, var, startdate, frequency, box, vartype):
        """
        Get the OPeNDAP url of the dataset that holds a variable

        :param var: variable to retrieve
        :type var: str
        :param startdate: startdate to retrieve
        :type startdate: str
        :param frequency: frequency to get (defaults to the configured one)
        :type frequency: Frequency | None
        :param box: box to get
        :type box: Box
        :param vartype: type of variable
        :type vartype: VariableType
        :return: for 'exp' data, the per-startdate file inside the variable's folder; otherwise the single
                 ``<var>.nc`` dataset in the frequency folder
        :rtype: str
        """
        if not frequency:
            frequency = self.config.frequency
        var = self._get_final_var_name(box, var)
        full_path = os.path.join(self.server_url, 'dodsC', self.config.data_type, self.experiment.institute,
                                 self.experiment.model, frequency.folder_name(vartype))
        # Exactly one of the two file names applies; without the else, exp urls got a second bogus component
        if self.config.data_type == 'exp':
            full_path = os.path.join(full_path, var, self._get_file_name(var, startdate))
        else:
            full_path = os.path.join(full_path, self._get_file_name(var, None))
        return full_path

    def _get_file_name(self, var, startdate):
        """
        Build a file name: ``<var>_<startdate>.nc``, or ``<var>.nc`` when no startdate is given

        For non-'exp' data only the first 6 characters of the startdate are used.
        """
        # get_var_url asks for the aggregation name with startdate=None; slicing None raised TypeError
        if not startdate:
            return '{0}.nc'.format(var)
        if self.config.data_type != 'exp':
            startdate = startdate[0:6]
        return '{0}_{1}.nc'.format(var, startdate)

    def link_file(self, domain, var, cmor_var, startdate, member, chunk=None, grid=None,
                  frequency=None, year=None, date_str=None, move_old=False, vartype=VariableType.MEAN):
        """
        Create the link of a given file from the CMOR repository

        This manager does not create links, so the call does nothing; the signature is kept so it can be used
        wherever a data manager's link_file is expected.

        :param domain: CMOR domain
        :type domain: str
        :param var: variable name
        :type var: str
        :param cmor_var: variable instance describing the selected variable
        :type cmor_var: Variable
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: str
        :param year: if frequency is yearly, this parameter is used to give the corresponding year
        :type year: int
        :param date_str: ignored
        :param move_old: ignored
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        """
        # THREDDSManager does not require links
        pass

    def request_chunk(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
                      vartype=VariableType.MEAN):
        """
        Register a request for one chunk of a variable

        The subset is marked as locally pending and stored in ``requested_files`` under its local file path.

        :param domain: CMOR domain
        :type domain: Domain
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member (not used by this manager)
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (not used by this manager)
        :type grid: str|NoneType
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: Frequency|NoneType
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        :return: the registered subset
        :rtype: THREDDSSubset
        """
        aggregation_path = self.get_var_url(var, startdate, frequency, box, vartype)
        file_path = self.get_file_path(startdate, domain, var, frequency, vartype, box=box)
        start_chunk = chunk_start_date(parse_date(startdate), chunk, self.experiment.chunk_size, 'month',
                                       self.experiment.calendar)
        end_chunk = chunk_end_date(start_chunk, self.experiment.chunk_size, 'month', self.experiment.calendar)
        thredds_subset = THREDDSSubset(aggregation_path, file_path, var, start_chunk, end_chunk)
        thredds_subset.local_status = LocalStatus.PENDING
        self.requested_files[file_path] = thredds_subset
        return thredds_subset

    def declare_chunk(self, domain, var, startdate, member, chunk, grid=None, region=None, box=None, frequency=None,
                      vartype=VariableType.MEAN, diagnostic=None):
        """
        Declare a chunk of a variable that is to be stored

        The subset already requested for the same local path is reused if there is one; otherwise a new one is
        registered. It is then marked as pending storage and tagged with its final name and diagnostic.

        :param domain: CMOR domain
        :type domain: Domain
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member (not used by this manager)
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (not used by this manager)
        :type grid: str|NoneType
        :param region: region (not used by this manager)
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: Frequency|NoneType
        :param vartype: Variable type (mean, statistic)
        :type vartype: VariableType
        :param diagnostic: diagnostic attached to the subset
        :return: the declared subset
        :rtype: THREDDSSubset
        """
        aggregation_path = self.get_var_url(var, startdate, frequency, box, vartype)
        file_path = self.get_file_path(startdate, domain, var, frequency, vartype, box=box)
        start_chunk = chunk_start_date(parse_date(startdate), chunk, self.experiment.chunk_size, 'month',
                                       self.experiment.calendar)
        end_chunk = chunk_end_date(start_chunk, self.experiment.chunk_size, 'month', self.experiment.calendar)
        final_name = self._get_final_var_name(box, var)
        if file_path in self.requested_files:
            thredds_subset = self.requested_files[file_path]
        else:
            thredds_subset = THREDDSSubset(aggregation_path, file_path, var, start_chunk, end_chunk)
            self.requested_files[file_path] = thredds_subset

        thredds_subset.final_name = final_name
        thredds_subset.diagnostic = diagnostic
        thredds_subset.storage_status = StorageStatus.PENDING
        return thredds_subset
class THREDDSError(Exception):
    """
    Exception type for errors raised by the THREDDS data access classes in this module
    """
class THREDDSSubset(DataFile):
    """
    Time slice of one variable stored in a remote THREDDS dataset
    """

    def __init__(self, thredds_path, file_path, var, start_time, end_time):
        """
        :param thredds_path: url of the remote dataset that holds the variable
        :type thredds_path: str
        :param file_path: path associated with this subset; stored as ``remote_file``
        :type file_path: str
        :param var: variable name; anything from the first ``_f`` onwards is split off into ``hourly``
        :type var: str
        :param start_time: first instant to keep (inclusive)
        :type start_time: datetime
        :param end_time: last instant to keep (inclusive)
        :type end_time: datetime
        """
        super(THREDDSSubset, self).__init__()
        # __str__ and download read this attribute, but it was never stored
        self.thredds_path = thredds_path
        self.remote_file = file_path
        if '_f' in var:
            self.var = var[:var.index('_f')]
            self.hourly = var[var.index('_f'):]
        else:
            self.var = var
            self.hourly = ''
        self.dimension_indexes = {}
        self.handler = None
        self.start_time = start_time
        self.end_time = end_time

    def __str__(self):
        return 'THREDDS {0.thredds_path} ({0.start_time}-{0.end_time})'.format(self)

    # NOTE(review): the header of this method was missing from the source; `download` is assumed to be the
    # DataFile hook this body implements - confirm against DataFile
    def download(self):
        """
        Download the time slice into a local netCDF file

        The file is written to ``local_file`` (a new temporary file if none is set). On success ``local_status``
        becomes ``LocalStatus.READY``. On any error the problem is logged, ``local_status`` becomes
        ``LocalStatus.FAILED`` and nothing is raised.
        """
        try:
            iris.FUTURE.netcdf_promote = True
            iris.FUTURE.netcdf_no_unlimited = True
            with iris.FUTURE.context(cell_datetime_objects=True):
                time_constraint = iris.Constraint(time=lambda cell: self.start_time <= cell.point <= self.end_time)
                var_cube = iris.load_cube(self.thredds_path, constraint=time_constraint, callback=self._correct_cube)
            # assumes DataFile.__init__ defines local_file (it is not set in this class) - TODO confirm
            if not self.local_file:
                self.local_file = TempFile.get()
            iris.save(var_cube, self.local_file, zlib=True)
            if not Utils.check_netcdf_file(self.local_file):
                raise THREDDSError('netcdf check for downloaded file failed')
            Log.info('Request {0} ready!', self)
            self.local_status = LocalStatus.READY
        except Exception as ex:
            # Best-effort download: report the failure through local_status instead of propagating
            Log.error('Can not retrieve {0} from server: {1}'.format(self, ex))
            self.local_status = LocalStatus.FAILED

    # noinspection PyUnusedLocal
    def _correct_cube(self, cube, field, filename):
        """
        iris load callback that rewrites a 'months since ...' time coordinate as 'days since 1850-01-01'

        Cubes without a time coordinate, or whose time units are not month based, are left untouched.

        :param cube: cube being loaded; modified in place
        :param field: raw field the cube was built from (unused)
        :param filename: name of the file being loaded (unused)
        """
        # The early return was missing: cubes without time made cube.coord('time') raise
        if not cube.coords('time'):
            return
        time = cube.coord('time')
        if not time.units.origin.startswith('month'):
            return

        ref = strptime(time.units.origin[time.units.origin.index(' since ') + 7:], '%Y-%m-%d %H:%M:%S')

        def month_to_date(month_index):
            # month_index counts months from January of the reference year, January being 1. The year must use
            # (index - 1) // 12: the former index / 12 pushed every December into the following year (and is
            # float division on Python 3)
            month_index = int(month_index)
            return datetime(year=ref.tm_year + (month_index - 1) // 12,
                            month=(month_index - 1) % 12 + 1,
                            day=ref.tm_mday)

        times = np.round(time.points + ref.tm_mon)
        dates = np.vectorize(month_to_date)(times)
        dates = netCDF4.date2num(dates, units='days since 1850-01-01', calendar=time.units.calendar)
        # Build the new unit with the same class as the existing one, keeping the original calendar
        new_units = type(time.units)('days since 1850-01-01', time.units.calendar)
        new_time = DimCoord(dates, standard_name=time.standard_name, long_name=time.long_name,
                            var_name=time.var_name, attributes=time.attributes,
                            units=new_units)
        [dimension] = cube.coord_dims(time)
        cube.remove_coord(time)
        cube.add_dim_coord(new_time, dimension)