#!/usr/bin/env python
# coding=utf-8
"""Entry point for EarthDiagnostics"""
import argparse
import errno
import os
import shutil
import sys
import tempfile
from distutils.spawn import find_executable

import bscearth.utils.path
import netCDF4
import pkg_resources
from bscearth.utils.log import Log

from earthdiagnostics import cdftools
from earthdiagnostics.cmormanager import CMORManager
from earthdiagnostics.config import Config
from earthdiagnostics.constants import Basins
from earthdiagnostics.obsreconmanager import ObsReconManager
from earthdiagnostics.threddsmanager import THREDDSManager
from earthdiagnostics.utils import TempFile, Utils
from earthdiagnostics.work_manager import WorkManager


class EarthDiags(object):
    """
    Launcher class for the diagnostics

    The constructor takes no arguments: call :meth:`read_config` with the path
    to the configuration file before calling :meth:`run`, :meth:`clean` or
    :meth:`report`.
    """

    # Get the version number from the VERSION file if present (source checkout);
    # otherwise, from the installed earthdiagnostics package metadata
    scriptdir = os.path.abspath(os.path.dirname(__file__))
    if not os.path.exists(os.path.join(scriptdir, 'VERSION')):
        scriptdir = os.path.join(scriptdir, os.path.pardir)

    version_path = os.path.join(scriptdir, 'VERSION')
    readme_path = os.path.join(scriptdir, 'README')
    changes_path = os.path.join(scriptdir, 'CHANGELOG')
    if os.path.isfile(version_path):
        with open(version_path) as f:
            version = f.read().strip()
    else:
        version = pkg_resources.require("earthdiagnostics")[0].version

    def __init__(self):
        self.time = dict()
        self.data_manager = None
        self.threads = None
        self.had_errors = False
        self.config = Config()
        # Variable renaming map used by _copy_file; filled by read_config
        self.dic_variables = dict()

    def read_config(self, config_file):
        """
        Read config file and initialize earthdiagnostics

        Parameters
        ----------
        config_file: str
            Path to the configuration file

        """
        Log.info('Initialising Earth Diagnostics Version {0}', EarthDiags.version)
        self.config.parse(config_file)
        # Disable HDF5 file locking for this process.
        # NOTE(review): presumably to avoid lock failures on shared filesystems
        os.environ['HDF5_USE_FILE_LOCKING'] = 'FALSE'
        TempFile.scratch_folder = self.config.scratch_dir
        cdftools.path = self.config.cdftools_path
        self._create_dic_variables()
        Log.debug('Diags ready')
        Log.info('Running diags for experiment {0}, startdates {1}, members {2}',
                 self.config.experiment.expid, self.config.experiment.startdates,
                 self.config.experiment.members)

    @staticmethod
    def parse_args():
        """
        Entry point for the Earth Diagnostics.

        For more detailed documentation, use -h option

        Returns
        -------
        bool
            True after opening the documentation (--doc), False if the
            configuration file can not be found, otherwise the result of the
            selected action (clean, report or run)
        """
        log_levels = ('EVERYTHING', 'DEBUG', 'INFO', 'RESULT', 'USER_WARNING', 'WARNING', 'ERROR',
                      'CRITICAL', 'NO_LOG')
        parser = argparse.ArgumentParser(description='Main executable for Earth Diagnostics.')
        parser.add_argument('-v', '--version', action='version', version=EarthDiags.version,
                            help="returns Earth Diagnostics's version number and exit")
        parser.add_argument('--doc', action='store_true',
                            help="opens documentation and exits")
        parser.add_argument('--clean', action='store_true',
                            help="clean the scratch folder and exits")
        parser.add_argument('--report', action='store_true',
                            help="generates a report about the available files")
        parser.add_argument('-lf', '--logfile', choices=log_levels, default='DEBUG', type=str,
                            help="sets file's log level.")
        parser.add_argument('-lc', '--logconsole', choices=log_levels, default='INFO', type=str,
                            help="sets console's log level")
        parser.add_argument('-log', '--logfilepath', default=None, type=str)
        parser.add_argument('-f', '--configfile', default='diags.conf', type=str)

        args = parser.parse_args()
        if args.doc:
            Log.info('Opening documentation...')
            doc_path = 'http://earthdiagnostics.readthedocs.io/en/latest'
            Utils.execute_shell_command(('xdg-open', doc_path))
            Log.result('Documentation opened!')
            return True

        Log.set_console_level(args.logconsole)
        Log.set_file_level(args.logfile)

        if Log.console_handler.level <= Log.DEBUG:
            Utils.cdo.debug = True
            Utils.nco.debug = True

        Utils.cdo.CDO = find_executable('cdo')

        if args.logfilepath:
            Log.set_file(bscearth.utils.path.expand_path(args.logfilepath))

        config_file_path = bscearth.utils.path.expand_path(args.configfile)
        if not os.path.isfile(config_file_path):
            Log.critical('Configuration file {0} can not be found', config_file_path)
            return False
        try:
            diags = EarthDiags()
            diags.read_config(config_file_path)
            if args.clean:
                result = diags.clean()
            elif args.report:
                result = diags.report()
            else:
                result = diags.run()
        finally:
            # Temporary files are removed whether the selected action succeeded or raised
            TempFile.clean()
        return result

    def _create_dic_variables(self):
        """Build the map used to rename variables of the copied mesh/mask files."""
        self.dic_variables = dict()
        self.dic_variables['x'] = 'i'
        self.dic_variables['y'] = 'j'
        self.dic_variables['z'] = 'lev'
        if self.config.data_convention.lower() in ['primavera', 'cmip6']:
            self.dic_variables['nav_lon'] = 'longitude'
            self.dic_variables['nav_lat'] = 'latitude'
        else:
            self.dic_variables['nav_lon'] = 'lon'
            self.dic_variables['nav_lat'] = 'lat'
        self.dic_variables['nav_lev'] = 'lev'
        self.dic_variables['time_counter'] = 'time'
        self.dic_variables['t'] = 'time'

    def run(self):
        """
        Run the diagnostics

        Returns
        -------
        bool
        """
        self.had_errors = False
        Log.debug('Using netCDF version {0}', netCDF4.getlibversion())

        self._prepare_scratch_dir()

        self._prepare_mesh_files()
        self._initialize_basins()

        self._prepare_data_manager()

        # Run diagnostics
        Log.info('Running diagnostics')

        work_manager = WorkManager(self.config, self.data_manager)
        work_manager.prepare_job_list()

        result = work_manager.run()

        if self.config.auto_clean:
            self._remove_scratch_dir()
        return result

    def _initialize_basins(self):
        self._read_basins_from_file('mask_regions.nc')
        self._read_basins_from_file('mask_regions.3d.nc')

    @staticmethod
    def _read_basins_from_file(filename):
        """Pass the basins in *filename* to Basins; does nothing if the file does not exist."""
        if not os.path.isfile(filename):
            return
        handler = Utils.open_cdf(filename)
        try:
            Basins().get_available_basins(handler)
        finally:
            # Close the file even if reading the basins fails
            handler.close()

    def _prepare_scratch_dir(self):
        """Create the scratch folder (or a symlink to a /dev/shm folder) and chdir into it."""
        if self.config.use_ramdisk:
            self._remove_scratch_dir()
            # Only one temporary folder must be created: any extra one would be leaked in /dev/shm
            os.symlink(tempfile.mkdtemp(dir='/dev/shm'), self.config.scratch_dir)
        elif not os.path.exists(self.config.scratch_dir):
            os.makedirs(self.config.scratch_dir)
        os.chdir(self.config.scratch_dir)

    def _prepare_data_manager(self):
        """
        Instantiate and prepare the data manager selected by config.data_adaptor

        Raises
        ------
        ValueError
            If the configured data adaptor is not CMOR, THREDDS or OBSRECON
        """
        adaptors = {
            'CMOR': CMORManager,
            'THREDDS': THREDDSManager,
            'OBSRECON': ObsReconManager,
        }
        manager_class = adaptors.get(self.config.data_adaptor)
        if manager_class is None:
            raise ValueError('Unknown data adaptor: {0}'.format(self.config.data_adaptor))
        self.data_manager = manager_class(self.config)
        self.data_manager.prepare()

    def clean(self):
        """
        Clean scratch folder

        Returns
        -------
        bool
        """
        Log.info('Removing scratch folder...')
        self._remove_scratch_dir()
        Log.result('Scratch folder removed')
        return True

    def _remove_scratch_dir(self):
        """Remove the scratch folder; if it is a symlink, remove its target and then the link."""
        if os.path.islink(self.config.scratch_dir):
            shutil.rmtree(os.path.realpath(self.config.scratch_dir))
            os.remove(self.config.scratch_dir)
        elif os.path.isdir(self.config.scratch_dir):
            shutil.rmtree(self.config.scratch_dir)

    def report(self):
        """
        Create a report of missing variables for a given experiment

        Returns
        -------
        bool
        """
        Log.info('Looking for existing vars...')
        self._prepare_data_manager()
        base_folder = self.config.report.path
        if not base_folder:
            base_folder = self.config.scratch_dir
        Utils.create_folder_tree(base_folder)
        for startdate in self.config.experiment.startdates:
            for member in self.config.experiment.members:
                results = self._get_variable_report(startdate, member)

                report_path = os.path.join(base_folder,
                                           '{0}_{1}.report'.format(startdate,
                                                                   self.config.experiment.get_member_str(member)))

                self._create_report(report_path, results)

        Log.result('Report finished')
        return True

    def _get_variable_report(self, startdate, member):
        """
        Look for the variables not found by the data manager for a startdate and member

        Only variables with a domain and tables with a priority not above
        config.report.maximum_priority are checked.

        Returns
        -------
        list of (var, table, priority) tuples for the missing variables
        """
        var_manager = self.config.var_manager
        results = list()
        for var in var_manager.get_all_variables():
            if var.domain is None:
                continue
            for table, priority in var.tables:
                if priority is None or priority > self.config.report.maximum_priority:
                    continue
                if not self.data_manager.file_exists(var.domain, var.short_name, startdate, member, 1,
                                                     frequency=table.frequency):
                    results.append((var, table, priority))
                    Log.debug('Variable {0.short_name} not found in {1.name}', var, table)
                else:
                    Log.result('Variable {0.short_name} found in {1.name}', var, table)

        return results

    @staticmethod
    def _create_report(report_path, results):
        """
        Write one report file per table, named '<report_path>.<table name>'

        Parameters
        ----------
        report_path: str
            Path prefix of the report files
        results: list
            (var, table, priority) tuples, as returned by _get_variable_report
        """
        tables = {result[1].name for result in results}
        for table in tables:
            table_results = [result for result in results if result[1].name == table]
            # 'with' guarantees the file is closed even if a write fails
            with open('{0}.{1}'.format(report_path, table), 'w') as file_handler:
                file_handler.write('\nTable {0}\n'.format(table))
                file_handler.write('===================================\n')

                for priority in sorted({result[2] for result in table_results}):
                    priority_results = sorted((result[0] for result in table_results if result[2] == priority),
                                              key=lambda v: v.short_name)
                    file_handler.write('\nMissing variables with priority {0}:\n'.format(priority))
                    file_handler.write('--------------------------------------\n')

                    for var in priority_results:
                        file_handler.write('{0:12}: {1}\n'.format(var.short_name, var.standard_name))

    def _prepare_mesh_files(self):
        """Copy (or link) the mesh and mask files for the configured model version into the working dir."""
        model_version = self.config.experiment.model_version
        if not model_version:
            Log.info('No model version defined. Skipping mesh files copy!')
            return
        Log.info('Copying mesh files')
        con_files = self.config.con_files
        restore_meshes = self.config.restore_meshes

        mesh_mask = 'mesh_mask_nemo.{0}.nc'.format(model_version)
        new_mask_glo = 'new_maskglo.{0}.nc'.format(model_version)
        mask_regions = 'mask.regions.{0}.nc'.format(model_version)
        mask_regions_3d = 'mask.regions.3d.{0}.nc'.format(model_version)

        # Explicitly configured paths take precedence over the defaults in con_files
        mesh_mask_path = self.config.mesh_mask or os.path.join(con_files, mesh_mask)
        new_mask_glo_path = self.config.new_mask_glo or os.path.join(con_files, new_mask_glo)
        mask_regions_path = self.config.mask_regions or os.path.join(con_files, mask_regions)
        mask_regions_3d_path = self.config.mask_regions_3d or os.path.join(con_files, mask_regions_3d)

        if self.config.scratch_masks:
            self._prepare_mesh_using_scratch(mask_regions, mask_regions_3d, mask_regions_3d_path,
                                             mask_regions_path, mesh_mask, mesh_mask_path,
                                             new_mask_glo, new_mask_glo_path, restore_meshes)
        else:
            self._copy_file(mesh_mask_path, 'mesh_hgr.nc', restore_meshes)
            self._link_file('mesh_hgr.nc', 'mesh_zgr.nc')
            self._link_file('mesh_hgr.nc', 'mask.nc')
            self._copy_file(new_mask_glo_path, 'new_maskglo.nc', restore_meshes)
            self._copy_file(mask_regions_path, 'mask_regions.nc', restore_meshes)
            self._copy_file(mask_regions_3d_path, 'mask_regions.3d.nc', restore_meshes)

        Log.result('Mesh files ready!')

    def _prepare_mesh_using_scratch(self, mask_regions, mask_regions_3d, mask_regions_3d_path, mask_regions_path,
                                    mesh_mask, mesh_mask_path, new_mask_glo, new_mask_glo_path,
                                    restore_meshes):
        """Copy each mesh/mask file to config.scratch_masks and link it into the working dir."""
        Utils.create_folder_tree(self.config.scratch_masks)
        Utils.give_group_write_permissions(self.config.scratch_masks)

        # (source path, file name inside scratch_masks, link names in the working dir)
        scratch_copies = (
            (mesh_mask_path, mesh_mask, ('mesh_hgr.nc', 'mesh_zgr.nc', 'mask.nc')),
            (new_mask_glo_path, new_mask_glo, ('new_maskglo.nc',)),
            (mask_regions_path, mask_regions, ('mask_regions.nc',)),
            (mask_regions_3d_path, mask_regions_3d, ('mask_regions.3d.nc',)),
        )
        for source_path, scratch_name, link_names in scratch_copies:
            scratch_path = os.path.join(self.config.scratch_masks, scratch_name)
            if self._copy_file(source_path, scratch_path, restore_meshes):
                Utils.give_group_write_permissions(scratch_path)
                for link_name in link_names:
                    self._link_file(scratch_path, link_name)

    def _copy_file(self, source, destiny, force):
        """
        Copy source to destiny and rename its variables using self.dic_variables

        Parameters
        ----------
        source: str
        destiny: str
        force: bool
            If True, copy even when destiny already exists

        Returns
        -------
        bool
            False if source does not exist, True otherwise
        """
        if not os.path.exists(source):
            Log.user_warning('File {0} is not available for {1}', destiny, self.config.experiment.model_version)
            return False

        if not force and os.path.exists(destiny):
            # Small size differences can be due to the renaming of variables
            reference_size = os.stat(source).st_size
            delta_size = abs(reference_size - os.stat(destiny).st_size)
            # Accept differences below 2048 bytes or below 0.1% of the source size.
            # Integer comparison: '1 / 1000' would be 0 under Python 2 division, and
            # dividing by reference_size fails for empty source files
            if delta_size < 2048 or delta_size * 1000 < reference_size:
                Log.info('File {0} already exists', destiny)
                return True

        Log.info('Copying file {0}', destiny)
        shutil.copyfile(source, destiny)
        Log.info('File {0} ready', destiny)
        Utils.rename_variables(destiny, self.dic_variables, False, True)
        return True

    def _link_file(self, source, destiny):
        """
        Create a symlink destiny -> source, replacing any existing destiny

        If destiny exists but can not be removed due to a permission error, it
        is assumed to be an already created link and left untouched. Other
        errors while removing it are propagated.
        """
        if not os.path.exists(source):
            Log.user_warning('File {0} is not available for {1}', destiny, self.config.experiment.model_version)
            return

        if os.path.lexists(destiny):
            try:
                os.remove(destiny)
            except OSError as ex:
                if ex.errno == errno.EACCES:
                    Log.info('Link already created')
                    return
                # Any other failure would only resurface as a misleading EEXIST from os.symlink
                raise
        os.symlink(source, destiny)
        Log.info('File {0} ready', destiny)


def main():
    """Main for earthdiagnostics"""
    if not EarthDiags.parse_args():
        # sys.exit does not depend on the 'site' module providing the exit builtin
        sys.exit(1)


if __name__ == "__main__":
    main()