# coding=utf-8
import datetime
import operator
import threading
from concurrent.futures import ThreadPoolExecutor

from bscearth.utils.log import Log

from earthdiagnostics.datafile import StorageStatus, LocalStatus
from earthdiagnostics.diagnostic import DiagnosticStatus, Diagnostic, DiagnosticOptionError
from earthdiagnostics.utils import Utils, TempFile

# The star imports bring the diagnostic classes registered at the bottom of
# WorkManager into this module's namespace.
from earthdiagnostics.ocean import *
from earthdiagnostics.general import *
from earthdiagnostics.statistics import *


class WorkManager(object):
    """
    Build the list of diagnostic jobs from the configuration and run them.

    Jobs are executed on a thread pool; file downloads and uploads each use
    their own single-worker pool. Completion is signalled to the thread that
    called :meth:`run` through ``self.lock``.

    :param config: configuration object; ``get_commands()`` and ``max_cores``
        are read from it
    :param data_manager: data manager; its ``requested_files`` mapping is
        read to track pending downloads and uploads
    """

    def __init__(self, config, data_manager):
        self.jobs = None
        self.config = config
        self.time = {}
        self.had_errors = False
        self.data_manager = data_manager

        # Created by run(); declared here so the attributes always exist.
        self.threads = None
        self.downloader = None
        self.uploader = None
        self.executor = None
        self.lock = None

    def prepare_job_list(self):
        """
        Create the jobs for every command returned by the configuration.

        Each command is a comma-separated string whose first field is the
        diagnostic name. Unknown diagnostics and diagnostics that raise
        DiagnosticOptionError are logged and flagged in ``had_errors``; the
        remaining commands are still processed. The result is stored in
        ``self.jobs``.
        """
        self._register_diagnostics()
        list_jobs = list()
        for fulldiag in self.config.get_commands():
            Log.info("Adding {0} to diagnostic list", fulldiag)
            diag_options = fulldiag.split(',')

            diag_class = Diagnostic.get_diagnostic(diag_options[0])
            if not diag_class:
                Log.error('{0} is not an available diagnostic', diag_options[0])
                self.had_errors = True
                continue

            try:
                list_jobs.extend(diag_class.generate_jobs(self, diag_options))
            except DiagnosticOptionError as ex:
                Log.error('Can not configure diagnostic {0}: {1}', diag_options[0], ex)
                self.had_errors = True
        self.jobs = list_jobs

    def run(self):
        """
        Run all the jobs prepared by :meth:`prepare_job_list`.

        Blocks until :meth:`check_completion` reports that nothing is left to
        do, then shuts the thread pools down, cleans the temporary files and
        logs the error and timing summaries.

        :return: True if no error was detected, False otherwise
        :rtype: bool
        """
        start_time = datetime.datetime.now()
        Log.info("Starting to compute at {0}", start_time)

        # Use every available core unless the configuration sets a lower,
        # strictly positive limit.
        self.threads = Utils.available_cpu_count()
        if 0 < self.config.max_cores < self.threads:
            self.threads = self.config.max_cores
        Log.info('Using {0} threads', self.threads)

        self.downloader = ThreadPoolExecutor(1)
        self.uploader = ThreadPoolExecutor(1)
        self.executor = ThreadPoolExecutor(self.threads)

        for job in self.jobs:
            job.request_data()
            job.declare_data_generated()
            job.subscribe(self, self._job_status_changed)
            for subjob in job.subjobs:
                subjob.subscribe(self, self._job_status_changed)
            job.check_is_ready()

        for file_object in self.data_manager.requested_files.values():
            file_object.subscribe(self, self._file_object_status_changed)
            if file_object.download_required():
                self.downloader.submit(file_object.download)

        # The lock is taken once here so that the second acquire() below
        # blocks until check_completion() releases it, either right away
        # (nothing to do) or later from a status-change callback.
        self.lock = threading.Lock()
        self.lock.acquire()

        self.check_completion()
        self.lock.acquire()

        self.downloader.shutdown()
        self.executor.shutdown()
        self.uploader.shutdown(True)

        TempFile.clean()
        finish_time = datetime.datetime.now()
        Log.result("Diagnostics finished at {0}", finish_time)
        Log.result("Elapsed time: {0}\n", finish_time - start_time)
        self.print_errors()
        self.print_stats()
        return not self.had_errors

    def _job_status_changed(self, job):
        """Submit a job that became READY and re-check global completion."""
        if job.status == DiagnosticStatus.READY:
            self.executor.submit(self._run_job, job)
        self.check_completion()

    def _file_object_status_changed(self, file_object):
        """
        Queue the transfer a file needs, or re-check completion if none.

        A required download takes precedence over a required upload.
        """
        if file_object.download_required():
            self.downloader.submit(file_object.download)
            return
        if file_object.upload_required():
            self.uploader.submit(file_object.upload)
            return
        self.check_completion()

    def check_completion(self):
        """
        Check whether there is any work left and, if not, release the lock.

        Work is considered pending while any job is READY or RUNNING, while a
        WAITING job has all its requests in storage, or while any requested
        file is being transferred or still requires a download or upload.

        :return: True if everything is finished (the lock has been released),
            False otherwise
        :rtype: bool
        """
        for job in self.jobs:
            if job.status in (DiagnosticStatus.READY, DiagnosticStatus.RUNNING):
                return False
            if job.status == DiagnosticStatus.WAITING and job.all_requests_in_storage():
                return False

        for request in self.data_manager.requested_files.values():
            if request.storage_status == StorageStatus.UPLOADING:
                return False
            if request.local_status == LocalStatus.DOWNLOADING:
                return False
            if request.upload_required():
                return False
            if request.download_required():
                return False

        self.lock.release()
        return True

    def print_stats(self):
        """Log the time consumed per diagnostic alias, lowest total first."""
        Log.info('Time consumed by each diagnostic class')
        Log.info('--------------------------------------')

        times = {}
        for job in self.jobs:
            job_type = job.alias
            if job_type in times:
                times[job_type] += job.consumed_time
            else:
                times[job_type] = job.consumed_time

        # Sort the (alias, total) pairs by the total; sorting the bare keys
        # with itemgetter(1) would order them by the alias' second character.
        for diag, consumed in sorted(times.items(), key=operator.itemgetter(1)):
            Log.info('{0:23} {1:}', diag, consumed)

    def print_errors(self):
        """
        Log every failed job with its message and the total time they used.

        Sets ``had_errors`` when at least one job failed; does nothing
        otherwise.
        """
        failed = [job for job in self.jobs if job.status == DiagnosticStatus.FAILED]
        if not failed:
            return
        self.had_errors = True
        Log.error('Failed jobs')
        Log.error('-----------')
        for job in failed:
            Log.error('{0}: {0.message}', job)
        Log.error('Total wasted time: {0}', sum([job.consumed_time for job in failed], datetime.timedelta()))
        Log.info('')

    @staticmethod
    def _run_job(job):
        """
        Compute a single job, recording its elapsed time and final status.

        :param job: job to run; its ``status``, ``consumed_time`` and, on
            failure, ``message`` attributes are set
        :return: True if ``job.compute()`` finished, False if it raised
        :rtype: bool
        """
        Log.info('Starting {0}', job)
        job.status = DiagnosticStatus.RUNNING
        start_time = datetime.datetime.now()
        try:
            job.compute()
        except Exception as ex:
            # Deliberately broad: a failing job is recorded and reported
            # instead of propagating out of the worker thread.
            job.consumed_time = datetime.datetime.now() - start_time
            job.message = str(ex)
            Log.error('Job {0} failed: {1}', job, ex)
            job.status = DiagnosticStatus.FAILED
            return False

        job.consumed_time = datetime.datetime.now() - start_time
        Log.result('Finished {0}', job)
        job.status = DiagnosticStatus.COMPLETED
        return True

    # Class-level attributes kept for backward compatibility; nothing in this
    # class reads them.
    count = 0
    failed_jobs = list()

    @staticmethod
    def _register_diagnostics():
        """Register the ocean, general and statistics diagnostics."""
        WorkManager._register_ocean_diagnostics()
        WorkManager._register_general_diagnostics()
        WorkManager._register_stats_diagnostics()

    @staticmethod
    def _register_stats_diagnostics():
        """Register the statistics diagnostics."""
        Diagnostic.register(MonthlyPercentile)
        Diagnostic.register(ClimatologicalPercentile)

    @staticmethod
    def _register_general_diagnostics():
        """Register the general-purpose diagnostics."""
        Diagnostic.register(DailyMean)
        Diagnostic.register(MonthlyMean)
        Diagnostic.register(YearlyMean)
        Diagnostic.register(Rewrite)
        Diagnostic.register(Relink)
        Diagnostic.register(RelinkAll)
        Diagnostic.register(Scale)
        Diagnostic.register(Attribute)
        Diagnostic.register(Module)
        Diagnostic.register(VerticalMeanMetersIris)

    @staticmethod
    def _register_ocean_diagnostics():
        """Register the ocean diagnostics."""
        Diagnostic.register(MixedLayerSaltContent)
        Diagnostic.register(Siasiesiv)
        Diagnostic.register(VerticalMean)
        Diagnostic.register(VerticalMeanMeters)
        Diagnostic.register(Interpolate)
        Diagnostic.register(InterpolateCDO)
        Diagnostic.register(Moc)
        Diagnostic.register(AreaMoc)
        Diagnostic.register(MaxMoc)
        Diagnostic.register(Psi)
        Diagnostic.register(Gyres)
        Diagnostic.register(ConvectionSites)
        Diagnostic.register(CutSection)
        Diagnostic.register(AverageSection)
        Diagnostic.register(MixedLayerHeatContent)
        Diagnostic.register(HeatContentLayer)
        Diagnostic.register(HeatContent)
        Diagnostic.register(RegionMean)
        Diagnostic.register(Rotation)
        Diagnostic.register(VerticalGradient)