Newer
Older
Javier Vegas-Regidor
committed
import glob
from datetime import datetime
import os
from bscearth.utils.log import Log
from bscearth.utils.date import parse_date, chunk_start_date, chunk_end_date, previous_day
Javier Vegas-Regidor
committed
from earthdiagnostics.cmorizer import Cmorizer
from earthdiagnostics.datamanager import DataManager, NetCDFFile
from earthdiagnostics.frequency import Frequencies, Frequency
from earthdiagnostics.modelingrealm import ModelingRealms
Javier Vegas-Regidor
committed
from earthdiagnostics.utils import TempFile, Utils
from earthdiagnostics.variable_type import VariableType
Javier Vegas-Regidor
committed
class CMORManager(DataManager):
"""
Data manager class for CMORized experiments
"""
def __init__(self, config):
super(CMORManager, self).__init__(config)
self._dic_cmorized = dict()
data_folders = self.config.data_dir.split(':')
experiment_folder = self.experiment.model.lower()
if experiment_folder.startswith('ec-earth'):
experiment_folder = 'ecearth'
self.config.data_dir = None
for data_folder in data_folders:
if os.path.isdir(os.path.join(data_folder, self.experiment.expid)):
self.config.data_dir = data_folder
break
test_folder = os.path.join(data_folder, self.experiment.model.lower().replace('-', ''))
if os.path.isdir(os.path.join(test_folder, self.experiment.expid)):
self.config.data_dir = test_folder
Javier Vegas-Regidor
committed
break
test_folder = os.path.join(data_folder, self.config.data_type, experiment_folder)
if os.path.isdir(os.path.join(test_folder, self.experiment.expid)):
self.config.data_dir = test_folder
if not self.config.data_dir:
raise Exception('Can not find model data')
self.cmor_path = os.path.join(self.config.data_dir, self.experiment.expid, 'cmorfiles')
def file_exists(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
vartype=VariableType.MEAN):
cmor_var = self.variable_list.get_variable(var)
filepath = self.get_file_path(startdate, member, domain, var, cmor_var, chunk, frequency, grid, None, None)
try:
return os.path.isfile(filepath)
except Exception:
return False
Javier Vegas-Regidor
committed
def get_file(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
vartype=VariableType.MEAN):
"""
Copies a given file from the CMOR repository to the scratch folder and returns the path to the scratch's copy
:param domain: CMOR domain
Javier Vegas-Regidor
committed
:type domain: Domain
:param var: variable name
:type var: str
:param startdate: file's startdate
:type startdate: str
:param member: file's member
:type member: int
:param chunk: file's chunk
:type chunk: int
:param grid: file's grid (only needed if it is not the original)
:type grid: str|NoneType
:param box: file's box (only needed to retrieve sections or averages)
:type box: Box
:param frequency: file's frequency (only needed if it is different from the default)
:type frequency: Frequency|NoneType
:param vartype: Variable type (mean, statistic)
:type vartype: VariableType
:return: path to the copy created on the scratch folder
:rtype: str
"""
cmor_var = self.variable_list.get_variable(var)
var = self._get_final_var_name(box, var)
filepath = self.get_file_path(startdate, member, domain, var, cmor_var, chunk, frequency, grid, None, None)
temp_path = TempFile.get()
Utils.copy_file(filepath, temp_path)
return temp_path
def get_file_path(self, startdate, member, domain, var, cmor_var, chunk, frequency,
grid=None, year=None, date_str=None):
Javier Vegas-Regidor
committed
"""
Returns the path to a concrete file
:param startdate: file's startdate
:type startdate: str
:param member: file's member
:type member: int
:param domain: file's domain
Javier Vegas-Regidor
committed
:type domain: Domain
Javier Vegas-Regidor
committed
:param var: file's var
Javier Vegas-Regidor
committed
:param chunk: file's chunk
:type chunk: int
:param frequency: file's frequency
:type frequency: Frequency
Javier Vegas-Regidor
committed
:param grid: file's grid
Javier Vegas-Regidor
committed
:param year: file's year
:type year: int|str|NoneType
Javier Vegas-Regidor
committed
:param date_str: date string to add directly. Overrides year or chunk configurations
:type date_str: str|NoneType
Javier Vegas-Regidor
committed
:return: path to the file
Javier Vegas-Regidor
committed
"""
if not frequency:
frequency = self.config.frequency
folder_path = self._get_full_cmor_folder_path(startdate, member, domain, var, frequency, grid)
file_name = self._get_cmor_file_name(startdate, member, domain, var, cmor_var, frequency,
chunk, year, date_str, grid)
Javier Vegas-Regidor
committed
filepath = os.path.join(folder_path, file_name)
return filepath
Javier Vegas-Regidor
committed
def _get_cmor_file_name(self, startdate, member, domain, var, cmor_var, frequency, chunk, year, date_str, grid, ):
if cmor_var is None:
cmor_table = domain.get_table(frequency, self.config.data_convention)
else:
cmor_table = cmor_var.get_table(frequency, self.config.data_convention)
if chunk is not None:
time_bound = self._get_chunk_time_bounds(startdate, chunk)
Javier Vegas-Regidor
committed
elif year:
if frequency != Frequencies.yearly:
Javier Vegas-Regidor
committed
raise ValueError('Year may be provided instead of chunk only if frequency is "yr"')
time_bound = str(year)
elif date_str:
time_bound = date_str
else:
raise ValueError('Chunk, year and date_str can not be None at the same time')
if time_bound:
time_bound = '_{0}.nc'.format(time_bound)
else:
time_bound = '.nc'
if self.config.data_convention == 'specs':
file_name = '{0}_{1}_{2}_{3}_S{4}_{5}{6}'.format(var, cmor_table.name, self.experiment.model,
self.experiment.experiment_name, startdate,
self._get_member_str(member), time_bound)
elif self.config.data_convention in ('primavera', 'cmip6'):
grid = '_{0}'.format(grid)
file_name = '{0}_{1}_{2}_{3}_S{4}-{5}{6}{7}'.format(var, cmor_table.name, self.experiment.experiment_name,
self.experiment.model, startdate,
self._get_member_str(member), grid, time_bound)
else:
raise Exception('Data convention {0} not supported'.format(self.config.data_convention))
return file_name
def _get_full_cmor_folder_path(self, startdate, member, domain, var, frequency, grid):
folder_path = os.path.join(self._get_startdate_path(startdate), str(frequency), domain.name, var)
Javier Vegas-Regidor
committed
if grid:
folder_path = os.path.join(folder_path, grid)
folder_path = os.path.join(folder_path, self._get_member_str(member))
return folder_path
def _get_chunk_time_bounds(self, startdate, chunk):
start = parse_date(startdate)
chunk_start = chunk_start_date(start, chunk, self.experiment.chunk_size, 'month', self.experiment.calendar)
chunk_end = chunk_end_date(chunk_start, self.experiment.chunk_size, 'month', self.experiment.calendar)
chunk_end = previous_day(chunk_end, self.experiment.calendar)
time_bound = "{0:04}{1:02}-{2:04}{3:02}".format(chunk_start.year, chunk_start.month, chunk_end.year,
chunk_end.month)
return time_bound
Javier Vegas-Regidor
committed
def link_file(self, domain, var, cmor_var, startdate, member, chunk=None, grid=None,
frequency=None, year=None, date_str=None, move_old=False, vartype=VariableType.MEAN):
Javier Vegas-Regidor
committed
"""
Creates the link of a given file from the CMOR repository.
:param move_old:
:param date_str:
:param year: if frequency is yearly, this parameter is used to give the corresponding year
:type year: int
:param domain: CMOR domain
Javier Vegas-Regidor
committed
:type domain: Domain
Javier Vegas-Regidor
committed
:param var: variable name
:type var: str
:param startdate: file's startdate
:type startdate: str
:param member: file's member
:type member: int
:param chunk: file's chunk
:type chunk: int
:param grid: file's grid (only needed if it is not the original)
:type grid: str
:param frequency: file's frequency (only needed if it is different from the default)
:type frequency: Frequency
:param vartype: Variable type (mean, statistic)
:type vartype: VariableType
Javier Vegas-Regidor
committed
:return: path to the copy created on the scratch folder
:rtype: str
"""
if not frequency:
frequency = self.config.frequency
filepath = self.get_file_path(startdate, member, domain, var, cmor_var, chunk, frequency,
grid=grid, year=str(year), date_str=date_str)
Javier Vegas-Regidor
committed
self._create_link(domain, filepath, frequency, var, grid, move_old, vartype)
Javier Vegas-Regidor
committed
def send_file(self, filetosend, domain, var, startdate, member, chunk=None, grid=None, region=None,
box=None, rename_var=None, frequency=None, year=None, date_str=None, move_old=False,
diagnostic=None, cmorized=False, vartype=VariableType.MEAN):
"""
Copies a given file to the CMOR repository. It also automatically converts to netCDF 4 if needed and can merge
with already existing ones as needed
:param move_old: if true, moves files following older conventions that may be found on the links folder
:type move_old: bool
:param date_str: exact date_str to use in the cmorized file
:type: str
:param year: if frequency is yearly, this parameter is used to give the corresponding year
:type year: int
:param rename_var: if exists, the given variable will be renamed to the one given by var
:type rename_var: str
:param filetosend: path to the file to send to the CMOR repository
:type filetosend: str
:param region: specifies the region represented by the file. If it is defined, the data will be appended to the
CMOR repository as a new region in the file or will overwrite if region was already present
:type region: str
:param domain: CMOR domain
Javier Vegas-Regidor
committed
:type domain: Domain
:param var: variable name
:type var: str
:param startdate: file's startdate
:type startdate: str
:param member: file's member
:type member: int
:param chunk: file's chunk
:type chunk: int
:param grid: file's grid (only needed if it is not the original)
:type grid: str
:param box: file's box (only needed to retrieve sections or averages)
:type box: Box
:param frequency: file's frequency (only needed if it is different from the default)
:type frequency: Frequency
Javier Vegas-Regidor
committed
:param diagnostic: diagnostic used to generate the file
:type diagnostic: Diagnostic
:param cmorized: flag to indicate if file was generated in cmorization process
:type cmorized: bool
:param vartype: Variable type (mean, statistic)
:type vartype: VariableType
if rename_var:
original_name = rename_var
else:
original_name = var
cmor_var = self.variable_list.get_variable(var)
final_name = self._get_final_var_name(box, var)
if final_name != original_name:
Utils.rename_variable(filetosend, original_name, final_name)
if not frequency:
frequency = self.config.frequency
filepath = self.get_file_path(startdate, member, domain, final_name, cmor_var, chunk, frequency, grid, year,
date_str)
netcdf_file = NetCDFFile(filepath, filetosend, domain, final_name, cmor_var, self.config.data_convention,
region)
if diagnostic:
netcdf_file.add_diagnostic_history(diagnostic)
elif cmorized:
netcdf_file.add_cmorization_history()
else:
raise ValueError('You must provide a diagnostic or set cmorized to true to store data '
'using the CMORManager')
self._create_link(domain, filepath, frequency, final_name, grid, move_old, vartype)
Javier Vegas-Regidor
committed
def get_year(self, domain, var, startdate, member, year, grid=None, box=None):
"""
Ge a file containing all the data for one year for one variable
:param domain: variable's domain
:type domain: str
:param var: variable's name
:type var: str
:param startdate: startdate to retrieve
:type startdate: str
:param member: member to retrieve
:type member: int
:param year: year to retrieve
:type year: int
:param grid: variable's grid
:type grid: str
:param box: variable's box
:type box: Box
:return:
"""
chunk_files = list()
for chunk in self.experiment.get_year_chunks(startdate, year):
chunk_files.append(self.get_file(domain, var, startdate, member, chunk, grid=grid, box=box))
if len(chunk_files) > 1:
temp = self._merge_chunk_files(chunk_files)
Javier Vegas-Regidor
committed
else:
temp = chunk_files[0]
temp2 = self._select_data_of_given_year(temp, year)
os.remove(temp)
return temp2
@staticmethod
def _select_data_of_given_year(data_file, year):
Javier Vegas-Regidor
committed
temp2 = TempFile.get()
Javier Vegas-Regidor
committed
handler = Utils.openCdf(data_file)
times = Utils.get_datetime_from_netcdf(handler)
x = 0
first_index = None
last_index = None
while x < times.size:
if times[x].year == year:
first_index = x
break
else:
x += 1
while x < times.size:
if times[x].year != year:
last_index = x
break
else:
x += 1
if last_index is None:
last_index = times.size
Utils.nco.ncks(input=data_file, output=temp2, options=['-d time,{0},{1}'.format(first_index, last_index - 1)])
Javier Vegas-Regidor
committed
return temp2
@staticmethod
def _merge_chunk_files(chunk_files):
temp = TempFile.get()
Utils.nco.ncrcat(input=' '.join(chunk_files), output=temp)
for chunk_file in chunk_files:
os.remove(chunk_file)
return temp
Javier Vegas-Regidor
committed
# noinspection PyPep8Naming
Javier Vegas-Regidor
committed
"""
Prepares the data to be used by the diagnostic.
If CMOR data is not created, it show a warning and closes. In the future, an automatic cmorization procedure
will be launched
If CMOR data is available but packed, the procedure will unpack it.
:return:
"""
# Check if cmorized and convert if not
for startdate, member in self.experiment.get_member_list():
Javier Vegas-Regidor
committed
if not self._unpack_cmor_files(startdate, member):
self._cmorize_member(startdate, member)
Javier Vegas-Regidor
committed
def is_cmorized(self, startdate, member, chunk, domain):
identifier = (startdate, member, chunk, domain.name)
if identifier not in self._dic_cmorized:
self._dic_cmorized[identifier] = self._is_cmorized(startdate, member, chunk, domain)
return self._dic_cmorized[identifier]
Javier Vegas-Regidor
committed
def _is_cmorized(self, startdate, member, chunk, domain):
startdate_path = self._get_startdate_path(startdate)
Javier Vegas-Regidor
committed
if not os.path.isdir(startdate_path):
return False
for freq in os.listdir(startdate_path):
Javier Vegas-Regidor
committed
domain_path = os.path.join(startdate_path, freq,
domain.name)
if os.path.isdir(domain_path):
for var in os.listdir(domain_path):
cmor_var = self.variable_list.get_variable(var, True)
var_path = self.get_file_path(startdate, member, domain, var, cmor_var, chunk, Frequency(freq))
Javier Vegas-Regidor
committed
if os.path.isfile(var_path):
return True
return False
def _cmorize_member(self, startdate, member):
start_time = datetime.now()
member_str = self.experiment.get_member_str(member)
Log.info('CMORizing startdate {0} member {1}. Starting at {0}', startdate, member_str, start_time)
cmorizer = Cmorizer(self, startdate, member)
cmorizer.cmorize_ocean()
cmorizer.cmorize_atmos()
Log.result('CMORized startdate {0} member {1}! Elapsed time: {2}\n\n', startdate, member_str,
datetime.now() - start_time)
def _unpack_cmor_files(self, startdate, member):
if self.config.cmor.force:
return False
chunk = 1
cmorized = False
if not self.config.cmor.force_untar:
Javier Vegas-Regidor
committed
while self.is_cmorized(startdate, member, chunk, ModelingRealms.ocean) or\
self.is_cmorized(startdate, member, chunk, ModelingRealms.atmos):
chunk += 1
while self._unpack_chunk(startdate, member, chunk):
chunk += 1
cmorized = True
return cmorized
def _unpack_chunk(self, startdate, member, chunk):
filepaths = self._get_transferred_cmor_data_filepaths(startdate, member, chunk, 'tar.gz')
if len(filepaths) > 0:
if self.config.cmor.chunk_cmorization_requested(chunk):
Log.info('Unzipping cmorized data for {0} {1} {2}...', startdate, member, chunk)
Utils.unzip(filepaths, True)
else:
return True
if not os.path.exists(self.cmor_path):
os.mkdir(self.cmor_path)
filepaths = self._get_transferred_cmor_data_filepaths(startdate, member, chunk, 'tar')
if len(filepaths) > 0:
if self.config.cmor.chunk_cmorization_requested(chunk):
Log.info('Unpacking cmorized data for {0} {1} {2}...', startdate, member, chunk)
Utils.untar(filepaths, self.cmor_path)
self._correct_paths(startdate, member)
self.create_links(startdate, member)
return True
return False
def _get_transferred_cmor_data_filepaths(self, startdate, member, chunk, extension):
tar_path = os.path.join(self.config.data_dir, self.experiment.expid, 'original_files', 'cmorfiles')
tar_original_files = os.path.join(self.config.data_dir, 'original_files', self.experiment.expid,
'cmorfiles')
file_name = 'CMOR?_{0}_{1}_{2}_{3}-*.{4}'.format(self.experiment.expid, startdate,
self.experiment.get_member_str(member),
self.experiment.get_chunk_start_str(startdate, chunk),
extension)
filepaths = glob.glob(os.path.join(tar_path, file_name))
filepaths += glob.glob(os.path.join(tar_path, 'outputs', file_name))
filepaths += glob.glob(os.path.join(tar_original_files, file_name))
filepaths += glob.glob(os.path.join(tar_original_files, 'outputs', file_name))
return filepaths
Javier Vegas-Regidor
committed
def _correct_paths(self, startdate, member):
self._remove_extra_output_folder()
self._fix_model_as_experiment_error(startdate, member)
Javier Vegas-Regidor
committed
def _fix_model_as_experiment_error(self, startdate, member):
Javier Vegas-Regidor
committed
if self.experiment.experiment_name != self.experiment.model:
bad_path = os.path.join(self.cmor_path, self.experiment.institute, self.experiment.model,
self.experiment.model)
Log.debug('Correcting double model appearance')
for (dirpath, dirnames, filenames) in os.walk(bad_path, False):
for filename in filenames:
if '_S{0}_'.format(startdate) in filename:
continue
if self._get_member_str(member) in filename:
continue
Javier Vegas-Regidor
committed
filepath = os.path.join(dirpath, filename)
good = filepath.replace('_{0}_output_'.format(self.experiment.model),
'_{0}_{1}_S{2}_'.format(self.experiment.model,
self.experiment.experiment_name,
startdate))
good = good.replace('/{0}/{0}'.format(self.experiment.model),
'/{0}/{1}'.format(self.experiment.model,
self.experiment.experiment_name))
Utils.move_file(filepath, good)
os.rmdir(dirpath)
Log.debug('Done')
def _remove_extra_output_folder(self):
bad_path = os.path.join(self.cmor_path, 'output')
if os.path.exists(bad_path):
Log.debug('Moving CMOR files out of the output folder')
Utils.move_tree(bad_path, self.cmor_path)
Log.debug('Done')
def create_links(self, startdate, member=None):
if member:
member_str = self._get_member_str(member)
else:
member_str = None
Log.info('Creating links for CMOR files ({0})', startdate)
Javier Vegas-Regidor
committed
path = self._get_startdate_path(startdate)
for freq in os.listdir(path):
frequency = Frequency.parse(freq)
Javier Vegas-Regidor
committed
for domain in os.listdir(os.path.join(path, freq)):
for var in os.listdir(os.path.join(path, freq, domain)):
for member in os.listdir(os.path.join(path, freq, domain, var)):
if member_str != member:
continue
Javier Vegas-Regidor
committed
for name in os.listdir(os.path.join(path, freq, domain, var, member)):
filepath = os.path.join(path, freq, domain, var, member, name)
if os.path.isfile(filepath):
self._create_link(domain, filepath, frequency, var, "", False,
vartype=VariableType.MEAN)
Javier Vegas-Regidor
committed
else:
for filename in os.listdir(filepath):
self._create_link(domain, os.path.join(filepath, filename), frequency, var, "",
False, vartype=VariableType.MEAN)
Javier Vegas-Regidor
committed
def _get_startdate_path(self, startdate):
"""
Returns the path to the startdate's CMOR folder
:param startdate: target startdate
:type startdate: str
:return: path to the startdate's CMOR º
Javier Vegas-Regidor
committed
:rtype: str
"""
return os.path.join(self.config.data_dir, self.experiment.expid, 'cmorfiles', self.experiment.institute,
self.experiment.model, self.experiment.experiment_name, 'S' + startdate)
def _get_member_str(self, member):
return 'r{0}i1p1'.format(member + 1 - self.experiment.member_count_start)