Newer
Older
Javier Vegas-Regidor
committed
import glob
from datetime import datetime
import os
from bscearth.utils.log import Log
from bscearth.utils.date import parse_date, chunk_start_date, chunk_end_date, previous_day
Javier Vegas-Regidor
committed
from datafile import StorageStatus
from diagnostic import Diagnostic
Javier Vegas-Regidor
committed
from earthdiagnostics.cmorizer import Cmorizer
from earthdiagnostics.datamanager import DataManager
from earthdiagnostics.frequency import Frequencies, Frequency
from earthdiagnostics.modelingrealm import ModelingRealms
Javier Vegas-Regidor
committed
from earthdiagnostics.utils import TempFile, Utils
from earthdiagnostics.variable_type import VariableType
Javier Vegas-Regidor
committed
class CMORManager(DataManager):
"""
Data manager class for CMORized experiments
"""
def __init__(self, config):
    """
    Locate the experiment's data folder and set up the CMOR manager.

    ``config.data_dir`` may contain several candidate folders separated by ``:``. For each candidate, in order,
    the experiment folder (``expid``) is looked for:

    * directly inside the candidate,
    * inside ``<candidate>/<model lower-cased without '-'>``,
    * inside ``<candidate>/<data_type>/<experiment folder>`` (``ecearth`` for any EC-Earth model).

    The first match wins and replaces ``config.data_dir``.

    :param config: diagnostics configuration
    :raises Exception: if no candidate folder contains the experiment
    """
    super(CMORManager, self).__init__(config)
    self._dic_cmorized = dict()
    data_folders = self.config.data_dir.split(':')
    experiment_folder = self.experiment.model.lower()
    if experiment_folder.startswith('ec-earth'):
        experiment_folder = 'ecearth'
    expid = self.experiment.expid
    self.config.data_dir = None
    for data_folder in data_folders:
        if os.path.isdir(os.path.join(data_folder, expid)):
            self.config.data_dir = data_folder
            break
        test_folder = os.path.join(data_folder, self.experiment.model.lower().replace('-', ''))
        if os.path.isdir(os.path.join(test_folder, expid)):
            self.config.data_dir = test_folder
            break
        test_folder = os.path.join(data_folder, self.config.data_type, experiment_folder)
        if os.path.isdir(os.path.join(test_folder, expid)):
            self.config.data_dir = test_folder
            # Stop at the first match, like the two checks above: otherwise a later candidate folder could
            # silently override the one found here
            break
    if not self.config.data_dir:
        raise Exception('Can not find model data')
    self.cmor_path = os.path.join(self.config.data_dir, expid, 'cmorfiles')
def file_exists(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
                vartype=VariableType.MEAN, possible_versions=None):
    """
    Check whether a CMOR file is present in the repository.

    :param domain: CMOR domain
    :type domain: Domain
    :param var: variable name
    :type var: str
    :param startdate: file's startdate
    :type startdate: str
    :param member: file's member
    :type member: int
    :param chunk: file's chunk
    :type chunk: int
    :param grid: file's grid (only needed if it is not the original)
    :type grid: str|NoneType
    :param box: accepted for interface compatibility; not used to build the path
    :param frequency: file's frequency (only needed if it is different from the default)
    :type frequency: Frequency|NoneType
    :param vartype: accepted for interface compatibility; not used to build the path
    :param possible_versions: alternative version strings; each one is substituted for the configured CMOR
                              version in the path and tried in turn
    :type possible_versions: list[str]|NoneType
    :return: True if the file (or one of its alternative versions) exists, False otherwise
    :rtype: bool
    """
    cmor_var = self.variable_list.get_variable(var)
    filepath = self.get_file_path(startdate, member, domain, var, cmor_var, chunk, frequency, grid, None, None)
    if possible_versions is None:
        try:
            return os.path.isfile(filepath)
        except (TypeError, ValueError):
            return False

    for version in possible_versions:
        try:
            if os.path.isfile(filepath.replace(self.config.cmor.version, version)):
                return True
        except (TypeError, ValueError):
            # e.g. no CMOR version configured (None) or a non-string version: treat as "not found"
            continue
    # Previously the function fell off the end here and returned None
    return False
