Newer
Older
import datetime
import operator
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from functools import cmp_to_key

from earthdiagnostics.datafile import StorageStatus, LocalStatus
from earthdiagnostics.diagnostic import DiagnosticStatus, Diagnostic, DiagnosticOptionError
from earthdiagnostics.general import *
from earthdiagnostics.ocean import *
from earthdiagnostics.utils import Utils, TempFile
def __init__(self, config, data_manager):
    """
    Create the work manager.

    :param config: configuration object; other methods of this class read
        ``get_commands()``, ``max_cores`` and ``parallel_uploads`` from it
    :param data_manager: data manager; other methods of this class read its
        ``requested_files`` mapping
    """
    # Stored on the instance because prepare_job_list() and run() access
    # them as self.config / self.data_manager.
    self.config = config
    self.data_manager = data_manager
    self.jobs = None  # replaced by a list in prepare_job_list()
    self.time = {}
    self.had_errors = False
def prepare_job_list(self):
    """
    Build ``self.jobs`` from the diagnostic commands found in the configuration.

    Each command is a comma-separated string whose first field names the
    diagnostic. Unknown diagnostics and diagnostics that reject their options
    are logged and flagged through ``self.had_errors``; the remaining
    commands are still processed.
    """
    self._register_diagnostics()
    collected = []
    for command in self.config.get_commands():
        Log.info("Adding {0} to diagnostic list", command)
        options = command.split(',')
        diagnostic = Diagnostic.get_diagnostic(options[0])

        if not diagnostic:
            Log.error('{0} is not an available diagnostic', options[0])
            self.had_errors = True
            continue

        try:
            # Appending one by one keeps the jobs produced before a failure
            for new_job in diagnostic.generate_jobs(self, options):
                collected.append(new_job)
        except DiagnosticOptionError as ex:
            Log.error('Can not configure diagnostic {0}: {1}', options[0], ex)
            self.had_errors = True
    self.jobs = collected
def run(self):
    """
    Execute every prepared job and block until the work is finished.

    Creates the downloader and the upload/compute thread pools, subscribes
    to status changes of jobs, subjobs and requested files, queues the
    downloads that are already required and then waits on ``self.lock``.
    Afterwards the workers are shut down, temporary files are cleaned and
    the error and timing summaries are logged.
    """
    # Named start_time so the imported ``time`` module is not shadowed
    start_time = datetime.datetime.now()
    Log.info("Starting to compute at {0}", start_time)

    self.threads = Utils.available_cpu_count()
    # max_cores only applies when it is positive and below the detected count
    if 0 < self.config.max_cores < self.threads:
        self.threads = self.config.max_cores
    Log.info('Using {0} threads', self.threads)

    self.downloader = Downloader()
    self.uploader = ThreadPoolExecutor(self.config.parallel_uploads)
    self.executor = ThreadPoolExecutor(self.threads)

    # NOTE(review): the statements below referenced ``job`` without any
    # enclosing loop; iterating over self.jobs is the evident intent.
    for job in self.jobs:
        job.subscribe(self, self._job_status_changed)
        for subjob in job.subjobs:
            subjob.subscribe(self, self._job_status_changed)
        job.check_is_ready()

    for file_object in self.data_manager.requested_files.values():
        file_object.subscribe(self, self._file_object_status_changed)
        if file_object.download_required():
            self.downloader.submit(file_object)

    self.downloader.start()
    self.lock = threading.Lock()
    self.lock.acquire()

    self.check_completion()
    # The lock is already held, so this second acquire blocks until it is
    # released elsewhere (presumably by check_completion once nothing is
    # pending -- confirm against its full definition).
    self.lock.acquire()

    self.downloader.shutdown()
    self.executor.shutdown()
    # The upload pool is created above and must be released as well
    self.uploader.shutdown()

    TempFile.clean()
    finish_time = datetime.datetime.now()
    Log.result("Diagnostics finished at {0}", finish_time)
    Log.result("Elapsed time: {0}\n", finish_time - start_time)
    self.print_errors()
    self.print_stats()
