From 923934b7b6ad376feea3bafa000e3ffd6c81debc Mon Sep 17 00:00:00 2001 From: dbeltran Date: Fri, 21 Feb 2020 12:21:36 +0100 Subject: [PATCH 1/5] Vertical-horizontal srun --- autosubmit/job/job_packager.py | 2 +- autosubmit/job/job_packages.py | 5 +- autosubmit/platforms/slurmplatform.py | 2 +- .../platforms/wrappers/wrapper_builder.py | 103 +++++++++++++----- .../platforms/wrappers/wrapper_factory.py | 7 +- 5 files changed, 86 insertions(+), 33 deletions(-) diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 57306ef43..2c3c4c720 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -325,7 +325,7 @@ class JobPackager(object): total_wallclock = sum_str_hours(total_wallclock, job.wallclock) return JobPackageVerticalHorizontal(current_package, total_processors, total_wallclock, - jobs_resources=jobs_resources) + jobs_resources=jobs_resources,method=self.wrapper_method) class JobPackagerVertical(object): diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 1e8fe147d..916c43f30 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -504,10 +504,11 @@ class JobPackageHybrid(JobPackageThread): Class to manage a hybrid (horizontal and vertical) thread-based package of jobs to be submitted by autosubmit """ - def __init__(self, jobs, num_processors, total_wallclock, dependency=None, jobs_resources=dict()): + def __init__(self, jobs, num_processors, total_wallclock, dependency=None, jobs_resources=dict(),method="ASThread"): all_jobs = [item for sublist in jobs for item in sublist] #flatten list - super(JobPackageHybrid, self).__init__(all_jobs, dependency, jobs_resources) + super(JobPackageHybrid, self).__init__(all_jobs, dependency, jobs_resources,method) self.jobs_lists = jobs + self.method=method self._num_processors = int(num_processors) self._threads = all_jobs[0].threads self._wallclock = total_wallclock diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index 9298d5057..aa97d3bd5 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -158,7 +158,7 @@ class SlurmPlatform(ParamikoPlatform): @staticmethod - def wrapper_header(filename, queue, project, wallclock, num_procs, dependency, directives, threads,method="#!/usr/bin/env python"): + def wrapper_header(filename, queue, project, wallclock, num_procs, dependency, directives, threads,method="asthreads"): if method =='srun': language = "#!/bin/bash" return \ diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py index 00c255870..20c9f460e 100644 --- a/autosubmit/platforms/wrappers/wrapper_builder.py +++ b/autosubmit/platforms/wrappers/wrapper_builder.py @@ -474,16 +474,7 @@ class BashHorizontalWrapperBuilder(BashWrapperBuilder): class SrunWrapperBuilder(WrapperBuilder): def build_imports(self): - scripts_bash = "(" - - for script in self.job_scripts: - scripts_bash+=str("\""+script+"\"")+" " - scripts_bash += ")" - return textwrap.dedent(""" - - # Defining scripts to be run - declare -a scripts={0} - """).format(str(scripts_bash), '\n'.ljust(13)) + pass # hybrids def build_joblist_thread(self): @@ -599,7 +590,36 @@ class SrunWrapperBuilder(WrapperBuilder): machinefile.write(machines) """).format(self.machinefiles_name, '\n'.ljust(13)) - def build_srun_launcher(self, jobs_list, threads, footer=True): + def build_srun_launcher(self, jobs_list, footer=True): + pass + + + # all should override -> abstract! + def build_main(self): + pass + + def dependency_directive(self): + pass + + def queue_directive(self): + pass + + def _indent(self, text, amount, ch=' '): + padding = amount * ch + return ''.join(padding + line for line in text.splitlines(True)) + +class SrunHorizontalWrapperBuilder(SrunWrapperBuilder): + def build_imports(self): + scripts_bash = "(" + for script in self.job_scripts: + scripts_bash+=str("\""+script+"\"")+" " + scripts_bash += ")" + return textwrap.dedent(""" + # Defining scripts to be run + declare -a scripts={0} + """).format(str(scripts_bash), '\n'.ljust(13)) + + def build_srun_launcher(self, jobs_list, footer=True): srun_launcher = textwrap.dedent(""" i=0 suffix=".cmd" @@ -608,7 +628,7 @@ class SrunWrapperBuilder(WrapperBuilder): out="${{template}}.${{i}}.out" err="${{template}}.${{i}}.err" srun --ntasks=1 --cpus-per-task={1} $template > $out 2> $err & - sleep "0.4" + sleep "0.2" ((i=i+1)) done wait @@ -630,24 +650,53 @@ class SrunWrapperBuilder(WrapperBuilder): """).format(jobs_list, self.exit_thread, '\n'.ljust(13)),0) return srun_launcher - - # all should override -> abstract! def build_main(self): - pass - - def dependency_directive(self): - pass - - def queue_directive(self): - pass - - def _indent(self, text, amount, ch=' '): - padding = amount * ch - return ''.join(padding + line for line in text.splitlines(True)) + nodelist = self.build_nodes_list() + srun_launcher = self.build_srun_launcher("scripts") + return nodelist, srun_launcher -class SrunHorizontalWrapperBuilder(SrunWrapperBuilder): +class SrunVerticalHorizontalWrapperBuilder(SrunWrapperBuilder): + def build_imports(self): + scripts_bash = "(" + for script in self.job_scripts: + scripts_bash+=str("\""+script+"\"")+" " + scripts_bash += ")" + return textwrap.dedent(""" + # Defining scripts to be run + declare -a scripts={0} + """).format(str(scripts_bash), '\n'.ljust(13)) + def build_srun_launcher(self, jobs_list, footer=True): + srun_launcher = textwrap.dedent(""" + i=0 + suffix=".cmd" + for template in "${{{0}[@]}}"; do + jobname=${{template%"$suffix"}} + out="${{template}}.${{i}}.out" + err="${{template}}.${{i}}.err" + srun --ntasks=1 --cpus-per-task={1} $template > $out 2> $err & + sleep "0.2" + ((i=i+1)) + done + wait + """).format(jobs_list, self.threads, '\n'.ljust(13)) + if footer: + srun_launcher += self._indent(textwrap.dedent(""" + for template in "${{{0}[@]}}"; do + suffix_completed=".COMPLETED" + completed_filename=${{template%"$suffix"}} + completed_filename="$completed_filename"_COMPLETED + completed_path=${{PWD}}/$completed_filename + if [ -f "$completed_path" ]; + then + echo "`date '+%d/%m/%Y_%H:%M:%S'` $template has been COMPLETED" + else + echo "`date '+%d/%m/%Y_%H:%M:%S'` $template has FAILED" + fi + done + """).format(jobs_list, self.exit_thread, '\n'.ljust(13)), 0) + return srun_launcher def build_main(self): nodelist = self.build_nodes_list() - srun_launcher = self.build_srun_launcher("scripts", "JobThread") + srun_launcher = self.build_srun_launcher("scripts") return nodelist, srun_launcher \ No newline at end of file diff --git a/autosubmit/platforms/wrappers/wrapper_factory.py b/autosubmit/platforms/wrappers/wrapper_factory.py index a4cee9c09..9ba3ec8c8 100644 --- a/autosubmit/platforms/wrappers/wrapper_factory.py +++ b/autosubmit/platforms/wrappers/wrapper_factory.py @@ -19,7 +19,7 @@ from autosubmit.platforms.wrappers.wrapper_builder import WrapperDirector, PythonVerticalWrapperBuilder, \ PythonHorizontalWrapperBuilder, PythonHorizontalVerticalWrapperBuilder, PythonVerticalHorizontalWrapperBuilder, \ - BashHorizontalWrapperBuilder, BashVerticalWrapperBuilder, SrunHorizontalWrapperBuilder + BashHorizontalWrapperBuilder, BashVerticalWrapperBuilder, SrunHorizontalWrapperBuilder,SrunVerticalHorizontalWrapperBuilder from autosubmit.config.config_common import AutosubmitConfig @@ -86,7 +86,10 @@ class SlurmWrapperFactory(WrapperFactory): return PythonHorizontalVerticalWrapperBuilder(**kwargs) def hybrid_wrapper_vertical_horizontal(self, **kwargs): - return PythonVerticalHorizontalWrapperBuilder(**kwargs) + if kwargs["method"] == 'srun': + return SrunVerticalHorizontalWrapperBuilder(**kwargs) + else: + return PythonVerticalHorizontalWrapperBuilder(**kwargs) def header_directives(self, **kwargs): return self.platform.wrapper_header(kwargs['name'], kwargs['queue'], kwargs['project'], kwargs['wallclock'], -- GitLab From 6595cccc71723846dcdbf6cc38da4e52dfee2249 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Fri, 21 Feb 2020 16:47:22 +0100 Subject: [PATCH 2/5] if rever needed --- .../platforms/wrappers/wrapper_builder.py | 83 ++++++++++++------- 1 file changed, 54 insertions(+), 29 deletions(-) diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py index 20c9f460e..7da221175 100644 --- a/autosubmit/platforms/wrappers/wrapper_builder.py +++ b/autosubmit/platforms/wrappers/wrapper_builder.py @@ -657,46 +657,71 @@ class SrunHorizontalWrapperBuilder(SrunWrapperBuilder): class SrunVerticalHorizontalWrapperBuilder(SrunWrapperBuilder): def build_imports(self): - scripts_bash = "(" - for script in self.job_scripts: - scripts_bash+=str("\""+script+"\"")+" " - scripts_bash += ")" - return textwrap.dedent(""" - # Defining scripts to be run - declare -a scripts={0} - """).format(str(scripts_bash), '\n'.ljust(13)) + scripts_bash = textwrap.dedent(""" + # Defining scripts to be run""") + list_index=0 + scripts_array_vars = "( " + for scripts in self.job_scripts: + built_array = "(" + for script in scripts: + built_array+= str("\"" + script + "\"") + " " + built_array += ")" + scripts_bash+=textwrap.dedent(""" + declare -a scripts_{0}={1} + """).format(str(list_index),str(built_array), '\n'.ljust(13)) + scripts_array_vars += "\"scripts_{0}\" ".format(list_index) + list_index += 1 + scripts_array_vars += ")" + scripts_bash += textwrap.dedent(""" + declare -a scripts_list={0} + """).format(str(scripts_array_vars), '\n'.ljust(13)) + return scripts_bash + def build_srun_launcher(self, jobs_list, footer=True): srun_launcher = textwrap.dedent(""" - i=0 suffix=".cmd" - for template in "${{{0}[@]}}"; do - jobname=${{template%"$suffix"}} - out="${{template}}.${{i}}.out" - err="${{template}}.${{i}}.err" - srun --ntasks=1 --cpus-per-task={1} $template > $out 2> $err & - sleep "0.2" - ((i=i+1)) + while (( ${{0}[@]} )); do + array_index=0 + for script_list in "${{{0}[@]}}"; do + declare -n scripts=$script_list + i=0 + for template in "${{scripts[@]}}"; do + jobname=${{template%"$suffix"}} + out="${{template}}.${{i}}.out" + err="${{template}}.${{i}}.err" + srun --ntasks=1 --cpus-per-task={1} $template > $out 2> $err & + sleep "0.2" + ((i=i+1)) + done + wait + done done - wait """).format(jobs_list, self.threads, '\n'.ljust(13)) if footer: srun_launcher += self._indent(textwrap.dedent(""" - for template in "${{{0}[@]}}"; do - suffix_completed=".COMPLETED" - completed_filename=${{template%"$suffix"}} - completed_filename="$completed_filename"_COMPLETED - completed_path=${{PWD}}/$completed_filename - if [ -f "$completed_path" ]; - then - echo "`date '+%d/%m/%Y_%H:%M:%S'` $template has been COMPLETED" - else - echo "`date '+%d/%m/%Y_%H:%M:%S'` $template has FAILED" - fi + + + + for script_list in "${{{0}[@]}}"; do + declare -n scripts=$script_list + + for template in "${{scripts[@]}}"; do + suffix_completed=".COMPLETED" + completed_filename=${{template%"$suffix"}} + completed_filename="$completed_filename"_COMPLETED + completed_path=${{PWD}}/$completed_filename + if [ -f "$completed_path" ]; + then + echo "`date '+%d/%m/%Y_%H:%M:%S'` $template has been COMPLETED" + else + echo "`date '+%d/%m/%Y_%H:%M:%S'` $template has FAILED" + fi + done done """).format(jobs_list, self.exit_thread, '\n'.ljust(13)), 0) return srun_launcher def build_main(self): nodelist = self.build_nodes_list() - srun_launcher = self.build_srun_launcher("scripts") + srun_launcher = self.build_srun_launcher("scripts_list") return nodelist, srun_launcher \ No newline at end of file -- GitLab From 2668b6f1e1fdcbfc3beb010c4ae153b5dc2a9e9c Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 24 Feb 2020 15:12:00 +0100 Subject: [PATCH 3/5] Added vertical-horizontal to method srun --- .../platforms/wrappers/wrapper_builder.py | 73 +++++++++++-------- 1 file changed, 42 insertions(+), 31 deletions(-) diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py index 7da221175..c3c38ce9d 100644 --- a/autosubmit/platforms/wrappers/wrapper_builder.py +++ b/autosubmit/platforms/wrappers/wrapper_builder.py @@ -661,6 +661,7 @@ class SrunVerticalHorizontalWrapperBuilder(SrunWrapperBuilder): # Defining scripts to be run""") list_index=0 scripts_array_vars = "( " + scripts_array_index = "( " for scripts in self.job_scripts: built_array = "(" for script in scripts: @@ -670,55 +671,65 @@ class SrunVerticalHorizontalWrapperBuilder(SrunWrapperBuilder): declare -a scripts_{0}={1} """).format(str(list_index),str(built_array), '\n'.ljust(13)) scripts_array_vars += "\"scripts_{0}\" ".format(list_index) + scripts_array_index += "\"0\" ".format(list_index) list_index += 1 scripts_array_vars += ")" + scripts_array_index += ")" scripts_bash += textwrap.dedent(""" declare -a scripts_list={0} - """).format(str(scripts_array_vars), '\n'.ljust(13)) + declare -a scripts_index={1} + """).format(str(scripts_array_vars),str(scripts_array_index), '\n'.ljust(13)) return scripts_bash def build_srun_launcher(self, jobs_list, footer=True): srun_launcher = textwrap.dedent(""" suffix=".cmd" - while (( ${{0}[@]} )); do - array_index=0 + suffix_completed=".COMPLETED" + aux_scripts=("${{{0}[@]}}") + while [ "${{#aux_scripts[@]}}" -gt 0 ]; do + i_list=0 + prev_completed_path="" for script_list in "${{{0}[@]}}"; do - declare -n scripts=$script_list - i=0 - for template in "${{scripts[@]}}"; do + declare -i i=${{scripts_index[$i_list]}} + if [ $i -ge 0 ]; then + declare -n scripts=$script_list + template=${{scripts[$i]}} + prev_template_index=$((i-1)) + prev_template=${{scripts[$prev_template_index]}} jobname=${{template%"$suffix"}} out="${{template}}.${{i}}.out" err="${{template}}.${{i}}.err" - srun --ntasks=1 --cpus-per-task={1} $template > $out 2> $err & + if [ $i -eq 0 ]; then + completed_filename=${{template%"$suffix"}} + else + completed_filename=${{prev_template%"$suffix"}} + fi + completed_filename="$completed_filename"_COMPLETED + completed_path=${{PWD}}/$completed_filename + + if [ -f "$completed_path" ]; + then + echo "`date '+%d/%m/%Y_%H:%M:%S'` $template has been COMPLETED" + if [ $i -ge "${{#scripts[@]}}" ]; then + unset aux_scripts[$i_list] + $i=-1 + fi + fi + if [ $i -lt "${{#scripts[@]}}" ]; then + if [ $i -eq 0 ] || [ -f "$completed_path" ] ; then + srun --ntasks=1 --cpus-per-task={1} $template > $out 2> $err & + ((i=i+1)) + fi + fi sleep "0.2" - ((i=i+1)) - done - wait + scripts_index[$i_list]=$i + ((i_list=i_list+1)) + fi done done + wait """).format(jobs_list, self.threads, '\n'.ljust(13)) - if footer: - srun_launcher += self._indent(textwrap.dedent(""" - - - for script_list in "${{{0}[@]}}"; do - declare -n scripts=$script_list - - for template in "${{scripts[@]}}"; do - suffix_completed=".COMPLETED" - completed_filename=${{template%"$suffix"}} - completed_filename="$completed_filename"_COMPLETED - completed_path=${{PWD}}/$completed_filename - if [ -f "$completed_path" ]; - then - echo "`date '+%d/%m/%Y_%H:%M:%S'` $template has been COMPLETED" - else - echo "`date '+%d/%m/%Y_%H:%M:%S'` $template has FAILED" - fi - done - done - """).format(jobs_list, self.exit_thread, '\n'.ljust(13)), 0) return srun_launcher def build_main(self): -- GitLab From 8a84d460988c8ac633d335fb942777f7b6ed7bb9 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 24 Feb 2020 15:43:21 +0100 Subject: [PATCH 4/5] Added vertical-horizontal to method srun (2) --- autosubmit/platforms/wrappers/wrapper_builder.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py index c3c38ce9d..2580bc589 100644 --- a/autosubmit/platforms/wrappers/wrapper_builder.py +++ b/autosubmit/platforms/wrappers/wrapper_builder.py @@ -691,7 +691,7 @@ class SrunVerticalHorizontalWrapperBuilder(SrunWrapperBuilder): prev_completed_path="" for script_list in "${{{0}[@]}}"; do declare -i i=${{scripts_index[$i_list]}} - if [ $i -ge 0 ]; then + if [ $i -ne -1 ]; then declare -n scripts=$script_list template=${{scripts[$i]}} prev_template_index=$((i-1)) @@ -701,6 +701,7 @@ class SrunVerticalHorizontalWrapperBuilder(SrunWrapperBuilder): err="${{template}}.${{i}}.err" if [ $i -eq 0 ]; then completed_filename=${{template%"$suffix"}} + prev_template=$template else completed_filename=${{prev_template%"$suffix"}} fi @@ -709,10 +710,10 @@ class SrunVerticalHorizontalWrapperBuilder(SrunWrapperBuilder): if [ -f "$completed_path" ]; then - echo "`date '+%d/%m/%Y_%H:%M:%S'` $template has been COMPLETED" + echo "`date '+%d/%m/%Y_%H:%M:%S'` $prev_template has been COMPLETED" if [ $i -ge "${{#scripts[@]}}" ]; then unset aux_scripts[$i_list] - $i=-1 + i="-1" fi fi if [ $i -lt "${{#scripts[@]}}" ]; then -- GitLab From 4573c617aea336bf55d64697519a459e9e89b3ed Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 24 Feb 2020 15:47:08 +0100 Subject: [PATCH 5/5] Added vertical-horizontal to method srun (3) --- autosubmit/platforms/wrappers/wrapper_builder.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py index 2580bc589..5b49013b6 100644 --- a/autosubmit/platforms/wrappers/wrapper_builder.py +++ b/autosubmit/platforms/wrappers/wrapper_builder.py @@ -694,8 +694,6 @@ class SrunVerticalHorizontalWrapperBuilder(SrunWrapperBuilder): if [ $i -ne -1 ]; then declare -n scripts=$script_list template=${{scripts[$i]}} - prev_template_index=$((i-1)) - prev_template=${{scripts[$prev_template_index]}} jobname=${{template%"$suffix"}} out="${{template}}.${{i}}.out" err="${{template}}.${{i}}.err" @@ -703,6 +701,8 @@ class SrunVerticalHorizontalWrapperBuilder(SrunWrapperBuilder): completed_filename=${{template%"$suffix"}} prev_template=$template else + prev_template_index=$((i-1)) + prev_template=${{scripts[$prev_template_index]}} completed_filename=${{prev_template%"$suffix"}} fi completed_filename="$completed_filename"_COMPLETED -- GitLab