# NOTE(review): removed web-page pagination residue ("Newer"/"Older") left by a bad copy-paste.
import datetime
import functools
import operator
import sys
import threading
import time
import traceback
from concurrent.futures import ThreadPoolExecutor

from earthdiagnostics.datafile import StorageStatus, LocalStatus
from earthdiagnostics.diagnostic import DiagnosticStatus, Diagnostic, DiagnosticOptionError
from earthdiagnostics.utils import Utils, TempFile

# NOTE(review): `Log` is used throughout this module but its import is missing
# from this damaged chunk — restore it (historically `from bscearth.utils.log import Log`).
def __init__(self, config, data_manager):
    """Create a work manager bound to *config* and *data_manager*.

    The damaged original discarded both arguments, yet later code reads
    ``self.config`` and ``self.data_manager`` — store them here.
    """
    self.config = config
    self.data_manager = data_manager
    # Filled by prepare_job_list(); None until then
    self.jobs = None
    self.time = {}
    # Set to True whenever a diagnostic fails or cannot be configured
    self.had_errors = False
def prepare_job_list(self):
    """Build ``self.jobs`` from the commands declared in the configuration.

    Each command names a diagnostic class; the class expands it into one or
    more jobs (plus subjobs). Configuration problems are logged and recorded
    in ``self.had_errors`` instead of aborting.
    """
    self._register_diagnostics()
    list_jobs = list()
    for fulldiag in self.config.get_commands():
        Log.info("Adding {0} to diagnostic list", fulldiag)
        diag_options = fulldiag.split(',')
        diag_class = Diagnostic.get_diagnostic(diag_options[0])
        if diag_class:
            try:
                for job in diag_class.generate_jobs(self, diag_options):
                    list_jobs.append(job)
                    for subjob in job.subjobs:
                        list_jobs.append(subjob)
                continue
            except DiagnosticOptionError as ex:
                Log.error('Can not configure diagnostic {0}: {1}', diag_options[0], ex)
                self.had_errors = True
        else:
            Log.error('{0} is not an available diagnostic', diag_options[0])
            self.had_errors = True
    self.jobs = list_jobs
def run(self):
    """Run all prepared diagnostic jobs and wait for completion.

    NOTE(review): the ``def`` header and the loop headers of this method were
    lost in the damaged source; reconstructed from the surviving statements.

    Returns
    -------
    bool
        True when every diagnostic finished without errors.
    """
    if len(self.jobs) == 0:
        Log.result('No diagnostics to run')
        return not self.had_errors
    start_time = datetime.datetime.now()
    Log.info("Starting to compute at {0}", start_time)

    # Cap the worker pool at the configured maximum number of cores
    self.threads = Utils.available_cpu_count()
    if 0 < self.config.max_cores < self.threads:
        self.threads = self.config.max_cores
    Log.info('Using {0} threads', self.threads)

    self.downloader = Downloader()
    self.uploader = ThreadPoolExecutor(self.config.parallel_uploads)
    self.executor = ThreadPoolExecutor(self.threads)

    for job in self.jobs:
        job.subscribe(self, self._job_status_changed)
        job.check_is_ready()

    if self.config.skip_diags_done:
        for job in self.jobs:
            if job.can_skip_run():
                Log.info('Diagnostic {0} already done. Skipping !', job)
                job.status = DiagnosticStatus.COMPLETED

    for file_object in self.data_manager.requested_files.values():
        file_object.subscribe(self, self._file_object_status_changed)
        if file_object.download_required():
            self.downloader.submit(file_object)
    self.downloader.start()

    # The lock is acquired twice: the second acquire blocks this thread until
    # check_completion() (run from the status-change callbacks) releases it.
    self.lock = threading.Lock()
    self.lock.acquire()
    self.check_completion()
    self.lock.acquire()

    self.downloader.shutdown()
    # Also shut the uploader pool down — the damaged source leaked it
    self.uploader.shutdown()
    self.executor.shutdown()
    TempFile.clean()

    finish_time = datetime.datetime.now()
    Log.result("Diagnostics finished at {0}", finish_time)
    Log.result("Elapsed time: {0}\n", finish_time - start_time)
    self.print_errors()
    self.print_stats()
    return not self.had_errors
