# NOTE(review): removed "Newer"/"Older" VCS-viewer artifacts that leaked into this extract.
import datetime
import operator
import sys
import threading
import time
import traceback
from concurrent.futures import ThreadPoolExecutor

from earthdiagnostics.datafile import StorageStatus, LocalStatus
from earthdiagnostics.diagnostic import DiagnosticStatus, Diagnostic, DiagnosticOptionError
from earthdiagnostics.general import *
from earthdiagnostics.ocean import *
from earthdiagnostics.utils import Utils, TempFile
# NOTE(review): `Log` and the statistics diagnostics (MonthlyPercentile, etc.)
# are referenced below but not visibly imported — presumably via the wildcard
# imports or lines lost in extraction; confirm against version control.
def __init__(self, config, data_manager):
    """Create the work manager.

    :param config: diagnostics configuration; read later as self.config by
        prepare_job_list() and run()
    :param data_manager: storage backend whose requested_files the manager
        subscribes to and cleans up
    """
    # BUG FIX: the extract discarded both constructor arguments, yet
    # self.config and self.data_manager are read throughout the class.
    self.config = config
    self.data_manager = data_manager
    self.jobs = None  # populated by prepare_job_list()
    self.time = {}
    self.had_errors = False
def prepare_job_list(self):
    """Parse the configured commands and build the list of diagnostic jobs.

    Unknown diagnostics and bad options are logged and flagged in
    self.had_errors instead of aborting.
    """
    self._register_diagnostics()
    jobs = []
    for fulldiag in self.config.get_commands():
        Log.info("Adding {0} to diagnostic list", fulldiag)
        diag_options = fulldiag.split(',')
        diag_class = Diagnostic.get_diagnostic(diag_options[0])
        if not diag_class:
            Log.error('{0} is not an available diagnostic', diag_options[0])
            self.had_errors = True
            continue
        try:
            for job in diag_class.generate_jobs(self, diag_options):
                jobs.append(job)
                jobs.extend(job.subjobs)
        except DiagnosticOptionError as ex:
            Log.error('Can not configure diagnostic {0}: {1}', diag_options[0], ex)
            self.had_errors = True
    self.jobs = jobs
def run(self):
    """Run every prepared job, downloading inputs and uploading results.

    Blocks until check_completion() decides everything has finished, then
    shuts the pools down and prints error/timing summaries.
    """
    start_time = datetime.datetime.now()
    Log.info("Starting to compute at {0}", start_time)

    # Cap worker threads at the configured maximum, if any.
    self.threads = Utils.available_cpu_count()
    if 0 < self.config.max_cores < self.threads:
        self.threads = self.config.max_cores
    Log.info('Using {0} threads', self.threads)

    self.downloader = Downloader()
    self.uploader = ThreadPoolExecutor(self.config.parallel_uploads)
    self.executor = ThreadPoolExecutor(self.threads)

    # BUG FIX: the extract referenced `job` with no enclosing loop — the
    # `for job in self.jobs:` line was lost in extraction; restored here.
    for job in self.jobs:
        job.subscribe(self, self._job_status_changed)
        job.check_is_ready()

    if self.config.skip_diags_done:
        for job in self.jobs:
            if job.can_skip_run():
                Log.info('Diagnostic {0} already done. Skipping !', job)
                job.status = DiagnosticStatus.COMPLETED

    for file_object in self.data_manager.requested_files.values():
        file_object.subscribe(self, self._file_object_status_changed)
        if file_object.download_required():
            self.downloader.submit(file_object)
    self.downloader.start()

    # Acquire twice: the second acquire blocks this thread until
    # check_completion() releases the lock when all work is done.
    self.lock = threading.Lock()
    self.lock.acquire()
    self.lock.acquire()

    self.downloader.shutdown()
    self.executor.shutdown()
    TempFile.clean()
    finish_time = datetime.datetime.now()
    Log.result("Diagnostics finished at {0}", finish_time)
    Log.result("Elapsed time: {0}\n", finish_time - start_time)
    self.print_errors()
    self.print_stats()
def _job_status_changed(self, job):
    # Callback invoked whenever a subscribed job changes status.
    if job.status == DiagnosticStatus.READY:
        # READY jobs have all their inputs: hand them to the thread pool.
        self.executor.submit(self._run_job, job)
    # Drop requested files that only this manager still subscribes to, so
    # the data manager can forget them.
    # NOTE(review): in this garbled extract the loop runs on EVERY status
    # change; the original may have guarded it on a terminal status
    # (COMPLETED/FAILED) and may also have called self.check_completion()
    # afterwards — confirm against version control.
    for request in job._requests:
        if request.only_suscriber(self):
            del self.data_manager.requested_files[request.remote_file]
            request.unsubscribe(self)
