# "Newer" / "Older": diff-viewer navigation labels captured with the source; not part of the module.
import operator
from concurrent.futures import ThreadPoolExecutor
from diagnostic import DiagnosticStatus
from utils import Utils, TempFile
# Initialise the bookkeeping attributes of the manager.
# NOTE(review): config, job_list and data_manager are not stored in the lines
# visible here, yet other methods read self.config, self.jobs and
# self.data_manager -- confirm those assignments exist in the full file.
def __init__(self, config, job_list, data_manager):
# Starts as an empty dict; none of the code visible here reads it.
self.time = {}
# Switched to True by print_errors() when at least one job has failed.
self.had_errors = False
# Run the whole computation: start the worker pools, queue the downloads,
# block until check_completion() releases the lock, then shut the pools down
# and log the elapsed time, the failed jobs and the per-diagnostic statistics.
def run(self):
# Start timestamp, used at the end to log the elapsed time.
time = datetime.datetime.now()
Log.info("Starting to compute at {0}", time)
# Use every CPU reported by Utils, unless config.max_cores is a positive
# value smaller than that count, in which case it caps the thread number.
self.threads = Utils.available_cpu_count()
if 0 < self.config.max_cores < self.threads:
self.threads = self.config.max_cores
Log.info('Using {0} threads', self.threads)
# NOTE(review): `job` has no visible binding at this point -- the enclosing
# loop (presumably over self.jobs) seems to be missing from this view; confirm
# against the full file. The call registers _job_status_changed as the
# callback passed to job.subscribe().
job.subscribe(self, self._job_status_changed)
# Downloads and uploads each get a single-worker pool; the pool that runs the
# jobs (see _job_status_changed) gets self.threads workers.
self.downloader = ThreadPoolExecutor(1)
self.uploader = ThreadPoolExecutor(1)
self.executor = ThreadPoolExecutor(self.threads)
# Queue the download of every requested file on the downloader pool.
for file_object in self.data_manager.requested_files.values():
self.downloader.submit(file_object.download)
# The lock is used as a completion signal: the first acquire() takes it and
# the second one blocks this thread until check_completion() releases it.
# NOTE(review): assumes `threading` is imported elsewhere in the file -- confirm.
self.lock = threading.Lock()
self.lock.acquire()
self.lock.acquire()
# shutdown(True) waits for the work already submitted to each pool.
self.downloader.shutdown(True)
self.uploader.shutdown(True)
self.executor.shutdown(True)
TempFile.clean()
finish_time = datetime.datetime.now()
Log.result("Diagnostics finished at {0}", finish_time)
Log.result("Elapsed time: {0}\n", finish_time - time)
# Final reports: failed jobs first, then time consumed per diagnostic class.
self.print_errors()
self.print_stats()
def _job_status_changed(self, job, status):
    """
    React to a status change reported by a job

    A job that has become READY is submitted to the executor pool so that
    ``_run_job`` processes it. After any status change the manager checks
    whether the whole run has finished.

    :param job: job whose status changed
    :param status: new status of the job
    """
    became_ready = status == DiagnosticStatus.READY
    if became_ready:
        self.executor.submit(self._run_job, job)
    self.check_completion()
def check_completion(self):
    """
    Release the completion lock once every job has ended

    A job counts as ended when its status is COMPLETED or FAILED. While any
    job is still in another status the method returns without touching the
    lock.
    """
    everything_ended = all(
        job.status in (DiagnosticStatus.COMPLETED, DiagnosticStatus.FAILED)
        for job in self.jobs
    )
    if everything_ended:
        self.lock.release()
def print_stats(self):
    """
    Log the total time consumed by each diagnostic class

    Jobs are grouped by their ``alias``: the ``consumed_time`` of all the jobs
    sharing an alias is added up and one line per alias is logged, ordered
    from the least to the most time consumed.
    """
    Log.info('Time consumed by each diagnostic class')
    Log.info('--------------------------------------')

    times = {}
    for job in self.jobs:
        job_type = job.alias
        if job_type in times:
            times[job_type] += job.consumed_time
        else:
            times[job_type] = job.consumed_time

    # Sort the (alias, time) pairs by the accumulated time. Sorting the dict
    # itself with itemgetter(1) would order by the second character of each
    # alias (and raise IndexError for a one-character alias).
    for diag, consumed in sorted(times.items(), key=operator.itemgetter(1)):
        Log.info('{0:23} {1:}', diag, consumed)
def print_errors(self):
    """
    Log a summary of the failed jobs and flag the run as having errors

    Does nothing when no job has the FAILED status. Otherwise ``had_errors``
    is set to True and the log receives a header, one line per failed job
    with its message, and the total time consumed by those jobs.
    """
    failed_jobs = [candidate for candidate in self.jobs
                   if candidate.status == DiagnosticStatus.FAILED]
    if not failed_jobs:
        return

    self.had_errors = True
    for header_line in ('Failed jobs', '-----------'):
        Log.error(header_line)
    for failed_job in failed_jobs:
        Log.error('{0}: {0.message}', failed_job)

    # Start the sum from an empty timedelta so the total is itself a timedelta.
    wasted_time = sum((failed_job.consumed_time for failed_job in failed_jobs),
                      datetime.timedelta())
    Log.error('Total wasted time: {0}', wasted_time)
    Log.info('')
Log.info('Starting {0}', job)
job.status = DiagnosticStatus.RUNNING
time = datetime.datetime.now()
job.consumed_time = datetime.datetime.now() - time
job.message = str(ex)
Log.error('Job {0} failed: {1}', job, ex)
job.status = DiagnosticStatus.FAILED
return False
job.consumed_time = datetime.datetime.now() - time
Log.result('Finished {0}', job)
job.status = DiagnosticStatus.COMPLETED
return True