diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py
index 6abc3cc4566f9dfa6362f6dc2814add7542dea94..bb243e0d5c16a2b6c3e50b3f344cc161588f01de 100644
--- a/autosubmit/autosubmit.py
+++ b/autosubmit/autosubmit.py
@@ -2386,9 +2386,6 @@ class Autosubmit:
                                                                hold=hold)
                     # Jobs that are being retrieved in batch. Right now, only available for slurm platforms.
                     if not inspect and len(valid_packages_to_submit) > 0:
-                        for package in (package for package in valid_packages_to_submit):
-                            for job in (job for job in package.jobs):
-                                job._clean_runtime_parameters()
                         job_list.save()
                     save_2 = False
                     if platform.type.lower() in [ "slurm" , "pjm" ] and not inspect and not only_wrappers:
@@ -2397,9 +2394,6 @@ class Autosubmit:
                                                                          failed_packages, error_message="", hold=hold)
                         if not inspect and len(valid_packages_to_submit) > 0:
-                            for package in (package for package in valid_packages_to_submit):
-                                for job in (job for job in package.jobs):
-                                    job._clean_runtime_parameters()
                             job_list.save()
                         # Save wrappers(jobs that has the same id) to be visualized and checked in other parts of the code
                         job_list.save_wrappers(valid_packages_to_submit, failed_packages, as_conf, packages_persistence,
@@ -3414,7 +3408,6 @@ class Autosubmit:
         try:
             for job in job_list.get_job_list():
                 job_parameters = job.update_parameters(as_conf, {})
-                job._clean_runtime_parameters()
                 for key, value in job_parameters.items():
                     jobs_parameters["JOBS"+"."+job.section+"."+key] = value
         except:
diff --git a/autosubmit/history/database_managers/experiment_history_db_manager.py b/autosubmit/history/database_managers/experiment_history_db_manager.py
index 9e5662af6943de368c61428c1125e25bdfb642c1..8df415c94682435e781588f939a850301bf784f3 100644
--- a/autosubmit/history/database_managers/experiment_history_db_manager.py
+++ b/autosubmit/history/database_managers/experiment_history_db_manager.py
@@ -234,6 +234,9 @@ class ExperimentHistoryDbManager(DatabaseManager):
         statement = self.get_built_select_statement("job_data", "last=1 and job_name=? ORDER BY counter DESC")
         arguments = (job_name,)
         job_data_rows_last = self.get_from_statement_with_arguments(self.historicaldb_file_path, statement, arguments)
+        if not job_data_rows_last:  # the previous job didn't finish, but a new "create" has been run since
+            statement = self.get_built_select_statement("job_data", "last=0 and job_name=? ORDER BY counter DESC")
+            job_data_rows_last = self.get_from_statement_with_arguments(self.historicaldb_file_path, statement, arguments)
         return [Models.JobDataRow(*row) for row in job_data_rows_last]

     def get_job_data_dcs_last_by_run_id(self, run_id):
diff --git a/autosubmit/history/experiment_history.py b/autosubmit/history/experiment_history.py
index 7f6a496487edbd0d1b891f375fa3594326851fb9..ee0558edd7e3847440e9aa3432a817157d45cad4 100644
--- a/autosubmit/history/experiment_history.py
+++ b/autosubmit/history/experiment_history.py
@@ -26,12 +26,13 @@ from .data_classes.job_data import JobData
 from .data_classes.experiment_run import ExperimentRun
 from .platform_monitor.slurm_monitor import SlurmMonitor
 from .internal_logging import Logging
+from log.log import Log
 from autosubmitconfigparser.config.basicconfig import BasicConfig

 SECONDS_WAIT_PLATFORM = 60

 class ExperimentHistory:
-    def __init__(self, expid, jobdata_dir_path=DEFAULT_JOBDATA_DIR, historiclog_dir_path=DEFAULT_HISTORICAL_LOGS_DIR):
+    def __init__(self, expid, jobdata_dir_path=DEFAULT_JOBDATA_DIR, historiclog_dir_path=DEFAULT_HISTORICAL_LOGS_DIR):
         self.expid = expid
         BasicConfig.read()
         self._log = Logging(expid, BasicConfig.HISTORICAL_LOG_DIR)
@@ -41,39 +42,42 @@ class ExperimentHistory:
             self.manager = ExperimentHistoryDbManager(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR)
         except Exception as exp:
             self._log.log(str(exp), traceback.format_exc())
-            self.manager = None
+            Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}')
+            self.manager = None

     def initialize_database(self):
         try:
-            self.manager.initialize()
+            self.manager.initialize()
         except Exception as exp:
             self._log.log(str(exp), traceback.format_exc())
+            Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}')
+            self.manager = None
-
+
     def is_header_ready(self):
-        if self.manager:
-            return self.manager.is_header_ready_db_version()
+        if self.manager:
+            return self.manager.is_header_ready_db_version()
         return False
-
-    def write_submit_time(self, job_name, submit=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="",
+
+    def write_submit_time(self, job_name, submit=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="",
                           member="", section="", chunk=0, platform="NA", job_id=0, wrapper_queue=None, wrapper_code=None, children=""):
         try:
             next_counter = self._get_next_counter_by_job_name(job_name)
             current_experiment_run = self.manager.get_experiment_run_dc_with_max_id()
-            job_data_dc = JobData(_id=0,
-                                  counter=next_counter,
-                                  job_name=job_name,
-                                  submit=submit,
-                                  status=status,
-                                  rowtype=self._get_defined_rowtype(wrapper_code),
-                                  ncpus=ncpus,
-                                  wallclock=wallclock,
-                                  qos=self._get_defined_queue_name(wrapper_queue, wrapper_code, qos),
+            job_data_dc = JobData(_id=0,
+                                  counter=next_counter,
+                                  job_name=job_name,
+                                  submit=submit,
+                                  status=status,
+                                  rowtype=self._get_defined_rowtype(wrapper_code),
+                                  ncpus=ncpus,
+                                  wallclock=wallclock,
+                                  qos=self._get_defined_queue_name(wrapper_queue, wrapper_code, qos),
                                   date=date,
                                   member=member,
                                   section=section,
-                                  chunk=chunk,
+                                  chunk=chunk,
                                   platform=platform,
                                   job_id=job_id,
                                   children=children,
@@ -81,25 +85,27 @@ class ExperimentHistory:
             return self.manager.register_submitted_job_data_dc(job_data_dc)
         except Exception as exp:
             self._log.log(str(exp), traceback.format_exc())
+            Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}')
+            return None
-
+
     def write_start_time(self, job_name, start=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug",
date="", member="", section="", chunk=0, platform="NA", job_id=0, wrapper_queue=None, wrapper_code=None, children=""): try: job_data_dc_last = self.manager.get_job_data_dc_unique_latest_by_job_name(job_name) if not job_data_dc_last: - job_data_dc_last = self.write_submit_time(job_name=job_name, - status=status, - ncpus=ncpus, - wallclock=wallclock, - qos=qos, - date=date, - member=member, - section=section, - chunk=chunk, - platform=platform, - job_id=job_id, - wrapper_queue=wrapper_queue, + job_data_dc_last = self.write_submit_time(job_name=job_name, + status=status, + ncpus=ncpus, + wallclock=wallclock, + qos=qos, + date=date, + member=member, + section=section, + chunk=chunk, + platform=platform, + job_id=job_id, + wrapper_queue=wrapper_queue, wrapper_code=wrapper_code) self._log.log("write_start_time {0} start not found.".format(job_name)) job_data_dc_last = self.manager.get_job_data_dc_unique_latest_by_job_name(job_name) @@ -114,26 +120,28 @@ class ExperimentHistory: return self.manager.update_job_data_dc_by_id(job_data_dc_last) except Exception as exp: self._log.log(str(exp), traceback.format_exc()) - - def write_finish_time(self, job_name, finish=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", - member="", section="", chunk=0, platform="NA", job_id=0, out_file=None, err_file=None, + Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') + + + def write_finish_time(self, job_name, finish=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", + member="", section="", chunk=0, platform="NA", job_id=0, out_file=None, err_file=None, wrapper_queue=None, wrapper_code=None, children=""): try: job_data_dc_last = self.manager.get_job_data_dc_unique_latest_by_job_name(job_name) if not job_data_dc_last: - job_data_dc_last = self.write_submit_time(job_name=job_name, - status=status, - ncpus=ncpus, - wallclock=wallclock, - qos=qos, - date=date, - member=member, - section=section, - chunk=chunk, - platform=platform, - job_id=job_id, - wrapper_queue=wrapper_queue, - wrapper_code=wrapper_code, + job_data_dc_last = self.write_submit_time(job_name=job_name, + status=status, + ncpus=ncpus, + wallclock=wallclock, + qos=qos, + date=date, + member=member, + section=section, + chunk=chunk, + platform=platform, + job_id=job_id, + wrapper_queue=wrapper_queue, + wrapper_code=wrapper_code, children=children) self._log.log("write_finish_time {0} submit not found.".format(job_name)) job_data_dc_last = self.manager.get_job_data_dc_unique_latest_by_job_name(job_name) @@ -141,26 +149,28 @@ class ExperimentHistory: raise Exception("Job {0} has not been found in the database.".format(job_name)) job_data_dc_last.finish = finish if finish > 0 else int(time()) job_data_dc_last.status = status - job_data_dc_last.job_id = job_id + job_data_dc_last.job_id = job_id job_data_dc_last.rowstatus = Models.RowStatus.PENDING_PROCESS job_data_dc_last.out = out_file if out_file else "" job_data_dc_last.err = err_file if err_file else "" return self.manager.update_job_data_dc_by_id(job_data_dc_last) except Exception as exp: - self._log.log(str(exp), traceback.format_exc()) - + self._log.log(str(exp), traceback.format_exc()) + Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') + + def write_platform_data_after_finish(self, job_data_dc, platform_obj): - """ + """ Call it in a thread. 
""" try: sleep(SECONDS_WAIT_PLATFORM) - ssh_output = platform_obj.check_job_energy(job_data_dc.job_id) + ssh_output = platform_obj.check_job_energy(job_data_dc.job_id) slurm_monitor = SlurmMonitor(ssh_output) self._verify_slurm_monitor(slurm_monitor, job_data_dc) job_data_dcs_in_wrapper = self.manager.get_job_data_dcs_last_by_wrapper_code(job_data_dc.wrapper_code) job_data_dcs_in_wrapper = sorted([job for job in job_data_dcs_in_wrapper if job.status == "COMPLETED"], key=lambda x: x._id) - job_data_dcs_to_update = [] + job_data_dcs_to_update = [] if len(job_data_dcs_in_wrapper) > 0: info_handler = PlatformInformationHandler(StraightWrapperAssociationStrategy(self._historiclog_dir_path)) job_data_dcs_to_update = info_handler.execute_distribution(job_data_dc, job_data_dcs_in_wrapper, slurm_monitor) @@ -172,21 +182,27 @@ class ExperimentHistory: job_data_dcs_to_update = info_handler.execute_distribution(job_data_dc, job_data_dcs_in_wrapper, slurm_monitor) else: info_handler = PlatformInformationHandler(SingleAssociationStrategy(self._historiclog_dir_path)) - job_data_dcs_to_update = info_handler.execute_distribution(job_data_dc, job_data_dcs_in_wrapper, slurm_monitor) - return self.manager.update_list_job_data_dc_by_each_id(job_data_dcs_to_update) + job_data_dcs_to_update = info_handler.execute_distribution(job_data_dc, job_data_dcs_in_wrapper, slurm_monitor) + return self.manager.update_list_job_data_dc_by_each_id(job_data_dcs_to_update) except Exception as exp: - self._log.log(str(exp), traceback.format_exc()) + self._log.log(str(exp), traceback.format_exc()) + Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') + def _verify_slurm_monitor(self, slurm_monitor, job_data_dc): try: if slurm_monitor.header.status not in ["COMPLETED", "FAILED"]: - self._log.log("Assertion Error on job {0} with ssh_output {1}".format(job_data_dc.job_name, slurm_monitor.original_input), + self._log.log("Assertion Error on job {0} with ssh_output {1}".format(job_data_dc.job_name, slurm_monitor.original_input), "Slurm status {0} is not COMPLETED nor FAILED for ID {1}.\n".format(slurm_monitor.header.status, slurm_monitor.header.name)) + Log.debug(f'Historical Database error: Slurm status {slurm_monitor.header.status} is not COMPLETED nor FAILED for ID {slurm_monitor.header.name}.') if not slurm_monitor.steps_plus_extern_approximate_header_energy(): self._log.log("Assertion Error on job {0} with ssh_output {1}".format(job_data_dc.job_name, slurm_monitor.original_input), "Steps + extern != total energy for ID {0}. Number of steps {1}.\n".format(slurm_monitor.header.name, slurm_monitor.step_count)) + Log.debug(f'Historical Database error: Steps + extern != total energy for ID {slurm_monitor.header.name}. Number of steps {slurm_monitor.step_count}.') except Exception as exp: - self._log.log(str(exp), traceback.format_exc()) + self._log.log(str(exp), traceback.format_exc()) + Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') + def process_status_changes(self, job_list=None, chunk_unit="NA", chunk_size=0, current_config="",create=False): """ Detect status differences between job_list and current job_data rows, and update. Creates a new run if necessary. 
""" @@ -206,7 +222,9 @@ class ExperimentHistory: return self.update_counts_on_experiment_run_dc(current_experiment_run_dc, job_list) except Exception as exp: self._log.log(str(exp), traceback.format_exc()) - + Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') + + def _get_built_list_of_changes(self, job_list): """ Return: List of (current timestamp, current datetime str, status, rowstatus, id in job_data). One tuple per change. """ job_data_dcs = self.detect_changes_in_job_list(job_list) @@ -215,11 +233,13 @@ class ExperimentHistory: def process_job_list_changes_to_experiment_totals(self, job_list=None): """ Updates current experiment_run row with totals calculated from job_list. """ try: - current_experiment_run_dc = self.manager.get_experiment_run_dc_with_max_id() + current_experiment_run_dc = self.manager.get_experiment_run_dc_with_max_id() return self.update_counts_on_experiment_run_dc(current_experiment_run_dc, job_list) except Exception as exp: self._log.log(str(exp), traceback.format_exc()) - + Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') + + def should_we_create_a_new_run(self, job_list, changes_count, current_experiment_run_dc, new_chunk_unit, new_chunk_size,create=False): if create: return True @@ -229,7 +249,7 @@ class ExperimentHistory: if changes_count > int(self._get_date_member_completed_count(job_list)): return True return self._chunk_config_has_changed(current_experiment_run_dc, new_chunk_unit, new_chunk_size) - + def _chunk_config_has_changed(self, current_exp_run_dc, new_chunk_unit, new_chunk_size): if not current_exp_run_dc: return True @@ -264,19 +284,19 @@ class ExperimentHistory: def _create_new_experiment_run_dc_with_counts(self, chunk_unit, chunk_size, current_config="", job_list=None): """ Create new experiment_run row and return the new Models.ExperimentRun data class from database. """ status_counts = self.get_status_counts_from_job_list(job_list) - experiment_run_dc = ExperimentRun(0, - chunk_unit=chunk_unit, - chunk_size=chunk_size, - metadata=current_config, + experiment_run_dc = ExperimentRun(0, + chunk_unit=chunk_unit, + chunk_size=chunk_size, + metadata=current_config, start=int(time()), - completed=status_counts[HUtils.SupportedStatus.COMPLETED], - total=status_counts["TOTAL"], - failed=status_counts[HUtils.SupportedStatus.FAILED], - queuing=status_counts[HUtils.SupportedStatus.QUEUING], - running=status_counts[HUtils.SupportedStatus.RUNNING], - submitted=status_counts[HUtils.SupportedStatus.SUBMITTED], + completed=status_counts[HUtils.SupportedStatus.COMPLETED], + total=status_counts["TOTAL"], + failed=status_counts[HUtils.SupportedStatus.FAILED], + queuing=status_counts[HUtils.SupportedStatus.QUEUING], + running=status_counts[HUtils.SupportedStatus.RUNNING], + submitted=status_counts[HUtils.SupportedStatus.SUBMITTED], suspended=status_counts[HUtils.SupportedStatus.SUSPENDED]) - return self.manager.register_experiment_run_dc(experiment_run_dc) + return self.manager.register_experiment_run_dc(experiment_run_dc) def detect_changes_in_job_list(self, job_list): """ Detect changes in job_list compared to the current contents of job_data table. 
        Returns a list of JobData data classes where the status of each item is the new status."""
@@ -292,12 +312,12 @@ class ExperimentHistory:
                 differences.append(job_dc)
         return differences

-    def _get_defined_rowtype(self, code):
+    def _get_defined_rowtype(self, code):
         if code:
             return code
         else:
             return Models.RowType.NORMAL
-
+
     def _get_defined_queue_name(self, wrapper_queue, wrapper_code, qos):
         if wrapper_code and wrapper_code > 2 and wrapper_queue is not None and len(str(wrapper_queue)) > 0:
             return wrapper_queue
@@ -314,12 +334,12 @@ class ExperimentHistory:
     def _get_date_member_completed_count(self, job_list):
         """ Each item in the job_list must have attributes: date, member, status_str. """
-        job_list = job_list if job_list else []
+        job_list = job_list if job_list else []
         return sum(1 for job in job_list if job.date is not None and job.member is not None and job.status_str == HUtils.SupportedStatus.COMPLETED)
-
+
     def get_status_counts_from_job_list(self, job_list):
-        """
-        Return dict with keys COMPLETED, FAILED, QUEUING, SUBMITTED, RUNNING, SUSPENDED, TOTAL.
+        """
+        Return dict with keys COMPLETED, FAILED, QUEUING, SUBMITTED, RUNNING, SUSPENDED, TOTAL.
         """
         result = {
             HUtils.SupportedStatus.COMPLETED: 0,
@@ -329,14 +349,13 @@ class ExperimentHistory:
             HUtils.SupportedStatus.RUNNING: 0,
             HUtils.SupportedStatus.SUSPENDED: 0,
             "TOTAL": 0
-        }
+        }
         if not job_list:
             job_list = []
-
-        for job in job_list:
-            if job.status_str in result:
+
+        for job in job_list:
+            if job.status_str in result:
                 result[job.status_str] += 1
         result["TOTAL"] = len(job_list)
         return result
-
\ No newline at end of file
diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py
index b697211a7a68a12ab86ab1ef06adf3b1b5d5ab23..11dcc451e689362c8c34d5585bef88684d0a52da 100644
--- a/autosubmit/job/job.py
+++ b/autosubmit/job/job.py
@@ -256,17 +256,6 @@ class Job(object):
         self._memory = ''
         self._memory_per_task = ''

-    def _clean_runtime_parameters(self):
-        # hetjobs
-        self.het = None
-        self.parameters = None
-        self._tasks = None
-        self._nodes = None
-        self.default_parameters = None
-        self._threads = None
-        self._processors = None
-        self._memory = None
-        self._memory_per_task = None
     @property
     @autosubmit_parameter(name='tasktype')
     def section(self):
@@ -1709,6 +1698,9 @@ class Job(object):
         if as_conf.get_project_type() != "none":
             parameters['EXTENDED_HEADER'] = self.read_header_tailer_script(self.ext_header_path, as_conf, True)
             parameters['EXTENDED_TAILER'] = self.read_header_tailer_script(self.ext_tailer_path, as_conf, False)
+        else:  # if not set, a warning is shown when the script is checked
+            parameters['EXTENDED_HEADER'] = ""
+            parameters['EXTENDED_TAILER'] = ""
         parameters['CURRENT_QUEUE'] = self.queue
         parameters['RESERVATION'] = self.reservation
         parameters['CURRENT_EC_QUEUE'] = self.ec_queue
@@ -2158,6 +2150,7 @@ class Job(object):
                 variables_tmp = [variable[1:-1] for variable in variables_tmp]
                 variables_tmp = [variable for variable in variables_tmp if variable not in self.default_parameters]
                 variables.extend(variables_tmp)
+
         out = set(parameters).issuperset(set(variables))
         # Check if the variables in the templates are defined in the configurations
         if not out:
diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py
index 86e790791018773792c71f983cee5bec512d721c..57376e6027ef3e468f258434ec8322965dec3d1d 100644
--- a/autosubmit/job/job_packages.py
+++ b/autosubmit/job/job_packages.py
@@ -96,22 +96,16 @@ class JobPackageBase(object):
     @threaded
     def check_scripts(self,jobs,configuration, parameters,only_generate,hold):
         for job in jobs:
-            if str(job.check).lower() == str(Job.CHECK_ON_SUBMISSION).lower():
-                if only_generate:
-                    exit_ = True
-                    break
-                if not os.path.exists(os.path.join(configuration.get_project_dir(), job.file)):
-                    lock.acquire()
-                    if configuration.get_project_type().lower() != "none" and len(configuration.get_project_type()) > 0:
-                        raise AutosubmitCritical(
-                            "Template [ {0} ] using CHECK=On_submission has some empty variable {0}".format(
-                                job.name), 7014)
-                    lock.release()
-                if not job.check_script(configuration, parameters, show_logs=job.check_warnings):
-                    Log.warning("Script {0} check failed", job.name)
-                    Log.warning("On submission script has some empty variables")
-                else:
-                    Log.result("Script {0} OK", job.name)
+            if only_generate and not os.path.exists(os.path.join(configuration.get_project_dir(), job.file)):
+                break
+            else:
+                if not os.path.exists(os.path.join(configuration.get_project_dir(), job.file)) and configuration.get_project_type().lower() != "none" and len(configuration.get_project_type()) > 0:
+                    raise AutosubmitCritical(f"Job script: {job.file} does not exist", 7014)
+                if not job.check_script(configuration, parameters, show_logs=job.check_warnings):
+                    Log.warning("Script {0} check failed", job.name)
+                    Log.warning("On submission script has some empty variables")
+                else:
+                    Log.result("Script {0} OK", job.name)
             # looking for directives on jobs
             self._custom_directives = self._custom_directives | set(job.custom_directives)
     @threaded
@@ -123,6 +117,7 @@ class JobPackageBase(object):
         pass

+
     def submit(self, configuration, parameters,only_generate=False,hold=False):
         """
         :param hold:
@@ -148,21 +143,17 @@ class JobPackageBase(object):
         try:
             if len(self.jobs) < thread_number:
                 for job in self.jobs:
-                    if job.check == Job.CHECK_ON_SUBMISSION.lower():
-                        if only_generate:
-                            exit_=True
-                            break
-                        if not os.path.exists(os.path.join(configuration.get_project_dir(), job.file)):
-                            if configuration.get_project_type().lower() != "none" and len(configuration.get_project_type()) > 0:
-                                raise AutosubmitCritical("Template [ {0} ] using CHECK=On_submission has some empty variable {0}".format(job.name),7014)
-                        if not job.check_script(configuration, parameters,show_logs=job.check_warnings):
-                            Log.warning("Script {0} check failed",job.name)
-                            Log.warning("On submission script has some empty variables")
-                        else:
-                            Log.result("Script {0} OK",job.name)
-                    job.update_parameters(configuration, parameters)
-                    # Looking for special variables
-
+                    if only_generate and not os.path.exists(os.path.join(configuration.get_project_dir(), job.file)):
+                        exit_=True
+                        break
+                    if not os.path.exists(os.path.join(configuration.get_project_dir(), job.file)):
+                        if configuration.get_project_type().lower() != "none" and len(configuration.get_project_type()) > 0:
+                            raise AutosubmitCritical("Template [ {0} ] using CHECK=On_submission has some empty variable {0}".format(job.name),7014)
+                    if not job.check_script(configuration, parameters,show_logs=job.check_warnings):
+                        Log.warning("Script {0} check failed",job.name)
+                        Log.warning("On submission script has some empty variables")
+                    else:
+                        Log.result("Script {0} OK",job.name)
                     # looking for directives on jobs
                     self._custom_directives = self._custom_directives | set(job.custom_directives)
             else:
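
Reviewer note: the fallback lookup added in `experiment_history_db_manager.py` boils down to a two-step retrieval: prefer the rows flagged `last=1`, and only when none exist (the previous run never wrote its final row before a new `create` ran) fall back to `last=0` rows for the same job name, newest attempt first. The following is a minimal standalone sketch of that logic, assuming only a SQLite `job_data` table with `job_name`, `last`, and `counter` columns (the schema is inferred from the queries in the hunk, and the function name is illustrative, not Autosubmit's):

```python
import sqlite3

def get_last_job_data_rows(db_path: str, job_name: str):
    """Return the most recent job_data rows for job_name.

    Prefer rows flagged last=1; if none exist (e.g. the previous run never
    finished before a new "create"), fall back to last=0 rows.
    """
    query = "SELECT * FROM job_data WHERE last=? AND job_name=? ORDER BY counter DESC"
    with sqlite3.connect(db_path) as conn:
        rows = conn.execute(query, (1, job_name)).fetchall()
        if not rows:  # previous job didn't finish, but a new "create" ran since
            rows = conn.execute(query, (0, job_name)).fetchall()
    return rows
```

Keeping `ORDER BY counter DESC` in both branches means the caller always sees the highest-counter (most recent) attempt first, regardless of which query produced the rows.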
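
Likewise, the rewritten `check_scripts` loop in `job_packages.py` drops the per-job `CHECK=On_submission` branch and applies one rule to every job: a missing script during a generate-only pass just stops checking, a missing script during a real submission with a configured (non-"none") project is fatal, and everything else goes through the normal script validation. A simplified sketch of that control flow, with plain stand-ins (`project_dir`, `project_type`, a `JobStub` with `file`/`name` fields) for the configuration and job objects in the diff:

```python
import os
from dataclasses import dataclass

@dataclass
class JobStub:
    name: str
    file: str

    def check_script(self) -> bool:
        # stand-in for the real template/variable validation
        return True

def check_job_scripts(jobs, project_dir, project_type, only_generate=False):
    for job in jobs:
        script_missing = not os.path.exists(os.path.join(project_dir, job.file))
        if only_generate and script_missing:
            break  # generation has not produced the script yet; nothing to validate
        if script_missing and project_type.lower() not in ("", "none"):
            raise RuntimeError(f"Job script: {job.file} does not exist")
        if not job.check_script():
            print(f"Script {job.name} check failed: some variables are empty")
        else:
            print(f"Script {job.name} OK")
```

Note the guard on the fatal branch: as in the companion hunk inside `submit`, a script is only treated as fatal when it is actually missing, never merely because a project type is configured.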