From 1a5f3fc056c75d989ce2b8f847bf197b8dbe18a9 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 6 May 2024 15:49:54 +0200 Subject: [PATCH 1/8] update version --- autosubmit/autosubmit.py | 123 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 120 insertions(+), 3 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index a3fd5e788..34106dc75 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -657,6 +657,17 @@ class Autosubmit: help='Read job files generated by the inspect subcommand.') subparser.add_argument('ID', metavar='ID', help='An ID of a Workflow (eg a000) or a Job (eg a000_20220401_fc0_1_1_APPLICATION).') + # stop + subparser = subparsers.add_parser( + 'stop', description='Stop an autosubmit process') + group = subparser.add_mutually_exclusive_group(required=True) + group.add_argument('expid', help='experiment identifier', nargs='?') + group.add_argument('-a', '--all', default=False, action='store_true', + help='Stop all user autosubmit processes') + subparser.add_argument('-k', '--kill', default=False, action='store_true', + help='Kills active jobs and set them to failure') + subparser.add_argument('-s', '--status', default="FAILED", action='store', metavar='STATUS', + help='Final status of killed jobs. Options are WAITING,COMPLETED,FAILED,SUSPENDED. 
Default is FAILED.') args, unknown = parser.parse_known_args() if args.version: Log.info(Autosubmit.autosubmit_version) @@ -761,6 +772,8 @@ class Autosubmit: return Autosubmit.update_description(args.expid, args.description) elif args.command == 'cat-log': return Autosubmit.cat_log(args.ID, args.file, args.mode, args.inspect) + elif args.command == 'stop': + return Autosubmit.stop(args.expid, args.kill, args.status, args.all) @staticmethod def _init_logs(args, console_level='INFO', log_level='DEBUG', expid='None'): @@ -784,6 +797,9 @@ class Autosubmit: expid_less = ["expid", "describe", "testcase", "install", "-v", "readme", "changelog", "configure", "unarchive", "cat-log"] + if args.command == "stop": + if args.all: + expid_less.append("stop") global_log_command = ["delete", "archive", "upgrade"] if "offer" in args: if args.offer: @@ -814,7 +830,7 @@ class Autosubmit: if args.command in BasicConfig.ALLOWED_HOSTS: if 'all' not in BasicConfig.ALLOWED_HOSTS[args.command] and not (host in BasicConfig.ALLOWED_HOSTS[args.command] or fullhost in BasicConfig.ALLOWED_HOSTS[args.command]): raise AutosubmitCritical(message, 7071) - if expid != 'None' and args.command not in expid_less and args.command not in global_log_command: + if (expid != 'None' and expid) and args.command not in expid_less and args.command not in global_log_command: as_conf = AutosubmitConfig(expid, BasicConfig, YAMLParserFactory()) as_conf.reload(force_load=True) if len(as_conf.experiment_data) == 0: @@ -828,7 +844,7 @@ class Autosubmit: if not os.path.exists(exp_path): raise AutosubmitCritical("Experiment does not exist", 7012) # delete is treated differently - if args.command not in ["monitor", "describe", "delete", "report", "stats", "dbfix"]: + if args.command not in ["monitor", "describe", "delete", "report", "stats", "dbfix", "stop"]: owner, eadmin, currentOwner = Autosubmit._check_ownership(expid, raise_error=True) else: owner, eadmin, currentOwner = Autosubmit._check_ownership(expid, 
raise_error=False) @@ -888,7 +904,7 @@ class Autosubmit: Autosubmit.autosubmit_version, expid,args.command), 7014) else: - if expid == 'None': + if expid == 'None' or not expid: exp_id = "" else: exp_id = "_" + expid @@ -6058,6 +6074,105 @@ class Autosubmit: job.platform.get_logs_files(expid, job.remote_logs) return job_list + + @staticmethod + def stop(expids, cancel=False, status="FAILED", all=False, force=False): + """ + The stop command allows users to stop the desired experiments. + + It is possible to use ``autosubmit stop -kill`` to also cancel all jobs in the remote and local platform queues and set them to failed if ``--status`` flag is not prompt. + + :param expids: List of experiments to stop + :param cancel: Cancel all jobs in the remote and local platform queues + :param status: desired final status of the jobs canceled (default: FAILED) + :param all: All user experiments + :return: + """ + def retrieve_expids(): + # Retrieve all expids in use by autosubmit attached to my current user + # Bash command: ps -ef | grep "$(whoami)" | grep "autosubmit" | grep "run" | awk '{print $NF}' | sort -u + expids = [] + try: + command = 'ps -ef | grep "$(whoami)" | grep "autosubmit" | grep "run" | awk \'{print $NF}\' | sort -u' + process = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True) + output, error = process.communicate() + output = output.decode(locale.getlocale()[1]) + #delete -u from output + output = output.replace("-u", "") + expids = output.split('\n') + # delete empty strings + expids = [x for x in expids if x] + except Exception as e: + raise AutosubmitCritical( + "An error occurred while retrieving the expids", 7011, str(e)) + return expids + def proccess_id(expid): + # Retrieve the process id of the autosubmit process + # Bash command: ps -ef | grep "$(whoami)" | grep "autosubmit" | grep "run" | grep "expid" | awk '{print $2}' + try: + command = f'ps -ef | grep "$(whoami)" | grep "autosubmit" | grep "run" | grep "{expid}" | awk \'{{print 
$2}}\'' + process = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True) + output, error = process.communicate() + output = output.decode(locale.getlocale()[1]) + # delete empty strings + output = output.split('\n') + except Exception as e: + raise AutosubmitCritical( + "An error occurred while retrieving the process id", 7011, str(e)) + return output[0] + if status not in Status.VALUE_TO_KEY.values(): + raise AutosubmitCritical("Invalid status. Expected one of {0}".format(Status.VALUE_TO_KEY.keys()), 7011) + # First retrieve expids + if all: + expids = retrieve_expids() + else: + expids = expids.lower() + if "," in expids: + expids = expids.split(",") + else: + expids = expids.split(" ") + # Obtain the proccess id + errors = "" + for expid in expids: + process_id = proccess_id(expid) + if process_id: + # Send the signal to stop the autosubmit process + try: + if force: + os.kill(int(process_id), signal.SIGKILL) # don't wait for logs + else: + os.kill(int(process_id), signal.SIGINT) # wait for logs + if not cancel: + process_end = False + while(not process_end): + process_active = process_id(expid) + if process_id != process_active: + process_end = True + if cancel: + # call prepare_run to obtain the platforms and as_conf + job_list, submitter, exp_history, host, as_conf, platforms_to_test, packages_persistence, _ = Autosubmit.prepare_run( + expid) + # get active jobs + active_jobs = [job for job in job_list.get_job_list() if + job.status in [Status.QUEUING, Status.RUNNING, Status.SUBMITTED]] + # change status of active jobs + for job in active_jobs: + # Cancel from the remote platform + job.platform.send_command(job.platform.cancel_cmd + " " + str(job.id), ignore_log=True) + status = status.upper() + job.status = Status.VALUE_TO_KEY[status] + job_list.save() + except Exception as e: + errors += f"An error occurred while stopping the autosubmit process for expid {expid}: {str(e)}\n" + Log.warning(errors) + + + + + + + + @staticmethod def 
cat_log(exp_or_job_id: str, file: Union[None, str], mode: Union[None, str], inspect:bool=False) -> bool: """The cat-log command allows users to view Autosubmit logs using the command-line. @@ -6185,3 +6300,5 @@ class Autosubmit: raise AutosubmitCritical(f'The job log file {file} found is not a file: {workflow_log_file}', 7011) return view_file(workflow_log_file, mode) == 0 + + -- GitLab From c9b93a0196615ca78c137d1a55e876a682042fbf Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 8 May 2024 11:44:31 +0200 Subject: [PATCH 2/8] Added stop command --- autosubmit/autosubmit.py | 100 +++++++++++++++++++---------- autosubmit/job/job.py | 2 +- autosubmit/job/job_common.py | 1 + test/unit/test_stop.py | 121 +++++++++++++++++++++++++++++++++++ 4 files changed, 189 insertions(+), 35 deletions(-) create mode 100644 test/unit/test_stop.py diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 34106dc75..6e9e0ad1c 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -664,10 +664,14 @@ class Autosubmit: group.add_argument('expid', help='experiment identifier', nargs='?') group.add_argument('-a', '--all', default=False, action='store_true', help='Stop all user autosubmit processes') - subparser.add_argument('-k', '--kill', default=False, action='store_true', + subparser.add_argument('-c', '--cancel', default=False, action='store_true', help='Kills active jobs and set them to failure') + subparser.add_argument('-oc', '--only_cancel', default=False, action='store_true', + help='Cancel active jobs if process is stopped') subparser.add_argument('-s', '--status', default="FAILED", action='store', metavar='STATUS', - help='Final status of killed jobs. Options are WAITING,COMPLETED,FAILED,SUSPENDED. Default is FAILED.') + help='Final status of killed jobs. 
Default is FAILED.') + subparser.add_argument('-f', '--force', default=False, action='store_true', + help='Force stop autosubmit process, equivalent to kill -9') args, unknown = parser.parse_known_args() if args.version: Log.info(Autosubmit.autosubmit_version) @@ -773,7 +777,7 @@ class Autosubmit: elif args.command == 'cat-log': return Autosubmit.cat_log(args.ID, args.file, args.mode, args.inspect) elif args.command == 'stop': - return Autosubmit.stop(args.expid, args.kill, args.status, args.all) + return Autosubmit.stop(args.expid, args.cancel, args.only_cancel, args.status, args.all, args.force) @staticmethod def _init_logs(args, console_level='INFO', log_level='DEBUG', expid='None'): @@ -6076,7 +6080,7 @@ class Autosubmit: @staticmethod - def stop(expids, cancel=False, status="FAILED", all=False, force=False): + def stop(expids, cancel=False, only_cancel=False, status="FAILED", all=False, force=False): """ The stop command allows users to stop the desired experiments. @@ -6084,8 +6088,10 @@ class Autosubmit: :param expids: List of experiments to stop :param cancel: Cancel all jobs in the remote and local platform queues + :param only_cancel: Cancel all jobs in the remote and local platform queues if process is already stopped :param status: desired final status of the jobs canceled (default: FAILED) :param all: All user experiments + :return: """ def retrieve_expids(): @@ -6106,23 +6112,28 @@ class Autosubmit: raise AutosubmitCritical( "An error occurred while retrieving the expids", 7011, str(e)) return expids - def proccess_id(expid): + def proccess_id(expid=None): # Retrieve the process id of the autosubmit process # Bash command: ps -ef | grep "$(whoami)" | grep "autosubmit" | grep "run" | grep "expid" | awk '{print $2}' try: - command = f'ps -ef | grep "$(whoami)" | grep "autosubmit" | grep "run" | grep "{expid}" | awk \'{{print $2}}\'' + command = f'ps -ef | grep "$(whoami)" | grep "autosubmit" | grep "run" | grep "{expid}" ' process = 
subprocess.Popen(command, stdout=subprocess.PIPE, shell=True) output, error = process.communicate() output = output.decode(locale.getlocale()[1]) - # delete empty strings output = output.split('\n') + # delete noise + if output: + output = [x.split()[1] for x in output if x and "grep" not in x] + except Exception as e: raise AutosubmitCritical( "An error occurred while retrieving the process id", 7011, str(e)) - return output[0] + return output[0] if output else "" if status not in Status.VALUE_TO_KEY.values(): raise AutosubmitCritical("Invalid status. Expected one of {0}".format(Status.VALUE_TO_KEY.keys()), 7011) # First retrieve expids + if only_cancel: + cancel=True if all: expids = retrieve_expids() else: @@ -6133,38 +6144,59 @@ class Autosubmit: expids = expids.split(" ") # Obtain the proccess id errors = "" + valid_expids = [] for expid in expids: - process_id = proccess_id(expid) - if process_id: + process_id_ = proccess_id(expid) + if not process_id_ and only_cancel: + valid_expids.append(expid) + elif process_id_: # Send the signal to stop the autosubmit process try: if force: - os.kill(int(process_id), signal.SIGKILL) # don't wait for logs + try: + os.kill(int(process_id_), signal.SIGKILL) # don't wait for logs + except: + continue else: - os.kill(int(process_id), signal.SIGINT) # wait for logs - if not cancel: - process_end = False - while(not process_end): - process_active = process_id(expid) - if process_id != process_active: - process_end = True - if cancel: - # call prepare_run to obtain the platforms and as_conf - job_list, submitter, exp_history, host, as_conf, platforms_to_test, packages_persistence, _ = Autosubmit.prepare_run( - expid) - # get active jobs - active_jobs = [job for job in job_list.get_job_list() if - job.status in [Status.QUEUING, Status.RUNNING, Status.SUBMITTED]] - # change status of active jobs - for job in active_jobs: - # Cancel from the remote platform - job.platform.send_command(job.platform.cancel_cmd + " " + str(job.id), 
ignore_log=True) - status = status.upper() - job.status = Status.VALUE_TO_KEY[status] - job_list.save() + try: + os.kill(int(process_id_), signal.SIGINT) # wait for logs + except: + continue + valid_expids.append(expid) except Exception as e: - errors += f"An error occurred while stopping the autosubmit process for expid {expid}: {str(e)}\n" - Log.warning(errors) + Log.warning(f"An error occurred while stopping the autosubmit process for expid:{expid}: {str(e)}") + for expid in valid_expids: + if not force: + Log.info(f"Checking the status of the expid:{expid}") + process_end = False + while (not process_end): + if not proccess_id(expid): + process_end = True + else: + Log.info(f"Waiting for the autosubmit run to safety stop {expid}") + sleep(5) + if cancel: + # call prepare_run to obtain the platforms and as_conf + job_list, _, _, _, as_conf, _, _, _ = Autosubmit.prepare_run( + expid) + # get active jobs + active_jobs = [job for job in job_list.get_job_list() if + job.status in [Status.QUEUING, Status.RUNNING, Status.SUBMITTED]] + # change status of active jobs + status = status.upper() + + for job in active_jobs: + # Cancel from the remote platform + Log.info(f'Cancelling job {job.name} on platform {job.platform.name}') + try: + job.platform.send_command(job.platform.cancel_cmd + " " + str(job.id), ignore_log=True) + except Exception as e: + Log.warning(f"{str(e)}") + Log.info(f"Changing status of job {job.name} to {status}") + if status in Status.VALUE_TO_KEY.values(): + job.status = Status.KEY_TO_VALUE[status] + job_list.save() + diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index c76ecf5e1..5ee6d476c 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -1976,7 +1976,7 @@ class Job(object): template_file.close() else: if self.type == Type.BASH: - template = 'sleep 5' + template = 'sleep 500' elif self.type == Type.PYTHON2: template = 'time.sleep(5)' + "\n" elif self.type == Type.PYTHON3 or self.type == Type.PYTHON: diff --git 
a/autosubmit/job/job_common.py b/autosubmit/job/job_common.py index 3999a03b0..27414223f 100644 --- a/autosubmit/job/job_common.py +++ b/autosubmit/job/job_common.py @@ -40,6 +40,7 @@ class Status: # Note: any change on constants must be applied on the dict below!!! VALUE_TO_KEY = {-3: 'SUSPENDED', -2: 'UNKNOWN', -1: 'FAILED', 0: 'WAITING', 1: 'READY', 2: 'SUBMITTED', 3: 'QUEUING', 4: 'RUNNING', 5: 'COMPLETED', 6: 'HELD', 7: 'PREPARED', 8: 'SKIPPED', 9: 'DELAYED'} + KEY_TO_VALUE = {'SUSPENDED': -3, 'UNKNOWN': -2, 'FAILED': -1, 'WAITING': 0, 'READY': 1, 'SUBMITTED': 2, 'QUEUING': 3, 'RUNNING': 4, 'COMPLETED': 5, 'HELD': 6, 'PREPARED': 7, 'SKIPPED': 8, 'DELAYED': 9} LOGICAL_ORDER = ["WAITING", "DELAYED", "PREPARED", "READY", "SUBMITTED", "HELD", "QUEUING", "RUNNING", "SKIPPED", "FAILED", "UNKNOWN", "COMPLETED", "SUSPENDED"] def retval(self, value): diff --git a/test/unit/test_stop.py b/test/unit/test_stop.py new file mode 100644 index 000000000..57788a80b --- /dev/null +++ b/test/unit/test_stop.py @@ -0,0 +1,121 @@ +import inspect +from unittest import TestCase +import io +import sys +from contextlib import suppress, redirect_stdout +from pathlib import Path +from tempfile import TemporaryDirectory +from unittest.mock import patch + +from autosubmit.autosubmit import Autosubmit, AutosubmitCritical +from autosubmitconfigparser.config.basicconfig import BasicConfig + +from unittest.mock import MagicMock + +import os +from unittest import TestCase +from copy import copy +import networkx +from networkx import DiGraph +#import patch +from textwrap import dedent +import shutil +import tempfile +from mock import Mock, patch +from random import randrange +from pathlib import Path +from autosubmit.job.job import Job +from autosubmit.job.job_common import Status +from autosubmit.job.job_common import Type +from autosubmit.job.job_list import JobList +from autosubmit.job.job_list_persistence import JobListPersistencePkl +from autosubmitconfigparser.config.yamlparser import 
YAMLParserFactory +from log.log import AutosubmitCritical + +""" +This file contains the test for the `autosubmit stop` command. Found in /autosubmit.py line 6079. + +Possible usages: + +autosubmit stop expids +autosubmit stop expids [--kill] [status] [--force] +autosubmit stop --all [--kill] [status] [--force] + +""" + +class TestStop(TestCase): + + def setUp(self): + self.experiment_id = 'random-id' + self.autosubmit = Autosubmit() + self.original_root_dir = BasicConfig.LOCAL_ROOT_DIR + self.root_dir = TemporaryDirectory() + BasicConfig.LOCAL_ROOT_DIR = self.root_dir.name + self.exp_path = Path(self.root_dir.name, 'a000') + self.tmp_dir = self.exp_path / BasicConfig.LOCAL_TMP_DIR + self.aslogs_dir = self.tmp_dir / BasicConfig.LOCAL_ASLOG_DIR + self.status_path = self.exp_path / 'status' + self.aslogs_dir.mkdir(parents=True) + self.status_path.mkdir() + self.experiment_id = 'random-id' + self.temp_directory = tempfile.mkdtemp() + joblist_persistence = JobListPersistencePkl() + self.as_conf = Mock() + self.as_conf.experiment_data = dict() + self.as_conf.experiment_data["JOBS"] = dict() + self.as_conf.jobs_data = self.as_conf.experiment_data["JOBS"] + self.as_conf.experiment_data["PLATFORMS"] = dict() + self.temp_directory = tempfile.mkdtemp() + joblist_persistence = JobListPersistencePkl() + + self.job_list = JobList(self.experiment_id, FakeBasicConfig, YAMLParserFactory(),joblist_persistence, self.as_conf) + # creating jobs for self list + self.completed_job = self._createDummyJobWithStatus(Status.COMPLETED) + self.submitted_job = self._createDummyJobWithStatus(Status.SUBMITTED) + self.running_job = self._createDummyJobWithStatus(Status.RUNNING) + self.queuing_job = self._createDummyJobWithStatus(Status.QUEUING) + self.failed_job = self._createDummyJobWithStatus(Status.FAILED) + self.ready_job = self._createDummyJobWithStatus(Status.READY) + self.waiting_job = self._createDummyJobWithStatus(Status.WAITING) + + self.job_list._job_list = [self.running_job, 
self.queuing_job] + + def _createDummyJobWithStatus(self, status): + job_name = str(randrange(999999, 999999999)) + job_id = randrange(1, 999) + job = Job(job_name, job_id, status, 0) + job.type = randrange(0, 2) + return job + + def test_stop_expids(self): + # mock + fake_running_process = MagicMock()# process id of the experiment, to mock the process id of the experiment + fake_running_process.communicate.return_value = (b'bla 0001 bla bla bla', b'') + fake_running_expid = 'a000,a001' # experiment id of the experiment, to mock the experiment id of the experiment + with patch('subprocess.Popen', return_value=fake_running_process) as mock_popen: + with patch('os.kill') as mock_kill: + mock_job_list = MagicMock() + mock_job_list.return_value = self.job_list + with patch('autosubmit.autosubmit.Autosubmit.load_job_list', return_value=mock_job_list): + self.autosubmit.stop(fake_running_expid,force=True) + +class FakeBasicConfig: + def __init__(self): + pass + def props(self): + pr = {} + for name in dir(self): + value = getattr(self, name) + if not name.startswith('__') and not inspect.ismethod(value) and not inspect.isfunction(value): + pr[name] = value + return pr + DB_DIR = '/dummy/db/dir' + DB_FILE = '/dummy/db/file' + DB_PATH = '/dummy/db/path' + LOCAL_ROOT_DIR = '/dummy/local/root/dir' + LOCAL_TMP_DIR = '/dummy/local/temp/dir' + LOCAL_PROJ_DIR = '/dummy/local/proj/dir' + DEFAULT_PLATFORMS_CONF = '' + DEFAULT_JOBS_CONF = '' + STRUCTURES_DIR = '/dummy/structure/dir' + -- GitLab From b0ac92bac9db9b1a2e1b0ad4c6376f996e63c385 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 8 May 2024 12:36:41 +0200 Subject: [PATCH 3/8] Docs and added process.daemon= True --- autosubmit/autosubmit.py | 102 ++++++++++++++++------------ autosubmit/platforms/platform.py | 1 + docs/source/userguide/run/index.rst | 33 +++++++++ test/unit/test_stop.py | 19 ++++-- 4 files changed, 108 insertions(+), 47 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 
6e9e0ad1c..89b41fe53 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -835,56 +835,69 @@ class Autosubmit: if 'all' not in BasicConfig.ALLOWED_HOSTS[args.command] and not (host in BasicConfig.ALLOWED_HOSTS[args.command] or fullhost in BasicConfig.ALLOWED_HOSTS[args.command]): raise AutosubmitCritical(message, 7071) if (expid != 'None' and expid) and args.command not in expid_less and args.command not in global_log_command: - as_conf = AutosubmitConfig(expid, BasicConfig, YAMLParserFactory()) - as_conf.reload(force_load=True) - if len(as_conf.experiment_data) == 0: - if args.command not in ["expid", "upgrade"]: - raise AutosubmitCritical( - "Experiment {0} has no yml data. Please, if you really wish to use AS 4 prompt:\nautosubmit upgrade {0}".format( - expid), 7012) - exp_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid) - tmp_path = os.path.join(exp_path, BasicConfig.LOCAL_TMP_DIR) - aslogs_path = os.path.join(tmp_path, BasicConfig.LOCAL_ASLOG_DIR) - if not os.path.exists(exp_path): - raise AutosubmitCritical("Experiment does not exist", 7012) - # delete is treated differently - if args.command not in ["monitor", "describe", "delete", "report", "stats", "dbfix", "stop"]: - owner, eadmin, currentOwner = Autosubmit._check_ownership(expid, raise_error=True) + if "," in expid: + expids = expid.split(",") else: - owner, eadmin, currentOwner = Autosubmit._check_ownership(expid, raise_error=False) + expids = expid.split(" ") + expids = [x.strip() for x in expids] + for expid in expids: + as_conf = AutosubmitConfig(expid, BasicConfig, YAMLParserFactory()) + as_conf.reload(force_load=True) + if len(as_conf.experiment_data) == 0: + if args.command not in ["expid", "upgrade"]: + raise AutosubmitCritical( + "Experiment {0} has no yml data. 
Please, if you really wish to use AS 4 prompt:\nautosubmit upgrade {0}".format( + expid), 7012) + exp_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid) + tmp_path = os.path.join(exp_path, BasicConfig.LOCAL_TMP_DIR) + aslogs_path = os.path.join(tmp_path, BasicConfig.LOCAL_ASLOG_DIR) + if not os.path.exists(exp_path): + raise AutosubmitCritical("Experiment does not exist", 7012) + # delete is treated differently + if args.command not in ["monitor", "describe", "delete", "report", "stats", "dbfix"]: + owner, eadmin, currentOwner = Autosubmit._check_ownership(expid, raise_error=True) + else: + owner, eadmin, currentOwner = Autosubmit._check_ownership(expid, raise_error=False) if not os.path.exists(tmp_path): os.mkdir(tmp_path) if not os.path.exists(aslogs_path): os.mkdir(aslogs_path) - if owner: - os.chmod(tmp_path, 0o775) - with suppress(PermissionError, FileNotFoundError, Exception): # for -txt option - os.chmod(f'{exp_path}/status', 0o775) - - Log.set_file(os.path.join(aslogs_path, args.command + '.log'), "out", log_level) - Log.set_file(os.path.join(aslogs_path, args.command + '_err.log'), "err") - if args.command in ["run"]: - if os.path.exists(os.path.join(aslogs_path, 'jobs_active_status.log')): - os.remove(os.path.join(aslogs_path, 'jobs_active_status.log')) - if os.path.exists(os.path.join(aslogs_path, 'jobs_failed_status.log')): - os.remove(os.path.join(aslogs_path, 'jobs_failed_status.log')) - Log.set_file(os.path.join(aslogs_path, 'jobs_active_status.log'), "status") - Log.set_file(os.path.join(aslogs_path, 'jobs_failed_status.log'), "status_failed") + if args.command == "stop": + exp_id = "_".join(expids) + Log.set_file(os.path.join(BasicConfig.GLOBAL_LOG_DIR, + args.command + exp_id + '.log'), "out", log_level) + Log.set_file(os.path.join(BasicConfig.GLOBAL_LOG_DIR, + args.command + exp_id + '_err.log'), "err") else: - st = os.stat(tmp_path) - oct_perm = str(oct(st.st_mode))[-3:] - if int(oct_perm[1]) in [6, 7] or int(oct_perm[2]) in [6, 7]: - 
Log.set_file(os.path.join(tmp_path, args.command + '.log'), "out", log_level) - Log.set_file(os.path.join(tmp_path, args.command + '_err.log'), "err") + if owner: + os.chmod(tmp_path, 0o775) + with suppress(PermissionError, FileNotFoundError, Exception): # for -txt option + os.chmod(f'{exp_path}/status', 0o775) + + Log.set_file(os.path.join(aslogs_path, args.command + '.log'), "out", log_level) + Log.set_file(os.path.join(aslogs_path, args.command + '_err.log'), "err") + if args.command in ["run"]: + if os.path.exists(os.path.join(aslogs_path, 'jobs_active_status.log')): + os.remove(os.path.join(aslogs_path, 'jobs_active_status.log')) + if os.path.exists(os.path.join(aslogs_path, 'jobs_failed_status.log')): + os.remove(os.path.join(aslogs_path, 'jobs_failed_status.log')) + Log.set_file(os.path.join(aslogs_path, 'jobs_active_status.log'), "status") + Log.set_file(os.path.join(aslogs_path, 'jobs_failed_status.log'), "status_failed") else: - Log.set_file(os.path.join(BasicConfig.GLOBAL_LOG_DIR, - args.command + expid + '.log'), "out", log_level) - Log.set_file(os.path.join(BasicConfig.GLOBAL_LOG_DIR, - args.command + expid + '_err.log'), "err") - Log.printlog( - "Permissions of {0} are {1}. The log is being written in the {2} path instead of {1}. Please tell to the owner to fix the permissions".format( - tmp_path, oct_perm, BasicConfig.GLOBAL_LOG_DIR)) + st = os.stat(tmp_path) + oct_perm = str(oct(st.st_mode))[-3:] + if int(oct_perm[1]) in [6, 7] or int(oct_perm[2]) in [6, 7]: + Log.set_file(os.path.join(tmp_path, args.command + '.log'), "out", log_level) + Log.set_file(os.path.join(tmp_path, args.command + '_err.log'), "err") + else: + Log.set_file(os.path.join(BasicConfig.GLOBAL_LOG_DIR, + args.command + expid + '.log'), "out", log_level) + Log.set_file(os.path.join(BasicConfig.GLOBAL_LOG_DIR, + args.command + expid + '_err.log'), "err") + Log.printlog( + "Permissions of {0} are {1}. The log is being written in the {2} path instead of {1}. 
Please tell to the owner to fix the permissions".format( + tmp_path, oct_perm, BasicConfig.GLOBAL_LOG_DIR)) Log.file_path = tmp_path if owner: if "update_version" in args: @@ -6091,7 +6104,7 @@ class Autosubmit: :param only_cancel: Cancel all jobs in the remote and local platform queues if process is already stopped :param status: desired final status of the jobs canceled (default: FAILED) :param all: All user experiments - + :param force: Force stop the autosubmit process, equivalent to kill -9 :return: """ def retrieve_expids(): @@ -6129,6 +6142,8 @@ class Autosubmit: raise AutosubmitCritical( "An error occurred while retrieving the process id", 7011, str(e)) return output[0] if output else "" + # Starts there + if status not in Status.VALUE_TO_KEY.values(): raise AutosubmitCritical("Invalid status. Expected one of {0}".format(Status.VALUE_TO_KEY.keys()), 7011) # First retrieve expids @@ -6142,6 +6157,7 @@ class Autosubmit: expids = expids.split(",") else: expids = expids.split(" ") + expids = [x.strip() for x in expids] # Obtain the proccess id errors = "" valid_expids = [] diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 23905d125..2fd59e1e5 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -19,6 +19,7 @@ from multiprocessing import Process, Queue def processed(fn): def wrapper(*args, **kwargs): process = Process(target=fn, args=args, kwargs=kwargs, name=f"{args[0].name}_platform") + process.daemon = True # Set the process as a daemon process process.start() return process diff --git a/docs/source/userguide/run/index.rst b/docs/source/userguide/run/index.rst index e61e62f23..e9114f811 100644 --- a/docs/source/userguide/run/index.rst +++ b/docs/source/userguide/run/index.rst @@ -412,6 +412,39 @@ Finally, you can launch Autosubmit *run* in background and with ``nohup`` (conti How to stop the experiment -------------------------- +From Autosubmit 4.1.6, you can stop an experiment using the 
command `autosubmit stop` + +Options: +:: + + usage: autosubmit stop [-h] [-a] [-c] [-oc] [-s STATUS] [-f] [expid] + + Stop an autosubmit process + + positional arguments: + expid experiment identifier + + optional arguments: + -h, --help show this help message and exit + -a, --all Stop all current user autosubmit processes, if not defined use expid separated by , + -c, --cancel Kills active jobs and set them to failure + -oc, --only_cancel Cancel active jobs if process is stopped + -s STATUS, --status STATUS + Final status of killed jobs. Default is FAILED. + -f, --force Force stop autosubmit process, equivalent to kill -9. If not used, autosubmit will try to stop the process gracefully. + +Examples: +~~~~~~~~~ + +.. code-block:: bash + + autosubmit stop cxxx + autosubmit stop cxxx, cyyy + autosubmit stop -a + autosubmit stop -a -f + autosubmit stop -a -c + + You can stop Autosubmit by sending a signal to the process. To get the process identifier (PID) you can use the ps command on a shell interpreter/terminal. :: diff --git a/test/unit/test_stop.py b/test/unit/test_stop.py index 57788a80b..9b36b1c80 100644 --- a/test/unit/test_stop.py +++ b/test/unit/test_stop.py @@ -35,11 +35,22 @@ from log.log import AutosubmitCritical """ This file contains the test for the `autosubmit stop` command. Found in /autosubmit.py line 6079. -Possible usages: + usage: autosubmit stop [-h] [-a] [-c] [-oc] [-s STATUS] [-f] [expid] + + Stop an autosubmit process + + positional arguments: + expid experiment identifier + + optional arguments: + -h, --help show this help message and exit + -a, --all Stop all current user autosubmit processes, if not defined use expid separated by , + -c, --cancel Kills active jobs and set them to failure + -oc, --only_cancel Cancel active jobs if process is stopped + -s STATUS, --status STATUS + Final status of killed jobs. Default is FAILED. + -f, --force Force stop autosubmit process, equivalent to kill -9. 
If not used, autosubmit will try to stop the process gracefully. -autosubmit stop expids -autosubmit stop expids [--kill] [status] [--force] -autosubmit stop --all [--kill] [status] [--force] """ -- GitLab From 97e22c6f4a058feb8fee6efbab4c510373aad393 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 8 May 2024 12:37:28 +0200 Subject: [PATCH 4/8] formatting --- autosubmit/autosubmit.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 89b41fe53..372ec9481 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -6213,14 +6213,6 @@ class Autosubmit: job.status = Status.KEY_TO_VALUE[status] job_list.save() - - - - - - - - @staticmethod def cat_log(exp_or_job_id: str, file: Union[None, str], mode: Union[None, str], inspect:bool=False) -> bool: """The cat-log command allows users to view Autosubmit logs using the command-line. -- GitLab From 796e9835e5811882c18f0a73c0dc8e73fff61303 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 8 May 2024 12:38:08 +0200 Subject: [PATCH 5/8] unsed imports --- test/unit/test_stop.py | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/test/unit/test_stop.py b/test/unit/test_stop.py index 9b36b1c80..d3bc1dac7 100644 --- a/test/unit/test_stop.py +++ b/test/unit/test_stop.py @@ -1,36 +1,22 @@ import inspect -from unittest import TestCase -import io -import sys -from contextlib import suppress, redirect_stdout -from pathlib import Path from tempfile import TemporaryDirectory from unittest.mock import patch -from autosubmit.autosubmit import Autosubmit, AutosubmitCritical +from autosubmit.autosubmit import Autosubmit from autosubmitconfigparser.config.basicconfig import BasicConfig from unittest.mock import MagicMock -import os from unittest import TestCase -from copy import copy -import networkx -from networkx import DiGraph -#import patch -from textwrap import dedent -import shutil import tempfile from mock import Mock, patch 
from random import randrange from pathlib import Path from autosubmit.job.job import Job from autosubmit.job.job_common import Status -from autosubmit.job.job_common import Type from autosubmit.job.job_list import JobList from autosubmit.job.job_list_persistence import JobListPersistencePkl from autosubmitconfigparser.config.yamlparser import YAMLParserFactory -from log.log import AutosubmitCritical """ This file contains the test for the `autosubmit stop` command. Found in /autosubmit.py line 6079. -- GitLab From 8adbd8fadbd0a2d26b7bd13d6c775ee6cf9d4faf Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 8 May 2024 15:18:19 +0200 Subject: [PATCH 6/8] Fix processors being spawn in other non-run commands Fix processors not being terminated on kill Fix stop command to always return the expid even if there are added other flags --- autosubmit/autosubmit.py | 9 ++++++--- autosubmit/platforms/ecplatform.py | 3 ++- autosubmit/platforms/locplatform.py | 3 ++- autosubmit/platforms/paramiko_platform.py | 3 ++- autosubmit/platforms/sgeplatform.py | 3 ++- bin/autosubmit | 1 + 6 files changed, 15 insertions(+), 7 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 372ec9481..6856d8e5b 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -134,6 +134,7 @@ class Autosubmit: def __init__(self): self._experiment_data = {} + self.command = None @property def experiment_data(self): @@ -778,7 +779,6 @@ class Autosubmit: return Autosubmit.cat_log(args.ID, args.file, args.mode, args.inspect) elif args.command == 'stop': return Autosubmit.stop(args.expid, args.cancel, args.only_cancel, args.status, args.all, args.force) - @staticmethod def _init_logs(args, console_level='INFO', log_level='DEBUG', expid='None'): Log.set_console_level(console_level) @@ -843,6 +843,8 @@ class Autosubmit: for expid in expids: as_conf = AutosubmitConfig(expid, BasicConfig, YAMLParserFactory()) as_conf.reload(force_load=True) + 
as_conf.set_last_as_command(args.command) + if len(as_conf.experiment_data) == 0: if args.command not in ["expid", "upgrade"]: raise AutosubmitCritical( @@ -6109,10 +6111,10 @@ class Autosubmit: """ def retrieve_expids(): # Retrieve all expids in use by autosubmit attached to my current user - # Bash command: ps -ef | grep "$(whoami)" | grep "autosubmit" | grep "run" | awk '{print $NF}' | sort -u + # Bash command: ps -ef | grep "$(whoami)" | grep "autosubmit" | grep -oP '(?<=run )\S+' expids = [] try: - command = 'ps -ef | grep "$(whoami)" | grep "autosubmit" | grep "run" | awk \'{print $NF}\' | sort -u' + command = f'ps -ef | grep "$(whoami)" | grep "autosubmit" | grep -oP \'(?<=run )\S+\'' process = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True) output, error = process.communicate() output = output.decode(locale.getlocale()[1]) @@ -6121,6 +6123,7 @@ class Autosubmit: expids = output.split('\n') # delete empty strings expids = [x for x in expids if x] + except Exception as e: raise AutosubmitCritical( "An error occurred while retrieving the expids", 7011, str(e)) diff --git a/autosubmit/platforms/ecplatform.py b/autosubmit/platforms/ecplatform.py index b023677a4..f7db52563 100644 --- a/autosubmit/platforms/ecplatform.py +++ b/autosubmit/platforms/ecplatform.py @@ -174,7 +174,8 @@ class EcPlatform(ParamikoPlatform): as_conf is None or str(as_conf.platforms_data.get(self.name, {}).get('DISABLE_RECOVERY_THREADS', "false")).lower() == "false"): self.log_retrieval_process_active = True - self.recover_job_logs() + if as_conf.experiment_data["ASMISC"].get("COMMAND","").lower() == "run": + self.recover_job_logs() def restore_connection(self,as_conf): """ diff --git a/autosubmit/platforms/locplatform.py b/autosubmit/platforms/locplatform.py index ae8c7dd60..e9e6a23c1 100644 --- a/autosubmit/platforms/locplatform.py +++ b/autosubmit/platforms/locplatform.py @@ -116,7 +116,8 @@ class LocalPlatform(ParamikoPlatform): if not self.log_retrieval_process_active and 
( as_conf is None or str(as_conf.platforms_data.get(self.name, {}).get('DISABLE_RECOVERY_THREADS',"false")).lower() == "false"): self.log_retrieval_process_active = True - self.recover_job_logs() + if as_conf.experiment_data["ASMISC"].get("COMMAND","").lower() == "run": + self.recover_job_logs() def test_connection(self,as_conf): diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 131bb7534..43c6e21ca 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -304,7 +304,8 @@ class ParamikoPlatform(Platform): self.connected = True if not self.log_retrieval_process_active and (as_conf is None or str(as_conf.platforms_data.get(self.name, {}).get('DISABLE_RECOVERY_THREADS', "false")).lower() == "false"): self.log_retrieval_process_active = True - self.recover_job_logs() + if as_conf.experiment_data["ASMISC"].get("COMMAND", "").lower() == "run": + self.recover_job_logs() except SSHException: raise except IOError as e: diff --git a/autosubmit/platforms/sgeplatform.py b/autosubmit/platforms/sgeplatform.py index 875d45599..e1c166c69 100644 --- a/autosubmit/platforms/sgeplatform.py +++ b/autosubmit/platforms/sgeplatform.py @@ -126,7 +126,8 @@ class SgePlatform(ParamikoPlatform): as_conf is None or str(as_conf.platforms_data.get(self.name, {}).get('DISABLE_RECOVERY_THREADS', "false")).lower() == "false"): self.log_retrieval_process_active = True - self.recover_job_logs() + if as_conf.experiment_data["ASMISC"].get("COMMAND","").lower() == "run": + self.recover_job_logs() def restore_connection(self,as_conf): """ In this case, it does nothing because connection is established for each command diff --git a/bin/autosubmit b/bin/autosubmit index 42d2acd61..d08f47593 100755 --- a/bin/autosubmit +++ b/bin/autosubmit @@ -32,6 +32,7 @@ from autosubmit.autosubmit import Autosubmit from typing import Union + def exit_from_error(e: BaseException): trace = traceback.format_exc() try: -- 
GitLab From ce10302e4f12a2fa67f94695c4da1492ea020925 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 9 May 2024 16:11:33 +0200 Subject: [PATCH 7/8] changed few things added -fs -t if the user want only to stop running jobs instead of queuing submitted and running --- autosubmit/autosubmit.py | 83 +++++++++++++++++++++++++--------------- 1 file changed, 53 insertions(+), 30 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 6856d8e5b..77e0dfd8f 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -126,6 +126,15 @@ class MyParser(argparse.ArgumentParser): self.print_help() sys.exit(2) +class CancelAction(argparse.Action): + def __call__(self, parser, namespace, values, option_string=None): + setattr(namespace, self.dest, values) + if values: + if namespace.filter_status.upper() == "SUBMITTED, QUEUING, RUNNING " or namespace.target.upper() == "FAILED": + pass + else: + parser.error("-fs and -t can only be used when --cancel is provided") + class Autosubmit: """ @@ -660,19 +669,21 @@ class Autosubmit: # stop subparser = subparsers.add_parser( - 'stop', description='Stop an autosubmit process') + 'stop', description='Completly stops an autosubmit run process') group = subparser.add_mutually_exclusive_group(required=True) - group.add_argument('expid', help='experiment identifier', nargs='?') - group.add_argument('-a', '--all', default=False, action='store_true', - help='Stop all user autosubmit processes') - subparser.add_argument('-c', '--cancel', default=False, action='store_true', - help='Kills active jobs and set them to failure') - subparser.add_argument('-oc', '--only_cancel', default=False, action='store_true', - help='Cancel active jobs if process is stopped') - subparser.add_argument('-s', '--status', default="FAILED", action='store', metavar='STATUS', - help='Final status of killed jobs. 
Default is FAILED.') + group.add_argument('expid', help='experiment identifier, stops the listed expids separated by ","', nargs='?') subparser.add_argument('-f', '--force', default=False, action='store_true', - help='Force stop autosubmit process, equivalent to kill -9') + help='Forces to stop autosubmit process, equivalent to kill -9') + group.add_argument('-a', '--all', default=False, action='store_true', + help='Stop all current running autosubmit processes, will ask for confirmation') + group.add_argument('-fa', '--force_all', default=False, action='store_true', + help='Stop all current running autosubmit processes') + subparser.add_argument('-c', '--cancel', action=CancelAction, default=False, nargs=0, + help='Orders to the schedulers to stop active jobs.') + subparser.add_argument('-fs', '--filter_status', type=str, default="SUBMITTED, QUEUING, RUNNING", + help='Select the status (one or more) to filter the list of jobs.') + subparser.add_argument('-t', '--target', type=str, default="FAILED", metavar='STATUS', + help='Final status of killed jobs. 
Default is FAILED.') args, unknown = parser.parse_known_args() if args.version: Log.info(Autosubmit.autosubmit_version) @@ -778,7 +789,7 @@ class Autosubmit: elif args.command == 'cat-log': return Autosubmit.cat_log(args.ID, args.file, args.mode, args.inspect) elif args.command == 'stop': - return Autosubmit.stop(args.expid, args.cancel, args.only_cancel, args.status, args.all, args.force) + return Autosubmit.stop(args.expid, args.force, args.all, args.force_all, args.cancel, args.filter_status, args.target) @staticmethod def _init_logs(args, console_level='INFO', log_level='DEBUG', expid='None'): Log.set_console_level(console_level) @@ -6094,19 +6105,26 @@ class Autosubmit: return job_list + + @staticmethod - def stop(expids, cancel=False, only_cancel=False, status="FAILED", all=False, force=False): + def stop(expids, force=False, all=False, force_all=False, cancel=False, current_status="", status='FAILED'): """ The stop command allows users to stop the desired experiments. - - It is possible to use ``autosubmit stop -kill`` to also cancel all jobs in the remote and local platform queues and set them to failed if ``--status`` flag is not prompt. - - :param expids: List of experiments to stop - :param cancel: Cancel all jobs in the remote and local platform queues - :param only_cancel: Cancel all jobs in the remote and local platform queues if process is already stopped - :param status: desired final status of the jobs canceled (default: FAILED) - :param all: All user experiments - :param force: Force stop the autosubmit process, equivalent to kill -9 + :param expids: expids to stop + :type expids: str + :param force: force the stop of the experiment + :type force: bool + :param all: stop all experiments + :type all: bool + :param force_all: force the stop of all experiments + :type force_all: bool + :param cancel: cancel the jobs of the experiment + :type cancel: bool + :param current_status: what status to change # defaults to all active jobs. 
+ :type current_status: str + :param status: status to change the active jobs to + :type status: str :return: """ def retrieve_expids(): @@ -6123,11 +6141,11 @@ class Autosubmit: expids = output.split('\n') # delete empty strings expids = [x for x in expids if x] - except Exception as e: raise AutosubmitCritical( "An error occurred while retrieving the expids", 7011, str(e)) return expids + def proccess_id(expid=None): # Retrieve the process id of the autosubmit process # Bash command: ps -ef | grep "$(whoami)" | grep "autosubmit" | grep "run" | grep "expid" | awk '{print $2}' @@ -6146,27 +6164,30 @@ class Autosubmit: "An error occurred while retrieving the process id", 7011, str(e)) return output[0] if output else "" # Starts there - if status not in Status.VALUE_TO_KEY.values(): raise AutosubmitCritical("Invalid status. Expected one of {0}".format(Status.VALUE_TO_KEY.keys()), 7011) # First retrieve expids - if only_cancel: - cancel=True + if force_all: + all=True if all: expids = retrieve_expids() + if not all_yes: + expids = [expid.lower() for expid in expids if input(f"Do you really want to stop: {expid} (y/n)[enter=y]? 
").lower() in ["true","yes","y","1",""]] else: expids = expids.lower() if "," in expids: expids = expids.split(",") else: expids = expids.split(" ") + expids = [x.strip() for x in expids] # Obtain the proccess id errors = "" valid_expids = [] for expid in expids: process_id_ = proccess_id(expid) - if not process_id_ and only_cancel: + if not process_id_: + Log.info(f"Expid {expid} was not running") valid_expids.append(expid) elif process_id_: # Send the signal to stop the autosubmit process @@ -6186,13 +6207,14 @@ class Autosubmit: Log.warning(f"An error occurred while stopping the autosubmit process for expid:{expid}: {str(e)}") for expid in valid_expids: if not force: - Log.info(f"Checking the status of the expid:{expid}") + Log.info(f"Checking the status of the expid: {expid}") process_end = False while (not process_end): if not proccess_id(expid): + Log.info(f"Expid {expid} is stopped") process_end = True else: - Log.info(f"Waiting for the autosubmit run to safety stop {expid}") + Log.info(f"Waiting for the autosubmit run to safety stop: {expid}") sleep(5) if cancel: # call prepare_run to obtain the platforms and as_conf @@ -6203,7 +6225,8 @@ class Autosubmit: job.status in [Status.QUEUING, Status.RUNNING, Status.SUBMITTED]] # change status of active jobs status = status.upper() - + if not active_jobs: + Log.info(f"No active jobs found for expid {expid}") for job in active_jobs: # Cancel from the remote platform Log.info(f'Cancelling job {job.name} on platform {job.platform.name}') -- GitLab From 17264e8e0bb629ff35a5ede3403f0d4a653f9ca0 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Fri, 10 May 2024 16:14:49 +0200 Subject: [PATCH 8/8] Reworked stop to add more stuff and address feedback --- autosubmit/autosubmit.py | 43 ++++++++++++++--------- autosubmit/platforms/paramiko_platform.py | 2 +- autosubmit/platforms/platform.py | 4 --- docs/source/userguide/run/index.rst | 26 +++++++++----- test/unit/test_stop.py | 36 +++++++++---------- 5 files changed, 62 
insertions(+), 49 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 77e0dfd8f..27fead5e3 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -128,13 +128,11 @@ class MyParser(argparse.ArgumentParser): class CancelAction(argparse.Action): def __call__(self, parser, namespace, values, option_string=None): - setattr(namespace, self.dest, values) - if values: - if namespace.filter_status.upper() == "SUBMITTED, QUEUING, RUNNING " or namespace.target.upper() == "FAILED": - pass - else: - parser.error("-fs and -t can only be used when --cancel is provided") - + setattr(namespace, self.dest, True) + if namespace.filter_status.upper() == "SUBMITTED, QUEUING, RUNNING " or namespace.target.upper() == "FAILED": + pass + else: + parser.error("-fs and -t can only be used when --cancel is provided") class Autosubmit: """ @@ -813,7 +811,7 @@ class Autosubmit: "readme", "changelog", "configure", "unarchive", "cat-log"] if args.command == "stop": - if args.all: + if args.all or args.force_all: expid_less.append("stop") global_log_command = ["delete", "archive", "upgrade"] if "offer" in args: @@ -1906,7 +1904,7 @@ class Autosubmit: return exp_history @staticmethod def prepare_run(expid, notransitive=False, start_time=None, start_after=None, - run_only_members=None,recover = False): + run_only_members=None, recover = False, check_scripts= False): """ Prepare the run of the experiment. :param expid: a string with the experiment id. @@ -1996,7 +1994,8 @@ class Autosubmit: # This function, looks at %JOBS.$JOB.FILE% ( mandatory ) and %JOBS.$JOB.CHECK% ( default True ). # Checks the contents of the .sh/.py/r files and looks for AS placeholders. 
try: - job_list.check_scripts(as_conf) + if check_scripts: + job_list.check_scripts(as_conf) except Exception as e: raise AutosubmitCritical( "Error while checking job templates", 7014, str(e)) @@ -6129,10 +6128,10 @@ class Autosubmit: """ def retrieve_expids(): # Retrieve all expids in use by autosubmit attached to my current user - # Bash command: ps -ef | grep "$(whoami)" | grep "autosubmit" | grep -oP '(?<=run )\S+' expids = [] try: - command = f'ps -ef | grep "$(whoami)" | grep "autosubmit" | grep -oP \'(?<=run )\S+\'' + command = 'ps -ef | grep "$(whoami)" | grep -oP "(?<=run )\w{4}" | sort -u' + process = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True) output, error = process.communicate() output = output.decode(locale.getlocale()[1]) @@ -6145,7 +6144,6 @@ class Autosubmit: raise AutosubmitCritical( "An error occurred while retrieving the expids", 7011, str(e)) return expids - def proccess_id(expid=None): # Retrieve the process id of the autosubmit process # Bash command: ps -ef | grep "$(whoami)" | grep "autosubmit" | grep "run" | grep "expid" | awk '{print $2}' @@ -6166,13 +6164,23 @@ class Autosubmit: # Starts there if status not in Status.VALUE_TO_KEY.values(): raise AutosubmitCritical("Invalid status. Expected one of {0}".format(Status.VALUE_TO_KEY.keys()), 7011) + if "," in current_status: + current_status = current_status.upper().split(",") + else: + current_status = current_status.upper().split(" ") + try: + current_status = [Status.KEY_TO_VALUE[x.strip()] for x in current_status] + except: + raise AutosubmitCritical("Invalid status -fs. All values must match one of {0}".format(Status.VALUE_TO_KEY.keys()), 7011) + + # First retrieve expids if force_all: all=True if all: expids = retrieve_expids() - if not all_yes: - expids = [expid.lower() for expid in expids if input(f"Do you really want to stop: {expid} (y/n)[enter=y]? 
").lower() in ["true","yes","y","1",""]] + if not force_all: + expids = [expid.lower() for expid in expids if input(f"Do you really want to stop the experiment: {expid} (y/n)[enter=y]? ").lower() in ["true","yes","y","1",""]] else: expids = expids.lower() if "," in expids: @@ -6219,14 +6227,15 @@ class Autosubmit: if cancel: # call prepare_run to obtain the platforms and as_conf job_list, _, _, _, as_conf, _, _, _ = Autosubmit.prepare_run( - expid) + expid,check_scripts=False) # get active jobs active_jobs = [job for job in job_list.get_job_list() if - job.status in [Status.QUEUING, Status.RUNNING, Status.SUBMITTED]] + job.status in current_status] # change status of active jobs status = status.upper() if not active_jobs: Log.info(f"No active jobs found for expid {expid}") + return for job in active_jobs: # Cancel from the remote platform Log.info(f'Cancelling job {job.name} on platform {job.platform.name}') diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 43c6e21ca..b09d07e36 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -425,7 +425,7 @@ class ParamikoPlatform(Platform): except Exception as e: try: os.remove(file_path) - except Exception as e: + except Exception: pass if str(e) in "Garbage": if not ignore_log: diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 2fd59e1e5..438078118 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -848,7 +848,3 @@ class Platform(object): self.restore_connection(None) except: pass - time.sleep(1) - - - diff --git a/docs/source/userguide/run/index.rst b/docs/source/userguide/run/index.rst index e9114f811..598d576de 100644 --- a/docs/source/userguide/run/index.rst +++ b/docs/source/userguide/run/index.rst @@ -417,22 +417,29 @@ From Autosubmit 4.1.6, you can stop an experiment using the command `autosubmit Options: :: - usage: autosubmit stop [-h] 
[-a] [-c] [-oc] [-s STATUS] [-f] [expid] +usage: autosubmit stop [-h] [-f] [-a] [-fa] [-c] [-fs FILTER_STATUS] + [-t STATUS] + [expid] - Stop an autosubmit process +Completely stops an autosubmit run process positional arguments: - expid experiment identifier + expid experiment identifier, stops the listed expids + separated by "," optional arguments: -h, --help show this help message and exit - -a, --all Stop all current user autosubmit processes, if not defined use expid separated by , - -c, --cancel Kills active jobs and set them to failure - -oc, --only_cancel Cancel active jobs if process is stopped - -s STATUS, --status STATUS + -f, --force Forces to stop autosubmit process, equivalent to kill + -9 + -a, --all Stop all current running autosubmit processes, will + ask for confirmation + -fa, --force_all Stop all current running autosubmit processes + -c, --cancel Orders to the schedulers to stop active jobs. + -fs FILTER_STATUS, --filter_status FILTER_STATUS + Select the status (one or more) to filter the list of + jobs. Default is SUBMITTED, QUEUING, RUNNING. + -t STATUS, --target STATUS Final status of killed jobs. Default is FAILED. - -f, --force Force stop autosubmit process, equivalent to kill -9. If not used, autosubmit will try to stop the process gracefully. - Examples: ~~~~~~~~~ @@ -443,6 +450,7 @@ Examples: autosubmit stop -a autosubmit stop -a -f autosubmit stop -a -c + autosubmit stop -fa --cancel -fs "SUBMITTED, QUEUING, RUNNING" -t "FAILED" You can stop Autosubmit by sending a signal to the process. diff --git a/test/unit/test_stop.py b/test/unit/test_stop.py index d3bc1dac7..d247d6ebb 100644 --- a/test/unit/test_stop.py +++ b/test/unit/test_stop.py @@ -21,23 +21,23 @@ from autosubmitconfigparser.config.yamlparser import YAMLParserFactory """ This file contains the test for the `autosubmit stop` command. Found in /autosubmit.py line 6079.
- usage: autosubmit stop [-h] [-a] [-c] [-oc] [-s STATUS] [-f] [expid] - - Stop an autosubmit process - - positional arguments: - expid experiment identifier - - optional arguments: - -h, --help show this help message and exit - -a, --all Stop all current user autosubmit processes, if not defined use expid separated by , - -c, --cancel Kills active jobs and set them to failure - -oc, --only_cancel Cancel active jobs if process is stopped - -s STATUS, --status STATUS - Final status of killed jobs. Default is FAILED. - -f, --force Force stop autosubmit process, equivalent to kill -9. If not used, autosubmit will try to stop the process gracefully. - - +positional arguments: + expid experiment identifier, stops the listed expids + separated by "," + +optional arguments: + -h, --help show this help message and exit + -f, --force Forces to stop autosubmit process, equivalent to kill + -9 + -a, --all Stop all current running autosubmit processes, will + ask for confirmation + -fa, --force_all Stop all current running autosubmit processes + -c, --cancel Orders to the schedulers to stop active jobs. + -fs FILTER_STATUS, --filter_status FILTER_STATUS + Select the status (one or more) to filter the list of + jobs. + -t STATUS, --target STATUS + Final status of killed jobs. Default is FAILED. """ class TestStop(TestCase): @@ -94,7 +94,7 @@ class TestStop(TestCase): mock_job_list = MagicMock() mock_job_list.return_value = self.job_list with patch('autosubmit.autosubmit.Autosubmit.load_job_list', return_value=mock_job_list): - self.autosubmit.stop(fake_running_expid,force=True) + self.autosubmit.stop(fake_running_expid,force=True,all=False,force_all=False,cancel=False,current_status="RUNNING",status="FAILED") class FakeBasicConfig: def __init__(self): -- GitLab