From 809ec78d8af167da6dda543314bd85d3bc1d31af Mon Sep 17 00:00:00 2001 From: Wilmer Uruchi Ticona Date: Fri, 28 May 2021 13:50:31 +0200 Subject: [PATCH 1/2] Implements #690 for the historical database. The queue value of the wrapper section in the config is now stored in the databas if applicable. --- autosubmit/database/db_jobdata.py | 195 ++++++++++++++++++++---------- autosubmit/job/job.py | 22 +++- 2 files changed, 145 insertions(+), 72 deletions(-) diff --git a/autosubmit/database/db_jobdata.py b/autosubmit/database/db_jobdata.py index 5faf9e624..32caaa091 100644 --- a/autosubmit/database/db_jobdata.py +++ b/autosubmit/database/db_jobdata.py @@ -828,7 +828,7 @@ class JobDataStructure(MainDataBase): :param packed: True if job belongs to wrapper, False otherwise :type packed: boolean - :return: rowtype, 2 packed, 1 normal + :return: rowtype, >2 packed (wrapper code), 2 normal :rtype: int """ if code: @@ -1040,25 +1040,40 @@ class JobDataStructure(MainDataBase): return None return None - def write_submit_time(self, job_name, submit=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0, packed=False): - """Writes submit time of job. - - Args: - job_name ([type]): [description] - submit (int, optional): [description]. Defaults to 0. - status (str, optional): [description]. Defaults to "UNKNOWN". - ncpus (int, optional): [description]. Defaults to 0. - wallclock (str, optional): [description]. Defaults to "00:00". - qos (str, optional): [description]. Defaults to "debug". - date (str, optional): [description]. Defaults to "". - member (str, optional): [description]. Defaults to "". - section (str, optional): [description]. Defaults to "". - chunk (int, optional): [description]. Defaults to 0. - platform (str, optional): [description]. Defaults to "NA". - job_id (int, optional): [description]. Defaults to 0. - - Returns: - [type]: [description] + def write_submit_time(self, job_name, submit=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0, packed=False, wrapper_queue=None): + """ + Writes submit time of job into the historical database. + + :param job_name: Name of the job + :type job_name: str + :param submit: Submit time as timestamp + :type submit: int + :param status: Status of the job + :type status: str + :param ncpus: Number of processors requested by the job + :type ncpus: int + :param wallclock: Wallclock requested by the job + :type wallclock: str + :param qos: Queue requested by the job + :type qos: str + :param date: date of the job (from experiment config) + :type date: str + :param member: member of the job (from experiment config) + :type member: str + :param section: section of the job (from experiment config) + :type section: str + :param chunk: chunk of the job (from experiment config) + :type chunk: int + :param platform: Name of the target platform + :type platform: str + :param job_id: JobId in the platform + :type job_id: int + :param packed: True if job belongs to a wrapper + :type packed: bool + :param wrapper_queue: Name of the queue requested by the wrapper + :type wrapper_queue: str + :return: True if succesfully saved + :rtype: True or None """ try: job_data = self.get_job_data(job_name) @@ -1077,9 +1092,12 @@ class JobDataStructure(MainDataBase): job_max_counter + 1) if job_max_counter >= max_counter else max_counter else: current_counter = max_counter + package_code = self.get_job_package_code(job_name) + queue_name = wrapper_queue if ( + package_code and package_code > 2 and wrapper_queue is not None) else qos # Insert new last rowid = self._insert_job_data(JobData( - 0, current_counter, job_name, None, None, submit, 0, 0, status, self.determine_rowtype(self.get_job_package_code(job_name)), ncpus, wallclock, qos, 0, date, member, section, chunk, 1, platform, job_id, dict(), 0, self.current_run_id)) + 0, current_counter, job_name, None, None, submit, 0, 0, status, self.determine_rowtype(package_code), ncpus, wallclock, queue_name, 0, date, member, section, chunk, 1, platform, job_id, dict(), 0, self.current_run_id)) if rowid: return True else: @@ -1093,27 +1111,40 @@ class JobDataStructure(MainDataBase): # if rowid > 0: # print("Successfully inserted") - def write_start_time(self, job_name, start=0, status="UNKWOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0, packed=False): - """Writes start time into the database - - Args: - job_name ([type]): [description] - start (int, optional): [description]. Defaults to 0. - status (str, optional): [description]. Defaults to "UNKWNONW". - ncpus (int, optional): [description]. Defaults to 0. - wallclock (str, optional): [description]. Defaults to "00:00". - qos (str, optional): [description]. Defaults to "debug". - date (str, optional): [description]. Defaults to "". - member (str, optional): [description]. Defaults to "". - section (str, optional): [description]. Defaults to "". - chunk (int, optional): [description]. Defaults to 0. - platform (str, optional): [description]. Defaults to "NA". - job_id (int, optional): [description]. Defaults to 0. - packed (bool, optional): [description]. Defaults to False. - nnodes (int, optional): [description]. Defaults to 0. - - Returns: - [type]: [description] + def write_start_time(self, job_name, start=0, status="UNKWOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0, packed=False, wrapper_queue=None): + """ + Writes start time into the database + + :param job_name: Name of the job + :type job_name: str + :param start: Start time as timestamp + :type start: int + :param status: Status of the job + :type status: str + :param ncpus: Number of processors requested by the job + :type ncpus: int + :param wallclock: Wallclock requested by the job + :type wallclock: str + :param qos: Queue requested by the job + :type qos: str + :param date: date of the job (from experiment config) + :type date: str + :param member: member of the job (from experiment config) + :type member: str + :param section: section of the job (from experiment config) + :type section: str + :param chunk: chunk of the job (from experiment config) + :type chunk: int + :param platform: Name of the target platform + :type platform: str + :param job_id: JobId in the platform + :type job_id: int + :param packed: True if job belongs to a wrapper + :type packed: bool + :param wrapper_queue: Name of the queue requested by the wrapper + :type wrapper_queue: str + :return: True if succesfully saved + :rtype: True or None """ try: job_data_last = self.get_job_data_last(job_name) @@ -1121,17 +1152,21 @@ class JobDataStructure(MainDataBase): if job_data_last: job_data_last = job_data_last[0] if job_data_last.start == 0: + package_code = self.get_job_package_code(job_name) + queue_name = wrapper_queue if ( + package_code and package_code > 2 and wrapper_queue is not None) else qos job_data_last.start = start + job_data_last.qos = queue_name job_data_last.status = status job_data_last.rowtype = self.determine_rowtype( - self.get_job_package_code(job_name)) + package_code) job_data_last.job_id = job_id job_data_last.modified = datetime.today().strftime('%Y-%m-%d-%H:%M:%S') _updated = self._update_start_job_data(job_data_last) return _updated # It is necessary to create a new row submit_inserted = self.write_submit_time( - job_name, start, status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id, packed) + job_name, start, status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id, packed, wrapper_queue) if submit_inserted: # print("retro start") self.write_start_time(job_name, start, status, @@ -1145,26 +1180,52 @@ class JobDataStructure(MainDataBase): "Autosubmit couldn't write start time.") return None - def write_finish_time(self, job_name, finish=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0, platform_object=None, packed=False, parent_id_list=[], no_slurm=True, out_file_path=None, out_file=None, err_file=None): - """Writes the finish time into the database - - Args: - job_name (str): Name of Job. - finish (int, optional): Finish time. Defaults to 0. - status (str, optional): Current Status. Defaults to "UNKNOWN". - ncpus (int, optional): Number of cpus. Defaults to 0. - wallclock (str, optional): Wallclock value. Defaults to "00:00". - qos (str, optional): Name of QoS. Defaults to "debug". - date (str, optional): Date from config. Defaults to "". - member (str, optional): Member from config. Defaults to "". - section (str, optional): Section from config. Defaults to "". - chunk (int, optional): Chunk from config. Defaults to 0. - platform (str, optional): Name of platform of job. Defaults to "NA". - job_id (int, optional): Id of job. Defaults to 0. - platform_object (obj, optional): Platform object. Defaults to None. - - Returns: - Boolean/None: True if success, None if exception. + def write_finish_time(self, job_name, finish=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0, platform_object=None, packed=False, parent_id_list=[], no_slurm=True, out_file_path=None, out_file=None, err_file=None, wrapper_queue=None): + """ + Writes the finish time into the database + + :param job_name: Name of the job + :type job_name: str + :param finish: Finish time as timestamp + :type finish: int + :param status: Status of the job + :type status: str + :param ncpus: Number of processors requested by the job + :type ncpus: int + :param wallclock: Wallclock requested by the job + :type wallclock: str + :param qos: Queue requested by the job + :type qos: str + :param date: date of the job (from experiment config) + :type date: str + :param member: member of the job (from experiment config) + :type member: str + :param section: section of the job (from experiment config) + :type section: str + :param chunk: chunk of the job (from experiment config) + :type chunk: int + :param platform: Name of the target platform + :type platform: str + :param job_id: JobId in the platform + :type job_id: int + :param platform_object: Platform Object + :type platform: Object + :param packed: True if job belongs to a wrapper + :type packed: bool + :param parent_id_list: List of parents (not in use) + :type parent_id_list: list + :param no_slurm: True if job belongs to slurm platform + :type no_slurm: bool + :param out_file_path: Path to the out file of the job + :type out_file_path: str + :param out_file: Name of the out file + :type out_file: str + :param err_file: Name of the err file + :type err_file: str + :param wrapper_queue: Name of the queue requested by the wrapper + :type wrapper_queue: str + :return: True if succesfully saved + :rtype: True or None """ try: # Current thread: @@ -1250,15 +1311,15 @@ class JobDataStructure(MainDataBase): return True # It is necessary to create a new row submit_inserted = self.write_submit_time( - job_name, finish, status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id, is_packed) + job_name, finish, status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id, is_packed, wrapper_queue) write_inserted = self.write_start_time(job_name, finish, status, ncpus, - wallclock, qos, date, member, section, chunk, platform, job_id, is_packed) + wallclock, qos, date, member, section, chunk, platform, job_id, is_packed, wrapper_queue) # print(submit_inserted) # print(write_inserted) if submit_inserted and write_inserted: #print("retro finish") self.write_finish_time( - job_name, time.time(), status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id, platform_object, is_packed, parent_id_list, no_slurm, out_file_path, out_file, err_file) + job_name, time.time(), status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id, platform_object, is_packed, parent_id_list, no_slurm, out_file_path, out_file, err_file, wrapper_queue) else: return None except Exception as exp: diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 6f0b96070..45fad60a9 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -136,6 +136,16 @@ class Job(object): self.distance_weight = 0 self.level = 0 self.modules = "none" + self._wrapper_queue = None + # Try to get the wrapper queue. If exception, stays as None + try: + as_conf = AutosubmitConfig( + self.expid, BasicConfig, ConfigParserFactory()) + as_conf.reload() + self._wrapper_queue = as_conf.get_wrapper_queue() + except Exception as exp: + pass + #Log.warning("Exception '{}' while trying to get the wrapper queue") def __getstate__(self): odict = self.__dict__ @@ -952,7 +962,8 @@ class Job(object): except Exception as e: self.modules = re.sub( '%(? Date: Mon, 31 May 2021 14:20:18 +0200 Subject: [PATCH 2/2] Added small fix for 'None' value --- autosubmit/job/job.py | 1 + 1 file changed, 1 insertion(+) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 45fad60a9..4999d9651 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -143,6 +143,7 @@ class Job(object): self.expid, BasicConfig, ConfigParserFactory()) as_conf.reload() self._wrapper_queue = as_conf.get_wrapper_queue() + self._wrapper_queue = None if self._wrapper_queue == "None" else self._wrapper_queue except Exception as exp: pass #Log.warning("Exception '{}' while trying to get the wrapper queue") -- GitLab