diags.py 14.5 KB
Newer Older
import argparse
import shutil
import os
from autosubmit.date.chunk_date_lib import *

from datamanager import DataManager
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
from earthdiagnostics import cdftools
from earthdiagnostics.utils import TempFile
from earthdiagnostics.diagnostic import Diagnostic
from earthdiagnostics.ocean import *
from ocean import ConvectionSites, Gyres, Psi, MaxMoc, AreaMoc, Moc, VerticalMean, VerticalMeanMeters, Interpolate, \
    AverageSection, CutSection, MixedLayerSaltContent, Siasiesiv
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
from parser import Parser
from utils import Utils
    """
    Launcher class for the diagnostics

    :param config_file: path to the configuration file
    :type config_file: str
    def __init__(self, config_file):
        Log.debug('Initialising Diags')
        self._read_config(config_file)
        TempFile.scratch_folder = self.scratch_dir
        cdftools.path = self.cdftools_path
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self._create_dic_variables()
        Log.debug('Diags ready')
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    def _create_dic_variables(self):
        self.dic_variables = dict()
        self.dic_variables['x'] = 'i'
        self.dic_variables['y'] = 'j'
        self.dic_variables['z'] = 'lev'
        self.dic_variables['nav_lon'] = 'lon'
        self.dic_variables['nav_lat'] = 'lat'
        self.dic_variables['nav_lev'] = 'lev'
        self.dic_variables['time_counter'] = 'time'
        self.dic_variables['t'] = 'time'

        """
        Run the diagnostics
        """
        Log.debug('Using netCDF version {0}', netCDF4.getlibversion())
        time = datetime.datetime.now()
        Log.info("Starting diagnostics at {0}", time)
        if not os.path.exists(self.scratch_dir):
            os.makedirs(self.scratch_dir)
        os.chdir(self.scratch_dir)

        self._prepare_mesh_files()
        Diagnostic.register(MixedLayerSaltContent, 'mlotstsc')
        Diagnostic.register(Siasiesiv, 'siasiesiv')
        Diagnostic.register(VerticalMean, 'vertmean')
        Diagnostic.register(VerticalMeanMeters, 'vertmeanmeters')
        Diagnostic.register(Interpolate, 'interp')
        Diagnostic.register(Moc, 'moc')
        Diagnostic.register(AreaMoc, 'mocarea')
        Diagnostic.register(MaxMoc, 'mocmax')
        Diagnostic.register(Psi, 'psi')
        Diagnostic.register(Gyres, 'gyres')
        Diagnostic.register(ConvectionSites, 'convection')
        Diagnostic.register(CutSection, 'cutsection')
        Diagnostic.register(AverageSection, 'avgsection')
        parse_date('20000101')

        self.data_manager.prepare_CMOR_files(self.startdates, self.members, self.force_CMOR)

        # Run diagnostics
        Log.info('Running diagnostics')
        list_jobs = Queue.Queue()
        for fulldiag in self._get_commands():
            Log.info("Running {0}", fulldiag)
            diag_class = Diagnostic.get_diagnostic(diag_options[0])
            if diag_class:
                for job in diag_class.generate_jobs(self, diag_options):
                    list_jobs.put(job)
            else:
                for startdate in self.startdates:
                    for member in self.members:
                        for chunk in range(1, self.chunks+1):
                            self._execute_diagnostic(diag_options, startdate, member, chunk)
            Log.result('Finished {0}', fulldiag)
        numthreads = min(Utils.available_cpu_count(), self.max_cores)
        threads = list()
        for numthread in range(0, numthreads):
            t = threading.Thread(target=Diags._run_jobs, args=(list_jobs, numthread))
        list_jobs.join()
        TempFile.clean()
        finsih_time = datetime.datetime.now()
        Log.result("Diagnostics finished at {0}", finsih_time)
        Log.result("Time ellapsed: {0}", finsih_time - time)

    def _run_jobs(queue, numthread):
        def _run_job(current_job, retrials=1):
            while retrials > 0:
                try:
                    current_job.compute()
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                    Log.result('Finished {0}', current_job)
                    return True
                except Exception as ex:
                    retrials -= 1
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                    Log.error('Job {0} failed: {1}', job, ex)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        failed_jobs = list()

        while not queue.empty():
            try:
                job = queue.get(timeout=1)
                if _run_job(job):
                    count += 1
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                else:
                    failed_jobs.append(str(job))
                queue.task_done()
            except Queue.Empty:
                continue
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        if len(failed_jobs) == 0:
            Log.result('Thread {0} finished after taking care of {1} tasks', numthread, count)
        else:
            Log.result('Thread {0} finished after running successfully {1} of {2} tasks', numthread, count,
                       count + len(failed_jobs))
            for job in failed_jobs:
                Log.error('Job {0} could not be run', job)
    def _execute_diagnostic(self, diag_options, startdate, member, chunk):
        diag = diag_options[0]
            basin = diag_options[1]
            mixed_layer = int(diag_options[2])
            depth_min = int(diag_options[3])
            depth_max = int(diag_options[4])

            if mixed_layer == 1:
                mxl = 'mlotst'
                depth = ''
            elif mixed_layer == 0:
                mxl = ''
                depth = '{0}-{1}'.format(depth_min, depth_max)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            else:
                mxl = 'nomlotst'
                depth = ''
            variables = ('thetao', 'mlotst', 'ohcsum{0}{1}', 'ohcvmean{0}{1}'.format(mxl, depth))
            for [input_file, mlotst_file, ohcsum_file, ohcvmean_file] in self.data_manager.get_files(startdate, member,
                                                                                                     chunk, 'ocean',
                                                                                                     variables):
                Heat.total(input_file, mlotst_file, ohcsum_file, ohcvmean_file, basin, mixed_layer,
                           depth_min, depth_max)
        elif diag == 'ohclayer':
            depth_min = int(diag_options[1])
            depth_max = int(diag_options[2])

            depth = '{0}-{1}'.format(depth_min, depth_max)
            variables = ('thetao', 'ohc{0}'.format(depth))
            for [input_file, output_file] in self.data_manager.get_files(startdate, member, chunk, 'ocean', variables):
                Heat.layer(input_file, output_file, depth_min, depth_max)
        elif diag == 'mlotsthc':
            variables = ('thetao', 'mlotst', 'ohcvertsummlotst')
            for [input_file, mlotst_file, output_file] in self.data_manager.get_files(startdate, member, chunk, 'ocean',
                                                                                      variables):
                Heat.mixed_layer_content(input_file, mlotst_file, output_file)
        else:
            Log.warning('Diagnostic {0} not available', diag)
            return
    def _get_commands(self):
        Log.debug('Preparing command list')
        commands = self.diags.split()

        for alias, added_commands in self._aliases.items():
            if alias in commands:
                Log.info('Changing alias {0} for {1}', alias, ' '.join(added_commands))
                commands.remove(alias)
                for add_command in added_commands:
                    commands.append(add_command)
        Log.debug('Command list ready ')
        return commands

    def _prepare_mesh_files(self):
        Log.info('Copying mesh files')
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self._copy_file(os.path.join(self.con_files, 'mesh_mask_nemo.{0}.nc'.format(self.nemo_version)), 'mesh_hgr.nc')
        self._link_file('mesh_hgr.nc', 'mesh_zgr.nc')
        self._link_file('mesh_hgr.nc', 'mask.nc')
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self._copy_file(os.path.join(self.con_files, 'new_maskglo.{0}.nc'.format(self.nemo_version)), 'new_maskglo.nc')
        self._copy_file(os.path.join(self.con_files, 'mask.regions.{0}.nc'.format(self.nemo_version)),
                        'mask_regions.nc')
        self._copy_file(os.path.join(self.con_files, 'mask.regions.3d.{0}.nc'.format(self.nemo_version)),
                        'mask_regions.3d.nc')
        Log.result('Mesh files ready!')
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    def _copy_file(self, source, destiny):
        if not os.path.exists(source):
            Log.user_warning('File {0} is not available for {1}', destiny, self.nemo_version)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

        if os.path.exists(destiny):
            if os.stat(source).st_size == os.stat(destiny).st_size:
                Log.info('File {0} already exists', destiny)
                return

        Log.info('Creating file {0}', destiny)
        shutil.copy(source, destiny)
        Log.info('File {0} ready', destiny)
        Utils.rename_variables('mesh_hgr.nc', self.dic_variables, False, True)

    def _link_file(self, source, destiny):
        if not os.path.exists(source):
            Log.user_warning('File {0} is not available for {1}', destiny, self.nemo_version)

        if os.path.exists(destiny):
            if os.stat(source).st_size == os.stat(destiny).st_size:
                Log.info('File {0} already exists', destiny)
                return
            else:
                os.remove(destiny)

        os.symlink(source, destiny)
        Log.info('File {0} ready', destiny)

    def _read_config(self, config_file):
        self.parser = Parser()
        self.parser.optionxform = str
        self.parser.read(config_file)

        # Read diags config
        self.scratch_dir = self.parser.get_option('DIAGNOSTICS', 'SCRATCH_DIR')
        self.data_dir = self.parser.get_option('DIAGNOSTICS', 'DATA_DIR')
        self.con_files = self.parser.get_option('DIAGNOSTICS', 'CON_FILES')
        self.diags = self.parser.get_option('DIAGNOSTICS', 'DIAGS').lower()
        self.frequency = self.parser.get_option('DIAGNOSTICS', 'FREQUENCY')
        self.cdftools_path = self.parser.get_option('DIAGNOSTICS', 'CDFTOOLS_PATH')
        self.max_cores = self.parser.get_int_option('DIAGNOSTICS', 'MAX_CORES', 100000)

        # Read experiment config
        self.institute = self.parser.get_option('EXPERIMENT', 'INSTITUTE')
        self.expid = self.parser.get_option('EXPERIMENT', 'EXPID')
        self.experiment_name = self.parser.get_option('EXPERIMENT', 'NAME', self.expid)

        self.members = list()
        for member in self.parser.get_option('EXPERIMENT', 'MEMBERS').split():
            self.members.append(int(member))

        self.member_digits = self.parser.get_int_option('EXPERIMENT', 'MEMBER_DIGITS', 1)
        self.startdates = self.parser.get_option('EXPERIMENT', 'STARTDATES').split()
        self.chunk_size = self.parser.get_int_option('EXPERIMENT', 'CHUNK_SIZE')
        self.chunks = self.parser.get_int_option('EXPERIMENT', 'CHUNKS')
        self.calendar = self.parser.get_option('EXPERIMENT', 'CALENDAR', 'standard')
        self.model = self.parser.get_option('EXPERIMENT', 'MODEL')
        self.nfrp = self.parser.get_int_option('EXPERIMENT', 'NFRP')
        self.nemo_version = self.parser.get_option('EXPERIMENT', 'NEMO_VERSION')

        self.add_name = self.parser.get_bool_option('CMOR', 'ADD_NAME')
        self.add_startdate = self.parser.get_bool_option('CMOR', 'ADD_STARTDATE')

        # Read aliases
        self._aliases = dict()
        if self.parser.has_section('ALIAS'):
            for option in self.parser.options('ALIAS'):
                self._aliases[option.lower()] = self.parser.get_option('ALIAS', option).lower().split()

        self.scratch_dir = os.path.join(self.scratch_dir, 'diags', self.expid)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        if not os.path.exists(self.scratch_dir):
            os.makedirs(self.scratch_dir)
        os.chdir(self.scratch_dir)
        self.data_manager = DataManager(self.institute, self.model, self.expid, self.data_dir,
                                        self.frequency, self.chunk_size, self.experiment_name, self.chunks,
                                        self.scratch_dir, self.nfrp, self.member_digits,
                                        self.calendar)

        self.data_manager.add_startdate = self.add_startdate
        self.data_manager.add_name = self.add_name

        self.force_CMOR = self.parser.get_bool_option('CMOR', 'FORCE', False)
        self.data_manager.associated_experiment = self.parser.get_option('CMOR', 'ASSOCIATED_EXPERIMENT', 'to be filled')
        self.data_manager.associated_model = self.parser.get_option('CMOR', 'ASSOCIATED_MODEL', 'to be filled')
        self.data_manager.initialization_description = self.parser.get_option('CMOR', 'INITIALIZATION_DESCRIPTION',
                                                                              'to be filled')
        self.data_manager.initialization_method = self.parser.get_option('CMOR', 'INITIALIZATION_METHOD',
                                                                         'to be filled')
        self.data_manager.physics_description = self.parser.get_option('CMOR', 'PHYSICS_DESCRIPTION', 'to be filled')
        self.data_manager.physics_version = self.parser.get_option('CMOR', 'PHYSICS_VERSION', 'to be filled')
        self.data_manager.source = self.parser.get_option('CMOR', 'SOURCE', 'to be filled')
    parser = argparse.ArgumentParser(description='Main executable for Earth Diagnostics.')
    parser.add_argument('-v', '--version', action='version', version='0.1',
                        help="returns Earth Diagnostics's version number and exit")
    parser.add_argument('-lf', '--logfile', choices=('EVERYTHING', 'DEBUG', 'INFO', 'RESULT', 'USER_WARNING',
                                                     'WARNING', 'ERROR', 'CRITICAL', 'NO_LOG'),
                        default='DEBUG', type=str,
                        help="sets file's log level.")
    parser.add_argument('-lc', '--logconsole', choices=('EVERYTHING', 'DEBUG', 'INFO', 'RESULT', 'USER_WARNING',
                                                        'WARNING', 'ERROR', 'CRITICAL', 'NO_LOG'),
                        default='INFO', type=str,
                        help="sets console's log level")

Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    parser.add_argument('-log', '--logfilepath', default=None, type=str)

    parser.add_argument('-f', '--configfile', default='diags.conf', type=str)

    args = parser.parse_args()
    Log.set_console_level(args.logconsole)
    Log.set_file_level(args.logfile)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    if args.logfilepath:
        Log.set_file(args.logfilepath)

    diags = Diags(args.configfile)


if __name__ == "__main__":
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    main()