Newer
Older
Javier Vegas-Regidor
committed
#!/usr/bin/env python
import argparse
import shutil
Javier Vegas-Regidor
committed
import threading
import netCDF4
import os
from autosubmit.date.chunk_date_lib import *
from datamanager import DataManager
from earthdiagnostics import cdftools
from earthdiagnostics.utils import TempFile
from earthdiagnostics.diagnostic import Diagnostic
from earthdiagnostics.ocean import *
from ocean import ConvectionSites, Gyres, Psi, MaxMoc, AreaMoc, Moc, VerticalMean, VerticalMeanMeters, Interpolate, \
AverageSection, CutSection, MixedLayerSaltContent, Siasiesiv
Javier Vegas-Regidor
committed
class Diags(object):
:param config_file: path to the configuration file
:type config_file: str
def __init__(self, config_file):
    """
    Create the diagnostics manager from a configuration file.

    The configuration is read first; the scratch folder and the CDFTools path obtained from it are then
    handed to TempFile and cdftools.

    :param config_file: path to the configuration file
    :type config_file: str
    """
    self._read_config(config_file)

    # Both values are attributes set while reading the configuration
    TempFile.scratch_folder = self.scratch_dir
    cdftools.path = self.cdftools_path
def _create_dic_variables(self):
self.dic_variables = dict()
self.dic_variables['x'] = 'i'
self.dic_variables['y'] = 'j'
self.dic_variables['z'] = 'lev'
self.dic_variables['nav_lon'] = 'lon'
self.dic_variables['nav_lat'] = 'lat'
self.dic_variables['nav_lev'] = 'lev'
self.dic_variables['time_counter'] = 'time'
self.dic_variables['t'] = 'time'
Log.debug('Using netCDF version {0}', netCDF4.getlibversion())
time = datetime.datetime.now()
Log.info("Starting diagnostics at {0}", time)
if not os.path.exists(self.scratch_dir):
os.makedirs(self.scratch_dir)
os.chdir(self.scratch_dir)
Diagnostic.register(MixedLayerSaltContent, 'mlotstsc')
Diagnostic.register(Siasiesiv, 'siasiesiv')
Diagnostic.register(VerticalMean, 'vertmean')
Diagnostic.register(VerticalMeanMeters, 'vertmeanmeters')
Diagnostic.register(Interpolate, 'interp')
Diagnostic.register(AreaMoc, 'mocarea')
Diagnostic.register(MaxMoc, 'mocmax')
Javier Vegas-Regidor
committed
Diagnostic.register(ConvectionSites, 'convection')
Diagnostic.register(CutSection, 'cutsection')
Diagnostic.register(AverageSection, 'avgsection')
Javier Vegas-Regidor
committed
Diagnostic.register(MixedLayerHeatContent, 'mlotsthc')
Diagnostic.register(HeatContentLayer, 'ohclayer')
Diagnostic.register(HeatContent, 'ohc')
self.data_manager.prepare_CMOR_files(self.startdates, self.members, self.force_CMOR)
# Run diagnostics
Log.info('Running diagnostics')
Log.info("Running {0}", fulldiag)
Javier Vegas-Regidor
committed
diag_options = fulldiag.split(',')
diag_class = Diagnostic.get_diagnostic(diag_options[0])
if diag_class:
for job in diag_class.generate_jobs(self, diag_options):
list_jobs.put(job)
Javier Vegas-Regidor
committed
continue
Javier Vegas-Regidor
committed
Log.error('{0} is not an available diagnostic', diag_options[0])
numthreads = min(Utils.available_cpu_count(), self.max_cores)
Javier Vegas-Regidor
committed
threads = list()
for numthread in range(0, numthreads):
t = threading.Thread(target=Diags._run_jobs, args=(list_jobs, numthread))
Javier Vegas-Regidor
committed
threads.append(t)
t.start()
Javier Vegas-Regidor
committed
TempFile.clean()
finsih_time = datetime.datetime.now()
Log.result("Diagnostics finished at {0}", finsih_time)
Log.result("Time ellapsed: {0}", finsih_time - time)
Javier Vegas-Regidor
committed
# Worker routine run by every thread: it takes jobs from the shared queue until the queue is empty and
# then logs how many of them it ran.
#
# :param queue: queue with the jobs to run; it is used through empty(), get() and task_done()
# :param numthread: number identifying the thread, only used in the log messages
@staticmethod
def _run_jobs(queue, numthread):
# Helper that runs one job and reports success by returning True.
# NOTE(review): the opening of the try statement and the code that actually runs the job are not
# present in this view of the file, so the retry handling based on retrials cannot be described from here
def _run_job(current_job, retrials=1):
return True
except Exception as ex:
retrials -= 1
# Keep going while there is something left in the queue
while not queue.empty():
try:
# The timeout covers the case of another thread taking the last job after the check above:
# get() then raises Queue.Empty instead of blocking forever
job = queue.get(timeout=1)
if _run_job(job):
count += 1
queue.task_done()
except Queue.Empty:
continue
# NOTE(review): count and failed_jobs are used from here on, but the lines that initialise and fill them
# are not present in this view of the file
if len(failed_jobs) == 0:
Log.result('Thread {0} finished after taking care of {1} tasks', numthread, count)
else:
Log.result('Thread {0} finished after running successfully {1} of {2} tasks', numthread, count,
count + len(failed_jobs))
for job in failed_jobs:
Log.error('Job {0} could not be run', job)
Javier Vegas-Regidor
committed
def _get_commands(self):
    """
    Build the list of commands to run, replacing every alias by the commands it stands for.

    The requested commands are the whitespace-separated entries of self.diags. An entry that is a key of
    self._aliases is replaced, at the same position, by all the commands stored for that alias; any other
    entry is kept unchanged.

    :return: commands to run, in the order they were requested
    :rtype: list[str]
    """
    Log.debug('Preparing command list')

    real_commands = list()
    for command in self.diags.split():
        if command in self._aliases:
            added_commands = self._aliases[command]
            Log.info('Changing alias {0} for {1}', command, ' '.join(added_commands))
            # The alias stands for a list of commands: add each of them, not the list itself
            real_commands.extend(added_commands)
        else:
            real_commands.append(command)
    return real_commands
