From ac73ced570b6a6200cddee9c599ef147b621c022 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 19 Jan 2021 09:44:48 +0100 Subject: [PATCH 01/12] Fixed log message in create --- autosubmit/autosubmit.py | 15 ++++++++++----- autosubmit/config/config_common.py | 7 ++++++- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index a677f8c19..f02fa3416 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -3533,8 +3533,7 @@ class Autosubmit: Log.info( "Preparing .lock file to avoid multiple instances with same expid.") - as_conf = AutosubmitConfig( - expid, BasicConfig, ConfigParserFactory()) + as_conf = AutosubmitConfig(expid, BasicConfig, ConfigParserFactory()) as_conf.check_conf_files(False) project_type = as_conf.get_project_type() @@ -3665,7 +3664,6 @@ class Autosubmit: else: Log.info(job_list.print_with_status()) Log.status(job_list.print_with_status()) - return True # catching Exception except (KeyboardInterrupt) as e: @@ -3674,12 +3672,19 @@ class Autosubmit: fh.flush() os.fsync(fh.fileno()) raise AutosubmitCritical("Stopped by user input", 7010) + except (BaseException) as e: + raise except portalocker.AlreadyLocked: message = "We have detected that there is another Autosubmit instance using the experiment\n. Stop other Autosubmit instances that are using the experiment or delete autosubmit.lock file located on tmp folder" raise AutosubmitCritical(message, 7000) + except AutosubmitError as e: + if e.trace == "": + e.trace = traceback.format_exc() + raise AutosubmitError(e.message, e.code,e.trace) except AutosubmitCritical as e: - Log.debug(traceback.format_exc()) - raise AutosubmitCritical(e.message, e.code) + if e.trace == "": + e.trace = traceback.format_exc() + raise AutosubmitCritical(e.message, e.code,e.trace) @staticmethod def _copy_code(as_conf, expid, project_type, force): diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index f1b70a334..3fea2b69c 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -455,7 +455,12 @@ class AutosubmitConfig(object): """ Log.info('\nChecking configuration files...') self.ignore_file_path = check_file - self.reload() + try: + self.reload() + except (AutosubmitCritical,AutosubmitError) as e: + raise + except BaseException as e: + raise # Annotates all errors found in the configuration files in dictionaries self.warn_config and self.wrong_config. self.check_expdef_conf() self.check_platforms_conf() -- GitLab From e3a598566b436237587475480089ea1fb583aedd Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 20 Jan 2021 13:59:02 +0100 Subject: [PATCH 02/12] wallclock fix --- autosubmit/job/job_packager.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 886e98df8..28c40f57e 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -767,13 +767,18 @@ class JobPackagerHorizontal(object): if wrappable and child not in next_section_list: next_section_list.append(child) - next_section_list.sort( - key=lambda job: self.sort_by_expression(job.name)) + next_section_list.sort(key=lambda job: self.sort_by_expression(job.name)) self.job_list = next_section_list package_jobs = self.build_horizontal_package(horizontal_vertical) if package_jobs: - # if not self.add_sectioncombo_processors(self.total_processors) and horizontal_vertical: + sections_aux = set() + wallclock = package_jobs[0].wallclock + for job in package_jobs: + if job.section not in sections_aux: + sections_aux.add(job.section) + if job.wallclock > wallclock: + wallclock = job.wallclock if self._current_processors > max_procs: return packages if max_wallclock: -- GitLab From 8990fe58f3257ae6c1ab01e0f1391bacd02a7c51 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Fri, 22 Jan 2021 11:09:20 +0100 Subject: [PATCH 03/12] First stage of optimization --- autosubmit/autosubmit.py | 10 +- autosubmit/config/config_common.py | 4 +- autosubmit/database/db_jobdata.py | 2 +- autosubmit/job/job.py | 9 +- autosubmit/job/job_grouping.py | 6 +- autosubmit/job/job_list.py | 31 ++-- autosubmit/job/job_packager.py | 8 +- autosubmit/job/job_packages.py | 146 +++++++++++++----- autosubmit/monitor/diagram.py | 4 +- autosubmit/monitor/monitor.py | 2 +- .../platforms/wrappers/wrapper_builder.py | 6 +- test/integration/test_db_manager.py | 2 +- 12 files changed, 149 insertions(+), 81 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index f02fa3416..06bd1efa6 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -1535,8 +1535,7 @@ class Autosubmit: "Wrapper is in Unknown Status couldn't get wrapper parameters", 7050) # New status will be saved and inner_jobs will be checked. - wrapper_job.check_status( - wrapper_job.new_status) + wrapper_job.check_status(wrapper_job.new_status) # Erase from packages if the wrapper failed to be queued ( Hold Admin bug ) if wrapper_job.status == Status.WAITING: for inner_job in wrapper_job.job_list: @@ -1575,7 +1574,7 @@ class Autosubmit: list_jobid += str(job_id) + ',' list_prevStatus.append(prev_status) completed_joblist.append(job) - else: # If they're not from slurm platform check one-by-one + else: # If they're not from slurm platform check one-by-one TODO: Implement ecwmf future platform and mnX, abstract this part platform.check_job(job) if prev_status != job.update_status(as_conf.get_copy_remote_logs() == 'true'): # Keeping track of changes @@ -1597,8 +1596,7 @@ class Autosubmit: for platform_jobs in slurm: platform = platform_jobs[0] jobs_to_check = platform_jobs[1] - platform.check_Alljobs( - platform_jobs[3], jobs_to_check, as_conf.get_copy_remote_logs()) + platform.check_Alljobs(platform_jobs[3], jobs_to_check, as_conf.get_copy_remote_logs()) for j_Indx in xrange(0, len(platform_jobs[3])): prev_status = platform_jobs[2][j_Indx] job = platform_jobs[3][j_Indx] @@ -4552,7 +4550,7 @@ class Autosubmit: for element in out: if element.find("-") != -1: numbers = element.split("-") - for count in range(int(numbers[0]), int(numbers[1]) + 1): + for count in xrange(int(numbers[0]), int(numbers[1]) + 1): data.append(str(count)) else: data.append(element) diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py index 3fea2b69c..ac33cf3cd 100644 --- a/autosubmit/config/config_common.py +++ b/autosubmit/config/config_common.py @@ -1109,7 +1109,7 @@ class AutosubmitConfig(object): for split_in in split: if split_in.find("-") != -1: numbers = split_in.split("-") - for count in range(int(numbers[0]), int(numbers[1]) + 1): + for count in xrange(int(numbers[0]), int(numbers[1]) + 1): date_list.append(parse_date( string_date + str(count).zfill(len(numbers[0])))) else: @@ -1187,7 +1187,7 @@ class AutosubmitConfig(object): for split_in in split: if split_in.find("-") != -1: numbers = split_in.split("-") - for count in range(int(numbers[0]), int(numbers[1]) + 1): + for count in xrange(int(numbers[0]), int(numbers[1]) + 1): member_list.append( string_member + str(count).zfill(len(numbers[0]))) else: diff --git a/autosubmit/database/db_jobdata.py b/autosubmit/database/db_jobdata.py index fff46c106..61840e351 100644 --- a/autosubmit/database/db_jobdata.py +++ b/autosubmit/database/db_jobdata.py @@ -1358,7 +1358,7 @@ class JobDataStructure(MainDataBase): # Needs more guarantees but so far it works. if len(jobs_in_package) > 0 and len(wrapper_jobs) > 0 and len(jobs_in_package) == len(wrapper_jobs) and no_process == False: # It is 1 to 1 - for i in range(0, len(jobs_in_package)): + for i in xrange(0, len(jobs_in_package)): name_to_current_job[jobs_in_package[i] .job_name].energy = wrapper_jobs[i].energy name_to_current_job[jobs_in_package[i] diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 14cdd4d2d..7198b9c33 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -45,7 +45,7 @@ from autosubmit.platforms.paramiko_submitter import ParamikoSubmitter from log.log import Log, AutosubmitCritical, AutosubmitError Log.get_logger("Autosubmit") - +# A wrapper for encapsulate threads , TODO: Python 3+ to be replaced by the < from concurrent.futures > def threaded(fn): def wrapper(*args, **kwargs): thread = Thread(target=fn, args=args, kwargs=kwargs) @@ -329,7 +329,7 @@ class Job(object): num_parents = 1 if isinstance(parent, list): num_parents = len(parent) - for i in range(num_parents): + for i in xrange(num_parents): new_parent = parent[i] if isinstance(parent, list) else parent self._parents.add(new_parent) new_parent.__add_child(self) @@ -715,7 +715,6 @@ class Job(object): Log.printlog("Job {0} completion check failed. There is no COMPLETED file".format( self.name), 6009) self.status = default_status - def update_parameters(self, as_conf, parameters, default_parameters={'d': '%d%', 'd_': '%d_%', 'Y': '%Y%', 'Y_': '%Y_%', 'M': '%M%', 'M_': '%M_%', 'm': '%m%', 'm_': '%m_%'}): @@ -1012,13 +1011,13 @@ class Job(object): self.undefined_variables = set(variables) - set(parameters) if show_logs: Log.printlog("The following set of variables to be substituted in template script is not part of parameters set, and will be replaced by a blank value: {0}".format( - self.undefined_variables), 3000) + self.undefined_variables), 6013) # Check which variables in the proj.conf are not being used in the templates if show_logs: if not set(variables).issuperset(set(parameters)): Log.printlog("The following set of variables are not being used in the templates: {0}".format( - str(set(parameters) - set(variables))), 3000) + str(set(parameters) - set(variables))), 6013) return out def write_submit_time(self): diff --git a/autosubmit/job/job_grouping.py b/autosubmit/job/job_grouping.py index a49464088..90f534f21 100644 --- a/autosubmit/job/job_grouping.py +++ b/autosubmit/job/job_grouping.py @@ -127,7 +127,7 @@ class JobGrouping(object): for chunk in member_chunks[member_count + 1]: if chunk.find("-") != -1: numbers = chunk.split("-") - for count in range(int(numbers[0]), int(numbers[1]) + 1): + for count in xrange(int(numbers[0]), int(numbers[1]) + 1): chunks.append(count) else: chunks.append(int(chunk)) @@ -163,7 +163,7 @@ class JobGrouping(object): return Status.UNKNOWN def _create_groups(self, jobs_group_dict, blacklist=list()): - for i in reversed(range(len(self.jobs))): + for i in reversed(xrange(len(self.jobs))): job = self.jobs[i] groups = [] @@ -251,7 +251,7 @@ class JobGrouping(object): self._fix_splits_automatic_grouping(split_groups, split_groups_status, jobs_group_dict) # check if remaining jobs can be grouped - for i in reversed(range(len(self.jobs))): + for i in reversed(xrange(len(self.jobs))): job = self.jobs[i] for group, status in self.group_status_dict.items(): if group in job.name and status == job.status: diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index bcf9675a1..bb4b20814 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -152,7 +152,7 @@ class JobList(object): self._date_list = date_list self._member_list = member_list - chunk_list = range(chunk_ini, num_chunks + 1) + chunk_list = xrange(chunk_ini, num_chunks + 1) self._chunk_list = chunk_list jobs_parser = self._get_jobs_parser() @@ -220,7 +220,7 @@ class JobList(object): num_jobs = 1 if isinstance(job, list): num_jobs = len(job) - for i in range(num_jobs): + for i in xrange(num_jobs): _job = job[i] if num_jobs > 1 else job JobList._manage_job_dependencies(dic_jobs, _job, date_list, member_list, chunk_list, dependencies_keys, dependencies, graph) @@ -266,23 +266,23 @@ class JobList(object): for section_chunk in sections_chunks: info = section_chunk.split('*') if info[0] in key: - for relation in range(1, len(info)): + for relation in xrange(1, len(info)): auxiliar_relation_list = [] for location in info[relation].split('-'): auxiliar_chunk_list = [] location = location.strip('[').strip(']') if ':' in location: if len(location) == 3: - for chunk_number in range(int(location[0]), int(location[2]) + 1): + for chunk_number in xrange(int(location[0]), int(location[2]) + 1): auxiliar_chunk_list.append( chunk_number) elif len(location) == 2: if ':' == location[0]: - for chunk_number in range(0, int(location[1]) + 1): + for chunk_number in xrange(0, int(location[1]) + 1): auxiliar_chunk_list.append( chunk_number) elif ':' == location[1]: - for chunk_number in range(int(location[0]) + 1, len(dic_jobs._chunk_list) - 1): + for chunk_number in xrange(int(location[0]) + 1, len(dic_jobs._chunk_list) - 1): auxiliar_chunk_list.append( chunk_number) elif ',' in location: @@ -316,7 +316,7 @@ class JobList(object): numbers = str_split.split(":") # change this to be checked in job_common.py max_splits = min(int(numbers[1]), max_splits) - for count in range(int(numbers[0]), max_splits + 1): + for count in xrange(int(numbers[0]), max_splits + 1): splits.append(int(str(count).zfill(len(numbers[0])))) else: if int(str_split) <= max_splits: @@ -430,7 +430,7 @@ class JobList(object): max_distance = (chunk_list.index(chunk) + 1) % job.frequency if max_distance == 0: max_distance = job.frequency - for distance in range(1, max_distance): + for distance in xrange(1, max_distance): for parent in dic_jobs.get_jobs(section_name, date, member, chunk - distance): job.add_parent(parent) JobList._add_edge(graph, job, parent) @@ -439,7 +439,7 @@ class JobList(object): max_distance = (member_index + 1) % job.frequency if max_distance == 0: max_distance = job.frequency - for distance in range(1, max_distance, 1): + for distance in xrange(1, max_distance, 1): for parent in dic_jobs.get_jobs(section_name, date, member_list[member_index - distance], chunk): job.add_parent(parent) @@ -449,7 +449,7 @@ class JobList(object): max_distance = (date_index + 1) % job.frequency if max_distance == 0: max_distance = job.frequency - for distance in range(1, max_distance, 1): + for distance in xrange(1, max_distance, 1): for parent in dic_jobs.get_jobs(section_name, date_list[date_index - distance], member, chunk): job.add_parent(parent) @@ -460,7 +460,7 @@ class JobList(object): num_parents = 1 if isinstance(parents, list): num_parents = len(parents) - for i in range(num_parents): + for i in xrange(num_parents): parent = parents[i] if isinstance(parents, list) else parents graph.add_edge(parent.name, job.name) @@ -523,7 +523,7 @@ class JobList(object): jobs_to_sort = [previous_job] previous_section_running_type = None # Index starts at 1 because 0 has been taken in a previous step - for index in range(1, len(sorted_jobs_list) + 1): + for index in xrange(1, len(sorted_jobs_list) + 1): # If not last item if index < len(sorted_jobs_list): job = sorted_jobs_list[index] @@ -542,7 +542,7 @@ class JobList(object): if len(k.name.split('_')) == 5 else num_chunks + 1))) # Bringing back original job if identified - for idx in range(0, len(jobs_to_sort)): + for idx in xrange(0, len(jobs_to_sort)): # Test if it is a fake job if jobs_to_sort[idx] in fake_original_job_map: fake_job = jobs_to_sort[idx] @@ -1423,8 +1423,7 @@ class JobList(object): 'Template {0} will be checked in running time'.format(job.section)) continue elif job.check.lower() != 'true': - Log.info( - 'Template {0} will not be checked'.format(job.section)) + Log.info('Template {0} will not be checked'.format(job.section)) continue else: if job.section in self.sections_checked: @@ -1627,7 +1626,7 @@ class JobList(object): if job.name not in visited: visited.append(job.name) prefix = "" - for i in range(level): + for i in xrange(level): prefix += "| " # Prefix + Job Name result = "\n" + prefix + \ diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 28c40f57e..53bef7b55 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -450,10 +450,10 @@ class JobPackager(object): if new_package is not None: current_package += new_package - for i in range(len(current_package)): + for i in xrange(len(current_package)): total_wallclock = sum_str_hours(total_wallclock, wallclock) if len(current_package) > 1: - for level in range(1,len(current_package)): + for level in xrange(1,len(current_package)): for job in current_package[level]: job.level=level return JobPackageHorizontalVertical(current_package, max_procs, total_wallclock, @@ -477,7 +477,7 @@ class JobPackager(object): for job in current_package[-1]: total_wallclock = sum_str_hours(total_wallclock, job.wallclock) if len(current_package) > 1: - for level in range(1,len(current_package)): + for level in xrange(1,len(current_package)): for job in current_package[level]: job.level=level return JobPackageVerticalHorizontal(current_package, total_processors, total_wallclock, @@ -653,7 +653,7 @@ class JobPackagerVerticalMixed(JobPackagerVertical): # Unnecessary assignment sorted_jobs = self.sorted_jobs - for index in range(self.index, len(sorted_jobs)): + for index in xrange(self.index, len(sorted_jobs)): child = sorted_jobs[index] if self._is_wrappable(child): self.index = index+1 diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index a2f49c060..d6a4ead86 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -33,8 +33,18 @@ Log.get_logger("Autosubmit") from autosubmit.job.job_exceptions import WrongTemplateException from autosubmit.job.job import Job from bscearth.utils.date import sum_str_hours - - +from threading import Thread,Lock +import multiprocessing +import tarfile + +lock = Lock() +def threaded(fn): + def wrapper(*args, **kwargs): + thread = Thread(target=fn, args=args, kwargs=kwargs) + thread.name = "data_processing" + thread.start() + return thread + return wrapper class JobPackageBase(object): """ Class to manage the package of jobs to be submitted by autosubmit @@ -77,6 +87,29 @@ class JobPackageBase(object): """ return self._platform + @threaded + def check_scripts(self,jobs,configuration, parameters,only_generate,hold): + for job in jobs: + if job.check.lower() == Job.CHECK_ON_SUBMISSION.lower(): + if only_generate: + exit = True + break + if not os.path.exists(os.path.join(configuration.get_project_dir(), job.file)): + if configuration.get_project_type().lower() != "none": + raise AutosubmitCritical( + "Template [ {0} ] using CHECK=On_submission has some empty variable {0}".format( + job.name), 7014) + if not job.check_script(configuration, parameters, show_logs=job.check_warnings): + Log.warning("Script {0} check failed", job.name) + Log.warning("On submission script has some empty variables") + else: + Log.result("Script {0} OK", job.name) + lock.acquire() + job.update_parameters(configuration, parameters) + lock.release() + # looking for directives on jobs + self._custom_directives = self._custom_directives | set(job.custom_directives) + def submit(self, configuration, parameters,only_generate=False,hold=False): """ :para configuration: Autosubmit basic configuration \n @@ -87,34 +120,57 @@ class JobPackageBase(object): :type only_generate: Boolean """ exit=False - for job in self.jobs: - try: - if job.check.lower() == Job.CHECK_ON_SUBMISSION.lower(): - if only_generate: - exit=True - break - if not os.path.exists(os.path.join(configuration.get_project_dir(), job.file)): - if configuration.get_project_type().lower() != "none": - raise AutosubmitCritical("Template [ {0} ] using CHECK=On_submission has some empty variable {0}".format(job.name),7014) - if not job.check_script(configuration, parameters,show_logs=job.check_warnings): - Log.warning("Script {0} check failed",job.name) - Log.warning("On submission script has some empty variables") - else: - Log.result("Script {0} OK",job.name) - job.update_parameters(configuration, parameters) - # looking for directives on jobs - self._custom_directives = self._custom_directives | set(job.custom_directives) - - except BaseException as e: #should be IOERROR - raise AutosubmitCritical( - "Error on {1}, template [{0}] still does not exists in running time(check=on_submission actived) ".format(job.file,job.name), 7014) - + chunksize = int((len(self.jobs) + multiprocessing.cpu_count() - 1) / multiprocessing.cpu_count()); + try: + if chunksize < 2: + for job in self.jobs: + if job.check.lower() == Job.CHECK_ON_SUBMISSION.lower(): + if only_generate: + exit=True + break + if not os.path.exists(os.path.join(configuration.get_project_dir(), job.file)): + if configuration.get_project_type().lower() != "none": + raise AutosubmitCritical("Template [ {0} ] using CHECK=On_submission has some empty variable {0}".format(job.name),7014) + if not job.check_script(configuration, parameters,show_logs=job.check_warnings): + Log.warning("Script {0} check failed",job.name) + Log.warning("On submission script has some empty variables") + else: + Log.result("Script {0} OK",job.name) + job.update_parameters(configuration, parameters) + # looking for directives on jobs + self._custom_directives = self._custom_directives | set(job.custom_directives) + else: + Lhandle = list() + for i in xrange(0, len(self.jobs), chunksize): + Lhandle.append(self.check_scripts(self.jobs[i:i + chunksize], configuration, parameters, only_generate, hold)) + for dataThread in Lhandle: + dataThread.join() + except BaseException as e: #should be IOERROR + raise AutosubmitCritical( + "Error on {1}, template [{0}] still does not exists in running time(check=on_submission actived) ".format(job.file,job.name), 7014) + Log.debug("Creating Scripts") if only_generate: if not exit: - self._create_scripts(configuration) + if len(self.jobs) < 2: + self._create_scripts(configuration) + else: + Lhandle = list() + for i in xrange(0, len(self.jobs), chunksize): + Lhandle.append(self._create_scripts_threaded(self.jobs[i:i + chunksize], configuration)) + for dataThread in Lhandle: + dataThread.join() else: - self._create_scripts(configuration) + if len(self.jobs) < 2: + self._create_scripts(configuration) + else: + Lhandle = list() + for i in xrange(0, len(self.jobs), chunksize): + Lhandle.append(self._create_scripts_threaded(self.jobs[i:i + chunksize],configuration)) + for dataThread in Lhandle: + dataThread.join() + Log.debug("Sending Files") self._send_files() + Log.debug("Submitting") self._do_submission(hold=hold) @@ -212,7 +268,7 @@ class JobPackageArray(JobPackageBase): def _create_scripts(self, configuration): timestamp = str(int(time.time())) - for i in range(0, len(self.jobs)): + for i in xrange(0, len(self.jobs)): self._job_scripts[self.jobs[i].name] = self.jobs[i].create_script(configuration) self._job_inputs[self.jobs[i].name] = self._create_i_input(timestamp, i) self.jobs[i].remote_logs = (timestamp + ".{0}.out".format(i), timestamp + ".{0}.err".format(i)) @@ -250,7 +306,7 @@ class JobPackageArray(JobPackageBase): if package_id is None: return - for i in range(0, len(self.jobs)): + for i in xrange(0, len(self.jobs)): Log.info("{0} submitted", self.jobs[i].name) self.jobs[i].id = str(package_id) + '[{0}]'.format(i) self.jobs[i].status = Status.SUBMITTED @@ -320,13 +376,19 @@ class JobPackageThread(JobPackageBase): def set_job_dependency(self, dependency): self._job_dependency = dependency + @threaded + def _create_scripts_threaded(self,jobs,configuration): + for i in xrange(0, len(jobs)): + self._job_scripts[jobs[i].name] = jobs[i].create_script(configuration) + self.jobs[i].remote_logs = (self._job_scripts[jobs[i].name] + ".out".format(i), + self._job_scripts[jobs[i].name] + ".err".format(i) + ) + self._common_script = self._create_common_script() def _create_scripts(self, configuration): - - for i in range(0, len(self.jobs)): + for i in xrange(0, len(self.jobs)): self._job_scripts[self.jobs[i].name] = self.jobs[i].create_script(configuration) - self.jobs[i].remote_logs = ( - self._job_scripts[self.jobs[i].name] + ".out".format(i), + self.jobs[i].remote_logs = (self._job_scripts[self.jobs[i].name] + ".out".format(i), self._job_scripts[self.jobs[i].name] + ".err".format(i) ) self._common_script = self._create_common_script() @@ -340,15 +402,25 @@ class JobPackageThread(JobPackageBase): def _send_files(self): self.platform.check_remote_log_dir() + compress_type = "w" + output_filepath = '{0}.tar'.format("wrapper_scripts") if callable(getattr(self.platform, 'remove_multiple_files')): filenames = str() for job in self.jobs: filenames += " " + self.platform.remote_log_dir + "/" + job.name + ".cmd" self.platform.remove_multiple_files(filenames) - for job in self.jobs: - self.platform.send_file(self._job_scripts[job.name], check=False) + with tarfile.open(os.path.join(self._tmp_path, output_filepath), compress_type) as tar: + for job in self.jobs: + tar.addfile(tarfile.TarInfo(self._job_scripts[job.name]),open(os.path.join(self._tmp_path,self._job_scripts[job.name]))) + output_path = os.path.join(self._tmp_path, output_filepath) + os.chmod(output_path, 0o755) + tar.close() + self.platform.send_file(output_path, check=False) + self.platform.send_command("cd {0}; tar -xvf {1}".format(self.platform.get_files_path(),output_filepath)) + #self.platform.send_file(self._job_scripts[job.name], check=False) self.platform.send_file(self._common_script) + def _do_submission(self, job_scripts=None, hold=False): if callable(getattr(self.platform, 'remove_multiple_files')): filenames = str() @@ -368,7 +440,7 @@ class JobPackageThread(JobPackageBase): if package_id is None: return - for i in range(0, len(self.jobs) ): + for i in xrange(0, len(self.jobs) ): Log.info("{0} submitted", self.jobs[i].name) self.jobs[i].id = str(package_id) self.jobs[i].status = Status.SUBMITTED @@ -419,7 +491,7 @@ class JobPackageThreadWrapped(JobPackageThread): return self._platform.project def _create_scripts(self, configuration): - for i in range(0, len(self.jobs)): + for i in xrange(0, len(self.jobs)): self._job_scripts[self.jobs[i].name] = self.jobs[i].create_script(configuration) self.jobs[i].remote_logs = ( self._job_scripts[self.jobs[i].name] + ".out".format(i), @@ -451,7 +523,7 @@ class JobPackageThreadWrapped(JobPackageThread): if package_id is None: raise Exception('Submission failed') - for i in range(0, len(self.jobs)): + for i in xrange(0, len(self.jobs)): Log.info("{0} submitted", self.jobs[i].name) self.jobs[i].id = str(package_id) self.jobs[i].status = Status.SUBMITTED diff --git a/autosubmit/monitor/diagram.py b/autosubmit/monitor/diagram.py index e4484c1d9..2b758859e 100644 --- a/autosubmit/monitor/diagram.py +++ b/autosubmit/monitor/diagram.py @@ -54,7 +54,7 @@ suppose it is noon, if you supply -fp 5 the query will consider changes starting exp_stats = ExperimentStats(jobs_list, period_ini, period_fi) grid_spec = gridspec.GridSpec(RATIO * num_plots + 2, 1) - for plot in range(1, num_plots + 1): + for plot in xrange(1, num_plots + 1): # Calculating jobs inside the given plot l1 = int((plot - 1) * MAX_JOBS_PER_PLOT) l2 = int(plot * MAX_JOBS_PER_PLOT) @@ -104,7 +104,7 @@ def create_csv_stats(exp_stats, jobs_list, output_file): output_file = output_file.replace('pdf', 'csv') with open(output_file, 'wb') as file: file.write("Job,Started,Ended,Queuing time (hours),Running time (hours)\n") - for i in range(len(jobs_list)): + for i in xrange(len(jobs_list)): file.write("{0},{1},{2},{3},{4}\n".format(job_names[i], start_times[i], end_times[i], queuing_times[i], running_times[i])) def build_legends(plot, rects, experiment_stats, general_stats): diff --git a/autosubmit/monitor/monitor.py b/autosubmit/monitor/monitor.py index 64f48bd9e..cebb30848 100644 --- a/autosubmit/monitor/monitor.py +++ b/autosubmit/monitor/monitor.py @@ -163,7 +163,7 @@ class Monitor: if len(subgraph.get_node(group[0])) == 0: subgraph.add_node(previous_node) - for i in range(1, len(group)): + for i in xrange(1, len(group)): node = exp.get_node(group[i])[0] if len(subgraph.get_node(group[i])) == 0: subgraph.add_node(node) diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py index 846f79304..dbb7c8cbf 100644 --- a/autosubmit/platforms/wrappers/wrapper_builder.py +++ b/autosubmit/platforms/wrappers/wrapper_builder.py @@ -827,14 +827,14 @@ class SrunVerticalHorizontalWrapperBuilder(SrunWrapperBuilder): total_threads = float(len(self.job_scripts)) n_threads = float(self.threads) core = [] - for thread in range(int(n_threads)): + for thread in xrange(int(n_threads)): core.append(0x0) core[0] = 0x1 horizontal_wrapper_size=int(total_threads) srun_mask_values = [] - for job_id in range(horizontal_wrapper_size): - for thread in range(1, int(n_threads)): + for job_id in xrange(horizontal_wrapper_size): + for thread in xrange(1, int(n_threads)): core[thread] = core[thread-1]*2 job_mask = 0x0 for thr_mask in core: diff --git a/test/integration/test_db_manager.py b/test/integration/test_db_manager.py index 558dca17e..64bf413e3 100644 --- a/test/integration/test_db_manager.py +++ b/test/integration/test_db_manager.py @@ -29,7 +29,7 @@ class TestDbManager(TestCase): table_name = 'test' columns = ['field1', 'field2'] self.db_manager.create_table(table_name, columns) - for i in xrange(3): + for i in range(3): self.db_manager.insert(table_name, columns, ['dummy', 'dummy']) count = self.db_manager.count(table_name) self.assertEquals(3, count) -- GitLab From a6123119cd92dea3daf9bd83b3f5effb27d9aa1b Mon Sep 17 00:00:00 2001 From: dbeltran Date: Mon, 25 Jan 2021 13:20:45 +0100 Subject: [PATCH 04/12] more threads --- autosubmit/job/job_list.py | 25 ++++++++++++++++++++++++- autosubmit/job/job_packages.py | 17 +++++++++++++---- 2 files changed, 37 insertions(+), 5 deletions(-) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index bb4b20814..f5635359d 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -38,8 +38,16 @@ import autosubmit.database.db_structure as DbStructure from networkx import DiGraph from autosubmit.job.job_utils import transitive_reduction from log.log import AutosubmitCritical, AutosubmitError, Log +from threading import Thread,Lock +import multiprocessing # Log.get_logger("Log.Autosubmit") - +def threaded(fn): + def wrapper(*args, **kwargs): + thread = Thread(target=fn, args=args, kwargs=kwargs) + thread.name = "data_processing" + thread.start() + return thread + return wrapper class JobList(object): """ @@ -1404,6 +1412,21 @@ class JobList(object): for job in self._job_list: if not job.has_parents() and new: job.status = Status.READY + @threaded + def check_scripts_threaded(self, as_conf): + """ + When we have created the scripts, all parameters should have been substituted. + %PARAMETER% handlers not allowed (thread test) + + :param as_conf: experiment configuration + :type as_conf: AutosubmitConfig + """ + out = True + for job in self._job_list: + show_logs = job.check_warnings + if not job.check_script(as_conf, self.parameters, show_logs): + out = False + return out def check_scripts(self, as_conf): """ diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index d6a4ead86..ac1cf3a47 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -120,9 +120,18 @@ class JobPackageBase(object): :type only_generate: Boolean """ exit=False - chunksize = int((len(self.jobs) + multiprocessing.cpu_count() - 1) / multiprocessing.cpu_count()); + thread_number = multiprocessing.cpu_count() + if len(self.jobs) > 2500: + thread_number = thread_number * 2 + elif len(self.jobs) > 5000: + thread_number = thread_number * 3 + elif len(self.jobs) > 7500: + thread_number = thread_number * 4 + elif len(self.jobs) > 10000: + thread_number = thread_number * 5 + chunksize = int((len(self.jobs) + thread_number - 1) / thread_number); try: - if chunksize < 2: + if len(self.jobs) < thread_number: for job in self.jobs: if job.check.lower() == Job.CHECK_ON_SUBMISSION.lower(): if only_generate: @@ -151,7 +160,7 @@ class JobPackageBase(object): Log.debug("Creating Scripts") if only_generate: if not exit: - if len(self.jobs) < 2: + if len(self.jobs) < thread_number: self._create_scripts(configuration) else: Lhandle = list() @@ -160,7 +169,7 @@ class JobPackageBase(object): for dataThread in Lhandle: dataThread.join() else: - if len(self.jobs) < 2: + if len(self.jobs) < thread_number: self._create_scripts(configuration) else: Lhandle = list() -- GitLab From 4b5b68bb6c5a478eaf30f935d6f22342103a130c Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 26 Jan 2021 13:04:16 +0100 Subject: [PATCH 05/12] fix horizontal --- autosubmit/job/job_packages.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index ac1cf3a47..a5b95c9e3 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -109,7 +109,14 @@ class JobPackageBase(object): lock.release() # looking for directives on jobs self._custom_directives = self._custom_directives | set(job.custom_directives) - + @threaded + def _create_scripts_threaded(self,jobs,configuration): + for i in xrange(0, len(jobs)): + self._job_scripts[jobs[i].name] = jobs[i].create_script(configuration) + self.jobs[i].remote_logs = (self._job_scripts[jobs[i].name] + ".out".format(i), + self._job_scripts[jobs[i].name] + ".err".format(i) + ) + self._common_script = self._create_common_script() def submit(self, configuration, parameters,only_generate=False,hold=False): """ :para configuration: Autosubmit basic configuration \n @@ -385,14 +392,7 @@ class JobPackageThread(JobPackageBase): def set_job_dependency(self, dependency): self._job_dependency = dependency - @threaded - def _create_scripts_threaded(self,jobs,configuration): - for i in xrange(0, len(jobs)): - self._job_scripts[jobs[i].name] = jobs[i].create_script(configuration) - self.jobs[i].remote_logs = (self._job_scripts[jobs[i].name] + ".out".format(i), - self._job_scripts[jobs[i].name] + ".err".format(i) - ) - self._common_script = self._create_common_script() + def _create_scripts(self, configuration): for i in xrange(0, len(self.jobs)): -- GitLab From d9f3d81eed3d095500e3a39e94274b1591f46a95 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 26 Jan 2021 16:34:42 +0100 Subject: [PATCH 06/12] fix horizontal --- test/unit/test_job_list.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/unit/test_job_list.py b/test/unit/test_job_list.py index 65d78f28d..f2305898a 100644 --- a/test/unit/test_job_list.py +++ b/test/unit/test_job_list.py @@ -175,27 +175,27 @@ class TestJobList(TestCase): def test_sort_by_name_returns_the_list_of_jobs_well_sorted(self): sorted_by_name = self.job_list.sort_by_name() - for i in xrange(len(sorted_by_name) - 1): + for i in range(len(sorted_by_name) - 1): self.assertTrue( sorted_by_name[i].name <= sorted_by_name[i + 1].name) def test_sort_by_id_returns_the_list_of_jobs_well_sorted(self): sorted_by_id = self.job_list.sort_by_id() - for i in xrange(len(sorted_by_id) - 1): + for i in range(len(sorted_by_id) - 1): self.assertTrue(sorted_by_id[i].id <= sorted_by_id[i + 1].id) def test_sort_by_type_returns_the_list_of_jobs_well_sorted(self): sorted_by_type = self.job_list.sort_by_type() - for i in xrange(len(sorted_by_type) - 1): + for i in range(len(sorted_by_type) - 1): self.assertTrue( sorted_by_type[i].type <= sorted_by_type[i + 1].type) def test_sort_by_status_returns_the_list_of_jobs_well_sorted(self): sorted_by_status = self.job_list.sort_by_status() - for i in xrange(len(sorted_by_status) - 1): + for i in range(len(sorted_by_status) - 1): self.assertTrue( sorted_by_status[i].status <= sorted_by_status[i + 1].status) -- GitLab From 0306d5a0c5f261dcceb2841095bbdad131cf96bf Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 26 Jan 2021 17:05:22 +0100 Subject: [PATCH 07/12] Fix Local --- autosubmit/job/job_packages.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index a5b95c9e3..d1dada9bb 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -95,10 +95,12 @@ class JobPackageBase(object): exit = True break if not os.path.exists(os.path.join(configuration.get_project_dir(), job.file)): + lock.acquire() if configuration.get_project_type().lower() != "none": raise AutosubmitCritical( "Template [ {0} ] using CHECK=On_submission has some empty variable {0}".format( job.name), 7014) + lock.release() if not job.check_script(configuration, parameters, show_logs=job.check_warnings): Log.warning("Script {0} check failed", job.name) Log.warning("On submission script has some empty variables") -- GitLab From ec785a53e020e1dd22cb54a438dca49711c48f13 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 26 Jan 2021 17:25:57 +0100 Subject: [PATCH 08/12] Fix pipeline --- autosubmit/job/job_list.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index f5635359d..bafd77113 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -160,7 +160,7 @@ class JobList(object): self._date_list = date_list self._member_list = member_list - chunk_list = xrange(chunk_ini, num_chunks + 1) + chunk_list = range(chunk_ini, num_chunks + 1) self._chunk_list = chunk_list jobs_parser = self._get_jobs_parser() -- GitLab From b9a84ec739edb20c197e2dd9673279c5bb76c534 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 27 Jan 2021 17:01:47 +0100 Subject: [PATCH 09/12] Fix for horizontal common script deadlock --- autosubmit/job/job_packages.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index d1dada9bb..0772e6dcf 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -118,7 +118,7 @@ class JobPackageBase(object): self.jobs[i].remote_logs = (self._job_scripts[jobs[i].name] + ".out".format(i), self._job_scripts[jobs[i].name] + ".err".format(i) ) - self._common_script = self._create_common_script() + def submit(self, configuration, parameters,only_generate=False,hold=False): """ :para configuration: Autosubmit basic configuration \n @@ -177,6 +177,8 @@ class JobPackageBase(object): Lhandle.append(self._create_scripts_threaded(self.jobs[i:i + chunksize], configuration)) for dataThread in Lhandle: dataThread.join() + self._common_script = self._create_common_script() + else: if len(self.jobs) < thread_number: self._create_scripts(configuration) @@ -186,6 +188,7 @@ class JobPackageBase(object): Lhandle.append(self._create_scripts_threaded(self.jobs[i:i + chunksize],configuration)) for dataThread in Lhandle: dataThread.join() + self._common_script = self._create_common_script() Log.debug("Sending Files") self._send_files() Log.debug("Submitting") @@ -373,7 +376,10 @@ class JobPackageThread(JobPackageBase): self._jobs_resources[job.section] = dict() self._jobs_resources[job.section]['PROCESSORS'] = job.processors self._jobs_resources[job.section]['TASKS'] = job.tasks - jobs_scripts.append(self._job_scripts[job.name]) + try: + jobs_scripts.append(self._job_scripts[job.name]) + except BaseException as e: + pass return jobs_scripts @property -- GitLab From de12090a920372dc59fc52ae5c24dd41d6349101 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 27 Jan 2021 17:38:17 +0100 Subject: [PATCH 10/12] Fix for horizontal common script deadlock --- autosubmit/job/job_list.py | 1 - autosubmit/job/job_packages.py | 2 ++ 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index bafd77113..1d66025d9 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -174,7 +174,6 @@ class JobList(object): jobs_data = dict() # jobs_data includes the name of the .our and .err files of the job in LOG_expid if not new: - try: jobs_data = {str(row[0]): row for row in self.load()} except: diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 0772e6dcf..1d402ffef 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -119,6 +119,8 @@ class JobPackageBase(object): self._job_scripts[jobs[i].name] + ".err".format(i) ) + def _create_common_script(self): + pass def submit(self, configuration, parameters,only_generate=False,hold=False): """ :para configuration: Autosubmit basic configuration \n -- GitLab From 167eda611127e184fa9366440611a341451c9636 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 28 Jan 2021 10:55:30 +0100 Subject: [PATCH 11/12] Fix script inside tarfiles, they were empty. Fix a crashing bug when LOG_XXXX is missing (due rm -rf without prompt create later) --- autosubmit/job/job.py | 3 +++ autosubmit/job/job_packages.py | 15 +++++++++------ 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 7198b9c33..2ca204ccb 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -1376,6 +1376,9 @@ done self._tmp_path, 'LOG_{0}'.format(self.expid)) multiple_checker_inner_jobs = os.path.join( log_dir, "inner_jobs_checker.sh") + if not os.stat(log_dir): + os.mkdir(log_dir) + os.chmod(log_dir, 0o770) open(multiple_checker_inner_jobs, 'w+').write(command) os.chmod(multiple_checker_inner_jobs, 0o770) self._platform.send_file(multiple_checker_inner_jobs, False) diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py index 1d402ffef..6d8eca784 100644 --- a/autosubmit/job/job_packages.py +++ b/autosubmit/job/job_packages.py @@ -428,15 +428,18 @@ class JobPackageThread(JobPackageBase): for job in self.jobs: filenames += " " + self.platform.remote_log_dir + "/" + job.name + ".cmd" self.platform.remove_multiple_files(filenames) - with tarfile.open(os.path.join(self._tmp_path, output_filepath), compress_type) as tar: + tar_path = os.path.join(self._tmp_path, output_filepath) + + with tarfile.open(tar_path, compress_type) as tar: for job in self.jobs: - tar.addfile(tarfile.TarInfo(self._job_scripts[job.name]),open(os.path.join(self._tmp_path,self._job_scripts[job.name]))) - output_path = os.path.join(self._tmp_path, output_filepath) - os.chmod(output_path, 0o755) + jfile = os.path.join(self._tmp_path,self._job_scripts[job.name]) + with open(jfile, 'rb') as f: + info = tar.gettarinfo(jfile,self._job_scripts[job.name]) + tar.addfile(info, f) tar.close() - self.platform.send_file(output_path, check=False) + os.chmod(tar_path, 0o755) + self.platform.send_file(tar_path, check=False) self.platform.send_command("cd {0}; tar -xvf {1}".format(self.platform.get_files_path(),output_filepath)) - #self.platform.send_file(self._job_scripts[job.name], check=False) self.platform.send_file(self._common_script) -- GitLab From 57e749446d87d181488f4396dd75f4e99487b8fb Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 28 Jan 2021 16:42:51 +0100 Subject: [PATCH 12/12] Log fix --- autosubmit/job/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 2ca204ccb..f8a6668ba 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -272,7 +272,7 @@ class Job(object): @local_logs.setter def local_logs(self, value): self._local_logs = value - self._remote_logs = value + #self._remote_logs = value @property def remote_logs(self): -- GitLab