Newer
Older
Javier Vegas-Regidor
committed
#!/usr/bin/env python
Javier Vegas-Regidor
committed
import argparse
import shutil
Javier Vegas-Regidor
committed
import threading
Javier Vegas-Regidor
committed
import pkg_resources
import netCDF4
Javier Vegas-Regidor
committed
import operator
import os
from autosubmit.date.chunk_date_lib import *
Javier Vegas-Regidor
committed
from config import Config
from datamanager import DataManager
from earthdiagnostics import cdftools
from earthdiagnostics.utils import TempFile
from earthdiagnostics.diagnostic import Diagnostic
from earthdiagnostics.ocean import *
from earthdiagnostics.general import *
from ocean import ConvectionSites, Gyres, Psi, MaxMoc, AreaMoc, Moc, VerticalMean, VerticalMeanMeters, Interpolate, \
AverageSection, CutSection, MixedLayerSaltContent, Siasiesiv
from utils import Utils
:param config_file: path to the configuration file
:type config_file: str
Javier Vegas-Regidor
committed
# Locate the VERSION file: look next to this script first, otherwise use the parent folder.
# When no VERSION file is found, the version of the installed "earthdiagnostics" package is used.
scriptdir = os.path.abspath(os.path.dirname(__file__))
if not os.path.exists(os.path.join(scriptdir, 'VERSION')):
    scriptdir = os.path.join(scriptdir, os.path.pardir)

# README and CHANGELOG are expected in the same folder as VERSION
version_path, readme_path, changes_path = (os.path.join(scriptdir, 'VERSION'),
                                           os.path.join(scriptdir, 'README'),
                                           os.path.join(scriptdir, 'CHANGELOG'))
if not os.path.isfile(version_path):
    autosubmit_version = pkg_resources.require("earthdiagnostics")[0].version
else:
    with open(version_path) as f:
        autosubmit_version = f.read().strip()
def __init__(self, config_file):
    """
    Load the configuration and hand its paths over to the helpers that need them

    :param config_file: path to the configuration file
    :type config_file: str
    """
    settings = Config(config_file)
    self.config = settings

    # TempFile and cdftools are configured through module/class level attributes
    TempFile.scratch_folder = settings.scratch_dir
    cdftools.path = settings.cdftools_path

    # Timing information, later filled with one dict per worker thread
    self.time = {}
Javier Vegas-Regidor
committed
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
@staticmethod
def parse_args():
    """
    Entry point for the Earth Diagnostics. For more detailed documentation, use -h option

    Parses the command line, then opens the documentation (--doc), removes the scratch folder (--clean) or
    runs the diagnostics. Any exception raised while doing so is logged as critical instead of propagating.

    :return: True if the requested action finished, False if an unhandled exception was caught
    :rtype: bool
    """
    # Shared by --logfile and --logconsole so both options always accept the same set of levels
    log_levels = ('EVERYTHING', 'DEBUG', 'INFO', 'RESULT', 'USER_WARNING', 'WARNING', 'ERROR', 'CRITICAL',
                  'NO_LOG')
    try:
        parser = argparse.ArgumentParser(description='Main executable for Earth Diagnostics.')
        # NOTE(review): the version string is hard-coded here; confirm it is kept in sync with the VERSION file
        parser.add_argument('-v', '--version', action='version', version='3.0.0b8',
                            help="returns Earth Diagnostics's version number and exit")
        parser.add_argument('--doc', action='store_true',
                            help="opens documentation and exits")
        parser.add_argument('--clean', action='store_true',
                            help="clean the scratch folder and exits")
        parser.add_argument('-lf', '--logfile', choices=log_levels,
                            default='DEBUG', type=str,
                            help="sets file's log level.")
        parser.add_argument('-lc', '--logconsole', choices=log_levels,
                            default='INFO', type=str,
                            help="sets console's log level")
        parser.add_argument('-log', '--logfilepath', default=None, type=str)
        parser.add_argument('-f', '--configfile', default='diags.conf', type=str)

        args = parser.parse_args()
        if args.doc:
            # The documentation is opened before any log level or configuration file is touched
            Log.info('Opening documentation...')
            Utils.execute_shell_command(('xdg-open', os.path.join(os.path.dirname(os.path.realpath(__file__)), '..',
                                                                  'EarthDiagnostics.pdf')))
            Log.result('Documentation opened!')
            return True

        Log.set_console_level(args.logconsole)
        Log.set_file_level(args.logfile)
        if args.logfilepath:
            Log.set_file(Utils.expand_path(args.logfilepath))

        diags = EarthDiags(Utils.expand_path(args.configfile))
        if args.clean:
            diags.clean()
        else:
            diags.run()
        TempFile.clean()
        # Report success explicitly: falling off the end returned None, which callers
        # could not tell apart from the False returned on failure by a truthiness check
        return True
    except Exception as e:
        from traceback import format_exc
        Log.critical('Unhandled exception on EarthDiagnostics: {0}\n{1}', e, format_exc(10))
        return False
