work_manager.py 11.5 KB
Newer Older
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
# coding=utf-8
import datetime
import functools
import operator
import sys
import threading
import time
import traceback

from bscearth.utils.log import Log
# noinspection PyCompatibility
from concurrent.futures import ThreadPoolExecutor

from earthdiagnostics.datafile import StorageStatus, LocalStatus
from earthdiagnostics.diagnostic import DiagnosticStatus, Diagnostic, DiagnosticOptionError
from earthdiagnostics.general import *
from earthdiagnostics.ocean import *
from earthdiagnostics.statistics import *
from earthdiagnostics.utils import Utils, TempFile


class WorkManager(object):
    """
    Build the list of diagnostic jobs from the configuration and run them.

    Jobs are executed on a thread pool, uploads on a second pool and
    downloads on a dedicated Downloader thread. Completion is detected by
    check_completion, which is invoked from the job status callback.

    :param config: configuration object; this class reads get_commands(),
        max_cores, parallel_uploads and skip_diags_done from it
    :param data_manager: object exposing the requested_files mapping
    """

    # Class-level attributes kept for backward compatibility; nothing in this
    # class reads them.
    count = 0
    failed_jobs = list()

    # Downloads are put on hold while more than this many jobs are READY.
    MAX_READY_JOBS = 20

    def __init__(self, config, data_manager):
        self.jobs = None
        self.config = config
        self.time = {}
        self.had_errors = False
        self.data_manager = data_manager

        # Populated by run()
        self.threads = None
        self.downloader = None
        self.uploader = None
        self.executor = None
        # Set by check_completion once nothing is left to do; run() waits on it
        self._completed = threading.Event()

    def prepare_job_list(self):
        """
        Create the job list from the commands found in the configuration.

        Each command is a comma separated string whose first field selects the
        diagnostic class. Unknown diagnostics and option errors are logged and
        flagged in had_errors without aborting the remaining commands.
        """
        self._register_diagnostics()
        list_jobs = list()
        for fulldiag in self.config.get_commands():
            Log.info("Adding {0} to diagnostic list", fulldiag)
            diag_options = fulldiag.split(',')

            diag_class = Diagnostic.get_diagnostic(diag_options[0])
            if not diag_class:
                Log.error('{0} is not an available diagnostic', diag_options[0])
                self.had_errors = True
                continue

            try:
                for job in diag_class.generate_jobs(self, diag_options):
                    list_jobs.append(job)
                    list_jobs.extend(job.subjobs)
            except DiagnosticOptionError as ex:
                Log.error('Can not configure diagnostic {0}: {1}', diag_options[0], ex)
                self.had_errors = True
        self.jobs = list_jobs

    def run(self):
        """
        Run every prepared job and block until all work has finished.

        :return: None when there is nothing to run, otherwise True if no
            error was detected and False if any was
        """
        if not self.jobs:
            Log.result('No diagnostics to run')
            return

        start_time = datetime.datetime.now()
        Log.info("Starting to compute at {0}", start_time)

        self.threads = Utils.available_cpu_count()
        if 0 < self.config.max_cores < self.threads:
            self.threads = self.config.max_cores
        Log.info('Using {0} threads', self.threads)

        self.downloader = Downloader()
        self.uploader = ThreadPoolExecutor(self.config.parallel_uploads)
        self.executor = ThreadPoolExecutor(self.threads)

        for job in self.jobs:
            job.request_data()
            job.declare_data_generated()
            job.subscribe(self, self._job_status_changed)

        if self.config.skip_diags_done:
            for job in self.jobs:
                if job.can_skip_run():
                    Log.info('Diagnostic {0} already done. Skipping !', job)
                    job.status = DiagnosticStatus.COMPLETED

        # Iterate over a snapshot: the status callbacks delete entries from
        # requested_files.
        for file_object in list(self.data_manager.requested_files.values()):
            file_object.subscribe(self, self._file_object_status_changed)
            if file_object.download_required():
                self.downloader.submit(file_object)

        # Discard any completion signalled while the set-up above was still
        # incomplete, then evaluate the real state once: if everything is
        # already final no further callback will arrive to wake us up.
        self._completed.clear()
        self.check_completion()
        self.downloader.start()
        self._completed.wait()

        self.downloader.shutdown()
        self.executor.shutdown()
        self.uploader.shutdown(True)

        TempFile.clean()
        finish_time = datetime.datetime.now()
        Log.result("Diagnostics finished at {0}", finish_time)
        Log.result("Elapsed time: {0}\n", finish_time - start_time)
        self.print_errors()
        self.print_stats()
        return not self.had_errors

    def _job_status_changed(self, job):
        """
        React to a job status change.

        READY jobs are submitted to the executor, requests that only this
        manager is still subscribed to are released, and completion is
        re-evaluated.
        """
        if job.status == DiagnosticStatus.READY:
            self.executor.submit(self._run_job, job)
        for request in job._requests:
            if request.only_suscriber(self):
                del self.data_manager.requested_files[request.remote_file]
                request.unsubscribe(self)
                request.clean_local()
        self.check_completion()

    def _file_object_status_changed(self, file_object):
        """
        React to a file status change.

        Queues the download or upload when one is required; otherwise releases
        the file if it is neither being computed nor uploaded and this manager
        is its only subscriber.
        """
        Log.debug('Checking file {0}. Local status {0.local_status} Storage status{0.storage_status}', file_object)
        if file_object.download_required():
            self.downloader.submit(file_object)
            return
        if file_object.upload_required():
            file_object.storage_status = StorageStatus.UPLOADING
            self.uploader.submit(file_object.upload)
            return
        if file_object.local_status != LocalStatus.COMPUTING and \
                file_object.storage_status != StorageStatus.UPLOADING and \
                file_object.only_suscriber(self):
            del self.data_manager.requested_files[file_object.remote_file]
            file_object.unsubscribe(self)
            file_object.clean_local()

    def check_completion(self):
        """
        Update the download throttle and signal run() when all work is done.

        :return: True if nothing is left to do (run() is released), False
            otherwise
        """
        ready_jobs = 0
        for job in self.jobs:
            if job.status == DiagnosticStatus.READY:
                ready_jobs += 1
                if ready_jobs > self.MAX_READY_JOBS:
                    break

        self.downloader.on_hold = ready_jobs > self.MAX_READY_JOBS

        for job in self.jobs:
            if job.status in (DiagnosticStatus.READY, DiagnosticStatus.RUNNING):
                return False
            if job.status == DiagnosticStatus.WAITING and job.all_requests_in_storage():
                return False

        # Snapshot: other threads may delete entries while we iterate
        for request in list(self.data_manager.requested_files.values()):
            if request.storage_status == StorageStatus.UPLOADING:
                return False
            if request.local_status == LocalStatus.DOWNLOADING:
                return False
            if request.upload_required():
                return False
            if request.download_required():
                return False

        self._completed.set()
        return True

    @staticmethod
    def _time_by_alias(jobs):
        """
        Accumulate the consumed time of the jobs per alias.

        :param jobs: iterable of objects with alias and consumed_time
        :return: list of (alias, total time) pairs sorted by ascending time
        """
        times = {}
        for job in jobs:
            if job.alias in times:
                times[job.alias] += job.consumed_time
            else:
                times[job.alias] = job.consumed_time
        return sorted(times.items(), key=operator.itemgetter(1))

    def print_stats(self):
        """Log the time consumed by each diagnostic alias, shortest first."""
        Log.info('Time consumed by each diagnostic class')
        Log.info('--------------------------------------')

        for diag, consumed in self._time_by_alias(self.jobs):
            Log.info('{0:23} {1:}', diag, consumed)

    def print_errors(self):
        """Log the failed jobs, if any, and flag had_errors."""
        failed = [job for job in self.jobs if job.status == DiagnosticStatus.FAILED]
        if len(failed) == 0:
            return
        self.had_errors = True
        Log.error('Failed jobs')
        Log.error('-----------')
        for job in failed:
            Log.error('{0}: {0.message}', job)
        Log.error('Total wasted time: {0}', sum([job.consumed_time for job in failed], datetime.timedelta()))
        Log.info('')

    @staticmethod
    def _run_job(job):
        """
        Run a single job, recording its consumed time and final status.

        :return: True if the job completed, False if it raised
        """
        start = datetime.datetime.now()
        try:
            Log.info('Starting {0}', job)
            job.status = DiagnosticStatus.RUNNING
            # NOTE(review): the original body never invoked the job, so nothing
            # was computed; compute() is assumed to be the Diagnostic entry
            # point -- confirm against earthdiagnostics.diagnostic.
            job.compute()
        except Exception as ex:
            job.consumed_time = datetime.datetime.now() - start
            exc_traceback = sys.exc_info()[2]
            job.message = '{0}\n{1}'.format(ex, ''.join(traceback.format_tb(exc_traceback)))
            Log.error('Job {0} failed: {1}', job, job.message)
            job.status = DiagnosticStatus.FAILED
            # Stop here: falling through would overwrite FAILED with COMPLETED
            return False
        job.consumed_time = datetime.datetime.now() - start
        Log.result('Finished {0}', job)
        job.status = DiagnosticStatus.COMPLETED
        return True

    @staticmethod
    def _register_diagnostics():
        """Register every diagnostic class known to this manager."""
        WorkManager._register_ocean_diagnostics()
        WorkManager._register_general_diagnostics()
        WorkManager._register_stats_diagnostics()

    @staticmethod
    def _register_stats_diagnostics():
        """Register the diagnostics of the statistics package."""
        Diagnostic.register(MonthlyPercentile)
        Diagnostic.register(ClimatologicalPercentile)
        Diagnostic.register(DaysOverPercentile)
        Diagnostic.register(Discretize)

    @staticmethod
    def _register_general_diagnostics():
        """Register the diagnostics of the general package."""
        Diagnostic.register(DailyMean)
        Diagnostic.register(MonthlyMean)
        Diagnostic.register(YearlyMean)
        Diagnostic.register(Rewrite)
        Diagnostic.register(Relink)
        Diagnostic.register(RelinkAll)
        Diagnostic.register(Scale)
        Diagnostic.register(Attribute)
        Diagnostic.register(Module)
        Diagnostic.register(VerticalMeanMetersIris)
        Diagnostic.register(SimplifyDimensions)

    @staticmethod
    def _register_ocean_diagnostics():
        """Register the diagnostics of the ocean package."""
        Diagnostic.register(MixedLayerSaltContent)
        Diagnostic.register(Siasiesiv)
        Diagnostic.register(VerticalMean)
        Diagnostic.register(VerticalMeanMeters)
        Diagnostic.register(Interpolate)
        Diagnostic.register(InterpolateCDO)
        Diagnostic.register(Moc)
        Diagnostic.register(AreaMoc)
        Diagnostic.register(MaxMoc)
        Diagnostic.register(Psi)
        Diagnostic.register(Gyres)
        Diagnostic.register(ConvectionSites)
        Diagnostic.register(CutSection)
        Diagnostic.register(AverageSection)
        Diagnostic.register(MixedLayerHeatContent)
        Diagnostic.register(HeatContentLayer)
        Diagnostic.register(HeatContent)
        Diagnostic.register(RegionMean)
        Diagnostic.register(Rotation)
        Diagnostic.register(VerticalGradient)


