# coding=utf-8
"""Earthdiagnostics workflow manager"""
import datetime
import operator
import sys
import threading
import time
import traceback

# noinspection PyCompatibility
from concurrent.futures import ThreadPoolExecutor
from functools import cmp_to_key
from threading import Thread

import six
from bscearth.utils.log import Log

from earthdiagnostics.datafile import StorageStatus, LocalStatus
from earthdiagnostics.diagnostic import DiagnosticStatus, Diagnostic, DiagnosticOptionError
from earthdiagnostics.utils import Utils, TempFile


class WorkManager(object):
    """
    Class to produce and control the workflow of EarthDiagnostics

    Parameters
    ----------
    config: Config
    data_manager: DataManager
    """

    # NOTE(review): class attribute kept from the original source; it appears
    # unused in this module but is preserved for backward compatibility.
    count = 0

    def __init__(self, config, data_manager):
        # One job list per status; jobs migrate between lists as their
        # status changes (see add_job / _job_status_changed).
        self.jobs = {}
        self.jobs[DiagnosticStatus.WAITING] = []
        self.jobs[DiagnosticStatus.READY] = []
        self.jobs[DiagnosticStatus.RUNNING] = []
        self.jobs[DiagnosticStatus.COMPLETED] = []
        self.jobs[DiagnosticStatus.FAILED] = []

        self.config = config
        # NOTE(review): self.time is never read or written elsewhere in this
        # module; kept for backward compatibility.
        self.time = {}
        self.had_errors = False
        self.data_manager = data_manager
        # Files whose upload to storage failed, reported in _print_errors
        self.uploads_failed = []

    def prepare_job_list(self):
        """Create the list of jobs to run"""
        self._register_diagnostics()
        for fulldiag in self.config.get_commands():
            Log.info("Adding {0} to diagnostic list", fulldiag)
            diag_options = fulldiag.split(',')
            diag_class = Diagnostic.get_diagnostic(diag_options[0])
            if diag_class:
                try:
                    for job in diag_class.generate_jobs(self, diag_options):
                        self.add_job(job)
                        for subjob in job.subjobs:
                            self.add_job(subjob)
                    continue
                except DiagnosticOptionError as ex:
                    Log.error('Can not configure diagnostic {0}: {1}', diag_options[0], ex)
                    self.had_errors = True
            else:
                Log.error('{0} is not an available diagnostic', diag_options[0])
                self.had_errors = True

    def add_job(self, job, old_status=None):
        """
        Add a job to the list matching its current status.

        If old_status is given, the job is first removed from that
        status list (used when a job transitions between states).
        """
        if old_status is not None:
            self.jobs[old_status].remove(job)
        if job not in self.jobs[job.status]:
            self.jobs[job.status].append(job)

    def run(self):
        """
        Run all the diagnostics

        Returns
        -------
        bool
            Only True if all diagnostic were correctly executed
        """
        if not self.jobs[DiagnosticStatus.WAITING] and not self.jobs[DiagnosticStatus.READY]:
            Log.result('No diagnostics to run')
            return True

        start_time = datetime.datetime.now()
        Log.info("Starting to compute at {0}", start_time)
        self.threads = Utils.available_cpu_count()
        if 0 < self.config.max_cores < self.threads:
            self.threads = self.config.max_cores
        Log.info('Using {0} threads', self.threads)

        self.downloader = Downloader()
        self.uploader = ThreadPoolExecutor(self.config.parallel_uploads)
        self.executor = ThreadPoolExecutor(self.threads)

        # self.lock is released by _check_completion when everything is done;
        # acquiring it twice makes the second acquire below block until then.
        self.lock = threading.Lock()
        self.lock.acquire()

        # BUG FIX: iterate over a copy of the WAITING list. Setting
        # job.status (or check_is_ready promoting a job to READY) fires
        # _job_status_changed, which removes the job from the very list
        # being iterated and would otherwise skip jobs.
        for job in list(self.jobs[DiagnosticStatus.WAITING]):
            job.request_data()
            job.declare_data_generated()
            job.subscribe(self, self._job_status_changed)
            if self.config.skip_diags_done:
                if job.can_skip_run():
                    Log.info('Diagnostic {0} already done. Skipping !', job)
                    job.status = DiagnosticStatus.COMPLETED
                    continue
            job.check_is_ready()

        for file_object in self.data_manager.requested_files.values():
            file_object.subscribe(self, self._file_object_status_changed)
            if file_object.download_required():
                self.downloader.submit(file_object)
        self.downloader.start()

        # Status printer thread: wakes up periodically until its lock is
        # released after all work has finished.
        printer_lock = threading.Lock()
        printer_lock.acquire()
        printer = Thread(target=self._print_status, name="Printer", args=(printer_lock,))
        printer.start()

        # Blocks until _check_completion releases the lock
        self.lock.acquire()

        printer_lock.release()
        self.downloader.shutdown()
        self.executor.shutdown()
        self.uploader.shutdown(True)
        printer.join()

        TempFile.clean()
        finish_time = datetime.datetime.now()
        Log.result("Diagnostics finished at {0}", finish_time)
        Log.result("Elapsed time: {0}\n", finish_time - start_time)
        self._print_errors()
        self._print_stats()
        return not self.had_errors

    def _print_status(self, lock):
        """Print job counters every minute until lock is released."""
        interval = 60
        while True:
            if six.PY3:
                # acquire with timeout doubles as an interruptible sleep
                if lock.acquire(blocking=True, timeout=interval):
                    return
            else:
                # Python 2 Lock.acquire has no timeout: poll in small steps
                step = 0.5
                for _ in range(int(interval / step)):
                    if lock.acquire(blocking=False):
                        return
                    # BUG FIX: was sys.sleep(step); sys has no sleep function
                    time.sleep(step)
            Log.info('Current status:')
            Log.info('===============')
            Log.info('Waiting: {0:4}', len(self.jobs[DiagnosticStatus.WAITING]))
            Log.info('Ready to run: {0:4}', len(self.jobs[DiagnosticStatus.READY]))
            Log.info('Running: {0:4}', len(self.jobs[DiagnosticStatus.RUNNING]))
            Log.info('Completed: {0:4}', len(self.jobs[DiagnosticStatus.COMPLETED]))
            Log.info('Failed: {0:4}', len(self.jobs[DiagnosticStatus.FAILED]))

    def _job_status_changed(self, job, old_status):
        """Callback fired by a job whenever its status changes."""
        self.add_job(job, old_status)
        if job.status == DiagnosticStatus.READY:
            self.executor.submit(self._run_job, job)
        # Release any request this job was the last subscriber of
        for request in job._requests:
            if request.only_suscriber(self):
                del self.data_manager.requested_files[request.remote_file]
                request.unsubscribe(self)
                request.clean_local()
        self._check_completion()

    def _file_object_status_changed(self, file_object):
        """Callback fired by a data file whenever its status changes."""
        # BUG FIX: added missing space before the second placeholder
        Log.debug('Checking file {0}. Local status {0.local_status} Storage status {0.storage_status}', file_object)
        if file_object.download_required():
            self.downloader.submit(file_object)
            return
        if file_object.upload_required():
            file_object.storage_status = StorageStatus.UPLOADING
            self.uploader.submit(file_object.upload)
            return
        if file_object.storage_status == StorageStatus.FAILED:
            self.uploads_failed.append(file_object)
        # Drop the file once nothing else is using it
        if file_object.local_status != LocalStatus.COMPUTING and \
                file_object.storage_status != StorageStatus.UPLOADING and \
                file_object.only_suscriber(self):
            del self.data_manager.requested_files[file_object.remote_file]
            file_object.unsubscribe(self)
            file_object.clean_local()
        self._check_completion()

    def _check_completion(self):
        """Release the main lock (ending run()) if no work remains."""
        self._pause_downloader_if_required()
        if self._jobs_running_or_ready():
            return False
        if self._data_downloading_or_uploading():
            return False
        try:
            self.lock.release()
        except threading.ThreadError:
            # Already released by a concurrent call: nothing to do
            pass
        return True

    def _jobs_running_or_ready(self):
        """Return True while any job can still make progress."""
        if self.jobs[DiagnosticStatus.READY] or self.jobs[DiagnosticStatus.RUNNING]:
            return True
        for job in self.jobs[DiagnosticStatus.WAITING]:
            if job.all_requests_in_storage():
                return True
        return False

    def _data_downloading_or_uploading(self):
        """Return True while any data transfer is pending or in flight."""
        for request in self.data_manager.requested_files.values():
            if request.storage_status == StorageStatus.UPLOADING:
                return True
            if request.local_status == LocalStatus.DOWNLOADING:
                return True
            if request.upload_required():
                return True
            if request.download_required():
                return True
        return False

    def _pause_downloader_if_required(self):
        # Avoid hoarding local disk: stop downloading while there is a
        # large backlog of jobs ready to run
        self.downloader.on_hold = len(self.jobs[DiagnosticStatus.READY]) > 20

    def _print_stats(self):
        """Log total time consumed per diagnostic alias, fastest first."""
        Log.info('Time consumed by each diagnostic class')
        Log.info('--------------------------------------')
        times = {}
        for job in self.jobs[DiagnosticStatus.COMPLETED] + self.jobs[DiagnosticStatus.FAILED]:
            job_type = job.alias
            if job_type in times.keys():
                times[job_type] += job.consumed_time
            else:
                times[job_type] = job.consumed_time
        # BUG FIX: the original sorted the dict's keys (strings) with
        # itemgetter(1), i.e. by the alias' second character. Sort the
        # (alias, elapsed) pairs by elapsed time instead.
        for diag, elapsed in sorted(times.items(), key=operator.itemgetter(1)):
            Log.info('{0:23} {1:}', diag, elapsed)

    def _print_errors(self):
        """Log failed jobs and failed uploads; flag that errors happened."""
        if self.jobs[DiagnosticStatus.FAILED]:
            self.had_errors = True
            Log.error('Failed jobs')
            Log.error('-----------')
            for job in self.jobs[DiagnosticStatus.FAILED]:
                Log.error('{0}: {0.message}', job)
            Log.error('Total wasted time: {0}', sum([job.consumed_time for job in self.jobs[DiagnosticStatus.FAILED]],
                                                    datetime.timedelta()))
            Log.info('')
        if self.uploads_failed:
            self.had_errors = True
            Log.error('Failed uploads')
            Log.error('--------------')
            for file_object in self.uploads_failed:
                Log.error('{0}', file_object.remote_file)
            Log.info('')

    @staticmethod
    def _run_job(job):
        """Execute one diagnostic, recording its elapsed time and outcome."""
        start_time = datetime.datetime.now()
        try:
            Log.info('Starting {0}', job)
            job.status = DiagnosticStatus.RUNNING
            job.compute()
        except Exception as ex:
            job.consumed_time = datetime.datetime.now() - start_time
            exc_type, _, exc_traceback = sys.exc_info()
            job.message = '{0}\n{1}'.format(ex, ''.join(traceback.format_tb(exc_traceback)))
            Log.error('Job {0} failed ({2}): {1}', job, job.message, exc_type)
            job.status = DiagnosticStatus.FAILED
            return False
        job.consumed_time = datetime.datetime.now() - start_time
        Log.result('Finished {0}', job)
        job.status = DiagnosticStatus.COMPLETED
        return True

    @staticmethod
    def _register_diagnostics():
        """Register all available diagnostics with the Diagnostic factory."""
        WorkManager._register_ocean_diagnostics()
        WorkManager._register_general_diagnostics()
        WorkManager._register_stats_diagnostics()

    @staticmethod
    def _register_stats_diagnostics():
        """Register statistics diagnostics."""
        from earthdiagnostics.statistics.monthlypercentile import MonthlyPercentile
        from earthdiagnostics.statistics.climatologicalpercentile import ClimatologicalPercentile
        from earthdiagnostics.statistics.daysoverpercentile import DaysOverPercentile
        from earthdiagnostics.statistics.discretize import Discretize
        Diagnostic.register(MonthlyPercentile)
        Diagnostic.register(ClimatologicalPercentile)
        Diagnostic.register(DaysOverPercentile)
        Diagnostic.register(Discretize)

    @staticmethod
    def _register_general_diagnostics():
        """Register general-purpose diagnostics."""
        from earthdiagnostics.general.attribute import Attribute
        from earthdiagnostics.general.timemean import DailyMean, MonthlyMean, YearlyMean
        from earthdiagnostics.general.module import Module
        from earthdiagnostics.general.rewrite import Rewrite
        from earthdiagnostics.general.relink import Relink
        from earthdiagnostics.general.relinkall import RelinkAll
        from earthdiagnostics.general.scale import Scale
        from earthdiagnostics.general.verticalmeanmetersiris import VerticalMeanMetersIris
        from earthdiagnostics.general.simplify_dimensions import SimplifyDimensions
        Diagnostic.register(DailyMean)
        Diagnostic.register(MonthlyMean)
        Diagnostic.register(YearlyMean)
        Diagnostic.register(Rewrite)
        Diagnostic.register(Relink)
        Diagnostic.register(RelinkAll)
        Diagnostic.register(Scale)
        Diagnostic.register(Attribute)
        Diagnostic.register(Module)
        Diagnostic.register(VerticalMeanMetersIris)
        Diagnostic.register(SimplifyDimensions)

    @staticmethod
    def _register_ocean_diagnostics():
        """Register ocean diagnostics."""
        from earthdiagnostics.ocean.mixedlayerheatcontent import MixedLayerHeatContent
        from earthdiagnostics.ocean.mixedlayersaltcontent import MixedLayerSaltContent
        from earthdiagnostics.ocean.siasiesiv import Siasiesiv
        from earthdiagnostics.ocean.verticalmean import VerticalMean
        from earthdiagnostics.ocean.verticalmeanmeters import VerticalMeanMeters
        from earthdiagnostics.ocean.verticalgradient import VerticalGradient
        from earthdiagnostics.ocean.interpolate import Interpolate
        from earthdiagnostics.ocean.interpolatecdo import InterpolateCDO
        from earthdiagnostics.ocean.moc import Moc
        from earthdiagnostics.ocean.areamoc import AreaMoc
        from earthdiagnostics.ocean.maxmoc import MaxMoc
        from earthdiagnostics.ocean.psi import Psi
        from earthdiagnostics.ocean.gyres import Gyres
        from earthdiagnostics.ocean.convectionsites import ConvectionSites
        from earthdiagnostics.ocean.cutsection import CutSection
        from earthdiagnostics.ocean.averagesection import AverageSection
        from earthdiagnostics.ocean.heatcontentlayer import HeatContentLayer
        from earthdiagnostics.ocean.heatcontent import HeatContent
        from earthdiagnostics.ocean.regionmean import RegionMean
        from earthdiagnostics.ocean.regionsum import RegionSum
        from earthdiagnostics.ocean.rotation import Rotation
        Diagnostic.register(MixedLayerSaltContent)
        Diagnostic.register(Siasiesiv)
        Diagnostic.register(VerticalMean)
        Diagnostic.register(VerticalMeanMeters)
        Diagnostic.register(Interpolate)
        Diagnostic.register(InterpolateCDO)
        Diagnostic.register(Moc)
        Diagnostic.register(AreaMoc)
        Diagnostic.register(MaxMoc)
        Diagnostic.register(Psi)
        Diagnostic.register(Gyres)
        Diagnostic.register(ConvectionSites)
        Diagnostic.register(CutSection)
        Diagnostic.register(AverageSection)
        Diagnostic.register(MixedLayerHeatContent)
        Diagnostic.register(HeatContentLayer)
        Diagnostic.register(HeatContent)
        Diagnostic.register(RegionMean)
        Diagnostic.register(RegionSum)
        Diagnostic.register(Rotation)
        Diagnostic.register(VerticalGradient)


