diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index e5b5ece9e5de73f82f155372887c4b8645f75f08..25248b789ac819b8f5afeb69a6c5502742b53d40 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -733,11 +733,11 @@ class Autosubmit: os.mkdir(tmp_path) os.chmod(tmp_path, 0o775) os.mkdir(os.path.join(tmp_path , BasicConfig.LOCAL_ASLOG_DIR)) - os.chmod(os.path.join(tmp_path , BasicConfig.LOCAL_ASLOG_DIR), 0o775) + os.chmod(os.path.join(tmp_path , BasicConfig.LOCAL_ASLOG_DIR), 0o755) Log.debug("Creating temporal remote directory...") remote_tmp_path = os.path.join(tmp_path,"LOG_"+exp_id) os.mkdir(remote_tmp_path) - os.chmod(remote_tmp_path, 0o775) + os.chmod(remote_tmp_path, 0o755) Log.debug("Creating pkl directory...") @@ -745,9 +745,22 @@ class Autosubmit: Log.debug("Creating plot directory...") os.mkdir(os.path.join(exp_id_path, "plot")) - os.chmod(os.path.join(exp_id_path, "plot"), 0o775) + os.chmod(os.path.join(exp_id_path, "plot"), 0o755) Log.result("Experiment registered successfully") Log.user_warning("Remember to MODIFY the config files!") + try: + Log.debug("Setting the right permissions...") + os.chmod(os.path.join(exp_id_path, "conf"), 0o755) + os.chmod(os.path.join(exp_id_path, "pkl"), 0o755) + os.chmod(os.path.join(exp_id_path, "tmp"), 0o755) + os.chmod(os.path.join(exp_id_path, "plot"), 0o755) + os.chmod(os.path.join(exp_id_path, "conf/autosubmit_" + str(exp_id) + ".conf"), 0o644) + os.chmod(os.path.join(exp_id_path, "conf/expdef_" + str(exp_id) + ".conf"), 0o644) + os.chmod(os.path.join(exp_id_path, "conf/jobs_" + str(exp_id) + ".conf"), 0o644) + os.chmod(os.path.join(exp_id_path, "conf/platforms_" + str(exp_id) + ".conf"), 0o644) + os.chmod(os.path.join(exp_id_path, "conf/proj_" + str(exp_id) + ".conf"), 0o644) + except: + pass return exp_id @staticmethod @@ -852,7 +865,7 @@ class Autosubmit: Log.debug("Sleep: {0}", safetysleeptime) packages_persistence = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) - os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl", "job_packages_" + expid + ".db"), 0664) + os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl", "job_packages_" + expid + ".db"), 0644) packages_persistence.reset_table(True) job_list_original = Autosubmit.load_job_list(expid, as_conf, notransitive=notransitive) @@ -1035,7 +1048,7 @@ class Autosubmit: aslogs_path = os.path.join(tmp_path, BasicConfig.LOCAL_ASLOG_DIR) if not os.path.exists(aslogs_path): os.mkdir(aslogs_path) - os.chmod(aslogs_path,0o775) + os.chmod(aslogs_path,0o755) if not os.path.exists(exp_path): Log.critical("The directory %s is needed and does not exist" % exp_path) Log.warning("Does an experiment with the given id exist?") @@ -1123,7 +1136,7 @@ class Autosubmit: "job_packages_" + expid) if as_conf.get_wrapper_type() != 'none': - os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl","job_packages_" + expid+".db"), 0664) + os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl","job_packages_" + expid+".db"), 0644) packages = packages_persistence.load() for (exp_id, package_name, job_name) in packages: if package_name not in job_list.packages_dict: @@ -1344,7 +1357,7 @@ class Autosubmit: if len(valid_packages_to_submit) > 0: jobs_id = platform.submit_Script(hold=hold) if jobs_id is None: - raise BaseException("Exiting AS being unable to get jobID") + raise BaseException("Exiting AS, AS is unable to get jobID this can be due a failure on the platform or a bad parameter on job.conf(check that queue parameter is valid for your current platform(CNS,BSC32,PRACE...)") i = 0 for package in valid_packages_to_submit: for job in package.jobs: @@ -1498,7 +1511,7 @@ class Autosubmit: packages_persistence = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) # Permissons - os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl", "job_packages_" + expid + ".db"), 0664) + os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl", "job_packages_" + expid + ".db"), 0644) #Database modification packages_persistence.reset_table(True) referenced_jobs_to_remove = set() @@ -2551,7 +2564,7 @@ class Autosubmit: year_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, str(year)) if not os.path.exists(year_path): os.mkdir(year_path) - os.chmod(year_path, 0o775) + os.chmod(year_path, 0o755) if compress: compress_type="w:gz" output_filepath = '{0}.tar.gz'.format(expid) @@ -2561,7 +2574,7 @@ class Autosubmit: with tarfile.open(os.path.join(year_path, output_filepath), compress_type) as tar: tar.add(exp_folder, arcname='') tar.close() - os.chmod(os.path.join(year_path,output_filepath), 0o775) + os.chmod(os.path.join(year_path,output_filepath), 0o755) except Exception as e: Log.critical("Can not write tar file: {0}".format(e)) return False @@ -2702,7 +2715,7 @@ class Autosubmit: aslogs_path = os.path.join(tmp_path, BasicConfig.LOCAL_ASLOG_DIR) if not os.path.exists(aslogs_path): os.mkdir(aslogs_path) - os.chmod(aslogs_path,0o775) + os.chmod(aslogs_path,0o755) if not os.path.exists(exp_path): Log.critical("The directory %s is needed and does not exist." % exp_path) Log.warning("Does an experiment with the given id exist?") @@ -3441,7 +3454,7 @@ class Autosubmit: if as_conf.get_wrapper_type() != 'none' and check_wrapper: packages_persistence = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) - os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl","job_packages_" + expid+".db"), 0664) + os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl","job_packages_" + expid+".db"), 0644) packages_persistence.reset_table(True) referenced_jobs_to_remove = set() job_list_wrappers = copy.deepcopy(job_list) diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index d9e0ec5e76564b8af9a7825269600b7aadf2e9b5..1be9228b5efdcdf193775c3b9f39e104a19138f1 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -1098,6 +1098,14 @@ class AutosubmitConfig(object): :rtype: int """ return int(self._conf_parser.get_option('wrapper', 'MAX_WRAPPED', self.get_total_jobs())) + def get_wrapper_method(self): + """ + Returns the method of make the wrapper + + :return: method + :rtype: string + """ + return self._conf_parser.get_option('wrapper', 'METHOD', 'ASThread') def get_wrapper_check_time(self): """ Returns time to check the status of jobs in the wrapper diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 482859a0a159783a97f0573f8b6a2a7d71e63826..414e7daceb44269d88331b63cfda8f2706a73f19 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -817,7 +817,7 @@ class Job(object): template_content = template_content.replace("%%", "%") script_name = '{0}.cmd'.format(self.name) open(os.path.join(self._tmp_path, script_name), 'w').write(template_content) - os.chmod(os.path.join(self._tmp_path, script_name), 0o775) + os.chmod(os.path.join(self._tmp_path, script_name), 0o755) return script_name def create_wrapped_script(self, as_conf, wrapper_tag='wrapped'): @@ -831,7 +831,7 @@ class Job(object): template_content = template_content.replace("%%", "%") script_name = '{0}.{1}.cmd'.format(self.name, wrapper_tag) open(os.path.join(self._tmp_path, script_name), 'w').write(template_content) - os.chmod(os.path.join(self._tmp_path, script_name), 0o775) + os.chmod(os.path.join(self._tmp_path, script_name), 0o755) return script_name def check_script(self, as_conf, parameters,show_logs=False): diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 1f0423cedd9573a31f5917ffba31e5c7fbfef596..57306ef430f36382333164d0e62a6450fba418d7 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -52,6 +52,7 @@ class JobPackager(object): self.max_jobs = min(self._max_wait_jobs_to_submit, self._max_jobs_to_submit) # These are defined in the [wrapper] section of autosubmit_,conf self.wrapper_type = self._as_config.get_wrapper_type() + self.wrapper_method = self._as_config.get_wrapper_method().lower() # True or False self.jobs_in_wrapper = self._as_config.get_wrapper_jobs() @@ -212,7 +213,7 @@ class JobPackager(object): def _build_horizontal_packages(self, section_list, max_wrapped_jobs, section): packages = [] horizontal_packager = JobPackagerHorizontal(section_list, self._platform.max_processors, max_wrapped_jobs, - self.max_jobs, self._platform.processors_per_node) + self.max_jobs, self._platform.processors_per_node, self.wrapper_method) package_jobs = horizontal_packager.build_horizontal_package() @@ -224,7 +225,7 @@ class JobPackager(object): if machinefile_function == 'COMPONENTS': jobs_resources = horizontal_packager.components_dict jobs_resources['MACHINEFILES'] = machinefile_function - current_package = JobPackageHorizontal(package_jobs, jobs_resources=jobs_resources) + current_package = JobPackageHorizontal(package_jobs, jobs_resources=jobs_resources,method=self.wrapper_method) packages.append(current_package) @@ -520,7 +521,7 @@ class JobPackagerVerticalMixed(JobPackagerVertical): class JobPackagerHorizontal(object): - def __init__(self, job_list, max_processors, max_wrapped_jobs, max_jobs, processors_node): + def __init__(self, job_list, max_processors, max_wrapped_jobs, max_jobs, processors_node, method="ASThread"): self.processors_node = processors_node self.max_processors = max_processors self.max_wrapped_jobs = max_wrapped_jobs @@ -530,6 +531,7 @@ class JobPackagerHorizontal(object): self._sort_order_dict = dict() self._components_dict = dict() self._section_processors = dict() + self.method = method self._maxTotalProcessors = 0 self._sectionList = list() diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 556232357224389973ff5223807a0c4e57d2a7ec..5fd4e1a434793d6073006c8edbc041f6e7065c55 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -208,7 +208,7 @@ class JobPackageArray(JobPackageBase): filename += '.{0}'.format(index) input_content = self._job_scripts[self.jobs[index - 1].name] open(os.path.join(self._tmp_path, filename), 'w').write(input_content) - os.chmod(os.path.join(self._tmp_path, filename), 0o775) + os.chmod(os.path.join(self._tmp_path, filename), 0o755) return filename def _create_common_script(self, filename): @@ -217,7 +217,7 @@ class JobPackageArray(JobPackageBase): directives=self.platform.custom_directives) filename += '.cmd' open(os.path.join(self._tmp_path, filename), 'w').write(script_content) - os.chmod(os.path.join(self._tmp_path, filename), 0o775) + os.chmod(os.path.join(self._tmp_path, filename), 0o755) return filename def _send_files(self): @@ -252,7 +252,7 @@ class JobPackageThread(JobPackageBase): """ FILE_PREFIX = 'ASThread' - def __init__(self, jobs, dependency=None, jobs_resources=dict()): + def __init__(self, jobs, dependency=None, jobs_resources=dict(),method='ASThread'): super(JobPackageThread, self).__init__(jobs) self._job_scripts = {} # Seems like this one is not used at all in the class @@ -263,6 +263,7 @@ class JobPackageThread(JobPackageBase): self._jobs_resources = jobs_resources self._wrapper_factory = self.platform.wrapper self.queue = jobs[0]._queue + self.method = method #pipeline @property def name(self): @@ -301,6 +302,7 @@ class JobPackageThread(JobPackageBase): self._job_dependency = dependency def _create_scripts(self, configuration): + for i in range(1, len(self.jobs) + 1): self._job_scripts[self.jobs[i - 1].name] = self.jobs[i - 1].create_script(configuration) self.jobs[i - 1].remote_logs = ( @@ -313,7 +315,7 @@ class JobPackageThread(JobPackageBase): script_content = self._common_script_content() script_file = self.name + '.cmd' open(os.path.join(self._tmp_path, script_file), 'w').write(script_content) - os.chmod(os.path.join(self._tmp_path, script_file), 0o775) + os.chmod(os.path.join(self._tmp_path, script_file), 0o755) return script_file def _send_files(self): @@ -409,7 +411,7 @@ class JobPackageThreadWrapped(JobPackageThread): script_content = self._common_script_content() script_file = self.name + '.cmd' open(os.path.join(self._tmp_path, script_file), 'w').write(script_content) - os.chmod(os.path.join(self._tmp_path, script_file), 0o775) + os.chmod(os.path.join(self._tmp_path, script_file), 0o755) return script_file def _send_files(self): @@ -474,8 +476,9 @@ class JobPackageHorizontal(JobPackageThread): Class to manage a horizontal thread-based package of jobs to be submitted by autosubmit """ - def __init__(self, jobs, dependency=None, jobs_resources=dict()): + def __init__(self, jobs, dependency=None, jobs_resources=dict(),method='ASThread'): super(JobPackageHorizontal, self).__init__(jobs, dependency, jobs_resources) + self.method = method self._queue = self.queue for job in jobs: if job.wallclock > self._wallclock: @@ -494,7 +497,7 @@ class JobPackageHorizontal(JobPackageThread): num_processors=self._num_processors, jobs_scripts=self._jobs_scripts, dependency=self._job_dependency, jobs_resources=self._jobs_resources, expid=self._expid, rootdir=self.platform.root_dir, - directives=self._custom_directives,threads=self._threads) + directives=self._custom_directives,threads=self._threads,method=self.method.lower()) class JobPackageHybrid(JobPackageThread): """ @@ -550,3 +553,4 @@ class JobPackageHorizontalVertical(JobPackageHybrid): jobs_scripts=self._jobs_scripts, dependency=self._job_dependency, jobs_resources=self._jobs_resources, expid=self._expid, rootdir=self.platform.root_dir, directives=self._custom_directives,threads=self._threads) + diff --git a/autosubmit/platforms/headers/slurm_header.py b/autosubmit/platforms/headers/slurm_header.py index 00fd274129d79f4ad2dfdf93603f9710a46cbfe9..1296f408fa019cc4b1829f64c8123dd0456ba660 100644 --- a/autosubmit/platforms/headers/slurm_header.py +++ b/autosubmit/platforms/headers/slurm_header.py @@ -121,43 +121,43 @@ class SlurmHeader(object): return "" SERIAL = textwrap.dedent("""\ - ############################################################################### - # %TASKTYPE% %EXPID% EXPERIMENT - ############################################################################### - # - #%QUEUE_DIRECTIVE% - #%ACCOUNT_DIRECTIVE% - #%MEMORY_DIRECTIVE% - #%TASKS_PER_NODE_DIRECTIVE% - #%THREADS% - #%NUMTASK% - #SBATCH -n %NUMPROC% - #SBATCH -t %WALLCLOCK%:00 - #SBATCH -J %JOBNAME% - #SBATCH --output=%CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%OUT_LOG_DIRECTIVE% - #SBATCH --error=%CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%ERR_LOG_DIRECTIVE% - %CUSTOM_DIRECTIVES% - # - ############################################################################### +############################################################################### +# %TASKTYPE% %EXPID% EXPERIMENT +############################################################################### +# +#%QUEUE_DIRECTIVE% +#%ACCOUNT_DIRECTIVE% +#%MEMORY_DIRECTIVE% +#%TASKS_PER_NODE_DIRECTIVE% +#%THREADS% +#%NUMTASK% +#SBATCH -n %NUMPROC% +#SBATCH -t %WALLCLOCK%:00 +#SBATCH -J %JOBNAME% +#SBATCH --output=%CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%OUT_LOG_DIRECTIVE% +#SBATCH --error=%CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%ERR_LOG_DIRECTIVE% +%CUSTOM_DIRECTIVES% +# +############################################################################### """) PARALLEL = textwrap.dedent("""\ - ############################################################################### - # %TASKTYPE% %EXPID% EXPERIMENT - ############################################################################### - # - #%QUEUE_DIRECTIVE% - #%ACCOUNT_DIRECTIVE% - #%MEMORY_DIRECTIVE% - #%MEMORY_PER_TASK_DIRECTIVE% - #%TASKS_PER_NODE_DIRECTIVE% - #%THREADS% - #SBATCH -n %NUMPROC% - #SBATCH -t %WALLCLOCK%:00 - #SBATCH -J %JOBNAME% - #SBATCH --output=%CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%OUT_LOG_DIRECTIVE% - #SBATCH --error=%CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%ERR_LOG_DIRECTIVE% - %CUSTOM_DIRECTIVES% - # - ############################################################################### - """) +############################################################################### +# %TASKTYPE% %EXPID% EXPERIMENT +############################################################################### +# +#%QUEUE_DIRECTIVE% +#%ACCOUNT_DIRECTIVE% +#%MEMORY_DIRECTIVE% +#%MEMORY_PER_TASK_DIRECTIVE% +#%TASKS_PER_NODE_DIRECTIVE% +#%THREADS% +#SBATCH -n %NUMPROC% +#SBATCH -t %WALLCLOCK%:00 +#SBATCH -J %JOBNAME% +#SBATCH --output=%CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%OUT_LOG_DIRECTIVE% +#SBATCH --error=%CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%ERR_LOG_DIRECTIVE% +%CUSTOM_DIRECTIVES% +# +############################################################################### + """) diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 9fe98b936a8e9e41c0a09e3eaf86fc214881ac24..703b91e8085df9372e675f43af0412b876c57575 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -252,7 +252,7 @@ class ParamikoPlatform(Platform): self._ftpChannel.rename(os.path.join(self.get_files_path(), src), os.path.join(self.get_files_path(), dest)) else: try: - self._ftpChannel.chdir((os.path.join(self.get_files_path(), src))) + #self._ftpChannel.chdir((os.path.join(self.get_files_path(), src))) self._ftpChannel.rename(os.path.join(self.get_files_path(), src), os.path.join(self.get_files_path(),dest)) except (IOError): pass diff --git a/autosubmit/platforms/saga_platform.py b/autosubmit/platforms/saga_platform.py index 6495a7abd9aaae3bc25b42e364dd2025b5f92895..fd240956eaface29021a98e0b8f7d1fc578d9bf2 100644 --- a/autosubmit/platforms/saga_platform.py +++ b/autosubmit/platforms/saga_platform.py @@ -40,7 +40,7 @@ class SagaPlatform(Platform): destiny_path = os.path.join(self.get_files_path(), filename) subprocess.check_call(['ecaccess-file-put', os.path.join(self.tmp_path, filename), '{0}:{1}'.format(self.host, destiny_path)]) - subprocess.check_call(['ecaccess-file-chmod', '740', '{0}:{1}'.format(self.host, destiny_path)]) + subprocess.check_call(['ecaccess-file-chmod', '750', '{0}:{1}'.format(self.host, destiny_path)]) return except subprocess.CalledProcessError: raise Exception("Could't send file {0} to {1}:{2}".format(os.path.join(self.tmp_path, filename), diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index 115abee24c066d6c969dbf4add980b85dfbe0323..9298d50579606fc4cd1235b19d3fd27b0ef00242 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -158,26 +158,50 @@ class SlurmPlatform(ParamikoPlatform): @staticmethod - def wrapper_header(filename, queue, project, wallclock, num_procs, dependency, directives, threads): - return """\ - #!/usr/bin/env python - ############################################################################### - # {0} - ############################################################################### - # - #SBATCH -J {0} - {1} - #SBATCH -A {2} - #SBATCH --output={0}.out - #SBATCH --error={0}.err - #SBATCH -t {3}:00 - #SBATCH --cpus-per-task={7} - #SBATCH -n {4} - {5} - {6} - # - ############################################################################### - """.format(filename, queue, project, wallclock, num_procs, dependency, + def wrapper_header(filename, queue, project, wallclock, num_procs, dependency, directives, threads,method="#!/usr/bin/env python"): + if method =='srun': + language = "#!/bin/bash" + return \ + language + """ +############################################################################### +# {0} +############################################################################### +# +#SBATCH -J {0} +{1} +#SBATCH -A {2} +#SBATCH --output={0}.out +#SBATCH --error={0}.err +#SBATCH -t {3}:00 +#SBATCH -n {4} +#SBATCH --cpus-per-task={7} +{5} +{6} +# +############################################################################### + """.format(filename, queue, project, wallclock, num_procs, dependency, + '\n'.ljust(13).join(str(s) for s in directives), threads) + else: + language = "#!/usr/bin/env python" + return \ + language+""" +############################################################################### +# {0} +############################################################################### +# +#SBATCH -J {0} +{1} +#SBATCH -A {2} +#SBATCH --output={0}.out +#SBATCH --error={0}.err +#SBATCH -t {3}:00 +#SBATCH --cpus-per-task={7} +#SBATCH -n {4} +{5} +{6} +# +############################################################################### + """.format(filename, queue, project, wallclock, num_procs, dependency, '\n'.ljust(13).join(str(s) for s in directives),threads) @staticmethod diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py index 401f796d79f3d756f4d7bddf1e2b981e921a84ae..00c2558709c2083115b2c5e1b878afb77f0b1f8f 100644 --- a/autosubmit/platforms/wrappers/wrapper_builder.py +++ b/autosubmit/platforms/wrappers/wrapper_builder.py @@ -33,9 +33,10 @@ class WrapperDirector: header = self._builder.build_header() job_thread = self._builder.build_job_thread() - - main = self._builder.build_main() - + if "bash" not in header[0:15]: + main = self._builder.build_main() + else: + nodes,main = self._builder.build_main() #What to do with nodes? # change to WrapperScript object wrapper_script = header + job_thread + main wrapper_script = wrapper_script.replace("_NEWLINE_", '\\n') @@ -66,6 +67,7 @@ class WrapperBuilder(object): def build_job_thread(self): pass + # hybrids def build_joblist_thread(self, **kwargs): pass @@ -128,7 +130,7 @@ class PythonWrapperBuilder(WrapperBuilder): import copy # Defining scripts to be run - scripts = {0} + scripts= {0} """).format(str(self.job_scripts), '\n'.ljust(13)) def build_job_thread(self): @@ -395,12 +397,14 @@ class PythonHorizontalVerticalWrapperBuilder(PythonWrapperBuilder): return joblist_thread + nodes_list + threads_launcher + class BashWrapperBuilder(WrapperBuilder): def build_imports(self): return "" def build_main(self): + return textwrap.dedent(""" # Initializing variables scripts="{0}" @@ -464,3 +468,186 @@ class BashHorizontalWrapperBuilder(BashWrapperBuilder): def build_main(self): return super(BashHorizontalWrapperBuilder, self).build_main() + self.build_parallel_threads_launcher() + +#SRUN CLASES + +class SrunWrapperBuilder(WrapperBuilder): + + def build_imports(self): + scripts_bash = "(" + + for script in self.job_scripts: + scripts_bash+=str("\""+script+"\"")+" " + scripts_bash += ")" + return textwrap.dedent(""" + + # Defining scripts to be run + declare -a scripts={0} + """).format(str(scripts_bash), '\n'.ljust(13)) + + # hybrids + def build_joblist_thread(self): + pass + + def build_job_thread(self): + return textwrap.dedent(""" """) + # horizontal and hybrids + def build_nodes_list(self): + return self.get_nodes() + self.build_cores_list() + + def get_nodes(self): + + return textwrap.dedent(""" + # Getting the list of allocated nodes + {0} + os.system("mkdir -p machinefiles") + + with open('node_list', 'r') as file: + all_nodes = file.read() + + all_nodes = all_nodes.split("_NEWLINE_") + """).format(self.allocated_nodes, '\n'.ljust(13)) + + def build_cores_list(self): + return textwrap.dedent(""" + total_cores = {0} + jobs_resources = {1} + + processors_per_node = int(jobs_resources['PROCESSORS_PER_NODE']) + + idx = 0 + all_cores = [] + while total_cores > 0: + if processors_per_node > 0: + processors_per_node -= 1 + total_cores -= 1 + all_cores.append(all_nodes[idx]) + else: + idx += 1 + processors_per_node = int(jobs_resources['PROCESSORS_PER_NODE']) + + processors_per_node = int(jobs_resources['PROCESSORS_PER_NODE']) + """).format(self.num_procs, str(self.jobs_resources), '\n'.ljust(13)) + + def build_machinefiles(self): + machinefile_function = self.get_machinefile_function() + if machinefile_function: + return self.get_machinefile_function() + self._indent(self.write_machinefiles(), self.machinefiles_indent) + return "" + + def build_machinefiles_standard(self): + return textwrap.dedent(""" + machines = str() + + cores = int(jobs_resources[section]['PROCESSORS']) + tasks = int(jobs_resources[section]['TASKS']) + nodes = int(ceil(int(cores)/float(tasks))) + if tasks < processors_per_node: + cores = tasks + + job_cores = cores + while nodes > 0: + while cores > 0: + if len(all_cores) > 0: + node = all_cores.pop(0) + if node: + machines += node +"_NEWLINE_" + cores -= 1 + for rest in range(processors_per_node-tasks): + if len(all_cores) > 0: + all_cores.pop(0) + nodes -= 1 + if tasks < processors_per_node: + cores = job_cores + """).format('\n'.ljust(13)) + + def _create_components_dict(self): + return textwrap.dedent(""" + xio_procs = int(jobs_resources[section]['COMPONENTS']['XIO_NUMPROC']) + rnf_procs = int(jobs_resources[section]['COMPONENTS']['RNF_NUMPROC']) + ifs_procs = int(jobs_resources[section]['COMPONENTS']['IFS_NUMPROC']) + nem_procs = int(jobs_resources[section]['COMPONENTS']['NEM_NUMPROC']) + + components = OrderedDict([ + ('XIO', xio_procs), + ('RNF', rnf_procs), + ('IFS', ifs_procs), + ('NEM', nem_procs) + ]) + + jobs_resources[section]['COMPONENTS'] = components + """).format('\n'.ljust(13)) + + def build_machinefiles_components(self): + return textwrap.dedent(""" + {0} + + machines = str() + for component, cores in jobs_resources[section]['COMPONENTS'].items(): + while cores > 0: + if len(all_cores) > 0: + node = all_cores.pop(0) + if node: + machines += node +"_NEWLINE_" + cores -= 1 + """).format(self._create_components_dict(), '\n'.ljust(13)) + + def write_machinefiles(self): + return textwrap.dedent(""" + machines = "_NEWLINE_".join([s for s in machines.split("_NEWLINE_") if s]) + with open("machinefiles/machinefile_"+{0}, "w") as machinefile: + machinefile.write(machines) + """).format(self.machinefiles_name, '\n'.ljust(13)) + + def build_srun_launcher(self, jobs_list, threads, footer=True): + srun_launcher = textwrap.dedent(""" + i=0 + suffix=".cmd" + for template in "${{{0}[@]}}"; do + jobname=${{template%"$suffix"}} + out="${{template}}.${{i}}.out" + err="${{template}}.${{i}}.err" + srun --ntasks=1 --cpus-per-task={1} $template > $out 2> $err & + sleep "0.4" + ((i=i+1)) + done + wait + """).format(jobs_list, self.threads, '\n'.ljust(13)) + if footer: + srun_launcher += self._indent(textwrap.dedent(""" + for template in "${{{0}[@]}}"; do + suffix_completed=".COMPLETED" + completed_filename=${{template%"$suffix"}} + completed_filename="$completed_filename"_COMPLETED + completed_path=${{PWD}}/$completed_filename + if [ -f "$completed_path" ]; + then + echo "`date '+%d/%m/%Y_%H:%M:%S'` $template has been COMPLETED" + else + echo "`date '+%d/%m/%Y_%H:%M:%S'` $template has FAILED" + fi + done + """).format(jobs_list, self.exit_thread, '\n'.ljust(13)),0) + return srun_launcher + + + # all should override -> abstract! + def build_main(self): + pass + + def dependency_directive(self): + pass + + def queue_directive(self): + pass + + def _indent(self, text, amount, ch=' '): + padding = amount * ch + return ''.join(padding + line for line in text.splitlines(True)) + +class SrunHorizontalWrapperBuilder(SrunWrapperBuilder): + + def build_main(self): + nodelist = self.build_nodes_list() + srun_launcher = self.build_srun_launcher("scripts", "JobThread") + return nodelist, srun_launcher \ No newline at end of file diff --git a/autosubmit/platforms/wrappers/wrapper_factory.py b/autosubmit/platforms/wrappers/wrapper_factory.py index 549bd0dc255a654c51ba75acfedc938cd0e63de4..d7a0db1fd97f04fa00fdabc4d497c08cca7e7eec 100644 --- a/autosubmit/platforms/wrappers/wrapper_factory.py +++ b/autosubmit/platforms/wrappers/wrapper_factory.py @@ -19,7 +19,8 @@ from autosubmit.platforms.wrappers.wrapper_builder import WrapperDirector, PythonVerticalWrapperBuilder, \ PythonHorizontalWrapperBuilder, PythonHorizontalVerticalWrapperBuilder, PythonVerticalHorizontalWrapperBuilder, \ - BashHorizontalWrapperBuilder, BashVerticalWrapperBuilder + BashHorizontalWrapperBuilder, BashVerticalWrapperBuilder, SrunHorizontalWrapperBuilder +from autosubmit.config.config_common import AutosubmitConfig class WrapperFactory(object): @@ -75,7 +76,11 @@ class SlurmWrapperFactory(WrapperFactory): return PythonVerticalWrapperBuilder(**kwargs) def horizontal_wrapper(self, **kwargs): - return PythonHorizontalWrapperBuilder(**kwargs) + + if kwargs["method"] == 'srun': + return SrunHorizontalWrapperBuilder(**kwargs) + else: + return PythonHorizontalWrapperBuilder(**kwargs) def hybrid_wrapper_horizontal_vertical(self, **kwargs): return PythonHorizontalVerticalWrapperBuilder(**kwargs) @@ -86,7 +91,7 @@ class SlurmWrapperFactory(WrapperFactory): def header_directives(self, **kwargs): return self.platform.wrapper_header(kwargs['name'], kwargs['queue'], kwargs['project'], kwargs['wallclock'], - kwargs['num_processors'], kwargs['dependency'], kwargs['directives'],kwargs['threads']) + kwargs['num_processors'], kwargs['dependency'], kwargs['directives'],kwargs['threads'],kwargs['method']) def allocated_nodes(self): return self.platform.allocated_nodes() diff --git a/test/unit/test_job.py b/test/unit/test_job.py index 705d32eeef88260d1a5e52e1dd32d44846e30e30..61967ae8b0b534d181047d6c4819ee3928e50f23 100644 --- a/test/unit/test_job.py +++ b/test/unit/test_job.py @@ -178,7 +178,7 @@ class TestJob(TestCase): update_content_mock.assert_called_with(config) open_mock.assert_called_with(os.path.join(self.job._tmp_path, self.job.name + '.cmd'), 'w') write_mock.write.assert_called_with('some-content: 999, 777, 666 % %') - chmod_mock.assert_called_with(os.path.join(self.job._tmp_path, self.job.name + '.cmd'), 0o775) + chmod_mock.assert_called_with(os.path.join(self.job._tmp_path, self.job.name + '.cmd'), 0o755) def test_that_check_script_returns_false_when_there_is_an_unbound_template_variable(self): # arrange diff --git a/test/unit/test_wrappers.py b/test/unit/test_wrappers.py index b0604c42d8612c77ad9be54e296f7114d971fe00..a9879483bad52fed3895e09a6514418a21c59d2c 100644 --- a/test/unit/test_wrappers.py +++ b/test/unit/test_wrappers.py @@ -161,6 +161,8 @@ class TestWrappers(TestCase): self.config.get_wrapper_crossdate = Mock(return_value=False) self.config.get_remote_dependencies = Mock(return_value=False) self.config.get_wrapper_jobs = Mock(return_value='None') + self.config.get_wrapper_method = Mock(return_value='ASThread') + self.job_packager = JobPackager(self.config, self.platform, self.job_list) ### ONE SECTION WRAPPER ###