Newer
Older
import operator
from concurrent.futures import ThreadPoolExecutor
from datafile import StorageStatus, LocalStatus
from diagnostic import DiagnosticStatus
from utils import Utils, TempFile
def __init__(self, config, job_list, data_manager):
    """Set up the manager with its configuration, jobs and data manager.

    :param config: configuration object; ``run`` reads ``max_cores`` from it
    :param job_list: the jobs to manage; iterated by ``print_stats`` and
        ``print_errors``
    :param data_manager: object exposing ``requested_files``, read by ``run``
    """
    # The other methods read these through ``self``, so the constructor
    # arguments have to be kept on the instance.
    self.config = config
    self.jobs = job_list
    self.data_manager = data_manager
    self.time = {}
    # Set to True by print_errors() when at least one job has failed.
    self.had_errors = False
def run(self):
    """Run all jobs and block until the work is reported as complete.

    Subscribes to every job and every requested file, queues the downloads
    that are already required, waits on ``self.lock`` and finally shuts the
    worker pools down, cleans temporary files and logs the error report and
    the timing statistics.
    """
    start_time = datetime.datetime.now()
    Log.info("Starting to compute at {0}", start_time)

    # Use every available CPU unless the configuration sets a lower,
    # strictly positive limit.
    self.threads = Utils.available_cpu_count()
    if 0 < self.config.max_cores < self.threads:
        self.threads = self.config.max_cores
    Log.info('Using {0} threads', self.threads)

    for job in self.jobs:
        job.subscribe(self, self._job_status_changed)

    # Downloads and uploads each get a single worker; jobs share a pool
    # sized by the thread count computed above.
    self.downloader = ThreadPoolExecutor(1)
    self.uploader = ThreadPoolExecutor(1)
    self.executor = ThreadPoolExecutor(self.threads)

    for file_object in self.data_manager.requested_files.values():
        file_object.subscribe(self, self._file_object_status_changed)
        if file_object.download_required():
            self.downloader.submit(file_object.download)

    # The lock is taken once here; the second acquire() then blocks until
    # something releases it.
    # NOTE(review): presumably check_completion() releases it once nothing
    # is left to do - confirm against its full body.
    self.lock = threading.Lock()
    self.lock.acquire()
    self.check_completion()
    self.lock.acquire()

    # Shut down every pool that was created, the uploader included, so no
    # worker thread is left behind.
    self.downloader.shutdown()
    self.executor.shutdown()
    self.uploader.shutdown()

    TempFile.clean()
    finish_time = datetime.datetime.now()
    Log.result("Diagnostics finished at {0}", finish_time)
    Log.result("Elapsed time: {0}\n", finish_time - start_time)
    self.print_errors()
    self.print_stats()
def _job_status_changed(self, job):
    """React to a status change reported by a subscribed job.

    :param job: the job whose status has changed
    """
    became_ready = job.status == DiagnosticStatus.READY
    if became_ready:
        # A READY job is handed to the job pool for execution.
        self.executor.submit(self._run_job, job)
    # Completion is re-evaluated after every status change, READY or not.
    self.check_completion()
def _file_object_status_changed(self, file_object):
    """React to a status change reported by a subscribed file object.

    Exactly one action is taken: queue a download if one is required,
    otherwise queue an upload if one is required, otherwise re-check
    whether all the work is complete.

    :param file_object: the file object whose status has changed
    """
    if file_object.download_required():
        self.downloader.submit(file_object.download)
    elif file_object.upload_required():
        self.uploader.submit(file_object.upload)
    else:
        self.check_completion()
# NOTE(review): the `def` line for this body is missing from this extract.
# It is presumably `check_completion(self)`, which run() and the status
# callbacks call - confirm against the full file.
#
# First scan: work is not complete (False is returned) while any job is
# READY or RUNNING, or is WAITING with all of its requests already in
# storage.
for job in self.jobs:
if job.status in (DiagnosticStatus.READY, DiagnosticStatus.RUNNING):
return False
if job.status == DiagnosticStatus.WAITING:
if job.all_requests_in_storage():
return False
# Second scan: a bare return (None, not False) is issued while any requested
# file is being uploaded or downloaded, or still requires an upload or a
# download.
for request in self.data_manager.requested_files.values():
if request.storage_status == StorageStatus.UPLOADING:
return
if request.local_status == LocalStatus.DOWNLOADING:
return
if request.upload_required():
return
if request.download_required():
return
# NOTE(review): whatever runs once both scans pass is not visible here.
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def print_stats(self):
    """Log the total time consumed by each diagnostic alias.

    Jobs sharing an alias are added together and the totals are logged in
    ascending order of consumed time.
    """
    Log.info('Time consumed by each diagnostic class')
    Log.info('--------------------------------------')

    times = {}
    for job in self.jobs:
        job_type = job.alias
        if job_type in times:
            times[job_type] += job.consumed_time
        else:
            times[job_type] = job.consumed_time

    # Sort the (alias, total) pairs by the total. Sorting the bare keys with
    # itemgetter(1) would order them by the second character of the alias
    # and fail on one-character aliases.
    for diag, consumed in sorted(times.items(), key=operator.itemgetter(1)):
        Log.info('{0:23} {1:}', diag, consumed)
def print_errors(self):
    """Log a report of the failed jobs, if there are any.

    When at least one job has the FAILED status, ``had_errors`` is set to
    True and each failed job is logged with its message, followed by the
    summed time the failed jobs consumed. Nothing is logged otherwise.
    """
    failed_jobs = [candidate for candidate in self.jobs
                   if candidate.status == DiagnosticStatus.FAILED]
    if not failed_jobs:
        return

    self.had_errors = True
    Log.error('Failed jobs')
    Log.error('-----------')
    for failed_job in failed_jobs:
        Log.error('{0}: {0.message}', failed_job)

    # Start the sum from an empty timedelta so the consumed times add up.
    wasted = sum((failed_job.consumed_time for failed_job in failed_jobs),
                 datetime.timedelta())
    Log.error('Total wasted time: {0}', wasted)
    Log.info('')
# NOTE(review): the `def` line for this body is missing from this extract.
# It is presumably `_run_job(job)`, the callable submitted to the job
# executor - confirm against the full file.
Log.info('Starting {0}', job)
job.status = DiagnosticStatus.RUNNING
# Start of the timing window used for job.consumed_time below.
time = datetime.datetime.now()
# NOTE(review): the call that actually runs the job is not visible here.
# `ex` is not bound anywhere in view, so the failure path below presumably
# sits inside an `except ... as ex:` handler - TODO confirm.
# Failure path: record the elapsed time and the error text, log the
# failure, mark the job FAILED and return False.
job.consumed_time = datetime.datetime.now() - time
job.message = str(ex)
Log.error('Job {0} failed: {1}', job, ex)
job.status = DiagnosticStatus.FAILED
return False
# Success path: record the elapsed time, log completion, mark the job
# COMPLETED and return True.
job.consumed_time = datetime.datetime.now() - time
Log.result('Finished {0}', job)
job.status = DiagnosticStatus.COMPLETED
return True