#!/usr/bin/env python
# coding=utf-8
import Queue
import argparse
import shutil
import threading
import pkg_resources

import netCDF4
import operator
import os
import sys
from autosubmit.date.chunk_date_lib import *
from config import Config
from datamanager import DataManager
from earthdiagnostics import cdftools
from earthdiagnostics.utils import TempFile
from earthdiagnostics.diagnostic import Diagnostic
from earthdiagnostics.ocean import *
from earthdiagnostics.general import *
from ocean import ConvectionSites, Gyres, Psi, MaxMoc, AreaMoc, Moc, VerticalMean, VerticalMeanMeters, Interpolate, \
    AverageSection, CutSection, MixedLayerSaltContent, Siasiesiv
from utils import Utils


class EarthDiags(object):
    """
    Launcher class for the diagnostics

    :param config_file: path to the configuration file
    :type config_file: str
    """
    # Get the version number from the VERSION file located next to this module or, failing that, one folder above.
    # If the file can not be found, ask the installed earthdiagnostics package for its version.
    scriptdir = os.path.abspath(os.path.dirname(__file__))
    if not os.path.exists(os.path.join(scriptdir, 'VERSION')):
        scriptdir = os.path.join(scriptdir, os.path.pardir)

    version_path = os.path.join(scriptdir, 'VERSION')
    readme_path = os.path.join(scriptdir, 'README')
    changes_path = os.path.join(scriptdir, 'CHANGELOG')
    documentation_path = os.path.join(scriptdir, 'EarthDiagnostics.pdf')
    if os.path.isfile(version_path):
        with open(version_path) as f:
            autosubmit_version = f.read().strip()
    else:
        autosubmit_version = pkg_resources.require("earthdiagnostics")[0].version

    def __init__(self, config_file):
        Log.debug('Initialising Diags')
        self.config = Config(config_file)

        # Share the configured locations with the helper modules
        TempFile.scratch_folder = self.config.scratch_dir
        cdftools.path = self.config.cdftools_path
        self._create_dic_variables()
        # Maps thread number -> {diagnostic class: accumulated compute time}. Filled in by run() and _run_jobs()
        self.time = dict()
        Log.debug('Diags ready')
        Log.info('Running diags for experiment {0}, startdates {1}, members {2}', self.config.experiment.expid,
                 self.config.experiment.startdates, self.config.experiment.members)

    @staticmethod
    def parse_args():
        """
        Entry point for the Earth Diagnostics. For more detailed documentation, use -h option

        Parses the command line, configures the logging levels and then opens the documentation, cleans the
        scratch folder or runs the diagnostics, depending on the options given.

        :return: True if the requested action finished, False if the configuration file was not found or an
                 unhandled exception was raised
        :rtype: bool
        """
        try:
            log_levels = ('EVERYTHING', 'DEBUG', 'INFO', 'RESULT', 'USER_WARNING', 'WARNING', 'ERROR', 'CRITICAL',
                          'NO_LOG')
            parser = argparse.ArgumentParser(description='Main executable for Earth Diagnostics.')
            parser.add_argument('-v', '--version', action='version', version='3.0.0b8',
                                help="returns Earth Diagnostics's version number and exit")
            parser.add_argument('--doc', action='store_true',
                                help="opens documentation and exits")
            parser.add_argument('--clean', action='store_true',
                                help="clean the scratch folder and exits")
            parser.add_argument('-lf', '--logfile', choices=log_levels, default='DEBUG', type=str,
                                help="sets file's log level.")
            parser.add_argument('-lc', '--logconsole', choices=log_levels, default='INFO', type=str,
                                help="sets console's log level")
            parser.add_argument('-log', '--logfilepath', default=None, type=str)
            parser.add_argument('-f', '--configfile', default='diags.conf', type=str)

            args = parser.parse_args()
            if args.doc:
                Log.info('Opening documentation...')
                doc_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'EarthDiagnostics.pdf')
                Utils.execute_shell_command(('xdg-open', doc_path))
                Log.result('Documentation opened!')
                return True

            Log.set_console_level(args.logconsole)
            Log.set_file_level(args.logfile)
            if args.logfilepath:
                Log.set_file(Utils.expand_path(args.logfilepath))

            config_file_path = Utils.expand_path(args.configfile)
            if not os.path.isfile(config_file_path):
                Log.critical('Configuration file {0} can not be found', config_file_path)
                return False

            diags = EarthDiags(config_file_path)
            if args.clean:
                diags.clean()
            else:
                diags.run()
            TempFile.clean()
            return True
        except Exception as e:
            # Top-level boundary: report the failure with its traceback instead of letting it propagate
            from traceback import format_exc
            Log.critical('Unhandled exception on EarthDiagnostics: {0}\n{1}', e, format_exc(10))
            return False

    def _create_dic_variables(self):
        """
        Create the dictionary of variable names that is passed to Utils.rename_variables after copying a file
        """
        self.dic_variables = dict()
        self.dic_variables['x'] = 'i'
        self.dic_variables['y'] = 'j'
        self.dic_variables['z'] = 'lev'
        self.dic_variables['nav_lon'] = 'lon'
        self.dic_variables['nav_lat'] = 'lat'
        self.dic_variables['nav_lev'] = 'lev'
        self.dic_variables['time_counter'] = 'time'
        self.dic_variables['t'] = 'time'

    def run(self):
        """
        Run the diagnostics
        """
        Log.debug('Using netCDF version {0}', netCDF4.getlibversion())
        if not os.path.exists(self.config.scratch_dir):
            os.makedirs(self.config.scratch_dir)
        os.chdir(self.config.scratch_dir)

        self._prepare_mesh_files()
        self._register_diagnostics()

        # NOTE(review): the result is discarded; presumably this call is made so that date parsing is used once
        # before the worker threads are started - confirm before removing it
        parse_date('20000101')

        self.data_manager = DataManager(self.config)
        self.data_manager.prepare_CMOR_files()

        # Run diagnostics
        Log.info('Running diagnostics')
        list_jobs = self.prepare_job_list()

        start_time = datetime.datetime.now()
        Log.info("Starting to compute at {0}", start_time)

        self.threads = min(Utils.available_cpu_count(), self.config.max_cores)
        Log.info('Using {0} threads', self.threads)

        threads = list()
        for num_thread in range(0, self.threads):
            # Every thread gets its own timing dictionary
            self.time[num_thread] = dict()
            t = threading.Thread(target=EarthDiags._run_jobs, args=(self, list_jobs, num_thread))
            threads.append(t)
            t.start()

        for t in threads:
            t.join()

        TempFile.clean()
        finish_time = datetime.datetime.now()
        Log.result("Diagnostics finished at {0}", finish_time)
        Log.result("Time ellapsed: {0}\n", finish_time - start_time)
        self.print_stats()

    def print_stats(self):
        """
        Log the time consumed by each diagnostic class, added over all the threads and sorted in ascending order
        """
        Log.info('Time consumed by each diagnostic class')
        Log.info('--------------------------------------')

        total = dict()
        for num_thread in range(0, self.threads):
            for diag, elapsed in self.time[num_thread].items():
                if diag in total:
                    total[diag] += elapsed
                else:
                    total[diag] = elapsed
        for diag, elapsed in sorted(total.items(), key=operator.itemgetter(1)):
            Log.info('{0:23} {1:}', diag.__name__, elapsed)

    def prepare_job_list(self):
        """
        Create the queue of jobs to run from the commands returned by the configuration

        Each command is split by commas and its first field is used to look for the diagnostic class. Commands
        with no matching diagnostic are reported as errors and skipped.

        :return: queue containing all the generated jobs
        :rtype: Queue.Queue
        """
        list_jobs = Queue.Queue()
        for fulldiag in self.config.get_commands():
            Log.info("Adding {0} to diagnostic list", fulldiag)
            diag_options = fulldiag.split(',')

            diag_class = Diagnostic.get_diagnostic(diag_options[0])
            if not diag_class:
                Log.error('{0} is not an available diagnostic', diag_options[0])
                continue

            for job in diag_class.generate_jobs(self, diag_options):
                list_jobs.put(job)
        return list_jobs

    @staticmethod
    def _register_diagnostics():
        """
        Register all the diagnostic classes, so they can be retrieved with Diagnostic.get_diagnostic
        """
        Diagnostic.register(MixedLayerSaltContent)
        Diagnostic.register(Siasiesiv)
        Diagnostic.register(VerticalMean)
        Diagnostic.register(VerticalMeanMeters)
        Diagnostic.register(Interpolate)
        Diagnostic.register(Moc)
        Diagnostic.register(AreaMoc)
        Diagnostic.register(MaxMoc)
        Diagnostic.register(Psi)
        Diagnostic.register(Gyres)
        Diagnostic.register(ConvectionSites)
        Diagnostic.register(CutSection)
        Diagnostic.register(AverageSection)
        Diagnostic.register(MixedLayerHeatContent)
        Diagnostic.register(HeatContentLayer)
        Diagnostic.register(HeatContent)
        Diagnostic.register(MonthlyMean)
        Diagnostic.register(Rewrite)
        Diagnostic.register(Relink)

    def clean(self):
        """
        Remove the scratch folder, if it exists
        """
        Log.info('Removing scratch folder...')
        if os.path.exists(self.config.scratch_dir):
            shutil.rmtree(self.config.scratch_dir)
        Log.result('Scratch folder removed')

    def _run_jobs(self, queue, numthread):
        """
        Thread body: take jobs from the queue and compute them until the queue is empty

        :param queue: queue of jobs shared by all the threads
        :type queue: Queue.Queue
        :param numthread: number of this thread, used as key in self.time and in the log messages
        :type numthread: int
        """
        def _run_job(current_job, retrials=1):
            # Computes the job, trying again up to 'retrials' times if it raises. Returns True on success
            while retrials >= 0:
                try:
                    Log.info('Starting {0}', current_job)
                    start = datetime.datetime.now()
                    current_job.compute()
                    elapsed = datetime.datetime.now() - start

                    # Accumulate the time per diagnostic class for print_stats
                    job_type = type(current_job)
                    thread_times = self.time[numthread]
                    if job_type in thread_times:
                        thread_times[job_type] += elapsed
                    else:
                        thread_times[job_type] = elapsed
                    Log.result('Finished {0}', current_job)
                    return True
                except Exception as ex:
                    retrials -= 1
                    # Report the job being run, not whatever the enclosing scope calls 'job'
                    Log.error('Job {0} failed: {1}', current_job, ex)
            return False

        count = 0
        failed_jobs = list()

        while not queue.empty():
            try:
                job = queue.get(timeout=1)
            except Queue.Empty:
                # Another thread took the last job between the empty() check and the get()
                continue
            if _run_job(job):
                count += 1
            else:
                failed_jobs.append(str(job))
            queue.task_done()

        if not failed_jobs:
            Log.result('Thread {0} finished after taking care of {1} tasks', numthread, count)
        else:
            Log.result('Thread {0} finished after running successfully {1} of {2} tasks', numthread, count,
                       count + len(failed_jobs))
            for failed in failed_jobs:
                Log.error('Job {0} could not be run', failed)
        return

    def _prepare_mesh_files(self):
        """
        Copy the mesh and mask files for the experiment's model version to the current working directory

        mesh_zgr.nc and mask.nc are created as symbolic links to mesh_hgr.nc
        """
        Log.info('Copying mesh files')
        con_files = self.config.con_files
        model_version = self.config.experiment.model_version
        restore_meshes = self.config.restore_meshes

        self._copy_file(os.path.join(con_files, 'mesh_mask_nemo.{0}.nc'.format(model_version)), 'mesh_hgr.nc',
                        restore_meshes)
        self._link_file('mesh_hgr.nc', 'mesh_zgr.nc')
        self._link_file('mesh_hgr.nc', 'mask.nc')

        self._copy_file(os.path.join(con_files, 'new_maskglo.{0}.nc'.format(model_version)), 'new_maskglo.nc',
                        restore_meshes)
        self._copy_file(os.path.join(con_files, 'mask.regions.{0}.nc'.format(model_version)),
                        'mask_regions.nc', restore_meshes)
        self._copy_file(os.path.join(con_files, 'mask.regions.3d.{0}.nc'.format(model_version)),
                        'mask_regions.3d.nc', restore_meshes)
        Log.result('Mesh files ready!')

    def _copy_file(self, source, destiny, force):
        """
        Copy a file and rename the variables of the copy using self.dic_variables

        Nothing is done if the source does not exist (a warning is logged) or if force is False and the
        destiny already exists with the same size as the source.

        :param source: path to the file to copy
        :type source: str
        :param destiny: path of the copy
        :type destiny: str
        :param force: if True, copy the file even if the destiny already exists
        :type force: bool
        """
        if not os.path.exists(source):
            Log.user_warning('File {0} is not available for {1}', destiny, self.config.experiment.model_version)
            return

        if not force and os.path.exists(destiny):
            if os.stat(source).st_size == os.stat(destiny).st_size:
                Log.info('File {0} already exists', destiny)
                return

        Log.info('Creating file {0}', destiny)
        shutil.copy(source, destiny)
        Log.info('File {0} ready', destiny)
        # Rename the variables of the file that has just been copied. This used to target 'mesh_hgr.nc'
        # whatever the destiny was, so the mask files were never renamed
        Utils.rename_variables(destiny, self.dic_variables, False, True)

    def _link_file(self, source, destiny):
        """
        Create a symbolic link to a file, replacing the destiny if it already exists

        Nothing is done if the source does not exist (a warning is logged).

        :param source: path the link will point to
        :type source: str
        :param destiny: path of the link
        :type destiny: str
        """
        if not os.path.exists(source):
            Log.user_warning('File {0} is not available for {1}', destiny, self.config.experiment.model_version)
            return

        # lexists is also True for broken links, which have to be removed too before creating the new one
        if os.path.lexists(destiny):
            os.remove(destiny)

        os.symlink(source, destiny)
        Log.info('File {0} ready', destiny)


def main():
    """
    Run the Earth Diagnostics from the command line

    Exits with status 1 if EarthDiags.parse_args reports a failure. Returns None otherwise, so wrappers that
    call sys.exit(main()) still finish with status 0.
    """
    if not EarthDiags.parse_args():
        sys.exit(1)


if __name__ == "__main__":
    main()