Newer
Older
Javier Vegas-Regidor
committed
import shutil
# import threading
#
# import netCDF4
import numpy as np
from autosubmit.config.log import Log
from autosubmit.date.chunk_date_lib import parse_date, chunk_start_date, chunk_end_date, previous_day
Javier Vegas-Regidor
committed
from earthdiagnostics import Utils, TempFile
def __init__(self, institution, model, expid, datafolder, frequency, chunk_size, experiment_name):
    """
    Store the description of the experiment whose data is going to be managed.

    :param institution: institution name, kept as ``self.institution``
    :param model: model name, kept as ``self.model``
    :param expid: experiment identifier, kept as ``self.expid``
    :param datafolder: root folder of the data, kept as ``self.data_dir``
    :param frequency: frequency label, kept as ``self.frequency``
    :param chunk_size: size of the chunks, kept as ``self.chunk_size``
    :param experiment_name: experiment name, kept as ``self.experiment_name``
    """
    # Where the data lives
    self.data_dir = datafolder
    self.expid = expid

    # Who produced it and under which name
    self.institution, self.model = institution, model
    self.experiment_name = experiment_name

    # How it is split in time
    self.frequency, self.chunk_size = frequency, chunk_size

    # Both flags are enabled by default
    self.add_startdate = self.add_name = True
# noinspection PyPep8Naming
def prepare_CMOR_files(self, startdates, members):
    """
    Verify that the experiment data folder exists before going on.

    Only the existence of ``<data_dir>/<expid>`` is checked by the active code;
    ``startdates`` and ``members`` are not used by it.

    :param startdates: startdates of the experiment
    :param members: members of the experiment
    :raises Exception: if the experiment folder does not exist
    """
    # Check if cmorized and convert if not
    experiment_dir = os.path.join(self.data_dir, self.expid)
    is_cmorized = os.path.exists(experiment_dir)
    if not is_cmorized:
        message = ('The experiment {0} is not CMORized. '
                   'Please, CMORize it and launch again.')
        raise Exception(message.format(self.expid))
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# for startdate in startdates:
# for member in members:
# member_path = os.path.join(self.data_dir, self.expid, startdate, 'fc{0}'.format(member), 'outputs')
# Log.info('Preparing CMOR files for startdate {0} and member {1}'.format(startdate, member))
#
# threads = list()
# numthreads = Utils.available_cpu_count()
# filepaths = glob.glob(os.path.join(member_path, '*.gz'))
# for numthread in range(0, numthreads):
# t = threading.Thread(target=DataManager._unzip,
# args=([filepaths[numthread::numthreads]]))
# threads.append(t)
# t.start()
#
# for t in threads:
# t.join()
#
# filepaths = glob.glob(os.path.join(member_path, '*.tar'))
# for numthread in range(0, numthreads):
# t = threading.Thread(target=DataManager._untar,
# args=(filepaths[numthread::numthreads], member_path))
# threads.append(t)
# t.start()
#
# for t in threads:
# t.join()
#
# if self.experiment_name != self.model:
# bad_path = os.path.join(member_path, 'output', self.institution, self.model, self.model)
# for (dirpath, dirnames, filenames) in os.walk(bad_path, False):
# for filename in filenames:
# filepath = os.path.join(dirpath, filename)
# good = filepath.replace('_{0}_output_'.format(self.model),
# '_{0}_{1}_'.format(self.model, self.experiment_name))
#
# good = good.replace('/{0}/{0}'.format(self.model),
# '/{0}/{1}'.format(self.model,
# self.experiment_name))
#
# Utils.move_file(filepath, good)
# os.rmdir(dirpath)
#
# good_dir = os.path.join(member_path, 'output', self.institution, self.model, self.experiment_name)
# for sdate in os.listdir(good_dir):
# for (dirpath, dirnames, filenames) in os.walk(os.path.join(good_dir, sdate), False):
# for filename in filenames:
# filepath = os.path.join(dirpath, filename)
# good = filepath.replace('_{0}_{1}_r'.format(self.model, self.experiment_name,sdate),
# '_{0}_{1}_{2}_r'.format(self.model, self.experiment_name, sdate))
# if good != filepath:
# Log.info('Moving {0} to {1}'.format(filename, good))
# Utils.move_file(filepath, good)
Javier Vegas-Regidor
committed
@staticmethod
def _unzip(files):
    """
    Run ``gunzip`` on every file of the given collection, one after another.

    :param files: iterable with the paths of the files to decompress
    """
    for gz_path in files:
        Log.debug('Unzipping {0}', gz_path)
        command = 'gunzip {0}'.format(gz_path)
        Utils.execute_shell_command(command)
