Newer
Older
Javier Vegas-Regidor
committed
import shutil
import threading
import numpy as np
from autosubmit.config.log import Log
from autosubmit.date.chunk_date_lib import parse_date, chunk_start_date, chunk_end_date, previous_day
def __init__(self, institution, model, expid, datafolder, frequency, chunk_size, experiment_name, num_chunks,
             calendar='standard'):
    """
    Store the experiment description that the other methods use to build CMOR paths.

    :param institution: institution name, used as a folder level in the CMOR paths
    :param model: model name, used as a folder level and inside the file names
    :param expid: experiment identifier, the first folder level below the data folder
    :param datafolder: root folder of the data, kept as data_dir
    :param frequency: default frequency, used when a method is not given a specific one
    :param chunk_size: chunk length, handed to the chunk date helpers with 'month' as unit
    :param experiment_name: experiment name, used as a folder level and inside the file names
    :param num_chunks: number of chunks of the experiment
    :param calendar: calendar name handed to the chunk date helpers by get_year_chunks
    """
    # Where the data lives
    self.data_dir = datafolder
    self.expid = expid

    # Components of the CMOR folder tree and file names
    self.institution = institution
    self.model = model
    self.experiment_name = experiment_name
    self.frequency = frequency

    # Chunk layout
    self.chunk_size = chunk_size
    self.num_chunks = num_chunks
    self.calendar = calendar

    # Flags that always start enabled
    self.add_startdate = True
    self.add_name = True
# noinspection PyPep8Naming
def prepare_CMOR_files(self, startdates, members):
    """
    Unpack the CMOR data of every requested startdate and member and normalise its file names.

    For each startdate/member pair whose 'outputs' folder holds '*.gz' files, the archives are gunzipped and
    then every '*.tar' file found is extracted, sharing the work between as many threads as
    Utils.available_cpu_count() reports. Afterwards the extracted files are renamed so that their paths and
    names carry the experiment name and the name of the folder they live in. Pairs without any '*.gz' file
    are skipped entirely.

    :param startdates: list of startdates that will be used by the diagnostics
    :type startdates: list[str]
    :param members: lists of members that will be used by the diagnostics
    :type members: list[int]
    :raises Exception: if the experiment folder does not exist below the data folder
    """
    experiment_path = os.path.join(self.data_dir, self.expid)
    if not os.path.exists(experiment_path):
        raise Exception('The experiment {0} is not CMORized. '
                        'Please, CMORize it and launch again.'.format(self.expid))

    for startdate in startdates:
        for member in members:
            member_path = os.path.join(experiment_path, startdate, 'fc{0}'.format(member), 'outputs')
            Log.info('Preparing CMOR files for startdate {0} and member {1}'.format(startdate, member))

            gz_files = glob.glob(os.path.join(member_path, '*.gz'))
            if not gz_files:
                # Nothing packed for this member: leave it untouched
                continue

            numthreads = Utils.available_cpu_count()

            # Every worker takes each numthreads-th file, starting at its own offset
            unzip_workers = list()
            for offset in range(numthreads):
                worker = threading.Thread(target=DataManager._unzip,
                                          args=(gz_files[offset::numthreads],))
                unzip_workers.append(worker)
                worker.start()
            for worker in unzip_workers:
                worker.join()

            # The tar files are listed only once all the gunzip workers have finished
            tar_files = glob.glob(os.path.join(member_path, '*.tar'))
            untar_workers = list()
            for offset in range(numthreads):
                worker = threading.Thread(target=DataManager._untar,
                                          args=(tar_files[offset::numthreads], member_path))
                untar_workers.append(worker)
                worker.start()
            for worker in untar_workers:
                worker.join()

            model_path = os.path.join(member_path, 'output', self.institution, self.model)

            if self.experiment_name != self.model:
                # Files found below <model>/<model> are moved to <model>/<experiment_name>
                bad_path = os.path.join(model_path, self.model)
                # Bottom-up walk, so every folder is already empty when it is removed
                for dirpath, _, filenames in os.walk(bad_path, topdown=False):
                    for filename in filenames:
                        filepath = os.path.join(dirpath, filename)
                        good = filepath.replace('_{0}_output_'.format(self.model),
                                                '_{0}_{1}_'.format(self.model, self.experiment_name))
                        good = good.replace('/{0}/{0}'.format(self.model),
                                            '/{0}/{1}'.format(self.model, self.experiment_name))
                        Utils.move_file(filepath, good)
                    os.rmdir(dirpath)

            # Insert the name of the first-level folder in the file names that lack it
            good_dir = os.path.join(model_path, self.experiment_name)
            for sdate in os.listdir(good_dir):
                without_sdate = '_{0}_{1}_r'.format(self.model, self.experiment_name)
                with_sdate = '_{0}_{1}_{2}_r'.format(self.model, self.experiment_name, sdate)
                for dirpath, _, filenames in os.walk(os.path.join(good_dir, sdate), topdown=False):
                    for filename in filenames:
                        filepath = os.path.join(dirpath, filename)
                        good = filepath.replace(without_sdate, with_sdate)
                        if good == filepath:
                            continue
                        Log.info('Moving {0} to {1}'.format(filename, good))
                        Utils.move_file(filepath, good)