def _file_object_status_changed(self, file_object):
    """Callback invoked whenever a requested file changes status.

    Queues pending transfers; once a file is fully settled and nobody else
    subscribes to it, forgets it and re-checks overall completion.
    """
    Log.debug('Checking file {0}. Local status {0.local_status} Storage status{0.storage_status}', file_object)
    if file_object.download_required():
        self.downloader.submit(file_object)
    elif file_object.upload_required():
        file_object.storage_status = StorageStatus.UPLOADING
        self.uploader.submit(file_object.upload)
    else:
        settled = (file_object.local_status != LocalStatus.COMPUTING
                   and file_object.storage_status != StorageStatus.UPLOADING)
        if settled and file_object.only_suscriber(self):
            del self.data_manager.requested_files[file_object.remote_file]
            file_object.unsubscribe(self)
        self.check_completion()
def check_completion(self):
    """Return True and release run()'s wait lock when every job and every
    file transfer has finished; otherwise return False.

    NOTE(review): the `def` line was lost in this garbled extract (the
    method is called by _file_object_status_changed) and the request checks
    arrived spuriously nested; reconstructed as independent checks —
    confirm against version control.
    """
    # Put the downloader on hold while a backlog (> 20) of READY jobs exists.
    counter = 0
    for job in self.jobs:
        if job.status == DiagnosticStatus.READY:
            counter += 1
            if counter > 20:
                break
    self.downloader.on_hold = counter > 20

    # Any job still runnable, running, or waiting on data already in
    # storage means we are not done yet.
    for job in self.jobs:
        if job.status in (DiagnosticStatus.READY, DiagnosticStatus.RUNNING):
            return False
        if job.status == DiagnosticStatus.WAITING:
            if job.all_requests_in_storage():
                return False

    # Any file still moving (or needing to move) means we are not done yet.
    for request in self.data_manager.requested_files.values():
        if request.storage_status == StorageStatus.UPLOADING:
            return False
        if request.local_status == LocalStatus.DOWNLOADING:
            return False
        if request.upload_required():
            return False
        if request.download_required():
            return False

    # All clear: wake run(), which is blocked on its second lock.acquire().
    try:
        self.lock.release()
    except Exception:
        pass
    return True
# NOTE(review): lines 161-186 of the original file were lost in extraction
# (only their gutter numbers remained); recover them from version control.
def print_stats(self):
    """Log the total time consumed by each diagnostic class."""
    Log.info('Time consumed by each diagnostic class')
    Log.info('--------------------------------------')
    times = {}
    for job in self.jobs:
        job_type = job.alias
        if job_type in times:
            times[job_type] += job.consumed_time
        else:
            times[job_type] = job.consumed_time
    # BUG FIX: the original sorted the dict's KEYS with itemgetter(1),
    # i.e. by the alias's second character (crashing on 1-char aliases).
    # Sort the (alias, elapsed) pairs by accumulated time instead.
    for diag, elapsed in sorted(times.items(), key=operator.itemgetter(1)):
        Log.info('{0:23} {1:}', diag, elapsed)
def print_errors(self):
    """Log every failed job and the total time wasted on failures.

    Sets self.had_errors when at least one job failed; does nothing
    otherwise.
    """
    failed = [job for job in self.jobs if job.status == DiagnosticStatus.FAILED]
    if not failed:
        return
    self.had_errors = True
    Log.error('Failed jobs')
    Log.error('-----------')
    for job in failed:
        Log.error('{0}: {0.message}', job)
    wasted = sum((job.consumed_time for job in failed), datetime.timedelta())
    Log.error('Total wasted time: {0}', wasted)
    Log.info('')
def _run_job(self, job):
    """Execute one diagnostic job, recording timing, status and outcome.

    :returns: True on success, False on failure.

    NOTE(review): the `def` line and the try/except framing were lost in
    this garbled extract; reconstructed (including the presumable
    job.compute() call) — confirm against version control.
    """
    start_time = datetime.datetime.now()
    try:
        Log.info('Starting {0}', job)
        job.status = DiagnosticStatus.RUNNING
        job.compute()
    except Exception as ex:
        job.consumed_time = datetime.datetime.now() - start_time
        # Attach the traceback to the job so print_errors() can report it.
        exc_type, exc_value, exc_traceback = sys.exc_info()
        job.message = '{0}\n{1}'.format(ex, ''.join(traceback.format_tb(exc_traceback)))
        Log.error('Job {0} failed: {1}', job, job.message)
        job.status = DiagnosticStatus.FAILED
        return False
    job.consumed_time = datetime.datetime.now() - start_time
    Log.result('Finished {0}', job)
    job.status = DiagnosticStatus.COMPLETED
    return True