def request_chunk(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None, vartype=None):
    """
    Request one chunk file of the CMOR repository from storage.

    :param domain: CMOR domain
    :type domain: Domain
    :param var: variable name
    :type var: str
    :param startdate: file's startdate
    :type startdate: str
    :param member: file's member
    :type member: int
    :param chunk: file's chunk
    :type chunk: int
    :param grid: file's grid (only needed if it is not the original)
    :type grid: str|NoneType
    :param box: file's box (only needed to retrieve sections or averages)
    :type box: Box
    :param frequency: file's frequency (only needed if it is different from the default)
    :type frequency: Frequency|NoneType
    :param vartype: unused, kept for interface compatibility
    :return: whatever ``_get_file_from_storage`` returns for the resolved path
    """
    # Variable metadata is looked up with the requested name; the box only changes the name used in the path
    variable = self.variable_list.get_variable(var)
    final_name = self._get_final_var_name(box, var)
    path = self.get_file_path(startdate, member, domain, final_name, variable, chunk, frequency, grid, None, None)
    return self._get_file_from_storage(path)
def request_year(self, diagnostic, domain, var, startdate, member, year, grid=None, box=None, frequency=None):
    """
    Request the data of a whole year, built by merging the experiment chunks.

    A ``MergeYear`` job is created: it requests the chunks it needs and declares the yearly file. The job is
    appended to ``diagnostic.subjobs`` only if its yearly file is not already flagged ``job_added``
    (presumably the declared file object is shared between identical requests - confirm).

    :param diagnostic: diagnostic asking for the data; receives the merge job as a subjob
    :type diagnostic: Diagnostic
    :param domain: CMOR domain
    :type domain: Domain
    :param var: variable name
    :type var: str
    :param startdate: file's startdate
    :type startdate: str
    :param member: file's member
    :type member: int
    :param year: year to build
    :type year: int
    :param grid: file's grid (only needed if it is not the original)
    :type grid: str|NoneType
    :param box: file's box (only needed to retrieve sections or averages)
    :type box: Box
    :param frequency: file's frequency (only needed if it is different from the default)
    :type frequency: Frequency|NoneType
    :return: None
    """
    merge_job = MergeYear(self, domain, var, startdate, member, year, grid, box, frequency)
    merge_job.request_data()
    merge_job.declare_data_generated()
    year_file = merge_job.year_file
    if year_file.job_added:
        return
    diagnostic.subjobs.append(merge_job)
    year_file.job_added = True
def declare_chunk(self, domain, var, startdate, member, chunk, grid=None, region=None, box=None, frequency=None,
                  vartype=VariableType.MEAN, diagnostic=None):
    """
    Declare a chunk file that will be generated by a diagnostic.

    :param domain: CMOR domain
    :type domain: Domain
    :param var: variable name; if the variable list knows it, its CMOR short name is used for the path
    :type var: str
    :param startdate: file's startdate
    :type startdate: str
    :param member: file's member
    :type member: int
    :param chunk: file's chunk
    :type chunk: int
    :param grid: file's grid (only needed if it is not the original)
    :type grid: str|NoneType
    :param region: region forwarded to the declared file
    :param box: file's box (only needed to retrieve sections or averages)
    :type box: Box
    :param frequency: file's frequency (only needed if it is different from the default)
    :type frequency: Frequency|NoneType
    :param vartype: Variable type (mean, statistic)
    :type vartype: VariableType
    :param diagnostic: diagnostic that generates the file
    :type diagnostic: Diagnostic|NoneType
    :return: the declared file, with its frequency set
    """
    if not frequency:
        frequency = self.config.frequency
    # Keep the name as given by the caller: it is forwarded to the declared file and may differ from the
    # CMOR short name used to build the path. It was previously used without ever being assigned (NameError).
    original_name = var
    cmor_var = self.variable_list.get_variable(var)
    if cmor_var:
        var = cmor_var.short_name
    final_name = self._get_final_var_name(box, var)
    filepath = self.get_file_path(startdate, member, domain, final_name, cmor_var, chunk, frequency, grid)
    netcdf_file = self._declare_generated_file(filepath, domain, final_name, cmor_var, self.config.data_convention,
                                               region, diagnostic, grid, vartype, original_name)
    netcdf_file.frequency = frequency
    return netcdf_file
def declare_year(self, domain, var, startdate, member, year, grid=None, box=None,
                 vartype=VariableType.MEAN, diagnostic=None):
    """
    Declare a yearly file that will be generated by a diagnostic.

    :param domain: CMOR domain
    :type domain: Domain
    :param var: variable name; if the variable list knows it, its CMOR short name is used for the path
    :type var: str
    :param startdate: file's startdate
    :type startdate: str
    :param member: file's member
    :type member: int
    :param year: year covered by the file
    :type year: int|str
    :param grid: file's grid (only needed if it is not the original)
    :type grid: str|NoneType
    :param box: file's box (only needed to retrieve sections or averages)
    :type box: Box
    :param vartype: Variable type (mean, statistic)
    :type vartype: VariableType
    :param diagnostic: diagnostic that generates the file
    :type diagnostic: Diagnostic|NoneType
    :return: the declared file, with its frequency set to yearly
    """
    original_name = var
    cmor_var = self.variable_list.get_variable(var)
    if cmor_var:
        var = cmor_var.short_name
    final_name = self._get_final_var_name(box, var)
    filepath = self.get_file_path(startdate, member, domain, final_name, cmor_var, None, Frequencies.yearly, grid,
                                  year=year)
    netcdf_file = self._declare_generated_file(filepath, domain, final_name, cmor_var, self.config.data_convention,
                                               None, diagnostic, grid, vartype, original_name)
    netcdf_file.frequency = Frequencies.yearly
    return netcdf_file
