From 684b20d50756bfc91160f10218c74d8fe574d2c1 Mon Sep 17 00:00:00 2001 From: "Bruno P. Kinoshita" Date: Tue, 3 Jan 2023 18:07:13 +0100 Subject: [PATCH 01/16] Add autosubmit generate subcommand --- autosubmit/autosubmit.py | 27 ++ autosubmit/generators/__init__.py | 29 ++ autosubmit/generators/pyflow.py | 424 ++++++++++++++++++++++++++++++ 3 files changed, 480 insertions(+) create mode 100644 autosubmit/generators/__init__.py create mode 100644 autosubmit/generators/pyflow.py diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index d94662632..a7a21b988 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -56,6 +56,7 @@ from .notifications.mail_notifier import MailNotifier from .notifications.notifier import Notifier from .platforms.paramiko_submitter import ParamikoSubmitter from .platforms.platform import Platform +from .generators import Engine, get_engine_generator dialog = None from time import sleep @@ -637,8 +638,23 @@ class Autosubmit: help='Read job files generated by the inspect subcommand.') subparser.add_argument('ID', metavar='ID', help='An ID of a Workflow (eg a000) or a Job (eg a000_20220401_fc0_1_1_APPLICATION).') + # Generate + subparser = subparsers.add_parser( + 'generate', description='Generate a workflow definition for a different Workflow Manager', + argument_default=argparse.SUPPRESS) + subparser.add_argument('expid', help='experiment identifier') + subparser.add_argument('-engine', '--engine', default=Engine.pyflow.value, type=str.lower, + help='The target Workflow Manager engine', choices=[engine.value for engine in Engine]) + subparser.add_argument('args', nargs='?') + args = parser.parse_args() + if len(sys.argv[1]) > 1 and sys.argv[1] in ['generate']: + args, options = parser.parse_known_args() + else: + options = [] + args = parser.parse_args() + if args.command is None: parser.print_help() parser.exit() @@ -743,6 +759,8 @@ class Autosubmit: return Autosubmit.update_description(args.expid, args.description) elif args.command == 'cat-log': return Autosubmit.cat_log(args.ID, args.file, args.mode, args.inspect) + elif args.command == 'generate': + return Autosubmit.generate_workflow(args.expid, Engine[args.engine], options) @staticmethod def _init_logs(args, console_level='INFO', log_level='DEBUG', expid='None'): @@ -6155,3 +6173,12 @@ class Autosubmit: raise AutosubmitCritical(f'The job log file {file} found is not a file: {workflow_log_file}', 7011) return view_file(workflow_log_file, mode) == 0 + + @staticmethod + def generate_workflow(expid: str, engine: Engine, options: List[str]) -> None: + """Generate the workflow configuration for a different Workflow Manager engine.""" + generator = get_engine_generator(engine) + args = generator.parse_args([f'--experiment={expid}', *options]) + generator.generate(args) + Log.info(f'Generated workflow configuration for {engine}') + diff --git a/autosubmit/generators/__init__.py b/autosubmit/generators/__init__.py new file mode 100644 index 000000000..5571282ef --- /dev/null +++ b/autosubmit/generators/__init__.py @@ -0,0 +1,29 @@ +from enum import Enum +from importlib import import_module +from typing import Callable, cast, Protocol + + +class Engine(Enum): + """Workflow Manager engine flavors.""" + pyflow = 'pyflow' + + def __str__(self): + return self.value + + +class GenerateProto(Protocol): + """Need a protocol to define the type returned by importlib.""" + generate: Callable + parse_args: Callable + + +def get_engine_generator(engine: Engine) -> GenerateProto: + """Dynamically loads the engine generate function.""" + generator_function = cast(GenerateProto, import_module(f'autosubmit.generators.{engine.value}')) + return generator_function + + +__all__ = [ + 'Engine', + 'get_engine_generator' +] diff --git a/autosubmit/generators/pyflow.py b/autosubmit/generators/pyflow.py new file mode 100644 index 000000000..5748228f0 --- /dev/null +++ b/autosubmit/generators/pyflow.py @@ -0,0 +1,424 @@ +import argparse +import sys +from collections import defaultdict +from enum import Enum +from itertools import groupby +from typing import List, Dict, Any, TypedDict, Union +import re + +import networkx as nx +from autosubmitconfigparser.config.configcommon import AutosubmitConfig +from pyflow import * +import os + +import pyflow as pf + +# Pattern used to verify if a TASK name includes the previous CHUNK number, with a separator. +PREVIOUS_CHUNK_PATTERN = re.compile(r''' + ([a-zA-Z0-9_\-\.]+) # The Task name (e.g. TASK); + - # TASK and CHUNK separator, i.e. TASK-1 (the hyphen between TASK and 1); + ([\d]+) # The Chunk name (e.g. 1). +''', re.X) + +# Autosubmit Task name separator (not to be confused with task and chunk name separator). +DEFAULT_SEPARATOR = '_' + + +class Running(Enum): + """The Running level of an Autosubmit task.""" + ONCE = 'once' + MEMBER = 'member' + CHUNK = 'chunk' + SPLIT = 'split' + + def __str__(self): + return self.value + + +class DependencyData(TypedDict): + """Autosubmit dependency data.""" + ID: str + NAME: str + RUNNING: Running + + +class JobData(TypedDict): + """Autosubmit job data.""" + ID: str + NAME: str + FILE: str + DEPENDENCIES: Dict[str, DependencyData] + RUNNING: str + WALLCLOCK: str + ADDITIONAL_FILES: List[str] + + +# TODO: split +# Defines how many ``-``'s are replaced by a ``/`` for +# each Autosubmit hierarchy level (to avoid using an if/elif/else). +REPLACE_COUNT = { + Running.ONCE.value: 1, + Running.MEMBER.value: 3, + Running.CHUNK.value: 4 +} + + +def _autosubmit_id_to_ecflow_id(job_id, running): + """Given an Autosubmit ID, create the node ID for ecFlow (minus heading ``/``).""" + replace_count = REPLACE_COUNT[running] + return job_id.replace(DEFAULT_SEPARATOR, '/', replace_count) + + +def parse_args(args) -> argparse.Namespace: + parser = argparse.ArgumentParser( + prog='autosubmit generate ... engine=pyflow', + description='Produces a valid PyFlow workflow configuration given an Autosubmit experiment ID', + epilog='This program needs access to an Autosubmit installation' + ) + parser.add_argument('-e', '--experiment', required=True, help='Autosubmit experiment ID') + parser.add_argument('-d', '--deploy', default=False, action='store_true', help='Deploy to ecFlow or not') + parser.add_argument('-s', '--server', default='localhost', + help='ecFlow server hostname or IP (only used if deploy=True)') + parser.add_argument('-p', '--port', default=3141, help='ecFlow server port (only used if deploy=True)') + parser.add_argument('-g', '--graph', default=False, action='store_true', help='Print the DOT plot') + parser.add_argument('-q', '--quiet', default=False, action='store_true') + + return parser.parse_args(args) + + +def _create_ecflow_suite( + experiment_id: str, + start_dates: List[str], + members: List[str], + chunks: [int], + jobs: Dict[str, JobData], + server_host: str) -> Suite: + """Replicate the vanilla workflow graph structure.""" + + # From: https://pyflow-workflow-generator.readthedocs.io/en/latest/content/introductory-course/getting-started.html + scratchdir = os.path.join(os.path.abspath(''), 'scratch') + filesdir = os.path.join(scratchdir, 'files') + outdir = os.path.join(scratchdir, 'out') + + if not os.path.exists(filesdir): + os.makedirs(filesdir, exist_ok=True) + + if not os.path.exists(outdir): + os.makedirs(outdir, exist_ok=True) + + # First we create a suite with the same ID as the Autosubmit experiment, + # and families for each Autosubmit hierarchy level. + with Suite( + experiment_id, + host=pf.LocalHost(server_host), + defstatus=pf.state.suspended, + home=outdir, + files=filesdir + ) as s: + for start_date in start_dates: + with Family(start_date, START_DATE=start_date): # type: ignore + for member in members: + with Family(member, MEMBER=member) as m: + for chunk in chunks: + Family(str(chunk), CHUNK=chunk) + # TODO: splits + # PyFlow API makes it very easy to create tasks having the ecFlow ID. + # Due to how we expanded the Autosubmit graph to include the ID's, and how + # we structured this suite, an Autosubmit ID can be seamlessly translated + # to an ecFlow ID by simply replacing `_`'s by `/`, ignoring the `_`'s in + # tasks names. + # + # This means that `a000_REMOTE_SETUP` from Autosubmit is `a000/REMOTE_SETUP` + # in ecFlow, `a000_20220401_fc0_INI` is `a000/20220401/fc0/INI`, and so on. + for job in jobs.values(): + ecflow_node = _autosubmit_id_to_ecflow_id(job['ID'], job['RUNNING']) + t = Task(job['NAME']) + + # Find the direct parent of the task, based on the Autosubmit task ID. + # Start from the Suite, and skip the first (suite), and the last (task) + # as we know we can discard these. + parent = s + for node in ecflow_node.split('/')[1:-1]: + parent = parent[node] + # We just need to prevent adding a node twice since creating a task automatically adds + # it to the suite in the context. And simply call ``add_node`` and we should have it. + if t.name not in list(parent.children.mapping.keys()): + parent.add_node(t) + + # Add dependencies. Would be better if we could do it in one-pass, + # but not sure if we can achieve that with PyFlow. Tried adding by + # names during the previous loop, but couldn't find the proper + # way to link dependencies. Ended with "externs" (tasks identified + # as belonging to external suites - due to the names tried). + for job in jobs.values(): + ecflow_node = _autosubmit_id_to_ecflow_id(job['ID'], job['RUNNING']) + parent = s + for node in ecflow_node.split('/')[1:-1]: + parent = parent[node] + ecflow_node = parent[job['NAME']] + + for dep in job['DEPENDENCIES'].values(): + dependency_node = _autosubmit_id_to_ecflow_id(dep['ID'], dep['RUNNING']) + parent = s + for node in dependency_node.split('/')[1:-1]: + parent = parent[node] + dependency_node = parent[dep['NAME']] + + # Operator overloaded in PyFlow. This creates a dependency. + dependency_node >> ecflow_node + + return s + + +def _create_job_id( + *, + expid: str, + name: str, + start_date: Union[str, None] = None, + member: Union[str, None] = None, + chunk: Union[str, None] = None, + split: Union[str, None] = None, + separator=DEFAULT_SEPARATOR) -> str: + """Create an Autosubmit Job ID. Ignores optional values passed as None.""" + if not expid or not name: + raise ValueError('You must provide valid expid and job name') + return separator.join([token for token in filter(None, [expid, start_date, member, chunk, split, name])]) + + +def _create_job( + *, + expid: str, + name: str, + start_date: Union[str, None] = None, + member: Union[str, None] = None, + chunk: Union[int, None] = None, + split: Union[str, None] = None, + separator=DEFAULT_SEPARATOR, + job_data: JobData, + jobs_data: Dict[str, JobData]) -> JobData: + """Create an Autosubmit job.""" + chunk_str = None if chunk is None else str(chunk) + job_id = _create_job_id( + expid=expid, + name=name, + member=member, + chunk=chunk_str, + split=split, + separator=separator, + start_date=start_date) + job = {'ID': job_id, **job_data.copy()} + job['DEPENDENCIES'] = {} + has_previous_chunk_dependency = any(map(lambda dep_name: PREVIOUS_CHUNK_PATTERN.match(dep_name), job_data['DEPENDENCIES'].keys())) + for dependency in job_data['DEPENDENCIES']: + # ONCE jobs can only have dependencies on other once jobs. + job_dependency = _create_dependency( + dependency_name=dependency, + jobs_data=jobs_data, + expid=expid, + start_date=start_date, + member=member, + chunk=chunk, + split=split) + # Certain dependencies do not produce an object, e.g.: + # - SIM-1 if SIM is not RUNNING=chunk, or + # - SIM-1 if current chunk is 1 (or 1 - 1 = 0) + if job_dependency: + if has_previous_chunk_dependency and chunk > 1: + # If this is a CHUNK task, and has dependencies on tasks in previous CHUNK's, we ignore + # dependencies higher up in the hierarchy (i.e. ONCE and MEMBER). + if job_dependency['RUNNING'] in [Running.ONCE.value, Running.MEMBER.value]: + continue + job['DEPENDENCIES'][dependency] = job_dependency + return job + + +def _create_dependency( + dependency_name: str, + jobs_data: Dict[str, JobData], + expid: str, + start_date: Union[str, None] = None, + member: Union[str, None] = None, + chunk: Union[int, None] = None, + split: Union[str, None] = None) -> Union[DependencyData, None]: + """Create an Autosubmit dependency object. + + The dependency created will have a field ``ID`` with the expanded dependency ID.""" + dependency_member = None + dependency_start_date = None + dependency_chunk = None + + m = re.search(PREVIOUS_CHUNK_PATTERN, dependency_name) + if m: + if chunk is None: + # We ignore if this syntax is used outside a running=chunk (same behaviour as AS?). + return None + dependency_name = m.group(1) + previous_chunk = int(m.group(2)) + if chunk - previous_chunk < 1: + # We ignore -1 when the chunk is 1 (i.e. no previous chunk). + return None + dependency_chunk = str(previous_chunk) + + dependency_data = jobs_data[dependency_name] + + if dependency_data['RUNNING'] == Running.MEMBER.value: + # if not a member dependency, then we do not add the start date and member (i.e. it is a once dependency) + dependency_member = member + dependency_start_date = start_date + elif dependency_data['RUNNING'] == Running.CHUNK.value: + dependency_member = member + dependency_start_date = start_date + if dependency_chunk is None: + dependency_chunk = str(chunk) + # TODO: split + dependency_id = _create_job_id( + expid=expid, + name=dependency_name, + member=dependency_member, + chunk=dependency_chunk, + start_date=dependency_start_date) + return {'ID': dependency_id, **dependency_data} + + +def _expand_autosubmit_graph( + jobs_grouped_by_running_level: Dict[str, List[JobData]], + expid: str, + start_dates: List[str], + members: List[str], + chunks: List[int], + jobs_data: Dict[str, JobData] +) -> Dict[str, JobData]: + """Expand the Autosubmit graph. + + Expand jobs (by member, chunk, split, previous-dependency like SIM-1). That's because the + # graph declaration in Autosubmit configuration contains a meta graph, that is expanded by + # each hierarchy level generating more jobs (i.e. SIM may become a000_202204_fc0_1_SIM for + # running=CHUNK).""" + jobs: Dict[str, JobData] = {} + for running in Running: + for job_running in jobs_grouped_by_running_level[running.value]: + if running == Running.ONCE: + job = _create_job( + expid=expid, + name=job_running['NAME'], + job_data=job_running, + jobs_data=jobs_data) + jobs[job['ID']] = job + else: + for start_date in start_dates: + if running == Running.MEMBER: + for member in members: + job = _create_job( + expid=expid, + name=job_running['NAME'], + member=member, + start_date=start_date, + job_data=job_running, + jobs_data=jobs_data) + jobs[job['ID']] = job + elif running == Running.CHUNK: + for member in members: + for chunk in chunks: + job = _create_job( + expid=expid, + name=job_running['NAME'], + member=member, + chunk=chunk, + start_date=start_date, + job_data=job_running, + jobs_data=jobs_data) + jobs[job['ID']] = job + # TODO: split + else: + # TODO: implement splits and anything else? + raise NotImplementedError(running) + return jobs + + +def generate(args) -> None: + # Init the configuration object where expid = experiment identifier that you want to load. + as_conf = AutosubmitConfig(args.experiment) + # This will load the data from the experiment. + as_conf.reload(True) + + # Autosubmit experiment configuration. + expid = args.experiment + start_dates = as_conf.experiment_data['EXPERIMENT']['DATELIST'].split(' ') + members = as_conf.experiment_data['EXPERIMENT']['MEMBERS'].split(' ') + chunks = [i for i in range(1, as_conf.experiment_data['EXPERIMENT']['NUMCHUNKS'] + 1)] + + # Place the NAME attribute in the job object. + jobs_data: Dict[str, JobData] = { + job_data[0]: {'NAME': job_data[0], **job_data[1]} + for job_data in as_conf.jobs_data.items()} + + # Create a list of jobs. + jobs_list: List[JobData] = list(jobs_data.values()) + jobs_grouped_by_running_level: Dict[str, List[JobData]] = defaultdict(list) + jobs_grouped_by_running_level.update( + {job[0]: list(job[1]) for job in groupby(jobs_list, lambda item: item['RUNNING'])}) + + # TODO: raise an error for unsupported features, like SKIPPABLE? + # Expand the Autosubmit workflow graph. + jobs: Dict[str, JobData] = _expand_autosubmit_graph(jobs_grouped_by_running_level, expid, start_dates, members, + chunks, jobs_data) + + # Create networkx graph. + G = nx.DiGraph() + for job in jobs.values(): + G.add_node(job['ID']) + for dep in job['DEPENDENCIES'].values(): + G.add_edges_from([(dep['ID'], job['ID'])]) + PG = nx.nx_pydot.to_pydot(G) + + if args.graph: + print(PG) + + # Sort the dictionary of jobs in topological order. + jobs_order = list(list(nx.topological_sort(G))) + jobs_ordered: Dict[str, JobData] = dict( + sorted(jobs.items(), key=lambda item: jobs_order.index(item[1]['ID']))) # type: ignore + + suite = _create_ecflow_suite( + experiment_id=expid, + start_dates=start_dates, + members=members, + chunks=chunks, + jobs=jobs_ordered, + server_host=args.server + ) + + suite.check_definition() + if not args.quiet: + print(suite) + + if args.deploy: + suite.deploy_suite(overwrite=True) + suite.replace_on_server(host=args.server, port=args.port) + + +## OLD: not used... for now? + + +def main() -> None: + parser = argparse.ArgumentParser( + prog='autosubmit2pyflow', + description='Produces a valid PyFlow workflow configuration given an Autosubmit experiment ID', + epilog='This program needs access to an Autosubmit installation' + ) + parser.add_argument('-e', '--experiment', required=True, help='Autosubmit experiment ID') + parser.add_argument('-d', '--deploy', default=False, action='store_true', help='Deploy to ecFlow or not') + parser.add_argument('-s', '--server', default='localhost', help='ecFlow server hostname or IP (only used if deploy=True)') + parser.add_argument('-p', '--port', default=3141, help='ecFlow server port (only used if deploy=True)') + parser.add_argument('-g', '--graph', default=False, action='store_true', help='Print the DOT plot') + parser.add_argument('-q', '--quiet', default=False, action='store_true') + + args = parser.parse_args() + + generate(args) + + sys.exit(0) + + +if __name__ == '__main__': + main() -- GitLab From 2e909b63f659a09954aee231376c487a17656ef2 Mon Sep 17 00:00:00 2001 From: "Bruno P. Kinoshita" Date: Wed, 4 Jan 2023 09:06:03 +0100 Subject: [PATCH 02/16] Parse args in the generator, simplify and provide a single function --- autosubmit/autosubmit.py | 4 +--- autosubmit/generators/__init__.py | 5 ++--- autosubmit/generators/pyflow.py | 30 +++++++++--------------------- 3 files changed, 12 insertions(+), 27 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index a7a21b988..dcea4ef18 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -6177,8 +6177,6 @@ class Autosubmit: @staticmethod def generate_workflow(expid: str, engine: Engine, options: List[str]) -> None: """Generate the workflow configuration for a different Workflow Manager engine.""" - generator = get_engine_generator(engine) - args = generator.parse_args([f'--experiment={expid}', *options]) - generator.generate(args) Log.info(f'Generated workflow configuration for {engine}') + get_engine_generator(engine)([f'--experiment={expid}', *options]) diff --git a/autosubmit/generators/__init__.py b/autosubmit/generators/__init__.py index 5571282ef..8e2a9ca87 100644 --- a/autosubmit/generators/__init__.py +++ b/autosubmit/generators/__init__.py @@ -14,13 +14,12 @@ class Engine(Enum): class GenerateProto(Protocol): """Need a protocol to define the type returned by importlib.""" generate: Callable - parse_args: Callable -def get_engine_generator(engine: Engine) -> GenerateProto: +def get_engine_generator(engine: Engine) -> Callable: """Dynamically loads the engine generate function.""" generator_function = cast(GenerateProto, import_module(f'autosubmit.generators.{engine.value}')) - return generator_function + return generator_function.generate __all__ = [ diff --git a/autosubmit/generators/pyflow.py b/autosubmit/generators/pyflow.py index 5748228f0..0ef8e7ff9 100644 --- a/autosubmit/generators/pyflow.py +++ b/autosubmit/generators/pyflow.py @@ -69,7 +69,7 @@ def _autosubmit_id_to_ecflow_id(job_id, running): return job_id.replace(DEFAULT_SEPARATOR, '/', replace_count) -def parse_args(args) -> argparse.Namespace: +def _parse_args(args) -> argparse.Namespace: parser = argparse.ArgumentParser( prog='autosubmit generate ... engine=pyflow', description='Produces a valid PyFlow workflow configuration given an Autosubmit experiment ID', @@ -335,7 +335,13 @@ def _expand_autosubmit_graph( return jobs -def generate(args) -> None: +def generate(options: List[str]) -> None: + """Generates a PyFlow workflow using Autosubmit database. + + :param options: a list of strings with arguments (equivalent to sys.argv), passed to argparse + """ + args: argparse.Namespace = _parse_args(options) + # Init the configuration object where expid = experiment identifier that you want to load. as_conf = AutosubmitConfig(args.experiment) # This will load the data from the experiment. @@ -397,26 +403,8 @@ def generate(args) -> None: suite.replace_on_server(host=args.server, port=args.port) -## OLD: not used... for now? - - def main() -> None: - parser = argparse.ArgumentParser( - prog='autosubmit2pyflow', - description='Produces a valid PyFlow workflow configuration given an Autosubmit experiment ID', - epilog='This program needs access to an Autosubmit installation' - ) - parser.add_argument('-e', '--experiment', required=True, help='Autosubmit experiment ID') - parser.add_argument('-d', '--deploy', default=False, action='store_true', help='Deploy to ecFlow or not') - parser.add_argument('-s', '--server', default='localhost', help='ecFlow server hostname or IP (only used if deploy=True)') - parser.add_argument('-p', '--port', default=3141, help='ecFlow server port (only used if deploy=True)') - parser.add_argument('-g', '--graph', default=False, action='store_true', help='Print the DOT plot') - parser.add_argument('-q', '--quiet', default=False, action='store_true') - - args = parser.parse_args() - - generate(args) - + generate(sys.argv) sys.exit(0) -- GitLab From 6a07d5b479ce4837554626afe960fca6da14e3f5 Mon Sep 17 00:00:00 2001 From: "Bruno P. Kinoshita" Date: Wed, 4 Jan 2023 10:27:40 +0100 Subject: [PATCH 03/16] Use existing Autosubmit code to create the workflow graph (thanks @mcastril !) --- autosubmit/autosubmit.py | 17 +- autosubmit/generators/pyflow.py | 294 +++----------------------------- 2 files changed, 43 insertions(+), 268 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index dcea4ef18..e41c37690 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -6178,5 +6178,20 @@ class Autosubmit: def generate_workflow(expid: str, engine: Engine, options: List[str]) -> None: """Generate the workflow configuration for a different Workflow Manager engine.""" Log.info(f'Generated workflow configuration for {engine}') - get_engine_generator(engine)([f'--experiment={expid}', *options]) + + try: + Log.info("Getting job list...") + as_conf = AutosubmitConfig(expid, BasicConfig, YAMLParserFactory()) + as_conf.check_conf_files(False) + # Getting output type from configuration + # pkl_dir = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, 'pkl') + job_list = Autosubmit.load_job_list(expid, as_conf, notransitive=False, monitor=False) + except AutosubmitError as e: + raise AutosubmitCritical(e.message, e.code, e.trace) + except AutosubmitCritical as e: + raise + except BaseException as e: + raise AutosubmitCritical("Error while checking the configuration files or loading the job_list", 7040, + str(e)) + get_engine_generator(engine)(job_list, [f'--experiment={expid}', *options]) diff --git a/autosubmit/generators/pyflow.py b/autosubmit/generators/pyflow.py index 0ef8e7ff9..ffa7f50b7 100644 --- a/autosubmit/generators/pyflow.py +++ b/autosubmit/generators/pyflow.py @@ -1,13 +1,10 @@ import argparse -import sys -from collections import defaultdict from enum import Enum from itertools import groupby from typing import List, Dict, Any, TypedDict, Union import re -import networkx as nx -from autosubmitconfigparser.config.configcommon import AutosubmitConfig +from autosubmit.job.job_list import JobList, Job from pyflow import * import os @@ -34,25 +31,6 @@ class Running(Enum): def __str__(self): return self.value - -class DependencyData(TypedDict): - """Autosubmit dependency data.""" - ID: str - NAME: str - RUNNING: Running - - -class JobData(TypedDict): - """Autosubmit job data.""" - ID: str - NAME: str - FILE: str - DEPENDENCIES: Dict[str, DependencyData] - RUNNING: str - WALLCLOCK: str - ADDITIONAL_FILES: List[str] - - # TODO: split # Defines how many ``-``'s are replaced by a ``/`` for # each Autosubmit hierarchy level (to avoid using an if/elif/else). @@ -91,7 +69,7 @@ def _create_ecflow_suite( start_dates: List[str], members: List[str], chunks: [int], - jobs: Dict[str, JobData], + jobs: List[Job], server_host: str) -> Suite: """Replicate the vanilla workflow graph structure.""" @@ -130,267 +108,58 @@ def _create_ecflow_suite( # # This means that `a000_REMOTE_SETUP` from Autosubmit is `a000/REMOTE_SETUP` # in ecFlow, `a000_20220401_fc0_INI` is `a000/20220401/fc0/INI`, and so on. - for job in jobs.values(): - ecflow_node = _autosubmit_id_to_ecflow_id(job['ID'], job['RUNNING']) - t = Task(job['NAME']) + for job in jobs: + ecflow_node = _autosubmit_id_to_ecflow_id(job.long_name, job.running) + t = Task(job.section) # Find the direct parent of the task, based on the Autosubmit task ID. # Start from the Suite, and skip the first (suite), and the last (task) # as we know we can discard these. - parent = s + parent_node = s for node in ecflow_node.split('/')[1:-1]: - parent = parent[node] + parent_node = parent_node[node] # We just need to prevent adding a node twice since creating a task automatically adds # it to the suite in the context. And simply call ``add_node`` and we should have it. - if t.name not in list(parent.children.mapping.keys()): - parent.add_node(t) + if t.name not in list(parent_node.children.mapping.keys()): + parent_node.add_node(t) - # Add dependencies. Would be better if we could do it in one-pass, - # but not sure if we can achieve that with PyFlow. Tried adding by - # names during the previous loop, but couldn't find the proper - # way to link dependencies. Ended with "externs" (tasks identified - # as belonging to external suites - due to the names tried). - for job in jobs.values(): - ecflow_node = _autosubmit_id_to_ecflow_id(job['ID'], job['RUNNING']) - parent = s - for node in ecflow_node.split('/')[1:-1]: - parent = parent[node] - ecflow_node = parent[job['NAME']] - - for dep in job['DEPENDENCIES'].values(): - dependency_node = _autosubmit_id_to_ecflow_id(dep['ID'], dep['RUNNING']) - parent = s + for parent in job.parents: + dependency_node = _autosubmit_id_to_ecflow_id(parent.long_name, parent.running) + parent_node = s for node in dependency_node.split('/')[1:-1]: - parent = parent[node] - dependency_node = parent[dep['NAME']] + parent_node = parent_node[node] + dependency_node = parent_node[parent.section] # Operator overloaded in PyFlow. This creates a dependency. - dependency_node >> ecflow_node + dependency_node >> t return s -def _create_job_id( - *, - expid: str, - name: str, - start_date: Union[str, None] = None, - member: Union[str, None] = None, - chunk: Union[str, None] = None, - split: Union[str, None] = None, - separator=DEFAULT_SEPARATOR) -> str: - """Create an Autosubmit Job ID. Ignores optional values passed as None.""" - if not expid or not name: - raise ValueError('You must provide valid expid and job name') - return separator.join([token for token in filter(None, [expid, start_date, member, chunk, split, name])]) - - -def _create_job( - *, - expid: str, - name: str, - start_date: Union[str, None] = None, - member: Union[str, None] = None, - chunk: Union[int, None] = None, - split: Union[str, None] = None, - separator=DEFAULT_SEPARATOR, - job_data: JobData, - jobs_data: Dict[str, JobData]) -> JobData: - """Create an Autosubmit job.""" - chunk_str = None if chunk is None else str(chunk) - job_id = _create_job_id( - expid=expid, - name=name, - member=member, - chunk=chunk_str, - split=split, - separator=separator, - start_date=start_date) - job = {'ID': job_id, **job_data.copy()} - job['DEPENDENCIES'] = {} - has_previous_chunk_dependency = any(map(lambda dep_name: PREVIOUS_CHUNK_PATTERN.match(dep_name), job_data['DEPENDENCIES'].keys())) - for dependency in job_data['DEPENDENCIES']: - # ONCE jobs can only have dependencies on other once jobs. - job_dependency = _create_dependency( - dependency_name=dependency, - jobs_data=jobs_data, - expid=expid, - start_date=start_date, - member=member, - chunk=chunk, - split=split) - # Certain dependencies do not produce an object, e.g.: - # - SIM-1 if SIM is not RUNNING=chunk, or - # - SIM-1 if current chunk is 1 (or 1 - 1 = 0) - if job_dependency: - if has_previous_chunk_dependency and chunk > 1: - # If this is a CHUNK task, and has dependencies on tasks in previous CHUNK's, we ignore - # dependencies higher up in the hierarchy (i.e. ONCE and MEMBER). - if job_dependency['RUNNING'] in [Running.ONCE.value, Running.MEMBER.value]: - continue - job['DEPENDENCIES'][dependency] = job_dependency - return job - - -def _create_dependency( - dependency_name: str, - jobs_data: Dict[str, JobData], - expid: str, - start_date: Union[str, None] = None, - member: Union[str, None] = None, - chunk: Union[int, None] = None, - split: Union[str, None] = None) -> Union[DependencyData, None]: - """Create an Autosubmit dependency object. - - The dependency created will have a field ``ID`` with the expanded dependency ID.""" - dependency_member = None - dependency_start_date = None - dependency_chunk = None - - m = re.search(PREVIOUS_CHUNK_PATTERN, dependency_name) - if m: - if chunk is None: - # We ignore if this syntax is used outside a running=chunk (same behaviour as AS?). - return None - dependency_name = m.group(1) - previous_chunk = int(m.group(2)) - if chunk - previous_chunk < 1: - # We ignore -1 when the chunk is 1 (i.e. no previous chunk). - return None - dependency_chunk = str(previous_chunk) - - dependency_data = jobs_data[dependency_name] - - if dependency_data['RUNNING'] == Running.MEMBER.value: - # if not a member dependency, then we do not add the start date and member (i.e. it is a once dependency) - dependency_member = member - dependency_start_date = start_date - elif dependency_data['RUNNING'] == Running.CHUNK.value: - dependency_member = member - dependency_start_date = start_date - if dependency_chunk is None: - dependency_chunk = str(chunk) - # TODO: split - dependency_id = _create_job_id( - expid=expid, - name=dependency_name, - member=dependency_member, - chunk=dependency_chunk, - start_date=dependency_start_date) - return {'ID': dependency_id, **dependency_data} - - -def _expand_autosubmit_graph( - jobs_grouped_by_running_level: Dict[str, List[JobData]], - expid: str, - start_dates: List[str], - members: List[str], - chunks: List[int], - jobs_data: Dict[str, JobData] -) -> Dict[str, JobData]: - """Expand the Autosubmit graph. - - Expand jobs (by member, chunk, split, previous-dependency like SIM-1). That's because the - # graph declaration in Autosubmit configuration contains a meta graph, that is expanded by - # each hierarchy level generating more jobs (i.e. SIM may become a000_202204_fc0_1_SIM for - # running=CHUNK).""" - jobs: Dict[str, JobData] = {} - for running in Running: - for job_running in jobs_grouped_by_running_level[running.value]: - if running == Running.ONCE: - job = _create_job( - expid=expid, - name=job_running['NAME'], - job_data=job_running, - jobs_data=jobs_data) - jobs[job['ID']] = job - else: - for start_date in start_dates: - if running == Running.MEMBER: - for member in members: - job = _create_job( - expid=expid, - name=job_running['NAME'], - member=member, - start_date=start_date, - job_data=job_running, - jobs_data=jobs_data) - jobs[job['ID']] = job - elif running == Running.CHUNK: - for member in members: - for chunk in chunks: - job = _create_job( - expid=expid, - name=job_running['NAME'], - member=member, - chunk=chunk, - start_date=start_date, - job_data=job_running, - jobs_data=jobs_data) - jobs[job['ID']] = job - # TODO: split - else: - # TODO: implement splits and anything else? - raise NotImplementedError(running) - return jobs - - -def generate(options: List[str]) -> None: +def generate(job_list: JobList, options: List[str]) -> None: """Generates a PyFlow workflow using Autosubmit database. + The ``autosubmit create`` command must have been already executed prior + to calling this function. This is so that the jobs are correctly loaded + to produce the PyFlow workflow. + + :param job_list: ``JobList`` Autosubmit object, that contains the parameters, jobs, and graph :param options: a list of strings with arguments (equivalent to sys.argv), passed to argparse """ args: argparse.Namespace = _parse_args(options) - # Init the configuration object where expid = experiment identifier that you want to load. - as_conf = AutosubmitConfig(args.experiment) - # This will load the data from the experiment. - as_conf.reload(True) - - # Autosubmit experiment configuration. - expid = args.experiment - start_dates = as_conf.experiment_data['EXPERIMENT']['DATELIST'].split(' ') - members = as_conf.experiment_data['EXPERIMENT']['MEMBERS'].split(' ') - chunks = [i for i in range(1, as_conf.experiment_data['EXPERIMENT']['NUMCHUNKS'] + 1)] - - # Place the NAME attribute in the job object. - jobs_data: Dict[str, JobData] = { - job_data[0]: {'NAME': job_data[0], **job_data[1]} - for job_data in as_conf.jobs_data.items()} - - # Create a list of jobs. - jobs_list: List[JobData] = list(jobs_data.values()) - jobs_grouped_by_running_level: Dict[str, List[JobData]] = defaultdict(list) - jobs_grouped_by_running_level.update( - {job[0]: list(job[1]) for job in groupby(jobs_list, lambda item: item['RUNNING'])}) - - # TODO: raise an error for unsupported features, like SKIPPABLE? - # Expand the Autosubmit workflow graph. - jobs: Dict[str, JobData] = _expand_autosubmit_graph(jobs_grouped_by_running_level, expid, start_dates, members, - chunks, jobs_data) - - # Create networkx graph. - G = nx.DiGraph() - for job in jobs.values(): - G.add_node(job['ID']) - for dep in job['DEPENDENCIES'].values(): - G.add_edges_from([(dep['ID'], job['ID'])]) - PG = nx.nx_pydot.to_pydot(G) - - if args.graph: - print(PG) - - # Sort the dictionary of jobs in topological order. - jobs_order = list(list(nx.topological_sort(G))) - jobs_ordered: Dict[str, JobData] = dict( - sorted(jobs.items(), key=lambda item: jobs_order.index(item[1]['ID']))) # type: ignore + expid = job_list.expid + start_dates = [d.strftime("%Y%m%d") for d in job_list.get_date_list()] + members = job_list.get_member_list() + chunks = job_list.get_chunk_list() + # TODO: splits? suite = _create_ecflow_suite( experiment_id=expid, start_dates=start_dates, members=members, chunks=chunks, - jobs=jobs_ordered, + jobs=job_list.get_all(), server_host=args.server ) @@ -401,12 +170,3 @@ def generate(options: List[str]) -> None: if args.deploy: suite.deploy_suite(overwrite=True) suite.replace_on_server(host=args.server, port=args.port) - - -def main() -> None: - generate(sys.argv) - sys.exit(0) - - -if __name__ == '__main__': - main() -- GitLab From c197b11408a9c83c8a3c7486adb4e92d6d95d6de Mon Sep 17 00:00:00 2001 From: "Bruno P. Kinoshita" Date: Wed, 4 Jan 2023 10:41:19 +0100 Subject: [PATCH 04/16] Organize imports, add note about MyPy and PyFlow --- autosubmit/generators/pyflow.py | 33 +++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/autosubmit/generators/pyflow.py b/autosubmit/generators/pyflow.py index ffa7f50b7..5490918e1 100644 --- a/autosubmit/generators/pyflow.py +++ b/autosubmit/generators/pyflow.py @@ -1,14 +1,13 @@ import argparse -from enum import Enum -from itertools import groupby -from typing import List, Dict, Any, TypedDict, Union +import os import re +from enum import Enum +from typing import List -from autosubmit.job.job_list import JobList, Job +import pyflow as pf from pyflow import * -import os -import pyflow as pf +from autosubmit.job.job_list import JobList, Job # Pattern used to verify if a TASK name includes the previous CHUNK number, with a separator. PREVIOUS_CHUNK_PATTERN = re.compile(r''' @@ -31,7 +30,8 @@ class Running(Enum): def __str__(self): return self.value -# TODO: split + +# TODO: split? # Defines how many ``-``'s are replaced by a ``/`` for # each Autosubmit hierarchy level (to avoid using an if/elif/else). REPLACE_COUNT = { @@ -86,17 +86,18 @@ def _create_ecflow_suite( # First we create a suite with the same ID as the Autosubmit experiment, # and families for each Autosubmit hierarchy level. - with Suite( - experiment_id, - host=pf.LocalHost(server_host), - defstatus=pf.state.suspended, - home=outdir, - files=filesdir - ) as s: + # NOTE: PyFlow does not work very well with MyPy: https://github.com/ecmwf/pyflow/issues/5 + with Suite( # typing: ignore + experiment_id, + host=pf.LocalHost(server_host), + defstatus=pf.state.suspended, # type: ignore + home=outdir, # type: ignore + files=filesdir # type: ignore + ) as s: # typing: ignore for start_date in start_dates: with Family(start_date, START_DATE=start_date): # type: ignore for member in members: - with Family(member, MEMBER=member) as m: + with Family(member, MEMBER=member) as m: # type: ignore for chunk in chunks: Family(str(chunk), CHUNK=chunk) # TODO: splits @@ -168,5 +169,5 @@ def generate(job_list: JobList, options: List[str]) -> None: print(suite) if args.deploy: - suite.deploy_suite(overwrite=True) + suite.deploy_suite(overwrite=True) # type: ignore suite.replace_on_server(host=args.server, port=args.port) -- GitLab From ffaa63c3952bd49a4b68466cb25fe8452cd13580 Mon Sep 17 00:00:00 2001 From: "Bruno P. Kinoshita" Date: Wed, 4 Jan 2023 17:56:02 +0100 Subject: [PATCH 05/16] Create job scripts with envVars --- autosubmit/autosubmit.py | 24 ++++++++++++++-- autosubmit/generators/pyflow.py | 51 ++++++++++++++++++++++++--------- 2 files changed, 58 insertions(+), 17 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index e41c37690..1b4e57350 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -6183,9 +6183,26 @@ class Autosubmit: Log.info("Getting job list...") as_conf = AutosubmitConfig(expid, BasicConfig, YAMLParserFactory()) as_conf.check_conf_files(False) - # Getting output type from configuration - # pkl_dir = os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, 'pkl') + + submitter = Autosubmit._get_submitter(as_conf) + submitter.load_platforms(as_conf) + if len(submitter.platforms) == 0: + raise ValueError('Missing platform!') + + packages_persistence = JobPackagePersistence( + os.path.join(BasicConfig.LOCAL_ROOT_DIR, expid, "pkl"), "job_packages_" + expid) job_list = Autosubmit.load_job_list(expid, as_conf, notransitive=False, monitor=False) + + Autosubmit._load_parameters(as_conf, job_list, submitter.platforms) + + hpc_architecture = as_conf.get_platform() + for job in job_list.get_job_list(): + if job.platform_name is None or job.platform_name == '': + job.platform_name = hpc_architecture + job.platform = submitter.platforms[job.platform_name] + job.update_parameters(as_conf, job_list.parameters) + + job_list.check_scripts(as_conf) except AutosubmitError as e: raise AutosubmitCritical(e.message, e.code, e.trace) except AutosubmitCritical as e: @@ -6193,5 +6210,6 @@ class Autosubmit: except BaseException as e: raise AutosubmitCritical("Error while checking the configuration files or loading the job_list", 7040, str(e)) - get_engine_generator(engine)(job_list, [f'--experiment={expid}', *options]) + + get_engine_generator(engine)(job_list, as_conf, [f'--experiment={expid}', *options]) diff --git a/autosubmit/generators/pyflow.py b/autosubmit/generators/pyflow.py index 5490918e1..185c9b2d0 100644 --- a/autosubmit/generators/pyflow.py +++ b/autosubmit/generators/pyflow.py @@ -1,6 +1,7 @@ import argparse import os import re +import tempfile from enum import Enum from typing import List @@ -55,6 +56,7 @@ def _parse_args(args) -> argparse.Namespace: ) parser.add_argument('-e', '--experiment', required=True, help='Autosubmit experiment ID') parser.add_argument('-d', '--deploy', default=False, action='store_true', help='Deploy to ecFlow or not') + parser.add_argument('-o', '--output', default=tempfile.gettempdir(), help='Output directory') parser.add_argument('-s', '--server', default='localhost', help='ecFlow server hostname or IP (only used if deploy=True)') parser.add_argument('-p', '--port', default=3141, help='ecFlow server port (only used if deploy=True)') @@ -70,19 +72,20 @@ def _create_ecflow_suite( members: List[str], chunks: [int], jobs: List[Job], - server_host: str) -> Suite: + server_host: str, + output_dir: str) -> Suite: """Replicate the vanilla workflow graph structure.""" # From: https://pyflow-workflow-generator.readthedocs.io/en/latest/content/introductory-course/getting-started.html - scratchdir = os.path.join(os.path.abspath(''), 'scratch') - filesdir = os.path.join(scratchdir, 'files') - outdir = os.path.join(scratchdir, 'out') + scratch_dir = os.path.join(os.path.abspath(output_dir), 'scratch') + files_dir = os.path.join(scratch_dir, 'files') + out_dir = os.path.join(scratch_dir, 'out') - if not os.path.exists(filesdir): - os.makedirs(filesdir, exist_ok=True) + if not os.path.exists(files_dir): + os.makedirs(files_dir, exist_ok=True) - if not os.path.exists(outdir): - os.makedirs(outdir, exist_ok=True) + if not os.path.exists(out_dir): + os.makedirs(out_dir, exist_ok=True) # First we create a suite with the same ID as the Autosubmit experiment, # and families for each Autosubmit hierarchy level. @@ -91,15 +94,15 @@ def _create_ecflow_suite( experiment_id, host=pf.LocalHost(server_host), defstatus=pf.state.suspended, # type: ignore - home=outdir, # type: ignore - files=filesdir # type: ignore + home=out_dir, # type: ignore + files=files_dir # type: ignore ) as s: # typing: ignore for start_date in start_dates: - with Family(start_date, START_DATE=start_date): # type: ignore + with AnchorFamily(start_date, START_DATE=start_date): # type: ignore for member in members: - with Family(member, MEMBER=member) as m: # type: ignore + with AnchorFamily(member, MEMBER=member) as m: # type: ignore for chunk in chunks: - Family(str(chunk), CHUNK=chunk) + AnchorFamily(str(chunk), CHUNK=chunk) # TODO: splits # PyFlow API makes it very easy to create tasks having the ecFlow ID. # Due to how we expanded the Autosubmit graph to include the ID's, and how @@ -124,6 +127,7 @@ def _create_ecflow_suite( if t.name not in list(parent_node.children.mapping.keys()): parent_node.add_node(t) + # Dependencies for parent in job.parents: dependency_node = _autosubmit_id_to_ecflow_id(parent.long_name, parent.running) parent_node = s @@ -134,6 +138,24 @@ def _create_ecflow_suite( # Operator overloaded in PyFlow. This creates a dependency. dependency_node >> t + # Script + # N.B.: The PyFlow documentation states that it is recommended + # to minimize the number of variables exposed to scripts. We + # are exposing every parameter available in AS, which is not + # really recommended. Maybe there is a better way? + # Note too, that we need to use ``job.file``, not ``job.script_name``, + # as ``script_name`` is the final generated AS script name, not the + # job script from the job configuration. + autosubmit_project_script = os.path.join(files_dir, job.file) + script = pf.FileScript(autosubmit_project_script) + + for key, value in [(k, v) for k, v in job.parameters.items() if k.isupper()]: + script.define_environment_variable(key, value) + for variable in job.undefined_variables: + script.define_environment_variable(variable, '') + t.script = script + + return s @@ -161,7 +183,8 @@ def generate(job_list: JobList, options: List[str]) -> None: members=members, chunks=chunks, jobs=job_list.get_all(), - server_host=args.server + server_host=args.server, + output_dir=args.output ) suite.check_definition() -- GitLab From 7df1d47d1cd07aed4ba447e86db6a050c7b7515d Mon Sep 17 00:00:00 2001 From: "Bruno P. Kinoshita" Date: Wed, 4 Jan 2023 18:38:58 +0100 Subject: [PATCH 06/16] Insert the Autosubmit script (minus templates) --- autosubmit/autosubmit.py | 4 +-- autosubmit/generators/__init__.py | 1 + autosubmit/generators/pyflow.py | 45 +++++++++++++++++-------------- 3 files changed, 28 insertions(+), 22 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 1b4e57350..0ab372df4 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -6176,8 +6176,8 @@ class Autosubmit: @staticmethod def generate_workflow(expid: str, engine: Engine, options: List[str]) -> None: - """Generate the workflow configuration for a different Workflow Manager engine.""" - Log.info(f'Generated workflow configuration for {engine}') + """Generate the workflow configuration for a different backend engine.""" + Log.info(f'Generate workflow configuration for {engine}') try: Log.info("Getting job list...") diff --git a/autosubmit/generators/__init__.py b/autosubmit/generators/__init__.py index 8e2a9ca87..a2ab20ea7 100644 --- a/autosubmit/generators/__init__.py +++ b/autosubmit/generators/__init__.py @@ -2,6 +2,7 @@ from enum import Enum from importlib import import_module from typing import Callable, cast, Protocol +"""This module provides generators to produce workflow configurations for different backend engines.""" class Engine(Enum): """Workflow Manager engine flavors.""" diff --git a/autosubmit/generators/pyflow.py b/autosubmit/generators/pyflow.py index 185c9b2d0..128f8726b 100644 --- a/autosubmit/generators/pyflow.py +++ b/autosubmit/generators/pyflow.py @@ -6,10 +6,13 @@ from enum import Enum from typing import List import pyflow as pf +from autosubmitconfigparser.config.configcommon import AutosubmitConfig from pyflow import * from autosubmit.job.job_list import JobList, Job +"""The PyFlow generator for Autosubmit.""" + # Pattern used to verify if a TASK name includes the previous CHUNK number, with a separator. PREVIOUS_CHUNK_PATTERN = re.compile(r''' ([a-zA-Z0-9_\-\.]+) # The Task name (e.g. TASK); @@ -17,6 +20,7 @@ PREVIOUS_CHUNK_PATTERN = re.compile(r''' ([\d]+) # The Chunk name (e.g. 1). ''', re.X) + # Autosubmit Task name separator (not to be confused with task and chunk name separator). DEFAULT_SEPARATOR = '_' @@ -42,7 +46,7 @@ REPLACE_COUNT = { } -def _autosubmit_id_to_ecflow_id(job_id, running): +def _autosubmit_id_to_ecflow_id(job_id: str, running: str): """Given an Autosubmit ID, create the node ID for ecFlow (minus heading ``/``).""" replace_count = REPLACE_COUNT[running] return job_id.replace(DEFAULT_SEPARATOR, '/', replace_count) @@ -73,7 +77,8 @@ def _create_ecflow_suite( chunks: [int], jobs: List[Job], server_host: str, - output_dir: str) -> Suite: + output_dir: str, + as_conf: AutosubmitConfig) -> Suite: """Replicate the vanilla workflow graph structure.""" # From: https://pyflow-workflow-generator.readthedocs.io/en/latest/content/introductory-course/getting-started.html @@ -100,7 +105,7 @@ def _create_ecflow_suite( for start_date in start_dates: with AnchorFamily(start_date, START_DATE=start_date): # type: ignore for member in members: - with AnchorFamily(member, MEMBER=member) as m: # type: ignore + with AnchorFamily(member, MEMBER=member): # type: ignore for chunk in chunks: AnchorFamily(str(chunk), CHUNK=chunk) # TODO: splits @@ -139,27 +144,20 @@ def _create_ecflow_suite( dependency_node >> t # Script - # N.B.: The PyFlow documentation states that it is recommended - # to minimize the number of variables exposed to scripts. We - # are exposing every parameter available in AS, which is not - # really recommended. Maybe there is a better way? - # Note too, that we need to use ``job.file``, not ``job.script_name``, - # as ``script_name`` is the final generated AS script name, not the - # job script from the job configuration. - autosubmit_project_script = os.path.join(files_dir, job.file) - script = pf.FileScript(autosubmit_project_script) - - for key, value in [(k, v) for k, v in job.parameters.items() if k.isupper()]: - script.define_environment_variable(key, value) - for variable in job.undefined_variables: - script.define_environment_variable(variable, '') - t.script = script + script_name = job.create_script(as_conf) + script_text = open(os.path.join(job._tmp_path, script_name)).read() + # Let's drop the Autosubmit header and tailed. + script_text = re.findall( + r'# Autosubmit job(.*)# Autosubmit tailer', + script_text, + flags=re.DOTALL | re.MULTILINE)[0][1:-1] + t.script = script_text return s -def generate(job_list: JobList, options: List[str]) -> None: +def generate(job_list: JobList, as_conf: AutosubmitConfig, options: List[str]) -> None: """Generates a PyFlow workflow using Autosubmit database. The ``autosubmit create`` command must have been already executed prior @@ -167,6 +165,7 @@ def generate(job_list: JobList, options: List[str]) -> None: to produce the PyFlow workflow. :param job_list: ``JobList`` Autosubmit object, that contains the parameters, jobs, and graph + :param as_conf: Autosubmit configuration :param options: a list of strings with arguments (equivalent to sys.argv), passed to argparse """ args: argparse.Namespace = _parse_args(options) @@ -184,7 +183,8 @@ def generate(job_list: JobList, options: List[str]) -> None: chunks=chunks, jobs=job_list.get_all(), server_host=args.server, - output_dir=args.output + output_dir=args.output, + as_conf=as_conf ) suite.check_definition() @@ -194,3 +194,8 @@ def generate(job_list: JobList, options: List[str]) -> None: if args.deploy: suite.deploy_suite(overwrite=True) # type: ignore suite.replace_on_server(host=args.server, port=args.port) + + +__all__ = [ + 'generate' +] -- GitLab From 6aae04bf72404e1f431a99057d5bc5940d641210 Mon Sep 17 00:00:00 2001 From: "Bruno P. Kinoshita" Date: Wed, 18 Jan 2023 10:45:22 +0100 Subject: [PATCH 07/16] Add splits --- autosubmit/generators/pyflow.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/autosubmit/generators/pyflow.py b/autosubmit/generators/pyflow.py index 128f8726b..646474f79 100644 --- a/autosubmit/generators/pyflow.py +++ b/autosubmit/generators/pyflow.py @@ -36,7 +36,6 @@ class Running(Enum): return self.value -# TODO: split? # Defines how many ``-``'s are replaced by a ``/`` for # each Autosubmit hierarchy level (to avoid using an if/elif/else). REPLACE_COUNT = { @@ -108,7 +107,6 @@ def _create_ecflow_suite( with AnchorFamily(member, MEMBER=member): # type: ignore for chunk in chunks: AnchorFamily(str(chunk), CHUNK=chunk) - # TODO: splits # PyFlow API makes it very easy to create tasks having the ecFlow ID. # Due to how we expanded the Autosubmit graph to include the ID's, and how # we structured this suite, an Autosubmit ID can be seamlessly translated @@ -119,7 +117,10 @@ def _create_ecflow_suite( # in ecFlow, `a000_20220401_fc0_INI` is `a000/20220401/fc0/INI`, and so on. for job in jobs: ecflow_node = _autosubmit_id_to_ecflow_id(job.long_name, job.running) - t = Task(job.section) + if job.split is not None: + t = Task(f'{job.section}_{job.split}', SPLIT=job.split) + else: + t = Task(job.section) # Find the direct parent of the task, based on the Autosubmit task ID. # Start from the Suite, and skip the first (suite), and the last (task) @@ -174,7 +175,6 @@ def generate(job_list: JobList, as_conf: AutosubmitConfig, options: List[str]) - start_dates = [d.strftime("%Y%m%d") for d in job_list.get_date_list()] members = job_list.get_member_list() chunks = job_list.get_chunk_list() - # TODO: splits? suite = _create_ecflow_suite( experiment_id=expid, -- GitLab From bae6e999bc5d43201f4671b6b8c45001a575da0e Mon Sep 17 00:00:00 2001 From: "Bruno P. Kinoshita" Date: Wed, 1 Feb 2023 12:59:52 +0100 Subject: [PATCH 08/16] Prevent error running autosubmit with no arg --- autosubmit/autosubmit.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 0ab372df4..7c101a56e 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -649,7 +649,7 @@ class Autosubmit: args = parser.parse_args() - if len(sys.argv[1]) > 1 and sys.argv[1] in ['generate']: + if len(sys.argv) > 1 and len(sys.argv[1]) > 1 and sys.argv[1] in ['generate']: args, options = parser.parse_known_args() else: options = [] -- GitLab From d0699bd7c1e773b68ba4887b8c877bc3168d5e2b Mon Sep 17 00:00:00 2001 From: "Bruno P. Kinoshita" Date: Wed, 1 Feb 2023 13:00:02 +0100 Subject: [PATCH 09/16] Add some more comments/doc --- autosubmit/generators/pyflow.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/autosubmit/generators/pyflow.py b/autosubmit/generators/pyflow.py index 646474f79..fdb1f43aa 100644 --- a/autosubmit/generators/pyflow.py +++ b/autosubmit/generators/pyflow.py @@ -13,6 +13,10 @@ from autosubmit.job.job_list import JobList, Job """The PyFlow generator for Autosubmit.""" +# N.B.: Avoid causing conflicts with variables defined in ecFlow, such +# as ECF_FILES, ECF_HOME, SUITE, DAY, MONTH, FAMILY, etc. +# Ref: https://ecflow.readthedocs.io/en/latest/ug/user_manual/ecflow_variables/generated_variables.html#generated-variables + # Pattern used to verify if a TASK name includes the previous CHUNK number, with a separator. PREVIOUS_CHUNK_PATTERN = re.compile(r''' ([a-zA-Z0-9_\-\.]+) # The Task name (e.g. TASK); @@ -81,8 +85,11 @@ def _create_ecflow_suite( """Replicate the vanilla workflow graph structure.""" # From: https://pyflow-workflow-generator.readthedocs.io/en/latest/content/introductory-course/getting-started.html + # /scratch is a base directory for ECF_FILES and ECF_HOME scratch_dir = os.path.join(os.path.abspath(output_dir), 'scratch') + # /scratch/files is the ECF_FILES, where ecflow_server looks for ecf scripts if they are not in their default location files_dir = os.path.join(scratch_dir, 'files') + # /scratch/out is the ECF_HOME, the home of all ecFlow files, $CWD out_dir = os.path.join(scratch_dir, 'out') if not os.path.exists(files_dir): @@ -92,7 +99,10 @@ def _create_ecflow_suite( os.makedirs(out_dir, exist_ok=True) # First we create a suite with the same ID as the Autosubmit experiment, - # and families for each Autosubmit hierarchy level. + # and families for each Autosubmit hierarchy level. We use control variables + # such as home (ECF_HOME), and files (ECF_FILES), but there are others that + # can be used too, like include (ECF_INCLUDE), out (ECF_OUT), and extn + # (ECF_EXTN, defaults to the extension .ecf). # NOTE: PyFlow does not work very well with MyPy: https://github.com/ecmwf/pyflow/issues/5 with Suite( # typing: ignore experiment_id, -- GitLab From ae019ff3e4c1f223493e35d640b2931b4ef0c0dd Mon Sep 17 00:00:00 2001 From: "Bruno P. Kinoshita" Date: Mon, 6 Feb 2023 11:54:33 +0100 Subject: [PATCH 10/16] Make the code Python 3.7 compatible and add docs --- .gitignore | 1 + autosubmit/generators/__init__.py | 5 +- autosubmit/generators/pyflow.py | 3 +- docs/source/index.rst | 3 +- docs/source/qstartguide/index.rst | 2 + docs/source/userguide/generate/index.rst | 114 +++++++++++++++++++++++ 6 files changed, 124 insertions(+), 4 deletions(-) create mode 100644 docs/source/userguide/generate/index.rst diff --git a/.gitignore b/.gitignore index 40e7cb3ec..62eb691d7 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,7 @@ autosubmit/simple_test.py .idea/ autosubmit.egg-info/ docs/build/ +docs/_build/ dist/ build/ .cache diff --git a/autosubmit/generators/__init__.py b/autosubmit/generators/__init__.py index a2ab20ea7..513c9e582 100644 --- a/autosubmit/generators/__init__.py +++ b/autosubmit/generators/__init__.py @@ -1,6 +1,6 @@ from enum import Enum from importlib import import_module -from typing import Callable, cast, Protocol +from typing import Callable, cast """This module provides generators to produce workflow configurations for different backend engines.""" @@ -12,7 +12,8 @@ class Engine(Enum): return self.value -class GenerateProto(Protocol): +# TODO: use typing.Protocol instead of object when Py>=3.8 +class GenerateProto(object): """Need a protocol to define the type returned by importlib.""" generate: Callable diff --git a/autosubmit/generators/pyflow.py b/autosubmit/generators/pyflow.py index fdb1f43aa..7b69e03ed 100644 --- a/autosubmit/generators/pyflow.py +++ b/autosubmit/generators/pyflow.py @@ -140,7 +140,8 @@ def _create_ecflow_suite( parent_node = parent_node[node] # We just need to prevent adding a node twice since creating a task automatically adds # it to the suite in the context. And simply call ``add_node`` and we should have it. - if t.name not in list(parent_node.children.mapping.keys()): + # TODO: use mapping.keys when Py>=3.8 or 3.9? if t.name not in list(parent_node.children.mapping.keys()): + if t.name not in [child.name for child in parent_node.children]: parent_node.add_node(t) # Dependencies diff --git a/docs/source/index.rst b/docs/source/index.rst index c26993703..8f6e16905 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -45,6 +45,7 @@ Welcome to autosubmit's documentation! /userguide/variables /userguide/expids /userguide/provenance + /userguide/generate/index .. toctree:: :caption: Database Documentation @@ -92,4 +93,4 @@ Resource Management Autosubmit supports a per-platform configuration, allowing users to run their experiments without adapting job scripts. Multiple Platform - Autosubmit can run jobs of an experiment in different platforms \ No newline at end of file + Autosubmit can run jobs of an experiment in different platforms diff --git a/docs/source/qstartguide/index.rst b/docs/source/qstartguide/index.rst index c99973882..5c619d9b9 100644 --- a/docs/source/qstartguide/index.rst +++ b/docs/source/qstartguide/index.rst @@ -57,6 +57,8 @@ Description of most used commands - Recovers the experiment workflow obtaining the last run complete jobs. * - **setstatus ** - Sets one or multiple jobs status to a given value. + * - **generate ** + - Generate workflow configuration for a different workflow backend engine (e.g. PyFlow/ecFlow). Create a new experiment diff --git a/docs/source/userguide/generate/index.rst b/docs/source/userguide/generate/index.rst new file mode 100644 index 000000000..2109c57ec --- /dev/null +++ b/docs/source/userguide/generate/index.rst @@ -0,0 +1,114 @@ +Generate Workflow Configuration +=============================== + +By default, Autosubmit produces an internal representation of a workflow to run +the experiment created and configured. With the ``autosubmit generate`` subcommand, +it is possible to generate an external configuration of the same workflow in a +different syntax, for a different workflow backend engine. + +At the moment, the only workflow engine supported is `ecFlow `_. + +Prerequisites +------------- + +Before running ``autosubmit generate``, you **must** have executed ``autosubmit create``. +This is important as the ``create`` subcommand produces the information necessary to have +a workflow graph. This graph is traversed by the ``generate`` subcommand to produce a new +graph for a different workflow engine. + +How to generate workflow configuration +-------------------------------------- + +The command syntax is: + +.. code-block:: bash + + autosubmit generate --engine args... + +PyFlow / ecFlow +~~~~~~~~~~~~~~~ + +`PyFlow `_ is a Python utility developed +by the ECMWF to generate workflow configuration for workflow engines. At the moment its only +output workflow configuration is for ecFlow, an ECMWF workflow manager. + +.. code-block:: bash + :caption: Command usage to generate PyFlow/ecFlow configuration + + autosubmit generate \ + --generate {pyflow} \ + --server= \ + --output= \ + [--quiet --deploy --port=] + +For PyFlow/ecFlow, the required parameters are the ``--server`` where the workflow +will run, and the ``--output`` with the directory to write the ecFlow generated +files. If you enable ``--deploy``, it will call code from PyFlow to deploy it to +ecFlow. For this option, you will also have to specify ``--port``. + +To reduce verbosity of the command, you can specify ``--quiet``, although that does not +guarantee complete the command will not output anything — as it calls other modules. + +Scripts preprocessing +--------------------- + +One important thing to keep in mind when generating workflow configurations for different +workflow engines, is the use of preprocessing of script templates. + +Autosubmit, as many other workflow managers, offers a variable substitution (interpolation) +that is used to preprocess task scripts. For example: + +.. code-block:: bash + :caption: Task script that require Autosubmit preprocessing + + echo "The root dir is %ROOTDIR%" + +The ``ROOTDIR`` variable is :doc:`replaced by Autosubmit `, before Bash shell executes the script +(i.e. it is not an environment variable). The ``ROOTDIR`` is a variable provided by the +Autosubmit runtime, that may exist in other workflow managers, but it may have a different +name. + +This is a problem for the portability of the generated scripts. A recommended workaround +for this issue is to use a single script that defines the variables used by the workflow +tasks. For example, a file called ``prepare_environment.sh``: + +.. code-block:: bash + :caption: ``prepare_environment.sh`` for Autosubmit + + ROOTDIR=%ROOTDIR% + CHUNK=%CHUNK% + +This script will have to ``source`` that script in your Bash scripts, like so: + +.. code-block:: bash + :caption: Task script that does not require Autosubmit preprocessing + + #!/bin/bash + set -xuve + + source prepare_environment.sh + + echo "The root dir is ${ROOTDIR}" + +The idea of this approach is to reduce the necessary modifications when porting +the workflow from Autosubmit to a different workflow engine. In contrast, if you +used the Autosubmit variables in all your template files, that means that when +porting to a different workflow engine you would have to ``a)`` adjust every +script to use the correct variables, or ``b)`` preprocess the scripts with +Autosubmit assuming you have an identical target platform, or ``c)`` change the +generated workflow configuration manually. + +In the case of PyFlow/ecFlow, for instance, the ``prepare_environment.sh`` file +would have to be updated to use the correct variable substitution syntax and the +correct ecFlow variable. For example: + +.. code-block:: bash + :caption: ``prepare_environment.sh`` modified for ecFlow + + ROOTDIR=%ECF_HOME% # ECF_HOME is a possible replacement for ROOTDIR + CHUNK=%CHUNK% # CHUNK is set by the generate subcommand via PyFlow + +.. note:: Note + Autosubmit and ecFlow have similar syntax for the variables that are preprocessed, + using ``%`` to wrap the variables. However, this may not be always the case. You can + find workflow managers that use other symbols, or Jinja, YAML input files, etc. -- GitLab From d995dc64e53fafe89725b4f6b1c4f6bacb519618 Mon Sep 17 00:00:00 2001 From: "Bruno P. Kinoshita" Date: Mon, 6 Feb 2023 15:32:58 +0100 Subject: [PATCH 11/16] Add unit tests for PyFlow, and remove Running.SPLIT as SPLIT is part of a CHUNK. --- autosubmit/generators/pyflow.py | 23 +------- test/unit/generators/test_pyflow.py | 91 +++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+), 22 deletions(-) create mode 100644 test/unit/generators/test_pyflow.py diff --git a/autosubmit/generators/pyflow.py b/autosubmit/generators/pyflow.py index 7b69e03ed..dec938299 100644 --- a/autosubmit/generators/pyflow.py +++ b/autosubmit/generators/pyflow.py @@ -1,6 +1,5 @@ import argparse import os -import re import tempfile from enum import Enum from typing import List @@ -13,18 +12,6 @@ from autosubmit.job.job_list import JobList, Job """The PyFlow generator for Autosubmit.""" -# N.B.: Avoid causing conflicts with variables defined in ecFlow, such -# as ECF_FILES, ECF_HOME, SUITE, DAY, MONTH, FAMILY, etc. -# Ref: https://ecflow.readthedocs.io/en/latest/ug/user_manual/ecflow_variables/generated_variables.html#generated-variables - -# Pattern used to verify if a TASK name includes the previous CHUNK number, with a separator. -PREVIOUS_CHUNK_PATTERN = re.compile(r''' - ([a-zA-Z0-9_\-\.]+) # The Task name (e.g. TASK); - - # TASK and CHUNK separator, i.e. TASK-1 (the hyphen between TASK and 1); - ([\d]+) # The Chunk name (e.g. 1). -''', re.X) - - # Autosubmit Task name separator (not to be confused with task and chunk name separator). DEFAULT_SEPARATOR = '_' @@ -34,7 +21,6 @@ class Running(Enum): ONCE = 'once' MEMBER = 'member' CHUNK = 'chunk' - SPLIT = 'split' def __str__(self): return self.value @@ -156,14 +142,7 @@ def _create_ecflow_suite( dependency_node >> t # Script - script_name = job.create_script(as_conf) - script_text = open(os.path.join(job._tmp_path, script_name)).read() - # Let's drop the Autosubmit header and tailed. - script_text = re.findall( - r'# Autosubmit job(.*)# Autosubmit tailer', - script_text, - flags=re.DOTALL | re.MULTILINE)[0][1:-1] - t.script = script_text + t.script = job.file return s diff --git a/test/unit/generators/test_pyflow.py b/test/unit/generators/test_pyflow.py new file mode 100644 index 000000000..8b8c79046 --- /dev/null +++ b/test/unit/generators/test_pyflow.py @@ -0,0 +1,91 @@ +from unittest import TestCase +from unittest.mock import MagicMock, patch +from autosubmit.generators.pyflow import generate, Running +from pyflow import Suite +from tempfile import TemporaryDirectory +from datetime import datetime +from autosubmit.job.job import Job, Status + + +class TestPyFlow(TestCase): + + def setUp(self) -> None: + self.suite_name = 'a000' + + def _create_job(self, section, name, status, running, split=None): + """Create an Autosubmit job with pre-defined values.""" + # TODO: maybe suggest a kwags approach in the constructor to expand to local vars? + job = Job(name, 0, status, 0) + job.section = section + job.running = str(running) + job.split = split + job.file = 'templates/script.sh' + return job + + def _get_jobs(self): + """Create list of Autosubmit jobs. For these tests we use a very simple experiment workflow.""" + ini_job = self._create_job('INI', 'a000_INI', Status.COMPLETED, Running.ONCE) + prep_job = self._create_job('PREP', 'a000_20000101_fc0_PREP', Status.READY, Running.MEMBER) + prep_job.parents = {ini_job} + sim_job_1 = self._create_job('SIM', 'a000_20000101_fc0_1_1_SIM', Status.QUEUING, Running.CHUNK, 1) + sim_job_1.parent = {prep_job} + sim_job_2 = self._create_job('SIM', 'a000_20000101_fc0_1_2_SIM', Status.QUEUING, Running.CHUNK, 2) + sim_job_2.parent = {prep_job, sim_job_1} + return [ini_job, prep_job, sim_job_1, sim_job_2] + + def _create_job_list(self, expid, dates=None, members=None, chunks=None, empty=False): + if dates is None: + dates = [] + if members is None: + members = [] + if chunks is None: + chunks = [] + job_list = MagicMock(expid=expid) + job_list.get_date_list.return_value = dates + job_list.get_member_list.return_value = members + job_list.get_chunk_list.return_value = chunks + job_list.get_all.return_value = [] if empty is True else self._get_jobs() + return job_list + + def test_generate(self): + with TemporaryDirectory() as temp_out_dir: + tests = [ + { + 'job_list': self._create_job_list('a000', [datetime(2000, 1, 1)], ['fc0'], ['1']), + 'as_conf': None, + 'options': ['-e', 'a000', '-o', temp_out_dir, '-s', 'localhost'], + 'expected_error': None + }, + { + 'job_list': self._create_job_list('a000', [datetime(2000, 1, 1)], ['fc0'], ['1']), + 'as_conf': None, + 'options': ['-e', 'a000', '-o', temp_out_dir, '-s', 'localhost', '--quiet'], + 'expected_error': None + }, + { + 'job_list': self._create_job_list('a001', [], [], [], empty=True), + 'as_conf': None, + 'options': ['-e', 'a001', '-o', temp_out_dir, '-s', 'localhost', '--quiet'], + 'expected_error': None + }, + { + 'job_list': self._create_job_list('a002', [], [], [], empty=True), + 'as_conf': None, + 'options': ['-e', 'a002', '-o', None, '-s', 'localhost', '--quiet'], + 'expected_error': TypeError + } + ] + for test in tests: + job_list = test['job_list'] + as_conf = test['as_conf'] + options = test['options'] + expected_error = test['expected_error'] + + if expected_error is not None: + try: + generate(job_list, as_conf, options) + self.fail('Test case expected to fail') + except: + pass + else: + generate(job_list, as_conf, options) -- GitLab From 05785eb93eb12d9be0d3e2f210bf12eef5976d1f Mon Sep 17 00:00:00 2001 From: "Bruno P. Kinoshita" Date: Mon, 20 Feb 2023 17:50:24 +0100 Subject: [PATCH 12/16] Use pathlib, load script for pre-processing --- autosubmit/generators/pyflow.py | 52 +++++++++++++++++++++++++-------- 1 file changed, 40 insertions(+), 12 deletions(-) diff --git a/autosubmit/generators/pyflow.py b/autosubmit/generators/pyflow.py index dec938299..ec018ae89 100644 --- a/autosubmit/generators/pyflow.py +++ b/autosubmit/generators/pyflow.py @@ -1,11 +1,12 @@ import argparse -import os import tempfile from enum import Enum +from pathlib import Path from typing import List import pyflow as pf from autosubmitconfigparser.config.configcommon import AutosubmitConfig +from autosubmitconfigparser.config.basicconfig import BasicConfig from pyflow import * from autosubmit.job.job_list import JobList, Job @@ -72,17 +73,17 @@ def _create_ecflow_suite( # From: https://pyflow-workflow-generator.readthedocs.io/en/latest/content/introductory-course/getting-started.html # /scratch is a base directory for ECF_FILES and ECF_HOME - scratch_dir = os.path.join(os.path.abspath(output_dir), 'scratch') + scratch_dir = Path(Path(output_dir).absolute(), 'scratch') # /scratch/files is the ECF_FILES, where ecflow_server looks for ecf scripts if they are not in their default location - files_dir = os.path.join(scratch_dir, 'files') + files_dir = scratch_dir / 'files' # /scratch/out is the ECF_HOME, the home of all ecFlow files, $CWD - out_dir = os.path.join(scratch_dir, 'out') + out_dir = scratch_dir / 'out' - if not os.path.exists(files_dir): - os.makedirs(files_dir, exist_ok=True) + if not files_dir.exists(): + files_dir.mkdir(parents=True, exist_ok=True) - if not os.path.exists(out_dir): - os.makedirs(out_dir, exist_ok=True) + if not out_dir.exists(): + out_dir.mkdir(parents=True, exist_ok=True) # First we create a suite with the same ID as the Autosubmit experiment, # and families for each Autosubmit hierarchy level. We use control variables @@ -94,8 +95,8 @@ def _create_ecflow_suite( experiment_id, host=pf.LocalHost(server_host), defstatus=pf.state.suspended, # type: ignore - home=out_dir, # type: ignore - files=files_dir # type: ignore + home=str(out_dir), # type: ignore + files=str(files_dir) # type: ignore ) as s: # typing: ignore for start_date in start_dates: with AnchorFamily(start_date, START_DATE=start_date): # type: ignore @@ -138,12 +139,39 @@ def _create_ecflow_suite( parent_node = parent_node[node] dependency_node = parent_node[parent.section] + # In case we ever need to use the pre-processed file. + # + # script_name = job.create_script(as_conf) + # script_text = open(Path(job._tmp_path, script_name)).read() + # # Let's drop the Autosubmit header and tailed. + # script_text = re.findall( + # r'# Autosubmit job(.*)# Autosubmit tailer', + # script_text, + # flags=re.DOTALL | re.MULTILINE)[0][1:-1] + # t.script = script_text + # Operator overloaded in PyFlow. This creates a dependency. dependency_node >> t # Script - t.script = job.file - + # N.B.: We used the code below in the beginning, but later we realized it would + # not work. In Autosubmit, the FILE: $FILE is a file, relative to the AS + # Project folder. The $FILE template script is then pre-processed to be + # executed by AS. That is different than ecFlow, where the Task Script is + # value is pre-processed (i.e. if the value is ``templates/local_script.sh`` + # that value is treated as a string when included in the final job file) + # to generate the ecFlow job file. So ecFlow pre-processes the ``.script`` + # value, whereas Autosubmit loads the ``FILE`` script and pre-processes it. + # + # In order to have a similar behavior, we insert the contents of the AS + # template script as the ecFlow task. That way, the template script (now + # a Task Script in ecFlow) will be pre-processed by ecFlow. + # + # The variables may still need to be manually adjusted, but once that is + # done, the script should then be ready to be executed (i.e. ported). + # t.script = job.file + with open(Path(as_conf.get_local_project_path(), job.file)) as f: + t.script = f.read() return s -- GitLab From f0ca28617c3a5485267d22ffc0f700ced30c2184 Mon Sep 17 00:00:00 2001 From: "Bruno P. Kinoshita" Date: Mon, 27 Feb 2023 11:35:00 +0100 Subject: [PATCH 13/16] Fix Sphinx build (location of variables page) --- docs/source/userguide/generate/index.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/userguide/generate/index.rst b/docs/source/userguide/generate/index.rst index 2109c57ec..42b9cc28d 100644 --- a/docs/source/userguide/generate/index.rst +++ b/docs/source/userguide/generate/index.rst @@ -63,7 +63,7 @@ that is used to preprocess task scripts. For example: echo "The root dir is %ROOTDIR%" -The ``ROOTDIR`` variable is :doc:`replaced by Autosubmit `, before Bash shell executes the script +The ``ROOTDIR`` variable is :doc:`replaced by Autosubmit `, before Bash shell executes the script (i.e. it is not an environment variable). The ``ROOTDIR`` is a variable provided by the Autosubmit runtime, that may exist in other workflow managers, but it may have a different name. -- GitLab From ba69fb4e38e9774f3ccbdaa849e82dae32ff0580 Mon Sep 17 00:00:00 2001 From: "Bruno P. Kinoshita" Date: Wed, 11 Oct 2023 16:48:25 +0200 Subject: [PATCH 14/16] Add pyflow-workflow-generator dependency, fix issue parsing args --- autosubmit/autosubmit.py | 2 -- requeriments.txt | 1 + setup.py | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/autosubmit/autosubmit.py b/autosubmit/autosubmit.py index 7c101a56e..c8b8c58e1 100644 --- a/autosubmit/autosubmit.py +++ b/autosubmit/autosubmit.py @@ -647,8 +647,6 @@ class Autosubmit: help='The target Workflow Manager engine', choices=[engine.value for engine in Engine]) subparser.add_argument('args', nargs='?') - args = parser.parse_args() - if len(sys.argv) > 1 and len(sys.argv[1]) > 1 and sys.argv[1] in ['generate']: args, options = parser.parse_known_args() else: diff --git a/requeriments.txt b/requeriments.txt index d357f39dd..f63354fd1 100644 --- a/requeriments.txt +++ b/requeriments.txt @@ -31,3 +31,4 @@ typing>=3.7 wheel psutil rocrate==0.* +pyflow-workflow-generator diff --git a/setup.py b/setup.py index 7ad4b3409..d68bef11a 100644 --- a/setup.py +++ b/setup.py @@ -39,7 +39,7 @@ setup( url='http://www.bsc.es/projects/earthscience/autosubmit/', download_url='https://earth.bsc.es/wiki/doku.php?id=tools:autosubmit', keywords=['climate', 'weather', 'workflow', 'HPC'], - install_requires=['ruamel.yaml==0.17.21','cython','autosubmitconfigparser','bcrypt>=3.2','packaging>19','six>=1.10.0','configobj>=5.0.6','argparse>=1.4.0','python-dateutil>=2.8.2','matplotlib<3.6','py3dotplus>=1.1.0','pyparsing>=3.0.7','paramiko>=2.9.2','mock>=4.0.3','portalocker>=2.3.2,<=2.7.0','networkx==2.6.3','requests>=2.27.1','bscearth.utils>=0.5.2','cryptography>=36.0.1','setuptools>=60.8.2','xlib>=0.21','pip>=22.0.3','pythondialog','pytest','nose','coverage','PyNaCl>=1.5.0','Pygments','psutil','rocrate==0.*'], + install_requires=['ruamel.yaml==0.17.21','cython','autosubmitconfigparser','bcrypt>=3.2','packaging>19','six>=1.10.0','configobj>=5.0.6','argparse>=1.4.0','python-dateutil>=2.8.2','matplotlib<3.6','py3dotplus>=1.1.0','pyparsing>=3.0.7','paramiko>=2.9.2','mock>=4.0.3','portalocker>=2.3.2,<=2.7.0','networkx==2.6.3','requests>=2.27.1','bscearth.utils>=0.5.2','cryptography>=36.0.1','setuptools>=60.8.2','xlib>=0.21','pip>=22.0.3','pythondialog','pytest','nose','coverage','PyNaCl>=1.5.0','Pygments','psutil','rocrate==0.*','pyflow-workflow-generator'], classifiers=[ "Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.9", -- GitLab From 652fbcbdc0e7b80db1dfe54f77516e7f2bc640ed Mon Sep 17 00:00:00 2001 From: "Bruno P. Kinoshita" Date: Mon, 16 Oct 2023 14:48:40 +0200 Subject: [PATCH 15/16] Fix the generator so it works with any type of project, not only LOCAL --- autosubmit/generators/pyflow.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/autosubmit/generators/pyflow.py b/autosubmit/generators/pyflow.py index ec018ae89..fcbd48957 100644 --- a/autosubmit/generators/pyflow.py +++ b/autosubmit/generators/pyflow.py @@ -157,7 +157,7 @@ def _create_ecflow_suite( # N.B.: We used the code below in the beginning, but later we realized it would # not work. In Autosubmit, the FILE: $FILE is a file, relative to the AS # Project folder. The $FILE template script is then pre-processed to be - # executed by AS. That is different than ecFlow, where the Task Script is + # executed by AS. That is different from ecFlow, where the Task Script is # value is pre-processed (i.e. if the value is ``templates/local_script.sh`` # that value is treated as a string when included in the final job file) # to generate the ecFlow job file. So ecFlow pre-processes the ``.script`` @@ -170,7 +170,7 @@ def _create_ecflow_suite( # The variables may still need to be manually adjusted, but once that is # done, the script should then be ready to be executed (i.e. ported). # t.script = job.file - with open(Path(as_conf.get_local_project_path(), job.file)) as f: + with open(Path(as_conf.get_project_dir(), job.file)) as f: t.script = f.read() return s -- GitLab From c665c9e6da460f1aa38fc3e03640b6a5d8200e4c Mon Sep 17 00:00:00 2001 From: "Bruno P. Kinoshita" Date: Mon, 13 Nov 2023 16:18:34 +0100 Subject: [PATCH 16/16] Handle splits and use a sleep 5 for now to render workflows --- autosubmit/generators/pyflow.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/autosubmit/generators/pyflow.py b/autosubmit/generators/pyflow.py index fcbd48957..5ba972d99 100644 --- a/autosubmit/generators/pyflow.py +++ b/autosubmit/generators/pyflow.py @@ -114,7 +114,7 @@ def _create_ecflow_suite( # in ecFlow, `a000_20220401_fc0_INI` is `a000/20220401/fc0/INI`, and so on. for job in jobs: ecflow_node = _autosubmit_id_to_ecflow_id(job.long_name, job.running) - if job.split is not None: + if job.split is not None and job.split > 0: t = Task(f'{job.section}_{job.split}', SPLIT=job.split) else: t = Task(job.section) @@ -137,7 +137,10 @@ def _create_ecflow_suite( parent_node = s for node in dependency_node.split('/')[1:-1]: parent_node = parent_node[node] - dependency_node = parent_node[parent.section] + parent_key = parent.section + if parent.split is not None and parent.split > 0: + parent_key = f'{parent_key}_{parent.split}' + dependency_node = parent_node[parent_key] # In case we ever need to use the pre-processed file. # @@ -169,9 +172,11 @@ def _create_ecflow_suite( # # The variables may still need to be manually adjusted, but once that is # done, the script should then be ready to be executed (i.e. ported). + # FIXME # t.script = job.file - with open(Path(as_conf.get_project_dir(), job.file)) as f: - t.script = f.read() + # with open(Path(as_conf.get_project_dir(), job.file)) as f: + # t.script = f.read() + t.script = 'sleep 5' return s -- GitLab