work_manager.py 3.61 KB
Newer Older
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
# coding=utf-8
import datetime

from bscearth.utils.log import Log
from concurrent.futures import ThreadPoolExecutor
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

from diagnostic import DiagnosticStatus
from utils import Utils, TempFile
import threading
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed


class WorkManager(object):
    """
    Schedule the download of requested files and the execution of a job list.

    Downloads are queued on a single-worker executor, jobs are computed on a
    pool sized from the available CPUs (optionally capped by the
    configuration) and a separate single-worker executor is created for
    uploads.

    :param config: configuration object; only ``max_cores`` is read here
    :param job_list: jobs to run. Each job must provide ``request_data()``,
        ``subscribe(subscriber, callback)``, ``compute()`` and a ``status``
        attribute holding a ``DiagnosticStatus`` value
    :param data_manager: object exposing ``requested_files``, a mapping whose
        values provide a ``download`` callable
    """

    def __init__(self, config, job_list, data_manager):
        self.jobs = job_list
        self.config = config
        # Accumulated compute time, keyed by job class
        self.time = {}
        self.had_errors = False
        self.data_manager = data_manager
        # Set once every job is COMPLETED or FAILED. An Event (rather than a
        # lock acquired twice) can be signalled any number of times and
        # exists before any status callback can fire.
        self._finished = threading.Event()
        # Jobs run on several worker threads and all of them update self.time
        self._time_lock = threading.Lock()

    def run(self):
        """
        Run all the jobs and block until every one has completed or failed.

        :return: True if no job ended in FAILED status, False otherwise
        :rtype: bool
        """
        start_time = datetime.datetime.now()
        Log.info("Starting to compute at {0}", start_time)
        self.threads = Utils.available_cpu_count()
        if 0 < self.config.max_cores < self.threads:
            self.threads = self.config.max_cores
        Log.info('Using {0} threads', self.threads)

        self._finished.clear()
        self.had_errors = False

        # Executors are created before subscribing so that a status callback
        # can never find them missing
        self.downloader = ThreadPoolExecutor(1)
        self.uploader = ThreadPoolExecutor(1)
        self.executor = ThreadPoolExecutor(self.threads)
        try:
            for job in self.jobs:
                job.request_data()
                job.subscribe(self, self._job_status_changed)

            for file_object in self.data_manager.requested_files.values():
                self.downloader.submit(file_object.download)

            # Covers the case where there is nothing left to wait for
            # (e.g. an empty job list), which would otherwise block forever
            self.check_completion()
            self._finished.wait()
        finally:
            # Always release the worker threads, even if setup failed
            self.downloader.shutdown(True)
            self.uploader.shutdown(True)
            self.executor.shutdown(True)

        TempFile.clean()
        finish_time = datetime.datetime.now()
        Log.result("Diagnostics finished at {0}", finish_time)
        Log.result("Elapsed time: {0}\n", finish_time - start_time)
        self.print_errors()
        self.print_stats()
        return not self.had_errors

    def _job_status_changed(self, job, status):
        """
        Callback registered on every job through ``job.subscribe``.

        Queues the job for computation when it becomes READY and then checks
        whether the whole job list has finished.

        :param job: job whose status changed
        :param status: new status of the job
        """
        if status == DiagnosticStatus.READY:
            self.executor.submit(self._run_job, job)
        self.check_completion()

    def check_completion(self):
        """
        Signal ``run`` to continue once every job is COMPLETED or FAILED.

        Safe to call repeatedly: signalling an already finished run is a no-op.
        """
        if any(job.status not in (DiagnosticStatus.COMPLETED, DiagnosticStatus.FAILED) for job in self.jobs):
            return
        self._finished.set()

    def print_stats(self):
        """Log the accumulated compute time of each job class, shortest first."""
        Log.info('Time consumed by each diagnostic class')
        Log.info('--------------------------------------')
        for diag, consumed in sorted(self.time.items(), key=lambda item: item[1]):
            Log.info('{0:23} {1:}', diag.__name__, consumed)

    def print_errors(self):
        """
        Log the jobs that ended in FAILED status.

        Sets ``had_errors`` to True when there is at least one failed job.
        """
        failed = [job for job in self.jobs if job.status == DiagnosticStatus.FAILED]
        if not failed:
            return
        self.had_errors = True
        Log.error('Failed jobs')
        Log.error('-----------')
        for job in failed:
            Log.error(str(job))
        Log.info('')

    def _run_job(self, job):
        """
        Compute a single job, recording its elapsed time and final status.

        Any exception raised while running the job is logged, stored in
        ``job.message`` and turns the job status into FAILED instead of
        propagating, so one failing job does not stop the others.

        :param job: job to compute
        :return: True if the job completed, False if it failed
        :rtype: bool
        """
        try:
            Log.info('Starting {0}', job)
            job.status = DiagnosticStatus.RUNNING
            start = datetime.datetime.now()
            job.compute()
            elapsed = datetime.datetime.now() - start
            job_type = type(job)
            # Read-modify-write on a dict shared between worker threads
            with self._time_lock:
                if job_type in self.time:
                    self.time[job_type] += elapsed
                else:
                    self.time[job_type] = elapsed
            Log.result('Finished {0}', job)
            job.status = DiagnosticStatus.COMPLETED
            return True
        except Exception as ex:
            job.message = str(ex)
            Log.error('Job {0} failed: {1}', job, ex)
            job.status = DiagnosticStatus.FAILED
            return False

    # Class-level attributes kept from the original definition; nothing in
    # this class reads them
    count = 0
    failed_jobs = list()