# work_manager.py
# coding=utf-8
import datetime
import operator
import threading
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

from bscearth.utils.log import Log

from diagnostic import DiagnosticStatus
from utils import Utils, TempFile
# Author (per repository blame view): Javier Vegas-Regidor


class WorkManager(object):
    """Run a list of diagnostic jobs on a thread pool and report the outcome.

    Instance attributes:
        pending_jobs: the job list handed to the constructor; every job in it
            is submitted once by ``run``.
        config: configuration object; only ``max_cores`` is read here.
        time: maps each job class to the accumulated wall-clock time spent in
            the ``compute()`` calls of its jobs that finished successfully.
        had_errors: True once at least one job has failed.
    """

    # Class-level attributes kept for backward compatibility; nothing in this
    # class reads them. Note that ``failed_jobs`` is one list object shared by
    # every instance.
    count = 0
    failed_jobs = list()

    def __init__(self, config, job_list):
        """Store the configuration and the jobs to run.

        :param config: object exposing a ``max_cores`` attribute
        :param job_list: iterable of jobs; each must provide ``compute()``
        """
        self.pending_jobs = job_list
        self.config = config
        self.time = {}
        self.had_errors = False
        # Jobs run on several pool threads at once; this lock guards the
        # read-modify-write updates of ``self.time`` and ``self.had_errors``.
        self._lock = threading.Lock()

    def run(self):
        """Run every pending job and log a summary.

        The pool size is the available CPU count, lowered to
        ``config.max_cores`` when that value is positive and smaller.

        :return: True if no job failed, False otherwise
        """
        start_time = datetime.datetime.now()
        Log.info("Starting to compute at {0}", start_time)
        self.threads = Utils.available_cpu_count()
        if 0 < self.config.max_cores < self.threads:
            self.threads = self.config.max_cores
        Log.info('Using {0} threads', self.threads)

        # Leaving the ``with`` block waits for all submitted jobs to finish.
        with ThreadPoolExecutor(self.threads) as executor:
            for job in self.pending_jobs:
                executor.submit(self._run_job, job)

        TempFile.clean()
        finish_time = datetime.datetime.now()
        Log.result("Diagnostics finished at {0}", finish_time)
        Log.result("Elapsed time: {0}\n", finish_time - start_time)
        self.print_errors()
        self.print_stats()
        return not self.had_errors

    def print_stats(self):
        """Log the accumulated compute time per job class, shortest first."""
        Log.info('Time consumed by each diagnostic class')
        Log.info('--------------------------------------')
        with self._lock:
            # Work on a snapshot so logging happens outside the lock.
            totals = dict(self.time)
        for diag, elapsed in sorted(totals.items(), key=operator.itemgetter(1)):
            Log.info('{0:23} {1:}', diag.__name__, elapsed)

    def print_errors(self):
        """Log every failed job and record that the run had errors.

        Does nothing (and logs nothing) when no job has the FAILED status.
        """
        failed = [job for job in self.pending_jobs
                  if job.status == DiagnosticStatus.FAILED]
        if not failed:
            return
        self.had_errors = True
        Log.error('Failed jobs')
        Log.error('-----------')
        for job in failed:
            Log.error(str(job))
        Log.info('')

    def _run_job(self, job):
        """Compute a single job, tracking its status and elapsed time.

        Runs on a pool thread. Any exception raised by the job is caught and
        logged here; otherwise it would only be stored in the never-inspected
        future returned by ``submit``.

        :param job: job to run; must provide ``compute()``
        :return: True if the job completed, False if it raised
        """
        try:
            Log.info('Starting {0}', job)
            job.status = DiagnosticStatus.RUNNING
            start_time = datetime.datetime.now()
            job.compute()
            elapsed = datetime.datetime.now() - start_time
            job_class = type(job)
            with self._lock:
                if job_class in self.time:
                    self.time[job_class] += elapsed
                else:
                    self.time[job_class] = elapsed
            Log.result('Finished {0}', job)
            job.status = DiagnosticStatus.COMPLETED
            return True
        except Exception as ex:
            job.message = str(ex)
            # Mark the failure so print_errors() and run() can report it.
            job.status = DiagnosticStatus.FAILED
            with self._lock:
                self.had_errors = True
            Log.error('Job {0} failed: {1}', job, ex)
            return False