# coding=utf-8
import datetime
import functools
import operator
import sys
import threading
import time
import traceback

from bscearth.utils.log import Log
# noinspection PyCompatibility
from concurrent.futures import ThreadPoolExecutor

from earthdiagnostics.datafile import StorageStatus, LocalStatus
from earthdiagnostics.diagnostic import DiagnosticStatus, Diagnostic, DiagnosticOptionError
from earthdiagnostics.utils import Utils, TempFile


class WorkManager(object):

    def __init__(self, config, data_manager):
        self.jobs = None
        self.config = config
        self.time = {}
        self.had_errors = False
        self.data_manager = data_manager

    def prepare_job_list(self):
        """Parse the configured commands and build the list of jobs to run."""
        self._register_diagnostics()
        list_jobs = list()
        for fulldiag in self.config.get_commands():
            Log.info("Adding {0} to diagnostic list", fulldiag)
            diag_options = fulldiag.split(',')
            diag_class = Diagnostic.get_diagnostic(diag_options[0])
            if diag_class:
                try:
                    for job in diag_class.generate_jobs(self, diag_options):
                        list_jobs.append(job)
                        for subjob in job.subjobs:
                            list_jobs.append(subjob)
                except DiagnosticOptionError as ex:
                    Log.error('Cannot configure diagnostic {0}: {1}', diag_options[0], ex)
                    self.had_errors = True
            else:
                Log.error('{0} is not an available diagnostic', diag_options[0])
                self.had_errors = True
        self.jobs = list_jobs

    def run(self):
        """Run all prepared jobs. Returns True if no job failed."""
        if len(self.jobs) == 0:
            Log.result('No diagnostics to run')
            return True
        start_time = datetime.datetime.now()
        Log.info("Starting to compute at {0}", start_time)
        self.threads = Utils.available_cpu_count()
        if 0 < self.config.max_cores < self.threads:
            self.threads = self.config.max_cores
        Log.info('Using {0} threads', self.threads)

        self.downloader = Downloader()
        self.uploader = ThreadPoolExecutor(self.config.parallel_uploads)
        self.executor = ThreadPoolExecutor(self.threads)

        for job in self.jobs:
            job.request_data()
            job.declare_data_generated()
            job.subscribe(self, self._job_status_changed)
            job.check_is_ready()

        if self.config.skip_diags_done:
            for job in self.jobs:
                if job.can_skip_run():
                    Log.info('Diagnostic {0} already done. Skipping!', job)
                    job.status = DiagnosticStatus.COMPLETED

        for file_object in self.data_manager.requested_files.values():
            file_object.subscribe(self, self._file_object_status_changed)
            if file_object.download_required():
                self.downloader.submit(file_object)

        self.downloader.start()
        self.lock = threading.Lock()
        self.lock.acquire()
        # self.check_completion()

        # This second acquire blocks until check_completion() releases the
        # lock, which happens once every job and transfer has finished
        self.lock.acquire()

        self.downloader.shutdown()
        self.executor.shutdown()
        self.uploader.shutdown(True)
        TempFile.clean()

        finish_time = datetime.datetime.now()
        Log.result("Diagnostics finished at {0}", finish_time)
        Log.result("Elapsed time: {0}\n", finish_time - start_time)
        self.print_errors()
        self.print_stats()
        return not self.had_errors

    def _job_status_changed(self, job):
        if job.status == DiagnosticStatus.READY:
            self.executor.submit(self._run_job, job)
        # Drop local copies of files that no diagnostic subscribes to any more
        for request in job._requests:
            if request.only_suscriber(self):
                del self.data_manager.requested_files[request.remote_file]
                request.unsubscribe(self)
                request.clean_local()
        self.check_completion()
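
    # Illustrative sketch, with hypothetical names: jobs and data files act as
    # observable subjects, and the two callbacks around this comment are the
    # observers. A minimal subject honouring the same subscribe / unsubscribe /
    # notify contract could look like this:
    #
    #     class Observable(object):
    #         def __init__(self):
    #             self._subscribers = {}
    #
    #         def subscribe(self, who, callback):
    #             self._subscribers[who] = callback
    #
    #         def unsubscribe(self, who):
    #             del self._subscribers[who]
    #
    #         def _notify(self):
    #             # Copy the values so callbacks may unsubscribe while iterating
    #             for callback in list(self._subscribers.values()):
    #                 callback(self)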

    def _file_object_status_changed(self, file_object):
        Log.debug('Checking file {0}. Local status {0.local_status} Storage status {0.storage_status}',
                  file_object)
        if file_object.download_required():
            self.downloader.submit(file_object)
            return
        if file_object.upload_required():
            file_object.storage_status = StorageStatus.UPLOADING
            self.uploader.submit(file_object.upload)
            return
        if file_object.local_status != LocalStatus.COMPUTING and \
                file_object.storage_status != StorageStatus.UPLOADING and \
                file_object.only_suscriber(self):
            del self.data_manager.requested_files[file_object.remote_file]
            file_object.unsubscribe(self)
            file_object.clean_local()
        self.check_completion()

    def check_completion(self):
        """Release the run() lock if all work is done. Also puts the
        downloader on hold while more than 20 jobs are already ready."""
        counter = 0
        for job in self.jobs:
            if job.status == DiagnosticStatus.READY:
                counter += 1
                if counter > 20:
                    break
        self.downloader.on_hold = counter > 20

        for job in self.jobs:
            if job.status in (DiagnosticStatus.READY, DiagnosticStatus.RUNNING):
                return False
            if job.status == DiagnosticStatus.WAITING:
                if job.all_requests_in_storage():
                    return False

        for request in self.data_manager.requested_files.values():
            if request.storage_status == StorageStatus.UPLOADING:
                return False
            if request.local_status == LocalStatus.DOWNLOADING:
                return False
            if request.upload_required():
                return False
            if request.download_required():
                return False

        try:
            self.lock.release()
        except Exception:
            # The lock may already have been released by a concurrent call
            pass
        return True

    def print_stats(self):
        Log.info('Time consumed by each diagnostic class')
        Log.info('--------------------------------------')
        times = {}
        for job in self.jobs:
            job_type = job.alias
            if job_type in times.keys():
                times[job_type] += job.consumed_time
            else:
                times[job_type] = job.consumed_time
        # Sort diagnostic classes by total elapsed time
        for diag, elapsed in sorted(times.items(), key=operator.itemgetter(1)):
            Log.info('{0:23} {1:}', diag, elapsed)

    def print_errors(self):
        failed = [job for job in self.jobs if job.status == DiagnosticStatus.FAILED]
        if len(failed) == 0:
            return
        self.had_errors = True
        Log.error('Failed jobs')
        Log.error('-----------')
        for job in failed:
            Log.error('{0}: {0.message}', job)
        Log.error('Total wasted time: {0}', sum([job.consumed_time for job in failed], datetime.timedelta()))
        Log.info('')

    @staticmethod
    def _run_job(job):
        start_time = datetime.datetime.now()
        try:
            Log.info('Starting {0}', job)
            job.status = DiagnosticStatus.RUNNING
            job.compute()
        except Exception as ex:
            job.consumed_time = datetime.datetime.now() - start_time
            exc_type, exc_value, exc_traceback = sys.exc_info()
            job.message = '{0}\n{1}'.format(ex, ''.join(traceback.format_tb(exc_traceback)))
            Log.error('Job {0} failed ({2}): {1}', job, job.message, exc_type)
            job.status = DiagnosticStatus.FAILED
            return False

        job.consumed_time = datetime.datetime.now() - start_time
        Log.result('Finished {0}', job)
        job.status = DiagnosticStatus.COMPLETED
        return True

    count = 0
    failed_jobs = list()

    @staticmethod
    def _register_diagnostics():
        WorkManager._register_ocean_diagnostics()
        WorkManager._register_general_diagnostics()
        WorkManager._register_stats_diagnostics()

    @staticmethod
    def _register_stats_diagnostics():
        from earthdiagnostics.statistics.monthlypercentile import MonthlyPercentile
        from earthdiagnostics.statistics.climatologicalpercentile import ClimatologicalPercentile
        from earthdiagnostics.statistics.daysoverpercentile import DaysOverPercentile
        from earthdiagnostics.statistics.discretize import Discretize
        Diagnostic.register(MonthlyPercentile)
        Diagnostic.register(ClimatologicalPercentile)
        Diagnostic.register(DaysOverPercentile)
        Diagnostic.register(Discretize)
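
    # The registration pattern used by the _register_* methods is uniform:
    # import the diagnostic class lazily and hand it to Diagnostic.register().
    # A hypothetical new diagnostic would plug in the same way:
    #
    #     from earthdiagnostics.diagnostic import Diagnostic
    #
    #     class MyDiagnostic(Diagnostic):  # hypothetical class name
    #         ...
    #
    #     Diagnostic.register(MyDiagnostic)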

    @staticmethod
    def _register_general_diagnostics():
        from earthdiagnostics.general.attribute import Attribute
        from earthdiagnostics.general.dailymean import DailyMean
        from earthdiagnostics.general.module import Module
        from earthdiagnostics.general.monthlymean import MonthlyMean
        from earthdiagnostics.general.yearlymean import YearlyMean
        from earthdiagnostics.general.rewrite import Rewrite
        from earthdiagnostics.general.relink import Relink
        from earthdiagnostics.general.relinkall import RelinkAll
        from earthdiagnostics.general.scale import Scale
        from earthdiagnostics.general.verticalmeanmetersiris import VerticalMeanMetersIris
        from earthdiagnostics.general.simplify_dimensions import SimplifyDimensions
        Diagnostic.register(DailyMean)
        Diagnostic.register(MonthlyMean)
        Diagnostic.register(YearlyMean)
        Diagnostic.register(Rewrite)
        Diagnostic.register(Relink)
        Diagnostic.register(RelinkAll)
        Diagnostic.register(Scale)
        Diagnostic.register(Attribute)
        Diagnostic.register(Module)
        Diagnostic.register(VerticalMeanMetersIris)
        Diagnostic.register(SimplifyDimensions)

    @staticmethod
    def _register_ocean_diagnostics():
        from earthdiagnostics.ocean.mixedlayerheatcontent import MixedLayerHeatContent
        from earthdiagnostics.ocean.mixedlayersaltcontent import MixedLayerSaltContent
        from earthdiagnostics.ocean.siasiesiv import Siasiesiv
        from earthdiagnostics.ocean.verticalmean import VerticalMean
        from earthdiagnostics.ocean.verticalmeanmeters import VerticalMeanMeters
        from earthdiagnostics.ocean.verticalgradient import VerticalGradient
        from earthdiagnostics.ocean.interpolate import Interpolate
        from earthdiagnostics.ocean.interpolatecdo import InterpolateCDO
        from earthdiagnostics.ocean.moc import Moc
        from earthdiagnostics.ocean.areamoc import AreaMoc
        from earthdiagnostics.ocean.maxmoc import MaxMoc
        from earthdiagnostics.ocean.psi import Psi
        from earthdiagnostics.ocean.gyres import Gyres
        from earthdiagnostics.ocean.convectionsites import ConvectionSites
        from earthdiagnostics.ocean.cutsection import CutSection
        from earthdiagnostics.ocean.averagesection import AverageSection
        from earthdiagnostics.ocean.heatcontentlayer import HeatContentLayer
        from earthdiagnostics.ocean.heatcontent import HeatContent
        from earthdiagnostics.ocean.regionmean import RegionMean
        from earthdiagnostics.ocean.regionsum import RegionSum
        from earthdiagnostics.ocean.rotation import Rotation
        Diagnostic.register(MixedLayerSaltContent)
        Diagnostic.register(Siasiesiv)
        Diagnostic.register(VerticalMean)
        Diagnostic.register(VerticalMeanMeters)
        Diagnostic.register(Interpolate)
        Diagnostic.register(InterpolateCDO)
        Diagnostic.register(Moc)
        Diagnostic.register(AreaMoc)
        Diagnostic.register(MaxMoc)
        Diagnostic.register(Psi)
        Diagnostic.register(Gyres)
        Diagnostic.register(ConvectionSites)
        Diagnostic.register(CutSection)
        Diagnostic.register(AverageSection)
        Diagnostic.register(MixedLayerHeatContent)
        Diagnostic.register(HeatContentLayer)
        Diagnostic.register(HeatContent)
        Diagnostic.register(RegionMean)
        Diagnostic.register(RegionSum)
        Diagnostic.register(Rotation)
        Diagnostic.register(VerticalGradient)


class Downloader(object):

    def __init__(self):
        self._downloads = []
        self._lock = threading.Lock()
        self.stop = False
        self.on_hold = False

    def start(self):
        self._thread = threading.Thread(target=self.downloader)
        self._thread.start()

    def submit(self, datafile):
        with self._lock:
            self._downloads.append(datafile)
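
    # submit() and downloader() below form a producer/consumer pair around a
    # lock-protected list. A minimal standalone version of the same pattern,
    # with hypothetical names and assuming callable work items:
    #
    #     import threading
    #     import time
    #
    #     class Worker(object):
    #         def __init__(self):
    #             self._queue = []
    #             self._lock = threading.Lock()
    #             self.stop = False
    #
    #         def submit(self, item):
    #             with self._lock:
    #                 self._queue.append(item)
    #
    #         def run(self):
    #             while True:
    #                 with self._lock:
    #                     if not self._queue:
    #                         if self.stop:
    #                             return
    #                         item = None
    #                     else:
    #                         item = self._queue.pop(0)
    #                 if item is None:
    #                     time.sleep(0.01)
    #                 else:
    #                     item()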

    def downloader(self):
        """Main loop of the downloader thread: repeatedly pick the most
        urgent pending download and fetch it."""
        try:
            def suscribers_waiting(datafile):
                # Count the diagnostics that are only waiting for this file
                waiting = 0
                for diag in datafile.suscribers:
                    if not isinstance(diag, Diagnostic):
                        continue
                    if diag.pending_requests() == 1:
                        waiting += 1
                return waiting

            def prioritize(datafile1, datafile2):
                # Prefer files that unblock more diagnostics, then files with
                # more subscribers, then larger files (unknown sizes first)
                waiting = suscribers_waiting(datafile1) - suscribers_waiting(datafile2)
                if waiting:
                    return -waiting

                suscribers = len(datafile1.suscribers) - len(datafile2.suscribers)
                if suscribers:
                    return -suscribers

                if datafile1.size is None:
                    if datafile2.size is None:
                        return 0
                    return -1
                elif datafile2.size is None:
                    return 1
                size = datafile1.size - datafile2.size
                if size:
                    return -size
                return 0

            while True:
                with self._lock:
                    if len(self._downloads) == 0 or self.on_hold:
                        if self.stop:
                            return
                        time.sleep(0.01)
                        continue
                    self._downloads.sort(key=functools.cmp_to_key(prioritize))
                    datafile = self._downloads.pop(0)
                # Download outside the lock so submit() is not blocked
                datafile.download()
        except Exception as ex:
            Log.critical('Unhandled error at downloader: {0}\n{1}', ex, traceback.format_exc())

    def shutdown(self):
        self.stop = True
        self._thread.join()
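
# Illustrative usage sketch, assuming a configured Config instance and a
# compatible data manager as created elsewhere in earthdiagnostics:
#
#     manager = WorkManager(config, data_manager)
#     manager.prepare_job_list()
#     if not manager.run():
#         sys.exit(1)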