diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py
index c2a54ec069de42c249e87d1188919190fadc4cb5..ffc4bcdc9c48c4274b42b08823dc0fceaeea6187 100644
--- a/autosubmit/autosubmit.py
+++ b/autosubmit/autosubmit.py
@@ -1854,7 +1854,7 @@ class Autosubmit:
                     job_list.packages_dict[package_name] = []
                 job_list.packages_dict[package_name].append(job_list.get_job_by_name(job_name))
             # This function, checks the stored STATUS of jobs inside wrappers. Since "wrapper status" is a memory variable.
-            job_list = Autosubmit.check_wrapper_stored_status(job_list, as_conf)
+            job_list = Autosubmit.check_wrapper_stored_status(as_conf, job_list)
         except Exception as e:
             raise AutosubmitCritical(
                 "Autosubmit failed while processing job packages. This might be due to a change in your experiment configuration files after 'autosubmit create' was performed.",
@@ -4404,7 +4404,7 @@ class Autosubmit:
         as_conf = AutosubmitConfig(expid, BasicConfig, YAMLParserFactory())
         # Get original configuration
-        as_conf.check_conf_files(False, only_experiment_data=True, no_log=True)
+        as_conf.check_conf_files(running_time=False, only_experiment_data=True, no_log=True)
         # Getting output type provided by the user in config, 'pdf' as default
         try:
             if not Autosubmit._copy_code(as_conf, expid, as_conf.experiment_data.get("PROJECT",{}).get("PROJECT_TYPE","none"), False):
                 return False
@@ -4412,7 +4412,7 @@ class Autosubmit:
         except:
             raise AutosubmitCritical("Error obtaining the project data, check the parameters related to PROJECT and GIT/SVN or LOCAL sections", code=7014)
         # Update configuration with the new config in the dist ( if any )
-        as_conf.check_conf_files(True,force_load=True, only_experiment_data=False, no_log=False)
+        as_conf.check_conf_files(running_time=True,force_load=True, only_experiment_data=False, no_log=False)
         output_type = as_conf.get_output_type()

         if not os.path.exists(os.path.join(exp_path, "pkl")):
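Both `check_conf_files` call sites above now pass `running_time` by keyword, which guards against the same class of bug fixed in the first hunk, where `(job_list, as_conf)` was passed in transposed order. A minimal sketch of the pattern, using a hypothetical stand-in rather than the real AutosubmitConfig API:

```python
# Sketch: keyword-only parameters make transposed-argument bugs impossible.
# `check_conf_files` here is a hypothetical stand-in, not the real signature.
def check_conf_files(*, running_time=False, force_load=False,
                     only_experiment_data=True, no_log=False):
    print(f"running_time={running_time}, only_experiment_data={only_experiment_data}")

check_conf_files(running_time=False, only_experiment_data=True, no_log=True)
# check_conf_files(False, True)  # TypeError: positional arguments are rejected
```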
diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py
index 070062c1b43a36344796fe192fe207f465213b51..151a1c2ad4e3a9dc51953d469de81c12e13d4ffa 100644
--- a/autosubmit/job/job.py
+++ b/autosubmit/job/job.py
@@ -157,8 +157,9 @@ class Job(object):
         self.start_time = None
         self.edge_info = dict()
         self.partition = ""
-
-
+        self.total_jobs = None
+        self.max_waiting_jobs = None
+        self.exclusive = ""
     def __getstate__(self):
         odict = self.__dict__
         if '_platform' in odict:
@@ -995,8 +996,12 @@ class Job(object):
         return parameters

     def update_platform_associated_parameters(self,as_conf, parameters, job_platform, chunk):
+        self.executable = str(as_conf.jobs_data[self.section].get("EXECUTABLE", as_conf.platforms_data.get(job_platform.name,{}).get("EXECUTABLE","")))
+        self.total_jobs = int(as_conf.jobs_data[self.section].get("TOTALJOBS", job_platform.total_jobs))
+        self.max_waiting_jobs = int(as_conf.jobs_data[self.section].get("MAXWAITINGJOBS", job_platform.max_waiting_jobs))
         self.processors = str(as_conf.jobs_data[self.section].get("PROCESSORS",as_conf.platforms_data.get(job_platform.name,{}).get("PROCESSORS","1")))
+        self.exclusive = str(as_conf.jobs_data[self.section].get("EXCLUSIVE",as_conf.platforms_data.get(job_platform.name,{}).get("EXCLUSIVE",False)))
         self.threads = str(as_conf.jobs_data[self.section].get("THREADS",as_conf.platforms_data.get(job_platform.name,{}).get("THREADS","1")))
         self.tasks = str(as_conf.jobs_data[self.section].get("TASKS",as_conf.platforms_data.get(job_platform.name,{}).get("TASKS","1")))
         self.nodes = str(as_conf.jobs_data[self.section].get("NODES",as_conf.platforms_data.get(job_platform.name,{}).get("NODES","")))
diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py
index c007d220c6464f7637a8af6716ee6b048c775c7c..dcfaae136808a732e4174110b618cf580ded4804 100644
--- a/autosubmit/job/job_dict.py
+++ b/autosubmit/job/job_dict.py
@@ -378,7 +378,6 @@ class DicJobs:
             job.type = Type.PYTHON2
         elif job_type == 'r':
             job.type = Type.R
-        job.executable = str(parameters[section].get( "EXECUTABLE", ""))
         hpcarch = self.experiment_data.get("DEFAULT",{})
         hpcarch = hpcarch.get("HPCARCH","")
         job.platform_name = str(parameters[section].get("PLATFORM", hpcarch)).upper()
@@ -387,6 +386,7 @@ class DicJobs:
         job.file = str(parameters[section].get( "FILE", ""))
         job.additional_files = parameters[section].get( "ADDITIONAL_FILES", [])
+        job.executable = str(parameters[section].get("EXECUTABLE", self.experiment_data["PLATFORMS"].get(job.platform_name,{}).get("EXECUTABLE","")))
         job.queue = str(parameters[section].get( "QUEUE", ""))
         job.ec_queue = str(parameters[section].get("EC_QUEUE", ""))
         if job.ec_queue == "" and job.platform_name != "LOCAL":
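Both hunks above resolve each value by looking first in the job's JOBS section and then falling back to the platform section, with a hard default last. A self-contained sketch of that lookup chain, assuming plain dicts shaped like `as_conf.jobs_data` / `as_conf.platforms_data`:

```python
# Sketch of the JOBS-section -> PLATFORMS-section -> default lookup used above.
jobs_data = {"SIM": {"PROCESSORS": "4"}}                                   # per-job overrides
platforms_data = {"MARENOSTRUM4": {"EXECUTABLE": "/bin/bash", "PROCESSORS": "1"}}

def resolve(section, platform, key, default=""):
    platform_value = platforms_data.get(platform, {}).get(key, default)
    return jobs_data.get(section, {}).get(key, platform_value)

assert resolve("SIM", "MARENOSTRUM4", "PROCESSORS") == "4"          # job section wins
assert resolve("SIM", "MARENOSTRUM4", "EXECUTABLE") == "/bin/bash"  # platform fallback
assert resolve("SIM", "MARENOSTRUM4", "NODES") == ""                # hard default
```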
diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py
index e52f1634dab7da5ad985229471e1cdd5d8dce202..d51dd59ddb0bc9785c9784d1cfebd9eccf288fdb 100644
--- a/autosubmit/job/job_packager.py
+++ b/autosubmit/job/job_packager.py
@@ -42,19 +42,8 @@ class JobPackager(object):
     :type jobs_list: JobList object.
     """

-    def __init__(self, as_config, platform, jobs_list, hold=False):
-        self.current_wrapper_section = "WRAPPERS"
-        self._as_config = as_config
-        self._platform = platform
-        self._jobs_list = jobs_list
-        self.hold = hold
-        # These are defined in the [wrapper] section of autosubmit_,conf
-        self.wrapper_type = dict()
-        self.wrapper_policy = dict()
-        self.wrapper_method = dict()
-        self.jobs_in_wrapper = dict()
-        self.extensible_wallclock = dict()
-        self.wrapper_info = list()
+    def calculate_job_limits(self,platform,job=None):
+        jobs_list = self._jobs_list
         # Submitted + Queuing Jobs for specific Platform
         queuing_jobs = jobs_list.get_queuing(platform)
         # We now consider the running jobs count
@@ -77,15 +66,38 @@ class JobPackager(object):
         waiting_jobs = submitted_jobs_len + queuing_jobs_len
         # Calculate available space in Platform Queue
-        self._max_wait_jobs_to_submit = platform.max_waiting_jobs - waiting_jobs
+        if job is not None and job.max_waiting_jobs is not None and job.max_waiting_jobs != platform.max_waiting_jobs:
+            self._max_wait_jobs_to_submit = job.max_waiting_jobs - waiting_jobs
+        else:
+            self._max_wait_jobs_to_submit = platform.max_waiting_jobs - waiting_jobs
         # .total_jobs is defined in each section of platforms_.yml, if not from there, it comes form autosubmit_.yml
         # .total_jobs Maximum number of jobs at the same time
-        self._max_jobs_to_submit = platform.total_jobs - queuing_jobs_len
+        if job is not None and job.total_jobs != platform.total_jobs:
+            self._max_jobs_to_submit = job.total_jobs - queuing_jobs_len
+        else:
+            self._max_jobs_to_submit = platform.total_jobs - queuing_jobs_len
         # Subtracting running jobs
         self._max_jobs_to_submit = self._max_jobs_to_submit - running_jobs_len
         self._max_jobs_to_submit = self._max_jobs_to_submit if self._max_jobs_to_submit > 0 else 0
         self.max_jobs = min(self._max_wait_jobs_to_submit,self._max_jobs_to_submit)

+    def __init__(self, as_config, platform, jobs_list, hold=False):
+        self.current_wrapper_section = "WRAPPERS"
+        self._as_config = as_config
+        self._platform = platform
+        self._jobs_list = jobs_list
+        self.hold = hold
+        # These are defined in the [wrapper] section of autosubmit_.conf
+        self.wrapper_type = dict()
+        self.wrapper_policy = dict()
+        self.wrapper_method = dict()
+        self.jobs_in_wrapper = dict()
+        self.extensible_wallclock = dict()
+        self.wrapper_info = list()
+        self.calculate_job_limits(platform)
+        self.special_variables = dict()
+        # TODO: add default values

 #Wrapper building starts here
@@ -144,7 +156,11 @@ class JobPackager(object):
             highest_completed.append(job)
         for job in highest_completed:
             job.distance_weight = job.distance_weight - 1
-
+    def _special_variables(self,job):
+        if job.section not in self.special_variables:
+            self.special_variables[job.section] = dict()
+        if job.total_jobs != self._platform.total_jobs:
+            self.special_variables[job.section]["TOTAL_JOBS"] = job
     def build_packages(self):
         # type: () -> List[JobPackageBase]
         """
@@ -194,10 +212,27 @@ class JobPackager(object):
         if len(jobs_ready) == 0:
             # If there are no jobs ready, result is tuple of empty
             return packages_to_submit
-        if not (self._max_wait_jobs_to_submit > 0 and self._max_jobs_to_submit > 0):
-            # If there is no more space in platform, result is tuple of empty
-            Log.debug("No more space in platform {0} for jobs {1}".format(self._platform.name, [job.name for job in jobs_ready]))
-            return packages_to_submit
+        # Check whether any ready job carries per-job limits that override the platform-wide ones
+        for job in jobs_ready:
+            self._special_variables(job)
+        if len(self.special_variables) > 0:
+            for section in self.special_variables:
+                if "TOTAL_JOBS" in self.special_variables[section]:
+                    self.calculate_job_limits(self._platform,self.special_variables[section]["TOTAL_JOBS"])
+                    if not (self._max_wait_jobs_to_submit > 0 and self._max_jobs_to_submit > 0):
+                        # If there is no more space in platform, result is tuple of empty
+                        Log.debug("No more space in platform {0} for jobs {1}".format(self._platform.name,
+                                                                                      [job.name for job in jobs_ready]))
+                        return packages_to_submit
+            self.calculate_job_limits(self._platform)
+        else:
+            self.calculate_job_limits(self._platform)
+            if not (self._max_wait_jobs_to_submit > 0 and self._max_jobs_to_submit > 0):
+                # If there is no more space in platform, result is tuple of empty
+                Log.debug("No more space in platform {0} for jobs {1}".format(self._platform.name, [job.name for job in jobs_ready]))
+                return packages_to_submit

         # Sort by 6 first digits of date
         available_sorted = sorted(
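`calculate_job_limits` subtracts the jobs already queuing, submitted, and running from either the platform-wide limits or, when a ready job carries its own `TOTALJOBS`/`MAXWAITINGJOBS`, that job's limits. A self-contained sketch of the arithmetic, with plain integer counts standing in for the JobList queries:

```python
# Sketch of the limit arithmetic in calculate_job_limits (simplified counts).
def job_limits(platform_total, platform_max_waiting, queuing, submitted, running,
               job_total=None, job_max_waiting=None):
    waiting = submitted + queuing
    # A per-job override wins when it is set and differs from the platform default.
    max_waiting_limit = (job_max_waiting if job_max_waiting is not None
                         and job_max_waiting != platform_max_waiting else platform_max_waiting)
    total_limit = (job_total if job_total is not None
                   and job_total != platform_total else platform_total)
    max_wait_jobs_to_submit = max_waiting_limit - waiting
    max_jobs_to_submit = max(total_limit - queuing - running, 0)
    return min(max_wait_jobs_to_submit, max_jobs_to_submit)

print(job_limits(20, 10, queuing=3, submitted=2, running=4))        # 5, platform limits
print(job_limits(20, 10, 3, 2, 4, job_total=8, job_max_waiting=6))  # 1, per-job override
```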
diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py
index ac60de16941141832ff37c68679e5adc5314dfdf..19bdcb02e703e331e713c322aa7f616f1239c77c 100644
--- a/autosubmit/job/job_packages.py
+++ b/autosubmit/job/job_packages.py
@@ -123,6 +123,7 @@ class JobPackageBase(object):
     def _create_common_script(self,filename=""):
         pass

+
     def submit(self, configuration, parameters,only_generate=False,hold=False):
         """
         :param hold:
@@ -161,6 +162,8 @@ class JobPackageBase(object):
                     else:
                         Log.result("Script {0} OK",job.name)
                 job.update_parameters(configuration, parameters)
+                # Looking for special variables
+                # Looking for directives on jobs
                 self._custom_directives = self._custom_directives | set(job.custom_directives)
         else:
@@ -349,6 +352,7 @@ class JobPackageThread(JobPackageBase):
     def __init__(self, jobs, dependency=None, jobs_resources=dict(),method='ASThread',configuration=None,wrapper_section="WRAPPERS", wrapper_info= {}):
         super(JobPackageThread, self).__init__(jobs)
+        # to be passed as "configuration"
         if len(wrapper_info) > 0 :
             self.wrapper_type = wrapper_info[0]
             self.wrapper_policy = wrapper_info[1]
@@ -361,7 +365,6 @@ class JobPackageThread(JobPackageBase):
             self.wrapper_method = None
             self.jobs_in_wrapper = None
             self.extensible_wallclock = 0
-        self._job_scripts = {} # Seems like this one is not used at all in the class
         self._job_dependency = dependency
@@ -390,10 +393,38 @@ class JobPackageThread(JobPackageBase):
                 self.partition = wr_partition
             else:
                 self.partition = jobs[0].partition
+            wr_exclusive = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("EXCLUSIVE",False)
+            if wr_exclusive:
+                self.exclusive = wr_exclusive
+            else:
+                self.exclusive = jobs[0].exclusive
+            wr_custom_directives = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("CUSTOM_DIRECTIVES",[])
+            if wr_custom_directives:
+                self.custom_directives = wr_custom_directives
+            else:
+                self.custom_directives = jobs[0].custom_directives
+            wr_executable = configuration.experiment_data["WRAPPERS"].get(self.current_wrapper_section,{}).get("EXECUTABLE",None)
+            if wr_executable is not None:
+                self.executable = wr_executable
+            else:
+                self.executable = jobs[0].executable
         else:
             self.queue = jobs[0].queue
             self.partition = jobs[0].partition
         self.method = method
+        self._wrapper_factory.as_conf = configuration
+        self._wrapper_factory.as_conf.experiment_data["CURRENT_WRAPPER"] = configuration.experiment_data["WRAPPERS"][self.current_wrapper_section]
+        self._wrapper_factory.as_conf.experiment_data["CURRENT_WRAPPER"]["TYPE"] = self.wrapper_type
+        self._wrapper_factory.as_conf.experiment_data["CURRENT_WRAPPER"]["WRAPPER_POLICY"] = self.wrapper_policy
+        self._wrapper_factory.as_conf.experiment_data["CURRENT_WRAPPER"]["INNER_RETRIALS"] = self.inner_retrials
+        self._wrapper_factory.as_conf.experiment_data["CURRENT_WRAPPER"]["EXTEND_WALLCLOCK"] = self.extensible_wallclock
+        self._wrapper_factory.as_conf.experiment_data["CURRENT_WRAPPER"]["METHOD"] = self.wrapper_method
+        self._wrapper_factory.as_conf.experiment_data["CURRENT_WRAPPER"]["EXPORT"] = self.export
+        self._wrapper_factory.as_conf.experiment_data["CURRENT_WRAPPER"]["QUEUE"] = self.queue
+        self._wrapper_factory.as_conf.experiment_data["CURRENT_WRAPPER"]["PARTITION"] = self.partition
+        self._wrapper_factory.as_conf.experiment_data["CURRENT_WRAPPER"]["EXCLUSIVE"] = self.exclusive
+        self._wrapper_factory.as_conf.experiment_data["CURRENT_WRAPPER"]["EXECUTABLE"] = self.executable
+        self._wrapper_factory.as_conf.experiment_data["CURRENT_WRAPPER"]["CUSTOM_DIRECTIVES"] = self.custom_directives

     @property
     def name(self):
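Every wrapper option above follows the same precedence rule: a value set in the WRAPPERS section wins, otherwise the first wrapped job's value is inherited. A sketch of that rule, assuming a plain dict for the wrapper section and a hypothetical `FakeJob`:

```python
# Sketch: wrapper-level option, falling back to the first job in the package.
def wrapper_option(wrapper_section, jobs, key, default=None):
    value = wrapper_section.get(key.upper(), default)
    # Falsy wrapper values (None, "", [], False) defer to the first job.
    return value if value else getattr(jobs[0], key.lower(), default)

class FakeJob:  # hypothetical stand-in for autosubmit.job.job.Job
    exclusive = True
    custom_directives = ["#SBATCH --constraint=highmem"]

print(wrapper_option({"EXCLUSIVE": False}, [FakeJob()], "exclusive"))   # True, from the job
print(wrapper_option({"CUSTOM_DIRECTIVES": ["#SBATCH -C nvme"]},
                     [FakeJob()], "custom_directives"))                 # wrapper value wins
```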
diff --git a/autosubmit/platforms/ecplatform.py b/autosubmit/platforms/ecplatform.py
index 2c014008d05a12ef56d2f4722ab90dfffbfc2664..99c44aa7af7b2074e1fd40261054d4e03170c4c1 100644
--- a/autosubmit/platforms/ecplatform.py
+++ b/autosubmit/platforms/ecplatform.py
@@ -308,8 +308,7 @@ class EcPlatform(ParamikoPlatform):
         return self._ssh_output

     def get_ssh_output_err(self):
         return self._ssh_output_err
-    @staticmethod
-    def wrapper_header(filename, queue, project, wallclock, num_procs, expid, dependency, rootdir, directives, partition=""):
+    def wrapper_header(self,**kwargs):
         return """\
 #!/bin/bash
 ###############################################################################
diff --git a/autosubmit/platforms/headers/slurm_header.py b/autosubmit/platforms/headers/slurm_header.py
index 9e55da58c14abb590c92060514b09132483d18f8..dc53de97f97fa9493af2e0de3949c93bcea17c93 100644
--- a/autosubmit/platforms/headers/slurm_header.py
+++ b/autosubmit/platforms/headers/slurm_header.py
@@ -67,6 +67,19 @@ class SlurmHeader(object):
         if job.parameters['CURRENT_PROJ'] != '':
             return "SBATCH -A {0}".format(job.parameters['CURRENT_PROJ'])
         return ""
+    def get_exclusive_directive(self, job):
+        """
+        Returns the exclusive directive for the specified job
+
+        :param job: job to create the exclusive directive for
+        :type job: Job
+        :return: exclusive directive
+        :rtype: str
+        """
+        # If the job is not exclusive, the directive is empty
+        if job.exclusive:
+            return "SBATCH --exclusive"
+        return ""

     def get_nodes_directive(self, job):
         """
diff --git a/autosubmit/platforms/lsfplatform.py b/autosubmit/platforms/lsfplatform.py
index 634ff620c8125a35cfc03c1a6a1e3c8317353cea..a03ec5dee262ed14507dd98361453148c61c3306 100644
--- a/autosubmit/platforms/lsfplatform.py
+++ b/autosubmit/platforms/lsfplatform.py
@@ -118,8 +118,7 @@ class LsfPlatform(ParamikoPlatform):
                 export += " ; "
         return export + self._submit_cmd + job_script

-    @staticmethod
-    def wrapper_header(filename, queue, project, wallclock, num_procs, dependency, directives, partition=""):
+    def wrapper_header(self,**kwargs):
         return """\
 #!/usr/bin/env python3
 ###############################################################################
diff --git a/autosubmit/platforms/paramiko_submitter.py b/autosubmit/platforms/paramiko_submitter.py
index b0b123f8d0238ca3278024f6d5c84485898faed4..07a637b7e7e9dc39fa5e42a087108f008287578b 100644
--- a/autosubmit/platforms/paramiko_submitter.py
+++ b/autosubmit/platforms/paramiko_submitter.py
@@ -77,6 +77,8 @@ class ParamikoSubmitter(Submitter):
         :rtype: dict
         """
         exp_data = asconf.experiment_data
+        config = BasicConfig().props()
+        config.update(exp_data)
         raise_message=""
         platforms_used = list()
         hpcarch = asconf.get_platform()
@@ -94,7 +96,7 @@ class ParamikoSubmitter(Submitter):
         platforms = dict()

         # Build Local Platform Object
-        local_platform = LocalPlatform(asconf.expid, 'local', BasicConfig)
+        local_platform = LocalPlatform(asconf.expid, 'local', config)
         local_platform.max_wallclock = asconf.get_max_wallclock()
         local_platform.max_processors = asconf.get_max_processors()
         local_platform.max_waiting_jobs = asconf.get_max_waiting_jobs()
@@ -118,28 +120,29 @@ class ParamikoSubmitter(Submitter):
             platform_type = platform_data[section].get('TYPE', '').lower()
             platform_version = platform_data[section].get('VERSION', '')
+
             try:
                 if platform_type == 'pbs':
                     remote_platform = PBSPlatform(
-                        asconf.expid, section, BasicConfig, platform_version)
+                        asconf.expid, section, config, platform_version)
                 elif platform_type == 'sge':
                     remote_platform = SgePlatform(
-                        asconf.expid, section, BasicConfig)
+                        asconf.expid, section, config)
                 elif platform_type == 'ps':
                     remote_platform = PsPlatform(
-                        asconf.expid, section, BasicConfig)
+                        asconf.expid, section, config)
                 elif platform_type == 'lsf':
                     remote_platform = LsfPlatform(
-                        asconf.expid, section, BasicConfig)
+                        asconf.expid, section, config)
                 elif platform_type == 'ecaccess':
                     remote_platform = EcPlatform(
-                        asconf.expid, section, BasicConfig, platform_version)
+                        asconf.expid, section, config, platform_version)
                 elif platform_type == 'slurm':
                     remote_platform = SlurmPlatform(
-                        asconf.expid, section, BasicConfig)
+                        asconf.expid, section, config)
                 elif platform_type == 'pjm':
                     remote_platform = PJMPlatform(
-                        asconf.expid, section, BasicConfig)
+                        asconf.expid, section, config)
                 else:
                     raise Exception(
                         "Queue type not specified on platform {0}".format(section))
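Passing `config` as a dict rather than the `BasicConfig` class is what lets the platforms call `self.config.get(...)` in the hunks below, and `config.update(exp_data)` lets experiment data shadow the defaults. A minimal sketch of a `props()`-style export, mirroring the `FakeBasicConfig.props()` helper added to the tests (the class name here is hypothetical):

```python
import inspect

class BasicConfigSketch:
    # Class-level constants, as on BasicConfig.
    LOCAL_ROOT_DIR = '/dummy/root'
    LOCAL_TMP_DIR = 'tmp'

    def props(self):
        # Export every public, non-callable attribute as a plain dict entry.
        return {name: getattr(self, name) for name in dir(self)
                if not name.startswith('__')
                and not inspect.ismethod(getattr(self, name))
                and not inspect.isfunction(getattr(self, name))}

config = BasicConfigSketch().props()
config.update({"LOCAL_ROOT_DIR": "/experiment/root"})  # exp_data overrides the defaults
print(config.get("LOCAL_ROOT_DIR"), config.get("LOCAL_TMP_DIR"))
```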
diff --git a/autosubmit/platforms/pjmplatform.py b/autosubmit/platforms/pjmplatform.py
index 57bcb7de3ba34f7c642122105360cd5db3fae70e..3a81d986484f959548b15c36661198504b06adfd 100644
--- a/autosubmit/platforms/pjmplatform.py
+++ b/autosubmit/platforms/pjmplatform.py
@@ -61,10 +61,10 @@ class PJMPlatform(ParamikoPlatform):
         self._allow_wrappers = True # NOT SURE IF WE NEED WRAPPERS
         self.update_cmds()
         self.config = config
-        exp_id_path = os.path.join(config.LOCAL_ROOT_DIR, self.expid)
+        exp_id_path = os.path.join(self.config.get("LOCAL_ROOT_DIR"), self.expid)
         tmp_path = os.path.join(exp_id_path, "tmp")
         self._submit_script_path = os.path.join(
-            tmp_path, config.LOCAL_ASLOG_DIR, "submit_" + self.name + ".sh")
+            tmp_path, self.config.get("LOCAL_ASLOG_DIR"), "submit_" + self.name + ".sh")
         self._submit_script_file = open(self._submit_script_path, 'wb').close()

     def submit_error(self,output):
@@ -164,7 +164,7 @@ class PJMPlatform(ParamikoPlatform):
     def get_submit_script(self):
         self._submit_script_file.close()
         os.chmod(self._submit_script_path, 0o750)
-        return os.path.join(self.config.LOCAL_ASLOG_DIR, os.path.basename(self._submit_script_path))
+        return os.path.join(self.config.get("LOCAL_ASLOG_DIR"), os.path.basename(self._submit_script_path))

     def submit_job(self, job, script_name, hold=False, export="none"):
         """
@@ -412,9 +412,8 @@ class PJMPlatform(ParamikoPlatform):
                 return reason[0]
             return reason

-    @staticmethod
-    def wrapper_header(filename, queue, project, wallclock, num_procs, dependency, directives, threads, method="asthreads", partition=""):
-        if method == 'srun':
+    def wrapper_header(self,**kwargs):
+        if kwargs.get("method", "asthreads") == 'srun':
             language = "#!/bin/bash"
             return \
diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py
index edcb3c001ae111459551bffdd0e81ee609ff8c95..d4a064caae78b4b7576b1be7e95e64820e4a3853 100644
--- a/autosubmit/platforms/platform.py
+++ b/autosubmit/platforms/platform.py
@@ -24,7 +24,7 @@ class Platform(object):
         self.name = name # type: str
         self.config = config
         self.tmp_path = os.path.join(
-            self.config.LOCAL_ROOT_DIR, self.expid, self.config.LOCAL_TMP_DIR)
+            self.config.get("LOCAL_ROOT_DIR"), self.expid, self.config.get("LOCAL_TMP_DIR"))
         self._serial_platform = None
         self._serial_queue = None
         self._default_queue = None
@@ -61,7 +61,16 @@ class Platform(object):
         self._submit_cmd = None
         self._checkhost_cmd = None
         self.cancel_cmd = None
-
+    def get_exclusive_directive(self, job):
+        """
+        Returns the exclusive directive for the specified job
+        :param job: job to create the exclusive directive for
+        :type job: Job
+        :return: exclusive directive
+        :rtype: str
+        """
+        # Only implemented for Slurm; other platforms return an empty string
+        return ""

     def get_multiple_jobids(self,job_list,valid_packages_to_submit,failed_packages,error_message="",hold=False):
         return False,valid_packages_to_submit #raise NotImplementedError
@@ -440,7 +449,7 @@ class Platform(object):
         """
         filename = job_name + '_STAT'
         stat_local_path = os.path.join(
-            self.config.LOCAL_ROOT_DIR, self.expid, self.config.LOCAL_TMP_DIR, filename)
+            self.config.get("LOCAL_ROOT_DIR"), self.expid, self.config.get("LOCAL_TMP_DIR"), filename)
         if os.path.exists(stat_local_path):
             os.remove(stat_local_path)
         if self.check_file_exists(filename):
@@ -480,7 +489,7 @@ class Platform(object):
         """
         filename = job_name
         stat_local_path = os.path.join(
-            self.config.LOCAL_ROOT_DIR, self.expid, self.config.LOCAL_TMP_DIR, filename)
+            self.config.get("LOCAL_ROOT_DIR"), self.expid, self.config.get("LOCAL_TMP_DIR"), filename)
         if os.path.exists(stat_local_path):
             os.remove(stat_local_path)
         if self.check_file_exists(filename):
@@ -500,7 +509,7 @@ class Platform(object):
         """
         if self.type == "local":
"local": path = os.path.join( - self.root_dir, self.config.LOCAL_TMP_DIR, 'LOG_{0}'.format(self.expid)) + self.root_dir, self.config.get("LOCAL_TMP_DIR"), 'LOG_{0}'.format(self.expid)) else: path = os.path.join(self.root_dir, 'LOG_{0}'.format(self.expid)) return path diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index 104f5c5ec078f4c3a1b835717048718692eca77a..2ea888b543c6b8fb88ae69baf8475855e2516083 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -67,10 +67,10 @@ class SlurmPlatform(ParamikoPlatform): self._allow_wrappers = True self.update_cmds() self.config = config - exp_id_path = os.path.join(config.LOCAL_ROOT_DIR, self.expid) + exp_id_path = os.path.join(self.config.get("LOCAL_ROOT_DIR"), self.expid) tmp_path = os.path.join(exp_id_path, "tmp") self._submit_script_path = os.path.join( - tmp_path, config.LOCAL_ASLOG_DIR, "submit_" + self.name + ".sh") + tmp_path, self.config.get("LOCAL_ASLOG_DIR"), "submit_" + self.name + ".sh") self._submit_script_file = open(self._submit_script_path, 'wb').close() def process_batch_ready_jobs(self,valid_packages_to_submit,failed_packages,error_message="",hold=False): @@ -190,7 +190,7 @@ class SlurmPlatform(ParamikoPlatform): def get_submit_script(self): self._submit_script_file.close() os.chmod(self._submit_script_path, 0o750) - return os.path.join(self.config.LOCAL_ASLOG_DIR, os.path.basename(self._submit_script_path)) + return os.path.join(self.config.get("LOCAL_ASLOG_DIR"), os.path.basename(self._submit_script_path)) def submit_job(self, job, script_name, hold=False, export="none"): """ @@ -606,55 +606,38 @@ class SlurmPlatform(ParamikoPlatform): job.new_status = Status.QUEUING # If it was HELD and was released, it should be QUEUING next. 
                 else:
                     job.new_status = Status.HELD
-    @staticmethod
-    def wrapper_header(filename, queue, project, wallclock, num_procs, dependency, directives, threads, method="asthreads", partition=""):
-        if method == 'srun':
-            language = "#!/bin/bash"
-            return \
-                language + """
+    def wrapper_header(self,**kwargs):
+        wr_header = f"""
 ###############################################################################
-# {0}
+# {kwargs["name"].split("_")[0]+"_Wrapper"}
 ###############################################################################
 #
-#SBATCH -J {0}
-{1}
-{8}
-#SBATCH -A {2}
-#SBATCH --output={0}.out
-#SBATCH --error={0}.err
-#SBATCH -t {3}:00
-#SBATCH -n {4}
-#SBATCH --cpus-per-task={7}
-{5}
-{6}
+#SBATCH -J {kwargs["name"]}
+{kwargs["queue"]}
+{kwargs["partition"]}
+{kwargs["dependency"]}
+#SBATCH -A {kwargs["project"]}
+#SBATCH --output={kwargs["name"]}.out
+#SBATCH --error={kwargs["name"]}.err
+#SBATCH -t {kwargs["wallclock"]}:00
+#SBATCH -n {kwargs["num_processors"]}
+#SBATCH --cpus-per-task={kwargs["threads"]}
+{kwargs["exclusive"]}
+{kwargs["custom_directives"]}
 #
 ###############################################################################
-            """.format(filename, queue, project, wallclock, num_procs, dependency,
-                       '\n'.ljust(13).join(str(s) for s in directives), threads,partition)
+"""
+        if kwargs["method"] == 'srun':
+            language = kwargs["executable"]
+            if language is None or len(language) == 0:
+                language = "#!/bin/bash"
+            return language + wr_header
         else:
-            language = "#!/usr/bin/env python3"
-            return \
-                language + """
-###############################################################################
-# {0}
-###############################################################################
-#
-#SBATCH -J {0}
-{1}
-{8}
-#SBATCH -A {2}
-#SBATCH --output={0}.out
-#SBATCH --error={0}.err
-#SBATCH -t {3}:00
-#SBATCH --cpus-per-task={7}
-#SBATCH -n {4}
-{5}
-{6}
-#
-###############################################################################
-            """.format(filename, queue, project, wallclock, num_procs, dependency,
-                       '\n'.ljust(13).join(str(s) for s in directives), threads,partition)
+            language = kwargs["executable"]
+            if language is None or len(language) == 0:
+                language = "#!/usr/bin/env python3"
+            return language + wr_header

     @staticmethod
     def allocated_nodes():
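The Slurm `wrapper_header` above replaces the old nine-positional-argument templates with a single kwargs-driven f-string; every directive line arrives pre-rendered, and the shebang falls back to a default when no `EXECUTABLE` is set. A condensed sketch of that behavior, with hypothetical sample values:

```python
def wrapper_header_sketch(**kwargs):
    # Condensed version of the header construction above: each directive
    # string is already rendered, '#' acting as a harmless placeholder.
    header = (f'#SBATCH -J {kwargs["name"]}\n{kwargs["queue"]}\n{kwargs["exclusive"]}\n'
              f'#SBATCH -t {kwargs["wallclock"]}:00\n#SBATCH -n {kwargs["num_processors"]}')
    # An empty or missing EXECUTABLE falls back to the default interpreter.
    language = kwargs.get("executable") or "#!/bin/bash"
    return language + "\n" + header

print(wrapper_header_sketch(name="a000_Wrapper", queue="#SBATCH --qos=debug",
                            exclusive="#SBATCH --exclusive", wallclock="02:00",
                            num_processors=16, executable=""))
```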
diff --git a/autosubmit/platforms/wrappers/wrapper_factory.py b/autosubmit/platforms/wrappers/wrapper_factory.py
index a3252d6f1e223e2ea22990d9e7ffc29c3e10116c..4c344787291bb846155f818a5e55dfe1bb973845 100644
--- a/autosubmit/platforms/wrappers/wrapper_factory.py
+++ b/autosubmit/platforms/wrappers/wrapper_factory.py
@@ -26,15 +26,20 @@ from autosubmitconfigparser.config.configcommon import AutosubmitConfig
 class WrapperFactory(object):

     def __init__(self, platform):
+        self.as_conf = None
         self.platform = platform
         self.wrapper_director = WrapperDirector()
         self.exception = "This type of wrapper is not supported for this platform"

     def get_wrapper(self, wrapper_builder, **kwargs):
+        wrapper_data = self.as_conf.experiment_data["CURRENT_WRAPPER"]
         kwargs['allocated_nodes'] = self.allocated_nodes()
         kwargs['dependency'] = self.dependency(kwargs['dependency'])
         kwargs['queue'] = self.queue(kwargs['queue'])
-        kwargs['partition'] = self.partition(kwargs['partition'])
+        kwargs['partition'] = self.partition(wrapper_data['PARTITION'])
+        kwargs["exclusive"] = self.exclusive(wrapper_data['EXCLUSIVE'])
+        kwargs["custom_directives"] = self.custom_directives(wrapper_data["CUSTOM_DIRECTIVES"])
+        kwargs["executable"] = wrapper_data["EXECUTABLE"]
         kwargs['header_directive'] = self.header_directives(**kwargs)
         builder = wrapper_builder(**kwargs)
         return self.wrapper_director.construct(builder)
@@ -59,18 +64,35 @@ class WrapperFactory(object):
     def dependency(self, dependency):
         return '#' if dependency is None else self.dependency_directive(dependency)
-
     def queue(self, queue):
         return '#' if not queue else self.queue_directive(queue)

     def partition(self, partition):
         return '#' if not partition else self.partition_directive(partition)

+    def exclusive(self, exclusive):
+        return '#' if not exclusive or str(exclusive).lower() == "false" else self.exclusive_directive(exclusive)
+    def custom_directives(self, custom_directives):
+        return '#' if not custom_directives else self.get_custom_directives(custom_directives)
+    def get_custom_directives(self, custom_directives):
+        """
+        Returns the custom directives for the specified wrapper
+        :param custom_directives: list of custom directives
+        :return: string with the custom directives, one per line
+        """
+        # If there are no custom directives, the directive is empty
+        if custom_directives != '':
+            return '\n'.join(str(s) for s in custom_directives)
+        return ""
+
     def dependency_directive(self, dependency):
         pass
-
     def queue_directive(self, queue):
         pass

     def partition_directive(self, partition):
         pass

+    def exclusive_directive(self, exclusive):
+        pass
+

 class SlurmWrapperFactory(WrapperFactory):
@@ -95,8 +117,7 @@ class SlurmWrapperFactory(WrapperFactory):
         return PythonVerticalHorizontalWrapperBuilder(**kwargs)

     def header_directives(self, **kwargs):
-        return self.platform.wrapper_header(kwargs['name'], kwargs['queue'], kwargs['project'], kwargs['wallclock'],
-                                            kwargs['num_processors'], kwargs['dependency'], kwargs['directives'],kwargs['threads'],kwargs['method'],kwargs['partition'])
+        return self.platform.wrapper_header(**kwargs)

     def allocated_nodes(self):
         return self.platform.allocated_nodes()
@@ -109,6 +130,8 @@ class SlurmWrapperFactory(WrapperFactory):

     def partition_directive(self, partition):
         return '#SBATCH --partition={0}'.format(partition)

+    def exclusive_directive(self, exclusive):
+        return '#SBATCH --exclusive'

 class LSFWrapperFactory(WrapperFactory):
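The directive helpers above all share one convention: an unset option renders as a bare `'#'`, which is an inert line in both bash and Python headers, so the header template never has to branch. A sketch of the convention using the new exclusive directive:

```python
def exclusive(exclusive_value):
    # '#' is a no-op line in the generated header; a real value becomes a directive.
    if not exclusive_value or str(exclusive_value).lower() == "false":
        return '#'
    return '#SBATCH --exclusive'

assert exclusive(False) == '#'
assert exclusive("False") == '#'   # YAML-style string "False" is also treated as unset
assert exclusive(True) == '#SBATCH --exclusive'
```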
diff --git a/requeriments.txt b/requeriments.txt
index cec2b668df01df890db195303da958b6a4df05e9..f0d85648f06640873db3927a12c1b035ea8b0b24 100644
--- a/requeriments.txt
+++ b/requeriments.txt
@@ -1,4 +1,4 @@
-autosubmitconfigparser==1.0.24
+autosubmitconfigparser==1.0.25
 paramiko>=2.9.2
 bcrypt>=3.2
 PyNaCl>=1.5.0
diff --git a/setup.py b/setup.py
index 16579c1989ed87cb8bb019a7c6101f108569bdc8..701ae2e4219c34bef56d033a7925c67603ffb446 100644
--- a/setup.py
+++ b/setup.py
@@ -39,7 +39,7 @@ setup(
     url='http://www.bsc.es/projects/earthscience/autosubmit/',
     download_url='https://earth.bsc.es/wiki/doku.php?id=tools:autosubmit',
     keywords=['climate', 'weather', 'workflow', 'HPC'],
-    install_requires=['autosubmitconfigparser==1.0.24','bcrypt>=3.2','packaging>19','six>=1.10.0','configobj>=5.0.6','argparse>=1.4.0','python-dateutil>=2.8.2','matplotlib<3.6','numpy<1.22','py3dotplus>=1.1.0','pyparsing>=3.0.7','paramiko>=2.9.2','mock>=4.0.3','portalocker>=2.3.2','networkx==2.6.3','requests>=2.27.1','bscearth.utils>=0.5.2','cryptography>=36.0.1','setuptools>=60.8.2','xlib>=0.21','pip>=22.0.3','ruamel.yaml','pythondialog','pytest','nose','coverage','PyNaCl>=1.4.0','Pygments'],
+    install_requires=['autosubmitconfigparser==1.0.25','bcrypt>=3.2','packaging>19','six>=1.10.0','configobj>=5.0.6','argparse>=1.4.0','python-dateutil>=2.8.2','matplotlib<3.6','numpy<1.22','py3dotplus>=1.1.0','pyparsing>=3.0.7','paramiko>=2.9.2','mock>=4.0.3','portalocker>=2.3.2','networkx==2.6.3','requests>=2.27.1','bscearth.utils>=0.5.2','cryptography>=36.0.1','setuptools>=60.8.2','xlib>=0.21','pip>=22.0.3','ruamel.yaml','pythondialog','pytest','nose','coverage','PyNaCl>=1.4.0','Pygments'],
     classifiers=[
         "Programming Language :: Python :: 3.7",
         "Programming Language :: Python :: 3.9",
diff --git a/test/unit/test_dic_jobs.py b/test/unit/test_dic_jobs.py
index 40b7dadff46210b9bc2e13f095118432fbeffba9..403b0e97962b150a4a698aa4ce61c06b73c52986 100644
--- a/test/unit/test_dic_jobs.py
+++ b/test/unit/test_dic_jobs.py
@@ -553,11 +553,17 @@ class TestDicJobs(TestCase):
         self.dictionary.build_job.assert_called_once_with(mock_section.name, priority, None, None, None, Type.BASH, {},splits)
         self.job_list.graph.add_node.assert_called_once_with(mock_section.name)

-
+import inspect
 class FakeBasicConfig:
     def __init__(self):
         pass
-
+    def props(self):
+        pr = {}
+        for name in dir(self):
+            value = getattr(self, name)
+            if not name.startswith('__') and not inspect.ismethod(value) and not inspect.isfunction(value):
+                pr[name] = value
+        return pr
     DB_DIR = '/dummy/db/dir'
     DB_FILE = '/dummy/db/file'
     DB_PATH = '/dummy/db/path'
diff --git a/test/unit/test_job.py b/test/unit/test_job.py
index 31702a9c74e1c264e55dfbee9ce373c87915b50f..b3eb35532de4e1d20c41186e455080c2038f93ea 100644
--- a/test/unit/test_job.py
+++ b/test/unit/test_job.py
@@ -34,7 +34,7 @@ class TestJob(TestCase):

     def test_when_the_job_has_more_than_one_processor_returns_the_parallel_platform(self):
-        platform = Platform(self.experiment_id, 'parallel-platform', FakeBasicConfig)
+        platform = Platform(self.experiment_id, 'parallel-platform', FakeBasicConfig().props())
         platform.serial_platform = 'serial-platform'

         self.job._platform = platform
@@ -45,7 +45,7 @@ class TestJob(TestCase):
         self.assertEqual(platform, returned_platform)

     def test_when_the_job_has_only_one_processor_returns_the_serial_platform(self):
-        platform = Platform(self.experiment_id, 'parallel-platform', FakeBasicConfig)
+        platform = Platform(self.experiment_id, 'parallel-platform', FakeBasicConfig().props())
         platform.serial_platform = 'serial-platform'

         self.job._platform = platform
@@ -56,7 +56,7 @@ class TestJob(TestCase):
         self.assertEqual('serial-platform', returned_platform)

     def test_set_platform(self):
-        dummy_platform = Platform('whatever', 'rand-name', FakeBasicConfig)
+        dummy_platform = Platform('whatever', 'rand-name', FakeBasicConfig().props())

         self.assertNotEqual(dummy_platform, self.job.platform)
         self.job.platform = dummy_platform
@@ -73,7 +73,7 @@ class TestJob(TestCase):

     def test_when_the_job_has_not_a_queue_and_some_processors_returns_the_queue_of_the_platform(self):
         dummy_queue = 'whatever-parallel'
-        dummy_platform = Platform('whatever', 'rand-name', FakeBasicConfig)
+        dummy_platform = Platform('whatever', 'rand-name', FakeBasicConfig().props())
         dummy_platform.queue = dummy_queue
         self.job.platform = dummy_platform
@@ -88,10 +88,10 @@ class TestJob(TestCase):
         serial_queue = 'whatever-serial'
         parallel_queue = 'whatever-parallel'

-        dummy_serial_platform = Platform('whatever', 'serial', FakeBasicConfig)
+        dummy_serial_platform = Platform('whatever', 'serial', FakeBasicConfig().props())
         dummy_serial_platform.serial_queue = serial_queue

-        dummy_platform = Platform('whatever', 'parallel', FakeBasicConfig)
+        dummy_platform = Platform('whatever', 'parallel', FakeBasicConfig().props())
         dummy_platform.serial_platform = dummy_serial_platform
         dummy_platform.queue = parallel_queue
         dummy_platform.processors_per_node = "1"
@@ -312,8 +312,18 @@ class TestJob(TestCase):
         self.assertEqual('%Y%', parameters['Y'])
         self.assertEqual('%Y_%', parameters['Y_'])

-
+import inspect
 class FakeBasicConfig:
+    def __init__(self):
+        pass
+    def props(self):
+        pr = {}
+        for name in dir(self):
+            value = getattr(self, name)
+            if not name.startswith('__') and not inspect.ismethod(value) and not inspect.isfunction(value):
+                pr[name] = value
+        return pr
+    #convert this to dict
     DB_DIR = '/dummy/db/dir'
     DB_FILE = '/dummy/db/file'
     DB_PATH = '/dummy/db/path'
@@ -325,3 +335,4 @@ class FakeBasicConfig:
+

diff --git a/test/unit/test_job_graph.py b/test/unit/test_job_graph.py
index 100655a31666aaa83547fd2165dc2119403da9bb..336ab732c42dbe1e2df42bd7df2560b88b937081 100644
--- a/test/unit/test_job_graph.py
+++ b/test/unit/test_job_graph.py
@@ -918,7 +918,6 @@ class TestJobGraph(TestCase):
             job.split = split
         return job

-
 class FakeBasicConfig:
     def __init__(self):
         pass
diff --git a/test/unit/test_job_grouping.py b/test/unit/test_job_grouping.py
index 03223a1016b86371907f9acd6ae4aab7222f18e5..288907f548de87e37bb406f6ba45fec0ab387066 100644
--- a/test/unit/test_job_grouping.py
+++ b/test/unit/test_job_grouping.py
@@ -983,11 +983,17 @@ class TestJobGrouping(TestCase):
             job.split = split
         return job

-
+import inspect
 class FakeBasicConfig:
     def __init__(self):
         pass
-
+    def props(self):
+        pr = {}
+        for name in dir(self):
+            value = getattr(self, name)
+            if not name.startswith('__') and not inspect.ismethod(value) and not inspect.isfunction(value):
+                pr[name] = value
+        return pr
     DB_DIR = '/dummy/db/dir'
     DB_FILE = '/dummy/db/file'
     DB_PATH = '/dummy/db/path'
diff --git a/test/unit/test_job_list.py b/test/unit/test_job_list.py
index 34f20df8d8b7855b593419c1a7f71f2a6bc678d5..0a3f6b3b414b58da78f50b9bedf830c1f99c28f8 100644
--- a/test/unit/test_job_list.py
+++ b/test/unit/test_job_list.py
@@ -275,11 +275,17 @@ class TestJobList(TestCase):
         job.type = randrange(0, 2)
         return job

-
+import inspect
 class FakeBasicConfig:
     def __init__(self):
         pass
-
+    def props(self):
+        pr = {}
+        for name in dir(self):
+            value = getattr(self, name)
+            if not name.startswith('__') and not inspect.ismethod(value) and not inspect.isfunction(value):
+                pr[name] = value
+        return pr
     DB_DIR = '/dummy/db/dir'
     DB_FILE = '/dummy/db/file'
     DB_PATH = '/dummy/db/path'
diff --git a/test/unit/test_pjm.py b/test/unit/test_pjm.py
index d68b8895304fabdb4457f155384be035f2f4431b..60b69180e7a7b460a1210bd04bc82c8f590ea5b3 100644
--- a/test/unit/test_pjm.py
+++ b/test/unit/test_pjm.py
@@ -13,8 +13,15 @@ import autosubmit.platforms.headers.pjm_header
 from tempfile import TemporaryDirectory
 from datetime import datetime
 from autosubmit.job.job import Job, Status
-
+import inspect
 class FakeBasicConfig:
+    def props(self):
+        pr = {}
+        for name in dir(self):
+            value = getattr(self, name)
+            if not name.startswith('__') and not inspect.ismethod(value) and not inspect.isfunction(value):
+                pr[name] = value
+        return pr
     DB_DIR = '/dummy/db/dir'
     DB_FILE = '/dummy/db/file'
     DB_PATH = '/dummy/db/path'
diff --git a/test/unit/test_wrappers.py b/test/unit/test_wrappers.py
index b3eed1a6073e3f6f4ccc24e11c42174e4c6110bf..7735127a44ec9d5c900b89856b804a1308785b4d 100644
--- a/test/unit/test_wrappers.py
+++ b/test/unit/test_wrappers.py
@@ -151,6 +151,8 @@ class TestWrappers(TestCase):
     def setUp(self):
         self.experiment_id = 'random-id'
+        self._wrapper_factory = MagicMock()
+
         self.config = FakeBasicConfig
         self._platform = MagicMock()
         self.as_conf = MagicMock()
@@ -160,6 +162,7 @@ class TestWrappers(TestCase):
         self.as_conf.experiment_data["PLATFORMS"] = dict()
         self.as_conf.experiment_data["WRAPPERS"] = dict()

+
         self.job_list = JobList(self.experiment_id, self.config, YAMLParserFactory(), JobListPersistenceDb('.', '.'),self.as_conf)

         self.parser_mock = MagicMock(spec='SafeConfigParser')
@@ -185,6 +188,8 @@ class TestWrappers(TestCase):
             'EXTEND_WALLCLOCK': 0
         }
         self.as_conf.experiment_data["WRAPPERS"]["WRAPPERS"] = options
+        self.as_conf.experiment_data["WRAPPERS"]["CURRENT_WRAPPER"] = options
+        self._wrapper_factory.as_conf = self.as_conf
         self.job_packager = JobPackager(
             self.as_conf, self._platform, self.job_list)
         self.job_list._ordered_jobs_by_date_member["WRAPPERS"] = dict()
@@ -265,8 +270,7 @@ class TestWrappers(TestCase):
         package_m2_s2 = [d1_m2_1_s2, d1_m2_2_s2, d1_m2_3_s2, d1_m2_4_s2, d1_m2_5_s2,
                          d1_m2_6_s2, d1_m2_7_s2, d1_m2_8_s2, d1_m2_9_s2, d1_m2_10_s2]

-        packages = [JobPackageVertical(
-            package_m1_s2), JobPackageVertical(package_m2_s2)]
+        packages = [JobPackageVertical(package_m1_s2,configuration=self.as_conf), JobPackageVertical(package_m2_s2,configuration=self.as_conf)]

         # returned_packages = returned_packages[]
         for i in range(0, len(returned_packages)):
@@ -348,7 +352,7 @@ class TestWrappers(TestCase):
                          d1_m2_9_s2, d1_m2_10_s2]

         packages = [JobPackageVertical(
-            package_m1_s2), JobPackageVertical(package_m2_s2)]
+            package_m1_s2,configuration=self.as_conf), JobPackageVertical(package_m2_s2,configuration=self.as_conf)]

         for i in range(0, len(returned_packages)):
             self.assertListEqual(returned_packages[i]._jobs, packages[i]._jobs)
@@ -418,7 +422,7 @@ class TestWrappers(TestCase):
                          d1_m2_3_s2, d1_m2_4_s2, d1_m2_5_s2]

         packages = [JobPackageVertical(
-            package_m1_s2), JobPackageVertical(package_m2_s2)]
+            package_m1_s2,configuration=self.as_conf), JobPackageVertical(package_m2_s2,configuration=self.as_conf)]

         #returned_packages = returned_packages[0]
         for i in range(0, len(returned_packages)):
@@ -489,7 +493,7 @@ class TestWrappers(TestCase):
                          d1_m2_3_s2, d1_m2_4_s2, d1_m2_5_s2]

         packages = [JobPackageVertical(
-            package_m1_s2), JobPackageVertical(package_m2_s2)]
+            package_m1_s2,configuration=self.as_conf), JobPackageVertical(package_m2_s2,configuration=self.as_conf)]

         #returned_packages = returned_packages[0]
         for i in range(0, len(returned_packages)):
@@ -557,7 +561,7 @@ class TestWrappers(TestCase):
         package_m2_s2 = [d1_m2_1_s3]

         packages = [JobPackageVertical(
-            package_m1_s2), JobPackageVertical(package_m2_s2)]
+            package_m1_s2,configuration=self.as_conf), JobPackageVertical(package_m2_s2,configuration=self.as_conf)]

         #returned_packages = returned_packages[0]
         for i in range(0, len(returned_packages)):
@@ -640,7 +644,7 @@ class TestWrappers(TestCase):
                             d1_m2_4_s3]

         packages = [JobPackageVertical(
-            package_m1_s2_s3), JobPackageVertical(package_m2_s2_s3)]
+            package_m1_s2_s3,configuration=self.as_conf), JobPackageVertical(package_m2_s2_s3,configuration=self.as_conf)]

         #returned_packages = returned_packages[0]
         for i in range(0, len(returned_packages)):
@@ -717,7 +721,7 @@ class TestWrappers(TestCase):
         package_m1_s2_s3 = [d1_m1_1_s2, d1_m1_1_s3, d1_m1_2_s2,
                             d1_m1_2_s3, d1_m1_3_s2, d1_m1_3_s3, d1_m1_4_s2, d1_m1_4_s3]

-        packages = [JobPackageVertical(package_m1_s2_s3)]
+        packages = [JobPackageVertical(package_m1_s2_s3,configuration=self.as_conf)]

         #returned_packages = returned_packages[0]
         for i in range(0, len(returned_packages)):
@@ -799,7 +803,7 @@ class TestWrappers(TestCase):
                             d1_m2_4_s3]

         packages = [JobPackageVertical(
-            package_m1_s2_s3), JobPackageVertical(package_m2_s2_s3)]
+            package_m1_s2_s3,configuration=self.as_conf), JobPackageVertical(package_m2_s2_s3,configuration=self.as_conf)]

         #returned_packages = returned_packages[0]
         # print("test_returned_packages_max_jobs_mixed_wrapper")
@@ -889,7 +893,7 @@ class TestWrappers(TestCase):
                             d1_m2_2_s2, d1_m2_2_s3, d1_m2_3_s2]

         packages = [JobPackageVertical(
-            package_m1_s2_s3), JobPackageVertical(package_m2_s2_s3)]
+            package_m1_s2_s3,configuration=self.as_conf), JobPackageVertical(package_m2_s2_s3,configuration=self.as_conf)]

         #returned_packages = returned_packages[0]
         for i in range(0, len(returned_packages)):
@@ -969,7 +973,7 @@ class TestWrappers(TestCase):
         package_m2_s2_s3 = [d1_m2_1_s2, d1_m2_1_s3, d1_m2_2_s2, d1_m2_2_s3]

         packages = [JobPackageVertical(
-            package_m1_s2_s3), JobPackageVertical(package_m2_s2_s3)]
+            package_m1_s2_s3,configuration=self.as_conf), JobPackageVertical(package_m2_s2_s3,configuration=self.as_conf)]

         #returned_packages = returned_packages[0]
         for i in range(0, len(returned_packages)):
@@ -1067,7 +1071,7 @@ class TestWrappers(TestCase):
         package_m2_s2_s3 = [d1_m2_3_s2, d1_m2_3_s3, d1_m2_4_s2, d1_m2_4_s3]

         packages = [JobPackageVertical(
-            package_m1_s2_s3), JobPackageVertical(package_m2_s2_s3)]
+            package_m1_s2_s3,configuration=self.as_conf), JobPackageVertical(package_m2_s2_s3,configuration=self.as_conf)]

         #returned_packages = returned_packages[0]
         for i in range(0, len(returned_packages)):
@@ -1515,11 +1519,17 @@ class TestWrappers(TestCase):
         return job

-
+import inspect
 class FakeBasicConfig:
     def __init__(self):
         pass
-
+    def props(self):
+        pr = {}
+        for name in dir(self):
+            value = getattr(self, name)
+            if not name.startswith('__') and not inspect.ismethod(value) and not inspect.isfunction(value):
+                pr[name] = value
+        return pr
     DB_DIR = '/dummy/db/dir'
     DB_FILE = '/dummy/db/file'
     DB_PATH = '/dummy/db/path'
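The same `FakeBasicConfig.props()` helper is now duplicated across five test modules. Its canonical shape, runnable on its own and a candidate for a shared test fixture, looks like this:

```python
import inspect

class FakeBasicConfig:
    DB_DIR = '/dummy/db/dir'
    DB_FILE = '/dummy/db/file'
    DB_PATH = '/dummy/db/path'

    def props(self):
        # Mirror BasicConfig().props(): public, non-callable attributes as a dict.
        pr = {}
        for name in dir(self):
            value = getattr(self, name)
            if not name.startswith('__') and not inspect.ismethod(value) and not inspect.isfunction(value):
                pr[name] = value
        return pr

assert FakeBasicConfig().props()["DB_DIR"] == '/dummy/db/dir'
```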