diff --git a/.gitignore b/.gitignore
index 8eacd9c4d6dfd2c389347277de4b0680cc707031..aa1269dcc6085e26601c139dd581d4c70c37fd9c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,3 +6,4 @@
/cover/
/.coverage
autosubmit/miniTest.py
+autosubmit/simple_test.py
diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py
index fa8ea9e5f4965fc9e20ae2dd187e4cae534a9f75..f81ceb80498dbc9aeeadd76543530b0b03c8888a 100644
--- a/autosubmit/autosubmit.py
+++ b/autosubmit/autosubmit.py
@@ -17,38 +17,11 @@
# You should have received a copy of the GNU General Public License
# along with Autosubmit. If not, see .
# pipeline_test
-from __future__ import print_function
-import threading
-from sets import Set
-from job.job_packager import JobPackager
-from job.job_exceptions import WrongTemplateException
-from platforms.paramiko_submitter import ParamikoSubmitter
-from notifications.notifier import Notifier
-from notifications.mail_notifier import MailNotifier
-from bscearth.utils.date import date2str
-from monitor.monitor import Monitor
-from database.db_common import get_autosubmit_version
-from database.db_common import delete_experiment
-from experiment.experiment_common import copy_experiment
-from experiment.experiment_common import new_experiment
-from database.db_common import create_db
-from bscearth.utils.log import Log
-from job.job_grouping import JobGrouping
-from job.job_list_persistence import JobListPersistencePkl
-from job.job_list_persistence import JobListPersistenceDb
-from job.job_package_persistence import JobPackagePersistence
-from job.job_packages import JobPackageThread
-from job.job_list import JobList
-from git.autosubmit_git import AutosubmitGit
-from job.job_common import Status
-from bscearth.utils.config_parser import ConfigParserFactory
-from config.config_common import AutosubmitConfig
-from config.basicConfig import BasicConfig
"""
Main module for autosubmit. Only contains an interface class to all functionality implemented on autosubmit
"""
-
+from __future__ import print_function
try:
# noinspection PyCompatibility
from configparser import SafeConfigParser
@@ -77,16 +50,44 @@ import random
import signal
import datetime
import portalocker
+import threading
from pkg_resources import require, resource_listdir, resource_exists, resource_string
from distutils.util import strtobool
from collections import defaultdict
from pyparsing import nestedExpr
+
+from sets import Set
+
sys.path.insert(0, os.path.abspath('.'))
# noinspection PyPackageRequirements
# noinspection PyPackageRequirements
# from API.testAPI import Monitor
# noinspection PyPackageRequirements
+from job.job_packager import JobPackager
+from job.job_exceptions import WrongTemplateException
+from platforms.paramiko_submitter import ParamikoSubmitter
+from notifications.notifier import Notifier
+from notifications.mail_notifier import MailNotifier
+from bscearth.utils.date import date2str
+from monitor.monitor import Monitor
+from database.db_common import get_autosubmit_version
+from database.db_common import delete_experiment
+from experiment.experiment_common import copy_experiment
+from experiment.experiment_common import new_experiment
+from database.db_common import create_db
+from bscearth.utils.log import Log
+from job.job_grouping import JobGrouping
+from job.job_list_persistence import JobListPersistencePkl
+from job.job_list_persistence import JobListPersistenceDb
+from job.job_package_persistence import JobPackagePersistence
+from job.job_packages import JobPackageThread
+from job.job_list import JobList
+from git.autosubmit_git import AutosubmitGit
+from job.job_common import Status
+from bscearth.utils.config_parser import ConfigParserFactory
+from config.config_common import AutosubmitConfig
+from config.basicConfig import BasicConfig
# noinspection PyUnusedLocal
diff --git a/autosubmit/config/basicConfig.py b/autosubmit/config/basicConfig.py
index 9d9bfac809f122746a7a0e76c067b017521f92e7..9c729ebefcf73f42cb549c3d67dbc1364dd4739e 100755
--- a/autosubmit/config/basicConfig.py
+++ b/autosubmit/config/basicConfig.py
@@ -38,6 +38,8 @@ class BasicConfig:
DB_DIR = os.path.join(os.path.expanduser('~'), 'debug', 'autosubmit')
STRUCTURES_DIR = os.path.join(
'/esarchive', 'autosubmit', 'as_metadata', 'structures')
+ JOBDATA_DIR = os.path.join(
+ '/esarchive', 'autosubmit', 'as_metadata', 'data')
DB_FILE = 'autosubmit.db'
DB_PATH = os.path.join(DB_DIR, DB_FILE)
LOCAL_ROOT_DIR = DB_DIR
@@ -98,6 +100,8 @@ class BasicConfig:
BasicConfig.ALLOWED_HOSTS = parser.get('hosts', 'whitelist')
if parser.has_option('structures', 'path'):
BasicConfig.STRUCTURES_DIR = parser.get('structures', 'path')
+ if parser.has_option('historicdb', 'path'):
+ BasicConfig.JOBDATA_DIR = parser.get('historicdb', 'path')
@staticmethod
def read():
diff --git a/autosubmit/database/db_jobdata.py b/autosubmit/database/db_jobdata.py
new file mode 100644
index 0000000000000000000000000000000000000000..b1cfaecac020a417b814f9da7d95e3f59240ffcf
--- /dev/null
+++ b/autosubmit/database/db_jobdata.py
@@ -0,0 +1,768 @@
+#!/usr/bin/env python
+
+# Copyright 2015 Earth Sciences Department, BSC-CNS
+
+# This file is part of Autosubmit.
+
+# Autosubmit is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# Autosubmit is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with Autosubmit. If not, see .
+
+import os
+import sys
+import string
+import time
+import pickle
+import textwrap
+import traceback
+import sqlite3
+import copy
+import collections
+from datetime import datetime
+from json import dumps
+from networkx import DiGraph
+from autosubmit.config.basicConfig import BasicConfig
+from bscearth.utils.date import date2str, parse_date, previous_day, chunk_end_date, chunk_start_date, Log, subs_dates
+
+
+CURRENT_DB_VERSION = 10
+_debug = True
+JobItem = collections.namedtuple('JobItem', ['id', 'counter', 'job_name', 'created', 'modified', 'submit', 'start', 'finish',
+ 'status', 'rowtype', 'ncpus', 'wallclock', 'qos', 'energy', 'date', 'section', 'member', 'chunk', 'last', 'platform', 'job_id','extra_data'])
+
+# JobItem = collections.namedtuple(
+# 'JobItem', 'id counter job_name created modified submit start finish status rowtype ncpus wallclock qos energy date section member chunk last platform')
+
+
class JobData():
    """In-memory representation of one row of the job_data table.

    Time fields (submit/start/finish) are coerced to int on read and write;
    platform falls back to "NA" and energy to 0 when unset; extra_data is
    stored JSON-serialized.
    """

    def __init__(self, _id, counter=1, job_name="None", created=None, modified=None, submit=0, start=0, finish=0, status="UNKNOWN", rowtype=1, ncpus=0, wallclock="00:00", qos="debug", energy=0, date="", section="", member="", chunk=0, last=1, platform="NA", job_id=0, extra_data=None):
        """Builds one job_data row.

        Args:
            _id (int): Internal database id.
            counter (int, optional): Retrial counter. Defaults to 1.
            job_name (str, optional): Name of the job. Defaults to "None".
            created (str, optional): Creation timestamp; now() when None.
            modified (str, optional): Modification timestamp; now() when None.
            submit (int, optional): Submit time. Defaults to 0.
            start (int, optional): Start time. Defaults to 0.
            finish (int, optional): Finish time. Defaults to 0.
            status (str, optional): Job status. Defaults to "UNKNOWN".
            rowtype (int, optional): Row type discriminator. Defaults to 1.
            ncpus (int, optional): Number of cpus. Defaults to 0.
            wallclock (str, optional): Wallclock value. Defaults to "00:00".
            qos (str, optional): Queue/QoS name. Defaults to "debug".
            energy (int, optional): Consumed energy. Defaults to 0.
            date (str, optional): Date from config. Defaults to "".
            section (str, optional): Section from config. Defaults to "".
            member (str, optional): Member from config. Defaults to "".
            chunk (int, optional): Chunk from config. Defaults to 0.
            last (int, optional): 1 when this is the active row. Defaults to 1.
            platform (str, optional): Platform name. Defaults to "NA".
            job_id (int, optional): Scheduler-assigned id. Defaults to 0.
            extra_data (dict, optional): Extra information, stored as JSON.
                Defaults to None (serialized as "{}").  The previous mutable
                default argument (dict()) was removed.
        """
        self._id = _id
        self.counter = counter
        self.job_name = job_name
        self.created = created if created else datetime.today().strftime('%Y-%m-%d-%H:%M:%S')
        self.modified = modified if modified else datetime.today().strftime('%Y-%m-%d-%H:%M:%S')
        self._submit = int(submit)
        self._start = int(start)
        self._finish = int(finish)
        self.status = status
        self.rowtype = rowtype
        self.ncpus = ncpus
        self.wallclock = wallclock
        self.qos = qos if qos else "debug"
        self._energy = energy if energy else 0
        self.date = date if date else ""
        self.section = section if section else ""
        self.member = member if member else ""
        self.chunk = chunk if chunk else 0
        self.last = last
        self._platform = platform if platform and len(
            platform) > 0 else "NA"
        self.job_id = job_id if job_id else 0
        # Serialize once; an empty dict is stored as "{}" (same as before).
        self.extra_data = dumps(extra_data if extra_data is not None else {})

    @property
    def submit(self):
        """Submit time as int."""
        return int(self._submit)

    @property
    def start(self):
        """Start time as int."""
        return int(self._start)

    @property
    def finish(self):
        """Finish time as int."""
        return int(self._finish)

    @property
    def platform(self):
        """Platform name; "NA" when unknown."""
        return self._platform

    @property
    def energy(self):
        """Consumed energy; 0 when unknown."""
        return self._energy

    @submit.setter
    def submit(self, submit):
        self._submit = int(submit)

    @start.setter
    def start(self, start):
        self._start = int(start)

    @finish.setter
    def finish(self, finish):
        self._finish = int(finish)

    @platform.setter
    def platform(self, platform):
        # Empty/None platform collapses to the "NA" sentinel.
        self._platform = platform if platform and len(platform) > 0 else "NA"

    @energy.setter
    def energy(self, energy):
        # Falsy energy (None, 0) is normalized to 0.
        self._energy = energy if energy else 0