def get_file_path(self, startdate, member, domain, var, cmor_var, chunk, frequency,
                  grid=None, year=None, date_str=None):
    """
    Build the full path of a CMOR file in the repository.

    :param startdate: file's startdate
    :type startdate: str
    :param member: file's member
    :type member: int
    :param domain: file's domain
    :type domain: Domain
    :param var: file's variable name
    :type var: str
    :param cmor_var: variable instance describing the selected variable
    :type cmor_var: Variable
    :param chunk: file's chunk
    :type chunk: int|NoneType
    :param frequency: file's frequency; the configured default is used when empty
    :type frequency: Frequency|NoneType
    :param grid: file's grid
    :type grid: str|NoneType
    :param year: file's year
    :type year: int|str|NoneType
    :param date_str: date string to add directly. Overrides year or chunk configurations
    :type date_str: str|NoneType
    :return: path to the file
    :rtype: str
    """
    frequency = frequency or self.config.frequency
    folder = self._get_full_cmor_folder_path(startdate, member, domain, var, frequency, grid, cmor_var)
    name = self._get_cmor_file_name(startdate, member, domain, var, cmor_var, frequency,
                                    chunk, year, date_str, grid)
    return os.path.join(folder, name)
Javier Vegas-Regidor
committed
def _get_cmor_file_name(self, startdate, member, domain, var, cmor_var, frequency, chunk, year, date_str, grid):
    """
    Build the name of a CMOR file according to the configured data convention.

    The time part comes from the chunk if given, else from the year (only for yearly files), else from
    ``date_str``.

    * ``specs``/``preface``: ``<var>_<table>_<model>_<experiment>_S<startdate>_<member><time>``
    * ``primavera``/``cmip6``: ``<var>_<table>_<experiment>_<model>_<member>_<grid><time>``; a missing grid
      falls back to the configured default ocean or atmosphere grid.

    :raises ValueError: if chunk, year and date_str are all empty, or a year is given for a non-yearly file
    :raises Exception: if the data convention is not supported
    :rtype: str
    """
    convention = self.config.data_convention
    # Without variable metadata, the domain provides the CMOR table
    table_source = domain if cmor_var is None else cmor_var
    cmor_table = table_source.get_table(frequency, convention)

    if chunk is not None:
        bound = self._get_chunk_time_bounds(startdate, chunk)
    elif year:
        if frequency != Frequencies.yearly:
            raise ValueError('Year may be provided instead of chunk only if frequency is "yr"')
        bound = str(year)
    elif date_str:
        bound = date_str
    else:
        raise ValueError('Chunk, year and date_str can not be None at the same time')
    suffix = '_{0}.nc'.format(bound) if bound else '.nc'

    if convention in ('specs', 'preface'):
        return '{0}_{1}_{2}_{3}_S{4}_{5}{6}'.format(var, cmor_table.name, self.experiment.model,
                                                     self.experiment.experiment_name, startdate,
                                                     self._get_member_str(member), suffix)
    if convention in ('primavera', 'cmip6'):
        if not grid:
            ocean_like = domain in [ModelingRealms.ocnBgchem, ModelingRealms.seaIce, ModelingRealms.ocean]
            grid = self.config.cmor.default_ocean_grid if ocean_like else self.config.cmor.default_atmos_grid
        return '{0}_{1}_{2}_{3}_{4}_{5}{6}'.format(var, cmor_table.name, self.experiment.experiment_name,
                                                    self.experiment.model, self._get_member_str(member),
                                                    grid, suffix)
    raise Exception('Data convention {0} not supported'.format(convention))
