# coding=utf-8
"""Earthdiagnostics workflow manager"""
import time
import datetime
import operator
import sys
import threading
import traceback

# noinspection PyCompatibility
from concurrent.futures import ThreadPoolExecutor
from functools import cmp_to_key
from threading import Thread

import six
from bscearth.utils.log import Log

from earthdiagnostics.datafile import StorageStatus, LocalStatus
from earthdiagnostics.diagnostic import (
    DiagnosticStatus,
    Diagnostic,
    DiagnosticOptionError,
)
from earthdiagnostics.utils import Utils, TempFile


class WorkManager(object):
    """
    Class to produce and control the workflow of EarthDiagnostics

    Parameters
    ----------
    config: Config
    data_manager: DataManager
    """

    def __init__(self, config, data_manager):
        # Jobs are bucketed by their current status; add_job moves them
        # between buckets when a status change is notified.
        self.jobs = {
            DiagnosticStatus.WAITING: [],
            DiagnosticStatus.READY: [],
            DiagnosticStatus.RUNNING: [],
            DiagnosticStatus.COMPLETED: [],
            DiagnosticStatus.FAILED: [],
        }
        self.config = config
        self.time = {}
        self.had_errors = False
        self.data_manager = data_manager
        self.uploads_failed = []

    def prepare_job_list(self):
        """Create the list of jobs to run"""
        self._register_diagnostics()
        for fulldiag in self.config.get_commands():
            Log.info("Adding {0} to diagnostic list", fulldiag)
            diag_options = fulldiag.split(",")
            diag_class = Diagnostic.get_diagnostic(diag_options[0])
            if not diag_class:
                Log.error(
                    "{0} is not an available diagnostic", diag_options[0]
                )
                self.had_errors = True
                continue
            try:
                for job in diag_class.generate_jobs(self, diag_options):
                    self.add_job(job)
                    for subjob in job.subjobs:
                        self.add_job(subjob)
            except DiagnosticOptionError as ex:
                Log.error(
                    "Can not configure diagnostic {0}: {1}",
                    diag_options[0],
                    ex,
                )
                self.had_errors = True

    def add_job(self, job, old_status=None):
        """
        Place a job in the list that matches its current status

        Parameters
        ----------
        job: Diagnostic
            Job to store under ``job.status``
        old_status: DiagnosticStatus or None, optional
            If given, the job is first removed from the list of that status
        """
        if old_status is not None:
            self.jobs[old_status].remove(job)
        if job not in self.jobs[job.status]:
            self.jobs[job.status].append(job)

    def run(self):
        """
        Run all the diagnostics

        Returns
        -------
        bool
            Only True if all diagnostic were correctly executed
        """
        if (
            not self.jobs[DiagnosticStatus.WAITING]
            and not self.jobs[DiagnosticStatus.READY]
        ):
            Log.result("No diagnostics to run")
            return True

        start_time = datetime.datetime.now()
        Log.info("Starting to compute at {0}", start_time)

        self.threads = Utils.available_cpu_count()
        if 0 < self.config.max_cores < self.threads:
            self.threads = self.config.max_cores
        Log.info("Using {0} threads", self.threads)

        self.downloader = Downloader()
        self.uploader = ThreadPoolExecutor(self.config.parallel_uploads)
        self.executor = ThreadPoolExecutor(self.threads)

        # The lock is taken here and taken again further down: the second
        # acquire blocks until _check_completion releases it.
        self.lock = threading.Lock()
        self.lock.acquire()

        # Iterate over a snapshot: status changes triggered inside the loop
        # move jobs out of the WAITING list. list() is used instead of
        # list.copy() because the latter does not exist on Python 2.
        for job in list(self.jobs[DiagnosticStatus.WAITING]):
            job.request_data()
            job.declare_data_generated()
            job.subscribe(self, self._job_status_changed)
            if self.config.skip_diags_done:
                if job.can_skip_run():
                    Log.info("Diagnostic {0} already done. Skipping !", job)
                    job.status = DiagnosticStatus.COMPLETED
                    continue
            job.check_is_ready()

        for file_object in self.data_manager.requested_files.values():
            file_object.subscribe(self, self._file_object_status_changed)
            if file_object.download_required():
                self.downloader.submit(file_object)

        self.downloader.start()

        # The printer thread stops as soon as it manages to acquire this
        # lock, i.e. once it is released below.
        printer_lock = threading.Lock()
        printer_lock.acquire()
        printer = Thread(
            target=self._print_status, name="Printer", args=(printer_lock,)
        )
        printer.start()

        # Wait until _check_completion signals that everything is done
        self.lock.acquire()

        printer_lock.release()
        self.downloader.shutdown()
        self.executor.shutdown()
        self.uploader.shutdown(True)
        printer.join()

        TempFile.clean()
        finish_time = datetime.datetime.now()
        Log.result("Diagnostics finished at {0}", finish_time)
        Log.result("Elapsed time: {0}\n", finish_time - start_time)
        self._print_errors()
        self._print_stats()
        return not self.had_errors

    def _print_status(self, lock):
        """
        Log the number of jobs in each status every 60 seconds

        Returns as soon as `lock` can be acquired.

        Parameters
        ----------
        lock: threading.Lock
            Lock held by the caller while the status must keep printing
        """
        interval = 60
        while True:
            if six.PY3:
                if lock.acquire(blocking=True, timeout=interval):
                    return
            else:
                # Python 2 locks have no timeout: poll in short steps
                step = 0.5
                for _ in range(int(interval / step)):
                    if lock.acquire(False):
                        return
                    time.sleep(step)

            Log.info("Current status:")
            Log.info("===============")
            Log.info(
                "Waiting: {0:4}", len(self.jobs[DiagnosticStatus.WAITING])
            )
            Log.info(
                "Ready to run: {0:4}", len(self.jobs[DiagnosticStatus.READY])
            )
            Log.info(
                "Running: {0:4}", len(self.jobs[DiagnosticStatus.RUNNING])
            )
            Log.info(
                "Completed: {0:4}",
                len(self.jobs[DiagnosticStatus.COMPLETED]),
            )
            Log.info(
                "Failed: {0:4}", len(self.jobs[DiagnosticStatus.FAILED])
            )
            Log.info("===============")

    def _job_status_changed(self, job, old_status):
        """
        React to a status change notified by a job

        Moves the job to the list of its new status, submits it to the
        executor if it became ready, releases the requests that only this
        manager is still subscribed to and checks for completion.

        Parameters
        ----------
        job: Diagnostic
        old_status: DiagnosticStatus
        """
        self.add_job(job, old_status)
        if job.status == DiagnosticStatus.READY:
            self.executor.submit(self._run_job, job)
        for request in job._requests:
            if request.only_suscriber(self):
                del self.data_manager.requested_files[request.remote_file]
                request.unsubscribe(self)
                request.clean_local()
        self._check_completion()

    def _file_object_status_changed(self, file_object):
        """
        React to a status change notified by a data file

        Queues the download or upload if one is required. Otherwise records
        failed uploads, releases the file when this manager is its only
        subscriber and checks for completion.

        Parameters
        ----------
        file_object: DataFile
        """
        Log.debug(
            "Checking file {0}. Local status {0.local_status} "
            "Storage status{0.storage_status}",
            file_object,
        )
        if file_object.download_required():
            self.downloader.submit(file_object)
            return
        if file_object.upload_required():
            file_object.storage_status = StorageStatus.UPLOADING
            self.uploader.submit(file_object.upload)
            return
        if file_object.storage_status == StorageStatus.FAILED:
            self.uploads_failed.append(file_object)
        if (
            file_object.local_status != LocalStatus.COMPUTING
            and file_object.storage_status != StorageStatus.UPLOADING
            and file_object.only_suscriber(self)
        ):
            del self.data_manager.requested_files[file_object.remote_file]
            file_object.unsubscribe(self)
            file_object.clean_local()
        self._check_completion()

    def _check_completion(self):
        """
        Release the main lock if there is no work left

        Returns
        -------
        bool
            False if there are jobs or transfers still pending, True if the
            lock release was attempted
        """
        self._pause_downloader_if_required()
        if self._jobs_running_or_ready():
            return False
        if self._data_downloading_or_uploading():
            return False
        try:
            self.lock.release()
        except threading.ThreadError:
            # Deliberately ignored: completion can be detected more than
            # once and the lock may already have been released.
            pass
        return True

    def _jobs_running_or_ready(self):
        """
        Check if any job is running, ready or able to become ready

        Returns
        -------
        bool
            True if there are ready or running jobs, or if a waiting job
            has all its requests in storage
        """
        if (
            self.jobs[DiagnosticStatus.READY]
            or self.jobs[DiagnosticStatus.RUNNING]
        ):
            return True
        return any(
            job.all_requests_in_storage()
            for job in self.jobs[DiagnosticStatus.WAITING]
        )

    def _data_downloading_or_uploading(self):
        """
        Check if any requested file has a transfer ongoing or pending

        Returns
        -------
        bool
        """
        for request in self.data_manager.requested_files.values():
            if request.storage_status == StorageStatus.UPLOADING:
                return True
            if request.local_status == LocalStatus.DOWNLOADING:
                return True
            if request.upload_required():
                return True
            if request.download_required():
                return True
        return False

    def _pause_downloader_if_required(self):
        """Put the downloader on hold while more than 20 jobs are ready"""
        self.downloader.on_hold = len(self.jobs[DiagnosticStatus.READY]) > 20

    def _print_stats(self):
        """Log the time consumed per diagnostic alias, shortest first"""
        Log.info("Time consumed by each diagnostic class")
        Log.info("--------------------------------------")

        times = {}
        finished_jobs = (
            self.jobs[DiagnosticStatus.COMPLETED]
            + self.jobs[DiagnosticStatus.FAILED]
        )
        for job in finished_jobs:
            job_type = job.alias
            if job_type in times:
                times[job_type] += job.consumed_time
            else:
                times[job_type] = job.consumed_time

        # Sort the (alias, time) pairs by time. Sorting the bare keys with
        # itemgetter(1) would order them by the second character of the
        # alias instead.
        for diag, consumed in sorted(
            times.items(), key=operator.itemgetter(1)
        ):
            Log.info("{0:23} {1:}", diag, consumed)

    def _print_errors(self):
        """Log failed jobs and failed uploads, flagging had_errors if any"""
        failed_jobs = self.jobs[DiagnosticStatus.FAILED]
        if failed_jobs:
            self.had_errors = True
            Log.error("Failed jobs")
            Log.error("-----------")
            for job in failed_jobs:
                Log.error("{0}: {0.message}", job)
            Log.error(
                "Total wasted time: {0}",
                sum(
                    [job.consumed_time for job in failed_jobs],
                    datetime.timedelta(),
                ),
            )
            Log.info("")

        if self.uploads_failed:
            self.had_errors = True
            Log.error("Failed uploads")
            Log.error("--------------")
            for file_object in self.uploads_failed:
                Log.error("{0}", file_object.remote_file)
            Log.info("")

    @staticmethod
    def _run_job(job):
        """
        Compute a job, recording its elapsed time and final status

        Parameters
        ----------
        job: Diagnostic

        Returns
        -------
        bool
            True if the job finished, False if it raised an exception
        """
        start_time = datetime.datetime.now()
        try:
            Log.info("Starting {0}", job)
            job.status = DiagnosticStatus.RUNNING
            job.compute()
        except Exception as ex:
            job.consumed_time = datetime.datetime.now() - start_time
            exc_type, _, exc_traceback = sys.exc_info()
            job.message = "{0}\n{1}".format(
                ex, "".join(traceback.format_tb(exc_traceback))
            )
            Log.error("Job {0} failed ({2}): {1}", job, job.message, exc_type)
            job.status = DiagnosticStatus.FAILED
            return False

        job.consumed_time = datetime.datetime.now() - start_time
        Log.result("Finished {0}", job)
        job.status = DiagnosticStatus.COMPLETED
        return True

    # Not referenced anywhere in this module; kept for backward compatibility
    count = 0

    @staticmethod
    def _register_diagnostics():
        """Register every diagnostic class known to this module"""
        WorkManager._register_ocean_diagnostics()
        WorkManager._register_general_diagnostics()
        WorkManager._register_stats_diagnostics()

    @staticmethod
    def _register_stats_diagnostics():
        """Register the diagnostics of the statistics package"""
        from earthdiagnostics.statistics.monthlypercentile import (
            MonthlyPercentile,
        )
        from earthdiagnostics.statistics.climatologicalpercentile import (
            ClimatologicalPercentile,
        )
        from earthdiagnostics.statistics.daysoverpercentile import (
            DaysOverPercentile,
        )
        from earthdiagnostics.statistics.discretize import Discretize

        Diagnostic.register(MonthlyPercentile)
        Diagnostic.register(ClimatologicalPercentile)
        Diagnostic.register(DaysOverPercentile)
        Diagnostic.register(Discretize)

    @staticmethod
    def _register_general_diagnostics():
        """Register the diagnostics of the general package"""
        from earthdiagnostics.general.attribute import Attribute
        from earthdiagnostics.general.timemean import (
            DailyMean,
            MonthlyMean,
            YearlyMean,
        )
        from earthdiagnostics.general.module import Module
        from earthdiagnostics.general.rewrite import Rewrite
        from earthdiagnostics.general.scale import Scale
        from earthdiagnostics.general.verticalmeanmetersiris import (
            VerticalMeanMetersIris,
        )
        from earthdiagnostics.general.simplify_dimensions import (
            SimplifyDimensions,
        )

        Diagnostic.register(DailyMean)
        Diagnostic.register(MonthlyMean)
        Diagnostic.register(YearlyMean)
        Diagnostic.register(Rewrite)
        Diagnostic.register(Scale)
        Diagnostic.register(Attribute)
        Diagnostic.register(Module)
        Diagnostic.register(VerticalMeanMetersIris)
        Diagnostic.register(SimplifyDimensions)

    @staticmethod
    def _register_ocean_diagnostics():
        """Register the diagnostics of the ocean package"""
        from .ocean.mixedlayerheatcontent import MixedLayerHeatContent
        from .ocean.mixedlayersaltcontent import MixedLayerSaltContent
        from .ocean.siasiesiv import Siasiesiv
        from .ocean.verticalmean import VerticalMean
        from .ocean.verticalmeanmeters import VerticalMeanMeters
        from .ocean.verticalgradient import VerticalGradient
        from .ocean.indices import Indices
        from .ocean.interpolate import Interpolate
        from .ocean.interpolatecdo import InterpolateCDO
        from .ocean.moc import Moc
        from .ocean.areamoc import AreaMoc
        from .ocean.maxmoc import MaxMoc
        from .ocean.psi import Psi
        from .ocean.gyres import Gyres
        from .ocean.convectionsites import ConvectionSites
        from .ocean.cutsection import CutSection
        from .ocean.averagesection import AverageSection
        from .ocean.heatcontentlayer import HeatContentLayer
        from .ocean.heatcontent import HeatContent
        from .ocean.regionmean import RegionMean
        from .ocean.regionsum import RegionSum
        from .ocean.rotation import Rotation
        from .ocean.sivolume import Sivolume
        from .ocean.sivol2d import Sivol2d
        from .ocean.zonalmean import ZonalMean
        from .ocean.density import Density

        Diagnostic.register(MixedLayerSaltContent)
        Diagnostic.register(Siasiesiv)
        Diagnostic.register(VerticalMean)
        Diagnostic.register(VerticalMeanMeters)
        Diagnostic.register(Indices)
        Diagnostic.register(Interpolate)
        Diagnostic.register(InterpolateCDO)
        Diagnostic.register(Moc)
        Diagnostic.register(AreaMoc)
        Diagnostic.register(MaxMoc)
        Diagnostic.register(Psi)
        Diagnostic.register(Gyres)
        Diagnostic.register(ConvectionSites)
        Diagnostic.register(CutSection)
        Diagnostic.register(AverageSection)
        Diagnostic.register(MixedLayerHeatContent)
        Diagnostic.register(HeatContentLayer)
        Diagnostic.register(HeatContent)
        Diagnostic.register(RegionMean)
        Diagnostic.register(RegionSum)
        Diagnostic.register(Rotation)
        Diagnostic.register(VerticalGradient)
        Diagnostic.register(Sivolume)
        Diagnostic.register(Sivol2d)
        Diagnostic.register(ZonalMean)
        Diagnostic.register(Density)


