Newer
Older
import operator
from concurrent.futures import ThreadPoolExecutor
from earthdiagnostics.datafile import StorageStatus, LocalStatus
from earthdiagnostics.diagnostic import DiagnosticStatus, Diagnostic, DiagnosticOptionError
from earthdiagnostics.utils import Utils, TempFile
from earthdiagnostics.ocean import *
from earthdiagnostics.general import *
from earthdiagnostics.statistics import *
def __init__(self, config, data_manager):
    """
    Create the work manager.

    :param config: configuration object; other methods read
        ``get_commands()`` and ``max_cores`` from it
    :param data_manager: data manager; other methods read its
        ``requested_files`` mapping
    """
    self.jobs = None
    self.time = {}
    self.had_errors = False
    # Keep both collaborators: prepare_job_list() and run() access them
    # through self.config / self.data_manager.
    self.data_manager = data_manager
    self.config = config
def prepare_job_list(self):
    """
    Build the job list from the configured diagnostic commands.

    Each command is a comma-separated string whose first field names the
    diagnostic. Unknown diagnostics and diagnostics that raise
    DiagnosticOptionError while generating jobs are logged and flagged in
    ``self.had_errors``; the remaining commands are still processed.
    The result is stored in ``self.jobs``.
    """
    self._register_diagnostics()
    list_jobs = []
    for fulldiag in self.config.get_commands():
        Log.info("Adding {0} to diagnostic list", fulldiag)
        diag_options = fulldiag.split(',')
        diag_class = Diagnostic.get_diagnostic(diag_options[0])
        if not diag_class:
            Log.error('{0} is not an available diagnostic', diag_options[0])
            self.had_errors = True
            continue
        try:
            list_jobs.extend(diag_class.generate_jobs(self, diag_options))
        except DiagnosticOptionError as ex:
            Log.error('Can not configure diagnostic {0}: {1}', diag_options[0], ex)
            self.had_errors = True
    self.jobs = list_jobs
def run(self):
    """
    Run the prepared jobs and block until the work is finished.

    Creates one single-worker pool for downloads, one for uploads and a
    compute pool sized from the available CPUs (capped by
    ``config.max_cores`` when that is positive and smaller). Jobs, their
    subjobs and the requested files are subscribed to the status
    callbacks, required downloads are queued, and the method then waits
    on ``self.lock`` before shutting the pools down and printing the
    error and timing summaries.
    """
    start_time = datetime.datetime.now()
    Log.info("Starting to compute at {0}", start_time)

    self.threads = Utils.available_cpu_count()
    if 0 < self.config.max_cores < self.threads:
        self.threads = self.config.max_cores
    Log.info('Using {0} threads', self.threads)

    self.downloader = ThreadPoolExecutor(1)
    self.uploader = ThreadPoolExecutor(1)
    self.executor = ThreadPoolExecutor(self.threads)

    # NOTE(review): the loop header was missing and `job` was unbound;
    # iterating self.jobs (filled by prepare_job_list) is the evident intent.
    for job in self.jobs:
        job.subscribe(self, self._job_status_changed)
        for subjob in job.subjobs:
            subjob.subscribe(self, self._job_status_changed)
        job.check_is_ready()

    for file_object in self.data_manager.requested_files.values():
        file_object.subscribe(self, self._file_object_status_changed)
        if file_object.download_required():
            self.downloader.submit(file_object.download)

    # The lock is taken twice on purpose: the second acquire blocks this
    # thread until someone else releases it.
    # NOTE(review): presumably check_completion() performs that release once
    # nothing is pending - the release is not visible here, confirm.
    self.lock = threading.Lock()
    self.lock.acquire()
    self.check_completion()
    self.lock.acquire()

    self.downloader.shutdown()
    # The uploader pool was created above but never shut down.
    self.uploader.shutdown()
    self.executor.shutdown()

    TempFile.clean()
    finish_time = datetime.datetime.now()
    Log.result("Diagnostics finished at {0}", finish_time)
    Log.result("Elapsed time: {0}\n", finish_time - start_time)
    self.print_errors()
    self.print_stats()
def _job_status_changed(self, job):
    """
    Callback invoked when a subscribed job changes status.

    A job that has become READY is submitted to the compute pool; the
    completion check then runs for every status change.

    :param job: the job whose status changed
    """
    if job.status == DiagnosticStatus.READY:
        self.executor.submit(self._run_job, job)
    self.check_completion()
def _file_object_status_changed(self, file_object):
    """
    Callback invoked when a subscribed file object changes status.

    Queues a download if one is required; otherwise queues an upload if
    one is required; only when neither is needed does it run the
    completion check.

    :param file_object: the file object whose status changed
    """
    if file_object.download_required():
        self.downloader.submit(file_object.download)
        return
    if file_object.upload_required():
        self.uploader.submit(file_object.upload)
        return
    self.check_completion()
