From e2446e974aab3a375ae8c7938a7a85a9851e0170 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 26 Mar 2020 13:16:55 +0100 Subject: [PATCH 1/9] fixes for log_remote, checking speedup, fixed log in horizontal-vertical --- autosubmit/job/job.py | 6 +- autosubmit/job/job_packages.py | 60 +++++++------- autosubmit/platforms/headers/lsf_header.py | 3 +- autosubmit/platforms/paramiko_platform.py | 80 ++++++++++++------- .../platforms/wrappers/wrapper_builder.py | 6 +- 5 files changed, 90 insertions(+), 65 deletions(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index b8221332f..3b4d1c64f 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -708,7 +708,7 @@ class Job(object): template = template_file.read() else: if self.type == Type.BASH: - template = ' # %PACKED% \n sleep 5\n' + template = ' # %PACKED% \n sleep 5\n echo "Dummy_Template"' elif self.type == Type.PYTHON: template = 'time.sleep(5)' elif self.type == Type.R: @@ -964,8 +964,8 @@ class Job(object): self.parents.remove(parent) def synchronize_logs(self): - self.platform.move_file(self.remote_logs[0], self.local_logs[0], self.id) # .out - self.platform.move_file(self.remote_logs[1], self.local_logs[1], self.id) # .err + self.platform.move_file(self.remote_logs[0], self.local_logs[0], True) # .out + self.platform.move_file(self.remote_logs[1], self.local_logs[1], True) # .err self.remote_logs = self.local_logs diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index a489d8630..6af6fb584 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -198,15 +198,15 @@ class JobPackageArray(JobPackageBase): def _create_scripts(self, configuration): timestamp = str(int(time.time())) - for i in range(1, len(self.jobs) + 1): - self._job_scripts[self.jobs[i - 1].name] = self.jobs[i - 1].create_script(configuration) - self._job_inputs[self.jobs[i - 1].name] = self._create_i_input(timestamp, i) - self.jobs[i - 1].remote_logs = (timestamp + 
".{0}.out".format(i), timestamp + ".{0}.err".format(i)) + for i in range(0, len(self.jobs)): + self._job_scripts[self.jobs[i].name] = self.jobs[i].create_script(configuration) + self._job_inputs[self.jobs[i].name] = self._create_i_input(timestamp, i) + self.jobs[i].remote_logs = (timestamp + ".{0}.out".format(i), timestamp + ".{0}.err".format(i)) self._common_script = self._create_common_script(timestamp) def _create_i_input(self, filename, index): filename += '.{0}'.format(index) - input_content = self._job_scripts[self.jobs[index - 1].name] + input_content = self._job_scripts[self.jobs[index].name] open(os.path.join(self._tmp_path, filename), 'w').write(input_content) os.chmod(os.path.join(self._tmp_path, filename), 0o755) return filename @@ -236,11 +236,11 @@ class JobPackageArray(JobPackageBase): if package_id is None: return - for i in range(1, len(self.jobs) + 1): - Log.info("{0} submitted", self.jobs[i - 1].name) - self.jobs[i - 1].id = str(package_id) + '[{0}]'.format(i) - self.jobs[i - 1].status = Status.SUBMITTED - self.jobs[i - 1].write_submit_time() + for i in range(0, len(self.jobs)): + Log.info("{0} submitted", self.jobs[i].name) + self.jobs[i].id = str(package_id) + '[{0}]'.format(i) + self.jobs[i].status = Status.SUBMITTED + self.jobs[i].write_submit_time() class JobPackageThread(JobPackageBase): @@ -309,11 +309,11 @@ class JobPackageThread(JobPackageBase): def _create_scripts(self, configuration): - for i in range(1, len(self.jobs) + 1): - self._job_scripts[self.jobs[i - 1].name] = self.jobs[i - 1].create_script(configuration) - self.jobs[i - 1].remote_logs = ( - self._job_scripts[self.jobs[i - 1].name] + ".{0}.out".format(i - 1), - self._job_scripts[self.jobs[i - 1].name] + ".{0}.err".format(i - 1) + for i in range(0, len(self.jobs)): + self._job_scripts[self.jobs[i].name] = self.jobs[i].create_script(configuration) + self.jobs[i].remote_logs = ( + self._job_scripts[self.jobs[i].name] + ".{0}.out".format(i), + self._job_scripts[self.jobs[i].name] 
+ ".{0}.err".format(i) ) self._common_script = self._create_common_script() @@ -354,11 +354,11 @@ class JobPackageThread(JobPackageBase): if package_id is None: return - for i in range(1, len(self.jobs) + 1): - Log.info("{0} submitted", self.jobs[i - 1].name) - self.jobs[i - 1].id = str(package_id) - self.jobs[i - 1].status = Status.SUBMITTED - self.jobs[i - 1].write_submit_time() + for i in range(0, len(self.jobs) ): + Log.info("{0} submitted", self.jobs[i].name) + self.jobs[i].id = str(package_id) + self.jobs[i].status = Status.SUBMITTED + self.jobs[i].write_submit_time() def _common_script_content(self): pass @@ -405,11 +405,11 @@ class JobPackageThreadWrapped(JobPackageThread): return self._platform.project def _create_scripts(self, configuration): - for i in range(1, len(self.jobs) + 1): - self._job_scripts[self.jobs[i - 1].name] = self.jobs[i - 1].create_script(configuration) - self.jobs[i - 1].remote_logs = ( - self._job_scripts[self.jobs[i - 1].name] + ".{0}.out".format(i - 1), - self._job_scripts[self.jobs[i - 1].name] + ".{0}.err".format(i - 1) + for i in range(0, len(self.jobs)): + self._job_scripts[self.jobs[i].name] = self.jobs[i].create_script(configuration) + self.jobs[i].remote_logs = ( + self._job_scripts[self.jobs[i].name] + ".{0}.out".format(i), + self._job_scripts[self.jobs[i].name] + ".{0}.err".format(i) ) self._common_script = self._create_common_script() @@ -437,11 +437,11 @@ class JobPackageThreadWrapped(JobPackageThread): if package_id is None: raise Exception('Submission failed') - for i in range(1, len(self.jobs) + 1): - Log.info("{0} submitted", self.jobs[i - 1].name) - self.jobs[i - 1].id = str(package_id) - self.jobs[i - 1].status = Status.SUBMITTED - self.jobs[i - 1].write_submit_time() + for i in range(0, len(self.jobs)): + Log.info("{0} submitted", self.jobs[i].name) + self.jobs[i].id = str(package_id) + self.jobs[i].status = Status.SUBMITTED + self.jobs[i].write_submit_time() class JobPackageVertical(JobPackageThread): diff --git 
a/autosubmit/platforms/headers/lsf_header.py b/autosubmit/platforms/headers/lsf_header.py index 6634b2a0d..af22da2ca 100644 --- a/autosubmit/platforms/headers/lsf_header.py +++ b/autosubmit/platforms/headers/lsf_header.py @@ -128,13 +128,14 @@ class LsfHeader(object): def run(self): out = str(self.template) + '.' + str(self.id_run) + '.out' err = str(self.template) + '.' + str(self.id_run) + '.err' + command = str(self.template) + ' ' + str(self.id_run) + ' ' + os.getcwd() (self.status) = getstatusoutput(command + ' > ' + out + ' 2> ' + err) scripts = {3} for i in range(len(scripts)): - current = JobThread(scripts[i], i) + current = JobThread(scripts[i], i+self.id_run) current.start() current.join() completed_filename = scripts[i].replace('.cmd', '_COMPLETED') diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index d2e78c688..8f2a32ea4 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -201,20 +201,33 @@ class ParamikoPlatform(Platform): if not self.restore_connection(): return False - - try: - #ftp = self._ssh.open_sftp() - - self._ftpChannel.get(os.path.join(self.get_files_path(), filename), file_path) - #ftp.close() + file_exist = False + sleeptime = 5 + remote_path = os.path.join(self.get_files_path(), filename) + retries = 0 + while not file_exist and retries < 2: + try: + self._ftpChannel.stat(remote_path) # This return IOError if path doesn't exist + file_exist = True + except IOError: # File doesn't exist, retry in sleeptime + Log.info("{2} File still no exists.. 
waiting {0}s for a new retry ( retries left: {1})", sleeptime, + 1 - retries,remote_path) + sleep(sleeptime) + sleeptime= sleeptime+5 + retries= retries+1 + except: # Unrecoverable error + file_exist = False # won't exist + retries = 999 # no more retries + if file_exist: + self._ftpChannel.get(remote_path, file_path) return True - except BaseException: - # ftp.get creates a local file anyway - if os.path.exists(file_path): + else: + if os.path.exists(file_path): # ftp.get creates a local file anyway os.remove(file_path) if must_exist: raise Exception('File {0} does not exists'.format(filename)) - return False + else: + return False def delete_file(self, filename): """ @@ -241,36 +254,45 @@ class ParamikoPlatform(Platform): return False # Moves .err .out - def move_file(self, src, dest,migrate=False): + def move_file(self, src, dest,must_exist=False): """ Moves a file on the platform (includes .err and .out) :param src: source name :type src: str :param dest: destination name - :param migrate: ignore if file exist or not + :param must_exist: ignore if file exist or not :type dest: str """ if not self.restore_connection(): return False - try: - #ftp = self._ssh.open_sftp() - if not migrate: - self._ftpChannel.rename(os.path.join(self.get_files_path(), src), os.path.join(self.get_files_path(), dest)) - else: - try: - #self._ftpChannel.chdir((os.path.join(self.get_files_path(), src))) - self._ftpChannel.rename(os.path.join(self.get_files_path(), src), os.path.join(self.get_files_path(),dest)) - return True - except (IOError): - return False - #ftp.close() - + file_exist = False + sleeptime = 5 + remote_path = os.path.join(self.get_files_path(), os.path.join(self.get_files_path(), src)) + retries = 0 + while not file_exist and retries < 2: + try: + self._ftpChannel.stat(os.path.join(self.get_files_path(), src)) # This return IOError if path doesn't exist + file_exist = True + except IOError: # File doesn't exist, retry in sleeptime + Log.info("{2} File still no 
exists.. waiting {0}s for a new retry ( retries left: {1})", sleeptime, + 1 - retries, remote_path) + sleep(sleeptime) + sleeptime = sleeptime + 5 + retries = retries + 1 + except: # Unrecoverable error + file_exist = False # won't exist + retries = 999 # no more retries + if file_exist: + self._ftpChannel.rename(os.path.join(self.get_files_path(), src), + os.path.join(self.get_files_path(), dest)) return True - except BaseException: - Log.debug('Could not move (rename) file {0} to {1}'.format(os.path.join(self.get_files_path(), src), - os.path.join(self.get_files_path(), dest))) - return False + else: + if must_exist: + raise Exception('File {0} does not exists'.format(os.path.join(self.get_files_path(), src))) + else: + return False + def submit_job(self, job, script_name, hold=False): """ diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py index 05816f45c..c1c1822e1 100644 --- a/autosubmit/platforms/wrappers/wrapper_builder.py +++ b/autosubmit/platforms/wrappers/wrapper_builder.py @@ -146,6 +146,7 @@ class PythonWrapperBuilder(WrapperBuilder): os.system("echo $(date +%s) > "+jobname+"_STAT") out = str(self.template) + "." + str(self.id_run) + ".out" err = str(self.template) + "." 
+ str(self.id_run) + ".err" + print(out+"\\n") command = "bash " + str(self.template) + " " + str(self.id_run) + " " + os.getcwd() (self.status) = getstatusoutput(command + " > " + out + " 2> " + err) """).format('\n'.ljust(13)) @@ -294,7 +295,8 @@ class PythonWrapperBuilder(WrapperBuilder): {2} - current = {1}({0}[i], i) + current = {1}({0}[i], i+self.id_run) + print(self.id_run) pid_list.append(current) current.start() @@ -392,7 +394,7 @@ class PythonHorizontalVerticalWrapperBuilder(PythonWrapperBuilder): nodes_list = self.build_nodes_list() self.exit_thread = "os._exit(1)" joblist_thread = self.build_joblist_thread() - threads_launcher = self.build_sequential_threads_launcher("scripts", "JobListThread(scripts[i], i, " + threads_launcher = self.build_sequential_threads_launcher("scripts", "JobListThread(scripts[i], i*(len(scripts)-1), " "copy.deepcopy(all_cores))", footer=False) return joblist_thread + nodes_list + threads_launcher -- GitLab From d563a4bf92bb147fefbd59d175e577c2e1dcf602 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Fri, 27 Mar 2020 14:29:32 +0100 Subject: [PATCH 2/9] fixes for remote_clone --- autosubmit/git/autosubmit_git.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/autosubmit/git/autosubmit_git.py b/autosubmit/git/autosubmit_git.py index 136b91c2a..ceabd6ef8 100644 --- a/autosubmit/git/autosubmit_git.py +++ b/autosubmit/git/autosubmit_git.py @@ -155,10 +155,10 @@ class AutosubmitGit: if git_remote_project_path != '': if git_remote_project_path[-1] == '/': - git_remote_path=git_remote_project_path[:-1]+project_path + git_remote_path=os.path.join(git_remote_project_path[:-1], as_conf.expid, BasicConfig.LOCAL_PROJ_DIR) else: - git_remote_project_path+=project_path - project_path=git_remote_project_path + git_remote_path=os.path.join(git_remote_project_path, as_conf.expid, BasicConfig.LOCAL_PROJ_DIR) + project_path=git_remote_path if git_project_commit: Log.info("Fetching {0} into {1}", git_project_commit + " " + 
git_project_origin, project_path) -- GitLab From fda7d713646418d56f266680b0be9a4ca7ad1aec Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 1 Apr 2020 16:36:33 +0200 Subject: [PATCH 3/9] Weird bug on ftp.. sometimes only --- autosubmit/autosubmit.py | 3 +- autosubmit/job/job.py | 50 ++++++++++++++++----- autosubmit/job/job_list.py | 7 +++ autosubmit/job/job_list_persistence.py | 6 +-- autosubmit/platforms/locplatform.py | 6 +-- autosubmit/platforms/paramiko_platform.py | 54 +++++++++++------------ autosubmit/platforms/platform.py | 18 +++----- 7 files changed, 86 insertions(+), 58 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index c95e0ce79..158f55744 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -87,6 +87,8 @@ from job.job_exceptions import WrongTemplateException from job.job_packager import JobPackager from sets import Set + + # noinspection PyUnusedLocal def signal_handler(signal_received, frame): """ @@ -1762,7 +1764,6 @@ class Autosubmit: job.platform.get_logs_files(expid, job.remote_logs) except: pass - #Log.warning("Unable to retrieve the log file of {0} in platform {1}",job.name,job.platform.name) elif job.status != Status.SUSPENDED: job.status = Status.WAITING job.fail_count = 0 diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 3b4d1c64f..d3b0c34cc 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -34,6 +34,16 @@ from autosubmit.job.job_common import StatisticsSnippetR, StatisticsSnippetEmpty from autosubmit.config.basicConfig import BasicConfig from bscearth.utils.date import date2str, parse_date, previous_day, chunk_end_date, chunk_start_date, Log, subs_dates from time import sleep +from threading import Thread + + +def threaded(fn): + def wrapper(*args, **kwargs): + thread = Thread(target=fn, args=args, kwargs=kwargs) + thread.start() + return thread + return wrapper + class Job(object): """ @@ -88,6 +98,7 @@ class Job(object): self.file = None self._local_logs = 
('', '') self._remote_logs = ('', '') + self.log_retrieved = False self.status = status self.old_status = self.status self.new_status=status @@ -104,7 +115,7 @@ class Job(object): self.check_warnings = 'false' self.packed = False self.hold = False - + self._running_thread = False def __getstate__(self): @@ -481,6 +492,28 @@ class Job(object): retrials_list.insert(0, retrial_dates) return retrials_list + @threaded + def retrieve_logfiles(self,copy_remote_logs): + out_exist = False + err_exist = False + retries = 10 + sleeptime = 5 + i= 0 + while not out_exist and not err_exist and i < retries: + out_exist = self.platform.check_file_exists(self.remote_logs[0]) + err_exist = self.platform.check_file_exists(self.remote_logs[1]) + sleeptime = sleeptime + 5 + i = i + 1 + if out_exist and err_exist: + if copy_remote_logs: + if self.local_logs != self.remote_logs: + self.synchronize_logs() # unifying names for log files + if self.platform.get_logs_files(self.expid, self.remote_logs): + self.log_retrieved = True + # Update the logs with Autosubmit Job Id Brand + for local_log in self.local_logs: + self.platform.write_jobid(self.id,os.path.join(self._tmp_path, 'LOG_' + str(self.expid), local_log)) + def update_status(self, copy_remote_logs=False): """ Updates job status, checking COMPLETED file if needed @@ -533,14 +566,10 @@ class Job(object): # Updating logs if self.status in [Status.COMPLETED, Status.FAILED, Status.UNKNOWN]: self.write_end_time(self.status == Status.COMPLETED) - if self.local_logs != self.remote_logs: - self.synchronize_logs() # unifying names for log files - if copy_remote_logs: - self.platform.get_logs_files(self.expid, self.remote_logs) - # Update the logs with Autosubmit Job Id Brand - for local_log in self.local_logs: - self.platform.write_jobid(self.id, os.path.join(self._tmp_path, 'LOG_' + str(self.expid), local_log)) - + #New thread, check if file exist + if not self._running_thread: + thread = self.retrieve_logfiles(copy_remote_logs) + 
self._running_thread=True return self.status def update_children_status(self): @@ -708,7 +737,7 @@ class Job(object): template = template_file.read() else: if self.type == Type.BASH: - template = ' # %PACKED% \n sleep 5\n echo "Dummy_Template"' + template = 'sleep 5' elif self.type == Type.PYTHON: template = 'time.sleep(5)' elif self.type == Type.R: @@ -1007,6 +1036,7 @@ class WrapperJob(Job): # save start time, wallclock and processors?! self.checked_time = datetime.datetime.now() self.hold = hold + def _queuing_reason_cancel(self, reason): try: if len(reason.split('(', 1)) > 1: diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 32869e539..a6410871b 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -873,6 +873,12 @@ class JobList: return [job for job in finished if job.packed is False] else: return finished + + def get_finished_not_logged(self, platform=None): + finished= self.get_completed(platform) + self.get_failed(platform) + return [job for job in finished if job.log_retrieved is False] + + def get_active(self, platform=None, wrapper=False): """ Returns a list of active jobs (In platforms queue + Ready) @@ -891,6 +897,7 @@ class JobList: return active + def get_job_by_name(self, name): """ Returns the job that its name matches parameter name diff --git a/autosubmit/job/job_list_persistence.py b/autosubmit/job/job_list_persistence.py index 735f56e6b..a50cd63d8 100644 --- a/autosubmit/job/job_list_persistence.py +++ b/autosubmit/job/job_list_persistence.py @@ -91,7 +91,7 @@ class JobListPersistencePkl(JobListPersistence): job.priority, job.section, job.date, job.member, job.chunk, job.local_logs[0], job.local_logs[1], - job.remote_logs[0], job.remote_logs[1]) for job in job_list] + job.remote_logs[0], job.remote_logs[1],job.log_retrieved) for job in job_list] pickle.dump(jobs_data, fd) Log.debug('Job list saved') @@ -107,7 +107,7 @@ class JobListPersistenceDb(JobListPersistence): TABLE_FIELDS = ['name', 'id', 
'status', 'priority', 'section', 'date', 'member', 'chunk', 'local_out', 'local_err', - 'remote_out', 'remote_err'] + 'remote_out', 'remote_err','log_retrieved'] def __init__(self, persistence_path, persistence_file): self.db_manager = DbManager(persistence_path, persistence_file, self.VERSION) @@ -134,7 +134,7 @@ class JobListPersistenceDb(JobListPersistence): job.priority, job.section, job.date, job.member, job.chunk, job.local_logs[0], job.local_logs[1], - job.remote_logs[0], job.remote_logs[1]) for job in job_list] + job.remote_logs[0], job.remote_logs[1],job.log_retrieved) for job in job_list] self.db_manager.insertMany(self.JOB_LIST_TABLE, jobs_data) def _reset_table(self): diff --git a/autosubmit/platforms/locplatform.py b/autosubmit/platforms/locplatform.py index 0f3d1852b..699c3a785 100644 --- a/autosubmit/platforms/locplatform.py +++ b/autosubmit/platforms/locplatform.py @@ -114,20 +114,18 @@ class LocalPlatform(ParamikoPlatform): raise return True + def check_file_exists(self,filename): + return True def get_file(self, filename, must_exist=True, relative_path=''): - local_path = os.path.join(self.tmp_path, relative_path) if not os.path.exists(local_path): os.makedirs(local_path) - file_path = os.path.join(local_path, filename) if os.path.exists(file_path): os.remove(file_path) command = '{0} {1} {2}'.format(self.get_cmd, os.path.join(self.tmp_path, 'LOG_' + self.expid, filename), file_path) - - try: subprocess.check_call(command, stdout=open(os.devnull, 'w'), stderr=open(os.devnull, 'w'), shell=True) except subprocess.CalledProcessError: diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 8f2a32ea4..687c92238 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -175,6 +175,9 @@ class ParamikoPlatform(Platform): except BaseException as e: Log.error('Unknown Error') raise + except: + Log.error('Unknown Error') + raise # Gets .err and .out def 
get_file(self, filename, must_exist=True, relative_path=''): @@ -194,36 +197,16 @@ class ParamikoPlatform(Platform): local_path = os.path.join(self.tmp_path, relative_path) if not os.path.exists(local_path): os.makedirs(local_path) - file_path = os.path.join(local_path, filename) if os.path.exists(file_path): os.remove(file_path) - if not self.restore_connection(): return False - file_exist = False - sleeptime = 5 remote_path = os.path.join(self.get_files_path(), filename) - retries = 0 - while not file_exist and retries < 2: - try: - self._ftpChannel.stat(remote_path) # This return IOError if path doesn't exist - file_exist = True - except IOError: # File doesn't exist, retry in sleeptime - Log.info("{2} File still no exists.. waiting {0}s for a new retry ( retries left: {1})", sleeptime, - 1 - retries,remote_path) - sleep(sleeptime) - sleeptime= sleeptime+5 - retries= retries+1 - except: # Unrecoverable error - file_exist = False # won't exist - retries = 999 # no more retries - if file_exist: + try: self._ftpChannel.get(remote_path, file_path) return True - else: - if os.path.exists(file_path): # ftp.get creates a local file anyway - os.remove(file_path) + except: if must_exist: raise Exception('File {0} does not exists'.format(filename)) else: @@ -254,9 +237,9 @@ class ParamikoPlatform(Platform): return False # Moves .err .out - def move_file(self, src, dest,must_exist=False): + def check_file_exists(self, src): """ - Moves a file on the platform (includes .err and .out) + check if a file on the platform :param src: source name :type src: str :param dest: destination name @@ -270,7 +253,7 @@ class ParamikoPlatform(Platform): sleeptime = 5 remote_path = os.path.join(self.get_files_path(), os.path.join(self.get_files_path(), src)) retries = 0 - while not file_exist and retries < 2: + while not file_exist and retries < 5: try: self._ftpChannel.stat(os.path.join(self.get_files_path(), src)) # This return IOError if path doesn't exist file_exist = True @@ -283,11 
+266,25 @@ class ParamikoPlatform(Platform): except: # Unrecoverable error file_exist = False # won't exist retries = 999 # no more retries - if file_exist: + return file_exist + + + def move_file(self, src, dest,must_exist=False): + """ + Moves a file on the platform (includes .err and .out) + :param src: source name + :type src: str + :param dest: destination name + :param must_exist: ignore if file exist or not + :type dest: str + """ + if not self.restore_connection(): + return False + try: self._ftpChannel.rename(os.path.join(self.get_files_path(), src), os.path.join(self.get_files_path(), dest)) return True - else: + except: if must_exist: raise Exception('File {0} does not exists'.format(os.path.join(self.get_files_path(), src))) else: @@ -721,7 +718,8 @@ class ParamikoPlatform(Platform): Log.debug('{0} has been created on {1} .', self.remote_log_dir, self.host) else: Log.error('Could not create the DIR {0} on HPC {1}'.format(self.remote_log_dir, self.host)) - + except: + Log.debug("Garbage detected") else: if self.send_command(self.get_mkdir_cmd()): Log.debug('{0} has been created on {1} .', self.remote_log_dir, self.host) diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 1dda5313b..bc6f8501c 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -221,13 +221,12 @@ class Platform(object): :return: True if successful, false otherwise :rtype: bool """ - while True: + if self.check_file_exists('{0}_COMPLETED'.format(job_name)): if self.get_file('{0}_COMPLETED'.format(job_name), False): return True - if retries == 0: + else: return False - retries -= 1 - sleep(1) + def remove_stat_file(self, job_name): """ @@ -258,6 +257,8 @@ class Platform(object): Log.debug('{0} been removed', filename) return True return False + def check_file_exists(self, src): + return True def get_stat_file(self, job_name, retries=0): """ @@ -274,17 +275,10 @@ class Platform(object): stat_local_path = 
os.path.join(self.config.LOCAL_ROOT_DIR, self.expid, self.config.LOCAL_TMP_DIR, filename) if os.path.exists(stat_local_path): os.remove(stat_local_path) - - while True: + if self.check_file_exists(filename): if self.get_file(filename, False): Log.debug('{0}_STAT file have been transfered', job_name) return True - if retries == 0: - break - retries -= 1 - # wait five seconds to check get file - sleep(5) - Log.debug('Something did not work well when transferring the STAT file') return False -- GitLab From 0d579e8a70b0aa134fca9788a835020cbc692507 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 2 Apr 2020 11:33:50 +0200 Subject: [PATCH 4/9] Working remote_logs --- autosubmit/job/job.py | 39 +++++++++++++---------- autosubmit/job/job_list.py | 4 +-- autosubmit/job/job_list_persistence.py | 4 +-- autosubmit/platforms/paramiko_platform.py | 12 +++++-- 4 files changed, 35 insertions(+), 24 deletions(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index d3b0c34cc..f5f950f87 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -27,7 +27,7 @@ import json import datetime import textwrap from collections import OrderedDict - +import copy from autosubmit.job.job_common import Status, Type from autosubmit.job.job_common import StatisticsSnippetBash, StatisticsSnippetPython from autosubmit.job.job_common import StatisticsSnippetR, StatisticsSnippetEmpty @@ -98,7 +98,7 @@ class Job(object): self.file = None self._local_logs = ('', '') self._remote_logs = ('', '') - self.log_retrieved = False + self.retrieving_log = False self.status = status self.old_status = self.status self.new_status=status @@ -115,7 +115,7 @@ class Job(object): self.check_warnings = 'false' self.packed = False self.hold = False - self._running_thread = False + def __getstate__(self): @@ -493,27 +493,34 @@ class Job(object): return retrials_list @threaded - def retrieve_logfiles(self,copy_remote_logs): + def retrieve_logfiles(self,copy_remote_logs,platform): + self.retrieving_log = 
True + while self.retrieving_log: + pass + sleep(2) + job = copy.deepcopy(self) + job.platform = platform + #job.platform.connect(True) out_exist = False err_exist = False retries = 10 sleeptime = 5 - i= 0 + i = 0 while not out_exist and not err_exist and i < retries: - out_exist = self.platform.check_file_exists(self.remote_logs[0]) - err_exist = self.platform.check_file_exists(self.remote_logs[1]) + out_exist = job.platform.check_file_exists(job.remote_logs[0]) + err_exist = job.platform.check_file_exists(job.remote_logs[1]) sleeptime = sleeptime + 5 i = i + 1 if out_exist and err_exist: if copy_remote_logs: - if self.local_logs != self.remote_logs: - self.synchronize_logs() # unifying names for log files - if self.platform.get_logs_files(self.expid, self.remote_logs): - self.log_retrieved = True + if job.local_logs != job.remote_logs: + job.synchronize_logs() # unifying names for log files + self.remote_logs = job.remote_logs + #if job.platform.get_logs_files(job.expid, job.remote_logs): # Update the logs with Autosubmit Job Id Brand - for local_log in self.local_logs: - self.platform.write_jobid(self.id,os.path.join(self._tmp_path, 'LOG_' + str(self.expid), local_log)) - + for local_log in job.local_logs: + job.platform.write_jobid(job.id,os.path.join(job._tmp_path, 'LOG_' + str(job.expid), local_log)) + self.retrieving_log = False def update_status(self, copy_remote_logs=False): """ Updates job status, checking COMPLETED file if needed @@ -567,9 +574,7 @@ class Job(object): if self.status in [Status.COMPLETED, Status.FAILED, Status.UNKNOWN]: self.write_end_time(self.status == Status.COMPLETED) #New thread, check if file exist - if not self._running_thread: - thread = self.retrieve_logfiles(copy_remote_logs) - self._running_thread=True + self.retrieve_logfiles(copy_remote_logs,self.platform) return self.status def update_children_status(self): diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index a6410871b..59abd809c 100644 --- 
a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -874,9 +874,7 @@ class JobList: else: return finished - def get_finished_not_logged(self, platform=None): - finished= self.get_completed(platform) + self.get_failed(platform) - return [job for job in finished if job.log_retrieved is False] + def get_active(self, platform=None, wrapper=False): diff --git a/autosubmit/job/job_list_persistence.py b/autosubmit/job/job_list_persistence.py index a50cd63d8..0f4ad0474 100644 --- a/autosubmit/job/job_list_persistence.py +++ b/autosubmit/job/job_list_persistence.py @@ -91,7 +91,7 @@ class JobListPersistencePkl(JobListPersistence): job.priority, job.section, job.date, job.member, job.chunk, job.local_logs[0], job.local_logs[1], - job.remote_logs[0], job.remote_logs[1],job.log_retrieved) for job in job_list] + job.remote_logs[0], job.remote_logs[1]) for job in job_list] pickle.dump(jobs_data, fd) Log.debug('Job list saved') @@ -134,7 +134,7 @@ class JobListPersistenceDb(JobListPersistence): job.priority, job.section, job.date, job.member, job.chunk, job.local_logs[0], job.local_logs[1], - job.remote_logs[0], job.remote_logs[1],job.log_retrieved) for job in job_list] + job.remote_logs[0], job.remote_logs[1]) for job in job_list] self.db_manager.insertMany(self.JOB_LIST_TABLE, jobs_data) def _reset_table(self): diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 687c92238..4dcc44444 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -206,7 +206,10 @@ class ParamikoPlatform(Platform): try: self._ftpChannel.get(remote_path, file_path) return True - except: + except Exception as e: + if str(e).lower.contain("Garbage"): + Log.critical("Critical Error,seems that the user is invalid") + raise if must_exist: raise Exception('File {0} does not exists'.format(filename)) else: @@ -233,6 +236,9 @@ class ParamikoPlatform(Platform): except IOError: return False except 
BaseException as e: + if e.lower().contains("garbage"): + Log.error("Wrong User or invalid .ssh/config. Or invalid user in platform.conf or public key not set ") + raise Log.debug('Could not remove file {0}'.format(os.path.join(self.get_files_path(), filename))) return False @@ -266,6 +272,7 @@ class ParamikoPlatform(Platform): except: # Unrecoverable error file_exist = False # won't exist retries = 999 # no more retries + return file_exist @@ -719,7 +726,8 @@ class ParamikoPlatform(Platform): else: Log.error('Could not create the DIR {0} on HPC {1}'.format(self.remote_log_dir, self.host)) except: - Log.debug("Garbage detected") + Log.critical("Garbage detected") + raise else: if self.send_command(self.get_mkdir_cmd()): Log.debug('{0} has been created on {1} .', self.remote_log_dir, self.host) -- GitLab From 98bf5d3f255a262177cd2ca16ff8e0753bcbc51f Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 6 Apr 2020 17:05:20 +0200 Subject: [PATCH 5/9] Fixed %Thread% --- autosubmit/job/job.py | 1 + 1 file changed, 1 insertion(+) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index f5f950f87..017c157d7 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -698,6 +698,7 @@ class Job(object): parameters['MEMORY'] = self.memory parameters['MEMORY_PER_TASK'] = self.memory_per_task parameters['NUMTHREADS'] = self.threads + parameters['THREADS'] = self.threads parameters['NUMTASK'] = self.tasks parameters['WALLCLOCK'] = self.wallclock parameters['TASKTYPE'] = self.section -- GitLab From 6c3546854791a1004fd2f2490fb40dfe6c3eabfc Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 6 Apr 2020 17:54:22 +0200 Subject: [PATCH 6/9] fixed remote_logs --- autosubmit/job/job.py | 15 +++++---- autosubmit/platforms/locplatform.py | 32 ++++++++++++++++++ autosubmit/platforms/paramiko_platform.py | 41 +++++++++-------------- 3 files changed, 56 insertions(+), 32 deletions(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 017c157d7..a37336eeb 100644 --- 
a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -494,23 +494,26 @@ class Job(object): @threaded def retrieve_logfiles(self,copy_remote_logs,platform): - self.retrieving_log = True while self.retrieving_log: pass sleep(2) + self.retrieving_log = True job = copy.deepcopy(self) - job.platform = platform - #job.platform.connect(True) + job.platform = copy.copy(platform) + job.platform._ssh = None + job.platform._ftpChannel = None + job.platform.connect() out_exist = False err_exist = False - retries = 10 + retries = 2 sleeptime = 5 i = 0 while not out_exist and not err_exist and i < retries: - out_exist = job.platform.check_file_exists(job.remote_logs[0]) - err_exist = job.platform.check_file_exists(job.remote_logs[1]) + out_exist = job.platform.check_file_exists(job.remote_logs[0]) # will do 5 retries + err_exist = job.platform.check_file_exists(job.remote_logs[1]) # will do 5 retries sleeptime = sleeptime + 5 i = i + 1 + sleep(sleeptime) if out_exist and err_exist: if copy_remote_logs: if job.local_logs != job.remote_logs: diff --git a/autosubmit/platforms/locplatform.py b/autosubmit/platforms/locplatform.py index 699c3a785..a9193977b 100644 --- a/autosubmit/platforms/locplatform.py +++ b/autosubmit/platforms/locplatform.py @@ -134,6 +134,38 @@ class LocalPlatform(ParamikoPlatform): return False return True + # Moves .err .out + def check_file_exists(self, src): + """ + Moves a file on the platform + :param src: source name + :type src: str + :param dest: destination name + :param must_exist: ignore if file exist or not + :type dest: str + """ + + file_exist = False + sleeptime = 5 + remote_path = os.path.join(self.get_files_path(), src) + retries = 0 + while not file_exist and retries < 5: + try: + file_exist = os.path.isfile(os.path.join(self.get_files_path(),src)) + if not file_exist: # File doesn't exist, retry in sleeptime + Log.debug("{2} File still no exists.. 
waiting {0}s for a new retry ( retries left: {1})", sleeptime, + 5 - retries, remote_path) + sleep(sleeptime) + sleeptime = sleeptime + 5 + retries = retries + 1 + except BaseException as e: # Unrecoverable error + Log.critical("Crashed while retrieving logs: {0}",e) + + file_exist = False # won't exist + retries = 999 # no more retries + + return file_exist + def delete_file(self, filename): command = '{0} {1}'.format(self.del_cmd, os.path.join(self.tmp_path, 'LOG_' + self.expid, filename)) try: diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 4dcc44444..8460bf8c1 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -161,17 +161,17 @@ class ParamikoPlatform(Platform): self.delete_file(filename) try: - #ftp = self._ssh.open_sftp() + local_path = os.path.join(os.path.join(self.tmp_path, filename)) + remote_path = os.path.join(self.get_files_path(), os.path.basename(filename)) + self._ftpChannel.put(local_path, remote_path) + self._ftpChannel.chmod(remote_path,os.stat(local_path).st_mode) + - self._ftpChannel.put(os.path.join(self.tmp_path, filename), os.path.join(self.get_files_path(), os.path.basename(filename))) - self._ftpChannel.chmod(os.path.join(self.get_files_path(), os.path.basename(filename)), - os.stat(os.path.join(self.tmp_path, filename)).st_mode) - #ftp.close() return True - except (OSError,IOError) as er: - Log.warning('Can not send file {0} to {1} due file not found skipping until next iteration', os.path.join(self.tmp_path, filename), + except BaseException as e: + Log.error('Can not send file {0} to {1}', os.path.join(self.tmp_path, filename), os.path.join(self.get_files_path(), filename)) - raise (IOError) + raise except BaseException as e: Log.error('Unknown Error') raise @@ -197,6 +197,7 @@ class ParamikoPlatform(Platform): local_path = os.path.join(self.tmp_path, relative_path) if not os.path.exists(local_path): os.makedirs(local_path) 
+ file_path = os.path.join(local_path, filename) if os.path.exists(file_path): os.remove(file_path) @@ -242,34 +243,22 @@ class ParamikoPlatform(Platform): Log.debug('Could not remove file {0}'.format(os.path.join(self.get_files_path(), filename))) return False - # Moves .err .out - def check_file_exists(self, src): - """ - check if a file on the platform - :param src: source name - :type src: str - :param dest: destination name - :param must_exist: ignore if file exist or not - :type dest: str - """ - if not self.restore_connection(): - return False - + def check_file_exists(self,filename): file_exist = False sleeptime = 5 - remote_path = os.path.join(self.get_files_path(), os.path.join(self.get_files_path(), src)) retries = 0 while not file_exist and retries < 5: try: - self._ftpChannel.stat(os.path.join(self.get_files_path(), src)) # This return IOError if path doesn't exist + self._ftpChannel.stat(os.path.join(self.get_files_path(), filename)) # This return IOError if path doesn't exist file_exist = True except IOError: # File doesn't exist, retry in sleeptime - Log.info("{2} File still no exists.. waiting {0}s for a new retry ( retries left: {1})", sleeptime, - 1 - retries, remote_path) + Log.debug("{2} File still no exists.. 
waiting {0}s for a new retry ( retries left: {1})", sleeptime, + 1 - retries, os.path.join(self.get_files_path(),filename)) sleep(sleeptime) sleeptime = sleeptime + 5 retries = retries + 1 - except: # Unrecoverable error + except BaseException as e: # Unrecoverable error + Log.critical("Crashed while retrieving remote logs: {0}", e) file_exist = False # won't exist retries = 999 # no more retries -- GitLab From 9ab23bf8be22093046d35096be473ce82006e3ec Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 8 Apr 2020 13:20:26 +0200 Subject: [PATCH 7/9] remote_logs --- autosubmit/job/job.py | 9 +++--- autosubmit/platforms/paramiko_platform.py | 21 +------------- autosubmit/platforms/slurmplatform.py | 35 +++++++++++++++++++---- 3 files changed, 36 insertions(+), 29 deletions(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index a37336eeb..1c5327fe6 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -511,15 +511,16 @@ class Job(object): while not out_exist and not err_exist and i < retries: out_exist = job.platform.check_file_exists(job.remote_logs[0]) # will do 5 retries err_exist = job.platform.check_file_exists(job.remote_logs[1]) # will do 5 retries - sleeptime = sleeptime + 5 - i = i + 1 - sleep(sleeptime) + if not out_exist or not err_exist: + sleeptime = sleeptime + 5 + i = i + 1 + sleep(sleeptime) if out_exist and err_exist: if copy_remote_logs: if job.local_logs != job.remote_logs: job.synchronize_logs() # unifying names for log files self.remote_logs = job.remote_logs - #if job.platform.get_logs_files(job.expid, job.remote_logs): + job.platform.get_logs_files(job.expid, job.remote_logs) # Update the logs with Autosubmit Job Id Brand for local_log in job.local_logs: job.platform.write_jobid(job.id,os.path.join(job._tmp_path, 'LOG_' + str(job.expid), local_log)) diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 8460bf8c1..a2bc431b1 100644 --- 
a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -243,26 +243,7 @@ class ParamikoPlatform(Platform): Log.debug('Could not remove file {0}'.format(os.path.join(self.get_files_path(), filename))) return False - def check_file_exists(self,filename): - file_exist = False - sleeptime = 5 - retries = 0 - while not file_exist and retries < 5: - try: - self._ftpChannel.stat(os.path.join(self.get_files_path(), filename)) # This return IOError if path doesn't exist - file_exist = True - except IOError: # File doesn't exist, retry in sleeptime - Log.debug("{2} File still no exists.. waiting {0}s for a new retry ( retries left: {1})", sleeptime, - 1 - retries, os.path.join(self.get_files_path(),filename)) - sleep(sleeptime) - sleeptime = sleeptime + 5 - retries = retries + 1 - except BaseException as e: # Unrecoverable error - Log.critical("Crashed while retrieving remote logs: {0}", e) - file_exist = False # won't exist - retries = 999 # no more retries - - return file_exist + def move_file(self, src, dest,must_exist=False): diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index ade701c60..5312aba5e 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -18,13 +18,15 @@ # along with Autosubmit. If not, see . 
import os +from time import sleep from xml.dom.minidom import parseString from autosubmit.platforms.paramiko_platform import ParamikoPlatform from autosubmit.platforms.headers.slurm_header import SlurmHeader from autosubmit.platforms.wrappers.wrapper_factory import SlurmWrapperFactory -from autosubmit.config.basicConfig import BasicConfig +from bscearth.utils.log import Log + class SlurmPlatform(ParamikoPlatform): """ @@ -47,10 +49,10 @@ class SlurmPlatform(ParamikoPlatform): self._allow_arrays = False self._allow_wrappers = True self.update_cmds() - - exp_id_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, self.expid) + self.config = config + exp_id_path = os.path.join(config.LOCAL_ROOT_DIR, self.expid) tmp_path = os.path.join(exp_id_path, "tmp") - self._submit_script_path = os.path.join(tmp_path , BasicConfig.LOCAL_ASLOG_DIR,"submit_"+self.name+".sh") + self._submit_script_path = os.path.join(tmp_path , config.LOCAL_ASLOG_DIR,"submit_"+self.name+".sh") self._submit_script_file = open(self._submit_script_path, 'w').close() def open_submit_script(self): @@ -60,7 +62,7 @@ class SlurmPlatform(ParamikoPlatform): def get_submit_script(self): self._submit_script_file.close() os.chmod(self._submit_script_path, 0o750) - return os.path.join(BasicConfig.LOCAL_ASLOG_DIR,os.path.basename(self._submit_script_path)) + return os.path.join(self.config.LOCAL_ASLOG_DIR,os.path.basename(self._submit_script_path)) def submit_Script(self,hold=False): @@ -202,3 +204,26 @@ class SlurmPlatform(ParamikoPlatform): @staticmethod def allocated_nodes(): return """os.system("scontrol show hostnames $SLURM_JOB_NODELIST > node_list")""" + + def check_file_exists(self,filename): + if not self.restore_connection(): + return False + file_exist = False + sleeptime = 5 + retries = 0 + while not file_exist and retries < 5: + try: + self._ftpChannel.stat(os.path.join(self.get_files_path(), filename)) # This return IOError if path doesn't exist + file_exist = True + except IOError: # File doesn't exist, 
retry in sleeptime + Log.debug("{2} File still no exists.. waiting {0}s for a new retry ( retries left: {1})", sleeptime, + 1 - retries, os.path.join(self.get_files_path(),filename)) + sleep(sleeptime) + sleeptime = sleeptime + 5 + retries = retries + 1 + except BaseException as e: # Unrecoverable error + Log.critical("Crashed while retrieving remote logs: {0}", e) + file_exist = False # won't exist + retries = 999 # no more retries + + return file_exist \ No newline at end of file -- GitLab From d1c43b480f701626a764f9d111c40e9a126d2e37 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 8 Apr 2020 14:08:52 +0200 Subject: [PATCH 8/9] Last fix remote log --- autosubmit/platforms/ecplatform.py | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/autosubmit/platforms/ecplatform.py b/autosubmit/platforms/ecplatform.py index bff993360..10eafe943 100644 --- a/autosubmit/platforms/ecplatform.py +++ b/autosubmit/platforms/ecplatform.py @@ -19,7 +19,7 @@ import os import subprocess - +from time import sleep from autosubmit.platforms.paramiko_platform import ParamikoPlatform, ParamikoPlatformException from bscearth.utils.log import Log @@ -158,14 +158,27 @@ class EcPlatform(ParamikoPlatform): command = '{0} {3}:{2} {1}'.format(self.get_cmd, file_path, os.path.join(self.get_files_path(), filename), self.host) try: - process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE) - out, _ = process.communicate() - process_ok = False if 'No such file' in out or process.returncode != 0 else True - except Exception: + retries = 0 + sleeptime = 5 + process_ok = False + while not process_ok and retries < 2: + process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE) + out, _ = process.communicate() + if 'No such file' in out or process.returncode != 0: + retries = retries + 1 + process_ok = False + sleeptime = sleeptime + 5 + sleep(sleeptime) + else: + process_ok = True + except Exception as e: + Log.error("Not 
recovered,{0}", e) + Log.error("command: , {0} ",command) process_ok = False - if not process_ok and must_exist: raise Exception('File {0} does not exists'.format(filename)) + if not process_ok: + Log.warning("File not transferred {0}, output: {1}",command,out) return process_ok def delete_file(self, filename): -- GitLab From fd4602ed6b2c5ba3899cecd1e7045f2f4fffbd67 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 8 Apr 2020 14:12:46 +0200 Subject: [PATCH 9/9] log retrieve removed, old approach --- autosubmit/job/job_list_persistence.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autosubmit/job/job_list_persistence.py b/autosubmit/job/job_list_persistence.py index 0f4ad0474..735f56e6b 100644 --- a/autosubmit/job/job_list_persistence.py +++ b/autosubmit/job/job_list_persistence.py @@ -107,7 +107,7 @@ class JobListPersistenceDb(JobListPersistence): TABLE_FIELDS = ['name', 'id', 'status', 'priority', 'section', 'date', 'member', 'chunk', 'local_out', 'local_err', - 'remote_out', 'remote_err','log_retrieved'] + 'remote_out', 'remote_err'] def __init__(self, persistence_path, persistence_file): self.db_manager = DbManager(persistence_path, persistence_file, self.VERSION) -- GitLab