Newer
Older
# coding=utf-8
import datetime
import operator
from bscearth.utils.log import Log
from diagnostic import DiagnosticStatus
from utils import Utils, TempFile
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def __init__(self, config, job_list, data_manager):
self.pending_jobs = job_list
self.config = config
self.time = {}
self.had_errors = False
def run(self):
time = datetime.datetime.now()
Log.info("Starting to compute at {0}", time)
self.threads = Utils.available_cpu_count()
if 0 < self.config.max_cores < self.threads:
self.threads = self.config.max_cores
Log.info('Using {0} threads', self.threads)
for job in self.pending_jobs:
job.request_data()
self.downloader = ThreadPoolExecutor(1)
self.uploader = ThreadPoolExecutor(1)
self.executor = ThreadPoolExecutor(self.threads)
for file_object in self.data_manager.requested_files.values():
self.downloader.submit(file_object.download)
for job in self.pending_jobs:
self.executor.submit(self._run_job, job)
TempFile.clean()
finish_time = datetime.datetime.now()
Log.result("Diagnostics finished at {0}", finish_time)
Log.result("Elapsed time: {0}\n", finish_time - time)
# self.print_errors()
# self.print_stats()
# def print_stats(self):
# Log.info('Time consumed by each diagnostic class')
# Log.info('--------------------------------------')
# total = dict()
# for runner in self.job_runners:
# for key, value in runner.time.items():
# if key in total:
# total[key] += value
# else:
# total[key] = value
# for diag, time in sorted(total.items(), key=operator.itemgetter(1)):
# Log.info('{0:23} {1:}', diag.__name__, time)
#
# def print_errors(self):
# failed = [job for job in self.running_jobs if job.status == DiagnosticStatus.FAILED]
# if len(failed) == 0:
# return
# self.had_errors = True
# Log.error('Failed jobs')
# Log.error('-----------')
# for job in failed:
# Log.error(str(job))
# Log.info('')
Log.info('Starting {0}', job)
job.status = DiagnosticStatus.RUNNING
if type(job) in self.time:
self.time[type(job)] += time
self.time[type(job)] = time
Log.result('Finished {0}', job)
job.status = DiagnosticStatus.COMPLETED
job.message = str(ex)
Log.error('Job {0} failed: {1}', job, ex)