Javier Vegas-Regidor
committed
@staticmethod
def _unzip(files):
    """
    Run gunzip on every given file, one shell command per file.

    :param files: paths of the files to gunzip
    """
    for gz_path in files:
        Log.debug('Unzipping {0}', gz_path)
        command = 'gunzip {0}'.format(gz_path)
        Utils.execute_shell_command(command)
@staticmethod
def _untar(files, member_path):
    """
    Extract every given tar file into member_path and delete the tar file afterwards.

    :param files: paths of the tar files to extract
    :param member_path: folder passed to tar as extraction directory (-C)
    """
    for tar_path in files:
        Log.debug('Unpacking {0}', tar_path)
        command = 'tar -xvf {0} -C {1}'.format(tar_path, member_path)
        Utils.execute_shell_command(command)
        # The archive is removed as soon as its extraction command has returned
        os.remove(tar_path)
def get_files(self, startdate, member, chunk, domain, variables, grid=None):
    """
    Build the CMOR repository paths of several variables for one startdate, member and chunk.

    Nothing is read or copied: the paths are only composed.

    :param startdate: startdate to retrieve
    :type startdate: str
    :param member: member to retrieve
    :type member: int
    :param chunk: chunk to retrieve
    :type chunk: int
    :param domain: variable's CMOR domain
    :type domain: str
    :param variables: variables list
    :param grid: specifies if the variable must be in a interpolated grid
    :type grid: str
    :return: a list holding one element, the list of paths in the same order as variables
    :rtype: list[list[str]]
    """
    # 'seaIce' has its own two-letter code; any other domain uses its capitalised initial
    domain_abreviattion = 'OI' if domain == 'seaIce' else domain[0].upper()

    start = parse_date(startdate)
    member_plus = str(member + 1)
    realization = 'r{0}i1p1'.format(member_plus)
    member_path = os.path.join(self.data_dir, self.expid, startdate, 'fc' + str(member), 'outputs', 'output',
                               self.institution, self.model, self.experiment_name, 'S' + startdate,
                               self.frequency, domain)

    chunk_start = chunk_start_date(start, chunk, self.chunk_size, 'month', 'standard')
    chunk_end = previous_day(chunk_end_date(chunk_start, self.chunk_size, 'month', 'standard'), 'standard')
    first_month = "{0:04}{1:02}".format(chunk_start.year, chunk_start.month)
    last_month = "{0:04}{1:02}".format(chunk_end.year, chunk_end.month)

    var_file = list()
    for var in variables:
        # The grid, when given, is an extra folder level between the variable and the realization
        folders = [member_path, var]
        if grid:
            folders.append(grid)
        folders.append(realization)

        filename = ('{0}_{1}{2}_{3}_{4}_S{5}_r{6}i1p1_'
                    '{7}-{8}.nc'.format(var, domain_abreviattion, self.frequency, self.model,
                                        self.experiment_name, startdate, member_plus, first_month, last_month))
        var_file.append(os.path.join(os.path.join(*folders), filename))

    return [var_file]
