work_manager.py 4.32 KB
Newer Older
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
# coding=utf-8
import datetime
import threading

import operator
from time import sleep

from bscearth.utils.log import Log

from diagnostic import DiagnosticStatus
from utils import Utils, TempFile


class WorkManager(object):
    """
    Distribute a list of jobs over a pool of JobRunner workers.

    Jobs move from ``pending_jobs`` to ``running_jobs`` when they are handed
    to a runner and from there to ``finished_jobs`` once their ``status`` is
    ``DiagnosticStatus.COMPLETED`` or ``DiagnosticStatus.FAILED``.

    :param config: configuration object; only ``max_cores`` is read here
    :param job_list: jobs to execute. The list is consumed in place with
        ``pop()``, so jobs are taken from its end.
    """

    def __init__(self, config, job_list):

        self.pending_jobs = job_list
        self.running_jobs = []
        self.finished_jobs = []
        self.config = config
        # Initialised here so that run() can always read it, even when no
        # job failed and print_errors() returns early.
        self.had_errors = False
        self.threads = 0
        self.job_runners = []

    def run(self):
        """
        Execute every pending job and report the results.

        :return: True if no job ended with status FAILED, False otherwise
        :rtype: bool
        """
        start_time = datetime.datetime.now()
        Log.info("Starting to compute at {0}", start_time)
        self.threads = Utils.available_cpu_count()
        if 0 < self.config.max_cores < self.threads:
            self.threads = self.config.max_cores
        Log.info('Using {0} threads', self.threads)

        self.job_runners = [JobRunner() for _ in range(self.threads)]
        # The runners do nothing until their worker thread is launched.
        for runner in self.job_runners:
            runner.start()

        try:
            while self.pending_jobs or self.running_jobs:
                self._dispatch_pending_jobs()
                self._collect_finished_jobs()
                sleep(0.1)
        finally:
            # Always wake the workers up so they can leave their wait loop,
            # even if the polling loop was interrupted.
            for runner in self.job_runners:
                runner.stop()

        TempFile.clean()
        finish_time = datetime.datetime.now()
        Log.result("Diagnostics finished at {0}", finish_time)
        Log.result("Elapsed time: {0}\n", finish_time - start_time)
        self.print_errors()
        self.print_stats()
        return not self.had_errors

    def _dispatch_pending_jobs(self):
        """Hand one pending job to every runner that has no job queued."""
        for runner in self.job_runners:
            if not self.pending_jobs:
                # Fewer pending jobs than free runners: nothing left to pop.
                break
            if runner.next_job is None:
                next_job = self.pending_jobs.pop()
                runner.set_next_job(next_job)
                self.running_jobs.append(next_job)

    def _collect_finished_jobs(self):
        """Move jobs whose status is COMPLETED or FAILED to finished_jobs."""
        done_states = (DiagnosticStatus.COMPLETED, DiagnosticStatus.FAILED)
        still_running = []
        for job in self.running_jobs:
            if job.status in done_states:
                self.finished_jobs.append(job)
            else:
                still_running.append(job)
        # Rebuild the list instead of removing while iterating, which would
        # skip the element following each removed job.
        self.running_jobs[:] = still_running

    def print_stats(self):
        """Log the accumulated time per job class, sorted by time."""
        Log.info('Time consumed by each diagnostic class')
        Log.info('--------------------------------------')
        total = dict()
        for runner in self.job_runners:
            for key, value in runner.time.items():
                if key in total:
                    total[key] += value
                else:
                    total[key] = value
        for diag, elapsed in sorted(total.items(), key=operator.itemgetter(1)):
            Log.info('{0:23} {1:}', diag.__name__, elapsed)

    def print_errors(self):
        """
        Log every job whose status is FAILED and set ``had_errors`` if any.

        Finished jobs are inspected as well as running ones: after run()
        completes all jobs have been moved to ``finished_jobs``.
        """
        failed = [job for job in self.finished_jobs + self.running_jobs
                  if job.status == DiagnosticStatus.FAILED]
        if not failed:
            return
        self.had_errors = True
        Log.error('Failed jobs')
        Log.error('-----------')
        for job in failed:
            Log.error(str(job))
        Log.info('')


class JobRunner(object):
    """
    Worker that executes, in its own thread, the jobs handed to it.

    A job is queued with set_next_job(); the worker thread launched by
    start() picks it up, calls its ``compute()`` method and records the
    elapsed time per job class in ``time``. stop() asks the worker to exit
    once it is idle.
    """

    # Class-level attributes not used by the methods of this class; kept
    # unchanged in case code elsewhere relies on them.
    count = 0
    failed_jobs = list()

    def __init__(self):
        self.next_job = None
        self.current_job = None
        self.time = {}
        self._stop = False
        self._is_running = False
        self._thread = None
        # Counting semaphore used as a wake-up signal: every call to
        # set_next_job() or stop() adds one token and keep_running() consumes
        # one. Unlike a plain Lock, releasing it twice in a row (e.g. stop()
        # while a job is still queued) does not raise RuntimeError.
        self.lock = threading.Semaphore(0)

    def stop(self):
        """Ask the worker to exit the next time it looks for a job."""
        self._stop = True
        self.lock.release()

    def set_next_job(self, job):
        """
        Queue a job and wake the worker up.

        :param job: object providing ``compute()`` and a writable ``status``
        """
        self.next_job = job
        self.lock.release()

    def is_running(self):
        """Return True while the worker loop is active."""
        return self._is_running

    def start(self):
        """Launch the worker thread."""
        self._thread = threading.Thread(target=self._run_jobs)
        self._thread.start()

    def keep_running(self):
        """
        Block until there is a job to run or a stop request.

        :return: True if ``current_job`` holds a new job to execute, False if
            the runner was asked to stop
        :rtype: bool
        """
        while True:
            self.lock.acquire()
            if self._stop:
                return False
            if self.next_job is not None:
                self.current_job = self.next_job
                self.next_job = None
                return True
            # Woken up with neither a job nor a stop request: wait again
            # instead of re-running the previous job.

    def _run_jobs(self):
        """Worker loop: run jobs until keep_running() reports a stop."""
        self._is_running = True
        try:
            while self.keep_running():
                self._run_job()
        finally:
            self._is_running = False

    def _run_job(self):
        """
        Run ``current_job`` and update its status.

        :return: True if the job completed, False if it raised an exception
        :rtype: bool
        """
        job = self.current_job
        try:
            Log.info('Starting {0}', job)
            job.status = DiagnosticStatus.RUNNING
            started = datetime.datetime.now()
            job.compute()
            elapsed = datetime.datetime.now() - started
            job_class = type(job)
            if job_class in self.time:
                self.time[job_class] += elapsed
            else:
                self.time[job_class] = elapsed
            Log.result('Finished {0}', job)
            job.status = DiagnosticStatus.COMPLETED
            return True
        except Exception as ex:
            job.message = str(ex)
            # Mark the job as failed before logging so that whoever polls the
            # status sees it finished even if logging itself fails.
            job.status = DiagnosticStatus.FAILED
            Log.error('Job {0} failed: {1}', job, ex)
            return False