diff --git a/.gitignore b/.gitignore index bd137496e1b1e189f0cc66805504ddecebbd4af8..c748715fbfd1d4098d553839b961cc12f2b083fe 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,5 @@ .*.sw* .*.pyc* .*.log* +/cover/ +/.coverage diff --git a/CHANGELOG b/CHANGELOG index 1d24e8584ec6e15e3629ef8a07eb7783d0773727..39830fa43307f71aeeff3f8582fcaad093ed9810 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,6 @@ +3.8.0 + First version with LSF arrays + 3.7.6 Fixed refresh Fixed recovery for ECMWF diff --git a/VERSION b/VERSION index 897e56be0b6911813b9baecd20ea8de49244b917..19811903a7f7584d7aa752ea29bbf9d74cf78b47 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -3.7.6 +3.8.0 diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index f5ceb7c940963832010264c7835c4a16c5a9e4b8..6bdecc01aaaa66f6fb58eafffcb77a6d24ed7a3f 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -622,9 +622,9 @@ class Autosubmit: if job.platform_name is None: job.platform_name = hpcarch # noinspection PyTypeChecker - job.set_platform(submitter.platforms[job.platform_name.lower()]) + job.platform = submitter.platforms[job.platform_name.lower()] # noinspection PyTypeChecker - platforms_to_test.add(job.get_platform()) + platforms_to_test.add(job.platform) job_list.check_scripts(as_conf) @@ -705,45 +705,13 @@ class Autosubmit: """ save = False for platform in platforms_to_test: - jobs_in_queue = job_list.get_in_queue(platform) - jobs_available = job_list.get_ready(platform) - if len(jobs_available) == 0: - continue - - Log.info("\nJobs ready for {1}: {0}", len(jobs_available), platform.name) - - max_jobs = platform.total_jobs - max_waiting_jobs = platform.max_waiting_jobs - waiting = len(job_list.get_submitted(platform) + job_list.get_queuing(platform)) - available = max_waiting_jobs - waiting - - if min(available, len(jobs_available)) == 0: - Log.debug("Number of jobs ready: {0}", len(jobs_available)) - Log.debug("Number of jobs available: {0}", available) - elif min(available, 
len(jobs_available)) > 0 and len(job_list.get_in_queue(platform)) <= max_jobs: - Log.info("Jobs to submit: {0}", min(available, len(jobs_available))) - - s = sorted(jobs_available, key=lambda k: k.long_name.split('_')[1][:6]) - list_of_jobs_avail = sorted(s, key=lambda k: k.priority, reverse=True) - - for job in list_of_jobs_avail[0:min(available, len(jobs_available), max_jobs - len(jobs_in_queue))]: - job.update_parameters(as_conf, job_list.parameters) - script_name = job.create_script(as_conf) + for job_package in job_list.get_ready_packages(platform): try: - platform.send_file(script_name) - platform.remove_stat_file(job.name) - platform.remove_completed_file(job.name) - job.id = platform.submit_job(job, script_name) + job_package.submit(as_conf, job_list.parameters) + save = True except Exception: - Log.error("{0} submission failed", job.name) + Log.error("{0} submission failed", platform.name) continue - if job.id is None: - continue - Log.info("{0} submitted", job.name) - # set status to "submitted" - job.status = Status.SUBMITTED - job.write_submit_time() - save = True return save @staticmethod @@ -1028,7 +996,7 @@ class Autosubmit: if job.platform_name is None: job.platform_name = hpcarch # noinspection PyTypeChecker - job.set_platform(platforms[job.platform_name.lower()]) + job.platform = platforms[job.platform_name.lower()] # noinspection PyTypeChecker platforms_to_test.add(platforms[job.platform_name.lower()]) @@ -1043,9 +1011,9 @@ class Autosubmit: if job.platform_name is None: job.platform_name = hpcarch # noinspection PyTypeChecker - job.set_platform(platforms[job.platform_name.lower()]) + job.platform = platforms[job.platform_name.lower()] - if job.get_platform().get_completed_files(job.name, 0): + if job.platform.get_completed_files(job.name, 0): job.status = Status.COMPLETED Log.info("CHANGED job '{0}' status to COMPLETED".format(job.name)) elif job.status != Status.SUSPENDED: @@ -1112,7 +1080,7 @@ class Autosubmit: if job.platform_name is None: 
job.platform_name = hpcarch # noinspection PyTypeChecker - job.set_platform(submitter.platforms[job.platform_name.lower()]) + job.platform = submitter.platforms[job.platform_name.lower()] job.update_parameters(as_conf, job_list.parameters) return job_list.check_scripts(as_conf) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 3d3e479a434baf6084c29679a86ce3c6b9cc51f8..5181d240dd409aa31a8c03a2e45814dc4b79c07e 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -30,7 +30,7 @@ from autosubmit.config.basicConfig import BasicConfig from autosubmit.date.chunk_date_lib import * -class Job: +class Job(object): """ Class to handle all the tasks with Jobs at HPC. A job is created by default with a name, a jobid, a status and a type. @@ -50,7 +50,7 @@ class Job: def __str__(self): return "{0} STATUS: {1}".format(self.name, self.status) - def __init__(self, name, jobid, status, priority): + def __init__(self, name, job_id, status, priority): self._platform = None self._queue = None self.platform_name = None @@ -71,10 +71,10 @@ class Job: self.type = Type.BASH self.scratch_free_space = None - self.id = jobid + self.id = job_id self.file = None - self.out_filename = '' - self.err_filename = '' + self._local_logs = ('', '') + self._remote_logs = ('', '') self.status = status self.priority = priority self._parents = set() @@ -107,20 +107,27 @@ class Job: Log.debug('FAIL_COUNT: {0}', self.fail_count) Log.debug('EXPID: {0}', self.expid) - # Properties @property def parents(self): """ - Return parent jobs list + Returns parent jobs list :return: parent jobs :rtype: set """ return self._parents - def get_platform(self): + @parents.setter + def parents(self, parents): """ - Returns the platforms to be used by the job. Chooses between serial and parallel platforms + Sets the parents job list + """ + self._parents = parents + + @property + def platform(self): + """ + Returns the platform to be used by the job. 
Chooses between serial and parallel platforms :return HPCPlatform object for the job to use :rtype: HPCPlatform @@ -130,7 +137,8 @@ class Job: else: return self._platform.serial_platform - def set_platform(self, value): + @platform.setter + def platform(self, value): """ Sets the HPC platforms to be used by the job. @@ -139,7 +147,8 @@ class Job: """ self._platform = value - def get_queue(self): + @property + def queue(self): """ Returns the queue to be used by the job. Chooses between serial and parallel platforms @@ -153,7 +162,8 @@ class Job: else: return self._platform.serial_platform.serial_queue - def set_queue(self, value): + @queue.setter + def queue(self, value): """ Sets the queue to be used by the job. @@ -172,6 +182,13 @@ class Job: """ return self._children + @children.setter + def children(self, children): + """ + Sets the children job list + """ + self._children = children + @property def long_name(self): """ @@ -195,6 +212,23 @@ class Job: """ self._long_name = value + @property + def local_logs(self): + return self._local_logs + + @local_logs.setter + def local_logs(self, value): + self._local_logs = value + self._remote_logs = value + + @property + def remote_logs(self): + return self._remote_logs + + @remote_logs.setter + def remote_logs(self, value): + self._remote_logs = value + def log_job(self): """ Prints job information in log @@ -333,10 +367,10 @@ class Job: :return: list of values in column index position :rtype: list[datetime.datetime] """ - logname = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') + log_name = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') lst = [] - if os.path.exists(logname): - f = open(logname) + if os.path.exists(log_name): + f = open(log_name) lines = f.readlines() for line in lines: fields = line.split() @@ -401,7 +435,7 @@ class Job: if new_status == Status.COMPLETED: Log.debug("This job seems to have completed...checking") - self.get_platform().get_completed_files(self.name) + 
self.platform.get_completed_files(self.name) self.check_completion() else: self.status = new_status @@ -415,7 +449,7 @@ class Job: Log.user_warning("Job {0} is FAILED", self.name) elif self.status is Status.UNKNOWN: Log.debug("Job {0} in UNKNOWN status. Checking completed files", self.name) - self.get_platform().get_completed_files(self.name) + self.platform.get_completed_files(self.name) self.check_completion(Status.UNKNOWN) if self.status is Status.UNKNOWN: Log.warning('Job {0} in UNKNOWN status', self.name) @@ -430,8 +464,10 @@ class Job: self.write_start_time() if self.status in [Status.COMPLETED, Status.FAILED, Status.UNKNOWN]: self.write_end_time(self.status == Status.COMPLETED) + if self.local_logs != self.remote_logs: + self.synchronize_logs() # unifying names for log files if copy_remote_logs: - self.get_platform().get_logs_files(self.expid, self.out_filename, self.err_filename) + self.platform.get_logs_files(self.expid, self.remote_logs) return self.status def check_completion(self, default_status=Status.FAILED): @@ -441,8 +477,8 @@ class Job: :param default_status: status to set if job is not completed. 
By default is FAILED :type default_status: Status """ - logname = os.path.join(self._tmp_path, self.name + '_COMPLETED') - if os.path.exists(logname): + log_name = os.path.join(self._tmp_path, self.name + '_COMPLETED') + if os.path.exists(log_name): self.status = Status.COMPLETED else: Log.warning("Job {0} seemed to be completed but there is no COMPLETED file", self.name) @@ -516,7 +552,7 @@ class Job: else: parameters['Chunk_LAST'] = 'FALSE' - job_platform = self.get_platform() + job_platform = self.platform self.processors = as_conf.get_processors(self.section) self.threads = as_conf.get_threads(self.section) self.tasks = as_conf.get_tasks(self.section) @@ -539,7 +575,7 @@ class Job: parameters['CURRENT_ARCH'] = job_platform.name parameters['CURRENT_HOST'] = job_platform.host - parameters['CURRENT_QUEUE'] = self.get_queue() + parameters['CURRENT_QUEUE'] = self.queue parameters['CURRENT_USER'] = job_platform.user parameters['CURRENT_PROJ'] = job_platform.project parameters['CURRENT_BUDG'] = job_platform.budget @@ -610,7 +646,7 @@ class Job: snippet.as_tailer()]) def _get_paramiko_template(self, snippet, template): - current_platform = self.get_platform() + current_platform = self.platform return ''.join([snippet.as_header(current_platform.get_header(self)), template, snippet.as_tailer()]) @@ -684,7 +720,7 @@ class Job: :return: True if succesful, False otherwise :rtype: bool """ - if self.get_platform().get_stat_file(self.name, retries=5): + if self.platform.get_stat_file(self.name, retries=5): start_time = self.check_start_time() else: Log.warning('Could not get start time for {0}. 
Using current time as an aproximation', self.name) @@ -703,7 +739,7 @@ class Job: :param completed: True if job was completed succesfuly, False otherwise :type completed: bool """ - self.get_platform().get_stat_file(self.name, retries=5) + self.platform.get_stat_file(self.name, retries=5) end_time = self.check_end_time() path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') f = open(path, 'a') @@ -777,3 +813,8 @@ class Job: if self.is_ancestor(parent): parent.children.remove(self) self.parents.remove(parent) + + def synchronize_logs(self): + self.platform.move_file(self.remote_logs[0], self.local_logs[0]) # .out + self.platform.move_file(self.remote_logs[1], self.local_logs[1]) # .err + self.remote_logs = self.local_logs diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index dd4b7e083c6366ee455a4f4c15d04d067f835b1b..48515eac74fb3787528866053b4cbd477679dc83 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -32,6 +32,8 @@ from shutil import move from autosubmit.job.job_common import Status, Type from autosubmit.job.job import Job +from autosubmit.job.job_package import JobPackageSimple +from autosubmit.job.job_package import JobPackageArray from autosubmit.config.log import Log from autosubmit.date.chunk_date_lib import date2str, parse_date @@ -295,7 +297,7 @@ class JobList: :return: completed jobs :rtype: list """ - return [job for job in self._job_list if (platform is None or job.get_platform() is platform) and + return [job for job in self._job_list if (platform is None or job.platform is platform) and job.status == Status.COMPLETED] def get_submitted(self, platform=None): @@ -307,7 +309,7 @@ class JobList: :return: submitted jobs :rtype: list """ - return [job for job in self._job_list if (platform is None or job.get_platform() is platform) and + return [job for job in self._job_list if (platform is None or job.platform is platform) and job.status == Status.SUBMITTED] def get_running(self, platform=None): @@ 
-319,7 +321,7 @@ class JobList: :return: running jobs :rtype: list """ - return [job for job in self._job_list if (platform is None or job.get_platform() is platform) and + return [job for job in self._job_list if (platform is None or job.platform is platform) and job.status == Status.RUNNING] def get_queuing(self, platform=None): @@ -331,7 +333,7 @@ class JobList: :return: queuedjobs :rtype: list """ - return [job for job in self._job_list if (platform is None or job.get_platform() is platform) and + return [job for job in self._job_list if (platform is None or job.platform is platform) and job.status == Status.QUEUING] def get_failed(self, platform=None): @@ -343,7 +345,7 @@ class JobList: :return: failed jobs :rtype: list """ - return [job for job in self._job_list if (platform is None or job.get_platform() is platform) and + return [job for job in self._job_list if (platform is None or job.platform is platform) and job.status == Status.FAILED] def get_ready(self, platform=None): @@ -355,7 +357,7 @@ class JobList: :return: ready jobs :rtype: list """ - return [job for job in self._job_list if (platform is None or job.get_platform() is platform) and + return [job for job in self._job_list if (platform is None or job.platform is platform) and job.status == Status.READY] def get_waiting(self, platform=None): @@ -367,7 +369,7 @@ class JobList: :return: waiting jobs :rtype: list """ - return [job for job in self._job_list if (platform is None or job.get_platform() is platform) and + return [job for job in self._job_list if (platform is None or job.platform is platform) and job.status == Status.WAITING] def get_unknown(self, platform=None): @@ -379,7 +381,7 @@ class JobList: :return: unknown state jobs :rtype: list """ - return [job for job in self._job_list if (platform is None or job.get_platform() is platform) and + return [job for job in self._job_list if (platform is None or job.platform is platform) and job.status == Status.UNKNOWN] def get_in_queue(self, 
platform=None): @@ -744,6 +746,54 @@ class JobList: self.update_genealogy() del self._dic_jobs + def get_ready_packages(self, platform): + # Check there are ready jobs + jobs_available = self.get_ready(platform) + if len(jobs_available) == 0: + return list() + Log.info("\nJobs ready for {1}: {0}", len(jobs_available), platform.name) + # Checking available submission slots + max_waiting_jobs = platform.max_waiting_jobs + waiting_jobs = len(self.get_submitted(platform) + self.get_queuing(platform)) + max_wait_jobs_to_submit = max_waiting_jobs - waiting_jobs + max_jobs_to_submit = platform.total_jobs - len(self.get_in_queue(platform)) + # Logging obtained data + Log.debug("Number of jobs ready: {0}", len(jobs_available)) + Log.debug("Number of jobs available: {0}", max_wait_jobs_to_submit) + Log.info("Jobs to submit: {0}", min(max_wait_jobs_to_submit, len(jobs_available))) + # If can submit jobs + if max_wait_jobs_to_submit > 0 and max_jobs_to_submit > 0: + available_sorted = sorted(jobs_available, key=lambda k: k.long_name.split('_')[1][:6]) + list_of_available = sorted(available_sorted, key=lambda k: k.priority, reverse=True) + num_jobs_to_submit = min(max_wait_jobs_to_submit, len(jobs_available), max_jobs_to_submit) + jobs_to_submit = list_of_available[0:num_jobs_to_submit] + jobs_to_submit_by_section = self.divide_list_by_section(jobs_to_submit) + packages_to_submit = list() + if platform.allow_arrays: + for section_list in jobs_to_submit_by_section.values(): + packages_to_submit.append(JobPackageArray(section_list)) + return packages_to_submit + for job in jobs_to_submit: + packages_to_submit.append(JobPackageSimple([job])) + return packages_to_submit + return list() # no packages to submit + + @staticmethod + def divide_list_by_section(jobs_list): + """ + Returns a dict() with as many keys as 'jobs_list' different sections. + The value for each key is a list() with all the jobs with the key section. 
+ + :param jobs_list: list of jobs to be divided + :rtype: dict + """ + by_section = dict() + for job in jobs_list: + if job.section not in by_section: + by_section[job.section] = list() + by_section[job.section].append(job) + return by_section + class DicJobs: """ @@ -986,6 +1036,8 @@ class DicJobs: name += "_" + section if name in jobs_data: job = Job(name, jobs_data[name][1], jobs_data[name][2], priority) + job.local_logs = (jobs_data[name][8], jobs_data[name][9]) + job.remote_logs = (jobs_data[name][10], jobs_data[name][11]) else: job = Job(name, 0, Status.WAITING, priority) job.section = section @@ -1010,7 +1062,7 @@ class DicJobs: if job.platform_name is not None: job.platform_name = job.platform_name job.file = self.get_option(section, "FILE", None) - job.set_queue(self.get_option(section, "QUEUE", None)) + job.queue = self.get_option(section, "QUEUE", None) if self.get_option(section, "CHECK", 'True').lower() == 'true': job.check = True else: diff --git a/autosubmit/job/job_list_persistence.py b/autosubmit/job/job_list_persistence.py index a718325f6ec4329b2c392c4a42a562793ff0366f..23008451c67b58dd72b5a9fe563f26b1aa30319f 100644 --- a/autosubmit/job/job_list_persistence.py +++ b/autosubmit/job/job_list_persistence.py @@ -86,8 +86,11 @@ class JobListPersistencePkl(JobListPersistence): fd = open(path, 'w') setrecursionlimit(50000) Log.debug("Saving JobList: " + path) - jobs_data = [(job.name, job.id, job.status, job.priority, job.section, job.date, job.member, job.chunk) for job - in job_list] + jobs_data = [(job.name, job.id, job.status, + job.priority, job.section, job.date, + job.member, job.chunk, + job.local_logs[0], job.local_logs[1], + job.remote_logs[0], job.remote_logs[1]) for job in job_list] pickle.dump(jobs_data, fd) Log.debug('Job list saved') @@ -98,9 +101,12 @@ class JobListPersistenceDb(JobListPersistence): """ - VERSION = 1 + VERSION = 3 JOB_LIST_TABLE = 'job_list' - TABLE_FIELDS = ['name', 'id', 'status', 'priority', 'section', 'date', 
'member', 'chunk'] + TABLE_FIELDS = ['name', 'id', 'status', 'priority', + 'section', 'date', 'member', 'chunk', + 'local_out', 'local_err', + 'remote_out', 'remote_err'] def __init__(self, persistence_path, persistence_file): self.db_manager = DbManager(persistence_path, persistence_file, self.VERSION) @@ -123,8 +129,11 @@ """ self._reset_table() - jobs_data = [(job.name, job.id, job.status, job.priority, job.section, job.date, job.member, job.chunk) for job - in job_list] + jobs_data = [(job.name, job.id, job.status, + job.priority, job.section, job.date, + job.member, job.chunk, + job.local_logs[0], job.local_logs[1], + job.remote_logs[0], job.remote_logs[1]) for job in job_list] self.db_manager.insertMany(self.JOB_LIST_TABLE, jobs_data) def _reset_table(self): diff --git a/autosubmit/job/job_package.py b/autosubmit/job/job_package.py new file mode 100644 index 0000000000000000000000000000000000000000..58c7d84a9e917bcc9c9d4d52cd42dccebc327ab4 --- /dev/null +++ b/autosubmit/job/job_package.py @@ -0,0 +1,179 @@ +#!/usr/bin/env python + +# Copyright 2016 Earth Sciences Department, BSC-CNS + +# This file is part of Autosubmit. + +# Autosubmit is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# Autosubmit is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with Autosubmit. If not, see <http://www.gnu.org/licenses/>. 
+try: + # noinspection PyCompatibility + from configparser import SafeConfigParser +except ImportError: + # noinspection PyCompatibility + from ConfigParser import SafeConfigParser + +import time +import os +from autosubmit.job.job_common import Status +from autosubmit.config.log import Log + + +class JobPackageBase(object): + """ + Class to manage the package of jobs to be submitted by autosubmit + """ + + def __init__(self, jobs): + self._jobs = jobs + try: + self._tmp_path = jobs[0]._tmp_path + self._platform = jobs[0].platform + for job in jobs: + if job.platform != self._platform or job.platform is None: + raise Exception('Only one valid platform per package') + except IndexError: + raise Exception('No jobs given') + + def __len__(self): + return self._jobs.__len__() + + @property + def jobs(self): + """ + Returns the jobs + + :return: jobs + :rtype: List[Job] + """ + return self._jobs + + @property + def platform(self): + """ + Returns the platform + + :return: platform + :rtype: Platform + """ + return self._platform + + def submit(self, configuration, parameters): + for job in self.jobs: + job.update_parameters(configuration, parameters) + self._create_scripts(configuration) + self._send_files() + self._do_submission() + + def _create_scripts(self, configuration): + raise Exception('Not implemented') + + def _send_files(self): + raise Exception('Not implemented') + + def _do_submission(self): + raise Exception('Not implemented') + + +class JobPackageSimple(JobPackageBase): + """ + Class to manage the package of jobs to be submitted by autosubmit + """ + + def __init__(self, jobs): + self._job_scripts = {} + super(JobPackageSimple, self).__init__(jobs) + + def _create_scripts(self, configuration): + for job in self.jobs: + self._job_scripts[job.name] = job.create_script(configuration) + + def _send_files(self): + for job in self.jobs: + self.platform.send_file(self._job_scripts[job.name]) + + def _do_submission(self): + for job in self.jobs: + 
self.platform.remove_stat_file(job.name) + self.platform.remove_completed_file(job.name) + job.id = self.platform.submit_job(job, self._job_scripts[job.name]) + if job.id is None: + continue + Log.info("{0} submitted", job.name) + job.status = Status.SUBMITTED + job.write_submit_time() + + +class JobPackageArray(JobPackageBase): + """ + Class to manage the package of jobs to be submitted by autosubmit + """ + + def __init__(self, jobs): + self._job_inputs = {} + self._job_scripts = {} + self._common_script = None + self._array_size_id = "[1-" + str(len(jobs)) + "]" + self._wallclock = '00:00' + self._num_processors = 1 + for job in jobs: + if job.wallclock > self._wallclock: + self._wallclock = job.wallclock + if job.processors > self._num_processors: + self._num_processors = job.processors + super(JobPackageArray, self).__init__(jobs) + + def _create_scripts(self, configuration): + timestamp = str(int(time.time())) + for i in range(1, len(self.jobs) + 1): + self._job_scripts[self.jobs[i - 1].name] = self.jobs[i - 1].create_script(configuration) + self._job_inputs[self.jobs[i - 1].name] = self._create_i_input(timestamp, i) + self.jobs[i - 1].remote_logs = (timestamp + ".{0}.out".format(i), timestamp + ".{0}.err".format(i)) + self._common_script = self._create_common_script(timestamp) + + def _create_i_input(self, filename, index): + filename += '.{0}'.format(index) + input_content = self._job_scripts[self.jobs[index - 1].name] + open(os.path.join(self._tmp_path, filename), 'w').write(input_content) + os.chmod(os.path.join(self._tmp_path, filename), 0o775) + return filename + + def _create_common_script(self, filename): + script_content = self.platform.header.array_header(filename, self._array_size_id, self._wallclock, + self._num_processors) + filename += '.cmd' + open(os.path.join(self._tmp_path, filename), 'w').write(script_content) + os.chmod(os.path.join(self._tmp_path, filename), 0o775) + return filename + + def _send_files(self): + for job in self.jobs: + 
self.platform.send_file(self._job_scripts[job.name]) + self.platform.send_file(self._job_inputs[job.name]) + self.platform.send_file(self._common_script) + + def _do_submission(self): + for job in self.jobs: + self.platform.remove_stat_file(job.name) + self.platform.remove_completed_file(job.name) + + package_id = self.platform.submit_job(None, self._common_script) + + if package_id is None: + raise Exception('Submission failed') + + for i in range(1, len(self.jobs) + 1): + Log.info("{0} submitted", self.jobs[i - 1].name) + self.jobs[i - 1].id = str(package_id) + '[{0}]'.format(i) + self.jobs[i - 1].status = Status.SUBMITTED + self.jobs[i - 1].write_submit_time() diff --git a/autosubmit/platforms/locplatform.py b/autosubmit/platforms/locplatform.py index ff69d2bf2d66a9342666f7ea530a3d3d2c2d1cee..b4bca241253a376c2313bfd1f0c2e57802259a2f 100644 --- a/autosubmit/platforms/locplatform.py +++ b/autosubmit/platforms/locplatform.py @@ -139,17 +139,15 @@ class LocalPlatform(ParamikoPlatform): def get_ssh_output(self): return self._ssh_output - def get_logs_files(self, exp_id, job_out_filename, job_err_filename): + def get_logs_files(self, exp_id, remote_logs): """ Overriding the parent's implementation. Do nothing because the log files are already in the local platform (redundancy). 
:param exp_id: experiment id :type exp_id: str - :param job_out_filename: name of the out file - :type job_out_filename: str - :param job_err_filename: name of the err file - :type job_err_filename: str + :param remote_logs: names of the log files + :type remote_logs: (str, str) """ return diff --git a/autosubmit/platforms/lsfplatform.py b/autosubmit/platforms/lsfplatform.py index aed518c08a831322f7f435b23b5364da305e282f..a3751cd922ec543026700987dae7d2afea951754 100644 --- a/autosubmit/platforms/lsfplatform.py +++ b/autosubmit/platforms/lsfplatform.py @@ -37,6 +37,7 @@ class LsfPlatform(ParamikoPlatform): self.job_status['RUNNING'] = ['RUN'] self.job_status['QUEUING'] = ['PEND', 'FW_PEND'] self.job_status['FAILED'] = ['SSUSP', 'USUSP', 'EXIT'] + self._allow_arrays = True self.update_cmds() def update_cmds(self): @@ -119,11 +120,32 @@ class LsfHeader: # noinspection PyMethodMayBeStatic def get_exclusivity(self, job): - if job.get_platform().exclusivity == 'true': + if job.platform.exclusivity == 'true': return "#BSUB -x" else: return "" + @classmethod + def array_header(cls, filename, array_id, wallclock, num_processors): + return textwrap.dedent("""\ + ############################################################################### + # {0} + ############################################################################### + # + # + #BSUB -J {0}{1} + #BSUB -oo {0}.%I.out + #BSUB -eo {0}.%I.err + #BSUB -W {2} + #BSUB -n {3} + # + ############################################################################### + + SCRIPT=$(cat {0}.$LSB_JOBINDEX | awk 'NR==1') + chmod +x $SCRIPT + ./$SCRIPT + """.format(filename, array_id, wallclock, num_processors)) + SERIAL = textwrap.dedent("""\ ############################################################################### # %TASKTYPE% %EXPID% EXPERIMENT diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 
d22f9c1b58f8f43ce82bf4a332958292b5582e00..6f7f86cb97ad52f2bdd1af748089cc45a6561148 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -136,7 +136,7 @@ class ParamikoPlatform(Platform): :param filename: file name :type filename: str - :return: True if succesful or file does no exists + :return: True if successful or file does no exists :rtype: bool """ if self._ssh is None: @@ -152,6 +152,28 @@ class ParamikoPlatform(Platform): Log.debug('Could not remove file {0}'.format(os.path.join(self.get_files_path(), filename))) return False + def move_file(self, src, dest): + """ + Moves a file on the platform + :param src: source name + :type src: str + :param dest: destination name + :type dest: str + """ + if self._ssh is None: + if not self.connect(): + return None + + try: + ftp = self._ssh.open_sftp() + ftp.rename(os.path.join(self.get_files_path(), src), os.path.join(self.get_files_path(), dest)) + ftp.close() + return True + except BaseException: + Log.debug('Could not move (rename) file {0} to {1}'.format(os.path.join(self.get_files_path(), src), + os.path.join(self.get_files_path(), dest))) + return False + def submit_job(self, job, script_name): """ Submit a job from a given job object. @@ -184,9 +206,9 @@ class ParamikoPlatform(Platform): """ job_status = Status.UNKNOWN - if type(job_id) is not int: + if type(job_id) is not int and type(job_id) is not str: # URi: logger - Log.error('check_job() The job id ({0}) is not an integer.', job_id) + Log.error('check_job() The job id ({0}) is not an integer neither a string.', job_id) # URi: value ? 
return job_status @@ -318,8 +340,8 @@ class ParamikoPlatform(Platform): executable = 'Rscript' return 'nohup ' + executable + ' {0} > {1} 2> {2} & echo $!'.format( os.path.join(self.remote_log_dir, job_script), - os.path.join(self.remote_log_dir, job.out_filename), - os.path.join(self.remote_log_dir, job.err_filename) + os.path.join(self.remote_log_dir, job.remote_logs[0]), + os.path.join(self.remote_log_dir, job.remote_logs[1]) ) @staticmethod @@ -359,10 +381,11 @@ class ParamikoPlatform(Platform): header = self.header.SERIAL str_datetime = date2str(datetime.datetime.now(), 'S') - job.out_filename = "{0}.{1}.out".format(job.name, str_datetime) - job.err_filename = "{0}.{1}.err".format(job.name, str_datetime) - header = header.replace('%OUT_LOG_DIRECTIVE%', job.out_filename) - header = header.replace('%ERR_LOG_DIRECTIVE%', job.err_filename) + out_filename = "{0}.{1}.out".format(job.name, str_datetime) + err_filename = "{0}.{1}.err".format(job.name, str_datetime) + job.local_logs = (out_filename, err_filename) + header = header.replace('%OUT_LOG_DIRECTIVE%', out_filename) + header = header.replace('%ERR_LOG_DIRECTIVE%', err_filename) if hasattr(self.header, 'get_queue_directive'): header = header.replace('%QUEUE_DIRECTIVE%', self.header.get_queue_directive(job)) diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 9ea31f8ceb210d21ac2e2db60df9f6bda34e76af..ea3290cc90d4c345a06c5ea2aff077e833025645 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -6,7 +6,7 @@ from autosubmit.config.log import Log from autosubmit.job.job_common import Status -class Platform: +class Platform(object): """ Class to manage the connections to the different platforms. 
""" @@ -24,6 +24,7 @@ class Platform: self.tmp_path = os.path.join(self.config.LOCAL_ROOT_DIR, self.expid, self.config.LOCAL_TMP_DIR) self._serial_platform = None self._serial_queue = None + self._default_queue = None self.processors_per_node = None self.scratch_free_space = None self.host = '' @@ -38,6 +39,7 @@ class Platform: self.service = None self.scheduler = None self.directory = None + self._allow_arrays = False @property def serial_platform(self): @@ -84,6 +86,10 @@ class Platform: def serial_queue(self, value): self._serial_queue = value + @property + def allow_arrays(self): + return self._allow_arrays is True + def add_parameters(self, parameters, main_hpc=False): """ Add parameters for the current platform to the given parameters list @@ -120,6 +126,16 @@ class Platform: """ raise NotImplementedError + def move_file(self, src, dest): + """ + Moves a file on the platform + :param src: source name + :type src: str + :param dest: destination name + :type dest: str + """ + raise NotImplementedError + def get_file(self, filename, must_exist=True, relative_path=''): """ Copies a file from the current platform to experiment's tmp folder @@ -162,17 +178,16 @@ class Platform: """ raise NotImplementedError - def get_logs_files(self, exp_id, job_out_filename, job_err_filename): + def get_logs_files(self, exp_id, remote_logs): """ Get the given LOGS files - + :param exp_id: experiment id :type exp_id: str - :param job_out_filename: name of the out file - :type job_out_filename: str - :param job_err_filename: name of the err file - :type job_err_filename: str + :param remote_logs: names of the log files + :type remote_logs: (str, str) """ + (job_out_filename, job_err_filename) = remote_logs self.get_files([job_out_filename, job_err_filename], False, 'LOG_{0}'.format(exp_id)) def get_completed_files(self, job_name, retries=5): diff --git a/autosubmit/platforms/saga_platform.py b/autosubmit/platforms/saga_platform.py index 
85b20b4d4b965aa842388c0b4005fbcbfd64322b..eeec8f6ec924bcef2ba3cf2871c3bbf23a478c85 100644 --- a/autosubmit/platforms/saga_platform.py +++ b/autosubmit/platforms/saga_platform.py @@ -230,10 +230,11 @@ class SagaPlatform(Platform): jd.working_directory = self.get_files_path() str_datetime = date2str(datetime.datetime.now(), 'S') - job.out_filename = "{0}.{1}.out".format(job.name, str_datetime) - job.err_filename = "{0}.{1}.err".format(job.name, str_datetime) - jd.output = job.out_filename - jd.error = job.err_filename + out_filename = "{0}.{1}.out".format(job.name, str_datetime) + err_filename = "{0}.{1}.err".format(job.name, str_datetime) + job.local_logs = (out_filename, err_filename) + jd.output = out_filename + jd.error = err_filename self.add_attribute(jd, 'Name', job.name) diff --git a/docs/source/conf.py b/docs/source/conf.py index 99b48afd8dc33be21fe3f072812a1faaa8edcb58..4d08fc1e4d3937f7e4000988268490524f760529 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -62,9 +62,9 @@ author = u'Earth Science Department, Barcelona Supercomputing Center, BSC' # built documents. # # The short X.Y version. -version = '3.7' +version = '3.8' # The full version, including alpha/beta/rc tags. -release = '3.7.6' +release = '3.8.0' # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. 
diff --git a/test/regression/test_mn_with_paramiko/conf/jobs.conf b/test/regression/test_mn_with_paramiko/conf/jobs.conf index 0f96a82930c74cf8bc7ed0d906a1bfd957ac47ba..d9d2a14ef83e5660b1b9501568f51bff03ee45de 100644 --- a/test/regression/test_mn_with_paramiko/conf/jobs.conf +++ b/test/regression/test_mn_with_paramiko/conf/jobs.conf @@ -26,4 +26,5 @@ RUNNING = chunk WALLCLOCK = 00:10 TASKS = 16 PROCESSORS = 32 -SCRATCH_FREE_SPACE = 15 \ No newline at end of file +SCRATCH_FREE_SPACE = 15 +DEPENDENCIES = WITH_FREE_SCRATCH-1 \ No newline at end of file diff --git a/test/regression/tests_commands.py b/test/regression/tests_commands.py index 36c427be70e728d7db5d47d8873ea07d28152502..e79611d54dd76ca476e27ea9f1dc434c774cb750 100644 --- a/test/regression/tests_commands.py +++ b/test/regression/tests_commands.py @@ -6,11 +6,31 @@ def generate_experiment_cmd(hpc, description): def create_experiment_cmd(experiment_id): - return 'autosubmit -lf EVERYTHING -lc EVERYTHING create ' + experiment_id + ' -np' + return 'autosubmit -lf EVERYTHING -lc EVERYTHING create {0} --hide'.format(experiment_id) def run_experiment_cmd(experiment_id): - return 'autosubmit -lf EVERYTHING -lc EVERYTHING run ' + experiment_id + return 'autosubmit -lf EVERYTHING -lc EVERYTHING run {0}'.format(experiment_id) + + +def monitor_experiment_cmd(experiment_id): + return 'autosubmit -lf EVERYTHING -lc EVERYTHING monitor {0} --hide'.format(experiment_id) + + +def refresh_experiment_cmd(experiment_id): + return 'autosubmit -lf EVERYTHING -lc EVERYTHING refresh {0}'.format(experiment_id) + + +def recovery_experiment_cmd(experiment_id): + return 'autosubmit -lf EVERYTHING -lc EVERYTHING recovery {0} -all --hide -s'.format(experiment_id) + + +def check_experiment_cmd(experiment_id): + return 'autosubmit -lf EVERYTHING -lc EVERYTHING check {0}'.format(experiment_id) + + +def stats_experiment_cmd(experiment_id): + return 'autosubmit -lf EVERYTHING -lc EVERYTHING stats {0} --hide'.format(experiment_id) def 
create_database_cmd(): diff --git a/test/regression/tests_runner.py b/test/regression/tests_runner.py index 4602192daa004cb5de178b625b9c76e081933a6f..acdc3c0abde3584ecae698028036b9efbf8c08a6 100644 --- a/test/regression/tests_runner.py +++ b/test/regression/tests_runner.py @@ -34,10 +34,34 @@ def run_test_case(experiment_id, name, hpc_arch, description, src_path, retrials Log.warning('Error while creating the experiment {0}({1})', name, experiment_id) continue + if not check_cmd(check_experiment_cmd(experiment_id)): + Log.warning('Error while checking the experiment {0}({1})', name, experiment_id) + continue + + if not check_cmd(monitor_experiment_cmd(experiment_id)): + Log.warning('Error while monitoring the experiment {0}({1})', name, experiment_id) + continue + + if not check_cmd(refresh_experiment_cmd(experiment_id)): + Log.warning('Error while refreshing the experiment {0}({1})', name, experiment_id) + continue + if not check_cmd(run_experiment_cmd(experiment_id)): Log.warning('Error while running the experiment {0}({1})', name, experiment_id) continue + if not check_cmd(monitor_experiment_cmd(experiment_id)): + Log.warning('Error while monitoring the experiment {0}({1})', name, experiment_id) + continue + + if not check_cmd(stats_experiment_cmd(experiment_id)): + Log.warning('Error while getting stats of the experiment {0}({1})', name, experiment_id) + continue + + if not check_cmd(recovery_experiment_cmd(experiment_id)): + Log.warning('Error while recovering the experiment {0}({1})', name, experiment_id) + continue + run_ok = True break if run_ok: diff --git a/test/run_unit_suite.py b/test/run_unit_suite.py new file mode 100644 index 0000000000000000000000000000000000000000..019e593d1f0d988d7d285e7bbbe6380cb0f80857 --- /dev/null +++ b/test/run_unit_suite.py @@ -0,0 +1,2 @@ +import os +os.system("nosetests --with-coverage --cover-erase --cover-package=autosubmit --cover-html test/unit") diff --git a/test/unit/test_basic_ config.py 
b/test/unit/test_basic_config.py similarity index 100% rename from test/unit/test_basic_ config.py rename to test/unit/test_basic_config.py diff --git a/test/unit/test_dic_jobs.py b/test/unit/test_dic_jobs.py index a9b749c2c8128e615486fb01f73c468c3bbadc77..7321a273d7574dab542cbcc91e7f4d65dc53d1a5 100644 --- a/test/unit/test_dic_jobs.py +++ b/test/unit/test_dic_jobs.py @@ -318,7 +318,7 @@ class TestDicJobs(TestCase): self.assertEquals(Type.BASH, created_job.type) self.assertEquals(platform_name, created_job.platform_name) self.assertEquals(filename, created_job.file) - self.assertEquals(queue, created_job._queue) + self.assertEquals(queue, created_job.queue) self.assertTrue(created_job.check) self.assertEquals(processors, created_job.processors) self.assertEquals(threads, created_job.threads) diff --git a/test/unit/test_job.py b/test/unit/test_job.py index a20b1b0c391c4522483a9725ac2a76cfe6731e49..25c0aa6f407c1884b2fcee7eb54c0cc914c9bcd8 100644 --- a/test/unit/test_job.py +++ b/test/unit/test_job.py @@ -25,6 +25,7 @@ class TestJob(TestCase): self.job_priority = 0 self.job = Job(self.job_name, self.job_id, Status.WAITING, self.job_priority) + self.job.processors = 2 def test_when_the_job_has_more_than_one_processor_returns_the_parallel_platform(self): platform = Platform(self.experiment_id, 'parallel-platform', FakeBasicConfig) @@ -33,7 +34,7 @@ class TestJob(TestCase): self.job._platform = platform self.job.processors = 999 - returned_platform = self.job.get_platform() + returned_platform = self.job.platform self.assertEquals(platform, returned_platform) @@ -44,7 +45,7 @@ class TestJob(TestCase): self.job._platform = platform self.job.processors = 1 - returned_platform = self.job.get_platform() + returned_platform = self.job.platform self.assertEquals('serial-platform', returned_platform) @@ -52,15 +53,15 @@ class TestJob(TestCase): dummy_platform = Platform('whatever', 'rand-name', FakeBasicConfig) self.assertNotEquals(dummy_platform, self.job._platform) - 
self.job.set_platform(dummy_platform) + self.job.platform = dummy_platform - self.assertEquals(dummy_platform, self.job._platform) + self.assertEquals(dummy_platform, self.job.platform) def test_when_the_job_has_a_queue_returns_that_queue(self): dummy_queue = 'whatever' self.job._queue = dummy_queue - returned_queue = self.job.get_queue() + returned_queue = self.job.queue self.assertEquals(dummy_queue, returned_queue) @@ -68,11 +69,11 @@ class TestJob(TestCase): dummy_queue = 'whatever-parallel' dummy_platform = Platform('whatever', 'rand-name', FakeBasicConfig) dummy_platform.queue = dummy_queue - self.job.set_platform(dummy_platform) + self.job.platform = dummy_platform self.assertIsNone(self.job._queue) - returned_queue = self.job.get_queue() + returned_queue = self.job.queue self.assertIsNotNone(returned_queue) self.assertEquals(dummy_queue, returned_queue) @@ -88,12 +89,12 @@ class TestJob(TestCase): dummy_platform.serial_platform = dummy_serial_platform dummy_platform.queue = parallel_queue - self.job.set_platform(dummy_platform) + self.job.platform = dummy_platform self.job.processors = 1 self.assertIsNone(self.job._queue) - returned_queue = self.job.get_queue() + returned_queue = self.job.queue self.assertIsNotNone(returned_queue) self.assertEquals(serial_queue, returned_queue) @@ -103,9 +104,9 @@ class TestJob(TestCase): dummy_queue = 'whatever' self.assertNotEquals(dummy_queue, self.job._queue) - self.job.set_queue(dummy_queue) + self.job.queue = dummy_queue - self.assertEquals(dummy_queue, self.job._queue) + self.assertEquals(dummy_queue, self.job.queue) def test_that_the_increment_fails_count_only_adds_one(self): initial_fail_count = self.job.fail_count diff --git a/test/unit/test_job_package.py b/test/unit/test_job_package.py new file mode 100644 index 0000000000000000000000000000000000000000..6e530ae3109f1a96c689447fecdeb98d53c5f92a --- /dev/null +++ b/test/unit/test_job_package.py @@ -0,0 +1,60 @@ +from unittest import TestCase + +import os +from 
mock import Mock +from mock import patch + +from autosubmit.job.job_package import JobPackageSimple +from autosubmit.job.job import Job +from autosubmit.job.job_common import Status + + +class TestJobPackage(TestCase): + + def setUp(self): + self.platform = Mock() + self.jobs = [Job('dummy1', 0, Status.READY, 0), + Job('dummy2', 0, Status.READY, 0)] + self.jobs[0].platform = self.jobs[1].platform = self.platform + self.job_package = JobPackageSimple(self.jobs) + + def test_job_package_default_init(self): + with self.assertRaises(Exception): + JobPackageSimple([]) + + def test_job_package_different_platforms_init(self): + self.jobs[0].platform = Mock() + self.jobs[1].platform = Mock() + with self.assertRaises(Exception): + JobPackageSimple(self.jobs) + + def test_job_package_none_platforms_init(self): + self.jobs[0].platform = None + self.jobs[1].platform = None + with self.assertRaises(Exception): + JobPackageSimple(self.jobs) + + def test_job_package_length(self): + self.assertEquals(2, len(self.job_package)) + + def test_job_package_jobs_getter(self): + self.assertEquals(self.jobs, self.job_package.jobs) + + def test_job_package_platform_getter(self): + self.assertEquals(self.platform.serial_platform, self.job_package.platform) + + def test_job_package_submission(self): + # arrange + self.job_package._create_scripts = Mock() + self.job_package._send_files = Mock() + self.job_package._do_submission = Mock() + for job in self.jobs: + job.update_parameters = Mock() + # act + self.job_package.submit('fake-config', 'fake-params') + # assert + for job in self.jobs: + job.update_parameters.assert_called_once_with('fake-config', 'fake-params') + self.job_package._create_scripts.assert_called_once_with() + self.job_package._send_files.assert_called_once_with() + self.job_package._do_submission.assert_called_once_with()