diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 01e5b508906054f159c26153d8be4e426f43ccb7..13ec1fe2a8e8e0928bdd81c05fe5ada869fba300 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -180,6 +180,8 @@ class Autosubmit: 'LIST = "[ 19601101 [ fc0 [1 2 3 4] fc1 [1] ] 19651101 [ fc0 [16-30] ] ]"') subparser.add_argument('-expand_status', type=str, help='Select the statuses to be expanded') subparser.add_argument('--hide_groups', action='store_true', default=False, help='Hides the groups from the plot') + subparser.add_argument('-cw', '--checkwrapper', action='store_true', default=False, help='Generate possible wrapper in the current workflow') + group.add_argument('-fs', '--filter_status', type=str, choices=('Any', 'READY', 'COMPLETED', 'WAITING', 'SUSPENDED', 'FAILED', 'UNKNOWN'), @@ -202,7 +204,6 @@ class Autosubmit: subparser.add_argument('--txt', action='store_true', default=False, help='Generates only txt status file') subparser.add_argument('-nt', '--notransitive', action='store_true', default=False, help='Disable transitive reduction') - subparser.add_argument('-cw', '--checkwrapper', action='store_true', default=False, help='check wrappers if inspect was execute') # Stats subparser = subparsers.add_parser('stats', description="plots statistics for specified experiment") @@ -256,7 +257,7 @@ class Autosubmit: subparser.add_argument('expid', help='experiment identifier') subparser.add_argument('-nt', '--notransitive', action='store_true', default=False, help='Disable transitive reduction') subparser.add_argument('-f', '--force', action="store_true",help='Overwrite all cmd') - subparser.add_argument('-cw', '--checkwrapper', action='store_true', default=False, help='generate cmd for wrappers ') + subparser.add_argument('-cw', '--checkwrapper', action='store_true', default=False, help='Generate possible wrapper in the current workflow') group.add_argument('-fs', '--filter_status', type=str, choices=('Any', 'READY', 'COMPLETED', 'WAITING', 'SUSPENDED', 'FAILED', 'UNKNOWN'), @@ -298,6 +299,7 @@ class Autosubmit: 'LIST = "[ 19601101 [ fc0 [1 2 3 4] fc1 [1] ] 19651101 [ fc0 [16-30] ] ]"') subparser.add_argument('-expand_status', type=str, help='Select the statuses to be expanded') subparser.add_argument('-nt', '--notransitive', action='store_true', default=False, help='Disable transitive reduction') + subparser.add_argument('-cw', '--checkwrapper', action='store_true', default=False, help='Generate possible wrapper in the current workflow') # Configure subparser = subparsers.add_parser('configure', description="configure database and path for autosubmit. It " @@ -356,6 +358,8 @@ class Autosubmit: 'LIST = "[ 19601101 [ fc0 [1 2 3 4] fc1 [1] ] 19651101 [ fc0 [16-30] ] ]"') subparser.add_argument('-expand_status', type=str, help='Select the statuses to be expanded') subparser.add_argument('-nt', '--notransitive', action='store_true', default=False, help='Disable transitive reduction') + subparser.add_argument('-cw', '--checkwrapper', action='store_true', default=False, help='Generate possible wrapper in the current workflow') + # Test Case subparser = subparsers.add_parser('testcase', description='create test case experiment') @@ -433,7 +437,7 @@ class Autosubmit: return Autosubmit.migrate(args.expid, args.offer, args.pickup) elif args.command == 'create': return Autosubmit.create(args.expid, args.noplot, args.hide, args.output, args.group_by, args.expand, - args.expand_status, args.notransitive) + args.expand_status, args.notransitive,args.checkwrapper) elif args.command == 'configure': if not args.advanced or (args.advanced and dialog is None): return Autosubmit.configure(args.advanced, args.databasepath, args.databasefilename, @@ -446,7 +450,7 @@ class Autosubmit: elif args.command == 'setstatus': return Autosubmit.set_status(args.expid, args.noplot, args.save, args.status_final, args.list, args.filter_chunks, args.filter_status, args.filter_type, args.hide, - args.group_by, args.expand, args.expand_status, args.notransitive) + args.group_by, args.expand, args.expand_status, args.notransitive,args.checkwrapper) elif args.command == 'testcase': return Autosubmit.testcase(args.copy, args.description, args.chunks, args.member, args.stardate, args.HPC, args.branch) @@ -709,54 +713,17 @@ class Autosubmit: if project_type != "none": # Check proj configuration as_conf.check_proj() - - hpcarch = as_conf.get_platform() - safetysleeptime = as_conf.get_safetysleeptime() - - - submitter = Autosubmit._get_submitter(as_conf) - submitter.load_platforms(as_conf) - Log.debug("The Experiment name is: {0}", expid) Log.debug("Sleep: {0}", safetysleeptime) - + packages_persistence = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), + "job_packages_" + expid) + packages_persistence.reset_table(True) job_list_original = Autosubmit.load_job_list(expid, as_conf, notransitive=notransitive) job_list = copy.deepcopy(job_list_original) job_list.packages_dict = {} Log.debug("Length of the jobs list: {0}", len(job_list)) - Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) - # check the job list script creation - Log.debug("Checking experiment templates...") - - platforms_to_test = set() - - - for job in job_list.get_job_list(): - if job.platform_name is None: - job.platform_name = hpcarch - # noinspection PyTypeChecker - job.platform = submitter.platforms[job.platform_name.lower()] - # noinspection PyTypeChecker - platforms_to_test.add(job.platform) - - job_list.check_scripts(as_conf) - - packages_persistence = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), - "job_packages_" + expid) - packages_persistence.reset_table(True) - if as_conf.get_wrapper_type() != 'none': - packages = packages_persistence.load() - for (exp_id, package_name, job_name) in packages: #LUNES - if package_name not in job_list.packages_dict: - job_list.packages_dict[package_name] = [] - job_list.packages_dict[package_name].append(job_list.get_job_by_name(job_name)) - - as_conf.reload() - Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) - job_list.update_list(as_conf) - job_list.save() # variables to be updated on the fly safetysleeptime = as_conf.get_safetysleeptime() Log.debug("Sleep: {0}", safetysleeptime) @@ -764,12 +731,12 @@ class Autosubmit: Log.info("Starting to generate cmd scripts") if not isinstance(job_list, type([])): - jobs = [] jobs_cw = [] if checkwrapper and ( not locked or (force and locked)): Log.info("Generating all cmd script adapted for wrappers") jobs = job_list.get_uncompleted() + jobs_cw = job_list.get_completed() else: if (force and not locked) or (force and locked) : @@ -830,7 +797,7 @@ class Autosubmit: jobs.append(job) else: jobs = job_list.get_job_list() - + if isinstance(jobs, type([])): referenced_jobs_to_remove = set() for job in jobs: for child in job.children: @@ -841,34 +808,9 @@ class Autosubmit: referenced_jobs_to_remove.add(parent) for job in jobs: - job.children = job.children - referenced_jobs_to_remove - job.parents = job.parents - referenced_jobs_to_remove - - - jobs_by_platform=[] - - for platform in platforms_to_test: - [jobs_by_platform.append(job) for job in jobs if (platform is None or job._platform.name is platform.name)] - packages_to_submit, remote_dependencies_dict = JobPackager(as_conf,platform, job_list).build_packages(True,jobs_by_platform) - for package in packages_to_submit: - if hasattr(package, "name"): - job_list.packages_dict[package.name] = package.jobs - from job.job import WrapperJob - wrapper_job = WrapperJob(package.name, package.jobs[0].id, Status.SUBMITTED, 0, package.jobs, - package._wallclock, package._num_processors, - package.platform, as_conf) - job_list.job_package_map[package.jobs[0].id] = wrapper_job - - if remote_dependencies_dict and package.name in remote_dependencies_dict['name_to_id']: - remote_dependencies_dict['name_to_id'][package.name] = package.jobs[0].id - - if isinstance(package, JobPackageThread): - packages_persistence.save(package.name, package.jobs, package._expid,True) - - package.submit(as_conf, job_list.parameters,True) - jobs_by_platform = [] - - if isinstance(jobs_cw, type([])): + job.status=Status.WAITING + Autosubmit.generate_scripts_andor_wrappers(as_conf,job_list, jobs,packages_persistence,False) + if len(jobs_cw) >0: referenced_jobs_to_remove = set() for job in jobs_cw: for child in job.children: @@ -879,41 +821,46 @@ class Autosubmit: referenced_jobs_to_remove.add(parent) for job in jobs_cw: - job.children = job.children - referenced_jobs_to_remove - job.parents = job.parents - referenced_jobs_to_remove - - for platform in platforms_to_test: - [jobs_by_platform.append(job) for job in jobs_cw if - (platform is None or job._platform.name is platform.name)] - packages_to_submit, remote_dependencies_dict = JobPackager(as_conf, platform, - job_list).build_packages(True, - jobs_by_platform) - for package in packages_to_submit: - if hasattr(package, "name"): - job_list.packages_dict[package.name] = package.jobs - from job.job import WrapperJob - wrapper_job = WrapperJob(package.name, package.jobs[0].id, Status.SUBMITTED, 0, - package.jobs, - package._wallclock, package._num_processors, - package.platform, as_conf) - job_list.job_package_map[package.jobs[0].id] = wrapper_job - - if remote_dependencies_dict and package.name in remote_dependencies_dict['name_to_id']: - remote_dependencies_dict['name_to_id'][package.name] = package.jobs[0].id - - if isinstance(package, JobPackageThread): - packages_persistence.save(package.name, package.jobs, package._expid, True) - - package.submit(as_conf, job_list.parameters, True) - jobs_by_platform = [] - - - + job.status = Status.WAITING + Autosubmit.generate_scripts_andor_wrappers(as_conf, job_list, jobs_cw,packages_persistence,False) Log.info("no more scripts to generate, now proceed to check them manually") time.sleep(safetysleeptime) return True + @staticmethod + def generate_scripts_andor_wrappers(as_conf,job_list,jobs_filtered,packages_persistence,only_wrappers=False): + job_list._job_list=jobs_filtered + job_list.update_list(as_conf,False) + submitter = Autosubmit._get_submitter(as_conf) + submitter.load_platforms(as_conf) + hpcarch = as_conf.get_platform() + Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) + platforms_to_test = set() + for job in job_list.get_job_list(): + if job.platform_name is None: + job.platform_name = hpcarch + # noinspection PyTypeChecker + job.platform = submitter.platforms[job.platform_name.lower()] + # noinspection PyTypeChecker + platforms_to_test.add(job.platform) + ## case setstatus + job_list.update_list(as_conf, False) + Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) + while job_list.get_active(): + Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence,True,only_wrappers) + for jobready in job_list.get_ready(): + jobready.status=Status.COMPLETED + if as_conf.get_wrapper_type() != "none": + for platform in platforms_to_test: + queuing_jobs = job_list.get_in_queue_grouped_id(platform) + for wrapper_id in job_list.job_package_map: + job_list.job_package_map[wrapper_id].status=Status.COMPLETED + for innerjob in job_list.job_package_map[wrapper_id].job_list: + innerjob.status=Status.COMPLETED + + job_list.update_list(as_conf, False) + @staticmethod @@ -1137,7 +1084,7 @@ class Autosubmit: return False @staticmethod - def submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence): + def submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence, inspect=False,only_wrappers=False): """ Gets READY jobs and send them to the platforms if there is available space on the queues @@ -1158,8 +1105,8 @@ class Autosubmit: remote_dependency = remote_dependencies_dict['dependencies'][package.name] remote_dependency_id = remote_dependencies_dict['name_to_id'][remote_dependency] package.set_job_dependency(remote_dependency_id) - - package.submit(as_conf, job_list.parameters) + if not only_wrappers: + package.submit(as_conf, job_list.parameters,inspect) if hasattr(package, "name"): job_list.packages_dict[package.name] = package.jobs @@ -1174,7 +1121,7 @@ class Autosubmit: remote_dependencies_dict['name_to_id'][package.name] = package.jobs[0].id if isinstance(package, JobPackageThread): - packages_persistence.save(package.name, package.jobs, package._expid) + packages_persistence.save(package.name, package.jobs, package._expid,inspect) save = True except WrongTemplateException as e: @@ -1283,6 +1230,9 @@ class Autosubmit: else: jobs = job_list.get_job_list() + + + referenced_jobs_to_remove = set() for job in jobs: for child in job.children: @@ -1295,12 +1245,33 @@ class Autosubmit: for job in jobs: job.children = job.children - referenced_jobs_to_remove job.parents = job.parents - referenced_jobs_to_remove + #WRAPPERS + if as_conf.get_wrapper_type() != 'none' and checkwrapper: + packages_persistence = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), + "job_packages_" + expid) + packages_persistence.reset_table(True) + referenced_jobs_to_remove = set() + job_list_wrappers = copy.deepcopy(job_list) + jobs_wr = copy.deepcopy(jobs) + [job for job in jobs_wr if (job.status != Status.COMPLETED)] + for job in jobs_wr: + for child in job.children: + if child not in jobs_wr: + referenced_jobs_to_remove.add(child) + for parent in job.parents: + if parent not in jobs_wr: + referenced_jobs_to_remove.add(parent) - packages = None - if as_conf.get_wrapper_type() != 'none': + for job in jobs_wr: + job.children = job.children - referenced_jobs_to_remove + job.parents = job.parents - referenced_jobs_to_remove + Autosubmit.generate_scripts_andor_wrappers(as_conf, job_list_wrappers, jobs_wr, + packages_persistence, True) + packages = packages_persistence.load(True) + else: packages = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), - "job_packages_" + expid).load(checkwrapper) + "job_packages_" + expid).load() sys.setrecursionlimit(50000) @@ -2262,7 +2233,7 @@ class Autosubmit: jobs_destiny) @staticmethod - def create(expid, noplot, hide, output='pdf', group_by=None, expand=list(), expand_status=list(), notransitive=False): + def create(expid, noplot, hide, output='pdf', group_by=None, expand=list(), expand_status=list(), notransitive=False,checkwrappers=False): """ Creates job list for given experiment. Configuration files must be valid before realizing this process. @@ -2372,11 +2343,37 @@ class Autosubmit: job_grouping = JobGrouping(group_by, copy.deepcopy(job_list.get_job_list()), job_list, expand_list=expand, expanded_status=status) groups_dict = job_grouping.group_jobs() - + # WRAPPERS + if as_conf.get_wrapper_type() != 'none' and checkwrappers: + packages_persistence = JobPackagePersistence( + os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), + "job_packages_" + expid) + packages_persistence.reset_table(True) + referenced_jobs_to_remove = set() + job_list_wrappers = copy.deepcopy(job_list) + jobs_wr = job_list_wrappers.get_job_list() + for job in jobs_wr: + for child in job.children: + if child not in jobs_wr: + referenced_jobs_to_remove.add(child) + for parent in job.parents: + if parent not in jobs_wr: + referenced_jobs_to_remove.add(parent) + + for job in jobs_wr: + job.children = job.children - referenced_jobs_to_remove + job.parents = job.parents - referenced_jobs_to_remove + Autosubmit.generate_scripts_andor_wrappers(as_conf, job_list_wrappers, jobs_wr, + packages_persistence, True) + + packages = packages_persistence.load(True) + else: + packages= None + Log.info("\nPlotting the jobs list...") monitor_exp = Monitor() monitor_exp.generate_output(expid, job_list.get_job_list(), - os.path.join(exp_path, "/tmp/LOG_", expid), output, None, not hide, + os.path.join(exp_path, "/tmp/LOG_", expid), output, packages, not hide, groups=groups_dict) Log.result("\nJob list created successfully") @@ -2485,7 +2482,7 @@ class Autosubmit: @staticmethod def set_status(expid, noplot, save, final, lst, filter_chunks, filter_status, filter_section, hide, group_by=None, - expand=list(), expand_status=list(), notransitive=False): + expand=list(), expand_status=list(), notransitive=False,checkwrapper=False): """ Set status @@ -2621,10 +2618,32 @@ class Autosubmit: Log.error("Save disabled due invalid expid, please check or/and jobs expid name") - - packages = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), - "job_packages_" + expid).load() - + if as_conf.get_wrapper_type() != 'none' and checkwrapper: + packages_persistence = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), + "job_packages_" + expid) + packages_persistence.reset_table(True) + referenced_jobs_to_remove = set() + job_list_wrappers = copy.deepcopy(job_list) + jobs_wr = copy.deepcopy(job_list.get_job_list()) + [job for job in jobs_wr if (job.status != Status.COMPLETED)] + for job in jobs_wr: + for child in job.children: + if child not in jobs_wr: + referenced_jobs_to_remove.add(child) + for parent in job.parents: + if parent not in jobs_wr: + referenced_jobs_to_remove.add(parent) + + for job in jobs_wr: + job.children = job.children - referenced_jobs_to_remove + job.parents = job.parents - referenced_jobs_to_remove + Autosubmit.generate_scripts_andor_wrappers(as_conf, job_list_wrappers, jobs_wr, + packages_persistence, True) + + packages = packages_persistence.load(True) + else: + packages = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), + "job_packages_" + expid).load() if not noplot: groups_dict = dict() if group_by: diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 4cdd937d9fe1f6c0731ff2750f07d05b2b41f09d..2abd9996ad9614c6bd6e47bfc69b852508a3f591 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -704,6 +704,17 @@ class JobList: jobs_by_id[job.id].append(job) return jobs_by_id + def get_in_ready_grouped_id(self, platform): + jobs=[] + [jobs.append(job) for job in jobs if (platform is None or job._platform.name is platform.name)] + + jobs_by_id = dict() + for job in jobs: + if job.id not in jobs_by_id: + jobs_by_id[job.id] = list() + jobs_by_id[job.id].append(job) + return jobs_by_id + def sort_by_name(self): """ Returns a list of jobs sorted by name diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 83ad5177e3c127378c8e0da814fe21f1491dbd46..cb20eebd31e5d393208951f2b7847ce97bc40dbe 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -210,8 +210,7 @@ class JobPackager(object): total_wallclock = '00:00' horizontal_package = horizontal_packager.build_horizontal_package() - total_processors = horizontal_packager.total_processors - + total_processors=horizontal_packager.total_processors horizontal_packager.create_sections_order(section) horizontal_package.sort(key=lambda job: horizontal_packager.sort_by_expression(job.name)) @@ -219,13 +218,14 @@ class JobPackager(object): wallclock = job.wallclock current_package = [horizontal_package] - + #current_package = [] ## Get the next horizontal packages ## - current_package += horizontal_packager.get_next_packages(section, max_wallclock=self._platform.max_wallclock) + current_package += horizontal_packager.get_next_packages(section, max_wallclock=self._platform.max_wallclock,horizontal_vertical=True) for i in range(len(current_package)): total_wallclock = sum_str_hours(total_wallclock, wallclock) + return JobPackageHorizontalVertical(current_package, total_processors, total_wallclock, jobs_resources=jobs_resources) @@ -347,8 +347,10 @@ class JobPackagerHorizontal(object): self._sort_order_dict = dict() self._components_dict = dict() - def build_horizontal_package(self): + def build_horizontal_package(self,horizontal_vertical=False): current_package = [] + if horizontal_vertical: + self._current_processors = 0 for job in self.job_list: if self.max_jobs > 0 and len(current_package) < self.max_wrapped_jobs: self.max_jobs -= 1 @@ -379,7 +381,7 @@ class JobPackagerHorizontal(object): jobname = jobname.split('_')[-1] return self._sort_order_dict[jobname] - def get_next_packages(self, jobs_sections, max_wallclock=None, potential_dependency=None, remote_dependencies_dict=dict()): + def get_next_packages(self, jobs_sections, max_wallclock=None, potential_dependency=None, remote_dependencies_dict=dict(),horizontal_vertical=False): packages = [] job = max(self.job_list, key=attrgetter('total_wallclock')) wallclock = job.wallclock @@ -400,7 +402,7 @@ class JobPackagerHorizontal(object): next_section_list.sort(key=lambda job: self.sort_by_expression(job.name)) self.job_list = next_section_list - package_jobs = self.build_horizontal_package() + package_jobs = self.build_horizontal_package(horizontal_vertical) if package_jobs: if max_wallclock: