diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py
index a677f8c1988c84982187a7b21d5fe0a655c4bd68..06bd1efa601bd7e7a0dd5b9a75570cb69d329a5d 100644
--- a/autosubmit/autosubmit.py
+++ b/autosubmit/autosubmit.py
@@ -1535,8 +1535,7 @@ class Autosubmit:
                             "Wrapper is in Unknown Status couldn't get wrapper parameters", 7050)
                     # New status will be saved and inner_jobs will be checked.
-                    wrapper_job.check_status(
-                        wrapper_job.new_status)
+                    wrapper_job.check_status(wrapper_job.new_status)
                     # Erase from packages if the wrapper failed to be queued ( Hold Admin bug )
                     if wrapper_job.status == Status.WAITING:
                         for inner_job in wrapper_job.job_list:
@@ -1575,7 +1574,7 @@ class Autosubmit:
                                 list_jobid += str(job_id) + ','
                                 list_prevStatus.append(prev_status)
                                 completed_joblist.append(job)
-                            else:  # If they're not from slurm platform check one-by-one
+                            else:  # If they're not from slurm platform check one-by-one. TODO: implement future ECMWF and MN* platforms here; abstract this part
                                 platform.check_job(job)
                                 if prev_status != job.update_status(as_conf.get_copy_remote_logs() == 'true'):
                                     # Keeping track of changes
@@ -1597,8 +1596,7 @@ class Autosubmit:
                 for platform_jobs in slurm:
                     platform = platform_jobs[0]
                     jobs_to_check = platform_jobs[1]
-                    platform.check_Alljobs(
-                        platform_jobs[3], jobs_to_check, as_conf.get_copy_remote_logs())
+                    platform.check_Alljobs(platform_jobs[3], jobs_to_check, as_conf.get_copy_remote_logs())
                    for j_Indx in xrange(0, len(platform_jobs[3])):
                         prev_status = platform_jobs[2][j_Indx]
                         job = platform_jobs[3][j_Indx]
@@ -3533,8 +3531,7 @@ class Autosubmit:
             Log.info(
                 "Preparing .lock file to avoid multiple instances with same expid.")
-            as_conf = AutosubmitConfig(
-                expid, BasicConfig, ConfigParserFactory())
+            as_conf = AutosubmitConfig(expid, BasicConfig, ConfigParserFactory())
             as_conf.check_conf_files(False)
             project_type = as_conf.get_project_type()
@@ -3665,7 +3662,6 @@ class Autosubmit:
                 else:
                     Log.info(job_list.print_with_status())
                     Log.status(job_list.print_with_status())
-
             return True
         # catching Exception
         except (KeyboardInterrupt) as e:
@@ -3674,12 +3670,19 @@
                 fh.flush()
                 os.fsync(fh.fileno())
             raise AutosubmitCritical("Stopped by user input", 7010)
+        except (BaseException) as e:
+            raise  # let unexpected exceptions propagate unchanged
        except portalocker.AlreadyLocked:
            message = "We have detected that there is another Autosubmit instance using the experiment.\nStop other Autosubmit instances that are using the experiment or delete autosubmit.lock file located on tmp folder"
            raise AutosubmitCritical(message, 7000)
+        except AutosubmitError as e:
+            if e.trace == "":
+                e.trace = traceback.format_exc()
+            raise AutosubmitError(e.message, e.code, e.trace)
        except AutosubmitCritical as e:
-            Log.debug(traceback.format_exc())
-            raise AutosubmitCritical(e.message, e.code)
+            if e.trace == "":
+                e.trace = traceback.format_exc()
+            raise AutosubmitCritical(e.message, e.code, e.trace)
 
     @staticmethod
     def _copy_code(as_conf, expid, project_type, force):
@@ -4547,7 +4550,7 @@ class Autosubmit:
                 for element in out:
                     if element.find("-") != -1:
                         numbers = element.split("-")
-                        for count in range(int(numbers[0]), int(numbers[1]) + 1):
+                        for count in xrange(int(numbers[0]), int(numbers[1]) + 1):
                             data.append(str(count))
                     else:
                         data.append(element)
diff --git a/autosubmit/config/config_common.py b/autosubmit/config/config_common.py
index f1b70a334baeeb310c8a057707ed0df42f9c4f2f..ac33cf3cd5add51beb81ee9d4e7b1e93f8b35eb1 100644
--- a/autosubmit/config/config_common.py
+++ b/autosubmit/config/config_common.py
@@ -455,7 +455,12 @@ class AutosubmitConfig(object):
         """
         Log.info('\nChecking configuration files...')
         self.ignore_file_path = check_file
-        self.reload()
+        try:
+            self.reload()
+        except (AutosubmitCritical, AutosubmitError) as e:
+            raise
+        except BaseException as e:
+            raise  # placeholder: unexpected errors propagate unchanged
         # Annotates all errors found in the configuration files in dictionaries self.warn_config and self.wrong_config.
         self.check_expdef_conf()
         self.check_platforms_conf()
@@ -1104,7 +1109,7 @@ class AutosubmitConfig(object):
             for split_in in split:
                 if split_in.find("-") != -1:
                     numbers = split_in.split("-")
-                    for count in range(int(numbers[0]), int(numbers[1]) + 1):
+                    for count in xrange(int(numbers[0]), int(numbers[1]) + 1):
                         date_list.append(parse_date(
                             string_date + str(count).zfill(len(numbers[0]))))
                 else:
@@ -1182,7 +1187,7 @@ class AutosubmitConfig(object):
             for split_in in split:
                 if split_in.find("-") != -1:
                     numbers = split_in.split("-")
-                    for count in range(int(numbers[0]), int(numbers[1]) + 1):
+                    for count in xrange(int(numbers[0]), int(numbers[1]) + 1):
                         member_list.append(
                             string_member + str(count).zfill(len(numbers[0])))
                 else:
diff --git a/autosubmit/database/db_jobdata.py b/autosubmit/database/db_jobdata.py
index fff46c1069fb3d6287b54dabedf1cabe41fe8854..61840e351ae54a1bd83f68915700feb9bdf6e3e9 100644
--- a/autosubmit/database/db_jobdata.py
+++ b/autosubmit/database/db_jobdata.py
@@ -1358,7 +1358,7 @@ class JobDataStructure(MainDataBase):
                 # Needs more guarantees but so far it works.
                 if len(jobs_in_package) > 0 and len(wrapper_jobs) > 0 and len(jobs_in_package) == len(wrapper_jobs) and no_process == False:
                     # It is 1 to 1
-                    for i in range(0, len(jobs_in_package)):
+                    for i in xrange(0, len(jobs_in_package)):
                         name_to_current_job[jobs_in_package[i]
                                             .job_name].energy = wrapper_jobs[i].energy
                         name_to_current_job[jobs_in_package[i]
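Note on the exception handling above: run() previously logged the traceback at debug level and re-raised a fresh AutosubmitCritical, which dropped the original stack. The new handlers keep it by filling the exception's trace attribute before re-raising, and an equivalent handler is added for AutosubmitError. A minimal, self-contained sketch of that pattern follows; the AutosubmitError class below is a stand-in for the real one in log.log, reduced to the three attributes the handlers rely on.

    import traceback

    class AutosubmitError(Exception):
        # Stand-in for log.log.AutosubmitError: message, code and trace attributes.
        def __init__(self, message, code=6000, trace=""):
            Exception.__init__(self, message)
            self.message = message
            self.code = code
            self.trace = trace

    def do_work():
        raise AutosubmitError("platform unreachable", 6002)

    try:
        try:
            do_work()
        except AutosubmitError as e:
            if e.trace == "":
                # capture the original traceback before the re-raise discards it
                e.trace = traceback.format_exc()
            raise AutosubmitError(e.message, e.code, e.trace)
    except AutosubmitError as e:
        print(e.trace)  # the original traceback survives the re-raise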
diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py
index 14cdd4d2d7c452867404304d6a485688c44fc490..f8a6668bad3dc0e9332ded8babb4e24c294af00a 100644
--- a/autosubmit/job/job.py
+++ b/autosubmit/job/job.py
@@ -45,7 +45,7 @@ from autosubmit.platforms.paramiko_submitter import ParamikoSubmitter
 from log.log import Log, AutosubmitCritical, AutosubmitError
 Log.get_logger("Autosubmit")
 
-
+# A wrapper to encapsulate threads. TODO: on Python 3+, replace with concurrent.futures
 def threaded(fn):
     def wrapper(*args, **kwargs):
         thread = Thread(target=fn, args=args, kwargs=kwargs)
@@ -272,7 +272,7 @@ class Job(object):
     @local_logs.setter
     def local_logs(self, value):
         self._local_logs = value
-        self._remote_logs = value
+        # self._remote_logs = value
 
     @property
     def remote_logs(self):
@@ -329,7 +329,7 @@ class Job(object):
             num_parents = 1
             if isinstance(parent, list):
                 num_parents = len(parent)
-            for i in range(num_parents):
+            for i in xrange(num_parents):
                 new_parent = parent[i] if isinstance(parent, list) else parent
                 self._parents.add(new_parent)
                 new_parent.__add_child(self)
@@ -715,7 +715,6 @@ class Job(object):
             Log.printlog("Job {0} completion check failed. There is no COMPLETED file".format(
                 self.name), 6009)
             self.status = default_status
-
     def update_parameters(self, as_conf, parameters,
                           default_parameters={'d': '%d%', 'd_': '%d_%', 'Y': '%Y%', 'Y_': '%Y_%', 'M': '%M%', 'M_': '%M_%', 'm': '%m%', 'm_': '%m_%'}):
@@ -1012,13 +1011,13 @@ class Job(object):
         self.undefined_variables = set(variables) - set(parameters)
         if show_logs:
             Log.printlog("The following set of variables to be substituted in template script is not part of parameters set, and will be replaced by a blank value: {0}".format(
-                self.undefined_variables), 3000)
+                self.undefined_variables), 6013)
 
         # Check which variables in the proj.conf are not being used in the templates
         if show_logs:
             if not set(variables).issuperset(set(parameters)):
                 Log.printlog("The following set of variables are not being used in the templates: {0}".format(
-                    str(set(parameters) - set(variables))), 3000)
+                    str(set(parameters) - set(variables))), 6013)
         return out
 
     def write_submit_time(self):
@@ -1377,6 +1376,9 @@ done
             self._tmp_path, 'LOG_{0}'.format(self.expid))
         multiple_checker_inner_jobs = os.path.join(
             log_dir, "inner_jobs_checker.sh")
+        if not os.path.exists(log_dir):  # os.stat raises OSError on a missing path, so test existence instead
+            os.mkdir(log_dir)
+            os.chmod(log_dir, 0o770)
         open(multiple_checker_inner_jobs, 'w+').write(command)
         os.chmod(multiple_checker_inner_jobs, 0o770)
         self._platform.send_file(multiple_checker_inner_jobs, False)
diff --git a/autosubmit/job/job_grouping.py b/autosubmit/job/job_grouping.py
index a494640886c74296232f3b81bc9c435c64f49a2a..90f534f211a55834a4c61997c16049e6983ee7fc 100644
--- a/autosubmit/job/job_grouping.py
+++ b/autosubmit/job/job_grouping.py
@@ -127,7 +127,7 @@ class JobGrouping(object):
                 for chunk in member_chunks[member_count + 1]:
                     if chunk.find("-") != -1:
                         numbers = chunk.split("-")
-                        for count in range(int(numbers[0]), int(numbers[1]) + 1):
+                        for count in xrange(int(numbers[0]), int(numbers[1]) + 1):
                             chunks.append(count)
                     else:
                         chunks.append(int(chunk))
@@ -163,7 +163,7 @@ class JobGrouping(object):
         return Status.UNKNOWN
 
     def _create_groups(self, jobs_group_dict, blacklist=list()):
-        for i in reversed(range(len(self.jobs))):
+        for i in reversed(xrange(len(self.jobs))):
             job = self.jobs[i]
             groups = []
@@ -251,7 +251,7 @@ class JobGrouping(object):
             self._fix_splits_automatic_grouping(split_groups, split_groups_status, jobs_group_dict)
 
         # check if remaining jobs can be grouped
-        for i in reversed(range(len(self.jobs))):
+        for i in reversed(xrange(len(self.jobs))):
             job = self.jobs[i]
             for group, status in self.group_status_dict.items():
                 if group in job.name and status == job.status:
diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py
index bcf9675a102b49b498db3797203411eeb9acda67..1d66025d99fd392a8cf3587b147b6df3cbd52851 100644
--- a/autosubmit/job/job_list.py
+++ b/autosubmit/job/job_list.py
@@ -38,8 +38,16 @@ import autosubmit.database.db_structure as DbStructure
 from networkx import DiGraph
 from autosubmit.job.job_utils import transitive_reduction
 from log.log import AutosubmitCritical, AutosubmitError, Log
+from threading import Thread, Lock
+import multiprocessing
 # Log.get_logger("Log.Autosubmit")
-
+def threaded(fn):
+    def wrapper(*args, **kwargs):
+        thread = Thread(target=fn, args=args, kwargs=kwargs)
+        thread.name = "data_processing"
+        thread.start()
+        return thread
+    return wrapper
 
 class JobList(object):
     """
@@ -166,7 +174,6 @@ class JobList(object):
         jobs_data = dict()
         # jobs_data includes the name of the .our and .err files of the job in LOG_expid
         if not new:
-
             try:
                 jobs_data = {str(row[0]): row for row in self.load()}
             except:
@@ -220,7 +227,7 @@ class JobList(object):
             num_jobs = 1
             if isinstance(job, list):
                 num_jobs = len(job)
-            for i in range(num_jobs):
+            for i in xrange(num_jobs):
                 _job = job[i] if num_jobs > 1 else job
                 JobList._manage_job_dependencies(dic_jobs, _job, date_list, member_list,
                                                  chunk_list, dependencies_keys, dependencies, graph)
@@ -266,23 +273,23 @@ class JobList(object):
                 for section_chunk in sections_chunks:
                     info = section_chunk.split('*')
                     if info[0] in key:
-                        for relation in range(1, len(info)):
+                        for relation in xrange(1, len(info)):
                             auxiliar_relation_list = []
                             for location in info[relation].split('-'):
                                 auxiliar_chunk_list = []
                                 location = location.strip('[').strip(']')
                                 if ':' in location:
                                     if len(location) == 3:
-                                        for chunk_number in range(int(location[0]), int(location[2]) + 1):
+                                        for chunk_number in xrange(int(location[0]), int(location[2]) + 1):
                                             auxiliar_chunk_list.append(
                                                 chunk_number)
                                     elif len(location) == 2:
                                         if ':' == location[0]:
-                                            for chunk_number in range(0, int(location[1]) + 1):
+                                            for chunk_number in xrange(0, int(location[1]) + 1):
                                                 auxiliar_chunk_list.append(
                                                     chunk_number)
                                         elif ':' == location[1]:
-                                            for chunk_number in range(int(location[0]) + 1, len(dic_jobs._chunk_list) - 1):
+                                            for chunk_number in xrange(int(location[0]) + 1, len(dic_jobs._chunk_list) - 1):
                                                 auxiliar_chunk_list.append(
                                                     chunk_number)
                                 elif ',' in location:
@@ -316,7 +323,7 @@ class JobList(object):
                     numbers = str_split.split(":")
                     # change this to be checked in job_common.py
                     max_splits = min(int(numbers[1]), max_splits)
-                    for count in range(int(numbers[0]), max_splits + 1):
+                    for count in xrange(int(numbers[0]), max_splits + 1):
                         splits.append(int(str(count).zfill(len(numbers[0]))))
                 else:
                     if int(str_split) <= max_splits:
@@ -430,7 +437,7 @@ class JobList(object):
                     max_distance = (chunk_list.index(chunk) + 1) % job.frequency
                     if max_distance == 0:
                         max_distance = job.frequency
-                    for distance in range(1, max_distance):
+                    for distance in xrange(1, max_distance):
                         for parent in dic_jobs.get_jobs(section_name, date, member, chunk - distance):
                             job.add_parent(parent)
                             JobList._add_edge(graph, job, parent)
@@ -439,7 +446,7 @@ class JobList(object):
                     max_distance = (member_index + 1) % job.frequency
                     if max_distance == 0:
                         max_distance = job.frequency
-                    for distance in range(1, max_distance, 1):
+                    for distance in xrange(1, max_distance, 1):
                         for parent in dic_jobs.get_jobs(section_name, date, member_list[member_index - distance], chunk):
                             job.add_parent(parent)
@@ -449,7 +456,7 @@ class JobList(object):
                     max_distance = (date_index + 1) % job.frequency
                     if max_distance == 0:
                         max_distance = job.frequency
-                    for distance in range(1, max_distance, 1):
+                    for distance in xrange(1, max_distance, 1):
                         for parent in dic_jobs.get_jobs(section_name, date_list[date_index - distance], member, chunk):
                             job.add_parent(parent)
@@ -460,7 +467,7 @@ class JobList(object):
         num_parents = 1
         if isinstance(parents, list):
             num_parents = len(parents)
-        for i in range(num_parents):
+        for i in xrange(num_parents):
             parent = parents[i] if isinstance(parents, list) else parents
             graph.add_edge(parent.name, job.name)
@@ -523,7 +530,7 @@ class JobList(object):
             jobs_to_sort = [previous_job]
             previous_section_running_type = None
             # Index starts at 1 because 0 has been taken in a previous step
-            for index in range(1, len(sorted_jobs_list) + 1):
+            for index in xrange(1, len(sorted_jobs_list) + 1):
                 # If not last item
                 if index < len(sorted_jobs_list):
                     job = sorted_jobs_list[index]
@@ -542,7 +549,7 @@ class JobList(object):
                                                if len(k.name.split('_')) == 5 else num_chunks + 1)))
                     # Bringing back original job if identified
-                    for idx in range(0, len(jobs_to_sort)):
+                    for idx in xrange(0, len(jobs_to_sort)):
                         # Test if it is a fake job
                         if jobs_to_sort[idx] in fake_original_job_map:
                             fake_job = jobs_to_sort[idx]
@@ -1404,6 +1411,21 @@ class JobList(object):
         for job in self._job_list:
             if not job.has_parents() and new:
                 job.status = Status.READY
+    @threaded
+    def check_scripts_threaded(self, as_conf):
+        """
+        When the scripts have been created, all parameters should have been substituted.
+        %PARAMETER% placeholders are not allowed (threaded variant of check_scripts).
+
+        :param as_conf: experiment configuration
+        :type as_conf: AutosubmitConfig
+        """
+        out = True
+        for job in self._job_list:
+            show_logs = job.check_warnings
+            if not job.check_script(as_conf, self.parameters, show_logs):
+                out = False
+        return out
 
     def check_scripts(self, as_conf):
         """
@@ -1423,8 +1445,7 @@ class JobList(object):
                     'Template {0} will be checked in running time'.format(job.section))
                 continue
             elif job.check.lower() != 'true':
-                Log.info(
-                    'Template {0} will not be checked'.format(job.section))
+                Log.info('Template {0} will not be checked'.format(job.section))
                 continue
             else:
                 if job.section in self.sections_checked:
@@ -1627,7 +1648,7 @@ class JobList(object):
             if job.name not in visited:
                 visited.append(job.name)
                 prefix = ""
-                for i in range(level):
+                for i in xrange(level):
                     prefix += "| "
                 # Prefix + Job Name
                 result = "\n" + prefix + \
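job.py, job_list.py and job_packages.py now share the same module-level threaded decorator: it starts the wrapped function on a Thread named "data_processing" and returns the Thread handle. Two properties of this design are worth keeping in mind: the caller must join() the handle to wait for completion, and the wrapped function's own return value is discarded, so the `return out` at the end of check_scripts_threaded is only visible inside the worker. A self-contained sketch, with check_one_batch as a hypothetical stand-in for the per-job work:

    from threading import Thread

    def threaded(fn):
        # Start fn on a worker thread and hand the Thread object back to the caller.
        # fn's own return value is lost; callers can only join() the handle.
        def wrapper(*args, **kwargs):
            thread = Thread(target=fn, args=args, kwargs=kwargs)
            thread.name = "data_processing"
            thread.start()
            return thread
        return wrapper

    @threaded
    def check_one_batch(jobs):
        for job in jobs:
            pass  # hypothetical per-job work: template check, parameter substitution, ...

    handle = check_one_batch(["job_a", "job_b"])
    handle.join()  # wait for the worker to finish before reading shared state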
diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py
index 886e98df86957ae7641ed3b27a2f149ccd097f55..53bef7b55429b081a33ff6ecc995116b364ffbce 100644
--- a/autosubmit/job/job_packager.py
+++ b/autosubmit/job/job_packager.py
@@ -450,10 +450,10 @@ class JobPackager(object):
         if new_package is not None:
             current_package += new_package
 
-        for i in range(len(current_package)):
+        for i in xrange(len(current_package)):
             total_wallclock = sum_str_hours(total_wallclock, wallclock)
         if len(current_package) > 1:
-            for level in range(1, len(current_package)):
+            for level in xrange(1, len(current_package)):
                 for job in current_package[level]:
                     job.level = level
         return JobPackageHorizontalVertical(current_package, max_procs, total_wallclock,
@@ -477,7 +477,7 @@ class JobPackager(object):
         for job in current_package[-1]:
             total_wallclock = sum_str_hours(total_wallclock, job.wallclock)
         if len(current_package) > 1:
-            for level in range(1, len(current_package)):
+            for level in xrange(1, len(current_package)):
                 for job in current_package[level]:
                     job.level = level
         return JobPackageVerticalHorizontal(current_package, total_processors, total_wallclock,
@@ -653,7 +653,7 @@ class JobPackagerVerticalMixed(JobPackagerVertical):
         # Unnecessary assignment
         sorted_jobs = self.sorted_jobs
 
-        for index in range(self.index, len(sorted_jobs)):
+        for index in xrange(self.index, len(sorted_jobs)):
             child = sorted_jobs[index]
             if self._is_wrappable(child):
                 self.index = index + 1
@@ -767,13 +767,18 @@ class JobPackagerHorizontal(object):
                     if wrappable and child not in next_section_list:
                         next_section_list.append(child)
 
-            next_section_list.sort(
-                key=lambda job: self.sort_by_expression(job.name))
+            next_section_list.sort(key=lambda job: self.sort_by_expression(job.name))
             self.job_list = next_section_list
             package_jobs = self.build_horizontal_package(horizontal_vertical)
             if package_jobs:
-                # if not self.add_sectioncombo_processors(self.total_processors) and horizontal_vertical:
+                # collect the sections present and the longest wallclock in the package
+                sections_aux = set()
+                wallclock = package_jobs[0].wallclock
+                for job in package_jobs:
+                    if job.section not in sections_aux:
+                        sections_aux.add(job.section)
+                    if job.wallclock > wallclock:
+                        wallclock = job.wallclock
                 if self._current_processors > max_procs:
                     return packages
                 if max_wallclock:
diff --git a/autosubmit/job/job_packages.py b/autosubmit/job/job_packages.py
index a2f49c0604bdaaa64607f22d784f136efe2da381..6d8eca784d7df0e93bf2af7df0a0aab89b566799 100644
--- a/autosubmit/job/job_packages.py
+++ b/autosubmit/job/job_packages.py
@@ -33,8 +33,18 @@ Log.get_logger("Autosubmit")
 from autosubmit.job.job_exceptions import WrongTemplateException
 from autosubmit.job.job import Job
 from bscearth.utils.date import sum_str_hours
-
-
+from threading import Thread, Lock
+import multiprocessing
+import tarfile
+
+lock = Lock()
+def threaded(fn):
+    def wrapper(*args, **kwargs):
+        thread = Thread(target=fn, args=args, kwargs=kwargs)
+        thread.name = "data_processing"
+        thread.start()
+        return thread
+    return wrapper
 class JobPackageBase(object):
     """
     Class to manage the package of jobs to be submitted by autosubmit
@@ -77,6 +87,40 @@ class JobPackageBase(object):
         """
         return self._platform
 
+    @threaded
+    def check_scripts(self, jobs, configuration, parameters, only_generate, hold):
+        for job in jobs:
+            if job.check.lower() == Job.CHECK_ON_SUBMISSION.lower():
+                if only_generate:
+                    # note: this flag is local to the worker thread; submit() keeps its own
+                    exit = True
+                    break
+                if not os.path.exists(os.path.join(configuration.get_project_dir(), job.file)):
+                    lock.acquire()
+                    if configuration.get_project_type().lower() != "none":
+                        lock.release()  # release before raising, so the other workers do not deadlock
+                        raise AutosubmitCritical(
+                            "Template [ {0} ] using CHECK=On_submission has some empty variable {0}".format(job.name), 7014)
+                    lock.release()
+                if not job.check_script(configuration, parameters, show_logs=job.check_warnings):
+                    Log.warning("Script {0} check failed", job.name)
+                    Log.warning("On submission script has some empty variables")
+                else:
+                    Log.result("Script {0} OK", job.name)
+            lock.acquire()
+            job.update_parameters(configuration, parameters)
+            lock.release()
+            # looking for directives on jobs
+            self._custom_directives = self._custom_directives | set(job.custom_directives)
+    @threaded
+    def _create_scripts_threaded(self, jobs, configuration):
+        # index into the jobs slice handed to this worker, not into self.jobs
+        for i in xrange(0, len(jobs)):
+            self._job_scripts[jobs[i].name] = jobs[i].create_script(configuration)
+            jobs[i].remote_logs = (self._job_scripts[jobs[i].name] + ".out",
+                                   self._job_scripts[jobs[i].name] + ".err")
+
+    def _create_common_script(self):
+        pass
     def submit(self, configuration, parameters, only_generate=False, hold=False):
         """
         :para configuration: Autosubmit basic configuration \n
         ...
         :type only_generate: Boolean
         """
         exit = False
-        for job in self.jobs:
-            try:
-                if job.check.lower() == Job.CHECK_ON_SUBMISSION.lower():
-                    if only_generate:
-                        exit = True
-                        break
-                    if not os.path.exists(os.path.join(configuration.get_project_dir(), job.file)):
-                        if configuration.get_project_type().lower() != "none":
-                            raise AutosubmitCritical("Template [ {0} ] using CHECK=On_submission has some empty variable {0}".format(job.name), 7014)
-                    if not job.check_script(configuration, parameters, show_logs=job.check_warnings):
-                        Log.warning("Script {0} check failed", job.name)
-                        Log.warning("On submission script has some empty variables")
-                    else:
-                        Log.result("Script {0} OK", job.name)
-                    job.update_parameters(configuration, parameters)
-                    # looking for directives on jobs
-                    self._custom_directives = self._custom_directives | set(job.custom_directives)
-            except BaseException as e:  # should be IOError
-                raise AutosubmitCritical(
-                    "Error on {1}, template [{0}] still does not exists in running time(check=on_submission actived) ".format(job.file, job.name), 7014)
+        # Scale the worker count with the job count; the largest threshold must be
+        # tested first, otherwise every list longer than 2500 jobs gets the same multiplier.
+        thread_number = multiprocessing.cpu_count()
+        if len(self.jobs) > 10000:
+            thread_number = thread_number * 5
+        elif len(self.jobs) > 7500:
+            thread_number = thread_number * 4
+        elif len(self.jobs) > 5000:
+            thread_number = thread_number * 3
+        elif len(self.jobs) > 2500:
+            thread_number = thread_number * 2
+        chunksize = int((len(self.jobs) + thread_number - 1) / thread_number)
+        try:
+            if len(self.jobs) < thread_number:
+                for job in self.jobs:
+                    if job.check.lower() == Job.CHECK_ON_SUBMISSION.lower():
+                        if only_generate:
+                            exit = True
+                            break
+                        if not os.path.exists(os.path.join(configuration.get_project_dir(), job.file)):
+                            if configuration.get_project_type().lower() != "none":
+                                raise AutosubmitCritical("Template [ {0} ] using CHECK=On_submission has some empty variable {0}".format(job.name), 7014)
+                        if not job.check_script(configuration, parameters, show_logs=job.check_warnings):
+                            Log.warning("Script {0} check failed", job.name)
+                            Log.warning("On submission script has some empty variables")
+                        else:
+                            Log.result("Script {0} OK", job.name)
+                    job.update_parameters(configuration, parameters)
+                    # looking for directives on jobs
+                    self._custom_directives = self._custom_directives | set(job.custom_directives)
+            else:
+                Lhandle = list()
+                for i in xrange(0, len(self.jobs), chunksize):
+                    Lhandle.append(self.check_scripts(self.jobs[i:i + chunksize], configuration, parameters, only_generate, hold))
+                for dataThread in Lhandle:
+                    dataThread.join()
+        except BaseException as e:  # should be IOError
+            raise AutosubmitCritical(
+                "Error on {1}, template [{0}] still does not exist at running time (check=on_submission activated)".format(job.file, job.name), 7014)
+        Log.debug("Creating Scripts")
         if only_generate:
             if not exit:
-                self._create_scripts(configuration)
+                if len(self.jobs) < thread_number:
+                    self._create_scripts(configuration)
+                else:
+                    Lhandle = list()
+                    for i in xrange(0, len(self.jobs), chunksize):
+                        Lhandle.append(self._create_scripts_threaded(self.jobs[i:i + chunksize], configuration))
+                    for dataThread in Lhandle:
+                        dataThread.join()
+                    self._common_script = self._create_common_script()
+
         else:
-            self._create_scripts(configuration)
+            if len(self.jobs) < thread_number:
+                self._create_scripts(configuration)
+            else:
+                Lhandle = list()
+                for i in xrange(0, len(self.jobs), chunksize):
+                    Lhandle.append(self._create_scripts_threaded(self.jobs[i:i + chunksize], configuration))
+                for dataThread in Lhandle:
+                    dataThread.join()
+                self._common_script = self._create_common_script()
+        Log.debug("Sending Files")
         self._send_files()
+        Log.debug("Submitting")
         self._do_submission(hold=hold)
@@ -212,7 +291,7 @@ class JobPackageArray(JobPackageBase):
     def _create_scripts(self, configuration):
         timestamp = str(int(time.time()))
-        for i in range(0, len(self.jobs)):
+        for i in xrange(0, len(self.jobs)):
             self._job_scripts[self.jobs[i].name] = self.jobs[i].create_script(configuration)
             self._job_inputs[self.jobs[i].name] = self._create_i_input(timestamp, i)
             self.jobs[i].remote_logs = (timestamp + ".{0}.out".format(i), timestamp + ".{0}.err".format(i))
@@ -250,7 +329,7 @@ class JobPackageArray(JobPackageBase):
         if package_id is None:
             return
 
-        for i in range(0, len(self.jobs)):
+        for i in xrange(0, len(self.jobs)):
             Log.info("{0} submitted", self.jobs[i].name)
             self.jobs[i].id = str(package_id) + '[{0}]'.format(i)
             self.jobs[i].status = Status.SUBMITTED
@@ -299,7 +378,10 @@ class JobPackageThread(JobPackageBase):
                 self._jobs_resources[job.section] = dict()
                 self._jobs_resources[job.section]['PROCESSORS'] = job.processors
                 self._jobs_resources[job.section]['TASKS'] = job.tasks
-            jobs_scripts.append(self._job_scripts[job.name])
+            try:
+                jobs_scripts.append(self._job_scripts[job.name])
+            except BaseException as e:  # the script may not have been generated yet
+                pass
         return jobs_scripts
 
     @property
@@ -321,12 +403,11 @@ class JobPackageThread(JobPackageBase):
     def set_job_dependency(self, dependency):
         self._job_dependency = dependency
 
     def _create_scripts(self, configuration):
-        for i in range(0, len(self.jobs)):
+        for i in xrange(0, len(self.jobs)):
             self._job_scripts[self.jobs[i].name] = self.jobs[i].create_script(configuration)
-            self.jobs[i].remote_logs = (
-                self._job_scripts[self.jobs[i].name] + ".out".format(i),
+            self.jobs[i].remote_logs = (self._job_scripts[self.jobs[i].name] + ".out".format(i),
                 self._job_scripts[self.jobs[i].name] + ".err".format(i)
             )
         self._common_script = self._create_common_script()
@@ -340,15 +421,28 @@ class JobPackageThread(JobPackageBase):
     def _send_files(self):
         self.platform.check_remote_log_dir()
+        compress_type = "w"
+        output_filepath = '{0}.tar'.format("wrapper_scripts")
         if callable(getattr(self.platform, 'remove_multiple_files')):
             filenames = str()
             for job in self.jobs:
                 filenames += " " + self.platform.remote_log_dir + "/" + job.name + ".cmd"
             self.platform.remove_multiple_files(filenames)
-        for job in self.jobs:
-            self.platform.send_file(self._job_scripts[job.name], check=False)
+        # Pack every wrapper script into one tar, send it in a single transfer
+        # and unpack it remotely; the with-block closes the tar, so no explicit close() is needed.
+        tar_path = os.path.join(self._tmp_path, output_filepath)
+        with tarfile.open(tar_path, compress_type) as tar:
+            for job in self.jobs:
+                jfile = os.path.join(self._tmp_path, self._job_scripts[job.name])
+                with open(jfile, 'rb') as f:
+                    info = tar.gettarinfo(jfile, self._job_scripts[job.name])
+                    tar.addfile(info, f)
+        os.chmod(tar_path, 0o755)
+        self.platform.send_file(tar_path, check=False)
+        self.platform.send_command("cd {0}; tar -xvf {1}".format(self.platform.get_files_path(), output_filepath))
         self.platform.send_file(self._common_script)
+
     def _do_submission(self, job_scripts=None, hold=False):
         if callable(getattr(self.platform, 'remove_multiple_files')):
             filenames = str()
@@ -368,7 +462,7 @@ class JobPackageThread(JobPackageBase):
         if package_id is None:
             return
 
-        for i in range(0, len(self.jobs)):
+        for i in xrange(0, len(self.jobs)):
             Log.info("{0} submitted", self.jobs[i].name)
             self.jobs[i].id = str(package_id)
             self.jobs[i].status = Status.SUBMITTED
@@ -419,7 +513,7 @@ class JobPackageThreadWrapped(JobPackageThread):
         return self._platform.project
 
     def _create_scripts(self, configuration):
-        for i in range(0, len(self.jobs)):
+        for i in xrange(0, len(self.jobs)):
             self._job_scripts[self.jobs[i].name] = self.jobs[i].create_script(configuration)
             self.jobs[i].remote_logs = (
                 self._job_scripts[self.jobs[i].name] + ".out".format(i),
@@ -451,7 +545,7 @@ class JobPackageThreadWrapped(JobPackageThread):
         if package_id is None:
             raise Exception('Submission failed')
 
-        for i in range(0, len(self.jobs)):
+        for i in xrange(0, len(self.jobs)):
             Log.info("{0} submitted", self.jobs[i].name)
             self.jobs[i].id = str(package_id)
             self.jobs[i].status = Status.SUBMITTED
diff --git a/autosubmit/monitor/diagram.py b/autosubmit/monitor/diagram.py
index e4484c1d9c83c2e72b95da4af8f9e3923055a6ed..2b758859ea5f56e5f5e2962eb8c880fe7a04f3ce 100644
--- a/autosubmit/monitor/diagram.py
+++ b/autosubmit/monitor/diagram.py
@@ -54,7 +54,7 @@ suppose it is noon, if you supply -fp 5 the query will consider changes starting
     exp_stats = ExperimentStats(jobs_list, period_ini, period_fi)
     grid_spec = gridspec.GridSpec(RATIO * num_plots + 2, 1)
 
-    for plot in range(1, num_plots + 1):
+    for plot in xrange(1, num_plots + 1):
         # Calculating jobs inside the given plot
         l1 = int((plot - 1) * MAX_JOBS_PER_PLOT)
         l2 = int(plot * MAX_JOBS_PER_PLOT)
@@ -104,7 +104,7 @@ def create_csv_stats(exp_stats, jobs_list, output_file):
     output_file = output_file.replace('pdf', 'csv')
     with open(output_file, 'wb') as file:
         file.write("Job,Started,Ended,Queuing time (hours),Running time (hours)\n")
-        for i in range(len(jobs_list)):
+        for i in xrange(len(jobs_list)):
             file.write("{0},{1},{2},{3},{4}\n".format(job_names[i], start_times[i], end_times[i],
                                                       queuing_times[i], running_times[i]))
 
 def build_legends(plot, rects, experiment_stats, general_stats):
diff --git a/autosubmit/monitor/monitor.py b/autosubmit/monitor/monitor.py
index 64f48bd9ef21666ea6181e8080e279a0f1d4f186..cebb30848759908b521fc368088d1d30af794618 100644
--- a/autosubmit/monitor/monitor.py
+++ b/autosubmit/monitor/monitor.py
@@ -163,7 +163,7 @@ class Monitor:
             if len(subgraph.get_node(group[0])) == 0:
                 subgraph.add_node(previous_node)
 
-            for i in range(1, len(group)):
+            for i in xrange(1, len(group)):
                 node = exp.get_node(group[i])[0]
                 if len(subgraph.get_node(group[i])) == 0:
                     subgraph.add_node(node)
diff --git a/autosubmit/platforms/wrappers/wrapper_builder.py b/autosubmit/platforms/wrappers/wrapper_builder.py
index 846f7930433ba6e00784699fc0cb0060b82034c6..dbb7c8cbf72d938ff7f934746bfa7fabb06581e1 100644
--- a/autosubmit/platforms/wrappers/wrapper_builder.py
+++ b/autosubmit/platforms/wrappers/wrapper_builder.py
@@ -827,14 +827,14 @@ class SrunVerticalHorizontalWrapperBuilder(SrunWrapperBuilder):
         total_threads = float(len(self.job_scripts))
         n_threads = float(self.threads)
         core = []
-        for thread in range(int(n_threads)):
+        for thread in xrange(int(n_threads)):
             core.append(0x0)
         core[0] = 0x1
         horizontal_wrapper_size = int(total_threads)
         srun_mask_values = []
-        for job_id in range(horizontal_wrapper_size):
-            for thread in range(1, int(n_threads)):
+        for job_id in xrange(horizontal_wrapper_size):
+            for thread in xrange(1, int(n_threads)):
                 core[thread] = core[thread-1]*2
             job_mask = 0x0
             for thr_mask in core:
diff --git a/test/integration/test_db_manager.py b/test/integration/test_db_manager.py
index 558dca17ef1cfb551c24bd0569c830986e596880..64bf413e33d7122d49abf7c168de346b6a1acab3 100644
--- a/test/integration/test_db_manager.py
+++ b/test/integration/test_db_manager.py
@@ -29,7 +29,7 @@ class TestDbManager(TestCase):
         table_name = 'test'
         columns = ['field1', 'field2']
         self.db_manager.create_table(table_name, columns)
-        for i in xrange(3):
+        for i in range(3):
             self.db_manager.insert(table_name, columns, ['dummy', 'dummy'])
         count = self.db_manager.count(table_name)
         self.assertEquals(3, count)
diff --git a/test/unit/test_job_list.py b/test/unit/test_job_list.py
index 65d78f28dd728f2b6f81025d846986cc396bfc7c..f2305898a270abf3857f80e35a19db56025cefcd 100644
--- a/test/unit/test_job_list.py
+++ b/test/unit/test_job_list.py
@@ -175,27 +175,27 @@ class TestJobList(TestCase):
 
     def test_sort_by_name_returns_the_list_of_jobs_well_sorted(self):
         sorted_by_name = self.job_list.sort_by_name()
-        for i in xrange(len(sorted_by_name) - 1):
+        for i in range(len(sorted_by_name) - 1):
             self.assertTrue(
                 sorted_by_name[i].name <= sorted_by_name[i + 1].name)
 
     def test_sort_by_id_returns_the_list_of_jobs_well_sorted(self):
         sorted_by_id = self.job_list.sort_by_id()
-        for i in xrange(len(sorted_by_id) - 1):
+        for i in range(len(sorted_by_id) - 1):
             self.assertTrue(sorted_by_id[i].id <= sorted_by_id[i + 1].id)
 
     def test_sort_by_type_returns_the_list_of_jobs_well_sorted(self):
         sorted_by_type = self.job_list.sort_by_type()
-        for i in xrange(len(sorted_by_type) - 1):
+        for i in range(len(sorted_by_type) - 1):
             self.assertTrue(
                 sorted_by_type[i].type <= sorted_by_type[i + 1].type)
 
     def test_sort_by_status_returns_the_list_of_jobs_well_sorted(self):
         sorted_by_status = self.job_list.sort_by_status()
-        for i in xrange(len(sorted_by_status) - 1):
+        for i in range(len(sorted_by_status) - 1):
             self.assertTrue(
                 sorted_by_status[i].status <= sorted_by_status[i + 1].status)
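Taken together, the job_packages.py changes turn JobPackageBase.submit into a chunked fan-out: the job list is split into roughly cpu_count-sized chunks (scaled up for very large experiments), each chunk is handed to a threaded worker (check_scripts or _create_scripts_threaded), and the main thread joins every handle before _send_files and _do_submission run. Because the per-job work is dominated by template I/O and remote transfers rather than computation, threads can overlap that latency despite the GIL; the tar-based _send_files then reduces many per-script copies to a single transfer plus one remote tar -xvf. A minimal sketch of the fan-out, under the assumption that process_chunk stands in for the real workers:

    import multiprocessing
    from threading import Thread

    def process_chunk(jobs):
        for job in jobs:
            pass  # per-job work: check template, create script, ...

    def fan_out(jobs):
        if not jobs:
            return
        thread_number = multiprocessing.cpu_count()
        # ceiling division, so the last chunk picks up the remainder
        chunksize = (len(jobs) + thread_number - 1) // thread_number
        handles = []
        for i in range(0, len(jobs), chunksize):
            t = Thread(target=process_chunk, args=(jobs[i:i + chunksize],))
            t.start()
            handles.append(t)
        for t in handles:
            t.join()  # block until every chunk has been processed

    fan_out(["job_%d" % i for i in range(10)])

Whether the multiplier thresholds (2500, 5000, 7500, 10000 jobs) are the right cut-offs is an empirical question; the important property is that the largest threshold is tested first, since otherwise every branch past the first is unreachable.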