def _create_dic_variables(self):
    """
    Create ``self.dic_variables``, the name mapping that is handed to ``Utils.rename_variables``
    """
    self.dic_variables = {
        'x': 'i',
        'y': 'j',
        'z': 'lev',
        'nav_lon': 'lon',
        'nav_lat': 'lat',
        'nav_lev': 'lev',
        'time_counter': 'time',
        't': 'time',
    }
# NOTE(review): the `def` line that owns the statements below is not present in this view;
# they read as the body of the method that `parse_args` calls as `diags.run()`.
# Indentation was lost too, so block nesting has to be inferred from the statements themselves.
Log.debug('Using netCDF version {0}', netCDF4.getlibversion())
Javier Vegas-Regidor
committed
# Make sure the scratch folder exists; os.chdir then switches the working directory to it.
if not os.path.exists(self.config.scratch_dir):
os.makedirs(self.config.scratch_dir)
os.chdir(self.config.scratch_dir)
# Register the diagnostic classes - presumably this is what lets Diagnostic.get_diagnostic
# (used further down) resolve a diagnostic from the first field of a command; confirm in Diagnostic.
Diagnostic.register(MixedLayerSaltContent)
Diagnostic.register(Siasiesiv)
Diagnostic.register(VerticalMean)
Diagnostic.register(VerticalMeanMeters)
Diagnostic.register(Interpolate)
Diagnostic.register(Moc)
Diagnostic.register(AreaMoc)
Diagnostic.register(MaxMoc)
Diagnostic.register(Psi)
Diagnostic.register(Gyres)
Diagnostic.register(ConvectionSites)
Diagnostic.register(CutSection)
Diagnostic.register(AverageSection)
Diagnostic.register(MixedLayerHeatContent)
Diagnostic.register(HeatContentLayer)
Diagnostic.register(HeatContent)
Diagnostic.register(MonthlyMean)
Diagnostic.register(Rewrite)
Javier Vegas-Regidor
committed
# Build the data manager from the configuration and call its prepare_CMOR_files step.
self.data_manager = DataManager(self.config)
self.data_manager.prepare_CMOR_files()
# Run diagnostics
Log.info('Running diagnostics')
Javier Vegas-Regidor
committed
# Each configured command is a comma-separated string; its first field names the diagnostic.
for fulldiag in self.config.get_commands():
Log.info("Adding {0} to diagnostic list", fulldiag)
Javier Vegas-Regidor
committed
diag_options = fulldiag.split(',')
diag_class = Diagnostic.get_diagnostic(diag_options[0])
if diag_class:
# NOTE(review): `list_jobs` is never assigned in the visible lines - presumably a queue
# created on a line that is missing from this view.
for job in diag_class.generate_jobs(self, diag_options):
list_jobs.put(job)
Javier Vegas-Regidor
committed
# Presumably still inside `if diag_class:` so that the error below is only logged for unknown
# diagnostics - TODO confirm, indentation is lost here.
continue
Javier Vegas-Regidor
committed
Log.error('{0} is not an available diagnostic', diag_options[0])
# `time` holds the start timestamp; it is used again for the elapsed-time message below.
time = datetime.datetime.now()
Log.info("Starting to compute at {0}", time)
Javier Vegas-Regidor
committed
# Number of workers: the available CPU count, capped by the configured max_cores.
num_threads = min(Utils.available_cpu_count(), self.config.max_cores)
Javier Vegas-Regidor
committed
Log.info('Using {0} threads', num_threads)
Javier Vegas-Regidor
committed
threads = list()
Javier Vegas-Regidor
committed
# Every worker gets its own timing dict in self.time, keyed by its thread index.
for num_thread in range(0, num_threads):
self.time[num_thread] = dict()
t = threading.Thread(target=EarthDiags._run_jobs, args=(self, list_jobs, num_thread))
Javier Vegas-Regidor
committed
threads.append(t)
t.start()
Javier Vegas-Regidor
committed
# NOTE(review): no join() on the started threads is visible before the clean-up and the timing
# report below - confirm against the full file.
TempFile.clean()
finsih_time = datetime.datetime.now()
Log.result("Diagnostics finished at {0}", finsih_time)
Javier Vegas-Regidor
committed
Log.result("Time ellapsed: {0}\n", finsih_time - time)
Javier Vegas-Regidor
committed
Log.info('Time consumed by each diagnostic class')
Log.info('--------------------------------------')
Javier Vegas-Regidor
committed
# Add up the per-thread timings into a single total per key.
total = dict()
Javier Vegas-Regidor
committed
for num_thread in range(0, num_threads):
for key, value in self.time[num_thread].items():
Javier Vegas-Regidor
committed
if key in total:
total[key] += value
else:
total[key] = value
# Report the totals sorted by accumulated time, smallest first.
# NOTE(review): the loop variable reuses the name `time`, overwriting the start timestamp.
for diag, time in sorted(total.items(), key=operator.itemgetter(1)):
Javier Vegas-Regidor
committed
Log.info('{0:23} {1:}', diag.__name__, time)
def clean(self):
    """
    Delete the configured scratch folder and everything inside it, if the folder exists
    """
    Log.info('Removing scratch folder...')
    scratch_dir = self.config.scratch_dir
    if os.path.exists(scratch_dir):
        shutil.rmtree(scratch_dir)
    Log.result('Scratch folder removed')
