Newer
Older
Javier Vegas-Regidor
committed
import shutil
import threading
import numpy as np
from autosubmit.config.log import Log
from autosubmit.date.chunk_date_lib import parse_date, chunk_start_date, chunk_end_date, previous_day
def __init__(self, institution, model, expid, datafolder, frequency, chunk_size, experiment_name, num_chunks,
             calendar='standard'):
    """
    Store the description of the experiment whose CMOR files will be managed.

    :param institution: institution name; it is one of the directory levels of the CMOR tree
    :param model: model name; it is a directory level and also part of the CMOR file names
    :param expid: experiment identifier; name of the experiment folder inside the data folder
    :param datafolder: root folder of the data, stored as ``data_dir``
    :param frequency: frequency used whenever a method is not given an explicit one
    :param chunk_size: size of a chunk, handed to the chunk date functions with 'month' as unit
    :param experiment_name: experiment name; it is a directory level and also part of the CMOR file names
    :param num_chunks: number of chunks of the experiment
    :param calendar: calendar name handed to the chunk date functions
    """
    # Where the data lives
    self.data_dir = datafolder
    self.expid = expid

    # Components of the CMOR directory tree and file names
    self.institution = institution
    self.model = model
    self.experiment_name = experiment_name
    self.frequency = frequency

    # Time layout of the experiment
    self.chunk_size = chunk_size
    self.num_chunks = num_chunks
    self.calendar = calendar

    # Flags that are always enabled on creation
    self.add_startdate = True
    self.add_name = True
# noinspection PyPep8Naming
def prepare_CMOR_files(self, startdates, members):
    """
    Unpack the CMORized output of every startdate/member pair and normalise its file names.

    For each member, the ``*.gz`` files of its ``outputs`` folder are gunzipped and the ``*.tar`` files found
    afterwards are extracted, using one worker thread per available CPU. Members without ``*.gz`` files are
    skipped completely. After unpacking, files stored under ``<model>/<model>`` are moved to
    ``<model>/<experiment_name>`` (only when both names differ) and the startdate folder name is inserted in
    the file names that lack it.

    :param startdates: startdates to prepare
    :param members: member numbers to prepare
    :raises Exception: if the data folder has no folder for the experiment
    """
    # Check if cmorized and convert if not
    if not os.path.exists(os.path.join(self.data_dir, self.expid)):
        raise Exception('The experiment {0} is not CMORized. '
                        'Please, CMORize it and launch again.'.format(self.expid))

    def run_in_threads(target, files, numthreads, *extra_args):
        # Deal the files round-robin to the workers and wait for all of them. Each call owns its worker list,
        # so a later phase never joins the workers of a previous one again.
        workers = list()
        for numthread in range(0, numthreads):
            worker = threading.Thread(target=target, args=(files[numthread::numthreads],) + extra_args)
            workers.append(worker)
            worker.start()
        for worker in workers:
            worker.join()

    # Fragments of the file names that have to be corrected; they do not depend on the member
    output_tag = '_{0}_output_'.format(self.model)
    named_tag = '_{0}_{1}_'.format(self.model, self.experiment_name)
    bad_dirs = '/{0}/{0}'.format(self.model)
    good_dirs = '/{0}/{1}'.format(self.model, self.experiment_name)
    no_sdate_tag = '_{0}_{1}_r'.format(self.model, self.experiment_name)

    for startdate in startdates:
        for member in members:
            member_path = os.path.join(self.data_dir, self.expid, startdate, 'fc{0}'.format(member), 'outputs')
            Log.info('Preparing CMOR files for startdate {0} and member {1}'.format(startdate, member))

            filepaths = glob.glob(os.path.join(member_path, '*.gz'))
            if not filepaths:
                # Nothing compressed is left for this member: skip it
                continue

            numthreads = Utils.available_cpu_count()
            run_in_threads(DataManager._unzip, filepaths, numthreads)

            # The tar files only show up once the gz files have been uncompressed
            filepaths = glob.glob(os.path.join(member_path, '*.tar'))
            run_in_threads(DataManager._untar, filepaths, numthreads, member_path)

            if self.experiment_name != self.model:
                # Files were stored using the model name as experiment name: move them to the right place
                bad_path = os.path.join(member_path, 'output', self.institution, self.model, self.model)
                # Bottom-up walk, so every folder is already empty when it is removed
                for dirpath, _, filenames in os.walk(bad_path, False):
                    for filename in filenames:
                        filepath = os.path.join(dirpath, filename)
                        good = filepath.replace(output_tag, named_tag)
                        good = good.replace(bad_dirs, good_dirs)
                        Utils.move_file(filepath, good)
                    os.rmdir(dirpath)

            good_dir = os.path.join(member_path, 'output', self.institution, self.model, self.experiment_name)
            for sdate in os.listdir(good_dir):
                with_sdate_tag = '_{0}_{1}_{2}_r'.format(self.model, self.experiment_name, sdate)
                for dirpath, _, filenames in os.walk(os.path.join(good_dir, sdate), False):
                    for filename in filenames:
                        filepath = os.path.join(dirpath, filename)
                        # Insert the startdate folder name in the file names that do not have it
                        good = filepath.replace(no_sdate_tag, with_sdate_tag)
                        if good != filepath:
                            Log.info('Moving {0} to {1}'.format(filename, good))
                            Utils.move_file(filepath, good)
