Newer
Older
Javier Vegas-Regidor
committed
#!/usr/bin/env python
Javier Vegas-Regidor
committed
import argparse
import shutil
Javier Vegas-Regidor
committed
import threading
Javier Vegas-Regidor
committed
import pkg_resources
import netCDF4
Javier Vegas-Regidor
committed
import operator
import os
from autosubmit.date.chunk_date_lib import *
Javier Vegas-Regidor
committed
from config import Config
from datamanager import DataManager
from earthdiagnostics import cdftools
from earthdiagnostics.utils import TempFile
from earthdiagnostics.diagnostic import Diagnostic
from earthdiagnostics.ocean import *
from earthdiagnostics.general import *
from ocean import ConvectionSites, Gyres, Psi, MaxMoc, AreaMoc, Moc, VerticalMean, VerticalMeanMeters, Interpolate, \
AverageSection, CutSection, MixedLayerSaltContent, Siasiesiv
from utils import Utils
:param config_file: path to the configuration file
:type config_file: str
Javier Vegas-Regidor
committed
# Locate the files that ship with the package (VERSION, README, CHANGELOG and the PDF documentation).
# The VERSION file is searched next to this script first; when it is not there, the parent folder is
# used as the base for every path. If no VERSION file can be found, the version number is obtained
# from the installed earthdiagnostics distribution instead.
scriptdir = os.path.abspath(os.path.dirname(__file__))
if not os.path.exists(os.path.join(scriptdir, 'VERSION')):
scriptdir = os.path.join(scriptdir, os.path.pardir)
version_path = os.path.join(scriptdir, 'VERSION')
readme_path = os.path.join(scriptdir, 'README')
changes_path = os.path.join(scriptdir, 'CHANGELOG')
documentation_path = os.path.join(scriptdir, 'EarthDiagnostics.pdf')
Javier Vegas-Regidor
committed
# Read the version string from the VERSION file when it exists (surrounding whitespace removed).
# NOTE(review): despite its name, autosubmit_version holds the Earth Diagnostics version.
if os.path.isfile(version_path):
with open(version_path) as f:
autosubmit_version = f.read().strip()
else:
# No VERSION file available: use the version of the installed "earthdiagnostics" distribution
autosubmit_version = pkg_resources.require("earthdiagnostics")[0].version
def __init__(self, config_file):
    """
    Load the configuration and set up the state shared by all the diagnostics

    :param config_file: path to the configuration file
    :type config_file: str
    """
    config = Config(config_file)
    self.config = config

    # Hand the configured locations over to the helper modules
    TempFile.scratch_folder = config.scratch_dir
    cdftools.path = config.cdftools_path

    # Time spent in the diagnostics, stored per thread number
    self.time = {}

    experiment = config.experiment
    Log.info('Running diags for experiment {0}, startdates {1}, members {2}', experiment.expid,
             experiment.startdates, experiment.members)
Javier Vegas-Regidor
committed
@staticmethod
def parse_args():
    """
    Entry point for the Earth Diagnostics. For more detailed documentation, use -h option

    Parses the command line, sets the log levels and then opens the documentation, cleans the scratch
    folder or runs the diagnostics, depending on the options given.

    :return: True if the requested action finished, False if the configuration file can not be found or
             an unhandled exception was raised
    :rtype: bool
    """
    # The file and console log options accept exactly the same set of levels
    log_levels = ('EVERYTHING', 'DEBUG', 'INFO', 'RESULT', 'USER_WARNING',
                  'WARNING', 'ERROR', 'CRITICAL', 'NO_LOG')
    try:
        parser = argparse.ArgumentParser(description='Main executable for Earth Diagnostics.')
        parser.add_argument('-v', '--version', action='version', version='3.0.0b8',
                            help="returns Earth Diagnostics's version number and exit")
        parser.add_argument('--doc', action='store_true',
                            help="opens documentation and exits")
        parser.add_argument('--clean', action='store_true',
                            help="clean the scratch folder and exits")
        parser.add_argument('-lf', '--logfile', choices=log_levels,
                            default='DEBUG', type=str,
                            help="sets file's log level.")
        parser.add_argument('-lc', '--logconsole', choices=log_levels,
                            default='INFO', type=str,
                            help="sets console's log level")
        parser.add_argument('-log', '--logfilepath', default=None, type=str)
        parser.add_argument('-f', '--configfile', default='diags.conf', type=str)
        args = parser.parse_args()

        if args.doc:
            Log.info('Opening documentation...')
            doc_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'EarthDiagnostics.pdf')
            Utils.execute_shell_command(('xdg-open', doc_path))
            Log.result('Documentation opened!')
            return True

        Log.set_console_level(args.logconsole)
        Log.set_file_level(args.logfile)
        if args.logfilepath:
            Log.set_file(Utils.expand_path(args.logfilepath))

        config_file_path = Utils.expand_path(args.configfile)
        if not os.path.isfile(config_file_path):
            Log.critical('Configuration file {0} can not be found', config_file_path)
            return False

        diags = EarthDiags(config_file_path)
        if args.clean:
            diags.clean()
        else:
            diags.run()
        TempFile.clean()
        # Report success explicitly so that every path returns a boolean
        return True
    except Exception as e:
        # Last-resort handler: log the error with a short traceback instead of crashing
        from traceback import format_exc
        Log.critical('Unhandled exception on EarthDiagnostics: {0}\n{1}', e, format_exc(10))
        return False
