From 2490afbb7fcf9d3da5413ee8a01e5abe6bc2355d Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 3 Aug 2023 14:22:09 +0200 Subject: [PATCH 01/20] testing xclock t --- autosubmit/job/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 4de01df59..28ed5a092 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -1500,7 +1500,7 @@ class Job(object): template_file.close() else: if self.type == Type.BASH: - template = 'sleep 5' + template = 'xclock' elif self.type == Type.PYTHON2: template = 'time.sleep(5)' + "\n" elif self.type == Type.PYTHON3 or self.type == Type.PYTHON: -- GitLab From 4e01aecc83ed7cbbe5b371a4fe4e33dc9e3b14ad Mon Sep 17 00:00:00 2001 From: dbeltran Date: Fri, 4 Aug 2023 10:46:18 +0200 Subject: [PATCH 02/20] Pycharm fixing imports --- autosubmit/database/db_common.py | 7 ++- autosubmit/database/db_manager.py | 2 +- autosubmit/database/db_structure.py | 7 ++- autosubmit/experiment/experiment_common.py | 4 +- autosubmit/experiment/statistics.py | 3 +- autosubmit/git/autosubmit_git.py | 13 +++-- autosubmit/helpers/autosubmit_helper.py | 14 +++-- autosubmit/helpers/parameters.py | 3 +- autosubmit/helpers/utils.py | 5 +- autosubmit/history/data_classes/job_data.py | 9 ++- .../database_managers/database_manager.py | 7 ++- .../experiment_history_db_manager.py | 5 +- .../experiment_status_db_manager.py | 5 +- autosubmit/history/experiment_history.py | 16 ++--- autosubmit/history/experiment_status.py | 6 +- autosubmit/history/internal_logging.py | 2 + .../platform_monitor/platform_utils.py | 3 +- autosubmit/history/strategies.py | 6 +- autosubmit/job/job.py | 58 ++++++++++++------- autosubmit/job/job_common.py | 2 +- autosubmit/job/job_dict.py | 8 ++- autosubmit/job/job_grouping.py | 6 +- autosubmit/job/job_list.py | 36 ++++++------ autosubmit/job/job_list_persistence.py | 5 +- autosubmit/job/job_packager.py | 14 ++--- autosubmit/job/job_utils.py | 8 +-- autosubmit/monitor/diagram.py | 4 +- autosubmit/monitor/monitor.py | 29 +++++----- autosubmit/notifications/mail_notifier.py | 4 +- autosubmit/platforms/ecplatform.py | 14 +++-- autosubmit/platforms/locplatform.py | 11 ++-- autosubmit/platforms/lsfplatform.py | 2 +- autosubmit/platforms/paramiko_platform.py | 26 ++++----- autosubmit/platforms/paramiko_submitter.py | 21 +++---- autosubmit/platforms/pbsplatform.py | 5 +- autosubmit/platforms/pjmplatform.py | 3 +- autosubmit/platforms/platform.py | 1 + autosubmit/platforms/psplatform.py | 2 +- autosubmit/platforms/sgeplatform.py | 3 +- autosubmit/platforms/slurmplatform.py | 12 ++-- autosubmit/statistics/jobs_stat.py | 2 + autosubmit/statistics/statistics.py | 4 +- autosubmit/statistics/utils.py | 5 +- 43 files changed, 225 insertions(+), 177 deletions(-) diff --git a/autosubmit/database/db_common.py b/autosubmit/database/db_common.py index 626cfa1e9..bdc381b89 100644 --- a/autosubmit/database/db_common.py +++ b/autosubmit/database/db_common.py @@ -20,11 +20,12 @@ """ Module containing functions to manage autosubmit's database. """ +import multiprocessing import os import sqlite3 -import multiprocessing -import autosubmit -from log.log import Log, AutosubmitCritical, AutosubmitError + +from log.log import Log, AutosubmitCritical + Log.get_logger("Autosubmit") from autosubmitconfigparser.config.basicconfig import BasicConfig diff --git a/autosubmit/database/db_manager.py b/autosubmit/database/db_manager.py index 6e7376b9b..51639f2b7 100644 --- a/autosubmit/database/db_manager.py +++ b/autosubmit/database/db_manager.py @@ -17,8 +17,8 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . -import sqlite3 import os +import sqlite3 class DbManager(object): diff --git a/autosubmit/database/db_structure.py b/autosubmit/database/db_structure.py index b42854359..f6681b5e6 100644 --- a/autosubmit/database/db_structure.py +++ b/autosubmit/database/db_structure.py @@ -18,13 +18,14 @@ # along with Autosubmit. If not, see . import os - +import sqlite3 import textwrap import traceback -import sqlite3 - from typing import Dict, List + from log.log import Log + + # from networkx import DiGraph # DB_FILE_AS_TIMES = "/esarchive/autosubmit/as_times.db" diff --git a/autosubmit/experiment/experiment_common.py b/autosubmit/experiment/experiment_common.py index 17eba23fc..2ee9c3929 100644 --- a/autosubmit/experiment/experiment_common.py +++ b/autosubmit/experiment/experiment_common.py @@ -21,8 +21,10 @@ Module containing functions to manage autosubmit's experiments. """ import string + import autosubmit.database.db_common as db_common -from log.log import Log,AutosubmitCritical +from log.log import Log, AutosubmitCritical + Log.get_logger("Autosubmit") def new_experiment(description, version, test=False, operational=False): diff --git a/autosubmit/experiment/statistics.py b/autosubmit/experiment/statistics.py index 793210923..1ca50fdbd 100644 --- a/autosubmit/experiment/statistics.py +++ b/autosubmit/experiment/statistics.py @@ -18,9 +18,10 @@ # along with Autosubmit. If not, see . import datetime + from autosubmit.job.job import Job from autosubmit.monitor.utils import FixedSizeList -from log.log import Log, AutosubmitError, AutosubmitCritical +from log.log import Log def timedelta2hours(deltatime): diff --git a/autosubmit/git/autosubmit_git.py b/autosubmit/git/autosubmit_git.py index 976b02f9f..42e795e9b 100644 --- a/autosubmit/git/autosubmit_git.py +++ b/autosubmit/git/autosubmit_git.py @@ -14,18 +14,21 @@ # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. +from os import path + # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . import locale -from os import path import os -from shutil import rmtree -import subprocess import shutil +import subprocess +from shutil import rmtree +from time import time + # from autosubmit import Autosubmit from autosubmitconfigparser.config.basicconfig import BasicConfig -from time import time -from log.log import Log, AutosubmitCritical, AutosubmitError +from log.log import Log, AutosubmitCritical + Log.get_logger("Autosubmit") diff --git a/autosubmit/helpers/autosubmit_helper.py b/autosubmit/helpers/autosubmit_helper.py index 2aef35c49..cf04f79bc 100644 --- a/autosubmit/helpers/autosubmit_helper.py +++ b/autosubmit/helpers/autosubmit_helper.py @@ -17,16 +17,18 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . -from log.log import AutosubmitCritical, Log -from time import sleep -from autosubmitconfigparser.config.basicconfig import BasicConfig -from autosubmitconfigparser.config.configcommon import AutosubmitConfig -from autosubmit.history.experiment_history import ExperimentHistory -from autosubmit.database.db_common import check_experiment_exists import datetime import sys +from time import sleep from typing import List +from autosubmit.database.db_common import check_experiment_exists +from autosubmit.history.experiment_history import ExperimentHistory +from autosubmitconfigparser.config.basicconfig import BasicConfig +from autosubmitconfigparser.config.configcommon import AutosubmitConfig +from log.log import AutosubmitCritical, Log + + def handle_start_time(start_time): # type: (str) -> None """ Wait until the supplied time. """ diff --git a/autosubmit/helpers/parameters.py b/autosubmit/helpers/parameters.py index 97ddb6235..e52d3d664 100644 --- a/autosubmit/helpers/parameters.py +++ b/autosubmit/helpers/parameters.py @@ -1,6 +1,7 @@ +from collections import defaultdict + import functools import inspect -from collections import defaultdict from typing import Dict PARAMETERS = defaultdict(defaultdict) diff --git a/autosubmit/helpers/utils.py b/autosubmit/helpers/utils.py index fca94a126..5436643c7 100644 --- a/autosubmit/helpers/utils.py +++ b/autosubmit/helpers/utils.py @@ -1,9 +1,8 @@ import os import pwd -from log.log import Log, AutosubmitCritical -from autosubmitconfigparser.config.basicconfig import BasicConfig -from typing import Tuple +from log.log import AutosubmitCritical + def check_experiment_ownership(expid, basic_config, raise_error=False, logger=None): # [A-Za-z09]+ variable is not needed, LOG is global thus it will be read if available diff --git a/autosubmit/history/data_classes/job_data.py b/autosubmit/history/data_classes/job_data.py index bf1d394e5..33b3b8857 100644 --- a/autosubmit/history/data_classes/job_data.py +++ b/autosubmit/history/data_classes/job_data.py @@ -16,11 +16,14 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . +from json import dumps, loads + import collections -import autosubmit.history.utils as HUtils -import autosubmit.history.database_managers.database_models as Models from datetime import datetime, timedelta -from json import dumps , loads + +import autosubmit.history.database_managers.database_models as Models +import autosubmit.history.utils as HUtils + class JobData(object): """ diff --git a/autosubmit/history/database_managers/database_manager.py b/autosubmit/history/database_managers/database_manager.py index ccc702f5c..6e882660c 100644 --- a/autosubmit/history/database_managers/database_manager.py +++ b/autosubmit/history/database_managers/database_manager.py @@ -16,12 +16,13 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . -import sqlite3 import os -import autosubmit.history.utils as HUtils -import autosubmit.history.database_managers.database_models as Models +import sqlite3 from abc import ABCMeta +import autosubmit.history.database_managers.database_models as Models +import autosubmit.history.utils as HUtils + DEFAULT_JOBDATA_DIR = os.path.join('/esarchive', 'autosubmit', 'as_metadata', 'data') DEFAULT_HISTORICAL_LOGS_DIR = os.path.join('/esarchive', 'autosubmit', 'as_metadata', 'logs') DEFAULT_LOCAL_ROOT_DIR = os.path.join('/esarchive', 'autosubmit') diff --git a/autosubmit/history/database_managers/experiment_history_db_manager.py b/autosubmit/history/database_managers/experiment_history_db_manager.py index 9e5662af6..78e9e23b2 100644 --- a/autosubmit/history/database_managers/experiment_history_db_manager.py +++ b/autosubmit/history/database_managers/experiment_history_db_manager.py @@ -17,10 +17,11 @@ # along with Autosubmit. If not, see . import os import textwrap + import autosubmit.history.utils as HUtils -from . import database_models as Models -from autosubmit.history.data_classes.job_data import JobData from autosubmit.history.data_classes.experiment_run import ExperimentRun +from autosubmit.history.data_classes.job_data import JobData +from . import database_models as Models from .database_manager import DatabaseManager, DEFAULT_JOBDATA_DIR CURRENT_DB_VERSION = 18 diff --git a/autosubmit/history/database_managers/experiment_status_db_manager.py b/autosubmit/history/database_managers/experiment_status_db_manager.py index ed2f9f7b8..4ca93f885 100644 --- a/autosubmit/history/database_managers/experiment_status_db_manager.py +++ b/autosubmit/history/database_managers/experiment_status_db_manager.py @@ -20,10 +20,11 @@ import os import textwrap import time -from .database_manager import DatabaseManager, DEFAULT_LOCAL_ROOT_DIR -from autosubmitconfigparser.config.basicconfig import BasicConfig + import autosubmit.history.utils as HUtils +from autosubmitconfigparser.config.basicconfig import BasicConfig from . import database_models as Models +from .database_manager import DatabaseManager, DEFAULT_LOCAL_ROOT_DIR BasicConfig.read() diff --git a/autosubmit/history/experiment_history.py b/autosubmit/history/experiment_history.py index 7f6a49648..fba54b9e6 100644 --- a/autosubmit/history/experiment_history.py +++ b/autosubmit/history/experiment_history.py @@ -16,17 +16,19 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . import traceback +from time import time, sleep + import autosubmit.history.database_managers.database_models as Models import autosubmit.history.utils as HUtils -from time import time, sleep -from .database_managers.experiment_history_db_manager import ExperimentHistoryDbManager -from .database_managers.database_manager import DEFAULT_JOBDATA_DIR, DEFAULT_HISTORICAL_LOGS_DIR -from .strategies import PlatformInformationHandler, SingleAssociationStrategy, StraightWrapperAssociationStrategy, TwoDimWrapperDistributionStrategy, GeneralizedWrapperDistributionStrategy -from .data_classes.job_data import JobData +from autosubmitconfigparser.config.basicconfig import BasicConfig from .data_classes.experiment_run import ExperimentRun -from .platform_monitor.slurm_monitor import SlurmMonitor +from .data_classes.job_data import JobData +from .database_managers.database_manager import DEFAULT_JOBDATA_DIR, DEFAULT_HISTORICAL_LOGS_DIR +from .database_managers.experiment_history_db_manager import ExperimentHistoryDbManager from .internal_logging import Logging -from autosubmitconfigparser.config.basicconfig import BasicConfig +from .platform_monitor.slurm_monitor import SlurmMonitor +from .strategies import PlatformInformationHandler, SingleAssociationStrategy, StraightWrapperAssociationStrategy, \ + TwoDimWrapperDistributionStrategy, GeneralizedWrapperDistributionStrategy SECONDS_WAIT_PLATFORM = 60 diff --git a/autosubmit/history/experiment_status.py b/autosubmit/history/experiment_status.py index 9d4c7deb0..4d17262eb 100644 --- a/autosubmit/history/experiment_status.py +++ b/autosubmit/history/experiment_status.py @@ -17,10 +17,12 @@ # along with Autosubmit. If not, see . import traceback -from .database_managers.experiment_status_db_manager import ExperimentStatusDbManager + +from autosubmitconfigparser.config.basicconfig import BasicConfig from .database_managers.database_manager import DEFAULT_LOCAL_ROOT_DIR, DEFAULT_HISTORICAL_LOGS_DIR +from .database_managers.experiment_status_db_manager import ExperimentStatusDbManager from .internal_logging import Logging -from autosubmitconfigparser.config.basicconfig import BasicConfig + class ExperimentStatus: """ Represents the Experiment Status Mechanism that keeps track of currently active experiments """ diff --git a/autosubmit/history/internal_logging.py b/autosubmit/history/internal_logging.py index f9b667814..39c674097 100644 --- a/autosubmit/history/internal_logging.py +++ b/autosubmit/history/internal_logging.py @@ -16,9 +16,11 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . import os + from . import utils as HUtils from .database_managers.database_manager import DEFAULT_HISTORICAL_LOGS_DIR + class Logging: def __init__(self, expid, historiclog_dir_path=DEFAULT_HISTORICAL_LOGS_DIR): self.expid = expid diff --git a/autosubmit/history/platform_monitor/platform_utils.py b/autosubmit/history/platform_monitor/platform_utils.py index 433e654ec..fc00541dc 100644 --- a/autosubmit/history/platform_monitor/platform_utils.py +++ b/autosubmit/history/platform_monitor/platform_utils.py @@ -16,9 +16,8 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . -import os -from time import mktime from datetime import datetime +from time import mktime SLURM_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S" diff --git a/autosubmit/history/strategies.py b/autosubmit/history/strategies.py index c18ae96f5..d86cd2aa0 100644 --- a/autosubmit/history/strategies.py +++ b/autosubmit/history/strategies.py @@ -16,11 +16,13 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . +import traceback from abc import ABCMeta, abstractmethod + import autosubmit.history.database_managers.database_models as Models -import traceback +from .database_managers.database_manager import DEFAULT_HISTORICAL_LOGS_DIR from .internal_logging import Logging -from .database_managers.database_manager import DEFAULT_LOCAL_ROOT_DIR, DEFAULT_HISTORICAL_LOGS_DIR + class PlatformInformationHandler: def __init__(self, strategy): diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 28ed5a092..d021611c9 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -21,33 +21,33 @@ Main module for Autosubmit. Only contains an interface class to all functionality implemented on Autosubmit """ -import os -import re -import time -import json -import datetime -import textwrap from collections import OrderedDict -import copy +import copy +import datetime +import json import locale +import os +import re +import textwrap +import time +from bscearth.utils.date import date2str, parse_date, previous_day, chunk_end_date, chunk_start_date, Log, subs_dates +from functools import reduce +from threading import Thread +from time import sleep +from typing import List, Union -from autosubmitconfigparser.config.configcommon import AutosubmitConfig -from autosubmit.job.job_common import Status, Type, increase_wallclock_by_chunk +from autosubmit.helpers.parameters import autosubmit_parameter, autosubmit_parameters +from autosubmit.history.experiment_history import ExperimentHistory from autosubmit.job.job_common import StatisticsSnippetBash, StatisticsSnippetPython from autosubmit.job.job_common import StatisticsSnippetR, StatisticsSnippetEmpty +from autosubmit.job.job_common import Status, Type, increase_wallclock_by_chunk from autosubmit.job.job_utils import get_job_package_code -from autosubmitconfigparser.config.basicconfig import BasicConfig -from autosubmit.history.experiment_history import ExperimentHistory -from bscearth.utils.date import date2str, parse_date, previous_day, chunk_end_date, chunk_start_date, Log, subs_dates -from time import sleep -from threading import Thread from autosubmit.platforms.paramiko_submitter import ParamikoSubmitter -from log.log import Log, AutosubmitCritical, AutosubmitError -from typing import List, Union -from functools import reduce +from autosubmitconfigparser.config.basicconfig import BasicConfig +from autosubmitconfigparser.config.configcommon import AutosubmitConfig from autosubmitconfigparser.config.yamlparser import YAMLParserFactory -from autosubmit.helpers.parameters import autosubmit_parameter, autosubmit_parameters +from log.log import Log, AutosubmitCritical, AutosubmitError Log.get_logger("Autosubmit") @@ -150,7 +150,8 @@ class Job(object): self._platform = None self._queue = None self._partition = None - + self.x11 = None + self.x11_options = None self.retry_delay = "0" self.platform_name = None # type: str #: (str): Type of the job, as given on job configuration file. (job: TASKTYPE) @@ -188,7 +189,6 @@ class Job(object): self.file = None self.additional_files = [] self.executable = None - self.x11 = False self._local_logs = ('', '') self._remote_logs = ('', '') self.script_name = self.name + ".cmd" @@ -664,6 +664,23 @@ class Job(object): """ self.fail_count += 1 + @property + @autosubmit_parameter(name='x11') + def x11(self): + """Whether to use X11 forwarding""" + return self._x11 + @x11.setter + def x11(self, value): + self._x11 = value + + @property + @autosubmit_parameter(name='x11_options') + def x11_options(self): + """Allows to set salloc parameters for x11""" + return self._x11_options + @x11_options.setter + def x11_options(self, value): + self._x11_options = value # Maybe should be renamed to the plural? def add_parent(self, *parents): """ @@ -1257,6 +1274,7 @@ class Job(object): return parameters def update_platform_associated_parameters(self,as_conf, parameters, job_platform, chunk): + self.x11_options = str(as_conf.jobs_data[self.section].get("X11_OPTIONS", as_conf.platforms_data.get(job_platform.name,{}).get("X11_OPTIONS",""))) self.executable = str(as_conf.jobs_data[self.section].get("EXECUTABLE", as_conf.platforms_data.get(job_platform.name,{}).get("EXECUTABLE",""))) self.total_jobs = int(as_conf.jobs_data[self.section].get("TOTALJOBS", job_platform.total_jobs)) self.max_waiting_jobs = int(as_conf.jobs_data[self.section].get("MAXWAITINGJOBS", job_platform.max_waiting_jobs)) diff --git a/autosubmit/job/job_common.py b/autosubmit/job/job_common.py index 4d05d985c..6b818fd03 100644 --- a/autosubmit/job/job_common.py +++ b/autosubmit/job/job_common.py @@ -14,10 +14,10 @@ # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. +import datetime # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . import textwrap -import datetime class Status: diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index e2f673563..e9c3ef491 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -17,11 +17,13 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . -from autosubmit.job.job import Job from bscearth.utils.date import date2str + +from autosubmit.job.job import Job from autosubmit.job.job_common import Status, Type -from log.log import Log, AutosubmitError, AutosubmitCritical -from collections.abc import Iterable +from log.log import AutosubmitCritical + + class DicJobs: """ Class to create jobs from conf file and to find jobs by start date, member and chunk diff --git a/autosubmit/job/job_grouping.py b/autosubmit/job/job_grouping.py index bcddaf038..ad9a09bf0 100644 --- a/autosubmit/job/job_grouping.py +++ b/autosubmit/job/job_grouping.py @@ -17,9 +17,11 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . -from autosubmit.job.job_common import Status -from bscearth.utils.date import date2str import copy +from bscearth.utils.date import date2str + +from autosubmit.job.job_common import Status + class JobGrouping(object): diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index edd67d1c5..300f13c12 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -14,36 +14,36 @@ # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. -# You should have received a copy of the GNU General Public License -# along with Autosubmit. If not, see . -import collections import copy -import re +import datetime +import math import os import pickle +# You should have received a copy of the GNU General Public License +# along with Autosubmit. If not, see . +import re import traceback -import math -import copy -from time import localtime, strftime, mktime +from bscearth.utils.date import date2str, parse_date +from networkx import DiGraph from shutil import move +from threading import Thread +from time import localtime, strftime, mktime +from typing import List, Dict + +import autosubmit.database.db_structure as DbStructure +from autosubmit.helpers.data_transfer import JobRow from autosubmit.job.job import Job -from autosubmit.job.job_package_persistence import JobPackagePersistence +from autosubmit.job.job_common import Status, bcolors from autosubmit.job.job_dict import DicJobs +from autosubmit.job.job_package_persistence import JobPackagePersistence from autosubmit.job.job_packages import JobPackageThread from autosubmit.job.job_utils import Dependency -from autosubmit.job.job_common import Status, bcolors -from bscearth.utils.date import date2str, parse_date -import autosubmit.database.db_structure as DbStructure -import datetime -from networkx import DiGraph from autosubmit.job.job_utils import transitive_reduction -from log.log import AutosubmitCritical, AutosubmitError, Log -from threading import Thread from autosubmitconfigparser.config.basicconfig import BasicConfig from autosubmitconfigparser.config.configcommon import AutosubmitConfig -from autosubmit.helpers.data_transfer import JobRow -from typing import List, Dict -import log.fd_show +from log.log import AutosubmitCritical, AutosubmitError, Log + + # Log.get_logger("Log.Autosubmit") diff --git a/autosubmit/job/job_list_persistence.py b/autosubmit/job/job_list_persistence.py index 7554ddad7..abffe5080 100644 --- a/autosubmit/job/job_list_persistence.py +++ b/autosubmit/job/job_list_persistence.py @@ -14,15 +14,14 @@ # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. +import os # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . import pickle from sys import setrecursionlimit -import os - -from log.log import Log from autosubmit.database.db_manager import DbManager +from log.log import Log class JobListPersistence(object): diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 67e833e27..2fce137a7 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -17,17 +17,17 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . import collections -from log.log import Log, AutosubmitCritical, AutosubmitError -from autosubmit.job.job_common import Status, Type +import copy +import operator from bscearth.utils.date import sum_str_hours -from autosubmit.job.job_packages import JobPackageSimple, JobPackageVertical, JobPackageHorizontal, \ - JobPackageSimpleWrapped, JobPackageHorizontalVertical, JobPackageVerticalHorizontal, JobPackageBase -from operator import attrgetter from math import ceil -import operator +from operator import attrgetter from typing import List -import copy +from autosubmit.job.job_common import Status, Type +from autosubmit.job.job_packages import JobPackageSimple, JobPackageVertical, JobPackageHorizontal, \ + JobPackageSimpleWrapped, JobPackageHorizontalVertical, JobPackageVerticalHorizontal, JobPackageBase +from log.log import Log, AutosubmitCritical class JobPackager(object): diff --git a/autosubmit/job/job_utils.py b/autosubmit/job/job_utils.py index 978212273..d7071bb7f 100644 --- a/autosubmit/job/job_utils.py +++ b/autosubmit/job/job_utils.py @@ -19,14 +19,14 @@ import networkx import os - -from networkx.algorithms.dag import is_directed_acyclic_graph from networkx import DiGraph -from networkx import dfs_edges from networkx import NetworkXError +from networkx import dfs_edges +from networkx.algorithms.dag import is_directed_acyclic_graph +from typing import Dict + from autosubmit.job.job_package_persistence import JobPackagePersistence from autosubmitconfigparser.config.basicconfig import BasicConfig -from typing import Dict def transitive_reduction(graph): diff --git a/autosubmit/monitor/diagram.py b/autosubmit/monitor/diagram.py index 786c66e49..b59e3e864 100644 --- a/autosubmit/monitor/diagram.py +++ b/autosubmit/monitor/diagram.py @@ -17,9 +17,9 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . -import traceback -import numpy as np import matplotlib as mtp +import numpy as np +import traceback mtp.use('Agg') import matplotlib.pyplot as plt diff --git a/autosubmit/monitor/monitor.py b/autosubmit/monitor/monitor.py index 8b8bffc55..e1e2c1e70 100644 --- a/autosubmit/monitor/monitor.py +++ b/autosubmit/monitor/monitor.py @@ -14,34 +14,31 @@ # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. -# You should have received a copy of the GNU General Public License -# along with Autosubmit. If not, see . -import datetime -import os -import time -import sys -from os import path from os import chdir from os import listdir +from os import path from os import remove -import py3dotplus as pydotplus import copy - +# You should have received a copy of the GNU General Public License +# along with Autosubmit. If not, see . +import datetime +import os +import py3dotplus as pydotplus import subprocess -import autosubmit.history.utils as HUtils -import autosubmit.helpers.utils as HelperUtils +import sys +import time +from typing import Dict, List -from autosubmit.job.job_common import Status +import autosubmit.helpers.utils as HelperUtils +import autosubmit.history.utils as HUtils from autosubmit.job.job import Job +from autosubmit.job.job_common import Status from autosubmitconfigparser.config.basicconfig import BasicConfig from autosubmitconfigparser.config.configcommon import AutosubmitConfig - -from log.log import Log, AutosubmitCritical from autosubmitconfigparser.config.yamlparser import YAMLParserFactory - +from log.log import Log, AutosubmitCritical from .diagram import create_bar_diagram -from typing import Dict, List GENERAL_STATS_OPTION_MAX_LENGTH = 1000 diff --git a/autosubmit/notifications/mail_notifier.py b/autosubmit/notifications/mail_notifier.py index 45c01c1c9..be8e0dcfd 100644 --- a/autosubmit/notifications/mail_notifier.py +++ b/autosubmit/notifications/mail_notifier.py @@ -17,11 +17,13 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . -import smtplib import email.utils +import smtplib from email.mime.text import MIMEText + from log.log import Log + class MailNotifier: def __init__(self, basic_config): self.config = basic_config diff --git a/autosubmit/platforms/ecplatform.py b/autosubmit/platforms/ecplatform.py index 99c44aa7a..517eed2a1 100644 --- a/autosubmit/platforms/ecplatform.py +++ b/autosubmit/platforms/ecplatform.py @@ -14,19 +14,21 @@ # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. +import locale # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . -import locale import os import subprocess -from autosubmit.platforms.paramiko_platform import ParamikoPlatform, ParamikoPlatformException -from log.log import Log,AutosubmitError -from autosubmit.platforms.headers.ec_header import EcHeader +from time import sleep + from autosubmit.platforms.headers.ec_cca_header import EcCcaHeader +from autosubmit.platforms.headers.ec_header import EcHeader from autosubmit.platforms.headers.slurm_header import SlurmHeader +from autosubmit.platforms.paramiko_platform import ParamikoPlatform, ParamikoPlatformException from autosubmit.platforms.wrappers.wrapper_factory import EcWrapperFactory -from time import sleep -import locale +from log.log import Log, AutosubmitError + + class EcPlatform(ParamikoPlatform): """ Class to manage queues with ecaccess diff --git a/autosubmit/platforms/locplatform.py b/autosubmit/platforms/locplatform.py index 2950a7176..543e8b631 100644 --- a/autosubmit/platforms/locplatform.py +++ b/autosubmit/platforms/locplatform.py @@ -18,16 +18,15 @@ # along with Autosubmit. If not, see . import locale import os -from xml.dom.minidom import parseString import subprocess +from time import sleep +from xml.dom.minidom import parseString - -from autosubmit.platforms.paramiko_platform import ParamikoPlatform from autosubmit.platforms.headers.local_header import LocalHeader - +from autosubmit.platforms.paramiko_platform import ParamikoPlatform from autosubmitconfigparser.config.basicconfig import BasicConfig -from time import sleep -from log.log import Log, AutosubmitError, AutosubmitCritical +from log.log import Log, AutosubmitError + class LocalPlatform(ParamikoPlatform): """ diff --git a/autosubmit/platforms/lsfplatform.py b/autosubmit/platforms/lsfplatform.py index a03ec5dee..c401bdb32 100644 --- a/autosubmit/platforms/lsfplatform.py +++ b/autosubmit/platforms/lsfplatform.py @@ -19,8 +19,8 @@ import os -from autosubmit.platforms.paramiko_platform import ParamikoPlatform from autosubmit.platforms.headers.lsf_header import LsfHeader +from autosubmit.platforms.paramiko_platform import ParamikoPlatform from autosubmit.platforms.wrappers.wrapper_factory import LSFWrapperFactory diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index cfca6e3db..8808b2048 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -1,25 +1,23 @@ +import Xlib.support.connect as xlib_connect +import datetime import locale -from binascii import hexlify -from contextlib import suppress -from time import sleep -import sys -import socket import os import paramiko -import datetime -import time -import select +import random import re +import select +import socket +import sys +from bscearth.utils.date import date2str from datetime import timedelta -import random +from paramiko.ssh_exception import (SSHException) +from threading import Thread +from time import sleep + from autosubmit.job.job_common import Status from autosubmit.job.job_common import Type from autosubmit.platforms.platform import Platform -from bscearth.utils.date import date2str from log.log import AutosubmitError, AutosubmitCritical, Log -from paramiko.ssh_exception import (SSHException) -import Xlib.support.connect as xlib_connect -from threading import Thread def threaded(fn): @@ -898,7 +896,7 @@ class ParamikoPlatform(Platform): chan.settimeout(timeout) if x11 == "true": command = command + " ; sleep infinity" - chan.exec_command(command) + chan.exec_command(command,x11) chan_fileno = chan.fileno() self.poller.register(chan_fileno, select.POLLIN) self.x11_status_checker(chan, chan_fileno) diff --git a/autosubmit/platforms/paramiko_submitter.py b/autosubmit/platforms/paramiko_submitter.py index 534a64a42..8589f6813 100644 --- a/autosubmit/platforms/paramiko_submitter.py +++ b/autosubmit/platforms/paramiko_submitter.py @@ -18,22 +18,23 @@ # along with Autosubmit. If not, see . -import os from collections import defaultdict -from log.log import Log,AutosubmitCritical,AutosubmitError -from autosubmitconfigparser.config.basicconfig import BasicConfig -from autosubmitconfigparser.config.configcommon import AutosubmitConfig -from .submitter import Submitter -from autosubmit.platforms.psplatform import PsPlatform +import os + +from autosubmit.platforms.ecplatform import EcPlatform +from autosubmit.platforms.locplatform import LocalPlatform from autosubmit.platforms.lsfplatform import LsfPlatform +from autosubmit.platforms.paramiko_platform import ParamikoPlatformException from autosubmit.platforms.pbsplatform import PBSPlatform +from autosubmit.platforms.pjmplatform import PJMPlatform +from autosubmit.platforms.psplatform import PsPlatform from autosubmit.platforms.sgeplatform import SgePlatform -from autosubmit.platforms.ecplatform import EcPlatform from autosubmit.platforms.slurmplatform import SlurmPlatform -from autosubmit.platforms.pjmplatform import PJMPlatform -from autosubmit.platforms.locplatform import LocalPlatform -from autosubmit.platforms.paramiko_platform import ParamikoPlatformException +from autosubmitconfigparser.config.basicconfig import BasicConfig +from autosubmitconfigparser.config.configcommon import AutosubmitConfig +from log.log import Log, AutosubmitError +from .submitter import Submitter class ParamikoSubmitter(Submitter): diff --git a/autosubmit/platforms/pbsplatform.py b/autosubmit/platforms/pbsplatform.py index 132b8715c..9960e4246 100644 --- a/autosubmit/platforms/pbsplatform.py +++ b/autosubmit/platforms/pbsplatform.py @@ -19,12 +19,11 @@ import os -from autosubmit.platforms.paramiko_platform import ParamikoPlatform -from log.log import Log, AutosubmitCritical - from autosubmit.platforms.headers.pbs10_header import Pbs10Header from autosubmit.platforms.headers.pbs11_header import Pbs11Header from autosubmit.platforms.headers.pbs12_header import Pbs12Header +from autosubmit.platforms.paramiko_platform import ParamikoPlatform +from log.log import Log, AutosubmitCritical class PBSPlatform(ParamikoPlatform): diff --git a/autosubmit/platforms/pjmplatform.py b/autosubmit/platforms/pjmplatform.py index 3b88cbe31..7acca0fe7 100644 --- a/autosubmit/platforms/pjmplatform.py +++ b/autosubmit/platforms/pjmplatform.py @@ -23,11 +23,12 @@ from time import sleep from typing import List, Union from autosubmit.job.job_common import Status -from autosubmit.platforms.paramiko_platform import ParamikoPlatform from autosubmit.platforms.headers.pjm_header import PJMHeader +from autosubmit.platforms.paramiko_platform import ParamikoPlatform from autosubmit.platforms.wrappers.wrapper_factory import PJMWrapperFactory from log.log import AutosubmitCritical, AutosubmitError, Log + class PJMPlatform(ParamikoPlatform): """ Class to manage jobs to host using PJM scheduler diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 1f23cc6fc..4f34427b0 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -61,6 +61,7 @@ class Platform(object): self._submit_hold_cmd = None self._submit_command_name = None self._submit_cmd = None + self._submit_cmd_x11 = None self._checkhost_cmd = None self.cancel_cmd = None diff --git a/autosubmit/platforms/psplatform.py b/autosubmit/platforms/psplatform.py index 2a5fe05d5..d68ba3216 100644 --- a/autosubmit/platforms/psplatform.py +++ b/autosubmit/platforms/psplatform.py @@ -20,8 +20,8 @@ import os from xml.dom.minidom import parseString -from autosubmit.platforms.paramiko_platform import ParamikoPlatform from autosubmit.platforms.headers.ps_header import PsHeader +from autosubmit.platforms.paramiko_platform import ParamikoPlatform class PsPlatform(ParamikoPlatform): diff --git a/autosubmit/platforms/sgeplatform.py b/autosubmit/platforms/sgeplatform.py index 58671cd98..662d58cb2 100644 --- a/autosubmit/platforms/sgeplatform.py +++ b/autosubmit/platforms/sgeplatform.py @@ -19,11 +19,10 @@ import os import subprocess - from xml.dom.minidom import parseString -from autosubmit.platforms.paramiko_platform import ParamikoPlatform from autosubmit.platforms.headers.sge_header import SgeHeader +from autosubmit.platforms.paramiko_platform import ParamikoPlatform class SgePlatform(ParamikoPlatform): diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index e867ff062..f6a353736 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -18,18 +18,16 @@ # along with Autosubmit. If not, see . import locale import os -from contextlib import suppress -from time import sleep +from datetime import datetime from time import mktime +from time import sleep from time import time -from datetime import datetime from typing import List, Union - from xml.dom.minidom import parseString from autosubmit.job.job_common import Status, parse_output_number -from autosubmit.platforms.paramiko_platform import ParamikoPlatform from autosubmit.platforms.headers.slurm_header import SlurmHeader +from autosubmit.platforms.paramiko_platform import ParamikoPlatform from autosubmit.platforms.wrappers.wrapper_factory import SlurmWrapperFactory from log.log import AutosubmitCritical, AutosubmitError, Log @@ -51,6 +49,7 @@ class SlurmPlatform(ParamikoPlatform): self._submit_hold_cmd = None self._submit_command_name = None self._submit_cmd = None + self.x11_options = None self._checkhost_cmd = None self.cancel_cmd = None self._header = SlurmHeader() @@ -282,6 +281,7 @@ class SlurmPlatform(ParamikoPlatform): self._checkhost_cmd = "echo 1" self._submit_cmd = 'sbatch -D {1} {1}/'.format( self.host, self.remote_log_dir) + self._submit_cmd_x11 = f'salloc -D {self.remote_log_dir} {self.remote_log_dir}' self._submit_command_name = "sbatch" self._submit_hold_cmd = 'sbatch -H -D {1} {1}/'.format( self.host, self.remote_log_dir) @@ -526,7 +526,7 @@ class SlurmPlatform(ParamikoPlatform): if x11 == "true": if not hold: - return export + self._submit_cmd + job_script + return f'{export.strip("")} {self._submit_cmd_x11} {job_script.strip("")} {job.x11_options.strip("")}' else: return export + self._submit_hold_cmd + job_script else: diff --git a/autosubmit/statistics/jobs_stat.py b/autosubmit/statistics/jobs_stat.py index b2d1de97b..c0338fe18 100644 --- a/autosubmit/statistics/jobs_stat.py +++ b/autosubmit/statistics/jobs_stat.py @@ -1,7 +1,9 @@ #!/bin/env/python from datetime import datetime, timedelta + from .utils import timedelta2hours + class JobStat(object): def __init__(self, name, processors, wallclock, section, date, member, chunk): # type: (str, int, float, str, str, str, str) -> None diff --git a/autosubmit/statistics/statistics.py b/autosubmit/statistics/statistics.py index 504cd9b50..8187a2df3 100644 --- a/autosubmit/statistics/statistics.py +++ b/autosubmit/statistics/statistics.py @@ -1,11 +1,13 @@ #!/bin/env/python from datetime import datetime, timedelta +from typing import List, Union, Dict + from autosubmit.job.job import Job from .jobs_stat import JobStat from .stats_summary import StatsSummary from .utils import timedelta2hours, parse_number_processors -from typing import List, Union, Dict + # from collections import namedtuple _COMPLETED_RETRIAL = 1 diff --git a/autosubmit/statistics/utils.py b/autosubmit/statistics/utils.py index 4dc9acf2f..a0260b875 100644 --- a/autosubmit/statistics/utils.py +++ b/autosubmit/statistics/utils.py @@ -1,12 +1,13 @@ #!/bin/env/python import math -from autosubmit.job.job import Job from datetime import datetime, timedelta -from autosubmit.job.job_common import Status from typing import List, Tuple +from autosubmit.job.job import Job +from autosubmit.job.job_common import Status from log.log import AutosubmitCritical + def filter_by_section(jobs, section): # type: (List[Job], str) -> List[Job] """ Filter jobs by provided section """ -- GitLab From 5ef9ae5d84de8cdcb0cc8cadb52bf4ad1de5f738 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 9 Aug 2023 15:36:36 +0200 Subject: [PATCH 03/20] Trying to making x11 work --- autosubmit/platforms/paramiko_platform.py | 7 ++++--- autosubmit/platforms/slurmplatform.py | 10 ++++++++-- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 8808b2048..4411442f0 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -885,18 +885,19 @@ class ParamikoPlatform(Platform): """ while retries > 0: try: - chan = self.transport.open_session() + chan = self.transport.open_session(timeout=10) if x11 == "true": display = os.getenv('DISPLAY') if display is None or not display: display = "localhost:0" self.local_x11_display = xlib_connect.get_display(display) - chan.request_x11(handler=self.x11_handler) + chan.request_x11(screen_number=self.local_x11_display[:4],single_connection=False,handler=self.x11_handler) else: chan.settimeout(timeout) if x11 == "true": command = command + " ; sleep infinity" - chan.exec_command(command,x11) + #command = command + chan.exec_command(command) chan_fileno = chan.fileno() self.poller.register(chan_fileno, select.POLLIN) self.x11_status_checker(chan, chan_fileno) diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index f6a353736..c1a2d5cee 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -281,7 +281,7 @@ class SlurmPlatform(ParamikoPlatform): self._checkhost_cmd = "echo 1" self._submit_cmd = 'sbatch -D {1} {1}/'.format( self.host, self.remote_log_dir) - self._submit_cmd_x11 = f'salloc -D {self.remote_log_dir} {self.remote_log_dir}' + self._submit_cmd_x11 = f'-D {self.remote_log_dir} {self.remote_log_dir}' self._submit_command_name = "sbatch" self._submit_hold_cmd = 'sbatch -H -D {1} {1}/'.format( self.host, self.remote_log_dir) @@ -290,6 +290,12 @@ class SlurmPlatform(ParamikoPlatform): self.get_cmd = "scp" self.mkdir_cmd = "mkdir -p " + self.remote_log_dir + def get_submit_cmd_x11(self, args, script_name): + """ + Returns the submit command for the platform + """ + return f'salloc {args} {self._submit_cmd_x11}/{script_name}' + def hold_job(self, job): try: cmd = "scontrol release {0} ; sleep 2 ; scontrol hold {0} ".format(job.id) @@ -526,7 +532,7 @@ class SlurmPlatform(ParamikoPlatform): if x11 == "true": if not hold: - return f'{export.strip("")} {self._submit_cmd_x11} {job_script.strip("")} {job.x11_options.strip("")}' + return self.get_submit_cmd_x11(job.x11_options.strip(""), job_script.strip("")) else: return export + self._submit_hold_cmd + job_script else: -- GitLab From 7fea4c4a94db0e87d212b19776a919c4a8a1e0df Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 10 Aug 2023 10:33:20 +0200 Subject: [PATCH 04/20] Not working but some improvements --- autosubmit/platforms/paramiko_platform.py | 47 ++++++++++++++--------- 1 file changed, 28 insertions(+), 19 deletions(-) diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 4411442f0..ae3cb4346 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -18,7 +18,7 @@ from autosubmit.job.job_common import Status from autosubmit.job.job_common import Type from autosubmit.platforms.platform import Platform from log.log import AutosubmitError, AutosubmitCritical, Log - +import re def threaded(fn): def wrapper(*args, **kwargs): @@ -832,6 +832,8 @@ class ParamikoPlatform(Platform): @threaded def x11_status_checker(self, session, session_fileno): self.transport.accept() + Log.warning(f'x11_status_checker() {session_fileno} {session}') + while not session.exit_status_ready(): try: if sys.platform != "linux": @@ -852,6 +854,7 @@ class ParamikoPlatform(Platform): try: # forward data between local/remote x11 socket. data = channel.recv(4096) + Log.warning(data) counterpart.sendall(data) except socket.error: channel.close() @@ -860,7 +863,6 @@ class ParamikoPlatform(Platform): except Exception as e: pass - def exec_command(self, command, bufsize=-1, timeout=None, get_pty=False,retries=3, x11=False): """ Execute a command on the SSH server. A new `.Channel` is opened and @@ -885,19 +887,24 @@ class ParamikoPlatform(Platform): """ while retries > 0: try: - chan = self.transport.open_session(timeout=10) if x11 == "true": display = os.getenv('DISPLAY') if display is None or not display: display = "localhost:0" self.local_x11_display = xlib_connect.get_display(display) - chan.request_x11(screen_number=self.local_x11_display[:4],single_connection=False,handler=self.x11_handler) + chan = self.transport.open_session() + chan.request_x11(screen_number=self.local_x11_display[3],single_connection=False,handler=self.x11_handler) else: + chan = self.transport.open_session() chan.settimeout(timeout) if x11 == "true": - command = command + " ; sleep infinity" - #command = command - chan.exec_command(command) + #command = command + " ; sleep infinity 2>/dev/null" + command = command + Log.info(command) + try: + chan.exec_command(command) + except BaseException as e: + raise AutosubmitCritical("Failed to execute command: %s" % e) chan_fileno = chan.fileno() self.poller.register(chan_fileno, select.POLLIN) self.x11_status_checker(chan, chan_fileno) @@ -915,14 +922,7 @@ class ParamikoPlatform(Platform): retries = retries - 1 if retries <= 0: return False , False, False - def exec_command_x11(self, command, bufsize=-1, timeout=None, get_pty=False,retries=3, x11=False): - session = self.transport.open_session() - session.request_x11(handler=self.x11_handler) - session.exec_command(command + " ; sleep infinity") - session_fileno = session.fileno() - self.poller.register(session_fileno, select.POLLIN) - self.x11_status_checker(session, session_fileno) - pass + def send_command(self, command, ignore_log=False, x11 = False): """ Sends given command to HPC @@ -955,7 +955,7 @@ class ParamikoPlatform(Platform): channel.settimeout(timeout) stdin.close() channel.shutdown_write() - stdout_chunks.append(stdout.channel.recv(len(stdout.channel.in_buffer))) + stdout_chunks.append(stdout.channel.recv(len(stdout.channel.in_buffer))) while not channel.closed or channel.recv_ready() or channel.recv_stderr_ready(): # stop if channel was closed prematurely, and there is no data in the buffers. got_chunk = False @@ -972,9 +972,17 @@ class ParamikoPlatform(Platform): stderr.channel.recv_stderr(len(c.in_stderr_buffer))) #stdout_chunks.append(" ") got_chunk = True - if x11 == "true": - got_chunk = True - break + if x11 == "true": + if len(stderr_readlines) > 0: + job_id = re.findall(r'\d+', str(stderr_readlines[0]))[0] + stdout_chunks.append(job_id) + print(job_id) + stderr_readlines = [] + break + Log.info(f'stdout_chunks: {stdout_chunks} stderr_readlines: {stderr_readlines}') + # if x11 == "true": + # got_chunk = True + # break if not got_chunk and stdout.channel.exit_status_ready() and not stderr.channel.recv_stderr_ready() and not stdout.channel.recv_ready(): # indicate that we're not going to read from this channel anymore stdout.channel.shutdown_read() @@ -1014,6 +1022,7 @@ class ParamikoPlatform(Platform): else: pass #Log.debug('Command {0} in {1} successful with out message: {2}', command, self.host, self._ssh_output) + Log.info("reached send_command") return True except AttributeError as e: raise AutosubmitError( -- GitLab From 5da74785f68783399eb0b7da10c04708889085fb Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 29 Aug 2023 13:06:08 +0200 Subject: [PATCH 05/20] x11 ( xclock working) --- autosubmit/platforms/paramiko_platform.py | 48 +++++++++++------------ autosubmit/platforms/platform.py | 3 +- autosubmit/platforms/slurmplatform.py | 8 ++-- 3 files changed, 30 insertions(+), 29 deletions(-) diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index ae3cb4346..c90633ffd 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -825,27 +825,30 @@ class ParamikoPlatform(Platform): self.transport._queue_incoming_channel(channel) def flush_out(self,session): + #flush_out is not working fine + while session.recv_ready(): - sys.stdout.write(session.recv(4096)) + sys.stdout.write(session.recv(4096).decode(locale.getlocale()[1])) while session.recv_stderr_ready(): - sys.stderr.write(session.recv_stderr(4096)) + sys.stderr.write(session.recv_stderr(4096).decode(locale.getlocale()[1])) + @threaded def x11_status_checker(self, session, session_fileno): + poller = None self.transport.accept() - Log.warning(f'x11_status_checker() {session_fileno} {session}') - while not session.exit_status_ready(): try: - if sys.platform != "linux": - self.poller = self.poller.kqueue() - else: - self.poller = self.poller.poll() + if type(self.poller) is not list: + if sys.platform != "linux": + poller = self.poller.kqueue() + else: + poller = self.poller.poll() # accept subsequent x11 connections if any if len(self.transport.server_accepts) > 0: self.transport.accept() - if not self.poller: # this should not happen, as we don't have a timeout. + if not poller: # this should not happen, as we don't have a timeout. break - for fd, event in self.poller: + for fd, event in poller: if fd == session_fileno: self.flush_out(session) # data either on local/remote x11 socket @@ -854,7 +857,6 @@ class ParamikoPlatform(Platform): try: # forward data between local/remote x11 socket. data = channel.recv(4096) - Log.warning(data) counterpart.sendall(data) except socket.error: channel.close() @@ -893,13 +895,13 @@ class ParamikoPlatform(Platform): display = "localhost:0" self.local_x11_display = xlib_connect.get_display(display) chan = self.transport.open_session() - chan.request_x11(screen_number=self.local_x11_display[3],single_connection=False,handler=self.x11_handler) + chan.request_x11(single_connection=False,handler=self.x11_handler) else: chan = self.transport.open_session() chan.settimeout(timeout) if x11 == "true": - #command = command + " ; sleep infinity 2>/dev/null" - command = command + command = f'{command} ; sleep infinity 2>/dev/null' + #command = f'export display {command}' Log.info(command) try: chan.exec_command(command) @@ -956,8 +958,12 @@ class ParamikoPlatform(Platform): stdin.close() channel.shutdown_write() stdout_chunks.append(stdout.channel.recv(len(stdout.channel.in_buffer))) - while not channel.closed or channel.recv_ready() or channel.recv_stderr_ready(): + i = 0 + x11_exit = False + while (not channel.closed or channel.recv_ready() or channel.recv_stderr_ready() ) and not x11_exit: # stop if channel was closed prematurely, and there is no data in the buffers. + i = i+1 + print(i) got_chunk = False readq, _, _ = select.select([stdout.channel], [], [], 2) for c in readq: @@ -975,16 +981,13 @@ class ParamikoPlatform(Platform): if x11 == "true": if len(stderr_readlines) > 0: job_id = re.findall(r'\d+', str(stderr_readlines[0]))[0] - stdout_chunks.append(job_id) + stdout_chunks.append(job_id.encode(lang)) print(job_id) stderr_readlines = [] + x11_exit = True break - Log.info(f'stdout_chunks: {stdout_chunks} stderr_readlines: {stderr_readlines}') - # if x11 == "true": - # got_chunk = True - # break if not got_chunk and stdout.channel.exit_status_ready() and not stderr.channel.recv_stderr_ready() and not stdout.channel.recv_ready(): - # indicate that we're not going to read from this channel anymore + # indicate that we're not going to read from this channel any more stdout.channel.shutdown_read() # close the channel stdout.channel.close() @@ -993,8 +996,6 @@ class ParamikoPlatform(Platform): if not x11: stdout.close() stderr.close() - - self._ssh_output = "" self._ssh_output_err = "" for s in stdout_chunks: @@ -1002,7 +1003,6 @@ class ParamikoPlatform(Platform): self._ssh_output += s.decode(lang) for errorLineCase in stderr_readlines: self._ssh_output_err += errorLineCase.decode(lang) - errorLine = errorLineCase.lower().decode(lang) if "not active" in errorLine: raise AutosubmitError( diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 4f34427b0..0fd63fbe8 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -265,7 +265,8 @@ class Platform(object): save = True if not inspect: job_list.save() - valid_packages_to_submit.append(package) + if package.x11 != "true": + valid_packages_to_submit.append(package) # Log.debug("FD end-submit: {0}".format(log.fd_show.fd_table_status_str(open())) except (IOError, OSError): if package.jobs[0].id != 0: diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index c1a2d5cee..f29636575 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -504,12 +504,12 @@ class SlurmPlatform(ParamikoPlatform): if outputlines.find("failed") != -1: raise AutosubmitCritical( "Submission failed. Command Failed", 7014) - jobs_id = [] - for output in outputlines.splitlines(): - jobs_id.append(int(output.split(' ')[3])) if x11 == "true": - return jobs_id[0] + return int(outputlines.splitlines()[0]) else: + jobs_id = [] + for output in outputlines.splitlines(): + jobs_id.append(int(output.split(' ')[3])) return jobs_id except IndexError: raise AutosubmitCritical( -- GitLab From 2cf37fb3a5967c20e98fb3c483c54a7652677557 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 29 Aug 2023 13:35:18 +0200 Subject: [PATCH 06/20] wxparaver working --- autosubmit/job/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index d021611c9..3fe7cf769 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -1518,7 +1518,7 @@ class Job(object): template_file.close() else: if self.type == Type.BASH: - template = 'xclock' + template = 'sleep 5' elif self.type == Type.PYTHON2: template = 'time.sleep(5)' + "\n" elif self.type == Type.PYTHON3 or self.type == Type.PYTHON: -- GitLab From 68a8da53692dd6e45a36c4fc0c7d974b08325296 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 29 Aug 2023 14:06:50 +0200 Subject: [PATCH 07/20] docs --- docs/source/userguide/configure/index.rst | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/docs/source/userguide/configure/index.rst b/docs/source/userguide/configure/index.rst index b0a000bc0..0b12f29ef 100644 --- a/docs/source/userguide/configure/index.rst +++ b/docs/source/userguide/configure/index.rst @@ -111,7 +111,6 @@ To do this use: * NODES: nodes number to be submitted to the HPC. If not specified, the directive is not added. - * HYPERTHREADING: Enables Hyper-threading, this will double the max amount of threads. defaults to false. ( Not available on slurm platforms ) * QUEUE: queue to add the job to. If not specified, uses PLATFORM default. @@ -147,6 +146,10 @@ There are also other, less used features that you can use: * QUEUE: queue to add the job to. If not specified, uses PLATFORM default. +* X11: Allows to run a job with X11 forwarding using salloc. + +* X11_OPTIONS: Allows to set salloc ( X11 ) options. EX: "-n 4 --qos=debug --partition=debug --x11=all --time=00:30:00" + How to configure email notifications ------------------------------------ -- GitLab From 1fef7959b56a35e3cb7e7ebc016ceb513d3a9782 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 3 Aug 2023 14:22:09 +0200 Subject: [PATCH 08/20] testing xclock t --- autosubmit/job/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 6dc85a87e..6c5ddeaf9 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -1525,7 +1525,7 @@ class Job(object): template_file.close() else: if self.type == Type.BASH: - template = 'sleep 5' + template = 'xclock' elif self.type == Type.PYTHON2: template = 'time.sleep(5)' + "\n" elif self.type == Type.PYTHON3 or self.type == Type.PYTHON: -- GitLab From fd05a72f34e00b7a1a23a583558d8b9ed5ab2927 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Fri, 4 Aug 2023 10:46:18 +0200 Subject: [PATCH 09/20] Pycharm fixing imports --- autosubmit/database/db_common.py | 7 +++-- autosubmit/database/db_manager.py | 2 +- autosubmit/database/db_structure.py | 7 +++-- autosubmit/experiment/experiment_common.py | 4 ++- autosubmit/experiment/statistics.py | 3 +- autosubmit/git/autosubmit_git.py | 13 +++++---- autosubmit/helpers/autosubmit_helper.py | 14 +++++---- autosubmit/helpers/parameters.py | 3 +- autosubmit/helpers/utils.py | 5 ++-- autosubmit/history/data_classes/job_data.py | 9 ++++-- .../database_managers/database_manager.py | 7 +++-- .../experiment_history_db_manager.py | 5 ++-- .../experiment_status_db_manager.py | 5 ++-- autosubmit/history/experiment_history.py | 16 +++++----- autosubmit/history/experiment_status.py | 6 ++-- autosubmit/history/internal_logging.py | 2 ++ .../platform_monitor/platform_utils.py | 3 +- autosubmit/history/strategies.py | 6 ++-- autosubmit/job/job.py | 22 ++++++++++++-- autosubmit/job/job_common.py | 2 ++ autosubmit/job/job_dict.py | 8 +++-- autosubmit/job/job_grouping.py | 6 ++-- autosubmit/job/job_list.py | 3 +- autosubmit/job/job_list_persistence.py | 5 ++-- autosubmit/job/job_packager.py | 14 ++++----- autosubmit/job/job_utils.py | 8 ++--- autosubmit/monitor/monitor.py | 29 +++++++++---------- autosubmit/notifications/mail_notifier.py | 4 ++- autosubmit/platforms/ecplatform.py | 14 +++++---- autosubmit/platforms/locplatform.py | 11 ++++--- autosubmit/platforms/lsfplatform.py | 2 +- autosubmit/platforms/paramiko_platform.py | 26 ++++++++--------- autosubmit/platforms/paramiko_submitter.py | 21 +++++++------- autosubmit/platforms/pbsplatform.py | 5 ++-- autosubmit/platforms/pjmplatform.py | 3 +- autosubmit/platforms/platform.py | 1 + autosubmit/platforms/psplatform.py | 2 +- autosubmit/platforms/sgeplatform.py | 3 +- autosubmit/platforms/slurmplatform.py | 4 ++- autosubmit/statistics/jobs_stat.py | 2 ++ autosubmit/statistics/utils.py | 5 ++-- 41 files changed, 184 insertions(+), 133 deletions(-) diff --git a/autosubmit/database/db_common.py b/autosubmit/database/db_common.py index 626cfa1e9..bdc381b89 100644 --- a/autosubmit/database/db_common.py +++ b/autosubmit/database/db_common.py @@ -20,11 +20,12 @@ """ Module containing functions to manage autosubmit's database. """ +import multiprocessing import os import sqlite3 -import multiprocessing -import autosubmit -from log.log import Log, AutosubmitCritical, AutosubmitError + +from log.log import Log, AutosubmitCritical + Log.get_logger("Autosubmit") from autosubmitconfigparser.config.basicconfig import BasicConfig diff --git a/autosubmit/database/db_manager.py b/autosubmit/database/db_manager.py index 6e7376b9b..51639f2b7 100644 --- a/autosubmit/database/db_manager.py +++ b/autosubmit/database/db_manager.py @@ -17,8 +17,8 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . -import sqlite3 import os +import sqlite3 class DbManager(object): diff --git a/autosubmit/database/db_structure.py b/autosubmit/database/db_structure.py index b42854359..f6681b5e6 100644 --- a/autosubmit/database/db_structure.py +++ b/autosubmit/database/db_structure.py @@ -18,13 +18,14 @@ # along with Autosubmit. If not, see . import os - +import sqlite3 import textwrap import traceback -import sqlite3 - from typing import Dict, List + from log.log import Log + + # from networkx import DiGraph # DB_FILE_AS_TIMES = "/esarchive/autosubmit/as_times.db" diff --git a/autosubmit/experiment/experiment_common.py b/autosubmit/experiment/experiment_common.py index 17eba23fc..2ee9c3929 100644 --- a/autosubmit/experiment/experiment_common.py +++ b/autosubmit/experiment/experiment_common.py @@ -21,8 +21,10 @@ Module containing functions to manage autosubmit's experiments. """ import string + import autosubmit.database.db_common as db_common -from log.log import Log,AutosubmitCritical +from log.log import Log, AutosubmitCritical + Log.get_logger("Autosubmit") def new_experiment(description, version, test=False, operational=False): diff --git a/autosubmit/experiment/statistics.py b/autosubmit/experiment/statistics.py index 793210923..1ca50fdbd 100644 --- a/autosubmit/experiment/statistics.py +++ b/autosubmit/experiment/statistics.py @@ -18,9 +18,10 @@ # along with Autosubmit. If not, see . import datetime + from autosubmit.job.job import Job from autosubmit.monitor.utils import FixedSizeList -from log.log import Log, AutosubmitError, AutosubmitCritical +from log.log import Log def timedelta2hours(deltatime): diff --git a/autosubmit/git/autosubmit_git.py b/autosubmit/git/autosubmit_git.py index 976b02f9f..42e795e9b 100644 --- a/autosubmit/git/autosubmit_git.py +++ b/autosubmit/git/autosubmit_git.py @@ -14,18 +14,21 @@ # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. +from os import path + # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . import locale -from os import path import os -from shutil import rmtree -import subprocess import shutil +import subprocess +from shutil import rmtree +from time import time + # from autosubmit import Autosubmit from autosubmitconfigparser.config.basicconfig import BasicConfig -from time import time -from log.log import Log, AutosubmitCritical, AutosubmitError +from log.log import Log, AutosubmitCritical + Log.get_logger("Autosubmit") diff --git a/autosubmit/helpers/autosubmit_helper.py b/autosubmit/helpers/autosubmit_helper.py index 2aef35c49..cf04f79bc 100644 --- a/autosubmit/helpers/autosubmit_helper.py +++ b/autosubmit/helpers/autosubmit_helper.py @@ -17,16 +17,18 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . -from log.log import AutosubmitCritical, Log -from time import sleep -from autosubmitconfigparser.config.basicconfig import BasicConfig -from autosubmitconfigparser.config.configcommon import AutosubmitConfig -from autosubmit.history.experiment_history import ExperimentHistory -from autosubmit.database.db_common import check_experiment_exists import datetime import sys +from time import sleep from typing import List +from autosubmit.database.db_common import check_experiment_exists +from autosubmit.history.experiment_history import ExperimentHistory +from autosubmitconfigparser.config.basicconfig import BasicConfig +from autosubmitconfigparser.config.configcommon import AutosubmitConfig +from log.log import AutosubmitCritical, Log + + def handle_start_time(start_time): # type: (str) -> None """ Wait until the supplied time. """ diff --git a/autosubmit/helpers/parameters.py b/autosubmit/helpers/parameters.py index 97ddb6235..e52d3d664 100644 --- a/autosubmit/helpers/parameters.py +++ b/autosubmit/helpers/parameters.py @@ -1,6 +1,7 @@ +from collections import defaultdict + import functools import inspect -from collections import defaultdict from typing import Dict PARAMETERS = defaultdict(defaultdict) diff --git a/autosubmit/helpers/utils.py b/autosubmit/helpers/utils.py index fca94a126..5436643c7 100644 --- a/autosubmit/helpers/utils.py +++ b/autosubmit/helpers/utils.py @@ -1,9 +1,8 @@ import os import pwd -from log.log import Log, AutosubmitCritical -from autosubmitconfigparser.config.basicconfig import BasicConfig -from typing import Tuple +from log.log import AutosubmitCritical + def check_experiment_ownership(expid, basic_config, raise_error=False, logger=None): # [A-Za-z09]+ variable is not needed, LOG is global thus it will be read if available diff --git a/autosubmit/history/data_classes/job_data.py b/autosubmit/history/data_classes/job_data.py index bf1d394e5..33b3b8857 100644 --- a/autosubmit/history/data_classes/job_data.py +++ b/autosubmit/history/data_classes/job_data.py @@ -16,11 +16,14 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . +from json import dumps, loads + import collections -import autosubmit.history.utils as HUtils -import autosubmit.history.database_managers.database_models as Models from datetime import datetime, timedelta -from json import dumps , loads + +import autosubmit.history.database_managers.database_models as Models +import autosubmit.history.utils as HUtils + class JobData(object): """ diff --git a/autosubmit/history/database_managers/database_manager.py b/autosubmit/history/database_managers/database_manager.py index ccc702f5c..6e882660c 100644 --- a/autosubmit/history/database_managers/database_manager.py +++ b/autosubmit/history/database_managers/database_manager.py @@ -16,12 +16,13 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . -import sqlite3 import os -import autosubmit.history.utils as HUtils -import autosubmit.history.database_managers.database_models as Models +import sqlite3 from abc import ABCMeta +import autosubmit.history.database_managers.database_models as Models +import autosubmit.history.utils as HUtils + DEFAULT_JOBDATA_DIR = os.path.join('/esarchive', 'autosubmit', 'as_metadata', 'data') DEFAULT_HISTORICAL_LOGS_DIR = os.path.join('/esarchive', 'autosubmit', 'as_metadata', 'logs') DEFAULT_LOCAL_ROOT_DIR = os.path.join('/esarchive', 'autosubmit') diff --git a/autosubmit/history/database_managers/experiment_history_db_manager.py b/autosubmit/history/database_managers/experiment_history_db_manager.py index 9e5662af6..78e9e23b2 100644 --- a/autosubmit/history/database_managers/experiment_history_db_manager.py +++ b/autosubmit/history/database_managers/experiment_history_db_manager.py @@ -17,10 +17,11 @@ # along with Autosubmit. If not, see . import os import textwrap + import autosubmit.history.utils as HUtils -from . import database_models as Models -from autosubmit.history.data_classes.job_data import JobData from autosubmit.history.data_classes.experiment_run import ExperimentRun +from autosubmit.history.data_classes.job_data import JobData +from . import database_models as Models from .database_manager import DatabaseManager, DEFAULT_JOBDATA_DIR CURRENT_DB_VERSION = 18 diff --git a/autosubmit/history/database_managers/experiment_status_db_manager.py b/autosubmit/history/database_managers/experiment_status_db_manager.py index ed2f9f7b8..4ca93f885 100644 --- a/autosubmit/history/database_managers/experiment_status_db_manager.py +++ b/autosubmit/history/database_managers/experiment_status_db_manager.py @@ -20,10 +20,11 @@ import os import textwrap import time -from .database_manager import DatabaseManager, DEFAULT_LOCAL_ROOT_DIR -from autosubmitconfigparser.config.basicconfig import BasicConfig + import autosubmit.history.utils as HUtils +from autosubmitconfigparser.config.basicconfig import BasicConfig from . import database_models as Models +from .database_manager import DatabaseManager, DEFAULT_LOCAL_ROOT_DIR BasicConfig.read() diff --git a/autosubmit/history/experiment_history.py b/autosubmit/history/experiment_history.py index 7f6a49648..fba54b9e6 100644 --- a/autosubmit/history/experiment_history.py +++ b/autosubmit/history/experiment_history.py @@ -16,17 +16,19 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . import traceback +from time import time, sleep + import autosubmit.history.database_managers.database_models as Models import autosubmit.history.utils as HUtils -from time import time, sleep -from .database_managers.experiment_history_db_manager import ExperimentHistoryDbManager -from .database_managers.database_manager import DEFAULT_JOBDATA_DIR, DEFAULT_HISTORICAL_LOGS_DIR -from .strategies import PlatformInformationHandler, SingleAssociationStrategy, StraightWrapperAssociationStrategy, TwoDimWrapperDistributionStrategy, GeneralizedWrapperDistributionStrategy -from .data_classes.job_data import JobData +from autosubmitconfigparser.config.basicconfig import BasicConfig from .data_classes.experiment_run import ExperimentRun -from .platform_monitor.slurm_monitor import SlurmMonitor +from .data_classes.job_data import JobData +from .database_managers.database_manager import DEFAULT_JOBDATA_DIR, DEFAULT_HISTORICAL_LOGS_DIR +from .database_managers.experiment_history_db_manager import ExperimentHistoryDbManager from .internal_logging import Logging -from autosubmitconfigparser.config.basicconfig import BasicConfig +from .platform_monitor.slurm_monitor import SlurmMonitor +from .strategies import PlatformInformationHandler, SingleAssociationStrategy, StraightWrapperAssociationStrategy, \ + TwoDimWrapperDistributionStrategy, GeneralizedWrapperDistributionStrategy SECONDS_WAIT_PLATFORM = 60 diff --git a/autosubmit/history/experiment_status.py b/autosubmit/history/experiment_status.py index 9d4c7deb0..4d17262eb 100644 --- a/autosubmit/history/experiment_status.py +++ b/autosubmit/history/experiment_status.py @@ -17,10 +17,12 @@ # along with Autosubmit. If not, see . import traceback -from .database_managers.experiment_status_db_manager import ExperimentStatusDbManager + +from autosubmitconfigparser.config.basicconfig import BasicConfig from .database_managers.database_manager import DEFAULT_LOCAL_ROOT_DIR, DEFAULT_HISTORICAL_LOGS_DIR +from .database_managers.experiment_status_db_manager import ExperimentStatusDbManager from .internal_logging import Logging -from autosubmitconfigparser.config.basicconfig import BasicConfig + class ExperimentStatus: """ Represents the Experiment Status Mechanism that keeps track of currently active experiments """ diff --git a/autosubmit/history/internal_logging.py b/autosubmit/history/internal_logging.py index f9b667814..39c674097 100644 --- a/autosubmit/history/internal_logging.py +++ b/autosubmit/history/internal_logging.py @@ -16,9 +16,11 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . import os + from . import utils as HUtils from .database_managers.database_manager import DEFAULT_HISTORICAL_LOGS_DIR + class Logging: def __init__(self, expid, historiclog_dir_path=DEFAULT_HISTORICAL_LOGS_DIR): self.expid = expid diff --git a/autosubmit/history/platform_monitor/platform_utils.py b/autosubmit/history/platform_monitor/platform_utils.py index 433e654ec..fc00541dc 100644 --- a/autosubmit/history/platform_monitor/platform_utils.py +++ b/autosubmit/history/platform_monitor/platform_utils.py @@ -16,9 +16,8 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . -import os -from time import mktime from datetime import datetime +from time import mktime SLURM_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S" diff --git a/autosubmit/history/strategies.py b/autosubmit/history/strategies.py index c18ae96f5..d86cd2aa0 100644 --- a/autosubmit/history/strategies.py +++ b/autosubmit/history/strategies.py @@ -16,11 +16,13 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . +import traceback from abc import ABCMeta, abstractmethod + import autosubmit.history.database_managers.database_models as Models -import traceback +from .database_managers.database_manager import DEFAULT_HISTORICAL_LOGS_DIR from .internal_logging import Logging -from .database_managers.database_manager import DEFAULT_LOCAL_ROOT_DIR, DEFAULT_HISTORICAL_LOGS_DIR + class PlatformInformationHandler: def __init__(self, strategy): diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 6c5ddeaf9..02879802e 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -150,7 +150,8 @@ class Job(object): self._platform = None self._queue = None self._partition = None - + self.x11 = None + self.x11_options = None self.retry_delay = "0" self.platform_name = None # type: str #: (str): Type of the job, as given on job configuration file. (job: TASKTYPE) @@ -188,7 +189,6 @@ class Job(object): self.file = None self.additional_files = [] self.executable = None - self.x11 = False self._local_logs = ('', '') self._remote_logs = ('', '') self.script_name = self.name + ".cmd" @@ -685,6 +685,23 @@ class Job(object): """ self.fail_count += 1 + @property + @autosubmit_parameter(name='x11') + def x11(self): + """Whether to use X11 forwarding""" + return self._x11 + @x11.setter + def x11(self, value): + self._x11 = value + + @property + @autosubmit_parameter(name='x11_options') + def x11_options(self): + """Allows to set salloc parameters for x11""" + return self._x11_options + @x11_options.setter + def x11_options(self, value): + self._x11_options = value # Maybe should be renamed to the plural? def add_parent(self, *parents): """ @@ -1278,6 +1295,7 @@ class Job(object): return parameters def update_platform_associated_parameters(self,as_conf, parameters, job_platform, chunk): + self.x11_options = str(as_conf.jobs_data[self.section].get("X11_OPTIONS", as_conf.platforms_data.get(job_platform.name,{}).get("X11_OPTIONS",""))) self.executable = str(as_conf.jobs_data[self.section].get("EXECUTABLE", as_conf.platforms_data.get(job_platform.name,{}).get("EXECUTABLE",""))) self.total_jobs = int(as_conf.jobs_data[self.section].get("TOTALJOBS", job_platform.total_jobs)) self.max_waiting_jobs = int(as_conf.jobs_data[self.section].get("MAXWAITINGJOBS", job_platform.max_waiting_jobs)) diff --git a/autosubmit/job/job_common.py b/autosubmit/job/job_common.py index d705b1d1d..f167d59a8 100644 --- a/autosubmit/job/job_common.py +++ b/autosubmit/job/job_common.py @@ -13,6 +13,8 @@ # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. + +import datetime # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . import datetime diff --git a/autosubmit/job/job_dict.py b/autosubmit/job/job_dict.py index e2f673563..e9c3ef491 100644 --- a/autosubmit/job/job_dict.py +++ b/autosubmit/job/job_dict.py @@ -17,11 +17,13 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . -from autosubmit.job.job import Job from bscearth.utils.date import date2str + +from autosubmit.job.job import Job from autosubmit.job.job_common import Status, Type -from log.log import Log, AutosubmitError, AutosubmitCritical -from collections.abc import Iterable +from log.log import AutosubmitCritical + + class DicJobs: """ Class to create jobs from conf file and to find jobs by start date, member and chunk diff --git a/autosubmit/job/job_grouping.py b/autosubmit/job/job_grouping.py index bcddaf038..ad9a09bf0 100644 --- a/autosubmit/job/job_grouping.py +++ b/autosubmit/job/job_grouping.py @@ -17,9 +17,11 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . -from autosubmit.job.job_common import Status -from bscearth.utils.date import date2str import copy +from bscearth.utils.date import date2str + +from autosubmit.job.job_common import Status + class JobGrouping(object): diff --git a/autosubmit/job/job_list.py b/autosubmit/job/job_list.py index 59580672b..cc2862eac 100644 --- a/autosubmit/job/job_list.py +++ b/autosubmit/job/job_list.py @@ -13,6 +13,7 @@ # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. + # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . import copy @@ -896,7 +897,7 @@ class JobList(object): members_to = str(filter_.get("MEMBERS_TO", "natural")).lower() chunks_to = str(filter_.get("CHUNKS_TO", "natural")).lower() splits_to = str(filter_.get("SPLITS_TO", "natural")).lower() - + if not is_a_natural_relation: if dates_to == "natural": dates_to = "none" diff --git a/autosubmit/job/job_list_persistence.py b/autosubmit/job/job_list_persistence.py index 7554ddad7..abffe5080 100644 --- a/autosubmit/job/job_list_persistence.py +++ b/autosubmit/job/job_list_persistence.py @@ -14,15 +14,14 @@ # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. +import os # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . import pickle from sys import setrecursionlimit -import os - -from log.log import Log from autosubmit.database.db_manager import DbManager +from log.log import Log class JobListPersistence(object): diff --git a/autosubmit/job/job_packager.py b/autosubmit/job/job_packager.py index 67e833e27..2fce137a7 100644 --- a/autosubmit/job/job_packager.py +++ b/autosubmit/job/job_packager.py @@ -17,17 +17,17 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . import collections -from log.log import Log, AutosubmitCritical, AutosubmitError -from autosubmit.job.job_common import Status, Type +import copy +import operator from bscearth.utils.date import sum_str_hours -from autosubmit.job.job_packages import JobPackageSimple, JobPackageVertical, JobPackageHorizontal, \ - JobPackageSimpleWrapped, JobPackageHorizontalVertical, JobPackageVerticalHorizontal, JobPackageBase -from operator import attrgetter from math import ceil -import operator +from operator import attrgetter from typing import List -import copy +from autosubmit.job.job_common import Status, Type +from autosubmit.job.job_packages import JobPackageSimple, JobPackageVertical, JobPackageHorizontal, \ + JobPackageSimpleWrapped, JobPackageHorizontalVertical, JobPackageVerticalHorizontal, JobPackageBase +from log.log import Log, AutosubmitCritical class JobPackager(object): diff --git a/autosubmit/job/job_utils.py b/autosubmit/job/job_utils.py index 978212273..d7071bb7f 100644 --- a/autosubmit/job/job_utils.py +++ b/autosubmit/job/job_utils.py @@ -19,14 +19,14 @@ import networkx import os - -from networkx.algorithms.dag import is_directed_acyclic_graph from networkx import DiGraph -from networkx import dfs_edges from networkx import NetworkXError +from networkx import dfs_edges +from networkx.algorithms.dag import is_directed_acyclic_graph +from typing import Dict + from autosubmit.job.job_package_persistence import JobPackagePersistence from autosubmitconfigparser.config.basicconfig import BasicConfig -from typing import Dict def transitive_reduction(graph): diff --git a/autosubmit/monitor/monitor.py b/autosubmit/monitor/monitor.py index f1de48885..ed55f64fb 100644 --- a/autosubmit/monitor/monitor.py +++ b/autosubmit/monitor/monitor.py @@ -14,34 +14,31 @@ # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. -# You should have received a copy of the GNU General Public License -# along with Autosubmit. If not, see . -import datetime -import os -import time -import sys -from os import path from os import chdir from os import listdir +from os import path from os import remove -import py3dotplus as pydotplus import copy - +# You should have received a copy of the GNU General Public License +# along with Autosubmit. If not, see . +import datetime +import os +import py3dotplus as pydotplus import subprocess -import autosubmit.history.utils as HUtils -import autosubmit.helpers.utils as HelperUtils +import sys +import time +from typing import Dict, List -from autosubmit.job.job_common import Status +import autosubmit.helpers.utils as HelperUtils +import autosubmit.history.utils as HUtils from autosubmit.job.job import Job +from autosubmit.job.job_common import Status from autosubmitconfigparser.config.basicconfig import BasicConfig from autosubmitconfigparser.config.configcommon import AutosubmitConfig - -from log.log import Log, AutosubmitCritical from autosubmitconfigparser.config.yamlparser import YAMLParserFactory - +from log.log import Log, AutosubmitCritical from .diagram import create_bar_diagram -from typing import Dict, List GENERAL_STATS_OPTION_MAX_LENGTH = 1000 diff --git a/autosubmit/notifications/mail_notifier.py b/autosubmit/notifications/mail_notifier.py index 45c01c1c9..be8e0dcfd 100644 --- a/autosubmit/notifications/mail_notifier.py +++ b/autosubmit/notifications/mail_notifier.py @@ -17,11 +17,13 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . -import smtplib import email.utils +import smtplib from email.mime.text import MIMEText + from log.log import Log + class MailNotifier: def __init__(self, basic_config): self.config = basic_config diff --git a/autosubmit/platforms/ecplatform.py b/autosubmit/platforms/ecplatform.py index 99c44aa7a..517eed2a1 100644 --- a/autosubmit/platforms/ecplatform.py +++ b/autosubmit/platforms/ecplatform.py @@ -14,19 +14,21 @@ # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. +import locale # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . -import locale import os import subprocess -from autosubmit.platforms.paramiko_platform import ParamikoPlatform, ParamikoPlatformException -from log.log import Log,AutosubmitError -from autosubmit.platforms.headers.ec_header import EcHeader +from time import sleep + from autosubmit.platforms.headers.ec_cca_header import EcCcaHeader +from autosubmit.platforms.headers.ec_header import EcHeader from autosubmit.platforms.headers.slurm_header import SlurmHeader +from autosubmit.platforms.paramiko_platform import ParamikoPlatform, ParamikoPlatformException from autosubmit.platforms.wrappers.wrapper_factory import EcWrapperFactory -from time import sleep -import locale +from log.log import Log, AutosubmitError + + class EcPlatform(ParamikoPlatform): """ Class to manage queues with ecaccess diff --git a/autosubmit/platforms/locplatform.py b/autosubmit/platforms/locplatform.py index 2950a7176..543e8b631 100644 --- a/autosubmit/platforms/locplatform.py +++ b/autosubmit/platforms/locplatform.py @@ -18,16 +18,15 @@ # along with Autosubmit. If not, see . import locale import os -from xml.dom.minidom import parseString import subprocess +from time import sleep +from xml.dom.minidom import parseString - -from autosubmit.platforms.paramiko_platform import ParamikoPlatform from autosubmit.platforms.headers.local_header import LocalHeader - +from autosubmit.platforms.paramiko_platform import ParamikoPlatform from autosubmitconfigparser.config.basicconfig import BasicConfig -from time import sleep -from log.log import Log, AutosubmitError, AutosubmitCritical +from log.log import Log, AutosubmitError + class LocalPlatform(ParamikoPlatform): """ diff --git a/autosubmit/platforms/lsfplatform.py b/autosubmit/platforms/lsfplatform.py index a03ec5dee..c401bdb32 100644 --- a/autosubmit/platforms/lsfplatform.py +++ b/autosubmit/platforms/lsfplatform.py @@ -19,8 +19,8 @@ import os -from autosubmit.platforms.paramiko_platform import ParamikoPlatform from autosubmit.platforms.headers.lsf_header import LsfHeader +from autosubmit.platforms.paramiko_platform import ParamikoPlatform from autosubmit.platforms.wrappers.wrapper_factory import LSFWrapperFactory diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index cfca6e3db..8808b2048 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -1,25 +1,23 @@ +import Xlib.support.connect as xlib_connect +import datetime import locale -from binascii import hexlify -from contextlib import suppress -from time import sleep -import sys -import socket import os import paramiko -import datetime -import time -import select +import random import re +import select +import socket +import sys +from bscearth.utils.date import date2str from datetime import timedelta -import random +from paramiko.ssh_exception import (SSHException) +from threading import Thread +from time import sleep + from autosubmit.job.job_common import Status from autosubmit.job.job_common import Type from autosubmit.platforms.platform import Platform -from bscearth.utils.date import date2str from log.log import AutosubmitError, AutosubmitCritical, Log -from paramiko.ssh_exception import (SSHException) -import Xlib.support.connect as xlib_connect -from threading import Thread def threaded(fn): @@ -898,7 +896,7 @@ class ParamikoPlatform(Platform): chan.settimeout(timeout) if x11 == "true": command = command + " ; sleep infinity" - chan.exec_command(command) + chan.exec_command(command,x11) chan_fileno = chan.fileno() self.poller.register(chan_fileno, select.POLLIN) self.x11_status_checker(chan, chan_fileno) diff --git a/autosubmit/platforms/paramiko_submitter.py b/autosubmit/platforms/paramiko_submitter.py index 534a64a42..8589f6813 100644 --- a/autosubmit/platforms/paramiko_submitter.py +++ b/autosubmit/platforms/paramiko_submitter.py @@ -18,22 +18,23 @@ # along with Autosubmit. If not, see . -import os from collections import defaultdict -from log.log import Log,AutosubmitCritical,AutosubmitError -from autosubmitconfigparser.config.basicconfig import BasicConfig -from autosubmitconfigparser.config.configcommon import AutosubmitConfig -from .submitter import Submitter -from autosubmit.platforms.psplatform import PsPlatform +import os + +from autosubmit.platforms.ecplatform import EcPlatform +from autosubmit.platforms.locplatform import LocalPlatform from autosubmit.platforms.lsfplatform import LsfPlatform +from autosubmit.platforms.paramiko_platform import ParamikoPlatformException from autosubmit.platforms.pbsplatform import PBSPlatform +from autosubmit.platforms.pjmplatform import PJMPlatform +from autosubmit.platforms.psplatform import PsPlatform from autosubmit.platforms.sgeplatform import SgePlatform -from autosubmit.platforms.ecplatform import EcPlatform from autosubmit.platforms.slurmplatform import SlurmPlatform -from autosubmit.platforms.pjmplatform import PJMPlatform -from autosubmit.platforms.locplatform import LocalPlatform -from autosubmit.platforms.paramiko_platform import ParamikoPlatformException +from autosubmitconfigparser.config.basicconfig import BasicConfig +from autosubmitconfigparser.config.configcommon import AutosubmitConfig +from log.log import Log, AutosubmitError +from .submitter import Submitter class ParamikoSubmitter(Submitter): diff --git a/autosubmit/platforms/pbsplatform.py b/autosubmit/platforms/pbsplatform.py index 132b8715c..9960e4246 100644 --- a/autosubmit/platforms/pbsplatform.py +++ b/autosubmit/platforms/pbsplatform.py @@ -19,12 +19,11 @@ import os -from autosubmit.platforms.paramiko_platform import ParamikoPlatform -from log.log import Log, AutosubmitCritical - from autosubmit.platforms.headers.pbs10_header import Pbs10Header from autosubmit.platforms.headers.pbs11_header import Pbs11Header from autosubmit.platforms.headers.pbs12_header import Pbs12Header +from autosubmit.platforms.paramiko_platform import ParamikoPlatform +from log.log import Log, AutosubmitCritical class PBSPlatform(ParamikoPlatform): diff --git a/autosubmit/platforms/pjmplatform.py b/autosubmit/platforms/pjmplatform.py index 3b88cbe31..7acca0fe7 100644 --- a/autosubmit/platforms/pjmplatform.py +++ b/autosubmit/platforms/pjmplatform.py @@ -23,11 +23,12 @@ from time import sleep from typing import List, Union from autosubmit.job.job_common import Status -from autosubmit.platforms.paramiko_platform import ParamikoPlatform from autosubmit.platforms.headers.pjm_header import PJMHeader +from autosubmit.platforms.paramiko_platform import ParamikoPlatform from autosubmit.platforms.wrappers.wrapper_factory import PJMWrapperFactory from log.log import AutosubmitCritical, AutosubmitError, Log + class PJMPlatform(ParamikoPlatform): """ Class to manage jobs to host using PJM scheduler diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index 79ea7919a..b2241b7e7 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -62,6 +62,7 @@ class Platform(object): self._submit_hold_cmd = None self._submit_command_name = None self._submit_cmd = None + self._submit_cmd_x11 = None self._checkhost_cmd = None self.cancel_cmd = None diff --git a/autosubmit/platforms/psplatform.py b/autosubmit/platforms/psplatform.py index 2a5fe05d5..d68ba3216 100644 --- a/autosubmit/platforms/psplatform.py +++ b/autosubmit/platforms/psplatform.py @@ -20,8 +20,8 @@ import os from xml.dom.minidom import parseString -from autosubmit.platforms.paramiko_platform import ParamikoPlatform from autosubmit.platforms.headers.ps_header import PsHeader +from autosubmit.platforms.paramiko_platform import ParamikoPlatform class PsPlatform(ParamikoPlatform): diff --git a/autosubmit/platforms/sgeplatform.py b/autosubmit/platforms/sgeplatform.py index 58671cd98..662d58cb2 100644 --- a/autosubmit/platforms/sgeplatform.py +++ b/autosubmit/platforms/sgeplatform.py @@ -19,11 +19,10 @@ import os import subprocess - from xml.dom.minidom import parseString -from autosubmit.platforms.paramiko_platform import ParamikoPlatform from autosubmit.platforms.headers.sge_header import SgeHeader +from autosubmit.platforms.paramiko_platform import ParamikoPlatform class SgePlatform(ParamikoPlatform): diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index a64d386e8..cd7b4b862 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -49,6 +49,7 @@ class SlurmPlatform(ParamikoPlatform): self._submit_hold_cmd = None self._submit_command_name = None self._submit_cmd = None + self.x11_options = None self._checkhost_cmd = None self.cancel_cmd = None self._header = SlurmHeader() @@ -280,6 +281,7 @@ class SlurmPlatform(ParamikoPlatform): self._checkhost_cmd = "echo 1" self._submit_cmd = 'sbatch -D {1} {1}/'.format( self.host, self.remote_log_dir) + self._submit_cmd_x11 = f'salloc -D {self.remote_log_dir} {self.remote_log_dir}' self._submit_command_name = "sbatch" self._submit_hold_cmd = 'sbatch -H -D {1} {1}/'.format( self.host, self.remote_log_dir) @@ -524,7 +526,7 @@ class SlurmPlatform(ParamikoPlatform): if x11 == "true": if not hold: - return export + self._submit_cmd + job_script + return f'{export.strip("")} {self._submit_cmd_x11} {job_script.strip("")} {job.x11_options.strip("")}' else: return export + self._submit_hold_cmd + job_script else: diff --git a/autosubmit/statistics/jobs_stat.py b/autosubmit/statistics/jobs_stat.py index b2d1de97b..c0338fe18 100644 --- a/autosubmit/statistics/jobs_stat.py +++ b/autosubmit/statistics/jobs_stat.py @@ -1,7 +1,9 @@ #!/bin/env/python from datetime import datetime, timedelta + from .utils import timedelta2hours + class JobStat(object): def __init__(self, name, processors, wallclock, section, date, member, chunk): # type: (str, int, float, str, str, str, str) -> None diff --git a/autosubmit/statistics/utils.py b/autosubmit/statistics/utils.py index 4dc9acf2f..a0260b875 100644 --- a/autosubmit/statistics/utils.py +++ b/autosubmit/statistics/utils.py @@ -1,12 +1,13 @@ #!/bin/env/python import math -from autosubmit.job.job import Job from datetime import datetime, timedelta -from autosubmit.job.job_common import Status from typing import List, Tuple +from autosubmit.job.job import Job +from autosubmit.job.job_common import Status from log.log import AutosubmitCritical + def filter_by_section(jobs, section): # type: (List[Job], str) -> List[Job] """ Filter jobs by provided section """ -- GitLab From 4fe7500bfe2faba5f5a8c4a7cc64cc8339890e2c Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 9 Aug 2023 15:36:36 +0200 Subject: [PATCH 10/20] Trying to making x11 work --- autosubmit/platforms/paramiko_platform.py | 7 ++++--- autosubmit/platforms/slurmplatform.py | 10 ++++++++-- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 8808b2048..4411442f0 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -885,18 +885,19 @@ class ParamikoPlatform(Platform): """ while retries > 0: try: - chan = self.transport.open_session() + chan = self.transport.open_session(timeout=10) if x11 == "true": display = os.getenv('DISPLAY') if display is None or not display: display = "localhost:0" self.local_x11_display = xlib_connect.get_display(display) - chan.request_x11(handler=self.x11_handler) + chan.request_x11(screen_number=self.local_x11_display[:4],single_connection=False,handler=self.x11_handler) else: chan.settimeout(timeout) if x11 == "true": command = command + " ; sleep infinity" - chan.exec_command(command,x11) + #command = command + chan.exec_command(command) chan_fileno = chan.fileno() self.poller.register(chan_fileno, select.POLLIN) self.x11_status_checker(chan, chan_fileno) diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index cd7b4b862..e5b751e8f 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -281,7 +281,7 @@ class SlurmPlatform(ParamikoPlatform): self._checkhost_cmd = "echo 1" self._submit_cmd = 'sbatch -D {1} {1}/'.format( self.host, self.remote_log_dir) - self._submit_cmd_x11 = f'salloc -D {self.remote_log_dir} {self.remote_log_dir}' + self._submit_cmd_x11 = f'-D {self.remote_log_dir} {self.remote_log_dir}' self._submit_command_name = "sbatch" self._submit_hold_cmd = 'sbatch -H -D {1} {1}/'.format( self.host, self.remote_log_dir) @@ -290,6 +290,12 @@ class SlurmPlatform(ParamikoPlatform): self.get_cmd = "scp" self.mkdir_cmd = "mkdir -p " + self.remote_log_dir + def get_submit_cmd_x11(self, args, script_name): + """ + Returns the submit command for the platform + """ + return f'salloc {args} {self._submit_cmd_x11}/{script_name}' + def hold_job(self, job): try: cmd = "scontrol release {0} ; sleep 2 ; scontrol hold {0} ".format(job.id) @@ -526,7 +532,7 @@ class SlurmPlatform(ParamikoPlatform): if x11 == "true": if not hold: - return f'{export.strip("")} {self._submit_cmd_x11} {job_script.strip("")} {job.x11_options.strip("")}' + return self.get_submit_cmd_x11(job.x11_options.strip(""), job_script.strip("")) else: return export + self._submit_hold_cmd + job_script else: -- GitLab From 67e2ba4f9c1bcbaea19c5805fdc8d7c59120d81e Mon Sep 17 00:00:00 2001 From: dbeltran Date: Thu, 10 Aug 2023 10:33:20 +0200 Subject: [PATCH 11/20] Not working but some improvements --- autosubmit/platforms/paramiko_platform.py | 47 ++++++++++++++--------- 1 file changed, 28 insertions(+), 19 deletions(-) diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 4411442f0..ae3cb4346 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -18,7 +18,7 @@ from autosubmit.job.job_common import Status from autosubmit.job.job_common import Type from autosubmit.platforms.platform import Platform from log.log import AutosubmitError, AutosubmitCritical, Log - +import re def threaded(fn): def wrapper(*args, **kwargs): @@ -832,6 +832,8 @@ class ParamikoPlatform(Platform): @threaded def x11_status_checker(self, session, session_fileno): self.transport.accept() + Log.warning(f'x11_status_checker() {session_fileno} {session}') + while not session.exit_status_ready(): try: if sys.platform != "linux": @@ -852,6 +854,7 @@ class ParamikoPlatform(Platform): try: # forward data between local/remote x11 socket. data = channel.recv(4096) + Log.warning(data) counterpart.sendall(data) except socket.error: channel.close() @@ -860,7 +863,6 @@ class ParamikoPlatform(Platform): except Exception as e: pass - def exec_command(self, command, bufsize=-1, timeout=None, get_pty=False,retries=3, x11=False): """ Execute a command on the SSH server. A new `.Channel` is opened and @@ -885,19 +887,24 @@ class ParamikoPlatform(Platform): """ while retries > 0: try: - chan = self.transport.open_session(timeout=10) if x11 == "true": display = os.getenv('DISPLAY') if display is None or not display: display = "localhost:0" self.local_x11_display = xlib_connect.get_display(display) - chan.request_x11(screen_number=self.local_x11_display[:4],single_connection=False,handler=self.x11_handler) + chan = self.transport.open_session() + chan.request_x11(screen_number=self.local_x11_display[3],single_connection=False,handler=self.x11_handler) else: + chan = self.transport.open_session() chan.settimeout(timeout) if x11 == "true": - command = command + " ; sleep infinity" - #command = command - chan.exec_command(command) + #command = command + " ; sleep infinity 2>/dev/null" + command = command + Log.info(command) + try: + chan.exec_command(command) + except BaseException as e: + raise AutosubmitCritical("Failed to execute command: %s" % e) chan_fileno = chan.fileno() self.poller.register(chan_fileno, select.POLLIN) self.x11_status_checker(chan, chan_fileno) @@ -915,14 +922,7 @@ class ParamikoPlatform(Platform): retries = retries - 1 if retries <= 0: return False , False, False - def exec_command_x11(self, command, bufsize=-1, timeout=None, get_pty=False,retries=3, x11=False): - session = self.transport.open_session() - session.request_x11(handler=self.x11_handler) - session.exec_command(command + " ; sleep infinity") - session_fileno = session.fileno() - self.poller.register(session_fileno, select.POLLIN) - self.x11_status_checker(session, session_fileno) - pass + def send_command(self, command, ignore_log=False, x11 = False): """ Sends given command to HPC @@ -955,7 +955,7 @@ class ParamikoPlatform(Platform): channel.settimeout(timeout) stdin.close() channel.shutdown_write() - stdout_chunks.append(stdout.channel.recv(len(stdout.channel.in_buffer))) + stdout_chunks.append(stdout.channel.recv(len(stdout.channel.in_buffer))) while not channel.closed or channel.recv_ready() or channel.recv_stderr_ready(): # stop if channel was closed prematurely, and there is no data in the buffers. got_chunk = False @@ -972,9 +972,17 @@ class ParamikoPlatform(Platform): stderr.channel.recv_stderr(len(c.in_stderr_buffer))) #stdout_chunks.append(" ") got_chunk = True - if x11 == "true": - got_chunk = True - break + if x11 == "true": + if len(stderr_readlines) > 0: + job_id = re.findall(r'\d+', str(stderr_readlines[0]))[0] + stdout_chunks.append(job_id) + print(job_id) + stderr_readlines = [] + break + Log.info(f'stdout_chunks: {stdout_chunks} stderr_readlines: {stderr_readlines}') + # if x11 == "true": + # got_chunk = True + # break if not got_chunk and stdout.channel.exit_status_ready() and not stderr.channel.recv_stderr_ready() and not stdout.channel.recv_ready(): # indicate that we're not going to read from this channel anymore stdout.channel.shutdown_read() @@ -1014,6 +1022,7 @@ class ParamikoPlatform(Platform): else: pass #Log.debug('Command {0} in {1} successful with out message: {2}', command, self.host, self._ssh_output) + Log.info("reached send_command") return True except AttributeError as e: raise AutosubmitError( -- GitLab From bc5f48de21aeec1932af3f3df949e69dab55a613 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 29 Aug 2023 13:06:08 +0200 Subject: [PATCH 12/20] x11 ( xclock working) --- autosubmit/platforms/paramiko_platform.py | 48 +++++++++++------------ autosubmit/platforms/platform.py | 3 +- autosubmit/platforms/slurmplatform.py | 8 ++-- 3 files changed, 30 insertions(+), 29 deletions(-) diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index ae3cb4346..c90633ffd 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -825,27 +825,30 @@ class ParamikoPlatform(Platform): self.transport._queue_incoming_channel(channel) def flush_out(self,session): + #flush_out is not working fine + while session.recv_ready(): - sys.stdout.write(session.recv(4096)) + sys.stdout.write(session.recv(4096).decode(locale.getlocale()[1])) while session.recv_stderr_ready(): - sys.stderr.write(session.recv_stderr(4096)) + sys.stderr.write(session.recv_stderr(4096).decode(locale.getlocale()[1])) + @threaded def x11_status_checker(self, session, session_fileno): + poller = None self.transport.accept() - Log.warning(f'x11_status_checker() {session_fileno} {session}') - while not session.exit_status_ready(): try: - if sys.platform != "linux": - self.poller = self.poller.kqueue() - else: - self.poller = self.poller.poll() + if type(self.poller) is not list: + if sys.platform != "linux": + poller = self.poller.kqueue() + else: + poller = self.poller.poll() # accept subsequent x11 connections if any if len(self.transport.server_accepts) > 0: self.transport.accept() - if not self.poller: # this should not happen, as we don't have a timeout. + if not poller: # this should not happen, as we don't have a timeout. break - for fd, event in self.poller: + for fd, event in poller: if fd == session_fileno: self.flush_out(session) # data either on local/remote x11 socket @@ -854,7 +857,6 @@ class ParamikoPlatform(Platform): try: # forward data between local/remote x11 socket. data = channel.recv(4096) - Log.warning(data) counterpart.sendall(data) except socket.error: channel.close() @@ -893,13 +895,13 @@ class ParamikoPlatform(Platform): display = "localhost:0" self.local_x11_display = xlib_connect.get_display(display) chan = self.transport.open_session() - chan.request_x11(screen_number=self.local_x11_display[3],single_connection=False,handler=self.x11_handler) + chan.request_x11(single_connection=False,handler=self.x11_handler) else: chan = self.transport.open_session() chan.settimeout(timeout) if x11 == "true": - #command = command + " ; sleep infinity 2>/dev/null" - command = command + command = f'{command} ; sleep infinity 2>/dev/null' + #command = f'export display {command}' Log.info(command) try: chan.exec_command(command) @@ -956,8 +958,12 @@ class ParamikoPlatform(Platform): stdin.close() channel.shutdown_write() stdout_chunks.append(stdout.channel.recv(len(stdout.channel.in_buffer))) - while not channel.closed or channel.recv_ready() or channel.recv_stderr_ready(): + i = 0 + x11_exit = False + while (not channel.closed or channel.recv_ready() or channel.recv_stderr_ready() ) and not x11_exit: # stop if channel was closed prematurely, and there is no data in the buffers. + i = i+1 + print(i) got_chunk = False readq, _, _ = select.select([stdout.channel], [], [], 2) for c in readq: @@ -975,16 +981,13 @@ class ParamikoPlatform(Platform): if x11 == "true": if len(stderr_readlines) > 0: job_id = re.findall(r'\d+', str(stderr_readlines[0]))[0] - stdout_chunks.append(job_id) + stdout_chunks.append(job_id.encode(lang)) print(job_id) stderr_readlines = [] + x11_exit = True break - Log.info(f'stdout_chunks: {stdout_chunks} stderr_readlines: {stderr_readlines}') - # if x11 == "true": - # got_chunk = True - # break if not got_chunk and stdout.channel.exit_status_ready() and not stderr.channel.recv_stderr_ready() and not stdout.channel.recv_ready(): - # indicate that we're not going to read from this channel anymore + # indicate that we're not going to read from this channel any more stdout.channel.shutdown_read() # close the channel stdout.channel.close() @@ -993,8 +996,6 @@ class ParamikoPlatform(Platform): if not x11: stdout.close() stderr.close() - - self._ssh_output = "" self._ssh_output_err = "" for s in stdout_chunks: @@ -1002,7 +1003,6 @@ class ParamikoPlatform(Platform): self._ssh_output += s.decode(lang) for errorLineCase in stderr_readlines: self._ssh_output_err += errorLineCase.decode(lang) - errorLine = errorLineCase.lower().decode(lang) if "not active" in errorLine: raise AutosubmitError( diff --git a/autosubmit/platforms/platform.py b/autosubmit/platforms/platform.py index b2241b7e7..20cb28d4e 100644 --- a/autosubmit/platforms/platform.py +++ b/autosubmit/platforms/platform.py @@ -266,7 +266,8 @@ class Platform(object): save = True if not inspect: job_list.save() - valid_packages_to_submit.append(package) + if package.x11 != "true": + valid_packages_to_submit.append(package) # Log.debug("FD end-submit: {0}".format(log.fd_show.fd_table_status_str(open())) except (IOError, OSError): if package.jobs[0].id != 0: diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index e5b751e8f..50e2b2794 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -504,12 +504,12 @@ class SlurmPlatform(ParamikoPlatform): if outputlines.find("failed") != -1: raise AutosubmitCritical( "Submission failed. Command Failed", 7014) - jobs_id = [] - for output in outputlines.splitlines(): - jobs_id.append(int(output.split(' ')[3])) if x11 == "true": - return jobs_id[0] + return int(outputlines.splitlines()[0]) else: + jobs_id = [] + for output in outputlines.splitlines(): + jobs_id.append(int(output.split(' ')[3])) return jobs_id except IndexError: raise AutosubmitCritical( -- GitLab From 0cdc63568651c992c09fd1d2e87cd6fa653dd358 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 29 Aug 2023 13:35:18 +0200 Subject: [PATCH 13/20] wxparaver working --- autosubmit/job/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autosubmit/job/job.py b/autosubmit/job/job.py index 02879802e..79beb3dbb 100644 --- a/autosubmit/job/job.py +++ b/autosubmit/job/job.py @@ -1543,7 +1543,7 @@ class Job(object): template_file.close() else: if self.type == Type.BASH: - template = 'xclock' + template = 'sleep 5' elif self.type == Type.PYTHON2: template = 'time.sleep(5)' + "\n" elif self.type == Type.PYTHON3 or self.type == Type.PYTHON: -- GitLab From dbe574c683235a38831ecd8bf801bb4919f9eca6 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 29 Aug 2023 14:06:50 +0200 Subject: [PATCH 14/20] docs --- docs/source/userguide/configure/index.rst | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/docs/source/userguide/configure/index.rst b/docs/source/userguide/configure/index.rst index b0a000bc0..0b12f29ef 100644 --- a/docs/source/userguide/configure/index.rst +++ b/docs/source/userguide/configure/index.rst @@ -111,7 +111,6 @@ To do this use: * NODES: nodes number to be submitted to the HPC. If not specified, the directive is not added. - * HYPERTHREADING: Enables Hyper-threading, this will double the max amount of threads. defaults to false. ( Not available on slurm platforms ) * QUEUE: queue to add the job to. If not specified, uses PLATFORM default. @@ -147,6 +146,10 @@ There are also other, less used features that you can use: * QUEUE: queue to add the job to. If not specified, uses PLATFORM default. +* X11: Allows to run a job with X11 forwarding using salloc. + +* X11_OPTIONS: Allows to set salloc ( X11 ) options. EX: "-n 4 --qos=debug --partition=debug --x11=all --time=00:30:00" + How to configure email notifications ------------------------------------ -- GitLab From 227e3ceb532a2f98e61e1e24421169ed1ee33523 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Tue, 29 Aug 2023 14:20:05 +0200 Subject: [PATCH 15/20] version --- VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/VERSION b/VERSION index e89c23ab4..b2cb6d09d 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -4.0.87 +4.0.88 -- GitLab From 954f293e4fdb5a8474c58a2311fb36f5ee744e1f Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 30 Aug 2023 10:07:28 +0200 Subject: [PATCH 16/20] timeout for psplatform was higher than intented --- autosubmit/platforms/paramiko_platform.py | 8 ++------ autosubmit/platforms/psplatform.py | 2 +- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 8cb8c4250..9f6c15f18 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -900,7 +900,8 @@ class ParamikoPlatform(Platform): chan = self.transport.open_session() chan.settimeout(timeout) if x11 == "true": - command = f'{command} ; sleep infinity 2>/dev/null' + + command = f'{command} ; sleep {command.split(" ")[1]} 2>/dev/null' #command = f'export display {command}' Log.info(command) try: @@ -962,21 +963,17 @@ class ParamikoPlatform(Platform): x11_exit = False while (not channel.closed or channel.recv_ready() or channel.recv_stderr_ready() ) and not x11_exit: # stop if channel was closed prematurely, and there is no data in the buffers. - i = i+1 - print(i) got_chunk = False readq, _, _ = select.select([stdout.channel], [], [], 2) for c in readq: if c.recv_ready(): stdout_chunks.append( stdout.channel.recv(len(c.in_buffer))) - #stdout_chunks.append(" ") got_chunk = True if c.recv_stderr_ready(): # make sure to read stderr to prevent stall stderr_readlines.append( stderr.channel.recv_stderr(len(c.in_stderr_buffer))) - #stdout_chunks.append(" ") got_chunk = True if x11 == "true": if len(stderr_readlines) > 0: @@ -1022,7 +1019,6 @@ class ParamikoPlatform(Platform): else: pass #Log.debug('Command {0} in {1} successful with out message: {2}', command, self.host, self._ssh_output) - Log.info("reached send_command") return True except AttributeError as e: raise AutosubmitError( diff --git a/autosubmit/platforms/psplatform.py b/autosubmit/platforms/psplatform.py index d68ba3216..433eb4063 100644 --- a/autosubmit/platforms/psplatform.py +++ b/autosubmit/platforms/psplatform.py @@ -97,7 +97,7 @@ class PsPlatform(ParamikoPlatform): def get_submit_cmd(self, job_script, job, hold=False, export=""): wallclock = self.parse_time(job.wallclock) - seconds = int(wallclock.days * 86400 + wallclock.seconds * 60) + seconds = int(wallclock.days * 86400 + wallclock.seconds) * 1.30 if export == "none" or export == "None" or export is None or export == "": export = "" else: -- GitLab From aa79d4c774abcc3594caf04f202ec912b1d49a84 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 30 Aug 2023 10:16:00 +0200 Subject: [PATCH 17/20] sleep timeout only for ps now --- autosubmit/platforms/paramiko_platform.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index 9f6c15f18..a6f1a5a39 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -362,7 +362,7 @@ class ParamikoPlatform(Platform): except Exception as e: try: os.remove(file_path) - except Exception as e: + except Exception as ep: pass if str(e) in "Garbage": if not ignore_log: @@ -900,8 +900,11 @@ class ParamikoPlatform(Platform): chan = self.transport.open_session() chan.settimeout(timeout) if x11 == "true": - - command = f'{command} ; sleep {command.split(" ")[1]} 2>/dev/null' + if not "salloc" in command: + timeout_command = command.split(" ")[1] + if timeout_command == 0: + timeout_command = "infinity" + command = f'{command} ; sleep {timeout_command} 2>/dev/null' #command = f'export display {command}' Log.info(command) try: -- GitLab From a9675aa0a179622f80ec477b633c7ef3cf2af94a Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 30 Aug 2023 12:08:59 +0200 Subject: [PATCH 18/20] export option now works with x11 --- autosubmit/platforms/paramiko_platform.py | 2 +- autosubmit/platforms/slurmplatform.py | 5 +---- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index a6f1a5a39..c8992004b 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -900,7 +900,7 @@ class ParamikoPlatform(Platform): chan = self.transport.open_session() chan.settimeout(timeout) if x11 == "true": - if not "salloc" in command: + if "timeout" in command: timeout_command = command.split(" ")[1] if timeout_command == 0: timeout_command = "infinity" diff --git a/autosubmit/platforms/slurmplatform.py b/autosubmit/platforms/slurmplatform.py index 50e2b2794..d83aed18b 100644 --- a/autosubmit/platforms/slurmplatform.py +++ b/autosubmit/platforms/slurmplatform.py @@ -531,10 +531,7 @@ class SlurmPlatform(ParamikoPlatform): x11 = job.x11 if x11 == "true": - if not hold: - return self.get_submit_cmd_x11(job.x11_options.strip(""), job_script.strip("")) - else: - return export + self._submit_hold_cmd + job_script + return export + self.get_submit_cmd_x11(job.x11_options.strip(""), job_script.strip("")) else: try: lang = locale.getlocale()[1] -- GitLab From 57446a9dc33d846060a35a95e709dd385dba8c2a Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 30 Aug 2023 12:38:48 +0200 Subject: [PATCH 19/20] sleep timeout only for ps now --- autosubmit/platforms/paramiko_platform.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py index c8992004b..30f6c5889 100644 --- a/autosubmit/platforms/paramiko_platform.py +++ b/autosubmit/platforms/paramiko_platform.py @@ -901,7 +901,7 @@ class ParamikoPlatform(Platform): chan.settimeout(timeout) if x11 == "true": if "timeout" in command: - timeout_command = command.split(" ")[1] + timeout_command = command.split("timeout ")[1].split(" ")[0] if timeout_command == 0: timeout_command = "infinity" command = f'{command} ; sleep {timeout_command} 2>/dev/null' -- GitLab From 7cf5d01c681ebbf163daa6095d48a64368664ad9 Mon Sep 17 00:00:00 2001 From: dbeltran Date: Wed, 30 Aug 2023 14:18:21 +0200 Subject: [PATCH 20/20] pynalc for hubs --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index f96a9a0c9..676a96fc1 100644 --- a/setup.py +++ b/setup.py @@ -39,7 +39,7 @@ setup( url='http://www.bsc.es/projects/earthscience/autosubmit/', download_url='https://earth.bsc.es/wiki/doku.php?id=tools:autosubmit', keywords=['climate', 'weather', 'workflow', 'HPC'], - install_requires=['ruamel.yaml==0.17.21','cython','autosubmitconfigparser','bcrypt>=3.2','packaging>19','six>=1.10.0','configobj>=5.0.6','argparse>=1.4.0','python-dateutil>=2.8.2','matplotlib<3.6','numpy<1.22','py3dotplus>=1.1.0','pyparsing>=3.0.7','paramiko>=2.9.2','mock>=4.0.3','portalocker>=2.3.2','networkx==2.6.3','requests>=2.27.1','bscearth.utils>=0.5.2','cryptography>=36.0.1','setuptools>=60.8.2','xlib>=0.21','pip>=22.0.3','pythondialog','pytest','nose','coverage','PyNaCl>=1.4.0','Pygments','psutil'], + install_requires=['PyNaCl>=1.5.0','ruamel.yaml==0.17.21','cython','autosubmitconfigparser','bcrypt>=3.2','packaging>19','six>=1.10.0','configobj>=5.0.6','argparse>=1.4.0','python-dateutil>=2.8.2','matplotlib<3.6','numpy<1.22','py3dotplus>=1.1.0','pyparsing>=3.0.7','paramiko>=2.9.2','mock>=4.0.3','portalocker>=2.3.2','networkx==2.6.3','requests>=2.27.1','bscearth.utils>=0.5.2','cryptography>=36.0.1','setuptools>=60.8.2','xlib>=0.21','pip>=22.0.3','pythondialog','pytest','nose','coverage','Pygments','psutil'], classifiers=[ "Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.9", -- GitLab