def _job_status_changed(self, job):
    """
    React to a status change of a job this manager is subscribed to.

    A job that became READY is handed to the compute executor; in every
    case the completion check is run afterwards.

    :param job: job whose status changed
    """
    became_ready = job.status == DiagnosticStatus.READY
    if became_ready:
        self.executor.submit(self._run_job, job)
    self.check_completion()
def _file_object_status_changed(self, file_object):
    """
    React to a status change of a requested file.

    A file that needs downloading is queued on the downloader; otherwise a
    file that needs uploading has its ``upload`` method submitted to the
    upload pool. Only when neither applies is the completion check run.

    :param file_object: file whose status changed
    """
    if file_object.download_required():
        self.downloader.submit(file_object)
    elif file_object.upload_required():
        self.uploader.submit(file_object.upload)
    else:
        self.check_completion()
for job in self.jobs:
if job.status in (DiagnosticStatus.READY, DiagnosticStatus.RUNNING):
return False
if job.status == DiagnosticStatus.WAITING:
if job.all_requests_in_storage():
return False
for request in self.data_manager.requested_files.values():
if request.storage_status == StorageStatus.UPLOADING:
return
if request.local_status == LocalStatus.DOWNLOADING:
return
if request.upload_required():
return
if request.download_required():
return
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
def print_stats(self):
    """
    Log the total time consumed per diagnostic alias, smallest total first.

    The ``consumed_time`` of every job in ``self.jobs`` is accumulated under
    the job's ``alias`` and one line is logged for each alias.
    """
    Log.info('Time consumed by each diagnostic class')
    Log.info('--------------------------------------')

    times = {}
    for job in self.jobs:
        alias = job.alias
        if alias in times:
            times[alias] += job.consumed_time
        else:
            times[alias] = job.consumed_time

    # Sort the (alias, total) pairs by the total. Sorting the bare keys with
    # itemgetter(1) would order them by the second character of the alias
    # and raise IndexError for one-character aliases.
    for alias, consumed in sorted(times.items(), key=operator.itemgetter(1)):
        Log.info('{0:23} {1:}', alias, consumed)
def print_errors(self):
    """
    Log a summary of the jobs whose status is FAILED.

    Does nothing when no job failed. Otherwise sets ``self.had_errors``,
    logs each failed job with its message and the sum of the time they
    consumed.
    """
    failed_jobs = []
    for candidate in self.jobs:
        if candidate.status == DiagnosticStatus.FAILED:
            failed_jobs.append(candidate)
    if not failed_jobs:
        return

    self.had_errors = True
    Log.error('Failed jobs')
    Log.error('-----------')
    for failed_job in failed_jobs:
        Log.error('{0}: {0.message}', failed_job)

    # Start from an empty timedelta so the consumed times can be added up
    wasted = datetime.timedelta()
    for failed_job in failed_jobs:
        wasted = wasted + failed_job.consumed_time
    Log.error('Total wasted time: {0}', wasted)
    Log.info('')
Log.info('Starting {0}', job)
job.status = DiagnosticStatus.RUNNING
job.consumed_time = datetime.datetime.now() - time
job.message = str(ex)
Log.error('Job {0} failed: {1}', job, ex)
job.status = DiagnosticStatus.FAILED
return False
job.consumed_time = datetime.datetime.now() - time
Log.result('Finished {0}', job)
job.status = DiagnosticStatus.COMPLETED
return True
@staticmethod
def _register_diagnostics():
    """Register all diagnostic groups: ocean first, then general, then statistics."""
    register_steps = (
        WorkManager._register_ocean_diagnostics,
        WorkManager._register_general_diagnostics,
        WorkManager._register_stats_diagnostics,
    )
    for register_group in register_steps:
        register_group()
@staticmethod
def _register_stats_diagnostics():
    """Register the percentile-based diagnostics with ``Diagnostic``."""
    stats_diagnostics = (
        MonthlyPercentile,
        ClimatologicalPercentile,
        DaysOverPercentile,
    )
    for diagnostic_class in stats_diagnostics:
        Diagnostic.register(diagnostic_class)