def _job_status_changed(self, job):
    """React to a job status change.

    Ready jobs are queued on the executor; data requests that no longer have
    any subscriber besides this manager are dropped from the request table.
    """
    if job.status == DiagnosticStatus.READY:
        self.executor.submit(self._run_job, job)
    # NOTE(review): indentation was lost here — this cleanup loop may
    # originally have been guarded by a completed/failed status check; verify.
    for request in job._requests:
        if request.only_suscriber(self):
            del self.data_manager.requested_files[request.remote_file]
            request.unsubscribe(self)
    self.check_completion()
def _file_object_status_changed(self, file_object):
    """React to a data file status change.

    Schedules a download or upload if one is now required; otherwise, once
    the file is idle and only this manager still watches it, forgets it.
    Ends by re-evaluating overall completion.
    """
    Log.debug('Checking file {0}. Local status {0.local_status} Storage status{0.storage_status}', file_object)
    if file_object.download_required():
        self.downloader.submit(file_object)
        return
    if file_object.upload_required():
        file_object.storage_status = StorageStatus.UPLOADING
        self.uploader.submit(file_object.upload)
        return
    # Idle file with no other subscriber: release it
    if file_object.local_status != LocalStatus.COMPUTING and \
            file_object.storage_status != StorageStatus.UPLOADING and \
            file_object.only_suscriber(self):
        del self.data_manager.requested_files[file_object.remote_file]
        file_object.unsubscribe(self)
    self.check_completion()
def check_completion(self):
    """Check whether all work is done and, if so, wake up run().

    Also puts the downloader on hold while more than 20 jobs are already
    READY, so downloads do not outpace the jobs consuming them.

    NOTE(review): the ``def`` header was lost in the damaged source, and the
    four pending-transfer checks were fused into one nested ``if`` with a
    single ``return`` — each pending state must block completion on its own.

    Returns
    -------
    bool
        True when no job or transfer is still pending.
    """
    # Throttle downloads while a large backlog of ready jobs exists
    counter = 0
    for job in self.jobs:
        if job.status == DiagnosticStatus.READY:
            counter += 1
            if counter > 20:
                break
    self.downloader.on_hold = counter > 20

    for job in self.jobs:
        if job.status in (DiagnosticStatus.READY, DiagnosticStatus.RUNNING):
            return False
        if job.status == DiagnosticStatus.WAITING:
            if job.all_requests_in_storage():
                return False

    for request in self.data_manager.requested_files.values():
        if request.storage_status == StorageStatus.UPLOADING:
            return False
        if request.local_status == LocalStatus.DOWNLOADING:
            return False
        if request.upload_required():
            return False
        if request.download_required():
            return False

    # Everything finished: release the lock run() is blocked on; a second
    # release of an already-released lock is harmless here
    try:
        self.lock.release()
    except Exception:
        pass
    return True
# NOTE(review): removed a line-number gutter (162-187) pasted in from a code
# viewer; the method bodies that occupied those lines are missing from this chunk.
def print_stats(self):
    """Log the total computing time consumed by each diagnostic class."""
    Log.info('Time consumed by each diagnostic class')
    Log.info('--------------------------------------')
    times = {}
    for job in self.jobs:
        job_type = job.alias
        if job_type in times:
            times[job_type] += job.consumed_time
        else:
            times[job_type] = job.consumed_time
    # Sort by consumed time. The damaged original iterated the dict keys with
    # itemgetter(1), which sorted aliases by their *second character*.
    for diag, consumed in sorted(times.items(), key=operator.itemgetter(1)):
        Log.info('{0:23} {1:}', diag, consumed)
def print_errors(self):
    """Log every failed job and the total time wasted on failures."""
    failed = [job for job in self.jobs if job.status == DiagnosticStatus.FAILED]
    if len(failed) == 0:
        return
    self.had_errors = True
    Log.error('Failed jobs')
    Log.error('-----------')
    for job in failed:
        Log.error('{0}: {0.message}', job)
    # Start the sum from a zero timedelta so summing timedeltas works
    Log.error('Total wasted time: {0}', sum([job.consumed_time for job in failed], datetime.timedelta()))
    Log.info('')
