diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 0ff06a791a4320e76c399da4da6b8e28d9960d23..70380067df2e230f79935240381f17cf0aff79a0 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -53,7 +53,7 @@ import datetime import portalocker from pkg_resources import require, resource_listdir, resource_exists, resource_string from distutils.util import strtobool - +from collections import defaultdict from pyparsing import nestedExpr sys.path.insert(0, os.path.abspath('.')) @@ -190,6 +190,7 @@ class Autosubmit: help='Supply the list of chunks to filter the list of jobs. Default = "Any". ' 'LIST = "[ 19601101 [ fc0 [1 2 3 4] fc1 [1] ] 19651101 [ fc0 [16-30] ] ]"') group.add_argument('-fs', '--filter_status', type=str, + choices=('Any', 'READY', 'COMPLETED', 'WAITING', 'SUSPENDED', 'FAILED', 'UNKNOWN'), help='Select the original status to filter the list of jobs') group.add_argument('-ft', '--filter_type', type=str, @@ -695,6 +696,7 @@ class Autosubmit: pkl_dir = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, 'pkl') job_list = Autosubmit.load_job_list(expid, as_conf, notransitive=notransitive) + Log.debug("Starting from job list restored from {0} files", pkl_dir) Log.debug("Length of the jobs list: {0}", len(job_list)) @@ -937,8 +939,10 @@ class Autosubmit: return False pkl_dir = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, 'pkl') - job_list = Autosubmit.load_job_list(expid, as_conf, notransitive=notransitive) + job_list = Autosubmit.load_job_list(expid, as_conf, notransitive=notransitive,monitor=True) Log.debug("Job list restored from {0} files", pkl_dir) + + if not isinstance(job_list, type([])): jobs = [] if filter_chunks: @@ -1184,7 +1188,7 @@ class Autosubmit: Log.info('Recovering experiment {0}'.format(expid)) pkl_dir = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, 'pkl') - job_list = Autosubmit.load_job_list(expid, as_conf, notransitive=notransitive) + job_list = Autosubmit.load_job_list(expid, as_conf, notransitive=notransitive,monitor=True) Log.debug("Job list restored from {0} files", pkl_dir) if not as_conf.check_conf_files(): @@ -2057,14 +2061,15 @@ class Autosubmit: as_conf.get_wrapper_type(), as_conf.get_wrapper_jobs(), notransitive=notransitive) if rerun == "true": + chunk_list = Autosubmit._create_json(as_conf.get_chunk_list()) job_list.rerun(chunk_list, notransitive) + + else: job_list.remove_rerun_only_jobs(notransitive) - Log.info("\nSaving the jobs list...") job_list.save() - JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid).reset_table() @@ -2089,6 +2094,7 @@ class Autosubmit: Log.result("\nJob list created successfully") Log.user_warning("Remember to MODIFY the MODEL config files!") + return True except portalocker.AlreadyLocked: @@ -2136,7 +2142,7 @@ class Autosubmit: shutil.rmtree(project_path, ignore_errors=True) return False Log.debug("{0}", output) - + #RSYNC elif project_type == "local": local_project_path = as_conf.get_local_project_path() project_path = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, BasicConfig.LOCAL_PROJ_DIR) @@ -2220,7 +2226,7 @@ class Autosubmit: Log.debug('Chunks to change: {0}', filter_chunks) Log.debug('Status of jobs to change: {0}', filter_status) Log.debug('Sections to change: {0}', filter_section) - + wrongExpid = 0 as_conf = AutosubmitConfig(expid, BasicConfig, ConfigParserFactory()) if not as_conf.check_conf_files(): Log.critical('Can not run with invalid configuration') @@ -2247,11 +2253,14 @@ class Autosubmit: member = member_json['m'] jobs_member = filter(lambda j: j.member == member, jobs_date) - for job in filter(lambda j: j.chunk is None, jobs_member): - Autosubmit.change_status(final, final_status, job) + #for job in filter(lambda j: j.chunk is None, jobs_member): + # Autosubmit.change_status(final, final_status, job) for chunk_json in member_json['cs']: chunk = int(chunk_json) + for job in filter(lambda j: j.chunk == chunk and j.synchronize is not None, jobs_date): + Autosubmit.change_status(final, final_status, job) + for job in filter(lambda j: j.chunk == chunk, jobs_member): Autosubmit.change_status(final, final_status, job) @@ -2282,6 +2291,15 @@ class Autosubmit: if lst: jobs = lst.split() + expidJoblist =defaultdict(int) + for x in lst.split(): + expidJoblist[str(x[0:4])] += 1 + + if str(expid) in expidJoblist: + wrongExpid=jobs.__len__()-expidJoblist[expid] + if wrongExpid > 0: + Log.warning("There are {0} job.name with an invalid Expid",wrongExpid) + if jobs == 'Any': for job in job_list.get_job_list(): @@ -2292,11 +2310,16 @@ class Autosubmit: Autosubmit.change_status(final, final_status, job) sys.setrecursionlimit(50000) - job_list.update_list(as_conf,False) - if save: + job_list.update_list(as_conf,False,True) + + if save and wrongExpid == 0: job_list.save() else: Log.warning("Changes NOT saved to the JobList!!!!: use -s option to save") + if wrongExpid > 0: + + Log.error("Save disabled due invalid expid, please check or/and jobs expid name") + packages = JobPackagePersistence(os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid).load() @@ -2636,7 +2659,8 @@ class Autosubmit: open(as_conf.experiment_file, 'w').write(content) @staticmethod - def load_job_list(expid, as_conf, notransitive=False): + def load_job_list(expid, as_conf, notransitive=False,monitor=False): + rerun = as_conf.get_rerun() job_list = JobList(expid, BasicConfig, ConfigParserFactory(), Autosubmit._get_job_list_persistence(expid, as_conf)) date_list = as_conf.get_date_list() @@ -2652,6 +2676,87 @@ class Autosubmit: as_conf.load_parameters(), date_format, as_conf.get_retrials(), as_conf.get_default_job_type(), as_conf.get_wrapper_type(), as_conf.get_wrapper_jobs(), new=False, notransitive=notransitive) + if rerun == "true": + + chunk_list = Autosubmit._create_json(as_conf.get_chunk_list()) + if not monitor: + job_list.rerun(chunk_list, notransitive) + else: + rerun_list = JobList(expid, BasicConfig, ConfigParserFactory(), + Autosubmit._get_job_list_persistence(expid, as_conf)) + rerun_list.generate(date_list, as_conf.get_member_list(), as_conf.get_num_chunks(), + as_conf.get_chunk_ini(), + as_conf.load_parameters(), date_format, as_conf.get_retrials(), + as_conf.get_default_job_type(), as_conf.get_wrapper_type(), + as_conf.get_wrapper_jobs(), + new=False, notransitive=notransitive) + rerun_list.rerun(chunk_list, notransitive) + job_list =Autosubmit.rerun_recovery(expid,job_list,rerun_list,as_conf) + else: + job_list.remove_rerun_only_jobs(notransitive) + + + return job_list + @staticmethod + def rerun_recovery(expid,job_list,rerun_list,as_conf): + """ + Method to check all active jobs. If COMPLETED file is found, job status will be changed to COMPLETED, + otherwise it will be set to WAITING. It will also update the jobs list. + + :param expid: identifier of the experiment to recover + :type expid: str + :param save: If true, recovery saves changes to the jobs list + :type save: bool + :param all_jobs: if True, it tries to get completed files for all jobs, not only active. + :type all_jobs: bool + :param hide: hides plot window + :type hide: bool + """ + + hpcarch = as_conf.get_platform() + submitter = Autosubmit._get_submitter(as_conf) + submitter.load_platforms(as_conf) + if submitter.platforms is None: + return False + platforms = submitter.platforms + + platforms_to_test = set() + for job in job_list.get_job_list(): + if job.platform_name is None: + job.platform_name = hpcarch + # noinspection PyTypeChecker + job.platform = platforms[job.platform_name.lower()] + # noinspection PyTypeChecker + platforms_to_test.add(platforms[job.platform_name.lower()]) + + + rerun_names=[] + + [rerun_names.append(job.name) for job in rerun_list.get_job_list()] + jobs_to_recover = [i for i in job_list.get_job_list() if i.name not in rerun_names] + + + Log.info("Looking for COMPLETED files") + start = datetime.datetime.now() + for job in jobs_to_recover: + if job.platform_name is None: + job.platform_name = hpcarch + # noinspection PyTypeChecker + job.platform = platforms[job.platform_name.lower()] + + if job.platform.get_completed_files(job.name, 0): + job.status = Status.COMPLETED + # Log.info("CHANGED job '{0}' status to COMPLETED".format(job.name)) + #elif job.status != Status.SUSPENDED: + # job.status = Status.WAITING + # job.fail_count = 0 + # Log.info("CHANGED job '{0}' status to WAITING".format(job.name)) + + job.platform.get_logs_files(expid, job.remote_logs) + + #end = datetime.datetime.now() + #Log.info("Time spent: '{0}'".format(end - start)) + #Log.info("Updating the jobs list") return job_list @staticmethod diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index 4e19c9076b7c6c97a14813073f71a2bea9e140e4..981c7966f98145f4a0e8a76dea1cd93524db94c7 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -144,6 +144,15 @@ class AutosubmitConfig(object): """ return self._jobs_parser.get_option(section, 'WALLCLOCK', '') + def get_synchronize(self, section): + """ + Gets wallclock for the given job type + :param section: job type + :type section: str + :return: wallclock time + :rtype: str + """ + return self._jobs_parser.get_option(section, 'SYNCHRONIZE', '') def get_processors(self, section): """ Gets processors needed for the given job type diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 73b3e0969847da64b16dd700ec0a16175869179c..eb427a69d12cbb20436c92d050f7c787a540d30b 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -73,6 +73,8 @@ class Job(object): self.name = name self.split = None self.delay = None + self.synchronize = None + self._long_name = None self.long_name = name self.date_format = '' @@ -98,6 +100,7 @@ class Job(object): self.check = 'True' self.packed = False + def __getstate__(self): odict = self.__dict__ if '_platform' in odict: @@ -590,6 +593,7 @@ class Job(object): parameters['CHUNK'] = chunk parameters['SPLIT'] = self.split parameters['DELAY'] = self.delay + parameters['SYNCHRONIZE'] = self.synchronize total_chunk = int(parameters['NUMCHUNKS']) chunk_length = int(parameters['CHUNKSIZE']) chunk_unit = parameters['CHUNKSIZEUNIT'].lower() @@ -636,6 +640,7 @@ class Job(object): self.memory = as_conf.get_memory(self.section) self.memory_per_task = as_conf.get_memory_per_task(self.section) self.wallclock = as_conf.get_wallclock(self.section) + self.scratch_free_space = as_conf.get_scratch_free_space(self.section) if self.scratch_free_space == 0: self.scratch_free_space = job_platform.scratch_free_space diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index 40c147be847cf64192ba9a3d5adbdef50eaf87d6..1a0239c9a4c059be449be9371ebe30b5b730fe4e 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -159,6 +159,7 @@ class DicJobs: :type delay: int """ # Temporally creation for unified jobs in case of synchronize + if synchronize is not None: tmp_dic = dict() count = 0 @@ -299,8 +300,11 @@ class DicJobs: job = Job(name, jobs_data[name][1], jobs_data[name][2], priority) job.local_logs = (jobs_data[name][8], jobs_data[name][9]) job.remote_logs = (jobs_data[name][10], jobs_data[name][11]) + else: job = Job(name, 0, Status.WAITING, priority) + + job.section = section job.date = date job.member = member @@ -334,11 +338,17 @@ class DicJobs: job.memory = self.get_option(section, "MEMORY", '') job.memory_per_task = self.get_option(section, "MEMORY_PER_TASK", '') job.wallclock = self.get_option(section, "WALLCLOCK", '') + + job.retrials = int(self.get_option(section, 'RETRIALS', -1)) + if job.retrials == -1: job.retrials = None job.notify_on = [x.upper() for x in self.get_option(section, "NOTIFY_ON", '').split(' ')] + job.synchronize = self.get_option(section, "SYNCHRONIZE", None) + self._jobs_list.get_job_list().append(job) + return job def get_option(self, section, option, default): diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 11d14a9f8fabacc1a08a09c53151e60cd4d22b73..933b8de0fdc05d10322fcc3240fb2825733d3f66 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -97,7 +97,7 @@ class JobList: self._graph = value def generate(self, date_list, member_list, num_chunks, chunk_ini, parameters, date_format, default_retrials, - default_job_type, wrapper_type=None, wrapper_jobs=None, new=True, notransitive=False): + default_job_type, wrapper_type=None, wrapper_jobs=None,new=True, notransitive=False): """ Creates all jobs needed for the current workflow @@ -138,7 +138,6 @@ class JobList: if not new: jobs_data = {str(row[0]): row for row in self.load()} self._create_jobs(dic_jobs, jobs_parser, priority, default_job_type, jobs_data) - Log.info("Adding dependencies...") self._add_dependencies(date_list, member_list, chunk_list, dic_jobs, jobs_parser, self.graph) @@ -777,7 +776,7 @@ class JobList: def parameters(self, value): self._parameters = value - def update_list(self, as_conf,store_change=True): + def update_list(self, as_conf,store_change=True,fromSetStatus=False): """ Updates job list, resetting failed jobs and changing to READY all WAITING jobs with all parents COMPLETED @@ -814,14 +813,30 @@ class JobList: Log.debug("Resetting job: {0} status to: WAITING for parents completion...".format(job.name)) # if waiting jobs has all parents completed change its State to READY + #RERUN FIX + + for job in self.get_completed(): + + if job.synchronize is not None: #and job in self.get_active(): + Log.debug('Updating SYNC jobs') + tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] + if len(tmp) != len(job.parents): + job.status = Status.WAITING + save = True + Log.debug("Resetting sync job: {0} status to: WAITING for parents completion...".format(job.name)) + Log.debug('Update finished') + + Log.debug('Updating WAITING jobs') for job in self.get_waiting(): - tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] - if len(tmp) == len(job.parents): - job.status = Status.READY - save = True - Log.debug("Resetting job: {0} status to: READY (all parents completed)...".format(job.name)) + if not fromSetStatus: + tmp = [parent for parent in job.parents if parent.status == Status.COMPLETED] + if len(tmp) == len(job.parents): + job.status = Status.READY + save = True + Log.debug("Resetting job: {0} status to: READY (all parents completed)...".format(job.name)) Log.debug('Update finished') + return save def update_genealogy(self, new=True, notransitive=False): @@ -896,7 +911,7 @@ class JobList: self._job_list.remove(job) - def rerun(self, chunk_list, notransitive=False): + def rerun(self, chunk_list, notransitive=False,monitor=False): """ Updates job list to rerun the jobs specified by chunk_list @@ -931,8 +946,7 @@ class JobList: for c in m['cs']: Log.debug("Chunk: " + c) chunk = int(c) - for job in [i for i in self._job_list if i.date == date and i.member == member and - i.chunk == chunk]: + for job in [i for i in self._job_list if i.date == date and i.member == member and (i.chunk == chunk ) ]: if not job.rerun_only or chunk != previous_chunk + 1: job.status = Status.WAITING @@ -943,7 +957,7 @@ class JobList: continue for key in dependencies_keys: - skip, (chunk, member, date) = JobList._calculate_dependency_metadata(chunk, member, date, + skip, (current_chunk, current_member, current_date) = JobList._calculate_dependency_metadata(chunk, member, date, dependencies[key]) if skip: continue @@ -954,13 +968,17 @@ class JobList: parent.status = Status.WAITING Log.debug("Parent: " + parent.name) - for job in [j for j in self._job_list if j.status == Status.COMPLETED]: - self._remove_job(job) for job in [j for j in self._job_list if j.status == Status.COMPLETED]: - self._remove_job(job) + if job.synchronize is None: + self._remove_job(job) self.update_genealogy(notransitive=notransitive) + for job in [j for j in self._job_list if j.synchronize !=None]: + if job.status == Status.COMPLETED: + job.status = Status.WAITING + else: + self._remove_job(job) def _get_jobs_parser(self): jobs_parser = self._parser_factory.create_parser() diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index cef7957cdfa91f16c0665eea44745a5cbc3a64ac..d6a7198bbc0ddcbe965080bd4b8c149220c0b386 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -132,6 +132,7 @@ class Platform(object): parameters['{0}SCRATCH_DIR'.format(prefix)] = self.scratch parameters['{0}TEMP_DIR'.format(prefix)] = self.temp_dir parameters['{0}ROOTDIR'.format(prefix)] = self.root_dir + parameters['{0}LOGDIR'.format(prefix)] = self.get_files_path() def send_file(self, filename): diff --git a/test/unit/test_dic_jobs.py b/test/unit/test_dic_jobs.py index 6a595479f063c3c49f7f1594dbb498a85e7259ae..1cba4d1ad382b9b48b854ce51c68f3629df9684b 100644 --- a/test/unit/test_dic_jobs.py +++ b/test/unit/test_dic_jobs.py @@ -295,11 +295,12 @@ class TestDicJobs(TestCase): memory = memory_per_task = 444 wallclock = 555 notify_on = 'COMPLETED FAILED' + synchronize = None self.parser_mock.has_option = Mock(side_effect=[True, True, True, True, True, True, True, True, True, True, True, - True, True, True, True, False, True]) + True, True, True, True, False, True, False]) self.parser_mock.get = Mock(side_effect=[frequency, delay, 'True', 'True', 'bash', platform_name, filename, queue, 'True', processors, threads, tasks, memory, memory_per_task, - wallclock, notify_on]) + wallclock, notify_on,synchronize]) job_list_mock = Mock() job_list_mock.append = Mock() self.dictionary._jobs_list.get_job_list = Mock(return_value=job_list_mock)