diff --git a/.gitignore b/.gitignore index 7eebdc167af1f3d3036ad2bcf562fa023726215f..957faf6e221d6f5e8ac52653845b25fc3448d167 100644 --- a/.gitignore +++ b/.gitignore @@ -8,4 +8,4 @@ autosubmit/miniTest.py autosubmit/simple_test.py .vscode -autosubmit.egg-info/ \ No newline at end of file +autosubmit.egg-info/ diff --git a/autosubmit/database/db_jobdata.py b/autosubmit/database/db_jobdata.py index f43dc9483a2ca0efa837a1a8e9fac8ed608c3744..b313a427b2cf178423e6f6f93578f9db289f9856 100644 --- a/autosubmit/database/db_jobdata.py +++ b/autosubmit/database/db_jobdata.py @@ -36,7 +36,8 @@ from autosubmit.job.job_package_persistence import JobPackagePersistence from bscearth.utils.date import date2str, parse_date, previous_day, chunk_end_date, chunk_start_date, subs_dates from log.log import Log, AutosubmitCritical, AutosubmitError -CURRENT_DB_VERSION = 14 # Used to be 10 +# VERSION 15 Adds columns MaxRSS, AveRSS, out, err => job_data +CURRENT_DB_VERSION = 15 # Used to be 10 EXPERIMENT_HEADER_CHANGES_DB_VERSION = 14 # Defining RowType standard @@ -46,9 +47,16 @@ class RowType: #PACKED = 2 +class RowStatus: + INITIAL = 0 + COMPLETED = 1 + PROCESSED = 2 + FAULTY = 3 + + _debug = False JobItem = collections.namedtuple('JobItem', ['id', 'counter', 'job_name', 'created', 'modified', 'submit', 'start', 'finish', - 'status', 'rowtype', 'ncpus', 'wallclock', 'qos', 'energy', 'date', 'section', 'member', 'chunk', 'last', 'platform', 'job_id', 'extra_data', 'nnodes', 'run_id']) + 'status', 'rowtype', 'ncpus', 'wallclock', 'qos', 'energy', 'date', 'section', 'member', 'chunk', 'last', 'platform', 'job_id', 'extra_data', 'nnodes', 'run_id', 'MaxRSS', 'AveRSS', 'out', 'err', 'rowstatus']) ExperimentRunItem = collections.namedtuple('ExperimentRunItem', [ 'run_id', 'created', 'start', 'finish', 'chunk_unit', 'chunk_size', 'completed', 'total', 'failed', 'queuing', 'running', 'submitted', 'suspended', 'metadata']) @@ -110,6 +118,11 @@ class ExperimentRun(): class JobStepExtraData(): + """ + Class that manages the extra_data content + The constructor should receive a dict object + """ + def __init__(self, key, dict_data): self.key = key self.ncpus = dict_data["ncpus"] if dict_data and "ncpus" in dict_data.keys( @@ -134,31 +147,8 @@ class JobData(object): """Job Data object """ - def __init__(self, _id, counter=1, job_name="None", created=None, modified=None, submit=0, start=0, finish=0, status="UNKNOWN", rowtype=0, ncpus=0, wallclock="00:00", qos="debug", energy=0, date="", section="", member="", chunk=0, last=1, platform="NA", job_id=0, extra_data=dict(), nnodes=0, run_id=None): - """[summary] - - Args: - _id (int): Internal Id - counter (int, optional): [description]. Defaults to 1. - job_name (str, optional): [description]. Defaults to "None". - created (datetime, optional): [description]. Defaults to None. - modified (datetime, optional): [description]. Defaults to None. - submit (int, optional): [description]. Defaults to 0. - start (int, optional): [description]. Defaults to 0. - finish (int, optional): [description]. Defaults to 0. - status (str, optional): [description]. Defaults to "UNKNOWN". - rowtype (int, optional): [description]. Defaults to 1. - ncpus (int, optional): [description]. Defaults to 0. - wallclock (str, optional): [description]. Defaults to "00:00". - qos (str, optional): [description]. Defaults to "debug". - energy (int, optional): [description]. Defaults to 0. - date (str, optional): [description]. Defaults to "". - section (str, optional): [description]. Defaults to "". - member (str, optional): [description]. Defaults to "". - chunk (int, optional): [description]. Defaults to 0. - last (int, optional): [description]. Defaults to 1. - platform (str, optional): [description]. Defaults to "NA". - job_id (int, optional): [description]. Defaults to 0. + def __init__(self, _id, counter=1, job_name="None", created=None, modified=None, submit=0, start=0, finish=0, status="UNKNOWN", rowtype=0, ncpus=0, wallclock="00:00", qos="debug", energy=0, date="", section="", member="", chunk=0, last=1, platform="NA", job_id=0, extra_data=dict(), nnodes=0, run_id=None, MaxRSS=0.0, AveRSS=0.0, out="", err="", rowstatus=RowStatus.INITIAL): + """ """ self._id = _id self.counter = counter @@ -190,25 +180,46 @@ class JobData(object): self.nnodes = nnodes self.run_id = run_id self.require_update = False + # DB VERSION 15 attributes + self.MaxRSS = MaxRSS + self.AveRSS = AveRSS + self.out = out + self.err = err + self.rowstatus = rowstatus @property def submit(self): + """ + Returns the submit time timestamp as an integer. + """ return int(self._submit) @property def start(self): + """ + Returns the start time timestamp as an integer. + """ return int(self._start) @property def finish(self): + """ + Returns the finish time timestamp as an integer. + """ return int(self._finish) @property def platform(self): + """ + Returns the name of the platform, "NA" if no platform is set. + """ return self._platform @property def energy(self): + """ + Returns the energy spent value (JOULES) as an integer. + """ return self._energy @submit.setter @@ -229,7 +240,9 @@ class JobData(object): @energy.setter def energy(self, energy): - # print("Energy {0}".format(energy)) + """ + Set the energy value. If it is different than the current energy value, a update flag will be activated. + """ if energy > 0: if (energy != self._energy): # print("Updating energy to {0} from {1}.".format( @@ -238,22 +251,37 @@ class JobData(object): self._energy = energy if energy else 0 def delta_queue_time(self): + """ + Returns queuing time as a timedelta object. + """ return str(timedelta(seconds=self.queuing_time())) def delta_running_time(self): + """ + Returns running time as a timedelta object. + """ return str(timedelta(seconds=self.running_time())) def submit_datetime(self): + """ + Return the submit time as a datetime object, None if submit time equal 0. + """ if self.submit > 0: return datetime.fromtimestamp(self.submit) return None def start_datetime(self): + """ + Return the start time as a datetime object, None if start time equal 0. + """ if self.start > 0: return datetime.fromtimestamp(self.start) return None def finish_datetime(self): + """ + Return the finish time as a datetime object, None if start time equal 0. + """ if self.finish > 0: return datetime.fromtimestamp(self.finish) return None @@ -280,10 +308,11 @@ class JobData(object): return None def running_time(self): - """Calculates the running time of the job. + """ + Calculates and returns the running time of the job, in seconds. - Returns: - int: running time + :return: Running time in seconds. + :rtype: int """ if self.status in ["RUNNING", "COMPLETED", "FAILED"]: # print("Finish: {0}".format(self.finish)) @@ -295,10 +324,11 @@ class JobData(object): return 0 def queuing_time(self): - """Calculates the queuing time of the job. + """ + Calculates and returns the queuing time of the job, in seconds. - Returns: - int: queueing time + :return: Queueing time in seconds. + :rtype: int """ if self.status in ["SUBMITTED", "QUEUING", "RUNNING", "COMPLETED", "HELD", "PREPARED", "FAILED"]: queue = int((self.start if self.start > @@ -308,6 +338,11 @@ class JobData(object): return 0 def get_hdata(self): + """ + Get the job data as an ordered dict into a JSON object. + :return: Job data as an ordered dict into a JSON object. + :rtype: JSON object. + """ hdata = collections.OrderedDict() hdata["name"] = self.job_name hdata["date"] = self.date @@ -324,6 +359,8 @@ class JobData(object): hdata["nnodes"] = self.nnodes hdata["energy"] = self.energy hdata["platform"] = self.platform + hdata["MaxRSS"] = self.MaxRSS + hdata["AveRSS"] = self.AveRSS return dumps(hdata) @@ -416,6 +453,8 @@ class MainDataBase(): try: c.execute(item) except sqlite3.Error as e: + # Always useful + # print(traceback.format_exc()) if _debug == True: Log.info(str(type(e).__name__)) Log.debug(str(type(e).__name__)) @@ -585,8 +624,8 @@ class ExperimentStatus(MainDataBase): if self.conn and self.conn_ec: exp_id = self._get_id_db() if exp_id: - print("exp_id {0}".format(exp_id)) - #conn = create_connection(DB_FILE_AS_TIMES) + # print("exp_id {0}".format(exp_id)) + # conn = create_connection(DB_FILE_AS_TIMES) creation_date = datetime.today().strftime('%Y-%m-%d-%H:%M:%S') sql = ''' INSERT INTO experiment_status(exp_id, name, status, seconds_diff, modified) VALUES(?,?,?,?,?) ''' # print(row_content) @@ -637,9 +676,20 @@ class ExperimentStatus(MainDataBase): def check_if_database_exists(expid): + """ + Tests if historical database exists. + :param expid: Experiment name (identifier) + :type expid: str + :return: True if exists, False otherwise. + :rtype: bool + """ BasicConfig.read() folder_path = BasicConfig.JOBDATA_DIR database_path = os.path.join(folder_path, "job_data_" + str(expid) + ".db") + if os.path.exists(database_path): + return True + else: + return False class JobDataStructure(MainDataBase): @@ -659,10 +709,23 @@ class JobDataStructure(MainDataBase): self.folder_path, "job_data_" + str(expid) + ".db") #self.conn = None self.jobdata_list = JobDataList(self.expid) + # job_data changes self.version_schema_changes.append( "ALTER TABLE job_data ADD COLUMN nnodes INTEGER NOT NULL DEFAULT 0") self.version_schema_changes.append( "ALTER TABLE job_data ADD COLUMN run_id INTEGER") + # DB VERSION 15 changes + self.version_schema_changes.append( + "ALTER TABLE job_data ADD COLUMN MaxRSS REAL NOT NULL DEFAULT 0.0") + self.version_schema_changes.append( + "ALTER TABLE job_data ADD COLUMN AveRSS REAL NOT NULL DEFAULT 0.0") + self.version_schema_changes.append( + "ALTER TABLE job_data ADD COLUMN out TEXT NOT NULL DEFAULT ''") + self.version_schema_changes.append( + "ALTER TABLE job_data ADD COLUMN err TEXT NOT NULL DEFAULT ''") + self.version_schema_changes.append( + "ALTER TABLE job_data ADD COLUMN rowstatus INTEGER NOT NULL DEFAULT 0") + # experiment_run changes self.version_schema_changes.append( "ALTER TABLE experiment_run ADD COLUMN suspended INTEGER NOT NULL DEFAULT 0") self.version_schema_changes.append( @@ -695,6 +758,11 @@ class JobDataStructure(MainDataBase): extra_data TEXT NOT NULL, nnodes INTEGER NOT NULL DEFAULT 0, run_id INTEGER, + MaxRSS REAL NOT NULL DEFAULT 0.0, + AveRSS REAL NOT NULL DEFAULT 0.0, + out TEXT NOT NULL, + err TEXT NOT NULL, + rowstatus INTEGER NOT NULL DEFAULT 0, UNIQUE(counter,job_name) ); ''') @@ -799,6 +867,24 @@ class JobDataStructure(MainDataBase): return self._insert_experiment_run(new_run) def process_status_changes(self, tracking_dictionary, job_list=None, chunk_unit="NA", chunk_size=0, check_run=False, current_config=""): + """ + Finds and updated the changes of status of the jobs in the current job list. + + :param tracking_dictionary: map of changes + :type tracking_dictionary: dict() + :param job_list: current list of jobs + :type job_list: list of Job objects + :param chunk_unit: chunk unit from config + :type chunk_unit: str + :param chunk_size: chunk size from config + :type chunk_size: int + :param check_run: true if the experiment run should be checked + :type check_run: bool + :param current_config: current configuration of experiment + :type current_config: JSON + :return: None + :rtype: None + """ try: current_run = self.get_max_id_experiment_run() if current_run: @@ -847,7 +933,8 @@ class JobDataStructure(MainDataBase): return None def validate_current_run(self, job_list, chunk_unit="NA", chunk_size=0, must_create=False, only_update=False, current_config=""): - """[summary] + """ + Checks current run :param job_list ([type]): [description] :param chunk_unit (str, optional): [description]. Defaults to "NA". @@ -909,6 +996,9 @@ class JobDataStructure(MainDataBase): return None def update_finish_time(self): + """ + Update finish time of experiment. Updates the current_run_id attribute in this object. + """ try: current_run = self.get_max_id_experiment_run() if current_run: @@ -1062,7 +1152,7 @@ class JobDataStructure(MainDataBase): "Autosubmit couldn't write start time.") return None - def write_finish_time(self, job_name, finish=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0, platform_object=None, packed=False, parent_id_list=[], no_slurm=True, out_file_path=None): + def write_finish_time(self, job_name, finish=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0, platform_object=None, packed=False, parent_id_list=[], no_slurm=True, out_file_path=None, out_file=None, err_file=None): """Writes the finish time into the database Args: @@ -1143,6 +1233,13 @@ class JobDataStructure(MainDataBase): job_data_last.status = status job_data_last.job_id = job_id job_data_last.energy = energy + job_data_last.rowstatus = RowStatus.COMPLETED + job_data_last.out = out_file if out_file else "" + job_data_last.err = err_file if err_file else "" + # TODO: These values need to be retrieved from the sacct command + job_data_last.MaxRSS = 0.0 + job_data_last.AveRSS = 0.0 + # END TODO job_data_last.ncpus = number_cpus if number_cpus > 0 else job_data_last.ncpus job_data_last.nnodes = number_nodes if number_nodes > 0 else job_data_last.nnodes job_data_last.extra_data = dumps( @@ -1168,7 +1265,7 @@ class JobDataStructure(MainDataBase): if submit_inserted and write_inserted: #print("retro finish") self.write_finish_time( - job_name, time.time(), status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id, platform_object, is_packed, parent_id_list) + job_name, time.time(), status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id, platform_object, is_packed, parent_id_list, no_slurm, out_file_path, out_file, err_file) else: return None except Exception as exp: @@ -1178,6 +1275,7 @@ class JobDataStructure(MainDataBase): def retry_incompleted_data(self, list_jobs): """ + DEPRECATED Retries retrieval of data that might be incompleted. :param list_jobs: list of jobs in experiment @@ -1219,7 +1317,8 @@ class JobDataStructure(MainDataBase): return None def process_current_run_collection(self): - """Post-process source output for job_data. + """ + Post-process source output for job_data. :return: job data processes, messages :rtype: ([job_data], [warning_messaages]) @@ -1434,7 +1533,7 @@ class JobDataStructure(MainDataBase): # _id, _counter, _job_name, _created, _modified, _submit, _start, _finish, _status, _rowtype, _ncpus, _wallclock, _qos, _energy, _date, _section, _member, _chunk, _last, _platform = item job_item = JobItem(*item) self.jobdata_list.add_jobdata(JobData(job_item.id, job_item.counter, job_item.job_name, job_item.created, job_item.modified, job_item.submit, job_item.start, job_item.finish, job_item.status, - job_item.rowtype, job_item.ncpus, job_item.wallclock, job_item.qos, job_item.energy, job_item.date, job_item.section, job_item.member, job_item.chunk, job_item.last, job_item.platform, job_item.job_id, job_item.extra_data, job_item.nnodes, job_item.run_id)) + job_item.rowtype, job_item.ncpus, job_item.wallclock, job_item.qos, job_item.energy, job_item.date, job_item.section, job_item.member, job_item.chunk, job_item.last, job_item.platform, job_item.job_id, job_item.extra_data, job_item.nnodes if 'nnodes' in job_item._fields else 0, job_item.run_id if 'run_id' in job_item._fields else None, job_item.MaxRSS if 'MaxRSS' in job_item._fields else 0.0, job_item.AveRSS if 'AveRSS' in job_item._fields else 0.0, job_item.out if 'out' in job_item._fields else "", job_item.err if 'err' in job_item._fields else "", job_item.rowstatus if 'rowstatus' in job_item._fields else RowStatus.FAULTY)) else: raise Exception("Job data folder not found :" + @@ -1465,7 +1564,7 @@ class JobDataStructure(MainDataBase): for item in current_job: job_item = JobItem(*item) job_data.append(JobData(job_item.id, job_item.counter, job_item.job_name, job_item.created, job_item.modified, job_item.submit, job_item.start, job_item.finish, job_item.status, - job_item.rowtype, job_item.ncpus, job_item.wallclock, job_item.qos, job_item.energy, job_item.date, job_item.section, job_item.member, job_item.chunk, job_item.last, job_item.platform, job_item.job_id, job_item.extra_data, job_item.nnodes, job_item.run_id)) + job_item.rowtype, job_item.ncpus, job_item.wallclock, job_item.qos, job_item.energy, job_item.date, job_item.section, job_item.member, job_item.chunk, job_item.last, job_item.platform, job_item.job_id, job_item.extra_data, job_item.nnodes if 'nnodes' in job_item._fields else 0, job_item.run_id if 'run_id' in job_item._fields else None, job_item.MaxRSS if 'MaxRSS' in job_item._fields else 0.0, job_item.AveRSS if 'AveRSS' in job_item._fields else 0.0, job_item.out if 'out' in job_item._fields else "", job_item.err if 'err' in job_item._fields else "", job_item.rowstatus if 'rowstatus' in job_item._fields else RowStatus.FAULTY)) return job_data else: return None @@ -1493,7 +1592,7 @@ class JobDataStructure(MainDataBase): for job_data in current_job_data: jobitem = JobItem(*job_data) current_collection.append(JobData(jobitem.id, jobitem.counter, jobitem.job_name, jobitem.created, jobitem.modified, jobitem.submit, jobitem.start, jobitem.finish, jobitem.status, jobitem.rowtype, jobitem.ncpus, - jobitem.wallclock, jobitem.qos, jobitem.energy, jobitem.date, jobitem.section, jobitem.member, jobitem.chunk, jobitem.last, jobitem.platform, jobitem.job_id, jobitem.extra_data, jobitem.nnodes, jobitem.run_id)) + jobitem.wallclock, jobitem.qos, jobitem.energy, jobitem.date, jobitem.section, jobitem.member, jobitem.chunk, jobitem.last, jobitem.platform, jobitem.job_id, jobitem.extra_data, jobitem.nnodes if 'nnodes' in jobitem._fields else 0, jobitem.run_id if 'run_id' in jobitem._fields else None, jobitem.MaxRSS if 'MaxRSS' in jobitem._fields else 0.0, jobitem.AveRSS if 'AveRSS' in jobitem._fields else 0.0, jobitem.out if 'out' in jobitem._fields else "", jobitem.err if 'err' in jobitem._fields else "", jobitem.rowstatus if 'rowstatus' in jobitem._fields else RowStatus.FAULTY)) return current_collection return None except Exception as exp: @@ -1572,7 +1671,7 @@ class JobDataStructure(MainDataBase): for current in current_job_last: job_item = JobItem(*current) jobdata.append(JobData(job_item.id, job_item.counter, job_item.job_name, job_item.created, job_item.modified, job_item.submit, job_item.start, job_item.finish, job_item.status, - job_item.rowtype, job_item.ncpus, job_item.wallclock, job_item.qos, job_item.energy, job_item.date, job_item.section, job_item.member, job_item.chunk, job_item.last, job_item.platform, job_item.job_id, job_item.extra_data, job_item.nnodes, job_item.run_id)) + job_item.rowtype, job_item.ncpus, job_item.wallclock, job_item.qos, job_item.energy, job_item.date, job_item.section, job_item.member, job_item.chunk, job_item.last, job_item.platform, job_item.job_id, job_item.extra_data, job_item.nnodes if 'nnodes' in job_item._fields else 0, job_item.run_id if 'run_id' in job_item._fields else None, job_item.MaxRSS if 'MaxRSS' in job_item._fields else 0.0, job_item.AveRSS if 'AveRSS' in job_item._fields else 0.0, job_item.out if 'out' in job_item._fields else "", job_item.err if 'err' in job_item._fields else "", job_item.rowstatus if 'rowstatus' in job_item._fields else RowStatus.FAULTY)) return jobdata else: return None @@ -1613,13 +1712,13 @@ class JobDataStructure(MainDataBase): return None def _update_start_job_data(self, jobdata): - """Update job_data by id. Updates start, modified, job_id, status. - - Args: - jobdata ([type]): [description] + """ + Update job_data by id. Updates start, modified, job_id, status. - Returns: - [type]: [description] + :param jobdata: Job information + :type jobdata: JobData object + :return: True, None if failed + :rtype: Bool or None """ # current_time = try: @@ -1639,20 +1738,20 @@ class JobDataStructure(MainDataBase): return None def _update_finish_job_data_plus(self, jobdata): - """Updates job_data by id. Updates submit, start, finish, modified, job_id, status, energy, extra_data, nnodes, ncpus - - Args: - jobdata (JobData): JobData object + """ + Update register by id. Updates submit, start, finish, modified, job_id, status, energy, extra_data, nnodes, ncpus, rowstatus, out, err - Returns: - int/None: lastrowid if success, None if error + :param jobdata: Job information + :type jobdata: JobData object + :return: last row id, None if failed + :rtype: integer or None """ try: if self.conn: - sql = ''' UPDATE job_data SET submit=?, start=?, finish=?, modified=?, job_id=?, status=?, energy=?, extra_data=?, nnodes=?, ncpus=? WHERE id=? ''' + sql = ''' UPDATE job_data SET submit=?, start=?, finish=?, modified=?, job_id=?, status=?, energy=?, extra_data=?, nnodes=?, ncpus=?, rowstatus=?, out=?, err=? WHERE id=? ''' cur = self.conn.cursor() cur.execute(sql, (jobdata.submit, jobdata.start, jobdata.finish, jobdata.modified, jobdata.job_id, - jobdata.status, jobdata.energy, jobdata.extra_data, jobdata.nnodes, jobdata.ncpus, jobdata._id)) + jobdata.status, jobdata.energy, jobdata.extra_data, jobdata.nnodes, jobdata.ncpus, RowStatus.COMPLETED, jobdata.out, jobdata.err, jobdata._id)) self.conn.commit() return cur.lastrowid return None @@ -1664,21 +1763,21 @@ class JobDataStructure(MainDataBase): return None def _update_finish_job_data(self, jobdata): - """Update register by id. Updates finish, modified, job_id, status, energy, extra_data, nnodes, ncpus - - Args: - jobdata ([type]): [description] + """ + Update register by id. Updates finish, modified, job_id, status, energy, extra_data, nnodes, ncpus, rowstatus, out, err - Returns: - [type]: None if error, lastrowid if success + :param jobdata: Job information + :type jobdata: JobData object + :return: last row id, None if failed + :rtype: integer or None """ try: if self.conn: # print("Updating finish time") - sql = ''' UPDATE job_data SET finish=?, modified=?, job_id=?, status=?, energy=?, extra_data=?, nnodes=?, ncpus=? WHERE id=? ''' + sql = ''' UPDATE job_data SET finish=?, modified=?, job_id=?, status=?, energy=?, extra_data=?, nnodes=?, ncpus=?, rowstatus=?, out=?, err=? WHERE id=? ''' cur = self.conn.cursor() cur.execute(sql, (jobdata.finish, jobdata.modified, jobdata.job_id, - jobdata.status, jobdata.energy, jobdata.extra_data, jobdata.nnodes, jobdata.ncpus, jobdata._id)) + jobdata.status, jobdata.energy, jobdata.extra_data, jobdata.nnodes, jobdata.ncpus, RowStatus.COMPLETED, jobdata.out, jobdata.err, jobdata._id)) self.conn.commit() return cur.lastrowid return None @@ -1689,8 +1788,10 @@ class JobDataStructure(MainDataBase): Log.warning("Error on Update : " + str(type(e).__name__)) return None + # PROCESSED def _update_job_data(self, job_data): - """Updating processed job_data + """ + Updating PROCESSED job_data :param job_data: JobData object with changes :type job_data: JobData object @@ -1699,10 +1800,10 @@ class JobDataStructure(MainDataBase): """ try: if self.conn: - sql = ''' UPDATE job_data SET energy=?, modified=? WHERE id=? ''' + sql = ''' UPDATE job_data SET energy=?, modified=?, MaxRSS=?, AveRSS=?, rowstatus=? WHERE id=? ''' cur = self.conn.cursor() cur.execute(sql, (job_data.energy, datetime.today().strftime( - '%Y-%m-%d-%H:%M:%S'), job_data._id)) + '%Y-%m-%d-%H:%M:%S'), job_data.MaxRSS, job_data.AveRSS, RowStatus.PROCESSED, job_data._id)) # self.conn.commit() return True return None @@ -1746,9 +1847,9 @@ class JobDataStructure(MainDataBase): try: if self.conn: #print("preparing to insert") - sql = ''' INSERT INTO job_data(counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data, nnodes, run_id) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) ''' + sql = ''' INSERT INTO job_data(counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data, nnodes, run_id, MaxRSS, AveRSS, out, err, rowstatus) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) ''' tuplerow = (jobdata.counter, jobdata.job_name, jobdata.created, jobdata.modified, jobdata.submit, jobdata.start, - jobdata.finish, jobdata.status, jobdata.rowtype, jobdata.ncpus, jobdata.wallclock, jobdata.qos, jobdata.energy, jobdata.date, jobdata.section, jobdata.member, jobdata.chunk, jobdata.last, jobdata.platform, jobdata.job_id, jobdata.extra_data, jobdata.nnodes, jobdata.run_id) + jobdata.finish, jobdata.status, jobdata.rowtype, jobdata.ncpus, jobdata.wallclock, jobdata.qos, jobdata.energy, jobdata.date, jobdata.section, jobdata.member, jobdata.chunk, jobdata.last, jobdata.platform, jobdata.job_id, jobdata.extra_data, jobdata.nnodes, jobdata.run_id, jobdata.MaxRSS, jobdata.AveRSS, jobdata.out, jobdata.err, jobdata.rowstatus) cur = self.conn.cursor() #print("pre insert") cur.execute(sql, tuplerow) @@ -1803,7 +1904,7 @@ class JobDataStructure(MainDataBase): self.conn.text_factory = str cur = self.conn.cursor() cur.execute( - "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data, nnodes, run_id FROM job_data") + "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data, nnodes, run_id, MaxRSS, AveRSS, out, err, rowstatus FROM job_data") rows = cur.fetchall() return rows else: @@ -1825,7 +1926,7 @@ class JobDataStructure(MainDataBase): if self.conn: self.conn.text_factory = str cur = self.conn.cursor() - cur.execute("SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data, nnodes, run_id from job_data WHERE run_id=? and last=1 and finish > 0 and rowtype >= 2 ORDER BY id", (run_id,)) + cur.execute("SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data, nnodes, run_id, MaxRSS, AveRSS, out, err, rowstatus from job_data WHERE run_id=? and last=1 and finish > 0 and rowtype >= 2 ORDER BY id", (run_id,)) rows = cur.fetchall() if len(rows) > 0: return rows @@ -1852,7 +1953,7 @@ class JobDataStructure(MainDataBase): self.conn.text_factory = str cur = self.conn.cursor() cur.execute( - "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data, nnodes, run_id FROM job_data WHERE job_name=? ORDER BY counter DESC", (job_name,)) + "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data, nnodes, run_id, MaxRSS, AveRSS, out, err, rowstatus FROM job_data WHERE job_name=? ORDER BY counter DESC", (job_name,)) rows = cur.fetchall() # print(rows) return rows @@ -1879,7 +1980,7 @@ class JobDataStructure(MainDataBase): self.conn.text_factory = str cur = self.conn.cursor() cur.execute( - "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data, nnodes, run_id FROM job_data WHERE last=1 and job_name=? ORDER BY counter DESC", (job_name,)) + "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data, nnodes, run_id, MaxRSS, AveRSS, out, err, rowstatus FROM job_data WHERE last=1 and job_name=? ORDER BY counter DESC", (job_name,)) rows = cur.fetchall() if rows and len(rows) > 0: return rows diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 66693314b9d4f8c06594044d1312033591806180..604784dbb8e01d1d50daeba2deb887532b6e31f0 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -1109,10 +1109,10 @@ class Job(object): path_out = os.path.join(self._tmp_path, 'LOG_' + str(self.expid), out) # Launch first as simple non-threaded function JobDataStructure(self.expid).write_finish_time(self.name, finish_time, final_status, self.processors, self.wallclock, self._queue, self.date, - self.member, self.section, self.chunk, self.platform_name, self.id, self.platform, self.packed, [job.id for job in self._parents]) + self.member, self.section, self.chunk, self.platform_name, self.id, self.platform, self.packed, [job.id for job in self._parents], True, None, out, err) # Launch second as threaded function thread_write_finish = Thread(target=JobDataStructure(self.expid).write_finish_time, args=(self.name, finish_time, final_status, self.processors, - self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.platform, self.packed, [job.id for job in self._parents], False, path_out)) + self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.platform, self.packed, [job.id for job in self._parents], False, path_out, out, err)) thread_write_finish.start() def check_started_after(self, date_limit): diff --git a/docs/source/usage.rst b/docs/source/usage.rst index b82652b39125a7eb2c3bc45a7bbe66866aaff88c..7c9e00102c140f7530fd36f838b1ac8e634f8aa2 100644 --- a/docs/source/usage.rst +++ b/docs/source/usage.rst @@ -35,6 +35,8 @@ Command list usage/check usage/describe usage/run + usage/start_time + usage/start_after usage/inspect usage/test usage/testcase diff --git a/docs/source/usage/start_after.rst b/docs/source/usage/start_after.rst new file mode 100644 index 0000000000000000000000000000000000000000..2e6be41813733c1b5e566efd3f0e85e55e1167e4 --- /dev/null +++ b/docs/source/usage/start_after.rst @@ -0,0 +1,20 @@ +How to start an experiment after another experiment is finished +=============================================================== + +To start an experiment after another experiment is finished, use the command: +:: + + autosubmit run EXPID -sa EXPIDB + +*EXPID* is the experiment identifier, the experiment you want to start. + +*EXPIDB* is the experiment identifier of the experiment you are waiting for before your experiment starts. + +.. warning:: Both experiments must be using Autosubmit version `3.13.0b` or later. + +Then, your terminal will show the current status of the experiment you are waiting for. The status format is `COMPLETED/QUEUING/RUNNING/SUSPENDED/FAILED`. + +This functionality can be used together with other options supplied by the `run` command. + +The `-sa` command has a long version `--start_after`. + diff --git a/docs/source/usage/start_time.rst b/docs/source/usage/start_time.rst new file mode 100644 index 0000000000000000000000000000000000000000..ab83c471c7fe6a1891c429e1319177415979122a --- /dev/null +++ b/docs/source/usage/start_time.rst @@ -0,0 +1,19 @@ +How to start an experiment at a given time +========================================== + +To start an experiment at a given time, use the command: +:: + + autosubmit run EXPID -st INPUT + +*EXPID* is the experiment identifier + +*INPUT* is the time when your experiment will start. You can provide two formats: + * `H:M:S`: For example `15:30:00` will start your experiment at 15:30 in the afternoon of the present day. + * `yyyy-mm-dd H:M:S`: For example `2021-02-15 15:30:00` will start your experiment at 15:30 in the afternoon on February 15th. + +Then, your terminal will show a countdown for your experiment start. + +This functionality can be used together with other options supplied by the `run` command. + +The `-st` command has a long version `--start_time`.