diff --git a/.gitignore b/.gitignore index 8eacd9c4d6dfd2c389347277de4b0680cc707031..aa1269dcc6085e26601c139dd581d4c70c37fd9c 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ /cover/ /.coverage autosubmit/miniTest.py +autosubmit/simple_test.py diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index fa8ea9e5f4965fc9e20ae2dd187e4cae534a9f75..f81ceb80498dbc9aeeadd76543530b0b03c8888a 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -17,38 +17,11 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . # pipeline_test -from __future__ import print_function -import threading -from sets import Set -from job.job_packager import JobPackager -from job.job_exceptions import WrongTemplateException -from platforms.paramiko_submitter import ParamikoSubmitter -from notifications.notifier import Notifier -from notifications.mail_notifier import MailNotifier -from bscearth.utils.date import date2str -from monitor.monitor import Monitor -from database.db_common import get_autosubmit_version -from database.db_common import delete_experiment -from experiment.experiment_common import copy_experiment -from experiment.experiment_common import new_experiment -from database.db_common import create_db -from bscearth.utils.log import Log -from job.job_grouping import JobGrouping -from job.job_list_persistence import JobListPersistencePkl -from job.job_list_persistence import JobListPersistenceDb -from job.job_package_persistence import JobPackagePersistence -from job.job_packages import JobPackageThread -from job.job_list import JobList -from git.autosubmit_git import AutosubmitGit -from job.job_common import Status -from bscearth.utils.config_parser import ConfigParserFactory -from config.config_common import AutosubmitConfig -from config.basicConfig import BasicConfig """ Main module for autosubmit. 
Only contains an interface class to all functionality implemented on autosubmit """ - +from __future__ import print_function try: # noinspection PyCompatibility from configparser import SafeConfigParser @@ -77,16 +50,44 @@ import random import signal import datetime import portalocker +import threading from pkg_resources import require, resource_listdir, resource_exists, resource_string from distutils.util import strtobool from collections import defaultdict from pyparsing import nestedExpr + +from sets import Set + sys.path.insert(0, os.path.abspath('.')) # noinspection PyPackageRequirements # noinspection PyPackageRequirements # from API.testAPI import Monitor # noinspection PyPackageRequirements +from job.job_packager import JobPackager +from job.job_exceptions import WrongTemplateException +from platforms.paramiko_submitter import ParamikoSubmitter +from notifications.notifier import Notifier +from notifications.mail_notifier import MailNotifier +from bscearth.utils.date import date2str +from monitor.monitor import Monitor +from database.db_common import get_autosubmit_version +from database.db_common import delete_experiment +from experiment.experiment_common import copy_experiment +from experiment.experiment_common import new_experiment +from database.db_common import create_db +from bscearth.utils.log import Log +from job.job_grouping import JobGrouping +from job.job_list_persistence import JobListPersistencePkl +from job.job_list_persistence import JobListPersistenceDb +from job.job_package_persistence import JobPackagePersistence +from job.job_packages import JobPackageThread +from job.job_list import JobList +from git.autosubmit_git import AutosubmitGit +from job.job_common import Status +from bscearth.utils.config_parser import ConfigParserFactory +from config.config_common import AutosubmitConfig +from config.basicConfig import BasicConfig # noinspection PyUnusedLocal diff --git a/autosubmit/config/basicConfig.py b/autosubmit/config/basicConfig.py index 9d9bfac809f122746a7a0e76c067b017521f92e7..9c729ebefcf73f42cb549c3d67dbc1364dd4739e 100755 --- a/autosubmit/config/basicConfig.py +++ b/autosubmit/config/basicConfig.py @@ -38,6 +38,8 @@ class BasicConfig: DB_DIR = os.path.join(os.path.expanduser('~'), 'debug', 'autosubmit') STRUCTURES_DIR = os.path.join( '/esarchive', 'autosubmit', 'as_metadata', 'structures') + JOBDATA_DIR = os.path.join( + '/esarchive', 'autosubmit', 'as_metadata', 'data') DB_FILE = 'autosubmit.db' DB_PATH = os.path.join(DB_DIR, DB_FILE) LOCAL_ROOT_DIR = DB_DIR @@ -98,6 +100,8 @@ class BasicConfig: BasicConfig.ALLOWED_HOSTS = parser.get('hosts', 'whitelist') if parser.has_option('structures', 'path'): BasicConfig.STRUCTURES_DIR = parser.get('structures', 'path') + if parser.has_option('historicdb', 'path'): + BasicConfig.JOBDATA_DIR = parser.get('historicdb', 'path') @staticmethod def read(): diff --git a/autosubmit/database/db_jobdata.py b/autosubmit/database/db_jobdata.py new file mode 100644 index 0000000000000000000000000000000000000000..b1cfaecac020a417b814f9da7d95e3f59240ffcf --- /dev/null +++ b/autosubmit/database/db_jobdata.py @@ -0,0 +1,768 @@ +#!/usr/bin/env python + +# Copyright 2015 Earth Sciences Department, BSC-CNS + +# This file is part of Autosubmit. + +# Autosubmit is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. 
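Worth pausing on the `basicConfig.py` hunk above: it introduces `BasicConfig.JOBDATA_DIR` with an `/esarchive` default and a new `historicdb` option block read by `BasicConfig.read()`. A minimal sketch of how that override is resolved — the rc-file location and the example path are illustrative assumptions, not part of the patch:

```python
# Minimal sketch (assumptions: rc-file location and example path).
# BasicConfig.read() keeps the /esarchive default unless the parsed
# config file contains a [historicdb] section with a 'path' option:
#
#   [historicdb]
#   path = /shared/autosubmit/job_data
#
try:
    # noinspection PyCompatibility
    from configparser import SafeConfigParser
except ImportError:
    # noinspection PyCompatibility
    from ConfigParser import SafeConfigParser

import os

parser = SafeConfigParser()
parser.read(os.path.expanduser('~/.autosubmitrc'))  # assumed location

# Default mirrors BasicConfig.JOBDATA_DIR in the hunk above
jobdata_dir = os.path.join('/esarchive', 'autosubmit', 'as_metadata', 'data')
if parser.has_section('historicdb') and parser.has_option('historicdb', 'path'):
    jobdata_dir = parser.get('historicdb', 'path')
```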
+
+# Autosubmit is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with Autosubmit. If not, see .
+
+import os
+import sys
+import string
+import time
+import pickle
+import textwrap
+import traceback
+import sqlite3
+import copy
+import collections
+from datetime import datetime
+from json import dumps
+from networkx import DiGraph
+from autosubmit.config.basicConfig import BasicConfig
+from bscearth.utils.date import date2str, parse_date, previous_day, chunk_end_date, chunk_start_date, Log, subs_dates
+
+
+CURRENT_DB_VERSION = 10
+_debug = True
+JobItem = collections.namedtuple('JobItem', ['id', 'counter', 'job_name', 'created', 'modified', 'submit', 'start', 'finish',
+                                             'status', 'rowtype', 'ncpus', 'wallclock', 'qos', 'energy', 'date', 'section', 'member', 'chunk', 'last', 'platform', 'job_id', 'extra_data'])
+
+
+class JobData():
+    """Job Data object
+    """
+
+    def __init__(self, _id, counter=1, job_name="None", created=None, modified=None, submit=0, start=0, finish=0, status="UNKNOWN", rowtype=1, ncpus=0, wallclock="00:00", qos="debug", energy=0, date="", section="", member="", chunk=0, last=1, platform="NA", job_id=0, extra_data=dict()):
+        """Single row of historic job data.
+
+        Args:
+            _id (int): Internal Id
+            counter (int, optional): Version counter for this job_name. Defaults to 1.
+            job_name (str, optional): Name of the job. Defaults to "None".
+            created (datetime, optional): Creation timestamp. Defaults to None.
+            modified (datetime, optional): Last modification timestamp. Defaults to None.
+            submit (int, optional): Submit time as epoch seconds. Defaults to 0.
+            start (int, optional): Start time as epoch seconds. Defaults to 0.
+            finish (int, optional): Finish time as epoch seconds. Defaults to 0.
+            status (str, optional): Status of job. Defaults to "UNKNOWN".
+            rowtype (int, optional): Type of the row. Defaults to 1.
+            ncpus (int, optional): Number of cpus. Defaults to 0.
+            wallclock (str, optional): Wallclock value. Defaults to "00:00".
+            qos (str, optional): Name of QoS. Defaults to "debug".
+            energy (int, optional): Consumed energy. Defaults to 0.
+            date (str, optional): Date from config. Defaults to "".
+            section (str, optional): Section from config. Defaults to "".
+            member (str, optional): Member from config. Defaults to "".
+            chunk (int, optional): Chunk from config. Defaults to 0.
+            last (int, optional): 1 if this is the current row for job_name. Defaults to 1.
+            platform (str, optional): Name of platform of job. Defaults to "NA".
+            job_id (int, optional): Id of job. Defaults to 0.
+            extra_data (dict, optional): Extra accounting data, stored as JSON. Defaults to dict().
+ """ + self._id = _id + self.counter = counter + self.job_name = job_name + self.created = created if created else datetime.today().strftime('%Y-%m-%d-%H:%M:%S') + self.modified = modified if modified else datetime.today().strftime('%Y-%m-%d-%H:%M:%S') + self._submit = int(submit) + self._start = int(start) + self._finish = int(finish) + self.status = status + self.rowtype = rowtype + self.ncpus = ncpus + self.wallclock = wallclock + self.qos = qos if qos else "debug" + self._energy = energy if energy else 0 + self.date = date if date else "" + self.section = section if section else "" + self.member = member if member else "" + self.chunk = chunk if chunk else 0 + self.last = last + self._platform = platform if platform and len( + platform) > 0 else "NA" + self.job_id = job_id if job_id else 0 + self.extra_data = dumps(extra_data) + + @property + def submit(self): + return int(self._submit) + + @property + def start(self): + return int(self._start) + + @property + def finish(self): + return int(self._finish) + + @property + def platform(self): + return self._platform + + @property + def energy(self): + return self._energy + + @submit.setter + def submit(self, submit): + self._submit = int(submit) + + @start.setter + def start(self, start): + self._start = int(start) + + @finish.setter + def finish(self, finish): + self._finish = int(finish) + + @platform.setter + def platform(self, platform): + self._platform = platform if platform and len(platform) > 0 else "NA" + + @energy.setter + def energy(self, energy): + self._energy = energy if energy else 0 + + +class JobDataList(): + """Object that stores the list of jobs to be handled. + """ + def __init__(self, expid): + self.jobdata_list = list() + self.expid = expid + + def add_jobdata(self, jobdata): + self.jobdata_list.append(jobdata) + + def size(self): + return len(self.jobdata_list) + + +class JobDataStructure(): + + def __init__(self, expid): + """Initializes the object based on the unique identifier of the experiment. + + Args: + expid (str): Experiment identifier + """ + BasicConfig.read() + self.expid = expid + self.folder_path = BasicConfig.JOBDATA_DIR + self.database_path = os.path.join( + self.folder_path, "job_data_" + str(expid) + ".db") + self.conn = None + self.jobdata_list = JobDataList(self.expid) + self.create_table_query = textwrap.dedent( + '''CREATE TABLE + IF NOT EXISTS job_data ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + counter INTEGER NOT NULL, + job_name TEXT NOT NULL, + created TEXT NOT NULL, + modified TEXT NOT NULL, + submit INTEGER NOT NULL, + start INTEGER NOT NULL, + finish INTEGER NOT NULL, + status TEXT NOT NULL, + rowtype INTEGER NOT NULL, + ncpus INTEGER NOT NULL, + wallclock TEXT NOT NULL, + qos TEXT NOT NULL, + energy INTEGER NOT NULL, + date TEXT NOT NULL, + section TEXT NOT NULL, + member TEXT NOT NULL, + chunk INTEGER NOT NULL, + last INTEGER NOT NULL, + platform TEXT NOT NULL, + job_id INTEGER NOT NULL, + extra_data TEXT NOT NULL, + UNIQUE(counter,job_name) + );''') + if not os.path.exists(self.database_path): + open(self.database_path, "w") + self.conn = self.create_connection(self.database_path) + self.create_table() + if self._set_pragma_version(CURRENT_DB_VERSION): + Log.info("Database version set.") + else: + self.conn = self.create_connection(self.database_path) + + def write_submit_time(self, job_name, submit=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0): + """Writes submit time of job. 
+
+        Args:
+            job_name (str): Name of Job.
+            submit (int, optional): Submit time as epoch seconds. Defaults to 0.
+            status (str, optional): Status of job. Defaults to "UNKNOWN".
+            ncpus (int, optional): Number of cpus. Defaults to 0.
+            wallclock (str, optional): Wallclock value. Defaults to "00:00".
+            qos (str, optional): Name of QoS. Defaults to "debug".
+            date (str, optional): Date from config. Defaults to "".
+            member (str, optional): Member from config. Defaults to "".
+            section (str, optional): Section from config. Defaults to "".
+            chunk (int, optional): Chunk from config. Defaults to 0.
+            platform (str, optional): Name of platform of job. Defaults to "NA".
+            job_id (int, optional): Id of job. Defaults to 0.
+
+        Returns:
+            Boolean/None: True if success, None if exception.
+        """
+        try:
+            job_data = self.get_job_data(job_name)
+            current_counter = 1
+            max_counter = self._get_maxcounter_jobdata()
+            if job_data and len(job_data) > 0:
+                job_max_counter = max(job.counter for job in job_data)
+                current_last = [
+                    job for job in job_data if job.counter == job_max_counter]
+                for current in current_last:
+                    # Deactivate current last for this job
+                    current.modified = datetime.today().strftime('%Y-%m-%d-%H:%M:%S')
+                    up_id = self._deactivate_current_last(current)
+                # Finding current counter
+                current_counter = (
+                    job_max_counter + 1) if job_max_counter >= max_counter else max_counter
+            else:
+                current_counter = max_counter
+            # Insert new last
+            rowid = self._insert_job_data(JobData(
+                0, current_counter, job_name, None, None, submit, 0, 0, status, 1, ncpus, wallclock, qos, 0, date, member, section, chunk, 1, platform, job_id))
+            if rowid:
+                return True
+            else:
+                return None
+        except Exception as exp:
+            if _debug == True:
+                Log.info(traceback.format_exc())
+            Log.warning(str(exp))
+            return None
+
+    def write_start_time(self, job_name, start=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0):
+        """Writes start time into the database
+
+        Args:
+            job_name (str): Name of Job.
+            start (int, optional): Start time. Defaults to 0.
+            status (str, optional): Status of job. Defaults to "UNKNOWN".
+            ncpus (int, optional): Number of cpus. Defaults to 0.
+            wallclock (str, optional): Wallclock value. Defaults to "00:00".
+            qos (str, optional): Name of QoS. Defaults to "debug".
+            date (str, optional): Date from config. Defaults to "".
+            member (str, optional): Member from config. Defaults to "".
+            section (str, optional): Section from config. Defaults to "".
+            chunk (int, optional): Chunk from config. Defaults to 0.
+            platform (str, optional): Name of platform of job. Defaults to "NA".
+            job_id (int, optional): Id of job. Defaults to 0.
+ + Returns: + [type]: [description] + """ + try: + job_data_last = self.get_job_data_last(job_name) + # Updating existing row + if job_data_last: + job_data_last = job_data_last[0] + if job_data_last.start == 0: + job_data_last.start = start + job_data_last.status = status + job_data_last.job_id = job_id + job_data_last.modified = datetime.today().strftime('%Y-%m-%d-%H:%M:%S') + _updated = self._update_start_job_data(job_data_last) + return _updated + # It is necessary to create a new row + submit_inserted = self.write_submit_time( + job_name, start, status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id) + if submit_inserted: + # print("retro start") + self.write_start_time(job_name, start, status, + ncpus, wallclock, qos, date, member, section, chunk, platform, job_id) + return True + else: + return None + except Exception as exp: + if _debug == True: + Log.info(traceback.format_exc()) + Log.warning(str(exp)) + return None + + def write_finish_time(self, job_name, finish=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0, platform_object=None): + """Writes the finish time into the database + + Args: + job_name (str): Name of Job. + finish (int, optional): Finish time. Defaults to 0. + status (str, optional): Current Status. Defaults to "UNKNOWN". + ncpus (int, optional): Number of cpus. Defaults to 0. + wallclock (str, optional): Wallclock value. Defaults to "00:00". + qos (str, optional): Name of QoS. Defaults to "debug". + date (str, optional): Date from config. Defaults to "". + member (str, optional): Member from config. Defaults to "". + section (str, optional): Section from config. Defaults to "". + chunk (int, optional): Chunk from config. Defaults to 0. + platform (str, optional): Name of platform of job. Defaults to "NA". + job_id (int, optional): Id of job. Defaults to 0. + platform_object (obj, optional): Platform object. Defaults to None. + + Returns: + Boolean/None: True if success, None if exception. + """ + try: + # print("Writing finish time \t" + str(job_name) + "\t" + str(finish)) + job_data_last = self.get_job_data_last(job_name) + energy = 0 + submit_time = start_time = finish_time = 0 + extra_data = dict() + # Updating existing row + if job_data_last: + job_data_last = job_data_last[0] + # if job_data_last.finish == 0: + # Call Slurm here, update times. 
+                if platform_object:
+                    try:
+                        if type(platform_object) is not str:
+                            if platform_object.type == "slurm":
+                                submit_time, start_time, finish_time, energy, extra_data = platform_object.check_job_energy(job_id)
+                    except Exception as exp:
+                        Log.info(traceback.format_exc())
+                        Log.warning(str(exp))
+                        energy = 0
+                job_data_last.finish = int(finish_time) if finish_time > 0 else int(finish)
+                job_data_last.status = status
+                job_data_last.job_id = job_id
+                job_data_last.energy = energy
+                job_data_last.extra_data = dumps(extra_data)
+                job_data_last.modified = datetime.today().strftime('%Y-%m-%d-%H:%M:%S')
+                if submit_time > 0 and start_time > 0:
+                    job_data_last.submit = int(submit_time)
+                    job_data_last.start = int(start_time)
+                    rowid = self._update_finish_job_data_plus(job_data_last)
+                else:
+                    rowid = self._update_finish_job_data(job_data_last)
+                return True
+            # It is necessary to create a new row
+            submit_inserted = self.write_submit_time(
+                job_name, finish, status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id)
+            write_inserted = self.write_start_time(job_name, finish, status, ncpus,
+                                                   wallclock, qos, date, member, section, chunk, platform, job_id)
+            if submit_inserted and write_inserted:
+                self.write_finish_time(
+                    job_name, finish, status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id, platform_object)
+                return True
+            else:
+                return None
+        except Exception as exp:
+            if _debug == True:
+                Log.info(traceback.format_exc())
+            Log.warning(str(exp))
+            return None
+
+    def get_all_job_data(self):
+        """Loads all rows from the job_data table into self.jobdata_list.
+
+        Raises:
+            Exception: If the job data folder does not exist.
+        """
+        try:
+            if os.path.exists(self.folder_path):
+                current_table = self._get_all_job_data()
+                for item in current_table:
+                    # JobItem fields map one-to-one onto JobData's constructor
+                    job_item = JobItem(*item)
+                    self.jobdata_list.add_jobdata(JobData(*job_item))
+            else:
+                raise Exception("Job data folder not found :" +
+                                str(self.folder_path))
+        except Exception as exp:
+            if _debug == True:
+                Log.info(traceback.format_exc())
+            Log.warning(str(exp))
+            return None
+
+    def get_job_data(self, job_name):
+        """Retrieves all the rows that have the same job_name
+
+        Args:
+            job_name (str): Name of job.
+
+        Raises:
+            Exception: If path to data folder does not exist
+
+        Returns:
+            list/None: None if error, list of JobData if successful
+        """
+        try:
+            job_data = list()
+            if os.path.exists(self.folder_path):
+                current_job = self._get_job_data(job_name)
+                for item in current_job:
+                    job_item = JobItem(*item)
+                    job_data.append(JobData(*job_item))
+                return job_data
+            else:
+                raise Exception("Job data folder not found :" +
+                                str(self.folder_path))
+        except Exception as exp:
+            if _debug == True:
+                Log.info(traceback.format_exc())
+            Log.warning(str(exp))
+            return None
+
+    def get_job_data_last(self, job_name):
+        """ Returns latest jobdata row for a job_name. The current version.
+
+        Args:
+            job_name (str): Name of job.
+
+        Raises:
+            Exception: If the job data folder does not exist.
+
+        Returns:
+            list/None: None if error or not found, list with the current JobData rows if success
+        """
+        try:
+            jobdata = list()
+            if os.path.exists(self.folder_path):
+                current_job_last = self._get_job_data_last(job_name)
+                if current_job_last:
+                    for current in current_job_last:
+                        job_item = JobItem(*current)
+                        jobdata.append(JobData(*job_item))
+                    return jobdata
+                else:
+                    return None
+            else:
+                raise Exception("Job data folder not found :" +
+                                str(self.folder_path))
+        except Exception as exp:
+            if _debug == True:
+                Log.info(traceback.format_exc())
+            Log.warning(str(exp))
+            return None
+
+    def _deactivate_current_last(self, jobdata):
+        """Sets last = 0 on the row with jobdata's id
+
+        Args:
+            jobdata (JobData): JobData object
+
+        Returns:
+            int/None: lastrowid if success, None if error
+        """
+        try:
+            if self.conn:
+                sql = ''' UPDATE job_data SET last=0, modified = ? WHERE id = ?'''
+                tuplerow = (jobdata.modified, jobdata._id)
+                cur = self.conn.cursor()
+                cur.execute(sql, tuplerow)
+                self.conn.commit()
+                return cur.lastrowid
+            return None
+        except sqlite3.Error as e:
+            if _debug == True:
+                Log.info(traceback.format_exc())
+            Log.warning("Error on Update : " + str(type(e).__name__))
+            return None
+
+    def _update_start_job_data(self, jobdata):
+        """Update start time of job data row
+
+        Args:
+            jobdata (JobData): JobData object
+
+        Returns:
+            Boolean/None: True if success, None if error
+        """
+        try:
+            if self.conn:
+                sql = ''' UPDATE job_data SET start=?, modified=?, job_id=?, status=? WHERE id=? '''
+                cur = self.conn.cursor()
+                cur.execute(sql, (int(jobdata.start),
+                                  jobdata.modified, jobdata.job_id, jobdata.status, jobdata._id))
+                self.conn.commit()
+                return True
+            return None
+        except sqlite3.Error as e:
+            if _debug == True:
+                Log.info(traceback.format_exc())
+            Log.warning("Error on Update : " + str(type(e).__name__))
+            return None
+
+    def _update_finish_job_data_plus(self, jobdata):
+        """Updates the finish job data, also updates submit, start times.
+ + Args: + jobdata (JobData): JobData object + + Returns: + int/None: lastrowid if success, None if error + """ + try: + if self.conn: + sql = ''' UPDATE job_data SET submit=?, start=?, finish=?, modified=?, job_id=?, status=?, energy=?, extra_data=? WHERE id=? ''' + cur = self.conn.cursor() + cur.execute(sql, (jobdata.submit, jobdata.start, jobdata.finish, jobdata.modified, jobdata.job_id, + jobdata.status, jobdata.energy, jobdata.extra_data, jobdata._id)) + self.conn.commit() + return cur.lastrowid + return None + except sqlite3.Error as e: + if _debug == True: + Log.info(traceback.format_exc()) + Log.warning("Error on Update : " + str(type(e).__name__)) + return None + + def _update_finish_job_data(self, jobdata): + """Update register with id. Updates finish, modified, status. + + Args: + jobdata ([type]): [description] + + Returns: + [type]: None if error, lastrowid if success + """ + try: + if self.conn: + # print("Updating finish time") + sql = ''' UPDATE job_data SET finish=?, modified=?, job_id=?, status=?, energy=?, extra_data=? WHERE id=? ''' + cur = self.conn.cursor() + cur.execute(sql, (jobdata.finish, jobdata.modified, jobdata.job_id, + jobdata.status, jobdata.energy, jobdata.extra_data, jobdata._id)) + self.conn.commit() + return cur.lastrowid + return None + except sqlite3.Error as e: + if _debug == True: + Log.info(traceback.format_exc()) + Log.warning("Error on Update : " + str(type(e).__name__)) + return None + + def _insert_job_data(self, jobdata): + """[summary] + + Args: + jobdata ([type]): JobData object + + Returns: + [type]: None if error, lastrowid if correct + """ + try: + if self.conn: + #print("preparing to insert") + sql = ''' INSERT INTO job_data(counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) 
'''
+                tuplerow = (jobdata.counter, jobdata.job_name, jobdata.created, jobdata.modified, jobdata.submit, jobdata.start,
+                            jobdata.finish, jobdata.status, jobdata.rowtype, jobdata.ncpus, jobdata.wallclock, jobdata.qos, jobdata.energy, jobdata.date, jobdata.section, jobdata.member, jobdata.chunk, jobdata.last, jobdata.platform, jobdata.job_id, jobdata.extra_data)
+                cur = self.conn.cursor()
+                cur.execute(sql, tuplerow)
+                self.conn.commit()
+                return cur.lastrowid
+            else:
+                return None
+        except sqlite3.Error as e:
+            if _debug == True:
+                Log.info(traceback.format_exc())
+            Log.warning("Error on Insert : " + str(type(e).__name__) +
+                        "\t " + str(jobdata.job_name) + "\t" + str(jobdata.counter))
+            return None
+
+    def _get_all_job_data(self):
+        """
+        Get all registers from job_data.\n
+        :return: all rows from the job_data table
+        :rtype: list of 22-column tuples
+        """
+        try:
+            if self.conn:
+                self.conn.text_factory = str
+                cur = self.conn.cursor()
+                cur.execute(
+                    "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data FROM job_data")
+                rows = cur.fetchall()
+                return rows
+            else:
+                return None
+        except sqlite3.Error as e:
+            if _debug == True:
+                Log.info(traceback.format_exc())
+            Log.warning("Error on Select : " + str(type(e).__name__))
+            return list()
+
+    def _get_job_data(self, job_name):
+        """Gets all rows for a job_name, ordered by counter descending.
+
+        Args:
+            job_name (str): Name of job.
+
+        Returns:
+            list/None: None if error, list of tuple if found (list can be empty)
+        """
+        try:
+            if self.conn:
+                self.conn.text_factory = str
+                cur = self.conn.cursor()
+                cur.execute(
+                    "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data FROM job_data WHERE job_name=? ORDER BY counter DESC", (job_name,))
+                rows = cur.fetchall()
+                return rows
+            else:
+                return None
+        except sqlite3.Error as e:
+            if _debug == True:
+                Log.info(traceback.format_exc())
+            Log.warning("Error on Select : " + str(type(e).__name__))
+            return None
+
+    def _get_job_data_last(self, job_name):
+        """Returns the latest row for a job_name. The current version.
+
+        Args:
+            job_name (str): Name of job.
+
+        Returns:
+            list/None: None if error or not found, rows with last=1 if success
+        """
+        try:
+            if self.conn:
+                self.conn.text_factory = str
+                cur = self.conn.cursor()
+                cur.execute(
+                    "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data FROM job_data WHERE last=1 and job_name=? ORDER BY counter DESC", (job_name,))
+                rows = cur.fetchall()
+                if rows and len(rows) > 0:
+                    return rows
+                else:
+                    return None
+            else:
+                return None
+        except sqlite3.Error as e:
+            if _debug == True:
+                Log.info(traceback.format_exc())
+            Log.warning("Error on Select : " + str(type(e).__name__))
+            return None
+
+    def _set_pragma_version(self, version=2):
+        """Sets current version of the schema
+
+        Args:
+            version (int, optional): Current Version. Defaults to 2.
+ + Returns: + Boolean/None: True if success, None if error + """ + try: + if self.conn: + self.conn.text_factory = str + cur = self.conn.cursor() + cur.execute("pragma user_version={v:d}".format(v=version)) + self.conn.commit() + return True + except sqlite3.Error as e: + if _debug == True: + Log.info(traceback.format_exc()) + Log.warning("Error on version : " + str(type(e).__name__)) + return None + + def _get_maxcounter_jobdata(self): + """Return the maxcounter of the experiment + + Returns: + [type]: [description] + """ + try: + if self.conn: + self.conn.text_factory = str + cur = self.conn.cursor() + cur.execute("SELECT MAX(counter) as maxcounter FROM job_data") + rows = cur.fetchall() + if len(rows) > 0: + #print("Row " + str(rows[0])) + result, = rows[0] + return int(result) if result else 1 + else: + # Starting value + return 1 + return None + except sqlite3.Error as e: + if _debug == True: + Log.info(traceback.format_exc()) + Log.warning("Error on Select Max : " + str(type(e).__name__)) + return None + + def create_table(self): + """ create a table from the create_table_sql statement + :param conn: Connection object + :param create_table_sql: a CREATE TABLE statement + :return: + """ + try: + if self.conn: + c = self.conn.cursor() + c.execute(self.create_table_query) + else: + raise IOError("Not a valid connection") + except IOError as exp: + Log.warning(exp) + return None + except sqlite3.Error as e: + if _debug == True: + Log.info(traceback.format_exc()) + Log.warning("Error on create table : " + str(type(e).__name__)) + return None + + def create_connection(self, db_file): + """ + Create a database connection to the SQLite database specified by db_file. + :param db_file: database file name + :return: Connection object or None + """ + try: + conn = sqlite3.connect(db_file) + return conn + except: + return None diff --git a/autosubmit/database/db_structure.py b/autosubmit/database/db_structure.py index fb3e3f0ff96f9debaff8e05706465f92a194f793..7beff44e8af9e360b9e37f453763b13a8b2c538c 100644 --- a/autosubmit/database/db_structure.py +++ b/autosubmit/database/db_structure.py @@ -1,3 +1,22 @@ +#!/usr/bin/env python + +# Copyright 2015 Earth Sciences Department, BSC-CNS + +# This file is part of Autosubmit. + +# Autosubmit is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# Autosubmit is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with Autosubmit. If not, see . 
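That completes the new `db_jobdata.py` module; the diff continues with `db_structure.py`. A condensed, hypothetical usage sketch of `JobDataStructure` — it mirrors the hooks added to `job.py` later in this diff, and the experiment id, job name and all values below are invented for illustration:

```python
# Hypothetical driver for JobDataStructure (all identifiers invented).
import time
from autosubmit.database.db_jobdata import JobDataStructure

jdata = JobDataStructure('a000')  # opens/creates <JOBDATA_DIR>/job_data_a000.db

# On submission: inserts a row with last=1 and a bumped counter, after
# setting last=0 on the previous "current" row for this job name.
jdata.write_submit_time('a000_19900101_fc0_1_SIM', submit=time.time(),
                        status='SUBMITTED', ncpus=4, wallclock='00:30',
                        qos='debug', platform='marenostrum4', job_id=123456)

# On start: updates start/status/job_id on the last=1 row; if no row
# exists yet, it retro-inserts the submit record first.
jdata.write_start_time('a000_19900101_fc0_1_SIM', start=time.time(),
                       status='RUNNING', job_id=123456)

# On completion: updates finish/status; on Slurm platforms the platform
# object is queried (check_job_energy) to refine times and add
# energy/extra_data before the row is updated.
jdata.write_finish_time('a000_19900101_fc0_1_SIM', finish=time.time(),
                        status='COMPLETED', job_id=123456)
```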
+ import os import sys import string @@ -55,7 +74,8 @@ def get_structure(exp_id, structures_path): return None else: # pkl folder not found - raise Exception("pkl folder not found " + str(structures_path)) + raise Exception("Structures folder not found " + + str(structures_path)) except Exception as exp: print(traceback.format_exc()) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 000b607d9f6c1a775da3f55696c68539e80ba42a..101026694a1b96b53f249af47b3866fe90242826 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -38,6 +38,7 @@ from autosubmit.job.job_common import Status, Type from autosubmit.job.job_common import StatisticsSnippetBash, StatisticsSnippetPython from autosubmit.job.job_common import StatisticsSnippetR, StatisticsSnippetEmpty from autosubmit.config.basicConfig import BasicConfig +from autosubmit.database.db_jobdata import JobDataStructure from bscearth.utils.date import date2str, parse_date, previous_day, chunk_end_date, chunk_start_date, Log, subs_dates from time import sleep from threading import Thread @@ -126,6 +127,7 @@ class Job(object): self.packed = False self.hold = False self.distance_weight = 0 + def __getstate__(self): odict = self.__dict__ if '_platform' in odict: @@ -553,9 +555,9 @@ class Job(object): if new_status == Status.COMPLETED: Log.debug("This job seems to have completed: checking...") - if not self.platform.get_completed_files(self.name): - log_name = os.path.join(self._tmp_path, self.name + '_COMPLETED') + log_name = os.path.join( + self._tmp_path, self.name + '_COMPLETED') self.check_completion() else: @@ -641,7 +643,7 @@ class Job(object): :param default_status: status to set if job is not completed. By default is FAILED :type default_status: Status """ - log_name = os.path.join(self._tmp_path,self.name + '_COMPLETED') + log_name = os.path.join(self._tmp_path, self.name + '_COMPLETED') if os.path.exists(log_name): self.status = Status.COMPLETED @@ -963,6 +965,9 @@ class Job(object): else: f = open(path, 'w') f.write(date2str(datetime.datetime.now(), 'S')) + # Writing database + JobDataStructure(self.expid).write_submit_time(self.name, time.time(), Status.VALUE_TO_KEY[self.status] if self.status in Status.VALUE_TO_KEY.keys() else "UNKNOWN", self.processors, + self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id) def write_start_time(self): """ @@ -982,6 +987,9 @@ class Job(object): f.write(' ') # noinspection PyTypeChecker f.write(date2str(datetime.datetime.fromtimestamp(start_time), 'S')) + # Writing database + JobDataStructure(self.expid).write_start_time(self.name, time.time(), Status.VALUE_TO_KEY[self.status] if self.status in Status.VALUE_TO_KEY.keys() else "UNKNOWN", self.processors, + self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id) return True def write_end_time(self, completed): @@ -995,16 +1003,25 @@ class Job(object): path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') f = open(path, 'a') f.write(' ') + finish_time = None + final_status = None if end_time > 0: # noinspection PyTypeChecker f.write(date2str(datetime.datetime.fromtimestamp(end_time), 'S')) + # date2str(datetime.datetime.fromtimestamp(end_time), 'S') + finish_time = end_time else: f.write(date2str(datetime.datetime.now(), 'S')) + finish_time = time.time() # date2str(datetime.datetime.now(), 'S') f.write(' ') if completed: + final_status = "COMPLETED" f.write('COMPLETED') else: + final_status = "FAILED" f.write('FAILED') + 
JobDataStructure(self.expid).write_finish_time(self.name, finish_time, final_status, self.processors,
+                                                       self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.platform)
 
     def check_started_after(self, date_limit):
         """
@@ -1356,7 +1373,8 @@ done
         total = total * 1.15
         hour = int(total)
         minute = int((total - int(total)) * 60.0)
-        second = int(((total - int(total)) * 60 - int((total - int(total)) * 60.0)) * 60.0)
+        second = int(((total - int(total)) * 60 -
+                      int((total - int(total)) * 60.0)) * 60.0)
         wallclock_delta = datetime.timedelta(hours=hour, minutes=minute, seconds=second)
         if elapsed > wallclock_delta:
@@ -1372,6 +1390,3 @@ done
         time = int(output[index])
         time = self._parse_timestamp(time)
         return time
-
-
-
diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py
index 58f008eb546c26681c0dec3ef5322dccc083457c..03995f76a2f3bd4662ed7f47aa6bb62476f15141 100644
--- a/autosubmit/platforms/paramiko_platform.py
+++ b/autosubmit/platforms/paramiko_platform.py
@@ -38,7 +38,6 @@ class ParamikoPlatform(Platform):
         self.submit_cmd = ""
         self._ftpChannel = None
-
     @property
     def header(self):
         """
@@ -58,6 +57,7 @@ class ParamikoPlatform(Platform):
         :rtype: object
         """
         return self._wrapper
+
     def restore_connection(self):
         connected = True
         if self._ssh is None:
@@ -68,15 +68,15 @@
             while connected == False and retry < retries:
                 if self.connect(True):
                     connected = True
-                retry+=1
+                retry += 1
             if not connected:
-                Log.error('Can not create ssh or sftp connection to {0}: Connection could not be established to platform {1}\n Please, check your expid platform.conf to see if there are mistakes in the configuration\n Also Ensure that the login node listed on HOST parameter is available(try to connect via ssh on a terminal)\n Also you can put more than one host using a comma as separator', self.host,self.name)
-                Log.critical('Experiment cant no continue without unexpected behaviour, Stopping Autosubmit')
+                Log.error('Cannot create ssh or sftp connection to {0}: Connection could not be established to platform {1}\n Please, check your expid platform.conf to see if there are mistakes in the configuration\n Also ensure that the login node listed on the HOST parameter is available (try to connect via ssh on a terminal)\n You can also put more than one host using a comma as separator', self.host, self.name)
+                Log.critical(
+                    'Experiment cannot continue after this unexpected behaviour, stopping Autosubmit')
                 exit(0)
         return connected
-
-    def connect(self,reconnect=False):
+    def connect(self, reconnect=False):
         """
         Creates ssh connection to host
@@ -96,26 +96,29 @@ class ParamikoPlatform(Platform):
             self._host_config = self._ssh_config.lookup(self.host)
             if "," in self._host_config['hostname']:
                 if reconnect:
-                    self._host_config['hostname'] = random.choice(self._host_config['hostname'].split(',')[1:])
+                    self._host_config['hostname'] = random.choice(
+                        self._host_config['hostname'].split(',')[1:])
                 else:
-                    self._host_config['hostname'] = self._host_config['hostname'].split(',')[0]
+                    self._host_config['hostname'] = self._host_config['hostname'].split(',')[
+                        0]
             if 'identityfile' in self._host_config:
                 self._host_config_id = self._host_config['identityfile']
             if 'proxycommand' in self._host_config:
-                self._proxy = paramiko.ProxyCommand(self._host_config['proxycommand'])
+                self._proxy = paramiko.ProxyCommand(
+                    self._host_config['proxycommand'])
                 self._ssh.connect(self._host_config['hostname'], 22, username=self.user,
key_filename=self._host_config_id, sock=self._proxy) else: self._ssh.connect(self._host_config['hostname'], 22, username=self.user, key_filename=self._host_config_id) - self.transport = paramiko.Transport((self._host_config['hostname'], 22)) + self.transport = paramiko.Transport( + (self._host_config['hostname'], 22)) self.transport.connect(username=self.user) self._ftpChannel = self._ssh.open_sftp() return True except: return False - def check_completed_files(self, sections=None): if self.host == 'localhost': @@ -130,7 +133,7 @@ class ParamikoPlatform(Platform): else: command += " -name *_COMPLETED" - if self.send_command(command,True): + if self.send_command(command, True): return self._ssh_output else: return None @@ -144,7 +147,8 @@ class ParamikoPlatform(Platform): open(multiple_delete_previous_run, 'w+').write("rm -f"+filenames) os.chmod(multiple_delete_previous_run, 0o770) self.send_file(multiple_delete_previous_run, False) - command = os.path.join(self.get_files_path(),"multiple_delete_previous_run.sh") + command = os.path.join(self.get_files_path(), + "multiple_delete_previous_run.sh") if self.send_command(command, ignore_log=True): return self._ssh_output @@ -167,10 +171,10 @@ class ParamikoPlatform(Platform): try: local_path = os.path.join(os.path.join(self.tmp_path, filename)) - remote_path = os.path.join(self.get_files_path(), os.path.basename(filename)) + remote_path = os.path.join( + self.get_files_path(), os.path.basename(filename)) self._ftpChannel.put(local_path, remote_path) - self._ftpChannel.chmod(remote_path,os.stat(local_path).st_mode) - + self._ftpChannel.chmod(remote_path, os.stat(local_path).st_mode) return True except BaseException as e: @@ -236,22 +240,22 @@ class ParamikoPlatform(Platform): try: #ftp = self._ssh.open_sftp() - self._ftpChannel.remove(os.path.join(self.get_files_path(), filename)) - #ftp.close() + self._ftpChannel.remove(os.path.join( + self.get_files_path(), filename)) + # ftp.close() return True except IOError: return False except BaseException as e: if e.lower().contains("garbage"): - Log.error("Wrong User or invalid .ssh/config. Or invalid user in platform.conf or public key not set ") + Log.error( + "Wrong User or invalid .ssh/config. Or invalid user in platform.conf or public key not set ") raise - Log.debug('Could not remove file {0}'.format(os.path.join(self.get_files_path(), filename))) + Log.debug('Could not remove file {0}'.format( + os.path.join(self.get_files_path(), filename))) return False - - - - def move_file(self, src, dest,must_exist=False): + def move_file(self, src, dest, must_exist=False): """ Moves a file on the platform (includes .err and .out) :param src: source name @@ -263,17 +267,17 @@ class ParamikoPlatform(Platform): if not self.restore_connection(): return False try: - path_root=self.get_files_path() + path_root = self.get_files_path() self._ftpChannel.rename(os.path.join(path_root, src), os.path.join(path_root, dest)) return True except: if must_exist: - raise Exception('File {0} does not exists'.format(os.path.join(self.get_files_path(), src))) + raise Exception('File {0} does not exists'.format( + os.path.join(self.get_files_path(), src))) else: return False - def submit_job(self, job, script_name, hold=False): """ Submit a job from a given job object. @@ -297,6 +301,22 @@ class ParamikoPlatform(Platform): return int(job_id) else: return None + + def check_job_energy(self, job_id): + """ + Checks job energy and return values. Defined in child classes. 
+
+        Args:
+            job_id (int): Id of Job
+
+        Returns:
+            5-tuple (int, int, int, int, dict): submit time, start time, finish time, energy, detailed data
+        """
+        check_energy_cmd = self.get_job_energy_cmd(job_id)
+        self.send_command(check_energy_cmd)
+        return self.parse_job_finish_data(
+            self.get_ssh_output(), job_id)
+
     def submit_Script(self, hold=False):
         """
         Sends a SubmitfileScript, exec in platform and retrieve the Jobs_ID.
@@ -323,19 +343,23 @@
         job_id = job.id
         job_status = Status.UNKNOWN
         if type(job_id) is not int and type(job_id) is not str:
-            Log.error('check_job() The job id ({0}) is not an integer neither a string.', job_id)
+            Log.error(
+                'check_job() The job id ({0}) is neither an integer nor a string.', job_id)
             job.new_status = job_status
-        sleep_time=5
+        sleep_time = 5
         while not (self.send_command(self.get_checkjob_cmd(job_id)) and retries >= 0) or (self.get_ssh_output() == "" and retries >= 0):
             retries -= 1
-            Log.debug('Retrying check job command: {0}', self.get_checkjob_cmd(job_id))
+            Log.debug(
+                'Retrying check job command: {0}', self.get_checkjob_cmd(job_id))
             Log.debug('retries left {0}', retries)
             Log.debug('Will be retrying in {0} seconds', sleep_time)
             sleep(sleep_time)
             sleep_time = sleep_time+5
         if retries >= 0:
-            Log.debug('Successful check job command: {0}', self.get_checkjob_cmd(job_id))
-            job_status = self.parse_job_output(self.get_ssh_output()).strip("\n")
+            Log.debug(
+                'Successful check job command: {0}', self.get_checkjob_cmd(job_id))
+            job_status = self.parse_job_output(
+                self.get_ssh_output()).strip("\n")
             # URi: define status list in HPC Queue Class
             if job_status in self.job_status['COMPLETED'] or retries == 0:
                 job_status = Status.COMPLETED
@@ -350,16 +374,20 @@
             else:
                 job_status = Status.UNKNOWN
         else:
-            Log.error(" check_job(), job is not on the queue system. Output was: {0}", self.get_checkjob_cmd(job_id))
+            Log.error(
+                " check_job(), job is not on the queue system. 
Output was: {0}", self.get_checkjob_cmd(job_id)) job_status = Status.UNKNOWN - Log.error('check_job() The job id ({0}) status is {1}.', job_id, job_status) + Log.error( + 'check_job() The job id ({0}) status is {1}.', job_id, job_status) job.new_status = job_status - def _check_jobid_in_queue(self,ssh_output,job_list_cmd): + + def _check_jobid_in_queue(self, ssh_output, job_list_cmd): for job in job_list_cmd[:-1].split(','): if job not in ssh_output: return False return True - def check_Alljobs(self, job_list,job_list_cmd,remote_logs, retries=5): + + def check_Alljobs(self, job_list, job_list_cmd, remote_logs, retries=5): """ Checks jobs running status @@ -373,23 +401,24 @@ class ParamikoPlatform(Platform): """ cmd = self.get_checkAlljobs_cmd(job_list_cmd) - sleep_time=5 - while not (self.send_command(cmd) and retries >= 0) or ( not self._check_jobid_in_queue(self.get_ssh_output(),job_list_cmd) and retries >= 0): + sleep_time = 5 + while not (self.send_command(cmd) and retries >= 0) or (not self._check_jobid_in_queue(self.get_ssh_output(), job_list_cmd) and retries >= 0): retries -= 1 Log.debug('Retrying check job command: {0}', cmd) Log.debug('retries left {0}', retries) Log.debug('Will be retrying in {0} seconds', sleep_time) sleep(sleep_time) - sleep_time=sleep_time+5 + sleep_time = sleep_time+5 job_list_status = self.get_ssh_output() - Log.debug('Successful check job command: {0}, \n output: {1}', cmd, self._ssh_output) + Log.debug( + 'Successful check job command: {0}, \n output: {1}', cmd, self._ssh_output) if retries >= 0: in_queue_jobs = [] list_queue_jobid = "" for job in job_list: job_id = job.id - job_status = self.parse_Alljobs_output(job_list_status,job_id) + job_status = self.parse_Alljobs_output(job_list_status, job_id) # URi: define status list in HPC Queue Class if job_status in self.job_status['COMPLETED']: job_status = Status.COMPLETED @@ -397,7 +426,7 @@ class ParamikoPlatform(Platform): job_status = Status.RUNNING elif job_status in self.job_status['QUEUING']: if job.hold: - job_status = Status.HELD # release? + job_status = Status.HELD # release? 
else: job_status = Status.QUEUING list_queue_jobid += str(job.id) + ',' @@ -410,39 +439,58 @@ class ParamikoPlatform(Platform): else: job_status = Status.UNKNOWN - Log.error('check_job() The job id ({0}) status is {1}.', job_id, job_status) - job.new_status=job_status + Log.error( + 'check_job() The job id ({0}) status is {1}.', job_id, job_status) + job.new_status = job_status reason = str() if self.type == 'slurm' and len(in_queue_jobs) > 0: - cmd=self.get_queue_status_cmd(list_queue_jobid) + cmd = self.get_queue_status_cmd(list_queue_jobid) self.send_command(cmd) - queue_status=self._ssh_output + queue_status = self._ssh_output for job in in_queue_jobs: - reason = self.parse_queue_reason(queue_status,job.id) + reason = self.parse_queue_reason(queue_status, job.id) if job.queuing_reason_cancel(reason): - Log.error("Job {0} will be cancelled and set to FAILED as it was queuing due to {1}", job.name, reason) - self.send_command(self.platform.cancel_cmd + " {0}".format(job.id)) + Log.error( + "Job {0} will be cancelled and set to FAILED as it was queuing due to {1}", job.name, reason) + self.send_command( + self.platform.cancel_cmd + " {0}".format(job.id)) job.new_status = Status.FAILED job.update_status(remote_logs) return elif reason == '(JobHeldUser)': - job.new_status=Status.HELD + job.new_status = Status.HELD if not job.hold: - self.send_command("scontrol release "+"{0}".format(job.id)) # SHOULD BE MORE CLASS (GET_scontrol realease but not sure if this can be implemented on others PLATFORMS - Log.info("Job {0} is being released (id:{1}) ", job.name,job.id) + # SHOULD BE MORE CLASS (GET_scontrol realease but not sure if this can be implemented on others PLATFORMS + self.send_command( + "scontrol release "+"{0}".format(job.id)) + Log.info( + "Job {0} is being released (id:{1}) ", job.name, job.id) else: Log.info("Job {0} is HELD", job.name) elif reason == '(JobHeldAdmin)': - Log.info("Job {0} Failed to be HELD, canceling... ", job.name) + Log.info( + "Job {0} Failed to be HELD, canceling... 
", job.name) job.new_status = Status.WAITING - job.platform.send_command(job.platform.cancel_cmd + " {0}".format(job.id)) + job.platform.send_command( + job.platform.cancel_cmd + " {0}".format(job.id)) else: for job in job_list: job_status = Status.UNKNOWN - Log.warning('check_job() The job id ({0}) from platform {1} has an status of {2}.', job.id, self.name, job_status) - job.new_status=job_status + Log.warning( + 'check_job() The job id ({0}) from platform {1} has an status of {2}.', job.id, self.name, job_status) + job.new_status = job_status + + def get_job_energy_cmd(self, job_id): + """ + Returns command to check job energy on remote platforms + :param job_id: id of job to check + :type job_id: int + :return: command to check job status + :rtype: str + """ + raise NotImplementedError def get_checkjob_cmd(self, job_id): """ @@ -465,6 +513,7 @@ class ParamikoPlatform(Platform): :rtype: str """ raise NotImplementedError + def send_command(self, command, ignore_log=False): """ Sends given command to HPC @@ -478,7 +527,8 @@ class ParamikoPlatform(Platform): if not self.restore_connection(): return False if "-rP" in command or "find" in command or "convertLink" in command: - timeout = 60*60 # Max Wait 1hour if the command is a copy or simbolic links ( migrate can trigger long times) + # Max Wait 1hour if the command is a copy or simbolic links ( migrate can trigger long times) + timeout = 60*60 elif "rm" in command: timeout = 60/2 else: @@ -490,7 +540,8 @@ class ParamikoPlatform(Platform): stdin.close() channel.shutdown_write() stdout_chunks = [] - stdout_chunks.append(stdout.channel.recv(len(stdout.channel.in_buffer))) + stdout_chunks.append(stdout.channel.recv( + len(stdout.channel.in_buffer))) stderr_readlines = [] while not channel.closed or channel.recv_ready() or channel.recv_stderr_ready(): @@ -499,12 +550,14 @@ class ParamikoPlatform(Platform): readq, _, _ = select.select([stdout.channel], [], [], 2) for c in readq: if c.recv_ready(): - stdout_chunks.append(stdout.channel.recv(len(c.in_buffer))) + stdout_chunks.append( + stdout.channel.recv(len(c.in_buffer))) #stdout_chunks.append(" ") got_chunk = True if c.recv_stderr_ready(): # make sure to read stderr to prevent stall - stderr_readlines.append(stderr.channel.recv_stderr(len(c.in_stderr_buffer))) + stderr_readlines.append( + stderr.channel.recv_stderr(len(c.in_stderr_buffer))) #stdout_chunks.append(" ") got_chunk = True if not got_chunk and stdout.channel.exit_status_ready() and not stderr.channel.recv_stderr_ready() and not stdout.channel.recv_ready(): @@ -522,16 +575,20 @@ class ParamikoPlatform(Platform): self._ssh_output += s for errorLine in stderr_readlines: if errorLine.find("submission failed") != -1 or errorLine.find("git clone") != -1: - Log.critical('Command {0} in {1} warning: {2}', command, self.host, '\n'.join(stderr_readlines)) + Log.critical('Command {0} in {1} warning: {2}', command, self.host, '\n'.join( + stderr_readlines)) return False if not ignore_log: if len(stderr_readlines) > 0: - Log.warning('Command {0} in {1} warning: {2}', command, self.host, '\n'.join(stderr_readlines)) + Log.warning('Command {0} in {1} warning: {2}', command, self.host, '\n'.join( + stderr_readlines)) else: - Log.debug('Command {0} in {1} successful with out message: {2}', command, self.host, self._ssh_output) + Log.debug('Command {0} in {1} successful with out message: {2}', + command, self.host, self._ssh_output) return True except BaseException as e: - Log.error('Can not send command {0} to {1}: {2}', command, self.host, 
e.message) + Log.error( + 'Can not send command {0} to {1}: {2}', command, self.host, e.message) return False def parse_job_output(self, output): @@ -544,7 +601,21 @@ class ParamikoPlatform(Platform): :rtype: str """ raise NotImplementedError - def parse_Alljobs_output(self, output,job_id): + + def parse_job_finish_data(self, output, job_id): + """ + Parses check job command output so it can be interpreted by autosubmit + + :param output: output to parse + :type output: str + :param job_id: Id of Job + :type job_id: Integer + :return: job status + :rtype: str + """ + raise NotImplementedError + + def parse_Alljobs_output(self, output, job_id): """ Parses check jobs command output so it can be interpreted by autosubmit :param output: output to parse @@ -561,8 +632,7 @@ class ParamikoPlatform(Platform): def get_submit_script(self): pass - - def get_submit_cmd(self, job_script, job_type,hold=False): + def get_submit_cmd(self, job_script, job_type, hold=False): """ Get command to add job to scheduler @@ -666,26 +736,37 @@ class ParamikoPlatform(Platform): header = header.replace('%ERR_LOG_DIRECTIVE%', err_filename) if hasattr(self.header, 'get_queue_directive'): - header = header.replace('%QUEUE_DIRECTIVE%', self.header.get_queue_directive(job)) + header = header.replace( + '%QUEUE_DIRECTIVE%', self.header.get_queue_directive(job)) if hasattr(self.header, 'get_tasks_per_node'): - header = header.replace('%TASKS_PER_NODE_DIRECTIVE%', self.header.get_tasks_per_node(job)) + header = header.replace( + '%TASKS_PER_NODE_DIRECTIVE%', self.header.get_tasks_per_node(job)) if hasattr(self.header, 'get_threads_per_task'): - header = header.replace('%THREADS%', self.header.get_threads_per_task(job)) + header = header.replace( + '%THREADS%', self.header.get_threads_per_task(job)) if hasattr(self.header, 'get_scratch_free_space'): - header = header.replace('%SCRATCH_FREE_SPACE_DIRECTIVE%', self.header.get_scratch_free_space(job)) + header = header.replace( + '%SCRATCH_FREE_SPACE_DIRECTIVE%', self.header.get_scratch_free_space(job)) if hasattr(self.header, 'get_custom_directives'): - header = header.replace('%CUSTOM_DIRECTIVES%', self.header.get_custom_directives(job)) + header = header.replace( + '%CUSTOM_DIRECTIVES%', self.header.get_custom_directives(job)) if hasattr(self.header, 'get_exclusivity'): - header = header.replace('%EXCLUSIVITY_DIRECTIVE%', self.header.get_exclusivity(job)) + header = header.replace( + '%EXCLUSIVITY_DIRECTIVE%', self.header.get_exclusivity(job)) if hasattr(self.header, 'get_account_directive'): - header = header.replace('%ACCOUNT_DIRECTIVE%', self.header.get_account_directive(job)) + header = header.replace( + '%ACCOUNT_DIRECTIVE%', self.header.get_account_directive(job)) if hasattr(self.header, 'get_memory_directive'): - header = header.replace('%MEMORY_DIRECTIVE%', self.header.get_memory_directive(job)) + header = header.replace( + '%MEMORY_DIRECTIVE%', self.header.get_memory_directive(job)) if hasattr(self.header, 'get_memory_per_task_directive'): - header = header.replace('%MEMORY_PER_TASK_DIRECTIVE%', self.header.get_memory_per_task_directive(job)) + header = header.replace( + '%MEMORY_PER_TASK_DIRECTIVE%', self.header.get_memory_per_task_directive(job)) if hasattr(self.header, 'get_hyperthreading_directive'): - header = header.replace('%HYPERTHREADING_DIRECTIVE%', self.header.get_hyperthreading_directive(job)) + header = header.replace( + '%HYPERTHREADING_DIRECTIVE%', self.header.get_hyperthreading_directive(job)) return header + def closeConnection(self): if 
self._ftpChannel is not None: self._ftpChannel.close() @@ -695,7 +776,6 @@ class ParamikoPlatform(Platform): self.transport.stop_thread() self.transport.sys.exit(0) - def check_remote_log_dir(self): """ Creates log dir on remote host @@ -704,20 +784,27 @@ class ParamikoPlatform(Platform): return False if self.type == "slurm": try: - self._ftpChannel.chdir(self.remote_log_dir) # Test if remote_path exists + # Test if remote_path exists + self._ftpChannel.chdir(self.remote_log_dir) except IOError: if self.send_command(self.get_mkdir_cmd()): - Log.debug('{0} has been created on {1} .', self.remote_log_dir, self.host) + Log.debug('{0} has been created on {1} .', + self.remote_log_dir, self.host) else: - Log.error('Could not create the DIR {0} on HPC {1}'.format(self.remote_log_dir, self.host)) + Log.error('Could not create the DIR {0} on HPC {1}'.format( + self.remote_log_dir, self.host)) except: Log.critical("Garbage detected") raise else: if self.send_command(self.get_mkdir_cmd()): - Log.debug('{0} has been created on {1} .', self.remote_log_dir, self.host) + Log.debug('{0} has been created on {1} .', + self.remote_log_dir, self.host) else: - Log.error('Could not create the DIR {0} on HPC {1}'.format(self.remote_log_dir, self.host)) + Log.error('Could not create the DIR {0} on HPC {1}'.format( + self.remote_log_dir, self.host)) + + class ParamikoPlatformException(Exception): """ Exception raised from HPC queues diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 9b8df22f95a672c1f25d471232a7b94e148f4abe..4146c2c37e7d919d74c7c17fa770ca802b0705e9 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -22,7 +22,8 @@ class Platform(object): self.expid = expid self.name = name self.config = config - self.tmp_path = os.path.join(self.config.LOCAL_ROOT_DIR, self.expid, self.config.LOCAL_TMP_DIR) + self.tmp_path = os.path.join( + self.config.LOCAL_ROOT_DIR, self.expid, self.config.LOCAL_TMP_DIR) self._serial_platform = None self._serial_queue = None self._default_queue = None @@ -195,21 +196,22 @@ class Platform(object): :rtype: bool """ raise NotImplementedError - + # Executed when calling from Job def get_logs_files(self, exp_id, remote_logs): """ Get the given LOGS files - + :param exp_id: experiment id :type exp_id: str :param remote_logs: names of the log files :type remote_logs: (str, str) """ (job_out_filename, job_err_filename) = remote_logs - self.get_files([job_out_filename, job_err_filename], False, 'LOG_{0}'.format(exp_id)) + self.get_files([job_out_filename, job_err_filename], + False, 'LOG_{0}'.format(exp_id)) - def get_completed_files(self, job_name, retries=0,recovery=False): + def get_completed_files(self, job_name, retries=0, recovery=False): """ Get the COMPLETED file of the given job @@ -234,7 +236,6 @@ class Platform(object): else: return False - def remove_stat_file(self, job_name): """ Removes *STAT* files from remote @@ -264,6 +265,7 @@ class Platform(object): Log.debug('{0} been removed', filename) return True return False + def check_file_exists(self, src): return True @@ -279,7 +281,8 @@ class Platform(object): :rtype: bool """ filename = job_name + '_STAT' - stat_local_path = os.path.join(self.config.LOCAL_ROOT_DIR, self.expid, self.config.LOCAL_TMP_DIR, filename) + stat_local_path = os.path.join( + self.config.LOCAL_ROOT_DIR, self.expid, self.config.LOCAL_TMP_DIR, filename) if os.path.exists(stat_local_path): os.remove(stat_local_path) if self.check_file_exists(filename): @@ -297,7 +300,8 @@ class 
@@ -297,7 +300,8 @@ class Platform(object):
         :rtype: str
         """
         if self.type == "local":
-            path = os.path.join(self.root_dir, self.config.LOCAL_TMP_DIR, 'LOG_{0}'.format(self.expid))
+            path = os.path.join(
+                self.root_dir, self.config.LOCAL_TMP_DIR, 'LOG_{0}'.format(self.expid))
         else:
             path = os.path.join(self.root_dir, 'LOG_{0}'.format(self.expid))
         return path
@@ -328,8 +332,19 @@ class Platform(object):
         :rtype: autosubmit.job.job_common.Status
         """
         raise NotImplementedError
+
     def closeConnection(self):
         return
+
+    def retrieve_energy_data(self, jobid):
+        """
+        Retrieves energy data from job
+
+        :param jobid: ID of the job
+        :return: 4-tuple (submit, start, finish, energy)
+        :rtype: 4-tuple(int, int, int, int)
+        """
+        raise NotImplementedError
+
     def write_jobid(self, jobid, complete_path):
         """
         Writes Job id in an out file.
@@ -342,10 +357,10 @@ class Platform(object):
         :rtype: Boolean
         """
         try:
-
+
             title_job = "[INFO] JOBID=" + str(jobid)

-        if os.path.exists(complete_path):
+            if os.path.exists(complete_path):
                 file_type = complete_path[-3:]
                 if file_type == "out" or file_type == "err":
                     with open(complete_path, "r+") as f:
@@ -353,16 +368,15 @@ class Platform(object):
                         first_line = f.readline()
                         # Do not rewrite if the job id header is already present
                         if not first_line.startswith("[INFO] JOBID="):
-                            content = f.read()
+                            content = f.read()
                             # Write again (Potentially slow)
-                            #start = time()
-                            #Log.info("Attempting job identification of " + str(jobid))
-                            f.seek(0,0)
-                            f.write(title_job + "\n\n" + first_line + content)
-                            f.close()
-                            #finish = time()
-                            #Log.info("Job correctly identified in " + str(finish - start) + " seconds")
+                            f.seek(0, 0)
+                            f.write(title_job + "\n\n" + first_line + content)
         except Exception as ex:
             Log.info("Writing Job Id Failed : " + str(ex))
-
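write_jobid() above implements an idempotent prepend: the job id header is written only when the first line does not already carry one, so repeated calls never duplicate it. A condensed sketch of the same pattern (prepend_job_id is a hypothetical name):

def prepend_job_id(path, jobid):
    # Prepend "[INFO] JOBID=<id>" once; reruns are no-ops because the
    # first line is inspected before rewriting the whole file.
    header = "[INFO] JOBID=" + str(jobid)
    with open(path, "r+") as f:
        first_line = f.readline()
        if first_line.startswith("[INFO] JOBID="):
            return
        rest = f.read()
        f.seek(0, 0)
        f.write(header + "\n\n" + first_line + rest)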
diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py
index d382d80c7d2ee063104b4f35a54db4726bcd26dc..251d64f2106c581c2c8b5baabbb0a187a8ae5919 100644
--- a/autosubmit/platforms/slurmplatform.py
+++ b/autosubmit/platforms/slurmplatform.py
@@ -19,6 +19,8 @@
 import os

 from time import sleep
+from time import mktime
+from datetime import datetime

 from xml.dom.minidom import parseString
@@ -44,7 +46,8 @@ class SlurmPlatform(ParamikoPlatform):
         self.job_status['COMPLETED'] = ['COMPLETED']
         self.job_status['RUNNING'] = ['RUNNING']
         self.job_status['QUEUING'] = ['PENDING', 'CONFIGURING', 'RESIZING']
-        self.job_status['FAILED'] = ['FAILED', 'CANCELLED','CANCELLED+', 'NODE_FAIL', 'PREEMPTED', 'SUSPENDED', 'TIMEOUT','OUT_OF_MEMORY','OUT_OF_ME+','OUT_OF_ME']
+        self.job_status['FAILED'] = ['FAILED', 'CANCELLED', 'CANCELLED+', 'NODE_FAIL',
+                                     'PREEMPTED', 'SUSPENDED', 'TIMEOUT', 'OUT_OF_MEMORY', 'OUT_OF_ME+', 'OUT_OF_ME']
         self._pathdir = "\$HOME/LOG_" + self.expid
         self._allow_arrays = False
         self._allow_wrappers = True
@@ -52,7 +55,8 @@ class SlurmPlatform(ParamikoPlatform):
         self.config = config
         exp_id_path = os.path.join(config.LOCAL_ROOT_DIR, self.expid)
         tmp_path = os.path.join(exp_id_path, "tmp")
-        self._submit_script_path = os.path.join(tmp_path , config.LOCAL_ASLOG_DIR,"submit_"+self.name+".sh")
+        self._submit_script_path = os.path.join(
+            tmp_path, config.LOCAL_ASLOG_DIR, "submit_"+self.name+".sh")
         self._submit_script_file = open(self._submit_script_path, 'w').close()

     def open_submit_script(self):
@@ -62,10 +66,9 @@ class SlurmPlatform(ParamikoPlatform):
     def get_submit_script(self):
         self._submit_script_file.close()
         os.chmod(self._submit_script_path, 0o750)
-        return os.path.join(self.config.LOCAL_ASLOG_DIR,os.path.basename(self._submit_script_path))
+        return os.path.join(self.config.LOCAL_ASLOG_DIR, os.path.basename(self._submit_script_path))

-
-    def submit_Script(self,hold=False):
+    def submit_Script(self, hold=False):
         """
         Sends a submit script file, executes it on the platform and retrieves the Jobs_ID of all jobs at once.
@@ -74,30 +77,34 @@ class SlurmPlatform(ParamikoPlatform):
         :return: job id for submitted jobs
         :rtype: list(str)
         """
-        self.send_file(self.get_submit_script(),False)
-        cmd = os.path.join(self.get_files_path(),os.path.basename(self._submit_script_path))
+        self.send_file(self.get_submit_script(), False)
+        cmd = os.path.join(self.get_files_path(),
+                           os.path.basename(self._submit_script_path))
         if self.send_command(cmd):
             jobs_id = self.get_submitted_job_id(self.get_ssh_output())
             return jobs_id
         else:
             return None
+
     def update_cmds(self):
         """
         Updates commands for platforms
         """
-        self.root_dir = os.path.join(self.scratch, self.project, self.user, self.expid)
+        self.root_dir = os.path.join(
+            self.scratch, self.project, self.user, self.expid)
         self.remote_log_dir = os.path.join(self.root_dir, "LOG_" + self.expid)
         self.cancel_cmd = "scancel"
         self._checkhost_cmd = "echo 1"
-        self._submit_cmd = 'sbatch -D {1} {1}/'.format(self.host, self.remote_log_dir)
-        self._submit_hold_cmd = 'sbatch -H -D {1} {1}/'.format(self.host, self.remote_log_dir)
+        self._submit_cmd = 'sbatch -D {1} {1}/'.format(
+            self.host, self.remote_log_dir)
+        self._submit_hold_cmd = 'sbatch -H -D {1} {1}/'.format(
+            self.host, self.remote_log_dir)
         self.put_cmd = "scp"
         self.get_cmd = "scp"
         self.mkdir_cmd = "mkdir -p " + self.remote_log_dir

-
     def get_checkhost_cmd(self):
         return self._checkhost_cmd
@@ -109,15 +116,43 @@ class SlurmPlatform(ParamikoPlatform):

     def parse_job_output(self, output):
         return output.strip().split(' ')[0].strip()

-
-    def parse_Alljobs_output(self, output,job_id):
-        status =[x.split()[1] for x in output.splitlines() if x.split()[0] == str(job_id)]
+
+    def parse_job_finish_data(self, output, job_id):
+        # Parses sacct output in the column order requested by
+        # get_job_energy_cmd(): JobId, State, Submit, Start, End,
+        # ConsumedEnergy, MaxRSS, AveRSS
+        try:
+            detailed_data = dict()
+            output = output.strip()
+            lines = output.split("\n")
+            if len(lines) > 0:
+                for line in lines:
+                    line = line.strip().split()
+                    if len(line) > 0:
+                        name = str(line[0])
+                        extra_data = {"energy": str(line[5] if len(line) > 5 else "NA"),
+                                      "MaxRSS": str(line[6] if len(line) > 6 else "NA"),
+                                      "AveRSS": str(line[7] if len(line) > 7 else "NA")}
+                        detailed_data[name] = extra_data
+
+                line = lines[0].strip().split()
+                submit = int(mktime(datetime.strptime(
+                    line[2], "%Y-%m-%dT%H:%M:%S").timetuple()))
+                start = int(mktime(datetime.strptime(
+                    line[3], "%Y-%m-%dT%H:%M:%S").timetuple()))
+                finish = int(mktime(datetime.strptime(
+                    line[4], "%Y-%m-%dT%H:%M:%S").timetuple()))
+                # ConsumedEnergy is assumed to carry a magnitude suffix
+                # (e.g. "1.02K"), so the suffix is dropped and the value
+                # scaled to joules
+                joules = int(
+                    float(str(line[5])[:-1]) * 1000 if len(line[5]) > 0 else 0)
+                return (submit, start, finish, joules, detailed_data)
+
+            return (0, 0, 0, 0, dict())
+        except Exception as exp:
+            # On error return a zeroed 5-tuple
+            Log.warning(str(exp))
+            return (0, 0, 0, 0, dict())
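To make parse_job_finish_data() concrete: get_job_energy_cmd() asks sacct for JobId, State, Submit, Start, End, ConsumedEnergy, MaxRSS and AveRSS, so the first output line is expected to look like the fabricated sample below. This snippet replays the conversion the method performs (the sample values are invented for illustration):

from time import mktime
from datetime import datetime

# Fabricated sacct line in the column order requested by get_job_energy_cmd()
sample = "1234 COMPLETED 2019-07-01T10:00:00 2019-07-01T10:05:00 2019-07-01T10:35:00 1.02K 300K 250K"
fields = sample.split()

def to_epoch(stamp):
    return int(mktime(datetime.strptime(stamp, "%Y-%m-%dT%H:%M:%S").timetuple()))

submit, start, finish = [to_epoch(s) for s in fields[2:5]]
# "1.02K" -> 1020 joules: drop the magnitude suffix and scale by 1000
joules = int(float(fields[5][:-1]) * 1000)
print((submit, start, finish, joules))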
+    def parse_Alljobs_output(self, output, job_id):
+        status = [x.split()[1] for x in output.splitlines()
+                  if x.split()[0] == str(job_id)]
         if len(status) == 0:
             return status
         return status[0]

-
-
     def get_submitted_job_id(self, outputlines):
         if outputlines.find("failed") != -1:
             raise Exception(outputlines)
         jobs_id = []
         for output in outputlines.splitlines():
             jobs_id.append(int(output.split(' ')[3]))
         return jobs_id
+
     def jobs_in_queue(self):
         dom = parseString('')
         jobs_xml = dom.getElementsByTagName("JB_job_number")
@@ -132,31 +168,34 @@ class SlurmPlatform(ParamikoPlatform):

     def get_submit_cmd(self, job_script, job, hold=False):
         if not hold:
-            self._submit_script_file.write(self._submit_cmd + job_script + "\n")
+            self._submit_script_file.write(
+                self._submit_cmd + job_script + "\n")
         else:
-            self._submit_script_file.write(self._submit_hold_cmd + job_script + "\n" )
-
-
+            self._submit_script_file.write(
+                self._submit_hold_cmd + job_script + "\n")

     def get_checkjob_cmd(self, job_id):
         return 'sacct -n -X -j {1} -o "State"'.format(self.host, job_id)

     def get_checkAlljobs_cmd(self, jobs_id):
         return "sacct -n -X -j {1} -o jobid,State".format(self.host, jobs_id)
+
     def get_queue_status_cmd(self, job_id):
         return 'squeue -j {0} -o %A,%R'.format(job_id)

+    def get_job_energy_cmd(self, job_id):
+        return 'sacct -n -j {0} -o JobId%20,State,Submit,Start,End,ConsumedEnergy,MaxRSS,AveRSS'.format(job_id)

-    def parse_queue_reason(self, output,job_id):
-        reason =[x.split(',')[1] for x in output.splitlines() if x.split(',')[0] == str(job_id)]
+    def parse_queue_reason(self, output, job_id):
+        reason = [x.split(',')[1] for x in output.splitlines()
+                  if x.split(',')[0] == str(job_id)]
         if len(reason) > 0:
             return reason[0]
         return reason

-
     @staticmethod
-    def wrapper_header(filename, queue, project, wallclock, num_procs, dependency, directives, threads,method="asthreads"):
-        if method =='srun':
+    def wrapper_header(filename, queue, project, wallclock, num_procs, dependency, directives, threads, method="asthreads"):
+        if method == 'srun':
             language = "#!/bin/bash"
             return \
                 language + """
@@ -181,7 +220,7 @@ class SlurmPlatform(ParamikoPlatform):
         else:
             language = "#!/usr/bin/env python"
             return \
-                language+"""
+                language+"""
###############################################################################
#                   {0}
###############################################################################
@@ -199,13 +238,13 @@ class SlurmPlatform(ParamikoPlatform):
#
###############################################################################
            """.format(filename, queue, project, wallclock, num_procs, dependency,
-                       '\n'.ljust(13).join(str(s) for s in directives),threads)
+                       '\n'.ljust(13).join(str(s) for s in directives), threads)

     @staticmethod
     def allocated_nodes():
         return """os.system("scontrol show hostnames $SLURM_JOB_NODELIST > node_list")"""

-    def check_file_exists(self,filename):
+    def check_file_exists(self, filename):
         if not self.restore_connection():
             return False
         file_exist = False
@@ -214,11 +253,13 @@ class SlurmPlatform(ParamikoPlatform):
         max_retries = 3
         while not file_exist and retries < max_retries:
             try:
-                self._ftpChannel.stat(os.path.join(self.get_files_path(), filename))  # This return IOError if path doesn't exist
+                # stat() raises IOError if the path doesn't exist
+                self._ftpChannel.stat(os.path.join(
+                    self.get_files_path(), filename))
                 file_exist = True
             except IOError:  # File doesn't exist, retry in sleeptime
                 Log.debug("{2} does not exist yet... waiting {0}s before the next retry (retries left: {1})", sleeptime,
-                          max_retries - retries, os.path.join(self.get_files_path(),filename))
+                          max_retries - retries, os.path.join(self.get_files_path(), filename))
                 sleep(sleeptime)
                 sleeptime = sleeptime + 5
                 retries = retries + 1
@@ -227,4 +268,4 @@ class SlurmPlatform(ParamikoPlatform):
                 file_exist = False  # won't exist
                 retries = 999  # no more retries

-        return file_exist
\ No newline at end of file
+        return file_exist
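check_file_exists() polls with a linearly growing delay (5s, 10s, 15s) before giving up. The same backoff shape can be factored out as below; poll_with_backoff is a hypothetical helper shown only to illustrate the retry schedule, not code from this patch:

import os
from time import sleep

def poll_with_backoff(probe, sleeptime=5, step=5, max_retries=3):
    # Retry a boolean probe with a linearly increasing delay between
    # attempts, mirroring the loop in check_file_exists()
    for _ in range(max_retries):
        if probe():
            return True
        sleep(sleeptime)
        sleeptime += step
    return False

# Usage: poll_with_backoff(lambda: os.path.exists("/tmp/a000_COMPLETED"))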