Newer
Older
Javier Vegas-Regidor
committed
#!/usr/bin/env python
Javier Vegas-Regidor
committed
import argparse
import shutil
Javier Vegas-Regidor
committed
import threading
import netCDF4
Javier Vegas-Regidor
committed
import operator
import os
from autosubmit.date.chunk_date_lib import *
Javier Vegas-Regidor
committed
from config import Config
from datamanager import DataManager
from earthdiagnostics import cdftools
from earthdiagnostics.utils import TempFile
from earthdiagnostics.diagnostic import Diagnostic
from earthdiagnostics.ocean import *
from earthdiagnostics.general import *
from ocean import ConvectionSites, Gyres, Psi, MaxMoc, AreaMoc, Moc, VerticalMean, VerticalMeanMeters, Interpolate, \
AverageSection, CutSection, MixedLayerSaltContent, Siasiesiv
from utils import Utils
:param config_file: path to the configuration file
:type config_file: str
def __init__(self, config_file):
    """
    Read the configuration and hand its paths over to the shared helpers

    :param config_file: path to the configuration file
    :type config_file: str
    """
    configuration = Config(config_file)
    self.config = configuration

    # These two settings are stored on the imported TempFile and cdftools objects themselves,
    # not on this instance
    TempFile.scratch_folder = configuration.scratch_dir
    cdftools.path = configuration.cdftools_path

    # Timing information: one entry per worker thread number, each holding the
    # time accumulated per job type
    self.time = {}
def _create_dic_variables(self):
self.dic_variables = dict()
self.dic_variables['x'] = 'i'
self.dic_variables['y'] = 'j'
self.dic_variables['z'] = 'lev'
self.dic_variables['nav_lon'] = 'lon'
self.dic_variables['nav_lat'] = 'lat'
self.dic_variables['nav_lev'] = 'lev'
self.dic_variables['time_counter'] = 'time'
self.dic_variables['t'] = 'time'
# NOTE(review): the `def` line that owns these statements is not visible in this view
# (presumably the `run` method that the entry-point code calls as `diags.run()`), and the
# original indentation has been lost, so the nesting described below is inferred.
# NOTE(review): the 'Javier Vegas-Regidor' / 'committed' lines look like version-control
# blame residue rather than Python statements.
Log.debug('Using netCDF version {0}', netCDF4.getlibversion())
Javier Vegas-Regidor
committed
# Create the scratch directory when it does not exist yet, then make it the working directory
if not os.path.exists(self.config.scratch_dir):
os.makedirs(self.config.scratch_dir)
os.chdir(self.config.scratch_dir)
# Register every diagnostic class with Diagnostic (presumably so that
# Diagnostic.get_diagnostic can resolve them from the command name further down)
Diagnostic.register(MixedLayerSaltContent)
Diagnostic.register(Siasiesiv)
Diagnostic.register(VerticalMean)
Diagnostic.register(VerticalMeanMeters)
Diagnostic.register(Interpolate)
Diagnostic.register(Moc)
Diagnostic.register(AreaMoc)
Diagnostic.register(MaxMoc)
Diagnostic.register(Psi)
Diagnostic.register(Gyres)
Diagnostic.register(ConvectionSites)
Diagnostic.register(CutSection)
Diagnostic.register(AverageSection)
Diagnostic.register(MixedLayerHeatContent)
Diagnostic.register(HeatContentLayer)
Diagnostic.register(HeatContent)
Diagnostic.register(Rewrite)
Javier Vegas-Regidor
committed
# Build the data manager from the configuration and let it prepare the CMOR files
self.data_manager = DataManager(self.config)
self.data_manager.prepare_CMOR_files()
# Run diagnostics
Log.info('Running diagnostics')
Javier Vegas-Regidor
committed
# Each configured command is a comma-separated string: the first field names the
# diagnostic and the whole list of fields is passed on to generate_jobs.
# NOTE(review): `list_jobs` is not created anywhere in the visible lines; it is used as a
# queue here (`put`) and handed to the worker threads below.
for fulldiag in self.config.get_commands():
Log.info("Adding {0} to diagnostic list", fulldiag)
Javier Vegas-Regidor
committed
diag_options = fulldiag.split(',')
diag_class = Diagnostic.get_diagnostic(diag_options[0])
if diag_class:
for job in diag_class.generate_jobs(self, diag_options):
list_jobs.put(job)
Javier Vegas-Regidor
committed
# Presumably belongs to the `if diag_class:` branch, so that the error below is only
# logged for unknown diagnostics
continue
Javier Vegas-Regidor
committed
Log.error('{0} is not an available diagnostic', diag_options[0])
# Start timestamp. NOTE(review): the name `time` is reused as a loop variable in the
# report at the end of this block.
time = datetime.datetime.now()
Log.info("Starting to compute at {0}", time)
Javier Vegas-Regidor
committed
# Never use more threads than the configured max_cores or than the CPUs reported by Utils
num_threads = min(Utils.available_cpu_count(), self.config.max_cores)
Javier Vegas-Regidor
committed
Log.info('Using {0} threads', num_threads)
Javier Vegas-Regidor
committed
threads = list()
Javier Vegas-Regidor
committed
# Start one worker per thread number; each worker gets its own timing dictionary in
# self.time so that the workers do not write to a shared one
for num_thread in range(0, num_threads):
self.time[num_thread] = dict()
t = threading.Thread(target=EarthDiags._run_jobs, args=(self, list_jobs, num_thread))
Javier Vegas-Regidor
committed
threads.append(t)
t.start()
Javier Vegas-Regidor
committed
# NOTE(review): no wait on the worker threads (or on the queue) is visible before the
# temporary files are cleaned and the finish time is taken; confirm against the full file.
TempFile.clean()
# `finsih_time` (sic) is the end timestamp
finsih_time = datetime.datetime.now()
Log.result("Diagnostics finished at {0}", finsih_time)
Javier Vegas-Regidor
committed
Log.result("Time ellapsed: {0}\n", finsih_time - time)
Javier Vegas-Regidor
committed
Log.info('Time consumed by each diagnostic class')
Log.info('--------------------------------------')
Javier Vegas-Regidor
committed
# Add up the per-thread timings; the keys are the job types stored by _run_jobs
total = dict()
Javier Vegas-Regidor
committed
for num_thread in range(0, num_threads):
for key, value in self.time[num_thread].items():
Javier Vegas-Regidor
committed
if key in total:
total[key] += value
else:
total[key] = value
# Report in ascending order of accumulated time
for diag, time in sorted(total.items(), key=operator.itemgetter(1)):
Javier Vegas-Regidor
committed
Log.info('{0:23} {1:}', diag.__name__, time)
def clean(self):
    """
    Remove the scratch folder from disk, if it is there
    """
    Log.info('Removing scratch folder...')
    scratch_dir = self.config.scratch_dir
    if os.path.exists(scratch_dir):
        shutil.rmtree(scratch_dir)
    Log.result('Scratch folder removed')
