diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py
index c95e0ce790b127f83bad109dddbab5a231ae510c..158f55744f884958f372586cb3a7b84f9da2fa26 100644
--- a/autosubmit/autosubmit.py
+++ b/autosubmit/autosubmit.py
@@ -87,6 +87,8 @@
 from job.job_exceptions import WrongTemplateException
 from job.job_packager import JobPackager
 from sets import Set
+
+
 # noinspection PyUnusedLocal
 def signal_handler(signal_received, frame):
     """
@@ -1762,7 +1764,6 @@ class Autosubmit:
                                 job.platform.get_logs_files(expid, job.remote_logs)
                             except:
                                 pass
-                            #Log.warning("Unable to retrieve the log file of {0} in platform {1}",job.name,job.platform.name)
                         elif job.status != Status.SUSPENDED:
                             job.status = Status.WAITING
                             job.fail_count = 0
diff --git a/autosubmit/git/autosubmit_git.py b/autosubmit/git/autosubmit_git.py
index 136b91c2a987be2d1539d467804273bf17655ee4..ceabd6ef844b77a310fbe10112ba70bcc1cbd859 100644
--- a/autosubmit/git/autosubmit_git.py
+++ b/autosubmit/git/autosubmit_git.py
@@ -155,10 +155,10 @@
 
         if git_remote_project_path != '':
             if git_remote_project_path[-1] == '/':
-                git_remote_path=git_remote_project_path[:-1]+project_path
+                git_remote_path=os.path.join(git_remote_project_path[:-1], as_conf.expid, BasicConfig.LOCAL_PROJ_DIR)
             else:
-                git_remote_project_path+=project_path
-                project_path=git_remote_project_path
+                git_remote_path=os.path.join(git_remote_project_path, as_conf.expid, BasicConfig.LOCAL_PROJ_DIR)
+            project_path=git_remote_path
 
         if git_project_commit:
             Log.info("Fetching {0} into {1}", git_project_commit + " " + git_project_origin, project_path)
diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py
index b8221332fcfb4892d474cca8ebcac46ce4c2c185..1c5327fe64d0d1daacd38c071b2f6eac10c4986f 100644
--- a/autosubmit/job/job.py
+++ b/autosubmit/job/job.py
@@ -27,13 +27,23 @@ import json
 import datetime
 import textwrap
 from collections import OrderedDict
-
+import copy
 from autosubmit.job.job_common import Status, Type
 from autosubmit.job.job_common import StatisticsSnippetBash, StatisticsSnippetPython
 from autosubmit.job.job_common import StatisticsSnippetR, StatisticsSnippetEmpty
 from autosubmit.config.basicConfig import BasicConfig
 from bscearth.utils.date import date2str, parse_date, previous_day, chunk_end_date, chunk_start_date, Log, subs_dates
 from time import sleep
+from threading import Thread
+
+
+def threaded(fn):
+    def wrapper(*args, **kwargs):
+        thread = Thread(target=fn, args=args, kwargs=kwargs)
+        thread.start()
+        return thread
+    return wrapper
+
 
 class Job(object):
     """
@@ -88,6 +98,7 @@
         self.file = None
         self._local_logs = ('', '')
         self._remote_logs = ('', '')
+        self.retrieving_log = False
         self.status = status
         self.old_status = self.status
         self.new_status=status
@@ -481,6 +492,39 @@
         retrials_list.insert(0, retrial_dates)
         return retrials_list
+    @threaded
+    def retrieve_logfiles(self, copy_remote_logs, platform):
+        # wait for a previous log-retrieval thread of this job to finish
+        while self.retrieving_log:
+            sleep(2)
+        self.retrieving_log = True
+        job = copy.deepcopy(self)
+        job.platform = copy.copy(platform)
+        job.platform._ssh = None
+        job.platform._ftpChannel = None
+        job.platform.connect()
+        out_exist = False
+        err_exist = False
+        retries = 2
+        sleeptime = 5
+        i = 0
+        while not out_exist and not err_exist and i < retries:
+            out_exist = job.platform.check_file_exists(job.remote_logs[0])  # will do 5 retries
+            err_exist = job.platform.check_file_exists(job.remote_logs[1])  # will do 5 retries
+            if not out_exist or not err_exist:
+                sleeptime = sleeptime + 5
+                i = i + 1
+                sleep(sleeptime)
+        if out_exist and err_exist:
+            if copy_remote_logs:
+                if job.local_logs != job.remote_logs:
+                    job.synchronize_logs()  # unifying names for log files
+                    self.remote_logs = job.remote_logs
+                job.platform.get_logs_files(job.expid, job.remote_logs)
+                # Update the logs with Autosubmit Job Id Brand
+                for local_log in job.local_logs:
+                    job.platform.write_jobid(job.id, os.path.join(job._tmp_path, 'LOG_' + str(job.expid), local_log))
+        self.retrieving_log = False
 
     def update_status(self, copy_remote_logs=False):
         """
         Updates job status, checking COMPLETED file if needed
@@ -533,14 +577,8 @@
         # Updating logs
         if self.status in [Status.COMPLETED, Status.FAILED, Status.UNKNOWN]:
             self.write_end_time(self.status == Status.COMPLETED)
-            if self.local_logs != self.remote_logs:
-                self.synchronize_logs()  # unifying names for log files
-            if copy_remote_logs:
-                self.platform.get_logs_files(self.expid, self.remote_logs)
-                # Update the logs with Autosubmit Job Id Brand
-                for local_log in self.local_logs:
-                    self.platform.write_jobid(self.id, os.path.join(self._tmp_path, 'LOG_' + str(self.expid), local_log))
-
+            # new thread: check that the log files exist before copying them
+            self.retrieve_logfiles(copy_remote_logs, self.platform)
         return self.status
 
     def update_children_status(self):
@@ -664,6 +702,7 @@
         parameters['MEMORY'] = self.memory
         parameters['MEMORY_PER_TASK'] = self.memory_per_task
         parameters['NUMTHREADS'] = self.threads
+        parameters['THREADS'] = self.threads
        parameters['NUMTASK'] = self.tasks
         parameters['WALLCLOCK'] = self.wallclock
         parameters['TASKTYPE'] = self.section
@@ -708,7 +747,7 @@
                 template = template_file.read()
         else:
             if self.type == Type.BASH:
-                template = ' # %PACKED% \n sleep 5\n'
+                template = 'sleep 5'
             elif self.type == Type.PYTHON:
                 template = 'time.sleep(5)'
             elif self.type == Type.R:
@@ -964,8 +1003,8 @@
             self.parents.remove(parent)
 
     def synchronize_logs(self):
-        self.platform.move_file(self.remote_logs[0], self.local_logs[0], self.id) # .out
-        self.platform.move_file(self.remote_logs[1], self.local_logs[1], self.id) # .err
+        self.platform.move_file(self.remote_logs[0], self.local_logs[0], True) # .out
+        self.platform.move_file(self.remote_logs[1], self.local_logs[1], True) # .err
         self.remote_logs = self.local_logs
 
 
@@ -1007,6 +1046,7 @@
         # save start time, wallclock and processors?!
         self.checked_time = datetime.datetime.now()
         self.hold = hold
+
     def _queuing_reason_cancel(self, reason):
         try:
             if len(reason.split('(', 1)) > 1:
diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py
index 32869e53964d75b0890c2ef49b9dd94ebf7aa6df..59abd809c72677096b1d24612be840a0e99fc74e 100644
--- a/autosubmit/job/job_list.py
+++ b/autosubmit/job/job_list.py
@@ -873,6 +873,10 @@
             return [job for job in finished if job.packed is False]
         else:
             return finished
+
+
+
+
     def get_active(self, platform=None, wrapper=False):
         """
         Returns a list of active jobs (In platforms queue + Ready)
@@ -891,6 +895,7 @@
 
         return active
 
+
     def get_job_by_name(self, name):
         """
         Returns the job that its name matches parameter name
diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py
index a489d86304e15b16e0bbc65ced9a1fee6c8cb5f9..6af6fb5846a94a7142242c4d3dc52869ba10b0dc 100644
--- a/autosubmit/job/job_packages.py
+++ b/autosubmit/job/job_packages.py
@@ -198,15 +198,15 @@
 
     def _create_scripts(self, configuration):
         timestamp = str(int(time.time()))
-        for i in range(1, len(self.jobs) + 1):
-            self._job_scripts[self.jobs[i - 1].name] = self.jobs[i - 1].create_script(configuration)
-            self._job_inputs[self.jobs[i - 1].name] = self._create_i_input(timestamp, i)
-            self.jobs[i - 1].remote_logs = (timestamp + ".{0}.out".format(i), timestamp + ".{0}.err".format(i))
+        for i in range(0, len(self.jobs)):
+            self._job_scripts[self.jobs[i].name] = self.jobs[i].create_script(configuration)
+            self._job_inputs[self.jobs[i].name] = self._create_i_input(timestamp, i)
+            self.jobs[i].remote_logs = (timestamp + ".{0}.out".format(i), timestamp + ".{0}.err".format(i))
         self._common_script = self._create_common_script(timestamp)
 
     def _create_i_input(self, filename, index):
         filename += '.{0}'.format(index)
-        input_content = self._job_scripts[self.jobs[index - 1].name]
+        input_content = self._job_scripts[self.jobs[index].name]
         open(os.path.join(self._tmp_path, filename), 'w').write(input_content)
         os.chmod(os.path.join(self._tmp_path, filename), 0o755)
         return filename
@@ -236,11 +236,11 @@
         if package_id is None:
             return
 
-        for i in range(1, len(self.jobs) + 1):
-            Log.info("{0} submitted", self.jobs[i - 1].name)
-            self.jobs[i - 1].id = str(package_id) + '[{0}]'.format(i)
-            self.jobs[i - 1].status = Status.SUBMITTED
-            self.jobs[i - 1].write_submit_time()
+        for i in range(0, len(self.jobs)):
+            Log.info("{0} submitted", self.jobs[i].name)
+            self.jobs[i].id = str(package_id) + '[{0}]'.format(i)
+            self.jobs[i].status = Status.SUBMITTED
+            self.jobs[i].write_submit_time()
 
 
 class JobPackageThread(JobPackageBase):
@@ -309,11 +309,11 @@
 
 
     def _create_scripts(self, configuration):
-        for i in range(1, len(self.jobs) + 1):
-            self._job_scripts[self.jobs[i - 1].name] = self.jobs[i - 1].create_script(configuration)
-            self.jobs[i - 1].remote_logs = (
-                self._job_scripts[self.jobs[i - 1].name] + ".{0}.out".format(i - 1),
-                self._job_scripts[self.jobs[i - 1].name] + ".{0}.err".format(i - 1)
+        for i in range(0, len(self.jobs)):
+            self._job_scripts[self.jobs[i].name] = self.jobs[i].create_script(configuration)
+            self.jobs[i].remote_logs = (
+                self._job_scripts[self.jobs[i].name] + ".{0}.out".format(i),
+                self._job_scripts[self.jobs[i].name] + ".{0}.err".format(i)
             )
         self._common_script = self._create_common_script()
 
@@ -354,11 +354,11 @@
         if package_id is None:
             return
 
-        for i in range(1, len(self.jobs) + 1):
-            Log.info("{0} submitted", self.jobs[i - 1].name)
-            self.jobs[i - 1].id = str(package_id)
-            self.jobs[i - 1].status = Status.SUBMITTED
-            self.jobs[i - 1].write_submit_time()
+        for i in range(0, len(self.jobs)):
+            Log.info("{0} submitted", self.jobs[i].name)
+            self.jobs[i].id = str(package_id)
+            self.jobs[i].status = Status.SUBMITTED
+            self.jobs[i].write_submit_time()
 
     def _common_script_content(self):
         pass
@@ -405,11 +405,11 @@
         return self._platform.project
 
     def _create_scripts(self, configuration):
-        for i in range(1, len(self.jobs) + 1):
-            self._job_scripts[self.jobs[i - 1].name] = self.jobs[i - 1].create_script(configuration)
-            self.jobs[i - 1].remote_logs = (
-                self._job_scripts[self.jobs[i - 1].name] + ".{0}.out".format(i - 1),
-                self._job_scripts[self.jobs[i - 1].name] + ".{0}.err".format(i - 1)
+        for i in range(0, len(self.jobs)):
+            self._job_scripts[self.jobs[i].name] = self.jobs[i].create_script(configuration)
+            self.jobs[i].remote_logs = (
+                self._job_scripts[self.jobs[i].name] + ".{0}.out".format(i),
+                self._job_scripts[self.jobs[i].name] + ".{0}.err".format(i)
             )
         self._common_script = self._create_common_script()
 
@@ -437,11 +437,11 @@
         if package_id is None:
            raise Exception('Submission failed')
 
-        for i in range(1, len(self.jobs) + 1):
-            Log.info("{0} submitted", self.jobs[i - 1].name)
-            self.jobs[i - 1].id = str(package_id)
-            self.jobs[i - 1].status = Status.SUBMITTED
-            self.jobs[i - 1].write_submit_time()
+        for i in range(0, len(self.jobs)):
+            Log.info("{0} submitted", self.jobs[i].name)
+            self.jobs[i].id = str(package_id)
+            self.jobs[i].status = Status.SUBMITTED
+            self.jobs[i].write_submit_time()
 
 
 class JobPackageVertical(JobPackageThread):
diff --git a/autosubmit/platforms/ecplatform.py b/autosubmit/platforms/ecplatform.py
index bff993360a38b5e79eb45fdc84e7d7845a5b9893..10eafe943552c86901d107346fcb7239b769a17c 100644
--- a/autosubmit/platforms/ecplatform.py
+++ b/autosubmit/platforms/ecplatform.py
@@ -19,7 +19,7 @@
 
 import os
 import subprocess
-
+from time import sleep
 from autosubmit.platforms.paramiko_platform import ParamikoPlatform, ParamikoPlatformException
 from bscearth.utils.log import Log
 
@@ -158,14 +158,27 @@
 
         command = '{0} {3}:{2} {1}'.format(self.get_cmd, file_path, os.path.join(self.get_files_path(), filename), self.host)
         try:
-            process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
-            out, _ = process.communicate()
-            process_ok = False if 'No such file' in out or process.returncode != 0 else True
-        except Exception:
+            retries = 0
+            sleeptime = 5
+            process_ok = False
+            while not process_ok and retries < 2:
+                process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
+                out, _ = process.communicate()
+                if 'No such file' in out or process.returncode != 0:
+                    retries = retries + 1
+                    process_ok = False
+                    sleeptime = sleeptime + 5
+                    sleep(sleeptime)
+                else:
+                    process_ok = True
+        except Exception as e:
+            Log.error("Not recovered: {0}", e)
+            Log.error("command: {0}", command)
             process_ok = False
-
         if not process_ok and must_exist:
             raise Exception('File {0} does not exists'.format(filename))
+        if not process_ok:
+            Log.warning("File not transferred: {0}, output: {1}", command, out)
         return process_ok
 
     def delete_file(self, filename):
diff --git a/autosubmit/platforms/headers/lsf_header.py b/autosubmit/platforms/headers/lsf_header.py
index 6634b2a0d04e7ca7c329dc3f9134e851571e1281..af22da2ca7fcd519ffb49cfb89caddc0b4a9ee59 100644
--- a/autosubmit/platforms/headers/lsf_header.py
+++ b/autosubmit/platforms/headers/lsf_header.py
@@ -128,13 +128,14 @@
 
             def run(self):
                 out = str(self.template) + '.' + str(self.id_run) + '.out'
                 err = str(self.template) + '.' + str(self.id_run) + '.err'
+                command = str(self.template) + ' ' + str(self.id_run) + ' ' + os.getcwd()
                 (self.status) = getstatusoutput(command + ' > ' + out + ' 2> ' + err)
 
         scripts = {3}
 
         for i in range(len(scripts)):
-            current = JobThread(scripts[i], i)
+            current = JobThread(scripts[i], i+self.id_run)
             current.start()
             current.join()
             completed_filename = scripts[i].replace('.cmd', '_COMPLETED')
diff --git a/autosubmit/platforms/locplatform.py b/autosubmit/platforms/locplatform.py
index 0f3d1852b335ded671c6651941c7e8cda73b9dd7..a9193977b73918c7b49f4505b8dfa6aeb603d8ff 100644
--- a/autosubmit/platforms/locplatform.py
+++ b/autosubmit/platforms/locplatform.py
@@ -114,20 +114,18 @@
 
             raise
         return True
 
+    def check_file_exists(self, filename):
+        return True
     def get_file(self, filename, must_exist=True, relative_path=''):
-
         local_path = os.path.join(self.tmp_path, relative_path)
         if not os.path.exists(local_path):
             os.makedirs(local_path)
-
         file_path = os.path.join(local_path, filename)
         if os.path.exists(file_path):
             os.remove(file_path)
 
         command = '{0} {1} {2}'.format(self.get_cmd, os.path.join(self.tmp_path, 'LOG_' + self.expid, filename), file_path)
-
-
         try:
             subprocess.check_call(command, stdout=open(os.devnull, 'w'), stderr=open(os.devnull, 'w'), shell=True)
         except subprocess.CalledProcessError:
@@ -136,6 +134,38 @@
             return False
         return True
 
+    # Checks if a log file (.err / .out) exists, retrying while it is still missing
+    def check_file_exists(self, src):
+        """
+        Checks if a file exists in the platform files path, retrying a few times before giving up
+
+        :param src: source name
+        :type src: str
+        :return: True if the file exists, False otherwise
+        :rtype: bool
+        """
+
+        file_exist = False
+        sleeptime = 5
+        remote_path = os.path.join(self.get_files_path(), src)
+        retries = 0
+        while not file_exist and retries < 5:
+            try:
+                file_exist = os.path.isfile(os.path.join(self.get_files_path(), src))
+                if not file_exist:  # file does not exist yet, retry after sleeptime
+                    Log.debug("{2} does not exist yet, waiting {0}s before retrying (retries left: {1})", sleeptime,
+                              5 - retries, remote_path)
+                    sleep(sleeptime)
+                    sleeptime = sleeptime + 5
+                    retries = retries + 1
+            except BaseException as e:  # unrecoverable error
+                Log.critical("Crashed while retrieving logs: {0}", e)
+
+                file_exist = False  # won't exist
+                retries = 999  # no more retries
+
+        return file_exist
+
     def delete_file(self, filename):
         command = '{0} {1}'.format(self.del_cmd, os.path.join(self.tmp_path, 'LOG_' + self.expid, filename))
         try:
diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py
index d2e78c68893a4fe274a2a7162e7a13400c72e4f2..a2bc431b18b5ef082b91ef777f05b2f8c66963f8 100644
--- a/autosubmit/platforms/paramiko_platform.py
+++ b/autosubmit/platforms/paramiko_platform.py
@@ -161,20 +161,23 @@
 
         self.delete_file(filename)
         try:
-            #ftp = self._ssh.open_sftp()
-
-            self._ftpChannel.put(os.path.join(self.tmp_path, filename), os.path.join(self.get_files_path(), os.path.basename(filename)))
-            self._ftpChannel.chmod(os.path.join(self.get_files_path(), os.path.basename(filename)),
-                                   os.stat(os.path.join(self.tmp_path, filename)).st_mode)
-            #ftp.close()
+            local_path = os.path.join(os.path.join(self.tmp_path, filename))
+            remote_path = os.path.join(self.get_files_path(), os.path.basename(filename))
+            self._ftpChannel.put(local_path, remote_path)
+            self._ftpChannel.chmod(remote_path, os.stat(local_path).st_mode)
+
             return True
-        except (OSError,IOError) as er:
-            Log.warning('Can not send file {0} to {1} due file not found skipping until next iteration', os.path.join(self.tmp_path, filename),
+        except BaseException as e:
+            Log.error('Can not send file {0} to {1}', os.path.join(self.tmp_path, filename),
                         os.path.join(self.get_files_path(), filename))
-            raise (IOError)
+            raise
         except BaseException as e:
             Log.error('Unknown Error')
             raise
+        except:
+            Log.error('Unknown Error')
+            raise
 
     # Gets .err and .out
     def get_file(self, filename, must_exist=True, relative_path=''):
@@ -198,23 +201,20 @@
         file_path = os.path.join(local_path, filename)
         if os.path.exists(file_path):
             os.remove(file_path)
-
         if not self.restore_connection():
             return False
-
+        remote_path = os.path.join(self.get_files_path(), filename)
         try:
-            #ftp = self._ssh.open_sftp()
-
-            self._ftpChannel.get(os.path.join(self.get_files_path(), filename), file_path)
-            #ftp.close()
+            self._ftpChannel.get(remote_path, file_path)
             return True
-        except BaseException:
-            # ftp.get creates a local file anyway
-            if os.path.exists(file_path):
-                os.remove(file_path)
+        except Exception as e:
+            if "garbage" in str(e).lower():
+                Log.critical("Critical error: it seems that the user is invalid")
+                raise
             if must_exist:
                 raise Exception('File {0} does not exists'.format(filename))
-            return False
+            else:
+                return False
 
     def delete_file(self, filename):
         """
@@ -237,40 +237,36 @@
         except IOError:
             return False
         except BaseException as e:
+            if "garbage" in str(e).lower():
+                Log.error("Wrong user or invalid .ssh/config, invalid user in platform.conf, or public key not set")
+                raise
             Log.debug('Could not remove file {0}'.format(os.path.join(self.get_files_path(), filename)))
             return False
 
-    # Moves .err .out
-    def move_file(self, src, dest,migrate=False):
+
+
+
+    def move_file(self, src, dest,must_exist=False):
         """
         Moves a file on the platform (includes .err and .out)
         :param src: source name
         :type src: str
         :param dest: destination name
-        :param migrate: ignore if file exist or not
+        :param must_exist: if True, raise an exception when the source file does not exist
         :type dest: str
         """
         if not self.restore_connection():
             return False
-
         try:
-            #ftp = self._ssh.open_sftp()
-            if not migrate:
-                self._ftpChannel.rename(os.path.join(self.get_files_path(), src), os.path.join(self.get_files_path(), dest))
-            else:
-                try:
-                    #self._ftpChannel.chdir((os.path.join(self.get_files_path(), src)))
-                    self._ftpChannel.rename(os.path.join(self.get_files_path(), src), os.path.join(self.get_files_path(),dest))
-                    return True
-                except (IOError):
-                    return False
-            #ftp.close()
-
+            self._ftpChannel.rename(os.path.join(self.get_files_path(), src),
+                                    os.path.join(self.get_files_path(), dest))
             return True
-        except BaseException:
-            Log.debug('Could not move (rename) file {0} to {1}'.format(os.path.join(self.get_files_path(), src),
-                                                                       os.path.join(self.get_files_path(), dest)))
-            return False
+        except:
+            if must_exist:
+                raise Exception('File {0} does not exist'.format(os.path.join(self.get_files_path(), src)))
+            else:
+                return False
+
     def submit_job(self, job, script_name, hold=False):
         """
@@ -699,7 +695,9 @@
                 Log.debug('{0} has been created on {1} .', self.remote_log_dir, self.host)
             else:
                 Log.error('Could not create the DIR {0} on HPC {1}'.format(self.remote_log_dir, self.host))
-
+        except:
+            Log.critical("Garbage detected")
+            raise
         else:
             if self.send_command(self.get_mkdir_cmd()):
                 Log.debug('{0} has been created on {1} .', self.remote_log_dir, self.host)
diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py
index 1dda5313bd7ec01640d8ffcca0b1b72da348d260..bc6f8501c52d7f66c5fa7953f8623c679bf15810 100644
--- a/autosubmit/platforms/platform.py
+++ b/autosubmit/platforms/platform.py
@@ -221,13 +221,12 @@
         :return: True if successful, false otherwise
         :rtype: bool
         """
-        while True:
+        if self.check_file_exists('{0}_COMPLETED'.format(job_name)):
             if self.get_file('{0}_COMPLETED'.format(job_name), False):
                 return True
-            if retries == 0:
+            else:
                 return False
-            retries -= 1
-            sleep(1)
+
 
     def remove_stat_file(self, job_name):
         """
@@ -258,6 +257,8 @@
             Log.debug('{0} been removed', filename)
             return True
         return False
+    def check_file_exists(self, src):
+        return True
 
     def get_stat_file(self, job_name, retries=0):
         """
@@ -274,17 +275,10 @@
         stat_local_path = os.path.join(self.config.LOCAL_ROOT_DIR, self.expid, self.config.LOCAL_TMP_DIR, filename)
         if os.path.exists(stat_local_path):
             os.remove(stat_local_path)
-
-        while True:
+        if self.check_file_exists(filename):
             if self.get_file(filename, False):
                 Log.debug('{0}_STAT file have been transfered', job_name)
                 return True
-            if retries == 0:
-                break
-            retries -= 1
-            # wait five seconds to check get file
-            sleep(5)
-
         Log.debug('Something did not work well when transferring the STAT file')
         return False
 
diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py
index ade701c6069fb8697cba9141b2851753b855568c..5312aba5ece2187973ee039846ac29e702062939 100644
--- a/autosubmit/platforms/slurmplatform.py
+++ b/autosubmit/platforms/slurmplatform.py
@@ -18,13 +18,15 @@
 # along with Autosubmit.  If not, see <http://www.gnu.org/licenses/>.
 
 import os
+from time import sleep
 
 from xml.dom.minidom import parseString
 
 from autosubmit.platforms.paramiko_platform import ParamikoPlatform
 from autosubmit.platforms.headers.slurm_header import SlurmHeader
 from autosubmit.platforms.wrappers.wrapper_factory import SlurmWrapperFactory
-from autosubmit.config.basicConfig import BasicConfig
+from bscearth.utils.log import Log
+
 
 class SlurmPlatform(ParamikoPlatform):
     """
@@ -47,10 +49,10 @@
         self._allow_arrays = False
         self._allow_wrappers = True
         self.update_cmds()
-
-        exp_id_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, self.expid)
+        self.config = config
+        exp_id_path = os.path.join(config.LOCAL_ROOT_DIR, self.expid)
         tmp_path = os.path.join(exp_id_path, "tmp")
-        self._submit_script_path = os.path.join(tmp_path , BasicConfig.LOCAL_ASLOG_DIR,"submit_"+self.name+".sh")
+        self._submit_script_path = os.path.join(tmp_path , config.LOCAL_ASLOG_DIR,"submit_"+self.name+".sh")
         self._submit_script_file = open(self._submit_script_path, 'w').close()
 
     def open_submit_script(self):
@@ -60,7 +62,7 @@
     def get_submit_script(self):
         self._submit_script_file.close()
         os.chmod(self._submit_script_path, 0o750)
-        return os.path.join(BasicConfig.LOCAL_ASLOG_DIR,os.path.basename(self._submit_script_path))
+        return os.path.join(self.config.LOCAL_ASLOG_DIR,os.path.basename(self._submit_script_path))
 
 
     def submit_Script(self,hold=False):
@@ -202,3 +204,26 @@
     @staticmethod
     def allocated_nodes():
         return """os.system("scontrol show hostnames $SLURM_JOB_NODELIST > node_list")"""
+
+    def check_file_exists(self, filename):
+        if not self.restore_connection():
+            return False
+        file_exist = False
+        sleeptime = 5
+        retries = 0
+        while not file_exist and retries < 5:
+            try:
+                self._ftpChannel.stat(os.path.join(self.get_files_path(), filename))  # raises IOError if the path does not exist
+                file_exist = True
+            except IOError:  # file does not exist yet, retry after sleeptime
+                Log.debug("{2} does not exist yet, waiting {0}s before retrying (retries left: {1})", sleeptime,
+                          5 - retries, os.path.join(self.get_files_path(), filename))
+                sleep(sleeptime)
+                sleeptime = sleeptime + 5
+                retries = retries + 1
+            except BaseException as e:  # unrecoverable error
+                Log.critical("Crashed while retrieving remote logs: {0}", e)
+                file_exist = False  # won't exist
+                retries = 999  # no more retries
+
+        return file_exist
\ No newline at end of file
diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py
index 05816f45c51196085982aa3adf1d6f1094c4129f..c1c1822e1b9a6083b39b7ad4d6917fbc84c07363 100644
--- a/autosubmit/platforms/wrappers/wrapper_builder.py
+++ b/autosubmit/platforms/wrappers/wrapper_builder.py
@@ -146,6 +146,7 @@ class PythonWrapperBuilder(WrapperBuilder):
                 os.system("echo $(date +%s) > "+jobname+"_STAT")
                 out = str(self.template) + "." + str(self.id_run) + ".out"
                 err = str(self.template) + "." + str(self.id_run) + ".err"
+                print(out+"\\n")
                 command = "bash " + str(self.template) + " " + str(self.id_run) + " " + os.getcwd()
                 (self.status) = getstatusoutput(command + " > " + out + " 2> " + err)
             """).format('\n'.ljust(13))
@@ -294,7 +295,8 @@
 
             {2}
 
-            current = {1}({0}[i], i)
+            current = {1}({0}[i], i+self.id_run)
+            print(self.id_run)
             pid_list.append(current)
             current.start()
 
@@ -392,7 +394,7 @@
 
         nodes_list = self.build_nodes_list()
         self.exit_thread = "os._exit(1)"
         joblist_thread = self.build_joblist_thread()
-        threads_launcher = self.build_sequential_threads_launcher("scripts", "JobListThread(scripts[i], i, "
+        threads_launcher = self.build_sequential_threads_launcher("scripts", "JobListThread(scripts[i], i*(len(scripts)-1), "
                                                                   "copy.deepcopy(all_cores))", footer=False)
         return joblist_thread + nodes_list + threads_launcher
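
Note: the log-retrieval changes above rely on the small threaded decorator added to autosubmit/job/job.py, which runs the decorated method in a background thread so that update_status() no longer blocks while .out/.err files are fetched. Below is a minimal, self-contained sketch of that pattern, for reference only; the fetch_logs function and the job name are made up for the example and are not part of the patch.

from threading import Thread
from time import sleep


def threaded(fn):
    # Same decorator as in job.py: run the wrapped callable in a background
    # thread and hand the Thread object back to the caller.
    def wrapper(*args, **kwargs):
        thread = Thread(target=fn, args=args, kwargs=kwargs)
        thread.start()
        return thread
    return wrapper


@threaded
def fetch_logs(job_name):
    # Stand-in for Job.retrieve_logfiles(): pretend to download the .out/.err files.
    sleep(2)
    print("logs retrieved for " + job_name)


handle = fetch_logs("a000_SIM")  # returns immediately; the work runs in the thread
handle.join()                    # optional: wait for the transfer to finish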