Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# coding=utf-8
import datetime
import threading
import operator
from time import sleep
from bscearth.utils.log import Log
from diagnostic import DiagnosticStatus
from utils import Utils, TempFile
class WorkManager(object):
    """
    Distribute a list of jobs over a pool of JobRunner workers and report the outcome.

    :param config: configuration object; only its ``max_cores`` attribute is read here
    :param job_list: jobs to execute. The list is consumed in place: run() pops
                     jobs from its end as workers become available
    """

    def __init__(self, config, job_list):
        self.pending_jobs = job_list
        self.running_jobs = []
        self.finished_jobs = []
        self.config = config
        # Defined up front so print_stats(), print_errors() and the result of
        # run() never depend on an attribute that was only created conditionally
        self.threads = 0
        self.job_runners = []
        self.had_errors = False

    def run(self):
        """
        Run every pending job and block until all of them have finished.

        :return: True if no job ended with status FAILED, False otherwise
        :rtype: bool
        """
        start_time = datetime.datetime.now()
        Log.info("Starting to compute at {0}", start_time)
        self.threads = Utils.available_cpu_count()
        # max_cores <= 0 means "no limit"; otherwise it can only lower the count
        if 0 < self.config.max_cores < self.threads:
            self.threads = self.config.max_cores
        Log.info('Using {0} threads', self.threads)

        self.had_errors = False
        self.job_runners = [JobRunner() for _ in range(self.threads)]
        for runner in self.job_runners:
            runner.start()

        while self.pending_jobs or self.running_jobs:
            self._dispatch_pending_jobs()
            self._collect_finished_jobs()
            sleep(0.1)

        # Every job is done: tell the workers to leave their wait loop
        for runner in self.job_runners:
            runner.stop()

        TempFile.clean()
        finish_time = datetime.datetime.now()
        Log.result("Diagnostics finished at {0}", finish_time)
        Log.result("Elapsed time: {0}\n", finish_time - start_time)
        self.print_errors()
        self.print_stats()
        return not self.had_errors

    def _dispatch_pending_jobs(self):
        """Hand one pending job to each runner that has no job queued."""
        for runner in self.job_runners:
            if not self.pending_jobs:
                # More idle runners than jobs left: nothing else to hand out
                return
            if runner.next_job is None:
                job = self.pending_jobs.pop()
                runner.set_next_job(job)
                self.running_jobs.append(job)

    def _collect_finished_jobs(self):
        """Move jobs whose status is COMPLETED or FAILED to finished_jobs."""
        final_status = (DiagnosticStatus.COMPLETED, DiagnosticStatus.FAILED)
        # Build the new list instead of removing while iterating, which would
        # skip the element that follows every removed job
        still_running = []
        for job in self.running_jobs:
            if job.status in final_status:
                self.finished_jobs.append(job)
            else:
                still_running.append(job)
        self.running_jobs[:] = still_running

    def _time_by_diagnostic(self):
        """
        Add up the time recorded by every runner, grouped by job class.

        :return: mapping from job class to the total time spent on it
        :rtype: dict
        """
        total = dict()
        for runner in self.job_runners:
            for diag, elapsed in runner.time.items():
                if diag in total:
                    total[diag] += elapsed
                else:
                    total[diag] = elapsed
        return total

    def print_stats(self):
        """Log the total time consumed by each job class, in ascending order."""
        Log.info('Time consumed by each diagnostic class')
        Log.info('--------------------------------------')
        total = self._time_by_diagnostic()
        for diag, elapsed in sorted(total.items(), key=operator.itemgetter(1)):
            Log.info('{0:23} {1:}', diag.__name__, elapsed)

    def print_errors(self):
        """
        Log the jobs that ended with status FAILED.

        Sets ``had_errors`` to True when at least one failed job is found.
        """
        # By the time this is called the jobs have been moved out of
        # running_jobs, so the failures have to be looked up in finished_jobs
        failed = [job for job in self.finished_jobs if job.status == DiagnosticStatus.FAILED]
        if not failed:
            return
        self.had_errors = True
        Log.error('Failed jobs')
        Log.error('-----------')
        for job in failed:
            Log.error(str(job))
        Log.info('')
class JobRunner(object):
    """
    Worker that executes, on its own thread, the jobs handed to it one at a time.

    ``lock`` is used as a wake-up signal rather than for mutual exclusion: it is
    created already held, the worker blocks trying to acquire it, and
    set_next_job() / stop() release it to let the worker continue.
    """

    def __init__(self):
        self.next_job = None
        self.current_job = None
        # Accumulated compute time of successfully finished jobs, keyed by job class
        self.time = {}
        self._stop = False
        self._is_running = False
        self._thread = None
        self.lock = threading.Lock()
        self.lock.acquire()

    def _wake(self):
        """Release the signalling lock, tolerating a wake-up that is already pending."""
        try:
            self.lock.release()
        except threading.ThreadError:
            # The lock was already released and the worker has not consumed
            # that signal yet: it will wake up anyway, nothing else to do
            pass

    def stop(self):
        """Ask the worker to finish once it is not executing a job. Safe to call twice."""
        self._stop = True
        self._wake()

    def set_next_job(self, job):
        """
        Queue the job to be executed next and wake the worker.

        :param job: object exposing ``compute()`` and writable ``status`` and
                    ``message`` attributes
        """
        self.next_job = job
        self._wake()

    def is_running(self):
        """
        Tell whether the worker loop is currently active.

        :rtype: bool
        """
        return self._is_running

    def start(self):
        """Create and start the worker thread."""
        self._thread = threading.Thread(target=JobRunner._run_jobs, args=(self,))
        # Daemon thread: a worker blocked waiting for jobs must not keep the
        # interpreter alive if the main thread dies before calling stop()
        self._thread.daemon = True
        self._thread.start()

    def keep_running(self):
        """
        Block until there is something to do and take the queued job, if any.

        :return: True if a job was moved to ``current_job`` and must be run,
                 False if the worker has to finish
        :rtype: bool
        """
        self.lock.acquire()
        if self._stop:
            return False
        if self.next_job is None:
            # Woken up with neither a job nor a stop request: nothing to run
            return False
        self.current_job = self.next_job
        self.next_job = None
        return True

    def _run_jobs(self):
        """Worker loop: run queued jobs until keep_running() returns False."""
        self._is_running = True
        try:
            while self.keep_running():
                self._run_job()
        finally:
            self._is_running = False

    def _run_job(self):
        """
        Execute ``current_job`` and record its outcome on the job itself.

        On success the elapsed time is added to ``time`` under the job's class
        and the status becomes COMPLETED. On any exception the text of the
        exception is stored in ``message`` and the status becomes FAILED.

        :return: True if the job completed, False if it raised
        :rtype: bool
        """
        job = self.current_job
        try:
            Log.info('Starting {0}', job)
            job.status = DiagnosticStatus.RUNNING
            start = datetime.datetime.now()
            job.compute()
            elapsed = datetime.datetime.now() - start
            job_class = type(job)
            self.time[job_class] = self.time.get(job_class, datetime.timedelta()) + elapsed
            Log.result('Finished {0}', job)
            job.status = DiagnosticStatus.COMPLETED
            return True
        except Exception as ex:
            job.message = str(ex)
            # Without a final status the job would look as if it were still running
            job.status = DiagnosticStatus.FAILED
            Log.error('Job {0} failed: {1}', job, ex)
            return False