diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 145c9072ab1947ca8491834eddbe910cf67fab01..550f8952f3a8bc9b2230f235e30e5cc1551cda50 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -4,4 +4,5 @@ test:
- ~/venv/autosubmit
script:
- source ~/venv/autosubmit/bin/activate
- - nosetests test/unit
\ No newline at end of file
+ - nosetests test/unit
+ - nosetests test/integration
\ No newline at end of file
diff --git a/CHANGELOG b/CHANGELOG
index 394ac5c57faf71e58c1edb7d541c012627a62159..3ad783995b633365d5a5142fc2bcf325ac3dab80 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,3 +1,12 @@
+3.7.4
+ Forward dependencies
+ Performance improvements
+ Log files copied into LOCAL platform
+ PROCESSORS_PER_NODE/TASKS now optional
+ Exclusivity for MN3 (with Paramiko)
+ THREADS optional for ECMWF
+ Minor bugfixes
+
3.7.3
Fixed error with logs directives (err & out were swapped)
Added new option for MN3: SCRATCH_FREE_SPACE
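
A note on the "Forward dependencies" entry above: jobs configuration dependency
keys may now carry a '+' offset, in addition to the existing '-' form, so a job
can depend on a *later* occurrence of another job (e.g. DEPENDENCIES = SIM+1).
A minimal sketch of how such a key is decomposed, mirroring the parsing added
to job_list.py in this changeset (the key value is illustrative):

    # Sketch only: decomposing a forward dependency key such as 'SIM+1'.
    key = 'SIM+1'
    sign = '-' if '-' in key else '+'
    section, distance = key.split(sign)
    assert (section, sign, int(distance)) == ('SIM', '+', 1)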
diff --git a/VERSION b/VERSION
index c1e43e6d45b265239c6a1cbfd3ffcc13f46d4b14..0833a98f1405167316da97ef9843d8bfad51edf5 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.7.3
+3.7.4
diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py
index 4fd402c1c9918c62172167ea807bc36aaf83db0b..3a650e4a78e2b9796f196e6ae05237c1a0e17d8e 100644
--- a/autosubmit/autosubmit.py
+++ b/autosubmit/autosubmit.py
@@ -70,8 +70,8 @@ from job.job_list_persistence import JobListPersistencePkl
# noinspection PyPackageRequirements
from config.log import Log
from database.db_common import create_db
-from database.db_common import new_experiment
-from database.db_common import copy_experiment
+from experiment.experiment_common import new_experiment
+from experiment.experiment_common import copy_experiment
from database.db_common import delete_experiment
from database.db_common import get_autosubmit_version
from monitor.monitor import Monitor
@@ -148,7 +148,8 @@ class Autosubmit:
group.add_argument('-y', '--copy', help='makes a copy of the specified experiment')
group.add_argument('-dm', '--dummy', action='store_true',
help='creates a new experiment with default values, usually for testing')
-
+ group.add_argument('-op', '--operational', action='store_true',
+ help='creates a new experiment with an operational experiment id')
subparser.add_argument('-H', '--HPC', required=True,
help='specifies the HPC to use for the experiment')
subparser.add_argument('-d', '--description', type=str, required=True,
@@ -322,7 +323,8 @@ class Autosubmit:
if args.command == 'run':
return Autosubmit.run_experiment(args.expid)
elif args.command == 'expid':
- return Autosubmit.expid(args.HPC, args.description, args.copy, args.dummy) != ''
+ return Autosubmit.expid(args.HPC, args.description, args.copy, args.dummy, False,
+ args.operational) != ''
elif args.command == 'delete':
return Autosubmit.delete(args.expid, args.force)
elif args.command == 'monitor':
@@ -404,10 +406,12 @@ class Autosubmit:
return ret
@staticmethod
- def expid(hpc, description, copy_id='', dummy=False, test=False):
+ def expid(hpc, description, copy_id='', dummy=False, test=False, operational=False):
"""
Creates a new experiment for given HPC
+ :param operational: if true, creates an operational experiment
+ :type operational: bool
:type hpc: str
:type description: str
:type copy_id: str
@@ -435,7 +439,7 @@ class Autosubmit:
Log.error("Missing HPC.")
return ''
if not copy_id:
- exp_id = new_experiment(description, Autosubmit.autosubmit_version, test)
+ exp_id = new_experiment(description, Autosubmit.autosubmit_version, test, operational)
if exp_id == '':
return ''
try:
@@ -468,7 +472,7 @@ class Autosubmit:
else:
try:
if os.path.exists(os.path.join(BasicConfig.LOCAL_ROOT_DIR, copy_id)):
- exp_id = copy_experiment(copy_id, description, Autosubmit.autosubmit_version, test)
+ exp_id = copy_experiment(copy_id, description, Autosubmit.autosubmit_version, test, operational)
if exp_id == '':
return ''
dir_exp_id = os.path.join(BasicConfig.LOCAL_ROOT_DIR, exp_id)
@@ -1444,7 +1448,7 @@ class Autosubmit:
Log.set_file(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, BasicConfig.LOCAL_TMP_DIR,
'refresh.log'))
as_conf = AutosubmitConfig(expid, BasicConfig, ConfigParserFactory())
- if not as_conf.check_conf_files():
+ if not as_conf.check_expdef_conf():
Log.critical('Can not copy with invalid configuration')
return False
project_type = as_conf.get_project_type()
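
For reference, a hedged sketch of how the extended expid() entry point can be
driven from Python after this change (the HPC name and description below are
made-up values, not part of this diff):

    # Sketch only: creating an operational experiment programmatically.
    from autosubmit.autosubmit import Autosubmit

    exp_id = Autosubmit.expid('marenostrum3', 'first operational run',
                              copy_id='', dummy=False, test=False,
                              operational=True)
    # Operational identifiers start with 'o' (e.g. 'o000'); see
    # experiment_common.new_experiment() further down in this diff.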
diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py
index da5f52ba1267dd7918783f0a6f85e4b66d53f340..83fc98a56848348a04a8a67cf8eb62230e1b9106 100644
--- a/autosubmit/config/config_common.py
+++ b/autosubmit/config/config_common.py
@@ -195,17 +195,17 @@ class AutosubmitConfig:
"""
Log.info('\nChecking configuration files...')
self.reload()
- result = self._check_autosubmit_conf()
- result = result and self._check_platforms_conf()
- result = result and self._check_jobs_conf()
- result = result and self._check_expdef_conf()
+ result = self.check_autosubmit_conf()
+ result = result and self.check_platforms_conf()
+ result = result and self.check_jobs_conf()
+ result = result and self.check_expdef_conf()
if result:
Log.result("Configuration files OK\n")
else:
Log.error("Configuration files invalid\n")
return result
- def _check_autosubmit_conf(self):
+ def check_autosubmit_conf(self):
"""
Checks experiment's autosubmit configuration file.
@@ -236,7 +236,7 @@ class AutosubmitConfig:
Log.info('{0} OK'.format(os.path.basename(self._conf_parser_file)))
return result
- def _check_platforms_conf(self):
+ def check_platforms_conf(self):
"""
Checks experiment's queues configuration file.
@@ -274,7 +274,7 @@ class AutosubmitConfig:
Log.info('{0} OK'.format(os.path.basename(self._platforms_parser_file)))
return result
- def _check_jobs_conf(self):
+ def check_jobs_conf(self):
"""
Checks experiment's jobs configuration file.
@@ -303,8 +303,12 @@ class AutosubmitConfig:
for dependency in str(AutosubmitConfig.get_option(parser, section, 'DEPENDENCIES', '')).split(' '):
if '-' in dependency:
dependency = dependency.split('-')[0]
+ elif '+' in dependency:
+ dependency = dependency.split('+')[0]
if dependency not in sections:
- Log.error('Job {0} depends on job {1} that is not defined'.format(section, dependency))
+ Log.error(
+ 'Job {0} depends on job {1} that is not defined. It will be ignored.'.format(section,
+ dependency))
if parser.has_option(section, 'RERUN_DEPENDENCIES'):
for dependency in str(AutosubmitConfig.get_option(parser, section, 'RERUN_DEPENDENCIES',
@@ -312,7 +316,9 @@ class AutosubmitConfig:
if '-' in dependency:
dependency = dependency.split('-')[0]
if dependency not in sections:
- Log.error('Job {0} depends on job {1} that is not defined'.format(section, dependency))
+ Log.error(
+ 'Job {0} depends on job {1} that is not defined. It will be ignored.'.format(section,
+ dependency))
result = result and AutosubmitConfig.check_is_choice(parser, section, 'RUNNING', False,
['once', 'date', 'member', 'chunk'])
@@ -323,7 +329,7 @@ class AutosubmitConfig:
return result
- def _check_expdef_conf(self):
+ def check_expdef_conf(self):
"""
Checks experiment's experiment configuration file.
diff --git a/autosubmit/config/files/platforms.conf b/autosubmit/config/files/platforms.conf
index 129be0fda6df6ec57843afa16ab7fb3f8fb6dd9f..84531137e6d5c6781d4293e2bccbc964449ede12 100644
--- a/autosubmit/config/files/platforms.conf
+++ b/autosubmit/config/files/platforms.conf
@@ -2,38 +2,41 @@
## Platform name
# [PLATFORM]
-## Queue type. Options: PBS, SGE, PS, LSF, ecaccess, SLURM
+## Queue type. Options: PBS, SGE, PS, LSF, ecaccess, SLURM. Required
# TYPE =
## Version of queue manager to use. Needed only in PBS (options: 10, 11, 12) and ecaccess (options: pbs, loadleveler)
# VERSION =
-## Hostname of the HPC
+## Hostname of the HPC. Required
# HOST =
-## Project for the machine scheduler
+## Project for the machine scheduler. Required
# PROJECT =
## Budget account for the machine scheduler. If omitted, takes the value defined in PROJECT
# BUDGET =
-## Option to add project name to host. This is required for some HPCs
+## Option to add project name to host. This is required for some HPCs.
# ADD_PROJECT_TO_HOST = False
-## User for the machine scheduler
+## User for the machine scheduler. Required
# USER =
-## Path to the scratch directory for the machine
+## Path to the scratch directory for the machine. Required.
# SCRATCH_DIR = /scratch
-## If true, autosubmit test command can use this queue as a main queue. Defaults to false
+## If true, Autosubmit test command can use this queue as a main queue. Defaults to False
# TEST_SUITE = False
-## If given, autosubmit will add jobs to the given queue
+## If given, Autosubmit will add jobs to the given queue. Required for some platforms.
# QUEUE =
-## If specified, autosubmit will run jobs with only one processor in the specified platform.
+## Optional. If true, Autosubmit will submit the serial jobs with the exclusivity directive.
+# EXCLUSIVITY =
+## Optional. If specified, autosubmit will run jobs with only one processor in the specified platform.
# SERIAL_PLATFORM = SERIAL_PLATFORM_NAME
-## If specified, autosubmit will run jobs with only one processor in the specified queue.
+## Optional. If specified, autosubmit will run jobs with only one processor in the specified queue.
## Autosubmit will ignore this configuration if SERIAL_PLATFORM is provided
# SERIAL_QUEUE = SERIAL_QUEUE_NAME
-## Default number of processors per node to be used in jobs
+## Optional. Default number of processors per node to be used in jobs
# PROCESSORS_PER_NODE =
-## Scratch free space requirements for the platform in percentage (%). If not specified, it won't be defined on the template.
-# SCRATCH_FREE_SPACE = 10
-## Default Maximum number of jobs to be waiting in any platform queue
+## Optional. Integer. Scratch free space requirements for the platform in percentage (%).
+## If not specified, it won't be defined on the template.
+# SCRATCH_FREE_SPACE =
+## Optional. Integer. Default Maximum number of jobs to be waiting in any platform queue
## Default = 3
-# MAX_WAITING_JOBS = 3
-## Default maximum number of jobs to be running at the same time at any platform
+# MAX_WAITING_JOBS =
+## Optional. Integer. Default maximum number of jobs to be running at the same time at any platform
## Default = 6
-# TOTAL_JOBS = 6
\ No newline at end of file
+# TOTAL_JOBS =
\ No newline at end of file
diff --git a/autosubmit/database/db_common.py b/autosubmit/database/db_common.py
index 8fbcff0521f2c71dbf0129ee99f43648f9d266a8..d58df1888595290e4dc37dcd43e333e8b11b640f 100644
--- a/autosubmit/database/db_common.py
+++ b/autosubmit/database/db_common.py
@@ -22,7 +22,6 @@ Module containing functions to manage autosubmit's database.
"""
import os
import sqlite3
-import string
from autosubmit.config.log import Log
from autosubmit.config.basicConfig import BasicConfig
@@ -55,10 +54,85 @@ def create_db(qry):
return True
-def _set_experiment(name, description, version):
+def check_db():
+ """
+ Checks if the database file exists
+
+ :return: True if the database file exists, False otherwise
+ """
+
+ if not os.path.exists(BasicConfig.DB_PATH):
+ Log.error('Some problem has happened... Check the database file. DB file: ' + BasicConfig.DB_PATH)
+ return False
+ return True
+
+
+def open_conn(check_version=True):
+ """
+ Opens a connection to database
+
+ :param check_version: If true, check if the database is compatible with this autosubmit version
+ :type check_version: bool
+ :return: connection object, cursor object
+ :rtype: sqlite3.Connection, sqlite3.Cursor
+ """
+ conn = sqlite3.connect(BasicConfig.DB_PATH)
+ cursor = conn.cursor()
+
+ # Getting database version
+ if check_version:
+ try:
+ cursor.execute('SELECT version '
+ 'FROM db_version;')
+ row = cursor.fetchone()
+ version = row[0]
+ except sqlite3.OperationalError:
+ # If this exception is thrown it's because db_version does not exist.
+ # Database is from 2.x or 3.0 beta releases
+ try:
+ cursor.execute('SELECT type '
+ 'FROM experiment;')
+ # If type field exists, it's from 2.x
+ version = -1
+ except sqlite3.Error:
+ # If it raises an error, it's from 3.0 beta releases
+ version = 0
+
+ # If the database version is not the expected one, update the database...
+ if version < CURRENT_DATABASE_VERSION:
+ if not _update_database(version, cursor):
+ raise DbException('Database version could not be updated')
+
+ # ... or ask for autosubmit upgrade
+ elif version > CURRENT_DATABASE_VERSION:
+ Log.critical('Database version is not compatible with this autosubmit version. Please execute pip install '
+ 'autosubmit --upgrade')
+ raise DbException('Database version not compatible')
+
+ return conn, cursor
+
+
+def close_conn(conn, cursor):
+ """
+ Commits changes and close connection to database
+
+ :param conn: connection to close
+ :type conn: sqlite3.Connection
+ :param cursor: cursor to close
+ :type cursor: sqlite3.Cursor
+ """
+ conn.commit()
+ cursor.close()
+ conn.close()
+ return
+
+
+def save_experiment(name, description, version):
"""
Stores experiment in database
+ :param version: autosubmit version associated to the experiment
+ :type version: str
:param name: experiment's name
:type name: str
:param description: experiment's description
@@ -66,9 +140,6 @@ def _set_experiment(name, description, version):
"""
if not check_db():
return False
- name = check_name(name)
- if name == '':
- return False
try:
(conn, cursor) = open_conn()
except DbException as e:
@@ -101,10 +172,6 @@ def check_experiment_exists(name, error_on_inexistence=True):
"""
if not check_db():
return False
- name = check_name(name)
- if name == '':
- return False
-
try:
(conn, cursor) = open_conn()
except DbException as e:
@@ -156,133 +223,14 @@ def get_autosubmit_version(expid):
return row[0]
-def new_experiment(description, version, test=False):
- """
- Stores a new experiment on the database and generates its identifier
-
- :param version: autosubmit version associated to the experiment
- :type version: str
- :param test: flag for test experiments
- :type test: bool
- :param description: experiment's description
- :type description: str
- :return: experiment id for the new experiment
- :rtype: str
- """
- if test:
- last_exp_name = last_name_used(True)
- else:
- last_exp_name = last_name_used()
- if last_exp_name == '':
- return ''
- if last_exp_name == 'empty':
- if test:
- # test identifier restricted also to 4 characters.
- new_name = 't000'
- else:
- new_name = 'a000'
- else:
- new_name = _next_name(last_exp_name)
- if new_name == '':
- return ''
- while check_experiment_exists(new_name, False):
- new_name = _next_name(new_name)
- if new_name == '':
- return ''
- if not _set_experiment(new_name, description, version):
- return ''
- Log.info('The new experiment "{0}" has been registered.', new_name)
- return new_name
-
-
-def copy_experiment(name, description, version, test=False):
- """
- Creates a new experiment by copying an existing experiment
-
- :param test: specifies if it is a test experiment
- :type test: bool
- :param version: experiment's associated autosubmit version
- :type version: str
- :param name: identifier of experiment to copy
- :type name: str
- :param description: experiment's description
- :type description: str
- :return: experiment id for the new experiment
- :rtype: str
- """
- if not check_experiment_exists(name):
- return ''
- new_name = new_experiment(description, version, test)
- return new_name
-
-
-def base36encode(number, alphabet=string.digits + string.ascii_lowercase):
- """
- Convert positive integer to a base36 string.
-
- :param number: number to convert
- :type number: int
- :param alphabet: set of characters to use
- :type alphabet: str
- :return: number's base36 string value
- :rtype: str
- """
- if not isinstance(number, (int, long)):
- raise TypeError('number must be an integer')
-
- # Special case for zero
- if number == 0:
- return '0'
-
- base36 = ''
-
- sign = ''
- if number < 0:
- sign = '-'
- number = - number
-
- while number > 0:
- number, i = divmod(number, len(alphabet))
- # noinspection PyAugmentAssignment
- base36 = alphabet[i] + base36
-
- return sign + base36.rjust(4, '0')
-
-
-def base36decode(number):
- """
- Converts a base36 string to a positive integer
-
- :param number: base36 string to convert
- :type number: str
- :return: number's integer value
- :rtype: int
- """
- return int(number, 36)
-
-
-def _next_name(name):
- """
- Get next experiment identifier
-
- :param name: previous experiment identifier
- :type name: str
- :return: new experiment identifier
- :rtype: str
- """
- name = check_name(name)
- if name == '':
- return ''
- # Convert the name to base 36 in number add 1 and then encode it
- return base36encode(base36decode(name) + 1)
-
-
-def last_name_used(test=False):
+def last_name_used(test=False, operational=False):
"""
Gets last experiment identifier used
:param test: flag for test experiments
:type test: bool
+ :param operational: flag for operational experiments
+ :type operational: bool
:return: last experiment identifier used, 'empty' if there is none
:rtype: str
"""
@@ -300,11 +248,17 @@ def last_name_used(test=False):
'WHERE rowid=(SELECT max(rowid) FROM experiment WHERE name LIKE "t%" AND '
'autosubmit_version IS NOT NULL AND '
'NOT (autosubmit_version LIKE "%3.0.0b%"))')
+ elif operational:
+ cursor.execute('SELECT name '
+ 'FROM experiment '
+ 'WHERE rowid=(SELECT max(rowid) FROM experiment WHERE name LIKE "o%" AND '
+ 'autosubmit_version IS NOT NULL AND '
+ 'NOT (autosubmit_version LIKE "%3.0.0b%"))')
else:
cursor.execute('SELECT name '
'FROM experiment '
'WHERE rowid=(SELECT max(rowid) FROM experiment WHERE name NOT LIKE "t%" AND '
- 'autosubmit_version IS NOT NULL AND '
+ 'name NOT LIKE "o%" AND autosubmit_version IS NOT NULL AND '
'NOT (autosubmit_version LIKE "%3.0.0b%"))')
row = cursor.fetchone()
close_conn(conn, cursor)
@@ -319,125 +273,34 @@ def last_name_used(test=False):
return row[0]
-def delete_experiment(name):
+def delete_experiment(experiment_id):
"""
Removes experiment from database
- :param name: experiment identifier
- :type name: str
+ :param experiment_id: experiment identifier
+ :type experiment_id: str
- :return: True if delete is succesful
+ :return: True if delete is successful
:rtype: bool
"""
if not check_db():
return False
- name = check_name(name)
- if name == '':
- return False
+ if not check_experiment_exists(experiment_id, False):
+ return True
try:
(conn, cursor) = open_conn()
except DbException as e:
Log.error('Connection to database could not be established: {0}', e.message)
return False
cursor.execute('DELETE FROM experiment '
- 'WHERE name=:name', {'name': name})
+ 'WHERE name=:name', {'name': experiment_id})
row = cursor.fetchone()
if row is None:
- Log.debug('The experiment {0} has been deleted!!!', name)
+ Log.debug('The experiment {0} has been deleted!!!', experiment_id)
close_conn(conn, cursor)
return True
-def check_name(name):
- """
- Checks if it is a valid experiment identifier
-
- :param name: experiment identifier to check
- :type name: str
- :return: name if is valid, terminates program otherwise
- :rtype: str
- """
- name = name.lower()
- if len(name) < 4 or not name.isalnum():
- Log.error("So sorry, but the name must have at least 4 alphanumeric chars!!!")
- return ''
- return name
-
-
-def check_db():
- """
- Checks if database file exist
-
- :return: None if exists, terminates program if not
- """
-
- if not os.path.exists(BasicConfig.DB_PATH):
- Log.error('Some problem has happened...check the database file.' + 'DB file:' + BasicConfig.DB_PATH)
- return False
- return True
-
-
-def open_conn(check_version=True):
- """
- Opens a connection to database
-
- :param check_version: If true, check if the database is compatible with this autosubmit version
- :type check_version: bool
- :return: connection object, cursor object
- :rtype: sqlite3.Connection, sqlite3.Cursor
- """
- conn = sqlite3.connect(BasicConfig.DB_PATH)
- cursor = conn.cursor()
-
- # Getting database version
- if check_version:
- try:
- cursor.execute('SELECT version '
- 'FROM db_version;')
- row = cursor.fetchone()
- version = row[0]
- except sqlite3.OperationalError:
- # If this exception is thrown it's because db_version does not exist.
- # Database is from 2.x or 3.0 beta releases
- try:
- cursor.execute('SELECT type '
- 'FROM experiment;')
- # If type field exists, it's from 2.x
- version = -1
- except sqlite3.Error:
- # If raises and error , it's from 3.0 beta releases
- version = 0
-
- # If database version is not the expected, update database....
- if version < CURRENT_DATABASE_VERSION:
- if not _update_database(version, cursor):
- raise DbException('Database version could not be updated')
-
- # ... or ask for autosubmit upgrade
- elif version > CURRENT_DATABASE_VERSION:
- Log.critical('Database version is not compatible with this autosubmit version. Please execute pip install '
- 'autosubmit --upgrade')
- raise DbException('Database version not compatible')
-
- return conn, cursor
-
-
-def close_conn(conn, cursor):
- """
- Commits changes and close connection to database
-
- :param conn: connection to close
- :type conn: sqlite3.Connection
- :param cursor: cursor to close
- :type cursor: sqlite3.Cursor
- """
- conn.commit()
- cursor.close()
- conn.close()
- return
-
-
def _update_database(version, cursor):
-
Log.info("Autosubmit's database version is {0}. Current version is {1}. Updating...",
version, CURRENT_DATABASE_VERSION)
try:
@@ -472,5 +335,6 @@ class DbException(Exception):
"""
Exception class for database errors
"""
+
def __init__(self, message):
self.message = message
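
The connection helpers open_conn() and close_conn() above are moved, not new
(their old definitions are removed further down in this file). A short usage
sketch of the pair, under the assumption that BasicConfig points at a valid
database:

    # Sketch only: one query round trip with the module-level helpers.
    from autosubmit.database.db_common import open_conn, close_conn, DbException

    try:
        conn, cursor = open_conn()  # also checks/updates the db_version table
    except DbException as e:
        raise SystemExit(e.message)
    cursor.execute('SELECT name FROM experiment')
    names = [row[0] for row in cursor.fetchall()]
    close_conn(conn, cursor)  # commits, then closes cursor and connection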
diff --git a/autosubmit/experiment/__init__.py b/autosubmit/experiment/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391
diff --git a/autosubmit/experiment/experiment_common.py b/autosubmit/experiment/experiment_common.py
new file mode 100644
index 0000000000000000000000000000000000000000..38bc38a622cb56e2e1f2fd58c45875bbdac6e3d3
--- /dev/null
+++ b/autosubmit/experiment/experiment_common.py
@@ -0,0 +1,171 @@
+#!/usr/bin/env python
+
+# Copyright 2015 Earth Sciences Department, BSC-CNS
+
+# This file is part of Autosubmit.
+
+# Autosubmit is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# Autosubmit is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with Autosubmit. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Module containing functions to manage autosubmit's experiments.
+"""
+import string
+import autosubmit.database.db_common as db_common
+from autosubmit.config.log import Log
+
+
+def new_experiment(description, version, test=False, operational=False):
+ """
+ Stores a new experiment on the database and generates its identifier
+
+ :param version: autosubmit version associated to the experiment
+ :type version: str
+ :param test: flag for test experiments
+ :type test: bool
+ :param operational: flag for operational experiments
+ :type operational: bool
+ :param description: experiment's description
+ :type description: str
+ :return: experiment id for the new experiment
+ :rtype: str
+ """
+ if test:
+ last_exp_name = db_common.last_name_used(True)
+ elif operational:
+ last_exp_name = db_common.last_name_used(False, True)
+ else:
+ last_exp_name = db_common.last_name_used()
+ if last_exp_name == '':
+ return ''
+ if last_exp_name == 'empty':
+ if test:
+ # test identifier restricted also to 4 characters.
+ new_name = 't000'
+ elif operational:
+ # operational identifier restricted also to 4 characters.
+ new_name = 'o000'
+ else:
+ new_name = 'a000'
+ else:
+ new_name = next_experiment_id(last_exp_name)
+ if new_name == '':
+ return ''
+ while db_common.check_experiment_exists(new_name, False):
+ new_name = next_experiment_id(new_name)
+ if new_name == '':
+ return ''
+ if not db_common.save_experiment(new_name, description, version):
+ return ''
+ Log.info('The new experiment "{0}" has been registered.', new_name)
+ return new_name
+
+
+def copy_experiment(experiment_id, description, version, test=False, operational=False):
+ """
+ Creates a new experiment by copying an existing experiment
+
+ :param version: experiment's associated autosubmit version
+ :type version: str
+ :param experiment_id: identifier of experiment to copy
+ :type experiment_id: str
+ :param description: experiment's description
+ :type description: str
+ :param test: specifies if it is a test experiment
+ :type test: bool
+ :param operational: specifies if it is an operational experiment
+ :type operational: bool
+ :return: experiment id for the new experiment
+ :rtype: str
+ """
+ if not db_common.check_experiment_exists(experiment_id):
+ return ''
+ new_name = new_experiment(description, version, test, operational)
+ return new_name
+
+
+def next_experiment_id(current_id):
+ """
+ Get next experiment identifier
+
+ :param current_id: previous experiment identifier
+ :type current_id: str
+ :return: new experiment identifier
+ :rtype: str
+ """
+ if not is_valid_experiment_id(current_id):
+ return ''
+ # Convert the id to a base36 number, add 1 and encode it back
+ next_id = base36encode(base36decode(current_id) + 1)
+ return next_id if is_valid_experiment_id(next_id) else ''
+
+
+def is_valid_experiment_id(name):
+ """
+ Checks if it is a valid experiment identifier
+
+ :param name: experiment identifier to check
+ :type name: str
+ :return: True if the identifier is valid, False otherwise
+ :rtype: bool
+ """
+ name = name.lower()
+ if len(name) < 4 or not name.isalnum():
+ Log.error("So sorry, but the name must have at least 4 alphanumeric chars!!!")
+ return False
+ return True
+
+
+def base36encode(number, alphabet=string.digits + string.ascii_lowercase):
+ """
+ Convert positive integer to a base36 string.
+
+ :param number: number to convert
+ :type number: int
+ :param alphabet: set of characters to use
+ :type alphabet: str
+ :return: number's base36 string value
+ :rtype: str
+ """
+ if not isinstance(number, (int, long)):
+ raise TypeError('number must be an integer')
+
+ # Special case for zero
+ if number == 0:
+ return '0'
+
+ base36 = ''
+
+ sign = ''
+ if number < 0:
+ sign = '-'
+ number = - number
+
+ while number > 0:
+ number, i = divmod(number, len(alphabet))
+ # noinspection PyAugmentAssignment
+ base36 = alphabet[i] + base36
+
+ return sign + base36.rjust(4, '0')
+
+
+def base36decode(number):
+ """
+ Converts a base36 string to a positive integer
+
+ :param number: base36 string to convert
+ :type number: str
+ :return: number's integer value
+ :rtype: int
+ """
+ return int(number, 36)
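
The helpers above make the experiment id sequence explicit: identifiers are
four-character base36 numbers, and the next id is simply the current one plus
one. A small self-checking sketch:

    # Sketch only: the id arithmetic behind new_experiment().
    from autosubmit.experiment.experiment_common import (base36decode,
                                                         base36encode,
                                                         next_experiment_id)

    assert base36encode(base36decode('a000') + 1) == 'a001'
    assert next_experiment_id('a00z') == 'a010'  # base36 carry on the last digit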
diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py
index 73a9843f2590961c927a3d748e7a78e082129986..7b92508e8cd018af0586a7e65d338fd22d308cf4 100644
--- a/autosubmit/job/job.py
+++ b/autosubmit/job/job.py
@@ -73,6 +73,8 @@ class Job:
self.id = jobid
self.file = None
+ self.out_filename = ''
+ self.err_filename = ''
self.status = status
self.priority = priority
self._parents = set()
@@ -427,6 +429,7 @@ class Job:
self.write_start_time()
if self.status in [Status.COMPLETED, Status.FAILED, Status.UNKNOWN]:
self.write_end_time(self.status == Status.COMPLETED)
+ self.get_platform().get_logs_files(self.expid, self.out_filename, self.err_filename)
return self.status
def check_completion(self, default_status=Status.FAILED):
diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py
index f54022b0d85bbdc8226fea55b6c352753308af21..dd4b7e083c6366ee455a4f4c15d04d067f835b1b 100644
--- a/autosubmit/job/job_list.py
+++ b/autosubmit/job/job_list.py
@@ -35,6 +35,9 @@ from autosubmit.job.job import Job
from autosubmit.config.log import Log
from autosubmit.date.chunk_date_lib import date2str, parse_date
+from networkx import DiGraph
+from autosubmit.job.job_utils import transitive_reduction
+
class JobList:
"""
@@ -58,17 +61,32 @@ class JobList:
self._chunk_list = []
self._dic_jobs = dict()
self._persistence = job_list_persistence
+ self._graph = DiGraph()
@property
def expid(self):
"""
- Returns experiment identifier
+ Returns the experiment identifier
:return: experiment's identifier
:rtype: str
"""
return self._expid
+ @property
+ def graph(self):
+ """
+ Returns the graph
+
+ :return: graph
+ :rtype: networkx graph
+ """
+ return self._graph
+
+ @graph.setter
+ def graph(self, value):
+ self._graph = value
+
def generate(self, date_list, member_list, num_chunks, parameters, date_format, default_retrials, default_job_type,
new=True):
"""
@@ -98,11 +116,9 @@ class JobList:
chunk_list = range(1, num_chunks + 1)
self._chunk_list = chunk_list
- parser = self._parser_factory.create_parser()
- parser.optionxform = str
- parser.read(os.path.join(self._config.LOCAL_ROOT_DIR, self._expid, 'conf', "jobs_" + self._expid + ".conf"))
+ jobs_parser = self._get_jobs_parser()
- dic_jobs = DicJobs(self, parser, date_list, member_list, chunk_list, date_format, default_retrials)
+ dic_jobs = DicJobs(self, jobs_parser, date_list, member_list, chunk_list, date_format, default_retrials)
self._dic_jobs = dic_jobs
priority = 0
@@ -110,10 +126,10 @@ class JobList:
jobs_data = dict()
if not new:
jobs_data = {str(row[0]): row for row in self.load()}
- self._create_jobs(dic_jobs, parser, priority, default_job_type, jobs_data)
+ self._create_jobs(dic_jobs, jobs_parser, priority, default_job_type, jobs_data)
Log.info("Adding dependencies...")
- self._add_dependencies(date_list, member_list, chunk_list, dic_jobs, parser)
+ self._add_dependencies(date_list, member_list, chunk_list, dic_jobs, jobs_parser, self.graph)
Log.info("Removing redundant dependencies...")
self.update_genealogy(new)
@@ -121,65 +137,106 @@ class JobList:
job.parameters = parameters
@staticmethod
- def _add_dependencies(date_list, member_list, chunk_list, dic_jobs, parser):
- for section in parser.sections():
- Log.debug("Adding dependencies for {0} jobs".format(section))
- if not parser.has_option(section, "DEPENDENCIES"):
+ def _add_dependencies(date_list, member_list, chunk_list, dic_jobs, jobs_parser, graph, option="DEPENDENCIES"):
+ for job_section in jobs_parser.sections():
+ Log.debug("Adding dependencies for {0} jobs".format(job_section))
+
+ # If it has no dependencies, do nothing
+ if not jobs_parser.has_option(job_section, option):
continue
- dependencies = parser.get(section, "DEPENDENCIES").split()
- dep_section = dict()
- dep_distance = dict()
- dep_running = dict()
- for dependency in dependencies:
- JobList._treat_dependency(dep_distance, dep_running, dep_section, dependency, dic_jobs)
+ dependencies_keys = jobs_parser.get(job_section, option).split()
+ dependencies = JobList._manage_dependencies(dependencies_keys, dic_jobs)
- for job in dic_jobs.get_jobs(section):
- JobList._treat_job(dic_jobs, job, date_list, member_list, chunk_list, dependencies, dep_distance,
- dep_running, dep_section)
+ for job in dic_jobs.get_jobs(job_section):
+ JobList._manage_job_dependencies(dic_jobs, job, date_list, member_list, chunk_list, dependencies_keys,
+ dependencies, graph)
@staticmethod
- def _treat_job(dic_jobs, job, date_list, member_list, chunk_list, dependencies, dep_distance, dep_running,
- dep_section):
- for dependency in dependencies:
- chunk = job.chunk
- member = job.member
- date = job.date
-
- section_name = dep_section[dependency]
-
- # Case dependency with previous execution of same job
- if '-' in dependency:
- distance = dep_distance[dependency]
- if chunk is not None and dep_running[dependency] == 'chunk':
- chunk_index = chunk_list.index(chunk)
- if chunk_index >= distance:
- chunk = chunk_list[chunk_index - distance]
- else:
- continue
- elif member is not None and dep_running[dependency] in ['chunk', 'member']:
- member_index = member_list.index(member)
- if member_index >= distance:
- member = member_list[member_index - distance]
- else:
- continue
- elif date is not None and dep_running[dependency] in ['chunk', 'member', 'startdate']:
- date_index = date_list.index(date)
- if date_index >= distance:
- date = date_list[date_index - distance]
- else:
- continue
+ def _manage_dependencies(dependencies_keys, dic_jobs):
+ dependencies = dict()
+ for key in dependencies_keys:
+ if '-' not in key and '+' not in key:
+ dependencies[key] = Dependency(key)
+ else:
+ sign = '-' if '-' in key else '+'
+ key_split = key.split(sign)
+ dependency_running_type = dic_jobs.get_option(key_split[0], 'RUNNING', 'once').lower()
+ dependency = Dependency(key_split[0], int(key_split[1]), dependency_running_type, sign)
+ dependencies[key] = dependency
+ return dependencies
+
+ @staticmethod
+ def _manage_job_dependencies(dic_jobs, job, date_list, member_list, chunk_list, dependencies_keys, dependencies, graph):
+ for key in dependencies_keys:
+ dependency = dependencies[key]
+ skip, (chunk, member, date) = JobList._calculate_dependency_metadata(job.chunk, chunk_list,
+ job.member, member_list,
+ job.date, date_list,
+ dependency)
+ if skip:
+ continue
- # Adding the dependencies (parents) calculated above
- for parent in dic_jobs.get_jobs(section_name, date, member, chunk):
+ for parent in dic_jobs.get_jobs(dependency.section, date, member, chunk):
job.add_parent(parent)
+ graph.add_edge(parent.name, job.name)
JobList.handle_frequency_interval_dependencies(chunk, chunk_list, date, date_list, dic_jobs, job, member,
- member_list, section_name)
+ member_list, dependency.section, graph)
+
+ @staticmethod
+ def _calculate_dependency_metadata(chunk, chunk_list, member, member_list, date, date_list, dependency):
+ skip = False
+ if dependency.sign == '-':
+ if chunk is not None and dependency.running == 'chunk':
+ chunk_index = chunk_list.index(chunk)
+ if chunk_index >= dependency.distance:
+ chunk = chunk_list[chunk_index - dependency.distance]
+ else:
+ skip = True
+ elif member is not None and dependency.running in ['chunk', 'member']:
+ member_index = member_list.index(member)
+ if member_index >= dependency.distance:
+ member = member_list[member_index - dependency.distance]
+ else:
+ skip = True
+ elif date is not None and dependency.running in ['chunk', 'member', 'startdate']:
+ date_index = date_list.index(date)
+ if date_index >= dependency.distance:
+ date = date_list[date_index - dependency.distance]
+ else:
+ skip = True
+
+ if dependency.sign == '+':
+ if chunk is not None and dependency.running == 'chunk':
+ chunk_index = chunk_list.index(chunk)
+ if (chunk_index + dependency.distance) < len(chunk_list):
+ chunk = chunk_list[chunk_index + dependency.distance]
+ else: # fall back to the closest chunk that exists
+ temp_distance = dependency.distance
+ while temp_distance > 0:
+ temp_distance -= 1
+ if (chunk_index + temp_distance) < len(chunk_list):
+ chunk = chunk_list[chunk_index + temp_distance]
+ break
+
+ elif member is not None and dependency.running in ['chunk', 'member']:
+ member_index = member_list.index(member)
+ if (member_index + dependency.distance) < len(member_list):
+ member = member_list[member_index + dependency.distance]
+ else:
+ skip = True
+ elif date is not None and dependency.running in ['chunk', 'member', 'startdate']:
+ date_index = date_list.index(date)
+ if (date_index + dependency.distance) < len(date_list):
+ date = date_list[date_index + dependency.distance]
+ else:
+ skip = True
+ return skip, (chunk, member, date)
@staticmethod
def handle_frequency_interval_dependencies(chunk, chunk_list, date, date_list, dic_jobs, job, member, member_list,
- section_name):
+ section_name, graph):
if job.wait and job.frequency > 1:
if job.chunk is not None:
max_distance = (chunk_list.index(chunk) + 1) % job.frequency
@@ -188,6 +245,7 @@ class JobList:
for distance in range(1, max_distance):
for parent in dic_jobs.get_jobs(section_name, date, member, chunk - distance):
job.add_parent(parent)
+ graph.add_edge(parent.name, job.name)
elif job.member is not None:
member_index = member_list.index(job.member)
max_distance = (member_index + 1) % job.frequency
@@ -197,6 +255,7 @@ class JobList:
for parent in dic_jobs.get_jobs(section_name, date,
member_list[member_index - distance], chunk):
job.add_parent(parent)
+ graph.add_edge(parent.name, job.name)
elif job.date is not None:
date_index = date_list.index(job.date)
max_distance = (date_index + 1) % job.frequency
@@ -206,16 +265,7 @@ class JobList:
for parent in dic_jobs.get_jobs(section_name, date_list[date_index - distance],
member, chunk):
job.add_parent(parent)
-
- @staticmethod
- def _treat_dependency(dep_distance, dep_running, dep_section, dependency, dic_jobs):
- if '-' in dependency:
- dependency_split = dependency.split('-')
- dep_section[dependency] = dependency_split[0]
- dep_distance[dependency] = int(dependency_split[1])
- dep_running[dependency] = dic_jobs.get_option(dependency_split[0], 'RUNNING', 'once').lower()
- else:
- dep_section[dependency] = dependency
+ graph.add_edge(parent.name, job.name)
@staticmethod
def _create_jobs(dic_jobs, parser, priority, default_job_type, jobs_data=dict()):
@@ -518,7 +568,7 @@ class JobList:
else:
retrials = job.retrials
- if job.fail_count < retrials:
+ if job.fail_count <= retrials:
tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED]
if len(tmp) == len(job.parents):
job.status = Status.READY
@@ -533,9 +583,6 @@ class JobList:
Log.debug('Updating WAITING jobs')
for job in self.get_waiting():
tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED]
- # for parent in job.parents:
- # if parent.status != Status.COMPLETED:
- # break
if len(tmp) == len(job.parents):
job.status = Status.READY
save = True
@@ -543,7 +590,7 @@ class JobList:
Log.debug('Update finished')
return save
- def update_genealogy(self, new):
+ def update_genealogy(self, new=True):
"""
When we have created the job list, every type of job is created.
Update genealogy remove jobs that have no templates
@@ -558,8 +605,12 @@ class JobList:
# Simplifying dependencies: if a parent is already an ancestor of another parent,
# we remove parent dependency
+ self.graph = transitive_reduction(self.graph)
for job in self._job_list:
- job.remove_redundant_parents()
+ children_to_remove = [child for child in job.children if child.name not in self.graph.neighbors(job.name)]
+ for child in children_to_remove:
+ job.children.remove(child)
+ child.parents.remove(job)
for job in self._job_list:
if not job.has_parents() and new:
@@ -614,26 +665,18 @@ class JobList:
:param chunk_list: list of chunks to rerun
:type chunk_list: str
"""
- parser = self._parser_factory.create_parser()
- parser.optionxform = str
- parser.read(os.path.join(self._config.LOCAL_ROOT_DIR, self._expid, 'conf', "jobs_" + self._expid + ".conf"))
+ jobs_parser = self._get_jobs_parser()
Log.info("Adding dependencies...")
- dep_section = dict()
- dep_distance = dict()
- dependencies = dict()
- dep_running = dict()
- for section in parser.sections():
- Log.debug("Reading rerun dependencies for {0} jobs".format(section))
- if not parser.has_option(section, "RERUN_DEPENDENCIES"):
+ for job_section in jobs_parser.sections():
+ Log.debug("Reading rerun dependencies for {0} jobs".format(job_section))
+
+ # If it has no rerun dependencies, do nothing
+ if not jobs_parser.has_option(job_section, "RERUN_DEPENDENCIES"):
continue
- dependencies[section] = parser.get(section, "RERUN_DEPENDENCIES").split()
- dep_section[section] = dict()
- dep_distance[section] = dict()
- dep_running[section] = dict()
- for dependency in dependencies[section]:
- JobList._treat_dependency(dep_distance[section], dep_running[section], dep_section[section], dependency,
- self._dic_jobs)
+
+ dependencies_keys = jobs_parser.get(job_section, "RERUN_DEPENDENCIES").split()
+ dependencies = JobList._manage_dependencies(dependencies_keys, self._dic_jobs)
for job in self._job_list:
job.status = Status.COMPLETED
@@ -651,38 +694,22 @@ class JobList:
chunk = int(c)
for job in [i for i in self._job_list if i.date == date and i.member == member and
i.chunk == chunk]:
+
if not job.rerun_only or chunk != previous_chunk + 1:
job.status = Status.WAITING
Log.debug("Job: " + job.name)
- section = job.section
- if section not in dependencies:
+
+ job_section = job.section
+ if job_section not in dependencies:
continue
- for dependency in dependencies[section]:
- current_chunk = chunk
- current_member = member
- current_date = date
- if '-' in dependency:
- distance = dep_distance[section][dependency]
- running = dep_running[section][dependency]
- if current_chunk is not None and running == 'chunk':
- chunk_index = self._chunk_list.index(current_chunk)
- if chunk_index >= distance:
- current_chunk = self._chunk_list[chunk_index - distance]
- else:
- continue
- elif current_member is not None and running in ['chunk', 'member']:
- member_index = self._member_list.index(current_member)
- if member_index >= distance:
- current_member = self._member_list[member_index - distance]
- else:
- continue
- elif current_date is not None and running in ['chunk', 'member', 'startdate']:
- date_index = self._date_list.index(current_date)
- if date_index >= distance:
- current_date = self._date_list[date_index - distance]
- else:
- continue
- section_name = dep_section[section][dependency]
+
+ for key in dependencies_keys:
+ skip, (current_chunk, current_member, current_date) = JobList._calculate_dependency_metadata(
+ chunk, self._chunk_list, member, self._member_list,
+ date, self._date_list, dependencies[key])
+ if skip:
+ continue
+
+ section_name = dependencies[key].section
for parent in self._dic_jobs.get_jobs(section_name, current_date, current_member,
current_chunk):
parent.status = Status.WAITING
@@ -696,9 +723,16 @@ class JobList:
self.update_genealogy()
+ def _get_jobs_parser(self):
+ jobs_parser = self._parser_factory.create_parser()
+ jobs_parser.optionxform = str
+ jobs_parser.read(
+ os.path.join(self._config.LOCAL_ROOT_DIR, self._expid, 'conf', "jobs_" + self._expid + ".conf"))
+ return jobs_parser
+
def remove_rerun_only_jobs(self):
"""
- Removes all jobs to be runned only in reruns
+ Removes all jobs to be run only in reruns
"""
flag = False
for job in set(self._job_list):
@@ -779,6 +813,7 @@ class DicJobs:
:type priority: int
"""
self._dic[section] = self.build_job(section, priority, None, None, None, default_job_type, jobs_data)
+ self._joblist.graph.add_node(self._dic[section].name)
def _create_jobs_startdate(self, section, priority, frequency, default_job_type, jobs_data=dict()):
"""
@@ -799,6 +834,7 @@ class DicJobs:
if count % frequency == 0 or count == len(self._date_list):
self._dic[section][date] = self.build_job(section, priority, date, None, None, default_job_type,
jobs_data)
+ self._joblist.graph.add_node(self._dic[section][date].name)
def _create_jobs_member(self, section, priority, frequency, default_job_type, jobs_data=dict()):
"""
@@ -821,6 +857,7 @@ class DicJobs:
if count % frequency == 0 or count == len(self._member_list):
self._dic[section][date][member] = self.build_job(section, priority, date, member, None,
default_job_type, jobs_data)
+ self._joblist.graph.add_node(self._dic[section][date][member].name)
'''
Maybe a good choice could be split this function or ascend the
@@ -872,6 +909,7 @@ class DicJobs:
else:
self._dic[section][date][member][chunk] = self.build_job(section, priority, date, member,
chunk, default_job_type, jobs_data)
+ self._joblist.graph.add_node(self._dic[section][date][member][chunk].name)
def get_jobs(self, section, date=None, member=None, chunk=None):
"""
@@ -891,6 +929,10 @@ class DicJobs:
:rtype: list
"""
jobs = list()
+
+ if section not in self._dic:
+ return jobs
+
dic = self._dic[section]
if type(dic) is not dict:
jobs.append(dic)
@@ -975,7 +1017,7 @@ class DicJobs:
job.check = False
job.processors = self.get_option(section, "PROCESSORS", 1)
- job.threads = self.get_option(section, "THREADS", 1)
+ job.threads = self.get_option(section, "THREADS", '')
job.tasks = self.get_option(section, "TASKS", '')
job.memory = self.get_option(section, "MEMORY", '')
job.wallclock = self.get_option(section, "WALLCLOCK", '')
@@ -1001,3 +1043,16 @@ class DicJobs:
return self._parser.get(section, option)
else:
return default
+
+
+class Dependency(object):
+ """
+ Class to manage the metadata related with a dependency
+
+ """
+
+ def __init__(self, section, distance=None, running=None, sign=None):
+ self.section = section
+ self.distance = distance
+ self.running = running
+ self.sign = sign
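
A sketch of the new forward ('+') branch of _calculate_dependency_metadata():
a chunk-running job on chunk 3 with a SIM+2 dependency is linked to chunk 5
when that chunk exists (lists and values below are illustrative):

    # Sketch only: exercising the '+' branch with made-up chunk lists.
    from autosubmit.job.job_list import Dependency, JobList

    dep = Dependency('SIM', distance=2, running='chunk', sign='+')
    skip, (chunk, member, date) = JobList._calculate_dependency_metadata(
        3, range(1, 11), None, [], None, [], dep)
    assert not skip and chunk == 5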
diff --git a/autosubmit/job/job_utils.py b/autosubmit/job/job_utils.py
new file mode 100644
index 0000000000000000000000000000000000000000..f6f3cde17afb4c8905dff63c5ea6401eb6564952
--- /dev/null
+++ b/autosubmit/job/job_utils.py
@@ -0,0 +1,19 @@
+from networkx.algorithms.dag import is_directed_acyclic_graph
+from networkx import DiGraph
+from networkx import dfs_edges
+from networkx import NetworkXError
+
+
+def transitive_reduction(graph):
+ if not is_directed_acyclic_graph(graph):
+ raise NetworkXError("Transitive reduction only uniquely defined on directed acyclic graphs.")
+ reduced_graph = DiGraph()
+ reduced_graph.add_nodes_from(graph.nodes())
+ for u in graph:
+ u_edges = set(graph[u])
+ for v in graph[u]:
+ u_edges -= {y for x, y in dfs_edges(graph, v)}
+ reduced_graph.add_edges_from((u, v) for v in u_edges)
+ return reduced_graph
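
transitive_reduction() keeps only the edges that are not implied by longer
paths, which is what update_genealogy() in job_list.py now relies on to drop
redundant parent links. A tiny self-checking example:

    # Sketch only: the shortcut edge a->c is implied by a->b->c and dropped.
    from networkx import DiGraph
    from autosubmit.job.job_utils import transitive_reduction

    g = DiGraph()
    g.add_edges_from([('a', 'b'), ('b', 'c'), ('a', 'c')])
    assert sorted(transitive_reduction(g).edges()) == [('a', 'b'), ('b', 'c')]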
diff --git a/autosubmit/platforms/ecplatform.py b/autosubmit/platforms/ecplatform.py
index 7717eb8c72dc6cb84ea8d53273d9caaf2268d5b4..44771629520db57212ef01e80a245d468345d405 100644
--- a/autosubmit/platforms/ecplatform.py
+++ b/autosubmit/platforms/ecplatform.py
@@ -134,8 +134,8 @@ class EcPlatform(ParamikoPlatform):
raise
return True
- def get_file(self, filename, must_exist=True):
- local_path = os.path.join(self.tmp_path, filename)
+ def get_file(self, filename, must_exist=True, relative_path=''):
+ local_path = os.path.join(self.tmp_path, relative_path, filename)
if os.path.exists(local_path):
os.remove(local_path)
@@ -237,6 +237,20 @@ class EcCcaHeader:
# There is no queue, so directive is empty
return ""
+ # noinspection PyMethodMayBeStatic
+ def get_tasks_per_node(self, job):
+ if not isinstance(job.tasks, int):
+ return ""
+ else:
+ return '#PBS -l EC_tasks_per_node={0}'.format(job.tasks)
+
+ # noinspection PyMethodMayBeStatic
+ def get_threads_per_task(self, job):
+ if not isinstance(job.threads, int):
+ return ""
+ else:
+ return '#PBS -l EC_threads_per_task={0}'.format(job.threads)
+
SERIAL = textwrap.dedent("""\
###############################################################################
# %TASKTYPE% %EXPID% EXPERIMENT
@@ -263,8 +277,8 @@ class EcCcaHeader:
#PBS -e %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%ERR_LOG_DIRECTIVE%
#PBS -q np
#PBS -l EC_total_tasks=%NUMPROC%
- #PBS -l EC_threads_per_task=%NUMTHREADS%
- #PBS -l EC_tasks_per_node=%NUMTASK%
+ %THREADS_PER_TASK_DIRECTIVE%
+ %TASKS_PER_NODE_DIRECTIVE%
#PBS -l walltime=%WALLCLOCK%:00
#PBS -l EC_billing_account=%CURRENT_BUDG%
#
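
The per-job TASKS/THREADS directives are now emitted only when the value is an
integer; with the new empty-string defaults the corresponding #PBS line simply
disappears from the header. A standalone sketch of that rule (mirroring
EcCcaHeader.get_tasks_per_node above, not importing it):

    # Sketch only: optional directive rendering.
    def tasks_per_node_directive(tasks):
        if not isinstance(tasks, int):
            return ''  # unset in the jobs conf -> no directive at all
        return '#PBS -l EC_tasks_per_node={0}'.format(tasks)

    assert tasks_per_node_directive('') == ''
    assert tasks_per_node_directive(24) == '#PBS -l EC_tasks_per_node=24'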
diff --git a/autosubmit/platforms/locplatform.py b/autosubmit/platforms/locplatform.py
index ce08d88a33b313706933602809edb35f07edfc70..f51d2af9235d80405798f1a9b555c40645a7f0fd 100644
--- a/autosubmit/platforms/locplatform.py
+++ b/autosubmit/platforms/locplatform.py
@@ -112,8 +112,8 @@ class LocalPlatform(ParamikoPlatform):
raise
return True
- def get_file(self, filename, must_exist=True):
- local_path = os.path.join(self.tmp_path, filename)
+ def get_file(self, filename, must_exist=True, relative_path=''):
+ local_path = os.path.join(self.tmp_path, relative_path, filename)
if os.path.exists(local_path):
os.remove(local_path)
@@ -139,6 +139,20 @@ class LocalPlatform(ParamikoPlatform):
def get_ssh_output(self):
return self._ssh_output
+ def get_logs_files(self, exp_id, job_out_filename, job_err_filename):
+ """
+ Overriding the parent's implementation.
+ Do nothing because the log files are already in the local platform (redundancy).
+
+ :param exp_id: experiment id
+ :type exp_id: str
+ :param job_out_filename: name of the out file
+ :type job_out_filename: str
+ :param job_err_filename: name of the err file
+ :type job_err_filename: str
+ """
+ return
+
class LocalHeader:
"""Class to handle the Ps headers of a job"""
diff --git a/autosubmit/platforms/lsfplatform.py b/autosubmit/platforms/lsfplatform.py
index dd8b03d0d9bda8ffa8a856125b97038c4a9537d6..7f906efdf1ca4f25f81d554d65389d6836cdb23d 100644
--- a/autosubmit/platforms/lsfplatform.py
+++ b/autosubmit/platforms/lsfplatform.py
@@ -105,11 +105,25 @@ class LsfHeader:
# noinspection PyMethodMayBeStatic
def get_scratch_free_space(self, job):
- if job.scratch_free_space is None:
+ if not isinstance(job.scratch_free_space, int):
return ""
else:
return '#BSUB -R "select[(scratch<{0})]"'.format(job.scratch_free_space)
+ # noinspection PyMethodMayBeStatic
+ def get_tasks_per_node(self, job):
+ if not isinstance(job.tasks, int):
+ return ""
+ else:
+ return '#BSUB -R "span[ptile={0}]"'.format(job.tasks)
+
+ # noinspection PyMethodMayBeStatic
+ def get_exclusivity(self, job):
+ if job.get_platform().exclusivity == 'true':
+ return "#BSUB -x"
+ else:
+ return ""
+
SERIAL = textwrap.dedent("""\
###############################################################################
# %TASKTYPE% %EXPID% EXPERIMENT
@@ -117,10 +131,11 @@ class LsfHeader:
#
#%QUEUE_DIRECTIVE%
#BSUB -J %JOBNAME%
- #BSUB -oo %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%JOBNAME%_%J.out
- #BSUB -eo %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%JOBNAME%_%J.err
+ #BSUB -oo %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%OUT_LOG_DIRECTIVE%
+ #BSUB -eo %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%ERR_LOG_DIRECTIVE%
#BSUB -W %WALLCLOCK%
#BSUB -n %NUMPROC%
+ %EXCLUSIVITY_DIRECTIVE%
#
###############################################################################
""")
@@ -132,11 +147,11 @@ class LsfHeader:
#
#%QUEUE_DIRECTIVE%
#BSUB -J %JOBNAME%
- #BSUB -oo %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%JOBNAME%_%J.out
- #BSUB -eo %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%JOBNAME%_%J.err
+ #BSUB -oo %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%OUT_LOG_DIRECTIVE%
+ #BSUB -eo %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%ERR_LOG_DIRECTIVE%
#BSUB -W %WALLCLOCK%
#BSUB -n %NUMPROC%
- #BSUB -R "span[ptile=%NUMTASK%]"
+ %TASKS_PER_NODE_DIRECTIVE%
%SCRATCH_FREE_SPACE_DIRECTIVE%
#
###############################################################################
diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py
index d3abb1eeb380591071f01f6c539efc9b497e8986..713ab52c40170d3054913011de68f9568b181eb2 100644
--- a/autosubmit/platforms/paramiko_platform.py
+++ b/autosubmit/platforms/paramiko_platform.py
@@ -98,7 +98,7 @@ class ParamikoPlatform(Platform):
os.path.join(self.get_files_path(), filename))
raise
- def get_file(self, filename, must_exist=True):
+ def get_file(self, filename, must_exist=True, relative_path=''):
"""
Copies a file from the current platform to experiment's tmp folder
@@ -106,11 +106,13 @@ class ParamikoPlatform(Platform):
:type filename: str
:param must_exist: If True, raises an exception if file can not be copied
:type must_exist: bool
- :return: True if file is copied succesfully, false otherwise
+ :param relative_path: path inside the tmp folder
+ :type relative_path: str
+ :return: True if file is copied successfully, false otherwise
:rtype: bool
"""
- local_path = os.path.join(self.tmp_path, filename)
+ local_path = os.path.join(self.tmp_path, relative_path, filename)
if os.path.exists(local_path):
os.remove(local_path)
@@ -123,7 +125,7 @@ class ParamikoPlatform(Platform):
ftp.get(os.path.join(self.get_files_path(), filename), local_path)
ftp.close()
return True
- except BaseException as e:
+ except BaseException:
if must_exist:
- raise Exception('File {0} does not exists'.format(filename))
+ raise Exception('File {0} does not exist'.format(filename))
return False
@@ -150,18 +152,18 @@ class ParamikoPlatform(Platform):
Log.debug('Could not remove file {0}'.format(os.path.join(self.get_files_path(), filename)))
return False
- def submit_job(self, job, scriptname):
+ def submit_job(self, job, script_name):
"""
Submit a job from a given job object.
:param job: job object
:type job: autosubmit.job.job.Job
- :param scriptname: job script's name
+ :param script_name: job script's name
- :rtype scriptname: str
+ :type script_name: str
:return: job id for the submitted job
:rtype: int
"""
- if self.send_command(self.get_submit_cmd(scriptname, job.type)):
+ if self.send_command(self.get_submit_cmd(script_name, job.type)):
job_id = self.get_submitted_job_id(self.get_ssh_output())
Log.debug("Job ID: {0}", job_id)
return int(job_id)
@@ -355,11 +357,21 @@ class ParamikoPlatform(Platform):
header = self.header.SERIAL
str_datetime = date2str(datetime.datetime.now(), 'S')
- header = header.replace('%QUEUE_DIRECTIVE%', self.header.get_queue_directive(job))
- header = header.replace('%ERR_LOG_DIRECTIVE%', "{0}.{1}.err".format(job.name, str_datetime))
- header = header.replace('%OUT_LOG_DIRECTIVE%', "{0}.{1}.out".format(job.name, str_datetime))
+ job.out_filename = "{0}.{1}.out".format(job.name, str_datetime)
+ job.err_filename = "{0}.{1}.err".format(job.name, str_datetime)
+ header = header.replace('%OUT_LOG_DIRECTIVE%', job.out_filename)
+ header = header.replace('%ERR_LOG_DIRECTIVE%', job.err_filename)
+
+ if hasattr(self.header, 'get_queue_directive'):
+ header = header.replace('%QUEUE_DIRECTIVE%', self.header.get_queue_directive(job))
+ if hasattr(self.header, 'get_tasks_per_node'):
+ header = header.replace('%TASKS_PER_NODE_DIRECTIVE%', self.header.get_tasks_per_node(job))
+ if hasattr(self.header, 'get_threads_per_task'):
+ header = header.replace('%THREADS_PER_TASK_DIRECTIVE%', self.header.get_threads_per_task(job))
if hasattr(self.header, 'get_scratch_free_space'):
header = header.replace('%SCRATCH_FREE_SPACE_DIRECTIVE%', self.header.get_scratch_free_space(job))
+ if hasattr(self.header, 'get_exclusivity'):
+ header = header.replace('%EXCLUSIVITY_DIRECTIVE%', self.header.get_exclusivity(job))
return header
def check_remote_log_dir(self):
diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py
index 0410c542d5328ff839fab4726d273e1eb1af79f5..9ea31f8ceb210d21ac2e2db60df9f6bda34e76af 100644
--- a/autosubmit/platforms/platform.py
+++ b/autosubmit/platforms/platform.py
@@ -120,7 +120,7 @@ class Platform:
"""
raise NotImplementedError
- def get_file(self, filename, must_exist=True):
+ def get_file(self, filename, must_exist=True, relative_path=''):
"""
Copies a file from the current platform to experiment's tmp folder
@@ -128,11 +128,29 @@ class Platform:
:type filename: str
:param must_exist: If True, raises an exception if file can not be copied
:type must_exist: bool
- :return: True if file is copied succesfully, false otherwise
+ :param relative_path: relative path inside tmp folder
+ :type relative_path: str
+ :return: True if file is copied successfully, false otherwise
:rtype: bool
"""
raise NotImplementedError
+ def get_files(self, files, must_exist=True, relative_path=''):
+ """
+ Copies some files from the current platform to experiment's tmp folder
+
+ :param files: file names
+ :type files: [str]
+ :param must_exist: If True, raises an exception if file can not be copied
+ :type must_exist: bool
+ :param relative_path: relative path inside tmp folder
+ :type relative_path: str
+ """
+ for filename in files:
+ self.get_file(filename, must_exist, relative_path)
+
def delete_file(self, filename):
"""
Deletes a file from this platform
@@ -144,6 +162,19 @@ class Platform:
"""
raise NotImplementedError
+ def get_logs_files(self, exp_id, job_out_filename, job_err_filename):
+ """
+ Get the given LOGS files
+
+ :param exp_id: experiment id
+ :type exp_id: str
+ :param job_out_filename: name of the out file
+ :type job_out_filename: str
+ :param job_err_filename: name of the err file
+ :type job_err_filename: str
+ """
+ self.get_files([job_out_filename, job_err_filename], False, 'LOG_{0}'.format(exp_id))
+
def get_completed_files(self, job_name, retries=5):
"""
Get the COMPLETED file of the given job
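
The new get_logs_files() hook closes the loop with the job.py change above:
once a job finishes, both log files are pulled into the experiment's local
tmp/LOG_<expid> folder, with must_exist=False so missing logs are tolerated.
A hedged helper showing the call shape ('job' is assumed to be a finished
autosubmit.job.job.Job instance):

    # Sketch only: retrieving the logs of a finished job.
    def retrieve_job_logs(job, exp_id):
        job.get_platform().get_logs_files(exp_id, job.out_filename,
                                          job.err_filename)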
diff --git a/autosubmit/platforms/saga_platform.py b/autosubmit/platforms/saga_platform.py
index 3449ce9dbb4dce722d3cc9a4eeb2ddef0fcb4192..85b20b4d4b965aa842388c0b4005fbcbfd64322b 100644
--- a/autosubmit/platforms/saga_platform.py
+++ b/autosubmit/platforms/saga_platform.py
@@ -86,7 +86,7 @@ class SagaPlatform(Platform):
# noinspection PyTypeChecker
return saga.filesystem.Directory(sftp_directory, session=self.service.session)
- def get_file(self, filename, must_exist=True):
+ def get_file(self, filename, must_exist=True, relative_path=''):
"""
Copies a file from the current platform to experiment's tmp folder
@@ -94,6 +94,8 @@ class SagaPlatform(Platform):
:type filename: str
:param must_exist: If True, raises an exception if file can not be copied
:type must_exist: bool
+ :param relative_path: relative path inside tmp folder
+ :type relative_path: str
:return: True if file is copied successfully, false otherwise
:rtype: bool
"""
@@ -204,14 +206,14 @@ class SagaPlatform(Platform):
saga_job.run()
return saga_job.id
- def create_saga_job(self, job, scriptname):
+ def create_saga_job(self, job, script_name):
"""
Creates a saga job from a given job object.
:param job: job object
:type job: autosubmit.job.job.Job
- :param scriptname: job script's name
- :rtype scriptname: str
+ :param script_name: job script's name
+ :type script_name: str
:return: saga job object for the given job
:rtype: saga.job.Job
"""
@@ -223,21 +225,25 @@ class SagaPlatform(Platform):
elif job.type == Type.R:
binary = 'Rscript'
- # jd.executable = '{0} {1}'.format(binary, os.path.join(self.get_files_path(), scriptname))
- jd.executable = os.path.join(self.get_files_path(), scriptname)
+ # jd.executable = '{0} {1}'.format(binary, os.path.join(self.get_files_path(), script_name))
+ jd.executable = os.path.join(self.get_files_path(), script_name)
jd.working_directory = self.get_files_path()
+
str_datetime = date2str(datetime.datetime.now(), 'S')
- jd.output = "{0}.{1}.out".format(job.name, str_datetime)
- jd.error = "{0}.{1}.err".format(job.name, str_datetime)
+ job.out_filename = "{0}.{1}.out".format(job.name, str_datetime)
+ job.err_filename = "{0}.{1}.err".format(job.name, str_datetime)
+ jd.output = job.out_filename
+ jd.error = job.err_filename
+
self.add_attribute(jd, 'Name', job.name)
- wallclock = job.parameters["WALLCLOCK"]
- if wallclock == '':
- wallclock = 0
+ wall_clock = job.parameters["WALLCLOCK"]
+ if wall_clock == '':
+ wall_clock = 0
else:
- wallclock = wallclock.split(':')
- wallclock = int(wallclock[0]) * 60 + int(wallclock[1])
- self.add_attribute(jd, 'WallTimeLimit', wallclock)
+ wall_clock = wall_clock.split(':')
+ wall_clock = int(wall_clock[0]) * 60 + int(wall_clock[1])
+ self.add_attribute(jd, 'WallTimeLimit', wall_clock)
self.add_attribute(jd, 'Queue', job.parameters["CURRENT_QUEUE"])
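The renamed wall_clock block converts the HH:MM string from the job parameters into the whole minutes that SAGA's WallTimeLimit attribute expects, with the empty string meaning no limit:

    # '02:30' -> 2 * 60 + 30 = 150 minutes; '' stays 0 (no wall-clock limit)
    parts = '02:30'.split(':')
    minutes = int(parts[0]) * 60 + int(parts[1])  # 150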
@@ -273,13 +279,13 @@ class SagaPlatform(Platform):
return
jd.set_attribute(name, value)
- def check_job(self, jobid, default_status=Status.COMPLETED, retries=10):
+ def check_job(self, job_id, default_status=Status.COMPLETED, retries=10):
"""
Checks job running status
:param retries: number of times to retry the status query
- :param jobid: job id
- :type jobid: str
+ :param job_id: job id
+ :type job_id: str
:param default_status: status to assign if it cannot be retrieved from the platform
:type default_status: autosubmit.job.job_common.Status
:return: current job status
@@ -288,9 +294,9 @@ class SagaPlatform(Platform):
saga_status = None
while saga_status is None and retries >= 0:
try:
- if jobid not in self.service.jobs:
+ if job_id not in self.service.jobs:
return Status.COMPLETED
- saga_status = self.service.get_job(jobid).state
+ saga_status = self.service.get_job(job_id).state
except Exception as e:
# If SAGA cannot get the job state, we change it to completed
# It will change to FAILED if no COMPLETED file is present
diff --git a/docs/source/conf.py b/docs/source/conf.py
index dc6320c1ca0c52fc041dc9c6364ea0122b14192f..1bbb67bbdc651c1d647acf645585b1ff189faf32 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -64,7 +64,7 @@ author = u'Earth Science Department, Barcelona Supercomputing Center, BSC'
# The short X.Y version.
version = '3.7'
# The full version, including alpha/beta/rc tags.
-release = '3.7.3'
+release = '3.7.4'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
diff --git a/setup.py b/setup.py
index 9a08a92aa30c878faf39b7434613016455114c36..5aa452e37c94ea97238c5cc91fffb28ee2fd1787 100644
--- a/setup.py
+++ b/setup.py
@@ -41,7 +41,7 @@ setup(
keywords=['climate', 'weather', 'workflow', 'HPC'],
install_requires=['argparse>=1.2,<2', 'python-dateutil>2', 'pydotplus>=2', 'pyparsing>=2.0.1',
'numpy', 'matplotlib', 'saga-python>=0.40', 'paramiko==1.15',
- 'mock>=1.3.0', 'portalocker>=0.5.7'],
+ 'mock>=1.3.0', 'portalocker>=0.5.7', 'networkx'],
extras_require={
'dialog': ["python2-pythondialog>=3.3.0"]
},
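networkx joins the requirements because the job list now keeps job relationships in a dependency graph (see the job_list.graph assertions in the unit tests further down). A hedged sketch of the underlying pattern (the exact graph type and edge direction in Autosubmit are assumptions):

    import networkx as nx

    graph = nx.DiGraph()            # one node per job
    graph.add_node('SIM')
    graph.add_node('POST')
    graph.add_edge('SIM', 'POST')   # POST runs after SIM (forward dependency)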
diff --git a/test/regression/README b/test/regression/README
index 7884bcef7dbc69aff1b5a49797e395faa3a2efae..a7e26a023c524f2b57d397010b3811de72ece3b9 100644
--- a/test/regression/README
+++ b/test/regression/README
@@ -14,6 +14,7 @@ How to run the regression tests
path = $PATH_TO_PROJECT/test/regression/db
3) Review the credentials on the platforms config file of each test
+or in the 'default_conf' directory
4) Run the 'tests_runner.py' file:
diff --git a/test/regression/default_conf/platforms.conf b/test/regression/default_conf/platforms.conf
index 2b3c23eddfd7f8e94c010fa76b0f687354ff98bf..7b2d75d9cdbbb09dbbfafd36286c6779ad719309 100644
--- a/test/regression/default_conf/platforms.conf
+++ b/test/regression/default_conf/platforms.conf
@@ -5,7 +5,7 @@ VERSION = pbs
HOST = cca
PROJECT = spesiccf
ADD_PROJECT_TO_HOST = false
-USER = c3m
+USER = c3j
SCRATCH_DIR = /scratch/ms
TEST_SUITE = True
PROCESSORS_PER_NODE = 24
diff --git a/test/regression/tests_log.py b/test/regression/tests_log.py
new file mode 100644
index 0000000000000000000000000000000000000000..a2e454b41fa0ef6a179a2102e54c84c0ff009351
--- /dev/null
+++ b/test/regression/tests_log.py
@@ -0,0 +1,189 @@
+#!/usr/bin/env python
+
+# Copyright 2016 Earth Sciences Department, BSC-CNS
+
+# This file is part of Autosubmit.
+
+# Autosubmit is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# Autosubmit is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with Autosubmit. If not, see <http://www.gnu.org/licenses/>.
+
+import logging
+import os
+import sys
+from datetime import datetime
+
+
+class LogFormatter:
+ """
+ Class to format log output.
+
+ :param to_file: If True, creates a LogFormatter for files; if False, for console
+ :type to_file: bool
+ """
+ RESULT = '\033[32m'
+ WARNING = '\033[33m'
+ ERROR = '\033[31m'
+ CRITICAL = '\033[1m \033[31m'
+ DEFAULT = '\033[0m\033[39m'
+
+ def __init__(self, to_file=False):
+ """
+ Initializer for LogFormatter
+
+
+ """
+ self._file = to_file
+ if self._file:
+ self._formatter = logging.Formatter('%(asctime)s %(message)s')
+ else:
+ self._formatter = logging.Formatter('%(message)s')
+
+ def format(self, record):
+ """
+ Formats log output, adding labels where needed for the log level. If logging to console, also manages
+ font color; if logging to a file, adds a timestamp.
+
+ :param record: log record to format
+ :type record: LogRecord
+ :return: formatted record
+ :rtype: str
+ """
+ header = ''
+ if record.levelno == Log.RESULT:
+ if not self._file:
+ header = LogFormatter.RESULT
+ elif record.levelno == Log.USER_WARNING:
+ if not self._file:
+ header = LogFormatter.WARNING
+ elif record.levelno == Log.WARNING:
+ if not self._file:
+ header = LogFormatter.WARNING
+ header += "[WARNING] "
+ elif record.levelno == Log.ERROR:
+ if not self._file:
+ header = LogFormatter.ERROR
+ header += "[ERROR] "
+ elif record.levelno == Log.CRITICAL:
+ if not self._file:
+ header = LogFormatter.ERROR
+ header += "[CRITICAL] "
+
+ msg = self._formatter.format(record)
+ if header != '' and not self._file:
+ msg += LogFormatter.DEFAULT
+ return header + msg
+
+
+class Log:
+ """
+ Static class to manage the prints for the regression tests. Messages will be sent to console.
+ Levels can be set for each output independently. These levels are (from lower to higher priority):
+ - DEBUG
+ - INFO
+ - RESULT
+ - USER_WARNING
+ - WARNING
+ - ERROR
+ - CRITICAL
+ """
+ EVERYTHING = 0
+ DEBUG = logging.DEBUG
+ INFO = logging.INFO
+ RESULT = 25
+ USER_WARNING = 29
+ WARNING = logging.WARNING
+ ERROR = logging.ERROR
+ CRITICAL = logging.CRITICAL
+ NO_LOG = CRITICAL + 1
+
+ logging.basicConfig()
+
+ log = logging.Logger('Autosubmit', EVERYTHING)
+
+ console_handler = logging.StreamHandler(sys.stdout)
+ console_handler.setLevel(INFO)
+ console_handler.setFormatter(LogFormatter(False))
+ log.addHandler(console_handler)
+
+ file_handler = None
+ file_level = INFO
+
+ @staticmethod
+ def debug(msg, *args):
+ """
+ Prints debug information
+
+ :param msg: message to show
+ :param args: arguments for message formatting (it will be done using format() method on str)
+ """
+ print(msg.format(*args))
+
+ @staticmethod
+ def info(msg, *args):
+ """
+ Prints information
+
+ :param msg: message to show
+ :param args: arguments for message formatting (it will be done using format() method on str)
+ """
+ print(msg.format(*args))
+
+ @staticmethod
+ def result(msg, *args):
+ """
+ Prints results information. It will be shown in green in the console.
+
+ :param msg: message to show
+ :param args: arguments for message formatting (it will be done using format() method on str)
+ """
+ print(LogFormatter.RESULT + msg.format(*args) + LogFormatter.DEFAULT)
+
+ @staticmethod
+ def user_warning(msg, *args):
+ """
+ Prints warnings for the user. It will be shown in yellow in the console.
+
+ :param msg: message to show
+ :param args: arguments for message formatting (it will be done using format() method on str)
+ """
+ print(LogFormatter.WARNING + msg.format(*args) + LogFormatter.DEFAULT)
+
+ @staticmethod
+ def warning(msg, *args):
+ """
+ Prints program warnings. It will be shown in yellow in the console.
+
+ :param msg: message to show
+ :param args: arguments for message formatting (it will be done using format() method on str)
+ """
+ print(LogFormatter.WARNING + "[WARNING] " + msg.format(*args) + LogFormatter.DEFAULT)
+
+ @staticmethod
+ def error(msg, *args):
+ """
+ Prints errors to the log. It will be shown in red in the console.
+
+ :param msg: message to show
+ :param args: arguments for message formatting (it will be done using format() method on str)
+ """
+ print(LogFormatter.ERROR + "[ERROR] " + msg.format(*args) + LogFormatter.DEFAULT)
+
+ @staticmethod
+ def critical(msg, *args):
+ """
+ Prints critical errors to the log. It will be shown in red in the console.
+
+ :param msg: message to show
+ :param args: arguments for message formatting (it will be done using format() method on str)
+ """
+ print(LogFormatter.ERROR + "[CRITICAL] " + msg.format(*args) + LogFormatter.DEFAULT)
diff --git a/test/regression/tests_runner.py b/test/regression/tests_runner.py
index a4c5b077dadd08a84425788df29c5686cc67a367..4602192daa004cb5de178b625b9c76e081933a6f 100644
--- a/test/regression/tests_runner.py
+++ b/test/regression/tests_runner.py
@@ -1,11 +1,15 @@
-from autosubmit.config.config_common import AutosubmitConfig
-from autosubmit.config.parser_factory import ConfigParserFactory
-from autosubmit.config.log import Log
+from tests_log import Log
from tests_utils import check_cmd, next_experiment_id, copy_experiment_conf_files, create_database, clean_database
from tests_commands import *
from threading import Thread
from time import sleep
import argparse
+try:
+ # noinspection PyCompatibility
+ from configparser import SafeConfigParser
+except ImportError:
+ # noinspection PyCompatibility
+ from ConfigParser import SafeConfigParser
# Configuration file where the regression tests are defined with INI style
tests_parser_file = 'tests.conf'
@@ -45,7 +49,11 @@ def run_test_case(experiment_id, name, hpc_arch, description, src_path, retrials
def run(current_experiment_id, only_list=None, exclude_list=None, max_threads=5):
# Local variables for testing
test_threads = []
- tests_parser = AutosubmitConfig.get_parser(ConfigParserFactory(), tests_parser_file)
+
+ # Building tests parser
+ tests_parser = SafeConfigParser()
+ tests_parser.optionxform = str
+ tests_parser.read(tests_parser_file)
# Resetting the database
clean_database(db_path)
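The runner now parses tests.conf directly with SafeConfigParser; setting optionxform = str keeps option names case-sensitive instead of ConfigParser's default lower-casing. A hypothetical entry shaped after the arguments run_test_case receives (all section and field names here are illustrative, not taken from the real tests.conf):

    [test-bash-mn3]
    HPCARCH = marenostrum3
    DESCRIPTION = bash workflow on MN3
    SRC_PATH = source/bash
    RETRIALS = 2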
diff --git a/test/regression/tests_utils.py b/test/regression/tests_utils.py
index d76bd1209221aa3a7b9a38adcca1ad5a44ec1d17..297fb8f7523faeabb6f9683746317bec5fe514da 100644
--- a/test/regression/tests_utils.py
+++ b/test/regression/tests_utils.py
@@ -1,7 +1,7 @@
-from autosubmit.database.db_common import base36decode, base36encode
from tests_commands import *
import os
import subprocess
+import string
BIN_PATH = '../../bin'
@@ -70,3 +70,48 @@ def get_default_copy_cmd(db_path, filename, experiment_id):
def get_conf_file_path(base_path, filename):
return os.path.join(base_path, 'conf', filename + '.conf')
+
+
+def base36encode(number, alphabet=string.digits + string.ascii_lowercase):
+ """
+ Convert positive integer to a base36 string.
+
+ :param number: number to convert
+ :type number: int
+ :param alphabet: set of characters to use
+ :type alphabet: str
+ :return: number's base36 string value
+ :rtype: str
+ """
+ if not isinstance(number, (int, long)):
+ raise TypeError('number must be an integer')
+
+ # Special case for zero
+ if number == 0:
+ return '0'
+
+ base36 = ''
+
+ sign = ''
+ if number < 0:
+ sign = '-'
+ number = - number
+
+ while number > 0:
+ number, i = divmod(number, len(alphabet))
+ # noinspection PyAugmentAssignment
+ base36 = alphabet[i] + base36
+
+ return sign + base36.rjust(4, '0')
+
+
+def base36decode(number):
+ """
+ Converts a base36 string to a positive integer
+
+ :param number: base36 string to convert
+ :type number: str
+ :return: number's integer value
+ :rtype: int
+ """
+ return int(number, 36)
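A quick round trip of the two helpers above:

    print(base36encode(0))       # '0'    (special-cased, not zero-padded)
    print(base36encode(12345))   # '09ix' (padded to at least 4 characters)
    print(base36decode('09ix'))  # 12345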
diff --git a/test/unit/test_autosubmit_config.py b/test/unit/test_autosubmit_config.py
index 18c4113a10a7bd580063e1b770ebff33a1ca7771..30f75bad2c6db73cd9524f896de5666f50774bac 100644
--- a/test/unit/test_autosubmit_config.py
+++ b/test/unit/test_autosubmit_config.py
@@ -619,8 +619,8 @@ class TestAutosubmitConfig(TestCase):
config.reload()
# act
- should_be_true = config._check_autosubmit_conf()
- should_be_false = config._check_autosubmit_conf()
+ should_be_true = config.check_autosubmit_conf()
+ should_be_false = config.check_autosubmit_conf()
# assert
self.assertTrue(should_be_true)
@@ -643,8 +643,8 @@ class TestAutosubmitConfig(TestCase):
config.reload()
# act
- should_be_true = config._check_expdef_conf()
- should_be_false = config._check_expdef_conf()
+ should_be_true = config.check_expdef_conf()
+ should_be_false = config.check_expdef_conf()
# assert
self.assertTrue(should_be_true)
@@ -670,7 +670,7 @@ class TestAutosubmitConfig(TestCase):
config.reload()
# act
- should_be_true = config._check_jobs_conf()
+ should_be_true = config.check_jobs_conf()
# assert
self.assertTrue(should_be_true)
@@ -692,7 +692,7 @@ class TestAutosubmitConfig(TestCase):
config.reload()
# act
- should_be_true = config._check_platforms_conf()
+ should_be_true = config.check_platforms_conf()
# assert
self.assertTrue(should_be_true)
@@ -703,17 +703,17 @@ class TestAutosubmitConfig(TestCase):
config = AutosubmitConfig(self.any_expid, FakeBasicConfig, ConfigParserFactory())
config.reload()
- config._check_autosubmit_conf = truth_mock
- config._check_platforms_conf = truth_mock
- config._check_jobs_conf = truth_mock
- config._check_expdef_conf = truth_mock
+ config.check_autosubmit_conf = truth_mock
+ config.check_platforms_conf = truth_mock
+ config.check_jobs_conf = truth_mock
+ config.check_expdef_conf = truth_mock
config2 = AutosubmitConfig(self.any_expid, FakeBasicConfig, ConfigParserFactory())
config2.reload()
- config2._check_autosubmit_conf = truth_mock
- config2._check_platforms_conf = truth_mock
- config2._check_jobs_conf = truth_mock
- config2._check_expdef_conf = Mock(return_value=False)
+ config2.check_autosubmit_conf = truth_mock
+ config2.check_platforms_conf = truth_mock
+ config2.check_jobs_conf = truth_mock
+ config2.check_expdef_conf = Mock(return_value=False)
# act
should_be_true = config.check_conf_files()
diff --git a/test/unit/test_dic_jobs.py b/test/unit/test_dic_jobs.py
index 6e7b0c6ea83c04369e48bb3fdc7f204658fca058..a9b749c2c8128e615486fb01f73c468c3bbadc77 100644
--- a/test/unit/test_dic_jobs.py
+++ b/test/unit/test_dic_jobs.py
@@ -116,163 +116,164 @@ class TestDicJobs(TestCase):
def test_dic_creates_right_jobs_by_startdate(self):
# arrange
- section = 'fake-section'
+ mock_section = Mock()
+ mock_section.name = 'fake-section'
priority = 999
frequency = 1
- created_job = 'created_job'
- self.dictionary.build_job = Mock(return_value=created_job)
+ self.dictionary.build_job = Mock(return_value=mock_section)
# act
- self.dictionary._create_jobs_startdate(section, priority, frequency, Type.BASH)
+ self.dictionary._create_jobs_startdate(mock_section.name, priority, frequency, Type.BASH)
# assert
self.assertEquals(len(self.date_list), self.dictionary.build_job.call_count)
- self.assertEquals(len(self.dictionary._dic[section]), len(self.date_list))
+ self.assertEquals(len(self.dictionary._dic[mock_section.name]), len(self.date_list))
for date in self.date_list:
- self.assertEquals(self.dictionary._dic[section][date], created_job)
+ self.assertEquals(self.dictionary._dic[mock_section.name][date], mock_section)
def test_dic_creates_right_jobs_by_member(self):
# arrange
- section = 'fake-section'
+ mock_section = Mock()
+ mock_section.name = 'fake-section'
priority = 999
frequency = 1
- created_job = 'created_job'
- self.dictionary.build_job = Mock(return_value=created_job)
+ self.dictionary.build_job = Mock(return_value=mock_section)
# act
- self.dictionary._create_jobs_member(section, priority, frequency, Type.BASH)
+ self.dictionary._create_jobs_member(mock_section.name, priority, frequency, Type.BASH)
# assert
self.assertEquals(len(self.date_list) * len(self.member_list), self.dictionary.build_job.call_count)
- self.assertEquals(len(self.dictionary._dic[section]), len(self.date_list))
+ self.assertEquals(len(self.dictionary._dic[mock_section.name]), len(self.date_list))
for date in self.date_list:
for member in self.member_list:
- self.assertEquals(self.dictionary._dic[section][date][member], created_job)
+ self.assertEquals(self.dictionary._dic[mock_section.name][date][member], mock_section)
def test_dic_creates_right_jobs_by_chunk(self):
# arrange
- section = 'fake-section'
+ mock_section = Mock()
+ mock_section.name = 'fake-section'
priority = 999
frequency = 1
- created_job = 'created_job'
- self.dictionary.build_job = Mock(return_value=created_job)
+ self.dictionary.build_job = Mock(return_value=mock_section)
# act
- self.dictionary._create_jobs_chunk(section, priority, frequency, Type.BASH, dict())
+ self.dictionary._create_jobs_chunk(mock_section.name, priority, frequency, Type.BASH, dict())
# assert
self.assertEquals(len(self.date_list) * len(self.member_list) * len(self.chunk_list),
self.dictionary.build_job.call_count)
- self.assertEquals(len(self.dictionary._dic[section]), len(self.date_list))
+ self.assertEquals(len(self.dictionary._dic[mock_section.name]), len(self.date_list))
for date in self.date_list:
for member in self.member_list:
for chunk in self.chunk_list:
- self.assertEquals(self.dictionary._dic[section][date][member][chunk], created_job)
+ self.assertEquals(self.dictionary._dic[mock_section.name][date][member][chunk], mock_section)
def test_dic_creates_right_jobs_by_chunk_with_frequency_3(self):
# arrange
- section = 'fake-section'
+ mock_section = Mock()
+ mock_section.name = 'fake-section'
priority = 999
frequency = 3
- created_job = 'created_job'
- self.dictionary.build_job = Mock(return_value=created_job)
+ self.dictionary.build_job = Mock(return_value=mock_section)
# act
- self.dictionary._create_jobs_chunk(section, priority, frequency, Type.BASH)
+ self.dictionary._create_jobs_chunk(mock_section.name, priority, frequency, Type.BASH)
# assert
self.assertEquals(len(self.date_list) * len(self.member_list) * (len(self.chunk_list) / frequency),
self.dictionary.build_job.call_count)
- self.assertEquals(len(self.dictionary._dic[section]), len(self.date_list))
+ self.assertEquals(len(self.dictionary._dic[mock_section.name]), len(self.date_list))
def test_dic_creates_right_jobs_by_chunk_with_frequency_4(self):
# arrange
- section = 'fake-section'
+ mock_section = Mock()
+ mock_section.name = 'fake-section'
priority = 999
frequency = 4
- created_job = 'created_job'
- self.dictionary.build_job = Mock(return_value=created_job)
+ self.dictionary.build_job = Mock(return_value=mock_section)
# act
- self.dictionary._create_jobs_chunk(section, priority, frequency, Type.BASH)
+ self.dictionary._create_jobs_chunk(mock_section.name, priority, frequency, Type.BASH)
# assert
# multiply by the rounded-up (ceil) result of dividing the chunks by the frequency
self.assertEquals(
len(self.date_list) * len(self.member_list) * math.ceil(len(self.chunk_list) / float(frequency)),
self.dictionary.build_job.call_count)
- self.assertEquals(len(self.dictionary._dic[section]), len(self.date_list))
+ self.assertEquals(len(self.dictionary._dic[mock_section.name]), len(self.date_list))
def test_dic_creates_right_jobs_by_chunk_with_date_synchronize(self):
# arrange
- section = 'fake-section'
+ mock_section = Mock()
+ mock_section.name = 'fake-section'
priority = 999
frequency = 1
- created_job = 'created_job'
- self.dictionary.build_job = Mock(return_value=created_job)
+ self.dictionary.build_job = Mock(return_value=mock_section)
# act
- self.dictionary._create_jobs_chunk(section, priority, frequency, Type.BASH, 'date')
+ self.dictionary._create_jobs_chunk(mock_section.name, priority, frequency, Type.BASH, 'date')
# assert
self.assertEquals(len(self.chunk_list),
self.dictionary.build_job.call_count)
- self.assertEquals(len(self.dictionary._dic[section]), len(self.date_list))
+ self.assertEquals(len(self.dictionary._dic[mock_section.name]), len(self.date_list))
for date in self.date_list:
for member in self.member_list:
for chunk in self.chunk_list:
- self.assertEquals(self.dictionary._dic[section][date][member][chunk], created_job)
+ self.assertEquals(self.dictionary._dic[mock_section.name][date][member][chunk], mock_section)
def test_dic_creates_right_jobs_by_chunk_with_date_synchronize_and_frequency_4(self):
# arrange
- section = 'fake-section'
+ mock_section = Mock()
+ mock_section.name = 'fake-section'
priority = 999
frequency = 4
- created_job = 'created_job'
- self.dictionary.build_job = Mock(return_value=created_job)
+ self.dictionary.build_job = Mock(return_value=mock_section)
# act
- self.dictionary._create_jobs_chunk(section, priority, frequency, Type.BASH, 'date')
+ self.dictionary._create_jobs_chunk(mock_section.name, priority, frequency, Type.BASH, 'date')
# assert
self.assertEquals(math.ceil(len(self.chunk_list) / float(frequency)),
self.dictionary.build_job.call_count)
- self.assertEquals(len(self.dictionary._dic[section]), len(self.date_list))
+ self.assertEquals(len(self.dictionary._dic[mock_section.name]), len(self.date_list))
def test_dic_creates_right_jobs_by_chunk_with_member_synchronize(self):
# arrange
- section = 'fake-section'
+ mock_section = Mock()
+ mock_section.name = 'fake-section'
priority = 999
frequency = 1
- created_job = 'created_job'
- self.dictionary.build_job = Mock(return_value=created_job)
+ self.dictionary.build_job = Mock(return_value=mock_section)
# act
- self.dictionary._create_jobs_chunk(section, priority, frequency, Type.BASH, 'member')
+ self.dictionary._create_jobs_chunk(mock_section.name, priority, frequency, Type.BASH, 'member')
# assert
self.assertEquals(len(self.date_list) * len(self.chunk_list),
self.dictionary.build_job.call_count)
- self.assertEquals(len(self.dictionary._dic[section]), len(self.date_list))
+ self.assertEquals(len(self.dictionary._dic[mock_section.name]), len(self.date_list))
for date in self.date_list:
for member in self.member_list:
for chunk in self.chunk_list:
- self.assertEquals(self.dictionary._dic[section][date][member][chunk], created_job)
+ self.assertEquals(self.dictionary._dic[mock_section.name][date][member][chunk], mock_section)
def test_dic_creates_right_jobs_by_chunk_with_member_synchronize_and_frequency_4(self):
# arrange
- section = 'fake-section'
+ mock_section = Mock()
+ mock_section.name = 'fake-section'
priority = 999
frequency = 4
- created_job = 'created_job'
- self.dictionary.build_job = Mock(return_value=created_job)
+ self.dictionary.build_job = Mock(return_value=mock_section)
# act
- self.dictionary._create_jobs_chunk(section, priority, frequency, Type.BASH, 'member')
+ self.dictionary._create_jobs_chunk(mock_section.name, priority, frequency, Type.BASH, 'member')
# assert
self.assertEquals(len(self.date_list) * math.ceil(len(self.chunk_list) / float(frequency)),
self.dictionary.build_job.call_count)
- self.assertEquals(len(self.dictionary._dic[section]), len(self.date_list))
+ self.assertEquals(len(self.dictionary._dic[mock_section.name]), len(self.date_list))
def test_create_job_creates_a_job_with_right_parameters(self):
# arrange
@@ -496,14 +497,17 @@ class TestDicJobs(TestCase):
self.dictionary._get_date.assert_any_call(list(), dic, date, member, chunk)
def test_create_jobs_once_calls_create_job_and_assign_correctly_its_return_value(self):
- section = 'fake-section'
+ mock_section = Mock()
+ mock_section.name = 'fake-section'
priority = 999
- self.dictionary.build_job = Mock(return_value='fake-return')
+ self.dictionary.build_job = Mock(return_value=mock_section)
+ self.job_list.graph.add_node = Mock()
- self.dictionary._create_jobs_once(section, priority, Type.BASH, dict())
+ self.dictionary._create_jobs_once(mock_section.name, priority, Type.BASH, dict())
- self.assertEquals('fake-return', self.dictionary._dic[section])
- self.dictionary.build_job.assert_called_once_with(section, priority, None, None, None, Type.BASH, {})
+ self.assertEquals(mock_section, self.dictionary._dic[mock_section.name])
+ self.dictionary.build_job.assert_called_once_with(mock_section.name, priority, None, None, None, Type.BASH, {})
+ self.job_list.graph.add_node.assert_called_once_with(mock_section.name)
class FakeBasicConfig:
diff --git a/test/unit/test_expid.py b/test/unit/test_expid.py
new file mode 100644
index 0000000000000000000000000000000000000000..85e5a012bd1777876f50bec6c233cb13f8f83cba
--- /dev/null
+++ b/test/unit/test_expid.py
@@ -0,0 +1,56 @@
+from unittest import TestCase
+from mock import Mock, patch
+from autosubmit.experiment.experiment_common import new_experiment, next_experiment_id
+
+
+class TestExpid(TestCase):
+ def setUp(self):
+ self.description = "for testing"
+ self.version = "test-version"
+
+ @patch('autosubmit.experiment.experiment_common.db_common')
+ def test_create_new_experiment(self, db_common_mock):
+ current_experiment_id = "empty"
+ self._build_db_mock(current_experiment_id, db_common_mock)
+ experiment_id = new_experiment(self.description, self.version)
+ self.assertEquals("a000", experiment_id)
+
+ @patch('autosubmit.experiment.experiment_common.db_common')
+ def test_create_new_test_experiment(self, db_common_mock):
+ current_experiment_id = "empty"
+ self._build_db_mock(current_experiment_id, db_common_mock)
+ experiment_id = new_experiment(self.description, self.version, True)
+ self.assertEquals("t000", experiment_id)
+
+ @patch('autosubmit.experiment.experiment_common.db_common')
+ def test_create_new_operational_experiment(self, db_common_mock):
+ current_experiment_id = "empty"
+ self._build_db_mock(current_experiment_id, db_common_mock)
+ experiment_id = new_experiment(self.description, self.version, False, True)
+ self.assertEquals("o000", experiment_id)
+
+ @patch('autosubmit.experiment.experiment_common.db_common')
+ def test_create_new_experiment_with_previous_one(self, db_common_mock):
+ current_experiment_id = "a006"
+ self._build_db_mock(current_experiment_id, db_common_mock)
+ experiment_id = new_experiment(self.description, self.version)
+ self.assertEquals("a007", experiment_id)
+
+ @patch('autosubmit.experiment.experiment_common.db_common')
+ def test_create_new_test_experiment_with_previous_one(self, db_common_mock):
+ current_experiment_id = "t0ab"
+ self._build_db_mock(current_experiment_id, db_common_mock)
+ experiment_id = new_experiment(self.description, self.version, True)
+ self.assertEquals("t0ac", experiment_id)
+
+ @patch('autosubmit.experiment.experiment_common.db_common')
+ def test_create_new_operational_experiment_with_previous_one(self, db_common_mock):
+ current_experiment_id = "o112"
+ self._build_db_mock(current_experiment_id, db_common_mock)
+ experiment_id = new_experiment(self.description, self.version, False, True)
+ self.assertEquals("o113", experiment_id)
+
+ @staticmethod
+ def _build_db_mock(current_experiment_id, mock_db_common):
+ mock_db_common.last_name_used = Mock(return_value=current_experiment_id)
+ mock_db_common.check_experiment_exists = Mock(return_value=False)
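These cases pin down the id scheme: regular experiments use the 'a000'-style range, test experiments 't000', operational ones 'o000', and each new id is the base36 increment of the last one used. A hedged sketch of that increment, using the base36 helpers shown earlier in tests_utils (the real logic lives in autosubmit.experiment.experiment_common and also handles the 'empty' bootstrap case):

    def next_id_sketch(current):     # e.g. 'a006' -> 'a007', 't0ab' -> 't0ac'
        return base36encode(base36decode(current) + 1)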
diff --git a/test/unit/test_job_list.py b/test/unit/test_job_list.py
index 29039ce6104ead1fbbb0530a86d0ad531a6d1bc8..209734ae4eeb980603a112189f04f69dbb5d750a 100644
--- a/test/unit/test_job_list.py
+++ b/test/unit/test_job_list.py
@@ -209,14 +209,14 @@ class TestJobList(TestCase):
job_list.update_genealogy = Mock()
job_list._job_list = [Job('random-name', 9999, Status.WAITING, 0),
Job('random-name2', 99999, Status.WAITING, 0)]
-
date_list = ['fake-date1', 'fake-date2']
member_list = ['fake-member1', 'fake-member2']
num_chunks = 999
chunk_list = range(1, num_chunks + 1)
parameters = {'fake-key': 'fake-value',
'fake-key2': 'fake-value2'}
-
+ graph_mock = Mock()
+ job_list.graph = graph_mock
# act
job_list.generate(date_list, member_list, num_chunks, parameters, 'H', 9999, Type.BASH)
@@ -230,7 +230,8 @@ class TestJobList(TestCase):
cj_args, cj_kwargs = job_list._create_jobs.call_args
self.assertEquals(parser_mock, cj_args[1])
self.assertEquals(0, cj_args[2])
- job_list._add_dependencies.assert_called_once_with(date_list, member_list, chunk_list, cj_args[0], parser_mock)
+ job_list._add_dependencies.assert_called_once_with(date_list, member_list, chunk_list, cj_args[0], parser_mock,
+ graph_mock)
job_list.update_genealogy.assert_called_once_with(True)
for job in job_list._job_list:
self.assertEquals(parameters, job.parameters)