def _create_dic_variables(self):
    """
    Create self.dic_variables, a dictionary that maps variable names to their replacement names
    """
    self.dic_variables = {
        'x': 'i',
        'y': 'j',
        'z': 'lev',
        'nav_lon': 'lon',
        'nav_lat': 'lat',
        'nav_lev': 'lev',
        'time_counter': 'time',
        't': 'time',
    }
Log.debug('Using netCDF version {0}', netCDF4.getlibversion())
Javier Vegas-Regidor
committed
if not os.path.exists(self.config.scratch_dir):
os.makedirs(self.config.scratch_dir)
os.chdir(self.config.scratch_dir)
Javier Vegas-Regidor
committed
self.data_manager = DataManager(self.config)
self.data_manager.prepare_CMOR_files()
# Run diagnostics
Log.info('Running diagnostics')
time = datetime.datetime.now()
Log.info("Starting to compute at {0}", time)
Javier Vegas-Regidor
committed
num_threads = min(Utils.available_cpu_count(), self.config.max_cores)
self.threads = num_threads
Log.info('Using {0} threads', self.threads)
Javier Vegas-Regidor
committed
threads = list()
for num_thread in range(0, self.threads):
Javier Vegas-Regidor
committed
self.time[num_thread] = dict()
t = threading.Thread(target=EarthDiags._run_jobs, args=(self, list_jobs, num_thread))
Javier Vegas-Regidor
committed
threads.append(t)
t.start()
Javier Vegas-Regidor
committed
TempFile.clean()
finsih_time = datetime.datetime.now()
Log.result("Diagnostics finished at {0}", finsih_time)
Javier Vegas-Regidor
committed
Log.result("Time ellapsed: {0}\n", finsih_time - time)
self.print_stats()
def print_stats(self):
    """
    Log the time consumed by each diagnostic class, adding up the values recorded by every thread

    The classes are listed in ascending order of accumulated time
    """
    Log.info('Time consumed by each diagnostic class')
    Log.info('--------------------------------------')

    # Merge the per-thread records into a single total for each class
    accumulated = {}
    for thread_id in range(0, self.threads):
        for diag_class, spent in self.time[thread_id].items():
            try:
                accumulated[diag_class] += spent
            except KeyError:
                accumulated[diag_class] = spent

    ranking = sorted(accumulated.items(), key=operator.itemgetter(1))
    for diag_class, spent in ranking:
        Log.info('{0:23} {1:}', diag_class.__name__, spent)
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
def prepare_job_list(self):
    """
    Create the queue with the jobs of every diagnostic requested in the configuration

    Commands whose diagnostic is not registered are reported as errors and skipped

    :return: queue holding all the generated jobs
    """
    pending = Queue.Queue()
    for command in self.config.get_commands():
        Log.info("Adding {0} to diagnostic list", command)
        # The first comma-separated field selects the diagnostic
        options = command.split(',')
        diagnostic = Diagnostic.get_diagnostic(options[0])
        if not diagnostic:
            Log.error('{0} is not an available diagnostic', options[0])
            continue
        for job in diagnostic.generate_jobs(self, options):
            pending.put(job)
    return pending
def _register_diagnostics(self):
    """
    Register all the diagnostic classes known to this script in Diagnostic
    """
    # Kept in the original registration order
    available = (
        MixedLayerSaltContent,
        Siasiesiv,
        VerticalMean,
        VerticalMeanMeters,
        Interpolate,
        Moc,
        AreaMoc,
        MaxMoc,
        Psi,
        Gyres,
        ConvectionSites,
        CutSection,
        AverageSection,
        MixedLayerHeatContent,
        HeatContentLayer,
        HeatContent,
        MonthlyMean,
        Rewrite,
        Relink,
    )
    for diagnostic_class in available:
        Diagnostic.register(diagnostic_class)