def get_file(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None):
    """
    Copy one file of the CMOR repository to a temporary file and return the path of that copy.

    :param domain: CMOR domain
    :type domain: str
    :param var: variable name
    :type var: str
    :param startdate: file's startdate
    :type startdate: str
    :param member: file's member
    :type member: int
    :param chunk: file's chunk
    :type chunk: int
    :param grid: file's grid (only needed if it is not the original)
    :type grid: str
    :param box: file's box (only needed to retrieve sections or averages)
    :type box: Box
    :param frequency: file's frequency (only needed if it is different from the default)
    :type frequency: str
    :return: path to the copy created on the scratch folder
    :rtype: str
    """
    # 'seaIce' has its own two-letter code; any other domain uses its capitalised initial
    domain_abreviattion = 'OI' if domain == 'seaIce' else domain[0].upper()
    frequency = frequency or self.frequency

    start = parse_date(startdate)
    member_plus = str(member + 1)
    realization = 'r{0}i1p1'.format(member_plus)
    member_path = os.path.join(self.data_dir, self.expid, startdate, 'fc' + str(member), 'outputs', 'output',
                               self.institution, self.model, self.experiment_name, 'S' + startdate, frequency,
                               domain)

    chunk_start = chunk_start_date(start, chunk, self.chunk_size, 'month', 'standard')
    chunk_end = previous_day(chunk_end_date(chunk_start, self.chunk_size, 'month', 'standard'), 'standard')
    first_month = "{0:04}{1:02}".format(chunk_start.year, chunk_start.month)
    last_month = "{0:04}{1:02}".format(chunk_end.year, chunk_end.month)

    if box:
        # The box strings become part of the variable name, both in the folder and in the file name
        var = var + box.get_lon_str() + box.get_lat_str() + box.get_depth_str()

    # The grid, when given, is an extra folder level between the variable and the realization
    folders = [member_path, var]
    if grid:
        folders.append(grid)
    folders.append(realization)

    filename = ('{0}_{1}{2}_{3}_{4}_S{5}_r{6}i1p1_'
                '{7}-{8}.nc'.format(var, domain_abreviattion, frequency, self.model, self.experiment_name,
                                    startdate, member_plus, first_month, last_month))
    filepath = os.path.join(os.path.join(*folders), filename)

    temp_path = TempFile.get()
    shutil.copyfile(filepath, temp_path)
    return temp_path