@staticmethod
def _run_job(job):
    """Execute one diagnostic job, recording status, timing and errors.

    NOTE(review): the header, the try/except and the ``job.compute()`` call
    were lost in the damaged source; reconstructed from the surviving
    statements (the ``sys.exc_info()`` call and FAILED branch imply a timed
    try/except around the computation).

    Returns
    -------
    bool
        True if the job completed, False if it failed.
    """
    start_time = datetime.datetime.now()
    try:
        Log.info('Starting {0}', job)
        job.status = DiagnosticStatus.RUNNING
        job.compute()
    except Exception as ex:
        job.consumed_time = datetime.datetime.now() - start_time
        exc_type, exc_value, exc_traceback = sys.exc_info()
        job.message = '{0}\n{1}'.format(ex, ''.join(traceback.format_tb(exc_traceback)))
        Log.error('Job {0} failed ({2}): {1}', job, job.message, exc_type)
        job.status = DiagnosticStatus.FAILED
        return False
    job.consumed_time = datetime.datetime.now() - start_time
    Log.result('Finished {0}', job)
    job.status = DiagnosticStatus.COMPLETED
    return True
@staticmethod
def _register_diagnostics():
    """Register every available diagnostic class with the Diagnostic registry."""
    WorkManager._register_ocean_diagnostics()
    WorkManager._register_general_diagnostics()
    WorkManager._register_stats_diagnostics()
@staticmethod
def _register_stats_diagnostics():
    """Register the statistics diagnostics."""
    from earthdiagnostics.statistics.monthlypercentile import MonthlyPercentile
    from earthdiagnostics.statistics.climatologicalpercentile import ClimatologicalPercentile
    from earthdiagnostics.statistics.daysoverpercentile import DaysOverPercentile
    from earthdiagnostics.statistics.discretize import Discretize
    Diagnostic.register(MonthlyPercentile)
    Diagnostic.register(ClimatologicalPercentile)
    Diagnostic.register(DaysOverPercentile)
    # Discretize was imported but never registered in the damaged source
    Diagnostic.register(Discretize)
@staticmethod
def _register_general_diagnostics():
    """Register the general-purpose diagnostics."""
    from earthdiagnostics.general.attribute import Attribute
    from earthdiagnostics.general.dailymean import DailyMean
    from earthdiagnostics.general.module import Module
    from earthdiagnostics.general.monthlymean import MonthlyMean
    from earthdiagnostics.general.yearlymean import YearlyMean
    from earthdiagnostics.general.rewrite import Rewrite
    from earthdiagnostics.general.relink import Relink
    from earthdiagnostics.general.relinkall import RelinkAll
    from earthdiagnostics.general.scale import Scale
    from earthdiagnostics.general.verticalmeanmetersiris import VerticalMeanMetersIris
    from earthdiagnostics.general.simplify_dimensions import SimplifyDimensions
    Diagnostic.register(DailyMean)
    Diagnostic.register(MonthlyMean)
    Diagnostic.register(YearlyMean)
    Diagnostic.register(Rewrite)
    Diagnostic.register(Relink)
    Diagnostic.register(RelinkAll)
    Diagnostic.register(Scale)
    Diagnostic.register(Attribute)
    Diagnostic.register(Module)
    Diagnostic.register(VerticalMeanMetersIris)
    Diagnostic.register(SimplifyDimensions)
