diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index dec9660ac150a2cbb65a1235429ce1371388c846..3ab6e77980b7e1657f82de1bc937fc1bc4152b27 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -85,6 +85,7 @@ from typing import List import autosubmit.history.utils as HUtils import autosubmit.helpers.autosubmit_helper as AutosubmitHelper import autosubmit.statistics.utils as StatisticsUtils +from autosubmit.helpers.utils import proccess_id, terminate_child_process from contextlib import suppress @@ -1696,6 +1697,24 @@ class Autosubmit: job.status = Status.WAITING + @staticmethod + def terminate_child_process(expid, platform = None): + # get pid of the main process + pid = os.getpid() + # In case some one used 4.1.6 or 4.1.5 + process_ids = proccess_id(expid,"run", single_instance = False, platform = platform) + if process_ids: + for process_id in [ process_id for process_id in process_ids if process_id != pid]: + # force kill + os.kill(process_id, signal.SIGKILL) + process_ids = proccess_id(expid,"log", single_instance = False, platform = platform) + # 4.1.7 + + if process_ids: + for process_id in [ process_id for process_id in process_ids if process_id != pid]: + # force kill + os.kill(process_id, signal.SIGKILL) + + @staticmethod def terminate(all_threads): # Closing threads on Ctrl+C @@ -1716,6 +1735,7 @@ class Autosubmit: sleep(10) timeout += 10 + @staticmethod def manage_wrapper_job(as_conf, job_list, platform, wrapper_id, save=False): check_wrapper_jobs_sleeptime = as_conf.get_wrapper_check_time() @@ -2161,6 +2181,7 @@ class Autosubmit: did_run = True try: if Autosubmit.exit: + terminate_child_process(expid) Autosubmit.terminate(threading.enumerate()) if job_list.get_failed(): return 1 @@ -2319,8 +2340,6 @@ class Autosubmit: raise AutosubmitCritical(message, 7000) except BaseException as e: raise # If this happens, there is a bug in the code or an exception not-well caught - - Log.result("No more jobs to run.") if not did_run and len(job_list.get_completed_without_logs()) > 0: #connect to platforms @@ -2359,7 +2378,7 @@ class Autosubmit: Autosubmit.database_fix(expid) except Exception as e: pass - + terminate_child_process(expid) for platform in platforms_to_test: platform.closeConnection() if len(job_list.get_failed()) > 0: @@ -2374,10 +2393,13 @@ class Autosubmit: Log.warning("Database is locked") except (portalocker.AlreadyLocked, portalocker.LockException) as e: message = "We have detected that there is another Autosubmit instance using the experiment\n. Stop other Autosubmit instances that are using the experiment or delete autosubmit.lock file located on tmp folder" + terminate_child_process(expid) raise AutosubmitCritical(message, 7000) except AutosubmitCritical as e: + terminate_child_process(expid) raise except BaseException as e: + terminate_child_process(expid) raise finally: if profile: @@ -6128,8 +6150,133 @@ class Autosubmit: job.platform.get_logs_files(expid, job.remote_logs) return job_list + @staticmethod + def cat_log(exp_or_job_id: str, file: Union[None, str], mode: Union[None, str], inspect:bool=False) -> bool: + """The cat-log command allows users to view Autosubmit logs using the command-line. + + It is possible to use ``autosubmit cat-log`` for Workflow and for Job logs. It decides + whether to show Workflow or Job logs based on the ``ID`` given. Shorter ID's, such as + ``a000` are considered Workflow ID's, so it will display logs for that workflow. For + longer ID's, such as ``a000_20220401_fc0_1_GSV``, the command will display logs for + that specific job. + + Users can choose the log file using the ``FILE`` parameter, to display an error or + output log file, for instance. + + Finally, the ``MODE`` parameter allows users to choose whether to display the complete + file contents (similar to the ``cat`` command) or to start tailing its output (akin to + ``tail -f``). + + Args: + exp_or_job_id: A workflow or job ID. + file: the type of the file to be printed (not the file path!). + mode: the mode to print the file (e.g. cat, tail). + inspect: when True it will use job files in tmp/ instead of tmp/LOG_a000/. + """ + def view_file(log_file: Path, mode: str): + if mode == 'c': + cmd = ['cat', str(log_file)] + subprocess.Popen( + cmd, + stdin=subprocess.DEVNULL, + stdout=None + ) + return 0 + elif mode == 't': + cmd = [ + 'tail', + '--lines=+1', + '--retry', + '--follow=name', + workflow_log_file + ] + proc = subprocess.Popen(cmd, stdin=subprocess.DEVNULL) + with suppress(KeyboardInterrupt): + return proc.wait() == 0 + + MODES = { + 'c': 'cat', + 't': 'tail' + } + FILES = { + 'o': 'output', + 'j': 'job', + 'e': 'error', + 's': 'status' + } + if file is None: + file = 'o' + if file not in FILES.keys(): + raise AutosubmitCritical(f'Invalid cat-log file {file}. Expected one of {[f for f in FILES.keys()]}', 7011) + if mode is None: + mode = 'c' + if mode not in MODES.keys(): + raise AutosubmitCritical(f'Invalid cat-log mode {mode}. Expected one of {[m for m in MODES.keys()]}', 7011) + + is_workflow = '_' not in exp_or_job_id + + expid = exp_or_job_id if is_workflow else exp_or_job_id[:4] + + # Workflow folder. + # e.g. ~/autosubmit/a000 + exp_path = Path(BasicConfig.LOCAL_ROOT_DIR, expid) + # Directory with workflow temporary/volatile files. Contains the output of commands such as inspect, + # and also STAT/COMPLETED files for each workflow task. + # e.g. ~/autosubmit/a000/tmp + tmp_path = exp_path / BasicConfig.LOCAL_TMP_DIR + # Directory with logs for Autosubmit executed commands (create, run, etc.) and jobs statuses files. + # e.g. ~/autosubmit/a000/tmp/ASLOGS + aslogs_path = tmp_path / BasicConfig.LOCAL_ASLOG_DIR + # Directory with the logs of the workflow run, for each workflow task. Includes the generated + # .cmd files, and STAT/COMPLETED files for the run. The files with similar names in the parent + # directory are generated with inspect, while these are with the run subcommand. + # e.g. ~/autosubmit/a000/tmp/LOG_a000 + exp_logs_path = tmp_path / f'LOG_{expid}' + + if is_workflow: + if file not in ['o', 'e', 's']: + raise AutosubmitCritical(f'Invalid arguments for cat-log: workflow logs only support o(output), ' + f'e(error), and s(status). Requested: {mode}', 7011) + + if file in ['e', 'o']: + search_pattern = '*_run_err.log' if file == 'e' else '*_run.log' + workflow_log_files = sorted(aslogs_path.glob(search_pattern)) + else: + search_pattern = f'{expid}_*.txt' + status_files_path = exp_path / 'status' + workflow_log_files = sorted(status_files_path.glob(search_pattern)) + + if not workflow_log_files: + Log.info('No logs found.') + return True + workflow_log_file = workflow_log_files[-1] + if not workflow_log_file.is_file(): + raise AutosubmitCritical(f'The workflow log file found is not a file: {workflow_log_file}', 7011) + return view_file(workflow_log_file, mode) == 0 + else: + job_logs_path = tmp_path if inspect else exp_logs_path + if file == 'j': + workflow_log_file = job_logs_path / f'{exp_or_job_id}.cmd' + elif file == 's': + workflow_log_file = job_logs_path / f'{exp_or_job_id}_TOTAL_STATS' + else: + search_pattern = f'{exp_or_job_id}.*.{"err" if file == "e" else "out"}' + workflow_log_files = sorted(job_logs_path.glob(search_pattern)) + if not workflow_log_files: + Log.info('No logs found.') + return True + workflow_log_file = workflow_log_files[-1] + + if not workflow_log_file.exists(): + Log.info('No logs found.') + return True + + if not workflow_log_file.is_file(): + raise AutosubmitCritical(f'The job log file {file} found is not a file: {workflow_log_file}', 7011) + + return view_file(workflow_log_file, mode) == 0 @staticmethod def stop(expids, force=False, all=False, force_all=False, cancel=False, current_status="", status='FAILED'): @@ -6169,23 +6316,7 @@ class Autosubmit: raise AutosubmitCritical( "An error occurred while retrieving the expids", 7011, str(e)) return expids - def proccess_id(expid=None): - # Retrieve the process id of the autosubmit process - # Bash command: ps -ef | grep "$(whoami)" | grep "autosubmit" | grep "run" | grep "expid" | awk '{print $2}' - try: - command = f'ps -ef | grep "$(whoami)" | grep "autosubmit" | grep "run" | grep "{expid}" ' - process = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True) - output, error = process.communicate() - output = output.decode(locale.getlocale()[1]) - output = output.split('\n') - # delete noise - if output: - output = [x.split()[1] for x in output if x and "grep" not in x] - except Exception as e: - raise AutosubmitCritical( - "An error occurred while retrieving the process id", 7011, str(e)) - return output[0] if output else "" # Starts there if status not in Status.VALUE_TO_KEY.values(): raise AutosubmitCritical("Invalid status. Expected one of {0}".format(Status.VALUE_TO_KEY.keys()), 7011) @@ -6272,133 +6403,8 @@ class Autosubmit: if status in Status.VALUE_TO_KEY.values(): job.status = Status.KEY_TO_VALUE[status] job_list.save() + terminate_child_process(expid) - @staticmethod - def cat_log(exp_or_job_id: str, file: Union[None, str], mode: Union[None, str], inspect:bool=False) -> bool: - """The cat-log command allows users to view Autosubmit logs using the command-line. - - It is possible to use ``autosubmit cat-log`` for Workflow and for Job logs. It decides - whether to show Workflow or Job logs based on the ``ID`` given. Shorter ID's, such as - ``a000` are considered Workflow ID's, so it will display logs for that workflow. For - longer ID's, such as ``a000_20220401_fc0_1_GSV``, the command will display logs for - that specific job. - - Users can choose the log file using the ``FILE`` parameter, to display an error or - output log file, for instance. - - Finally, the ``MODE`` parameter allows users to choose whether to display the complete - file contents (similar to the ``cat`` command) or to start tailing its output (akin to - ``tail -f``). - - Args: - exp_or_job_id: A workflow or job ID. - file: the type of the file to be printed (not the file path!). - mode: the mode to print the file (e.g. cat, tail). - inspect: when True it will use job files in tmp/ instead of tmp/LOG_a000/. - """ - def view_file(log_file: Path, mode: str): - if mode == 'c': - cmd = ['cat', str(log_file)] - subprocess.Popen( - cmd, - stdin=subprocess.DEVNULL, - stdout=None - ) - return 0 - elif mode == 't': - cmd = [ - 'tail', - '--lines=+1', - '--retry', - '--follow=name', - workflow_log_file - ] - proc = subprocess.Popen(cmd, stdin=subprocess.DEVNULL) - with suppress(KeyboardInterrupt): - return proc.wait() == 0 - - MODES = { - 'c': 'cat', - 't': 'tail' - } - FILES = { - 'o': 'output', - 'j': 'job', - 'e': 'error', - 's': 'status' - } - if file is None: - file = 'o' - if file not in FILES.keys(): - raise AutosubmitCritical(f'Invalid cat-log file {file}. Expected one of {[f for f in FILES.keys()]}', 7011) - if mode is None: - mode = 'c' - if mode not in MODES.keys(): - raise AutosubmitCritical(f'Invalid cat-log mode {mode}. Expected one of {[m for m in MODES.keys()]}', 7011) - - is_workflow = '_' not in exp_or_job_id - - expid = exp_or_job_id if is_workflow else exp_or_job_id[:4] - - # Workflow folder. - # e.g. ~/autosubmit/a000 - exp_path = Path(BasicConfig.LOCAL_ROOT_DIR, expid) - # Directory with workflow temporary/volatile files. Contains the output of commands such as inspect, - # and also STAT/COMPLETED files for each workflow task. - # e.g. ~/autosubmit/a000/tmp - tmp_path = exp_path / BasicConfig.LOCAL_TMP_DIR - # Directory with logs for Autosubmit executed commands (create, run, etc.) and jobs statuses files. - # e.g. ~/autosubmit/a000/tmp/ASLOGS - aslogs_path = tmp_path / BasicConfig.LOCAL_ASLOG_DIR - # Directory with the logs of the workflow run, for each workflow task. Includes the generated - # .cmd files, and STAT/COMPLETED files for the run. The files with similar names in the parent - # directory are generated with inspect, while these are with the run subcommand. - # e.g. ~/autosubmit/a000/tmp/LOG_a000 - exp_logs_path = tmp_path / f'LOG_{expid}' - - if is_workflow: - if file not in ['o', 'e', 's']: - raise AutosubmitCritical(f'Invalid arguments for cat-log: workflow logs only support o(output), ' - f'e(error), and s(status). Requested: {mode}', 7011) - - if file in ['e', 'o']: - search_pattern = '*_run_err.log' if file == 'e' else '*_run.log' - workflow_log_files = sorted(aslogs_path.glob(search_pattern)) - else: - search_pattern = f'{expid}_*.txt' - status_files_path = exp_path / 'status' - workflow_log_files = sorted(status_files_path.glob(search_pattern)) - - if not workflow_log_files: - Log.info('No logs found.') - return True - workflow_log_file = workflow_log_files[-1] - if not workflow_log_file.is_file(): - raise AutosubmitCritical(f'The workflow log file found is not a file: {workflow_log_file}', 7011) - - return view_file(workflow_log_file, mode) == 0 - else: - job_logs_path = tmp_path if inspect else exp_logs_path - if file == 'j': - workflow_log_file = job_logs_path / f'{exp_or_job_id}.cmd' - elif file == 's': - workflow_log_file = job_logs_path / f'{exp_or_job_id}_TOTAL_STATS' - else: - search_pattern = f'{exp_or_job_id}.*.{"err" if file == "e" else "out"}' - workflow_log_files = sorted(job_logs_path.glob(search_pattern)) - if not workflow_log_files: - Log.info('No logs found.') - return True - workflow_log_file = workflow_log_files[-1] - - if not workflow_log_file.exists(): - Log.info('No logs found.') - return True - - if not workflow_log_file.is_file(): - raise AutosubmitCritical(f'The job log file {file} found is not a file: {workflow_log_file}', 7011) - - return view_file(workflow_log_file, mode) == 0 diff --git a/autosubmit/helpers/utils.py b/autosubmit/helpers/utils.py index fca94a126a7310ab6184ca25a0580ab12d1b520a..43b4c923ca8bc5e2069e6dc7c135948d75805ad4 100644 --- a/autosubmit/helpers/utils.py +++ b/autosubmit/helpers/utils.py @@ -1,3 +1,5 @@ +import signal + import os import pwd @@ -5,27 +7,71 @@ from log.log import Log, AutosubmitCritical from autosubmitconfigparser.config.basicconfig import BasicConfig from typing import Tuple +import subprocess +import locale + + +def terminate_child_process(expid, platform=None): + # get pid of the main process + pid = os.getpid() + # In case someone used 4.1.6 or 4.1.5 + process_ids = proccess_id(expid, "run", single_instance=False, platform=platform) + if process_ids: + for process_id in [process_id for process_id in process_ids if process_id != pid]: + # force kill + os.kill(process_id, signal.SIGKILL) + process_ids = proccess_id(expid, "log", single_instance=False, platform=platform) + # 4.1.7 + + if process_ids: + for process_id in [process_id for process_id in process_ids if process_id != pid]: + # force kill + os.kill(process_id, signal.SIGKILL) + +def proccess_id(expid=None, command="run", single_instance=True, platform=None): + # Retrieve the process id of the autosubmit process + # Bash command: ps -ef | grep "$(whoami)" | grep "autosubmit" | grep "run" | grep "expid" | awk '{print $2}' + try: + if not platform: + command = f'ps -ef | grep "$(whoami)" | grep "autosubmit" | grep "{command}" | grep "{expid}" ' + else: + command = f'ps -ef | grep "$(whoami)" | grep "autosubmit" | grep "{command}" | grep "{expid}" | grep " {platform.lower()} " ' + process = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True) + output, error = process.communicate() + output = output.decode(locale.getlocale()[1]) + output = output.split('\n') + # delete noise + if output: + output = [int(x.split()[1]) for x in output if x and "grep" not in x] + + except Exception as e: + raise AutosubmitCritical( + "An error occurred while retrieving the process id", 7011, str(e)) + if single_instance: + return output[0] if output else "" + else: + return output if output else "" + def check_experiment_ownership(expid, basic_config, raise_error=False, logger=None): - # [A-Za-z09]+ variable is not needed, LOG is global thus it will be read if available - ## type: (str, BasicConfig, bool, Log) -> Tuple[bool, bool, str] - my_user_ID = os.getuid() - current_owner_ID = 0 - current_owner_name = "NA" - try: - current_owner_ID = os.stat(os.path.join(basic_config.LOCAL_ROOT_DIR, expid)).st_uid - current_owner_name = pwd.getpwuid(os.stat(os.path.join(basic_config.LOCAL_ROOT_DIR, expid)).st_uid).pw_name - except Exception as e: - if logger: - logger.info("Error while trying to get the experiment's owner information.") - finally: - if current_owner_ID <= 0 and logger: - logger.info("Current owner '{0}' of experiment {1} does not exist anymore.", current_owner_name, expid) - is_owner = current_owner_ID == my_user_ID - eadmin_user = os.popen('id -u eadmin').read().strip() # If eadmin no exists, it would be "" so INT() would fail. - if eadmin_user != "": - is_eadmin = my_user_ID == int(eadmin_user) - else: - is_eadmin = False - if not is_owner and raise_error: - raise AutosubmitCritical("You don't own the experiment {0}.".format(expid), 7012) - return is_owner, is_eadmin, current_owner_name \ No newline at end of file + # [A-Za-z09]+ variable is not needed, LOG is global thus it will be read if available + ## type: (str, BasicConfig, bool, Log) -> Tuple[bool, bool, str] + my_user_ID = os.getuid() + current_owner_ID = 0 + current_owner_name = "NA" + try: + current_owner_ID = os.stat(os.path.join(basic_config.LOCAL_ROOT_DIR, expid)).st_uid + current_owner_name = pwd.getpwuid(os.stat(os.path.join(basic_config.LOCAL_ROOT_DIR, expid)).st_uid).pw_name + except Exception as e: + if logger: + logger.info("Error while trying to get the experiment's owner information.") + finally: + if current_owner_ID <= 0 and logger: + logger.info("Current owner '{0}' of experiment {1} does not exist anymore.", current_owner_name, expid) + is_owner = current_owner_ID == my_user_ID + eadmin_user = os.popen('id -u eadmin').read().strip() # If eadmin no exists, it would be "" so INT() would fail. + if eadmin_user != "": + is_eadmin = my_user_ID == int(eadmin_user) + else: + is_eadmin = False + if not is_owner and raise_error: + raise AutosubmitCritical("You don't own the experiment {0}.".format(expid), 7012) + return is_owner, is_eadmin, current_owner_name \ No newline at end of file diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index b09d07e36cbab682dbb8ba4c2c8dd64a093b4ce9..0209ce41b202e322ca9b58a15f67448381aca232 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -22,7 +22,7 @@ from threading import Thread import threading import getpass from paramiko.agent import Agent - +from autosubmit.helpers.utils import terminate_child_process def threaded(fn): def wrapper(*args, **kwargs): thread = Thread(target=fn, args=args, kwargs=kwargs, name=f"{args[0].name}_X11") @@ -113,8 +113,8 @@ class ParamikoPlatform(Platform): if display is None: display = "localhost:0" self.local_x11_display = xlib_connect.get_display(display) - - + self.log_retrieval_process_active = False + terminate_child_process(self.expid, self.name) def test_connection(self,as_conf): """ Test if the connection is still alive, reconnect if not. @@ -325,7 +325,6 @@ class ParamikoPlatform(Platform): else: raise AutosubmitError( "Couldn't establish a connection to the specified host, wrong configuration?", 6003, str(e)) - def check_completed_files(self, sections=None): if self.host == 'localhost': return None