diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py
index e05b30a59349474b64659fb4106f27277e3551cf..01e5b508906054f159c26153d8be4e426f43ccb7 100644
--- a/autosubmit/autosubmit.py
+++ b/autosubmit/autosubmit.py
@@ -202,6 +202,7 @@ class Autosubmit:
         subparser.add_argument('--txt', action='store_true', default=False, help='Generates only txt status file')
         subparser.add_argument('-nt', '--notransitive', action='store_true', default=False, help='Disable transitive reduction')
+        subparser.add_argument('-cw', '--checkwrapper', action='store_true', default=False, help='Check wrappers if inspect was executed')

         # Stats
         subparser = subparsers.add_parser('stats', description="plots statistics for specified experiment")
@@ -249,11 +250,14 @@ class Autosubmit:
         group = subparser.add_mutually_exclusive_group(required=True)
         group.add_argument('-o', '--offer', action="store_true", default=False, help='Offer experiment')
         group.add_argument('-p', '--pickup', action="store_true", default=False, help='Pick-up released experiment')
+
         # Inspect
         subparser = subparsers.add_parser('inspect', description="Generate all .cmd files")
         subparser.add_argument('expid', help='experiment identifier')
         subparser.add_argument('-nt', '--notransitive', action='store_true', default=False, help='Disable transitive reduction')
         subparser.add_argument('-f', '--force', action="store_true",help='Overwrite all cmd')
+        subparser.add_argument('-cw', '--checkwrapper', action='store_true', default=False, help='Generate cmd files for wrappers')
+
         group.add_argument('-fs', '--filter_status', type=str,
                            choices=('Any', 'READY', 'COMPLETED', 'WAITING', 'SUSPENDED', 'FAILED', 'UNKNOWN'),
                            help='Select the original status to filter the list of jobs')
@@ -325,6 +329,7 @@ class Autosubmit:
         subparser.add_argument('expid', help='experiment identifier')
         subparser.add_argument('-np', '--noplot', action='store_true', default=False, help='omit plot')
         subparser.add_argument('-s', '--save', action="store_true", default=False, help='Save changes to disk')
+
         subparser.add_argument('-t', '--status_final',
                                choices=('READY', 'COMPLETED', 'WAITING', 'SUSPENDED', 'FAILED', 'UNKNOWN', 'QUEUING', 'RUNNING'),
@@ -408,7 +413,7 @@ class Autosubmit:
         elif args.command == 'monitor':
             return Autosubmit.monitor(args.expid, args.output, args.list, args.filter_chunks, args.filter_status,
                                       args.filter_type, args.hide, args.txt, args.group_by, args.expand,
-                                      args.expand_status, args.hide_groups, args.notransitive)
+                                      args.expand_status, args.hide_groups, args.notransitive, args.checkwrapper)
         elif args.command == 'stats':
             return Autosubmit.statistics(args.expid, args.filter_type, args.filter_period, args.output, args.hide,
                                          args.notransitive)
@@ -421,7 +426,7 @@ class Autosubmit:
             return Autosubmit.check(args.expid, args.notransitive)
         elif args.command == 'inspect':
             return Autosubmit.inspect(args.expid, args.list, args.filter_chunks, args.filter_status,
-                                      args.filter_type,args.notransitive , args.force)
+                                      args.filter_type, args.notransitive, args.force, args.checkwrapper)
         elif args.command == 'describe':
             return Autosubmit.describe(args.expid)
         elif args.command == 'migrate':
@@ -667,7 +672,7 @@ class Autosubmit:
         job_list.parameters = parameters

     @staticmethod
-    def inspect(expid, lst, filter_chunks, filter_status, filter_section , notransitive=False, force=False ):
+    def inspect(expid, lst, filter_chunks, filter_status, filter_section, notransitive=False, force=False, checkwrapper=False):
        """
        Generates cmd files experiment.
@@ -716,7 +721,9 @@ class Autosubmit:
         Log.debug("The Experiment name is: {0}", expid)
         Log.debug("Sleep: {0}", safetysleeptime)
-        job_list = Autosubmit.load_job_list(expid, as_conf, notransitive=notransitive)
+        job_list_original = Autosubmit.load_job_list(expid, as_conf, notransitive=notransitive)
+        job_list = copy.deepcopy(job_list_original)
+        job_list.packages_dict = {}
         Log.debug("Length of the jobs list: {0}", len(job_list))
         Autosubmit._load_parameters(as_conf, job_list, submitter.platforms)
@@ -724,6 +731,8 @@ class Autosubmit:
         Log.debug("Checking experiment templates...")
         platforms_to_test = set()
+
+
         for job in job_list.get_job_list():
             if job.platform_name is None:
                 job.platform_name = hpcarch
@@ -736,25 +745,18 @@ class Autosubmit:
         packages_persistence = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"),
                                                      "job_packages_" + expid)
-
+        packages_persistence.reset_table(True)
         if as_conf.get_wrapper_type() != 'none':
             packages = packages_persistence.load()
             for (exp_id, package_name, job_name) in packages:
                 if package_name not in job_list.packages_dict:
                     job_list.packages_dict[package_name] = []
                 job_list.packages_dict[package_name].append(job_list.get_job_by_name(job_name))
-            for package_name, jobs in job_list.packages_dict.items():
-                from job.job import WrapperJob
-                wrapper_job = WrapperJob(package_name, jobs[0].id, Status.SUBMITTED, 0, jobs,
-                                         None,
-                                         None, jobs[0].platform, as_conf)
-                job_list.job_package_map[jobs[0].id] = wrapper_job
-
-        as_conf.reload()
         Autosubmit._load_parameters(as_conf, job_list, submitter.platforms)
-
+        job_list.update_list(as_conf)
+        job_list.save()
         # variables to be updated on the fly
         safetysleeptime = as_conf.get_safetysleeptime()
         Log.debug("Sleep: {0}", safetysleeptime)
@@ -762,67 +764,72 @@ class Autosubmit:
         Log.info("Starting to generate cmd scripts")
         if not isinstance(job_list, type([])):
+            jobs = []
-            if (force and not locked) or (force and locked) :
-                Log.info("Overwritting all cmd scripts")
-                jobs = job_list.get_job_list()
-            elif locked:
-                Log.warning("There is a .lock file and not -f, generating only all unsubmitted cmd scripts")
-                jobs = job_list.get_unsubmitted()
+            jobs_cw = []
+            if checkwrapper and (not locked or (force and locked)):
+                Log.info("Generating all cmd scripts adapted for wrappers")
+                jobs = job_list.get_uncompleted()
+                jobs_cw = job_list.get_completed()
             else:
-                Log.info("Generating cmd scripts only for selected jobs")
-
-                if filter_chunks:
-                    fc = filter_chunks
-                    Log.debug(fc)
-
-                    if fc == 'Any':
-                        jobs = job_list.get_job_list()
-                    else:
-                        # noinspection PyTypeChecker
-                        data = json.loads(Autosubmit._create_json(fc))
-                        for date_json in data['sds']:
-                            date = date_json['sd']
-                            jobs_date = filter(lambda j: date2str(j.date) == date, job_list.get_job_list())
-
-                            for member_json in date_json['ms']:
-                                member = member_json['m']
-                                jobs_member = filter(lambda j: j.member == member, jobs_date)
+                if (force and not locked) or (force and locked):
+                    Log.info("Overwriting all cmd scripts")
+                    jobs = job_list.get_job_list()
+                elif locked:
+                    Log.warning("There is a .lock file and not -f, generating only all unsubmitted cmd scripts")
+                    jobs = job_list.get_unsubmitted()
+                else:
+                    Log.info("Generating cmd scripts only for selected jobs")
+                    if filter_chunks:
+                        fc = filter_chunks
+                        Log.debug(fc)
+                        if fc == 'Any':
+                            jobs = job_list.get_job_list()
+                        else:
+                            # noinspection PyTypeChecker
+                            data = json.loads(Autosubmit._create_json(fc))
+                            for date_json in data['sds']:
+                                date = date_json['sd']
+                                jobs_date = filter(lambda j: date2str(j.date) == date, job_list.get_job_list())
+
+                                for member_json in date_json['ms']:
+                                    member = member_json['m']
+                                    jobs_member = filter(lambda j: j.member == member, jobs_date)
+
+                                    for chunk_json in member_json['cs']:
+                                        chunk = int(chunk_json)
+                                        jobs = jobs + [job for job in filter(lambda j: j.chunk == chunk, jobs_member)]
+
+                    elif filter_status:
+                        Log.debug("Filtering jobs with status {0}", filter_status)
+                        if filter_status == 'Any':
+                            jobs = job_list.get_job_list()
+                        else:
+                            fs = Autosubmit._get_status(filter_status)
+                            jobs = [job for job in filter(lambda j: j.status == fs, job_list.get_job_list())]
-                            for chunk_json in member_json['cs']:
-                                chunk = int(chunk_json)
-                                jobs = jobs + [job for job in filter(lambda j: j.chunk == chunk, jobs_member)]
+                    elif filter_section:
+                        ft = filter_section
+                        Log.debug(ft)
-                elif filter_status:
-                    Log.debug("Filtering jobs with status {0}", filter_status)
-                    if filter_status == 'Any':
-                        jobs = job_list.get_job_list()
-                    else:
-                        fs = Autosubmit._get_status(filter_status)
-                        jobs = [job for job in filter(lambda j: j.status == fs, job_list.get_job_list())]
+                        if ft == 'Any':
+                            jobs = job_list.get_job_list()
+                        else:
+                            for job in job_list.get_job_list():
+                                if job.section == ft:
+                                    jobs.append(job)
-                elif filter_section:
-                    ft = filter_section
-                    Log.debug(ft)
+                    elif lst:
+                        jobs_lst = lst.split()
-                    if ft == 'Any':
-                        jobs = job_list.get_job_list()
+                        if jobs == 'Any':
+                            jobs = job_list.get_job_list()
+                        else:
+                            for job in job_list.get_job_list():
+                                if job.name in jobs_lst:
+                                    jobs.append(job)
                     else:
-                        for job in job_list.get_job_list():
-                            if job.section == ft:
-                                jobs.append(job)
-
-                elif lst:
-                    jobs_lst = lst.split()
-
-                    if jobs == 'Any':
                         jobs = job_list.get_job_list()
-                    else:
-                        for job in job_list.get_job_list():
-                            if job.name in jobs_lst:
-                                jobs.append(job)
-                else:
-                    jobs = job_list.get_job_list()
             referenced_jobs_to_remove = set()
             for job in jobs:
@@ -838,10 +845,71 @@ class Autosubmit:
                 job.parents = job.parents - referenced_jobs_to_remove
-            packages_to_submit, remote_dependencies_dict = JobPackager(as_conf, platforms_to_test.pop(), job_list).build_packages(True,jobs)
+            jobs_by_platform = []
+
+            for platform in platforms_to_test:
+                [jobs_by_platform.append(job) for job in jobs if (platform is None or job._platform.name == platform.name)]
+                packages_to_submit, remote_dependencies_dict = JobPackager(as_conf, platform, job_list).build_packages(True, jobs_by_platform)
+                for package in packages_to_submit:
+                    if hasattr(package, "name"):
+                        job_list.packages_dict[package.name] = package.jobs
+                        from job.job import WrapperJob
+                        wrapper_job = WrapperJob(package.name, package.jobs[0].id, Status.SUBMITTED, 0, package.jobs,
+                                                 package._wallclock, package._num_processors,
+                                                 package.platform, as_conf)
+                        job_list.job_package_map[package.jobs[0].id] = wrapper_job
+
+                        if remote_dependencies_dict and package.name in remote_dependencies_dict['name_to_id']:
+                            remote_dependencies_dict['name_to_id'][package.name] = package.jobs[0].id
+
+                        if isinstance(package, JobPackageThread):
+                            packages_persistence.save(package.name, package.jobs, package._expid, True)
+
+                    package.submit(as_conf, job_list.parameters, True)
+                jobs_by_platform = []
+
+            if isinstance(jobs_cw, type([])):
+                referenced_jobs_to_remove = set()
+                for job in jobs_cw:
+                    for child in job.children:
+                        if child not in jobs_cw:
+                            referenced_jobs_to_remove.add(child)
+                    for parent in job.parents:
+                        if parent not in jobs_cw:
+                            referenced_jobs_to_remove.add(parent)
+
+                for job in jobs_cw:
+                    job.children = job.children - referenced_jobs_to_remove
+                    job.parents = job.parents - referenced_jobs_to_remove
+
+                for platform in platforms_to_test:
+                    [jobs_by_platform.append(job) for job in jobs_cw if
+                     (platform is None or job._platform.name == platform.name)]
+                    packages_to_submit, remote_dependencies_dict = JobPackager(as_conf, platform,
+                                                                               job_list).build_packages(True,
+                                                                                                         jobs_by_platform)
+                    for package in packages_to_submit:
+                        if hasattr(package, "name"):
+                            job_list.packages_dict[package.name] = package.jobs
+                            from job.job import WrapperJob
+                            wrapper_job = WrapperJob(package.name, package.jobs[0].id, Status.SUBMITTED, 0,
+                                                     package.jobs,
+                                                     package._wallclock, package._num_processors,
+                                                     package.platform, as_conf)
+                            job_list.job_package_map[package.jobs[0].id] = wrapper_job
+
+                            if remote_dependencies_dict and package.name in remote_dependencies_dict['name_to_id']:
+                                remote_dependencies_dict['name_to_id'][package.name] = package.jobs[0].id
+
+                            if isinstance(package, JobPackageThread):
+                                packages_persistence.save(package.name, package.jobs, package._expid, True)
+
+                        package.submit(as_conf, job_list.parameters, True)
+                    jobs_by_platform = []
+
-            for package in packages_to_submit:
-                package.submit(as_conf, job_list.parameters,True)
         Log.info("no more scripts to generate, now proceed to check them manually")
         time.sleep(safetysleeptime)
         return True
@@ -1120,7 +1188,7 @@ class Autosubmit:

     @staticmethod
     def monitor(expid, file_format, lst, filter_chunks, filter_status, filter_section, hide, txt_only=False,
-                group_by=None, expand=list(), expand_status=list(), hide_groups=False, notransitive=False):
+                group_by=None, expand=list(), expand_status=list(), hide_groups=False, notransitive=False, checkwrapper=False):
        """
        Plots workflow graph for a given experiment with status of each job coded by node color.
        Plot is created in experiment's plot folder with name __
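
A minimal sketch of the surface this patch touches, not part of the diff itself: the new `-cw`/`--checkwrapper` option maps onto the `checkwrapper` keyword added to `Autosubmit.inspect` and `Autosubmit.monitor`. The calls below only exercise those signatures; the experiment id `'a000'`, the `'pdf'` output format, and the remaining argument values are placeholder assumptions, not values taken from the patch.

    # Illustrative sketch, assuming the import path given by the file header of this diff.
    from autosubmit.autosubmit import Autosubmit

    # Regenerate cmd scripts, building wrapper packages as well ('a000' is a placeholder expid).
    Autosubmit.inspect('a000', lst=None, filter_chunks=None, filter_status=None,
                       filter_section=None, notransitive=False, force=True, checkwrapper=True)

    # Plot the workflow taking into account the wrapper packages persisted by inspect
    # ('pdf' is a placeholder output format).
    Autosubmit.monitor('a000', 'pdf', None, None, None, None, hide=True, txt_only=False,
                       checkwrapper=True)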