Javier Vegas-Regidor
committed
@staticmethod
def _unzip(files):
    """
    Uncompress every given file by running ``gunzip`` on it through the shell helper.

    :param files: paths of the files to uncompress
    """
    for gz_path in files:
        Log.debug('Unzipping {0}', gz_path)
        command = 'gunzip {0}'.format(gz_path)
        Utils.execute_shell_command(command)
@staticmethod
def _untar(files, member_path):
    """
    Extract every given tar file into the member folder and delete the tar file afterwards.

    :param files: paths of the tar files to extract
    :param member_path: folder the contents are extracted into
    """
    for tar_path in files:
        Log.debug('Unpacking {0}', tar_path)
        command = 'tar -xvf {0} -C {1}'.format(tar_path, member_path)
        Utils.execute_shell_command(command)
        # The archive is no longer needed once its contents are in place
        os.remove(tar_path)
def get_files(self, startdate, member, chunk, domain, variables, grid=None):
    """
    Build the expected CMOR file path of several variables for one chunk.

    Only the paths are built: nothing is checked or copied. The chunk dates are computed with the calendar
    of the experiment, like get_year_chunks does.

    :param startdate: startdate, as a string accepted by parse_date
    :param member: member number; the realization used in the paths is member + 1
    :param chunk: chunk number
    :param domain: CMOR domain name; it is abbreviated to 'OI' for 'seaIce' and to its first letter in
                   upper case otherwise
    :param variables: names of the variables
    :param grid: optional grid name, added as an extra folder level when given
    :return: list with a single element, which is the list of file paths in the same order as variables
    """
    abbreviation = 'OI' if domain == 'seaIce' else domain[0].upper()
    realization = 'r{0}i1p1'.format(member + 1)

    domain_path = os.path.join(self.data_dir, self.expid, startdate, 'fc' + str(member), 'outputs', 'output',
                               self.institution, self.model, self.experiment_name, 'S' + startdate,
                               self.frequency, domain)

    chunk_start = chunk_start_date(parse_date(startdate), chunk, self.chunk_size, 'month', self.calendar)
    chunk_end = chunk_end_date(chunk_start, self.chunk_size, 'month', self.calendar)
    # chunk_end is moved one day back before its year and month go into the file name
    chunk_end = previous_day(chunk_end, self.calendar)
    time_bounds = '{0:04}{1:02}-{2:04}{3:02}'.format(chunk_start.year, chunk_start.month,
                                                     chunk_end.year, chunk_end.month)

    var_files = list()
    for var in variables:
        if grid:
            var_path = os.path.join(domain_path, var, grid, realization)
        else:
            var_path = os.path.join(domain_path, var, realization)
        file_name = '{0}_{1}{2}_{3}_{4}_S{5}_{6}_{7}.nc'.format(var, abbreviation, self.frequency, self.model,
                                                                self.experiment_name, startdate, realization,
                                                                time_bounds)
        var_files.append(os.path.join(var_path, file_name))

    # NOTE(review): the nested list is kept so existing callers get the same shape; confirm it is intended
    return [var_files]