def _get_full_cmor_folder_path(self, startdate, member, domain, var, frequency, grid, cmor_var):
    """
    Build the folder that holds the files of a CMOR variable.

    * ``specs``/``preface``: ``<startdate path>/<frequency>/<domain>/<var>[/<grid>]/<member>[/<version>]``
    * any other convention: ``<startdate path>/<member>/<table>/<var>/<grid>[/<version>]``; a missing grid
      falls back to the configured default ocean or atmosphere grid.

    The version level is only added when a CMOR version is configured.

    :param startdate: file's startdate
    :type startdate: str
    :param member: file's member
    :type member: int
    :param domain: file's domain
    :type domain: Domain
    :param var: file's variable name
    :type var: str
    :param frequency: file's frequency
    :type frequency: Frequency
    :param grid: file's grid
    :type grid: str|NoneType
    :param cmor_var: variable instance; provides the CMOR table for non-specs conventions
    :type cmor_var: Variable
    :return: folder path
    :rtype: str
    """
    convention = self.config.data_convention
    if convention in ('specs', 'preface'):
        parts = [self._get_startdate_path(startdate), str(frequency), domain.name, var]
        if grid:
            parts.append(grid)
        parts.append(self._get_member_str(member))
    else:
        if not grid:
            if domain in [ModelingRealms.ocnBgchem, ModelingRealms.seaIce, ModelingRealms.ocean]:
                grid = self.config.cmor.default_ocean_grid
            else:
                grid = self.config.cmor.default_atmos_grid
        # The visible code was truncated after ``var``; the computed grid is the next level of the layout
        parts = [self._get_startdate_path(startdate), self._get_member_str(member),
                 cmor_var.get_table(frequency, convention).name, var, grid]
    # Guarded in every convention: joining a None version would raise, and '' would add a trailing separator
    if self.config.cmor.version:
        parts.append(self.config.cmor.version)
    return os.path.join(*parts)
def _get_chunk_time_bounds(self, startdate, chunk):
    """
    Build the time-bound part of a chunk file name: ``YYYYMM<sep>YYYYMM``.

    The separator is ``_`` for the ``preface`` convention and ``-`` otherwise.

    :param startdate: experiment startdate
    :type startdate: str
    :param chunk: chunk number
    :type chunk: int
    :rtype: str
    """
    start = parse_date(startdate)
    chunk_start = chunk_start_date(start, chunk, self.experiment.chunk_size, 'month', self.experiment.calendar)
    chunk_end = chunk_end_date(chunk_start, self.experiment.chunk_size, 'month', self.experiment.calendar)
    # chunk_end_date presumably returns the first day after the chunk; step back so the bound names the
    # chunk's last month
    chunk_end = previous_day(chunk_end, self.experiment.calendar)
    separator = '_' if self.config.data_convention == 'preface' else '-'
    # The visible statement was truncated; placeholders {3} and {4} require the end month and the separator
    return "{0:04}{1:02}{4}{2:04}{3:02}".format(chunk_start.year, chunk_start.month, chunk_end.year,
                                                chunk_end.month, separator)
Javier Vegas-Regidor
committed
def link_file(self, domain, var, cmor_var, startdate, member, chunk=None, grid=None,
              frequency=None, year=None, date_str=None, move_old=False, vartype=VariableType.MEAN):
    """
    Create the link of a given file from the CMOR repository.

    :param domain: CMOR domain
    :type domain: Domain
    :param var: variable name
    :type var: str
    :param cmor_var: variable instance describing the selected variable
    :type cmor_var: Variable
    :param startdate: file's startdate
    :type startdate: str
    :param member: file's member
    :type member: int
    :param chunk: file's chunk
    :type chunk: int|NoneType
    :param grid: file's grid (only needed if it is not the original)
    :type grid: str|NoneType
    :param frequency: file's frequency (only needed if it is different from the default)
    :type frequency: Frequency|NoneType
    :param year: if frequency is yearly, this parameter is used to give the corresponding year
    :type year: int|NoneType
    :param date_str: date string used in the file name when neither chunk nor year are given
    :type date_str: str|NoneType
    :param move_old: forwarded to ``create_link``
    :type move_old: bool
    :param vartype: Variable type (mean, statistic)
    :type vartype: VariableType
    :return: None
    """
    if not frequency:
        frequency = self.config.frequency
    # str(None) is the truthy string 'None', which get_file_path would take as a real year and so ignore
    # date_str (or raise for non-yearly files); only convert an actual year
    year_str = str(year) if year is not None else None
    filepath = self.get_file_path(startdate, member, domain, var, cmor_var, chunk, frequency,
                                  grid=grid, year=year_str, date_str=date_str)
    self.create_link(domain, filepath, frequency, var, grid, move_old, vartype)
Javier Vegas-Regidor
committed
# noinspection PyPep8Naming
def prepare(self):
    """
    Prepare the CMOR data of every startdate/member of the experiment.

    For each member, already available or transferred (tar / tar.gz) CMOR data is unpacked first; if that
    does not make the member's data available, the member is cmorized from the original model output.

    :return: None
    """
    for startdate, member in self.experiment.get_member_list():
        # _unpack_cmor_files returns True when the member's CMOR data is considered ready
        if not self._unpack_cmor_files(startdate, member):
            self._cmorize_member(startdate, member)