@staticmethod
def _untar(files, member_path):
    """
    Unpack every tar file into ``member_path`` and delete the tar afterwards.

    :param files: iterable with the paths of the tar files to unpack
    :param member_path: folder passed to ``tar -C`` as extraction target
    """
    for tar_path in files:
        Log.debug('Unpacking {0}', tar_path)
        command = 'tar -xvf {0} -C {1}'.format(tar_path, member_path)
        Utils.execute_shell_command(command)
        # The archive is removed once the extraction command has been run
        os.remove(tar_path)
def get_files(self, startdate, member, chunk, domain, variables, grid=None):
    """
    Build the paths of the files that hold the requested variables for a chunk.

    No file is opened or copied: only the paths are computed.

    :param startdate: startdate, as a string accepted by ``parse_date``
    :param member: member number; the realization used in paths is ``member + 1``
    :param chunk: chunk number
    :param domain: domain name; 'seaIce' is abbreviated as 'OI', any other domain
        as its first letter in upper case
    :param variables: iterable with the names of the variables
    :param grid: optional grid name; when given, it adds a folder level after the variable
    :return: list containing a single list with one path per variable
    """
    domain_abbreviation = 'OI' if domain == 'seaIce' else domain[0].upper()

    first_date = parse_date(startdate)
    realization = str(member + 1)
    base_path = os.path.join(self.data_dir, self.expid, startdate, 'fc' + str(member), 'outputs', 'output',
                             self.institution, self.model, self.experiment_name, 'S' + startdate, self.frequency,
                             domain)

    chunk_start = chunk_start_date(first_date, chunk, self.chunk_size, 'month', 'standard')
    next_chunk_start = chunk_end_date(chunk_start, self.chunk_size, 'month', 'standard')
    # The chunk ends the day before the following chunk starts
    chunk_end = previous_day(next_chunk_start, 'standard')

    first_month = "{0:04}{1:02}".format(chunk_start.year, chunk_start.month)
    last_month = "{0:04}{1:02}".format(chunk_end.year, chunk_end.month)
    realization_folder = 'r{0}i1p1'.format(realization)

    def path_for(var):
        # Folder of the variable, with the optional grid level
        if grid:
            folder = os.path.join(base_path, var, grid, realization_folder)
        else:
            folder = os.path.join(base_path, var, realization_folder)
        file_name = ('{0}_{1}{2}_{3}_{4}_S{5}_r{6}i1p1_'
                     '{7}-{8}.nc'.format(var, domain_abbreviation, self.frequency, self.model,
                                         self.experiment_name, startdate, realization,
                                         first_month, last_month))
        return os.path.join(folder, file_name)

    return [[path_for(var) for var in variables]]
Javier Vegas-Regidor
committed
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
def get_file(self, domain, var, startdate, member, chunk, grid=None):
    """
    Copy the file of a variable for a given chunk to a temporary file.

    :param domain: domain name; 'seaIce' is abbreviated as 'OI', any other domain
        as its first letter in upper case
    :param var: name of the variable
    :param startdate: startdate, as a string accepted by ``parse_date``
    :param member: member number; the realization used in paths is ``member + 1``
    :param chunk: chunk number
    :param grid: optional grid name; when given, it adds a folder level after the variable
    :return: path obtained from ``TempFile.get()``, holding a copy of the requested file
    """
    domain_abbreviation = 'OI' if domain == 'seaIce' else domain[0].upper()

    first_date = parse_date(startdate)
    realization = str(member + 1)
    base_path = os.path.join(self.data_dir, self.expid, startdate, 'fc' + str(member), 'outputs', 'output',
                             self.institution, self.model, self.experiment_name, 'S' + startdate, self.frequency,
                             domain)

    chunk_start = chunk_start_date(first_date, chunk, self.chunk_size, 'month', 'standard')
    next_chunk_start = chunk_end_date(chunk_start, self.chunk_size, 'month', 'standard')
    # The chunk ends the day before the following chunk starts
    chunk_end = previous_day(next_chunk_start, 'standard')

    realization_folder = 'r{0}i1p1'.format(realization)
    folder_levels = (var, grid, realization_folder) if grid else (var, realization_folder)
    folder = os.path.join(base_path, *folder_levels)

    first_month = "{0:04}{1:02}".format(chunk_start.year, chunk_start.month)
    last_month = "{0:04}{1:02}".format(chunk_end.year, chunk_end.month)
    file_name = ('{0}_{1}{2}_{3}_{4}_S{5}_r{6}i1p1_'
                 '{7}-{8}.nc'.format(var, domain_abbreviation, self.frequency, self.model,
                                     self.experiment_name, startdate, realization,
                                     first_month, last_month))
    source_path = os.path.join(folder, file_name)

    # Callers work on a private copy, never on the stored file
    local_copy = TempFile.get()
    shutil.copyfile(source_path, local_copy)
    return local_copy
