diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index be0a76da67787315c7bf974c7c094ae6c3dc0e4b..c8d5791b6f6f0a0789927d8dd1481a260168c829 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -202,7 +202,8 @@ class Job(object): self.file = None self.additional_files = [] self.executable = None - self.x11 = False + self.x11 = None + self.x11_options = None self._local_logs = ('', '') self._remote_logs = ('', '') self.script_name = self.name + ".cmd" @@ -252,6 +253,7 @@ class Job(object): self.start_time_written = False self.submit_time_timestamp = None # for wrappers, all jobs inside a wrapper are submitted at the same time self.finish_time_timestamp = None # for wrappers, with inner_retrials, the submission time should be the last finish_time of the previous retrial + def _init_runtime_parameters(self): # hetjobs self.het = {'HETSIZE': 0} @@ -268,6 +270,25 @@ class Job(object): self.start_time_placeholder = "" self.processors_per_node = "" + + @property + @autosubmit_parameter(name='x11') + def x11(self): + """Whether to use X11 forwarding""" + return self._x11 + @x11.setter + def x11(self, value): + self._x11 = value + + @property + @autosubmit_parameter(name='x11_options') + def x11_options(self): + """Allows to set salloc parameters for x11""" + return self._x11_options + @x11_options.setter + def x11_options(self, value): + self._x11_options = value + @property @autosubmit_parameter(name='tasktype') def section(self): @@ -1120,9 +1141,11 @@ class Job(object): if not self.log_retrieved: self.local_logs = backup_logname - Log.printlog("Failed to retrieve logs for job {0}".format(self.name), 6000) if raise_error: - raise + raise AutosubmitCritical("Failed to retrieve logs for job {0}".format(self.name), 6000) + else: + Log.printlog("Failed to retrieve logs for job {0}".format(self.name), 6000) + else: # Update the logs with Autosubmit Job ID Brand try: @@ -1564,6 +1587,8 @@ class Job(object): def 
update_platform_associated_parameters(self,as_conf, parameters, job_platform, chunk): job_data = as_conf.jobs_data[self.section] platform_data = as_conf.platforms_data.get(job_platform.name,{}) + self.x11_options = str(job_data.get("X11_OPTIONS", platform_data.get("X11_OPTIONS",""))) + self.ec_queue = str(job_data.get("EC_QUEUE", platform_data.get("EC_QUEUE",""))) self.executable = job_data.get("EXECUTABLE", platform_data.get("EXECUTABLE","")) self.total_jobs = job_data.get("TOTALJOBS",job_data.get("TOTAL_JOBS", job_platform.total_jobs)) @@ -1838,6 +1863,7 @@ class Job(object): self.check = as_conf.jobs_data[self.section].get("CHECK", False) self.check_warnings = as_conf.jobs_data[self.section].get("CHECK_WARNINGS", False) self.shape = as_conf.jobs_data[self.section].get("SHAPE", "") + self.x11 = str(as_conf.jobs_data[self.section].get("X11", False)).lower() if self.checkpoint: # To activate placeholder sustitution per in the template parameters["AS_CHECKPOINT"] = self.checkpoint parameters['JOBNAME'] = self.name @@ -1863,6 +1889,7 @@ class Job(object): parameters['JOB_DEPENDENCIES'] = self.dependencies parameters['EXPORT'] = self.export parameters['PROJECT_TYPE'] = as_conf.get_project_type() + parameters['X11'] = self.x11 self.wchunkinc = as_conf.get_wchunkinc(self.section) for key,value in as_conf.jobs_data[self.section].items(): parameters["CURRENT_"+key.upper()] = value @@ -1977,7 +2004,7 @@ class Job(object): template_file.close() else: if self.type == Type.BASH: - template = 'sleep 5' + template = 'sleep 5' elif self.type == Type.PYTHON2: template = 'time.sleep(5)' + "\n" elif self.type == Type.PYTHON3 or self.type == Type.PYTHON: diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 81851902b7bd1fe3535d46e068f2782bba87fe9e..ccb59012153b39635d556a215d40353725084962 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -2570,6 +2570,9 @@ class
JobList(object): if not hasattr(job,"updated_log") or not job.updated_log: # hasattr for backward compatibility (job.updated_logs is only for newer jobs, as the loaded ones may not have this set yet) # order path_to_logs by name and get the two last element log_file = False + if hasattr(job, "x11") and job.x11 == "true": + job.updated_log = True + return if job.wrapper_type == "vertical" and job.fail_count > 0: for log_recovered in self.path_to_logs.glob(f"{job.name}.*._{job.fail_count}.out"): if job.local_logs[0][-4] in log_recovered.name: diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index b09d07e36cbab682dbb8ba4c2c8dd64a093b4ce9..ea8308fd040a977f696c397fd9b1b01a422e5698 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -895,25 +895,27 @@ class ParamikoPlatform(Platform): def flush_out(self,session): while session.recv_ready(): - sys.stdout.write(session.recv(4096)) + sys.stdout.write(session.recv(4096).decode(locale.getlocale()[1] or "UTF-8")) while session.recv_stderr_ready(): - sys.stderr.write(session.recv_stderr(4096)) + sys.stderr.write(session.recv_stderr(4096).decode(locale.getlocale()[1] or "UTF-8")) @threaded def x11_status_checker(self, session, session_fileno): + poller = None self.transport.accept() while not session.exit_status_ready(): try: - if sys.platform != "linux": - self.poller = self.poller.kqueue() - else: - self.poller = self.poller.poll() + if type(self.poller) is not list: + if sys.platform != "linux": + poller = self.poller.kqueue() + else: + poller = self.poller.poll() # accept subsequent x11 connections if any if len(self.transport.server_accepts) > 0: self.transport.accept() - if not self.poller: # this should not happen, as we don't have a timeout. + if not poller: # this should not happen, as we don't have a timeout.
break - for fd, event in self.poller: + for fd, event in poller: if fd == session_fileno: self.flush_out(session) # data either on local/remote x11 socket @@ -961,12 +963,22 @@ class ParamikoPlatform(Platform): if display is None or not display: display = "localhost:0" self.local_x11_display = xlib_connect.get_display(display) - chan.request_x11(handler=self.x11_handler) + chan = self.transport.open_session() + chan.request_x11(single_connection=False,handler=self.x11_handler) else: - chan.settimeout(timeout) + chan = self.transport.open_session() if x11 == "true": - command = command + " ; sleep infinity" - chan.exec_command(command) + if "timeout " in command: + timeout_command = command.split("timeout ")[1].split(" ")[0] + if timeout_command == "0": + timeout_command = "infinity" + command = f'{command} ; sleep {timeout_command} 2>/dev/null' + #command = f'export display {command}' + Log.info(command) + try: + chan.exec_command(command) + except BaseException as e: + raise AutosubmitCritical("Failed to execute command: %s" % e) chan_fileno = chan.fileno() self.poller.register(chan_fileno, select.POLLIN) self.x11_status_checker(chan, chan_fileno) @@ -1024,8 +1036,13 @@ class ParamikoPlatform(Platform): channel.settimeout(timeout) stdin.close() channel.shutdown_write() - stdout_chunks.append(stdout.channel.recv(len(stdout.channel.in_buffer))) - while not channel.closed or channel.recv_ready() or channel.recv_stderr_ready(): + stdout_chunks.append(stdout.channel.recv(len(stdout.channel.in_buffer))) + + aux_stderr = [] + i = 0 + x11_exit = False + + while (not channel.closed or channel.recv_ready() or channel.recv_stderr_ready() ) and not x11_exit: # stop if channel was closed prematurely, and there is no data in the buffers.
got_chunk = False readq, _, _ = select.select([stdout.channel], [], [], 2) @@ -1033,17 +1050,28 @@ class ParamikoPlatform(Platform): if c.recv_ready(): stdout_chunks.append( stdout.channel.recv(len(c.in_buffer))) - #stdout_chunks.append(" ") got_chunk = True if c.recv_stderr_ready(): # make sure to read stderr to prevent stall stderr_readlines.append( stderr.channel.recv_stderr(len(c.in_stderr_buffer))) - #stdout_chunks.append(" ") got_chunk = True if x11 == "true": - got_chunk = True - break + if len(stderr_readlines) > 0: + aux_stderr.extend(stderr_readlines) + for stderr_line in stderr_readlines: + stderr_line = stderr_line.decode(lang) + if "salloc" in stderr_line: # salloc is the command to allocate resources in slurm, for pjm it is different + job_id = re.findall(r'\d+', stderr_line) + if job_id: + stdout_chunks.append(job_id[0].encode(lang)) + x11_exit = True + else: + x11_exit = True + if not x11_exit: + stderr_readlines = [] + else: + stderr_readlines = aux_stderr if not got_chunk and stdout.channel.exit_status_ready() and not stderr.channel.recv_stderr_ready() and not stdout.channel.recv_ready(): # indicate that we're not going to read from this channel anymore stdout.channel.shutdown_read() @@ -1074,7 +1102,7 @@ class ParamikoPlatform(Platform): elif errorLine.find("syntax error") != -1: raise AutosubmitCritical("Syntax error",7052,self._ssh_output_err) elif errorLine.find("refused") != -1 or errorLine.find("slurm_persist_conn_open_without_init") != -1 or errorLine.find("slurmdbd") != -1 or errorLine.find("submission failed") != -1 or errorLine.find("git clone") != -1 or errorLine.find("sbatch: error: ") != -1 or errorLine.find("not submitted") != -1 or errorLine.find("invalid") != -1 or "[ERR.] PJM".lower() in errorLine: - if "[ERR.] 
PJM".lower() in errorLine or (self._submit_command_name == "sbatch" and (errorLine.find("policy") != -1 or errorLine.find("invalid") != -1) ) or (self._submit_command_name == "sbatch" and errorLine.find("argument") != -1) or (self._submit_command_name == "bsub" and errorLine.find("job not submitted") != -1) or self._submit_command_name == "ecaccess-job-submit" or self._submit_command_name == "qsub ": + if "salloc: error" in errorLine or "salloc: unrecognized option" in errorLine or "[ERR.] PJM".lower() in errorLine or (self._submit_command_name == "sbatch" and (errorLine.find("policy") != -1 or errorLine.find("invalid") != -1) ) or (self._submit_command_name == "sbatch" and errorLine.find("argument") != -1) or (self._submit_command_name == "bsub" and errorLine.find("job not submitted") != -1) or self._submit_command_name == "ecaccess-job-submit" or self._submit_command_name == "qsub ": raise AutosubmitError(errorLine, 7014, "Bad Parameters.") raise AutosubmitError('Command {0} in {1} warning: {2}'.format(command, self.host,self._ssh_output_err, 6005)) @@ -1095,8 +1123,10 @@ class ParamikoPlatform(Platform): except IOError as e: raise AutosubmitError(str(e),6016) except BaseException as e: + if type(stderr_readlines) is str: + stderr_readlines = '\n'.join(stderr_readlines) raise AutosubmitError('Command {0} in {1} warning: {2}'.format( - command, self.host, '\n'.join(stderr_readlines)), 6005, str(e)) + command, self.host, stderr_readlines), 6005, str(e)) def parse_job_output(self, output): """ diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 438078118503dd8ae8a5a1e9da93675b447b217b..4b4a2470602a9347a8c221e71f8a60f3772f6dcb 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -77,6 +77,7 @@ class Platform(object): self._submit_hold_cmd = None self._submit_command_name = None self._submit_cmd = None + self._submit_cmd_x11 = None self._checkhost_cmd = None self.cancel_cmd = None self.otp_timeout = 
None @@ -300,7 +301,8 @@ class Platform(object): save = True if not inspect: job_list.save() - valid_packages_to_submit.append(package) + if package.x11 != "true": + valid_packages_to_submit.append(package) # Log.debug("FD end-submit: {0}".format(log.fd_show.fd_table_status_str(open())) except (IOError, OSError): if package.jobs[0].id != 0: @@ -831,6 +833,9 @@ class Platform(object): continue job.children = children job.platform = self + if job.x11 == "true": + Log.debug("Job {0} is an X11 job, skipping log retrieval as they're written in the ASLOGS".format(job.name)) + continue try: job.retrieve_logfiles(self, raise_error=True) if job.wrapper_type != "vertical": diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index 13d0a7cad28f0eb00d20acf0862266c23df6ae8a..4ba8248d93d151c02fe839b5d3772b0c7a0172b7 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -49,6 +49,8 @@ class SlurmPlatform(ParamikoPlatform): self._submit_hold_cmd = None self._submit_command_name = None self._submit_cmd = None + self.x11_options = None + self._submit_cmd_x11 = f'{self.remote_log_dir}' self._checkhost_cmd = None self.cancel_cmd = None self._header = SlurmHeader() @@ -72,6 +74,16 @@ class SlurmPlatform(ParamikoPlatform): tmp_path, self.config.get("LOCAL_ASLOG_DIR"), "submit_") self._submit_script_file = open(self._submit_script_path, 'wb').close() + + def get_submit_cmd_x11(self, args, script_name, job): + """ + Returns the salloc command used to submit the X11 job script script_name with the given args + """ + + cmd = f'salloc {args} {self._submit_cmd_x11}/{script_name}' + Log.debug(f"Salloc command: {cmd}") + return cmd + def generate_new_name_submit_script_file(self): self._submit_script_path = self._submit_script_base_name + os.urandom(16).hex() + ".sh" self._submit_script_file = open(self._submit_script_path, 'wb').close() @@ -321,6 +333,8 @@ class SlurmPlatform(ParamikoPlatform): self.put_cmd = "scp" self.get_cmd = "scp" self.mkdir_cmd = "mkdir -p " +
self.remote_log_dir + self._submit_cmd_x11 = f'{self.remote_log_dir}' + def hold_job(self, job): try: @@ -518,7 +532,7 @@ class SlurmPlatform(ParamikoPlatform): status = "" try: status = [x.split()[1] for x in output.splitlines() - if x.split()[0][:len(job_id)] == str(job_id)] + if x.split()[0][:len(str(job_id))] == str(job_id)] except BaseException as e: pass if len(status) == 0: @@ -530,12 +544,12 @@ class SlurmPlatform(ParamikoPlatform): if outputlines.find("failed") != -1: raise AutosubmitCritical( "Submission failed. Command Failed", 7014) - jobs_id = [] - for output in outputlines.splitlines(): - jobs_id.append(int(output.split(' ')[3])) if x11 == "true": - return jobs_id[0] + return int(outputlines.splitlines()[0]) else: + jobs_id = [] + for output in outputlines.splitlines(): + jobs_id.append(int(output.split(' ')[3])) return jobs_id except IndexError: raise AutosubmitCritical( @@ -557,10 +571,7 @@ class SlurmPlatform(ParamikoPlatform): x11 = job.x11 if x11 == "true": - if not hold: - return export + self._submit_cmd + job_script - else: - return export + self._submit_hold_cmd + job_script + return export + self.get_submit_cmd_x11(job.x11_options.strip(""), job_script.strip(""), job) else: try: lang = locale.getlocale()[1] diff --git a/requeriments.txt b/requeriments.txt index 55ebc8abf0738b14e9463a5dcad2e7952ec3a4e9..872cb4ca113f7387a3bf1ff280976f5ba39168de 100644 --- a/requeriments.txt +++ b/requeriments.txt @@ -24,42 +24,4 @@ autosubmitconfigparser==1.0.62 pathlib configparser -[:python_version <= "3.7"] -PyNaCl==1.5.0 -pythondialog==3.5.3 -xlib==0.21 -setuptools==68.2.2 -cryptography==41.0.5 -bscearth.utils==0.5.2 -requests==2.31.0 -networkx==2.6.3 -portalocker==2.7.0 -mock==5.1.0 -paramiko==3.3.1 -matplotlib==3.5.3 -python_dateutil==2.8.2 -argparse==1.4.0 -configobj==5.0.8 -packaging==23.2 -bcrypt==4.0.1 -charset_normalizer==3.3.1 -kiwisolver==1.4.5 -fonttools==4.43.1 -cycler==0.12.1 -ruamel.yaml.clib==0.2.8 -typing_extensions==4.8.0 -psutil==5.6.1 
-Pygments==2.3.1 -coverage==5.0 -nose==1.3.7 -six==1.12.0 -Cython==0.29.6 -cffi==1.12.2 -py==1.8.0 -atomicwrites==1.3.0 -attrs==19.1.0 -more_itertools==6.0.0 -urllib3==1.24.1 -idna==2.8 -Pillow==6.2.1 -numpy==1.17.4 +