# coding=utf-8
import datetime

from bscearth.utils.log import Log
from concurrent.futures import ThreadPoolExecutor

from diagnostic import DiagnosticStatus
from utils import Utils, TempFile
import threading


class WorkManager(object):

    def __init__(self, config, job_list, data_manager):
        self.jobs = job_list
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self.config = config
        self.time = {}
        self.had_errors = False
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self.data_manager = data_manager

    def run(self):
        """
        Run every job and block until all of them have completed or failed.

        Downloads are submitted to a single-worker pool, jobs are executed on a
        pool sized from the available CPUs (optionally capped by the
        configuration) and a lock is used as the "everything finished" signal.

        :return: True if no job failed, False otherwise
        :rtype: bool
        """
        start_time = datetime.datetime.now()
        Log.info("Starting to compute at {0}", start_time)
        self.threads = Utils.available_cpu_count()
        # max_cores only takes effect when it is positive and below the detected CPU count
        if 0 < self.config.max_cores < self.threads:
            self.threads = self.config.max_cores
        Log.info('Using {0} threads', self.threads)

        # The lock works as a completion signal: it is taken here and released
        # by check_completion once every job has finished. The lock and the
        # executors are used by the status callbacks, so they must exist before
        # any job is subscribed or any download is submitted.
        self.lock = threading.Lock()
        self.lock.acquire()

        self.downloader = ThreadPoolExecutor(1)
        self.uploader = ThreadPoolExecutor(1)
        self.executor = ThreadPoolExecutor(self.threads)

        for job in self.jobs:
            job.request_data()
            job.subscribe(self, self._job_status_changed)

        for file_object in self.data_manager.requested_files.values():
            self.downloader.submit(file_object.download)

        # Second acquire blocks until check_completion releases the lock.
        # Without jobs nobody would ever release it, so skip the wait.
        if self.jobs:
            self.lock.acquire()

        self.downloader.shutdown(True)
        self.uploader.shutdown(True)
        self.executor.shutdown(True)

        TempFile.clean()
        finish_time = datetime.datetime.now()
        Log.result("Diagnostics finished at {0}", finish_time)
        Log.result("Elapsed time: {0}\n", finish_time - start_time)
        self.print_errors()
        self.print_stats()
        return not self.had_errors

    def _job_status_changed(self, job, status):
        """
        Callback registered with each job through `subscribe` in `run`.

        A job reported as READY is queued on the executor; every notification
        also triggers a completion check.

        :param job: job the notification refers to
        :param status: status reported for the job
        """
        job_is_ready = status == DiagnosticStatus.READY
        if job_is_ready:
            self.executor.submit(self._run_job, job)
        self.check_completion()

    def check_completion(self):
        if any([job.status not in (DiagnosticStatus.COMPLETED, DiagnosticStatus.FAILED) for job in self.jobs]):
            return
        self.lock.release()

    def print_stats(self):
        """
        Log the total time consumed per job alias, in ascending order of time.

        The `consumed_time` of all jobs sharing the same `alias` is added up.
        """
        Log.info('Time consumed by each diagnostic class')
        Log.info('--------------------------------------')

        times = {}
        for job in self.jobs:
            alias = job.alias
            if alias in times:
                times[alias] += job.consumed_time
            else:
                times[alias] = job.consumed_time

        # Sort by accumulated time. Using itemgetter(1) on the dict keys would
        # compare the second character of each alias instead.
        for diag in sorted(times, key=times.get):
            Log.info('{0:23} {1:}', diag, times[diag])

    def print_errors(self):
        failed = [job for job in self.jobs if job.status == DiagnosticStatus.FAILED]
        if len(failed) == 0:
            return
        self.had_errors = True
        Log.error('Failed jobs')
        Log.error('-----------')
        for job in failed:
            Log.error('{0}: {0.message}', job)
        Log.error('Total wasted time: {0}', sum([job.consumed_time for job in failed], datetime.timedelta()))
        Log.info('')
    def _run_job(self, job):
        """
        Run a single job, recording its elapsed time, message and final status.

        :param job: job to run
        :return: True if the job finished without raising, False if it failed
        :rtype: bool
        """
        Log.info('Starting {0}', job)
        job.status = DiagnosticStatus.RUNNING
        start_time = datetime.datetime.now()
        try:
            # NOTE(review): the body of this try block was missing from the
            # reviewed source. job.compute() is assumed to be the job's entry
            # point -- confirm against the job interface.
            job.compute()
        except Exception as ex:
            job.consumed_time = datetime.datetime.now() - start_time
            job.message = str(ex)
            Log.error('Job {0} failed: {1}', job, ex)
            job.status = DiagnosticStatus.FAILED
            # Stop here so the FAILED status is not overwritten with COMPLETED
            return False

        job.consumed_time = datetime.datetime.now() - start_time
        Log.result('Finished {0}', job)
        job.status = DiagnosticStatus.COMPLETED
        return True
    # Class-level attributes, shared by every instance unless rebound.
    # NOTE(review): no method visible in this section reads them -- confirm they are still used.
    count = 0
    failed_jobs = []