def clean(self):
    """
    Delete the scratch folder given in the configuration, if it exists
    """
    Log.info('Removing scratch folder...')
    scratch = self.config.scratch_dir
    if os.path.exists(scratch):
        shutil.rmtree(self.config.scratch_dir)
    Log.result('Scratch folder removed')
Javier Vegas-Regidor
committed
def _run_jobs(self, queue, numthread):
def _run_job(current_job, retrials=1):
Javier Vegas-Regidor
committed
while retrials >= 0:
Log.info('Starting {0}', current_job)
Javier Vegas-Regidor
committed
time = datetime.datetime.now()
Javier Vegas-Regidor
committed
time = datetime.datetime.now() - time
if type(current_job) in self.time[numthread]:
self.time[numthread][type(current_job)] += time
else:
self.time[numthread][type(current_job)] = time
return True
except Exception as ex:
retrials -= 1
while not queue.empty():
try:
job = queue.get(timeout=1)
if _run_job(job):
count += 1
queue.task_done()
except Queue.Empty:
continue
if len(failed_jobs) == 0:
Log.result('Thread {0} finished after taking care of {1} tasks', numthread, count)
else:
Log.result('Thread {0} finished after running successfully {1} of {2} tasks', numthread, count,
count + len(failed_jobs))
for job in failed_jobs:
Log.error('Job {0} could not be run', job)
Javier Vegas-Regidor
committed
def _prepare_mesh_files(self):
    """
    Copy the mesh and mask files of the configured model version into the current folder

    mesh_zgr.nc and mask.nc are created as links to mesh_hgr.nc
    """
    Log.info('Copying mesh files')
    con_files = self.config.con_files
    model_version = self.config.experiment.model_version
    restore_meshes = self.config.restore_meshes

    mesh_source = os.path.join(con_files, 'mesh_mask_nemo.{0}.nc'.format(model_version))
    self._copy_file(mesh_source, 'mesh_hgr.nc', restore_meshes)
    for link_name in ('mesh_zgr.nc', 'mask.nc'):
        self._link_file('mesh_hgr.nc', link_name)

    # (source name template, local file name) for the remaining mask files
    mask_files = (
        ('new_maskglo.{0}.nc', 'new_maskglo.nc'),
        ('mask.regions.{0}.nc', 'mask_regions.nc'),
        ('mask.regions.3d.{0}.nc', 'mask_regions.3d.nc'),
    )
    for template, local_name in mask_files:
        self._copy_file(os.path.join(con_files, template.format(model_version)), local_name, restore_meshes)
def _copy_file(self, source, destiny, force):
    """
    Copy a file, skipping the copy when the destination already exists with the same size

    :param source: path of the file to copy
    :param destiny: path where the file must be placed
    :param force: if True, the file is copied even when the destination already exists
    """
    # Only warn when the source is really missing, and stop before os.stat/shutil.copy fail on it
    if not os.path.exists(source):
        Log.user_warning('File {0} is not available for {1}', destiny, self.config.experiment.model_version)
        return

    # An existing destination of the same size is taken as an up-to-date copy
    if not force and os.path.exists(destiny):
        if os.stat(source).st_size == os.stat(destiny).st_size:
            Log.info('File {0} already exists', destiny)
            return

    Log.info('Creating file {0}', destiny)
    shutil.copy(source, destiny)
    Log.info('File {0} ready', destiny)
    # NOTE(review): the renaming always targets 'mesh_hgr.nc', whichever file was copied - confirm intended
    Utils.rename_variables('mesh_hgr.nc', self.dic_variables, False, True)
def _link_file(self, source, destiny):
    """
    Create a symbolic link to a file, replacing whatever already exists at the link path

    :param source: path the link must point to
    :param destiny: path of the link to create
    """
    if not os.path.exists(source):
        Log.user_warning('File {0} is not available for {1}', destiny, self.config.experiment.model_version)
        # Nothing to link to: leave any existing destination alone instead of creating a dangling link
        return

    # lexists also detects broken links, which must be removed before os.symlink can succeed
    if os.path.lexists(destiny):
        os.remove(destiny)
    os.symlink(source, destiny)
    Log.info('File {0} ready', destiny)
Javier Vegas-Regidor
committed
Javier Vegas-Regidor
committed
# Run the diagnostics only when the file is executed as a script, not when it is imported
if __name__ == "__main__":
    EarthDiags.parse_args()