diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index ea3e0b9f1ac5b8509020001cefd08083d5be42d4..57d20f2750ec4e47d364113507c57f0ed29fb793 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -17,17 +17,17 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . -from log.log import Log, AutosubmitCritical, AutosubmitError -from autosubmit.job.job_common import Status, Type +import operator from bscearth.utils.date import sum_str_hours -from autosubmit.job.job_packages import JobPackageSimple, JobPackageVertical, JobPackageHorizontal, \ - JobPackageSimpleWrapped, JobPackageHorizontalVertical, JobPackageVerticalHorizontal, JobPackageBase -from operator import attrgetter from math import ceil -import operator -from collections import defaultdict +from operator import attrgetter from typing import List +from autosubmit.job.job_common import Status, Type +from autosubmit.job.job_packages import JobPackageSimple, JobPackageVertical, JobPackageHorizontal, \ + JobPackageSimpleWrapped, JobPackageHorizontalVertical, JobPackageVerticalHorizontal, JobPackageBase +from log.log import Log, AutosubmitCritical + class JobPackager(object): """ @@ -437,10 +437,10 @@ class JobPackager(object): wrapper_limits["min_v"], wrapper_limits["min"], len(active_jobs)), 6013) else: - message = "Wrapper couldn't be formed under {0} POLICY due minimum limit not being reached: [wrappeable:{4} < defined_min:{5}] [wrappeable_h:{1} < defined_min_h:{2}]|[wrappeable_v:{3} < defined_min_v:{4}] ".format( + message = "Wrapper couldn't be formed under {0} POLICY due minimum limit not being reached: [wrappeable:{5} <= defined_min:{6}] [wrappeable_h:{1} <= defined_min_h:{2}]|[wrappeable_v:{3} <= defined_min_v:{4}] ".format( self.wrapper_policy[self.current_wrapper_section], min_h, - wrapper_limits["min_h"], min_v, wrapper_limits["min_v"], - wrapper_limits["min"], len(active_jobs)) + wrapper_limits["min_h"], min_v, wrapper_limits["min_v"], len(p.jobs), + wrapper_limits["min"]) if hard_deadlock: message += "\nCheck your configuration: The next wrappeable job can't be wrapped until some of inner jobs of current packages finishes which is imposible" if min_v > 1: @@ -481,13 +481,12 @@ class JobPackager(object): if len(active_jobs) > 0: if show_log: Log.printlog( - "Wrapper policy is set to MIXED and there are not enough jobs to form a wrapper.[wrappeable:{4} < defined_min:{5}] [wrappeable_h:{0} < defined_min_h:{1}]|[wrappeable_v:{2} < defined_min_v:{3}] waiting until the wrapper can be formed.".format( + "Wrapper policy is set to MIXED and there are not enough jobs to form a wrapper.[wrappeable:{4} <= defined_min:{5}] [wrappeable_h:{0} <= defined_min_h:{1}]|[wrappeable_v:{2} <= defined_min_v:{3}] waiting until the wrapper can be formed.".format( min_h, wrapper_limits["min_h"], min_v, wrapper_limits["min_v"],wrapper_limits["min"],len(active_jobs)), 6013) else: - message = "Wrapper couldn't be formed under {0} POLICY due minimum limit not being reached: [wrappeable:{4} < defined_min:{5}] [wrappeable_h:{1} < defined_min_h:{2}]|[wrappeable_v:{3} < defined_min_v:{4}] ".format( - self.wrapper_policy[self.current_wrapper_section], min_h, - wrapper_limits["min_h"], min_v, wrapper_limits["min_v"],wrapper_limits["min"],len(active_jobs)) + message = "Wrapper couldn't be formed under {0} POLICY due minimum limit not being reached: [wrappeable:{5} <= defined_min:{6}] [wrappeable_h:{1} <= defined_min_h:{2}]|[wrappeable_v:{3} <= defined_min_v:{4}] ".format( + self.wrapper_policy[self.current_wrapper_section], min_h,wrapper_limits["min_h"],min_v, wrapper_limits["min_v"], len(p.jobs),wrapper_limits["min"]) if hard_deadlock: message += "\nCheck your configuration: The next wrappeable job can't be wrapped until some of inner jobs of current packages finishes which is imposible" if min_v > 1: diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 8e3c3409ac644e45bd7cba111c42faf4e9e0e51a..b98c1fc727229da5416c32e63ef0536e61f03dad 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -24,6 +24,7 @@ except ImportError: # noinspection PyCompatibility from ConfigParser import SafeConfigParser +import multiprocessing import os import random import time @@ -37,7 +38,6 @@ from autosubmit.job.job import Job from bscearth.utils.date import sum_str_hours from threading import Thread, Lock from typing import List -import multiprocessing import tarfile import datetime import re @@ -101,21 +101,21 @@ class JobPackageBase(object): if str(job.check).lower() == str(Job.CHECK_ON_SUBMISSION).lower(): if only_generate: break - if not os.path.exists(os.path.join(configuration.get_project_dir(), job.file)): - lock.acquire() - if str(configuration.get_project_type()).lower() != "none": - raise AutosubmitCritical( - "Template [ {0} ] using CHECK=On_submission has some empty variable {0}".format( - job.name), 7014) - lock.release() - if not job.check_script(configuration, parameters, show_logs=job.check_warnings): - Log.warning("Script {0} check failed", job.name) - Log.warning("On submission script has some empty variables") - else: - Log.result("Script {0} OK", job.name) - lock.acquire() - job.update_parameters(configuration, parameters) - lock.release() + if not os.path.exists(os.path.join(configuration.get_project_dir(), job.file)): + lock.acquire() + if str(configuration.get_project_type()).lower() != "none": + raise AutosubmitCritical( + "Template [ {0} ] using CHECK=On_submission has some empty variable {0}".format( + job.name), 7014) + lock.release() + if not job.check_script(configuration, parameters, show_logs=job.check_warnings): + Log.warning("Script {0} check failed", job.name) + Log.warning("On submission script has some empty variables") + else: + Log.result("Script {0} OK", job.name) + # lock.acquire() + # job.update_parameters(configuration, parameters) + # lock.release() # looking for directives on jobs self._custom_directives = self._custom_directives | set(job.custom_directives) @threaded @@ -137,6 +137,7 @@ class JobPackageBase(object): """ exit=False thread_number = multiprocessing.cpu_count() + #thread_number = 1 if len(self.jobs) > 2500: thread_number = thread_number * 2 elif len(self.jobs) > 5000: @@ -149,13 +150,16 @@ class JobPackageBase(object): try: # get one job of each section jobs by section if only_generate: - sections = configuration.get_wrapper_jobs(self.current_wrapper_section) - if "&" in sections: - sections.split("&") - elif " " in sections: - sections.split(" ") + if hasattr(configuration, 'current_wrapper_section'): + sections = configuration.get_wrapper_jobs(self.current_wrapper_section) + if "&" in sections: + sections.split("&") + elif " " in sections: + sections.split(" ") + else: + sections = [sections] else: - sections = [sections] + sections = [self.jobs[0].section] for section in sections: if str(configuration._jobs_parser.get_option(section, "CHECK", 'True')).lower() == str(Job.CHECK_ON_SUBMISSION).lower(): exit = True @@ -183,8 +187,12 @@ class JobPackageBase(object): except BaseException as e: original = e if not exit: - raise AutosubmitCritical( - "Error on {1}, template [{0}] still does not exists in running time(check=on_submission actived)\n{2} ".format(self.jobs[0].file,self.jobs[0].name,e), 7014) + if not os.path.exists(os.path.join(configuration.get_project_dir(), self.jobs[0].file)) and str(configuration.get_project_type()).lower() != "none": + raise AutosubmitCritical( + "Error on {1}, template [{0}] still does not exists in running time(check=on_submission actived)\n{2} ".format(self.jobs[0].file,self.jobs[0].name,e), 7014) + else: + raise AutosubmitCritical("Error in Custom_directives of {0} \n{1}, Check if it format is correct and there are no extra commas".format(self.jobs[0].section,e),7014) + else: raise AutosubmitCritical(original,7014) Log.debug("Creating Scripts") diff --git a/test/unit/test_job_package.py b/test/unit/test_job_package.py index a929d6c43709bb8bbe179cdfd6237e061a81e018..fa89f2e551e03f65b175830ade56b8d98b899652 100644 --- a/test/unit/test_job_package.py +++ b/test/unit/test_job_package.py @@ -42,19 +42,18 @@ class TestJobPackage(TestCase): self.assertEquals(self.platform, self.job_package.platform) @patch('os.path.exists') - def test_job_package_submission(self, os_mock): + @patch('__builtin__.open') + def test_job_package_submission(self, os_mock, open_mock): # arrange - write_mock = Mock().write = Mock() + open_mock.return_value = MagicMock() os_mock.return_value = True for job in self.job_package.jobs: - job._tmp_path = Mock() + job._tmp_path = "fake-path" job.name = "fake-name" - job._get_paramiko_template = Mock("false","empty") + job._get_paramiko_template = Mock("false", "empty") job.file = "fake-file" job.update_parameters = MagicMock(return_value="fake-params") - job.parameters = "fake-params" - - + job.parameters = {"fake-params": "fake-value"} self.job_package._create_scripts = Mock() self.job_package._send_files = Mock() @@ -63,11 +62,10 @@ class TestJobPackage(TestCase): configuration.get_project_type = Mock(return_value='fake-type') configuration.get_project_dir = Mock(return_value='fake-dir') configuration.get_project_name = Mock(return_value='fake-name') - # act self.job_package.submit(configuration, 'fake-params') # assert - for job in self.jobs: + for job in self.job_package.jobs: job.update_parameters.assert_called_once_with(configuration, 'fake-params') self.job_package._create_scripts.is_called_once_with() self.job_package._send_files.is_called_once_with()