Newer
Older
Javier Vegas-Regidor
committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
import glob
from datetime import datetime
import os
from autosubmit.config.log import Log
from autosubmit.date.chunk_date_lib import parse_date, chunk_start_date, chunk_end_date, previous_day
from earthdiagnostics.cmorizer import Cmorizer
from earthdiagnostics.datamanager import DataManager
from earthdiagnostics.utils import TempFile, Utils
class CMORManager(DataManager):
    """
    Data manager class for CMORized experiments

    Files are looked for under the folder returned by _get_startdate_path, following the layout
    <frequency>/<domain>/<var>[/<grid>]/r<member + 1>i1p1/<file>.nc

    The methods rely on self.config, self.experiment and self.cmor_path, which are not set in this class
    (presumably they are provided by DataManager).
    """

    def get_file_path(self, startdate, member, domain, var, chunk, frequency,
                      box=None, grid=None, year=None, date_str=None):
        """
        Returns the path to a concrete file

        The time bounds used in the file name are taken from the chunk if it is given, else from the year
        (only valid for the 'yr' frequency) and, as a last resort, from date_str.

        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param domain: file's domain
        :type domain: str
        :param var: file's var
        :type var: str
        :param chunk: file's chunk
        :type chunk: int
        :param frequency: file's frequency. If empty, the default frequency from the configuration is used
        :type frequency: str
        :param box: file's box
        :type box: Box
        :param grid: file's grid
        :type grid: str
        :param year: file's year. Only used if chunk is None
        :type year: int
        :param date_str: date string to add directly. Only used if neither chunk nor year are given
        :type date_str: str
        :return: path to the file
        :rtype: str
        :raises ValueError: if year is used with a frequency other than 'yr' or if chunk, year and date_str
                            are all missing
        """
        if not frequency:
            frequency = self.config.frequency
        var = self._get_final_var_name(box, var)

        domain_abbreviation = self.get_domain_abbreviation(domain, frequency)
        start = parse_date(startdate)
        member_plus = str(member + 1)

        member_path = os.path.join(self._get_startdate_path(startdate), frequency, domain)
        if chunk is not None:
            chunk_start = chunk_start_date(start, chunk, self.experiment.chunk_size, 'month', 'standard')
            chunk_end = chunk_end_date(chunk_start, self.experiment.chunk_size, 'month', 'standard')
            # The file name uses the month of the last day belonging to the chunk
            chunk_end = previous_day(chunk_end, 'standard')
            time_bound = "{0:04}{1:02}-{2:04}{3:02}".format(chunk_start.year, chunk_start.month,
                                                            chunk_end.year, chunk_end.month)
        elif year:
            # Compare by value: an identity check against a string literal depends on string interning
            if frequency != 'yr':
                raise ValueError('Year may be provided instead of chunk only if frequency is "yr"')
            time_bound = str(year)
        elif date_str:
            time_bound = date_str
        else:
            raise ValueError('Chunk, year and date_str can not be None at the same time')

        member_folder = 'r{0}i1p1'.format(member_plus)
        if grid:
            var_path = os.path.join(member_path, var, grid, member_folder)
        else:
            var_path = os.path.join(member_path, var, member_folder)

        file_name = '{0}_{1}_{2}_{3}_S{4}_r{5}i1p1_{6}.nc'.format(var, domain_abbreviation, self.experiment.model,
                                                                   self.experiment.experiment_name,
                                                                   startdate, member_plus, time_bound)
        return os.path.join(var_path, file_name)

    def link_file(self, domain, var, startdate, member, chunk=None, grid=None, box=None,
                  frequency=None, year=None, date_str=None, move_old=False):
        """
        Creates the link of a given file from the CMOR repository.

        The file path is computed with get_file_path and the link is created by _create_link.
        This method does not return anything.

        :param move_old: passed unchanged to _create_link
        :param date_str: date string to use as time bounds if neither chunk nor year are given
        :type date_str: str
        :param year: if frequency is yearly, this parameter is used to give the corresponding year
        :type year: int
        :param domain: CMOR domain
        :type domain: str
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: str
        """
        var = self._get_final_var_name(box, var)

        if not frequency:
            frequency = self.config.frequency
        domain = DataManager.correct_domain(domain)
        # Keyword arguments are required here: get_file_path takes box before grid, so passing grid, year and
        # date_str positionally shifts each of them into the wrong parameter.
        # The box is not forwarded because var already went through _get_final_var_name above.
        filepath = self.get_file_path(startdate, member, domain, var, chunk, frequency,
                                      grid=grid, year=year, date_str=date_str)
        self._create_link(domain, filepath, frequency, var, grid, move_old)

    def get_year(self, domain, var, startdate, member, year, grid=None, box=None):
        """
        Get a file containing all the data for one year for one variable

        The files for all the chunks returned by experiment.get_year_chunks are retrieved with get_file and
        concatenated if there is more than one. Then the time steps going from the first one in January to the
        last one in December of the requested year are extracted to a new temporary file. The intermediate files
        are removed.

        :param domain: variable's domain
        :type domain: str
        :param var: variable's name
        :type var: str
        :param startdate: startdate to retrieve
        :type startdate: str
        :param member: member to retrieve
        :type member: int
        :param year: year to retrieve
        :type year: int
        :param grid: variable's grid
        :type grid: str
        :param box: variable's box
        :type box: Box
        :return: path to the temporary file containing the requested year
        :rtype: str
        :raises ValueError: if the data has no time step in January or in December of the requested year
        """
        chunk_files = [self.get_file(domain, var, startdate, member, chunk, grid=grid, box=box)
                       for chunk in self.experiment.get_year_chunks(startdate, year)]

        if len(chunk_files) > 1:
            temp = TempFile.get()
            Utils.nco.ncrcat(input=' '.join(chunk_files), output=temp)
            for chunk_file in chunk_files:
                os.remove(chunk_file)
        else:
            temp = chunk_files[0]

        handler = Utils.openCdf(temp)
        try:
            time = Utils.get_datetime_from_netcdf(handler)
        finally:
            handler.close()

        start = None
        end = None
        for index, date in enumerate(time):
            if date.year != year:
                continue
            if date.month == 1:
                # Keep the first January time step: overwriting it on every match would discard most of
                # January when there are several time steps per month
                if start is None:
                    start = index
            elif date.month == 12:
                # Overwritten on purpose: the last December time step is the one wanted
                end = index

        if start is None or end is None:
            raise ValueError('File {0} does not contain data from January to December '
                             'of year {1}'.format(temp, year))

        temp2 = TempFile.get()
        Utils.nco.ncks(input=temp, output=temp2, options='-O -d time,{0},{1}'.format(start, end))
        os.remove(temp)
        return temp2

    def _is_cmorized(self, startdate, member):
        """
        Checks if there is CMOR data for a given startdate and member

        It is considered cmorized if any <frequency>/<domain>/<var> folder inside the startdate's CMOR folder
        contains an entry for the member.

        :param startdate: startdate to check
        :type startdate: str
        :param member: member to check
        :type member: int
        :return: True if a folder for the member is found, False otherwise
        :rtype: bool
        """
        startdate_path = self._get_startdate_path(startdate)
        if not os.path.exists(startdate_path):
            return False
        member_folder = 'r{0}i1p1'.format(member + 1)
        for freq in os.listdir(startdate_path):
            freq_path = os.path.join(startdate_path, freq)
            for domain in os.listdir(freq_path):
                domain_path = os.path.join(freq_path, domain)
                for var in os.listdir(domain_path):
                    if os.path.exists(os.path.join(domain_path, var, member_folder)):
                        return True
        return False

    # noinspection PyPep8Naming
    def prepare_CMOR_files(self):
        """
        Prepares the data to be used by the diagnostic.

        For each startdate and member of the experiment:

        - If CMOR data is already available and the cmorization is not forced, nothing is done.
        - If the cmorization is not forced and packed CMOR data is found, the procedure will unpack it.
        - In any other case, the cmorization of the ocean and atmosphere data is launched.

        :return:
        """
        for startdate, member in self.experiment.get_member_list():
            if self._is_cmorized(startdate, member) and not self.config.cmor.force:
                continue

            member_str = self.experiment.get_member_str(member)
            if not self.config.cmor.force and self._unpack_cmor_files(startdate):
                continue

            start_time = datetime.now()
            Log.info('CMORizing startdate {0} member {1}. Starting at {2}', startdate, member_str, start_time)
            cmorizer = Cmorizer(self, startdate, member)
            cmorizer.cmorize_ocean()
            cmorizer.cmorize_atmos()
            Log.result('CMORized startdate {0} member {1}! Elapsed time: {2}\n\n', startdate, member_str,
                       datetime.now() - start_time)

    def _unpack_cmor_files(self, startdate):
        """
        Unzips and unpacks the packed CMOR files available for a startdate

        The CMOR folder is created if it does not exist, even when there is nothing to unpack.

        :param startdate: startdate to unpack
        :type startdate: str
        :return: True if tar files were found and unpacked, False otherwise
        :rtype: bool
        """
        filepaths = self._get_packed_cmor_filepaths(startdate, 'tar.gz')
        if filepaths:
            Log.info('Unzipping cmorized data...')
            Utils.unzip(filepaths, True)

        if not os.path.exists(self.cmor_path):
            # makedirs instead of mkdir, so a missing parent folder does not make it fail
            os.makedirs(self.cmor_path)

        # Searched again after unzipping, so the tar files that were compressed are found too
        filepaths = self._get_packed_cmor_filepaths(startdate, 'tar')
        if not filepaths:
            return False

        Log.info('Unpacking cmorized data...')
        Utils.untar(filepaths, self.cmor_path)
        self._correct_paths(startdate)
        self._create_links(startdate)
        return True

    def _get_packed_cmor_filepaths(self, startdate, extension):
        """
        Returns the packed CMOR files of a startdate that have the given extension

        :param startdate: startdate to look for
        :type startdate: str
        :param extension: extension of the files, without the leading dot
        :type extension: str
        :return: paths matching the pattern in any of the folders searched
        :rtype: list[str]
        """
        tar_path = os.path.join(self.config.data_dir, self.experiment.expid, 'original_files', 'cmorfiles')
        tar_original_files = os.path.join(self.config.data_dir, 'original_files', self.experiment.expid,
                                          'cmorfiles')
        file_name = 'CMOR?_{0}_{1}_*.{2}'.format(self.experiment.expid, startdate, extension)

        filepaths = []
        for folder in (tar_path, os.path.join(tar_path, 'outputs'),
                       tar_original_files, os.path.join(tar_original_files, 'outputs')):
            filepaths += glob.glob(os.path.join(folder, file_name))
        return filepaths

    def _correct_paths(self, startdate):
        """
        Fixes the folder structure and the file names of the unpacked CMOR data

        - If the institute folder is inside an 'output' folder, it is moved to the CMOR folder and the 'output'
          folder is removed.
        - If the experiment name is different from the model name, the files found in <institute>/<model>/<model>
          are renamed and moved so they use the experiment name and the startdate.

        :param startdate: startdate to use in the corrected file names
        :type startdate: str
        """
        bad_path = os.path.join(self.cmor_path, 'output', self.experiment.institute)
        if os.path.exists(bad_path):
            Log.debug('Moving CMOR files out of the output folder')
            Utils.execute_shell_command(['mv', bad_path, os.path.join(bad_path, '..', '..')])
            os.rmdir(os.path.join(self.cmor_path, 'output'))
            Log.debug('Done')

        model = self.experiment.model
        experiment_name = self.experiment.experiment_name
        if experiment_name != model:
            bad_path = os.path.join(self.cmor_path, self.experiment.institute, model, model)
            Log.debug('Correcting double model appearance')
            bad_name = '_{0}_output_'.format(model)
            good_name = '_{0}_{1}_S{2}_'.format(model, experiment_name, startdate)
            bad_folder = '/{0}/{0}'.format(model)
            good_folder = '/{0}/{1}'.format(model, experiment_name)
            # Bottom-up walk: every folder is already empty when it has to be removed
            for (dirpath, dirnames, filenames) in os.walk(bad_path, False):
                for filename in filenames:
                    filepath = os.path.join(dirpath, filename)
                    good = filepath.replace(bad_name, good_name).replace(bad_folder, good_folder)
                    Utils.move_file(filepath, good)
                os.rmdir(dirpath)
            Log.debug('Done')

    def _create_links(self, startdate):
        """
        Creates the links for all the CMOR files of a startdate

        Walks the <frequency>/<domain>/<var>/<member> folders of the startdate. Entries inside the member folder
        that are not files are treated as folders and a link is created for each entry they contain.

        :param startdate: startdate to link
        :type startdate: str
        """
        Log.info('Creating links for CMOR files ({0})', startdate)
        path = self._get_startdate_path(startdate)
        for freq in os.listdir(path):
            freq_path = os.path.join(path, freq)
            for domain in os.listdir(freq_path):
                domain_path = os.path.join(freq_path, domain)
                for var in os.listdir(domain_path):
                    var_path = os.path.join(domain_path, var)
                    for member in os.listdir(var_path):
                        member_path = os.path.join(var_path, member)
                        for name in os.listdir(member_path):
                            filepath = os.path.join(member_path, name)
                            if os.path.isfile(filepath):
                                self._create_link(domain, filepath, freq, var, "", False)
                            else:
                                for filename in os.listdir(filepath):
                                    self._create_link(domain, os.path.join(filepath, filename), freq, var, "",
                                                      False)
        Log.info('Links for CMOR files created')

    def _get_startdate_path(self, startdate):
        """
        Returns the path to the startdate's CMOR folder

        :param startdate: target startdate
        :type startdate: str
        :return: path to the startdate's CMOR folder
        :rtype: str
        """
        return os.path.join(self.config.data_dir, self.experiment.expid, 'cmorfiles', self.experiment.institute,
                            self.experiment.model, self.experiment.experiment_name, 'S' + startdate)