#!/usr/bin/env python
# coding=utf-8
import argparse
import shutil
import sys
import pkg_resources

import netCDF4
import os
from bscearth.utils.date import *
import bscearth.utils.path
import tempfile

from earthdiagnostics.constants import Basins
from earthdiagnostics.config import Config
from earthdiagnostics.cmormanager import CMORManager
from earthdiagnostics.threddsmanager import THREDDSManager
from earthdiagnostics.obsreconmanager import ObsReconManager
from earthdiagnostics import cdftools
from earthdiagnostics.utils import TempFile, Utils
from earthdiagnostics.variable import VariableManager
from work_manager import WorkManager


class EarthDiags(object):
    """
    Launcher class for the diagnostics

    :param config_file: path to the configuration file
    :type config_file: str
    """

    # Locate the package metadata files: look next to this script first and
    # fall back to the parent folder when no VERSION file is found there.
    scriptdir = os.path.abspath(os.path.dirname(__file__))
    if not os.path.exists(os.path.join(scriptdir, 'VERSION')):
        scriptdir = os.path.join(scriptdir, os.path.pardir)

    version_path = os.path.join(scriptdir, 'VERSION')
    readme_path = os.path.join(scriptdir, 'README')
    changes_path = os.path.join(scriptdir, 'CHANGELOG')
    documentation_path = os.path.join(scriptdir, 'EarthDiagnostics.pdf')

    # Get the version number from the VERSION file. If it is not available,
    # ask pkg_resources for the installed earthdiagnostics package version.
    if os.path.isfile(version_path):
        with open(version_path) as f:
            version = f.read().strip()
    else:
        version = pkg_resources.require("earthdiagnostics")[0].version

    def __init__(self, config_file):
        Log.info('Initialising Earth Diagnostics Version {0}', EarthDiags.version)
        self.config = Config(config_file)

        # Propagate the configured paths to the helper modules
        TempFile.scratch_folder = self.config.scratch_dir
        cdftools.path = self.config.cdftools_path
        self._create_dic_variables()
        self.time = dict()
        self.data_manager = None
        self.threads = None
        self.had_errors = False
        Log.debug('Diags ready')
        Log.info('Running diags for experiment {0}, startdates {1}, members {2}',
                 self.config.experiment.expid,
                 self.config.experiment.startdates,
                 self.config.experiment.members)

    @staticmethod
    def parse_args():
        """
        Entry point for the Earth Diagnostics. For more detailed documentation, use -h option

        Parses the command line, configures logging and dispatches to the
        requested action (open documentation, clean, report or run).

        :return: True after opening the documentation, False if the
                 configuration file can not be found, otherwise whatever the
                 selected action (clean, report or run) returns
        """
        log_levels = ('EVERYTHING', 'DEBUG', 'INFO', 'RESULT', 'USER_WARNING', 'WARNING',
                      'ERROR', 'CRITICAL', 'NO_LOG')

        parser = argparse.ArgumentParser(description='Main executable for Earth Diagnostics.')
        parser.add_argument('-v', '--version', action='version', version=EarthDiags.version,
                            help="returns Earth Diagnostics's version number and exit")
        parser.add_argument('--doc', action='store_true',
                            help="opens documentation and exits")
        parser.add_argument('--clean', action='store_true',
                            help="clean the scratch folder and exits")
        parser.add_argument('--report', action='store_true',
                            help="generates a report about the available files")
        parser.add_argument('-lf', '--logfile', choices=log_levels, default='DEBUG', type=str,
                            help="sets file's log level.")
        parser.add_argument('-lc', '--logconsole', choices=log_levels, default='INFO', type=str,
                            help="sets console's log level")
        parser.add_argument('-log', '--logfilepath', default=None, type=str)
        parser.add_argument('-f', '--configfile', default='diags.conf', type=str)

        args = parser.parse_args()
        if args.doc:
            Log.info('Opening documentation...')
            doc_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'EarthDiagnostics.pdf')
            Utils.execute_shell_command(('xdg-open', doc_path))
            Log.result('Documentation opened!')
            return True

        Log.set_console_level(args.logconsole)
        Log.set_file_level(args.logfile)

        if Log.console_handler.level <= Log.DEBUG:
            Utils.cdo.debug = True
            Utils.nco.debug = False  # This is due to a bug in nco. Must change when it's solved

        if args.logfilepath:
            Log.set_file(bscearth.utils.path.expand_path(args.logfilepath))

        config_file_path = bscearth.utils.path.expand_path(args.configfile)
        if not os.path.isfile(config_file_path):
            Log.critical('Configuration file {0} can not be found', config_file_path)
            return False

        # Exceptions propagate to the caller; temporary files are cleaned
        # whether the selected action succeeds or not.
        try:
            diags = EarthDiags(config_file_path)
            if args.clean:
                result = diags.clean()
            elif args.report:
                result = diags.report()
            else:
                result = diags.run()
        finally:
            TempFile.clean()

        return result

    def _create_dic_variables(self):
        """
        Build the name mapping passed to Utils.rename_variables when mesh files are copied
        """
        self.dic_variables = {
            'x': 'i',
            'y': 'j',
            'z': 'lev',
            'nav_lon': 'lon',
            'nav_lat': 'lat',
            'nav_lev': 'lev',
            'time_counter': 'time',
            't': 'time',
        }

    def run(self):
        """
        Run the diagnostics

        :return: value returned by WorkManager.run()
        """
        self.had_errors = False
        Log.debug('Using netCDF version {0}', netCDF4.getlibversion())

        self._prepare_scratch_dir()
        self._prepare_mesh_files()
        self._initialize_basins()

        self._prepare_data_manager()

        # Run diagnostics
        Log.info('Running diagnostics')

        work_manager = WorkManager(self.config, self.data_manager)
        work_manager.prepare_job_list()

        result = work_manager.run()

        if self.config.auto_clean:
            self._remove_scratch_dir()
        return result

    def _initialize_basins(self):
        """
        Load the available basins from the region mask files of the current folder
        """
        self._read_basins_from_file('mask_regions.nc')
        self._read_basins_from_file('mask_regions.3d.nc')

    @staticmethod
    def _read_basins_from_file(filename):
        """
        Read the basins defined in a file, doing nothing if the file does not exist

        :param filename: path to the mask file
        :type filename: str
        """
        if not os.path.isfile(filename):
            return
        handler = Utils.openCdf(filename)
        try:
            Basins().get_available_basins(handler)
        finally:
            # Close the handler even if reading the basins fails
            handler.close()

    def _prepare_scratch_dir(self):
        """
        Create the scratch folder and make it the current working directory

        If config.use_ramdisk is set, any previous scratch folder is removed
        and config.scratch_dir becomes a symlink to a new temporary folder
        created under /dev/shm. Otherwise the folder is created if missing.
        """
        if self.config.use_ramdisk:
            self._remove_scratch_dir()
            # Create exactly one temporary folder: mkdtemp never removes what
            # it creates, so an extra unused call would leave a stray folder
            # in /dev/shm on every run.
            ramdisk_dir = tempfile.mkdtemp(dir='/dev/shm')
            os.symlink(ramdisk_dir, self.config.scratch_dir)
        else:
            if not os.path.exists(self.config.scratch_dir):
                os.makedirs(self.config.scratch_dir)
        os.chdir(self.config.scratch_dir)

    def _prepare_data_manager(self):
        """
        Create the data manager selected by config.data_adaptor and prepare it
        """
        # NOTE(review): if data_adaptor matches none of these values,
        # self.data_manager keeps its previous value (None after __init__)
        # and the prepare() call below fails. Presumably Config validates
        # the option beforehand -- confirm.
        if self.config.data_adaptor == 'CMOR':
            self.data_manager = CMORManager(self.config)
        elif self.config.data_adaptor == 'THREDDS':
            self.data_manager = THREDDSManager(self.config)
        elif self.config.data_adaptor == 'OBSRECON':
            self.data_manager = ObsReconManager(self.config)
        self.data_manager.prepare()

    def clean(self):
        """
        Remove the scratch folder

        :return: always True
        :rtype: bool
        """
        Log.info('Removing scratch folder...')
        self._remove_scratch_dir()
        Log.result('Scratch folder removed')
        return True

    def _remove_scratch_dir(self):
        """
        Delete the scratch folder, following it if it is a symbolic link

        For a link, both the folder it points to and the link itself are
        removed. Nothing is done if the scratch folder does not exist.
        """
        scratch_dir = self.config.scratch_dir
        if os.path.islink(scratch_dir):
            target = os.path.realpath(scratch_dir)
            # The link may be dangling (its target already deleted); in that
            # case only the link itself has to be removed.
            if os.path.isdir(target):
                shutil.rmtree(target)
            os.remove(scratch_dir)
        elif os.path.isdir(scratch_dir):
            shutil.rmtree(scratch_dir)

    def report(self):
        """
        Write a report of the missing variables for every startdate and member

        Reports are written to config.report.path or, if that is empty, to
        the scratch folder.

        :return: always True
        :rtype: bool
        """
        Log.info('Looking for existing vars...')
        self._prepare_data_manager()
        base_folder = self.config.report.path
        if not base_folder:
            base_folder = self.config.scratch_dir
        Utils.create_folder_tree(base_folder)
        for startdate in self.config.experiment.startdates:
            for member in self.config.experiment.members:
                results = self._get_variable_report(startdate, member)

                report_path = os.path.join(base_folder,
                                           '{0}_{1}.report'.format(startdate,
                                                                   self.config.experiment.get_member_str(member)))

                self.create_report(report_path, results)

        Log.result('Report finished')
        return True

    def _get_variable_report(self, startdate, member):
        """
        List the variables whose first chunk file is missing for a startdate and member

        Variables without domain are skipped, as are table entries whose
        priority is None or greater than config.report.maximum_priority.

        :param startdate: startdate to check
        :param member: member to check
        :return: list of (variable, table, priority) tuples for the missing variables
        :rtype: list
        """
        var_manager = VariableManager()
        results = list()
        for var in var_manager.get_all_variables():
            if var.domain is None:
                continue
            for table, priority in var.tables:
                if priority is None or priority > self.config.report.maximum_priority:
                    continue
                if not self.data_manager.file_exists(var.domain, var.short_name, startdate, member, 1,
                                                     frequency=table.frequency):
                    results.append((var, table, priority))
                    Log.debug('Variable {0.short_name} not found in {1.name}', var, table)
                else:
                    Log.result('Variable {0.short_name} found in {1.name}', var, table)

        return results

    @staticmethod
    def create_report(report_path, results):
        """
        Write one report file per table, listing its missing variables grouped by priority

        Each file is named '<report_path>.<table name>'. Priorities are
        written in ascending order and variables are sorted by short name.

        :param report_path: common prefix for the report files
        :type report_path: str
        :param results: (variable, table, priority) tuples to report
        :type results: list
        """
        tables = {result[1].name for result in results}
        for table in tables:
            table_results = [result for result in results if result[1].name == table]
            priorities = sorted({result[2] for result in table_results})

            # The context manager closes the file even if a write fails
            with open('{0}.{1}'.format(report_path, table), 'w') as file_handler:
                file_handler.write('\nTable {0}\n'.format(table))
                file_handler.write('===================================\n')

                for priority in priorities:
                    priority_results = sorted((result[0] for result in table_results if result[2] == priority),
                                              key=lambda v: v.short_name)
                    file_handler.write('\nMissing variables with priority {0}:\n'.format(priority))
                    file_handler.write('--------------------------------------\n')

                    for var in priority_results:
                        file_handler.write('{0:12}: {1}\n'.format(var.short_name, var.standard_name))

    def _prepare_mesh_files(self):
        """
        Make the mesh and mask files available in the current folder

        Nothing is done if no model version is defined. Every file is taken
        from its explicit configuration option or, if that is empty, from
        config.con_files using the model version in the file name. If
        config.scratch_masks is set, files are copied there and linked from
        the current folder; otherwise they are copied to the current folder.
        """
        model_version = self.config.experiment.model_version
        if not model_version:
            Log.info('No model version defined. Skipping mesh files copy!')
            return
        Log.info('Copying mesh files')

        con_files = self.config.con_files
        restore_meshes = self.config.restore_meshes

        mesh_mask = 'mesh_mask_nemo.{0}.nc'.format(model_version)
        new_mask_glo = 'new_maskglo.{0}.nc'.format(model_version)
        mask_regions = 'mask.regions.{0}.nc'.format(model_version)
        mask_regions_3d = 'mask.regions.3d.{0}.nc'.format(model_version)

        if self.config.mesh_mask:
            mesh_mask_path = self.config.mesh_mask
        else:
            mesh_mask_path = os.path.join(con_files, mesh_mask)

        if self.config.new_mask_glo:
            new_mask_glo_path = self.config.new_mask_glo
        else:
            new_mask_glo_path = os.path.join(con_files, new_mask_glo)

        if self.config.mask_regions:
            mask_regions_path = self.config.mask_regions
        else:
            mask_regions_path = os.path.join(con_files, mask_regions)

        if self.config.mask_regions_3d:
            mask_regions_3d_path = self.config.mask_regions_3d
        else:
            mask_regions_3d_path = os.path.join(con_files, mask_regions_3d)

        if self.config.scratch_masks:
            Utils.create_folder_tree(self.config.scratch_masks)
            Utils.give_group_write_permissions(self.config.scratch_masks)

            mesh_mask_scratch_path = os.path.join(self.config.scratch_masks, mesh_mask)
            if self._copy_file(mesh_mask_path, mesh_mask_scratch_path, restore_meshes):
                Utils.give_group_write_permissions(mesh_mask_scratch_path)
                # The same mesh mask file is exposed under three names
                self._link_file(mesh_mask_scratch_path, 'mesh_hgr.nc')
                self._link_file(mesh_mask_scratch_path, 'mesh_zgr.nc')
                self._link_file(mesh_mask_scratch_path, 'mask.nc')

            new_maskglo_scratch_path = os.path.join(self.config.scratch_masks, new_mask_glo)
            if self._copy_file(new_mask_glo_path, new_maskglo_scratch_path, restore_meshes):
                Utils.give_group_write_permissions(new_maskglo_scratch_path)
                self._link_file(new_maskglo_scratch_path, 'new_maskglo.nc')

            mask_regions_scratch_path = os.path.join(self.config.scratch_masks, mask_regions)
            if self._copy_file(mask_regions_path, mask_regions_scratch_path, restore_meshes):
                Utils.give_group_write_permissions(mask_regions_scratch_path)
                self._link_file(mask_regions_scratch_path, 'mask_regions.nc')

            mask_regions3d_scratch_path = os.path.join(self.config.scratch_masks, mask_regions_3d)
            if self._copy_file(mask_regions_3d_path, mask_regions3d_scratch_path, restore_meshes):
                Utils.give_group_write_permissions(mask_regions3d_scratch_path)
                self._link_file(mask_regions3d_scratch_path, 'mask_regions.3d.nc')
        else:
            self._copy_file(mesh_mask_path, 'mesh_hgr.nc', restore_meshes)
            self._link_file('mesh_hgr.nc', 'mesh_zgr.nc')
            self._link_file('mesh_hgr.nc', 'mask.nc')
            self._copy_file(new_mask_glo_path, 'new_maskglo.nc', restore_meshes)
            self._copy_file(mask_regions_path, 'mask_regions.nc', restore_meshes)
            self._copy_file(mask_regions_3d_path, 'mask_regions.3d.nc', restore_meshes)

        Log.result('Mesh files ready!')

    def _copy_file(self, source, destiny, force):
        """
        Copy a file and rename its variables using self.dic_variables

        The copy is skipped when force is false and the destination already
        exists with the same size as the source.

        :param source: path of the file to copy
        :type source: str
        :param destiny: path of the copy
        :type destiny: str
        :param force: if True, copy even if the destination looks up to date
        :type force: bool
        :return: False if the source does not exist, True otherwise
        :rtype: bool
        """
        if not os.path.exists(source):
            Log.user_warning('File {0} is not available for {1}', destiny, self.config.experiment.model_version)
            return False

        if not force and os.path.exists(destiny):
            # Equal size is taken as evidence that the copy is already in place
            if os.stat(source).st_size == os.stat(destiny).st_size:
                Log.info('File {0} already exists', destiny)
                return True

        Log.info('Copying file {0}', destiny)
        shutil.copy(source, destiny)
        Log.info('File {0} ready', destiny)
        Utils.rename_variables(destiny, self.dic_variables, False, True)
        return True

    def _link_file(self, source, destiny):
        """
        Create a symbolic link to a file, replacing any previous entry at the link path

        A user warning is logged and no link is created if the source does
        not exist.

        :param source: path of the file to link to
        :type source: str
        :param destiny: path of the link
        :type destiny: str
        """
        if not os.path.exists(source):
            Log.user_warning('File {0} is not available for {1}', destiny, self.config.experiment.model_version)
            return

        # lexists also detects dangling links, which exists() would miss
        if os.path.lexists(destiny):
            try:
                os.remove(destiny)
            except OSError as ex:
                # Best effort: keep going, but leave a trace of the failure
                Log.debug('Could not remove {0}: {1}', destiny, ex)

        os.symlink(source, destiny)
        Log.info('File {0} ready', destiny)


def main():
    """
    Run the command line interface, exiting with status 1 if it reports a failure
    """
    if not EarthDiags.parse_args():
        # sys.exit is used instead of the exit() helper, which is only
        # injected by the site module
        sys.exit(1)


if __name__ == "__main__":
    main()