From cbf94ccd34d3119228a048530e93ff6e35a2d2fa Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 18 Dec 2018 16:29:25 +0100 Subject: [PATCH 1/4] First approach, #239 --- autosubmit/autosubmit.py | 111 +++++++++++++++++++++++++++++++++ autosubmit/job/job_packages.py | 12 ++-- 2 files changed, 119 insertions(+), 4 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 0ff06a791..d0b6269ac 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -246,6 +246,10 @@ class Autosubmit: group = subparser.add_mutually_exclusive_group(required=True) group.add_argument('-o', '--offer', action="store_true", default=False, help='Offer experiment') group.add_argument('-p', '--pickup', action="store_true", default=False, help='Pick-up released experiment') + # Inspect + subparser = subparsers.add_parser('inspect', description="Generate all .cmd files") + subparser.add_argument('expid', help='experiment identifier') + subparser.add_argument('-nt', '--notransitive', action='store_true', default=False, help='Disable transitive reduction') # Check subparser = subparsers.add_parser('check', description="check configuration for specified experiment") @@ -395,6 +399,8 @@ class Autosubmit: args.expand, args.expand_status, args.notransitive) elif args.command == 'check': return Autosubmit.check(args.expid, args.notransitive) + elif args.command == 'inspect': + return Autosubmit.inspect(args.expid, args.notransitive) elif args.command == 'describe': return Autosubmit.describe(args.expid) elif args.command == 'migrate': @@ -426,6 +432,7 @@ class Autosubmit: return Autosubmit.archive(args.expid) elif args.command == 'unarchive': return Autosubmit.unarchive(args.expid) + elif args.command == 'readme': if os.path.isfile(Autosubmit.readme_path): with open(Autosubmit.readme_path) as f: @@ -631,6 +638,107 @@ class Autosubmit: platform.add_parameters(parameters, True) job_list.parameters = parameters + @staticmethod + def inspect(expid, notransitive=False): + """ + Generates cmd files experiment. + + :type expid: str + :param expid: identifier of experiment to be run + :return: True if run to the end, False otherwise + :rtype: bool + """ + if expid is None: + Log.critical("Missing experiment id") + + BasicConfig.read() + exp_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid) + tmp_path = os.path.join(exp_path, BasicConfig.LOCAL_TMP_DIR) + if not os.path.exists(exp_path): + Log.critical("The directory %s is needed and does not exist" % exp_path) + Log.warning("Does an experiment with the given id exist?") + return 1 + Log.info("Starting inspect command") + Log.set_file(os.path.join(tmp_path, 'generate.log')) + os.system('clear') + signal.signal(signal.SIGINT, signal_handler) + as_conf = AutosubmitConfig(expid, BasicConfig, ConfigParserFactory()) + if not as_conf.check_conf_files(): + Log.critical('Can not generate scripts with invalid configuration') + return False + project_type = as_conf.get_project_type() + if project_type != "none": + # Check proj configuration + as_conf.check_proj() + + hpcarch = as_conf.get_platform() + + safetysleeptime = as_conf.get_safetysleeptime() + + + submitter = Autosubmit._get_submitter(as_conf) + submitter.load_platforms(as_conf) + + Log.debug("The Experiment name is: {0}", expid) + Log.debug("Sleep: {0}", safetysleeptime) + + + + job_list = Autosubmit.load_job_list(expid, as_conf, notransitive=notransitive) + + Log.debug("Length of the jobs list: {0}", len(job_list)) + + Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) + + # check the job list script creation + Log.debug("Checking experiment templates...") + + platforms_to_test = set() + for job in job_list.get_job_list(): + if job.platform_name is None: + job.platform_name = hpcarch + # noinspection PyTypeChecker + job.platform = submitter.platforms[job.platform_name.lower()] + # noinspection PyTypeChecker + platforms_to_test.add(job.platform) + + job_list.check_scripts(as_conf) + + packages_persistence = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), + "job_packages_" + expid) + + if as_conf.get_wrapper_type() != 'none': + packages = packages_persistence.load() + for (exp_id, package_name, job_name) in packages: + if package_name not in job_list.packages_dict: + job_list.packages_dict[package_name] = [] + job_list.packages_dict[package_name].append(job_list.get_job_by_name(job_name)) + + for package_name, jobs in job_list.packages_dict.items(): + from job.job import WrapperJob + wrapper_job = WrapperJob(package_name, jobs[0].id, Status.SUBMITTED, 0, jobs, + None, + None, jobs[0].platform, as_conf) + job_list.job_package_map[jobs[0].id] = wrapper_job + + + as_conf.reload() + Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) + + # variables to be updated on the fly + safetysleeptime = as_conf.get_safetysleeptime() + Log.debug("Sleep: {0}", safetysleeptime) + #Generate + Log.info("Starting to generate cmd scripts") + for platform in platforms_to_test: + packages_to_submit, remote_dependencies_dict = JobPackager(as_conf, platform, job_list).build_packages() + for package in packages_to_submit: + package.submit(as_conf, job_list.parameters,True) + Log.info("no more scripts to generate, now proceed to check them manually") + time.sleep(safetysleeptime) + return True + + @staticmethod def run_experiment(expid, notransitive=False): @@ -691,6 +799,7 @@ class Autosubmit: Log.debug("The Experiment name is: {0}", expid) Log.debug("Sleep: {0}", safetysleeptime) Log.debug("Default retrials: {0}", retrials) + Log.info("Starting job submission...") pkl_dir = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, 'pkl') @@ -822,6 +931,7 @@ class Autosubmit: if Autosubmit.submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence): job_list.save() + if Autosubmit.exit: prev_status = job.status if prev_status != job.update_status(platform.check_job(job.id), @@ -898,6 +1008,7 @@ class Autosubmit: raise return save + @staticmethod def monitor(expid, file_format, lst, filter_chunks, filter_status, filter_section, hide, txt_only=False, group_by=None, expand=list(), expand_status=list(), hide_groups=False, notransitive=False): diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 4681beb99..6aba1e3ee 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -75,7 +75,7 @@ class JobPackageBase(object): """ return self._platform - def submit(self, configuration, parameters): + def submit(self, configuration, parameters,only_generate=False): for job in self.jobs: if job.check.lower() == Job.CHECK_ON_SUBMISSION: if not job.check_script(configuration, parameters): @@ -83,9 +83,13 @@ class JobPackageBase(object): job.update_parameters(configuration, parameters) # looking for directives on jobs self._custom_directives = self._custom_directives | set(job.custom_directives) - self._create_scripts(configuration) - self._send_files() - self._do_submission() + if only_generate: + self._create_scripts(configuration) + #self._send_files() + else: + self._create_scripts(configuration) + self._send_files() + self._do_submission() def _create_scripts(self, configuration): raise Exception('Not implemented') -- GitLab From 3cc5da82ee920150aba8e60831a239794fe81936 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 19 Dec 2018 11:06:34 +0100 Subject: [PATCH 2/4] autosubmit inspect created, #239 --- autosubmit/autosubmit.py | 8 ++++---- autosubmit/job/job_list.py | 11 +++++++++++ autosubmit/job/job_packager.py | 13 +++++++++---- 3 files changed, 24 insertions(+), 8 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index d0b6269ac..d1a34bbaf 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -730,10 +730,10 @@ class Autosubmit: Log.debug("Sleep: {0}", safetysleeptime) #Generate Log.info("Starting to generate cmd scripts") - for platform in platforms_to_test: - packages_to_submit, remote_dependencies_dict = JobPackager(as_conf, platform, job_list).build_packages() - for package in packages_to_submit: - package.submit(as_conf, job_list.parameters,True) + + packages_to_submit, remote_dependencies_dict = JobPackager(as_conf, platforms_to_test.pop(), job_list).build_packages(True) + for package in packages_to_submit: + package.submit(as_conf, job_list.parameters,True) Log.info("no more scripts to generate, now proceed to check them manually") time.sleep(safetysleeptime) return True diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 11d14a9f8..a4b52a17d 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -557,6 +557,17 @@ class JobList: return [job for job in self._job_list if (platform is None or job.platform is platform) and job.status == Status.FAILED] + def get_all(self, platform=None): + """ + Returns a list of all jobs + + :param platform: job platform + :type platform: HPCPlatform + :return: ready jobs + :rtype: list + """ + return [job for job in self._job_list] + def get_ready(self, platform=None): """ Returns a list of ready jobs diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index f32d05406..7f0caf22c 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -50,7 +50,7 @@ class JobPackager(object): if len(jobs_list.get_ready(platform)) > 0: Log.info("Jobs ready for {0}: {1}", self._platform.name, len(jobs_list.get_ready(platform))) - def build_packages(self): + def build_packages(self,only_generate=False): """ Returns the list of the built packages to be submitted @@ -59,8 +59,10 @@ class JobPackager(object): """ packages_to_submit = list() remote_dependencies_dict = dict() - - jobs_ready = self._jobs_list.get_ready(self._platform) + if only_generate: + jobs_ready = self._jobs_list.get_all() + else: + jobs_ready = self._jobs_list.get_ready(self._platform) if jobs_ready == 0: return packages_to_submit, remote_dependencies_dict if not (self._max_wait_jobs_to_submit > 0 and self._max_jobs_to_submit > 0): @@ -69,7 +71,10 @@ class JobPackager(object): available_sorted = sorted(jobs_ready, key=lambda k: k.long_name.split('_')[1][:6]) list_of_available = sorted(available_sorted, key=lambda k: k.priority, reverse=True) num_jobs_to_submit = min(self._max_wait_jobs_to_submit, len(jobs_ready), self._max_jobs_to_submit) - jobs_to_submit = list_of_available[0:num_jobs_to_submit] + if only_generate: + jobs_to_submit = jobs_ready + else: + jobs_to_submit = list_of_available[0:num_jobs_to_submit] jobs_to_submit_by_section = self._divide_list_by_section(jobs_to_submit) -- GitLab From f6510d1bf61c3a47e4a553282defc6de49942f77 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 9 Jan 2019 16:43:48 +0100 Subject: [PATCH 3/4] Implemented point 1 and 2 while 3 is on the way of miguel commentary, #239 --- autosubmit/autosubmit.py | 48 +++++++++++++++++++++++++++++----- autosubmit/job/job_list.py | 16 +++++++++--- autosubmit/job/job_packager.py | 13 +++++---- 3 files changed, 60 insertions(+), 17 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index d1a34bbaf..c140ac890 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -179,6 +179,7 @@ class Autosubmit: 'LIST = "[ 19601101 [ fc0 [1 2 3 4] fc1 [1] ] 19651101 [ fc0 [16-30] ] ]"') subparser.add_argument('-expand_status', type=str, help='Select the statuses to be expanded') subparser.add_argument('--hide_groups', action='store_true', default=False, help='Hides the groups from the plot') + group.add_argument('-fs', '--filter_status', type=str, choices=('Any', 'READY', 'COMPLETED', 'WAITING', 'SUSPENDED', 'FAILED', 'UNKNOWN'), help='Select the original status to filter the list of jobs') @@ -250,6 +251,30 @@ class Autosubmit: subparser = subparsers.add_parser('inspect', description="Generate all .cmd files") subparser.add_argument('expid', help='experiment identifier') subparser.add_argument('-nt', '--notransitive', action='store_true', default=False, help='Disable transitive reduction') + subparser.add_argument('-f', '--force', action="store_true",help='Overwrite all cmd') + subparser.add_argument('-group_by', choices=('date', 'member', 'chunk', 'split', 'automatic'), default=None, + help='Groups the jobs automatically or by date, member, chunk or split') + subparser.add_argument('-expand', type=str, + help='Supply the list of dates/members/chunks to filter the list of jobs. Default = "Any". ' + 'LIST = "[ 19601101 [ fc0 [1 2 3 4] fc1 [1] ] 19651101 [ fc0 [16-30] ] ]"') + subparser.add_argument('-expand_status', type=str, help='Select the statuses to be expanded') + + group.add_argument('-fs', '--filter_status', type=str, + choices=('Any', 'READY', 'COMPLETED', 'WAITING', 'SUSPENDED', 'FAILED', 'UNKNOWN'), + help='Select the original status to filter the list of jobs') + group = subparser.add_mutually_exclusive_group(required=False) + group.add_argument('-fl', '--list', type=str, + help='Supply the list of job names to be filtered. Default = "Any". ' + 'LIST = "b037_20101101_fc3_21_sim b037_20111101_fc4_26_sim"') + group.add_argument('-fc', '--filter_chunks', type=str, + help='Supply the list of chunks to filter the list of jobs. Default = "Any". ' + 'LIST = "[ 19601101 [ fc0 [1 2 3 4] fc1 [1] ] 19651101 [ fc0 [16-30] ] ]"') + group.add_argument('-fs', '--filter_status', type=str, + choices=('Any', 'READY', 'COMPLETED', 'WAITING', 'SUSPENDED', 'FAILED', 'UNKNOWN'), + help='Select the original status to filter the list of jobs') + group.add_argument('-ft', '--filter_type', type=str, + help='Select the job type to filter the list of jobs') + # Check subparser = subparsers.add_parser('check', description="check configuration for specified experiment") @@ -400,7 +425,9 @@ class Autosubmit: elif args.command == 'check': return Autosubmit.check(args.expid, args.notransitive) elif args.command == 'inspect': - return Autosubmit.inspect(args.expid, args.notransitive) + return Autosubmit.inspect(args.expid, args.list, args.filter_chunks, args.filter_status, + args.filter_type, args.group_by, args.expand, + args.expand_status,args.notransitive , args.force) elif args.command == 'describe': return Autosubmit.describe(args.expid) elif args.command == 'migrate': @@ -639,7 +666,7 @@ class Autosubmit: job_list.parameters = parameters @staticmethod - def inspect(expid, notransitive=False): + def inspect(expid, lst, filter_chunks, filter_status, filter_section, group_by=None, expand=list(), expand_status=list() , notransitive=False, force=False ): """ Generates cmd files experiment. @@ -648,12 +675,18 @@ class Autosubmit: :return: True if run to the end, False otherwise :rtype: bool """ + if expid is None: Log.critical("Missing experiment id") BasicConfig.read() exp_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid) tmp_path = os.path.join(exp_path, BasicConfig.LOCAL_TMP_DIR) + if os.path.exists(os.path.join(tmp_path, 'autosubmit.lock')): + locked=True + else: + locked=False + if not os.path.exists(exp_path): Log.critical("The directory %s is needed and does not exist" % exp_path) Log.warning("Does an experiment with the given id exist?") @@ -682,14 +715,10 @@ class Autosubmit: Log.debug("The Experiment name is: {0}", expid) Log.debug("Sleep: {0}", safetysleeptime) - - job_list = Autosubmit.load_job_list(expid, as_conf, notransitive=notransitive) - Log.debug("Length of the jobs list: {0}", len(job_list)) Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) - # check the job list script creation Log.debug("Checking experiment templates...") @@ -730,8 +759,13 @@ class Autosubmit: Log.debug("Sleep: {0}", safetysleeptime) #Generate Log.info("Starting to generate cmd scripts") + if force or not locked: + Log.info("Overwritting all cmd scripts") + packages_to_submit, remote_dependencies_dict = JobPackager(as_conf, platforms_to_test.pop(), job_list).build_packages(True,True) + else: + Log.info("Generating unsubmitted cmd scripts") + packages_to_submit, remote_dependencies_dict = JobPackager(as_conf, platforms_to_test.pop(),job_list).build_packages(True,False) - packages_to_submit, remote_dependencies_dict = JobPackager(as_conf, platforms_to_test.pop(), job_list).build_packages(True) for package in packages_to_submit: package.submit(as_conf, job_list.parameters,True) Log.info("no more scripts to generate, now proceed to check them manually") diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index a4b52a17d..690618180 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -557,17 +557,27 @@ class JobList: return [job for job in self._job_list if (platform is None or job.platform is platform) and job.status == Status.FAILED] + def get_unsubmitted(self, platform=None): + """ + Returns a list of unsummited jobs + + :param platform: job platform + :type platform: HPCPlatform + :return: all jobs + :rtype: list + """ + return [job for job in self._job_list if (platform is None or job.platform is platform) and + ( job.status != Status.SUBMITTED and job.status != Status.QUEUING and job.status == Status.RUNNING and job.status == Status.COMPLETED ) ] def get_all(self, platform=None): """ Returns a list of all jobs :param platform: job platform :type platform: HPCPlatform - :return: ready jobs + :return: all jobs :rtype: list """ return [job for job in self._job_list] - def get_ready(self, platform=None): """ Returns a list of ready jobs @@ -646,7 +656,7 @@ class JobList: :param platform: job platform :type platform: HPCPlatform - :return: finsihed jobs + :return: finished jobs :rtype: list """ return self.get_completed(platform) + self.get_failed(platform) diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 7f0caf22c..b3bf1f353 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -50,7 +50,7 @@ class JobPackager(object): if len(jobs_list.get_ready(platform)) > 0: Log.info("Jobs ready for {0}: {1}", self._platform.name, len(jobs_list.get_ready(platform))) - def build_packages(self,only_generate=False): + def build_packages(self,only_generate=False,force=False): """ Returns the list of the built packages to be submitted @@ -59,10 +59,7 @@ class JobPackager(object): """ packages_to_submit = list() remote_dependencies_dict = dict() - if only_generate: - jobs_ready = self._jobs_list.get_all() - else: - jobs_ready = self._jobs_list.get_ready(self._platform) + jobs_ready = self._jobs_list.get_ready(self._platform) if jobs_ready == 0: return packages_to_submit, remote_dependencies_dict if not (self._max_wait_jobs_to_submit > 0 and self._max_jobs_to_submit > 0): @@ -71,8 +68,10 @@ class JobPackager(object): available_sorted = sorted(jobs_ready, key=lambda k: k.long_name.split('_')[1][:6]) list_of_available = sorted(available_sorted, key=lambda k: k.priority, reverse=True) num_jobs_to_submit = min(self._max_wait_jobs_to_submit, len(jobs_ready), self._max_jobs_to_submit) - if only_generate: - jobs_to_submit = jobs_ready + if only_generate and force: + jobs_to_submit = self._jobs_list.get_all() + elif only_generate and not force: + jobs_to_submit = self._jobs_list.get_unsubmitted() else: jobs_to_submit = list_of_available[0:num_jobs_to_submit] -- GitLab From 8fa955c0efb5b1d0a03266646713a4291f729bf2 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Fri, 11 Jan 2019 16:56:24 +0100 Subject: [PATCH 4/4] Added filter capacity, #239 --- autosubmit/autosubmit.py | 110 +++++++++++++++++++++++++++------ autosubmit/job/job_packager.py | 27 ++++---- autosubmit/job/job_packages.py | 1 + 3 files changed, 105 insertions(+), 33 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index c140ac890..d270a8235 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -252,13 +252,6 @@ class Autosubmit: subparser.add_argument('expid', help='experiment identifier') subparser.add_argument('-nt', '--notransitive', action='store_true', default=False, help='Disable transitive reduction') subparser.add_argument('-f', '--force', action="store_true",help='Overwrite all cmd') - subparser.add_argument('-group_by', choices=('date', 'member', 'chunk', 'split', 'automatic'), default=None, - help='Groups the jobs automatically or by date, member, chunk or split') - subparser.add_argument('-expand', type=str, - help='Supply the list of dates/members/chunks to filter the list of jobs. Default = "Any". ' - 'LIST = "[ 19601101 [ fc0 [1 2 3 4] fc1 [1] ] 19651101 [ fc0 [16-30] ] ]"') - subparser.add_argument('-expand_status', type=str, help='Select the statuses to be expanded') - group.add_argument('-fs', '--filter_status', type=str, choices=('Any', 'READY', 'COMPLETED', 'WAITING', 'SUSPENDED', 'FAILED', 'UNKNOWN'), help='Select the original status to filter the list of jobs') @@ -425,9 +418,8 @@ class Autosubmit: elif args.command == 'check': return Autosubmit.check(args.expid, args.notransitive) elif args.command == 'inspect': - return Autosubmit.inspect(args.expid, args.list, args.filter_chunks, args.filter_status, - args.filter_type, args.group_by, args.expand, - args.expand_status,args.notransitive , args.force) + return Autosubmit.inspect(args.expid, args.list, args.filter_chunks, args.filter_status, + args.filter_type,args.notransitive , args.force) elif args.command == 'describe': return Autosubmit.describe(args.expid) elif args.command == 'migrate': @@ -610,8 +602,15 @@ class Autosubmit: Log.debug("Creating temporal directory...") exp_id_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, exp_id) - os.mkdir(os.path.join(exp_id_path, "tmp")) - os.chmod(os.path.join(exp_id_path, "tmp"), 0o775) + tmp_path = os.path.join(exp_id_path, "tmp") + os.mkdir(tmp_path) + os.chmod(tmp_path, 0o775) + + Log.debug("Creating temporal remote directory...") + remote_tmp_path = os.path.join(tmp_path,"LOG_"+exp_id) + os.mkdir(remote_tmp_path) + os.chmod(remote_tmp_path, 0o775) + Log.debug("Creating pkl directory...") os.mkdir(os.path.join(exp_id_path, "pkl")) @@ -666,7 +665,7 @@ class Autosubmit: job_list.parameters = parameters @staticmethod - def inspect(expid, lst, filter_chunks, filter_status, filter_section, group_by=None, expand=list(), expand_status=list() , notransitive=False, force=False ): + def inspect(expid, lst, filter_chunks, filter_status, filter_section , notransitive=False, force=False ): """ Generates cmd files experiment. @@ -759,12 +758,85 @@ class Autosubmit: Log.debug("Sleep: {0}", safetysleeptime) #Generate Log.info("Starting to generate cmd scripts") - if force or not locked: - Log.info("Overwritting all cmd scripts") - packages_to_submit, remote_dependencies_dict = JobPackager(as_conf, platforms_to_test.pop(), job_list).build_packages(True,True) - else: - Log.info("Generating unsubmitted cmd scripts") - packages_to_submit, remote_dependencies_dict = JobPackager(as_conf, platforms_to_test.pop(),job_list).build_packages(True,False) + + if not isinstance(job_list, type([])): + jobs = [] + if (force and not locked) or (force and locked) : + Log.info("Overwritting all cmd scripts") + jobs = job_list.get_job_list() + elif locked: + Log.warning("There is a .lock file and not -f, generating only all unsubmitted cmd scripts") + jobs = job_list.get_unsubmitted() + else: + Log.info("Generating cmd scripts only for selected jobs") + + if filter_chunks: + fc = filter_chunks + Log.debug(fc) + + if fc == 'Any': + jobs = job_list.get_job_list() + else: + # noinspection PyTypeChecker + data = json.loads(Autosubmit._create_json(fc)) + for date_json in data['sds']: + date = date_json['sd'] + jobs_date = filter(lambda j: date2str(j.date) == date, job_list.get_job_list()) + + for member_json in date_json['ms']: + member = member_json['m'] + jobs_member = filter(lambda j: j.member == member, jobs_date) + + for chunk_json in member_json['cs']: + chunk = int(chunk_json) + jobs = jobs + [job for job in filter(lambda j: j.chunk == chunk, jobs_member)] + + elif filter_status: + Log.debug("Filtering jobs with status {0}", filter_status) + if filter_status == 'Any': + jobs = job_list.get_job_list() + else: + fs = Autosubmit._get_status(filter_status) + jobs = [job for job in filter(lambda j: j.status == fs, job_list.get_job_list())] + + elif filter_section: + ft = filter_section + Log.debug(ft) + + if ft == 'Any': + jobs = job_list.get_job_list() + else: + for job in job_list.get_job_list(): + if job.section == ft: + jobs.append(job) + + elif lst: + jobs_lst = lst.split() + + if jobs == 'Any': + jobs = job_list.get_job_list() + else: + for job in job_list.get_job_list(): + if job.name in jobs_lst: + jobs.append(job) + else: + jobs = job_list.get_job_list() + + referenced_jobs_to_remove = set() + for job in jobs: + for child in job.children: + if child not in jobs: + referenced_jobs_to_remove.add(child) + for parent in job.parents: + if parent not in jobs: + referenced_jobs_to_remove.add(parent) + + for job in jobs: + job.children = job.children - referenced_jobs_to_remove + job.parents = job.parents - referenced_jobs_to_remove + + + packages_to_submit, remote_dependencies_dict = JobPackager(as_conf, platforms_to_test.pop(), job_list).build_packages(True,jobs) for package in packages_to_submit: package.submit(as_conf, job_list.parameters,True) diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index b3bf1f353..83ad5177e 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -50,7 +50,7 @@ class JobPackager(object): if len(jobs_list.get_ready(platform)) > 0: Log.info("Jobs ready for {0}: {1}", self._platform.name, len(jobs_list.get_ready(platform))) - def build_packages(self,only_generate=False,force=False): + def build_packages(self,only_generate=False, jobs_filtered=[]): """ Returns the list of the built packages to be submitted @@ -59,20 +59,19 @@ class JobPackager(object): """ packages_to_submit = list() remote_dependencies_dict = dict() - jobs_ready = self._jobs_list.get_ready(self._platform) - if jobs_ready == 0: - return packages_to_submit, remote_dependencies_dict - if not (self._max_wait_jobs_to_submit > 0 and self._max_jobs_to_submit > 0): - return packages_to_submit, remote_dependencies_dict - - available_sorted = sorted(jobs_ready, key=lambda k: k.long_name.split('_')[1][:6]) - list_of_available = sorted(available_sorted, key=lambda k: k.priority, reverse=True) - num_jobs_to_submit = min(self._max_wait_jobs_to_submit, len(jobs_ready), self._max_jobs_to_submit) - if only_generate and force: - jobs_to_submit = self._jobs_list.get_all() - elif only_generate and not force: - jobs_to_submit = self._jobs_list.get_unsubmitted() + + if only_generate: + jobs_to_submit = jobs_filtered else: + jobs_ready = self._jobs_list.get_ready(self._platform) + if jobs_ready == 0: + return packages_to_submit, remote_dependencies_dict + if not (self._max_wait_jobs_to_submit > 0 and self._max_jobs_to_submit > 0): + return packages_to_submit, remote_dependencies_dict + + available_sorted = sorted(jobs_ready, key=lambda k: k.long_name.split('_')[1][:6]) + list_of_available = sorted(available_sorted, key=lambda k: k.priority, reverse=True) + num_jobs_to_submit = min(self._max_wait_jobs_to_submit, len(jobs_ready), self._max_jobs_to_submit) jobs_to_submit = list_of_available[0:num_jobs_to_submit] jobs_to_submit_by_section = self._divide_list_by_section(jobs_to_submit) diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 6aba1e3ee..7ea66870e 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -91,6 +91,7 @@ class JobPackageBase(object): self._send_files() self._do_submission() + def _create_scripts(self, configuration): raise Exception('Not implemented') -- GitLab