#!/usr/bin/env python
# coding=utf-8
"""Entry point for EarthDiagnostics"""
import argparse
import errno
import os
import shutil
import sys
import tempfile
from datetime import datetime

import netCDF4
import pkg_resources
import iris

import bscearth.utils.path
from bscearth.utils.log import Log

from earthdiagnostics import cdftools
from earthdiagnostics.cmormanager import CMORManager
from earthdiagnostics.config import Config
from earthdiagnostics.constants import Basins
from earthdiagnostics.obsreconmanager import ObsReconManager
from earthdiagnostics.threddsmanager import THREDDSManager
from earthdiagnostics.utils import TempFile, Utils
from earthdiagnostics.work_manager import WorkManager

# Online documentation opened by the ``--doc`` option
DOCUMENTATION_URL = "http://earthdiagnostics.readthedocs.io/en/latest"

# Log level names accepted by the ``-lf`` and ``-lc`` options
LOG_LEVELS = (
    "EVERYTHING",
    "DEBUG",
    "INFO",
    "RESULT",
    "USER_WARNING",
    "WARNING",
    "ERROR",
    "CRITICAL",
    "NO_LOG",
)


class EarthDiags(object):
    """
    Launcher class for the diagnostics

    The instance starts with an empty :class:`Config`; call
    :meth:`read_config` before :meth:`run`, :meth:`clean` or :meth:`report`.
    """

    # Get the version number from the relevant file.
    # If not, from earthdiagnostics package
    scriptdir = os.path.abspath(os.path.dirname(__file__))
    if not os.path.exists(os.path.join(scriptdir, "VERSION")):
        scriptdir = os.path.join(scriptdir, os.path.pardir)

    version_path = os.path.join(scriptdir, "VERSION")
    readme_path = os.path.join(scriptdir, "README")
    changes_path = os.path.join(scriptdir, "CHANGELOG")
    if os.path.isfile(version_path):
        with open(version_path) as f:
            version = f.read().strip()
    else:
        version = pkg_resources.require("earthdiagnostics")[0].version

    def __init__(self):
        self.time = dict()
        self.data_manager = None
        self.threads = None
        self.had_errors = False
        self.config = Config()
        # Filled by read_config; used to rename variables in mesh files
        self.dic_variables = dict()

    def read_config(self, config_file):
        """
        Read config file and initialize earthdiagnostics

        Parameters
        ----------
        config_file: str
            Path to the configuration file to parse

        """
        Log.info(
            "Initialising Earth Diagnostics Version {0}", EarthDiags.version
        )
        self.config.parse(config_file)
        # NOTE(review): presumably disables HDF5 locking to avoid lock
        # failures on shared filesystems - confirm
        os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE"
        TempFile.scratch_folder = self.config.scratch_dir
        cdftools.path = self.config.cdftools_path
        self._create_dic_variables()
        Log.debug("Diags ready")
        Log.info(
            "Running diags for experiment {0}, startdates {1}, members {2}",
            self.config.experiment.expid,
            self.config.experiment.startdates,
            self.config.experiment.members,
        )

    @staticmethod
    def parse_args(args):
        """
        Entry point for the Earth Diagnostics.

        For more detailed documentation, use -h option

        Parameters
        ----------
        args: list of str
            Command line arguments (without the program name)

        Returns
        -------
        bool
            Result of the selected action (doc, clean, report or run)

        """
        parser = argparse.ArgumentParser(
            description="Main executable for Earth Diagnostics."
        )
        parser.add_argument(
            "-v",
            "--version",
            action="version",
            version=EarthDiags.version,
            help="returns Earth Diagnostics's version number and exit",
        )
        parser.add_argument(
            "--doc", action="store_true", help="opens documentation and exits"
        )
        parser.add_argument(
            "--clean",
            action="store_true",
            help="clean the scratch folder and exits",
        )
        parser.add_argument(
            "--report",
            action="store_true",
            help="generates a report about the available files",
        )
        parser.add_argument(
            "-lf",
            "--logfile",
            choices=LOG_LEVELS,
            default="DEBUG",
            type=str,
            help="sets file's log level.",
        )
        parser.add_argument(
            "-lc",
            "--logconsole",
            choices=LOG_LEVELS,
            default="INFO",
            type=str,
            help="sets console's log level",
        )
        parser.add_argument("-log", "--logfilepath", default=None, type=str)
        parser.add_argument(
            "-f", "--configfile", default="diags.conf", type=str
        )

        args = parser.parse_args(args)
        if args.doc:
            return EarthDiags.open_documentation()

        Log.set_console_level(args.logconsole)
        Log.set_file_level(args.logfile)
        if args.logfilepath:
            Log.set_file(bscearth.utils.path.expand_path(args.logfilepath))

        try:
            diags = EarthDiags()
            diags.read_config(args.configfile)
            if args.clean:
                result = diags.clean()
            elif args.report:
                result = diags.report()
            else:
                result = diags.run()
        finally:
            # Temporary files are removed even if the action failed
            TempFile.clean()
        return result

    @staticmethod
    def open_documentation():
        """
        Open Earthdiagnostics online documentation

        Returns
        -------
        bool:
            True if successful

        """
        Log.info("Opening documentation...")
        Utils.execute_shell_command(("xdg-open", DOCUMENTATION_URL))
        Log.result("Documentation opened!")
        return True

    def _create_dic_variables(self):
        # Maps names found in the mesh files to the names used internally
        self.dic_variables = dict()
        self.dic_variables["x"] = "i"
        self.dic_variables["y"] = "j"
        self.dic_variables["z"] = "lev"
        self.dic_variables["nav_lon"] = self.config.data_convention.lon_name
        self.dic_variables["nav_lat"] = self.config.data_convention.lat_name
        self.dic_variables["nav_lev"] = "lev"
        self.dic_variables["time_counter"] = "time"
        self.dic_variables["t"] = "time"

    def run(self):
        """
        Run the diagnostics

        Returns
        -------
        bool
            Result reported by the work manager

        """
        start = datetime.now()
        self.had_errors = False
        Log.debug("Using netCDF version {0}", netCDF4.getlibversion())

        self._prepare_scratch_dir()

        self._prepare_mesh_files()
        self._initialize_basins()

        Log.info("Time to prepare: {}", datetime.now() - start)

        self._prepare_data_manager()

        # Run diagnostics
        Log.info("Running diagnostics")

        work_manager = WorkManager(self.config, self.data_manager)
        work_manager.prepare_job_list()

        result = work_manager.run()

        if self.config.auto_clean:
            self._remove_scratch_dir()
        return result

    def _initialize_basins(self):
        # Relative path: the working directory is the scratch dir here
        self._read_basins_from_file("basins.nc")

    @staticmethod
    def _read_basins_from_file(filename):
        if not os.path.isfile(filename):
            return
        Basins().get_available_basins(iris.load_cube(filename))

    def _prepare_scratch_dir(self):
        """Create the scratch dir (optionally in RAM) and move into it."""
        if self.config.use_ramdisk:
            self._remove_scratch_dir()
            # Create a single directory in /dev/shm and expose it through a
            # symlink at the configured scratch path
            ram_dir = tempfile.mkdtemp(dir="/dev/shm")
            os.symlink(ram_dir, self.config.scratch_dir)
        elif not os.path.exists(self.config.scratch_dir):
            os.makedirs(self.config.scratch_dir)
        os.chdir(self.config.scratch_dir)

    def _prepare_data_manager(self):
        """
        Instantiate and prepare the data manager for the configured adaptor

        Raises
        ------
        ValueError
            If the configured data adaptor is not supported

        """
        managers = {
            "CMOR": CMORManager,
            "THREDDS": THREDDSManager,
            "OBSRECON": ObsReconManager,
        }
        try:
            manager_class = managers[self.config.data_adaptor]
        except KeyError:
            raise ValueError(
                "Unknown data adaptor {0}".format(self.config.data_adaptor)
            )
        self.data_manager = manager_class(self.config)
        self.data_manager.prepare()

    def clean(self):
        """
        Clean scratch folder

        Returns
        -------
        bool

        """
        Log.info("Removing scratch folder...")
        self._remove_scratch_dir()
        Log.result("Scratch folder removed")
        return True

    def _remove_scratch_dir(self):
        """Remove the scratch dir, following it if it is a (ramdisk) link."""
        scratch_dir = self.config.scratch_dir
        # Arguments are passed as a sequence so paths with spaces are
        # not split into several arguments
        if os.path.islink(scratch_dir):
            Utils.execute_shell_command(
                ("rm", "-r", os.path.realpath(scratch_dir))
            )
            os.remove(scratch_dir)
        elif os.path.isdir(scratch_dir):
            Utils.execute_shell_command(("rm", "-r", scratch_dir))

    def report(self):
        """
        Create a report of missing variables for a given experiment

        Returns
        -------
        bool

        """
        Log.info("Looking for existing vars...")
        self._prepare_data_manager()
        base_folder = self.config.report.path
        if not base_folder:
            base_folder = self.config.scratch_dir
        Utils.create_folder_tree(base_folder)
        for startdate in self.config.experiment.startdates:
            for member in self.config.experiment.members:
                results = self._get_variable_report(startdate, member)

                report_path = os.path.join(
                    base_folder,
                    "{0}_{1}.report".format(
                        startdate,
                        self.config.experiment.get_member_str(member),
                    ),
                )

                self._create_report(report_path, results)

        Log.result("Report finished")
        return True

    def _get_variable_report(self, startdate, member):
        """Return (var, table, priority) tuples for missing variables."""
        var_manager = self.config.var_manager
        results = list()
        for var in var_manager.get_all_variables():
            if var.domain is None:
                continue
            for table, priority in var.tables:
                if (
                    priority is None
                    or priority > self.config.report.maximum_priority
                ):
                    continue
                if not self.data_manager.file_exists(
                    var.domain,
                    var.short_name,
                    startdate,
                    member,
                    1,
                    frequency=table.frequency,
                ):
                    results.append((var, table, priority))
                    Log.debug(
                        "Variable {0.short_name} not found in {1.name}",
                        var,
                        table,
                    )
                else:
                    Log.result(
                        "Variable {0.short_name} found in {1.name}", var, table
                    )

        return results

    @staticmethod
    def _create_report(report_path, results):
        """
        Write one report file per table listing the missing variables

        Parameters
        ----------
        report_path: str
            Base path; each table is written to ``<report_path>.<table>``
        results: list of tuple
            (var, table, priority) tuples as built by _get_variable_report

        """
        tables = {result[1].name for result in results}
        for table in tables:
            table_results = [
                result for result in results if result[1].name == table
            ]
            priorities = sorted({result[2] for result in table_results})
            with open("{0}.{1}".format(report_path, table), "w") as handler:
                handler.write("\nTable {0}\n".format(table))
                handler.write("===================================\n")
                for priority in priorities:
                    priority_results = sorted(
                        (
                            result[0]
                            for result in table_results
                            if result[2] == priority
                        ),
                        key=lambda v: v.short_name,
                    )
                    handler.write(
                        "\nMissing variables with priority {0}:\n".format(
                            priority
                        )
                    )
                    handler.write("--------------------------------------\n")

                    for var in priority_results:
                        handler.write(
                            "{0:12}: {1}\n".format(
                                var.short_name, var.standard_name
                            )
                        )

    def _prepare_mesh_files(self):
        """Copy or link mesh, mask and basin files into the working dir."""
        model_version = self.config.experiment.model_version
        if not model_version:
            Log.info("No model version defined. Skipping mesh files copy!")
            return
        Log.info("Copying mesh files")
        con_files = self.config.con_files
        restore_meshes = self.config.restore_meshes

        mesh_mask = "mesh_mask_nemo.{0}.nc".format(model_version)
        new_mask_glo = "new_maskglo.{0}.nc".format(model_version)
        basins = "basins.{0}.nc".format(model_version)

        # Explicitly configured paths take precedence over con_files
        if self.config.mesh_mask:
            mesh_mask_path = self.config.mesh_mask
        else:
            mesh_mask_path = os.path.join(con_files, mesh_mask)

        if self.config.new_mask_glo:
            new_mask_glo_path = self.config.new_mask_glo
        else:
            new_mask_glo_path = os.path.join(con_files, new_mask_glo)

        if self.config.basins:
            basins_path = self.config.basins
        else:
            basins_path = os.path.join(con_files, basins)

        if self.config.scratch_masks:
            self._prepare_mesh_using_scratch(
                basins,
                basins_path,
                mesh_mask,
                mesh_mask_path,
                new_mask_glo,
                new_mask_glo_path,
                restore_meshes,
            )
        else:
            self._copy_file(mesh_mask_path, "mesh_hgr.nc", restore_meshes)
            self._link_file("mesh_hgr.nc", "mesh_zgr.nc")
            self._link_file("mesh_hgr.nc", "mask.nc")
            self._copy_file(
                new_mask_glo_path, "new_maskglo.nc", restore_meshes
            )
            self._copy_file(basins_path, "basins.nc", restore_meshes)

        Log.result("Mesh files ready!")

    def _prepare_mesh_using_scratch(
        self,
        basins,
        basins_path,
        mesh_mask,
        mesh_mask_path,
        new_mask_glo,
        new_mask_glo_path,
        restore_meshes,
    ):
        """Copy mesh files to the shared scratch_masks dir and link them."""
        Utils.create_folder_tree(self.config.scratch_masks)
        Utils.give_group_write_permissions(self.config.scratch_masks)

        mesh_mask_scratch_path = os.path.join(
            self.config.scratch_masks, mesh_mask
        )

        if self._copy_file(
            mesh_mask_path, mesh_mask_scratch_path, restore_meshes
        ):
            Utils.give_group_write_permissions(mesh_mask_scratch_path)
            self._link_file(mesh_mask_scratch_path, "mesh_hgr.nc")
            self._link_file(mesh_mask_scratch_path, "mesh_zgr.nc")
            self._link_file(mesh_mask_scratch_path, "mask.nc")

        new_maskglo_scratch_path = os.path.join(
            self.config.scratch_masks, new_mask_glo
        )
        if self._copy_file(
            new_mask_glo_path, new_maskglo_scratch_path, restore_meshes
        ):
            Utils.give_group_write_permissions(new_maskglo_scratch_path)
            self._link_file(new_maskglo_scratch_path, "new_maskglo.nc")

        basins_scratch_path = os.path.join(self.config.scratch_masks, basins)
        if self._copy_file(basins_path, basins_scratch_path, restore_meshes):
            # Permissions go on the scratch copy, as for the other files
            Utils.give_group_write_permissions(basins_scratch_path)
            self._link_file(basins_scratch_path, "basins.nc")

    def _copy_file(self, source, destiny, force):
        """
        Copy source to destiny and rename its variables

        Parameters
        ----------
        source: str
        destiny: str
        force: bool
            Overwrite destiny even if it already exists

        Returns
        -------
        bool
            True if destiny is available afterwards, False if source is
            missing

        """
        if not os.path.exists(source):
            Log.user_warning(
                "File {0} is not available for {1}",
                destiny,
                self.config.experiment.model_version,
            )
            Log.debug("Looking for it in {0}", source)
            return False

        if not force and os.path.exists(destiny):
            # NOTE(review): a size-based staleness check (tolerating small
            # differences caused by variable renaming) was disabled here;
            # an existing destination is always reused. Use force
            # (restore_meshes) to overwrite it.
            Log.info("File {0} already exists", destiny)
            return True

        Log.info("Copying file {0}", os.path.abspath(destiny))
        shutil.copyfile(source, destiny)
        Log.info("File {0} ready", destiny)
        Utils.rename_variables(destiny, self.dic_variables, False)
        return True

    def _link_file(self, source, destiny):
        """Replace destiny with a symlink to source, if source exists."""
        if not os.path.exists(source):
            Log.user_warning(
                "File {0} is not available for {1}",
                destiny,
                self.config.experiment.model_version,
            )
            return

        if os.path.lexists(destiny):
            try:
                os.remove(destiny)
            except OSError as ex:
                if ex.errno == errno.EACCES:
                    # Permission denied: assume the link is already in place
                    Log.info("Link already created")
                    return
                if ex.errno != errno.ENOENT:
                    raise
        os.symlink(source, destiny)
        Log.info("File {0} ready", destiny)


def main():
    """Main for earthdiagnostics"""
    if not EarthDiags.parse_args(sys.argv[1:]):
        sys.exit(1)


if __name__ == "__main__":
    main()