"""Classes to manage cmorized datasets"""
import glob
import os
from datetime import datetime

from earthdiagnostics.cmorizer import Cmorizer
from earthdiagnostics.datafile import StorageStatus
from earthdiagnostics.datamanager import DataManager
from earthdiagnostics.diagnostic import Diagnostic
from earthdiagnostics.frequency import Frequencies, Frequency
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.utils import TempFile, Utils
from earthdiagnostics.variable import VariableType
class CMORManager(DataManager):
"""
Data manager class for CMORized experiments
Parameters
----------
config: earthdiagnostics.config.Config
"""
def __init__(self, config):
super(CMORManager, self).__init__(config)
self._dic_cmorized = dict()
self.find_model_data()
self.cmor_path = os.path.join(self.config.data_dir, self.experiment.expid)
self.convention = self.config.data_convention
"""
Seek the configured data folders for the experiment data
For each folder, it looks at:
-<folder>/<expid>
-<folder>/<model>/<expid>
-<folder>/<data_type>/<model>/<expid>
Model has any '-' character removed and is passed to lower
"""
data_folders = self.config.data_dir.split(':')
experiment_folder = self.experiment.model.lower()
if experiment_folder.startswith('ec-earth'):
experiment_folder = 'ecearth'
self.config.data_dir = None
for data_folder in data_folders:
if os.path.isdir(os.path.join(data_folder, self.experiment.expid)):
self.config.data_dir = data_folder
break
test_folder = os.path.join(data_folder, self.experiment.model.lower().replace('-', ''))
if os.path.isdir(os.path.join(test_folder, self.experiment.expid)):
self.config.data_dir = test_folder
Javier Vegas-Regidor
committed
break
test_folder = os.path.join(data_folder, self.config.data_type, experiment_folder)
if os.path.isdir(os.path.join(test_folder, self.experiment.expid)):
self.config.data_dir = test_folder
if not self.config.data_dir:
raise Exception('Can not find model data')
def file_exists(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
vartype=VariableType.MEAN, possible_versions=None):
"""
Check if a file exists in the storage
Parameters
----------
domain: ModelingRealm
var: str
startdate: str
member: int
chunk: int
grid: str or None
box: Box or None
frequency: Frequency or None
vartype: VariableType
possible_versions: iterable od str or None
Returns
-------
bool
"""
cmor_var = self.variable_list.get_variable(var)
filepath = self.convention.get_file_path(startdate, member, domain, var, cmor_var, chunk, frequency, grid,
None, None)
if possible_versions is None:
else:
for version in possible_versions:
if os.path.isfile(filepath.replace(self.config.cmor.version, version)):
return True
def request_chunk(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None, vartype=None):
Request a given file from the CMOR repository to the scratch folder and returns the path to the scratch's copy
Parameters
----------
domain: ModelingRealm
var: str
startdate: str
member: int
chunk: int
grid: str or None
box: Box or None
frequency: Frequency or None
vartype: VariableType or None
Returns
-------
DataFile
if frequency is None:
frequency = self.config.frequency
cmor_var = self.variable_list.get_variable(var)
var = self._get_final_var_name(box, var)
filepath = self.convention.get_file_path(startdate, member, domain, var, cmor_var, chunk, frequency, grid,
None, None)
return self._get_file_from_storage(filepath)
def request_year(self, diagnostic, domain, var, startdate, member, year, grid=None, box=None, frequency=None):
Request a given year for a variavle from a CMOR repository
Parameters
----------
diagnostic: Diagnostic
domain: ModelingRealm
var: str
startdate: str
member: int
year: int
grid: str or None
box: Box or None
frequency: Frequency or None
Returns
-------
DataFile
job = MergeYear(self, domain, var, startdate, member, year, grid, box, frequency)
job.request_data()
job.declare_data_generated()
if not job.year_file.job_added:
diagnostic.subjobs.append(job)
job.year_file.job_added = True
def declare_chunk(self, domain, var, startdate, member, chunk, grid=None, region=None, box=None, frequency=None,
vartype=VariableType.MEAN, diagnostic=None):
"""
Declare a variable chunk to be generated by a diagnostic
Parameters
----------
domain: ModelingRealm
var: str
startdate: str
member: int
chunk: int
grid: str or None
region: Basin or None
box: Box or None
frequency: Frequency or None
vartype: VariableType
diagnostic: Diagnostic
Returns
-------
DataFile
cmor_var = self.variable_list.get_variable(var)
if cmor_var:
var = cmor_var.short_name
final_name = self._get_final_var_name(box, var)
filepath = self.convention.get_file_path(startdate, member, domain, final_name, cmor_var, chunk,
frequency, grid)
netcdf_file = self._declare_generated_file(filepath, domain, final_name, cmor_var, self.config.data_convention,
region, diagnostic, grid, vartype, original_name)
netcdf_file.frequency = frequency
return netcdf_file
def declare_year(self, domain, var, startdate, member, year, grid=None, box=None,
Declare a variable year to be generated by a diagnostic
Parameters
----------
domain: ModelingRealm
var: str
startdate: str
member: int
year: int
grid: str or None
box: Box or None
vartype: VariableType
diagnostic: Diagnostic
Returns
-------
DataFile
"""
original_name = var
cmor_var = self.variable_list.get_variable(var)
if cmor_var:
var = cmor_var.short_name
final_name = self._get_final_var_name(box, var)
filepath = self.convention.get_file_path(startdate, member, domain, final_name, cmor_var, None,
Frequencies.yearly, grid, year=year)
netcdf_file = self._declare_generated_file(filepath, domain, final_name, cmor_var, self.config.data_convention,
None, diagnostic, grid, vartype, original_name)
netcdf_file.frequency = Frequencies.yearly
return netcdf_file
def link_file(self, domain, var, cmor_var, startdate, member, chunk=None, grid=None,
frequency=None, year=None, date_str=None, move_old=False, vartype=VariableType.MEAN):
Javier Vegas-Regidor
committed
"""
Create the link of a given file from the CMOR repository.
Javier Vegas-Regidor
committed
Parameters
----------
domain: ModelingRealm
var: str
cmor_var:
startdate: str
member: int
chunk: int or None, optional
grid: str or None, optional
frequency: Frequency or None, optional
year: int or None, optional
date_str: str or None, optional
move_old: bool, optional
vartype: VariableType, optional
Javier Vegas-Regidor
committed
"""
Javier Vegas-Regidor
committed
frequency = self.config.frequency
filepath = self.convention.get_file_path(startdate, member, domain, var, cmor_var, chunk, frequency,
grid=grid, year=year, date_str=date_str)
self.convention.create_link(domain, filepath, frequency, var, grid, move_old, vartype)
Javier Vegas-Regidor
committed
"""
Javier Vegas-Regidor
committed
If CMOR data is not created, an automatic cmorization procedure
is launched
Javier Vegas-Regidor
committed
If CMOR data is available but packed, the procedure will unpack it.
"""
# Check if cmorized and convert if not
if self.config.data_convention.name == 'meteofrance':
if self.config.cmor.skip_prepare:
return
Javier Vegas-Regidor
committed
for startdate, member in self.experiment.get_member_list():
self._prepare_member(startdate, member)
def _prepare_member(self, startdate, member):
Log.info('Checking data for startdate {0} member {1}', startdate, member)
if not self.config.cmor.force:
cmorized = False
for chunk in range(1, self.experiment.num_chunks + 1):
if not self.config.cmor.chunk_cmorization_requested(chunk):
Log.debug('Skipping chunk {0}', chunk)
continue
if not self.config.cmor.force_untar:
Log.debug('Checking chunk {0}...', chunk)
for domain in (ModelingRealms.atmos, ModelingRealms.ocean, ModelingRealms.seaIce):
if self.is_cmorized(startdate, member, chunk, domain):
Log.debug('Chunk {0} ready', chunk)
if skip:
continue
if self._unpack_chunk(startdate, member, chunk):
cmorized = True
if cmorized:
Log.info('Startdate {0} member {1} ready', startdate, member)
return
self._cmorize_member(startdate, member)
def is_cmorized(self, startdate, member, chunk, domain):
"""
Check if a chunk domain is cmorized
A cache is maintained so only the first check is costly
Parameters
----------
startdate: str
member: int
chunk: int
domain: ModelingRealm
Returns
-------
bool
"""
identifier = (startdate, member, chunk)
if identifier not in self._dic_cmorized:
self._dic_cmorized[identifier][domain] = self.convention.is_cmorized(startdate, member, chunk, domain)
elif domain not in self._dic_cmorized[identifier]:
self._dic_cmorized[identifier][domain] = self.convention.is_cmorized(startdate, member, chunk, domain)
return self._dic_cmorized[identifier][domain]
def _cmorize_member(self, startdate, member):
start_time = datetime.now()
member_str = self.experiment.get_member_str(member)
Log.info('CMORizing startdate {0} member {1}. Starting at {2}', startdate, member_str, start_time)
cmorizer = Cmorizer(self, startdate, member)
cmorizer.cmorize_ocean()
cmorizer.cmorize_atmos()
Log.result('CMORized startdate {0} member {1}! Elapsed time: {2}\n\n', startdate, member_str,
datetime.now() - start_time)
def _unpack_chunk(self, startdate, member, chunk):
filepaths = self._get_transferred_cmor_data_filepaths(startdate, member, chunk, 'tar.gz')
if len(filepaths) > 0:
Log.info('Unzipping cmorized data for {0} {1} {2}...', startdate, member, chunk)
Utils.unzip(filepaths, True)
filepaths = self._get_transferred_cmor_data_filepaths(startdate, member, chunk, 'tar')
if len(filepaths) > 0:
Log.info('Unpacking cmorized data for {0} {1} {2}...', startdate, member, chunk)
Utils.untar(filepaths, os.path.join(self.cmor_path, 'cmorfiles'))
self._correct_paths(startdate)
self.convention.create_links(startdate, member)
return True
return False
def _get_transferred_cmor_data_filepaths(self, startdate, member, chunk, extension):
tar_path = os.path.join(self.config.data_dir, self.experiment.expid, 'original_files', 'cmorfiles')
tar_original_files = os.path.join(self.config.data_dir, 'original_files', self.experiment.expid,
'cmorfiles')
filepaths = []
for cmor_prefix in ('CMOR?', 'CMOR'):
file_name = '{5}_{0}_{1}_{2}_{3}-*.{4}'.format(self.experiment.expid, startdate,
self.experiment.get_member_str(member),
self.experiment.get_chunk_start_str(startdate, chunk),
extension, cmor_prefix)
filepaths += glob.glob(os.path.join(tar_path, file_name))
filepaths += glob.glob(os.path.join(tar_path, 'outputs', file_name))
filepaths += glob.glob(os.path.join(tar_original_files, file_name))
filepaths += glob.glob(os.path.join(tar_original_files, 'outputs', file_name))
return filepaths
Javier Vegas-Regidor
committed
self._remove_extra_output_folder()
Javier Vegas-Regidor
committed
def _fix_model_as_experiment_error(self, startdate):
experiment_name = self.convention.experiment_name(startdate)
if experiment_name != self.experiment.model:
Javier Vegas-Regidor
committed
bad_path = os.path.join(self.cmor_path, self.experiment.institute, self.experiment.model,
self.experiment.model)
Log.debug('Correcting double model appearance')
for (dirpath, _, filenames) in os.walk(bad_path, False):
Javier Vegas-Regidor
committed
for filename in filenames:
if '_S{0}_'.format(startdate) in filename:
continue
Javier Vegas-Regidor
committed
filepath = os.path.join(dirpath, filename)
good = filepath
good = good.replace('_{0}_output_'.format(self.experiment.model),
'_{0}_{1}_S{2}_'.format(self.experiment.model,
experiment_name,
Javier Vegas-Regidor
committed
good = good.replace('/{0}/{0}'.format(self.experiment.model),
'/{0}/{1}'.format(self.experiment.model,
experiment_name))
Javier Vegas-Regidor
committed
Utils.move_file(filepath, good)
if self.experiment.model != experiment_name:
Javier Vegas-Regidor
committed
Log.debug('Done')
def _remove_extra_output_folder(self):
bad_path = os.path.join(self.cmor_path, 'output')
if os.path.exists(bad_path):
Log.debug('Moving CMOR files out of the output folder')
Utils.move_tree(bad_path, self.cmor_path)
Log.debug('Done')
class MergeYear(Diagnostic):
"""
Diagnostic to get all the data for a given year and merge it in a file
Parameters
----------
data_manager: DataManager
domain: ModelingRealm
var: str
startdate: str
member: int
year: int
grid: str or None, optional
box: Box or None, optional
frequency: Frequency or None, optional
"""
    @classmethod
    def generate_jobs(cls, diags, options):
        """
        Method to generate the required diagnostics from a section of the configuration file

        Required by the interface, does nothing as this diagnostic is not meant to be configured in the usual way
        """
def __init__(self, data_manager, domain, var, startdate, member, year, grid=None, box=None, frequency=None):
super(MergeYear, self).__init__(data_manager)
self.chunk_files = []
self.experiment = self.data_manager.experiment
self.domain = domain
self.var = var
self.startdate = startdate
self.member = member
self.year = year
self.grid = grid
self.box = box
self.frequency = frequency
def __str__(self):
return 'Merge year data Variable: {0.domain}:{0.var} Startdate: {0.startdate} Member: {0.member} ' \
'Year: {0.year} Grid: {0.grid} Box: {0.box} Frequency: {0.frequency}'.format(self)
def __eq__(self, other):
return self.domain == other.domain and self.var == other.var and self.startdate == other.startdate and \
self.member == other.member and self.year == other.year and self.grid == other.grid and \
self.box == other.box and self.frequency == other.frequency
    def __hash__(self):
        """Hash the string form so the hash stays consistent with __str__/__eq__."""
        return hash(str(self))
"""Request all the data required by the diagnostic"""
for chunk in self.experiment.get_year_chunks(self.startdate, self.year):
self.chunk_files.append(self.request_chunk(self.domain, self.var, self.startdate, self.member, chunk,
grid=self.grid, box=self.box, frequency=self.frequency))
def declare_data_generated(self):
"""Declare all the data generated by the diagnostic"""
self.year_file = self.declare_year(self.domain, self.var, self.startdate, self.member, self.year,
self.year_file.storage_status = StorageStatus.NO_STORE
def compute(self):
temp = self._merge_chunk_files()
temp2 = self._select_data_of_given_year(temp)
def _select_data_of_given_year(self, data_file):
temp2 = TempFile.get()
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
times = Utils.get_datetime_from_netcdf(handler)
x = 0
first_index = None
last_index = None
while x < times.size:
if times[x].year == self.year:
first_index = x
break
else:
x += 1
while x < times.size:
if times[x].year != self.year:
last_index = x
break
else:
x += 1
if last_index is None:
last_index = times.size
Utils.nco.ncks(input=data_file, output=temp2, options=['-d time,{0},{1}'.format(first_index, last_index - 1)])
return temp2
def _merge_chunk_files(self):
temp = TempFile.get()
if len(self.chunk_files) == 1:
Utils.copy_file(self.chunk_files[0].local_file, temp)
return temp
Utils.nco.ncrcat(input=' '.join(self.chunk_files), output=temp)
for chunk_file in self.chunk_files:
os.remove(chunk_file)
return temp