From 9e1216cc44d8d132bc9024c6dccf10747cd24d47 Mon Sep 17 00:00:00 2001 From: Alexander Goscinski Date: Tue, 29 Oct 2024 12:30:46 +0100 Subject: [PATCH 1/7] WIP integrate Aiida-engine --- autosubmit/autosubmit.py | 59 +++++++++++++++++++++++++- autosubmit/history/internal_logging.py | 2 +- bin/autosubmit | 26 ++++++------ 3 files changed, 72 insertions(+), 15 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 8d3516c37..12661f3e4 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -57,6 +57,7 @@ from .notifications.notifier import Notifier from .platforms.paramiko_submitter import ParamikoSubmitter from .platforms.platform import Platform from .migrate.migrate import Migrate +from .generators import Engine, get_engine_generator dialog = None from time import sleep @@ -87,7 +88,6 @@ import autosubmit.history.utils as HUtils import autosubmit.helpers.autosubmit_helper as AutosubmitHelper import autosubmit.statistics.utils as StatisticsUtils from autosubmit.helpers.utils import proccess_id, terminate_child_process, check_jobs_file_exists - from contextlib import suppress """ @@ -684,6 +684,23 @@ class Autosubmit: help='Select the status (one or more) to filter the list of jobs.') subparser.add_argument('-t', '--target', type=str, default="FAILED", metavar='STATUS', help='Final status of killed jobs. Default is FAILED.') + + # Generate + subparser = subparsers.add_parser( + 'generate', description='Generate a workflow definition for a different Workflow Manager', + argument_default=argparse.SUPPRESS) + subparser.add_argument('expid', help='experiment identifier') + subparser.add_argument('-e', '--engine', type=str.lower, + help='The target Workflow Manager engine', choices=[engine.value for engine in Engine]) + #subparser.add_argument('args', nargs='?') + + # Needed? 
We need to expose subcommands for each engine + #if len(sys.argv) > 1 and len(sys.argv[1]) > 1 and sys.argv[1] in ['generate']: + # args, options = parser.parse_known_args() + #else: + # options = [] + # args = parser.parse_args() + args, unknown = parser.parse_known_args() if args.version: Log.info(Autosubmit.autosubmit_version) @@ -790,6 +807,9 @@ class Autosubmit: return Autosubmit.cat_log(args.ID, args.file, args.mode, args.inspect) elif args.command == 'stop': return Autosubmit.stop(args.expid, args.force, args.all, args.force_all, args.cancel, args.filter_status, args.target) + elif args.command == 'generate': + return Autosubmit.generate_workflow(args.expid, Engine[args.engine], options=[]) + @staticmethod def _init_logs(args, console_level='INFO', log_level='DEBUG', expid='None'): Log.set_console_level(console_level) @@ -6104,5 +6124,42 @@ class Autosubmit: terminate_child_process(expid) + @staticmethod + def generate_workflow(expid: str, engine: Engine, options: List[str]) -> None: + """Generate the workflow configuration for a different backend engine.""" + Log.info(f'Generate workflow configuration for {engine}') + + try: + Log.info("Getting job list...") + as_conf = AutosubmitConfig(expid, BasicConfig, YAMLParserFactory()) + as_conf.check_conf_files(False) + + submitter = Autosubmit._get_submitter(as_conf) + submitter.load_platforms(as_conf) + if len(submitter.platforms) == 0: + raise ValueError('Missing platform!') + + packages_persistence = JobPackagePersistence( + os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) + job_list = Autosubmit.load_job_list(expid, as_conf, notransitive=False, monitor=False) + + Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) + + hpc_architecture = as_conf.get_platform() + for job in job_list.get_job_list(): + if job.platform_name is None or job.platform_name == '': + job.platform_name = hpc_architecture + job.platform = submitter.platforms[job.platform_name] + job.update_parameters(as_conf, job_list.parameters) + + job_list.check_scripts(as_conf) + except AutosubmitError as e: + raise AutosubmitCritical(e.message, e.code, e.trace) + except AutosubmitCritical as e: + raise + except BaseException as e: + raise AutosubmitCritical("Error while checking the configuration files or loading the job_list", 7040, + str(e)) + get_engine_generator(engine)(job_list, as_conf, [f'--experiment={expid}', *options]) diff --git a/autosubmit/history/internal_logging.py b/autosubmit/history/internal_logging.py index f9b667814..1716d7398 100644 --- a/autosubmit/history/internal_logging.py +++ b/autosubmit/history/internal_logging.py @@ -45,4 +45,4 @@ class Logging: os.makedirs(self.historiclog_dir_path) def get_log_file_path(self): - return os.path.join(self.historiclog_dir_path,"{}_log.txt".format(self.expid)) \ No newline at end of file + return os.path.join(self.historiclog_dir_path,"{}_log.txt".format(self.expid)) diff --git a/bin/autosubmit b/bin/autosubmit index d08f47593..e6f1cdfbb 100755 --- a/bin/autosubmit +++ b/bin/autosubmit @@ -55,19 +55,19 @@ def exit_from_error(e: BaseException): # noinspection PyProtectedMember def main(): - try: - return_value = Autosubmit.parse_args() - if os.path.exists(os.path.join(Log.file_path, "autosubmit.lock")): - os.remove(os.path.join(Log.file_path, "autosubmit.lock")) - if type(return_value) is int: - os._exit(return_value) - os._exit(0) - except AutosubmitError as e: - exit_from_error(e) - except AutosubmitCritical as e: - exit_from_error(e) - except BaseException as e: - 
exit_from_error(e) + #try: + return_value = Autosubmit.parse_args() + if os.path.exists(os.path.join(Log.file_path, "autosubmit.lock")): + os.remove(os.path.join(Log.file_path, "autosubmit.lock")) + if type(return_value) is int: + os._exit(return_value) + os._exit(0) + #except AutosubmitError as e: + # exit_from_error(e) + #except AutosubmitCritical as e: + # exit_from_error(e) + #except BaseException as e: + # exit_from_error(e) if __name__ == "__main__": main() -- GitLab From 1c242c2b76938ba4d883d0efbf2f92c0171429c1 Mon Sep 17 00:00:00 2001 From: Alexander Goscinski Date: Tue, 29 Oct 2024 17:20:23 +0100 Subject: [PATCH 2/7] add aiida generator --- autosubmit/generators/__init__.py | 31 +++ autosubmit/generators/aiida.py | 437 ++++++++++++++++++++++++++++++ 2 files changed, 468 insertions(+) create mode 100644 autosubmit/generators/__init__.py create mode 100644 autosubmit/generators/aiida.py diff --git a/autosubmit/generators/__init__.py b/autosubmit/generators/__init__.py new file mode 100644 index 000000000..25ae600e1 --- /dev/null +++ b/autosubmit/generators/__init__.py @@ -0,0 +1,31 @@ + +from enum import Enum +from importlib import import_module +from typing import Callable, cast + +"""This module provides generators to produce workflow configurations for different backend engines.""" + +class Engine(Enum): + """Workflow Manager engine flavors.""" + aiida = 'aiida' + + def __str__(self): + return self.value + + +# TODO: use typing.Protocol instead of object when Py>=3.8 +class GenerateProto: + """Need a protocol to define the type returned by importlib.""" + generate: Callable + + +def get_engine_generator(engine: Engine) -> Callable: + """Dynamically loads the engine generate function.""" + generator_function = cast(GenerateProto, import_module(f'autosubmit.generators.{engine.value}')) + return generator_function.generate + + +__all__ = [ + 'Engine', + 'get_engine_generator' +] diff --git a/autosubmit/generators/aiida.py b/autosubmit/generators/aiida.py new file mode 100644 index 000000000..f5a2b2c86 --- /dev/null +++ b/autosubmit/generators/aiida.py @@ -0,0 +1,437 @@ +import argparse +import tempfile +from enum import Enum +from pathlib import Path +from typing import List + +from autosubmitconfigparser.config.configcommon import AutosubmitConfig +from autosubmitconfigparser.config.basicconfig import BasicConfig + +from autosubmit.job.job_list import JobList, Job + +import aiida + +"""The PyFlow generator for Autosubmit.""" + +# Autosubmit Task name separator (not to be confused with task and chunk name separator). +DEFAULT_SEPARATOR = '_' + + +class Running(Enum): + """The Running level of an Autosubmit task.""" + ONCE = 'once' + MEMBER = 'member' + CHUNK = 'chunk' + + def __str__(self): + return self.value + + +# Defines how many ``-``'s are replaced by a ``/`` for +# each Autosubmit hierarchy level (to avoid using an if/elif/else). +REPLACE_COUNT = { + Running.ONCE.value: 1, + Running.MEMBER.value: 3, + Running.CHUNK.value: 4 +} + + +def _autosubmit_id_to_ecflow_id(job_id: str, running: str): + """Given an Autosubmit ID, create the node ID for ecFlow (minus heading ``/``).""" + replace_count = REPLACE_COUNT[running] + return job_id.replace(DEFAULT_SEPARATOR, '/', replace_count) + + +def _parse_args(args) -> argparse.Namespace: + parser = argparse.ArgumentParser( + prog='autosubmit generate ... 
engine=pyflow', + description='Produces a valid PyFlow workflow configuration given an Autosubmit experiment ID', + epilog='This program needs access to an Autosubmit installation' + ) + parser.add_argument('-e', '--experiment', required=True, help='Autosubmit experiment ID') + parser.add_argument('-d', '--deploy', default=False, action='store_true', help='Deploy to ecFlow or not') + parser.add_argument('-o', '--output', default=tempfile.gettempdir(), help='Output directory') + parser.add_argument('-s', '--server', default='localhost', + help='ecFlow server hostname or IP (only used if deploy=True)') + parser.add_argument('-p', '--port', default=3141, help='ecFlow server port (only used if deploy=True)') + parser.add_argument('-g', '--graph', default=False, action='store_true', help='Print the DOT plot') + parser.add_argument('-q', '--quiet', default=False, action='store_true') + + return parser.parse_args(args) + +#def _create_ecflow_suite( +# experiment_id: str, +# start_dates: List[str], +# members: List[str], +# chunks: List[int], +# jobs: List[Job], +# #server_host: str, +# #output_dir: str, +# as_conf: AutosubmitConfig) -> Suite: +# """Replicate the vanilla workflow graph structure.""" +# +# # From: https://pyflow-workflow-generator.readthedocs.io/en/latest/content/introductory-course/getting-started.html +# # /scratch is a base directory for ECF_FILES and ECF_HOME +# breakpoint() +# scratch_dir = None #Path(Path(output_dir).absolute(), 'scratch') !TODO +# # /scratch/files is the ECF_FILES, where ecflow_server looks for ecf scripts if they are not in their default location +# files_dir = scratch_dir / 'files' +# # /scratch/out is the ECF_HOME, the home of all ecFlow files, $CWD +# out_dir = scratch_dir / 'out' +# +# if not files_dir.exists(): +# files_dir.mkdir(parents=True, exist_ok=True) +# +# if not out_dir.exists(): +# out_dir.mkdir(parents=True, exist_ok=True) +# +# # First we create a suite with the same ID as the Autosubmit experiment, +# # and families for each Autosubmit hierarchy level. We use control variables +# # such as home (ECF_HOME), and files (ECF_FILES), but there are others that +# # can be used too, like include (ECF_INCLUDE), out (ECF_OUT), and extn +# # (ECF_EXTN, defaults to the extension .ecf). +# # NOTE: PyFlow does not work very well with MyPy: https://github.com/ecmwf/pyflow/issues/5 +# with Suite( # typing: ignore +# experiment_id, +# host=pf.LocalHost(server_host), +# defstatus=pf.state.suspended, # type: ignore +# home=str(out_dir), # type: ignore +# files=str(files_dir) # type: ignore +# ) as s: # typing: ignore +# for start_date in start_dates: +# with AnchorFamily(start_date, START_DATE=start_date): # type: ignore +# for member in members: +# with AnchorFamily(member, MEMBER=member): # type: ignore +# for chunk in chunks: +# AnchorFamily(str(chunk), CHUNK=chunk) +# # PyFlow API makes it very easy to create tasks having the ecFlow ID. +# # Due to how we expanded the Autosubmit graph to include the ID's, and how +# # we structured this suite, an Autosubmit ID can be seamlessly translated +# # to an ecFlow ID by simply replacing `_`'s by `/`, ignoring the `_`'s in +# # tasks names. +# # +# # This means that `a000_REMOTE_SETUP` from Autosubmit is `a000/REMOTE_SETUP` +# # in ecFlow, `a000_20220401_fc0_INI` is `a000/20220401/fc0/INI`, and so on. 
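# [Editor's note] Illustration only, not part of the patch: a minimal sketch of the ID
# translation described in the comment above, using the REPLACE_COUNT values defined
# earlier in this file (the job IDs below are made-up examples).
REPLACE_COUNT_EXAMPLE = {'once': 1, 'member': 3, 'chunk': 4}

def to_ecflow_id(job_id: str, running: str) -> str:
    # Only the first N '_' separators become '/', so underscores inside section
    # names (e.g. REMOTE_SETUP) are preserved.
    return job_id.replace('_', '/', REPLACE_COUNT_EXAMPLE[running])

assert to_ecflow_id('a000_REMOTE_SETUP', 'once') == 'a000/REMOTE_SETUP'
assert to_ecflow_id('a000_20220401_fc0_INI', 'member') == 'a000/20220401/fc0/INI'
assert to_ecflow_id('a000_20220401_fc0_1_SIM', 'chunk') == 'a000/20220401/fc0/1/SIM'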
+# for job in jobs: +# ecflow_node = _autosubmit_id_to_ecflow_id(job.long_name, job.running) +# if job.split is not None and job.split > 0: +# t = Task(f'{job.section}_{job.split}', SPLIT=job.split) +# else: +# t = Task(job.section) +# +# # Find the direct parent of the task, based on the Autosubmit task ID. +# # Start from the Suite, and skip the first (suite), and the last (task) +# # as we know we can discard these. +# parent_node = s +# for node in ecflow_node.split('/')[1:-1]: +# parent_node = parent_node[node] +# # We just need to prevent adding a node twice since creating a task automatically adds +# # it to the suite in the context. And simply call ``add_node`` and we should have it. +# # TODO: use mapping.keys when Py>=3.8 or 3.9? if t.name not in list(parent_node.children.mapping.keys()): +# if t.name not in [child.name for child in parent_node.children]: +# parent_node.add_node(t) +# +# # Dependencies +# for parent in job.parents: +# dependency_node = _autosubmit_id_to_ecflow_id(parent.long_name, parent.running) +# parent_node = s +# for node in dependency_node.split('/')[1:-1]: +# parent_node = parent_node[node] +# parent_key = parent.section +# if parent.split is not None and parent.split > 0: +# parent_key = f'{parent_key}_{parent.split}' +# dependency_node = parent_node[parent_key] +# +# # In case we ever need to use the pre-processed file. +# # +# # script_name = job.create_script(as_conf) +# # script_text = open(Path(job._tmp_path, script_name)).read() +# # # Let's drop the Autosubmit header and tailed. +# # script_text = re.findall( +# # r'# Autosubmit job(.*)# Autosubmit tailer', +# # script_text, +# # flags=re.DOTALL | re.MULTILINE)[0][1:-1] +# # t.script = script_text +# +# # Operator overloaded in PyFlow. This creates a dependency. +# dependency_node >> t +# +# # Script +# # N.B.: We used the code below in the beginning, but later we realized it would +# # not work. In Autosubmit, the FILE: $FILE is a file, relative to the AS +# # Project folder. The $FILE template script is then pre-processed to be +# # executed by AS. That is different from ecFlow, where the Task Script is +# # value is pre-processed (i.e. if the value is ``templates/local_script.sh`` +# # that value is treated as a string when included in the final job file) +# # to generate the ecFlow job file. So ecFlow pre-processes the ``.script`` +# # value, whereas Autosubmit loads the ``FILE`` script and pre-processes it. +# # +# # In order to have a similar behavior, we insert the contents of the AS +# # template script as the ecFlow task. That way, the template script (now +# # a Task Script in ecFlow) will be pre-processed by ecFlow. +# # +# # The variables may still need to be manually adjusted, but once that is +# # done, the script should then be ready to be executed (i.e. ported). +# # FIXME +# # t.script = job.file +# # with open(Path(as_conf.get_project_dir(), job.file)) as f: +# # t.script = f.read() +# t.script = 'sleep 5' +# +# return s + +def _create_aiida_workflow(): + pass + +def generate(job_list: JobList, as_conf: AutosubmitConfig, options: List[str]) -> None: + """Generates a PyFlow workflow using Autosubmit database. + + The ``autosubmit create`` command must have been already executed prior + to calling this function. This is so that the jobs are correctly loaded + to produce the PyFlow workflow. 
+ + :param job_list: ``JobList`` Autosubmit object, that contains the parameters, jobs, and graph + :param as_conf: Autosubmit configuration + :param options: a list of strings with arguments (equivalent to sys.argv), passed to argparse + """ + # I Remove for now args usage, I am not sure if this is needed as server_host and output_dir can be referred from HPC_ARCH. But I see the general need that the engine needs extra information that cannot be inferred from an autosubmit config file + # TODO args that are needed + # - aiida-profile: We need to store at least the scripts that are internally created here. In principle we could create them to an output path. Maybe this is not needed if we put this into the script + # - output-folder: the folder where to put the script + # + # TODO Questions: + # - Are in autosubmit the scripts always on the local machine? + # + #args: argparse.Namespace = _parse_args(options) + + # TODO will becom an output + output_path = Path("/home/alexgo/code/autosubmit/generated") + + + aiida_workflow_script_init = \ + """import aiida +from aiida import orm +from aiida_workgraph import WorkGraph +from aiida.orm.utils.builders.computer import ComputerBuilder +from aiida.common.exceptions import NotExistent +from pathlib import Path + +aiida.load_profile() + +wg = WorkGraph() +tasks = {} + """ + + # aiida computer + unique_platforms = {} + for job in job_list.get_all(): + unique_platforms[job.platform.name] = job.platform # TODO is name unique? + + aiida_workflow_script_computer = """""" + for platform in unique_platforms: + + # TODO replace with ComputerBuilder since it has same syntax as in verdi + #try: + # computer = orm.load_computer(computer_kwargs["label"]) + #except NotExistent: + # computer_builder = ComputerBuilder(**computer_kwargs) + # computer = computer_builder.new() + # computer.store() + + #computer.set_minimum_job_poll_interval(0.0) + #computer.configure() + from autosubmit.platforms.locplatform import LocalPlatform + from autosubmit.platforms.slurmplatform import SlurmPlatform + if isinstance(job.platform, LocalPlatform): + create_computer = f""" +try: + computer = orm.load_computer("{job.platform.name}") # The computer label can also be omitted here +except NotExistent: + + computer = orm.Computer( + label = "{platform.name}" + hostname = "{platform.host}" + work_dir = "{platform.scratch}" + mpiprocs_per_machine = {platform.processors_per_node} # TODO used? + mpirun_command = "mpirun -np {{tot_num_mpiprocs}}" # TODO used? 
+ description = "" + transport_type = "core.local" + scheduler_type = "core.direct" + shebang = "/bin/bash" + ) + """ + elif isinstance(job.platform, SlurmPlatform): + create_computer = f""" +try: + computer = orm.load_computer("{platform.name}") # The computer label can also be omitted here +except NotExistent: + computer = orm.Computer( + label = "{platform.name}" + hostname = "{platform.host}" + work_dir = "{platform.scratch}" + mpiprocs_per_machine = {platform.processors_per_node} + mpirun_command = "mpirun -np {{tot_num_mpiprocs}}" + description = "" + transport_type = "core.ssh" + scheduler_type = "core.slurm" + append_text = "" + prepend_text = "" + use_double_quotes = False, + shebang = "/bin/bash" + ) + """ + else: + raise ValueError(f"Platform type {job.platform} not supported for engine aiida") + + # aiida bash code to run script + create_code = f""" +try: + bash_code = orm.load_code(f"bash@{{computer.label}}") +except NotExistent: + executable_path = Path("{trimmed_script_path}") + bash_code = orm.InstalledCode( + computer=computer, + filepath_executable="/bin/bash", + label="bash", + description='', + default_calc_job_plugin='core.shell', + prepend_text='', + append_text='', + use_double_quotes=False, + with_mpi=False + ).store() + print(f"Created and stored bash@{{computer.label}}") + """ + + + # aiida tasks + aiida_workflow_script_tasks = "" + for job in job_list.get_all(): + script_name = job.create_script(as_conf) + script_path = Path(job._tmp_path, script_name) + script_text = open(script_path).read() + # Let's drop the Autosubmit header and tailed. + import re + trimmed_script_text = re.findall( + r'# Autosubmit job(.*)# Autosubmit tailer', + script_text, + flags=re.DOTALL | re.MULTILINE)[0][1:-1] + trimmed_script_path = output_path / script_name + with open(trimmed_script_path, mode="w") as trimmed_script_file: + trimmed_script_file.write(trimmed_script_text) + create_task = f""" +tasks["{job.name}"] = wg.add_task( + "ShellJob", + name="{job.name}", + command=bash_code, + arguments=["{{script}}"], + nodes={{"script": orm.SinglefileData("{trimmed_script_path}")}} +) +tasks["{job.name}"].set({{"metadata.computer": computer}}) +# TODO set default_memory_per_machine = {job.parameters['MEMORY']} + """ + aiida_workflow_script_tasks += create_computer + create_code + create_task + + aiida_workflow_script_deps = "\n" + for edge in job_list.graph.edges: + add_edges = f""" +tasks["{edge[1]}"].waiting_on.add(tasks["{edge[0]}"])""" + aiida_workflow_script_deps += add_edges + aiida_workflow_script_deps = "\n" + + + aiida_workflow_script_run = """ +wg.submit()""" + aiida_workflow_script_text = aiida_workflow_script_init + aiida_workflow_script_tasks + aiida_workflow_script_deps + aiida_workflow_script_run + (output_path / "submit_aiida_workflow.py").write_text(aiida_workflow_script_text) + + # # TODO: Is this important? Do we need it if we directly create the graph? + # #ecflow_node = _autosubmit_id_to_ecflow_id(job.long_name, job.running) + # #if job.split is not None and job.split > 0: + # # t = Task(f'{job.section}_{job.split}', SPLIT=job.split) + # #else: + # # t = Task(job.section) + + # # TODO I think this is ecflow specific and we can reomve it + # ## Find the direct parent of the task, based on the Autosubmit task ID. + # ## Start from the Suite, and skip the first (suite), and the last (task) + # ## as we know we can discard these. 
+ # #parent_node = s + # #for node in ecflow_node.split('/')[1:-1]: + # # parent_node = parent_node[node] + # ## We just need to prevent adding a node twice since creating a task automatically adds + # ## it to the suite in the context. And simply call ``add_node`` and we should have it. + # ## TODO: use mapping.keys when Py>=3.8 or 3.9? if t.name not in list(parent_node.children.mapping.keys()): + # #if t.name not in [child.name for child in parent_node.children]: + # # parent_node.add_node(t) + + # ## Dependencies + # #for parent in job.parents: + # # dependency_node = _autosubmit_id_to_ecflow_id(parent.long_name, parent.running) + # # parent_node = s + # # for node in dependency_node.split('/')[1:-1]: + # # parent_node = parent_node[node] + # # parent_key = parent.section + # # if parent.split is not None and parent.split > 0: + # # parent_key = f'{parent_key}_{parent.split}' + # # dependency_node = parent_node[parent_key] + + # # # In case we ever need to use the pre-processed file. + # # # + # # # script_name = job.create_script(as_conf) + # # # script_text = open(Path(job._tmp_path, script_name)).read() + # # # # Let's drop the Autosubmit header and tailed. + # # # script_text = re.findall( + # # # r'# Autosubmit job(.*)# Autosubmit tailer', + # # # script_text, + # # # flags=re.DOTALL | re.MULTILINE)[0][1:-1] + # # # t.script = script_text + + # # # Operator overloaded in PyFlow. This creates a dependency. + # # dependency_node >> t + + # # Script + # # N.B.: We used the code below in the beginning, but later we realized it would + # # not work. In Autosubmit, the FILE: $FILE is a file, relative to the AS + # # Project folder. The $FILE template script is then pre-processed to be + # # executed by AS. That is different from ecFlow, where the Task Script is + # # value is pre-processed (i.e. if the value is ``templates/local_script.sh`` + # # that value is treated as a string when included in the final job file) + # # to generate the ecFlow job file. So ecFlow pre-processes the ``.script`` + # # value, whereas Autosubmit loads the ``FILE`` script and pre-processes it. + # # + # # In order to have a similar behavior, we insert the contents of the AS + # # template script as the ecFlow task. That way, the template script (now + # # a Task Script in ecFlow) will be pre-processed by ecFlow. + # # + # # The variables may still need to be manually adjusted, but once that is + # # done, the script should then be ready to be executed (i.e. ported). 
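# [Editor's note] Illustration only, not part of the patch: the generator trims the
# Autosubmit header and tailer from the rendered job script with the same regex already
# used in generate() earlier in this file; a minimal sketch on a made-up script text:
import re

def strip_autosubmit_wrapper(script_text: str) -> str:
    # Keep only the body between the '# Autosubmit job' and '# Autosubmit tailer' markers.
    return re.findall(r'# Autosubmit job(.*)# Autosubmit tailer',
                      script_text, flags=re.DOTALL | re.MULTILINE)[0][1:-1]

assert strip_autosubmit_wrapper('# Autosubmit job\nsleep 5\n# Autosubmit tailer\n') == 'sleep 5'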
+ # # FIXME + # # t.script = job.file + # # with open(Path(as_conf.get_project_dir(), job.file)) as f: + # # t.script = f.read() + # t.script = 'sleep 5' + + #suite = _create_ecflow_suite( + # experiment_id=expid, + # start_dates=start_dates, + # members=members, + # chunks=chunks, + # jobs=job_list.get_all(), + # #server_host=args.server, + # #output_dir=args.output, + # as_conf=as_conf + #) + + #suite.check_definition() + #if not args.quiet: + # print(suite) + + #if args.deploy: + # suite.deploy_suite(overwrite=True) # type: ignore + # suite.replace_on_server(host=args.server, port=args.port) + + +__all__ = [ + 'generate' +] -- GitLab From 393f4c6eb49b1741ae8d3b9ec3df3ba1965a42bc Mon Sep 17 00:00:00 2001 From: Alexander Goscinski Date: Wed, 30 Oct 2024 09:14:52 +0100 Subject: [PATCH 3/7] Checkpoint: Runs but expects --- autosubmit/generators/aiida.py | 381 +++++++++------------------------ 1 file changed, 96 insertions(+), 285 deletions(-) diff --git a/autosubmit/generators/aiida.py b/autosubmit/generators/aiida.py index f5a2b2c86..5f465069f 100644 --- a/autosubmit/generators/aiida.py +++ b/autosubmit/generators/aiida.py @@ -59,130 +59,6 @@ def _parse_args(args) -> argparse.Namespace: return parser.parse_args(args) -#def _create_ecflow_suite( -# experiment_id: str, -# start_dates: List[str], -# members: List[str], -# chunks: List[int], -# jobs: List[Job], -# #server_host: str, -# #output_dir: str, -# as_conf: AutosubmitConfig) -> Suite: -# """Replicate the vanilla workflow graph structure.""" -# -# # From: https://pyflow-workflow-generator.readthedocs.io/en/latest/content/introductory-course/getting-started.html -# # /scratch is a base directory for ECF_FILES and ECF_HOME -# breakpoint() -# scratch_dir = None #Path(Path(output_dir).absolute(), 'scratch') !TODO -# # /scratch/files is the ECF_FILES, where ecflow_server looks for ecf scripts if they are not in their default location -# files_dir = scratch_dir / 'files' -# # /scratch/out is the ECF_HOME, the home of all ecFlow files, $CWD -# out_dir = scratch_dir / 'out' -# -# if not files_dir.exists(): -# files_dir.mkdir(parents=True, exist_ok=True) -# -# if not out_dir.exists(): -# out_dir.mkdir(parents=True, exist_ok=True) -# -# # First we create a suite with the same ID as the Autosubmit experiment, -# # and families for each Autosubmit hierarchy level. We use control variables -# # such as home (ECF_HOME), and files (ECF_FILES), but there are others that -# # can be used too, like include (ECF_INCLUDE), out (ECF_OUT), and extn -# # (ECF_EXTN, defaults to the extension .ecf). -# # NOTE: PyFlow does not work very well with MyPy: https://github.com/ecmwf/pyflow/issues/5 -# with Suite( # typing: ignore -# experiment_id, -# host=pf.LocalHost(server_host), -# defstatus=pf.state.suspended, # type: ignore -# home=str(out_dir), # type: ignore -# files=str(files_dir) # type: ignore -# ) as s: # typing: ignore -# for start_date in start_dates: -# with AnchorFamily(start_date, START_DATE=start_date): # type: ignore -# for member in members: -# with AnchorFamily(member, MEMBER=member): # type: ignore -# for chunk in chunks: -# AnchorFamily(str(chunk), CHUNK=chunk) -# # PyFlow API makes it very easy to create tasks having the ecFlow ID. -# # Due to how we expanded the Autosubmit graph to include the ID's, and how -# # we structured this suite, an Autosubmit ID can be seamlessly translated -# # to an ecFlow ID by simply replacing `_`'s by `/`, ignoring the `_`'s in -# # tasks names. 
-# # -# # This means that `a000_REMOTE_SETUP` from Autosubmit is `a000/REMOTE_SETUP` -# # in ecFlow, `a000_20220401_fc0_INI` is `a000/20220401/fc0/INI`, and so on. -# for job in jobs: -# ecflow_node = _autosubmit_id_to_ecflow_id(job.long_name, job.running) -# if job.split is not None and job.split > 0: -# t = Task(f'{job.section}_{job.split}', SPLIT=job.split) -# else: -# t = Task(job.section) -# -# # Find the direct parent of the task, based on the Autosubmit task ID. -# # Start from the Suite, and skip the first (suite), and the last (task) -# # as we know we can discard these. -# parent_node = s -# for node in ecflow_node.split('/')[1:-1]: -# parent_node = parent_node[node] -# # We just need to prevent adding a node twice since creating a task automatically adds -# # it to the suite in the context. And simply call ``add_node`` and we should have it. -# # TODO: use mapping.keys when Py>=3.8 or 3.9? if t.name not in list(parent_node.children.mapping.keys()): -# if t.name not in [child.name for child in parent_node.children]: -# parent_node.add_node(t) -# -# # Dependencies -# for parent in job.parents: -# dependency_node = _autosubmit_id_to_ecflow_id(parent.long_name, parent.running) -# parent_node = s -# for node in dependency_node.split('/')[1:-1]: -# parent_node = parent_node[node] -# parent_key = parent.section -# if parent.split is not None and parent.split > 0: -# parent_key = f'{parent_key}_{parent.split}' -# dependency_node = parent_node[parent_key] -# -# # In case we ever need to use the pre-processed file. -# # -# # script_name = job.create_script(as_conf) -# # script_text = open(Path(job._tmp_path, script_name)).read() -# # # Let's drop the Autosubmit header and tailed. -# # script_text = re.findall( -# # r'# Autosubmit job(.*)# Autosubmit tailer', -# # script_text, -# # flags=re.DOTALL | re.MULTILINE)[0][1:-1] -# # t.script = script_text -# -# # Operator overloaded in PyFlow. This creates a dependency. -# dependency_node >> t -# -# # Script -# # N.B.: We used the code below in the beginning, but later we realized it would -# # not work. In Autosubmit, the FILE: $FILE is a file, relative to the AS -# # Project folder. The $FILE template script is then pre-processed to be -# # executed by AS. That is different from ecFlow, where the Task Script is -# # value is pre-processed (i.e. if the value is ``templates/local_script.sh`` -# # that value is treated as a string when included in the final job file) -# # to generate the ecFlow job file. So ecFlow pre-processes the ``.script`` -# # value, whereas Autosubmit loads the ``FILE`` script and pre-processes it. -# # -# # In order to have a similar behavior, we insert the contents of the AS -# # template script as the ecFlow task. That way, the template script (now -# # a Task Script in ecFlow) will be pre-processed by ecFlow. -# # -# # The variables may still need to be manually adjusted, but once that is -# # done, the script should then be ready to be executed (i.e. ported). -# # FIXME -# # t.script = job.file -# # with open(Path(as_conf.get_project_dir(), job.file)) as f: -# # t.script = f.read() -# t.script = 'sleep 5' -# -# return s - -def _create_aiida_workflow(): - pass - def generate(job_list: JobList, as_conf: AutosubmitConfig, options: List[str]) -> None: """Generates a PyFlow workflow using Autosubmit database. 
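# [Editor's note] Illustration only, not part of the patch: the hunk below makes the
# generated submit script dump each platform to a <name>-setup.yml file and build the
# AiiDA computer from it. Roughly, the emitted code behaves like this sketch (the
# "local-example" label and setup path are made up; an AiiDA profile must already exist):
import yaml
import aiida
from aiida import orm
from aiida.orm.utils.builders.computer import ComputerBuilder
from aiida.common.exceptions import NotExistent
from pathlib import Path

aiida.load_profile()
try:
    computer = orm.load_computer("local-example")
except NotExistent:
    config_kwargs = yaml.safe_load(Path("local-example-setup.yml").read_text())
    computer = ComputerBuilder(**config_kwargs).new().store()
    computer.set_minimum_job_poll_interval(0.0)
    computer.configure()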
@@ -196,15 +72,15 @@ def generate(job_list: JobList, as_conf: AutosubmitConfig, options: List[str]) - """ # I Remove for now args usage, I am not sure if this is needed as server_host and output_dir can be referred from HPC_ARCH. But I see the general need that the engine needs extra information that cannot be inferred from an autosubmit config file # TODO args that are needed - # - aiida-profile: We need to store at least the scripts that are internally created here. In principle we could create them to an output path. Maybe this is not needed if we put this into the script - # - output-folder: the folder where to put the script + # - output-folder: the folder where to put the scripts # # TODO Questions: - # - Are in autosubmit the scripts always on the local machine? + # - Are in autosubmit the scripts always on the local machine and then copied over to remote? + # - How is the memory handled? # #args: argparse.Namespace = _parse_args(options) - # TODO will becom an output + # TODO will becom an argument output_path = Path("/home/alexgo/code/autosubmit/generated") @@ -215,6 +91,7 @@ from aiida_workgraph import WorkGraph from aiida.orm.utils.builders.computer import ComputerBuilder from aiida.common.exceptions import NotExistent from pathlib import Path +import yaml aiida.load_profile() @@ -227,82 +104,101 @@ tasks = {} for job in job_list.get_all(): unique_platforms[job.platform.name] = job.platform # TODO is name unique? - aiida_workflow_script_computer = """""" - for platform in unique_platforms: - - # TODO replace with ComputerBuilder since it has same syntax as in verdi - #try: - # computer = orm.load_computer(computer_kwargs["label"]) - #except NotExistent: - # computer_builder = ComputerBuilder(**computer_kwargs) - # computer = computer_builder.new() - # computer.store() + aiida_workflow_nodes_init = "" + for platform in unique_platforms.values(): + # TODO find configure #computer.set_minimum_job_poll_interval(0.0) #computer.configure() from autosubmit.platforms.locplatform import LocalPlatform from autosubmit.platforms.slurmplatform import SlurmPlatform - if isinstance(job.platform, LocalPlatform): - create_computer = f""" -try: - computer = orm.load_computer("{job.platform.name}") # The computer label can also be omitted here -except NotExistent: + import yaml + if isinstance(platform, LocalPlatform): + computer_setup = { + "label": f"{platform.name}", + "hostname": f"{platform.host}", + "work_dir": f"{platform.scratch}", + "mpiprocs_per_machine": platform.processors_per_node, + "mpirun_command": "mpirun -np {tot_num_mpiprocs}", + "default_memory_per_machine": 0, # TODO not sure how aiida core.direct handles this + "description": "", + "transport": "core.local", + "scheduler": "core.direct", + "append_text": "", + "prepend_text": "", + "use_double_quotes": False, + "shebang": "#!/bin/bash", + } + elif isinstance(platform, SlurmPlatform): + computer_setup = { + "label": f"{platform.name}", + "hostname": f"{platform.host}", + "work_dir": f"{platform.scratch}", + "mpiprocs_per_machine": {platform.processors_per_node}, + "mpirun_command": "mpirun -np {tot_num_mpiprocs}", + "default_memory_per_machine": 10000, # TODO not sure how autosubmit handles this + "description": "", + "transport": "core.ssh", + "scheduler": "core.slurm", + "append_text": "", + "prepend_text": "", + "use_double_quotes": False, + "shebang": "#!/bin/bash", + } + else: + raise ValueError(f"Platform type {platform} not supported for engine aiida") - computer = orm.Computer( - label = "{platform.name}" - hostname = 
"{platform.host}" - work_dir = "{platform.scratch}" - mpiprocs_per_machine = {platform.processors_per_node} # TODO used? - mpirun_command = "mpirun -np {{tot_num_mpiprocs}}" # TODO used? - description = "" - transport_type = "core.local" - scheduler_type = "core.direct" - shebang = "/bin/bash" - ) - """ - elif isinstance(job.platform, SlurmPlatform): - create_computer = f""" + computer_setup_path = Path(output_path / f"{platform.name}/{platform.name}-setup.yml") + computer_setup_path.parent.mkdir(exist_ok=True) + computer_setup_path.write_text(yaml.dump(computer_setup)) + create_computer = f""" try: - computer = orm.load_computer("{platform.name}") # The computer label can also be omitted here + computer = orm.load_computer("{platform.name}") except NotExistent: - computer = orm.Computer( - label = "{platform.name}" - hostname = "{platform.host}" - work_dir = "{platform.scratch}" - mpiprocs_per_machine = {platform.processors_per_node} - mpirun_command = "mpirun -np {{tot_num_mpiprocs}}" - description = "" - transport_type = "core.ssh" - scheduler_type = "core.slurm" - append_text = "" - prepend_text = "" - use_double_quotes = False, - shebang = "/bin/bash" - ) - """ - else: - raise ValueError(f"Platform type {job.platform} not supported for engine aiida") + setup_path = Path("{computer_setup_path}") + config_kwargs = yaml.safe_load(setup_path.read_text()) + computer = ComputerBuilder(**config_kwargs).new().store() + computer.set_minimum_job_poll_interval(0.0) + computer.configure() + """ # aiida bash code to run script - create_code = f""" + code_setup = { + "computer": f"{platform.name}", + "filepath_executable": "/bin/bash", + "label": "bash", + "description": '', + "default_calc_job_plugin": 'core.shell', + "prepend_text": '', + "append_text": '', + "use_double_quotes": False, + "with_mpi": False + } + + # "computer": f"{platform.name}", + # "filepath_executable": "/bin/bash", + # "label": "bash", + # "description": '', + # "default_calc_job_plugin": 'core.shell', + # "prepend_text": '', + # "append_text": '', + # "use_double_quotes": False, + # "with_mpi": False + + code_setup_path = Path(output_path / f"{platform.name}/bash@{platform.name}-setup.yml") + code_setup_path.parent.mkdir(exist_ok=True) + code_setup_path.write_text(yaml.dump(code_setup)) + create_code = f""" try: - bash_code = orm.load_code(f"bash@{{computer.label}}") + bash_code = orm.load_code("bash@{platform.name}") except NotExistent: - executable_path = Path("{trimmed_script_path}") - bash_code = orm.InstalledCode( - computer=computer, - filepath_executable="/bin/bash", - label="bash", - description='', - default_calc_job_plugin='core.shell', - prepend_text='', - append_text='', - use_double_quotes=False, - with_mpi=False - ).store() + setup_path = Path("{code_setup_path}") + setup_kwargs = yaml.safe_load(setup_path.read_text()) + setup_kwargs["computer"] = orm.load_computer(setup_kwargs["computer"]) + bash_code = orm.InstalledCode(**setup_kwargs).store() print(f"Created and stored bash@{{computer.label}}") """ - + aiida_workflow_nodes_init += create_computer + create_code # aiida tasks aiida_workflow_script_tasks = "" @@ -322,15 +218,17 @@ except NotExistent: create_task = f""" tasks["{job.name}"] = wg.add_task( "ShellJob", - name="{job.name}", - command=bash_code, - arguments=["{{script}}"], - nodes={{"script": orm.SinglefileData("{trimmed_script_path}")}} + name = "{job.name}", + command = f"bash@{job.platform.name}", + arguments = ["{{script}}"], + nodes = {{"script": orm.SinglefileData("{trimmed_script_path}")}} ) 
tasks["{job.name}"].set({{"metadata.computer": computer}}) -# TODO set default_memory_per_machine = {job.parameters['MEMORY']} - """ - aiida_workflow_script_tasks += create_computer + create_code + create_task +""" + if job.parameters['MEMORY'] != "": + create_task += f"""tasks["{job.name}"].set({{"metadata.options.max_memory_kb": {job.parameters['MEMORY']}}}) +""" + aiida_workflow_script_tasks += create_task aiida_workflow_script_deps = "\n" for edge in job_list.graph.edges: @@ -342,96 +240,9 @@ tasks["{edge[1]}"].waiting_on.add(tasks["{edge[0]}"])""" aiida_workflow_script_run = """ wg.submit()""" - aiida_workflow_script_text = aiida_workflow_script_init + aiida_workflow_script_tasks + aiida_workflow_script_deps + aiida_workflow_script_run + aiida_workflow_script_text = aiida_workflow_script_init + aiida_workflow_nodes_init + aiida_workflow_script_tasks + aiida_workflow_script_deps + aiida_workflow_script_run (output_path / "submit_aiida_workflow.py").write_text(aiida_workflow_script_text) - # # TODO: Is this important? Do we need it if we directly create the graph? - # #ecflow_node = _autosubmit_id_to_ecflow_id(job.long_name, job.running) - # #if job.split is not None and job.split > 0: - # # t = Task(f'{job.section}_{job.split}', SPLIT=job.split) - # #else: - # # t = Task(job.section) - - # # TODO I think this is ecflow specific and we can reomve it - # ## Find the direct parent of the task, based on the Autosubmit task ID. - # ## Start from the Suite, and skip the first (suite), and the last (task) - # ## as we know we can discard these. - # #parent_node = s - # #for node in ecflow_node.split('/')[1:-1]: - # # parent_node = parent_node[node] - # ## We just need to prevent adding a node twice since creating a task automatically adds - # ## it to the suite in the context. And simply call ``add_node`` and we should have it. - # ## TODO: use mapping.keys when Py>=3.8 or 3.9? if t.name not in list(parent_node.children.mapping.keys()): - # #if t.name not in [child.name for child in parent_node.children]: - # # parent_node.add_node(t) - - # ## Dependencies - # #for parent in job.parents: - # # dependency_node = _autosubmit_id_to_ecflow_id(parent.long_name, parent.running) - # # parent_node = s - # # for node in dependency_node.split('/')[1:-1]: - # # parent_node = parent_node[node] - # # parent_key = parent.section - # # if parent.split is not None and parent.split > 0: - # # parent_key = f'{parent_key}_{parent.split}' - # # dependency_node = parent_node[parent_key] - - # # # In case we ever need to use the pre-processed file. - # # # - # # # script_name = job.create_script(as_conf) - # # # script_text = open(Path(job._tmp_path, script_name)).read() - # # # # Let's drop the Autosubmit header and tailed. - # # # script_text = re.findall( - # # # r'# Autosubmit job(.*)# Autosubmit tailer', - # # # script_text, - # # # flags=re.DOTALL | re.MULTILINE)[0][1:-1] - # # # t.script = script_text - - # # # Operator overloaded in PyFlow. This creates a dependency. - # # dependency_node >> t - - # # Script - # # N.B.: We used the code below in the beginning, but later we realized it would - # # not work. In Autosubmit, the FILE: $FILE is a file, relative to the AS - # # Project folder. The $FILE template script is then pre-processed to be - # # executed by AS. That is different from ecFlow, where the Task Script is - # # value is pre-processed (i.e. 
if the value is ``templates/local_script.sh`` - # # that value is treated as a string when included in the final job file) - # # to generate the ecFlow job file. So ecFlow pre-processes the ``.script`` - # # value, whereas Autosubmit loads the ``FILE`` script and pre-processes it. - # # - # # In order to have a similar behavior, we insert the contents of the AS - # # template script as the ecFlow task. That way, the template script (now - # # a Task Script in ecFlow) will be pre-processed by ecFlow. - # # - # # The variables may still need to be manually adjusted, but once that is - # # done, the script should then be ready to be executed (i.e. ported). - # # FIXME - # # t.script = job.file - # # with open(Path(as_conf.get_project_dir(), job.file)) as f: - # # t.script = f.read() - # t.script = 'sleep 5' - - #suite = _create_ecflow_suite( - # experiment_id=expid, - # start_dates=start_dates, - # members=members, - # chunks=chunks, - # jobs=job_list.get_all(), - # #server_host=args.server, - # #output_dir=args.output, - # as_conf=as_conf - #) - - #suite.check_definition() - #if not args.quiet: - # print(suite) - - #if args.deploy: - # suite.deploy_suite(overwrite=True) # type: ignore - # suite.replace_on_server(host=args.server, port=args.port) - - __all__ = [ 'generate' ] -- GitLab From 13cde7e406a60de72009b82c05df2c7a59db60ca Mon Sep 17 00:00:00 2001 From: Alexander Goscinski Date: Wed, 30 Oct 2024 12:17:08 +0100 Subject: [PATCH 4/7] Checkpoint: generated script runs --- autosubmit/generators/aiida.py | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/autosubmit/generators/aiida.py b/autosubmit/generators/aiida.py index 5f465069f..3713a73a7 100644 --- a/autosubmit/generators/aiida.py +++ b/autosubmit/generators/aiida.py @@ -114,40 +114,45 @@ tasks = {} from autosubmit.platforms.slurmplatform import SlurmPlatform import yaml if isinstance(platform, LocalPlatform): + # This a bit nasty, because we use the Builder + # we need to specify these mpirun stuff computer_setup = { "label": f"{platform.name}", "hostname": f"{platform.host}", "work_dir": f"{platform.scratch}", - "mpiprocs_per_machine": platform.processors_per_node, - "mpirun_command": "mpirun -np {tot_num_mpiprocs}", - "default_memory_per_machine": 0, # TODO not sure how aiida core.direct handles this "description": "", "transport": "core.local", "scheduler": "core.direct", + "mpirun_command": "mpirun -np {tot_num_mpiprocs}", + "mpiprocs_per_machine": 1, # TODO not sure how aiida core.direct handles this + "default_memory_per_machine": None, "append_text": "", "prepend_text": "", "use_double_quotes": False, "shebang": "#!/bin/bash", } elif isinstance(platform, SlurmPlatform): + if platform.processors_per_node is None: + raise ValueError("") computer_setup = { "label": f"{platform.name}", "hostname": f"{platform.host}", "work_dir": f"{platform.scratch}", - "mpiprocs_per_machine": {platform.processors_per_node}, - "mpirun_command": "mpirun -np {tot_num_mpiprocs}", - "default_memory_per_machine": 10000, # TODO not sure how autosubmit handles this "description": "", "transport": "core.ssh", "scheduler": "core.slurm", + "mpirun_command": "mpirun -np {tot_num_mpiprocs}", + "mpiprocs_per_machine": platform.processors_per_node, + "default_memory_per_machine": 10000, # TODO not sure how autosubmit handles this "append_text": "", "prepend_text": "", "use_double_quotes": False, "shebang": "#!/bin/bash", } else: - raise ValueError(f"Platform type {platform} not supported for engine aiida") + raise 
ValueError(f"Platform type {platform} not supported for engine aiida.") + #num_machines`, `num_mpiprocs_per_machine` or `tot_num_mpiprocs computer_setup_path = Path(output_path / f"{platform.name}/{platform.name}-setup.yml") computer_setup_path.parent.mkdir(exist_ok=True) computer_setup_path.write_text(yaml.dump(computer_setup)) @@ -158,6 +163,7 @@ except NotExistent: setup_path = Path("{computer_setup_path}") config_kwargs = yaml.safe_load(setup_path.read_text()) computer = ComputerBuilder(**config_kwargs).new().store() + computer.configure(safe_interval=0.0) computer.set_minimum_job_poll_interval(0.0) computer.configure() """ @@ -219,11 +225,10 @@ except NotExistent: tasks["{job.name}"] = wg.add_task( "ShellJob", name = "{job.name}", - command = f"bash@{job.platform.name}", + command = orm.load_code("bash@{job.platform.name}"), arguments = ["{{script}}"], nodes = {{"script": orm.SinglefileData("{trimmed_script_path}")}} ) -tasks["{job.name}"].set({{"metadata.computer": computer}}) """ if job.parameters['MEMORY'] != "": create_task += f"""tasks["{job.name}"].set({{"metadata.options.max_memory_kb": {job.parameters['MEMORY']}}}) @@ -239,7 +244,7 @@ tasks["{edge[1]}"].waiting_on.add(tasks["{edge[0]}"])""" aiida_workflow_script_run = """ -wg.submit()""" +wg.run()""" aiida_workflow_script_text = aiida_workflow_script_init + aiida_workflow_nodes_init + aiida_workflow_script_tasks + aiida_workflow_script_deps + aiida_workflow_script_run (output_path / "submit_aiida_workflow.py").write_text(aiida_workflow_script_text) -- GitLab From 94591de6a7845a443481cb81687a3ff29631513b Mon Sep 17 00:00:00 2001 From: Alexander Goscinski Date: Wed, 30 Oct 2024 20:54:25 +0100 Subject: [PATCH 5/7] Checkpoint: refactor usage through CLI and modularize code --- autosubmit/autosubmit.py | 30 +-- autosubmit/generators/__init__.py | 38 ++- autosubmit/generators/aiida.py | 424 ++++++++++++++++-------------- 3 files changed, 268 insertions(+), 224 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 12661f3e4..34b951539 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -687,19 +687,14 @@ class Autosubmit: # Generate subparser = subparsers.add_parser( - 'generate', description='Generate a workflow definition for a different Workflow Manager', + 'generate', description='Generate a workflow definition for a different workflow engine', argument_default=argparse.SUPPRESS) subparser.add_argument('expid', help='experiment identifier') - subparser.add_argument('-e', '--engine', type=str.lower, - help='The target Workflow Manager engine', choices=[engine.value for engine in Engine]) - #subparser.add_argument('args', nargs='?') - - # Needed? 
We need to expose subcommands for each engine - #if len(sys.argv) > 1 and len(sys.argv[1]) > 1 and sys.argv[1] in ['generate']: - # args, options = parser.parse_known_args() - #else: - # options = [] - # args = parser.parse_args() + subsubparser = subparser.add_subparsers(title="engines", dest='engine', required=True, description='Workflow engine identifier') + for engine in Engine: + generator_class = get_engine_generator(engine) + parser_engine = subsubparser.add_parser(engine.value, help=f"{generator_class.get_engine_name()}") + generator_class.add_parse_args(parser_engine) args, unknown = parser.parse_known_args() if args.version: @@ -808,7 +803,7 @@ class Autosubmit: elif args.command == 'stop': return Autosubmit.stop(args.expid, args.force, args.all, args.force_all, args.cancel, args.filter_status, args.target) elif args.command == 'generate': - return Autosubmit.generate_workflow(args.expid, Engine[args.engine], options=[]) + return Autosubmit.generate_workflow(args.expid, Engine[args.engine], args) @staticmethod def _init_logs(args, console_level='INFO', log_level='DEBUG', expid='None'): @@ -6125,10 +6120,11 @@ class Autosubmit: @staticmethod - def generate_workflow(expid: str, engine: Engine, options: List[str]) -> None: + def generate_workflow(expid: str, engine: Engine, args: argparse.Namespace) -> None: """Generate the workflow configuration for a different backend engine.""" Log.info(f'Generate workflow configuration for {engine}') + # TODO check the code below, if it makes sense, this I have not touched from original MR try: Log.info("Getting job list...") as_conf = AutosubmitConfig(expid, BasicConfig, YAMLParserFactory()) @@ -6161,5 +6157,9 @@ class Autosubmit: raise AutosubmitCritical("Error while checking the configuration files or loading the job_list", 7040, str(e)) - get_engine_generator(engine)(job_list, as_conf, [f'--experiment={expid}', *options]) - + generator_class = get_engine_generator(engine) + parser = argparse.ArgumentParser() + generator_class.add_parse_args(parser) + generator_input_keys = vars(parser.parse_args('')).keys() + generator_kwargs = {key: args.__getattribute__(key) for key in generator_input_keys} + generator_class.generate(job_list, as_conf, **generator_kwargs) diff --git a/autosubmit/generators/__init__.py b/autosubmit/generators/__init__.py index 25ae600e1..96ce125c4 100644 --- a/autosubmit/generators/__init__.py +++ b/autosubmit/generators/__init__.py @@ -1,29 +1,47 @@ - from enum import Enum from importlib import import_module -from typing import Callable, cast +from typing import AbstractSet, Callable, cast +from abc import ABC, abstractmethod + """This module provides generators to produce workflow configurations for different backend engines.""" class Engine(Enum): """Workflow Manager engine flavors.""" aiida = 'aiida' + pyflow = 'pyflow' + fireworks = 'fireworks' def __str__(self): return self.value -# TODO: use typing.Protocol instead of object when Py>=3.8 -class GenerateProto: - """Need a protocol to define the type returned by importlib.""" - generate: Callable +# TODO COMMENT: we can make this alse a protocol, but I don't see the reason here for that +class AbstractGenerator(ABC): + """Generator of workflow for an engine.""" + + @staticmethod + @abstractmethod + def get_engine_name() -> str: + """The engine name used for the help text.""" + raise NotImplementedError + + @staticmethod + @abstractmethod + def add_parse_args(parser) -> None: + """Adds arguments to the parser that are needed for a specific engine implementation.""" + 
raise NotImplementedError + # TODO COMMENT: This could be also a __init__ plus method, but I thought one method is easier, then the implementation can do whatever + @classmethod + @abstractmethod + def generate(cls, job_list, as_conf, **arg_options) -> None: + """Generates the workflow from the created autosubmit workflow.""" + raise NotImplementedError -def get_engine_generator(engine: Engine) -> Callable: - """Dynamically loads the engine generate function.""" - generator_function = cast(GenerateProto, import_module(f'autosubmit.generators.{engine.value}')) - return generator_function.generate +def get_engine_generator(engine: Engine) -> AbstractGenerator: + return import_module(f'autosubmit.generators.{engine.value}').Generator __all__ = [ 'Engine', diff --git a/autosubmit/generators/aiida.py b/autosubmit/generators/aiida.py index 3713a73a7..c925efc9d 100644 --- a/autosubmit/generators/aiida.py +++ b/autosubmit/generators/aiida.py @@ -1,91 +1,124 @@ -import argparse -import tempfile -from enum import Enum from pathlib import Path -from typing import List +from functools import cached_property +import warnings +import re +import yaml from autosubmitconfigparser.config.configcommon import AutosubmitConfig -from autosubmitconfigparser.config.basicconfig import BasicConfig - -from autosubmit.job.job_list import JobList, Job -import aiida +from autosubmit.job.job_list import JobList +from autosubmit.generators import AbstractGenerator +from autosubmit.platforms.platform import Platform -"""The PyFlow generator for Autosubmit.""" +"""The AiiDA generator for Autosubmit.""" # Autosubmit Task name separator (not to be confused with task and chunk name separator). DEFAULT_SEPARATOR = '_' -class Running(Enum): - """The Running level of an Autosubmit task.""" - ONCE = 'once' - MEMBER = 'member' - CHUNK = 'chunk' - - def __str__(self): - return self.value - - -# Defines how many ``-``'s are replaced by a ``/`` for -# each Autosubmit hierarchy level (to avoid using an if/elif/else). -REPLACE_COUNT = { - Running.ONCE.value: 1, - Running.MEMBER.value: 3, - Running.CHUNK.value: 4 -} - - -def _autosubmit_id_to_ecflow_id(job_id: str, running: str): - """Given an Autosubmit ID, create the node ID for ecFlow (minus heading ``/``).""" - replace_count = REPLACE_COUNT[running] - return job_id.replace(DEFAULT_SEPARATOR, '/', replace_count) +class Generator(AbstractGenerator): + """Generates an aiida workflow script that initializes all required AiiDA resources. 
+ The generated file is structures as the following: + * imports: All required imports + * init: Initialization of all python resources that need to be instantiated once for the whole script + * create_orm_nodes: Creation of all AiiDA's object-relational mapping (ORM) nodes covering the creation of computer and codes + * workgraph_tasks: Creation of the AiiDA-WorkGraph tasks + * workgraph_deps: Linking of the dependencies between the AiiDA-WorkGraph tasks + * workgraph_submission: Submission of the AiiDA-WorkGraph + """ + # TODO EXECUTABLE, WALLCLOCK, PROCESSORS are not supported ATM + SUPPORTED_JOB_KEYWORDS = ["WALLCLOCK", "PROCESSORS", "MEMORY", "PLATFORM", # these we need to transfer to aiida + "DEPENDENCIES", "FILE", "RUNNING"] # these are resolved by autosubmit internally + # TODO QUEUE, USER, MAX_WALLCLOCK, QUEUE, MAX_PROCESSORS are not supported at the moment + SUPPORTED_PLATFORM_KEYWORDS = ["TYPE", "HOST", "USER", "SCRATCH_DIR", "MAX_WALLCLOCK", "QUEUE", "MAX_PROCESSORS", # these we need to transfer to aiida + "PROJECT"] # these are resolved by autosubmit internally + + def __init__(self, job_list: JobList, as_conf: AutosubmitConfig, output: str): + # TODO think about dichotomy of output vs output_path + self._output_path = Path(output).absolute() + if not self._output_path.exists(): + raise ValueError(f"Given `output` {self._output_path} directory does not exist.") + self._job_list = job_list + self._as_conf = as_conf + + @classmethod + def generate(cls, job_list: JobList, as_conf: AutosubmitConfig, output: str) -> None: + # I Remove for now args usage, I am not sure if this is needed as server_host and output_dir can be referred from HPC_ARCH. But I see the general need that the engine needs extra information that cannot be inferred from an autosubmit config file + # TODO args that are needed + # - output-folder: the folder where to put the scripts + # + # TODO Questions: + # - Are in autosubmit the scripts always on the local machine and then copied over to remote? + # - How is the memory handled? + # + #args: argparse.Namespace = _parse_args(options) + + # TODO will become an argument + self = cls(job_list, as_conf, output) + self._validate() + workflow_script = self._generate_workflow_script() + (self._output_path / "submit_aiida_workflow.py").write_text(workflow_script) + + + @staticmethod + def get_engine_name() -> str: + return "AiiDA" + + @staticmethod + def add_parse_args(parser) -> None: + parser.add_argument('-o', '--output', dest="output", default=".", help='Output directory') + + def _validate(self) -> None: + ## validate jobs + for job_name, job_conf in self._as_conf.starter_conf['JOBS'].items(): + for key in job_conf.keys(): + if key not in Generator.SUPPORTED_JOB_KEYWORDS: + msg = f"Found in job {job_name} configuration file key {key} that is not supported for AiiDA generator." + warnings.warn(msg) + ## validate platforms + for platform_name, platform_conf in self._as_conf.starter_conf['PLATFORMS'].items(): + # only validate the platforms that exist in jobs + if platform_name in self._platforms_used_in_job.keys(): + for key in platform_conf.keys(): + if key not in Generator.SUPPORTED_PLATFORM_KEYWORDS: + msg = f"Found in platform {platform_name} configuration file key {key} that is not supported for AiiDA generator." 
+ warnings.warn(msg) + + + @cached_property + def _platforms_used_in_job(self) -> dict[str, Platform]: + """""" + platforms_used_in_jobs = {} + for job in self._job_list.get_all(): + platforms_used_in_jobs[job.platform.name] = job.platform + return platforms_used_in_jobs + + def _generate_workflow_script(self) -> str: + """Generates a PyFlow workflow using Autosubmit database. + + The ``autosubmit create`` command must have been already executed prior + to calling this function. This is so that the jobs are correctly loaded + to produce the PyFlow workflow. + + :param job_list: ``JobList`` Autosubmit object, that contains the parameters, jobs, and graph + :param as_conf: Autosubmit configuration + :param options: a list of strings with arguments (equivalent to sys.argv), passed to argparse + """ -def _parse_args(args) -> argparse.Namespace: - parser = argparse.ArgumentParser( - prog='autosubmit generate ... engine=pyflow', - description='Produces a valid PyFlow workflow configuration given an Autosubmit experiment ID', - epilog='This program needs access to an Autosubmit installation' - ) - parser.add_argument('-e', '--experiment', required=True, help='Autosubmit experiment ID') - parser.add_argument('-d', '--deploy', default=False, action='store_true', help='Deploy to ecFlow or not') - parser.add_argument('-o', '--output', default=tempfile.gettempdir(), help='Output directory') - parser.add_argument('-s', '--server', default='localhost', - help='ecFlow server hostname or IP (only used if deploy=True)') - parser.add_argument('-p', '--port', default=3141, help='ecFlow server port (only used if deploy=True)') - parser.add_argument('-g', '--graph', default=False, action='store_true', help='Print the DOT plot') - parser.add_argument('-q', '--quiet', default=False, action='store_true') - - return parser.parse_args(args) + imports = self._generate_imports_section() + init = self._generate_init_section() + create_orm_nodes = self._generate_create_orm_nodes_section() + workgraph_tasks = self._generate_workgraph_tasks_section() + workgraph_deps = self._generate_workgraph_deps_section() + workgraph_submission = self._generate_workgraph_submission_section() + return imports + init + create_orm_nodes + workgraph_tasks + workgraph_deps + workgraph_submission -def generate(job_list: JobList, as_conf: AutosubmitConfig, options: List[str]) -> None: - """Generates a PyFlow workflow using Autosubmit database. - The ``autosubmit create`` command must have been already executed prior - to calling this function. This is so that the jobs are correctly loaded - to produce the PyFlow workflow. - :param job_list: ``JobList`` Autosubmit object, that contains the parameters, jobs, and graph - :param as_conf: Autosubmit configuration - :param options: a list of strings with arguments (equivalent to sys.argv), passed to argparse - """ - # I Remove for now args usage, I am not sure if this is needed as server_host and output_dir can be referred from HPC_ARCH. But I see the general need that the engine needs extra information that cannot be inferred from an autosubmit config file - # TODO args that are needed - # - output-folder: the folder where to put the scripts - # - # TODO Questions: - # - Are in autosubmit the scripts always on the local machine and then copied over to remote? - # - How is the memory handled? 
- # - #args: argparse.Namespace = _parse_args(options) - - # TODO will becom an argument - output_path = Path("/home/alexgo/code/autosubmit/generated") - - - aiida_workflow_script_init = \ - """import aiida + def _generate_imports_section(self) -> str: + return """# IMPORTS +import aiida from aiida import orm from aiida_workgraph import WorkGraph from aiida.orm.utils.builders.computer import ComputerBuilder @@ -93,70 +126,71 @@ from aiida.common.exceptions import NotExistent from pathlib import Path import yaml -aiida.load_profile() +""" + def _generate_init_section(self) -> str: + return """# INIT +aiida.load_profile() wg = WorkGraph() tasks = {} - """ - # aiida computer - unique_platforms = {} - for job in job_list.get_all(): - unique_platforms[job.platform.name] = job.platform # TODO is name unique? - - aiida_workflow_nodes_init = "" - for platform in unique_platforms.values(): - - # TODO find configure - #computer.set_minimum_job_poll_interval(0.0) - #computer.configure() - from autosubmit.platforms.locplatform import LocalPlatform - from autosubmit.platforms.slurmplatform import SlurmPlatform - import yaml - if isinstance(platform, LocalPlatform): - # This a bit nasty, because we use the Builder - # we need to specify these mpirun stuff - computer_setup = { - "label": f"{platform.name}", - "hostname": f"{platform.host}", - "work_dir": f"{platform.scratch}", - "description": "", - "transport": "core.local", - "scheduler": "core.direct", - "mpirun_command": "mpirun -np {tot_num_mpiprocs}", - "mpiprocs_per_machine": 1, # TODO not sure how aiida core.direct handles this - "default_memory_per_machine": None, - "append_text": "", - "prepend_text": "", - "use_double_quotes": False, - "shebang": "#!/bin/bash", - } - elif isinstance(platform, SlurmPlatform): - if platform.processors_per_node is None: - raise ValueError("") - computer_setup = { - "label": f"{platform.name}", - "hostname": f"{platform.host}", - "work_dir": f"{platform.scratch}", - "description": "", - "transport": "core.ssh", - "scheduler": "core.slurm", - "mpirun_command": "mpirun -np {tot_num_mpiprocs}", - "mpiprocs_per_machine": platform.processors_per_node, - "default_memory_per_machine": 10000, # TODO not sure how autosubmit handles this - "append_text": "", - "prepend_text": "", - "use_double_quotes": False, - "shebang": "#!/bin/bash", - } - else: - raise ValueError(f"Platform type {platform} not supported for engine aiida.") - - #num_machines`, `num_mpiprocs_per_machine` or `tot_num_mpiprocs - computer_setup_path = Path(output_path / f"{platform.name}/{platform.name}-setup.yml") - computer_setup_path.parent.mkdir(exist_ok=True) - computer_setup_path.write_text(yaml.dump(computer_setup)) - create_computer = f""" +""" + + def _generate_create_orm_nodes_section(self) -> str: + # aiida computer + + code_section = "# CREATE_ORM_NODES" + for platform in self._platforms_used_in_job.values(): + + # TODO find configure + #computer.set_minimum_job_poll_interval(0.0) + #computer.configure() + from autosubmit.platforms.locplatform import LocalPlatform + from autosubmit.platforms.slurmplatform import SlurmPlatform + if isinstance(platform, LocalPlatform): + # This a bit nasty, because we use the Builder + # we need to specify these mpirun stuff + computer_setup = { + "label": f"{platform.name}", + "hostname": f"{platform.host}", + "work_dir": f"{platform.scratch}", + "description": "", + "transport": "core.local", + "scheduler": "core.direct", + "mpirun_command": "mpirun -np {tot_num_mpiprocs}", + "mpiprocs_per_machine": 1, # TODO not 
sure how aiida core.direct handles this + "default_memory_per_machine": None, + "append_text": "", + "prepend_text": "", + "use_double_quotes": False, + "shebang": "#!/bin/bash", + } + elif isinstance(platform, SlurmPlatform): + if platform.processors_per_node is None: + raise ValueError("") + computer_setup = { + "label": f"{platform.name}", + "hostname": f"{platform.host}", + "work_dir": f"{platform.scratch}", + "description": "", + "transport": "core.ssh", + "scheduler": "core.slurm", + "mpirun_command": "mpirun -np {tot_num_mpiprocs}", + "mpiprocs_per_machine": platform.processors_per_node, + "default_memory_per_machine": 10000, # TODO not sure how autosubmit handles this + "append_text": "", + "prepend_text": "", + "use_double_quotes": False, + "shebang": "#!/bin/bash", + } + else: + raise ValueError(f"Platform type {platform} not supported for engine aiida.") + + #num_machines`, `num_mpiprocs_per_machine` or `tot_num_mpiprocs + computer_setup_path = Path(self._output_path / f"{platform.name}/{platform.name}-setup.yml") + computer_setup_path.parent.mkdir(exist_ok=True) + computer_setup_path.write_text(yaml.dump(computer_setup)) + create_computer = f""" try: computer = orm.load_computer("{platform.name}") except NotExistent: @@ -165,36 +199,25 @@ except NotExistent: computer = ComputerBuilder(**config_kwargs).new().store() computer.configure(safe_interval=0.0) computer.set_minimum_job_poll_interval(0.0) - computer.configure() - """ + computer.configure()""" + + # aiida bash code to run script + code_setup = { + "computer": f"{platform.name}", + "filepath_executable": "/bin/bash", + "label": "bash", + "description": '', + "default_calc_job_plugin": 'core.shell', + "prepend_text": '', + "append_text": '', + "use_double_quotes": False, + "with_mpi": False + } - # aiida bash code to run script - code_setup = { - "computer": f"{platform.name}", - "filepath_executable": "/bin/bash", - "label": "bash", - "description": '', - "default_calc_job_plugin": 'core.shell', - "prepend_text": '', - "append_text": '', - "use_double_quotes": False, - "with_mpi": False - } - - # "computer": f"{platform.name}", - # "filepath_executable": "/bin/bash", - # "label": "bash", - # "description": '', - # "default_calc_job_plugin": 'core.shell', - # "prepend_text": '', - # "append_text": '', - # "use_double_quotes": False, - # "with_mpi": False - - code_setup_path = Path(output_path / f"{platform.name}/bash@{platform.name}-setup.yml") - code_setup_path.parent.mkdir(exist_ok=True) - code_setup_path.write_text(yaml.dump(code_setup)) - create_code = f""" + code_setup_path = Path(self._output_path / f"{platform.name}/bash@{platform.name}-setup.yml") + code_setup_path.parent.mkdir(exist_ok=True) + code_setup_path.write_text(yaml.dump(code_setup)) + create_code = f""" try: bash_code = orm.load_code("bash@{platform.name}") except NotExistent: @@ -202,52 +225,55 @@ except NotExistent: setup_kwargs = yaml.safe_load(setup_path.read_text()) setup_kwargs["computer"] = orm.load_computer(setup_kwargs["computer"]) bash_code = orm.InstalledCode(**setup_kwargs).store() - print(f"Created and stored bash@{{computer.label}}") - """ - aiida_workflow_nodes_init += create_computer + create_code - - # aiida tasks - aiida_workflow_script_tasks = "" - for job in job_list.get_all(): - script_name = job.create_script(as_conf) - script_path = Path(job._tmp_path, script_name) - script_text = open(script_path).read() - # Let's drop the Autosubmit header and tailed. 
- import re - trimmed_script_text = re.findall( - r'# Autosubmit job(.*)# Autosubmit tailer', - script_text, - flags=re.DOTALL | re.MULTILINE)[0][1:-1] - trimmed_script_path = output_path / script_name - with open(trimmed_script_path, mode="w") as trimmed_script_file: - trimmed_script_file.write(trimmed_script_text) - create_task = f""" + print(f"Created and stored bash@{{computer.label}}")""" + code_section += create_computer + create_code + code_section += "\n\n" + return code_section + + def _generate_workgraph_tasks_section(self): + code_section = "# WORKGRAPH_TASKS" + + for job in self._job_list.get_all(): + script_name = job.create_script(self._as_conf) + script_path = Path(job._tmp_path, script_name) + script_text = open(script_path).read() + # Let's drop the Autosubmit header and tailed. + trimmed_script_text = re.findall( + r'# Autosubmit job(.*)# Autosubmit tailer', + script_text, + flags=re.DOTALL | re.MULTILINE)[0][1:-1] + trimmed_script_path = self._output_path / script_name + trimmed_script_path.write_text(trimmed_script_text) + create_task = f""" tasks["{job.name}"] = wg.add_task( "ShellJob", name = "{job.name}", command = orm.load_code("bash@{job.platform.name}"), arguments = ["{{script}}"], nodes = {{"script": orm.SinglefileData("{trimmed_script_path}")}} -) -""" - if job.parameters['MEMORY'] != "": - create_task += f"""tasks["{job.name}"].set({{"metadata.options.max_memory_kb": {job.parameters['MEMORY']}}}) -""" - aiida_workflow_script_tasks += create_task - - aiida_workflow_script_deps = "\n" - for edge in job_list.graph.edges: - add_edges = f""" +)""" + if job.parameters['MEMORY'] != "": + create_task += f""" +tasks["{job.name}"].set({{"metadata.options.max_memory_kb": {job.parameters['MEMORY']}}})""" + code_section += create_task + code_section += "\n\n" + return code_section + + def _generate_workgraph_deps_section(self): + code_section = "# WORKGRAPH_DEPS" + for edge in self._job_list.graph.edges: + code_section += f""" tasks["{edge[1]}"].waiting_on.add(tasks["{edge[0]}"])""" - aiida_workflow_script_deps += add_edges - aiida_workflow_script_deps = "\n" + code_section += "\n\n" + return code_section - aiida_workflow_script_run = """ -wg.run()""" - aiida_workflow_script_text = aiida_workflow_script_init + aiida_workflow_nodes_init + aiida_workflow_script_tasks + aiida_workflow_script_deps + aiida_workflow_script_run - (output_path / "submit_aiida_workflow.py").write_text(aiida_workflow_script_text) + def _generate_workgraph_submission_section(self): + return """# WORKGRAPH_SUBMISSION +wg.run()""" # TODO change to submit + __all__ = [ - 'generate' + 'Generator' + ] -- GitLab From e1a26de894eb96c3bbf3575eb1a9781c475a34a5 Mon Sep 17 00:00:00 2001 From: Alexander Goscinski Date: Tue, 5 Nov 2024 07:22:51 +0100 Subject: [PATCH 6/7] Various fixes integrating the job and platform metadata * Remove testing purpose further engines * Add dictionary for translating the platform/computer metadata between autosubmit and aiida * resolve dichotomy for output_path * add readme in the generation of the code * Fix usage for some job.paramaters * add usage of wallclocktime --- autosubmit/generators/__init__.py | 2 - autosubmit/generators/aiida.py | 158 +++++++++++++++------- autosubmit/platforms/paramiko_platform.py | 4 +- 3 files changed, 113 insertions(+), 51 deletions(-) diff --git a/autosubmit/generators/__init__.py b/autosubmit/generators/__init__.py index 96ce125c4..1878d74b9 100644 --- a/autosubmit/generators/__init__.py +++ b/autosubmit/generators/__init__.py @@ -9,8 +9,6 @@ from 
abc import ABC, abstractmethod class Engine(Enum): """Workflow Manager engine flavors.""" aiida = 'aiida' - pyflow = 'pyflow' - fireworks = 'fireworks' def __str__(self): return self.value diff --git a/autosubmit/generators/aiida.py b/autosubmit/generators/aiida.py index c925efc9d..2af90e596 100644 --- a/autosubmit/generators/aiida.py +++ b/autosubmit/generators/aiida.py @@ -9,6 +9,7 @@ from autosubmitconfigparser.config.configcommon import AutosubmitConfig from autosubmit.job.job_list import JobList from autosubmit.generators import AbstractGenerator from autosubmit.platforms.platform import Platform +from autosubmit.platforms.paramiko_platform import ParamikoPlatform """The AiiDA generator for Autosubmit.""" @@ -20,6 +21,7 @@ class Generator(AbstractGenerator): """Generates an aiida workflow script that initializes all required AiiDA resources. The generated file is structures as the following: + * header: Information about the generation of the file * imports: All required imports * init: Initialization of all python resources that need to be instantiated once for the whole script * create_orm_nodes: Creation of all AiiDA's object-relational mapping (ORM) nodes covering the creation of computer and codes @@ -27,47 +29,60 @@ class Generator(AbstractGenerator): * workgraph_deps: Linking of the dependencies between the AiiDA-WorkGraph tasks * workgraph_submission: Submission of the AiiDA-WorkGraph """ - # TODO EXECUTABLE, WALLCLOCK, PROCESSORS are not supported ATM - SUPPORTED_JOB_KEYWORDS = ["WALLCLOCK", "PROCESSORS", "MEMORY", "PLATFORM", # these we need to transfer to aiida + # AiiDA Slurm options + # --cpus-per-task -> num_cores_per_mpiproc + # --ntasks-per-node -> num_mpiprocs_per_machine + # --nodes -> num_machines + # --mem -> max_memory_kb (with some translation) + # --qos -> qos + # --partition -> queue_name + # + # Autosubmit Slurm options + # --cpus-per-task -> NUMTHREADS (job) + # --ntasks-per-node -> TASKS (job) + # --nodes -> NODES (job) + # --mem -> MEMORY (job) + # --qos -> CURRENT_QUEUE (job) + # --partition -> PARTITION (job) + # + # TODO investigate platform parameters how they interact with job + # Can the CURRENT_* parameters be specified by the user? 
+ # + # + # TODO EXECUTABLE are not supported ATM + SUPPORTED_JOB_KEYWORDS = ["NUMTHREADS", "TASKS", "NODES", "CURRENT_QUEUE", "WALLCLOCK", "MEMORY", "PLATFORM", # these we need to transfer to aiida "DEPENDENCIES", "FILE", "RUNNING"] # these are resolved by autosubmit internally - # TODO QUEUE, USER, MAX_WALLCLOCK, QUEUE, MAX_PROCESSORS are not supported at the moment - SUPPORTED_PLATFORM_KEYWORDS = ["TYPE", "HOST", "USER", "SCRATCH_DIR", "MAX_WALLCLOCK", "QUEUE", "MAX_PROCESSORS", # these we need to transfer to aiida + # parameters, because in AiiDA we do not support them as computer + # parameters so we need to overwrite them in the tasks + + # NOTE we cannot speciy MAX_WALLCLOCK in the platform in AiiDA so we need to overwrite it in the task + SUPPORTED_PLATFORM_KEYWORDS = ["TYPE", "HOST", "USER", "QUEUE", "SCRATCH_DIR", "MAX_WALLCLOCK", "MAX_PROCESSORS", # these we need to transfer to aiida "PROJECT"] # these are resolved by autosubmit internally - def __init__(self, job_list: JobList, as_conf: AutosubmitConfig, output: str): - # TODO think about dichotomy of output vs output_path - self._output_path = Path(output).absolute() - if not self._output_path.exists(): - raise ValueError(f"Given `output` {self._output_path} directory does not exist.") + def __init__(self, job_list: JobList, as_conf: AutosubmitConfig, output_dir: str): + if not (output_path := Path(output_dir)).exists(): + raise ValueError(f"Given `output_dir` {output_path} does not exist.") + self._output_path = output_path.absolute() self._job_list = job_list self._as_conf = as_conf @classmethod - def generate(cls, job_list: JobList, as_conf: AutosubmitConfig, output: str) -> None: - # I Remove for now args usage, I am not sure if this is needed as server_host and output_dir can be referred from HPC_ARCH. But I see the general need that the engine needs extra information that cannot be inferred from an autosubmit config file - # TODO args that are needed - # - output-folder: the folder where to put the scripts - # + def generate(cls, job_list: JobList, as_conf: AutosubmitConfig, output_dir: str) -> None: # TODO Questions: - # - Are in autosubmit the scripts always on the local machine and then copied over to remote? - # - How is the memory handled? - # - #args: argparse.Namespace = _parse_args(options) - - # TODO will become an argument - self = cls(job_list, as_conf, output) + # - Are in autosubmit the scripts always on the local machine and then copied over to remote? what is with EXECUTABLE? 
+ self = cls(job_list, as_conf, output_dir) self._validate() workflow_script = self._generate_workflow_script() (self._output_path / "submit_aiida_workflow.py").write_text(workflow_script) - - + (self._output_path / "README.md").write_text(self._generate_readme()) + @staticmethod def get_engine_name() -> str: return "AiiDA" @staticmethod def add_parse_args(parser) -> None: - parser.add_argument('-o', '--output', dest="output", default=".", help='Output directory') + parser.add_argument('-o', '--output_dir', dest="output_dir", default=".", help='Output directory') def _validate(self) -> None: ## validate jobs @@ -78,7 +93,7 @@ class Generator(AbstractGenerator): warnings.warn(msg) ## validate platforms for platform_name, platform_conf in self._as_conf.starter_conf['PLATFORMS'].items(): - # only validate the platforms that exist in jobs + # only validate platforms that are used in jobs if platform_name in self._platforms_used_in_job.keys(): for key in platform_conf.keys(): if key not in Generator.SUPPORTED_PLATFORM_KEYWORDS: @@ -88,7 +103,6 @@ class Generator(AbstractGenerator): @cached_property def _platforms_used_in_job(self) -> dict[str, Platform]: - """""" platforms_used_in_jobs = {} for job in self._job_list.get_all(): platforms_used_in_jobs[job.platform.name] = job.platform @@ -105,16 +119,23 @@ class Generator(AbstractGenerator): :param as_conf: Autosubmit configuration :param options: a list of strings with arguments (equivalent to sys.argv), passed to argparse """ - + header = self._generate_header_section() imports = self._generate_imports_section() init = self._generate_init_section() create_orm_nodes = self._generate_create_orm_nodes_section() workgraph_tasks = self._generate_workgraph_tasks_section() workgraph_deps = self._generate_workgraph_deps_section() workgraph_submission = self._generate_workgraph_submission_section() - return imports + init + create_orm_nodes + workgraph_tasks + workgraph_deps + workgraph_submission + return header + imports + init + create_orm_nodes + workgraph_tasks + workgraph_deps + workgraph_submission + + def _generate_header_section(self) -> str: + return f"""# HEADER +# This is in autogenerated file from {self._job_list.expid} +# The computer and codes are defined for the following platforms: +# {list(self._platforms_used_in_job.keys())} +""" def _generate_imports_section(self) -> str: return """# IMPORTS @@ -142,14 +163,9 @@ tasks = {} code_section = "# CREATE_ORM_NODES" for platform in self._platforms_used_in_job.values(): - # TODO find configure - #computer.set_minimum_job_poll_interval(0.0) - #computer.configure() from autosubmit.platforms.locplatform import LocalPlatform from autosubmit.platforms.slurmplatform import SlurmPlatform if isinstance(platform, LocalPlatform): - # This a bit nasty, because we use the Builder - # we need to specify these mpirun stuff computer_setup = { "label": f"{platform.name}", "hostname": f"{platform.host}", @@ -158,7 +174,7 @@ tasks = {} "transport": "core.local", "scheduler": "core.direct", "mpirun_command": "mpirun -np {tot_num_mpiprocs}", - "mpiprocs_per_machine": 1, # TODO not sure how aiida core.direct handles this + "mpiprocs_per_machine": 1, "default_memory_per_machine": None, "append_text": "", "prepend_text": "", @@ -166,27 +182,32 @@ tasks = {} "shebang": "#!/bin/bash", } elif isinstance(platform, SlurmPlatform): - if platform.processors_per_node is None: - raise ValueError("") + # TODO Do we need to handle invalid parameters or is this done by autosubmit + #if platform.processors_per_node is None: + # 
raise ValueError("") + #if platform.max_wallclock is None: + # raise ValueError("") computer_setup = { "label": f"{platform.name}", "hostname": f"{platform.host}", "work_dir": f"{platform.scratch}", + "user": f"{platform.user}", "description": "", "transport": "core.ssh", "scheduler": "core.slurm", "mpirun_command": "mpirun -np {tot_num_mpiprocs}", "mpiprocs_per_machine": platform.processors_per_node, - "default_memory_per_machine": 10000, # TODO not sure how autosubmit handles this - "append_text": "", - "prepend_text": "", + "default_memory_per_machine": None, # This is specified in the task option + "append_text": "", # TODO is this specified by EXTENDED_HEADER_PATH? + "prepend_text": "", # TODO is this specified by EXTENDED_TAILER_PATH? "use_double_quotes": False, "shebang": "#!/bin/bash", + "wallclock_time_seconds": platform.max_wallclock, + "tot_num_mpiprocs": platform.max_processors, } else: raise ValueError(f"Platform type {platform} not supported for engine aiida.") - #num_machines`, `num_mpiprocs_per_machine` or `tot_num_mpiprocs computer_setup_path = Path(self._output_path / f"{platform.name}/{platform.name}-setup.yml") computer_setup_path.parent.mkdir(exist_ok=True) computer_setup_path.write_text(yaml.dump(computer_setup)) @@ -197,8 +218,9 @@ except NotExistent: setup_path = Path("{computer_setup_path}") config_kwargs = yaml.safe_load(setup_path.read_text()) computer = ComputerBuilder(**config_kwargs).new().store() - computer.configure(safe_interval=0.0) - computer.set_minimum_job_poll_interval(0.0) + # TODO check if this works + #computer.configure(safe_interval=0.0) + #computer.set_minimum_job_poll_interval(0.0) computer.configure()""" # aiida bash code to run script @@ -252,14 +274,30 @@ tasks["{job.name}"] = wg.add_task( arguments = ["{{script}}"], nodes = {{"script": orm.SinglefileData("{trimmed_script_path}")}} )""" - if job.parameters['MEMORY'] != "": + # TODO is this the correct check, to check if a parameter has been specified + if job.parameters["MEMORY"] != "": + # TODO job.memory needs to be most likely converted to kb + # TODO job.memory_per_task? 
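+ # [Reviewer sketch, not part of the original patch] Assuming job.memory holds a plain megabyte
+ # value (Slurm's --mem default unit), the kB conversion hinted at in the TODO above could be:
+ #   max_memory_kb = int(job.memory) * 1024
+ # and a suffixed value such as "4G" would need a small parser first, e.g.:
+ #   units = {"K": 1, "M": 1024, "G": 1024 ** 2}
+ #   mem = str(job.memory)
+ #   max_memory_kb = int(mem[:-1]) * units[mem[-1].upper()] if mem[-1].upper() in units else int(mem) * 1024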
+ create_task += f"""
+tasks["{job.name}"].set({{"metadata.options.max_memory_kb": {job.memory}}})"""
+ if job.parameters["WALLCLOCK"] != "" or job.platform.max_wallclock is None:
+ if job.parameters["WALLCLOCK"] != "":
+ wallclock_seconds = int(ParamikoPlatform.parse_time(job.wallclock).total_seconds())
+ else:
+ wallclock_seconds = int(ParamikoPlatform.parse_time(job.wallclock).total_seconds())
+ create_task += f"""
+tasks["{job.name}"].set({{"metadata.options.max_wallclock_seconds": {wallclock_seconds}}})"""
+
+
+ if job.parameters["CURRENT_QUEUE"] != "":
create_task += f"""
-tasks["{job.name}"].set({{"metadata.options.max_memory_kb": {job.parameters['MEMORY']}}})"""
+tasks["{job.name}"].set({{"metadata.options.queue_name": {job.partition}}})"""
+
code_section += create_task
code_section += "\n\n"
return code_section

- def _generate_workgraph_deps_section(self):
+ def _generate_workgraph_deps_section(self) -> str:
code_section = "# WORKGRAPH_DEPS"
for edge in self._job_list.graph.edges:
code_section += f"""
@@ -268,12 +306,36 @@ tasks["{edge[1]}"].waiting_on.add(tasks["{edge[0]}"])"""
return code_section


- def _generate_workgraph_submission_section(self):
+ def _generate_workgraph_submission_section(self) -> str:
return """# WORKGRAPH_SUBMISSION
-wg.run()""" # TODO change to submit
+wg.run()""" # TODO change to submit once debugging is finished
+
+ def _generate_readme(self) -> str:
+ return f"""### Meta-information
+This file has been auto-generated from expid {self._job_list.expid}.
+
+### Instructions
+To run the workflow, please ensure that you have AiiDA installed and configured.
+For that, please refer to the
+[installation section of AiiDA](https://aiida.readthedocs.io/projects/aiida-core/en/stable/installation/index.html).
+Then you can run the generated workflow with:
+```bash
+python submit_aiida_workflow.py
+```
+
+### Node Creation
+
+The workflow creates the computer and code nodes as required from the
+autosubmit config. If the computer and code nodes have already been created, for
+example through a previous run, they are loaded. If you have changed the
+computer parameters in your autosubmit config, then you need to delete the
+corresponding computer and code nodes. Note that this has the disadvantage that
+it will delete all calculations that are associated with these nodes.
+It is therefore recommended to use the JOB parameters in autosubmit to override
+certain computer configs.
+"""

__all__ = [
'Generator'
- ]
diff --git a/autosubmit/platforms/paramiko_platform.py b/autosubmit/platforms/paramiko_platform.py
index 7dda872db..35055cc8d 100644
--- a/autosubmit/platforms/paramiko_platform.py
+++ b/autosubmit/platforms/paramiko_platform.py
@@ -1350,7 +1350,9 @@ class ParamikoPlatform(Platform):
header = header.replace(
'%HYPERTHREADING_DIRECTIVE%', self.header.get_hyperthreading_directive(job))
return header
- def parse_time(self,wallclock):
+
+ @staticmethod
+ def parse_time(wallclock):
# noinspection Annotator
regex = re.compile(r'(((?P\d+):)((?P\d+)))(:(?P\d+))?')
parts = regex.match(wallclock)
--
GitLab


From 384000736e204dd46fb123ed04d652375bfa7a11 Mon Sep 17 00:00:00 2001
From: Alexander Goscinski
Date: Tue, 5 Nov 2024 21:25:46 +0100
Subject: [PATCH 7/7] Changes after meeting

* added answers from the meeting
* fixed computer configuration, but it is a bit hacky at the moment, need to dig deeper into how aiida does the automatic configuration.
`verdi computer test THOR` passes workflow seems to run but is very slow --- autosubmit/autosubmit.py | 1 + autosubmit/generators/aiida.py | 61 +++++++++++++++++++++++----------- 2 files changed, 43 insertions(+), 19 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 34b951539..93a2d24c5 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -706,6 +706,7 @@ class Autosubmit: except SystemExit as e: return 1 except BaseException as e: + raise e raise AutosubmitCritical( "Incorrect arguments for this command", 7011) diff --git a/autosubmit/generators/aiida.py b/autosubmit/generators/aiida.py index 2af90e596..13e33457f 100644 --- a/autosubmit/generators/aiida.py +++ b/autosubmit/generators/aiida.py @@ -47,22 +47,32 @@ class Generator(AbstractGenerator): # # TODO investigate platform parameters how they interact with job # Can the CURRENT_* parameters be specified by the user? + # I think the CURRENT_* properties are just copied over from platform + # + # TODO what is "MAX_PROCESSORS" used for? + # do not support for the moment, need to handle warning # # # TODO EXECUTABLE are not supported ATM - SUPPORTED_JOB_KEYWORDS = ["NUMTHREADS", "TASKS", "NODES", "CURRENT_QUEUE", "WALLCLOCK", "MEMORY", "PLATFORM", # these we need to transfer to aiida + # TODO Platform.PROJECT -> sbatch -A + # TODO in the expdef the PROJECT_TYPE should be git or local + #SUPPORT_GLOBAL_KEYWORDS = ["WRAPPERS"] + SUPPORTED_JOB_KEYWORDS = ["NUMTHREADS", "TASKS", "NODES", "WALLCLOCK", "MEMORY", "PLATFORM", # these we need to transfer to aiida "DEPENDENCIES", "FILE", "RUNNING"] # these are resolved by autosubmit internally # parameters, because in AiiDA we do not support them as computer # parameters so we need to overwrite them in the tasks # NOTE we cannot speciy MAX_WALLCLOCK in the platform in AiiDA so we need to overwrite it in the task - SUPPORTED_PLATFORM_KEYWORDS = ["TYPE", "HOST", "USER", "QUEUE", "SCRATCH_DIR", "MAX_WALLCLOCK", "MAX_PROCESSORS", # these we need to transfer to aiida + SUPPORTED_PLATFORM_KEYWORDS = ["TYPE", "HOST", "USER", "QUEUE", "SCRATCH_DIR", "MAX_WALLCLOCK", # these we need to transfer to aiida "PROJECT"] # these are resolved by autosubmit internally def __init__(self, job_list: JobList, as_conf: AutosubmitConfig, output_dir: str): if not (output_path := Path(output_dir)).exists(): raise ValueError(f"Given `output_dir` {output_path} does not exist.") - self._output_path = output_path.absolute() + # TODO name is ambiguous as a bit as output_path is not -> output_dir + self._output_path = (output_path / job_list.expid).absolute() + # TODO I think here we should raise an error to not accidently overwrite + self._output_path.mkdir(exist_ok=True) self._job_list = job_list self._as_conf = as_conf @@ -95,8 +105,8 @@ class Generator(AbstractGenerator): for platform_name, platform_conf in self._as_conf.starter_conf['PLATFORMS'].items(): # only validate platforms that are used in jobs if platform_name in self._platforms_used_in_job.keys(): - for key in platform_conf.keys(): - if key not in Generator.SUPPORTED_PLATFORM_KEYWORDS: + for key, value in platform_conf.items(): + if key not in Generator.SUPPORTED_PLATFORM_KEYWORDS and value != '': msg = f"Found in platform {platform_name} configuration file key {key} that is not supported for AiiDA generator." 
warnings.warn(msg) @@ -183,6 +193,7 @@ tasks = {} } elif isinstance(platform, SlurmPlatform): # TODO Do we need to handle invalid parameters or is this done by autosubmit + # => Autosubmit does the validation, we do not need to worry #if platform.processors_per_node is None: # raise ValueError("") #if platform.max_wallclock is None: @@ -191,19 +202,17 @@ tasks = {} "label": f"{platform.name}", "hostname": f"{platform.host}", "work_dir": f"{platform.scratch}", - "user": f"{platform.user}", + #username": f"{platform.user}", does not work "description": "", "transport": "core.ssh", "scheduler": "core.slurm", "mpirun_command": "mpirun -np {tot_num_mpiprocs}", "mpiprocs_per_machine": platform.processors_per_node, "default_memory_per_machine": None, # This is specified in the task option - "append_text": "", # TODO is this specified by EXTENDED_HEADER_PATH? - "prepend_text": "", # TODO is this specified by EXTENDED_TAILER_PATH? + "append_text": "", + "prepend_text": "", "use_double_quotes": False, "shebang": "#!/bin/bash", - "wallclock_time_seconds": platform.max_wallclock, - "tot_num_mpiprocs": platform.max_processors, } else: raise ValueError(f"Platform type {platform} not supported for engine aiida.") @@ -214,14 +223,24 @@ tasks = {} create_computer = f""" try: computer = orm.load_computer("{platform.name}") + print(f"Loaded computer {{computer.label!r}}") except NotExistent: setup_path = Path("{computer_setup_path}") config_kwargs = yaml.safe_load(setup_path.read_text()) computer = ComputerBuilder(**config_kwargs).new().store() - # TODO check if this works + # TODO check if we should set this #computer.configure(safe_interval=0.0) #computer.set_minimum_job_poll_interval(0.0) - computer.configure()""" + + from aiida.transports.plugins.ssh import SshTransport + from aiida.transports import cli as transport_cli + default_kwargs = {{name: transport_cli.transport_option_default(name, computer) for name in SshTransport.get_valid_auth_params()}} + default_kwargs["port"] = int(default_kwargs["port"]) + default_kwargs["timeout"] = float(default_kwargs["timeout"]) + default_kwargs["username"] = "{platform.user}" + default_kwargs['key_filename'] = "/home/alexgo/.ssh/id_rsa" # TODO not sure why default kwargs don't already give this as in CLI this is the default arg + computer.configure(user=orm.User.collection.get_default(), **default_kwargs) + print(f"Created and stored computer {{computer.label}}")""" # aiida bash code to run script code_setup = { @@ -275,23 +294,27 @@ tasks["{job.name}"] = wg.add_task( nodes = {{"script": orm.SinglefileData("{trimmed_script_path}")}} )""" # TODO is this the correct check, to check if a parameter has been specified - if job.parameters["MEMORY"] != "": + # or should we do hasattr(job, "memory")? Then we get the default argument + if job.memory != "": # TODO job.memory needs to be most likely converted to kb # TODO job.memory_per_task? create_task += f""" tasks["{job.name}"].set({{"metadata.options.max_memory_kb": {job.memory}}})""" - if job.parameters["WALLCLOCK"] != "" or job.platform.max_wallclock is None: + if job.platform.max_wallclock is None: + # TODO is this actually needed or does autosubmit already cover this logic? 
if job.parameters["WALLCLOCK"] != "": wallclock_seconds = int(ParamikoPlatform.parse_time(job.wallclock).total_seconds()) else: - wallclock_seconds = int(ParamikoPlatform.parse_time(job.wallclock).total_seconds()) + wallclock_seconds = int(ParamikoPlatform.parse_time(job.platform.wallclock).total_seconds()) create_task += f""" tasks["{job.name}"].set({{"metadata.options.max_wallclock_seconds": {wallclock_seconds}}})""" - - - if job.parameters["CURRENT_QUEUE"] != "": + if job.partition != "": create_task += f""" -tasks["{job.name}"].set({{"metadata.options.queue_name": {job.partition}}})""" +tasks["{job.name}"].set({{"metadata.options.queue_name": "{job.partition}"}})""" + #if job.platform.user != '': + # create_task += f""" + #tasks["{job.name}"].set({{"metadata.options.account": "{job.platform.user}"}})""" + code_section += create_task code_section += "\n\n" -- GitLab
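For reviewers unfamiliar with aiida-workgraph, the script assembled by `_generate_workflow_script` boils down to something like the minimal sketch below. It is illustrative only: the platform name `local`, the job names `a000_LOCAL_SETUP`/`a000_SIM`, and the `*.cmd`/`*-setup.yml` file names are made-up placeholders; the real generator derives them from the experiment's platforms, job list, and graph edges, and writes one setup-YAML pair per platform next to the script.

```python
# Illustrative sketch of an assembled submit_aiida_workflow.py (assumed names, see note above).
import yaml
from pathlib import Path

import aiida
from aiida import orm
from aiida.common.exceptions import NotExistent
from aiida.orm.utils.builders.computer import ComputerBuilder
from aiida_workgraph import WorkGraph

# INIT
aiida.load_profile()
wg = WorkGraph()
tasks = {}

# CREATE_ORM_NODES: load the computer/code if they already exist, otherwise build them from the YAML dumps
try:
    computer = orm.load_computer("local")
except NotExistent:
    config_kwargs = yaml.safe_load(Path("local/local-setup.yml").read_text())
    computer = ComputerBuilder(**config_kwargs).new().store()
    computer.configure()
try:
    bash_code = orm.load_code("bash@local")
except NotExistent:
    setup_kwargs = yaml.safe_load(Path("local/bash@local-setup.yml").read_text())
    setup_kwargs["computer"] = orm.load_computer(setup_kwargs["computer"])
    bash_code = orm.InstalledCode(**setup_kwargs).store()

# WORKGRAPH_TASKS: one ShellJob per Autosubmit job, wrapping the trimmed job script
for job_name in ("a000_LOCAL_SETUP", "a000_SIM"):
    tasks[job_name] = wg.add_task(
        "ShellJob",
        name=job_name,
        command=bash_code,
        arguments=["{script}"],
        nodes={"script": orm.SinglefileData(f"{job_name}.cmd")},
    )

# WORKGRAPH_DEPS: one waiting_on link per edge of the Autosubmit job graph
tasks["a000_SIM"].waiting_on.add(tasks["a000_LOCAL_SETUP"])

# WORKGRAPH_SUBMISSION
wg.run()
```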