+
+
class JobDataList():
    """Container for the JobData rows of a single experiment."""

    def __init__(self, expid):
        # Experiment identifier this collection belongs to.
        self.expid = expid
        # Accumulated JobData objects.
        self.jobdata_list = []

    def add_jobdata(self, jobdata):
        """Append one JobData object to the collection."""
        self.jobdata_list.append(jobdata)

    def size(self):
        """Return the number of stored JobData objects."""
        return len(self.jobdata_list)
+
+
class JobDataStructure():
    """Persistence layer for per-experiment historic job data, backed by a
    sqlite database stored under BasicConfig.JOBDATA_DIR."""

    def __init__(self, expid):
        """Initializes the object based on the unique identifier of the experiment.

        Creates the database file, the job_data table and the schema version
        stamp (pragma user_version) when the database does not exist yet;
        otherwise just opens a connection.

        Args:
            expid (str): Experiment identifier
        """
        BasicConfig.read()
        self.expid = expid
        self.folder_path = BasicConfig.JOBDATA_DIR
        self.database_path = os.path.join(
            self.folder_path, "job_data_" + str(expid) + ".db")
        self.conn = None
        self.jobdata_list = JobDataList(self.expid)
        self.create_table_query = textwrap.dedent(
            '''CREATE TABLE
        IF NOT EXISTS job_data (
        id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
        counter INTEGER NOT NULL,
        job_name TEXT NOT NULL,
        created TEXT NOT NULL,
        modified TEXT NOT NULL,
        submit INTEGER NOT NULL,
        start INTEGER NOT NULL,
        finish INTEGER NOT NULL,
        status TEXT NOT NULL,
        rowtype INTEGER NOT NULL,
        ncpus INTEGER NOT NULL,
        wallclock TEXT NOT NULL,
        qos TEXT NOT NULL,
        energy INTEGER NOT NULL,
        date TEXT NOT NULL,
        section TEXT NOT NULL,
        member TEXT NOT NULL,
        chunk INTEGER NOT NULL,
        last INTEGER NOT NULL,
        platform TEXT NOT NULL,
        job_id INTEGER NOT NULL,
        extra_data TEXT NOT NULL,
        UNIQUE(counter,job_name)
        );''')
        if not os.path.exists(self.database_path):
            # Touch the file and close the handle immediately: the previous
            # bare open() leaked the file object (sqlite opens it on its own).
            open(self.database_path, "w").close()
            self.conn = self.create_connection(self.database_path)
            self.create_table()
            if self._set_pragma_version(CURRENT_DB_VERSION):
                Log.info("Database version set.")
        else:
            self.conn = self.create_connection(self.database_path)