def send_file(self, filetosend, domain, var, startdate, member, chunk=None, grid=None, region=None, box=None,
              rename_var=None, frequency=None, year=None):
    """
    Move a file into the CMOR repository, optionally storing it as one region of a multi-region file.

    The file is first passed through Utils.convert2netcdf4. Either chunk or year must be given: with a chunk
    the file name carries the first and last month of the chunk, with a year (only for frequency 'yr') it
    carries the year. The arguments are validated before the input file is touched.

    :param filetosend: path to the file to send to the CMOR repository
    :type filetosend: str
    :param domain: CMOR domain
    :type domain: str
    :param var: variable name
    :type var: str
    :param startdate: file's startdate
    :type startdate: str
    :param member: file's member
    :type member: int
    :param chunk: file's chunk
    :type chunk: int
    :param grid: file's grid (only needed if it is not the original)
    :type grid: str
    :param region: specifies the region represented by the file. If it is defined, the data will be appended to
                   the CMOR repository as a new region in the file or will overwrite if region was already
                   present
    :type region: str
    :param box: file's box (only needed to retrieve sections or averages)
    :type box: Box
    :param rename_var: if exists, the given variable will be renamed to the one given by var
    :type rename_var: str
    :param frequency: file's frequency (only needed if it is different from the default)
    :type frequency: str
    :param year: if frequency is yearly, this parameter is used to give the corresponding year
    :type year: int
    :raises ValueError: if chunk and year are both None, or if year is used with a frequency other than 'yr'
    """
    if not frequency:
        frequency = self.frequency

    # Fail before modifying filetosend when the time specification is unusable
    if chunk is None:
        if year is None:
            raise ValueError('Chunk and year can not be None at the same time')
        # Compare by value: an identity test against a string literal depends on interning
        if frequency != 'yr':
            raise ValueError('Year may be provided instead of chunk only if frequency is "yr"')

    Utils.convert2netcdf4(filetosend)

    # 'seaIce' has its own two-letter code; any other domain uses its capitalised initial
    if domain == 'seaIce':
        domain_abreviattion = 'OI'
    else:
        domain_abreviattion = domain[0].upper()

    if box:
        var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str()

    if rename_var:
        Utils.rename_variable(filetosend, rename_var, var)

    start = parse_date(startdate)
    member_plus = str(member + 1)
    member_path = os.path.join(self.data_dir, self.expid, startdate, 'fc' + str(member), 'outputs', 'output',
                               self.institution, self.model, self.experiment_name, 'S' + startdate, frequency,
                               domain)

    if chunk is not None:
        chunk_start = chunk_start_date(start, chunk, self.chunk_size, 'month', 'standard')
        chunk_end = chunk_end_date(chunk_start, self.chunk_size, 'month', 'standard')
        chunk_end = previous_day(chunk_end, 'standard')
        time_bound = "{0:04}{1:02}-{2:04}{3:02}".format(chunk_start.year, chunk_start.month, chunk_end.year,
                                                        chunk_end.month)
    else:
        time_bound = str(year)

    if grid:
        var_path = os.path.join(member_path, var, grid, 'r{0}i1p1'.format(member_plus))
    else:
        var_path = os.path.join(member_path, var, 'r{0}i1p1'.format(member_plus))

    filepath = os.path.join(var_path, '{0}_{1}{2}_{3}_{4}_S{5}_r{6}i1p1_'
                                      '{7}.nc'.format(var, domain_abreviattion, frequency, self.model,
                                                      self.experiment_name, startdate, member_plus, time_bound))

    if region:
        if not os.path.exists(filepath):
            # First region for this file: add a 'region' dimension and copy the variable into a new one
            # that has it as its last dimension
            handler = Utils.openCdf(filetosend)
            try:
                handler.createDimension('region')
                var_region = handler.createVariable('region', str, 'region')
                var_region[0] = region
                original_var = handler.variables[var]
                new_var = handler.createVariable('new_var', original_var.datatype,
                                                 original_var.dimensions + ('region',))
                new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()})
                new_var[..., 0] = original_var[:]
            finally:
                handler.close()

            # Drop the original variable and give its name to the copy
            Utils.nco.ncks(input=filetosend, output=filetosend, options='-O -x -v {0}'.format(var))
            Utils.rename_variable(filetosend, 'new_var', var)
        else:
            # The repository file already exists: work on a copy of it and write the new data into the
            # slot of this region
            temp = TempFile.get()
            shutil.copyfile(filepath, temp)
            Utils.nco.ncks(input=temp, output=temp, options='-O --mk_rec_dmn region')
            handler = Utils.openCdf(temp)
            try:
                handler_send = Utils.openCdf(filetosend)
                try:
                    value = handler_send.variables[var][:]
                    var_region = handler.variables['region']
                    basin_index = np.where(var_region[:] == region)
                    if len(basin_index[0]) == 0:
                        # Unknown region: write its name right after the last existing one
                        var_region[var_region.shape[0]] = region
                        basin_index = var_region.shape[0] - 1
                    else:
                        basin_index = basin_index[0][0]

                    handler.variables[var][..., basin_index] = value
                finally:
                    handler_send.close()
            finally:
                handler.close()

            Utils.move_file(temp, filetosend)
            Utils.nco.ncks(input=filetosend, output=filetosend, options='-O --fix_rec_dmn region')

    Utils.move_file(filetosend, filepath)
