work_manager.py 16.8 KB
Newer Older
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
# coding=utf-8
"""Earthdiagnostics workflow manager"""
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
import datetime
import operator
import sys
import threading
import time
import traceback
# noinspection PyCompatibility
from concurrent.futures import ThreadPoolExecutor
from functools import cmp_to_key
from threading import Thread

import six
from bscearth.utils.log import Log

from earthdiagnostics.datafile import StorageStatus, LocalStatus
from earthdiagnostics.diagnostic import DiagnosticStatus, Diagnostic, DiagnosticOptionError
from earthdiagnostics.utils import Utils, TempFile
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

class WorkManager(object):
    """
    Class to produce and control the workflow of EarthDiagnostics

    Jobs are kept in one list per DiagnosticStatus and are moved between
    those lists as their status changes.

    Parameters
    ----------
    config: Config
        Configuration object; this class reads its commands, max_cores,
        parallel_uploads and skip_diags_done settings
    data_manager: DataManager
        Object whose requested_files mapping holds the files being tracked
    """
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    def __init__(self, config, data_manager):
        """Keep the collaborators and create one empty job list per status"""
        tracked_statuses = (
            DiagnosticStatus.WAITING,
            DiagnosticStatus.READY,
            DiagnosticStatus.RUNNING,
            DiagnosticStatus.COMPLETED,
            DiagnosticStatus.FAILED,
        )
        # Every job lives in exactly one of these lists, keyed by its status
        self.jobs = {status: [] for status in tracked_statuses}

        self.config = config
        self.time = {}
        self.had_errors = False
        self.data_manager = data_manager
        # File objects whose upload ended in StorageStatus.FAILED
        self.uploads_failed = []
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    def prepare_job_list(self):
        """
        Create the list of jobs to run

        Registers the available diagnostics and then walks the commands
        returned by the configuration. Unknown diagnostics and invalid
        diagnostic options are logged and flagged in `had_errors`.
        """
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self._register_diagnostics()
        for fulldiag in self.config.get_commands():
            Log.info("Adding {0} to diagnostic list", fulldiag)
            # A command is a comma-separated string; its first field names the diagnostic
            diag_options = fulldiag.split(',')

            diag_class = Diagnostic.get_diagnostic(diag_options[0])
            if diag_class:
                try:
                    for job in diag_class.generate_jobs(self, diag_options):
                        # NOTE(review): the body of this loop is not present in this view
                        # (only page residue follows) -- confirm against the repository
                        # how jobs and subjobs are stored before relying on this block
                        for subjob in job.subjobs:
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                    continue
                except DiagnosticOptionError as ex:
                    Log.error('Can not configure diagnostic {0}: {1}', diag_options[0], ex)
                    self.had_errors = True
            else:
                Log.error('{0} is not an available diagnostic', diag_options[0])
                self.had_errors = True

    def add_job(self, job, old_status=None):
        """
        Place a job in the list that matches its current status

        Parameters
        ----------
        job:
            Object exposing a `status` attribute used as key of `self.jobs`
        old_status: optional
            If given, the job is first removed from the list of that status
        """
        if old_status is not None:
            self.jobs[old_status].remove(job)

        bucket = self.jobs[job.status]
        if job in bucket:
            # Already tracked under its current status
            return
        bucket.append(job)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    def run(self):
        """
        Run all the diagnostics

        Prepares every waiting job, subscribes to job and file status changes,
        queues the required downloads and then blocks until
        `_check_completion` releases the completion lock. Afterwards the
        downloader, the executors and the status printer are shut down and
        the error and timing reports are logged.

        Returns
        -------
        bool
            Only True if all diagnostic were correctly executed
        """
        if not self.jobs[DiagnosticStatus.WAITING] and not self.jobs[DiagnosticStatus.READY]:
            Log.result('No diagnostics to run')
        start_time = datetime.datetime.now()
        Log.info("Starting to compute at {0}", start_time)
        self.threads = Utils.available_cpu_count()
        # A positive max_cores only ever lowers the number of threads
        if 0 < self.config.max_cores < self.threads:
            self.threads = self.config.max_cores
        Log.info('Using {0} threads', self.threads)

        self.downloader = Downloader()
        self.uploader = ThreadPoolExecutor(self.config.parallel_uploads)
        self.executor = ThreadPoolExecutor(self.threads)

        # Created before subscribing: the status callbacks reach it through _check_completion
        self.lock = threading.Lock()
        # Iterate over a snapshot: status changes are presumably dispatched to
        # _job_status_changed, which removes the job from the WAITING list
        for job in list(self.jobs[DiagnosticStatus.WAITING]):
            job.request_data()
            job.declare_data_generated()
            job.subscribe(self, self._job_status_changed)
            if self.config.skip_diags_done and job.can_skip_run():
                Log.info('Diagnostic {0} already done. Skipping !', job)
                job.status = DiagnosticStatus.COMPLETED
                continue
            job.check_is_ready()

        for file_object in self.data_manager.requested_files.values():
            file_object.subscribe(self, self._file_object_status_changed)
            if file_object.download_required():
                self.downloader.submit(file_object)

        # The printer thread reports the status until this lock is released
        printer_lock = threading.Lock()
        printer_lock.acquire()
        printer = Thread(target=self._print_status, name="Printer", args=(printer_lock,))
        printer.start()

        # Hold the completion lock so that the second acquire really blocks until
        # _check_completion releases it. The explicit check covers the case where
        # everything was already finished and no further callback will arrive.
        self.lock.acquire()
        self._check_completion()
        self.lock.acquire()

        printer_lock.release()
        self.downloader.shutdown()
        self.executor.shutdown()
        self.uploader.shutdown(True)
        printer.join()

        TempFile.clean()
        finish_time = datetime.datetime.now()
        Log.result("Diagnostics finished at {0}", finish_time)
        Log.result("Elapsed time: {0}\n", finish_time - start_time)
        self._print_errors()
        self._print_stats()
        return not self.had_errors

Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    def _print_status(self, lock):
        """
        Log the number of jobs in each status once per interval

        The loop ends as soon as `lock` can be acquired.

        Parameters
        ----------
        lock: threading.Lock
            Lock held by the caller while the report must keep running
        """
        interval = 60
        while True:
            if six.PY3:
                if lock.acquire(blocking=True, timeout=interval):
                    return
            else:
                # No timeout is used on this branch: poll the lock in short steps
                step = 0.5
                for _ in range(int(interval / step)):
                    if lock.acquire(blocking=False):
                        return
                    # The sleep function lives in time, not in sys
                    time.sleep(step)

            Log.info('Current status:')
            Log.info('===============')
            Log.info('Waiting:      {0:4}', len(self.jobs[DiagnosticStatus.WAITING]))
            Log.info('Ready to run: {0:4}', len(self.jobs[DiagnosticStatus.READY]))
            Log.info('Running:      {0:4}', len(self.jobs[DiagnosticStatus.RUNNING]))
            Log.info('Completed:    {0:4}', len(self.jobs[DiagnosticStatus.COMPLETED]))
            Log.info('Failed:       {0:4}', len(self.jobs[DiagnosticStatus.FAILED]))

    def _job_status_changed(self, job, old_status):
        """
        Callback for job status changes

        Moves the job to the list of its new status, submits it to the
        executor when it is ready, drops the requests that only this manager
        is still subscribed to and finally checks whether the workflow is done.
        """
        self.add_job(job, old_status)

        became_ready = job.status == DiagnosticStatus.READY
        if became_ready:
            self.executor.submit(self._run_job, job)

        for request in job._requests:
            if not request.only_suscriber(self):
                continue
            # Nobody else is subscribed: stop tracking the file and clean its local copy
            del self.data_manager.requested_files[request.remote_file]
            request.unsubscribe(self)
            request.clean_local()

        self._check_completion()
    def _file_object_status_changed(self, file_object):
        """
        Callback for file status changes

        Queues the download or the upload when one is required. Otherwise it
        records failed uploads, releases files that only this manager is
        subscribed to and checks whether the workflow is done.
        """
        Log.debug('Checking file {0}. Local status {0.local_status} Storage status{0.storage_status}', file_object)

        if file_object.download_required():
            self.downloader.submit(file_object)
            return

        if file_object.upload_required():
            file_object.storage_status = StorageStatus.UPLOADING
            self.uploader.submit(file_object.upload)
            return

        if file_object.storage_status == StorageStatus.FAILED:
            self.uploads_failed.append(file_object)

        # Neither being computed nor being uploaded
        idle = (file_object.local_status != LocalStatus.COMPUTING and
                file_object.storage_status != StorageStatus.UPLOADING)
        if idle and file_object.only_suscriber(self):
            del self.data_manager.requested_files[file_object.remote_file]
            file_object.unsubscribe(self)
            file_object.clean_local()

        self._check_completion()
    def _check_completion(self):
        self._pause_downloader_if_required()
        if self._jobs_running_or_ready():
            return False
        if self._data_downloading_or_uploading():
            return False

        try:
            self.lock.release()
        except threading.ThreadError:
            pass
        return True

    def _jobs_running_or_ready(self):
        """
        Tell whether some job is running, ready or about to become ready

        Returns
        -------
        bool
            True if the READY or RUNNING lists are not empty or if any
            waiting job reports all its requests in storage
        """
        queues = self.jobs
        if queues[DiagnosticStatus.READY] or queues[DiagnosticStatus.RUNNING]:
            return True
        return any(job.all_requests_in_storage() for job in queues[DiagnosticStatus.WAITING])
    def _data_downloading_or_uploading(self):
        for request in self.data_manager.requested_files.values():
            if request.storage_status == StorageStatus.UPLOADING:
                return True
            if request.local_status == LocalStatus.DOWNLOADING:
                return True
            if request.upload_required():
                return True
            if request.download_required():
                return True
        return False

    def _pause_downloader_if_required(self):
        """Put the downloader on hold while more than 20 jobs are ready to run"""
        ready_jobs = len(self.jobs[DiagnosticStatus.READY])
        self.downloader.on_hold = ready_jobs > 20
    def _print_stats(self):
        """
        Log the time consumed by each diagnostic alias

        Times of completed and failed jobs are accumulated per `alias` and
        reported in ascending order of consumed time.
        """
        Log.info('Time consumed by each diagnostic class')
        Log.info('--------------------------------------')

        times = {}
        for job in self.jobs[DiagnosticStatus.COMPLETED] + self.jobs[DiagnosticStatus.FAILED]:
            job_type = job.alias
            if job_type in times:
                times[job_type] += job.consumed_time
            else:
                times[job_type] = job.consumed_time

        # Sort the aliases by their accumulated time. Indexing each alias with
        # itemgetter(1) would order them by their second character instead.
        for diag in sorted(times, key=times.get):
            Log.info('{0:23} {1:}', diag, times[diag])

    def _print_errors(self):
        """
        Log failed jobs and failed uploads

        Sets `had_errors` when there is at least one of either.
        """
        failed_jobs = self.jobs[DiagnosticStatus.FAILED]
        if failed_jobs:
            self.had_errors = True
            Log.error('Failed jobs')
            Log.error('-----------')
            for job in failed_jobs:
                Log.error('{0}: {0.message}', job)
            wasted_time = sum((job.consumed_time for job in failed_jobs), datetime.timedelta())
            Log.error('Total wasted time: {0}', wasted_time)

        if self.uploads_failed:
            self.had_errors = True
            Log.error('Failed uploads')
            Log.error('--------------')
            for file_object in self.uploads_failed:
                Log.error('{0}', file_object.remote_file)
            Log.info('')
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    @staticmethod
    def _run_job(job):
        """
        Mark a job as running and record how it ended

        Parameters
        ----------
        job:
            Object exposing `status`, `consumed_time` and `message`

        Returns
        -------
        bool
            True if the job ended as COMPLETED, False if it was marked as FAILED
        """
        start_time = datetime.datetime.now()
        try:
            Log.info('Starting {0}', job)
            job.status = DiagnosticStatus.RUNNING
            # NOTE(review): no call that performs the job's work is visible in this
            # block -- confirm against the repository whether one is missing here
        except Exception as ex:
            job.consumed_time = datetime.datetime.now() - start_time
            exc_type, _, exc_traceback = sys.exc_info()
            job.message = '{0}\n{1}'.format(ex, ''.join(traceback.format_tb(exc_traceback)))
            Log.error('Job {0} failed ({2}): {1}', job, job.message, exc_type)
            job.status = DiagnosticStatus.FAILED
            # Stop here so a failed job is not reported as finished and re-marked COMPLETED
            return False

        job.consumed_time = datetime.datetime.now() - start_time
        Log.result('Finished {0}', job)
        job.status = DiagnosticStatus.COMPLETED
        return True
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    count = 0
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    @staticmethod
    def _register_diagnostics():
        """Register the ocean, general and statistics diagnostics, in that order"""
        family_registrars = (
            WorkManager._register_ocean_diagnostics,
            WorkManager._register_general_diagnostics,
            WorkManager._register_stats_diagnostics,
        )
        for register_family in family_registrars:
            register_family()

    @staticmethod
    def _register_stats_diagnostics():
        """Import and register the diagnostics of the statistics package"""
        from earthdiagnostics.statistics.monthlypercentile import MonthlyPercentile
        from earthdiagnostics.statistics.climatologicalpercentile import ClimatologicalPercentile
        from earthdiagnostics.statistics.daysoverpercentile import DaysOverPercentile
        from earthdiagnostics.statistics.discretize import Discretize

        # Registration order is kept as listed
        stats_diagnostics = (
            MonthlyPercentile,
            ClimatologicalPercentile,
            DaysOverPercentile,
            Discretize,
        )
        for diagnostic in stats_diagnostics:
            Diagnostic.register(diagnostic)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

    @staticmethod
    def _register_general_diagnostics():
        """Import and register the diagnostics of the general package"""
        from earthdiagnostics.general.attribute import Attribute
        from earthdiagnostics.general.timemean import DailyMean, MonthlyMean, YearlyMean
        from earthdiagnostics.general.module import Module
        from earthdiagnostics.general.rewrite import Rewrite
        from earthdiagnostics.general.relink import Relink
        from earthdiagnostics.general.relinkall import RelinkAll
        from earthdiagnostics.general.scale import Scale
        from earthdiagnostics.general.verticalmeanmetersiris import VerticalMeanMetersIris
        from earthdiagnostics.general.simplify_dimensions import SimplifyDimensions

        # Registration order is kept as listed
        general_diagnostics = (
            DailyMean,
            MonthlyMean,
            YearlyMean,
            Rewrite,
            Relink,
            RelinkAll,
            Scale,
            Attribute,
            Module,
            VerticalMeanMetersIris,
            SimplifyDimensions,
        )
        for diagnostic in general_diagnostics:
            Diagnostic.register(diagnostic)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

    @staticmethod
    def _register_ocean_diagnostics():
        """Import and register the diagnostics of the ocean package"""
        from earthdiagnostics.ocean.mixedlayerheatcontent import MixedLayerHeatContent
        from earthdiagnostics.ocean.mixedlayersaltcontent import MixedLayerSaltContent
        from earthdiagnostics.ocean.siasiesiv import Siasiesiv
        from earthdiagnostics.ocean.verticalmean import VerticalMean
        from earthdiagnostics.ocean.verticalmeanmeters import VerticalMeanMeters
        from earthdiagnostics.ocean.verticalgradient import VerticalGradient
        from earthdiagnostics.ocean.interpolate import Interpolate
        from earthdiagnostics.ocean.interpolatecdo import InterpolateCDO
        from earthdiagnostics.ocean.moc import Moc
        from earthdiagnostics.ocean.areamoc import AreaMoc
        from earthdiagnostics.ocean.maxmoc import MaxMoc
        from earthdiagnostics.ocean.psi import Psi
        from earthdiagnostics.ocean.gyres import Gyres
        from earthdiagnostics.ocean.convectionsites import ConvectionSites
        from earthdiagnostics.ocean.cutsection import CutSection
        from earthdiagnostics.ocean.averagesection import AverageSection
        from earthdiagnostics.ocean.heatcontentlayer import HeatContentLayer
        from earthdiagnostics.ocean.heatcontent import HeatContent
        from earthdiagnostics.ocean.regionmean import RegionMean
        from earthdiagnostics.ocean.regionsum import RegionSum
        from earthdiagnostics.ocean.rotation import Rotation

        # Registration order is kept as listed
        ocean_diagnostics = (
            MixedLayerSaltContent,
            Siasiesiv,
            VerticalMean,
            VerticalMeanMeters,
            Interpolate,
            InterpolateCDO,
            Moc,
            AreaMoc,
            MaxMoc,
            Psi,
            Gyres,
            ConvectionSites,
            CutSection,
            AverageSection,
            MixedLayerHeatContent,
            HeatContentLayer,
            HeatContent,
            RegionMean,
            RegionSum,
            Rotation,
            VerticalGradient,
        )
        for diagnostic in ocean_diagnostics:
            Diagnostic.register(diagnostic)
    """
    Download manager for EarthDiagnostics

    We are not using a ThreadPoolExecutor because we want to be able to control priorities in the download
    """
    def __init__(self):
        """Create the download queue and start the background downloader thread"""
        self._downloads = []
        self._lock = threading.Lock()
        self.stop = False
        # Must exist before the thread starts: the thread target reads it on
        # every pass and it is otherwise only assigned from outside this class
        self.on_hold = False
        # Create the downloader thread and initialize it
        self._thread = threading.Thread(target=self._downloader)
        self._thread.start()

    def submit(self, datafile):
        """
        Add a datafile to the download queue

        Parameters
        ----------
        datafile:
            Object to append to the pending downloads
        """
        # The queue is only modified while holding the lock
        with self._lock:
            self._downloads.append(datafile)

    def _downloader(self):
        """
        Thread target that picks queued datafiles and downloads them

        NOTE(review): the statements that open the enclosing try block and the
        more deeply indented blocks around the lines below are not present in
        this view -- confirm against the repository before editing.
        """
                    # Nothing queued, or downloads are on hold: wait briefly and check again
                    if not self._downloads or self.on_hold:
                        time.sleep(0.1)
                        continue
                    # Only the first 100 queued datafiles are candidates for the next download
                    downloads = self._downloads[:100]
                    # Order the candidates with the _prioritize comparison and take the first
                    downloads.sort(key=cmp_to_key(Downloader._prioritize))
                    datafile = downloads[0]
                    self._downloads.remove(datafile)
                datafile.download()
        except Exception as ex:
            # NOTE(review): traceback.print_exc() prints the traceback and returns None,
            # so {1} is formatted as 'None'; traceback.format_exc() looks intended
            Log.critical('Unhandled error at downloader: {0}\n{1}', ex, traceback.print_exc())
    @staticmethod
    def _suscribers_waiting(datafile):
        waiting = 0
        for diag in datafile.suscribers:
            if not isinstance(diag, Diagnostic):
                continue
            if diag.pending_requests() == 1:
                waiting += 1
        return waiting
    @staticmethod
    def _prioritize(datafile1, datafile2):
        """
        Compare two datafiles to decide which one is downloaded first

        The first criterion that differs decides: more subscribers with a
        single pending request, then more subscribers, then known size before
        unknown size, then smaller size.

        Returns
        -------
        int
            Negative if datafile1 goes first, positive if datafile2 goes
            first, 0 if they are equivalent
        """
        waiting = Downloader._suscribers_waiting(datafile1) - Downloader._suscribers_waiting(datafile2)
        if waiting:
            return -waiting

        suscribers = len(datafile1.suscribers) - len(datafile2.suscribers)
        if suscribers:
            return -suscribers

        if datafile1.size is None:
            return 0 if datafile2.size is None else 1
        if datafile2.size is None:
            return -1
        # Always return a number, 0 included, so the result is usable with cmp_to_key
        return datafile1.size - datafile2.size

    def shutdown(self):
        """Stop the downloader after all downloads have finished"""
        self.stop = True
        self._thread.join()