# NOTE(review): orphaned statements - no `def` line precedes them and their
# indentation has been lost. Other methods call `self.check_completion()`, so
# this is presumably the body of that method; confirm against the original
# file before restoring the header and nesting.
# First pass: walk the jobs and bail out with False for READY/RUNNING ones.
for job in self.jobs:
if job.status in (DiagnosticStatus.READY, DiagnosticStatus.RUNNING):
return False
# A WAITING job also yields False when all_requests_in_storage() is truthy.
# NOTE(review): the nesting of the next two tests is inferred - verify.
if job.status == DiagnosticStatus.WAITING:
if job.all_requests_in_storage():
return False
# Second pass: walk the requested files. Every test below exits with a bare
# `return` (None) rather than the `return False` used for jobs.
# NOTE(review): both are falsy; confirm whether any caller distinguishes them.
for request in self.data_manager.requested_files.values():
if request.storage_status == StorageStatus.UPLOADING:
return
if request.local_status == LocalStatus.DOWNLOADING:
return
if request.upload_required():
return
if request.download_required():
return
# NOTE(review): the statements that follow when no test matches are not
# visible here, so the "everything finished" path is unknown.
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def print_stats(self):
    """
    Log the total time consumed per diagnostic alias.

    ``consumed_time`` is summed over all jobs sharing the same ``alias``
    and the totals are logged in ascending order of time consumed.
    """
    Log.info('Time consumed by each diagnostic class')
    Log.info('--------------------------------------')

    times = {}
    for job in self.jobs:
        job_type = job.alias
        if job_type in times:
            times[job_type] += job.consumed_time
        else:
            times[job_type] = job.consumed_time

    # Sort the (alias, total) pairs by the total. Sorting the bare keys with
    # itemgetter(1) ordered by the alias' second character and raised
    # IndexError for one-character aliases.
    for diag, consumed in sorted(times.items(), key=operator.itemgetter(1)):
        Log.info('{0:23} {1:}', diag, consumed)
def print_errors(self):
    """
    Log a summary of the failed jobs.

    Does nothing when no job has FAILED status. Otherwise sets
    ``self.had_errors``, logs each failed job with its ``message`` and
    logs the sum of their ``consumed_time``.
    """
    failed = [job for job in self.jobs if job.status == DiagnosticStatus.FAILED]
    if not failed:
        return
    self.had_errors = True
    Log.error('Failed jobs')
    Log.error('-----------')
    for job in failed:
        Log.error('{0}: {0.message}', job)
    # Start the sum from an empty timedelta so the values add up as durations.
    wasted = sum((job.consumed_time for job in failed), datetime.timedelta())
    Log.error('Total wasted time: {0}', wasted)
    Log.info('')
# NOTE(review): orphaned statements - no `def` line precedes them and their
# indentation has been lost. `self.executor.submit(self._run_job, job)` is
# used elsewhere, so this is presumably the body of `_run_job`; confirm
# against the original file before restoring the header and nesting.
# Mark the job as running and remember when it started.
Log.info('Starting {0}', job)
job.status = DiagnosticStatus.RUNNING
time = datetime.datetime.now()
# NOTE(review): `ex` below is never bound in the visible lines; presumably the
# call that performs the work and an `except ... as ex:` clause were lost.
# The next five lines look like the failure path: record elapsed time and
# message, log, flag the job as FAILED and return False.
job.consumed_time = datetime.datetime.now() - time
job.message = str(ex)
Log.error('Job {0} failed: {1}', job, ex)
job.status = DiagnosticStatus.FAILED
return False
# Success path: record elapsed time, flag the job as COMPLETED, return True.
job.consumed_time = datetime.datetime.now() - time
Log.result('Finished {0}', job)
job.status = DiagnosticStatus.COMPLETED
return True
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
@staticmethod
def _register_diagnostics():
    """Register the ocean, general and statistics diagnostics, in that order."""
    WorkManager._register_ocean_diagnostics()
    WorkManager._register_general_diagnostics()
    WorkManager._register_stats_diagnostics()
@staticmethod
def _register_stats_diagnostics():
    """Register the statistics diagnostics with Diagnostic."""
    for diagnostic in (MonthlyPercentile, ClimatologicalPercentile):
        Diagnostic.register(diagnostic)
@staticmethod
def _register_general_diagnostics():
    """Register the general-purpose diagnostics with Diagnostic."""
    general_diagnostics = (
        DailyMean,
        MonthlyMean,
        YearlyMean,
        Rewrite,
        Relink,
        RelinkAll,
        Scale,
        Attribute,
    )
    for diagnostic in general_diagnostics:
        Diagnostic.register(diagnostic)
@staticmethod
def _register_ocean_diagnostics():
    """Register the ocean diagnostics with Diagnostic."""
    ocean_diagnostics = (
        MixedLayerSaltContent,
        Siasiesiv,
        VerticalMean,
        VerticalMeanMeters,
        Interpolate,
        InterpolateCDO,
        Moc,
        AreaMoc,
        MaxMoc,
        Psi,
        Gyres,
        ConvectionSites,
        CutSection,
        AverageSection,
        MixedLayerHeatContent,
        HeatContentLayer,
        HeatContent,
        RegionMean,
        Rotation,
    )
    for diagnostic in ocean_diagnostics:
        Diagnostic.register(diagnostic)