diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 1ba0a11884a40d26a55baf2a8620b26af96181ca..bea6bbbb2513adc85c66fcb79a15c84211090d21 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -36,12 +36,12 @@ test_python2:
# - source activate earthdiagnostics3
# - python run_test.py
-report_codacy:
- stage: report
- script:
- - source activate autosubmit2
- - pip install codacy-coverage --upgrade
- - python-codacy-coverage -r test/coverage.xml
+# report_codacy:
+# stage: report
+# script:
+# - source activate autosubmit2
+# - pip install codacy-coverage --upgrade
+# - python-codacy-coverage -r test/coverage.xml
clean:
stage: clean
diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py
index 6f1649e66d51ac78a66ed35741606e94a5bc7778..da652c0deafd7de9a130fa59099154d4dadbbdd2 100644
--- a/autosubmit/autosubmit.py
+++ b/autosubmit/autosubmit.py
@@ -29,6 +29,7 @@ except ImportError:
from job.job_packager import JobPackager
from job.job_exceptions import WrongTemplateException
from platforms.paramiko_submitter import ParamikoSubmitter
+from platforms.platform import Platform
from notifications.notifier import Notifier
from notifications.mail_notifier import MailNotifier
from bscearth.utils.date import date2str
@@ -42,8 +43,9 @@ from job.job_grouping import JobGrouping
from job.job_list_persistence import JobListPersistencePkl
from job.job_list_persistence import JobListPersistenceDb
from job.job_package_persistence import JobPackagePersistence
-from job.job_packages import JobPackageThread
+from job.job_packages import JobPackageThread, JobPackageBase
from job.job_list import JobList
+from job.job import Job
from git.autosubmit_git import AutosubmitGit
from job.job_common import Status
from config.config_parser import ConfigParserFactory
@@ -52,6 +54,7 @@ from config.basicConfig import BasicConfig
import locale
from distutils.util import strtobool
from log.log import Log, AutosubmitError, AutosubmitCritical
+from typing import Set
import log.fd_show
try:
@@ -80,7 +83,9 @@ from collections import defaultdict
from pyparsing import nestedExpr
from history.experiment_status import ExperimentStatus
from history.experiment_history import ExperimentHistory
+from typing import List
import history.utils as HUtils
+import helpers.autosubmit_helper as AutosubmitHelper
"""
Main module for autosubmit. Only contains an interface class to all functionality implemented on autosubmit
"""
@@ -1357,90 +1362,13 @@ class Autosubmit:
"Or with the -v parameter: autosubmit run {2} -v ".format(as_conf.get_version(), Autosubmit.autosubmit_version, expid), 7067)
# Handling starting time
- if start_time:
- Log.info("User provided starting time has been detected.")
- current_time = time.time()
- datetime_now = datetime.datetime.now()
- target_date = parsed_time = None
- try:
- # Trying first parse H:M:S
- parsed_time = datetime.datetime.strptime(
- start_time, "%H:%M:%S")
- target_date = datetime.datetime(datetime_now.year, datetime_now.month,
- datetime_now.day, parsed_time.hour, parsed_time.minute, parsed_time.second)
- except:
- try:
- # Trying second parse y-m-d H:M:S
- target_date = datetime.datetime.strptime(
- start_time, "%Y-%m-%d %H:%M:%S")
- except:
- target_date = None
- Log.critical(
- "The string input provided as the starting time of your experiment must have the format 'H:M:S' or 'yyyy-mm-dd H:M:S'. Your input was '{0}'.".format(start_time))
- return
- # Must be in the future
- if (target_date < datetime.datetime.now()):
- Log.critical("You must provide a valid date into the future. Your input was interpreted as '{0}', which is considered past.\nCurrent time {1}.".format(
- target_date.strftime("%Y-%m-%d %H:%M:%S"), datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
- return
- # Starting waiting sequence
- Log.info("Your experiment will start execution on {0}\n".format(
- target_date.strftime("%Y-%m-%d %H:%M:%S")))
- # Check time every second
- while datetime.datetime.now() < target_date:
- elapsed_time = target_date - datetime.datetime.now()
- sys.stdout.write(
- "\r{0} until execution starts".format(elapsed_time))
- sys.stdout.flush()
- sleep(1)
- # End of handling start ing time block
+ AutosubmitHelper.handle_start_time(start_time)
# Start start after completion trigger block
- if start_after:
- Log.info("User provided expid completion trigger has been detected.")
- # The user tries to be tricky
- if str(start_after) == str(expid):
- Log.info(
- "Hey! What do you think is going to happen? In theory, your experiment will run again after it has been completed. Good luck!")
- # Check if experiment exists. If False or None, it does not exist
- if not check_experiment_exists(start_after):
- return None
- # Historical Database: We use the historical database to retrieve the current progress data of the supplied expid (start_after)
- exp_history = ExperimentHistory(start_after, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR)
- if exp_history.is_header_ready() == False:
- Log.critical("Experiment {0} is running a database version which is not supported by the completion trigger function. An updated DB version is needed.".format(
- start_after))
- return
- Log.info("Autosubmit will start monitoring experiment {0}. When the number of completed jobs plus suspended jobs becomes equal to the total number of jobs of experiment {0}, experiment {1} will start. Querying every 60 seconds. Status format Completed/Queuing/Running/Suspended/Failed.".format(
- start_after, expid))
- while True:
- # Query current run
- current_run = exp_history.manager.get_experiment_run_dc_with_max_id()
- if current_run and current_run.finish > 0 and current_run.total > 0 and current_run.completed + current_run.suspended == current_run.total:
- break
- else:
- sys.stdout.write(
- "\rExperiment {0} ({1} total jobs) status {2}/{3}/{4}/{5}/{6}".format(start_after, current_run.total, current_run.completed, current_run.queuing, current_run.running, current_run.suspended, current_run.failed))
- sys.stdout.flush()
- # Update every 60 seconds
- sleep(60)
- # End of completion trigger block
-
+ AutosubmitHelper.handle_start_after(start_after, expid, BasicConfig)
+
# Handling run_members
- allowed_members = None
-
- if run_members:
- allowed_members = run_members.split()
- rmember = [
- rmember for rmember in allowed_members if rmember not in as_conf.get_member_list()]
- if len(rmember) > 0:
- Log.critical(
- "Some of the members ({0}) in the list of allowed members you supplied do not exist in the current list of members specified in the conf files.\nCurrent list of members: {1}".format(str(rmember), str(as_conf.get_member_list())))
- return
- if len(allowed_members) == 0:
- Log.critical(
- "Not a valid -rm --run_members input: {0}".format(str(run_members)))
- return
+ allowed_members = AutosubmitHelper.get_allowed_members(run_members, as_conf)
# checking if there is a lock file to avoid multiple running on the same expid
try:
@@ -1575,8 +1503,7 @@ class Autosubmit:
job_list.update_list(as_conf, first_time=True)
job_list.save()
- Log.info(
- "Autosubmit is running with v{0}", Autosubmit.autosubmit_version)
+ Log.info("Autosubmit is running with v{0}", Autosubmit.autosubmit_version)
# Before starting main loop, setup historical database tables and main information
Log.debug("Running job data structure")
try:
@@ -1675,10 +1602,8 @@ class Autosubmit:
list_jobid = ""
completed_joblist = []
list_prevStatus = []
- queuing_jobs = job_list.get_in_queue_grouped_id(
- platform)
- Log.debug(
- 'Checking jobs for platform={0}'.format(platform.name))
+ queuing_jobs = job_list.get_in_queue_grouped_id(platform)
+ Log.debug('Checking jobs for platform={0}'.format(platform.name))
for job_id, job in queuing_jobs.items():
# Check Wrappers one-by-one
if job_list.job_package_map and job_id in job_list.job_package_map:
@@ -1778,8 +1703,7 @@ class Autosubmit:
job = platform_jobs[3][j_Indx]
if prev_status != job.update_status(as_conf.get_copy_remote_logs() == 'true'):
# Keeping track of changes
- job_changes_tracker[job.name] = (
- prev_status, job.status)
+ job_changes_tracker[job.name] = (prev_status, job.status)
if as_conf.get_notifications() == 'true':
if Status.VALUE_TO_KEY[job.status] in job.notify_on:
Notifier.notify_status_change(MailNotifier(BasicConfig), expid, job.name,
@@ -2039,8 +1963,8 @@ class Autosubmit:
"Issues while checking the connectivity of platforms.", 7010, issues)
@staticmethod
- def submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence, inspect=False,
- only_wrappers=False, hold=False):
+ def submit_ready_jobs(as_conf, job_list, platforms_to_test, packages_persistence, inspect=False, only_wrappers=False, hold=False):
+ # type: (AutosubmitConfig, JobList, Set[Platform], JobPackagePersistence, bool, bool, bool) -> bool
"""
Gets READY jobs and send them to the platforms if there is available space on the queues
@@ -2074,7 +1998,7 @@ class Autosubmit:
as_conf, platform, job_list, hold=hold).build_packages()
if not inspect:
platform.open_submit_script()
- valid_packages_to_submit = []
+ valid_packages_to_submit = [] # type: List[JobPackageBase]
for package in packages_to_submit:
try:
# If called from inspect command or -cw
@@ -2171,15 +2095,12 @@ class Autosubmit:
try:
can_continue = True
while can_continue and retries > 0:
- cmd = package.jobs[0].platform.get_queue_status_cmd(
- jobs_id[i])
+ cmd = package.jobs[0].platform.get_queue_status_cmd(jobs_id[i])
#Log.debug("FD submit: {0}".format(log.fd_show.fd_table_status_str()))
- package.jobs[0].platform.send_command(
- cmd)
+ package.jobs[0].platform.send_command(cmd)
#Log.debug("FD endsubmit: {0}".format(log.fd_show.fd_table_status_str()))
queue_status = package.jobs[0].platform._ssh_output
- reason = package.jobs[0].platform.parse_queue_reason(
- queue_status, jobs_id[i])
+ reason = package.jobs[0].platform.parse_queue_reason(queue_status, jobs_id[i])
if reason == '(JobHeldAdmin)':
can_continue = False
elif reason == '(JobHeldUser)':
@@ -2190,8 +2111,7 @@ class Autosubmit:
retries = retries - 1
if not can_continue:
#Log.debug("FD hold start: {0}".format(log.fd_show.fd_table_status_str()))
- package.jobs[0].platform.send_command(
- package.jobs[0].platform.cancel_cmd + " {0}".format(jobs_id[i]))
+ package.jobs[0].platform.send_command(package.jobs[0].platform.cancel_cmd + " {0}".format(jobs_id[i]))
i = i + 1
#Log.debug("FD hold on: {0}".format(log.fd_show.fd_table_status_str()))
@@ -2207,7 +2127,7 @@ class Autosubmit:
job.hold = hold
job.id = str(jobs_id[i])
job.status = Status.SUBMITTED
- job.write_submit_time()
+ job.write_submit_time(hold=hold)
i += 1
save = True
if len(failed_packages) > 0:
@@ -4286,6 +4206,7 @@ class Autosubmit:
@staticmethod
def change_status(final, final_status, job, save):
+ # type: (str, int, Job, bool) -> None
"""
Set job status to final
@@ -4293,20 +4214,17 @@ class Autosubmit:
:param final_status:
:param job:
"""
-
- if (job.status == Status.SUBMITTED or job.status == Status.QUEUING or job.status == Status.HELD) and save and (final_status != Status.QUEUING and final_status != Status.HELD and final_status != Status.SUSPENDED):
- job.hold = False
- if job.platform_name is not None and job.platform_name.lower() != "local":
- job.platform.send_command(
- job.platform.cancel_cmd + " " + str(job.id), ignore_log=True)
- elif (job.status == Status.QUEUING or job.status == Status.RUNNING or job.status == Status.SUBMITTED) and save and (final_status == Status.SUSPENDED):
- if job.platform_name is not None and job.platform_name.lower() != "local":
- job.platform.send_command(
- "scontrol hold " + "{0}".format(job.id), ignore_log=True)
- elif (final_status == Status.QUEUING or final_status == Status.RUNNING) and save and (job.status == Status.SUSPENDED):
- if job.platform_name is not None and job.platform_name.lower() != "local":
- job.platform.send_command(
- "scontrol release " + "{0}".format(job.id), ignore_log=True)
+ if save:
+ if job.status in [Status.SUBMITTED, Status.QUEUING, Status.HELD] and final_status not in [Status.QUEUING, Status.HELD, Status.SUSPENDED]:
+ job.hold = False
+ if job.platform_name and job.platform_name.lower() != "local":
+ job.platform.send_command(job.platform.cancel_cmd + " " + str(job.id), ignore_log=True)
+ elif job.status in [Status.QUEUING, Status.RUNNING, Status.SUBMITTED] and final_status == Status.SUSPENDED:
+ if job.platform_name and job.platform_name.lower() != "local":
+ job.platform.send_command("scontrol hold " + "{0}".format(job.id), ignore_log=True)
+ elif final_status in [Status.QUEUING, Status.RUNNING] and (job.status == Status.SUSPENDED):
+ if job.platform_name and job.platform_name.lower() != "local":
+ job.platform.send_command("scontrol release " + "{0}".format(job.id), ignore_log=True)
job.status = final_status
Log.info("CHANGED: job: " + job.name + " status to: " + final)
Log.status("CHANGED: job: " + job.name + " status to: " + final)
diff --git a/autosubmit/database/db_jobdata.py b/autosubmit/database/db_jobdata.py
deleted file mode 100644
index a7e5f89a441f3e6fc6b57e5bcdaf1a9cc55594bd..0000000000000000000000000000000000000000
--- a/autosubmit/database/db_jobdata.py
+++ /dev/null
@@ -1,2192 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright 2015-2020 Earth Sciences Department, BSC-CNS
-
-# This file is part of Autosubmit.
-
-# Autosubmit is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-
-# Autosubmit is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-
-# You should have received a copy of the GNU General Public License
-# along with Autosubmit. If not, see .
-
-import os
-import sys
-import string
-import time
-import pickle
-import textwrap
-import traceback
-import sqlite3
-import copy
-import collections
-from datetime import datetime, timedelta
-from json import dumps, loads
-#from networkx import DiGraph
-from autosubmit.config.basicConfig import BasicConfig
-from autosubmit.job.job_common import Status, parse_output_number
-from autosubmit.job.job_package_persistence import JobPackagePersistence
-from bscearth.utils.date import date2str, parse_date, previous_day, chunk_end_date, chunk_start_date, subs_dates
-from log.log import Log, AutosubmitCritical, AutosubmitError
-
-# VERSION 15 Adds columns MaxRSS, AveRSS, out, err => job_data
-CURRENT_DB_VERSION = 15 # Used to be 10
-EXPERIMENT_HEADER_CHANGES_DB_VERSION = 14
-# Defining RowType standard
-
-
-class RowType:
- NORMAL = 2
- #PACKED = 2
-
-
-class RowStatus:
- INITIAL = 0
- COMPLETED = 1
- PROCESSED = 2
- FAULTY = 3
- CHANGED = 4
-
-
-_debug = False
-JobItem = collections.namedtuple('JobItem', ['id', 'counter', 'job_name', 'created', 'modified', 'submit', 'start', 'finish',
- 'status', 'rowtype', 'ncpus', 'wallclock', 'qos', 'energy', 'date', 'section', 'member', 'chunk', 'last', 'platform', 'job_id', 'extra_data', 'nnodes', 'run_id', 'MaxRSS', 'AveRSS', 'out', 'err', 'rowstatus'])
-
-ExperimentRunItem = collections.namedtuple('ExperimentRunItem', [
- 'run_id', 'created', 'start', 'finish', 'chunk_unit', 'chunk_size', 'completed', 'total', 'failed', 'queuing', 'running', 'submitted', 'suspended', 'metadata'])
-
-ExperimentRow = collections.namedtuple(
- 'ExperimentRow', ['exp_id', 'expid', 'status', 'seconds'])
-
-
-class ExperimentRun():
- """
- Class that represents an experiment run
- """
-
- def __init__(self, run_id, created=None, start=0, finish=0, chunk_unit="NA", chunk_size=0, completed=0, total=0, failed=0, queuing=0, running=0, submitted=0, suspended=0, metadata=""):
- self.run_id = run_id
- self.created = created if created else datetime.today().strftime('%Y-%m-%d-%H:%M:%S')
- self.start = start
- self.finish = finish
- self.chunk_unit = chunk_unit
- self.chunk_size = chunk_size
- self.submitted = submitted
- self.queuing = queuing
- self.running = running
- self.completed = completed
- self.failed = failed
- self.total = total
- self.suspended = suspended
- self.metadata = metadata
-
-
-class JobStepExtraData():
- """
- Class that manages the extra_data content
- The constructor should receive a dict object
- """
-
- def __init__(self, key, dict_data):
- self.key = key
- self.ncpus = dict_data["ncpus"] if dict_data and "ncpus" in dict_data.keys(
- ) else 0
- self.nnodes = dict_data["nnodes"] if dict_data and "nnodes" in dict_data.keys(
- ) else 0
- self.submit = int(time.mktime(datetime.strptime(dict_data["submit"], "%Y-%m-%dT%H:%M:%S").timetuple())) if dict_data and "submit" in dict_data.keys(
- ) else 0
- self.start = int(time.mktime(datetime.strptime(dict_data["start"], "%Y-%m-%dT%H:%M:%S").timetuple())) if dict_data and "start" in dict_data.keys(
- ) else 0
- self.finish = int(time.mktime(datetime.strptime(dict_data["finish"], "%Y-%m-%dT%H:%M:%S").timetuple())) if dict_data and "finish" in dict_data.keys(
- ) and dict_data["finish"] != "Unknown" else 0
- self.energy = parse_output_number(dict_data["energy"]) if dict_data and "energy" in dict_data.keys(
- ) else 0
- self.maxRSS = dict_data["MaxRSS"] if dict_data and "MaxRSS" in dict_data.keys(
- ) else 0
- self.aveRSS = dict_data["AveRSS"] if dict_data and "AveRSS" in dict_data.keys(
- ) else 0
-
-
-class JobData(object):
- """Job Data object
- """
-
- def __init__(self, _id, counter=1, job_name="None", created=None, modified=None, submit=0, start=0, finish=0, status="UNKNOWN", rowtype=0, ncpus=0, wallclock="00:00", qos="debug", energy=0, date="", section="", member="", chunk=0, last=1, platform="NA", job_id=0, extra_data=dict(), nnodes=0, run_id=None, MaxRSS=0.0, AveRSS=0.0, out="", err="", rowstatus=RowStatus.INITIAL):
- """
- """
- self._id = _id
- self.counter = counter
- self.job_name = job_name
- self.created = created if created else datetime.today().strftime('%Y-%m-%d-%H:%M:%S')
- self.modified = modified if modified else datetime.today().strftime('%Y-%m-%d-%H:%M:%S')
- self._submit = int(submit)
- self._start = int(start)
- self._finish = int(finish)
- self.status = status
- self.rowtype = rowtype
- self.ncpus = ncpus
- self.wallclock = wallclock
- self.qos = qos if qos else "debug"
- self._energy = energy if energy else 0
- self.date = date if date else ""
- self.section = section if section else ""
- self.member = member if member else ""
- self.chunk = chunk if chunk else 0
- self.last = last
- self._platform = platform if platform and len(
- platform) > 0 else "NA"
- self.job_id = job_id if job_id else 0
- try:
- self.extra_data = loads(extra_data)
- except Exception as exp:
- self.extra_data = ""
- pass
- self.nnodes = nnodes
- self.run_id = run_id
- self.require_update = False
- # DB VERSION 15 attributes
- self.MaxRSS = MaxRSS
- self.AveRSS = AveRSS
- self.out = out
- self.err = err
- self.rowstatus = rowstatus
-
- @property
- def submit(self):
- """
- Returns the submit time timestamp as an integer.
- """
- return int(self._submit)
-
- @property
- def start(self):
- """
- Returns the start time timestamp as an integer.
- """
- return int(self._start)
-
- @property
- def finish(self):
- """
- Returns the finish time timestamp as an integer.
- """
- return int(self._finish)
-
- @property
- def platform(self):
- """
- Returns the name of the platform, "NA" if no platform is set.
- """
- return self._platform
-
- @property
- def energy(self):
- """
- Returns the energy spent value (JOULES) as an integer.
- """
- return self._energy
-
- @submit.setter
- def submit(self, submit):
- self._submit = int(submit)
-
- @start.setter
- def start(self, start):
- self._start = int(start)
-
- @finish.setter
- def finish(self, finish):
- self._finish = int(finish)
-
- @platform.setter
- def platform(self, platform):
- self._platform = platform if platform and len(platform) > 0 else "NA"
-
- @energy.setter
- def energy(self, energy):
- """
- Set the energy value. If it is different than the current energy value, a update flag will be activated.
- """
- if energy > 0:
- if (energy != self._energy):
- # print("Updating energy to {0} from {1}.".format(
- # energy, self._energy))
- self.require_update = True
- self._energy = energy if energy else 0
-
- def delta_queue_time(self):
- """
- Returns queuing time as a timedelta object.
- """
- return str(timedelta(seconds=self.queuing_time()))
-
- def delta_running_time(self):
- """
- Returns running time as a timedelta object.
- """
- return str(timedelta(seconds=self.running_time()))
-
- def submit_datetime(self):
- """
- Return the submit time as a datetime object, None if submit time equal 0.
- """
- if self.submit > 0:
- return datetime.fromtimestamp(self.submit)
- return None
-
- def start_datetime(self):
- """
- Return the start time as a datetime object, None if start time equal 0.
- """
- if self.start > 0:
- return datetime.fromtimestamp(self.start)
- return None
-
- def finish_datetime(self):
- """
- Return the finish time as a datetime object, None if start time equal 0.
- """
- if self.finish > 0:
- return datetime.fromtimestamp(self.finish)
- return None
-
- def submit_datetime_str(self):
- """
- Returns the submit datetime as a string with format %Y-%m-%d-%H:%M:%S
- """
- o_datetime = self.submit_datetime()
- if o_datetime:
- return o_datetime.strftime('%Y-%m-%d-%H:%M:%S')
- else:
- return None
-
- def start_datetime_str(self):
- """
- Returns the start datetime as a string with format %Y-%m-%d-%H:%M:%S
- """
- o_datetime = self.start_datetime()
- if o_datetime:
- return o_datetime.strftime('%Y-%m-%d-%H:%M:%S')
- else:
- return None
-
- def finish_datetime_str(self):
- """
- Returns the finish datetime as a string with format %Y-%m-%d-%H:%M:%S
- """
- o_datetime = self.finish_datetime()
- if o_datetime:
- return o_datetime.strftime('%Y-%m-%d-%H:%M:%S')
- else:
- return None
-
- def running_time(self):
- """
- Calculates and returns the running time of the job, in seconds.
-
- :return: Running time in seconds.
- :rtype: int
- """
- if self.status in ["RUNNING", "COMPLETED", "FAILED"]:
- # print("Finish: {0}".format(self.finish))
- run = int((self.finish if self.finish >
- 0 else time.time()) - self.start)
- # print("RUN {0}".format(run))
- if run > 0:
- return run
- return 0
-
- def queuing_time(self):
- """
- Calculates and returns the queuing time of the job, in seconds.
-
- :return: Queueing time in seconds.
- :rtype: int
- """
- if self.status in ["SUBMITTED", "QUEUING", "RUNNING", "COMPLETED", "HELD", "PREPARED", "FAILED", "SKIPPED"]:
- queue = int((self.start if self.start >
- 0 else time.time()) - self.submit)
- if queue > 0:
- return queue
- return 0
-
- def get_hdata(self):
- """
- Get the job data as an ordered dict into a JSON object.
- :return: Job data as an ordered dict into a JSON object.
- :rtype: JSON object.
- """
- hdata = collections.OrderedDict()
- hdata["name"] = self.job_name
- hdata["date"] = self.date
- hdata["section"] = self.section
- hdata["member"] = self.member
- hdata["chunk"] = self.chunk
- hdata["submit"] = self.submit_datetime_str()
- hdata["start"] = self.start_datetime_str()
- hdata["finish"] = self.finish_datetime_str()
- hdata["queue_time"] = self.delta_queue_time()
- hdata["run_time"] = self.delta_running_time()
- hdata["wallclock"] = self.wallclock
- hdata["ncpus"] = self.ncpus
- hdata["nnodes"] = self.nnodes
- hdata["energy"] = self.energy
- hdata["platform"] = self.platform
- hdata["MaxRSS"] = self.MaxRSS
- hdata["AveRSS"] = self.AveRSS
- return dumps(hdata)
-
-
-class JobDataList():
- """Object that stores the list of jobs to be handled.
- """
-
- def __init__(self, expid):
- self.jobdata_list = list()
- self.expid = expid
-
- def add_jobdata(self, jobdata):
- self.jobdata_list.append(jobdata)
-
- def size(self):
- return len(self.jobdata_list)
-
-
-class MainDataBase():
- def __init__(self, expid):
- self.expid = expid
- self.conn = None
- self.conn_ec = None
- self.create_table_query = None
- self.create_table_header_query = None
- self.version_schema_changes = []
-
- def create_connection(self, db_file):
- """
- Create a database connection to the SQLite database specified by db_file.
- :param db_file: database file name
- :return: Connection object or None
- """
- try:
- conn = sqlite3.connect(db_file)
- return conn
- except:
- return None
-
- def create_table(self, statement):
- """ create a table from the create_table_sql statement
- :param conn: Connection object
- :param create_table_sql: a CREATE TABLE statement
- :return:
- """
- try:
- if self.conn:
- c = self.conn.cursor()
- c.execute(statement)
- self.conn.commit()
- else:
- raise IOError("Not a valid connection")
- except IOError as exp:
- Log.warning(exp)
- return None
- except sqlite3.Error as e:
- if _debug == True:
- Log.info(traceback.format_exc())
- Log.debug(str(type(e).__name__))
- Log.warning("Error on create table . create_table")
- return None
-
- def create_index(self):
- """ Creates index from statement defined in child class
- """
- try:
- if self.conn:
- c = self.conn.cursor()
- c.execute(self.create_index_query)
- self.conn.commit()
- else:
- raise IOError("Not a valid connection")
- except IOError as exp:
- Log.warning(exp)
- return None
- except sqlite3.Error as e:
- if _debug == True:
- Log.info(traceback.format_exc())
- Log.debug(str(type(e).__name__))
- Log.warning("Error on create index . create_index")
- return None
-
- def update_table_schema(self):
- """
- Updates the table schema from a list of changes.
- """
- try:
- if self.conn:
- c = self.conn.cursor()
- for item in self.version_schema_changes:
- try:
- c.execute(item)
- except sqlite3.Error as e:
- # Always useful
- # print(traceback.format_exc())
- if _debug == True:
- Log.info(str(type(e).__name__))
- Log.debug(str(type(e).__name__))
- Log.warning(
- "Error on updating table schema statement. It is safe to ignore this message.")
- pass
- self.conn.commit()
- else:
- raise IOError("Not a valid connection")
- except IOError as exp:
- Log.warning(exp)
- return None
- except Exception as exp:
- if _debug == True:
- Log.info(traceback.format_exc())
- Log.debug(str(exp))
- Log.warning(
- "Error on updating table schema . update_table_schema.")
- return None
-
-
-class ExperimentStatus(MainDataBase):
- def __init__(self, expid):
- MainDataBase.__init__(self, expid)
- BasicConfig.read()
- self.DB_FILE_AS_TIMES = os.path.join(
- BasicConfig.LOCAL_ROOT_DIR, "as_times.db")
- self.DB_FILE_ECEARTH = os.path.join(
- BasicConfig.LOCAL_ROOT_DIR, "ecearth.db")
- self.PKL_FILE_PATH = os.path.join(
- BasicConfig.LOCAL_ROOT_DIR, str(self.expid), "pkl", "job_list_" + str(self.expid) + ".pkl")
- self.create_table_query = textwrap.dedent(
- '''CREATE TABLE
- IF NOT EXISTS experiment_status (
- exp_id integer PRIMARY KEY,
- name text NOT NULL,
- status text NOT NULL,
- seconds_diff integer NOT NULL,
- modified text NOT NULL,
- FOREIGN KEY (exp_id) REFERENCES experiment (id)
- );''')
- try:
- if not os.path.exists(self.DB_FILE_AS_TIMES):
- open(self.DB_FILE_AS_TIMES, "w")
- self.conn = self.create_connection(self.DB_FILE_AS_TIMES)
- self.create_table()
- else:
- self.conn = self.create_connection(self.DB_FILE_AS_TIMES)
-
- if os.path.exists(self.DB_FILE_ECEARTH):
- self.conn_ec = self.create_connection(self.DB_FILE_ECEARTH)
-
- self.current_table = self.prepare_status_db()
- self.current_row = next(
- (exp for exp in self.current_table if exp.expid == self.expid), None) if len(self.current_table) > 0 else None
- except Exception as exp:
- Log.debug(
- "Historical database error on experiment status constructor: {}.".format(str(exp)))
- pass
-
- def print_current_table(self):
- for experiment in self.current_table:
- #experiment = ExperimentRow(k, *v)
- print(experiment.expid)
- print(experiment.exp_id)
- print(experiment.status)
- print(experiment.seconds)
- print("\n")
- if self.current_row:
- print("Current Row:\n\t" + self.current_row.expid + "\n\t" +
- str(self.current_row.exp_id) + "\n\t" + self.current_row.status)
-
- def prepare_status_db(self):
- """
- Returns the contents of the status table in an ordered way
- :return: Map from experiment name to (Id of experiment, Status, Seconds)
- :rtype: Dictionary Key: String, Value: Integer, String, Integer
- """
- current_table = self._get_exp_status()
- result = list()
- if current_table:
- for item in current_table:
- result.append(ExperimentRow(*item))
- return result
-
- def _get_id_db(self):
- """
- Get exp_id of the experiment (different than the experiment name).
- :param conn: ecearth.db connection
- :type conn: sqlite3 connection
- :param expid: Experiment name
- :type expid: String
- :return: Id of the experiment
- :rtype: Integer or None
- """
- try:
- if self.conn_ec:
- cur = self.conn_ec.cursor()
- # Always use tuple
- cur.execute(
- "SELECT id FROM experiment WHERE name=?", (self.expid, ))
- row = cur.fetchone()
- return int(row[0])
- return None
- except Exception as exp:
- Log.debug("From _get_id_db: {0}".format(str(exp)))
- Log.warning(
- "Autosubmit couldn't retrieve experiment database information. _get_id_db")
- return None
-
- def _get_exp_status(self):
- """
- Get all registers from experiment_status.\n
- :return: row content: exp_id, name, status, seconds_diff
- :rtype: 4-tuple (int, str, str, int)
- """
- try:
- if self.conn:
- #conn = create_connection(DB_FILE_AS_TIMES)
- self.conn.text_factory = str
- cur = self.conn.cursor()
- cur.execute(
- "SELECT exp_id, name, status, seconds_diff FROM experiment_status")
- rows = cur.fetchall()
- return rows
- return None
- except Exception as exp:
- Log.debug("From _get_exp_status: {0}".format(str(exp)))
- Log.warning(
- "Autosubmit couldn't retrieve experiment status database information. _get_exp_status")
- return None
-
- def test_running(self, time_condition=600):
- if (os.path.exists(self.PKL_FILE_PATH)):
- current_stat = os.stat(self.PKL_FILE_PATH)
- timest = int(current_stat.st_mtime)
- timesys = int(time.time())
- time_diff = int(timesys - timest)
- if (time_diff < time_condition):
- return True
- else:
- return False
-
- def update_running_status(self, status="RUNNING"):
- if self.current_row:
- # Row exists
- self._update_exp_status(status)
- else:
- # New Row
- self._create_exp_status()
-
- def _create_exp_status(self):
- """
- Create experiment status
- :param conn:
- :param details:
- :return:
- """
- try:
- if self.conn and self.conn_ec:
- exp_id = self._get_id_db()
- if exp_id:
- # print("exp_id {0}".format(exp_id))
- # conn = create_connection(DB_FILE_AS_TIMES)
- creation_date = datetime.today().strftime('%Y-%m-%d-%H:%M:%S')
- sql = ''' INSERT INTO experiment_status(exp_id, name, status, seconds_diff, modified) VALUES(?,?,?,?,?) '''
- # print(row_content)
- cur = self.conn.cursor()
- cur.execute(sql, (exp_id,
- self.expid, "RUNNING", 0, creation_date))
- # print(cur)
- self.conn.commit()
- return cur.lastrowid
- return None
- except sqlite3.Error as e:
- Log.debug("From _create_exp_status: {0}".format(
- str(type(e).__name__)))
- Log.warning(
- "Autosubmit couldn't insert information into status database. _create_exp_status")
- pass
-
- def _update_exp_status(self, status="RUNNING"):
- """
- Update existing experiment_status.
- :param expid: Experiment name
- :type expid: String
- :param status: Experiment status
- :type status: String
- :param seconds_diff: Indicator of how long it has been active since the last time it was checked
- :type seconds_diff: Integer
- :return: Id of register
- :rtype: Integer
- """
- try:
- if self.conn and self.current_row:
- # conn = create_connection(DB_FILE_AS_TIMES)
- modified_date = datetime.today().strftime('%Y-%m-%d-%H:%M:%S')
- sql = ''' UPDATE experiment_status SET status = ?, seconds_diff = ?, modified = ? WHERE name = ? '''
- cur = self.conn.cursor()
- cur.execute(sql, (status, 0, modified_date,
- self.current_row.expid))
- self.conn.commit()
- return cur.lastrowid
- return None
- except sqlite3.Error as e:
- Log.warning(
- "Error while trying to update {0} in experiment_status.".format(str(self.expid)))
- Log.debug("From _update_exp_status: {0}".format(
- traceback.format_exc()))
- # Log.warning("Error on Update: " + str(type(e).__name__))
- return None
-
-
-def check_if_database_exists(expid):
- """
- Tests if historical database exists.
- :param expid: Experiment name (identifier)
- :type expid: str
- :return: True if exists, False otherwise.
- :rtype: bool
- """
- BasicConfig.read()
- folder_path = BasicConfig.JOBDATA_DIR
- database_path = os.path.join(folder_path, "job_data_" + str(expid) + ".db")
- if os.path.exists(database_path):
- return True
- else:
- return False
-
-
-class JobDataStructure(MainDataBase):
-
- def __init__(self, expid, check_only=False):
- """
- Initializes the object based on the unique identifier of the experiment.
- """
- MainDataBase.__init__(self, expid)
- BasicConfig.read()
- self.expid = expid
- self.basic_conf = BasicConfig
- self.folder_path = BasicConfig.JOBDATA_DIR
- self.database_path = os.path.join(
- self.folder_path, "job_data_" + str(expid) + ".db")
- #self.conn = None
- self.jobdata_list = JobDataList(self.expid)
- # job_data changes
- self.version_schema_changes.append(
- "ALTER TABLE job_data ADD COLUMN nnodes INTEGER NOT NULL DEFAULT 0")
- self.version_schema_changes.append(
- "ALTER TABLE job_data ADD COLUMN run_id INTEGER")
- # DB VERSION 15 changes
- self.version_schema_changes.append(
- "ALTER TABLE job_data ADD COLUMN MaxRSS REAL NOT NULL DEFAULT 0.0")
- self.version_schema_changes.append(
- "ALTER TABLE job_data ADD COLUMN AveRSS REAL NOT NULL DEFAULT 0.0")
- self.version_schema_changes.append(
- "ALTER TABLE job_data ADD COLUMN out TEXT NOT NULL DEFAULT ''")
- self.version_schema_changes.append(
- "ALTER TABLE job_data ADD COLUMN err TEXT NOT NULL DEFAULT ''")
- self.version_schema_changes.append(
- "ALTER TABLE job_data ADD COLUMN rowstatus INTEGER NOT NULL DEFAULT 0")
- # experiment_run changes
- self.version_schema_changes.append(
- "ALTER TABLE experiment_run ADD COLUMN suspended INTEGER NOT NULL DEFAULT 0")
- self.version_schema_changes.append(
- "ALTER TABLE experiment_run ADD COLUMN metadata TEXT")
- # We use rowtype to identify a packed job
- self.create_table_query = textwrap.dedent(
- '''CREATE TABLE
- IF NOT EXISTS job_data (
- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
- counter INTEGER NOT NULL,
- job_name TEXT NOT NULL,
- created TEXT NOT NULL,
- modified TEXT NOT NULL,
- submit INTEGER NOT NULL,
- start INTEGER NOT NULL,
- finish INTEGER NOT NULL,
- status TEXT NOT NULL,
- rowtype INTEGER NOT NULL,
- ncpus INTEGER NOT NULL,
- wallclock TEXT NOT NULL,
- qos TEXT NOT NULL,
- energy INTEGER NOT NULL,
- date TEXT NOT NULL,
- section TEXT NOT NULL,
- member TEXT NOT NULL,
- chunk INTEGER NOT NULL,
- last INTEGER NOT NULL,
- platform TEXT NOT NULL,
- job_id INTEGER NOT NULL,
- extra_data TEXT NOT NULL,
- nnodes INTEGER NOT NULL DEFAULT 0,
- run_id INTEGER,
- MaxRSS REAL NOT NULL DEFAULT 0.0,
- AveRSS REAL NOT NULL DEFAULT 0.0,
- out TEXT NOT NULL,
- err TEXT NOT NULL,
- rowstatus INTEGER NOT NULL DEFAULT 0,
- UNIQUE(counter,job_name)
- );
- ''')
-
- # Creating the header table
- self.create_table_header_query = textwrap.dedent(
- '''CREATE TABLE
- IF NOT EXISTS experiment_run (
- run_id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
- created TEXT NOT NULL,
- start INTEGER NOT NULL,
- finish INTEGER,
- chunk_unit TEXT NOT NULL,
- chunk_size INTEGER NOT NULL,
- completed INTEGER NOT NULL,
- total INTEGER NOT NULL,
- failed INTEGER NOT NULL,
- queuing INTEGER NOT NULL,
- running INTEGER NOT NULL,
- submitted INTEGER NOT NULL,
- suspended INTEGER NOT NULL DEFAULT 0,
- metadata TEXT
- );
- ''')
-
- # Index creation is in a different statement
- self.create_index_query = textwrap.dedent('''
- CREATE INDEX IF NOT EXISTS ID_JOB_NAME ON job_data(job_name);
- ''')
-
- self.database_exists = True
- self.current_run_id = None
- self.is_original_run_id = True
- self.db_version = 0
- try:
- if check_only == False:
- if not os.path.exists(self.database_path):
- open(self.database_path, "w")
- self.conn = self.create_connection(self.database_path)
- self.create_table(self.create_table_header_query)
- self.create_table(self.create_table_query)
- self.create_index()
- if self._set_pragma_version(CURRENT_DB_VERSION):
- Log.info("Database version set.")
- self.db_version = CURRENT_DB_VERSION
- else:
- self.conn = self.create_connection(self.database_path)
- self.db_version = self._select_pragma_version()
- if self.db_version > CURRENT_DB_VERSION:
- Log.info(
- "Your version of Autosubmit implements an older database version. This might result in unexpected behavior in the job historical database.")
- if self.db_version < CURRENT_DB_VERSION:
- # Update to current version
- Log.info("Database schema needs update.")
- self.update_table_schema()
- self.create_index()
- self.create_table(self.create_table_header_query)
- if self._set_pragma_version(CURRENT_DB_VERSION):
- Log.info("Database version set to {0}.".format(
- CURRENT_DB_VERSION))
- self.db_version = CURRENT_DB_VERSION
- self.current_run_id = self.get_current_run_id()
- else:
- if not os.path.exists(self.database_path):
- self.database_exists = False
- else:
- self.conn = self.create_connection(self.database_path)
- self.db_version = self._select_pragma_version()
-
- except IOError as e:
- Log.debug(
- "Historical database I/O error on jobdatastructure constructor: {}".format(str(e)))
- pass
- # raise AutosubmitCritical("Historic Database route {0} is not accesible".format(
- # BasicConfig.JOBDATA_DIR), 7067, e.message)
- except Exception as e:
- Log.debug(
- "Historical database error on jobdatastructure constructor: {}".format(str(e)))
- pass
- # raise AutosubmitCritical(
- # "Historic Database {0} due an database error".format(), 7067, e.message)
-
- def is_header_ready_db_version(self):
- return True if self.db_version >= EXPERIMENT_HEADER_CHANGES_DB_VERSION else False
-
- def determine_rowtype(self, code):
- """
- Determines rowtype based on job information.
-
- :param packed: True if job belongs to wrapper, False otherwise
- :type packed: boolean
- :return: rowtype, >2 packed (wrapper code), 2 normal
- :rtype: int
- """
- if code:
- return code
- else:
- return RowType.NORMAL
-
- def get_current_run_id(self):
- """
- Get the Id of the current Experiment run.
-
- :return: run_id
- :rtype: int
- """
- current_run = self.get_max_id_experiment_run()
- if current_run:
- self.is_original_run_id = True
- return current_run.run_id
- else:
- new_run = ExperimentRun(0)
- self.is_original_run_id = False
- return self._insert_experiment_run(new_run)
-
- def process_status_changes(self, tracking_dictionary, job_list=None, chunk_unit="NA", chunk_size=0, check_run=False, current_config="", is_setstatus=False):
- """
- Finds and updates the changes of status of the jobs in the current job list.
- Creates new run if:
- - Greater or equal to 90% of completed date-member jobs are going to be changed.
- - The number of total jobs is not equal than the number of total jobs in the experiment header of the database.
-
- :param tracking_dictionary: map of changes
- :type tracking_dictionary: dict()
- :param job_list: current list of jobs
- :type job_list: list of Job objects
- :param chunk_unit: chunk unit from config
- :type chunk_unit: str
- :param chunk_size: chunk size from config
- :type chunk_size: int
- :param check_run: true if the experiment run should be checked
- :type check_run: bool
- :param current_config: current configuration of experiment
- :type current_config: JSON
- :return: None
- :rtype: None
- """
- try:
- current_run = self.get_max_id_experiment_run()
- if current_run:
- if tracking_dictionary is not None and bool(tracking_dictionary) == True:
- # Changes exist
- if job_list and check_run == True:
- current_date_member_completed_count = sum(
- 1 for job in job_list if job.date is not None and job.member is not None and job.status == Status.COMPLETED)
- if len(tracking_dictionary.keys()) >= int(current_date_member_completed_count * 0.9):
- # If setstatus changes more than 90% of date-member completed jobs, it's a new run
- # Update status of individual jobs
- if is_setstatus == True:
- self.update_jobs_from_change_status(
- tracking_dictionary)
- # Must create a new experiment run
- Log.debug(
- "Since a significant amount of jobs have changed status. Autosubmit will consider a new run of the same experiment.")
- self.validate_current_run(
- job_list, chunk_unit, chunk_size, must_create=True, only_update=False, current_config=current_config)
- return None
- if job_list:
- if len(tracking_dictionary.items()) > 0:
- total_number_jobs = len(job_list)
- # Changes exist
- completed_count = sum(
- 1 for job in job_list if job.status == Status.COMPLETED)
- failed_count = sum(
- 1 for job in job_list if job.status == Status.FAILED)
- queue_count = sum(
- 1 for job in job_list if job.status == Status.QUEUING)
- submit_count = sum(
- 1 for job in job_list if job.status == Status.SUBMITTED)
- running_count = sum(
- 1 for job in job_list if job.status == Status.RUNNING)
- suspended_count = sum(
- 1 for job in job_list if job.status == Status.SUSPENDED)
- current_run.completed = completed_count
- current_run.failed = failed_count
- current_run.queuing = queue_count
- current_run.submitted = submit_count
- current_run.running = running_count
- current_run.suspended = suspended_count
- # Update status of individual jobs
- if is_setstatus == True:
- self.update_jobs_from_change_status(
- tracking_dictionary)
- # Check if we are still dealing with the right number of jobs
- if current_run.total != total_number_jobs:
- # must_create = True, signals that a new run should be created
- self.validate_current_run(job_list, current_run.chunk_unit, current_run.chunk_size,
- must_create=True, only_update=False, current_config=current_run.metadata)
- else:
- self._update_experiment_run(current_run)
- return None
- except Exception as exp:
- if _debug == True:
- Log.info(traceback.format_exc())
- Log.debug(traceback.format_exc())
- Log.warning(
- "Autosubmit couldn't process status changes validate_current_run {0}".format(str(exp)))
- return None
-
- def validate_current_run(self, job_list, chunk_unit="NA", chunk_size=0, must_create=False, only_update=False, current_config=""):
- """
- Checks current run and created a new run or updates the existing run if necessary. Returns the current id of the run.
- A new run is created when:
- - A new run is always generated on autosubmit create.
- - If there is no current run in the database when experiment run starts, it creates a new experiment run register.
- - If the must_create Flag is true.
- - If the total number of jobs in the job_list object is different than the total number of jobs in the current state and only_update is False.
-
- :param job_list: list of jobs in experiment
- :type job_list: list of Job objects
- :param chunk_unit: Chunk unit in the settings of the experiment
- :type chunk_unit: str
- :param chunk_size: Chunk size in the settings of the experiment
- :type chunk_size: str
- :param must_create: True if a new experiment run register must be created
- :type must_create: bool
- :param only_update: True if the process should only update an existing register
- :type only_update: bool
- :param current_config: current configuration of the experiment as a JSON object
- :type current_config: JSON object
- :return: Id of the current run, None if error
- :type: int
- """
- try:
- if not job_list:
- raise Exception(
- "Historical database: Autosubmit couldn't find the job_list. validate_current_run.")
- current_run = self.get_max_id_experiment_run()
- current_total = len(job_list)
- completed_count = sum(
- 1 for job in job_list if job.status == Status.COMPLETED)
- failed_count = sum(
- 1 for job in job_list if job.status == Status.FAILED)
- queue_count = sum(
- 1 for job in job_list if job.status == Status.QUEUING)
- submit_count = sum(
- 1 for job in job_list if job.status == Status.SUBMITTED)
- running_count = sum(
- 1 for job in job_list if job.status == Status.RUNNING)
- suspended_count = sum(
- 1 for job in job_list if job.status == Status.SUSPENDED)
-
- if not current_run or must_create == True:
- # If there is not current run register, or it must be created
- new_run = ExperimentRun(0, None, 0, 0, chunk_unit, chunk_size, completed_count,
- current_total, failed_count, queue_count, running_count, submit_count, suspended_count, current_config)
- self.current_run_id = self._insert_experiment_run(new_run)
- self.is_original_run_id = False
- return self.current_run_id
- else:
- if current_run.total != current_total and only_update == False:
- # There is a difference in total jobs, create new experiment run
- new_run = ExperimentRun(0, None, 0, 0, chunk_unit, chunk_size, completed_count,
- current_total, failed_count, queue_count, running_count, submit_count, suspended_count, current_config)
- self.current_run_id = self._insert_experiment_run(new_run)
- self.is_original_run_id = False
- return self.current_run_id
- else:
- # There is no difference in total jobs or it must only update
- current_run.completed = completed_count
- current_run.failed = failed_count
- current_run.queuing = queue_count
- current_run.submitted = submit_count
- current_run.running = running_count
- current_run.suspended = suspended_count
- current_run.total = current_total if only_update == True else current_run.total
- current_run.finish = 0
- self._update_experiment_run(current_run)
- self.current_run_id = current_run.run_id
- self.is_original_run_id = True
- return self.current_run_id
- except Exception as exp:
- if _debug == True:
- Log.info(traceback.format_exc())
- Log.debug(traceback.format_exc())
- Log.warning(
- "Historical database error: Autosubmit couldn't insert a new experiment run register. validate_current_run {0}".format(str(exp)))
- return None
-
- def update_finish_time(self):
- """
- Update finish time of experiment. Updates the current_run_id attribute in this object.
- """
- try:
- current_run = self.get_max_id_experiment_run()
- if current_run:
- current_run.finish = int(time.time())
- self._update_experiment_run(current_run)
- self.current_run_id = current_run.run_id
- self.is_original_run_id = True
- except Exception as exp:
- Log.debug(str(exp))
- pass
-
- def get_job_package_code(self, current_job_name):
- """
- Finds the package code and retrieves it. None if no package.
-
- :param BasicConfig: Basic configuration
- :type BasicConfig: Configuration Object
- :param expid: Experiment Id
- :type expid: String
- :param current_job_name: Name of job
- :type current_jobs: string
- :return: package code, None if not found
- :rtype: int or None
- """
- packages = packages_plus = None
- count_packages = count_packages_plus = 0
- try:
- packages = JobPackagePersistence(os.path.join(self.basic_conf.LOCAL_ROOT_DIR, self.expid, "pkl"),
- "job_packages_" + self.expid).load(wrapper=True)
- count_packages = len(packages)
- except Exception as ex:
- Log.debug(
- "Wrapper table not found, trying packages. JobDataStructure.retrieve_packages")
- packages = None
-
- try:
- packages_plus = JobPackagePersistence(os.path.join(self.basic_conf.LOCAL_ROOT_DIR, self.expid, "pkl"),
- "job_packages_" + self.expid).load(wrapper=False)
- count_packages_plus = len(packages_plus)
- except Exception as ex:
- Log.debug(
- "Wrapper table not found, trying packages. JobDataStructure.retrieve_packages")
- packages_plus = None
-
- if (packages or packages_plus):
- packages_source = packages if count_packages > count_packages_plus else packages_plus
- try:
- for exp, package_name, job_name in packages_source:
- if current_job_name == job_name:
- code = int(package_name.split("_")[2])
- return code
- except Exception as ex:
- Log.warning(
- "Package parse error. JobDataStructure.retrieve_packages")
- Log.debug(traceback.format_exc())
- return None
- return None
-
- def write_submit_time(self, job_name, submit=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0, packed=False, wrapper_queue=None):
- """
- Writes submit time of job into the historical database.
-
- :param job_name: Name of the job
- :type job_name: str
- :param submit: Submit time as timestamp
- :type submit: int
- :param status: Status of the job
- :type status: str
- :param ncpus: Number of processors requested by the job
- :type ncpus: int
- :param wallclock: Wallclock requested by the job
- :type wallclock: str
- :param qos: Queue requested by the job
- :type qos: str
- :param date: date of the job (from experiment config)
- :type date: str
- :param member: member of the job (from experiment config)
- :type member: str
- :param section: section of the job (from experiment config)
- :type section: str
- :param chunk: chunk of the job (from experiment config)
- :type chunk: int
- :param platform: Name of the target platform
- :type platform: str
- :param job_id: JobId in the platform
- :type job_id: int
- :param packed: True if job belongs to a wrapper
- :type packed: bool
- :param wrapper_queue: Name of the queue requested by the wrapper
- :type wrapper_queue: str
- :return: True if succesfully saved
- :rtype: True or None
- """
- try:
- job_data = self.get_job_data(job_name)
- current_counter = 1
- max_counter = self._get_maxcounter_jobdata()
- if job_data and len(job_data) > 0:
- job_max_counter = max(job.counter for job in job_data)
- current_last = [
- job for job in job_data if job.counter == job_max_counter]
- for current in current_last:
- # Deactivate current last for this job
- current.modified = datetime.today().strftime('%Y-%m-%d-%H:%M:%S')
- up_id = self._deactivate_current_last(current)
- # Finding current counter
- current_counter = (
- job_max_counter + 1) if job_max_counter >= max_counter else max_counter
- else:
- current_counter = max_counter
- package_code = self.get_job_package_code(job_name)
- queue_name = wrapper_queue if (
- package_code and package_code > 2 and wrapper_queue is not None) else qos
- # Insert new last
- rowid = self._insert_job_data(JobData(
- 0, current_counter, job_name, None, None, submit, 0, 0, status, self.determine_rowtype(package_code), ncpus, wallclock, queue_name, 0, date, member, section, chunk, 1, platform, job_id, dict(), 0, self.current_run_id))
- if rowid:
- return True
- else:
- return None
- except Exception as exp:
- Log.debug(traceback.format_exc())
- Log.warning(
- "Autosubmit couldn't write submit time.")
- return None
-
- # if rowid > 0:
- # print("Successfully inserted")
-
- def write_start_time(self, job_name, start=0, status="UNKWOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0, packed=False, wrapper_queue=None):
- """
- Writes start time into the database
-
- :param job_name: Name of the job
- :type job_name: str
- :param start: Start time as timestamp
- :type start: int
- :param status: Status of the job
- :type status: str
- :param ncpus: Number of processors requested by the job
- :type ncpus: int
- :param wallclock: Wallclock requested by the job
- :type wallclock: str
- :param qos: Queue requested by the job
- :type qos: str
- :param date: date of the job (from experiment config)
- :type date: str
- :param member: member of the job (from experiment config)
- :type member: str
- :param section: section of the job (from experiment config)
- :type section: str
- :param chunk: chunk of the job (from experiment config)
- :type chunk: int
- :param platform: Name of the target platform
- :type platform: str
- :param job_id: JobId in the platform
- :type job_id: int
- :param packed: True if job belongs to a wrapper
- :type packed: bool
- :param wrapper_queue: Name of the queue requested by the wrapper
- :type wrapper_queue: str
- :return: True if succesfully saved
- :rtype: True or None
- """
- try:
- job_data_last = self.get_job_data_last(job_name)
- # Updating existing row
- if job_data_last:
- job_data_last = job_data_last[0]
- if job_data_last.start == 0:
- package_code = self.get_job_package_code(job_name)
- queue_name = wrapper_queue if (
- package_code and package_code > 2 and wrapper_queue is not None) else qos
- job_data_last.start = start
- job_data_last.qos = queue_name
- job_data_last.status = status
- job_data_last.rowtype = self.determine_rowtype(
- package_code)
- job_data_last.job_id = job_id
- job_data_last.modified = datetime.today().strftime('%Y-%m-%d-%H:%M:%S')
- _updated = self._update_start_job_data(job_data_last)
- return _updated
- # It is necessary to create a new row
- submit_inserted = self.write_submit_time(
- job_name, start, status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id, packed, wrapper_queue)
- if submit_inserted:
- # print("retro start")
- self.write_start_time(job_name, start, status,
- ncpus, wallclock, qos, date, member, section, chunk, platform, job_id, packed)
- return True
- else:
- return None
- except Exception as exp:
- Log.debug(traceback.format_exc())
- Log.warning(
- "Autosubmit couldn't write start time.")
- return None
-
- def write_finish_time(self, job_name, finish=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0, platform_object=None, packed=False, parent_id_list=[], no_slurm=True, out_file_path=None, out_file=None, err_file=None, wrapper_queue=None):
- """
- Writes the finish time into the database
-
- :param job_name: Name of the job
- :type job_name: str
- :param finish: Finish time as timestamp
- :type finish: int
- :param status: Status of the job
- :type status: str
- :param ncpus: Number of processors requested by the job
- :type ncpus: int
- :param wallclock: Wallclock requested by the job
- :type wallclock: str
- :param qos: Queue requested by the job
- :type qos: str
- :param date: date of the job (from experiment config)
- :type date: str
- :param member: member of the job (from experiment config)
- :type member: str
- :param section: section of the job (from experiment config)
- :type section: str
- :param chunk: chunk of the job (from experiment config)
- :type chunk: int
- :param platform: Name of the target platform
- :type platform: str
- :param job_id: JobId in the platform
- :type job_id: int
- :param platform_object: Platform Object
- :type platform: Object
- :param packed: True if job belongs to a wrapper
- :type packed: bool
- :param parent_id_list: List of parents (not in use)
- :type parent_id_list: list
- :param no_slurm: True if job belongs to slurm platform
- :type no_slurm: bool
- :param out_file_path: Path to the out file of the job
- :type out_file_path: str
- :param out_file: Name of the out file
- :type out_file: str
- :param err_file: Name of the err file
- :type err_file: str
- :param wrapper_queue: Name of the queue requested by the wrapper
- :type wrapper_queue: str
- :return: True if succesfully saved
- :rtype: True or None
- """
- try:
- # Current thread:
- BasicConfig.read()
- # self.expid = expid
- # self.basic_conf = BasicConfig
- self.folder_path = BasicConfig.JOBDATA_DIR
- self.database_path = os.path.join(
- self.folder_path, "job_data_" + str(self.expid) + ".db")
- self.conn = self.create_connection(self.database_path)
-
- # print("Writing finish time \t" + str(job_name) + "\t" + str(finish))
- job_data_last = self.get_job_data_last(job_name)
- # energy = 0
- is_packed = False
- is_end_of_wrapper = False
- submit_time = start_time = finish_time = number_nodes = number_cpus = energy = 0
- extra_data = dict()
- # Updating existing row
- if job_data_last and len(job_data_last) > 0:
- job_data_last = job_data_last[0]
- is_packed = True if job_data_last.rowtype > 1000 else False
-
- # Call Slurm here, update times.
- if platform_object and no_slurm == False:
- # print("There is platform object")
- try:
- if type(platform_object) is not str:
- if platform_object.type == "slurm" and job_id > 0:
- # Waiting 60 seconds for slurm data completion
- time.sleep(60)
- submit_time, start_time, finish_time, energy, number_cpus, number_nodes, extra_data, is_end_of_wrapper = platform_object.check_job_energy(
- job_id, is_packed)
- # Writing EXTRADATA
- if job_id > 0 and out_file_path is not None:
- if job_data_last.job_id == job_id:
- # print("Writing extra info")
- platform_object.write_job_extrainfo(
- job_data_last.get_hdata(), out_file_path)
- except Exception as exp:
- # Log.info(traceback.format_exc()) #TODO Wilmer, this is stopping autosubmit, "Tuple index out of range"
- Log.info("Couldn't write finish time {0}", exp.message)
- Log.warning(str(exp))
- #energy = 0
-
- try:
- extra_data["parents"] = [int(item)
- for item in parent_id_list]
- except Exception as inner_exp:
- Log.debug(
- "Parent Id List couldn't be parsed to array of int. Using default values.")
- extra_data["parents"] = parent_id_list
- pass
- current_timestamp = finish if finish > 0 else int(time.time())
- job_data_last.finish = current_timestamp # Take finish from input
- # job_data_last.finish = finish_time if finish_time > 0 and finish_time >= job_data_last.start else (
- # current_timestamp if no_slurm == True else job_data_last.finish)
- #print("Job data finish time {0}".format(job_data_last.finish))
- job_data_last.status = status
- job_data_last.job_id = job_id
- job_data_last.energy = energy
- job_data_last.rowstatus = RowStatus.COMPLETED
- job_data_last.out = out_file if out_file else ""
- job_data_last.err = err_file if err_file else ""
- # TODO: These values need to be retrieved from the sacct command
- job_data_last.MaxRSS = 0.0
- job_data_last.AveRSS = 0.0
- # END TODO
- job_data_last.ncpus = number_cpus if number_cpus > 0 else job_data_last.ncpus
- job_data_last.nnodes = number_nodes if number_nodes > 0 else job_data_last.nnodes
- job_data_last.extra_data = dumps(
- extra_data) if extra_data else None
- job_data_last.modified = datetime.today().strftime('%Y-%m-%d-%H:%M:%S')
- if is_packed == False and submit_time > 0 and start_time > 0:
- job_data_last.submit = int(submit_time)
- job_data_last.start = int(start_time)
- rowid = self._update_finish_job_data_plus(job_data_last)
- else:
- job_data_last.start = job_data_last.start if job_data_last.start > 0 else start_time
- rowid = self._update_finish_job_data_plus(job_data_last)
- if no_slurm == False and is_end_of_wrapper == True:
- self.process_current_run_collection()
- return True
- else:
- # Implementing a warning to keep track of it in the log.
- Log.warning("Historical database: The register for {} from path {} was not found. The system will try to restore it with default values.".format(
- job_name, self.database_path))
- # It is necessary to create a new row
- submit_inserted = self.write_submit_time(
- job_name, finish, status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id, is_packed, wrapper_queue)
- write_inserted = self.write_start_time(job_name, finish, status, ncpus,
- wallclock, qos, date, member, section, chunk, platform, job_id, is_packed, wrapper_queue)
-
- if submit_inserted and write_inserted:
- self.write_finish_time(
- job_name, time.time(), status, ncpus, wallclock, qos, date, member, section, chunk, platform, job_id, platform_object, is_packed, parent_id_list, no_slurm, out_file_path, out_file, err_file, wrapper_queue)
- else:
- return None
- except Exception as exp:
- Log.debug(traceback.format_exc())
- Log.warning("Autosubmit couldn't write finish time.")
- return None
-
- def process_current_run_collection(self):
- """
- Post-process source output for job_data.
-
- :return: job data processes, messages
- :rtype: ([job_data], [warning_messaages])
- """
-
- try:
- # start_time = time.time()
- current_job_data = None
- # warning_messages = []
- experiment_run = self.get_max_id_experiment_run()
- # List of jobs from pkl -> Dictionary
- if experiment_run:
- # List of last runs of jobs
- current_job_data = self.get_current_job_data(
- experiment_run.run_id)
- if not current_job_data:
- Log.warning(
- "Autosubmit did not find historical database information.")
- return None
- # warning_messages.append(
- # "Critical | This version of Autosubmit does not support the database that provides the energy information.")
- # Start processing
- if current_job_data:
- # Dropping parents key
- for job in current_job_data:
- if job.extra_data is not None and isinstance(job.extra_data, dict):
- job.extra_data.pop('parents', None)
- # Internal map from name to object
- name_to_current_job = {
- job.job_name: job for job in current_job_data}
- # Unique packages where rowtype > 2
- packages = set(
- job.rowtype for job in current_job_data if job.rowtype > 2)
- # Start by processing packages
- for package in packages:
- # All jobs in package
- jobs_in_package = [
- job for job in current_job_data if job.rowtype == package]
- # Order package by submit order
- jobs_in_package.sort(key=lambda x: x._id, reverse=True)
- # Internal list of single-purpose objects
- wrapper_jobs = []
- sum_total_energy = 0
- not_1_to_1 = True
- keys_found = False
- no_process = False
- for job_data in jobs_in_package:
- # If it is a wrapper job step
- if job_data.extra_data is not None and isinstance(job_data.extra_data, dict) and job_data.extra_data.get("energy", None) and job_data.extra_data["energy"] != "NA":
- name_to_current_job[job_data.job_name].energy = parse_output_number(
- job_data.extra_data["energy"])
- sum_total_energy += name_to_current_job[job_data.job_name].energy
- else:
- # Identify best source
- description_job = max(
- jobs_in_package, key=lambda x: len(str(x.extra_data)))
- # Identify job steps
- keys_step = [
- y for y in description_job.extra_data.keys() if '.' in y and y[y.index('.') + 1:] not in ["batch", "extern"] and y != "parents"] if description_job.extra_data and isinstance(description_job.extra_data, dict) else []
- if len(keys_step) > 0:
- # Steps found
- keys_step.sort(
- key=lambda x: int(x[x.index('.') + 1:]))
- keys_found = True
- # Find all job steps
- for key in keys_step:
- if "submit" not in description_job.extra_data[key].keys():
- keys_found = False
- break
- # Build wrapper jobs as job steps
- for key in keys_step:
- wrapper_jobs.append(JobStepExtraData(
- key, description_job.extra_data[key]))
-
- sum_total_energy = sum(
- jobp.energy for jobp in wrapper_jobs) * 1.0
-
- if len(jobs_in_package) == len(wrapper_jobs) and len(wrapper_jobs) > 0:
- # Approximation
- not_1_to_1 = False
- else:
- # No jobs steps, identify main step
- main_step = [
- y for y in description_job.extra_data.keys() if '.' not in y and y != "parents"] if description_job.extra_data and isinstance(description_job.extra_data, dict) else []
- # For some reason, a packaged jobs can arrive as a single job slurm output
- if len(main_step) > 0 and main_step[0] not in ['AveRSS', 'finish', 'ncpus', 'submit', 'MaxRSS', 'start', 'nnodes', 'energy']:
- # Check only first one
- main_step = [main_step[0]]
- # If main step contains submit, its valid. Else, break, not valid,
- for key in main_step:
- if key not in description_job.extra_data.keys() or "submit" not in description_job.extra_data[key].keys():
- keys_found = False
- break
- # Build wrapper jobs as job steps
- for key in main_step:
- wrapper_jobs.append(JobStepExtraData(
- key, description_job.extra_data[key]))
- # Total energy for main job
- sum_total_energy = sum(
- jobp.energy for jobp in wrapper_jobs) * 1.0
-
- else:
- no_process = True
- # warning_messages.append(
- # "Wrapper | Wrapper {0} does not have information to perform any energy approximation.".format(package))
- break
- # Keys do not have enough information
- # if keys_found == False:
- # warning_messages.append(
- # "Wrapper | Wrapper {0} does not have complete sacct data available.".format(package))
- # If it is not a 1 to 1 relationship between jobs in package and job steps
- if sum_total_energy > 0:
- if not_1_to_1 == True and no_process == False:
- # It is not 1 to 1, so we perform approximation
- # warning_messages.append(
- # "Approximation | The energy results in wrapper {0} are an approximation. Total energy detected: {1}.".format(package, sum_total_energy))
- # Completing job information if necessary
- # dropped_jobs = [job_data.job_name for job_data in jobs_in_package if job_data.running_time() <= 0]
- jobs_in_package = [
- job_data for job_data in jobs_in_package if job_data.running_time() > 0]
-
- # After completion is finished, calculate total resources to be approximated
- resources_total = sum(
- z.ncpus * z.running_time() for z in jobs_in_package) * 1.0
- if resources_total > 0:
- for job_data in jobs_in_package:
- job_data_factor = (
- job_data.ncpus * job_data.running_time())
- name_to_current_job[job_data.job_name].energy = round(job_data_factor /
- resources_total * sum_total_energy, 2)
- # else:
- # warning_messages.append(
- # "Approximation | Aproximation for wrapper {0} failed.".format(package))
- else:
- # Check if it is 1 to 1
- # If it is 1 to 1, then jobs in package is equal to wrapper jobs in size, so we can assign energy based on order of jobs.
- # Needs more guarantees but so far it works.
- if len(jobs_in_package) > 0 and len(wrapper_jobs) > 0 and len(jobs_in_package) == len(wrapper_jobs) and no_process == False:
- # It is 1 to 1
- for i in xrange(0, len(jobs_in_package)):
- name_to_current_job[jobs_in_package[i]
- .job_name].energy = wrapper_jobs[i].energy
- name_to_current_job[jobs_in_package[i]
- .job_name].submit = wrapper_jobs[i].submit
- name_to_current_job[jobs_in_package[i]
- .job_name].start = wrapper_jobs[i].start
- name_to_current_job[jobs_in_package[i]
- .job_name].finish = wrapper_jobs[i].finish
-
- for job_data in current_job_data:
- # Making VERY sure
- if job_data.rowtype == 2 and job_data.extra_data and isinstance(job_data.extra_data, dict) and len(job_data.extra_data) > 0:
- keys = [x for x in job_data.extra_data.keys()
- if x != "parents" and '.' not in x]
- if len(keys) > 0:
- found_energy = job_data.extra_data[keys[0]]["energy"]
- # Resort to batch if main is NA
- found_energy = found_energy if found_energy != "NA" else (
- job_data.extra_data[keys[0] + ".batch"]["energy"] if keys[0] + ".batch" in job_data.extra_data.keys() else found_energy)
- job_data.energy = parse_output_number(
- found_energy)
- else:
- continue
- # warning_messages.append(
- # "Single Job | Job {0} has no energy information available. {1} ".format(job_data.job_name, keys))
- # Updating detected energy values
- self.update_energy_values(
- [job for job in current_job_data if job.require_update == True])
-
- except Exception as exp:
- # stack = traceback.extract_stack()
- # (filename, line, procname, text) = stack[-1]
- Log.info(traceback.format_exc())
- Log.warning(
- "Autosubmit couldn't process the SLURM output. ".format(str(exp)))
- pass
-
- def update_energy_values(self, update_job_data):
- """Updating energy values
-
- :param update_job_data: list JobData object
- :type update_job_data: List of JobData
- """
- try:
- #print("Updating {0}".format(len(update_job_data)))
- for jobdata in update_job_data:
- # print("Job {0} requires update. Energy {1}.".format(
- # jobdata.job_name, jobdata.energy))
- self._update_job_data(jobdata)
- self.conn.commit()
- except Exception as exp:
- Log.info(traceback.format_exc())
- Log.warning(
- "Autosubmit couldn't retrieve experiment run header. update_energy_values. Exception {0}".format(str(exp)))
- pass
-
- def update_jobs_from_change_status(self, tracking_dictionary):
- """
- Updates the status of the jobs according to the tracked changes.
- Status allowed: 'READY', 'COMPLETED', 'WAITING', 'SUSPENDED', 'FAILED', 'UNKNOWN', 'QUEUING', 'RUNNING', 'HELD'
-
- :param tracking_dictionary: name -> (status, final_status)
- :type tracking_dictionary: dict()
- :return: True if updated, False otherwise
- :rtype: bool
- """
- if tracking_dictionary:
- changes = []
- current_job_data_detail = self.get_current_job_data(
- self.current_run_id, only_finished=False)
- # for x in current_job_data_detail:
- # print("{} {} {}".format(x.job_name, x.last, x.status))
- for job_name in tracking_dictionary:
- status_code, final_status_code = tracking_dictionary[job_name]
- #print("{} from {} to {}".format(job_name, status_code, final_status_code))
- status_string = Status.VALUE_TO_KEY[status_code]
- final_status_string = Status.VALUE_TO_KEY[final_status_code]
- # REMOVED "and job_data.status == status_string" from the filter.
- current_job_data = next(
- (job_data for job_data in current_job_data_detail if job_data.job_name == job_name and job_data.last == 1), None)
- # We found the current row that matches the provided current status
- if current_job_data:
- # print("{} to {}".format(job_name, final_status_code))
- if final_status_code in [Status.COMPLETED, Status.FAILED, Status.QUEUING, Status.RUNNING, Status.HELD, Status.SUSPENDED]:
- # new_current_job_data = copy.deepcopy(current_job_data)
- current_job_data.modified = datetime.today().strftime('%Y-%m-%d-%H:%M:%S')
- current_job_data.status = final_status_string
- current_job_data.finish = time.time() if final_status_code in [
- Status.COMPLETED, Status.FAILED] else 0
- changes.append((current_job_data.finish, current_job_data.modified,
- current_job_data.status, RowStatus.CHANGED, current_job_data._id))
- #print("Added {}".format(current_job_data.job_name))
- # else:
- # print("{} not found".format(job_name))
- if len(changes) > 0:
- result = self._update_many_job_data_change_status(changes)
- return result
- return None
-
- def get_all_job_data(self):
- """
- Get all register from job_data.
- """
- try:
- if os.path.exists(self.folder_path):
-
- current_table = self._get_all_job_data()
- # current_job_data = dict()
- if current_table:
- for item in current_table:
- # _id, _counter, _job_name, _created, _modified, _submit, _start, _finish, _status, _rowtype, _ncpus, _wallclock, _qos, _energy, _date, _section, _member, _chunk, _last, _platform = item
- job_item = JobItem(*item)
- self.jobdata_list.add_jobdata(JobData(job_item.id, job_item.counter, job_item.job_name, job_item.created, job_item.modified, job_item.submit, job_item.start, job_item.finish, job_item.status,
- job_item.rowtype, job_item.ncpus, job_item.wallclock, job_item.qos, job_item.energy, job_item.date, job_item.section, job_item.member, job_item.chunk, job_item.last, job_item.platform, job_item.job_id, job_item.extra_data, job_item.nnodes if 'nnodes' in job_item._fields else 0, job_item.run_id if 'run_id' in job_item._fields else None, job_item.MaxRSS if 'MaxRSS' in job_item._fields else 0.0, job_item.AveRSS if 'AveRSS' in job_item._fields else 0.0, job_item.out if 'out' in job_item._fields else "", job_item.err if 'err' in job_item._fields else "", job_item.rowstatus if 'rowstatus' in job_item._fields else RowStatus.FAULTY))
-
- else:
- raise Exception("Job data folder not found :" +
- str(self.jobdata_path))
- except Exception as exp:
- Log.debug(traceback.format_exc())
- Log.warning(
- "Autosubmit couldn't retrieve job data. get_all_job_data")
- return None
-
- def get_job_data(self, job_name):
- """Retrieves all the rows that have the same job_name
-
- :param job_name: name of job
- :type job_name: str
- :return: all jobs with the sanme job name
- :rtype: list of JobItem objects
- """
- try:
- job_data = list()
- if os.path.exists(self.folder_path):
- current_job = self._get_job_data(job_name)
- if current_job:
- for item in current_job:
- job_item = JobItem(*item)
- job_data.append(JobData(job_item.id, job_item.counter, job_item.job_name, job_item.created, job_item.modified, job_item.submit, job_item.start, job_item.finish, job_item.status,
- job_item.rowtype, job_item.ncpus, job_item.wallclock, job_item.qos, job_item.energy, job_item.date, job_item.section, job_item.member, job_item.chunk, job_item.last, job_item.platform, job_item.job_id, job_item.extra_data, job_item.nnodes if 'nnodes' in job_item._fields else 0, job_item.run_id if 'run_id' in job_item._fields else None, job_item.MaxRSS if 'MaxRSS' in job_item._fields else 0.0, job_item.AveRSS if 'AveRSS' in job_item._fields else 0.0, job_item.out if 'out' in job_item._fields else "", job_item.err if 'err' in job_item._fields else "", job_item.rowstatus if 'rowstatus' in job_item._fields else RowStatus.FAULTY))
- return job_data
- else:
- return None
- else:
- raise Exception("Job data folder not found :" +
- str(self.jobdata_path))
- except Exception as exp:
- Log.debug(traceback.format_exc())
- Log.warning("Autosubmit couldn't retrieve job data. get_job_data")
- return None
-
- def get_current_job_data(self, run_id, only_finished=True):
- """
- Gets current job_data for provided run_id.
- :param run_id: run identifier
- :type run_id: int
- """
- try:
- current_collection = []
- # if self.db_version < DB_VERSION_SCHEMA_CHANGES:
- # raise Exception("This function requieres a newer DB version.")
- if os.path.exists(self.folder_path):
- current_job_data = None
- if only_finished == True:
- current_job_data = self._get_current_job_data(run_id)
- else:
- current_job_data = self._get_current_last_job_data(run_id)
- if current_job_data:
- for job_data in current_job_data:
- jobitem = JobItem(*job_data)
- current_collection.append(JobData(jobitem.id, jobitem.counter, jobitem.job_name, jobitem.created, jobitem.modified, jobitem.submit, jobitem.start, jobitem.finish, jobitem.status, jobitem.rowtype, jobitem.ncpus,
- jobitem.wallclock, jobitem.qos, jobitem.energy, jobitem.date, jobitem.section, jobitem.member, jobitem.chunk, jobitem.last, jobitem.platform, jobitem.job_id, jobitem.extra_data, jobitem.nnodes if 'nnodes' in jobitem._fields else 0, jobitem.run_id if 'run_id' in jobitem._fields else None, jobitem.MaxRSS if 'MaxRSS' in jobitem._fields else 0.0, jobitem.AveRSS if 'AveRSS' in jobitem._fields else 0.0, jobitem.out if 'out' in jobitem._fields else "", jobitem.err if 'err' in jobitem._fields else "", jobitem.rowstatus if 'rowstatus' in jobitem._fields else RowStatus.FAULTY))
- return current_collection
- return None
- except Exception as exp:
- print(traceback.format_exc())
- print(
- "Error on returning current job data. run_id {0}".format(run_id))
- return None
-
- def get_max_id_experiment_run(self):
- """Get Max experiment run object (last experiment run)
-
- :return: ExperimentRun object
- :rtype: Object
- """
- try:
- #expe = list()
- if os.path.exists(self.folder_path):
- current_experiment_run = self._get_max_id_experiment_run()
- if current_experiment_run:
- exprun_item = ExperimentRunItem(*current_experiment_run)
- return ExperimentRun(exprun_item.run_id, exprun_item.created, exprun_item.start, exprun_item.finish, exprun_item.chunk_unit, exprun_item.chunk_size, exprun_item.completed, exprun_item.total, exprun_item.failed, exprun_item.queuing, exprun_item.running, exprun_item.submitted, exprun_item.suspended, exprun_item.metadata)
- else:
- return None
- else:
- raise Exception("Job data folder not found :" +
- str(self.jobdata_path))
- except Exception as exp:
- if _debug == True:
- Log.info(traceback.format_exc())
- Log.debug(traceback.format_exc())
- Log.warning(
- "Autosubmit couldn't retrieve experiment run header. get_max_id_experiment_run")
- return None
-
- def get_job_data_last(self, job_name):
- """
- Returns latest jobdata row for a job_name. The current version.
-
- :param job_name:
- :type job_name:
- :return: Rows with last = 1
- :rtype: List of JobData objects
- """
- try:
- jobdata = list()
- if os.path.exists(self.folder_path):
- current_job_last = self._get_job_data_last(job_name)
- if current_job_last:
- for current in current_job_last:
- job_item = JobItem(*current)
- jobdata.append(JobData(job_item.id, job_item.counter, job_item.job_name, job_item.created, job_item.modified, job_item.submit, job_item.start, job_item.finish, job_item.status,
- job_item.rowtype, job_item.ncpus, job_item.wallclock, job_item.qos, job_item.energy, job_item.date, job_item.section, job_item.member, job_item.chunk, job_item.last, job_item.platform, job_item.job_id, job_item.extra_data, job_item.nnodes if 'nnodes' in job_item._fields else 0, job_item.run_id if 'run_id' in job_item._fields else None, job_item.MaxRSS if 'MaxRSS' in job_item._fields else 0.0, job_item.AveRSS if 'AveRSS' in job_item._fields else 0.0, job_item.out if 'out' in job_item._fields else "", job_item.err if 'err' in job_item._fields else "", job_item.rowstatus if 'rowstatus' in job_item._fields else RowStatus.FAULTY))
- return jobdata
- else:
- return None
- else:
- raise Exception("Job data folder not found :" +
- str(self.jobdata_path))
- except Exception as exp:
- if _debug == True:
- Log.info(traceback.format_exc())
- Log.debug(traceback.format_exc())
- Log.warning(
- "Autosubmit couldn't retrieve job data. get_job_data_last")
- return None
-
- def _deactivate_current_last(self, jobdata):
- """
- Sets last = 0 to row with id
- """
- try:
- if self.conn:
- sql = ''' UPDATE job_data SET last=0, modified = ? WHERE id = ?'''
- tuplerow = (jobdata.modified, jobdata._id)
- cur = self.conn.cursor()
- cur.execute(sql, tuplerow)
- self.conn.commit()
- return cur.lastrowid
- return None
- except sqlite3.Error as e:
- if _debug == True:
- Log.info(traceback.format_exc())
- Log.debug(traceback.format_exc())
- Log.warning("Error on Insert : " + str(type(e).__name__))
- return None
-
- def _update_start_job_data(self, jobdata):
- """
- Update job_data by id. Updates start, modified, job_id, status.
-
- :param jobdata: Job information
- :type jobdata: JobData object
- :return: True, None if failed
- :rtype: Bool or None
- """
- # current_time =
- try:
- if self.conn:
- sql = ''' UPDATE job_data SET start=?, modified=?, job_id=?, status=?, rowtype=? WHERE id=? '''
- cur = self.conn.cursor()
- cur.execute(sql, (int(jobdata.start),
- jobdata.modified, jobdata.job_id, jobdata.status, jobdata.rowtype, jobdata._id))
- self.conn.commit()
- return True
- return None
- except sqlite3.Error as e:
- if _debug == True:
- Log.info(traceback.format_exc())
- Log.debug(traceback.format_exc())
- Log.warning("Error on Insert : " + str(type(e).__name__))
- return None
-
- def _update_finish_job_data_plus(self, jobdata):
- """
- Update register by id. Updates submit, start, finish, modified, job_id, status, energy, extra_data, nnodes, ncpus, rowstatus, out, err
-
- :param jobdata: Job information
- :type jobdata: JobData object
- :return: last row id, None if failed
- :rtype: integer or None
- """
- try:
- if self.conn:
- sql = ''' UPDATE job_data SET submit=?, start=?, finish=?, modified=?, job_id=?, status=?, energy=?, extra_data=?, nnodes=?, ncpus=?, rowstatus=?, out=?, err=? WHERE id=? '''
- cur = self.conn.cursor()
- cur.execute(sql, (jobdata.submit, jobdata.start, jobdata.finish, jobdata.modified, jobdata.job_id,
- jobdata.status, jobdata.energy, jobdata.extra_data, jobdata.nnodes, jobdata.ncpus, RowStatus.COMPLETED, jobdata.out, jobdata.err, jobdata._id))
- self.conn.commit()
- return cur.lastrowid
- return None
- except sqlite3.Error as e:
- if _debug == True:
- Log.info(traceback.format_exc())
- Log.debug(traceback.format_exc())
- Log.warning("Error on Update : " + str(type(e).__name__))
- return None
-
- def _update_many_job_data_change_status(self, changes):
- """
- Update a many job_data updates.
-
- :param changes: list of tuples of the data to change (finish, modified, status, rowstatus, id)
- :type changes: 5-tuple (int, str, str, int, int)
- :return: True if updated
- :rtype: bool
- """
- try:
- if self.conn:
- sql = ''' UPDATE job_data SET finish=?, modified=?, status=?, rowstatus=? WHERE id=? '''
- cur = self.conn.cursor()
- cur.executemany(sql, changes)
- self.conn.commit()
- return True
- return False
- except Exception as exp:
- if _debug == True:
- Log.info(traceback.format_exc())
- Log.debug(traceback.format_exc())
- Log.warning("Historical database error: {}".format(exp))
-
- def _update_finish_job_data(self, jobdata):
- """
- Update register by id. Updates finish, modified, job_id, status, energy, extra_data, nnodes, ncpus, rowstatus, out, err
-
- :param jobdata: Job information
- :type jobdata: JobData object
- :return: last row id, None if failed
- :rtype: integer or None
- """
- try:
- if self.conn:
- # print("Updating finish time")
- sql = ''' UPDATE job_data SET finish=?, modified=?, job_id=?, status=?, energy=?, extra_data=?, nnodes=?, ncpus=?, rowstatus=?, out=?, err=? WHERE id=? '''
- cur = self.conn.cursor()
- cur.execute(sql, (jobdata.finish, jobdata.modified, jobdata.job_id,
- jobdata.status, jobdata.energy, jobdata.extra_data, jobdata.nnodes, jobdata.ncpus, RowStatus.COMPLETED, jobdata.out, jobdata.err, jobdata._id))
- self.conn.commit()
- return cur.lastrowid
- return None
- except sqlite3.Error as e:
- if _debug == True:
- Log.info(traceback.format_exc())
- Log.debug(traceback.format_exc())
- Log.warning("Error on Update : " + str(type(e).__name__))
- return None
-
- # PROCESSED
- def _update_job_data(self, job_data):
- """
- Updating PROCESSED job_data
-
- :param job_data: JobData object with changes
- :type job_data: JobData object
- :return: True if succesful, None otherwise
- :rtype: Boolean - None
- """
- try:
- if self.conn:
- sql = ''' UPDATE job_data SET energy=?, modified=?, MaxRSS=?, AveRSS=?, rowstatus=? WHERE id=? '''
- cur = self.conn.cursor()
- cur.execute(sql, (job_data.energy, datetime.today().strftime(
- '%Y-%m-%d-%H:%M:%S'), job_data.MaxRSS, job_data.AveRSS, RowStatus.PROCESSED, job_data._id))
- # self.conn.commit()
- return True
- return None
- except sqlite3.Error as e:
- if _debug == True:
- print(traceback.format_exc())
- Log.info(traceback.format_exc())
- Log.warning("Error on Insert : {}".format(str(type(e).__name__)))
- return None
-
- def _update_experiment_run(self, experiment_run):
- """Updates experiment run row by run_id (finish, chunk_unit, chunk_size, completed, total, failed, queuing, running, submitted)
-
- :param experiment_run: Object representation of experiment run row
- :type experiment_run: ExperimentRun object
-
- :return: None
- """
- try:
- if self.conn:
- sql = ''' UPDATE experiment_run SET finish=?, chunk_unit=?, chunk_size=?, completed=?, total=?, failed=?, queuing=?, running=?, submitted=?, suspended=? WHERE run_id=? '''
- cur = self.conn.cursor()
- cur.execute(sql, (experiment_run.finish, experiment_run.chunk_unit, experiment_run.chunk_size,
- experiment_run.completed, experiment_run.total, experiment_run.failed, experiment_run.queuing, experiment_run.running, experiment_run.submitted, experiment_run.suspended, experiment_run.run_id))
- self.conn.commit()
- return cur.lastrowid
- return None
- except sqlite3.Error as e:
- if _debug == True:
- Log.info(traceback.format_exc())
- Log.debug(traceback.format_exc())
- Log.warning("Error on update experiment_run : " +
- str(type(e).__name__))
- return None
-
- def _insert_job_data(self, jobdata):
- """
- Inserts a new job_data register.
- :param jobdata: JobData object
- """
- try:
- if self.conn:
- #print("preparing to insert")
- sql = ''' INSERT INTO job_data(counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data, nnodes, run_id, MaxRSS, AveRSS, out, err, rowstatus) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) '''
- tuplerow = (jobdata.counter, jobdata.job_name, jobdata.created, jobdata.modified, jobdata.submit, jobdata.start,
- jobdata.finish, jobdata.status, jobdata.rowtype, jobdata.ncpus, jobdata.wallclock, jobdata.qos, jobdata.energy, jobdata.date, jobdata.section, jobdata.member, jobdata.chunk, jobdata.last, jobdata.platform, jobdata.job_id, jobdata.extra_data, jobdata.nnodes, jobdata.run_id, jobdata.MaxRSS, jobdata.AveRSS, jobdata.out, jobdata.err, jobdata.rowstatus)
- cur = self.conn.cursor()
- #print("pre insert")
- cur.execute(sql, tuplerow)
- self.conn.commit()
- # print("Inserted " + str(jobdata.job_name))
- return cur.lastrowid
- else:
- #print("Not a valid connection.")
- return None
- except sqlite3.Error as e:
- if _debug == True:
- Log.info(traceback.format_exc())
- Log.debug(traceback.format_exc())
- Log.warning("Error on Insert : " + str(type(e).__name__) +
- "\t " + str(jobdata.job_name) + "\t" + str(jobdata.counter))
- return None
-
- def _insert_experiment_run(self, experiment_run):
- """
- Inserts a new experiment_run register.
- :param experiment_run: ExperimentRun object
- """
- try:
- if self.conn:
- #print("preparing to insert")
- sql = ''' INSERT INTO experiment_run(created,start,finish,chunk_unit,chunk_size,completed,total,failed,queuing,running,submitted,suspended,metadata) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?) '''
- tuplerow = (experiment_run.created, experiment_run.start, experiment_run.finish, experiment_run.chunk_unit, experiment_run.chunk_size, experiment_run.completed,
- experiment_run.total, experiment_run.failed, experiment_run.queuing, experiment_run.running, experiment_run.submitted, experiment_run.suspended, experiment_run.metadata)
- cur = self.conn.cursor()
- cur.execute(sql, tuplerow)
- self.conn.commit()
- return cur.lastrowid
- else:
- return None
- except sqlite3.Error as e:
- if _debug == True:
- Log.info(traceback.format_exc())
- print(traceback.format_exc())
- Log.warning("Error on insert on experiment_run: {0}".format(
- str(type(e).__name__)))
- return None
-
- def _get_all_job_data(self):
- """
- Get all registers from job_data.
- :return: row content:
- :rtype: 23-tuple
- """
- try:
- #conn = create_connection(path)
- if self.conn:
- self.conn.text_factory = str
- cur = self.conn.cursor()
- cur.execute(
- "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data, nnodes, run_id, MaxRSS, AveRSS, out, err, rowstatus FROM job_data")
- rows = cur.fetchall()
- return rows
- else:
- return None
- except sqlite3.Error as e:
- if _debug == True:
- Log.info(traceback.format_exc())
- Log.debug(traceback.format_exc())
- Log.warning("Error on Select : " + str(type(e).__name__))
- return list()
-
- def _get_current_job_data(self, run_id):
- """
- Get job data for a current run.
- """
- try:
- if self.conn:
- self.conn.text_factory = str
- cur = self.conn.cursor()
- cur.execute("SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data, nnodes, run_id, MaxRSS, AveRSS, out, err, rowstatus from job_data WHERE run_id=? and last=1 and finish > 0 and rowtype >= 2 ORDER BY id", (run_id,))
- rows = cur.fetchall()
- if len(rows) > 0:
- return rows
- else:
- return None
- except sqlite3.Error as e:
- if _debug == True:
- print(traceback.format_exc())
- print("Error on select job data: {0}".format(
- str(type(e).__name__)))
- return None
-
- def _get_current_last_job_data(self, run_id):
- """
- Get job data for a current run.
- """
- try:
- if self.conn:
- self.conn.text_factory = str
- cur = self.conn.cursor()
- cur.execute("SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data, nnodes, run_id, MaxRSS, AveRSS, out, err, rowstatus from job_data WHERE run_id=? and last=1 and rowtype >= 2 ORDER BY id", (run_id,))
- rows = cur.fetchall()
- if len(rows) > 0:
- return rows
- else:
- return None
- except Exception as exp:
- if _debug == True:
- print(traceback.format_exc())
- print("Historical database error: {0}".format(str(exp)))
- return None
-
- def _get_job_data(self, job_name):
- """
- Returns rows belonging to a job_name
- """
- try:
- if self.conn:
- self.conn.text_factory = str
- cur = self.conn.cursor()
- cur.execute(
- "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data, nnodes, run_id, MaxRSS, AveRSS, out, err, rowstatus FROM job_data WHERE job_name=? ORDER BY counter DESC", (job_name,))
- rows = cur.fetchall()
- # print(rows)
- return rows
- else:
- return None
- except sqlite3.Error as e:
- if _debug == True:
- Log.info(traceback.format_exc())
- Log.debug(traceback.format_exc())
- Log.warning("Error on Select : " + str(type(e).__name__))
- return None
-
- def _get_job_data_last(self, job_name):
- """
- Returns the latest rows (last = 1) for a job_name.
-
- :param job_name: Name of the requested job
- :type job_name: str
- :return: Rows from historical database
- :rtype: list of tuples
- """
- try:
- if self.conn:
- self.conn.text_factory = str
- cur = self.conn.cursor()
- cur.execute(
- "SELECT id, counter, job_name, created, modified, submit, start, finish, status, rowtype, ncpus, wallclock, qos, energy, date, section, member, chunk, last, platform, job_id, extra_data, nnodes, run_id, MaxRSS, AveRSS, out, err, rowstatus FROM job_data WHERE last=1 and job_name=? ORDER BY counter DESC", (job_name,))
- rows = cur.fetchall()
- if rows and len(rows) > 0:
- return rows
- else:
- raise Exception(
- "Historical database error: Valid last row not found for job {}.".format(job_name))
- else:
- raise Exception(
- "Historical database error: Connection not found when requesting information from job {}.".format(job_name))
- except Exception as exp:
- if _debug == True:
- Log.info(traceback.format_exc())
- Log.debug(traceback.format_exc())
- Log.warning("Historical database error on select : " + str(exp))
- return None
-
- def _set_pragma_version(self, version=2):
- """Sets current version of the schema
-
- :param version: Current Version. Defaults to 1.
- :type version: (int, optional)
- :return: current version, None
- :rtype: (int, None)
- """
- try:
- if self.conn:
- self.conn.text_factory = str
- cur = self.conn.cursor()
- # print("Setting version")
- cur.execute("pragma user_version={v:d};".format(v=version))
- self.conn.commit()
- return True
- except sqlite3.Error as e:
- if _debug == True:
- Log.info(traceback.format_exc())
- Log.debug(traceback.format_exc())
- Log.warning("Error on version : " + str(type(e).__name__))
- return None
-
- def _select_pragma_version(self):
- """[summary]
- """
- try:
- if self.conn:
- self.conn.text_factory = str
- cur = self.conn.cursor()
- cur.execute("pragma user_version;")
- rows = cur.fetchall()
- if len(rows) > 0:
- # print(rows)
- #print("Row " + str(rows[0]))
- result, = rows[0]
- # print(result)
- return int(result) if result >= 0 else None
- else:
- # Starting value
- return None
- except sqlite3.Error as e:
- if _debug == True:
- Log.info(traceback.format_exc())
- Log.debug(traceback.format_exc())
- Log.warning("Error while retrieving version: " +
- str(type(e).__name__))
- return None
-
- def _get_maxcounter_jobdata(self):
- """Return the maxcounter of the experiment
-
- Returns:
- [type]: [description]
- """
- try:
- if self.conn:
- self.conn.text_factory = str
- cur = self.conn.cursor()
- cur.execute("SELECT MAX(counter) as maxcounter FROM job_data")
- rows = cur.fetchall()
- if len(rows) > 0:
- #print("Row " + str(rows[0]))
- result, = rows[0]
- return int(result) if result else 1
- else:
- # Starting value
- return 1
- return None
- except sqlite3.Error as e:
- if _debug == True:
- Log.info(traceback.format_exc())
- Log.debug(traceback.format_exc())
- Log.warning("Error on Select Max : " + str(type(e).__name__))
- return None
-
- def _get_max_id_experiment_run(self):
- """Return the max id from experiment_run
-
- :return: max run_id, None
- :rtype: int, None
- """
- try:
- if self.conn:
- self.conn.text_factory = str
- cur = self.conn.cursor()
- cur.execute(
- "SELECT run_id,created,start,finish,chunk_unit,chunk_size,completed,total,failed,queuing,running,submitted, suspended, metadata from experiment_run ORDER BY run_id DESC LIMIT 0, 1")
- rows = cur.fetchall()
- if len(rows) > 0:
- return rows[0]
- else:
- return None
- return None
- except sqlite3.Error as e:
- if _debug == True:
- Log.info(traceback.format_exc())
- Log.debug(traceback.format_exc())
- Log.warning("Error on select max run_id : " +
- str(type(e).__name__))
- return None
diff --git a/autosubmit/helpers/__init__.py b/autosubmit/helpers/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391
diff --git a/autosubmit/helpers/autosubmit_helper.py b/autosubmit/helpers/autosubmit_helper.py
new file mode 100644
index 0000000000000000000000000000000000000000..8164168060af9506b7f4047b96a9760425428fe2
--- /dev/null
+++ b/autosubmit/helpers/autosubmit_helper.py
@@ -0,0 +1,109 @@
+#!/usr/bin/env python
+
+# Copyright 2015-2020 Earth Sciences Department, BSC-CNS
+
+# This file is part of Autosubmit.
+
+# Autosubmit is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# Autosubmit is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with Autosubmit. If not, see .
+
+from log.log import AutosubmitCritical, Log
+from time import sleep
+from autosubmit.config.basicConfig import BasicConfig
+from autosubmit.config.config_common import AutosubmitConfig
+from autosubmit.history.experiment_history import ExperimentHistory
+from autosubmit.database.db_common import check_experiment_exists
+import datetime
+import sys
+from typing import List
+
+def handle_start_time(start_time):
+ # type: (str) -> None
+ """ Wait until the supplied time. """
+ if start_time:
+ Log.info("User provided starting time has been detected.")
+ # current_time = time()
+ datetime_now = datetime.datetime.now()
+ target_date = parsed_time = None
+ try:
+ # Trying first parse H:M:S
+ parsed_time = datetime.datetime.strptime(start_time, "%H:%M:%S")
+ target_date = datetime.datetime(datetime_now.year, datetime_now.month,
+ datetime_now.day, parsed_time.hour, parsed_time.minute, parsed_time.second)
+        except ValueError:  # strptime raises ValueError on format mismatch; bare except would also swallow KeyboardInterrupt
+ try:
+ # Trying second parse y-m-d H:M:S
+ target_date = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
+            except ValueError:  # second format also fails -> report below
+ target_date = None
+ Log.critical(
+ "The string input provided as the starting time of your experiment must have the format 'H:M:S' or 'yyyy-mm-dd H:M:S'. Your input was '{0}'.".format(start_time))
+ return
+ # Must be in the future
+ if (target_date < datetime.datetime.now()):
+ Log.critical("You must provide a valid date into the future. Your input was interpreted as '{0}', which is considered past.\nCurrent time {1}.".format(
+ target_date.strftime("%Y-%m-%d %H:%M:%S"), datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
+ return
+ # Starting waiting sequence
+ Log.info("Your experiment will start execution on {0}\n".format(target_date.strftime("%Y-%m-%d %H:%M:%S")))
+ # Check time every second
+ while datetime.datetime.now() < target_date:
+ elapsed_time = target_date - datetime.datetime.now()
+ sys.stdout.write("\r{0} until execution starts".format(elapsed_time))
+ sys.stdout.flush()
+ sleep(1)
+
+def handle_start_after(start_after, expid, BasicConfig):  # NOTE(review): 3rd param shadows the imported BasicConfig class; rename (e.g. basic_config) when callers allow
+ # type: (str, str, BasicConfig) -> None
+ """ Wait until the start_after experiment has finished."""
+ if start_after:
+ Log.info("User provided expid completion trigger has been detected.")
+ # The user tries to be tricky
+ if str(start_after) == str(expid):
+ Log.info(
+ "Hey! What do you think is going to happen? In theory, your experiment will run again after it has been completed. Good luck!")
+ # Check if experiment exists. If False or None, it does not exist
+ if not check_experiment_exists(start_after):
+ return None
+ # Historical Database: We use the historical database to retrieve the current progress data of the supplied expid (start_after)
+ exp_history = ExperimentHistory(start_after, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR)
+ if exp_history.is_header_ready() == False:
+ Log.critical("Experiment {0} is running a database version which is not supported by the completion trigger function. An updated DB version is needed.".format(
+ start_after))
+ return
+ Log.info("Autosubmit will start monitoring experiment {0}. When the number of completed jobs plus suspended jobs becomes equal to the total number of jobs of experiment {0}, experiment {1} will start. Querying every 60 seconds. Status format Completed/Queuing/Running/Suspended/Failed.".format(
+ start_after, expid))
+ while True:
+ # Query current run
+ current_run = exp_history.manager.get_experiment_run_dc_with_max_id()
+ if current_run and current_run.finish > 0 and current_run.total > 0 and current_run.completed + current_run.suspended == current_run.total:
+ break
+            elif current_run:  # guard: run row may be missing; plain else dereferenced None below
+ sys.stdout.write(
+ "\rExperiment {0} ({1} total jobs) status {2}/{3}/{4}/{5}/{6}".format(start_after, current_run.total, current_run.completed, current_run.queuing, current_run.running, current_run.suspended, current_run.failed))
+ sys.stdout.flush()
+ # Update every 60 seconds
+ sleep(60)
+
+def get_allowed_members(run_members, as_conf):
+ # type: (str, AutosubmitConfig) -> List
+ if run_members:
+ allowed_members = run_members.split()
+ rmember = [rmember for rmember in allowed_members if rmember not in as_conf.get_member_list()]
+ if len(rmember) > 0:
+            raise AutosubmitCritical(("Some of the members ({0}) in the list of allowed members you supplied do not exist in the current list " +
+                                     "of members specified in the conf files.\nCurrent list of members: {1}").format(str(rmember), str(as_conf.get_member_list())))
+ if len(allowed_members) == 0:
+ raise AutosubmitCritical("Not a valid -rm --run_members input: {0}".format(str(run_members)))
+ return allowed_members
+ return None
\ No newline at end of file
diff --git a/autosubmit/history/experiment_history.py b/autosubmit/history/experiment_history.py
index bf3921305d550712e06372aa9fbc96d50e1ac611..0daf49c191b3c48aa3918607555b812580d9e52a 100644
--- a/autosubmit/history/experiment_history.py
+++ b/autosubmit/history/experiment_history.py
@@ -187,10 +187,11 @@ class ExperimentHistory():
""" Detect status differences between job_list and current job_data rows, and update. Creates a new run if necessary. """
try:
current_experiment_run_dc = self.manager.get_experiment_run_dc_with_max_id()
- update_these_changes = self._get_built_list_of_changes(job_list)
- if len(update_these_changes) > 0:
+ update_these_changes = self._get_built_list_of_changes(job_list)
+ should_create_new_run = self.should_we_create_a_new_run(job_list, len(update_these_changes), current_experiment_run_dc, chunk_unit, chunk_size)
+ if len(update_these_changes) > 0 and not should_create_new_run:
self.manager.update_many_job_data_change_status(update_these_changes)
- if self.should_we_create_a_new_run(job_list, len(update_these_changes), current_experiment_run_dc, chunk_unit, chunk_size):
+ if should_create_new_run:
return self.create_new_experiment_run(chunk_unit, chunk_size, current_config, job_list)
return self.update_counts_on_experiment_run_dc(current_experiment_run_dc, job_list)
except Exception as exp:
diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py
index 66bd13255bd525ebcbf70b67e2180d5de25b4523..d65b2d491bbba600aa8f97100e3c6601fb841226 100644
--- a/autosubmit/job/job.py
+++ b/autosubmit/job/job.py
@@ -29,6 +29,7 @@ import datetime
import textwrap
from collections import OrderedDict
import copy
+import traceback
from bscearth.utils.config_parser import ConfigParserFactory
@@ -88,11 +89,11 @@ class Job(object):
self._platform = None
self._queue = None
self.retry_delay = 0
- self.platform_name = None
- self.section = None
- self.wallclock = None
+ self.platform_name = None # type: str
+ self.section = None # type: str
+ self.wallclock = None # type: str
self.wchunkinc = None
- self.tasks = '0'
+ self.tasks = '0'
self.threads = '1'
self.processors = '1'
self.memory = ''
@@ -130,7 +131,7 @@ class Job(object):
self._parents = set()
self._children = set()
self.fail_count = 0
- self.expid = name.split('_')[0]
+ self.expid = name.split('_')[0] # type: str
self.parameters = dict()
self._tmp_path = os.path.join(
BasicConfig.LOCAL_ROOT_DIR, self.expid, BasicConfig.LOCAL_TMP_DIR)
@@ -139,7 +140,7 @@ class Job(object):
self.check = 'true'
self.check_warnings = False
self.packed = False
- self.hold = False
+ self.hold = False # type: bool
self.distance_weight = 0
self.level = 0
self.export = "none"
@@ -603,6 +604,7 @@ class Job(object):
Log.printlog("Trace {0} \nFailed to retrieve log file for job {0}".format(
e.message, self.name), 6001)
return
+
@threaded
def retrieve_logfiles(self, copy_remote_logs, local_logs, remote_logs, expid, platform_name,fail_count = 0):
max_logs = 0
@@ -762,6 +764,7 @@ class Job(object):
except BaseException as e:
pass
return
+
def update_status(self, copy_remote_logs=False, failed_file=False):
"""
Updates job status, checking COMPLETED file if needed
@@ -820,6 +823,8 @@ class Job(object):
if previous_status != Status.RUNNING and self.status in [Status.COMPLETED, Status.FAILED, Status.UNKNOWN,
Status.RUNNING]:
self.write_start_time()
+ if previous_status == Status.HELD and self.status in [Status.SUBMITTED, Status.QUEUING, Status.RUNNING]:
+ self.write_submit_time()
# Updating logs
if self.status in [Status.COMPLETED, Status.FAILED, Status.UNKNOWN]:
# New thread, check if file exist
@@ -1246,10 +1251,15 @@ class Job(object):
str(set(parameters) - set(variables))), 6013)
return out
- def write_submit_time(self,enabled = False):
+ def write_submit_time(self, enabled=False, hold=False):
+ # type: (bool, bool) -> None
"""
- Writes submit date and time to TOTAL_STATS file
+ Writes submit date and time to TOTAL_STATS file. It doesn't write if hold == True.
"""
+ # print(traceback.format_stack())
+ Log.debug("Call from {0} with status {1}".format(self.name, self.status_str))
+ if hold:
+ return # Do not write for HELD jobs.
data_time = ["",time.time()]
if self.wrapper_type != "vertical" or enabled:
path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS')
@@ -1375,9 +1385,9 @@ class Job(object):
Writes all data to TOTAL_STATS file
:param total_stats: data gathered by the wrapper
:type completed: str
- """
+ """
if first_retrial:
- self.write_submit_time(True)
+ self.write_submit_time(enabled=True)
path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS')
f = open(path, 'a')
if first_retrial:
@@ -1624,12 +1634,12 @@ class WrapperJob(Job):
if reason == '(JobHeldUser)':
if self.hold is False:
# SHOULD BE MORE CLASS (GET_scontrol realease but not sure if this can be implemented on others PLATFORMS
- self._platform.send_command(
- "scontrol release " + "{0}".format(self.id))
- self.status = Status.QUEUING
+ self._platform.send_command("scontrol release " + "{0}".format(self.id))
+ self.new_status = Status.QUEUING
for job in self.job_list:
job.hold = self.hold
- job.status = self.status
+ job.new_status = Status.QUEUING
+ job.update_status(self.as_config.get_copy_remote_logs() == 'true')
Log.info("Job {0} is QUEUING {1}", self.name, reason)
else:
self.status = Status.HELD
diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py
index 1cb1dd4e0b1d65437016d1992e0ce6483a5ed1ef..cf9cf94de3ed20e8a7c5d2af002e3377a1e8b9ea 100644
--- a/autosubmit/job/job_list.py
+++ b/autosubmit/job/job_list.py
@@ -41,6 +41,8 @@ from log.log import AutosubmitCritical, AutosubmitError, Log
from threading import Thread, Lock
import multiprocessing
from autosubmit.config.basicConfig import BasicConfig
+from autosubmit.config.config_common import AutosubmitConfig
+from typing import List, Dict
import log.fd_show
# Log.get_logger("Log.Autosubmit")
@@ -1309,6 +1311,7 @@ class JobList(object):
return jobs_by_section
def get_in_queue_grouped_id(self, platform):
+ # type: (object) -> Dict[int, List[Job]]
jobs = self.get_in_queue(platform)
jobs_by_id = dict()
for job in jobs:
@@ -1425,7 +1428,7 @@ class JobList(object):
try:
self._persistence.save(self._persistence_path,
self._persistence_file, self._job_list if self.run_members is None or job_list is None else job_list)
- Log.info("_persistence end: {0}".format(log.fd_show.fd_table_status_str()))
+ # Log.info("_persistence end: {0}".format(log.fd_show.fd_table_status_str()))
pass
except BaseException as e:
raise AutosubmitError(e.message,6040,"Failure while saving the job_list")
@@ -1519,6 +1522,7 @@ class JobList(object):
self._parameters = value
def update_list(self, as_conf, store_change=True, fromSetStatus=False, submitter=None, first_time=False):
+ # type: (AutosubmitConfig, bool, bool, object, bool) -> bool
"""
Updates job list, resetting failed jobs and changing to READY all WAITING jobs with all parents COMPLETED
@@ -1681,7 +1685,7 @@ class JobList(object):
job.hold = hold_wrapper
if not job.hold:
for inner_job in job.job_list:
- inner_job.hold = False
+ inner_job.hold = False
Log.debug(
"Setting job: {0} status to: Queuing (all parents completed)...".format(
job.name))
@@ -1689,7 +1693,7 @@ class JobList(object):
tmp = [
parent for parent in job.parents if parent.status == Status.COMPLETED]
if len(tmp) == len(job.parents):
- job.hold = False
+ job.hold = False
Log.debug(
"Setting job: {0} status to: Queuing (all parents completed)...".format(
job.name))
diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py
index 273c660b041d6384ccf831ce0ec5b3a2f6b93f13..4c54921f06eb3c27b535de5b978fbe7c8a39fbb5 100644
--- a/autosubmit/job/job_packager.py
+++ b/autosubmit/job/job_packager.py
@@ -21,11 +21,12 @@ from log.log import Log, AutosubmitCritical, AutosubmitError
from autosubmit.job.job_common import Status, Type
from bscearth.utils.date import sum_str_hours
from autosubmit.job.job_packages import JobPackageSimple, JobPackageVertical, JobPackageHorizontal, \
- JobPackageSimpleWrapped, JobPackageHorizontalVertical, JobPackageVerticalHorizontal
+ JobPackageSimpleWrapped, JobPackageHorizontalVertical, JobPackageVerticalHorizontal, JobPackageBase
from operator import attrgetter
from math import ceil
import operator
from collections import defaultdict
+from typing import List
class JobPackager(object):
@@ -151,6 +152,7 @@ class JobPackager(object):
job.distance_weight = job.distance_weight - 1
def build_packages(self):
+ # type: () -> List[JobPackageBase]
"""
Returns the list of the built packages to be submitted
diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py
index 8d948d90bb481fae119b4f37a7c52532d9998ae3..0ea1feadc0b1419282ae9bbd13e284286dc57de2 100644
--- a/autosubmit/job/job_packages.py
+++ b/autosubmit/job/job_packages.py
@@ -35,7 +35,8 @@ Log.get_logger("Autosubmit")
from autosubmit.job.job_exceptions import WrongTemplateException
from autosubmit.job.job import Job
from bscearth.utils.date import sum_str_hours,date2str
-from threading import Thread,Lock
+from threading import Thread, Lock
+from typing import List
import multiprocessing
import tarfile
import datetime
@@ -54,10 +55,11 @@ class JobPackageBase(object):
"""
def __init__(self, jobs):
- self._jobs = jobs
- self._expid = jobs[0].expid
- self.hold = False
- self.export = jobs[0].export
+ # type: (List[Job]) -> None
+ self._jobs = jobs # type: List[Job]
+ self._expid = jobs[0].expid # type: str
+ self.hold = False # type: bool
+ self.export = jobs[0].export
self.x11 = jobs[0].x11
try:
self._tmp_path = jobs[0]._tmp_path
@@ -74,6 +76,7 @@ class JobPackageBase(object):
@property
def jobs(self):
+ # type: () -> List[Job]
"""
Returns the jobs
@@ -193,10 +196,10 @@ class JobPackageBase(object):
raise Exception('Not implemented')
def _send_files(self):
- raise Exception('Not implemented')
+ """ Send local files to the platform. """
def _do_submission(self,job_scripts=None, hold=False):
- raise Exception('Not implemented')
+ """ Submit package to the platform. """
@@ -235,8 +238,8 @@ class JobPackageSimple(JobPackageBase):
if job.id is None:
continue
Log.info("{0} submitted", job.name)
- job.status = Status.SUBMITTED
- job.write_submit_time()
+ job.status = Status.SUBMITTED
+ job.write_submit_time(hold=self.hold)
class JobPackageSimpleWrapped(JobPackageSimple):
@@ -325,8 +328,8 @@ class JobPackageArray(JobPackageBase):
for i in xrange(0, len(self.jobs)):
Log.info("{0} submitted", self.jobs[i].name)
self.jobs[i].id = str(package_id) + '[{0}]'.format(i)
- self.jobs[i].status = Status.SUBMITTED
- self.jobs[i].write_submit_time()
+ self.jobs[i].status = Status.SUBMITTED
+ self.jobs[i].write_submit_time(hold=hold)
class JobPackageThread(JobPackageBase):
@@ -471,8 +474,8 @@ class JobPackageThread(JobPackageBase):
for i in xrange(0, len(self.jobs) ):
Log.info("{0} submitted", self.jobs[i].name)
self.jobs[i].id = str(package_id)
- self.jobs[i].status = Status.SUBMITTED
- self.jobs[i].write_submit_time()
+ self.jobs[i].status = Status.SUBMITTED
+ self.jobs[i].write_submit_time(hold=hold)
def _common_script_content(self):
pass
@@ -549,8 +552,8 @@ class JobPackageThreadWrapped(JobPackageThread):
for i in xrange(0, len(self.jobs)):
Log.info("{0} submitted", self.jobs[i].name)
self.jobs[i].id = str(package_id)
- self.jobs[i].status = Status.SUBMITTED
- self.jobs[i].write_submit_time()
+ self.jobs[i].status = Status.SUBMITTED
+ self.jobs[i].write_submit_time(hold=hold)
class JobPackageVertical(JobPackageThread):
"""
Class to manage a vertical thread-based package of jobs to be submitted by autosubmit
diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py
index e06a4a9863a8f66d974df00cfef8c4bd6547887d..2d031ee90ae91268f6c6706595a05ea23017e5aa 100644
--- a/autosubmit/platforms/paramiko_platform.py
+++ b/autosubmit/platforms/paramiko_platform.py
@@ -140,6 +140,7 @@ class ParamikoPlatform(Platform):
except Exception as e:
raise AutosubmitCritical(
'Cant connect to this platform due an unknown error', 7050, e.message)
+
def threaded(fn):
def wrapper(*args, **kwargs):
thread = Thread(target=fn, args=args, kwargs=kwargs)
@@ -147,10 +148,6 @@ class ParamikoPlatform(Platform):
return thread
return wrapper
-
-
-
-
def connect(self, reconnect=False):
"""
Creates ssh connection to host
@@ -378,7 +375,7 @@ class ParamikoPlatform(Platform):
os.path.join(self.get_files_path(), src)), 5001)
return False
- def submit_job(self, job, script_name, hold=False,export="none"):
+ def submit_job(self, job, script_name, hold=False, export="none"):
"""
Submit a job from a given job object.
@@ -580,8 +577,8 @@ class ParamikoPlatform(Platform):
job.new_status = Status.HELD
if not job.hold:
# SHOULD BE MORE CLASS (GET_scontrol realease but not sure if this can be implemented on others PLATFORMS
- self.send_command(
- "scontrol release {0}".format(job.id))
+ self.send_command("scontrol release {0}".format(job.id))
+ job.new_status = Status.QUEUING # If it was HELD and was released, it should be QUEUING next.
else:
pass
# This shouldn't happen anymore TODO delete
@@ -1036,6 +1033,7 @@ class ParamikoPlatform(Platform):
return False
except Exception as e:
return False
+
def check_remote_permissions(self):
try:
path = os.path.join(self.scratch, self.project, self.user, "permission_checker_azxbyc")
@@ -1048,6 +1046,7 @@ class ParamikoPlatform(Platform):
return True
except:
return False
+
def check_remote_log_dir(self):
"""
Creates log dir on remote host
diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py
index 11b3013739a2bd63b2bd779578fa4675132ce6e1..c2ccf3575af60222f7fe0dcee31e733f735b193f 100644
--- a/autosubmit/platforms/platform.py
+++ b/autosubmit/platforms/platform.py
@@ -3,6 +3,7 @@ import os
from log.log import Log
import traceback
from autosubmit.job.job_common import Status
+from typing import List, Union
class Platform(object):
@@ -12,13 +13,12 @@ class Platform(object):
def __init__(self, expid, name, config):
"""
-
:param config:
:param expid:
:param name:
"""
- self.expid = expid
- self.name = name
+ self.expid = expid # type: str
+ self.name = name # type: str
self.config = config
self.tmp_path = os.path.join(
self.config.LOCAL_ROOT_DIR, self.expid, self.config.LOCAL_TMP_DIR)
@@ -209,8 +209,8 @@ class Platform(object):
:type remote_logs: (str, str)
"""
(job_out_filename, job_err_filename) = remote_logs
- self.get_files([job_out_filename, job_err_filename],
- False, 'LOG_{0}'.format(exp_id))
+ self.get_files([job_out_filename, job_err_filename], False, 'LOG_{0}'.format(exp_id))
+
def get_stat_file(self, exp_id, job_name):
"""
Get the given stat files for all retrials
@@ -219,8 +219,8 @@ class Platform(object):
:param remote_logs: names of the log files
:type remote_logs: (str, str)
"""
-
self.get_files(job_name,False, 'LOG_{0}'.format(exp_id))
+
def get_completed_files(self, job_name, retries=0, recovery=False, wrapper_failed=False):
"""
Get the COMPLETED file of the given job
@@ -260,6 +260,7 @@ class Platform(object):
Log.debug('{0}_STAT have been removed', job_name)
return True
return False
+
def remove_stat_file_by_retrials(self, job_name):
"""
Removes *STAT* files from remote
@@ -273,6 +274,7 @@ class Platform(object):
if self.delete_file(filename):
return True
return False
+
def remove_completed_file(self, job_name):
"""
Removes *COMPLETED* files from remote
@@ -330,6 +332,7 @@ class Platform(object):
return True
else:
return False
+
def get_stat_file_by_retrials(self, job_name, retries=0):
"""
Copies *STAT* files from remote to local
@@ -458,3 +461,15 @@ class Platform(object):
Log.warning(
"Autosubmit has not written extra information into the .out log.")
pass
+
+ def open_submit_script(self):
+ # type: () -> None
+ """ Opens Submit script file """
+ raise NotImplementedError
+
+ def submit_Script(self, hold=False):
+ # type: (bool) -> Union[List[str], str]
+ """
+ Sends a Submit file Script, execute it in the platform and retrieves the Jobs_ID of all jobs at once.
+ """
+ raise NotImplementedError
\ No newline at end of file
diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py
index e16e3284645593f740a2ba9b04b5bef124d4a4f9..b7ba088fc884942b2fdf6e40f3a917a1a20716de 100644
--- a/autosubmit/platforms/slurmplatform.py
+++ b/autosubmit/platforms/slurmplatform.py
@@ -22,6 +22,7 @@ from time import sleep
from time import mktime
from time import time
from datetime import datetime
+from typing import List, Union
import traceback
from xml.dom.minidom import parseString
@@ -72,6 +73,7 @@ class SlurmPlatform(ParamikoPlatform):
return os.path.join(self.config.LOCAL_ASLOG_DIR, os.path.basename(self._submit_script_path))
def submit_Script(self, hold=False):
+ # type: (bool) -> Union[List[str], str]
"""
Sends a Submit file Script, execute it in the platform and retrieves the Jobs_ID of all jobs at once.
diff --git a/requeriments.txt b/requeriments.txt
index b2086a3f1f2e244f3a258341f127bfc83f1369a2..561b0fd99c405c3bdf23a8a300793947d9ed6b8c 100644
--- a/requeriments.txt
+++ b/requeriments.txt
@@ -8,5 +8,6 @@ paramiko==1.15
mock>=1.3.0
portalocker==0.5.7
networkx
+typing
bscearth.utils
cryptography==3.3.2
\ No newline at end of file
diff --git a/setup.py b/setup.py
index 730705536631c787c9e8e32de4e7081a52ee391a..aa62f3920cd281bee099a4b476711211b810149d 100644
--- a/setup.py
+++ b/setup.py
@@ -40,7 +40,7 @@ setup(
download_url='https://earth.bsc.es/wiki/doku.php?id=tools:autosubmit',
keywords=['climate', 'weather', 'workflow', 'HPC'],
install_requires=['argparse>=1.2,<2', 'python-dateutil>2', 'pydotplus>=2', 'pyparsing>=2.0.1',
- 'numpy', 'matplotlib', 'paramiko == 2.7.1',
+ 'numpy', 'matplotlib', 'typing', 'paramiko == 2.7.1',
'mock>=1.3.0', 'portalocker==0.5.7', 'networkx', 'bscearth.utils', 'Xlib == 0.21'],
extras_require={
'dialog': ["python2-pythondialog>=3.3.0"]