def is_cmorized(self, startdate, member, chunk, domain):
    """
    Tell whether a chunk of a member has CMOR data for a domain.

    The answer is computed once per (startdate, member, chunk, domain) and cached in ``_dic_cmorized``.

    :rtype: bool
    """
    per_domain = self._dic_cmorized.setdefault((startdate, member, chunk), {})
    if domain not in per_domain:
        per_domain[domain] = self._is_cmorized(startdate, member, chunk, domain)
    return per_domain[domain]
def _is_cmorized(self, startdate, member, chunk, domain):
    """
    Look on disk for at least one CMOR file of the given chunk and domain.

    * ``specs``/``preface`` layout: every ``<frequency>/<domain>/<var>`` folder of the startdate is probed.
    * other conventions: only the variables of the domain's monthly table for the member are probed.

    :param startdate: startdate to check
    :type startdate: str
    :param member: member to check
    :type member: int
    :param chunk: chunk to check
    :type chunk: int
    :param domain: domain to check
    :type domain: Domain
    :return: True as soon as one matching file exists, False otherwise
    :rtype: bool
    """
    startdate_path = self._get_startdate_path(startdate)
    # Without this early return, the listdir below would crash on a missing folder
    if not os.path.isdir(startdate_path):
        return False

    # 'preface' shares the specs folder layout (see _get_full_cmor_folder_path)
    if self.config.data_convention in ('specs', 'preface'):
        for freq in os.listdir(startdate_path):
            domain_path = os.path.join(startdate_path, freq, domain.name)
            if not os.path.isdir(domain_path):
                continue
            for var in os.listdir(domain_path):
                cmor_var = self.variable_list.get_variable(var, True)
                var_path = self.get_file_path(startdate, member, domain, var, cmor_var, chunk, Frequency(freq))
                if os.path.isfile(var_path):
                    return True
    else:
        member_path = os.path.join(startdate_path, self._get_member_str(member))
        if not os.path.isdir(member_path):
            return False
        freq = Frequencies.monthly
        table = domain.get_table(freq, self.config.data_convention)
        # get_table returns a table object (its .name is used elsewhere for paths), not a folder name
        table_dir = os.path.join(member_path, table.name)
        if not os.path.isdir(table_dir):
            return False
        for var in os.listdir(table_dir):
            cmor_var = self.variable_list.get_variable(var, True)
            var_path = self.get_file_path(startdate, member, domain, var, cmor_var, chunk, frequency=freq)
            if os.path.isfile(var_path):
                return True
    return False
def _cmorize_member(self, startdate, member):
    """
    Cmorize the original model output of one member (ocean first, then atmosphere), logging the time spent.

    :param startdate: member's startdate
    :type startdate: str
    :param member: member to cmorize
    :type member: int
    """
    start_time = datetime.now()
    member_str = self.experiment.get_member_str(member)
    # The start time is the third argument: the message previously repeated {0} (the startdate) instead of {2}
    Log.info('CMORizing startdate {0} member {1}. Starting at {2}', startdate, member_str, start_time)
    cmorizer = Cmorizer(self, startdate, member)
    cmorizer.cmorize_ocean()
    cmorizer.cmorize_atmos()
    Log.result('CMORized startdate {0} member {1}! Elapsed time: {2}\n\n', startdate, member_str,
               datetime.now() - start_time)
def _unpack_cmor_files(self, startdate, member):
    """
    Make the CMOR data of a member available, unpacking transferred archives if needed.

    Chunks already cmorized on disk (atmosphere or ocean) are skipped, unless ``cmor.force_untar`` is set. Then
    the archives of the following chunks are unpacked until one chunk has none.

    :param startdate: member's startdate
    :type startdate: str
    :param member: member to prepare
    :type member: int
    :return: False if ``cmor.force`` is set. Otherwise True when at least one chunk was unpacked, or when every
             chunk up to ``num_chunks`` already has data; False otherwise.
    :rtype: bool
    """
    if self.config.cmor.force:
        return False
    chunk = 1
    cmorized = False

    if not self.config.cmor.force_untar:
        while self.is_cmorized(startdate, member, chunk, ModelingRealms.atmos) or \
                self.is_cmorized(startdate, member, chunk, ModelingRealms.ocean):
            chunk += 1

    while self._unpack_chunk(startdate, member, chunk):
        chunk += 1
        cmorized = True

    # Chunks are numbered from 1 and `chunk` is now the first one without data, so the member is complete
    # only once it has gone past the last chunk. (`num_chunks <= chunk` wrongly accepted a missing last chunk.)
    if chunk > self.experiment.num_chunks:
        cmorized = True
    if cmorized:
        Log.info('Startdate {0} member {1} ready', startdate, member)
    return cmorized
