Newer
Older
"""Classes to manage cmorized datasets"""
Javier Vegas-Regidor
committed
import glob
Javier Vegas-Regidor
committed
from datetime import datetime
from bscearth.utils.date import parse_date, chunk_start_date, chunk_end_date, previous_day
Javier Vegas-Regidor
committed
from earthdiagnostics.datafile import StorageStatus
from earthdiagnostics.diagnostic import Diagnostic
Javier Vegas-Regidor
committed
from earthdiagnostics.cmorizer import Cmorizer
from earthdiagnostics.datamanager import DataManager
from earthdiagnostics.frequency import Frequencies, Frequency
from earthdiagnostics.modelingrealm import ModelingRealms
Javier Vegas-Regidor
committed
from earthdiagnostics.utils import TempFile, Utils
from earthdiagnostics.variable import VariableType
Javier Vegas-Regidor
committed
class CMORManager(DataManager):
"""
Data manager class for CMORized experiments
Parameters
----------
config: earthdiagnostics.config.Config
Javier Vegas-Regidor
committed
"""
def __init__(self, config):
super(CMORManager, self).__init__(config)
self._dic_cmorized = dict()
self.find_model_data()
self.cmor_path = os.path.join(self.config.data_dir, self.experiment.expid)
def experiment_name(self, startdate):
"""
Get experiment name, appending startdate if needed
Parameters
----------
startdate: str
Returns
-------
str
"""
if self.config.cmor.append_startdate:
return '{}S{}'.format(self.config.experiment.experiment_name, startdate)
else:
return self.config.experiment.experiment_name
"""
Seek the configured data folders for the experiment data
For each folder, it looks at:
-<folder>/<expid>
-<folder>/<model>/<expid>
-<folder>/<data_type>/<model>/<expid>
Model has any '-' character removed and is passed to lower
"""
data_folders = self.config.data_dir.split(':')
experiment_folder = self.experiment.model.lower()
if experiment_folder.startswith('ec-earth'):
experiment_folder = 'ecearth'
self.config.data_dir = None
for data_folder in data_folders:
if os.path.isdir(os.path.join(data_folder, self.experiment.expid)):
self.config.data_dir = data_folder
break
test_folder = os.path.join(data_folder, self.experiment.model.lower().replace('-', ''))
if os.path.isdir(os.path.join(test_folder, self.experiment.expid)):
self.config.data_dir = test_folder
Javier Vegas-Regidor
committed
break
test_folder = os.path.join(data_folder, self.config.data_type, experiment_folder)
if os.path.isdir(os.path.join(test_folder, self.experiment.expid)):
self.config.data_dir = test_folder
if not self.config.data_dir:
raise Exception('Can not find model data')
def file_exists(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
vartype=VariableType.MEAN, possible_versions=None):
"""
Check if a file exists in the storage
Parameters
----------
domain: ModelingRealm
var: str
startdate: str
member: int
chunk: int
grid: str or None
box: Box or None
frequency: Frequency or None
vartype: VariableType
possible_versions: iterable od str or None
Returns
-------
bool
"""
cmor_var = self.variable_list.get_variable(var)
filepath = self.get_file_path(startdate, member, domain, var, cmor_var, chunk, frequency, grid, None, None)
if possible_versions is None:
else:
for version in possible_versions:
if os.path.isfile(filepath.replace(self.config.cmor.version, version)):
return True
def request_chunk(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None, vartype=None):
Request a given file from the CMOR repository to the scratch folder and returns the path to the scratch's copy
Parameters
----------
domain: ModelingRealm
var: str
startdate: str
member: int
chunk: int
grid: str or None
box: Box or None
frequency: Frequency or None
vartype: VariableType or None
Returns
-------
DataFile
if frequency is None:
frequency = self.config.frequency
cmor_var = self.variable_list.get_variable(var)
var = self._get_final_var_name(box, var)
filepath = self.get_file_path(startdate, member, domain, var, cmor_var, chunk, frequency, grid, None, None)
return self._get_file_from_storage(filepath)
def request_year(self, diagnostic, domain, var, startdate, member, year, grid=None, box=None, frequency=None):
Request a given year for a variavle from a CMOR repository
Parameters
----------
diagnostic: Diagnostic
domain: ModelingRealm
var: str
startdate: str
member: int
year: int
grid: str or None
box: Box or None
frequency: Frequency or None
Returns
-------
DataFile
job = MergeYear(self, domain, var, startdate, member, year, grid, box, frequency)
job.request_data()
job.declare_data_generated()
if not job.year_file.job_added:
diagnostic.subjobs.append(job)
job.year_file.job_added = True
def declare_chunk(self, domain, var, startdate, member, chunk, grid=None, region=None, box=None, frequency=None,
vartype=VariableType.MEAN, diagnostic=None):
"""
Declare a variable chunk to be generated by a diagnostic
Parameters
----------
domain: ModelingRealm
var: str
startdate: str
member: int
chunk: int
grid: str or None
region: Basin or None
box: Box or None
frequency: Frequency or None
vartype: VariableType
diagnostic: Diagnostic
Returns
-------
DataFile
cmor_var = self.variable_list.get_variable(var)
if cmor_var:
var = cmor_var.short_name
final_name = self._get_final_var_name(box, var)
filepath = self.get_file_path(startdate, member, domain, final_name, cmor_var, chunk, frequency, grid)
netcdf_file = self._declare_generated_file(filepath, domain, final_name, cmor_var, self.config.data_convention,
region, diagnostic, grid, vartype, original_name)
netcdf_file.frequency = frequency
return netcdf_file
def declare_year(self, domain, var, startdate, member, year, grid=None, box=None,
Declare a variable year to be generated by a diagnostic
Parameters
----------
domain: ModelingRealm
var: str
startdate: str
member: int
year: int
grid: str or None
box: Box or None
vartype: VariableType
diagnostic: Diagnostic
Returns
-------
DataFile
"""
original_name = var
cmor_var = self.variable_list.get_variable(var)
if cmor_var:
var = cmor_var.short_name
final_name = self._get_final_var_name(box, var)
filepath = self.get_file_path(startdate, member, domain, final_name, cmor_var, None, Frequencies.yearly, grid,
year=year)
netcdf_file = self._declare_generated_file(filepath, domain, final_name, cmor_var, self.config.data_convention,
None, diagnostic, grid, vartype, original_name)
netcdf_file.frequency = Frequencies.yearly
return netcdf_file
def get_file_path(self, startdate, member, domain, var, cmor_var, chunk, frequency,
grid=None, year=None, date_str=None):
Javier Vegas-Regidor
committed
"""
Parameters
----------
startdate: str
member: int
domain: ModelingRealm
var: str
cmor_var: Variable
chunk: int or None
grid: str or None
year: int or None
date_str: str or None
Returns
-------
str
Raises
------
ValueError
If you provide two or more parameters from chunk, year or date_str or none at all
"""
if frequency is None:
frequency = self.config.frequency
options = sum(x is not None for x in (chunk, year, date_str))
if options == 0:
raise ValueError('You must provide chunk, year or date_str')
elif options > 1:
raise ValueError('You must provide only one parameter in chunk, year or date_str')
Javier Vegas-Regidor
committed
frequency = self.config.frequency
Javier Vegas-Regidor
committed
folder_path = self._get_full_cmor_folder_path(startdate, member, domain, var, frequency, grid, cmor_var)
file_name = self._get_cmor_file_name(startdate, member, domain, var, cmor_var, frequency,
chunk, year, date_str, grid)
Javier Vegas-Regidor
committed
filepath = os.path.join(folder_path, file_name)
return filepath
Javier Vegas-Regidor
committed
def _get_cmor_file_name(self, startdate, member, domain, var, cmor_var, frequency, chunk, year, date_str, grid, ):
if cmor_var is None:
cmor_table = domain.get_table(frequency, self.config.data_convention)
else:
cmor_table = cmor_var.get_table(frequency, self.config.data_convention)
time_bound = self._get_time_component(chunk, date_str, frequency, startdate, year)
time_bound = '_{0}.nc'.format(time_bound)
if self.config.data_convention in ('specs', 'preface'):
file_name = '{0}_{1}_{2}_{3}_S{4}_{5}{6}'.format(var, cmor_table.name, self.experiment.model,
self.experiment_name(startdate), startdate,
self._get_member_str(member), time_bound)
elif self.config.data_convention in ('primavera', 'cmip6'):
if not grid:
if domain in [ModelingRealms.ocnBgchem, ModelingRealms.seaIce, ModelingRealms.ocean]:
grid = self.config.cmor.default_ocean_grid
else:
grid = self.config.cmor.default_atmos_grid
file_name = '{0}_{1}_{2}_{3}_{4}_{5}{6}'.format(var, cmor_table.name, self.experiment.model,
self.experiment_name(startdate),
time_bound = self._get_chunk_time_bounds(startdate, chunk)
file_name = '{0}_{1}_{2}_{3}.nc'.format(var, frequency, time_bound, self._get_member_str(member))
return file_name
def _get_time_component(self, chunk, date_str, frequency, startdate, year):
if chunk is not None:
time_bound = self._get_chunk_time_bounds(startdate, chunk)
elif year:
if frequency != Frequencies.yearly:
raise ValueError('Year may be provided instead of chunk only if frequency is "yr"')
time_bound = str(year)
else:
return time_bound
def _get_full_cmor_folder_path(self, startdate, member, domain, var, frequency, grid, cmor_var):
if self.config.data_convention in ('specs', 'preface'):
folder_path = os.path.join(self._get_startdate_path(startdate), str(frequency), domain.name, var)
if grid:
folder_path = os.path.join(folder_path, grid)
folder_path = os.path.join(folder_path, self._get_member_str(member))
if self.config.cmor.version:
folder_path = os.path.join(folder_path, self.config.cmor.version)
elif self.config.data_convention in ('primavera', 'cmip6'):
if not self.config.cmor.version:
raise ValueError('CMOR version is mandatory for PRIMAVERA and CMIP6')
if not grid:
if domain in [ModelingRealms.ocnBgchem, ModelingRealms.seaIce, ModelingRealms.ocean]:
grid = self.config.cmor.default_ocean_grid
else:
grid = self.config.cmor.default_atmos_grid
if cmor_var is None:
table_name = domain.get_table(frequency, self.config.data_convention).name
else:
table_name = cmor_var.get_table(frequency, self.config.data_convention).name
folder_path = os.path.join(self._get_startdate_path(startdate), self._get_member_str(member),
elif self.config.data_convention == 'meteofrance':
folder_path = os.path.join(self.config.data_dir, self.experiment_name(startdate),
'H{0}'.format(chr(64 + int(startdate[4:6]))),
startdate[0:4])
else:
raise ValueError('Data convention {0} not supported'.format(self.config.data_convention))
return folder_path
def _get_chunk_time_bounds(self, startdate, chunk):
start = parse_date(startdate)
chunk_start = chunk_start_date(start, chunk, self.experiment.chunk_size, 'month', self.experiment.calendar)
chunk_end = chunk_end_date(chunk_start, self.experiment.chunk_size, 'month', self.experiment.calendar)
chunk_end = previous_day(chunk_end, self.experiment.calendar)
if self.config.data_convention == 'preface':
separator = '_'
else:
separator = '-'
if self.config.data_convention == 'meteofrance':
time_bound = "{0:04}{1:02}".format(chunk_start.year, chunk_start.month)
else:
time_bound = "{0:04}{1:02}{4}{2:04}{3:02}".format(chunk_start.year, chunk_start.month, chunk_end.year,
return time_bound
Javier Vegas-Regidor
committed
def link_file(self, domain, var, cmor_var, startdate, member, chunk=None, grid=None,
frequency=None, year=None, date_str=None, move_old=False, vartype=VariableType.MEAN):
Javier Vegas-Regidor
committed
"""
Create the link of a given file from the CMOR repository.
Javier Vegas-Regidor
committed
Parameters
----------
domain: ModelingRealm
var: str
cmor_var:
startdate: str
member: int
chunk: int or None, optional
grid: str or None, optional
frequency: Frequency or None, optional
year: int or None, optional
date_str: str or None, optional
move_old: bool, optional
vartype: VariableType, optional
Javier Vegas-Regidor
committed
"""
Javier Vegas-Regidor
committed
frequency = self.config.frequency
filepath = self.get_file_path(startdate, member, domain, var, cmor_var, chunk, frequency,
self.create_link(domain, filepath, frequency, var, grid, move_old, vartype)
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
def create_link(self, domain, filepath, frequency, var, grid, move_old, vartype):
"""
Create file link
Parameters
----------
domain: ModelingRealm
filepath: str
frequency: Frequency
var: str
grid: str
move_old: bool
vartype: VariableType
"""
if self.config.data_convention == 'meteofrance':
return
freq_str = frequency.folder_name(vartype)
if not grid:
grid = 'original'
variable_folder = self.get_varfolder(domain, var)
vargrid_folder = self.get_varfolder(domain, var, grid)
self.lock.acquire()
try:
if grid == 'original':
link_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, variable_folder)
Utils.create_folder_tree(link_path)
else:
link_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, vargrid_folder)
Utils.create_folder_tree(link_path)
default_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, variable_folder)
original_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str,
vargrid_folder.replace('-{0}_f'.format(grid), '-original_f'))
if os.path.islink(default_path):
os.remove(default_path)
elif os.path.isdir(default_path):
shutil.move(default_path, original_path)
os.symlink(link_path, default_path)
if move_old and link_path not in self._checked_vars:
self._checked_vars.append(link_path)
old_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str,
'old_{0}'.format(os.path.basename(link_path)))
regex = re.compile(var + '_[0-9]{6,8}\.nc')
for filename in os.listdir(link_path):
if regex.match(filename):
Utils.create_folder_tree(old_path)
Utils.move_file(os.path.join(link_path, filename),
os.path.join(old_path, filename))
link_path = os.path.join(link_path, os.path.basename(filepath))
if os.path.lexists(link_path):
os.remove(link_path)
if not os.path.exists(filepath):
raise ValueError('Original file {0} does not exists'.format(filepath))
relative_path = os.path.relpath(filepath, os.path.dirname(link_path))
os.symlink(relative_path, link_path)
except Exception:
raise
finally:
self.lock.release()
Javier Vegas-Regidor
committed
"""
Javier Vegas-Regidor
committed
If CMOR data is not created, it show a warning and closes. In the future, an automatic cmorization procedure
will be launched
If CMOR data is available but packed, the procedure will unpack it.
"""
# Check if cmorized and convert if not
if self.config.data_convention == 'meteofrance':
return
Javier Vegas-Regidor
committed
for startdate, member in self.experiment.get_member_list():
Javier Vegas-Regidor
committed
Log.info('Checking data for startdate {0} member {1}', startdate, member)
if not self.config.cmor.force:
cmorized = False
for chunk in range(1, self.experiment.num_chunks + 1):
if not self.config.cmor.chunk_cmorization_requested(chunk):
Log.debug('Skipping chunk {0}', chunk)
continue
if not self.config.cmor.force_untar:
Log.debug('Checking chunk {0}...', chunk)
for domain in (ModelingRealms.atmos, ModelingRealms.ocean, ModelingRealms.seaIce):
if self.is_cmorized(startdate, member, chunk, domain):
Log.debug('Chunk {0} ready', chunk)
continue
if self._unpack_chunk(startdate, member, chunk):
cmorized = True
if cmorized:
Log.info('Startdate {0} member {1} ready', startdate, member)
return
self._cmorize_member(startdate, member)
def is_cmorized(self, startdate, member, chunk, domain):
"""
Check if a chunk domain is cmorized
A cache is maintained so only the first check is costly
Parameters
----------
startdate: str
member: int
chunk: int
domain: ModelingRealm
Returns
-------
bool
"""
identifier = (startdate, member, chunk)
if identifier not in self._dic_cmorized:
self._dic_cmorized[identifier] = {}
self._dic_cmorized[identifier][domain] = self._is_cmorized(startdate, member, chunk, domain)
elif domain not in self._dic_cmorized[identifier]:
self._dic_cmorized[identifier][domain] = self._is_cmorized(startdate, member, chunk, domain)
return self._dic_cmorized[identifier][domain]
def _is_cmorized(self, startdate, member, chunk, domain):
startdate_path = self._get_startdate_path(startdate)
Javier Vegas-Regidor
committed
if not os.path.isdir(startdate_path):
if self.config.data_convention == 'specs':
for freq in os.listdir(startdate_path):
domain_path = os.path.join(startdate_path, freq, domain.name)
count = self._check_var_presence(domain_path, count, startdate, member, domain, chunk, freq)
if count >= self.config.cmor.min_cmorized_vars:
return True
else:
member_path = os.path.join(startdate_path, self._get_member_str(member))
if not os.path.isdir(member_path):
return False
freq = Frequencies.monthly
table = domain.get_table(freq, self.config.data_convention)
table_dir = os.path.join(member_path, table.name)
if not os.path.isdir(table_dir):
return False
count = self._check_var_presence(table_dir, count, startdate, member, domain, chunk, freq)
if count >= self.config.cmor.min_cmorized_vars:
return True
return False
def _check_var_presence(self, folder, current_count, startdate, member, domain, chunk, freq):
for var in os.listdir(folder):
cmor_var = self.variable_list.get_variable(var, True)
var_path = self.get_file_path(startdate, member, domain, var, cmor_var, chunk, frequency=freq)
if os.path.isfile(var_path):
current_count += 1
if current_count >= self.config.cmor.min_cmorized_vars:
break
return current_count
def _cmorize_member(self, startdate, member):
start_time = datetime.now()
member_str = self.experiment.get_member_str(member)
Log.info('CMORizing startdate {0} member {1}. Starting at {2}', startdate, member_str, start_time)
cmorizer = Cmorizer(self, startdate, member)
cmorizer.cmorize_ocean()
cmorizer.cmorize_atmos()
Log.result('CMORized startdate {0} member {1}! Elapsed time: {2}\n\n', startdate, member_str,
datetime.now() - start_time)
def _unpack_chunk(self, startdate, member, chunk):
filepaths = self._get_transferred_cmor_data_filepaths(startdate, member, chunk, 'tar.gz')
if len(filepaths) > 0:
Log.info('Unzipping cmorized data for {0} {1} {2}...', startdate, member, chunk)
Utils.unzip(filepaths, True)
filepaths = self._get_transferred_cmor_data_filepaths(startdate, member, chunk, 'tar')
if len(filepaths) > 0:
Log.info('Unpacking cmorized data for {0} {1} {2}...', startdate, member, chunk)
Utils.untar(filepaths, self.cmor_path)
self._correct_paths(startdate)
self.create_links(startdate, member)
return True
return False
def _get_transferred_cmor_data_filepaths(self, startdate, member, chunk, extension):
tar_path = os.path.join(self.config.data_dir, self.experiment.expid, 'original_files', 'cmorfiles')
tar_original_files = os.path.join(self.config.data_dir, 'original_files', self.experiment.expid,
'cmorfiles')
filepaths = []
for cmor_prefix in ('CMOR?', 'CMOR'):
file_name = '{5}_{0}_{1}_{2}_{3}-*.{4}'.format(self.experiment.expid, startdate,
self.experiment.get_member_str(member),
self.experiment.get_chunk_start_str(startdate, chunk),
extension, cmor_prefix)
filepaths += glob.glob(os.path.join(tar_path, file_name))
filepaths += glob.glob(os.path.join(tar_path, 'outputs', file_name))
filepaths += glob.glob(os.path.join(tar_original_files, file_name))
filepaths += glob.glob(os.path.join(tar_original_files, 'outputs', file_name))
return filepaths
Javier Vegas-Regidor
committed
self._remove_extra_output_folder()
Javier Vegas-Regidor
committed
def _fix_model_as_experiment_error(self, startdate):
if self.experiment_name(startdate) != self.experiment.model:
Javier Vegas-Regidor
committed
bad_path = os.path.join(self.cmor_path, self.experiment.institute, self.experiment.model,
self.experiment.model)
Log.debug('Correcting double model appearance')
for (dirpath, _, filenames) in os.walk(bad_path, False):
Javier Vegas-Regidor
committed
for filename in filenames:
if '_S{0}_'.format(startdate) in filename:
continue
Javier Vegas-Regidor
committed
filepath = os.path.join(dirpath, filename)
good = filepath
good = good.replace('_{0}_output_'.format(self.experiment.model),
'_{0}_{1}_S{2}_'.format(self.experiment.model,
self.experiment_name(startdate),
Javier Vegas-Regidor
committed
good = good.replace('/{0}/{0}'.format(self.experiment.model),
'/{0}/{1}'.format(self.experiment.model,
self.experiment_name(startdate)))
Javier Vegas-Regidor
committed
Utils.move_file(filepath, good)
if self.experiment.model != self.experiment_name(startdate):
Javier Vegas-Regidor
committed
Log.debug('Done')
def _remove_extra_output_folder(self):
bad_path = os.path.join(self.cmor_path, 'output')
if os.path.exists(bad_path):
Log.debug('Moving CMOR files out of the output folder')
Utils.move_tree(bad_path, self.cmor_path)
Log.debug('Done')
def create_links(self, startdate, member=None):
"""
Create links for a gicen startdate or member
Parameters
----------
startdate: str
member: int or None
Returns
-------
ValueError:
If the data convention is not supported
"""
member_str = self._get_member_str(member)
else:
member_str = None
Log.info('Creating links for CMOR files ({0})', startdate)
Javier Vegas-Regidor
committed
path = self._get_startdate_path(startdate)
if self.config.data_convention.upper() in ('SPECS', 'APPLICATE'):
elif self.config.data_convention.upper() in ('CMIP6', 'PRIMAVERA'):
else:
raise ValueError('Dataset convention {0} not supported for massive '
'link creation'.format(self.config.data_convention))
Log.debug('Links ready')
Javier Vegas-Regidor
committed
for freq in os.listdir(path):
frequency = Frequency.parse(freq)
Javier Vegas-Regidor
committed
for domain in os.listdir(os.path.join(path, freq)):
for var in os.listdir(os.path.join(path, freq, domain)):
for member in os.listdir(os.path.join(path, freq, domain, var)):
if member_str is not None and member_str != member:
continue
Javier Vegas-Regidor
committed
for name in os.listdir(os.path.join(path, freq, domain, var, member)):
filepath = os.path.join(path, freq, domain, var, member, name)
if os.path.isfile(filepath):
self.create_link(domain, filepath, frequency, var, "", False,
vartype=VariableType.MEAN)
Javier Vegas-Regidor
committed
else:
for filename in os.listdir(filepath):
self.create_link(domain, os.path.join(filepath, filename), frequency, var, "",
False, vartype=VariableType.MEAN)
for member in os.listdir(path):
for table in os.listdir(os.path.join(path, member)):
frequency = self.variable_list.tables[table].frequency
domain = None
for var in os.listdir(os.path.join(path, member, table)):
for grid in os.listdir(os.path.join(path, member, table, var)):
if member_str is not None and member_str != member:
continue
for name in os.listdir(os.path.join(path, member, table, var, grid)):
filepath = os.path.join(path, member, table, var, grid, name)
if os.path.isfile(filepath):
self.create_link(domain, filepath, frequency, var, "", False,
vartype=VariableType.MEAN)
else:
for filename in os.listdir(filepath):
cmorfile = os.path.join(filepath, filename)
self.create_link(domain, cmorfile, frequency, var, "",
Javier Vegas-Regidor
committed
def _get_startdate_path(self, startdate):
"""
Return the path to the startdate's CMOR folder
Parameters
----------
startdate: str
Returns
-------
str
Javier Vegas-Regidor
committed
"""
if self.config.data_convention == 'specs':
return os.path.join(self.config.data_dir, self.experiment.expid, 'cmorfiles', self.experiment.institute,
self.experiment.model, self.experiment_name(startdate), 'S' + startdate)
elif self.config.data_convention == 'preface':
return os.path.join(self.config.data_dir, self.experiment.expid, 'cmorfiles', self.experiment.institute,
self.experiment_name(startdate), 'S' + startdate)
else:
return os.path.join(self.config.data_dir, self.experiment.expid, 'cmorfiles', self.config.cmor.activity,
self.experiment.institute, self.experiment.model, self.experiment_name(startdate))
def _get_member_str(self, member):
if self.config.data_convention in ('specs', 'preface'):
template = 'r{0}i{1}p1'
elif self.config.data_convention in ('primavera', 'cmip6'):
template = 'r{0}i{1}p1f1'
elif self.config.data_convention == 'meteofrance':
return template.format(member + 1 - self.experiment.member_count_start, self.config.cmor.initialization_number)
class MergeYear(Diagnostic):
"""
Diagnostic to get all the data for a given year and merge it in a file
Parameters
----------
data_manager: DataManager
domain: ModelingRealm
var: str
startdate: str
member: int
year: int
grid: str or None, optional
box: Box or None, optional
frequency: Frequency or None, optional
"""
@classmethod
def generate_jobs(cls, diags, options):
"""
Method to generate the required diagnostics from a section of the configuration file
Required by the interface, does nothing as this diagnostic is not meant to be configured in the usal way
"""
def __init__(self, data_manager, domain, var, startdate, member, year, grid=None, box=None, frequency=None):
super(MergeYear, self).__init__(data_manager)
self.chunk_files = []
self.experiment = self.data_manager.experiment
self.domain = domain
self.var = var
self.startdate = startdate
self.member = member
self.year = year
self.grid = grid
self.box = box
self.frequency = frequency
def __str__(self):
return 'Merge year data Variable: {0.domain}:{0.var} Startdate: {0.startdate} Member: {0.member} ' \
'Year: {0.year} Grid: {0.grid} Box: {0.box} Frequency: {0.frequency}'.format(self)
def __eq__(self, other):
return self.domain == other.domain and self.var == other.var and self.startdate == other.startdate and \
self.member == other.member and self.year == other.year and self.grid == other.grid and \
self.box == other.box and self.frequency == other.frequency
def __hash__(self):
return hash(str(self))
"""Request all the data required by the diagnostic"""
for chunk in self.experiment.get_year_chunks(self.startdate, self.year):
self.chunk_files.append(self.request_chunk(self.domain, self.var, self.startdate, self.member, chunk,
grid=self.grid, box=self.box, frequency=self.frequency))
def declare_data_generated(self):
"""Declare all the data generated by the diagnostic"""
self.year_file = self.declare_year(self.domain, self.var, self.startdate, self.member, self.year,
self.year_file.storage_status = StorageStatus.NO_STORE
def compute(self):
temp = self._merge_chunk_files()
temp2 = self._select_data_of_given_year(temp)
def _select_data_of_given_year(self, data_file):
temp2 = TempFile.get()
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
times = Utils.get_datetime_from_netcdf(handler)
x = 0
first_index = None
last_index = None
while x < times.size:
if times[x].year == self.year:
first_index = x
break
else:
x += 1
while x < times.size:
if times[x].year != self.year:
last_index = x
break
else:
x += 1
if last_index is None:
last_index = times.size
Utils.nco.ncks(input=data_file, output=temp2, options=['-d time,{0},{1}'.format(first_index, last_index - 1)])
return temp2
def _merge_chunk_files(self):
temp = TempFile.get()
if len(self.chunk_files) == 1:
Utils.copy_file(self.chunk_files[0].local_file, temp)
return temp
Utils.nco.ncrcat(input=' '.join(self.chunk_files), output=temp)
for chunk_file in self.chunk_files:
os.remove(chunk_file)
return temp