diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 6b16b161e9d9e0dafb8c9b8628db4fb8d2265cf2..dec9660ac150a2cbb65a1235429ce1371388c846 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -2136,6 +2136,7 @@ class Autosubmit: except Exception as e: raise AutosubmitCritical("Error in run initialization", 7014, str(e)) # Changing default to 7014 Log.debug("Running main running loop") + did_run = False ######################### # AUTOSUBMIT - MAIN LOOP ######################### @@ -2155,6 +2156,9 @@ class Autosubmit: max_recovery_retrials = as_conf.experiment_data.get("CONFIG",{}).get("RECOVERY_RETRIALS",3650) # (72h - 122h ) recovery_retrials = 0 while job_list.get_active(): + for job in [job for job in job_list.get_job_list() if job.status == Status.READY]: + job.update_parameters(as_conf, {}) + did_run = True try: if Autosubmit.exit: Autosubmit.terminate(threading.enumerate()) @@ -2318,7 +2322,15 @@ class Autosubmit: Log.result("No more jobs to run.") - + if not did_run and len(job_list.get_completed_without_logs()) > 0: + #connect to platforms + Log.info(f"Connecting to the platforms, to recover missing logs") + submitter = Autosubmit._get_submitter(as_conf) + submitter.load_platforms(as_conf) + if submitter.platforms is None: + raise AutosubmitCritical("No platforms configured!!!", 7014) + platforms = [value for value in submitter.platforms.values()] + Autosubmit.restore_platforms(platforms, as_conf=as_conf, expid=expid) # Wait for all remaining threads of I/O, close remaining connections # search hint - finished run Log.info("Waiting for all logs to be updated") @@ -2330,6 +2342,7 @@ class Autosubmit: if len(job_list.get_completed_without_logs()) == 0: break for job in job_list.get_completed_without_logs(): + job.platform = submitter.platforms[job.platform_name.upper()] job_list.update_log_status(job, as_conf) sleep(1) if remaining % 10 == 0: diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 77e97f27c7325150ce6cd4687ba3c821c6605770..be0a76da67787315c7bf974c7c094ae6c3dc0e4b 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -1977,7 +1977,7 @@ class Job(object): template_file.close() else: if self.type == Type.BASH: - template = 'sleep 500' + template = 'sleep 5' elif self.type == Type.PYTHON2: template = 'time.sleep(5)' + "\n" elif self.type == Type.PYTHON3 or self.type == Type.PYTHON: diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 2410cba62ab7229e451e4d40e1ea1c88876efbee..81851902b7bd1fe3535d46e068f2782bba87fe9e 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -2680,7 +2680,8 @@ class JobList(object): # Log name has this format: # a02o_20000101_fc0_2_SIM.20240212115021.err # $jobname.$(YYYYMMDDHHMMSS).err or .out - self.update_log_status(job, as_conf) + if not first_time: + self.update_log_status(job, as_conf) if job.synchronize is not None and len(str(job.synchronize)) > 0: tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] if len(tmp) != len(job.parents): diff --git a/autosubmit/platforms/ecplatform.py b/autosubmit/platforms/ecplatform.py index f7db52563a20f7d9ed55777cb02c1f41a54c13fc..44beb9ea69856265eaef28e5f535b366a1c3fc81 100644 --- a/autosubmit/platforms/ecplatform.py +++ b/autosubmit/platforms/ecplatform.py @@ -174,6 +174,7 @@ class EcPlatform(ParamikoPlatform): as_conf is None or str(as_conf.platforms_data.get(self.name, {}).get('DISABLE_RECOVERY_THREADS', "false")).lower() == "false"): self.log_retrieval_process_active = True + if as_conf.experiment_data["ASMISC"].get("COMMAND","").lower() == "run": self.recover_job_logs() @@ -208,6 +209,12 @@ class EcPlatform(ParamikoPlatform): try: if output.lower().find("yes") != -1: self.connected = True + if not self.log_retrieval_process_active and ( + as_conf is None or str(as_conf.platforms_data.get(self.name, {}).get('DISABLE_RECOVERY_THREADS', + "false")).lower() == "false"): + self.log_retrieval_process_active = True + if as_conf.experiment_data["ASMISC"].get("COMMAND", "").lower() == "run": + self.recover_job_logs() return "OK" else: self.connected = False diff --git a/autosubmit/platforms/pjmplatform.py b/autosubmit/platforms/pjmplatform.py index f0a23255c23eba42d6e193a0147f300a5024abb8..e474ee3a7832eb7a252f3d05fbde38836fe869bf 100644 --- a/autosubmit/platforms/pjmplatform.py +++ b/autosubmit/platforms/pjmplatform.py @@ -65,6 +65,12 @@ class PJMPlatform(ParamikoPlatform): tmp_path = os.path.join(exp_id_path, "tmp") self._submit_script_path = os.path.join( tmp_path, self.config.get("LOCAL_ASLOG_DIR"), "submit_" + self.name + ".sh") + self._submit_script_base_name = os.path.join( + tmp_path, self.config.get("LOCAL_ASLOG_DIR"), "submit_") + self._submit_script_file = open(self._submit_script_path, 'wb').close() + + def generate_new_name_submit_script_file(self): + self._submit_script_path = self._submit_script_base_name + os.urandom(16).hex() + ".sh" self._submit_script_file = open(self._submit_script_path, 'wb').close() def submit_error(self,output): @@ -158,6 +164,10 @@ class PJMPlatform(ParamikoPlatform): def open_submit_script(self): self._submit_script_file = open(self._submit_script_path, 'wb').close() + # remove file + with suppress(FileNotFoundError): + os.remove(self._submit_script_path) + self.generate_new_name_submit_script_file() self._submit_script_file = open(self._submit_script_path, 'ab') def get_submit_script(self): @@ -197,6 +207,8 @@ class PJMPlatform(ParamikoPlatform): self.send_file(self.get_submit_script(), False) cmd = os.path.join(self.get_files_path(), os.path.basename(self._submit_script_path)) + # remove file after submisison + cmd = f"{cmd} ; rm {cmd}" try: self.send_command(cmd) except AutosubmitError as e: diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index c52a6c0e1c3bb20be24c67efe9ce6d880f7de1df..13d0a7cad28f0eb00d20acf0862266c23df6ae8a 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -1,4 +1,5 @@ #!/usr/bin/env python3 +from contextlib import suppress # Copyright 2017-2020 Earth Sciences Department, BSC-CNS @@ -67,6 +68,12 @@ class SlurmPlatform(ParamikoPlatform): tmp_path = os.path.join(exp_id_path, "tmp") self._submit_script_path = os.path.join( tmp_path, self.config.get("LOCAL_ASLOG_DIR"), "submit_" + self.name + ".sh") + self._submit_script_base_name = os.path.join( + tmp_path, self.config.get("LOCAL_ASLOG_DIR"), "submit_") + self._submit_script_file = open(self._submit_script_path, 'wb').close() + + def generate_new_name_submit_script_file(self): + self._submit_script_path = self._submit_script_base_name + os.urandom(16).hex() + ".sh" self._submit_script_file = open(self._submit_script_path, 'wb').close() def process_batch_ready_jobs(self,valid_packages_to_submit,failed_packages,error_message="",hold=False): @@ -199,6 +206,10 @@ class SlurmPlatform(ParamikoPlatform): def open_submit_script(self): self._submit_script_file = open(self._submit_script_path, 'wb').close() + # remove file + with suppress(FileNotFoundError): + os.remove(self._submit_script_path) + self.generate_new_name_submit_script_file() self._submit_script_file = open(self._submit_script_path, 'ab') def get_submit_script(self): @@ -252,6 +263,8 @@ class SlurmPlatform(ParamikoPlatform): self.send_file(self.get_submit_script(), False) cmd = os.path.join(self.get_files_path(), os.path.basename(self._submit_script_path)) + # remove file after submisison + cmd = f"{cmd} ; rm {cmd}" try: self.send_command(cmd) except AutosubmitError as e: