# coding=utf-8
import datetime
import functools
import operator
import sys
import threading
import time
import traceback
# noinspection PyCompatibility
from concurrent.futures import ThreadPoolExecutor

from bscearth.utils.log import Log

from earthdiagnostics.datafile import StorageStatus, LocalStatus
from earthdiagnostics.diagnostic import DiagnosticStatus, Diagnostic, DiagnosticOptionError
from earthdiagnostics.utils import Utils, TempFile


class WorkManager(object):
    """
    Build the list of diagnostic jobs and drive them to completion.

    Input files are queued on a Downloader, jobs that become READY are run on
    a thread pool and files that need uploading are sent through a second
    pool. Progress is tracked through the status callbacks that jobs and
    files fire on this object.

    :param config: configuration object. This class reads get_commands(),
        max_cores, parallel_uploads and skip_diags_done from it
    :param data_manager: object exposing the requested_files mapping
    """

    # Kept for backwards compatibility: no method of this class uses them
    count = 0
    failed_jobs = list()

    # When more than this many jobs are READY the downloader is put on hold
    _MAX_READY_JOBS = 20

    def __init__(self, config, data_manager):
        self.jobs = None
        self.config = config
        self.time = {}
        self.had_errors = False
        self.data_manager = data_manager
        # Created by run(). Declared here so that status callbacks fired
        # before run() creates them find the attributes defined
        self.threads = None
        self.downloader = None
        self.uploader = None
        self.executor = None
        self.lock = None

    def prepare_job_list(self):
        """
        Create the jobs (and their subjobs) for every configured command.

        Each command is a comma separated string whose first field is the
        diagnostic name. Unknown diagnostics and diagnostics that can not be
        configured are logged and flagged in had_errors; the remaining
        commands are still processed. The result is stored in self.jobs.
        """
        self._register_diagnostics()
        list_jobs = list()
        for fulldiag in self.config.get_commands():
            Log.info("Adding {0} to diagnostic list", fulldiag)
            diag_options = fulldiag.split(',')

            diag_class = Diagnostic.get_diagnostic(diag_options[0])
            if not diag_class:
                Log.error('{0} is not an available diagnostic', diag_options[0])
                self.had_errors = True
                continue
            try:
                for job in diag_class.generate_jobs(self, diag_options):
                    list_jobs.append(job)
                    list_jobs.extend(job.subjobs)
            except DiagnosticOptionError as ex:
                Log.error('Can not configure diagnostic {0}: {1}', diag_options[0], ex)
                self.had_errors = True
        self.jobs = list_jobs

    def run(self):
        """
        Run all the prepared jobs and wait until nothing is left to do.

        :return: True if no error was recorded, False otherwise
        """
        if not self.jobs:
            # Nothing would ever release the completion lock: return at once
            Log.result('No diagnostics to run')
            return not self.had_errors

        start_time = datetime.datetime.now()
        Log.info("Starting to compute at {0}", start_time)
        self.threads = Utils.available_cpu_count()
        if 0 < self.config.max_cores < self.threads:
            self.threads = self.config.max_cores
        Log.info('Using {0} threads', self.threads)

        self.downloader = Downloader()
        self.uploader = ThreadPoolExecutor(self.config.parallel_uploads)
        self.executor = ThreadPoolExecutor(self.threads)

        for job in self.jobs:
            job.request_data()
            job.declare_data_generated()
            job.subscribe(self, self._job_status_changed)
        if self.config.skip_diags_done:
            for job in self.jobs:
                if job.can_skip_run():
                    Log.info('Diagnostic {0} already done. Skipping !', job)
                    job.status = DiagnosticStatus.COMPLETED

        # Iterate over a snapshot: callbacks may remove entries from the mapping
        for file_object in list(self.data_manager.requested_files.values()):
            file_object.subscribe(self, self._file_object_status_changed)
            if file_object.download_required():
                self.downloader.submit(file_object)

        # The lock is used as a completion signal: it is taken here and
        # released by check_completion() once there is no work left, which
        # lets the second acquire() go through
        self.lock = threading.Lock()
        self.lock.acquire()
        # Covers the case where everything was already done (e.g. all jobs
        # skipped) and no further status change will ever arrive. It also
        # sets downloader.on_hold before the download thread starts
        self.check_completion()
        self.downloader.start()
        self.lock.acquire()

        self.downloader.shutdown()
        self.executor.shutdown()
        self.uploader.shutdown(True)

        TempFile.clean()
        finish_time = datetime.datetime.now()
        Log.result("Diagnostics finished at {0}", finish_time)
        Log.result("Elapsed time: {0}\n", finish_time - start_time)
        self.print_errors()
        self.print_stats()
        return not self.had_errors

    def _job_status_changed(self, job):
        """
        Callback fired when a job changes its status.

        READY jobs are sent to the executor. Requests of the job that have
        this manager as their only subscriber are dropped from the data
        manager and cleaned locally. Completion is re-evaluated afterwards.

        :param job: job whose status changed
        """
        if job.status == DiagnosticStatus.READY:
            self.executor.submit(self._run_job, job)
        for request in job._requests:
            if request.only_suscriber(self):
                # pop() instead of del: the file callback may have removed it already
                self.data_manager.requested_files.pop(request.remote_file, None)
                request.unsubscribe(self)
                request.clean_local()
        self.check_completion()

    def _file_object_status_changed(self, file_object):
        """
        Callback fired when a file changes its local or storage status.

        Queues the download or the upload if the file requires it. Otherwise,
        if the file is neither being computed nor uploaded and this manager is
        its only subscriber, it is dropped from the data manager and cleaned.

        :param file_object: file whose status changed
        """
        Log.debug('Checking file {0}. Local status {0.local_status} Storage status{0.storage_status}', file_object)
        if file_object.download_required():
            self.downloader.submit(file_object)
            return
        if file_object.upload_required():
            file_object.storage_status = StorageStatus.UPLOADING
            self.uploader.submit(file_object.upload)
            return
        if file_object.local_status != LocalStatus.COMPUTING and \
                file_object.storage_status != StorageStatus.UPLOADING and \
                file_object.only_suscriber(self):
            # pop() instead of del: the job callback may have removed it already
            self.data_manager.requested_files.pop(file_object.remote_file, None)
            file_object.unsubscribe(self)
            file_object.clean_local()

    def check_completion(self):
        """
        Update the downloader hold flag and check if there is any work left.

        Work is pending while a job is READY or RUNNING, a WAITING job has all
        its requests in storage, or a requested file is uploading, downloading
        or still requires an upload or a download. When nothing is pending the
        completion lock taken by run() is released.

        :return: False if there is pending work, True otherwise
        """
        ready = 0
        for job in self.jobs:
            if job.status == DiagnosticStatus.READY:
                ready += 1
                if ready > self._MAX_READY_JOBS:
                    break

        self.downloader.on_hold = ready > self._MAX_READY_JOBS

        for job in self.jobs:
            if job.status in (DiagnosticStatus.READY, DiagnosticStatus.RUNNING):
                return False
            if job.status == DiagnosticStatus.WAITING and job.all_requests_in_storage():
                return False

        # Iterate over a snapshot: other threads may remove entries meanwhile
        for request in list(self.data_manager.requested_files.values()):
            if request.storage_status == StorageStatus.UPLOADING:
                return False
            if request.local_status == LocalStatus.DOWNLOADING:
                return False
            if request.upload_required():
                return False
            if request.download_required():
                return False

        # The lock does not exist until run() creates it
        if self.lock is not None:
            try:
                self.lock.release()
            except (RuntimeError, threading.ThreadError):
                # Already released by a previous completion check
                pass
        return True

    def print_stats(self):
        """Log the time consumed by each diagnostic alias, smallest first."""
        Log.info('Time consumed by each diagnostic class')
        Log.info('--------------------------------------')

        times = {}
        for job in self.jobs:
            job_type = job.alias
            if job_type in times:
                times[job_type] += job.consumed_time
            else:
                times[job_type] = job.consumed_time

        # Sort the aliases by their accumulated time, not by their name
        for diag in sorted(times, key=times.get):
            Log.info('{0:23} {1:}', diag, times[diag])

    def print_errors(self):
        """Log the failed jobs and the time they consumed; flag had_errors if any."""
        failed = [job for job in self.jobs if job.status == DiagnosticStatus.FAILED]
        if not failed:
            return
        self.had_errors = True
        Log.error('Failed jobs')
        Log.error('-----------')
        for job in failed:
            Log.error('{0}: {0.message}', job)
        Log.error('Total wasted time: {0}', sum([job.consumed_time for job in failed], datetime.timedelta()))
        Log.info('')

    @staticmethod
    def _run_job(job):
        """
        Mark the job as RUNNING and record how it finished.

        On error the message and traceback are stored in job.message and the
        job is left as FAILED; otherwise it is set to COMPLETED. In both cases
        job.consumed_time is updated.

        :param job: job to run
        :return: True if the job completed, False if it failed
        """
        start_time = datetime.datetime.now()
        try:
            Log.info('Starting {0}', job)
            job.status = DiagnosticStatus.RUNNING
            # NOTE(review): no call that actually executes the job is visible
            # between RUNNING and COMPLETED - confirm one was not lost
        except Exception as ex:
            job.consumed_time = datetime.datetime.now() - start_time
            exc_type, _, exc_traceback = sys.exc_info()
            job.message = '{0}\n{1}'.format(ex, ''.join(traceback.format_tb(exc_traceback)))
            Log.error('Job {0} failed ({2}): {1}', job, job.message, exc_type)
            job.status = DiagnosticStatus.FAILED
            # Stop here: falling through would overwrite FAILED with COMPLETED
            return False

        job.consumed_time = datetime.datetime.now() - start_time
        Log.result('Finished {0}', job)
        job.status = DiagnosticStatus.COMPLETED
        return True

    @staticmethod
    def _register_diagnostics():
        """Register the ocean, general and statistics diagnostics, in that order."""
        WorkManager._register_ocean_diagnostics()
        WorkManager._register_general_diagnostics()
        WorkManager._register_stats_diagnostics()

    @staticmethod
    def _register_stats_diagnostics():
        """Import and register the diagnostics of earthdiagnostics.statistics."""
        from earthdiagnostics.statistics.monthlypercentile import MonthlyPercentile
        from earthdiagnostics.statistics.climatologicalpercentile import ClimatologicalPercentile
        from earthdiagnostics.statistics.daysoverpercentile import DaysOverPercentile
        from earthdiagnostics.statistics.discretize import Discretize

        for diagnostic in (MonthlyPercentile, ClimatologicalPercentile, DaysOverPercentile, Discretize):
            Diagnostic.register(diagnostic)

    @staticmethod
    def _register_general_diagnostics():
        """Import and register the diagnostics of earthdiagnostics.general."""
        from earthdiagnostics.general.attribute import Attribute
        from earthdiagnostics.general.dailymean import DailyMean
        from earthdiagnostics.general.module import Module
        from earthdiagnostics.general.monthlymean import MonthlyMean
        from earthdiagnostics.general.yearlymean import YearlyMean
        from earthdiagnostics.general.rewrite import Rewrite
        from earthdiagnostics.general.relink import Relink
        from earthdiagnostics.general.relinkall import RelinkAll
        from earthdiagnostics.general.scale import Scale
        from earthdiagnostics.general.verticalmeanmetersiris import VerticalMeanMetersIris
        from earthdiagnostics.general.simplify_dimensions import SimplifyDimensions

        for diagnostic in (DailyMean, MonthlyMean, YearlyMean, Rewrite, Relink, RelinkAll, Scale,
                           Attribute, Module, VerticalMeanMetersIris, SimplifyDimensions):
            Diagnostic.register(diagnostic)

    @staticmethod
    def _register_ocean_diagnostics():
        """Import and register the diagnostics of earthdiagnostics.ocean."""
        from earthdiagnostics.ocean.mixedlayerheatcontent import MixedLayerHeatContent
        from earthdiagnostics.ocean.mixedlayersaltcontent import MixedLayerSaltContent
        from earthdiagnostics.ocean.siasiesiv import Siasiesiv
        from earthdiagnostics.ocean.verticalmean import VerticalMean
        from earthdiagnostics.ocean.verticalmeanmeters import VerticalMeanMeters
        from earthdiagnostics.ocean.verticalgradient import VerticalGradient
        from earthdiagnostics.ocean.interpolate import Interpolate
        from earthdiagnostics.ocean.interpolatecdo import InterpolateCDO
        from earthdiagnostics.ocean.moc import Moc
        from earthdiagnostics.ocean.areamoc import AreaMoc
        from earthdiagnostics.ocean.maxmoc import MaxMoc
        from earthdiagnostics.ocean.psi import Psi
        from earthdiagnostics.ocean.gyres import Gyres
        from earthdiagnostics.ocean.convectionsites import ConvectionSites
        from earthdiagnostics.ocean.cutsection import CutSection
        from earthdiagnostics.ocean.averagesection import AverageSection
        from earthdiagnostics.ocean.heatcontentlayer import HeatContentLayer
        from earthdiagnostics.ocean.heatcontent import HeatContent
        from earthdiagnostics.ocean.regionmean import RegionMean
        from earthdiagnostics.ocean.regionsum import RegionSum
        from earthdiagnostics.ocean.rotation import Rotation

        for diagnostic in (MixedLayerSaltContent, Siasiesiv, VerticalMean, VerticalMeanMeters,
                           Interpolate, InterpolateCDO, Moc, AreaMoc, MaxMoc, Psi, Gyres,
                           ConvectionSites, CutSection, AverageSection, MixedLayerHeatContent,
                           HeatContentLayer, HeatContent, RegionMean, RegionSum, Rotation,
                           VerticalGradient):
            Diagnostic.register(diagnostic)
