class CancelAction(argparse.Action):
    """Argparse action backing the ``-c/--cancel`` flag of ``autosubmit stop``.

    It behaves like ``store_true``: when the flag is present, ``True`` is
    stored in the destination attribute of the namespace.

    The previous implementation additionally inspected ``filter_status`` and
    ``target`` and aborted with "-fs and -t can only be used when --cancel is
    provided".  That check could never do what its message says, because this
    action only runs when ``--cancel`` *is* provided, and its outcome depended
    on the order of the command-line options (``-c -t COMPLETED`` was accepted
    while ``-t COMPLETED -c`` was rejected).  Any validation of ``-fs``/``-t``
    being used without ``--cancel`` has to happen after parsing has finished,
    so it does not belong here.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        """Record that cancellation was requested.

        :param parser: parser that owns this action (unused).
        :param namespace: namespace being populated by argparse.
        :param values: values associated with the flag (empty, the flag is declared with ``nargs=0``).
        :param option_string: option string that triggered the action (unused).
        """
        setattr(namespace, self.dest, True)
default="SUBMITTED, QUEUING, RUNNING", + help='Select the status (one or more) to filter the list of jobs.') + subparser.add_argument('-t', '--target', type=str, default="FAILED", metavar='STATUS', + help='Final status of killed jobs. Default is FAILED.') args, unknown = parser.parse_known_args() if args.version: Log.info(Autosubmit.autosubmit_version) @@ -761,7 +786,8 @@ class Autosubmit: return Autosubmit.update_description(args.expid, args.description) elif args.command == 'cat-log': return Autosubmit.cat_log(args.ID, args.file, args.mode, args.inspect) - + elif args.command == 'stop': + return Autosubmit.stop(args.expid, args.force, args.all, args.force_all, args.cancel, args.filter_status, args.target) @staticmethod def _init_logs(args, console_level='INFO', log_level='DEBUG', expid='None'): Log.set_console_level(console_level) @@ -784,6 +810,9 @@ class Autosubmit: expid_less = ["expid", "describe", "testcase", "install", "-v", "readme", "changelog", "configure", "unarchive", "cat-log"] + if args.command == "stop": + if args.all or args.force_all: + expid_less.append("stop") global_log_command = ["delete", "archive", "upgrade"] if "offer" in args: if args.offer: @@ -814,57 +843,72 @@ class Autosubmit: if args.command in BasicConfig.ALLOWED_HOSTS: if 'all' not in BasicConfig.ALLOWED_HOSTS[args.command] and not (host in BasicConfig.ALLOWED_HOSTS[args.command] or fullhost in BasicConfig.ALLOWED_HOSTS[args.command]): raise AutosubmitCritical(message, 7071) - if expid != 'None' and args.command not in expid_less and args.command not in global_log_command: - as_conf = AutosubmitConfig(expid, BasicConfig, YAMLParserFactory()) - as_conf.reload(force_load=True) - if len(as_conf.experiment_data) == 0: - if args.command not in ["expid", "upgrade"]: - raise AutosubmitCritical( - "Experiment {0} has no yml data. 
Please, if you really wish to use AS 4 prompt:\nautosubmit upgrade {0}".format( - expid), 7012) - exp_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid) - tmp_path = os.path.join(exp_path, BasicConfig.LOCAL_TMP_DIR) - aslogs_path = os.path.join(tmp_path, BasicConfig.LOCAL_ASLOG_DIR) - if not os.path.exists(exp_path): - raise AutosubmitCritical("Experiment does not exist", 7012) - # delete is treated differently - if args.command not in ["monitor", "describe", "delete", "report", "stats", "dbfix"]: - owner, eadmin, currentOwner = Autosubmit._check_ownership(expid, raise_error=True) + if (expid != 'None' and expid) and args.command not in expid_less and args.command not in global_log_command: + if "," in expid: + expids = expid.split(",") else: - owner, eadmin, currentOwner = Autosubmit._check_ownership(expid, raise_error=False) + expids = expid.split(" ") + expids = [x.strip() for x in expids] + for expid in expids: + as_conf = AutosubmitConfig(expid, BasicConfig, YAMLParserFactory()) + as_conf.reload(force_load=True) + as_conf.set_last_as_command(args.command) + + if len(as_conf.experiment_data) == 0: + if args.command not in ["expid", "upgrade"]: + raise AutosubmitCritical( + "Experiment {0} has no yml data. 
Please, if you really wish to use AS 4 prompt:\nautosubmit upgrade {0}".format( + expid), 7012) + exp_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid) + tmp_path = os.path.join(exp_path, BasicConfig.LOCAL_TMP_DIR) + aslogs_path = os.path.join(tmp_path, BasicConfig.LOCAL_ASLOG_DIR) + if not os.path.exists(exp_path): + raise AutosubmitCritical("Experiment does not exist", 7012) + # delete is treated differently + if args.command not in ["monitor", "describe", "delete", "report", "stats", "dbfix"]: + owner, eadmin, currentOwner = Autosubmit._check_ownership(expid, raise_error=True) + else: + owner, eadmin, currentOwner = Autosubmit._check_ownership(expid, raise_error=False) if not os.path.exists(tmp_path): os.mkdir(tmp_path) if not os.path.exists(aslogs_path): os.mkdir(aslogs_path) - if owner: - os.chmod(tmp_path, 0o775) - with suppress(PermissionError, FileNotFoundError, Exception): # for -txt option - os.chmod(f'{exp_path}/status', 0o775) - - Log.set_file(os.path.join(aslogs_path, args.command + '.log'), "out", log_level) - Log.set_file(os.path.join(aslogs_path, args.command + '_err.log'), "err") - if args.command in ["run"]: - if os.path.exists(os.path.join(aslogs_path, 'jobs_active_status.log')): - os.remove(os.path.join(aslogs_path, 'jobs_active_status.log')) - if os.path.exists(os.path.join(aslogs_path, 'jobs_failed_status.log')): - os.remove(os.path.join(aslogs_path, 'jobs_failed_status.log')) - Log.set_file(os.path.join(aslogs_path, 'jobs_active_status.log'), "status") - Log.set_file(os.path.join(aslogs_path, 'jobs_failed_status.log'), "status_failed") + if args.command == "stop": + exp_id = "_".join(expids) + Log.set_file(os.path.join(BasicConfig.GLOBAL_LOG_DIR, + args.command + exp_id + '.log'), "out", log_level) + Log.set_file(os.path.join(BasicConfig.GLOBAL_LOG_DIR, + args.command + exp_id + '_err.log'), "err") else: - st = os.stat(tmp_path) - oct_perm = str(oct(st.st_mode))[-3:] - if int(oct_perm[1]) in [6, 7] or int(oct_perm[2]) in [6, 7]: - 
Log.set_file(os.path.join(tmp_path, args.command + '.log'), "out", log_level) - Log.set_file(os.path.join(tmp_path, args.command + '_err.log'), "err") + if owner: + os.chmod(tmp_path, 0o775) + with suppress(PermissionError, FileNotFoundError, Exception): # for -txt option + os.chmod(f'{exp_path}/status', 0o775) + + Log.set_file(os.path.join(aslogs_path, args.command + '.log'), "out", log_level) + Log.set_file(os.path.join(aslogs_path, args.command + '_err.log'), "err") + if args.command in ["run"]: + if os.path.exists(os.path.join(aslogs_path, 'jobs_active_status.log')): + os.remove(os.path.join(aslogs_path, 'jobs_active_status.log')) + if os.path.exists(os.path.join(aslogs_path, 'jobs_failed_status.log')): + os.remove(os.path.join(aslogs_path, 'jobs_failed_status.log')) + Log.set_file(os.path.join(aslogs_path, 'jobs_active_status.log'), "status") + Log.set_file(os.path.join(aslogs_path, 'jobs_failed_status.log'), "status_failed") else: - Log.set_file(os.path.join(BasicConfig.GLOBAL_LOG_DIR, - args.command + expid + '.log'), "out", log_level) - Log.set_file(os.path.join(BasicConfig.GLOBAL_LOG_DIR, - args.command + expid + '_err.log'), "err") - Log.printlog( - "Permissions of {0} are {1}. The log is being written in the {2} path instead of {1}. Please tell to the owner to fix the permissions".format( - tmp_path, oct_perm, BasicConfig.GLOBAL_LOG_DIR)) + st = os.stat(tmp_path) + oct_perm = str(oct(st.st_mode))[-3:] + if int(oct_perm[1]) in [6, 7] or int(oct_perm[2]) in [6, 7]: + Log.set_file(os.path.join(tmp_path, args.command + '.log'), "out", log_level) + Log.set_file(os.path.join(tmp_path, args.command + '_err.log'), "err") + else: + Log.set_file(os.path.join(BasicConfig.GLOBAL_LOG_DIR, + args.command + expid + '.log'), "out", log_level) + Log.set_file(os.path.join(BasicConfig.GLOBAL_LOG_DIR, + args.command + expid + '_err.log'), "err") + Log.printlog( + "Permissions of {0} are {1}. The log is being written in the {2} path instead of {1}. 
@staticmethod
def stop(expids, force=False, all=False, force_all=False, cancel=False, current_status="", status='FAILED'):
    """
    Stop the ``autosubmit run`` processes of the requested experiments and,
    optionally, cancel their active jobs on the remote platforms.

    :param expids: expids to stop, separated by "," or by blanks. Ignored when ``all`` or ``force_all`` is set.
    :type expids: str
    :param force: send SIGKILL instead of SIGINT and do not wait for the process to end.
    :type force: bool
    :param all: stop every running experiment of the current user, asking for confirmation per experiment.
    :type all: bool
    :param force_all: stop every running experiment of the current user without asking.
    :type force_all: bool
    :param cancel: cancel the jobs of the experiment on their platforms and change their status.
    :type cancel: bool
    :param current_status: status names (separated by "," or blanks) of the jobs to cancel.
        An empty value selects SUBMITTED, QUEUING and RUNNING.
    :type current_status: str
    :param status: name of the status given to the cancelled jobs (case-insensitive).
    :type status: str
    :raises AutosubmitCritical: if a status name is invalid or the process table cannot be inspected.
    :return: None
    """
    import shlex  # local import: only needed to quote the expid placed in the shell pipeline

    def _decode(raw):
        # locale.getlocale() may return (None, None); decoding with None raises TypeError.
        encoding = locale.getlocale()[1] or locale.getpreferredencoding(False) or "utf-8"
        return raw.decode(encoding, errors="replace")

    def retrieve_expids():
        # Retrieve all expids in use by autosubmit attached to my current user
        try:
            # Raw string so that the regex escape "\w" reaches grep untouched.
            command = r'ps -ef | grep "$(whoami)" | grep -oP "(?<=run )\w{4}" | sort -u'
            process = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True)
            output, _ = process.communicate()
            # delete -u from output
            output = _decode(output).replace("-u", "")
            # delete empty strings
            return [x for x in output.split('\n') if x]
        except Exception as e:
            raise AutosubmitCritical(
                "An error occurred while retrieving the expids", 7011, str(e))

    def proccess_id(expid=None):
        # Retrieve the process id of the autosubmit process
        # Bash command: ps -ef | grep "$(whoami)" | grep "autosubmit" | grep "run" | grep "expid" | awk '{print $2}'
        try:
            # The expid comes from the command line: quote it before handing it to the shell.
            command = (f'ps -ef | grep "$(whoami)" | grep "autosubmit" | grep "run" '
                       f'| grep -- {shlex.quote(str(expid))} ')
            process = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True)
            output, _ = process.communicate()
            # delete noise: empty lines and the grep processes of the pipeline itself
            pids = [x.split()[1] for x in _decode(output).split('\n') if x and "grep" not in x]
        except Exception as e:
            raise AutosubmitCritical(
                "An error occurred while retrieving the process id", 7011, str(e))
        return pids[0] if pids else ""

    # Starts there
    valid_status_names = ", ".join(Status.VALUE_TO_KEY.values())
    # Normalise before validating so that "-t failed" is accepted as well.
    status = str(status).upper()
    if status not in Status.VALUE_TO_KEY.values():
        raise AutosubmitCritical("Invalid status. Expected one of {0}".format(valid_status_names), 7011)

    separator = "," if "," in current_status else " "
    status_names = [x.strip() for x in current_status.upper().split(separator) if x.strip()]
    if not status_names:
        # The default "" used to be rejected as an invalid status; fall back to the active jobs.
        status_names = ["SUBMITTED", "QUEUING", "RUNNING"]
    try:
        current_status = [Status.KEY_TO_VALUE[name] for name in status_names]
    except KeyError as e:
        raise AutosubmitCritical(
            "Invalid status -fs. All values must match one of {0}".format(valid_status_names), 7011, str(e))

    # First retrieve expids
    if force_all:
        all = True
    if all:
        expids = retrieve_expids()
        if not force_all:
            expids = [expid.lower() for expid in expids if input(f"Do you really want to stop the experiment: {expid} (y/n)[enter=y]? ").lower() in ["true", "yes", "y", "1", ""]]
    else:
        expids = (expids or "").lower()
        expids = expids.split(",") if "," in expids else expids.split(" ")
    # An empty expid would make the grep in proccess_id match ANY running experiment of the user.
    expids = [x.strip() for x in expids if x.strip()]

    # Obtain the proccess id and send the signal to stop the autosubmit process
    stop_signal = signal.SIGKILL if force else signal.SIGINT  # SIGKILL: don't wait for logs
    valid_expids = []
    for expid in expids:
        process_id_ = proccess_id(expid)
        if not process_id_:
            Log.info(f"Expid {expid} was not running")
            valid_expids.append(expid)
            continue
        try:
            os.kill(int(process_id_), stop_signal)
        except (OSError, ValueError) as e:
            # The experiment is left out of the follow-up steps, but the failure is reported.
            Log.warning(f"An error occurred while stopping the autosubmit process for expid:{expid}: {str(e)}")
            continue
        valid_expids.append(expid)

    for expid in valid_expids:
        if not force:
            Log.info(f"Checking the status of the expid: {expid}")
            while proccess_id(expid):
                Log.info(f"Waiting for the autosubmit run to safety stop: {expid}")
                sleep(5)
            Log.info(f"Expid {expid} is stopped")
        if cancel:
            # call prepare_run to obtain the platforms and the job list
            job_list, _, _, _, _, _, _, _ = Autosubmit.prepare_run(
                expid, check_scripts=False)
            # get active jobs
            active_jobs = [job for job in job_list.get_job_list() if
                           job.status in current_status]
            if not active_jobs:
                Log.info(f"No active jobs found for expid {expid}")
                # Keep going: the remaining experiments still have to be processed.
                continue
            # change status of active jobs
            for job in active_jobs:
                # Cancel from the remote platform
                Log.info(f'Cancelling job {job.name} on platform {job.platform.name}')
                try:
                    job.platform.send_command(job.platform.cancel_cmd + " " + str(job.id), ignore_log=True)
                except Exception as e:
                    # Best effort: the job status is updated even if the scheduler refuses the command.
                    Log.warning(f"{str(e)}")
                Log.info(f"Changing status of job {job.name} to {status}")
                job.status = Status.KEY_TO_VALUE[status]
            job_list.save()
VALUE_TO_KEY = {-3: 'SUSPENDED', -2: 'UNKNOWN', -1: 'FAILED', 0: 'WAITING', 1: 'READY', 2: 'SUBMITTED', 3: 'QUEUING', 4: 'RUNNING', 5: 'COMPLETED', 6: 'HELD', 7: 'PREPARED', 8: 'SKIPPED', 9: 'DELAYED'} + KEY_TO_VALUE = {'SUSPENDED': -3, 'UNKNOWN': -2, 'FAILED': -1, 'WAITING': 0, 'READY': 1, 'SUBMITTED': 2, 'QUEUING': 3, 'RUNNING': 4, 'COMPLETED': 5, 'HELD': 6, 'PREPARED': 7, 'SKIPPED': 8, 'DELAYED': 9} LOGICAL_ORDER = ["WAITING", "DELAYED", "PREPARED", "READY", "SUBMITTED", "HELD", "QUEUING", "RUNNING", "SKIPPED", "FAILED", "UNKNOWN", "COMPLETED", "SUSPENDED"] def retval(self, value): diff --git a/autosubmit/platforms/ecplatform.py b/autosubmit/platforms/ecplatform.py index b023677a4ccae76a72d222bebe1b7d7085bef224..f7db52563a20f7d9ed55777cb02c1f41a54c13fc 100644 --- a/autosubmit/platforms/ecplatform.py +++ b/autosubmit/platforms/ecplatform.py @@ -174,7 +174,8 @@ class EcPlatform(ParamikoPlatform): as_conf is None or str(as_conf.platforms_data.get(self.name, {}).get('DISABLE_RECOVERY_THREADS', "false")).lower() == "false"): self.log_retrieval_process_active = True - self.recover_job_logs() + if as_conf.experiment_data["ASMISC"].get("COMMAND","").lower() == "run": + self.recover_job_logs() def restore_connection(self,as_conf): """ diff --git a/autosubmit/platforms/locplatform.py b/autosubmit/platforms/locplatform.py index ae8c7dd6017377d243612356c4c191d412a5beb5..e9e6a23c16f8911701982e0fc4770fe7363b7911 100644 --- a/autosubmit/platforms/locplatform.py +++ b/autosubmit/platforms/locplatform.py @@ -116,7 +116,8 @@ class LocalPlatform(ParamikoPlatform): if not self.log_retrieval_process_active and ( as_conf is None or str(as_conf.platforms_data.get(self.name, {}).get('DISABLE_RECOVERY_THREADS',"false")).lower() == "false"): self.log_retrieval_process_active = True - self.recover_job_logs() + if as_conf.experiment_data["ASMISC"].get("COMMAND","").lower() == "run": + self.recover_job_logs() def test_connection(self,as_conf): diff --git 
a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 131bb7534eb1aee51b493b7b928bb94e62ecf755..b09d07e36cbab682dbb8ba4c2c8dd64a093b4ce9 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -304,7 +304,8 @@ class ParamikoPlatform(Platform): self.connected = True if not self.log_retrieval_process_active and (as_conf is None or str(as_conf.platforms_data.get(self.name, {}).get('DISABLE_RECOVERY_THREADS', "false")).lower() == "false"): self.log_retrieval_process_active = True - self.recover_job_logs() + if as_conf.experiment_data["ASMISC"].get("COMMAND", "").lower() == "run": + self.recover_job_logs() except SSHException: raise except IOError as e: @@ -424,7 +425,7 @@ class ParamikoPlatform(Platform): except Exception as e: try: os.remove(file_path) - except Exception as e: + except Exception: pass if str(e) in "Garbage": if not ignore_log: diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 23905d125a7fbf25ccbd03a12048fd994e36cada..438078118503dd8ae8a5a1e9da93675b447b217b 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -19,6 +19,7 @@ from multiprocessing import Process, Queue def processed(fn): def wrapper(*args, **kwargs): process = Process(target=fn, args=args, kwargs=kwargs, name=f"{args[0].name}_platform") + process.daemon = True # Set the process as a daemon process process.start() return process @@ -847,7 +848,3 @@ class Platform(object): self.restore_connection(None) except: pass - time.sleep(1) - - - diff --git a/autosubmit/platforms/sgeplatform.py b/autosubmit/platforms/sgeplatform.py index 875d455996b6fe1b1f102cd9f68465a142b058ac..e1c166c6941511a7bc6bcd863cf2f1316adda475 100644 --- a/autosubmit/platforms/sgeplatform.py +++ b/autosubmit/platforms/sgeplatform.py @@ -126,7 +126,8 @@ class SgePlatform(ParamikoPlatform): as_conf is None or str(as_conf.platforms_data.get(self.name, 
{}).get('DISABLE_RECOVERY_THREADS', "false")).lower() == "false"): self.log_retrieval_process_active = True - self.recover_job_logs() + if as_conf.experiment_data["ASMISC"].get("COMMAND","").lower() == "run": + self.recover_job_logs() def restore_connection(self,as_conf): """ In this case, it does nothing because connection is established for each command diff --git a/bin/autosubmit b/bin/autosubmit index 42d2acd6148bbeba547835af55cb29f8ce1d2132..d08f47593a4bffef1fe577076a6ffe24c394b856 100755 --- a/bin/autosubmit +++ b/bin/autosubmit @@ -32,6 +32,7 @@ from autosubmit.autosubmit import Autosubmit from typing import Union + def exit_from_error(e: BaseException): trace = traceback.format_exc() try: diff --git a/docs/source/userguide/run/index.rst b/docs/source/userguide/run/index.rst index e61e62f23d8e8bc0df190f2d35df4ded8925f155..598d576de12ef14666590a38f5a40c3dea53227d 100644 --- a/docs/source/userguide/run/index.rst +++ b/docs/source/userguide/run/index.rst @@ -412,6 +412,47 @@ Finally, you can launch Autosubmit *run* in background and with ``nohup`` (conti How to stop the experiment -------------------------- +From Autosubmit 4.1.6, you can stop an experiment using the command `autosubmit stop` + +Options: +:: + +usage: autosubmit stop [-h] [-f] [-a] [-fa] [-c] [-fs FILTER_STATUS] + [-t STATUS] + [expid] + +Completly stops an autosubmit run process + + positional arguments: + expid experiment identifier, stops the listed expids + separated by "," + + optional arguments: + -h, --help show this help message and exit + -f, --force Forces to stop autosubmit process, equivalent to kill + -9 + -a, --all Stop all current running autosubmit processes, will + ask for confirmation + -fa, --force_all Stop all current running autosubmit processes + -c, --cancel Orders to the schedulers to stop active jobs. + -fs FILTER_STATUS, --filter_status FILTER_STATUS + Select the status (one or more) to filter the list of + jobs. Default is SUBMITTED, QUEUING, RUNNING. 
+ -t STATUS, --target STATUS + Final status of killed jobs. Default is FAILED. +Examples: +~~~~~~~~~ + +.. code-block:: bash + + autosubmit stop cxxx + autosubmit stop cxxx, cyyy + autosubmit stop -a + autosubmit stop -a -f + autosubmit stop -a -c + autosubmit stop -fa --cancel -fs "SUBMITTED, QUEUING, RUNNING" -t "FAILED" + + You can stop Autosubmit by sending a signal to the process. To get the process identifier (PID) you can use the ps command on a shell interpreter/terminal. :: diff --git a/test/unit/test_stop.py b/test/unit/test_stop.py new file mode 100644 index 0000000000000000000000000000000000000000..d247d6ebb1a0bcbdc87f9ef1dc1072016d4c0143 --- /dev/null +++ b/test/unit/test_stop.py @@ -0,0 +1,118 @@ +import inspect +from tempfile import TemporaryDirectory +from unittest.mock import patch + +from autosubmit.autosubmit import Autosubmit +from autosubmitconfigparser.config.basicconfig import BasicConfig + +from unittest.mock import MagicMock + +from unittest import TestCase +import tempfile +from mock import Mock, patch +from random import randrange +from pathlib import Path +from autosubmit.job.job import Job +from autosubmit.job.job_common import Status +from autosubmit.job.job_list import JobList +from autosubmit.job.job_list_persistence import JobListPersistencePkl +from autosubmitconfigparser.config.yamlparser import YAMLParserFactory + +""" +This file contains the test for the `autosubmit stop` command. Found in /autosubmit.py line 6079. + +positional arguments: + expid experiment identifier, stops the listed expids + separated by "," + +optional arguments: + -h, --help show this help message and exit + -f, --force Forces to stop autosubmit process, equivalent to kill + -9 + -a, --all Stop all current running autosubmit processes, will + ask for confirmation + -fa, --force_all Stop all current running autosubmit processes + -c, --cancel Orders to the schedulers to stop active jobs. 
class TestStop(TestCase):
    """Unit tests for ``Autosubmit.stop``."""

    def setUp(self):
        self.experiment_id = 'random-id'
        self.autosubmit = Autosubmit()

        # BasicConfig is process-wide state: restore it once the test is over.
        self.original_root_dir = BasicConfig.LOCAL_ROOT_DIR
        self.addCleanup(setattr, BasicConfig, 'LOCAL_ROOT_DIR', self.original_root_dir)
        self.root_dir = TemporaryDirectory()
        self.addCleanup(self.root_dir.cleanup)
        BasicConfig.LOCAL_ROOT_DIR = self.root_dir.name

        self.exp_path = Path(self.root_dir.name, 'a000')
        self.tmp_dir = self.exp_path / BasicConfig.LOCAL_TMP_DIR
        self.aslogs_dir = self.tmp_dir / BasicConfig.LOCAL_ASLOG_DIR
        self.status_path = self.exp_path / 'status'
        self.aslogs_dir.mkdir(parents=True)
        self.status_path.mkdir()

        # Scratch directory that is removed automatically when the test finishes.
        scratch_dir = TemporaryDirectory()
        self.addCleanup(scratch_dir.cleanup)
        self.temp_directory = scratch_dir.name

        self.as_conf = Mock()
        self.as_conf.experiment_data = dict()
        self.as_conf.experiment_data["JOBS"] = dict()
        self.as_conf.jobs_data = self.as_conf.experiment_data["JOBS"]
        self.as_conf.experiment_data["PLATFORMS"] = dict()
        joblist_persistence = JobListPersistencePkl()

        self.job_list = JobList(self.experiment_id, FakeBasicConfig, YAMLParserFactory(),
                                joblist_persistence, self.as_conf)
        # creating jobs for self list
        self.completed_job = self._createDummyJobWithStatus(Status.COMPLETED)
        self.submitted_job = self._createDummyJobWithStatus(Status.SUBMITTED)
        self.running_job = self._createDummyJobWithStatus(Status.RUNNING)
        self.queuing_job = self._createDummyJobWithStatus(Status.QUEUING)
        self.failed_job = self._createDummyJobWithStatus(Status.FAILED)
        self.ready_job = self._createDummyJobWithStatus(Status.READY)
        self.waiting_job = self._createDummyJobWithStatus(Status.WAITING)

        self.job_list._job_list = [self.running_job, self.queuing_job]

    def _createDummyJobWithStatus(self, status):
        """Return a job with a random name, id and type and the given status."""
        job_name = str(randrange(999999, 999999999))
        job_id = randrange(1, 999)
        job = Job(job_name, job_id, status, 0)
        job.type = randrange(0, 2)
        return job

    def test_stop_expids(self):
        # Fake output of the "ps" pipeline: the second column is read as the process id.
        fake_running_process = MagicMock()
        fake_running_process.communicate.return_value = (b'bla 0001 bla bla bla', b'')
        # Two experiments, given as a comma separated list.
        fake_running_expid = 'a000,a001'
        mock_job_list = MagicMock()
        mock_job_list.return_value = self.job_list
        with patch('subprocess.Popen', return_value=fake_running_process), \
                patch('os.kill') as mock_kill, \
                patch('autosubmit.autosubmit.Autosubmit.load_job_list', return_value=mock_job_list):
            self.autosubmit.stop(fake_running_expid, force=True, all=False, force_all=False,
                                 cancel=False, current_status="RUNNING", status="FAILED")
        # One signal per experiment, each addressed to the process id parsed from the fake output.
        self.assertEqual(mock_kill.call_count, 2)
        for kill_call in mock_kill.call_args_list:
            self.assertEqual(kill_call[0][0], 1)


class FakeBasicConfig:
    """Minimal stand-in for BasicConfig exposing dummy paths."""

    def __init__(self):
        pass

    def props(self):
        """Return the public, non-callable attributes as a dictionary."""
        pr = {}
        for name in dir(self):
            value = getattr(self, name)
            if not name.startswith('__') and not inspect.ismethod(value) and not inspect.isfunction(value):
                pr[name] = value
        return pr

    DB_DIR = '/dummy/db/dir'
    DB_FILE = '/dummy/db/file'
    DB_PATH = '/dummy/db/path'
    LOCAL_ROOT_DIR = '/dummy/local/root/dir'
    LOCAL_TMP_DIR = '/dummy/local/temp/dir'
    LOCAL_PROJ_DIR = '/dummy/local/proj/dir'
    DEFAULT_PLATFORMS_CONF = ''
    DEFAULT_JOBS_CONF = ''
    STRUCTURES_DIR = '/dummy/structure/dir'