#!/usr/bin/env python
"""
Command-line launcher for Earth Diagnostics.

Reads a configuration file, prepares the mesh/mask files in the scratch
folder and runs the requested diagnostics for every start date, member and
chunk of the experiment.
"""
import argparse
import threading

import netCDF4

from datamanager import DataManager
from earthdiagnostics.ocean import Salinity, Circulation, Heat, General, Siasiesiv
from utils import Utils
from earthdiagnostics import cdftools, TempFile
# NOTE: ``datetime`` (used in Diags.run) is not imported explicitly; it is
# expected to be provided by this star import.
from autosubmit.date.chunk_date_lib import *
from parser import Parser
from autosubmit.config.log import Log
import shutil
import os


class Diags:
    """
    Launcher class for the diagnostics

    :param config_file: path to the configuration file
    :type config_file: str
    """

    def __init__(self, config_file):
        Log.debug('Initialising Diags')
        self._read_config(config_file)

        self.datamanager = DataManager(self.institute, self.model, self.expid, self.data_dir, self.frequency,
                                       self.chunk_size, self.experiment_name)
        self.datamanager.add_startdate = self.add_startdate
        self.datamanager.add_name = self.add_name
        # Global (class-level) settings consumed by the helper modules.
        TempFile.scratch_folder = self.scratch_dir
        cdftools.path = self.cdftools_path
        self._create_dic_variables()
        Log.debug('Diags ready')

    def _create_dic_variables(self):
        """
        Build the old-name -> new-name mapping applied to copied mesh/mask files
        (see ``_copy_file``).
        """
        self.dic_variables = {
            'x': 'i',
            'y': 'j',
            'z': 'lev',
            'nav_lon': 'lon',
            'nav_lat': 'lat',
            'nav_lev': 'lev',
            'time_counter': 'time',
            't': 'time',
        }

    def run(self):
        """
        Run the diagnostics

        Prepares the scratch folder, mesh files and CMOR files, then runs every
        configured command. ``siasiesiv`` commands are turned into jobs that are
        distributed round-robin over up to ``max_cores`` threads; every other
        diagnostic runs sequentially for each startdate, member and chunk.
        """
        Log.debug('Using netCDF version {0}', netCDF4.getlibversion())
        start_time = datetime.datetime.now()
        Log.info("Starting diagnostics at {0}", start_time)

        if not os.path.exists(self.scratch_dir):
            os.makedirs(self.scratch_dir)
        os.chdir(self.scratch_dir)

        self._prepare_mesh_files()
        self.datamanager.prepare_CMOR_files(self.startdates, self.members)

        # Run diagnostics
        Log.info('Running diagnostics')
        list_jobs = list()
        for fulldiag in self._get_commands():
            Log.info("Running {0}", fulldiag)
            diag_options = fulldiag.split(',')
            if diag_options[0] == 'siasiesiv':
                # Job-based diagnostic: collected now, executed in threads below.
                list_jobs += Siasiesiv.generate_jobs(self.datamanager, self.startdates, self.members, self.chunks,
                                                     diag_options)
                continue
            for startdate in self.startdates:
                for member in self.members:
                    for chunk in range(1, self.chunks + 1):
                        self._execute_diagnostic(diag_options, startdate, member, chunk)
            Log.result('Finished {0}', fulldiag)

        numthreads = min(Utils.available_cpu_count(), self.max_cores)
        threads = list()
        for numthread in range(0, numthreads):
            # Thread ``numthread`` takes every ``numthreads``-th job starting at its own index.
            t = threading.Thread(target=Diags._run_jobs, args=(list_jobs[numthread::numthreads],))
            threads.append(t)
            t.start()
        for t in threads:
            t.join()

        TempFile.clean()
        finish_time = datetime.datetime.now()
        Log.result("Diagnostics finished at {0}", finish_time)
        Log.result("Time ellapsed: {0}", finish_time - start_time)

    @staticmethod
    def _run_jobs(jobs):
        """
        Compute the given jobs one after another (thread target).

        :param jobs: objects exposing a ``compute()`` method
        """
        for job in jobs:
            job.compute()

    def _execute_diagnostic(self, diag_options, startdate, member, chunk):
        """
        Run one diagnostic for a single startdate/member/chunk.

        :param diag_options: diagnostic name followed by its arguments, as obtained by
                             splitting a command on commas
        :type diag_options: list[str]
        :param startdate: start date of the experiment to process
        :param member: member number
        :type member: int
        :param chunk: chunk number (1-based)
        :type chunk: int
        """
        diag = diag_options[0]
        if diag == 'vertmeanmeters':
            variable = diag_options[1]
            depth_min = int(diag_options[2])
            depth_max = int(diag_options[3])
            depth = '{0}m-{1}m'.format(depth_min, depth_max)
            variables = (variable, '{0}mean{1}'.format(variable, depth))
            for [input_file, output_file] in self.datamanager.get_files(startdate, member, chunk, 'ocean', variables):
                General.vertical_mean_meters(input_file, output_file, variable, depth_min, depth_max)

        elif diag == 'vertmean':
            variable = diag_options[1]
            lev_min = int(diag_options[2])
            lev_max = int(diag_options[3])
            lev = '{0}-{1}'.format(lev_min, lev_max)
            variables = (variable, '{0}mean{1}'.format(variable, lev))
            for [input_file, output_file] in self.datamanager.get_files(startdate, member, chunk, 'ocean', variables):
                General.vertical_mean(input_file, output_file, variable, lev_min, lev_max)

        elif diag == 'convection':
            variables = ('mlotst', 'mlotstsites')
            for [input_file, output_file] in self.datamanager.get_files(startdate, member, chunk, 'ocean', variables):
                Circulation.convection_sites(input_file, self.nemo_version, output_file)

        elif diag == 'psi':
            variables = ('uo', 'vo', 'vsftbarot')
            for [u_file, v_file, output_file] in self.datamanager.get_files(startdate, member, chunk, 'ocean',
                                                                            variables):
                Circulation.psi(u_file, v_file, output_file)

        elif diag == 'gyres':
            variables = ('vsftbarot', 'vsftbarotgyres')
            for [input_file, output_file] in self.datamanager.get_files(startdate, member, chunk, 'ocean', variables):
                Circulation.gyres(str(input_file), self.nemo_version, str(output_file))

        elif diag == 'ohc':
            basin = diag_options[1]
            mixed_layer = int(diag_options[2])
            depth_min = int(diag_options[3])
            depth_max = int(diag_options[4])
            if mixed_layer == 1:
                mxl = 'mlotst'
                depth = ''
            elif mixed_layer == 0:
                mxl = ''
                depth = '{0}-{1}'.format(depth_min, depth_max)
            else:
                mxl = 'nomlotst'
                depth = ''
            # Both output names carry the same mixed-layer/depth suffix (the 'ohcsum'
            # template used to be passed through unformatted).
            variables = ('thetao', 'mlotst', 'ohcsum{0}{1}'.format(mxl, depth), 'ohcvmean{0}{1}'.format(mxl, depth))
            for [input_file, mlotst_file, ohcsum_file, ohcvmean_file] in self.datamanager.get_files(startdate, member,
                                                                                                    chunk, 'ocean',
                                                                                                    variables):
                Heat.total(input_file, mlotst_file, ohcsum_file, ohcvmean_file, basin, mixed_layer, depth_min,
                           depth_max)

        elif diag == 'ohclayer':
            depth_min = int(diag_options[1])
            depth_max = int(diag_options[2])
            depth = '{0}-{1}'.format(depth_min, depth_max)
            variables = ('thetao', 'ohc{0}'.format(depth))
            for [input_file, output_file] in self.datamanager.get_files(startdate, member, chunk, 'ocean', variables):
                Heat.layer(input_file, output_file, depth_min, depth_max)

        elif diag in ['moc', 'vsftmyz']:
            variables = ('vo', 'vsftmyz')
            for [input_file, output_file] in self.datamanager.get_files(startdate, member, chunk, 'ocean', variables):
                Circulation.moc(input_file, output_file)

        elif diag in ['mocmax', 'vsftmyzmax']:
            if len(diag_options) != 5:
                Log.warning('vsftmyzmax requires 4 arguments. Skipping!')
                return
            lat_min = int(diag_options[1])
            lat_max = int(diag_options[2])
            lat = '{0}-{1}'.format(lat_min, lat_max)
            depth_min = int(diag_options[3])
            depth_max = int(diag_options[4])
            depth = '{0}-{1}'.format(depth_min, depth_max)
            variables = ('vsftmyz', 'vsftmyzmax')
            for [input_file, output_file] in self.datamanager.get_files(startdate, member, chunk, 'ocean', variables):
                Circulation.max_moc(input_file, lat_min, lat_max,
                                    output_file.replace('vsftmyzmax', 'vsftmyzmax{0}-{1}'.format(lat, depth)),
                                    depth_min, depth_max)

        elif diag in ['mocarea', 'vsftmyzarea']:
            if len(diag_options) != 6:
                Log.warning('vsftmyzarea requires 5 arguments. Skipping!')
                return
            lat_min = int(diag_options[1])
            lat_max = int(diag_options[2])
            lat = '{0}{1}'.format(Utils.get_cardinal_coordinate(lat_min, True),
                                  Utils.get_cardinal_coordinate(lat_max, True))
            depth_min = int(diag_options[3])
            depth_max = int(diag_options[4])
            depth = '{0}-{1}'.format(depth_min, depth_max)
            basin = diag_options[5]
            variables = ('vsftmyz', 'vsftmyz{0}{1}{2}'.format(lat, depth, basin))
            for [input_file, output_file] in self.datamanager.get_files(startdate, member, chunk, 'ocean', variables):
                Circulation.area_moc(input_file, lat_min, lat_max, output_file, depth_min, depth_max, basin)

        elif diag == 'mlotsthc':
            variables = ('thetao', 'mlotst', 'ohcvertsummlotst')
            for [input_file, mlotst_file, output_file] in self.datamanager.get_files(startdate, member, chunk,
                                                                                     'ocean', variables):
                Heat.mixed_layer_content(input_file, mlotst_file, output_file)

        elif diag == 'mlotstsc':
            variables = ('so', 'mlotst', 'scvertsummlotst')
            for [input_file, mlotst_file, output_file] in self.datamanager.get_files(startdate, member, chunk,
                                                                                     'ocean', variables):
                Salinity.mixed_layer_content(input_file, mlotst_file, output_file)

        elif diag == 'interp3d':
            variable = diag_options[1]
            if len(diag_options) == 3:
                domain = diag_options[2]
                if domain == 'seaice':
                    domain = 'seaIce'
            else:
                domain = 'ocean'
            input_files = self.datamanager.get_files(startdate, member, chunk, domain, [variable])
            output_files = self.datamanager.get_files(startdate, member, chunk, domain, [variable], 'regular')
            for x in range(0, len(input_files[0])):
                General.interpolate(input_files[0][x], output_files[0][x], variable, self.nemo_version)

        elif diag == 'cutsection':
            variable = diag_options[1]
            zonal = diag_options[2] == 'z'
            value = int(diag_options[3])
            coordinate = Utils.get_cardinal_coordinate(value, zonal)
            variables = (variable, '{0}{1}'.format(variable, coordinate))
            for [input_file, output_file] in self.datamanager.get_files(startdate, member, chunk, 'ocean', variables,
                                                                        'regular'):
                General.cut_section(input_file, output_file, variable, zonal, value)

        elif diag == 'avgsection':
            variable = diag_options[1]
            lon_min = int(diag_options[2])
            lon_max = int(diag_options[3])
            lat_min = int(diag_options[4])
            lat_max = int(diag_options[5])
            output_name = '{0}{1}{2}{3}{4}'.format(variable,
                                                   Utils.get_cardinal_coordinate(lon_min, False),
                                                   Utils.get_cardinal_coordinate(lon_max, False),
                                                   Utils.get_cardinal_coordinate(lat_min, True),
                                                   Utils.get_cardinal_coordinate(lat_max, True))
            variables = (variable, output_name)
            for [input_file, output_file] in self.datamanager.get_files(startdate, member, chunk, 'ocean', variables):
                General.avgsection(input_file, output_file, lon_min, lon_max, lat_min, lat_max)

        else:
            Log.warning('Diagnostic {0} not available', diag)
            return

    def _get_commands(self):
        """
        Build the list of diagnostic commands from the DIAGS option.

        Any alias defined in the ALIAS section that appears in the list is
        replaced by its commands, which are appended at the end.

        :return: diagnostic commands (comma-separated name and arguments)
        :rtype: list[str]
        """
        Log.debug('Preparing command list')
        commands = self.diags.split()
        for alias, added_commands in self._aliases.items():
            if alias in commands:
                Log.debug('Changing alias {0} for {1}', alias, ' '.join(added_commands))
                commands.remove(alias)
                commands.extend(added_commands)
        Log.debug('Command list ready ')
        return commands

    def _prepare_mesh_files(self):
        """Copy (or link) the NEMO mesh and mask files into the current (scratch) folder."""
        Log.info('Copying mesh files')
        self._copy_file(os.path.join(self.con_files, 'mesh_mask_nemo.{0}.nc'.format(self.nemo_version)),
                        'mesh_hgr.nc')
        # mesh_zgr.nc and mask.nc are served by the same combined mesh_mask file.
        self._link_file('mesh_hgr.nc', 'mesh_zgr.nc')
        self._link_file('mesh_hgr.nc', 'mask.nc')
        self._copy_file(os.path.join(self.con_files, 'new_maskglo.{0}.nc'.format(self.nemo_version)),
                        'new_maskglo.nc')
        self._copy_file(os.path.join(self.con_files, 'mask.regions.{0}.nc'.format(self.nemo_version)),
                        'mask_regions.nc')
        self._copy_file(os.path.join(self.con_files, 'mask.regions.3d.{0}.nc'.format(self.nemo_version)),
                        'mask_regions.3d.nc')
        Log.result('Mesh files ready!')

    def _copy_file(self, source, destiny):
        """
        Copy ``source`` to ``destiny`` and rename its variables using ``dic_variables``.

        Nothing is done when the source is missing (a warning is logged) or when the
        destination already exists with the same size as the source.
        """
        if not os.path.exists(source):
            Log.user_warning('File {0} is not available for {1}', destiny, self.nemo_version)
            return

        if os.path.exists(destiny):
            if os.stat(source).st_size == os.stat(destiny).st_size:
                Log.info('File {0} already exists', destiny)
                return

        Log.info('Creating file {0}', destiny)
        shutil.copy(source, destiny)
        Log.info('File {0} ready', destiny)
        # Post-process the file that was just copied (previously this always
        # targeted 'mesh_hgr.nc', whatever file had been copied).
        Utils.rename_variables(destiny, self.dic_variables, False, True)

    def _link_file(self, source, destiny):
        """
        Create the symbolic link ``destiny`` -> ``source``.

        Nothing is done when the source is missing (a warning is logged) or when the
        destination already resolves to a file with the same size as the source.
        Any other existing destination, including a dangling link, is replaced.
        """
        if not os.path.exists(source):
            Log.user_warning('File {0} is not available for {1}', destiny, self.nemo_version)
            return

        # lexists also detects dangling symlinks, which os.path.exists reports as
        # missing and which would make os.symlink fail.
        if os.path.lexists(destiny):
            if os.path.exists(destiny) and os.stat(source).st_size == os.stat(destiny).st_size:
                Log.info('File {0} already exists', destiny)
                return
            os.remove(destiny)
        os.symlink(source, destiny)
        Log.info('File {0} ready', destiny)

    def _read_config(self, config_file):
        """
        Read the DIAGNOSTICS, EXPERIMENT, CMOR and (optional) ALIAS sections of the
        configuration file, then create and move into the experiment scratch folder
        ``<SCRATCH_DIR>/diags/<EXPID>``.

        :param config_file: path to the configuration file
        :type config_file: str
        """
        self.parser = Parser()
        # Keep option names case-sensitive.
        self.parser.optionxform = str
        self.parser.read(config_file)

        # Read diags config
        self.scratch_dir = self.parser.get_option('DIAGNOSTICS', 'SCRATCH_DIR')
        self.data_dir = self.parser.get_option('DIAGNOSTICS', 'DATA_DIR')
        self.con_files = self.parser.get_option('DIAGNOSTICS', 'CON_FILES')
        self.diags = self.parser.get_option('DIAGNOSTICS', 'DIAGS').lower()
        self.frequency = self.parser.get_option('DIAGNOSTICS', 'FREQUENCY')
        self.cdftools_path = self.parser.get_option('DIAGNOSTICS', 'CDFTOOLS_PATH')
        self.max_cores = self.parser.get_int_option('DIAGNOSTICS', 'MAX_CORES', 100000)

        # Read experiment config
        self.institute = self.parser.get_option('EXPERIMENT', 'INSTITUTE')
        self.expid = self.parser.get_option('EXPERIMENT', 'EXPID')
        self.experiment_name = self.parser.get_option('EXPERIMENT', 'NAME')
        self.members = [int(member) for member in self.parser.get_option('EXPERIMENT', 'MEMBERS').split()]
        self.startdates = self.parser.get_option('EXPERIMENT', 'STARTDATES').split()
        self.chunk_size = self.parser.get_int_option('EXPERIMENT', 'CHUNK_SIZE')
        self.chunks = self.parser.get_int_option('EXPERIMENT', 'CHUNKS')
        self.model = self.parser.get_option('EXPERIMENT', 'MODEL')
        self.nemo_version = self.parser.get_option('EXPERIMENT', 'NEMO_VERSION')

        self.add_name = self.parser.get_bool_option('CMOR', 'ADD_NAME')
        self.add_startdate = self.parser.get_bool_option('CMOR', 'ADD_STARTDATE')

        # Read aliases
        self._aliases = dict()
        if self.parser.has_section('ALIAS'):
            for option in self.parser.options('ALIAS'):
                self._aliases[option.lower()] = self.parser.get_option('ALIAS', option).lower().split()

        self.scratch_dir = os.path.join(self.scratch_dir, 'diags', self.expid)
        if not os.path.exists(self.scratch_dir):
            os.makedirs(self.scratch_dir)
        os.chdir(self.scratch_dir)


def main():
    """Parse the command line, configure logging and run the diagnostics."""
    log_levels = ('EVERYTHING', 'DEBUG', 'INFO', 'RESULT', 'USER_WARNING', 'WARNING', 'ERROR', 'CRITICAL',
                  'NO_LOG')
    parser = argparse.ArgumentParser(description='Main executable for Earth Diagnostics.')
    parser.add_argument('-v', '--version', action='version', version='0.1',
                        help="returns Earth Diagnostics's version number and exit")
    parser.add_argument('-lf', '--logfile', choices=log_levels, default='DEBUG', type=str,
                        help="sets file's log level.")
    parser.add_argument('-lc', '--logconsole', choices=log_levels, default='INFO', type=str,
                        help="sets console's log level")
    parser.add_argument('-f', '--configfile', default='diags.conf', type=str)

    args = parser.parse_args()
    Log.set_console_level(args.logconsole)
    Log.set_file_level(args.logfile)
    diags = Diags(args.configfile)
    diags.run()
    TempFile.clean()


if __name__ == "__main__":
    main()