diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index f28ff57664c6c6018d7e1178aa4becc7825f93ba..ac2a775d379e846d186815e09c091be1fce8238b 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -374,6 +374,9 @@ class Autosubmit: default=False, help='Generate possible wrapper in the current workflow') subparser.add_argument('-v', '--update_version', action='store_true', default=False, help='Update experiment version') + subparser.add_argument( + '-q', '--quick', action="store_true", help='Only checks one job per each section') + group.add_argument('-fs', '--filter_status', type=str, choices=('Any', 'READY', 'COMPLETED', 'WAITING', 'SUSPENDED', 'FAILED', 'UNKNOWN'), @@ -695,7 +698,7 @@ class Autosubmit: return Autosubmit.check(args.expid, args.notransitive) elif args.command == 'inspect': return Autosubmit.inspect(args.expid, args.list, args.filter_chunks, args.filter_status, - args.filter_type, args.notransitive, args.force, args.check_wrapper) + args.filter_type, args.notransitive, args.force, args.check_wrapper, args.quick) elif args.command == 'report': return Autosubmit.report(args.expid, args.template, args.show_all_parameters, args.folder_path, args.placeholders) @@ -1393,7 +1396,7 @@ class Autosubmit: @staticmethod def inspect(expid, lst, filter_chunks, filter_status, filter_section, notransitive=False, force=False, - check_wrapper=False): + check_wrapper=False, quick=False): """ Generates cmd files experiment. @@ -1452,16 +1455,15 @@ class Autosubmit: if check_wrapper and (not locked or (force and locked)): Log.info("Generating all cmd script adapted for wrappers") jobs = job_list.get_uncompleted() - - jobs_cw = job_list.get_completed() + if force: + jobs_cw = job_list.get_completed() else: - if (force and not locked) or (force and locked): + if locked: + Log.warning("There is a .lock file and not -f, generating only all unsubmitted cmd scripts") + jobs = job_list.get_unsubmitted() + elif force: Log.info("Overwriting all cmd scripts") jobs = job_list.get_job_list() - elif locked: - Log.warning( - "There is a .lock file and not -f, generating only all unsubmitted cmd scripts") - jobs = job_list.get_unsubmitted() else: Log.info("Generating cmd scripts only for selected jobs") if filter_chunks: @@ -1515,10 +1517,36 @@ class Autosubmit: jobs.append(job) else: jobs = job_list.get_job_list() + if quick: + wrapped_sections = list() + if check_wrapper: + for wrapper_data in as_conf.experiment_data.get("WRAPPERS",{}).values(): + jobs_in_wrapper = wrapper_data.get("JOBS_IN_WRAPPER","").upper() + if "," in jobs_in_wrapper: + jobs_in_wrapper = jobs_in_wrapper.split(",") + else: + jobs_in_wrapper = jobs_in_wrapper.split(" ") + wrapped_sections.extend(jobs_in_wrapper) + wrapped_sections = list(set(wrapped_sections)) + jobs_aux = list() + sections_added = set() + for job in jobs: + if job.section not in sections_added or job.section in wrapped_sections: + sections_added.add(job.section) + jobs_aux.append(job) + jobs = jobs_aux + del jobs_aux + sections_added = set() + jobs_aux = list() + for job in jobs_cw: + if job.section not in sections_added or job.section in wrapped_sections: + sections_added.add(job.section) + jobs_aux.append(job) + jobs_cw = jobs_aux + del jobs_aux if isinstance(jobs, type([])): for job in jobs: job.status = Status.WAITING - Autosubmit.generate_scripts_andor_wrappers( as_conf, job_list, jobs, packages_persistence, False) if len(jobs_cw) > 0: @@ -1592,11 +1620,6 @@ class Autosubmit: job.platform = submitter.platforms[job.platform_name] if job.platform is not None and job.platform != "": platforms_to_test.add(job.platform) - if not only_wrappers: - job_list.check_scripts(as_conf) # added only in inspect - else: # no longer check_Scripts if -cw is added to monitor or create, just update the parameters - for job in ( job for job in job_list.get_job_list() ): - job.update_parameters(as_conf,parameters) job_list.update_list(as_conf, False) # Loading parameters again Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) @@ -1612,8 +1635,6 @@ class Autosubmit: while job_list.get_active(): Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence, True, only_wrappers, hold=False) - # for job in job_list.get_uncompleted_and_not_waiting(): - # job.status = Status.COMPLETED job_list.update_list(as_conf, False) diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 403a8dc742487fff17c917024159a401ba5c58f8..688fb45823a0523b5923924729c8bd14651346ea 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -94,7 +94,7 @@ class JobPackageBase(object): return self._platform @threaded - def check_scripts(self,jobs,configuration, parameters,only_generate,hold): + def check_scripts(self, jobs, configuration, parameters, only_generate, hold): for job in jobs: if only_generate and not os.path.exists(os.path.join(configuration.get_project_dir(), job.file)): break @@ -116,7 +116,32 @@ class JobPackageBase(object): pass - + def submit_unthreaded(self, configuration, parameters,only_generate=False,hold=False): + """ + :param hold: + :para configuration: Autosubmit basic configuration \n + :type configuration: AutosubmitConfig object \n + :param parameters; Parameters from joblist \n + :type parameters: JobList,parameters \n + :param only_generate: True if coming from generate_scripts_andor_wrappers(). If true, only generates scripts; otherwise, submits. \n + :type only_generate: Boolean + """ + for job in self.jobs: + if only_generate and not os.path.exists(os.path.join(configuration.get_project_dir(), job.file)): + exit_ = True + break + if not os.path.exists(os.path.join(configuration.get_project_dir(), job.file)): + if configuration.get_project_type().lower() != "none" and len(configuration.get_project_type()) > 0: + raise AutosubmitCritical( + "Template [ {0} ] using CHECK=On_submission has some empty variable {0}".format(job.name), 7014) + if not job.check_script(configuration, parameters, show_logs=job.check_warnings): + Log.warning( + f'Script {job.name} has some empty variables. An empty value has substituted these variables') + else: + Log.result("Script {0} OK", job.name) + # looking for directives on jobs + self._custom_directives = self._custom_directives | set(job.custom_directives) + self._create_scripts(configuration) def submit(self, configuration, parameters,only_generate=False,hold=False): """ :param hold: @@ -125,10 +150,8 @@ class JobPackageBase(object): :param parameters; Parameters from joblist \n :type parameters: JobList,parameters \n :param only_generate: True if coming from generate_scripts_andor_wrappers(). If true, only generates scripts; otherwise, submits. \n - :type only_generate: Boolean + :type only_generate: Boolean """ - job = None - exit_=False thread_number = multiprocessing.cpu_count() if len(self.jobs) > 2500: thread_number = thread_number * 2 @@ -140,47 +163,38 @@ class JobPackageBase(object): thread_number = thread_number * 5 chunksize = int((len(self.jobs) + thread_number - 1) / thread_number) try: - if len(self.jobs) < thread_number: - for job in self.jobs: - if only_generate and not os.path.exists(os.path.join(configuration.get_project_dir(), job.file)): - exit_=True - break - if not os.path.exists(os.path.join(configuration.get_project_dir(), job.file)): - if configuration.get_project_type().lower() != "none" and len(configuration.get_project_type()) > 0: - raise AutosubmitCritical("Template [ {0} ] using CHECK=On_submission has some empty variable {0}".format(job.name),7014) - if not job.check_script(configuration, parameters,show_logs=job.check_warnings): - Log.warning(f'Script {job.name} has some empty variables. An empty value has substituted these variables') - else: - Log.result("Script {0} OK",job.name) - # looking for directives on jobs - self._custom_directives = self._custom_directives | set(job.custom_directives) + if len(self.jobs) < thread_number or str(configuration.experiment_data.get("CONFIG",{}).get("ENABLE_WRAPPER_THREADS","False")).lower() in ["true","yes","1"]: + self.submit_unthreaded(configuration, parameters, only_generate, hold) + Log.debug("Creating Scripts") + self._create_scripts(configuration) else: - Lhandle = list() + lhandle = list() for i in range(0, len(self.jobs), chunksize): - Lhandle.append(self.check_scripts(self.jobs[i:i + chunksize], configuration, parameters, only_generate, hold)) - for dataThread in Lhandle: + Log.debug("Checking Scripts") + lhandle.append(self.check_scripts(self.jobs[i:i + chunksize], configuration, parameters, only_generate, hold)) + for dataThread in lhandle: dataThread.join() - except AutosubmitCritical : #Raise the intended message - raise - except BaseException as e: #should be IOERROR - raise AutosubmitCritical( - "Error on {1}, template [{0}] still does not exists in running time(check=on_submission activated) ".format(job.file,job.name), 7014) - Log.debug("Creating Scripts") - if not exit_: - if len(self.jobs) < thread_number: - self._create_scripts(configuration) - else: - Lhandle = list() for i in range(0, len(self.jobs), chunksize): - Lhandle.append(self._create_scripts_threaded(self.jobs[i:i + chunksize],configuration)) - for dataThread in Lhandle: + Log.debug("Creating Scripts") + lhandle.append(self._create_scripts_threaded(self.jobs[i:i + chunksize], configuration)) + for dataThread in lhandle: dataThread.join() self._common_script = self._create_common_script() + except AutosubmitCritical: + raise + except BaseException as e: + raise AutosubmitCritical("Error while building the scripts: {0}".format(e), 7013) + try: if not only_generate: Log.debug("Sending Files") self._send_files() Log.debug("Submitting") self._do_submission(hold=hold) + except AutosubmitCritical: + raise + except BaseException as e: + raise AutosubmitCritical("Error while submitting jobs: {0}".format(e), 7013) + def _create_scripts(self, configuration): diff --git a/docs/source/userguide/configure/develop_a_project.rst b/docs/source/userguide/configure/develop_a_project.rst index 0a2346f97efdbf512c413c5a0ed2d83379cf9fe0..7621b29d085e6cc1b62b975fe1ffd7ac552d4f36 100644 --- a/docs/source/userguide/configure/develop_a_project.rst +++ b/docs/source/userguide/configure/develop_a_project.rst @@ -131,6 +131,8 @@ Autosubmit configuration # DELAY_RETRY_TIME:*11 # will wait 11,110,1110,11110... # Default output type for CREATE, MONITOR, SET STATUS, RECOVERY. Available options: pdf, svg, png, ps, txt # Default:pdf + # This parameter is used to enable the use of threads in autosubmit for the wrappers. # Default False + ENABLE_WRAPPER_THREADS: False OUTPUT:pdf # wrapper definition wrappers: diff --git a/docs/source/userguide/monitor_and_check/index.rst b/docs/source/userguide/monitor_and_check/index.rst index 4dbee3148845e64eef1a9e623beda03392bef8e1..625cb6316545989bd9aa55bb50a56f37f02541b1 100644 --- a/docs/source/userguide/monitor_and_check/index.rst +++ b/docs/source/userguide/monitor_and_check/index.rst @@ -146,6 +146,10 @@ With autosubmit.lock and no (-f) force, it will only generate all files that are Without autosubmit.lock, it will generate all unless filtered by -fl,fc,fs or ft. +To generate cmd only for a single job of the section : +:: + + autosubmit inspect expid -q How to monitor an experiment ---------------------------- diff --git a/requeriments.txt b/requeriments.txt index 9c039557166321fd083e38b823fc3fd99c8e7b1b..1fc627addaf35b11fc80905898e8d9e45ca624ed 100644 --- a/requeriments.txt +++ b/requeriments.txt @@ -1,13 +1,13 @@ zipp>=3.1.0 setuptools>=60.8.2 cython -autosubmitconfigparser==1.0.58 +autosubmitconfigparser==1.0.59 paramiko>=2.9.2 bcrypt>=3.2 PyNaCl>=1.5.0 configobj>=5.0.6 python-dateutil>=2.8.2 -matplotlib<3.6 +matplotlib == 3.8.3 py3dotplus>=1.1.0 pyparsing>=3.0.7 mock>=4.0.3