From 263f7279934c6d9964c703bfd40317c25a7bdfd8 Mon Sep 17 00:00:00 2001 From: Wilmer Uruchi Ticona Date: Fri, 5 Feb 2021 13:34:05 +0100 Subject: [PATCH] Implemented #638. Added configuration setting WCHUNKINC to jobs_%expid%.conf. WALLCLOCK time is increased according to this setting multiplied by the number of chunks - 1. --- autosubmit/config/config_common.py | 14 +++++- autosubmit/config/files/jobs.conf | 2 + autosubmit/job/job.py | 69 +++++++++++++++++++----------- autosubmit/job/job_common.py | 37 ++++++++++++++++ 4 files changed, 94 insertions(+), 28 deletions(-) diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index 91150734f..e7f3613ef 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -188,6 +188,16 @@ class AutosubmitConfig(object): """ return self._jobs_parser.get_option(section, 'WALLCLOCK', '') + def get_wchunkinc(self, section): + """ + Gets the chunk increase to wallclock + :param section: job type + :type section: str + :return: wallclock increase per chunk + :rtype: str + """ + return self._jobs_parser.get_option(section, 'WCHUNKINC', '') + def get_synchronize(self, section): """ Gets wallclock for the given job type @@ -460,7 +470,7 @@ class AutosubmitConfig(object): try: self.reload() - except (AutosubmitCritical,AutosubmitError) as e: + except (AutosubmitCritical, AutosubmitError) as e: raise except BaseException as e: raise @@ -552,7 +562,7 @@ class AutosubmitConfig(object): self.wrong_config["Platform"] += [["Global", "There are repeated platforms"]] main_platform_found = False - if self.hpcarch in ["local","LOCAL"]: + if self.hpcarch in ["local", "LOCAL"]: main_platform_found = True elif self.ignore_undefined_platforms: main_platform_found = True diff --git a/autosubmit/config/files/jobs.conf b/autosubmit/config/files/jobs.conf index b42c36f19..e57b8c819 100644 --- a/autosubmit/config/files/jobs.conf +++ b/autosubmit/config/files/jobs.conf @@ -29,6 +29,8 @@ # RERUN_DEPENDENCIES = RERUN INI LOCAL_SETUP REMOTE_SETUP TRANSFER ## Wallclock to be submitted to the HPC queue in format HH:MM. If not specified, defaults to empty. # WALLCLOCK = 00:05 +## Wallclock chunk increase (WALLCLOCK will be increased according to the formula WALLCLOCK + WCHUNKINC * chunk) +# WCHUNKINC = 00:01 ## Processors number to be submitted to the HPC. If not specified, defaults to 1. # PROCESSORS = 1 ## Threads number to be submitted to the HPC. If not specified, defaults to 1. diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 1cad4bfa7..66693314b 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -33,7 +33,7 @@ import copy from bscearth.utils.config_parser import ConfigParserFactory from autosubmit.config.config_common import AutosubmitConfig -from autosubmit.job.job_common import Status, Type +from autosubmit.job.job_common import Status, Type, increase_wallclock_by_chunk from autosubmit.job.job_common import StatisticsSnippetBash, StatisticsSnippetPython from autosubmit.job.job_common import StatisticsSnippetR, StatisticsSnippetEmpty from autosubmit.config.basicConfig import BasicConfig @@ -46,10 +46,12 @@ from log.log import Log, AutosubmitCritical, AutosubmitError Log.get_logger("Autosubmit") # A wrapper for encapsulate threads , TODO: Python 3+ to be replaced by the < from concurrent.futures > + + def threaded(fn): def wrapper(*args, **kwargs): thread = Thread(target=fn, args=args, kwargs=kwargs) - thread.name = "JOB_"+str(args[0].name) + thread.name = "JOB_" + str(args[0].name) thread.start() return thread return wrapper @@ -83,6 +85,7 @@ class Job(object): self.platform_name = None self.section = None self.wallclock = None + self.wchunkinc = None self.tasks = '0' self.threads = '1' self.processors = '1' @@ -110,7 +113,7 @@ class Job(object): self.executable = None self._local_logs = ('', '') self._remote_logs = ('', '') - self.script_name = self.name+".cmd" + self.script_name = self.name + ".cmd" self.status = status self.prev_status = status self.old_status = self.status @@ -541,11 +544,13 @@ class Job(object): try: while (not out_exist and not err_exist) and i < retries: try: - out_exist = self._platform.check_file_exists(remote_logs[0],True) + out_exist = self._platform.check_file_exists( + remote_logs[0], True) except IOError as e: out_exist = False try: - err_exist = self._platform.check_file_exists(remote_logs[1],True) + err_exist = self._platform.check_file_exists( + remote_logs[1], True) except IOError as e: err_exists = False if not out_exist or not err_exist: @@ -556,7 +561,7 @@ class Job(object): self._platform.restore_connection() except: Log.printlog("{0} \n Couldn't connect to the remote platform for this {1} job err/out files. ".format( - e.message, self.name), 6001) + e.message, self.name), 6001) if i >= retries: if not out_exist or not err_exist: Log.printlog("Failed to retrieve log files {1} and {2} e=6001".format( @@ -565,7 +570,8 @@ class Job(object): if copy_remote_logs: # unifying names for log files if remote_logs != local_logs: - self.synchronize_logs(self._platform, remote_logs, local_logs) + self.synchronize_logs( + self._platform, remote_logs, local_logs) remote_logs = copy.deepcopy(local_logs) self._platform.get_logs_files(self.expid, remote_logs) # Update the logs with Autosubmit Job Id Brand @@ -715,6 +721,7 @@ class Job(object): Log.printlog("Job {0} completion check failed. There is no COMPLETED file".format( self.name), 6009) self.status = default_status + def update_parameters(self, as_conf, parameters, default_parameters={'d': '%d%', 'd_': '%d_%', 'Y': '%Y%', 'Y_': '%Y_%', 'M': '%M%', 'M_': '%M_%', 'm': '%m%', 'm_': '%m_%'}): @@ -728,7 +735,7 @@ class Job(object): :param parameters: :type parameters: dict """ - + chunk = 1 parameters = parameters.copy() parameters.update(default_parameters) parameters['JOBNAME'] = self.name @@ -805,8 +812,13 @@ class Job(object): self.memory = as_conf.get_memory(self.section) self.memory_per_task = as_conf.get_memory_per_task(self.section) self.wallclock = as_conf.get_wallclock(self.section) + self.wchunkinc = as_conf.get_wchunkinc(self.section) + # Increasing according to chunk + self.wallclock = increase_wallclock_by_chunk( + self.wallclock, self.wchunkinc, chunk) if self.wallclock == '': - Log.debug("Wallclock for {0} is not defined! , setting it to 02:00".format(self.name)) + Log.debug( + "Wallclock for {0} is not defined! , setting it to 02:00".format(self.name)) self.wallclock = '02:00' self.scratch_free_space = as_conf.get_scratch_free_space(self.section) if self.scratch_free_space == 0: @@ -915,7 +927,8 @@ class Job(object): def _get_paramiko_template(self, snippet, template): current_platform = self._platform return ''.join([ - snippet.as_header(current_platform.get_header(self), self.executable), + snippet.as_header( + current_platform.get_header(self), self.executable), template, snippet.as_tailer() ]) @@ -1051,7 +1064,8 @@ class Job(object): start_time = time.time() timestamp = date2str(datetime.datetime.now(), 'S') - self.local_logs = (self.name + "." + timestamp + ".out", self.name + "." + timestamp + ".err") + self.local_logs = (self.name + "." + timestamp + + ".out", self.name + "." + timestamp + ".err") path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') f = open(path, 'a') @@ -1365,17 +1379,17 @@ class WrapperJob(Job): remote_log_dir = self._platform.get_remote_log_dir() # PREPARE SCRIPT TO SEND command = textwrap.dedent(""" -cd {1} -for job in {0} -do - if [ -f "${{job}}_STAT" ] - then - echo ${{job}} $(head ${{job}}_STAT) - else - echo ${{job}} - fi -done -""").format(str(not_finished_jobs_names), str(remote_log_dir), '\n'.ljust(13)) + cd {1} + for job in {0} + do + if [ -f "${{job}}_STAT" ] + then + echo ${{job}} $(head ${{job}}_STAT) + else + echo ${{job}} + fi + done + """).format(str(not_finished_jobs_names), str(remote_log_dir), '\n'.ljust(13)) log_dir = os.path.join( self._tmp_path, 'LOG_{0}'.format(self.expid)) @@ -1415,10 +1429,12 @@ done self.as_config.get_copy_remote_logs() == 'true') if len(out) == 2: Log.info("Job {0} is RUNNING".format(jobname)) - over_wallclock = self._check_inner_job_wallclock(job) # messaged included + over_wallclock = self._check_inner_job_wallclock( + job) # messaged included if over_wallclock: job.status = Status.FAILED - Log.printlog("Job {0} is FAILED".format(jobname),6009) + Log.printlog( + "Job {0} is FAILED".format(jobname), 6009) elif len(out) == 3: end_time = self._check_time(out, 2) self._check_finished_job(job) @@ -1428,8 +1444,9 @@ done sleep(wait) retries = retries - 1 temp_list = self.inner_jobs_running - self.inner_jobs_running = [job for job in temp_list if job.status == Status.RUNNING] - if retries == 0: # or over_wallclock: + self.inner_jobs_running = [ + job for job in temp_list if job.status == Status.RUNNING] + if retries == 0: # or over_wallclock: self.status = Status.FAILED def _check_finished_job(self, job, failed_file=False): diff --git a/autosubmit/job/job_common.py b/autosubmit/job/job_common.py index 73b6d7889..129e73822 100644 --- a/autosubmit/job/job_common.py +++ b/autosubmit/job/job_common.py @@ -17,6 +17,7 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . import textwrap +import datetime class Status: @@ -279,3 +280,39 @@ def parse_output_number(string_number): number = 0.0 pass return number + + +def increase_wallclock_by_chunk(current, increase, chunk): + """ + Receives the wallclock times an increases it according to a quantity times the number of the current chunk. + The result cannot be larger than 48:00. + If Chunk = 0 then no increment. + + :param current: WALLCLOCK HH:MM + :type current: str + :param increase: WCHUNKINC HH:MM + :type increase: str + :param chunk: chunk number + :type chunk: int + :return: HH:MM wallclock + :rtype: str + """ + try: + if current and increase and chunk and chunk > 0: + wallclock = current.split(":") + increase = increase.split(":") + current_time = datetime.timedelta( + hours=int(wallclock[0]), minutes=int(wallclock[1])) + increase_time = datetime.timedelta( + hours=int(increase[0]), minutes=int(increase[1])) * (chunk - 1) + final_time = current_time + increase_time + hours = int(final_time.total_seconds() // 3600) + minutes = int((final_time.total_seconds() // 60) - (hours * 60)) + if hours > 48 or (hours >= 48 and minutes > 0): + hours = 48 + minutes = 0 + return "%02d:%02d" % (hours, minutes) + return current + except Exception as exp: + # print(exp) + return current -- GitLab