# Prepare the mesh and mask files for the configured NEMO version.
# Sources are looked up in self.con_files using self.nemo_version in the file name; destinations are
# relative names, so the files end up in the current working directory.
def _prepare_mesh_files(self):
Log.info('Copying mesh files')
# A single mesh mask file provides mesh_hgr.nc, mesh_zgr.nc and mask.nc: it is copied once and the other
# two names are created as links to the copy
self._copy_file(os.path.join(self.con_files, 'mesh_mask_nemo.{0}.nc'.format(self.nemo_version)), 'mesh_hgr.nc')
self._link_file('mesh_hgr.nc', 'mesh_zgr.nc')
self._link_file('mesh_hgr.nc', 'mask.nc')
self._copy_file(os.path.join(self.con_files, 'new_maskglo.{0}.nc'.format(self.nemo_version)), 'new_maskglo.nc')
self._copy_file(os.path.join(self.con_files, 'mask.regions.{0}.nc'.format(self.nemo_version)),
'mask_regions.nc')
# NOTE(review): the call below is cut in this view of the file: its destination argument and closing
# parenthesis are missing and must be restored from the repository
self._copy_file(os.path.join(self.con_files, 'mask.regions.3d.{0}.nc'.format(self.nemo_version)),
def _copy_file(self, source, destiny):
    """
    Copy a file unless the source is missing or the destination already has a file of the same size.

    A missing source only produces a user warning. After an actual copy, Utils.rename_variables is
    called on 'mesh_hgr.nc' with self.dic_variables.

    :param source: path of the file to copy
    :type source: str
    :param destiny: path where the copy has to be placed
    :type destiny: str
    """
    if not os.path.exists(source):
        Log.user_warning('File {0} is not available for {1}', destiny, self.nemo_version)
        # Nothing to copy: going on would only make os.stat or shutil.copy raise on the missing file
        return

    if os.path.exists(destiny) and os.stat(source).st_size == os.stat(destiny).st_size:
        # A destination of the same size is taken as an already completed copy
        Log.info('File {0} already exists', destiny)
        return

    Log.info('Creating file {0}', destiny)
    shutil.copy(source, destiny)
    Log.info('File {0} ready', destiny)
    # NOTE(review): the renaming always targets mesh_hgr.nc, whatever file was just copied; confirm
    # whether destiny was meant here
    Utils.rename_variables('mesh_hgr.nc', self.dic_variables, False, True)
def _link_file(self, source, destiny):
    """
    Create a symbolic link to a file, replacing anything that already exists at the link path.

    A missing source only produces a user warning: no link is created and an existing destination is
    left untouched.

    :param source: path the link has to point to
    :type source: str
    :param destiny: path of the link to create
    :type destiny: str
    """
    if not os.path.exists(source):
        Log.user_warning('File {0} is not available for {1}', destiny, self.nemo_version)
        # Do not leave a dangling link pointing to a file that is not there
        return

    # lexists is also true for broken links, which exists() would miss and symlink() would fail on
    if os.path.lexists(destiny):
        os.remove(destiny)
    os.symlink(source, destiny)
    Log.info('File {0} ready', destiny)
def _read_config(self, config_file):
    """
    Read the configuration file and store its values as attributes.

    Besides reading the options, it changes the working directory to the experiment's scratch folder
    (creating it if needed) and creates the data manager, including its CMOR metadata.

    :param config_file: path to the configuration file
    :type config_file: str
    """
    parser = Parser()
    self.parser = parser
    # Keep option names exactly as written in the file
    parser.optionxform = str
    parser.read(config_file)

    # Diagnostics configuration
    self.scratch_dir = parser.get_option('DIAGNOSTICS', 'SCRATCH_DIR')
    self.data_dir = parser.get_option('DIAGNOSTICS', 'DATA_DIR')
    self.con_files = parser.get_option('DIAGNOSTICS', 'CON_FILES')
    self.diags = parser.get_option('DIAGNOSTICS', 'DIAGS').lower()
    self.frequency = parser.get_option('DIAGNOSTICS', 'FREQUENCY')
    self.cdftools_path = parser.get_option('DIAGNOSTICS', 'CDFTOOLS_PATH')
    self.max_cores = parser.get_int_option('DIAGNOSTICS', 'MAX_CORES', 100000)

    # Experiment configuration
    self.institute = parser.get_option('EXPERIMENT', 'INSTITUTE')
    self.expid = parser.get_option('EXPERIMENT', 'EXPID')
    self.experiment_name = parser.get_option('EXPERIMENT', 'NAME', self.expid)
    self.members = [int(member) for member in parser.get_option('EXPERIMENT', 'MEMBERS').split()]
    self.member_digits = parser.get_int_option('EXPERIMENT', 'MEMBER_DIGITS', 1)
    self.startdates = parser.get_option('EXPERIMENT', 'STARTDATES').split()
    self.chunk_size = parser.get_int_option('EXPERIMENT', 'CHUNK_SIZE')
    self.chunks = parser.get_int_option('EXPERIMENT', 'CHUNKS')
    self.calendar = parser.get_option('EXPERIMENT', 'CALENDAR', 'standard')
    self.model = parser.get_option('EXPERIMENT', 'MODEL')
    self.nfrp = parser.get_int_option('EXPERIMENT', 'NFRP')
    self.nemo_version = parser.get_option('EXPERIMENT', 'NEMO_VERSION')

    self.add_name = parser.get_bool_option('CMOR', 'ADD_NAME')
    self.add_startdate = parser.get_bool_option('CMOR', 'ADD_STARTDATE')

    # Aliases: lower-cased name -> list of lower-cased commands
    self._aliases = dict()
    if parser.has_section('ALIAS'):
        self._aliases.update((option.lower(), parser.get_option('ALIAS', option).lower().split())
                             for option in parser.options('ALIAS'))

    # Each experiment works in its own folder inside the configured scratch directory
    scratch = os.path.join(self.scratch_dir, 'diags', self.expid)
    self.scratch_dir = scratch
    if not os.path.exists(scratch):
        os.makedirs(scratch)
    os.chdir(scratch)

    manager = DataManager(self.institute, self.model, self.expid, self.data_dir,
                          self.frequency, self.chunk_size, self.experiment_name, self.chunks,
                          self.scratch_dir, self.nfrp, self.member_digits,
                          self.calendar)
    self.data_manager = manager
    manager.add_startdate = self.add_startdate
    manager.add_name = self.add_name
    self.force_CMOR = parser.get_bool_option('CMOR', 'FORCE', False)

    # CMOR metadata: every attribute defaults to the same placeholder text
    cmor_metadata = (('associated_experiment', 'ASSOCIATED_EXPERIMENT'),
                     ('associated_model', 'ASSOCIATED_MODEL'),
                     ('initialization_description', 'INITIALIZATION_DESCRIPTION'),
                     ('initialization_method', 'INITIALIZATION_METHOD'),
                     ('physics_description', 'PHYSICS_DESCRIPTION'),
                     ('physics_version', 'PHYSICS_VERSION'),
                     ('source', 'SOURCE'))
    for attribute, option in cmor_metadata:
        setattr(manager, attribute, parser.get_option('CMOR', option, 'to be filled'))
Javier Vegas-Regidor
committed
"""
Entry point for the Earth Diagnostics. For more detailed documentation, use -h option
Javier Vegas-Regidor
committed
"""
parser = argparse.ArgumentParser(description='Main executable for Earth Diagnostics.')
parser.add_argument('-v', '--version', action='version', version='0.1',
help="returns Earth Diagnostics's version number and exit")
parser.add_argument('-lf', '--logfile', choices=('EVERYTHING', 'DEBUG', 'INFO', 'RESULT', 'USER_WARNING',
'WARNING', 'ERROR', 'CRITICAL', 'NO_LOG'),
default='DEBUG', type=str,
help="sets file's log level.")
parser.add_argument('-lc', '--logconsole', choices=('EVERYTHING', 'DEBUG', 'INFO', 'RESULT', 'USER_WARNING',
'WARNING', 'ERROR', 'CRITICAL', 'NO_LOG'),
default='INFO', type=str,
help="sets console's log level")
parser.add_argument('-log', '--logfilepath', default=None, type=str)
parser.add_argument('-f', '--configfile', default='diags.conf', type=str)
args = parser.parse_args()
Log.set_console_level(args.logconsole)
Log.set_file_level(args.logfile)
if args.logfilepath:
Log.set_file(args.logfilepath)
diags = Diags(args.configfile)
if __name__ == "__main__":