diff --git a/CHANGELOG b/CHANGELOG
index 6a1ff2820cab872a8d6af07fc537b2ba046f7986..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,284 +0,0 @@
-3.13.0 - This has been the Autosubmit development version for many months, so it provides a lot of improvements, especially in terms of efficiency and stability. It also brings a full refactor of the wrapper module.
-
-
-Multi-threaded wrappers were introduced in this version.
-It is now possible to specify multiple hosts for the same platform (in a list), so it is more robust against connection issues/login failures.
-In general, big experiments with many startdates/members, or featuring very big wrappers, run much more efficiently with 3.13.0.
-A completely new implementation of remote dependencies (PreSubmission) was introduced in this version. It speeds up the jobs on a Slurm platform by sending the next 10 WAITING jobs to the queues in advance.
-Workflows gain flexibility through a new way to define dependencies for specific chunks.
-Changes were made to the algorithm that handles the maximum active jobs by platform. From this version, wrapped jobs count as a single job for Autosubmit, and the maximum number of inner jobs can be defined with new wrapper parameters.
-New POLICY option that allows tuning the behaviour for creating wrapper jobs (more greedy, more conservative, or more balanced).
-Wrappers have a new option, QUEUE, that allows placing the wrapper job in a different queue than the single jobs.
-There is a new log (.err, .out, COMPLETED, STAT files) recovery system that performs retries (in background threads) of the log file transfers from the remote platforms in case of failure.
-The user can specify a datetime or time to trigger the experiment start by passing the -st flag (plus the right format) to the autosubmit run command.
-The user can specify an experiment dependency by providing the -sa flag (plus the right expid format) to the autosubmit run command. The experiment will start when the experiment specified in the -sa flag finishes.
-When the user quits Autosubmit with CTRL+C, Autosubmit will make sure all threads finish correctly before closing.
-Job lifecycle information is stored in an external database that allows users to visualize job historical information. This information is gathered in a way that does not interfere with the normal workflow (even if the information gathering or any of its components fails). Furthermore, threading is used to prevent unnecessary delays.
-
-Specific members can be selected to run by using the -rm flag with autosubmit run. Autosubmit will only run jobs belonging to the specified members. Jobs already running will be monitored and properly completed. (A usage sketch of these run flags follows this entry.)
-The git clone operation (autosubmit create) now implements a backup procedure that prevents loss of information in case of wrong configuration or network errors.
-Security has been improved: all commands that could change the workflow (e.g. create, refresh and run) are now locked by an owner-only mechanism.
-New autosubmit dbfix expid command allowing users to fix a malformed-database error.
-Custom shebang (header of the script templates), so it is possible to use Python or R templates with a specific Python/R version dependency.
-Only the create and run commands can update the workflow configuration and structure information.
-In the case of run, they will only be updated if a change is detected before the start of the main run loop.
-Increased robustness. AS will try to prevent as many errors as possible at the beginning of the run and will handle other delicate operations before run time.
-Allows prioritizing a list of jobs to be run before the rest of the workflow, via the Two_step_start variable set in expdef.conf.
-Allows skipping jobs of the same section if their last queuing member/chunk is higher than that of other jobs in queuing/waiting/ready status.
-Reworked migrate command, with improvements in robustness and security.
-New pklfix command to restore a corrupted local database.
-New updatedescrip command to modify the experiment's description.
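Editor's note: a quick illustration of the new run flags described in the 3.13.0 entry above. The expids, datetime and member names are made up, and the exact argument formats are an assumption based on the descriptions, not taken from the documentation:

    autosubmit run a000 -st "2021-02-15 10:30"   # delay the start until the given datetime
    autosubmit run a000 -sa a001                 # start once experiment a001 finishes
    autosubmit run a000 -rm "fc0 fc1"            # only run jobs belonging to members fc0 and fc1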
-
-
-3.12.0 - In this version vertical and horizontal wrappers are fully supported.
-
-Horizontal-vertical wrappers are supported too. They were first developed in this Autosubmit version and they have been used in production together.
-Due to technical limitations, we don't recommend running experiments having many startdates/members (increased concurrency) or very large wrappers with 3.12.0b. As a rule of thumb, experiments with more than 10-20 members in total or wrappers with more than 50 jobs (the user can always reduce the wrapper size) may experience delays in the Autosubmit refresh cycle and in generating the monitor views.
-In this version, inner jobs inside QUEUING wrappers show a SUBMITTED status. This is fixed in 3.13.0.
-Autosubmit migrate is not secured for big experiments, so it is recommended to back up the offered experiment (on the remote platforms) first. We encourage using only the 3.13.0 migrate.
-
-
-3.11.1
- Fix minor issues
- Added new describe command for experiments
-
-3.11.0
- Included %m% in the list of exceptions
- Wrapper major refactoring
-  - WrapperJob, WrapperBuilder and WrapperFactory
-  - 2 types of hybrid wrapper: vertical-horizontal and horizontal-vertical
-  - Machinefiles for horizontal and hybrid
-  - Reduced submitting time by merging commands (rm and find) into one
- Fixed stats plot for wrapped jobs
- Checks for necessary configuration when defining wrapper in autosubmit.conf:
-  MAX_WALLCLOCK, MAX_PROCESSORS, PROCESSORS_PER_NODE
- Wrapper regression tests for mn4
- Flag option not to do transitive reduction
- Added the reasons for QUEUING (Reason) as returned by SLURM
- Added master as default branch for git projects (otherwise clone failed if empty)
- Some bug fixes:
-  - Bug fix for changing status of synchronized jobs
-  - Bug fix related to grouping
-  - Bug fix related to variable substitution %%
-
-3.10.0
- Vertical wrapper allowing mixed job types and additional constraints
- Job grouping in the visualization graph
- Txt output status for autosubmit monitor
- Setstatus now allows multiple job types
- Host whitelist option in .autosubmitrc for autosubmit run
- DELAY and SPLITS options for job configuration
- Setting blank value for absent variables in project configuration
- Minor bug fixes
-
-3.9.0
- Custom directives for the HPC resource manager headers
-  can be added on platforms and jobs configuration files
-  ~ only paramiko (LSF, SLURM and PBS)
- First version with migrate experiments (to another user)
- On CCA, TASKS and THREADS can be expressed in lots (e.g. 127:1)
- Some bug fixes:
-  - QUEUE on slurm specified on directive qos instead of partition
-  - Variable expansion on CCA (ECMWF) headers
-
-3.8.1
- First version with job packages ~ only paramiko (LSF, SLURM and PBS)
-  - Vertical
-  - Horizontal
-  - With dependencies ~ only for vertical
- Python wrapper for CCA (ECMWF)
- On-submission template checking
- Some UX improvements
- Other minor bug fixes
-
-3.8.0
- First version with LSF arrays:
-  - Includes all the bug fixes & features from 3.7.7
-  - Does NOT include the bug fixes from 3.7.8
-
-3.7.8
- Some bug fixes:
-  - Database persistence
-  - Delete command
-  - Unarchive command
-  - CHUNKINI option
-  - Paramiko permissions
-  - Paramiko non-existing remote copy
-
-3.7.7
- Some improvements for Slurm platforms
- Geo-definition of processors
- New configuration variables:
-  - CHUNKINI
-  - MEMORY
-  - MEMORY_PER_TASK
-  - HYPERTHREADING
- Other minor bug fixes
-
-3.7.6
- Fixed refresh
- Fixed recovery for ECMWF
- Local logs copy can be disabled
- Some UX improvements
- Other minor bug fixes
-
-3.7.5
- Fixed minor issue with LSF's logs
-
-3.7.4
- Forward dependencies
- Performance improvements
- Log files copied into LOCAL platform
- PROCESSORS_PER_NODE/TASKS now optional
- Exclusivity for MN3 (with Paramiko)
- THREADS optional for ECMWF
- Minor bug fixes
-
-3.7.3
- Fixed error with logs directives (err & out were swapped)
- Added new option for MN3: SCRATCH_FREE_SPACE
- PROCESSORS_PER_NODE/TASKS now available with Paramiko
- Other minor bug fixes
-
-3.7.2
- Minor bug fixes
- Regression test suite improved
- Solved some problems with paramiko & the ECMWF platform
-
-3.7.1
- Fixed issue in setstatus
- Added new 'testcase' command
-
-3.7.0
- Big improvements in memory consumption
- Added new configuration variables (default job type, number of members, ...)
- Added an alternative method to configure autosubmit without the dialog library
- UX improved (logs fixed, exceptions handled)
- Fixed error with COMPLETED jobs shown as FAILED
- Fixed error with LSF schedulers by default
- Fixed bug in stats feature
- Fixed some bugs with Git and SVN
- Other minor bug fixes
-
-3.6.1
- Fixed an incompatibility with recent versions of radical.utils (saga)
-
-3.6.0
- Added multi-library communications support: SAGA & Paramiko
- UX improved on some error cases
- Fixed permission backwards incompatibility
- Fixed authorization problems in the SAGA implementation
- Other minor bug fixes
-
-3.5.0
- Added another mechanism for SAGA error prevention
- Added no-plot option to setstatus
- Added exclusivity and processes-per-host support for MN
- Check method fixed (not working since 3.2)
- Other minor bug fixes
-
-3.4.1
- Hot-fix ECMWF binary (bash, R, python)
- Hot-fix mail notifications
-
-3.4.0
- Added email notifications support
- Added mechanisms for incoherence prevention
- Added mechanisms for SAGA pty error prevention
-
-3.3.1
- Fixed bug with no-leap experiments
-
-3.3.0
- Added filters in monitor
- Added support for Python jobs
- Added support for R jobs
- Added unitary test suite
- Synchronize job param
- Fixed recovery issue
- Other minor bugs fixed
-
-3.2.0
- Changed WAIT default
- Recovery without -s
- Group permissions to log files
- Reservation support for MN
- Fixed retrials bug
- Fixed rerun bug
- Fixed stats bug
- Other minor bugs fixed
-
-3.2.0b3
- SAGA-related bug fixes
- Minor bug fixes
-
-3.2.0b2
- Minor bug fixes
-
-3.2.0b1
- Now using SAGA for connection and queue management, adding support for more queue types
- Stats revamped to provide more information and make it available earlier
-
-3.1.9
- Hot fix LOCAL platform
-
-3.1.8
- Hot fix LOCAL platform
-
-3.1.7
- Fix issue StatsSnippet (job_st typo)
- Fix issue recovering platforms to test
- Fix issue with the s_rt SGE directive
-
-3.1.6
- Fix issue when creating without the FILE_JOBS_CONF option
- Fixes in documentation
-
-3.1.5
- Connect fixed to use ProxyCommand
- Fixes in documentation
-
-3.1.4
- Documentation for Variables
- Minor bug fixes
-
-3.1.3
- Minor bug fixes, mostly related to SLURM
-
-3.1.2
- Minor bug fixes
-
-3.1.1
- Fix for issue with 'noleap' calendar
-
-3.1.0
- Added archive and unarchive commands
-
-3.0.6
- Fixed bug in setstatus.
- Change in test.
-
-3.0.5
- Fixed bug in recovery.
-
-3.0.4
- Fixed bug in platform headers.
- Fixed bug in delete.
- Added readme and changelog commands.
- MAX_WAITING_JOBS and TOTAL_JOBS now defined by platform.
- Simplified console output of the run subcommand.
-
-3.0.3
- Fixed bug in expid test.
-
-3.0.2
- Fixed bug in the local platform.
-
-3.0.1
- Fixed bug in config.
-
-3.0.0
- Restructured layout.
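Editor's note: the code changes below build the same per-section wrapper_jobs mapping in several places (the -cw/monitor path, run, and create). A condensed restatement of that pattern, copied from the patch; as_conf is the loaded AutosubmitConfig instance:

    # Map each wrapper section name to its JOBS_IN_WRAPPER expression.
    wrapper_jobs = dict()
    wrapper_jobs["wrapper"] = as_conf.get_wrapper_jobs("wrapper")
    if as_conf.get_wrapper_type() == "multi":
        # WRAPPER_LIST names the sub-wrapper sections, e.g. wrapper_0, wrapper_1.
        for wrapper_section in as_conf.get_wrapper_multi():
            wrapper_jobs[wrapper_section] = as_conf.get_wrapper_jobs(wrapper_section)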
diff --git a/VERSION b/VERSION
index 4eba2a62eb71410f749c283a409a288380cd5424..ad59f742d41a5920e82962f7131fdc79d22b454d 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.13.0
+3.14.0b
diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py
index 0d7d01479335504740a90c36e9981dfe85e0db5d..f613651b73c37ce716f74ec821065a8aab10f612 100644
--- a/autosubmit/autosubmit.py
+++ b/autosubmit/autosubmit.py
@@ -29,7 +29,7 @@ from notifications.mail_notifier import MailNotifier
 from bscearth.utils.date import date2str
 from monitor.monitor import Monitor
 from database.db_common import get_autosubmit_version, check_experiment_exists
-from database.db_common import delete_experiment, update_experiment_descrip_version
+from database.db_common import delete_experiment
 from experiment.experiment_common import copy_experiment
 from experiment.experiment_common import new_experiment
 from database.db_common import create_db
@@ -485,12 +485,6 @@ class Autosubmit:
             'pklfix', description='restore the backup of your pkl')
         subparser.add_argument('expid', help='experiment identifier')
 
-        # Update Description
-        subparser = subparsers.add_parser(
-            'updatedescrip', description="Updates the experiment's description.")
-        subparser.add_argument('expid', help='experiment identifier')
-        subparser.add_argument('description', help='New description.')
-
         # Test
         subparser = subparsers.add_parser(
             'test', description='test experiment')
@@ -633,11 +627,9 @@ class Autosubmit:
             return Autosubmit.database_fix(args.expid)
         elif args.command == 'pklfix':
             return Autosubmit.pkl_fix(args.expid)
-        elif args.command == 'updatedescrip':
-            return Autosubmit.update_description(args.expid, args.description)
 
     @staticmethod
-    def _init_logs(args, console_level='INFO', log_level='DEBUG', expid='None'):
+    def _init_logs(args, console_level='INFO', log_level='DEBUG', expid='None'):
         Log.set_console_level(console_level)
         expid_less = ["expid", "testcase", "install", "-v",
                       "readme", "changelog", "configure", "unarchive"]
@@ -1208,27 +1200,44 @@ class Autosubmit:
         :return: Nothing\n
         :rtype: \n
         """
-
+        Log.warning("Generating the auxiliary job_list used for the -CW flag.")
         job_list._job_list = jobs_filtered
-        # Current choice is Paramiko Submitter
+        parameters = as_conf.load_parameters()
+        date_list = as_conf.get_date_list()
+        if len(date_list) != len(set(date_list)):
+            raise AutosubmitCritical(
+                'There are repeated start dates!', 7014)
+        num_chunks = as_conf.get_num_chunks()
+        chunk_ini = as_conf.get_chunk_ini()
+        member_list = as_conf.get_member_list()
+        run_only_members = as_conf.get_member_list(run_only=True)
+        date_format = ''
+        if as_conf.get_chunk_size_unit() == 'hour':
+            date_format = 'H'
+        for date in date_list:
+            if date.hour > 1:
+                date_format = 'H'
+            if date.minute > 1:
+                date_format = 'M'
+        wrapper_jobs = dict()
+        if as_conf.get_wrapper_type() == "multi":
+            for wrapper_section in as_conf.get_wrapper_multi():
+                wrapper_jobs[wrapper_section] = as_conf.get_wrapper_jobs(wrapper_section)
+        wrapper_jobs["wrapper"] = as_conf.get_wrapper_jobs("wrapper")
+        Log.warning("Auxiliary job_list was generated successfully")
         submitter = Autosubmit._get_submitter(as_conf)
-        # Load platforms saves a dictionary Key: Platform Name, Value: Corresponding Platform Object
         submitter.load_platforms(as_conf)
-        # The value is retrieved from DEFAULT.HPCARCH
         hpcarch = as_conf.get_platform()
         Autosubmit._load_parameters(as_conf, job_list, submitter.platforms)
         platforms_to_test = set()
         for job in job_list.get_job_list():
             if job.platform_name is None:
                 job.platform_name = hpcarch
-            # Assign platform objects to each job
-            # noinspection PyTypeChecker
             job.platform = submitter.platforms[job.platform_name.lower()]
-            # Add object to set
-            # noinspection PyTypeChecker
             platforms_to_test.add(job.platform)
-        # case setstatus
+        job_list.check_scripts(as_conf)
+        job_list.update_list(as_conf, False)
         # Loading parameters again
         Autosubmit._load_parameters(as_conf, job_list, submitter.platforms)
@@ -1236,9 +1245,12 @@
         unparsed_two_step_start = as_conf.get_parse_two_step_start()
         if unparsed_two_step_start != "":
             job_list.parse_two_step_start(unparsed_two_step_start)
+        job_list.create_dictionary(date_list, member_list, num_chunks, chunk_ini, date_format, as_conf.get_retrials(), wrapper_jobs)
+
         while job_list.get_active():
-            Autosubmit.submit_ready_jobs(
-                as_conf, job_list, platforms_to_test, packages_persistence, True, only_wrappers, hold=False)
+            Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence, True, only_wrappers, hold=False)
+            #for job in job_list.get_uncompleted_and_not_waiting():
+            #    job.status = Status.COMPLETED
             job_list.update_list(as_conf, False)
 
     @staticmethod
@@ -1260,6 +1272,7 @@
                 "The current host is not allowed to run Autosubmit", 7004)
 
         as_conf = AutosubmitConfig(expid, BasicConfig, ConfigParserFactory())
+        as_conf.check_conf_files(True)
 
         Log.info(
@@ -1389,6 +1402,9 @@
         try:
             job_list = Autosubmit.load_job_list(
                 expid, as_conf, notransitive=notransitive)
+        except IOError as e:
+            raise AutosubmitError(
+                "Job_list not found", 6016, e.message)
         except BaseException as e:
             raise AutosubmitCritical(
                 "Corrupted job_list, backup couldn't be restored", 7040, e.message)
@@ -1419,6 +1435,9 @@
         try:
             packages_persistence = JobPackagePersistence(os.path.join(
                 BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid)
+        except IOError as e:
+            raise AutosubmitError(
+                "job_packages not found", 6016, e.message)
         except BaseException as e:
             raise AutosubmitCritical(
                 "Corrupted job_packages, python 2.7 and sqlite doesn't allow to restore these packages", 7040, e.message)
@@ -1427,6 +1446,9 @@
                 expid, "pkl", "job_packages_" + expid + ".db"), 0644)
         try:
             packages = packages_persistence.load()
+        except IOError as e:
+            raise AutosubmitError(
+                "job_packages not found", 6016, e.message)
         except BaseException as e:
             raise AutosubmitCritical(
                 "Corrupted job_packages, python 2.7 and sqlite doesn't allow to restore these packages (will work on autosubmit4)",
@@ -1482,16 +1504,17 @@
         unparsed_two_step_start = as_conf.get_parse_two_step_start()
         if unparsed_two_step_start != "":
             job_list.parse_two_step_start(unparsed_two_step_start)
-        #########################
-        # AUTOSUBMIT - MAIN LOOP
-        #########################
-        # Main loop. Finishing when all jobs have been submitted
+        main_loop_retrials = 480  # Hard limit of 480 retries, sleeping 30 seconds between tries
         # establish the connection to all platforms
         Autosubmit.restore_platforms(platforms_to_test)
         save = True
         Log.debug("Running main loop")
+        #########################
+        # AUTOSUBMIT - MAIN LOOP
+        #########################
+        # Main loop. Finishing when all jobs have been submitted
         while job_list.get_active():
             try:
                 if Autosubmit.exit:
@@ -1674,53 +1697,64 @@
                 if Autosubmit.exit:
                     job_list.save()
                 time.sleep(safetysleeptime)
-            except AutosubmitError as e:  # If an error is detected, restore all connections and job_list
+            except AutosubmitError as e:  # If an error is detected, restore all connections and job_list
                 Log.error("Trace: {0}", e.trace)
                 Log.error("{1} [eCode={0}]", e.code, e.message)
                 Log.info("Waiting 30 seconds before continuing")
                 # Save job_list if not is a failed submitted job
                 recovery = True
-                try:
-                    failed_jobs = job_list.get_failed()
-                    failed_jobs += job_list.get_ready()
-                    failed_names = {}
-                    for job in failed_jobs:
-                        if job.fail_count > 0:
-                            failed_names[job.name] = job.fail_count
-                    job_list = Autosubmit.load_job_list(
-                        expid, as_conf, notransitive=notransitive)
-                    Autosubmit._load_parameters(
-                        as_conf, job_list, submitter.platforms)
-                    for job in job_list.get_job_list():
-                        if job in failed_names:
-                            job.fail_count = failed_names[job.name]
-                        if job.platform_name is None:
-                            job.platform_name = hpcarch
-                        job.platform = submitter.platforms[job.platform_name.lower(
-                        )]
-
-                    packages_persistence = JobPackagePersistence(os.path.join(
-                        BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid)
-                    packages = packages_persistence.load()
-                    for (exp_id, package_name, job_name) in packages:
-                        if package_name not in job_list.packages_dict:
-                            job_list.packages_dict[package_name] = []
-                        job_list.packages_dict[package_name].append(
-                            job_list.get_job_by_name(job_name))
-                    for package_name, jobs in job_list.packages_dict.items():
-                        from job.job import WrapperJob
-                        for inner_job in jobs:
-                            inner_job.packed = True
-                        wrapper_job = WrapperJob(package_name, jobs[0].id, Status.SUBMITTED, 0, jobs,
-                                                 None,
-                                                 None, jobs[0].platform, as_conf, jobs[0].hold)
-                        job_list.job_package_map[jobs[0].id] = wrapper_job
-                    save = job_list.update_list(as_conf)
-                    job_list.save()
-                except BaseException as e:
-                    raise AutosubmitCritical("Job_list couldn't be restored due I/O error to be solved on 3.14.", 7040,
                                             e.message)
+                IO_issues = True
+                while IO_issues:
+                    try:
+                        failed_jobs = job_list.get_failed()
+                        failed_jobs += job_list.get_ready()
+                        failed_names = {}
+                        for job in failed_jobs:
+                            if job.fail_count > 0:
+                                failed_names[job.name] = job.fail_count
+                        job_list = Autosubmit.load_job_list(expid, as_conf, notransitive=notransitive)
+                        if len(job_list._job_list) == 0:
+                            sleep(5)
+                            raise IOError
+                        Autosubmit._load_parameters(as_conf, job_list, submitter.platforms)
+                        for job in job_list.get_job_list():
+                            if job.name in failed_names:
+                                job.fail_count = failed_names[job.name]
+                            if job.platform_name is None:
+                                job.platform_name = hpcarch
+                            job.platform = submitter.platforms[job.platform_name.lower(
+                            )]
+
+                        packages_persistence = JobPackagePersistence(os.path.join(
+                            BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid)
+                        packages = packages_persistence.load()
+                        if len(job_list.job_packages_dict) > 0:
+                            if len(packages) == 0:
+                                sleep(5)
+                                raise IOError
+                        for (exp_id, package_name, job_name) in packages:
+                            if package_name not in job_list.packages_dict:
+                                job_list.packages_dict[package_name] = []
+                            job_list.packages_dict[package_name].append(
+                                job_list.get_job_by_name(job_name))
+                        for package_name, jobs in job_list.packages_dict.items():
+                            from job.job import WrapperJob
+                            for inner_job in jobs:
+                                inner_job.packed = True
+                            wrapper_job = WrapperJob(package_name, jobs[0].id, Status.SUBMITTED, 0, jobs,
+                                                     None,
+                                                     None, jobs[0].platform, as_conf, jobs[0].hold)
+                            job_list.job_package_map[jobs[0].id] = wrapper_job
+                        save = job_list.update_list(as_conf)
+                        job_list.save()
+
+                        IO_issues = False
+                    except IOError as e:
+                        IO_issues = True
+                    except BaseException as e:
+                        raise AutosubmitCritical("Unknown error during the recovery of the job_list", 7056, e)
+
                 # Restore platforms and try again, to avoid endless loop with failed configuration, a hard limit is set.
                 reconnected = False
                 while not reconnected and main_loop_retrials > 0:
@@ -1789,6 +1823,8 @@
                     raise AutosubmitCritical(message, 7000)
         except AutosubmitCritical as e:
             raise AutosubmitCritical(e.message, e.code, e.trace)
+        except IOError as e:
+            raise AutosubmitError(e.message, 6016)
         except BaseException as e:
             raise
@@ -1927,6 +1963,9 @@
             raise AutosubmitCritical(
                 "Submission failed. Check {0}: Queue, partition specified and total wallclock (sum of wallclocks in case of wrapper)".format(
                     error_msg[:-1]), 7014, e.message)
+        except IOError as e:
+            raise AutosubmitError(
+                "I/O issues", 6016, e.message)
         except BaseException as e:
             raise AutosubmitError(
                 "Submission failed, this can be due to a failure on the platform", 6015, e.message)
@@ -2132,8 +2171,7 @@
         job_list_wrappers = copy.deepcopy(job_list)
         jobs_wr_aux = copy.deepcopy(jobs)
         jobs_wr = []
-        [jobs_wr.append(job) for job in jobs_wr_aux if (
-            job.status == Status.READY or job.status == Status.WAITING)]
+        jobs_wr.extend(jobs_wr_aux)
         for job in jobs_wr:
             for child in job.children:
                 if child not in jobs_wr:
@@ -2145,6 +2183,8 @@
         for job in jobs_wr:
             job.children = job.children - referenced_jobs_to_remove
             job.parents = job.parents - referenced_jobs_to_remove
+
+
         Autosubmit.generate_scripts_andor_wrappers(as_conf, job_list_wrappers, jobs_wr,
                                                    packages_persistence, True)
@@ -2731,6 +2771,9 @@
                     Log.result(
                         "Empty dirs on {0} have been successfully deleted".format(p.temp_dir))
+        except IOError as e:
+            raise AutosubmitError(
+                "I/O issues", 6016, e.message)
         except BaseException as e:
             error = True
             Log.printlog("The files/dirs on {0} cannot be copied to {1}.\nTRACE:{2}".format(
@@ -3364,26 +3407,9 @@
             Log.info("Changing {0} experiment version from {1} to {2}",
                      expid, as_conf.get_version(), Autosubmit.autosubmit_version)
-            update_experiment_descrip_version(
-                expid, version=Autosubmit.autosubmit_version)
             as_conf.set_version(Autosubmit.autosubmit_version)
         return True
 
-    @staticmethod
-    def update_description(expid, new_description):
-        Log.info("Checking if experiment exists...")
-        check_experiment_exists(expid)
-        Log.info("Experiment found.")
-        Log.info("Setting {0} description to '{1}'".format(
-            expid, new_description))
-        result = update_experiment_descrip_version(
-            expid, description=new_description)
-        if result:
-            Log.info("Update completed successfully.")
-        else:
-            Log.critical("Update failed.")
-        return True
-
     @staticmethod
     def pkl_fix(expid):
         """
@@ -3780,10 +3806,16 @@
                     date_format = 'H'
                 if date.minute > 1:
                     date_format = 'M'
+        wrapper_jobs = dict()
+        if as_conf.get_wrapper_type() == "multi":
+            for wrapper_section in as_conf.get_wrapper_multi():
+                wrapper_jobs[wrapper_section] = as_conf.get_wrapper_jobs(wrapper_section)
+        wrapper_jobs["wrapper"] = as_conf.get_wrapper_jobs("wrapper")
+
         job_list.generate(date_list, member_list, num_chunks, chunk_ini,
                           parameters, date_format, as_conf.get_retrials(),
                           as_conf.get_default_job_type(),
-                          as_conf.get_wrapper_type(), as_conf.get_wrapper_jobs(),
-                          notransitive=notransitive, update_structure=True, run_only_members=run_only_members)
+                          as_conf.get_wrapper_type(), wrapper_jobs, notransitive=notransitive, update_structure=True, run_only_members=run_only_members)
 
         if rerun == "true":
             chunk_list = Autosubmit._create_json(
@@ -4999,9 +5031,16 @@
                     date_format = 'H'
                 if date.minute > 1:
                     date_format = 'M'
+        wrapper_jobs = dict()
+        wrapper_jobs["wrapper"] = as_conf.get_wrapper_jobs()
+        if as_conf.get_wrapper_type() == "multi":
+            for wrapper_section in as_conf.get_wrapper_multi():
+                wrapper_jobs[wrapper_section] = as_conf.get_wrapper_jobs(wrapper_section)
+
+
         job_list.generate(date_list, as_conf.get_member_list(), as_conf.get_num_chunks(), as_conf.get_chunk_ini(),
                           as_conf.load_parameters(), date_format, as_conf.get_retrials(),
-                          as_conf.get_default_job_type(), as_conf.get_wrapper_type(), as_conf.get_wrapper_jobs(),
+                          as_conf.get_default_job_type(), as_conf.get_wrapper_type(), wrapper_jobs,
                           new=False, notransitive=notransitive, run_only_members=run_only_members)
 
         if rerun == "true":
diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py
index 17923135d2acfcf0efd22be66835574845b121a3..6fce5da2055c6c2e769ade1417628e1ab932910b 100644
--- a/autosubmit/config/config_common.py
+++ b/autosubmit/config/config_common.py
@@ -474,12 +474,16 @@ class AutosubmitConfig(object):
         :return: True if everything is correct, False if it finds any error
         :rtype: bool
         """
+        Log.info('\nChecking configuration files...')
         self.ignore_file_path = running_time
         self.ignore_undefined_platforms = running_time
 
         try:
             self.reload()
+        except IOError as e:
+            raise AutosubmitError(
+                "I/O issues with config files", 6016, e.message)
         except (AutosubmitCritical, AutosubmitError) as e:
             raise
         except BaseException as e:
@@ -544,8 +548,14 @@
         if not self.is_valid_storage_type():
             self.wrong_config["Autosubmit"] += [['storage',
                                                  "TYPE parameter not found"]]
-        if self.get_wrapper_type() != 'None':
+        if self.get_wrapper_type().lower() == 'multi':
+            list_of_wrappers = self.get_wrapper_multi()
+            for wrapper_section_name in list_of_wrappers:
+                self.check_wrapper_conf(wrapper_section_name)
+        elif self.get_wrapper_type() != 'None':
             self.check_wrapper_conf()
+
+
         if self.get_notifications() == 'true':
             for mail in self.get_mails_to():
                 if not self.is_valid_mail_address(mail):
@@ -799,20 +809,20 @@
                                                "FILE_PROJECT_CONF parameter is invalid"]]
                 return False
 
-    def check_wrapper_conf(self):
-        if not self.is_valid_jobs_in_wrapper():
-            self.wrong_config["Wrapper"] += [['wrapper',
+    def check_wrapper_conf(self, wrapper_section_name="wrapper"):
+        if not self.is_valid_jobs_in_wrapper(wrapper_section_name):
+            self.wrong_config["Wrapper"] += [[wrapper_section_name,
                                               "JOBS_IN_WRAPPER contains undefined jobs; the parameter is invalid"]]
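Editor's note: the multi-wrapper validation above hinges on how WRAPPER_LIST is parsed. A minimal, self-contained sketch of the intended parsing (parse_wrapper_list is a hypothetical stand-in for get_wrapper_multi, which should also accept a single entry without commas):

    def parse_wrapper_list(raw):
        # "wrapper_0,wrapper_1" -> ["wrapper_0", "wrapper_1"]; "" or None -> []
        raw = (raw or "").strip()
        return [s.strip() for s in raw.split(",") if s.strip()]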
-        if 'horizontal' in self.get_wrapper_type():
+        if 'horizontal' in self.get_wrapper_type(wrapper_section_name):
             if not self._platforms_parser.check_exists(self.get_platform(), 'PROCESSORS_PER_NODE'):
                 self.wrong_config["Wrapper"] += [
-                    ['wrapper', "PROCESSORS_PER_NODE no exist in the horizontal-wrapper platform"]]
+                    [wrapper_section_name, "PROCESSORS_PER_NODE does not exist in the horizontal-wrapper platform"]]
             if not self._platforms_parser.check_exists(self.get_platform(), 'MAX_PROCESSORS'):
-                self.wrong_config["Wrapper"] += [['wrapper',
+                self.wrong_config["Wrapper"] += [[wrapper_section_name,
                                                   "MAX_PROCESSORS does not exist in the horizontal-wrapper platform"]]
-        if 'vertical' in self.get_wrapper_type():
+        if 'vertical' in self.get_wrapper_type(wrapper_section_name):
             if not self._platforms_parser.check_exists(self.get_platform(), 'MAX_WALLCLOCK'):
-                self.wrong_config["Wrapper"] += [['wrapper',
+                self.wrong_config["Wrapper"] += [[wrapper_section_name,
                                                   "MAX_WALLCLOCK does not exist in the vertical-wrapper platform"]]
         if "Wrapper" not in self.wrong_config:
             Log.result('wrappers OK')
@@ -831,6 +841,8 @@
             self.parser_factory, self._jobs_parser_file)
         self._exp_parser = AutosubmitConfig.get_parser(
             self.parser_factory, self._exp_parser_file)
+        except IOError as e:
+            raise AutosubmitError("I/O issues during the parsing of configuration files", 6014, e.message)
         except Exception as e:
             raise AutosubmitCritical(
                 "{0} \n Repeated parameter, check if you have any uncommented value that should be commented".format(str(e)), 7014)
@@ -1417,89 +1429,100 @@
         else:
             return False
 
-    def get_wrapper_type(self):
+    def get_wrapper_type(self, wrapper_section_name="wrapper"):
         """
-        Returns what kind of wrapper (VERTICAL, MIXED-VERTICAL, HORIZONTAL, HYBRID, NONE) the user has configured in the autosubmit's config
+        Returns what kind of wrapper (VERTICAL, MIXED-VERTICAL, HORIZONTAL, HYBRID, MULTI, NONE) the user has configured in the autosubmit's config
 
         :return: wrapper type (or none)
         :rtype: string
         """
-        return self._conf_parser.get_option('wrapper', 'TYPE', 'None').lower()
+        return self._conf_parser.get_option(wrapper_section_name, 'TYPE', 'None').lower()
+
+    def get_wrapper_multi(self):
+        """
+        Returns the section names of the sub-wrappers
+
+        :return: wrapper section list
+        :rtype: list
+        """
+        list_of_wrappers = self._conf_parser.get_option("wrapper", 'WRAPPER_LIST', [])
+        if "," in list_of_wrappers:
+            list_of_wrappers = list_of_wrappers.split(',')
+        elif len(list_of_wrappers) > 0:
+            list_of_wrappers = [list_of_wrappers]
+        else:
+            list_of_wrappers = []
+        return list_of_wrappers
 
-    def get_wrapper_policy(self):
+    def get_wrapper_policy(self, wrapper_section_name="wrapper"):
         """
-        Returns what kind of wrapper (VERTICAL, MIXED-VERTICAL, HORIZONTAL, HYBRID, NONE) the user has configured in the autosubmit's config
+        Returns what kind of policy (flexible, strict, mixed) the user has configured in the autosubmit's config
 
         :return: wrapper policy (or 'flexible')
         :rtype: string
         """
-        return self._conf_parser.get_option('wrapper', 'POLICY', 'flexible').lower()
+        return self._conf_parser.get_option(wrapper_section_name, 'POLICY', 'flexible').lower()
 
-    def get_wrapper_jobs(self):
+    def get_wrapper_jobs(self, wrapper_section_name="wrapper"):
         """
         Returns the jobs that should be wrapped, configured in the autosubmit's config
 
         :return: expression (or none)
         :rtype: string
         """
-        return self._conf_parser.get_option('wrapper', 'JOBS_IN_WRAPPER', 'None')
+        return self._conf_parser.get_option(wrapper_section_name, 'JOBS_IN_WRAPPER', 'None')
-    def get_wrapper_queue(self):
+    def get_wrapper_queue(self, wrapper_section_name="wrapper"):
         """
         Returns the wrapper queue; if not defined, the queue of the first wrapped job will be used
 
         :return: expression (or none)
         :rtype: string
         """
-        return self._conf_parser.get_option('wrapper', 'QUEUE', 'None')
+        return self._conf_parser.get_option(wrapper_section_name, 'QUEUE', 'None')
 
-    def get_min_wrapped_jobs(self):
+    def get_min_wrapped_jobs(self, wrapper_section_name="wrapper"):
         """
         Returns the minimum number of jobs that can be wrapped together as configured in autosubmit's config file
 
         :return: minimum number of jobs (or total jobs)
         :rtype: int
         """
-        return int(self._conf_parser.get_option('wrapper', 'MIN_WRAPPED', 2))
+        return int(self._conf_parser.get_option(wrapper_section_name, 'MIN_WRAPPED', 2))
 
-    def get_max_wrapped_jobs(self):
+    def get_max_wrapped_jobs(self, wrapper_section_name="wrapper"):
         """
         Returns the maximum number of jobs that can be wrapped together as configured in autosubmit's config file
 
         :return: maximum number of jobs (or total jobs)
         :rtype: int
         """
-        return int(self._conf_parser.get_option('wrapper', 'MAX_WRAPPED', self.get_total_jobs()))
+        return int(self._conf_parser.get_option(wrapper_section_name, 'MAX_WRAPPED', self.get_total_jobs()))
 
-    def get_wrapper_method(self):
+    def get_wrapper_method(self, wrapper_section_name="wrapper"):
         """
         Returns the method used to create the wrapper
 
         :return: method
         :rtype: string
         """
-        return self._conf_parser.get_option('wrapper', 'METHOD', 'ASThread')
+        return self._conf_parser.get_option(wrapper_section_name, 'METHOD', 'ASThread')
 
-    def get_wrapper_check_time(self):
+    def get_wrapper_check_time(self, wrapper_section_name="wrapper"):
         """
         Returns the time to check the status of jobs in the wrapper
 
         :return: wrapper check time
         :rtype: int
         """
-        return int(self._conf_parser.get_option('wrapper', 'CHECK_TIME_WRAPPER', self.get_safetysleeptime()))
+        return int(self._conf_parser.get_option(wrapper_section_name, 'CHECK_TIME_WRAPPER', self.get_safetysleeptime()))
 
-    def get_wrapper_machinefiles(self):
+    def get_wrapper_machinefiles(self, wrapper_section_name="wrapper"):
         """
         Returns the strategy for creating the machinefiles in wrapper jobs
 
         :return: machinefiles function to use
         :rtype: string
         """
-        return self._conf_parser.get_option('wrapper', 'MACHINEFILES', '')
-    def get_wrapper_export(self):
-        """
-        Returns modules variable from wrapper
+        return self._conf_parser.get_option(wrapper_section_name, 'MACHINEFILES', '')
-
-        :return: string
-        :rtype: string
@@ -1565,8 +1588,8 @@
         storage_type = self.get_storage_type()
         return storage_type in ['pkl', 'db']
 
-    def is_valid_jobs_in_wrapper(self):
-        expression = self.get_wrapper_jobs()
+    def is_valid_jobs_in_wrapper(self, wrapper_section_name="wrapper"):
+        expression = self.get_wrapper_jobs(wrapper_section_name)
         if expression != 'None':
             parser = self._jobs_parser
             sections = parser.sections()
@@ -1614,6 +1637,8 @@
         try:
             with open(file_path) as f:
                 parser.read(file_path)
+        except IOError as exp:
+            raise
         except Exception as exp:
             raise Exception(
                 "{}\n This file and the correctness of its content are necessary.".format(str(exp)))
diff --git a/autosubmit/database/db_common.py b/autosubmit/database/db_common.py
index dd00ed255d27e2cda1bfdcf20604ee1f0d36775b..efe5aef7c915b567822f04d0b4abb0e8e041d1f2 100644
--- a/autosubmit/database/db_common.py
+++ b/autosubmit/database/db_common.py
@@ -194,52 +194,6 @@ def check_experiment_exists(name, error_on_inexistence=True):
     return True
 
-def update_experiment_descrip_version(name, description=None, version=None):
-    """
-    Updates the experiment's description and/or version
-
-    :param name: experiment name (expid)
-    :rtype name: str
-    :param description: experiment new description
-    :rtype description: str
-    :param version: experiment autosubmit version
-    :rtype version: str
-    :return: If description has been updated, True; otherwise, False.
-    :rtype: bool
-    """
-    if not check_db():
-        return False
-    try:
-        (conn, cursor) = open_conn()
-    except DbException as e:
-        raise AutosubmitCritical(
-            "Could not establish a connection to the database.", 7001, str(e))
-    conn.isolation_level = None
-
-    # Changing default unicode
-    conn.text_factory = str
-    # Conditional update
-    if description is not None and version is not None:
-        cursor.execute('update experiment set description=:description, autosubmit_version=:version where name=:name', {
-            'description': description, 'version': version, 'name': name})
-    elif description is not None and version is None:
-        cursor.execute('update experiment set description=:description where name=:name', {
-            'description': description, 'name': name})
-    elif version is not None and description is None:
-        cursor.execute('update experiment set autosubmit_version=:version where name=:name', {
-            'version': version, 'name': name})
-    else:
-        raise AutosubmitCritical(
-            "Not enough data to update {}.".format(name), 7005)
-    row = cursor.rowcount
-    close_conn(conn, cursor)
-    if row == 0:
-        raise AutosubmitCritical(
-            "Update on experiment {} failed.".format(name), 7005)
-        return False
-    return True
-
-
 def get_autosubmit_version(expid):
     """
     Get the minimum autosubmit version needed for the experiment
diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py
index 28606c70a83d1e331d16eb995028a5734b89ae9e..ed4c94b30bba54691d0fbba1b94693a9ee2ce1f9 100644
--- a/autosubmit/job/job_list.py
+++ b/autosubmit/job/job_list.py
@@ -126,13 +126,23 @@ class JobList(object):
         old_job_list_names = [job.name for job in old_job_list]
         self._job_list = [job for job in old_job_list if len(
             job.parents) == 0 or len(set(old_job_list_names).intersection(set([jobp.name for jobp in job.parents]))) == len(job.parents)]
-        # for job in self._job_list:
-        #     print("{0} {1}".format(
-        #         job.name, Status.VALUE_TO_KEY[job.status]))
-        #     print(job.parents)
+
+    def create_dictionary(self, date_list, member_list, num_chunks, chunk_ini, date_format, default_retrials, wrapper_jobs):
+        chunk_list = range(chunk_ini, num_chunks + 1)
+
+        jobs_parser = self._get_jobs_parser()
+        dic_jobs = DicJobs(self, jobs_parser, date_list, member_list,
+                           chunk_list, date_format, default_retrials)
+        self._dic_jobs = dic_jobs
+        # Perhaps this should be done by default, independent of the wrapper_type supplied
+        for wrapper_section in wrapper_jobs:
+            if wrapper_jobs[wrapper_section] != 'None':
+                self._ordered_jobs_by_date_member[wrapper_section] = self._create_sorted_dict_jobs(wrapper_jobs[wrapper_section])
+            else:
+                self._ordered_jobs_by_date_member[wrapper_section] = {}
 
     def generate(self, date_list, member_list, num_chunks, chunk_ini, parameters, date_format, default_retrials,
-                 default_job_type, wrapper_type=None, wrapper_jobs=None, new=True, notransitive=False, update_structure=False, run_only_members=[]):
+                 default_job_type, wrapper_type=None, wrapper_jobs=dict(), new=True, notransitive=False, update_structure=False, run_only_members=[], show_log=True):
         """
         Creates all jobs needed for the current workflow
@@ -172,8 +182,8 @@
         dic_jobs = DicJobs(self, jobs_parser, date_list, member_list,
                           chunk_list, date_format, default_retrials)
         self._dic_jobs = dic_jobs
         priority = 0
-
-        Log.info("Creating jobs...")
+        if show_log:
+            Log.info("Creating jobs...")
         jobs_data = dict()
         # jobs_data includes the name of the .out and .err files of the job in LOG_expid
         if not new:
@@ -183,11 +193,13 @@
             jobs_data = {str(row[0]): row for row in self.backup_load()}
         self._create_jobs(dic_jobs, jobs_parser, priority,
                           default_job_type, jobs_data)
-        Log.info("Adding dependencies...")
+        if show_log:
+            Log.info("Adding dependencies...")
         self._add_dependencies(date_list, member_list,
                                chunk_list, dic_jobs, jobs_parser, self.graph)
 
-        Log.info("Removing redundant dependencies...")
+        if show_log:
+            Log.info("Removing redundant dependencies...")
         self.update_genealogy(
             new, notransitive, update_structure=update_structure)
         for job in self._job_list:
@@ -197,8 +209,9 @@
         # Checking for member constraints
         if len(run_only_members) > 0:
             # Found
-            Log.info("Considering only members {0}".format(
-                str(run_only_members)))
+            if show_log:
+                Log.info("Considering only members {0}".format(
+                    str(run_only_members)))
             old_job_list = [job for job in self._job_list]
             self._job_list = [
                 job for job in old_job_list if job.member is None or job.member in run_only_members or job.status not in [Status.WAITING, Status.READY]]
@@ -211,9 +224,13 @@
                     job.children.add(jobc)
 
         # Perhaps this should be done by default, independent of the wrapper_type supplied
-        if wrapper_type == 'vertical-mixed':
-            self._ordered_jobs_by_date_member = self._create_sorted_dict_jobs(
-                wrapper_jobs)
+        for wrapper_section in wrapper_jobs:
+            if wrapper_jobs[wrapper_section] != 'None':
+                self._ordered_jobs_by_date_member[wrapper_section] = self._create_sorted_dict_jobs(wrapper_jobs[wrapper_section])
+            else:
+                self._ordered_jobs_by_date_member[wrapper_section] = {}
 
     @staticmethod
     def _add_dependencies(date_list, member_list, chunk_list, dic_jobs, jobs_parser, graph, option="DEPENDENCIES"):
@@ -685,14 +702,17 @@
             date_format = 'M'
         return date_format
 
-    def get_ordered_jobs_by_date_member(self):
+    def copy_ordered_jobs_by_date_member(self):
+        pass
+
+    def get_ordered_jobs_by_date_member(self, section):
         """
         Get the dictionary of jobs ordered according to the wrapper's expression, divided by date and member
 
         :return: jobs ordered, divided by date and member
         :rtype: dict
         """
-        return self._ordered_jobs_by_date_member
+        if len(self._ordered_jobs_by_date_member) > 0:
+            return self._ordered_jobs_by_date_member[section]
 
     def get_completed(self, platform=None, wrapper=False):
         """
@@ -729,6 +749,24 @@
         else:
             return uncompleted_jobs
 
+    def get_uncompleted_and_not_waiting(self, platform=None, wrapper=False):
+        """
+        Returns a list of jobs that are neither COMPLETED nor WAITING
+
+        :param platform: job platform
+        :type platform: HPCPlatform
+        :return: uncompleted, non-waiting jobs
+        :rtype: list
+        """
+        uncompleted_jobs = [job for job in self._job_list if
+                            (platform is None or job.platform.name.lower() == platform.name.lower()) and
+                            job.status != Status.COMPLETED and job.status != Status.WAITING]
+
+        if wrapper:
+            return [job for job in uncompleted_jobs if job.packed is False]
+        else:
+            return uncompleted_jobs
+
     def get_submitted(self, platform=None, hold=False, wrapper=False):
         """
         Returns a list of submitted jobs
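Editor's note: with the refactor above, _ordered_jobs_by_date_member is no longer a single mapping but a dictionary keyed by wrapper section, each value ordered by date and member as before. A sketch of the resulting shape (job names and date/member keys are illustrative, not taken from the patch):

    ordered_jobs_by_date_member = {
        "wrapper_0": {"20200101": {"fc0": ["a000_20200101_fc0_1_SIM", "a000_20200101_fc0_2_SIM"]}},
        "wrapper_1": {"20200101": {"fc0": ["a000_20200101_fc0_1_DA", "a000_20200101_fc0_1_REDUCE"]}},
    }
    jobs = ordered_jobs_by_date_member["wrapper_0"]["20200101"]["fc0"]  # ordered as JOBS_IN_WRAPPER dictates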
diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py
index c7b89b7085542c6efa19af803430a934bd0a584b..5f27a4d3d7a39eacb39a1ee549a1a15dd6b4554e 100644
--- a/autosubmit/job/job_packager.py
+++ b/autosubmit/job/job_packager.py
@@ -41,11 +41,16 @@ class JobPackager(object):
     """
 
     def __init__(self, as_config, platform, jobs_list, hold=False):
+        self.current_wrapper_section = "wrapper"
         self._as_config = as_config
        self._platform = platform
         self._jobs_list = jobs_list
         self.hold = hold
-
+        # These are defined in the [wrapper] section of autosubmit_cxxx.conf
+        self.wrapper_type = dict()
+        self.wrapper_policy = dict()
+        self.wrapper_method = dict()
+        self.jobs_in_wrapper = dict()
         # Submitted + Queuing Jobs for specific Platform
         queuing_jobs = jobs_list.get_queuing(platform)
         # We now consider the running jobs count
@@ -72,12 +77,22 @@
         self._max_jobs_to_submit = self._max_jobs_to_submit if self._max_jobs_to_submit > 0 else 0
         self.max_jobs = min(self._max_wait_jobs_to_submit,
                             self._max_jobs_to_submit)
-        # These are defined in the [wrapper] section of autosubmit_,conf
-        self.wrapper_type = self._as_config.get_wrapper_type()
-        self.wrapper_policy = self._as_config.get_wrapper_policy()
-        self.wrapper_method = self._as_config.get_wrapper_method().lower()
+
+        self.wrapper_type["wrapper"] = self._as_config.get_wrapper_type()
+        self.wrapper_policy["wrapper"] = self._as_config.get_wrapper_policy()
+        self.wrapper_method["wrapper"] = self._as_config.get_wrapper_method().lower()
+        self.jobs_in_wrapper["wrapper"] = self._as_config.get_wrapper_jobs()
+        if self._as_config.get_wrapper_type() == "multi":
+            for wrapper_section in self._as_config.get_wrapper_multi():
+                self.wrapper_type[wrapper_section] = self._as_config.get_wrapper_type(wrapper_section)
+                self.wrapper_policy[wrapper_section] = self._as_config.get_wrapper_policy(wrapper_section)
+                self.wrapper_method[wrapper_section] = self._as_config.get_wrapper_method(wrapper_section).lower()
+                self.jobs_in_wrapper[wrapper_section] = self._as_config.get_wrapper_jobs(wrapper_section)
 
         # True or False
-        self.jobs_in_wrapper = self._as_config.get_wrapper_jobs()
+
         Log.debug(
             "Number of jobs available: {0}", self._max_wait_jobs_to_submit)
         if self.hold:
@@ -134,7 +149,7 @@
         """
         Returns the list of the built packages to be submitted
 
-        :return: List of packages depending on type of package, JobPackageVertical Object for 'vertical-mixed' or 'vertical'. \n
+        :return: List of packages depending on the type of package, e.g. a JobPackageVertical object for 'vertical'.
         :rtype: List() of JobPackageVertical
         """
         packages_to_submit = list()
@@ -199,9 +214,8 @@
         jobs_to_submit_seq = [
             failed_job for failed_job in jobs_to_submit_tmp if failed_job.fail_count > 0]
-        jobs_to_submit_by_section = self._divide_list_by_section(
-            jobs_to_submit)
-        packages_to_submit = []
+        jobs_to_submit_by_section = self._divide_list_by_section(jobs_to_submit)
+
         for job in jobs_to_submit_seq:  # Failed jobs at least one time
             job.packed = False
             if job.type == Type.PYTHON and not self._platform.allow_python_jobs:
@@ -210,16 +224,21 @@
                 package = JobPackageSimple([job])
             packages_to_submit.append(package)
 
         for section in jobs_to_submit_by_section:
             wrapped = False
             # Only if platform allows wrappers, wrapper type has been correctly defined, and job names for wrappers have been correctly defined
             # ('None' is a default value) or the correct section is included in the corresponding sections in [wrappers]
-            if self._platform.allow_wrappers and self.wrapper_type in ['horizontal', 'vertical', 'vertical-mixed',
-                                                                       'vertical-horizontal', 'horizontal-vertical'] \
-                    and (self.jobs_in_wrapper == 'None' or section in self.jobs_in_wrapper):
+            wrapper_defined = False
+            for wrapper_section in self.jobs_in_wrapper:
+                if section in self.jobs_in_wrapper[wrapper_section]:
+                    wrapper_defined = True
+                    self.current_wrapper_section = wrapper_section
+                    break
+            if wrapper_defined and self._platform.allow_wrappers and self.wrapper_type[self.current_wrapper_section] in ['horizontal', 'vertical', 'vertical-horizontal', 'horizontal-vertical']:
                 # Trying to find the value in jobs_parser, if not, default to an autosubmit_.conf value (Looks first in [wrapper] section)
-                max_wrapped_jobs = int(self._as_config.jobs_parser.get_option(
-                    section, "MAX_WRAPPED", self._as_config.get_max_wrapped_jobs()))
+                max_wrapped_jobs = int(self._as_config.jobs_parser.get_option(section, "MAX_WRAPPED", self._as_config.get_max_wrapped_jobs(self.current_wrapper_section)))
                 if '&' not in section:
                     if self._as_config.jobs_parser.has_option(section, 'DEPENDENCIES'):
                         dependencies_keys = self._as_config.jobs_parser.get(
@@ -227,7 +246,6 @@
                     else:
                         dependencies_keys = []
                     max_wrapper_job_by_section[section] = max_wrapped_jobs
-
                 else:
                     multiple_sections = section.split('&')
                     dependencies_keys = []
@@ -244,25 +262,25 @@
             for k in dependencies_keys:
                 if "-" in k:
                     k_divided = k.split("-")
-                    if k_divided[0] not in self.jobs_in_wrapper:
+                    if k_divided[0] not in self.jobs_in_wrapper[self.current_wrapper_section]:
                         number = int(k_divided[1].strip(" "))
                         if number < max_wrapped_jobs:
                             hard_limit_wrapper = number
             min_wrapped_jobs = min(self._as_config.jobs_parser.get_option(
-                section, "MIN_WRAPPED", self._as_config.get_min_wrapped_jobs()), hard_limit_wrapper)
+                section, "MIN_WRAPPED", self._as_config.get_min_wrapped_jobs(self.current_wrapper_section)), hard_limit_wrapper)
             if len(self._jobs_list.jobs_to_run_first) > 0:  # Allows to prepare an experiment with TWO_STEP_START and strict policy
                 min_wrapped_jobs = 2
 
-            if self.wrapper_type in ['vertical', 'vertical-mixed']:
+            if self.wrapper_type[self.current_wrapper_section] == 'vertical':
                 wrapped = True
                 built_packages_tmp = self._build_vertical_packages(jobs_to_submit_by_section[section],
                                                                    max_wrapped_jobs, max_wrapper_job_by_section)
-            elif self.wrapper_type == 'horizontal':
+            elif self.wrapper_type[self.current_wrapper_section] == 'horizontal':
                 wrapped = True
                built_packages_tmp = self._build_horizontal_packages(jobs_to_submit_by_section[section],
                                                                     max_wrapped_jobs, section, max_wrapper_job_by_section)
-            elif self.wrapper_type in ['vertical-horizontal', 'horizontal-vertical']:
+            elif self.wrapper_type[self.current_wrapper_section] in ['vertical-horizontal', 'horizontal-vertical']:
                 wrapped = True
                 built_packages_tmp = list()
                 built_packages_tmp.append(self._build_hybrid_package(
@@ -281,11 +299,7 @@
                     if job.fail_count > 0:
                         failed_innerjobs = True
                         break
-                if len(self._jobs_list.jobs_to_run_first) > 0:
-                    for job in aux_jobs:
-                        p.jobs.remove(job)
-
-                if failed_innerjobs and str(self.wrapper_policy) == "mixed":
+                if failed_innerjobs and str(self.wrapper_policy[self.current_wrapper_section]) == "mixed":
                     for job in p.jobs:
                         if job.fail_count == 0:
                             continue
@@ -316,45 +330,26 @@
                                 independent_inner_job and parent.status == Status.COMPLETED]
                         if len(tmp) != len(job.parents):
                             deadlock = False
-
-                    if deadlock and self.wrapper_policy == "strict":
+                    if deadlock and self.wrapper_policy[self.current_wrapper_section] == "strict":
                         Log.debug(
                             "Wrapper policy is set to strict, there is a deadlock so autosubmit will sleep a while")
                         for job in p.jobs:
                             job.packed = False
-                            if job in self._jobs_list.jobs_to_run_first:
-                                if job.status == Status.READY:
-                                    if job.type == Type.PYTHON and not self._platform.allow_python_jobs:
-                                        package = JobPackageSimpleWrapped(
-                                            [job])
-                                    else:
-                                        package = JobPackageSimple([job])
-                                    packages_to_submit.append(package)
-
-                    elif deadlock and self.wrapper_policy == "mixed":
+                    elif deadlock and self.wrapper_policy[self.current_wrapper_section] == "mixed":
                         Log.debug(
                             "Wrapper policy is set to mixed, there is a deadlock")
                         for job in p.jobs:
                             job.packed = False
-                            if job in self._jobs_list.jobs_to_run_first:
-                                if job.status == Status.READY:
-                                    if job.type == Type.PYTHON and not self._platform.allow_python_jobs:
-                                        package = JobPackageSimpleWrapped(
-                                            [job])
-                                    else:
-                                        package = JobPackageSimple([job])
-                                    packages_to_submit.append(package)
-                            else:
-                                if job.fail_count > 0 and job.status == Status.READY:
-                                    Log.debug(
-                                        "Wrapper policy is set to semi-strict, there is a failed job that will be sent sequential")
-                                    if job.type == Type.PYTHON and not self._platform.allow_python_jobs:
-                                        package = JobPackageSimpleWrapped(
-                                            [job])
-                                    else:
-                                        package = JobPackageSimple([job])
-                                    packages_to_submit.append(package)
-                    elif deadlock and self.wrapper_policy != "strict" and self.wrapper_policy != "mixed":
+                            if job.fail_count > 0 and job.status == Status.READY:
+                                Log.debug(
+                                    "Wrapper policy is set to mixed, there is a failed job that will be sent sequentially")
+                                if job.type == Type.PYTHON and not self._platform.allow_python_jobs:
+                                    package = JobPackageSimpleWrapped(
+                                        [job])
+                                else:
+                                    package = JobPackageSimple([job])
+                                packages_to_submit.append(package)
+                    elif deadlock and self.wrapper_policy[self.current_wrapper_section] != "strict" and self.wrapper_policy[self.current_wrapper_section] != "mixed":
                         Log.debug(
                             "Wrapper policy is set to flexible and there is a deadlock, AS will submit the jobs sequentially")
                         for job in p.jobs:
@@ -404,10 +399,13 @@
         :rtype: Dictionary Key: Section Name, Value: List(Job Object)
         """
         # .jobs_in_wrapper defined in .conf, see constructor.
-        sections_split = self.jobs_in_wrapper.split()
-
+        sections_split = set()
+        for jobs_in_wrapper_section in self.jobs_in_wrapper:
+            sections_split.update(set(self.jobs_in_wrapper[jobs_in_wrapper_section].split()))
+        sections_split = list(sections_split)
         jobs_section = dict()
         for job in jobs_list:
+            # This iterator will always return None if there is no '&' defined in the section name
             section = next(
                 (s for s in sections_split if job.section in s and '&' in s), None)
@@ -418,10 +416,11 @@
                 jobs_section[section].append(job)
         return jobs_section
 
+
     def _build_horizontal_packages(self, section_list, max_wrapped_jobs, section, max_wrapper_job_by_section):
         packages = []
         horizontal_packager = JobPackagerHorizontal(section_list, self._platform.max_processors, max_wrapped_jobs,
-                                                    self.max_jobs, self._platform.processors_per_node, self.wrapper_method, max_wrapper_job_by_section=max_wrapper_job_by_section)
+                                                    self.max_jobs, self._platform.processors_per_node, self.wrapper_method[self.current_wrapper_section], max_wrapper_job_by_section=max_wrapper_job_by_section)
 
         package_jobs = horizontal_packager.build_horizontal_package()
 
@@ -434,7 +433,7 @@
             jobs_resources = horizontal_packager.components_dict
             jobs_resources['MACHINEFILES'] = machinefile_function
             current_package = JobPackageHorizontal(
-                package_jobs, jobs_resources=jobs_resources, method=self.wrapper_method, configuration=self._as_config)
+                package_jobs, jobs_resources=jobs_resources, method=self.wrapper_method[self.current_wrapper_section], configuration=self._as_config)
             packages.append(current_package)
 
         return packages
@@ -449,6 +448,8 @@
         :type max_wrapped_jobs: Integer. \n
         :param min_wrapped_jobs: Minimum number of jobs that can be wrapped (can be user defined), per section. \n
         :type min_wrapped_jobs: Integer. \n
+        :param wrapper_section: current wrapper section name \n
+        :type wrapper_section: String. \n
         :return: List of Wrapper Packages, Dictionary that details dependencies. \n
         :rtype: List() of JobPackageVertical(), Dictionary Key: String, Value: (Dictionary Key: Variable Name, Value: String/Int)
         """
         packages = []
         for job in section_list:
             if self.max_jobs > 0:
                 if job.packed is False:
                     job.packed = True
-                    if self.wrapper_type == 'vertical-mixed':
-                        dict_jobs = self._jobs_list.get_ordered_jobs_by_date_member()
-                        job_vertical_packager = JobPackagerVerticalMixed(dict_jobs, job, [job], job.wallclock, self.max_jobs,
-                                                                         max_wrapped_jobs, self._platform.max_wallclock, max_wrapper_job_by_section)
-                    else:
-                        job_vertical_packager = JobPackagerVerticalSimple([job], job.wallclock, self.max_jobs,
-                                                                          max_wrapped_jobs, self._platform.max_wallclock, max_wrapper_job_by_section)
-
-                    jobs_list = job_vertical_packager.build_vertical_package(
-                        job)
+                    dict_jobs = self._jobs_list.get_ordered_jobs_by_date_member(self.current_wrapper_section)
+                    job_vertical_packager = JobPackagerVerticalMixed(dict_jobs, job, [job], job.wallclock, self.max_jobs, max_wrapped_jobs, self._platform.max_wallclock, max_wrapper_job_by_section)
+                    jobs_list = job_vertical_packager.build_vertical_package(job)
 
                     if job.status is Status.READY:
                         packages.append(JobPackageVertical(
@@ -487,7 +481,8 @@
         ## Create the horizontal ##
         horizontal_packager = JobPackagerHorizontal(jobs_list, self._platform.max_processors, max_wrapped_jobs,
                                                     self.max_jobs, self._platform.processors_per_node, max_wrapper_job_by_section=max_wrapper_job_by_section)
-        if self.wrapper_type == 'vertical-horizontal':
+
+        if self.wrapper_type[self.current_wrapper_section] == 'vertical-horizontal':
             return self._build_vertical_horizontal_package(horizontal_packager, jobs_resources)
         else:
             return self._build_horizontal_vertical_package(horizontal_packager, section, jobs_resources)
@@ -526,12 +521,19 @@
         horizontal_package = horizontal_packager.build_horizontal_package()
         total_processors = horizontal_packager.total_processors
         current_package = []
-        ## Create the vertical ##
+        actual_wrapped_jobs = len(horizontal_package)
+        # Copy the per-section limits so the shared dict on the packager is not mutated
+        remaining_wrapped_job_by_section = dict(horizontal_packager.max_wrapper_job_by_section)
+        # TODO create horizontal and vertical limit
+        for job in horizontal_package:
+            if job.section in remaining_wrapped_job_by_section:
+                remaining_wrapped_job_by_section[job.section] -= 1
+
         for job in horizontal_package:
             job_list = JobPackagerVerticalSimple([job], job.wallclock, self.max_jobs,
-                                                 horizontal_packager.max_wrapped_jobs,
-                                                 self._platform.max_wallclock, horizontal_packager.max_wrapper_job_by_section).build_vertical_package(job)
+                                                 horizontal_packager.max_wrapped_jobs - actual_wrapped_jobs,
+                                                 self._platform.max_wallclock, remaining_wrapped_job_by_section).build_vertical_package(job)
 
             current_package.append(job_list)
 
@@ -542,7 +544,7 @@
         for job in current_package[level]:
             job.level = level
         return JobPackageVerticalHorizontal(current_package, total_processors, total_wallclock,
-                                            jobs_resources=jobs_resources, method=self.wrapper_method, configuration=self._as_config)
+                                            jobs_resources=jobs_resources, method=self.wrapper_method[self.current_wrapper_section], configuration=self._as_config)
 
 
 class JobPackagerVertical(object):
diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py
index fa232bd66ffd51280e40ffbc8e9fe6c5cc4d1cad..6457947b8c08e7b3e21851e9968b0560418444b3 100644
--- a/autosubmit/platforms/paramiko_platform.py
+++ b/autosubmit/platforms/paramiko_platform.py
diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py
index fa232bd66ffd51280e40ffbc8e9fe6c5cc4d1cad..6457947b8c08e7b3e21851e9968b0560418444b3 100644
--- a/autosubmit/platforms/paramiko_platform.py
+++ b/autosubmit/platforms/paramiko_platform.py
@@ -86,7 +86,7 @@ class ParamikoPlatform(Platform):
         except EOFError as e:
             raise AutosubmitError("[{0}] not alive. Host: {1}".format(
                 self.name, self.host), 6002, e.message)
-        except (AutosubmitError,AutosubmitCritical):
+        except (AutosubmitError,AutosubmitCritical,IOError):
             raise
         except BaseException as e:
             raise AutosubmitError("[{0}] connection failed for host: {1}".format(self.name, self.host), 6002, e.message)
@@ -164,6 +164,9 @@ class ParamikoPlatform(Platform):
                 self.transport.connect(username=self.user)
             self._ftpChannel = self._ssh.open_sftp()
             self.connected = True
+        except IOError as e:
+            raise AutosubmitError(
+                "File can't be located due to a slow connection", 6016, e.message)
         except BaseException as e:
             self.connected = False
             if "Authentication failed." in e.message:
@@ -657,6 +660,8 @@ class ParamikoPlatform(Platform):
             raise
         except AutosubmitError as e:
             raise
+        except IOError as e:
+            raise AutosubmitError(e.message, 6016)
         except BaseException as e:
             raise AutosubmitError('Command {0} in {1} warning: {2}'.format(
                 command, self.host, '\n'.join(stderr_readlines)), 6005, e.message)
diff --git a/bin/autosubmit b/bin/autosubmit
index d0e8d169cd49c12209f70318aa8d3eb90a20f684..1c0a705491746b62d91cfabc6cfb955b16cadc3e 100755
--- a/bin/autosubmit
+++ b/bin/autosubmit
@@ -22,7 +22,7 @@ import os
 import sys
 import traceback

-from log.log import Log, AutosubmitCritical
+from log.log import Log, AutosubmitCritical, AutosubmitError
 scriptdir = os.path.abspath(os.path.dirname(sys.argv[0]))
 assert sys.path[0] == scriptdir
@@ -37,15 +37,21 @@ def main():
     try:
         Autosubmit.parse_args()
         os._exit(0)
+    except AutosubmitError as e:
+        if e.trace is not None and len(e.trace) > 0:
+            Log.error("Trace: {0}", e.trace)
+        Log.critical("{1} [eCode={0}]", e.code, e.message)
+        Log.info("More info at https://autosubmit.readthedocs.io/en/latest/faq.html")
+        os._exit(1)
     except AutosubmitCritical as e:
-        if e.trace is not None:
+        if e.trace is not None and len(e.trace) > 0:
             Log.error("Trace: {0}", e.trace)
         Log.critical("{1} [eCode={0}]", e.code, e.message)
         Log.info("More info at https://autosubmit.readthedocs.io/en/latest/faq.html")
         os._exit(1)
     except Exception as e:
-        Log.error("Trace: {0}", str(e))
-        if "temporarily unavailable" in str(e):
+        Log.error("Trace: {0}", str(e.message))
+        if "temporarily unavailable" in str(e.message):
             Log.critical(
                 "Another instance of autosubmit is running on this experiment. If this is not the case, delete autosubmit.lock", 7000)
         else:
@@ -55,4 +61,4 @@


 if __name__ == "__main__":
-    main()
+    main()
\ No newline at end of file
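The launcher now handles `AutosubmitError` at the top level with the same shape as `AutosubmitCritical`: print the trace only when one was attached, then the message with its error code, then exit non-zero. A generic sketch of that pattern with a simplified stand-in class (the real one lives in `log.log`):

.. code-block:: python

    import sys

    class AutosubmitError(Exception):
        """Simplified stand-in for Autosubmit's error class."""
        def __init__(self, message, code=6000, trace=None):
            super(AutosubmitError, self).__init__(message)
            self.message, self.code, self.trace = message, code, trace

    try:
        raise AutosubmitError("connection failed", 6002)
    except AutosubmitError as e:
        if e.trace:  # report a trace only when one exists
            print("Trace: {0}".format(e.trace))
        print("{0} [eCode={1}]".format(e.message, e.code))
        sys.exit(1)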
diff --git a/docs/source/usage/run_modes/wrappers.rst b/docs/source/usage/run_modes/wrappers.rst
index 15f5fb8f5133dfa649fe94cfcf537de9b3d421ee..a3aa57085eeddaea553a3dff904db6b06208d5bc 100644
--- a/docs/source/usage/run_modes/wrappers.rst
+++ b/docs/source/usage/run_modes/wrappers.rst
@@ -7,9 +7,9 @@ In order to understand the goal of this feature, please take a look at: https://
 At the moment there are 4 types of wrappers that can be used depending on the experiment's workflow:

 * Vertical
-* Vertical mixed
 * Horizontal
 * Hybrid (horizontal-vertical and vertical-horizontal approaches)
+* Multiple wrappers - Same experiment

 When using the wrapper, it is useful to be able to visualize which packages are being created. So, when executing *autosubmit monitor cxxx*, a dashed box indicates the jobs that are wrapped together in the same job package.
@@ -93,17 +93,17 @@ In order to be able to use the vertical wrapper, in ``platforms_cxxx.conf`` set

 Remember to add to each job the corresponding WALLCLOCK time.

-Vertical-mixed wrapper
-=======================
+Vertical with multiple sections
+===============================

-This is a version of the vertical wrapper that allows jobs of different types to be wrapped together.
+This is a mode of the vertical wrapper that allows jobs of different types to be wrapped together.
 Note that the solution considers the order of the sections defined in the ``jobs_cxxx.conf`` file, so the order of the sections given in **JOBS_IN_WRAPPER** is irrelevant.
 Additionally, jobs are grouped within the corresponding date, member and chunk hierarchy.

 .. code-block:: ini

     [wrapper]
-    TYPE = vertical-mixed
+    TYPE = vertical
     JOBS_IN_WRAPPER = SIM&SIM2 # REQUIRED

 .. figure:: ../../workflows/vertical-mixed.png
@@ -146,7 +146,7 @@ In order to be able to use the horizontal wrapper, in ``platforms_cxxx.conf`` se
     :alt: horizontally wrapped jobs

 Shared-memory Experiments
-**********************
+*************************

 There is also the possibility of setting the option **METHOD** to SRUN in the wrapper directive (**ONLY** for vertical and vertical-horizontal wrappers).
@@ -205,6 +205,30 @@ Vertical-horizontal
     :align: center
     :alt: hybrid wrapper

+Multiple wrappers at once
+=========================
+This is a special mode that allows you to use multiple **independent** wrappers in the same experiment, by using a special variable that defines the sub-wrapper sections.
+
+.. code-block:: ini
+
+    [Wrapper]
+    TYPE = multi # REQUIRED
+    WRAPPER_LIST = wrapper_0,wrapper_1
+
+    [wrapper_0]
+    TYPE = vertical
+    JOBS_IN_WRAPPER = SIM
+
+    [wrapper_1]
+    TYPE = vertical
+    JOBS_IN_WRAPPER = DA&REDUCE
+
+.. figure:: ../../workflows/multiple_wrappers.png
+    :name:
+    :width: 100%
+    :align: center
+    :alt: multi wrapper
+
 Summary
 ==========================

@@ -213,10 +237,13 @@ In `autosubmit_cxxx.conf`:
 .. code-block:: ini

     # Basic Configuration of wrapper
-    #TYPE = {vertical,vertical-mixed,horizontal,horizontal-vertical,vertical-horizontal} # REQUIRED
+    #TYPE = {vertical,horizontal,horizontal-vertical,vertical-horizontal} # REQUIRED
     # JOBS_IN_WRAPPER = Sections that should be wrapped together ex SIM
+    # METHOD : Select between MACHINEFILES or Shared-Memory.
     # MIN_WRAPPED sets the minimum number of jobs that should be included in the wrapper. DEFAULT = 2
     # MAX_WRAPPED sets the maximum number of jobs that should be included in the wrapper. DEFAULT = TOTALJOBS
+    # POLICY : Select the behaviour of the inner jobs Strict/Flexible/Mixed
+
     [wrapper]
     TYPE = Vertical #REQUIRED
diff --git a/docs/source/workflows/multiple_wrappers.png b/docs/source/workflows/multiple_wrappers.png
new file mode 100644
index 0000000000000000000000000000000000000000..d214d7bd446a1edc5f778dcb1a37122394401c61
Binary files /dev/null and b/docs/source/workflows/multiple_wrappers.png differ
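Every test fixture below is adjusted for one structural change: `_ordered_jobs_by_date_member` gains a wrapper-section level above the old date/member hierarchy, so each sub-wrapper can keep its own job ordering. A sketch of the new shape (job names are placeholders):

.. code-block:: python

    # New nesting: wrapper section -> date -> member -> ordered job list.
    ordered_jobs_by_date_member = {
        "wrapper": {
            "d1": {
                "m1": ["expid_d1_m1_1_s2", "expid_d1_m1_2_s2"],
                "m2": ["expid_d1_m2_1_s2", "expid_d1_m2_2_s2"],
            },
        },
    }

    # Packagers now select the sub-tree for the wrapper being built,
    # mirroring get_ordered_jobs_by_date_member(self.current_wrapper_section).
    current_wrapper_section = "wrapper"
    dict_jobs = ordered_jobs_by_date_member[current_wrapper_section]
    print(dict_jobs["d1"]["m1"])  # ['expid_d1_m1_1_s2', 'expid_d1_m1_2_s2']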
diff --git a/test/unit/test_wrappers.py b/test/unit/test_wrappers.py
index ccc1d1706f9319fff7a481937d66ffce609703e2..9077b962357ff68d77f196ef786343bf8a9a52d3 100644
--- a/test/unit/test_wrappers.py
+++ b/test/unit/test_wrappers.py
@@ -166,12 +166,14 @@ class TestWrappers(TestCase):
         self.config.get_wrapper_method = Mock(return_value='ASThread')
         self.config.get_wrapper_queue = Mock(return_value='debug')
         self.config.get_wrapper_policy = Mock(return_value='flexible')
-
+        self.config.get = Mock(return_value='flexible')
         self.job_packager = JobPackager(
             self.config, self._platform, self.job_list)
+        self.job_list._ordered_jobs_by_date_member["wrapper"] = dict()

     ### ONE SECTION WRAPPER ###

     def test_returned_packages(self):
+        self.current_wrapper_section = {}
         date_list = ["d1", "d2"]
         member_list = ["m1", "m2"]
         chunk_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
@@ -212,12 +214,16 @@ class TestWrappers(TestCase):
         d1_m2_8_s2 = self.job_list.get_job_by_name('expid_d1_m2_8_s2')
         d1_m2_9_s2 = self.job_list.get_job_by_name('expid_d1_m2_9_s2')
         d1_m2_10_s2 = self.job_list.get_job_by_name('expid_d1_m2_10_s2')
+        self.job_list._ordered_jobs_by_date_member["wrapper"]["d1"] = dict()
+        self.job_list._ordered_jobs_by_date_member["wrapper"]["d1"]["m1"] = [d1_m1_1_s2, d1_m1_2_s2, d1_m1_3_s2, d1_m1_4_s2, d1_m1_5_s2, d1_m1_6_s2, d1_m1_7_s2, d1_m1_8_s2, d1_m1_9_s2, d1_m1_10_s2]
+        self.job_list._ordered_jobs_by_date_member["wrapper"]["d1"]["m2"] = [d1_m2_1_s2, d1_m2_2_s2, d1_m2_3_s2, d1_m2_4_s2, d1_m2_5_s2, d1_m2_6_s2, d1_m2_7_s2, d1_m2_8_s2, d1_m2_9_s2, d1_m2_10_s2]

         section_list = [d1_m1_1_s2, d1_m2_1_s2]
-
+        self.job_packager.current_wrapper_section = "wrapper"
         self.job_packager.max_jobs = max_jobs
         self.job_packager._platform.max_wallclock = max_wallclock
         self.job_packager.wrapper_type = 'vertical'
+
         max_wrapped_job_by_section = {}
         max_wrapped_job_by_section["s1"] = max_wrapped_jobs
         max_wrapped_job_by_section["s2"] = max_wrapped_jobs
@@ -279,6 +285,10 @@ class TestWrappers(TestCase):
         d1_m2_8_s2 = self.job_list.get_job_by_name('expid_d1_m2_8_s2')
         d1_m2_9_s2 = self.job_list.get_job_by_name('expid_d1_m2_9_s2')
         d1_m2_10_s2 = self.job_list.get_job_by_name('expid_d1_m2_10_s2')
+        self.job_list._ordered_jobs_by_date_member["wrapper"]["d1"] = dict()
+        self.job_list._ordered_jobs_by_date_member["wrapper"]["d1"]["m1"] = [d1_m1_1_s2, d1_m1_2_s2, d1_m1_3_s2, d1_m1_4_s2, d1_m1_5_s2, d1_m1_6_s2, d1_m1_7_s2, d1_m1_8_s2, d1_m1_9_s2, d1_m1_10_s2]
+
+        self.job_list._ordered_jobs_by_date_member["wrapper"]["d1"]["m2"] = [d1_m2_1_s2, d1_m2_2_s2, d1_m2_3_s2, d1_m2_4_s2, d1_m2_5_s2, d1_m2_6_s2, d1_m2_7_s2, d1_m2_8_s2, d1_m2_9_s2, d1_m2_10_s2]

         section_list = [d1_m1_1_s2, d1_m2_1_s2]
@@ -301,16 +311,7 @@ class TestWrappers(TestCase):
         packages = [JobPackageVertical(
             package_m1_s2), JobPackageVertical(package_m2_s2)]

-        #returned_packages = returned_packages[0]
-        #print("max jobs test")
         for i in range(0, len(returned_packages)):
-            # print("Element " + str(i))
-            # print("Returned from packager")
-            # for job in returned_packages[i]._jobs:
-            #     print(job.name)
-            # print("Build for test")
-            # for _job in packages[i]._jobs:
-            #     print(_job.name)
             self.assertListEqual(returned_packages[i]._jobs, packages[i]._jobs)

     def test_returned_packages_max_wrapped_jobs(self):
@@ -344,6 +345,10 @@ class TestWrappers(TestCase):
         d1_m2_3_s2 = self.job_list.get_job_by_name('expid_d1_m2_3_s2')
         d1_m2_4_s2 = self.job_list.get_job_by_name('expid_d1_m2_4_s2')
         d1_m2_5_s2 = self.job_list.get_job_by_name('expid_d1_m2_5_s2')
+        self.job_list._ordered_jobs_by_date_member["wrapper"]["d1"] = dict()
+        self.job_list._ordered_jobs_by_date_member["wrapper"]["d1"]["m1"] = [d1_m1_1_s2, d1_m1_2_s2, d1_m1_3_s2, d1_m1_4_s2, d1_m1_5_s2]
+
+        self.job_list._ordered_jobs_by_date_member["wrapper"]["d1"]["m2"] = [d1_m2_1_s2, d1_m2_2_s2, d1_m2_3_s2, d1_m2_4_s2, d1_m2_5_s2]

         section_list = [d1_m1_1_s2, d1_m2_1_s2]
@@ -401,6 +406,10 @@ class TestWrappers(TestCase):
         d1_m2_3_s2 = self.job_list.get_job_by_name('expid_d1_m2_3_s2')
         d1_m2_4_s2 = self.job_list.get_job_by_name('expid_d1_m2_4_s2')
         d1_m2_5_s2 = self.job_list.get_job_by_name('expid_d1_m2_5_s2')
+        self.job_list._ordered_jobs_by_date_member["wrapper"]["d1"] = dict()
+        self.job_list._ordered_jobs_by_date_member["wrapper"]["d1"]["m1"] = [d1_m1_1_s2, d1_m1_2_s2, d1_m1_3_s2, d1_m1_4_s2, d1_m1_5_s2]
+
+        self.job_list._ordered_jobs_by_date_member["wrapper"]["d1"]["m2"] = [d1_m2_1_s2, d1_m2_2_s2, d1_m2_3_s2, d1_m2_4_s2, d1_m2_5_s2]

         section_list = [d1_m1_1_s2, d1_m2_1_s2]
@@ -453,6 +462,10 @@ class TestWrappers(TestCase):
         d1_m1_1_s3 = self.job_list.get_job_by_name('expid_d1_m1_1_s3')
         d1_m2_1_s3 = self.job_list.get_job_by_name('expid_d1_m2_1_s3')

+        self.job_list._ordered_jobs_by_date_member["wrapper"]["d1"] = dict()
+        self.job_list._ordered_jobs_by_date_member["wrapper"]["d1"]["m1"] = [d1_m1_1_s3]
+
+        self.job_list._ordered_jobs_by_date_member["wrapper"]["d1"]["m2"] = [d1_m2_1_s3]

         section_list = [d1_m1_1_s3, d1_m2_1_s3]
@@ -466,6 +479,10 @@ class TestWrappers(TestCase):
         max_wrapped_job_by_section["s4"] = max_wrapped_jobs
         returned_packages = self.job_packager._build_vertical_packages(
             section_list, max_wrapped_jobs,max_wrapped_job_by_section)
+        self.job_list._ordered_jobs_by_date_member["wrapper"]["d1"] = dict()
+        self.job_list._ordered_jobs_by_date_member["wrapper"]["d1"]["m1"] = [d1_m1_1_s3]
+
+        self.job_list._ordered_jobs_by_date_member["wrapper"]["d1"]["m2"] = [d1_m2_1_s3]

         package_m1_s2 = [d1_m1_1_s3]
         package_m2_s2 = [d1_m2_1_s3]
@@ -517,18 +534,18 @@ class TestWrappers(TestCase):
         d1_m2_3_s3 = self.job_list.get_job_by_name('expid_d1_m2_3_s3')
         d1_m2_4_s3 = self.job_list.get_job_by_name('expid_d1_m2_4_s3')

-        self.job_list._ordered_jobs_by_date_member["d1"] = dict()
-        self.job_list._ordered_jobs_by_date_member["d1"]["m1"] = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3,
+        self.job_list._ordered_jobs_by_date_member["wrapper"]["d1"] = dict()
+        self.job_list._ordered_jobs_by_date_member["wrapper"]["d1"]["m1"] = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3,
                                                                   d1_m1_3_s2, d1_m1_3_s3, d1_m1_4_s2, d1_m1_4_s3]
-        self.job_list._ordered_jobs_by_date_member["d1"]["m2"] = [d1_m2_1_s2, d1_m2_1_s3, d1_m2_2_s2, d1_m2_2_s3,
+        self.job_list._ordered_jobs_by_date_member["wrapper"]["d1"]["m2"] = [d1_m2_1_s2, d1_m2_1_s3, d1_m2_2_s2, d1_m2_2_s3,
                                                                   d1_m2_3_s2, d1_m2_3_s3, d1_m2_4_s2, d1_m2_4_s3]

         section_list = [d1_m1_1_s2, d1_m2_1_s2]

         self.job_packager.max_jobs = max_jobs
         self.job_packager._platform.max_wallclock = max_wallclock
-        self.job_packager.wrapper_type = 'vertical-mixed'
+        self.job_packager.wrapper_type = 'vertical'
         self.job_packager.jobs_in_wrapper = wrapper_expression

         max_wrapped_job_by_section = {}
         max_wrapped_job_by_section["s1"] = max_wrapped_jobs
@@ -586,19 +603,19 @@ class TestWrappers(TestCase):
         d1_m2_2_s3 = self.job_list.get_job_by_name('expid_d1_m2_2_s3')
         d1_m2_3_s3 = self.job_list.get_job_by_name('expid_d1_m2_3_s3')
         d1_m2_4_s3 = self.job_list.get_job_by_name('expid_d1_m2_4_s3')
-
-        self.job_list._ordered_jobs_by_date_member["d1"] = dict()
-        self.job_list._ordered_jobs_by_date_member["d1"]["m1"] = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3,
+        self.job_list._ordered_jobs_by_date_member["wrapper"] = dict()
+        self.job_list._ordered_jobs_by_date_member["wrapper"]["d1"] = dict()
+        self.job_list._ordered_jobs_by_date_member["wrapper"]["d1"]["m1"] = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3,
                                                                   d1_m1_3_s2, d1_m1_3_s3, d1_m1_4_s2, d1_m1_4_s3]
-        self.job_list._ordered_jobs_by_date_member["d1"]["m2"] = [d1_m2_1_s2, d1_m2_1_s3, d1_m2_2_s2, d1_m2_2_s3,
+        self.job_list._ordered_jobs_by_date_member["wrapper"]["d1"]["m2"] = [d1_m2_1_s2, d1_m2_1_s3, d1_m2_2_s2, d1_m2_2_s3,
                                                                   d1_m2_3_s2, d1_m2_3_s3, d1_m2_4_s2, d1_m2_4_s3]

         section_list = [d1_m1_1_s2]

         self.job_packager.max_jobs = max_jobs
         self.job_packager._platform.max_wallclock = max_wallclock
-        self.job_packager.wrapper_type = 'vertical-mixed'
+        self.job_packager.wrapper_type = 'vertical'
         self.job_packager.jobs_in_wrapper = wrapper_expression
         max_wrapper_job_by_section = {}
         max_wrapper_job_by_section["s1"] = max_wrapped_jobs
@@ -656,18 +673,18 @@ class TestWrappers(TestCase):
         d1_m2_3_s3 = self.job_list.get_job_by_name('expid_d1_m2_3_s3')
         d1_m2_4_s3 = self.job_list.get_job_by_name('expid_d1_m2_4_s3')

-        self.job_list._ordered_jobs_by_date_member["d1"] = dict()
-        self.job_list._ordered_jobs_by_date_member["d1"]["m1"] = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3,
+        self.job_list._ordered_jobs_by_date_member["wrapper"]["d1"] = dict()
+        self.job_list._ordered_jobs_by_date_member["wrapper"]["d1"]["m1"] = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3,
                                                                   d1_m1_3_s2, d1_m1_3_s3, d1_m1_4_s2, d1_m1_4_s3]
-        self.job_list._ordered_jobs_by_date_member["d1"]["m2"] = [d1_m2_1_s2, d1_m2_1_s3, d1_m2_2_s2, d1_m2_2_s3,
+        self.job_list._ordered_jobs_by_date_member["wrapper"]["d1"]["m2"] = [d1_m2_1_s2, d1_m2_1_s3, d1_m2_2_s2, d1_m2_2_s3,
                                                                   d1_m2_3_s2, d1_m2_3_s3, d1_m2_4_s2, d1_m2_4_s3]

         section_list = [d1_m1_1_s2, d1_m2_1_s2]

         self.job_packager.max_jobs = max_jobs
         self.job_packager._platform.max_wallclock = max_wallclock
-        self.job_packager.wrapper_type = 'vertical-mixed'
+        self.job_packager.wrapper_type = 'vertical'
         self.job_packager.jobs_in_wrapper = wrapper_expression
         max_wrapped_job_by_section = {}
         max_wrapped_job_by_section["s1"] = max_wrapped_jobs
@@ -736,18 +753,18 @@ class TestWrappers(TestCase):
         d1_m2_3_s3 = self.job_list.get_job_by_name('expid_d1_m2_3_s3')
         d1_m2_4_s3 = self.job_list.get_job_by_name('expid_d1_m2_4_s3')

-        self.job_list._ordered_jobs_by_date_member["d1"] = dict()
-        self.job_list._ordered_jobs_by_date_member["d1"]["m1"] = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3,
+        self.job_list._ordered_jobs_by_date_member["wrapper"]["d1"] = dict()
+        self.job_list._ordered_jobs_by_date_member["wrapper"]["d1"]["m1"] = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3,
                                                                   d1_m1_3_s2, d1_m1_3_s3, d1_m1_4_s2, d1_m1_4_s3]
-        self.job_list._ordered_jobs_by_date_member["d1"]["m2"] = [d1_m2_1_s2, d1_m2_1_s3, d1_m2_2_s2, d1_m2_2_s3,
+        self.job_list._ordered_jobs_by_date_member["wrapper"]["d1"]["m2"] = [d1_m2_1_s2, d1_m2_1_s3, d1_m2_2_s2, d1_m2_2_s3,
                                                                   d1_m2_3_s2, d1_m2_3_s3, d1_m2_4_s2, d1_m2_4_s3]

         section_list = [d1_m1_1_s2, d1_m2_1_s2]
         self.job_packager.max_jobs = max_jobs
         self.job_packager._platform.max_wallclock = max_wallclock
-        self.job_packager.wrapper_type = 'vertical-mixed'
+        self.job_packager.wrapper_type = 'vertical'
         self.job_packager.jobs_in_wrapper = wrapper_expression
         max_wrapped_job_by_section = {}
         max_wrapped_job_by_section["s1"] = max_wrapped_jobs
@@ -808,18 +825,18 @@ class TestWrappers(TestCase):
         d1_m2_3_s3 = self.job_list.get_job_by_name('expid_d1_m2_3_s3')
         d1_m2_4_s3 = self.job_list.get_job_by_name('expid_d1_m2_4_s3')

-        self.job_list._ordered_jobs_by_date_member["d1"] = dict()
-        self.job_list._ordered_jobs_by_date_member["d1"]["m1"] = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3,
+        self.job_list._ordered_jobs_by_date_member["wrapper"]["d1"] = dict()
+        self.job_list._ordered_jobs_by_date_member["wrapper"]["d1"]["m1"] = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3,
                                                                   d1_m1_3_s2, d1_m1_3_s3, d1_m1_4_s2, d1_m1_4_s3]
-        self.job_list._ordered_jobs_by_date_member["d1"]["m2"] = [d1_m2_1_s2, d1_m2_1_s3, d1_m2_2_s2, d1_m2_2_s3,
+        self.job_list._ordered_jobs_by_date_member["wrapper"]["d1"]["m2"] = [d1_m2_1_s2, d1_m2_1_s3, d1_m2_2_s2, d1_m2_2_s3,
                                                                   d1_m2_3_s2, d1_m2_3_s3, d1_m2_4_s2, d1_m2_4_s3]

         section_list = [d1_m1_1_s2, d1_m2_1_s2]

         self.job_packager.max_jobs = max_jobs
         self.job_packager._platform.max_wallclock = max_wallclock
-        self.job_packager.wrapper_type = 'vertical-mixed'
+        self.job_packager.wrapper_type = 'vertical'
         self.job_packager.jobs_in_wrapper = wrapper_expression
         max_wrapped_job_by_section = {}
         max_wrapped_job_by_section["s1"] = max_wrapped_jobs
@@ -891,11 +908,11 @@ class TestWrappers(TestCase):
         d1_m2_3_s3 = self.job_list.get_job_by_name('expid_d1_m2_3_s3')
         d1_m2_4_s3 = self.job_list.get_job_by_name('expid_d1_m2_4_s3')

-        self.job_list._ordered_jobs_by_date_member["d1"] = dict()
-        self.job_list._ordered_jobs_by_date_member["d1"]["m1"] = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3, d1_m1_3_s2,
+        self.job_list._ordered_jobs_by_date_member["wrapper"]["d1"] = dict()
+        self.job_list._ordered_jobs_by_date_member["wrapper"]["d1"]["m1"] = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2, d1_m1_2_s3, d1_m1_3_s2,
                                                                   d1_m1_3_s3, d1_m1_4_s2, d1_m1_4_s3]
-        self.job_list._ordered_jobs_by_date_member["d1"]["m2"] = [d1_m2_1_s2, d1_m2_1_s3, d1_m2_2_s2, d1_m2_2_s3, d1_m2_3_s2,
+        self.job_list._ordered_jobs_by_date_member["wrapper"]["d1"]["m2"] = [d1_m2_1_s2, d1_m2_1_s3, d1_m2_2_s2, d1_m2_2_s3, d1_m2_3_s2,
                                                                   d1_m2_3_s3, d1_m2_4_s2, d1_m2_4_s3]

         wrapper_expression = "s2 s3"
@@ -907,7 +924,7 @@ class TestWrappers(TestCase):

         self.job_packager.max_jobs = max_jobs
         self.job_packager._platform.max_wallclock = max_wallclock
-        self.job_packager.wrapper_type = 'vertical-mixed'
+        self.job_packager.wrapper_type = 'vertical'
         self.job_packager.jobs_in_wrapper = wrapper_expression
         max_wrapped_job_by_section = {}
         max_wrapped_job_by_section["s1"] = max_wrapped_jobs
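With `vertical-mixed` folded into plain `vertical`, these tests keep asserting the same packaging rule as before: starting from a ready job, walk the ordered jobs of its date/member and append until a size limit is reached. A toy illustration of that rule under those assumptions (not Autosubmit code):

.. code-block:: python

    def build_vertical_package(ordered_jobs, start_job, max_wrapped_jobs):
        """Toy version of the vertical rule exercised by the tests above."""
        package = [start_job]
        for job in ordered_jobs[ordered_jobs.index(start_job) + 1:]:
            if len(package) >= max_wrapped_jobs:
                break
            package.append(job)
        return package

    jobs = ["s2_1", "s2_2", "s2_3", "s2_4", "s2_5"]
    print(build_vertical_package(jobs, "s2_1", 3))  # ['s2_1', 's2_2', 's2_3']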