From 16321675666daf811eae2b86625beb621865f04f Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 21 Feb 2019 12:17:54 +0100 Subject: [PATCH 1/4] Inspect: added support for wrappers(need refine) ,#239,#367 --- autosubmit/autosubmit.py | 42 ++++++++++++++++++++++++++++++---------- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index e05b30a59..89926dcd6 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -724,7 +724,10 @@ class Autosubmit: Log.debug("Checking experiment templates...") platforms_to_test = set() + + for job in job_list.get_job_list(): + if job.platform_name is None: job.platform_name = hpcarch # noinspection PyTypeChecker @@ -744,17 +747,10 @@ class Autosubmit: job_list.packages_dict[package_name] = [] job_list.packages_dict[package_name].append(job_list.get_job_by_name(job_name)) - for package_name, jobs in job_list.packages_dict.items(): - from job.job import WrapperJob - wrapper_job = WrapperJob(package_name, jobs[0].id, Status.SUBMITTED, 0, jobs, - None, - None, jobs[0].platform, as_conf) - job_list.job_package_map[jobs[0].id] = wrapper_job - - as_conf.reload() Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) - + job_list.update_list(as_conf) + job_list.save() # variables to be updated on the fly safetysleeptime = as_conf.get_safetysleeptime() Log.debug("Sleep: {0}", safetysleeptime) @@ -838,10 +834,36 @@ class Autosubmit: job.parents = job.parents - referenced_jobs_to_remove - packages_to_submit, remote_dependencies_dict = JobPackager(as_conf, platforms_to_test.pop(), job_list).build_packages(True,jobs) + jobs_by_platform=[] + for platform in platforms_to_test: + [jobs_by_platform.append(job) for job in jobs if (platform is None or job._platform.name is platform.name)] + + packages_to_submit, remote_dependencies_dict = JobPackager(as_conf,platform, job_list).build_packages(True,jobs_by_platform) + jobs_by_platform = [] for package in packages_to_submit: + if hasattr(package, "name"): + job_list.packages_dict[package.name] = package.jobs + + from job.job import WrapperJob + wrapper_job = WrapperJob(package.name, package.jobs[0].id, Status.RUNNING, 0, package.jobs, + package._wallclock, package._num_processors, + package.platform, as_conf) + job_list.job_package_map[package.jobs[0].id] = wrapper_job + + if remote_dependencies_dict and package.name in remote_dependencies_dict['name_to_id']: + remote_dependencies_dict['name_to_id'][package.name] = package.jobs[0].id + + if isinstance(package, JobPackageThread): + packages_persistence.save(package.name, package.jobs, package._expid) + + save = True + job_list.update_list(as_conf,save) + package.submit(as_conf, job_list.parameters,True) + + + Log.info("no more scripts to generate, now proceed to check them manually") time.sleep(safetysleeptime) return True -- GitLab From 132f7e8098c9a912025f19024d359594f4617cc5 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 21 Feb 2019 12:57:07 +0100 Subject: [PATCH 2/4] Inspect: added support for wrapper ,#239 --- autosubmit/autosubmit.py | 40 +++++++++++++++++----------------------- 1 file changed, 17 insertions(+), 23 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 89926dcd6..1b5c83cad 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -837,30 +837,24 @@ class Autosubmit: jobs_by_platform=[] for platform in platforms_to_test: [jobs_by_platform.append(job) for job in jobs if (platform is None or job._platform.name is platform.name)] - packages_to_submit, remote_dependencies_dict = JobPackager(as_conf,platform, job_list).build_packages(True,jobs_by_platform) - jobs_by_platform = [] - - for package in packages_to_submit: - if hasattr(package, "name"): - job_list.packages_dict[package.name] = package.jobs - - from job.job import WrapperJob - wrapper_job = WrapperJob(package.name, package.jobs[0].id, Status.RUNNING, 0, package.jobs, - package._wallclock, package._num_processors, - package.platform, as_conf) - job_list.job_package_map[package.jobs[0].id] = wrapper_job - - if remote_dependencies_dict and package.name in remote_dependencies_dict['name_to_id']: - remote_dependencies_dict['name_to_id'][package.name] = package.jobs[0].id - - if isinstance(package, JobPackageThread): - packages_persistence.save(package.name, package.jobs, package._expid) - - save = True - job_list.update_list(as_conf,save) - - package.submit(as_conf, job_list.parameters,True) + for package in packages_to_submit: + if hasattr(package, "name"): + job_list.packages_dict[package.name] = package.jobs + from job.job import WrapperJob + wrapper_job = WrapperJob(package.name, package.jobs[0].id, Status.RUNNING, 0, package.jobs, + package._wallclock, package._num_processors, + package.platform, as_conf) + job_list.job_package_map[package.jobs[0].id] = wrapper_job + + if remote_dependencies_dict and package.name in remote_dependencies_dict['name_to_id']: + remote_dependencies_dict['name_to_id'][package.name] = package.jobs[0].id + + if isinstance(package, JobPackageThread): + packages_persistence.save(package.name, package.jobs, package._expid) + + package.submit(as_conf, job_list.parameters,True) + jobs_by_platform = [] -- GitLab From 2ff3806b458a701852e4e5e8cf7a160ce2d6d029 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 21 Feb 2019 15:35:17 +0100 Subject: [PATCH 3/4] Monitor: added support for wrapper without run ,#367 --- autosubmit/autosubmit.py | 14 ++++++++------ autosubmit/job/job_package_persistence.py | 23 +++++++++++++++-------- 2 files changed, 23 insertions(+), 14 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 1b5c83cad..825a6f315 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -202,6 +202,7 @@ class Autosubmit: subparser.add_argument('--txt', action='store_true', default=False, help='Generates only txt status file') subparser.add_argument('-nt', '--notransitive', action='store_true', default=False, help='Disable transitive reduction') + subparser.add_argument('-cw', '--checkwrapper', action='store_true', default=False, help='check wrappers if inspect was execute') # Stats subparser = subparsers.add_parser('stats', description="plots statistics for specified experiment") @@ -408,7 +409,7 @@ class Autosubmit: elif args.command == 'monitor': return Autosubmit.monitor(args.expid, args.output, args.list, args.filter_chunks, args.filter_status, args.filter_type, args.hide, args.txt, args.group_by, args.expand, - args.expand_status, args.hide_groups, args.notransitive) + args.expand_status, args.hide_groups, args.notransitive,args.checkwrapper) elif args.command == 'stats': return Autosubmit.statistics(args.expid, args.filter_type, args.filter_period, args.output, args.hide, args.notransitive) @@ -741,7 +742,7 @@ class Autosubmit: "job_packages_" + expid) if as_conf.get_wrapper_type() != 'none': - packages = packages_persistence.load() + packages = packages_persistence.load(True) for (exp_id, package_name, job_name) in packages: if package_name not in job_list.packages_dict: job_list.packages_dict[package_name] = [] @@ -842,7 +843,7 @@ class Autosubmit: if hasattr(package, "name"): job_list.packages_dict[package.name] = package.jobs from job.job import WrapperJob - wrapper_job = WrapperJob(package.name, package.jobs[0].id, Status.RUNNING, 0, package.jobs, + wrapper_job = WrapperJob(package.name, package.jobs[0].id, Status.SUBMITTED, 0, package.jobs, package._wallclock, package._num_processors, package.platform, as_conf) job_list.job_package_map[package.jobs[0].id] = wrapper_job @@ -851,7 +852,7 @@ class Autosubmit: remote_dependencies_dict['name_to_id'][package.name] = package.jobs[0].id if isinstance(package, JobPackageThread): - packages_persistence.save(package.name, package.jobs, package._expid) + packages_persistence.save(package.name, package.jobs, package._expid,True) package.submit(as_conf, job_list.parameters,True) jobs_by_platform = [] @@ -1136,7 +1137,7 @@ class Autosubmit: @staticmethod def monitor(expid, file_format, lst, filter_chunks, filter_status, filter_section, hide, txt_only=False, - group_by=None, expand=list(), expand_status=list(), hide_groups=False, notransitive=False): + group_by=None, expand=list(), expand_status=list(), hide_groups=False, notransitive=False, checkwrapper=False): """ Plots workflow graph for a given experiment with status of each job coded by node color. Plot is created in experiment's plot folder with name __