diff --git a/VERSION b/VERSION index a95f2884441f750fc59ff84db191aa57f7ba609f..9d086c6dff671494d94c58ed5ddbb74280033537 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -4.1.4 +4.1.4 \ No newline at end of file diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index ac2a775d379e846d186815e09c091be1fce8238b..b1a3968b9dd934a56ab35a3f51033c232a9fab64 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -1636,6 +1636,8 @@ class Autosubmit: Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence, True, only_wrappers, hold=False) job_list.update_list(as_conf, False) + for job in job_list.get_job_list(): + job.status = Status.WAITING @staticmethod @@ -2011,8 +2013,7 @@ class Autosubmit: exp_history = Autosubmit.get_historical_database(expid, job_list,as_conf) # establish the connection to all platforms # Restore is a missleading, it is actually a "connect" function when the recover flag is not set. - Autosubmit.restore_platforms(platforms_to_test) - + Autosubmit.restore_platforms(platforms_to_test,as_conf=as_conf) return job_list, submitter , exp_history, host , as_conf, platforms_to_test, packages_persistence, False else: return job_list, submitter , None, None, as_conf , platforms_to_test, packages_persistence, True @@ -2193,7 +2194,6 @@ class Autosubmit: Log.printlog("Error trying to store failed job count", Log.WARNING) Log.result("Storing failed job count...done") while not recovery and (recovery_retrials < max_recovery_retrials or max_recovery_retrials <= 0 ): - delay = min(15 * consecutive_retrials, 120) recovery_retrials += 1 sleep(delay) @@ -2261,6 +2261,23 @@ class Autosubmit: Log.result("No more jobs to run.") + + # Wait for all remaining threads of I/O, close remaining connections + # search hint - finished run + Log.info("Waiting for all logs to be updated") + # get all threads + threads = threading.enumerate() + # print name + timeout = as_conf.experiment_data.get("CONFIG",{}).get("LAST_LOGS_TIMEOUT", 180) + for remaining in range(timeout, 0, -1): + if len(job_list.get_completed_without_logs()) == 0: + break + for job in job_list.get_completed_without_logs(): + job_list.update_log_status(job, as_conf) + sleep(1) + if remaining % 10 == 0: + Log.info(f"Timeout: {remaining}") + # Updating job data header with current information when experiment ends try: exp_history = ExperimentHistory(expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, @@ -2272,22 +2289,7 @@ class Autosubmit: Autosubmit.database_fix(expid) except Exception as e: pass - # Wait for all remaining threads of I/O, close remaining connections - timeout = 0 - active_threads = True - all_threads = threading.enumerate() - while active_threads and timeout <= 180: - active_threads = False - for thread in all_threads: - if "JOB_" in thread.name: - if thread.is_alive(): - active_threads = True - Log.info("{0} is still retrieving outputs, time remaining is {1} seconds.".format( - thread.name, 180 - timeout)) - break - if active_threads: - sleep(10) - timeout += 10 + for platform in platforms_to_test: platform.closeConnection() if len(job_list.get_failed()) > 0: @@ -2327,7 +2329,7 @@ class Autosubmit: for platform in platform_to_test: platform_issues = "" try: - message = platform.test_connection() + message = platform.test_connection(as_conf) if message is None: message = "OK" if message != "OK": @@ -2432,6 +2434,14 @@ class Autosubmit: if error_message != "": raise AutosubmitCritical("Submission Failed due wrong configuration:{0}".format(error_message), 7014) + if not 
inspect: + for package in valid_packages_to_submit: + wrapper_time = None + for job in package.jobs: # if jobs > 1 == wrapped == same submission time + job.write_submit_time(wrapper_submit_time=wrapper_time) + wrapper_time = job.submit_time_timestamp + + if save_1 or save_2: return True else: @@ -2833,7 +2843,7 @@ class Autosubmit: job.platform = submitter.platforms[job.platform_name] platforms_to_test.add(job.platform) for platform in platforms_to_test: - platform.test_connection() + platform.test_connection(as_conf) for job in current_active_jobs: job.platform.send_command(job.platform.cancel_cmd + " " + str(job.id), ignore_log=True) @@ -2856,7 +2866,7 @@ class Autosubmit: # noinspection PyTypeChecker platforms_to_test.add(platforms[job.platform_name]) # establish the connection to all platforms - Autosubmit.restore_platforms(platforms_to_test) + Autosubmit.restore_platforms(platforms_to_test,as_conf=as_conf) if all_jobs: jobs_to_recover = job_list.get_job_list() @@ -2989,7 +2999,7 @@ class Autosubmit: job.platform_name = as_conf.get_platform() platforms_to_test.add(platforms[job.platform_name]) # establish the connection to all platforms on use - Autosubmit.restore_platforms(platforms_to_test) + Autosubmit.restore_platforms(platforms_to_test,as_conf=as_conf) Log.info('Migrating experiment {0}'.format(experiment_id)) Autosubmit._check_ownership(experiment_id, raise_error=True) if submitter.platforms is None: @@ -3206,7 +3216,7 @@ class Autosubmit: backup_files = [] # establish the connection to all platforms on use try: - Autosubmit.restore_platforms(platforms_to_test) + Autosubmit.restore_platforms(platforms_to_test,as_conf=as_conf) except AutosubmitCritical as e: raise AutosubmitCritical( e.message + "\nInvalid Remote Platform configuration, recover them manually or:\n 1) Configure platform.yml with the correct info\n 2) autosubmit expid -p --onlyremote", @@ -5401,7 +5411,7 @@ class Autosubmit: definitive_platforms = list() for platform in platforms_to_test: try: - Autosubmit.restore_platforms([platform]) + Autosubmit.restore_platforms([platform],as_conf=as_conf) definitive_platforms.append(platform.name) except Exception as e: pass diff --git a/autosubmit/history/experiment_history.py b/autosubmit/history/experiment_history.py index ee0558edd7e3847440e9aa3432a817157d45cad4..5fd081600ba82f151b9c7e5d42a0b711ce26cf82 100644 --- a/autosubmit/history/experiment_history.py +++ b/autosubmit/history/experiment_history.py @@ -16,346 +16,363 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . 
import traceback +from time import time, sleep + import autosubmit.history.database_managers.database_models as Models import autosubmit.history.utils as HUtils -from time import time, sleep -from .database_managers.experiment_history_db_manager import ExperimentHistoryDbManager -from .database_managers.database_manager import DEFAULT_JOBDATA_DIR, DEFAULT_HISTORICAL_LOGS_DIR -from .strategies import PlatformInformationHandler, SingleAssociationStrategy, StraightWrapperAssociationStrategy, TwoDimWrapperDistributionStrategy, GeneralizedWrapperDistributionStrategy -from .data_classes.job_data import JobData +from autosubmitconfigparser.config.basicconfig import BasicConfig +from log.log import Log from .data_classes.experiment_run import ExperimentRun -from .platform_monitor.slurm_monitor import SlurmMonitor +from .data_classes.job_data import JobData +from .database_managers.database_manager import DEFAULT_JOBDATA_DIR, DEFAULT_HISTORICAL_LOGS_DIR +from .database_managers.experiment_history_db_manager import ExperimentHistoryDbManager from .internal_logging import Logging -from log.log import Log -from autosubmitconfigparser.config.basicconfig import BasicConfig +from .platform_monitor.slurm_monitor import SlurmMonitor +from .strategies import PlatformInformationHandler, SingleAssociationStrategy, StraightWrapperAssociationStrategy, \ + TwoDimWrapperDistributionStrategy, GeneralizedWrapperDistributionStrategy SECONDS_WAIT_PLATFORM = 60 + class ExperimentHistory: - def __init__(self, expid, jobdata_dir_path=DEFAULT_JOBDATA_DIR, historiclog_dir_path=DEFAULT_HISTORICAL_LOGS_DIR): - self.expid = expid - BasicConfig.read() - self._log = Logging(expid, BasicConfig.HISTORICAL_LOG_DIR) - self._job_data_dir_path = BasicConfig.JOBDATA_DIR - self._historiclog_dir_path = BasicConfig.HISTORICAL_LOG_DIR - try: - self.manager = ExperimentHistoryDbManager(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR) - except Exception as exp: - self._log.log(str(exp), traceback.format_exc()) - Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') - self.manager = None - - def initialize_database(self): - try: - self.manager.initialize() - except Exception as exp: - self._log.log(str(exp), traceback.format_exc()) - Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') - - self.manager = None - - def is_header_ready(self): - if self.manager: - return self.manager.is_header_ready_db_version() - return False - - - def write_submit_time(self, job_name, submit=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", - member="", section="", chunk=0, platform="NA", job_id=0, wrapper_queue=None, wrapper_code=None, children=""): - try: - next_counter = self._get_next_counter_by_job_name(job_name) - current_experiment_run = self.manager.get_experiment_run_dc_with_max_id() - job_data_dc = JobData(_id=0, - counter=next_counter, - job_name=job_name, - submit=submit, - status=status, - rowtype=self._get_defined_rowtype(wrapper_code), - ncpus=ncpus, - wallclock=wallclock, - qos=self._get_defined_queue_name(wrapper_queue, wrapper_code, qos), - date=date, - member=member, - section=section, - chunk=chunk, - platform=platform, - job_id=job_id, - children=children, - run_id=current_experiment_run.run_id) - return self.manager.register_submitted_job_data_dc(job_data_dc) - except Exception as exp: - self._log.log(str(exp), traceback.format_exc()) - Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') - - return None - - def write_start_time(self, 
job_name, start=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", - member="", section="", chunk=0, platform="NA", job_id=0, wrapper_queue=None, wrapper_code=None, children=""): - try: - job_data_dc_last = self.manager.get_job_data_dc_unique_latest_by_job_name(job_name) - if not job_data_dc_last: - job_data_dc_last = self.write_submit_time(job_name=job_name, - status=status, - ncpus=ncpus, - wallclock=wallclock, - qos=qos, - date=date, - member=member, - section=section, - chunk=chunk, - platform=platform, - job_id=job_id, - wrapper_queue=wrapper_queue, - wrapper_code=wrapper_code) - self._log.log("write_start_time {0} start not found.".format(job_name)) - job_data_dc_last = self.manager.get_job_data_dc_unique_latest_by_job_name(job_name) - if not job_data_dc_last: - raise Exception("Job {0} has not been found in the database.".format(job_name)) - job_data_dc_last.start = start - job_data_dc_last.qos = self._get_defined_queue_name(wrapper_queue, wrapper_code, qos) - job_data_dc_last.status = status - job_data_dc_last.rowtype = self._get_defined_rowtype(wrapper_code) - job_data_dc_last.job_id = job_id - job_data_dc_last.children = children - return self.manager.update_job_data_dc_by_id(job_data_dc_last) - except Exception as exp: - self._log.log(str(exp), traceback.format_exc()) - Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') - - - def write_finish_time(self, job_name, finish=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", - member="", section="", chunk=0, platform="NA", job_id=0, out_file=None, err_file=None, - wrapper_queue=None, wrapper_code=None, children=""): - try: - job_data_dc_last = self.manager.get_job_data_dc_unique_latest_by_job_name(job_name) - if not job_data_dc_last: - job_data_dc_last = self.write_submit_time(job_name=job_name, - status=status, - ncpus=ncpus, - wallclock=wallclock, - qos=qos, - date=date, - member=member, - section=section, - chunk=chunk, - platform=platform, - job_id=job_id, - wrapper_queue=wrapper_queue, - wrapper_code=wrapper_code, - children=children) - self._log.log("write_finish_time {0} submit not found.".format(job_name)) - job_data_dc_last = self.manager.get_job_data_dc_unique_latest_by_job_name(job_name) - if not job_data_dc_last: - raise Exception("Job {0} has not been found in the database.".format(job_name)) - job_data_dc_last.finish = finish if finish > 0 else int(time()) - job_data_dc_last.status = status - job_data_dc_last.job_id = job_id - job_data_dc_last.rowstatus = Models.RowStatus.PENDING_PROCESS - job_data_dc_last.out = out_file if out_file else "" - job_data_dc_last.err = err_file if err_file else "" - return self.manager.update_job_data_dc_by_id(job_data_dc_last) - except Exception as exp: - self._log.log(str(exp), traceback.format_exc()) - Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') - - - def write_platform_data_after_finish(self, job_data_dc, platform_obj): - """ - Call it in a thread. 
- """ - try: - sleep(SECONDS_WAIT_PLATFORM) - ssh_output = platform_obj.check_job_energy(job_data_dc.job_id) - slurm_monitor = SlurmMonitor(ssh_output) - self._verify_slurm_monitor(slurm_monitor, job_data_dc) - job_data_dcs_in_wrapper = self.manager.get_job_data_dcs_last_by_wrapper_code(job_data_dc.wrapper_code) - job_data_dcs_in_wrapper = sorted([job for job in job_data_dcs_in_wrapper if job.status == "COMPLETED"], key=lambda x: x._id) - job_data_dcs_to_update = [] - if len(job_data_dcs_in_wrapper) > 0: - info_handler = PlatformInformationHandler(StraightWrapperAssociationStrategy(self._historiclog_dir_path)) - job_data_dcs_to_update = info_handler.execute_distribution(job_data_dc, job_data_dcs_in_wrapper, slurm_monitor) - if len(job_data_dcs_to_update) == 0: - info_handler.strategy = TwoDimWrapperDistributionStrategy(self._historiclog_dir_path) - job_data_dcs_to_update = info_handler.execute_distribution(job_data_dc, job_data_dcs_in_wrapper, slurm_monitor) - if len(job_data_dcs_to_update) == 0: - info_handler.strategy = GeneralizedWrapperDistributionStrategy(self._historiclog_dir_path) - job_data_dcs_to_update = info_handler.execute_distribution(job_data_dc, job_data_dcs_in_wrapper, slurm_monitor) - else: - info_handler = PlatformInformationHandler(SingleAssociationStrategy(self._historiclog_dir_path)) - job_data_dcs_to_update = info_handler.execute_distribution(job_data_dc, job_data_dcs_in_wrapper, slurm_monitor) - return self.manager.update_list_job_data_dc_by_each_id(job_data_dcs_to_update) - except Exception as exp: - self._log.log(str(exp), traceback.format_exc()) - Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') - - - def _verify_slurm_monitor(self, slurm_monitor, job_data_dc): - try: - if slurm_monitor.header.status not in ["COMPLETED", "FAILED"]: - self._log.log("Assertion Error on job {0} with ssh_output {1}".format(job_data_dc.job_name, slurm_monitor.original_input), - "Slurm status {0} is not COMPLETED nor FAILED for ID {1}.\n".format(slurm_monitor.header.status, slurm_monitor.header.name)) - Log.debug(f'Historical Database error: Slurm status {slurm_monitor.header.status} is not COMPLETED nor FAILED for ID {slurm_monitor.header.name}.') - if not slurm_monitor.steps_plus_extern_approximate_header_energy(): - self._log.log("Assertion Error on job {0} with ssh_output {1}".format(job_data_dc.job_name, slurm_monitor.original_input), - "Steps + extern != total energy for ID {0}. Number of steps {1}.\n".format(slurm_monitor.header.name, slurm_monitor.step_count)) - Log.debug(f'Historical Database error: Steps + extern != total energy for ID {slurm_monitor.header.name}. Number of steps {slurm_monitor.step_count}.') - except Exception as exp: - self._log.log(str(exp), traceback.format_exc()) - Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') - - - def process_status_changes(self, job_list=None, chunk_unit="NA", chunk_size=0, current_config="",create=False): - """ Detect status differences between job_list and current job_data rows, and update. Creates a new run if necessary. 
""" - try: + def __init__(self, expid, jobdata_dir_path=DEFAULT_JOBDATA_DIR, historiclog_dir_path=DEFAULT_HISTORICAL_LOGS_DIR): + self.expid = expid + BasicConfig.read() + self._log = Logging(expid, BasicConfig.HISTORICAL_LOG_DIR) + self._job_data_dir_path = BasicConfig.JOBDATA_DIR + self._historiclog_dir_path = BasicConfig.HISTORICAL_LOG_DIR + try: + self.manager = ExperimentHistoryDbManager(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR) + except Exception as exp: + self._log.log(str(exp), traceback.format_exc()) + Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') + self.manager = None + + def initialize_database(self): + try: + self.manager.initialize() + except Exception as exp: + self._log.log(str(exp), traceback.format_exc()) + Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') + + self.manager = None + + def is_header_ready(self): + if self.manager: + return self.manager.is_header_ready_db_version() + return False + + def write_submit_time(self, job_name, submit=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", + member="", section="", chunk=0, platform="NA", job_id=0, wrapper_queue=None, + wrapper_code=None, children=""): + try: + next_counter = self._get_next_counter_by_job_name(job_name) + current_experiment_run = self.manager.get_experiment_run_dc_with_max_id() + job_data_dc = JobData(_id=0, + counter=next_counter, + job_name=job_name, + submit=submit, + status=status, + rowtype=self._get_defined_rowtype(wrapper_code), + ncpus=ncpus, + wallclock=wallclock, + qos=self._get_defined_queue_name(wrapper_queue, wrapper_code, qos), + date=date, + member=member, + section=section, + chunk=chunk, + platform=platform, + job_id=job_id, + children=children, + run_id=current_experiment_run.run_id) + return self.manager.register_submitted_job_data_dc(job_data_dc) + except Exception as exp: + self._log.log(str(exp), traceback.format_exc()) + Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') + + return None + + def write_start_time(self, job_name, start=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", + member="", section="", chunk=0, platform="NA", job_id=0, wrapper_queue=None, wrapper_code=None, + children=""): + try: + job_data_dc_last = self.manager.get_job_data_dc_unique_latest_by_job_name(job_name) + if not job_data_dc_last: + job_data_dc_last = self.write_submit_time(job_name=job_name, + status=status, + ncpus=ncpus, + wallclock=wallclock, + qos=qos, + date=date, + member=member, + section=section, + chunk=chunk, + platform=platform, + job_id=job_id, + wrapper_queue=wrapper_queue, + wrapper_code=wrapper_code) + self._log.log("write_start_time {0} start not found.".format(job_name)) + job_data_dc_last = self.manager.get_job_data_dc_unique_latest_by_job_name(job_name) + if not job_data_dc_last: + raise Exception("Job {0} has not been found in the database.".format(job_name)) + job_data_dc_last.start = start + job_data_dc_last.qos = self._get_defined_queue_name(wrapper_queue, wrapper_code, qos) + job_data_dc_last.status = status + job_data_dc_last.rowtype = self._get_defined_rowtype(wrapper_code) + job_data_dc_last.job_id = job_id + job_data_dc_last.children = children + return self.manager.update_job_data_dc_by_id(job_data_dc_last) + except Exception as exp: + self._log.log(str(exp), traceback.format_exc()) + Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') + + def write_finish_time(self, job_name, finish=0, status="UNKNOWN", ncpus=0, 
wallclock="00:00", qos="debug", date="", + member="", section="", chunk=0, platform="NA", job_id=0, out_file=None, err_file=None, + wrapper_queue=None, wrapper_code=None, children=""): + try: + job_data_dc_last = self.manager.get_job_data_dc_unique_latest_by_job_name(job_name) + if not job_data_dc_last: + job_data_dc_last = self.write_submit_time(job_name=job_name, + status=status, + ncpus=ncpus, + wallclock=wallclock, + qos=qos, + date=date, + member=member, + section=section, + chunk=chunk, + platform=platform, + job_id=job_id, + wrapper_queue=wrapper_queue, + wrapper_code=wrapper_code, + children=children) + self._log.log("write_finish_time {0} submit not found.".format(job_name)) + job_data_dc_last = self.manager.get_job_data_dc_unique_latest_by_job_name(job_name) + if not job_data_dc_last: + raise Exception("Job {0} has not been found in the database.".format(job_name)) + job_data_dc_last.finish = finish if finish > 0 else int(time()) + job_data_dc_last.status = status + job_data_dc_last.job_id = job_id + job_data_dc_last.rowstatus = Models.RowStatus.PENDING_PROCESS + job_data_dc_last.out = out_file if out_file else "" + job_data_dc_last.err = err_file if err_file else "" + return self.manager.update_job_data_dc_by_id(job_data_dc_last) + except Exception as exp: + self._log.log(str(exp), traceback.format_exc()) + Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') + + def write_platform_data_after_finish(self, job_data_dc, platform_obj): + """ + Call it in a thread. + """ + try: + sleep(SECONDS_WAIT_PLATFORM) + ssh_output = platform_obj.check_job_energy(job_data_dc.job_id) + slurm_monitor = SlurmMonitor(ssh_output) + self._verify_slurm_monitor(slurm_monitor, job_data_dc) + job_data_dcs_in_wrapper = self.manager.get_job_data_dcs_last_by_wrapper_code(job_data_dc.wrapper_code) + job_data_dcs_in_wrapper = sorted([job for job in job_data_dcs_in_wrapper if job.status == "COMPLETED"], + key=lambda x: x._id) + job_data_dcs_to_update = [] + if len(job_data_dcs_in_wrapper) > 0: + info_handler = PlatformInformationHandler( + StraightWrapperAssociationStrategy(self._historiclog_dir_path)) + job_data_dcs_to_update = info_handler.execute_distribution(job_data_dc, job_data_dcs_in_wrapper, + slurm_monitor) + if len(job_data_dcs_to_update) == 0: + info_handler.strategy = TwoDimWrapperDistributionStrategy(self._historiclog_dir_path) + job_data_dcs_to_update = info_handler.execute_distribution(job_data_dc, job_data_dcs_in_wrapper, + slurm_monitor) + if len(job_data_dcs_to_update) == 0: + info_handler.strategy = GeneralizedWrapperDistributionStrategy(self._historiclog_dir_path) + job_data_dcs_to_update = info_handler.execute_distribution(job_data_dc, job_data_dcs_in_wrapper, + slurm_monitor) + else: + info_handler = PlatformInformationHandler(SingleAssociationStrategy(self._historiclog_dir_path)) + job_data_dcs_to_update = info_handler.execute_distribution(job_data_dc, job_data_dcs_in_wrapper, + slurm_monitor) + return self.manager.update_list_job_data_dc_by_each_id(job_data_dcs_to_update) + except Exception as exp: + self._log.log(str(exp), traceback.format_exc()) + Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') + + def _verify_slurm_monitor(self, slurm_monitor, job_data_dc): + try: + if slurm_monitor.header.status not in ["COMPLETED", "FAILED"]: + self._log.log("Assertion Error on job {0} with ssh_output {1}".format(job_data_dc.job_name, + slurm_monitor.original_input), + "Slurm status {0} is not COMPLETED nor FAILED for ID {1}.\n".format( + 
slurm_monitor.header.status, slurm_monitor.header.name)) + Log.debug( + f'Historical Database error: Slurm status {slurm_monitor.header.status} is not COMPLETED nor FAILED for ID {slurm_monitor.header.name}.') + if not slurm_monitor.steps_plus_extern_approximate_header_energy(): + self._log.log("Assertion Error on job {0} with ssh_output {1}".format(job_data_dc.job_name, + slurm_monitor.original_input), + "Steps + extern != total energy for ID {0}. Number of steps {1}.\n".format( + slurm_monitor.header.name, slurm_monitor.step_count)) + Log.debug( + f'Historical Database error: Steps + extern != total energy for ID {slurm_monitor.header.name}. Number of steps {slurm_monitor.step_count}.') + except Exception as exp: + self._log.log(str(exp), traceback.format_exc()) + Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') + + def process_status_changes(self, job_list=None, chunk_unit="NA", chunk_size=0, current_config="", create=False): + """ Detect status differences between job_list and current job_data rows, and update. Creates a new run if necessary. """ + try: + try: + current_experiment_run_dc = self.manager.get_experiment_run_dc_with_max_id() + update_these_changes = self._get_built_list_of_changes(job_list) + except: + current_experiment_run_dc = 0 + update_these_changes = [] + # ("no runs") + should_create_new_run = self.should_we_create_a_new_run(job_list, len(update_these_changes), + current_experiment_run_dc, chunk_unit, chunk_size, + create) + if len(update_these_changes) > 0 and should_create_new_run == False: + self.manager.update_many_job_data_change_status(update_these_changes) + if should_create_new_run: + return self.create_new_experiment_run(chunk_unit, chunk_size, current_config, job_list) + return self.update_counts_on_experiment_run_dc(current_experiment_run_dc, job_list) + except Exception as exp: + self._log.log(str(exp), traceback.format_exc()) + Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') + + def _get_built_list_of_changes(self, job_list): + """ Return: List of (current timestamp, current datetime str, status, rowstatus, id in job_data). One tuple per change. """ + job_data_dcs = self.detect_changes_in_job_list(job_list) + return [(HUtils.get_current_datetime(), job.status, Models.RowStatus.CHANGED, job._id) for job in job_data_dcs] + + def process_job_list_changes_to_experiment_totals(self, job_list=None): + """ Updates current experiment_run row with totals calculated from job_list. 
""" + try: + current_experiment_run_dc = self.manager.get_experiment_run_dc_with_max_id() + return self.update_counts_on_experiment_run_dc(current_experiment_run_dc, job_list) + except Exception as exp: + self._log.log(str(exp), traceback.format_exc()) + Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') + + def should_we_create_a_new_run(self, job_list, changes_count, current_experiment_run_dc, new_chunk_unit, + new_chunk_size, create=False): + if create: + return True + elif not create and self.expid[0].lower() != "t": + if len(job_list) != current_experiment_run_dc.total: + return True + if changes_count > int(self._get_date_member_completed_count(job_list)): + return True + return self._chunk_config_has_changed(current_experiment_run_dc, new_chunk_unit, new_chunk_size) + + def _chunk_config_has_changed(self, current_exp_run_dc, new_chunk_unit, new_chunk_size): + if not current_exp_run_dc: + return True + if current_exp_run_dc.chunk_unit != new_chunk_unit or current_exp_run_dc.chunk_size != new_chunk_size: + return True + return False + + def update_counts_on_experiment_run_dc(self, experiment_run_dc, job_list=None): + """ Return updated row as Models.ExperimentRun. """ + status_counts = self.get_status_counts_from_job_list(job_list) + experiment_run_dc.completed = status_counts[HUtils.SupportedStatus.COMPLETED] + experiment_run_dc.failed = status_counts[HUtils.SupportedStatus.FAILED] + experiment_run_dc.queuing = status_counts[HUtils.SupportedStatus.QUEUING] + experiment_run_dc.submitted = status_counts[HUtils.SupportedStatus.SUBMITTED] + experiment_run_dc.running = status_counts[HUtils.SupportedStatus.RUNNING] + experiment_run_dc.suspended = status_counts[HUtils.SupportedStatus.SUSPENDED] + experiment_run_dc.total = status_counts["TOTAL"] + return self.manager.update_experiment_run_dc_by_id(experiment_run_dc) + + def finish_current_experiment_run(self): + if self.manager.is_there_a_last_experiment_run(): current_experiment_run_dc = self.manager.get_experiment_run_dc_with_max_id() - update_these_changes = self._get_built_list_of_changes(job_list) - except: - current_experiment_run_dc = 0 - update_these_changes = [] - #("no runs") - should_create_new_run = self.should_we_create_a_new_run(job_list, len(update_these_changes), current_experiment_run_dc, chunk_unit, chunk_size,create) - if len(update_these_changes) > 0 and should_create_new_run == False: - self.manager.update_many_job_data_change_status(update_these_changes) - if should_create_new_run: - return self.create_new_experiment_run(chunk_unit, chunk_size, current_config, job_list) - return self.update_counts_on_experiment_run_dc(current_experiment_run_dc, job_list) - except Exception as exp: - self._log.log(str(exp), traceback.format_exc()) - Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') - - - def _get_built_list_of_changes(self, job_list): - """ Return: List of (current timestamp, current datetime str, status, rowstatus, id in job_data). One tuple per change. """ - job_data_dcs = self.detect_changes_in_job_list(job_list) - return [(HUtils.get_current_datetime(), job.status, Models.RowStatus.CHANGED, job._id) for job in job_data_dcs] - - def process_job_list_changes_to_experiment_totals(self, job_list=None): - """ Updates current experiment_run row with totals calculated from job_list. 
""" - try: - current_experiment_run_dc = self.manager.get_experiment_run_dc_with_max_id() - return self.update_counts_on_experiment_run_dc(current_experiment_run_dc, job_list) - except Exception as exp: - self._log.log(str(exp), traceback.format_exc()) - Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') - - - def should_we_create_a_new_run(self, job_list, changes_count, current_experiment_run_dc, new_chunk_unit, new_chunk_size,create=False): - if create: - return True - elif not create and self.expid[0].lower() != "t": - if len(job_list) != current_experiment_run_dc.total: - return True - if changes_count > int(self._get_date_member_completed_count(job_list)): - return True - return self._chunk_config_has_changed(current_experiment_run_dc, new_chunk_unit, new_chunk_size) - - def _chunk_config_has_changed(self, current_exp_run_dc, new_chunk_unit, new_chunk_size): - if not current_exp_run_dc: - return True - if current_exp_run_dc.chunk_unit != new_chunk_unit or current_exp_run_dc.chunk_size != new_chunk_size: - return True - return False - - def update_counts_on_experiment_run_dc(self, experiment_run_dc, job_list=None): - """ Return updated row as Models.ExperimentRun. """ - status_counts = self.get_status_counts_from_job_list(job_list) - experiment_run_dc.completed = status_counts[HUtils.SupportedStatus.COMPLETED] - experiment_run_dc.failed = status_counts[HUtils.SupportedStatus.FAILED] - experiment_run_dc.queuing = status_counts[HUtils.SupportedStatus.QUEUING] - experiment_run_dc.submitted = status_counts[HUtils.SupportedStatus.SUBMITTED] - experiment_run_dc.running = status_counts[HUtils.SupportedStatus.RUNNING] - experiment_run_dc.suspended = status_counts[HUtils.SupportedStatus.SUSPENDED] - experiment_run_dc.total = status_counts["TOTAL"] - return self.manager.update_experiment_run_dc_by_id(experiment_run_dc) - - def finish_current_experiment_run(self): - if self.manager.is_there_a_last_experiment_run(): - current_experiment_run_dc = self.manager.get_experiment_run_dc_with_max_id() - current_experiment_run_dc.finish = int(time()) - return self.manager.update_experiment_run_dc_by_id(current_experiment_run_dc) - return None - - def create_new_experiment_run(self, chunk_unit="NA", chunk_size=0, current_config="", job_list=None): - """ Also writes the finish timestamp of the previous run. """ - self.finish_current_experiment_run() - return self._create_new_experiment_run_dc_with_counts(chunk_unit=chunk_unit, chunk_size=chunk_size, current_config=current_config, job_list=job_list) - - def _create_new_experiment_run_dc_with_counts(self, chunk_unit, chunk_size, current_config="", job_list=None): - """ Create new experiment_run row and return the new Models.ExperimentRun data class from database. 
""" - status_counts = self.get_status_counts_from_job_list(job_list) - experiment_run_dc = ExperimentRun(0, - chunk_unit=chunk_unit, - chunk_size=chunk_size, - metadata=current_config, - start=int(time()), - completed=status_counts[HUtils.SupportedStatus.COMPLETED], - total=status_counts["TOTAL"], - failed=status_counts[HUtils.SupportedStatus.FAILED], - queuing=status_counts[HUtils.SupportedStatus.QUEUING], - running=status_counts[HUtils.SupportedStatus.RUNNING], - submitted=status_counts[HUtils.SupportedStatus.SUBMITTED], - suspended=status_counts[HUtils.SupportedStatus.SUSPENDED]) - return self.manager.register_experiment_run_dc(experiment_run_dc) - - def detect_changes_in_job_list(self, job_list): - """ Detect changes in job_list compared to the current contents of job_data table. Returns a list of JobData data classes where the status of each item is the new status.""" - job_name_to_job = {str(job.name): job for job in job_list} - current_job_data_dcs = self.manager.get_all_last_job_data_dcs() - differences = [] - for job_dc in current_job_data_dcs: - if job_dc.job_name in job_name_to_job: - if job_dc.status != job_name_to_job[job_dc.job_name].status_str: - if not (job_dc.status in ["COMPLETED", "FAILED"] and job_name_to_job[job_dc.job_name].status_str in ["WAITING", "READY"]): - # If the job is not changing from a finalized status to a starting status - job_dc.status = job_name_to_job[job_dc.job_name].status_str - differences.append(job_dc) - return differences - - def _get_defined_rowtype(self, code): - if code: - return code - else: - return Models.RowType.NORMAL - - def _get_defined_queue_name(self, wrapper_queue, wrapper_code, qos): - if wrapper_code and wrapper_code > 2 and wrapper_queue is not None and len(str(wrapper_queue)) > 0: - return wrapper_queue - return qos - - def _get_next_counter_by_job_name(self, job_name): - """ Return the counter attribute from the latest job data row by job_name. """ - job_data_dc = self.manager.get_job_data_dc_unique_latest_by_job_name(job_name) - max_counter = self.manager.get_job_data_max_counter() - if job_data_dc: - return max(max_counter, job_data_dc.counter + 1) - else: - return max_counter - - def _get_date_member_completed_count(self, job_list): - """ Each item in the job_list must have attributes: date, member, status_str. """ - job_list = job_list if job_list else [] - return sum(1 for job in job_list if job.date is not None and job.member is not None and job.status_str == HUtils.SupportedStatus.COMPLETED) - - def get_status_counts_from_job_list(self, job_list): - """ - Return dict with keys COMPLETED, FAILED, QUEUING, SUBMITTED, RUNNING, SUSPENDED, TOTAL. - """ - result = { - HUtils.SupportedStatus.COMPLETED: 0, - HUtils.SupportedStatus.FAILED: 0, - HUtils.SupportedStatus.QUEUING: 0, - HUtils.SupportedStatus.SUBMITTED: 0, - HUtils.SupportedStatus.RUNNING: 0, - HUtils.SupportedStatus.SUSPENDED: 0, - "TOTAL": 0 - } - - if not job_list: - job_list = [] - - for job in job_list: - if job.status_str in result: - result[job.status_str] += 1 - result["TOTAL"] = len(job_list) - return result + current_experiment_run_dc.finish = int(time()) + return self.manager.update_experiment_run_dc_by_id(current_experiment_run_dc) + return None + + def create_new_experiment_run(self, chunk_unit="NA", chunk_size=0, current_config="", job_list=None): + """ Also writes the finish timestamp of the previous run. 
""" + self.finish_current_experiment_run() + return self._create_new_experiment_run_dc_with_counts(chunk_unit=chunk_unit, chunk_size=chunk_size, + current_config=current_config, job_list=job_list) + + def _create_new_experiment_run_dc_with_counts(self, chunk_unit, chunk_size, current_config="", job_list=None): + """ Create new experiment_run row and return the new Models.ExperimentRun data class from database. """ + status_counts = self.get_status_counts_from_job_list(job_list) + experiment_run_dc = ExperimentRun(0, + chunk_unit=chunk_unit, + chunk_size=chunk_size, + metadata=current_config, + start=int(time()), + completed=status_counts[HUtils.SupportedStatus.COMPLETED], + total=status_counts["TOTAL"], + failed=status_counts[HUtils.SupportedStatus.FAILED], + queuing=status_counts[HUtils.SupportedStatus.QUEUING], + running=status_counts[HUtils.SupportedStatus.RUNNING], + submitted=status_counts[HUtils.SupportedStatus.SUBMITTED], + suspended=status_counts[HUtils.SupportedStatus.SUSPENDED]) + return self.manager.register_experiment_run_dc(experiment_run_dc) + + def detect_changes_in_job_list(self, job_list): + """ Detect changes in job_list compared to the current contents of job_data table. Returns a list of JobData data classes where the status of each item is the new status.""" + job_name_to_job = {str(job.name): job for job in job_list} + current_job_data_dcs = self.manager.get_all_last_job_data_dcs() + differences = [] + for job_dc in current_job_data_dcs: + if job_dc.job_name in job_name_to_job: + if job_dc.status != job_name_to_job[job_dc.job_name].status_str: + if not (job_dc.status in ["COMPLETED", "FAILED"] and job_name_to_job[ + job_dc.job_name].status_str in ["WAITING", "READY"]): + # If the job is not changing from a finalized status to a starting status + job_dc.status = job_name_to_job[job_dc.job_name].status_str + differences.append(job_dc) + return differences + + def _get_defined_rowtype(self, code): + if code: + return code + else: + return Models.RowType.NORMAL + + def _get_defined_queue_name(self, wrapper_queue, wrapper_code, qos): + if wrapper_code and wrapper_code > 2 and wrapper_queue is not None and len(str(wrapper_queue)) > 0: + return wrapper_queue + return qos + + def _get_next_counter_by_job_name(self, job_name): + """ Return the counter attribute from the latest job data row by job_name. """ + job_data_dc = self.manager.get_job_data_dc_unique_latest_by_job_name(job_name) + max_counter = self.manager.get_job_data_max_counter() + if job_data_dc: + return max(max_counter, job_data_dc.counter + 1) + else: + return max_counter + + def _get_date_member_completed_count(self, job_list): + """ Each item in the job_list must have attributes: date, member, status_str. """ + job_list = job_list if job_list else [] + return sum(1 for job in job_list if + job.date is not None and job.member is not None and job.status_str == HUtils.SupportedStatus.COMPLETED) + + def get_status_counts_from_job_list(self, job_list): + """ + Return dict with keys COMPLETED, FAILED, QUEUING, SUBMITTED, RUNNING, SUSPENDED, TOTAL. 
+ """ + result = { + HUtils.SupportedStatus.COMPLETED: 0, + HUtils.SupportedStatus.FAILED: 0, + HUtils.SupportedStatus.QUEUING: 0, + HUtils.SupportedStatus.SUBMITTED: 0, + HUtils.SupportedStatus.RUNNING: 0, + HUtils.SupportedStatus.SUSPENDED: 0, + "TOTAL": 0 + } + + if not job_list: + job_list = [] + + for job in job_list: + if job.status_str in result: + result[job.status_str] += 1 + result["TOTAL"] = len(job_list) + return result diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index bed0521b36fbeb599a39b89910dda5fbb9418f45..bb6e3244b7e341d165df6c8ea002a778f4faf10e 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -242,7 +242,12 @@ class Job(object): self.delete_when_edgeless = False # hetjobs self.het = None - + self.updated_log = False + self.ready_start_date = None + self.log_retrieved = False + self.start_time_written = False + self.submit_time_timestamp = None # for wrappers, all jobs inside a wrapper are submitted at the same time + self.finish_time_timestamp = None # for wrappers, with inner_retrials, the submission time should be the last finish_time of the previous retrial def _init_runtime_parameters(self): # hetjobs self.het = {'HETSIZE': 0} @@ -255,6 +260,8 @@ class Job(object): self._processors = '1' self._memory = '' self._memory_per_task = '' + self.log_retrieved = False + self.start_time_placeholder = "" @property @autosubmit_parameter(name='tasktype') @@ -902,7 +909,7 @@ class Job(object): """ return self.parents.__len__() - def _get_from_stat(self, index): + def _get_from_stat(self, index, fail_count =-1): """ Returns value from given row index position in STAT file associated to job @@ -911,7 +918,11 @@ class Job(object): :return: value in index position :rtype: int """ - logname = os.path.join(self._tmp_path, self.name + '_STAT') + if fail_count == -1: + logname = os.path.join(self._tmp_path, self.name + '_STAT') + else: + fail_count = str(fail_count) + logname = os.path.join(self._tmp_path, self.name + '_STAT_' + fail_count) if os.path.exists(logname): lines = open(logname).readlines() if len(lines) >= index + 1: @@ -941,23 +952,23 @@ class Job(object): lst.append(parse_date(fields[index])) return lst - def check_end_time(self): + def check_end_time(self, fail_count=-1): """ Returns end time from stat file :return: date and time :rtype: str """ - return self._get_from_stat(1) + return self._get_from_stat(1, fail_count) - def check_start_time(self): + def check_start_time(self, fail_count=-1): """ Returns job's start time :return: start time :rtype: str """ - return self._get_from_stat(0) + return self._get_from_stat(0,fail_count) def check_retrials_end_time(self): """ @@ -1003,220 +1014,108 @@ class Job(object): retrials_list.insert(0, retrial_dates) return retrials_list - def retrieve_logfiles_unthreaded(self, copy_remote_logs, local_logs): - remote_logs = (self.script_name + ".out."+str(self.fail_count), self.script_name + ".err."+str(self.fail_count)) - out_exist = False - err_exist = False - retries = 3 - sleeptime = 0 - i = 0 - no_continue = False - try: - while (not out_exist and not err_exist) and i < retries: - try: - out_exist = self._platform.check_file_exists( - remote_logs[0], True) - except IOError as e: - out_exist = False - try: - err_exist = self._platform.check_file_exists( - remote_logs[1], True) - except IOError as e: - err_exists = False - if not out_exist or not err_exist: - sleeptime = sleeptime + 5 - i = i + 1 - sleep(sleeptime) - if i >= retries: - if not out_exist or not err_exist: - Log.printlog("Failed to retrieve 
log files {1} and {2} e=6001".format( - retries, remote_logs[0], remote_logs[1])) - return - if str(copy_remote_logs).lower() == "true": - # unifying names for log files - if remote_logs != local_logs: - self.synchronize_logs( - self._platform, remote_logs, local_logs) - remote_logs = copy.deepcopy(local_logs) - self._platform.get_logs_files(self.expid, remote_logs) - # Update the logs with Autosubmit Job ID Brand - try: - for local_log in local_logs: - self._platform.write_jobid(self.id, os.path.join( - self._tmp_path, 'LOG_' + str(self.expid), local_log)) - except BaseException as e: - Log.printlog("Trace {0} \n Failed to write the {1} e=6001".format( - str(e), self.name)) - except AutosubmitError as e: - Log.printlog("Trace {0} \nFailed to retrieve log file for job {1}".format( - str(e), self.name), 6001) - except AutosubmitCritical as e: # Critical errors can't be recovered. Failed configuration or autosubmit error - Log.printlog("Trace {0} \nFailed to retrieve log file for job {0}".format( - str(e), self.name), 6001) - return - - @threaded - def retrieve_logfiles(self, copy_remote_logs, local_logs, remote_logs, expid, platform_name,fail_count = 0,job_id="",auth_password=None, local_auth_password = None): - as_conf = AutosubmitConfig(expid, BasicConfig, YAMLParserFactory()) - as_conf.reload(force_load=True) - max_retrials = self.retrials - max_logs = 0 - last_log = 0 - stat_file = self.script_name[:-4] + "_STAT_" - lang = locale.getlocale()[1] - if lang is None: - lang = locale.getdefaultlocale()[1] - if lang is None: - lang = 'UTF-8' - retries = 2 - count = 0 - success = False - error_message = "" - platform = None - while (count < retries) and not success: - try: - as_conf = AutosubmitConfig(expid, BasicConfig, YAMLParserFactory()) - as_conf.reload(force_load=True) - max_retrials = self.retrials - max_logs = int(max_retrials) - fail_count - last_log = int(max_retrials) - fail_count - submitter = self._get_submitter(as_conf) - submitter.load_platforms(as_conf, auth_password=auth_password, local_auth_password=local_auth_password) - platform = submitter.platforms[platform_name] - platform.test_connection() - success = True - except BaseException as e: - error_message = str(e) - sleep(5) - pass - count = count + 1 - if not success: - raise AutosubmitError( - "Couldn't load the autosubmit platforms, seems that the local platform has some issue\n:{0}".format( - error_message), 6006) + def get_new_remotelog_name(self, count = -1): + """ + Checks if remote log file exists on remote host + if it exists, remote_log variable is updated + :param + """ + if count == -1: + count = self._fail_count try: - if self.wrapper_type is not None and self.wrapper_type == "vertical": - found = False - retrials = 0 - while retrials < 3 and not found: - if platform.check_stat_file_by_retrials(stat_file + str(max_logs)): - found = True - retrials = retrials + 1 - for i in range(max_logs-1,-1,-1): - if platform.check_stat_file_by_retrials(stat_file + str(i)): - last_log = i - else: - break - remote_logs = (self.script_name + ".out." + str(last_log), self.script_name + ".err." + str(last_log)) - - else: - remote_logs = (self.script_name + ".out."+str(fail_count), self.script_name + ".err." + str(fail_count)) - + remote_logs = (f"{self.script_name}.out.{count}", f"{self.script_name}.err.{count}") except BaseException as e: - Log.printlog( - "{0} \n Couldn't connect to the remote platform for {1} job err/out files. 
".format(str(e), self.name), 6001) - out_exist = False - err_exist = False - retries = 3 - i = 0 + remote_logs = "" + Log.printlog(f"Trace {e} \n Failed to retrieve log file for job {self.name}", 6000) + return remote_logs + + def check_remote_log_exists(self, platform): try: - while (not out_exist and not err_exist) and i < retries: - try: - out_exist = platform.check_file_exists( - remote_logs[0], False, sleeptime=0, max_retries=1) - except IOError as e: - out_exist = False + out_exist = platform.check_file_exists(self.remote_logs[0], False, sleeptime=0, max_retries=1) + except IOError: + Log.debug(f'Output log {self.remote_logs[0]} still does not exist') + out_exist = False + try: + err_exist = platform.check_file_exists(self.remote_logs[1], False, sleeptime=0, max_retries=1) + except IOError: + Log.debug(f'Error log {self.remote_logs[1]} still does not exist') + err_exist = False + return out_exist or err_exist + + def retrieve_external_retrials_logfiles(self, platform): + log_retrieved = False + self.remote_logs = self.get_new_remotelog_name() + if not self.remote_logs: + self.log_retrieved = False + else: + if self.check_remote_log_exists(platform): try: - err_exist = platform.check_file_exists( - remote_logs[1], False, sleeptime=0, max_retries=1) - except IOError as e: - err_exist = False - if not out_exist or not err_exist: - i = i + 1 - sleep(5) + self.synchronize_logs(platform, self.remote_logs, self.local_logs) + remote_logs = copy.deepcopy(self.local_logs) + platform.get_logs_files(self.expid, remote_logs) + log_retrieved = True + except BaseException: + log_retrieved = False + self.log_retrieved = log_retrieved + + def retrieve_internal_retrials_logfiles(self, platform): + log_retrieved = False + original = copy.deepcopy(self.local_logs) + for i in range(0, int(self.retrials + 1)): + if i > 0: + self.local_logs = (original[0][:-4] + "_{0}".format(i) + ".out", original[1][:-4] + "_{0}".format(i) + ".err") + self.remote_logs = self.get_new_remotelog_name(i) + if not self.remote_logs: + self.log_retrieved = False + else: + if self.check_remote_log_exists(platform): try: - platform.restore_connection() - except BaseException as e: - Log.printlog("{0} \n Couldn't connect to the remote platform for this {1} job err/out files. 
".format( - str(e), self.name), 6001) - if i >= retries: - if not out_exist or not err_exist: - Log.printlog("Failed to retrieve log files {1} and {2} e=6001".format( - retries, remote_logs[0], remote_logs[1])) - return - if copy_remote_logs: - l_log = copy.deepcopy(local_logs) - # unifying names for log files - if remote_logs != local_logs: - if self.wrapper_type == "vertical": # internal_Retrial mechanism - log_start = last_log - exp_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid) - tmp_path = os.path.join(exp_path, BasicConfig.LOCAL_TMP_DIR) - time_stamp = "1970" - total_stats = ["", "","FAILED"] - while log_start <= max_logs: - try: - if platform.get_stat_file_by_retrials(stat_file+str(max_logs)): - with open(os.path.join(tmp_path,stat_file+str(max_logs)), 'r+') as f: - total_stats = [f.readline()[:-1],f.readline()[:-1],f.readline()[:-1]] - try: - total_stats[0] = float(total_stats[0]) - total_stats[1] = float(total_stats[1]) - except Exception as e: - total_stats[0] = int(str(total_stats[0]).split('.')[0]) - total_stats[1] = int(str(total_stats[1]).split('.')[0]) - if max_logs != ( int(max_retrials) - fail_count ): - time_stamp = date2str(datetime.datetime.fromtimestamp(total_stats[0]), 'S') - else: - with open(os.path.join(self._tmp_path, self.name + '_TOTAL_STATS_TMP'), 'rb+') as f2: - for line in f2.readlines(): - if len(line) > 0: - line = line.decode(lang) - time_stamp = line.split(" ")[0] - - self.write_total_stat_by_retries(total_stats,max_logs == ( int(max_retrials) - fail_count )) - platform.remove_stat_file_by_retrials(stat_file+str(max_logs)) - l_log = (self.script_name[:-4] + "." + time_stamp + ".out",self.script_name[:-4] + "." + time_stamp + ".err") - r_log = ( remote_logs[0][:-1]+str(max_logs) , remote_logs[1][:-1]+str(max_logs) ) - self.synchronize_logs(platform, r_log, l_log,last = False) - platform.get_logs_files(self.expid, l_log) - try: - for local_log in l_log: - platform.write_jobid(job_id, os.path.join(self._tmp_path, 'LOG_' + str(self.expid), local_log)) - except BaseException as e: - pass - max_logs = max_logs - 1 - else: - max_logs = -1 # exit, no more logs - except BaseException as e: - max_logs = -1 # exit - local_logs = copy.deepcopy(l_log) - remote_logs = copy.deepcopy(local_logs) - if self.wrapper_type != "vertical": - self.synchronize_logs(platform, remote_logs, local_logs) - remote_logs = copy.deepcopy(local_logs) + self.synchronize_logs(platform, self.remote_logs, self.local_logs) + remote_logs = copy.deepcopy(self.local_logs) platform.get_logs_files(self.expid, remote_logs) - # Update the logs with Autosubmit Job ID Brand - try: - for local_log in local_logs: - platform.write_jobid(job_id, os.path.join( - self._tmp_path, 'LOG_' + str(self.expid), local_log)) - except BaseException as e: - Log.printlog("Trace {0} \n Failed to write the {1} e=6001".format( - str(e), self.name)) - with suppress(Exception): - platform.closeConnection() - except AutosubmitError as e: - Log.printlog("Trace {0} \nFailed to retrieve log file for job {1}".format( - e.message, self.name), 6001) - with suppress(Exception): - platform.closeConnection() - except AutosubmitCritical as e: # Critical errors can't be recovered. 
Failed configuration or autosubmit error - Log.printlog("Trace {0} \nFailed to retrieve log file for job {0}".format( - e.message, self.name), 6001) - with suppress(Exception): - platform.closeConnection() - return + log_retrieved = True + except BaseException: + log_retrieved = False + self.log_retrieved = log_retrieved + def retrieve_logfiles(self, platform, raise_error=False): + """ + Retrieves log files from remote host meant to be used inside a process. + :param platform: platform that is calling the function, already connected. + :param raise_error: boolean to raise an error if the logs are not retrieved + :return: + """ + backup_logname = copy.copy(self.local_logs) + + if self.wrapper_type == "vertical": + stat_file = self.script_name[:-4] + "_STAT_" + self.retrieve_internal_retrials_logfiles(platform) + else: + stat_file = self.script_name[:-4] + "_STAT" + self.retrieve_external_retrials_logfiles(platform) + + if not self.log_retrieved: + self.local_logs = backup_logname + Log.printlog("Failed to retrieve logs for job {0}".format(self.name), 6000) + if raise_error: + raise + else: + # Update the logs with Autosubmit Job ID Brand + try: + for local_log in self.local_logs: + platform.write_jobid(self.id, os.path.join( + self._tmp_path, 'LOG_' + str(self.expid), local_log)) + except BaseException as e: + Log.printlog("Trace {0} \n Failed to write the {1} e=6001".format(str(e), self.name)) + # write stats + if self.wrapper_type == "vertical": # Disable AS retrials for vertical wrappers to use internal ones + for i in range(0,int(self.retrials+1)): + if self.platform.get_stat_file(self.name, stat_file, count=i): + self.write_vertical_time(i) + self.inc_fail_count() + else: + self.platform.get_stat_file(self.name, stat_file) + self.write_start_time(from_stat_file=True) + self.write_end_time(self.status == Status.COMPLETED) def parse_time(self,wallclock): regex = re.compile(r'(((?P\d+):)((?P\d+)))(:(?P\d+))?') @@ -1272,7 +1171,7 @@ class Job(object): :param failed_file: boolean, if True, checks if the job failed :return: """ - copy_remote_logs = as_conf.get_copy_remote_logs() + self.log_avaliable = False previous_status = self.status self.prev_status = previous_status new_status = self.new_status @@ -1320,28 +1219,23 @@ class Job(object): # after checking the jobs , no job should have the status "submitted" Log.printlog("Job {0} in SUBMITTED status. 
This should never happen on this step..".format( self.name), 6008) - if previous_status != Status.RUNNING and self.status in [Status.COMPLETED, Status.FAILED, Status.UNKNOWN, - Status.RUNNING]: - self.write_start_time() - if previous_status == Status.HELD and self.status in [Status.SUBMITTED, Status.QUEUING, Status.RUNNING]: - self.write_submit_time() + if self.status in [Status.COMPLETED, Status.FAILED]: + self.updated_log = False + + # # Write start_time() if not already written and job is running, completed or failed + # if self.status in [Status.RUNNING, Status.COMPLETED, Status.FAILED] and not self.start_time_written: + # self.write_start_time() + # Updating logs if self.status in [Status.COMPLETED, Status.FAILED, Status.UNKNOWN]: - # New thread, check if file exist - expid = copy.deepcopy(self.expid) - platform_name = copy.deepcopy(self.platform_name) - local_logs = copy.deepcopy(self.local_logs) - remote_logs = copy.deepcopy(self.remote_logs) - if as_conf.get_disable_recovery_threads(self.platform.name) == "true": - self.retrieve_logfiles_unthreaded(copy_remote_logs, local_logs) + if str(as_conf.platforms_data.get(self.platform.name, {}).get('DISABLE_RECOVERY_THREADS', "false")).lower() == "true": + self.retrieve_logfiles(self.platform) else: - self.retrieve_logfiles(copy_remote_logs, local_logs, remote_logs, expid, platform_name,fail_count = copy.copy(self.fail_count),job_id=self.id,auth_password=self._platform.pw, local_auth_password=self._platform.pw) - if self.wrapper_type == "vertical": - max_logs = int(self.retrials) - for i in range(0,max_logs): - self.inc_fail_count() - else: - self.write_end_time(self.status == Status.COMPLETED) + self.platform.add_job_to_log_recover(self) + + + + return self.status @staticmethod @@ -1735,6 +1629,14 @@ class Job(object): def update_dict_parameters(self,as_conf): self.retrials = as_conf.jobs_data.get(self.section,{}).get("RETRIALS", as_conf.experiment_data.get("CONFIG",{}).get("RETRIALS", 0)) + for wrapper_data in ( wrapper for wrapper in as_conf.experiment_data.get("WRAPPERS",{}).values() if type(wrapper) is dict): + jobs_in_wrapper = wrapper_data.get("JOBS_IN_WRAPPER", "").upper() + if "," in jobs_in_wrapper: + jobs_in_wrapper = jobs_in_wrapper.split(",") + else: + jobs_in_wrapper = jobs_in_wrapper.split(" ") + if self.section.upper() in jobs_in_wrapper: + self.retrials = wrapper_data.get("RETRIALS", self.retrials) self.splits = as_conf.jobs_data.get(self.section,{}).get("SPLITS", None) self.delete_when_edgeless = as_conf.jobs_data.get(self.section,{}).get("DELETE_WHEN_EDGELESS", True) self.dependencies = str(as_conf.jobs_data.get(self.section,{}).get("DEPENDENCIES","")) @@ -2237,76 +2139,65 @@ class Job(object): str(set(parameters) - set(variables))), 5013) return out - def write_submit_time(self, enabled=False, hold=False): + def write_submit_time(self, hold=False, enable_vertical_write=False, wrapper_submit_time=None): # type: (bool, bool) -> None """ Writes submit date and time to TOTAL_STATS file. It doesn't write if hold is True. 
""" - # print(traceback.format_stack()) + + self.start_time_written = False + if not enable_vertical_write: + if wrapper_submit_time: + self.submit_time_timestamp = wrapper_submit_time + else: + self.submit_time_timestamp = date2str(datetime.datetime.now(), 'S') + if self.wrapper_type != "vertical": + self.local_logs = (f"{self.name}.{self.submit_time_timestamp}.out", f"{self.name}.{self.submit_time_timestamp}.err") # for wrappers with inner retrials + else: + self.local_logs = (f"{self.name}.{self.submit_time_timestamp}.out", + f"{self.name}.{self.submit_time_timestamp}.err") # for wrappers with inner retrials + return + if self.wrapper_type == "vertical" and self.fail_count > 0: + self.submit_time_timestamp = self.finish_time_timestamp print(("Call from {} with status {}".format(self.name, self.status_str))) if hold is True: return # Do not write for HELD jobs. - data_time = ["",time.time()] - if self.wrapper_type != "vertical" or enabled: - path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') - else: - path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS_TMP') + + data_time = ["",int(datetime.datetime.strptime(self.submit_time_timestamp, "%Y%m%d%H%M%S").timestamp())] + path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') if os.path.exists(path): f = open(path, 'a') f.write('\n') else: f = open(path, 'w') - if not enabled: - f.write(date2str(datetime.datetime.now(), 'S')) - if self.wrapper_type == "vertical": - f.write(" "+str(time.time())) - else: - path2 = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS_TMP') - f2 = open(path2, 'r') - for line in f2.readlines(): - if len(line) > 0: - data_time = line.split(" ") - try: - data_time[1] = float(data_time[1]) - except Exception as e: - data_time[1] = int(data_time[1]) - f.write(data_time[0]) - f2.close() - try: - os.remove(path2) - except Exception as e: - pass - # Get + f.write(self.submit_time_timestamp) + # Writing database - if self.wrapper_type != "vertical" or enabled: - exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) - exp_history.write_submit_time(self.name, submit=data_time[1], status=Status.VALUE_TO_KEY.get(self.status, "UNKNOWN"), ncpus=self.processors, - wallclock=self.wallclock, qos=self.queue, date=self.date, member=self.member, section=self.section, chunk=self.chunk, - platform=self.platform_name, job_id=self.id, wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.expid, self.name), - children=self.children_names_str) + exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) + exp_history.write_submit_time(self.name, submit=data_time[1], status=Status.VALUE_TO_KEY.get(self.status, "UNKNOWN"), ncpus=self.processors, + wallclock=self.wallclock, qos=self.queue, date=self.date, member=self.member, section=self.section, chunk=self.chunk, + platform=self.platform_name, job_id=self.id, wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.expid, self.name), + children=self.children_names_str) - def write_start_time(self, enabled = False): + def write_start_time(self, enable_vertical_write=False, from_stat_file=False, count=-1): """ Writes start date and time to TOTAL_STATS file :return: True if successful, False otherwise :rtype: bool """ - timestamp = date2str(datetime.datetime.now(), 'S') - self.local_logs = (f"{self.name}.{timestamp}.out", f"{self.name}.{timestamp}.err") + if not 
enable_vertical_write and self.wrapper_type == "vertical": + return - if self.wrapper_type != "vertical" or enabled: - if self._platform.get_stat_file(self.name, retries=5): #fastlook - start_time = self.check_start_time() + self.start_time_written = True + if not from_stat_file: # last known start time from AS + self.start_time_placeholder = time.time() + elif from_stat_file: + start_time_ = self.check_start_time(count) # last known start time from the .cmd file + if start_time_: + start_time = start_time_ else: - Log.printlog('Could not get start time for {0}. Using current time as an approximation'.format( - self.name), 3000) - start_time = time.time() - timestamp = date2str(datetime.datetime.now(), 'S') - - self.local_logs = (self.name + "." + timestamp + - ".out", self.name + "." + timestamp + ".err") - + start_time = self.start_time_placeholder if self.start_time_placeholder else time.time() path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') f = open(path, 'a') f.write(' ') @@ -2320,52 +2211,58 @@ class Job(object): children=self.children_names_str) return True - def write_end_time(self, completed,enabled = False): + def write_vertical_time(self, count=-1): + self.write_submit_time(enable_vertical_write=True) + self.write_start_time(enable_vertical_write=True, from_stat_file=True, count=count) + self.write_end_time(self.status == Status.COMPLETED, enable_vertical_write=True, count=count) + + def write_end_time(self, completed, enable_vertical_write=False, count = -1): """ Writes ends date and time to TOTAL_STATS file - :param enabled: :param completed: True if job was completed successfully, False otherwise :type completed: bool """ - if self.wrapper_type != "vertical" or enabled: - self._platform.get_stat_file(self.name, retries=5) - end_time = self.check_end_time() - path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') - f = open(path, 'a') - f.write(' ') - finish_time = None - final_status = None - if len(str(end_time)) > 0: - # noinspection PyTypeChecker - f.write(date2str(datetime.datetime.fromtimestamp(float(end_time)), 'S')) - # date2str(datetime.datetime.fromtimestamp(end_time), 'S') - finish_time = end_time - else: - f.write(date2str(datetime.datetime.now(), 'S')) - finish_time = time.time() # date2str(datetime.datetime.now(), 'S') - f.write(' ') - if completed: - final_status = "COMPLETED" - f.write('COMPLETED') - else: - final_status = "FAILED" - f.write('FAILED') - out, err = self.local_logs - path_out = os.path.join(self._tmp_path, 'LOG_' + str(self.expid), out) - # Launch first as simple non-threaded function - exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) - job_data_dc = exp_history.write_finish_time(self.name, finish=finish_time, status=final_status, ncpus=self.processors, - wallclock=self.wallclock, qos=self.queue, date=self.date, member=self.member, section=self.section, chunk=self.chunk, - platform=self.platform_name, job_id=self.id, out_file=out, err_file=err, wrapper_queue=self._wrapper_queue, - wrapper_code=get_job_package_code(self.expid, self.name), children=self.children_names_str) + if not enable_vertical_write and self.wrapper_type == "vertical": + return + end_time = self.check_end_time(count) + path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') + f = open(path, 'a') + f.write(' ') + finish_time = None + final_status = None + if end_time > 0: + # noinspection PyTypeChecker + 
f.write(date2str(datetime.datetime.fromtimestamp(float(end_time)), 'S')) + self.finish_time_timestamp = date2str(datetime.datetime.fromtimestamp(end_time),'S') + # date2str(datetime.datetime.fromtimestamp(end_time), 'S') + finish_time = end_time + else: + f.write(date2str(datetime.datetime.now(), 'S')) + self.finish_time_timestamp = date2str(datetime.datetime.now(), 'S') + finish_time = time.time() # date2str(datetime.datetime.now(), 'S') + f.write(' ') + if completed: + final_status = "COMPLETED" + f.write('COMPLETED') + else: + final_status = "FAILED" + f.write('FAILED') + out, err = self.local_logs + path_out = os.path.join(self._tmp_path, 'LOG_' + str(self.expid), out) + # Launch first as simple non-threaded function + exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) + job_data_dc = exp_history.write_finish_time(self.name, finish=finish_time, status=final_status, ncpus=self.processors, + wallclock=self.wallclock, qos=self.queue, date=self.date, member=self.member, section=self.section, chunk=self.chunk, + platform=self.platform_name, job_id=self.id, out_file=out, err_file=err, wrapper_queue=self._wrapper_queue, + wrapper_code=get_job_package_code(self.expid, self.name), children=self.children_names_str) - # Launch second as threaded function only for slurm - if job_data_dc and type(self.platform) is not str and self.platform.type == "slurm": - thread_write_finish = Thread(target=ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR).write_platform_data_after_finish, args=(job_data_dc, self.platform)) - thread_write_finish.name = "JOB_data_{}".format(self.name) - thread_write_finish.start() + # Launch second as threaded function only for slurm + if job_data_dc and type(self.platform) is not str and self.platform.type == "slurm": + thread_write_finish = Thread(target=ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR).write_platform_data_after_finish, args=(job_data_dc, self.platform)) + thread_write_finish.name = "JOB_data_{}".format(self.name) + thread_write_finish.start() - def write_total_stat_by_retries(self,total_stats, first_retrial = False): + def write_total_stat_by_retries(self, total_stats, first_retrial = False): """ Writes all data to TOTAL_STATS file :param total_stats: data gathered by the wrapper @@ -2374,8 +2271,6 @@ class Job(object): :type first_retrial: bool """ - if first_retrial: - self.write_submit_time(enabled=True) path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') f = open(path, 'a') if first_retrial: @@ -2385,6 +2280,12 @@ class Job(object): out, err = self.local_logs path_out = os.path.join(self._tmp_path, 'LOG_' + str(self.expid), out) # Launch first as simple non-threaded function + + exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) + exp_history.write_start_time(self.name, start=total_stats[0], status=Status.VALUE_TO_KEY.get(self.status, "UNKNOWN"), ncpus=self.processors, + wallclock=self.wallclock, qos=self.queue, date=self.date, member=self.member, section=self.section, chunk=self.chunk, + platform=self.platform_name, job_id=self.id, wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.expid, self.name), + children=self.children_names_str) if not first_retrial: exp_history = ExperimentHistory(self.expid, 
jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) exp_history.write_submit_time(self.name, submit=total_stats[0], status=Status.VALUE_TO_KEY.get(self.status, "UNKNOWN"), ncpus=self.processors, @@ -2392,12 +2293,6 @@ class Job(object): platform=self.platform_name, job_id=self.id, wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.expid, self.name), children=self.children_names_str) exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) - exp_history.write_start_time(self.name, start=total_stats[0], status=Status.VALUE_TO_KEY.get(self.status, "UNKNOWN"), ncpus=self.processors, - wallclock=self.wallclock, qos=self.queue, date=self.date, member=self.member, section=self.section, chunk=self.chunk, - platform=self.platform_name, job_id=self.id, wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.expid, self.name), - children=self.children_names_str) - - exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR) job_data_dc = exp_history.write_finish_time(self.name, finish=total_stats[1], status=total_stats[2], ncpus=self.processors, wallclock=self.wallclock, qos=self.queue, date=self.date, member=self.member, section=self.section, chunk=self.chunk, platform=self.platform_name, job_id=self.id, out_file=out, err_file=err, wrapper_queue=self._wrapper_queue, @@ -2470,7 +2365,7 @@ class Job(object): def synchronize_logs(self, platform, remote_logs, local_logs, last = True): platform.move_file(remote_logs[0], local_logs[0], True) # .out platform.move_file(remote_logs[1], local_logs[1], True) # .err - if last: + if last and local_logs[0] != "": self.local_logs = local_logs self.remote_logs = copy.deepcopy(local_logs) @@ -2593,6 +2488,7 @@ class WrapperJob(Job): if job.name in completed_files: completed_jobs.append(job) job.new_status = Status.COMPLETED + job.updated_log = False job.update_status(self.as_config) for job in completed_jobs: self.running_jobs_start.pop(job, None) diff --git a/autosubmit/job/job_common.py b/autosubmit/job/job_common.py index 69d54135278b0f4283d108382a38f760e3a1f6b7..3999a03b06695faaad2361b34006455ae84f1846 100644 --- a/autosubmit/job/job_common.py +++ b/autosubmit/job/job_common.py @@ -199,7 +199,7 @@ class StatisticsSnippetPython: locale.setlocale(locale.LC_ALL, 'C') job_name_ptrn = '%CURRENT_LOGDIR%/%JOBNAME%' stat_file = open(job_name_ptrn + '_STAT', 'w') - stat_file.write('{0:.0f}\\n'.format(time.time())) + stat_file.write('{0}\\n'.format(int(time.time()))) stat_file.close() ################### # Autosubmit Checkpoint @@ -228,7 +228,7 @@ class StatisticsSnippetPython: ################### stat_file = open(job_name_ptrn + '_STAT', 'a') - stat_file.write('{0:.0f}\\n'.format(time.time())) + stat_file.write('{0}\\n'.format(int(time.time()))) stat_file.close() open(job_name_ptrn + '_COMPLETED', 'a').close() exit(0) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index f23ca4e7336774eb7ed730b84a47d497a43185ed..0cd615b786e5b49c7d73faef7945bf0074bc8326 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -25,6 +25,7 @@ from contextlib import suppress from shutil import move from threading import Thread from typing import List, Dict +from pathlib import Path import math import networkx as nx @@ -32,6 +33,12 @@ from bscearth.utils.date import date2str, parse_date from networkx 
import DiGraph from time import localtime, strftime, mktime +import math +import networkx as nx +from bscearth.utils.date import date2str, parse_date +from networkx import DiGraph +from time import localtime, strftime, mktime, time + import autosubmit.database.db_structure as DbStructure from autosubmit.helpers.data_transfer import JobRow from autosubmit.job.job import Job @@ -46,8 +53,6 @@ from autosubmitconfigparser.config.configcommon import AutosubmitConfig from log.log import AutosubmitCritical, AutosubmitError, Log -# Log.get_logger("Log.Autosubmit") - def threaded(fn): def wrapper(*args, **kwargs): @@ -97,6 +102,7 @@ class JobList(object): self.graph = DiGraph() self.depends_on_previous_chunk = dict() self.depends_on_previous_split = dict() + self.path_to_logs = Path(BasicConfig.LOCAL_ROOT_DIR, self.expid, BasicConfig.LOCAL_TMP_DIR,f'LOG_{self.expid}') @property def expid(self): @@ -1671,6 +1677,36 @@ class JobList(object): else: return completed_jobs + def get_completed_without_logs(self, platform=None): + """ + Returns a list of completed jobs without updated logs + + :param platform: job platform + :type platform: HPCPlatform + :return: completed jobs + :rtype: list + """ + + completed_jobs = [job for job in self._job_list if (platform is None or job.platform.name == platform.name) and + job.status == Status.COMPLETED and job.updated_log is False ] + + return completed_jobs + def get_uncompleted(self, platform=None, wrapper=False): """ Returns a list of completed jobs @@ -2499,6 +2535,31 @@ class JobList(object): return jobs_to_check + def update_log_status(self, job, as_conf): + """ + Updates the log err and log out.
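+        If a recovered log file for this job is found in the local LOG directory, job.local_logs is pointed at it and job.updated_log is set. +        Otherwise, unless the job's platform sets DISABLE_RECOVERY_THREADS, the job is queued for the platform's log recovery process.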
+        """ +        if not hasattr(job,"updated_log") or not job.updated_log: # hasattr for backward compatibility (job.updated_log only exists on newer jobs; loaded ones may not have it set yet) +            # look for an already-recovered local log file that matches this job +            log_file = False +            if job.wrapper_type == "vertical" and job.fail_count > 0: +                for log_recovered in self.path_to_logs.glob(f"{job.name}.*._{job.fail_count}.out"): +                    if job.local_logs[0][:-4] in log_recovered.name: +                        log_file = True +                        break +            else: +                for log_recovered in self.path_to_logs.glob(f"{job.name}.*.out"): +                    if job.local_logs[0] == log_recovered.name: +                        log_file = True +                        break + +            if log_file: +                if not hasattr(job, "ready_start_date") or not job.ready_start_date or job.local_logs[0] >= job.ready_start_date: # hasattr for backward compatibility +                    job.local_logs = (log_recovered.name, log_recovered.name[:-4] + ".err") +                    job.updated_log = True +            if not job.updated_log and str(as_conf.platforms_data.get(job.platform.name, {}).get('DISABLE_RECOVERY_THREADS', "false")).lower() == "false": +                job.platform.add_job_to_log_recover(job) + def update_list(self, as_conf, store_change=True, fromSetStatus=False, submitter=None, first_time=False): # type: (AutosubmitConfig, bool, bool, object, bool) -> bool """ @@ -2578,6 +2639,8 @@ class JobList(object): # Check checkpoint jobs, the status can be Any for job in self.check_special_status(): job.status = Status.READY + # Ready start time, taken from the current time in YYYYMMDDHHMMSS format + job.ready_start_date = strftime("%Y%m%d%H%M%S") job.id = None job.packed = False job.wrapper_type = None @@ -2586,6 +2649,10 @@ class JobList(object): # if waiting jobs has all parents completed change its State to READY for job in self.get_completed(): job.packed = False + # Log name has this format: + # a02o_20000101_fc0_2_SIM.20240212115021.err + # $jobname.$(YYYYMMDDHHMMSS).err or .out + self.update_log_status(job, as_conf) if job.synchronize is not None and len(str(job.synchronize)) > 0: tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] if len(tmp) != len(job.parents): @@ -2673,6 +2740,9 @@ class JobList(object): if len(tmp2) == len(job.parents) and len(tmp3) != len(job.parents): job.status = Status.READY job.packed = False + # Ready start time, taken from the current time in YYYYMMDDHHMMSS format + job.ready_start_date = strftime("%Y%m%d%H%M%S") + job.packed = False job.hold = False save = True Log.debug( diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 9fc95fbf63b0be0dae844bd00678116f7a99a60c..d3eda6a82d9b0ff170e46ef16ff20f78a9a5169e 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -484,6 +484,8 @@ class JobPackager(object): built_packages_tmp = list() for param in self.wrapper_info: current_info.append(param[self.current_wrapper_section]) + current_info.append(self._as_config) + if self.wrapper_type[self.current_wrapper_section] == 'vertical': built_packages_tmp = self._build_vertical_packages(jobs, wrapper_limits,wrapper_info=current_info) elif self.wrapper_type[self.current_wrapper_section] == 'horizontal': @@ -595,9 +597,8 @@ class JobPackager(object): if job.packed is False: job.packed = True dict_jobs = self._jobs_list.get_ordered_jobs_by_date_member(self.current_wrapper_section) - job_vertical_packager = JobPackagerVerticalMixed(dict_jobs, job, [job], job.wallclock, wrapper_limits["max"], wrapper_limits, self._platform.max_wallclock) + job_vertical_packager = JobPackagerVerticalMixed(dict_jobs, job, [job], job.wallclock, 
wrapper_limits["max"], wrapper_limits, self._platform.max_wallclock,wrapper_info=wrapper_info) jobs_list = job_vertical_packager.build_vertical_package(job) - packages.append(JobPackageVertical(jobs_list, configuration=self._as_config,wrapper_section=self.current_wrapper_section,wrapper_info=wrapper_info)) else: @@ -605,6 +606,7 @@ class JobPackager(object): return packages def _build_hybrid_package(self, jobs_list, wrapper_limits, section,wrapper_info={}): + self.wrapper_info = wrapper_info jobs_resources = dict() jobs_resources['MACHINEFILES'] = self._as_config.get_wrapper_machinefiles() @@ -620,12 +622,12 @@ class JobPackager(object): def _build_horizontal_vertical_package(self, horizontal_packager, section, jobs_resources): total_wallclock = '00:00' - horizontal_package = horizontal_packager.build_horizontal_package() + horizontal_package = horizontal_packager.build_horizontal_package(wrapper_info=self.wrapper_info) horizontal_packager.create_sections_order(section) horizontal_packager.add_sectioncombo_processors( horizontal_packager.total_processors) horizontal_package.sort( - key=lambda job: horizontal_packager.sort_by_expression(job.name)) + key=lambda job: horizontal_packager.sort_by_expression(job.section)) job = max(horizontal_package, key=attrgetter('total_wallclock')) wallclock = job.wallclock current_package = [horizontal_package] @@ -663,7 +665,7 @@ class JobPackager(object): dict_jobs = self._jobs_list.get_ordered_jobs_by_date_member(self.current_wrapper_section) job_list = JobPackagerVerticalMixed(dict_jobs, job, [job], job.wallclock, horizontal_packager.wrapper_limits["max"], horizontal_packager.wrapper_limits, - self._platform.max_wallclock).build_vertical_package(job) + self._platform.max_wallclock,wrapper_info=self.wrapper_info).build_vertical_package(job) current_package.append(list(set(job_list))) for job in current_package[-1]: @@ -718,6 +720,7 @@ class JobPackagerVertical(object): child = self.get_wrappable_child(job) # If not None, it is wrappable if child is not None and len(str(child)) > 0: + child.update_parameters(self.wrapper_info[-1],{}) # Calculate total wallclock per possible wrapper self.total_wallclock = sum_str_hours( self.total_wallclock, child.wallclock) @@ -856,14 +859,16 @@ class JobPackagerHorizontal(object): self._maxTotalProcessors = 0 self._sectionList = list() self._package_sections = dict() - - def build_horizontal_package(self, horizontal_vertical=False): + self.wrapper_info = [] + def build_horizontal_package(self, horizontal_vertical=False,wrapper_info={}): + self.wrapper_info = wrapper_info current_package = [] current_package_by_section = {} if horizontal_vertical: self._current_processors = 0 jobs_by_section = dict() for job in self.job_list: + job.update_parameters(self.wrapper_info[-1],{}) if job.section not in jobs_by_section: jobs_by_section[job.section] = list() jobs_by_section[job.section].append(job) @@ -918,9 +923,8 @@ class JobPackagerHorizontal(object): max(self._package_sections.values()), self._maxTotalProcessors) return True - def sort_by_expression(self, jobname): - jobname = jobname.split('_')[-1] - return self._sort_order_dict[jobname] + def sort_by_expression(self, section): + return self._sort_order_dict[section] def get_next_packages(self, jobs_sections, max_wallclock=None, potential_dependency=None, packages_remote_dependencies=list(), horizontal_vertical=False, max_procs=0): packages = [] @@ -939,12 +943,13 @@ class JobPackagerHorizontal(object): if other_parent.status != Status.COMPLETED and other_parent not in 
self.job_list: wrappable = False if wrappable and child not in next_section_list: + child.update_parameters(self.wrapper_info[-1],{}) next_section_list.append(child) next_section_list.sort( - key=lambda job: self.sort_by_expression(job.name)) + key=lambda job: self.sort_by_expression(job.section)) self.job_list = next_section_list - package_jobs = self.build_horizontal_package(horizontal_vertical) + package_jobs = self.build_horizontal_package(horizontal_vertical,wrapper_info=self.wrapper_info) if package_jobs: sections_aux = set() diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 5294193ec9093ee7133ff7cf2ae420432df69a25..581738da422fd0387fabe1a916cd04827c88da35 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -333,14 +333,15 @@ class JobPackageArray(JobPackageBase): package_id = self.platform.submit_job(None, self._common_script, hold=hold, export = self.export) - if package_id is None or not package_id: + if package_id is None or not package_id: # platforms with a submit.cmd return - - for i in range(0, len(self.jobs)): + wrapper_time = None + for i in range(0, len(self.jobs)): # platforms without a submit.cmd Log.info("{0} submitted", self.jobs[i].name) self.jobs[i].id = str(package_id) + '[{0}]'.format(i) self.jobs[i].status = Status.SUBMITTED - self.jobs[i].write_submit_time(hold=hold) + self.jobs[i].write_submit_time(hold=hold,wrapper_submit_time=wrapper_time) + wrapper_time = self.jobs[i].submit_time_timestamp class JobPackageThread(JobPackageBase): @@ -611,6 +612,7 @@ class JobPackageThread(JobPackageBase): filenames += " " + self.platform.remote_log_dir + "/" + job.name + "_STAT " + \ self.platform.remote_log_dir + "/" + job.name + "_COMPLETED" self.platform.remove_multiple_files(filenames) + else: for job in self.jobs: self.platform.remove_stat_file(job.name) @@ -618,16 +620,18 @@ class JobPackageThread(JobPackageBase): if hold: job.hold = hold + package_id = self.platform.submit_job(None, self._common_script, hold=hold, export = self.export) if package_id is None or not package_id: return - - for i in range(0, len(self.jobs) ): + wrapper_time = None + for i in range(0, len(self.jobs)): Log.info("{0} submitted", self.jobs[i].name) self.jobs[i].id = str(package_id) - self.jobs[i].status = Status.SUBMITTED - self.jobs[i].write_submit_time(hold=hold) + self.jobs[i].status = Status.SUBMITTED + self.jobs[i].write_submit_time(hold=hold,wrapper_submit_time=wrapper_time) + wrapper_time = self.jobs[i].submit_time_timestamp def _common_script_content(self): pass @@ -696,16 +700,19 @@ class JobPackageThreadWrapped(JobPackageThread): if hold: job.hold = hold + package_id = self.platform.submit_job(None, self._common_script, hold=hold, export = self.export) if package_id is None or not package_id: raise Exception('Submission failed') - + wrapper_time = None for i in range(0, len(self.jobs)): Log.info("{0} submitted", self.jobs[i].name) self.jobs[i].id = str(package_id) - self.jobs[i].status = Status.SUBMITTED - self.jobs[i].write_submit_time(hold=hold) + self.jobs[i].status = Status.SUBMITTED + self.jobs[i].write_submit_time(hold=hold,wrapper_submit_time=wrapper_time) + wrapper_time = self.jobs[i].submit_time_timestamp + class JobPackageVertical(JobPackageThread): """ Class to manage a vertical thread-based package of jobs to be submitted by autosubmit @@ -713,7 +720,7 @@ class JobPackageVertical(JobPackageThread): :type jobs: :param: dependency: """ - def __init__(self, jobs, 
dependency=None,configuration=None,wrapper_section="WRAPPERS", wrapper_info = {}): + def __init__(self, jobs, dependency=None,configuration=None,wrapper_section="WRAPPERS", wrapper_info = []): self._num_processors = 0 for job in jobs: if int(job.processors) >= int(self._num_processors): diff --git a/autosubmit/monitor/monitor.py b/autosubmit/monitor/monitor.py index e1b9bb3b256434becb3868f449471303d69b6779..4b0afea1f672cfc5427d0d8554ec77980f4c30c9 100644 --- a/autosubmit/monitor/monitor.py +++ b/autosubmit/monitor/monitor.py @@ -453,6 +453,8 @@ class Monitor: log_out = "" log_err = "" if job.status in [Status.FAILED, Status.COMPLETED]: + if type(job.local_logs) is not tuple: + job.local_logs = ("","") log_out = path + "/" + job.local_logs[0] log_err = path + "/" + job.local_logs[1] diff --git a/autosubmit/platforms/ecplatform.py b/autosubmit/platforms/ecplatform.py index fb880e694e26ec671b61235b2afde0b7e99b11b4..b023677a4ccae76a72d222bebe1b7d7085bef224 100644 --- a/autosubmit/platforms/ecplatform.py +++ b/autosubmit/platforms/ecplatform.py @@ -153,7 +153,7 @@ class EcPlatform(ParamikoPlatform): export += " ; " return export + self._submit_cmd + job_script - def connect(self, reconnect=False): + def connect(self, as_conf, reconnect=False): """ In this case, it does nothing because connection is established for each command @@ -170,7 +170,13 @@ class EcPlatform(ParamikoPlatform): self.connected = False except: self.connected = False - def restore_connection(self): + if not self.log_retrieval_process_active and ( + as_conf is None or str(as_conf.platforms_data.get(self.name, {}).get('DISABLE_RECOVERY_THREADS', + "false")).lower() == "false"): + self.log_retrieval_process_active = True + self.recover_job_logs() + + def restore_connection(self,as_conf): """ In this case, it does nothing because connection is established for each command @@ -187,7 +193,8 @@ class EcPlatform(ParamikoPlatform): self.connected = False except: self.connected = False - def test_connection(self): + + def test_connection(self,as_conf): """ In this case, it does nothing because connection is established for each command diff --git a/autosubmit/platforms/locplatform.py b/autosubmit/platforms/locplatform.py index 7f41060eb80398eddce42dca098ca6260a49fa5b..ae8c7dd6017377d243612356c4c191d412a5beb5 100644 --- a/autosubmit/platforms/locplatform.py +++ b/autosubmit/platforms/locplatform.py @@ -28,7 +28,7 @@ from autosubmit.platforms.headers.local_header import LocalHeader from autosubmitconfigparser.config.basicconfig import BasicConfig from time import sleep from log.log import Log, AutosubmitError, AutosubmitCritical - +import threading class LocalPlatform(ParamikoPlatform): """ Class to manage jobs to localhost @@ -111,17 +111,27 @@ class LocalPlatform(ParamikoPlatform): def get_checkjob_cmd(self, job_id): return self.get_pscall(job_id) - def connect(self, reconnect=False): - self.connected = True - def test_connection(self): + def connect(self, as_conf, reconnect=False): self.connected = True - def restore_connection(self): + if not self.log_retrieval_process_active and ( + as_conf is None or str(as_conf.platforms_data.get(self.name, {}).get('DISABLE_RECOVERY_THREADS',"false")).lower() == "false"): + self.log_retrieval_process_active = True + self.recover_job_logs() + + + def test_connection(self,as_conf): + if not self.connected: + self.connect(as_conf) + + + def restore_connection(self,as_conf): self.connected = True def check_Alljobs(self, job_list, as_conf, retries=5): for job,prev_job_status in job_list: 
self.check_job(job) - def send_command(self, command,ignore_log=False, x11 = False): + + def send_command(self, command, ignore_log=False, x11 = False): lang = locale.getlocale()[1] if lang is None: lang = locale.getdefaultlocale()[1] @@ -175,7 +185,7 @@ class LocalPlatform(ParamikoPlatform): return True # Moves .err .out - def check_file_exists(self, src, wrapper_failed=False, sleeptime=5, max_retries=3): + def check_file_exists(self, src, wrapper_failed=False, sleeptime=5, max_retries=3, first=True): """ Moves a file on the platform :param src: source name @@ -187,12 +197,17 @@ class LocalPlatform(ParamikoPlatform): file_exist = False remote_path = os.path.join(self.get_files_path(), src) retries = 0 + # Not first is meant for vertical_wrappers. There you have to download STAT_{MAX_LOGS} then STAT_{MAX_LOGS-1} and so on + if not first: + max_retries = 1 + sleeptime = 0 while not file_exist and retries < max_retries: try: file_exist = os.path.isfile(os.path.join(self.get_files_path(),src)) if not file_exist: # File doesn't exist, retry in sleep-time - Log.debug("{2} File does not exist.. waiting {0}s for a new retry (retries left: {1})", sleeptime, - max_retries - retries, remote_path) + if first: + Log.debug("{2} File does not exist.. waiting {0}s for a new retry (retries left: {1})", sleeptime, + max_retries - retries, remote_path) if not wrapper_failed: sleep(sleeptime) sleeptime = sleeptime + 5 diff --git a/autosubmit/platforms/lsfplatform.py b/autosubmit/platforms/lsfplatform.py index a03ec5dee262ed14507dd98361453148c61c3306..ed65c772d3304004bd1da59af57dceed9e5b3045 100644 --- a/autosubmit/platforms/lsfplatform.py +++ b/autosubmit/platforms/lsfplatform.py @@ -138,27 +138,4 @@ class LsfPlatform(ParamikoPlatform): ############################################################################### """.format(filename, queue, project, wallclock, num_procs, dependency, '\n'.ljust(13).join(str(s) for s in directives)) - # def connect(self): - # """ - # In this case, it does nothing because connection is established for each command - # - # :return: True - # :rtype: bool - # """ - # self.connected = True - # def restore_connection(self): - # """ - # In this case, it does nothing because connection is established for each command - # - # :return: True - # :rtype: bool - # """ - # self.connected = True - # def test_connection(self): - # """ - # In this case, it does nothing because connection is established for each command - # - # :return: True - # :rtype: bool - # """ - # self.connected = True + diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 8bb6ef2cc016b016444bf38cad405cdddcb679c3..4d9e7169ff18abb3efe0a8925ab7d708ccea2f61 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -1,5 +1,6 @@ +import copy + import locale -from binascii import hexlify from contextlib import suppress from time import sleep import sys @@ -7,7 +8,6 @@ import socket import os import paramiko import datetime -import time import select import re from datetime import timedelta @@ -15,17 +15,17 @@ import random from autosubmit.job.job_common import Status from autosubmit.job.job_common import Type from autosubmit.platforms.platform import Platform -from bscearth.utils.date import date2str from log.log import AutosubmitError, AutosubmitCritical, Log from paramiko.ssh_exception import (SSHException) import Xlib.support.connect as xlib_connect from threading import Thread +import threading import getpass from 
paramiko.agent import Agent def threaded(fn): def wrapper(*args, **kwargs): - thread = Thread(target=fn, args=args, kwargs=kwargs) + thread = Thread(target=fn, args=args, kwargs=kwargs, name=f"{args[0].name}_X11") thread.start() return thread @@ -115,7 +115,7 @@ class ParamikoPlatform(Platform): self.local_x11_display = xlib_connect.get_display(display) - def test_connection(self): + def test_connection(self,as_conf): """ Test if the connection is still alive, reconnect if not. """ @@ -123,7 +123,7 @@ class ParamikoPlatform(Platform): if not self.connected: self.reset() try: - self.restore_connection() + self.restore_connection(as_conf) message = "OK" except BaseException as e: message = str(e) @@ -134,6 +134,7 @@ class ParamikoPlatform(Platform): except: message = "Timeout connection" return message + except EOFError as e: self.connected = False raise AutosubmitError("[{0}] not alive. Host: {1}".format( @@ -146,13 +147,13 @@ class ParamikoPlatform(Platform): raise AutosubmitCritical(str(e),7051) #raise AutosubmitError("[{0}] connection failed for host: {1}".format(self.name, self.host), 6002, e.message) - def restore_connection(self): + def restore_connection(self, as_conf): try: self.connected = False retries = 2 retry = 0 try: - self.connect() + self.connect(as_conf) except Exception as e: if ',' in self.host: Log.printlog("Connection Failed to {0}, will test another host".format( @@ -162,7 +163,7 @@ class ParamikoPlatform(Platform): "First connection to {0} is failed, check host configuration or try another login node ".format(self.host), 7050,str(e)) while self.connected is False and retry < retries: try: - self.connect(True) + self.connect(as_conf,True) except Exception as e: pass retry += 1 @@ -193,7 +194,8 @@ class ParamikoPlatform(Platform): key.public_blob = None self._ssh.connect(self._host_config['hostname'], port=port, username=self.user, timeout=60, banner_timeout=60) except BaseException as e: - Log.warning(f'Failed to authenticate with ssh-agent due to {e}') + Log.debug(f'Failed to authenticate with ssh-agent due to {e}') + Log.debug('Trying to authenticate with other methods') return False return True @@ -224,7 +226,7 @@ class ParamikoPlatform(Platform): # pass return tuple(answers) - def connect(self, reconnect=False): + def connect(self, as_conf, reconnect=False): """ Creates ssh connection to host @@ -266,7 +268,7 @@ class ParamikoPlatform(Platform): except Exception as e: self._ssh.connect(self._host_config['hostname'], port, username=self.user, key_filename=self._host_config_id, sock=self._proxy, timeout=60, - banner_timeout=60,disabled_algorithms={'pubkeys': ['rsa-sha2-256', 'rsa-sha2-512']}) + banner_timeout=60, disabled_algorithms={'pubkeys': ['rsa-sha2-256', 'rsa-sha2-512']}) else: try: self._ssh.connect(self._host_config['hostname'], port, username=self.user, @@ -300,7 +302,10 @@ class ParamikoPlatform(Platform): self._ftpChannel = paramiko.SFTPClient.from_transport(self.transport,window_size=pow(4, 12) ,max_packet_size=pow(4, 12) ) self._ftpChannel.get_channel().settimeout(120) self.connected = True - except SSHException as e: + if not self.log_retrieval_process_active and (as_conf is None or str(as_conf.platforms_data.get(self.name, {}).get('DISABLE_RECOVERY_THREADS', "false")).lower() == "false"): + self.log_retrieval_process_active = True + self.recover_job_logs() + except SSHException: raise except IOError as e: if "refused" in str(e.strerror).lower(): @@ -315,7 +320,7 @@ class ParamikoPlatform(Platform): raise AutosubmitCritical("Authentication Failed, 
please check the platform.conf of {0}".format( self._host_config['hostname']), 7050, str(e)) if not reconnect and "," in self._host_config['hostname']: - self.restore_connection() + self.restore_connection(as_conf) else: raise AutosubmitError( "Couldn't establish a connection to the specified host, wrong configuration?", 6003, str(e)) @@ -473,7 +478,10 @@ class ParamikoPlatform(Platform): path_root = self.get_files_path() src = os.path.join(path_root, src) dest = os.path.join(path_root, dest) - self._ftpChannel.rename(src,dest) + try: + self._ftpChannel.stat(dest) + except IOError: + self._ftpChannel.rename(src,dest) return True except IOError as e: @@ -644,6 +652,9 @@ class ParamikoPlatform(Platform): job_status = Status.UNKNOWN Log.error( 'check_job() The job id ({0}) status is {1}.', job_id, job_status) + + if job_status in [Status.FAILED, Status.COMPLETED]: + job.updated_log = False if submit_hold_check: return job_status else: @@ -775,7 +786,6 @@ class ParamikoPlatform(Platform): elif retries == 0: job_status = Status.COMPLETED job.update_status(as_conf) - else: job_status = Status.UNKNOWN Log.error( @@ -887,6 +897,7 @@ class ParamikoPlatform(Platform): sys.stdout.write(session.recv(4096)) while session.recv_stderr_ready(): sys.stderr.write(session.recv_stderr(4096)) + @threaded def x11_status_checker(self, session, session_fileno): self.transport.accept() @@ -967,7 +978,7 @@ class ParamikoPlatform(Platform): except paramiko.SSHException as e: if str(e) in "SSH session not active": self._ssh = None - self.restore_connection() + self.restore_connection(None) timeout = timeout + 60 retries = retries - 1 if retries <= 0: @@ -1325,16 +1336,6 @@ class ParamikoPlatform(Platform): if self.transport: self.transport.close() self.transport.stop_thread() - with suppress(Exception): - del self._ssh._agent # May not be in all runs - with suppress(Exception): - del self._ssh._transport - with suppress(Exception): - del self._ftpChannel - with suppress(Exception): - del self.transport - with suppress(Exception): - del self._ssh def check_tmp_exists(self): try: @@ -1366,8 +1367,6 @@ class ParamikoPlatform(Platform): """ Creates log dir on remote host """ - - try: if self.send_command(self.get_mkdir_cmd()): Log.debug('{0} has been created on {1} .', diff --git a/autosubmit/platforms/pbsplatform.py b/autosubmit/platforms/pbsplatform.py index 132b8715c03cdddd367669af807384a8134a933e..1a1ef89b5cd6c27c537aaa971c371876504b8fc1 100644 --- a/autosubmit/platforms/pbsplatform.py +++ b/autosubmit/platforms/pbsplatform.py @@ -129,27 +129,4 @@ class PBSPlatform(ParamikoPlatform): return self._checkjob_cmd + str(job_id) else: return "ssh " + self.host + " " + self.get_qstatjob(job_id) - # def connect(self): - # """ - # In this case, it does nothing because connection is established for each command - # - # :return: True - # :rtype: bool - # """ - # self.connected = True - # def restore_connection(self): - # """ - # In this case, it does nothing because connection is established for each command - # - # :return: True - # :rtype: bool - # """ - # self.connected = True - # def test_connection(self): - # """ - # In this case, it does nothing because connection is established for each command - # - # :return: True - # :rtype: bool - # """ - # self.connected = True + diff --git a/autosubmit/platforms/pjmplatform.py b/autosubmit/platforms/pjmplatform.py index 9014cd6a5b5544a490a1b1d7730672370da360bb..9e182c5c0fb229baa92731a8b4e68b31dde81c3f 100644 --- a/autosubmit/platforms/pjmplatform.py +++ 
b/autosubmit/platforms/pjmplatform.py @@ -463,9 +463,13 @@ class PJMPlatform(ParamikoPlatform): def allocated_nodes(): return """os.system("scontrol show hostnames $SLURM_JOB_NODELIST > node_list_{0}".format(node_id))""" - def check_file_exists(self, filename, wrapper_failed=False, sleeptime=5, max_retries=3): + def check_file_exists(self, filename, wrapper_failed=False, sleeptime=5, max_retries=3, first=True): file_exist = False retries = 0 + # Not first is meant for vertical_wrappers. There you have to download STAT_{MAX_LOGS} then STAT_{MAX_LOGS-1} and so on + if not first: + max_retries = 1 + sleeptime = 0 while not file_exist and retries < max_retries: try: # This return IOError if path doesn't exist @@ -473,8 +477,9 @@ class PJMPlatform(ParamikoPlatform): self.get_files_path(), filename)) file_exist = True except IOError as e: # File doesn't exist, retry in sleeptime - Log.debug("{2} File does not exist.. waiting {0}s for a new retry (retries left: {1})", sleeptime, - max_retries - retries, os.path.join(self.get_files_path(), filename)) + if first: + Log.debug("{2} File does not exist.. waiting {0}s for a new retry (retries left: {1})", sleeptime, + max_retries - retries, os.path.join(self.get_files_path(), filename)) if not wrapper_failed: sleep(sleeptime) sleeptime = sleeptime + 5 diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 05340a526ccce9f2b51ecfa6925ebc5be2763584..ac3d09eabc093bae281a14012840b715ac6bfcb6 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -1,6 +1,11 @@ +import copy + +import queue + +import time + import locale import os -from pathlib import Path import traceback from autosubmit.job.job_common import Status @@ -8,7 +13,16 @@ from typing import List, Union from autosubmit.helpers.parameters import autosubmit_parameter from log.log import AutosubmitCritical, AutosubmitError, Log -import getpass +from multiprocessing import Process, Queue + + +def processed(fn): + def wrapper(*args, **kwargs): + process = Process(target=fn, args=args, kwargs=kwargs, name=f"{args[0].name}_platform") + process.start() + return process + + return wrapper class Platform(object): """ Class to manage the connections to the different platforms. 
@@ -78,6 +92,8 @@ class Platform(object): self.pw = auth_password else: self.pw = None + self.recovery_queue = Queue() + self.log_retrieval_process_active = False @property @@ -272,6 +288,7 @@ class Platform(object): for innerJob in package._jobs: # Setting status to COMPLETED, so it does not get stuck in the loop that calls this function innerJob.status = Status.COMPLETED + innerJob.updated_log = False # If called from RUN or inspect command if not only_wrappers: @@ -318,6 +335,7 @@ class Platform(object): raise except Exception as e: raise + return save, failed_packages, error_message, valid_packages_to_submit @property @@ -624,10 +642,10 @@ class Platform(object): if self.check_file_exists(filename): self.delete_file(filename) - def check_file_exists(self, src, wrapper_failed=False, sleeptime=5, max_retries=3): + def check_file_exists(self, src, wrapper_failed=False, sleeptime=5, max_retries=3, first=True): return True - def get_stat_file(self, job_name, retries=0): + def get_stat_file(self, job_name, retries=0, count = -1): """ Copies *STAT* files from remote to local @@ -638,7 +656,10 @@ class Platform(object): :return: True if successful, False otherwise :rtype: bool """ - filename = job_name + '_STAT' + if count == -1: # No internal retrials + filename = job_name + '_STAT' + else: + filename = job_name + '_STAT_{0}'.format(str(count)) stat_local_path = os.path.join( self.config.get("LOCAL_ROOT_DIR"), self.expid, self.config.get("LOCAL_TMP_DIR"), filename) if os.path.exists(stat_local_path): @@ -650,46 +671,6 @@ class Platform(object): Log.debug('{0}_STAT file not found', job_name) return False - def check_stat_file_by_retrials(self, job_name, retries=0): - """ - check *STAT* file - - :param retries: number of intents to get the completed files - :type retries: int - :param job_name: name of job to check - :type job_name: str - :return: True if successful, False otherwise - :rtype: bool - """ - filename = job_name - if self.check_file_exists(filename): - return True - else: - return False - - def get_stat_file_by_retrials(self, job_name, retries=0): - """ - Copies *STAT* files from remote to local - - :param retries: number of intents to get the completed files - :type retries: int - :param job_name: name of job to check - :type job_name: str - :return: True if successful, False otherwise - :rtype: bool - """ - filename = job_name - stat_local_path = os.path.join( - self.config.get("LOCAL_ROOT_DIR"), self.expid, self.config.get("LOCAL_TMP_DIR"), filename) - if os.path.exists(stat_local_path): - os.remove(stat_local_path) - if self.check_file_exists(filename): - if self.get_file(filename, True): - return True - else: - return False - else: - return False @autosubmit_parameter(name='current_logdir') def get_files_path(self): @@ -821,3 +802,49 @@ class Platform(object): """ raise NotImplementedError + def add_job_to_log_recover(self, job): + self.recovery_queue.put((job,job.children)) + + def connect(self, as_conf, reconnect=False): + raise NotImplementedError + + def restore_connection(self,as_conf): + raise NotImplementedError + + @processed + def recover_job_logs(self): + job_names_processed = set() + self.connected = False + self.restore_connection(None) + while True: + try: + job,children = self.recovery_queue.get() + if job.wrapper_type != "vertical": + if f'{job.name}_{job.fail_count}' in job_names_processed: + continue + else: + if f'{job.name}' in job_names_processed: + continue + job.children = children + job.platform = self + try: + job.retrieve_logfiles(self, 
raise_error=True) + if job.wrapper_type != "vertical": + job_names_processed.add(f'{job.name}_{job.fail_count}') + else: + job_names_processed.add(f'{job.name}') + except: + pass + except queue.Empty: + pass + except (IOError, OSError): + pass + except Exception as e: + try: + self.restore_connection(None) + except: + pass + time.sleep(1) + + + diff --git a/autosubmit/platforms/sgeplatform.py b/autosubmit/platforms/sgeplatform.py index 58671cd98896fcd2c32f8c086dd73b51878f8f3b..875d455996b6fe1b1f102cd9f68465a142b058ac 100644 --- a/autosubmit/platforms/sgeplatform.py +++ b/autosubmit/platforms/sgeplatform.py @@ -114,7 +114,7 @@ class SgePlatform(ParamikoPlatform): def get_checkjob_cmd(self, job_id): return self.get_qstatjob(job_id) - def connect(self,reconnect=False): + def connect(self, as_conf, reconnect=False): """ In this case, it does nothing because connection is established for each command @@ -122,7 +122,12 @@ class SgePlatform(ParamikoPlatform): :rtype: bool """ self.connected = True - def restore_connection(self): + if not self.log_retrieval_process_active and ( + as_conf is None or str(as_conf.platforms_data.get(self.name, {}).get('DISABLE_RECOVERY_THREADS', + "false")).lower() == "false"): + self.log_retrieval_process_active = True + self.recover_job_logs() + def restore_connection(self,as_conf): """ In this case, it does nothing because connection is established for each command @@ -130,7 +135,8 @@ class SgePlatform(ParamikoPlatform): :rtype: bool """ self.connected = True - def test_connection(self): + + def test_connection(self,as_conf): """ In this case, it does nothing because connection is established for each command @@ -138,3 +144,5 @@ class SgePlatform(ParamikoPlatform): :rtype: bool """ self.connected = True + self.connect(as_conf,True) + diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index e741239dbc56fac949395e60b2c31d7db302f76b..c52a6c0e1c3bb20be24c67efe9ce6d880f7de1df 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -170,7 +170,6 @@ class SlurmPlatform(ParamikoPlatform): job.hold = hold job.id = str(jobs_id[i]) job.status = Status.SUBMITTED - job.write_submit_time(hold=hold) # Check if there are duplicated jobnames if not duplicated_jobs_already_checked: job_name = package.name if hasattr(package, "name") else package.jobs[0].name @@ -629,9 +628,13 @@ class SlurmPlatform(ParamikoPlatform): def allocated_nodes(): return """os.system("scontrol show hostnames $SLURM_JOB_NODELIST > node_list_{0}".format(node_id))""" - def check_file_exists(self, filename, wrapper_failed=False, sleeptime=5, max_retries=3): + def check_file_exists(self, filename, wrapper_failed=False, sleeptime=5, max_retries=3, first=True): file_exist = False retries = 0 + # Not first is meant for vertical_wrappers. There you have to download STAT_{MAX_LOGS} then STAT_{MAX_LOGS-1} and so on + if not first: + max_retries = 1 + sleeptime = 0 while not file_exist and retries < max_retries: try: # This return IOError if path doesn't exist @@ -639,8 +642,9 @@ class SlurmPlatform(ParamikoPlatform): self.get_files_path(), filename)) file_exist = True except IOError as e: # File doesn't exist, retry in sleeptime - Log.debug("{2} File does not exist.. 
waiting {0}s for a new retry (retries left: {1})", sleeptime, + max_retries - retries, os.path.join(self.get_files_path(), filename)) if not wrapper_failed: sleep(sleeptime) sleeptime = sleeptime + 5 diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py index df348f6dd7b1bc07fa5607f89934b134a1690aea..d40a985d1b2fc52907b8c189dff055836142aa65 100644 --- a/autosubmit/platforms/wrappers/wrapper_builder.py +++ b/autosubmit/platforms/wrappers/wrapper_builder.py @@ -451,12 +451,12 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): for i in range(len({0})): job_retrials = retrials completed = False - while job_retrials >= 0 and not completed: + fail_count = 0 + while fail_count <= job_retrials and not completed: current = {1} current.start() - os.system("echo "+str(time.time())+" > "+scripts[i][:-4]+"_STAT_"+str(job_retrials)) #Start/submit running + os.system("echo "+str(int(time.time()))+" > "+scripts[i][:-4]+"_STAT_"+str(fail_count)) #Start/submit running current.join({3}) - job_retrials = job_retrials - 1 total_steps = total_steps + 1 """).format(jobs_list, thread,self.retrials,str(self.wallclock_by_level),'\n'.ljust(13)) @@ -467,15 +467,17 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): failed_filename = {0}[i].replace('.cmd', '_FAILED') failed_path = os.path.join(os.getcwd(), failed_filename) failed_wrapper = os.path.join(os.getcwd(), wrapper_id) - os.system("echo "+str(time.time())+" >> "+scripts[i][:-4]+"_STAT_"+str(job_retrials+1)) #Completed + os.system("echo "+str(int(time.time()))+" >> "+scripts[i][:-4]+"_STAT_"+str(fail_count)) #Completed if os.path.exists(completed_path): completed = True print(datetime.now(), "The job ", current.template," has been COMPLETED") - os.system("echo COMPLETED >> " + scripts[i][:-4]+"_STAT_"+str(job_retrials+1)) + os.system("echo COMPLETED >> " + scripts[i][:-4]+"_STAT_"+str(fail_count)) else: print(datetime.now(), "The job ", current.template," has FAILED") - os.system("echo FAILED >> " + scripts[i][:-4]+"_STAT_"+str(job_retrials+1)) + os.system("echo FAILED >> " + scripts[i][:-4]+"_STAT_"+str(fail_count)) #{1} + fail_count = fail_count + 1 + """).format(jobs_list, self.exit_thread, '\n'.ljust(13)), 8) sequential_threads_launcher += self._indent(textwrap.dedent(""" if not os.path.exists(completed_path): @@ -493,17 +495,17 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): def build_job_thread(self): # fastlook return textwrap.dedent(""" class JobThread(Thread): - def __init__ (self, template, id_run, retrials): + def __init__ (self, template, id_run, retrials, fail_count): Thread.__init__(self) self.template = template self.id_run = id_run self.retrials = retrials + self.fail_count = fail_count def run(self): jobname = self.template.replace('.cmd', '') - #os.system("echo $(date +%s) > "+jobname+"_STAT") - out = str(self.template) + ".out." + str(self.retrials) - err = str(self.template) + ".err." + str(self.retrials) + out = str(self.template) + ".out." + str(self.fail_count) + err = str(self.template) + ".err." 
+ str(self.fail_count) print((out+"\\n")) command = "./" + str(self.template) + " " + str(self.id_run) + " " + os.getcwd() print((command+"\\n")) @@ -515,7 +517,7 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): """).format(str(self.wallclock_by_level),'\n'.ljust(13)) def build_main(self): self.exit_thread = "os._exit(1)" - return self.build_sequential_threads_launcher("scripts", "JobThread(scripts[i], i, job_retrials)") + return self.build_sequential_threads_launcher("scripts", "JobThread(scripts[i], i, retrials, fail_count)") class PythonHorizontalWrapperBuilder(PythonWrapperBuilder): def build_main(self): diff --git a/docs/source/userguide/configure/develop_a_project.rst b/docs/source/userguide/configure/develop_a_project.rst index 7621b29d085e6cc1b62b975fe1ffd7ac552d4f36..74786fda59a80f6224041f3d327066d168212a07 100644 --- a/docs/source/userguide/configure/develop_a_project.rst +++ b/docs/source/userguide/configure/develop_a_project.rst @@ -121,7 +121,10 @@ Autosubmit configuration TOTALJOBS: 6 # Time (seconds) between connections to the HPC queue scheduler to poll already submitted jobs status # Default:10 - SAFETYSLEEPTIME:10 + SAFETYSLEEPTIME: 10 + # Time (seconds) before ending the run to retrieve the last logs. + # Default:180 + LAST_LOGS_TIMEOUT: 180 # Number of retrials if a job fails. Can ve override at job level # Default:0 RETRIALS:0 diff --git a/test/unit/test_wrappers.py b/test/unit/test_wrappers.py index c005020b87149a6862fff5447a2315d7c440b2ae..62ff9bc8d8e0405a6c8b4c803c714b512560b149 100644 --- a/test/unit/test_wrappers.py +++ b/test/unit/test_wrappers.py @@ -172,6 +172,7 @@ class TestWrappers(TestCase): self.temp_directory = tempfile.mkdtemp() self.job_list = JobList(self.experiment_id, self.config, YAMLParserFactory(), JobListPersistenceDb(self.temp_directory, 'db'),self.as_conf) + self.parser_mock = MagicMock(spec='SafeConfigParser') self._platform.max_waiting_jobs = 100 @@ -200,6 +201,8 @@ class TestWrappers(TestCase): self.job_packager = JobPackager( self.as_conf, self._platform, self.job_list) self.job_list._ordered_jobs_by_date_member["WRAPPERS"] = dict() + self.wrapper_info = ['vertical', 'flexible', 'asthread', ['SIM'], 0,self.as_conf] + def tearDown(self) -> None: shutil.rmtree(self.temp_directory) @@ -272,8 +275,10 @@ class TestWrappers(TestCase): wrapper_limits["min_v"] = 2 wrapper_limits["min_h"] = 2 wrapper_limits["max_by_section"] = max_wrapped_job_by_section + + returned_packages = self.job_packager._build_vertical_packages( - section_list, wrapper_limits) + section_list, wrapper_limits, self.wrapper_info) package_m1_s2 = [d1_m1_1_s2, d1_m1_2_s2, d1_m1_3_s2, d1_m1_4_s2, d1_m1_5_s2, d1_m1_6_s2, d1_m1_7_s2, d1_m1_8_s2, d1_m1_9_s2, d1_m1_10_s2] @@ -354,7 +359,7 @@ class TestWrappers(TestCase): wrapper_limits["min_h"] = 2 wrapper_limits["max_by_section"] = max_wrapped_job_by_section returned_packages = self.job_packager._build_vertical_packages( - section_list, wrapper_limits) + section_list, wrapper_limits, wrapper_info=self.wrapper_info) package_m1_s2 = [d1_m1_1_s2, d1_m1_2_s2, d1_m1_3_s2, d1_m1_4_s2, d1_m1_5_s2, d1_m1_6_s2, d1_m1_7_s2, d1_m1_8_s2, d1_m1_9_s2, d1_m1_10_s2] @@ -362,7 +367,7 @@ class TestWrappers(TestCase): d1_m2_9_s2, d1_m2_10_s2] packages = [JobPackageVertical( - package_m1_s2,configuration=self.as_conf), JobPackageVertical(package_m2_s2,configuration=self.as_conf)] + package_m1_s2,configuration=self.as_conf, wrapper_info=self.wrapper_info), JobPackageVertical(package_m2_s2,configuration=self.as_conf, 
wrapper_info=self.wrapper_info)] for i in range(0, len(returned_packages)): self.assertListEqual(returned_packages[i]._jobs, packages[i]._jobs) @@ -424,7 +429,7 @@ class TestWrappers(TestCase): wrapper_limits["min_h"] = 2 wrapper_limits["max_by_section"] = max_wrapped_job_by_section returned_packages = self.job_packager._build_vertical_packages( - section_list, wrapper_limits) + section_list, wrapper_limits, self.wrapper_info) package_m1_s2 = [d1_m1_1_s2, d1_m1_2_s2, d1_m1_3_s2, d1_m1_4_s2, d1_m1_5_s2] @@ -432,7 +437,7 @@ class TestWrappers(TestCase): d1_m2_3_s2, d1_m2_4_s2, d1_m2_5_s2] packages = [JobPackageVertical( - package_m1_s2,configuration=self.as_conf), JobPackageVertical(package_m2_s2,configuration=self.as_conf)] + package_m1_s2,configuration=self.as_conf,wrapper_info=self.wrapper_info), JobPackageVertical(package_m2_s2,configuration=self.as_conf,wrapper_info=self.wrapper_info)] #returned_packages = returned_packages[0] for i in range(0, len(returned_packages)): @@ -495,7 +500,7 @@ class TestWrappers(TestCase): wrapper_limits["min_h"] = 2 wrapper_limits["max_by_section"] = max_wrapped_job_by_section returned_packages = self.job_packager._build_vertical_packages( - section_list, wrapper_limits) + section_list, wrapper_limits, self.wrapper_info) package_m1_s2 = [d1_m1_1_s2, d1_m1_2_s2, d1_m1_3_s2, d1_m1_4_s2, d1_m1_5_s2] @@ -503,7 +508,7 @@ class TestWrappers(TestCase): d1_m2_3_s2, d1_m2_4_s2, d1_m2_5_s2] packages = [JobPackageVertical( - package_m1_s2,configuration=self.as_conf), JobPackageVertical(package_m2_s2,configuration=self.as_conf)] + package_m1_s2,configuration=self.as_conf, wrapper_info=self.wrapper_info), JobPackageVertical(package_m2_s2,configuration=self.as_conf, wrapper_info=self.wrapper_info)] #returned_packages = returned_packages[0] for i in range(0, len(returned_packages)): @@ -646,7 +651,7 @@ class TestWrappers(TestCase): wrapper_limits["min_h"] = 2 wrapper_limits["max_by_section"] = max_wrapped_job_by_section returned_packages = self.job_packager._build_vertical_packages( - section_list, wrapper_limits) + section_list, wrapper_limits,wrapper_info=self.wrapper_info) package_m1_s2_s3 = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3, d1_m1_3_s2, d1_m1_3_s3, d1_m1_4_s2, d1_m1_4_s3] @@ -654,7 +659,7 @@ class TestWrappers(TestCase): d1_m2_4_s3] packages = [JobPackageVertical( - package_m1_s2_s3,configuration=self.as_conf), JobPackageVertical(package_m2_s2_s3,configuration=self.as_conf)] + package_m1_s2_s3,configuration=self.as_conf,wrapper_info=self.wrapper_info), JobPackageVertical(package_m2_s2_s3,configuration=self.as_conf,wrapper_info=self.wrapper_info)] #returned_packages = returned_packages[0] for i in range(0, len(returned_packages)): @@ -726,12 +731,12 @@ class TestWrappers(TestCase): wrapper_limits["min_h"] = 2 wrapper_limits["max_by_section"] = max_wrapper_job_by_section returned_packages = self.job_packager._build_vertical_packages( - section_list, wrapper_limits) + section_list, wrapper_limits,wrapper_info=self.wrapper_info) package_m1_s2_s3 = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3, d1_m1_3_s2, d1_m1_3_s3, d1_m1_4_s2, d1_m1_4_s3] - packages = [JobPackageVertical(package_m1_s2_s3,configuration=self.as_conf)] + packages = [JobPackageVertical(package_m1_s2_s3,configuration=self.as_conf,wrapper_info=self.wrapper_info)] #returned_packages = returned_packages[0] for i in range(0, len(returned_packages)): @@ -805,7 +810,7 @@ class TestWrappers(TestCase): wrapper_limits["min_h"] = 2 wrapper_limits["max_by_section"] = max_wrapped_job_by_section 
         returned_packages = self.job_packager._build_vertical_packages(
-            section_list, wrapper_limits)
+            section_list, wrapper_limits, wrapper_info=self.wrapper_info)

         package_m1_s2_s3 = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3, d1_m1_3_s2,
                             d1_m1_3_s3, d1_m1_4_s2, d1_m1_4_s3]
@@ -813,7 +818,7 @@ class TestWrappers(TestCase):
                             d1_m2_4_s3]

         packages = [JobPackageVertical(
-            package_m1_s2_s3,configuration=self.as_conf), JobPackageVertical(package_m2_s2_s3,configuration=self.as_conf)]
+            package_m1_s2_s3,configuration=self.as_conf, wrapper_info=self.wrapper_info), JobPackageVertical(package_m2_s2_s3,configuration=self.as_conf, wrapper_info=self.wrapper_info)]

         #returned_packages = returned_packages[0]
         # print("test_returned_packages_max_jobs_mixed_wrapper")
@@ -895,7 +900,7 @@ class TestWrappers(TestCase):
         wrapper_limits["min_h"] = 2
         wrapper_limits["max_by_section"] = max_wrapped_job_by_section
         returned_packages = self.job_packager._build_vertical_packages(
-            section_list, wrapper_limits)
+            section_list, wrapper_limits,wrapper_info=self.wrapper_info)

         package_m1_s2_s3 = [d1_m1_1_s2, d1_m1_1_s3,
                             d1_m1_2_s2, d1_m1_2_s3, d1_m1_3_s2]
@@ -903,7 +908,7 @@ class TestWrappers(TestCase):
                             d1_m2_2_s2, d1_m2_2_s3, d1_m2_3_s2]

         packages = [JobPackageVertical(
-            package_m1_s2_s3,configuration=self.as_conf), JobPackageVertical(package_m2_s2_s3,configuration=self.as_conf)]
+            package_m1_s2_s3,configuration=self.as_conf,wrapper_info=self.wrapper_info), JobPackageVertical(package_m2_s2_s3,configuration=self.as_conf,wrapper_info=self.wrapper_info)]

         #returned_packages = returned_packages[0]
         for i in range(0, len(returned_packages)):
@@ -977,13 +982,13 @@ class TestWrappers(TestCase):
         wrapper_limits["min_h"] = 2
         wrapper_limits["max_by_section"] = max_wrapped_job_by_section
         returned_packages = self.job_packager._build_vertical_packages(
-            section_list, wrapper_limits)
+            section_list, wrapper_limits,wrapper_info=self.wrapper_info)

         package_m1_s2_s3 = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3]
         package_m2_s2_s3 = [d1_m2_1_s2, d1_m2_1_s3, d1_m2_2_s2, d1_m2_2_s3]

         packages = [JobPackageVertical(
-            package_m1_s2_s3,configuration=self.as_conf), JobPackageVertical(package_m2_s2_s3,configuration=self.as_conf)]
+            package_m1_s2_s3,configuration=self.as_conf, wrapper_info=self.wrapper_info), JobPackageVertical(package_m2_s2_s3,configuration=self.as_conf, wrapper_info=self.wrapper_info)]

         #returned_packages = returned_packages[0]
         for i in range(0, len(returned_packages)):
@@ -1075,13 +1080,13 @@ class TestWrappers(TestCase):
         wrapper_limits["min_h"] = 2
         wrapper_limits["max_by_section"] = max_wrapped_job_by_section
         returned_packages = self.job_packager._build_vertical_packages(
-            section_list, wrapper_limits)
+            section_list, wrapper_limits, wrapper_info=self.wrapper_info)

         package_m1_s2_s3 = [d1_m1_2_s3, d1_m1_3_s3, d1_m1_4_s2, d1_m1_4_s3]
         package_m2_s2_s3 = [d1_m2_3_s2, d1_m2_3_s3, d1_m2_4_s2, d1_m2_4_s3]

         packages = [JobPackageVertical(
-            package_m1_s2_s3,configuration=self.as_conf), JobPackageVertical(package_m2_s2_s3,configuration=self.as_conf)]
+            package_m1_s2_s3,configuration=self.as_conf,wrapper_info=self.wrapper_info), JobPackageVertical(package_m2_s2_s3,configuration=self.as_conf,wrapper_info=self.wrapper_info)]

         #returned_packages = returned_packages[0]
         for i in range(0, len(returned_packages)):
@@ -1879,6 +1884,7 @@ class TestWrappers(TestCase):
         self._manage_dependencies(sections_dict)
         for job in self.job_list.get_job_list():
             job._init_runtime_parameters()
+            job.update_parameters = MagicMock()

     def _manage_dependencies(self, sections_dict):
         for job in self.job_list.get_job_list():