class Downloader(object):
    """
    Download manager for EarthDiagnostics

    We are not using a ThreadPoolExecutor because we want to be able
    to control priorities in the download
    """

    def __init__(self):
        self._downloads = []
        self._lock = threading.Lock()
        self.stop = False
        self.on_hold = False

    def start(self):
        """Create the downloader thread and initialize it"""
        self._thread = threading.Thread(target=self._downloader)
        self._thread.start()

    def submit(self, datafile):
        """Add a datafile to the download queue"""
        self._lock.acquire()
        self._downloads.append(datafile)
        self._lock.release()

    def _downloader(self):
        """Main loop: pop the highest-priority file and download it."""
        try:
            while True:
                datafile = None
                with self._lock:
                    if self._downloads and not self.on_hold:
                        # Prioritize among (at most) the first 100 entries to
                        # bound the cost of sorting on large queues
                        candidates = self._downloads[:100]
                        candidates.sort(key=cmp_to_key(Downloader._prioritize))
                        datafile = candidates[0]
                        self._downloads.remove(datafile)
                    elif self.stop:
                        return
                if datafile is None:
                    # BUG FIX (perf): sleep outside the lock so submit()
                    # is not blocked while the queue is idle
                    time.sleep(0.1)
                    continue
                datafile.download()
        except Exception as ex:
            # BUG FIX: was traceback.print_exc(), which prints to stderr and
            # returns None, so the log message ended in "None"
            Log.critical('Unhandled error at downloader: {0}\n{1}', ex, traceback.format_exc())

    @staticmethod
    def _suscribers_waiting(datafile):
        """Count subscribed diagnostics that only wait for this file."""
        waiting = 0
        for diag in datafile.suscribers:
            if not isinstance(diag, Diagnostic):
                continue
            if diag.pending_requests() == 1:
                waiting += 1
        return waiting

    @staticmethod
    def _prioritize(datafile1, datafile2):
        """
        Comparator for the download queue.

        Order: more diagnostics unblocked first, then more subscribers,
        then known sizes before unknown, smaller files first.
        """
        waiting = Downloader._suscribers_waiting(datafile1) - Downloader._suscribers_waiting(datafile2)
        if waiting:
            return -waiting
        suscribers = len(datafile1.suscribers) - len(datafile2.suscribers)
        if suscribers:
            return -suscribers
        if datafile1.size is None:
            if datafile2.size is None:
                return 0
            else:
                return 1
        elif datafile2.size is None:
            return -1
        size = datafile1.size - datafile2.size
        if size:
            return size
        return 0

    def shutdown(self):
        """Stop the downloader after all downloads have finished"""
        self.stop = True
        self._thread.join()