def get_file(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None):
    """
    Copy the CMOR file of a variable for one chunk to a temporary file and return the path of the copy.

    The chunk dates are computed with the calendar of the experiment, like get_year_chunks does.

    :param domain: CMOR domain name; it is abbreviated to 'OI' for 'seaIce' and to its first letter in
                   upper case otherwise
    :param var: variable name
    :param startdate: startdate, as a string accepted by parse_date
    :param member: member number; the realization used in the path is member + 1
    :param chunk: chunk number
    :param grid: optional grid name, added as an extra folder level when given
    :param box: optional box; its longitude, latitude and depth strings are appended to the variable name
    :param frequency: frequency of the file; the frequency of the experiment is used when it is not given
    :return: path of the temporary copy
    """
    abbreviation = 'OI' if domain == 'seaIce' else domain[0].upper()
    if not frequency:
        frequency = self.frequency
    if box:
        var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str()
    realization = 'r{0}i1p1'.format(member + 1)

    domain_path = os.path.join(self.data_dir, self.expid, startdate, 'fc' + str(member), 'outputs', 'output',
                               self.institution, self.model, self.experiment_name, 'S' + startdate, frequency,
                               domain)

    chunk_start = chunk_start_date(parse_date(startdate), chunk, self.chunk_size, 'month', self.calendar)
    chunk_end = chunk_end_date(chunk_start, self.chunk_size, 'month', self.calendar)
    # chunk_end is moved one day back before its year and month go into the file name
    chunk_end = previous_day(chunk_end, self.calendar)
    time_bounds = '{0:04}{1:02}-{2:04}{3:02}'.format(chunk_start.year, chunk_start.month,
                                                     chunk_end.year, chunk_end.month)

    if grid:
        var_path = os.path.join(domain_path, var, grid, realization)
    else:
        var_path = os.path.join(domain_path, var, realization)
    file_name = '{0}_{1}{2}_{3}_{4}_S{5}_{6}_{7}.nc'.format(var, abbreviation, frequency, self.model,
                                                            self.experiment_name, startdate, realization,
                                                            time_bounds)
    filepath = os.path.join(var_path, file_name)

    # Hand out a copy, so the stored file is never the one the caller works on
    temp_path = TempFile.get()
    shutil.copyfile(filepath, temp_path)
    return temp_path