@staticmethod
def _register_ocean_diagnostics():
    """Register the ocean diagnostics."""
    from earthdiagnostics.ocean.mixedlayerheatcontent import MixedLayerHeatContent
    from earthdiagnostics.ocean.mixedlayersaltcontent import MixedLayerSaltContent
    from earthdiagnostics.ocean.siasiesiv import Siasiesiv
    from earthdiagnostics.ocean.verticalmean import VerticalMean
    from earthdiagnostics.ocean.verticalmeanmeters import VerticalMeanMeters
    from earthdiagnostics.ocean.verticalgradient import VerticalGradient
    from earthdiagnostics.ocean.interpolate import Interpolate
    from earthdiagnostics.ocean.interpolatecdo import InterpolateCDO
    from earthdiagnostics.ocean.moc import Moc
    from earthdiagnostics.ocean.areamoc import AreaMoc
    from earthdiagnostics.ocean.maxmoc import MaxMoc
    from earthdiagnostics.ocean.psi import Psi
    from earthdiagnostics.ocean.gyres import Gyres
    from earthdiagnostics.ocean.convectionsites import ConvectionSites
    from earthdiagnostics.ocean.cutsection import CutSection
    from earthdiagnostics.ocean.averagesection import AverageSection
    from earthdiagnostics.ocean.heatcontentlayer import HeatContentLayer
    from earthdiagnostics.ocean.heatcontent import HeatContent
    from earthdiagnostics.ocean.regionmean import RegionMean
    from earthdiagnostics.ocean.regionsum import RegionSum
    from earthdiagnostics.ocean.rotation import Rotation
    Diagnostic.register(MixedLayerSaltContent)
    Diagnostic.register(Siasiesiv)
    Diagnostic.register(VerticalMean)
    Diagnostic.register(VerticalMeanMeters)
    Diagnostic.register(Interpolate)
    Diagnostic.register(InterpolateCDO)
    Diagnostic.register(Moc)
    Diagnostic.register(AreaMoc)
    Diagnostic.register(MaxMoc)
    Diagnostic.register(Psi)
    Diagnostic.register(Gyres)
    Diagnostic.register(ConvectionSites)
    Diagnostic.register(CutSection)
    Diagnostic.register(AverageSection)
    Diagnostic.register(MixedLayerHeatContent)
    Diagnostic.register(HeatContentLayer)
    Diagnostic.register(HeatContent)
    Diagnostic.register(RegionMean)
    # RegionSum and Rotation were imported but never registered in the
    # damaged source
    Diagnostic.register(RegionSum)
    Diagnostic.register(Rotation)
    Diagnostic.register(VerticalGradient)
class Downloader(object):
    """Download manager that fetches requested files one at a time in a
    background thread, prioritizing the downloads that unblock the most
    diagnostics."""

    def __init__(self):
        # Pending datafile requests, protected by _lock
        self._downloads = []
        self._lock = threading.Lock()
        # Flags read by the downloader thread: stop requests shutdown,
        # on_hold pauses downloads while the job backlog is large
        self.stop = False
        self.on_hold = False
# NOTE(review): removed a line-number gutter (308-337) pasted in from a code
# viewer; no statements from those lines survive in this chunk.
def start(self):
    """Start the background thread running the downloader loop."""
    self._thread = threading.Thread(target=self.downloader)
    self._thread.start()
def submit(self, datafile):
self._lock.acquire()
self._downloads.append(datafile)
self._lock.release()
def downloader(self):
    """Main loop of the download thread.

    Repeatedly picks the highest-priority pending file and downloads it,
    idling while the queue is empty or on hold, until shutdown() is called.
    Any unhandled error is logged critically instead of killing the thread
    silently.
    """
    try:
        def suscribers_waiting(datafile):
            # Count diagnostics for which this file is the last missing request
            waiting = 0
            for diag in datafile.suscribers:
                if not isinstance(diag, Diagnostic):
                    continue
                if diag.pending_requests() == 1:
                    waiting += 1
            return waiting

        def prioritize(datafile1, datafile2):
            # Order: most diagnostics unblocked first, then most subscribers,
            # then by size with unknown sizes last
            waiting = suscribers_waiting(datafile1) - suscribers_waiting(datafile2)
            if waiting:
                return -waiting
            suscribers = len(datafile1.suscribers) - len(datafile2.suscribers)
            if suscribers:
                return -suscribers
            if datafile1.size is None:
                if datafile2.size is None:
                    return 0
                return -1
            elif datafile2.size is None:
                return 1
            size = datafile1.size - datafile2.size
            if size:
                return -size
            return 0

        while True:
            with self._lock:
                if len(self._downloads) == 0 or self.on_hold:
                    if self.stop:
                        return
                    datafile = None
                else:
                    # cmp_to_key keeps the comparator working on Python 3;
                    # the damaged source used the Python-2-only sort(cmp)
                    self._downloads.sort(key=functools.cmp_to_key(prioritize))
                    datafile = self._downloads.pop(0)
            if datafile is None:
                # Sleep outside the lock so submit() is never blocked while idling
                time.sleep(0.01)
                continue
            datafile.download()
    except Exception as ex:
        Log.critical('Unhandled error at downloader: {0}\n{1}', ex, traceback.print_exc())
def shutdown(self):
self.stop = True
self._thread.join()