#!/usr/bin/env python
# coding=utf-8
import argparse
import shutil
import pkg_resources
import netCDF4
import os
import sys
# NOTE(review): ``Log`` is used throughout this module but never imported explicitly;
# presumably it is provided by this star import -- confirm before replacing it.
from bscearth.utils.date import *
import bscearth.utils.path
from earthdiagnostics.config import Config
from earthdiagnostics.cmormanager import CMORManager
from earthdiagnostics.threddsmanager import THREDDSManager
from earthdiagnostics import cdftools
from earthdiagnostics.utils import TempFile, Utils
from earthdiagnostics.variable import VariableManager
from work_manager import WorkManager


class EarthDiags(object):
    """
    Launcher class for the diagnostics

    :param config_file: path to the configuration file
    :type config_file: str
    """

    # Get the version number from the VERSION file located next to this script (or one
    # folder above it). If that file is missing, ask the installed earthdiagnostics package.
    scriptdir = os.path.abspath(os.path.dirname(__file__))
    if not os.path.exists(os.path.join(scriptdir, 'VERSION')):
        scriptdir = os.path.join(scriptdir, os.path.pardir)

    version_path = os.path.join(scriptdir, 'VERSION')
    readme_path = os.path.join(scriptdir, 'README')
    changes_path = os.path.join(scriptdir, 'CHANGELOG')
    documentation_path = os.path.join(scriptdir, 'EarthDiagnostics.pdf')
    if os.path.isfile(version_path):
        with open(version_path) as f:
            version = f.read().strip()
    else:
        version = pkg_resources.require("earthdiagnostics")[0].version

    def __init__(self, config_file):
        Log.info('Initialising Earth Diagnostics Version {0}', EarthDiags.version)
        self.config = Config(config_file)

        # Module-level settings shared with the helper modules
        TempFile.scratch_folder = self.config.scratch_dir
        cdftools.path = self.config.cdftools_path

        self._create_dic_variables()
        self.time = dict()
        self.data_manager = None
        self.threads = None
        self.had_errors = False
        Log.debug('Diags ready')
        Log.info('Running diags for experiment {0}, startdates {1}, members {2}',
                 self.config.experiment.expid, self.config.experiment.startdates,
                 self.config.experiment.members)

    @staticmethod
    def parse_args():
        """
        Entry point for the Earth Diagnostics. For more detailed documentation, use -h option

        Parses the command line, configures logging and then runs the requested action
        (open the documentation, clean the scratch folder, generate a report or run the
        diagnostics).

        :return: True if the documentation was opened, False if the configuration file
                 can not be found, otherwise whatever the selected action (clean, report
                 or run) returns
        """
        parser = argparse.ArgumentParser(description='Main executable for Earth Diagnostics.')
        parser.add_argument('-v', '--version', action='version', version=EarthDiags.version,
                            help="returns Earth Diagnostics's version number and exit")
        parser.add_argument('--doc', action='store_true',
                            help="opens documentation and exits")
        parser.add_argument('--clean', action='store_true',
                            help="clean the scratch folder and exits")
        parser.add_argument('--report', action='store_true',
                            help="generates a report about the available files")
        parser.add_argument('-lf', '--logfile',
                            choices=('EVERYTHING', 'DEBUG', 'INFO', 'RESULT', 'USER_WARNING', 'WARNING', 'ERROR',
                                     'CRITICAL', 'NO_LOG'),
                            default='DEBUG', type=str,
                            help="sets file's log level.")
        parser.add_argument('-lc', '--logconsole',
                            choices=('EVERYTHING', 'DEBUG', 'INFO', 'RESULT', 'USER_WARNING', 'WARNING', 'ERROR',
                                     'CRITICAL', 'NO_LOG'),
                            default='INFO', type=str,
                            help="sets console's log level")
        parser.add_argument('-log', '--logfilepath', default=None, type=str)
        parser.add_argument('-f', '--configfile', default='diags.conf', type=str)

        args = parser.parse_args()

        if args.doc:
            Log.info('Opening documentation...')
            doc_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'EarthDiagnostics.pdf')
            Utils.execute_shell_command(('xdg-open', doc_path))
            Log.result('Documentation opened!')
            return True

        Log.set_console_level(args.logconsole)
        Log.set_file_level(args.logfile)

        if Log.console_handler.level <= Log.DEBUG:
            Utils.cdo.debug = True
            Utils.nco.debug = False  # This is due to a bug in nco. Must change when it's solved

        if args.logfilepath:
            Log.set_file(bscearth.utils.path.expand_path(args.logfilepath))

        config_file_path = bscearth.utils.path.expand_path(args.configfile)
        if not os.path.isfile(config_file_path):
            Log.critical('Configuration file {0} can not be found', config_file_path)
            return False

        # Any exception propagates to the caller; temporary files are cleaned either way.
        try:
            diags = EarthDiags(config_file_path)
            if args.clean:
                result = diags.clean()
            elif args.report:
                result = diags.report()
            else:
                result = diags.run()
        finally:
            TempFile.clean()

        return result

    def _create_dic_variables(self):
        """
        Build the dictionary of variable renames that is passed to Utils.rename_variables
        whenever a mesh file is copied
        """
        self.dic_variables = dict()
        self.dic_variables['x'] = 'i'
        self.dic_variables['y'] = 'j'
        self.dic_variables['z'] = 'lev'
        self.dic_variables['nav_lon'] = 'lon'
        self.dic_variables['nav_lat'] = 'lat'
        self.dic_variables['nav_lev'] = 'lev'
        self.dic_variables['time_counter'] = 'time'
        self.dic_variables['t'] = 'time'

    def run(self):
        """
        Run the diagnostics

        Creates the scratch folder if needed, changes the working directory to it,
        prepares the mesh files and the data manager and hands the work over to a
        WorkManager.

        :return: value returned by WorkManager.run()
        """
        self.had_errors = False
        Log.debug('Using netCDF version {0}', netCDF4.getlibversion())
        if not os.path.exists(self.config.scratch_dir):
            os.makedirs(self.config.scratch_dir)
        os.chdir(self.config.scratch_dir)

        self._prepare_mesh_files()
        self._prepare_data_manager()

        # Run diagnostics
        Log.info('Running diagnostics')

        work_manager = WorkManager(self.config, self.data_manager)
        work_manager.prepare_job_list()
        return work_manager.run()

    def _prepare_data_manager(self):
        """
        Create the data manager selected by the configuration's data_adaptor ('CMOR' or
        'THREDDS') and call its prepare method
        """
        if self.config.data_adaptor == 'CMOR':
            self.data_manager = CMORManager(self.config)
        elif self.config.data_adaptor == 'THREDDS':
            self.data_manager = THREDDSManager(self.config)
        self.data_manager.prepare()

    def clean(self):
        """
        Remove the scratch folder, if it exists

        :return: always True
        :rtype: bool
        """
        Log.info('Removing scratch folder...')
        if os.path.exists(self.config.scratch_dir):
            shutil.rmtree(self.config.scratch_dir)
        Log.result('Scratch folder removed')
        return True

    def report(self):
        """
        Write, for every startdate and member of the experiment, the report files listing
        the variables for which the data manager finds no file

        :return: always True
        :rtype: bool
        """
        Log.info('Looking for existing vars...')
        self._prepare_data_manager()
        for startdate in self.config.experiment.startdates:
            for member in self.config.experiment.members:
                results = self._get_variable_report(startdate, member)
                report_name = '{0}_{1}.report'.format(startdate, self.config.experiment.get_member_str(member))
                report_path = os.path.join(self.config.scratch_dir, report_name)

                Utils.create_folder_tree(self.config.scratch_dir)
                self.create_report(report_path, results)

        Log.result('Report finished')
        return True

    def _get_variable_report(self, startdate, member):
        """
        Collect the (variable, table) pairs for which the data manager reports no file

        Variables without priority or without domain are skipped.

        :param startdate: startdate to check
        :param member: member to check
        :return: list of (variable, table) tuples
        :rtype: list
        """
        var_manager = VariableManager()
        results = list()
        for var in var_manager.get_all_variables():
            if var.priority is None or var.domain is None:
                continue
            for table in var.tables:
                if not self.data_manager.file_exists(var.domain, var.short_name, startdate, member, 1,
                                                     frequency=table.frequency):
                    results.append((var, table))
        return results

    def create_report(self, report_path, results):
        """
        Write one report file per realm, named '<report_path>.<realm>'

        Inside each file the entries are grouped by table name and then by priority, and
        sorted by short name inside every priority.

        :param report_path: common prefix for the report files
        :type report_path: str
        :param results: (variable, table) tuples, as returned by _get_variable_report
        :type results: list
        """
        realms = sorted({result[0].domain for result in results})
        for realm in realms:
            realm_results = [result for result in results if result[0].domain == realm]
            tables = sorted({result[1].name for result in realm_results})

            # The context manager guarantees the file is closed even if a write fails
            with open('{0}.{1}'.format(report_path, realm), 'w') as file_handler:
                for table in tables:
                    table_results = [result for result in realm_results if result[1].name == table]

                    file_handler.write('\nTable {0}\n'.format(table))
                    file_handler.write('===================================\n')

                    priorities = sorted({int(result[0].priority) for result in table_results})
                    for priority in priorities:
                        priority_results = [result for result in table_results
                                            if int(result[0].priority) == priority]
                        priority_results = sorted(priority_results, key=lambda res: res[0].short_name)
                        file_handler.write('\nMissing variables with priority {0}:\n'.format(priority))
                        file_handler.write('--------------------------------------\n')

                        for var, _ in priority_results:
                            file_handler.write('{0:12}: {1}\n'.format(var.short_name, var.standard_name))

    def _prepare_mesh_files(self):
        """
        Make the mesh and mask files of the experiment's model version available under
        fixed names in the current working directory

        If config.scratch_masks is set, the files are copied there and symlinked from the
        working directory; otherwise they are copied straight into the working directory.
        """
        Log.info('Copying mesh files')
        con_files = self.config.con_files
        model_version = self.config.experiment.model_version
        restore_meshes = self.config.restore_meshes

        mesh_mask = 'mesh_mask_nemo.{0}.nc'.format(model_version)
        new_mask_glo = 'new_maskglo.{0}.nc'.format(model_version)
        mask_regions = 'mask.regions.{0}.nc'.format(model_version)
        mask_regions_3d = 'mask.regions.3d.{0}.nc'.format(model_version)

        if self.config.scratch_masks:
            Utils.create_folder_tree(self.config.scratch_masks)
            Utils.give_group_write_permissions(self.config.scratch_masks)

            # (file to copy into scratch_masks, names of the links that must point to it)
            shared_files = (
                (mesh_mask, ('mesh_hgr.nc', 'mesh_zgr.nc', 'mask.nc')),
                (new_mask_glo, ('new_maskglo.nc',)),
                (mask_regions, ('mask_regions.nc',)),
                (mask_regions_3d, ('mask_regions.3d.nc',)),
            )
            for file_name, link_names in shared_files:
                scratch_path = os.path.join(self.config.scratch_masks, file_name)
                # Links are only created when the shared copy is available
                if self._copy_file(os.path.join(con_files, file_name), scratch_path, restore_meshes):
                    Utils.give_group_write_permissions(scratch_path)
                    for link_name in link_names:
                        self._link_file(scratch_path, link_name)
        else:
            self._copy_file(os.path.join(con_files, mesh_mask), 'mesh_hgr.nc', restore_meshes)
            self._link_file('mesh_hgr.nc', 'mesh_zgr.nc')
            self._link_file('mesh_hgr.nc', 'mask.nc')
            self._copy_file(os.path.join(con_files, new_mask_glo), 'new_maskglo.nc', restore_meshes)
            self._copy_file(os.path.join(con_files, mask_regions), 'mask_regions.nc', restore_meshes)
            self._copy_file(os.path.join(con_files, mask_regions_3d), 'mask_regions.3d.nc', restore_meshes)

        Log.result('Mesh files ready!')

    def _copy_file(self, source, destiny, force):
        """
        Copy source to destiny and rename its variables using dic_variables

        The copy is skipped when force is false and destiny already exists with the same
        size as source.

        :param source: path of the file to copy
        :type source: str
        :param destiny: path of the copy
        :type destiny: str
        :param force: if True, copy even if destiny looks up to date
        :type force: bool
        :return: False if source does not exist, True otherwise
        :rtype: bool
        """
        if not os.path.exists(source):
            Log.user_warning('File {0} is not available for {1}', destiny, self.config.experiment.model_version)
            return False

        if not force and os.path.exists(destiny):
            # Equal size is taken as "already copied"
            if os.stat(source).st_size == os.stat(destiny).st_size:
                Log.info('File {0} already exists', destiny)
                return True

        Log.info('Copying file {0}', destiny)
        shutil.copy(source, destiny)
        Log.info('File {0} ready', destiny)
        Utils.rename_variables(destiny, self.dic_variables, False, True)
        return True

    def _link_file(self, source, destiny):
        """
        Create a symbolic link called destiny that points to source, replacing any
        previous file or link with that name

        Only a warning is logged if source does not exist.

        :param source: path the link must point to
        :type source: str
        :param destiny: path of the link
        :type destiny: str
        """
        if not os.path.exists(source):
            Log.user_warning('File {0} is not available for {1}', destiny, self.config.experiment.model_version)
            return

        # lexists is also true for broken links, which must be removed as well
        if os.path.lexists(destiny):
            try:
                os.remove(destiny)
            except OSError:
                # Best effort: if the old entry is still there, os.symlink below will raise
                pass

        os.symlink(source, destiny)
        Log.info('File {0} ready', destiny)


def main():
    """
    Script entry point: exit with status 1 if EarthDiags.parse_args returns a false value
    """
    if not EarthDiags.parse_args():
        # sys.exit instead of the exit() helper, which only exists when the site module is loaded
        sys.exit(1)


if __name__ == "__main__":
    main()