From cf4233c59ead442230f23a7ca9d7f67d3616922f Mon Sep 17 00:00:00 2001 From: Wilmer Uruchi Ticona Date: Tue, 17 Sep 2019 09:11:07 +0200 Subject: [PATCH 1/2] Fixed #428. Added a method on platform that writes the Job Id at the top of the file, this method is used in job.py. Some helpful comments were added. --- .gitignore | 1 + autosubmit/job/job.py | 13 ++++--- autosubmit/job/job_dict.py | 2 +- autosubmit/job/job_list.py | 1 + autosubmit/platforms/locplatform.py | 7 ++-- autosubmit/platforms/paramiko_platform.py | 5 ++- autosubmit/platforms/platform.py | 43 +++++++++++++++++++++-- 7 files changed, 62 insertions(+), 10 deletions(-) diff --git a/.gitignore b/.gitignore index c748715fb..8eacd9c4d 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ .*.log* /cover/ /.coverage +autosubmit/miniTest.py diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index a4f416ffa..a298bcb72 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -533,13 +533,17 @@ class Job(object): if previous_status != Status.RUNNING and self.status in [Status.COMPLETED, Status.FAILED, Status.UNKNOWN, Status.RUNNING]: self.write_start_time() - - if self.status in [Status.COMPLETED, Status.FAILED, Status.UNKNOWN]: + # Updating logs + if self.status in [Status.COMPLETED, Status.FAILED, Status.UNKNOWN]: self.write_end_time(self.status == Status.COMPLETED) if self.local_logs != self.remote_logs: self.synchronize_logs() # unifying names for log files if copy_remote_logs: self.platform.get_logs_files(self.expid, self.remote_logs) + # Update the logs with Autosubmit Job Id Brand + for local_log in self.local_logs: + self.platform.write_jobid(self.id, os.path.join(self._tmp_path, 'LOG_' + str(self.expid), local_log)) + return self.status def update_children_status(self): @@ -963,10 +967,11 @@ class Job(object): self.parents.remove(parent) def synchronize_logs(self): - self.platform.move_file(self.remote_logs[0], self.local_logs[0]) # .out - self.platform.move_file(self.remote_logs[1], self.local_logs[1]) # .err + self.platform.move_file(self.remote_logs[0], self.local_logs[0], self.id) # .out + self.platform.move_file(self.remote_logs[1], self.local_logs[1], self.id) # .err self.remote_logs = self.local_logs + class WrapperJob(Job): def __init__(self, name, job_id, status, priority, job_list, total_wallclock, num_processors, platform, as_config): diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index 1a0239c9a..7381ea479 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -296,7 +296,7 @@ class DicJobs: if split > -1: name += "_{0}".format(split) name += "_" + section - if name in jobs_data: + if name in jobs_data: job = Job(name, jobs_data[name][1], jobs_data[name][2], priority) job.local_logs = (jobs_data[name][8], jobs_data[name][9]) job.remote_logs = (jobs_data[name][10], jobs_data[name][11]) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index f7d75d408..b8947c00e 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -136,6 +136,7 @@ class JobList: Log.info("Creating jobs...") jobs_data = dict() + # jobs_data includes the name of the .our and .err files of the job in LOG_expid if not new: jobs_data = {str(row[0]): row for row in self.load()} self._create_jobs(dic_jobs, jobs_parser, priority, default_job_type, jobs_data) diff --git a/autosubmit/platforms/locplatform.py b/autosubmit/platforms/locplatform.py index 8bef82e7d..f2766be55 100644 --- a/autosubmit/platforms/locplatform.py +++ b/autosubmit/platforms/locplatform.py @@ -115,6 +115,7 @@ class LocalPlatform(ParamikoPlatform): return True def get_file(self, filename, must_exist=True, relative_path=''): + local_path = os.path.join(self.tmp_path, relative_path) if not os.path.exists(local_path): os.makedirs(local_path) @@ -125,8 +126,10 @@ class LocalPlatform(ParamikoPlatform): command = '{0} {1} {2}'.format(self.get_cmd, os.path.join(self.tmp_path, 'LOG_' + self.expid, filename), file_path) - try: - subprocess.check_call(command, stdout=open(os.devnull, 'w'), stderr=open(os.devnull, 'w'), shell=True) + + + try: + subprocess.check_call(command, stdout=open(os.devnull, 'w'), stderr=open(os.devnull, 'w'), shell=True) except subprocess.CalledProcessError: if must_exist: raise Exception('File {0} does not exists'.format(filename)) diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index fcfbcc468..b6f3d6372 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -156,6 +156,7 @@ class ParamikoPlatform(Platform): Log.error('Unknown Error') raise + # Gets .err and .out def get_file(self, filename, must_exist=True, relative_path=''): """ Copies a file from the current platform to experiment's tmp folder @@ -222,9 +223,10 @@ class ParamikoPlatform(Platform): Log.debug('Could not remove file {0}'.format(os.path.join(self.get_files_path(), filename))) return False + # Moves .err .out def move_file(self, src, dest,migrate=False): """ - Moves a file on the platform + Moves a file on the platform (includes .err and .out) :param src: source name :type src: str :param dest: destination name @@ -247,6 +249,7 @@ class ParamikoPlatform(Platform): except (IOError): pass #ftp.close() + return True except BaseException: Log.debug('Could not move (rename) file {0} to {1}'.format(os.path.join(self.get_files_path(), src), diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index f1d0a042f..6eae247cc 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -1,4 +1,5 @@ from time import sleep +from time import time import os @@ -194,10 +195,11 @@ class Platform(object): :rtype: bool """ raise NotImplementedError - + + # Executed when calling from Job def get_logs_files(self, exp_id, remote_logs): """ - Get the given LOGS files + Get the LOGS files .err and .out :param exp_id: experiment id :type exp_id: str @@ -325,3 +327,40 @@ class Platform(object): :rtype: autosubmit.job.job_common.Status """ raise NotImplementedError + + def write_jobid(self, jobid, complete_path): + """ + Writes Job id in an out file. + + :param jobid: job id + :type jobid: str + :param complete_path: complete path to the file, includes filename + :type complete_path: str + :return: Modifies file and returns True, False if file could not be modified + :rtype: Boolean + """ + try: + + title_job = "[AUTOSUBMIT JOB ID] = " + str(jobid) + + if os.path.exists(complete_path): + file_type = complete_path[-3:] + if file_type == "out" or file_type == "err": + with open(complete_path, "r+") as f: + # Reading into memory (Potentially slow) + first_line = f.readline() + # Not rewrite + if not first_line.startswith("[AUTOSUBMIT JOB ID]"): + content = f.read() + # Write again (Potentially slow) + #start = time() + #Log.info("Attempting job identification of " + str(jobid)) + f.seek(0,0) + f.write(title_job + "\n\n" + content) + f.close() + #finish = time() + #Log.info("Job correctly identified in " + str(finish - start) + " seconds") + + except Exception as ex: + Log.info("Writing Job Id Failed : " + str(ex)) + -- GitLab From 8958756ff64739b006379c3597a05dbde2016dfd Mon Sep 17 00:00:00 2001 From: Wilmer Uruchi Ticona Date: Tue, 17 Sep 2019 09:58:22 +0200 Subject: [PATCH 2/2] Fixed #428. Changed format --- autosubmit/platforms/platform.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 6eae247cc..b117f88ce 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -341,7 +341,7 @@ class Platform(object): """ try: - title_job = "[AUTOSUBMIT JOB ID] = " + str(jobid) + title_job = "[INFO] JOBID=" + str(jobid) if os.path.exists(complete_path): file_type = complete_path[-3:] @@ -350,7 +350,7 @@ class Platform(object): # Reading into memory (Potentially slow) first_line = f.readline() # Not rewrite - if not first_line.startswith("[AUTOSUBMIT JOB ID]"): + if not first_line.startswith("[INFO] JOBID="): content = f.read() # Write again (Potentially slow) #start = time() -- GitLab