Javier Vegas-Regidor
committed
# Worker loop: takes jobs from `queue` until it is empty, then logs a summary for this thread.
#   queue     -- queue of jobs to run
#   numthread -- index of this worker; selects its timing dict in self.time
# NOTE(review): several lines of this method are missing from this view - nothing opens the `try`
# that the `except` in `_run_job` belongs to, the statement that actually runs the job between the
# two timestamps is absent, and `count` / `failed_jobs` are never initialised or filled.
# Confirm against the full file before changing any logic here.
def _run_jobs(self, queue, numthread):
# Inner helper: returns True once a job has run; `retrials` is decremented on every exception.
def _run_job(current_job, retrials=1):
Javier Vegas-Regidor
committed
while retrials >= 0:
Log.info('Starting {0}', current_job)
Javier Vegas-Regidor
committed
# `time` first holds the start timestamp and is then overwritten with the elapsed time.
time = datetime.datetime.now()
Javier Vegas-Regidor
committed
time = datetime.datetime.now() - time
# Accumulate the elapsed time per job type in this thread's timing dict.
if type(current_job) in self.time[numthread]:
self.time[numthread][type(current_job)] += time
else:
self.time[numthread][type(current_job)] = time
return True
except Exception as ex:
retrials -= 1
# Keep taking jobs while the queue reports pending work.
while not queue.empty():
try:
# get() waits at most one second; a Queue.Empty raised on timeout is ignored below and the
# loop condition is evaluated again.
job = queue.get(timeout=1)
if _run_job(job):
count += 1
queue.task_done()
except Queue.Empty:
continue
# Per-thread summary; jobs listed in `failed_jobs` are additionally reported one by one.
if len(failed_jobs) == 0:
Log.result('Thread {0} finished after taking care of {1} tasks', numthread, count)
else:
Log.result('Thread {0} finished after running successfully {1} of {2} tasks', numthread, count,
count + len(failed_jobs))
for job in failed_jobs:
Log.error('Job {0} could not be run', job)
Javier Vegas-Regidor
committed
def _prepare_mesh_files(self):
    """
    Copy the mesh and mask files for the configured model version into the working directory

    The files are read from ``config.con_files``; ``config.restore_meshes`` is passed to every copy as its
    ``force`` flag.
    """
    Log.info('Copying mesh files')

    source_dir = self.config.con_files
    version = self.config.experiment.model_version
    overwrite = self.config.restore_meshes

    def versioned(template):
        # Path of the version specific file inside the source folder
        return os.path.join(source_dir, template.format(version))

    # The mesh file is copied once; the other two expected names are symlinks to that copy
    self._copy_file(versioned('mesh_mask_nemo.{0}.nc'), 'mesh_hgr.nc', overwrite)
    for alias in ('mesh_zgr.nc', 'mask.nc'):
        self._link_file('mesh_hgr.nc', alias)

    mask_files = (
        ('new_maskglo.{0}.nc', 'new_maskglo.nc'),
        ('mask.regions.{0}.nc', 'mask_regions.nc'),
        ('mask.regions.3d.{0}.nc', 'mask_regions.3d.nc'),
    )
    for template, target in mask_files:
        self._copy_file(versioned(template), target, overwrite)
