diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index 138fe5b60b35a2b451cafb3c2cd48c6e6395933d..4d0b179b1edd5259932940ef7ab8bf41da0adff3 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -845,8 +845,8 @@ class AutosubmitConfig(object): for option in self._conf_parser.options(section): parameters[option] = self._conf_parser.get(section, option) - project_type = self.get_project_type() - if project_type != "none" and self._proj_parser is not None: + parameters['PROJECT_TYPE'] = self.get_project_type() + if parameters['PROJECT_TYPE'] != "none" and self._proj_parser is not None: # Load project parameters Log.debug("Loading project parameters...") parameters2 = parameters.copy() diff --git a/autosubmit/database/db_jobdata.py b/autosubmit/database/db_jobdata.py index b313a427b2cf178423e6f6f93578f9db289f9856..8075b6301e2634d58bf6cd3f35f9afab5a20e5dc 100644 --- a/autosubmit/database/db_jobdata.py +++ b/autosubmit/database/db_jobdata.py @@ -330,7 +330,7 @@ class JobData(object): :return: Queueing time in seconds. :rtype: int """ - if self.status in ["SUBMITTED", "QUEUING", "RUNNING", "COMPLETED", "HELD", "PREPARED", "FAILED"]: + if self.status in ["SUBMITTED", "QUEUING", "RUNNING", "COMPLETED", "HELD", "PREPARED", "FAILED", "SKIPPED"]: queue = int((self.start if self.start > 0 else time.time()) - self.submit) if queue > 0: diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 67e524b333b7ef0820483b5ed9b26c9f8fb0a6d2..c6355c21cf62cdfcac9687312d5b143a80b553f1 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -99,6 +99,7 @@ class Job(object): self.delay = None self.frequency = None self.synchronize = None + self.skippable = False self.repacked = 0 self._long_name = None self.long_name = name @@ -794,10 +795,8 @@ class Job(object): parameters.update(default_parameters) parameters['JOBNAME'] = self.name parameters['FAIL_COUNT'] = str(self.fail_count) - parameters['SDATE'] = date2str(self.date, self.date_format) parameters['MEMBER'] = self.member - if hasattr(self, 'retrials'): parameters['RETRIALS'] = self.retrials @@ -935,19 +934,22 @@ class Job(object): :rtype: str """ parameters = self.parameters - if parameters['PROJECT_TYPE'].lower() != "none": - template_file = open(os.path.join( - as_conf.get_project_dir(), self.file), 'r') - template = template_file.read() - else: - if self.type == Type.BASH: - template = 'sleep 5' - elif self.type == Type.PYTHON: - template = 'time.sleep(5)' - elif self.type == Type.R: - template = 'Sys.sleep(5)' + try: # issue in tests with project_type variable while using threads + if as_conf.get_project_type().lower() != "none": + template_file = open(os.path.join( + as_conf.get_project_dir(), self.file), 'r') + template = template_file.read() else: - template = '' + if self.type == Type.BASH: + template = 'sleep 5' + elif self.type == Type.PYTHON: + template = 'time.sleep(5)' + elif self.type == Type.R: + template = 'Sys.sleep(5)' + else: + template = '' + except: + template = '' if self.type == Type.BASH: snippet = StatisticsSnippetBash @@ -957,7 +959,6 @@ class Job(object): snippet = StatisticsSnippetR else: raise Exception('Job type {0} not supported'.format(self.type)) - template_content = self._get_template_content( as_conf, snippet, template) diff --git a/autosubmit/job/job_common.py b/autosubmit/job/job_common.py index 129e7382263a001217ac5b148f474ae7f943a384..f80bd67364ad9f4f7ef31371a328990a38a05d2c 100644 --- a/autosubmit/job/job_common.py +++ b/autosubmit/job/job_common.py @@ -32,13 +32,14 @@ class Status: COMPLETED = 5 HELD = 6 PREPARED = 7 + SKIPPED = 8 FAILED = -1 UNKNOWN = -2 SUSPENDED = -3 ####### # Note: any change on constants must be applied on the dict below!!! VALUE_TO_KEY = {-3: 'SUSPENDED', -2: 'UNKNOWN', -1: 'FAILED', 0: 'WAITING', 1: 'READY', - 2: 'SUBMITTED', 3: 'QUEUING', 4: 'RUNNING', 5: 'COMPLETED', 6: 'HELD', 7: 'PREPARED'} + 2: 'SUBMITTED', 3: 'QUEUING', 4: 'RUNNING', 5: 'COMPLETED', 6: 'HELD', 7: 'PREPARED', 8: 'SKIPPED'} def retval(self, value): return getattr(self, value) @@ -62,12 +63,13 @@ class bcolors: QUEUING = '\033[35;1m' RUNNING = '\033[32m' COMPLETED = '\033[33m' + SKIPPED = '\033[33m' PREPARED = '\033[34;2m' HELD = '\033[34;1m' FAILED = '\033[31m' SUSPENDED = '\033[31;1m' CODE_TO_COLOR = {-3: SUSPENDED, -2: UNKNOWN, -1: FAILED, 0: WAITING, 1: READY, - 2: SUBMITTED, 3: QUEUING, 4: RUNNING, 5: COMPLETED, 6: HELD, 7: PREPARED} + 2: SUBMITTED, 3: QUEUING, 4: RUNNING, 5: COMPLETED, 6: HELD, 7: PREPARED, 8: SKIPPED} class Type: diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index e57bb11939d91f767bc53d5936c22e9d230301a9..ad948e2bb9a2cad9230fa35c35782ef8afe14a55 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -53,6 +53,7 @@ class DicJobs: self.default_retrials = default_retrials self._dic = dict() + def read_section(self, section, priority, default_job_type, jobs_data=dict()): """ Read a section from jobs conf and creates all jobs for it @@ -345,6 +346,12 @@ class DicJobs: job.notify_on = [x.upper() for x in self.get_option(section, "NOTIFY_ON", '').split(' ')] job.synchronize = self.get_option(section, "SYNCHRONIZE", None) job.check_warnings = str(self.get_option(section, "SHOW_CHECK_WARNINGS", 'false')).lower() + job.running = self.get_option(section, 'RUNNING', 'once').lower() + + if self.get_option(section, "SKIPPABLE", "False").lower() == "true": + job.skippable = True + else: + job.skippable = False if job.check_warnings == 'true': job.check_warnings = True else: diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index b69268b42450fec5cd8b7628860898853c648bd1..7d9b0a76fd63c341e7c1c79ce2ce3574655bae7b 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -886,7 +886,18 @@ class JobList(object): prepared = [job for job in self._job_list if (platform is None or job.platform.name.lower() == platform.name.lower()) and job.status == Status.PREPARED] return prepared + def get_skipped(self, platform=None): + """ + Returns a list of prepared jobs + :param platform: job platform + :type platform: HPCPlatform + :return: prepared jobs + :rtype: list + """ + skipped = [job for job in self._job_list if (platform is None or job.platform.name.lower() == platform.name.lower()) and + job.status == Status.SKIPPED] + return skipped def get_waiting(self, platform=None, wrapper=False): """ Returns a list of jobs waiting @@ -1196,7 +1207,15 @@ class JobList(object): move(os.path.join(self._persistence_path, self._update_file), os.path.join(self._persistence_path, self._update_file + "_" + output_date)) - + def get_skippable_jobs(self, jobs_in_wrapper): + job_list_skip = [job for job in self.get_job_list() if job.skippable is True and ( job.status == Status.QUEUING or job.status == Status.RUNNING or job.status == Status.COMPLETED or job.status == Status.READY) and jobs_in_wrapper.find(job.section) == -1 ] + skip_by_section = dict() + for job in job_list_skip: + if job.section not in skip_by_section: + skip_by_section[job.section] = [job] + else: + skip_by_section[job.section].append(job) + return skip_by_section @property def parameters(self): """ @@ -1265,10 +1284,26 @@ class JobList(object): save = True Log.debug( "Job is failed".format(job.name)) + jobs_to_skip = self.get_skippable_jobs(as_conf.get_wrapper_jobs()) # Get A Dict with all jobs that are listed as skipabble + for section in jobs_to_skip: + for job in jobs_to_skip[section]: + if job.status == Status.READY or job.status == Status.QUEUING: # Check only jobs to be pending of canceled if not started + if job.running == 'chunk': + for related_job in jobs_to_skip[section]: + if job.chunk < related_job.chunk: # Check if there is some related job with an higher chunk + if job.status == Status.QUEUING: + job.platform.send_command(job.platform.cancel_cmd + " " + str(job.id), ignore_log=True) + job.status = Status.SKIPPED + elif job.running == 'member': + members = as_conf.get_member_list() + for related_job in jobs_to_skip[section]: + if members.index(job.member) < members.index(related_job.member): + job.status = Status.SKIPPED + + # if waiting jobs has all parents completed change its State to READY for job in self.get_completed(): if job.synchronize is not None: - #Log.debug('Updating SYNC jobs') tmp = [ parent for parent in job.parents if parent.status == Status.COMPLETED] if len(tmp) != len(job.parents): @@ -1276,13 +1311,12 @@ class JobList(object): save = True Log.debug( "Resetting sync job: {0} status to: WAITING for parents completion...".format(job.name)) - Log.debug('Update finished') + #Log.debug('Update finished') Log.debug('Updating WAITING jobs') if not fromSetStatus: all_parents_completed = [] for job in self.get_waiting(): - tmp = [ - parent for parent in job.parents if parent.status == Status.COMPLETED] + tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED or parent.status == Status.SKIPPED] if job.parents is None or len(tmp) == len(job.parents): job.status = Status.READY job.hold = False @@ -1312,7 +1346,7 @@ class JobList(object): for job in self.get_waiting_remote_dependencies('slurm'): if job.name not in all_parents_completed: tmp = [parent for parent in job.parents if ( - (parent.status == Status.COMPLETED or parent.status == Status.QUEUING or parent.status == Status.RUNNING) and "setup" not in parent.name.lower())] + (parent.status == Status.SKIPPED or parent.status == Status.COMPLETED or parent.status == Status.QUEUING or parent.status == Status.RUNNING) and "setup" not in parent.name.lower())] if len(tmp) == len(job.parents): job.status = Status.PREPARED job.hold = True diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 7e219c689c898bc36e57806aedd937c1b4f2e945..c913d865aa4280030f4ffb3d77ebaaef1956bedb 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -118,6 +118,7 @@ class JobPackageBase(object): def _create_common_script(self): pass + def submit(self, configuration, parameters,only_generate=False,hold=False): """ :para configuration: Autosubmit basic configuration \n @@ -141,21 +142,21 @@ class JobPackageBase(object): try: if len(self.jobs) < thread_number: for job in self.jobs: - if job.check.lower() == Job.CHECK_ON_SUBMISSION.lower(): - if only_generate: - exit=True - break - if not os.path.exists(os.path.join(configuration.get_project_dir(), job.file)): - if configuration.get_project_type().lower() != "none": - raise AutosubmitCritical("Template [ {0} ] using CHECK=On_submission has some empty variable {0}".format(job.name),7014) - if not job.check_script(configuration, parameters,show_logs=job.check_warnings): - Log.warning("Script {0} check failed",job.name) - Log.warning("On submission script has some empty variables") - else: - Log.result("Script {0} OK",job.name) - job.update_parameters(configuration, parameters) - # looking for directives on jobs - self._custom_directives = self._custom_directives | set(job.custom_directives) + if job.check.lower() == Job.CHECK_ON_SUBMISSION.lower(): + if only_generate: + exit=True + break + if not os.path.exists(os.path.join(configuration.get_project_dir(), job.file)): + if configuration.get_project_type().lower() != "none": + raise AutosubmitCritical("Template [ {0} ] using CHECK=On_submission has some empty variable {0}".format(job.name),7014) + if not job.check_script(configuration, parameters,show_logs=job.check_warnings): + Log.warning("Script {0} check failed",job.name) + Log.warning("On submission script has some empty variables") + else: + Log.result("Script {0} OK",job.name) + job.update_parameters(configuration, parameters) + # looking for directives on jobs + self._custom_directives = self._custom_directives | set(job.custom_directives) else: Lhandle = list() for i in xrange(0, len(self.jobs), chunksize): @@ -203,6 +204,7 @@ class JobPackageSimple(JobPackageBase): def __init__(self, jobs): super(JobPackageSimple, self).__init__(jobs) self._job_scripts = {} + def _create_scripts(self, configuration): for job in self.jobs: self._job_scripts[job.name] = job.create_script(configuration) diff --git a/autosubmit/monitor/monitor.py b/autosubmit/monitor/monitor.py index ce97cd319b34f235a9c71da0b0973f1c4ca6e2fc..99f307d369643f4b8707afb820849ba7ebbeeedc 100644 --- a/autosubmit/monitor/monitor.py +++ b/autosubmit/monitor/monitor.py @@ -43,7 +43,7 @@ class Monitor: """Class to handle monitoring of Jobs at HPC.""" _table = dict([(Status.UNKNOWN, 'white'), (Status.WAITING, 'gray'), (Status.READY, 'lightblue'),(Status.PREPARED, 'skyblue'), (Status.SUBMITTED, 'cyan'), (Status.HELD, 'salmon'), (Status.QUEUING, 'pink'), (Status.RUNNING, 'green'), - (Status.COMPLETED, 'yellow'), (Status.FAILED, 'red'), (Status.SUSPENDED, 'orange')]) + (Status.COMPLETED, 'yellow'), (Status.FAILED, 'red'), (Status.SUSPENDED, 'orange'), (Status.SKIPPED, 'lightyellow')]) @staticmethod def color_status(status): @@ -71,7 +71,8 @@ class Monitor: return Monitor._table[Status.RUNNING] elif status == Status.COMPLETED: return Monitor._table[Status.COMPLETED] - + elif status == Status.SKIPPED: + return Monitor._table[Status.SKIPPED] elif status == Status.FAILED: return Monitor._table[Status.FAILED] elif status == Status.SUSPENDED: @@ -103,6 +104,7 @@ class Monitor: fillcolor=self._table[Status.READY])) legend.add_node(pydotplus.Node(name='PREPARED', shape='box', style="filled", fillcolor=self._table[Status.PREPARED])) + legend.add_node(pydotplus.Node(name='SUBMITTED', shape='box', style="filled", fillcolor=self._table[Status.SUBMITTED])) legend.add_node(pydotplus.Node(name='HELD', shape='box', style="filled", @@ -111,13 +113,15 @@ class Monitor: fillcolor=self._table[Status.QUEUING])) legend.add_node(pydotplus.Node(name='RUNNING', shape='box', style="filled", fillcolor=self._table[Status.RUNNING])) + legend.add_node(pydotplus.Node(name='SKIPPED', shape='box', style="filled", + fillcolor=self._table[Status.SKIPPED])) legend.add_node(pydotplus.Node(name='COMPLETED', shape='box', style="filled", fillcolor=self._table[Status.COMPLETED])) - legend.add_node(pydotplus.Node(name='FAILED', shape='box', style="filled", fillcolor=self._table[Status.FAILED])) legend.add_node(pydotplus.Node(name='SUSPENDED', shape='box', style="filled", fillcolor=self._table[Status.SUSPENDED])) + graph.add_subgraph(legend) exp = pydotplus.Subgraph(graph_name='Experiment', label=expid) diff --git a/docs/source/usage/new_job.rst b/docs/source/usage/new_job.rst index f6270aa655a7464e004963479453d25a59ca062f..0e1b63367af5e0c437cc389d447e5a36b2eadfee 100644 --- a/docs/source/usage/new_job.rst +++ b/docs/source/usage/new_job.rst @@ -56,6 +56,8 @@ There are also other, less used features that you can use: * CUSTOM_DIRECTIVES: Custom directives for the HPC resource manager headers of the platform used for that job. +* SKIPPABLE: When this is true, the job will be able to skip it work if there is an higher chunk or member already ready, running, queuing or in complete status. + Workflow examples: ------------------ @@ -114,5 +116,29 @@ In this workflow you can see an illustrated example of select_chunks used in an :align: center :alt: select_chunks_workflow +Example 3: + +In this workflow you can see an illustrated example of SKIPPABLE parameter used in an dummy workflow. + +.. code-block:: ini + [SIM] + FILE = sim.sh + DEPENDENCIES = INI POST-1 + WALLCLOCK = 00:15 + RUNNING = chunk + QUEUE = debug + SKIPPABLE = TRUE + [POST] + FILE = post.sh + DEPENDENCIES = SIM + WALLCLOCK = 00:05 + RUNNING = member + #QUEUE = debug + +.. figure:: workflows/skip.png + :name: simple + :width: 100% + :align: center + :alt: skip_workflow diff --git a/docs/source/workflows/skip.png b/docs/source/workflows/skip.png new file mode 100644 index 0000000000000000000000000000000000000000..6083dad8b3c16a5667a366adf2f210d3896c50bf Binary files /dev/null and b/docs/source/workflows/skip.png differ diff --git a/test/unit/test_autosubmit_config.py b/test/unit/test_autosubmit_config.py index 626b0ca74225097850bcfd3af1bc851f18e97f29..a0b1aea63a31ec2f272e332e3d56b0e20cce7028 100644 --- a/test/unit/test_autosubmit_config.py +++ b/test/unit/test_autosubmit_config.py @@ -299,7 +299,7 @@ class TestAutosubmitConfig(TestCase): returned_parameters = config.load_parameters() # assert - self.assertEquals(6, len(returned_parameters)) + self.assertEquals(7, len(returned_parameters)) self.assertTrue(returned_parameters.has_key('dummy-option1')) self.assertTrue(returned_parameters.has_key('dummy-option2')) self.assertTrue(returned_parameters.has_key('dummy-option3'))