# coding=utf-8
"""Work manager that builds, schedules and runs diagnostic jobs.

Reconstructed formatting: the original file had been collapsed onto a few
physical lines; indentation below is the inferred structure.
"""
import datetime
import functools
import operator
import sys
import threading
import time
import traceback

from bscearth.utils.log import Log
# noinspection PyCompatibility
from concurrent.futures import ThreadPoolExecutor

from earthdiagnostics.datafile import StorageStatus, LocalStatus
from earthdiagnostics.diagnostic import DiagnosticStatus, Diagnostic, DiagnosticOptionError
from earthdiagnostics.general import *
from earthdiagnostics.ocean import *
from earthdiagnostics.statistics import *
from earthdiagnostics.utils import Utils, TempFile


class WorkManager(object):
    """Create the diagnostic job list from config and drive it to completion.

    :param config: global configuration (supplies get_commands, max_cores,
                   parallel_uploads, skip_diags_done)
    :param data_manager: storage backend holding ``requested_files``
    """

    def __init__(self, config, data_manager):
        self.jobs = None
        self.config = config
        self.time = {}
        self.had_errors = False
        self.data_manager = data_manager

    def prepare_job_list(self):
        """Parse the configured diagnostic commands into ``self.jobs``.

        Diagnostics that cannot be parsed or configured are reported and
        flagged in ``had_errors`` but do not abort the remaining commands.
        """
        self._register_diagnostics()
        list_jobs = list()
        for fulldiag in self.config.get_commands():
            Log.info("Adding {0} to diagnostic list", fulldiag)
            diag_options = fulldiag.split(',')
            diag_class = Diagnostic.get_diagnostic(diag_options[0])
            if diag_class:
                try:
                    for job in diag_class.generate_jobs(self, diag_options):
                        list_jobs.append(job)
                        for subjob in job.subjobs:
                            list_jobs.append(subjob)
                    continue
                except DiagnosticOptionError as ex:
                    Log.error('Can not configure diagnostic {0}: {1}', diag_options[0], ex)
                    self.had_errors = True
            else:
                Log.error('{0} is not an available diagnostic', diag_options[0])
                self.had_errors = True
        self.jobs = list_jobs

    def run(self):
        """Execute all prepared jobs and return True if none failed.

        Spins up a downloader thread plus upload/compute thread pools, then
        blocks on a lock that ``check_completion`` releases once every job
        and file transfer has settled.
        """
        if len(self.jobs) == 0:
            Log.result('No diagnostics to run')
            return True
        start_time = datetime.datetime.now()
        Log.info("Starting to compute at {0}", start_time)
        self.threads = Utils.available_cpu_count()
        if 0 < self.config.max_cores < self.threads:
            self.threads = self.config.max_cores
        Log.info('Using {0} threads', self.threads)

        self.downloader = Downloader()
        self.uploader = ThreadPoolExecutor(self.config.parallel_uploads)
        self.executor = ThreadPoolExecutor(self.threads)

        for job in self.jobs:
            job.request_data()
            job.declare_data_generated()
            job.subscribe(self, self._job_status_changed)
            job.check_is_ready()

        if self.config.skip_diags_done:
            for job in self.jobs:
                if job.can_skip_run():
                    Log.info('Diagnostic {0} already done. Skipping !', job)
                    job.status = DiagnosticStatus.COMPLETED

        for file_object in self.data_manager.requested_files.values():
            file_object.subscribe(self, self._file_object_status_changed)
            if file_object.download_required():
                self.downloader.submit(file_object)
        self.downloader.start()

        # First acquire succeeds; the second blocks the main thread until
        # check_completion() releases the lock from a worker callback.
        self.lock = threading.Lock()
        self.lock.acquire()
        # self.check_completion()
        self.lock.acquire()

        self.downloader.shutdown()
        self.executor.shutdown()
        self.uploader.shutdown(True)
        TempFile.clean()
        finish_time = datetime.datetime.now()
        Log.result("Diagnostics finished at {0}", finish_time)
        Log.result("Elapsed time: {0}\n", finish_time - start_time)
        self.print_errors()
        self.print_stats()
        return not self.had_errors

    def _job_status_changed(self, job):
        """React to a job status change: launch READY jobs, drop stale files.

        A request whose only remaining subscriber is this manager is no
        longer needed by any diagnostic, so its local copy is cleaned.
        """
        if job.status == DiagnosticStatus.READY:
            self.executor.submit(self._run_job, job)
        for request in job._requests:
            if request.only_suscriber(self):
                del self.data_manager.requested_files[request.remote_file]
                request.unsubscribe(self)
                request.clean_local()
        self.check_completion()

    def _file_object_status_changed(self, file_object):
        """React to a data-file status change: schedule transfers or clean up."""
        # Fixed format string: original ran the two statuses together
        # ('Storage status{0.storage_status}' lacked a space).
        Log.debug('Checking file {0}. Local status {0.local_status}. Storage status {0.storage_status}',
                  file_object)
        if file_object.download_required():
            self.downloader.submit(file_object)
            return
        if file_object.upload_required():
            file_object.storage_status = StorageStatus.UPLOADING
            self.uploader.submit(file_object.upload)
            return
        if file_object.local_status != LocalStatus.COMPUTING and \
                file_object.storage_status != StorageStatus.UPLOADING and \
                file_object.only_suscriber(self):
            del self.data_manager.requested_files[file_object.remote_file]
            file_object.unsubscribe(self)
            file_object.clean_local()
        self.check_completion()

    def check_completion(self):
        """Return True and release the run() lock when all work has settled.

        Also throttles the downloader (``on_hold``) while more than 20 jobs
        are READY, to avoid downloading far ahead of the compute pool.
        """
        counter = 0
        for job in self.jobs:
            if job.status == DiagnosticStatus.READY:
                counter += 1
                if counter > 20:
                    break
        self.downloader.on_hold = counter > 20

        for job in self.jobs:
            if job.status in (DiagnosticStatus.READY, DiagnosticStatus.RUNNING):
                return False
            if job.status == DiagnosticStatus.WAITING:
                if job.all_requests_in_storage():
                    return False

        for request in self.data_manager.requested_files.values():
            if request.storage_status == StorageStatus.UPLOADING:
                return False
            if request.local_status == LocalStatus.DOWNLOADING:
                return False
            if request.upload_required():
                return False
            if request.download_required():
                return False

        try:
            self.lock.release()
        except Exception:
            # Already released by a concurrent call; safe to ignore.
            pass
        return True

    def print_stats(self):
        """Log total time consumed per diagnostic class, fastest first."""
        Log.info('Time consumed by each diagnostic class')
        Log.info('--------------------------------------')
        times = {}
        for job in self.jobs:
            job_type = job.alias
            if job_type in times:
                times[job_type] += job.consumed_time
            else:
                times[job_type] = job.consumed_time
        # Sort by accumulated time. The original sorted the dict KEYS with
        # itemgetter(1), i.e. by the 2nd character of the class name — a bug.
        for diag, elapsed in sorted(times.items(), key=operator.itemgetter(1)):
            Log.info('{0:23} {1:}', diag, elapsed)

    def print_errors(self):
        """Log every failed job and the total time wasted on failures."""
        failed = [job for job in self.jobs if job.status == DiagnosticStatus.FAILED]
        if len(failed) == 0:
            return
        self.had_errors = True
        Log.error('Failed jobs')
        Log.error('-----------')
        for job in failed:
            Log.error('{0}: {0.message}', job)
        Log.error('Total wasted time: {0}',
                  sum([job.consumed_time for job in failed], datetime.timedelta()))
        Log.info('')

    @staticmethod
    def _run_job(job):
        """Run one diagnostic job; record its outcome and consumed time.

        Returns True on success, False on failure. Exceptions are captured
        into ``job.message`` so the run can continue with other jobs.
        """
        start_time = datetime.datetime.now()
        try:
            Log.info('Starting {0}', job)
            job.status = DiagnosticStatus.RUNNING
            job.compute()
        except Exception as ex:
            job.consumed_time = datetime.datetime.now() - start_time
            exc_type, exc_value, exc_traceback = sys.exc_info()
            job.message = '{0}\n{1}'.format(ex, ''.join(traceback.format_tb(exc_traceback)))
            Log.error('Job {0} failed ({2}): {1}', job, job.message, exc_type)
            job.status = DiagnosticStatus.FAILED
            return False
        job.consumed_time = datetime.datetime.now() - start_time
        Log.result('Finished {0}', job)
        job.status = DiagnosticStatus.COMPLETED
        return True

    count = 0
    failed_jobs = list()

    @staticmethod
    def _register_diagnostics():
        """Register every available diagnostic class with the factory."""
        WorkManager._register_ocean_diagnostics()
        WorkManager._register_general_diagnostics()
        WorkManager._register_stats_diagnostics()

    @staticmethod
    def _register_stats_diagnostics():
        """Register the statistics diagnostics."""
        Diagnostic.register(MonthlyPercentile)
        Diagnostic.register(ClimatologicalPercentile)
        Diagnostic.register(DaysOverPercentile)
        Diagnostic.register(Discretize)

    @staticmethod
    def _register_general_diagnostics():
        """Register the general-purpose diagnostics."""
        Diagnostic.register(DailyMean)
        Diagnostic.register(MonthlyMean)
        Diagnostic.register(YearlyMean)
        Diagnostic.register(Rewrite)
        Diagnostic.register(Relink)
        Diagnostic.register(RelinkAll)
        Diagnostic.register(Scale)
        Diagnostic.register(Attribute)
        Diagnostic.register(Module)
        Diagnostic.register(VerticalMeanMetersIris)
        Diagnostic.register(SimplifyDimensions)

    @staticmethod
    def _register_ocean_diagnostics():
        """Register the ocean diagnostics."""
        Diagnostic.register(MixedLayerSaltContent)
        Diagnostic.register(Siasiesiv)
        Diagnostic.register(VerticalMean)
        Diagnostic.register(VerticalMeanMeters)
        Diagnostic.register(Interpolate)
        Diagnostic.register(InterpolateCDO)
        Diagnostic.register(Moc)
        Diagnostic.register(AreaMoc)
        Diagnostic.register(MaxMoc)
        Diagnostic.register(Psi)
        Diagnostic.register(Gyres)
        Diagnostic.register(ConvectionSites)
        Diagnostic.register(CutSection)
        Diagnostic.register(AverageSection)
        Diagnostic.register(MixedLayerHeatContent)
        Diagnostic.register(HeatContentLayer)
        Diagnostic.register(HeatContent)
        Diagnostic.register(RegionMean)
        Diagnostic.register(RegionSum)
        Diagnostic.register(Rotation)
        Diagnostic.register(VerticalGradient)


