From 7f9363053babec67c3af379798984fd3cebc8240 Mon Sep 17 00:00:00 2001
From: Wilmer Uruchi Ticona
Date: Wed, 8 Jul 2020 17:51:40 +0200
Subject: [PATCH 1/4] Implement historical job database. First steps.

---
 autosubmit/config/basicConfig.py    |   2 +
 autosubmit/database/db_jobdata.py   | 532 ++++++++++++++++++++++++++++
 autosubmit/database/db_structure.py |  22 +-
 autosubmit/job/job.py               |  29 +-
 4 files changed, 577 insertions(+), 8 deletions(-)
 create mode 100644 autosubmit/database/db_jobdata.py

diff --git a/autosubmit/config/basicConfig.py b/autosubmit/config/basicConfig.py
index 9d9bfac80..21d5a0a6a 100755
--- a/autosubmit/config/basicConfig.py
+++ b/autosubmit/config/basicConfig.py
@@ -38,6 +38,8 @@ class BasicConfig:
     DB_DIR = os.path.join(os.path.expanduser('~'), 'debug', 'autosubmit')
     STRUCTURES_DIR = os.path.join(
        '/esarchive', 'autosubmit', 'as_metadata', 'structures')
+    JOBDATA_DIR = os.path.join(
+        '/esarchive', 'autosubmit', 'as_metadata', 'data')
     DB_FILE = 'autosubmit.db'
     DB_PATH = os.path.join(DB_DIR, DB_FILE)
     LOCAL_ROOT_DIR = DB_DIR
diff --git a/autosubmit/database/db_jobdata.py b/autosubmit/database/db_jobdata.py
new file mode 100644
index 000000000..e0fc60c73
--- /dev/null
+++ b/autosubmit/database/db_jobdata.py
@@ -0,0 +1,532 @@
+#!/usr/bin/env python
+
+# Copyright 2015 Earth Sciences Department, BSC-CNS
+
+# This file is part of Autosubmit.
+
+# Autosubmit is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# Autosubmit is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with Autosubmit. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import sys
+import string
+import time
+import pickle
+import textwrap
+import traceback
+import sqlite3
+import copy
+from datetime import datetime
+from networkx import DiGraph
+from autosubmit.config.basicConfig import BasicConfig
+
+
+class JobData():
+    """Job Data object
+    """
+
+    def __init__(self, _id, counter=1, job_name="None", created=None, modified=None, submit=0, start=0, finish=0, status="UNKNOWN", rowtype=1, ncpus=0, wallclock="00:00", qos="debug", energy=0, date="", section="", member="", chunk=0, last=1):
+        """[summary]
+
+        Args:
+            _id ([type]): [description]
+            counter (int, optional): [description]. Defaults to 1.
+            job_name (str, optional): [description]. Defaults to "None".
+            created ([type], optional): [description]. Defaults to None.
+            modified ([type], optional): [description]. Defaults to None.
+            submit (int, optional): [description]. Defaults to 0.
+            start (int, optional): [description]. Defaults to 0.
+            finish (int, optional): [description]. Defaults to 0.
+            status (str, optional): [description]. Defaults to "UNKNOWN".
+            rowtype (int, optional): [description]. Defaults to 1.
+            ncpus (int, optional): [description]. Defaults to 0.
+            wallclock (str, optional): [description]. Defaults to "00:00".
+            qos (str, optional): [description]. Defaults to "debug".
+            energy (int, optional): [description]. Defaults to 0.
+            date (str, optional): [description]. Defaults to "".
+            section (str, optional): [description]. Defaults to "".
+            member (str, optional): [description]. Defaults to "".
+ chunk (int, optional): [description]. Defaults to 0. + """ + self._id = _id + self.counter = counter + self.job_name = job_name + self.created = created if created else datetime.today().strftime('%Y-%m-%d-%H:%M:%S') + self.modified = modified if modified else datetime.today().strftime('%Y-%m-%d-%H:%M:%S') + self._submit = int(submit) + self._start = int(start) + self._finish = int(finish) + self.status = status + self.rowtype = rowtype + self.ncpus = ncpus + self.wallclock = wallclock + self.qos = qos if qos else "debug" + self.energy = energy + self.date = date if date else "" + self.section = section if section else "" + self.member = member if member else "" + self.chunk = chunk if chunk else 0 + self.last = last + + @property + def submit(self): + return int(self._submit) + + @property + def start(self): + return int(self._start) + + @property + def finish(self): + return int(self._finish) + + @submit.setter + def submit(self, submit): + self._submit = int(submit) + + @start.setter + def start(self, start): + self._start = int(start) + + @finish.setter + def finish(self, finish): + self._finish = int(finish) + + +class JobDataList(): + def __init__(self, expid): + self.jobdata_list = list() + self.expid = expid + + def add_jobdata(self, jobdata): + self.jobdata_list.append(jobdata) + + def size(self): + return len(self.jobdata_list) + + +class JobDataStructure(): + def __init__(self, expid): + BasicConfig.read() + self.expid = expid + self.folder_path = BasicConfig.JOBDATA_DIR + self.database_path = os.path.join( + self.folder_path, "job_data_" + str(expid) + ".db") + self.conn = None + self.jobdata_list = JobDataList(self.expid) + self.create_table_query = textwrap.dedent( + '''CREATE TABLE + IF NOT EXISTS job_data ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + counter INTEGER NOT NULL, + job_name TEXT NOT NULL, + created TEXT NOT NULL, + modified TEXT NOT NULL, + submit INTEGER NOT NULL, + start INTEGER NOT NULL, + finish INTEGER NOT NULL, + status TEXT NOT NULL, + rowtype INTEGER NOT NULL, + ncpus INTEGER NOT NULL, + wallclock TEXT NOT NULL, + qos TEXT NOT NULL, + energy INTEGER NOT NULL, + date TEXT NOT NULL, + section TEXT NOT NULL, + member TEXT NOT NULL, + chunk INTEGER NOT NULL, + last INTEGER NOT NULL, + UNIQUE(counter,job_name) + );''') + if not os.path.exists(self.database_path): + open(self.database_path, "w") + self.conn = self.create_connection(self.database_path) + self.create_table() + else: + self.conn = self.create_connection(self.database_path) + + def write_submit_time(self, job_name, submit=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0): + """Write submit always generates a new record + + Args: + job_name ([type]): [description] + submit (int, optional): [description]. Defaults to 0. + status (str, optional): [description]. Defaults to "UNKNOWN". + ncpus (int, optional): [description]. Defaults to 0. + wallclock (str, optional): [description]. Defaults to "00:00". + qos (str, optional): [description]. Defaults to "debug". 
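
As a reference for how the job_data table defined above is meant to behave: every submission attempt of a job becomes its own row, counter numbers the attempts, last flags the row that represents the current attempt, and UNIQUE(counter,job_name) rejects duplicate attempt records. A minimal, self-contained sketch of that life cycle, assuming an in-memory SQLite database, the table reduced to the relevant columns, and a made-up job name:

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE job_data (
    id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
    counter INTEGER NOT NULL,
    job_name TEXT NOT NULL,
    last INTEGER NOT NULL,
    UNIQUE(counter, job_name));""")

# First attempt of a job: counter 1, flagged as the current row (last=1).
conn.execute("INSERT INTO job_data(counter, job_name, last) VALUES (1, 'a29z_SIM', 1)")
# A retry demotes the previous row and inserts counter 2 as the new current row.
conn.execute("UPDATE job_data SET last=0 WHERE job_name='a29z_SIM' AND last=1")
conn.execute("INSERT INTO job_data(counter, job_name, last) VALUES (2, 'a29z_SIM', 1)")
conn.commit()

# The UNIQUE constraint rejects a duplicate (counter, job_name) pair.
try:
    conn.execute("INSERT INTO job_data(counter, job_name, last) VALUES (2, 'a29z_SIM', 1)")
except sqlite3.IntegrityError as error:
    print("rejected duplicate attempt: " + str(error))

Keeping the old rows around with last=0 is what turns the table into a history rather than a snapshot; every lookup that needs "the" record of a job filters on last=1.
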
+ """ + #print("Saving write submit " + job_name) + try: + job_data = self.get_job_data(job_name) + current_counter = max_counter = 1 + #submit = parse_date(submit) if submit > 0 else 0 + #print("submit job data " + str(job_data)) + if job_data and len(job_data) > 0: + #print("job data has 1 element") + max_counter = self._get_maxcounter_jobdata() + job_max_counter = max(job.counter for job in job_data) + current_last = [ + job for job in job_data if job.counter == job_max_counter] + + # Deactivate current last for this job + current_last[0].modified = datetime.today().strftime( + '%Y-%m-%d-%H:%M:%S') + up_id = self._deactivate_current_last(current_last[0]) + # Finding current counter + current_counter = ( + job_max_counter + 1) if job_max_counter >= max_counter else max_counter + 1 + + # Insert new last + #print("Inserting new job data") + rowid = self._insert_job_data(JobData( + 0, current_counter, job_name, None, None, submit, 0, 0, status, 1, ncpus, wallclock, qos, 0, date, member, section, chunk, 1)) + return True + except Exception as exp: + print(traceback.format_exc()) + print(exp) + return None + + # if rowid > 0: + # print("Successfully inserted") + + def write_start_time(self, job_name, start=0, status="UNKWNONW", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0): + """[summary] + + Args: + job_name ([type]): [description] + start (int, optional): [description]. Defaults to 0. + status (str, optional): [description]. Defaults to "UNKWNONW". + ncpus (int, optional): [description]. Defaults to 0. + wallclock (str, optional): [description]. Defaults to "00:00". + qos (str, optional): [description]. Defaults to "debug". + date (str, optional): [description]. Defaults to "". + member (str, optional): [description]. Defaults to "". + section (str, optional): [description]. Defaults to "". + chunk (int, optional): [description]. Defaults to 0. + + Returns: + [type]: [description] + """ + try: + job_data_last = self.get_job_data_last(job_name) + # Updating existing row + if job_data_last: + if job_data_last.start == 0: + job_data_last.start = start + job_data_last.modified = datetime.today().strftime('%Y-%m-%d-%H:%M:%S') + rowid = self._update_start_job_data(job_data_last) + return rowid + # It is necessary to create a new row + submit_inserted = self.write_submit_time( + job_name, start, status, ncpus, wallclock, qos, date, member, section, chunk) + if submit_inserted: + self.write_start_time(job_name, start, status, + ncpus, wallclock, qos, date, member, section, chunk) + else: + return None + except Exception as exp: + print(traceback.format_exc()) + print(exp) + return None + + def write_finish_time(self, job_name, finish=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0): + """Write finish time in database. If possible, calls Slurm to retrieve energy. + + Args: + job_name ([type]): [description] + finish (int, optional): [description]. Defaults to 0. + status (str, optional): [description]. Defaults to "UNKNOWN". + ncpus (int, optional): [description]. Defaults to 0. + wallclock (str, optional): [description]. Defaults to "00:00". + qos (str, optional): [description]. Defaults to "debug". + date (str, optional): [description]. Defaults to "". + member (str, optional): [description]. Defaults to "". + section (str, optional): [description]. Defaults to "". + chunk (int, optional): [description]. Defaults to 0. 
+ + Returns: + [type]: [description] + """ + try: + job_data_last = self.get_job_data_last(job_name) + # Updating existing row + if job_data_last: + if job_data_last.finish == 0: + # Call Slurm here, update times. + job_data_last.finish = finish + job_data_last.status = status + job_data_last.modified = datetime.today().strftime('%Y-%m-%d-%H:%M:%S') + rowid = self._update_finish_job_data(job_data_last) + return True + # It is necessary to create a new row + submit_inserted = self.write_submit_time( + job_name, finish, status, ncpus, wallclock, qos, date, member, section, chunk) + write_inserted = self.write_start_time(job_name, finish, status, ncpus, + wallclock, qos, date, member, section, chunk) + if submit_inserted and write_inserted: + self.write_finish_time( + job_name, finish, status, ncpus, wallclock, qos, date, member, section, chunk) + else: + return None + except Exception as exp: + print(traceback.format_exc()) + print(exp) + return None + + def get_all_job_data(self): + """[summary] + + Raises: + Exception: [description] + """ + try: + if os.path.exists(self.folder_path): + + current_table = self._get_all_job_data() + current_job_data = dict() + for item in current_table: + _id, _counter, _job_name, _created, _modified, _submit, _start, _finish, _status, _rowtype, _ncpus, _wallclock, _qos, _energy, _date, _section, _member, _chunk, _last = item + self.jobdata_list.add_jobdata(JobData(_id, _counter, _job_name, _created, _modified, + _submit, _start, _finish, _status, _rowtype, _ncpus, _wallclock, _qos, _energy, _date, _section, _member, _chunk, _last)) + + else: + raise Exception("Job data folder not found :" + + str(self.jobdata_path)) + except Exception as exp: + print(traceback.format_exc()) + print(exp) + + def get_job_data(self, job_name): + try: + job_data = list() + if os.path.exists(self.folder_path): + current_job = self._get_job_data(job_name) + for item in current_job: + _id, _counter, _job_name, _created, _modified, _submit, _start, _finish, _status, _rowtype, _ncpus, _wallclock, _qos, _energy, _date, _section, _member, _chunk, _last = item + job_data.append(JobData(_id, _counter, _job_name, _created, _modified, + _submit, _start, _finish, _status, _rowtype, _ncpus, _wallclock, _qos, _energy, _date, _section, _member, _chunk, _last)) + return job_data + else: + raise Exception("Job data folder not found :" + + str(self.jobdata_path)) + except Exception as exp: + print(traceback.format_exc()) + print(exp) + return None + + def get_job_data_last(self, job_name): + """[summary] + + Args: + job_name ([type]): [description] + + Raises: + Exception: [description] + + Returns: + [type]: [description] + """ + try: + if os.path.exists(self.folder_path): + current_job_last = self._get_job_data_last(job_name) + if current_job_last: + _id, _counter, _job_name, _created, _modified, _submit, _start, _finish, _status, _rowtype, _ncpus, _wallclock, _qos, _energy, _date, _section, _member, _chunk, _last = current_job_last + return JobData(_id, _counter, _job_name, _created, _modified, + _submit, _start, _finish, _status, _rowtype, _ncpus, _wallclock, _qos, _energy, _date, _section, _member, _chunk, _last) + else: + return None + else: + raise Exception("Job data folder not found :" + + str(self.jobdata_path)) + except Exception as exp: + print(traceback.format_exc()) + print(exp) + return None + + def _deactivate_current_last(self, jobdata): + try: + sql = ''' UPDATE job_data SET last=0, modified = ? 
WHERE id = ?''' + tuplerow = (jobdata.modified, jobdata._id) + cur = self.conn.cursor() + cur.execute(sql, tuplerow) + self.conn.commit() + return cur.lastrowid + except Exception as exp: + print(traceback.format_exc()) + print("Error on Update : " + str(type(e).__name__)) + return None + + def _update_start_job_data(self, jobdata): + """Update start time of job data row + + Args: + jobdata ([type]): [description] + + Returns: + [type]: [description] + """ + # current_time = + try: + sql = ''' UPDATE job_data SET start=?, modified=? WHERE id=? ''' + cur = self.conn.cursor() + cur.execute(sql, (int(jobdata.start), + jobdata.modified, jobdata._id)) + self.conn.commit() + return cur.lastrowid + except sqlite3.Error as e: + print(traceback.format_exc()) + print("Error on Insert : " + str(type(e).__name__)) + return None + + def _update_finish_job_data(self, jobdata): + try: + sql = ''' UPDATE job_data SET finish=?, modified=?, status=? WHERE id=? ''' + cur = self.conn.cursor() + cur.execute(sql, (jobdata.finish, jobdata.modified, + jobdata.status, jobdata._id)) + self.conn.commit() + return cur.lastrowid + except sqlite3.Error as e: + print(traceback.format_exc()) + print("Error on Insert : " + str(type(e).__name__)) + return None + + def _insert_job_data(self, jobdata): + """[summary] + """ + try: + if self.conn: + #print("preparing to insert") + sql = ''' INSERT INTO job_data(counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) ''' + tuplerow = (jobdata.counter, jobdata.job_name, jobdata.created, jobdata.modified, jobdata.submit, jobdata.start, + jobdata.finish, jobdata.status, jobdata.rowtype, jobdata.ncpus, jobdata.wallclock, jobdata.qos, jobdata.energy, jobdata.date, jobdata.section, jobdata.member, jobdata.chunk, jobdata.last) + cur = self.conn.cursor() + #print("pre insert") + cur.execute(sql, tuplerow) + self.conn.commit() + #print("Inserted " + str(jobdata.job_name)) + return cur.lastrowid + else: + print("Not a valid connection.") + return None + except sqlite3.Error as e: + print(traceback.format_exc()) + print("Error on Insert : " + str(type(e).__name__)) + return None + + def _get__all_job_data(self): + """ + Get all registers from job_data.\n + :return: row content: exp_id, name, status, seconds_diff + :rtype: 4-tuple (int, str, str, int) + """ + try: + #conn = create_connection(path) + if self.conn: + self.conn.text_factory = str + cur = self.conn.cursor() + cur.execute( + "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last FROM job_data") + rows = cur.fetchall() + return rows + else: + raise Exception("Not a valid connection") + except Exception as exp: + print(traceback.format_exc()) + return list() + + def _get_job_data(self, job_name): + """[summary] + """ + try: + if self.conn: + self.conn.text_factory = str + cur = self.conn.cursor() + cur.execute( + "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last FROM job_data WHERE job_name=?", (job_name,)) + rows = cur.fetchall() + # print(rows) + return rows + else: + raise Exception("Not a valid connection") + except Exception as exp: + print(traceback.format_exc()) + return None + + def _get_job_data_last(self, job_name): + try: + if self.conn: + self.conn.text_factory = str + cur = 
self.conn.cursor() + cur.execute( + "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last FROM job_data WHERE last=1 and job_name=?", (job_name,)) + rows = cur.fetchall() + if rows and len(rows) > 0: + return rows[0] + else: + return None + else: + raise Exception("Not a valid connection") + except Exception as exp: + print(traceback.format_exc()) + return None + + def _get_maxcounter_jobdata(self): + """[summary] + + Returns: + [type]: [description] + """ + try: + if self.conn: + self.conn.text_factory = str + cur = self.conn.cursor() + cur.execute("SELECT MAX(counter) as maxcounter FROM job_data") + rows = cur.fetchall() + if len(rows) > 0: + #print("Row " + str(rows[0])) + result, = rows[0] + return int(result) + else: + return None + except Exception as exp: + print(traceback.format_exc()) + return None + + def create_table(self): + """ create a table from the create_table_sql statement + :param conn: Connection object + :param create_table_sql: a CREATE TABLE statement + :return: + """ + try: + if self.conn: + c = self.conn.cursor() + c.execute(self.create_table_query) + else: + raise Exception("Not a valid connection") + except Exception as e: + print(e) + + def create_connection(self, db_file): + """ + Create a database connection to the SQLite database specified by db_file. + :param db_file: database file name + :return: Connection object or None + """ + try: + conn = sqlite3.connect(db_file) + return conn + except: + return None diff --git a/autosubmit/database/db_structure.py b/autosubmit/database/db_structure.py index fb3e3f0ff..7beff44e8 100644 --- a/autosubmit/database/db_structure.py +++ b/autosubmit/database/db_structure.py @@ -1,3 +1,22 @@ +#!/usr/bin/env python + +# Copyright 2015 Earth Sciences Department, BSC-CNS + +# This file is part of Autosubmit. + +# Autosubmit is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# Autosubmit is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with Autosubmit. If not, see . 
+ import os import sys import string @@ -55,7 +74,8 @@ def get_structure(exp_id, structures_path): return None else: # pkl folder not found - raise Exception("pkl folder not found " + str(structures_path)) + raise Exception("Structures folder not found " + + str(structures_path)) except Exception as exp: print(traceback.format_exc()) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 3b9464c83..83118bad8 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -38,6 +38,7 @@ from autosubmit.job.job_common import Status, Type from autosubmit.job.job_common import StatisticsSnippetBash, StatisticsSnippetPython from autosubmit.job.job_common import StatisticsSnippetR, StatisticsSnippetEmpty from autosubmit.config.basicConfig import BasicConfig +from autosubmit.database.db_jobdata import JobDataStructure from bscearth.utils.date import date2str, parse_date, previous_day, chunk_end_date, chunk_start_date, Log, subs_dates from time import sleep from threading import Thread @@ -125,6 +126,7 @@ class Job(object): self.packed = False self.hold = False self.distance_weight = 0 + def __getstate__(self): odict = self.__dict__ if '_platform' in odict: @@ -552,9 +554,9 @@ class Job(object): if new_status == Status.COMPLETED: Log.debug("This job seems to have completed: checking...") - if not self.platform.get_completed_files(self.name): - log_name = os.path.join(self._tmp_path, self.name + '_COMPLETED') + log_name = os.path.join( + self._tmp_path, self.name + '_COMPLETED') self.check_completion() else: @@ -640,7 +642,7 @@ class Job(object): :param default_status: status to set if job is not completed. By default is FAILED :type default_status: Status """ - log_name = os.path.join(self._tmp_path,self.name + '_COMPLETED') + log_name = os.path.join(self._tmp_path, self.name + '_COMPLETED') if os.path.exists(log_name): self.status = Status.COMPLETED @@ -961,6 +963,9 @@ class Job(object): else: f = open(path, 'w') f.write(date2str(datetime.datetime.now(), 'S')) + # Writing database + JobDataStructure(self.expid).write_submit_time(self.name, time.time(), Status.VALUE_TO_KEY[self.status] if self.status in Status.VALUE_TO_KEY.keys() else "UNKNOWN", self.processors, + self.wallclock, self._queue, self.date, self.member, self.section, self.chunk) def write_start_time(self): """ @@ -980,6 +985,9 @@ class Job(object): f.write(' ') # noinspection PyTypeChecker f.write(date2str(datetime.datetime.fromtimestamp(start_time), 'S')) + # Writing database + JobDataStructure(self.expid).write_start_time(self.name, time.time(), Status.VALUE_TO_KEY[self.status] if self.status in Status.VALUE_TO_KEY.keys() else "UNKNOWN", self.processors, + self.wallclock, self._queue, self.date, self.member, self.section, self.chunk) return True def write_end_time(self, completed): @@ -993,16 +1001,25 @@ class Job(object): path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') f = open(path, 'a') f.write(' ') + finish_time = None + final_status = None if end_time > 0: # noinspection PyTypeChecker f.write(date2str(datetime.datetime.fromtimestamp(end_time), 'S')) + # date2str(datetime.datetime.fromtimestamp(end_time), 'S') + finish_time = end_time else: f.write(date2str(datetime.datetime.now(), 'S')) + finish_time = time.time() # date2str(datetime.datetime.now(), 'S') f.write(' ') if completed: + final_status = "COMPLETED" f.write('COMPLETED') else: + final_status = "FAILED" f.write('FAILED') + JobDataStructure(self.expid).write_finish_time(self.name, finish_time, final_status, self.processors, + self.wallclock, 
self._queue, self.date, self.member, self.section, self.chunk)
 
     def check_started_after(self, date_limit):
         """
@@ -1354,7 +1371,8 @@ done
         total = total * 1.15
         hour = int(total)
         minute = int((total - int(total)) * 60.0)
-        second = int(((total - int(total)) * 60 - int((total - int(total)) * 60.0)) * 60.0)
+        second = int(((total - int(total)) * 60 -
+                      int((total - int(total)) * 60.0)) * 60.0)
         wallclock_delta = datetime.timedelta(hours=hour, minutes=minute,
                                              seconds=second)
         if elapsed > wallclock_delta:
@@ -1370,6 +1388,3 @@ done
         time = int(output[index])
         time = self._parse_timestamp(time)
         return time
-
-
-
-- 
GitLab


From c138dbdf0368504184c34b3d4c9cd7f9adf76809 Mon Sep 17 00:00:00 2001
From: Wilmer Uruchi Ticona
Date: Mon, 13 Jul 2020 13:16:09 +0200
Subject: [PATCH 2/4] Fixed #524 Added decentralized job database for
 historical data and energy consumption retrieval.

---
 .gitignore                                |   1 +
 autosubmit/autosubmit.py                  |  57 +--
 autosubmit/config/basicConfig.py          |   2 +
 autosubmit/database/db_jobdata.py         | 456 ++++++++++++++++------
 autosubmit/job/job.py                     |   6 +-
 autosubmit/platforms/paramiko_platform.py | 256 ++++++++----
 autosubmit/platforms/platform.py          |  50 ++-
 autosubmit/platforms/slurmplatform.py     | 104 +++--
 8 files changed, 640 insertions(+), 292 deletions(-)

diff --git a/.gitignore b/.gitignore
index 8eacd9c4d..aa1269dcc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,3 +6,4 @@
 /cover/
 /.coverage
 autosubmit/miniTest.py
+autosubmit/simple_test.py
diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py
index 8711afcce..7a2857a5e 100644
--- a/autosubmit/autosubmit.py
+++ b/autosubmit/autosubmit.py
@@ -17,38 +17,11 @@
 # You should have received a copy of the GNU General Public License
 # along with Autosubmit. If not, see <http://www.gnu.org/licenses/>.
 # pipeline_test
-from __future__ import print_function
-import threading
-from sets import Set
-from job.job_packager import JobPackager
-from job.job_exceptions import WrongTemplateException
-from platforms.paramiko_submitter import ParamikoSubmitter
-from notifications.notifier import Notifier
-from notifications.mail_notifier import MailNotifier
-from bscearth.utils.date import date2str
-from monitor.monitor import Monitor
-from database.db_common import get_autosubmit_version
-from database.db_common import delete_experiment
-from experiment.experiment_common import copy_experiment
-from experiment.experiment_common import new_experiment
-from database.db_common import create_db
-from bscearth.utils.log import Log
-from job.job_grouping import JobGrouping
-from job.job_list_persistence import JobListPersistencePkl
-from job.job_list_persistence import JobListPersistenceDb
-from job.job_package_persistence import JobPackagePersistence
-from job.job_packages import JobPackageThread
-from job.job_list import JobList
-from git.autosubmit_git import AutosubmitGit
-from job.job_common import Status
-from bscearth.utils.config_parser import ConfigParserFactory
-from config.config_common import AutosubmitConfig
-from config.basicConfig import BasicConfig
 """
 Main module for autosubmit.
Only contains an interface class to all functionality implemented on autosubmit """ - +from __future__ import print_function try: # noinspection PyCompatibility from configparser import SafeConfigParser @@ -77,16 +50,44 @@ import random import signal import datetime import portalocker +import threading from pkg_resources import require, resource_listdir, resource_exists, resource_string from distutils.util import strtobool from collections import defaultdict from pyparsing import nestedExpr + +from sets import Set + sys.path.insert(0, os.path.abspath('.')) # noinspection PyPackageRequirements # noinspection PyPackageRequirements # from API.testAPI import Monitor # noinspection PyPackageRequirements +from job.job_packager import JobPackager +from job.job_exceptions import WrongTemplateException +from platforms.paramiko_submitter import ParamikoSubmitter +from notifications.notifier import Notifier +from notifications.mail_notifier import MailNotifier +from bscearth.utils.date import date2str +from monitor.monitor import Monitor +from database.db_common import get_autosubmit_version +from database.db_common import delete_experiment +from experiment.experiment_common import copy_experiment +from experiment.experiment_common import new_experiment +from database.db_common import create_db +from bscearth.utils.log import Log +from job.job_grouping import JobGrouping +from job.job_list_persistence import JobListPersistencePkl +from job.job_list_persistence import JobListPersistenceDb +from job.job_package_persistence import JobPackagePersistence +from job.job_packages import JobPackageThread +from job.job_list import JobList +from git.autosubmit_git import AutosubmitGit +from job.job_common import Status +from bscearth.utils.config_parser import ConfigParserFactory +from config.config_common import AutosubmitConfig +from config.basicConfig import BasicConfig # noinspection PyUnusedLocal diff --git a/autosubmit/config/basicConfig.py b/autosubmit/config/basicConfig.py index 21d5a0a6a..9c729ebef 100755 --- a/autosubmit/config/basicConfig.py +++ b/autosubmit/config/basicConfig.py @@ -100,6 +100,8 @@ class BasicConfig: BasicConfig.ALLOWED_HOSTS = parser.get('hosts', 'whitelist') if parser.has_option('structures', 'path'): BasicConfig.STRUCTURES_DIR = parser.get('structures', 'path') + if parser.has_option('historicdb', 'path'): + BasicConfig.JOBDATA_DIR = parser.get('historicdb', 'path') @staticmethod def read(): diff --git a/autosubmit/database/db_jobdata.py b/autosubmit/database/db_jobdata.py index e0fc60c73..64d0d20ed 100644 --- a/autosubmit/database/db_jobdata.py +++ b/autosubmit/database/db_jobdata.py @@ -26,24 +26,34 @@ import textwrap import traceback import sqlite3 import copy +import collections from datetime import datetime from networkx import DiGraph from autosubmit.config.basicConfig import BasicConfig +from bscearth.utils.date import date2str, parse_date, previous_day, chunk_end_date, chunk_start_date, Log, subs_dates + + +_debug = False +JobItem = collections.namedtuple('JobItem', ['id', 'counter', 'job_name', 'created', 'modified', 'submit', 'start', 'finish', + 'status', 'rowtype', 'ncpus', 'wallclock', 'qos', 'energy', 'date', 'section', 'member', 'chunk', 'last', 'platform', 'job_id']) + +# JobItem = collections.namedtuple( +# 'JobItem', 'id counter job_name created modified submit start finish status rowtype ncpus wallclock qos energy date section member chunk last platform') class JobData(): """Job Data object """ - def __init__(self, _id, counter=1, job_name="None", created=None, 
modified=None, submit=0, start=0, finish=0, status="UNKNOWN", rowtype=1, ncpus=0, wallclock="00:00", qos="debug", energy=0, date="", section="", member="", chunk=0, last=1): + def __init__(self, _id, counter=1, job_name="None", created=None, modified=None, submit=0, start=0, finish=0, status="UNKNOWN", rowtype=1, ncpus=0, wallclock="00:00", qos="debug", energy=0, date="", section="", member="", chunk=0, last=1, platform="NA", job_id=0): """[summary] Args: - _id ([type]): [description] + _id (int): Internal Id counter (int, optional): [description]. Defaults to 1. job_name (str, optional): [description]. Defaults to "None". - created ([type], optional): [description]. Defaults to None. - modified ([type], optional): [description]. Defaults to None. + created (datetime, optional): [description]. Defaults to None. + modified (datetime, optional): [description]. Defaults to None. submit (int, optional): [description]. Defaults to 0. start (int, optional): [description]. Defaults to 0. finish (int, optional): [description]. Defaults to 0. @@ -57,6 +67,9 @@ class JobData(): section (str, optional): [description]. Defaults to "". member (str, optional): [description]. Defaults to "". chunk (int, optional): [description]. Defaults to 0. + last (int, optional): [description]. Defaults to 1. + platform (str, optional): [description]. Defaults to "NA". + job_id (int, optional): [description]. Defaults to 0. """ self._id = _id self.counter = counter @@ -71,12 +84,15 @@ class JobData(): self.ncpus = ncpus self.wallclock = wallclock self.qos = qos if qos else "debug" - self.energy = energy + self._energy = energy if energy else 0 self.date = date if date else "" self.section = section if section else "" self.member = member if member else "" self.chunk = chunk if chunk else 0 self.last = last + self._platform = platform if platform and len( + platform) > 0 else "NA" + self.job_id = job_id if job_id else 0 @property def submit(self): @@ -90,6 +106,14 @@ class JobData(): def finish(self): return int(self._finish) + @property + def platform(self): + return self._platform + + @property + def energy(self): + return self._energy + @submit.setter def submit(self, submit): self._submit = int(submit) @@ -102,8 +126,18 @@ class JobData(): def finish(self, finish): self._finish = int(finish) + @platform.setter + def platform(self, platform): + self._platform = platform if platform and len(platform) > 0 else "NA" + + @energy.setter + def energy(self, energy): + self._energy = energy if energy else 0 + class JobDataList(): + """Object that stores the list of jobs to be handled. + """ def __init__(self, expid): self.jobdata_list = list() self.expid = expid @@ -116,7 +150,13 @@ class JobDataList(): class JobDataStructure(): + def __init__(self, expid): + """Initializes the object based on the unique identifier of the experiment. 
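
The JobItem namedtuple introduced above replaces the long manual tuple unpacking of the first patch: a row fetched from SQLite is expanded positionally with JobItem(*row) and then read by field name. A minimal sketch of the pattern, reduced to three of the twenty-one columns for brevity:

import collections

# Reduced field list; the real JobItem carries all job_data columns.
JobItem = collections.namedtuple('JobItem', ['id', 'counter', 'job_name'])

row = (17, 3, 'a29z_SIM')   # shape of one element of cursor.fetchall()
item = JobItem(*row)        # positional expansion of the row tuple
print(item.job_name + " attempt " + str(item.counter))

The one invariant is that the namedtuple's field order must match the column order of every SELECT, which is why the same column list is repeated verbatim in each query of this module.
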
+ + Args: + expid (str): Experiment identifier + """ BasicConfig.read() self.expid = expid self.folder_path = BasicConfig.JOBDATA_DIR @@ -146,6 +186,8 @@ class JobDataStructure(): member TEXT NOT NULL, chunk INTEGER NOT NULL, last INTEGER NOT NULL, + platform TEXT NOT NULL, + job_id INTEGER NOT NULL, UNIQUE(counter,job_name) );''') if not os.path.exists(self.database_path): @@ -155,8 +197,8 @@ class JobDataStructure(): else: self.conn = self.create_connection(self.database_path) - def write_submit_time(self, job_name, submit=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0): - """Write submit always generates a new record + def write_submit_time(self, job_name, submit=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0): + """Writes submit time of job. Args: job_name ([type]): [description] @@ -165,55 +207,70 @@ class JobDataStructure(): ncpus (int, optional): [description]. Defaults to 0. wallclock (str, optional): [description]. Defaults to "00:00". qos (str, optional): [description]. Defaults to "debug". + date (str, optional): [description]. Defaults to "". + member (str, optional): [description]. Defaults to "". + section (str, optional): [description]. Defaults to "". + chunk (int, optional): [description]. Defaults to 0. + platform (str, optional): [description]. Defaults to "NA". + job_id (int, optional): [description]. Defaults to 0. + + Returns: + [type]: [description] """ #print("Saving write submit " + job_name) try: job_data = self.get_job_data(job_name) - current_counter = max_counter = 1 + current_counter = 1 + max_counter = self._get_maxcounter_jobdata() #submit = parse_date(submit) if submit > 0 else 0 #print("submit job data " + str(job_data)) if job_data and len(job_data) > 0: - #print("job data has 1 element") - max_counter = self._get_maxcounter_jobdata() + # print("job data has 1 element") + # max_counter = self._get_maxcounter_jobdata() job_max_counter = max(job.counter for job in job_data) current_last = [ job for job in job_data if job.counter == job_max_counter] - - # Deactivate current last for this job - current_last[0].modified = datetime.today().strftime( - '%Y-%m-%d-%H:%M:%S') - up_id = self._deactivate_current_last(current_last[0]) + for current in current_last: + # Deactivate current last for this job + current.modified = datetime.today().strftime('%Y-%m-%d-%H:%M:%S') + up_id = self._deactivate_current_last(current) # Finding current counter current_counter = ( - job_max_counter + 1) if job_max_counter >= max_counter else max_counter + 1 - + job_max_counter + 1) if job_max_counter >= max_counter else max_counter + else: + current_counter = max_counter # Insert new last - #print("Inserting new job data") rowid = self._insert_job_data(JobData( - 0, current_counter, job_name, None, None, submit, 0, 0, status, 1, ncpus, wallclock, qos, 0, date, member, section, chunk, 1)) - return True + 0, current_counter, job_name, None, None, submit, 0, 0, status, 1, ncpus, wallclock, qos, 0, date, member, section, chunk, 1, platform, job_id)) + if rowid: + return True + else: + return None except Exception as exp: - print(traceback.format_exc()) - print(exp) + if _debug == True: + Log.info(traceback.format_exc()) + Log.warning(str(exp)) return None # if rowid > 0: # print("Successfully inserted") - def write_start_time(self, job_name, start=0, status="UNKWNONW", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0): - 
"""[summary] + def write_start_time(self, job_name, start=0, status="UNKWNONW", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0): + """Writes start time into the database Args: - job_name ([type]): [description] - start (int, optional): [description]. Defaults to 0. - status (str, optional): [description]. Defaults to "UNKWNONW". - ncpus (int, optional): [description]. Defaults to 0. - wallclock (str, optional): [description]. Defaults to "00:00". - qos (str, optional): [description]. Defaults to "debug". - date (str, optional): [description]. Defaults to "". - member (str, optional): [description]. Defaults to "". + job_name (str): Name of Job + start (int, optional): Start time. Defaults to 0. + status (str, optional): Status of job. Defaults to "UNKWNONW". + ncpus (int, optional): Number of cpis. Defaults to 0. + wallclock (str, optional): Wallclock value. Defaults to "00:00". + qos (str, optional): Name of QoS. Defaults to "debug". + date (str, optional): Date from config. Defaults to "". + member (str, optional): Member from config. Defaults to "". section (str, optional): [description]. Defaults to "". chunk (int, optional): [description]. Defaults to 0. + platform (str, optional): [description]. Defaults to "NA". + job_id (int, optional): [description]. Defaults to 0. Returns: [type]: [description] @@ -222,66 +279,96 @@ class JobDataStructure(): job_data_last = self.get_job_data_last(job_name) # Updating existing row if job_data_last: + job_data_last = job_data_last[0] if job_data_last.start == 0: job_data_last.start = start + job_data_last.status = status + job_data_last.job_id = job_id job_data_last.modified = datetime.today().strftime('%Y-%m-%d-%H:%M:%S') rowid = self._update_start_job_data(job_data_last) return rowid # It is necessary to create a new row submit_inserted = self.write_submit_time( - job_name, start, status, ncpus, wallclock, qos, date, member, section, chunk) + job_name, start, status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id) if submit_inserted: self.write_start_time(job_name, start, status, - ncpus, wallclock, qos, date, member, section, chunk) + ncpus, wallclock, qos, date, member, section, chunk, platform, job_id) else: return None except Exception as exp: - print(traceback.format_exc()) - print(exp) + if _debug == True: + Log.info(traceback.format_exc()) + Log.warning(str(exp)) return None - def write_finish_time(self, job_name, finish=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0): - """Write finish time in database. If possible, calls Slurm to retrieve energy. + def write_finish_time(self, job_name, finish=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0, platform_object=None): + """Writes the finish time into the database Args: - job_name ([type]): [description] - finish (int, optional): [description]. Defaults to 0. - status (str, optional): [description]. Defaults to "UNKNOWN". - ncpus (int, optional): [description]. Defaults to 0. - wallclock (str, optional): [description]. Defaults to "00:00". - qos (str, optional): [description]. Defaults to "debug". - date (str, optional): [description]. Defaults to "". - member (str, optional): [description]. Defaults to "". - section (str, optional): [description]. Defaults to "". - chunk (int, optional): [description]. Defaults to 0. + job_name (str): Name of Job. 
+ finish (int, optional): Finish time. Defaults to 0. + status (str, optional): Current Status. Defaults to "UNKNOWN". + ncpus (int, optional): Number of cpus. Defaults to 0. + wallclock (str, optional): Wallclock value. Defaults to "00:00". + qos (str, optional): Name of QoS. Defaults to "debug". + date (str, optional): Date from config. Defaults to "". + member (str, optional): Member from config. Defaults to "". + section (str, optional): Section from config. Defaults to "". + chunk (int, optional): Chunk from config. Defaults to 0. + platform (str, optional): Name of platform of job. Defaults to "NA". + job_id (int, optional): Id of job. Defaults to 0. + platform_object (obj, optional): Platform object. Defaults to None. Returns: - [type]: [description] + Boolean/None: True if success, None if exception. """ try: + # print("Writing finish time \t" + str(job_name) + "\t" + str(finish)) job_data_last = self.get_job_data_last(job_name) + energy = 0 + submit_time = start_time = finish_time = 0 # Updating existing row if job_data_last: - if job_data_last.finish == 0: - # Call Slurm here, update times. - job_data_last.finish = finish - job_data_last.status = status - job_data_last.modified = datetime.today().strftime('%Y-%m-%d-%H:%M:%S') + job_data_last = job_data_last[0] + # if job_data_last.finish == 0: + # Call Slurm here, update times. + if platform_object: + # print("There is platform object") + try: + if type(platform_object) is not str: + if platform_object.type == "slurm": + print("Checking Slurm for " + str(job_name)) + submit_time, start_time, finish_time, energy = platform_object.check_job_energy(job_id) + except Exception as exp: + Log.info(traceback.format_exc()) + Log.warning(str(exp)) + energy = 0 + job_data_last.finish = int(finish_time) if finish_time > 0 else int(finish) + job_data_last.status = status + job_data_last.job_id = job_id + job_data_last.energy = energy + job_data_last.modified = datetime.today().strftime('%Y-%m-%d-%H:%M:%S') + if submit_time > 0 and start_time > 0: + job_data_last.submit = int(submit_time) + job_data_last.start = int(start_time) + rowid = self._update_finish_job_data_plus(job_data_last) + else: rowid = self._update_finish_job_data(job_data_last) - return True + return True # It is necessary to create a new row submit_inserted = self.write_submit_time( - job_name, finish, status, ncpus, wallclock, qos, date, member, section, chunk) + job_name, finish, status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id) write_inserted = self.write_start_time(job_name, finish, status, ncpus, - wallclock, qos, date, member, section, chunk) + wallclock, qos, date, member, section, chunk, platform, job_id) if submit_inserted and write_inserted: self.write_finish_time( - job_name, finish, status, ncpus, wallclock, qos, date, member, section, chunk) + job_name, finish, status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id) else: return None except Exception as exp: - print(traceback.format_exc()) - print(exp) + if _debug == True: + Log.info(traceback.format_exc()) + Log.warning(str(exp)) return None def get_all_job_data(self): @@ -296,37 +383,55 @@ class JobDataStructure(): current_table = self._get_all_job_data() current_job_data = dict() for item in current_table: - _id, _counter, _job_name, _created, _modified, _submit, _start, _finish, _status, _rowtype, _ncpus, _wallclock, _qos, _energy, _date, _section, _member, _chunk, _last = item - self.jobdata_list.add_jobdata(JobData(_id, _counter, _job_name, _created, 
_modified, - _submit, _start, _finish, _status, _rowtype, _ncpus, _wallclock, _qos, _energy, _date, _section, _member, _chunk, _last)) + # _id, _counter, _job_name, _created, _modified, _submit, _start, _finish, _status, _rowtype, _ncpus, _wallclock, _qos, _energy, _date, _section, _member, _chunk, _last, _platform = item + job_item = JobItem(*item) + self.jobdata_list.add_jobdata(JobData(job_item.id, job_item.counter, job_item.job_name, job_item.created, job_item.modified, job_item.submit, job_item.start, job_item.finish, job_item.status, + job_item.rowtype, job_item.ncpus, job_item.wallclock, job_item.qos, job_item.energy, job_item.date, job_item.section, job_item.member, job_item.chunk, job_item.last, job_item.platform, job_item.job_id)) else: raise Exception("Job data folder not found :" + str(self.jobdata_path)) except Exception as exp: - print(traceback.format_exc()) - print(exp) + if _debug == True: + Log.info(traceback.format_exc()) + Log.warning(str(exp)) + return None def get_job_data(self, job_name): + """Retrieves all the rows that have the same job_name + + Args: + job_name (str): [description] + + Raises: + Exception: If path to data folder does not exist + + Returns: + [type]: None if error, list of jobs if successful + """ try: job_data = list() if os.path.exists(self.folder_path): current_job = self._get_job_data(job_name) for item in current_job: - _id, _counter, _job_name, _created, _modified, _submit, _start, _finish, _status, _rowtype, _ncpus, _wallclock, _qos, _energy, _date, _section, _member, _chunk, _last = item - job_data.append(JobData(_id, _counter, _job_name, _created, _modified, - _submit, _start, _finish, _status, _rowtype, _ncpus, _wallclock, _qos, _energy, _date, _section, _member, _chunk, _last)) + # _id, _counter, _job_name, _created, _modified, _submit, _start, _finish, _status, _rowtype, _ncpus, _wallclock, _qos, _energy, _date, _section, _member, _chunk, _last, _platform = item + job_item = JobItem(*item) + job_data.append(JobData(job_item.id, job_item.counter, job_item.job_name, job_item.created, job_item.modified, job_item.submit, job_item.start, job_item.finish, job_item.status, + job_item.rowtype, job_item.ncpus, job_item.wallclock, job_item.qos, job_item.energy, job_item.date, job_item.section, job_item.member, job_item.chunk, job_item.last, job_item.platform, job_item.job_id)) + # job_data.append(JobData(_id, _counter, _job_name, _created, _modified, + # _submit, _start, _finish, _status, _rowtype, _ncpus, _wallclock, _qos, _energy, _date, _section, _member, _chunk, _last, _platform)) return job_data else: raise Exception("Job data folder not found :" + str(self.jobdata_path)) except Exception as exp: - print(traceback.format_exc()) - print(exp) + if _debug == True: + Log.info(traceback.format_exc()) + Log.warning(str(exp)) return None def get_job_data_last(self, job_name): - """[summary] + """ Returns latest jobdata row for a job_name. The current version. 
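
The "current version" lookup used by get_job_data_last boils down to filtering on last=1 and ordering by counter, so the newest attempt comes first even if deactivation ever left a stray extra last=1 row behind. A self-contained sketch of the query, with reduced columns and made-up data:

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE job_data (id INTEGER PRIMARY KEY AUTOINCREMENT, "
             "counter INTEGER, job_name TEXT, last INTEGER)")
conn.executemany("INSERT INTO job_data(counter, job_name, last) VALUES (?,?,?)",
                 [(1, 'a29z_SIM', 0), (2, 'a29z_SIM', 1)])

# ORDER BY counter DESC puts the newest attempt first, so taking the first
# returned row always yields the current version of the job record.
rows = conn.execute("SELECT counter, job_name FROM job_data "
                    "WHERE last=1 AND job_name=? ORDER BY counter DESC",
                    ('a29z_SIM',)).fetchall()
print(rows)  # -> [(2, 'a29z_SIM')]
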
Args: job_name ([type]): [description] @@ -335,36 +440,54 @@ class JobDataStructure(): Exception: [description] Returns: - [type]: [description] + [type]: None if error, JobData if success """ try: + jobdata = list() if os.path.exists(self.folder_path): current_job_last = self._get_job_data_last(job_name) if current_job_last: - _id, _counter, _job_name, _created, _modified, _submit, _start, _finish, _status, _rowtype, _ncpus, _wallclock, _qos, _energy, _date, _section, _member, _chunk, _last = current_job_last - return JobData(_id, _counter, _job_name, _created, _modified, - _submit, _start, _finish, _status, _rowtype, _ncpus, _wallclock, _qos, _energy, _date, _section, _member, _chunk, _last) + for current in current_job_last: + job_item = JobItem(*current) + # _id, _counter, _job_name, _created, _modified, _submit, _start, _finish, _status, _rowtype, _ncpus, _wallclock, _qos, _energy, _date, _section, _member, _chunk, _last, _platform = current_job_last + # return JobData(_id, _counter, _job_name, _created, _modified, + # _submit, _start, _finish, _status, _rowtype, _ncpus, _wallclock, _qos, _energy, _date, _section, _member, _chunk, _last, _platform) + jobdata.append(JobData(job_item.id, job_item.counter, job_item.job_name, job_item.created, job_item.modified, job_item.submit, job_item.start, job_item.finish, job_item.status, + job_item.rowtype, job_item.ncpus, job_item.wallclock, job_item.qos, job_item.energy, job_item.date, job_item.section, job_item.member, job_item.chunk, job_item.last, job_item.platform, job_item.job_id)) + return jobdata else: return None else: raise Exception("Job data folder not found :" + str(self.jobdata_path)) except Exception as exp: - print(traceback.format_exc()) - print(exp) + if _debug == True: + Log.info(traceback.format_exc()) + Log.warning(str(exp)) return None def _deactivate_current_last(self, jobdata): + """Sets last = 0 to row with id + + Args: + jobdata ([type]): [description] + + Returns: + [type]: [description] + """ try: - sql = ''' UPDATE job_data SET last=0, modified = ? WHERE id = ?''' - tuplerow = (jobdata.modified, jobdata._id) - cur = self.conn.cursor() - cur.execute(sql, tuplerow) - self.conn.commit() - return cur.lastrowid - except Exception as exp: - print(traceback.format_exc()) - print("Error on Update : " + str(type(e).__name__)) + if self.conn: + sql = ''' UPDATE job_data SET last=0, modified = ? WHERE id = ?''' + tuplerow = (jobdata.modified, jobdata._id) + cur = self.conn.cursor() + cur.execute(sql, tuplerow) + self.conn.commit() + return cur.lastrowid + return None + except sqlite3.Error as e: + if _debug == True: + Log.info(traceback.format_exc()) + Log.warning("Error on Insert : " + str(type(e).__name__)) return None def _update_start_job_data(self, jobdata): @@ -378,39 +501,84 @@ class JobDataStructure(): """ # current_time = try: - sql = ''' UPDATE job_data SET start=?, modified=? WHERE id=? ''' - cur = self.conn.cursor() - cur.execute(sql, (int(jobdata.start), - jobdata.modified, jobdata._id)) - self.conn.commit() - return cur.lastrowid + if self.conn: + sql = ''' UPDATE job_data SET start=?, modified=?, job_id=?, status=? WHERE id=? 
''' + cur = self.conn.cursor() + cur.execute(sql, (int(jobdata.start), + jobdata.modified, jobdata.job_id, jobdata.status, jobdata._id)) + self.conn.commit() + return cur.lastrowid + return None + except sqlite3.Error as e: + if _debug == True: + Log.info(traceback.format_exc()) + Log.warning("Error on Insert : " + str(type(e).__name__)) + return None + + def _update_finish_job_data_plus(self, jobdata): + """Updates the finish job data, also updates submit, start times. + + Args: + jobdata (JobData): JobData object + + Returns: + int/None: lastrowid if success, None if error + """ + try: + if self.conn: + sql = ''' UPDATE job_data SET submit=?, start=?, finish=?, modified=?, job_id=?, status=?, energy=? WHERE id=? ''' + cur = self.conn.cursor() + cur.execute(sql, (jobdata.submit, jobdata.start, jobdata.finish, jobdata.modified, jobdata.job_id, + jobdata.status, jobdata.energy, jobdata._id)) + self.conn.commit() + return cur.lastrowid + return None except sqlite3.Error as e: - print(traceback.format_exc()) - print("Error on Insert : " + str(type(e).__name__)) + if _debug == True: + Log.info(traceback.format_exc()) + Log.warning("Error on Update : " + str(type(e).__name__)) return None def _update_finish_job_data(self, jobdata): + """Update register with id. Updates finish, modified, status. + + Args: + jobdata ([type]): [description] + + Returns: + [type]: None if error, lastrowid if success + """ try: - sql = ''' UPDATE job_data SET finish=?, modified=?, status=? WHERE id=? ''' - cur = self.conn.cursor() - cur.execute(sql, (jobdata.finish, jobdata.modified, - jobdata.status, jobdata._id)) - self.conn.commit() - return cur.lastrowid + if self.conn: + # print("Updating finish time") + sql = ''' UPDATE job_data SET finish=?, modified=?, job_id=?, status=?, energy=? WHERE id=? ''' + cur = self.conn.cursor() + cur.execute(sql, (jobdata.finish, jobdata.modified, jobdata.job_id, + jobdata.status, jobdata.energy, jobdata._id)) + self.conn.commit() + return cur.lastrowid + return None except sqlite3.Error as e: - print(traceback.format_exc()) - print("Error on Insert : " + str(type(e).__name__)) + if _debug == True: + Log.info(traceback.format_exc()) + Log.warning("Error on Update : " + str(type(e).__name__)) return None def _insert_job_data(self, jobdata): """[summary] + + Args: + jobdata ([type]): JobData object + + Returns: + [type]: None if error, lastrowid if correct """ try: if self.conn: #print("preparing to insert") - sql = ''' INSERT INTO job_data(counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) ''' + sql = ''' INSERT INTO job_data(counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) 
''' tuplerow = (jobdata.counter, jobdata.job_name, jobdata.created, jobdata.modified, jobdata.submit, jobdata.start, - jobdata.finish, jobdata.status, jobdata.rowtype, jobdata.ncpus, jobdata.wallclock, jobdata.qos, jobdata.energy, jobdata.date, jobdata.section, jobdata.member, jobdata.chunk, jobdata.last) + jobdata.finish, jobdata.status, jobdata.rowtype, jobdata.ncpus, jobdata.wallclock, jobdata.qos, jobdata.energy, jobdata.date, jobdata.section, jobdata.member, jobdata.chunk, jobdata.last, jobdata.platform, jobdata.job_id) cur = self.conn.cursor() #print("pre insert") cur.execute(sql, tuplerow) @@ -418,11 +586,13 @@ class JobDataStructure(): #print("Inserted " + str(jobdata.job_name)) return cur.lastrowid else: - print("Not a valid connection.") + #print("Not a valid connection.") return None except sqlite3.Error as e: - print(traceback.format_exc()) - print("Error on Insert : " + str(type(e).__name__)) + if _debug == True: + Log.info(traceback.format_exc()) + Log.warning("Error on Insert : " + str(type(e).__name__) + + "\t " + str(jobdata.job_name) + "\t" + str(jobdata.counter)) return None def _get__all_job_data(self): @@ -437,53 +607,73 @@ class JobDataStructure(): self.conn.text_factory = str cur = self.conn.cursor() cur.execute( - "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last FROM job_data") + "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id FROM job_data") rows = cur.fetchall() return rows else: - raise Exception("Not a valid connection") - except Exception as exp: - print(traceback.format_exc()) + return None + except sqlite3.Error as e: + if _debug == True: + Log.info(traceback.format_exc()) + Log.warning("Error on Select : " + str(type(e).__name__)) return list() def _get_job_data(self, job_name): """[summary] + + Args: + job_name ([type]): [description] + + Returns: + [type]: None if error, list of tuple if found (list can be empty) """ try: if self.conn: self.conn.text_factory = str cur = self.conn.cursor() cur.execute( - "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last FROM job_data WHERE job_name=?", (job_name,)) + "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id FROM job_data WHERE job_name=? ORDER BY counter DESC", (job_name,)) rows = cur.fetchall() # print(rows) return rows else: - raise Exception("Not a valid connection") - except Exception as exp: - print(traceback.format_exc()) + return None + except sqlite3.Error as e: + if _debug == True: + Log.info(traceback.format_exc()) + Log.warning("Error on Select : " + str(type(e).__name__)) return None def _get_job_data_last(self, job_name): + """Returns the latest row for a job_name. The current version. 
+
+        Args:
+            job_name ([type]): [description]
+
+        Returns:
+            [type]: [description]
+        """
         try:
             if self.conn:
                 self.conn.text_factory = str
                 cur = self.conn.cursor()
                 cur.execute(
-                    "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last FROM job_data WHERE last=1 and job_name=?", (job_name,))
+                    "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id FROM job_data WHERE last=1 and job_name=? ORDER BY counter DESC", (job_name,))
                 rows = cur.fetchall()
                 if rows and len(rows) > 0:
-                    return rows[0]
+                    return rows
                 else:
                     return None
             else:
-                raise Exception("Not a valid connection")
-        except Exception as exp:
-            print(traceback.format_exc())
+                return None
+        except sqlite3.Error as e:
+            if _debug == True:
+                Log.info(traceback.format_exc())
+            Log.warning("Error on Select : " + str(type(e).__name__))
             return None
 
     def _get_maxcounter_jobdata(self):
-        """[summary]
+        """Return the maxcounter of the experiment
 
         Returns:
             [type]: [description]
@@ -497,11 +687,15 @@ class JobDataStructure():
                 if len(rows) > 0:
                     #print("Row " + str(rows[0]))
                     result, = rows[0]
-                    return int(result)
+                    return int(result) if result else 1
                 else:
-                    return None
-        except Exception as exp:
-            print(traceback.format_exc())
+                    # Starting value
+                    return 1
+            return None
+        except sqlite3.Error as e:
+            if _debug == True:
+                Log.info(traceback.format_exc())
+            Log.warning("Error on Select Max : " + str(type(e).__name__))
             return None
 
     def create_table(self):
@@ -515,9 +709,15 @@ class JobDataStructure():
                 c = self.conn.cursor()
                 c.execute(self.create_table_query)
             else:
-                raise Exception("Not a valid connection")
-        except Exception as e:
-            print(e)
+                raise IOError("Not a valid connection")
+        except IOError as exp:
+            Log.warning(exp)
+            return None
+        except sqlite3.Error as e:
+            if _debug == True:
+                Log.info(traceback.format_exc())
+            Log.warning("Error on Insert : " + str(type(e).__name__))
+            return None
 
     def create_connection(self, db_file):
         """
diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py
index 83118bad8..3c1949b41 100644
--- a/autosubmit/job/job.py
+++ b/autosubmit/job/job.py
@@ -965,7 +965,7 @@ class Job(object):
                 f.write(date2str(datetime.datetime.now(), 'S'))
         # Writing database
         JobDataStructure(self.expid).write_submit_time(self.name, time.time(), Status.VALUE_TO_KEY[self.status] if self.status in Status.VALUE_TO_KEY.keys() else "UNKNOWN", self.processors,
-                                                       self.wallclock, self._queue, self.date, self.member, self.section, self.chunk)
+                                                       self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id)
 
     def write_start_time(self):
         """
@@ -987,7 +987,7 @@ class Job(object):
             f.write(date2str(datetime.datetime.fromtimestamp(start_time), 'S'))
         # Writing database
         JobDataStructure(self.expid).write_start_time(self.name, time.time(), Status.VALUE_TO_KEY[self.status] if self.status in Status.VALUE_TO_KEY.keys() else "UNKNOWN", self.processors,
-                                                      self.wallclock, self._queue, self.date, self.member, self.section, self.chunk)
+                                                      self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id)
         return True
 
     def write_end_time(self, completed):
@@ -1019,7 +1019,7 @@ class Job(object):
             final_status = "FAILED"
             f.write('FAILED')
         JobDataStructure(self.expid).write_finish_time(self.name, finish_time, final_status, self.processors,
-                                                       self.wallclock, self._queue, self.date, self.member, self.section, self.chunk)
+                                                       self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.platform)
 
     def check_started_after(self, date_limit):
         """
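
For reference, the three write_* calls above all funnel into the per-experiment SQLite file managed by JobDataStructure, so the recorded submit/start/finish times can be inspected offline with nothing but the sqlite3 module. A minimal sketch, not part of the patch (the expid "a000", and hence the filename, is illustrative):

    import sqlite3

    # Per-experiment database created by JobDataStructure; "a000" is an example expid.
    conn = sqlite3.connect("job_data_a000.db")
    cur = conn.cursor()
    # Rows flagged last=1 hold the most recent record of each job.
    for job_name, submit, start, finish in cur.execute(
            "SELECT job_name, submit, start, finish FROM job_data WHERE last=1"):
        print(job_name, submit, start, finish)
    conn.close()
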
diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py
index 96b3f8c47..88eb34a5d 100644
--- a/autosubmit/platforms/paramiko_platform.py
+++ b/autosubmit/platforms/paramiko_platform.py
@@ -38,7 +38,6 @@ class ParamikoPlatform(Platform):
         self.submit_cmd = ""
         self._ftpChannel = None
 
-
     @property
     def header(self):
         """
@@ -58,6 +57,7 @@ class ParamikoPlatform(Platform):
         :rtype: object
         """
         return self._wrapper
+
     def restore_connection(self):
         connected = True
         if self._ssh is None:
@@ -68,15 +68,15 @@ class ParamikoPlatform(Platform):
             while connected == False and retry < retries:
                 if self.connect(True):
                     connected = True
-                retry+=1
+                retry += 1
             if not connected:
-                Log.error('Can not create ssh or sftp connection to {0}: Connection could not be established to platform {1}\n Please, check your expid platform.conf to see if there are mistakes in the configuration\n Also Ensure that the login node listed on HOST parameter is available(try to connect via ssh on a terminal)\n Also you can put more than one host using a comma as separator', self.host,self.name)
-                Log.critical('Experiment cant no continue without unexpected behaviour, Stopping Autosubmit')
+                Log.error('Can not create ssh or sftp connection to {0}: Connection could not be established to platform {1}\n Please, check your expid platform.conf to see if there are mistakes in the configuration\n Also Ensure that the login node listed on HOST parameter is available(try to connect via ssh on a terminal)\n Also you can put more than one host using a comma as separator', self.host, self.name)
+                Log.critical(
+                    'Experiment cannot continue due to unexpected behaviour, stopping Autosubmit')
                 exit(0)
         return connected
-
-    def connect(self,reconnect=False):
+    def connect(self, reconnect=False):
         """
         Creates ssh connection to host
@@ -96,26 +96,29 @@ class ParamikoPlatform(Platform):
             self._host_config = self._ssh_config.lookup(self.host)
             if "," in self._host_config['hostname']:
                 if reconnect:
-                    self._host_config['hostname'] = random.choice(self._host_config['hostname'].split(',')[1:])
+                    self._host_config['hostname'] = random.choice(
+                        self._host_config['hostname'].split(',')[1:])
                 else:
-                    self._host_config['hostname'] = self._host_config['hostname'].split(',')[0]
+                    self._host_config['hostname'] = self._host_config['hostname'].split(',')[
+                        0]
             if 'identityfile' in self._host_config:
                 self._host_config_id = self._host_config['identityfile']
             if 'proxycommand' in self._host_config:
-                self._proxy = paramiko.ProxyCommand(self._host_config['proxycommand'])
+                self._proxy = paramiko.ProxyCommand(
+                    self._host_config['proxycommand'])
                 self._ssh.connect(self._host_config['hostname'], 22, username=self.user,
                                   key_filename=self._host_config_id, sock=self._proxy)
             else:
                 self._ssh.connect(self._host_config['hostname'], 22, username=self.user,
                                   key_filename=self._host_config_id)
-            self.transport = paramiko.Transport((self._host_config['hostname'], 22))
+            self.transport = paramiko.Transport(
+                (self._host_config['hostname'], 22))
             self.transport.connect(username=self.user)
             self._ftpChannel = self._ssh.open_sftp()
             return True
         except:
             return False
-
     def check_completed_files(self, sections=None):
         if self.host == 'localhost':
@@ -130,7 +133,7 @@ class ParamikoPlatform(Platform):
             else:
                 command += " -name *_COMPLETED"
 
-        if self.send_command(command,True):
+        if self.send_command(command, True):
             return self._ssh_output
         else:
             return None
@@ -139,12 +142,14 @@ class ParamikoPlatform(Platform):
         #command = "rm"
         log_dir = os.path.join(self.tmp_path, 'LOG_{0}'.format(self.expid))
-        multiple_delete_previous_run = os.path.join(log_dir,"multiple_delete_previous_run.sh")
+        multiple_delete_previous_run = os.path.join(
+            log_dir, "multiple_delete_previous_run.sh")
         if os.path.exists(multiple_delete_previous_run):
             open(multiple_delete_previous_run, 'w+').write("rm -f "+filenames)
             os.chmod(multiple_delete_previous_run, 0o770)
             self.send_file(multiple_delete_previous_run, False)
-            command = os.path.join(self.get_files_path(),"multiple_delete_previous_run.sh")
+            command = os.path.join(self.get_files_path(),
+                                   "multiple_delete_previous_run.sh")
             if self.send_command(command, ignore_log=True):
                 return self._ssh_output
 
@@ -167,10 +172,10 @@ class ParamikoPlatform(Platform):
 
         try:
             local_path = os.path.join(os.path.join(self.tmp_path, filename))
-            remote_path = os.path.join(self.get_files_path(), os.path.basename(filename))
+            remote_path = os.path.join(
+                self.get_files_path(), os.path.basename(filename))
             self._ftpChannel.put(local_path, remote_path)
-            self._ftpChannel.chmod(remote_path,os.stat(local_path).st_mode)
-
+            self._ftpChannel.chmod(remote_path, os.stat(local_path).st_mode)
             return True
         except BaseException as e:
@@ -236,22 +241,22 @@ class ParamikoPlatform(Platform):
 
         try:
             #ftp = self._ssh.open_sftp()
-            self._ftpChannel.remove(os.path.join(self.get_files_path(), filename))
-            #ftp.close()
+            self._ftpChannel.remove(os.path.join(
+                self.get_files_path(), filename))
+            # ftp.close()
             return True
         except IOError:
             return False
         except BaseException as e:
             if e.lower().contains("garbage"):
-                Log.error("Wrong User or invalid .ssh/config. Or invalid user in platform.conf or public key not set ")
+                Log.error(
+                    "Wrong User or invalid .ssh/config. Or invalid user in platform.conf or public key not set ")
                 raise
-            Log.debug('Could not remove file {0}'.format(os.path.join(self.get_files_path(), filename)))
+            Log.debug('Could not remove file {0}'.format(
+                os.path.join(self.get_files_path(), filename)))
             return False
-
-
-
-    def move_file(self, src, dest,must_exist=False):
+    def move_file(self, src, dest, must_exist=False):
         """
         Moves a file on the platform (includes .err and .out)
         :param src: source name
@@ -263,17 +268,17 @@ class ParamikoPlatform(Platform):
         if not self.restore_connection():
             return False
         try:
-            path_root=self.get_files_path()
+            path_root = self.get_files_path()
             self._ftpChannel.rename(os.path.join(path_root, src),
                                     os.path.join(path_root, dest))
             return True
 
         except:
             if must_exist:
-                raise Exception('File {0} does not exists'.format(os.path.join(self.get_files_path(), src)))
+                raise Exception('File {0} does not exist'.format(
+                    os.path.join(self.get_files_path(), src)))
             else:
                 return False
-
     def submit_job(self, job, script_name, hold=False):
         """
         Submit a job from a given job object.
@@ -297,6 +302,22 @@ class ParamikoPlatform(Platform):
             return int(job_id)
         else:
             return None
+
+    def check_job_energy(self, job_id):
+        """
+        Checks job energy and returns values. Defined in child classes.
+
+        Args:
+            job_id (int): Id of Job
+
+        Returns:
+            4-tuple (int, int, int, int): submit time, start time, finish time, energy
+        """
+        check_energy_cmd = self.get_job_energy_cmd(job_id)
+        self.send_command(check_energy_cmd)
+        return self.parse_job_finish_data(
+            self.get_ssh_output(), job_id)
+
     def submit_Script(self, hold=False):
         """
         Sends a SubmitfileScript, exec in platform and retrieve the Jobs_ID.
@@ -323,19 +344,23 @@ class ParamikoPlatform(Platform):
         job_id = job.id
         job_status = Status.UNKNOWN
         if type(job_id) is not int and type(job_id) is not str:
-            Log.error('check_job() The job id ({0}) is not an integer neither a string.', job_id)
+            Log.error(
+                'check_job() The job id ({0}) is not an integer neither a string.', job_id)
             job.new_status = job_status
-        sleep_time=5
+        sleep_time = 5
         while not (self.send_command(self.get_checkjob_cmd(job_id)) and retries >= 0) or (self.get_ssh_output() == "" and retries >= 0):
             retries -= 1
-            Log.debug('Retrying check job command: {0}', self.get_checkjob_cmd(job_id))
+            Log.debug(
+                'Retrying check job command: {0}', self.get_checkjob_cmd(job_id))
             Log.debug('retries left {0}', retries)
             Log.debug('Will be retrying in {0} seconds', sleep_time)
             sleep(sleep_time)
             sleep_time = sleep_time+5
         if retries >= 0:
-            Log.debug('Successful check job command: {0}', self.get_checkjob_cmd(job_id))
-            job_status = self.parse_job_output(self.get_ssh_output()).strip("\n")
+            Log.debug(
+                'Successful check job command: {0}', self.get_checkjob_cmd(job_id))
+            job_status = self.parse_job_output(
+                self.get_ssh_output()).strip("\n")
             # URi: define status list in HPC Queue Class
             if job_status in self.job_status['COMPLETED'] or retries == 0:
                 job_status = Status.COMPLETED
@@ -350,16 +375,20 @@ class ParamikoPlatform(Platform):
             else:
                 job_status = Status.UNKNOWN
         else:
-            Log.error(" check_job(), job is not on the queue system. Output was: {0}", self.get_checkjob_cmd(job_id))
+            Log.error(
+                " check_job(), job is not on the queue system. Output was: {0}", self.get_checkjob_cmd(job_id))
             job_status = Status.UNKNOWN
-            Log.error('check_job() The job id ({0}) status is {1}.', job_id, job_status)
+            Log.error(
+                'check_job() The job id ({0}) status is {1}.', job_id, job_status)
         job.new_status = job_status
-    def _check_jobid_in_queue(self,ssh_output,job_list_cmd):
+
+    def _check_jobid_in_queue(self, ssh_output, job_list_cmd):
         for job in job_list_cmd[:-1].split(','):
             if job not in ssh_output:
                 return False
         return True
-    def check_Alljobs(self, job_list,job_list_cmd,remote_logs, retries=5):
+
+    def check_Alljobs(self, job_list, job_list_cmd, remote_logs, retries=5):
         """
         Checks jobs running status
 
@@ -373,23 +402,24 @@ class ParamikoPlatform(Platform):
         """
         cmd = self.get_checkAlljobs_cmd(job_list_cmd)
-        sleep_time=5
-        while not (self.send_command(cmd) and retries >= 0) or ( not self._check_jobid_in_queue(self.get_ssh_output(),job_list_cmd) and retries >= 0):
+        sleep_time = 5
+        while not (self.send_command(cmd) and retries >= 0) or (not self._check_jobid_in_queue(self.get_ssh_output(), job_list_cmd) and retries >= 0):
             retries -= 1
             Log.debug('Retrying check job command: {0}', cmd)
             Log.debug('retries left {0}', retries)
             Log.debug('Will be retrying in {0} seconds', sleep_time)
             sleep(sleep_time)
-            sleep_time=sleep_time+5
+            sleep_time = sleep_time+5
         job_list_status = self.get_ssh_output()
-        Log.debug('Successful check job command: {0}, \n output: {1}', cmd, self._ssh_output)
+        Log.debug(
+            'Successful check job command: {0}, \n output: {1}', cmd, self._ssh_output)
         if retries >= 0:
             in_queue_jobs = []
             list_queue_jobid = ""
             for job in job_list:
                 job_id = job.id
-                job_status = self.parse_Alljobs_output(job_list_status,job_id)
+                job_status = self.parse_Alljobs_output(job_list_status, job_id)
                 # URi: define status list in HPC Queue Class
                 if job_status in self.job_status['COMPLETED']:
                     job_status = Status.COMPLETED
@@ -397,7 +427,7 @@ class ParamikoPlatform(Platform):
                     job_status = Status.RUNNING
                 elif job_status in self.job_status['QUEUING']:
                     if job.hold:
-                        job_status = Status.HELD # release?
+                        job_status = Status.HELD  # release?
                     else:
                         job_status = Status.QUEUING
                     list_queue_jobid += str(job.id) + ','
@@ -410,39 +440,58 @@ class ParamikoPlatform(Platform):
                 else:
                     job_status = Status.UNKNOWN
-                    Log.error('check_job() The job id ({0}) status is {1}.', job_id, job_status)
-                job.new_status=job_status
+                    Log.error(
+                        'check_job() The job id ({0}) status is {1}.', job_id, job_status)
+                job.new_status = job_status
             reason = str()
             if self.type == 'slurm' and len(in_queue_jobs) > 0:
-                cmd=self.get_queue_status_cmd(list_queue_jobid)
+                cmd = self.get_queue_status_cmd(list_queue_jobid)
                 self.send_command(cmd)
-                queue_status=self._ssh_output
+                queue_status = self._ssh_output
                 for job in in_queue_jobs:
-                    reason = self.parse_queue_reason(queue_status,job.id)
+                    reason = self.parse_queue_reason(queue_status, job.id)
                     if job.queuing_reason_cancel(reason):
-                        Log.error("Job {0} will be cancelled and set to FAILED as it was queuing due to {1}", job.name, reason)
-                        self.send_command(self.platform.cancel_cmd + " {0}".format(job.id))
+                        Log.error(
+                            "Job {0} will be cancelled and set to FAILED as it was queuing due to {1}", job.name, reason)
+                        self.send_command(
+                            self.platform.cancel_cmd + " {0}".format(job.id))
                         job.new_status = Status.FAILED
                         job.update_status(remote_logs)
                         return
                     elif reason == '(JobHeldUser)':
-                        job.new_status=Status.HELD
+                        job.new_status = Status.HELD
                         if not job.hold:
-                            self.send_command("scontrol release "+"{0}".format(job.id)) # SHOULD BE MORE CLASS (GET_scontrol realease but not sure if this can be implemented on others PLATFORMS
-                            Log.info("Job {0} is being released (id:{1}) ", job.name,job.id)
+                            # SHOULD BE MORE CLASS (GET_scontrol realease but not sure if this can be implemented on others PLATFORMS
+                            self.send_command(
+                                "scontrol release "+"{0}".format(job.id))
+                            Log.info(
+                                "Job {0} is being released (id:{1}) ", job.name, job.id)
                         else:
                             Log.info("Job {0} is HELD", job.name)
                     elif reason == '(JobHeldAdmin)':
-                        Log.info("Job {0} Failed to be HELD, canceling... ", job.name)
+                        Log.info(
+                            "Job {0} Failed to be HELD, canceling... ", job.name)
                         job.new_status = Status.WAITING
-                        job.platform.send_command(job.platform.cancel_cmd + " {0}".format(job.id))
+                        job.platform.send_command(
+                            job.platform.cancel_cmd + " {0}".format(job.id))
         else:
             for job in job_list:
                 job_status = Status.UNKNOWN
-                Log.warning('check_job() The job id ({0}) from platform {1} has an status of {2}.', job.id, self.name, job_status)
-                job.new_status=job_status
+                Log.warning(
+                    'check_job() The job id ({0}) from platform {1} has an status of {2}.', job.id, self.name, job_status)
+                job.new_status = job_status
+
+    def get_job_energy_cmd(self, job_id):
+        """
+        Returns command to check job energy on remote platforms
+        :param job_id: id of job to check
+        :type job_id: int
+        :return: command to check job status
+        :rtype: str
+        """
+        raise NotImplementedError
 
     def get_checkjob_cmd(self, job_id):
         """
@@ -465,6 +514,7 @@ class ParamikoPlatform(Platform):
         :rtype: str
         """
         raise NotImplementedError
+
     def send_command(self, command, ignore_log=False):
         """
         Sends given command to HPC
@@ -478,7 +528,8 @@ class ParamikoPlatform(Platform):
         if not self.restore_connection():
             return False
         if "-rP" in command or "find" in command or "convertLink" in command:
-            timeout = 60*60 # Max Wait 1hour if the command is a copy or simbolic links ( migrate can trigger long times)
+            # Max Wait 1hour if the command is a copy or simbolic links ( migrate can trigger long times)
+            timeout = 60*60
         elif "rm" in command:
             timeout = 60/2
         else:
@@ -490,7 +541,8 @@ class ParamikoPlatform(Platform):
             stdin.close()
             channel.shutdown_write()
             stdout_chunks = []
-            stdout_chunks.append(stdout.channel.recv(len(stdout.channel.in_buffer)))
+            stdout_chunks.append(stdout.channel.recv(
+                len(stdout.channel.in_buffer)))
             stderr_readlines = []
 
             while not channel.closed or channel.recv_ready() or channel.recv_stderr_ready():
@@ -499,12 +551,14 @@ class ParamikoPlatform(Platform):
                 readq, _, _ = select.select([stdout.channel], [], [], 2)
                 for c in readq:
                     if c.recv_ready():
-                        stdout_chunks.append(stdout.channel.recv(len(c.in_buffer)))
+                        stdout_chunks.append(
+                            stdout.channel.recv(len(c.in_buffer)))
                         #stdout_chunks.append(" ")
                         got_chunk = True
                     if c.recv_stderr_ready():
                         # make sure to read stderr to prevent stall
-                        stderr_readlines.append(stderr.channel.recv_stderr(len(c.in_stderr_buffer)))
+                        stderr_readlines.append(
+                            stderr.channel.recv_stderr(len(c.in_stderr_buffer)))
                         #stdout_chunks.append(" ")
                         got_chunk = True
                 if not got_chunk and stdout.channel.exit_status_ready() and not stderr.channel.recv_stderr_ready() and not stdout.channel.recv_ready():
@@ -522,16 +576,20 @@ class ParamikoPlatform(Platform):
                 self._ssh_output += s
             for errorLine in stderr_readlines:
                 if errorLine.find("submission failed") != -1 or errorLine.find("git clone") != -1:
-                    Log.critical('Command {0} in {1} warning: {2}', command, self.host, '\n'.join(stderr_readlines))
+                    Log.critical('Command {0} in {1} warning: {2}', command, self.host, '\n'.join(
+                        stderr_readlines))
                     return False
             if not ignore_log:
                 if len(stderr_readlines) > 0:
-                    Log.warning('Command {0} in {1} warning: {2}', command, self.host, '\n'.join(stderr_readlines))
+                    Log.warning('Command {0} in {1} warning: {2}', command, self.host, '\n'.join(
+                        stderr_readlines))
                 else:
-                    Log.debug('Command {0} in {1} successful with out message: {2}', command, self.host, self._ssh_output)
+                    Log.debug('Command {0} in {1} successful with out message: {2}',
+                              command, self.host, self._ssh_output)
             return True
         except BaseException as e:
-            Log.error('Can not send command {0} to {1}: {2}', command, self.host, e.message)
+            Log.error(
+                'Can not send command {0} to {1}: {2}', command, self.host, e.message)
             return False
 
     def parse_job_output(self, output):
@@ -544,7 +602,21 @@ class ParamikoPlatform(Platform):
         :rtype: str
         """
         raise NotImplementedError
-    def parse_Alljobs_output(self, output,job_id):
+
+    def parse_job_finish_data(self, output, job_id):
+        """
+        Parses check job command output so it can be interpreted by autosubmit
+
+        :param output: output to parse
+        :type output: str
+        :param job_id: Id of Job
+        :type job_id: Integer
+        :return: job status
+        :rtype: str
+        """
+        raise NotImplementedError
+
+    def parse_Alljobs_output(self, output, job_id):
         """
         Parses check jobs command output so it can be interpreted by autosubmit
         :param output: output to parse
@@ -561,8 +633,7 @@ class ParamikoPlatform(Platform):
 
     def get_submit_script(self):
         pass
-
-    def get_submit_cmd(self, job_script, job_type,hold=False):
+    def get_submit_cmd(self, job_script, job_type, hold=False):
         """
         Get command to add job to scheduler
 
@@ -666,26 +737,37 @@ class ParamikoPlatform(Platform):
             header = header.replace('%ERR_LOG_DIRECTIVE%', err_filename)
 
         if hasattr(self.header, 'get_queue_directive'):
-            header = header.replace('%QUEUE_DIRECTIVE%', self.header.get_queue_directive(job))
+            header = header.replace(
+                '%QUEUE_DIRECTIVE%', self.header.get_queue_directive(job))
         if hasattr(self.header, 'get_tasks_per_node'):
-            header = header.replace('%TASKS_PER_NODE_DIRECTIVE%', self.header.get_tasks_per_node(job))
+            header = header.replace(
+                '%TASKS_PER_NODE_DIRECTIVE%', self.header.get_tasks_per_node(job))
         if hasattr(self.header, 'get_threads_per_task'):
-            header = header.replace('%THREADS%', self.header.get_threads_per_task(job))
+            header = header.replace(
+                '%THREADS%', self.header.get_threads_per_task(job))
         if hasattr(self.header, 'get_scratch_free_space'):
-            header = header.replace('%SCRATCH_FREE_SPACE_DIRECTIVE%', self.header.get_scratch_free_space(job))
+            header = header.replace(
+                '%SCRATCH_FREE_SPACE_DIRECTIVE%', self.header.get_scratch_free_space(job))
         if hasattr(self.header, 'get_custom_directives'):
-            header = header.replace('%CUSTOM_DIRECTIVES%', self.header.get_custom_directives(job))
+            header = header.replace(
+                '%CUSTOM_DIRECTIVES%', self.header.get_custom_directives(job))
         if hasattr(self.header, 'get_exclusivity'):
-            header = header.replace('%EXCLUSIVITY_DIRECTIVE%', self.header.get_exclusivity(job))
+            header = header.replace(
+                '%EXCLUSIVITY_DIRECTIVE%', self.header.get_exclusivity(job))
         if hasattr(self.header, 'get_account_directive'):
-            header = header.replace('%ACCOUNT_DIRECTIVE%', self.header.get_account_directive(job))
+            header = header.replace(
+                '%ACCOUNT_DIRECTIVE%', self.header.get_account_directive(job))
         if hasattr(self.header, 'get_memory_directive'):
-            header = header.replace('%MEMORY_DIRECTIVE%', self.header.get_memory_directive(job))
+            header = header.replace(
+                '%MEMORY_DIRECTIVE%', self.header.get_memory_directive(job))
         if hasattr(self.header, 'get_memory_per_task_directive'):
-            header = header.replace('%MEMORY_PER_TASK_DIRECTIVE%', self.header.get_memory_per_task_directive(job))
+            header = header.replace(
+                '%MEMORY_PER_TASK_DIRECTIVE%', self.header.get_memory_per_task_directive(job))
         if hasattr(self.header, 'get_hyperthreading_directive'):
-            header = header.replace('%HYPERTHREADING_DIRECTIVE%', self.header.get_hyperthreading_directive(job))
+            header = header.replace(
+                '%HYPERTHREADING_DIRECTIVE%', self.header.get_hyperthreading_directive(job))
         return header
+
     def closeConnection(self):
         if self._ftpChannel is not None:
             self._ftpChannel.close()
@@ -695,7 +777,6 @@ class ParamikoPlatform(Platform):
             self.transport.stop_thread()
             self.transport.sys.exit(0)
 
-
     def check_remote_log_dir(self):
         """
         Creates log dir on remote host
@@ -704,20 +785,27 @@ class ParamikoPlatform(Platform):
             return False
         if self.type == "slurm":
             try:
-                self._ftpChannel.chdir(self.remote_log_dir) # Test if remote_path exists
+                # Test if remote_path exists
+                self._ftpChannel.chdir(self.remote_log_dir)
             except IOError:
                 if self.send_command(self.get_mkdir_cmd()):
-                    Log.debug('{0} has been created on {1} .', self.remote_log_dir, self.host)
+                    Log.debug('{0} has been created on {1} .',
+                              self.remote_log_dir, self.host)
                 else:
-                    Log.error('Could not create the DIR {0} on HPC {1}'.format(self.remote_log_dir, self.host))
+                    Log.error('Could not create the DIR {0} on HPC {1}'.format(
+                        self.remote_log_dir, self.host))
             except:
                 Log.critical("Garbage detected")
                 raise
         else:
             if self.send_command(self.get_mkdir_cmd()):
-                Log.debug('{0} has been created on {1} .', self.remote_log_dir, self.host)
+                Log.debug('{0} has been created on {1} .',
+                          self.remote_log_dir, self.host)
            else:
-                Log.error('Could not create the DIR {0} on HPC {1}'.format(self.remote_log_dir, self.host))
+                Log.error('Could not create the DIR {0} on HPC {1}'.format(
+                    self.remote_log_dir, self.host))
+
+
 class ParamikoPlatformException(Exception):
     """
     Exception raised from HPC queues
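
The new check_job_energy() above is deliberately scheduler-agnostic: it only chains get_job_energy_cmd() and parse_job_finish_data(), both of which raise NotImplementedError in this base class. A minimal sketch of the contract a child platform is expected to satisfy; ExamplePlatform and the example_acct command are hypothetical, not part of the patch:

    class ExamplePlatform(ParamikoPlatform):
        # Hypothetical scheduler backend; shown only to illustrate the contract.
        def get_job_energy_cmd(self, job_id):
            # A command whose output parse_job_finish_data() understands.
            return "example_acct --format=submit,start,end,energy {0}".format(job_id)

        def parse_job_finish_data(self, output, job_id):
            # Expected to return (submit, start, finish, energy) as integers,
            # falling back to zeros when the output cannot be parsed.
            fields = output.strip().split()
            if len(fields) == 4:
                return tuple(int(f) for f in fields)
            return (0, 0, 0, 0)
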
diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py
index 9b8df22f9..4146c2c37 100644
--- a/autosubmit/platforms/platform.py
+++ b/autosubmit/platforms/platform.py
@@ -22,7 +22,8 @@ class Platform(object):
         self.expid = expid
         self.name = name
         self.config = config
-        self.tmp_path = os.path.join(self.config.LOCAL_ROOT_DIR, self.expid, self.config.LOCAL_TMP_DIR)
+        self.tmp_path = os.path.join(
+            self.config.LOCAL_ROOT_DIR, self.expid, self.config.LOCAL_TMP_DIR)
         self._serial_platform = None
         self._serial_queue = None
         self._default_queue = None
@@ -195,21 +196,22 @@ class Platform(object):
         :rtype: bool
         """
         raise NotImplementedError
-
+    # Executed when calling from Job
     def get_logs_files(self, exp_id, remote_logs):
         """
         Get the given LOGS files
-
+
         :param exp_id: experiment id
         :type exp_id: str
         :param remote_logs: names of the log files
         :type remote_logs: (str, str)
         """
         (job_out_filename, job_err_filename) = remote_logs
-        self.get_files([job_out_filename, job_err_filename], False, 'LOG_{0}'.format(exp_id))
+        self.get_files([job_out_filename, job_err_filename],
+                       False, 'LOG_{0}'.format(exp_id))
 
-    def get_completed_files(self, job_name, retries=0,recovery=False):
+    def get_completed_files(self, job_name, retries=0, recovery=False):
         """
         Get the COMPLETED file of the given job
 
@@ -234,7 +236,6 @@ class Platform(object):
         else:
             return False
 
-
     def remove_stat_file(self, job_name):
         """
         Removes *STAT* files from remote
@@ -264,6 +265,7 @@ class Platform(object):
                 Log.debug('{0} been removed', filename)
                 return True
         return False
+
     def check_file_exists(self, src):
         return True
 
@@ -279,7 +281,8 @@ class Platform(object):
         :rtype: bool
         """
         filename = job_name + '_STAT'
-        stat_local_path = os.path.join(self.config.LOCAL_ROOT_DIR, self.expid, self.config.LOCAL_TMP_DIR, filename)
+        stat_local_path = os.path.join(
+            self.config.LOCAL_ROOT_DIR, self.expid, self.config.LOCAL_TMP_DIR, filename)
         if os.path.exists(stat_local_path):
             os.remove(stat_local_path)
         if self.check_file_exists(filename):
@@ -297,7 +300,8 @@ class Platform(object):
         :rtype: str
         """
         if self.type == "local":
-            path = os.path.join(self.root_dir, self.config.LOCAL_TMP_DIR, 'LOG_{0}'.format(self.expid))
+            path = os.path.join(
+                self.root_dir, self.config.LOCAL_TMP_DIR, 'LOG_{0}'.format(self.expid))
         else:
             path = os.path.join(self.root_dir, 'LOG_{0}'.format(self.expid))
         return path
@@ -328,8 +332,19 @@ class Platform(object):
         :rtype: autosubmit.job.job_common.Status
         """
         raise NotImplementedError
+
     def closeConnection(self):
         return
+
+    def retrieve_energy_data(self, jobid):
+        """
+        Retrieves energy data from job
+
+        :return: 4-tuple (submit, start, finish, energy)
+        :rtype: 4-tuple(int, int, int, int)
+        """
+        raise NotImplementedError
+
     def write_jobid(self, jobid, complete_path):
         """
         Writes Job id in an out file.
@@ -342,10 +357,10 @@ class Platform(object):
         :rtype: Boolean
         """
         try:
-
+
             title_job = "[INFO] JOBID=" + str(jobid)
-            if os.path.exists(complete_path):
+            if os.path.exists(complete_path):
                 file_type = complete_path[-3:]
                 if file_type == "out" or file_type == "err":
                     with open(complete_path, "r+") as f:
@@ -353,16 +368,15 @@ class Platform(object):
                         first_line = f.readline()
                         # Not rewrite
                         if not first_line.startswith("[INFO] JOBID="):
-                            content = f.read()
+                            content = f.read()
                             # Write again (Potentially slow)
                             #start = time()
-                            #Log.info("Attempting job identification of " + str(jobid))
-                            f.seek(0,0)
-                            f.write(title_job + "\n\n" + first_line + content)
-                            f.close()
-                            #finish = time()
-                            #Log.info("Job correctly identified in " + str(finish - start) + " seconds")
+                            #Log.info("Attempting job identification of " + str(jobid))
+                            f.seek(0, 0)
+                            f.write(title_job + "\n\n" + first_line + content)
+                            f.close()
+                            #finish = time()
+                            #Log.info("Job correctly identified in " + str(finish - start) + " seconds")
         except Exception as ex:
             Log.info("Writing Job Id Failed : " + str(ex))
-
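
Platform.write_jobid() above rewrites the whole log file in order to prepend the job id, which is why the inline comments flag it as potentially slow. Assuming a job id of 123456, the head of the .out file would end up looking roughly like this (illustrative):

    [INFO] JOBID=123456

    <original first line of the log>
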
diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py
index d382d80c7..555f87543 100644
--- a/autosubmit/platforms/slurmplatform.py
+++ b/autosubmit/platforms/slurmplatform.py
@@ -19,6 +19,8 @@
 import os
 
 from time import sleep
+from time import mktime
+from datetime import datetime
 
 from xml.dom.minidom import parseString
 
@@ -44,7 +46,8 @@ class SlurmPlatform(ParamikoPlatform):
         self.job_status['COMPLETED'] = ['COMPLETED']
         self.job_status['RUNNING'] = ['RUNNING']
         self.job_status['QUEUING'] = ['PENDING', 'CONFIGURING', 'RESIZING']
-        self.job_status['FAILED'] = ['FAILED', 'CANCELLED','CANCELLED+', 'NODE_FAIL', 'PREEMPTED', 'SUSPENDED', 'TIMEOUT','OUT_OF_MEMORY','OUT_OF_ME+','OUT_OF_ME']
+        self.job_status['FAILED'] = ['FAILED', 'CANCELLED', 'CANCELLED+', 'NODE_FAIL',
+                                     'PREEMPTED', 'SUSPENDED', 'TIMEOUT', 'OUT_OF_MEMORY', 'OUT_OF_ME+', 'OUT_OF_ME']
         self._pathdir = "\$HOME/LOG_" + self.expid
         self._allow_arrays = False
         self._allow_wrappers = True
@@ -52,7 +55,8 @@ class SlurmPlatform(ParamikoPlatform):
         self.config = config
         exp_id_path = os.path.join(config.LOCAL_ROOT_DIR, self.expid)
         tmp_path = os.path.join(exp_id_path, "tmp")
-        self._submit_script_path = os.path.join(tmp_path , config.LOCAL_ASLOG_DIR,"submit_"+self.name+".sh")
+        self._submit_script_path = os.path.join(
+            tmp_path, config.LOCAL_ASLOG_DIR, "submit_"+self.name+".sh")
         self._submit_script_file = open(self._submit_script_path, 'w').close()
 
     def open_submit_script(self):
@@ -62,10 +66,9 @@ class SlurmPlatform(ParamikoPlatform):
     def get_submit_script(self):
         self._submit_script_file.close()
         os.chmod(self._submit_script_path, 0o750)
-        return os.path.join(self.config.LOCAL_ASLOG_DIR,os.path.basename(self._submit_script_path))
+        return os.path.join(self.config.LOCAL_ASLOG_DIR, os.path.basename(self._submit_script_path))
 
-
-    def submit_Script(self,hold=False):
+    def submit_Script(self, hold=False):
         """
         Sends a Submit file Script, execute it in the platform and retrieves the Jobs_ID of all jobs at once.
 
@@ -74,30 +77,34 @@ class SlurmPlatform(ParamikoPlatform):
         :return: job id for submitted jobs
         :rtype: list(str)
         """
-        self.send_file(self.get_submit_script(),False)
-        cmd = os.path.join(self.get_files_path(),os.path.basename(self._submit_script_path))
+        self.send_file(self.get_submit_script(), False)
+        cmd = os.path.join(self.get_files_path(),
+                           os.path.basename(self._submit_script_path))
         if self.send_command(cmd):
             jobs_id = self.get_submitted_job_id(self.get_ssh_output())
             return jobs_id
         else:
             return None
+
     def update_cmds(self):
         """
         Updates commands for platforms
         """
-        self.root_dir = os.path.join(self.scratch, self.project, self.user, self.expid)
+        self.root_dir = os.path.join(
+            self.scratch, self.project, self.user, self.expid)
         self.remote_log_dir = os.path.join(self.root_dir, "LOG_" + self.expid)
         self.cancel_cmd = "scancel"
         self._checkhost_cmd = "echo 1"
-        self._submit_cmd = 'sbatch -D {1} {1}/'.format(self.host, self.remote_log_dir)
-        self._submit_hold_cmd = 'sbatch -H -D {1} {1}/'.format(self.host, self.remote_log_dir)
+        self._submit_cmd = 'sbatch -D {1} {1}/'.format(
+            self.host, self.remote_log_dir)
+        self._submit_hold_cmd = 'sbatch -H -D {1} {1}/'.format(
+            self.host, self.remote_log_dir)
         self.put_cmd = "scp"
         self.get_cmd = "scp"
         self.mkdir_cmd = "mkdir -p " + self.remote_log_dir
 
-
     def get_checkhost_cmd(self):
         return self._checkhost_cmd
 
@@ -109,15 +116,44 @@ class SlurmPlatform(ParamikoPlatform):
 
     def parse_job_output(self, output):
         return output.strip().split(' ')[0].strip()
-
-    def parse_Alljobs_output(self, output,job_id):
-        status =[x.split()[1] for x in output.splitlines() if x.split()[0] == str(job_id)]
+
+    def parse_job_finish_data(self, output, job_id):
+        try:
+            lines = output.split("\n")
+            if len(lines) > 0:
+                line = lines[0].strip().split()
+                submit = int(mktime(datetime.strptime(line[2], "%Y-%m-%dT%H:%M:%S").timetuple()))
+                start = int(mktime(datetime.strptime(line[3], "%Y-%m-%dT%H:%M:%S").timetuple()))
+                finish = int(mktime(datetime.strptime(line[4], "%Y-%m-%dT%H:%M:%S").timetuple()))
+                joules = int(float(str(line[5])[:-1]) * 1000 if len(line[5]) > 0 else 0)
+                return (submit, start, finish, joules)
+            # for line in output.split("\n"):
+            #     ilist = line.strip().split()
+            #     #print(type(line))
+            #     print(job_id)
+            #     print(ilist)
+            #     if int(ilist[0]) == job_id:
+            #         submit = int(mktime(datetime.strptime(ilist[2], "%Y-%m-%dT%H:%M:%S").timetuple()))
+            #         start = int(mktime(datetime.strptime(ilist[3], "%Y-%m-%dT%H:%M:%S").timetuple()))
+            #         finish = int(mktime(datetime.strptime(ilist[4], "%Y-%m-%dT%H:%M:%S").timetuple()))
+            #         joules = int(float(str(ilist[5])[:-1]) * 1000 if len(ilist[5]) > 0 else 0)
 
+            #         return (submit, start, finish, int(joules))
+            return (0,0,0,0)
+        except Exception as exp:
+            # On error return 4*0
+            Log.warning(str(exp))
+            return (0,0,0,0)
+
+        #return str(output)
+
+    def parse_Alljobs_output(self, output, job_id):
+        status = [x.split()[1] for x in output.splitlines()
+                  if x.split()[0] == str(job_id)]
         if len(status) == 0:
             return status
         return status[0]
 
-
-
     def get_submitted_job_id(self, outputlines):
         if outputlines.find("failed") != -1:
             raise Exception(outputlines)
@@ -125,6 +161,7 @@ class SlurmPlatform(ParamikoPlatform):
         for output in outputlines.splitlines():
             jobs_id.append(int(output.split(' ')[3]))
         return jobs_id
+
     def jobs_in_queue(self):
         dom = parseString('')
         jobs_xml = dom.getElementsByTagName("JB_job_number")
@@ -132,31 +169,34 @@ class SlurmPlatform(ParamikoPlatform):
 
     def get_submit_cmd(self, job_script, job, hold=False):
         if not hold:
-            self._submit_script_file.write(self._submit_cmd + job_script + "\n")
+            self._submit_script_file.write(
+                self._submit_cmd + job_script + "\n")
         else:
-            self._submit_script_file.write(self._submit_hold_cmd + job_script + "\n" )
-
-
+            self._submit_script_file.write(
+                self._submit_hold_cmd + job_script + "\n")
 
     def get_checkjob_cmd(self, job_id):
         return 'sacct -n -X -j {1} -o "State"'.format(self.host, job_id)
 
     def get_checkAlljobs_cmd(self, jobs_id):
         return "sacct -n -X -j {1} -o jobid,State".format(self.host, jobs_id)
+
     def get_queue_status_cmd(self, job_id):
         return 'squeue -j {0} -o %A,%R'.format(job_id)
 
+    def get_job_energy_cmd(self, job_id):
+        return 'sacct -n -j {0} -o JobId,State,Submit,Start,End,ConsumedEnergy'.format(job_id)
 
-    def parse_queue_reason(self, output,job_id):
-        reason =[x.split(',')[1] for x in output.splitlines() if x.split(',')[0] == str(job_id)]
+    def parse_queue_reason(self, output, job_id):
+        reason = [x.split(',')[1] for x in output.splitlines()
+                  if x.split(',')[0] == str(job_id)]
         if len(reason) > 0:
             return reason[0]
         return reason
 
-    @staticmethod
-    def wrapper_header(filename, queue, project, wallclock, num_procs, dependency, directives, threads,method="asthreads"):
-        if method =='srun':
+    @staticmethod
+    def wrapper_header(filename, queue, project, wallclock, num_procs, dependency, directives, threads, method="asthreads"):
+        if method == 'srun':
             language = "#!/bin/bash"
             return \
                 language + """
@@ -181,7 +221,7 @@ class SlurmPlatform(ParamikoPlatform):
         else:
             language = "#!/usr/bin/env python"
             return \
-                language+"""
+                language+"""
 ###############################################################################
 #                   {0}
 ###############################################################################
@@ -199,13 +239,13 @@ class SlurmPlatform(ParamikoPlatform):
 #
 ###############################################################################
                 """.format(filename, queue, project, wallclock, num_procs, dependency,
-                           '\n'.ljust(13).join(str(s) for s in directives),threads)
+                           '\n'.ljust(13).join(str(s) for s in directives), threads)
 
     @staticmethod
     def allocated_nodes():
         return """os.system("scontrol show hostnames $SLURM_JOB_NODELIST > node_list")"""
 
-    def check_file_exists(self,filename):
+    def check_file_exists(self, filename):
         if not self.restore_connection():
             return False
         file_exist = False
@@ -214,11 +254,13 @@ class SlurmPlatform(ParamikoPlatform):
         max_retries = 3
         while not file_exist and retries < max_retries:
             try:
-                self._ftpChannel.stat(os.path.join(self.get_files_path(), filename)) # This return IOError if path doesn't exist
+                # This return IOError if path doesn't exist
+                self._ftpChannel.stat(os.path.join(
+                    self.get_files_path(), filename))
                 file_exist = True
             except IOError:
                 # File doesn't exist, retry in sleeptime
                 Log.debug("{2} File still no exists.. waiting {0}s for a new retry ( retries left: {1})", sleeptime,
-                          max_retries - retries, os.path.join(self.get_files_path(),filename))
+                          max_retries - retries, os.path.join(self.get_files_path(), filename))
                 sleep(sleeptime)
                 sleeptime = sleeptime + 5
                 retries = retries + 1
@@ -227,4 +269,4 @@ class SlurmPlatform(ParamikoPlatform):
                 file_exist = False  # won't exist
                 retries = 999  # no more retries
 
-        return file_exist
\ No newline at end of file
+        return file_exist
-- 
GitLab
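
The Slurm parser added above assumes sacct-style "%Y-%m-%dT%H:%M:%S" timestamps and a ConsumedEnergy figure carrying a unit suffix, so the "[:-1] ... * 1000" expression drops the trailing "K" and scales to joules. A standalone sketch of that arithmetic with a made-up sacct row (all values illustrative; real sacct output can also report energy without a suffix, which this scheme would misread):

    from time import mktime
    from datetime import datetime

    # Field order follows get_job_energy_cmd: JobId State Submit Start End ConsumedEnergy
    line = ("1234567 COMPLETED 2020-07-15T10:00:00 "
            "2020-07-15T10:05:00 2020-07-15T10:35:00 21K").split()

    submit = int(mktime(datetime.strptime(line[2], "%Y-%m-%dT%H:%M:%S").timetuple()))
    start = int(mktime(datetime.strptime(line[3], "%Y-%m-%dT%H:%M:%S").timetuple()))
    finish = int(mktime(datetime.strptime(line[4], "%Y-%m-%dT%H:%M:%S").timetuple()))
    joules = int(float(line[5][:-1]) * 1000)  # "21K" -> 21000
    print(submit, start, finish, joules)      # epoch seconds plus energy in joules
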
From 3fd26f6ec5f7f66f413eb566cd50c6372c549fd8 Mon Sep 17 00:00:00 2001
From: Wilmer Uruchi Ticona
Date: Wed, 15 Jul 2020 11:53:39 +0200
Subject: [PATCH 3/4] Added extra_data column to database and version
 handling.

---
 autosubmit/database/db_jobdata.py     | 78 +++++++++++++++++++--------
 autosubmit/platforms/slurmplatform.py | 31 ++++++-----
 2 files changed, 72 insertions(+), 37 deletions(-)

diff --git a/autosubmit/database/db_jobdata.py b/autosubmit/database/db_jobdata.py
index 64d0d20ed..3772481f2 100644
--- a/autosubmit/database/db_jobdata.py
+++ b/autosubmit/database/db_jobdata.py
@@ -28,14 +28,16 @@ import sqlite3
 import copy
 import collections
 from datetime import datetime
+from json import dumps
 from networkx import DiGraph
 from autosubmit.config.basicConfig import BasicConfig
 from bscearth.utils.date import date2str, parse_date, previous_day, chunk_end_date, chunk_start_date, Log, subs_dates
 
-_debug = False
+CURRENT_DB_VERSION = 10
+_debug = True
 JobItem = collections.namedtuple('JobItem', ['id', 'counter', 'job_name', 'created', 'modified', 'submit', 'start', 'finish',
-                                             'status', 'rowtype', 'ncpus', 'wallclock', 'qos', 'energy', 'date', 'section', 'member', 'chunk', 'last', 'platform', 'job_id'])
+                                             'status', 'rowtype', 'ncpus', 'wallclock', 'qos', 'energy', 'date', 'section', 'member', 'chunk', 'last', 'platform', 'job_id', 'extra_data'])
 # JobItem = collections.namedtuple(
 #     'JobItem', 'id counter job_name created modified submit start finish status rowtype ncpus wallclock qos energy date section member chunk last platform')
 
@@ -45,7 +47,7 @@ class JobData():
     """Job Data object
     """
 
-    def __init__(self, _id, counter=1, job_name="None", created=None, modified=None, submit=0, start=0, finish=0, status="UNKNOWN", rowtype=1, ncpus=0, wallclock="00:00", qos="debug", energy=0, date="", section="", member="", chunk=0, last=1, platform="NA", job_id=0):
+    def __init__(self, _id, counter=1, job_name="None", created=None, modified=None, submit=0, start=0, finish=0, status="UNKNOWN", rowtype=1, ncpus=0, wallclock="00:00", qos="debug", energy=0, date="", section="", member="", chunk=0, last=1, platform="NA", job_id=0, extra_data=dict()):
         """[summary]
 
         Args:
@@ -93,6 +95,7 @@ class JobData():
         self._platform = platform if platform and len(
             platform) > 0 else "NA"
         self.job_id = job_id if job_id else 0
+        self.extra_data = dumps(extra_data)
 
     @property
     def submit(self):
@@ -188,12 +191,15 @@ class JobDataStructure():
                 last INTEGER NOT NULL,
                 platform TEXT NOT NULL,
                 job_id INTEGER NOT NULL,
+                extra_data TEXT NOT NULL,
                 UNIQUE(counter,job_name)
                 );''')
         if not os.path.exists(self.database_path):
             open(self.database_path, "w")
             self.conn = self.create_connection(self.database_path)
             self.create_table()
+            if self._set_pragma_version(CURRENT_DB_VERSION):
+                Log.info("Database version set.")
         else:
             self.conn = self.create_connection(self.database_path)
 
@@ -242,6 +248,7 @@ class JobDataStructure():
             # Insert new last
             rowid = self._insert_job_data(JobData(
                 0, current_counter, job_name, None, None, submit, 0, 0, status, 1, ncpus, wallclock, qos, 0, date, member, section, chunk, 1, platform, job_id))
+            # print(rowid)
             if rowid:
                 return True
             else:
@@ -285,14 +292,16 @@ class JobDataStructure():
                 job_data_last.status = status
                 job_data_last.job_id = job_id
                 job_data_last.modified = datetime.today().strftime('%Y-%m-%d-%H:%M:%S')
-                rowid = self._update_start_job_data(job_data_last)
-                return rowid
+                _updated = self._update_start_job_data(job_data_last)
+                return _updated
             # It is necessary to create a new row
             submit_inserted = self.write_submit_time(
                 job_name, start, status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id)
             if submit_inserted:
+                # print("retro start")
                 self.write_start_time(job_name, start, status, ncpus,
                                       wallclock, qos, date, member, section, chunk, platform, job_id)
+                return True
             else:
                 return None
         except Exception as exp:
@@ -327,6 +336,7 @@ class JobDataStructure():
             job_data_last = self.get_job_data_last(job_name)
             energy = 0
             submit_time = start_time = finish_time = 0
+            extra_data = dict()
             # Updating existing row
             if job_data_last:
                 job_data_last = job_data_last[0]
@@ -338,7 +348,7 @@ class JobDataStructure():
                     if type(platform_object) is not str:
                         if platform_object.type == "slurm":
                             print("Checking Slurm for " + str(job_name))
-                            submit_time, start_time, finish_time, energy = platform_object.check_job_energy(job_id)
+                            submit_time, start_time, finish_time, energy, extra_data = platform_object.check_job_energy(job_id)
                 except Exception as exp:
                     Log.info(traceback.format_exc())
                     Log.warning(str(exp))
@@ -347,6 +357,7 @@ class JobDataStructure():
                 job_data_last.status = status
                 job_data_last.job_id = job_id
                 job_data_last.energy = energy
+                job_data_last.extra_data = dumps(extra_data)
                 job_data_last.modified = datetime.today().strftime('%Y-%m-%d-%H:%M:%S')
                 if submit_time > 0 and start_time > 0:
                     job_data_last.submit = int(submit_time)
@@ -360,9 +371,12 @@ class JobDataStructure():
                 job_name, finish, status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id)
             write_inserted = self.write_start_time(job_name, finish, status, ncpus,
                                                    wallclock, qos, date, member, section, chunk, platform, job_id)
+            #print(submit_inserted)
+            #print(write_inserted)
             if submit_inserted and write_inserted:
+                #print("retro finish")
                 self.write_finish_time(
-                    job_name, finish, status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id)
+                    job_name, finish, status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id, platform_object)
             else:
                 return None
         except Exception as exp:
@@ -386,7 +400,7 @@ class JobDataStructure():
                     # _id, _counter, _job_name, _created, _modified, _submit, _start, _finish, _status, _rowtype, _ncpus, _wallclock, _qos, _energy, _date, _section, _member, _chunk, _last, _platform = item
                     job_item = JobItem(*item)
                     self.jobdata_list.add_jobdata(JobData(job_item.id, job_item.counter, job_item.job_name, job_item.created, job_item.modified, job_item.submit, job_item.start, job_item.finish, job_item.status,
-                                                          job_item.rowtype, job_item.ncpus, job_item.wallclock, job_item.qos, job_item.energy, job_item.date, job_item.section, job_item.member, job_item.chunk, job_item.last, job_item.platform, job_item.job_id))
+                                                          job_item.rowtype, job_item.ncpus, job_item.wallclock, job_item.qos, job_item.energy, job_item.date, job_item.section, job_item.member, job_item.chunk, job_item.last, job_item.platform, job_item.job_id, job_item.extra_data))
 
             else:
                 raise Exception("Job data folder not found :" +
@@ -417,7 +431,7 @@ class JobDataStructure():
                     # _id, _counter, _job_name, _created, _modified, _submit, _start, _finish, _status, _rowtype, _ncpus, _wallclock, _qos, _energy, _date, _section, _member, _chunk, _last, _platform = item
                     job_item = JobItem(*item)
                     job_data.append(JobData(job_item.id, job_item.counter, job_item.job_name, job_item.created, job_item.modified, job_item.submit, job_item.start, job_item.finish, job_item.status,
-                                            job_item.rowtype, job_item.ncpus, job_item.wallclock, job_item.qos, job_item.energy, job_item.date, job_item.section, job_item.member, job_item.chunk, job_item.last, job_item.platform, job_item.job_id))
+                                            job_item.rowtype, job_item.ncpus, job_item.wallclock, job_item.qos, job_item.energy, job_item.date, job_item.section, job_item.member, job_item.chunk, job_item.last, job_item.platform, job_item.job_id, job_item.extra_data))
             # job_data.append(JobData(_id, _counter, _job_name, _created, _modified,
             #                         _submit, _start, _finish, _status, _rowtype, _ncpus, _wallclock, _qos, _energy, _date, _section, _member, _chunk, _last, _platform))
             return job_data
@@ -453,7 +467,7 @@ class JobDataStructure():
             # return JobData(_id, _counter, _job_name, _created, _modified,
             #                _submit, _start, _finish, _status, _rowtype, _ncpus, _wallclock, _qos, _energy, _date, _section, _member, _chunk, _last, _platform)
                     jobdata.append(JobData(job_item.id, job_item.counter, job_item.job_name, job_item.created, job_item.modified, job_item.submit, job_item.start, job_item.finish, job_item.status,
-                                           job_item.rowtype, job_item.ncpus, job_item.wallclock, job_item.qos, job_item.energy, job_item.date, job_item.section, job_item.member, job_item.chunk, job_item.last, job_item.platform, job_item.job_id))
+                                           job_item.rowtype, job_item.ncpus, job_item.wallclock, job_item.qos, job_item.energy, job_item.date, job_item.section, job_item.member, job_item.chunk, job_item.last, job_item.platform, job_item.job_id, job_item.extra_data))
                 return jobdata
             else:
                 return None
@@ -507,7 +521,7 @@ class JobDataStructure():
                 cur.execute(sql, (int(jobdata.start), jobdata.modified,
                                   jobdata.job_id, jobdata.status, jobdata._id))
                 self.conn.commit()
-                return cur.lastrowid
+                return True
             return None
         except sqlite3.Error as e:
             if _debug == True:
@@ -526,10 +540,10 @@ class JobDataStructure():
         """
         try:
             if self.conn:
-                sql = ''' UPDATE job_data SET submit=?, start=?, finish=?, modified=?, job_id=?, status=?, energy=? WHERE id=? '''
+                sql = ''' UPDATE job_data SET submit=?, start=?, finish=?, modified=?, job_id=?, status=?, energy=?, extra_data=? WHERE id=? '''
                 cur = self.conn.cursor()
                 cur.execute(sql, (jobdata.submit, jobdata.start, jobdata.finish, jobdata.modified, jobdata.job_id,
-                                  jobdata.status, jobdata.energy, jobdata._id))
+                                  jobdata.status, jobdata.energy, jobdata.extra_data, jobdata._id))
                 self.conn.commit()
                 return cur.lastrowid
             return None
@@ -551,10 +565,10 @@ class JobDataStructure():
         try:
             if self.conn:
                 # print("Updating finish time")
-                sql = ''' UPDATE job_data SET finish=?, modified=?, job_id=?, status=?, energy=? WHERE id=? '''
+                sql = ''' UPDATE job_data SET finish=?, modified=?, job_id=?, status=?, energy=?, extra_data=? WHERE id=? '''
                 cur = self.conn.cursor()
                 cur.execute(sql, (jobdata.finish, jobdata.modified, jobdata.job_id,
-                                  jobdata.status, jobdata.energy, jobdata._id))
+                                  jobdata.status, jobdata.energy, jobdata.extra_data, jobdata._id))
                 self.conn.commit()
                 return cur.lastrowid
             return None
@@ -576,9 +590,9 @@ class JobDataStructure():
         try:
             if self.conn:
                 #print("preparing to insert")
-                sql = ''' INSERT INTO job_data(counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) '''
+                sql = ''' INSERT INTO job_data(counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) '''
                 tuplerow = (jobdata.counter, jobdata.job_name, jobdata.created, jobdata.modified, jobdata.submit, jobdata.start,
-                            jobdata.finish, jobdata.status, jobdata.rowtype, jobdata.ncpus, jobdata.wallclock, jobdata.qos, jobdata.energy, jobdata.date, jobdata.section, jobdata.member, jobdata.chunk, jobdata.last, jobdata.platform, jobdata.job_id)
+                            jobdata.finish, jobdata.status, jobdata.rowtype, jobdata.ncpus, jobdata.wallclock, jobdata.qos, jobdata.energy, jobdata.date, jobdata.section, jobdata.member, jobdata.chunk, jobdata.last, jobdata.platform, jobdata.job_id, jobdata.extra_data)
                 cur = self.conn.cursor()
                 #print("pre insert")
                 cur.execute(sql, tuplerow)
@@ -607,7 +621,7 @@ class JobDataStructure():
                 self.conn.text_factory = str
                 cur = self.conn.cursor()
                 cur.execute(
-                    "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id FROM job_data")
+                    "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data FROM job_data")
                 rows = cur.fetchall()
                 return rows
             else:
@@ -632,7 +646,7 @@ class JobDataStructure():
                 self.conn.text_factory = str
                 cur = self.conn.cursor()
                 cur.execute(
-                    "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id FROM job_data WHERE job_name=? ORDER BY counter DESC", (job_name,))
+                    "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data FROM job_data WHERE job_name=? ORDER BY counter DESC", (job_name,))
                 rows = cur.fetchall()
                 # print(rows)
                 return rows
@@ -658,7 +672,7 @@ class JobDataStructure():
                 self.conn.text_factory = str
                 cur = self.conn.cursor()
                 cur.execute(
-                    "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id FROM job_data WHERE last=1 and job_name=? ORDER BY counter DESC", (job_name,))
+                    "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data FROM job_data WHERE last=1 and job_name=? ORDER BY counter DESC", (job_name,))
                 rows = cur.fetchall()
                 if rows and len(rows) > 0:
                     return rows
@@ -671,6 +685,28 @@ class JobDataStructure():
                 Log.info(traceback.format_exc())
             Log.warning("Error on Select : " + str(type(e).__name__))
             return None
+
+    def _set_pragma_version(self, version=2):
+        """Sets current version of the schema
+
+        Args:
+            version (int, optional): Current Version. Defaults to 2.
+
+        Returns:
+            Boolean/None: True if success, None if error
+        """
+        try:
+            if self.conn:
+                self.conn.text_factory = str
+                cur = self.conn.cursor()
+                cur.execute("pragma user_version={v:d}".format(v=version))
+                self.conn.commit()
+                return True
+        except sqlite3.Error as e:
+            if _debug == True:
+                Log.info(traceback.format_exc())
+            Log.warning("Error on version : " + str(type(e).__name__))
+            return None
 
     def _get_maxcounter_jobdata(self):
         """Return the maxcounter of the experiment
@@ -716,7 +752,7 @@ class JobDataStructure():
         except sqlite3.Error as e:
             if _debug == True:
                 Log.info(traceback.format_exc())
-            Log.warning("Error on Insert : " + str(type(e).__name__))
+            Log.warning("Error on create table : " + str(type(e).__name__))
             return None
 
     def create_connection(self, db_file):
diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py
index 555f87543..251d64f21 100644
--- a/autosubmit/platforms/slurmplatform.py
+++ b/autosubmit/platforms/slurmplatform.py
@@ -119,31 +119,30 @@ class SlurmPlatform(ParamikoPlatform):
 
     def parse_job_finish_data(self, output, job_id):
         try:
+            detailed_data = dict()
+            output = output.strip()
             lines = output.split("\n")
             if len(lines) > 0:
+                for line in lines:
+                    line = line.strip().split()
+                    if len(line) > 0:
+                        name = str(line[0])
+                        extra_data = {"energy": str(line[5] if len(line) > 5 else "NA"), "MaxRSS": str(line[6] if len(line) > 6 else "NA"), "AveRSS": str(line[7] if len(line) > 7 else "NA")}
+                        detailed_data[name] = extra_data
+
                 line = lines[0].strip().split()
                 submit = int(mktime(datetime.strptime(line[2], "%Y-%m-%dT%H:%M:%S").timetuple()))
                 start = int(mktime(datetime.strptime(line[3], "%Y-%m-%dT%H:%M:%S").timetuple()))
                 finish = int(mktime(datetime.strptime(line[4], "%Y-%m-%dT%H:%M:%S").timetuple()))
                 joules = int(float(str(line[5])[:-1]) * 1000 if len(line[5]) > 0 else 0)
-                return (submit, start, finish, joules)
-            # for line in output.split("\n"):
-            #     ilist = line.strip().split()
-            #     #print(type(line))
-            #     print(job_id)
-            #     print(ilist)
-            #     if int(ilist[0]) == job_id:
-            #         submit = int(mktime(datetime.strptime(ilist[2], "%Y-%m-%dT%H:%M:%S").timetuple()))
-            #         start = int(mktime(datetime.strptime(ilist[3], "%Y-%m-%dT%H:%M:%S").timetuple()))
-            #         finish = int(mktime(datetime.strptime(ilist[4], "%Y-%m-%dT%H:%M:%S").timetuple()))
-            #         joules = int(float(str(ilist[5])[:-1]) * 1000 if len(ilist[5]) > 0 else 0)
 
-            #         return (submit, start, finish, int(joules))
-            return (0,0,0,0)
+                # print(detailed_data)
+                return (submit, start, finish, joules, detailed_data)
+
+            return (0,0,0,0, dict())
         except Exception as exp:
             # On error return 4*0
             Log.warning(str(exp))
-            return (0,0,0,0)
+            return (0,0,0,0, dict())
 
         #return str(output)
 
@@ -185,7 +184,7 @@ class SlurmPlatform(ParamikoPlatform):
         return 'squeue -j {0} -o %A,%R'.format(job_id)
 
     def get_job_energy_cmd(self, job_id):
-        return 'sacct -n -j {0} -o JobId,State,Submit,Start,End,ConsumedEnergy'.format(job_id)
+        return 'sacct -n -j {0} -o JobId%20,State,Submit,Start,End,ConsumedEnergy,MaxRSS,AveRSS'.format(job_id)
 
     def parse_queue_reason(self, output, job_id):
         reason = [x.split(',')[1] for x in output.splitlines()
-- 
GitLab
From 57395e875385592f6f483396f2d2cfbdcc9436ce Mon Sep 17 00:00:00 2001
From: Wilmer Uruchi Ticona
Date: Wed, 15 Jul 2020 11:56:46 +0200
Subject: [PATCH 4/4] Removed print

---
 autosubmit/database/db_jobdata.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/autosubmit/database/db_jobdata.py b/autosubmit/database/db_jobdata.py
index 3772481f2..b1cfaecac 100644
--- a/autosubmit/database/db_jobdata.py
+++ b/autosubmit/database/db_jobdata.py
@@ -347,7 +347,7 @@ class JobDataStructure():
             try:
                 if type(platform_object) is not str:
                     if platform_object.type == "slurm":
-                        print("Checking Slurm for " + str(job_name))
+                        #print("Checking Slurm for " + str(job_name))
                         submit_time, start_time, finish_time, energy, extra_data = platform_object.check_job_energy(job_id)
             except Exception as exp:
                 Log.info(traceback.format_exc())
-- 
GitLab