diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 8f22a65c486b5c213e87a989120ce20ed54f4f43..a8ecceedf98e099d25d0ebe9c982472d0fe9ca80 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -627,7 +627,7 @@ class Autosubmit: "User or owner does not exists", 7012, e.message) @staticmethod - def _delete_expid(expid_delete, force): + def _delete_expid(expid_delete, force=False): """ Removes an experiment from path and database If current user is eadmin and -f has been sent, it deletes regardless @@ -1134,6 +1134,7 @@ class Autosubmit: :return: Nothing\n :rtype: \n """ + job_list._job_list = jobs_filtered # Current choice is Paramiko Submitter submitter = Autosubmit._get_submitter(as_conf) @@ -1526,8 +1527,7 @@ class Autosubmit: save = True if platform.type == "slurm" and list_jobid != "": - slurm.append( - [platform, list_jobid, list_prevStatus, completed_joblist]) + slurm.append([platform, list_jobid, list_prevStatus, completed_joblist]) # END Normal jobs + wrappers # CHECK ALL JOBS at once if they're from slurm ( wrappers non contempled) for platform_jobs in slurm: @@ -1550,7 +1550,7 @@ class Autosubmit: as_conf.get_mails_to()) save = True # End Check Current jobs - save2 = job_list.update_list(as_conf) + save2 = job_list.update_list(as_conf,submitter=submitter) if save or save2: job_list.save() if len(job_list.get_ready()) > 0: @@ -1559,7 +1559,7 @@ class Autosubmit: if as_conf.get_remote_dependencies() and len(job_list.get_prepared()) > 0: Autosubmit.submit_ready_jobs( as_conf, job_list, platforms_to_test, packages_persistence, hold=True) - save = job_list.update_list(as_conf) + save = job_list.update_list(as_conf,submitter=submitter) if save: job_list.save() # Safe spot to store changes @@ -1615,6 +1615,8 @@ class Autosubmit: try: Autosubmit.restore_platforms(platforms_to_test) platforms_to_test = set() + Autosubmit.restore_platforms(platforms_to_test) + for job in job_list.get_job_list(): if job.platform_name is None: job.platform_name = hpcarch @@ -1703,14 +1705,13 @@ class Autosubmit: save = False for platform in platforms_to_test: if not hold: - Log.debug("\nJobs ready for {1}: {0}", len( - job_list.get_ready(platform, hold=hold)), platform.name) + Log.debug("\nJobs ready for {1}: {0}", len(job_list.get_ready(platform, hold=hold)), platform.name) + ready_jobs = job_list.get_ready(platform, hold=hold) else: Log.debug("\nJobs prepared for {1}: {0}", len( job_list.get_prepared(platform)), platform.name) - packages_to_submit = JobPackager( - as_conf, platform, job_list, hold=hold).build_packages() + packages_to_submit = JobPackager(as_conf, platform, job_list, hold=hold).build_packages() if not inspect: platform.open_submit_script() @@ -1736,8 +1737,7 @@ class Autosubmit: # If called from RUN or inspect command if not only_wrappers: try: - package.submit( - as_conf, job_list.parameters, inspect, hold=hold) + package.submit(as_conf, job_list.parameters, inspect, hold=hold) valid_packages_to_submit.append(package) except (IOError, OSError): continue @@ -1816,6 +1816,7 @@ class Autosubmit: job.hold = hold job.id = str(jobs_id[i]) job.status = Status.SUBMITTED + job.write_submit_time() if hasattr(package, "name"): job_list.packages_dict[package.name] = package.jobs from job.job import WrapperJob @@ -3238,7 +3239,9 @@ class Autosubmit: expand_list=expand, expanded_status=status) groups_dict = job_grouping.group_jobs() # WRAPPERS + if as_conf.get_wrapper_type() != 'none' and check_wrappers: + as_conf.check_conf_files(True) packages_persistence = 
JobPackagePersistence( os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) packages_persistence.reset_table(True) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index d4c96491067e60d5bac6e21737e3ca4fd0ae91d2..159cc43a0f0f590333c6d1cd5c7281da33a3955d 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -128,7 +128,7 @@ class Job(object): self.packed = False self.hold = False self.distance_weight = 0 - + self.level = 0 def __getstate__(self): odict = self.__dict__ if '_platform' in odict: @@ -520,71 +520,61 @@ class Job(object): platforms_to_test = set() if self.platform_name is None: self.platform_name = hpcarch - self.platform = submitter.platforms[self.platform_name.lower()] + self._platform = submitter.platforms[self.platform_name.lower()] # serial try: - self.platform.restore_connection() + self._platform.restore_connection() except Exception as e: Log.printlog( "{0} \n Couldn't connect to the remote platform for this {1} job err/out files. ".format(e.message, self.name), 6001) out_exist = False err_exist = False - retries = 3 + retries = 5 sleeptime = 0 i = 0 - sleep(20) + sleep(10) + no_continue = False try: while (not out_exist and not err_exist) and i < retries: try: - try: - out_exist = self.platform.check_file_exists( - remote_logs[0]) # will do 5 retries - except IOError as e: - out_exist = False - try: - err_exist = self.platform.check_file_exists( - remote_logs[1]) # will do 5 retries - except IOError as e: - err_exists = False - except Exception as e: + out_exist = self._platform.check_file_exists(remote_logs[0]) # will do 5 retries + except IOError as e: out_exist = False - err_exist = False - pass + try: + err_exist = self._platform.check_file_exists(remote_logs[1]) # will do 5 retries + except IOError as e: + err_exists = False if not out_exist or not err_exist: sleeptime = sleeptime + 5 i = i + 1 sleep(sleeptime) if i >= retries: if not out_exist or not err_exist: - Log.printlog("Retries = {0}, Failed to retrieve log files {1} and {2}".format( - retries, remote_logs[0], remote_logs[1]), 6001) - + Log.printlog("Failed to retrieve log files {1} and {2} e=6001".format(retries,remote_logs[0],remote_logs[1])) + sleep(5) # safe wait before end a thread + return if copy_remote_logs: if local_logs != remote_logs: # unifying names for log files - self.synchronize_logs( - self.platform, remote_logs, local_logs) + self.synchronize_logs(self._platform, remote_logs, local_logs) remote_logs = local_logs - self.platform.get_logs_files(self.expid, remote_logs) - # Update the logs with Autosubmit Job Id Brand - try: - for local_log in local_logs: - self.platform.write_jobid(self.id, os.path.join( - self._tmp_path, 'LOG_' + str(self.expid), local_log)) - except BaseException as e: - Log.printlog("Trace {0} \n Failed to write the {1}".format( - e.message, self.name), 6001) - + self._platform.get_logs_files(self.expid, remote_logs) + # Update the logs with Autosubmit Job Id Brand + try: + for local_log in local_logs: + self._platform.write_jobid(self.id, os.path.join(self._tmp_path, 'LOG_' + str(self.expid), local_log)) + except BaseException as e: + Log.printlog("Trace {0} \n Failed to write the {1} e=6001".format(e.message,self.name)) + sleep(5) # safe wait before end a thread + return except AutosubmitError as e: - Log.printlog("Trace {0} \nFailed to retrieve log file for job {0}".format( - e.message, self.name), 6001) + Log.printlog("Trace {0} \nFailed to retrieve log file for job {0}".format(e.message,self.name), 6001) + sleep(5) # 
safe wait before end a thread + return except AutosubmitCritical as e: # Critical errors can't be recovered. Failed configuration or autosubmit error - Log.printlog("Trace {0} \nFailed to retrieve log file for job {0}".format( - e.message, self.name), 6001) - try: - self.platform.closeConnection() - except: - pass - sleep(5) # safe wait before end a thread + Log.printlog("Trace {0} \nFailed to retrieve log file for job {0}".format(e.message,self.name), 6001) + sleep(5) # safe wait before end a thread + return + sleep(5) # safe wait before end a thread return def update_status(self, copy_remote_logs=False): @@ -602,9 +592,8 @@ class Job(object): Log.debug( "{0} job seems to have completed: checking...".format(self.name)) - if not self.platform.get_completed_files(self.name): - log_name = os.path.join( - self._tmp_path, self.name + '_COMPLETED') + if not self._platform.get_completed_files(self.name,wrapper_failed=self.packed): + log_name = os.path.join(self._tmp_path, self.name + '_COMPLETED') self.check_completion() else: @@ -619,19 +608,16 @@ class Job(object): elif self.status == Status.COMPLETED: Log.result("Job {0} is COMPLETED", self.name) elif self.status == Status.FAILED: - Log.printlog("Job {0} is FAILED. Checking completed files to confirm the failure...".format( - self.name), 3000) - self.platform.get_completed_files(self.name) + Log.printlog("Job {0} is FAILED. Checking completed files to confirm the failure...".format(self.name),3000) + self._platform.get_completed_files(self.name,wrapper_failed=self.packed) self.check_completion() if self.status == Status.COMPLETED: - Log.printlog(" there is a COMPLETED file.", 3000) Log.result("Job {0} is COMPLETED", self.name) else: self.update_children_status() elif self.status == Status.UNKNOWN: - Log.printlog("Job {0} is UNKNOWN. Checking completed files to confirm the failure...".format( - self.name), 3000) - self.platform.get_completed_files(self.name) + Log.printlog("Job {0} is UNKNOWN. Checking completed files to confirm the failure...".format(self.name),3000) + self._platform.get_completed_files(self.name,wrapper_failed=self.packed) self.check_completion(Status.UNKNOWN) if self.status == Status.UNKNOWN: Log.printlog("Job {0} is UNKNOWN. Checking completed files to confirm the failure...".format( @@ -677,7 +663,7 @@ class Job(object): def update_children_status(self): children = list(self.children) for child in children: - if child.status in [Status.SUBMITTED, Status.RUNNING, Status.QUEUING, Status.UNKNOWN]: + if child.level == 0 and child.status in [Status.SUBMITTED, Status.RUNNING, Status.QUEUING, Status.UNKNOWN]: child.status = Status.FAILED children += list(child.children) @@ -690,7 +676,7 @@ class Job(object): """ log_name = os.path.join(self._tmp_path, self.name + '_COMPLETED') - if os.path.exists(log_name): + if os.path.exists(log_name): #TODO self.status = Status.COMPLETED else: Log.printlog("Job {0} completion check failed. 
There is no COMPLETED file".format( @@ -778,7 +764,7 @@ class Job(object): else: parameters['Chunk_LAST'] = 'FALSE' - job_platform = self.platform + job_platform = self._platform self.processors = as_conf.get_processors(self.section) self.threads = as_conf.get_threads(self.section) self.tasks = as_conf.get_tasks(self.section) @@ -854,7 +840,7 @@ class Job(object): template = template_file.read() else: if self.type == Type.BASH: - template = 'sleep 60' + template = 'sleep 5' elif self.type == Type.PYTHON: template = 'time.sleep(5)' elif self.type == Type.R: @@ -893,7 +879,7 @@ class Job(object): "Job {0} does not have an correct template// template not found".format(self.name), 7014) def _get_paramiko_template(self, snippet, template): - current_platform = self.platform + current_platform = self._platform return ''.join([snippet.as_header(current_platform.get_header(self)), template, snippet.as_tailer()]) @@ -1019,7 +1005,7 @@ class Job(object): :return: True if succesful, False otherwise :rtype: bool """ - if self.platform.get_stat_file(self.name, retries=5): + if self._platform.get_stat_file(self.name, retries=5): start_time = self.check_start_time() else: Log.printlog('Could not get start time for {0}. Using current time as an approximation'.format( @@ -1042,7 +1028,7 @@ class Job(object): :param completed: True if job was completed successfully, False otherwise :type completed: bool """ - self.platform.get_stat_file(self.name, retries=0) + self._platform.get_stat_file(self.name, retries=5) end_time = self.check_end_time() path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS') f = open(path, 'a') @@ -1168,17 +1154,19 @@ class WrapperJob(Job): def __init__(self, name, job_id, status, priority, job_list, total_wallclock, num_processors, platform, as_config, hold): super(WrapperJob, self).__init__(name, job_id, status, priority) + self.failed = False self.job_list = job_list # divide jobs in dictionary by state? self.wallclock = total_wallclock self.num_processors = num_processors self.running_jobs_start = OrderedDict() - self.platform = platform + self._platform = platform self.as_config = as_config # save start time, wallclock and processors?! self.checked_time = datetime.datetime.now() self.hold = hold + self.inner_jobs_running = list() def _queuing_reason_cancel(self, reason): try: @@ -1217,8 +1205,31 @@ class WrapperJob(Job): # Fail can come from check function or running/completed checkers. 
if self.status in [Status.FAILED, Status.UNKNOWN]: self.status = Status.FAILED - self.cancel_failed_wrapper_job() - self.update_failed_jobs() + if self.prev_status not in [Status.FAILED, Status.UNKNOWN]: + sleep(1) + else: + self.failed = True + self._check_running_jobs() + if len(self.inner_jobs_running) > 0: + still_running = True + if not self.failed: + if self._platform.check_file_exists('WRAPPER_FAILED', wrapper_failed=True): + for job in self.inner_jobs_running: + if job.platform.check_file_exists('{0}_FAILED'.format(job.name), wrapper_failed=True): + Log.info("Wrapper {0} Failed, checking inner_jobs...".format(self.name)) + self.failed = True + self._platform.delete_file('WRAPPER_FAILED') + break + + if self.failed: + self.update_failed_jobs() + if len(self.inner_jobs_running) <= 0: + still_running = False + else: + still_running = False + if not still_running: + self.cancel_failed_wrapper_job() + def check_inner_jobs_completed(self, jobs): not_completed_jobs = [ @@ -1226,7 +1237,7 @@ class WrapperJob(Job): not_completed_job_names = [job.name for job in not_completed_jobs] job_names = ' '.join(not_completed_job_names) if job_names: - completed_files = self.platform.check_completed_files(job_names) + completed_files = self._platform.check_completed_files(job_names) completed_jobs = [] for job in not_completed_jobs: if completed_files and len(completed_files) > 0: @@ -1245,21 +1256,21 @@ class WrapperJob(Job): def _check_inner_jobs_queue(self, prev_status): reason = str() - if self.platform.type == 'slurm': - self.platform.send_command( - self.platform.get_queue_status_cmd(self.id)) - reason = self.platform.parse_queue_reason( - self.platform._ssh_output, self.id) + if self._platform.type == 'slurm': + self._platform.send_command(self._platform.get_queue_status_cmd(self.id)) + reason = self._platform.parse_queue_reason(self._platform._ssh_output, self.id) if self._queuing_reason_cancel(reason): - Log.printlog("Job {0} will be cancelled and set to FAILED as it was queuing due to {1}".format( - self.name, reason), 6009) + Log.printlog("Job {0} will be cancelled and set to FAILED as it was queuing due to {1}".format(self.name,reason),6009) + #while running jobs? + self._check_running_jobs() #todo + self.update_failed_jobs(canceled_wrapper=True) self.cancel_failed_wrapper_job() - self.update_failed_jobs() + return if reason == '(JobHeldUser)': if self.hold is False: # SHOULD BE MORE CLASS (GET_scontrol realease but not sure if this can be implemented on others PLATFORMS - self.platform.send_command( + self._platform.send_command( "scontrol release " + "{0}".format(self.id)) self.status = Status.QUEUING for job in self.job_list: @@ -1272,8 +1283,8 @@ class WrapperJob(Job): elif reason == '(JobHeldAdmin)': Log.debug( "Job {0} Failed to be HELD, canceling... 
", self.name) - self.platform.send_command( - self.platform.cancel_cmd + " {0}".format(self.id)) + self._platform.send_command( + self._platform.cancel_cmd + " {0}".format(self.id)) self.status = Status.WAITING else: Log.info("Job {0} is QUEUING {1}", self.name, reason) @@ -1295,9 +1306,9 @@ class WrapperJob(Job): job.update_status(self.as_config.get_copy_remote_logs() == 'true') return True return False - def _check_running_jobs(self): not_finished_jobs_dict = OrderedDict() + self.inner_jobs_running = list() not_finished_jobs = [job for job in self.job_list if job.status not in [ Status.COMPLETED, Status.FAILED]] for job in not_finished_jobs: @@ -1305,9 +1316,10 @@ class WrapperJob(Job): Status.COMPLETED or self.status == Status.COMPLETED] if job.parents is None or len(tmp) == len(job.parents): not_finished_jobs_dict[job.name] = job + self.inner_jobs_running.append(job) if len(not_finished_jobs_dict.keys()) > 0: # Only running jobs will enter there not_finished_jobs_names = ' '.join(not_finished_jobs_dict.keys()) - remote_log_dir = self.platform.get_remote_log_dir() + remote_log_dir = self._platform.get_remote_log_dir() # PREPARE SCRIPT TO SEND command = textwrap.dedent(""" cd {1} @@ -1338,7 +1350,7 @@ done content = '' while content == '' and retries > 0: self._platform.send_command(command, False) - content = self.platform._ssh_output.split('\n') + content = self._platform._ssh_output.split('\n') # content.reverse() for line in content[:-1]: out = line.split() @@ -1348,64 +1360,61 @@ done if len(out) > 1: if job not in self.running_jobs_start: start_time = self._check_time(out, 1) - Log.debug("Job {0} started at {1}".format( - jobname, str(parse_date(start_time)))) - + Log.info("Job {0} started at {1}".format(jobname, str(parse_date(start_time)))) self.running_jobs_start[job] = start_time - job.new_status = Status.RUNNING - job.update_status( - self.as_config.get_copy_remote_logs() == 'true') - + job.status = Status.RUNNING + #job.update_status(self.as_config.get_copy_remote_logs() == 'true') if len(out) == 2: Log.info("Job {0} is RUNNING".format(jobname)) - over_wallclock = self._check_inner_job_wallclock( - job) + over_wallclock = self._check_inner_job_wallclock(job) if over_wallclock: - Log.printlog( - "Job {0} is FAILED".format(jobname), 6009) - + Log.printlog("Job {0} is FAILED".format(jobname),6009) elif len(out) == 3: end_time = self._check_time(out, 2) self._check_finished_job(job) - Log.info("Job {0} finished at {1}".format( - jobname, str(parse_date(end_time)))) + Log.info("Job {0} finished at {1}".format(jobname, str(parse_date(end_time)))) if content == '': sleep(wait) retries = retries - 1 - + temp_list = self.inner_jobs_running + self.inner_jobs_running = [job for job in temp_list if job.status == Status.RUNNING] if retries == 0 or over_wallclock: self.status = Status.FAILED - - def _check_finished_job(self, job): - wait = 2 - retries = 5 - output = '' - while output == '' and retries > 0: - output = self.platform.check_completed_files(job.name) - if output is None or output == '': - sleep(wait) - retries = retries - 1 - if output is not None and output != '' and 'COMPLETED' in output: + def _check_finished_job(self, job , failed_file=False): + if not failed_file: + wait = 2 + retries = 2 + output = '' + while output == '' and retries > 0: + output = self._platform.check_completed_files(job.name) + if output is None or output == '': + sleep(wait) + retries = retries - 1 + if failed_file or (output is not None and output != '' and 'COMPLETED' in output): job.new_status = 
Status.COMPLETED job.update_status(self.as_config.get_copy_remote_logs() == 'true') else: - Log.info( - "No completed filed found, setting {0} to FAILED...".format(job.name)) + #Log.info("No completed filed found, setting {0} to FAILED...".format(job.name)) job.new_status = Status.FAILED job.update_status(self.as_config.get_copy_remote_logs() == 'true') self.running_jobs_start.pop(job, None) - def update_failed_jobs(self): - not_finished_jobs = [job for job in self.job_list if job.status not in [ - Status.FAILED, Status.COMPLETED]] - for job in not_finished_jobs: - self._check_finished_job(job) + def update_failed_jobs(self,canceled_wrapper=False): + running_jobs = self.inner_jobs_running + self.inner_jobs_running = list() + for job in running_jobs: + if job.platform.check_file_exists('{0}_FAILED'.format(job.name), wrapper_failed=True): + if job.platform.get_file('{0}_FAILED'.format(job.name), False, wrapper_failed=True): + self._check_finished_job(job) + else: + self.inner_jobs_running.append(job) def cancel_failed_wrapper_job(self): - Log.printlog("Cancelling job with id {0}".format(self.id), 6009) - self.platform.send_command( - self.platform.cancel_cmd + " " + str(self.id)) - + Log.printlog("Cancelling job with id {0}".format(self.id),6009) + self._platform.send_command(self._platform.cancel_cmd + " " + str(self.id)) + for job in self.job_list: + if job.status not in [Status.COMPLETED, Status.FAILED]: + job.status = Status.WAITING def _update_completed_jobs(self): for job in self.job_list: if job.status == Status.RUNNING: diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 4e030b88b2b5c95e1d7927efc03af452468656d2..a3f7f8911d2f6c509dcbf3741bea286f59a61808 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -673,7 +673,7 @@ class JobList(object): :rtype: list """ - completed_jobs = [job for job in self._job_list if (platform is None or job.platform == platform) and + completed_jobs = [job for job in self._job_list if (platform is None or job.platform.name.lower() == platform.name.lower()) and job.status == Status.COMPLETED] if wrapper: return [job for job in completed_jobs if job.packed is False] @@ -690,7 +690,7 @@ class JobList(object): :return: completed jobs :rtype: list """ - uncompleted_jobs = [job for job in self._job_list if (platform is None or job.platform == platform) and + uncompleted_jobs = [job for job in self._job_list if (platform is None or job.platform.name.lower() == platform.name.lower()) and job.status != Status.COMPLETED] if wrapper: @@ -709,10 +709,10 @@ class JobList(object): """ submitted = list() if hold: - submitted = [job for job in self._job_list if (platform is None or job.platform == platform) and + submitted = [job for job in self._job_list if (platform is None or job.platform.name.lower() == platform.name.lower()) and job.status == Status.SUBMITTED and job.hold == hold] else: - submitted = [job for job in self._job_list if (platform is None or job.platform == platform) and + submitted = [job for job in self._job_list if (platform is None or job.platform.name.lower() == platform.name.lower()) and job.status == Status.SUBMITTED] if wrapper: return [job for job in submitted if job.packed is False] @@ -728,7 +728,7 @@ class JobList(object): :return: running jobs :rtype: list """ - running = [job for job in self._job_list if (platform is None or job.platform == platform) and + running = [job for job in self._job_list if (platform is None or job.platform.name.lower() == platform.name.lower()) and job.status == 
Status.RUNNING] if wrapper: return [job for job in running if job.packed is False] @@ -744,7 +744,7 @@ class JobList(object): :return: queuedjobs :rtype: list """ - queuing = [job for job in self._job_list if (platform is None or job.platform == platform) and + queuing = [job for job in self._job_list if (platform is None or job.platform.name.lower() == platform.name.lower()) and job.status == Status.QUEUING] if wrapper: return [job for job in queuing if job.packed is False] @@ -760,7 +760,7 @@ class JobList(object): :return: failed jobs :rtype: list """ - failed = [job for job in self._job_list if (platform is None or job.platform == platform) and + failed = [job for job in self._job_list if (platform is None or job.platform.name.lower() == platform.name.lower()) and job.status == Status.FAILED] if wrapper: return [job for job in failed if job.packed is False] @@ -776,7 +776,7 @@ class JobList(object): :return: all jobs :rtype: list """ - unsubmitted = [job for job in self._job_list if (platform is None or job.platform == platform) and + unsubmitted = [job for job in self._job_list if (platform is None or job.platform.name.lower() == platform.name.lower()) and (job.status != Status.SUBMITTED and job.status != Status.QUEUING and job.status == Status.RUNNING and job.status == Status.COMPLETED)] if wrapper: @@ -809,7 +809,7 @@ class JobList(object): :return: ready jobs :rtype: list """ - ready = [job for job in self._job_list if (platform is None or job.platform == platform) and + ready = [job for job in self._job_list if (platform is None or job.platform.name.lower() == platform.name.lower()) and job.status == Status.READY and job.hold is hold] if wrapper: @@ -826,7 +826,7 @@ class JobList(object): :return: prepared jobs :rtype: list """ - prepared = [job for job in self._job_list if (platform is None or job.platform == platform) and + prepared = [job for job in self._job_list if (platform is None or job.platform.name.lower() == platform.name.lower()) and job.status == Status.PREPARED] return prepared @@ -839,7 +839,7 @@ class JobList(object): :return: waiting jobs :rtype: list """ - waiting_jobs = [job for job in self._job_list if (platform is None or job.platform == platform) and + waiting_jobs = [job for job in self._job_list if (platform is None or job.platform.name.lower() == platform.name.lower()) and job.status == Status.WAITING] if wrapper: return [job for job in waiting_jobs if job.packed is False] @@ -868,7 +868,7 @@ class JobList(object): :return: jobs in platforms :rtype: list """ - return [job for job in self._job_list if (platform is None or job.platform == platform) and + return [job for job in self._job_list if (platform is None or job.platform.name.lower() == platform.name.lower()) and job.status == Status.HELD] def get_unknown(self, platform=None, wrapper=False): @@ -880,7 +880,7 @@ class JobList(object): :return: unknown state jobs :rtype: list """ - submitted = [job for job in self._job_list if (platform is None or job.platform == platform) and + submitted = [job for job in self._job_list if (platform is None or job.platform.name.lower() == platform.name.lower()) and job.status == Status.UNKNOWN] if wrapper: return [job for job in submitted if job.packed is False] @@ -896,7 +896,7 @@ class JobList(object): :return: unknown state jobs :rtype: list """ - suspended = [job for job in self._job_list if (platform is None or job.platform == platform) and + suspended = [job for job in self._job_list if (platform is None or job.platform.name.lower() == platform.name.lower()) and 
job.status == Status.SUSPENDED] if wrapper: return [job for job in suspended if job.packed is False] @@ -996,7 +996,7 @@ class JobList(object): def get_in_ready_grouped_id(self, platform): jobs = [] [jobs.append(job) for job in jobs if ( - platform is None or job._platform.name is platform.name)] + platform is None or job.platform.name is platform.name)] jobs_by_id = dict() for job in jobs: @@ -1153,7 +1153,7 @@ class JobList(object): def parameters(self, value): self._parameters = value - def update_list(self, as_conf, store_change=True, fromSetStatus=False): + def update_list(self, as_conf, store_change=True, fromSetStatus=False,submitter=None): """ Updates job list, resetting failed jobs and changing to READY all WAITING jobs with all parents COMPLETED @@ -1182,6 +1182,14 @@ class JobList(object): parent for parent in job.parents if parent.status == Status.COMPLETED] if len(tmp) == len(job.parents): job.status = Status.READY + if submitter is not None: + job.platform = submitter.platforms[job.platform_name.lower()] + job.platform.test_connection() + job.platform = submitter.platforms[job.platform_name.lower()] + job.platform.test_connection() + + job.id = None + job.packed = False save = True Log.debug( @@ -1192,7 +1200,12 @@ class JobList(object): job.packed = False Log.debug( "Resetting job: {0} status to: WAITING for parents completion...".format(job.name)) - + else: + job.status = Status.FAILED + job.packed = False + save = True + Log.debug( + "Job is failed".format(job.name)) # if waiting jobs has all parents completed change its State to READY for job in self.get_completed(): if job.synchronize is not None: diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 88075d99038cd44a6cf2fc2eeed867d1dc3dd8e9..76a4f2c22c0370853766314dcc9a3b3cd600b551 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -77,13 +77,13 @@ class JobPackager(object): Log.debug("Number of jobs prepared: {0}", len( jobs_list.get_prepared(platform))) if len(jobs_list.get_prepared(platform)) > 0: - Log.info("Jobs ready for {0}: {1}", self._platform.name, len( + Log.debug("Jobs ready for {0}: {1}", self._platform.name, len( jobs_list.get_prepared(platform))) else: Log.debug("Number of jobs ready: {0}", len( jobs_list.get_ready(platform, hold=False))) if len(jobs_list.get_ready(platform)) > 0: - Log.info("Jobs ready for {0}: {1}", self._platform.name, len( + Log.debug("Jobs ready for {0}: {1}", self._platform.name, len( jobs_list.get_ready(platform))) self._maxTotalProcessors = 0 def compute_weight(self,job_list): @@ -235,10 +235,8 @@ class JobPackager(object): elif self.wrapper_type in ['vertical-horizontal', 'horizontal-vertical']: wrapped = True built_packages_tmp = list() - built_packages_tmp.append(self._build_hybrid_package( - jobs_to_submit_by_section[section], max_wrapped_jobs, section)) + built_packages_tmp.append(self._build_hybrid_package(jobs_to_submit_by_section[section], max_wrapped_jobs, section)) if wrapped: - for p in built_packages_tmp: failed_innerjobs = False #Check failed jobs first @@ -343,8 +341,7 @@ class JobPackager(object): jobs_section = dict() for job in jobs_list: # This iterator will always return None if there is no '&' defined in the section name - section = next( - (s for s in sections_split if job.section in s and '&' in s), None) + section = next((s for s in sections_split if job.section in s and '&' in s), None) if section is None: section = job.section if section not in jobs_section: @@ -444,12 +441,16 @@ class 
JobPackager(object): max_procs = horizontal_packager.total_processors new_package = horizontal_packager.get_next_packages( section, max_wallclock=self._platform.max_wallclock, horizontal_vertical=True, max_procs=max_procs) + if new_package is not None: current_package += new_package for i in range(len(current_package)): total_wallclock = sum_str_hours(total_wallclock, wallclock) - + if len(current_package) > 1: + for level in range(1,len(current_package)): + for job in current_package[level]: + job.level=level return JobPackageHorizontalVertical(current_package, max_procs, total_wallclock, jobs_resources=jobs_resources, configuration=self._as_config) @@ -465,11 +466,15 @@ class JobPackager(object): job_list = JobPackagerVerticalSimple([job], job.wallclock, self.max_jobs, max_wrapped_jobs, self._platform.max_wallclock).build_vertical_package(job) + current_package.append(job_list) for job in current_package[-1]: total_wallclock = sum_str_hours(total_wallclock, job.wallclock) - + if len(current_package) > 1: + for level in range(1,len(current_package)): + for job in current_package[level]: + job.level=level return JobPackageVerticalHorizontal(current_package, total_processors, total_wallclock, jobs_resources=jobs_resources, method=self.wrapper_method, configuration=self._as_config) @@ -498,7 +503,7 @@ class JobPackagerVertical(object): self.max_wrapped_jobs = max_wrapped_jobs self.max_wallclock = max_wallclock - def build_vertical_package(self, job): + def build_vertical_package(self, job,level=0): """ Goes trough the job and all the related jobs (children, or part of the same date member ordered group), finds those suitable and groups them together into a wrapper. @@ -522,9 +527,10 @@ class JobPackagerVertical(object): if self.total_wallclock <= self.max_wallclock: # Marking, this is later tested in the main loop child.packed = True + child.level = level self.jobs_list.append(child) # Recursive call - return self.build_vertical_package(child) + return self.build_vertical_package(child,level=level+1) # Wrapped jobs are accumulated and returned in this list return self.jobs_list diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 9f33f892c8e0630f70d22b216b3aba12ad7c40c4..21bd93e4263c753ba6d415d3721b7a966ad30f07 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -46,10 +46,10 @@ class JobPackageBase(object): self.hold = False try: self._tmp_path = jobs[0]._tmp_path - self._platform = jobs[0].platform + self._platform = jobs[0]._platform self._custom_directives = set() for job in jobs: - if job.platform.name != self._platform.name or job.platform is None: + if job._platform.name != self._platform.name or job.platform is None: raise Exception('Only one valid platform per package') except IndexError: raise Exception('No jobs given') @@ -94,7 +94,8 @@ class JobPackageBase(object): exit=True break if not os.path.exists(os.path.join(configuration.get_project_dir(), job.file)): - raise AutosubmitCritical("Template [ {0} ] using CHECK=On_submission has some empty variable {0}".format(job.name),7014) + if configuration.get_project_type().lower() != "none": + raise AutosubmitCritical("Template [ {0} ] using CHECK=On_submission has some empty variable {0}".format(job.name),7014) if not job.check_script(configuration, parameters,show_logs=job.check_warnings): Log.warning("Script {0} check failed",job.name) Log.warning("On submission script has some empty variables") diff --git a/autosubmit/platforms/ecplatform.py 
b/autosubmit/platforms/ecplatform.py index e1ec6d7da57d25b3195796ca728c1bfd692a2255..e27923c74dd1cba45a0dabdcc77d51323b889b79 100644 --- a/autosubmit/platforms/ecplatform.py +++ b/autosubmit/platforms/ecplatform.py @@ -157,7 +157,7 @@ class EcPlatform(ParamikoPlatform): raise AutosubmitError('Could not send file {0} to {1}'.format(os.path.join(self.tmp_path, filename),os.path.join(self.get_files_path(), filename)),6005,e.message) return True - def get_file(self, filename, must_exist=True, relative_path='',ignore_log = False): + def get_file(self, filename, must_exist=True, relative_path='',ignore_log = False,wrapper_failed=False): local_path = os.path.join(self.tmp_path, relative_path) if not os.path.exists(local_path): os.makedirs(local_path) diff --git a/autosubmit/platforms/headers/lsf_header.py b/autosubmit/platforms/headers/lsf_header.py index af22da2ca7fcd519ffb49cfb89caddc0b4a9ee59..0ea44b77f3a96d9cf47392005c96a7a675af655f 100644 --- a/autosubmit/platforms/headers/lsf_header.py +++ b/autosubmit/platforms/headers/lsf_header.py @@ -54,7 +54,7 @@ class LsfHeader(object): # noinspection PyMethodMayBeStatic def get_exclusivity(self, job): - if job.platform.exclusivity == 'true': + if job._platform.exclusivity == 'true': return "#BSUB -x" else: return "" diff --git a/autosubmit/platforms/locplatform.py b/autosubmit/platforms/locplatform.py index 81c969f824152627dba8aefe6531c25e67528096..56936ca4e760d9b85d0fcc2674f06848230f4b53 100644 --- a/autosubmit/platforms/locplatform.py +++ b/autosubmit/platforms/locplatform.py @@ -118,10 +118,10 @@ class LocalPlatform(ParamikoPlatform): raise return True - def check_file_exists(self,filename): + def check_file_exists(self,filename,wrapper_failed=False): return True - def get_file(self, filename, must_exist=True, relative_path='',ignore_log = False): + def get_file(self, filename, must_exist=True, relative_path='',ignore_log = False,wrapper_failed=False): local_path = os.path.join(self.tmp_path, relative_path) if not os.path.exists(local_path): os.makedirs(local_path) @@ -140,7 +140,7 @@ class LocalPlatform(ParamikoPlatform): return True # Moves .err .out - def check_file_exists(self, src): + def check_file_exists(self, src,wrapper_failed=False): """ Moves a file on the platform :param src: source name @@ -160,11 +160,14 @@ class LocalPlatform(ParamikoPlatform): if not file_exist: # File doesn't exist, retry in sleeptime Log.debug("{2} File still no exists.. waiting {0}s for a new retry ( retries left: {1})", sleeptime, max_retries - retries, remote_path) - sleep(sleeptime) - sleeptime = sleeptime + 5 - retries = retries + 1 + if not wrapper_failed: + sleep(sleeptime) + sleeptime = sleeptime + 5 + retries = retries + 1 + else: + retries = 9999 except BaseException as e: # Unrecoverable error - Log.printlog("Crashed while retrieving logs",6001) + Log.printlog("File does not exist, logs {0} {1}".format(self.get_files_path(),src),6001) file_exist = False # won't exist retries = 999 # no more retries diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index e6b4891b960dfceb86a6592b5600b4adb9a3bc6f..b6d23e19c84b9c3e1ae4cd9e24bdd2f145e50cf2 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -207,7 +207,7 @@ class ParamikoPlatform(Platform): raise AutosubmitError('Send file failed. 
Connection seems to no be active',6004) # Gets .err and .out - def get_file(self, filename, must_exist=True, relative_path='',ignore_log = False): + def get_file(self, filename, must_exist=True, relative_path='',ignore_log = False,wrapper_failed=False): """ Copies a file from the current platform to experiment's tmp folder @@ -363,7 +363,7 @@ class ParamikoPlatform(Platform): Log.error('check_job() The job id ({0}) is not an integer neither a string.', job_id) job.new_status = job_status sleep_time=5 - while not (self.send_command(self.get_checkjob_cmd(job_id)) and retries >= 0) or (self.get_ssh_output() == "" and retries >= 0): + while not ( self.send_command(self.get_checkjob_cmd(job_id)) or (self.get_ssh_output() == "") ) and retries > 0: retries = retries - 1 Log.debug('Retrying check job command: {0}', self.get_checkjob_cmd(job_id)) Log.debug('retries left {0}', retries) diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 71e69a0519f608b69b0d019764feb18d8c676951..1334110b397320bd2d12e0c5d07d2f5469b558d9 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -153,7 +153,7 @@ class Platform(object): """ raise NotImplementedError - def get_file(self, filename, must_exist=True, relative_path='', ignore_log=False): + def get_file(self, filename, must_exist=True, relative_path='', ignore_log=False,wrapper_failed=False): """ Copies a file from the current platform to experiment's tmp folder @@ -209,7 +209,7 @@ class Platform(object): self.get_files([job_out_filename, job_err_filename], False, 'LOG_{0}'.format(exp_id)) - def get_completed_files(self, job_name, retries=0, recovery=False): + def get_completed_files(self, job_name, retries=0, recovery=False,wrapper_failed=False): """ Get the COMPLETED file of the given job @@ -226,8 +226,8 @@ class Platform(object): return True else: return False - if self.check_file_exists('{0}_COMPLETED'.format(job_name)): - if self.get_file('{0}_COMPLETED'.format(job_name), False): + if self.check_file_exists('{0}_COMPLETED'.format(job_name),wrapper_failed=wrapper_failed): + if self.get_file('{0}_COMPLETED'.format(job_name), False,wrapper_failed=wrapper_failed): return True else: return False @@ -264,7 +264,7 @@ class Platform(object): return True return False - def check_file_exists(self, src): + def check_file_exists(self, src,wrapper_failed=False): return True def get_stat_file(self, job_name, retries=0): diff --git a/autosubmit/platforms/psplatform.py b/autosubmit/platforms/psplatform.py index e8981eec8e135c0309e2ae838d5146d941243230..9861db07e1e1986c2a46553868fb8b19d47f12ab 100644 --- a/autosubmit/platforms/psplatform.py +++ b/autosubmit/platforms/psplatform.py @@ -79,27 +79,3 @@ class PsPlatform(ParamikoPlatform): def get_checkjob_cmd(self, job_id): return self.get_pscall(job_id) - # def connect(self): - # """ - # In this case, it does nothing because connection is established for each command - # - # :return: True - # :rtype: bool - # """ - # self.connected = True - # def restore_connection(self): - # """ - # In this case, it does nothing because connection is established for each command - # - # :return: True - # :rtype: bool - # """ - # self.connected = True - # def test_connection(self): - # """ - # In this case, it does nothing because connection is established for each command - # - # :return: True - # :rtype: bool - # """ - # self.connected = True \ No newline at end of file diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index 
bcd6fc2043195218caafc3f6706abb9220a59a91..37669be507207eba1b4e147b20e598e4216aaca3 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -345,9 +345,11 @@ class SlurmPlatform(ParamikoPlatform): if not hold: self._submit_script_file.write( self._submit_cmd + job_script + "\n") + #self._submit_script_file.close() else: self._submit_script_file.write( self._submit_hold_cmd + job_script + "\n") + #self._submit_script_file.close() def get_checkjob_cmd(self, job_id): return 'sacct -n -X -j {1} -o "State"'.format(self.host, job_id) @@ -419,7 +421,7 @@ class SlurmPlatform(ParamikoPlatform): def allocated_nodes(): return """os.system("scontrol show hostnames $SLURM_JOB_NODELIST > node_list")""" - def check_file_exists(self, filename): + def check_file_exists(self, filename,wrapper_failed=False): file_exist = False sleeptime = 5 retries = 0 @@ -433,12 +435,15 @@ class SlurmPlatform(ParamikoPlatform): except IOError: # File doesn't exist, retry in sleeptime Log.debug("{2} File still no exists.. waiting {0}s for a new retry ( retries left: {1})", sleeptime, max_retries - retries, os.path.join(self.get_files_path(), filename)) - sleep(sleeptime) - sleeptime = sleeptime + 5 - retries = retries + 1 + if not wrapper_failed: + sleep(sleeptime) + sleeptime = sleeptime + 5 + retries = retries + 1 + else: + retries = 9999 except BaseException as e: # Unrecoverable error Log.critical( - "Crashed while retrieving remote logs", 6001, e.message) + "remote logs {0} couldn't be recovered".format(filename), 6001, e.message) file_exist = False # won't exist retries = 999 # no more retries diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py index 4d295bb80b6aa8814e53720400123f261c59c1fe..06b86563b40df93eee52ae7ebef86aedacdd7b18 100644 --- a/autosubmit/platforms/wrappers/wrapper_builder.py +++ b/autosubmit/platforms/wrappers/wrapper_builder.py @@ -19,6 +19,8 @@ import textwrap import math +import random +import string class WrapperDirector: """ @@ -119,8 +121,17 @@ class WrapperBuilder(object): class PythonWrapperBuilder(WrapperBuilder): - + def get_random_alphanumeric_string(self,letters_count, digits_count): + sample_str = ''.join((random.choice(string.ascii_letters) for i in range(letters_count))) + sample_str += ''.join((random.choice(string.digits) for i in range(digits_count))) + + # Convert string to list and shuffle it to mix letters and digits + sample_list = list(sample_str) + random.shuffle(sample_list) + final_string = ''.join(sample_list) + return final_string+"_FAILED" def build_imports(self): + return textwrap.dedent(""" import os import sys @@ -130,10 +141,23 @@ class PythonWrapperBuilder(WrapperBuilder): from math import ceil from collections import OrderedDict import copy - + class Unbuffered(object): + def __init__(self, stream): + self.stream = stream + def write(self, data): + self.stream.write(data) + self.stream.flush() + def writelines(self, datas): + self.stream.writelines(datas) + self.stream.flush() + def __getattr__(self, attr): + return getattr(self.stream, attr) + + sys.stdout = Unbuffered(sys.stdout) + wrapper_id = "{1}" # Defining scripts to be run scripts= {0} - """).format(str(self.job_scripts), '\n'.ljust(13)) + """).format(str(self.job_scripts), self.get_random_alphanumeric_string(5,5),'\n'.ljust(13)) def build_job_thread(self): return textwrap.dedent(""" @@ -175,23 +199,20 @@ class PythonWrapperBuilder(WrapperBuilder): def build_cores_list(self): return textwrap.dedent(""" - 
total_cores = {0} - jobs_resources = {1} - - processors_per_node = int(jobs_resources['PROCESSORS_PER_NODE']) - - idx = 0 - all_cores = [] - while total_cores > 0: - if processors_per_node > 0: - processors_per_node -= 1 - total_cores -= 1 - all_cores.append(all_nodes[idx]) - else: - idx += 1 - processors_per_node = int(jobs_resources['PROCESSORS_PER_NODE']) - +total_cores = {0} +jobs_resources = {1} +processors_per_node = int(jobs_resources['PROCESSORS_PER_NODE']) +idx = 0 +all_cores = [] +while total_cores > 0: + if processors_per_node > 0: + processors_per_node -= 1 + total_cores -= 1 + all_cores.append(all_nodes[idx]) + else: + idx += 1 processors_per_node = int(jobs_resources['PROCESSORS_PER_NODE']) +processors_per_node = int(jobs_resources['PROCESSORS_PER_NODE']) """).format(self.num_procs, str(self.jobs_resources), '\n'.ljust(13)) def build_machinefiles(self): @@ -202,28 +223,26 @@ class PythonWrapperBuilder(WrapperBuilder): def build_machinefiles_standard(self): return textwrap.dedent(""" - machines = str() - - cores = int(jobs_resources[section]['PROCESSORS']) - tasks = int(jobs_resources[section]['TASKS']) - nodes = int(ceil(int(cores)/float(tasks))) - if tasks < processors_per_node: - cores = tasks - - job_cores = cores - while nodes > 0: - while cores > 0: - if len(all_cores) > 0: - node = all_cores.pop(0) - if node: - machines += node +"_NEWLINE_" - cores -= 1 - for rest in range(processors_per_node-tasks): - if len(all_cores) > 0: - all_cores.pop(0) - nodes -= 1 + machines = str() + cores = int(jobs_resources[section]['PROCESSORS']) + tasks = int(jobs_resources[section]['TASKS']) + nodes = int(ceil(int(cores)/float(tasks))) if tasks < processors_per_node: - cores = job_cores + cores = tasks + job_cores = cores + while nodes > 0: + while cores > 0: + if len(all_cores) > 0: + node = all_cores.pop(0) + if node: + machines += node +"_NEWLINE_" + cores -= 1 + for rest in range(processors_per_node-tasks): + if len(all_cores) > 0: + all_cores.pop(0) + nodes -= 1 + if tasks < processors_per_node: + cores = job_cores """).format('\n'.ljust(13)) def _create_components_dict(self): @@ -266,122 +285,139 @@ class PythonWrapperBuilder(WrapperBuilder): def build_sequential_threads_launcher(self, jobs_list, thread, footer=True): sequential_threads_launcher = textwrap.dedent(""" + failed_wrapper = os.path.join(os.getcwd(),wrapper_id) for i in range(len({0})): current = {1} current.start() current.join() + if os.path.exists(failed_wrapper): + os.remove(os.path.join(os.getcwd(),wrapper_id)) + wrapper_failed = os.path.join(os.getcwd(),"WRAPPER_FAILED") + open(wrapper_failed, 'w').close() + os._exit(1) + """).format(jobs_list, thread, '\n'.ljust(13)) if footer: sequential_threads_launcher += self._indent(textwrap.dedent(""" completed_filename = {0}[i].replace('.cmd', '_COMPLETED') completed_path = os.path.join(os.getcwd(), completed_filename) + failed_filename = {0}[i].replace('.cmd', '_FAILED') + failed_path = os.path.join(os.getcwd(), failed_filename) + failed_wrapper = os.path.join(os.getcwd(), wrapper_id) if os.path.exists(completed_path): print datetime.now(), "The job ", current.template," has been COMPLETED" else: + open(failed_wrapper,'w').close() + open(failed_path, 'w').close() print datetime.now(), "The job ", current.template," has FAILED" - {1} - """).format(jobs_list, self.exit_thread, '\n'.ljust(13)), 4) + #{1} + """).format(jobs_list, self.exit_thread, '\n'.ljust(13)), 8) return sequential_threads_launcher def build_parallel_threads_launcher(self, jobs_list, thread, footer=True): 
parallel_threads_launcher = textwrap.dedent(""" - pid_list = [] - - for i in range(len({0})): - if type({0}[i]) != list: - job = {0}[i] - jobname = job.replace(".cmd", '') - section = jobname.split('_')[-1] - - {2} - current = {1}({0}[i], i+self.id_run) - pid_list.append(current) - current.start() - - # Waiting until all scripts finish - for i in range(len(pid_list)): - pid = pid_list[i] - pid.join() +pid_list = [] +for i in range(len({0})): + if type({0}[i]) != list: + job = {0}[i] + jobname = job.replace(".cmd", '') + section = jobname.split('_')[-1] + {2} + current = {1}({0}[i], i+self.id_run) + pid_list.append(current) + current.start() +# Waiting until all scripts finish +for i in range(len(pid_list)): + pid = pid_list[i] + pid.join() """).format(jobs_list, thread, self._indent(self.build_machinefiles(), 8), '\n'.ljust(13)) - if footer: parallel_threads_launcher += self._indent(textwrap.dedent(""" - completed_filename = {0}[i].replace('.cmd', '_COMPLETED') - completed_path = os.path.join(os.getcwd(), completed_filename) - if os.path.exists(completed_path): - print datetime.now(), "The job ", pid.template," has been COMPLETED" - else: - print datetime.now(), "The job ", pid.template," has FAILED" - {1} - """).format(jobs_list, self.exit_thread, '\n'.ljust(13)), 4) - + completed_filename = {0}[i].replace('.cmd', '_COMPLETED') + completed_path = os.path.join(os.getcwd(), completed_filename) + failed_filename = {0}[i].replace('.cmd', '_FAILED') + failed_path = os.path.join(os.getcwd(),failed_filename) + failed_wrapper = os.path.join(os.getcwd(),wrapper_id) + if os.path.exists(completed_path): + print datetime.now(), "The job ", pid.template," has been COMPLETED" + else: + open(failed_wrapper, 'w').close() + open(failed_path, 'w').close() + print datetime.now(), "The job ", pid.template," has FAILED" + """).format(jobs_list, self.exit_thread, '\n'.ljust(13)), 4) return parallel_threads_launcher def build_parallel_threads_launcher_horizontal(self, jobs_list, thread, footer=True): parallel_threads_launcher = textwrap.dedent(""" - pid_list = [] - - for i in range(len({0})): - if type({0}[i]) != list: - job = {0}[i] - jobname = job.replace(".cmd", '') - section = jobname.split('_')[-1] - - {2} - current = {1}({0}[i], i) - pid_list.append(current) - current.start() - - # Waiting until all scripts finish - for i in range(len(pid_list)): - pid = pid_list[i] - pid.join() +pid_list = [] +for i in range(len({0})): + if type({0}[i]) != list: + job = {0}[i] + jobname = job.replace(".cmd", '') + section = jobname.split('_')[-1] + + {2} + current = {1}({0}[i], i) + pid_list.append(current) + current.start() + +# Waiting until all scripts finish +for i in range(len(pid_list)): + pid = pid_list[i] + pid.join() """).format(jobs_list, thread, self._indent(self.build_machinefiles(), 8), '\n'.ljust(13)) - if footer: parallel_threads_launcher += self._indent(textwrap.dedent(""" - completed_filename = {0}[i].replace('.cmd', '_COMPLETED') - completed_path = os.path.join(os.getcwd(), completed_filename) - if os.path.exists(completed_path): - print datetime.now(), "The job ", pid.template," has been COMPLETED" - else: - print datetime.now(), "The job ", pid.template," has FAILED" - {1} - """).format(jobs_list, self.exit_thread, '\n'.ljust(13)), 4) + completed_filename = {0}[i].replace('.cmd', '_COMPLETED') + completed_path = os.path.join(os.getcwd(), completed_filename) + failed_filename = {0}[i].replace('.cmd', '_FAILED') + failed_path = os.path.join(os.getcwd(),failed_filename) + failed_wrapper = 
os.path.join(os.getcwd(),wrapper_id) + Failed = False + if os.path.exists(completed_path): + print datetime.now(), "The job ", pid.template," has been COMPLETED" + else: + open(failed_wrapper, 'w').close() + open(failed_path, 'w').close() + print datetime.now(), "The job ", pid.template," has FAILED" + """).format(jobs_list, self.exit_thread, '\n'.ljust(13)), 4) return parallel_threads_launcher def build_parallel_threads_launcher_vertical_horizontal(self, jobs_list, thread, footer=True): parallel_threads_launcher = textwrap.dedent(""" - pid_list = [] - - for i in range(len({0})): - if type({0}[i]) != list: - job = {0}[i] - jobname = job.replace(".cmd", '') - section = jobname.split('_')[-1] - - {2} - current = {1}({0}[i], i) - pid_list.append(current) - current.start() - - # Waiting until all scripts finish - for i in range(len(pid_list)): - pid = pid_list[i] - pid.join() +pid_list = [] +for i in range(len({0})): + if type({0}[i]) != list: + job = {0}[i] + jobname = job.replace(".cmd", '') + section = jobname.split('_')[-1] + + {2} + current = {1}({0}[i], i) + pid_list.append(current) + current.start() + +# Waiting until all scripts finish +for i in range(len(pid_list)): + pid = pid_list[i] + pid.join() """).format(jobs_list, thread, self._indent(self.build_machinefiles(), 8), '\n'.ljust(13)) - if footer: parallel_threads_launcher += self._indent(textwrap.dedent(""" - completed_filename = {0}[i].replace('.cmd', '_COMPLETED') - completed_path = os.path.join(os.getcwd(), completed_filename) - if os.path.exists(completed_path): - print datetime.now(), "The job ", pid.template," has been COMPLETED" - else: - print datetime.now(), "The job ", pid.template," has FAILED" - {1} - """).format(jobs_list, self.exit_thread, '\n'.ljust(13)), 4) + completed_filename = {0}[i].replace('.cmd', '_COMPLETED') + completed_path = os.path.join(os.getcwd(), completed_filename) + failed_filename = {0}[i].replace('.cmd', '_FAILED') + failed_path = os.path.join(os.getcwd(),failed_filename) + failed_wrapper = os.path.join(os.getcwd(),wrapper_id) + Failed = False + if os.path.exists(completed_path): + print datetime.now(), "The job ", pid.template," has been COMPLETED" + else: + open(failed_wrapper, 'w').close() + open(failed_path, 'w').close() + print datetime.now(), "The job ", pid.template," has FAILED" + """).format(jobs_list, self.exit_thread, '\n'.ljust(13)), 4) return parallel_threads_launcher # all should override -> abstract! 
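# --- Illustrative sketch, not part of the patch ---
# The launcher bodies above implement a failure handshake: after each inner
# job finishes, the generated script looks for "<script>_COMPLETED"; if it is
# missing it drops a "<script>_FAILED" marker plus the per-wrapper sentinel
# (the random wrapper_id file), and the sequential launcher later promotes
# that sentinel to a "WRAPPER_FAILED" file and exits non-zero so the
# Autosubmit side can notice. The helpers below are hypothetical names that
# only mirror that logic under those file-naming assumptions.
import os

def mark_result(script, wrapper_id, cwd=None):
    """Return True if the inner job completed; otherwise write the markers."""
    cwd = cwd or os.getcwd()
    completed = os.path.join(cwd, script.replace('.cmd', '_COMPLETED'))
    if os.path.exists(completed):
        return True
    # No COMPLETED file: record the individual failure and flag the wrapper.
    open(os.path.join(cwd, script.replace('.cmd', '_FAILED')), 'w').close()
    open(os.path.join(cwd, wrapper_id), 'w').close()
    return False

def finish_wrapper(wrapper_id, cwd=None):
    """Turn the per-wrapper sentinel into WRAPPER_FAILED and abort, as the
    sequential launcher does once all threads have been joined."""
    cwd = cwd or os.getcwd()
    sentinel = os.path.join(cwd, wrapper_id)
    if os.path.exists(sentinel):
        os.remove(sentinel)
        open(os.path.join(cwd, 'WRAPPER_FAILED'), 'w').close()
        os._exit(1)
# --- end sketch ---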
@@ -444,35 +480,37 @@ class PythonVerticalHorizontalWrapperBuilder(PythonWrapperBuilder): class PythonHorizontalVerticalWrapperBuilder(PythonWrapperBuilder): def build_parallel_threads_launcher_horizontal_vertical(self, jobs_list, thread, footer=True): parallel_threads_launcher = textwrap.dedent(""" - pid_list = [] - - for i in range(len({0})): - if type({0}[i]) != list: - job = {0}[i] - jobname = job.replace(".cmd", '') - section = jobname.split('_')[-1] - - {2} - current = {1}({0}[i], i+self.id_run) - pid_list.append(current) - current.start() - - # Waiting until all scripts finish - for i in range(len(pid_list)): - pid = pid_list[i] - pid.join() +pid_list = [] +for i in range(len({0})): + if type({0}[i]) != list: + job = {0}[i] + jobname = job.replace(".cmd", '') + section = jobname.split('_')[-1] + + {2} + current = {1}({0}[i], i+self.id_run) + pid_list.append(current) + current.start() + +# Waiting until all scripts finish +for i in range(len(pid_list)): + pid = pid_list[i] + pid.join() """).format(jobs_list, thread, self._indent(self.build_machinefiles(), 8), '\n'.ljust(13)) if footer: parallel_threads_launcher += self._indent(textwrap.dedent(""" - completed_filename = {0}[i].replace('.cmd', '_COMPLETED') - completed_path = os.path.join(os.getcwd(), completed_filename) - if os.path.exists(completed_path): - print datetime.now(), "The job ", pid.template," has been COMPLETED" - else: - print datetime.now(), "The job ", pid.template," has FAILED" - {1} - """).format(jobs_list, self.exit_thread, '\n'.ljust(13)), 4) - + completed_filename = {0}[i].replace('.cmd', '_COMPLETED') + completed_path = os.path.join(os.getcwd(), completed_filename) + failed_filename = {0}[i].replace('.cmd', '_FAILED') + failed_path = os.path.join(os.getcwd(),failed_filename) + failed_wrapper = os.path.join(os.getcwd(),wrapper_id) + if os.path.exists(completed_path): + print datetime.now(), "The job ", pid.template," has been COMPLETED" + else: + open(failed_wrapper, 'w').close() + open(failed_path, 'w').close() + print datetime.now(), "The job ", pid.template," has FAILED" + """).format(jobs_list, self.exit_thread, '\n'.ljust(13)), 4) return parallel_threads_launcher def build_joblist_thread(self): return textwrap.dedent(""" @@ -602,23 +640,20 @@ class SrunWrapperBuilder(WrapperBuilder): def build_cores_list(self): return textwrap.dedent(""" - total_cores = {0} - jobs_resources = {1} - - processors_per_node = int(jobs_resources['PROCESSORS_PER_NODE']) - - idx = 0 - all_cores = [] - while total_cores > 0: - if processors_per_node > 0: - processors_per_node -= 1 - total_cores -= 1 - all_cores.append(all_nodes[idx]) - else: - idx += 1 - processors_per_node = int(jobs_resources['PROCESSORS_PER_NODE']) - +total_cores = {0} +jobs_resources = {1} +processors_per_node = int(jobs_resources['PROCESSORS_PER_NODE']) +idx = 0 +all_cores = [] +while total_cores > 0: + if processors_per_node > 0: + processors_per_node -= 1 + total_cores -= 1 + all_cores.append(all_nodes[idx]) + else: + idx += 1 processors_per_node = int(jobs_resources['PROCESSORS_PER_NODE']) +processors_per_node = int(jobs_resources['PROCESSORS_PER_NODE']) """).format(self.num_procs, str(self.jobs_resources), '\n'.ljust(13)) def build_machinefiles(self): @@ -629,28 +664,26 @@ class SrunWrapperBuilder(WrapperBuilder): def build_machinefiles_standard(self): return textwrap.dedent(""" - machines = str() - - cores = int(jobs_resources[section]['PROCESSORS']) - tasks = int(jobs_resources[section]['TASKS']) - nodes = int(ceil(int(cores)/float(tasks))) - if tasks < 
processors_per_node: - cores = tasks - - job_cores = cores - while nodes > 0: - while cores > 0: - if len(all_cores) > 0: - node = all_cores.pop(0) - if node: - machines += node +"_NEWLINE_" - cores -= 1 - for rest in range(processors_per_node-tasks): - if len(all_cores) > 0: - all_cores.pop(0) - nodes -= 1 + machines = str() + cores = int(jobs_resources[section]['PROCESSORS']) + tasks = int(jobs_resources[section]['TASKS']) + nodes = int(ceil(int(cores)/float(tasks))) if tasks < processors_per_node: - cores = job_cores + cores = tasks + job_cores = cores + while nodes > 0: + while cores > 0: + if len(all_cores) > 0: + node = all_cores.pop(0) + if node: + machines += node +"_NEWLINE_" + cores -= 1 + for rest in range(processors_per_node-tasks): + if len(all_cores) > 0: + all_cores.pop(0) + nodes -= 1 + if tasks < processors_per_node: + cores = job_cores """).format('\n'.ljust(13)) def _create_components_dict(self): diff --git a/docs/source/usage/wrappers.rst b/docs/source/usage/wrappers.rst index c961f1a19693275e2a7a68f9e0e1e9518fbc3adb..6f52f0b2c1ee295890ad87068c86e8b4ccf5c832 100644 --- a/docs/source/usage/wrappers.rst +++ b/docs/source/usage/wrappers.rst @@ -101,7 +101,7 @@ Additionally, jobs are grouped within the corresponding date, member and chunk h [wrapper] TYPE = vertical-mixed - JOBS_IN_WRAPPER = # REQUIRED + JOBS_IN_WRAPPER = # REQUIRED Horizontal wrapper @@ -157,7 +157,7 @@ Horizontal-vertical [wrapper] TYPE = horizontal-vertical MACHINEFILES = STANDARD - JOBS_IN_WRAPPER = SIM POST + JOBS_IN_WRAPPER = SIM&POST .. figure:: ../workflows/horizontal-vertical.png :name: wrapper_horizontal_vertical @@ -178,7 +178,7 @@ Vertical-horizontal [wrapper] TYPE = vertical-horizontal MACHINEFILES = STANDARD - JOBS_IN_WRAPPER = SIM POST + JOBS_IN_WRAPPER = SIM&POST .. figure:: ../workflows/vertical-horizontal.png :name: wrapper_vertical_horizontal @@ -234,7 +234,7 @@ Considering a very simple workflow with the configurations as follows: [wrapper] TYPE = vertical-mixed - JOBS_IN_WRAPPER = SIM POST + JOBS_IN_WRAPPER = SIM&POST .. figure:: ../workflows/wrapper.png @@ -250,7 +250,7 @@ Horizontal wrapper with remote dependencies [wrapper] TYPE = horizontal - JOBS_IN_WRAPPER = SIM POST + JOBS_IN_WRAPPER = SIM&POST .. 
figure:: ../workflows/horizontal_remote.png :name: horizontal_remote diff --git a/simple_test.py b/simple_test.py index 5fbbc0d55ea3ff44a7fbf30a1cc870bc922ea679..df1bee2d612a62e266aa19ec032bd4f248bfa981 100644 --- a/simple_test.py +++ b/simple_test.py @@ -33,7 +33,7 @@ # if job.platform_name is None: # job.platform_name = "marenostrum4" # # noinspection PyTypeChecker -# job.platform = submitter.platforms[job.platform_name.lower( +# job._platform = submitter.platforms[job.platform_name.lower( # )] # list_jobs = job_list.get_job_list() diff --git a/test/unit/test_job.py b/test/unit/test_job.py index 61967ae8b0b534d181047d6c4819ee3928e50f23..5248377d4ad07233ea70432c9a9b6e84dc31e958 100644 --- a/test/unit/test_job.py +++ b/test/unit/test_job.py @@ -43,7 +43,7 @@ class TestJob(TestCase): platform.serial_platform = 'serial-platform' self.job._platform = platform - self.job.processors = '1' + self.job.processors = 1 returned_platform = self.job.platform @@ -51,7 +51,7 @@ class TestJob(TestCase): def test_set_platform(self): dummy_platform = Platform('whatever', 'rand-name', FakeBasicConfig) - self.assertNotEquals(dummy_platform, self.job._platform) + self.assertNotEquals(dummy_platform, self.job.platform) self.job.platform = dummy_platform @@ -89,7 +89,7 @@ class TestJob(TestCase): dummy_platform.serial_platform = dummy_serial_platform dummy_platform.queue = parallel_queue - self.job.platform = dummy_platform + self.job._platform = dummy_platform self.job.processors = '1' self.assertIsNone(self.job._queue) diff --git a/test/unit/test_job_package.py b/test/unit/test_job_package.py index 42be51493c8f54373374cb07c585aec58f837d15..a695c95bbec9dd43935701b4058766a334331ea4 100644 --- a/test/unit/test_job_package.py +++ b/test/unit/test_job_package.py @@ -15,7 +15,7 @@ class TestJobPackage(TestCase): self.platform = Mock() self.jobs = [Job('dummy1', 0, Status.READY, 0), Job('dummy2', 0, Status.READY, 0)] - self.jobs[0].platform = self.jobs[1].platform = self.platform + self.jobs[0]._platform = self.jobs[1]._platform = self.platform self.job_package = JobPackageSimple(self.jobs) def test_job_package_default_init(self): @@ -23,14 +23,14 @@ class TestJobPackage(TestCase): JobPackageSimple([]) def test_job_package_different_platforms_init(self): - self.jobs[0].platform = Mock() - self.jobs[1].platform = Mock() + self.jobs[0]._platform = Mock() + self.jobs[1]._platform = Mock() with self.assertRaises(Exception): JobPackageSimple(this.jobs) def test_job_package_none_platforms_init(self): - self.jobs[0].platform = None - self.jobs[1].platform = None + self.jobs[0]._platform = None + self.jobs[1]._platform = None with self.assertRaises(Exception): JobPackageSimple(this.jobs) @@ -41,7 +41,7 @@ class TestJobPackage(TestCase): self.assertEquals(self.jobs, self.job_package.jobs) def test_job_package_platform_getter(self): - self.assertEquals(self.platform.serial_platform, self.job_package.platform) + self.assertEquals(self.platform, self.job_package.platform) def test_job_package_submission(self): # arrange diff --git a/test/unit/test_wrappers.py b/test/unit/test_wrappers.py index ebb9c196084d50d92716b5ab059872009e48385e..aeb192f9e1ebd2d5b4f50c6208be3b48557e3d19 100644 --- a/test/unit/test_wrappers.py +++ b/test/unit/test_wrappers.py @@ -151,13 +151,13 @@ class TestWrappers(TestCase): def setUp(self): self.experiment_id = 'random-id' self.config = FakeBasicConfig - self.platform = Mock() + self._platform = Mock() self.job_list = JobList(self.experiment_id, self.config, ConfigParserFactory(), 
JobListPersistenceDb('.', '.')) self.parser_mock = Mock(spec='SafeConfigParser') - self.platform.max_waiting_jobs = 100 - self.platform.total_jobs = 100 + self._platform.max_waiting_jobs = 100 + self._platform.total_jobs = 100 self.config.get_wrapper_type = Mock(return_value='vertical') self.config.get_wrapper_crossdate = Mock(return_value=False) self.config.get_remote_dependencies = Mock(return_value=False) @@ -167,7 +167,7 @@ class TestWrappers(TestCase): self.config.get_wrapper_policy = Mock(return_value='flexible') self.job_packager = JobPackager( - self.config, self.platform, self.job_list) + self.config, self._platform, self.job_list) ### ONE SECTION WRAPPER ### def test_returned_packages(self): @@ -1311,7 +1311,7 @@ class TestWrappers(TestCase): job.packed = False job.hold = False job.wallclock = total_wallclock - job.platform = self.platform + job.platform = self._platform job.date = date job.member = member
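The platform and WrapperJob changes above consume the markers written by the generated launchers: check_status now asks the platform whether WRAPPER_FAILED exists and, for each inner job still running, whether "<job>_FAILED" exists, and only cancels the wrapper once no inner jobs remain running. A minimal sketch of that control flow follows, assuming a platform object exposing check_file_exists(name, wrapper_failed=...) and delete_file(name) as used in the patch; the standalone function and its arguments are illustrative, not the patched method itself.

def wrapper_failure_detected(platform, inner_jobs_running):
    """Return the inner jobs confirmed failed, mirroring the
    WRAPPER_FAILED / <job>_FAILED handshake introduced by this patch."""
    failed = []
    # wrapper_failed=True tells check_file_exists not to keep retrying with
    # back-off, so a missing marker is answered quickly.
    if not platform.check_file_exists('WRAPPER_FAILED', wrapper_failed=True):
        return failed
    for job in inner_jobs_running:
        if platform.check_file_exists('{0}_FAILED'.format(job.name), wrapper_failed=True):
            failed.append(job)
    if failed:
        # Clear the wrapper-level sentinel once the failures are attributed.
        platform.delete_file('WRAPPER_FAILED')
    return failed

In the patch the same decision is split between check_status and update_failed_jobs, with cancel_failed_wrapper_job called only after the list of running inner jobs drains.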