def _copy_file(self, source, destiny, force):
    """
    Copy a file, skipping the copy when an up-to-date destination is already in place

    :param source: path of the file to copy
    :type source: str
    :param destiny: path of the copy to create
    :type destiny: str
    :param force: if True, copy even when a destination of the same size already exists
    :type force: bool
    """
    # Only warn when the source really is missing, and stop there: os.stat(source) and
    # shutil.copy(source, ...) below would raise on a path that does not exist
    if not os.path.exists(source):
        Log.user_warning('File {0} is not available for {1}', destiny, self.config.experiment.model_version)
        return

    # An existing destination with the same size as the source is taken as already copied
    if not force and os.path.exists(destiny):
        if os.stat(source).st_size == os.stat(destiny).st_size:
            Log.info('File {0} already exists', destiny)
            return

    Log.info('Creating file {0}', destiny)
    shutil.copy(source, destiny)
    Log.info('File {0} ready', destiny)
    # NOTE(review): the rename always targets 'mesh_hgr.nc', whatever `destiny` is; confirm this is intended
    Utils.rename_variables('mesh_hgr.nc', self.dic_variables, False, True)
def _link_file(self, source, destiny):
    """
    Create a symbolic link called destiny that points to source, replacing any previous destiny

    :param source: path the link will point to
    :type source: str
    :param destiny: path of the link to create
    :type destiny: str
    """
    if not os.path.exists(source):
        Log.user_warning('File {0} is not available for {1}', destiny, self.config.experiment.model_version)
        # Stop here: linking to a missing source would leave a dangling symlink and log it as ready
        return

    # lexists also detects a broken symlink left at destiny, which exists() would report as missing
    if os.path.lexists(destiny):
        os.remove(destiny)
    os.symlink(source, destiny)
    Log.info('File {0} ready', destiny)
Javier Vegas-Regidor
committed
Javier Vegas-Regidor
committed
EarthDiags.parse_args()
if __name__ == "__main__":