def _unpack_chunk(self, startdate, member, chunk):
    """
    Unpack the transferred CMOR archives of one chunk into ``cmor_path``.

    Compressed (``tar.gz``) archives are decompressed first. Then the ``tar`` archives are extracted and the
    member's links are created. Both steps only run if cmorization of the chunk is requested in the
    configuration.

    :param startdate: chunk's startdate
    :type startdate: str
    :param member: chunk's member
    :type member: int
    :param chunk: chunk number
    :type chunk: int
    :return: True if ``tar`` archives exist for the chunk, or ``tar.gz`` archives exist but the chunk's
             cmorization was not requested; False otherwise
    :rtype: bool
    """
    gz_files = self._get_transferred_cmor_data_filepaths(startdate, member, chunk, 'tar.gz')
    if gz_files:
        if not self.config.cmor.chunk_cmorization_requested(chunk):
            return True
        Log.info('Unzipping cmorized data for {0} {1} {2}...', startdate, member, chunk)
        Utils.unzip(gz_files, True)

    if not os.path.exists(self.cmor_path):
        os.mkdir(self.cmor_path)

    tar_files = self._get_transferred_cmor_data_filepaths(startdate, member, chunk, 'tar')
    if not tar_files:
        return False
    if self.config.cmor.chunk_cmorization_requested(chunk):
        Log.info('Unpacking cmorized data for {0} {1} {2}...', startdate, member, chunk)
        Utils.untar(tar_files, self.cmor_path)
        self.create_links(startdate, member)
    return True
def _get_transferred_cmor_data_filepaths(self, startdate, member, chunk, extension):
    """
    List the transferred CMOR archives of a chunk.

    Files named ``CMOR?_<expid>_<startdate>_<member>_<chunk start>-*.<extension>`` are searched in
    ``<data_dir>/<expid>/original_files/cmorfiles`` and ``<data_dir>/original_files/<expid>/cmorfiles``,
    and in the ``outputs`` subfolder of each, in that order.

    :param extension: archive extension, e.g. ``tar`` or ``tar.gz``
    :type extension: str
    :return: matching paths
    :rtype: list[str]
    """
    expid = self.experiment.expid
    roots = (os.path.join(self.config.data_dir, expid, 'original_files', 'cmorfiles'),
             os.path.join(self.config.data_dir, 'original_files', expid, 'cmorfiles'))
    pattern = 'CMOR?_{0}_{1}_{2}_{3}-*.{4}'.format(expid, startdate,
                                                    self.experiment.get_member_str(member),
                                                    self.experiment.get_chunk_start_str(startdate, chunk),
                                                    extension)
    found = []
    for root in roots:
        found.extend(glob.glob(os.path.join(root, pattern)))
        found.extend(glob.glob(os.path.join(root, 'outputs', pattern)))
    return found
Javier Vegas-Regidor
committed
self._remove_extra_output_folder()
Javier Vegas-Regidor
committed
def _fix_model_as_experiment_error(self, startdate):
    """
    Repair CMOR files written with the model name in place of the experiment name.

    Files under ``<cmor_path>/<institute>/<model>/<model>`` whose name does not already contain
    ``_S<startdate>_`` are moved from the ``<model>/<model>`` folder to ``<model>/<experiment>``. In their
    name, ``_<model>_output_`` becomes ``_<model>_<experiment>_S<startdate>_``. Folders left empty are removed.
    Nothing is done when the experiment name equals the model name.

    :param startdate: startdate whose files are repaired
    :type startdate: str
    """
    model = self.experiment.model
    experiment_name = self.experiment.experiment_name
    if experiment_name == model:
        return
    bad_path = os.path.join(self.cmor_path, self.experiment.institute, model, model)
    Log.debug('Correcting double model appearance')
    # Bottom-up walk (topdown=False) so subfolders are handled, and possibly removed, before their parents
    for dirpath, _, filenames in os.walk(bad_path, False):
        for filename in filenames:
            if '_S{0}_'.format(startdate) in filename:
                continue
            filepath = os.path.join(dirpath, filename)
            good = filepath.replace('_{0}_output_'.format(model),
                                    '_{0}_{1}_S{2}_'.format(model, experiment_name, startdate))
            good = good.replace('/{0}/{0}'.format(model), '/{0}/{1}'.format(model, experiment_name))
            Utils.move_file(filepath, good)
        # Skipped files or subfolders may remain: os.rmdir would raise OSError on a non-empty folder
        if not os.listdir(dirpath):
            os.rmdir(dirpath)
    Log.debug('Done')