def send_file(self, filetosend, domain, var, startdate, member, chunk=None, grid=None, region=None, box=None,
              rename_var=None, frequency=None, year=None):
    """
    Move a file to its place in the CMOR tree, merging it by region with the stored file when requested.

    The file is first converted to netCDF4 and, if rename_var is given, that variable is renamed to the
    final variable name. The time part of the file name comes from the chunk or, for yearly files only,
    from the year. When a region is given, the variable gets an extra 'region' dimension: if the
    destination file does not exist the data is stored as the first region, otherwise it is written into
    a copy of the destination file at the index of the region (added at the end if not present yet).

    :param filetosend: path of the file to store; it is moved, so it no longer exists afterwards
    :param domain: CMOR domain name; it is abbreviated to 'OI' for 'seaIce' and to its first letter in
                   upper case otherwise
    :param var: variable name
    :param startdate: startdate, as a string accepted by parse_date
    :param member: member number; the realization used in the path is member + 1
    :param chunk: chunk number; takes precedence over year
    :param grid: optional grid name, added as an extra folder level when given
    :param region: optional region name
    :param box: optional box; its longitude, latitude and depth strings are appended to the variable name
    :param rename_var: optional name the variable currently has inside the file
    :param frequency: frequency of the file; the frequency of the experiment is used when it is not given
    :param year: year of the file, used only when chunk is None
    :raises ValueError: if neither chunk nor year are given, or if year is given for a non-yearly frequency
    """
    Utils.convert2netcdf4(filetosend)

    abbreviation = 'OI' if domain == 'seaIce' else domain[0].upper()

    if box:
        var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str()

    if rename_var:
        Utils.rename_variable(filetosend, rename_var, var)

    if not frequency:
        frequency = self.frequency

    realization = 'r{0}i1p1'.format(member + 1)
    domain_path = os.path.join(self.data_dir, self.expid, startdate, 'fc' + str(member), 'outputs', 'output',
                               self.institution, self.model, self.experiment_name, 'S' + startdate, frequency,
                               domain)

    if chunk is not None:
        chunk_start = chunk_start_date(parse_date(startdate), chunk, self.chunk_size, 'month', self.calendar)
        chunk_end = chunk_end_date(chunk_start, self.chunk_size, 'month', self.calendar)
        # chunk_end is moved one day back before its year and month go into the file name
        chunk_end = previous_day(chunk_end, self.calendar)
        time_bound = "{0:04}{1:02}-{2:04}{3:02}".format(chunk_start.year, chunk_start.month, chunk_end.year,
                                                        chunk_end.month)
    elif year is not None:
        # Compare by value: identity of two equal strings is not guaranteed
        if frequency != 'yr':
            raise ValueError('Year may be provided instead of chunk only if frequency is "yr"')
        time_bound = str(year)
    else:
        raise ValueError('Chunk and year can not be None at the same time')

    if grid:
        var_path = os.path.join(domain_path, var, grid, realization)
    else:
        var_path = os.path.join(domain_path, var, realization)
    filepath = os.path.join(var_path, '{0}_{1}{2}_{3}_{4}_S{5}_{6}_{7}.nc'.format(var, abbreviation, frequency,
                                                                                  self.model,
                                                                                  self.experiment_name,
                                                                                  startdate, realization,
                                                                                  time_bound))

    if region:
        if not os.path.exists(filepath):
            # First region to be stored: rebuild the variable with a trailing 'region' dimension
            handler = Utils.openCdf(filetosend)
            try:
                handler.createDimension('region')
                var_region = handler.createVariable('region', str, 'region', fill_value='')
                var_region[0] = region

                original_var = handler.variables[var]
                new_var = handler.createVariable('new_var', original_var.datatype,
                                                 original_var.dimensions + ('region',))
                new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()})
                new_var[..., 0] = original_var[:]
            finally:
                handler.close()

            # Drop the variable without region and give its name to the new one
            Utils.nco.ncks(input=filetosend, output=filetosend, options='-O -x -v {0}'.format(var))
            Utils.rename_variable(filetosend, 'new_var', var)
        else:
            # The destination exists: write the data into a copy of it and send that copy instead
            temp = TempFile.get()
            shutil.copyfile(filepath, temp)
            Utils.nco.ncks(input=temp, output=temp, options='-O --mk_rec_dmn region')
            handler = Utils.openCdf(temp)
            try:
                handler_send = Utils.openCdf(filetosend)
                try:
                    value = handler_send.variables[var][:]
                    var_region = handler.variables['region']
                    basin_index = np.where(var_region[:] == region)
                    if len(basin_index[0]) == 0:
                        # Region not stored yet: add it after the existing ones
                        var_region[var_region.shape[0]] = region
                        basin_index = var_region.shape[0] - 1
                    else:
                        basin_index = basin_index[0][0]
                    handler.variables[var][..., basin_index] = value
                finally:
                    handler_send.close()
            finally:
                handler.close()

            Utils.move_file(temp, filetosend)
            Utils.nco.ncks(input=filetosend, output=filetosend, options='-O --fix_rec_dmn region')

    Utils.move_file(filetosend, filepath)
