#!/usr/bin/env python
import argparse
import datetime
import operator
import os
import shutil
import threading

try:
    import Queue  # Python 2
except ImportError:  # Python 3
    import queue as Queue

import netCDF4
import pkg_resources
import bscearth.utils.path
from bscearth.utils.date import *

from earthdiagnostics.cmormanager import CMORManager
from earthdiagnostics.threddsmanager import THREDDSManager
from earthdiagnostics.diagnostic import Diagnostic
from earthdiagnostics.diagnostic import DiagnosticOptionError
from earthdiagnostics.ocean import *
from earthdiagnostics.general import *
from earthdiagnostics.statistics import *
from earthdiagnostics.variable import VariableManager
class EarthDiags(object):
    """
    Launcher class for the Earth Diagnostics.

    Reads the configuration and, depending on the command line, runs the configured diagnostics on a pool of
    worker threads, writes a report of the missing variables or cleans the scratch folder.

    :param config_file: path to the configuration file
    :type config_file: str
    """

    # Get the version number from the VERSION file next to this script (or in its parent folder).
    # If it is not there, fall back to the version of the installed earthdiagnostics package.
    scriptdir = os.path.abspath(os.path.dirname(__file__))
    if not os.path.exists(os.path.join(scriptdir, 'VERSION')):
        scriptdir = os.path.join(scriptdir, os.path.pardir)

    version_path = os.path.join(scriptdir, 'VERSION')
    readme_path = os.path.join(scriptdir, 'README')
    changes_path = os.path.join(scriptdir, 'CHANGELOG')
    documentation_path = os.path.join(scriptdir, 'EarthDiagnostics.pdf')

    if os.path.isfile(version_path):
        with open(version_path) as f:
            version = f.read().strip()
        # Do not leave the (closed) file object behind as a class attribute
        del f
    else:
        version = pkg_resources.require("earthdiagnostics")[0].version

    def __init__(self, config_file):
        Log.info('Initialising Earth Diagnostics Version {0}', EarthDiags.version)
        self.config = Config(config_file)

        TempFile.scratch_folder = self.config.scratch_dir
        cdftools.path = self.config.cdftools_path
        # Renaming table used by _copy_file on every mesh/mask file it copies
        self._create_dic_variables()

        # Per-thread dict: {thread index: {diagnostic class: accumulated timedelta}}
        self.time = dict()
        self.data_manager = None
        self.threads = None
        self.had_errors = False
        self._failed_jobs = []
        Log.info('Running diags for experiment {0}, startdates {1}, members {2}', self.config.experiment.expid,
                 self.config.experiment.startdates, self.config.experiment.members)

    @staticmethod
    def parse_args():
        """
        Entry point for the Earth Diagnostics. For more detailed documentation, use -h option

        Parses the command line, configures logging and then opens the documentation, cleans the scratch folder,
        writes the availability report or runs the diagnostics, depending on the options given.

        :return: True if the requested action succeeded, False otherwise
        :rtype: bool
        """
        parser = argparse.ArgumentParser(description='Main executable for Earth Diagnostics.')
        parser.add_argument('-v', '--version', action='version', version=EarthDiags.version,
                            help="returns Earth Diagnostics's version number and exit")
        parser.add_argument('--doc', action='store_true',
                            help="opens documentation and exits")
        parser.add_argument('--clean', action='store_true',
                            help="clean the scratch folder and exits")
        parser.add_argument('--report', action='store_true',
                            help="generates a report about the available files")
        parser.add_argument('-lf', '--logfile', choices=('EVERYTHING', 'DEBUG', 'INFO', 'RESULT', 'USER_WARNING',
                                                         'WARNING', 'ERROR', 'CRITICAL', 'NO_LOG'),
                            default='DEBUG', type=str,
                            help="sets file's log level.")
        parser.add_argument('-lc', '--logconsole', choices=('EVERYTHING', 'DEBUG', 'INFO', 'RESULT', 'USER_WARNING',
                                                            'WARNING', 'ERROR', 'CRITICAL', 'NO_LOG'),
                            default='INFO', type=str,
                            help="sets console's log level")
        parser.add_argument('-log', '--logfilepath', default=None, type=str)
        parser.add_argument('-f', '--configfile', default='diags.conf', type=str)
        args = parser.parse_args()

        if args.doc:
            Log.info('Opening documentation...')
            Utils.execute_shell_command(('xdg-open', EarthDiags.documentation_path))
            Log.result('Documentation opened!')
            # --doc "opens documentation and exits": do not go on to load the configuration and run
            return True

        Log.set_console_level(args.logconsole)
        Log.set_file_level(args.logfile)

        if Log.console_handler.level <= Log.DEBUG:
            Utils.cdo.debug = True
            Utils.nco.debug = False  # This is due to a bug in nco. Must change when it's solved

        if args.logfilepath:
            Log.set_file(bscearth.utils.path.expand_path(args.logfilepath))

        config_file_path = bscearth.utils.path.expand_path(args.configfile)
        if not os.path.isfile(config_file_path):
            Log.critical('Configuration file {0} can not be found', config_file_path)
            return False

        try:
            diags = EarthDiags(config_file_path)
            if args.clean:
                result = diags.clean()
            elif args.report:
                result = diags.report()
            else:
                result = diags.run()
        finally:
            # Remove temporary files even if the selected action raised
            TempFile.clean()
        return result

    def _create_dic_variables(self):
        """Build the old name -> new name table passed to Utils.rename_variables for copied mesh files."""
        self.dic_variables = {
            'x': 'i',
            'y': 'j',
            'z': 'lev',
            'nav_lon': 'lon',
            'nav_lat': 'lat',
            'nav_lev': 'lev',
            'time_counter': 'time',
            't': 'time',
        }

    def _prepare_scratch_dir(self):
        """Create the scratch folder if it does not exist yet."""
        if not os.path.exists(self.config.scratch_dir):
            os.makedirs(self.config.scratch_dir)

    def run(self):
        """
        Run all the diagnostics requested in the configuration.

        Prepares the scratch folder, the mesh files, the diagnostic registry and the data manager, builds the job
        queue and processes it with one worker thread per available CPU, capped by max_cores when it is positive.

        :return: True if every diagnostic could be configured and every job succeeded
        :rtype: bool
        """
        self.had_errors = False
        Log.debug('Using netCDF version {0}', netCDF4.getlibversion())

        self._prepare_scratch_dir()
        # Mesh files are created with relative paths, so they must be prepared inside the scratch folder
        os.chdir(self.config.scratch_dir)
        self._prepare_mesh_files()
        self._register_diagnostics()
        self._prepare_data_manager()

        # Run diagnostics
        Log.info('Running diagnostics')
        list_jobs = self.prepare_job_list()
        self._failed_jobs = []

        start_time = datetime.datetime.now()
        Log.info("Starting to compute at {0}", start_time)
        self.threads = Utils.available_cpu_count()
        if 0 < self.config.max_cores < self.threads:
            self.threads = self.config.max_cores
        Log.info('Using {0} threads', self.threads)

        workers = list()
        for num_thread in range(0, self.threads):
            self.time[num_thread] = dict()
            worker = threading.Thread(target=EarthDiags._run_jobs, args=(self, list_jobs, num_thread))
            workers.append(worker)
            worker.start()
        for worker in workers:
            worker.join()

        finish_time = datetime.datetime.now()
        Log.result("Diagnostics finished at {0}", finish_time)
        Log.result("Elapsed time: {0}\n", finish_time - start_time)
        self.print_errors()
        self.print_stats()
        return not self.had_errors

    def _prepare_data_manager(self):
        """
        Create and prepare the data manager matching the configured data adaptor.

        :raises ValueError: if the data adaptor is neither 'CMOR' nor 'THREDDS'
        """
        if self.config.data_adaptor == 'CMOR':
            self.data_manager = CMORManager(self.config)
        elif self.config.data_adaptor == 'THREDDS':
            self.data_manager = THREDDSManager(self.config)
        else:
            raise ValueError('Unknown data adaptor: {0}'.format(self.config.data_adaptor))
        self.data_manager.prepare()

    def print_stats(self):
        """Log the total time spent on each diagnostic class, summed over all worker threads."""
        Log.info('Time consumed by each diagnostic class')
        Log.info('--------------------------------------')

        total = dict()
        for thread_times in self.time.values():
            for diag, elapsed in thread_times.items():
                if diag in total:
                    total[diag] += elapsed
                else:
                    total[diag] = elapsed
        for diag, elapsed in sorted(total.items(), key=operator.itemgetter(1)):
            Log.info('{0:23} {1:}', diag.__name__, elapsed)

    def print_errors(self):
        """Log the jobs that failed during the last run and flag the run as erroneous if there are any."""
        if not self._failed_jobs:
            return
        self.had_errors = True
        Log.error('Failed jobs')
        Log.error('-----------')
        for job in self._failed_jobs:
            Log.error(str(job))
        Log.info('')

    def prepare_job_list(self):
        """
        Create the jobs for every diagnostic command in the configuration.

        Commands naming an unknown diagnostic, or whose options are rejected, are logged and flag the run as
        erroneous; the remaining commands are still processed.

        :return: queue holding every generated job
        :rtype: Queue.Queue
        """
        list_jobs = Queue.Queue()
        for fulldiag in self.config.get_commands():
            Log.info("Adding {0} to diagnostic list", fulldiag)
            diag_options = fulldiag.split(',')
            diag_class = Diagnostic.get_diagnostic(diag_options[0])
            if not diag_class:
                Log.error('{0} is not an available diagnostic', diag_options[0])
                self.had_errors = True
                continue
            try:
                for job in diag_class.generate_jobs(self, diag_options):
                    list_jobs.put(job)
            except DiagnosticOptionError as ex:
                Log.error('Can not configure diagnostic {0}: {1}', diag_options[0], ex)
                self.had_errors = True
        return list_jobs

    @staticmethod
    def _register_diagnostics():
        """Register every available diagnostic class so that Diagnostic.get_diagnostic can find it."""
        EarthDiags._register_ocean_diagnostics()
        EarthDiags._register_general_diagnostics()
        EarthDiags._register_stats_diagnostics()

    @staticmethod
    def _register_stats_diagnostics():
        Diagnostic.register(MonthlyPercentile)
        Diagnostic.register(ClimatologicalPercentile)

    @staticmethod
    def _register_general_diagnostics():
        Diagnostic.register(MonthlyMean)
        Diagnostic.register(Rewrite)
        Diagnostic.register(Relink)

    @staticmethod
    def _register_ocean_diagnostics():
        Diagnostic.register(MixedLayerSaltContent)
        Diagnostic.register(Siasiesiv)
        Diagnostic.register(VerticalMean)
        Diagnostic.register(VerticalMeanMeters)
        Diagnostic.register(Interpolate)
        Diagnostic.register(InterpolateCDO)
        Diagnostic.register(Moc)
        Diagnostic.register(AreaMoc)
        Diagnostic.register(MaxMoc)
        Diagnostic.register(Psi)
        Diagnostic.register(Gyres)
        Diagnostic.register(ConvectionSites)
        Diagnostic.register(CutSection)
        Diagnostic.register(AverageSection)
        Diagnostic.register(MixedLayerHeatContent)
        Diagnostic.register(HeatContentLayer)
        Diagnostic.register(HeatContent)
        Diagnostic.register(RegionMean)
        Diagnostic.register(Rotation)

    def clean(self):
        """
        Remove the scratch folder and everything inside it.

        :return: always True, so that the command line exits successfully
        :rtype: bool
        """
        Log.info('Removing scratch folder...')
        if os.path.exists(self.config.scratch_dir):
            shutil.rmtree(self.config.scratch_dir)
        Log.result('Scratch folder removed')
        return True

    def report(self):
        """
        Write, for every startdate and member, a report of the variables that are not available.

        Reports are written to <scratch_dir>/<startdate>_fc<member>.report.

        :return: always True
        :rtype: bool
        """
        Log.info('Looking for existing vars...')
        self._prepare_data_manager()
        # The reports are written into the scratch folder, which may not exist yet
        self._prepare_scratch_dir()
        for startdate in self.config.experiment.startdates:
            for member in self.config.experiment.members:
                results = self._get_variable_report(startdate, member)
                report_path = os.path.join(self.config.scratch_dir, '{0}_fc{1}.report'.format(startdate, member))
                self.create_report(report_path, results)
        Log.result('Report finished')
        return True

    def _get_variable_report(self, startdate, member):
        """
        List the known variables that are missing for one startdate and member.

        Variables without priority or domain are skipped. Availability is checked with the data manager for
        chunk 1 of every table the variable belongs to.

        :return: (variable, table) pairs whose file does not exist
        :rtype: list
        """
        var_manager = VariableManager()
        results = list()
        for var in var_manager.get_all_variables():
            if var.priority is None or var.domain is None:
                continue
            for table in var.tables:
                if not self.data_manager.file_exists(var.domain, var.short_name, startdate, member, 1,
                                                     frequency=table.frequency):
                    results.append((var, table))
        return results

    def create_report(self, report_path, results):
        """
        Write the missing variables to a text file, grouped by table and then by priority.

        :param report_path: path of the report file; it is overwritten if it exists
        :type report_path: str
        :param results: (variable, table) pairs, as returned by _get_variable_report
        :type results: list
        """
        current_table = None
        current_priority = None
        # Single stable sort equivalent to sorting by short name, then priority, then table name
        results = sorted(results, key=lambda result: (result[1].name, result[0].priority, result[0].short_name))
        with open(report_path, 'w') as file_handler:
            for var, table in results:
                if current_table != table.name:
                    file_handler.write('\nTable {0}\n'.format(table.name))
                    file_handler.write('===================================\n')
                    current_table = table.name
                    current_priority = None
                if current_priority != var.priority:
                    file_handler.write('\nMissing variables with priority {0}:\n'.format(var.priority))
                    file_handler.write('--------------------------------------\n')
                    current_priority = var.priority
                file_handler.write('{0:12}: {1}\n'.format(var.short_name, var.standard_name))

    def _run_jobs(self, queue, numthread):
        """
        Worker loop: take jobs from the shared queue until it is empty.

        The time spent on each job is added to self.time[numthread]; jobs that still fail after their retries
        are appended to self._failed_jobs once the queue is drained.

        :param queue: jobs to run, shared by all the worker threads
        :type queue: Queue.Queue
        :param numthread: index of this worker, used as key of self.time
        :type numthread: int
        """
        def _run_job(current_job, retrials=1):
            # Try the job up to retrials + 1 times; return True as soon as one attempt succeeds
            while retrials >= 0:
                try:
                    Log.info('Starting {0}', current_job)
                    start = datetime.datetime.now()
                    # NOTE(review): the statement that executes the job is missing from the reviewed copy;
                    # Diagnostic.compute() is assumed here -- confirm against earthdiagnostics.diagnostic.
                    current_job.compute()
                    elapsed = datetime.datetime.now() - start
                    job_type = type(current_job)
                    if job_type in self.time[numthread]:
                        self.time[numthread][job_type] += elapsed
                    else:
                        self.time[numthread][job_type] = elapsed
                    return True
                except Exception as ex:
                    # Worker boundary: log the failure and retry instead of killing the thread
                    retrials -= 1
                    Log.error('Job {0} failed: {1}', current_job, ex)
            return False

        count = 0
        failed_jobs = list()
        # empty() and get() are not atomic together: another worker may take the last job in between,
        # hence the timeout and the Queue.Empty handler
        while not queue.empty():
            try:
                job = queue.get(timeout=1)
            except Queue.Empty:
                continue
            if _run_job(job):
                count += 1
            else:
                failed_jobs.append(job)
            queue.task_done()

        if not failed_jobs:
            Log.result('Thread {0} finished after taking care of {1} tasks', numthread, count)
        else:
            Log.result('Thread {0} finished after running successfully {1} of {2} tasks', numthread, count,
                       count + len(failed_jobs))
        self._failed_jobs += failed_jobs

    def _prepare_mesh_files(self):
        """
        Make the mesh and mask files for the experiment's model version available in the current folder.

        If scratch_masks is configured, the files are copied there and symlinked into the current folder;
        otherwise they are copied straight into the current folder. Missing source files are skipped with a
        warning.
        """
        Log.info('Copying mesh files')
        con_files = self.config.con_files
        model_version = self.config.experiment.model_version
        restore_meshes = self.config.restore_meshes

        mesh_mask = 'mesh_mask_nemo.{0}.nc'.format(model_version)
        new_mask_glo = 'new_maskglo.{0}.nc'.format(model_version)
        mask_regions = 'mask.regions.{0}.nc'.format(model_version)
        mask_regions_3d = 'mask.regions.3d.{0}.nc'.format(model_version)

        if self.config.scratch_masks:
            Utils.create_folder_tree(self.config.scratch_masks)
            mesh_mask_copy = os.path.join(self.config.scratch_masks, mesh_mask)
            if self._copy_file(os.path.join(con_files, mesh_mask), mesh_mask_copy, restore_meshes):
                self._link_file(mesh_mask_copy, 'mesh_hgr.nc')
                self._link_file(mesh_mask_copy, 'mesh_zgr.nc')
                self._link_file(mesh_mask_copy, 'mask.nc')
            # Each mask link must point at that mask's own copy, not at the mesh mask
            for mask_name, link_name in ((new_mask_glo, 'new_maskglo.nc'),
                                         (mask_regions, 'mask_regions.nc'),
                                         (mask_regions_3d, 'mask_regions.3d.nc')):
                mask_copy = os.path.join(self.config.scratch_masks, mask_name)
                if self._copy_file(os.path.join(con_files, mask_name), mask_copy, restore_meshes):
                    self._link_file(mask_copy, link_name)
        else:
            self._copy_file(os.path.join(con_files, mesh_mask), 'mesh_hgr.nc', restore_meshes)
            self._link_file('mesh_hgr.nc', 'mesh_zgr.nc')
            self._link_file('mesh_hgr.nc', 'mask.nc')
            self._copy_file(os.path.join(con_files, new_mask_glo), 'new_maskglo.nc', restore_meshes)
            self._copy_file(os.path.join(con_files, mask_regions), 'mask_regions.nc', restore_meshes)
            self._copy_file(os.path.join(con_files, mask_regions_3d), 'mask_regions.3d.nc', restore_meshes)

    def _copy_file(self, source, destiny, force):
        """
        Copy a mesh/mask file and rename its variables with self.dic_variables.

        :param source: file to copy
        :type source: str
        :param destiny: path of the copy
        :type destiny: str
        :param force: copy even if destiny already exists with the same size as source
        :type force: bool
        :return: True if destiny is available after the call, False if source does not exist
        :rtype: bool
        """
        if not os.path.exists(source):
            Log.user_warning('File {0} is not available for {1}', destiny, self.config.experiment.model_version)
            return False

        if not force and os.path.exists(destiny):
            # NOTE(review): if renaming variables changes the size of the copy, this check fails and the
            # file is copied again on every run -- confirm whether that is acceptable
            if os.stat(source).st_size == os.stat(destiny).st_size:
                Log.info('File {0} already exists', destiny)
                return True

        Log.info('Creating file {0}', destiny)
        shutil.copy(source, destiny)
        Log.info('File {0} ready', destiny)
        # Rename the variables of the file just copied; 'mesh_hgr.nc' may not even exist yet at this point
        Utils.rename_variables(destiny, self.dic_variables, False, True)
        return True

    def _link_file(self, source, destiny):
        """
        Create (or replace) the symlink destiny -> source.

        Does nothing but warn if source does not exist, so no dangling link is created.
        """
        if not os.path.exists(source):
            Log.user_warning('File {0} is not available for {1}', destiny, self.config.experiment.model_version)
            return
        if os.path.lexists(destiny):
            os.remove(destiny)
        os.symlink(source, destiny)
        Log.info('File {0} ready', destiny)
def main():
    """
    Console entry point: run EarthDiags.parse_args and exit with status 1 if it reports failure.
    """
    # sys.exit instead of the builtin exit(), which the site module only provides for interactive use
    import sys
    if not EarthDiags.parse_args():
        sys.exit(1)
if __name__ == "__main__":