Newer
Older
# coding=utf-8
import datetime
import operator
from bscearth.utils.log import Log
from diagnostic import DiagnosticStatus
from utils import Utils, TempFile
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
class WorkManager(object):
def __init__(self, config, job_list):
self.pending_jobs = job_list
self.config = config
self.time = {}
self.had_errors = False
def run(self):
time = datetime.datetime.now()
Log.info("Starting to compute at {0}", time)
self.threads = Utils.available_cpu_count()
if 0 < self.config.max_cores < self.threads:
self.threads = self.config.max_cores
Log.info('Using {0} threads', self.threads)
with ThreadPoolExecutor(self.threads) as executor:
for job in self.pending_jobs:
executor.submit(self._run_job, job)
TempFile.clean()
finish_time = datetime.datetime.now()
Log.result("Diagnostics finished at {0}", finish_time)
Log.result("Elapsed time: {0}\n", finish_time - time)
# self.print_errors()
# self.print_stats()
return not self.had_errors
def print_stats(self):
Log.info('Time consumed by each diagnostic class')
Log.info('--------------------------------------')
total = dict()
for runner in self.job_runners:
for key, value in runner.time.items():
if key in total:
total[key] += value
else:
total[key] = value
for diag, time in sorted(total.items(), key=operator.itemgetter(1)):
Log.info('{0:23} {1:}', diag.__name__, time)
def print_errors(self):
failed = [job for job in self.running_jobs if job.status == DiagnosticStatus.FAILED]
if len(failed) == 0:
return
self.had_errors = True
Log.error('Failed jobs')
Log.error('-----------')
for job in failed:
Log.error(str(job))
Log.info('')
Log.info('Starting {0}', job)
job.status = DiagnosticStatus.RUNNING
if type(job) in self.time:
self.time[type(job)] += time
self.time[type(job)] = time
Log.result('Finished {0}', job)
job.status = DiagnosticStatus.COMPLETED
job.message = str(ex)
Log.error('Job {0} failed: {1}', job, ex)
self.semaphore.release()