diff --git a/VERSION b/VERSION index 18837e703d86e02a82d3525a10a24bd9fdf29ea1..5d30083e95d9e90fe531994a0a63f7e5a92c4f1f 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -4.1.9 +4.1.10 diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 2eb58214d820ccd648b9e806731938b2282ecd25..1223bedfe69f5a45aaaaf7dc18e127a3fd992be9 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -13,6 +13,8 @@ from autosubmit.helpers.parameters import autosubmit_parameter from log.log import AutosubmitCritical, AutosubmitError, Log from multiprocessing import Process, Queue, Event +import time + # stop the background task gracefully before exit def stop_background(stop_event, process): # request the background thread stop @@ -832,28 +834,33 @@ class Platform(object): self.connected = False self.restore_connection(None) # check if id of self.main_process exists with ps ax | grep self.main_process_id + max_logs_to_process = 60 while not event.is_set() and os.system(f"ps ax | grep {str(self.main_process_id)} | grep -v grep > /dev/null 2>&1") == 0: + time.sleep(60) + logs_processed = 0 # avoid deadlocks just in case try: - job,children = self.recovery_queue.get(block=False) - if job.wrapper_type != "vertical": - if f'{job.name}_{job.fail_count}' in job_names_processed: - continue - else: - if f'{job.name}' in job_names_processed: - continue - job.children = children - job.platform = self - if job.x11: - Log.debug("Job {0} is an X11 job, skipping log retrieval as they're written in the ASLOGS".format(job.name)) - continue - try: - job.retrieve_logfiles(self, raise_error=True) + while not self.recovery_queue.empty() and logs_processed < max_logs_to_process: + logs_processed += 1 + job,children = self.recovery_queue.get(block=False) if job.wrapper_type != "vertical": - job_names_processed.add(f'{job.name}_{job.fail_count}') + if f'{job.name}_{job.fail_count}' in job_names_processed: + continue else: - job_names_processed.add(f'{job.name}') - except: - pass + if f'{job.name}' in job_names_processed: + continue + job.children = children + job.platform = self + if job.x11: + Log.debug("Job {0} is an X11 job, skipping log retrieval as they're written in the ASLOGS".format(job.name)) + continue + try: + job.retrieve_logfiles(self, raise_error=True) + if job.wrapper_type != "vertical": + job_names_processed.add(f'{job.name}_{job.fail_count}') + else: + job_names_processed.add(f'{job.name}') + except: + pass except queue.Empty: pass except (IOError, OSError):