Javier Vegas-Regidor
committed
# Worker routine: it is the target of every thread started by the launcher and takes jobs
# from `queue` until the queue is empty.
#
# :param queue: queue holding the jobs to run
# :param numthread: number of the worker thread; used as key in self.time and in log messages
#
# NOTE(review): the original indentation has been lost and several statements are not
# visible in this view: the `try:` matching `except Exception as ex:`, the statement that
# actually runs the job between the two timestamps, and the initialisation of `count` and
# `failed_jobs`. The 'Javier Vegas-Regidor' / 'committed' lines look like version-control
# blame residue rather than Python statements.
def _run_jobs(self, queue, numthread):
# Inner helper: runs one job, trying again while `retrials` has not dropped below zero
def _run_job(current_job, retrials=1):
Javier Vegas-Regidor
committed
while retrials >= 0:
Log.info('Starting {0}', current_job)
Javier Vegas-Regidor
committed
time = datetime.datetime.now()
Javier Vegas-Regidor
committed
# From here on `time` holds the elapsed time, which is accumulated per job type in this
# thread's own timing dictionary
time = datetime.datetime.now() - time
if type(current_job) in self.time[numthread]:
self.time[numthread][type(current_job)] += time
else:
self.time[numthread][type(current_job)] = time
return True
except Exception as ex:
# One attempt has been used up
retrials -= 1
# Keep taking jobs while the queue reports pending items; `get` waits at most one second
# and an empty queue at that point just triggers another check of the loop condition
while not queue.empty():
try:
job = queue.get(timeout=1)
if _run_job(job):
count += 1
queue.task_done()
# NOTE(review): no `import Queue` is visible among the imports of this view
except Queue.Empty:
continue
# Summary for this thread: number of jobs run and, if any, the ones that failed
if len(failed_jobs) == 0:
Log.result('Thread {0} finished after taking care of {1} tasks', numthread, count)
else:
Log.result('Thread {0} finished after running successfully {1} of {2} tasks', numthread, count,
count + len(failed_jobs))
for job in failed_jobs:
Log.error('Job {0} could not be run', job)
Javier Vegas-Regidor
committed
def _prepare_mesh_files(self):
    """
    Copy the mesh and mask files of the configured model version and create the links to them

    The files are taken from the folder given by the con_files option. The restore_meshes
    option is passed to _copy_file as its force argument.
    """
    Log.info('Copying mesh files')
    source_dir = self.config.con_files
    version = self.config.experiment.model_version
    overwrite = self.config.restore_meshes

    def fetch(template, target):
        # template is the source file name with a placeholder for the model version
        self._copy_file(os.path.join(source_dir, template.format(version)), target, overwrite)

    fetch('mesh_mask_nemo.{0}.nc', 'mesh_hgr.nc')
    # The same mesh file is also made available under two more names
    for alias in ('mesh_zgr.nc', 'mask.nc'):
        self._link_file('mesh_hgr.nc', alias)

    remaining = (
        ('new_maskglo.{0}.nc', 'new_maskglo.nc'),
        ('mask.regions.{0}.nc', 'mask_regions.nc'),
        ('mask.regions.3d.{0}.nc', 'mask_regions.3d.nc'),
    )
    for template, target in remaining:
        fetch(template, target)