@staticmethod
def _register_general_diagnostics():
    """Register the general-purpose diagnostics with ``Diagnostic``, in a fixed order."""
    general_diagnostics = (
        DailyMean,
        MonthlyMean,
        YearlyMean,
        Rewrite,
        Relink,
        RelinkAll,
        Scale,
        Attribute,
        Module,
        VerticalMeanMetersIris,
        SimplifyDimensions,
    )
    for diagnostic_class in general_diagnostics:
        Diagnostic.register(diagnostic_class)
@staticmethod
def _register_ocean_diagnostics():
    """Register the ocean diagnostics with ``Diagnostic``, in a fixed order."""
    ocean_diagnostics = (
        MixedLayerSaltContent,
        Siasiesiv,
        VerticalMean,
        VerticalMeanMeters,
        Interpolate,
        InterpolateCDO,
        Moc,
        AreaMoc,
        MaxMoc,
        Psi,
        Gyres,
        ConvectionSites,
        CutSection,
        AverageSection,
        MixedLayerHeatContent,
        HeatContentLayer,
        HeatContent,
        RegionMean,
        Rotation,
        VerticalGradient,
    )
    for diagnostic_class in ocean_diagnostics:
        Diagnostic.register(diagnostic_class)
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
class Downloader(object):
    """
    Background worker that downloads submitted data files one at a time.

    Files are queued with :meth:`submit`. The thread created by
    :meth:`start` repeatedly picks the highest-priority queued file and
    calls its ``download()`` method until :meth:`shutdown` has been
    requested and the queue is empty.
    """

    def __init__(self):
        self._downloads = []
        self._lock = threading.Lock()  # guards self._downloads
        self._wait = threading.Semaphore()
        self._thread = None  # created by start()
        self.stop = False

    def start(self):
        """Create and start the thread that runs :meth:`downloader`."""
        self._thread = threading.Thread(target=self.downloader)
        self._thread.start()

    def submit(self, datafile):
        """
        Queue a file for download.

        :param datafile: object exposing ``download()``, ``size`` and ``suscribers``
        """
        with self._lock:
            self._downloads.append(datafile)

    def downloader(self):
        """
        Worker loop: download queued files by priority until told to stop.

        Priority, highest first: files with more Diagnostic subscribers that
        have exactly one pending request, then files with more subscribers,
        then larger files. Any unhandled exception is logged and ends the loop.
        """
        try:
            def suscribers_waiting(datafile):
                # Count Diagnostic subscribers with exactly one pending request
                waiting = 0
                for diag in datafile.suscribers:
                    if not isinstance(diag, Diagnostic):
                        continue
                    if diag.pending_requests() == 1:
                        waiting += 1
                return waiting

            def prioritize(datafile1, datafile2):
                # cmp-style comparison; a negative result sorts datafile1 first
                waiting = suscribers_waiting(datafile1) - suscribers_waiting(datafile2)
                if waiting:
                    return -waiting
                suscribers = len(datafile1.suscribers) - len(datafile2.suscribers)
                if suscribers:
                    return -suscribers
                size = datafile1.size - datafile2.size
                if size:
                    return -size
                return 0

            # list.sort() only accepts a key function on Python 3
            priority_key = cmp_to_key(prioritize)

            while True:
                datafile = None
                with self._lock:
                    if self._downloads:
                        self._downloads.sort(key=priority_key)
                        datafile = self._downloads.pop(0)
                    elif self.stop:
                        return
                if datafile is None:
                    # Idle outside the lock so submit() is never blocked, and
                    # keep looping: more files may be submitted later.
                    time.sleep(0.01)
                    continue
                datafile.download()
        except Exception as ex:
            Log.critical('Unhandled error at downloader: {0}', ex)

    def shutdown(self):
        """Ask the worker to stop once the queue is empty and wait for it to end."""
        self.stop = True
        # Nothing to join when start() was never called
        if self._thread is not None:
            self._thread.join()