+
+ def write_submit_time(self, job_name, submit=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0):
+ """Writes submit time of job.
+
+ Args:
+ job_name ([type]): [description]
+ submit (int, optional): [description]. Defaults to 0.
+ status (str, optional): [description]. Defaults to "UNKNOWN".
+ ncpus (int, optional): [description]. Defaults to 0.
+ wallclock (str, optional): [description]. Defaults to "00:00".
+ qos (str, optional): [description]. Defaults to "debug".
+ date (str, optional): [description]. Defaults to "".
+ member (str, optional): [description]. Defaults to "".
+ section (str, optional): [description]. Defaults to "".
+ chunk (int, optional): [description]. Defaults to 0.
+ platform (str, optional): [description]. Defaults to "NA".
+ job_id (int, optional): [description]. Defaults to 0.
+
+ Returns:
+ [type]: [description]
+ """
+ #print("Saving write submit " + job_name)
+ try:
+ job_data = self.get_job_data(job_name)
+ current_counter = 1
+ max_counter = self._get_maxcounter_jobdata()
+ #submit = parse_date(submit) if submit > 0 else 0
+ #print("submit job data " + str(job_data))
+ if job_data and len(job_data) > 0:
+ # print("job data has 1 element")
+ # max_counter = self._get_maxcounter_jobdata()
+ job_max_counter = max(job.counter for job in job_data)
+ current_last = [
+ job for job in job_data if job.counter == job_max_counter]
+ for current in current_last:
+ # Deactivate current last for this job
+ current.modified = datetime.today().strftime('%Y-%m-%d-%H:%M:%S')
+ up_id = self._deactivate_current_last(current)
+ # Finding current counter
+ current_counter = (
+ job_max_counter + 1) if job_max_counter >= max_counter else max_counter
+ else:
+ current_counter = max_counter
+ # Insert new last
+ rowid = self._insert_job_data(JobData(
+ 0, current_counter, job_name, None, None, submit, 0, 0, status, 1, ncpus, wallclock, qos, 0, date, member, section, chunk, 1, platform, job_id))
+ # print(rowid)
+ if rowid:
+ return True
+ else:
+ return None
+ except Exception as exp:
+ if _debug == True:
+ Log.info(traceback.format_exc())
+ Log.warning(str(exp))
+ return None
+
+ # if rowid > 0:
+ # print("Successfully inserted")
+
+ def write_start_time(self, job_name, start=0, status="UNKWNONW", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0):
+ """Writes start time into the database
+
+ Args:
+ job_name (str): Name of Job
+ start (int, optional): Start time. Defaults to 0.
+ status (str, optional): Status of job. Defaults to "UNKWNONW".
+ ncpus (int, optional): Number of cpis. Defaults to 0.
+ wallclock (str, optional): Wallclock value. Defaults to "00:00".
+ qos (str, optional): Name of QoS. Defaults to "debug".
+ date (str, optional): Date from config. Defaults to "".
+ member (str, optional): Member from config. Defaults to "".
+ section (str, optional): [description]. Defaults to "".
+ chunk (int, optional): [description]. Defaults to 0.
+ platform (str, optional): [description]. Defaults to "NA".
+ job_id (int, optional): [description]. Defaults to 0.
+
+ Returns:
+ [type]: [description]
+ """
+ try:
+ job_data_last = self.get_job_data_last(job_name)
+ # Updating existing row
+ if job_data_last:
+ job_data_last = job_data_last[0]
+ if job_data_last.start == 0:
+ job_data_last.start = start
+ job_data_last.status = status
+ job_data_last.job_id = job_id
+ job_data_last.modified = datetime.today().strftime('%Y-%m-%d-%H:%M:%S')
+ _updated = self._update_start_job_data(job_data_last)
+ return _updated
+ # It is necessary to create a new row
+ submit_inserted = self.write_submit_time(
+ job_name, start, status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id)
+ if submit_inserted:
+ # print("retro start")
+ self.write_start_time(job_name, start, status,
+ ncpus, wallclock, qos, date, member, section, chunk, platform, job_id)
+ return True
+ else:
+ return None
+ except Exception as exp:
+ if _debug == True:
+ Log.info(traceback.format_exc())
+ Log.warning(str(exp))
+ return None
+
+ def write_finish_time(self, job_name, finish=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0, platform_object=None):
+ """Writes the finish time into the database
+
+ Args:
+ job_name (str): Name of Job.
+ finish (int, optional): Finish time. Defaults to 0.
+ status (str, optional): Current Status. Defaults to "UNKNOWN".
+ ncpus (int, optional): Number of cpus. Defaults to 0.
+ wallclock (str, optional): Wallclock value. Defaults to "00:00".
+ qos (str, optional): Name of QoS. Defaults to "debug".
+ date (str, optional): Date from config. Defaults to "".
+ member (str, optional): Member from config. Defaults to "".
+ section (str, optional): Section from config. Defaults to "".
+ chunk (int, optional): Chunk from config. Defaults to 0.
+ platform (str, optional): Name of platform of job. Defaults to "NA".
+ job_id (int, optional): Id of job. Defaults to 0.
+ platform_object (obj, optional): Platform object. Defaults to None.
+
+ Returns:
+ Boolean/None: True if success, None if exception.
+ """
+ try:
+ # print("Writing finish time \t" + str(job_name) + "\t" + str(finish))
+ job_data_last = self.get_job_data_last(job_name)
+ energy = 0
+ submit_time = start_time = finish_time = 0
+ extra_data = dict()
+ # Updating existing row
+ if job_data_last:
+ job_data_last = job_data_last[0]
+ # if job_data_last.finish == 0:
+ # Call Slurm here, update times.
+ if platform_object:
+ # print("There is platform object")
+ try:
+ if type(platform_object) is not str:
+ if platform_object.type == "slurm":
+ #print("Checking Slurm for " + str(job_name))
+ submit_time, start_time, finish_time, energy, extra_data = platform_object.check_job_energy(job_id)
+ except Exception as exp:
+ Log.info(traceback.format_exc())
+ Log.warning(str(exp))
+ energy = 0
+ job_data_last.finish = int(finish_time) if finish_time > 0 else int(finish)
+ job_data_last.status = status
+ job_data_last.job_id = job_id
+ job_data_last.energy = energy
+ job_data_last.extra_data = dumps(extra_data)
+ job_data_last.modified = datetime.today().strftime('%Y-%m-%d-%H:%M:%S')
+ if submit_time > 0 and start_time > 0:
+ job_data_last.submit = int(submit_time)
+ job_data_last.start = int(start_time)
+ rowid = self._update_finish_job_data_plus(job_data_last)
+ else:
+ rowid = self._update_finish_job_data(job_data_last)
+ return True
+ # It is necessary to create a new row
+ submit_inserted = self.write_submit_time(
+ job_name, finish, status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id)
+ write_inserted = self.write_start_time(job_name, finish, status, ncpus,
+ wallclock, qos, date, member, section, chunk, platform, job_id)
+ #print(submit_inserted)
+ #print(write_inserted)
+ if submit_inserted and write_inserted:
+ #print("retro finish")
+ self.write_finish_time(
+ job_name, finish, status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id, platform_object)
+ else:
+ return None
+ except Exception as exp:
+ if _debug == True:
+ Log.info(traceback.format_exc())
+ Log.warning(str(exp))
+ return None
+
+ def get_all_job_data(self):
+ """[summary]
+
+ Raises:
+ Exception: [description]
+ """
+ try:
+ if os.path.exists(self.folder_path):
+
+ current_table = self._get_all_job_data()
+ current_job_data = dict()
+ for item in current_table:
+ # _id, _counter, _job_name, _created, _modified, _submit, _start, _finish, _status, _rowtype, _ncpus, _wallclock, _qos, _energy, _date, _section, _member, _chunk, _last, _platform = item
+ job_item = JobItem(*item)
+ self.jobdata_list.add_jobdata(JobData(job_item.id, job_item.counter, job_item.job_name, job_item.created, job_item.modified, job_item.submit, job_item.start, job_item.finish, job_item.status,
+ job_item.rowtype, job_item.ncpus, job_item.wallclock, job_item.qos, job_item.energy, job_item.date, job_item.section, job_item.member, job_item.chunk, job_item.last, job_item.platform, job_item.job_id, job_item.extra_data))
+
+ else:
+ raise Exception("Job data folder not found :" +
+ str(self.jobdata_path))
+ except Exception as exp:
+ if _debug == True:
+ Log.info(traceback.format_exc())
+ Log.warning(str(exp))
+ return None
+
+ def get_job_data(self, job_name):
+ """Retrieves all the rows that have the same job_name
+
+ Args:
+ job_name (str): [description]
+
+ Raises:
+ Exception: If path to data folder does not exist
+
+ Returns:
+ [type]: None if error, list of jobs if successful
+ """
+ try:
+ job_data = list()
+ if os.path.exists(self.folder_path):
+ current_job = self._get_job_data(job_name)
+ for item in current_job:
+ # _id, _counter, _job_name, _created, _modified, _submit, _start, _finish, _status, _rowtype, _ncpus, _wallclock, _qos, _energy, _date, _section, _member, _chunk, _last, _platform = item
+ job_item = JobItem(*item)
+ job_data.append(JobData(job_item.id, job_item.counter, job_item.job_name, job_item.created, job_item.modified, job_item.submit, job_item.start, job_item.finish, job_item.status,
+ job_item.rowtype, job_item.ncpus, job_item.wallclock, job_item.qos, job_item.energy, job_item.date, job_item.section, job_item.member, job_item.chunk, job_item.last, job_item.platform, job_item.job_id, job_item.extra_data))
+ # job_data.append(JobData(_id, _counter, _job_name, _created, _modified,
+ # _submit, _start, _finish, _status, _rowtype, _ncpus, _wallclock, _qos, _energy, _date, _section, _member, _chunk, _last, _platform))
+ return job_data
+ else:
+ raise Exception("Job data folder not found :" +
+ str(self.jobdata_path))
+ except Exception as exp:
+ if _debug == True:
+ Log.info(traceback.format_exc())
+ Log.warning(str(exp))
+ return None
+
+ def get_job_data_last(self, job_name):
+ """ Returns latest jobdata row for a job_name. The current version.
+
+ Args:
+ job_name ([type]): [description]
+
+ Raises:
+ Exception: [description]
+
+ Returns:
+ [type]: None if error, JobData if success
+ """
+ try:
+ jobdata = list()
+ if os.path.exists(self.folder_path):
+ current_job_last = self._get_job_data_last(job_name)
+ if current_job_last:
+ for current in current_job_last:
+ job_item = JobItem(*current)
+ # _id, _counter, _job_name, _created, _modified, _submit, _start, _finish, _status, _rowtype, _ncpus, _wallclock, _qos, _energy, _date, _section, _member, _chunk, _last, _platform = current_job_last
+ # return JobData(_id, _counter, _job_name, _created, _modified,
+ # _submit, _start, _finish, _status, _rowtype, _ncpus, _wallclock, _qos, _energy, _date, _section, _member, _chunk, _last, _platform)
+ jobdata.append(JobData(job_item.id, job_item.counter, job_item.job_name, job_item.created, job_item.modified, job_item.submit, job_item.start, job_item.finish, job_item.status,
+ job_item.rowtype, job_item.ncpus, job_item.wallclock, job_item.qos, job_item.energy, job_item.date, job_item.section, job_item.member, job_item.chunk, job_item.last, job_item.platform, job_item.job_id, job_item.extra_data))
+ return jobdata
+ else:
+ return None
+ else:
+ raise Exception("Job data folder not found :" +
+ str(self.jobdata_path))
+ except Exception as exp:
+ if _debug == True:
+ Log.info(traceback.format_exc())
+ Log.warning(str(exp))
+ return None
+
+ def _deactivate_current_last(self, jobdata):
+ """Sets last = 0 to row with id
+
+ Args:
+ jobdata ([type]): [description]
+
+ Returns:
+ [type]: [description]
+ """
+ try:
+ if self.conn:
+ sql = ''' UPDATE job_data SET last=0, modified = ? WHERE id = ?'''
+ tuplerow = (jobdata.modified, jobdata._id)
+ cur = self.conn.cursor()
+ cur.execute(sql, tuplerow)
+ self.conn.commit()
+ return cur.lastrowid
+ return None
+ except sqlite3.Error as e:
+ if _debug == True:
+ Log.info(traceback.format_exc())
+ Log.warning("Error on Insert : " + str(type(e).__name__))
+ return None
+
+ def _update_start_job_data(self, jobdata):
+ """Update start time of job data row
+
+ Args:
+ jobdata ([type]): [description]
+
+ Returns:
+ [type]: [description]
+ """
+ # current_time =
+ try:
+ if self.conn:
+ sql = ''' UPDATE job_data SET start=?, modified=?, job_id=?, status=? WHERE id=? '''
+ cur = self.conn.cursor()
+ cur.execute(sql, (int(jobdata.start),
+ jobdata.modified, jobdata.job_id, jobdata.status, jobdata._id))
+ self.conn.commit()
+ return True
+ return None
+ except sqlite3.Error as e:
+ if _debug == True:
+ Log.info(traceback.format_exc())
+ Log.warning("Error on Insert : " + str(type(e).__name__))
+ return None
+
+ def _update_finish_job_data_plus(self, jobdata):
+ """Updates the finish job data, also updates submit, start times.
+
+ Args:
+ jobdata (JobData): JobData object
+
+ Returns:
+ int/None: lastrowid if success, None if error
+ """
+ try:
+ if self.conn:
+ sql = ''' UPDATE job_data SET submit=?, start=?, finish=?, modified=?, job_id=?, status=?, energy=?, extra_data=? WHERE id=? '''
+ cur = self.conn.cursor()
+ cur.execute(sql, (jobdata.submit, jobdata.start, jobdata.finish, jobdata.modified, jobdata.job_id,
+ jobdata.status, jobdata.energy, jobdata.extra_data, jobdata._id))
+ self.conn.commit()
+ return cur.lastrowid
+ return None
+ except sqlite3.Error as e:
+ if _debug == True:
+ Log.info(traceback.format_exc())
+ Log.warning("Error on Update : " + str(type(e).__name__))
+ return None
+
+ def _update_finish_job_data(self, jobdata):
+ """Update register with id. Updates finish, modified, status.
+
+ Args:
+ jobdata ([type]): [description]
+
+ Returns:
+ [type]: None if error, lastrowid if success
+ """
+ try:
+ if self.conn:
+ # print("Updating finish time")
+ sql = ''' UPDATE job_data SET finish=?, modified=?, job_id=?, status=?, energy=?, extra_data=? WHERE id=? '''
+ cur = self.conn.cursor()
+ cur.execute(sql, (jobdata.finish, jobdata.modified, jobdata.job_id,
+ jobdata.status, jobdata.energy, jobdata.extra_data, jobdata._id))
+ self.conn.commit()
+ return cur.lastrowid
+ return None
+ except sqlite3.Error as e:
+ if _debug == True:
+ Log.info(traceback.format_exc())
+ Log.warning("Error on Update : " + str(type(e).__name__))
+ return None
+
+ def _insert_job_data(self, jobdata):
+ """[summary]
+
+ Args:
+ jobdata ([type]): JobData object
+
+ Returns:
+ [type]: None if error, lastrowid if correct
+ """
+ try:
+ if self.conn:
+ #print("preparing to insert")
+ sql = ''' INSERT INTO job_data(counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) '''
+ tuplerow = (jobdata.counter, jobdata.job_name, jobdata.created, jobdata.modified, jobdata.submit, jobdata.start,
+ jobdata.finish, jobdata.status, jobdata.rowtype, jobdata.ncpus, jobdata.wallclock, jobdata.qos, jobdata.energy, jobdata.date, jobdata.section, jobdata.member, jobdata.chunk, jobdata.last, jobdata.platform, jobdata.job_id, jobdata.extra_data)
+ cur = self.conn.cursor()
+ #print("pre insert")
+ cur.execute(sql, tuplerow)
+ self.conn.commit()
+ #print("Inserted " + str(jobdata.job_name))
+ return cur.lastrowid
+ else:
+ #print("Not a valid connection.")
+ return None
+ except sqlite3.Error as e:
+ if _debug == True:
+ Log.info(traceback.format_exc())
+ Log.warning("Error on Insert : " + str(type(e).__name__) +
+ "\t " + str(jobdata.job_name) + "\t" + str(jobdata.counter))
+ return None
+
    def _get__all_job_data(self):
        """
        Get all registers from job_data.

        NOTE(review): the public wrapper calls `self._get_all_job_data`
        (single underscore) while this name has a double underscore — verify
        the call sites match this spelling.

        :return: all rows, each with 22 columns (id, counter, job_name,
            created, modified, submit, start, finish, status, rowtype, ncpus,
            wallclock, qos, energy, date, section, member, chunk, last,
            platform, job_id, extra_data); None when there is no connection;
            an empty list on sqlite error (unlike the other getters, which
            return None on error)
        :rtype: list of tuple, None, or empty list
        """
        try:
            #conn = create_connection(path)
            if self.conn:
                self.conn.text_factory = str
                cur = self.conn.cursor()
                cur.execute(
                    "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data FROM job_data")
                rows = cur.fetchall()
                return rows
            else:
                return None
        except sqlite3.Error as e:
            if _debug == True:
                Log.info(traceback.format_exc())
            Log.warning("Error on Select : " + str(type(e).__name__))
            return list()