def _copy_file(self, source, destiny, force):
    """
    Copy source to destiny, unless source is missing or an equally sized destiny already exists

    When source does not exist a user warning is logged and nothing else is done, the same
    way _link_file treats a missing source.

    :param source: path of the file to copy
    :type source: str
    :param destiny: path of the copy to create
    :type destiny: str
    :param force: if True, copy even if destiny already exists with the same size as source
    :type force: bool
    """
    # Without this guard the warning would be logged on every call and a missing
    # source would fail later in os.stat or shutil.copy
    if not os.path.exists(source):
        Log.user_warning('File {0} is not available for {1}', destiny, self.config.experiment.model_version)
        return

    if not force and os.path.exists(destiny):
        # Equal size is taken as "already copied"; the contents are not compared
        if os.stat(source).st_size == os.stat(destiny).st_size:
            Log.info('File {0} already exists', destiny)
            return

    Log.info('Creating file {0}', destiny)
    shutil.copy(source, destiny)
    Log.info('File {0} ready', destiny)
    # NOTE(review): the rename always targets 'mesh_hgr.nc', whatever file has just been
    # copied; confirm whether `destiny` was intended here.
    Utils.rename_variables('mesh_hgr.nc', self.dic_variables, False, True)
def _link_file(self, source, destiny):
    """
    Create a symbolic link named destiny that points to source

    When source does not exist a user warning is logged and nothing is touched, so an
    existing destiny is not replaced by a broken link.

    :param source: path the link will point to
    :type source: str
    :param destiny: path of the link to create
    :type destiny: str
    """
    if not os.path.exists(source):
        Log.user_warning('File {0} is not available for {1}', destiny, self.config.experiment.model_version)
        # Stop here: linking to a missing file would only leave a dangling link behind
        return

    # lexists is also true for a broken link, which has to be removed before linking again
    if os.path.lexists(destiny):
        os.remove(destiny)
    os.symlink(source, destiny)
    Log.info('File {0} ready', destiny)
Javier Vegas-Regidor
committed
# NOTE(review): the `def` line of this entry point is not visible in this view and the
# original indentation has been lost. The 'Javier Vegas-Regidor' / 'committed' lines look
# like version-control blame residue; the first pair below sits inside the triple-quoted
# string as shown.
"""
Entry point for the Earth Diagnostics. For more detailed documentation, use -h option
Javier Vegas-Regidor
committed
"""
# Command line definition
parser = argparse.ArgumentParser(description='Main executable for Earth Diagnostics.')
parser.add_argument('-v', '--version', action='version', version='3.0.0b3',
help="returns Earth Diagnostics's version number and exit")
Javier Vegas-Regidor
committed
parser.add_argument('--doc', action='store_true',
help="opens documentation and exits")
parser.add_argument('--clean', action='store_true',
help="clean the scratch folder and exits")
# Log levels for the log file and for the console are chosen independently
parser.add_argument('-lf', '--logfile', choices=('EVERYTHING', 'DEBUG', 'INFO', 'RESULT', 'USER_WARNING',
'WARNING', 'ERROR', 'CRITICAL', 'NO_LOG'),
default='DEBUG', type=str,
help="sets file's log level.")
parser.add_argument('-lc', '--logconsole', choices=('EVERYTHING', 'DEBUG', 'INFO', 'RESULT', 'USER_WARNING',
'WARNING', 'ERROR', 'CRITICAL', 'NO_LOG'),
default='INFO', type=str,
help="sets console's log level")
parser.add_argument('-log', '--logfilepath', default=None, type=str)
parser.add_argument('-f', '--configfile', default='diags.conf', type=str)
args = parser.parse_args()
Javier Vegas-Regidor
committed
# --doc: open the PDF located one folder above this file with xdg-open and stop
if args.doc:
Log.info('Opening documentation...')
Utils.execute_shell_command(('xdg-open', os.path.join(os.path.dirname(os.path.realpath(__file__)), '..',
'EarthDiagnostics.pdf')))
Log.result('Documentation opened!')
return
Log.set_console_level(args.logconsole)
Log.set_file_level(args.logfile)
Javier Vegas-Regidor
committed
# NOTE(review): --logfilepath defaults to None; confirm that Utils.expand_path and
# Log.set_file accept None.
Log.set_file(Utils.expand_path(args.logfilepath))
# Build the launcher from the configuration file, then either clean the scratch folder
# or run the diagnostics
diags = EarthDiags(Utils.expand_path(args.configfile))
if args.clean:
diags.clean()
else:
diags.run()
# Script entry guard; its body is not visible in this view
if __name__ == "__main__":