def get_year(self, domain, var, startdate, member, year, grid=None, box=None):
    """
    Build a temporary file with the data of a variable for a given year.

    The files of all the chunks returned by get_year_chunks are fetched with get_file and concatenated
    when there are several of them. The result is then cut along the time dimension, from the first
    January record to the last December record of the requested year.

    :param domain: CMOR domain name
    :param var: variable name
    :param startdate: startdate, as a string accepted by parse_date
    :param member: member number
    :param year: year to retrieve
    :param grid: optional grid name
    :param box: optional box
    :return: path of the temporary file with the data of the year
    """
    chunk_files = [self.get_file(domain, var, startdate, member, chunk, grid=grid, box=box)
                   for chunk in self.get_year_chunks(startdate, year)]

    if len(chunk_files) > 1:
        temp = TempFile.get()
        Utils.nco.ncrcat(input=' '.join(chunk_files), output=temp)
        # The chunk files are temporary copies, no longer needed once concatenated
        for chunk_file in chunk_files:
            os.remove(chunk_file)
    else:
        temp = chunk_files[0]

    handler = Utils.openCdf(temp)
    try:
        time = Utils.get_datetime_from_netcdf(handler)
    finally:
        handler.close()

    start = None
    end = None
    for index in range(0, len(time)):
        date = time[index]
        if date.year != year:
            continue
        if date.month == 1:
            # Keep the first January record only: with several records per month, overwriting it on each
            # one would leave out most of January
            if start is None:
                start = index
        elif date.month == 12:
            # The last December record wins
            end = index

    # NOTE(review): start or end are still None here if the year is not fully covered by the data
    temp2 = TempFile.get()
    Utils.nco.ncks(input=temp, output=temp2, options='-O -d time,{0},{1}'.format(start, end))
    os.remove(temp)
    return temp2
def get_year_chunks(self, startdate, year):
    """
    Get the numbers of the chunks whose start date or end date fall in the given year.

    Chunks are numbered from 1 to num_chunks and are examined in order; the search stops at the first
    chunk that starts after the requested year.

    :param startdate: startdate, as a string accepted by parse_date
    :param year: year to look for
    :return: list of chunk numbers, empty if no chunk matches
    """
    date = parse_date(startdate)
    chunks = list()
    for chunk in range(1, self.num_chunks + 1):
        chunk_start = chunk_start_date(date, chunk, self.chunk_size, 'month', self.calendar)
        if chunk_start.year > year:
            # Later chunks start even later, so none of them can match
            break
        elif chunk_start.year == year or chunk_end_date(chunk_start, self.chunk_size, 'month',
                                                        self.calendar).year == year:
            chunks.append(chunk)
    return chunks
def get_full_years(self, startdate):
    """
    Get the list of years computed from the startdate, the chunk size and the number of chunks.

    The first year is the year of the startdate if it is in January and the following year otherwise.
    From there, one consecutive year is added for every step of a year's worth of chunks between the
    chunk index computed for the first January and num_chunks minus the chunks of one year.

    :param startdate: startdate, as a string accepted by parse_date
    :return: list of years, in ascending order
    """
    # Floor division keeps this an integer on Python 3 as well, as range() requires
    chunks_per_year = 12 // self.chunk_size
    date = parse_date(startdate)

    first_january = 0
    first_year = date.year
    if date.month != 1:
        # The year of the startdate is not complete: start counting from the next one
        month = date.month
        first_year += 1
        # NOTE(review): this bound and the upper limit of the range below look short by some chunks;
        # kept as they were, confirm the intended chunk indexing before changing them
        while month + self.chunk_size < 12:
            month += self.chunk_size
            first_january += 1

    year_count = len(range(first_january, self.num_chunks - chunks_per_year, chunks_per_year))
    return [first_year + offset for offset in range(year_count)]