diff --git a/CHANGELOG b/CHANGELOG index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..5e69a0f3d745eed4f05cca1ec60132d489c95f97 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -0,0 +1,29 @@ +4.1.2 - Bug fixes +================= +- Fixed issues with version reporting. +- Fixed issues with the duplication of jobs when using the heterogeneous option. +- Fixed some error messages. +- Fixed issues with monitoring non-owned experiments. + +4.1.1 - Workflow optimizations and bug fixes +============================================ + +Autosubmit supports much larger workflows in this version and has improved performance and memory usage. We have also fixed several bugs and added new features. + +- Improved the performance and memory usage of the workflow generation process. + - Improved the performance and memory usage of the job generation process. + - Improved the performance and memory usage of the dependency generation process. +- Improved the performance and memory usage of the workflow visualization process. +- Added a new filter to setstatus (-ftcs) to filter by split. +- Added -no-requeue to avoid requeueing jobs. +- Added a mechanism to detect duplicate jobs. +- Fixed multiple issues with splits usage. +- Fixed multiple issues with TOTALJOBS. +- Reworked the deadlock detection mechanism. +- Changed multiple debug messages to make them more straightforward. +- Changed the load/save pkl procedure. +- Fixed issues with the check command and the additional files regex. +- Added the 'previous' keyword. +- Fixed an issue with the historical db. +- Fixed an issue with historical db logs. + diff --git a/VERSION b/VERSION index ee74734aa2258df77aa09402d55798a1e2e55212..cd9b8f559effd5c55cbea68fc5eb360213a32352 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -4.1.0 +4.1.2 \ No newline at end of file diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 6abc3cc4566f9dfa6362f6dc2814add7542dea94..53e7f528a727b98ece5adb0f5eae8f128f0b4b17 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -157,18 +157,24 @@ class Autosubmit: exit = False + @staticmethod + def environ_init(): + """Initialise AS environment.""" + # Python output buffering delays the appearance of stdout and stderr + # when output is not directed to a terminal + os.environ['PYTHONUNBUFFERED'] = 'true' @staticmethod def parse_args(): """ Parse arguments given to an executable and start execution of command given """ - + Autosubmit.environ_init() try: BasicConfig.read() parser = MyParser( description='Main executable for autosubmit. 
') - parser.add_argument('-v', '--version', action='version', - version=Autosubmit.autosubmit_version) + parser.add_argument('-v', '--version', dest='version', action='store_true') + parser.add_argument('-lf', '--logfile', choices=('NO_LOG', 'INFO', 'WARNING', 'DEBUG'), default='DEBUG', type=str, help="sets file's log level.") @@ -644,18 +650,14 @@ class Autosubmit: help='Read job files generated by the inspect subcommand.') subparser.add_argument('ID', metavar='ID', help='An ID of a Workflow (eg a000) or a Job (eg a000_20220401_fc0_1_1_APPLICATION).') - args = parser.parse_args() - - if args.command is None: + args, unknown = parser.parse_known_args() + if args.version: + Log.info(Autosubmit.autosubmit_version) + return 0 + if unknown or args.command is None: parser.print_help() - parser.exit() - - except Exception as e: - if type(e) is SystemExit: # todo check - # Version keyword force an exception in parse arg due and os_exit(0) but the program is successfully finished - if "0" in str(e): - print(Autosubmit.autosubmit_version) - return 0 + return 1 + except BaseException as e: raise AutosubmitCritical( "Incorrect arguments for this command", 7011) @@ -828,6 +830,8 @@ class Autosubmit: os.mkdir(aslogs_path) if owner: os.chmod(tmp_path, 0o775) + with suppress(PermissionError, FileNotFoundError, Exception): # for -txt option + os.chmod(f'{exp_path}/status', 0o775) Log.set_file(os.path.join(aslogs_path, args.command + '.log'), "out", log_level) Log.set_file(os.path.join(aslogs_path, args.command + '_err.log'), "err") @@ -2386,9 +2390,6 @@ class Autosubmit: hold=hold) # Jobs that are being retrieved in batch. Right now, only available for slurm platforms. if not inspect and len(valid_packages_to_submit) > 0: - for package in (package for package in valid_packages_to_submit): - for job in (job for job in package.jobs): - job._clean_runtime_parameters() job_list.save() save_2 = False if platform.type.lower() in [ "slurm" , "pjm" ] and not inspect and not only_wrappers: @@ -2397,9 +2398,6 @@ class Autosubmit: failed_packages, error_message="", hold=hold) if not inspect and len(valid_packages_to_submit) > 0: - for package in (package for package in valid_packages_to_submit): - for job in (job for job in package.jobs): - job._clean_runtime_parameters() job_list.save() # Save wrappers(jobs that has the same id) to be visualized and checked in other parts of the code job_list.save_wrappers(valid_packages_to_submit, failed_packages, as_conf, packages_persistence, @@ -2482,12 +2480,10 @@ class Autosubmit: except AutosubmitCritical as e: raise except BaseException as e: - raise AutosubmitCritical("Error while checking the configuration files or loading the job_list", 7040, - str(e)) + raise finally: if profile: profiler.stop() - try: jobs = [] if not isinstance(job_list, type([])): @@ -3414,7 +3410,6 @@ class Autosubmit: try: for job in job_list.get_job_list(): job_parameters = job.update_parameters(as_conf, {}) - job._clean_runtime_parameters() for key, value in job_parameters.items(): jobs_parameters["JOBS"+"."+job.section+"."+key] = value except: @@ -4802,7 +4797,7 @@ class Autosubmit: e.trace = traceback.format_exc() raise AutosubmitCritical(e.message, e.code, e.trace) except BaseException as e: - raise AutosubmitCritical(str(e), 7070) + raise finally: if profile: profiler.stop() diff --git a/autosubmit/history/database_managers/experiment_history_db_manager.py b/autosubmit/history/database_managers/experiment_history_db_manager.py index 
9e5662af6943de368c61428c1125e25bdfb642c1..8df415c94682435e781588f939a850301bf784f3 100644 --- a/autosubmit/history/database_managers/experiment_history_db_manager.py +++ b/autosubmit/history/database_managers/experiment_history_db_manager.py @@ -234,6 +234,9 @@ class ExperimentHistoryDbManager(DatabaseManager): statement = self.get_built_select_statement("job_data", "last=1 and job_name=? ORDER BY counter DESC") arguments = (job_name,) job_data_rows_last = self.get_from_statement_with_arguments(self.historicaldb_file_path, statement, arguments) + if not job_data_rows_last: # if previous job didn't finished but a new create has been made + statement = self.get_built_select_statement("job_data", "last=0 and job_name=? ORDER BY counter DESC") + job_data_rows_last = self.get_from_statement_with_arguments(self.historicaldb_file_path, statement, arguments) return [Models.JobDataRow(*row) for row in job_data_rows_last] def get_job_data_dcs_last_by_run_id(self, run_id): diff --git a/autosubmit/history/experiment_history.py b/autosubmit/history/experiment_history.py index 7f6a496487edbd0d1b891f375fa3594326851fb9..ee0558edd7e3847440e9aa3432a817157d45cad4 100644 --- a/autosubmit/history/experiment_history.py +++ b/autosubmit/history/experiment_history.py @@ -26,12 +26,13 @@ from .data_classes.job_data import JobData from .data_classes.experiment_run import ExperimentRun from .platform_monitor.slurm_monitor import SlurmMonitor from .internal_logging import Logging +from log.log import Log from autosubmitconfigparser.config.basicconfig import BasicConfig SECONDS_WAIT_PLATFORM = 60 class ExperimentHistory: - def __init__(self, expid, jobdata_dir_path=DEFAULT_JOBDATA_DIR, historiclog_dir_path=DEFAULT_HISTORICAL_LOGS_DIR): + def __init__(self, expid, jobdata_dir_path=DEFAULT_JOBDATA_DIR, historiclog_dir_path=DEFAULT_HISTORICAL_LOGS_DIR): self.expid = expid BasicConfig.read() self._log = Logging(expid, BasicConfig.HISTORICAL_LOG_DIR) @@ -41,39 +42,42 @@ class ExperimentHistory: self.manager = ExperimentHistoryDbManager(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR) except Exception as exp: self._log.log(str(exp), traceback.format_exc()) - self.manager = None + Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') + self.manager = None def initialize_database(self): try: - self.manager.initialize() + self.manager.initialize() except Exception as exp: self._log.log(str(exp), traceback.format_exc()) + Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') + self.manager = None - + def is_header_ready(self): - if self.manager: - return self.manager.is_header_ready_db_version() + if self.manager: + return self.manager.is_header_ready_db_version() return False - - def write_submit_time(self, job_name, submit=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", + + def write_submit_time(self, job_name, submit=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0, wrapper_queue=None, wrapper_code=None, children=""): try: next_counter = self._get_next_counter_by_job_name(job_name) current_experiment_run = self.manager.get_experiment_run_dc_with_max_id() - job_data_dc = JobData(_id=0, - counter=next_counter, - job_name=job_name, - submit=submit, - status=status, - rowtype=self._get_defined_rowtype(wrapper_code), - ncpus=ncpus, - wallclock=wallclock, - qos=self._get_defined_queue_name(wrapper_queue, wrapper_code, qos), + job_data_dc = JobData(_id=0, + counter=next_counter, + 
job_name=job_name, + submit=submit, + status=status, + rowtype=self._get_defined_rowtype(wrapper_code), + ncpus=ncpus, + wallclock=wallclock, + qos=self._get_defined_queue_name(wrapper_queue, wrapper_code, qos), date=date, member=member, section=section, - chunk=chunk, + chunk=chunk, platform=platform, job_id=job_id, children=children, @@ -81,25 +85,27 @@ class ExperimentHistory: return self.manager.register_submitted_job_data_dc(job_data_dc) except Exception as exp: self._log.log(str(exp), traceback.format_exc()) + Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') + return None - + def write_start_time(self, job_name, start=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", member="", section="", chunk=0, platform="NA", job_id=0, wrapper_queue=None, wrapper_code=None, children=""): try: job_data_dc_last = self.manager.get_job_data_dc_unique_latest_by_job_name(job_name) if not job_data_dc_last: - job_data_dc_last = self.write_submit_time(job_name=job_name, - status=status, - ncpus=ncpus, - wallclock=wallclock, - qos=qos, - date=date, - member=member, - section=section, - chunk=chunk, - platform=platform, - job_id=job_id, - wrapper_queue=wrapper_queue, + job_data_dc_last = self.write_submit_time(job_name=job_name, + status=status, + ncpus=ncpus, + wallclock=wallclock, + qos=qos, + date=date, + member=member, + section=section, + chunk=chunk, + platform=platform, + job_id=job_id, + wrapper_queue=wrapper_queue, wrapper_code=wrapper_code) self._log.log("write_start_time {0} start not found.".format(job_name)) job_data_dc_last = self.manager.get_job_data_dc_unique_latest_by_job_name(job_name) @@ -114,26 +120,28 @@ class ExperimentHistory: return self.manager.update_job_data_dc_by_id(job_data_dc_last) except Exception as exp: self._log.log(str(exp), traceback.format_exc()) - - def write_finish_time(self, job_name, finish=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", - member="", section="", chunk=0, platform="NA", job_id=0, out_file=None, err_file=None, + Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') + + + def write_finish_time(self, job_name, finish=0, status="UNKNOWN", ncpus=0, wallclock="00:00", qos="debug", date="", + member="", section="", chunk=0, platform="NA", job_id=0, out_file=None, err_file=None, wrapper_queue=None, wrapper_code=None, children=""): try: job_data_dc_last = self.manager.get_job_data_dc_unique_latest_by_job_name(job_name) if not job_data_dc_last: - job_data_dc_last = self.write_submit_time(job_name=job_name, - status=status, - ncpus=ncpus, - wallclock=wallclock, - qos=qos, - date=date, - member=member, - section=section, - chunk=chunk, - platform=platform, - job_id=job_id, - wrapper_queue=wrapper_queue, - wrapper_code=wrapper_code, + job_data_dc_last = self.write_submit_time(job_name=job_name, + status=status, + ncpus=ncpus, + wallclock=wallclock, + qos=qos, + date=date, + member=member, + section=section, + chunk=chunk, + platform=platform, + job_id=job_id, + wrapper_queue=wrapper_queue, + wrapper_code=wrapper_code, children=children) self._log.log("write_finish_time {0} submit not found.".format(job_name)) job_data_dc_last = self.manager.get_job_data_dc_unique_latest_by_job_name(job_name) @@ -141,26 +149,28 @@ class ExperimentHistory: raise Exception("Job {0} has not been found in the database.".format(job_name)) job_data_dc_last.finish = finish if finish > 0 else int(time()) job_data_dc_last.status = status - job_data_dc_last.job_id = job_id + job_data_dc_last.job_id 
= job_id job_data_dc_last.rowstatus = Models.RowStatus.PENDING_PROCESS job_data_dc_last.out = out_file if out_file else "" job_data_dc_last.err = err_file if err_file else "" return self.manager.update_job_data_dc_by_id(job_data_dc_last) except Exception as exp: - self._log.log(str(exp), traceback.format_exc()) - + self._log.log(str(exp), traceback.format_exc()) + Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') + + def write_platform_data_after_finish(self, job_data_dc, platform_obj): - """ + """ Call it in a thread. """ try: sleep(SECONDS_WAIT_PLATFORM) - ssh_output = platform_obj.check_job_energy(job_data_dc.job_id) + ssh_output = platform_obj.check_job_energy(job_data_dc.job_id) slurm_monitor = SlurmMonitor(ssh_output) self._verify_slurm_monitor(slurm_monitor, job_data_dc) job_data_dcs_in_wrapper = self.manager.get_job_data_dcs_last_by_wrapper_code(job_data_dc.wrapper_code) job_data_dcs_in_wrapper = sorted([job for job in job_data_dcs_in_wrapper if job.status == "COMPLETED"], key=lambda x: x._id) - job_data_dcs_to_update = [] + job_data_dcs_to_update = [] if len(job_data_dcs_in_wrapper) > 0: info_handler = PlatformInformationHandler(StraightWrapperAssociationStrategy(self._historiclog_dir_path)) job_data_dcs_to_update = info_handler.execute_distribution(job_data_dc, job_data_dcs_in_wrapper, slurm_monitor) @@ -172,21 +182,27 @@ class ExperimentHistory: job_data_dcs_to_update = info_handler.execute_distribution(job_data_dc, job_data_dcs_in_wrapper, slurm_monitor) else: info_handler = PlatformInformationHandler(SingleAssociationStrategy(self._historiclog_dir_path)) - job_data_dcs_to_update = info_handler.execute_distribution(job_data_dc, job_data_dcs_in_wrapper, slurm_monitor) - return self.manager.update_list_job_data_dc_by_each_id(job_data_dcs_to_update) + job_data_dcs_to_update = info_handler.execute_distribution(job_data_dc, job_data_dcs_in_wrapper, slurm_monitor) + return self.manager.update_list_job_data_dc_by_each_id(job_data_dcs_to_update) except Exception as exp: - self._log.log(str(exp), traceback.format_exc()) + self._log.log(str(exp), traceback.format_exc()) + Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') + def _verify_slurm_monitor(self, slurm_monitor, job_data_dc): try: if slurm_monitor.header.status not in ["COMPLETED", "FAILED"]: - self._log.log("Assertion Error on job {0} with ssh_output {1}".format(job_data_dc.job_name, slurm_monitor.original_input), + self._log.log("Assertion Error on job {0} with ssh_output {1}".format(job_data_dc.job_name, slurm_monitor.original_input), "Slurm status {0} is not COMPLETED nor FAILED for ID {1}.\n".format(slurm_monitor.header.status, slurm_monitor.header.name)) + Log.debug(f'Historical Database error: Slurm status {slurm_monitor.header.status} is not COMPLETED nor FAILED for ID {slurm_monitor.header.name}.') if not slurm_monitor.steps_plus_extern_approximate_header_energy(): self._log.log("Assertion Error on job {0} with ssh_output {1}".format(job_data_dc.job_name, slurm_monitor.original_input), "Steps + extern != total energy for ID {0}. Number of steps {1}.\n".format(slurm_monitor.header.name, slurm_monitor.step_count)) + Log.debug(f'Historical Database error: Steps + extern != total energy for ID {slurm_monitor.header.name}. 
Number of steps {slurm_monitor.step_count}.') except Exception as exp: - self._log.log(str(exp), traceback.format_exc()) + self._log.log(str(exp), traceback.format_exc()) + Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') + def process_status_changes(self, job_list=None, chunk_unit="NA", chunk_size=0, current_config="",create=False): """ Detect status differences between job_list and current job_data rows, and update. Creates a new run if necessary. """ @@ -206,7 +222,9 @@ class ExperimentHistory: return self.update_counts_on_experiment_run_dc(current_experiment_run_dc, job_list) except Exception as exp: self._log.log(str(exp), traceback.format_exc()) - + Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') + + def _get_built_list_of_changes(self, job_list): """ Return: List of (current timestamp, current datetime str, status, rowstatus, id in job_data). One tuple per change. """ job_data_dcs = self.detect_changes_in_job_list(job_list) @@ -215,11 +233,13 @@ class ExperimentHistory: def process_job_list_changes_to_experiment_totals(self, job_list=None): """ Updates current experiment_run row with totals calculated from job_list. """ try: - current_experiment_run_dc = self.manager.get_experiment_run_dc_with_max_id() + current_experiment_run_dc = self.manager.get_experiment_run_dc_with_max_id() return self.update_counts_on_experiment_run_dc(current_experiment_run_dc, job_list) except Exception as exp: self._log.log(str(exp), traceback.format_exc()) - + Log.debug(f'Historical Database error: {str(exp)} {traceback.format_exc()}') + + def should_we_create_a_new_run(self, job_list, changes_count, current_experiment_run_dc, new_chunk_unit, new_chunk_size,create=False): if create: return True @@ -229,7 +249,7 @@ class ExperimentHistory: if changes_count > int(self._get_date_member_completed_count(job_list)): return True return self._chunk_config_has_changed(current_experiment_run_dc, new_chunk_unit, new_chunk_size) - + def _chunk_config_has_changed(self, current_exp_run_dc, new_chunk_unit, new_chunk_size): if not current_exp_run_dc: return True @@ -264,19 +284,19 @@ class ExperimentHistory: def _create_new_experiment_run_dc_with_counts(self, chunk_unit, chunk_size, current_config="", job_list=None): """ Create new experiment_run row and return the new Models.ExperimentRun data class from database. 
""" status_counts = self.get_status_counts_from_job_list(job_list) - experiment_run_dc = ExperimentRun(0, - chunk_unit=chunk_unit, - chunk_size=chunk_size, - metadata=current_config, + experiment_run_dc = ExperimentRun(0, + chunk_unit=chunk_unit, + chunk_size=chunk_size, + metadata=current_config, start=int(time()), - completed=status_counts[HUtils.SupportedStatus.COMPLETED], - total=status_counts["TOTAL"], - failed=status_counts[HUtils.SupportedStatus.FAILED], - queuing=status_counts[HUtils.SupportedStatus.QUEUING], - running=status_counts[HUtils.SupportedStatus.RUNNING], - submitted=status_counts[HUtils.SupportedStatus.SUBMITTED], + completed=status_counts[HUtils.SupportedStatus.COMPLETED], + total=status_counts["TOTAL"], + failed=status_counts[HUtils.SupportedStatus.FAILED], + queuing=status_counts[HUtils.SupportedStatus.QUEUING], + running=status_counts[HUtils.SupportedStatus.RUNNING], + submitted=status_counts[HUtils.SupportedStatus.SUBMITTED], suspended=status_counts[HUtils.SupportedStatus.SUSPENDED]) - return self.manager.register_experiment_run_dc(experiment_run_dc) + return self.manager.register_experiment_run_dc(experiment_run_dc) def detect_changes_in_job_list(self, job_list): """ Detect changes in job_list compared to the current contents of job_data table. Returns a list of JobData data classes where the status of each item is the new status.""" @@ -292,12 +312,12 @@ class ExperimentHistory: differences.append(job_dc) return differences - def _get_defined_rowtype(self, code): + def _get_defined_rowtype(self, code): if code: return code else: return Models.RowType.NORMAL - + def _get_defined_queue_name(self, wrapper_queue, wrapper_code, qos): if wrapper_code and wrapper_code > 2 and wrapper_queue is not None and len(str(wrapper_queue)) > 0: return wrapper_queue @@ -314,12 +334,12 @@ class ExperimentHistory: def _get_date_member_completed_count(self, job_list): """ Each item in the job_list must have attributes: date, member, status_str. """ - job_list = job_list if job_list else [] + job_list = job_list if job_list else [] return sum(1 for job in job_list if job.date is not None and job.member is not None and job.status_str == HUtils.SupportedStatus.COMPLETED) - + def get_status_counts_from_job_list(self, job_list): - """ - Return dict with keys COMPLETED, FAILED, QUEUING, SUBMITTED, RUNNING, SUSPENDED, TOTAL. + """ + Return dict with keys COMPLETED, FAILED, QUEUING, SUBMITTED, RUNNING, SUSPENDED, TOTAL. 
""" result = { HUtils.SupportedStatus.COMPLETED: 0, @@ -329,14 +349,13 @@ class ExperimentHistory: HUtils.SupportedStatus.RUNNING: 0, HUtils.SupportedStatus.SUSPENDED: 0, "TOTAL": 0 - } + } if not job_list: job_list = [] - - for job in job_list: - if job.status_str in result: + + for job in job_list: + if job.status_str in result: result[job.status_str] += 1 result["TOTAL"] = len(job_list) return result - \ No newline at end of file diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index b697211a7a68a12ab86ab1ef06adf3b1b5d5ab23..bed0521b36fbeb599a39b89910dda5fbb9418f45 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -31,7 +31,7 @@ import os import re import textwrap import time -from bscearth.utils.date import date2str, parse_date, previous_day, chunk_end_date, chunk_start_date, Log, subs_dates +from bscearth.utils.date import date2str, parse_date, previous_day, chunk_end_date, chunk_start_date, Log, subs_dates, add_time from functools import reduce from threading import Thread from time import sleep @@ -42,7 +42,7 @@ from autosubmit.history.experiment_history import ExperimentHistory from autosubmit.job.job_common import StatisticsSnippetBash, StatisticsSnippetPython from autosubmit.job.job_common import StatisticsSnippetR, StatisticsSnippetEmpty from autosubmit.job.job_common import Status, Type, increase_wallclock_by_chunk -from autosubmit.job.job_utils import get_job_package_code +from autosubmit.job.job_utils import get_job_package_code, get_split_size_unit, get_split_size from autosubmit.platforms.paramiko_submitter import ParamikoSubmitter from autosubmitconfigparser.config.basicconfig import BasicConfig from autosubmitconfigparser.config.configcommon import AutosubmitConfig @@ -256,17 +256,6 @@ class Job(object): self._memory = '' self._memory_per_task = '' - def _clean_runtime_parameters(self): - # hetjobs - self.het = None - self.parameters = None - self._tasks = None - self._nodes = None - self.default_parameters = None - self._threads = None - self._processors = None - self._memory = None - self._memory_per_task = None @property @autosubmit_parameter(name='tasktype') def section(self): @@ -1709,6 +1698,9 @@ class Job(object): if as_conf.get_project_type() != "none": parameters['EXTENDED_HEADER'] = self.read_header_tailer_script(self.ext_header_path, as_conf, True) parameters['EXTENDED_TAILER'] = self.read_header_tailer_script(self.ext_tailer_path, as_conf, False) + else: # If not, this show a warning when it tries to check the script + parameters['EXTENDED_HEADER'] = "" + parameters['EXTENDED_TAILER'] = "" parameters['CURRENT_QUEUE'] = self.queue parameters['RESERVATION'] = self.reservation parameters['CURRENT_EC_QUEUE'] = self.ec_queue @@ -1776,29 +1768,75 @@ class Job(object): self.total_jobs = job_data.get("TOTALJOBS",job_data.get("TOTALJOBS", job_platform.get("TOTALJOBS", job_platform.get("TOTAL_JOBS", -1)))) self.max_waiting_jobs = job_data.get("MAXWAITINGJOBS",job_data.get("MAXWAITINGJOBS", job_platform.get("MAXWAITINGJOBS", job_platform.get("MAX_WAITING_JOBS", -1)))) - def update_job_parameters(self,as_conf, parameters): - self.splits = as_conf.jobs_data[self.section].get("SPLITS", None) - self.delete_when_edgeless = as_conf.jobs_data[self.section].get("DELETE_WHEN_EDGELESS", True) - self.check = as_conf.jobs_data[self.section].get("CHECK", False) - self.check_warnings = as_conf.jobs_data[self.section].get("CHECK_WARNINGS", False) - if self.checkpoint: # To activate placeholder sustitution per in the template - parameters["AS_CHECKPOINT"] = 
self.checkpoint - parameters['JOBNAME'] = self.name - parameters['FAIL_COUNT'] = str(self.fail_count) - parameters['SDATE'] = self.sdate - parameters['MEMBER'] = self.member - parameters['SPLIT'] = self.split - parameters['SPLITS'] = self.splits - parameters['DELAY'] = self.delay - parameters['FREQUENCY'] = self.frequency - parameters['SYNCHRONIZE'] = self.synchronize - parameters['PACKED'] = self.packed - parameters['CHUNK'] = 1 - parameters['RETRIALS'] = self.retrials - parameters['DELAY_RETRIALS'] = self.delay_retrials - parameters['DELETE_WHEN_EDGELESS'] = self.delete_when_edgeless + def calendar_split(self, as_conf, parameters): + """ + Calendar for splits + :param parameters: + :return: + """ + # Calendar struct type numbered ( year, month, day, hour ) + + + job_data = as_conf.jobs_data.get(self.section,{}) + if job_data.get("SPLITS", None) and self.running != "once": # once jobs has no date + # total_split = int(self.splits) + split_unit = get_split_size_unit(as_conf.experiment_data, self.section) + cal = str(parameters.get('EXPERIMENT.CALENDAR', "standard")).lower() + split_length = get_split_size(as_conf.experiment_data, self.section) + start_date = parameters.get('CHUNK_START_DATE', None) + if start_date: + self.date = datetime.datetime.strptime(start_date, "%Y%m%d") + split_start = chunk_start_date(self.date, int(self.split), split_length, split_unit, cal) + split_end = chunk_end_date(split_start, split_length, split_unit, cal) + if split_unit == 'hour': + split_end_1 = split_end + else: + split_end_1 = previous_day(split_end, cal) + + parameters['SPLIT'] = self.split + parameters['SPLITSCALENDAR'] = cal + parameters['SPLITSIZE'] = split_length + parameters['SPLITSIZEUNIT'] = split_unit + + parameters['SPLIT_START_DATE'] = date2str( + split_start, self.date_format) + parameters['SPLIT_START_YEAR'] = str(split_start.year) + parameters['SPLIT_START_MONTH'] = str(split_start.month).zfill(2) + parameters['SPLIT_START_DAY'] = str(split_start.day).zfill(2) + parameters['SPLIT_START_HOUR'] = str(split_start.hour).zfill(2) + + parameters['SPLIT_SECOND_TO_LAST_DATE'] = date2str( + split_end_1, self.date_format) + parameters['SPLIT_SECOND_TO_LAST_YEAR'] = str(split_end_1.year) + parameters['SPLIT_SECOND_TO_LAST_MONTH'] = str(split_end_1.month).zfill(2) + parameters['SPLIT_SECOND_TO_LAST_DAY'] = str(split_end_1.day).zfill(2) + parameters['SPLIT_SECOND_TO_LAST_HOUR'] = str(split_end_1.hour).zfill(2) + + parameters['SPLIT_END_DATE'] = date2str( + split_end, self.date_format) + parameters['SPLIT_END_YEAR'] = str(split_end.year) + parameters['SPLIT_END_MONTH'] = str(split_end.month).zfill(2) + parameters['SPLIT_END_DAY'] = str(split_end.day).zfill(2) + parameters['SPLIT_END_HOUR'] = str(split_end.hour).zfill(2) + if int(self.split) == 1: + parameters['SPLIT_FIRST'] = 'TRUE' + else: + parameters['SPLIT_FIRST'] = 'FALSE' + # if int(total_split) == int(self.split): + # parameters['SPLIT_LAST'] = 'TRUE' + # else: + # parameters['SPLIT_LAST'] = 'FALSE' + + return parameters + def calendar_chunk(self, parameters): + """ + Calendar for chunks + + :param parameters: + :return: + """ if self.date is not None and len(str(self.date)) > 0: if self.chunk is None and len(str(self.chunk)) > 0: chunk = 1 @@ -1859,6 +1897,31 @@ class Job(object): parameters['CHUNK_LAST'] = 'TRUE' else: parameters['CHUNK_LAST'] = 'FALSE' + return parameters + + def update_job_parameters(self,as_conf, parameters): + self.splits = as_conf.jobs_data[self.section].get("SPLITS", None) + self.delete_when_edgeless = 
as_conf.jobs_data[self.section].get("DELETE_WHEN_EDGELESS", True) + self.check = as_conf.jobs_data[self.section].get("CHECK", False) + self.check_warnings = as_conf.jobs_data[self.section].get("CHECK_WARNINGS", False) + if self.checkpoint: # To activate placeholder sustitution per in the template + parameters["AS_CHECKPOINT"] = self.checkpoint + parameters['JOBNAME'] = self.name + parameters['FAIL_COUNT'] = str(self.fail_count) + parameters['SDATE'] = self.sdate + parameters['MEMBER'] = self.member + parameters['SPLIT'] = self.split + parameters['SPLITS'] = self.splits + parameters['DELAY'] = self.delay + parameters['FREQUENCY'] = self.frequency + parameters['SYNCHRONIZE'] = self.synchronize + parameters['PACKED'] = self.packed + parameters['CHUNK'] = 1 + parameters['RETRIALS'] = self.retrials + parameters['DELAY_RETRIALS'] = self.delay_retrials + parameters['DELETE_WHEN_EDGELESS'] = self.delete_when_edgeless + parameters = self.calendar_chunk(parameters) + parameters = self.calendar_split(as_conf,parameters) parameters['NUMMEMBERS'] = len(as_conf.get_member_list()) self.dependencies = as_conf.jobs_data[self.section].get("DEPENDENCIES", "") self.dependencies = str(self.dependencies) @@ -2158,6 +2221,7 @@ class Job(object): variables_tmp = [variable[1:-1] for variable in variables_tmp] variables_tmp = [variable for variable in variables_tmp if variable not in self.default_parameters] variables.extend(variables_tmp) + out = set(parameters).issuperset(set(variables)) # Check if the variables in the templates are defined in the configurations if not out: diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index ec22a2a25fe9b78dff7269aa9442043efedaa3b6..e4a6519537eb8117f9712d777be0838e61a7e980 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -21,10 +21,12 @@ from bscearth.utils.date import date2str from autosubmit.job.job import Job +from autosubmit.job.job_utils import get_split_size_unit, get_split_size, calendar_chunk_section from autosubmit.job.job_common import Status import datetime import re +from log.log import AutosubmitCritical class DicJobs: @@ -144,8 +146,15 @@ class DicJobs: """ self.compare_section(section) parameters = self.experiment_data["JOBS"] - splits = int(parameters[section].get("SPLITS", -1)) + splits = parameters[section].get("SPLITS", -1) running = str(parameters[section].get('RUNNING', "once")).lower() + if running != "chunk": + if str(splits).isdigit() or splits == -1: + splits = int(splits) + elif splits == "auto": + raise AutosubmitCritical("Splits: auto is only allowed for chunk splitted jobs") + else: + raise AutosubmitCritical(f"Splits must be an integer: {splits}") frequency = int(parameters[section].get("FREQUENCY", 1)) if running == 'once': self._create_jobs_once(section, priority, default_job_type, splits) @@ -258,6 +267,8 @@ class DicJobs: self._dic[section][date][member] = dict() count = 0 for chunk in (chunk for chunk in self._chunk_list): + if splits == "auto": + splits = calendar_chunk_section(self.experiment_data, section, date, chunk) count += 1 if delay == -1 or delay < chunk: if count % frequency == 0 or count == len(self._chunk_list): diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 6e924f1093603d0ca4a2309895f1ca7e0b8adda9..f23ca4e7336774eb7ed730b84a47d497a43185ed 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -204,7 +204,7 @@ class JobList(object): chunk_list = list(range(chunk_ini, num_chunks + 1)) self._chunk_list = chunk_list try: - self.graph = 
self.load() + self.graph = self.load(create) if type(self.graph) is not DiGraph: self.graph = nx.DiGraph() except AutosubmitCritical: @@ -213,28 +213,23 @@ class JobList(object): self.graph = nx.DiGraph() self._dic_jobs = DicJobs(date_list, member_list, chunk_list, date_format, default_retrials, as_conf) self._dic_jobs.graph = self.graph - if show_log: - Log.info("Creating jobs...") if len(self.graph.nodes) > 0: if show_log: Log.info("Load finished") if monitor: - as_conf.experiment_data = as_conf.last_experiment_data - as_conf.data_changed = False - if not as_conf.data_changed: + self._dic_jobs.changes = {} + else: + self._dic_jobs.compare_backbone_sections() + if not self._dic_jobs.changes: self._dic_jobs._job_list = {job["job"].name: job["job"] for _, job in self.graph.nodes.data() if job.get("job", None)} else: - self._dic_jobs.compare_backbone_sections() # fast-look if graph existed, skips some steps # If VERSION in CONFIG or HPCARCH in DEFAULT it will exist, if not it won't. if not new and not self._dic_jobs.changes.get("EXPERIMENT", {}) and not self._dic_jobs.changes.get( "CONFIG", {}) and not self._dic_jobs.changes.get("DEFAULT", {}): self._dic_jobs._job_list = {job["job"].name: job["job"] for _, job in self.graph.nodes.data() if job.get("job", None)} - - # Force to use the last known job_list when autosubmit monitor is running. - self._dic_jobs.last_experiment_data = as_conf.last_experiment_data else: if not create: raise AutosubmitCritical("Autosubmit couldn't load the workflow graph. Please run autosubmit create first. If the pkl file exists and was generated with Autosubmit v4.1+, try again.",7013) @@ -248,6 +243,8 @@ class JobList(object): os.remove(os.path.join(self._persistence_path, self._persistence_file + "_backup.pkl")) new = True # This generates the job object and also finds if dic_jobs has modified from previous iteration in order to expand the workflow + if show_log: + Log.info("Creating jobs...") self._create_jobs(self._dic_jobs, 0, default_job_type) # not needed anymore all data is inside their correspondent sections in dic_jobs # This dic_job is key to the dependencies management as they're ordered by date[member[chunk]] @@ -376,7 +373,7 @@ class JobList(object): Log.debug("No changes detected, keeping edges") else: changes = True - Log.debug("No dependencies detected, calculating dependencies") + Log.debug("Changes detected, calculating dependencies") sections_gen = (section for section in jobs_data.keys()) for job_section in sections_gen: # Changes when all jobs of a section are added @@ -2314,32 +2311,38 @@ class JobList(object): "Autosubmit will use a backup for recover the job_list", 6010) return list() - def load(self): + def load(self, create=False, backup=False): """ Recreates a stored job list from the persistence :return: loaded job list object :rtype: JobList """ - Log.info("Loading JobList") try: - return self._persistence.load(self._persistence_path, self._persistence_file) - except AutosubmitCritical: - raise - except: - Log.printlog( - "Autosubmit will use a backup for recover the job_list", 6010) - return self.backup_load() - - def backup_load(self): - """ - Recreates a stored job list from the persistence - - :return: loaded job list object - :rtype: JobList - """ - Log.info("Loading backup JobList") - return self._persistence.load(self._persistence_path, self._persistence_file + "_backup") + if not backup: + Log.info("Loading JobList") + return self._persistence.load(self._persistence_path, self._persistence_file) + else: + return 
self._persistence.load(self._persistence_path, self._persistence_file + "_backup") + except ValueError as e: + if not create: + raise AutosubmitCritical(f'JobList could not be loaded because the pkl was saved with a different Autosubmit or Python version. {e}') + else: + Log.warning(f'Job list will be created from scratch because the pkl was saved with a different Autosubmit or Python version. {e}') + except PermissionError as e: + if not create: + raise AutosubmitCritical(f'JobList could not be loaded due to a permission error. {e}') + else: + Log.warning(f'Job list will be created from scratch due to a permission error. {e}') + except BaseException as e: + if not backup: + Log.debug("Autosubmit will use a backup to recover the job_list") + return self.load(create, True) + else: + if not create: + raise AutosubmitCritical(f"JobList could not be loaded due to: {e}\nAutosubmit cannot continue") + else: + Log.warning(f'Job list will be created from scratch due to: {e}') def save(self): """ @@ -2398,8 +2401,15 @@ class JobList(object): else: queue = job.queue platform_name = job.platform.name if job.platform else "no-platform" - Log.status("{0:<35}{1:<15}{2:<15}{3:<20}{4:<15}", job.name, job.id, Status( - ).VALUE_TO_KEY[job.status], platform_name, queue) + if job.id is None: + job_id = "no-id" + else: + job_id = job.id + try: + Log.status("{0:<35}{1:<15}{2:<15}{3:<20}{4:<15}", job.name, job_id, Status( + ).VALUE_TO_KEY[job.status], platform_name, queue) + except: + Log.debug("Couldn't print job status for job {0}".format(job.name)) for job in failed_job_list: if len(job.queue) < 1: queue = "no-scheduler" diff --git a/autosubmit/job/job_list_persistence.py b/autosubmit/job/job_list_persistence.py index 951771bed1523e908eb32efedfa784544d130cde..791de78640f924bad9ccc8cebcc83682e21aa27b 100644 --- a/autosubmit/job/job_list_persistence.py +++ b/autosubmit/job/job_list_persistence.py @@ -22,6 +22,7 @@ from sys import setrecursionlimit import shutil from autosubmit.database.db_manager import DbManager from log.log import AutosubmitCritical, Log +from contextlib import suppress class JobListPersistence(object): @@ -66,20 +67,24 @@ class JobListPersistence(object): """ path = os.path.join(persistence_path, persistence_file + '.pkl') + path_tmp = os.path.join(persistence_path[:-3]+"tmp", persistence_file + f'.pkl.tmp_{os.urandom(8).hex()}') + try: open(path).close() except PermissionError: - raise AutosubmitCritical(f'Permission denied to read {path}', 7012) + Log.warning(f'Permission denied to read {path}') + raise except FileNotFoundError: - Log.printlog(f'File {path} does not exist. ',Log.WARNING) - return list() + Log.warning(f'File {path} does not exist. 
') + raise else: # copy the path to a tmp file randomseed to avoid corruption - path_tmp = f'{path}.tmp_{os.urandom(8).hex()}' - shutil.copy(path, path_tmp) - with open(path_tmp, 'rb') as fd: - graph = pickle.load(fd) - os.remove(path_tmp) + try: + shutil.copy(path, path_tmp) + with open(path_tmp, 'rb') as fd: + graph = pickle.load(fd) + finally: + os.remove(path_tmp) for u in ( node for node in graph ): # Set after the dependencies are set graph.nodes[u]["job"].children = set() @@ -100,8 +105,9 @@ class JobListPersistencePkl(JobListPersistence): """ path = os.path.join(persistence_path, persistence_file + '.pkl' + '.tmp') - if os.path.exists(path): + with suppress(FileNotFoundError, PermissionError): os.remove(path) + setrecursionlimit(500000000) Log.debug("Saving JobList: " + path) with open(path, 'wb') as fd: diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 675f113011ca8110b6b9c094c5dd6270564032e9..d02b551c28ae994740217bfe744ce189cfae2a2c 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -226,7 +226,7 @@ class JobPackager(object): min_h = len(package.jobs) return min_v, min_h, balanced - def check_packages_respect_wrapper_policy(self,built_packages_tmp,packages_to_submit,max_jobs_to_submit,wrapper_limits): + def check_packages_respect_wrapper_policy(self,built_packages_tmp,packages_to_submit,max_jobs_to_submit,wrapper_limits, any_simple_packages = False): """ Check if the packages respect the wrapper policy and act in base of it ( submit wrapper, submit sequential, wait for more jobs to form a wrapper) :param built_packages_tmp: List of packages to be submitted @@ -237,10 +237,10 @@ class JobPackager(object): :rtype: List of packages to be submitted, int :return: packages_to_submit, max_jobs_to_submit """ + not_wrappeable_package_info = list() for p in built_packages_tmp: if max_jobs_to_submit == 0: break - infinite_deadlock = False # This will raise an autosubmit critical if true, infinite deadlock is when there are no more non-wrapped jobs in waiting or ready status failed_innerjobs = False # Check if the user is using the option to run first some jobs. if so, remove non-first jobs from the package and submit them sequentially following a flexible policy if len(self._jobs_list.jobs_to_run_first) > 0: @@ -269,129 +269,57 @@ class JobPackager(object): job.packed = True packages_to_submit.append(p) max_jobs_to_submit = max_jobs_to_submit - 1 - else: # Check if there is a deadlock or an infinite deadlock. Once checked, act in base of the wrapper policy. - deadlock = True - if deadlock: # Remaining jobs if chunk is the last one + else: + not_wrappeable_package_info.append([p, min_v, min_h, balanced]) + # It is a deadlock when: + # 1. There are no more non-wrapped jobs in ready status + # 2. And there are no more jobs in the queue ( submitted, queuing, running, held ) + # 3. And all current packages are not wrappable. 
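The comment block above documents the reworked detector, which replaces the old recursive walk over parent jobs with three cheap conditions checked once per packaging pass. A minimal sketch of the predicate, with illustrative names rather than the exact Autosubmit API:

def is_deadlocked(any_simple_packages, in_queue_jobs, not_wrappable_info, built_packages):
    # Deadlock: no simple (non-wrapped) package was formed, nothing is
    # currently submitted/queuing/running/held, and every built package
    # failed to satisfy the wrapper limits.
    return (not any_simple_packages
            and len(in_queue_jobs) == 0
            and len(not_wrappable_info) == len(built_packages))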
+ if not any_simple_packages and len(self._jobs_list.get_in_queue()) == 0 and len(not_wrappeable_package_info) == len(built_packages_tmp): + for p, min_v, min_h, balanced in not_wrappeable_package_info: + if self.wrapper_policy[self.current_wrapper_section] == "strict": for job in p.jobs: - if (job.running == "chunk" and job.chunk == int( - job.parameters["EXPERIMENT.NUMCHUNKS"])) and balanced: - deadlock = False - break - if not deadlock: # Submit package if deadlock has been liberated + job.packed = False + raise AutosubmitCritical(self.error_message_policy(min_h, min_v, wrapper_limits, p.wallclock, balanced), 7014) + elif self.wrapper_policy[self.current_wrapper_section] == "mixed": + error = True for job in p.jobs: - job.packed = True - packages_to_submit.append(p) - max_jobs_to_submit = max_jobs_to_submit - 1 - else: - wallclock_sum = p.jobs[0].wallclock - for seq in range(1, min_v): - wallclock_sum = sum_str_hours(wallclock_sum, p.jobs[0].wallclock) - next_wrappable_jobs = self._jobs_list.get_jobs_by_section(self.jobs_in_wrapper[self.current_wrapper_section]) - next_wrappable_jobs = [job for job in next_wrappable_jobs if - job.status == Status.WAITING and job not in p.jobs] # Get only waiting jobs - active_jobs = list() - aux_active_jobs = list() - for job in next_wrappable_jobs: # Prone tree by looking only the closest children - direct_children = False - for related in job.parents: - if related in p.jobs: - direct_children = True - break - if direct_children: # Get parent of direct children that aren't in wrapper - aux_active_jobs += [aux_parent for aux_parent in job.parents if ( - aux_parent.status != Status.COMPLETED and aux_parent.status != Status.FAILED) and ( - aux_parent.section not in self.jobs_in_wrapper[ - self.current_wrapper_section] or ( - aux_parent.section in self.jobs_in_wrapper[ - self.current_wrapper_section] and aux_parent.status != Status.COMPLETED and aux_parent.status != Status.FAILED and aux_parent.status != Status.WAITING and aux_parent.status != Status.READY))] - aux_active_jobs = list(set(aux_active_jobs)) - track = [] # Tracker to prone tree for avoid the checking of the same parent from different nodes. - active_jobs_names = [job.name for job in - p.jobs] # We want to search if the actual wrapped jobs needs to run for add more jobs to this wrapper - hard_deadlock = False - for job in aux_active_jobs: - parents_to_check = [] - if job.status == Status.WAITING: # We only want to check uncompleted parents - aux_job = job - for parent in aux_job.parents: # First case - if parent.name in active_jobs_names: - hard_deadlock = True - infinite_deadlock = True - break - if (parent.status == Status.WAITING) and parent.name != aux_job.name: - parents_to_check.append(parent) - track.extend(parents_to_check) - while len( - parents_to_check) > 0 and not infinite_deadlock: # We want to look deeper on the tree until all jobs are completed, or we find an unresolvable deadlock. 
- aux_job = parents_to_check.pop(0) - for parent in aux_job.parents: - if parent.name in active_jobs_names: - hard_deadlock = True - infinite_deadlock = True - break - if ( - parent.status == Status.WAITING) and parent.name != aux_job.name and parent not in track: - parents_to_check.append(parent) - track.extend(parents_to_check) - if not infinite_deadlock: - active_jobs.append(job) # List of jobs that can continue to run without run this wrapper - - # Act in base of active_jobs and Policies - if self.wrapper_policy[self.current_wrapper_section] == "strict": - for job in p.jobs: + if max_jobs_to_submit == 0: + break + if job.fail_count > 0 and job.status == Status.READY: job.packed = False - if len(active_jobs) > 0: - Log.printlog(f'Wrapper policy is set to STRICT and there are not enough jobs to form a wrapper.[wrappable:{wrapper_limits["min"]} <= defined_min:{wrapper_limits["min"]}] [wrappeable_h:{min_h} <= defined_min_h:{wrapper_limits["min_h"]}]|[wrappeable_v:{min_v} <= defined_min_v:{wrapper_limits["min_v"]}] waiting until the wrapper can be formed.\nIf all values are <=, some innerjob has failed under strict policy', 6013) - else: - if len(self._jobs_list.get_in_queue()) == 0: - raise AutosubmitCritical(self.error_message_policy(min_h, min_v, wrapper_limits, hard_deadlock, wallclock_sum, balanced), 7014) - elif self.wrapper_policy[self.current_wrapper_section] == "mixed": - error = True - show_log = True - for job in p.jobs: - if max_jobs_to_submit == 0: - break - if job.fail_count > 0 and job.status == Status.READY: - job.packed = False - Log.printlog( - "Wrapper policy is set to mixed, there is a failed job that will be sent sequential") - error = False - show_log = False - if job.type == Type.PYTHON and not self._platform.allow_python_jobs: - package = JobPackageSimpleWrapped( - [job]) - else: - package = JobPackageSimple([job]) - packages_to_submit.append(package) - max_jobs_to_submit = max_jobs_to_submit - 1 - if error: - if len(active_jobs) > 0: - if show_log: - Log.printlog(f'Wrapper policy is set to MIXED and there are not enough jobs to form a wrapper.[wrappable:{wrapper_limits["min"]} < defined_min:{wrapper_limits["min"]}] [wrappable_h:{min_h} < defined_min_h:{wrapper_limits["min_h"]}]|[wrappeable_v:{min_v} < defined_min_v:{wrapper_limits["min_v"]}] waiting until the wrapper can be formed.', 6013) + Log.printlog( + "Wrapper policy is set to mixed, there is a failed job that will be sent sequential") + error = False + if job.type == Type.PYTHON and not self._platform.allow_python_jobs: + package = JobPackageSimpleWrapped( + [job]) else: - if len(self._jobs_list.get_in_queue()) == 0: # When there are not more possible jobs, autosubmit will stop the execution - raise AutosubmitCritical(self.error_message_policy(min_h, min_v, wrapper_limits, hard_deadlock, wallclock_sum, balanced), 7014) - else: - Log.info( - "Wrapper policy is set to flexible and there is a deadlock, Autosubmit will submit the jobs sequentially") - for job in p.jobs: - if max_jobs_to_submit == 0: - break - job.packed = False - if job.status == Status.READY: - if job.type == Type.PYTHON and not self._platform.allow_python_jobs: - package = JobPackageSimpleWrapped( - [job]) - else: - package = JobPackageSimple([job]) - packages_to_submit.append(package) - max_jobs_to_submit = max_jobs_to_submit - 1 + package = JobPackageSimple([job]) + packages_to_submit.append(package) + max_jobs_to_submit = max_jobs_to_submit - 1 + if error: + if len(self._jobs_list.get_in_queue()) == 0: # When there are not more possible 
jobs, autosubmit will stop the execution + raise AutosubmitCritical(self.error_message_policy(min_h, min_v, wrapper_limits, p.wallclock, balanced), 7014) + else: + Log.info( + "Wrapper policy is set to flexible and there is a deadlock, Autosubmit will submit the jobs sequentially") + for job in p.jobs: + if max_jobs_to_submit == 0: + break + job.packed = False + if job.status == Status.READY: + if job.type == Type.PYTHON and not self._platform.allow_python_jobs: + package = JobPackageSimpleWrapped( + [job]) + else: + package = JobPackageSimple([job]) + packages_to_submit.append(package) + max_jobs_to_submit = max_jobs_to_submit - 1 return packages_to_submit, max_jobs_to_submit - def error_message_policy(self,min_h,min_v,wrapper_limits,hard_deadlock,wallclock_sum,balanced): - message = f"Wrapper couldn't be formed under {self.wrapper_policy[self.current_wrapper_section]} POLICY due minimum limit not being reached: [wrappable:{wrapper_limits['min']} < defined_min:{wrapper_limits['min']}] [wrappable_h:{min_h} < defined_min_h:{wrapper_limits['min_h']}]|[wrappeable_v:{min_v} < defined_min_v:{wrapper_limits['min_v']}] " - if hard_deadlock: - message += "\nCheck your configuration: The next wrappable job can't be wrapped until some of inner jobs of current packages finishes which is impossible" + def error_message_policy(self,min_h,min_v,wrapper_limits,wallclock_sum,balanced): + message = f"Wrapper couldn't be formed under {self.wrapper_policy[self.current_wrapper_section]} POLICY due minimum limit not being reached: [wrappable:{wrapper_limits['min']} < defined_min:{min_h*min_v}] [wrappable_h:{min_h} < defined_min_h:{wrapper_limits['min_h']}]|[wrappeable_v:{min_v} < defined_min_v:{wrapper_limits['min_v']}] " if min_v > 1: message += f"\nCheck your configuration: Check if current {wallclock_sum} vertical wallclock has reached the max defined on platforms.conf." 
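The vertical branch of this message reports the stacked wallclock of the rejected package. A rough sketch of how such a sum behaves, assuming HH:MM wallclock strings (a simplification, not the project's sum_str_hours helper):

def sum_wallclocks(wallclock, times):
    # "02:30" stacked 3 times -> "07:30"
    hours, minutes = map(int, wallclock.split(":"))
    total = (hours * 60 + minutes) * times
    return f"{total // 60:02d}:{total % 60:02d}"

assert sum_wallclocks("02:30", 3) == "07:30"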
else: @@ -510,6 +438,7 @@ class JobPackager(object): job.packed = False jobs_to_wrap = self._divide_list_by_section(jobs_to_submit) non_wrapped_jobs = jobs_to_wrap.pop("SIMPLE",[]) + any_simple_packages = len(non_wrapped_jobs) > 0 # Prepare packages for wrapped jobs for wrapper_name, jobs in jobs_to_wrap.items(): if max_jobs_to_submit == 0: @@ -536,15 +465,13 @@ class JobPackager(object): if self.wrapper_type[self.current_wrapper_section] == 'vertical': built_packages_tmp = self._build_vertical_packages(jobs, wrapper_limits,wrapper_info=current_info) elif self.wrapper_type[self.current_wrapper_section] == 'horizontal': - if len(jobs) >= wrapper_limits["min_h"]: - built_packages_tmp = self._build_horizontal_packages(jobs, wrapper_limits, section,wrapper_info=current_info) + built_packages_tmp = self._build_horizontal_packages(jobs, wrapper_limits, section,wrapper_info=current_info) elif self.wrapper_type[self.current_wrapper_section] in ['vertical-horizontal', 'horizontal-vertical']: - if len(jobs) >= wrapper_limits["min_h"]: - built_packages_tmp.append(self._build_hybrid_package(jobs, wrapper_limits, section,wrapper_info=current_info)) + built_packages_tmp.append(self._build_hybrid_package(jobs, wrapper_limits, section,wrapper_info=current_info)) else: built_packages_tmp = self._build_vertical_packages(jobs, wrapper_limits) - packages_to_submit,max_jobs_to_submit = self.check_packages_respect_wrapper_policy(built_packages_tmp,packages_to_submit,max_jobs_to_submit,wrapper_limits) + packages_to_submit,max_jobs_to_submit = self.check_packages_respect_wrapper_policy(built_packages_tmp,packages_to_submit,max_jobs_to_submit,wrapper_limits,any_simple_packages) # Now, prepare the packages for non-wrapper jobs for job in non_wrapped_jobs: if max_jobs_to_submit == 0: diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 86e790791018773792c71f983cee5bec512d721c..8bb679ae9e2b15158557bc55dd5573dc620bc9ef 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -96,22 +96,15 @@ class JobPackageBase(object): @threaded def check_scripts(self,jobs,configuration, parameters,only_generate,hold): for job in jobs: - if str(job.check).lower() == str(Job.CHECK_ON_SUBMISSION).lower(): - if only_generate: - exit_ = True - break - if not os.path.exists(os.path.join(configuration.get_project_dir(), job.file)): - lock.acquire() - if configuration.get_project_type().lower() != "none" and len(configuration.get_project_type()) > 0: - raise AutosubmitCritical( - "Template [ {0} ] using CHECK=On_submission has some empty variable {0}".format( - job.name), 7014) - lock.release() - if not job.check_script(configuration, parameters, show_logs=job.check_warnings): - Log.warning("Script {0} check failed", job.name) - Log.warning("On submission script has some empty variables") - else: - Log.result("Script {0} OK", job.name) + if only_generate and not os.path.exists(os.path.join(configuration.get_project_dir(), job.file)): + break + else: + if configuration.get_project_type().lower() != "none" and len(configuration.get_project_type()) > 0: + raise AutosubmitCritical(f"Job script:{job.file} does not exists",7014) + if not job.check_script(configuration, parameters, show_logs=job.check_warnings): + Log.warning(f'Script {job.name} has some empty variables. 
An empty value has substituted these variables') + else: + Log.result("Script {0} OK", job.name) # looking for directives on jobs self._custom_directives = self._custom_directives | set(job.custom_directives) @threaded @@ -123,6 +116,7 @@ class JobPackageBase(object): pass + def submit(self, configuration, parameters,only_generate=False,hold=False): """ :param hold: @@ -148,21 +142,16 @@ class JobPackageBase(object): try: if len(self.jobs) < thread_number: for job in self.jobs: - if job.check == Job.CHECK_ON_SUBMISSION.lower(): - if only_generate: - exit_=True - break - if not os.path.exists(os.path.join(configuration.get_project_dir(), job.file)): - if configuration.get_project_type().lower() != "none" and len(configuration.get_project_type()) > 0: - raise AutosubmitCritical("Template [ {0} ] using CHECK=On_submission has some empty variable {0}".format(job.name),7014) - if not job.check_script(configuration, parameters,show_logs=job.check_warnings): - Log.warning("Script {0} check failed",job.name) - Log.warning("On submission script has some empty variables") - else: - Log.result("Script {0} OK",job.name) - job.update_parameters(configuration, parameters) - # Looking for special variables - + if only_generate and not os.path.exists(os.path.join(configuration.get_project_dir(), job.file)): + exit_=True + break + if not os.path.exists(os.path.join(configuration.get_project_dir(), job.file)): + if configuration.get_project_type().lower() != "none" and len(configuration.get_project_type()) > 0: + raise AutosubmitCritical("Template [ {0} ] using CHECK=On_submission has some empty variable {0}".format(job.name),7014) + if not job.check_script(configuration, parameters,show_logs=job.check_warnings): + Log.warning(f'Script {job.name} has some empty variables. 
An empty value has substituted these variables') + else: + Log.result("Script {0} OK",job.name) # looking for directives on jobs self._custom_directives = self._custom_directives | set(job.custom_directives) else: diff --git a/autosubmit/job/job_utils.py b/autosubmit/job/job_utils.py index c02a92952778361cbb50b599e9c568316914fafb..d407f016047fcd5fd36e5b9be5e1592945cde8cc 100644 --- a/autosubmit/job/job_utils.py +++ b/autosubmit/job/job_utils.py @@ -1,4 +1,11 @@ #!/usr/bin/env python3 +import math +from log.log import Log, AutosubmitCritical +import os +from autosubmit.job.job_package_persistence import JobPackagePersistence +from autosubmitconfigparser.config.basicconfig import BasicConfig +from typing import Dict +from bscearth.utils.date import date2str, previous_day, chunk_end_date, chunk_start_date, subs_dates # Copyright 2017-2020 Earth Sciences Department, BSC-CNS @@ -22,6 +29,176 @@ from autosubmit.job.job_package_persistence import JobPackagePersistence from autosubmitconfigparser.config.basicconfig import BasicConfig from typing import Dict +CALENDAR_UNITSIZE_ENUM = { + "hour": 0, + "day": 1, + "month": 2, + "year": 3 +} + + +def is_leap_year(year): + """Determine whether a year is a leap year.""" + return year % 4 == 0 and (year % 100 != 0 or year % 400 == 0) + + +def calendar_unitsize_isgreater(split_unit,chunk_unit): + """ + Check if the split unit is greater than the chunk unit + :param split_unit: + :param chunk_unit: + :return: + """ + split_unit = split_unit.lower() + chunk_unit = chunk_unit.lower() + try: + return CALENDAR_UNITSIZE_ENUM[split_unit] > CALENDAR_UNITSIZE_ENUM[chunk_unit] + except KeyError: + raise AutosubmitCritical(f"Invalid calendar unit size") + +def calendar_unitsize_getlowersize(unitsize): + """ + Get the lower size of a calendar unit + :return: + """ + unit_size = unitsize.lower() + unit_value = CALENDAR_UNITSIZE_ENUM[unit_size] + if unit_value == 0: + return "hour" + else: + return list(CALENDAR_UNITSIZE_ENUM.keys())[unit_value - 1] + +def calendar_get_month_days(date_str): + """ + Get the number of days in a month + :param date_str: Date in string format (YYYYMMDD) + :return: + """ + year = int(date_str[0:4]) + month = int(date_str[4:6]) + if month == 2: + if is_leap_year(year): + return 29 + else: + return 28 + elif month in [4, 6, 9, 11]: + return 30 + else: + return 31 + + +def calendar_split_size_isvalid(date_str, split_size, split_unit, chunk_unit, chunk_length): + """ + Check if the split size is valid for the calendar + :param date_str: Date in string format (YYYYMMDD) + :param split_size: Size of the split + :param split_unit: Unit of the split + :param chunk_unit: Unit of the chunk + :param chunk_length: Size of the chunk + :return: Boolean + """ + if is_leap_year(int(date_str[0:4])): + num_days_in_a_year = 366 + else: + num_days_in_a_year = 365 + if chunk_unit == "year": + chunk_size_in_hours = num_days_in_a_year * 24 * chunk_length + elif chunk_unit == "month": + chunk_size_in_hours = calendar_get_month_days(date_str) * 24 * chunk_length + elif chunk_unit == "day": + chunk_size_in_hours = 24 * chunk_length + else: + chunk_size_in_hours = chunk_length + + if split_unit == "year": + split_size_in_hours = num_days_in_a_year * 24 * split_size + elif split_unit == "month": + split_size_in_hours = calendar_get_month_days(date_str) * 24 * split_size + elif split_unit == "day": + split_size_in_hours = 24 * split_size + else: + split_size_in_hours = split_size + + if split_size_in_hours != chunk_size_in_hours: + Log.warning(f"After calculations, the 
+def calendar_chunk_section(exp_data, section, date, chunk):
+    """
+    Compute the number of calendar splits of a chunk for a given section.
+    :param exp_data: Experiment data dictionary
+    :param section: Job section name
+    :param date: Start date of the experiment member
+    :param chunk: Chunk number
+    :return: Number of splits
+    """
+    date_str = date2str(date)
+    splits = 0
+    jobs_data = exp_data.get('JOBS', {})
+    split_unit = str(exp_data.get("EXPERIMENT", {}).get('SPLITSIZEUNIT', jobs_data.get(section, {}).get("SPLITSIZEUNIT", None))).lower()
+    chunk_unit = str(exp_data.get("EXPERIMENT", {}).get('CHUNKSIZEUNIT', "day")).lower()
+    split_policy = str(exp_data.get("EXPERIMENT", {}).get('SPLITPOLICY', jobs_data.get(section, {}).get("SPLITPOLICY", "flexible"))).lower()
+    if chunk_unit == "hour":
+        raise AutosubmitCritical("The chunk unit is 'hour' and Autosubmit doesn't support splits smaller than one hour. Please change the chunk unit to 'day' or higher, or don't use calendar splits.")
+    if jobs_data.get(section, {}).get("RUNNING", "once") != "once":
+        chunk_length = int(exp_data.get("EXPERIMENT", {}).get('CHUNKSIZE', 1))
+        cal = str(exp_data.get('CALENDAR', "standard")).lower()
+        chunk_start = chunk_start_date(
+            date, chunk, chunk_length, chunk_unit, cal)
+        chunk_end = chunk_end_date(
+            chunk_start, chunk_length, chunk_unit, cal)
+        run_days = subs_dates(chunk_start, chunk_end, cal)
+        if split_unit == "none":
+            split_unit = calendar_unitsize_getlowersize(chunk_unit)
+        if calendar_unitsize_isgreater(split_unit, chunk_unit):
+            raise AutosubmitCritical("The split unit is greater than the chunk unit, which Autosubmit doesn't support. Please change the split unit to a smaller one, or don't use calendar splits.")
+        if split_unit == "hour":
+            num_max_splits = run_days * 24
+        elif split_unit == "month":
+            # Month-sized splits are approximated as one twelfth of the chunk days
+            num_max_splits = run_days / 12
+        elif split_unit == "year":
+            if not is_leap_year(chunk_start.year) or cal == "noleap":
+                num_max_splits = run_days / 365
+            else:
+                num_max_splits = run_days / 366
+        else:
+            num_max_splits = run_days
+        split_size = get_split_size(exp_data, section)
+        if not calendar_split_size_isvalid(date_str, split_size, split_unit, chunk_unit, chunk_length):
+            raise AutosubmitCritical(f"Invalid split size for the calendar: the split size is {split_size} and its unit is {split_unit}, which doesn't fit inside the chunk.")
+        splits = num_max_splits / split_size
+        if not splits.is_integer() and split_policy == "flexible":
+            Log.warning(f"The number of splits ({num_max_splits}/{split_size}) is not an integer and will be rounded up due to the flexible split policy.\nYou can set the SPLITPOLICY parameter of the section {section} to 'strict' to avoid this behavior.")
+        elif not splits.is_integer() and split_policy == "strict":
+            raise AutosubmitCritical(f"The number of splits is not an integer and the split policy is 'strict'.\nYou can set the SPLITPOLICY parameter of the section {section} to 'flexible' to round the number up instead.")
+        splits = math.ceil(splits)
+        Log.info(f"For the section {section} with date {date_str}, the number of splits is {splits}.")
+
+    return splits
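The number of splits the new helper yields can be checked in isolation; this call mirrors the test_auto_calendar_split unit test added further below, so the values are grounded in this patch:

    import datetime
    from autosubmit.job.job_utils import calendar_chunk_section

    exp_data = {
        'EXPERIMENT': {'DATELIST': '20000101', 'MEMBERS': 'fc0',
                       'CHUNKSIZEUNIT': 'day', 'CHUNKSIZE': '1',
                       'NUMCHUNKS': '2', 'CALENDAR': 'standard'},
        'JOBS': {'A': {'FILE': 'a', 'PLATFORM': 'test', 'RUNNING': 'chunk',
                       'SPLITS': 'auto', 'SPLITSIZE': 1}},
    }
    date = datetime.datetime.strptime("20000101", "%Y%m%d")
    # A one-day chunk split into one-hour pieces yields 24 splits.
    print(calendar_chunk_section(exp_data, "A", date, chunk=1))  # 24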
+def get_split_size_unit(data, section):
+    """Resolve the split size unit of a section; defaults to one unit below the chunk size unit."""
+    split_unit = str(data.get('JOBS', {}).get(section, {}).get('SPLITSIZEUNIT', "none")).lower()
+    if split_unit == "none":
+        split_unit = str(data.get('EXPERIMENT', {}).get("CHUNKSIZEUNIT", "day")).lower()
+        if split_unit == "year":
+            return "month"
+        elif split_unit == "month":
+            return "day"
+        elif split_unit == "day":
+            return "hour"
+        else:
+            return "day"
+    return split_unit
+
+
+def get_split_size(as_conf, section):
+    """Resolve the split size of a section; falls back to the experiment SPLITSIZE, then CHUNKSIZE."""
+    job_data = as_conf.get('JOBS', {}).get(section, {})
+    exp_data = as_conf.get('EXPERIMENT', {})
+    return int(job_data.get("SPLITSIZE", exp_data.get("SPLITSIZE", exp_data.get('CHUNKSIZE', 1))))
 
 def transitive_reduction(graph):
     """
diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py
index 10d7e10515da7b36444b5a814e5b6f7ef0015ca3..05340a526ccce9f2b51ecfa6925ebc5be2763584 100644
--- a/autosubmit/platforms/platform.py
+++ b/autosubmit/platforms/platform.py
@@ -558,10 +558,11 @@ class Platform(object):
         :rtype: bool
         """
         if recovery:
-            if self.get_file('{0}_COMPLETED'.format(job_name), False, ignore_log=recovery):
-                return True
-            else:
-                return False
+            retries = 5
+            for _ in range(retries):
+                if self.get_file('{0}_COMPLETED'.format(job_name), False, ignore_log=recovery):
+                    return True
+            return False
         if self.check_file_exists('{0}_COMPLETED'.format(job_name), wrapper_failed=wrapper_failed):
             if self.get_file('{0}_COMPLETED'.format(job_name), True, wrapper_failed=wrapper_failed):
                 return True
diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py
index b51920d7d921a03422a8bccc51281046e1feeda3..e741239dbc56fac949395e60b2c31d7db302f76b 100644
--- a/autosubmit/platforms/slurmplatform.py
+++ b/autosubmit/platforms/slurmplatform.py
@@ -176,7 +176,11 @@ class SlurmPlatform(ParamikoPlatform):
             job_name = package.name if hasattr(package, "name") else package.jobs[0].name
             jobid = self.get_jobid_by_jobname(job_name)
             if len(jobid) > 1: # Cancel each job that is not the associated
-                for id_ in [ jobid for jobid in jobid if jobid != package.jobs[0].id ]:
+                ids_to_check = [package.jobs[0].id]
+                if package.jobs[0].het:
+                    # Slurm numbers the components of a heterogeneous job consecutively from the base id
+                    for i in range(1, package.jobs[0].het.get("HETSIZE", 1)):
+                        ids_to_check.append(str(int(ids_to_check[0]) + i))
+                for id_ in [id_ for id_ in jobid if id_ not in ids_to_check]:
                     self.send_command(self.cancel_job(id_)) # This can be faster if we cancel all jobs at once but there is no cancel_all_jobs call right now so todo in future
                     Log.debug(f'Job {id_} with the assigned name: {job_name} has been cancelled')
                 Log.debug(f'Job {package.jobs[0].id} with the assigned name: {job_name} has been submitted')
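The cancellation fix above relies on Slurm numbering the components of a heterogeneous job consecutively from the base job id. A small sketch of that id expansion with hypothetical values (the HETSIZE key is the one used in the patch; the ids are made up):

    base_id = "123456"
    het = {"HETSIZE": 3}  # hypothetical heterogeneous package with 3 components

    ids_to_keep = [base_id]
    for i in range(1, het.get("HETSIZE", 1)):
        ids_to_keep.append(str(int(base_id) + i))

    # Any id returned for the job name that is not a component gets cancelled.
    returned_ids = ["123455", "123456", "123457", "123458"]
    to_cancel = [id_ for id_ in returned_ids if id_ not in ids_to_keep]
    print(to_cancel)  # ['123455']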
diff --git a/bin/autosubmit b/bin/autosubmit
index 21c056019acadd564af7329d6ea315a0a6a1949e..53a7f58e68ffaf4b4f644fc6535f7991003ecb49 100755
--- a/bin/autosubmit
+++ b/bin/autosubmit
@@ -21,7 +21,6 @@
 import os
 import sys
 import traceback
-from io import StringIO # for handling the traceback print
 from contextlib import suppress
 
 scriptdir = os.path.abspath(os.path.dirname(sys.argv[0]))
@@ -30,17 +29,25 @@ sys.path.append(os.path.normpath(os.path.join(scriptdir, os.pardir)))
 
 # noinspection PyUnresolvedReferences
 from log.log import Log, AutosubmitCritical , AutosubmitError
 from autosubmit.autosubmit import Autosubmit
 
-def exit_from_error(e):
+def exit_from_error(e: BaseException):
+    try:
+        Log.debug(traceback.format_exc())
+    except BaseException:
+        print(traceback.format_exc())
     with suppress(FileNotFoundError, PermissionError):
         os.remove(os.path.join(Log.file_path, "autosubmit.lock"))
-    try:
-        if not e.trace:
+    if isinstance(e, (AutosubmitCritical, AutosubmitError)):
+        if e.trace:
             Log.debug("Trace: {0}", str(e.trace))
         Log.critical("{1} [eCode={0}]", e.code, e.message)
-    except:
-        Log.critical("An Unknown error occurred: {0}.\n Please report it to Autosubmit Developers through Git", str(e))
+    else:
+        Log.critical("An unknown error occurred: {0}.\n Please report it to the Autosubmit developers through Git.", str(e))
     Log.info("More info at https://autosubmit.readthedocs.io/en/master/troubleshooting/error-codes.html")
     os._exit(1)
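The rewritten handler dispatches on the exception type instead of probing attributes inside a try/except. A minimal standalone sketch of the same pattern (ToolError is a stand-in for AutosubmitError/AutosubmitCritical; nothing here is Autosubmit's API):

    import traceback

    class ToolError(Exception):
        """Carries a message, an error code, and an optional trace."""
        def __init__(self, message, code, trace=None):
            super().__init__(message)
            self.message, self.code, self.trace = message, code, trace

    def exit_from_error(e: BaseException) -> int:
        if isinstance(e, ToolError):
            if e.trace:
                print(f"Trace: {e.trace}")
            print(f"{e.message} [eCode={e.code}]")
        else:
            # Unknown exceptions are still reported, with the raw traceback.
            print(f"An unknown error occurred: {e}")
            traceback.print_exc()
        return 1

    try:
        raise ToolError("Incorrect arguments for this command", 7011)
    except BaseException as exc:
        exit_from_error(exc)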
diff --git a/docs/source/userguide/configure/develop_a_project.rst b/docs/source/userguide/configure/develop_a_project.rst
index 5da3114a2524154ac5f29b87f4a09047164ea115..0a2346f97efdbf512c413c5a0ed2d83379cf9fe0 100644
--- a/docs/source/userguide/configure/develop_a_project.rst
+++ b/docs/source/userguide/configure/develop_a_project.rst
@@ -40,12 +40,17 @@ Expdef configuration
       MEMBERS: fc0
       # Chunk size unit. STRING: hour, day, month, year
       CHUNKSIZEUNIT: month
+      # Split size unit. STRING: hour, day, month, year. Must be lower than CHUNKSIZEUNIT
+      SPLITSIZEUNIT: day # defaults to one unit below CHUNKSIZEUNIT (month-1 == day)
       # Chunk size. NUMERIC: 4, 6, 12
       CHUNKSIZE: 1
+      # Split size. NUMERIC: 4, 6, 12
+      SPLITSIZE: 1
       # Total number of chunks in experiment. NUMERIC: 30, 15, 10
       NUMCHUNKS: 2
       # Calendar used. LIST: standard, noleap
       CALENDAR: standard
+
       # List of members that can be included in this run. Optional.
       # RUN_ONLY_MEMBERS: fc0 fc1 fc2 fc3 fc4
       # RUN_ONLY_MEMBERS: fc[0-4]
@@ -201,6 +206,12 @@ Jobs configuration
         ## Specify the path to the interpreter. If empty, use system default based on job type . Default: empty
         # EXECUTABLE: /my_python_env/python3
 
+        # Split the job into N jobs. If not specified, defaults to None
+        # SPLITS: 2
+        # Size unit of each split. STRING: hour, day, month, year. Defaults to one unit below EXPERIMENT.CHUNKSIZEUNIT
+        # SPLITSIZEUNIT: day
+        # Size of each split. If not specified, defaults to 1
+        # SPLITSIZE: 1
 
     LOCAL_SETUP:
         FILE: LOCAL_SETUP.sh
diff --git a/requeriments.txt b/requeriments.txt
index ae0b28c5cd0ab49855b732c2930f4dfb44d65cb3..9c039557166321fd083e38b823fc3fd99c8e7b1b 100644
--- a/requeriments.txt
+++ b/requeriments.txt
@@ -1,12 +1,11 @@
 zipp>=3.1.0
 setuptools>=60.8.2
 cython
-autosubmitconfigparser==1.0.56
+autosubmitconfigparser==1.0.58
 paramiko>=2.9.2
 bcrypt>=3.2
 PyNaCl>=1.5.0
 configobj>=5.0.6
-argparse>=1.4.0
 python-dateutil>=2.8.2
 matplotlib<3.6
 py3dotplus>=1.1.0
diff --git a/setup.py b/setup.py
index 164dae7c7de87f1878c3ffd2eea1c4948abfc27f..56cbf3f52ec157fd8961797830238a45756f2a31 100644
--- a/setup.py
+++ b/setup.py
@@ -39,7 +39,7 @@ setup(
     url='http://www.bsc.es/projects/earthscience/autosubmit/',
     download_url='https://earth.bsc.es/wiki/doku.php?id=tools:autosubmit',
     keywords=['climate', 'weather', 'workflow', 'HPC'],
-    install_requires=['zipp>=3.1.0','ruamel.yaml==0.17.21','cython','autosubmitconfigparser','bcrypt>=3.2','packaging>19','six>=1.10.0','configobj>=5.0.6','argparse>=1.4.0','python-dateutil>=2.8.2','matplotlib<3.6','py3dotplus>=1.1.0','pyparsing>=3.0.7','paramiko>=2.9.2','mock>=4.0.3','portalocker>=2.3.2,<=2.7.0','networkx==2.6.3','requests>=2.27.1','bscearth.utils>=0.5.2','cryptography>=36.0.1','setuptools>=60.8.2','xlib>=0.21','pip>=22.0.3','pythondialog','pytest','nose','coverage','PyNaCl>=1.5.0','Pygments','psutil','rocrate==0.*'],
+    install_requires=['zipp>=3.1.0','ruamel.yaml==0.17.21','cython','autosubmitconfigparser','bcrypt>=3.2','packaging>19','six>=1.10.0','configobj>=5.0.6','python-dateutil>=2.8.2','matplotlib<3.6','py3dotplus>=1.1.0','pyparsing>=3.0.7','paramiko>=2.9.2','mock>=4.0.3','portalocker>=2.3.2,<=2.7.0','networkx==2.6.3','requests>=2.27.1','bscearth.utils>=0.5.2','cryptography>=36.0.1','setuptools>=60.8.2','xlib>=0.21','pip>=22.0.3','pythondialog','pytest','nose','coverage','PyNaCl>=1.5.0','Pygments','psutil','rocrate==0.*'],
     classifiers=[
         "Programming Language :: Python :: 3.7",
         "Programming Language :: Python :: 3.9",
diff --git a/test/unit/test_dic_jobs.py b/test/unit/test_dic_jobs.py
index bf5360070ab9694ad261e7b847a87d701a460439..f8b2138e656428c4db95c65b58c4a12e101cfcda 100644
--- a/test/unit/test_dic_jobs.py
+++ b/test/unit/test_dic_jobs.py
@@ -594,6 +594,12 @@ class TestDicJobs(TestCase):
         self.dictionary._create_jobs_split(5,'fake-section','fake-date', 'fake-member', 'fake-chunk', 0,Type.BASH, section_data)
         self.assertEqual(5, len(section_data))
 
+    @patch('autosubmit.job.job_dict.date2str')
+    def test_create_jobs_split(self, mock_date2str):
+        mock_date2str.side_effect = lambda x, y: str(x)
+        section_data = []
+        self.dictionary._create_jobs_split(5, 'fake-section', 'fake-date', 'fake-member', 'fake-chunk', 0, Type.BASH, section_data)
+        self.assertEqual(5, len(section_data))
diff --git a/test/unit/test_job.py b/test/unit/test_job.py
index f4887886c1df42b8aa57c802fa646067eec5ca8c..fe41cc4396df965a1b1402790960c4a8c029a238 100644
--- a/test/unit/test_job.py
+++ b/test/unit/test_job.py
@@ -5,11 +5,13 @@ import sys
 import tempfile
 from pathlib import Path
 from autosubmit.job.job_list_persistence import JobListPersistencePkl
+import datetime
 
 # compatibility with both versions (2 & 3)
 from sys import version_info
 from textwrap import dedent
 from unittest import TestCase
+from autosubmit.job.job_utils import calendar_chunk_section
 
 from autosubmitconfigparser.config.configcommon import AutosubmitConfig
 from autosubmitconfigparser.config.configcommon import BasicConfig, YAMLParserFactory
@@ -608,6 +610,7 
@@ CONFIG: run_only_members=[], #config.get_member_list(run_only=True), show_log=True, + create=True, ) job_list = job_list_obj.get_job_list() @@ -827,6 +830,7 @@ CONFIG: new=True, run_only_members=config.get_member_list(run_only=True), show_log=True, + create=True, ) job_list = job_list_obj.get_job_list() @@ -971,6 +975,7 @@ CONFIG: new=True, run_only_members=config.get_member_list(run_only=True), show_log=True, + create=True, ) job_list = job_list_obj.get_job_list() self.assertEqual(1, len(job_list)) @@ -1185,6 +1190,223 @@ CONFIG: self.assertEqual(1, len(self.job.children)) self.assertEqual(child, list(self.job.children)[0]) + def test_auto_calendar_split(self): + self.experiment_data = { + 'EXPERIMENT': { + 'DATELIST': '20000101', + 'MEMBERS': 'fc0', + 'CHUNKSIZEUNIT': 'day', + 'CHUNKSIZE': '1', + 'NUMCHUNKS': '2', + 'CALENDAR': 'standard' + }, + 'JOBS': { + 'A': { + 'FILE': 'a', + 'PLATFORM': 'test', + 'RUNNING': 'chunk', + 'SPLITS': 'auto', + 'SPLITSIZE': 1 + }, + 'B': { + 'FILE': 'b', + 'PLATFORM': 'test', + 'RUNNING': 'chunk', + 'SPLITS': 'auto', + 'SPLITSIZE': 2 + } + } + } + section = "A" + date = datetime.datetime.strptime("20000101", "%Y%m%d") + chunk = 1 + splits = calendar_chunk_section(self.experiment_data, section, date, chunk) + self.assertEqual(splits, 24) + splits = calendar_chunk_section(self.experiment_data, "B", date, chunk) + self.assertEqual(splits, 12) + self.experiment_data['EXPERIMENT']['CHUNKSIZEUNIT'] = 'hour' + with self.assertRaises(AutosubmitCritical): + calendar_chunk_section(self.experiment_data, "A", date, chunk) + + self.experiment_data['EXPERIMENT']['CHUNKSIZEUNIT'] = 'month' + splits = calendar_chunk_section(self.experiment_data, "A", date, chunk) + self.assertEqual(splits, 31) + splits = calendar_chunk_section(self.experiment_data, "B", date, chunk) + self.assertEqual(splits, 16) + + self.experiment_data['EXPERIMENT']['CHUNKSIZEUNIT'] = 'year' + splits = calendar_chunk_section(self.experiment_data, "A", date, chunk) + self.assertEqual(splits, 31) + splits = calendar_chunk_section(self.experiment_data, "B", date, chunk) + self.assertEqual(splits, 16) + + + + + + def test_calendar(self): + split = 12 + splitsize = 2 + expid = 'zzyy' + with tempfile.TemporaryDirectory() as temp_dir: + BasicConfig.LOCAL_ROOT_DIR = str(temp_dir) + Path(temp_dir, expid).mkdir() + for path in [f'{expid}/tmp', f'{expid}/tmp/ASLOGS', f'{expid}/tmp/ASLOGS_{expid}', f'{expid}/proj', + f'{expid}/conf']: + Path(temp_dir, path).mkdir() + with open(Path(temp_dir, f'{expid}/conf/minimal.yml'), 'w+') as minimal: + minimal.write(dedent(f'''\ + CONFIG: + RETRIALS: 0 + DEFAULT: + EXPID: {expid} + HPCARCH: test + EXPERIMENT: + # List of start dates + DATELIST: '20000101' + # List of members. + MEMBERS: fc0 + # Unit of the chunk size. Can be hour, day, month, or year. + CHUNKSIZEUNIT: day + # Size of each chunk. + CHUNKSIZE: '4' + # Size of each split + SPLITSIZE: {splitsize} + # Number of chunks of the experiment. + NUMCHUNKS: '2' + CHUNKINI: '' + # Calendar used for the experiment. Can be standard or noleap. 
+ CALENDAR: standard + + JOBS: + A: + FILE: a + PLATFORM: test + RUNNING: chunk + SPLITS: {split} + SPLITSIZE: {splitsize} + PLATFORMS: + test: + TYPE: slurm + HOST: localhost + PROJECT: abc + QUEUE: debug + USER: me + SCRATCH_DIR: /anything/ + ADD_PROJECT_TO_HOST: False + MAX_WALLCLOCK: '00:55' + TEMP_DIR: '' + ''')) + minimal.flush() + + basic_config = FakeBasicConfig() + basic_config.read() + basic_config.LOCAL_ROOT_DIR = str(temp_dir) + + config = AutosubmitConfig(expid, basic_config=basic_config, parser_factory=YAMLParserFactory()) + config.reload(True) + parameters = config.load_parameters() + + job_list = JobList(expid, basic_config, YAMLParserFactory(), + Autosubmit._get_job_list_persistence(expid, config), config) + job_list.generate( + as_conf=config, + date_list=[datetime.datetime.strptime("20000101", "%Y%m%d")], + member_list=["fc0"], + num_chunks=2, + chunk_ini=1, + parameters=parameters, + date_format='', + default_retrials=config.get_retrials(), + default_job_type=config.get_default_job_type(), + wrapper_jobs={}, + new=True, + run_only_members=config.get_member_list(run_only=True), + show_log=True, + create=True, + ) + job_list = job_list.get_job_list() + self.assertEqual(24, len(job_list)) + + submitter = Autosubmit._get_submitter(config) + submitter.load_platforms(config) + + hpcarch = config.get_platform() + for job in job_list: + job.date_format = "" + if job.platform_name == "" or job.platform_name is None: + job.platform_name = hpcarch + job.platform = submitter.platforms[job.platform_name] + + # Check splits + # Assert general + job = job_list[0] + parameters = job.update_parameters(config, parameters) + self.assertEqual(job.splits, 12) + self.assertEqual(job.running, 'chunk') + + self.assertEqual(parameters['SPLIT'], 1) + self.assertEqual(parameters['SPLITSIZE'], splitsize) + self.assertEqual(parameters['SPLITSIZEUNIT'], 'hour') + self.assertEqual(parameters['SPLITSCALENDAR'], 'standard') + # assert parameters + next_start = "00" + for i,job in enumerate(job_list[0:12]): + parameters = job.update_parameters(config, parameters) + end_hour = str(parameters['SPLIT'] * splitsize ).zfill(2) + if end_hour == "24": + end_hour = "00" + self.assertEqual(parameters['SPLIT'], i+1) + self.assertEqual(parameters['SPLITSIZE'], splitsize) + self.assertEqual(parameters['SPLITSIZEUNIT'], 'hour') + self.assertEqual(parameters['SPLIT_START_DATE'], '20000101') + self.assertEqual(parameters['SPLIT_START_YEAR'], '2000') + self.assertEqual(parameters['SPLIT_START_MONTH'], '01') + self.assertEqual(parameters['SPLIT_START_DAY'], '01') + self.assertEqual(parameters['SPLIT_START_HOUR'], next_start) + if parameters['SPLIT'] == 12: + self.assertEqual(parameters['SPLIT_END_DATE'], '20000102') + self.assertEqual(parameters['SPLIT_END_DAY'], '02') + self.assertEqual(parameters['SPLIT_END_DATE'], '20000102') + self.assertEqual(parameters['SPLIT_END_DAY'], '02') + self.assertEqual(parameters['SPLIT_END_YEAR'], '2000') + self.assertEqual(parameters['SPLIT_END_MONTH'], '01') + self.assertEqual(parameters['SPLIT_END_HOUR'], end_hour) + else: + self.assertEqual(parameters['SPLIT_END_DATE'], '20000101') + self.assertEqual(parameters['SPLIT_END_DAY'], '01') + self.assertEqual(parameters['SPLIT_END_YEAR'], '2000') + self.assertEqual(parameters['SPLIT_END_MONTH'], '01') + self.assertEqual(parameters['SPLIT_END_HOUR'], end_hour) + next_start = parameters['SPLIT_END_HOUR'] + next_start = "00" + for i,job in enumerate(job_list[12:24]): + parameters = job.update_parameters(config, parameters) + end_hour = 
str(parameters['SPLIT'] * splitsize ).zfill(2) + if end_hour == "24": + end_hour = "00" + self.assertEqual(parameters['SPLIT'], i+1) + self.assertEqual(parameters['SPLITSIZE'], splitsize) + self.assertEqual(parameters['SPLITSIZEUNIT'], 'hour') + self.assertEqual(parameters['SPLIT_START_DATE'], '20000105') + self.assertEqual(parameters['SPLIT_START_YEAR'], '2000') + self.assertEqual(parameters['SPLIT_START_MONTH'], '01') + self.assertEqual(parameters['SPLIT_START_DAY'], '05') + self.assertEqual(parameters['SPLIT_START_HOUR'], next_start) + if parameters['SPLIT'] == 12: + self.assertEqual(parameters['SPLIT_END_DATE'], '20000106') + self.assertEqual(parameters['SPLIT_END_DAY'], '06') + self.assertEqual(parameters['SPLIT_END_YEAR'], '2000') + self.assertEqual(parameters['SPLIT_END_MONTH'], '01') + self.assertEqual(parameters['SPLIT_END_HOUR'], end_hour) + else: + self.assertEqual(parameters['SPLIT_END_DATE'], '20000105') + self.assertEqual(parameters['SPLIT_END_DAY'], '05') + self.assertEqual(parameters['SPLIT_END_YEAR'], '2000') + self.assertEqual(parameters['SPLIT_END_MONTH'], '01') + self.assertEqual(parameters['SPLIT_END_HOUR'], end_hour) + next_start = parameters['SPLIT_END_HOUR'] + class FakeBasicConfig: diff --git a/test/unit/test_job_list.py b/test/unit/test_job_list.py index d5ce5b0308152b60c1945a34df0cd670bf756cb7..0dc87554c3bd4560d6962845496ea91184f21ee6 100644 --- a/test/unit/test_job_list.py +++ b/test/unit/test_job_list.py @@ -1,3 +1,4 @@ +import os from unittest import TestCase from copy import copy import networkx @@ -15,6 +16,7 @@ from autosubmit.job.job_common import Type from autosubmit.job.job_list import JobList from autosubmit.job.job_list_persistence import JobListPersistencePkl from autosubmitconfigparser.config.yamlparser import YAMLParserFactory +from log.log import AutosubmitCritical class TestJobList(TestCase): @@ -66,6 +68,72 @@ class TestJobList(TestCase): def tearDown(self) -> None: shutil.rmtree(self.temp_directory) + def test_load(self): + as_conf = Mock() + as_conf.experiment_data = dict() + parser_mock = Mock() + parser_mock.read = Mock() + factory = YAMLParserFactory() + factory.create_parser = Mock(return_value=parser_mock) + date_list = ['fake-date1', 'fake-date2'] + member_list = ['fake-member1', 'fake-member2'] + num_chunks = 999 + parameters = {'fake-key': 'fake-value', + 'fake-key2': 'fake-value2'} + with tempfile.TemporaryDirectory() as temp_dir: + job_list = self.new_job_list(factory, temp_dir) + FakeBasicConfig.LOCAL_ROOT_DIR = str(temp_dir) + Path(temp_dir, self.experiment_id).mkdir() + for path in [f'{self.experiment_id}/tmp', f'{self.experiment_id}/tmp/ASLOGS', + f'{self.experiment_id}/tmp/ASLOGS_{self.experiment_id}', f'{self.experiment_id}/proj', + f'{self.experiment_id}/conf', f'{self.experiment_id}/pkl']: + Path(temp_dir, path).mkdir() + job_list.changes = Mock(return_value=['random_section', 'random_section']) + as_conf.detailed_deep_diff = Mock(return_value={}) + # as_conf.get_member_list = Mock(return_value=member_list) + # act + job_list.generate( + as_conf=as_conf, + date_list=date_list, + member_list=member_list, + num_chunks=num_chunks, + chunk_ini=1, + parameters=parameters, + date_format='H', + default_retrials=9999, + default_job_type=Type.BASH, + wrapper_jobs={}, + new=True, + create=True, + ) + job_list.save() + # Test load + job_list_to_load = self.new_job_list(factory, temp_dir) + # chmod + job_list_to_load.load(False) + self.assertEqual(job_list_to_load._job_list, job_list._job_list) + job_list_to_load.load(True) + 
self.assertEqual(job_list_to_load._job_list, job_list._job_list)
+            os.chmod(f'{temp_dir}/{self.experiment_id}/pkl/job_list_random-id.pkl', 0o000)
+            with self.assertRaises(AutosubmitCritical):
+                job_list_to_load.load(False)
+            job_list_to_load.load(True)
+            self.assertEqual(job_list_to_load._job_list, job_list._job_list)
+            os.chmod(f'{temp_dir}/{self.experiment_id}/pkl/job_list_random-id.pkl', 0o777)
+            shutil.copy(f'{temp_dir}/{self.experiment_id}/pkl/job_list_random-id.pkl', f'{temp_dir}/{self.experiment_id}/pkl/job_list_random-id_backup.pkl')
+            os.remove(f'{temp_dir}/{self.experiment_id}/pkl/job_list_random-id.pkl')
+            job_list_to_load.load(False)
+            self.assertEqual(job_list_to_load._job_list, job_list._job_list)
+            job_list_to_load.load(True)
+            self.assertEqual(job_list_to_load._job_list, job_list._job_list)
+
     def test_get_job_list_returns_the_right_list(self):
         job_list = self.job_list.get_job_list()
         self.assertEqual(self.job_list._job_list, job_list)
@@ -248,6 +316,7 @@
             default_job_type=Type.BASH,
             wrapper_jobs={},
             new=True,
+            create=True,
         )
@@ -317,6 +386,7 @@
             default_job_type=Type.BASH,
             wrapper_jobs={},
             new=True,
+            create=True,
         )
         job_list._job_list[0].member = "fake-member1"
         job_list._job_list[1].member = "fake-member2"
@@ -363,6 +433,7 @@
             default_job_type=Type.BASH,
             wrapper_jobs={},
             new=True,
+            create=True,
         )
         job_list._job_list[0].section = "fake-section"
         job_list._job_list[0].date = "fake-date1"
@@ -446,6 +517,7 @@
             default_job_type=Type.BASH,
             wrapper_jobs={},
             new=True,
+            create=True,
         )
         job_list.save()
         job_list2 = self.new_job_list(factory,temp_dir)
@@ -461,6 +533,7 @@
             default_job_type=Type.BASH,
             wrapper_jobs={},
             new=False,
+            create=True,
         )
         #return False
         job_list2.update_from_file = Mock()
@@ -526,6 +599,7 @@
             default_job_type=Type.BASH,
             wrapper_jobs={},
             new=False,
+            create=True,
         )
         # assert update_genealogy called with right values
         # When using a 4.0 experiment, the pkl has to be recreated and act as a new one.
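test_load above exercises the reworked pkl load path: an unreadable pkl raises AutosubmitCritical, while monitoring mode and the *_backup.pkl copy allow recovery. A simplified sketch of that load-with-fallback idea, independent of the Autosubmit classes (the paths and error type are stand-ins):

    import pickle
    from pathlib import Path

    def load_job_list(pkl_path: Path):
        """Try the main pkl first, then its _backup twin."""
        backup = pkl_path.with_name(pkl_path.stem + "_backup.pkl")
        for candidate in (pkl_path, backup):
            try:
                with open(candidate, "rb") as fh:
                    return pickle.load(fh)
            except (FileNotFoundError, PermissionError):
                continue
        raise RuntimeError(f"Could not read {pkl_path} or its backup")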
diff --git a/test/unit/test_job_package.py b/test/unit/test_job_package.py
index a5b1085cf8b31c96e54553ce558a0220f757b2b0..e12aa8eb6471d66bc70132e96c9e22e60417bd98 100644
--- a/test/unit/test_job_package.py
+++ b/test/unit/test_job_package.py
@@ -188,16 +188,20 @@ class TestJobPackage(TestCase):
         job._tmp_path = MagicMock()
         job._get_paramiko_template = MagicMock("false", "empty")
         job.update_parameters = MagicMock()
+        job.file = "fake-file"
 
         self.job_package._create_scripts = MagicMock()
         self.job_package._send_files = MagicMock()
         self.job_package._do_submission = MagicMock()
-
+        configuration = MagicMock()
+        configuration.get_project_dir.return_value = "fake-proj-dir"
         # act
-        self.job_package.submit('fake-config', 'fake-params')
+        self.job_package.submit(configuration, 'fake-params')
         # assert
         for job in self.jobs:
-            job.update_parameters.assert_called_once_with('fake-config', 'fake-params')
+            job.update_parameters.assert_called()  # should be called once per job, but the current implementation calls it twice (the extra call handles additional files); tighten this assert once that is reworked
+            # job.update_parameters.assert_called_once_with(configuration, 'fake-params')
         self.job_package._create_scripts.is_called_once_with()
         self.job_package._send_files.is_called_once_with()
diff --git a/test/unit/test_version.py b/test/unit/test_version.py
new file mode 100644
index 0000000000000000000000000000000000000000..856979767e18d1de70766cac4e20ac2885ad644d
--- /dev/null
+++ b/test/unit/test_version.py
@@ -0,0 +1,21 @@
+import subprocess
+from pathlib import Path
+from unittest import TestCase
+
+import sys
+
+from autosubmit.autosubmit import Autosubmit
+
+
+class TestAutosubmit(TestCase):
+
+    def testAutosubmitVersion(self):
+        bin_path = Path(__file__, '../../../bin/autosubmit').resolve()
+        exit_code, out = subprocess.getstatusoutput(' '.join([sys.executable, str(bin_path), '-v']))
+        self.assertEqual(0, exit_code)
+        self.assertEqual(Autosubmit.autosubmit_version, out.strip())
+
+    def testAutosubmitVersionBroken(self):
+        bin_path = Path(__file__, '../../../bin/autosubmit').resolve()
+        exit_code, _ = subprocess.getstatusoutput(' '.join([sys.executable, str(bin_path), '-abcdefg']))
+        self.assertEqual(1, exit_code)
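The new test_version.py pins down the reworked -v handling: the flag prints the version and exits 0, while an unknown flag prints the help and exits 1. A quick manual check of the same behaviour, assuming it is run from the repository root:

    import subprocess, sys

    code, out = subprocess.getstatusoutput(f"{sys.executable} bin/autosubmit -v")
    assert code == 0 and out.strip()      # version printed, exit code 0

    code, _ = subprocess.getstatusoutput(f"{sys.executable} bin/autosubmit -abcdefg")
    assert code == 1                      # unknown flag: help text, exit code 1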