def get_year(self, domain, var, startdate, member, year, grid=None, box=None):
    """
    Get all the data corresponding to a given year from the CMOR repository as one file in the scratch folder
    and return the path to that file.

    The chunks listed by get_year_chunks are fetched with get_file, concatenated when there is more than one,
    and cut along the time dimension from the first January timestep to the last December timestep of the
    year.

    :param year: year to retrieve
    :type year: int
    :param domain: CMOR domain
    :type domain: str
    :param var: variable name
    :type var: str
    :param startdate: file's startdate
    :type startdate: str
    :param member: file's member
    :type member: int
    :param grid: file's grid (only needed if it is not the original)
    :type grid: str
    :param box: file's box (only needed to retrieve sections or averages)
    :type box: Box
    :return: path to the copy created on the scratch folder
    :rtype: str
    :raises ValueError: if the data holds no January or no December timestep of the requested year
    """
    chunk_files = list()
    for chunk in self.get_year_chunks(startdate, year):
        chunk_files.append(self.get_file(domain, var, startdate, member, chunk, grid=grid, box=box))

    if len(chunk_files) > 1:
        temp = TempFile.get()
        Utils.nco.ncrcat(input=' '.join(chunk_files), output=temp)
        for chunk_file in chunk_files:
            os.remove(chunk_file)
    else:
        temp = chunk_files[0]

    handler = Utils.openCdf(temp)
    try:
        time = Utils.get_datetime_from_netcdf(handler)
    finally:
        handler.close()

    start = None
    end = None
    for index, date in enumerate(time):
        if date.year != year:
            continue
        if date.month == 1:
            # Keep the first January timestep: with several timesteps per month a later one
            # would cut the beginning of the year
            if start is None:
                start = index
        elif date.month == 12:
            # Keep the last December timestep
            end = index

    if start is None or end is None:
        os.remove(temp)
        raise ValueError('Data for startdate {0} does not cover January to December '
                         'of year {1}'.format(startdate, year))

    temp2 = TempFile.get()
    Utils.nco.ncks(input=temp, output=temp2, options='-O -d time,{0},{1}'.format(start, end))
    os.remove(temp)
    return temp2
def get_year_chunks(self, startdate, year):
    """
    Get the list of chunks containing timesteps from the given year

    Chunks are numbered from 1 to num_chunks. The scan stops at the first chunk that starts after the
    requested year.

    :param startdate: startdate to use
    :type startdate: str
    :param year: reference year
    :type year: int
    :return: list of chunks containing data from the given year
    :rtype: list[int]
    """
    date = parse_date(startdate)
    chunks = list()
    for chunk in range(1, self.num_chunks + 1):
        chunk_start = chunk_start_date(date, chunk, self.chunk_size, 'month', self.calendar)
        if chunk_start.year > year:
            break
        # NOTE(review): the other methods apply previous_day to the result of chunk_end_date before
        # using it, which suggests the end date is exclusive; if so, a chunk finishing on the 31st of
        # December is also reported for the following year -- confirm whether that is intended
        chunk_end = chunk_end_date(chunk_start, self.chunk_size, 'month', self.calendar)
        if chunk_start.year == year or chunk_end.year == year:
            chunks.append(chunk)
    return chunks
def get_full_years(self, startdate):
    """
    Returns the list of full years that are in the given startdate

    :param startdate: startdate to use
    :type startdate: str
    :return: list of full years
    :rtype: list[int]
    """
    # Floor division: range() below only accepts integers, and '/' yields a float on Python 3
    chunks_per_year = 12 // self.chunk_size

    date = parse_date(startdate)
    first_january = 0
    first_year = date.year
    if date.month != 1:
        # The startdate begins mid-year: the first candidate year is the next one, and the chunks
        # stepped over before reaching its end are counted
        month = date.month
        first_year += 1
        while month + self.chunk_size < 12:
            month += self.chunk_size
            first_january += 1

    years = list()
    # One year is added per step of chunks_per_year chunks
    for _ in range(first_january, self.num_chunks - chunks_per_year, chunks_per_year):
        years.append(first_year)
        first_year += 1
    return years