From c80ef1f10c964b326521b60e264711b1e1b3db73 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 17 Feb 2020 16:03:27 +0100 Subject: [PATCH 1/4] Adapt Wrapper to be able to use THREADS (Testing 0 Priority queue) --- autosubmit/autosubmit.py | 37 ++-- autosubmit/config/config_common.py | 8 + autosubmit/job/job.py | 4 +- autosubmit/job/job_packager.py | 8 +- autosubmit/job/job_packages.py | 18 +- autosubmit/platforms/headers/slurm_header.py | 74 +++---- autosubmit/platforms/saga_platform.py | 2 +- autosubmit/platforms/slurmplatform.py | 10 +- .../platforms/wrappers/wrapper_builder.py | 201 +++++++++++++++++- .../platforms/wrappers/wrapper_factory.py | 11 +- test/unit/test_job.py | 2 +- 11 files changed, 302 insertions(+), 73 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index e5b5ece9e..25248b789 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -733,11 +733,11 @@ class Autosubmit: os.mkdir(tmp_path) os.chmod(tmp_path, 0o775) os.mkdir(os.path.join(tmp_path , BasicConfig.LOCAL_ASLOG_DIR)) - os.chmod(os.path.join(tmp_path , BasicConfig.LOCAL_ASLOG_DIR), 0o775) + os.chmod(os.path.join(tmp_path , BasicConfig.LOCAL_ASLOG_DIR), 0o755) Log.debug("Creating temporal remote directory...") remote_tmp_path = os.path.join(tmp_path,"LOG_"+exp_id) os.mkdir(remote_tmp_path) - os.chmod(remote_tmp_path, 0o775) + os.chmod(remote_tmp_path, 0o755) Log.debug("Creating pkl directory...") @@ -745,9 +745,22 @@ class Autosubmit: Log.debug("Creating plot directory...") os.mkdir(os.path.join(exp_id_path, "plot")) - os.chmod(os.path.join(exp_id_path, "plot"), 0o775) + os.chmod(os.path.join(exp_id_path, "plot"), 0o755) Log.result("Experiment registered successfully") Log.user_warning("Remember to MODIFY the config files!") + try: + Log.debug("Setting the right permissions...") + os.chmod(os.path.join(exp_id_path, "conf"), 0o755) + os.chmod(os.path.join(exp_id_path, "pkl"), 0o755) + os.chmod(os.path.join(exp_id_path, "tmp"), 0o755) + os.chmod(os.path.join(exp_id_path, "plot"), 0o755) + os.chmod(os.path.join(exp_id_path, "conf/autosubmit_" + str(exp_id) + ".conf"), 0o644) + os.chmod(os.path.join(exp_id_path, "conf/expdef_" + str(exp_id) + ".conf"), 0o644) + os.chmod(os.path.join(exp_id_path, "conf/jobs_" + str(exp_id) + ".conf"), 0o644) + os.chmod(os.path.join(exp_id_path, "conf/platforms_" + str(exp_id) + ".conf"), 0o644) + os.chmod(os.path.join(exp_id_path, "conf/proj_" + str(exp_id) + ".conf"), 0o644) + except: + pass return exp_id @staticmethod @@ -852,7 +865,7 @@ class Autosubmit: Log.debug("Sleep: {0}", safetysleeptime) packages_persistence = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) - os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl", "job_packages_" + expid + ".db"), 0664) + os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl", "job_packages_" + expid + ".db"), 0644) packages_persistence.reset_table(True) job_list_original = Autosubmit.load_job_list(expid, as_conf, notransitive=notransitive) @@ -1035,7 +1048,7 @@ class Autosubmit: aslogs_path = os.path.join(tmp_path, BasicConfig.LOCAL_ASLOG_DIR) if not os.path.exists(aslogs_path): os.mkdir(aslogs_path) - os.chmod(aslogs_path,0o775) + os.chmod(aslogs_path,0o755) if not os.path.exists(exp_path): Log.critical("The directory %s is needed and does not exist" % exp_path) Log.warning("Does an experiment with the given id exist?") @@ -1123,7 +1136,7 @@ class Autosubmit: "job_packages_" + expid) if as_conf.get_wrapper_type() != 'none': - os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl","job_packages_" + expid+".db"), 0664) + os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl","job_packages_" + expid+".db"), 0644) packages = packages_persistence.load() for (exp_id, package_name, job_name) in packages: if package_name not in job_list.packages_dict: @@ -1344,7 +1357,7 @@ class Autosubmit: if len(valid_packages_to_submit) > 0: jobs_id = platform.submit_Script(hold=hold) if jobs_id is None: - raise BaseException("Exiting AS being unable to get jobID") + raise BaseException("Exiting AS, AS is unable to get jobID this can be due a failure on the platform or a bad parameter on job.conf(check that queue parameter is valid for your current platform(CNS,BSC32,PRACE...)") i = 0 for package in valid_packages_to_submit: for job in package.jobs: @@ -1498,7 +1511,7 @@ class Autosubmit: packages_persistence = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) # Permissons - os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl", "job_packages_" + expid + ".db"), 0664) + os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl", "job_packages_" + expid + ".db"), 0644) #Database modification packages_persistence.reset_table(True) referenced_jobs_to_remove = set() @@ -2551,7 +2564,7 @@ class Autosubmit: year_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, str(year)) if not os.path.exists(year_path): os.mkdir(year_path) - os.chmod(year_path, 0o775) + os.chmod(year_path, 0o755) if compress: compress_type="w:gz" output_filepath = '{0}.tar.gz'.format(expid) @@ -2561,7 +2574,7 @@ class Autosubmit: with tarfile.open(os.path.join(year_path, output_filepath), compress_type) as tar: tar.add(exp_folder, arcname='') tar.close() - os.chmod(os.path.join(year_path,output_filepath), 0o775) + os.chmod(os.path.join(year_path,output_filepath), 0o755) except Exception as e: Log.critical("Can not write tar file: {0}".format(e)) return False @@ -2702,7 +2715,7 @@ class Autosubmit: aslogs_path = os.path.join(tmp_path, BasicConfig.LOCAL_ASLOG_DIR) if not os.path.exists(aslogs_path): os.mkdir(aslogs_path) - os.chmod(aslogs_path,0o775) + os.chmod(aslogs_path,0o755) if not os.path.exists(exp_path): Log.critical("The directory %s is needed and does not exist." % exp_path) Log.warning("Does an experiment with the given id exist?") @@ -3441,7 +3454,7 @@ class Autosubmit: if as_conf.get_wrapper_type() != 'none' and check_wrapper: packages_persistence = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) - os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl","job_packages_" + expid+".db"), 0664) + os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl","job_packages_" + expid+".db"), 0644) packages_persistence.reset_table(True) referenced_jobs_to_remove = set() job_list_wrappers = copy.deepcopy(job_list) diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index d9e0ec5e7..1be9228b5 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -1098,6 +1098,14 @@ class AutosubmitConfig(object): :rtype: int """ return int(self._conf_parser.get_option('wrapper', 'MAX_WRAPPED', self.get_total_jobs())) + def get_wrapper_method(self): + """ + Returns the method of make the wrapper + + :return: method + :rtype: string + """ + return self._conf_parser.get_option('wrapper', 'METHOD', 'ASThread') def get_wrapper_check_time(self): """ Returns time to check the status of jobs in the wrapper diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 482859a0a..414e7dace 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -817,7 +817,7 @@ class Job(object): template_content = template_content.replace("%%", "%") script_name = '{0}.cmd'.format(self.name) open(os.path.join(self._tmp_path, script_name), 'w').write(template_content) - os.chmod(os.path.join(self._tmp_path, script_name), 0o775) + os.chmod(os.path.join(self._tmp_path, script_name), 0o755) return script_name def create_wrapped_script(self, as_conf, wrapper_tag='wrapped'): @@ -831,7 +831,7 @@ class Job(object): template_content = template_content.replace("%%", "%") script_name = '{0}.{1}.cmd'.format(self.name, wrapper_tag) open(os.path.join(self._tmp_path, script_name), 'w').write(template_content) - os.chmod(os.path.join(self._tmp_path, script_name), 0o775) + os.chmod(os.path.join(self._tmp_path, script_name), 0o755) return script_name def check_script(self, as_conf, parameters,show_logs=False): diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 1f0423ced..a996d0c27 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -52,6 +52,7 @@ class JobPackager(object): self.max_jobs = min(self._max_wait_jobs_to_submit, self._max_jobs_to_submit) # These are defined in the [wrapper] section of autosubmit_,conf self.wrapper_type = self._as_config.get_wrapper_type() + self.wrapper_method = self._as_config.get_wrapper_method() # True or False self.jobs_in_wrapper = self._as_config.get_wrapper_jobs() @@ -212,7 +213,7 @@ class JobPackager(object): def _build_horizontal_packages(self, section_list, max_wrapped_jobs, section): packages = [] horizontal_packager = JobPackagerHorizontal(section_list, self._platform.max_processors, max_wrapped_jobs, - self.max_jobs, self._platform.processors_per_node) + self.max_jobs, self._platform.processors_per_node, self.wrapper_method) package_jobs = horizontal_packager.build_horizontal_package() @@ -224,7 +225,7 @@ class JobPackager(object): if machinefile_function == 'COMPONENTS': jobs_resources = horizontal_packager.components_dict jobs_resources['MACHINEFILES'] = machinefile_function - current_package = JobPackageHorizontal(package_jobs, jobs_resources=jobs_resources) + current_package = JobPackageHorizontal(package_jobs, jobs_resources=jobs_resources,method=self.wrapper_method) packages.append(current_package) @@ -520,7 +521,7 @@ class JobPackagerVerticalMixed(JobPackagerVertical): class JobPackagerHorizontal(object): - def __init__(self, job_list, max_processors, max_wrapped_jobs, max_jobs, processors_node): + def __init__(self, job_list, max_processors, max_wrapped_jobs, max_jobs, processors_node, method="ASThread"): self.processors_node = processors_node self.max_processors = max_processors self.max_wrapped_jobs = max_wrapped_jobs @@ -530,6 +531,7 @@ class JobPackagerHorizontal(object): self._sort_order_dict = dict() self._components_dict = dict() self._section_processors = dict() + self.method = method self._maxTotalProcessors = 0 self._sectionList = list() diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 556232357..5fd4e1a43 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -208,7 +208,7 @@ class JobPackageArray(JobPackageBase): filename += '.{0}'.format(index) input_content = self._job_scripts[self.jobs[index - 1].name] open(os.path.join(self._tmp_path, filename), 'w').write(input_content) - os.chmod(os.path.join(self._tmp_path, filename), 0o775) + os.chmod(os.path.join(self._tmp_path, filename), 0o755) return filename def _create_common_script(self, filename): @@ -217,7 +217,7 @@ class JobPackageArray(JobPackageBase): directives=self.platform.custom_directives) filename += '.cmd' open(os.path.join(self._tmp_path, filename), 'w').write(script_content) - os.chmod(os.path.join(self._tmp_path, filename), 0o775) + os.chmod(os.path.join(self._tmp_path, filename), 0o755) return filename def _send_files(self): @@ -252,7 +252,7 @@ class JobPackageThread(JobPackageBase): """ FILE_PREFIX = 'ASThread' - def __init__(self, jobs, dependency=None, jobs_resources=dict()): + def __init__(self, jobs, dependency=None, jobs_resources=dict(),method='ASThread'): super(JobPackageThread, self).__init__(jobs) self._job_scripts = {} # Seems like this one is not used at all in the class @@ -263,6 +263,7 @@ class JobPackageThread(JobPackageBase): self._jobs_resources = jobs_resources self._wrapper_factory = self.platform.wrapper self.queue = jobs[0]._queue + self.method = method #pipeline @property def name(self): @@ -301,6 +302,7 @@ class JobPackageThread(JobPackageBase): self._job_dependency = dependency def _create_scripts(self, configuration): + for i in range(1, len(self.jobs) + 1): self._job_scripts[self.jobs[i - 1].name] = self.jobs[i - 1].create_script(configuration) self.jobs[i - 1].remote_logs = ( @@ -313,7 +315,7 @@ class JobPackageThread(JobPackageBase): script_content = self._common_script_content() script_file = self.name + '.cmd' open(os.path.join(self._tmp_path, script_file), 'w').write(script_content) - os.chmod(os.path.join(self._tmp_path, script_file), 0o775) + os.chmod(os.path.join(self._tmp_path, script_file), 0o755) return script_file def _send_files(self): @@ -409,7 +411,7 @@ class JobPackageThreadWrapped(JobPackageThread): script_content = self._common_script_content() script_file = self.name + '.cmd' open(os.path.join(self._tmp_path, script_file), 'w').write(script_content) - os.chmod(os.path.join(self._tmp_path, script_file), 0o775) + os.chmod(os.path.join(self._tmp_path, script_file), 0o755) return script_file def _send_files(self): @@ -474,8 +476,9 @@ class JobPackageHorizontal(JobPackageThread): Class to manage a horizontal thread-based package of jobs to be submitted by autosubmit """ - def __init__(self, jobs, dependency=None, jobs_resources=dict()): + def __init__(self, jobs, dependency=None, jobs_resources=dict(),method='ASThread'): super(JobPackageHorizontal, self).__init__(jobs, dependency, jobs_resources) + self.method = method self._queue = self.queue for job in jobs: if job.wallclock > self._wallclock: @@ -494,7 +497,7 @@ class JobPackageHorizontal(JobPackageThread): num_processors=self._num_processors, jobs_scripts=self._jobs_scripts, dependency=self._job_dependency, jobs_resources=self._jobs_resources, expid=self._expid, rootdir=self.platform.root_dir, - directives=self._custom_directives,threads=self._threads) + directives=self._custom_directives,threads=self._threads,method=self.method.lower()) class JobPackageHybrid(JobPackageThread): """ @@ -550,3 +553,4 @@ class JobPackageHorizontalVertical(JobPackageHybrid): jobs_scripts=self._jobs_scripts, dependency=self._job_dependency, jobs_resources=self._jobs_resources, expid=self._expid, rootdir=self.platform.root_dir, directives=self._custom_directives,threads=self._threads) + diff --git a/autosubmit/platforms/headers/slurm_header.py b/autosubmit/platforms/headers/slurm_header.py index 00fd27412..1296f408f 100644 --- a/autosubmit/platforms/headers/slurm_header.py +++ b/autosubmit/platforms/headers/slurm_header.py @@ -121,43 +121,43 @@ class SlurmHeader(object): return "" SERIAL = textwrap.dedent("""\ - ############################################################################### - # %TASKTYPE% %EXPID% EXPERIMENT - ############################################################################### - # - #%QUEUE_DIRECTIVE% - #%ACCOUNT_DIRECTIVE% - #%MEMORY_DIRECTIVE% - #%TASKS_PER_NODE_DIRECTIVE% - #%THREADS% - #%NUMTASK% - #SBATCH -n %NUMPROC% - #SBATCH -t %WALLCLOCK%:00 - #SBATCH -J %JOBNAME% - #SBATCH --output=%CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%OUT_LOG_DIRECTIVE% - #SBATCH --error=%CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%ERR_LOG_DIRECTIVE% - %CUSTOM_DIRECTIVES% - # - ############################################################################### +############################################################################### +# %TASKTYPE% %EXPID% EXPERIMENT +############################################################################### +# +#%QUEUE_DIRECTIVE% +#%ACCOUNT_DIRECTIVE% +#%MEMORY_DIRECTIVE% +#%TASKS_PER_NODE_DIRECTIVE% +#%THREADS% +#%NUMTASK% +#SBATCH -n %NUMPROC% +#SBATCH -t %WALLCLOCK%:00 +#SBATCH -J %JOBNAME% +#SBATCH --output=%CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%OUT_LOG_DIRECTIVE% +#SBATCH --error=%CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%ERR_LOG_DIRECTIVE% +%CUSTOM_DIRECTIVES% +# +############################################################################### """) PARALLEL = textwrap.dedent("""\ - ############################################################################### - # %TASKTYPE% %EXPID% EXPERIMENT - ############################################################################### - # - #%QUEUE_DIRECTIVE% - #%ACCOUNT_DIRECTIVE% - #%MEMORY_DIRECTIVE% - #%MEMORY_PER_TASK_DIRECTIVE% - #%TASKS_PER_NODE_DIRECTIVE% - #%THREADS% - #SBATCH -n %NUMPROC% - #SBATCH -t %WALLCLOCK%:00 - #SBATCH -J %JOBNAME% - #SBATCH --output=%CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%OUT_LOG_DIRECTIVE% - #SBATCH --error=%CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%ERR_LOG_DIRECTIVE% - %CUSTOM_DIRECTIVES% - # - ############################################################################### - """) +############################################################################### +# %TASKTYPE% %EXPID% EXPERIMENT +############################################################################### +# +#%QUEUE_DIRECTIVE% +#%ACCOUNT_DIRECTIVE% +#%MEMORY_DIRECTIVE% +#%MEMORY_PER_TASK_DIRECTIVE% +#%TASKS_PER_NODE_DIRECTIVE% +#%THREADS% +#SBATCH -n %NUMPROC% +#SBATCH -t %WALLCLOCK%:00 +#SBATCH -J %JOBNAME% +#SBATCH --output=%CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%OUT_LOG_DIRECTIVE% +#SBATCH --error=%CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%ERR_LOG_DIRECTIVE% +%CUSTOM_DIRECTIVES% +# +############################################################################### + """) diff --git a/autosubmit/platforms/saga_platform.py b/autosubmit/platforms/saga_platform.py index 6495a7abd..fd240956e 100644 --- a/autosubmit/platforms/saga_platform.py +++ b/autosubmit/platforms/saga_platform.py @@ -40,7 +40,7 @@ class SagaPlatform(Platform): destiny_path = os.path.join(self.get_files_path(), filename) subprocess.check_call(['ecaccess-file-put', os.path.join(self.tmp_path, filename), '{0}:{1}'.format(self.host, destiny_path)]) - subprocess.check_call(['ecaccess-file-chmod', '740', '{0}:{1}'.format(self.host, destiny_path)]) + subprocess.check_call(['ecaccess-file-chmod', '750', '{0}:{1}'.format(self.host, destiny_path)]) return except subprocess.CalledProcessError: raise Exception("Could't send file {0} to {1}:{2}".format(os.path.join(self.tmp_path, filename), diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index 115abee24..a2e46b076 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -158,9 +158,13 @@ class SlurmPlatform(ParamikoPlatform): @staticmethod - def wrapper_header(filename, queue, project, wallclock, num_procs, dependency, directives, threads): - return """\ - #!/usr/bin/env python + def wrapper_header(filename, queue, project, wallclock, num_procs, dependency, directives, threads,method="#!/usr/bin/env python"): + if method =='srun': + language = "#!/bin/bash" + else: + language = "#!/usr/bin/env python" + return \ + language+""" ############################################################################### # {0} ############################################################################### diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py index 401f796d7..26dc689c8 100644 --- a/autosubmit/platforms/wrappers/wrapper_builder.py +++ b/autosubmit/platforms/wrappers/wrapper_builder.py @@ -33,9 +33,10 @@ class WrapperDirector: header = self._builder.build_header() job_thread = self._builder.build_job_thread() - - main = self._builder.build_main() - + if "bash" not in header[0:15]: + main = self._builder.build_main() + else: + nodes,main = self._builder.build_main() #What to do with nodes? # change to WrapperScript object wrapper_script = header + job_thread + main wrapper_script = wrapper_script.replace("_NEWLINE_", '\\n') @@ -66,6 +67,7 @@ class WrapperBuilder(object): def build_job_thread(self): pass + # hybrids def build_joblist_thread(self, **kwargs): pass @@ -128,7 +130,7 @@ class PythonWrapperBuilder(WrapperBuilder): import copy # Defining scripts to be run - scripts = {0} + scripts= {0} """).format(str(self.job_scripts), '\n'.ljust(13)) def build_job_thread(self): @@ -395,12 +397,14 @@ class PythonHorizontalVerticalWrapperBuilder(PythonWrapperBuilder): return joblist_thread + nodes_list + threads_launcher + class BashWrapperBuilder(WrapperBuilder): def build_imports(self): return "" def build_main(self): + return textwrap.dedent(""" # Initializing variables scripts="{0}" @@ -464,3 +468,192 @@ class BashHorizontalWrapperBuilder(BashWrapperBuilder): def build_main(self): return super(BashHorizontalWrapperBuilder, self).build_main() + self.build_parallel_threads_launcher() + +#SRUN CLASES + +class SrunWrapperBuilder(WrapperBuilder): + + def build_imports(self): + scripts_bash = "(" + + for script in self.job_scripts: + scripts_bash+=str("\""+script+"\"")+" " + scripts_bash += ")" + return textwrap.dedent(""" + + # Defining scripts to be run + declare -a scripts={0} + """).format(str(scripts_bash), '\n'.ljust(13)) + + # hybrids + def build_joblist_thread(self): + pass + + def build_job_thread(self): + return textwrap.dedent(""" """) + # horizontal and hybrids + def build_nodes_list(self): + return self.get_nodes() + self.build_cores_list() + + def get_nodes(self): + + return textwrap.dedent(""" + # Getting the list of allocated nodes + {0} + os.system("mkdir -p machinefiles") + + with open('node_list', 'r') as file: + all_nodes = file.read() + + all_nodes = all_nodes.split("_NEWLINE_") + """).format(self.allocated_nodes, '\n'.ljust(13)) + + def build_cores_list(self): + return textwrap.dedent(""" + total_cores = {0} + jobs_resources = {1} + + processors_per_node = int(jobs_resources['PROCESSORS_PER_NODE']) + + idx = 0 + all_cores = [] + while total_cores > 0: + if processors_per_node > 0: + processors_per_node -= 1 + total_cores -= 1 + all_cores.append(all_nodes[idx]) + else: + idx += 1 + processors_per_node = int(jobs_resources['PROCESSORS_PER_NODE']) + + processors_per_node = int(jobs_resources['PROCESSORS_PER_NODE']) + """).format(self.num_procs, str(self.jobs_resources), '\n'.ljust(13)) + + def build_machinefiles(self): + machinefile_function = self.get_machinefile_function() + if machinefile_function: + return self.get_machinefile_function() + self._indent(self.write_machinefiles(), self.machinefiles_indent) + return "" + + def build_machinefiles_standard(self): + return textwrap.dedent(""" + machines = str() + + cores = int(jobs_resources[section]['PROCESSORS']) + tasks = int(jobs_resources[section]['TASKS']) + nodes = int(ceil(int(cores)/float(tasks))) + if tasks < processors_per_node: + cores = tasks + + job_cores = cores + while nodes > 0: + while cores > 0: + if len(all_cores) > 0: + node = all_cores.pop(0) + if node: + machines += node +"_NEWLINE_" + cores -= 1 + for rest in range(processors_per_node-tasks): + if len(all_cores) > 0: + all_cores.pop(0) + nodes -= 1 + if tasks < processors_per_node: + cores = job_cores + """).format('\n'.ljust(13)) + + def _create_components_dict(self): + return textwrap.dedent(""" + xio_procs = int(jobs_resources[section]['COMPONENTS']['XIO_NUMPROC']) + rnf_procs = int(jobs_resources[section]['COMPONENTS']['RNF_NUMPROC']) + ifs_procs = int(jobs_resources[section]['COMPONENTS']['IFS_NUMPROC']) + nem_procs = int(jobs_resources[section]['COMPONENTS']['NEM_NUMPROC']) + + components = OrderedDict([ + ('XIO', xio_procs), + ('RNF', rnf_procs), + ('IFS', ifs_procs), + ('NEM', nem_procs) + ]) + + jobs_resources[section]['COMPONENTS'] = components + """).format('\n'.ljust(13)) + + def build_machinefiles_components(self): + return textwrap.dedent(""" + {0} + + machines = str() + for component, cores in jobs_resources[section]['COMPONENTS'].items(): + while cores > 0: + if len(all_cores) > 0: + node = all_cores.pop(0) + if node: + machines += node +"_NEWLINE_" + cores -= 1 + """).format(self._create_components_dict(), '\n'.ljust(13)) + + def write_machinefiles(self): + return textwrap.dedent(""" + machines = "_NEWLINE_".join([s for s in machines.split("_NEWLINE_") if s]) + with open("machinefiles/machinefile_"+{0}, "w") as machinefile: + machinefile.write(machines) + """).format(self.machinefiles_name, '\n'.ljust(13)) + + def build_srun_launcher(self, jobs_list, thread, footer=True): + num_procs=int(self.num_procs)/len(jobs_list) + srun_launcher = textwrap.dedent(""" + i=0 + suffix=".cmd" + for template in "${{{0}[@]}}"; do + jobname=${{template%"$suffix"}} + echo $(date +%s) > "${{jobname}}"_STAT + out="${{template}}.${{i}}.out" + err="${{template}}.${{i}}.err" + ((i=i+1)) + echo $template + srun -N 1 -n {2} $template$i$ > $out 2> $err & + echo $template$i$ + """).format(jobs_list, thread,num_procs, '\n'.ljust(13)) + if footer: + srun_launcher += self._indent(textwrap.dedent(""" + suffix_completed=".COMPLETED" + completed_filename=${{template%"$suffix"}} + completed_filename="$completed_filename"_COMPLETED + completed_path=${{pwd}}$completed_filename + if [ -f "$completed_path" ]; + then + echo "`date '+%d/%m/%Y_%H:%M:%S'` $template has been COMPLETED" + else + echo "`date '+%d/%m/%Y_%H:%M:%S'` $template has FAILED" + fi + done + wait + """).format(jobs_list, self.exit_thread, '\n'.ljust(13)),0) + else: + srun_launcher += self._indent(textwrap.dedent(""" + done + wait + """).format(jobs_list, self.exit_thread, '\n'.ljust(13)),0) + return srun_launcher + + + # all should override -> abstract! + def build_main(self): + pass + + def dependency_directive(self): + pass + + def queue_directive(self): + pass + + def _indent(self, text, amount, ch=' '): + padding = amount * ch + return ''.join(padding + line for line in text.splitlines(True)) + +class SrunHorizontalWrapperBuilder(SrunWrapperBuilder): + + def build_main(self): + nodelist = self.build_nodes_list() + srun_launcher = self.build_srun_launcher("scripts", "JobThread") + return nodelist, srun_launcher \ No newline at end of file diff --git a/autosubmit/platforms/wrappers/wrapper_factory.py b/autosubmit/platforms/wrappers/wrapper_factory.py index 549bd0dc2..d7a0db1fd 100644 --- a/autosubmit/platforms/wrappers/wrapper_factory.py +++ b/autosubmit/platforms/wrappers/wrapper_factory.py @@ -19,7 +19,8 @@ from autosubmit.platforms.wrappers.wrapper_builder import WrapperDirector, PythonVerticalWrapperBuilder, \ PythonHorizontalWrapperBuilder, PythonHorizontalVerticalWrapperBuilder, PythonVerticalHorizontalWrapperBuilder, \ - BashHorizontalWrapperBuilder, BashVerticalWrapperBuilder + BashHorizontalWrapperBuilder, BashVerticalWrapperBuilder, SrunHorizontalWrapperBuilder +from autosubmit.config.config_common import AutosubmitConfig class WrapperFactory(object): @@ -75,7 +76,11 @@ class SlurmWrapperFactory(WrapperFactory): return PythonVerticalWrapperBuilder(**kwargs) def horizontal_wrapper(self, **kwargs): - return PythonHorizontalWrapperBuilder(**kwargs) + + if kwargs["method"] == 'srun': + return SrunHorizontalWrapperBuilder(**kwargs) + else: + return PythonHorizontalWrapperBuilder(**kwargs) def hybrid_wrapper_horizontal_vertical(self, **kwargs): return PythonHorizontalVerticalWrapperBuilder(**kwargs) @@ -86,7 +91,7 @@ class SlurmWrapperFactory(WrapperFactory): def header_directives(self, **kwargs): return self.platform.wrapper_header(kwargs['name'], kwargs['queue'], kwargs['project'], kwargs['wallclock'], - kwargs['num_processors'], kwargs['dependency'], kwargs['directives'],kwargs['threads']) + kwargs['num_processors'], kwargs['dependency'], kwargs['directives'],kwargs['threads'],kwargs['method']) def allocated_nodes(self): return self.platform.allocated_nodes() diff --git a/test/unit/test_job.py b/test/unit/test_job.py index 705d32eee..61967ae8b 100644 --- a/test/unit/test_job.py +++ b/test/unit/test_job.py @@ -178,7 +178,7 @@ class TestJob(TestCase): update_content_mock.assert_called_with(config) open_mock.assert_called_with(os.path.join(self.job._tmp_path, self.job.name + '.cmd'), 'w') write_mock.write.assert_called_with('some-content: 999, 777, 666 % %') - chmod_mock.assert_called_with(os.path.join(self.job._tmp_path, self.job.name + '.cmd'), 0o775) + chmod_mock.assert_called_with(os.path.join(self.job._tmp_path, self.job.name + '.cmd'), 0o755) def test_that_check_script_returns_false_when_there_is_an_unbound_template_variable(self): # arrange -- GitLab From 6b7fa0da9100a7de3a8b184397a35c99778fa03c Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 20 Feb 2020 14:37:22 +0100 Subject: [PATCH 2/4] Added srun (working) for horizontal wrappers --- autosubmit/job/job_packager.py | 2 +- autosubmit/platforms/paramiko_platform.py | 2 +- autosubmit/platforms/slurmplatform.py | 58 +++++++++++++------ .../platforms/wrappers/wrapper_builder.py | 22 +++---- 4 files changed, 49 insertions(+), 35 deletions(-) diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index a996d0c27..57306ef43 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -52,7 +52,7 @@ class JobPackager(object): self.max_jobs = min(self._max_wait_jobs_to_submit, self._max_jobs_to_submit) # These are defined in the [wrapper] section of autosubmit_,conf self.wrapper_type = self._as_config.get_wrapper_type() - self.wrapper_method = self._as_config.get_wrapper_method() + self.wrapper_method = self._as_config.get_wrapper_method().lower() # True or False self.jobs_in_wrapper = self._as_config.get_wrapper_jobs() diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 9fe98b936..703b91e80 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -252,7 +252,7 @@ class ParamikoPlatform(Platform): self._ftpChannel.rename(os.path.join(self.get_files_path(), src), os.path.join(self.get_files_path(), dest)) else: try: - self._ftpChannel.chdir((os.path.join(self.get_files_path(), src))) + #self._ftpChannel.chdir((os.path.join(self.get_files_path(), src))) self._ftpChannel.rename(os.path.join(self.get_files_path(), src), os.path.join(self.get_files_path(),dest)) except (IOError): pass diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index a2e46b076..9298d5057 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -161,27 +161,47 @@ class SlurmPlatform(ParamikoPlatform): def wrapper_header(filename, queue, project, wallclock, num_procs, dependency, directives, threads,method="#!/usr/bin/env python"): if method =='srun': language = "#!/bin/bash" + return \ + language + """ +############################################################################### +# {0} +############################################################################### +# +#SBATCH -J {0} +{1} +#SBATCH -A {2} +#SBATCH --output={0}.out +#SBATCH --error={0}.err +#SBATCH -t {3}:00 +#SBATCH -n {4} +#SBATCH --cpus-per-task={7} +{5} +{6} +# +############################################################################### + """.format(filename, queue, project, wallclock, num_procs, dependency, + '\n'.ljust(13).join(str(s) for s in directives), threads) else: language = "#!/usr/bin/env python" - return \ - language+""" - ############################################################################### - # {0} - ############################################################################### - # - #SBATCH -J {0} - {1} - #SBATCH -A {2} - #SBATCH --output={0}.out - #SBATCH --error={0}.err - #SBATCH -t {3}:00 - #SBATCH --cpus-per-task={7} - #SBATCH -n {4} - {5} - {6} - # - ############################################################################### - """.format(filename, queue, project, wallclock, num_procs, dependency, + return \ + language+""" +############################################################################### +# {0} +############################################################################### +# +#SBATCH -J {0} +{1} +#SBATCH -A {2} +#SBATCH --output={0}.out +#SBATCH --error={0}.err +#SBATCH -t {3}:00 +#SBATCH --cpus-per-task={7} +#SBATCH -n {4} +{5} +{6} +# +############################################################################### + """.format(filename, queue, project, wallclock, num_procs, dependency, '\n'.ljust(13).join(str(s) for s in directives),threads) @staticmethod diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py index 26dc689c8..00c255870 100644 --- a/autosubmit/platforms/wrappers/wrapper_builder.py +++ b/autosubmit/platforms/wrappers/wrapper_builder.py @@ -599,27 +599,27 @@ class SrunWrapperBuilder(WrapperBuilder): machinefile.write(machines) """).format(self.machinefiles_name, '\n'.ljust(13)) - def build_srun_launcher(self, jobs_list, thread, footer=True): - num_procs=int(self.num_procs)/len(jobs_list) + def build_srun_launcher(self, jobs_list, threads, footer=True): srun_launcher = textwrap.dedent(""" i=0 suffix=".cmd" for template in "${{{0}[@]}}"; do jobname=${{template%"$suffix"}} - echo $(date +%s) > "${{jobname}}"_STAT out="${{template}}.${{i}}.out" err="${{template}}.${{i}}.err" + srun --ntasks=1 --cpus-per-task={1} $template > $out 2> $err & + sleep "0.4" ((i=i+1)) - echo $template - srun -N 1 -n {2} $template$i$ > $out 2> $err & - echo $template$i$ - """).format(jobs_list, thread,num_procs, '\n'.ljust(13)) + done + wait + """).format(jobs_list, self.threads, '\n'.ljust(13)) if footer: srun_launcher += self._indent(textwrap.dedent(""" + for template in "${{{0}[@]}}"; do suffix_completed=".COMPLETED" completed_filename=${{template%"$suffix"}} completed_filename="$completed_filename"_COMPLETED - completed_path=${{pwd}}$completed_filename + completed_path=${{PWD}}/$completed_filename if [ -f "$completed_path" ]; then echo "`date '+%d/%m/%Y_%H:%M:%S'` $template has been COMPLETED" @@ -627,13 +627,7 @@ class SrunWrapperBuilder(WrapperBuilder): echo "`date '+%d/%m/%Y_%H:%M:%S'` $template has FAILED" fi done - wait """).format(jobs_list, self.exit_thread, '\n'.ljust(13)),0) - else: - srun_launcher += self._indent(textwrap.dedent(""" - done - wait - """).format(jobs_list, self.exit_thread, '\n'.ljust(13)),0) return srun_launcher -- GitLab From 28e3413690f28029d6e3838b6cf2913ac1f5163a Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 20 Feb 2020 15:03:42 +0100 Subject: [PATCH 3/4] Pipeline fix tests --- test/unit/test_wrappers.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/unit/test_wrappers.py b/test/unit/test_wrappers.py index b0604c42d..9f64a7706 100644 --- a/test/unit/test_wrappers.py +++ b/test/unit/test_wrappers.py @@ -161,6 +161,7 @@ class TestWrappers(TestCase): self.config.get_wrapper_crossdate = Mock(return_value=False) self.config.get_remote_dependencies = Mock(return_value=False) self.config.get_wrapper_jobs = Mock(return_value='None') + self.config.get_wrapper_method = Mock(return_value='ASThread') self.job_packager = JobPackager(self.config, self.platform, self.job_list) ### ONE SECTION WRAPPER ### @@ -210,7 +211,7 @@ class TestWrappers(TestCase): self.job_packager.wrapper_type = 'vertical' returned_packages = self.job_packager._build_vertical_packages(section_list, max_wrapped_jobs) - +test_ordered_dict_jobs_running_date_mixed_wrapper package_m1_s2 = [d1_m1_1_s2, d1_m1_2_s2, d1_m1_3_s2, d1_m1_4_s2, d1_m1_5_s2, d1_m1_6_s2, d1_m1_7_s2, d1_m1_8_s2, d1_m1_9_s2, d1_m1_10_s2] package_m2_s2 = [d1_m2_1_s2, d1_m2_2_s2, d1_m2_3_s2, d1_m2_4_s2, d1_m2_5_s2, d1_m2_6_s2, d1_m2_7_s2, d1_m2_8_s2, -- GitLab From 14cfd3fa8a475c03e182d3510e9aa920413954a7 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 20 Feb 2020 15:06:42 +0100 Subject: [PATCH 4/4] Pipeline fix tests --- test/unit/test_wrappers.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/unit/test_wrappers.py b/test/unit/test_wrappers.py index 9f64a7706..a9879483b 100644 --- a/test/unit/test_wrappers.py +++ b/test/unit/test_wrappers.py @@ -162,6 +162,7 @@ class TestWrappers(TestCase): self.config.get_remote_dependencies = Mock(return_value=False) self.config.get_wrapper_jobs = Mock(return_value='None') self.config.get_wrapper_method = Mock(return_value='ASThread') + self.job_packager = JobPackager(self.config, self.platform, self.job_list) ### ONE SECTION WRAPPER ### @@ -211,7 +212,7 @@ class TestWrappers(TestCase): self.job_packager.wrapper_type = 'vertical' returned_packages = self.job_packager._build_vertical_packages(section_list, max_wrapped_jobs) -test_ordered_dict_jobs_running_date_mixed_wrapper + package_m1_s2 = [d1_m1_1_s2, d1_m1_2_s2, d1_m1_3_s2, d1_m1_4_s2, d1_m1_5_s2, d1_m1_6_s2, d1_m1_7_s2, d1_m1_8_s2, d1_m1_9_s2, d1_m1_10_s2] package_m2_s2 = [d1_m2_1_s2, d1_m2_2_s2, d1_m2_3_s2, d1_m2_4_s2, d1_m2_5_s2, d1_m2_6_s2, d1_m2_7_s2, d1_m2_8_s2, -- GitLab