# coding=utf-8
"""Entry point for EarthDiagnostics"""
import argparse
import errno
import os
import shutil
import sys
import tempfile
from datetime import datetime

import bscearth.utils.path
import iris
from bscearth.utils.log import Log

from earthdiagnostics import cdftools
from earthdiagnostics.cmormanager import CMORManager
from earthdiagnostics.config import Config
from earthdiagnostics.constants import Basins
from earthdiagnostics.obsreconmanager import ObsReconManager
from earthdiagnostics.threddsmanager import THREDDSManager
from earthdiagnostics.utils import TempFile, Utils
from earthdiagnostics.work_manager import WorkManager
class EarthDiags(object):
    """
    Launcher class for the diagnostics

    Holds the parsed configuration and the data manager used to run the
    diagnostics for one experiment.

    Attributes
    ----------
    version: str
        Earth Diagnostics version. Read from a VERSION file located next to
        this module (or in its parent folder) or, if there is none, from the
        installed ``earthdiagnostics`` distribution.
    """

    # Get the version number from the VERSION file. If not found, from the
    # installed earthdiagnostics package
    scriptdir = os.path.abspath(os.path.dirname(__file__))
    if not os.path.exists(os.path.join(scriptdir, 'VERSION')):
        scriptdir = os.path.join(scriptdir, os.path.pardir)

    version_path = os.path.join(scriptdir, 'VERSION')
    readme_path = os.path.join(scriptdir, 'README')
    changes_path = os.path.join(scriptdir, 'CHANGELOG')
    if os.path.isfile(version_path):
        with open(version_path) as f:
            # Strip the trailing newline so the version prints cleanly
            version = f.read().strip()
    else:
        version = pkg_resources.require("earthdiagnostics")[0].version
    def __init__(self):
        """Create an unconfigured launcher; call ``read_config`` before running any action."""
        # NOTE(review): time and threads are not read anywhere in this file — confirm they are still needed
        self.time = dict()
        # Set by _prepare_data_manager according to config.data_adaptor
        self.data_manager = None
        self.threads = None
        self.had_errors = False
        # Filled by _create_dic_variables (called from read_config); defined
        # here so the attribute exists even before the config is read
        self.dic_variables = dict()
        self.config = Config()

    def read_config(self, config_file):
        """
        Read config file and initialize earthdiagnostics

        Parses the configuration and applies the settings that other modules
        read globally: the TempFile scratch folder and the CDFTOOLS path. It
        also builds the variable renaming table used when copying mesh files.

        Parameters
        ----------
        config_file: str
            Path to the configuration file
        """
        Log.info(
            'Initialising Earth Diagnostics Version {0}', EarthDiags.version)
        self.config.parse(config_file)

        # Disable HDF5 file locking. Set through os.environ so child processes
        # inherit it too (presumably to avoid lock errors on shared
        # filesystems — TODO confirm)
        os.environ['HDF5_USE_FILE_LOCKING'] = 'FALSE'
        TempFile.scratch_folder = self.config.scratch_dir
        cdftools.path = self.config.cdftools_path

        self._create_dic_variables()
        Log.debug('Diags ready')
        Log.info('Running diags for experiment {0}, startdates {1}, members {2}', self.config.experiment.expid,
                 self.config.experiment.startdates, self.config.experiment.members)
    @staticmethod
    def parse_args(args):
        """
        Entry point for the Earth Diagnostics.

        For more detailed documentation, use -h option

        Parameters
        ----------
        args: list of str
            Command line arguments, without the program name

        Returns
        -------
        bool
            Value returned by the selected action (open documentation, clean,
            report or run). ``main`` treats a falsy value as a failure.
        """
        log_levels = ('EVERYTHING', 'DEBUG', 'INFO', 'RESULT', 'USER_WARNING',
                      'WARNING', 'ERROR', 'CRITICAL', 'NO_LOG')
        parser = argparse.ArgumentParser(
            description='Main executable for Earth Diagnostics.')
        parser.add_argument('-v', '--version', action='version', version=EarthDiags.version,
                            help="returns Earth Diagnostics's version number and exit")
        parser.add_argument('--doc', action='store_true',
                            help="opens documentation and exits")
        parser.add_argument('--clean', action='store_true',
                            help="clean the scratch folder and exits")
        parser.add_argument('--report', action='store_true',
                            help="generates a report about the available files")
        parser.add_argument('-lf', '--logfile', choices=log_levels,
                            default='DEBUG', type=str,
                            help="sets file's log level.")
        parser.add_argument('-lc', '--logconsole', choices=log_levels,
                            default='INFO', type=str,
                            help="sets console's log level")
        parser.add_argument('-log', '--logfilepath', default=None, type=str)
        parser.add_argument('-f', '--configfile',
                            default='diags.conf', type=str)
        args = parser.parse_args(args)

        # Opening the docs needs neither logging setup nor a configuration
        if args.doc:
            return EarthDiags.open_documentation()

        Log.set_console_level(args.logconsole)
        Log.set_file_level(args.logfile)
        if args.logfilepath:
            Log.set_file(bscearth.utils.path.expand_path(args.logfilepath))

        diags = EarthDiags()
        diags.read_config(args.configfile)
        # --clean takes precedence over --report; with neither, run the diagnostics
        if args.clean:
            return diags.clean()
        if args.report:
            return diags.report()
        return diags.run()
    @staticmethod
    def open_documentation():
        """
        Open Earthdiagnostics online documentation

        The URL is handed to ``xdg-open``, which opens it with the desktop's
        default handler.

        Returns
        -------
        bool:
            Always True once the command has been issued
        """
        Log.info('Opening documentation...')
        doc_url = 'http://earthdiagnostics.readthedocs.io/en/latest'
        Utils.execute_shell_command(('xdg-open', doc_url))
        Log.result('Documentation opened!')
        return True
    def _create_dic_variables(self):
        self.dic_variables = dict()
        self.dic_variables['x'] = 'i'
        self.dic_variables['y'] = 'j'
        self.dic_variables['z'] = 'lev'
        self.dic_variables['nav_lon'] = self.config.data_convention.lon_name
        self.dic_variables['nav_lat'] = self.config.data_convention.lat_name
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self.dic_variables['nav_lev'] = 'lev'
        self.dic_variables['time_counter'] = 'time'
        self.dic_variables['t'] = 'time'

        """
        Run the diagnostics
        start = datetime.now()
        self.had_errors = False
        Log.debug('Using netCDF version {0}', netCDF4.getlibversion())

        self._prepare_scratch_dir()
        self._prepare_mesh_files()
        self._initialize_basins()
        Log.info('Time to prepare: {}', datetime.now() - start)

Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self._prepare_data_manager()

        # Run diagnostics
        Log.info('Running diagnostics')
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        work_manager = WorkManager(self.config, self.data_manager)
        work_manager.prepare_job_list()

        result = work_manager.run()

        if self.config.auto_clean:
            self._remove_scratch_dir()
    def _initialize_basins(self):
        self._read_basins_from_file('basins.nc')
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    @staticmethod
    def _read_basins_from_file(filename):
        if not os.path.isfile(filename):
            return
        Basins().get_available_basins(iris.load_cube(filename))
    def _prepare_scratch_dir(self):
        if self.config.use_ramdisk:
            self._remove_scratch_dir()
            tempfile.mkdtemp(dir='/dev/shm')
            os.symlink(tempfile.mkdtemp(dir='/dev/shm'),
                       self.config.scratch_dir)
        else:
            if not os.path.exists(self.config.scratch_dir):
                os.makedirs(self.config.scratch_dir)
        os.chdir(self.config.scratch_dir)

    def _prepare_data_manager(self):
        if self.config.data_adaptor == 'CMOR':
            self.data_manager = CMORManager(self.config)
        elif self.config.data_adaptor == 'THREDDS':
            self.data_manager = THREDDSManager(self.config)
        elif self.config.data_adaptor == 'OBSRECON':
            self.data_manager = ObsReconManager(self.config)
        self.data_manager.prepare()

Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        """
        Clean scratch folder

        Returns
        -------
        bool

        """
        Log.info('Removing scratch folder...')
        self._remove_scratch_dir()
        Log.result('Scratch folder removed')
    def _remove_scratch_dir(self):
        if os.path.islink(self.config.scratch_dir):
            # time.sleep(4)
            # shutil.rmtree(os.path.realpath(self.config.scratch_dir))
            Utils.execute_shell_command(
                'rm -r {0}'.format(os.path.realpath(self.config.scratch_dir)))
            os.remove(self.config.scratch_dir)
        elif os.path.isdir(self.config.scratch_dir):
            # time.sleep(4)
            # shutil.rmtree(self.config.scratch_dir)
            Utils.execute_shell_command(
                'rm -r {0}'.format(self.config.scratch_dir))
    def report(self):
        """
        Create a report of missing variables for a given experiment

        For every startdate and member, the missing variables are written by
        ``_create_report`` under ``config.report.path``, or under the scratch
        folder when no report path is configured.

        Returns
        -------
        bool
            Always True
        """
        Log.info('Looking for existing vars...')
        self._prepare_data_manager()
        base_folder = self.config.report.path or self.config.scratch_dir
        Utils.create_folder_tree(base_folder)
        experiment = self.config.experiment
        for startdate in experiment.startdates:
            for member in experiment.members:
                results = self._get_variable_report(startdate, member)
                report_name = '{0}_{1}.report'.format(startdate, experiment.get_member_str(member))
                self._create_report(os.path.join(base_folder, report_name), results)

        Log.result('Report finished')
        return True

    def _get_variable_report(self, startdate, member):
        var_manager = self.config.var_manager
        results = list()
        for var in var_manager.get_all_variables():
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            if var.domain is None:
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            for table, priority in var.tables:
                if priority is None or priority > self.config.report.maximum_priority:
                    continue
                if not self.data_manager.file_exists(var.domain, var.short_name, startdate, member, 1,
                                                     frequency=table.frequency):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                    results.append((var, table, priority))
                    Log.debug(
                        'Variable {0.short_name} not found in {1.name}', var, table)
                    Log.result(
                        'Variable {0.short_name} found in {1.name}', var, table)
        return results
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    @staticmethod
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    def _create_report(report_path, results):
        tables = set([result[1].name for result in results])
        for table in tables:
            file_handler = open('{0}.{1}'.format(report_path, table), 'w')
            table_results = [
                result for result in results if result[1].name == table]
            file_handler.write('\nTable {0}\n'.format(table))
            file_handler.write('===================================\n')
            priorities = set([result[2] for result in table_results])
            priorities = sorted(priorities)
            for priority in priorities:
                priority_results = [result[0]
                                    for result in table_results if result[2] == priority]
                priority_results = sorted(
                    priority_results, key=lambda v: v.short_name)
                file_handler.write(
                    '\nMissing variables with priority {0}:\n'.format(priority))
                file_handler.write('--------------------------------------\n')
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                for var in priority_results:
                    file_handler.write('{0:12}: {1}\n'.format(
                        var.short_name, var.standard_name))
            file_handler.flush()
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            file_handler.close()
    def _prepare_mesh_files(self):
        """
        Make the mesh mask, new_maskglo and basins files available in the current folder.

        Each source path comes from the configuration when it is set there.
        Otherwise it is built from ``con_files`` and the model version. With
        ``scratch_masks`` the files are copied into that folder and linked
        here; otherwise they are copied directly. Nothing is done if no model
        version is defined.
        """
        model_version = self.config.experiment.model_version
        if not model_version:
            Log.info('No model version defined. Skipping mesh files copy!')
            return
        Log.info('Copying mesh files')
        con_files = self.config.con_files
        restore_meshes = self.config.restore_meshes
        mesh_mask = 'mesh_mask_nemo.{0}.nc'.format(model_version)
        new_mask_glo = 'new_maskglo.{0}.nc'.format(model_version)
        basins = 'basins.{0}.nc'.format(model_version)

        # Paths given explicitly in the configuration take precedence over con_files
        mesh_mask_path = self.config.mesh_mask or os.path.join(con_files, mesh_mask)
        new_mask_glo_path = self.config.new_mask_glo or os.path.join(con_files, new_mask_glo)
        basins_path = self.config.basins or os.path.join(con_files, basins)

        if self.config.scratch_masks:
            self._prepare_mesh_using_scratch(
                basins, basins_path,
                mesh_mask, mesh_mask_path,
                new_mask_glo, new_mask_glo_path,
                restore_meshes
            )
        else:
            # The single mesh mask file is exposed as mesh_hgr, mesh_zgr and mask
            self._copy_file(mesh_mask_path, 'mesh_hgr.nc', restore_meshes)
            self._link_file('mesh_hgr.nc', 'mesh_zgr.nc')
            self._link_file('mesh_hgr.nc', 'mask.nc')
            self._copy_file(new_mask_glo_path, 'new_maskglo.nc',
                            restore_meshes)
            self._copy_file(basins_path, 'basins.nc', restore_meshes)
        Log.result('Mesh files ready!')
    def _prepare_mesh_using_scratch(self, basins, basins_path,
                                    mesh_mask, mesh_mask_path,
                                    new_mask_glo, new_mask_glo_path,
                                    restore_meshes):
        """
        Copy the mesh files into ``config.scratch_masks`` and link them into the current folder.

        Parameters
        ----------
        basins, mesh_mask, new_mask_glo: str
            File names to use inside ``scratch_masks``
        basins_path, mesh_mask_path, new_mask_glo_path: str
            Source paths of those files
        restore_meshes: bool
            Copy again even if the file is already in ``scratch_masks``
        """
        scratch_masks = self.config.scratch_masks
        Utils.create_folder_tree(scratch_masks)
        Utils.give_group_write_permissions(scratch_masks)
        # (name inside scratch_masks, source path, links to create in the current folder)
        mesh_files = (
            (mesh_mask, mesh_mask_path, ('mesh_hgr.nc', 'mesh_zgr.nc', 'mask.nc')),
            (new_mask_glo, new_mask_glo_path, ('new_maskglo.nc',)),
            (basins, basins_path, ('basins.nc',)),
        )
        for file_name, source_path, link_names in mesh_files:
            scratch_path = os.path.join(scratch_masks, file_name)
            if not self._copy_file(source_path, scratch_path, restore_meshes):
                continue
            # Permissions go on the copy inside scratch_masks for every file,
            # basins included (not on the source file)
            Utils.give_group_write_permissions(scratch_path)
            for link_name in link_names:
                self._link_file(scratch_path, link_name)
    def _copy_file(self, source, destiny, force):
        """
        Copy a mesh file and rename its variables using ``dic_variables``.

        Parameters
        ----------
        source: str
            Path of the file to copy
        destiny: str
            Path of the copy
        force: bool
            Copy even if ``destiny`` already exists

        Returns
        -------
        bool
            True if ``destiny`` is available (already present or freshly
            copied), False if ``source`` does not exist
        """
        if not os.path.exists(source):
            Log.user_warning('File {0} is not available for {1}',
                             destiny, self.config.experiment.model_version)
            Log.debug('Looking for it in {0}', source)
            return False

        if not force and os.path.exists(destiny):
            # Existing files are always reused. The size-difference heuristic
            # that used to be here was short-circuited by 'or True' (and could
            # divide by zero for an empty source), so it has been dropped.
            Log.info('File {0} already exists', destiny)
            return True

        Log.info('Copying file {0}', destiny)
        shutil.copyfile(source, destiny)
        Log.info('File {0} ready', destiny)
        Utils.rename_variables(destiny, self.dic_variables, False)
        return True
    def _link_file(self, source, destiny):
        """
        Create (or replace) the symlink ``destiny`` pointing to ``source``.

        Nothing is done if ``source`` does not exist. An existing ``destiny``
        is removed first. If that removal fails with a permission error, the
        existing entry is kept as it is.

        Raises
        ------
        OSError
            If removing an existing ``destiny`` fails for any other reason,
            or if the symlink cannot be created
        """
        if not os.path.exists(source):
            Log.user_warning('File {0} is not available for {1}',
                             destiny, self.config.experiment.model_version)
            # Do not create a dangling link
            return

        if os.path.lexists(destiny):
            try:
                os.remove(destiny)
            except OSError as ex:
                if ex.errno != errno.EACCES:
                    raise
                # Permission denied: presumably the link already exists and
                # belongs to someone else — keep it
                Log.info('Link already created')
                return

        os.symlink(source, destiny)
        Log.info('File {0} ready', destiny)

def main():
    """Main for earthdiagnostics"""
    # Exit with a non-zero status when the selected action reports failure
    if not EarthDiags.parse_args(sys.argv[1:]):
        sys.exit(1)


if __name__ == "__main__":
    main()