@staticmethod
def _register_diagnostics():
    """Register every diagnostic family with the Diagnostic registry."""
    for register_family in (WorkManager._register_ocean_diagnostics,
                            WorkManager._register_general_diagnostics,
                            WorkManager._register_stats_diagnostics):
        register_family()
@staticmethod
def _register_stats_diagnostics():
    """Register the statistics diagnostics.

    NOTE(review): these classes are not visibly imported in this extract —
    presumably they arrive via a lost import line; confirm.
    """
    for diagnostic in (MonthlyPercentile,
                       ClimatologicalPercentile,
                       DaysOverPercentile):
        Diagnostic.register(diagnostic)
@staticmethod
def _register_general_diagnostics():
    """Register the general-purpose diagnostics."""
    for diagnostic in (DailyMean, MonthlyMean, YearlyMean,
                       Rewrite, Relink, RelinkAll,
                       Scale, Attribute, Module,
                       VerticalMeanMetersIris, SimplifyDimensions):
        Diagnostic.register(diagnostic)
@staticmethod
def _register_ocean_diagnostics():
    """Register the ocean diagnostics."""
    for diagnostic in (MixedLayerSaltContent, Siasiesiv,
                       VerticalMean, VerticalMeanMeters,
                       Interpolate, InterpolateCDO,
                       Moc, AreaMoc, MaxMoc,
                       Psi, Gyres, ConvectionSites,
                       CutSection, AverageSection,
                       MixedLayerHeatContent, HeatContentLayer, HeatContent,
                       RegionMean, Rotation, VerticalGradient):
        Diagnostic.register(diagnostic)
class Downloader(object):
    """Background download manager.

    Datafiles are queued with submit() and served one at a time by a
    daemon-style thread, highest-priority first (files unblocking the most
    diagnostics, then most subscribers, then largest size).
    """

    def __init__(self):
        self._downloads = []            # pending datafile objects
        self._lock = threading.Lock()   # guards _downloads
        self.stop = False               # set by shutdown() to end the thread
        self.on_hold = False            # True pauses downloads (set by WorkManager)

    def start(self):
        """Launch the background download thread."""
        self._thread = threading.Thread(target=self.downloader)
        self._thread.start()

    def submit(self, datafile):
        """Queue a datafile for download (thread-safe)."""
        with self._lock:
            self._downloads.append(datafile)

    def downloader(self):
        """Thread body: pop and download the best candidate until stopped."""
        from functools import cmp_to_key

        def suscribers_waiting(datafile):
            # Diagnostics for which this file is the last missing request.
            waiting = 0
            for diag in datafile.suscribers:
                if not isinstance(diag, Diagnostic):
                    continue
                if diag.pending_requests() == 1:
                    waiting += 1
            return waiting

        def prioritize(datafile1, datafile2):
            # cmp-style comparator: negative result → datafile1 goes first.
            waiting = suscribers_waiting(datafile1) - suscribers_waiting(datafile2)
            if waiting:
                return -waiting
            suscribers = len(datafile1.suscribers) - len(datafile2.suscribers)
            if suscribers:
                return -suscribers
            # Unknown sizes go first; among known sizes, bigger files first.
            if datafile1.size is None:
                return 0 if datafile2.size is None else -1
            if datafile2.size is None:
                return 1
            size = datafile1.size - datafile2.size
            if size:
                return -size
            return 0

        try:
            while True:
                datafile = None
                with self._lock:
                    if self._downloads and not self.on_hold:
                        # BUG FIX: list.sort(cmp) is Python 2 only; on
                        # Python 3 wrap the comparator with cmp_to_key.
                        self._downloads.sort(key=cmp_to_key(prioritize))
                        datafile = self._downloads.pop(0)
                    elif self.stop:
                        return
                if datafile is None:
                    # BUG FIX: the extract had `break` here, which killed the
                    # thread as soon as the queue was empty; poll instead.
                    # Sleep outside the lock so submit() never blocks on it.
                    time.sleep(0.01)
                else:
                    # Download outside the lock so new files can be queued.
                    datafile.download()
        except Exception as ex:
            Log.critical('Unhandled error at downloader: {0}\n{1}', ex, traceback.print_exc())

    def shutdown(self):
        """Signal the thread to stop and wait for it to finish."""
        self.stop = True
        self._thread.join()