From edd6637c3b701ab1cd61e63837ff7d1a3e3adc66 Mon Sep 17 00:00:00 2001 From: Wilmer Uruchi Ticona Date: Mon, 20 Jul 2020 11:48:35 +0200 Subject: [PATCH 1/2] Fixed #549. Added necessary structure to allow Autosubmit to update the experiment status database. Also, this structure will serve as a way to implement more database handling for other purposes --- autosubmit/autosubmit.py | 31 +-- autosubmit/database/db_jobdata.py | 288 +++++++++++++++++++++++----- autosubmit/database/db_structure.py | 22 ++- 3 files changed, 272 insertions(+), 69 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index f81ceb804..4a5367145 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -73,6 +73,7 @@ from bscearth.utils.date import date2str from monitor.monitor import Monitor from database.db_common import get_autosubmit_version from database.db_common import delete_experiment +from database.db_jobdata import ExperimentStatus from experiment.experiment_common import copy_experiment from experiment.experiment_common import new_experiment from database.db_common import create_db @@ -809,7 +810,7 @@ class Autosubmit: if filename in conf_copy_filter_folder: if os.path.isfile(os.path.join(conf_copy_id, filename)): new_filename = filename.split( - ".")[0]+"_"+exp_id+".conf" + ".")[0] + "_" + exp_id + ".conf" content = open(os.path.join( conf_copy_id, filename), 'r').read() # If autosubmitrc [conf] custom_platforms has been set and file exists, replace content @@ -853,7 +854,7 @@ class Autosubmit: os.mkdir(os.path.join(tmp_path, BasicConfig.LOCAL_ASLOG_DIR)) os.chmod(os.path.join(tmp_path, BasicConfig.LOCAL_ASLOG_DIR), 0o775) Log.debug("Creating temporal remote directory...") - remote_tmp_path = os.path.join(tmp_path, "LOG_"+exp_id) + remote_tmp_path = os.path.join(tmp_path, "LOG_" + exp_id) os.mkdir(remote_tmp_path) os.chmod(remote_tmp_path, 0o755) @@ -948,7 +949,7 @@ class Autosubmit: job_list.parameters = parameters @staticmethod - def inspect(expid, lst, filter_chunks, filter_status, filter_section, notransitive=False, force=False, check_wrapper=False): + def inspect(expid, lst, filter_chunks, filter_status, filter_section, notransitive=False, force=False, check_wrapper=False): """ Generates cmd files experiment. @@ -1272,7 +1273,7 @@ class Autosubmit: "job_packages_" + expid) if as_conf.get_wrapper_type() != 'none': os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, - expid, "pkl", "job_packages_" + expid+".db"), 0644) + expid, "pkl", "job_packages_" + expid + ".db"), 0644) packages = packages_persistence.load() for (exp_id, package_name, job_name) in packages: if package_name not in job_list.packages_dict: @@ -1292,6 +1293,8 @@ class Autosubmit: ######################### # AUTOSUBMIT - MAIN LOOP ######################### + # Update RUNNING database + ExperimentStatus(expid).update_running_status() # Main loop. Finishing when all jobs have been submitted while job_list.get_active(): # reload parameters changes @@ -1444,9 +1447,9 @@ class Autosubmit: if "Thread-" in thread.name: if thread.isAlive(): active_threads = True - threads_active = threads_active+1 + threads_active = threads_active + 1 sleep(10) - timeout = 10+timeout + timeout = 10 + timeout if len(job_list.get_failed()) > 0: Log.info("Some jobs have failed and reached maximum retrials") return False @@ -1761,7 +1764,7 @@ class Autosubmit: if txt_only or txt_logfiles: monitor_exp.generate_output_txt(expid, jobs, os.path.join( - exp_path, "/tmp/LOG_"+expid), txt_logfiles, job_list_object=job_list) + exp_path, "/tmp/LOG_" + expid), txt_logfiles, job_list_object=job_list) else: # if file_format is set, use file_format, otherwise use conf value monitor_exp.generate_output(expid, @@ -2264,7 +2267,7 @@ class Autosubmit: try: p.send_command( "cp -rP " + os.path.join(p.temp_dir, experiment_id) + " " + p.root_dir) - p.send_command("chmod 755 -R "+p.root_dir) + p.send_command("chmod 755 -R " + p.root_dir) Log.result( "Files/dirs on {0} have been successfully picked up", platform) except (IOError, BaseException): @@ -2287,7 +2290,7 @@ class Autosubmit: else: for platform in backup_files: p = submitter.platforms[platform] - p.send_command("rm -R " + p.temp_dir+"/"+experiment_id) + p.send_command("rm -R " + p.temp_dir + "/" + experiment_id) Log.result("The experiment has been successfully picked up.") #Log.info("Refreshing the experiment.") # Autosubmit.refresh(experiment_id,False,False) @@ -2881,10 +2884,10 @@ class Autosubmit: try: tmp_folder = os.path.join( BasicConfig.LOCAL_ROOT_DIR, "tmp") - tmp_expid = os.path.join(tmp_folder, expid+"_to_delete") + tmp_expid = os.path.join(tmp_folder, expid + "_to_delete") os.rename(exp_folder, tmp_expid) Log.warning("Experiment folder renamed to: {0}".format( - exp_folder+"_to_delete ")) + exp_folder + "_to_delete ")) except Exception as e: Log.critical( "Can not remove or rename experiments folder: {0}".format(e)) @@ -3234,7 +3237,7 @@ class Autosubmit: if force: try: cmd = ["rsync -ach --info=progress2 " + - local_project_path+"/* "+local_destination] + local_project_path + "/* " + local_destination] subprocess.call(cmd, shell=True) except subprocess.CalledProcessError: Log.error( @@ -3786,7 +3789,7 @@ class Autosubmit: expidJoblist[str(x[0:4])] += 1 if str(expid) in expidJoblist: - wrongExpid = jobs.__len__()-expidJoblist[expid] + wrongExpid = jobs.__len__() - expidJoblist[expid] if wrongExpid > 0: Log.warning( "There are {0} job.name with an invalid Expid", wrongExpid) @@ -3817,7 +3820,7 @@ class Autosubmit: packages_persistence = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) os.chmod(os.path.join(BasicConfig.LOCAL_ROOT_DIR, - expid, "pkl", "job_packages_" + expid+".db"), 0775) + expid, "pkl", "job_packages_" + expid + ".db"), 0775) packages_persistence.reset_table(True) referenced_jobs_to_remove = set() job_list_wrappers = copy.deepcopy(job_list) diff --git a/autosubmit/database/db_jobdata.py b/autosubmit/database/db_jobdata.py index b1cfaecac..638d317b4 100644 --- a/autosubmit/database/db_jobdata.py +++ b/autosubmit/database/db_jobdata.py @@ -29,7 +29,7 @@ import copy import collections from datetime import datetime from json import dumps -from networkx import DiGraph +#from networkx import DiGraph from autosubmit.config.basicConfig import BasicConfig from bscearth.utils.date import date2str, parse_date, previous_day, chunk_end_date, chunk_start_date, Log, subs_dates @@ -37,10 +37,10 @@ from bscearth.utils.date import date2str, parse_date, previous_day, chunk_end_da CURRENT_DB_VERSION = 10 _debug = True JobItem = collections.namedtuple('JobItem', ['id', 'counter', 'job_name', 'created', 'modified', 'submit', 'start', 'finish', - 'status', 'rowtype', 'ncpus', 'wallclock', 'qos', 'energy', 'date', 'section', 'member', 'chunk', 'last', 'platform', 'job_id','extra_data']) + 'status', 'rowtype', 'ncpus', 'wallclock', 'qos', 'energy', 'date', 'section', 'member', 'chunk', 'last', 'platform', 'job_id', 'extra_data']) -# JobItem = collections.namedtuple( -# 'JobItem', 'id counter job_name created modified submit start finish status rowtype ncpus wallclock qos energy date section member chunk last platform') +ExperimentRow = collections.namedtuple( + 'ExperimentRow', ['exp_id', 'expid', 'status', 'seconds']) class JobData(): @@ -141,6 +141,7 @@ class JobData(): class JobDataList(): """Object that stores the list of jobs to be handled. """ + def __init__(self, expid): self.jobdata_list = list() self.expid = expid @@ -152,7 +153,230 @@ class JobDataList(): return len(self.jobdata_list) -class JobDataStructure(): +class MainDataBase(): + def __init__(self, expid): + self.expid = expid + self.conn = None + self.conn_ec = None + self.create_table_query = None + + def create_connection(self, db_file): + """ + Create a database connection to the SQLite database specified by db_file. + :param db_file: database file name + :return: Connection object or None + """ + try: + conn = sqlite3.connect(db_file) + return conn + except: + return None + + def create_table(self): + """ create a table from the create_table_sql statement + :param conn: Connection object + :param create_table_sql: a CREATE TABLE statement + :return: + """ + try: + if self.conn: + c = self.conn.cursor() + c.execute(self.create_table_query) + else: + raise IOError("Not a valid connection") + except IOError as exp: + Log.warning(exp) + return None + except sqlite3.Error as e: + if _debug == True: + Log.info(traceback.format_exc()) + Log.warning("Error on create table : " + str(type(e).__name__)) + return None + + +class ExperimentStatus(MainDataBase): + def __init__(self, expid): + MainDataBase.__init__(self, expid) + BasicConfig.read() + self.DB_FILE_AS_TIMES = os.path.join( + BasicConfig.LOCAL_ROOT_DIR, "as_times.db") + self.DB_FILE_ECEARTH = os.path.join( + BasicConfig.LOCAL_ROOT_DIR, "ecearth.db") + self.PKL_FILE_PATH = os.path.join( + BasicConfig.LOCAL_ROOT_DIR, expid, "pkl", "job_list_" + str(self.expid) + ".pkl") + self.create_table_query = textwrap.dedent( + '''CREATE TABLE + IF NOT EXISTS experiment_status ( + exp_id integer PRIMARY KEY, + name text NOT NULL, + status text NOT NULL, + seconds_diff integer NOT NULL, + modified text NOT NULL, + FOREIGN KEY (exp_id) REFERENCES experiment (id) + );''') + + if not os.path.exists(self.DB_FILE_AS_TIMES): + open(self.DB_FILE_AS_TIMES, "w") + self.conn = self.create_connection(self.DB_FILE_AS_TIMES) + self.create_table() + else: + self.conn = self.create_connection(self.DB_FILE_AS_TIMES) + + if os.path.exists(self.DB_FILE_ECEARTH): + self.conn_ec = self.create_connection(self.DB_FILE_ECEARTH) + + self.current_table = self.prepare_status_db() + self.current_row = next( + (exp for exp in self.current_table if exp.expid == self.expid), None) if len(self.current_table) > 0 else None + + def print_current_table(self): + for experiment in self.current_table: + #experiment = ExperimentRow(k, *v) + print(experiment.expid) + print(experiment.exp_id) + print(experiment.status) + print(experiment.seconds) + print("\n") + if self.current_row: + print("Current Row:\n\t" + self.current_row.expid + "\n\t" + + str(self.current_row.exp_id) + "\n\t" + self.current_row.status) + + def prepare_status_db(self): + """ + Returns the contents of the status table in an ordered way + :return: Map from experiment name to (Id of experiment, Status, Seconds) + :rtype: Dictionary Key: String, Value: Integer, String, Integer + """ + #self.conn = self.create_connection(self.DB_FILE_AS_TIMES) + + #drop_table_query = ''' DROP TABLE experiment_status ''' + # create_table(conn, drop_table_query) + # self.create_table() + current_table = self._get_exp_status() + result = list() + # print(current_table) + # print(type(current_table)) + for item in current_table: + #exp_id, expid, status, seconds = item + result.append(ExperimentRow(*item)) + return result + + def _get_id_db(self): + """ + Get exp_id of the experiment (different than the experiment name). + :param conn: ecearth.db connection + :type conn: sqlite3 connection + :param expid: Experiment name + :type expid: String + :return: Id of the experiment + :rtype: Integer or None + """ + try: + if self.conn_ec: + cur = self.conn_ec.cursor() + cur.execute( + "SELECT id FROM experiment WHERE name=?", (self.expid,)) + row = cur.fetchone() + return int(row[0]) + return None + except Exception as exp: + Log.warning(exp) + return None + + def _get_exp_status(self): + """ + Get all registers from experiment_status.\n + :return: row content: exp_id, name, status, seconds_diff + :rtype: 4-tuple (int, str, str, int) + """ + try: + if self.conn: + #conn = create_connection(DB_FILE_AS_TIMES) + self.conn.text_factory = str + cur = self.conn.cursor() + cur.execute( + "SELECT exp_id, name, status, seconds_diff FROM experiment_status") + rows = cur.fetchall() + return rows + return None + except Exception as exp: + print(traceback.format_exc()) + return None + + def test_running(self, time_condition=600): + if (os.path.exists(self.PKL_FILE_PATH)): + current_stat = os.stat(self.PKL_FILE_PATH) + timest = int(current_stat.st_mtime) + timesys = int(time.time()) + time_diff = int(timesys - timest) + if (time_diff < time_condition): + return True + else: + return False + + def update_running_status(self, status="RUNNING"): + if self.current_row: + # Row exists + self._update_exp_status(status) + else: + # New Row + self._create_exp_status() + + def _create_exp_status(self): + """ + Create experiment status + :param conn: + :param details: + :return: + """ + try: + if self.conn and self.conn_ec: + exp_id = self._get_id_db() + #conn = create_connection(DB_FILE_AS_TIMES) + creation_date = datetime.today().strftime('%Y-%m-%d-%H:%M:%S') + sql = ''' INSERT INTO experiment_status(exp_id, name, status, seconds_diff, modified) VALUES(?,?,?,?,?) ''' + # print(row_content) + cur = self.conn.cursor() + cur.execute(sql, (exp_id, + self.expid, "RUNNING", 0, creation_date)) + # print(cur) + self.conn.commit() + return cur.lastrowid + except sqlite3.Error as e: + Log.warning("Error on Insert : " + str(type(e).__name__)) + + def _update_exp_status(self, status="RUNNING"): + """ + Update existing experiment_status. + :param expid: Experiment name + :type expid: String + :param status: Experiment status + :type status: String + :param seconds_diff: Indicator of how long it has been active since the last time it was checked + :type seconds_diff: Integer + :return: Id of register + :rtype: Integer + """ + try: + if self.conn and self.current_row: + # conn = create_connection(DB_FILE_AS_TIMES) + modified_date = datetime.today().strftime('%Y-%m-%d-%H:%M:%S') + sql = ''' UPDATE experiment_status SET status = ?, seconds_diff = ?, modified = ? WHERE name = ? ''' + cur = self.conn.cursor() + cur.execute(sql, (status, 0, modified_date, + self.current_row.expid)) + self.conn.commit() + return cur.lastrowid + return None + except sqlite3.Error as e: + Log.warning("Error while trying to update " + + str(expid) + " in experiment_status.") + Log.warning(traceback.format_exc()) + Log.warning("Error on Update: " + str(type(e).__name__)) + return None + + +class JobDataStructure(MainDataBase): def __init__(self, expid): """Initializes the object based on the unique identifier of the experiment. @@ -160,12 +384,13 @@ class JobDataStructure(): Args: expid (str): Experiment identifier """ + MainDataBase.__init__(self, expid) BasicConfig.read() - self.expid = expid + #self.expid = expid self.folder_path = BasicConfig.JOBDATA_DIR self.database_path = os.path.join( self.folder_path, "job_data_" + str(expid) + ".db") - self.conn = None + #self.conn = None self.jobdata_list = JobDataList(self.expid) self.create_table_query = textwrap.dedent( '''CREATE TABLE @@ -310,7 +535,7 @@ class JobDataStructure(): Log.warning(str(exp)) return None - def write_finish_time(self, job_name, finish=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0, platform_object=None): + def write_finish_time(self, job_name, finish=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0, platform_object=None): """Writes the finish time into the database Args: @@ -348,12 +573,14 @@ class JobDataStructure(): if type(platform_object) is not str: if platform_object.type == "slurm": #print("Checking Slurm for " + str(job_name)) - submit_time, start_time, finish_time, energy, extra_data = platform_object.check_job_energy(job_id) + submit_time, start_time, finish_time, energy, extra_data = platform_object.check_job_energy( + job_id) except Exception as exp: Log.info(traceback.format_exc()) Log.warning(str(exp)) energy = 0 - job_data_last.finish = int(finish_time) if finish_time > 0 else int(finish) + job_data_last.finish = int( + finish_time) if finish_time > 0 else int(finish) job_data_last.status = status job_data_last.job_id = job_id job_data_last.energy = energy @@ -371,8 +598,8 @@ class JobDataStructure(): job_name, finish, status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id) write_inserted = self.write_start_time(job_name, finish, status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id) - #print(submit_inserted) - #print(write_inserted) + # print(submit_inserted) + # print(write_inserted) if submit_inserted and write_inserted: #print("retro finish") self.write_finish_time( @@ -685,8 +912,8 @@ class JobDataStructure(): Log.info(traceback.format_exc()) Log.warning("Error on Select : " + str(type(e).__name__)) return None - - def _set_pragma_version(self, version = 2): + + def _set_pragma_version(self, version=2): """Sets current version of the schema Args: @@ -733,36 +960,3 @@ class JobDataStructure(): Log.info(traceback.format_exc()) Log.warning("Error on Select Max : " + str(type(e).__name__)) return None - - def create_table(self): - """ create a table from the create_table_sql statement - :param conn: Connection object - :param create_table_sql: a CREATE TABLE statement - :return: - """ - try: - if self.conn: - c = self.conn.cursor() - c.execute(self.create_table_query) - else: - raise IOError("Not a valid connection") - except IOError as exp: - Log.warning(exp) - return None - except sqlite3.Error as e: - if _debug == True: - Log.info(traceback.format_exc()) - Log.warning("Error on create table : " + str(type(e).__name__)) - return None - - def create_connection(self, db_file): - """ - Create a database connection to the SQLite database specified by db_file. - :param db_file: database file name - :return: Connection object or None - """ - try: - conn = sqlite3.connect(db_file) - return conn - except: - return None diff --git a/autosubmit/database/db_structure.py b/autosubmit/database/db_structure.py index 7beff44e8..78b91c205 100644 --- a/autosubmit/database/db_structure.py +++ b/autosubmit/database/db_structure.py @@ -27,7 +27,7 @@ import traceback import sqlite3 import copy from datetime import datetime -from networkx import DiGraph +# from networkx import DiGraph #DB_FILE_AS_TIMES = "/esarchive/autosubmit/as_times.db" @@ -134,13 +134,19 @@ def save_structure(graph, exp_id, structures_path): db_structure_path = os.path.join( structures_path, "structure_" + exp_id + ".db") # with open(db_structure_path, "w"): - conn = create_connection(db_structure_path) - _delete_table_content(conn) - for u, v in graph.edges(): - # save - _create_edge(conn, u, v) - #print("Created edge " + str(u) + str(v)) - conn.commit() + conn = None + if os.path.exists(db_structure_path): + conn = create_connection(db_structure_path) + _delete_table_content(conn) + else: + open(db_structure_path, "w") + conn = create_connection(db_structure_path) + if conn: + for u, v in graph.edges(): + # save + _create_edge(conn, u, v) + #print("Created edge " + str(u) + str(v)) + conn.commit() else: # pkl folder not found raise Exception("pkl folder not found " + str(structures_path)) -- GitLab From 4fb2e4446123453bbe0b062e3dbe9c4b86f989bc Mon Sep 17 00:00:00 2001 From: Wilmer Uruchi Ticona Date: Wed, 22 Jul 2020 13:26:05 +0200 Subject: [PATCH 2/2] Added a fix that does not affect the workflow anyway. Just for closure. --- autosubmit/database/db_jobdata.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autosubmit/database/db_jobdata.py b/autosubmit/database/db_jobdata.py index 638d317b4..56183771b 100644 --- a/autosubmit/database/db_jobdata.py +++ b/autosubmit/database/db_jobdata.py @@ -695,7 +695,7 @@ class JobDataStructure(MainDataBase): # _submit, _start, _finish, _status, _rowtype, _ncpus, _wallclock, _qos, _energy, _date, _section, _member, _chunk, _last, _platform) jobdata.append(JobData(job_item.id, job_item.counter, job_item.job_name, job_item.created, job_item.modified, job_item.submit, job_item.start, job_item.finish, job_item.status, job_item.rowtype, job_item.ncpus, job_item.wallclock, job_item.qos, job_item.energy, job_item.date, job_item.section, job_item.member, job_item.chunk, job_item.last, job_item.platform, job_item.job_id, job_item.extra_data)) - return jobdata + return jobdata else: return None else: -- GitLab