def _remove_extra_output_folder(self):
    """Move the content of ``<cmor_path>/output``, if that folder exists, up into ``cmor_path``."""
    output_folder = os.path.join(self.cmor_path, 'output')
    if not os.path.exists(output_folder):
        return
    Log.debug('Moving CMOR files out of the output folder')
    Utils.move_tree(output_folder, self.cmor_path)
    Log.debug('Done')
def create_links(self, startdate, member=None):
    """
    Create the links of the CMOR files of a startdate.

    Walks ``<startdate path>/<frequency>/<domain>/<var>/<member>/`` and calls ``create_link`` for every file
    found there. For every subfolder found there, it calls ``create_link`` for each of that subfolder's
    entries.

    :param startdate: startdate to process
    :type startdate: str
    :param member: member to process; if None, every member folder is processed
    :type member: int|NoneType
    """
    if member is not None:
        member_str = self._get_member_str(member)
    else:
        member_str = None
    Log.info('Creating links for CMOR files ({0})', startdate)
    path = self._get_startdate_path(startdate)
    for freq in os.listdir(path):
        frequency = Frequency.parse(freq)
        for domain in os.listdir(os.path.join(path, freq)):
            for var in os.listdir(os.path.join(path, freq, domain)):
                for member_dir in os.listdir(os.path.join(path, freq, domain, var)):
                    # Only filter when a member was requested: None never equals a folder name, so the old
                    # unconditional comparison skipped every member when member was None
                    if member_str is not None and member_str != member_dir:
                        continue
                    member_path = os.path.join(path, freq, domain, var, member_dir)
                    for name in os.listdir(member_path):
                        filepath = os.path.join(member_path, name)
                        if os.path.isfile(filepath):
                            self.create_link(domain, filepath, frequency, var, "", False,
                                             vartype=VariableType.MEAN)
                        else:
                            for filename in os.listdir(filepath):
                                self.create_link(domain, os.path.join(filepath, filename), frequency, var, "",
                                                 False, vartype=VariableType.MEAN)
Javier Vegas-Regidor
committed
def _get_startdate_path(self, startdate):
    """
    Return the folder that holds the CMOR files of a startdate.

    * ``specs``: ``<data_dir>/<expid>/cmorfiles/<institute>/<model>/<experiment>/S<startdate>``
    * ``preface``: ``<data_dir>/<expid>/cmorfiles/<institute>/<experiment>/S<startdate>``
    * any other convention: ``<data_dir>/<expid>/cmorfiles/<activity>/<institute>/<model>/<experiment>``
      (the startdate is not part of this path)

    :param startdate: target startdate
    :type startdate: str
    :return: path to the startdate's CMOR folder
    :rtype: str
    """
    base = os.path.join(self.config.data_dir, self.experiment.expid, 'cmorfiles')
    convention = self.config.data_convention
    experiment = self.experiment
    if convention == 'specs':
        return os.path.join(base, experiment.institute, experiment.model, experiment.experiment_name,
                            'S' + startdate)
    if convention == 'preface':
        return os.path.join(base, experiment.institute, experiment.experiment_name, 'S' + startdate)
    return os.path.join(base, self.config.cmor.activity, experiment.institute, experiment.model,
                        experiment.experiment_name)
def _get_member_str(self, member):
    """
    Return the CMOR member label (``r<realization>i<initialization>p1[f1]``) of a member.

    The realization number is ``member + 1 - experiment.member_count_start``. ``primavera`` and ``cmip6``
    append the ``f1`` forcing index.

    :param member: member number
    :type member: int
    :rtype: str
    :raises Exception: if the data convention is not supported
    """
    templates = {
        'specs': 'r{0}i{1}p1',
        'preface': 'r{0}i{1}p1',
        'primavera': 'r{0}i{1}p1f1',
        'cmip6': 'r{0}i{1}p1f1',
    }
    convention = self.config.data_convention
    if convention not in templates:
        raise Exception('Data convention {0} not supported'.format(convention))
    realization = member + 1 - self.experiment.member_count_start
    return templates[convention].format(realization, self.config.cmor.initialization_number)
class MergeYear(Diagnostic):
@classmethod
def generate_jobs(cls, diags, options):
    """
    Generate no jobs from the configuration.

    MergeYear jobs are instead created directly (see ``CMORManager.request_year``).
    """
    return None
