diff --git a/autosubmit/database/db_jobdata.py b/autosubmit/database/db_jobdata.py index 5faf9e624fb9c608a39448472b1dd03047929c52..32caaa0910533807ae0bcfda80d90a9ba701fbd0 100644 --- a/autosubmit/database/db_jobdata.py +++ b/autosubmit/database/db_jobdata.py @@ -828,7 +828,7 @@ class JobDataStructure(MainDataBase): :param packed: True if job belongs to wrapper, False otherwise :type packed: boolean - :return: rowtype, 2 packed, 1 normal + :return: rowtype, >2 packed (wrapper code), 2 normal :rtype: int """ if code: @@ -1040,25 +1040,40 @@ class JobDataStructure(MainDataBase): return None return None - def write_submit_time(self, job_name, submit=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0, packed=False): - """Writes submit time of job. - - Args: - job_name ([type]): [description] - submit (int, optional): [description]. Defaults to 0. - status (str, optional): [description]. Defaults to "UNKNOWN". - ncpus (int, optional): [description]. Defaults to 0. - wallclock (str, optional): [description]. Defaults to "00:00". - qos (str, optional): [description]. Defaults to "debug". - date (str, optional): [description]. Defaults to "". - member (str, optional): [description]. Defaults to "". - section (str, optional): [description]. Defaults to "". - chunk (int, optional): [description]. Defaults to 0. - platform (str, optional): [description]. Defaults to "NA". - job_id (int, optional): [description]. Defaults to 0. - - Returns: - [type]: [description] + def write_submit_time(self, job_name, submit=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0, packed=False, wrapper_queue=None): + """ + Writes submit time of job into the historical database. + + :param job_name: Name of the job + :type job_name: str + :param submit: Submit time as timestamp + :type submit: int + :param status: Status of the job + :type status: str + :param ncpus: Number of processors requested by the job + :type ncpus: int + :param wallclock: Wallclock requested by the job + :type wallclock: str + :param qos: Queue requested by the job + :type qos: str + :param date: date of the job (from experiment config) + :type date: str + :param member: member of the job (from experiment config) + :type member: str + :param section: section of the job (from experiment config) + :type section: str + :param chunk: chunk of the job (from experiment config) + :type chunk: int + :param platform: Name of the target platform + :type platform: str + :param job_id: JobId in the platform + :type job_id: int + :param packed: True if job belongs to a wrapper + :type packed: bool + :param wrapper_queue: Name of the queue requested by the wrapper + :type wrapper_queue: str + :return: True if succesfully saved + :rtype: True or None """ try: job_data = self.get_job_data(job_name) @@ -1077,9 +1092,12 @@ class JobDataStructure(MainDataBase): job_max_counter + 1) if job_max_counter >= max_counter else max_counter else: current_counter = max_counter + package_code = self.get_job_package_code(job_name) + queue_name = wrapper_queue if ( + package_code and package_code > 2 and wrapper_queue is not None) else qos # Insert new last rowid = self._insert_job_data(JobData( - 0, current_counter, job_name, None, None, submit, 0, 0, status, self.determine_rowtype(self.get_job_package_code(job_name)), ncpus, wallclock, qos, 0, date, member, section, chunk, 1, platform, job_id, dict(), 0, self.current_run_id)) + 0, current_counter, job_name, None, None, submit, 0, 0, status, self.determine_rowtype(package_code), ncpus, wallclock, queue_name, 0, date, member, section, chunk, 1, platform, job_id, dict(), 0, self.current_run_id)) if rowid: return True else: @@ -1093,27 +1111,40 @@ class JobDataStructure(MainDataBase): # if rowid > 0: # print("Successfully inserted") - def write_start_time(self, job_name, start=0, status="UNKWOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0, packed=False): - """Writes start time into the database - - Args: - job_name ([type]): [description] - start (int, optional): [description]. Defaults to 0. - status (str, optional): [description]. Defaults to "UNKWNONW". - ncpus (int, optional): [description]. Defaults to 0. - wallclock (str, optional): [description]. Defaults to "00:00". - qos (str, optional): [description]. Defaults to "debug". - date (str, optional): [description]. Defaults to "". - member (str, optional): [description]. Defaults to "". - section (str, optional): [description]. Defaults to "". - chunk (int, optional): [description]. Defaults to 0. - platform (str, optional): [description]. Defaults to "NA". - job_id (int, optional): [description]. Defaults to 0. - packed (bool, optional): [description]. Defaults to False. - nnodes (int, optional): [description]. Defaults to 0. - - Returns: - [type]: [description] + def write_start_time(self, job_name, start=0, status="UNKWOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0, packed=False, wrapper_queue=None): + """ + Writes start time into the database + + :param job_name: Name of the job + :type job_name: str + :param start: Start time as timestamp + :type start: int + :param status: Status of the job + :type status: str + :param ncpus: Number of processors requested by the job + :type ncpus: int + :param wallclock: Wallclock requested by the job + :type wallclock: str + :param qos: Queue requested by the job + :type qos: str + :param date: date of the job (from experiment config) + :type date: str + :param member: member of the job (from experiment config) + :type member: str + :param section: section of the job (from experiment config) + :type section: str + :param chunk: chunk of the job (from experiment config) + :type chunk: int + :param platform: Name of the target platform + :type platform: str + :param job_id: JobId in the platform + :type job_id: int + :param packed: True if job belongs to a wrapper + :type packed: bool + :param wrapper_queue: Name of the queue requested by the wrapper + :type wrapper_queue: str + :return: True if succesfully saved + :rtype: True or None """ try: job_data_last = self.get_job_data_last(job_name) @@ -1121,17 +1152,21 @@ class JobDataStructure(MainDataBase): if job_data_last: job_data_last = job_data_last[0] if job_data_last.start == 0: + package_code = self.get_job_package_code(job_name) + queue_name = wrapper_queue if ( + package_code and package_code > 2 and wrapper_queue is not None) else qos job_data_last.start = start + job_data_last.qos = queue_name job_data_last.status = status job_data_last.rowtype = self.determine_rowtype( - self.get_job_package_code(job_name)) + package_code) job_data_last.job_id = job_id job_data_last.modified = datetime.today().strftime('%Y-%m-%d-%H:%M:%S') _updated = self._update_start_job_data(job_data_last) return _updated # It is necessary to create a new row submit_inserted = self.write_submit_time( - job_name, start, status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id, packed) + job_name, start, status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id, packed, wrapper_queue) if submit_inserted: # print("retro start") self.write_start_time(job_name, start, status, @@ -1145,26 +1180,52 @@ class JobDataStructure(MainDataBase): "Autosubmit couldn't write start time.") return None - def write_finish_time(self, job_name, finish=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0, platform_object=None, packed=False, parent_id_list=[], no_slurm=True, out_file_path=None, out_file=None, err_file=None): - """Writes the finish time into the database - - Args: - job_name (str): Name of Job. - finish (int, optional): Finish time. Defaults to 0. - status (str, optional): Current Status. Defaults to "UNKNOWN". - ncpus (int, optional): Number of cpus. Defaults to 0. - wallclock (str, optional): Wallclock value. Defaults to "00:00". - qos (str, optional): Name of QoS. Defaults to "debug". - date (str, optional): Date from config. Defaults to "". - member (str, optional): Member from config. Defaults to "". - section (str, optional): Section from config. Defaults to "". - chunk (int, optional): Chunk from config. Defaults to 0. - platform (str, optional): Name of platform of job. Defaults to "NA". - job_id (int, optional): Id of job. Defaults to 0. - platform_object (obj, optional): Platform object. Defaults to None. - - Returns: - Boolean/None: True if success, None if exception. + def write_finish_time(self, job_name, finish=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0, platform_object=None, packed=False, parent_id_list=[], no_slurm=True, out_file_path=None, out_file=None, err_file=None, wrapper_queue=None): + """ + Writes the finish time into the database + + :param job_name: Name of the job + :type job_name: str + :param finish: Finish time as timestamp + :type finish: int + :param status: Status of the job + :type status: str + :param ncpus: Number of processors requested by the job + :type ncpus: int + :param wallclock: Wallclock requested by the job + :type wallclock: str + :param qos: Queue requested by the job + :type qos: str + :param date: date of the job (from experiment config) + :type date: str + :param member: member of the job (from experiment config) + :type member: str + :param section: section of the job (from experiment config) + :type section: str + :param chunk: chunk of the job (from experiment config) + :type chunk: int + :param platform: Name of the target platform + :type platform: str + :param job_id: JobId in the platform + :type job_id: int + :param platform_object: Platform Object + :type platform: Object + :param packed: True if job belongs to a wrapper + :type packed: bool + :param parent_id_list: List of parents (not in use) + :type parent_id_list: list + :param no_slurm: True if job belongs to slurm platform + :type no_slurm: bool + :param out_file_path: Path to the out file of the job + :type out_file_path: str + :param out_file: Name of the out file + :type out_file: str + :param err_file: Name of the err file + :type err_file: str + :param wrapper_queue: Name of the queue requested by the wrapper + :type wrapper_queue: str + :return: True if succesfully saved + :rtype: True or None """ try: # Current thread: @@ -1250,15 +1311,15 @@ class JobDataStructure(MainDataBase): return True # It is necessary to create a new row submit_inserted = self.write_submit_time( - job_name, finish, status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id, is_packed) + job_name, finish, status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id, is_packed, wrapper_queue) write_inserted = self.write_start_time(job_name, finish, status, ncpus, - wallclock, qos, date, member, section, chunk, platform, job_id, is_packed) + wallclock, qos, date, member, section, chunk, platform, job_id, is_packed, wrapper_queue) # print(submit_inserted) # print(write_inserted) if submit_inserted and write_inserted: #print("retro finish") self.write_finish_time( - job_name, time.time(), status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id, platform_object, is_packed, parent_id_list, no_slurm, out_file_path, out_file, err_file) + job_name, time.time(), status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id, platform_object, is_packed, parent_id_list, no_slurm, out_file_path, out_file, err_file, wrapper_queue) else: return None except Exception as exp: diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index e84cb1897b7640c8332b8f76590e870fc2515caf..e0e81ccd3bc53459bd764cd9736cf2bbeb5c4623 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -1148,9 +1148,10 @@ class Job(object): else: f = open(path, 'w') f.write(date2str(datetime.datetime.now(), 'S')) + # Get # Writing database JobDataStructure(self.expid).write_submit_time(self.name, time.time(), Status.VALUE_TO_KEY[self.status] if self.status in Status.VALUE_TO_KEY.keys() else "UNKNOWN", self.processors, - self.wallclock, self.queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.packed) + self.wallclock, self.queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.packed, self._wrapper_queue) def write_start_time(self): """ @@ -1176,7 +1177,7 @@ class Job(object): f.write(date2str(datetime.datetime.fromtimestamp(start_time), 'S')) # Writing database JobDataStructure(self.expid).write_start_time(self.name, time.time(), Status.VALUE_TO_KEY[self.status] if self.status in Status.VALUE_TO_KEY.keys() else "UNKNOWN", self.processors, - self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.packed) + self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.packed, self._wrapper_queue) return True def write_end_time(self, completed): @@ -1211,10 +1212,10 @@ class Job(object): path_out = os.path.join(self._tmp_path, 'LOG_' + str(self.expid), out) # Launch first as simple non-threaded function JobDataStructure(self.expid).write_finish_time(self.name, finish_time, final_status, self.processors, self.wallclock, self._queue, self.date, - self.member, self.section, self.chunk, self.platform_name, self.id, self.platform, self.packed, [job.id for job in self._parents], True, None, out, err) + self.member, self.section, self.chunk, self.platform_name, self.id, self.platform, self.packed, [job.id for job in self._parents], True, None, out, err, self._wrapper_queue) # Launch second as threaded function thread_write_finish = Thread(target=JobDataStructure(self.expid).write_finish_time, args=(self.name, finish_time, final_status, self.processors, - self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.platform, self.packed, [job.id for job in self._parents], False, path_out, out, err)) + self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.platform, self.packed, [job.id for job in self._parents], False, path_out, out, err, self._wrapper_queue)) thread_write_finish.start() def check_started_after(self, date_limit):