diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 57306ef430f36382333164d0e62a6450fba418d7..2c3c4c7209a0ba930f547abe84c343942b41cd45 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -325,7 +325,7 @@ class JobPackager(object): total_wallclock = sum_str_hours(total_wallclock, job.wallclock) return JobPackageVerticalHorizontal(current_package, total_processors, total_wallclock, - jobs_resources=jobs_resources) + jobs_resources=jobs_resources,method=self.wrapper_method) class JobPackagerVertical(object): diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 1e8fe147d294c9698a16cab20a29a19da5868877..916c43f302e82b3ae6d30811c4e8aa6bfd590945 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -504,10 +504,11 @@ class JobPackageHybrid(JobPackageThread): Class to manage a hybrid (horizontal and vertical) thread-based package of jobs to be submitted by autosubmit """ - def __init__(self, jobs, num_processors, total_wallclock, dependency=None, jobs_resources=dict()): + def __init__(self, jobs, num_processors, total_wallclock, dependency=None, jobs_resources=dict(),method="ASThread"): all_jobs = [item for sublist in jobs for item in sublist] #flatten list - super(JobPackageHybrid, self).__init__(all_jobs, dependency, jobs_resources) + super(JobPackageHybrid, self).__init__(all_jobs, dependency, jobs_resources,method) self.jobs_lists = jobs + self.method=method self._num_processors = int(num_processors) self._threads = all_jobs[0].threads self._wallclock = total_wallclock diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index 9298d50579606fc4cd1235b19d3fd27b0ef00242..aa97d3bd556610910ece82bea36b22e25925e11a 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -158,7 +158,7 @@ class SlurmPlatform(ParamikoPlatform): @staticmethod - def wrapper_header(filename, queue, project, wallclock, num_procs, dependency, directives, threads,method="#!/usr/bin/env python"): + def wrapper_header(filename, queue, project, wallclock, num_procs, dependency, directives, threads,method="asthreads"): if method =='srun': language = "#!/bin/bash" return \ diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py index 00c2558709c2083115b2c5e1b878afb77f0b1f8f..5b49013b631774d398f9c067ce3478d8427b74a6 100644 --- a/autosubmit/platforms/wrappers/wrapper_builder.py +++ b/autosubmit/platforms/wrappers/wrapper_builder.py @@ -474,16 +474,7 @@ class BashHorizontalWrapperBuilder(BashWrapperBuilder): class SrunWrapperBuilder(WrapperBuilder): def build_imports(self): - scripts_bash = "(" - - for script in self.job_scripts: - scripts_bash+=str("\""+script+"\"")+" " - scripts_bash += ")" - return textwrap.dedent(""" - - # Defining scripts to be run - declare -a scripts={0} - """).format(str(scripts_bash), '\n'.ljust(13)) + pass # hybrids def build_joblist_thread(self): @@ -599,7 +590,36 @@ class SrunWrapperBuilder(WrapperBuilder): machinefile.write(machines) """).format(self.machinefiles_name, '\n'.ljust(13)) - def build_srun_launcher(self, jobs_list, threads, footer=True): + def build_srun_launcher(self, jobs_list, footer=True): + pass + + + # all should override -> abstract! + def build_main(self): + pass + + def dependency_directive(self): + pass + + def queue_directive(self): + pass + + def _indent(self, text, amount, ch=' '): + padding = amount * ch + return ''.join(padding + line for line in text.splitlines(True)) + +class SrunHorizontalWrapperBuilder(SrunWrapperBuilder): + def build_imports(self): + scripts_bash = "(" + for script in self.job_scripts: + scripts_bash+=str("\""+script+"\"")+" " + scripts_bash += ")" + return textwrap.dedent(""" + # Defining scripts to be run + declare -a scripts={0} + """).format(str(scripts_bash), '\n'.ljust(13)) + + def build_srun_launcher(self, jobs_list, footer=True): srun_launcher = textwrap.dedent(""" i=0 suffix=".cmd" @@ -608,7 +628,7 @@ class SrunWrapperBuilder(WrapperBuilder): out="${{template}}.${{i}}.out" err="${{template}}.${{i}}.err" srun --ntasks=1 --cpus-per-task={1} $template > $out 2> $err & - sleep "0.4" + sleep "0.2" ((i=i+1)) done wait @@ -630,24 +650,90 @@ class SrunWrapperBuilder(WrapperBuilder): """).format(jobs_list, self.exit_thread, '\n'.ljust(13)),0) return srun_launcher - - # all should override -> abstract! def build_main(self): - pass - - def dependency_directive(self): - pass - - def queue_directive(self): - pass + nodelist = self.build_nodes_list() + srun_launcher = self.build_srun_launcher("scripts") + return nodelist, srun_launcher - def _indent(self, text, amount, ch=' '): - padding = amount * ch - return ''.join(padding + line for line in text.splitlines(True)) +class SrunVerticalHorizontalWrapperBuilder(SrunWrapperBuilder): + def build_imports(self): + scripts_bash = textwrap.dedent(""" + # Defining scripts to be run""") + list_index=0 + scripts_array_vars = "( " + scripts_array_index = "( " + for scripts in self.job_scripts: + built_array = "(" + for script in scripts: + built_array+= str("\"" + script + "\"") + " " + built_array += ")" + scripts_bash+=textwrap.dedent(""" + declare -a scripts_{0}={1} + """).format(str(list_index),str(built_array), '\n'.ljust(13)) + scripts_array_vars += "\"scripts_{0}\" ".format(list_index) + scripts_array_index += "\"0\" ".format(list_index) + list_index += 1 + scripts_array_vars += ")" + scripts_array_index += ")" + scripts_bash += textwrap.dedent(""" + declare -a scripts_list={0} + declare -a scripts_index={1} + """).format(str(scripts_array_vars),str(scripts_array_index), '\n'.ljust(13)) + return scripts_bash + + def build_srun_launcher(self, jobs_list, footer=True): + srun_launcher = textwrap.dedent(""" + suffix=".cmd" + suffix_completed=".COMPLETED" + aux_scripts=("${{{0}[@]}}") + while [ "${{#aux_scripts[@]}}" -gt 0 ]; do + i_list=0 + prev_completed_path="" + for script_list in "${{{0}[@]}}"; do + declare -i i=${{scripts_index[$i_list]}} + if [ $i -ne -1 ]; then + declare -n scripts=$script_list + template=${{scripts[$i]}} + jobname=${{template%"$suffix"}} + out="${{template}}.${{i}}.out" + err="${{template}}.${{i}}.err" + if [ $i -eq 0 ]; then + completed_filename=${{template%"$suffix"}} + prev_template=$template + else + prev_template_index=$((i-1)) + prev_template=${{scripts[$prev_template_index]}} + completed_filename=${{prev_template%"$suffix"}} + fi + completed_filename="$completed_filename"_COMPLETED + completed_path=${{PWD}}/$completed_filename + + if [ -f "$completed_path" ]; + then + echo "`date '+%d/%m/%Y_%H:%M:%S'` $prev_template has been COMPLETED" + if [ $i -ge "${{#scripts[@]}}" ]; then + unset aux_scripts[$i_list] + i="-1" + fi + fi + if [ $i -lt "${{#scripts[@]}}" ]; then + if [ $i -eq 0 ] || [ -f "$completed_path" ] ; then + srun --ntasks=1 --cpus-per-task={1} $template > $out 2> $err & + ((i=i+1)) + fi + fi + sleep "0.2" + scripts_index[$i_list]=$i + ((i_list=i_list+1)) + fi + done + done + wait + """).format(jobs_list, self.threads, '\n'.ljust(13)) -class SrunHorizontalWrapperBuilder(SrunWrapperBuilder): + return srun_launcher def build_main(self): nodelist = self.build_nodes_list() - srun_launcher = self.build_srun_launcher("scripts", "JobThread") + srun_launcher = self.build_srun_launcher("scripts_list") return nodelist, srun_launcher \ No newline at end of file diff --git a/autosubmit/platforms/wrappers/wrapper_factory.py b/autosubmit/platforms/wrappers/wrapper_factory.py index a4cee9c09ef4290ea4af4db5523ee00fbdef6c1e..9ba3ec8c893f36975c61078d3669bcd17f83f9c6 100644 --- a/autosubmit/platforms/wrappers/wrapper_factory.py +++ b/autosubmit/platforms/wrappers/wrapper_factory.py @@ -19,7 +19,7 @@ from autosubmit.platforms.wrappers.wrapper_builder import WrapperDirector, PythonVerticalWrapperBuilder, \ PythonHorizontalWrapperBuilder, PythonHorizontalVerticalWrapperBuilder, PythonVerticalHorizontalWrapperBuilder, \ - BashHorizontalWrapperBuilder, BashVerticalWrapperBuilder, SrunHorizontalWrapperBuilder + BashHorizontalWrapperBuilder, BashVerticalWrapperBuilder, SrunHorizontalWrapperBuilder,SrunVerticalHorizontalWrapperBuilder from autosubmit.config.config_common import AutosubmitConfig @@ -86,7 +86,10 @@ class SlurmWrapperFactory(WrapperFactory): return PythonHorizontalVerticalWrapperBuilder(**kwargs) def hybrid_wrapper_vertical_horizontal(self, **kwargs): - return PythonVerticalHorizontalWrapperBuilder(**kwargs) + if kwargs["method"] == 'srun': + return SrunVerticalHorizontalWrapperBuilder(**kwargs) + else: + return PythonVerticalHorizontalWrapperBuilder(**kwargs) def header_directives(self, **kwargs): return self.platform.wrapper_header(kwargs['name'], kwargs['queue'], kwargs['project'], kwargs['wallclock'],