# work_manager.py
# coding=utf-8
import datetime
import operator
import threading
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

from bscearth.utils.log import Log

from diagnostic import DiagnosticStatus
from utils import Utils, TempFile


class WorkManager(object):
    """
    Run a list of diagnostic jobs on a pool of worker threads.

    Each job must provide ``request_data()`` and ``compute()``; the manager
    sets ``status`` on every job it runs and ``message`` on the ones that fail.

    :param config: configuration object; only ``max_cores`` is read here
    :param job_list: jobs to run
    :param data_manager: object exposing ``requested_files``, a mapping whose
        values provide a ``download()`` method
    """

    def __init__(self, config, job_list, data_manager):
        self.pending_jobs = job_list
        self.config = config
        # Accumulated compute time, keyed by job class
        self.time = {}
        # Worker threads update self.time concurrently, so the read-modify-write
        # in _run_job must be serialized
        self._time_lock = threading.Lock()
        self.had_errors = False
        self.data_manager = data_manager

    def run(self):
        """
        Download the requested files, run every pending job and wait for them.

        Downloads and jobs are submitted together, exactly as before, but this
        method now blocks until all of them are done and the executors are shut
        down before temporary files are cleaned.

        :return: True if no job failed, False otherwise
        :rtype: bool
        """
        start_time = datetime.datetime.now()
        Log.info("Starting to compute at {0}", start_time)
        self.threads = Utils.available_cpu_count()
        if 0 < self.config.max_cores < self.threads:
            self.threads = self.config.max_cores
        Log.info('Using {0} threads', self.threads)

        for job in self.pending_jobs:
            job.request_data()

        self.downloader = ThreadPoolExecutor(1)
        self.uploader = ThreadPoolExecutor(1)
        self.executor = ThreadPoolExecutor(self.threads)

        try:
            download_futures = [self.downloader.submit(file_object.download)
                                for file_object in self.data_manager.requested_files.values()]
            job_futures = [self.executor.submit(self._run_job, job)
                           for job in self.pending_jobs]

            for future in download_futures:
                try:
                    future.result()
                except Exception as ex:
                    # Report instead of losing the exception inside the future.
                    # The return value is still decided by the jobs alone.
                    Log.error('Download failed: {0}', ex)

            for future in job_futures:
                # _run_job reports failure through its return value
                if not future.result():
                    self.had_errors = True
        finally:
            # Always release the worker threads, even if waiting was interrupted
            self.executor.shutdown()
            self.downloader.shutdown()
            self.uploader.shutdown()

        TempFile.clean()
        finish_time = datetime.datetime.now()
        Log.result("Diagnostics finished at {0}", finish_time)
        Log.result("Elapsed time: {0}\n", finish_time - start_time)
        # self.print_errors()
        # self.print_stats()
        return not self.had_errors

    # Disabled reporting helpers, kept for reference. They use job_runners and
    # running_jobs, which are not defined in the visible part of this class.
    #
    # def print_stats(self):
    #     Log.info('Time consumed by each diagnostic class')
    #     Log.info('--------------------------------------')
    #     total = dict()
    #     for runner in self.job_runners:
    #         for key, value in runner.time.items():
    #             if key in total:
    #                 total[key] += value
    #             else:
    #                 total[key] = value
    #     for diag, time in sorted(total.items(), key=operator.itemgetter(1)):
    #         Log.info('{0:23} {1:}', diag.__name__, time)
    #
    # def print_errors(self):
    #     failed = [job for job in self.running_jobs if job.status == DiagnosticStatus.FAILED]
    #     if len(failed) == 0:
    #         return
    #     self.had_errors = True
    #     Log.error('Failed jobs')
    #     Log.error('-----------')
    #     for job in failed:
    #         Log.error(str(job))
    #     Log.info('')

    def _run_job(self, job):
        """
        Compute a single job, tracking its status and the time it consumed.

        Runs on a worker thread. Exceptions raised by the job are caught and
        logged, and the job is marked as failed.

        :param job: job to run; must provide ``compute()``
        :return: True if the job completed, False if it raised an exception
        :rtype: bool
        """
        try:
            Log.info('Starting {0}', job)
            job.status = DiagnosticStatus.RUNNING
            start = datetime.datetime.now()
            job.compute()
            elapsed = datetime.datetime.now() - start
            job_class = type(job)
            with self._time_lock:
                self.time[job_class] = self.time.get(job_class, datetime.timedelta()) + elapsed
            Log.result('Finished {0}', job)
            job.status = DiagnosticStatus.COMPLETED
            return True
        except Exception as ex:
            job.message = str(ex)
            Log.error('Job {0} failed: {1}', job, ex)
            # Without this the job would stay flagged as RUNNING forever
            job.status = DiagnosticStatus.FAILED
            return False

    # Class-level attributes; not referenced by the methods visible here
    count = 0
    failed_jobs = list()