class Downloader(object):
    """
    Download manager for EarthDiagnostics

    We are not using a ThreadPoolExecutor because we want to be able to
    control priorities in the download
    """

    def __init__(self):
        self._downloads = []
        # Protects access to the _downloads queue
        self._lock = threading.Lock()
        self.stop = False
        self.on_hold = False

    def start(self):
        """Create the downloader thread and initialize it"""
        self._thread = threading.Thread(target=self._downloader)
        self._thread.start()

    def submit(self, datafile):
        """Add a datafile to the download queue"""
        with self._lock:
            self._downloads.append(datafile)

    def _downloader(self):
        """
        Download queued files by priority until asked to stop

        The loop exits when `stop` is set and the queue is empty or the
        downloader is on hold. Any unexpected exception is logged as
        critical and ends the thread.
        """
        try:
            while True:
                datafile = None
                with self._lock:
                    if self._downloads and not self.on_hold:
                        self._downloads.sort(
                            key=cmp_to_key(Downloader._prioritize)
                        )
                        datafile = self._downloads.pop(0)
                if datafile is None:
                    if self.stop:
                        return
                    # Sleep outside the lock so that submit() is never
                    # blocked while the downloader is idle.
                    time.sleep(0.1)
                    continue
                datafile.download()
        except Exception as ex:
            # format_exc returns the traceback text; print_exc would
            # return None and log the string "None" instead.
            Log.critical(
                "Unhandled error at downloader: {0}\n{1}",
                ex,
                traceback.format_exc(),
            )

    @staticmethod
    def _suscribers_waiting(datafile):
        """
        Count the diagnostics that are only missing this datafile

        Parameters
        ----------
        datafile: DataFile

        Returns
        -------
        int
            Number of Diagnostic subscribers with exactly one pending
            request. A datafile whose subscribers are None counts as 0.
        """
        waiting = 0
        for diag in datafile.suscribers or ():
            if not isinstance(diag, Diagnostic):
                continue
            if diag.pending_requests() == 1:
                waiting += 1
        return waiting

    @staticmethod
    def _prioritize(datafile1, datafile2):
        """
        Compare two datafiles for download ordering

        Files with more diagnostics waiting only for them go first; ties
        are broken by the total number of subscribers.

        Parameters
        ----------
        datafile1: DataFile
        datafile2: DataFile

        Returns
        -------
        int
            Negative if datafile1 must be downloaded first, positive if
            datafile2 must be, 0 if they have the same priority
        """
        waiting = Downloader._suscribers_waiting(
            datafile1
        ) - Downloader._suscribers_waiting(datafile2)
        if waiting:
            return -waiting

        if datafile1.suscribers is None:
            suscribers1 = 0
        else:
            suscribers1 = len(datafile1.suscribers)

        if datafile2.suscribers is None:
            suscribers2 = 0
        else:
            suscribers2 = len(datafile2.suscribers)

        suscribers = suscribers1 - suscribers2
        if suscribers:
            return -suscribers
        return 0

    def shutdown(self):
        """Stop the downloader after all downloads have finished"""
        self.stop = True
        self._thread.join()