def __init__(self, data_manager, domain, var, startdate, member, year, grid=None, box=None, frequency=None):
    """
    Create the job that merges the chunk files of a member into a single yearly file.

    :param data_manager: data manager used to request and declare files
    :param domain: CMOR domain
    :param var: variable name
    :param startdate: startdate
    :param member: member
    :param year: year to build
    :param grid: grid (only needed if it is not the original)
    :param box: box (only needed for sections or averages)
    :param frequency: frequency of the chunk files to merge
    """
    super(MergeYear, self).__init__(data_manager)
    # Filled by request_data with the requested chunk files
    self.chunk_files = []
    self.experiment = self.data_manager.experiment
    self.domain, self.var = domain, var
    self.startdate, self.member, self.year = startdate, member, year
    self.grid, self.box, self.frequency = grid, box, frequency
def request_data(self):
    """Request every chunk returned by ``experiment.get_year_chunks`` for this year and keep the results."""
    year_chunks = self.experiment.get_year_chunks(self.startdate, self.year)
    self.chunk_files.extend(
        self.request_chunk(self.domain, self.var, self.startdate, self.member, chunk,
                           grid=self.grid, box=self.box, frequency=self.frequency)
        for chunk in year_chunks
    )
def declare_data_generated(self):
    """
    Declare the yearly file produced by this job and flag it ``StorageStatus.NO_STORE``.

    NOTE(review): NO_STORE presumably keeps the merged file out of the repository - confirm.
    """
    # The visible call was truncated after ``self.year``; grid and box are the remaining declare_year
    # options this job carries
    self.year_file = self.declare_year(self.domain, self.var, self.startdate, self.member, self.year,
                                       grid=self.grid, box=self.box)
    self.year_file.storage_status = StorageStatus.NO_STORE
def compute(self):
    """
    Merge the requested chunk files and cut the result down to ``self.year``.
    """
    temp = self._merge_chunk_files()
    temp2 = self._select_data_of_given_year(temp)
    # NOTE(review): in the visible code temp2 is never attached to self.year_file and temp is never removed;
    # the rest of this method seems to be missing from this copy - confirm against the repository.
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
def _select_data_of_given_year(self, data_file):
    """
    Extract the time steps of ``self.year`` from a NetCDF file into a new temporary file.

    The selection is a single contiguous range: from the first time step in ``self.year`` up to the next
    time step that belongs to another year, or to the end of the file.

    :param data_file: path of the NetCDF file to cut
    :type data_file: str
    :return: path of the temporary file holding the selected time steps
    :rtype: str
    :raises ValueError: if no time step falls in ``self.year``
    """
    handler = Utils.openCdf(data_file)
    try:
        times = Utils.get_datetime_from_netcdf(handler)
    finally:
        # The handle was previously left open
        handler.close()

    first_index = None
    last_index = times.size
    for index, time in enumerate(times):
        if time.year == self.year:
            if first_index is None:
                first_index = index
        elif first_index is not None:
            last_index = index
            break
    if first_index is None:
        # Previously this produced an invalid '-d time,None,...' hyperslab for ncks
        raise ValueError('No time step of year {0} found in {1}'.format(self.year, data_file))

    temp2 = TempFile.get()
    Utils.nco.ncks(input=data_file, output=temp2, options=['-d time,{0},{1}'.format(first_index, last_index - 1)])
    return temp2
def _merge_chunk_files(self):
    """
    Concatenate the local copies of the requested chunk files into a temporary file with NCO ``ncrcat``.

    With a single chunk, its local copy is just copied. With several, the local copies are deleted after
    concatenation.

    :return: path of the temporary file
    :rtype: str
    :raises ValueError: if no chunk file was requested
    """
    if not self.chunk_files:
        raise ValueError('No chunk files to merge for year {0}'.format(self.year))
    # chunk_files holds file objects: the single-file branch already used .local_file, but the multi-file
    # branch passed the objects themselves to str.join and os.remove
    local_paths = [chunk_file.local_file for chunk_file in self.chunk_files]
    temp = TempFile.get()
    if len(local_paths) == 1:
        Utils.copy_file(local_paths[0], temp)
        return temp
    Utils.nco.ncrcat(input=' '.join(local_paths), output=temp)
    for path in local_paths:
        os.remove(path)
    return temp
def __str__(self):
    """Return a one-line, human readable description of the merge job."""
    template = ('Create year CMOR file Startdate: {0.startdate} Member: {0.member} Year: {0.year} '
                'Variable: {0.domain}:{0.var} Grid: {0.grid} Box: {0.box}')
    return template.format(self)
def __eq__(self, other):
return self.startdate == other.startdate and self.member == other.member and self.year == other.year and\
self.domain == other.domain and self.var == other.var and self.grid == other.grid and \