# coding=utf-8
"""Work scheduler for diagnostics: overlaps downloads, computation and uploads."""
import datetime
import operator
import threading
from concurrent.futures import ThreadPoolExecutor

from bscearth.utils.log import Log

from datafile import StorageStatus
from diagnostic import DiagnosticStatus
from utils import Utils, TempFile


class WorkManager(object):
    """
    Drives a list of diagnostic jobs to completion.

    Jobs and their requested files publish status changes back to this
    manager, which submits work to three dedicated executors: one for
    downloads, one for uploads and one (multi-threaded) for computation.
    """

    def __init__(self, config, job_list, data_manager):
        """
        :param config: global configuration; only ``max_cores`` is read here
        :param job_list: diagnostics to execute
        :param data_manager: owner of the ``requested_files`` registry
        """
        self.jobs = job_list
        self.config = config
        self.time = {}
        self.had_errors = False
        self.data_manager = data_manager

    def run(self):
        """
        Execute all diagnostics, blocking until every job and transfer is done.

        :return: True when no job failed
        :rtype: bool
        """
        start = datetime.datetime.now()
        Log.info("Starting to compute at {0}", start)
        self.threads = Utils.available_cpu_count()
        # Honour max_cores only when it is a positive value below the CPU count
        if 0 < self.config.max_cores < self.threads:
            self.threads = self.config.max_cores
        Log.info('Using {0} threads', self.threads)

        for job in self.jobs:
            job.request_data()
            job.declare_data_generated()
            job.subscribe(self, self._job_status_changed)

        # Single-threaded transfer pools serialize storage access;
        # computation gets the full thread budget.
        self.downloader = ThreadPoolExecutor(1)
        self.uploader = ThreadPoolExecutor(1)
        self.executor = ThreadPoolExecutor(self.threads)

        for file_object in self.data_manager.requested_files.values():
            file_object.subscribe(self, self._file_object_status_changed)
            if file_object.download_required():
                self.downloader.submit(file_object.download)

        # The lock is acquired twice: the second acquire blocks this thread
        # until check_completion() (called from the status callbacks) releases
        # it once all jobs and transfers have settled.
        self.lock = threading.Lock()
        self.lock.acquire()
        self.check_completion()
        self.lock.acquire()

        self.downloader.shutdown()
        self.executor.shutdown()
        self.uploader.shutdown(True)

        TempFile.clean()
        finish_time = datetime.datetime.now()
        Log.result("Diagnostics finished at {0}", finish_time)
        Log.result("Elapsed time: {0}\n", finish_time - start)
        self.print_errors()
        self.print_stats()
        return not self.had_errors

    def _job_status_changed(self, job, status):
        # Callback: a job reported a new status. Schedule it when READY and
        # re-evaluate overall completion in any case.
        if status == DiagnosticStatus.READY:
            self.executor.submit(self._run_job, job)
        self.check_completion()

    def _file_object_status_changed(self, file_object):
        # Callback: a requested file changed state. Queue the pending
        # transfer, or re-check completion when nothing is left to move.
        if file_object.download_required():
            self.downloader.submit(file_object.download)
            return
        if file_object.upload_required():
            self.uploader.submit(file_object.upload)
            return
        self.check_completion()

    def check_completion(self):
        """
        Release the run() lock when no job or transfer can still make progress.

        :return: True when everything has settled, False otherwise
        :rtype: bool
        """
        for job in self.jobs:
            if job.status in (DiagnosticStatus.READY, DiagnosticStatus.RUNNING):
                return False
            # A WAITING job whose inputs are all in storage is about to
            # become READY, so the run is not finished yet.
            if job.status == DiagnosticStatus.WAITING:
                if job.all_requests_in_storage():
                    return False
        for request in self.data_manager.requested_files.values():
            # Fixed: these paths previously fell through with a bare
            # ``return`` (None) — now consistent with the other exits.
            if request.upload_required():
                return False
            if request.download_required():
                return False
        self.lock.release()
        return True

    def print_stats(self):
        """Log the total time consumed by each diagnostic class, sorted by time."""
        Log.info('Time consumed by each diagnostic class')
        Log.info('--------------------------------------')
        times = {}
        for job in self.jobs:
            # Accumulate per-alias totals; start from a zero timedelta.
            times[job.alias] = times.get(job.alias, datetime.timedelta()) + job.consumed_time
        # Fixed: the original sorted the alias *strings* by their second
        # character (itemgetter(1) on a str); sort the (alias, time) pairs
        # by elapsed time instead.
        for alias, elapsed in sorted(times.items(), key=operator.itemgetter(1)):
            Log.info('{0:23} {1:}', alias, elapsed)

    def print_errors(self):
        """Log every failed job and the total time wasted on failures."""
        failed = [job for job in self.jobs if job.status == DiagnosticStatus.FAILED]
        if not failed:
            return
        self.had_errors = True
        Log.error('Failed jobs')
        Log.error('-----------')
        for job in failed:
            Log.error('{0}: {0.message}', job)
        Log.error('Total wasted time: {0}',
                  sum((job.consumed_time for job in failed), datetime.timedelta()))
        Log.info('')

    def _run_job(self, job):
        # Execute one diagnostic on a worker thread, recording elapsed time
        # and translating any exception into a FAILED status (the scheduler
        # must survive individual job crashes).
        Log.info('Starting {0}', job)
        job.status = DiagnosticStatus.RUNNING
        started = datetime.datetime.now()
        try:
            job.compute()
        except Exception as ex:
            job.consumed_time = datetime.datetime.now() - started
            job.message = str(ex)
            Log.error('Job {0} failed: {1}', job, ex)
            job.status = DiagnosticStatus.FAILED
            return False
        job.consumed_time = datetime.datetime.now() - started
        Log.result('Finished {0}', job)
        job.status = DiagnosticStatus.COMPLETED
        return True


count = 0
failed_jobs = list()