diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py
index 8d3516c370e3905522c6e205f41e559005894c1c..93a2d24c5a4c50a502214985f9adbf687f4ff46b 100644
--- a/autosubmit/autosubmit.py
+++ b/autosubmit/autosubmit.py
@@ -57,6 +57,7 @@ from .notifications.notifier import Notifier
 from .platforms.paramiko_submitter import ParamikoSubmitter
 from .platforms.platform import Platform
 from .migrate.migrate import Migrate
+from .generators import Engine, get_engine_generator
 
 dialog = None
 from time import sleep
@@ -87,7 +88,6 @@ import autosubmit.history.utils as HUtils
 import autosubmit.helpers.autosubmit_helper as AutosubmitHelper
 import autosubmit.statistics.utils as StatisticsUtils
 from autosubmit.helpers.utils import proccess_id, terminate_child_process, check_jobs_file_exists
-
 from contextlib import suppress
 
 """
@@ -684,6 +684,18 @@ class Autosubmit:
                                help='Select the status (one or more) to filter the list of jobs.')
         subparser.add_argument('-t', '--target', type=str, default="FAILED", metavar='STATUS',
                                help='Final status of killed jobs. Default is FAILED.')
+
+        # Generate
+        subparser = subparsers.add_parser(
+            'generate', description='Generate a workflow definition for a different workflow engine',
+            argument_default=argparse.SUPPRESS)
+        subparser.add_argument('expid', help='experiment identifier')
+        subsubparser = subparser.add_subparsers(title="engines", dest='engine', required=True, description='Workflow engine identifier')
+        for engine in Engine:
+            generator_class = get_engine_generator(engine)
+            parser_engine = subsubparser.add_parser(engine.value, help=f"{generator_class.get_engine_name()}")
+            generator_class.add_parse_args(parser_engine)
+
         args, unknown = parser.parse_known_args()
         if args.version:
             Log.info(Autosubmit.autosubmit_version)
@@ -790,6 +802,9 @@ class Autosubmit:
             return Autosubmit.cat_log(args.ID, args.file, args.mode, args.inspect)
         elif args.command == 'stop':
             return Autosubmit.stop(args.expid, args.force, args.all, args.force_all, args.cancel, args.filter_status, args.target)
+        elif args.command == 'generate':
+            return Autosubmit.generate_workflow(args.expid, Engine[args.engine], args)
+
     @staticmethod
     def _init_logs(args, console_level='INFO', log_level='DEBUG', expid='None'):
         Log.set_console_level(console_level)
@@ -6104,5 +6119,47 @@ class Autosubmit:
 
         terminate_child_process(expid)
 
+    @staticmethod
+    def generate_workflow(expid: str, engine: Engine, args: argparse.Namespace) -> None:
+        """Generate the workflow configuration for a different backend engine."""
+        Log.info(f'Generate workflow configuration for {engine}')
+
+        # TODO: review the block below; it is taken unchanged from the original MR.
+        try:
+            Log.info("Getting job list...")
+            as_conf = AutosubmitConfig(expid, BasicConfig, YAMLParserFactory())
+            as_conf.check_conf_files(False)
+            submitter = Autosubmit._get_submitter(as_conf)
+            submitter.load_platforms(as_conf)
+            if len(submitter.platforms) == 0:
+                raise ValueError('Missing platform!')
+
+            packages_persistence = JobPackagePersistence(
+                os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid)
+            job_list = Autosubmit.load_job_list(expid, as_conf, notransitive=False, monitor=False)
+
+            Autosubmit._load_parameters(as_conf, job_list, submitter.platforms)
+
+            hpc_architecture = as_conf.get_platform()
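+            # Make sure every job has a platform assigned and its parameters resolved before the scripts are checked.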
+            for job in job_list.get_job_list():
+                if job.platform_name is None or job.platform_name == '':
+                    job.platform_name = hpc_architecture
+                job.platform = submitter.platforms[job.platform_name]
+                job.update_parameters(as_conf, job_list.parameters)
+
+            job_list.check_scripts(as_conf)
+        except AutosubmitError as e:
+            raise AutosubmitCritical(e.message, e.code, e.trace)
+        except AutosubmitCritical as e:
+            raise
+        except BaseException as e:
+            raise AutosubmitCritical("Error while checking the configuration files or loading the job_list", 7040,
+                                     str(e))
+        generator_class = get_engine_generator(engine)
+        parser = argparse.ArgumentParser()
+        generator_class.add_parse_args(parser)
+        generator_input_keys = vars(parser.parse_args([])).keys()
+        generator_kwargs = {key: getattr(args, key) for key in generator_input_keys}
+        generator_class.generate(job_list, as_conf, **generator_kwargs)
diff --git a/autosubmit/generators/__init__.py b/autosubmit/generators/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..1878d74b99f774b129363bb6931ca7f5596df194
--- /dev/null
+++ b/autosubmit/generators/__init__.py
@@ -0,0 +1,47 @@
+"""This module provides generators to produce workflow configurations for different backend engines."""
+from abc import ABC, abstractmethod
+from enum import Enum
+from importlib import import_module
+from typing import Type
+
+
+class Engine(Enum):
+    """Workflow manager engine flavors."""
+    aiida = 'aiida'
+
+    def __str__(self):
+        return self.value
+
+
+# TODO COMMENT: this could also be a Protocol, but I don't see a reason for that here.
+class AbstractGenerator(ABC):
+    """Abstract generator of workflows for a target engine."""
+
+    @staticmethod
+    @abstractmethod
+    def get_engine_name() -> str:
+        """The engine name used for the help text."""
+        raise NotImplementedError
+
+    @staticmethod
+    @abstractmethod
+    def add_parse_args(parser) -> None:
+        """Adds the arguments needed by a specific engine implementation to the parser."""
+        raise NotImplementedError
+
+    # TODO COMMENT: this could also be an __init__ plus a method, but a single method seemed easier;
+    # the implementation can then do whatever it needs.
+    @classmethod
+    @abstractmethod
+    def generate(cls, job_list, as_conf, **arg_options) -> None:
+        """Generates the engine workflow from the created Autosubmit workflow."""
+        raise NotImplementedError
+
+
+def get_engine_generator(engine: Engine) -> Type[AbstractGenerator]:
+    """Returns the ``Generator`` class of the module implementing the given engine."""
+    return import_module(f'autosubmit.generators.{engine.value}').Generator
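+
+# Usage sketch (for illustration): ``get_engine_generator(Engine.aiida)`` imports
+# ``autosubmit.generators.aiida`` and returns its module-level ``Generator`` class,
+# whose ``add_parse_args``/``generate`` methods are driven by the
+# ``autosubmit generate <expid> aiida`` sub-command wired up in autosubmit.py.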
+
+__all__ = [
+    'AbstractGenerator',
+    'Engine',
+    'get_engine_generator'
+]
diff --git a/autosubmit/generators/aiida.py b/autosubmit/generators/aiida.py
new file mode 100644
index 0000000000000000000000000000000000000000..13e33457fcd0be7179f0d7206e98402371fdc079
--- /dev/null
+++ b/autosubmit/generators/aiida.py
@@ -0,0 +1,364 @@
+"""The AiiDA generator for Autosubmit."""
+from pathlib import Path
+from functools import cached_property
+import warnings
+import re
+import yaml
+
+from autosubmitconfigparser.config.configcommon import AutosubmitConfig
+
+from autosubmit.job.job_list import JobList
+from autosubmit.generators import AbstractGenerator
+from autosubmit.platforms.platform import Platform
+from autosubmit.platforms.paramiko_platform import ParamikoPlatform
+from autosubmit.platforms.locplatform import LocalPlatform
+from autosubmit.platforms.slurmplatform import SlurmPlatform
+
+# Autosubmit task name separator (not to be confused with the task and chunk name separator).
+DEFAULT_SEPARATOR = '_'
+
+
+class Generator(AbstractGenerator):
+    """Generates an AiiDA workflow script that initializes all required AiiDA resources.
+
+    The generated file is structured as follows:
+    * header: information about the generation of the file
+    * imports: all required imports
+    * init: initialization of all Python resources that need to be instantiated once for the whole script
+    * create_orm_nodes: creation of AiiDA's object-relational mapping (ORM) nodes, i.e. the computers and codes
+    * workgraph_tasks: creation of the AiiDA-WorkGraph tasks
+    * workgraph_deps: linking of the dependencies between the AiiDA-WorkGraph tasks
+    * workgraph_submission: submission of the AiiDA-WorkGraph
+    """
+    # AiiDA Slurm options
+    # --cpus-per-task   -> num_cores_per_mpiproc
+    # --ntasks-per-node -> num_mpiprocs_per_machine
+    # --nodes           -> num_machines
+    # --mem             -> max_memory_kb (with some translation)
+    # --qos             -> qos
+    # --partition       -> queue_name
+    #
+    # Autosubmit Slurm options
+    # --cpus-per-task   -> NUMTHREADS (job)
+    # --ntasks-per-node -> TASKS (job)
+    # --nodes           -> NODES (job)
+    # --mem             -> MEMORY (job)
+    # --qos             -> CURRENT_QUEUE (job)
+    # --partition       -> PARTITION (job)
+    #
+    # TODO: investigate how the platform parameters interact with the job parameters.
+    #       Can the CURRENT_* parameters be specified by the user?
+    #       I think the CURRENT_* properties are just copied over from the platform.
+    #
+    # TODO: what is "MAX_PROCESSORS" used for? Not supported for the moment; the warning needs to be handled.
+    #
+    # TODO: EXECUTABLE is not supported at the moment.
+    # TODO: Platform.PROJECT -> sbatch -A
+    # TODO: in the expdef the PROJECT_TYPE should be git or local.
+    # SUPPORT_GLOBAL_KEYWORDS = ["WRAPPERS"]
+
+    # Keywords that are either transferred to AiiDA or resolved internally by Autosubmit;
+    # any other key triggers a warning in _validate().
+    SUPPORTED_JOB_KEYWORDS = ["NUMTHREADS", "TASKS", "NODES", "WALLCLOCK", "MEMORY", "PLATFORM",  # transferred to AiiDA
+                              "DEPENDENCIES", "FILE", "RUNNING"]  # resolved internally by Autosubmit
+    # NOTE: MAX_WALLCLOCK cannot be specified on an AiiDA computer, because AiiDA does not support
+    #       it as a computer parameter, so it is overwritten per task instead.
+    SUPPORTED_PLATFORM_KEYWORDS = ["TYPE", "HOST", "USER", "QUEUE", "SCRATCH_DIR", "MAX_WALLCLOCK",  # transferred to AiiDA
+                                   "PROJECT"]  # resolved internally by Autosubmit
+
+    def __init__(self, job_list: JobList, as_conf: AutosubmitConfig, output_dir: str):
+        if not (output_path := Path(output_dir)).exists():
+            raise ValueError(f"Given `output_dir` {output_path} does not exist.")
+        # TODO: the naming is a bit ambiguous, as output_path is not the same as output_dir.
+        self._output_path = (output_path / job_list.expid).absolute()
+        # TODO: we should probably raise an error here instead, to avoid accidentally overwriting files.
+        self._output_path.mkdir(exist_ok=True)
+        self._job_list = job_list
+        self._as_conf = as_conf
+
+    @classmethod
+    def generate(cls, job_list: JobList, as_conf: AutosubmitConfig, output_dir: str) -> None:
+        # TODO Question: in Autosubmit, are the scripts always on the local machine and then
+        #      copied over to the remote? What about EXECUTABLE?
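+        # Instantiate the generator, warn about unsupported configuration keys, and write
+        # the AiiDA-WorkGraph script together with a README into the output directory.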
+        self = cls(job_list, as_conf, output_dir)
+        self._validate()
+        workflow_script = self._generate_workflow_script()
+        (self._output_path / "submit_aiida_workflow.py").write_text(workflow_script)
+        (self._output_path / "README.md").write_text(self._generate_readme())
+
+    @staticmethod
+    def get_engine_name() -> str:
+        return "AiiDA"
+
+    @staticmethod
+    def add_parse_args(parser) -> None:
+        parser.add_argument('-o', '--output_dir', dest="output_dir", default=".", help='Output directory')
+
+    def _validate(self) -> None:
+        ## validate jobs
+        for job_name, job_conf in self._as_conf.starter_conf['JOBS'].items():
+            for key in job_conf.keys():
+                if key not in Generator.SUPPORTED_JOB_KEYWORDS:
+                    msg = f"Found key {key} in the configuration of job {job_name} that is not supported by the AiiDA generator."
+                    warnings.warn(msg)
+        ## validate platforms
+        for platform_name, platform_conf in self._as_conf.starter_conf['PLATFORMS'].items():
+            # only validate platforms that are used in jobs
+            if platform_name in self._platforms_used_in_job.keys():
+                for key, value in platform_conf.items():
+                    if key not in Generator.SUPPORTED_PLATFORM_KEYWORDS and value != '':
+                        msg = f"Found key {key} in the configuration of platform {platform_name} that is not supported by the AiiDA generator."
+                        warnings.warn(msg)
+
+    @cached_property
+    def _platforms_used_in_job(self) -> dict[str, Platform]:
+        platforms_used_in_jobs = {}
+        for job in self._job_list.get_all():
+            platforms_used_in_jobs[job.platform.name] = job.platform
+        return platforms_used_in_jobs
+
+    def _generate_workflow_script(self) -> str:
+        """Generates the AiiDA-WorkGraph workflow script from the Autosubmit database.
+
+        The ``autosubmit create`` command must have been already executed prior
+        to calling this function, so that the jobs are correctly loaded to
+        produce the AiiDA workflow.
+        """
+        header = self._generate_header_section()
+        imports = self._generate_imports_section()
+        init = self._generate_init_section()
+        create_orm_nodes = self._generate_create_orm_nodes_section()
+        workgraph_tasks = self._generate_workgraph_tasks_section()
+        workgraph_deps = self._generate_workgraph_deps_section()
+        workgraph_submission = self._generate_workgraph_submission_section()
+        return header + imports + init + create_orm_nodes + workgraph_tasks + workgraph_deps + workgraph_submission
+
+    def _generate_header_section(self) -> str:
+        return f"""# HEADER
+# This is an autogenerated file from experiment {self._job_list.expid}
+# The computer and codes are defined for the following platforms:
+# {list(self._platforms_used_in_job.keys())}
+
+"""
+
+    def _generate_imports_section(self) -> str:
+        return """# IMPORTS
+import aiida
+from aiida import orm
+from aiida_workgraph import WorkGraph
+from aiida.orm.utils.builders.computer import ComputerBuilder
+from aiida.common.exceptions import NotExistent
+from pathlib import Path
+import yaml
+
+"""
+
+    def _generate_init_section(self) -> str:
+        return """# INIT
+aiida.load_profile()
+wg = WorkGraph()
+tasks = {}
+
+"""
+
+    def _generate_create_orm_nodes_section(self) -> str:
+        # aiida computer
+        code_section = "# CREATE_ORM_NODES"
+        for platform in self._platforms_used_in_job.values():
+            if isinstance(platform, LocalPlatform):
+                computer_setup = {
+                    "label": f"{platform.name}",
+                    "hostname": f"{platform.host}",
+                    "work_dir": f"{platform.scratch}",
+                    "description": "",
+                    "transport": "core.local",
+                    "scheduler": "core.direct",
+                    "mpirun_command": "mpirun -np {tot_num_mpiprocs}",
+                    "mpiprocs_per_machine": 1,
+                    "default_memory_per_machine": None,
+                    "append_text": "",
+                    "prepend_text": "",
+                    "use_double_quotes": False,
+                    "shebang": "#!/bin/bash",
+                }
+            elif isinstance(platform, SlurmPlatform):
+                # TODO: do we need to handle invalid parameters, or is this done by Autosubmit?
+                #       => Autosubmit does the validation, we do not need to worry.
+                # if platform.processors_per_node is None:
+                #     raise ValueError("")
+                # if platform.max_wallclock is None:
+                #     raise ValueError("")
+                computer_setup = {
+                    "label": f"{platform.name}",
+                    "hostname": f"{platform.host}",
+                    "work_dir": f"{platform.scratch}",
+                    # "username": f"{platform.user}",  # does not work
+                    "description": "",
+                    "transport": "core.ssh",
+                    "scheduler": "core.slurm",
+                    "mpirun_command": "mpirun -np {tot_num_mpiprocs}",
+                    "mpiprocs_per_machine": platform.processors_per_node,
+                    "default_memory_per_machine": None,  # this is specified in the task options
+                    "append_text": "",
+                    "prepend_text": "",
+                    "use_double_quotes": False,
+                    "shebang": "#!/bin/bash",
+                }
+            else:
+                raise ValueError(f"Platform type {platform} not supported for engine aiida.")
+
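+            # The computer (and, below, code) setup parameters are dumped as YAML next to the
+            # generated script; the generated code references these files by absolute path.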
+            computer_setup_path = Path(self._output_path / f"{platform.name}/{platform.name}-setup.yml")
+            computer_setup_path.parent.mkdir(exist_ok=True)
+            computer_setup_path.write_text(yaml.dump(computer_setup))
+            create_computer = f"""
+try:
+    computer = orm.load_computer("{platform.name}")
+    print(f"Loaded computer {{computer.label!r}}")
+except NotExistent:
+    setup_path = Path("{computer_setup_path}")
+    config_kwargs = yaml.safe_load(setup_path.read_text())
+    computer = ComputerBuilder(**config_kwargs).new().store()
+    # TODO check if we should set this
+    #computer.configure(safe_interval=0.0)
+    #computer.set_minimum_job_poll_interval(0.0)
+
+    from aiida.transports.plugins.ssh import SshTransport
+    from aiida.transports import cli as transport_cli
+    default_kwargs = {{name: transport_cli.transport_option_default(name, computer) for name in SshTransport.get_valid_auth_params()}}
+    default_kwargs["port"] = int(default_kwargs["port"])
+    default_kwargs["timeout"] = float(default_kwargs["timeout"])
+    default_kwargs["username"] = "{platform.user}"
+    default_kwargs["key_filename"] = str(Path.home() / ".ssh" / "id_rsa")  # TODO: assumes the default SSH key; not sure why the default kwargs do not already provide this, as it is the default argument in the CLI
+    computer.configure(user=orm.User.collection.get_default(), **default_kwargs)
+    print(f"Created and stored computer {{computer.label}}")"""
+
+            # aiida bash code to run the script
+            code_setup = {
+                "computer": f"{platform.name}",
+                "filepath_executable": "/bin/bash",
+                "label": "bash",
+                "description": '',
+                "default_calc_job_plugin": 'core.shell',
+                "prepend_text": '',
+                "append_text": '',
+                "use_double_quotes": False,
+                "with_mpi": False
+            }
+
+            code_setup_path = Path(self._output_path / f"{platform.name}/bash@{platform.name}-setup.yml")
+            code_setup_path.parent.mkdir(exist_ok=True)
+            code_setup_path.write_text(yaml.dump(code_setup))
+            create_code = f"""
+try:
+    bash_code = orm.load_code("bash@{platform.name}")
+except NotExistent:
+    setup_path = Path("{code_setup_path}")
+    setup_kwargs = yaml.safe_load(setup_path.read_text())
+    setup_kwargs["computer"] = orm.load_computer(setup_kwargs["computer"])
+    bash_code = orm.InstalledCode(**setup_kwargs).store()
+    print(f"Created and stored bash@{{computer.label}}")"""
+            code_section += create_computer + create_code
+        code_section += "\n\n"
+        return code_section
+
+    def _generate_workgraph_tasks_section(self) -> str:
+        code_section = "# WORKGRAPH_TASKS"
+
+        for job in self._job_list.get_all():
+            script_name = job.create_script(self._as_conf)
+            script_path = Path(job._tmp_path, script_name)
+            script_text = script_path.read_text()
+            # Drop the Autosubmit header and tailer.
+            trimmed_script_text = re.findall(
+                r'# Autosubmit job(.*)# Autosubmit tailer',
+                script_text,
+                flags=re.DOTALL | re.MULTILINE)[0][1:-1]
+            trimmed_script_path = self._output_path / script_name
+            trimmed_script_path.write_text(trimmed_script_text)
+            create_task = f"""
+tasks["{job.name}"] = wg.add_task(
+    "ShellJob",
+    name = "{job.name}",
+    command = orm.load_code("bash@{job.platform.name}"),
+    arguments = ["{{script}}"],
+    nodes = {{"script": orm.SinglefileData("{trimmed_script_path}")}}
+)"""
+            # Translate per-job Autosubmit resources into ShellJob metadata options.
+            # TODO: is an empty-string check the right way to test whether a parameter has been
+            #       specified, or should we use hasattr(job, "memory")? Then we would get the default argument.
+            if job.memory != "":
+                # TODO: job.memory most likely needs to be converted to kB
+                # TODO: job.memory_per_task?
+                create_task += f"""
+tasks["{job.name}"].set({{"metadata.options.max_memory_kb": {job.memory}}})"""
+            if job.platform.max_wallclock is None:
+                # TODO: is this actually needed, or does Autosubmit already cover this logic?
+                if job.parameters["WALLCLOCK"] != "":
+                    wallclock_seconds = int(ParamikoPlatform.parse_time(job.wallclock).total_seconds())
+                else:
+                    wallclock_seconds = int(ParamikoPlatform.parse_time(job.platform.wallclock).total_seconds())
+                create_task += f"""
+tasks["{job.name}"].set({{"metadata.options.max_wallclock_seconds": {wallclock_seconds}}})"""
+            if job.partition != "":
+                create_task += f"""
+tasks["{job.name}"].set({{"metadata.options.queue_name": "{job.partition}"}})"""
+            #if job.platform.user != '':
+            #    create_task += f"""
+            #tasks["{job.name}"].set({{"metadata.options.account": "{job.platform.user}"}})"""
+
+            code_section += create_task
+        code_section += "\n\n"
+        return code_section
+
+    def _generate_workgraph_deps_section(self) -> str:
+        code_section = "# WORKGRAPH_DEPS"
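+        # Each edge (parent, child) of the Autosubmit dependency graph becomes a
+        # "waiting_on" link between the corresponding WorkGraph tasks.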
+        for edge in self._job_list.graph.edges:
+            code_section += f"""
+tasks["{edge[1]}"].waiting_on.add(tasks["{edge[0]}"])"""
+        code_section += "\n\n"
+        return code_section
+
+    def _generate_workgraph_submission_section(self) -> str:
+        return """# WORKGRAPH_SUBMISSION
+wg.run()"""  # TODO: change to submit once debugging is finished
+
+    def _generate_readme(self) -> str:
+        return f"""### Meta-information
+This file has been auto-generated from expid {self._job_list.expid}.
+
+### Instructions
+To run the workflow, please ensure that you have AiiDA installed and configured.
+For that, please refer to the
+[installation section of AiiDA](https://aiida.readthedocs.io/projects/aiida-core/en/stable/installation/index.html).
+Then you can run the generated workflow with:
+```bash
+python submit_aiida_workflow.py
+```
+
+### Node Creation
+
+The workflow creates the computer and code nodes as required from the
+autosubmit config. If the computer and code nodes have already been created,
+for example through a previous run, they are loaded. If you have changed the
+computer parameters in your autosubmit config, then you need to delete the
+corresponding computer and code nodes. Note that this has the disadvantage that
+it will delete all calculations that are associated with these nodes.
+It is therefore recommended to use the JOB parameters in autosubmit to override
+certain computer configs.
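+
+To inspect or remove the created nodes manually, the `verdi` CLI can be used,
+for example (a sketch, assuming a standard AiiDA installation):
+```bash
+verdi computer list
+verdi code list
+verdi node delete <PK>  # removes an outdated computer/code node together with its calculations
+```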
+"""
+
+
+__all__ = [
+    'Generator'
+]
diff --git a/autosubmit/history/internal_logging.py b/autosubmit/history/internal_logging.py
index f9b667814b62716e0216ac763ea3c470164de4cd..1716d7398942f84a6404477c76c215a2a5397e6e
--- a/autosubmit/history/internal_logging.py
+++ b/autosubmit/history/internal_logging.py
@@ -45,4 +45,4 @@ class Logging:
             os.makedirs(self.historiclog_dir_path)
 
     def get_log_file_path(self):
-        return os.path.join(self.historiclog_dir_path,"{}_log.txt".format(self.expid))
\ No newline at end of file
+        return os.path.join(self.historiclog_dir_path,"{}_log.txt".format(self.expid))
diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py
index 7dda872dbe58ec80f347fe12d78d6aaa17f0a905..35055cc8dd478b776d5566f9bb0adc1485081796
--- a/autosubmit/platforms/paramiko_platform.py
+++ b/autosubmit/platforms/paramiko_platform.py
@@ -1350,7 +1350,9 @@ class ParamikoPlatform(Platform):
             header = header.replace(
                 '%HYPERTHREADING_DIRECTIVE%', self.header.get_hyperthreading_directive(job))
         return header
-    def parse_time(self,wallclock):
+
+    @staticmethod
+    def parse_time(wallclock):
         # noinspection Annotator
         regex = re.compile(r'(((?P<hours>\d+):)((?P<minutes>\d+)))(:(?P<seconds>\d+))?')
         parts = regex.match(wallclock)