diff --git a/VERSION b/VERSION
index a95f2884441f750fc59ff84db191aa57f7ba609f..9d086c6dff671494d94c58ed5ddbb74280033537 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-4.1.4
+4.1.4
\ No newline at end of file
diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py
index ac2a775d379e846d186815e09c091be1fce8238b..b1a3968b9dd934a56ab35a3f51033c232a9fab64 100644
--- a/autosubmit/autosubmit.py
+++ b/autosubmit/autosubmit.py
@@ -1636,6 +1636,8 @@ class Autosubmit:
Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence, True,
only_wrappers, hold=False)
job_list.update_list(as_conf, False)
+ for job in job_list.get_job_list():
+ job.status = Status.WAITING
@staticmethod
@@ -2011,8 +2013,7 @@ class Autosubmit:
exp_history = Autosubmit.get_historical_database(expid, job_list,as_conf)
# establish the connection to all platforms
            # "Restore" is misleading; it is actually a "connect" function when the recover flag is not set.
- Autosubmit.restore_platforms(platforms_to_test)
-
+            Autosubmit.restore_platforms(platforms_to_test, as_conf=as_conf)
return job_list, submitter , exp_history, host , as_conf, platforms_to_test, packages_persistence, False
else:
return job_list, submitter , None, None, as_conf , platforms_to_test, packages_persistence, True
@@ -2193,7 +2194,6 @@ class Autosubmit:
Log.printlog("Error trying to store failed job count", Log.WARNING)
Log.result("Storing failed job count...done")
while not recovery and (recovery_retrials < max_recovery_retrials or max_recovery_retrials <= 0 ):
-
delay = min(15 * consecutive_retrials, 120)
recovery_retrials += 1
sleep(delay)
@@ -2261,6 +2261,23 @@ class Autosubmit:
Log.result("No more jobs to run.")
+
+            # Wait for the remaining log-recovery I/O, then close any open connections
+ # search hint - finished run
+ Log.info("Waiting for all logs to be updated")
+            # give the log-recovery mechanism a bounded, configurable window to fetch the last logs
+ timeout = as_conf.experiment_data.get("CONFIG",{}).get("LAST_LOGS_TIMEOUT", 180)
+ for remaining in range(timeout, 0, -1):
+ if len(job_list.get_completed_without_logs()) == 0:
+ break
+ for job in job_list.get_completed_without_logs():
+ job_list.update_log_status(job, as_conf)
+ sleep(1)
+ if remaining % 10 == 0:
+ Log.info(f"Timeout: {remaining}")
+
# Updating job data header with current information when experiment ends
try:
exp_history = ExperimentHistory(expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR,
@@ -2272,22 +2289,7 @@ class Autosubmit:
Autosubmit.database_fix(expid)
except Exception as e:
pass
- # Wait for all remaining threads of I/O, close remaining connections
- timeout = 0
- active_threads = True
- all_threads = threading.enumerate()
- while active_threads and timeout <= 180:
- active_threads = False
- for thread in all_threads:
- if "JOB_" in thread.name:
- if thread.is_alive():
- active_threads = True
- Log.info("{0} is still retrieving outputs, time remaining is {1} seconds.".format(
- thread.name, 180 - timeout))
- break
- if active_threads:
- sleep(10)
- timeout += 10
+
for platform in platforms_to_test:
platform.closeConnection()
if len(job_list.get_failed()) > 0:
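The two hunks above replace the old fixed 180-second thread poll with a countdown driven by the configurable `CONFIG.LAST_LOGS_TIMEOUT` key and by the jobs that still lack logs. A minimal standalone sketch of the same pattern, with `pending_jobs` and `update_log_status` standing in for the `job_list` methods used in the diff:

```python
from time import sleep

def wait_for_last_logs(experiment_data, pending_jobs, update_log_status, default_timeout=180):
    """Poll until every completed job has its logs, or the configured timeout expires."""
    timeout = experiment_data.get("CONFIG", {}).get("LAST_LOGS_TIMEOUT", default_timeout)
    for remaining in range(timeout, 0, -1):
        if not pending_jobs():          # every completed job already has its logs
            break
        for job in pending_jobs():
            update_log_status(job)      # re-check whether this job's logs have landed
        sleep(1)
        if remaining % 10 == 0:
            print(f"Waiting for the last log files... timeout in {remaining} seconds")
```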
@@ -2327,7 +2329,7 @@ class Autosubmit:
for platform in platform_to_test:
platform_issues = ""
try:
- message = platform.test_connection()
+ message = platform.test_connection(as_conf)
if message is None:
message = "OK"
if message != "OK":
@@ -2432,6 +2434,14 @@ class Autosubmit:
if error_message != "":
raise AutosubmitCritical("Submission Failed due wrong configuration:{0}".format(error_message),
7014)
+ if not inspect:
+ for package in valid_packages_to_submit:
+ wrapper_time = None
+                for job in package.jobs:  # a package with more than one job is a wrapper; all its jobs share the submission time
+ job.write_submit_time(wrapper_submit_time=wrapper_time)
+ wrapper_time = job.submit_time_timestamp
+
if save_1 or save_2:
return True
else:
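The rationale for threading `wrapper_time` through the loop above: the first job of a package stamps the real submit time, and every later job of the same (wrapped) package reuses that stamp. The same logic in isolation, with `package_jobs` standing in for `package.jobs`:

```python
def stamp_package_submit_times(package_jobs):
    """First job writes the submit timestamp; wrapped siblings inherit it."""
    wrapper_time = None
    for job in package_jobs:
        job.write_submit_time(wrapper_submit_time=wrapper_time)
        wrapper_time = job.submit_time_timestamp  # set by the first iteration, reused afterwards
```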
@@ -2833,7 +2843,7 @@ class Autosubmit:
job.platform = submitter.platforms[job.platform_name]
platforms_to_test.add(job.platform)
for platform in platforms_to_test:
- platform.test_connection()
+ platform.test_connection(as_conf)
for job in current_active_jobs:
job.platform.send_command(job.platform.cancel_cmd + " " + str(job.id), ignore_log=True)
@@ -2856,7 +2866,7 @@ class Autosubmit:
# noinspection PyTypeChecker
platforms_to_test.add(platforms[job.platform_name])
# establish the connection to all platforms
- Autosubmit.restore_platforms(platforms_to_test)
+        Autosubmit.restore_platforms(platforms_to_test, as_conf=as_conf)
if all_jobs:
jobs_to_recover = job_list.get_job_list()
@@ -2989,7 +2999,7 @@ class Autosubmit:
job.platform_name = as_conf.get_platform()
platforms_to_test.add(platforms[job.platform_name])
# establish the connection to all platforms on use
- Autosubmit.restore_platforms(platforms_to_test)
+        Autosubmit.restore_platforms(platforms_to_test, as_conf=as_conf)
Log.info('Migrating experiment {0}'.format(experiment_id))
Autosubmit._check_ownership(experiment_id, raise_error=True)
if submitter.platforms is None:
@@ -3206,7 +3216,7 @@ class Autosubmit:
backup_files = []
# establish the connection to all platforms on use
try:
- Autosubmit.restore_platforms(platforms_to_test)
+            Autosubmit.restore_platforms(platforms_to_test, as_conf=as_conf)
except AutosubmitCritical as e:
raise AutosubmitCritical(
e.message + "\nInvalid Remote Platform configuration, recover them manually or:\n 1) Configure platform.yml with the correct info\n 2) autosubmit expid -p --onlyremote",
@@ -5401,7 +5411,7 @@ class Autosubmit:
definitive_platforms = list()
for platform in platforms_to_test:
try:
- Autosubmit.restore_platforms([platform])
+                Autosubmit.restore_platforms([platform], as_conf=as_conf)
definitive_platforms.append(platform.name)
except Exception as e:
pass
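A pattern worth noting in this file: every `restore_platforms` and `test_connection` call site now forwards the loaded configuration. A sketch of the backward-compatible signature extension this implies; the class name and the `CONNECTION_TIMEOUT` key are illustrative assumptions, not the project's exact code:

```python
class PlatformSketch:
    """Illustrative stand-in for the real platform classes in autosubmit.platforms."""

    def test_connection(self, as_conf=None):
        # as_conf defaults to None so legacy callers keep working; new callers pass
        # the loaded configuration so the probe can honour experiment-level settings.
        timeout = 60
        if as_conf is not None:
            timeout = as_conf.experiment_data.get("CONFIG", {}).get("CONNECTION_TIMEOUT", 60)
        return f"OK (probe timeout: {timeout}s)"
```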
diff --git a/autosubmit/history/experiment_history.py b/autosubmit/history/experiment_history.py
index ee0558edd7e3847440e9aa3432a817157d45cad4..5fd081600ba82f151b9c7e5d42a0b711ce26cf82 100644
--- a/autosubmit/history/experiment_history.py
+++ b/autosubmit/history/experiment_history.py
@@ -16,346 +16,363 @@
# You should have received a copy of the GNU General Public License
# along with Autosubmit.  If not, see <http://www.gnu.org/licenses/>.
import traceback
+from time import time, sleep
+
import autosubmit.history.database_managers.database_models as Models
import autosubmit.history.utils as HUtils
-from time import time, sleep
-from .database_managers.experiment_history_db_manager import ExperimentHistoryDbManager
-from .database_managers.database_manager import DEFAULT_JOBDATA_DIR, DEFAULT_HISTORICAL_LOGS_DIR
-from .strategies import PlatformInformationHandler, SingleAssociationStrategy, StraightWrapperAssociationStrategy, TwoDimWrapperDistributionStrategy, GeneralizedWrapperDistributionStrategy
-from .data_classes.job_data import JobData
+from autosubmitconfigparser.config.basicconfig import BasicConfig
+from log.log import Log
from .data_classes.experiment_run import ExperimentRun
-from .platform_monitor.slurm_monitor import SlurmMonitor
+from .data_classes.job_data import JobData
+from .database_managers.database_manager import DEFAULT_JOBDATA_DIR, DEFAULT_HISTORICAL_LOGS_DIR
+from .database_managers.experiment_history_db_manager import ExperimentHistoryDbManager
from .internal_logging import Logging
-from log.log import Log
-from autosubmitconfigparser.config.basicconfig import BasicConfig
+from .platform_monitor.slurm_monitor import SlurmMonitor
+from .strategies import PlatformInformationHandler, SingleAssociationStrategy, StraightWrapperAssociationStrategy, \
+ TwoDimWrapperDistributionStrategy, GeneralizedWrapperDistributionStrategy
SECONDS_WAIT_PLATFORM = 60
+
class ExperimentHistory:
- def __init__(self, expid, jobdata_dir_path=DEFAULT_JOBDATA_DIR, historiclog_dir_path=DEFAULT_HISTORICAL_LOGS_DIR):
- self.expid = expid
- BasicConfig.read()
- self._log = Logging(expid, BasicConfig.HISTORICAL_LOG_DIR)
- self._job_data_dir_path = BasicConfig.JOBDATA_DIR
- self._historiclog_dir_path = BasicConfig.HISTORICAL_LOG_DIR
- try:
- self.manager = ExperimentHistoryDbManager(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR)
- except Exception as exp:
- self._log.log(str(exp), traceback.format_exc())
- Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}')
- self.manager = None
-
- def initialize_database(self):
- try:
- self.manager.initialize()
- except Exception as exp:
- self._log.log(str(exp), traceback.format_exc())
- Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}')
-
- self.manager = None
-
- def is_header_ready(self):
- if self.manager:
- return self.manager.is_header_ready_db_version()
- return False
-
-
- def write_submit_time(self, job_name, submit=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="",
- member="", section="", chunk=0, platform="NA", job_id=0, wrapper_queue=None, wrapper_code=None, children=""):
- try:
- next_counter = self._get_next_counter_by_job_name(job_name)
- current_experiment_run = self.manager.get_experiment_run_dc_with_max_id()
- job_data_dc = JobData(_id=0,
- counter=next_counter,
- job_name=job_name,
- submit=submit,
- status=status,
- rowtype=self._get_defined_rowtype(wrapper_code),
- ncpus=ncpus,
- wallclock=wallclock,
- qos=self._get_defined_queue_name(wrapper_queue, wrapper_code, qos),
- date=date,
- member=member,
- section=section,
- chunk=chunk,
- platform=platform,
- job_id=job_id,
- children=children,
- run_id=current_experiment_run.run_id)
- return self.manager.register_submitted_job_data_dc(job_data_dc)
- except Exception as exp:
- self._log.log(str(exp), traceback.format_exc())
- Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}')
-
- return None
-
- def write_start_time(self, job_name, start=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="",
- member="", section="", chunk=0, platform="NA", job_id=0, wrapper_queue=None, wrapper_code=None, children=""):
- try:
- job_data_dc_last = self.manager.get_job_data_dc_unique_latest_by_job_name(job_name)
- if not job_data_dc_last:
- job_data_dc_last = self.write_submit_time(job_name=job_name,
- status=status,
- ncpus=ncpus,
- wallclock=wallclock,
- qos=qos,
- date=date,
- member=member,
- section=section,
- chunk=chunk,
- platform=platform,
- job_id=job_id,
- wrapper_queue=wrapper_queue,
- wrapper_code=wrapper_code)
- self._log.log("write_start_time {0} start not found.".format(job_name))
- job_data_dc_last = self.manager.get_job_data_dc_unique_latest_by_job_name(job_name)
- if not job_data_dc_last:
- raise Exception("Job {0} has not been found in the database.".format(job_name))
- job_data_dc_last.start = start
- job_data_dc_last.qos = self._get_defined_queue_name(wrapper_queue, wrapper_code, qos)
- job_data_dc_last.status = status
- job_data_dc_last.rowtype = self._get_defined_rowtype(wrapper_code)
- job_data_dc_last.job_id = job_id
- job_data_dc_last.children = children
- return self.manager.update_job_data_dc_by_id(job_data_dc_last)
- except Exception as exp:
- self._log.log(str(exp), traceback.format_exc())
- Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}')
-
-
- def write_finish_time(self, job_name, finish=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="",
- member="", section="", chunk=0, platform="NA", job_id=0, out_file=None, err_file=None,
- wrapper_queue=None, wrapper_code=None, children=""):
- try:
- job_data_dc_last = self.manager.get_job_data_dc_unique_latest_by_job_name(job_name)
- if not job_data_dc_last:
- job_data_dc_last = self.write_submit_time(job_name=job_name,
- status=status,
- ncpus=ncpus,
- wallclock=wallclock,
- qos=qos,
- date=date,
- member=member,
- section=section,
- chunk=chunk,
- platform=platform,
- job_id=job_id,
- wrapper_queue=wrapper_queue,
- wrapper_code=wrapper_code,
- children=children)
- self._log.log("write_finish_time {0} submit not found.".format(job_name))
- job_data_dc_last = self.manager.get_job_data_dc_unique_latest_by_job_name(job_name)
- if not job_data_dc_last:
- raise Exception("Job {0} has not been found in the database.".format(job_name))
- job_data_dc_last.finish = finish if finish > 0 else int(time())
- job_data_dc_last.status = status
- job_data_dc_last.job_id = job_id
- job_data_dc_last.rowstatus = Models.RowStatus.PENDING_PROCESS
- job_data_dc_last.out = out_file if out_file else ""
- job_data_dc_last.err = err_file if err_file else ""
- return self.manager.update_job_data_dc_by_id(job_data_dc_last)
- except Exception as exp:
- self._log.log(str(exp), traceback.format_exc())
- Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}')
-
-
- def write_platform_data_after_finish(self, job_data_dc, platform_obj):
- """
- Call it in a thread.
- """
- try:
- sleep(SECONDS_WAIT_PLATFORM)
- ssh_output = platform_obj.check_job_energy(job_data_dc.job_id)
- slurm_monitor = SlurmMonitor(ssh_output)
- self._verify_slurm_monitor(slurm_monitor, job_data_dc)
- job_data_dcs_in_wrapper = self.manager.get_job_data_dcs_last_by_wrapper_code(job_data_dc.wrapper_code)
- job_data_dcs_in_wrapper = sorted([job for job in job_data_dcs_in_wrapper if job.status == "COMPLETED"], key=lambda x: x._id)
- job_data_dcs_to_update = []
- if len(job_data_dcs_in_wrapper) > 0:
- info_handler = PlatformInformationHandler(StraightWrapperAssociationStrategy(self._historiclog_dir_path))
- job_data_dcs_to_update = info_handler.execute_distribution(job_data_dc, job_data_dcs_in_wrapper, slurm_monitor)
- if len(job_data_dcs_to_update) == 0:
- info_handler.strategy = TwoDimWrapperDistributionStrategy(self._historiclog_dir_path)
- job_data_dcs_to_update = info_handler.execute_distribution(job_data_dc, job_data_dcs_in_wrapper, slurm_monitor)
- if len(job_data_dcs_to_update) == 0:
- info_handler.strategy = GeneralizedWrapperDistributionStrategy(self._historiclog_dir_path)
- job_data_dcs_to_update = info_handler.execute_distribution(job_data_dc, job_data_dcs_in_wrapper, slurm_monitor)
- else:
- info_handler = PlatformInformationHandler(SingleAssociationStrategy(self._historiclog_dir_path))
- job_data_dcs_to_update = info_handler.execute_distribution(job_data_dc, job_data_dcs_in_wrapper, slurm_monitor)
- return self.manager.update_list_job_data_dc_by_each_id(job_data_dcs_to_update)
- except Exception as exp:
- self._log.log(str(exp), traceback.format_exc())
- Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}')
-
-
- def _verify_slurm_monitor(self, slurm_monitor, job_data_dc):
- try:
- if slurm_monitor.header.status not in ["COMPLETED", "FAILED"]:
- self._log.log("Assertion Error on job {0} with ssh_output {1}".format(job_data_dc.job_name, slurm_monitor.original_input),
- "Slurm status {0} is not COMPLETED nor FAILED for ID {1}.\n".format(slurm_monitor.header.status, slurm_monitor.header.name))
- Log.debug(f'Historical Database error: Slurm status {slurm_monitor.header.status} is not COMPLETED nor FAILED for ID {slurm_monitor.header.name}.')
- if not slurm_monitor.steps_plus_extern_approximate_header_energy():
- self._log.log("Assertion Error on job {0} with ssh_output {1}".format(job_data_dc.job_name, slurm_monitor.original_input),
- "Steps + extern != total energy for ID {0}. Number of steps {1}.\n".format(slurm_monitor.header.name, slurm_monitor.step_count))
- Log.debug(f'Historical Database error: Steps + extern != total energy for ID {slurm_monitor.header.name}. Number of steps {slurm_monitor.step_count}.')
- except Exception as exp:
- self._log.log(str(exp), traceback.format_exc())
- Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}')
-
-
- def process_status_changes(self, job_list=None, chunk_unit="NA", chunk_size=0, current_config="",create=False):
- """ Detect status differences between job_list and current job_data rows, and update. Creates a new run if necessary. """
- try:
+ def __init__(self, expid, jobdata_dir_path=DEFAULT_JOBDATA_DIR, historiclog_dir_path=DEFAULT_HISTORICAL_LOGS_DIR):
+ self.expid = expid
+ BasicConfig.read()
+ self._log = Logging(expid, BasicConfig.HISTORICAL_LOG_DIR)
+ self._job_data_dir_path = BasicConfig.JOBDATA_DIR
+ self._historiclog_dir_path = BasicConfig.HISTORICAL_LOG_DIR
+ try:
+ self.manager = ExperimentHistoryDbManager(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR)
+ except Exception as exp:
+ self._log.log(str(exp), traceback.format_exc())
+ Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}')
+ self.manager = None
+
+ def initialize_database(self):
+ try:
+ self.manager.initialize()
+ except Exception as exp:
+ self._log.log(str(exp), traceback.format_exc())
+ Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}')
+
+ self.manager = None
+
+ def is_header_ready(self):
+ if self.manager:
+ return self.manager.is_header_ready_db_version()
+ return False
+
+ def write_submit_time(self, job_name, submit=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="",
+ member="", section="", chunk=0, platform="NA", job_id=0, wrapper_queue=None,
+ wrapper_code=None, children=""):
+
try:
+ next_counter = self._get_next_counter_by_job_name(job_name)
+ current_experiment_run = self.manager.get_experiment_run_dc_with_max_id()
+ job_data_dc = JobData(_id=0,
+ counter=next_counter,
+ job_name=job_name,
+ submit=submit,
+ status=status,
+ rowtype=self._get_defined_rowtype(wrapper_code),
+ ncpus=ncpus,
+ wallclock=wallclock,
+ qos=self._get_defined_queue_name(wrapper_queue, wrapper_code, qos),
+ date=date,
+ member=member,
+ section=section,
+ chunk=chunk,
+ platform=platform,
+ job_id=job_id,
+ children=children,
+ run_id=current_experiment_run.run_id)
+ return self.manager.register_submitted_job_data_dc(job_data_dc)
+ except Exception as exp:
+ self._log.log(str(exp), traceback.format_exc())
+ Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}')
+
+ return None
+
+ def write_start_time(self, job_name, start=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="",
+ member="", section="", chunk=0, platform="NA", job_id=0, wrapper_queue=None, wrapper_code=None,
+ children=""):
+ try:
+ job_data_dc_last = self.manager.get_job_data_dc_unique_latest_by_job_name(job_name)
+ if not job_data_dc_last:
+ job_data_dc_last = self.write_submit_time(job_name=job_name,
+ status=status,
+ ncpus=ncpus,
+ wallclock=wallclock,
+ qos=qos,
+ date=date,
+ member=member,
+ section=section,
+ chunk=chunk,
+ platform=platform,
+ job_id=job_id,
+ wrapper_queue=wrapper_queue,
+ wrapper_code=wrapper_code)
+ self._log.log("write_start_time {0} start not found.".format(job_name))
+ job_data_dc_last = self.manager.get_job_data_dc_unique_latest_by_job_name(job_name)
+ if not job_data_dc_last:
+ raise Exception("Job {0} has not been found in the database.".format(job_name))
+ job_data_dc_last.start = start
+ job_data_dc_last.qos = self._get_defined_queue_name(wrapper_queue, wrapper_code, qos)
+ job_data_dc_last.status = status
+ job_data_dc_last.rowtype = self._get_defined_rowtype(wrapper_code)
+ job_data_dc_last.job_id = job_id
+ job_data_dc_last.children = children
+ return self.manager.update_job_data_dc_by_id(job_data_dc_last)
+ except Exception as exp:
+ self._log.log(str(exp), traceback.format_exc())
+ Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}')
+
+ def write_finish_time(self, job_name, finish=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="",
+ member="", section="", chunk=0, platform="NA", job_id=0, out_file=None, err_file=None,
+ wrapper_queue=None, wrapper_code=None, children=""):
+ try:
+ job_data_dc_last = self.manager.get_job_data_dc_unique_latest_by_job_name(job_name)
+ if not job_data_dc_last:
+ job_data_dc_last = self.write_submit_time(job_name=job_name,
+ status=status,
+ ncpus=ncpus,
+ wallclock=wallclock,
+ qos=qos,
+ date=date,
+ member=member,
+ section=section,
+ chunk=chunk,
+ platform=platform,
+ job_id=job_id,
+ wrapper_queue=wrapper_queue,
+ wrapper_code=wrapper_code,
+ children=children)
+ self._log.log("write_finish_time {0} submit not found.".format(job_name))
+ job_data_dc_last = self.manager.get_job_data_dc_unique_latest_by_job_name(job_name)
+ if not job_data_dc_last:
+ raise Exception("Job {0} has not been found in the database.".format(job_name))
+ job_data_dc_last.finish = finish if finish > 0 else int(time())
+ job_data_dc_last.status = status
+ job_data_dc_last.job_id = job_id
+ job_data_dc_last.rowstatus = Models.RowStatus.PENDING_PROCESS
+ job_data_dc_last.out = out_file if out_file else ""
+ job_data_dc_last.err = err_file if err_file else ""
+ return self.manager.update_job_data_dc_by_id(job_data_dc_last)
+ except Exception as exp:
+ self._log.log(str(exp), traceback.format_exc())
+ Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}')
+
+ def write_platform_data_after_finish(self, job_data_dc, platform_obj):
+ """
+ Call it in a thread.
+ """
+ try:
+ sleep(SECONDS_WAIT_PLATFORM)
+ ssh_output = platform_obj.check_job_energy(job_data_dc.job_id)
+ slurm_monitor = SlurmMonitor(ssh_output)
+ self._verify_slurm_monitor(slurm_monitor, job_data_dc)
+ job_data_dcs_in_wrapper = self.manager.get_job_data_dcs_last_by_wrapper_code(job_data_dc.wrapper_code)
+ job_data_dcs_in_wrapper = sorted([job for job in job_data_dcs_in_wrapper if job.status == "COMPLETED"],
+ key=lambda x: x._id)
+ job_data_dcs_to_update = []
+ if len(job_data_dcs_in_wrapper) > 0:
+ info_handler = PlatformInformationHandler(
+ StraightWrapperAssociationStrategy(self._historiclog_dir_path))
+ job_data_dcs_to_update = info_handler.execute_distribution(job_data_dc, job_data_dcs_in_wrapper,
+ slurm_monitor)
+ if len(job_data_dcs_to_update) == 0:
+ info_handler.strategy = TwoDimWrapperDistributionStrategy(self._historiclog_dir_path)
+ job_data_dcs_to_update = info_handler.execute_distribution(job_data_dc, job_data_dcs_in_wrapper,
+ slurm_monitor)
+ if len(job_data_dcs_to_update) == 0:
+ info_handler.strategy = GeneralizedWrapperDistributionStrategy(self._historiclog_dir_path)
+ job_data_dcs_to_update = info_handler.execute_distribution(job_data_dc, job_data_dcs_in_wrapper,
+ slurm_monitor)
+ else:
+ info_handler = PlatformInformationHandler(SingleAssociationStrategy(self._historiclog_dir_path))
+ job_data_dcs_to_update = info_handler.execute_distribution(job_data_dc, job_data_dcs_in_wrapper,
+ slurm_monitor)
+ return self.manager.update_list_job_data_dc_by_each_id(job_data_dcs_to_update)
+ except Exception as exp:
+ self._log.log(str(exp), traceback.format_exc())
+ Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}')
+
+ def _verify_slurm_monitor(self, slurm_monitor, job_data_dc):
+ try:
+ if slurm_monitor.header.status not in ["COMPLETED", "FAILED"]:
+ self._log.log("Assertion Error on job {0} with ssh_output {1}".format(job_data_dc.job_name,
+ slurm_monitor.original_input),
+ "Slurm status {0} is not COMPLETED nor FAILED for ID {1}.\n".format(
+ slurm_monitor.header.status, slurm_monitor.header.name))
+ Log.debug(
+ f'Historical Database error: Slurm status {slurm_monitor.header.status} is not COMPLETED nor FAILED for ID {slurm_monitor.header.name}.')
+ if not slurm_monitor.steps_plus_extern_approximate_header_energy():
+ self._log.log("Assertion Error on job {0} with ssh_output {1}".format(job_data_dc.job_name,
+ slurm_monitor.original_input),
+ "Steps + extern != total energy for ID {0}. Number of steps {1}.\n".format(
+ slurm_monitor.header.name, slurm_monitor.step_count))
+ Log.debug(
+ f'Historical Database error: Steps + extern != total energy for ID {slurm_monitor.header.name}. Number of steps {slurm_monitor.step_count}.')
+ except Exception as exp:
+ self._log.log(str(exp), traceback.format_exc())
+ Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}')
+
+ def process_status_changes(self, job_list=None, chunk_unit="NA", chunk_size=0, current_config="", create=False):
+ """ Detect status differences between job_list and current job_data rows, and update. Creates a new run if necessary. """
+ try:
+ try:
+ current_experiment_run_dc = self.manager.get_experiment_run_dc_with_max_id()
+ update_these_changes = self._get_built_list_of_changes(job_list)
+            except Exception:
+                # no experiment runs registered yet
+                current_experiment_run_dc = 0
+                update_these_changes = []
+ should_create_new_run = self.should_we_create_a_new_run(job_list, len(update_these_changes),
+ current_experiment_run_dc, chunk_unit, chunk_size,
+ create)
+            if len(update_these_changes) > 0 and not should_create_new_run:
+ self.manager.update_many_job_data_change_status(update_these_changes)
+ if should_create_new_run:
+ return self.create_new_experiment_run(chunk_unit, chunk_size, current_config, job_list)
+ return self.update_counts_on_experiment_run_dc(current_experiment_run_dc, job_list)
+ except Exception as exp:
+ self._log.log(str(exp), traceback.format_exc())
+ Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}')
+
+ def _get_built_list_of_changes(self, job_list):
+ """ Return: List of (current timestamp, current datetime str, status, rowstatus, id in job_data). One tuple per change. """
+ job_data_dcs = self.detect_changes_in_job_list(job_list)
+ return [(HUtils.get_current_datetime(), job.status, Models.RowStatus.CHANGED, job._id) for job in job_data_dcs]
+
+ def process_job_list_changes_to_experiment_totals(self, job_list=None):
+ """ Updates current experiment_run row with totals calculated from job_list. """
+ try:
+ current_experiment_run_dc = self.manager.get_experiment_run_dc_with_max_id()
+ return self.update_counts_on_experiment_run_dc(current_experiment_run_dc, job_list)
+ except Exception as exp:
+ self._log.log(str(exp), traceback.format_exc())
+ Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}')
+
+ def should_we_create_a_new_run(self, job_list, changes_count, current_experiment_run_dc, new_chunk_unit,
+ new_chunk_size, create=False):
+ if create:
+ return True
+ elif not create and self.expid[0].lower() != "t":
+ if len(job_list) != current_experiment_run_dc.total:
+ return True
+ if changes_count > int(self._get_date_member_completed_count(job_list)):
+ return True
+ return self._chunk_config_has_changed(current_experiment_run_dc, new_chunk_unit, new_chunk_size)
+
+ def _chunk_config_has_changed(self, current_exp_run_dc, new_chunk_unit, new_chunk_size):
+ if not current_exp_run_dc:
+ return True
+ if current_exp_run_dc.chunk_unit != new_chunk_unit or current_exp_run_dc.chunk_size != new_chunk_size:
+ return True
+ return False
+
+ def update_counts_on_experiment_run_dc(self, experiment_run_dc, job_list=None):
+ """ Return updated row as Models.ExperimentRun. """
+ status_counts = self.get_status_counts_from_job_list(job_list)
+ experiment_run_dc.completed = status_counts[HUtils.SupportedStatus.COMPLETED]
+ experiment_run_dc.failed = status_counts[HUtils.SupportedStatus.FAILED]
+ experiment_run_dc.queuing = status_counts[HUtils.SupportedStatus.QUEUING]
+ experiment_run_dc.submitted = status_counts[HUtils.SupportedStatus.SUBMITTED]
+ experiment_run_dc.running = status_counts[HUtils.SupportedStatus.RUNNING]
+ experiment_run_dc.suspended = status_counts[HUtils.SupportedStatus.SUSPENDED]
+ experiment_run_dc.total = status_counts["TOTAL"]
+ return self.manager.update_experiment_run_dc_by_id(experiment_run_dc)
+
+ def finish_current_experiment_run(self):
+ if self.manager.is_there_a_last_experiment_run():
current_experiment_run_dc = self.manager.get_experiment_run_dc_with_max_id()
- update_these_changes = self._get_built_list_of_changes(job_list)
- except:
- current_experiment_run_dc = 0
- update_these_changes = []
- #("no runs")
- should_create_new_run = self.should_we_create_a_new_run(job_list, len(update_these_changes), current_experiment_run_dc, chunk_unit, chunk_size,create)
- if len(update_these_changes) > 0 and should_create_new_run == False:
- self.manager.update_many_job_data_change_status(update_these_changes)
- if should_create_new_run:
- return self.create_new_experiment_run(chunk_unit, chunk_size, current_config, job_list)
- return self.update_counts_on_experiment_run_dc(current_experiment_run_dc, job_list)
- except Exception as exp:
- self._log.log(str(exp), traceback.format_exc())
- Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}')
-
-
- def _get_built_list_of_changes(self, job_list):
- """ Return: List of (current timestamp, current datetime str, status, rowstatus, id in job_data). One tuple per change. """
- job_data_dcs = self.detect_changes_in_job_list(job_list)
- return [(HUtils.get_current_datetime(), job.status, Models.RowStatus.CHANGED, job._id) for job in job_data_dcs]
-
- def process_job_list_changes_to_experiment_totals(self, job_list=None):
- """ Updates current experiment_run row with totals calculated from job_list. """
- try:
- current_experiment_run_dc = self.manager.get_experiment_run_dc_with_max_id()
- return self.update_counts_on_experiment_run_dc(current_experiment_run_dc, job_list)
- except Exception as exp:
- self._log.log(str(exp), traceback.format_exc())
- Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}')
-
-
- def should_we_create_a_new_run(self, job_list, changes_count, current_experiment_run_dc, new_chunk_unit, new_chunk_size,create=False):
- if create:
- return True
- elif not create and self.expid[0].lower() != "t":
- if len(job_list) != current_experiment_run_dc.total:
- return True
- if changes_count > int(self._get_date_member_completed_count(job_list)):
- return True
- return self._chunk_config_has_changed(current_experiment_run_dc, new_chunk_unit, new_chunk_size)
-
- def _chunk_config_has_changed(self, current_exp_run_dc, new_chunk_unit, new_chunk_size):
- if not current_exp_run_dc:
- return True
- if current_exp_run_dc.chunk_unit != new_chunk_unit or current_exp_run_dc.chunk_size != new_chunk_size:
- return True
- return False
-
- def update_counts_on_experiment_run_dc(self, experiment_run_dc, job_list=None):
- """ Return updated row as Models.ExperimentRun. """
- status_counts = self.get_status_counts_from_job_list(job_list)
- experiment_run_dc.completed = status_counts[HUtils.SupportedStatus.COMPLETED]
- experiment_run_dc.failed = status_counts[HUtils.SupportedStatus.FAILED]
- experiment_run_dc.queuing = status_counts[HUtils.SupportedStatus.QUEUING]
- experiment_run_dc.submitted = status_counts[HUtils.SupportedStatus.SUBMITTED]
- experiment_run_dc.running = status_counts[HUtils.SupportedStatus.RUNNING]
- experiment_run_dc.suspended = status_counts[HUtils.SupportedStatus.SUSPENDED]
- experiment_run_dc.total = status_counts["TOTAL"]
- return self.manager.update_experiment_run_dc_by_id(experiment_run_dc)
-
- def finish_current_experiment_run(self):
- if self.manager.is_there_a_last_experiment_run():
- current_experiment_run_dc = self.manager.get_experiment_run_dc_with_max_id()
- current_experiment_run_dc.finish = int(time())
- return self.manager.update_experiment_run_dc_by_id(current_experiment_run_dc)
- return None
-
- def create_new_experiment_run(self, chunk_unit="NA", chunk_size=0, current_config="", job_list=None):
- """ Also writes the finish timestamp of the previous run. """
- self.finish_current_experiment_run()
- return self._create_new_experiment_run_dc_with_counts(chunk_unit=chunk_unit, chunk_size=chunk_size, current_config=current_config, job_list=job_list)
-
- def _create_new_experiment_run_dc_with_counts(self, chunk_unit, chunk_size, current_config="", job_list=None):
- """ Create new experiment_run row and return the new Models.ExperimentRun data class from database. """
- status_counts = self.get_status_counts_from_job_list(job_list)
- experiment_run_dc = ExperimentRun(0,
- chunk_unit=chunk_unit,
- chunk_size=chunk_size,
- metadata=current_config,
- start=int(time()),
- completed=status_counts[HUtils.SupportedStatus.COMPLETED],
- total=status_counts["TOTAL"],
- failed=status_counts[HUtils.SupportedStatus.FAILED],
- queuing=status_counts[HUtils.SupportedStatus.QUEUING],
- running=status_counts[HUtils.SupportedStatus.RUNNING],
- submitted=status_counts[HUtils.SupportedStatus.SUBMITTED],
- suspended=status_counts[HUtils.SupportedStatus.SUSPENDED])
- return self.manager.register_experiment_run_dc(experiment_run_dc)
-
- def detect_changes_in_job_list(self, job_list):
- """ Detect changes in job_list compared to the current contents of job_data table. Returns a list of JobData data classes where the status of each item is the new status."""
- job_name_to_job = {str(job.name): job for job in job_list}
- current_job_data_dcs = self.manager.get_all_last_job_data_dcs()
- differences = []
- for job_dc in current_job_data_dcs:
- if job_dc.job_name in job_name_to_job:
- if job_dc.status != job_name_to_job[job_dc.job_name].status_str:
- if not (job_dc.status in ["COMPLETED", "FAILED"] and job_name_to_job[job_dc.job_name].status_str in ["WAITING", "READY"]):
- # If the job is not changing from a finalized status to a starting status
- job_dc.status = job_name_to_job[job_dc.job_name].status_str
- differences.append(job_dc)
- return differences
-
- def _get_defined_rowtype(self, code):
- if code:
- return code
- else:
- return Models.RowType.NORMAL
-
- def _get_defined_queue_name(self, wrapper_queue, wrapper_code, qos):
- if wrapper_code and wrapper_code > 2 and wrapper_queue is not None and len(str(wrapper_queue)) > 0:
- return wrapper_queue
- return qos
-
- def _get_next_counter_by_job_name(self, job_name):
- """ Return the counter attribute from the latest job data row by job_name. """
- job_data_dc = self.manager.get_job_data_dc_unique_latest_by_job_name(job_name)
- max_counter = self.manager.get_job_data_max_counter()
- if job_data_dc:
- return max(max_counter, job_data_dc.counter + 1)
- else:
- return max_counter
-
- def _get_date_member_completed_count(self, job_list):
- """ Each item in the job_list must have attributes: date, member, status_str. """
- job_list = job_list if job_list else []
- return sum(1 for job in job_list if job.date is not None and job.member is not None and job.status_str == HUtils.SupportedStatus.COMPLETED)
-
- def get_status_counts_from_job_list(self, job_list):
- """
- Return dict with keys COMPLETED, FAILED, QUEUING, SUBMITTED, RUNNING, SUSPENDED, TOTAL.
- """
- result = {
- HUtils.SupportedStatus.COMPLETED: 0,
- HUtils.SupportedStatus.FAILED: 0,
- HUtils.SupportedStatus.QUEUING: 0,
- HUtils.SupportedStatus.SUBMITTED: 0,
- HUtils.SupportedStatus.RUNNING: 0,
- HUtils.SupportedStatus.SUSPENDED: 0,
- "TOTAL": 0
- }
-
- if not job_list:
- job_list = []
-
- for job in job_list:
- if job.status_str in result:
- result[job.status_str] += 1
- result["TOTAL"] = len(job_list)
- return result
+ current_experiment_run_dc.finish = int(time())
+ return self.manager.update_experiment_run_dc_by_id(current_experiment_run_dc)
+ return None
+
+ def create_new_experiment_run(self, chunk_unit="NA", chunk_size=0, current_config="", job_list=None):
+ """ Also writes the finish timestamp of the previous run. """
+ self.finish_current_experiment_run()
+ return self._create_new_experiment_run_dc_with_counts(chunk_unit=chunk_unit, chunk_size=chunk_size,
+ current_config=current_config, job_list=job_list)
+
+ def _create_new_experiment_run_dc_with_counts(self, chunk_unit, chunk_size, current_config="", job_list=None):
+ """ Create new experiment_run row and return the new Models.ExperimentRun data class from database. """
+ status_counts = self.get_status_counts_from_job_list(job_list)
+ experiment_run_dc = ExperimentRun(0,
+ chunk_unit=chunk_unit,
+ chunk_size=chunk_size,
+ metadata=current_config,
+ start=int(time()),
+ completed=status_counts[HUtils.SupportedStatus.COMPLETED],
+ total=status_counts["TOTAL"],
+ failed=status_counts[HUtils.SupportedStatus.FAILED],
+ queuing=status_counts[HUtils.SupportedStatus.QUEUING],
+ running=status_counts[HUtils.SupportedStatus.RUNNING],
+ submitted=status_counts[HUtils.SupportedStatus.SUBMITTED],
+ suspended=status_counts[HUtils.SupportedStatus.SUSPENDED])
+ return self.manager.register_experiment_run_dc(experiment_run_dc)
+
+ def detect_changes_in_job_list(self, job_list):
+ """ Detect changes in job_list compared to the current contents of job_data table. Returns a list of JobData data classes where the status of each item is the new status."""
+ job_name_to_job = {str(job.name): job for job in job_list}
+ current_job_data_dcs = self.manager.get_all_last_job_data_dcs()
+ differences = []
+ for job_dc in current_job_data_dcs:
+ if job_dc.job_name in job_name_to_job:
+ if job_dc.status != job_name_to_job[job_dc.job_name].status_str:
+ if not (job_dc.status in ["COMPLETED", "FAILED"] and job_name_to_job[
+ job_dc.job_name].status_str in ["WAITING", "READY"]):
+ # If the job is not changing from a finalized status to a starting status
+ job_dc.status = job_name_to_job[job_dc.job_name].status_str
+ differences.append(job_dc)
+ return differences
+
+ def _get_defined_rowtype(self, code):
+ if code:
+ return code
+ else:
+ return Models.RowType.NORMAL
+
+ def _get_defined_queue_name(self, wrapper_queue, wrapper_code, qos):
+ if wrapper_code and wrapper_code > 2 and wrapper_queue is not None and len(str(wrapper_queue)) > 0:
+ return wrapper_queue
+ return qos
+
+ def _get_next_counter_by_job_name(self, job_name):
+ """ Return the counter attribute from the latest job data row by job_name. """
+ job_data_dc = self.manager.get_job_data_dc_unique_latest_by_job_name(job_name)
+ max_counter = self.manager.get_job_data_max_counter()
+ if job_data_dc:
+ return max(max_counter, job_data_dc.counter + 1)
+ else:
+ return max_counter
+
+ def _get_date_member_completed_count(self, job_list):
+ """ Each item in the job_list must have attributes: date, member, status_str. """
+ job_list = job_list if job_list else []
+ return sum(1 for job in job_list if
+ job.date is not None and job.member is not None and job.status_str == HUtils.SupportedStatus.COMPLETED)
+
+ def get_status_counts_from_job_list(self, job_list):
+ """
+ Return dict with keys COMPLETED, FAILED, QUEUING, SUBMITTED, RUNNING, SUSPENDED, TOTAL.
+ """
+ result = {
+ HUtils.SupportedStatus.COMPLETED: 0,
+ HUtils.SupportedStatus.FAILED: 0,
+ HUtils.SupportedStatus.QUEUING: 0,
+ HUtils.SupportedStatus.SUBMITTED: 0,
+ HUtils.SupportedStatus.RUNNING: 0,
+ HUtils.SupportedStatus.SUSPENDED: 0,
+ "TOTAL": 0
+ }
+
+ if not job_list:
+ job_list = []
+
+ for job in job_list:
+ if job.status_str in result:
+ result[job.status_str] += 1
+ result["TOTAL"] = len(job_list)
+ return result
diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py
index bed0521b36fbeb599a39b89910dda5fbb9418f45..bb6e3244b7e341d165df6c8ea002a778f4faf10e 100644
--- a/autosubmit/job/job.py
+++ b/autosubmit/job/job.py
@@ -242,7 +242,12 @@ class Job(object):
self.delete_when_edgeless = False
# hetjobs
self.het = None
-
+ self.updated_log = False
+ self.ready_start_date = None
+ self.log_retrieved = False
+ self.start_time_written = False
+        self.submit_time_timestamp = None  # for wrappers, every job inside a wrapper shares the same submission time
+        self.finish_time_timestamp = None  # for wrappers with inner retrials, the next retrial's submission time is the previous retrial's finish time
+
def _init_runtime_parameters(self):
# hetjobs
self.het = {'HETSIZE': 0}
@@ -255,6 +260,8 @@ class Job(object):
self._processors = '1'
self._memory = ''
self._memory_per_task = ''
+ self.log_retrieved = False
+ self.start_time_placeholder = ""
@property
@autosubmit_parameter(name='tasktype')
@@ -902,7 +909,7 @@ class Job(object):
"""
return self.parents.__len__()
- def _get_from_stat(self, index):
+    def _get_from_stat(self, index, fail_count=-1):
"""
Returns value from given row index position in STAT file associated to job
@@ -911,7 +918,11 @@ class Job(object):
:return: value in index position
:rtype: int
"""
- logname = os.path.join(self._tmp_path, self.name + '_STAT')
+ if fail_count == -1:
+ logname = os.path.join(self._tmp_path, self.name + '_STAT')
+ else:
+ fail_count = str(fail_count)
+ logname = os.path.join(self._tmp_path, self.name + '_STAT_' + fail_count)
if os.path.exists(logname):
lines = open(logname).readlines()
if len(lines) >= index + 1:
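The new `fail_count` parameter selects between the job's live STAT file and a per-retrial one. The naming rule, extracted into a small sketch for clarity (the helper itself is illustrative):

```python
import os

def stat_filename(tmp_path, job_name, fail_count=-1):
    """'<job>_STAT' for the current attempt, '<job>_STAT_<n>' for retrial n."""
    suffix = "_STAT" if fail_count == -1 else f"_STAT_{fail_count}"
    return os.path.join(tmp_path, job_name + suffix)
```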
@@ -941,23 +952,23 @@ class Job(object):
lst.append(parse_date(fields[index]))
return lst
- def check_end_time(self):
+ def check_end_time(self, fail_count=-1):
"""
Returns end time from stat file
:return: date and time
:rtype: str
"""
- return self._get_from_stat(1)
+ return self._get_from_stat(1, fail_count)
- def check_start_time(self):
+ def check_start_time(self, fail_count=-1):
"""
Returns job's start time
:return: start time
:rtype: str
"""
- return self._get_from_stat(0)
+        return self._get_from_stat(0, fail_count)
def check_retrials_end_time(self):
"""
@@ -1003,220 +1014,108 @@ class Job(object):
retrials_list.insert(0, retrial_dates)
return retrials_list
- def retrieve_logfiles_unthreaded(self, copy_remote_logs, local_logs):
- remote_logs = (self.script_name + ".out."+str(self.fail_count), self.script_name + ".err."+str(self.fail_count))
- out_exist = False
- err_exist = False
- retries = 3
- sleeptime = 0
- i = 0
- no_continue = False
- try:
- while (not out_exist and not err_exist) and i < retries:
- try:
- out_exist = self._platform.check_file_exists(
- remote_logs[0], True)
- except IOError as e:
- out_exist = False
- try:
- err_exist = self._platform.check_file_exists(
- remote_logs[1], True)
- except IOError as e:
- err_exists = False
- if not out_exist or not err_exist:
- sleeptime = sleeptime + 5
- i = i + 1
- sleep(sleeptime)
- if i >= retries:
- if not out_exist or not err_exist:
- Log.printlog("Failed to retrieve log files {1} and {2} e=6001".format(
- retries, remote_logs[0], remote_logs[1]))
- return
- if str(copy_remote_logs).lower() == "true":
- # unifying names for log files
- if remote_logs != local_logs:
- self.synchronize_logs(
- self._platform, remote_logs, local_logs)
- remote_logs = copy.deepcopy(local_logs)
- self._platform.get_logs_files(self.expid, remote_logs)
- # Update the logs with Autosubmit Job ID Brand
- try:
- for local_log in local_logs:
- self._platform.write_jobid(self.id, os.path.join(
- self._tmp_path, 'LOG_' + str(self.expid), local_log))
- except BaseException as e:
- Log.printlog("Trace {0} \n Failed to write the {1} e=6001".format(
- str(e), self.name))
- except AutosubmitError as e:
- Log.printlog("Trace {0} \nFailed to retrieve log file for job {1}".format(
- str(e), self.name), 6001)
- except AutosubmitCritical as e: # Critical errors can't be recovered. Failed configuration or autosubmit error
- Log.printlog("Trace {0} \nFailed to retrieve log file for job {0}".format(
- str(e), self.name), 6001)
- return
-
- @threaded
- def retrieve_logfiles(self, copy_remote_logs, local_logs, remote_logs, expid, platform_name,fail_count = 0,job_id="",auth_password=None, local_auth_password = None):
- as_conf = AutosubmitConfig(expid, BasicConfig, YAMLParserFactory())
- as_conf.reload(force_load=True)
- max_retrials = self.retrials
- max_logs = 0
- last_log = 0
- stat_file = self.script_name[:-4] + "_STAT_"
- lang = locale.getlocale()[1]
- if lang is None:
- lang = locale.getdefaultlocale()[1]
- if lang is None:
- lang = 'UTF-8'
- retries = 2
- count = 0
- success = False
- error_message = ""
- platform = None
- while (count < retries) and not success:
- try:
- as_conf = AutosubmitConfig(expid, BasicConfig, YAMLParserFactory())
- as_conf.reload(force_load=True)
- max_retrials = self.retrials
- max_logs = int(max_retrials) - fail_count
- last_log = int(max_retrials) - fail_count
- submitter = self._get_submitter(as_conf)
- submitter.load_platforms(as_conf, auth_password=auth_password, local_auth_password=local_auth_password)
- platform = submitter.platforms[platform_name]
- platform.test_connection()
- success = True
- except BaseException as e:
- error_message = str(e)
- sleep(5)
- pass
- count = count + 1
- if not success:
- raise AutosubmitError(
- "Couldn't load the autosubmit platforms, seems that the local platform has some issue\n:{0}".format(
- error_message), 6006)
+    def get_new_remotelog_name(self, count=-1):
+        """
+        Builds the remote out/err log file names for the given retrial count.
+        :param count: retrial number used as the name suffix; defaults to the job's current fail count.
+        :return: tuple with the remote out/err log names, or "" if the names could not be built.
+        """
+ if count == -1:
+ count = self._fail_count
try:
- if self.wrapper_type is not None and self.wrapper_type == "vertical":
- found = False
- retrials = 0
- while retrials < 3 and not found:
- if platform.check_stat_file_by_retrials(stat_file + str(max_logs)):
- found = True
- retrials = retrials + 1
- for i in range(max_logs-1,-1,-1):
- if platform.check_stat_file_by_retrials(stat_file + str(i)):
- last_log = i
- else:
- break
- remote_logs = (self.script_name + ".out." + str(last_log), self.script_name + ".err." + str(last_log))
-
- else:
- remote_logs = (self.script_name + ".out."+str(fail_count), self.script_name + ".err." + str(fail_count))
-
+ remote_logs = (f"{self.script_name}.out.{count}", f"{self.script_name}.err.{count}")
except BaseException as e:
- Log.printlog(
- "{0} \n Couldn't connect to the remote platform for {1} job err/out files. ".format(str(e), self.name), 6001)
- out_exist = False
- err_exist = False
- retries = 3
- i = 0
+ remote_logs = ""
+ Log.printlog(f"Trace {e} \n Failed to retrieve log file for job {self.name}", 6000)
+ return remote_logs
+
+ def check_remote_log_exists(self, platform):
try:
- while (not out_exist and not err_exist) and i < retries:
- try:
- out_exist = platform.check_file_exists(
- remote_logs[0], False, sleeptime=0, max_retries=1)
- except IOError as e:
- out_exist = False
+ out_exist = platform.check_file_exists(self.remote_logs[0], False, sleeptime=0, max_retries=1)
+ except IOError:
+ Log.debug(f'Output log {self.remote_logs[0]} still does not exist')
+ out_exist = False
+ try:
+ err_exist = platform.check_file_exists(self.remote_logs[1], False, sleeptime=0, max_retries=1)
+ except IOError:
+ Log.debug(f'Error log {self.remote_logs[1]} still does not exist')
+ err_exist = False
+ return out_exist or err_exist
+
+ def retrieve_external_retrials_logfiles(self, platform):
+ log_retrieved = False
+ self.remote_logs = self.get_new_remotelog_name()
+ if not self.remote_logs:
+ self.log_retrieved = False
+ else:
+ if self.check_remote_log_exists(platform):
try:
- err_exist = platform.check_file_exists(
- remote_logs[1], False, sleeptime=0, max_retries=1)
- except IOError as e:
- err_exist = False
- if not out_exist or not err_exist:
- i = i + 1
- sleep(5)
+ self.synchronize_logs(platform, self.remote_logs, self.local_logs)
+ remote_logs = copy.deepcopy(self.local_logs)
+ platform.get_logs_files(self.expid, remote_logs)
+ log_retrieved = True
+ except BaseException:
+ log_retrieved = False
+ self.log_retrieved = log_retrieved
+
+ def retrieve_internal_retrials_logfiles(self, platform):
+ log_retrieved = False
+ original = copy.deepcopy(self.local_logs)
+ for i in range(0, int(self.retrials + 1)):
+ if i > 0:
+ self.local_logs = (original[0][:-4] + "_{0}".format(i) + ".out", original[1][:-4] + "_{0}".format(i) + ".err")
+ self.remote_logs = self.get_new_remotelog_name(i)
+ if not self.remote_logs:
+ self.log_retrieved = False
+ else:
+ if self.check_remote_log_exists(platform):
try:
- platform.restore_connection()
- except BaseException as e:
- Log.printlog("{0} \n Couldn't connect to the remote platform for this {1} job err/out files. ".format(
- str(e), self.name), 6001)
- if i >= retries:
- if not out_exist or not err_exist:
- Log.printlog("Failed to retrieve log files {1} and {2} e=6001".format(
- retries, remote_logs[0], remote_logs[1]))
- return
- if copy_remote_logs:
- l_log = copy.deepcopy(local_logs)
- # unifying names for log files
- if remote_logs != local_logs:
- if self.wrapper_type == "vertical": # internal_Retrial mechanism
- log_start = last_log
- exp_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid)
- tmp_path = os.path.join(exp_path, BasicConfig.LOCAL_TMP_DIR)
- time_stamp = "1970"
- total_stats = ["", "","FAILED"]
- while log_start <= max_logs:
- try:
- if platform.get_stat_file_by_retrials(stat_file+str(max_logs)):
- with open(os.path.join(tmp_path,stat_file+str(max_logs)), 'r+') as f:
- total_stats = [f.readline()[:-1],f.readline()[:-1],f.readline()[:-1]]
- try:
- total_stats[0] = float(total_stats[0])
- total_stats[1] = float(total_stats[1])
- except Exception as e:
- total_stats[0] = int(str(total_stats[0]).split('.')[0])
- total_stats[1] = int(str(total_stats[1]).split('.')[0])
- if max_logs != ( int(max_retrials) - fail_count ):
- time_stamp = date2str(datetime.datetime.fromtimestamp(total_stats[0]), 'S')
- else:
- with open(os.path.join(self._tmp_path, self.name + '_TOTAL_STATS_TMP'), 'rb+') as f2:
- for line in f2.readlines():
- if len(line) > 0:
- line = line.decode(lang)
- time_stamp = line.split(" ")[0]
-
- self.write_total_stat_by_retries(total_stats,max_logs == ( int(max_retrials) - fail_count ))
- platform.remove_stat_file_by_retrials(stat_file+str(max_logs))
- l_log = (self.script_name[:-4] + "." + time_stamp + ".out",self.script_name[:-4] + "." + time_stamp + ".err")
- r_log = ( remote_logs[0][:-1]+str(max_logs) , remote_logs[1][:-1]+str(max_logs) )
- self.synchronize_logs(platform, r_log, l_log,last = False)
- platform.get_logs_files(self.expid, l_log)
- try:
- for local_log in l_log:
- platform.write_jobid(job_id, os.path.join(self._tmp_path, 'LOG_' + str(self.expid), local_log))
- except BaseException as e:
- pass
- max_logs = max_logs - 1
- else:
- max_logs = -1 # exit, no more logs
- except BaseException as e:
- max_logs = -1 # exit
- local_logs = copy.deepcopy(l_log)
- remote_logs = copy.deepcopy(local_logs)
- if self.wrapper_type != "vertical":
- self.synchronize_logs(platform, remote_logs, local_logs)
- remote_logs = copy.deepcopy(local_logs)
+ self.synchronize_logs(platform, self.remote_logs, self.local_logs)
+ remote_logs = copy.deepcopy(self.local_logs)
platform.get_logs_files(self.expid, remote_logs)
- # Update the logs with Autosubmit Job ID Brand
- try:
- for local_log in local_logs:
- platform.write_jobid(job_id, os.path.join(
- self._tmp_path, 'LOG_' + str(self.expid), local_log))
- except BaseException as e:
- Log.printlog("Trace {0} \n Failed to write the {1} e=6001".format(
- str(e), self.name))
- with suppress(Exception):
- platform.closeConnection()
- except AutosubmitError as e:
- Log.printlog("Trace {0} \nFailed to retrieve log file for job {1}".format(
- e.message, self.name), 6001)
- with suppress(Exception):
- platform.closeConnection()
- except AutosubmitCritical as e: # Critical errors can't be recovered. Failed configuration or autosubmit error
- Log.printlog("Trace {0} \nFailed to retrieve log file for job {0}".format(
- e.message, self.name), 6001)
- with suppress(Exception):
- platform.closeConnection()
- return
+ log_retrieved = True
+ except BaseException:
+ log_retrieved = False
+ self.log_retrieved = log_retrieved
+
+    def retrieve_logfiles(self, platform, raise_error=False):
+        """
+        Retrieves the job log files from the remote host; meant to run inside a log-recovery thread.
+        :param platform: platform that is calling the function, already connected.
+        :param raise_error: if True, raise an error when the logs cannot be retrieved
+        :return:
+        """
+ backup_logname = copy.copy(self.local_logs)
+
+ if self.wrapper_type == "vertical":
+ stat_file = self.script_name[:-4] + "_STAT_"
+ self.retrieve_internal_retrials_logfiles(platform)
+ else:
+ stat_file = self.script_name[:-4] + "_STAT"
+ self.retrieve_external_retrials_logfiles(platform)
+
+ if not self.log_retrieved:
+ self.local_logs = backup_logname
+ Log.printlog("Failed to retrieve logs for job {0}".format(self.name), 6000)
+            if raise_error:
+                raise AutosubmitError(f"Failed to retrieve logs for job {self.name}", 6000)
+ else:
+ # Update the logs with Autosubmit Job ID Brand
+ try:
+ for local_log in self.local_logs:
+ platform.write_jobid(self.id, os.path.join(
+ self._tmp_path, 'LOG_' + str(self.expid), local_log))
+ except BaseException as e:
+ Log.printlog("Trace {0} \n Failed to write the {1} e=6001".format(str(e), self.name))
+ # write stats
+ if self.wrapper_type == "vertical": # Disable AS retrials for vertical wrappers to use internal ones
+            for i in range(0, int(self.retrials + 1)):
+ if self.platform.get_stat_file(self.name, stat_file, count=i):
+ self.write_vertical_time(i)
+ self.inc_fail_count()
+ else:
+ self.platform.get_stat_file(self.name, stat_file)
+ self.write_start_time(from_stat_file=True)
+ self.write_end_time(self.status == Status.COMPLETED)
def parse_time(self,wallclock):
        regex = re.compile(r'(((?P<hours>\d+):)((?P<minutes>\d+)))(:(?P<seconds>\d+))?')
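The rewritten retrieval flow above hangs together through a naming convention: remote logs are suffixed with the retrial count, local logs with the submit timestamp. A minimal sketch of that mapping (the helpers are illustrative; the f-strings mirror the diff):

```python
def remote_log_names(script_name, count):
    """Remote out/err names for retrial `count`, e.g. a000_SIM.cmd.out.0 / a000_SIM.cmd.err.0."""
    return (f"{script_name}.out.{count}", f"{script_name}.err.{count}")

def local_log_names(job_name, submit_timestamp):
    """Local out/err names keyed by submit time, e.g. a000_SIM.20240101120000.out."""
    return (f"{job_name}.{submit_timestamp}.out", f"{job_name}.{submit_timestamp}.err")
```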
@@ -1272,7 +1171,7 @@ class Job(object):
:param failed_file: boolean, if True, checks if the job failed
:return:
"""
- copy_remote_logs = as_conf.get_copy_remote_logs()
+ self.log_avaliable = False
previous_status = self.status
self.prev_status = previous_status
new_status = self.new_status
@@ -1320,28 +1219,23 @@ class Job(object):
# after checking the jobs , no job should have the status "submitted"
Log.printlog("Job {0} in SUBMITTED status. This should never happen on this step..".format(
self.name), 6008)
- if previous_status != Status.RUNNING and self.status in [Status.COMPLETED, Status.FAILED, Status.UNKNOWN,
- Status.RUNNING]:
- self.write_start_time()
- if previous_status == Status.HELD and self.status in [Status.SUBMITTED, Status.QUEUING, Status.RUNNING]:
- self.write_submit_time()
+ if self.status in [Status.COMPLETED, Status.FAILED]:
+ self.updated_log = False
+
# Updating logs
if self.status in [Status.COMPLETED, Status.FAILED, Status.UNKNOWN]:
- # New thread, check if file exist
- expid = copy.deepcopy(self.expid)
- platform_name = copy.deepcopy(self.platform_name)
- local_logs = copy.deepcopy(self.local_logs)
- remote_logs = copy.deepcopy(self.remote_logs)
- if as_conf.get_disable_recovery_threads(self.platform.name) == "true":
- self.retrieve_logfiles_unthreaded(copy_remote_logs, local_logs)
+ if str(as_conf.platforms_data.get(self.platform.name, {}).get('DISABLE_RECOVERY_THREADS', "false")).lower() == "true":
+ self.retrieve_logfiles(self.platform)
else:
- self.retrieve_logfiles(copy_remote_logs, local_logs, remote_logs, expid, platform_name,fail_count = copy.copy(self.fail_count),job_id=self.id,auth_password=self._platform.pw, local_auth_password=self._platform.pw)
- if self.wrapper_type == "vertical":
- max_logs = int(self.retrials)
- for i in range(0,max_logs):
- self.inc_fail_count()
- else:
- self.write_end_time(self.status == Status.COMPLETED)
+ self.platform.add_job_to_log_recover(self)
+
return self.status
@staticmethod
@@ -1735,6 +1629,14 @@ class Job(object):
def update_dict_parameters(self,as_conf):
self.retrials = as_conf.jobs_data.get(self.section,{}).get("RETRIALS", as_conf.experiment_data.get("CONFIG",{}).get("RETRIALS", 0))
+        for wrapper_data in (wrapper for wrapper in as_conf.experiment_data.get("WRAPPERS", {}).values() if isinstance(wrapper, dict)):
+            jobs_in_wrapper = wrapper_data.get("JOBS_IN_WRAPPER", "").upper()
+            # accept comma- or space-separated lists and strip padding so the membership check below works
+            jobs_in_wrapper = [token.strip() for token in jobs_in_wrapper.replace(",", " ").split()]
+ if self.section.upper() in jobs_in_wrapper:
+ self.retrials = wrapper_data.get("RETRIALS", self.retrials)
self.splits = as_conf.jobs_data.get(self.section,{}).get("SPLITS", None)
self.delete_when_edgeless = as_conf.jobs_data.get(self.section,{}).get("DELETE_WHEN_EDGELESS", True)
self.dependencies = str(as_conf.jobs_data.get(self.section,{}).get("DEPENDENCIES",""))
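The retrials override walks every wrapper section and, when the job's section is listed in `JOBS_IN_WRAPPER`, lets the wrapper's `RETRIALS` win over the job's own value. A self-contained sketch over a toy configuration:

```python
experiment_data = {
    "WRAPPERS": {
        "WRAPPER_V": {"JOBS_IN_WRAPPER": "SIM, POST", "RETRIALS": 5},
    },
}

def effective_retrials(section, default_retrials):
    retrials = default_retrials
    for wrapper_data in (w for w in experiment_data.get("WRAPPERS", {}).values() if isinstance(w, dict)):
        tokens = [t.strip() for t in wrapper_data.get("JOBS_IN_WRAPPER", "").upper().replace(",", " ").split()]
        if section.upper() in tokens:
            retrials = wrapper_data.get("RETRIALS", retrials)
    return retrials

assert effective_retrials("post", 0) == 5  # the wrapper's RETRIALS takes precedence
```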
@@ -2237,76 +2139,65 @@ class Job(object):
str(set(parameters) - set(variables))), 5013)
return out
- def write_submit_time(self, enabled=False, hold=False):
+ def write_submit_time(self, hold=False, enable_vertical_write=False, wrapper_submit_time=None):
        # type: (bool, bool, str) -> None
"""
Writes submit date and time to TOTAL_STATS file. It doesn't write if hold is True.
"""
- # print(traceback.format_stack())
+
+ self.start_time_written = False
+ if not enable_vertical_write:
+ if wrapper_submit_time:
+ self.submit_time_timestamp = wrapper_submit_time
+ else:
+ self.submit_time_timestamp = date2str(datetime.datetime.now(), 'S')
+ if self.wrapper_type != "vertical":
+ self.local_logs = (f"{self.name}.{self.submit_time_timestamp}.out", f"{self.name}.{self.submit_time_timestamp}.err") # for wrappers with inner retrials
+ else:
+ self.local_logs = (f"{self.name}.{self.submit_time_timestamp}.out",
+ f"{self.name}.{self.submit_time_timestamp}.err") # for wrappers with inner retrials
+ return
+ if self.wrapper_type == "vertical" and self.fail_count > 0:
+ self.submit_time_timestamp = self.finish_time_timestamp
print(("Call from {} with status {}".format(self.name, self.status_str)))
if hold is True:
return # Do not write for HELD jobs.
- data_time = ["",time.time()]
- if self.wrapper_type != "vertical" or enabled:
- path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS')
- else:
- path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS_TMP')
+
+ data_time = ["",int(datetime.datetime.strptime(self.submit_time_timestamp, "%Y%m%d%H%M%S").timestamp())]
+ path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS')
if os.path.exists(path):
f = open(path, 'a')
f.write('\n')
else:
f = open(path, 'w')
- if not enabled:
- f.write(date2str(datetime.datetime.now(), 'S'))
- if self.wrapper_type == "vertical":
- f.write(" "+str(time.time()))
- else:
- path2 = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS_TMP')
- f2 = open(path2, 'r')
- for line in f2.readlines():
- if len(line) > 0:
- data_time = line.split(" ")
- try:
- data_time[1] = float(data_time[1])
- except Exception as e:
- data_time[1] = int(data_time[1])
- f.write(data_time[0])
- f2.close()
- try:
- os.remove(path2)
- except Exception as e:
- pass
- # Get
+ f.write(self.submit_time_timestamp)
+
# Writing database
- if self.wrapper_type != "vertical" or enabled:
- exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR)
- exp_history.write_submit_time(self.name, submit=data_time[1], status=Status.VALUE_TO_KEY.get(self.status, "UNKNOWN"), ncpus=self.processors,
- wallclock=self.wallclock, qos=self.queue, date=self.date, member=self.member, section=self.section, chunk=self.chunk,
- platform=self.platform_name, job_id=self.id, wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.expid, self.name),
- children=self.children_names_str)
+ exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR)
+ exp_history.write_submit_time(self.name, submit=data_time[1], status=Status.VALUE_TO_KEY.get(self.status, "UNKNOWN"), ncpus=self.processors,
+ wallclock=self.wallclock, qos=self.queue, date=self.date, member=self.member, section=self.section, chunk=self.chunk,
+ platform=self.platform_name, job_id=self.id, wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.expid, self.name),
+ children=self.children_names_str)
- def write_start_time(self, enabled = False):
+ def write_start_time(self, enable_vertical_write=False, from_stat_file=False, count=-1):
"""
Writes start date and time to TOTAL_STATS file
:return: True if successful, False otherwise
:rtype: bool
"""
- timestamp = date2str(datetime.datetime.now(), 'S')
- self.local_logs = (f"{self.name}.{timestamp}.out", f"{self.name}.{timestamp}.err")
+ if not enable_vertical_write and self.wrapper_type == "vertical":
+ return
- if self.wrapper_type != "vertical" or enabled:
- if self._platform.get_stat_file(self.name, retries=5): #fastlook
- start_time = self.check_start_time()
+ self.start_time_written = True
+ if not from_stat_file: # last known start time from AS
+ self.start_time_placeholder = time.time()
+ else:
+ start_time_ = self.check_start_time(count) # last known start time from the .cmd file
+ if start_time_:
+ start_time = start_time_
else:
- Log.printlog('Could not get start time for {0}. Using current time as an approximation'.format(
- self.name), 3000)
- start_time = time.time()
- timestamp = date2str(datetime.datetime.now(), 'S')
-
- self.local_logs = (self.name + "." + timestamp +
- ".out", self.name + "." + timestamp + ".err")
-
+ start_time = self.start_time_placeholder if self.start_time_placeholder else time.time()
path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS')
f = open(path, 'a')
f.write(' ')
@@ -2320,52 +2211,58 @@ class Job(object):
children=self.children_names_str)
return True
- def write_end_time(self, completed,enabled = False):
+ def write_vertical_time(self, count=-1):
+ self.write_submit_time(enable_vertical_write=True)
+ self.write_start_time(enable_vertical_write=True, from_stat_file=True, count=count)
+ self.write_end_time(self.status == Status.COMPLETED, enable_vertical_write=True, count=count)
+
+ def write_end_time(self, completed, enable_vertical_write=False, count = -1):
"""
Writes ends date and time to TOTAL_STATS file
- :param enabled:
:param completed: True if job was completed successfully, False otherwise
:type completed: bool
"""
- if self.wrapper_type != "vertical" or enabled:
- self._platform.get_stat_file(self.name, retries=5)
- end_time = self.check_end_time()
- path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS')
- f = open(path, 'a')
- f.write(' ')
- finish_time = None
- final_status = None
- if len(str(end_time)) > 0:
- # noinspection PyTypeChecker
- f.write(date2str(datetime.datetime.fromtimestamp(float(end_time)), 'S'))
- # date2str(datetime.datetime.fromtimestamp(end_time), 'S')
- finish_time = end_time
- else:
- f.write(date2str(datetime.datetime.now(), 'S'))
- finish_time = time.time() # date2str(datetime.datetime.now(), 'S')
- f.write(' ')
- if completed:
- final_status = "COMPLETED"
- f.write('COMPLETED')
- else:
- final_status = "FAILED"
- f.write('FAILED')
- out, err = self.local_logs
- path_out = os.path.join(self._tmp_path, 'LOG_' + str(self.expid), out)
- # Launch first as simple non-threaded function
- exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR)
- job_data_dc = exp_history.write_finish_time(self.name, finish=finish_time, status=final_status, ncpus=self.processors,
- wallclock=self.wallclock, qos=self.queue, date=self.date, member=self.member, section=self.section, chunk=self.chunk,
- platform=self.platform_name, job_id=self.id, out_file=out, err_file=err, wrapper_queue=self._wrapper_queue,
- wrapper_code=get_job_package_code(self.expid, self.name), children=self.children_names_str)
+ if not enable_vertical_write and self.wrapper_type == "vertical":
+ return
+ end_time = self.check_end_time(count)
+ path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS')
+ f = open(path, 'a')
+ f.write(' ')
+ finish_time = None
+ final_status = None
+ if end_time > 0:
+ # noinspection PyTypeChecker
+ f.write(date2str(datetime.datetime.fromtimestamp(float(end_time)), 'S'))
+ self.finish_time_timestamp = date2str(datetime.datetime.fromtimestamp(end_time),'S')
+ # date2str(datetime.datetime.fromtimestamp(end_time), 'S')
+ finish_time = end_time
+ else:
+ f.write(date2str(datetime.datetime.now(), 'S'))
+ self.finish_time_timestamp = date2str(datetime.datetime.now(), 'S')
+ finish_time = time.time() # date2str(datetime.datetime.now(), 'S')
+ f.write(' ')
+ if completed:
+ final_status = "COMPLETED"
+ f.write('COMPLETED')
+ else:
+ final_status = "FAILED"
+ f.write('FAILED')
+ out, err = self.local_logs
+ path_out = os.path.join(self._tmp_path, 'LOG_' + str(self.expid), out)
+ # Launch first as simple non-threaded function
+ exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR)
+ job_data_dc = exp_history.write_finish_time(self.name, finish=finish_time, status=final_status, ncpus=self.processors,
+ wallclock=self.wallclock, qos=self.queue, date=self.date, member=self.member, section=self.section, chunk=self.chunk,
+ platform=self.platform_name, job_id=self.id, out_file=out, err_file=err, wrapper_queue=self._wrapper_queue,
+ wrapper_code=get_job_package_code(self.expid, self.name), children=self.children_names_str)
- # Launch second as threaded function only for slurm
- if job_data_dc and type(self.platform) is not str and self.platform.type == "slurm":
- thread_write_finish = Thread(target=ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR).write_platform_data_after_finish, args=(job_data_dc, self.platform))
- thread_write_finish.name = "JOB_data_{}".format(self.name)
- thread_write_finish.start()
+ # Launch second as threaded function only for slurm
+ if job_data_dc and type(self.platform) is not str and self.platform.type == "slurm":
+ thread_write_finish = Thread(target=ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR).write_platform_data_after_finish, args=(job_data_dc, self.platform))
+ thread_write_finish.name = "JOB_data_{}".format(self.name)
+ thread_write_finish.start()
- def write_total_stat_by_retries(self,total_stats, first_retrial = False):
+ def write_total_stat_by_retries(self, total_stats, first_retrial = False):
"""
Writes all data to TOTAL_STATS file
:param total_stats: data gathered by the wrapper
@@ -2374,8 +2271,6 @@ class Job(object):
:type first_retrial: bool
"""
- if first_retrial:
- self.write_submit_time(enabled=True)
path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS')
f = open(path, 'a')
if first_retrial:
@@ -2385,6 +2280,12 @@ class Job(object):
out, err = self.local_logs
path_out = os.path.join(self._tmp_path, 'LOG_' + str(self.expid), out)
# Launch first as simple non-threaded function
+
+ exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR)
+ exp_history.write_start_time(self.name, start=total_stats[0], status=Status.VALUE_TO_KEY.get(self.status, "UNKNOWN"), ncpus=self.processors,
+ wallclock=self.wallclock, qos=self.queue, date=self.date, member=self.member, section=self.section, chunk=self.chunk,
+ platform=self.platform_name, job_id=self.id, wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.expid, self.name),
+ children=self.children_names_str)
if not first_retrial:
exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR)
exp_history.write_submit_time(self.name, submit=total_stats[0], status=Status.VALUE_TO_KEY.get(self.status, "UNKNOWN"), ncpus=self.processors,
@@ -2392,12 +2293,6 @@ class Job(object):
platform=self.platform_name, job_id=self.id, wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.expid, self.name),
children=self.children_names_str)
exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR)
- exp_history.write_start_time(self.name, start=total_stats[0], status=Status.VALUE_TO_KEY.get(self.status, "UNKNOWN"), ncpus=self.processors,
- wallclock=self.wallclock, qos=self.queue, date=self.date, member=self.member, section=self.section, chunk=self.chunk,
- platform=self.platform_name, job_id=self.id, wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.expid, self.name),
- children=self.children_names_str)
-
- exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR)
job_data_dc = exp_history.write_finish_time(self.name, finish=total_stats[1], status=total_stats[2], ncpus=self.processors,
wallclock=self.wallclock, qos=self.queue, date=self.date, member=self.member, section=self.section, chunk=self.chunk,
platform=self.platform_name, job_id=self.id, out_file=out, err_file=err, wrapper_queue=self._wrapper_queue,
@@ -2470,7 +2365,7 @@ class Job(object):
def synchronize_logs(self, platform, remote_logs, local_logs, last = True):
platform.move_file(remote_logs[0], local_logs[0], True) # .out
platform.move_file(remote_logs[1], local_logs[1], True) # .err
- if last:
+ if last and local_logs[0] != "":
self.local_logs = local_logs
self.remote_logs = copy.deepcopy(local_logs)
@@ -2593,6 +2488,7 @@ class WrapperJob(Job):
if job.name in completed_files:
completed_jobs.append(job)
job.new_status = Status.COMPLETED
+ job.updated_log = False
job.update_status(self.as_config)
for job in completed_jobs:
self.running_jobs_start.pop(job, None)
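The status-update path above replaces the old per-job log-retrieval threads with a per-platform hand-off. A minimal sketch of the resulting dispatch, using only names that appear in the diff above (job, as_conf.platforms_data, add_job_to_log_recover):

    # Sketch: how a finished job's logs are now dispatched.
    def dispatch_log_retrieval(job, as_conf):
        platform_cfg = as_conf.platforms_data.get(job.platform.name, {})
        if str(platform_cfg.get('DISABLE_RECOVERY_THREADS', "false")).lower() == "true":
            job.retrieve_logfiles(job.platform)        # synchronous, in the main process
        else:
            job.platform.add_job_to_log_recover(job)   # queued for the background recovery process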
diff --git a/autosubmit/job/job_common.py b/autosubmit/job/job_common.py
index 69d54135278b0f4283d108382a38f760e3a1f6b7..3999a03b06695faaad2361b34006455ae84f1846 100644
--- a/autosubmit/job/job_common.py
+++ b/autosubmit/job/job_common.py
@@ -199,7 +199,7 @@ class StatisticsSnippetPython:
locale.setlocale(locale.LC_ALL, 'C')
job_name_ptrn = '%CURRENT_LOGDIR%/%JOBNAME%'
stat_file = open(job_name_ptrn + '_STAT', 'w')
- stat_file.write('{0:.0f}\\n'.format(time.time()))
+ stat_file.write('{0}\\n'.format(int(time.time())))
stat_file.close()
###################
# Autosubmit Checkpoint
@@ -228,7 +228,7 @@ class StatisticsSnippetPython:
###################
stat_file = open(job_name_ptrn + '_STAT', 'a')
- stat_file.write('{0:.0f}\\n'.format(time.time()))
+ stat_file.write('{0}\\n'.format(int(time.time())))
stat_file.close()
open(job_name_ptrn + '_COMPLETED', 'a').close()
exit(0)
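With the fix above, a job's _STAT file holds one integer epoch timestamp per line: the start time written when the script begins, and the finish time appended just before the COMPLETED marker is created. A minimal parsing sketch, assuming that two-line layout (parse_stat_file is illustrative, not part of the codebase):

    # Parse a <jobname>_STAT file into (start, finish) epoch integers.
    def parse_stat_file(path):
        with open(path) as stat_file:
            lines = [line.strip() for line in stat_file if line.strip()]
        start = int(lines[0]) if lines else None
        finish = int(lines[1]) if len(lines) > 1 else None
        return start, finish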
diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py
index f23ca4e7336774eb7ed730b84a47d497a43185ed..0cd615b786e5b49c7d73faef7945bf0074bc8326 100644
--- a/autosubmit/job/job_list.py
+++ b/autosubmit/job/job_list.py
@@ -25,6 +25,7 @@ from contextlib import suppress
from shutil import move
from threading import Thread
from typing import List, Dict
+from pathlib import Path
import math
import networkx as nx
@@ -32,6 +33,12 @@ from bscearth.utils.date import date2str, parse_date
from networkx import DiGraph
-from time import localtime, strftime, mktime
+from time import localtime, strftime, mktime, time
import autosubmit.database.db_structure as DbStructure
from autosubmit.helpers.data_transfer import JobRow
from autosubmit.job.job import Job
@@ -46,8 +53,6 @@ from autosubmitconfigparser.config.configcommon import AutosubmitConfig
from log.log import AutosubmitCritical, AutosubmitError, Log
-# Log.get_logger("Log.Autosubmit")
-
def threaded(fn):
def wrapper(*args, **kwargs):
@@ -97,6 +102,7 @@ class JobList(object):
self.graph = DiGraph()
self.depends_on_previous_chunk = dict()
self.depends_on_previous_split = dict()
+ self.path_to_logs = Path(BasicConfig.LOCAL_ROOT_DIR, self.expid, BasicConfig.LOCAL_TMP_DIR,f'LOG_{self.expid}')
@property
def expid(self):
@@ -1671,6 +1677,36 @@ class JobList(object):
else:
return completed_jobs
+ def get_completed_without_logs(self, platform=None):
+ """
+ Returns a list of completed jobs without updated logs
+
+ :param platform: job platform
+ :type platform: HPCPlatform
+ :return: completed jobs
+ :rtype: list
+ """
+
+ completed_jobs = [job for job in self._job_list if (platform is None or job.platform.name == platform.name) and
+ job.status == Status.COMPLETED and job.updated_log is False ]
+
+ return completed_jobs
+
def get_uncompleted(self, platform=None, wrapper=False):
"""
Returns a list of completed jobs
@@ -2499,6 +2535,31 @@ class JobList(object):
return jobs_to_check
+ def update_log_status(self, job, as_conf):
+ """
+ Updates the log err and log out.
+ """
+ if not hasattr(job,"updated_log") or not job.updated_log: # hasattr for backward compatibility (job.updated_logs is only for newer jobs, as the loaded ones may not have this set yet)
+ # order path_to_logs by name and get the two last element
+ log_file = False
+ if job.wrapper_type == "vertical" and job.fail_count > 0:
+ for log_recovered in self.path_to_logs.glob(f"{job.name}.*._{job.fail_count}.out"):
+ if job.local_logs[0][:-4] in log_recovered.name: # match on the name without the .out extension
+ log_file = True
+ break
+ else:
+ for log_recovered in self.path_to_logs.glob(f"{job.name}.*.out"):
+ if job.local_logs[0] == log_recovered.name:
+ log_file = True
+ break
+
+ if log_file:
+ if not hasattr(job, "ready_start_date") or not job.ready_start_date or job.local_logs[0] >= job.ready_start_date: # hasattr for backward compatibility
+ job.local_logs = (log_recovered.name, log_recovered.name[:-4] + ".err")
+ job.updated_log = True
+ if not job.updated_log and str(as_conf.platforms_data.get(job.platform.name, {}).get('DISABLE_RECOVERY_THREADS', "false")).lower() == "false":
+ job.platform.add_job_to_log_recover(job)
+
def update_list(self, as_conf, store_change=True, fromSetStatus=False, submitter=None, first_time=False):
# type: (AutosubmitConfig, bool, bool, object, bool) -> bool
"""
@@ -2578,6 +2639,8 @@ class JobList(object):
# Check checkpoint jobs, the status can be Any
for job in self.check_special_status():
job.status = Status.READY
+ # Record the ready start time as YYYYMMDDHHMMSS from the current time
+ job.ready_start_date = strftime("%Y%m%d%H%M%S")
job.id = None
job.packed = False
job.wrapper_type = None
@@ -2586,6 +2649,10 @@ class JobList(object):
# if waiting jobs has all parents completed change its State to READY
for job in self.get_completed():
job.packed = False
+ # Log name has this format:
+ # a02o_20000101_fc0_2_SIM.20240212115021.err
+ # $jobname.$(YYYYMMDDHHMMSS).err or .out
+ self.update_log_status(job, as_conf)
if job.synchronize is not None and len(str(job.synchronize)) > 0:
tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED]
if len(tmp) != len(job.parents):
@@ -2673,6 +2740,9 @@ class JobList(object):
if len(tmp2) == len(job.parents) and len(tmp3) != len(job.parents):
job.status = Status.READY
job.packed = False
+ # Record the ready start time as YYYYMMDDHHMMSS from the current time
+ job.ready_start_date = strftime("%Y%m%d%H%M%S")
job.hold = False
save = True
Log.debug(
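The update_log_status logic above keys everything on the local log naming convention, $jobname.$YYYYMMDDHHMMSS.out/.err (e.g. a02o_20000101_fc0_2_SIM.20240212115021.out). A condensed sketch of the matching step, assuming a Path to the LOG_<expid> directory (find_recovered_log is illustrative only):

    from pathlib import Path

    # Return the recovered .out log whose name matches the job's expected local log.
    def find_recovered_log(path_to_logs: Path, job_name: str, expected_out: str):
        for log_recovered in path_to_logs.glob(f"{job_name}.*.out"):
            if log_recovered.name == expected_out:
                return log_recovered
        return None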
diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py
index 9fc95fbf63b0be0dae844bd00678116f7a99a60c..d3eda6a82d9b0ff170e46ef16ff20f78a9a5169e 100644
--- a/autosubmit/job/job_packager.py
+++ b/autosubmit/job/job_packager.py
@@ -484,6 +484,8 @@ class JobPackager(object):
built_packages_tmp = list()
for param in self.wrapper_info:
current_info.append(param[self.current_wrapper_section])
+ current_info.append(self._as_config)
+
if self.wrapper_type[self.current_wrapper_section] == 'vertical':
built_packages_tmp = self._build_vertical_packages(jobs, wrapper_limits,wrapper_info=current_info)
elif self.wrapper_type[self.current_wrapper_section] == 'horizontal':
@@ -595,9 +597,8 @@ class JobPackager(object):
if job.packed is False:
job.packed = True
dict_jobs = self._jobs_list.get_ordered_jobs_by_date_member(self.current_wrapper_section)
- job_vertical_packager = JobPackagerVerticalMixed(dict_jobs, job, [job], job.wallclock, wrapper_limits["max"], wrapper_limits, self._platform.max_wallclock)
+ job_vertical_packager = JobPackagerVerticalMixed(dict_jobs, job, [job], job.wallclock, wrapper_limits["max"], wrapper_limits, self._platform.max_wallclock,wrapper_info=wrapper_info)
jobs_list = job_vertical_packager.build_vertical_package(job)
-
packages.append(JobPackageVertical(jobs_list, configuration=self._as_config,wrapper_section=self.current_wrapper_section,wrapper_info=wrapper_info))
else:
@@ -605,6 +606,7 @@ class JobPackager(object):
return packages
def _build_hybrid_package(self, jobs_list, wrapper_limits, section,wrapper_info={}):
+ self.wrapper_info = wrapper_info
jobs_resources = dict()
jobs_resources['MACHINEFILES'] = self._as_config.get_wrapper_machinefiles()
@@ -620,12 +622,12 @@ class JobPackager(object):
def _build_horizontal_vertical_package(self, horizontal_packager, section, jobs_resources):
total_wallclock = '00:00'
- horizontal_package = horizontal_packager.build_horizontal_package()
+ horizontal_package = horizontal_packager.build_horizontal_package(wrapper_info=self.wrapper_info)
horizontal_packager.create_sections_order(section)
horizontal_packager.add_sectioncombo_processors(
horizontal_packager.total_processors)
horizontal_package.sort(
- key=lambda job: horizontal_packager.sort_by_expression(job.name))
+ key=lambda job: horizontal_packager.sort_by_expression(job.section))
job = max(horizontal_package, key=attrgetter('total_wallclock'))
wallclock = job.wallclock
current_package = [horizontal_package]
@@ -663,7 +665,7 @@ class JobPackager(object):
dict_jobs = self._jobs_list.get_ordered_jobs_by_date_member(self.current_wrapper_section)
job_list = JobPackagerVerticalMixed(dict_jobs, job, [job], job.wallclock,
horizontal_packager.wrapper_limits["max"], horizontal_packager.wrapper_limits,
- self._platform.max_wallclock).build_vertical_package(job)
+ self._platform.max_wallclock,wrapper_info=self.wrapper_info).build_vertical_package(job)
current_package.append(list(set(job_list)))
for job in current_package[-1]:
@@ -718,6 +720,7 @@ class JobPackagerVertical(object):
child = self.get_wrappable_child(job)
# If not None, it is wrappable
if child is not None and len(str(child)) > 0:
+ child.update_parameters(self.wrapper_info[-1],{})
# Calculate total wallclock per possible wrapper
self.total_wallclock = sum_str_hours(
self.total_wallclock, child.wallclock)
@@ -856,14 +859,16 @@ class JobPackagerHorizontal(object):
self._maxTotalProcessors = 0
self._sectionList = list()
self._package_sections = dict()
-
- def build_horizontal_package(self, horizontal_vertical=False):
+ self.wrapper_info = []
+ def build_horizontal_package(self, horizontal_vertical=False,wrapper_info={}):
+ self.wrapper_info = wrapper_info
current_package = []
current_package_by_section = {}
if horizontal_vertical:
self._current_processors = 0
jobs_by_section = dict()
for job in self.job_list:
+ job.update_parameters(self.wrapper_info[-1],{})
if job.section not in jobs_by_section:
jobs_by_section[job.section] = list()
jobs_by_section[job.section].append(job)
@@ -918,9 +923,8 @@ class JobPackagerHorizontal(object):
max(self._package_sections.values()), self._maxTotalProcessors)
return True
- def sort_by_expression(self, jobname):
- jobname = jobname.split('_')[-1]
- return self._sort_order_dict[jobname]
+ def sort_by_expression(self, section):
+ return self._sort_order_dict[section]
def get_next_packages(self, jobs_sections, max_wallclock=None, potential_dependency=None, packages_remote_dependencies=list(), horizontal_vertical=False, max_procs=0):
packages = []
@@ -939,12 +943,13 @@ class JobPackagerHorizontal(object):
if other_parent.status != Status.COMPLETED and other_parent not in self.job_list:
wrappable = False
if wrappable and child not in next_section_list:
+ child.update_parameters(self.wrapper_info[-1],{})
next_section_list.append(child)
next_section_list.sort(
- key=lambda job: self.sort_by_expression(job.name))
+ key=lambda job: self.sort_by_expression(job.section))
self.job_list = next_section_list
- package_jobs = self.build_horizontal_package(horizontal_vertical)
+ package_jobs = self.build_horizontal_package(horizontal_vertical,wrapper_info=self.wrapper_info)
if package_jobs:
sections_aux = set()
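The sort_by_expression change above means horizontal packages are now ordered by each job's section position in the wrapper's sort-order dictionary, instead of by parsing the trailing token of the job name. A small sketch of the effect (the sort_order values are hypothetical):

    # Hypothetical section order, analogous to _sort_order_dict above.
    sort_order = {"SIM": 0, "POST": 1, "CLEAN": 2}

    def order_for_wrapper(jobs):
        # Group by section position; job names are no longer parsed.
        return sorted(jobs, key=lambda job: sort_order[job.section])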
diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py
index 5294193ec9093ee7133ff7cf2ae420432df69a25..581738da422fd0387fabe1a916cd04827c88da35 100644
--- a/autosubmit/job/job_packages.py
+++ b/autosubmit/job/job_packages.py
@@ -333,14 +333,15 @@ class JobPackageArray(JobPackageBase):
package_id = self.platform.submit_job(None, self._common_script, hold=hold, export = self.export)
- if package_id is None or not package_id:
+ if package_id is None or not package_id: # platforms that submit through a submit.cmd file assign job ids later
return
-
- for i in range(0, len(self.jobs)):
+ wrapper_time = None
+ for i in range(0, len(self.jobs)): # platforms without a submit.cmd: assign ids and stamp submit times now
Log.info("{0} submitted", self.jobs[i].name)
self.jobs[i].id = str(package_id) + '[{0}]'.format(i)
self.jobs[i].status = Status.SUBMITTED
- self.jobs[i].write_submit_time(hold=hold)
+ self.jobs[i].write_submit_time(hold=hold,wrapper_submit_time=wrapper_time)
+ wrapper_time = self.jobs[i].submit_time_timestamp # propagate the first job's submit timestamp, not the bound method
class JobPackageThread(JobPackageBase):
@@ -611,6 +612,7 @@ class JobPackageThread(JobPackageBase):
filenames += " " + self.platform.remote_log_dir + "/" + job.name + "_STAT " + \
self.platform.remote_log_dir + "/" + job.name + "_COMPLETED"
self.platform.remove_multiple_files(filenames)
+
else:
for job in self.jobs:
self.platform.remove_stat_file(job.name)
@@ -618,16 +620,18 @@ class JobPackageThread(JobPackageBase):
if hold:
job.hold = hold
+
package_id = self.platform.submit_job(None, self._common_script, hold=hold, export = self.export)
if package_id is None or not package_id:
return
-
- for i in range(0, len(self.jobs) ):
+ wrapper_time = None
+ for i in range(0, len(self.jobs)):
Log.info("{0} submitted", self.jobs[i].name)
self.jobs[i].id = str(package_id)
- self.jobs[i].status = Status.SUBMITTED
- self.jobs[i].write_submit_time(hold=hold)
+ self.jobs[i].status = Status.SUBMITTED
+ self.jobs[i].write_submit_time(hold=hold,wrapper_submit_time=wrapper_time)
+ wrapper_time = self.jobs[i].submit_time_timestamp # propagate the first job's submit timestamp, not the bound method
def _common_script_content(self):
pass
@@ -696,16 +700,19 @@ class JobPackageThreadWrapped(JobPackageThread):
if hold:
job.hold = hold
+
package_id = self.platform.submit_job(None, self._common_script, hold=hold, export = self.export)
if package_id is None or not package_id:
raise Exception('Submission failed')
-
+ wrapper_time = None
for i in range(0, len(self.jobs)):
Log.info("{0} submitted", self.jobs[i].name)
self.jobs[i].id = str(package_id)
- self.jobs[i].status = Status.SUBMITTED
- self.jobs[i].write_submit_time(hold=hold)
+ self.jobs[i].status = Status.SUBMITTED
+ self.jobs[i].write_submit_time(hold=hold,wrapper_submit_time=wrapper_time)
+ wrapper_time = self.jobs[i].submit_time_timestamp # propagate the first job's submit timestamp, not the bound method
+
class JobPackageVertical(JobPackageThread):
"""
Class to manage a vertical thread-based package of jobs to be submitted by autosubmit
@@ -713,7 +720,7 @@ class JobPackageVertical(JobPackageThread):
:type jobs:
:param: dependency:
"""
- def __init__(self, jobs, dependency=None,configuration=None,wrapper_section="WRAPPERS", wrapper_info = {}):
+ def __init__(self, jobs, dependency=None,configuration=None,wrapper_section="WRAPPERS", wrapper_info = []):
self._num_processors = 0
for job in jobs:
if int(job.processors) >= int(self._num_processors):
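The three submit loops above share one intent: the first job in a package stamps the submit time, and every later job reuses that timestamp, so all wrapped jobs report the same submission instant. A condensed sketch (after the submit_time_timestamp fix applied above):

    # All jobs in a package share the first job's submit timestamp.
    wrapper_time = None
    for job in jobs:
        job.write_submit_time(hold=False, wrapper_submit_time=wrapper_time)
        wrapper_time = job.submit_time_timestamp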
diff --git a/autosubmit/monitor/monitor.py b/autosubmit/monitor/monitor.py
index e1b9bb3b256434becb3868f449471303d69b6779..4b0afea1f672cfc5427d0d8554ec77980f4c30c9 100644
--- a/autosubmit/monitor/monitor.py
+++ b/autosubmit/monitor/monitor.py
@@ -453,6 +453,8 @@ class Monitor:
log_out = ""
log_err = ""
if job.status in [Status.FAILED, Status.COMPLETED]:
+ if type(job.local_logs) is not tuple:
+ job.local_logs = ("","")
log_out = path + "/" + job.local_logs[0]
log_err = path + "/" + job.local_logs[1]
diff --git a/autosubmit/platforms/ecplatform.py b/autosubmit/platforms/ecplatform.py
index fb880e694e26ec671b61235b2afde0b7e99b11b4..b023677a4ccae76a72d222bebe1b7d7085bef224 100644
--- a/autosubmit/platforms/ecplatform.py
+++ b/autosubmit/platforms/ecplatform.py
@@ -153,7 +153,7 @@ class EcPlatform(ParamikoPlatform):
export += " ; "
return export + self._submit_cmd + job_script
- def connect(self, reconnect=False):
+ def connect(self, as_conf, reconnect=False):
"""
In this case, it does nothing because connection is established for each command
@@ -170,7 +170,13 @@ class EcPlatform(ParamikoPlatform):
self.connected = False
except:
self.connected = False
- def restore_connection(self):
+ if not self.log_retrieval_process_active and (
+ as_conf is None or str(as_conf.platforms_data.get(self.name, {}).get('DISABLE_RECOVERY_THREADS',
+ "false")).lower() == "false"):
+ self.log_retrieval_process_active = True
+ self.recover_job_logs()
+
+ def restore_connection(self,as_conf):
"""
In this case, it does nothing because connection is established for each command
@@ -187,7 +193,8 @@ class EcPlatform(ParamikoPlatform):
self.connected = False
except:
self.connected = False
- def test_connection(self):
+
+ def test_connection(self,as_conf):
"""
In this case, it does nothing because connection is established for each command
diff --git a/autosubmit/platforms/locplatform.py b/autosubmit/platforms/locplatform.py
index 7f41060eb80398eddce42dca098ca6260a49fa5b..ae8c7dd6017377d243612356c4c191d412a5beb5 100644
--- a/autosubmit/platforms/locplatform.py
+++ b/autosubmit/platforms/locplatform.py
@@ -28,7 +28,7 @@ from autosubmit.platforms.headers.local_header import LocalHeader
from autosubmitconfigparser.config.basicconfig import BasicConfig
from time import sleep
from log.log import Log, AutosubmitError, AutosubmitCritical
-
+import threading
class LocalPlatform(ParamikoPlatform):
"""
Class to manage jobs to localhost
@@ -111,17 +111,27 @@ class LocalPlatform(ParamikoPlatform):
def get_checkjob_cmd(self, job_id):
return self.get_pscall(job_id)
- def connect(self, reconnect=False):
- self.connected = True
- def test_connection(self):
+ def connect(self, as_conf, reconnect=False):
self.connected = True
- def restore_connection(self):
+ if not self.log_retrieval_process_active and (
+ as_conf is None or str(as_conf.platforms_data.get(self.name, {}).get('DISABLE_RECOVERY_THREADS',"false")).lower() == "false"):
+ self.log_retrieval_process_active = True
+ self.recover_job_logs()
+
+
+ def test_connection(self,as_conf):
+ if not self.connected:
+ self.connect(as_conf)
+
+
+ def restore_connection(self,as_conf):
self.connected = True
def check_Alljobs(self, job_list, as_conf, retries=5):
for job,prev_job_status in job_list:
self.check_job(job)
- def send_command(self, command,ignore_log=False, x11 = False):
+
+ def send_command(self, command, ignore_log=False, x11 = False):
lang = locale.getlocale()[1]
if lang is None:
lang = locale.getdefaultlocale()[1]
@@ -175,7 +185,7 @@ class LocalPlatform(ParamikoPlatform):
return True
# Moves .err .out
- def check_file_exists(self, src, wrapper_failed=False, sleeptime=5, max_retries=3):
+ def check_file_exists(self, src, wrapper_failed=False, sleeptime=5, max_retries=3, first=True):
"""
Moves a file on the platform
:param src: source name
@@ -187,12 +197,17 @@ class LocalPlatform(ParamikoPlatform):
file_exist = False
remote_path = os.path.join(self.get_files_path(), src)
retries = 0
+ # first=False is used by vertical wrappers, which probe STAT_{MAX_LOGS}, then STAT_{MAX_LOGS-1}, and so on: do a single quick check
+ if not first:
+ max_retries = 1
+ sleeptime = 0
while not file_exist and retries < max_retries:
try:
file_exist = os.path.isfile(os.path.join(self.get_files_path(),src))
if not file_exist: # File doesn't exist, retry in sleep-time
- Log.debug("{2} File does not exist.. waiting {0}s for a new retry (retries left: {1})", sleeptime,
- max_retries - retries, remote_path)
+ if first:
+ Log.debug("{2} File does not exist.. waiting {0}s for a new retry (retries left: {1})", sleeptime,
+ max_retries - retries, remote_path)
if not wrapper_failed:
sleep(sleeptime)
sleeptime = sleeptime + 5
diff --git a/autosubmit/platforms/lsfplatform.py b/autosubmit/platforms/lsfplatform.py
index a03ec5dee262ed14507dd98361453148c61c3306..ed65c772d3304004bd1da59af57dceed9e5b3045 100644
--- a/autosubmit/platforms/lsfplatform.py
+++ b/autosubmit/platforms/lsfplatform.py
@@ -138,27 +138,4 @@ class LsfPlatform(ParamikoPlatform):
###############################################################################
""".format(filename, queue, project, wallclock, num_procs, dependency,
'\n'.ljust(13).join(str(s) for s in directives))
- # def connect(self):
- # """
- # In this case, it does nothing because connection is established for each command
- #
- # :return: True
- # :rtype: bool
- # """
- # self.connected = True
- # def restore_connection(self):
- # """
- # In this case, it does nothing because connection is established for each command
- #
- # :return: True
- # :rtype: bool
- # """
- # self.connected = True
- # def test_connection(self):
- # """
- # In this case, it does nothing because connection is established for each command
- #
- # :return: True
- # :rtype: bool
- # """
- # self.connected = True
+
diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py
index 8bb6ef2cc016b016444bf38cad405cdddcb679c3..4d9e7169ff18abb3efe0a8925ab7d708ccea2f61 100644
--- a/autosubmit/platforms/paramiko_platform.py
+++ b/autosubmit/platforms/paramiko_platform.py
@@ -1,5 +1,6 @@
+import copy
+
import locale
-from binascii import hexlify
from contextlib import suppress
from time import sleep
import sys
@@ -7,7 +8,6 @@ import socket
import os
import paramiko
import datetime
-import time
import select
import re
from datetime import timedelta
@@ -15,17 +15,17 @@ import random
from autosubmit.job.job_common import Status
from autosubmit.job.job_common import Type
from autosubmit.platforms.platform import Platform
-from bscearth.utils.date import date2str
from log.log import AutosubmitError, AutosubmitCritical, Log
from paramiko.ssh_exception import (SSHException)
import Xlib.support.connect as xlib_connect
from threading import Thread
+import threading
import getpass
from paramiko.agent import Agent
def threaded(fn):
def wrapper(*args, **kwargs):
- thread = Thread(target=fn, args=args, kwargs=kwargs)
+ thread = Thread(target=fn, args=args, kwargs=kwargs, name=f"{args[0].name}_X11")
thread.start()
return thread
@@ -115,7 +115,7 @@ class ParamikoPlatform(Platform):
self.local_x11_display = xlib_connect.get_display(display)
- def test_connection(self):
+ def test_connection(self,as_conf):
"""
Test if the connection is still alive, reconnect if not.
"""
@@ -123,7 +123,7 @@ class ParamikoPlatform(Platform):
if not self.connected:
self.reset()
try:
- self.restore_connection()
+ self.restore_connection(as_conf)
message = "OK"
except BaseException as e:
message = str(e)
@@ -134,6 +134,7 @@ class ParamikoPlatform(Platform):
except:
message = "Timeout connection"
return message
+
except EOFError as e:
self.connected = False
raise AutosubmitError("[{0}] not alive. Host: {1}".format(
@@ -146,13 +147,13 @@ class ParamikoPlatform(Platform):
raise AutosubmitCritical(str(e),7051)
#raise AutosubmitError("[{0}] connection failed for host: {1}".format(self.name, self.host), 6002, e.message)
- def restore_connection(self):
+ def restore_connection(self, as_conf):
try:
self.connected = False
retries = 2
retry = 0
try:
- self.connect()
+ self.connect(as_conf)
except Exception as e:
if ',' in self.host:
Log.printlog("Connection Failed to {0}, will test another host".format(
@@ -162,7 +163,7 @@ class ParamikoPlatform(Platform):
"First connection to {0} is failed, check host configuration or try another login node ".format(self.host), 7050,str(e))
while self.connected is False and retry < retries:
try:
- self.connect(True)
+ self.connect(as_conf,True)
except Exception as e:
pass
retry += 1
@@ -193,7 +194,8 @@ class ParamikoPlatform(Platform):
key.public_blob = None
self._ssh.connect(self._host_config['hostname'], port=port, username=self.user, timeout=60, banner_timeout=60)
except BaseException as e:
- Log.warning(f'Failed to authenticate with ssh-agent due to {e}')
+ Log.debug(f'Failed to authenticate with ssh-agent due to {e}')
+ Log.debug('Trying to authenticate with other methods')
return False
return True
@@ -224,7 +226,7 @@ class ParamikoPlatform(Platform):
# pass
return tuple(answers)
- def connect(self, reconnect=False):
+ def connect(self, as_conf, reconnect=False):
"""
Creates ssh connection to host
@@ -266,7 +268,7 @@ class ParamikoPlatform(Platform):
except Exception as e:
self._ssh.connect(self._host_config['hostname'], port, username=self.user,
key_filename=self._host_config_id, sock=self._proxy, timeout=60,
- banner_timeout=60,disabled_algorithms={'pubkeys': ['rsa-sha2-256', 'rsa-sha2-512']})
+ banner_timeout=60, disabled_algorithms={'pubkeys': ['rsa-sha2-256', 'rsa-sha2-512']})
else:
try:
self._ssh.connect(self._host_config['hostname'], port, username=self.user,
@@ -300,7 +302,10 @@ class ParamikoPlatform(Platform):
self._ftpChannel = paramiko.SFTPClient.from_transport(self.transport,window_size=pow(4, 12) ,max_packet_size=pow(4, 12) )
self._ftpChannel.get_channel().settimeout(120)
self.connected = True
- except SSHException as e:
+ if not self.log_retrieval_process_active and (as_conf is None or str(as_conf.platforms_data.get(self.name, {}).get('DISABLE_RECOVERY_THREADS', "false")).lower() == "false"):
+ self.log_retrieval_process_active = True
+ self.recover_job_logs()
+ except SSHException:
raise
except IOError as e:
if "refused" in str(e.strerror).lower():
@@ -315,7 +320,7 @@ class ParamikoPlatform(Platform):
raise AutosubmitCritical("Authentication Failed, please check the platform.conf of {0}".format(
self._host_config['hostname']), 7050, str(e))
if not reconnect and "," in self._host_config['hostname']:
- self.restore_connection()
+ self.restore_connection(as_conf)
else:
raise AutosubmitError(
"Couldn't establish a connection to the specified host, wrong configuration?", 6003, str(e))
@@ -473,7 +478,10 @@ class ParamikoPlatform(Platform):
path_root = self.get_files_path()
src = os.path.join(path_root, src)
dest = os.path.join(path_root, dest)
- self._ftpChannel.rename(src,dest)
+ try:
+ self._ftpChannel.stat(dest)
+ except IOError:
+ self._ftpChannel.rename(src,dest)
return True
except IOError as e:
@@ -644,6 +652,9 @@ class ParamikoPlatform(Platform):
job_status = Status.UNKNOWN
Log.error(
'check_job() The job id ({0}) status is {1}.', job_id, job_status)
+
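+ # A final state invalidates the cached logs so the recovery process fetches them again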
+ if job_status in [Status.FAILED, Status.COMPLETED]:
+ job.updated_log = False
if submit_hold_check:
return job_status
else:
@@ -775,7 +786,6 @@ class ParamikoPlatform(Platform):
elif retries == 0:
job_status = Status.COMPLETED
job.update_status(as_conf)
-
else:
job_status = Status.UNKNOWN
Log.error(
@@ -887,6 +897,7 @@ class ParamikoPlatform(Platform):
sys.stdout.write(session.recv(4096))
while session.recv_stderr_ready():
sys.stderr.write(session.recv_stderr(4096))
+
@threaded
def x11_status_checker(self, session, session_fileno):
self.transport.accept()
@@ -967,7 +978,7 @@ class ParamikoPlatform(Platform):
except paramiko.SSHException as e:
if str(e) in "SSH session not active":
self._ssh = None
- self.restore_connection()
+ self.restore_connection(None)
timeout = timeout + 60
retries = retries - 1
if retries <= 0:
@@ -1325,16 +1336,6 @@ class ParamikoPlatform(Platform):
if self.transport:
self.transport.close()
self.transport.stop_thread()
- with suppress(Exception):
- del self._ssh._agent # May not be in all runs
- with suppress(Exception):
- del self._ssh._transport
- with suppress(Exception):
- del self._ftpChannel
- with suppress(Exception):
- del self.transport
- with suppress(Exception):
- del self._ssh
def check_tmp_exists(self):
try:
@@ -1366,8 +1367,6 @@ class ParamikoPlatform(Platform):
"""
Creates log dir on remote host
"""
-
-
try:
if self.send_command(self.get_mkdir_cmd()):
Log.debug('{0} has been created on {1} .',
diff --git a/autosubmit/platforms/pbsplatform.py b/autosubmit/platforms/pbsplatform.py
index 132b8715c03cdddd367669af807384a8134a933e..1a1ef89b5cd6c27c537aaa971c371876504b8fc1 100644
--- a/autosubmit/platforms/pbsplatform.py
+++ b/autosubmit/platforms/pbsplatform.py
@@ -129,27 +129,4 @@ class PBSPlatform(ParamikoPlatform):
return self._checkjob_cmd + str(job_id)
else:
return "ssh " + self.host + " " + self.get_qstatjob(job_id)
- # def connect(self):
- # """
- # In this case, it does nothing because connection is established for each command
- #
- # :return: True
- # :rtype: bool
- # """
- # self.connected = True
- # def restore_connection(self):
- # """
- # In this case, it does nothing because connection is established for each command
- #
- # :return: True
- # :rtype: bool
- # """
- # self.connected = True
- # def test_connection(self):
- # """
- # In this case, it does nothing because connection is established for each command
- #
- # :return: True
- # :rtype: bool
- # """
- # self.connected = True
+
diff --git a/autosubmit/platforms/pjmplatform.py b/autosubmit/platforms/pjmplatform.py
index 9014cd6a5b5544a490a1b1d7730672370da360bb..9e182c5c0fb229baa92731a8b4e68b31dde81c3f 100644
--- a/autosubmit/platforms/pjmplatform.py
+++ b/autosubmit/platforms/pjmplatform.py
@@ -463,9 +463,13 @@ class PJMPlatform(ParamikoPlatform):
def allocated_nodes():
return """os.system("scontrol show hostnames $SLURM_JOB_NODELIST > node_list_{0}".format(node_id))"""
- def check_file_exists(self, filename, wrapper_failed=False, sleeptime=5, max_retries=3):
+ def check_file_exists(self, filename, wrapper_failed=False, sleeptime=5, max_retries=3, first=True):
file_exist = False
retries = 0
+ # first=False is used by vertical wrappers, which probe STAT_{MAX_LOGS}, then STAT_{MAX_LOGS-1}, and so on: do a single quick check
+ if not first:
+ max_retries = 1
+ sleeptime = 0
while not file_exist and retries < max_retries:
try:
# This return IOError if path doesn't exist
@@ -473,8 +477,9 @@ class PJMPlatform(ParamikoPlatform):
self.get_files_path(), filename))
file_exist = True
except IOError as e: # File doesn't exist, retry in sleeptime
- Log.debug("{2} File does not exist.. waiting {0}s for a new retry (retries left: {1})", sleeptime,
- max_retries - retries, os.path.join(self.get_files_path(), filename))
+ if first:
+ Log.debug("{2} File does not exist.. waiting {0}s for a new retry (retries left: {1})", sleeptime,
+ max_retries - retries, os.path.join(self.get_files_path(), filename))
if not wrapper_failed:
sleep(sleeptime)
sleeptime = sleeptime + 5
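The first flag added to check_file_exists above turns a non-first probe into a single, silent attempt (max_retries=1, sleeptime=0), which suits the vertical-wrapper recovery as it walks the numbered STAT files. A sketch of such a walk, under the assumption that the highest existing count is wanted (newest_stat_count is illustrative only):

    # Probe <job>_STAT_{max_logs} .. <job>_STAT_0 without retry delays.
    def newest_stat_count(platform, job_name, max_logs):
        for count in range(max_logs, -1, -1):
            if platform.check_file_exists(f"{job_name}_STAT_{count}",
                                          first=(count == max_logs)):
                return count
        return None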
diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py
index 05340a526ccce9f2b51ecfa6925ebc5be2763584..ac3d09eabc093bae281a14012840b715ac6bfcb6 100644
--- a/autosubmit/platforms/platform.py
+++ b/autosubmit/platforms/platform.py
@@ -1,6 +1,11 @@
+import copy
+import queue
+import time
import locale
import os
-from pathlib import Path
import traceback
from autosubmit.job.job_common import Status
@@ -8,7 +13,16 @@ from typing import List, Union
from autosubmit.helpers.parameters import autosubmit_parameter
from log.log import AutosubmitCritical, AutosubmitError, Log
-import getpass
+from multiprocessing import Process, Queue
+
+
+def processed(fn):
+ def wrapper(*args, **kwargs):
+ process = Process(target=fn, args=args, kwargs=kwargs, name=f"{args[0].name}_platform")
+ process.start()
+ return process
+
+ return wrapper
class Platform(object):
"""
Class to manage the connections to the different platforms.
@@ -78,6 +92,8 @@ class Platform(object):
self.pw = auth_password
else:
self.pw = None
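+ # Per-platform queue consumed by the background log-recovery process (see recover_job_logs)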
+ self.recovery_queue = Queue()
+ self.log_retrieval_process_active = False
@property
@@ -272,6 +288,7 @@ class Platform(object):
for innerJob in package._jobs:
# Setting status to COMPLETED, so it does not get stuck in the loop that calls this function
innerJob.status = Status.COMPLETED
+ innerJob.updated_log = False
# If called from RUN or inspect command
if not only_wrappers:
@@ -318,6 +335,7 @@ class Platform(object):
raise
except Exception as e:
raise
+
return save, failed_packages, error_message, valid_packages_to_submit
@property
@@ -624,10 +642,10 @@ class Platform(object):
if self.check_file_exists(filename):
self.delete_file(filename)
- def check_file_exists(self, src, wrapper_failed=False, sleeptime=5, max_retries=3):
+ def check_file_exists(self, src, wrapper_failed=False, sleeptime=5, max_retries=3, first=True):
return True
- def get_stat_file(self, job_name, retries=0):
+ def get_stat_file(self, job_name, retries=0, count = -1):
"""
Copies *STAT* files from remote to local
@@ -638,7 +656,10 @@ class Platform(object):
:return: True if successful, False otherwise
:rtype: bool
"""
- filename = job_name + '_STAT'
+ if count == -1: # No internal retrials
+ filename = job_name + '_STAT'
+ else:
+ filename = job_name + '_STAT_{0}'.format(str(count))
stat_local_path = os.path.join(
self.config.get("LOCAL_ROOT_DIR"), self.expid, self.config.get("LOCAL_TMP_DIR"), filename)
if os.path.exists(stat_local_path):
@@ -650,46 +671,6 @@ class Platform(object):
Log.debug('{0}_STAT file not found', job_name)
return False
- def check_stat_file_by_retrials(self, job_name, retries=0):
- """
- check *STAT* file
-
- :param retries: number of intents to get the completed files
- :type retries: int
- :param job_name: name of job to check
- :type job_name: str
- :return: True if successful, False otherwise
- :rtype: bool
- """
- filename = job_name
- if self.check_file_exists(filename):
- return True
- else:
- return False
-
- def get_stat_file_by_retrials(self, job_name, retries=0):
- """
- Copies *STAT* files from remote to local
-
- :param retries: number of intents to get the completed files
- :type retries: int
- :param job_name: name of job to check
- :type job_name: str
- :return: True if successful, False otherwise
- :rtype: bool
- """
- filename = job_name
- stat_local_path = os.path.join(
- self.config.get("LOCAL_ROOT_DIR"), self.expid, self.config.get("LOCAL_TMP_DIR"), filename)
- if os.path.exists(stat_local_path):
- os.remove(stat_local_path)
- if self.check_file_exists(filename):
- if self.get_file(filename, True):
- return True
- else:
- return False
- else:
- return False
@autosubmit_parameter(name='current_logdir')
def get_files_path(self):
@@ -821,3 +802,49 @@ class Platform(object):
"""
raise NotImplementedError
+ def add_job_to_log_recover(self, job):
+ self.recovery_queue.put((job,job.children))
+
+ def connect(self, as_conf, reconnect=False):
+ raise NotImplementedError
+
+ def restore_connection(self,as_conf):
+ raise NotImplementedError
+
+ @processed
+ def recover_job_logs(self):
+ job_names_processed = set()
+ self.connected = False
+ self.restore_connection(None)
+ while True:
+ try:
+ job, children = self.recovery_queue.get(timeout=60) # block with a timeout so queue.Empty is actually reachable
+ if job.wrapper_type != "vertical":
+ if f'{job.name}_{job.fail_count}' in job_names_processed:
+ continue
+ else:
+ if f'{job.name}' in job_names_processed:
+ continue
+ job.children = children
+ job.platform = self
+ try:
+ job.retrieve_logfiles(self, raise_error=True)
+ if job.wrapper_type != "vertical":
+ job_names_processed.add(f'{job.name}_{job.fail_count}')
+ else:
+ job_names_processed.add(f'{job.name}')
+ except:
+ pass
+ except queue.Empty:
+ pass
+ except (IOError, OSError):
+ pass
+ except Exception as e:
+ try:
+ self.restore_connection(None)
+ except:
+ pass
+ time.sleep(1)
+
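The @processed decorator and recovery_queue above form a per-platform producer/consumer: the main process enqueues finished jobs, and a dedicated child process drains the queue and retrieves logs. A self-contained sketch of the same pattern, with nothing autosubmit-specific in it:

    import time
    import queue
    from multiprocessing import Process, Queue

    def consumer(q):
        # Drain items forever, tolerating idle periods like recover_job_logs does.
        while True:
            try:
                item = q.get(timeout=60)
                print(f"retrieving logs for {item}")
            except queue.Empty:
                continue

    if __name__ == "__main__":
        q = Queue()
        Process(target=consumer, args=(q,), name="demo_platform", daemon=True).start()
        q.put("a02o_20000101_fc0_2_SIM")
        time.sleep(2)  # give the child a moment before the daemon is reaped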
diff --git a/autosubmit/platforms/sgeplatform.py b/autosubmit/platforms/sgeplatform.py
index 58671cd98896fcd2c32f8c086dd73b51878f8f3b..875d455996b6fe1b1f102cd9f68465a142b058ac 100644
--- a/autosubmit/platforms/sgeplatform.py
+++ b/autosubmit/platforms/sgeplatform.py
@@ -114,7 +114,7 @@ class SgePlatform(ParamikoPlatform):
def get_checkjob_cmd(self, job_id):
return self.get_qstatjob(job_id)
- def connect(self,reconnect=False):
+ def connect(self, as_conf, reconnect=False):
"""
In this case, it does nothing because connection is established for each command
@@ -122,7 +122,12 @@ class SgePlatform(ParamikoPlatform):
:rtype: bool
"""
self.connected = True
- def restore_connection(self):
+ if not self.log_retrieval_process_active and (
+ as_conf is None or str(as_conf.platforms_data.get(self.name, {}).get('DISABLE_RECOVERY_THREADS',
+ "false")).lower() == "false"):
+ self.log_retrieval_process_active = True
+ self.recover_job_logs()
+ def restore_connection(self,as_conf):
"""
In this case, it does nothing because connection is established for each command
@@ -130,7 +135,8 @@ class SgePlatform(ParamikoPlatform):
:rtype: bool
"""
self.connected = True
- def test_connection(self):
+
+ def test_connection(self,as_conf):
"""
In this case, it does nothing because connection is established for each command
@@ -138,3 +144,5 @@ class SgePlatform(ParamikoPlatform):
:rtype: bool
"""
self.connected = True
+ self.connect(as_conf, True)
+
diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py
index e741239dbc56fac949395e60b2c31d7db302f76b..c52a6c0e1c3bb20be24c67efe9ce6d880f7de1df 100644
--- a/autosubmit/platforms/slurmplatform.py
+++ b/autosubmit/platforms/slurmplatform.py
@@ -170,7 +170,6 @@ class SlurmPlatform(ParamikoPlatform):
job.hold = hold
job.id = str(jobs_id[i])
job.status = Status.SUBMITTED
- job.write_submit_time(hold=hold)
# Check if there are duplicated jobnames
if not duplicated_jobs_already_checked:
job_name = package.name if hasattr(package, "name") else package.jobs[0].name
@@ -629,9 +628,13 @@ class SlurmPlatform(ParamikoPlatform):
def allocated_nodes():
return """os.system("scontrol show hostnames $SLURM_JOB_NODELIST > node_list_{0}".format(node_id))"""
- def check_file_exists(self, filename, wrapper_failed=False, sleeptime=5, max_retries=3):
+ def check_file_exists(self, filename, wrapper_failed=False, sleeptime=5, max_retries=3, first=True):
file_exist = False
retries = 0
+ # first=False is used by vertical wrappers, which probe STAT_{MAX_LOGS}, then STAT_{MAX_LOGS-1}, and so on: do a single quick check
+ if not first:
+ max_retries = 1
+ sleeptime = 0
while not file_exist and retries < max_retries:
try:
# This return IOError if path doesn't exist
@@ -639,8 +642,9 @@ class SlurmPlatform(ParamikoPlatform):
self.get_files_path(), filename))
file_exist = True
except IOError as e: # File doesn't exist, retry in sleeptime
- Log.debug("{2} File does not exist.. waiting {0}s for a new retry (retries left: {1})", sleeptime,
- max_retries - retries, os.path.join(self.get_files_path(), filename))
+ if first:
+ Log.debug("{2} File does not exist.. waiting {0}s for a new retry (retries left: {1})", sleeptime,
+ max_retries - retries, os.path.join(self.get_files_path(), filename))
if not wrapper_failed:
sleep(sleeptime)
sleeptime = sleeptime + 5
diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py
index df348f6dd7b1bc07fa5607f89934b134a1690aea..d40a985d1b2fc52907b8c189dff055836142aa65 100644
--- a/autosubmit/platforms/wrappers/wrapper_builder.py
+++ b/autosubmit/platforms/wrappers/wrapper_builder.py
@@ -451,12 +451,12 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder):
for i in range(len({0})):
job_retrials = retrials
completed = False
- while job_retrials >= 0 and not completed:
+ fail_count = 0
+ while fail_count <= job_retrials and not completed:
current = {1}
current.start()
- os.system("echo "+str(time.time())+" > "+scripts[i][:-4]+"_STAT_"+str(job_retrials)) #Start/submit running
+ os.system("echo "+str(int(time.time()))+" > "+scripts[i][:-4]+"_STAT_"+str(fail_count)) #Start/submit running
current.join({3})
- job_retrials = job_retrials - 1
total_steps = total_steps + 1
""").format(jobs_list, thread,self.retrials,str(self.wallclock_by_level),'\n'.ljust(13))
@@ -467,15 +467,17 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder):
failed_filename = {0}[i].replace('.cmd', '_FAILED')
failed_path = os.path.join(os.getcwd(), failed_filename)
failed_wrapper = os.path.join(os.getcwd(), wrapper_id)
- os.system("echo "+str(time.time())+" >> "+scripts[i][:-4]+"_STAT_"+str(job_retrials+1)) #Completed
+ os.system("echo "+str(int(time.time()))+" >> "+scripts[i][:-4]+"_STAT_"+str(fail_count)) #Completed
if os.path.exists(completed_path):
completed = True
print(datetime.now(), "The job ", current.template," has been COMPLETED")
- os.system("echo COMPLETED >> " + scripts[i][:-4]+"_STAT_"+str(job_retrials+1))
+ os.system("echo COMPLETED >> " + scripts[i][:-4]+"_STAT_"+str(fail_count))
else:
print(datetime.now(), "The job ", current.template," has FAILED")
- os.system("echo FAILED >> " + scripts[i][:-4]+"_STAT_"+str(job_retrials+1))
+ os.system("echo FAILED >> " + scripts[i][:-4]+"_STAT_"+str(fail_count))
#{1}
+ fail_count = fail_count + 1
+
""").format(jobs_list, self.exit_thread, '\n'.ljust(13)), 8)
sequential_threads_launcher += self._indent(textwrap.dedent("""
if not os.path.exists(completed_path):
@@ -493,17 +495,17 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder):
def build_job_thread(self): # fastlook
return textwrap.dedent("""
class JobThread(Thread):
- def __init__ (self, template, id_run, retrials):
+ def __init__ (self, template, id_run, retrials, fail_count):
Thread.__init__(self)
self.template = template
self.id_run = id_run
self.retrials = retrials
+ self.fail_count = fail_count
def run(self):
jobname = self.template.replace('.cmd', '')
- #os.system("echo $(date +%s) > "+jobname+"_STAT")
- out = str(self.template) + ".out." + str(self.retrials)
- err = str(self.template) + ".err." + str(self.retrials)
+ out = str(self.template) + ".out." + str(self.fail_count)
+ err = str(self.template) + ".err." + str(self.fail_count)
print((out+"\\n"))
command = "./" + str(self.template) + " " + str(self.id_run) + " " + os.getcwd()
print((command+"\\n"))
@@ -515,7 +517,7 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder):
""").format(str(self.wallclock_by_level),'\n'.ljust(13))
def build_main(self):
self.exit_thread = "os._exit(1)"
- return self.build_sequential_threads_launcher("scripts", "JobThread(scripts[i], i, job_retrials)")
+ return self.build_sequential_threads_launcher("scripts", "JobThread(scripts[i], i, retrials, fail_count)")
class PythonHorizontalWrapperBuilder(PythonWrapperBuilder):
def build_main(self):
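With the fail_count change above, each inner retrial writes its own numbered STAT file, counting up from 0 instead of down from the retrial budget. A schematic of the names produced for one inner job (script name and retrial count are hypothetical):

    # For a02o_SIM.cmd with retrials = 2, the wrapper writes:
    # a02o_SIM_STAT_0, a02o_SIM_STAT_1, a02o_SIM_STAT_2
    script = "a02o_SIM.cmd"
    retrials = 2
    for fail_count in range(retrials + 1):
        print(script[:-4] + "_STAT_" + str(fail_count))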
diff --git a/docs/source/userguide/configure/develop_a_project.rst b/docs/source/userguide/configure/develop_a_project.rst
index 7621b29d085e6cc1b62b975fe1ffd7ac552d4f36..74786fda59a80f6224041f3d327066d168212a07 100644
--- a/docs/source/userguide/configure/develop_a_project.rst
+++ b/docs/source/userguide/configure/develop_a_project.rst
@@ -121,7 +121,10 @@ Autosubmit configuration
TOTALJOBS: 6
# Time (seconds) between connections to the HPC queue scheduler to poll already submitted jobs status
# Default:10
- SAFETYSLEEPTIME:10
+ SAFETYSLEEPTIME: 10
+ # Maximum time (seconds) to wait for the last logs to be retrieved before ending the run.
+ # Default:180
+ LAST_LOGS_TIMEOUT: 180
- # Number of retrials if a job fails. Can ve override at job level
+ # Number of retrials if a job fails. Can be overridden at job level
# Default:0
RETRIALS:0
diff --git a/test/unit/test_wrappers.py b/test/unit/test_wrappers.py
index c005020b87149a6862fff5447a2315d7c440b2ae..62ff9bc8d8e0405a6c8b4c803c714b512560b149 100644
--- a/test/unit/test_wrappers.py
+++ b/test/unit/test_wrappers.py
@@ -172,6 +172,7 @@ class TestWrappers(TestCase):
self.temp_directory = tempfile.mkdtemp()
self.job_list = JobList(self.experiment_id, self.config, YAMLParserFactory(),
JobListPersistenceDb(self.temp_directory, 'db'),self.as_conf)
+
self.parser_mock = MagicMock(spec='SafeConfigParser')
self._platform.max_waiting_jobs = 100
@@ -200,6 +201,8 @@ class TestWrappers(TestCase):
self.job_packager = JobPackager(
self.as_conf, self._platform, self.job_list)
self.job_list._ordered_jobs_by_date_member["WRAPPERS"] = dict()
+ self.wrapper_info = ['vertical', 'flexible', 'asthread', ['SIM'], 0,self.as_conf]
+
def tearDown(self) -> None:
shutil.rmtree(self.temp_directory)
@@ -272,8 +275,10 @@ class TestWrappers(TestCase):
wrapper_limits["min_v"] = 2
wrapper_limits["min_h"] = 2
wrapper_limits["max_by_section"] = max_wrapped_job_by_section
+
+
returned_packages = self.job_packager._build_vertical_packages(
- section_list, wrapper_limits)
+ section_list, wrapper_limits, self.wrapper_info)
package_m1_s2 = [d1_m1_1_s2, d1_m1_2_s2, d1_m1_3_s2, d1_m1_4_s2, d1_m1_5_s2, d1_m1_6_s2, d1_m1_7_s2, d1_m1_8_s2,
d1_m1_9_s2, d1_m1_10_s2]
@@ -354,7 +359,7 @@ class TestWrappers(TestCase):
wrapper_limits["min_h"] = 2
wrapper_limits["max_by_section"] = max_wrapped_job_by_section
returned_packages = self.job_packager._build_vertical_packages(
- section_list, wrapper_limits)
+ section_list, wrapper_limits, wrapper_info=self.wrapper_info)
package_m1_s2 = [d1_m1_1_s2, d1_m1_2_s2, d1_m1_3_s2, d1_m1_4_s2, d1_m1_5_s2, d1_m1_6_s2, d1_m1_7_s2, d1_m1_8_s2,
d1_m1_9_s2, d1_m1_10_s2]
@@ -362,7 +367,7 @@ class TestWrappers(TestCase):
d1_m2_9_s2, d1_m2_10_s2]
packages = [JobPackageVertical(
- package_m1_s2,configuration=self.as_conf), JobPackageVertical(package_m2_s2,configuration=self.as_conf)]
+ package_m1_s2,configuration=self.as_conf, wrapper_info=self.wrapper_info), JobPackageVertical(package_m2_s2,configuration=self.as_conf, wrapper_info=self.wrapper_info)]
for i in range(0, len(returned_packages)):
self.assertListEqual(returned_packages[i]._jobs, packages[i]._jobs)
@@ -424,7 +429,7 @@ class TestWrappers(TestCase):
wrapper_limits["min_h"] = 2
wrapper_limits["max_by_section"] = max_wrapped_job_by_section
returned_packages = self.job_packager._build_vertical_packages(
- section_list, wrapper_limits)
+ section_list, wrapper_limits, self.wrapper_info)
package_m1_s2 = [d1_m1_1_s2, d1_m1_2_s2,
d1_m1_3_s2, d1_m1_4_s2, d1_m1_5_s2]
@@ -432,7 +437,7 @@ class TestWrappers(TestCase):
d1_m2_3_s2, d1_m2_4_s2, d1_m2_5_s2]
packages = [JobPackageVertical(
- package_m1_s2,configuration=self.as_conf), JobPackageVertical(package_m2_s2,configuration=self.as_conf)]
+ package_m1_s2,configuration=self.as_conf,wrapper_info=self.wrapper_info), JobPackageVertical(package_m2_s2,configuration=self.as_conf,wrapper_info=self.wrapper_info)]
#returned_packages = returned_packages[0]
for i in range(0, len(returned_packages)):
@@ -495,7 +500,7 @@ class TestWrappers(TestCase):
wrapper_limits["min_h"] = 2
wrapper_limits["max_by_section"] = max_wrapped_job_by_section
returned_packages = self.job_packager._build_vertical_packages(
- section_list, wrapper_limits)
+ section_list, wrapper_limits, wrapper_info=self.wrapper_info)
package_m1_s2 = [d1_m1_1_s2, d1_m1_2_s2,
d1_m1_3_s2, d1_m1_4_s2, d1_m1_5_s2]
@@ -503,7 +508,7 @@ class TestWrappers(TestCase):
d1_m2_3_s2, d1_m2_4_s2, d1_m2_5_s2]
packages = [JobPackageVertical(
- package_m1_s2,configuration=self.as_conf), JobPackageVertical(package_m2_s2,configuration=self.as_conf)]
+ package_m1_s2,configuration=self.as_conf, wrapper_info=self.wrapper_info), JobPackageVertical(package_m2_s2,configuration=self.as_conf, wrapper_info=self.wrapper_info)]
#returned_packages = returned_packages[0]
for i in range(0, len(returned_packages)):
@@ -646,7 +651,7 @@ class TestWrappers(TestCase):
wrapper_limits["min_h"] = 2
wrapper_limits["max_by_section"] = max_wrapped_job_by_section
returned_packages = self.job_packager._build_vertical_packages(
- section_list, wrapper_limits)
+ section_list, wrapper_limits, wrapper_info=self.wrapper_info)
package_m1_s2_s3 = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3, d1_m1_3_s2, d1_m1_3_s3, d1_m1_4_s2,
d1_m1_4_s3]
@@ -654,7 +659,7 @@ class TestWrappers(TestCase):
d1_m2_4_s3]
packages = [JobPackageVertical(
- package_m1_s2_s3,configuration=self.as_conf), JobPackageVertical(package_m2_s2_s3,configuration=self.as_conf)]
+ package_m1_s2_s3,configuration=self.as_conf, wrapper_info=self.wrapper_info), JobPackageVertical(package_m2_s2_s3,configuration=self.as_conf, wrapper_info=self.wrapper_info)]
#returned_packages = returned_packages[0]
for i in range(0, len(returned_packages)):
@@ -726,12 +731,12 @@ class TestWrappers(TestCase):
wrapper_limits["min_h"] = 2
wrapper_limits["max_by_section"] = max_wrapper_job_by_section
returned_packages = self.job_packager._build_vertical_packages(
- section_list, wrapper_limits)
+ section_list, wrapper_limits, wrapper_info=self.wrapper_info)
package_m1_s2_s3 = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3, d1_m1_3_s2, d1_m1_3_s3, d1_m1_4_s2,
d1_m1_4_s3]
- packages = [JobPackageVertical(package_m1_s2_s3,configuration=self.as_conf)]
+ packages = [JobPackageVertical(package_m1_s2_s3,configuration=self.as_conf, wrapper_info=self.wrapper_info)]
#returned_packages = returned_packages[0]
for i in range(0, len(returned_packages)):
@@ -805,7 +810,7 @@ class TestWrappers(TestCase):
wrapper_limits["min_h"] = 2
wrapper_limits["max_by_section"] = max_wrapped_job_by_section
returned_packages = self.job_packager._build_vertical_packages(
- section_list, wrapper_limits)
+ section_list, wrapper_limits, wrapper_info=self.wrapper_info)
package_m1_s2_s3 = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3, d1_m1_3_s2, d1_m1_3_s3, d1_m1_4_s2,
d1_m1_4_s3]
@@ -813,7 +818,7 @@ class TestWrappers(TestCase):
d1_m2_4_s3]
packages = [JobPackageVertical(
- package_m1_s2_s3,configuration=self.as_conf), JobPackageVertical(package_m2_s2_s3,configuration=self.as_conf)]
+ package_m1_s2_s3,configuration=self.as_conf, wrapper_info=self.wrapper_info), JobPackageVertical(package_m2_s2_s3,configuration=self.as_conf, wrapper_info=self.wrapper_info)]
#returned_packages = returned_packages[0]
# print("test_returned_packages_max_jobs_mixed_wrapper")
@@ -895,7 +900,7 @@ class TestWrappers(TestCase):
wrapper_limits["min_h"] = 2
wrapper_limits["max_by_section"] = max_wrapped_job_by_section
returned_packages = self.job_packager._build_vertical_packages(
- section_list, wrapper_limits)
+ section_list, wrapper_limits, wrapper_info=self.wrapper_info)
package_m1_s2_s3 = [d1_m1_1_s2, d1_m1_1_s3,
d1_m1_2_s2, d1_m1_2_s3, d1_m1_3_s2]
@@ -903,7 +908,7 @@ class TestWrappers(TestCase):
d1_m2_2_s2, d1_m2_2_s3, d1_m2_3_s2]
packages = [JobPackageVertical(
- package_m1_s2_s3,configuration=self.as_conf), JobPackageVertical(package_m2_s2_s3,configuration=self.as_conf)]
+ package_m1_s2_s3,configuration=self.as_conf, wrapper_info=self.wrapper_info), JobPackageVertical(package_m2_s2_s3,configuration=self.as_conf, wrapper_info=self.wrapper_info)]
#returned_packages = returned_packages[0]
for i in range(0, len(returned_packages)):
@@ -977,13 +982,13 @@ class TestWrappers(TestCase):
wrapper_limits["min_h"] = 2
wrapper_limits["max_by_section"] = max_wrapped_job_by_section
returned_packages = self.job_packager._build_vertical_packages(
- section_list, wrapper_limits)
+ section_list, wrapper_limits, wrapper_info=self.wrapper_info)
package_m1_s2_s3 = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3]
package_m2_s2_s3 = [d1_m2_1_s2, d1_m2_1_s3, d1_m2_2_s2, d1_m2_2_s3]
packages = [JobPackageVertical(
- package_m1_s2_s3,configuration=self.as_conf), JobPackageVertical(package_m2_s2_s3,configuration=self.as_conf)]
+ package_m1_s2_s3,configuration=self.as_conf, wrapper_info=self.wrapper_info), JobPackageVertical(package_m2_s2_s3,configuration=self.as_conf, wrapper_info=self.wrapper_info)]
#returned_packages = returned_packages[0]
for i in range(0, len(returned_packages)):
@@ -1075,13 +1080,13 @@ class TestWrappers(TestCase):
wrapper_limits["min_h"] = 2
wrapper_limits["max_by_section"] = max_wrapped_job_by_section
returned_packages = self.job_packager._build_vertical_packages(
- section_list, wrapper_limits)
+ section_list, wrapper_limits, wrapper_info=self.wrapper_info)
package_m1_s2_s3 = [d1_m1_2_s3, d1_m1_3_s3, d1_m1_4_s2, d1_m1_4_s3]
package_m2_s2_s3 = [d1_m2_3_s2, d1_m2_3_s3, d1_m2_4_s2, d1_m2_4_s3]
packages = [JobPackageVertical(
- package_m1_s2_s3,configuration=self.as_conf), JobPackageVertical(package_m2_s2_s3,configuration=self.as_conf)]
+ package_m1_s2_s3,configuration=self.as_conf, wrapper_info=self.wrapper_info), JobPackageVertical(package_m2_s2_s3,configuration=self.as_conf, wrapper_info=self.wrapper_info)]
#returned_packages = returned_packages[0]
for i in range(0, len(returned_packages)):
@@ -1879,6 +1884,7 @@ class TestWrappers(TestCase):
self._manage_dependencies(sections_dict)
for job in self.job_list.get_job_list():
job._init_runtime_parameters()
+ job.update_parameters = MagicMock()
def _manage_dependencies(self, sections_dict):
for job in self.job_list.get_job_list():
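
Stubbing job.update_parameters with a MagicMock, as the hunk above does, keeps these wrapper tests from resolving real job parameters, which would require a full platform and configuration environment. A self-contained sketch of the same pattern on a hypothetical class (the signature shown is illustrative, not Job's real one):

    from unittest.mock import MagicMock

    class Job:
        def update_parameters(self, as_conf, parameters):
            # Stands in for the real method, which touches platform/config state.
            raise RuntimeError("too heavy for a unit test")

    job = Job()
    job.update_parameters = MagicMock()    # instance attribute shadows the method
    job.update_parameters({}, {})          # now a no-op that records the call
    job.update_parameters.assert_called_once_with({}, {})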