#!/usr/bin/env python
# coding=utf-8
import Queue
import argparse
import shutil
import threading
import pkg_resources
import netCDF4
import operator
import os
import tempfile

from bscearth.utils.date import *
from bscearth.utils.log import Log  # explicit import: Log is used throughout this module
import bscearth.utils.path

from earthdiagnostics.constants import Basins
from earthdiagnostics.config import Config
from earthdiagnostics.cmormanager import CMORManager
from earthdiagnostics.threddsmanager import THREDDSManager
from earthdiagnostics import cdftools
from earthdiagnostics.utils import TempFile, Utils
from earthdiagnostics.diagnostic import Diagnostic, DiagnosticOptionError
from earthdiagnostics.ocean import *
from earthdiagnostics.general import *
from earthdiagnostics.statistics import *
from earthdiagnostics.variable import VariableManager


class EarthDiags(object):
    """
    Launcher class for the diagnostics

    :param config_file: path to the configuration file
    :type config_file: str
    """
    # Get the version number from the VERSION file. If it is not present, fall back to the
    # installed earthdiagnostics package metadata
    scriptdir = os.path.abspath(os.path.dirname(__file__))
    if not os.path.exists(os.path.join(scriptdir, 'VERSION')):
        scriptdir = os.path.join(scriptdir, os.path.pardir)

    version_path = os.path.join(scriptdir, 'VERSION')
    readme_path = os.path.join(scriptdir, 'README')
    changes_path = os.path.join(scriptdir, 'CHANGELOG')
    documentation_path = os.path.join(scriptdir, 'EarthDiagnostics.pdf')
    if os.path.isfile(version_path):
        with open(version_path) as f:
            version = f.read().strip()
    else:
        version = pkg_resources.require("earthdiagnostics")[0].version

    def __init__(self, config_file):
        Log.info('Initialising Earth Diagnostics Version {0}', EarthDiags.version)
        self.config = Config(config_file)

        TempFile.scratch_folder = self.config.scratch_dir
        cdftools.path = self.config.cdftools_path
        self._create_dic_variables()
        self.time = dict()
        self.data_manager = None
        self.threads = None
        self.had_errors = False
        Log.debug('Diags ready')
        Log.info('Running diags for experiment {0}, startdates {1}, members {2}',
                 self.config.experiment.expid, self.config.experiment.startdates,
                 self.config.experiment.members)

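    # Minimal programmatic usage, mirroring what parse_args() does once the command line has
    # been processed (the configuration file name below is illustrative, not a fixed path):
    #
    #     diags = EarthDiags('diags.conf')
    #     success = diags.run()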
    @staticmethod
    def parse_args():
        """
        Entry point for the Earth Diagnostics.

        For more detailed documentation, use the -h option
        """
        parser = argparse.ArgumentParser(description='Main executable for Earth Diagnostics.')
        parser.add_argument('-v', '--version', action='version', version=EarthDiags.version,
                            help="returns Earth Diagnostics's version number and exits")
        parser.add_argument('--doc', action='store_true',
                            help="opens the documentation and exits")
        parser.add_argument('--clean', action='store_true',
                            help="cleans the scratch folder and exits")
        parser.add_argument('--report', action='store_true',
                            help="generates a report about the available files")
        parser.add_argument('-lf', '--logfile', choices=('EVERYTHING', 'DEBUG', 'INFO', 'RESULT', 'USER_WARNING',
                                                         'WARNING', 'ERROR', 'CRITICAL', 'NO_LOG'),
                            default='DEBUG', type=str,
                            help="sets the file's log level")
        parser.add_argument('-lc', '--logconsole', choices=('EVERYTHING', 'DEBUG', 'INFO', 'RESULT', 'USER_WARNING',
                                                            'WARNING', 'ERROR', 'CRITICAL', 'NO_LOG'),
                            default='INFO', type=str,
                            help="sets the console's log level")
        parser.add_argument('-log', '--logfilepath', default=None, type=str)
        parser.add_argument('-f', '--configfile', default='diags.conf', type=str)

        args = parser.parse_args()
        if args.doc:
            Log.info('Opening documentation...')
            doc_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'EarthDiagnostics.pdf')
            Utils.execute_shell_command(('xdg-open', doc_path))
            Log.result('Documentation opened!')
            return True

        Log.set_console_level(args.logconsole)
        Log.set_file_level(args.logfile)

        if Log.console_handler.level <= Log.DEBUG:
            Utils.cdo.debug = True
            Utils.nco.debug = False  # This is due to a bug in nco. Must change when it's solved

        if args.logfilepath:
            Log.set_file(bscearth.utils.path.expand_path(args.logfilepath))

        config_file_path = bscearth.utils.path.expand_path(args.configfile)
        if not os.path.isfile(config_file_path):
            Log.critical('Configuration file {0} cannot be found', config_file_path)
            return False

        try:
            diags = EarthDiags(config_file_path)
            if args.clean:
                result = diags.clean()
            elif args.report:
                result = diags.report()
            else:
                result = diags.run()
        finally:
            TempFile.clean()

        return result

    def _create_dic_variables(self):
        self.dic_variables = dict()
        self.dic_variables['x'] = 'i'
        self.dic_variables['y'] = 'j'
        self.dic_variables['z'] = 'lev'
        self.dic_variables['nav_lon'] = 'lon'
        self.dic_variables['nav_lat'] = 'lat'
        self.dic_variables['nav_lev'] = 'lev'
        self.dic_variables['time_counter'] = 'time'
        self.dic_variables['t'] = 'time'

    def run(self):
        """
        Run the diagnostics
        """
        self.had_errors = False
        Log.debug('Using netCDF version {0}', netCDF4.getlibversion())

        self._prepare_scratch_dir()
        self._prepare_mesh_files()
        self._initialize_basins()

        self._register_diagnostics()
        self._prepare_data_manager()

        # Run diagnostics
        Log.info('Running diagnostics')
        list_jobs = self.prepare_job_list()
        self._failed_jobs = []

        time = datetime.datetime.now()
        Log.info("Starting to compute at {0}", time)
        self.threads = Utils.available_cpu_count()
        if 0 < self.config.max_cores < self.threads:
            self.threads = self.config.max_cores
        Log.info('Using {0} threads', self.threads)

        threads = list()
        for num_thread in range(0, self.threads):
            self.time[num_thread] = dict()
            t = threading.Thread(target=EarthDiags._run_jobs, args=(self, list_jobs, num_thread))
            threads.append(t)
            t.start()

        for t in threads:
            t.join()

        TempFile.clean()
        finish_time = datetime.datetime.now()
        Log.result("Diagnostics finished at {0}", finish_time)
        Log.result("Elapsed time: {0}\n", finish_time - time)
        self.print_errors()
        self.print_stats()

        if self.config.auto_clean:
            self._remove_scratch_dir()

        return not self.had_errors

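    # Basin masks are optional: the methods below only read basin definitions from the
    # mask_regions files that are actually present in the scratch folder; missing files
    # are silently skipped.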
    def _initialize_basins(self):
        self._read_basins_from_file('mask_regions.nc')
        self._read_basins_from_file('mask_regions.3d.nc')

    def _read_basins_from_file(self, filename):
        if not os.path.isfile(filename):
            return
        handler = Utils.openCdf(filename)
        Basins().get_available_basins(handler)
        handler.close()

    def _prepare_scratch_dir(self):
        if self.config.use_ramdisk:
            self._remove_scratch_dir()
            # Create the working folder on the RAM disk and expose it through the configured scratch path
            os.symlink(tempfile.mkdtemp(dir='/dev/shm'), self.config.scratch_dir)
        else:
            if not os.path.exists(self.config.scratch_dir):
                os.makedirs(self.config.scratch_dir)
        os.chdir(self.config.scratch_dir)

    def _prepare_data_manager(self):
        if self.config.data_adaptor == 'CMOR':
            self.data_manager = CMORManager(self.config)
        elif self.config.data_adaptor == 'THREDDS':
            self.data_manager = THREDDSManager(self.config)
        self.data_manager.prepare()

    def print_stats(self):
        Log.info('Time consumed by each diagnostic class')
        Log.info('--------------------------------------')
        total = dict()
        for num_thread in range(0, self.threads):
            for key, value in self.time[num_thread].items():
                if key in total:
                    total[key] += value
                else:
                    total[key] = value
        for diag, time in sorted(total.items(), key=operator.itemgetter(1)):
            Log.info('{0:23} {1}', diag.__name__, time)

    def print_errors(self):
        if len(self._failed_jobs) == 0:
            return
        self.had_errors = True
        Log.error('Failed jobs')
        Log.error('-----------')
        for job in self._failed_jobs:
            Log.error(str(job))
        Log.info('')

    def prepare_job_list(self):
        list_jobs = Queue.Queue()
        for fulldiag in self.config.get_commands():
            Log.info("Adding {0} to diagnostic list", fulldiag)
            diag_options = fulldiag.split(',')
            diag_class = Diagnostic.get_diagnostic(diag_options[0])
            if diag_class:
                try:
                    for job in diag_class.generate_jobs(self, diag_options):
                        list_jobs.put(job)
                    continue
                except DiagnosticOptionError as ex:
                    Log.error('Can not configure diagnostic {0}: {1}', diag_options[0], ex)
                    self.had_errors = True
            else:
                Log.error('{0} is not an available diagnostic', diag_options[0])
                self.had_errors = True
        return list_jobs

    @staticmethod
    def _register_diagnostics():
        EarthDiags._register_ocean_diagnostics()
        EarthDiags._register_general_diagnostics()
        EarthDiags._register_stats_diagnostics()

    @staticmethod
    def _register_stats_diagnostics():
        Diagnostic.register(MonthlyPercentile)
        Diagnostic.register(ClimatologicalPercentile)

    @staticmethod
    def _register_general_diagnostics():
        Diagnostic.register(DailyMean)
        Diagnostic.register(MonthlyMean)
        Diagnostic.register(YearlyMean)
        Diagnostic.register(SimplifyDimensions)
        Diagnostic.register(Relink)
        Diagnostic.register(RelinkAll)
        Diagnostic.register(Scale)
        Diagnostic.register(Attribute)
        Diagnostic.register(SelectLevels)
        Diagnostic.register(Module)

    @staticmethod
    def _register_ocean_diagnostics():
        Diagnostic.register(MixedLayerSaltContent)
        Diagnostic.register(Siasiesiv)
        Diagnostic.register(VerticalMean)
        Diagnostic.register(VerticalMeanMeters)
        Diagnostic.register(Interpolate)
        Diagnostic.register(InterpolateCDO)
        Diagnostic.register(Moc)
        Diagnostic.register(AreaMoc)
        Diagnostic.register(MaxMoc)
        Diagnostic.register(Psi)
        Diagnostic.register(Gyres)
        Diagnostic.register(ConvectionSites)
        Diagnostic.register(CutSection)
        Diagnostic.register(AverageSection)
        Diagnostic.register(MixedLayerHeatContent)
        Diagnostic.register(HeatContentLayer)
        Diagnostic.register(HeatContent)
        Diagnostic.register(RegionMean)
        Diagnostic.register(Rotation)
        Diagnostic.register(Mxl)
        Diagnostic.register(VerticalGradient)
        Diagnostic.register(MaskLand)

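    # Each class registered above becomes selectable from the configuration file's list of
    # diagnostic commands: prepare_job_list() resolves the first comma-separated token of every
    # command through Diagnostic.get_diagnostic() and lets the class generate its own jobs from
    # the remaining options.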
    def clean(self):
        Log.info('Removing scratch folder...')
        self._remove_scratch_dir()
        Log.result('Scratch folder removed')
        return True

    def _remove_scratch_dir(self):
        if os.path.islink(self.config.scratch_dir):
            shutil.rmtree(os.path.realpath(self.config.scratch_dir))
            os.remove(self.config.scratch_dir)
        elif os.path.isdir(self.config.scratch_dir):
            shutil.rmtree(self.config.scratch_dir)

    def report(self):
        Log.info('Looking for existing vars...')
        self._prepare_data_manager()
        for startdate in self.config.experiment.startdates:
            for member in self.config.experiment.members:
                results = self._get_variable_report(startdate, member)
                report_path = os.path.join(self.config.scratch_dir,
                                           '{0}_{1}.report'.format(startdate,
                                                                   self.config.experiment.get_member_str(member)))
                Utils.create_folder_tree(self.config.scratch_dir)
                self.create_report(report_path, results)
        Log.result('Report finished')
        return True

    def _get_variable_report(self, startdate, member):
        var_manager = VariableManager()
        results = list()
        for var in var_manager.get_all_variables():
            if var.priority is None or var.domain is None:
                continue
            if var.priority > 3:
                continue
            for table in var.tables:
                if not self.data_manager.file_exists(var.domain, var.short_name, startdate, member, 1,
                                                     frequency=table.frequency):
                    results.append((var, table))
                    Log.debug('Variable {0.short_name} not found in {1.name}', var, table)
                else:
                    Log.result('Variable {0.short_name} found in {1.name}', var, table)
        return results

    def create_report(self, report_path, results):
        tables = set([result[1].name for result in results])
        for table in tables:
            file_handler = open('{0}.{1}'.format(report_path, table), 'w')
            table_results = [result for result in results if result[1].name == table]

            file_handler.write('\nTable {0}\n'.format(table))
            file_handler.write('===================================\n')

            priorities = sorted(set([int(result[0].priority) for result in table_results]))
            for priority in priorities:
                priority_results = [result for result in table_results if int(result[0].priority) == priority]
                priority_results = sorted(priority_results, key=lambda result: result[0].short_name)
                file_handler.write('\nMissing variables with priority {0}:\n'.format(priority))
                file_handler.write('--------------------------------------\n')
                for var, _ in priority_results:
                    file_handler.write('{0:12}: {1}\n'.format(var.short_name, var.standard_name))

            file_handler.flush()
            file_handler.close()

    def _run_jobs(self, queue, numthread):
        def _run_job(current_job, retrials=1):
            while retrials >= 0:
                try:
                    Log.info('Starting {0}', current_job)
                    time = datetime.datetime.now()
                    current_job.compute()
                    time = datetime.datetime.now() - time
                    if type(current_job) in self.time[numthread]:
                        self.time[numthread][type(current_job)] += time
                    else:
                        self.time[numthread][type(current_job)] = time
                    Log.result('Finished {0}', current_job)
                    return True
                except Exception as ex:
                    retrials -= 1
                    Log.error('Job {0} failed: {1}', current_job, ex)
            return False

        count = 0
        failed_jobs = list()

        while not queue.empty():
            try:
                job = queue.get(timeout=1)
                if _run_job(job):
                    count += 1
                else:
                    failed_jobs.append(str(job))
                queue.task_done()
            except Queue.Empty:
                continue

        if len(failed_jobs) == 0:
            Log.result('Thread {0} finished after taking care of {1} tasks', numthread, count)
        else:
            Log.result('Thread {0} finished after running successfully {1} of {2} tasks', numthread, count,
                       count + len(failed_jobs))
        self._failed_jobs += failed_jobs
        return

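    # The NEMO mesh and mask files are staged under fixed names (mesh_hgr.nc, mesh_zgr.nc,
    # mask.nc, new_maskglo.nc, mask_regions.nc and mask_regions.3d.nc) so the diagnostics can
    # open them without knowing the model version. When scratch_masks is configured they are
    # copied there once and symlinked into the working folder; otherwise they are copied directly.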
    def _prepare_mesh_files(self):
        Log.info('Copying mesh files')
        con_files = self.config.con_files
        model_version = self.config.experiment.model_version
        restore_meshes = self.config.restore_meshes

        mesh_mask = 'mesh_mask_nemo.{0}.nc'.format(model_version)
        new_mask_glo = 'new_maskglo.{0}.nc'.format(model_version)
        mask_regions = 'mask.regions.{0}.nc'.format(model_version)
        mask_regions_3d = 'mask.regions.3d.{0}.nc'.format(model_version)

        if self.config.mesh_mask:
            mesh_mask_path = self.config.mesh_mask
        else:
            mesh_mask_path = os.path.join(con_files, mesh_mask)

        if self.config.new_mask_glo:
            new_mask_glo_path = self.config.new_mask_glo
        else:
            new_mask_glo_path = os.path.join(con_files, new_mask_glo)

        if self.config.mask_regions:
            mask_regions_path = self.config.mask_regions
        else:
            mask_regions_path = os.path.join(con_files, mask_regions)

        if self.config.mask_regions_3d:
            mask_regions_3d_path = self.config.mask_regions_3d
        else:
            mask_regions_3d_path = os.path.join(con_files, mask_regions_3d)

        if self.config.scratch_masks:
            Utils.create_folder_tree(self.config.scratch_masks)
            Utils.give_group_write_permissions(self.config.scratch_masks)

            mesh_mask_scratch_path = os.path.join(self.config.scratch_masks, mesh_mask)
            if self._copy_file(mesh_mask_path, mesh_mask_scratch_path, restore_meshes):
                Utils.give_group_write_permissions(mesh_mask_scratch_path)
            self._link_file(mesh_mask_scratch_path, 'mesh_hgr.nc')
            self._link_file(mesh_mask_scratch_path, 'mesh_zgr.nc')
            self._link_file(mesh_mask_scratch_path, 'mask.nc')

            new_maskglo_scratch_path = os.path.join(self.config.scratch_masks, new_mask_glo)
            if self._copy_file(new_mask_glo_path, new_maskglo_scratch_path, restore_meshes):
                Utils.give_group_write_permissions(new_maskglo_scratch_path)
            self._link_file(new_maskglo_scratch_path, 'new_maskglo.nc')

            mask_regions_scratch_path = os.path.join(self.config.scratch_masks, mask_regions)
            if self._copy_file(mask_regions_path, mask_regions_scratch_path, restore_meshes):
                Utils.give_group_write_permissions(mask_regions_scratch_path)
            self._link_file(mask_regions_scratch_path, 'mask_regions.nc')

            mask_regions3d_scratch_path = os.path.join(self.config.scratch_masks, mask_regions_3d)
            if self._copy_file(mask_regions_3d_path, mask_regions3d_scratch_path, restore_meshes):
                Utils.give_group_write_permissions(mask_regions3d_scratch_path)
            self._link_file(mask_regions3d_scratch_path, 'mask_regions.3d.nc')
        else:
            self._copy_file(mesh_mask_path, 'mesh_hgr.nc', restore_meshes)
            self._link_file('mesh_hgr.nc', 'mesh_zgr.nc')
            self._link_file('mesh_hgr.nc', 'mask.nc')
            self._copy_file(new_mask_glo_path, 'new_maskglo.nc', restore_meshes)
            self._copy_file(mask_regions_path, 'mask_regions.nc', restore_meshes)
            self._copy_file(mask_regions_3d_path, 'mask_regions.3d.nc', restore_meshes)

        Log.result('Mesh files ready!')

    def _copy_file(self, source, destiny, force):
        if not os.path.exists(source):
            Log.user_warning('File {0} is not available for {1}', destiny, self.config.experiment.model_version)
            return False

        if not force and os.path.exists(destiny):
            if os.stat(source).st_size == os.stat(destiny).st_size:
                Log.info('File {0} already exists', destiny)
                return True

        Log.info('Creating file {0}', destiny)
        shutil.copy(source, destiny)
        Log.info('File {0} ready', destiny)
        Utils.rename_variables(destiny, self.dic_variables, False, True)
        return True

    def _link_file(self, source, destiny):
        if not os.path.exists(source):
            Log.user_warning('File {0} is not available for {1}', destiny, self.config.experiment.model_version)
            return

        if os.path.lexists(destiny):
            try:
                os.remove(destiny)
            except OSError:
                pass

        os.symlink(source, destiny)
        Log.info('File {0} ready', destiny)

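# Command-line entry point: exits with a non-zero status when parse_args() reports a failure,
# so wrapper scripts and job schedulers can detect unsuccessful runs.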
def main():
    if not EarthDiags.parse_args():
        exit(1)


if __name__ == "__main__":
    main()