diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index dec9660ac150a2cbb65a1235429ce1371388c846..bb7ea8fd9a6d65ac7daf4d3a6607e7753abc31ee 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -2129,7 +2129,7 @@ class Autosubmit: Log.debug("Preparing run") # This function is called only once, when the experiment is started. It is used to initialize the experiment and to check the correctness of the configuration files. # If there are issues while running, this function will be called again to reinitialize the experiment. - job_list, submitter , exp_history, host , as_conf, platforms_to_test, packages_persistence, _ = Autosubmit.prepare_run(expid, notransitive,start_time, start_after, run_only_members) + job_list, submitter , exp_history, host , as_conf, platforms_to_test, packages_persistence, _ = Autosubmit.prepare_run(expid, notransitive, start_time, start_after, run_only_members) except AutosubmitCritical as e: #e.message += " HINT: check the CUSTOM_DIRECTIVE syntax in your jobs configuration files." raise AutosubmitCritical(e.message, 7014, e.trace) @@ -4825,8 +4825,14 @@ class Autosubmit: job_list.add_logs(prev_job_list_logs) job_list.save() as_conf.save() - JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), - "job_packages_" + expid).reset_table() + try: + packages_persistence = JobPackagePersistence( + os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) + packages_persistence.reset_table() + packages_persistence.reset_table(True) + except: + pass + groups_dict = dict() # Setting up job historical database header. Must create a new run. @@ -4863,20 +4869,15 @@ class Autosubmit: expand_list=expand, expanded_status=status) groups_dict = job_grouping.group_jobs() # WRAPPERS - if len(as_conf.experiment_data.get("WRAPPERS", {})) > 0 and check_wrappers: - as_conf.check_conf_files(True) - packages_persistence = JobPackagePersistence( - os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) - packages_persistence.reset_table(True) job_list_wr = Autosubmit.load_job_list( expid, as_conf, notransitive=notransitive, monitor=True, new=False) Autosubmit.generate_scripts_andor_wrappers( as_conf, job_list_wr, job_list_wr.get_job_list(), packages_persistence, True) - packages = packages_persistence.load(True) else: packages = None + Log.info("\nPlotting the jobs list...") monitor_exp = Monitor() # if output is set, use output @@ -4909,15 +4910,9 @@ class Autosubmit: message = "We have detected that there is another Autosubmit instance using the experiment\n. Stop other Autosubmit instances that are using the experiment or delete autosubmit.lock file located on tmp folder" raise AutosubmitCritical(message, 7000) except AutosubmitError as e: - # TODO: == "" or is None? - if e.trace == "": - e.trace = traceback.format_exc() - raise AutosubmitError(e.message, e.code, e.trace) + raise except AutosubmitCritical as e: - # TODO: == "" or is None? - if e.trace == "" or not e.trace: - e.trace = traceback.format_exc() - raise AutosubmitCritical(e.message, e.code, e.trace) + raise except BaseException as e: raise finally: @@ -5602,6 +5597,9 @@ class Autosubmit: final_list = list(set(final_list)) performed_changes = {} for job in final_list: + if final_status in [Status.WAITING, Status.PREPARED, Status.DELAYED, Status.READY]: + job.packed = False + job.fail_count = 0 if job.status in [Status.QUEUING, Status.RUNNING, Status.SUBMITTED] and job.platform.name not in definitive_platforms: Log.printlog("JOB: [{1}] is ignored as the [{0}] platform is currently offline".format( diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 7974f0e4a72361ab0c3acf6d1b406c0ab5c4e6cd..21417ff8b1fb1f856809e8ad3f024688613735b8 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -252,6 +252,7 @@ class Job(object): self.start_time_written = False self.submit_time_timestamp = None # for wrappers, all jobs inside a wrapper are submitted at the same time self.finish_time_timestamp = None # for wrappers, with inner_retrials, the submission time should be the last finish_time of the previous retrial + self._script = None # Inline code to be executed def _init_runtime_parameters(self): # hetjobs self.het = {'HETSIZE': 0} @@ -288,6 +289,15 @@ class Job(object): def name(self, value): self._name = value + @property + @autosubmit_parameter(name='script') + def script(self): + """Allows to launch inline code instead of using the file parameter""" + return self._script + @script.setter + def script(self, value): + self._script = value + @property @autosubmit_parameter(name='fail_count') def fail_count(self): @@ -1393,25 +1403,23 @@ class Job(object): # Get the max tasks, each element can be a str or int self.het['TASKS'] = list() if len(self.tasks) == 1: - if int(self.tasks) <= 1 and int(job_platform.processors_per_node) > 1 and int( - self.processors) > int(job_platform.processors_per_node): + if int(job_platform.processors_per_node) > 1 and int(self.tasks) > int(job_platform.processors_per_node): self.tasks = job_platform.processors_per_node for task in range(self.het['HETSIZE']): - if int(self.tasks) <= 1 < int(job_platform.processors_per_node) and int( - self.processors) > int(job_platform.processors_per_node): + if int(job_platform.processors_per_node) > 1 and int(task) > int( + job_platform.processors_per_node): self.het['TASKS'].append(str(job_platform.processors_per_node)) else: self.het['TASKS'].append(str(self.tasks)) self.tasks = str(max([int(x) for x in self.tasks])) else: for task in self.tasks: - if int(task) <= 1 < int(job_platform.processors_per_node) and int( - self.processors) > int(job_platform.processors_per_node): + if int(job_platform.processors_per_node) > 1 and int(task) > int( + job_platform.processors_per_node): task = job_platform.processors_per_node self.het['TASKS'].append(str(task)) else: - if int(self.tasks) <= 1 < int(job_platform.processors_per_node) and int( - self.processors) > int(job_platform.processors_per_node): + if job_platform.processors_per_node and int(job_platform.processors_per_node) > 1 and int(self.tasks) > int(job_platform.processors_per_node): self.tasks = job_platform.processors_per_node self.tasks = str(self.tasks) @@ -1838,6 +1846,7 @@ class Job(object): self.check = as_conf.jobs_data[self.section].get("CHECK", False) self.check_warnings = as_conf.jobs_data[self.section].get("CHECK_WARNINGS", False) self.shape = as_conf.jobs_data[self.section].get("SHAPE", "") + self.script = as_conf.jobs_data[self.section].get("SCRIPT", "") if self.checkpoint: # To activate placeholder sustitution per in the template parameters["AS_CHECKPOINT"] = self.checkpoint parameters['JOBNAME'] = self.name @@ -1960,34 +1969,38 @@ class Job(object): :rtype: str """ self.update_parameters(as_conf, self.parameters) - try: - if as_conf.get_project_type().lower() != "none" and len(as_conf.get_project_type()) > 0: - template_file = open(os.path.join(as_conf.get_project_dir(), self.file), 'r') - template = '' - if as_conf.get_remote_dependencies() == "true": + if self.script and self.file: + Log.warning(f"Custom script for job {self.name} is being used, file contents are ignored.") + template = self.script + else: + try: + if as_conf.get_project_type().lower() != "none" and len(as_conf.get_project_type()) > 0: + template_file = open(os.path.join(as_conf.get_project_dir(), self.file), 'r') + template = '' + if as_conf.get_remote_dependencies() == "true": + if self.type == Type.BASH: + template = 'sleep 5' + "\n" + elif self.type == Type.PYTHON2: + template = 'time.sleep(5)' + "\n" + elif self.type == Type.PYTHON3 or self.type == Type.PYTHON: + template = 'time.sleep(5)' + "\n" + elif self.type == Type.R: + template = 'Sys.sleep(5)' + "\n" + template += template_file.read() + template_file.close() + else: if self.type == Type.BASH: - template = 'sleep 5' + "\n" + template = 'sleep 5' elif self.type == Type.PYTHON2: template = 'time.sleep(5)' + "\n" elif self.type == Type.PYTHON3 or self.type == Type.PYTHON: template = 'time.sleep(5)' + "\n" elif self.type == Type.R: - template = 'Sys.sleep(5)' + "\n" - template += template_file.read() - template_file.close() - else: - if self.type == Type.BASH: - template = 'sleep 5' - elif self.type == Type.PYTHON2: - template = 'time.sleep(5)' + "\n" - elif self.type == Type.PYTHON3 or self.type == Type.PYTHON: - template = 'time.sleep(5)' + "\n" - elif self.type == Type.R: - template = 'Sys.sleep(5)' - else: - template = '' - except Exception as e: - template = '' + template = 'Sys.sleep(5)' + else: + template = '' + except Exception as e: + template = '' if self.type == Type.BASH: snippet = StatisticsSnippetBash diff --git a/autosubmit/job/job_common.py b/autosubmit/job/job_common.py index 27414223f360b4486463e7b4c6a0760b40fb2321..92dc1b1bd7e8dc05b0ca4cd986ae244dd96a2a5f 100644 --- a/autosubmit/job/job_common.py +++ b/autosubmit/job/job_common.py @@ -200,7 +200,7 @@ class StatisticsSnippetPython: locale.setlocale(locale.LC_ALL, 'C') job_name_ptrn = '%CURRENT_LOGDIR%/%JOBNAME%' stat_file = open(job_name_ptrn + '_STAT', 'w') - stat_file.write('int({0:.0f})\\n'.format(time.time())) + stat_file.write(f'{int(time.time())}\\n') stat_file.close() ################### # Autosubmit Checkpoint @@ -229,7 +229,7 @@ class StatisticsSnippetPython: ################### stat_file = open(job_name_ptrn + '_STAT', 'a') - stat_file.write('int({0:.0f})\\n'.format(time.time())) + stat_file.write(f'{int(time.time())}\\n') stat_file.close() open(job_name_ptrn + '_COMPLETED', 'a').close() exit(0) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 81851902b7bd1fe3535d46e068f2782bba87fe9e..163689316fff64d4d48389874a50df9be66400b5 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -2664,6 +2664,10 @@ class JobList(object): job.status = Status.FAILED job.packed = False save = True + else: + for job in [ job for job in self._job_list if job.status in [ Status.WAITING, Status.READY, Status.DELAYED, Status.PREPARED ] ]: + job.fail_count = 0 + job.packed = False # Check checkpoint jobs, the status can be Any for job in self.check_special_status(): job.status = Status.READY diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index a4ae2547fec1ac87c61ce26f02a5d64a856540af..704333d507debd93200f1cda6f8bc7e811d8ad4e 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -337,6 +337,7 @@ class JobPackager(object): Check if the packages are ready to be built :return: List of jobs ready to be built, boolean indicating if packages can't be built for other reasons ( max_total_jobs...) """ + Log.info("Calculating size limits for {0}".format(self._platform.name)) jobs_ready = list() if len(self._jobs_list.jobs_to_run_first) > 0: jobs_ready = [job for job in self._jobs_list.jobs_to_run_first if @@ -465,6 +466,7 @@ class JobPackager(object): any_simple_packages = len(non_wrapped_jobs) > 0 # Prepare packages for wrapped jobs for wrapper_name, jobs in jobs_to_wrap.items(): + Log.info(f"Building packages for {wrapper_name}") if max_jobs_to_submit == 0: break self.current_wrapper_section = wrapper_name @@ -546,13 +548,14 @@ class JobPackager(object): sections_split[wrapper_name] = section_name jobs_by_section[wrapper_name] = list() + if self.jobs_in_wrapper: + Log.info(f"Calculating wrapper packages") jobs_by_section["SIMPLE"] = [] for wrapper_name,section_name in sections_split.items(): for job in jobs_list[:]: if job.section.upper() in section_name.split("&"): jobs_by_section[wrapper_name].append(job) jobs_list.remove(job) - # jobs not in wrapper for job in (job for job in jobs_list): jobs_by_section["SIMPLE"].append(job) for wrappers in list(jobs_by_section.keys()): @@ -707,37 +710,69 @@ class JobPackagerVertical(object): self.max_wallclock = max_wallclock self.wrapper_info = wrapper_info - def build_vertical_package(self, job, level=1): + def build_vertical_package(self, job): """ Goes through the job and all the related jobs (children, or part of the same date member ordered group), finds those suitable - and groups them together into a wrapper. + and groups them together into a wrapper. (iterative-version) - :param level: - :param job: Job to be wrapped. \n - :type job: Job Object \n - :return: List of jobs that are wrapped together. \n - :rtype: List() of Job Object \n + :param job: Job to be wrapped. + :type job: Job Object + :return: List of jobs that are wrapped together. + :rtype: List() of Job Object """ - # self.jobs_list starts as only 1 member, but wrapped jobs are added in the recursion - if len(self.jobs_list) >= self.wrapper_limits["max_v"] or len(self.jobs_list) >= self.wrapper_limits["max_by_section"][job.section] or len(self.jobs_list) >= self.wrapper_limits["max"]: - return self.jobs_list - child = self.get_wrappable_child(job) - # If not None, it is wrappable - if child is not None and len(str(child)) > 0: - child.update_parameters(self.wrapper_info[-1],{}) - # Calculate total wallclock per possible wrapper - self.total_wallclock = sum_str_hours( - self.total_wallclock, child.wallclock) - # Testing against max from platform - if self.total_wallclock <= self.max_wallclock: - # Marking, this is later tested in the main loop - child.packed = True - child.level = level - self.jobs_list.append(child) - # Recursive call - return self.build_vertical_package(child, level=level + 1) - # Wrapped jobs are accumulated and returned in this list + stack = [(job, 1)] + while stack: + job, level = stack.pop() + if level % 100 == 0: + Log.info(f"Wrapper package creation is still ongoing. So far {level} jobs have been wrapped.") + if len(self.jobs_list) >= self.wrapper_limits["max_v"] or len(self.jobs_list) >= \ + self.wrapper_limits["max_by_section"][job.section] or len(self.jobs_list) >= self.wrapper_limits[ + "max"]: + continue + child = self.get_wrappable_child(job) + if child is not None and len(str(child)) > 0: + child.update_parameters(self.wrapper_info[-1], {}) + self.total_wallclock = sum_str_hours(self.total_wallclock, child.wallclock) + if self.total_wallclock <= self.max_wallclock: + child.packed = True + child.level = level + self.jobs_list.append(child) + stack.append((child, level + 1)) return self.jobs_list + # def build_vertical_package(self, job, level=1): + # """ + # Goes through the job and all the related jobs (children, or part of the same date member ordered group), finds those suitable + # and groups them together into a wrapper. + # + # :param level: + # :param job: Job to be wrapped. \n + # :type job: Job Object \n + # :return: List of jobs that are wrapped together. \n + # :rtype: List() of Job Object \n + # """ + # # print log each 100 jobs + # if level % 100 == 0: + # Log.info(f"Wrapper package creation is still ongoing. So far {level} jobs have been wrapped.") + # # self.jobs_list starts as only 1 member, but wrapped jobs are added in the recursion + # if len(self.jobs_list) >= self.wrapper_limits["max_v"] or len(self.jobs_list) >= self.wrapper_limits["max_by_section"][job.section] or len(self.jobs_list) >= self.wrapper_limits["max"]: + # return self.jobs_list + # child = self.get_wrappable_child(job) + # # If not None, it is wrappable + # if child is not None and len(str(child)) > 0: + # child.update_parameters(self.wrapper_info[-1],{}) + # # Calculate total wallclock per possible wrapper + # self.total_wallclock = sum_str_hours( + # self.total_wallclock, child.wallclock) + # # Testing against max from platform + # if self.total_wallclock <= self.max_wallclock: + # # Marking, this is later tested in the main loop + # child.packed = True + # child.level = level + # self.jobs_list.append(child) + # # Recursive call + # return self.build_vertical_package(child, level=level + 1) + # # Wrapped jobs are accumulated and returned in this list + # return self.jobs_list def get_wrappable_child(self, job): pass @@ -864,6 +899,7 @@ class JobPackagerHorizontal(object): self._sectionList = list() self._package_sections = dict() self.wrapper_info = [] + def build_horizontal_package(self, horizontal_vertical=False,wrapper_info=[]): self.wrapper_info = wrapper_info current_package = [] @@ -871,14 +907,19 @@ class JobPackagerHorizontal(object): if horizontal_vertical: self._current_processors = 0 jobs_by_section = dict() + Log.info(f"Updating inner job parameters") for job in self.job_list: job.update_parameters(self.wrapper_info[-1],{}) if job.section not in jobs_by_section: jobs_by_section[job.section] = list() jobs_by_section[job.section].append(job) + Log.info(f"Building horizontal package") + jobs_processed = 0 for section in jobs_by_section: current_package_by_section[section] = 0 for job in jobs_by_section[section]: + if jobs_processed % 100 == 0: + Log.info(f"Wrapper package creation is still ongoing. So far {jobs_processed} jobs have been wrapped.") if str(job.processors).isdigit() and str(job.nodes).isdigit() and int(job.nodes) > 1 and int(job.processors) <= 1: job.processors = 0 if job.total_processors == "": @@ -887,7 +928,7 @@ class JobPackagerHorizontal(object): job_total_processors = int(job.total_processors) if len(current_package) < self.wrapper_limits["max_h"] and len(current_package) < self.wrapper_limits["max"] and current_package_by_section[section] < self.wrapper_limits["max_by_section"][section]: if int(job.tasks) != 0 and int(job.tasks) != int(self.processors_node) and \ - int(job.tasks) < job_total_processors: + int(self.processors_node) < int(job_total_processors): nodes = int( ceil(job_total_processors / float(job.tasks))) total_processors = int(self.processors_node) * nodes @@ -896,12 +937,10 @@ class JobPackagerHorizontal(object): if (self._current_processors + total_processors) <= int(self.max_processors): current_package.append(job) self._current_processors += total_processors - else: - current_package = [job] - self._current_processors = total_processors - current_package_by_section[section] += 1 + current_package_by_section[section] += 1 else: break + jobs_processed += 1 self.create_components_dict() diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 0b677c9d7498332ceeee23e4ec9fe28155e1146b..85fc41ff5bc51ac677e941f7253027e2cb59c738 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -52,6 +52,7 @@ class JobPackageBase(object): def __init__(self, jobs): # type: (List[Job]) -> None + self.nodes = "" self._common_script = None self._jobs = jobs # type: List[Job] self._expid = jobs[0].expid # type: str @@ -59,6 +60,8 @@ class JobPackageBase(object): self.export = jobs[0].export self.x11 = jobs[0].x11 self.het = dict() + self._num_processors = '0' + self._threads = '0' try: self._tmp_path = jobs[0]._tmp_path self._platform = jobs[0]._platform @@ -290,6 +293,8 @@ class JobPackageArray(JobPackageBase): self._array_size_id = "[1-" + str(len(jobs)) + "]" self._wallclock = '00:00' self._num_processors = '0' + self._threads = '0' + for job in jobs: if job.wallclock > self._wallclock: self._wallclock = job.wallclock @@ -371,9 +376,7 @@ class JobPackageThread(JobPackageBase): # It is in charge of merging ( switch ) the wrapper info by checking if the value is defined by the user in the wrapper section, current wrapper section, job or platform in that order. # Some variables are calculated in futher functions, like num_processors and wallclock. # These variables can only be present in the wrapper itself - self.parameters = dict() - self.wallclock = '00:00' - if len(wrapper_info) > 0 : + if len(wrapper_info) > 0: self.wrapper_type = wrapper_info[0] self.wrapper_policy = wrapper_info[1] self.wrapper_method = wrapper_info[2] @@ -389,15 +392,27 @@ class JobPackageThread(JobPackageBase): # Seems like this one is not used at all in the class self._job_dependency = dependency self._common_script = None + self.executable = None + self._wallclock = '00:00' # depends on the type of wrapper - if not hasattr(self,"_num_processors"): - self._num_processors = '0' + self._jobs_resources = jobs_resources self._wrapper_factory = self.platform.wrapper self.current_wrapper_section = wrapper_section self.inner_retrials = 0 - # temporal hetjob code , to be upgraded in the future + if not hasattr(self,"_num_processors"): + self._num_processors = '0' + self.parameters = dict() + self.nodes = jobs[0].nodes if not self.nodes else self.nodes + self.queue = jobs[0].queue + self.parameters["CURRENT_QUEUE"] = self.queue + self.partition = jobs[0].partition + self.tasks = jobs[0].tasks + self.exclusive = jobs[0].exclusive + self.custom_directives = jobs[0].custom_directives + self.wallclock = '00:00' + self.reservation = jobs[0].reservation if configuration is not None: self.inner_retrials = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section, {}).get("RETRIALS",self.jobs[0].retrials) @@ -412,31 +427,17 @@ class JobPackageThread(JobPackageBase): if job.export.lower() not in "none" and len(job.export) > 0: self.export = job.export break - wr_executable = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section, {}).get( - "EXECUTABLE", None) - if wr_executable: - self.executable = wr_executable - else: - self.executable = jobs[0].executable + self.executable = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section, {}).get( + "EXECUTABLE", self.executable) if jobs[0].het.get("HETSIZE", 1) <= 1: wr_queue = configuration.get_wrapper_queue(configuration.experiment_data["WRAPPERS"][self.current_wrapper_section]) if wr_queue is not None and len(str(wr_queue)) > 0: self.queue = wr_queue self.parameters["CURRENT_QUEUE"] = wr_queue - else: - self.queue = jobs[0].queue - self.parameters["CURRENT_QUEUE"] = jobs[0].queue - wr_partition = configuration.get_wrapper_partition(configuration.experiment_data["WRAPPERS"][self.current_wrapper_section]) if wr_partition and len(str(wr_partition)) > 0: self.partition = wr_partition - else: - self.partition = jobs[0].partition - wr_exclusive = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("EXCLUSIVE",None) - if wr_exclusive is not None: - self.exclusive = wr_exclusive - else: - self.exclusive = jobs[0].exclusive + self.exclusive = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("EXCLUSIVE",self.exclusive) wr_custom_directives = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("CUSTOM_DIRECTIVES",[]) # parse custom_directives if type(wr_custom_directives) is list: @@ -460,44 +461,11 @@ class JobPackageThread(JobPackageBase): wr_custom_directives = [] if len(str(wr_custom_directives)) > 0: self.custom_directives = wr_custom_directives - else: - self.custom_directives = jobs[0].custom_directives + self.tasks = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("TASKS",self.tasks) + self.nodes = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("NODES",self.nodes) + self.reservation = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("RESERVATION",self.reservation) - wr_tasks = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("TASKS",None) - if wr_tasks: - self.tasks = wr_tasks - else: - self.tasks = jobs[0].tasks - wr_nodes = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("NODES",None) - if wr_nodes: - self.nodes = wr_nodes - else: - self.nodes = jobs[0].nodes - wr_threads = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("THREADS",None) - if wr_threads: - self.threads = wr_threads - else: - self.threads = jobs[0].threads - else: - self.queue = jobs[0].queue - self.parameters["CURRENT_QUEUE"] = jobs[0].queue - self.partition = jobs[0].partition - self.nodes = jobs[0].nodes - self.tasks = jobs[0].tasks - self.threads = jobs[0].threads - self.exclusive = jobs[0].exclusive - self.custom_directives = jobs[0].custom_directives - else: - self.queue = jobs[0].queue - self.parameters["CURRENT_QUEUE"] = jobs[0].queue - self.partition = jobs[0].partition - self.nodes = jobs[0].nodes - self.tasks = jobs[0].tasks - self.threads = jobs[0].threads - self.exclusive = jobs[0].exclusive - self.custom_directives = jobs[0].custom_directives self.parameters["CURRENT_PROJ"] = self._project - self.parameters["NUMTHREADS"] = self.threads self.het = jobs[0].het # Memory needs more work outside this branch @@ -507,30 +475,12 @@ class JobPackageThread(JobPackageBase): self.memory_per_task = jobs[0].memory_per_task self.parameters["NODES"] = self.nodes self.processors = self._num_processors - self.parameters["RESERVATION"] = jobs[0].reservation # have to look + self.parameters["RESERVATION"] = self.reservation # have to look self.parameters['TASKS'] = self.tasks self.parameters["EXECUTABLE"] = self.executable # have to look self.method = method - self._wrapper_data = configuration.experiment_data["WRAPPERS"][self.current_wrapper_section] - self._wrapper_data["WRAPPER"] = self - - # self._wrapper_data["TYPE"] = self.wrapper_type - # self._wrapper_data["WRAPPER_POLICY"] = self.wrapper_policy - # self._wrapper_data["INNER_RETRIALS"] = self.inner_retrials - # self._wrapper_data["RETRIALS"] = self.inner_retrials - # self._wrapper_data["EXTEND_WALLCLOCK"] = self.extensible_wallclock - # self._wrapper_data["METHOD"] = self.wrapper_method - # self._wrapper_data["EXPORT"] = self.export - # self._wrapper_data["QUEUE"] = self.queue - # self._wrapper_data["NODES"] = self.nodes - # self._wrapper_data["TASKS"] = self.tasks - # self._wrapper_data["THREADS"] = self.threads - # self._wrapper_data["PROCESSORS"] = self._num_processors - # self._wrapper_data["PARTITION"] = self.partition - # self._wrapper_data["EXCLUSIVE"] = self.exclusive - # self._wrapper_data["EXECUTABLE"] = self.executable - # self._wrapper_data["CUSTOM_DIRECTIVES"] = self.custom_directives - # self._wrapper_data["HET"] = self.het + + @property def name(self): return self._name @@ -550,7 +500,7 @@ class JobPackageThread(JobPackageBase): return jobs_scripts @property def queue(self): - if str(self._num_processors) == '1' or str(self._num_processors) == '0': + if (not str(self.nodes).isdigit() or (self.nodes.isdigit() and int(self.nodes) < 1)) and (not self._num_processors.isdigit() or (self._num_processors.isdigit() and int(self._num_processors) <= 1)): return self.platform.serial_platform.serial_queue else: return self._queue @@ -648,7 +598,7 @@ class JobPackageThreadWrapped(JobPackageThread): self._common_script = None self._wallclock = '00:00' self._num_processors = '0' - self.threads = '1' + self._threads = '1' self.current_wrapper_section = wrapper_section @@ -721,12 +671,14 @@ class JobPackageVertical(JobPackageThread): :param: dependency: """ def __init__(self, jobs, dependency=None,configuration=None,wrapper_section="WRAPPERS", wrapper_info = []): - self._num_processors = 0 + + super(JobPackageVertical, self).__init__(jobs, dependency,configuration=configuration,wrapper_section=wrapper_section, wrapper_info = wrapper_info) for job in jobs: if int(job.processors) >= int(self._num_processors): self._num_processors = job.processors self._threads = job.threads - super(JobPackageVertical, self).__init__(jobs, dependency,configuration=configuration,wrapper_section=wrapper_section, wrapper_info = wrapper_info) + self._threads = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section, {}).get("THREADS", + self._threads) for job in jobs: self._wallclock = sum_str_hours(self._wallclock, job.wallclock) self._name = self._expid + '_' + self.FILE_PREFIX + "_{0}_{1}_{2}".format(str(int(time.time())) + @@ -808,13 +760,15 @@ class JobPackageHorizontal(JobPackageThread): def __init__(self, jobs, dependency=None, jobs_resources=dict(),method='ASThread',configuration=None,wrapper_section="WRAPPERS"): super(JobPackageHorizontal, self).__init__(jobs, dependency, jobs_resources,configuration=configuration,wrapper_section=wrapper_section) self.method = method - self._queue = self.queue for job in jobs: if job.wallclock > self._wallclock: self._wallclock = job.wallclock self._num_processors = str(int(self._num_processors) + int(job.processors)) self._threads = job.threads + self._threads = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section, {}).get("THREADS", + self._threads) + self._name = self._expid + '_' + self.FILE_PREFIX + "_{0}_{1}_{2}".format(str(int(time.time())) + str(random.randint(1, 10000)), self._num_processors, diff --git a/autosubmit/job/job_utils.py b/autosubmit/job/job_utils.py index c7ae6709e1b3353a6d1382a1a01ea38af4c5cf62..dae0ccc5e2c2d5f8e4e3ce29b57b6a774f2464b6 100644 --- a/autosubmit/job/job_utils.py +++ b/autosubmit/job/job_utils.py @@ -3,11 +3,7 @@ import math from autosubmit.platforms.paramiko_submitter import ParamikoSubmitter from log.log import Log, AutosubmitCritical -import os -from autosubmit.job.job_package_persistence import JobPackagePersistence -from autosubmitconfigparser.config.basicconfig import BasicConfig -from typing import Dict -from bscearth.utils.date import date2str, previous_day, chunk_end_date, chunk_start_date, subs_dates +from bscearth.utils.date import date2str, chunk_end_date, chunk_start_date, subs_dates # Copyright 2017-2020 Earth Sciences Department, BSC-CNS diff --git a/autosubmit/platforms/headers/slurm_header.py b/autosubmit/platforms/headers/slurm_header.py index d0b16352376705045d6b499e9a077dd1b0ddf509..ce590465a0788fd78d984890712da0e60565f37b 100644 --- a/autosubmit/platforms/headers/slurm_header.py +++ b/autosubmit/platforms/headers/slurm_header.py @@ -279,7 +279,7 @@ class SlurmHeader(object): {kwargs["tasks"]} {kwargs["exclusive"]} {kwargs["custom_directives"]} - +{kwargs.get("reservation","#")} # """ else: diff --git a/autosubmit/platforms/pjmplatform.py b/autosubmit/platforms/pjmplatform.py index e474ee3a7832eb7a252f3d05fbde38836fe869bf..c8f520258dcc18c3ad9ed6f42f1fd3bf22b794a7 100644 --- a/autosubmit/platforms/pjmplatform.py +++ b/autosubmit/platforms/pjmplatform.py @@ -455,7 +455,7 @@ class PJMPlatform(ParamikoPlatform): {kwargs["tasks"]} {kwargs["exclusive"]} {kwargs["custom_directives"]} - + #PJM -g {kwargs["project"]} #PJM -o {kwargs["name"]}.out #PJM -e {kwargs["name"]}.err diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 438078118503dd8ae8a5a1e9da93675b447b217b..7a55bdd1764014fa798f3f7898d7d78389b1ea91 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -24,6 +24,7 @@ def processed(fn): return process return wrapper + class Platform(object): """ Class to manage the connections to the different platforms. @@ -47,7 +48,7 @@ class Platform(object): self._default_queue = None self._partition = None self.ec_queue = "hpc" - self.processors_per_node = "1" + self.processors_per_node = None self.scratch_free_space = None self.custom_directives = None self._host = '' diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py index d40a985d1b2fc52907b8c189dff055836142aa65..7ff5552db84966a83e7ce97260abe89d49076748 100644 --- a/autosubmit/platforms/wrappers/wrapper_builder.py +++ b/autosubmit/platforms/wrappers/wrapper_builder.py @@ -455,7 +455,8 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): while fail_count <= job_retrials and not completed: current = {1} current.start() - os.system("echo "+str(int(time.time()))+" > "+scripts[i][:-4]+"_STAT_"+str(fail_count)) #Start/submit running + timer = int(time.time()) + os.system("echo "+str(timer)+" >> "+scripts[i][:-4]+"_STAT_"+str(fail_count)) #Completed current.join({3}) total_steps = total_steps + 1 """).format(jobs_list, thread,self.retrials,str(self.wallclock_by_level),'\n'.ljust(13)) @@ -467,7 +468,8 @@ class PythonVerticalWrapperBuilder(PythonWrapperBuilder): failed_filename = {0}[i].replace('.cmd', '_FAILED') failed_path = os.path.join(os.getcwd(), failed_filename) failed_wrapper = os.path.join(os.getcwd(), wrapper_id) - os.system("echo "+str(int(time.time()))+" >> "+scripts[i][:-4]+"_STAT_"+str(fail_count)) #Completed + timer = int(time.time()) + os.system("echo "+str(timer)+" >> "+scripts[i][:-4]+"_STAT_"+str(fail_count)) #Completed if os.path.exists(completed_path): completed = True print(datetime.now(), "The job ", current.template," has been COMPLETED") diff --git a/autosubmit/platforms/wrappers/wrapper_factory.py b/autosubmit/platforms/wrappers/wrapper_factory.py index 4b728dd6bf679ff89bbd8efb6d4d8bb777e51586..b71cd503df9f943ce810fac43d95915c28025e3c 100644 --- a/autosubmit/platforms/wrappers/wrapper_factory.py +++ b/autosubmit/platforms/wrappers/wrapper_factory.py @@ -31,9 +31,15 @@ class WrapperFactory(object): self.exception = "This type of wrapper is not supported for this platform" def get_wrapper(self, wrapper_builder, **kwargs): - wrapper_data = kwargs['wrapper_data'] + wrapper_data = kwargs['wrapper_data'] # this refers to the object with all parameters init wrapper_data.wallclock = kwargs['wallclock'] if wrapper_data.het.get("HETSIZE",0) <= 1: + if not str(kwargs['num_processors_value']).isdigit(): + kwargs['num_processors_value'] = 1 + if str(wrapper_data.nodes).isdigit() and int(wrapper_data.nodes) > 1 and int(kwargs['num_processors_value']) <= 1: + kwargs['num_processors'] = "#" + else: + kwargs['num_processors'] = self.processors(kwargs['num_processors_value']) kwargs['allocated_nodes'] = self.allocated_nodes() kwargs['dependency'] = self.dependency(kwargs['dependency']) kwargs['partition'] = self.partition(wrapper_data.partition) @@ -43,14 +49,8 @@ class WrapperFactory(object): kwargs["custom_directives"] = self.custom_directives(wrapper_data.custom_directives) kwargs['queue'] = self.queue(wrapper_data.queue) kwargs['threads'] = self.threads(wrapper_data.threads) - if str(kwargs['num_processors']).isdigit(): - kwargs['num_processors_value'] = int(wrapper_data.processors) - else: - kwargs['num_processors_value'] = 1 - if str(wrapper_data.nodes).isdigit() and int(wrapper_data.nodes) > 1 and kwargs['num_processors'] == '1': - kwargs['num_processors'] = "#" - else: - kwargs['num_processors'] = self.processors(wrapper_data.processors) + kwargs['reservation'] = self.reservation(wrapper_data.reservation) + kwargs["executable"] = wrapper_data.executable kwargs['header_directive'] = self.header_directives(**kwargs) @@ -89,6 +89,9 @@ class WrapperFactory(object): def allocated_nodes(self): return '' + def reservation(self, reservation): + return '#' if not reservation else self.reservation_directive(reservation) + def dependency(self, dependency): return '#' if dependency is None else self.dependency_directive(dependency) def queue(self, queue): @@ -118,6 +121,8 @@ class WrapperFactory(object): return '\n'.join(str(s) for s in custom_directives) return "" + def reservation_directive(self, reservation): + return '#' def dependency_directive(self, dependency): raise NotImplemented(self.exception) def queue_directive(self, queue): @@ -162,6 +167,8 @@ class SlurmWrapperFactory(WrapperFactory): def allocated_nodes(self): return self.platform.allocated_nodes() + def reservation_directive(self, reservation): + return "#SBATCH --reservation={0}".format(reservation) def dependency_directive(self, dependency): return '#SBATCH --dependency=afterok:{0}'.format(dependency) def queue_directive(self, queue): @@ -207,6 +214,8 @@ class PJMWrapperFactory(WrapperFactory): def allocated_nodes(self): return self.platform.allocated_nodes() + def reservation_directive(self, reservation): + return "#" # Reservation directive doesn't exist in PJM, they're handled directly by admins def queue_directive(self, queue): return '#PJM --qos={0}'.format(queue) diff --git a/test/unit/test_job_package.py b/test/unit/test_job_package.py index e12aa8eb6471d66bc70132e96c9e22e60417bd98..e70f1087225bd228027490c577a692f37dcd4e34 100644 --- a/test/unit/test_job_package.py +++ b/test/unit/test_job_package.py @@ -64,7 +64,7 @@ class TestJobPackage(TestCase): self._wrapper_factory.as_conf = self.as_conf self.jobs[0].wallclock = "00:00" - self.jobs[0].threads = "1" + self.jobs[0]._threads = "1" self.jobs[0].tasks = "1" self.jobs[0].exclusive = True self.jobs[0].queue = "debug" @@ -75,8 +75,8 @@ class TestJobPackage(TestCase): self.jobs[0]._platform = self.platform self.jobs[0].retrials = 0 self.jobs[1].wallclock = "00:00" - self.jobs[1].threads = "" - self.jobs[1].tasks = "" + self.jobs[1]._threads = "1" + self.jobs[1].tasks = "1" self.jobs[1].exclusive = True self.jobs[1].queue = "debug2" self.jobs[1].partition = "debug2" @@ -131,7 +131,7 @@ class TestJobPackage(TestCase): self.assertEqual(self.job_package_wrapper.inner_retrials, 0) self.assertEqual(self.job_package_wrapper.queue, "debug") self.assertEqual(self.job_package_wrapper.partition, "debug") - self.assertEqual(self.job_package_wrapper.threads, "1") + self.assertEqual(self.job_package_wrapper._threads, "1") self.assertEqual(self.job_package_wrapper.tasks, "1") options_slurm = { @@ -148,7 +148,7 @@ class TestJobPackage(TestCase): self.assertEqual(self.job_package_wrapper.inner_retrials, 30) self.assertEqual(self.job_package_wrapper.queue, "bsc32") self.assertEqual(self.job_package_wrapper.partition, "bsc32") - self.assertEqual(self.job_package_wrapper.threads, "30") + self.assertEqual(self.job_package_wrapper._threads, "30") self.assertEqual(self.job_package_wrapper.tasks, "40") self.assertEqual(self.job_package_wrapper.custom_directives, ['#SBATCH --mem=1000'])