+
+ def _get_job_data(self, job_name):
+ """[summary]
+
+ Args:
+ job_name ([type]): [description]
+
+ Returns:
+ [type]: None if error, list of tuple if found (list can be empty)
+ """
+ try:
+ if self.conn:
+ self.conn.text_factory = str
+ cur = self.conn.cursor()
+ cur.execute(
+ "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data FROM job_data WHERE job_name=? ORDER BY counter DESC", (job_name,))
+ rows = cur.fetchall()
+ # print(rows)
+ return rows
+ else:
+ return None
+ except sqlite3.Error as e:
+ if _debug == True:
+ Log.info(traceback.format_exc())
+ Log.warning("Error on Select : " + str(type(e).__name__))
+ return None
+
+ def _get_job_data_last(self, job_name):
+ """Returns the latest row for a job_name. The current version.
+
+ Args:
+ job_name ([type]): [description]
+
+ Returns:
+ [type]: [description]
+ """
+ try:
+ if self.conn:
+ self.conn.text_factory = str
+ cur = self.conn.cursor()
+ cur.execute(
+ "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data FROM job_data WHERE last=1 and job_name=? ORDER BY counter DESC", (job_name,))
+ rows = cur.fetchall()
+ if rows and len(rows) > 0:
+ return rows
+ else:
+ return None
+ else:
+ return None
+ except sqlite3.Error as e:
+ if _debug == True:
+ Log.info(traceback.format_exc())
+ Log.warning("Error on Select : " + str(type(e).__name__))
+ return None
+
    def _set_pragma_version(self, version = 2):
        """Sets the schema version of the database (sqlite `pragma
        user_version`).

        Args:
            version (int, optional): version number to store. Defaults to 2
                (the previous docstring incorrectly said 1); callers normally
                pass CURRENT_DB_VERSION.

        Returns:
            Boolean/None: True if success, None if error; also returns None
            implicitly when there is no connection.
        """
        try:
            if self.conn:
                self.conn.text_factory = str
                cur = self.conn.cursor()
                cur.execute("pragma user_version={v:d}".format(v=version))
                self.conn.commit()
                return True
        except sqlite3.Error as e:
            if _debug == True:
                Log.info(traceback.format_exc())
            Log.warning("Error on version : " + str(type(e).__name__))
            return None
