# coding=utf-8
"""Thread-based scheduler that runs diagnostic jobs in parallel."""
import datetime
import threading
import operator
from time import sleep

from bscearth.utils.log import Log
from diagnostic import DiagnosticStatus
from utils import Utils, TempFile


class WorkManager(object):
    """Distribute jobs over a pool of JobRunner threads and report the results.

    A job moves from ``pending_jobs`` to ``running_jobs`` when it is handed to
    a runner, and from ``running_jobs`` to ``finished_jobs`` once its status
    becomes ``DiagnosticStatus.COMPLETED`` or ``DiagnosticStatus.FAILED``.
    """

    def __init__(self, config, job_list):
        """
        :param config: configuration object; ``config.max_cores`` caps the
            number of worker threads when it is greater than 0
        :param job_list: list of jobs to run. Each job is expected to expose
            ``compute()``, ``status`` and ``message`` (as used by JobRunner).
            Jobs are taken from the END of this list (``list.pop()``).
        """
        self.pending_jobs = job_list
        self.running_jobs = []
        self.finished_jobs = []
        self.config = config
        # Initialised here so run/print_stats/print_errors never hit a
        # missing attribute, whatever order they are called in.
        self.job_runners = []
        self.threads = 0
        self.had_errors = False

    def run(self):
        """Run every pending job and block until all of them have finished.

        :return: True if no job failed, False otherwise
        :rtype: bool
        """
        start_time = datetime.datetime.now()
        Log.info("Starting to compute at {0}", start_time)
        # NOTE(review): assumes Utils.available_cpu_count() returns >= 1;
        # with 0 runners the loop below would never make progress.
        self.threads = Utils.available_cpu_count()
        if 0 < self.config.max_cores < self.threads:
            self.threads = self.config.max_cores
        Log.info('Using {0} threads', self.threads)
        self.job_runners = [JobRunner() for _ in range(self.threads)]
        # Runners must actually be started: otherwise jobs handed to them are
        # never executed and the loop below spins forever.
        for runner in self.job_runners:
            runner.start()
        try:
            while self.pending_jobs or self.running_jobs:
                self._dispatch_pending_jobs()
                self._collect_finished_jobs()
                sleep(0.1)
        finally:
            # Wake every runner so its thread can exit, even if the loop raised.
            for runner in self.job_runners:
                runner.stop()
        # Make sure no worker thread is still alive before cleaning temp files.
        for runner in self.job_runners:
            runner.join()
        TempFile.clean()
        finish_time = datetime.datetime.now()
        Log.result("Diagnostics finished at {0}", finish_time)
        Log.result("Elapsed time: {0}\n", finish_time - start_time)
        self.print_errors()
        self.print_stats()
        return not self.had_errors

    def _dispatch_pending_jobs(self):
        """Hand one pending job to each runner that has no job queued."""
        for runner in self.job_runners:
            # Re-check on every runner: there may be fewer jobs than runners.
            if not self.pending_jobs:
                break
            if runner.next_job is None:
                next_job = self.pending_jobs.pop()
                self.running_jobs.append(next_job)
                runner.set_next_job(next_job)

    def _collect_finished_jobs(self):
        """Move jobs in a terminal state from running_jobs to finished_jobs."""
        # Build a new list instead of removing while iterating, which would
        # skip the element after each removed one.
        still_running = []
        for job in self.running_jobs:
            if job.status in (DiagnosticStatus.COMPLETED, DiagnosticStatus.FAILED):
                self.finished_jobs.append(job)
            else:
                still_running.append(job)
        self.running_jobs = still_running

    def print_stats(self):
        """Log the total compute time per diagnostic class, shortest first."""
        Log.info('Time consumed by each diagnostic class')
        Log.info('--------------------------------------')
        total = dict()
        for runner in self.job_runners:
            for key, value in runner.time.items():
                if key in total:
                    total[key] += value
                else:
                    total[key] = value
        for diag, consumed in sorted(total.items(), key=operator.itemgetter(1)):
            Log.info('{0:23} {1:}', diag.__name__, consumed)

    def print_errors(self):
        """Log every finished job whose status is FAILED and set ``had_errors``."""
        # Finished jobs live in finished_jobs; running_jobs is empty once
        # run() leaves its loop, so scanning it would never find failures.
        failed = [job for job in self.finished_jobs if job.status == DiagnosticStatus.FAILED]
        self.had_errors = bool(failed)
        if not failed:
            return
        Log.error('Failed jobs')
        Log.error('-----------')
        for job in failed:
            Log.error(str(job))
        Log.info('')


class JobRunner(object):
    """Worker that runs jobs one at a time on its own thread.

    The manager queues a job with ``set_next_job``; the worker thread picks it
    up, moves it to ``current_job`` and runs it. ``time`` accumulates the
    elapsed compute time (``datetime.timedelta``) per job class.
    """

    def __init__(self):
        self.next_job = None
        self.current_job = None
        self.time = {}
        self._stop = False
        self._is_running = False
        self._thread = None
        # Counting semaphore used as a wake-up signal (kept under the name
        # `lock`). Unlike a Lock, releasing it more than once, e.g. stop()
        # after an unconsumed set_next_job(), does not raise RuntimeError.
        self.lock = threading.Semaphore(0)

    def stop(self):
        """Ask the worker thread to exit once it is idle."""
        self._stop = True
        self.lock.release()

    def set_next_job(self, job):
        """Queue ``job`` and wake the worker thread."""
        self.next_job = job
        self.lock.release()

    def is_running(self):
        """Return True while the worker loop is active."""
        return self._is_running

    def start(self):
        """Create and start the worker thread.

        :return: the started thread
        :rtype: threading.Thread
        """
        self._thread = threading.Thread(target=self._run_jobs)
        self._thread.start()
        return self._thread

    def join(self, timeout=None):
        """Wait for the worker thread (if started) to terminate."""
        if self._thread is not None:
            self._thread.join(timeout)

    def keep_running(self):
        """Block until a job or a stop request arrives.

        Moves any queued job into ``current_job`` (or sets it to None if
        nothing was queued).

        :return: False when the runner has been stopped, True otherwise
        :rtype: bool
        """
        self.lock.acquire()
        if self._stop:
            return False
        # Reset current_job when nothing was queued so an old job is never rerun.
        self.current_job = self.next_job
        self.next_job = None
        return True

    def _run_jobs(self):
        """Worker-thread loop: run queued jobs until stop() is called."""
        self._is_running = True
        try:
            while self.keep_running():
                if self.current_job is not None:
                    self._run_job()
        finally:
            self._is_running = False

    def _run_job(self):
        """Execute ``current_job``, updating its status and the time statistics.

        :return: True if the job completed, False if it raised
        :rtype: bool
        """
        job = self.current_job
        try:
            Log.info('Starting {0}', job)
            job.status = DiagnosticStatus.RUNNING
            start_time = datetime.datetime.now()
            job.compute()
            elapsed = datetime.datetime.now() - start_time
            job_type = type(job)
            if job_type in self.time:
                self.time[job_type] += elapsed
            else:
                self.time[job_type] = elapsed
            Log.result('Finished {0}', job)
            job.status = DiagnosticStatus.COMPLETED
            return True
        except Exception as ex:
            # Broad catch on purpose: an escaping exception would kill this
            # worker thread and leave the job RUNNING forever.
            job.message = str(ex)
            Log.error('Job {0} failed: {1}', job, ex)
            # A terminal status is what lets WorkManager retire the job;
            # set it last so `message` is already filled in.
            job.status = DiagnosticStatus.FAILED
            return False