class Downloader(object):
    """
    Download queued files one at a time on a background thread.

    Pending files are re-prioritised before every download. Setting on_hold
    pauses the queue and shutdown() stops the thread once the queue is empty
    or on hold.
    """

    def __init__(self):
        self._downloads = []
        self._lock = threading.Lock()
        self._thread = None
        self.stop = False
        # Initialised here so the worker never reads a missing attribute
        self.on_hold = False

    def start(self):
        """Start the background download thread."""
        self._thread = threading.Thread(target=self.downloader)
        self._thread.start()

    def submit(self, datafile):
        """
        Queue a file for download.

        :param datafile: object providing download(), suscribers and size
        """
        with self._lock:
            self._downloads.append(datafile)

    @staticmethod
    def _suscribers_waiting(datafile):
        """Count the Diagnostic subscribers with exactly one pending request."""
        waiting = 0
        for diag in datafile.suscribers:
            if not isinstance(diag, Diagnostic):
                continue
            if diag.pending_requests() == 1:
                waiting += 1
        return waiting

    @staticmethod
    def _prioritize(datafile1, datafile2):
        """
        Compare two files, cmp-style; a negative value puts datafile1 first.

        Order: more subscribers with a single pending request, then more
        subscribers, then unknown size, then bigger size.
        """
        waiting = Downloader._suscribers_waiting(datafile1) - Downloader._suscribers_waiting(datafile2)
        if waiting:
            return -waiting

        suscribers = len(datafile1.suscribers) - len(datafile2.suscribers)
        if suscribers:
            return -suscribers

        if datafile1.size is None:
            if datafile2.size is None:
                return 0
            return -1
        if datafile2.size is None:
            return 1
        size = datafile1.size - datafile2.size
        if size:
            return -size
        return 0

    def _next_download(self):
        """Pop the highest priority file, or return None if idle or on hold."""
        with self._lock:
            if not self._downloads or self.on_hold:
                return None
            # cmp_to_key keeps the cmp-style comparison valid on Python 2 and 3
            self._downloads.sort(key=functools.cmp_to_key(self._prioritize))
            return self._downloads.pop(0)

    def downloader(self):
        """Worker loop: download queued files until asked to stop."""
        try:
            while True:
                datafile = self._next_download()
                if datafile is None:
                    if self.stop:
                        return
                    # Sleep outside the lock so submit() is never blocked,
                    # then poll again instead of leaving the loop.
                    time.sleep(0.01)
                    continue
                datafile.download()
        except Exception as ex:
            Log.critical('Unhandled error at downloader: {0}\n{1}', ex, traceback.format_exc())

    def shutdown(self):
        """Ask the worker to stop and wait for it, if it was ever started."""
        self.stop = True
        if self._thread is not None:
            self._thread.join()