#!/usr/bin/env python
import Queue
import argparse
import shutil
import threading

import netCDF4
import os
from autosubmit.date.chunk_date_lib import *

from datamanager import DataManager
from earthdiagnostics import cdftools
from earthdiagnostics.utils import TempFile
from earthdiagnostics.diagnostic import Diagnostic
from earthdiagnostics.ocean import *
from ocean import ConvectionSites, Gyres, Psi, MaxMoc, AreaMoc, Moc, VerticalMean, VerticalMeanMeters, Interpolate, \
    AverageSection, CutSection, MixedLayerSaltContent, Siasiesiv
from parser import Parser
from utils import Utils


class Diags:
    """
    Launcher class for the diagnostics

    :param config_file: path to the configuration file
    :type config_file: str
    """

    def __init__(self, config_file):
        Log.debug('Initialising Diags')
        self._read_config(config_file)

        self.data_manager = DataManager(self.institute, self.model, self.expid, self.data_dir, self.frequency,
                                        self.chunk_size, self.experiment_name, self.chunks, self.scratch_dir,
                                        self.nfrp, self.calendar)
        self.data_manager.add_startdate = self.add_startdate
        self.data_manager.add_name = self.add_name

        # Module/class level settings shared by the rest of the package
        TempFile.scratch_folder = self.scratch_dir
        cdftools.path = self.cdftools_path
        self._create_dic_variables()
        Log.debug('Diags ready')

    def _create_dic_variables(self):
        """
        Create the name mapping that is passed to Utils.rename_variables when a mesh file is copied
        """
        self.dic_variables = {
            'x': 'i',
            'y': 'j',
            'z': 'lev',
            'nav_lon': 'lon',
            'nav_lat': 'lat',
            'nav_lev': 'lev',
            'time_counter': 'time',
            't': 'time',
        }

    def run(self):
        """
        Run the diagnostics

        Prepares the scratch folder and the mesh files, registers the diagnostic classes, queues the jobs they
        generate and computes them with a pool of worker threads. Diagnostics that are not registered are executed
        directly through _execute_diagnostic.
        """
        Log.debug('Using netCDF version {0}', netCDF4.getlibversion())
        start_time = datetime.datetime.now()
        Log.info("Starting diagnostics at {0}", start_time)

        if not os.path.exists(self.scratch_dir):
            os.makedirs(self.scratch_dir)
        os.chdir(self.scratch_dir)

        self._prepare_mesh_files()

        Diagnostic.register(MixedLayerSaltContent, 'mlotstsc')
        Diagnostic.register(Siasiesiv, 'siasiesiv')
        Diagnostic.register(VerticalMean, 'vertmean')
        Diagnostic.register(VerticalMeanMeters, 'vertmeanmeters')
        Diagnostic.register(Interpolate, 'interp')
        Diagnostic.register(Moc, 'moc')
        Diagnostic.register(AreaMoc, 'mocarea')
        Diagnostic.register(MaxMoc, 'mocmax')
        Diagnostic.register(Psi, 'psi')
        Diagnostic.register(Gyres, 'gyres')
        Diagnostic.register(ConvectionSites, 'convection')
        Diagnostic.register(CutSection, 'cutsection')
        Diagnostic.register(AverageSection, 'avgsection')

        # NOTE(review): the result is discarded; presumably this warms up the date parsing machinery before the
        # worker threads start - confirm before removing
        parse_date('20000101')

        self.data_manager.prepare_CMOR_files(self.startdates, self.members, self.force_CMOR)

        # Run diagnostics
        Log.info('Running diagnostics')
        list_jobs = Queue.Queue()

        for fulldiag in self._get_commands():
            Log.info("Running {0}", fulldiag)
            diag_options = fulldiag.split(',')

            diag_class = Diagnostic.get_diagnostic(diag_options[0])
            if diag_class:
                # Registered diagnostics are only queued here; they are computed later by the worker threads
                for job in diag_class.generate_jobs(self, diag_options):
                    list_jobs.put(job)
                continue

            for startdate in self.startdates:
                for member in self.members:
                    for chunk in range(1, self.chunks + 1):
                        self._execute_diagnostic(diag_options, startdate, member, chunk)
            Log.result('Finished {0}', fulldiag)

        numthreads = min(Utils.available_cpu_count(), self.max_cores)

        threads = list()
        for numthread in range(0, numthreads):
            t = threading.Thread(target=Diags._run_jobs, args=(list_jobs, numthread))
            threads.append(t)
            t.start()

        list_jobs.join()
        # Wait for the workers themselves, so none of them is still running when the temporary files are removed
        for t in threads:
            t.join()

        TempFile.clean()
        finish_time = datetime.datetime.now()
        Log.result("Diagnostics finished at {0}", finish_time)
        Log.result("Time ellapsed: {0}", finish_time - start_time)

    @staticmethod
    def _run_jobs(queue, numthread):
        """
        Worker loop: take jobs from the queue and compute them until the queue is empty

        :param queue: queue holding the jobs; each job must provide a compute() method
        :type queue: Queue.Queue
        :param numthread: worker identifier, used only in the log messages
        :type numthread: int
        """
        def _run_job(current_job, retrials=1):
            # Returns True as soon as one attempt succeeds, False when all the attempts have failed
            while retrials > 0:
                try:
                    current_job.compute()
                    Log.result('Finished {0}', current_job)
                    return True
                except Exception as ex:
                    retrials -= 1
                    Log.error('Job {0} failed: {1}', current_job, ex)
            return False

        count = 0
        failed_jobs = list()

        while not queue.empty():
            try:
                job = queue.get(timeout=1)
            except Queue.Empty:
                # The queue was emptied between the check and the get: evaluate the loop condition again
                continue

            if _run_job(job):
                count += 1
            else:
                failed_jobs.append(str(job))
            queue.task_done()

        if not failed_jobs:
            Log.result('Thread {0} finished after taking care of {1} tasks', numthread, count)
        else:
            Log.result('Thread {0} finished after running successfully {1} of {2} tasks', numthread, count,
                       count + len(failed_jobs))
            for failed_job in failed_jobs:
                Log.error('Job {0} could not be run', failed_job)

    def _execute_diagnostic(self, diag_options, startdate, member, chunk):
        """
        Execute one of the diagnostics that are not registered in Diagnostic ('ohc', 'ohclayer' or 'mlotsthc')

        Any other diagnostic name only produces a warning.

        :param diag_options: diagnostic name followed by its options, as obtained by splitting the command by ','
        :type diag_options: list[str]
        :param startdate: startdate, passed to the data manager to locate the files
        :param member: member, passed to the data manager to locate the files
        :param chunk: chunk, passed to the data manager to locate the files
        """
        diag = diag_options[0]

        if diag == 'ohc':
            basin = diag_options[1]
            mixed_layer = int(diag_options[2])
            depth_min = int(diag_options[3])
            depth_max = int(diag_options[4])

            # Suffix of the output variables depends on the mixed layer option
            if mixed_layer == 1:
                mxl = 'mlotst'
                depth = ''
            elif mixed_layer == 0:
                mxl = ''
                depth = '{0}-{1}'.format(depth_min, depth_max)
            else:
                mxl = 'nomlotst'
                depth = ''

            # Both output names carry the suffix: the 'ohcsum' template has to be formatted too
            variables = ('thetao', 'mlotst', 'ohcsum{0}{1}'.format(mxl, depth),
                         'ohcvmean{0}{1}'.format(mxl, depth))
            for [input_file, mlotst_file, ohcsum_file, ohcvmean_file] in \
                    self.data_manager.get_files(startdate, member, chunk, 'ocean', variables):
                Heat.total(input_file, mlotst_file, ohcsum_file, ohcvmean_file, basin, mixed_layer,
                           depth_min, depth_max)

        elif diag == 'ohclayer':
            depth_min = int(diag_options[1])
            depth_max = int(diag_options[2])
            depth = '{0}-{1}'.format(depth_min, depth_max)

            variables = ('thetao', 'ohc{0}'.format(depth))
            for [input_file, output_file] in self.data_manager.get_files(startdate, member, chunk, 'ocean',
                                                                         variables):
                Heat.layer(input_file, output_file, depth_min, depth_max)

        elif diag == 'mlotsthc':
            variables = ('thetao', 'mlotst', 'ohcvertsummlotst')
            for [input_file, mlotst_file, output_file] in self.data_manager.get_files(startdate, member, chunk,
                                                                                      'ocean', variables):
                Heat.mixed_layer_content(input_file, mlotst_file, output_file)

        else:
            Log.warning('Diagnostic {0} not available', diag)

    def _get_commands(self):
        """
        Build the list of diagnostics to run, replacing each configured alias by the commands it stands for

        The commands of an alias are appended at the end of the list.

        :return: list of commands
        :rtype: list[str]
        """
        Log.debug('Preparing command list')
        commands = self.diags.split()
        for alias, added_commands in self._aliases.items():
            if alias in commands:
                Log.info('Changing alias {0} for {1}', alias, ' '.join(added_commands))
                commands.remove(alias)
                commands.extend(added_commands)
        Log.debug('Command list ready ')
        return commands

    def _prepare_mesh_files(self):
        """
        Copy the mesh and mask files for the configured NEMO version to the current folder

        mesh_zgr.nc and mask.nc are created as symbolic links to mesh_hgr.nc.
        """
        Log.info('Copying mesh files')
        self._copy_file(os.path.join(self.con_files, 'mesh_mask_nemo.{0}.nc'.format(self.nemo_version)),
                        'mesh_hgr.nc')
        self._link_file('mesh_hgr.nc', 'mesh_zgr.nc')
        self._link_file('mesh_hgr.nc', 'mask.nc')
        self._copy_file(os.path.join(self.con_files, 'new_maskglo.{0}.nc'.format(self.nemo_version)),
                        'new_maskglo.nc')
        self._copy_file(os.path.join(self.con_files, 'mask.regions.{0}.nc'.format(self.nemo_version)),
                        'mask_regions.nc')
        self._copy_file(os.path.join(self.con_files, 'mask.regions.3d.{0}.nc'.format(self.nemo_version)),
                        'mask_regions.3d.nc')
        Log.result('Mesh files ready!')

    def _copy_file(self, source, destiny):
        """
        Copy a file and rename its variables using dic_variables

        Nothing is done if the source does not exist (a warning is logged) or if the destiny already exists with
        the same size as the source.

        :param source: path to the file to copy
        :type source: str
        :param destiny: path of the copy
        :type destiny: str
        """
        if not os.path.exists(source):
            Log.user_warning('File {0} is not available for {1}', destiny, self.nemo_version)
            return

        if os.path.exists(destiny):
            if os.stat(source).st_size == os.stat(destiny).st_size:
                Log.info('File {0} already exists', destiny)
                return

        Log.info('Creating file {0}', destiny)
        shutil.copy(source, destiny)
        Log.info('File {0} ready', destiny)
        # Rename the variables of the file that has just been copied, not always those of mesh_hgr.nc
        Utils.rename_variables(destiny, self.dic_variables, False, True)

    def _link_file(self, source, destiny):
        """
        Create a symbolic link to a file

        Nothing is done if the source does not exist (a warning is logged) or if the destiny already exists with
        the same size as the source. Any other existing destiny, dangling links included, is replaced.

        :param source: path to the file to link
        :type source: str
        :param destiny: path of the link
        :type destiny: str
        """
        if not os.path.exists(source):
            Log.user_warning('File {0} is not available for {1}', destiny, self.nemo_version)
            # Without a source there is nothing to compare with or to link to
            return

        # lexists also detects dangling links, that would make os.symlink fail
        if os.path.lexists(destiny):
            if os.path.exists(destiny) and os.stat(source).st_size == os.stat(destiny).st_size:
                Log.info('File {0} already exists', destiny)
                return
            os.remove(destiny)

        os.symlink(source, destiny)
        Log.info('File {0} ready', destiny)

    def _read_config(self, config_file):
        """
        Read the configuration file and store its options as attributes

        It also creates the scratch folder for the experiment (<SCRATCH_DIR>/diags/<EXPID>) if needed and changes
        the working directory to it.

        :param config_file: path to the configuration file
        :type config_file: str
        """
        self.parser = Parser()
        self.parser.optionxform = str
        self.parser.read(config_file)

        # Read diags config
        self.scratch_dir = self.parser.get_option('DIAGNOSTICS', 'SCRATCH_DIR')
        self.data_dir = self.parser.get_option('DIAGNOSTICS', 'DATA_DIR')
        self.con_files = self.parser.get_option('DIAGNOSTICS', 'CON_FILES')
        self.diags = self.parser.get_option('DIAGNOSTICS', 'DIAGS').lower()
        self.frequency = self.parser.get_option('DIAGNOSTICS', 'FREQUENCY')
        self.cdftools_path = self.parser.get_option('DIAGNOSTICS', 'CDFTOOLS_PATH')
        self.max_cores = self.parser.get_int_option('DIAGNOSTICS', 'MAX_CORES', 100000)
        self.force_CMOR = self.parser.get_bool_option('DIAGNOSTICS', 'FORCE_CMOR', False)

        # Read experiment config
        self.institute = self.parser.get_option('EXPERIMENT', 'INSTITUTE')
        self.expid = self.parser.get_option('EXPERIMENT', 'EXPID')
        self.experiment_name = self.parser.get_option('EXPERIMENT', 'NAME')

        self.members = [int(member) for member in self.parser.get_option('EXPERIMENT', 'MEMBERS').split()]

        self.startdates = self.parser.get_option('EXPERIMENT', 'STARTDATES').split()
        self.chunk_size = self.parser.get_int_option('EXPERIMENT', 'CHUNK_SIZE')
        self.chunks = self.parser.get_int_option('EXPERIMENT', 'CHUNKS')
        self.calendar = self.parser.get_option('EXPERIMENT', 'CALENDAR', 'standard')
        self.model = self.parser.get_option('EXPERIMENT', 'MODEL')
        self.nfrp = self.parser.get_int_option('EXPERIMENT', 'NFRP')
        self.nemo_version = self.parser.get_option('EXPERIMENT', 'NEMO_VERSION')

        self.add_name = self.parser.get_bool_option('CMOR', 'ADD_NAME')
        self.add_startdate = self.parser.get_bool_option('CMOR', 'ADD_STARTDATE')

        # Read aliases
        self._aliases = dict()
        if self.parser.has_section('ALIAS'):
            for option in self.parser.options('ALIAS'):
                self._aliases[option.lower()] = self.parser.get_option('ALIAS', option).lower().split()

        self.scratch_dir = os.path.join(self.scratch_dir, 'diags', self.expid)
        if not os.path.exists(self.scratch_dir):
            os.makedirs(self.scratch_dir)
        os.chdir(self.scratch_dir)


def main():
    """
    Entry point: parse the command line, configure the log, run the diagnostics and clean the temporary files
    """
    log_levels = ('EVERYTHING', 'DEBUG', 'INFO', 'RESULT', 'USER_WARNING', 'WARNING', 'ERROR', 'CRITICAL', 'NO_LOG')

    parser = argparse.ArgumentParser(description='Main executable for Earth Diagnostics.')
    parser.add_argument('-v', '--version', action='version', version='0.1',
                        help="returns Earth Diagnostics's version number and exit")
    parser.add_argument('-lf', '--logfile', choices=log_levels, default='DEBUG', type=str,
                        help="sets file's log level.")
    parser.add_argument('-lc', '--logconsole', choices=log_levels, default='INFO', type=str,
                        help="sets console's log level")
    parser.add_argument('-log', '--logfilepath', default=None, type=str)
    parser.add_argument('-f', '--configfile', default='diags.conf', type=str)

    args = parser.parse_args()
    Log.set_console_level(args.logconsole)
    Log.set_file_level(args.logfile)
    if args.logfilepath:
        Log.set_file(args.logfilepath)

    diags = Diags(args.configfile)
    diags.run()
    TempFile.clean()


if __name__ == "__main__":
    main()