+
+ def _get_maxcounter_jobdata(self):
+ """Return the maxcounter of the experiment
+
+ Returns:
+ [type]: [description]
+ """
+ try:
+ if self.conn:
+ self.conn.text_factory = str
+ cur = self.conn.cursor()
+ cur.execute("SELECT MAX(counter) as maxcounter FROM job_data")
+ rows = cur.fetchall()
+ if len(rows) > 0:
+ #print("Row " + str(rows[0]))
+ result, = rows[0]
+ return int(result) if result else 1
+ else:
+ # Starting value
+ return 1
+ return None
+ except sqlite3.Error as e:
+ if _debug == True:
+ Log.info(traceback.format_exc())
+ Log.warning("Error on Select Max : " + str(type(e).__name__))
+ return None
+
+ def create_table(self):
+ """ create a table from the create_table_sql statement
+ :param conn: Connection object
+ :param create_table_sql: a CREATE TABLE statement
+ :return:
+ """
+ try:
+ if self.conn:
+ c = self.conn.cursor()
+ c.execute(self.create_table_query)
+ else:
+ raise IOError("Not a valid connection")
+ except IOError as exp:
+ Log.warning(exp)
+ return None
+ except sqlite3.Error as e:
+ if _debug == True:
+ Log.info(traceback.format_exc())
+ Log.warning("Error on create table : " + str(type(e).__name__))
+ return None
+
+ def create_connection(self, db_file):
+ """
+ Create a database connection to the SQLite database specified by db_file.
+ :param db_file: database file name
+ :return: Connection object or None
+ """
+ try:
+ conn = sqlite3.connect(db_file)
+ return conn
+ except:
+ return None
diff --git a/autosubmit/database/db_structure.py b/autosubmit/database/db_structure.py
index fb3e3f0ff96f9debaff8e05706465f92a194f793..7beff44e8af9e360b9e37f453763b13a8b2c538c 100644
--- a/autosubmit/database/db_structure.py
+++ b/autosubmit/database/db_structure.py
@@ -1,3 +1,22 @@
+#!/usr/bin/env python
+
+# Copyright 2015 Earth Sciences Department, BSC-CNS
+
+# This file is part of Autosubmit.
+
+# Autosubmit is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# Autosubmit is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with Autosubmit. If not, see .
+
import os
import sys
import string
@@ -55,7 +74,8 @@ def get_structure(exp_id, structures_path):
return None
else:
# pkl folder not found
- raise Exception("pkl folder not found " + str(structures_path))
+ raise Exception("Structures folder not found " +
+ str(structures_path))
except Exception as exp:
print(traceback.format_exc())
diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py
index 000b607d9f6c1a775da3f55696c68539e80ba42a..101026694a1b96b53f249af47b3866fe90242826 100644
--- a/autosubmit/job/job.py
+++ b/autosubmit/job/job.py
@@ -38,6 +38,7 @@ from autosubmit.job.job_common import Status, Type
from autosubmit.job.job_common import StatisticsSnippetBash, StatisticsSnippetPython
from autosubmit.job.job_common import StatisticsSnippetR, StatisticsSnippetEmpty
from autosubmit.config.basicConfig import BasicConfig
+from autosubmit.database.db_jobdata import JobDataStructure
from bscearth.utils.date import date2str, parse_date, previous_day, chunk_end_date, chunk_start_date, Log, subs_dates
from time import sleep
from threading import Thread
@@ -126,6 +127,7 @@ class Job(object):
self.packed = False
self.hold = False
self.distance_weight = 0
+
def __getstate__(self):
odict = self.__dict__
if '_platform' in odict:
@@ -553,9 +555,9 @@ class Job(object):
if new_status == Status.COMPLETED:
Log.debug("This job seems to have completed: checking...")
-
if not self.platform.get_completed_files(self.name):
- log_name = os.path.join(self._tmp_path, self.name + '_COMPLETED')
+ log_name = os.path.join(
+ self._tmp_path, self.name + '_COMPLETED')
self.check_completion()
else:
@@ -641,7 +643,7 @@ class Job(object):
:param default_status: status to set if job is not completed. By default is FAILED
:type default_status: Status
"""
- log_name = os.path.join(self._tmp_path,self.name + '_COMPLETED')
+ log_name = os.path.join(self._tmp_path, self.name + '_COMPLETED')
if os.path.exists(log_name):
self.status = Status.COMPLETED
@@ -963,6 +965,9 @@ class Job(object):
else:
f = open(path, 'w')
f.write(date2str(datetime.datetime.now(), 'S'))
+ # Writing database
+ JobDataStructure(self.expid).write_submit_time(self.name, time.time(), Status.VALUE_TO_KEY[self.status] if self.status in Status.VALUE_TO_KEY.keys() else "UNKNOWN", self.processors,
+ self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id)
def write_start_time(self):
"""
@@ -982,6 +987,9 @@ class Job(object):
f.write(' ')
# noinspection PyTypeChecker
f.write(date2str(datetime.datetime.fromtimestamp(start_time), 'S'))
+ # Writing database
+ JobDataStructure(self.expid).write_start_time(self.name, time.time(), Status.VALUE_TO_KEY[self.status] if self.status in Status.VALUE_TO_KEY.keys() else "UNKNOWN", self.processors,
+ self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id)
return True
def write_end_time(self, completed):
@@ -995,16 +1003,25 @@ class Job(object):
path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS')
f = open(path, 'a')
f.write(' ')
+ finish_time = None
+ final_status = None
if end_time > 0:
# noinspection PyTypeChecker
f.write(date2str(datetime.datetime.fromtimestamp(end_time), 'S'))
+ # date2str(datetime.datetime.fromtimestamp(end_time), 'S')
+ finish_time = end_time
else:
f.write(date2str(datetime.datetime.now(), 'S'))
+ finish_time = time.time() # date2str(datetime.datetime.now(), 'S')
f.write(' ')
if completed:
+ final_status = "COMPLETED"
f.write('COMPLETED')
else:
+ final_status = "FAILED"
f.write('FAILED')
+ JobDataStructure(self.expid).write_finish_time(self.name, finish_time, final_status, self.processors,
+ self.wallclock, self._queue, self.date, self.member, self.section, self.chunk, self.platform_name, self.id, self.platform)
def check_started_after(self, date_limit):
"""
@@ -1356,7 +1373,8 @@ done
total = total * 1.15
hour = int(total)
minute = int((total - int(total)) * 60.0)
- second = int(((total - int(total)) * 60 - int((total - int(total)) * 60.0)) * 60.0)
+ second = int(((total - int(total)) * 60 -
+ int((total - int(total)) * 60.0)) * 60.0)
wallclock_delta = datetime.timedelta(hours=hour, minutes=minute,
seconds=second)
if elapsed > wallclock_delta:
@@ -1372,6 +1390,3 @@ done
time = int(output[index])
time = self._parse_timestamp(time)
return time
-
-
-
diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py
index 58f008eb546c26681c0dec3ef5322dccc083457c..03995f76a2f3bd4662ed7f47aa6bb62476f15141 100644
--- a/autosubmit/platforms/paramiko_platform.py
+++ b/autosubmit/platforms/paramiko_platform.py
@@ -38,7 +38,6 @@ class ParamikoPlatform(Platform):
self.submit_cmd = ""
self._ftpChannel = None
-
@property
def header(self):
"""
@@ -58,6 +57,7 @@ class ParamikoPlatform(Platform):
:rtype: object
"""
return self._wrapper
+
def restore_connection(self):
connected = True
if self._ssh is None:
@@ -68,15 +68,15 @@ class ParamikoPlatform(Platform):
while connected == False and retry < retries:
if self.connect(True):
connected = True
- retry+=1
+ retry += 1
if not connected:
- Log.error('Can not create ssh or sftp connection to {0}: Connection could not be established to platform {1}\n Please, check your expid platform.conf to see if there are mistakes in the configuration\n Also Ensure that the login node listed on HOST parameter is available(try to connect via ssh on a terminal)\n Also you can put more than one host using a comma as separator', self.host,self.name)
- Log.critical('Experiment cant no continue without unexpected behaviour, Stopping Autosubmit')
+ Log.error('Can not create ssh or sftp connection to {0}: Connection could not be established to platform {1}\n Please, check your expid platform.conf to see if there are mistakes in the configuration\n Also Ensure that the login node listed on HOST parameter is available(try to connect via ssh on a terminal)\n Also you can put more than one host using a comma as separator', self.host, self.name)
+ Log.critical(
+ 'Experiment cant no continue without unexpected behaviour, Stopping Autosubmit')
exit(0)
return connected
-
- def connect(self,reconnect=False):
+ def connect(self, reconnect=False):
"""
Creates ssh connection to host
@@ -96,26 +96,29 @@ class ParamikoPlatform(Platform):
self._host_config = self._ssh_config.lookup(self.host)
if "," in self._host_config['hostname']:
if reconnect:
- self._host_config['hostname'] = random.choice(self._host_config['hostname'].split(',')[1:])
+ self._host_config['hostname'] = random.choice(
+ self._host_config['hostname'].split(',')[1:])
else:
- self._host_config['hostname'] = self._host_config['hostname'].split(',')[0]
+ self._host_config['hostname'] = self._host_config['hostname'].split(',')[
+ 0]
if 'identityfile' in self._host_config:
self._host_config_id = self._host_config['identityfile']
if 'proxycommand' in self._host_config:
- self._proxy = paramiko.ProxyCommand(self._host_config['proxycommand'])
+ self._proxy = paramiko.ProxyCommand(
+ self._host_config['proxycommand'])
self._ssh.connect(self._host_config['hostname'], 22, username=self.user,
key_filename=self._host_config_id, sock=self._proxy)
else:
self._ssh.connect(self._host_config['hostname'], 22, username=self.user,
key_filename=self._host_config_id)
- self.transport = paramiko.Transport((self._host_config['hostname'], 22))
+ self.transport = paramiko.Transport(
+ (self._host_config['hostname'], 22))
self.transport.connect(username=self.user)
self._ftpChannel = self._ssh.open_sftp()
return True
except:
return False
-
def check_completed_files(self, sections=None):
if self.host == 'localhost':
@@ -130,7 +133,7 @@ class ParamikoPlatform(Platform):
else:
command += " -name *_COMPLETED"
- if self.send_command(command,True):
+ if self.send_command(command, True):
return self._ssh_output
else:
return None
@@ -144,7 +147,8 @@ class ParamikoPlatform(Platform):
open(multiple_delete_previous_run, 'w+').write("rm -f"+filenames)
os.chmod(multiple_delete_previous_run, 0o770)
self.send_file(multiple_delete_previous_run, False)
- command = os.path.join(self.get_files_path(),"multiple_delete_previous_run.sh")
+ command = os.path.join(self.get_files_path(),
+ "multiple_delete_previous_run.sh")
if self.send_command(command, ignore_log=True):
return self._ssh_output
@@ -167,10 +171,10 @@ class ParamikoPlatform(Platform):
try:
local_path = os.path.join(os.path.join(self.tmp_path, filename))
- remote_path = os.path.join(self.get_files_path(), os.path.basename(filename))
+ remote_path = os.path.join(
+ self.get_files_path(), os.path.basename(filename))
self._ftpChannel.put(local_path, remote_path)
- self._ftpChannel.chmod(remote_path,os.stat(local_path).st_mode)
-
+ self._ftpChannel.chmod(remote_path, os.stat(local_path).st_mode)
return True
except BaseException as e:
@@ -236,22 +240,22 @@ class ParamikoPlatform(Platform):
try:
#ftp = self._ssh.open_sftp()
- self._ftpChannel.remove(os.path.join(self.get_files_path(), filename))
- #ftp.close()
+ self._ftpChannel.remove(os.path.join(
+ self.get_files_path(), filename))
+ # ftp.close()
return True
except IOError:
return False
except BaseException as e:
if e.lower().contains("garbage"):
- Log.error("Wrong User or invalid .ssh/config. Or invalid user in platform.conf or public key not set ")
+ Log.error(
+ "Wrong User or invalid .ssh/config. Or invalid user in platform.conf or public key not set ")
raise
- Log.debug('Could not remove file {0}'.format(os.path.join(self.get_files_path(), filename)))
+ Log.debug('Could not remove file {0}'.format(
+ os.path.join(self.get_files_path(), filename)))
return False
-
-
-
- def move_file(self, src, dest,must_exist=False):
+ def move_file(self, src, dest, must_exist=False):
"""
Moves a file on the platform (includes .err and .out)
:param src: source name
@@ -263,17 +267,17 @@ class ParamikoPlatform(Platform):
if not self.restore_connection():
return False
try:
- path_root=self.get_files_path()
+ path_root = self.get_files_path()
self._ftpChannel.rename(os.path.join(path_root, src),
os.path.join(path_root, dest))
return True
except:
if must_exist:
- raise Exception('File {0} does not exists'.format(os.path.join(self.get_files_path(), src)))
+ raise Exception('File {0} does not exists'.format(
+ os.path.join(self.get_files_path(), src)))
else:
return False
-
def submit_job(self, job, script_name, hold=False):
"""
Submit a job from a given job object.
@@ -297,6 +301,22 @@ class ParamikoPlatform(Platform):
return int(job_id)
else:
return None
+
+ def check_job_energy(self, job_id):
+ """
+ Checks job energy and return values. Defined in child classes.
+
+ Args:
+ job_id (int): Id of Job
+
+ Returns:
+ 4-tuple (int, int, int, int): submit time, start time, finish time, energy
+ """
+ check_energy_cmd = self.get_job_energy_cmd(job_id)
+ self.send_command(check_energy_cmd)
+ return self.parse_job_finish_data(
+ self.get_ssh_output(), job_id)
+
def submit_Script(self, hold=False):
"""
Sends a SubmitfileScript, exec in platform and retrieve the Jobs_ID.
@@ -323,19 +343,23 @@ class ParamikoPlatform(Platform):
job_id = job.id
job_status = Status.UNKNOWN
if type(job_id) is not int and type(job_id) is not str:
- Log.error('check_job() The job id ({0}) is not an integer neither a string.', job_id)
+ Log.error(
+ 'check_job() The job id ({0}) is not an integer neither a string.', job_id)
job.new_status = job_status
- sleep_time=5
+ sleep_time = 5
while not (self.send_command(self.get_checkjob_cmd(job_id)) and retries >= 0) or (self.get_ssh_output() == "" and retries >= 0):
retries -= 1
- Log.debug('Retrying check job command: {0}', self.get_checkjob_cmd(job_id))
+ Log.debug(
+ 'Retrying check job command: {0}', self.get_checkjob_cmd(job_id))
Log.debug('retries left {0}', retries)
Log.debug('Will be retrying in {0} seconds', sleep_time)
sleep(sleep_time)
sleep_time = sleep_time+5
if retries >= 0:
- Log.debug('Successful check job command: {0}', self.get_checkjob_cmd(job_id))
- job_status = self.parse_job_output(self.get_ssh_output()).strip("\n")
+ Log.debug(
+ 'Successful check job command: {0}', self.get_checkjob_cmd(job_id))
+ job_status = self.parse_job_output(
+ self.get_ssh_output()).strip("\n")
# URi: define status list in HPC Queue Class
if job_status in self.job_status['COMPLETED'] or retries == 0:
job_status = Status.COMPLETED
@@ -350,16 +374,20 @@ class ParamikoPlatform(Platform):
else:
job_status = Status.UNKNOWN
else:
- Log.error(" check_job(), job is not on the queue system. Output was: {0}", self.get_checkjob_cmd(job_id))
+ Log.error(
+ " check_job(), job is not on the queue system. Output was: {0}", self.get_checkjob_cmd(job_id))
job_status = Status.UNKNOWN
- Log.error('check_job() The job id ({0}) status is {1}.', job_id, job_status)
+ Log.error(
+ 'check_job() The job id ({0}) status is {1}.', job_id, job_status)
job.new_status = job_status
- def _check_jobid_in_queue(self,ssh_output,job_list_cmd):
+
+ def _check_jobid_in_queue(self, ssh_output, job_list_cmd):
for job in job_list_cmd[:-1].split(','):
if job not in ssh_output:
return False
return True
- def check_Alljobs(self, job_list,job_list_cmd,remote_logs, retries=5):
+
+ def check_Alljobs(self, job_list, job_list_cmd, remote_logs, retries=5):
"""
Checks jobs running status
@@ -373,23 +401,24 @@ class ParamikoPlatform(Platform):
"""
cmd = self.get_checkAlljobs_cmd(job_list_cmd)
- sleep_time=5
- while not (self.send_command(cmd) and retries >= 0) or ( not self._check_jobid_in_queue(self.get_ssh_output(),job_list_cmd) and retries >= 0):
+ sleep_time = 5
+ while not (self.send_command(cmd) and retries >= 0) or (not self._check_jobid_in_queue(self.get_ssh_output(), job_list_cmd) and retries >= 0):
retries -= 1
Log.debug('Retrying check job command: {0}', cmd)
Log.debug('retries left {0}', retries)
Log.debug('Will be retrying in {0} seconds', sleep_time)
sleep(sleep_time)
- sleep_time=sleep_time+5
+ sleep_time = sleep_time+5
job_list_status = self.get_ssh_output()
- Log.debug('Successful check job command: {0}, \n output: {1}', cmd, self._ssh_output)
+ Log.debug(
+ 'Successful check job command: {0}, \n output: {1}', cmd, self._ssh_output)
if retries >= 0:
in_queue_jobs = []
list_queue_jobid = ""
for job in job_list:
job_id = job.id
- job_status = self.parse_Alljobs_output(job_list_status,job_id)
+ job_status = self.parse_Alljobs_output(job_list_status, job_id)
# URi: define status list in HPC Queue Class
if job_status in self.job_status['COMPLETED']:
job_status = Status.COMPLETED
@@ -397,7 +426,7 @@ class ParamikoPlatform(Platform):
job_status = Status.RUNNING
elif job_status in self.job_status['QUEUING']:
if job.hold:
- job_status = Status.HELD # release?
+ job_status = Status.HELD # release?
else:
job_status = Status.QUEUING
list_queue_jobid += str(job.id) + ','
@@ -410,39 +439,58 @@ class ParamikoPlatform(Platform):
else:
job_status = Status.UNKNOWN
- Log.error('check_job() The job id ({0}) status is {1}.', job_id, job_status)
- job.new_status=job_status
+ Log.error(
+ 'check_job() The job id ({0}) status is {1}.', job_id, job_status)
+ job.new_status = job_status
reason = str()
if self.type == 'slurm' and len(in_queue_jobs) > 0:
- cmd=self.get_queue_status_cmd(list_queue_jobid)
+ cmd = self.get_queue_status_cmd(list_queue_jobid)
self.send_command(cmd)
- queue_status=self._ssh_output
+ queue_status = self._ssh_output
for job in in_queue_jobs:
- reason = self.parse_queue_reason(queue_status,job.id)
+ reason = self.parse_queue_reason(queue_status, job.id)
if job.queuing_reason_cancel(reason):
- Log.error("Job {0} will be cancelled and set to FAILED as it was queuing due to {1}", job.name, reason)
- self.send_command(self.platform.cancel_cmd + " {0}".format(job.id))
+ Log.error(
+ "Job {0} will be cancelled and set to FAILED as it was queuing due to {1}", job.name, reason)
+ self.send_command(
+ self.platform.cancel_cmd + " {0}".format(job.id))
job.new_status = Status.FAILED
job.update_status(remote_logs)
return
elif reason == '(JobHeldUser)':
- job.new_status=Status.HELD
+ job.new_status = Status.HELD
if not job.hold:
- self.send_command("scontrol release "+"{0}".format(job.id)) # SHOULD BE MORE CLASS (GET_scontrol realease but not sure if this can be implemented on others PLATFORMS
- Log.info("Job {0} is being released (id:{1}) ", job.name,job.id)
+                                    # TODO: should be more class-based (a get_scontrol_release method); not sure this can be implemented on other platforms
+ self.send_command(
+ "scontrol release "+"{0}".format(job.id))
+ Log.info(
+ "Job {0} is being released (id:{1}) ", job.name, job.id)
else:
Log.info("Job {0} is HELD", job.name)
elif reason == '(JobHeldAdmin)':
- Log.info("Job {0} Failed to be HELD, canceling... ", job.name)
+ Log.info(
+ "Job {0} Failed to be HELD, canceling... ", job.name)
job.new_status = Status.WAITING
- job.platform.send_command(job.platform.cancel_cmd + " {0}".format(job.id))
+ job.platform.send_command(
+ job.platform.cancel_cmd + " {0}".format(job.id))
else:
for job in job_list:
job_status = Status.UNKNOWN
- Log.warning('check_job() The job id ({0}) from platform {1} has an status of {2}.', job.id, self.name, job_status)
- job.new_status=job_status
+ Log.warning(
+ 'check_job() The job id ({0}) from platform {1} has an status of {2}.', job.id, self.name, job_status)
+ job.new_status = job_status
+
+ def get_job_energy_cmd(self, job_id):
+ """
+ Returns command to check job energy on remote platforms
+ :param job_id: id of job to check
+ :type job_id: int
+ :return: command to check job status
+ :rtype: str
+ """
+ raise NotImplementedError
def get_checkjob_cmd(self, job_id):
"""
@@ -465,6 +513,7 @@ class ParamikoPlatform(Platform):
:rtype: str
"""
raise NotImplementedError
+
def send_command(self, command, ignore_log=False):
"""
Sends given command to HPC
@@ -478,7 +527,8 @@ class ParamikoPlatform(Platform):
if not self.restore_connection():
return False
if "-rP" in command or "find" in command or "convertLink" in command:
- timeout = 60*60 # Max Wait 1hour if the command is a copy or simbolic links ( migrate can trigger long times)
+            # Max wait 1 hour if the command is a copy or symbolic link (migrate can trigger long run times)
+ timeout = 60*60
elif "rm" in command:
timeout = 60/2
else:
@@ -490,7 +540,8 @@ class ParamikoPlatform(Platform):
stdin.close()
channel.shutdown_write()
stdout_chunks = []
- stdout_chunks.append(stdout.channel.recv(len(stdout.channel.in_buffer)))
+ stdout_chunks.append(stdout.channel.recv(
+ len(stdout.channel.in_buffer)))
stderr_readlines = []
while not channel.closed or channel.recv_ready() or channel.recv_stderr_ready():
@@ -499,12 +550,14 @@ class ParamikoPlatform(Platform):
readq, _, _ = select.select([stdout.channel], [], [], 2)
for c in readq:
if c.recv_ready():
- stdout_chunks.append(stdout.channel.recv(len(c.in_buffer)))
+ stdout_chunks.append(
+ stdout.channel.recv(len(c.in_buffer)))
#stdout_chunks.append(" ")
got_chunk = True
if c.recv_stderr_ready():
# make sure to read stderr to prevent stall
- stderr_readlines.append(stderr.channel.recv_stderr(len(c.in_stderr_buffer)))
+ stderr_readlines.append(
+ stderr.channel.recv_stderr(len(c.in_stderr_buffer)))
#stdout_chunks.append(" ")
got_chunk = True
if not got_chunk and stdout.channel.exit_status_ready() and not stderr.channel.recv_stderr_ready() and not stdout.channel.recv_ready():
@@ -522,16 +575,20 @@ class ParamikoPlatform(Platform):
self._ssh_output += s
for errorLine in stderr_readlines:
if errorLine.find("submission failed") != -1 or errorLine.find("git clone") != -1:
- Log.critical('Command {0} in {1} warning: {2}', command, self.host, '\n'.join(stderr_readlines))
+ Log.critical('Command {0} in {1} warning: {2}', command, self.host, '\n'.join(
+ stderr_readlines))
return False
if not ignore_log:
if len(stderr_readlines) > 0:
- Log.warning('Command {0} in {1} warning: {2}', command, self.host, '\n'.join(stderr_readlines))
+ Log.warning('Command {0} in {1} warning: {2}', command, self.host, '\n'.join(
+ stderr_readlines))
else:
- Log.debug('Command {0} in {1} successful with out message: {2}', command, self.host, self._ssh_output)
+ Log.debug('Command {0} in {1} successful with out message: {2}',
+ command, self.host, self._ssh_output)
return True
except BaseException as e:
- Log.error('Can not send command {0} to {1}: {2}', command, self.host, e.message)
+ Log.error(
+ 'Can not send command {0} to {1}: {2}', command, self.host, e.message)
return False
def parse_job_output(self, output):
@@ -544,7 +601,21 @@ class ParamikoPlatform(Platform):
:rtype: str
"""
raise NotImplementedError
- def parse_Alljobs_output(self, output,job_id):
+
+ def parse_job_finish_data(self, output, job_id):
+ """
+ Parses check job command output so it can be interpreted by autosubmit
+
+ :param output: output to parse
+ :type output: str
+ :param job_id: Id of Job
+ :type job_id: Integer
+ :return: job status
+ :rtype: str
+ """
+ raise NotImplementedError
+
+ def parse_Alljobs_output(self, output, job_id):
"""
Parses check jobs command output so it can be interpreted by autosubmit
:param output: output to parse
@@ -561,8 +632,7 @@ class ParamikoPlatform(Platform):
def get_submit_script(self):
pass
-
- def get_submit_cmd(self, job_script, job_type,hold=False):
+ def get_submit_cmd(self, job_script, job_type, hold=False):
"""
Get command to add job to scheduler
@@ -666,26 +736,37 @@ class ParamikoPlatform(Platform):
header = header.replace('%ERR_LOG_DIRECTIVE%', err_filename)
if hasattr(self.header, 'get_queue_directive'):
- header = header.replace('%QUEUE_DIRECTIVE%', self.header.get_queue_directive(job))
+ header = header.replace(
+ '%QUEUE_DIRECTIVE%', self.header.get_queue_directive(job))
if hasattr(self.header, 'get_tasks_per_node'):
- header = header.replace('%TASKS_PER_NODE_DIRECTIVE%', self.header.get_tasks_per_node(job))
+ header = header.replace(
+ '%TASKS_PER_NODE_DIRECTIVE%', self.header.get_tasks_per_node(job))
if hasattr(self.header, 'get_threads_per_task'):
- header = header.replace('%THREADS%', self.header.get_threads_per_task(job))
+ header = header.replace(
+ '%THREADS%', self.header.get_threads_per_task(job))
if hasattr(self.header, 'get_scratch_free_space'):
- header = header.replace('%SCRATCH_FREE_SPACE_DIRECTIVE%', self.header.get_scratch_free_space(job))
+ header = header.replace(
+ '%SCRATCH_FREE_SPACE_DIRECTIVE%', self.header.get_scratch_free_space(job))
if hasattr(self.header, 'get_custom_directives'):
- header = header.replace('%CUSTOM_DIRECTIVES%', self.header.get_custom_directives(job))
+ header = header.replace(
+ '%CUSTOM_DIRECTIVES%', self.header.get_custom_directives(job))
if hasattr(self.header, 'get_exclusivity'):
- header = header.replace('%EXCLUSIVITY_DIRECTIVE%', self.header.get_exclusivity(job))
+ header = header.replace(
+ '%EXCLUSIVITY_DIRECTIVE%', self.header.get_exclusivity(job))
if hasattr(self.header, 'get_account_directive'):
- header = header.replace('%ACCOUNT_DIRECTIVE%', self.header.get_account_directive(job))
+ header = header.replace(
+ '%ACCOUNT_DIRECTIVE%', self.header.get_account_directive(job))
if hasattr(self.header, 'get_memory_directive'):
- header = header.replace('%MEMORY_DIRECTIVE%', self.header.get_memory_directive(job))
+ header = header.replace(
+ '%MEMORY_DIRECTIVE%', self.header.get_memory_directive(job))
if hasattr(self.header, 'get_memory_per_task_directive'):
- header = header.replace('%MEMORY_PER_TASK_DIRECTIVE%', self.header.get_memory_per_task_directive(job))
+ header = header.replace(
+ '%MEMORY_PER_TASK_DIRECTIVE%', self.header.get_memory_per_task_directive(job))
if hasattr(self.header, 'get_hyperthreading_directive'):
- header = header.replace('%HYPERTHREADING_DIRECTIVE%', self.header.get_hyperthreading_directive(job))
+ header = header.replace(
+ '%HYPERTHREADING_DIRECTIVE%', self.header.get_hyperthreading_directive(job))
return header
+
def closeConnection(self):
if self._ftpChannel is not None:
self._ftpChannel.close()
@@ -695,7 +776,6 @@ class ParamikoPlatform(Platform):
self.transport.stop_thread()
self.transport.sys.exit(0)
-
def check_remote_log_dir(self):
"""
Creates log dir on remote host
@@ -704,20 +784,27 @@ class ParamikoPlatform(Platform):
return False
if self.type == "slurm":
try:
- self._ftpChannel.chdir(self.remote_log_dir) # Test if remote_path exists
+ # Test if remote_path exists
+ self._ftpChannel.chdir(self.remote_log_dir)
except IOError:
if self.send_command(self.get_mkdir_cmd()):
- Log.debug('{0} has been created on {1} .', self.remote_log_dir, self.host)
+ Log.debug('{0} has been created on {1} .',
+ self.remote_log_dir, self.host)
else:
- Log.error('Could not create the DIR {0} on HPC {1}'.format(self.remote_log_dir, self.host))
+ Log.error('Could not create the DIR {0} on HPC {1}'.format(
+ self.remote_log_dir, self.host))
except:
Log.critical("Garbage detected")
raise
else:
if self.send_command(self.get_mkdir_cmd()):
- Log.debug('{0} has been created on {1} .', self.remote_log_dir, self.host)
+ Log.debug('{0} has been created on {1} .',
+ self.remote_log_dir, self.host)
else:
- Log.error('Could not create the DIR {0} on HPC {1}'.format(self.remote_log_dir, self.host))
+ Log.error('Could not create the DIR {0} on HPC {1}'.format(
+ self.remote_log_dir, self.host))
+
+
class ParamikoPlatformException(Exception):
"""
Exception raised from HPC queues
diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py
index 9b8df22f95a672c1f25d471232a7b94e148f4abe..4146c2c37e7d919d74c7c17fa770ca802b0705e9 100644
--- a/autosubmit/platforms/platform.py
+++ b/autosubmit/platforms/platform.py
@@ -22,7 +22,8 @@ class Platform(object):
self.expid = expid
self.name = name
self.config = config
- self.tmp_path = os.path.join(self.config.LOCAL_ROOT_DIR, self.expid, self.config.LOCAL_TMP_DIR)
+ self.tmp_path = os.path.join(
+ self.config.LOCAL_ROOT_DIR, self.expid, self.config.LOCAL_TMP_DIR)
self._serial_platform = None
self._serial_queue = None
self._default_queue = None
@@ -195,21 +196,22 @@ class Platform(object):
:rtype: bool
"""
raise NotImplementedError
-
+
# Executed when calling from Job
def get_logs_files(self, exp_id, remote_logs):
"""
Get the given LOGS files
-
+
:param exp_id: experiment id
:type exp_id: str
:param remote_logs: names of the log files
:type remote_logs: (str, str)
"""
(job_out_filename, job_err_filename) = remote_logs
- self.get_files([job_out_filename, job_err_filename], False, 'LOG_{0}'.format(exp_id))
+ self.get_files([job_out_filename, job_err_filename],
+ False, 'LOG_{0}'.format(exp_id))
- def get_completed_files(self, job_name, retries=0,recovery=False):
+ def get_completed_files(self, job_name, retries=0, recovery=False):
"""
Get the COMPLETED file of the given job
@@ -234,7 +236,6 @@ class Platform(object):
else:
return False
-
def remove_stat_file(self, job_name):
"""
Removes *STAT* files from remote
@@ -264,6 +265,7 @@ class Platform(object):
Log.debug('{0} been removed', filename)
return True
return False
+
def check_file_exists(self, src):
return True
@@ -279,7 +281,8 @@ class Platform(object):
:rtype: bool
"""
filename = job_name + '_STAT'
- stat_local_path = os.path.join(self.config.LOCAL_ROOT_DIR, self.expid, self.config.LOCAL_TMP_DIR, filename)
+ stat_local_path = os.path.join(
+ self.config.LOCAL_ROOT_DIR, self.expid, self.config.LOCAL_TMP_DIR, filename)
if os.path.exists(stat_local_path):
os.remove(stat_local_path)
if self.check_file_exists(filename):
@@ -297,7 +300,8 @@ class Platform(object):
:rtype: str
"""
if self.type == "local":
- path = os.path.join(self.root_dir, self.config.LOCAL_TMP_DIR, 'LOG_{0}'.format(self.expid))
+ path = os.path.join(
+ self.root_dir, self.config.LOCAL_TMP_DIR, 'LOG_{0}'.format(self.expid))
else:
path = os.path.join(self.root_dir, 'LOG_{0}'.format(self.expid))
return path
@@ -328,8 +332,19 @@ class Platform(object):
:rtype: autosubmit.job.job_common.Status
"""
raise NotImplementedError
+
def closeConnection(self):
return
+
+ def retrieve_energy_data(self, jobid):
+ """
+ Retrieves energy data from job
+
+ :return: 4-tuple (submit, start, finish, energy)
+ :rtype: 4-tuple(int, int, int, int)
+ """
+ raise NotImplementedError
+
def write_jobid(self, jobid, complete_path):
"""
Writes Job id in an out file.
@@ -342,10 +357,10 @@ class Platform(object):
:rtype: Boolean
"""
try:
-
+
title_job = "[INFO] JOBID=" + str(jobid)
- if os.path.exists(complete_path):
+ if os.path.exists(complete_path):
file_type = complete_path[-3:]
if file_type == "out" or file_type == "err":
with open(complete_path, "r+") as f:
@@ -353,16 +368,15 @@ class Platform(object):
first_line = f.readline()
# Not rewrite
if not first_line.startswith("[INFO] JOBID="):
- content = f.read()
+ content = f.read()
# Write again (Potentially slow)
#start = time()
- #Log.info("Attempting job identification of " + str(jobid))
- f.seek(0,0)
- f.write(title_job + "\n\n" + first_line + content)
- f.close()
- #finish = time()
- #Log.info("Job correctly identified in " + str(finish - start) + " seconds")
+ #Log.info("Attempting job identification of " + str(jobid))
+ f.seek(0, 0)
+ f.write(title_job + "\n\n" + first_line + content)
+ f.close()
+ #finish = time()
+ #Log.info("Job correctly identified in " + str(finish - start) + " seconds")
except Exception as ex:
Log.info("Writing Job Id Failed : " + str(ex))
-
diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py
index d382d80c7d2ee063104b4f35a54db4726bcd26dc..251d64f2106c581c2c8b5baabbb0a187a8ae5919 100644
--- a/autosubmit/platforms/slurmplatform.py
+++ b/autosubmit/platforms/slurmplatform.py
@@ -19,6 +19,8 @@
import os
from time import sleep
+from time import mktime
+from datetime import datetime
from xml.dom.minidom import parseString
@@ -44,7 +46,8 @@ class SlurmPlatform(ParamikoPlatform):
self.job_status['COMPLETED'] = ['COMPLETED']
self.job_status['RUNNING'] = ['RUNNING']
self.job_status['QUEUING'] = ['PENDING', 'CONFIGURING', 'RESIZING']
- self.job_status['FAILED'] = ['FAILED', 'CANCELLED','CANCELLED+', 'NODE_FAIL', 'PREEMPTED', 'SUSPENDED', 'TIMEOUT','OUT_OF_MEMORY','OUT_OF_ME+','OUT_OF_ME']
+ self.job_status['FAILED'] = ['FAILED', 'CANCELLED', 'CANCELLED+', 'NODE_FAIL',
+ 'PREEMPTED', 'SUSPENDED', 'TIMEOUT', 'OUT_OF_MEMORY', 'OUT_OF_ME+', 'OUT_OF_ME']
self._pathdir = "\$HOME/LOG_" + self.expid
self._allow_arrays = False
self._allow_wrappers = True
@@ -52,7 +55,8 @@ class SlurmPlatform(ParamikoPlatform):
self.config = config
exp_id_path = os.path.join(config.LOCAL_ROOT_DIR, self.expid)
tmp_path = os.path.join(exp_id_path, "tmp")
- self._submit_script_path = os.path.join(tmp_path , config.LOCAL_ASLOG_DIR,"submit_"+self.name+".sh")
+ self._submit_script_path = os.path.join(
+ tmp_path, config.LOCAL_ASLOG_DIR, "submit_"+self.name+".sh")
self._submit_script_file = open(self._submit_script_path, 'w').close()
def open_submit_script(self):
@@ -62,10 +66,9 @@ class SlurmPlatform(ParamikoPlatform):
def get_submit_script(self):
self._submit_script_file.close()
os.chmod(self._submit_script_path, 0o750)
- return os.path.join(self.config.LOCAL_ASLOG_DIR,os.path.basename(self._submit_script_path))
+ return os.path.join(self.config.LOCAL_ASLOG_DIR, os.path.basename(self._submit_script_path))
-
- def submit_Script(self,hold=False):
+ def submit_Script(self, hold=False):
"""
Sends a Submit file Script, execute it in the platform and retrieves the Jobs_ID of all jobs at once.
@@ -74,30 +77,34 @@ class SlurmPlatform(ParamikoPlatform):
:return: job id for submitted jobs
:rtype: list(str)
"""
- self.send_file(self.get_submit_script(),False)
- cmd = os.path.join(self.get_files_path(),os.path.basename(self._submit_script_path))
+ self.send_file(self.get_submit_script(), False)
+ cmd = os.path.join(self.get_files_path(),
+ os.path.basename(self._submit_script_path))
if self.send_command(cmd):
jobs_id = self.get_submitted_job_id(self.get_ssh_output())
return jobs_id
else:
return None
+
def update_cmds(self):
"""
Updates commands for platforms
"""
- self.root_dir = os.path.join(self.scratch, self.project, self.user, self.expid)
+ self.root_dir = os.path.join(
+ self.scratch, self.project, self.user, self.expid)
self.remote_log_dir = os.path.join(self.root_dir, "LOG_" + self.expid)
self.cancel_cmd = "scancel"
self._checkhost_cmd = "echo 1"
- self._submit_cmd = 'sbatch -D {1} {1}/'.format(self.host, self.remote_log_dir)
- self._submit_hold_cmd = 'sbatch -H -D {1} {1}/'.format(self.host, self.remote_log_dir)
+ self._submit_cmd = 'sbatch -D {1} {1}/'.format(
+ self.host, self.remote_log_dir)
+ self._submit_hold_cmd = 'sbatch -H -D {1} {1}/'.format(
+ self.host, self.remote_log_dir)
self.put_cmd = "scp"
self.get_cmd = "scp"
self.mkdir_cmd = "mkdir -p " + self.remote_log_dir
-
def get_checkhost_cmd(self):
return self._checkhost_cmd
@@ -109,15 +116,43 @@ class SlurmPlatform(ParamikoPlatform):
def parse_job_output(self, output):
return output.strip().split(' ')[0].strip()
-
- def parse_Alljobs_output(self, output,job_id):
- status =[x.split()[1] for x in output.splitlines() if x.split()[0] == str(job_id)]
+
+    def parse_job_finish_data(self, output, job_id):
+        """Parse `sacct` accounting output for a finished job.
+
+        Returns (submit, start, finish, joules, detailed_data) where times are
+        epoch seconds and detailed_data maps step name -> energy/RSS columns;
+        returns (0, 0, 0, 0, {}) when the output cannot be parsed.
+        """
+        try:
+            detailed_data = dict()
+            lines = output.strip().split("\n")
+            if len(lines) > 0:
+                for line in lines:
+                    line = line.strip().split()
+                    if len(line) > 0:
+                        name = str(line[0])
+                        # Guard each optional column independently; sacct omits
+                        # trailing columns, so fall back to "NA" per field.
+                        extra_data = {"energy": str(line[5] if len(line) > 5 else "NA"), "MaxRSS": str(line[6] if len(line) > 6 else "NA"), "AveRSS": str(line[7] if len(line) > 7 else "NA")}
+                        detailed_data[name] = extra_data
+                line = lines[0].strip().split()
+                submit = int(mktime(datetime.strptime(line[2], "%Y-%m-%dT%H:%M:%S").timetuple()))
+                start = int(mktime(datetime.strptime(line[3], "%Y-%m-%dT%H:%M:%S").timetuple()))
+                finish = int(mktime(datetime.strptime(line[4], "%Y-%m-%dT%H:%M:%S").timetuple()))
+                # ConsumedEnergy looks like e.g. "1.23K": strip the suffix char
+                # and scale by 1000 — assumes a K suffix, TODO confirm units.
+                joules = int(float(str(line[5])[:-1]) * 1000 if len(line[5]) > 0 else 0)
+                return (submit, start, finish, joules, detailed_data)
+            return (0, 0, 0, 0, dict())
+        except Exception as exp:
+            # Best-effort parser: on any malformed output, warn and return zeros.
+            Log.warning(str(exp))
+            return (0, 0, 0, 0, dict())
+
+ def parse_Alljobs_output(self, output, job_id):
+ status = [x.split()[1] for x in output.splitlines()
+ if x.split()[0] == str(job_id)]
if len(status) == 0:
return status
return status[0]
-
-
def get_submitted_job_id(self, outputlines):
if outputlines.find("failed") != -1:
raise Exception(outputlines)
@@ -125,6 +160,7 @@ class SlurmPlatform(ParamikoPlatform):
for output in outputlines.splitlines():
jobs_id.append(int(output.split(' ')[3]))
return jobs_id
+
def jobs_in_queue(self):
dom = parseString('')
jobs_xml = dom.getElementsByTagName("JB_job_number")
@@ -132,31 +168,34 @@ class SlurmPlatform(ParamikoPlatform):
def get_submit_cmd(self, job_script, job, hold=False):
if not hold:
- self._submit_script_file.write(self._submit_cmd + job_script + "\n")
+ self._submit_script_file.write(
+ self._submit_cmd + job_script + "\n")
else:
- self._submit_script_file.write(self._submit_hold_cmd + job_script + "\n" )
-
-
+ self._submit_script_file.write(
+ self._submit_hold_cmd + job_script + "\n")
def get_checkjob_cmd(self, job_id):
return 'sacct -n -X -j {1} -o "State"'.format(self.host, job_id)
def get_checkAlljobs_cmd(self, jobs_id):
return "sacct -n -X -j {1} -o jobid,State".format(self.host, jobs_id)
+
def get_queue_status_cmd(self, job_id):
return 'squeue -j {0} -o %A,%R'.format(job_id)
+ def get_job_energy_cmd(self, job_id):
+ return 'sacct -n -j {0} -o JobId%20,State,Submit,Start,End,ConsumedEnergy,MaxRSS,AveRSS'.format(job_id)
- def parse_queue_reason(self, output,job_id):
- reason =[x.split(',')[1] for x in output.splitlines() if x.split(',')[0] == str(job_id)]
+ def parse_queue_reason(self, output, job_id):
+ reason = [x.split(',')[1] for x in output.splitlines()
+ if x.split(',')[0] == str(job_id)]
if len(reason) > 0:
return reason[0]
return reason
-
@staticmethod
- def wrapper_header(filename, queue, project, wallclock, num_procs, dependency, directives, threads,method="asthreads"):
- if method =='srun':
+ def wrapper_header(filename, queue, project, wallclock, num_procs, dependency, directives, threads, method="asthreads"):
+ if method == 'srun':
language = "#!/bin/bash"
return \
language + """
@@ -181,7 +220,7 @@ class SlurmPlatform(ParamikoPlatform):
else:
language = "#!/usr/bin/env python"
return \
- language+"""
+ language+"""
###############################################################################
# {0}
###############################################################################
@@ -199,13 +238,13 @@ class SlurmPlatform(ParamikoPlatform):
#
###############################################################################
""".format(filename, queue, project, wallclock, num_procs, dependency,
- '\n'.ljust(13).join(str(s) for s in directives),threads)
+ '\n'.ljust(13).join(str(s) for s in directives), threads)
@staticmethod
def allocated_nodes():
return """os.system("scontrol show hostnames $SLURM_JOB_NODELIST > node_list")"""
- def check_file_exists(self,filename):
+ def check_file_exists(self, filename):
if not self.restore_connection():
return False
file_exist = False
@@ -214,11 +253,13 @@ class SlurmPlatform(ParamikoPlatform):
max_retries = 3
while not file_exist and retries < max_retries:
try:
- self._ftpChannel.stat(os.path.join(self.get_files_path(), filename)) # This return IOError if path doesn't exist
+ # This return IOError if path doesn't exist
+ self._ftpChannel.stat(os.path.join(
+ self.get_files_path(), filename))
file_exist = True
except IOError: # File doesn't exist, retry in sleeptime
Log.debug("{2} File still no exists.. waiting {0}s for a new retry ( retries left: {1})", sleeptime,
- max_retries - retries, os.path.join(self.get_files_path(),filename))
+ max_retries - retries, os.path.join(self.get_files_path(), filename))
sleep(sleeptime)
sleeptime = sleeptime + 5
retries = retries + 1
@@ -227,4 +268,4 @@ class SlurmPlatform(ParamikoPlatform):
file_exist = False # won't exist
retries = 999 # no more retries
- return file_exist
\ No newline at end of file
+ return file_exist