earthdiags.py 19.8 KB
Newer Older
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
# coding=utf-8
import argparse
import shutil
from bscearth.utils.date import *
import bscearth.utils.path
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
from earthdiagnostics.config import Config
from earthdiagnostics.cmormanager import CMORManager
from earthdiagnostics.threddsmanager import THREDDSManager
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
from earthdiagnostics import cdftools
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
from earthdiagnostics.utils import TempFile, Utils
from earthdiagnostics.diagnostic import Diagnostic
from earthdiagnostics.ocean import *
from earthdiagnostics.general import *
from earthdiagnostics.statistics import *
from earthdiagnostics.variable import VariableManager
from earthdiagnostics.diagnostic import DiagnosticOptionError
import tempfile
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
class EarthDiags(object):
    """
    Launcher class for the diagnostics

    :param config_file: path to the configuration file
    :type config_file: str
    """
    # Get the version number from the VERSION file if it is present (next to this module or in its parent
    # folder); otherwise ask the installed earthdiagnostics package for it
    scriptdir = os.path.abspath(os.path.dirname(__file__))
    if not os.path.exists(os.path.join(scriptdir, 'VERSION')):
        scriptdir = os.path.join(scriptdir, os.path.pardir)

    version_path = os.path.join(scriptdir, 'VERSION')
    readme_path = os.path.join(scriptdir, 'README')
    changes_path = os.path.join(scriptdir, 'CHANGELOG')
    documentation_path = os.path.join(scriptdir, 'EarthDiagnostics.pdf')
    if os.path.isfile(version_path):
        with open(version_path) as f:
            # NOTE(review): the body of this 'with' was missing from this copy; reading the stripped file
            # contents is the evident intent (EarthDiags.version is printed and passed to argparse) — confirm
            version = f.read().strip()
    else:
        version = pkg_resources.require("earthdiagnostics")[0].version

    def __init__(self, config_file):
        """
        Load the configuration and prepare the global helpers used by the diagnostics

        :param config_file: path to the configuration file
        :type config_file: str
        """
        Log.info('Initialising Earth Diagnostics Version {0}', EarthDiags.version)
        # NOTE(review): this assignment was missing from this copy although config_file was otherwise unused and
        # self.config is read just below; assumes Config parses the file given to its constructor — confirm
        self.config = Config(config_file)

        TempFile.scratch_folder = self.config.scratch_dir
        cdftools.path = self.config.cdftools_path

        self._create_dic_variables()
        # Per-thread timing tables and failed job descriptions; filled by run/_run_jobs and read by
        # print_stats/print_errors
        self.time = dict()
        self._failed_jobs = list()
        self.data_manager = None
        self.threads = None
        self.had_errors = False
        Log.debug('Diags ready')
        Log.info('Running diags for experiment {0}, startdates {1}, members {2}', self.config.experiment.expid,
                 self.config.experiment.startdates, self.config.experiment.members)

    @staticmethod
    def parse_args():
        """
        Entry point for the Earth Diagnostics. For more detailed documentation, use -h option

        Parses the command line and configures logging. Depending on the flags, it then opens the documentation,
        cleans the scratch folder, writes the availability report or runs the diagnostics.

        :return: True if the requested action succeeded, False otherwise
        :rtype: bool
        """
        parser = argparse.ArgumentParser(description='Main executable for Earth Diagnostics.')
        parser.add_argument('-v', '--version', action='version', version=EarthDiags.version,
                            help="returns Earth Diagnostics's version number and exit")
        parser.add_argument('--doc', action='store_true',
                            help="opens documentation and exits")
        parser.add_argument('--clean', action='store_true',
                            help="clean the scratch folder and exits")
        parser.add_argument('--report', action='store_true',
                            help="generates a report about the available files")
        parser.add_argument('-lf', '--logfile', choices=('EVERYTHING', 'DEBUG', 'INFO', 'RESULT', 'USER_WARNING',
                                                         'WARNING', 'ERROR', 'CRITICAL', 'NO_LOG'),
                            default='DEBUG', type=str,
                            help="sets file's log level.")
        parser.add_argument('-lc', '--logconsole', choices=('EVERYTHING', 'DEBUG', 'INFO', 'RESULT', 'USER_WARNING',
                                                            'WARNING', 'ERROR', 'CRITICAL', 'NO_LOG'),
                            default='INFO', type=str,
                            help="sets console's log level")

        parser.add_argument('-log', '--logfilepath', default=None, type=str)

        parser.add_argument('-f', '--configfile', default='diags.conf', type=str)

        args = parser.parse_args()
        if args.doc:
            Log.info('Opening documentation...')
            doc_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'EarthDiagnostics.pdf')
            Utils.execute_shell_command(('xdg-open', doc_path))
            Log.result('Documentation opened!')
            return True
        Log.set_console_level(args.logconsole)
        Log.set_file_level(args.logfile)

        if Log.console_handler.level <= Log.DEBUG:
            Utils.cdo.debug = True
            Utils.nco.debug = False  # This is due to a bug in nco. Must change when it's solved

        # Only log to a file when one was requested; expand_path must not receive the None default
        if args.logfilepath:
            Log.set_file(bscearth.utils.path.expand_path(args.logfilepath))

        config_file_path = bscearth.utils.path.expand_path(args.configfile)
        if not os.path.isfile(config_file_path):
            Log.critical('Configuration file {0} can not be found', config_file_path)
            return False

        diags = EarthDiags(config_file_path)
        if args.clean:
            result = diags.clean()
        elif args.report:
            result = diags.report()
        else:
            result = diags.run()
        return result

    def _create_dic_variables(self):
        self.dic_variables = dict()
        self.dic_variables['x'] = 'i'
        self.dic_variables['y'] = 'j'
        self.dic_variables['z'] = 'lev'
        self.dic_variables['nav_lon'] = 'lon'
        self.dic_variables['nav_lat'] = 'lat'
        self.dic_variables['nav_lev'] = 'lev'
        self.dic_variables['time_counter'] = 'time'
        self.dic_variables['t'] = 'time'

        """
        Run the diagnostics
        """
        self.had_errors = False
        Log.debug('Using netCDF version {0}', netCDF4.getlibversion())

        self._prepare_scratch_dir()
        self._prepare_mesh_files()
        self._register_diagnostics()
        self._prepare_data_manager()

        # Run diagnostics
        Log.info('Running diagnostics')
        list_jobs = self.prepare_job_list()
        time = datetime.datetime.now()
        Log.info("Starting to compute at {0}", time)
        self.threads = Utils.available_cpu_count()
        if 0 < self.config.max_cores < self.threads:
            self.threads = self.config.max_cores
        Log.info('Using {0} threads', self.threads)
        for num_thread in range(0, self.threads):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            t = threading.Thread(target=EarthDiags._run_jobs, args=(self, list_jobs, num_thread))
        for t in threads:
            t.join()
        TempFile.clean()
        finish_time = datetime.datetime.now()
        Log.result("Diagnostics finished at {0}", finish_time)
        Log.result("Elapsed time: {0}\n", finish_time - time)
        self.print_stats()
        if self.config.auto_clean:
            self._remove_scratch_dir()
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        return not self.had_errors
    def _prepare_scratch_dir(self):
        if self.config.use_ramdisk:
            self._remove_scratch_dir()
            tempfile.mkdtemp(dir='/dev/shm')
            os.symlink(tempfile.mkdtemp(dir='/dev/shm'), self.config.scratch_dir)
        else:
            if not os.path.exists(self.config.scratch_dir):
                os.makedirs(self.config.scratch_dir)
        os.chdir(self.config.scratch_dir)

    def _prepare_data_manager(self):
        if self.config.data_adaptor == 'CMOR':
            self.data_manager = CMORManager(self.config)
        elif self.config.data_adaptor == 'THREDDS':
            self.data_manager = THREDDSManager(self.config)
        self.data_manager.prepare()

    def print_stats(self):
        """
        Log the total time consumed by each diagnostic class, summed over all worker threads

        Classes are listed in ascending order of total time.
        """
        Log.info('Time consumed by each diagnostic class')
        Log.info('--------------------------------------')
        total = dict()
        for num_thread in range(0, self.threads):
            for diag_class, elapsed in self.time[num_thread].items():
                # Values are durations, so accumulate per key instead of summing from 0
                if diag_class in total:
                    total[diag_class] += elapsed
                else:
                    total[diag_class] = elapsed
        for diag_class, elapsed in sorted(total.items(), key=operator.itemgetter(1)):
            Log.info('{0:23} {1:}', diag_class.__name__, elapsed)

    def print_errors(self):
        if len(self._failed_jobs) == 0:
            return
        self.had_errors = True
        Log.error('Failed jobs')
        Log.error('-----------')
        for job in self._failed_jobs:
            Log.error(str(job))
        Log.info('')

    def prepare_job_list(self):
        """
        Build the queue of jobs for every diagnostic command in the configuration

        Each command is a comma-separated string whose first field names the diagnostic. Unknown diagnostics
        and diagnostics whose options cannot be parsed are logged and set ``self.had_errors``.

        :return: queue with all the generated jobs
        :rtype: Queue.Queue
        """
        job_queue = Queue.Queue()
        for command in self.config.get_commands():
            Log.info("Adding {0} to diagnostic list", command)
            options = command.split(',')
            diagnostic_name = options[0]

            diagnostic_class = Diagnostic.get_diagnostic(diagnostic_name)
            if not diagnostic_class:
                Log.error('{0} is not an available diagnostic', diagnostic_name)
                self.had_errors = True
                continue

            try:
                for new_job in diagnostic_class.generate_jobs(self, options):
                    job_queue.put(new_job)
            except DiagnosticOptionError as error:
                Log.error('Can not configure diagnostic {0}: {1}', diagnostic_name, error)
                self.had_errors = True
        return job_queue

    @staticmethod
    def _register_diagnostics():
        """Register the ocean, general and statistics diagnostics, in that order."""
        registrars = (EarthDiags._register_ocean_diagnostics,
                      EarthDiags._register_general_diagnostics,
                      EarthDiags._register_stats_diagnostics)
        for register_family in registrars:
            register_family()

    @staticmethod
    def _register_stats_diagnostics():
        """Register the statistics diagnostics with the Diagnostic registry."""
        for diagnostic_class in (MonthlyPercentile, ClimatologicalPercentile):
            Diagnostic.register(diagnostic_class)

    @staticmethod
    def _register_general_diagnostics():
        """Register the general-purpose diagnostics with the Diagnostic registry."""
        general = (DailyMean, MonthlyMean, YearlyMean, SimplifyDimensions, Relink, RelinkAll,
                   Scale, Attribute, SelectLevels, Module)
        for diagnostic_class in general:
            Diagnostic.register(diagnostic_class)

    @staticmethod
    def _register_ocean_diagnostics():
        """Register the ocean diagnostics with the Diagnostic registry."""
        ocean = (MixedLayerSaltContent, Siasiesiv, VerticalMean, VerticalMeanMeters, Interpolate,
                 InterpolateCDO, Moc, AreaMoc, MaxMoc, Psi, Gyres, ConvectionSites, CutSection,
                 AverageSection, MixedLayerHeatContent, HeatContentLayer, HeatContent, RegionMean,
                 Rotation, Mxl, VerticalGradient, MaskLand)
        for diagnostic_class in ocean:
            Diagnostic.register(diagnostic_class)

    def clean(self):
        """
        Remove the scratch folder

        :return: True, so that the command line entry point reports success (consistent with report)
        :rtype: bool
        """
        Log.info('Removing scratch folder...')
        self._remove_scratch_dir()
        Log.result('Scratch folder removed')
        return True

    def _remove_scratch_dir(self):
        if os.path.islink(self.config.scratch_dir):
            shutil.rmtree(os.path.realpath(self.config.scratch_dir))
            os.remove(self.config.scratch_dir)
        elif os.path.isdir(self.config.scratch_dir):
            shutil.rmtree(self.config.scratch_dir)

    def report(self):
        """
        Write, for every startdate and member, a report of the variables that are not available

        Reports are written in the scratch folder, with ``<startdate>_<member>.report`` as base name.

        :return: always True
        :rtype: bool
        """
        Log.info('Looking for existing vars...')
        self._prepare_data_manager()
        # The destination folder does not depend on startdate or member: create it once
        Utils.create_folder_tree(self.config.scratch_dir)
        for startdate in self.config.experiment.startdates:
            for member in self.config.experiment.members:
                results = self._get_variable_report(startdate, member)
                member_str = self.config.experiment.get_member_str(member)
                report_path = os.path.join(self.config.scratch_dir,
                                           '{0}_{1}.report'.format(startdate, member_str))
                self.create_report(report_path, results)

        Log.result('Report finished')
        return True

    def _get_variable_report(self, startdate, member):
        """
        List the variables that are missing for a given startdate and member

        Variables without priority or domain are ignored. A (variable, table) pair is reported when the data
        manager says that no file exists for that variable at the table's frequency (chunk 1).

        :return: list of (variable, table) tuples
        :rtype: list
        """
        manager = VariableManager()
        missing = []
        for variable in manager.get_all_variables():
            if variable.priority is not None and variable.domain is not None:
                missing.extend((variable, var_table) for var_table in variable.tables
                               if not self.data_manager.file_exists(variable.domain, variable.short_name,
                                                                    startdate, member, 1,
                                                                    frequency=var_table.frequency))
        return missing

    def create_report(self, report_path, results):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        realms = set([result[0].domain for result in results])
        realms = sorted(realms)
        for realm in realms:
            file_handler = open('{0}.{1}'.format(report_path, realm), 'w')
            realm_results = [result for result in results if result[0].domain == realm]

            tables = set([result[1].name for result in realm_results])
            tables = sorted(tables)
            for table in tables:
                table_results = [result for result in realm_results if result[1].name == table]

                file_handler.write('\nTable {0}\n'.format(table))
                file_handler.write('===================================\n')

Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                priorities = set([int(result[0].priority) for result in table_results])
                priorities = sorted(priorities)
                for priority in priorities:
                    priority_results = [result for result in table_results if int(result[0].priority) == priority]
                    priority_results = sorted(priority_results, key=lambda result: result[0].short_name)
                    file_handler.write('\nMissing variables with priority {0}:\n'.format(priority))
                    file_handler.write('--------------------------------------\n')

                    for var, table in priority_results:
                        file_handler.write('{0:12}: {1}\n'.format(var.short_name, var.standard_name))
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            file_handler.close()
        def _run_job(current_job, retrials=1):
                    Log.info('Starting {0}', current_job)
                    current_job.compute()
                    time = datetime.datetime.now() - time
                    if type(current_job) in self.time[numthread]:
                        self.time[numthread][type(current_job)] += time
                    else:
                        self.time[numthread][type(current_job)] = time
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                    Log.result('Finished {0}', current_job)
                    return True
                except Exception as ex:
                    retrials -= 1
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                    Log.error('Job {0} failed: {1}', job, ex)
            return False
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        failed_jobs = list()

        while not queue.empty():
            try:
                job = queue.get(timeout=1)
                if _run_job(job):
                    count += 1
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                else:
                    failed_jobs.append(str(job))
                queue.task_done()
            except Queue.Empty:
                continue
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        if len(failed_jobs) == 0:
            Log.result('Thread {0} finished after taking care of {1} tasks', numthread, count)
        else:
            Log.result('Thread {0} finished after running successfully {1} of {2} tasks', numthread, count,
                       count + len(failed_jobs))
            self._failed_jobs += failed_jobs
    def _prepare_mesh_files(self):
        """
        Make the NEMO mesh and mask files for the experiment's model version available in the working directory

        Files are taken from ``con_files``. When ``scratch_masks`` is set, they are copied there (made group
        writable) and symlinked into the working directory. Otherwise they are copied directly into it.
        ``restore_meshes`` is passed to _copy_file as its ``force`` flag.
        """
        Log.info('Copying mesh files')
        con_files = self.config.con_files
        model_version = self.config.experiment.model_version
        restore_meshes = self.config.restore_meshes
        mesh_mask = 'mesh_mask_nemo.{0}.nc'.format(model_version)
        new_mask_glo = 'new_maskglo.{0}.nc'.format(model_version)
        mask_regions = 'mask.regions.{0}.nc'.format(model_version)
        mask_regions_3d = 'mask.regions.3d.{0}.nc'.format(model_version)

        if self.config.scratch_masks:
            Utils.create_folder_tree(self.config.scratch_masks)
            Utils.give_group_write_permissions(self.config.scratch_masks)
            # (file in con_files, names under which it is linked into the working directory)
            mask_links = ((mesh_mask, ('mesh_hgr.nc', 'mesh_zgr.nc', 'mask.nc')),
                          (new_mask_glo, ('new_maskglo.nc',)),
                          (mask_regions, ('mask_regions.nc',)),
                          (mask_regions_3d, ('mask_regions.3d.nc',)))
            for mask_file, link_names in mask_links:
                scratch_path = os.path.join(self.config.scratch_masks, mask_file)
                if self._copy_file(os.path.join(con_files, mask_file), scratch_path, restore_meshes):
                    Utils.give_group_write_permissions(scratch_path)
                    for link_name in link_names:
                        self._link_file(scratch_path, link_name)
        else:
            self._copy_file(os.path.join(con_files, mesh_mask), 'mesh_hgr.nc', restore_meshes)
            self._link_file('mesh_hgr.nc', 'mesh_zgr.nc')
            self._link_file('mesh_hgr.nc', 'mask.nc')
            self._copy_file(os.path.join(con_files, new_mask_glo), 'new_maskglo.nc',
                            restore_meshes)
            self._copy_file(os.path.join(con_files, mask_regions),
                            'mask_regions.nc', restore_meshes)
            self._copy_file(os.path.join(con_files, mask_regions_3d),
                            'mask_regions.3d.nc', restore_meshes)
        Log.result('Mesh files ready!')

    def _copy_file(self, source, destiny, force):
        """
        Copy a mesh/mask file into place and rename its variables with ``self.dic_variables``

        :param source: file to copy
        :type source: str
        :param destiny: destination path
        :type destiny: str
        :param force: copy even if destiny already exists with the same size as source
        :type force: bool
        :return: True if destiny is available after the call, False if source does not exist
        :rtype: bool
        """
        if not os.path.exists(source):
            Log.user_warning('File {0} is not available for {1}', destiny, self.config.experiment.model_version)
            return False

        if not force and os.path.exists(destiny):
            # NOTE(review): rename_variables modifies the copy after it is made; if that changes the file size,
            # this check never matches and the file is copied again on every run — confirm
            if os.stat(source).st_size == os.stat(destiny).st_size:
                Log.info('File {0} already exists', destiny)
                return True

        Log.info('Creating file {0}', destiny)
        shutil.copy(source, destiny)
        Log.info('File {0} ready', destiny)
        Utils.rename_variables(destiny, self.dic_variables, False, True)
        return True

    def _link_file(self, source, destiny):
        if not os.path.exists(source):
            Log.user_warning('File {0} is not available for {1}', destiny, self.config.experiment.model_version)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

        if os.path.lexists(destiny):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            try:
                os.remove(destiny)
            except OSError:
                pass
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

        os.symlink(source, destiny)
        Log.info('File {0} ready', destiny)

    if not EarthDiags.parse_args():
        exit(1)


if __name__ == "__main__":
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    main()