From bdcc6b06e4089d9803c41655ba6973038ec13601 Mon Sep 17 00:00:00 2001 From: jlope2 Date: Mon, 3 Oct 2016 16:34:21 +0200 Subject: [PATCH 01/30] Big refactor of JobList --- autosubmit/job/job_list.py | 211 ++++++++++++++++++------------------- 1 file changed, 102 insertions(+), 109 deletions(-) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index f54022b0d..4bee7d18b 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -98,11 +98,9 @@ class JobList: chunk_list = range(1, num_chunks + 1) self._chunk_list = chunk_list - parser = self._parser_factory.create_parser() - parser.optionxform = str - parser.read(os.path.join(self._config.LOCAL_ROOT_DIR, self._expid, 'conf', "jobs_" + self._expid + ".conf")) + jobs_parser = self._get_jobs_parser() - dic_jobs = DicJobs(self, parser, date_list, member_list, chunk_list, date_format, default_retrials) + dic_jobs = DicJobs(self, jobs_parser, date_list, member_list, chunk_list, date_format, default_retrials) self._dic_jobs = dic_jobs priority = 0 @@ -110,10 +108,10 @@ class JobList: jobs_data = dict() if not new: jobs_data = {str(row[0]): row for row in self.load()} - self._create_jobs(dic_jobs, parser, priority, default_job_type, jobs_data) + self._create_jobs(dic_jobs, jobs_parser, priority, default_job_type, jobs_data) Log.info("Adding dependencies...") - self._add_dependencies(date_list, member_list, chunk_list, dic_jobs, parser) + self._add_dependencies(date_list, member_list, chunk_list, dic_jobs, jobs_parser) Log.info("Removing redundant dependencies...") self.update_genealogy(new) @@ -121,61 +119,71 @@ class JobList: job.parameters = parameters @staticmethod - def _add_dependencies(date_list, member_list, chunk_list, dic_jobs, parser): - for section in parser.sections(): - Log.debug("Adding dependencies for {0} jobs".format(section)) - if not parser.has_option(section, "DEPENDENCIES"): + def _add_dependencies(date_list, member_list, chunk_list, dic_jobs, jobs_parser, 
option="DEPENDENCIES"): + for job_section in jobs_parser.sections(): + Log.debug("Adding dependencies for {0} jobs".format(job_section)) + + # If does not has dependencies, do nothing + if not jobs_parser.has_option(job_section, option): continue - dependencies = parser.get(section, "DEPENDENCIES").split() - dep_section = dict() - dep_distance = dict() - dep_running = dict() - for dependency in dependencies: - JobList._treat_dependency(dep_distance, dep_running, dep_section, dependency, dic_jobs) + dependencies_keys = jobs_parser.get(job_section, option).split() + dependencies = JobList._manage_dependencies(dependencies_keys, dic_jobs) - for job in dic_jobs.get_jobs(section): - JobList._treat_job(dic_jobs, job, date_list, member_list, chunk_list, dependencies, dep_distance, - dep_running, dep_section) + for job in dic_jobs.get_jobs(job_section): + JobList._manage_job_dependencies(dic_jobs, job, date_list, member_list, chunk_list, dependencies_keys, + dependencies) @staticmethod - def _treat_job(dic_jobs, job, date_list, member_list, chunk_list, dependencies, dep_distance, dep_running, - dep_section): - for dependency in dependencies: - chunk = job.chunk - member = job.member - date = job.date - - section_name = dep_section[dependency] - - # Case dependency with previous execution of same job - if '-' in dependency: - distance = dep_distance[dependency] - if chunk is not None and dep_running[dependency] == 'chunk': - chunk_index = chunk_list.index(chunk) - if chunk_index >= distance: - chunk = chunk_list[chunk_index - distance] - else: - continue - elif member is not None and dep_running[dependency] in ['chunk', 'member']: - member_index = member_list.index(member) - if member_index >= distance: - member = member_list[member_index - distance] - else: - continue - elif date is not None and dep_running[dependency] in ['chunk', 'member', 'startdate']: - date_index = date_list.index(date) - if date_index >= distance: - date = date_list[date_index - distance] - else: - 
continue + def _manage_dependencies(dependencies_keys, dic_jobs): + dependencies = dict() + for key in dependencies_keys: + if '-' in key: + key_split = key.split('-') + dependency_running_type = dic_jobs.get_option(key_split[0], 'RUNNING', 'once').lower() + dependency = Dependency(key_split[0], int(key_split[1]), dependency_running_type) + dependencies[key] = dependency + else: + dependencies[key] = Dependency(key) + return dependencies - # Adding the dependencies (parents) calculated above - for parent in dic_jobs.get_jobs(section_name, date, member, chunk): + @staticmethod + def _manage_job_dependencies(dic_jobs, job, date_list, member_list, chunk_list, dependencies_keys, dependencies): + for key in dependencies_keys: + skip, (chunk, member, date) = JobList._calculate_dependency_metadata(job.chunk, job.member, job.date, + dependencies[key]) + if skip: + continue + + for parent in dic_jobs.get_jobs(dependency.section, date, member, chunk): job.add_parent(parent) JobList.handle_frequency_interval_dependencies(chunk, chunk_list, date, date_list, dic_jobs, job, member, - member_list, section_name) + member_list, dependency.section) + + @staticmethod + def _calculate_dependency_metadata(chunk, member, date, dependency): + skip = False + if '-' in key: + if chunk is not None and dependency.running == 'chunk': + chunk_index = chunk_list.index(chunk) + if chunk_index >= dependency.distance: + chunk = chunk_list[chunk_index - dependency.distance] + else: + skip = True + elif member is not None and dependency.running in ['chunk', 'member']: + member_index = member_list.index(member) + if member_index >= dependency.distance: + member = member_list[member_index - dependency.distance] + else: + skip = True + elif date is not None and dependency.running in ['chunk', 'member', 'startdate']: + date_index = date_list.index(date) + if date_index >= dependency.distance: + date = date_list[date_index - dependency.distance] + else: + skip = True + return skip, (chunk, member, date) 
@staticmethod def handle_frequency_interval_dependencies(chunk, chunk_list, date, date_list, dic_jobs, job, member, member_list, @@ -207,16 +215,6 @@ class JobList: member, chunk): job.add_parent(parent) - @staticmethod - def _treat_dependency(dep_distance, dep_running, dep_section, dependency, dic_jobs): - if '-' in dependency: - dependency_split = dependency.split('-') - dep_section[dependency] = dependency_split[0] - dep_distance[dependency] = int(dependency_split[1]) - dep_running[dependency] = dic_jobs.get_option(dependency_split[0], 'RUNNING', 'once').lower() - else: - dep_section[dependency] = dependency - @staticmethod def _create_jobs(dic_jobs, parser, priority, default_job_type, jobs_data=dict()): for section in parser.sections(): @@ -543,7 +541,7 @@ class JobList: Log.debug('Update finished') return save - def update_genealogy(self, new): + def update_genealogy(self, new=True): """ When we have created the job list, every type of job is created. Update genealogy remove jobs that have no templates @@ -614,26 +612,18 @@ class JobList: :param chunk_list: list of chunks to rerun :type chunk_list: str """ - parser = self._parser_factory.create_parser() - parser.optionxform = str - parser.read(os.path.join(self._config.LOCAL_ROOT_DIR, self._expid, 'conf', "jobs_" + self._expid + ".conf")) + jobs_parser = self._get_jobs_parser() Log.info("Adding dependencies...") - dep_section = dict() - dep_distance = dict() - dependencies = dict() - dep_running = dict() - for section in parser.sections(): - Log.debug("Reading rerun dependencies for {0} jobs".format(section)) - if not parser.has_option(section, "RERUN_DEPENDENCIES"): + for job_section in jobs_parser.sections(): + Log.debug("Reading rerun dependencies for {0} jobs".format(job_section)) + + # If does not has rerun dependencies, do nothing + if not jobs_parser.has_option(job_section, "RERUN_DEPENDENCIES"): continue - dependencies[section] = parser.get(section, "RERUN_DEPENDENCIES").split() - dep_section[section] 
= dict() - dep_distance[section] = dict() - dep_running[section] = dict() - for dependency in dependencies[section]: - JobList._treat_dependency(dep_distance[section], dep_running[section], dep_section[section], dependency, - self._dic_jobs) + + dependencies_keys = jobs_parser.get(job_section, "RERUN_DEPENDENCIES").split() + dependencies = JobList._manage_dependencies(dependencies_keys, self._dic_jobs) for job in self._job_list: job.status = Status.COMPLETED @@ -651,38 +641,22 @@ class JobList: chunk = int(c) for job in [i for i in self._job_list if i.date == date and i.member == member and i.chunk == chunk]: + if not job.rerun_only or chunk != previous_chunk + 1: job.status = Status.WAITING Log.debug("Job: " + job.name) - section = job.section - if section not in dependencies: + + job_section = job.section + if job_section not in dependencies: continue - for dependency in dependencies[section]: - current_chunk = chunk - current_member = member - current_date = date - if '-' in dependency: - distance = dep_distance[section][dependency] - running = dep_running[section][dependency] - if current_chunk is not None and running == 'chunk': - chunk_index = self._chunk_list.index(current_chunk) - if chunk_index >= distance: - current_chunk = self._chunk_list[chunk_index - distance] - else: - continue - elif current_member is not None and running in ['chunk', 'member']: - member_index = self._member_list.index(current_member) - if member_index >= distance: - current_member = self._member_list[member_index - distance] - else: - continue - elif current_date is not None and running in ['chunk', 'member', 'startdate']: - date_index = self._date_list.index(current_date) - if date_index >= distance: - current_date = self._date_list[date_index - distance] - else: - continue - section_name = dep_section[section][dependency] + + for key in dependencies_keys: + skip, (chunk, member, date) = JobList._calculate_dependency_metadata(chunk, member, date, + dependencies[key]) + if skip: + 
continue + + section_name = dependencies[key].section for parent in self._dic_jobs.get_jobs(section_name, current_date, current_member, current_chunk): parent.status = Status.WAITING @@ -696,9 +670,16 @@ class JobList: self.update_genealogy() + def _get_jobs_parser(self): + jobs_parser = self._parser_factory.create_parser() + jobs_parser.optionxform = str + jobs_parser.read( + os.path.join(self._config.LOCAL_ROOT_DIR, self._expid, 'conf', "jobs_" + self._expid + ".conf")) + return jobs_parser + def remove_rerun_only_jobs(self): """ - Removes all jobs to be runned only in reruns + Removes all jobs to be run only in reruns """ flag = False for job in set(self._job_list): @@ -1001,3 +982,15 @@ class DicJobs: return self._parser.get(section, option) else: return default + + +class Dependency(object): + """ + Class to manage the metadata related with a dependency + + """ + + def __init__(self, section, distance=None, running=None): + self.section = section + self.distance = distance + self.running = running -- GitLab From afb3fb110b192f64aec9ae3b951fad3d727ddb84 Mon Sep 17 00:00:00 2001 From: jlope2 Date: Tue, 4 Oct 2016 12:08:24 +0200 Subject: [PATCH 02/30] Configuration checks improved to allow forward (+) dependencies --- autosubmit/config/config_common.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index da5f52ba1..82bc2ccbe 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -303,6 +303,8 @@ class AutosubmitConfig: for dependency in str(AutosubmitConfig.get_option(parser, section, 'DEPENDENCIES', '')).split(' '): if '-' in dependency: dependency = dependency.split('-')[0] + if '+' in dependency: + dependency = dependency.split('+')[0] if dependency not in sections: Log.error('Job {0} depends on job {1} that is not defined'.format(section, dependency)) -- GitLab From 3cecf18863d0b42efe9833b676e5353e55bacc3f Mon Sep 17 00:00:00 2001 From: jlope2 
Date: Tue, 4 Oct 2016 12:09:01 +0200 Subject: [PATCH 03/30] Minor typo --- autosubmit/config/config_common.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index 82bc2ccbe..3af358b79 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -303,7 +303,7 @@ class AutosubmitConfig: for dependency in str(AutosubmitConfig.get_option(parser, section, 'DEPENDENCIES', '')).split(' '): if '-' in dependency: dependency = dependency.split('-')[0] - if '+' in dependency: + elif '+' in dependency: dependency = dependency.split('+')[0] if dependency not in sections: Log.error('Job {0} depends on job {1} that is not defined'.format(section, dependency)) -- GitLab From 429959b622ce55cf9301808f78019dddea10a2cc Mon Sep 17 00:00:00 2001 From: jlope2 Date: Tue, 4 Oct 2016 13:05:34 +0200 Subject: [PATCH 04/30] Working to allow forward (+) dependencies --- autosubmit/job/job_list.py | 45 +++++++++++++++++++++++++++++--------- 1 file changed, 35 insertions(+), 10 deletions(-) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 4bee7d18b..fc12cf1e2 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -138,20 +138,24 @@ class JobList: def _manage_dependencies(dependencies_keys, dic_jobs): dependencies = dict() for key in dependencies_keys: - if '-' in key: - key_split = key.split('-') + if '-' not in key and '+' not in key: + dependencies[key] = Dependency(key) + else: + sign = '-' if '-' in key else '+' + key_split = key.split(sign) dependency_running_type = dic_jobs.get_option(key_split[0], 'RUNNING', 'once').lower() - dependency = Dependency(key_split[0], int(key_split[1]), dependency_running_type) + dependency = Dependency(key_split[0], int(key_split[1]), dependency_running_type, sign) dependencies[key] = dependency - else: - dependencies[key] = Dependency(key) return dependencies @staticmethod def 
_manage_job_dependencies(dic_jobs, job, date_list, member_list, chunk_list, dependencies_keys, dependencies): for key in dependencies_keys: - skip, (chunk, member, date) = JobList._calculate_dependency_metadata(job.chunk, job.member, job.date, - dependencies[key]) + dependency = dependencies[key] + skip, (chunk, member, date) = JobList._calculate_dependency_metadata(job.chunk, chunk_list, + job.member, member_list, + job.date, date_list, + dependency) if skip: continue @@ -162,9 +166,9 @@ class JobList: member_list, dependency.section) @staticmethod - def _calculate_dependency_metadata(chunk, member, date, dependency): + def _calculate_dependency_metadata(chunk, chunk_list, member, member_list, date, date_list, dependency): skip = False - if '-' in key: + if dependency.sign is '-': if chunk is not None and dependency.running == 'chunk': chunk_index = chunk_list.index(chunk) if chunk_index >= dependency.distance: @@ -183,6 +187,26 @@ class JobList: date = date_list[date_index - dependency.distance] else: skip = True + + if dependency.sign is '+': + if chunk is not None and dependency.running == 'chunk': + chunk_index = chunk_list.index(chunk) + if (chunk_index + dependency.distance) < len(chunk_list): + chunk = chunk_list[chunk_index + dependency.distance] + else: + skip = True + elif member is not None and dependency.running in ['chunk', 'member']: + member_index = member_list.index(member) + if (member_index + dependency.distance) < len(member_list): + member = member_list[member_index + dependency.distance] + else: + skip = True + elif date is not None and dependency.running in ['chunk', 'member', 'startdate']: + date_index = date_list.index(date) + if (date_index + dependency.distance) < len(date_list): + date = date_list[date_index - dependency.distance] + else: + skip = True return skip, (chunk, member, date) @staticmethod @@ -990,7 +1014,8 @@ class Dependency(object): """ - def __init__(self, section, distance=None, running=None): + def __init__(self, 
section, distance=None, running=None, sign=None): self.section = section self.distance = distance self.running = running + self.sign = sign -- GitLab From 12b71ba896aa7104fc16b49435ab15fc7a099103 Mon Sep 17 00:00:00 2001 From: jlope2 Date: Tue, 4 Oct 2016 15:01:14 +0200 Subject: [PATCH 05/30] Handling "extra" forward (+) dependencies --- autosubmit/job/job_list.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index fc12cf1e2..c26e155ff 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -193,8 +193,14 @@ class JobList: chunk_index = chunk_list.index(chunk) if (chunk_index + dependency.distance) < len(chunk_list): chunk = chunk_list[chunk_index + dependency.distance] - else: - skip = True + else: # calculating the next one possible + temp_distance = dependency.distance + while temp_distance > 0: + temp_distance -= 1 + if (chunk_index + temp_distance) < len(chunk_list): + chunk = chunk_list[chunk_index + temp_distance] + break + elif member is not None and dependency.running in ['chunk', 'member']: member_index = member_list.index(member) if (member_index + dependency.distance) < len(member_list): -- GitLab From e689a77a81f83cc8a5b21b5ee705bc51b4627cab Mon Sep 17 00:00:00 2001 From: Joan Lopez Date: Thu, 13 Oct 2016 10:21:35 +0200 Subject: [PATCH 06/30] Operational experiment ids are available now. 
Fixes #202 --- autosubmit/autosubmit.py | 10 ++++++---- autosubmit/database/db_common.py | 19 +++++++++++++++++-- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 4fd402c1c..aa3a46300 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -148,7 +148,8 @@ class Autosubmit: group.add_argument('-y', '--copy', help='makes a copy of the specified experiment') group.add_argument('-dm', '--dummy', action='store_true', help='creates a new experiment with default values, usually for testing') - + group.add_argument('-op', '--operational', action='store_true', + help='creates a new experiment with operational experiment id') subparser.add_argument('-H', '--HPC', required=True, help='specifies the HPC to use for the experiment') subparser.add_argument('-d', '--description', type=str, required=True, @@ -322,7 +323,8 @@ class Autosubmit: if args.command == 'run': return Autosubmit.run_experiment(args.expid) elif args.command == 'expid': - return Autosubmit.expid(args.HPC, args.description, args.copy, args.dummy) != '' + return Autosubmit.expid(args.HPC, args.description, args.copy, args.dummy, False, + args.operational) != '' elif args.command == 'delete': return Autosubmit.delete(args.expid, args.force) elif args.command == 'monitor': @@ -404,7 +406,7 @@ class Autosubmit: return ret @staticmethod - def expid(hpc, description, copy_id='', dummy=False, test=False): + def expid(hpc, description, copy_id='', dummy=False, test=False, operational=False): """ Creates a new experiment for given HPC @@ -435,7 +437,7 @@ class Autosubmit: Log.error("Missing HPC.") return '' if not copy_id: - exp_id = new_experiment(description, Autosubmit.autosubmit_version, test) + exp_id = new_experiment(description, Autosubmit.autosubmit_version, test, operational) if exp_id == '': return '' try: diff --git a/autosubmit/database/db_common.py b/autosubmit/database/db_common.py index 8fbcff052..7815f37cd 100644 --- 
a/autosubmit/database/db_common.py +++ b/autosubmit/database/db_common.py @@ -156,7 +156,7 @@ def get_autosubmit_version(expid): return row[0] -def new_experiment(description, version, test=False): +def new_experiment(description, version, test=False, operational=False): """ Stores a new experiment on the database and generates its identifier @@ -164,6 +164,8 @@ def new_experiment(description, version, test=False): :type version: str :param test: flag for test experiments :type test: bool + :param operational: flag for operational experiments + :type operational: bool :param description: experiment's description :type description: str :return: experiment id for the new experiment @@ -171,6 +173,8 @@ def new_experiment(description, version, test=False): """ if test: last_exp_name = last_name_used(True) + elif operational: + last_exp_name = last_name_used(False, True) else: last_exp_name = last_name_used() if last_exp_name == '': @@ -179,6 +183,9 @@ def new_experiment(description, version, test=False): if test: # test identifier restricted also to 4 characters. new_name = 't000' + elif operational: + # operational identifier restricted also to 4 characters. 
+ new_name = 'o000' else: new_name = 'a000' else: @@ -277,12 +284,14 @@ def _next_name(name): return base36encode(base36decode(name) + 1) -def last_name_used(test=False): +def last_name_used(test=False, operational=False): """ Gets last experiment identifier used :param test: flag for test experiments :type test: bool + :param operational: flag for operational experiments + :type test: bool :return: last experiment identifier used, 'empty' if there is none :rtype: str """ @@ -300,6 +309,12 @@ def last_name_used(test=False): 'WHERE rowid=(SELECT max(rowid) FROM experiment WHERE name LIKE "t%" AND ' 'autosubmit_version IS NOT NULL AND ' 'NOT (autosubmit_version LIKE "%3.0.0b%"))') + elif operational: + cursor.execute('SELECT name ' + 'FROM experiment ' + 'WHERE rowid=(SELECT max(rowid) FROM experiment WHERE name LIKE "o%" AND ' + 'autosubmit_version IS NOT NULL AND ' + 'NOT (autosubmit_version LIKE "%3.0.0b%"))') else: cursor.execute('SELECT name ' 'FROM experiment ' -- GitLab From fadd8cd6c3807b0a52280752a7604983f2254572 Mon Sep 17 00:00:00 2001 From: Joan Lopez Date: Thu, 13 Oct 2016 11:14:26 +0200 Subject: [PATCH 07/30] Fixed SQL query to retrieve the last expid used. 
Fixes #202 --- autosubmit/database/db_common.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autosubmit/database/db_common.py b/autosubmit/database/db_common.py index 7815f37cd..8273fbe80 100644 --- a/autosubmit/database/db_common.py +++ b/autosubmit/database/db_common.py @@ -319,7 +319,7 @@ def last_name_used(test=False, operational=False): cursor.execute('SELECT name ' 'FROM experiment ' 'WHERE rowid=(SELECT max(rowid) FROM experiment WHERE name NOT LIKE "t%" AND ' - 'autosubmit_version IS NOT NULL AND ' + 'name NOT LIKE "o%" AND autosubmit_version IS NOT NULL AND ' 'NOT (autosubmit_version LIKE "%3.0.0b%"))') row = cursor.fetchone() close_conn(conn, cursor) -- GitLab From 79d87ec8370e376000b0ec4f679c5c95efbf47e6 Mon Sep 17 00:00:00 2001 From: Joan Lopez Date: Thu, 13 Oct 2016 12:58:17 +0200 Subject: [PATCH 08/30] Tests on expid --- autosubmit/autosubmit.py | 2 ++ test/unit/test_expid.py | 21 +++++++++++++++++++++ 2 files changed, 23 insertions(+) create mode 100644 test/unit/test_expid.py diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index aa3a46300..d47039dc9 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -410,6 +410,8 @@ class Autosubmit: """ Creates a new experiment for given HPC + :param operational: if true, creates an operational experiment + :type operational: bool :type hpc: str :type description: str :type copy_id: str diff --git a/test/unit/test_expid.py b/test/unit/test_expid.py new file mode 100644 index 000000000..8f135183c --- /dev/null +++ b/test/unit/test_expid.py @@ -0,0 +1,21 @@ +from unittest import TestCase +from mock import Mock +from autosubmit.autosubmit import new_experiment +import autosubmit.database.db_common as db_common + + +class TestExpid(TestCase): + def setUp(self): + self.description = "for testing" + self.version = "test-version" + + def test_create_new_experiment(self): + # arrange + current_experiment_id = "a006" + db_common.last_name_used = 
Mock(return_value=current_experiment_id) + db_common.check_experiment_exists = Mock(return_value=False) + db_common._set_experiment = Mock(return_value=True) + # act + experiment_id = new_experiment(self.description, self.version) + # assert + self.assertEquals("a007", experiment_id) \ No newline at end of file -- GitLab From af7e41bf3459684f7d51d28faef5bc18c36c9d02 Mon Sep 17 00:00:00 2001 From: Joan Lopez Date: Thu, 13 Oct 2016 15:09:20 +0200 Subject: [PATCH 09/30] Big refactor on the experiments management. Some tests were added. --- autosubmit/autosubmit.py | 6 +- autosubmit/database/db_common.py | 319 ++++++--------------- autosubmit/experiment/__init__.py | 0 autosubmit/experiment/experiment_common.py | 171 +++++++++++ test/unit/test_expid.py | 57 +++- 5 files changed, 304 insertions(+), 249 deletions(-) create mode 100644 autosubmit/experiment/__init__.py create mode 100644 autosubmit/experiment/experiment_common.py diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index d47039dc9..1dfc092ff 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -70,8 +70,8 @@ from job.job_list_persistence import JobListPersistencePkl # noinspection PyPackageRequirements from config.log import Log from database.db_common import create_db -from database.db_common import new_experiment -from database.db_common import copy_experiment +from experiment.experiment_common import new_experiment +from experiment.experiment_common import copy_experiment from database.db_common import delete_experiment from database.db_common import get_autosubmit_version from monitor.monitor import Monitor @@ -472,7 +472,7 @@ class Autosubmit: else: try: if os.path.exists(os.path.join(BasicConfig.LOCAL_ROOT_DIR, copy_id)): - exp_id = copy_experiment(copy_id, description, Autosubmit.autosubmit_version, test) + exp_id = copy_experiment(copy_id, description, Autosubmit.autosubmit_version, test, operational) if exp_id == '': return '' dir_exp_id = 
os.path.join(BasicConfig.LOCAL_ROOT_DIR, exp_id) diff --git a/autosubmit/database/db_common.py b/autosubmit/database/db_common.py index 8273fbe80..d58df1888 100644 --- a/autosubmit/database/db_common.py +++ b/autosubmit/database/db_common.py @@ -22,7 +22,6 @@ Module containing functions to manage autosubmit's database. """ import os import sqlite3 -import string from autosubmit.config.log import Log from autosubmit.config.basicConfig import BasicConfig @@ -55,10 +54,85 @@ def create_db(qry): return True -def _set_experiment(name, description, version): +def check_db(): + """ + Checks if database file exist + + :return: None if exists, terminates program if not + """ + + if not os.path.exists(BasicConfig.DB_PATH): + Log.error('Some problem has happened...check the database file.' + 'DB file:' + BasicConfig.DB_PATH) + return False + return True + + +def open_conn(check_version=True): + """ + Opens a connection to database + + :param check_version: If true, check if the database is compatible with this autosubmit version + :type check_version: bool + :return: connection object, cursor object + :rtype: sqlite3.Connection, sqlite3.Cursor + """ + conn = sqlite3.connect(BasicConfig.DB_PATH) + cursor = conn.cursor() + + # Getting database version + if check_version: + try: + cursor.execute('SELECT version ' + 'FROM db_version;') + row = cursor.fetchone() + version = row[0] + except sqlite3.OperationalError: + # If this exception is thrown it's because db_version does not exist. + # Database is from 2.x or 3.0 beta releases + try: + cursor.execute('SELECT type ' + 'FROM experiment;') + # If type field exists, it's from 2.x + version = -1 + except sqlite3.Error: + # If raises and error , it's from 3.0 beta releases + version = 0 + + # If database version is not the expected, update database.... + if version < CURRENT_DATABASE_VERSION: + if not _update_database(version, cursor): + raise DbException('Database version could not be updated') + + # ... 
or ask for autosubmit upgrade + elif version > CURRENT_DATABASE_VERSION: + Log.critical('Database version is not compatible with this autosubmit version. Please execute pip install ' + 'autosubmit --upgrade') + raise DbException('Database version not compatible') + + return conn, cursor + + +def close_conn(conn, cursor): + """ + Commits changes and close connection to database + + :param conn: connection to close + :type conn: sqlite3.Connection + :param cursor: cursor to close + :type cursor: sqlite3.Cursor + """ + conn.commit() + cursor.close() + conn.close() + return + + +def save_experiment(name, description, version): """ Stores experiment in database + :param version: + :type version: str :param name: experiment's name :type name: str :param description: experiment's description @@ -66,9 +140,6 @@ def _set_experiment(name, description, version): """ if not check_db(): return False - name = check_name(name) - if name == '': - return False try: (conn, cursor) = open_conn() except DbException as e: @@ -101,10 +172,6 @@ def check_experiment_exists(name, error_on_inexistence=True): """ if not check_db(): return False - name = check_name(name) - if name == '': - return False - try: (conn, cursor) = open_conn() except DbException as e: @@ -156,134 +223,6 @@ def get_autosubmit_version(expid): return row[0] -def new_experiment(description, version, test=False, operational=False): - """ - Stores a new experiment on the database and generates its identifier - - :param version: autosubmit version associated to the experiment - :type version: str - :param test: flag for test experiments - :type test: bool - :param operational: flag for operational experiments - :type operational: bool - :param description: experiment's description - :type description: str - :return: experiment id for the new experiment - :rtype: str - """ - if test: - last_exp_name = last_name_used(True) - elif operational: - last_exp_name = last_name_used(False, True) - else: - last_exp_name = 
last_name_used() - if last_exp_name == '': - return '' - if last_exp_name == 'empty': - if test: - # test identifier restricted also to 4 characters. - new_name = 't000' - elif operational: - # operational identifier restricted also to 4 characters. - new_name = 'o000' - else: - new_name = 'a000' - else: - new_name = _next_name(last_exp_name) - if new_name == '': - return '' - while check_experiment_exists(new_name, False): - new_name = _next_name(new_name) - if new_name == '': - return '' - if not _set_experiment(new_name, description, version): - return '' - Log.info('The new experiment "{0}" has been registered.', new_name) - return new_name - - -def copy_experiment(name, description, version, test=False): - """ - Creates a new experiment by copying an existing experiment - - :param test: specifies if it is a test experiment - :type test: bool - :param version: experiment's associated autosubmit version - :type version: str - :param name: identifier of experiment to copy - :type name: str - :param description: experiment's description - :type description: str - :return: experiment id for the new experiment - :rtype: str - """ - if not check_experiment_exists(name): - return '' - new_name = new_experiment(description, version, test) - return new_name - - -def base36encode(number, alphabet=string.digits + string.ascii_lowercase): - """ - Convert positive integer to a base36 string. 
- - :param number: number to convert - :type number: int - :param alphabet: set of characters to use - :type alphabet: str - :return: number's base36 string value - :rtype: str - """ - if not isinstance(number, (int, long)): - raise TypeError('number must be an integer') - - # Special case for zero - if number == 0: - return '0' - - base36 = '' - - sign = '' - if number < 0: - sign = '-' - number = - number - - while number > 0: - number, i = divmod(number, len(alphabet)) - # noinspection PyAugmentAssignment - base36 = alphabet[i] + base36 - - return sign + base36.rjust(4, '0') - - -def base36decode(number): - """ - Converts a base36 string to a positive integer - - :param number: base36 string to convert - :type number: str - :return: number's integer value - :rtype: int - """ - return int(number, 36) - - -def _next_name(name): - """ - Get next experiment identifier - - :param name: previous experiment identifier - :type name: str - :return: new experiment identifier - :rtype: str - """ - name = check_name(name) - if name == '': - return '' - # Convert the name to base 36 in number add 1 and then encode it - return base36encode(base36decode(name) + 1) - - def last_name_used(test=False, operational=False): """ Gets last experiment identifier used @@ -334,125 +273,34 @@ def last_name_used(test=False, operational=False): return row[0] -def delete_experiment(name): +def delete_experiment(experiment_id): """ Removes experiment from database - :param name: experiment identifier - :type name: str + :param experiment_id: experiment identifier + :type experiment_id: str :return: True if delete is succesful :rtype: bool """ if not check_db(): return False - name = check_name(name) - if name == '': - return False + if check_experiment_exists(experiment_id, False): + return True try: (conn, cursor) = open_conn() except DbException as e: Log.error('Connection to database could not be established: {0}', e.message) return False cursor.execute('DELETE FROM experiment ' - 'WHERE 
name=:name', {'name': name}) + 'WHERE name=:name', {'name': experiment_id}) row = cursor.fetchone() if row is None: - Log.debug('The experiment {0} has been deleted!!!', name) + Log.debug('The experiment {0} has been deleted!!!', experiment_id) close_conn(conn, cursor) return True -def check_name(name): - """ - Checks if it is a valid experiment identifier - - :param name: experiment identifier to check - :type name: str - :return: name if is valid, terminates program otherwise - :rtype: str - """ - name = name.lower() - if len(name) < 4 or not name.isalnum(): - Log.error("So sorry, but the name must have at least 4 alphanumeric chars!!!") - return '' - return name - - -def check_db(): - """ - Checks if database file exist - - :return: None if exists, terminates program if not - """ - - if not os.path.exists(BasicConfig.DB_PATH): - Log.error('Some problem has happened...check the database file.' + 'DB file:' + BasicConfig.DB_PATH) - return False - return True - - -def open_conn(check_version=True): - """ - Opens a connection to database - - :param check_version: If true, check if the database is compatible with this autosubmit version - :type check_version: bool - :return: connection object, cursor object - :rtype: sqlite3.Connection, sqlite3.Cursor - """ - conn = sqlite3.connect(BasicConfig.DB_PATH) - cursor = conn.cursor() - - # Getting database version - if check_version: - try: - cursor.execute('SELECT version ' - 'FROM db_version;') - row = cursor.fetchone() - version = row[0] - except sqlite3.OperationalError: - # If this exception is thrown it's because db_version does not exist. - # Database is from 2.x or 3.0 beta releases - try: - cursor.execute('SELECT type ' - 'FROM experiment;') - # If type field exists, it's from 2.x - version = -1 - except sqlite3.Error: - # If raises and error , it's from 3.0 beta releases - version = 0 - - # If database version is not the expected, update database.... 
- if version < CURRENT_DATABASE_VERSION: - if not _update_database(version, cursor): - raise DbException('Database version could not be updated') - - # ... or ask for autosubmit upgrade - elif version > CURRENT_DATABASE_VERSION: - Log.critical('Database version is not compatible with this autosubmit version. Please execute pip install ' - 'autosubmit --upgrade') - raise DbException('Database version not compatible') - - return conn, cursor - - -def close_conn(conn, cursor): - """ - Commits changes and close connection to database - - :param conn: connection to close - :type conn: sqlite3.Connection - :param cursor: cursor to close - :type cursor: sqlite3.Cursor - """ - conn.commit() - cursor.close() - conn.close() - return - - def _update_database(version, cursor): - Log.info("Autosubmit's database version is {0}. Current version is {1}. Updating...", version, CURRENT_DATABASE_VERSION) try: @@ -487,5 +335,6 @@ class DbException(Exception): """ Exception class for database errors """ + def __init__(self, message): self.message = message diff --git a/autosubmit/experiment/__init__.py b/autosubmit/experiment/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/autosubmit/experiment/experiment_common.py b/autosubmit/experiment/experiment_common.py new file mode 100644 index 000000000..38bc38a62 --- /dev/null +++ b/autosubmit/experiment/experiment_common.py @@ -0,0 +1,171 @@ +#!/usr/bin/env python + +# Copyright 2015 Earth Sciences Department, BSC-CNS + +# This file is part of Autosubmit. + +# Autosubmit is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# Autosubmit is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with Autosubmit. If not, see . + +""" +Module containing functions to manage autosubmit's experiments. +""" +import string +import autosubmit.database.db_common as db_common +from autosubmit.config.log import Log + + +def new_experiment(description, version, test=False, operational=False): + """ + Stores a new experiment on the database and generates its identifier + + :param version: autosubmit version associated to the experiment + :type version: str + :param test: flag for test experiments + :type test: bool + :param operational: flag for operational experiments + :type operational: bool + :param description: experiment's description + :type description: str + :return: experiment id for the new experiment + :rtype: str + """ + if test: + last_exp_name = db_common.last_name_used(True) + elif operational: + last_exp_name = db_common.last_name_used(False, True) + else: + last_exp_name = db_common.last_name_used() + if last_exp_name == '': + return '' + if last_exp_name == 'empty': + if test: + # test identifier restricted also to 4 characters. + new_name = 't000' + elif operational: + # operational identifier restricted also to 4 characters. 
+ new_name = 'o000' + else: + new_name = 'a000' + else: + new_name = next_experiment_id(last_exp_name) + if new_name == '': + return '' + while db_common.check_experiment_exists(new_name, False): + new_name = next_experiment_id(new_name) + if new_name == '': + return '' + if not db_common.save_experiment(new_name, description, version): + return '' + Log.info('The new experiment "{0}" has been registered.', new_name) + return new_name + + +def copy_experiment(experiment_id, description, version, test=False, operational=False): + """ + Creates a new experiment by copying an existing experiment + + :param version: experiment's associated autosubmit version + :type version: str + :param experiment_id: identifier of experiment to copy + :type experiment_id: str + :param description: experiment's description + :type description: str + :param test: specifies if it is a test experiment + :type test: bool + :param operational: specifies if it is a operational experiment + :type operational: bool + :return: experiment id for the new experiment + :rtype: str + """ + if not db_common.check_experiment_exists(experiment_id): + return '' + new_name = new_experiment(description, version, test, operational) + return new_name + + +def next_experiment_id(current_id): + """ + Get next experiment identifier + + :param current_id: previous experiment identifier + :type current_id: str + :return: new experiment identifier + :rtype: str + """ + if not is_valid_experiment_id(current_id): + return '' + # Convert the name to base 36 in number add 1 and then encode it + next_id = base36encode(base36decode(current_id) + 1) + return next_id if is_valid_experiment_id(next_id) else '' + + +def is_valid_experiment_id(name): + """ + Checks if it is a valid experiment identifier + + :param name: experiment identifier to check + :type name: str + :return: name if is valid, terminates program otherwise + :rtype: str + """ + name = name.lower() + if len(name) < 4 or not name.isalnum(): + Log.error("So 
sorry, but the name must have at least 4 alphanumeric chars!!!") + return False + return True + + +def base36encode(number, alphabet=string.digits + string.ascii_lowercase): + """ + Convert positive integer to a base36 string. + + :param number: number to convert + :type number: int + :param alphabet: set of characters to use + :type alphabet: str + :return: number's base36 string value + :rtype: str + """ + if not isinstance(number, (int, long)): + raise TypeError('number must be an integer') + + # Special case for zero + if number == 0: + return '0' + + base36 = '' + + sign = '' + if number < 0: + sign = '-' + number = - number + + while number > 0: + number, i = divmod(number, len(alphabet)) + # noinspection PyAugmentAssignment + base36 = alphabet[i] + base36 + + return sign + base36.rjust(4, '0') + + +def base36decode(number): + """ + Converts a base36 string to a positive integer + + :param number: base36 string to convert + :type number: str + :return: number's integer value + :rtype: int + """ + return int(number, 36) diff --git a/test/unit/test_expid.py b/test/unit/test_expid.py index 8f135183c..85e5a012b 100644 --- a/test/unit/test_expid.py +++ b/test/unit/test_expid.py @@ -1,7 +1,6 @@ from unittest import TestCase -from mock import Mock -from autosubmit.autosubmit import new_experiment -import autosubmit.database.db_common as db_common +from mock import Mock, patch +from autosubmit.experiment.experiment_common import new_experiment, next_experiment_id class TestExpid(TestCase): @@ -9,13 +8,49 @@ class TestExpid(TestCase): self.description = "for testing" self.version = "test-version" - def test_create_new_experiment(self): - # arrange + @patch('autosubmit.experiment.experiment_common.db_common') + def test_create_new_experiment(self, db_common_mock): + current_experiment_id = "empty" + self._build_db_mock(current_experiment_id, db_common_mock) + experiment_id = new_experiment(self.description, self.version) + self.assertEquals("a000", experiment_id) + + 
@patch('autosubmit.experiment.experiment_common.db_common') + def test_create_new_test_experiment(self, db_common_mock): + current_experiment_id = "empty" + self._build_db_mock(current_experiment_id, db_common_mock) + experiment_id = new_experiment(self.description, self.version, True) + self.assertEquals("t000", experiment_id) + + @patch('autosubmit.experiment.experiment_common.db_common') + def test_create_new_operational_experiment(self, db_common_mock): + current_experiment_id = "empty" + self._build_db_mock(current_experiment_id, db_common_mock) + experiment_id = new_experiment(self.description, self.version, False, True) + self.assertEquals("o000", experiment_id) + + @patch('autosubmit.experiment.experiment_common.db_common') + def test_create_new_experiment_with_previous_one(self, db_common_mock): current_experiment_id = "a006" - db_common.last_name_used = Mock(return_value=current_experiment_id) - db_common.check_experiment_exists = Mock(return_value=False) - db_common._set_experiment = Mock(return_value=True) - # act + self._build_db_mock(current_experiment_id, db_common_mock) experiment_id = new_experiment(self.description, self.version) - # assert - self.assertEquals("a007", experiment_id) \ No newline at end of file + self.assertEquals("a007", experiment_id) + + @patch('autosubmit.experiment.experiment_common.db_common') + def test_create_new_test_experiment_with_previous_one(self, db_common_mock): + current_experiment_id = "t0ab" + self._build_db_mock(current_experiment_id, db_common_mock) + experiment_id = new_experiment(self.description, self.version, True) + self.assertEquals("t0ac", experiment_id) + + @patch('autosubmit.experiment.experiment_common.db_common') + def test_create_new_operational_experiment_with_previous_one(self, db_common_mock): + current_experiment_id = "o112" + self._build_db_mock(current_experiment_id, db_common_mock) + experiment_id = new_experiment(self.description, self.version, False, True) + self.assertEquals("o113", 
experiment_id) + + @staticmethod + def _build_db_mock(current_experiment_id, mock_db_common): + mock_db_common.last_name_used = Mock(return_value=current_experiment_id) + mock_db_common.check_experiment_exists = Mock(return_value=False) -- GitLab From a7d2120ca6211a3043bbe79dad94279eb1be7423 Mon Sep 17 00:00:00 2001 From: Joan Lopez Date: Thu, 13 Oct 2016 15:28:25 +0200 Subject: [PATCH 10/30] Integration tests should be run too --- .gitlab-ci.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 145c9072a..550f8952f 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -4,4 +4,5 @@ test: - ~/venv/autosubmit script: - source ~/venv/autosubmit/bin/activate - - nosetests test/unit \ No newline at end of file + - nosetests test/unit + - nosetests test/integration \ No newline at end of file -- GitLab From eba33ccf38d6095cf8b100a2a1d090a8072e1a03 Mon Sep 17 00:00:00 2001 From: Joan Lopez Date: Thu, 13 Oct 2016 19:14:35 +0200 Subject: [PATCH 11/30] MN3 exclusivity is working now with Paramiko --- autosubmit/platforms/lsfplatform.py | 8 ++++++++ autosubmit/platforms/paramiko_platform.py | 5 ++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/autosubmit/platforms/lsfplatform.py b/autosubmit/platforms/lsfplatform.py index dd8b03d0d..7c48ab3b7 100644 --- a/autosubmit/platforms/lsfplatform.py +++ b/autosubmit/platforms/lsfplatform.py @@ -110,6 +110,13 @@ class LsfHeader: else: return '#BSUB -R "select[(scratch<{0})]"'.format(job.scratch_free_space) + # noinspection PyMethodMayBeStatic + def get_exclusivity(self, job): + if job.get_platform().exclusivity is '': + return "" + else: + return "#BSUB -x" + SERIAL = textwrap.dedent("""\ ############################################################################### # %TASKTYPE% %EXPID% EXPERIMENT @@ -121,6 +128,7 @@ class LsfHeader: #BSUB -eo %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%JOBNAME%_%J.err #BSUB -W %WALLCLOCK% #BSUB -n 
%NUMPROC% + %EXCLUSIVITY_DIRECTIVE% # ############################################################################### """) diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index d3abb1eeb..342a2ea7e 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -355,11 +355,14 @@ class ParamikoPlatform(Platform): header = self.header.SERIAL str_datetime = date2str(datetime.datetime.now(), 'S') - header = header.replace('%QUEUE_DIRECTIVE%', self.header.get_queue_directive(job)) + if hasattr(self.header, 'get_queue_directive'): + header = header.replace('%QUEUE_DIRECTIVE%', self.header.get_queue_directive(job)) header = header.replace('%ERR_LOG_DIRECTIVE%', "{0}.{1}.err".format(job.name, str_datetime)) header = header.replace('%OUT_LOG_DIRECTIVE%', "{0}.{1}.out".format(job.name, str_datetime)) if hasattr(self.header, 'get_scratch_free_space'): header = header.replace('%SCRATCH_FREE_SPACE_DIRECTIVE%', self.header.get_scratch_free_space(job)) + if hasattr(self.header, 'get_exclusivity'): + header = header.replace('%EXCLUSIVITY_DIRECTIVE%', self.header.get_exclusivity(job)) return header def check_remote_log_dir(self): -- GitLab From e4b1ec395b613167454a5e060d8e245050e33360 Mon Sep 17 00:00:00 2001 From: Joan Lopez Date: Thu, 13 Oct 2016 19:44:21 +0200 Subject: [PATCH 12/30] Fixed RETRIALS variable usage (wrong comparison). 
Fixes #194 --- autosubmit/job/job_list.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index c26e155ff..8e45e6fc5 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -546,7 +546,7 @@ class JobList: else: retrials = job.retrials - if job.fail_count < retrials: + if job.fail_count <= retrials: tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] if len(tmp) == len(job.parents): job.status = Status.READY @@ -561,9 +561,6 @@ class JobList: Log.debug('Updating WAITING jobs') for job in self.get_waiting(): tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] - # for parent in job.parents: - # if parent.status != Status.COMPLETED: - # break if len(tmp) == len(job.parents): job.status = Status.READY save = True -- GitLab From cad602470baa263b8fc4caac95d0829bcef088a0 Mon Sep 17 00:00:00 2001 From: Joan Lopez Date: Thu, 13 Oct 2016 20:00:36 +0200 Subject: [PATCH 13/30] Platforms.conf improved a bit. Related with #152 --- autosubmit/config/files/platforms.conf | 37 ++++++++++++++------------ 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/autosubmit/config/files/platforms.conf b/autosubmit/config/files/platforms.conf index 129be0fda..84531137e 100644 --- a/autosubmit/config/files/platforms.conf +++ b/autosubmit/config/files/platforms.conf @@ -2,38 +2,41 @@ ## Platform name # [PLATFORM] -## Queue type. Options: PBS, SGE, PS, LSF, ecaccess, SLURM +## Queue type. Options: PBS, SGE, PS, LSF, ecaccess, SLURM. Required # TYPE = ## Version of queue manager to use. Needed only in PBS (options: 10, 11, 12) and ecaccess (options: pbs, loadleveler) # VERSION = -## Hostname of the HPC +## Hostname of the HPC. Required # HOST = -## Project for the machine scheduler +## Project for the machine scheduler. Required # PROJECT = ## Budget account for the machine scheduler. 
If omitted, takes the value defined in PROJECT # BUDGET = -## Option to add project name to host. This is required for some HPCs +## Option to add project name to host. This is required for some HPCs. # ADD_PROJECT_TO_HOST = False -## User for the machine scheduler +## User for the machine scheduler. Required # USER = -## Path to the scratch directory for the machine +## Path to the scratch directory for the machine. Required. # SCRATCH_DIR = /scratch -## If true, autosubmit test command can use this queue as a main queue. Defaults to false +## If true, Autosubmit test command can use this queue as a main queue. Defaults to False # TEST_SUITE = False -## If given, autosubmit will add jobs to the given queue +## If given, Autosubmit will add jobs to the given queue. Required for some platforms. # QUEUE = -## If specified, autosubmit will run jobs with only one processor in the specified platform. +## Optional. If given, Autosubmit will submit the serial jobs with the exclusivity directive. +# QUEUE = +## Optional. If specified, autosubmit will run jobs with only one processor in the specified platform. # SERIAL_PLATFORM = SERIAL_PLATFORM_NAME -## If specified, autosubmit will run jobs with only one processor in the specified queue. +## Optional. If specified, autosubmit will run jobs with only one processor in the specified queue. ## Autosubmit will ignore this configuration if SERIAL_PLATFORM is provided # SERIAL_QUEUE = SERIAL_QUEUE_NAME -## Default number of processors per node to be used in jobs +## Optional. Default number of processors per node to be used in jobs # PROCESSORS_PER_NODE = -## Scratch free space requirements for the platform in percentage (%). If not specified, it won't be defined on the template. -# SCRATCH_FREE_SPACE = 10 -## Default Maximum number of jobs to be waiting in any platform queue +## Optional. Integer. Scratch free space requirements for the platform in percentage (%). +## If not specified, it won't be defined on the template. 
+# SCRATCH_FREE_SPACE = +## Optional. Integer. Default Maximum number of jobs to be waiting in any platform queue ## Default = 3 -# MAX_WAITING_JOBS = 3 -## Default maximum number of jobs to be running at the same time at any platform +# MAX_WAITING_JOBS = +## Optional. Integer. Default maximum number of jobs to be running at the same time at any platform ## Default = 6 -# TOTAL_JOBS = 6 \ No newline at end of file +# TOTAL_JOBS = \ No newline at end of file -- GitLab From 5456408e3b37d6ef01231027eda4a624e78ad588 Mon Sep 17 00:00:00 2001 From: Joan Lopez Date: Thu, 13 Oct 2016 20:17:50 +0200 Subject: [PATCH 14/30] Refactor of the tasks_per_node (NUMTASK) management. Fixes #190 --- autosubmit/platforms/ecplatform.py | 9 ++++++++- autosubmit/platforms/lsfplatform.py | 9 ++++++++- autosubmit/platforms/paramiko_platform.py | 2 ++ 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/autosubmit/platforms/ecplatform.py b/autosubmit/platforms/ecplatform.py index 7717eb8c7..328503d22 100644 --- a/autosubmit/platforms/ecplatform.py +++ b/autosubmit/platforms/ecplatform.py @@ -237,6 +237,13 @@ class EcCcaHeader: # There is no queue, so directive is empty return "" + # noinspection PyMethodMayBeStatic + def get_tasks_per_node(self, job): + if job.tasks is None: + return "" + else: + return '#PBS -l EC_tasks_per_node={0}'.format(job.tasks) + SERIAL = textwrap.dedent("""\ ############################################################################### # %TASKTYPE% %EXPID% EXPERIMENT @@ -264,7 +271,7 @@ class EcCcaHeader: #PBS -q np #PBS -l EC_total_tasks=%NUMPROC% #PBS -l EC_threads_per_task=%NUMTHREADS% - #PBS -l EC_tasks_per_node=%NUMTASK% + %TASKS_PER_NODE_DIRECTIVE% #PBS -l walltime=%WALLCLOCK%:00 #PBS -l EC_billing_account=%CURRENT_BUDG% # diff --git a/autosubmit/platforms/lsfplatform.py b/autosubmit/platforms/lsfplatform.py index 7c48ab3b7..cae79d06e 100644 --- a/autosubmit/platforms/lsfplatform.py +++ b/autosubmit/platforms/lsfplatform.py @@ -110,6 +110,13 @@ 
class LsfHeader: else: return '#BSUB -R "select[(scratch<{0})]"'.format(job.scratch_free_space) + # noinspection PyMethodMayBeStatic + def get_tasks_per_node(self, job): + if job.tasks is None: + return "" + else: + return '#BSUB -R "span[ptile={0}]"'.format(job.tasks) + # noinspection PyMethodMayBeStatic def get_exclusivity(self, job): if job.get_platform().exclusivity is '': @@ -144,7 +151,7 @@ class LsfHeader: #BSUB -eo %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%JOBNAME%_%J.err #BSUB -W %WALLCLOCK% #BSUB -n %NUMPROC% - #BSUB -R "span[ptile=%NUMTASK%]" + %TASKS_PER_NODE_DIRECTIVE% %SCRATCH_FREE_SPACE_DIRECTIVE% # ############################################################################### diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 342a2ea7e..1e10a3029 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -359,6 +359,8 @@ class ParamikoPlatform(Platform): header = header.replace('%QUEUE_DIRECTIVE%', self.header.get_queue_directive(job)) header = header.replace('%ERR_LOG_DIRECTIVE%', "{0}.{1}.err".format(job.name, str_datetime)) header = header.replace('%OUT_LOG_DIRECTIVE%', "{0}.{1}.out".format(job.name, str_datetime)) + if hasattr(self.header, 'get_tasks_per_node'): + header = header.replace('%TASKS_PER_NODE_DIRECTIVE%', self.header.get_tasks_per_node(job)) if hasattr(self.header, 'get_scratch_free_space'): header = header.replace('%SCRATCH_FREE_SPACE_DIRECTIVE%', self.header.get_scratch_free_space(job)) if hasattr(self.header, 'get_exclusivity'): -- GitLab From a249d7556180b865d38cdc07b06cb38715ca6ade Mon Sep 17 00:00:00 2001 From: Joan Lopez Date: Thu, 13 Oct 2016 23:32:58 +0200 Subject: [PATCH 15/30] Now only checks the EXPDEF.conf before a refresh. 
Fixes #134 --- autosubmit/autosubmit.py | 2 +- autosubmit/config/config_common.py | 16 ++++++++-------- test/unit/test_autosubmit_ config.py | 28 ++++++++++++++-------------- 3 files changed, 23 insertions(+), 23 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 1dfc092ff..3a650e4a7 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -1448,7 +1448,7 @@ class Autosubmit: Log.set_file(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, BasicConfig.LOCAL_TMP_DIR, 'refresh.log')) as_conf = AutosubmitConfig(expid, BasicConfig, ConfigParserFactory()) - if not as_conf.check_conf_files(): + if not as_conf.check_expdef_conf(): Log.critical('Can not copy with invalid configuration') return False project_type = as_conf.get_project_type() diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index 3af358b79..0aaa3f6a3 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -195,17 +195,17 @@ class AutosubmitConfig: """ Log.info('\nChecking configuration files...') self.reload() - result = self._check_autosubmit_conf() - result = result and self._check_platforms_conf() - result = result and self._check_jobs_conf() - result = result and self._check_expdef_conf() + result = self.check_autosubmit_conf() + result = result and self.check_platforms_conf() + result = result and self.check_jobs_conf() + result = result and self.check_expdef_conf() if result: Log.result("Configuration files OK\n") else: Log.error("Configuration files invalid\n") return result - def _check_autosubmit_conf(self): + def check_autosubmit_conf(self): """ Checks experiment's autosubmit configuration file. @@ -236,7 +236,7 @@ class AutosubmitConfig: Log.info('{0} OK'.format(os.path.basename(self._conf_parser_file))) return result - def _check_platforms_conf(self): + def check_platforms_conf(self): """ Checks experiment's queues configuration file. 
@@ -274,7 +274,7 @@ class AutosubmitConfig: Log.info('{0} OK'.format(os.path.basename(self._platforms_parser_file))) return result - def _check_jobs_conf(self): + def check_jobs_conf(self): """ Checks experiment's jobs configuration file. @@ -325,7 +325,7 @@ class AutosubmitConfig: return result - def _check_expdef_conf(self): + def check_expdef_conf(self): """ Checks experiment's experiment configuration file. diff --git a/test/unit/test_autosubmit_ config.py b/test/unit/test_autosubmit_ config.py index 18c4113a1..30f75bad2 100644 --- a/test/unit/test_autosubmit_ config.py +++ b/test/unit/test_autosubmit_ config.py @@ -619,8 +619,8 @@ class TestAutosubmitConfig(TestCase): config.reload() # act - should_be_true = config._check_autosubmit_conf() - should_be_false = config._check_autosubmit_conf() + should_be_true = config.check_autosubmit_conf() + should_be_false = config.check_autosubmit_conf() # arrange self.assertTrue(should_be_true) @@ -643,8 +643,8 @@ class TestAutosubmitConfig(TestCase): config.reload() # act - should_be_true = config._check_expdef_conf() - should_be_false = config._check_expdef_conf() + should_be_true = config.check_expdef_conf() + should_be_false = config.check_expdef_conf() # assert self.assertTrue(should_be_true) @@ -670,7 +670,7 @@ class TestAutosubmitConfig(TestCase): config.reload() # act - should_be_true = config._check_jobs_conf() + should_be_true = config.check_jobs_conf() # assert self.assertTrue(should_be_true) @@ -692,7 +692,7 @@ class TestAutosubmitConfig(TestCase): config.reload() # act - should_be_true = config._check_platforms_conf() + should_be_true = config.check_platforms_conf() # assert self.assertTrue(should_be_true) @@ -703,17 +703,17 @@ class TestAutosubmitConfig(TestCase): config = AutosubmitConfig(self.any_expid, FakeBasicConfig, ConfigParserFactory()) config.reload() - config._check_autosubmit_conf = truth_mock - config._check_platforms_conf = truth_mock - config._check_jobs_conf = truth_mock - 
config._check_expdef_conf = truth_mock + config.check_autosubmit_conf = truth_mock + config.check_platforms_conf = truth_mock + config.check_jobs_conf = truth_mock + config.check_expdef_conf = truth_mock config2 = AutosubmitConfig(self.any_expid, FakeBasicConfig, ConfigParserFactory()) config2.reload() - config2._check_autosubmit_conf = truth_mock - config2._check_platforms_conf = truth_mock - config2._check_jobs_conf = truth_mock - config2._check_expdef_conf = Mock(return_value=False) + config2.check_autosubmit_conf = truth_mock + config2.check_platforms_conf = truth_mock + config2.check_jobs_conf = truth_mock + config2.check_expdef_conf = Mock(return_value=False) # act should_be_true = config.check_conf_files() -- GitLab From 38790f10ac84bcd2262b96baefb5065e0b0d989d Mon Sep 17 00:00:00 2001 From: jlope2 Date: Fri, 14 Oct 2016 17:04:32 +0200 Subject: [PATCH 16/30] Get LOGS files working, at least for ECMWF. Related with #182 --- autosubmit/job/job.py | 3 ++ autosubmit/platforms/paramiko_platform.py | 7 ++-- autosubmit/platforms/platform.py | 25 ++++++++++++++ autosubmit/platforms/saga_platform.py | 40 +++++++++++++---------- 4 files changed, 55 insertions(+), 20 deletions(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 73a9843f2..d29e00b12 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -73,6 +73,8 @@ class Job: self.id = jobid self.file = None + self.out_filename = '' + self.err_filename = '' self.status = status self.priority = priority self._parents = set() @@ -427,6 +429,7 @@ class Job: self.write_start_time() if self.status in [Status.COMPLETED, Status.FAILED, Status.UNKNOWN]: self.write_end_time(self.status == Status.COMPLETED) + self.get_platform().get_logs_files(self.out_filename, self.err_filename) return self.status def check_completion(self, default_status=Status.FAILED): diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 1e10a3029..28b9ad4d6 100644 --- 
a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -355,10 +355,13 @@ class ParamikoPlatform(Platform): header = self.header.SERIAL str_datetime = date2str(datetime.datetime.now(), 'S') + job.out_filename = "{0}.{1}.out".format(job.name, str_datetime) + job.err_filename = "{0}.{1}.err".format(job.name, str_datetime) + header = header.replace('%OUT_LOG_DIRECTIVE%', job.out_filename) + header = header.replace('%ERR_LOG_DIRECTIVE%', job.err_filename) + if hasattr(self.header, 'get_queue_directive'): header = header.replace('%QUEUE_DIRECTIVE%', self.header.get_queue_directive(job)) - header = header.replace('%ERR_LOG_DIRECTIVE%', "{0}.{1}.err".format(job.name, str_datetime)) - header = header.replace('%OUT_LOG_DIRECTIVE%', "{0}.{1}.out".format(job.name, str_datetime)) if hasattr(self.header, 'get_tasks_per_node'): header = header.replace('%TASKS_PER_NODE_DIRECTIVE%', self.header.get_tasks_per_node(job)) if hasattr(self.header, 'get_scratch_free_space'): diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 0410c542d..732448e2f 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -133,6 +133,20 @@ class Platform: """ raise NotImplementedError + def get_files(self, files, must_exist=True): + """ + Copies some files from the current platform to experiment's tmp folder + + :param files: file names + :type files: [str] + :param must_exist: If True, raises an exception if file can not be copied + :type must_exist: bool + :return: True if file is copied succesfully, false otherwise + :rtype: bool + """ + for filename in files: + self.get_file(filename, must_exist) + def delete_file(self, filename): """ Deletes a file from this platform @@ -144,6 +158,17 @@ class Platform: """ raise NotImplementedError + def get_logs_files(self, job_out_filename, job_err_filename): + """ + Get the given LOGS files + + :param job_out_filename: name of the out file + :type 
job_out_filename: str + :param job_err_filename: name of the err file + :type job_err_filename: str + """ + self.get_files([job_out_filename, job_err_filename], False) + def get_completed_files(self, job_name, retries=5): """ Get the COMPLETED file of the given job diff --git a/autosubmit/platforms/saga_platform.py b/autosubmit/platforms/saga_platform.py index 3449ce9db..d47c776c4 100644 --- a/autosubmit/platforms/saga_platform.py +++ b/autosubmit/platforms/saga_platform.py @@ -204,14 +204,14 @@ class SagaPlatform(Platform): saga_job.run() return saga_job.id - def create_saga_job(self, job, scriptname): + def create_saga_job(self, job, script_name): """ Creates a saga job from a given job object. :param job: job object :type job: autosubmit.job.job.Job - :param scriptname: job script's name - :rtype scriptname: str + :param script_name: job script's name + :type script_name: str :return: saga job object for the given job :rtype: saga.job.Job """ @@ -223,21 +223,25 @@ class SagaPlatform(Platform): elif job.type == Type.R: binary = 'Rscript' - # jd.executable = '{0} {1}'.format(binary, os.path.join(self.get_files_path(), scriptname)) - jd.executable = os.path.join(self.get_files_path(), scriptname) + # jd.executable = '{0} {1}'.format(binary, os.path.join(self.get_files_path(), script_name)) + jd.executable = os.path.join(self.get_files_path(), script_name) jd.working_directory = self.get_files_path() + str_datetime = date2str(datetime.datetime.now(), 'S') - jd.output = "{0}.{1}.out".format(job.name, str_datetime) - jd.error = "{0}.{1}.err".format(job.name, str_datetime) + job.out_filename = "{0}.{1}.out".format(job.name, str_datetime) + job.err_filename = "{0}.{1}.err".format(job.name, str_datetime) + jd.output = job.out_filename + jd.error = job.err_filename + self.add_attribute(jd, 'Name', job.name) - wallclock = job.parameters["WALLCLOCK"] - if wallclock == '': - wallclock = 0 + wall_clock = job.parameters["WALLCLOCK"] + if wall_clock == '': + wall_clock = 0 
else: - wallclock = wallclock.split(':') - wallclock = int(wallclock[0]) * 60 + int(wallclock[1]) - self.add_attribute(jd, 'WallTimeLimit', wallclock) + wall_clock = wall_clock.split(':') + wall_clock = int(wall_clock[0]) * 60 + int(wall_clock[1]) + self.add_attribute(jd, 'WallTimeLimit', wall_clock) self.add_attribute(jd, 'Queue', job.parameters["CURRENT_QUEUE"]) @@ -273,13 +277,13 @@ class SagaPlatform(Platform): return jd.set_attribute(name, value) - def check_job(self, jobid, default_status=Status.COMPLETED, retries=10): + def check_job(self, job_id, default_status=Status.COMPLETED, retries=10): """ Checks job running status :param retries: retries - :param jobid: job id - :type jobid: str + :param job_id: job id + :type job_id: str :param default_status: status to assign if it can be retrieved from the platform :type default_status: autosubmit.job.job_common.Status :return: current job status @@ -288,9 +292,9 @@ class SagaPlatform(Platform): saga_status = None while saga_status is None and retries >= 0: try: - if jobid not in self.service.jobs: + if job_id not in self.service.jobs: return Status.COMPLETED - saga_status = self.service.get_job(jobid).state + saga_status = self.service.get_job(job_id).state except Exception as e: # If SAGA can not get the job state, we change it to completed # It will change to FAILED if not COMPLETED file is present -- GitLab From 0a1952d23bbec8d66d82d58a63ecb0c6bf6cce74 Mon Sep 17 00:00:00 2001 From: jlope2 Date: Fri, 14 Oct 2016 17:20:18 +0200 Subject: [PATCH 17/30] THREADS is now optional. 
Fixes #204 --- autosubmit/job/job_list.py | 2 +- autosubmit/platforms/ecplatform.py | 11 +++++++++-- autosubmit/platforms/lsfplatform.py | 4 ++-- autosubmit/platforms/paramiko_platform.py | 2 ++ 4 files changed, 14 insertions(+), 5 deletions(-) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 8e45e6fc5..8bac43d03 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -983,7 +983,7 @@ class DicJobs: job.check = False job.processors = self.get_option(section, "PROCESSORS", 1) - job.threads = self.get_option(section, "THREADS", 1) + job.threads = self.get_option(section, "THREADS", '') job.tasks = self.get_option(section, "TASKS", '') job.memory = self.get_option(section, "MEMORY", '') job.wallclock = self.get_option(section, "WALLCLOCK", '') diff --git a/autosubmit/platforms/ecplatform.py b/autosubmit/platforms/ecplatform.py index 328503d22..1b7fa828b 100644 --- a/autosubmit/platforms/ecplatform.py +++ b/autosubmit/platforms/ecplatform.py @@ -239,11 +239,18 @@ class EcCcaHeader: # noinspection PyMethodMayBeStatic def get_tasks_per_node(self, job): - if job.tasks is None: + if not isinstance(job.tasks, int): return "" else: return '#PBS -l EC_tasks_per_node={0}'.format(job.tasks) + # noinspection PyMethodMayBeStatic + def get_threads_per_task(self, job): + if not isinstance(job.threads, int): + return "" + else: + return '#PBS -l EC_threads_per_task={0}'.format(job.threads) + SERIAL = textwrap.dedent("""\ ############################################################################### # %TASKTYPE% %EXPID% EXPERIMENT @@ -270,7 +277,7 @@ class EcCcaHeader: #PBS -e %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%ERR_LOG_DIRECTIVE% #PBS -q np #PBS -l EC_total_tasks=%NUMPROC% - #PBS -l EC_threads_per_task=%NUMTHREADS% + %THREADS_PER_TASK_DIRECTIVE% %TASKS_PER_NODE_DIRECTIVE% #PBS -l walltime=%WALLCLOCK%:00 #PBS -l EC_billing_account=%CURRENT_BUDG% diff --git a/autosubmit/platforms/lsfplatform.py 
b/autosubmit/platforms/lsfplatform.py index cae79d06e..691ab51fb 100644 --- a/autosubmit/platforms/lsfplatform.py +++ b/autosubmit/platforms/lsfplatform.py @@ -105,14 +105,14 @@ class LsfHeader: # noinspection PyMethodMayBeStatic def get_scratch_free_space(self, job): - if job.scratch_free_space is None: + if not isinstance(job.scratch_free_space, int): return "" else: return '#BSUB -R "select[(scratch<{0})]"'.format(job.scratch_free_space) # noinspection PyMethodMayBeStatic def get_tasks_per_node(self, job): - if job.tasks is None: + if not isinstance(job.tasks, int): return "" else: return '#BSUB -R "span[ptile={0}]"'.format(job.tasks) diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 28b9ad4d6..64d4abd28 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -364,6 +364,8 @@ class ParamikoPlatform(Platform): header = header.replace('%QUEUE_DIRECTIVE%', self.header.get_queue_directive(job)) if hasattr(self.header, 'get_tasks_per_node'): header = header.replace('%TASKS_PER_NODE_DIRECTIVE%', self.header.get_tasks_per_node(job)) + if hasattr(self.header, 'get_threads_per_task'): + header = header.replace('%THREADS_PER_TASK_DIRECTIVE%', self.header.get_threads_per_task(job)) if hasattr(self.header, 'get_scratch_free_space'): header = header.replace('%SCRATCH_FREE_SPACE_DIRECTIVE%', self.header.get_scratch_free_space(job)) if hasattr(self.header, 'get_exclusivity'): -- GitLab From 956b72a9e764f82154d9e0c5e8e85f664d46e9a5 Mon Sep 17 00:00:00 2001 From: jlope2 Date: Fri, 14 Oct 2016 17:22:57 +0200 Subject: [PATCH 18/30] Exclusivity minor refactor --- autosubmit/platforms/lsfplatform.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/autosubmit/platforms/lsfplatform.py b/autosubmit/platforms/lsfplatform.py index 691ab51fb..f2eeb4cad 100644 --- a/autosubmit/platforms/lsfplatform.py +++ b/autosubmit/platforms/lsfplatform.py @@ -119,10 +119,10 @@ 
class LsfHeader: # noinspection PyMethodMayBeStatic def get_exclusivity(self, job): - if job.get_platform().exclusivity is '': - return "" - else: + if job.get_platform().exclusivity == 'true': return "#BSUB -x" + else: + return "" SERIAL = textwrap.dedent("""\ ############################################################################### -- GitLab From 10834386a2ed5dc6c0068ba67640acee32093226 Mon Sep 17 00:00:00 2001 From: jlope2 Date: Mon, 17 Oct 2016 11:08:25 +0200 Subject: [PATCH 19/30] Common LOGS files format. Now get LOGS is also working for MN3. Related with #182 --- autosubmit/platforms/lsfplatform.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/autosubmit/platforms/lsfplatform.py b/autosubmit/platforms/lsfplatform.py index f2eeb4cad..961c3aefe 100644 --- a/autosubmit/platforms/lsfplatform.py +++ b/autosubmit/platforms/lsfplatform.py @@ -131,8 +131,8 @@ class LsfHeader: # #%QUEUE_DIRECTIVE% #BSUB -J %JOBNAME% - #BSUB -oo %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%JOBNAME%_%J.out - #BSUB -eo %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%JOBNAME%_%J.err + #BSUB -oo %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%OUT_LOG_DIRECTIVE% + #BSUB -eo %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%ERR_LOG_DIRECTIVE% #BSUB -W %WALLCLOCK% #BSUB -n %NUMPROC% %EXCLUSIVITY_DIRECTIVE% -- GitLab From 407eb3af437f0a74c5b0085cd8ef72f8b096a3be Mon Sep 17 00:00:00 2001 From: jlope2 Date: Mon, 17 Oct 2016 11:13:33 +0200 Subject: [PATCH 20/30] Preventing redundancy with LOGS files. 
Related with #182 --- autosubmit/platforms/locplatform.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/autosubmit/platforms/locplatform.py b/autosubmit/platforms/locplatform.py index ce08d88a3..c6ccb7554 100644 --- a/autosubmit/platforms/locplatform.py +++ b/autosubmit/platforms/locplatform.py @@ -139,6 +139,18 @@ class LocalPlatform(ParamikoPlatform): def get_ssh_output(self): return self._ssh_output + def get_logs_files(self, job_out_filename, job_err_filename): + """ + Overriding the parent's implementation. + Do nothing because the log files are already in the local platform (redundancy). + + :param job_out_filename: name of the out file + :type job_out_filename: str + :param job_err_filename: name of the err file + :type job_err_filename: str + """ + return + class LocalHeader: """Class to handle the Ps headers of a job""" -- GitLab From 4a24a2fb02168664fb2e5ba0e7a86d917f97ab63 Mon Sep 17 00:00:00 2001 From: jlope2 Date: Mon, 17 Oct 2016 11:48:25 +0200 Subject: [PATCH 21/30] Refactor of the LOGS files management. 
Fixes #182 --- autosubmit/job/job.py | 2 +- autosubmit/platforms/ecplatform.py | 4 ++-- autosubmit/platforms/locplatform.py | 8 +++++--- autosubmit/platforms/paramiko_platform.py | 16 +++++++++------- autosubmit/platforms/platform.py | 20 +++++++++++++------- autosubmit/platforms/saga_platform.py | 4 +++- 6 files changed, 33 insertions(+), 21 deletions(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index d29e00b12..7b92508e8 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -429,7 +429,7 @@ class Job: self.write_start_time() if self.status in [Status.COMPLETED, Status.FAILED, Status.UNKNOWN]: self.write_end_time(self.status == Status.COMPLETED) - self.get_platform().get_logs_files(self.out_filename, self.err_filename) + self.get_platform().get_logs_files(self.expid, self.out_filename, self.err_filename) return self.status def check_completion(self, default_status=Status.FAILED): diff --git a/autosubmit/platforms/ecplatform.py b/autosubmit/platforms/ecplatform.py index 1b7fa828b..447716295 100644 --- a/autosubmit/platforms/ecplatform.py +++ b/autosubmit/platforms/ecplatform.py @@ -134,8 +134,8 @@ class EcPlatform(ParamikoPlatform): raise return True - def get_file(self, filename, must_exist=True): - local_path = os.path.join(self.tmp_path, filename) + def get_file(self, filename, must_exist=True, relative_path=''): + local_path = os.path.join(self.tmp_path, relative_path, filename) if os.path.exists(local_path): os.remove(local_path) diff --git a/autosubmit/platforms/locplatform.py b/autosubmit/platforms/locplatform.py index c6ccb7554..f51d2af92 100644 --- a/autosubmit/platforms/locplatform.py +++ b/autosubmit/platforms/locplatform.py @@ -112,8 +112,8 @@ class LocalPlatform(ParamikoPlatform): raise return True - def get_file(self, filename, must_exist=True): - local_path = os.path.join(self.tmp_path, filename) + def get_file(self, filename, must_exist=True, relative_path=''): + local_path = os.path.join(self.tmp_path, relative_path, 
filename) if os.path.exists(local_path): os.remove(local_path) @@ -139,11 +139,13 @@ class LocalPlatform(ParamikoPlatform): def get_ssh_output(self): return self._ssh_output - def get_logs_files(self, job_out_filename, job_err_filename): + def get_logs_files(self, exp_id, job_out_filename, job_err_filename): """ Overriding the parent's implementation. Do nothing because the log files are already in the local platform (redundancy). + :param exp_id: experiment id + :type exp_id: str :param job_out_filename: name of the out file :type job_out_filename: str :param job_err_filename: name of the err file diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 64d4abd28..713ab52c4 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -98,7 +98,7 @@ class ParamikoPlatform(Platform): os.path.join(self.get_files_path(), filename)) raise - def get_file(self, filename, must_exist=True): + def get_file(self, filename, must_exist=True, relative_path=''): """ Copies a file from the current platform to experiment's tmp folder @@ -106,11 +106,13 @@ class ParamikoPlatform(Platform): :type filename: str :param must_exist: If True, raises an exception if file can not be copied :type must_exist: bool - :return: True if file is copied succesfully, false otherwise + :param relative_path: path inside the tmp folder + :type relative_path: str + :return: True if file is copied successfully, false otherwise :rtype: bool """ - local_path = os.path.join(self.tmp_path, filename) + local_path = os.path.join(self.tmp_path, relative_path, filename) if os.path.exists(local_path): os.remove(local_path) @@ -123,7 +125,7 @@ class ParamikoPlatform(Platform): ftp.get(os.path.join(self.get_files_path(), filename), local_path) ftp.close() return True - except BaseException as e: + except BaseException: if must_exist: raise Exception('File {0} does not exists'.format(filename)) return False @@ -150,18 
+152,18 @@ class ParamikoPlatform(Platform): Log.debug('Could not remove file {0}'.format(os.path.join(self.get_files_path(), filename))) return False - def submit_job(self, job, scriptname): + def submit_job(self, job, script_name): """ Submit a job from a given job object. :param job: job object :type job: autosubmit.job.job.Job - :param scriptname: job script's name + :param script_name: job script's name :rtype scriptname: str :return: job id for the submitted job :rtype: int """ - if self.send_command(self.get_submit_cmd(scriptname, job.type)): + if self.send_command(self.get_submit_cmd(script_name, job.type)): job_id = self.get_submitted_job_id(self.get_ssh_output()) Log.debug("Job ID: {0}", job_id) return int(job_id) diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 732448e2f..9ea31f8ce 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -120,7 +120,7 @@ class Platform: """ raise NotImplementedError - def get_file(self, filename, must_exist=True): + def get_file(self, filename, must_exist=True, relative_path=''): """ Copies a file from the current platform to experiment's tmp folder @@ -128,12 +128,14 @@ class Platform: :type filename: str :param must_exist: If True, raises an exception if file can not be copied :type must_exist: bool - :return: True if file is copied succesfully, false otherwise + :param relative_path: relative path inside tmp folder + :type relative_path: str + :return: True if file is copied successfully, false otherwise :rtype: bool """ raise NotImplementedError - def get_files(self, files, must_exist=True): + def get_files(self, files, must_exist=True, relative_path=''): """ Copies some files from the current platform to experiment's tmp folder @@ -141,11 +143,13 @@ class Platform: :type files: [str] :param must_exist: If True, raises an exception if file can not be copied :type must_exist: bool - :return: True if file is copied succesfully, false otherwise + 
:param relative_path: relative path inside tmp folder + :type relative_path: str + :return: True if file is copied successfully, false otherwise :rtype: bool """ for filename in files: - self.get_file(filename, must_exist) + self.get_file(filename, must_exist, relative_path) def delete_file(self, filename): """ @@ -158,16 +162,18 @@ class Platform: """ raise NotImplementedError - def get_logs_files(self, job_out_filename, job_err_filename): + def get_logs_files(self, exp_id, job_out_filename, job_err_filename): """ Get the given LOGS files + :param exp_id: experiment id + :type exp_id: str :param job_out_filename: name of the out file :type job_out_filename: str :param job_err_filename: name of the err file :type job_err_filename: str """ - self.get_files([job_out_filename, job_err_filename], False) + self.get_files([job_out_filename, job_err_filename], False, 'LOG_{0}'.format(exp_id)) def get_completed_files(self, job_name, retries=5): """ diff --git a/autosubmit/platforms/saga_platform.py b/autosubmit/platforms/saga_platform.py index d47c776c4..85b20b4d4 100644 --- a/autosubmit/platforms/saga_platform.py +++ b/autosubmit/platforms/saga_platform.py @@ -86,7 +86,7 @@ class SagaPlatform(Platform): # noinspection PyTypeChecker return saga.filesystem.Directory(sftp_directory, session=self.service.session) - def get_file(self, filename, must_exist=True): + def get_file(self, filename, must_exist=True, relative_path=''): """ Copies a file from the current platform to experiment's tmp folder @@ -94,6 +94,8 @@ class SagaPlatform(Platform): :type filename: str :param must_exist: If True, raises an exception if file can not be copied :type must_exist: bool + :param relative_path: relative path inside tmp folder + :type relative_path: str :return: True if file is copied succesfully, false otherwise :rtype: bool """ -- GitLab From 89253618aae8193a63d2138acc73e4b4b44480fe Mon Sep 17 00:00:00 2001 From: jlope2 Date: Wed, 19 Oct 2016 15:32:54 +0200 Subject: [PATCH 22/30] Added 
'networkx' dependency --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 9a08a92aa..5aa452e37 100644 --- a/setup.py +++ b/setup.py @@ -41,7 +41,7 @@ setup( keywords=['climate', 'weather', 'workflow', 'HPC'], install_requires=['argparse>=1.2,<2', 'python-dateutil>2', 'pydotplus>=2', 'pyparsing>=2.0.1', 'numpy', 'matplotlib', 'saga-python>=0.40', 'paramiko==1.15', - 'mock>=1.3.0', 'portalocker>=0.5.7'], + 'mock>=1.3.0', 'portalocker>=0.5.7', 'networkx'], extras_require={ 'dialog': ["python2-pythondialog>=3.3.0"] }, -- GitLab From c56c686dabbd05392024dd9a1a463d670a1a1066 Mon Sep 17 00:00:00 2001 From: jlope2 Date: Wed, 19 Oct 2016 15:34:02 +0200 Subject: [PATCH 23/30] Added transitive reduction algorithm (utility) --- autosubmit/job/job_utils.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) create mode 100644 autosubmit/job/job_utils.py diff --git a/autosubmit/job/job_utils.py b/autosubmit/job/job_utils.py new file mode 100644 index 000000000..f6f3cde17 --- /dev/null +++ b/autosubmit/job/job_utils.py @@ -0,0 +1,19 @@ +import networkx + +from networkx.algorithms.dag import is_directed_acyclic_graph +from networkx import DiGraph +from networkx import dfs_edges +from networkx import NetworkXError + + +def transitive_reduction(graph): + if not is_directed_acyclic_graph(graph): + raise NetworkXError("Transitive reduction only uniquely defined on directed acyclic graphs.") + reduced_graph = DiGraph() + reduced_graph.add_nodes_from(graph.nodes()) + for u in graph: + u_edges = set(graph[u]) + for v in graph[u]: + u_edges -= {y for x, y in dfs_edges(graph, v)} + reduced_graph.add_edges_from((u, v) for v in u_edges) + return reduced_graph -- GitLab From 83f8c829395b5395e330721e8c936725a5ba97dd Mon Sep 17 00:00:00 2001 From: jlope2 Date: Wed, 19 Oct 2016 16:28:03 +0200 Subject: [PATCH 24/30] The performance has been improved. 
Related with #192 and #180 --- autosubmit/job/job_list.py | 46 +++++++++++++++++++++++++++++++------- 1 file changed, 38 insertions(+), 8 deletions(-) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 8bac43d03..a0a67d841 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -35,6 +35,9 @@ from autosubmit.job.job import Job from autosubmit.config.log import Log from autosubmit.date.chunk_date_lib import date2str, parse_date +from networkx import DiGraph +from autosubmit.job.job_utils import transitive_reduction + class JobList: """ @@ -58,17 +61,32 @@ class JobList: self._chunk_list = [] self._dic_jobs = dict() self._persistence = job_list_persistence + self._graph = DiGraph() @property def expid(self): """ - Returns experiment identifier + Returns the experiment identifier :return: experiment's identifier :rtype: str """ return self._expid + @property + def graph(self): + """ + Returns the graph + + :return: graph + :rtype: networkx graph + """ + return self._graph + + @graph.setter + def graph(self, value): + self._graph = value + def generate(self, date_list, member_list, num_chunks, parameters, date_format, default_retrials, default_job_type, new=True): """ @@ -111,7 +129,7 @@ class JobList: self._create_jobs(dic_jobs, jobs_parser, priority, default_job_type, jobs_data) Log.info("Adding dependencies...") - self._add_dependencies(date_list, member_list, chunk_list, dic_jobs, jobs_parser) + self._add_dependencies(date_list, member_list, chunk_list, dic_jobs, jobs_parser, self.graph) Log.info("Removing redundant dependencies...") self.update_genealogy(new) @@ -119,7 +137,7 @@ class JobList: job.parameters = parameters @staticmethod - def _add_dependencies(date_list, member_list, chunk_list, dic_jobs, jobs_parser, option="DEPENDENCIES"): + def _add_dependencies(date_list, member_list, chunk_list, dic_jobs, jobs_parser, graph, option="DEPENDENCIES"): for job_section in jobs_parser.sections(): Log.debug("Adding 
dependencies for {0} jobs".format(job_section)) @@ -132,7 +150,7 @@ class JobList: for job in dic_jobs.get_jobs(job_section): JobList._manage_job_dependencies(dic_jobs, job, date_list, member_list, chunk_list, dependencies_keys, - dependencies) + dependencies, graph) @staticmethod def _manage_dependencies(dependencies_keys, dic_jobs): @@ -149,7 +167,7 @@ class JobList: return dependencies @staticmethod - def _manage_job_dependencies(dic_jobs, job, date_list, member_list, chunk_list, dependencies_keys, dependencies): + def _manage_job_dependencies(dic_jobs, job, date_list, member_list, chunk_list, dependencies_keys, dependencies, graph): for key in dependencies_keys: dependency = dependencies[key] skip, (chunk, member, date) = JobList._calculate_dependency_metadata(job.chunk, chunk_list, @@ -161,9 +179,10 @@ class JobList: for parent in dic_jobs.get_jobs(dependency.section, date, member, chunk): job.add_parent(parent) + graph.add_edge(parent.name, job.name) JobList.handle_frequency_interval_dependencies(chunk, chunk_list, date, date_list, dic_jobs, job, member, - member_list, dependency.section) + member_list, dependency.section, graph) @staticmethod def _calculate_dependency_metadata(chunk, chunk_list, member, member_list, date, date_list, dependency): @@ -217,7 +236,7 @@ class JobList: @staticmethod def handle_frequency_interval_dependencies(chunk, chunk_list, date, date_list, dic_jobs, job, member, member_list, - section_name): + section_name, graph): if job.wait and job.frequency > 1: if job.chunk is not None: max_distance = (chunk_list.index(chunk) + 1) % job.frequency @@ -226,6 +245,7 @@ class JobList: for distance in range(1, max_distance): for parent in dic_jobs.get_jobs(section_name, date, member, chunk - distance): job.add_parent(parent) + graph.add_edge(parent.name, job.name) elif job.member is not None: member_index = member_list.index(job.member) max_distance = (member_index + 1) % job.frequency @@ -235,6 +255,7 @@ class JobList: for parent in 
dic_jobs.get_jobs(section_name, date, member_list[member_index - distance], chunk): job.add_parent(parent) + graph.add_edge(parent.name, job.name) elif job.date is not None: date_index = date_list.index(job.date) max_distance = (date_index + 1) % job.frequency @@ -244,6 +265,7 @@ class JobList: for parent in dic_jobs.get_jobs(section_name, date_list[date_index - distance], member, chunk): job.add_parent(parent) + graph.add_edge(parent.name, job.name) @staticmethod def _create_jobs(dic_jobs, parser, priority, default_job_type, jobs_data=dict()): @@ -583,8 +605,12 @@ class JobList: # Simplifying dependencies: if a parent is already an ancestor of another parent, # we remove parent dependency + self.graph = transitive_reduction(self.graph) for job in self._job_list: - job.remove_redundant_parents() + children_to_remove = [child for child in job.children if child.name not in self.graph.neighbors(job.name)] + for child in children_to_remove: + job.children.remove(child) + child.parents.remove(job) for job in self._job_list: if not job.has_parents() and new: @@ -787,6 +813,7 @@ class DicJobs: :type priority: int """ self._dic[section] = self.build_job(section, priority, None, None, None, default_job_type, jobs_data) + self._joblist.graph.add_node(self._dic[section].name) def _create_jobs_startdate(self, section, priority, frequency, default_job_type, jobs_data=dict()): """ @@ -807,6 +834,7 @@ class DicJobs: if count % frequency == 0 or count == len(self._date_list): self._dic[section][date] = self.build_job(section, priority, date, None, None, default_job_type, jobs_data) + self._joblist.graph.add_node(self._dic[section][date].name) def _create_jobs_member(self, section, priority, frequency, default_job_type, jobs_data=dict()): """ @@ -829,6 +857,7 @@ class DicJobs: if count % frequency == 0 or count == len(self._member_list): self._dic[section][date][member] = self.build_job(section, priority, date, member, None, default_job_type, jobs_data) + 
self._joblist.graph.add_node(self._dic[section][date][member].name) ''' Maybe a good choice could be split this function or ascend the @@ -880,6 +909,7 @@ class DicJobs: else: self._dic[section][date][member][chunk] = self.build_job(section, priority, date, member, chunk, default_job_type, jobs_data) + self._joblist.graph.add_node(self._dic[section][date][member][chunk].name) def get_jobs(self, section, date=None, member=None, chunk=None): """ -- GitLab From 9a7ac23c21a47f685f9473608fef4613fe547ad8 Mon Sep 17 00:00:00 2001 From: jlope2 Date: Thu, 27 Oct 2016 09:54:34 +0200 Subject: [PATCH 25/30] Refactor to make the regression tests standalone --- test/regression/tests_log.py | 189 ++++++++++++++++++++++++++++++++ test/regression/tests_runner.py | 16 ++- test/regression/tests_utils.py | 47 +++++++- 3 files changed, 247 insertions(+), 5 deletions(-) create mode 100644 test/regression/tests_log.py diff --git a/test/regression/tests_log.py b/test/regression/tests_log.py new file mode 100644 index 000000000..a2e454b41 --- /dev/null +++ b/test/regression/tests_log.py @@ -0,0 +1,189 @@ +#!/usr/bin/env python + +# Copyright 2016 Earth Sciences Department, BSC-CNS + +# This file is part of Autosubmit. + +# Autosubmit is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# Autosubmit is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with Autosubmit. If not, see . + +import logging +import os +import sys +from datetime import datetime + + +class LogFormatter: + """ + Class to format log output. 
+ + :param to_file: If True, creates a LogFormatter for files; if False, for console + :type to_file: bool + """ + RESULT = '\033[32m' + WARNING = '\033[33m' + ERROR = '\033[31m' + CRITICAL = '\033[1m \033[31m' + DEFAULT = '\033[0m\033[39m' + + def __init__(self, to_file=False): + """ + Initializer for LogFormatter + + + """ + self._file = to_file + if self._file: + self._formatter = logging.Formatter('%(asctime)s %(message)s') + else: + self._formatter = logging.Formatter('%(message)s') + + def format(self, record): + """ + Format log output, adding labels if needed for log level. If logging to console, also manages font color. + If logging to file adds timestamp + + :param record: log record to format + :type record: LogRecord + :return: formatted record + :rtype: str + """ + header = '' + if record.levelno == Log.RESULT: + if not self._file: + header = LogFormatter.RESULT + elif record.levelno == Log.USER_WARNING: + if not self._file: + header = LogFormatter.WARNING + elif record.levelno == Log.WARNING: + if not self._file: + header = LogFormatter.WARNING + header += "[WARNING] " + elif record.levelno == Log.ERROR: + if not self._file: + header = LogFormatter.ERROR + header += "[ERROR] " + elif record.levelno == Log.CRITICAL: + if not self._file: + header = LogFormatter.ERROR + header += "[CRITICAL] " + + msg = self._formatter.format(record) + if header != '' and not self._file: + msg += LogFormatter.DEFAULT + return header + msg + + +class Log: + """ + Static class to manage the prints for the regression tests. Messages will be sent to console. + Levels can be set for each output independently. 
These levels are (from lower to higher priority): + - DEBUG + - INFO + - RESULT + - USER_WARNING + - WARNING + - ERROR + - CRITICAL + """ + EVERYTHING = 0 + DEBUG = logging.DEBUG + INFO = logging.INFO + RESULT = 25 + USER_WARNING = 29 + WARNING = logging.WARNING + ERROR = logging.ERROR + CRITICAL = logging.CRITICAL + NO_LOG = CRITICAL + 1 + + logging.basicConfig() + + log = logging.Logger('Autosubmit', EVERYTHING) + + console_handler = logging.StreamHandler(sys.stdout) + console_handler.setLevel(INFO) + console_handler.setFormatter(LogFormatter(False)) + log.addHandler(console_handler) + + file_handler = None + file_level = INFO + + @staticmethod + def debug(msg, *args): + """ + Prints debug information + + :param msg: message to show + :param args: arguments for message formatting (it will be done using format() method on str) + """ + print(msg.format(*args)) + + @staticmethod + def info(msg, *args): + """ + Prints information + + :param msg: message to show + :param args: arguments for message formatting (it will be done using format() method on str) + """ + print(msg.format(*args)) + + @staticmethod + def result(msg, *args): + """ + Prints results information. It will be shown in green in the console. + + :param msg: message to show + :param args: arguments for message formatting (it will be done using format() method on str) + """ + print(LogFormatter.RESULT + msg.format(*args) + LogFormatter.DEFAULT) + + @staticmethod + def user_warning(msg, *args): + """ + Prints warnings for the user. It will be shown in yellow in the console. + + :param msg: message to show + :param args: arguments for message formatting (it will be done using format() method on str) + """ + print(LogFormatter.WARNING + msg.format(*args) + LogFormatter.DEFAULT) + + @staticmethod + def warning(msg, *args): + """ + Prints program warnings. It will be shown in yellow in the console. 
+ + :param msg: message to show + :param args: arguments for message formatting (it will be done using format() method on str) + """ + print(LogFormatter.WARNING + "[WARNING] " + msg.format(*args) + LogFormatter.DEFAULT) + + @staticmethod + def error(msg, *args): + """ + Prints errors to the log. It will be shown in red in the console. + + :param msg: message to show + :param args: arguments for message formatting (it will be done using format() method on str) + """ + print(LogFormatter.ERROR + "[ERROR] " + msg.format(*args) + LogFormatter.DEFAULT) + + @staticmethod + def critical(msg, *args): + """ + Prints critical errors to the log. It will be shown in red in the console. + + :param msg: message to show + :param args: arguments for message formatting (it will be done using format() method on str) + """ + print(LogFormatter.ERROR + "[CRITICAL] " + msg.format(*args) + LogFormatter.DEFAULT) diff --git a/test/regression/tests_runner.py b/test/regression/tests_runner.py index a4c5b077d..4602192da 100644 --- a/test/regression/tests_runner.py +++ b/test/regression/tests_runner.py @@ -1,11 +1,15 @@ -from autosubmit.config.config_common import AutosubmitConfig -from autosubmit.config.parser_factory import ConfigParserFactory -from autosubmit.config.log import Log +from tests_log import Log from tests_utils import check_cmd, next_experiment_id, copy_experiment_conf_files, create_database, clean_database from tests_commands import * from threading import Thread from time import sleep import argparse +try: + # noinspection PyCompatibility + from configparser import SafeConfigParser +except ImportError: + # noinspection PyCompatibility + from ConfigParser import SafeConfigParser # Configuration file where the regression tests are defined with INI style tests_parser_file = 'tests.conf' @@ -45,7 +49,11 @@ def run_test_case(experiment_id, name, hpc_arch, description, src_path, retrials def run(current_experiment_id, only_list=None, exclude_list=None, max_threads=5): # Local 
variables for testing test_threads = [] - tests_parser = AutosubmitConfig.get_parser(ConfigParserFactory(), tests_parser_file) + + # Building tests parser + tests_parser = SafeConfigParser() + tests_parser.optionxform = str + tests_parser.read(tests_parser_file) # Resetting the database clean_database(db_path) diff --git a/test/regression/tests_utils.py b/test/regression/tests_utils.py index d76bd1209..297fb8f75 100644 --- a/test/regression/tests_utils.py +++ b/test/regression/tests_utils.py @@ -1,7 +1,7 @@ -from autosubmit.database.db_common import base36decode, base36encode from tests_commands import * import os import subprocess +import string BIN_PATH = '../../bin' @@ -70,3 +70,48 @@ def get_default_copy_cmd(db_path, filename, experiment_id): def get_conf_file_path(base_path, filename): return os.path.join(base_path, 'conf', filename + '.conf') + + +def base36encode(number, alphabet=string.digits + string.ascii_lowercase): + """ + Convert positive integer to a base36 string. + + :param number: number to convert + :type number: int + :param alphabet: set of characters to use + :type alphabet: str + :return: number's base36 string value + :rtype: str + """ + if not isinstance(number, (int, long)): + raise TypeError('number must be an integer') + + # Special case for zero + if number == 0: + return '0' + + base36 = '' + + sign = '' + if number < 0: + sign = '-' + number = - number + + while number > 0: + number, i = divmod(number, len(alphabet)) + # noinspection PyAugmentAssignment + base36 = alphabet[i] + base36 + + return sign + base36.rjust(4, '0') + + +def base36decode(number): + """ + Converts a base36 string to a positive integer + + :param number: base36 string to convert + :type number: str + :return: number's integer value + :rtype: int + """ + return int(number, 36) -- GitLab From 9c1d4a20d5fdffeeb80be2954ab98c83f46d33f5 Mon Sep 17 00:00:00 2001 From: jlope2 Date: Fri, 28 Oct 2016 09:29:02 +0200 Subject: [PATCH 26/30] Improved the UX when non-existing 
dependency is defined --- autosubmit/config/config_common.py | 8 ++++++-- autosubmit/job/job_list.py | 4 ++++ test/regression/README | 1 + 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index 0aaa3f6a3..83fc98a56 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -306,7 +306,9 @@ class AutosubmitConfig: elif '+' in dependency: dependency = dependency.split('+')[0] if dependency not in sections: - Log.error('Job {0} depends on job {1} that is not defined'.format(section, dependency)) + Log.error( + 'Job {0} depends on job {1} that is not defined. It will be ignored.'.format(section, + dependency)) if parser.has_option(section, 'RERUN_DEPENDENCIES'): for dependency in str(AutosubmitConfig.get_option(parser, section, 'RERUN_DEPENDENCIES', @@ -314,7 +316,9 @@ class AutosubmitConfig: if '-' in dependency: dependency = dependency.split('-')[0] if dependency not in sections: - Log.error('Job {0} depends on job {1} that is not defined'.format(section, dependency)) + Log.error( + 'Job {0} depends on job {1} that is not defined. 
It will be ignored.'.format(section, + dependency)) result = result and AutosubmitConfig.check_is_choice(parser, section, 'RUNNING', False, ['once', 'date', 'member', 'chunk']) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index a0a67d841..dd4b7e083 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -929,6 +929,10 @@ class DicJobs: :rtype: list """ jobs = list() + + if section not in self._dic: + return jobs + dic = self._dic[section] if type(dic) is not dict: jobs.append(dic) diff --git a/test/regression/README b/test/regression/README index 7884bcef7..a7e26a023 100644 --- a/test/regression/README +++ b/test/regression/README @@ -14,6 +14,7 @@ How to run the regression tests path = $PATH_TO_PROJECT/test/regression/db 3) Review the credentials on the platforms config file of each test +or in the 'default_conf' directory 4) Run the 'tests_runner.py' file: -- GitLab From 7dd55cd6b27b680d659ec31adbf4d5e7fa47b2bc Mon Sep 17 00:00:00 2001 From: jlope2 Date: Fri, 28 Oct 2016 09:34:02 +0200 Subject: [PATCH 27/30] Fixed minor pending from #182 --- autosubmit/platforms/lsfplatform.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/autosubmit/platforms/lsfplatform.py b/autosubmit/platforms/lsfplatform.py index 961c3aefe..7f906efdf 100644 --- a/autosubmit/platforms/lsfplatform.py +++ b/autosubmit/platforms/lsfplatform.py @@ -147,8 +147,8 @@ class LsfHeader: # #%QUEUE_DIRECTIVE% #BSUB -J %JOBNAME% - #BSUB -oo %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%JOBNAME%_%J.out - #BSUB -eo %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%JOBNAME%_%J.err + #BSUB -oo %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%OUT_LOG_DIRECTIVE% + #BSUB -eo %CURRENT_SCRATCH_DIR%/%CURRENT_PROJ%/%CURRENT_USER%/%EXPID%/LOG_%EXPID%/%ERR_LOG_DIRECTIVE% #BSUB -W %WALLCLOCK% #BSUB -n %NUMPROC% %TASKS_PER_NODE_DIRECTIVE% -- GitLab From
29da6588e98aa7822dbdfde7e31f60afc377028a Mon Sep 17 00:00:00 2001 From: jlope2 Date: Fri, 28 Oct 2016 09:48:19 +0200 Subject: [PATCH 28/30] Some tests fixed --- test/unit/test_job_list.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/test/unit/test_job_list.py b/test/unit/test_job_list.py index 29039ce61..209734ae4 100644 --- a/test/unit/test_job_list.py +++ b/test/unit/test_job_list.py @@ -209,14 +209,14 @@ class TestJobList(TestCase): job_list.update_genealogy = Mock() job_list._job_list = [Job('random-name', 9999, Status.WAITING, 0), Job('random-name2', 99999, Status.WAITING, 0)] - date_list = ['fake-date1', 'fake-date2'] member_list = ['fake-member1', 'fake-member2'] num_chunks = 999 chunk_list = range(1, num_chunks + 1) parameters = {'fake-key': 'fake-value', 'fake-key2': 'fake-value2'} - + graph_mock = Mock() + job_list.graph = graph_mock # act job_list.generate(date_list, member_list, num_chunks, parameters, 'H', 9999, Type.BASH) @@ -230,7 +230,8 @@ class TestJobList(TestCase): cj_args, cj_kwargs = job_list._create_jobs.call_args self.assertEquals(parser_mock, cj_args[1]) self.assertEquals(0, cj_args[2]) - job_list._add_dependencies.assert_called_once_with(date_list, member_list, chunk_list, cj_args[0], parser_mock) + job_list._add_dependencies.assert_called_once_with(date_list, member_list, chunk_list, cj_args[0], parser_mock, + graph_mock) job_list.update_genealogy.assert_called_once_with(True) for job in job_list._job_list: self.assertEquals(parameters, job.parameters) -- GitLab From 43d3c048a504929ce13d0f1de2be4618f2e1a657 Mon Sep 17 00:00:00 2001 From: jlope2 Date: Mon, 31 Oct 2016 16:16:19 +0100 Subject: [PATCH 29/30] Some tests were fixed --- test/regression/default_conf/platforms.conf | 2 +- test/unit/test_dic_jobs.py | 112 ++++++++++---------- 2 files changed, 59 insertions(+), 55 deletions(-) diff --git a/test/regression/default_conf/platforms.conf b/test/regression/default_conf/platforms.conf index 2b3c23edd..7b2d75d9c 
100644 --- a/test/regression/default_conf/platforms.conf +++ b/test/regression/default_conf/platforms.conf @@ -5,7 +5,7 @@ VERSION = pbs HOST = cca PROJECT = spesiccf ADD_PROJECT_TO_HOST = false -USER = c3m +USER = c3j SCRATCH_DIR = /scratch/ms TEST_SUITE = True PROCESSORS_PER_NODE = 24 diff --git a/test/unit/test_dic_jobs.py b/test/unit/test_dic_jobs.py index 6e7b0c6ea..a9b749c2c 100644 --- a/test/unit/test_dic_jobs.py +++ b/test/unit/test_dic_jobs.py @@ -116,163 +116,164 @@ class TestDicJobs(TestCase): def test_dic_creates_right_jobs_by_startdate(self): # arrange - section = 'fake-section' + mock_section = Mock() + mock_section.name = 'fake-section' priority = 999 frequency = 1 - created_job = 'created_job' - self.dictionary.build_job = Mock(return_value=created_job) + self.dictionary.build_job = Mock(return_value=mock_section) # act - self.dictionary._create_jobs_startdate(section, priority, frequency, Type.BASH) + self.dictionary._create_jobs_startdate(mock_section.name, priority, frequency, Type.BASH) # assert self.assertEquals(len(self.date_list), self.dictionary.build_job.call_count) - self.assertEquals(len(self.dictionary._dic[section]), len(self.date_list)) + self.assertEquals(len(self.dictionary._dic[mock_section.name]), len(self.date_list)) for date in self.date_list: - self.assertEquals(self.dictionary._dic[section][date], created_job) + self.assertEquals(self.dictionary._dic[mock_section.name][date], mock_section) def test_dic_creates_right_jobs_by_member(self): # arrange - section = 'fake-section' + mock_section = Mock() + mock_section.name = 'fake-section' priority = 999 frequency = 1 - created_job = 'created_job' - self.dictionary.build_job = Mock(return_value=created_job) + self.dictionary.build_job = Mock(return_value=mock_section) # act - self.dictionary._create_jobs_member(section, priority, frequency, Type.BASH) + self.dictionary._create_jobs_member(mock_section.name, priority, frequency, Type.BASH) # assert 
self.assertEquals(len(self.date_list) * len(self.member_list), self.dictionary.build_job.call_count) - self.assertEquals(len(self.dictionary._dic[section]), len(self.date_list)) + self.assertEquals(len(self.dictionary._dic[mock_section.name]), len(self.date_list)) for date in self.date_list: for member in self.member_list: - self.assertEquals(self.dictionary._dic[section][date][member], created_job) + self.assertEquals(self.dictionary._dic[mock_section.name][date][member], mock_section) def test_dic_creates_right_jobs_by_chunk(self): # arrange - section = 'fake-section' + mock_section = Mock() + mock_section.name = 'fake-section' priority = 999 frequency = 1 - created_job = 'created_job' - self.dictionary.build_job = Mock(return_value=created_job) + self.dictionary.build_job = Mock(return_value=mock_section) # act - self.dictionary._create_jobs_chunk(section, priority, frequency, Type.BASH, dict()) + self.dictionary._create_jobs_chunk(mock_section.name, priority, frequency, Type.BASH, dict()) # assert self.assertEquals(len(self.date_list) * len(self.member_list) * len(self.chunk_list), self.dictionary.build_job.call_count) - self.assertEquals(len(self.dictionary._dic[section]), len(self.date_list)) + self.assertEquals(len(self.dictionary._dic[mock_section.name]), len(self.date_list)) for date in self.date_list: for member in self.member_list: for chunk in self.chunk_list: - self.assertEquals(self.dictionary._dic[section][date][member][chunk], created_job) + self.assertEquals(self.dictionary._dic[mock_section.name][date][member][chunk], mock_section) def test_dic_creates_right_jobs_by_chunk_with_frequency_3(self): # arrange - section = 'fake-section' + mock_section = Mock() + mock_section.name = 'fake-section' priority = 999 frequency = 3 - created_job = 'created_job' - self.dictionary.build_job = Mock(return_value=created_job) + self.dictionary.build_job = Mock(return_value=mock_section) # act - self.dictionary._create_jobs_chunk(section, priority, frequency, 
Type.BASH) + self.dictionary._create_jobs_chunk(mock_section.name, priority, frequency, Type.BASH) # assert self.assertEquals(len(self.date_list) * len(self.member_list) * (len(self.chunk_list) / frequency), self.dictionary.build_job.call_count) - self.assertEquals(len(self.dictionary._dic[section]), len(self.date_list)) + self.assertEquals(len(self.dictionary._dic[mock_section.name]), len(self.date_list)) def test_dic_creates_right_jobs_by_chunk_with_frequency_4(self): # arrange - section = 'fake-section' + mock_section = Mock() + mock_section.name = 'fake-section' priority = 999 frequency = 4 - created_job = 'created_job' - self.dictionary.build_job = Mock(return_value=created_job) + self.dictionary.build_job = Mock(return_value=mock_section) # act - self.dictionary._create_jobs_chunk(section, priority, frequency, Type.BASH) + self.dictionary._create_jobs_chunk(mock_section.name, priority, frequency, Type.BASH) # assert # you have to multiply to the round upwards (ceil) of the next division self.assertEquals( len(self.date_list) * len(self.member_list) * math.ceil(len(self.chunk_list) / float(frequency)), self.dictionary.build_job.call_count) - self.assertEquals(len(self.dictionary._dic[section]), len(self.date_list)) + self.assertEquals(len(self.dictionary._dic[mock_section.name]), len(self.date_list)) def test_dic_creates_right_jobs_by_chunk_with_date_synchronize(self): # arrange - section = 'fake-section' + mock_section = Mock() + mock_section.name = 'fake-section' priority = 999 frequency = 1 created_job = 'created_job' - self.dictionary.build_job = Mock(return_value=created_job) + self.dictionary.build_job = Mock(return_value=mock_section) # act - self.dictionary._create_jobs_chunk(section, priority, frequency, Type.BASH, 'date') + self.dictionary._create_jobs_chunk(mock_section.name, priority, frequency, Type.BASH, 'date') # assert self.assertEquals(len(self.chunk_list), self.dictionary.build_job.call_count) - 
self.assertEquals(len(self.dictionary._dic[section]), len(self.date_list)) + self.assertEquals(len(self.dictionary._dic[mock_section.name]), len(self.date_list)) for date in self.date_list: for member in self.member_list: for chunk in self.chunk_list: - self.assertEquals(self.dictionary._dic[section][date][member][chunk], created_job) + self.assertEquals(self.dictionary._dic[mock_section.name][date][member][chunk], mock_section) def test_dic_creates_right_jobs_by_chunk_with_date_synchronize_and_frequency_4(self): # arrange - section = 'fake-section' + mock_section = Mock() + mock_section.name = 'fake-section' priority = 999 frequency = 4 - created_job = 'created_job' - self.dictionary.build_job = Mock(return_value=created_job) + self.dictionary.build_job = Mock(return_value=mock_section) # act - self.dictionary._create_jobs_chunk(section, priority, frequency, Type.BASH, 'date') + self.dictionary._create_jobs_chunk(mock_section.name, priority, frequency, Type.BASH, 'date') # assert self.assertEquals(math.ceil(len(self.chunk_list) / float(frequency)), self.dictionary.build_job.call_count) - self.assertEquals(len(self.dictionary._dic[section]), len(self.date_list)) + self.assertEquals(len(self.dictionary._dic[mock_section.name]), len(self.date_list)) def test_dic_creates_right_jobs_by_chunk_with_member_synchronize(self): # arrange - section = 'fake-section' + mock_section = Mock() + mock_section.name = 'fake-section' priority = 999 frequency = 1 - created_job = 'created_job' - self.dictionary.build_job = Mock(return_value=created_job) + self.dictionary.build_job = Mock(return_value=mock_section) # act - self.dictionary._create_jobs_chunk(section, priority, frequency, Type.BASH, 'member') + self.dictionary._create_jobs_chunk(mock_section.name, priority, frequency, Type.BASH, 'member') # assert self.assertEquals(len(self.date_list) * len(self.chunk_list), self.dictionary.build_job.call_count) - self.assertEquals(len(self.dictionary._dic[section]), len(self.date_list)) + 
self.assertEquals(len(self.dictionary._dic[mock_section.name]), len(self.date_list)) for date in self.date_list: for member in self.member_list: for chunk in self.chunk_list: - self.assertEquals(self.dictionary._dic[section][date][member][chunk], created_job) + self.assertEquals(self.dictionary._dic[mock_section.name][date][member][chunk], mock_section) def test_dic_creates_right_jobs_by_chunk_with_member_synchronize_and_frequency_4(self): # arrange - section = 'fake-section' + mock_section = Mock() + mock_section.name = 'fake-section' priority = 999 frequency = 4 - created_job = 'created_job' - self.dictionary.build_job = Mock(return_value=created_job) + self.dictionary.build_job = Mock(return_value=mock_section) # act - self.dictionary._create_jobs_chunk(section, priority, frequency, Type.BASH, 'member') + self.dictionary._create_jobs_chunk(mock_section.name, priority, frequency, Type.BASH, 'member') # assert self.assertEquals(len(self.date_list) * math.ceil(len(self.chunk_list) / float(frequency)), self.dictionary.build_job.call_count) - self.assertEquals(len(self.dictionary._dic[section]), len(self.date_list)) + self.assertEquals(len(self.dictionary._dic[mock_section.name]), len(self.date_list)) def test_create_job_creates_a_job_with_right_parameters(self): # arrange @@ -496,14 +497,17 @@ class TestDicJobs(TestCase): self.dictionary._get_date.assert_any_call(list(), dic, date, member, chunk) def test_create_jobs_once_calls_create_job_and_assign_correctly_its_return_value(self): - section = 'fake-section' + mock_section = Mock() + mock_section.name = 'fake-section' priority = 999 - self.dictionary.build_job = Mock(return_value='fake-return') + self.dictionary.build_job = Mock(return_value=mock_section) + self.job_list.graph.add_node = Mock() - self.dictionary._create_jobs_once(section, priority, Type.BASH, dict()) + self.dictionary._create_jobs_once(mock_section.name, priority, Type.BASH, dict()) - self.assertEquals('fake-return', self.dictionary._dic[section]) 
- self.dictionary.build_job.assert_called_once_with(section, priority, None, None, None, Type.BASH, {}) + self.assertEquals(mock_section, self.dictionary._dic[mock_section.name]) + self.dictionary.build_job.assert_called_once_with(mock_section.name, priority, None, None, None, Type.BASH, {}) + self.job_list.graph.add_node.assert_called_once_with(mock_section.name) class FakeBasicConfig: -- GitLab From 06b0f517c55a273b3f2ad5a5f0e73d6b9bed1599 Mon Sep 17 00:00:00 2001 From: jlope2 Date: Mon, 31 Oct 2016 16:24:40 +0100 Subject: [PATCH 30/30] Bumped version number to 3.7.4 --- CHANGELOG | 9 +++++++++ VERSION | 2 +- docs/source/conf.py | 2 +- 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index 394ac5c57..3ad783995 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,12 @@ +3.7.4 + Forward dependencies + Performance improvements + Log files copied into LOCAL platform + PROCESSORS_PER_NODE/TASKS now optional + Exclusivity for MN3 (with Paramiko) + THREADS optional for ECMWF + Minor bugfixes + 3.7.3 Fixed error with logs directives (err & out were swapped) Added new option for MN3: SCRATCH_FREE_SPACE diff --git a/VERSION b/VERSION index c1e43e6d4..0833a98f1 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -3.7.3 +3.7.4 diff --git a/docs/source/conf.py b/docs/source/conf.py index dc6320c1c..1bbb67bbd 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -64,7 +64,7 @@ author = u'Earth Science Department, Barcelona Supercomputing Center, BSC' # The short X.Y version. version = '3.7' # The full version, including alpha/beta/rc tags. -release = '3.7.3' +release = '3.7.4' # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. -- GitLab