class Downloader(object):
    """
    Background worker that downloads the submitted files one at a time.

    Files are queued with submit() and served by priority from a thread
    created by start(). Setting on_hold pauses the downloads without
    discarding the queue; shutdown() stops the thread.
    """

    # Seconds to wait before polling again when there is nothing to download
    _IDLE_WAIT = 0.01

    def __init__(self):
        self._downloads = []
        self._lock = threading.Lock()
        self._thread = None
        self.stop = False
        # Set from outside to pause the downloads
        self.on_hold = False

    def start(self):
        """Create and start the download thread."""
        self._thread = threading.Thread(target=self.downloader)
        self._thread.start()

    def submit(self, datafile):
        """
        Add a file to the download queue.

        :param datafile: object providing download(), suscribers and size
        """
        with self._lock:
            self._downloads.append(datafile)

    @staticmethod
    def _suscribers_waiting(datafile):
        """Count the Diagnostic subscribers that have exactly one pending request."""
        return sum(1 for diag in datafile.suscribers
                   if isinstance(diag, Diagnostic) and diag.pending_requests() == 1)

    @staticmethod
    def _prioritize(datafile1, datafile2):
        """
        Compare two files, cmp-style: a negative result downloads datafile1 first.

        Criteria, in order: more subscribers waiting only for one request,
        more subscribers, unknown size before known size, bigger size.
        """
        waiting = Downloader._suscribers_waiting(datafile1) - Downloader._suscribers_waiting(datafile2)
        if waiting:
            return -waiting

        suscribers = len(datafile1.suscribers) - len(datafile2.suscribers)
        if suscribers:
            return -suscribers

        if datafile1.size is None:
            if datafile2.size is None:
                return 0
            return -1
        if datafile2.size is None:
            return 1
        return datafile2.size - datafile1.size

    def _next_download(self):
        """Remove and return the highest priority file, or None if idle or on hold."""
        with self._lock:
            if not self._downloads or self.on_hold:
                return None
            # cmp_to_key keeps the comparison function usable on Python 2 and 3
            self._downloads.sort(key=functools.cmp_to_key(Downloader._prioritize))
            return self._downloads.pop(0)

    def downloader(self):
        """
        Download queued files by priority until shutdown() is requested.

        This is the target of the thread created by start(). Any unhandled
        error is logged as critical and ends the loop.
        """
        try:
            while not self.stop:
                datafile = self._next_download()
                if datafile is None:
                    # Sleep outside the lock so submit() is never blocked by the polling
                    time.sleep(self._IDLE_WAIT)
                    continue
                datafile.download()
        except Exception as ex:
            Log.critical('Unhandled error at downloader: {0}\n{1}', ex, traceback.format_exc())

    def shutdown(self):
        """Ask the download thread to stop and wait for it, if it was started."""
        self.stop = True
        if self._thread is not None:
            self._thread.join()