class Downloader(object):
    """Background thread that downloads requested data files by priority."""

    def __init__(self):
        self._downloads = []
        self._lock = threading.Lock()
        self.stop = False
        self.on_hold = False

    def start(self):
        """Launch the downloader thread."""
        self._thread = threading.Thread(target=self.downloader)
        self._thread.start()

    def submit(self, datafile):
        """Queue a datafile for download (thread-safe)."""
        self._lock.acquire()
        self._downloads.append(datafile)
        self._lock.release()

    def downloader(self):
        """Download loop: repeatedly pick and fetch the highest-priority file.

        Priority order: files unblocking the most diagnostics first, then
        most subscribers, then unknown size before known, then larger first.
        """
        try:
            def suscribers_waiting(datafile):
                # Count diagnostics for which this file is the last pending
                # request, i.e. downloading it unblocks them immediately.
                waiting = 0
                for diag in datafile.suscribers:
                    if not isinstance(diag, Diagnostic):
                        continue
                    if diag.pending_requests() == 1:
                        waiting += 1
                return waiting

            def prioritize(datafile1, datafile2):
                # cmp-style comparator; negative means datafile1 goes first.
                waiting = suscribers_waiting(datafile1) - suscribers_waiting(datafile2)
                if waiting:
                    return -waiting
                suscribers = len(datafile1.suscribers) - len(datafile2.suscribers)
                if suscribers:
                    return -suscribers
                if datafile1.size is None:
                    if datafile2.size is None:
                        return 0
                    return -1
                elif datafile2.size is None:
                    return 1
                size = datafile1.size - datafile2.size
                if size:
                    return -size
                return 0

            while True:
                with self._lock:
                    idle = len(self._downloads) == 0 or self.on_hold
                    if not idle:
                        # cmp_to_key: plain list.sort(cmp) is Python 2 only
                        # and raises TypeError on Python 3.
                        self._downloads.sort(key=functools.cmp_to_key(prioritize))
                        datafile = self._downloads.pop(0)
                # Sleep and download OUTSIDE the lock so submit() is never
                # blocked by the wait or by a long transfer.
                if idle:
                    if self.stop:
                        return
                    time.sleep(0.01)
                    continue
                datafile.download()
        except Exception as ex:
            # format_exc() returns the traceback text; the original used
            # print_exc(), which returns None and logged "None" here.
            Log.critical('Unhandled error at downloader: {0}\n{1}', ex, traceback.format_exc())

    def shutdown(self):
        """Ask the downloader thread to stop and wait for it to finish."""
        self.stop = True
        self._thread.join()