def send_file(self, filetosend, domain, var, startdate, member, chunk, grid=None, region=None):
    """
    Move a file to its final location inside the experiment data folder.

    The file is first passed to ``Utils.convert2netcdf4``. When ``region`` is given,
    the variable is stored with an extra trailing 'region' dimension:

    * if the destination file does not exist yet, the variable of ``filetosend`` is
      rewritten with the 'region' dimension and ``region`` as its first entry;
    * if it already exists, a copy of the stored file is updated with the data for
      ``region`` (a new entry is appended when the region is not present yet) and
      that updated copy is the one finally stored.

    :param filetosend: path of the file to store; it is moved, not copied
    :param domain: domain name; 'seaIce' is abbreviated as 'OI', any other domain
        as its first letter in upper case
    :param var: name of the variable
    :param startdate: startdate, as a string accepted by ``parse_date``
    :param member: member number; the realization used in paths is ``member + 1``
    :param chunk: chunk number
    :param grid: optional grid name; when given, it adds a folder level after the variable
    :param region: optional region name the data of ``filetosend`` belongs to
    """
    Utils.convert2netcdf4(filetosend)

    if domain == 'seaIce':
        domain_abreviattion = 'OI'
    else:
        domain_abreviattion = domain[0].upper()

    start = parse_date(startdate)
    member_plus = str(member + 1)
    member_path = os.path.join(self.data_dir, self.expid, startdate, 'fc' + str(member), 'outputs', 'output',
                               self.institution, self.model, self.experiment_name, 'S' + startdate, self.frequency,
                               domain)
    chunk_start = chunk_start_date(start, chunk, self.chunk_size, 'month', 'standard')
    chunk_end = chunk_end_date(chunk_start, self.chunk_size, 'month', 'standard')
    # The chunk ends the day before the following chunk starts
    chunk_end = previous_day(chunk_end, 'standard')

    if grid:
        var_path = os.path.join(member_path, var, grid, 'r{0}i1p1'.format(member_plus))
    else:
        var_path = os.path.join(member_path, var, 'r{0}i1p1'.format(member_plus))

    filepath = os.path.join(var_path, '{0}_{1}{2}_{3}_{4}_S{5}_r{6}i1p1_'
                                      '{7}-{8}.nc'.format(var, domain_abreviattion, self.frequency, self.model,
                                                          self.experiment_name, startdate, member_plus,
                                                          "{0:04}{1:02}".format(chunk_start.year,
                                                                                chunk_start.month),
                                                          "{0:04}{1:02}".format(chunk_end.year,
                                                                                chunk_end.month)))
    if region:
        if not os.path.exists(filepath):
            # First region stored for this file: add the 'region' dimension to the file being sent
            handler = Utils.openCdf(filetosend)
            try:
                handler.createDimension('region')
                var_region = handler.createVariable('region', str, 'region')
                var_region[0] = region

                original_var = handler.variables[var]
                new_var = handler.createVariable('new_var', original_var.datatype,
                                                 original_var.dimensions + ('region',))
                new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()})
                new_var[..., 0] = original_var[:]
            finally:
                # Close the dataset even on error so the handle is not leaked
                handler.close()

            # Drop the variable without region and give its name to the new one
            Utils.nco.ncks(input=filetosend, output=filetosend, options='-O -x -v {0}'.format(var))
            Utils.rename_variable(filetosend, 'new_var', var)
        else:
            # The stored file already has regions: update a copy of it with the new data
            temp = TempFile.get()
            shutil.copyfile(filepath, temp)
            handler = Utils.openCdf(temp)
            try:
                handler_send = Utils.openCdf(filetosend)
                try:
                    value = handler_send.variables[var][:]
                finally:
                    handler_send.close()

                var_region = handler.variables['region']
                basin_index = np.where(var_region[:] == region)
                if len(basin_index[0]) == 0:
                    # Unknown region: append it, its index is the new last position
                    var_region[var_region.shape[0]] = region
                    basin_index = var_region.shape[0] - 1
                else:
                    basin_index = basin_index[0][0]

                # Write into the copy of the stored file (the one that has the region
                # dimension and replaces filetosend below), not into the file being sent
                handler.variables[var][..., basin_index] = value
            finally:
                handler.close()
            Utils.move_file(temp, filetosend)

    Utils.move_file(filetosend, filepath)
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
#
# set -vx
#
# exp=a034
# dir=/group_workspaces/jasmin2/primavera1/WP2/CPL/EC-EARTH3.1/HIST_T511ORCA025
# res=$( echo $dir | cut -f3 -d"_" )
# model=$(echo $dir | cut -f7 -d"/" )
#
# cd /esnas/exp/ecearth/$exp/
#
# for sd in [1-2]*
# do
# cd /esnas/exp/ecearth/$exp/$sd
# for fc in *
# do
# cd /esnas/exp/ecearth/$exp/$sd/$fc/outputs
# realization=$(( $(echo $fc | cut -c 3-) + 1 ))
# outdir=/esnas/exp/ecearth/$exp/$sd/$fc/outputs/primavera
# mkdir -p $outdir
# for tarfile in MMO*tar
# do
# lt=$(echo $tarfile | cut -f 5-6 -d"_" | cut -f1 -d"." )
# tar kxvf $tarfile -C $outdir
# cd $outdir
# for file in ${exp}_1m_*grid*gz ${exp}_*icemod*gz
# do
# echo $file
# gunzip $file
# for var in $( cdo showvar ${file%???} )
# do
# varnew=""
# case $var in
# votemper) varnew=thetao; realm=Omon ; modeling_realm="ocean" ; units="C";;
# vosaline) varnew=so; realm=Omon ; modeling_realm="ocean" ;;
# sosstsst) varnew=tos; realm=Omon ; modeling_realm="ocean" ; units="C";;
# sosaline) varnew=sos; realm=Omon ; modeling_realm="ocean" ;;
# sossheig) varnew=zos; realm=Omon ; modeling_realm="ocean" ;;
# vozocrtx) varnew=uo; realm=Omon ; modeling_realm="ocean" ;;
# vomecrty) varnew=vo; realm=Omon ; modeling_realm="ocean" ;;
# iiceconc) varnew=sic; realm=OImon ; modeling_realm="seaIce" ;;
# iicethic) varnew=sit; realm=OImon ; modeling_realm="seaIce" ;;
# isnowthi) varnew=snd; realm=OImon ; modeling_realm="seaIce" ;;
# iicesurt) varnew=tsice; realm=OImon ; modeling_realm="seaIce" ; units="C";;
# esac
# outfile=$(echo ${varnew}_${realm}_${model}_${res}_BSC_${realization}_${lt}.nc )
# if [[ ! -z $varnew ]] && [[ ! -f ${outdir}/${outfile} ]]; then #check if var belongs to list
# cdo selvar,$var ${file%???} ${outdir}/$outfile
# ncrename -v .time_counter,time -d time_counter,time -v nav_lon,longitude -v nav_lat,latitude -v .${var},${varnew} ${outdir}/${outfile}
# for att in institution institute_id experiment_id model_id contact experiment frequency creation_date project_id modeling_realm realization
# do
# case $att in
# institution) att_val="Barcelona Supercomputing Center - Centro Nacional de Supercomputacion" ;;
# institute_id) att_val="BSC-CNS" ;;
# experiment_id) att_val=$(echo $dir | cut -f7 -d"/") ;;
# model_id) att_val="$model" ;;
# contact) att_val="pierre-antoine.bretonniere@bsc.es" ;;
# experiment) att_val=$(echo $dir | cut -f7 -d"/") ;;
# frequency) att_val="mon" ;;
# creation_date) att_val="$(echo $(date --rfc-3339=seconds | cut -f1 -d"+" | sed -e 's/ /T/')Z)" ;;
# project_id) att_val="PRIMAVERA" ;;
# modeling_realm) att_val="$modeling_realm" ;;
# realization) att_val="$realization" ;;
# esac
# ncatted -O -h -a ${att},global,c,c,"${att_val}" ${outdir}/$outfile
# if [[ "$units" == "C" ]]; then
# cdo addc,273.15 ${outdir}/$outfile ${outdir}/${outfile}.tmp
# mv ${outdir}/${outfile}.tmp ${outdir}/${outfile}
# ncatted -O -h -a units,${varnew},m,c,"K" ${outdir}/$outfile
# fi #temp from C to K
# done #attributes
# fi
# done #var
# rm ${file%???}
# done #file
# cd /esnas/exp/ecearth/$exp/$sd/$fc/outputs
# done #tarfile
# scp -i ~/.ssh/id_rsa_jasmin ${outdir}/*Omon*nc pabretonniere@jasmin-xfer1.ceda.ac.uk:$dir/.
# scp -i ~/.ssh/id_rsa_jasmin ${outdir}/*OImon*nc pabretonniere@jasmin-xfer1.ceda.ac.uk:$dir/.
# done #fc
# done #sd